diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index 9ea433faa..8e4e3c2a2 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -416,22 +416,21 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS select { case msg := <-claim.Messages(): log.NewDebug("", "claim.Messages ", msg) - if msg != nil { - cMsg = append(cMsg, msg) - if len(cMsg) >= 1000 { - ccMsg := make([]*sarama.ConsumerMessage, 0, 1000) - for _, v := range cMsg { - ccMsg = append(ccMsg, v) - } - triggerID = utils.OperationIDGenerator() - log.Debug(triggerID, "length trigger msg consumer start", len(ccMsg)) - och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ - triggerID: triggerID, cmsgList: ccMsg}} - sess.MarkMessage(msg, "") - cMsg = make([]*sarama.ConsumerMessage, 0, 1000) - log.Debug(triggerID, "length trigger msg consumer end", len(cMsg)) + cMsg = append(cMsg, msg) + if len(cMsg) >= 1000 { + ccMsg := make([]*sarama.ConsumerMessage, 0, 1000) + for _, v := range cMsg { + ccMsg = append(ccMsg, v) } + triggerID = utils.OperationIDGenerator() + log.Debug(triggerID, "length trigger msg consumer start", len(ccMsg)) + och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ + triggerID: triggerID, cmsgList: ccMsg}} + sess.MarkMessage(msg, "") + cMsg = make([]*sarama.ConsumerMessage, 0, 1000) + log.Debug(triggerID, "length trigger msg consumer end", len(cMsg)) } + case <-t.C: if len(cMsg) > 0 { ccMsg := make([]*sarama.ConsumerMessage, 0, 1000)