diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 994c96abe..87d516657 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -23,14 +23,14 @@ import ( ) const ConsumerMsgs = 3 -const AggregationMessages = 4 +const SourceMessages = 4 const MongoMessages = 5 const ChannelNum = 100 type MsgChannelValue struct { - aggregationID string //maybe userID or super groupID - ctx context.Context - ctxMsgList []*ContextMsg + sourceID string //maybe userID or super groupID + ctx context.Context + ctxMsgList []*ContextMsg } type TriggerChannelValue struct { @@ -84,16 +84,16 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) { select { case cmd := <-och.chArrays[channelID]: switch cmd.Cmd { - case AggregationMessages: + case SourceMessages: msgChannelValue := cmd.Value.(MsgChannelValue) ctxMsgList := msgChannelValue.ctxMsgList ctx := msgChannelValue.ctx - log.ZDebug(ctx, "msg arrived channel", "channel id", channelID, "msgList length", len(ctxMsgList), "aggregationID", msgChannelValue.aggregationID) - storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList, modifyMsgList := och.getPushStorageMsgList(msgChannelValue.aggregationID, ctxMsgList) - och.handleMsg(ctx, msgChannelValue.aggregationID, storageMsgList, notStorageMsgList) - och.handleNotification(ctx, msgChannelValue.aggregationID, storageNotificationList, notStorageNotificationList) - if err := och.msgDatabase.MsgToModifyMQ(ctx, msgChannelValue.aggregationID, modifyMsgList); err != nil { - log.ZError(ctx, "msg to modify mq error", err, "aggregationID", msgChannelValue.aggregationID, "modifyMsgList", modifyMsgList) + log.ZDebug(ctx, "msg arrived channel", "channel id", channelID, "msgList length", len(ctxMsgList), "sourceID", msgChannelValue.sourceID) + storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList, modifyMsgList := och.getPushStorageMsgList(msgChannelValue.sourceID, ctxMsgList) + och.handleMsg(ctx, msgChannelValue.sourceID, storageMsgList, notStorageMsgList) + och.handleNotification(ctx, msgChannelValue.sourceID, storageNotificationList, notStorageNotificationList) + if err := och.msgDatabase.MsgToModifyMQ(ctx, msgChannelValue.sourceID, modifyMsgList); err != nil { + log.ZError(ctx, "msg to modify mq error", err, "sourceID", msgChannelValue.sourceID, "modifyMsgList", modifyMsgList) } } } @@ -101,13 +101,13 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) { } // 获取消息/通知 存储的消息列表, 不存储并且推送的消息列表, -func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(aggregationID string, totalMsgs []*ContextMsg) (storageMsgList, notStorageMsgList, storageNotificatoinList, notStorageNotificationList, modifyMsgList []*pbMsg.MsgDataToMQ) { +func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(sourceID string, totalMsgs []*ContextMsg) (storageMsgList, notStorageMsgList, storageNotificatoinList, notStorageNotificationList, modifyMsgList []*pbMsg.MsgDataToMQ) { isStorage := func(msg *pbMsg.MsgDataToMQ) bool { options2 := utils.Options(msg.MsgData.Options) if options2.IsHistory() { return true } else { - if !(!options2.IsSenderSync() && aggregationID == msg.MsgData.SendID) { + if !(!options2.IsSenderSync() && sourceID == msg.MsgData.SendID) { return false } } @@ -142,52 +142,52 @@ func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(aggregationI return } -func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Context, aggregationID string, storageList, notStorageList []*pbMsg.MsgDataToMQ) { - och.toPushTopic(ctx, aggregationID, notStorageList) +func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Context, sourceID string, storageList, notStorageList []*pbMsg.MsgDataToMQ) { + och.toPushTopic(ctx, sourceID, notStorageList) if len(storageList) > 0 { - lastSeq, err := och.msgDatabase.NotificationBatchInsertChat2Cache(ctx, aggregationID, storageList) + lastSeq, err := och.msgDatabase.NotificationBatchInsertChat2Cache(ctx, sourceID, storageList) if err != nil { - log.ZError(ctx, "notification batch insert to redis error", err, "aggregationID", aggregationID, "storageList", storageList) + log.ZError(ctx, "notification batch insert to redis error", err, "sourceID", sourceID, "storageList", storageList) return } - och.msgDatabase.MsgToMongoMQ(ctx, aggregationID, storageList, lastSeq) - och.toPushTopic(ctx, aggregationID, storageList) + och.msgDatabase.MsgToMongoMQ(ctx, sourceID, storageList, lastSeq) + och.toPushTopic(ctx, sourceID, storageList) } } -func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, aggregationID string, msgs []*pbMsg.MsgDataToMQ) { +func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, sourceID string, msgs []*pbMsg.MsgDataToMQ) { for _, v := range msgs { - och.msgDatabase.MsgToPushMQ(ctx, aggregationID, v) + och.msgDatabase.MsgToPushMQ(ctx, sourceID, v) } } -func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, aggregationID string, storageList, notStorageList []*pbMsg.MsgDataToMQ) { - och.toPushTopic(ctx, aggregationID, notStorageList) +func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, sourceID string, storageList, notStorageList []*pbMsg.MsgDataToMQ) { + och.toPushTopic(ctx, sourceID, notStorageList) if len(storageList) > 0 { var currentMaxSeq int64 var err error if storageList[0].MsgData.SessionType == constant.SuperGroupChatType { - currentMaxSeq, err = och.msgDatabase.GetGroupMaxSeq(ctx, aggregationID) + currentMaxSeq, err = och.msgDatabase.GetGroupMaxSeq(ctx, sourceID) if err == redis.Nil { if err := och.GroupChatFirstCreateConversation(ctx, storageList[0].MsgData); err != nil { - log.ZError(ctx, "single chat first create conversation error", err, "aggregationID", aggregationID) + log.ZError(ctx, "single chat first create conversation error", err, "sourceID", sourceID) } } } else { - currentMaxSeq, err = och.msgDatabase.GetUserMaxSeq(ctx, aggregationID) + currentMaxSeq, err = och.msgDatabase.GetUserMaxSeq(ctx, sourceID) if err == redis.Nil { if err := och.SingleChatFirstCreateConversation(ctx, storageList[0].MsgData); err != nil { - log.ZError(ctx, "single chat first create conversation error", err, "aggregationID", aggregationID) + log.ZError(ctx, "single chat first create conversation error", err, "sourceID", sourceID) } } } if err != nil && err != redis.Nil { prome.Inc(prome.SeqGetFailedCounter) - log.ZError(ctx, "get max seq err", err, "aggregationID", aggregationID) + log.ZError(ctx, "get max seq err", err, "sourceID", sourceID) return } prome.Inc(prome.SeqGetSuccessCounter) - lastSeq, err := och.msgDatabase.BatchInsertChat2Cache(ctx, aggregationID, storageList, currentMaxSeq) + lastSeq, err := och.msgDatabase.BatchInsertChat2Cache(ctx, sourceID, storageList, currentMaxSeq) if err != nil && err != redis.Nil { log.ZError(ctx, "batch data insert to redis err", err, "storageMsgList", storageList) och.singleMsgFailedCountMutex.Lock() @@ -198,8 +198,8 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, agg och.singleMsgSuccessCountMutex.Lock() och.singleMsgSuccessCount += uint64(len(storageList)) och.singleMsgSuccessCountMutex.Unlock() - och.msgDatabase.MsgToMongoMQ(ctx, aggregationID, storageList, lastSeq) - och.toPushTopic(ctx, aggregationID, storageList) + och.msgDatabase.MsgToMongoMQ(ctx, sourceID, storageList, lastSeq) + och.toPushTopic(ctx, sourceID, storageList) } } @@ -265,12 +265,12 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() { } } log.ZDebug(ctx, "generate map list users len", "length", len(aggregationMsgs)) - for aggregationID, v := range aggregationMsgs { + for sourceID, v := range aggregationMsgs { if len(v) >= 0 { - hashCode := utils.GetHashCode(aggregationID) + hashCode := utils.GetHashCode(sourceID) channelID := hashCode % ChannelNum - log.ZDebug(ctx, "generate channelID", "hashCode", hashCode, "channelID", channelID, "aggregationID", aggregationID) - och.chArrays[channelID] <- Cmd2Value{Cmd: AggregationMessages, Value: MsgChannelValue{aggregationID: aggregationID, ctxMsgList: v, ctx: ctx}} + log.ZDebug(ctx, "generate channelID", "hashCode", hashCode, "channelID", channelID, "sourceID", sourceID) + och.chArrays[channelID] <- Cmd2Value{Cmd: AggregationMessages, Value: MsgChannelValue{sourceID: sourceID, ctxMsgList: v, ctx: ctx}} } } } diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index dccf30bb2..a038a9d1f 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -40,14 +40,14 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Cont log.Error("msg_transfer Unmarshal msg err", "", "msg", string(msg), "err", err.Error()) return } - log.Info(operationID, "BatchInsertChat2DB userID: ", msgFromMQ.AggregationID, "msgFromMQ.LastSeq: ", msgFromMQ.LastSeq) - err = mc.msgDatabase.BatchInsertChat2DB(ctx, msgFromMQ.AggregationID, msgFromMQ.Messages, msgFromMQ.LastSeq) + log.Info(operationID, "BatchInsertChat2DB userID: ", msgFromMQ.SourceID, "msgFromMQ.LastSeq: ", msgFromMQ.LastSeq) + err = mc.msgDatabase.BatchInsertChat2DB(ctx, msgFromMQ.SourceID, msgFromMQ.Messages, msgFromMQ.LastSeq) if err != nil { - log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.Messages, msgFromMQ.AggregationID, msgFromMQ.TriggerID) + log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.Messages, msgFromMQ.SourceID, msgFromMQ.TriggerID) } - err = mc.msgDatabase.DeleteMessageFromCache(ctx, msgFromMQ.AggregationID, msgFromMQ.Messages) + err = mc.msgDatabase.DeleteMessageFromCache(ctx, msgFromMQ.SourceID, msgFromMQ.Messages) if err != nil { - log.NewError(operationID, "remove cache msg from redis err", err.Error(), msgFromMQ.Messages, msgFromMQ.AggregationID, msgFromMQ.TriggerID) + log.NewError(operationID, "remove cache msg from redis err", err.Error(), msgFromMQ.Messages, msgFromMQ.SourceID, msgFromMQ.TriggerID) } for _, v := range msgFromMQ.Messages { if v.MsgData.ContentType == constant.DeleteMessageNotification { diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index d1edd0464..23dda6cb4 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -80,9 +80,9 @@ type MsgDatabase interface { GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) MsgToMQ(ctx context.Context, key string, msg2mq *pbMsg.MsgDataToMQ) error - MsgToModifyMQ(ctx context.Context, aggregationID string, messages []*pbMsg.MsgDataToMQ) error + MsgToModifyMQ(ctx context.Context, sourceID string, messages []*pbMsg.MsgDataToMQ) error MsgToPushMQ(ctx context.Context, sourceID string, msg2mq *pbMsg.MsgDataToMQ) (int32, int64, error) - MsgToMongoMQ(ctx context.Context, aggregationID string, messages []*pbMsg.MsgDataToMQ, lastSeq int64) error + MsgToMongoMQ(ctx context.Context, sourceID string, messages []*pbMsg.MsgDataToMQ, lastSeq int64) error } func NewMsgDatabase(msgDocModel unRelationTb.MsgDocModelInterface, cacheModel cache.Model) MsgDatabase { @@ -189,9 +189,9 @@ func (db *msgDatabase) MsgToMQ(ctx context.Context, key string, msg2mq *pbMsg.Ms return err } -func (db *msgDatabase) MsgToModifyMQ(ctx context.Context, aggregationID string, messages []*pbMsg.MsgDataToMQ) error { +func (db *msgDatabase) MsgToModifyMQ(ctx context.Context, sourceID string, messages []*pbMsg.MsgDataToMQ) error { if len(messages) > 0 { - _, _, err := db.producerToModify.SendMessage(ctx, aggregationID, &pbMsg.MsgDataToModifyByMQ{AggregationID: aggregationID, Messages: messages}) + _, _, err := db.producerToModify.SendMessage(ctx, sourceID, &pbMsg.MsgDataToModifyByMQ{SourceID: sourceID, Messages: messages}) return err } return nil @@ -206,9 +206,9 @@ func (db *msgDatabase) MsgToPushMQ(ctx context.Context, key string, msg2mq *pbMs return partition, offset, err } -func (db *msgDatabase) MsgToMongoMQ(ctx context.Context, aggregationID string, messages []*pbMsg.MsgDataToMQ, lastSeq int64) error { +func (db *msgDatabase) MsgToMongoMQ(ctx context.Context, sourceID string, messages []*pbMsg.MsgDataToMQ, lastSeq int64) error { if len(messages) > 0 { - _, _, err := db.producerToModify.SendMessage(ctx, aggregationID, &pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, AggregationID: aggregationID, Messages: messages}) + _, _, err := db.producerToModify.SendMessage(ctx, sourceID, &pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, SourceID: sourceID, Messages: messages}) return err } return nil