From 94e6380485289d8f6bd7cf23f719ef98d744eb6a Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Tue, 24 May 2022 21:14:39 +0800 Subject: [PATCH] redis add get message --- internal/msg_transfer/logic/init.go | 1 + .../logic/online_history_msg_handler.go | 28 ++++++++++++++++++- 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/internal/msg_transfer/logic/init.go b/internal/msg_transfer/logic/init.go index f3f1b138e..f8c424dcc 100644 --- a/internal/msg_transfer/logic/init.go +++ b/internal/msg_transfer/logic/init.go @@ -15,6 +15,7 @@ const OnlineTopicVacancy = 0 const Msg = 2 const ConsumerMsgs = 3 const UserMessages = 4 +const MongoMessages = 5 const ChannelNum = 100 var ( diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index 698e52cc4..cf51e9628 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -39,6 +39,7 @@ type OnlineHistoryConsumerHandler struct { cmdCh chan Cmd2Value msgCh chan Cmd2Value chArrays [ChannelNum]chan Cmd2Value + chMongoArrays [ChannelNum]chan Cmd2Value msgDistributionCh chan Cmd2Value } @@ -52,6 +53,10 @@ func (och *OnlineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) { och.chArrays[i] = make(chan Cmd2Value, 1000) go och.Run(i) } + for i := 0; i < ChannelNum; i++ { + och.chMongoArrays[i] = make(chan Cmd2Value, 1000) + go och.MongoMessageRun(i) + } if config.Config.ReliableStorage { och.msgHandle[config.Config.Kafka.Ws2mschat.Topic] = och.handleChatWs2Mongo } else { @@ -146,6 +151,28 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) { } } } +func (och *OnlineHistoryConsumerHandler) SendMessageToMongoCH(userID string, triggerID string, messages []*pbMsg.MsgDataToMQ) { + hashCode := getHashCode(userID) + channelID := hashCode % ChannelNum + log.Debug(triggerID, "generate channelID", hashCode, channelID, userID) + //go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) { + och.chMongoArrays[channelID] <- Cmd2Value{Cmd: MongoMessages, Value: MsgChannelValue{userID: userID, msgList: messages, triggerID: triggerID}} +} +func (och *OnlineHistoryConsumerHandler) MongoMessageRun(channelID int) { + for { + select { + case cmd := <-och.chArrays[channelID]: + switch cmd.Cmd { + + case MongoMessages: + msgChannelValue := cmd.Value.(MsgChannelValue) + msgList := msgChannelValue.msgList + triggerID := msgChannelValue.triggerID + userID := msgChannelValue.userID + } + } + } +} //func (och *OnlineHistoryConsumerHandler) handleChatWs2Mongo(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) { // msg := cMsg.Value @@ -216,7 +243,6 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) { // sess.MarkMessage(cMsg, "") // log.NewDebug(msgFromMQ.OperationID, "msg_transfer handle topic data to database success...", msgFromMQ.String()) //} - func (och *OnlineHistoryConsumerHandler) MessagesDistributionHandle() { for { UserAggregationMsgs := make(map[string][]*pbMsg.MsgDataToMQ, ChannelNum)