From 84e7935971fbe0aa936a3873f21f9a9068e2f4dd Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Wed, 25 May 2022 18:03:43 +0800 Subject: [PATCH 1/4] channelNum --- internal/msg_transfer/logic/init.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/msg_transfer/logic/init.go b/internal/msg_transfer/logic/init.go index 96412fa40..f8c424dcc 100644 --- a/internal/msg_transfer/logic/init.go +++ b/internal/msg_transfer/logic/init.go @@ -16,7 +16,7 @@ const Msg = 2 const ConsumerMsgs = 3 const UserMessages = 4 const MongoMessages = 5 -const ChannelNum = 10 +const ChannelNum = 100 var ( persistentCH PersistentConsumerHandler From eb41d8b16de26de942a8392dc3c8190b79891d8a Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Wed, 25 May 2022 18:10:53 +0800 Subject: [PATCH 2/4] channelNum --- internal/msg_transfer/logic/online_history_msg_handler.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index 5c34b8ff4..069ec8c1b 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -130,7 +130,7 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) { // return //} log.Debug(triggerID, "msg storage length", len(storageMsgList), "push length", len(notStoragepushMsgList)) - err, lastSeq := saveUserChatList(msgChannelValue.userID, storageMsgList, triggerID) + err, _ := saveUserChatList(msgChannelValue.userID, storageMsgList, triggerID) if err != nil { singleMsgFailedCount += uint64(len(storageMsgList)) log.NewError(triggerID, "single data insert to redis err", err.Error(), storageMsgList) @@ -138,7 +138,7 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) { singleMsgSuccessCountMutex.Lock() singleMsgSuccessCount += uint64(len(storageMsgList)) singleMsgSuccessCountMutex.Unlock() - och.SendMessageToMongoCH(msgChannelValue.userID, triggerID, storageMsgList, lastSeq) + //och.SendMessageToMongoCH(msgChannelValue.userID, triggerID, storageMsgList, lastSeq) go func(push, storage []*pbMsg.MsgDataToMQ) { for _, v := range storage { sendMessageToPush(v, msgChannelValue.userID) From b797492dbd98087ef1ca74e05f40240b50918bbd Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Wed, 25 May 2022 18:11:49 +0800 Subject: [PATCH 3/4] channelNum --- internal/msg_transfer/logic/online_history_msg_handler.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index 069ec8c1b..8e8941d62 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -130,7 +130,7 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) { // return //} log.Debug(triggerID, "msg storage length", len(storageMsgList), "push length", len(notStoragepushMsgList)) - err, _ := saveUserChatList(msgChannelValue.userID, storageMsgList, triggerID) + err, lastSeq := saveUserChatList(msgChannelValue.userID, storageMsgList, triggerID) if err != nil { singleMsgFailedCount += uint64(len(storageMsgList)) log.NewError(triggerID, "single data insert to redis err", err.Error(), storageMsgList) @@ -138,7 +138,7 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) { singleMsgSuccessCountMutex.Lock() singleMsgSuccessCount += uint64(len(storageMsgList)) singleMsgSuccessCountMutex.Unlock() - //och.SendMessageToMongoCH(msgChannelValue.userID, triggerID, storageMsgList, lastSeq) + och.SendMessageToMongoCH(msgChannelValue.userID, triggerID, storageMsgList, lastSeq) go func(push, storage []*pbMsg.MsgDataToMQ) { for _, v := range storage { sendMessageToPush(v, msgChannelValue.userID) @@ -155,6 +155,7 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) { } } func (och *OnlineHistoryConsumerHandler) SendMessageToMongoCH(userID string, triggerID string, messages []*pbMsg.MsgDataToMQ, lastSeq uint64) { + return hashCode := getHashCode(userID) channelID := hashCode % ChannelNum log.Debug(triggerID, "generate channelID", hashCode, channelID, userID) From 7267fd080e6e2e0567c28cf2a453320578f34ec5 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Wed, 25 May 2022 18:17:55 +0800 Subject: [PATCH 4/4] channelNum --- internal/msg_transfer/logic/init.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/msg_transfer/logic/init.go b/internal/msg_transfer/logic/init.go index f8c424dcc..96412fa40 100644 --- a/internal/msg_transfer/logic/init.go +++ b/internal/msg_transfer/logic/init.go @@ -16,7 +16,7 @@ const Msg = 2 const ConsumerMsgs = 3 const UserMessages = 4 const MongoMessages = 5 -const ChannelNum = 100 +const ChannelNum = 10 var ( persistentCH PersistentConsumerHandler