From d855beb3549f07f97991fdc86f422759d247be1b Mon Sep 17 00:00:00 2001
From: wangchuxiao <wangchuxiao97@outlook.com>
Date: Fri, 28 Apr 2023 18:38:12 +0800
Subject: [PATCH] aggres

---
 .../msgtransfer/online_history_msg_handler.go | 70 +++++++++----------
 .../online_msg_to_mongo_handler.go            | 10 +--
 pkg/common/db/controller/msg.go               | 12 ++--
 3 files changed, 46 insertions(+), 46 deletions(-)

diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go
index 994c96abe..87d516657 100644
--- a/internal/msgtransfer/online_history_msg_handler.go
+++ b/internal/msgtransfer/online_history_msg_handler.go
@@ -23,14 +23,14 @@ import (
 )
 
 const ConsumerMsgs = 3
-const AggregationMessages = 4
+const SourceMessages = 4
 const MongoMessages = 5
 const ChannelNum = 100
 
 type MsgChannelValue struct {
-	aggregationID string //maybe userID or super groupID
-	ctx           context.Context
-	ctxMsgList    []*ContextMsg
+	sourceID   string //maybe userID or super groupID
+	ctx        context.Context
+	ctxMsgList []*ContextMsg
 }
 
 type TriggerChannelValue struct {
@@ -84,16 +84,16 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
 		select {
 		case cmd := <-och.chArrays[channelID]:
 			switch cmd.Cmd {
-			case AggregationMessages:
+			case SourceMessages:
 				msgChannelValue := cmd.Value.(MsgChannelValue)
 				ctxMsgList := msgChannelValue.ctxMsgList
 				ctx := msgChannelValue.ctx
-				log.ZDebug(ctx, "msg arrived channel", "channel id", channelID, "msgList length", len(ctxMsgList), "aggregationID", msgChannelValue.aggregationID)
-				storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList, modifyMsgList := och.getPushStorageMsgList(msgChannelValue.aggregationID, ctxMsgList)
-				och.handleMsg(ctx, msgChannelValue.aggregationID, storageMsgList, notStorageMsgList)
-				och.handleNotification(ctx, msgChannelValue.aggregationID, storageNotificationList, notStorageNotificationList)
-				if err := och.msgDatabase.MsgToModifyMQ(ctx, msgChannelValue.aggregationID, modifyMsgList); err != nil {
-					log.ZError(ctx, "msg to modify mq error", err, "aggregationID", msgChannelValue.aggregationID, "modifyMsgList", modifyMsgList)
+				log.ZDebug(ctx, "msg arrived channel", "channel id", channelID, "msgList length", len(ctxMsgList), "sourceID", msgChannelValue.sourceID)
+				storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList, modifyMsgList := och.getPushStorageMsgList(msgChannelValue.sourceID, ctxMsgList)
+				och.handleMsg(ctx, msgChannelValue.sourceID, storageMsgList, notStorageMsgList)
+				och.handleNotification(ctx, msgChannelValue.sourceID, storageNotificationList, notStorageNotificationList)
+				if err := och.msgDatabase.MsgToModifyMQ(ctx, msgChannelValue.sourceID, modifyMsgList); err != nil {
+					log.ZError(ctx, "msg to modify mq error", err, "sourceID", msgChannelValue.sourceID, "modifyMsgList", modifyMsgList)
 				}
 			}
 		}
@@ -101,13 +101,13 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
 }
 
 // 获取消息/通知 存储的消息列表, 不存储并且推送的消息列表,
-func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(aggregationID string, totalMsgs []*ContextMsg) (storageMsgList, notStorageMsgList, storageNotificatoinList, notStorageNotificationList, modifyMsgList []*pbMsg.MsgDataToMQ) {
+func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(sourceID string, totalMsgs []*ContextMsg) (storageMsgList, notStorageMsgList, storageNotificatoinList, notStorageNotificationList, modifyMsgList []*pbMsg.MsgDataToMQ) {
 	isStorage := func(msg *pbMsg.MsgDataToMQ) bool {
 		options2 := utils.Options(msg.MsgData.Options)
 		if options2.IsHistory() {
 			return true
 		} else {
-			if !(!options2.IsSenderSync() && aggregationID == msg.MsgData.SendID) {
+			if !(!options2.IsSenderSync() && sourceID == msg.MsgData.SendID) {
 				return false
 			}
 		}
@@ -142,52 +142,52 @@ func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(aggregationI
 	return
 }
 
-func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Context, aggregationID string, storageList, notStorageList []*pbMsg.MsgDataToMQ) {
-	och.toPushTopic(ctx, aggregationID, notStorageList)
+func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Context, sourceID string, storageList, notStorageList []*pbMsg.MsgDataToMQ) {
+	och.toPushTopic(ctx, sourceID, notStorageList)
 	if len(storageList) > 0 {
-		lastSeq, err := och.msgDatabase.NotificationBatchInsertChat2Cache(ctx, aggregationID, storageList)
+		lastSeq, err := och.msgDatabase.NotificationBatchInsertChat2Cache(ctx, sourceID, storageList)
 		if err != nil {
-			log.ZError(ctx, "notification batch insert to redis error", err, "aggregationID", aggregationID, "storageList", storageList)
+			log.ZError(ctx, "notification batch insert to redis error", err, "sourceID", sourceID, "storageList", storageList)
 			return
 		}
-		och.msgDatabase.MsgToMongoMQ(ctx, aggregationID, storageList, lastSeq)
-		och.toPushTopic(ctx, aggregationID, storageList)
+		och.msgDatabase.MsgToMongoMQ(ctx, sourceID, storageList, lastSeq)
+		och.toPushTopic(ctx, sourceID, storageList)
 	}
 }
 
-func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, aggregationID string, msgs []*pbMsg.MsgDataToMQ) {
+func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, sourceID string, msgs []*pbMsg.MsgDataToMQ) {
 	for _, v := range msgs {
-		och.msgDatabase.MsgToPushMQ(ctx, aggregationID, v)
+		och.msgDatabase.MsgToPushMQ(ctx, sourceID, v)
 	}
 }
 
-func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, aggregationID string, storageList, notStorageList []*pbMsg.MsgDataToMQ) {
-	och.toPushTopic(ctx, aggregationID, notStorageList)
+func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, sourceID string, storageList, notStorageList []*pbMsg.MsgDataToMQ) {
+	och.toPushTopic(ctx, sourceID, notStorageList)
 	if len(storageList) > 0 {
 		var currentMaxSeq int64
 		var err error
 		if storageList[0].MsgData.SessionType == constant.SuperGroupChatType {
-			currentMaxSeq, err = och.msgDatabase.GetGroupMaxSeq(ctx, aggregationID)
+			currentMaxSeq, err = och.msgDatabase.GetGroupMaxSeq(ctx, sourceID)
 			if err == redis.Nil {
 				if err := och.GroupChatFirstCreateConversation(ctx, storageList[0].MsgData); err != nil {
-					log.ZError(ctx, "single chat first create conversation error", err, "aggregationID", aggregationID)
+					log.ZError(ctx, "single chat first create conversation error", err, "sourceID", sourceID)
 				}
 			}
 		} else {
-			currentMaxSeq, err = och.msgDatabase.GetUserMaxSeq(ctx, aggregationID)
+			currentMaxSeq, err = och.msgDatabase.GetUserMaxSeq(ctx, sourceID)
 			if err == redis.Nil {
 				if err := och.SingleChatFirstCreateConversation(ctx, storageList[0].MsgData); err != nil {
-					log.ZError(ctx, "single chat first create conversation error", err, "aggregationID", aggregationID)
+					log.ZError(ctx, "single chat first create conversation error", err, "sourceID", sourceID)
 				}
 			}
 		}
 		if err != nil && err != redis.Nil {
 			prome.Inc(prome.SeqGetFailedCounter)
-			log.ZError(ctx, "get max seq err", err, "aggregationID", aggregationID)
+			log.ZError(ctx, "get max seq err", err, "sourceID", sourceID)
 			return
 		}
 		prome.Inc(prome.SeqGetSuccessCounter)
-		lastSeq, err := och.msgDatabase.BatchInsertChat2Cache(ctx, aggregationID, storageList, currentMaxSeq)
+		lastSeq, err := och.msgDatabase.BatchInsertChat2Cache(ctx, sourceID, storageList, currentMaxSeq)
 		if err != nil && err != redis.Nil {
 			log.ZError(ctx, "batch data insert to redis err", err, "storageMsgList", storageList)
 			och.singleMsgFailedCountMutex.Lock()
@@ -198,8 +198,8 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, agg
 		och.singleMsgSuccessCountMutex.Lock()
 		och.singleMsgSuccessCount += uint64(len(storageList))
 		och.singleMsgSuccessCountMutex.Unlock()
-		och.msgDatabase.MsgToMongoMQ(ctx, aggregationID, storageList, lastSeq)
-		och.toPushTopic(ctx, aggregationID, storageList)
+		och.msgDatabase.MsgToMongoMQ(ctx, sourceID, storageList, lastSeq)
+		och.toPushTopic(ctx, sourceID, storageList)
 	}
 }
 
@@ -265,12 +265,12 @@ func (och *OnlineHistoryRedisConsumerHandler) MessagesDistributionHandle() {
 					}
 				}
 				log.ZDebug(ctx, "generate map list users len", "length", len(aggregationMsgs))
-				for aggregationID, v := range aggregationMsgs {
+				for sourceID, v := range aggregationMsgs {
 					if len(v) >= 0 {
-						hashCode := utils.GetHashCode(aggregationID)
+						hashCode := utils.GetHashCode(sourceID)
 						channelID := hashCode % ChannelNum
-						log.ZDebug(ctx, "generate channelID", "hashCode", hashCode, "channelID", channelID, "aggregationID", aggregationID)
-						och.chArrays[channelID] <- Cmd2Value{Cmd: AggregationMessages, Value: MsgChannelValue{aggregationID: aggregationID, ctxMsgList: v, ctx: ctx}}
+						log.ZDebug(ctx, "generate channelID", "hashCode", hashCode, "channelID", channelID, "sourceID", sourceID)
+						och.chArrays[channelID] <- Cmd2Value{Cmd: AggregationMessages, Value: MsgChannelValue{sourceID: sourceID, ctxMsgList: v, ctx: ctx}}
 					}
 				}
 			}
diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go
index dccf30bb2..a038a9d1f 100644
--- a/internal/msgtransfer/online_msg_to_mongo_handler.go
+++ b/internal/msgtransfer/online_msg_to_mongo_handler.go
@@ -40,14 +40,14 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Cont
 		log.Error("msg_transfer Unmarshal msg err", "", "msg", string(msg), "err", err.Error())
 		return
 	}
-	log.Info(operationID, "BatchInsertChat2DB userID: ", msgFromMQ.AggregationID, "msgFromMQ.LastSeq: ", msgFromMQ.LastSeq)
-	err = mc.msgDatabase.BatchInsertChat2DB(ctx, msgFromMQ.AggregationID, msgFromMQ.Messages, msgFromMQ.LastSeq)
+	log.Info(operationID, "BatchInsertChat2DB userID: ", msgFromMQ.SourceID, "msgFromMQ.LastSeq: ", msgFromMQ.LastSeq)
+	err = mc.msgDatabase.BatchInsertChat2DB(ctx, msgFromMQ.SourceID, msgFromMQ.Messages, msgFromMQ.LastSeq)
 	if err != nil {
-		log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.Messages, msgFromMQ.AggregationID, msgFromMQ.TriggerID)
+		log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.Messages, msgFromMQ.SourceID, msgFromMQ.TriggerID)
 	}
