From 77c7efbe896d3632673ae3e80c4553669b10cc16 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Tue, 24 May 2022 23:31:37 +0800 Subject: [PATCH] redis add get message --- .../logic/online_history_msg_handler.go | 100 +++++++++--------- 1 file changed, 50 insertions(+), 50 deletions(-) diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index 5a2c95eb2..805eef805 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -41,13 +41,13 @@ type OnlineHistoryConsumerHandler struct { msgCh chan Cmd2Value chArrays [ChannelNum]chan Cmd2Value chMongoArrays [ChannelNum]chan Cmd2Value - msgDistributionCh chan Cmd2Value + //msgDistributionCh chan Cmd2Value } func (och *OnlineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) { och.msgHandle = make(map[string]fcb) - och.msgDistributionCh = make(chan Cmd2Value) //no buffer channel - go och.MessagesDistributionHandle() + //och.msgDistributionCh = make(chan Cmd2Value) //no buffer channel + //go och.MessagesDistributionHandle() och.cmdCh = cmdCh och.msgCh = make(chan Cmd2Value, 1000) for i := 0; i < ChannelNum; i++ { @@ -180,52 +180,52 @@ func (och *OnlineHistoryConsumerHandler) MongoMessageRun(channelID int) { } } -func (och *OnlineHistoryConsumerHandler) MessagesDistributionHandle() { - for { - UserAggregationMsgs := make(map[string][]*pbMsg.MsgDataToMQ, ChannelNum) - select { - case cmd := <-och.msgDistributionCh: - switch cmd.Cmd { - case ConsumerMsgs: - triggerChannelValue := cmd.Value.(TriggerChannelValue) - triggerID := triggerChannelValue.triggerID - consumerMessages := triggerChannelValue.cmsgList - //Aggregation map[userid]message list - log.Debug(triggerID, "batch messages come to distribution center", len(consumerMessages)) - for i := 0; i < len(consumerMessages); i++ { - msgFromMQ := pbMsg.MsgDataToMQ{} - err := proto.Unmarshal(consumerMessages[i].Value, &msgFromMQ) - if err != nil { - log.Error(triggerID, "msg_transfer Unmarshal msg err", "msg", string(consumerMessages[i].Value), "err", err.Error()) - return - } - log.Debug(triggerID, "single msg come to distribution center", msgFromMQ.String(), string(consumerMessages[i].Key)) - if oldM, ok := UserAggregationMsgs[string(consumerMessages[i].Key)]; ok { - oldM = append(oldM, &msgFromMQ) - UserAggregationMsgs[string(consumerMessages[i].Key)] = oldM - } else { - m := make([]*pbMsg.MsgDataToMQ, 0, 100) - m = append(m, &msgFromMQ) - UserAggregationMsgs[string(consumerMessages[i].Key)] = m - } - } - log.Debug(triggerID, "generate map list users len", len(UserAggregationMsgs)) - for userID, v := range UserAggregationMsgs { - if len(v) >= 0 { - hashCode := getHashCode(userID) - channelID := hashCode % ChannelNum - log.Debug(triggerID, "generate channelID", hashCode, channelID, userID) - //go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) { - och.chArrays[channelID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: v, triggerID: triggerID}} - //}(channelID, userID, v) - } - } - } - } - - } - -} +//func (och *OnlineHistoryConsumerHandler) MessagesDistributionHandle() { +// for { +// UserAggregationMsgs := make(map[string][]*pbMsg.MsgDataToMQ, ChannelNum) +// select { +// case cmd := <-och.msgDistributionCh: +// switch cmd.Cmd { +// case ConsumerMsgs: +// triggerChannelValue := cmd.Value.(TriggerChannelValue) +// triggerID := triggerChannelValue.triggerID +// consumerMessages := triggerChannelValue.cmsgList +// //Aggregation map[userid]message list +// log.Debug(triggerID, "batch messages come to distribution center", len(consumerMessages)) +// for i := 0; i < len(consumerMessages); i++ { +// msgFromMQ := pbMsg.MsgDataToMQ{} +// err := proto.Unmarshal(consumerMessages[i].Value, &msgFromMQ) +// if err != nil { +// log.Error(triggerID, "msg_transfer Unmarshal msg err", "msg", string(consumerMessages[i].Value), "err", err.Error()) +// return +// } +// log.Debug(triggerID, "single msg come to distribution center", msgFromMQ.String(), string(consumerMessages[i].Key)) +// if oldM, ok := UserAggregationMsgs[string(consumerMessages[i].Key)]; ok { +// oldM = append(oldM, &msgFromMQ) +// UserAggregationMsgs[string(consumerMessages[i].Key)] = oldM +// } else { +// m := make([]*pbMsg.MsgDataToMQ, 0, 100) +// m = append(m, &msgFromMQ) +// UserAggregationMsgs[string(consumerMessages[i].Key)] = m +// } +// } +// log.Debug(triggerID, "generate map list users len", len(UserAggregationMsgs)) +// for userID, v := range UserAggregationMsgs { +// if len(v) >= 0 { +// hashCode := getHashCode(userID) +// channelID := hashCode % ChannelNum +// log.Debug(triggerID, "generate channelID", hashCode, channelID, userID) +// //go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) { +// och.chArrays[channelID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: v, triggerID: triggerID}} +// //}(channelID, userID, v) +// } +// } +// } +// } +// +// } +// +//} func (mc *OnlineHistoryConsumerHandler) handleChatWs2Mongo(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) { msg := cMsg.Value now := time.Now() @@ -385,7 +385,7 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS channelID := hashCode % ChannelNum log.Debug(triggerID, "generate channelID", hashCode, channelID, userID) //go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) { - och.chArrays[channelID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: []*pbMsg.MsgDataToMQ{&msgFromMQ}, triggerID: triggerID}} + och.chArrays[channelID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: []*pbMsg.MsgDataToMQ{&msgFromMQ}, triggerID: msgFromMQ.OperationID}} sess.MarkMessage(msg, "") //cMsg = append(cMsg, msg) //och.TriggerCmd(OnlineTopicBusy)