From c5f055718ab8d87f063d7e783bd7b5c3b5d036df Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Fri, 20 May 2022 17:32:40 +0800 Subject: [PATCH] concurrent consumption of messages --- internal/msg_transfer/logic/online_history_msg_handler.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index 6e38ac015..f36d6cfaf 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -209,8 +209,8 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) { //} func (och *OnlineHistoryConsumerHandler) MessagesDistributionHandle() { - UserAggregationMsgs := make(map[string][]*pbMsg.MsgDataToMQ, ChannelNum) for { + UserAggregationMsgs := make(map[string][]*pbMsg.MsgDataToMQ, ChannelNum) select { case cmd := <-och.msgDistributionCh: switch cmd.Cmd { @@ -250,6 +250,7 @@ func (och *OnlineHistoryConsumerHandler) MessagesDistributionHandle() { } } } + } } @@ -403,7 +404,7 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS } triggerID = utils.OperationIDGenerator() - log.Debug(triggerID, "timer trigger msg consumer start", len(cMsg)) + log.Debug(triggerID, "timer trigger msg consumer start", len(ccMsg)) och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ triggerID: triggerID, cmsgList: ccMsg}} sess.MarkMessage(msg, "") @@ -418,7 +419,7 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS } triggerID = utils.OperationIDGenerator() - log.Debug(triggerID, "length trigger msg consumer start", len(cMsg)) + log.Debug(triggerID, "length trigger msg consumer start", len(ccMsg)) och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ triggerID: triggerID, cmsgList: ccMsg}} sess.MarkMessage(msg, "")