diff --git a/internal/msgtransfer/modify_msg_handler.go b/internal/msgtransfer/modify_msg_handler.go index 99a24d30a..beeb48d3e 100644 --- a/internal/msgtransfer/modify_msg_handler.go +++ b/internal/msgtransfer/modify_msg_handler.go @@ -62,15 +62,15 @@ func (mmc *ModifyMsgConsumerHandler) ModifyMsg(ctx context.Context, cMsg *sarama return } log.Debug(msgFromMQ.TriggerID, "proto.Unmarshal MsgDataToMQ", msgFromMQ.String()) - for _, msgDataToMQ := range msgFromMQ.Messages { - isReactionFromCache := utils.GetSwitchFromOptions(msgDataToMQ.MsgData.Options, constant.IsReactionFromCache) + for _, msg := range msgFromMQ.Messages { + isReactionFromCache := utils.GetSwitchFromOptions(msg.Options, constant.IsReactionFromCache) if !isReactionFromCache { continue } ctx = mcontext.SetOperationID(ctx, operationID) - if msgDataToMQ.MsgData.ContentType == constant.ReactionMessageModifier { + if msg.ContentType == constant.ReactionMessageModifier { notification := &sdkws.ReactionMessageModifierNotification{} - if err := json.Unmarshal(msgDataToMQ.MsgData.Content, notification); err != nil { + if err := json.Unmarshal(msg.Content, notification); err != nil { continue } if notification.IsExternalExtensions { @@ -102,9 +102,9 @@ func (mmc *ModifyMsgConsumerHandler) ModifyMsg(ctx context.Context, cMsg *sarama log.NewError(operationID, "InsertOrUpdateReactionExtendMsgSet failed") } } - } else if msgDataToMQ.MsgData.ContentType == constant.ReactionMessageDeleter { + } else if msg.ContentType == constant.ReactionMessageDeleter { notification := &sdkws.ReactionMessageDeleteNotification{} - if err := json.Unmarshal(msgDataToMQ.MsgData.Content, notification); err != nil { + if err := json.Unmarshal(msg.Content, notification); err != nil { continue } if err := mmc.extendMsgDatabase.DeleteReactionExtendMsgSet(ctx, notification.ConversationID, notification.SessionType, notification.ClientMsgID, notification.MsgFirstModifyTime, mmc.extendSetMsgModel.Pb2Model(notification.SuccessReactionExtensions)); err != nil { diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 2e28bdb64..f5445762f 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -5,9 +5,6 @@ import ( "sync" "time" - "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" - "github.com/go-redis/redis" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller" @@ -16,11 +13,11 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome" pbConversation "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/conversation" - pbMsg "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" "github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" "github.com/Shopify/sarama" + "github.com/go-redis/redis" "github.com/golang/protobuf/proto" ) @@ -103,7 +100,7 @@ func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) { } // 获取消息/通知 存储的消息列表, 不存储并且推送的消息列表, -func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(sourceID string, totalMsgs []*ContextMsg) (storageMsgList, notStorageMsgList, storageNotificatoinList, notStorageNotificationList, modifyMsgList []*pbMsg.MsgDataToMQ) { +func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(sourceID string, totalMsgs []*ContextMsg) (storageMsgList, notStorageMsgList, storageNotificatoinList, notStorageNotificationList, modifyMsgList []*sdkws.MsgData) { isStorage := func(msg *sdkws.MsgData) bool { options2 := utils.Options(msg.Options) if options2.IsHistory() { @@ -120,10 +117,10 @@ func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(sourceID str options := utils.Options(v.message.Options) if options.IsNotification() { // 原通知 - notificationMsg := proto.Clone(v.message).(*pbMsg.MsgDataToMQ) + notificationMsg := proto.Clone(v.message).(*sdkws.MsgData) if options.IsSendMsg() { // 消息 - v.message.Options = utils.WithOptions(utils.Options(v.message.MsgData.Options), utils.WithNotification(false), utils.WithSendMsg(false)) + v.message.Options = utils.WithOptions(utils.Options(v.message.Options), utils.WithNotification(false), utils.WithSendMsg(false)) storageMsgList = append(storageMsgList, v.message) } if isStorage(notificationMsg) { @@ -145,7 +142,7 @@ func (och *OnlineHistoryRedisConsumerHandler) getPushStorageMsgList(sourceID str return } -func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Context, conversationID string, storageList, notStorageList []*pbMsg.MsgDataToMQ) { +func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Context, conversationID string, storageList, notStorageList []*sdkws.MsgData) { och.toPushTopic(ctx, conversationID, notStorageList) if len(storageList) > 0 { lastSeq, err := och.msgDatabase.NotificationBatchInsertChat2Cache(ctx, conversationID, storageList) @@ -159,22 +156,22 @@ func (och *OnlineHistoryRedisConsumerHandler) handleNotification(ctx context.Con } } -func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, conversationID string, msgs []*pbMsg.MsgDataToMQ) { +func (och *OnlineHistoryRedisConsumerHandler) toPushTopic(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) { for _, v := range msgs { och.msgDatabase.MsgToPushMQ(ctx, conversationID, v) } } -func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, conversationID string, storageList, notStorageList []*pbMsg.MsgDataToMQ) { +func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, conversationID string, storageList, notStorageList []*sdkws.MsgData) { och.toPushTopic(ctx, conversationID, notStorageList) if len(storageList) > 0 { var currentMaxSeq int64 var err error - if storageList[0].MsgData.SessionType == constant.SuperGroupChatType { + if storageList[0].SessionType == constant.SuperGroupChatType { currentMaxSeq, err = och.msgDatabase.GetGroupMaxSeq(ctx, conversationID) if err == redis.Nil { log.ZInfo(ctx, "group chat first create conversation", "conversationID", conversationID) - if err := och.GroupChatFirstCreateConversation(ctx, storageList[0].MsgData); err != nil { + if err := och.GroupChatFirstCreateConversation(ctx, storageList[0]); err != nil { log.ZError(ctx, "single chat first create conversation error", err, "conversationID", conversationID) } } @@ -182,7 +179,7 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, con currentMaxSeq, err = och.msgDatabase.GetUserMaxSeq(ctx, conversationID) if err == redis.Nil { log.ZInfo(ctx, "single chat first create conversation", "conversationID", conversationID) - if err := och.SingleChatFirstCreateConversation(ctx, storageList[0].MsgData); err != nil { + if err := och.SingleChatFirstCreateConversation(ctx, storageList[0]); err != nil { log.ZError(ctx, "single chat first create conversation error", err, "conversationID", conversationID) } } diff --git a/internal/rpc/msg/send_msg.go b/internal/rpc/msg/send_msg.go index 516d771cf..8961e1cde 100644 --- a/internal/rpc/msg/send_msg.go +++ b/internal/rpc/msg/send_msg.go @@ -274,7 +274,6 @@ func (m *msgServer) modifyMessageByUserMessageReceiveOpt(ctx context.Context, us if err != nil { return false, err } - return true, nil switch singleOpt { case constant.ReceiveMessage: return true, nil @@ -311,30 +310,29 @@ func valueCopy(pb *msg.SendMsgReq) *msg.SendMsgReq { return &msg.SendMsgReq{MsgData: &msgData} } -func (m *msgServer) sendMsgToGroupOptimization(ctx context.Context, list []string, groupPB *msg.SendMsgReq, wg *sync.WaitGroup) error { - msgToMQGroup := msg.MsgDataToMQ{MsgData: groupPB.MsgData} +func (m *msgServer) sendMsgToGroupOptimization(ctx context.Context, list []string, req *msg.SendMsgReq, wg *sync.WaitGroup) error { tempOptions := make(map[string]bool, 1) - for k, v := range groupPB.MsgData.Options { + for k, v := range req.MsgData.Options { tempOptions[k] = v } for _, v := range list { - groupPB.MsgData.RecvID = v + req.MsgData.RecvID = v options := make(map[string]bool, 1) for k, v := range tempOptions { options[k] = v } - groupPB.MsgData.Options = options - conversationID := utils.GetConversationIDBySessionType(constant.GroupChatType, groupPB.MsgData.GroupID) - isSend, err := m.modifyMessageByUserMessageReceiveOpt(ctx, v, conversationID, constant.GroupChatType, groupPB) + req.MsgData.Options = options + conversationID := utils.GetConversationIDBySessionType(constant.GroupChatType, req.MsgData.GroupID) + isSend, err := m.modifyMessageByUserMessageReceiveOpt(ctx, v, conversationID, constant.GroupChatType, req) if err != nil { wg.Done() return err } if isSend { - if v == "" || groupPB.MsgData.SendID == "" { + if v == "" || req.MsgData.SendID == "" { return errs.ErrArgs.Wrap("userID or groupPB.MsgData.SendID is empty") } - err := m.MsgDatabase.MsgToMQ(ctx, v, &msgToMQGroup) + err := m.MsgDatabase.MsgToMQ(ctx, v, req.MsgData) if err != nil { wg.Done() return err diff --git a/internal/rpc/msg/send_pull.go b/internal/rpc/msg/send_pull.go index 219d10d7a..ccced1005 100644 --- a/internal/rpc/msg/send_pull.go +++ b/internal/rpc/msg/send_pull.go @@ -21,8 +21,7 @@ func (m *msgServer) sendMsgSuperGroupChat(ctx context.Context, req *msg.SendMsgR promePkg.Inc(promePkg.WorkSuperGroupChatMsgProcessFailedCounter) return nil, err } - msgToMQSingle := msg.MsgDataToMQ{MsgData: req.MsgData} - err = m.MsgDatabase.MsgToMQ(ctx, msgToMQSingle.MsgData.GroupID, &msgToMQSingle) + err = m.MsgDatabase.MsgToMQ(ctx, req.MsgData.GroupID, req.MsgData) if err != nil { return nil, err } @@ -32,27 +31,26 @@ func (m *msgServer) sendMsgSuperGroupChat(ctx context.Context, req *msg.SendMsgR } promePkg.Inc(promePkg.WorkSuperGroupChatMsgProcessSuccessCounter) - resp.SendTime = msgToMQSingle.MsgData.SendTime - resp.ServerMsgID = msgToMQSingle.MsgData.ServerMsgID - resp.ClientMsgID = msgToMQSingle.MsgData.ClientMsgID + resp.SendTime = req.MsgData.SendTime + resp.ServerMsgID = req.MsgData.ServerMsgID + resp.ClientMsgID = req.MsgData.ClientMsgID return resp, nil } func (m *msgServer) sendMsgNotification(ctx context.Context, req *msg.SendMsgReq) (resp *msg.SendMsgResp, err error) { - msgToMQSingle := msg.MsgDataToMQ{MsgData: req.MsgData} - err = m.MsgDatabase.MsgToMQ(ctx, msgToMQSingle.MsgData.RecvID, &msgToMQSingle) + err = m.MsgDatabase.MsgToMQ(ctx, req.MsgData.RecvID, req.MsgData) if err != nil { return nil, err } - if msgToMQSingle.MsgData.SendID != msgToMQSingle.MsgData.RecvID { //Filter messages sent to yourself - err = m.MsgDatabase.MsgToMQ(ctx, msgToMQSingle.MsgData.SendID, &msgToMQSingle) + if req.MsgData.SendID != req.MsgData.RecvID { //Filter messages sent to yourself + err = m.MsgDatabase.MsgToMQ(ctx, req.MsgData.SendID, req.MsgData) if err != nil { return nil, err } } resp = &msg.SendMsgResp{ - ServerMsgID: msgToMQSingle.MsgData.ServerMsgID, - ClientMsgID: msgToMQSingle.MsgData.ClientMsgID, - SendTime: msgToMQSingle.MsgData.SendTime, + ServerMsgID: req.MsgData.ServerMsgID, + ClientMsgID: req.MsgData.ClientMsgID, + SendTime: req.MsgData.SendTime, } return resp, nil } @@ -67,15 +65,14 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *msg.SendMsgReq) if err != nil { return nil, err } - msgToMQSingle := msg.MsgDataToMQ{MsgData: req.MsgData} if isSend { - err = m.MsgDatabase.MsgToMQ(ctx, req.MsgData.RecvID, &msgToMQSingle) + err = m.MsgDatabase.MsgToMQ(ctx, req.MsgData.RecvID, req.MsgData) if err != nil { return nil, errs.ErrInternalServer.Wrap("insert to mq") } } - if msgToMQSingle.MsgData.SendID != msgToMQSingle.MsgData.RecvID { //Filter messages sent to yourself - err = m.MsgDatabase.MsgToMQ(ctx, req.MsgData.SendID, &msgToMQSingle) + if req.MsgData.SendID != req.MsgData.RecvID { //Filter messages sent to yourself + err = m.MsgDatabase.MsgToMQ(ctx, req.MsgData.SendID, req.MsgData) if err != nil { return nil, errs.ErrInternalServer.Wrap("insert to mq") } @@ -86,9 +83,9 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *msg.SendMsgReq) } promePkg.Inc(promePkg.SingleChatMsgProcessSuccessCounter) resp = &msg.SendMsgResp{ - ServerMsgID: msgToMQSingle.MsgData.ServerMsgID, - ClientMsgID: msgToMQSingle.MsgData.ClientMsgID, - SendTime: msgToMQSingle.MsgData.SendTime, + ServerMsgID: req.MsgData.ServerMsgID, + ClientMsgID: req.MsgData.ClientMsgID, + SendTime: req.MsgData.SendTime, } return resp, nil } diff --git a/pkg/common/db/cache/msg.go b/pkg/common/db/cache/msg.go index ec4ed7f22..ced7bf683 100644 --- a/pkg/common/db/cache/msg.go +++ b/pkg/common/db/cache/msg.go @@ -457,15 +457,15 @@ func (c *msgCache) GetSendMsgStatus(ctx context.Context, id string) (int32, erro } func (c *msgCache) SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error) { - return errs.Wrap(c.rdb.Set(ctx, FcmToken+account+":"+strconv.Itoa(platformID), fcmToken, time.Duration(expireTime)*time.Second).Err()) + return errs.Wrap(c.rdb.Set(ctx, fcmToken+account+":"+strconv.Itoa(platformID), fcmToken, time.Duration(expireTime)*time.Second).Err()) } func (c *msgCache) GetFcmToken(ctx context.Context, account string, platformID int) (string, error) { - return utils.Wrap2(c.rdb.Get(ctx, FcmToken+account+":"+strconv.Itoa(platformID)).Result()) + return utils.Wrap2(c.rdb.Get(ctx, fcmToken+account+":"+strconv.Itoa(platformID)).Result()) } func (c *msgCache) DelFcmToken(ctx context.Context, account string, platformID int) error { - return errs.Wrap(c.rdb.Del(ctx, FcmToken+account+":"+strconv.Itoa(platformID)).Err()) + return errs.Wrap(c.rdb.Del(ctx, fcmToken+account+":"+strconv.Itoa(platformID)).Err()) } func (c *msgCache) IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) { diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index 1de7aa356..61eb920bf 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -34,7 +34,7 @@ type MsgDatabase interface { // 刪除redis中消息缓存 DeleteMessageFromCache(ctx context.Context, conversationID string, msgList []*sdkws.MsgData) error // incrSeq然后批量插入缓存 - BatchInsertChat2Cache(ctx context.Context, conversationID string, msgList []*sdkws.MsgData) (int64, error) + BatchInsertChat2Cache(ctx context.Context, conversationID string, msgList []*sdkws.MsgData, currentMaxSeq int64) (int64, error) // incrSeq通知seq然后批量插入缓存 NotificationBatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int64, error) // 删除消息 返回不存在的seqList @@ -190,7 +190,7 @@ func (db *msgDatabase) MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.Ms return err } -func (db *msgDatabase) MsgToModifyMQ(ctx context.Context, sourceID string, messages []*pbMsg.MsgDataToMQ) error { +func (db *msgDatabase) MsgToModifyMQ(ctx context.Context, conversationID string, messages []*sdkws.MsgData) error { if len(messages) > 0 { _, _, err := db.producerToModify.SendMessage(ctx, conversationID, &pbMsg.MsgDataToModifyByMQ{AggregationID: conversationID, Messages: messages}) return err @@ -207,9 +207,9 @@ func (db *msgDatabase) MsgToPushMQ(ctx context.Context, conversationID string, m return partition, offset, err } -func (db *msgDatabase) MsgToMongoMQ(ctx context.Context, sourceID string, messages []*pbMsg.MsgDataToMQ, lastSeq int64) error { +func (db *msgDatabase) MsgToMongoMQ(ctx context.Context, sourceID string, messages []*sdkws.MsgData, lastSeq int64) error { if len(messages) > 0 { - _, _, err := db.producerToModify.SendMessage(ctx, sourceID, &pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, SourceID: sourceID, Messages: messages}) + _, _, err := db.producerToModify.SendMessage(ctx, sourceID, &pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, AggregationID: sourceID, MsgData: messages}) return err } return nil diff --git a/pkg/common/db/controller/notification.go b/pkg/common/db/controller/notification.go index af0c7c989..cb0ee5604 100644 --- a/pkg/common/db/controller/notification.go +++ b/pkg/common/db/controller/notification.go @@ -198,11 +198,11 @@ func (db *notificationDatabase) MsgToModifyMQ(ctx context.Context, aggregationID return nil } -func (db *notificationDatabase) MsgToPushMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) (int32, int64, error) { - mqPushMsg := pbMsg.PushMsgDataToMQ{MsgData: msg2mq, SourceID: key} - partition, offset, err := db.producerToPush.SendMessage(ctx, key, &mqPushMsg) +func (db *notificationDatabase) MsgToPushMQ(ctx context.Context, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error) { + mqPushMsg := pbMsg.PushMsgDataToMQ{MsgData: msg2mq, ConversationID: conversationID} + partition, offset, err := db.producerToPush.SendMessage(ctx, conversationID, &mqPushMsg) if err != nil { - log.ZError(ctx, "MsgToPushMQ", err, "key", key, "msg2mq", msg2mq) + log.ZError(ctx, "MsgToPushMQ", err, "conversationID", conversationID, "msg2mq", msg2mq) } return partition, offset, err } diff --git a/pkg/common/db/table/unrelation/notification.go b/pkg/common/db/table/unrelation/notification.go index e5b674f0b..cb4eca55e 100644 --- a/pkg/common/db/table/unrelation/notification.go +++ b/pkg/common/db/table/unrelation/notification.go @@ -84,7 +84,7 @@ func (m NotificationDocModel) superGroupIndexGen(groupID string, seqSuffix int64 return "super_group_" + groupID + ":" + strconv.FormatInt(int64(seqSuffix), 10) } -func (m NotificationDocModel) GetDocIDSeqsMap(sourceID string, seqs []int64) map[string][]int64 { +func (m NotificationDocModel) GetDocIDSeqsMap(conversationID string, seqs []int64) map[string][]int64 { t := make(map[string][]int64) for i := 0; i < len(seqs); i++ { docID := m.GetDocID(conversationID, seqs[i])