From b64e9af92fb58c086e1acda83307de86daf00fd8 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Fri, 12 May 2023 20:05:25 +0800 Subject: [PATCH] msg --- internal/msgtransfer/init.go | 9 +- .../online_msg_to_mongo_handler.go | 69 +- internal/rpc/msg/server.go | 25 +- pkg/common/db/cache/notification.go | 515 ------------- pkg/common/db/controller/common_msg.go | 36 +- pkg/common/db/controller/notification.go | 721 ------------------ pkg/common/db/localcache/conversation.go | 2 + pkg/common/db/localcache/meta_local_cache.go | 1 + pkg/common/db/unrelation/notification.go | 134 ---- 9 files changed, 43 insertions(+), 1469 deletions(-) delete mode 100644 pkg/common/db/cache/notification.go delete mode 100644 pkg/common/db/controller/notification.go create mode 100644 pkg/common/db/localcache/meta_local_cache.go delete mode 100644 pkg/common/db/unrelation/notification.go diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 2d5fb73e1..2be9d9743 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -51,27 +51,24 @@ func StartTransfer(prometheusPort int) error { } client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials())) msgModel := cache.NewMsgCacheModel(rdb) - notificationModel := cache.NewNotificationCacheModel(rdb) msgDocModel := unrelation.NewMsgMongoDriver(mongo.GetDatabase()) - notificationDocModel := unrelation.NewNotificationMongoDriver(mongo.GetDatabase()) extendMsgModel := unrelation.NewExtendMsgSetMongoDriver(mongo.GetDatabase()) extendMsgCache := cache.NewExtendMsgSetCacheRedis(rdb, extendMsgModel, cache.GetDefaultOpt()) chatLogDatabase := controller.NewChatLogDatabase(relation.NewChatLogGorm(db)) extendMsgDatabase := controller.NewExtendMsgDatabase(extendMsgModel, extendMsgCache, tx.NewMongo(mongo.GetClient())) msgDatabase := controller.NewCommonMsgDatabase(msgDocModel, msgModel) - notificationDatabase := controller.NewNotificationDatabase(notificationDocModel, notificationModel) conversationRpcClient := rpcclient.NewConversationClient(client) - msgTransfer := NewMsgTransfer(chatLogDatabase, extendMsgDatabase, msgDatabase, notificationDatabase, conversationRpcClient) + msgTransfer := NewMsgTransfer(chatLogDatabase, extendMsgDatabase, msgDatabase, conversationRpcClient) msgTransfer.initPrometheus() return msgTransfer.Start(prometheusPort) } func NewMsgTransfer(chatLogDatabase controller.ChatLogDatabase, - extendMsgDatabase controller.ExtendMsgDatabase, msgDatabase controller.CommonMsgDatabase, notificationDatabase controller.NotificationDatabase, + extendMsgDatabase controller.ExtendMsgDatabase, msgDatabase controller.CommonMsgDatabase, conversationRpcClient *rpcclient.ConversationClient) *MsgTransfer { return &MsgTransfer{persistentCH: NewPersistentConsumerHandler(chatLogDatabase), historyCH: NewOnlineHistoryRedisConsumerHandler(msgDatabase, conversationRpcClient), - historyMongoCH: NewOnlineHistoryMongoConsumerHandler(msgDatabase, notificationDatabase), modifyCH: NewModifyMsgConsumerHandler(extendMsgDatabase)} + historyMongoCH: NewOnlineHistoryMongoConsumerHandler(msgDatabase), modifyCH: NewModifyMsgConsumerHandler(extendMsgDatabase)} } func (m *MsgTransfer) initPrometheus() { diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index 96060461d..464de6e45 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -4,14 +4,12 @@ import ( "context" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" - "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller" kfk "github.com/OpenIMSDK/Open-IM-Server/pkg/common/kafka" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext" pbMsg "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg" "github.com/Shopify/sarama" "google.golang.org/protobuf/proto" @@ -20,16 +18,14 @@ import ( type OnlineHistoryMongoConsumerHandler struct { historyConsumerGroup *kfk.MConsumerGroup msgDatabase controller.CommonMsgDatabase - notificationDatabase controller.NotificationDatabase } -func NewOnlineHistoryMongoConsumerHandler(database controller.CommonMsgDatabase, notificationDatabase controller.NotificationDatabase) *OnlineHistoryMongoConsumerHandler { +func NewOnlineHistoryMongoConsumerHandler(database controller.CommonMsgDatabase) *OnlineHistoryMongoConsumerHandler { mc := &OnlineHistoryMongoConsumerHandler{ historyConsumerGroup: kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{KafkaVersion: sarama.V2_0_0_0, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false}, []string{config.Config.Kafka.MsgToMongo.Topic}, config.Config.Kafka.MsgToMongo.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo), - msgDatabase: database, - notificationDatabase: notificationDatabase, + msgDatabase: database, } return mc } @@ -37,7 +33,6 @@ func NewOnlineHistoryMongoConsumerHandler(database controller.CommonMsgDatabase, func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Context, cMsg *sarama.ConsumerMessage, conversationID string, session sarama.ConsumerGroupSession) { msg := cMsg.Value msgFromMQ := pbMsg.MsgDataToMongoByMQ{} - operationID := mcontext.GetOperationID(ctx) err := proto.Unmarshal(msg, &msgFromMQ) if err != nil { log.ZError(ctx, "unmarshall failed", err, "conversationID", conversationID, "len", len(msg)) @@ -48,52 +43,28 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Cont return } log.ZInfo(ctx, "mongo consumer recv msg", "msgs", msgFromMQ.MsgData) - isNotification := msgFromMQ.MsgData[0].Options[constant.IsNotification] - if isNotification { - err = mc.notificationDatabase.BatchInsertChat2DB(ctx, msgFromMQ.ConversationID, msgFromMQ.MsgData, msgFromMQ.LastSeq) - if err != nil { - log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.MsgData, msgFromMQ.ConversationID, msgFromMQ.TriggerID) - } - err = mc.notificationDatabase.DeleteMessageFromCache(ctx, msgFromMQ.ConversationID, msgFromMQ.MsgData) - if err != nil { - log.NewError(operationID, "remove cache msg from redis err", err.Error(), msgFromMQ.MsgData, msgFromMQ.ConversationID, msgFromMQ.TriggerID) - } - for _, v := range msgFromMQ.MsgData { - if v.ContentType == constant.DeleteMessageNotification { - deleteMessageTips := sdkws.DeleteMessageTips{} - err := proto.Unmarshal(v.Content, &deleteMessageTips) - if err != nil { - log.NewError(operationID, "tips unmarshal err:", err.Error(), v.String()) - continue - } - if totalUnExistSeqs, err := mc.notificationDatabase.DelMsgBySeqs(ctx, deleteMessageTips.UserID, deleteMessageTips.Seqs); err != nil { - log.NewError(operationID, utils.GetSelfFuncName(), "DelMsgBySeqs args: ", deleteMessageTips.UserID, deleteMessageTips.Seqs, "error:", err.Error(), "totalUnExistSeqs: ", totalUnExistSeqs) - } + err = mc.msgDatabase.BatchInsertChat2DB(ctx, msgFromMQ.ConversationID, msgFromMQ.MsgData, msgFromMQ.LastSeq) + if err != nil { + log.ZError(ctx, "single data insert to mongo err", err, "msg", msgFromMQ.MsgData, "conversationID", msgFromMQ.ConversationID, "triggerID", msgFromMQ.TriggerID) + } + err = mc.msgDatabase.DeleteMessageFromCache(ctx, msgFromMQ.ConversationID, msgFromMQ.MsgData) + if err != nil { + log.ZError(ctx, "remove cache msg from redis err", err, "msg", msgFromMQ.MsgData, "conversationID", msgFromMQ.ConversationID, "triggerID", msgFromMQ.TriggerID) + } + for _, v := range msgFromMQ.MsgData { + if v.ContentType == constant.DeleteMessageNotification { + deleteMessageTips := sdkws.DeleteMessageTips{} + err := proto.Unmarshal(v.Content, &deleteMessageTips) + if err != nil { + log.ZError(ctx, "tips unmarshal err:", err, "msg", msg) + continue } - } - } else { - err = mc.msgDatabase.BatchInsertChat2DB(ctx, msgFromMQ.ConversationID, msgFromMQ.MsgData, msgFromMQ.LastSeq) - if err != nil { - log.NewError(operationID, "single data insert to mongo err", err.Error(), msgFromMQ.MsgData, msgFromMQ.ConversationID, msgFromMQ.TriggerID) - } - err = mc.msgDatabase.DeleteMessageFromCache(ctx, msgFromMQ.ConversationID, msgFromMQ.MsgData) - if err != nil { - log.NewError(operationID, "remove cache msg from redis err", err.Error(), msgFromMQ.MsgData, msgFromMQ.ConversationID, msgFromMQ.TriggerID) - } - for _, v := range msgFromMQ.MsgData { - if v.ContentType == constant.DeleteMessageNotification { - deleteMessageTips := sdkws.DeleteMessageTips{} - err := proto.Unmarshal(v.Content, &deleteMessageTips) - if err != nil { - log.NewError(operationID, "tips unmarshal err:", err.Error(), v.String()) - continue - } - if totalUnExistSeqs, err := mc.msgDatabase.DelMsgBySeqs(ctx, deleteMessageTips.UserID, deleteMessageTips.Seqs); err != nil { - log.NewError(operationID, utils.GetSelfFuncName(), "DelMsgBySeqs args: ", deleteMessageTips.UserID, deleteMessageTips.Seqs, "error:", err.Error(), "totalUnExistSeqs: ", totalUnExistSeqs) - } + if totalUnExistSeqs, err := mc.msgDatabase.DelMsgBySeqs(ctx, deleteMessageTips.UserID, deleteMessageTips.Seqs); err != nil { + log.ZError(ctx, "DelMsgBySeqs", err, "userIDs", deleteMessageTips.UserID, "seqs", deleteMessageTips.Seqs, "totalUnExistSeqs", totalUnExistSeqs) } } } + } func (OnlineHistoryMongoConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index 1c36f3b87..7ecc98b9c 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -23,18 +23,17 @@ import ( type MessageInterceptorChain []MessageInterceptorFunc type msgServer struct { - RegisterCenter discoveryregistry.SvcDiscoveryRegistry - MsgDatabase controller.CommonMsgDatabase - notificationDatabase controller.NotificationDatabase - ExtendMsgDatabase controller.ExtendMsgDatabase - Group *rpcclient.GroupClient - User *rpcclient.UserClient - Conversation *rpcclient.ConversationClient - friend *rpcclient.FriendClient - black *rpcclient.BlackClient - GroupLocalCache *localcache.GroupLocalCache - MessageLocker MessageLocker - Handlers MessageInterceptorChain + RegisterCenter discoveryregistry.SvcDiscoveryRegistry + MsgDatabase controller.CommonMsgDatabase + ExtendMsgDatabase controller.ExtendMsgDatabase + Group *rpcclient.GroupClient + User *rpcclient.UserClient + Conversation *rpcclient.ConversationClient + friend *rpcclient.FriendClient + black *rpcclient.BlackClient + GroupLocalCache *localcache.GroupLocalCache + MessageLocker MessageLocker + Handlers MessageInterceptorChain } func (m *msgServer) addInterceptorHandler(interceptorFunc ...MessageInterceptorFunc) { @@ -159,7 +158,7 @@ func (m *msgServer) PullMessageBySeqs(ctx context.Context, req *sdkws.PullMessag for i := seq.Begin; i <= seq.End; i++ { seqs = append(seqs, i) } - notificationMsgs, err := m.notificationDatabase.GetMsgBySeqs(ctx, seq.ConversationID, seqs) + notificationMsgs, err := m.MsgDatabase.GetMsgBySeqs(ctx, seq.ConversationID, seqs) if err != nil { return nil, err } diff --git a/pkg/common/db/cache/notification.go b/pkg/common/db/cache/notification.go deleted file mode 100644 index e64a422cf..000000000 --- a/pkg/common/db/cache/notification.go +++ /dev/null @@ -1,515 +0,0 @@ -package cache - -import ( - "context" - "errors" - "fmt" - "strconv" - "time" - - "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" - - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" - "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" - "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" - "github.com/gogo/protobuf/jsonpb" - - "google.golang.org/protobuf/proto" - - "github.com/go-redis/redis/v8" -) - -const ( - NotificationUserIncrSeq = "NOTIFICATION_REDIS_USER_INCR_SEQ:" // user incr seq - NotificationUserMinSeq = "NOTIFICATION_REDIS_USER_MIN_SEQ:" - NotificationGetuiToken = "NOTIFICATION_GETUI_TOKEN" - NotificationGetuiTaskID = "NOTIFICATION_GETUI_TASK_ID" - NotificationMessageCache = "NOTIFICATION_MESSAGE_CACHE:" - NotificationSignalCache = "NOTIFICATION_SIGNAL_CACHE:" - NotificationSignalListCache = "NOTIFICATION_SIGNAL_LIST_CACHE:" - NotificationFcmToken = "NOTIFICATION_FCM_TOKEN:" - NotificationGroupUserMinSeq = "NOTIFICATION_GROUP_USER_MIN_SEQ:" - NotificationGroupMaxSeq = "NOTIFICATION_GROUP_MAX_SEQ:" - NotificationGroupMinSeq = "NOTIFICATION_GROUP_MIN_SEQ:" - NotificationSendMsgFailedFlag = "NOTIFICATION_SEND_MSG_FAILED_FLAG:" - NotificationUserBadgeUnreadCountSum = "NOTIFICATION_USER_BADGE_UNREAD_COUNT_SUM:" - NotificationExTypeKeyLocker = "NOTIFICATION_EX_LOCK:" - NotificationUidPidToken = "NOTIFICATION_UID_PID_TOKEN_STATUS:" -) - -type NotificationModel interface { - IncrUserSeq(ctx context.Context, userID string) (int64, error) - GetUserMaxSeq(ctx context.Context, userID string) (int64, error) - SetUserMaxSeq(ctx context.Context, userID string, maxSeq int64) error - SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) - GetUserMinSeq(ctx context.Context, userID string) (int64, error) - SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) - GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error) - GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) - GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) - IncrGroupMaxSeq(ctx context.Context, groupID string) (int64, error) - SetGroupMaxSeq(ctx context.Context, groupID string, maxSeq int64) error - SetGroupMinSeq(ctx context.Context, groupID string, minSeq int64) error - AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error - GetTokensWithoutError(ctx context.Context, userID, platformID string) (map[string]int, error) - SetTokenMapByUidPid(ctx context.Context, userID string, platform string, m map[string]int) error - DeleteTokenByUidPid(ctx context.Context, userID string, platform string, fields []string) error - GetMessagesBySeq(ctx context.Context, userID string, seqList []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error) - SetMessageToCache(ctx context.Context, userID string, msgList []*sdkws.MsgData) (int, error) - DeleteMessageFromCache(ctx context.Context, userID string, msgList []*sdkws.MsgData) error - CleanUpOneUserAllMsg(ctx context.Context, userID string) error - HandleSignalInvite(ctx context.Context, msg *sdkws.MsgData, pushToUserID string) (isSend bool, err error) - GetSignalInvitationInfoByClientMsgID(ctx context.Context, clientMsgID string) (invitationInfo *sdkws.SignalInviteReq, err error) - GetAvailableSignalInvitationInfo(ctx context.Context, userID string) (invitationInfo *sdkws.SignalInviteReq, err error) - DelUserSignalList(ctx context.Context, userID string) error - DelMsgFromCache(ctx context.Context, userID string, seqList []int64) error - SetGetuiToken(ctx context.Context, token string, expireTime int64) error - GetGetuiToken(ctx context.Context) (string, error) - SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error - GetGetuiTaskID(ctx context.Context) (string, error) - SetSendMsgStatus(ctx context.Context, id string, status int32) error - GetSendMsgStatus(ctx context.Context, id string) (int32, error) - SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error) - GetFcmToken(ctx context.Context, account string, platformID int) (string, error) - DelFcmToken(ctx context.Context, account string, platformID int) error - IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) - SetUserBadgeUnreadCountSum(ctx context.Context, userID string, value int) error - GetUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) - JudgeMessageReactionExist(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) - GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) - DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error - SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) - GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) - SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error - LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error - UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error -} - -func NewNotificationCacheModel(client redis.UniversalClient) NotificationModel { - return ¬ificationCache{rdb: client} -} - -type notificationCache struct { - rdb redis.UniversalClient -} - -// 兼容老版本调用 -func (c *notificationCache) DelKeys() { - for _, key := range []string{"GROUP_CACHE:", "FRIEND_RELATION_CACHE:", "BLACK_LIST_CACHE:", "USER_INFO_CACHE:", "GROUP_INFO_CACHE:", "JOINED_GROUP_LIST_CACHE:", - "GROUP_MEMBER_INFO_CACHE:", "GROUP_ALL_MEMBER_INFO_CACHE:", "ALL_FRIEND_INFO_CACHE:"} { - fName := utils.GetSelfFuncName() - var cursor uint64 - var n int - for { - var keys []string - var err error - keys, cursor, err = c.rdb.Scan(context.Background(), cursor, key+"*", scanCount).Result() - if err != nil { - panic(err.Error()) - } - n += len(keys) - // for each for redis cluster - for _, key := range keys { - if err = c.rdb.Del(context.Background(), key).Err(); err != nil { - log.NewError("", fName, key, err.Error()) - err = c.rdb.Del(context.Background(), key).Err() - if err != nil { - panic(err.Error()) - } - } - } - if cursor == 0 { - break - } - } - } -} - -func (c *notificationCache) IncrUserSeq(ctx context.Context, userID string) (int64, error) { - return utils.Wrap2(c.rdb.Get(ctx, NotificationUserIncrSeq+userID).Int64()) -} - -func (c *notificationCache) GetUserMaxSeq(ctx context.Context, userID string) (int64, error) { - return utils.Wrap2(c.rdb.Get(ctx, NotificationUserIncrSeq+userID).Int64()) -} - -func (c *notificationCache) SetUserMaxSeq(ctx context.Context, userID string, maxSeq int64) error { - return errs.Wrap(c.rdb.Set(ctx, NotificationUserIncrSeq+userID, maxSeq, 0).Err()) -} - -func (c *notificationCache) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) { - return errs.Wrap(c.rdb.Set(ctx, NotificationUserMinSeq+userID, minSeq, 0).Err()) -} - -func (c *notificationCache) GetUserMinSeq(ctx context.Context, userID string) (int64, error) { - return utils.Wrap2(c.rdb.Get(ctx, NotificationUserMinSeq+userID).Int64()) -} - -func (c *notificationCache) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) { - key := NotificationGroupUserMinSeq + "g:" + groupID + "u:" + userID - return errs.Wrap(c.rdb.Set(ctx, key, minSeq, 0).Err()) -} - -func (c *notificationCache) GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error) { - key := NotificationGroupUserMinSeq + "g:" + groupID + "u:" + userID - return utils.Wrap2(c.rdb.Get(ctx, key).Int64()) -} - -func (c *notificationCache) GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) { - return utils.Wrap2(c.rdb.Get(ctx, NotificationGroupMaxSeq+groupID).Int64()) -} - -func (c *notificationCache) GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) { - return utils.Wrap2(c.rdb.Get(ctx, NotificationGroupMinSeq+groupID).Int64()) -} - -func (c *notificationCache) IncrGroupMaxSeq(ctx context.Context, groupID string) (int64, error) { - key := NotificationGroupMaxSeq + groupID - seq, err := c.rdb.Incr(ctx, key).Uint64() - return int64(seq), errs.Wrap(err) -} - -func (c *notificationCache) SetGroupMaxSeq(ctx context.Context, groupID string, maxSeq int64) error { - key := NotificationGroupMaxSeq + groupID - return errs.Wrap(c.rdb.Set(ctx, key, maxSeq, 0).Err()) -} - -func (c *notificationCache) SetGroupMinSeq(ctx context.Context, groupID string, minSeq int64) error { - key := NotificationGroupMinSeq + groupID - return errs.Wrap(c.rdb.Set(ctx, key, minSeq, 0).Err()) -} - -func (c *notificationCache) AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error { - key := NotificationUidPidToken + userID + ":" + constant.PlatformIDToName(platformID) - return errs.Wrap(c.rdb.HSet(ctx, key, token, flag).Err()) -} - -func (c *notificationCache) GetTokensWithoutError(ctx context.Context, userID, platformID string) (map[string]int, error) { - key := NotificationUidPidToken + userID + ":" + platformID - m, err := c.rdb.HGetAll(ctx, key).Result() - if err != nil { - return nil, errs.Wrap(err) - } - mm := make(map[string]int) - for k, v := range m { - mm[k] = utils.StringToInt(v) - } - return mm, nil -} - -func (c *notificationCache) SetTokenMapByUidPid(ctx context.Context, userID string, platform string, m map[string]int) error { - key := NotificationUidPidToken + userID + ":" + platform - mm := make(map[string]interface{}) - for k, v := range m { - mm[k] = v - } - return errs.Wrap(c.rdb.HSet(ctx, key, mm).Err()) -} - -func (c *notificationCache) DeleteTokenByUidPid(ctx context.Context, userID string, platform string, fields []string) error { - key := NotificationUidPidToken + userID + ":" + platform - return errs.Wrap(c.rdb.HDel(ctx, key, fields...).Err()) -} - -func (c *notificationCache) getMessageCacheKey(conversationID string, seq int64) string { - return NotificationMessageCache + conversationID + "_" + strconv.Itoa(int(seq)) -} - -func (c *notificationCache) allMessageCacheKey(conversationID string) string { - return NotificationMessageCache + conversationID + "_*" -} - -func (c *notificationCache) GetMessagesBySeq(ctx context.Context, userID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) { - pipe := c.rdb.Pipeline() - for _, v := range seqs { - //MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1 - key := c.getMessageCacheKey(userID, v) - if err := pipe.Get(ctx, key).Err(); err != nil && err != redis.Nil { - return nil, nil, err - } - } - result, err := pipe.Exec(ctx) - for i, v := range result { - if v.Err() != nil { - failedSeqs = append(failedSeqs, seqs[i]) - } else { - msg := sdkws.MsgData{} - err = jsonpb.UnmarshalString(v.String(), &msg) - if err != nil { - failedSeqs = append(failedSeqs, seqs[i]) - } else { - seqMsgs = append(seqMsgs, &msg) - } - } - } - return seqMsgs, failedSeqs, err -} - -func (c *notificationCache) SetMessageToCache(ctx context.Context, userID string, msgList []*sdkws.MsgData) (int, error) { - pipe := c.rdb.Pipeline() - var failedMsgs []sdkws.MsgData - for _, msg := range msgList { - key := c.getMessageCacheKey(userID, msg.Seq) - s, err := utils.Pb2String(msg) - if err != nil { - return 0, errs.Wrap(err) - } - err = pipe.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err() - if err != nil { - return 0, errs.Wrap(err) - } - } - if len(failedMsgs) != 0 { - return len(failedMsgs), fmt.Errorf("set msg to notificationCache failed, failed lists: %v, %s", failedMsgs, userID) - } - _, err := pipe.Exec(ctx) - return 0, err -} - -func (c *notificationCache) DeleteMessageFromCache(ctx context.Context, userID string, msgList []*sdkws.MsgData) error { - pipe := c.rdb.Pipeline() - for _, v := range msgList { - if err := pipe.Del(ctx, c.getMessageCacheKey(userID, v.Seq)).Err(); err != nil { - return errs.Wrap(err) - } - } - _, err := pipe.Exec(ctx) - return errs.Wrap(err) -} - -func (c *notificationCache) CleanUpOneUserAllMsg(ctx context.Context, userID string) error { - vals, err := c.rdb.Keys(ctx, c.allMessageCacheKey(userID)).Result() - if err == redis.Nil { - return nil - } - if err != nil { - return errs.Wrap(err) - } - pipe := c.rdb.Pipeline() - for _, v := range vals { - if err := pipe.Del(ctx, v).Err(); err != nil { - return errs.Wrap(err) - } - } - _, err = pipe.Exec(ctx) - return errs.Wrap(err) -} - -func (c *notificationCache) HandleSignalInvite(ctx context.Context, msg *sdkws.MsgData, pushToUserID string) (isSend bool, err error) { - req := &sdkws.SignalReq{} - if err := proto.Unmarshal(msg.Content, req); err != nil { - return false, errs.Wrap(err) - } - var inviteeUserIDs []string - var isInviteSignal bool - switch signalInfo := req.Payload.(type) { - case *sdkws.SignalReq_Invite: - inviteeUserIDs = signalInfo.Invite.Invitation.InviteeUserIDList - isInviteSignal = true - case *sdkws.SignalReq_InviteInGroup: - inviteeUserIDs = signalInfo.InviteInGroup.Invitation.InviteeUserIDList - isInviteSignal = true - if !utils.Contain(pushToUserID, inviteeUserIDs...) { - return false, nil - } - case *sdkws.SignalReq_HungUp, *sdkws.SignalReq_Cancel, *sdkws.SignalReq_Reject, *sdkws.SignalReq_Accept: - return false, errs.Wrap(errors.New("signalInfo do not need offlinePush")) - default: - return false, nil - } - if isInviteSignal { - pipe := c.rdb.Pipeline() - for _, userID := range inviteeUserIDs { - timeout, err := strconv.Atoi(config.Config.Rtc.SignalTimeout) - if err != nil { - return false, errs.Wrap(err) - } - keys := NotificationSignalListCache + userID - err = pipe.LPush(ctx, keys, msg.ClientMsgID).Err() - if err != nil { - return false, errs.Wrap(err) - } - err = pipe.Expire(ctx, keys, time.Duration(timeout)*time.Second).Err() - if err != nil { - return false, errs.Wrap(err) - } - key := NotificationSignalCache + msg.ClientMsgID - err = pipe.Set(ctx, key, msg.Content, time.Duration(timeout)*time.Second).Err() - if err != nil { - return false, errs.Wrap(err) - } - } - _, err := pipe.Exec(ctx) - if err != nil { - return false, errs.Wrap(err) - } - } - return true, nil -} - -func (c *notificationCache) GetSignalInvitationInfoByClientMsgID(ctx context.Context, clientMsgID string) (signalInviteReq *sdkws.SignalInviteReq, err error) { - bytes, err := c.rdb.Get(ctx, NotificationSignalCache+clientMsgID).Bytes() - if err != nil { - return nil, errs.Wrap(err) - } - signalReq := &sdkws.SignalReq{} - if err = proto.Unmarshal(bytes, signalReq); err != nil { - return nil, errs.Wrap(err) - } - signalInviteReq = &sdkws.SignalInviteReq{} - switch req := signalReq.Payload.(type) { - case *sdkws.SignalReq_Invite: - signalInviteReq.Invitation = req.Invite.Invitation - signalInviteReq.OpUserID = req.Invite.OpUserID - case *sdkws.SignalReq_InviteInGroup: - signalInviteReq.Invitation = req.InviteInGroup.Invitation - signalInviteReq.OpUserID = req.InviteInGroup.OpUserID - } - return signalInviteReq, nil -} - -func (c *notificationCache) GetAvailableSignalInvitationInfo(ctx context.Context, userID string) (invitationInfo *sdkws.SignalInviteReq, err error) { - key, err := c.rdb.LPop(ctx, NotificationSignalListCache+userID).Result() - if err != nil { - return nil, errs.Wrap(err) - } - invitationInfo, err = c.GetSignalInvitationInfoByClientMsgID(ctx, key) - if err != nil { - return nil, err - } - return invitationInfo, errs.Wrap(c.DelUserSignalList(ctx, userID)) -} - -func (c *notificationCache) DelUserSignalList(ctx context.Context, userID string) error { - return errs.Wrap(c.rdb.Del(ctx, NotificationSignalListCache+userID).Err()) -} - -func (c *notificationCache) DelMsgFromCache(ctx context.Context, userID string, seqs []int64) error { - for _, seq := range seqs { - key := c.getMessageCacheKey(userID, seq) - result, err := c.rdb.Get(ctx, key).Result() - if err != nil { - if err == redis.Nil { - continue - } - return errs.Wrap(err) - } - var msg sdkws.MsgData - if err := jsonpb.UnmarshalString(result, &msg); err != nil { - return err - } - msg.Status = constant.MsgDeleted - s, err := utils.Pb2String(&msg) - if err != nil { - return errs.Wrap(err) - } - if err := c.rdb.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil { - return errs.Wrap(err) - } - } - return nil -} - -func (c *notificationCache) SetGetuiToken(ctx context.Context, token string, expireTime int64) error { - return errs.Wrap(c.rdb.Set(ctx, NotificationGetuiToken, token, time.Duration(expireTime)*time.Second).Err()) -} - -func (c *notificationCache) GetGetuiToken(ctx context.Context) (string, error) { - return utils.Wrap2(c.rdb.Get(ctx, NotificationGetuiToken).Result()) -} - -func (c *notificationCache) SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error { - return errs.Wrap(c.rdb.Set(ctx, NotificationGetuiTaskID, taskID, time.Duration(expireTime)*time.Second).Err()) -} - -func (c *notificationCache) GetGetuiTaskID(ctx context.Context) (string, error) { - return utils.Wrap2(c.rdb.Get(ctx, NotificationGetuiTaskID).Result()) -} - -func (c *notificationCache) SetSendMsgStatus(ctx context.Context, id string, status int32) error { - return errs.Wrap(c.rdb.Set(ctx, NotificationSendMsgFailedFlag+id, status, time.Hour*24).Err()) -} - -func (c *notificationCache) GetSendMsgStatus(ctx context.Context, id string) (int32, error) { - result, err := c.rdb.Get(ctx, NotificationSendMsgFailedFlag+id).Int() - return int32(result), errs.Wrap(err) -} - -func (c *notificationCache) SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error) { - return errs.Wrap(c.rdb.Set(ctx, NotificationFcmToken+account+":"+strconv.Itoa(platformID), fcmToken, time.Duration(expireTime)*time.Second).Err()) -} - -func (c *notificationCache) GetFcmToken(ctx context.Context, account string, platformID int) (string, error) { - return utils.Wrap2(c.rdb.Get(ctx, NotificationFcmToken+account+":"+strconv.Itoa(platformID)).Result()) -} - -func (c *notificationCache) DelFcmToken(ctx context.Context, account string, platformID int) error { - return errs.Wrap(c.rdb.Del(ctx, NotificationFcmToken+account+":"+strconv.Itoa(platformID)).Err()) -} - -func (c *notificationCache) IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) { - seq, err := c.rdb.Incr(ctx, NotificationUserBadgeUnreadCountSum+userID).Result() - return int(seq), errs.Wrap(err) -} - -func (c *notificationCache) SetUserBadgeUnreadCountSum(ctx context.Context, userID string, value int) error { - return errs.Wrap(c.rdb.Set(ctx, NotificationUserBadgeUnreadCountSum+userID, value, 0).Err()) -} - -func (c *notificationCache) GetUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) { - return utils.Wrap2(c.rdb.Get(ctx, NotificationUserBadgeUnreadCountSum+userID).Int()) -} - -func (c *notificationCache) LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error { - key := NotificationExTypeKeyLocker + clientMsgID + "_" + TypeKey - return errs.Wrap(c.rdb.SetNX(ctx, key, 1, time.Minute).Err()) -} - -func (c *notificationCache) UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error { - key := NotificationExTypeKeyLocker + clientMsgID + "_" + TypeKey - return errs.Wrap(c.rdb.Del(ctx, key).Err()) -} - -func (c *notificationCache) getMessageReactionExPrefix(clientMsgID string, sessionType int32) string { - switch sessionType { - case constant.SingleChatType: - return "EX_SINGLE_" + clientMsgID - case constant.GroupChatType: - return "EX_GROUP_" + clientMsgID - case constant.SuperGroupChatType: - return "EX_SUPER_GROUP_" + clientMsgID - case constant.NotificationChatType: - return "EX_NOTIFICATION" + clientMsgID - } - return "" -} - -func (c *notificationCache) JudgeMessageReactionExist(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) { - n, err := c.rdb.Exists(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType)).Result() - if err != nil { - return false, utils.Wrap(err, "") - } - return n > 0, nil -} - -func (c *notificationCache) SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error { - return errs.Wrap(c.rdb.HSet(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey, value).Err()) -} - -func (c *notificationCache) SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) { - return utils.Wrap2(c.rdb.Expire(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), expiration).Result()) -} - -func (c *notificationCache) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) { - return utils.Wrap2(c.rdb.HGet(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey).Result()) -} - -func (c *notificationCache) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) { - return utils.Wrap2(c.rdb.HGetAll(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType)).Result()) -} - -func (c *notificationCache) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error { - return errs.Wrap(c.rdb.HDel(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), subKey).Err()) -} diff --git a/pkg/common/db/controller/common_msg.go b/pkg/common/db/controller/common_msg.go index 8216e4103..9195046fb 100644 --- a/pkg/common/db/controller/common_msg.go +++ b/pkg/common/db/controller/common_msg.go @@ -346,38 +346,12 @@ func (db *commonMsgDatabase) unmarshalMsg(msgInfo *unRelationTb.MsgInfoModel) (m } func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, conversationID string, seqs []int64) (seqMsgs []*sdkws.MsgData, err error) { - var hasSeqs []int64 - singleCount := 0 - m := db.msg.GetDocIDSeqsMap(conversationID, seqs) - for docID, value := range m { - doc, err := db.msgDocDatabase.FindOneByDocID(ctx, docID) - if err != nil { - log.ZError(ctx, "get message from mongo exception", err, "docID", docID) - continue - } - singleCount = 0 - for i := 0; i < len(doc.Msg); i++ { - msgPb, err := db.unmarshalMsg(&doc.Msg[i]) - if err != nil { - log.ZError(ctx, "unmarshal message exception", err, "docID", docID, "msg", &doc.Msg[i]) - return nil, err - } - if utils.Contain(msgPb.Seq, value...) { - seqMsgs = append(seqMsgs, msgPb) - hasSeqs = append(hasSeqs, msgPb.Seq) - singleCount++ - if singleCount == len(value) { - break - } - } - } + seqMsgs, unexistSeqs, err := db.findMsgBySeq(ctx, conversationID, seqs) + if err != nil { + return nil, err } - if len(hasSeqs) != len(seqs) { - var diff []int64 - var exceptionMsg []*sdkws.MsgData - diff = utils.Difference(hasSeqs, seqs) - exceptionMsg = db.msg.GenExceptionSuperGroupMessageBySeqs(diff, conversationID) - seqMsgs = append(seqMsgs, exceptionMsg...) + for _, unexistSeq := range unexistSeqs { + seqMsgs = append(seqMsgs, db.msg.GenExceptionMessageBySeqs([]int64{unexistSeq})...) } return seqMsgs, nil } diff --git a/pkg/common/db/controller/notification.go b/pkg/common/db/controller/notification.go deleted file mode 100644 index 032a0fa51..000000000 --- a/pkg/common/db/controller/notification.go +++ /dev/null @@ -1,721 +0,0 @@ -package controller - -import ( - "fmt" - "sync" - "time" - - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache" - unRelationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/unrelation" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/kafka" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/mcontext" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome" - "github.com/gogo/protobuf/sortkeys" - - "context" - "errors" - - pbMsg "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg" - "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" - "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" - "github.com/go-redis/redis/v8" - "go.mongodb.org/mongo-driver/mongo" - - "google.golang.org/protobuf/proto" -) - -type NotificationDatabase interface { - // 批量插入消息 - BatchInsertChat2DB(ctx context.Context, conversationID string, msgList []*sdkws.MsgData, currentMaxSeq int64) error - // 刪除redis中消息缓存 - DeleteMessageFromCache(ctx context.Context, conversationID string, msgList []*sdkws.MsgData) error - // incrSeq然后批量插入缓存 - BatchInsertChat2Cache(ctx context.Context, conversationID string, msgList []*sdkws.MsgData) (int64, error) - // 删除消息 返回不存在的seqList - DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error) - // 获取群ID或者UserID最新一条在mongo里面的消息 - // 通过seqList获取mongo中写扩散消息 - GetMsgBySeqs(ctx context.Context, userID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) - // 通过seqList获取大群在 mongo里面的消息 - GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (seqMsg []*sdkws.MsgData, err error) - // 删除用户所有消息/redis/mongo然后重置seq - CleanUpUserMsg(ctx context.Context, userID string) error - // 删除大群消息重置群成员最小群seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除 redis cache) - DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userIDs []string, remainTime int64) error - // 删除用户消息重置最小seq, remainTime为消息保留的时间单位秒,超时消息删除, 传0删除所有消息(此方法不删除redis cache) - DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error - // 获取用户 seq mongo和redis - GetUserMinMaxSeqInMongoAndCache(ctx context.Context, userID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) - // 获取群 seq mongo和redis - GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error) - // 设置群用户最小seq 直接调用cache - SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) - GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error) - - // 设置用户最小seq 直接调用cache - SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) - - JudgeMessageReactionExist(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) - - SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error - - SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) - GetExtendMsg(ctx context.Context, conversationID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (*pbMsg.ExtendMsg, error) - InsertOrUpdateReactionExtendMsgSet(ctx context.Context, conversationID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error - GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) - GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) - DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error - DeleteReactionExtendMsgSet(ctx context.Context, conversationID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensionList map[string]*sdkws.KeyValue) error - SetSendMsgStatus(ctx context.Context, id string, status int32) error - GetSendMsgStatus(ctx context.Context, id string) (int32, error) - GetUserMaxSeq(ctx context.Context, userID string) (int64, error) - GetUserMinSeq(ctx context.Context, userID string) (int64, error) - GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) - GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) - - MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error - MsgToModifyMQ(ctx context.Context, conversationID string, messages []*sdkws.MsgData) error - MsgToPushMQ(ctx context.Context, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error) - MsgToMongoMQ(ctx context.Context, conversationID string, messages []*sdkws.MsgData, lastSeq int64) error -} - -func NewNotificationDatabase(msgDocModel unRelationTb.NotificationDocModelInterface, cacheModel cache.NotificationModel) NotificationDatabase { - return ¬ificationDatabase{ - msgDocDatabase: msgDocModel, - cache: cacheModel, - producer: kafka.NewKafkaProducer(config.Config.Kafka.Ws2mschat.Addr, config.Config.Kafka.Ws2mschat.Topic), - producerToMongo: kafka.NewKafkaProducer(config.Config.Kafka.MsgToMongo.Addr, config.Config.Kafka.MsgToMongo.Topic), - producerToPush: kafka.NewKafkaProducer(config.Config.Kafka.Ms2pschat.Addr, config.Config.Kafka.Ms2pschat.Topic), - producerToModify: kafka.NewKafkaProducer(config.Config.Kafka.MsgToModify.Addr, config.Config.Kafka.MsgToModify.Topic), - } -} - -func InitNotificationDatabase(rdb redis.UniversalClient, database *mongo.Database) CommonMsgDatabase { - cacheModel := cache.NewMsgCacheModel(rdb) - msgDocModel := unrelation.NewMsgMongoDriver(database) - msgDatabase := NewCommonMsgDatabase(msgDocModel, cacheModel) - return msgDatabase -} - -type notificationDatabase struct { - msgDocDatabase unRelationTb.NotificationDocModelInterface - extendMsgDatabase unRelationTb.ExtendMsgSetModelInterface - cache cache.NotificationModel - producer *kafka.Producer - producerToMongo *kafka.Producer - producerToModify *kafka.Producer - producerToPush *kafka.Producer - // model - //msg unRelationTb.MsgDocModel - msg unRelationTb.NotificationDocModel - extendMsgSetModel unRelationTb.ExtendMsgSetModel -} - -func (db *notificationDatabase) JudgeMessageReactionExist(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) { - return db.cache.JudgeMessageReactionExist(ctx, clientMsgID, sessionType) -} - -func (db *notificationDatabase) SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error { - return db.cache.SetMessageTypeKeyValue(ctx, clientMsgID, sessionType, typeKey, value) -} - -func (db *notificationDatabase) SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) { - return db.cache.SetMessageReactionExpire(ctx, clientMsgID, sessionType, expiration) -} - -func (db *notificationDatabase) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) { - return db.cache.GetMessageTypeKeyValue(ctx, clientMsgID, sessionType, typeKey) -} - -func (db *notificationDatabase) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) { - return db.cache.GetOneMessageAllReactionList(ctx, clientMsgID, sessionType) -} - -func (db *notificationDatabase) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error { - return db.cache.DeleteOneMessageKey(ctx, clientMsgID, sessionType, subKey) -} - -func (db *notificationDatabase) InsertOrUpdateReactionExtendMsgSet(ctx context.Context, conversationID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensions map[string]*sdkws.KeyValue) error { - return db.extendMsgDatabase.InsertOrUpdateReactionExtendMsgSet(ctx, conversationID, sessionType, clientMsgID, msgFirstModifyTime, db.extendMsgSetModel.Pb2Model(reactionExtensions)) -} - -func (db *notificationDatabase) GetExtendMsg(ctx context.Context, conversationID string, sessionType int32, clientMsgID string, maxMsgUpdateTime int64) (*pbMsg.ExtendMsg, error) { - extendMsgSet, err := db.extendMsgDatabase.GetExtendMsgSet(ctx, conversationID, sessionType, maxMsgUpdateTime) - if err != nil { - return nil, err - } - extendMsg, ok := extendMsgSet.ExtendMsgs[clientMsgID] - if !ok { - return nil, errors.New(fmt.Sprintf("cant find client msg id: %s", clientMsgID)) - } - reactionExtensionList := make(map[string]*pbMsg.KeyValueResp) - for key, model := range extendMsg.ReactionExtensionList { - reactionExtensionList[key] = &pbMsg.KeyValueResp{ - KeyValue: &sdkws.KeyValue{ - TypeKey: model.TypeKey, - Value: model.Value, - LatestUpdateTime: model.LatestUpdateTime, - }, - } - } - return &pbMsg.ExtendMsg{ - ReactionExtensions: reactionExtensionList, - ClientMsgID: extendMsg.ClientMsgID, - MsgFirstModifyTime: extendMsg.MsgFirstModifyTime, - AttachedInfo: extendMsg.AttachedInfo, - Ex: extendMsg.Ex, - }, nil -} - -func (db *notificationDatabase) DeleteReactionExtendMsgSet(ctx context.Context, conversationID string, sessionType int32, clientMsgID string, msgFirstModifyTime int64, reactionExtensions map[string]*sdkws.KeyValue) error { - return db.extendMsgDatabase.DeleteReactionExtendMsgSet(ctx, conversationID, sessionType, clientMsgID, msgFirstModifyTime, db.extendMsgSetModel.Pb2Model(reactionExtensions)) -} - -func (db *notificationDatabase) SetSendMsgStatus(ctx context.Context, id string, status int32) error { - return db.cache.SetSendMsgStatus(ctx, id, status) -} - -func (db *notificationDatabase) GetSendMsgStatus(ctx context.Context, id string) (int32, error) { - return db.cache.GetSendMsgStatus(ctx, id) -} - -func (db *notificationDatabase) MsgToMQ(ctx context.Context, key string, msg2mq *sdkws.MsgData) error { - _, _, err := db.producer.SendMessage(ctx, key, msg2mq) - return err -} - -func (db *notificationDatabase) MsgToModifyMQ(ctx context.Context, conversationID string, messages []*sdkws.MsgData) error { - if len(messages) > 0 { - _, _, err := db.producerToModify.SendMessage(ctx, conversationID, &pbMsg.MsgDataToModifyByMQ{ConversationID: conversationID, Messages: messages}) - return err - } - return nil -} - -func (db *notificationDatabase) MsgToPushMQ(ctx context.Context, conversationID string, msg2mq *sdkws.MsgData) (int32, int64, error) { - mqPushMsg := pbMsg.PushMsgDataToMQ{MsgData: msg2mq, ConversationID: conversationID} - partition, offset, err := db.producerToPush.SendMessage(ctx, conversationID, &mqPushMsg) - if err != nil { - log.ZError(ctx, "MsgToPushMQ", err, "conversationID", conversationID, "msg2mq", msg2mq) - } - return partition, offset, err -} - -func (db *notificationDatabase) MsgToMongoMQ(ctx context.Context, conversationID string, messages []*sdkws.MsgData, lastSeq int64) error { - if len(messages) > 0 { - _, _, err := db.producerToModify.SendMessage(ctx, conversationID, &pbMsg.MsgDataToMongoByMQ{LastSeq: lastSeq, ConversationID: conversationID, MsgData: messages}) - return err - } - return nil -} -func (db *notificationDatabase) GetUserMaxSeq(ctx context.Context, userID string) (int64, error) { - return db.cache.GetUserMaxSeq(ctx, userID) -} - -func (db *notificationDatabase) GetUserMinSeq(ctx context.Context, userID string) (int64, error) { - return db.cache.GetUserMinSeq(ctx, userID) -} - -func (db *notificationDatabase) GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) { - return db.cache.GetGroupMaxSeq(ctx, groupID) -} - -func (db *notificationDatabase) GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) { - return db.cache.GetGroupMinSeq(ctx, groupID) -} - -func (db *notificationDatabase) BatchInsertChat2DB(ctx context.Context, conversationID string, msgList []*sdkws.MsgData, currentMaxSeq int64) error { - if int64(len(msgList)) > db.msg.GetsingleGocNotificationNum() { - return errors.New("too large") - } - var remain int64 - blk0 := db.msg.GetsingleGocNotificationNum() - 1 - //currentMaxSeq 4998 - if currentMaxSeq < db.msg.GetsingleGocNotificationNum() { - remain = blk0 - currentMaxSeq //1 - } else { - excludeBlk0 := currentMaxSeq - blk0 //=1 - //(5000-1)%5000 == 4999 - remain = (db.msg.GetsingleGocNotificationNum() - (excludeBlk0 % db.msg.GetsingleGocNotificationNum())) % db.msg.GetsingleGocNotificationNum() - } - //remain=1 - var insertCounter int64 - msgsToMongo := make([]unRelationTb.NotificationInfoModel, 0) - msgsToMongoNext := make([]unRelationTb.NotificationInfoModel, 0) - docID := "" - docIDNext := "" - var err error - for _, m := range msgList { - currentMaxSeq++ - sMsg := unRelationTb.NotificationInfoModel{} - sMsg.SendTime = m.SendTime - m.Seq = currentMaxSeq - if sMsg.Msg, err = proto.Marshal(m); err != nil { - return utils.Wrap(err, "") - } - if insertCounter < remain { - msgsToMongo = append(msgsToMongo, sMsg) - insertCounter++ - docID = db.msg.GetDocID(conversationID, currentMaxSeq) - } else { - msgsToMongoNext = append(msgsToMongoNext, sMsg) - docIDNext = db.msg.GetDocID(conversationID, currentMaxSeq) - } - } - - if docID != "" { - err = db.msgDocDatabase.PushMsgsToDoc(ctx, docID, msgsToMongo) - if err != nil { - if err == mongo.ErrNoDocuments { - doc := &unRelationTb.NotificationDocModel{} - doc.DocID = docID - doc.Msg = msgsToMongo - if err = db.msgDocDatabase.Create(ctx, doc); err != nil { - prome.Inc(prome.MsgInsertMongoFailedCounter) - return utils.Wrap(err, "") - } - prome.Inc(prome.MsgInsertMongoSuccessCounter) - } else { - prome.Inc(prome.MsgInsertMongoFailedCounter) - return utils.Wrap(err, "") - } - } else { - prome.Inc(prome.MsgInsertMongoSuccessCounter) - } - } - if docIDNext != "" { - nextDoc := &unRelationTb.NotificationDocModel{} - nextDoc.DocID = docIDNext - nextDoc.Msg = msgsToMongoNext - if err = db.msgDocDatabase.Create(ctx, nextDoc); err != nil { - prome.Inc(prome.MsgInsertMongoFailedCounter) - return utils.Wrap(err, "") - } - prome.Inc(prome.MsgInsertMongoSuccessCounter) - } - return nil -} - -func (db *notificationDatabase) DeleteMessageFromCache(ctx context.Context, userID string, msgs []*sdkws.MsgData) error { - return db.cache.DeleteMessageFromCache(ctx, userID, msgs) -} - -func (db *notificationDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgList []*sdkws.MsgData) (int64, error) { - //newTime := utils.GetCurrentTimestampByMill() - lenList := len(msgList) - if int64(lenList) > db.msg.GetsingleGocNotificationNum() { - return 0, errors.New("too large") - } - if lenList < 1 { - return 0, errors.New("too short as 0") - } - // judge sessionType to get seq - var currentMaxSeq int64 - var err error - if msgList[0].SessionType == constant.SuperGroupChatType { - currentMaxSeq, err = db.cache.GetGroupMaxSeq(ctx, conversationID) - //log.Debug(operationID, "constant.SuperGroupChatType lastMaxSeq before add ", currentMaxSeq, "userID ", conversationID, err) - } else { - currentMaxSeq, err = db.cache.GetUserMaxSeq(ctx, conversationID) - //log.Debug(operationID, "constant.SingleChatType lastMaxSeq before add ", currentMaxSeq, "userID ", conversationID, err) - } - if err != nil && err != redis.Nil { - prome.Inc(prome.SeqGetFailedCounter) - return 0, utils.Wrap(err, "") - } - prome.Inc(prome.SeqGetSuccessCounter) - lastMaxSeq := currentMaxSeq - for _, m := range msgList { - currentMaxSeq++ - m.Seq = currentMaxSeq - //log.Debug(operationID, "cache msg node ", m.String(), m.MsgData.ClientMsgID, "userID: ", conversationID, "seq: ", currentMaxSeq) - } - //log.Debug(operationID, "SetMessageToCache ", conversationID, len(msgList)) - failedNum, err := db.cache.SetMessageToCache(ctx, conversationID, msgList) - if err != nil { - prome.Add(prome.MsgInsertRedisFailedCounter, failedNum) - //log.Error(operationID, "setMessageToCache failed, continue ", err.Error(), len(msgList), conversationID) - } else { - prome.Inc(prome.MsgInsertRedisSuccessCounter) - } - //log.Debug(operationID, "batch to redis cost time ", mongo2.getCurrentTimestampByMill()-newTime, conversationID, len(msgList)) - if msgList[0].SessionType == constant.SuperGroupChatType { - err = db.cache.SetGroupMaxSeq(ctx, conversationID, currentMaxSeq) - } else { - err = db.cache.SetUserMaxSeq(ctx, conversationID, currentMaxSeq) - } - if err != nil { - prome.Inc(prome.SeqSetFailedCounter) - } else { - prome.Inc(prome.SeqSetSuccessCounter) - } - return lastMaxSeq, utils.Wrap(err, "") -} - -func (db *notificationDatabase) DelMsgBySeqs(ctx context.Context, userID string, seqs []int64) (totalUnExistSeqs []int64, err error) { - sortkeys.Int64s(seqs) - docIDSeqsMap := db.msg.GetDocIDSeqsMap(userID, seqs) - lock := sync.Mutex{} - var wg sync.WaitGroup - wg.Add(len(docIDSeqsMap)) - for k, v := range docIDSeqsMap { - go func(docID string, seqs []int64) { - defer wg.Done() - unExistSeqList, err := db.DelMsgBySeqsInOneDoc(ctx, docID, seqs) - if err != nil { - return - } - lock.Lock() - totalUnExistSeqs = append(totalUnExistSeqs, unExistSeqList...) - lock.Unlock() - }(k, v) - } - return totalUnExistSeqs, nil -} - -func (db *notificationDatabase) DelMsgBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (unExistSeqs []int64, err error) { - seqMsgs, indexes, unExistSeqs, err := db.GetMsgAndIndexBySeqsInOneDoc(ctx, docID, seqs) - if err != nil { - return nil, err - } - for i, v := range seqMsgs { - if err = db.msgDocDatabase.UpdateMsgStatusByIndexInOneDoc(ctx, docID, v, indexes[i], constant.MsgDeleted); err != nil { - return nil, err - } - } - return unExistSeqs, nil -} - -func (db *notificationDatabase) GetMsgAndIndexBySeqsInOneDoc(ctx context.Context, docID string, seqs []int64) (seqMsgs []*sdkws.MsgData, indexes []int, unExistSeqs []int64, err error) { - doc, err := db.msgDocDatabase.FindOneByDocID(ctx, docID) - if err != nil { - return nil, nil, nil, err - } - singleCount := 0 - var hasSeqList []int64 - for i := 0; i < len(doc.Msg); i++ { - msgPb, err := db.unmarshalMsg(&doc.Msg[i]) - if err != nil { - return nil, nil, nil, err - } - if utils.Contain(msgPb.Seq, seqs...) { - indexes = append(indexes, i) - seqMsgs = append(seqMsgs, msgPb) - hasSeqList = append(hasSeqList, msgPb.Seq) - singleCount++ - if singleCount == len(seqs) { - break - } - } - } - for _, i := range seqs { - if utils.Contain(i, hasSeqList...) { - continue - } - unExistSeqs = append(unExistSeqs, i) - } - return seqMsgs, indexes, unExistSeqs, nil -} - -func (db *notificationDatabase) GetNewestMsg(ctx context.Context, conversationID string) (msgPb *sdkws.MsgData, err error) { - msgInfo, err := db.msgDocDatabase.GetNewestMsg(ctx, conversationID) - if err != nil { - return nil, err - } - return db.unmarshalMsg(msgInfo) -} - -func (db *notificationDatabase) GetOldestMsg(ctx context.Context, conversationID string) (msgPb *sdkws.MsgData, err error) { - msgInfo, err := db.msgDocDatabase.GetOldestMsg(ctx, conversationID) - if err != nil { - return nil, err - } - return db.unmarshalMsg(msgInfo) -} - -func (db *notificationDatabase) unmarshalMsg(msgInfo *unRelationTb.NotificationInfoModel) (msgPb *sdkws.MsgData, err error) { - msgPb = &sdkws.MsgData{} - err = proto.Unmarshal(msgInfo.Msg, msgPb) - if err != nil { - return nil, utils.Wrap(err, "") - } - return msgPb, nil -} - -func (db *notificationDatabase) getMsgBySeqs(ctx context.Context, conversationID string, seqs []int64, diffusionType int) (seqMsgs []*sdkws.MsgData, err error) { - var hasSeqs []int64 - singleCount := 0 - m := db.msg.GetDocIDSeqsMap(conversationID, seqs) - for docID, value := range m { - doc, err := db.msgDocDatabase.FindOneByDocID(ctx, docID) - if err != nil { - //log.NewError(operationID, "not find seqUid", seqUid, value, uid, seqList, err.Error()) - continue - } - singleCount = 0 - for i := 0; i < len(doc.Msg); i++ { - msgPb, err := db.unmarshalMsg(&doc.Msg[i]) - if err != nil { - //log.NewError(operationID, "Unmarshal err", seqUid, value, uid, seqList, err.Error()) - return nil, err - } - if utils.Contain(msgPb.Seq, value...) { - seqMsgs = append(seqMsgs, msgPb) - hasSeqs = append(hasSeqs, msgPb.Seq) - singleCount++ - if singleCount == len(value) { - break - } - } - } - } - if len(hasSeqs) != len(seqs) { - var diff []int64 - var exceptionMsg []*sdkws.MsgData - diff = utils.Difference(hasSeqs, seqs) - if diffusionType == constant.WriteDiffusion { - exceptionMsg = db.msg.GenExceptionMessageBySeqs(diff) - } else if diffusionType == constant.ReadDiffusion { - exceptionMsg = db.msg.GenExceptionSuperGroupMessageBySeqs(diff, conversationID) - } - seqMsgs = append(seqMsgs, exceptionMsg...) - } - return seqMsgs, nil -} - -func (db *notificationDatabase) GetMsgBySeqs(ctx context.Context, userID string, seqs []int64) (successMsgs []*sdkws.MsgData, err error) { - successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, userID, seqs) - if err != nil { - if err != redis.Nil { - prome.Add(prome.MsgPullFromRedisFailedCounter, len(failedSeqs)) - log.Error(mcontext.GetOperationID(ctx), "get message from redis exception", err.Error(), failedSeqs) - } - } - prome.Add(prome.MsgPullFromRedisSuccessCounter, len(successMsgs)) - if len(failedSeqs) > 0 { - mongoMsgs, err := db.getMsgBySeqs(ctx, userID, seqs, constant.WriteDiffusion) - if err != nil { - prome.Add(prome.MsgPullFromMongoFailedCounter, len(failedSeqs)) - return nil, err - } - prome.Add(prome.MsgPullFromMongoSuccessCounter, len(mongoMsgs)) - successMsgs = append(successMsgs, mongoMsgs...) - } - return successMsgs, nil -} - -func (db *notificationDatabase) GetSuperGroupMsgBySeqs(ctx context.Context, groupID string, seqs []int64) (successMsgs []*sdkws.MsgData, err error) { - successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, groupID, seqs) - if err != nil { - if err != redis.Nil { - prome.Add(prome.MsgPullFromRedisFailedCounter, len(failedSeqs)) - log.Error(mcontext.GetOperationID(ctx), "get message from redis exception", err.Error(), failedSeqs) - } - } - prome.Add(prome.MsgPullFromRedisSuccessCounter, len(successMsgs)) - if len(failedSeqs) > 0 { - mongoMsgs, err := db.getMsgBySeqs(ctx, groupID, seqs, constant.ReadDiffusion) - if err != nil { - prome.Add(prome.MsgPullFromMongoFailedCounter, len(failedSeqs)) - return nil, err - } - prome.Add(prome.MsgPullFromMongoSuccessCounter, len(mongoMsgs)) - successMsgs = append(successMsgs, mongoMsgs...) - } - return successMsgs, nil -} - -func (db *notificationDatabase) CleanUpUserMsg(ctx context.Context, userID string) error { - err := db.DeleteUserMsgsAndSetMinSeq(ctx, userID, 0) - if err != nil { - return err - } - err = db.cache.CleanUpOneUserAllMsg(ctx, userID) - return utils.Wrap(err, "") -} - -func (db *notificationDatabase) DeleteUserSuperGroupMsgsAndSetMinSeq(ctx context.Context, groupID string, userIDs []string, remainTime int64) error { - var delStruct delNotificationRecursionStruct - minSeq, err := db.deleteMsgRecursion(ctx, groupID, unRelationTb.OldestList, &delStruct, remainTime) - if err != nil { - //log.NewError(operationID, utils.GetSelfFuncName(), groupID, "deleteMsg failed") - } - if minSeq == 0 { - return nil - } - //log.NewDebug(operationID, utils.GetSelfFuncName(), "delMsgIDList:", delStruct, "minSeq", minSeq) - for _, userID := range userIDs { - userMinSeq, err := db.cache.GetGroupUserMinSeq(ctx, groupID, userID) - if err != nil && err != redis.Nil { - //log.NewError(operationID, utils.GetSelfFuncName(), "GetGroupUserMinSeq failed", groupID, userID, err.Error()) - continue - } - if userMinSeq > minSeq { - err = db.cache.SetGroupUserMinSeq(ctx, groupID, userID, userMinSeq) - } else { - err = db.cache.SetGroupUserMinSeq(ctx, groupID, userID, minSeq) - } - if err != nil { - //log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), groupID, userID, userMinSeq, minSeq) - } - } - return nil -} - -func (db *notificationDatabase) DeleteUserMsgsAndSetMinSeq(ctx context.Context, userID string, remainTime int64) error { - var delStruct delNotificationRecursionStruct - minSeq, err := db.deleteMsgRecursion(ctx, userID, unRelationTb.OldestList, &delStruct, remainTime) - if err != nil { - return utils.Wrap(err, "") - } - if minSeq == 0 { - return nil - } - return db.cache.SetUserMinSeq(ctx, userID, minSeq) -} - -// this is struct for recursion -type delNotificationRecursionStruct struct { - minSeq int64 - delDocIDs []string -} - -func (d *delNotificationRecursionStruct) getSetMinSeq() int64 { - return d.minSeq -} - -// index 0....19(del) 20...69 -// seq 70 -// set minSeq 21 -// recursion 删除list并且返回设置的最小seq -func (db *notificationDatabase) deleteMsgRecursion(ctx context.Context, conversationID string, index int64, delStruct *delNotificationRecursionStruct, remainTime int64) (int64, error) { - // find from oldest list - msgs, err := db.msgDocDatabase.GetMsgsByIndex(ctx, conversationID, index) - if err != nil || msgs.DocID == "" { - if err != nil { - if err == unrelation.ErrMsgListNotExist { - log.NewDebug(mcontext.GetOperationID(ctx), utils.GetSelfFuncName(), "ID:", conversationID, "index:", index, err.Error()) - } else { - //log.NewError(operationID, utils.GetSelfFuncName(), "GetUserMsgListByIndex failed", err.Error(), index, ID) - } - } - // 获取报错,或者获取不到了,物理删除并且返回seq delMongoMsgsPhysical(delStruct.delDocIDList), 结束递归 - err = db.msgDocDatabase.Delete(ctx, delStruct.delDocIDs) - if err != nil { - return 0, err - } - return delStruct.getSetMinSeq() + 1, nil - } - //log.NewDebug(operationID, "ID:", conversationID, "index:", index, "uid:", msgs.UID, "len:", len(msgs.Msg)) - if int64(len(msgs.Msg)) > db.msg.GetsingleGocNotificationNum() { - log.ZWarn(ctx, "msgs too large", nil, "lenth", len(msgs.Msg), "docID:", msgs.DocID) - } - if msgs.Msg[len(msgs.Msg)-1].SendTime+(remainTime*1000) < utils.GetCurrentTimestampByMill() && msgs.IsFull() { - delStruct.delDocIDs = append(delStruct.delDocIDs, msgs.DocID) - lastMsgPb := &sdkws.MsgData{} - err = proto.Unmarshal(msgs.Msg[len(msgs.Msg)-1].Msg, lastMsgPb) - if err != nil { - //log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), len(msgs.Msg)-1, msgs.UID) - return 0, utils.Wrap(err, "proto.Unmarshal failed") - } - delStruct.minSeq = lastMsgPb.Seq - } else { - var hasMarkDelFlag bool - for _, msg := range msgs.Msg { - msgPb := &sdkws.MsgData{} - err = proto.Unmarshal(msg.Msg, msgPb) - if err != nil { - //log.NewError(operationID, utils.GetSelfFuncName(), err.Error(), len(msgs.Msg)-1, msgs.UID) - return 0, utils.Wrap(err, "proto.Unmarshal failed") - } - if utils.GetCurrentTimestampByMill() > msg.SendTime+(remainTime*1000) { - msgPb.Status = constant.MsgDeleted - bytes, _ := proto.Marshal(msgPb) - msg.Msg = bytes - msg.SendTime = 0 - hasMarkDelFlag = true - } else { - // 到本条消息不需要删除, minSeq置为这条消息的seq - if err := db.msgDocDatabase.Delete(ctx, delStruct.delDocIDs); err != nil { - return 0, err - } - if hasMarkDelFlag { - if err := db.msgDocDatabase.UpdateOneDoc(ctx, msgs); err != nil { - return delStruct.getSetMinSeq(), utils.Wrap(err, "") - } - } - return msgPb.Seq, nil - } - } - } - // 继续递归 index+1 - seq, err := db.deleteMsgRecursion(ctx, conversationID, index+1, delStruct, remainTime) - return seq, utils.Wrap(err, "deleteMsg failed") -} - -func (db *notificationDatabase) GetUserMinMaxSeqInMongoAndCache(ctx context.Context, userID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) { - minSeqMongo, maxSeqMongo, err = db.GetMinMaxSeqMongo(ctx, userID) - if err != nil { - return 0, 0, 0, 0, err - } - // from cache - minSeqCache, err = db.cache.GetUserMinSeq(ctx, userID) - if err != nil { - return 0, 0, 0, 0, err - } - maxSeqCache, err = db.cache.GetUserMaxSeq(ctx, userID) - if err != nil { - return 0, 0, 0, 0, err - } - return -} - -func (db *notificationDatabase) GetSuperGroupMinMaxSeqInMongoAndCache(ctx context.Context, groupID string) (minSeqMongo, maxSeqMongo, maxSeqCache int64, err error) { - minSeqMongo, maxSeqMongo, err = db.GetMinMaxSeqMongo(ctx, groupID) - if err != nil { - return 0, 0, 0, err - } - maxSeqCache, err = db.cache.GetGroupMaxSeq(ctx, groupID) - if err != nil { - return 0, 0, 0, err - } - return -} - -func (db *notificationDatabase) GetMinMaxSeqMongo(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo int64, err error) { - oldestMsgMongo, err := db.msgDocDatabase.GetOldestMsg(ctx, conversationID) - if err != nil { - return 0, 0, err - } - msgPb, err := db.unmarshalMsg(oldestMsgMongo) - if err != nil { - return 0, 0, err - } - minSeqMongo = msgPb.Seq - newestMsgMongo, err := db.msgDocDatabase.GetNewestMsg(ctx, conversationID) - if err != nil { - return 0, 0, err - } - msgPb, err = db.unmarshalMsg(newestMsgMongo) - if err != nil { - return 0, 0, err - } - maxSeqMongo = msgPb.Seq - return -} - -func (db *notificationDatabase) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) { - return db.cache.SetGroupUserMinSeq(ctx, groupID, userID, minSeq) -} - -func (db *notificationDatabase) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) { - return db.cache.SetUserMinSeq(ctx, userID, minSeq) -} - -func (db *notificationDatabase) GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error) { - return db.cache.GetGroupUserMinSeq(ctx, groupID, userID) -} diff --git a/pkg/common/db/localcache/conversation.go b/pkg/common/db/localcache/conversation.go index 5d2e11f30..a6c7251d4 100644 --- a/pkg/common/db/localcache/conversation.go +++ b/pkg/common/db/localcache/conversation.go @@ -17,12 +17,14 @@ type ConversationLocalCacheInterface interface { type ConversationLocalCache struct { lock sync.Mutex SuperGroupRecvMsgNotNotifyUserIDs map[string][]string + ConversationIDs map[string][]string client discoveryregistry.SvcDiscoveryRegistry } func NewConversationLocalCache(client discoveryregistry.SvcDiscoveryRegistry) *ConversationLocalCache { return &ConversationLocalCache{ SuperGroupRecvMsgNotNotifyUserIDs: make(map[string][]string, 0), + ConversationIDs: make(map[string][]string, 0), client: client, } } diff --git a/pkg/common/db/localcache/meta_local_cache.go b/pkg/common/db/localcache/meta_local_cache.go new file mode 100644 index 000000000..e0c30e523 --- /dev/null +++ b/pkg/common/db/localcache/meta_local_cache.go @@ -0,0 +1 @@ +package localcache diff --git a/pkg/common/db/unrelation/notification.go b/pkg/common/db/unrelation/notification.go deleted file mode 100644 index bd539d356..000000000 --- a/pkg/common/db/unrelation/notification.go +++ /dev/null @@ -1,134 +0,0 @@ -package unrelation - -import ( - "context" - "errors" - "fmt" - - table "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation" - "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" - "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/bson/primitive" - "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" - "google.golang.org/protobuf/proto" -) - -var ErrNotificationListNotExist = errors.New("user not have msg in mongoDB") -var ErrNotificationNotFound = errors.New("msg not found") - -type NotificationMongoDriver struct { - MsgCollection *mongo.Collection - msg table.NotificationDocModel -} - -func NewNotificationMongoDriver(database *mongo.Database) table.NotificationDocModelInterface { - return &NotificationMongoDriver{MsgCollection: database.Collection(table.NotificationDocModel{}.TableName())} -} - -func (m *NotificationMongoDriver) PushMsgsToDoc(ctx context.Context, docID string, msgsToMongo []table.NotificationInfoModel) error { - filter := bson.M{"doc_id": docID} - return m.MsgCollection.FindOneAndUpdate(ctx, filter, bson.M{"$push": bson.M{"msg": bson.M{"$each": msgsToMongo}}}).Err() -} - -func (m *NotificationMongoDriver) Create(ctx context.Context, model *table.NotificationDocModel) error { - _, err := m.MsgCollection.InsertOne(ctx, model) - return err -} - -func (m *NotificationMongoDriver) UpdateMsgStatusByIndexInOneDoc(ctx context.Context, docID string, msg *sdkws.MsgData, seqIndex int, status int32) error { - msg.Status = status - bytes, err := proto.Marshal(msg) - if err != nil { - return utils.Wrap(err, "") - } - _, err = m.MsgCollection.UpdateOne(ctx, bson.M{"doc_id": docID}, bson.M{"$set": bson.M{fmt.Sprintf("msg.%d.msg", seqIndex): bytes}}) - if err != nil { - return utils.Wrap(err, "") - } - return nil -} - -func (m *NotificationMongoDriver) FindOneByDocID(ctx context.Context, docID string) (*table.NotificationDocModel, error) { - doc := &table.NotificationDocModel{} - err := m.MsgCollection.FindOne(ctx, bson.M{"doc_id": docID}).Decode(doc) - return doc, err -} - -func (m *NotificationMongoDriver) GetMsgsByIndex(ctx context.Context, conversationID string, index int64) (*table.NotificationDocModel, error) { - findOpts := options.Find().SetLimit(1).SetSkip(index).SetSort(bson.M{"doc_id": 1}) - cursor, err := m.MsgCollection.Find(ctx, bson.M{"doc_id": primitive.Regex{Pattern: fmt.Sprintf("^%s:", conversationID)}}, findOpts) - if err != nil { - return nil, utils.Wrap(err, "") - } - var msgs []table.NotificationDocModel - err = cursor.All(context.Background(), &msgs) - if err != nil { - return nil, utils.Wrap(err, fmt.Sprintf("cursor is %s", cursor.Current.String())) - } - if len(msgs) > 0 { - return &msgs[0], nil - } - return nil, ErrMsgListNotExist -} - -func (m *NotificationMongoDriver) GetNewestMsg(ctx context.Context, conversationID string) (*table.NotificationInfoModel, error) { - var msgDocs []table.NotificationDocModel - cursor, err := m.MsgCollection.Find(ctx, bson.M{"doc_id": bson.M{"$regex": fmt.Sprintf("^%s:", conversationID)}}, options.Find().SetLimit(1).SetSort(bson.M{"doc_id": -1})) - if err != nil { - return nil, utils.Wrap(err, "") - } - err = cursor.All(ctx, &msgDocs) - if err != nil { - return nil, utils.Wrap(err, "") - } - if len(msgDocs) > 0 { - if len(msgDocs[0].Msg) > 0 { - return &msgDocs[0].Msg[len(msgDocs[0].Msg)-1], nil - } - return nil, errors.New("len(msgDocs[0].Msg) < 0") - } - return nil, ErrMsgNotFound -} - -func (m *NotificationMongoDriver) GetOldestMsg(ctx context.Context, conversationID string) (*table.NotificationInfoModel, error) { - var msgDocs []table.NotificationDocModel - cursor, err := m.MsgCollection.Find(ctx, bson.M{"doc_id": bson.M{"$regex": fmt.Sprintf("^%s:", conversationID)}}, options.Find().SetLimit(1).SetSort(bson.M{"doc_id": 1})) - if err != nil { - return nil, err - } - err = cursor.All(ctx, &msgDocs) - if err != nil { - return nil, utils.Wrap(err, "") - } - var oldestMsg table.NotificationInfoModel - if len(msgDocs) > 0 { - for _, v := range msgDocs[0].Msg { - if v.SendTime != 0 { - oldestMsg = v - break - } - } - if len(oldestMsg.Msg) == 0 { - if len(msgDocs[0].Msg) > 0 { - oldestMsg = msgDocs[0].Msg[0] - } - } - return &oldestMsg, nil - } - return nil, ErrMsgNotFound -} - -func (m *NotificationMongoDriver) Delete(ctx context.Context, docIDs []string) error { - if docIDs == nil { - return nil - } - _, err := m.MsgCollection.DeleteMany(ctx, bson.M{"doc_id": bson.M{"$in": docIDs}}) - return err -} - -func (m *NotificationMongoDriver) UpdateOneDoc(ctx context.Context, msg *table.NotificationDocModel) error { - _, err := m.MsgCollection.UpdateOne(ctx, bson.M{"doc_id": msg.DocID}, bson.M{"$set": bson.M{"msg": msg.Msg}}) - return err -}