diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index 3deed6a23..095319f10 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -446,11 +446,17 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS } cMsg = make([]*sarama.ConsumerMessage, 0, 1000) rwLock.Unlock() - + split := 1000 triggerID = utils.OperationIDGenerator() - log.Debug(triggerID, "timer trigger msg consumer start", len(ccMsg)) - och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ - triggerID: triggerID, cmsgList: ccMsg}} + for i := 0; i < len(ccMsg)/split; i++ { + log.Debug(triggerID, "timer trigger msg consumer start", len(ccMsg)) + och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ + triggerID: triggerID, cmsgList: ccMsg[i*split : (i+1)*split]}} + } + if (len(ccMsg) % split) > 0 { + och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ + triggerID: triggerID, cmsgList: ccMsg[split*(len(ccMsg)/split):]}} + } //sess.MarkMessage(ccMsg[len(cMsg)-1], "") log.Debug(triggerID, "timer trigger msg consumer end", len(cMsg))