From 8d78ef5946991c6e176b0410cb9133f6b82d4423 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Tue, 24 May 2022 23:07:07 +0800 Subject: [PATCH] redis add get message --- .../logic/online_history_msg_handler.go | 202 +++++++++--------- 1 file changed, 103 insertions(+), 99 deletions(-) diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index 9e1a07925..5a2c95eb2 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -180,75 +180,6 @@ func (och *OnlineHistoryConsumerHandler) MongoMessageRun(channelID int) { } } -//func (och *OnlineHistoryConsumerHandler) handleChatWs2Mongo(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) { -// msg := cMsg.Value -// now := time.Now() -// msgFromMQ := pbMsg.MsgDataToMQ{} -// err := proto.Unmarshal(msg, &msgFromMQ) -// if err != nil { -// log.Error("msg_transfer Unmarshal msg err", "", "msg", string(msg), "err", err.Error()) -// return -// } -// operationID := msgFromMQ.OperationID -// log.NewInfo(operationID, "msg come mongo!!!", "", "msg", string(msg)) -// //Control whether to store offline messages (mongo) -// isHistory := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsHistory) -// //Control whether to store history messages (mysql) -// isPersist := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsPersistent) -// isSenderSync := utils.GetSwitchFromOptions(msgFromMQ.MsgData.Options, constant.IsSenderSync) -// switch msgFromMQ.MsgData.SessionType { -// case constant.SingleChatType: -// log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = SingleChatType", isHistory, isPersist) -// if isHistory { -// err := saveUserChat(msgKey, &msgFromMQ) -// if err != nil { -// singleMsgFailedCount++ -// log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String()) -// return -// } -// singleMsgSuccessCountMutex.Lock() -// singleMsgSuccessCount++ -// singleMsgSuccessCountMutex.Unlock() -// log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now)) -// } -// if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID { -// } else { -// go sendMessageToPush(&msgFromMQ, msgKey) -// } -// log.NewDebug(operationID, "saveUserChat cost time ", time.Since(now)) -// case constant.GroupChatType: -// log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = GroupChatType", isHistory, isPersist) -// if isHistory { -// err := saveUserChat(msgFromMQ.MsgData.RecvID, &msgFromMQ) -// if err != nil { -// log.NewError(operationID, "group data insert to mongo err", msgFromMQ.String(), msgFromMQ.MsgData.RecvID, err.Error()) -// return -// } -// groupMsgCount++ -// } -// go sendMessageToPush(&msgFromMQ, msgFromMQ.MsgData.RecvID) -// case constant.NotificationChatType: -// log.NewDebug(msgFromMQ.OperationID, "msg_transfer msg type = NotificationChatType", isHistory, isPersist) -// if isHistory { -// err := saveUserChat(msgKey, &msgFromMQ) -// if err != nil { -// log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.String()) -// return -// } -// log.NewDebug(msgFromMQ.OperationID, "sendMessageToPush cost time ", time.Since(now)) -// } -// if !isSenderSync && msgKey == msgFromMQ.MsgData.SendID { -// } else { -// go sendMessageToPush(&msgFromMQ, msgKey) -// } -// log.NewDebug(operationID, "saveUserChat cost time ", time.Since(now)) -// default: -// log.NewError(msgFromMQ.OperationID, "SessionType error", msgFromMQ.String()) -// return -// } -// sess.MarkMessage(cMsg, "") -// log.NewDebug(msgFromMQ.OperationID, "msg_transfer handle topic data to database success...", msgFromMQ.String()) -//} func (och *OnlineHistoryConsumerHandler) MessagesDistributionHandle() { for { UserAggregationMsgs := make(map[string][]*pbMsg.MsgDataToMQ, ChannelNum) @@ -440,53 +371,126 @@ func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupS } log.NewDebug("", "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition()) - cMsg := make([]*sarama.ConsumerMessage, 0, 1000) - t := time.NewTicker(time.Duration(100) * time.Millisecond) + //cMsg := make([]*sarama.ConsumerMessage, 0, 1000) + //t := time.NewTicker(time.Duration(100) * time.Millisecond) var triggerID string for msg := range claim.Messages() { - cMsg = append(cMsg, msg) + msgFromMQ := pbMsg.MsgDataToMQ{} + err := proto.Unmarshal(msg.Value, &msgFromMQ) + if err != nil { + log.Error(triggerID, "msg_transfer Unmarshal msg err", "msg", string(msg.Value), "err", err.Error()) + } + userID := string(msg.Key) + hashCode := getHashCode(userID) + channelID := hashCode % ChannelNum + log.Debug(triggerID, "generate channelID", hashCode, channelID, userID) + //go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) { + och.chArrays[channelID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: []*pbMsg.MsgDataToMQ{&msgFromMQ}, triggerID: triggerID}} + sess.MarkMessage(msg, "") + //cMsg = append(cMsg, msg) //och.TriggerCmd(OnlineTopicBusy) - select { - //case : - // triggerID = utils.OperationIDGenerator() + //select { + ////case : + //// triggerID = utils.OperationIDGenerator() + //// + //// log.NewDebug(triggerID, "claim.Messages ", msg) + //// cMsg = append(cMsg, msg) + //// if len(cMsg) >= 1000 { + //// ccMsg := make([]*sarama.ConsumerMessage, 0, 1000) + //// for _, v := range cMsg { + //// ccMsg = append(ccMsg, v) + //// } + //// log.Debug(triggerID, "length trigger msg consumer start", len(ccMsg)) + //// och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ + //// triggerID: triggerID, cmsgList: ccMsg}} + //// sess.MarkMessage(msg, "") + //// cMsg = make([]*sarama.ConsumerMessage, 0, 1000) + //// log.Debug(triggerID, "length trigger msg consumer end", len(cMsg)) + //// } // - // log.NewDebug(triggerID, "claim.Messages ", msg) - // cMsg = append(cMsg, msg) - // if len(cMsg) >= 1000 { + //case <-t.C: + // if len(cMsg) > 0 { // ccMsg := make([]*sarama.ConsumerMessage, 0, 1000) // for _, v := range cMsg { // ccMsg = append(ccMsg, v) // } - // log.Debug(triggerID, "length trigger msg consumer start", len(ccMsg)) + // triggerID = utils.OperationIDGenerator() + // log.Debug(triggerID, "timer trigger msg consumer start", len(ccMsg)) // och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ // triggerID: triggerID, cmsgList: ccMsg}} - // sess.MarkMessage(msg, "") + // sess.MarkMessage(cMsg[len(cMsg)-1], "") // cMsg = make([]*sarama.ConsumerMessage, 0, 1000) - // log.Debug(triggerID, "length trigger msg consumer end", len(cMsg)) + // log.Debug(triggerID, "timer trigger msg consumer end", len(cMsg)) // } - - case <-t.C: - if len(cMsg) > 0 { - ccMsg := make([]*sarama.ConsumerMessage, 0, 1000) - for _, v := range cMsg { - ccMsg = append(ccMsg, v) - } - triggerID = utils.OperationIDGenerator() - log.Debug(triggerID, "timer trigger msg consumer start", len(ccMsg)) - och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ - triggerID: triggerID, cmsgList: ccMsg}} - sess.MarkMessage(cMsg[len(cMsg)-1], "") - cMsg = make([]*sarama.ConsumerMessage, 0, 1000) - log.Debug(triggerID, "timer trigger msg consumer end", len(cMsg)) - } - default: - - } + //default: + // + //} //log.NewDebug("", "online kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "online", msg.Offset, claim.HighWaterMarkOffset()) } return nil } + +//func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, +// claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group +// +// for { +// if sess == nil { +// log.NewWarn("", " sess == nil, waiting ") +// time.Sleep(100 * time.Millisecond) +// } else { +// break +// } +// } +// +// log.NewDebug("", "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition()) +// cMsg := make([]*sarama.ConsumerMessage, 0, 1000) +// t := time.NewTicker(time.Duration(100) * time.Millisecond) +// var triggerID string +// for msg := range claim.Messages() { +// cMsg = append(cMsg, msg) +// //och.TriggerCmd(OnlineTopicBusy) +// select { +// //case : +// // triggerID = utils.OperationIDGenerator() +// // +// // log.NewDebug(triggerID, "claim.Messages ", msg) +// // cMsg = append(cMsg, msg) +// // if len(cMsg) >= 1000 { +// // ccMsg := make([]*sarama.ConsumerMessage, 0, 1000) +// // for _, v := range cMsg { +// // ccMsg = append(ccMsg, v) +// // } +// // log.Debug(triggerID, "length trigger msg consumer start", len(ccMsg)) +// // och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ +// // triggerID: triggerID, cmsgList: ccMsg}} +// // sess.MarkMessage(msg, "") +// // cMsg = make([]*sarama.ConsumerMessage, 0, 1000) +// // log.Debug(triggerID, "length trigger msg consumer end", len(cMsg)) +// // } +// +// case <-t.C: +// if len(cMsg) > 0 { +// ccMsg := make([]*sarama.ConsumerMessage, 0, 1000) +// for _, v := range cMsg { +// ccMsg = append(ccMsg, v) +// } +// triggerID = utils.OperationIDGenerator() +// log.Debug(triggerID, "timer trigger msg consumer start", len(ccMsg)) +// och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ +// triggerID: triggerID, cmsgList: ccMsg}} +// sess.MarkMessage(cMsg[len(cMsg)-1], "") +// cMsg = make([]*sarama.ConsumerMessage, 0, 1000) +// log.Debug(triggerID, "timer trigger msg consumer end", len(cMsg)) +// } +// default: +// +// } +// //log.NewDebug("", "online kafka get info to mongo", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "online", msg.Offset, claim.HighWaterMarkOffset()) +// +// } +// return nil +//} func sendMessageToPush(message *pbMsg.MsgDataToMQ, pushToUserID string) { log.Info(message.OperationID, "msg_transfer send message to push", "message", message.String()) rpcPushMsg := pbPush.PushMsgReq{OperationID: message.OperationID, MsgData: message.MsgData, PushToUserID: pushToUserID}