From 15329a97fe2ed1a3b0c6d1a7011d856b8b0deb61 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Fri, 20 May 2022 16:07:32 +0800 Subject: [PATCH] concurrent consumption of messages --- internal/msg_transfer/logic/online_history_msg_handler.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index 6efef8124..297c3ea99 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -96,8 +96,8 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) { msgChannelValue := cmd.Value.(MsgChannelValue) msgList := msgChannelValue.msgList triggerID := msgChannelValue.triggerID - storageMsgList := make([]*pbMsg.MsgDataToMQ, 80) - pushMsgList := make([]*pbMsg.MsgDataToMQ, 80) + storageMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80) + pushMsgList := make([]*pbMsg.MsgDataToMQ, 0, 80) log.Debug(triggerID, "msg arrived channel", "channel id", channelID, msgList, msgChannelValue.userID, len(msgList)) for _, v := range msgList { log.Debug(triggerID, "msg come to storage center", v.String()) @@ -119,7 +119,7 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) { // log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String()) // return //} - + log.Debug(triggerID, "msg storage length", len(storageMsgList), "push length", len(pushMsgList)) err := saveUserChatList(msgChannelValue.userID, storageMsgList, triggerID) if err != nil { singleMsgFailedCount += uint64(len(storageMsgList)) @@ -227,7 +227,7 @@ func (och *OnlineHistoryConsumerHandler) MessagesDistributionHandle() { log.Error(triggerID, "msg_transfer Unmarshal msg err", "msg", string(consumerMessages[i].Value), "err", err.Error()) return } - log.Debug(triggerID, "single msg come to distribution center", msgFromMQ.String()) + log.Debug(triggerID, "single msg come to distribution center", msgFromMQ.String(), string(consumerMessages[i].Key)) if oldM, ok := UserAggregationMsgs[string(consumerMessages[i].Key)]; ok { oldM = append(oldM, &msgFromMQ) UserAggregationMsgs[string(consumerMessages[i].Key)] = oldM