-	err = mc.msgDatabase.DeleteMessageFromCache(ctx, msgFromMQ.AggregationID, msgFromMQ.Messages)
+	err = mc.msgDatabase.DeleteMessageFromCache(ctx, msgFromMQ.SourceID, msgFromMQ.Messages)
 	if err != nil {
-		log.NewError(operationID, "remove cache msg from redis err", err.Error(), msgFromMQ.Messages, msgFromMQ.AggregationID, msgFromMQ.TriggerID)
+		log.NewError(operationID, "remove cache msg from redis err", err.Error(), msgFromMQ.Messages, msgFromMQ.SourceID, msgFromMQ.TriggerID)
 	}
 	for _, v := range msgFromMQ.Messages {
 		if v.MsgData.ContentType == constant.DeleteMessageNotification {
diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go
index d1edd0464..23dda6cb4 100644
--- a/pkg/common/db/controller/msg.go
+++ b/pkg/common/db/controller/msg.go
@@ -80,9 +80,9 @@ type MsgDatabase interface {
 	GetGroupMinSeq(ctx context.Context, groupID string) (int64, error)
 
 	MsgToMQ(ctx context.Context, key string, msg2mq *pbMsg.MsgDataToMQ) error
-	MsgToModifyMQ(ctx context.Context, aggregationID string, messages []*pbMsg.MsgDataToMQ) error
+	MsgToModifyMQ(ctx context.Context, sourceID string, messages []*pbMsg.MsgDataToMQ) error
 	MsgToPushMQ(ctx context.Context, sourceID string, msg2mq *pbMsg.MsgDataToMQ) (int32, int64, error)
-	MsgToMongoMQ(ctx context.Context, aggregationID string, messages []*pbMsg.MsgDataToMQ, lastSeq int64) error
+	MsgToMongoMQ(ctx context.Context, sourceID string, messages []*pbMsg.MsgDataToMQ, lastSeq int64) error
 }
 
 func NewMsgDatabase(msgDocModel unRelationTb.MsgDocModelInterface, cacheModel cache.Model) MsgDatabase {
@@ -189,9 +189,9 @@ func (db *msgDatabase) MsgToMQ(ctx context.Context, key string, msg2mq *pbMsg.Ms
 	return err
 }
 
-func (db *msgDatabase) MsgToModifyMQ(ctx context.Context, aggregationID string, messages []*pbMsg.MsgDataToMQ) error {
+func (db *msgDatabase) MsgToModifyMQ(ctx context.Context, sourceID string, messages []*pbMsg.MsgDataToMQ) error {
 	if len(messages) > 0 {
-		_, _, err := db.producerToModify.SendMessage(ctx, aggregationID, &pbMsg.MsgDataToModifyByMQ{AggregationID: aggregationID, Messages: messages})
+		_, _, err := db.producerToModify.SendMessage(ctx, sourceID, &pbMsg.MsgDataToModifyByMQ{SourceID: sourceID, Messages: messages})
 		return err
 	}
 	return nil
@@ -206,9 +206,9 @@ func (db *msgDatabase) MsgToPushMQ(ctx context.Context, key string, msg2mq *pbMs
 	return partition, offset, err
 }
 
-func (db *msgDatabase) MsgToMongoMQ(ctx context.Context, aggregationID string, messages []*pbMsg.MsgDataToMQ, lastSeq int64) error {
+func (db *msgDatabase) MsgToMongoMQ(ctx context.Context, sourceID string, messages []*pbMsg.MsgDataToMQ, lastSeq int64) error {
 	if len(messages) > 0 {
-		_, _, err := db.producerToModify.SendMessage(ctx, aggregationID, &pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, AggregationID: aggregationID, Messages: messages})
+		_, _, err := db.producerToModify.SendMessage(ctx, sourceID, &pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, SourceID: sourceID, Messages: messages})
 		return err
 	}
 	return nil