diff --git a/internal/msg_transfer/logic/offline_history_msg_handler.go b/internal/msg_transfer/logic/offline_history_msg_handler.go index 4fdd30577..2f63f2007 100644 --- a/internal/msg_transfer/logic/offline_history_msg_handler.go +++ b/internal/msg_transfer/logic/offline_history_msg_handler.go @@ -18,14 +18,12 @@ type OfflineHistoryConsumerHandler struct { historyConsumerGroup *kfk.MConsumerGroup cmdCh chan Cmd2Value msgCh chan Cmd2Value - UserAggregationMsgs map[string][]*pbMsg.MsgDataToMQ chArrays [ChannelNum]chan Cmd2Value msgDistributionCh chan Cmd2Value } func (mc *OfflineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) { mc.msgHandle = make(map[string]fcb) - mc.UserAggregationMsgs = make(map[string][]*pbMsg.MsgDataToMQ, ChannelNum) mc.msgDistributionCh = make(chan Cmd2Value) //no buffer channel go mc.MessagesDistributionHandle() mc.cmdCh = cmdCh @@ -52,11 +50,12 @@ func (och *OfflineHistoryConsumerHandler) Run(channelID int) { case UserMessages: msgChannelValue := cmd.Value.(MsgChannelValue) msgList := msgChannelValue.msgList + triggerID := msgChannelValue.triggerID storageMsgList := make([]*pbMsg.MsgDataToMQ, 80) pushMsgList := make([]*pbMsg.MsgDataToMQ, 80) - latestMsgOperationID := msgList[len(msgList)-1].OperationID - log.Debug(latestMsgOperationID, "msg arrived channel", "channel id", channelID, msgList) + log.Debug(triggerID, "msg arrived channel", "channel id", channelID, msgList, msgChannelValue.userID, len(msgList)) for _, v := range msgList { + log.Debug(triggerID, "msg come to storage center", v.String()) isHistory := utils.GetSwitchFromOptions(v.MsgData.Options, constant.IsHistory) isSenderSync := utils.GetSwitchFromOptions(v.MsgData.Options, constant.IsSenderSync) if isHistory { @@ -76,10 +75,10 @@ func (och *OfflineHistoryConsumerHandler) Run(channelID int) { // return //} - err := saveUserChatList(msgChannelValue.userID, storageMsgList, latestMsgOperationID) + err := saveUserChatList(msgChannelValue.userID, storageMsgList, triggerID) if err != nil { singleMsgFailedCount += uint64(len(storageMsgList)) - log.NewError(latestMsgOperationID, "single data insert to mongo err", err.Error(), storageMsgList) + log.NewError(triggerID, "single data insert to mongo err", err.Error(), storageMsgList) } else { singleMsgSuccessCountMutex.Lock() singleMsgSuccessCount += uint64(len(storageMsgList)) @@ -94,34 +93,40 @@ func (och *OfflineHistoryConsumerHandler) Run(channelID int) { } } func (och *OfflineHistoryConsumerHandler) MessagesDistributionHandle() { + UserAggregationMsgs := make(map[string][]*pbMsg.MsgDataToMQ, ChannelNum) for { select { case cmd := <-och.msgDistributionCh: switch cmd.Cmd { case ConsumerMsgs: - consumerMessages := cmd.Value.([]*sarama.ConsumerMessage) + triggerChannelValue := cmd.Value.(TriggerChannelValue) + triggerID := triggerChannelValue.triggerID + consumerMessages := triggerChannelValue.cmsgList //Aggregation map[userid]message list + log.Debug(triggerID, "batch messages come to distribution center", len(consumerMessages)) for i := 0; i < len(consumerMessages); i++ { msgFromMQ := pbMsg.MsgDataToMQ{} err := proto.Unmarshal(consumerMessages[i].Value, &msgFromMQ) if err != nil { - log.Error("msg_transfer Unmarshal msg err", "", "msg", string(consumerMessages[i].Value), "err", err.Error()) + log.Error(triggerID, "msg_transfer Unmarshal msg err", "msg", string(consumerMessages[i].Value), "err", err.Error()) return } - if oldM, ok := och.UserAggregationMsgs[string(consumerMessages[i].Key)]; ok { + log.Debug(triggerID, "single msg come to distribution center", msgFromMQ.String()) + if oldM, ok := UserAggregationMsgs[string(consumerMessages[i].Key)]; ok { oldM = append(oldM, &msgFromMQ) - och.UserAggregationMsgs[string(consumerMessages[i].Key)] = oldM + UserAggregationMsgs[string(consumerMessages[i].Key)] = oldM } else { - m := make([]*pbMsg.MsgDataToMQ, 100) + m := make([]*pbMsg.MsgDataToMQ, 0, 100) m = append(m, &msgFromMQ) - och.UserAggregationMsgs[string(consumerMessages[i].Key)] = m + UserAggregationMsgs[string(consumerMessages[i].Key)] = m } } - for userID, v := range och.UserAggregationMsgs { + log.Debug(triggerID, "generate map list users len", len(UserAggregationMsgs)) + for userID, v := range UserAggregationMsgs { if len(v) >= 0 { channelID := getHashCode(userID) % ChannelNum go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) { - och.chArrays[cID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: messages}} + och.chArrays[cID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: messages, triggerID: triggerID}} }(channelID, userID, v) } } @@ -271,23 +276,32 @@ func (OfflineHistoryConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) erro func (och *OfflineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group log.NewDebug("", "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition()) - cMsg := make([]*sarama.ConsumerMessage, 500) + cMsg := make([]*sarama.ConsumerMessage, 0, 500) t := time.NewTicker(time.Duration(500) * time.Millisecond) + var triggerID string for msg := range claim.Messages() { //och.TriggerCmd(OnlineTopicBusy) cMsg = append(cMsg, msg) select { case <-t.C: if len(cMsg) >= 0 { - och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: cMsg} + triggerID = utils.OperationIDGenerator() + log.Debug(triggerID, "timer trigger msg consumer start", len(cMsg)) + och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ + triggerID: triggerID, cmsgList: cMsg}} sess.MarkMessage(msg, "") cMsg = cMsg[0:0] + log.Debug(triggerID, "timer trigger msg consumer end", len(cMsg)) } default: if len(cMsg) >= 500 { - och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: cMsg} + triggerID = utils.OperationIDGenerator() + log.Debug(triggerID, "length trigger msg consumer start", len(cMsg)) + och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ + triggerID: triggerID, cmsgList: cMsg}} sess.MarkMessage(msg, "") cMsg = cMsg[0:0] + log.Debug(triggerID, "length trigger msg consumer end", len(cMsg)) } } diff --git a/internal/msg_transfer/logic/online_history_msg_handler.go b/internal/msg_transfer/logic/online_history_msg_handler.go index 248e55bd6..b33014bc4 100644 --- a/internal/msg_transfer/logic/online_history_msg_handler.go +++ b/internal/msg_transfer/logic/online_history_msg_handler.go @@ -20,8 +20,13 @@ import ( ) type MsgChannelValue struct { - userID string - msgList []*pbMsg.MsgDataToMQ + userID string + triggerID string + msgList []*pbMsg.MsgDataToMQ +} +type TriggerChannelValue struct { + triggerID string + cmsgList []*sarama.ConsumerMessage } type fcb func(cMsg *sarama.ConsumerMessage, msgKey string, sess sarama.ConsumerGroupSession) type Cmd2Value struct { @@ -33,14 +38,12 @@ type OnlineHistoryConsumerHandler struct { historyConsumerGroup *kfk.MConsumerGroup cmdCh chan Cmd2Value msgCh chan Cmd2Value - UserAggregationMsgs map[string][]*pbMsg.MsgDataToMQ chArrays [ChannelNum]chan Cmd2Value msgDistributionCh chan Cmd2Value } func (och *OnlineHistoryConsumerHandler) Init(cmdCh chan Cmd2Value) { och.msgHandle = make(map[string]fcb) - och.UserAggregationMsgs = make(map[string][]*pbMsg.MsgDataToMQ, ChannelNum) och.msgDistributionCh = make(chan Cmd2Value) //no buffer channel go och.MessagesDistributionHandle() och.cmdCh = cmdCh @@ -91,11 +94,12 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) { case UserMessages: msgChannelValue := cmd.Value.(MsgChannelValue) msgList := msgChannelValue.msgList + triggerID := msgChannelValue.triggerID storageMsgList := make([]*pbMsg.MsgDataToMQ, 80) pushMsgList := make([]*pbMsg.MsgDataToMQ, 80) - latestMsgOperationID := msgList[len(msgList)-1].OperationID - log.Debug(latestMsgOperationID, "msg arrived channel", "channel id", channelID, msgList) + log.Debug(triggerID, "msg arrived channel", "channel id", channelID, msgList, msgChannelValue.userID, len(msgList)) for _, v := range msgList { + log.Debug(triggerID, "msg come to storage center", v.String()) isHistory := utils.GetSwitchFromOptions(v.MsgData.Options, constant.IsHistory) isSenderSync := utils.GetSwitchFromOptions(v.MsgData.Options, constant.IsSenderSync) if isHistory { @@ -115,10 +119,10 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) { // return //} - err := saveUserChatList(msgChannelValue.userID, storageMsgList, latestMsgOperationID) + err := saveUserChatList(msgChannelValue.userID, storageMsgList, triggerID) if err != nil { singleMsgFailedCount += uint64(len(storageMsgList)) - log.NewError(latestMsgOperationID, "single data insert to mongo err", err.Error(), storageMsgList) + log.NewError(triggerID, "single data insert to mongo err", err.Error(), storageMsgList) } else { singleMsgSuccessCountMutex.Lock() singleMsgSuccessCount += uint64(len(storageMsgList)) @@ -204,34 +208,40 @@ func (och *OnlineHistoryConsumerHandler) Run(channelID int) { //} func (och *OnlineHistoryConsumerHandler) MessagesDistributionHandle() { + UserAggregationMsgs := make(map[string][]*pbMsg.MsgDataToMQ, ChannelNum) for { select { case cmd := <-och.msgDistributionCh: switch cmd.Cmd { case ConsumerMsgs: - consumerMessages := cmd.Value.([]*sarama.ConsumerMessage) + triggerChannelValue := cmd.Value.(TriggerChannelValue) + triggerID := triggerChannelValue.triggerID + consumerMessages := triggerChannelValue.cmsgList //Aggregation map[userid]message list + log.Debug(triggerID, "batch messages come to distribution center", len(consumerMessages)) for i := 0; i < len(consumerMessages); i++ { msgFromMQ := pbMsg.MsgDataToMQ{} err := proto.Unmarshal(consumerMessages[i].Value, &msgFromMQ) if err != nil { - log.Error("msg_transfer Unmarshal msg err", "", "msg", string(consumerMessages[i].Value), "err", err.Error()) + log.Error(triggerID, "msg_transfer Unmarshal msg err", "msg", string(consumerMessages[i].Value), "err", err.Error()) return } - if oldM, ok := och.UserAggregationMsgs[string(consumerMessages[i].Key)]; ok { + log.Debug(triggerID, "single msg come to distribution center", msgFromMQ.String()) + if oldM, ok := UserAggregationMsgs[string(consumerMessages[i].Key)]; ok { oldM = append(oldM, &msgFromMQ) - och.UserAggregationMsgs[string(consumerMessages[i].Key)] = oldM + UserAggregationMsgs[string(consumerMessages[i].Key)] = oldM } else { - m := make([]*pbMsg.MsgDataToMQ, 100) + m := make([]*pbMsg.MsgDataToMQ, 0, 100) m = append(m, &msgFromMQ) - och.UserAggregationMsgs[string(consumerMessages[i].Key)] = m + UserAggregationMsgs[string(consumerMessages[i].Key)] = m } } - for userID, v := range och.UserAggregationMsgs { + log.Debug(triggerID, "generate map list users len", len(UserAggregationMsgs)) + for userID, v := range UserAggregationMsgs { if len(v) >= 0 { channelID := getHashCode(userID) % ChannelNum go func(cID uint32, userID string, messages []*pbMsg.MsgDataToMQ) { - och.chArrays[cID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: messages}} + och.chArrays[cID] <- Cmd2Value{Cmd: UserMessages, Value: MsgChannelValue{userID: userID, msgList: messages, triggerID: triggerID}} }(channelID, userID, v) } } @@ -375,23 +385,32 @@ func (OnlineHistoryConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error func (och *OnlineHistoryConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { // a instance in the consumer group log.NewDebug("", "online new session msg come", claim.HighWaterMarkOffset(), claim.Topic(), claim.Partition()) - cMsg := make([]*sarama.ConsumerMessage, 500) + cMsg := make([]*sarama.ConsumerMessage, 0, 500) t := time.NewTicker(time.Duration(500) * time.Millisecond) + var triggerID string for msg := range claim.Messages() { //och.TriggerCmd(OnlineTopicBusy) cMsg = append(cMsg, msg) select { case <-t.C: if len(cMsg) >= 0 { - och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: cMsg} + triggerID = utils.OperationIDGenerator() + log.Debug(triggerID, "timer trigger msg consumer start", len(cMsg)) + och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ + triggerID: triggerID, cmsgList: cMsg}} sess.MarkMessage(msg, "") cMsg = cMsg[0:0] + log.Debug(triggerID, "timer trigger msg consumer end", len(cMsg)) } default: if len(cMsg) >= 500 { - och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: cMsg} + triggerID = utils.OperationIDGenerator() + log.Debug(triggerID, "length trigger msg consumer start", len(cMsg)) + och.msgDistributionCh <- Cmd2Value{Cmd: ConsumerMsgs, Value: TriggerChannelValue{ + triggerID: triggerID, cmsgList: cMsg}} sess.MarkMessage(msg, "") cMsg = cMsg[0:0] + log.Debug(triggerID, "length trigger msg consumer end", len(cMsg)) } }