diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index 297c3ea99..36fcb45a0 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -397,20 +397,30 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS select { case <-t.C: if len(cMsg) >= 0 { + ccMsg := make([]*sarama.ConsumerMessage, 0, 500) + for _, v := range cMsg { + ccMsg = append(ccMsg, v) + + } triggerID = utils.OperationIDGenerator() log.Debug(triggerID, "timer trigger msg consumer start", len(cMsg)) och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ - triggerID: triggerID, cmsgList: cMsg}} + triggerID: triggerID, cmsgList: ccMsg}} sess.MarkMessage(msg, "") cMsg = cMsg[0:0] log.Debug(triggerID, "timer trigger msg consumer end", len(cMsg)) } default: if len(cMsg) >= 500 { + ccMsg := make([]*sarama.ConsumerMessage, 0, 500) + for _, v := range cMsg { + ccMsg = append(ccMsg, v) + + } triggerID = utils.OperationIDGenerator() log.Debug(triggerID, "length trigger msg consumer start", len(cMsg)) och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ - triggerID: triggerID, cmsgList: cMsg}} + triggerID: triggerID, cmsgList: ccMsg}} sess.MarkMessage(msg, "") cMsg = cMsg[0:0] log.Debug(triggerID, "length trigger msg consumer end", len(cMsg))