diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index b1f10ea41..70bb5dfbb 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -2,8 +2,6 @@ package msgtransfer import ( "fmt" - "sync" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller" @@ -43,14 +41,16 @@ func StartTransfer(prometheusPort int) error { if err != nil { return err } - cacheModel := cache.NewCacheModel(rdb) + msgModel := cache.NewMsgCacheModel(rdb) + notificationModel := cache.NewNotificationCacheModel(rdb) msgDocModel := unrelation.NewMsgMongoDriver(mongo.GetDatabase()) + notificationDocModel := unrelation.NewNotificationMongoDriver(mongo.GetDatabase()) extendMsgModel := unrelation.NewExtendMsgSetMongoDriver(mongo.GetDatabase()) extendMsgCache := cache.NewExtendMsgSetCacheRedis(rdb, extendMsgModel, cache.GetDefaultOpt()) chatLogDatabase := controller.NewChatLogDatabase(relation.NewChatLogGorm(db)) extendMsgDatabase := controller.NewExtendMsgDatabase(extendMsgModel, extendMsgCache, tx.NewMongo(mongo.GetClient())) - msgDatabase := controller.NewMsgDatabase(msgDocModel, cacheModel) - notificationDatabase := controller.NewNotificationDatabase(msgDocModel, cacheModel) + msgDatabase := controller.NewMsgDatabase(msgDocModel, msgModel) + notificationDatabase := controller.NewNotificationDatabase(notificationDocModel, notificationModel) conversationRpcClient := rpcclient.NewConversationClient(client) msgTransfer := NewMsgTransfer(chatLogDatabase, extendMsgDatabase, msgDatabase, notificationDatabase, conversationRpcClient) @@ -77,8 +77,8 @@ func (m *MsgTransfer) initPrometheus() { } func (m *MsgTransfer) Start(prometheusPort int) error { - var wg sync.WaitGroup - wg.Add(4) + //var wg sync.WaitGroup + //wg.Add(4) fmt.Println("start msg transfer", "prometheusPort:", prometheusPort) if config.Config.ChatPersistenceMysql { go m.persistentCH.persistentConsumerGroup.RegisterHandleAndConsumer(m.persistentCH) @@ -92,6 +92,6 @@ func (m *MsgTransfer) Start(prometheusPort int) error { if err != nil { return err } - wg.Wait() + //wg.Wait() return nil } diff --git a/internal/push/offlinepush/fcm/push.go b/internal/push/offlinepush/fcm/push.go index a453b3606..66bde4056 100644 --- a/internal/push/offlinepush/fcm/push.go +++ b/internal/push/offlinepush/fcm/push.go @@ -20,10 +20,10 @@ var Terminal = []int{constant.IOSPlatformID, constant.AndroidPlatformID, constan type Fcm struct { fcmMsgCli *messaging.Client - cache cache.Model + cache cache.MsgModel } -func NewClient(cache cache.Model) *Fcm { +func NewClient(cache cache.MsgModel) *Fcm { opt := option.WithCredentialsFile(filepath.Join(config.Root, "config", config.Config.Push.Fcm.ServiceAccount)) fcmApp, err := firebase.NewApp(context.Background(), nil, opt) if err != nil { diff --git a/internal/push/offlinepush/fcm/push_test.go b/internal/push/offlinepush/fcm/push_test.go index e3a11e34b..81b54cfbb 100644 --- a/internal/push/offlinepush/fcm/push_test.go +++ b/internal/push/offlinepush/fcm/push_test.go @@ -9,7 +9,7 @@ import ( ) func Test_Push(t *testing.T) { - var redis cache.Model + var redis cache.MsgModel offlinePusher := NewClient(redis) err := offlinePusher.Push(context.Background(), []string{"userID1"}, "test", "test", &offlinepush.Opts{}) assert.Nil(t, err) diff --git a/internal/push/offlinepush/getui/push.go b/internal/push/offlinepush/getui/push.go index 0192b54a8..da1ec8ff8 100644 --- a/internal/push/offlinepush/getui/push.go +++ b/internal/push/offlinepush/getui/push.go @@ -38,12 +38,12 @@ const ( ) type Client struct { - cache cache.Model + cache cache.MsgModel tokenExpireTime int64 taskIDTTL int64 } -func NewClient(cache cache.Model) *Client { +func NewClient(cache cache.MsgModel) *Client { return &Client{cache: cache, tokenExpireTime: tokenExpireTime, taskIDTTL: taskIDTTL} } diff --git a/internal/push/push_rpc_server.go b/internal/push/push_rpc_server.go index cebf55ecb..ed7df20c5 100644 --- a/internal/push/push_rpc_server.go +++ b/internal/push/push_rpc_server.go @@ -21,7 +21,7 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e if err != nil { return err } - cacheModel := cache.NewCacheModel(rdb) + cacheModel := cache.NewMsgCacheModel(rdb) offlinePusher := NewOfflinePusher(cacheModel) database := controller.NewPushDatabase(cacheModel) pusher := NewPusher(client, offlinePusher, database, localcache.NewGroupLocalCache(client), localcache.NewConversationLocalCache(client)) diff --git a/internal/push/push_to_client.go b/internal/push/push_to_client.go index f2dc9135b..4c0984bfd 100644 --- a/internal/push/push_to_client.go +++ b/internal/push/push_to_client.go @@ -44,7 +44,7 @@ func NewPusher(client discoveryregistry.SvcDiscoveryRegistry, offlinePusher offl } } -func NewOfflinePusher(cache cache.Model) offlinepush.OfflinePusher { +func NewOfflinePusher(cache cache.MsgModel) offlinepush.OfflinePusher { var offlinePusher offlinepush.OfflinePusher if config.Config.Push.Getui.Enable { offlinePusher = getui.NewClient(cache) diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index 713a202e3..fd7433f65 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -32,7 +32,7 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e pbAuth.RegisterAuthServer(server, &authServer{ userRpcClient: rpcclient.NewUserClient(client), RegisterCenter: client, - authDatabase: controller.NewAuthDatabase(cache.NewCacheModel(rdb), config.Config.TokenPolicy.AccessSecret, config.Config.TokenPolicy.AccessExpire), + authDatabase: controller.NewAuthDatabase(cache.NewMsgCacheModel(rdb), config.Config.TokenPolicy.AccessSecret, config.Config.TokenPolicy.AccessExpire), }) return nil } diff --git a/internal/rpc/msg/lock.go b/internal/rpc/msg/lock.go index 9a57a727a..9b558f26c 100644 --- a/internal/rpc/msg/lock.go +++ b/internal/rpc/msg/lock.go @@ -15,10 +15,10 @@ type MessageLocker interface { UnLockGlobalMessage(ctx context.Context, clientMsgID string) (err error) } type LockerMessage struct { - cache cache.Model + cache cache.MsgModel } -func NewLockerMessage(cache cache.Model) *LockerMessage { +func NewLockerMessage(cache cache.MsgModel) *LockerMessage { return &LockerMessage{cache: cache} } func (l *LockerMessage) LockMessageTypeKey(ctx context.Context, clientMsgID, typeKey string) (err error) { diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index 8719fc26a..c30c199ad 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -57,7 +57,7 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e if err != nil { return err } - cacheModel := cache.NewCacheModel(rdb) + cacheModel := cache.NewMsgCacheModel(rdb) msgDocModel := unrelation.NewMsgMongoDriver(mongo.GetDatabase()) extendMsgModel := unrelation.NewExtendMsgSetMongoDriver(mongo.GetDatabase()) extendMsgCacheModel := cache.NewExtendMsgSetCacheRedis(rdb, extendMsgModel, cache.GetDefaultOpt()) diff --git a/internal/rpc/third/third.go b/internal/rpc/third/third.go index d3520463c..7a995ce5a 100644 --- a/internal/rpc/third/third.go +++ b/internal/rpc/third/third.go @@ -37,7 +37,7 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e return err } third.RegisterThirdServer(server, &thirdServer{ - thirdDatabase: controller.NewThirdDatabase(cache.NewCacheModel(rdb)), + thirdDatabase: controller.NewThirdDatabase(cache.NewMsgCacheModel(rdb)), userRpcClient: rpcclient.NewUserClient(client), s3dataBase: controller.NewS3Database(o, relation.NewObjectHash(db), relation.NewObjectInfo(db), relation.NewObjectPut(db), u), }) diff --git a/internal/tools/msg_test.go b/internal/tools/msg_test.go index c09c349ed..3bb09d90c 100644 --- a/internal/tools/msg_test.go +++ b/internal/tools/msg_test.go @@ -63,7 +63,7 @@ func TestDeleteMongoMsgAndResetRedisSeq(t *testing.T) { if err != nil { return } - cacheModel := cache.NewCacheModel(rdb) + cacheModel := cache.NewMsgCacheModel(rdb) mongoClient := mgo.GetDatabase().Collection(unRelationTb.MsgDocModel{}.TableName()) ctx := context.Background() diff --git a/pkg/common/db/cache/black.go b/pkg/common/db/cache/black.go index 6bf48f122..2ccd61367 100644 --- a/pkg/common/db/cache/black.go +++ b/pkg/common/db/cache/black.go @@ -14,13 +14,13 @@ const ( blackExpireTime = time.Second * 60 * 60 * 12 ) -// args fn will exec when no data in cache +// args fn will exec when no data in msgCache type BlackCache interface { - //get blackIDs from cache + //get blackIDs from msgCache metaCache NewCache() BlackCache GetBlackIDs(ctx context.Context, userID string) (blackIDs []string, err error) - //del user's blackIDs cache, exec when a user's black list changed + //del user's blackIDs msgCache, exec when a user's black list changed DelBlackIDs(ctx context.Context, userID string) BlackCache } diff --git a/pkg/common/db/cache/conversation.go b/pkg/common/db/cache/conversation.go index ede6417ce..77979e153 100644 --- a/pkg/common/db/cache/conversation.go +++ b/pkg/common/db/cache/conversation.go @@ -24,22 +24,22 @@ const ( conversationExpireTime = time.Second * 60 * 60 * 12 ) -// arg fn will exec when no data in cache +// arg fn will exec when no data in msgCache type ConversationCache interface { metaCache NewCache() ConversationCache - // get user's conversationIDs from cache + // get user's conversationIDs from msgCache GetUserConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) DelConversationIDs(userIDs []string) ConversationCache - // get one conversation from cache + // get one conversation from msgCache GetConversation(ctx context.Context, ownerUserID, conversationID string) (*relationTb.ConversationModel, error) DelConvsersations(ownerUserID string, conversationIDs []string) ConversationCache DelUsersConversation(conversationID string, ownerUserIDs ...string) ConversationCache - // get one conversation from cache + // get one conversation from msgCache GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*relationTb.ConversationModel, error) - // get one user's all conversations from cache + // get one user's all conversations from msgCache GetUserAllConversations(ctx context.Context, ownerUserID string) ([]*relationTb.ConversationModel, error) - // get user conversation recv msg from cache + // get user conversation recv msg from msgCache GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error) DelUserRecvMsgOpt(ownerUserID, conversationID string) ConversationCache // get one super group recv msg but do not notification userID list diff --git a/pkg/common/db/cache/friend.go b/pkg/common/db/cache/friend.go index b159aeb22..b8aa3f8ff 100644 --- a/pkg/common/db/cache/friend.go +++ b/pkg/common/db/cache/friend.go @@ -17,14 +17,14 @@ const ( friendKey = "FRIEND_INFO:" ) -// args fn will exec when no data in cache +// args fn will exec when no data in msgCache type FriendCache interface { metaCache NewCache() FriendCache GetFriendIDs(ctx context.Context, ownerUserID string) (friendIDs []string, err error) // call when friendID List changed DelFriendIDs(ownerUserID ...string) FriendCache - // get single friendInfo from cache + // get single friendInfo from msgCache GetFriend(ctx context.Context, ownerUserID, friendUserID string) (friend *relationTb.FriendModel, err error) // del friend when friend info changed DelFriend(ownerUserID, friendUserID string) FriendCache diff --git a/pkg/common/db/cache/redis.go b/pkg/common/db/cache/msg.go similarity index 70% rename from pkg/common/db/cache/redis.go rename to pkg/common/db/cache/msg.go index 1185d9608..f229d7f7c 100644 --- a/pkg/common/db/cache/redis.go +++ b/pkg/common/db/cache/msg.go @@ -20,26 +20,25 @@ import ( "github.com/go-redis/redis/v8" ) -const ( - userIncrSeq = "REDIS_USER_INCR_SEQ:" // user incr seq - appleDeviceToken = "DEVICE_TOKEN" - userMinSeq = "REDIS_USER_MIN_SEQ:" - getuiToken = "GETUI_TOKEN" - getuiTaskID = "GETUI_TASK_ID" - messageCache = "MESSAGE_CACHE:" - signalCache = "SIGNAL_CACHE:" - signalListCache = "SIGNAL_LIST_CACHE:" - FcmToken = "FCM_TOKEN:" - groupUserMinSeq = "GROUP_USER_MIN_SEQ:" - groupMaxSeq = "GROUP_MAX_SEQ:" - groupMinSeq = "GROUP_MIN_SEQ:" - sendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:" - userBadgeUnreadCountSum = "USER_BADGE_UNREAD_COUNT_SUM:" - exTypeKeyLocker = "EX_LOCK:" - uidPidToken = "UID_PID_TOKEN_STATUS:" -) +//const ( +// userIncrSeq = "REDIS_USER_INCR_SEQ:" // user incr seq +// userMinSeq = "REDIS_USER_MIN_SEQ:" +// getuiToken = "GETUI_TOKEN" +// getuiTaskID = "GETUI_TASK_ID" +// messageCache = "MESSAGE_CACHE:" +// signalCache = "SIGNAL_CACHE:" +// signalListCache = "SIGNAL_LIST_CACHE:" +// FcmToken = "FCM_TOKEN:" +// groupUserMinSeq = "GROUP_USER_MIN_SEQ:" +// groupMaxSeq = "GROUP_MAX_SEQ:" +// groupMinSeq = "GROUP_MIN_SEQ:" +// sendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:" +// userBadgeUnreadCountSum = "USER_BADGE_UNREAD_COUNT_SUM:" +// exTypeKeyLocker = "EX_LOCK:" +// uidPidToken = "UID_PID_TOKEN_STATUS:" +//) -type Model interface { +type MsgModel interface { IncrUserSeq(ctx context.Context, userID string) (int64, error) GetUserMaxSeq(ctx context.Context, userID string) (int64, error) SetUserMaxSeq(ctx context.Context, userID string, maxSeq int64) error @@ -87,16 +86,16 @@ type Model interface { UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error } -func NewCacheModel(client redis.UniversalClient) Model { - return &cache{rdb: client} +func NewMsgCacheModel(client redis.UniversalClient) MsgModel { + return &msgCache{rdb: client} } -type cache struct { +type msgCache struct { rdb redis.UniversalClient } // 兼容老版本调用 -func (c *cache) DelKeys() { +func (c *msgCache) DelKeys() { for _, key := range []string{"GROUP_CACHE:", "FRIEND_RELATION_CACHE:", "BLACK_LIST_CACHE:", "USER_INFO_CACHE:", "GROUP_INFO_CACHE:", "JOINED_GROUP_LIST_CACHE:", "GROUP_MEMBER_INFO_CACHE:", "GROUP_ALL_MEMBER_INFO_CACHE:", "ALL_FRIEND_INFO_CACHE:"} { fName := utils.GetSelfFuncName() @@ -127,66 +126,66 @@ func (c *cache) DelKeys() { } } -func (c *cache) IncrUserSeq(ctx context.Context, userID string) (int64, error) { +func (c *msgCache) IncrUserSeq(ctx context.Context, userID string) (int64, error) { return utils.Wrap2(c.rdb.Get(ctx, userIncrSeq+userID).Int64()) } -func (c *cache) GetUserMaxSeq(ctx context.Context, userID string) (int64, error) { +func (c *msgCache) GetUserMaxSeq(ctx context.Context, userID string) (int64, error) { return utils.Wrap2(c.rdb.Get(ctx, userIncrSeq+userID).Int64()) } -func (c *cache) SetUserMaxSeq(ctx context.Context, userID string, maxSeq int64) error { +func (c *msgCache) SetUserMaxSeq(ctx context.Context, userID string, maxSeq int64) error { return errs.Wrap(c.rdb.Set(ctx, userIncrSeq+userID, maxSeq, 0).Err()) } -func (c *cache) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) { +func (c *msgCache) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) { return errs.Wrap(c.rdb.Set(ctx, userMinSeq+userID, minSeq, 0).Err()) } -func (c *cache) GetUserMinSeq(ctx context.Context, userID string) (int64, error) { +func (c *msgCache) GetUserMinSeq(ctx context.Context, userID string) (int64, error) { return utils.Wrap2(c.rdb.Get(ctx, userMinSeq+userID).Int64()) } -func (c *cache) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) { +func (c *msgCache) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) { key := groupUserMinSeq + "g:" + groupID + "u:" + userID return errs.Wrap(c.rdb.Set(ctx, key, minSeq, 0).Err()) } -func (c *cache) GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error) { +func (c *msgCache) GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error) { key := groupUserMinSeq + "g:" + groupID + "u:" + userID return utils.Wrap2(c.rdb.Get(ctx, key).Int64()) } -func (c *cache) GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) { +func (c *msgCache) GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) { return utils.Wrap2(c.rdb.Get(ctx, groupMaxSeq+groupID).Int64()) } -func (c *cache) GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) { +func (c *msgCache) GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) { return utils.Wrap2(c.rdb.Get(ctx, groupMinSeq+groupID).Int64()) } -func (c *cache) IncrGroupMaxSeq(ctx context.Context, groupID string) (int64, error) { +func (c *msgCache) IncrGroupMaxSeq(ctx context.Context, groupID string) (int64, error) { key := groupMaxSeq + groupID seq, err := c.rdb.Incr(ctx, key).Uint64() return int64(seq), errs.Wrap(err) } -func (c *cache) SetGroupMaxSeq(ctx context.Context, groupID string, maxSeq int64) error { +func (c *msgCache) SetGroupMaxSeq(ctx context.Context, groupID string, maxSeq int64) error { key := groupMaxSeq + groupID return errs.Wrap(c.rdb.Set(ctx, key, maxSeq, 0).Err()) } -func (c *cache) SetGroupMinSeq(ctx context.Context, groupID string, minSeq int64) error { +func (c *msgCache) SetGroupMinSeq(ctx context.Context, groupID string, minSeq int64) error { key := groupMinSeq + groupID return errs.Wrap(c.rdb.Set(ctx, key, minSeq, 0).Err()) } -func (c *cache) AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error { +func (c *msgCache) AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error { key := uidPidToken + userID + ":" + constant.PlatformIDToName(platformID) return errs.Wrap(c.rdb.HSet(ctx, key, token, flag).Err()) } -func (c *cache) GetTokensWithoutError(ctx context.Context, userID, platformID string) (map[string]int, error) { +func (c *msgCache) GetTokensWithoutError(ctx context.Context, userID, platformID string) (map[string]int, error) { key := uidPidToken + userID + ":" + platformID m, err := c.rdb.HGetAll(ctx, key).Result() if err != nil { @@ -199,7 +198,7 @@ func (c *cache) GetTokensWithoutError(ctx context.Context, userID, platformID st return mm, nil } -func (c *cache) SetTokenMapByUidPid(ctx context.Context, userID string, platform string, m map[string]int) error { +func (c *msgCache) SetTokenMapByUidPid(ctx context.Context, userID string, platform string, m map[string]int) error { key := uidPidToken + userID + ":" + platform mm := make(map[string]interface{}) for k, v := range m { @@ -208,20 +207,20 @@ func (c *cache) SetTokenMapByUidPid(ctx context.Context, userID string, platform return errs.Wrap(c.rdb.HSet(ctx, key, mm).Err()) } -func (c *cache) DeleteTokenByUidPid(ctx context.Context, userID string, platform string, fields []string) error { +func (c *msgCache) DeleteTokenByUidPid(ctx context.Context, userID string, platform string, fields []string) error { key := uidPidToken + userID + ":" + platform return errs.Wrap(c.rdb.HDel(ctx, key, fields...).Err()) } -func (c *cache) getMessageCacheKey(sourceID string, seq int64) string { +func (c *msgCache) getMessageCacheKey(sourceID string, seq int64) string { return messageCache + sourceID + "_" + strconv.Itoa(int(seq)) } -func (c *cache) allMessageCacheKey(sourceID string) string { +func (c *msgCache) allMessageCacheKey(sourceID string) string { return messageCache + sourceID + "_*" } -func (c *cache) GetMessagesBySeq(ctx context.Context, userID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) { +func (c *msgCache) GetMessagesBySeq(ctx context.Context, userID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) { pipe := c.rdb.Pipeline() for _, v := range seqs { //MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1 @@ -247,7 +246,7 @@ func (c *cache) GetMessagesBySeq(ctx context.Context, userID string, seqs []int6 return seqMsgs, failedSeqs, err } -func (c *cache) SetMessageToCache(ctx context.Context, userID string, msgList []*sdkws.MsgData) (int, error) { +func (c *msgCache) SetMessageToCache(ctx context.Context, userID string, msgList []*sdkws.MsgData) (int, error) { pipe := c.rdb.Pipeline() var failedMsgs []sdkws.MsgData for _, msg := range msgList { @@ -262,13 +261,13 @@ func (c *cache) SetMessageToCache(ctx context.Context, userID string, msgList [] } } if len(failedMsgs) != 0 { - return len(failedMsgs), fmt.Errorf("set msg to cache failed, failed lists: %v, %s", failedMsgs, userID) + return len(failedMsgs), fmt.Errorf("set msg to msgCache failed, failed lists: %v, %s", failedMsgs, userID) } _, err := pipe.Exec(ctx) return 0, err } -func (c *cache) DeleteMessageFromCache(ctx context.Context, userID string, msgList []*sdkws.MsgData) error { +func (c *msgCache) DeleteMessageFromCache(ctx context.Context, userID string, msgList []*sdkws.MsgData) error { pipe := c.rdb.Pipeline() for _, v := range msgList { if err := pipe.Del(ctx, c.getMessageCacheKey(userID, v.Seq)).Err(); err != nil { @@ -279,7 +278,7 @@ func (c *cache) DeleteMessageFromCache(ctx context.Context, userID string, msgLi return errs.Wrap(err) } -func (c *cache) CleanUpOneUserAllMsg(ctx context.Context, userID string) error { +func (c *msgCache) CleanUpOneUserAllMsg(ctx context.Context, userID string) error { vals, err := c.rdb.Keys(ctx, c.allMessageCacheKey(userID)).Result() if err == redis.Nil { return nil @@ -297,7 +296,7 @@ func (c *cache) CleanUpOneUserAllMsg(ctx context.Context, userID string) error { return errs.Wrap(err) } -func (c *cache) HandleSignalInvite(ctx context.Context, msg *sdkws.MsgData, pushToUserID string) (isSend bool, err error) { +func (c *msgCache) HandleSignalInvite(ctx context.Context, msg *sdkws.MsgData, pushToUserID string) (isSend bool, err error) { req := &sdkws.SignalReq{} if err := proto.Unmarshal(msg.Content, req); err != nil { return false, errs.Wrap(err) @@ -349,7 +348,7 @@ func (c *cache) HandleSignalInvite(ctx context.Context, msg *sdkws.MsgData, push return true, nil } -func (c *cache) GetSignalInvitationInfoByClientMsgID(ctx context.Context, clientMsgID string) (signalInviteReq *sdkws.SignalInviteReq, err error) { +func (c *msgCache) GetSignalInvitationInfoByClientMsgID(ctx context.Context, clientMsgID string) (signalInviteReq *sdkws.SignalInviteReq, err error) { bytes, err := c.rdb.Get(ctx, signalCache+clientMsgID).Bytes() if err != nil { return nil, errs.Wrap(err) @@ -370,7 +369,7 @@ func (c *cache) GetSignalInvitationInfoByClientMsgID(ctx context.Context, client return signalInviteReq, nil } -func (c *cache) GetAvailableSignalInvitationInfo(ctx context.Context, userID string) (invitationInfo *sdkws.SignalInviteReq, err error) { +func (c *msgCache) GetAvailableSignalInvitationInfo(ctx context.Context, userID string) (invitationInfo *sdkws.SignalInviteReq, err error) { key, err := c.rdb.LPop(ctx, signalListCache+userID).Result() if err != nil { return nil, errs.Wrap(err) @@ -382,11 +381,11 @@ func (c *cache) GetAvailableSignalInvitationInfo(ctx context.Context, userID str return invitationInfo, errs.Wrap(c.DelUserSignalList(ctx, userID)) } -func (c *cache) DelUserSignalList(ctx context.Context, userID string) error { +func (c *msgCache) DelUserSignalList(ctx context.Context, userID string) error { return errs.Wrap(c.rdb.Del(ctx, signalListCache+userID).Err()) } -func (c *cache) DelMsgFromCache(ctx context.Context, userID string, seqs []int64) error { +func (c *msgCache) DelMsgFromCache(ctx context.Context, userID string, seqs []int64) error { for _, seq := range seqs { key := c.getMessageCacheKey(userID, seq) result, err := c.rdb.Get(ctx, key).Result() @@ -412,67 +411,67 @@ func (c *cache) DelMsgFromCache(ctx context.Context, userID string, seqs []int64 return nil } -func (c *cache) SetGetuiToken(ctx context.Context, token string, expireTime int64) error { +func (c *msgCache) SetGetuiToken(ctx context.Context, token string, expireTime int64) error { return errs.Wrap(c.rdb.Set(ctx, getuiToken, token, time.Duration(expireTime)*time.Second).Err()) } -func (c *cache) GetGetuiToken(ctx context.Context) (string, error) { +func (c *msgCache) GetGetuiToken(ctx context.Context) (string, error) { return utils.Wrap2(c.rdb.Get(ctx, getuiToken).Result()) } -func (c *cache) SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error { +func (c *msgCache) SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error { return errs.Wrap(c.rdb.Set(ctx, getuiTaskID, taskID, time.Duration(expireTime)*time.Second).Err()) } -func (c *cache) GetGetuiTaskID(ctx context.Context) (string, error) { +func (c *msgCache) GetGetuiTaskID(ctx context.Context) (string, error) { return utils.Wrap2(c.rdb.Get(ctx, getuiTaskID).Result()) } -func (c *cache) SetSendMsgStatus(ctx context.Context, id string, status int32) error { +func (c *msgCache) SetSendMsgStatus(ctx context.Context, id string, status int32) error { return errs.Wrap(c.rdb.Set(ctx, sendMsgFailedFlag+id, status, time.Hour*24).Err()) } -func (c *cache) GetSendMsgStatus(ctx context.Context, id string) (int32, error) { +func (c *msgCache) GetSendMsgStatus(ctx context.Context, id string) (int32, error) { result, err := c.rdb.Get(ctx, sendMsgFailedFlag+id).Int() return int32(result), errs.Wrap(err) } -func (c *cache) SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error) { +func (c *msgCache) SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error) { return errs.Wrap(c.rdb.Set(ctx, FcmToken+account+":"+strconv.Itoa(platformID), fcmToken, time.Duration(expireTime)*time.Second).Err()) } -func (c *cache) GetFcmToken(ctx context.Context, account string, platformID int) (string, error) { +func (c *msgCache) GetFcmToken(ctx context.Context, account string, platformID int) (string, error) { return utils.Wrap2(c.rdb.Get(ctx, FcmToken+account+":"+strconv.Itoa(platformID)).Result()) } -func (c *cache) DelFcmToken(ctx context.Context, account string, platformID int) error { +func (c *msgCache) DelFcmToken(ctx context.Context, account string, platformID int) error { return errs.Wrap(c.rdb.Del(ctx, FcmToken+account+":"+strconv.Itoa(platformID)).Err()) } -func (c *cache) IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) { +func (c *msgCache) IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) { seq, err := c.rdb.Incr(ctx, userBadgeUnreadCountSum+userID).Result() return int(seq), errs.Wrap(err) } -func (c *cache) SetUserBadgeUnreadCountSum(ctx context.Context, userID string, value int) error { +func (c *msgCache) SetUserBadgeUnreadCountSum(ctx context.Context, userID string, value int) error { return errs.Wrap(c.rdb.Set(ctx, userBadgeUnreadCountSum+userID, value, 0).Err()) } -func (c *cache) GetUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) { +func (c *msgCache) GetUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) { return utils.Wrap2(c.rdb.Get(ctx, userBadgeUnreadCountSum+userID).Int()) } -func (c *cache) LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error { +func (c *msgCache) LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error { key := exTypeKeyLocker + clientMsgID + "_" + TypeKey return errs.Wrap(c.rdb.SetNX(ctx, key, 1, time.Minute).Err()) } -func (c *cache) UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error { +func (c *msgCache) UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error { key := exTypeKeyLocker + clientMsgID + "_" + TypeKey return errs.Wrap(c.rdb.Del(ctx, key).Err()) } -func (c *cache) getMessageReactionExPrefix(clientMsgID string, sessionType int32) string { +func (c *msgCache) getMessageReactionExPrefix(clientMsgID string, sessionType int32) string { switch sessionType { case constant.SingleChatType: return "EX_SINGLE_" + clientMsgID @@ -486,7 +485,7 @@ func (c *cache) getMessageReactionExPrefix(clientMsgID string, sessionType int32 return "" } -func (c *cache) JudgeMessageReactionExist(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) { +func (c *msgCache) JudgeMessageReactionExist(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) { n, err := c.rdb.Exists(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType)).Result() if err != nil { return false, utils.Wrap(err, "") @@ -494,22 +493,22 @@ func (c *cache) JudgeMessageReactionExist(ctx context.Context, clientMsgID strin return n > 0, nil } -func (c *cache) SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error { +func (c *msgCache) SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error { return errs.Wrap(c.rdb.HSet(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey, value).Err()) } -func (c *cache) SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) { +func (c *msgCache) SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) { return utils.Wrap2(c.rdb.Expire(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), expiration).Result()) } -func (c *cache) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) { +func (c *msgCache) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) { return utils.Wrap2(c.rdb.HGet(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey).Result()) } -func (c *cache) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) { +func (c *msgCache) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) { return utils.Wrap2(c.rdb.HGetAll(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType)).Result()) } -func (c *cache) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error { +func (c *msgCache) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error { return errs.Wrap(c.rdb.HDel(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), subKey).Err()) } diff --git a/pkg/common/db/cache/notification.go b/pkg/common/db/cache/notification.go new file mode 100644 index 000000000..8bf691c79 --- /dev/null +++ b/pkg/common/db/cache/notification.go @@ -0,0 +1,514 @@ +package cache + +import ( + "context" + "errors" + "fmt" + "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" + "strconv" + "time" + + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" + "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" + "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" + "github.com/gogo/protobuf/jsonpb" + + "google.golang.org/protobuf/proto" + + "github.com/go-redis/redis/v8" +) + +const ( + NotificationUserIncrSeq = "REDIS_USER_INCR_SEQ:" // user incr seq + NotificationUserMinSeq = "REDIS_USER_MIN_SEQ:" + NotificationGetuiToken = "GETUI_TOKEN" + NotificationGetuiTaskID = "GETUI_TASK_ID" + NotificationMessageCache = "MESSAGE_CACHE:" + NotificationSignalCache = "SIGNAL_CACHE:" + NotificationSignalListCache = "SIGNAL_LIST_CACHE:" + NotificationFcmToken = "FCM_TOKEN:" + NotificationGroupUserMinSeq = "GROUP_USER_MIN_SEQ:" + NotificationGroupMaxSeq = "GROUP_MAX_SEQ:" + NotificationGroupMinSeq = "GROUP_MIN_SEQ:" + NotificationSendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:" + NotificationUserBadgeUnreadCountSum = "USER_BADGE_UNREAD_COUNT_SUM:" + NotificationExTypeKeyLocker = "EX_LOCK:" + NotificationUidPidToken = "UID_PID_TOKEN_STATUS:" +) + +type NotificationModel interface { + IncrUserSeq(ctx context.Context, userID string) (int64, error) + GetUserMaxSeq(ctx context.Context, userID string) (int64, error) + SetUserMaxSeq(ctx context.Context, userID string, maxSeq int64) error + SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) + GetUserMinSeq(ctx context.Context, userID string) (int64, error) + SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) + GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error) + GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) + GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) + IncrGroupMaxSeq(ctx context.Context, groupID string) (int64, error) + SetGroupMaxSeq(ctx context.Context, groupID string, maxSeq int64) error + SetGroupMinSeq(ctx context.Context, groupID string, minSeq int64) error + AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error + GetTokensWithoutError(ctx context.Context, userID, platformID string) (map[string]int, error) + SetTokenMapByUidPid(ctx context.Context, userID string, platform string, m map[string]int) error + DeleteTokenByUidPid(ctx context.Context, userID string, platform string, fields []string) error + GetMessagesBySeq(ctx context.Context, userID string, seqList []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error) + SetMessageToCache(ctx context.Context, userID string, msgList []*sdkws.MsgData) (int, error) + DeleteMessageFromCache(ctx context.Context, userID string, msgList []*sdkws.MsgData) error + CleanUpOneUserAllMsg(ctx context.Context, userID string) error + HandleSignalInvite(ctx context.Context, msg *sdkws.MsgData, pushToUserID string) (isSend bool, err error) + GetSignalInvitationInfoByClientMsgID(ctx context.Context, clientMsgID string) (invitationInfo *sdkws.SignalInviteReq, err error) + GetAvailableSignalInvitationInfo(ctx context.Context, userID string) (invitationInfo *sdkws.SignalInviteReq, err error) + DelUserSignalList(ctx context.Context, userID string) error + DelMsgFromCache(ctx context.Context, userID string, seqList []int64) error + SetGetuiToken(ctx context.Context, token string, expireTime int64) error + GetGetuiToken(ctx context.Context) (string, error) + SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error + GetGetuiTaskID(ctx context.Context) (string, error) + SetSendMsgStatus(ctx context.Context, id string, status int32) error + GetSendMsgStatus(ctx context.Context, id string) (int32, error) + SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error) + GetFcmToken(ctx context.Context, account string, platformID int) (string, error) + DelFcmToken(ctx context.Context, account string, platformID int) error + IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) + SetUserBadgeUnreadCountSum(ctx context.Context, userID string, value int) error + GetUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) + JudgeMessageReactionExist(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) + GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) + DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error + SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) + GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) + SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error + LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error + UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error +} + +func NewNotificationCacheModel(client redis.UniversalClient) NotificationModel { + return ¬ificationCache{rdb: client} +} + +type notificationCache struct { + rdb redis.UniversalClient +} + +// 兼容老版本调用 +func (c *notificationCache) DelKeys() { + for _, key := range []string{"GROUP_CACHE:", "FRIEND_RELATION_CACHE:", "BLACK_LIST_CACHE:", "USER_INFO_CACHE:", "GROUP_INFO_CACHE:", "JOINED_GROUP_LIST_CACHE:", + "GROUP_MEMBER_INFO_CACHE:", "GROUP_ALL_MEMBER_INFO_CACHE:", "ALL_FRIEND_INFO_CACHE:"} { + fName := utils.GetSelfFuncName() + var cursor uint64 + var n int + for { + var keys []string + var err error + keys, cursor, err = c.rdb.Scan(context.Background(), cursor, key+"*", scanCount).Result() + if err != nil { + panic(err.Error()) + } + n += len(keys) + // for each for redis cluster + for _, key := range keys { + if err = c.rdb.Del(context.Background(), key).Err(); err != nil { + log.NewError("", fName, key, err.Error()) + err = c.rdb.Del(context.Background(), key).Err() + if err != nil { + panic(err.Error()) + } + } + } + if cursor == 0 { + break + } + } + } +} + +func (c *notificationCache) IncrUserSeq(ctx context.Context, userID string) (int64, error) { + return utils.Wrap2(c.rdb.Get(ctx, NotificationUserIncrSeq+userID).Int64()) +} + +func (c *notificationCache) GetUserMaxSeq(ctx context.Context, userID string) (int64, error) { + return utils.Wrap2(c.rdb.Get(ctx, NotificationUserIncrSeq+userID).Int64()) +} + +func (c *notificationCache) SetUserMaxSeq(ctx context.Context, userID string, maxSeq int64) error { + return errs.Wrap(c.rdb.Set(ctx, NotificationUserIncrSeq+userID, maxSeq, 0).Err()) +} + +func (c *notificationCache) SetUserMinSeq(ctx context.Context, userID string, minSeq int64) (err error) { + return errs.Wrap(c.rdb.Set(ctx, NotificationUserMinSeq+userID, minSeq, 0).Err()) +} + +func (c *notificationCache) GetUserMinSeq(ctx context.Context, userID string) (int64, error) { + return utils.Wrap2(c.rdb.Get(ctx, NotificationUserMinSeq+userID).Int64()) +} + +func (c *notificationCache) SetGroupUserMinSeq(ctx context.Context, groupID, userID string, minSeq int64) (err error) { + key := NotificationGroupUserMinSeq + "g:" + groupID + "u:" + userID + return errs.Wrap(c.rdb.Set(ctx, key, minSeq, 0).Err()) +} + +func (c *notificationCache) GetGroupUserMinSeq(ctx context.Context, groupID, userID string) (int64, error) { + key := NotificationGroupUserMinSeq + "g:" + groupID + "u:" + userID + return utils.Wrap2(c.rdb.Get(ctx, key).Int64()) +} + +func (c *notificationCache) GetGroupMaxSeq(ctx context.Context, groupID string) (int64, error) { + return utils.Wrap2(c.rdb.Get(ctx, NotificationGroupMaxSeq+groupID).Int64()) +} + +func (c *notificationCache) GetGroupMinSeq(ctx context.Context, groupID string) (int64, error) { + return utils.Wrap2(c.rdb.Get(ctx, NotificationGroupMinSeq+groupID).Int64()) +} + +func (c *notificationCache) IncrGroupMaxSeq(ctx context.Context, groupID string) (int64, error) { + key := NotificationGroupMaxSeq + groupID + seq, err := c.rdb.Incr(ctx, key).Uint64() + return int64(seq), errs.Wrap(err) +} + +func (c *notificationCache) SetGroupMaxSeq(ctx context.Context, groupID string, maxSeq int64) error { + key := NotificationGroupMaxSeq + groupID + return errs.Wrap(c.rdb.Set(ctx, key, maxSeq, 0).Err()) +} + +func (c *notificationCache) SetGroupMinSeq(ctx context.Context, groupID string, minSeq int64) error { + key := NotificationGroupMinSeq + groupID + return errs.Wrap(c.rdb.Set(ctx, key, minSeq, 0).Err()) +} + +func (c *notificationCache) AddTokenFlag(ctx context.Context, userID string, platformID int, token string, flag int) error { + key := NotificationUidPidToken + userID + ":" + constant.PlatformIDToName(platformID) + return errs.Wrap(c.rdb.HSet(ctx, key, token, flag).Err()) +} + +func (c *notificationCache) GetTokensWithoutError(ctx context.Context, userID, platformID string) (map[string]int, error) { + key := NotificationUidPidToken + userID + ":" + platformID + m, err := c.rdb.HGetAll(ctx, key).Result() + if err != nil { + return nil, errs.Wrap(err) + } + mm := make(map[string]int) + for k, v := range m { + mm[k] = utils.StringToInt(v) + } + return mm, nil +} + +func (c *notificationCache) SetTokenMapByUidPid(ctx context.Context, userID string, platform string, m map[string]int) error { + key := NotificationUidPidToken + userID + ":" + platform + mm := make(map[string]interface{}) + for k, v := range m { + mm[k] = v + } + return errs.Wrap(c.rdb.HSet(ctx, key, mm).Err()) +} + +func (c *notificationCache) DeleteTokenByUidPid(ctx context.Context, userID string, platform string, fields []string) error { + key := NotificationUidPidToken + userID + ":" + platform + return errs.Wrap(c.rdb.HDel(ctx, key, fields...).Err()) +} + +func (c *notificationCache) getMessageCacheKey(sourceID string, seq int64) string { + return NotificationMessageCache + sourceID + "_" + strconv.Itoa(int(seq)) +} + +func (c *notificationCache) allMessageCacheKey(sourceID string) string { + return NotificationMessageCache + sourceID + "_*" +} + +func (c *notificationCache) GetMessagesBySeq(ctx context.Context, userID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) { + pipe := c.rdb.Pipeline() + for _, v := range seqs { + //MESSAGE_CACHE:169.254.225.224_reliability1653387820_0_1 + key := c.getMessageCacheKey(userID, v) + if err := pipe.Get(ctx, key).Err(); err != nil && err != redis.Nil { + return nil, nil, err + } + } + result, err := pipe.Exec(ctx) + for i, v := range result { + if v.Err() != nil { + failedSeqs = append(failedSeqs, seqs[i]) + } else { + msg := sdkws.MsgData{} + err = jsonpb.UnmarshalString(v.String(), &msg) + if err != nil { + failedSeqs = append(failedSeqs, seqs[i]) + } else { + seqMsgs = append(seqMsgs, &msg) + } + } + } + return seqMsgs, failedSeqs, err +} + +func (c *notificationCache) SetMessageToCache(ctx context.Context, userID string, msgList []*sdkws.MsgData) (int, error) { + pipe := c.rdb.Pipeline() + var failedMsgs []sdkws.MsgData + for _, msg := range msgList { + key := c.getMessageCacheKey(userID, msg.Seq) + s, err := utils.Pb2String(msg) + if err != nil { + return 0, errs.Wrap(err) + } + err = pipe.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err() + if err != nil { + return 0, errs.Wrap(err) + } + } + if len(failedMsgs) != 0 { + return len(failedMsgs), fmt.Errorf("set msg to notificationCache failed, failed lists: %v, %s", failedMsgs, userID) + } + _, err := pipe.Exec(ctx) + return 0, err +} + +func (c *notificationCache) DeleteMessageFromCache(ctx context.Context, userID string, msgList []*sdkws.MsgData) error { + pipe := c.rdb.Pipeline() + for _, v := range msgList { + if err := pipe.Del(ctx, c.getMessageCacheKey(userID, v.Seq)).Err(); err != nil { + return errs.Wrap(err) + } + } + _, err := pipe.Exec(ctx) + return errs.Wrap(err) +} + +func (c *notificationCache) CleanUpOneUserAllMsg(ctx context.Context, userID string) error { + vals, err := c.rdb.Keys(ctx, c.allMessageCacheKey(userID)).Result() + if err == redis.Nil { + return nil + } + if err != nil { + return errs.Wrap(err) + } + pipe := c.rdb.Pipeline() + for _, v := range vals { + if err := pipe.Del(ctx, v).Err(); err != nil { + return errs.Wrap(err) + } + } + _, err = pipe.Exec(ctx) + return errs.Wrap(err) +} + +func (c *notificationCache) HandleSignalInvite(ctx context.Context, msg *sdkws.MsgData, pushToUserID string) (isSend bool, err error) { + req := &sdkws.SignalReq{} + if err := proto.Unmarshal(msg.Content, req); err != nil { + return false, errs.Wrap(err) + } + var inviteeUserIDs []string + var isInviteSignal bool + switch signalInfo := req.Payload.(type) { + case *sdkws.SignalReq_Invite: + inviteeUserIDs = signalInfo.Invite.Invitation.InviteeUserIDList + isInviteSignal = true + case *sdkws.SignalReq_InviteInGroup: + inviteeUserIDs = signalInfo.InviteInGroup.Invitation.InviteeUserIDList + isInviteSignal = true + if !utils.Contain(pushToUserID, inviteeUserIDs...) { + return false, nil + } + case *sdkws.SignalReq_HungUp, *sdkws.SignalReq_Cancel, *sdkws.SignalReq_Reject, *sdkws.SignalReq_Accept: + return false, errs.Wrap(errors.New("signalInfo do not need offlinePush")) + default: + return false, nil + } + if isInviteSignal { + pipe := c.rdb.Pipeline() + for _, userID := range inviteeUserIDs { + timeout, err := strconv.Atoi(config.Config.Rtc.SignalTimeout) + if err != nil { + return false, errs.Wrap(err) + } + keys := NotificationSignalListCache + userID + err = pipe.LPush(ctx, keys, msg.ClientMsgID).Err() + if err != nil { + return false, errs.Wrap(err) + } + err = pipe.Expire(ctx, keys, time.Duration(timeout)*time.Second).Err() + if err != nil { + return false, errs.Wrap(err) + } + key := NotificationSignalCache + msg.ClientMsgID + err = pipe.Set(ctx, key, msg.Content, time.Duration(timeout)*time.Second).Err() + if err != nil { + return false, errs.Wrap(err) + } + } + _, err := pipe.Exec(ctx) + if err != nil { + return false, errs.Wrap(err) + } + } + return true, nil +} + +func (c *notificationCache) GetSignalInvitationInfoByClientMsgID(ctx context.Context, clientMsgID string) (signalInviteReq *sdkws.SignalInviteReq, err error) { + bytes, err := c.rdb.Get(ctx, NotificationSignalCache+clientMsgID).Bytes() + if err != nil { + return nil, errs.Wrap(err) + } + signalReq := &sdkws.SignalReq{} + if err = proto.Unmarshal(bytes, signalReq); err != nil { + return nil, errs.Wrap(err) + } + signalInviteReq = &sdkws.SignalInviteReq{} + switch req := signalReq.Payload.(type) { + case *sdkws.SignalReq_Invite: + signalInviteReq.Invitation = req.Invite.Invitation + signalInviteReq.OpUserID = req.Invite.OpUserID + case *sdkws.SignalReq_InviteInGroup: + signalInviteReq.Invitation = req.InviteInGroup.Invitation + signalInviteReq.OpUserID = req.InviteInGroup.OpUserID + } + return signalInviteReq, nil +} + +func (c *notificationCache) GetAvailableSignalInvitationInfo(ctx context.Context, userID string) (invitationInfo *sdkws.SignalInviteReq, err error) { + key, err := c.rdb.LPop(ctx, NotificationSignalListCache+userID).Result() + if err != nil { + return nil, errs.Wrap(err) + } + invitationInfo, err = c.GetSignalInvitationInfoByClientMsgID(ctx, key) + if err != nil { + return nil, err + } + return invitationInfo, errs.Wrap(c.DelUserSignalList(ctx, userID)) +} + +func (c *notificationCache) DelUserSignalList(ctx context.Context, userID string) error { + return errs.Wrap(c.rdb.Del(ctx, NotificationSignalListCache+userID).Err()) +} + +func (c *notificationCache) DelMsgFromCache(ctx context.Context, userID string, seqs []int64) error { + for _, seq := range seqs { + key := c.getMessageCacheKey(userID, seq) + result, err := c.rdb.Get(ctx, key).Result() + if err != nil { + if err == redis.Nil { + continue + } + return errs.Wrap(err) + } + var msg sdkws.MsgData + if err := jsonpb.UnmarshalString(result, &msg); err != nil { + return err + } + msg.Status = constant.MsgDeleted + s, err := utils.Pb2String(&msg) + if err != nil { + return errs.Wrap(err) + } + if err := c.rdb.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err(); err != nil { + return errs.Wrap(err) + } + } + return nil +} + +func (c *notificationCache) SetGetuiToken(ctx context.Context, token string, expireTime int64) error { + return errs.Wrap(c.rdb.Set(ctx, NotificationGetuiToken, token, time.Duration(expireTime)*time.Second).Err()) +} + +func (c *notificationCache) GetGetuiToken(ctx context.Context) (string, error) { + return utils.Wrap2(c.rdb.Get(ctx, NotificationGetuiToken).Result()) +} + +func (c *notificationCache) SetGetuiTaskID(ctx context.Context, taskID string, expireTime int64) error { + return errs.Wrap(c.rdb.Set(ctx, NotificationGetuiTaskID, taskID, time.Duration(expireTime)*time.Second).Err()) +} + +func (c *notificationCache) GetGetuiTaskID(ctx context.Context) (string, error) { + return utils.Wrap2(c.rdb.Get(ctx, NotificationGetuiTaskID).Result()) +} + +func (c *notificationCache) SetSendMsgStatus(ctx context.Context, id string, status int32) error { + return errs.Wrap(c.rdb.Set(ctx, NotificationSendMsgFailedFlag+id, status, time.Hour*24).Err()) +} + +func (c *notificationCache) GetSendMsgStatus(ctx context.Context, id string) (int32, error) { + result, err := c.rdb.Get(ctx, NotificationSendMsgFailedFlag+id).Int() + return int32(result), errs.Wrap(err) +} + +func (c *notificationCache) SetFcmToken(ctx context.Context, account string, platformID int, fcmToken string, expireTime int64) (err error) { + return errs.Wrap(c.rdb.Set(ctx, NotificationFcmToken+account+":"+strconv.Itoa(platformID), fcmToken, time.Duration(expireTime)*time.Second).Err()) +} + +func (c *notificationCache) GetFcmToken(ctx context.Context, account string, platformID int) (string, error) { + return utils.Wrap2(c.rdb.Get(ctx, NotificationFcmToken+account+":"+strconv.Itoa(platformID)).Result()) +} + +func (c *notificationCache) DelFcmToken(ctx context.Context, account string, platformID int) error { + return errs.Wrap(c.rdb.Del(ctx, NotificationFcmToken+account+":"+strconv.Itoa(platformID)).Err()) +} + +func (c *notificationCache) IncrUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) { + seq, err := c.rdb.Incr(ctx, NotificationUserBadgeUnreadCountSum+userID).Result() + return int(seq), errs.Wrap(err) +} + +func (c *notificationCache) SetUserBadgeUnreadCountSum(ctx context.Context, userID string, value int) error { + return errs.Wrap(c.rdb.Set(ctx, NotificationUserBadgeUnreadCountSum+userID, value, 0).Err()) +} + +func (c *notificationCache) GetUserBadgeUnreadCountSum(ctx context.Context, userID string) (int, error) { + return utils.Wrap2(c.rdb.Get(ctx, NotificationUserBadgeUnreadCountSum+userID).Int()) +} + +func (c *notificationCache) LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error { + key := NotificationExTypeKeyLocker + clientMsgID + "_" + TypeKey + return errs.Wrap(c.rdb.SetNX(ctx, key, 1, time.Minute).Err()) +} + +func (c *notificationCache) UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error { + key := NotificationExTypeKeyLocker + clientMsgID + "_" + TypeKey + return errs.Wrap(c.rdb.Del(ctx, key).Err()) +} + +func (c *notificationCache) getMessageReactionExPrefix(clientMsgID string, sessionType int32) string { + switch sessionType { + case constant.SingleChatType: + return "EX_SINGLE_" + clientMsgID + case constant.GroupChatType: + return "EX_GROUP_" + clientMsgID + case constant.SuperGroupChatType: + return "EX_SUPER_GROUP_" + clientMsgID + case constant.NotificationChatType: + return "EX_NOTIFICATION" + clientMsgID + } + return "" +} + +func (c *notificationCache) JudgeMessageReactionExist(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) { + n, err := c.rdb.Exists(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType)).Result() + if err != nil { + return false, utils.Wrap(err, "") + } + return n > 0, nil +} + +func (c *notificationCache) SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error { + return errs.Wrap(c.rdb.HSet(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey, value).Err()) +} + +func (c *notificationCache) SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) { + return utils.Wrap2(c.rdb.Expire(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), expiration).Result()) +} + +func (c *notificationCache) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) { + return utils.Wrap2(c.rdb.HGet(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey).Result()) +} + +func (c *notificationCache) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) { + return utils.Wrap2(c.rdb.HGetAll(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType)).Result()) +} + +func (c *notificationCache) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error { + return errs.Wrap(c.rdb.HDel(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), subKey).Err()) +} diff --git a/pkg/common/db/cache/rockscache.go b/pkg/common/db/cache/rockscache.go index 4c911f369..44e13b992 100644 --- a/pkg/common/db/cache/rockscache.go +++ b/pkg/common/db/cache/rockscache.go @@ -87,11 +87,11 @@ func getCache[T any](ctx context.Context, rcClient *rockscache.Client, key strin return t, nil } if v == "" { - return t, errs.ErrRecordNotFound.Wrap("cache is not found") + return t, errs.ErrRecordNotFound.Wrap("msgCache is not found") } err = json.Unmarshal([]byte(v), &t) if err != nil { - log.ZError(ctx, "cache json.Unmarshal failed", err, "key", key, "value", v, "expire", expire) + log.ZError(ctx, "msgCache json.Unmarshal failed", err, "key", key, "value", v, "expire", expire) return t, utils.Wrap(err, "") } return t, nil diff --git a/pkg/common/db/controller/auth.go b/pkg/common/db/controller/auth.go index 167ff80d0..148ef6c96 100644 --- a/pkg/common/db/controller/auth.go +++ b/pkg/common/db/controller/auth.go @@ -18,13 +18,13 @@ type AuthDatabase interface { } type authDatabase struct { - cache cache.Model + cache cache.MsgModel accessSecret string accessExpire int64 } -func NewAuthDatabase(cache cache.Model, accessSecret string, accessExpire int64) AuthDatabase { +func NewAuthDatabase(cache cache.MsgModel, accessSecret string, accessExpire int64) AuthDatabase { return &authDatabase{cache: cache, accessSecret: accessSecret, accessExpire: accessExpire} } diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index c0d8ee7e6..1cf962f16 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -85,7 +85,7 @@ type MsgDatabase interface { MsgToMongoMQ(ctx context.Context, aggregationID string, messages []*sdkws.MsgData, lastSeq int64) error } -func NewMsgDatabase(msgDocModel unRelationTb.MsgDocModelInterface, cacheModel cache.Model) MsgDatabase { +func NewMsgDatabase(msgDocModel unRelationTb.MsgDocModelInterface, cacheModel cache.MsgModel) MsgDatabase { return &msgDatabase{ msgDocDatabase: msgDocModel, cache: cacheModel, @@ -97,7 +97,7 @@ func NewMsgDatabase(msgDocModel unRelationTb.MsgDocModelInterface, cacheModel ca } func InitMsgDatabase(rdb redis.UniversalClient, database *mongo.Database) MsgDatabase { - cacheModel := cache.NewCacheModel(rdb) + cacheModel := cache.NewMsgCacheModel(rdb) msgDocModel := unrelation.NewMsgMongoDriver(database) msgDatabase := NewMsgDatabase(msgDocModel, cacheModel) return msgDatabase @@ -106,7 +106,7 @@ func InitMsgDatabase(rdb redis.UniversalClient, database *mongo.Database) MsgDat type msgDatabase struct { msgDocDatabase unRelationTb.MsgDocModelInterface extendMsgDatabase unRelationTb.ExtendMsgSetModelInterface - cache cache.Model + cache cache.MsgModel producer *kafka.Producer producerToMongo *kafka.Producer producerToModify *kafka.Producer diff --git a/pkg/common/db/controller/notification.go b/pkg/common/db/controller/notification.go index 1bbcdf5b3..af0c7c989 100644 --- a/pkg/common/db/controller/notification.go +++ b/pkg/common/db/controller/notification.go @@ -85,7 +85,7 @@ type NotificationDatabase interface { MsgToMongoMQ(ctx context.Context, aggregationID string, messages []*sdkws.MsgData, lastSeq int64) error } -func NewNotificationDatabase(msgDocModel unRelationTb.MsgDocModelInterface, cacheModel cache.Model) NotificationDatabase { +func NewNotificationDatabase(msgDocModel unRelationTb.NotificationDocModelInterface, cacheModel cache.NotificationModel) NotificationDatabase { return ¬ificationDatabase{ msgDocDatabase: msgDocModel, cache: cacheModel, @@ -97,22 +97,23 @@ func NewNotificationDatabase(msgDocModel unRelationTb.MsgDocModelInterface, cach } func InitNotificationDatabase(rdb redis.UniversalClient, database *mongo.Database) MsgDatabase { - cacheModel := cache.NewCacheModel(rdb) + cacheModel := cache.NewMsgCacheModel(rdb) msgDocModel := unrelation.NewMsgMongoDriver(database) msgDatabase := NewMsgDatabase(msgDocModel, cacheModel) return msgDatabase } type notificationDatabase struct { - msgDocDatabase unRelationTb.MsgDocModelInterface + msgDocDatabase unRelationTb.NotificationDocModelInterface extendMsgDatabase unRelationTb.ExtendMsgSetModelInterface - cache cache.Model + cache cache.NotificationModel producer *kafka.Producer producerToMongo *kafka.Producer producerToModify *kafka.Producer producerToPush *kafka.Producer // model - msg unRelationTb.MsgDocModel + //msg unRelationTb.MsgDocModel + msg unRelationTb.NotificationDocModel extendMsgSetModel unRelationTb.ExtendMsgSetModel } @@ -231,30 +232,30 @@ func (db *notificationDatabase) GetGroupMinSeq(ctx context.Context, groupID stri func (db *notificationDatabase) BatchInsertChat2DB(ctx context.Context, sourceID string, msgList []*sdkws.MsgData, currentMaxSeq int64) error { //newTime := utils.GetCurrentTimestampByMill() - if int64(len(msgList)) > db.msg.GetSingleGocMsgNum() { + if int64(len(msgList)) > db.msg.GetsingleGocNotificationNum() { return errors.New("too large") } var remain int64 - blk0 := db.msg.GetSingleGocMsgNum() - 1 + blk0 := db.msg.GetsingleGocNotificationNum() - 1 //currentMaxSeq 4998 - if currentMaxSeq < db.msg.GetSingleGocMsgNum() { + if currentMaxSeq < db.msg.GetsingleGocNotificationNum() { remain = blk0 - currentMaxSeq //1 } else { excludeBlk0 := currentMaxSeq - blk0 //=1 //(5000-1)%5000 == 4999 - remain = (db.msg.GetSingleGocMsgNum() - (excludeBlk0 % db.msg.GetSingleGocMsgNum())) % db.msg.GetSingleGocMsgNum() + remain = (db.msg.GetsingleGocNotificationNum() - (excludeBlk0 % db.msg.GetsingleGocNotificationNum())) % db.msg.GetsingleGocNotificationNum() } //remain=1 var insertCounter int64 - msgsToMongo := make([]unRelationTb.MsgInfoModel, 0) - msgsToMongoNext := make([]unRelationTb.MsgInfoModel, 0) + msgsToMongo := make([]unRelationTb.NotificationInfoModel, 0) + msgsToMongoNext := make([]unRelationTb.NotificationInfoModel, 0) docID := "" docIDNext := "" var err error for _, m := range msgList { //log.Debug(operationID, "msg node ", m.String(), m.MsgData.ClientMsgID) currentMaxSeq++ - sMsg := unRelationTb.MsgInfoModel{} + sMsg := unRelationTb.NotificationInfoModel{} sMsg.SendTime = m.SendTime m.Seq = currentMaxSeq if sMsg.Msg, err = proto.Marshal(m); err != nil { @@ -279,7 +280,7 @@ func (db *notificationDatabase) BatchInsertChat2DB(ctx context.Context, sourceID err = db.msgDocDatabase.PushMsgsToDoc(ctx, docID, msgsToMongo) if err != nil { if err == mongo.ErrNoDocuments { - doc := &unRelationTb.MsgDocModel{} + doc := &unRelationTb.NotificationDocModel{} doc.DocID = docID doc.Msg = msgsToMongo if err = db.msgDocDatabase.Create(ctx, doc); err != nil { @@ -298,7 +299,7 @@ func (db *notificationDatabase) BatchInsertChat2DB(ctx context.Context, sourceID } } if docIDNext != "" { - nextDoc := &unRelationTb.MsgDocModel{} + nextDoc := &unRelationTb.NotificationDocModel{} nextDoc.DocID = docIDNext nextDoc.Msg = msgsToMongoNext //log.NewDebug(operationID, "filter ", seqUidNext, "list ", msgListToMongoNext, "userID: ", userID) @@ -324,7 +325,7 @@ func (db *notificationDatabase) NotificationBatchInsertChat2Cache(ctx context.Co func (db *notificationDatabase) BatchInsertChat2Cache(ctx context.Context, sourceID string, msgList []*sdkws.MsgData) (int64, error) { //newTime := utils.GetCurrentTimestampByMill() lenList := len(msgList) - if int64(lenList) > db.msg.GetSingleGocMsgNum() { + if int64(lenList) > db.msg.GetsingleGocNotificationNum() { return 0, errors.New("too large") } if lenList < 1 { @@ -454,7 +455,7 @@ func (db *notificationDatabase) GetOldestMsg(ctx context.Context, sourceID strin return db.unmarshalMsg(msgInfo) } -func (db *notificationDatabase) unmarshalMsg(msgInfo *unRelationTb.MsgInfoModel) (msgPb *sdkws.MsgData, err error) { +func (db *notificationDatabase) unmarshalMsg(msgInfo *unRelationTb.NotificationInfoModel) (msgPb *sdkws.MsgData, err error) { msgPb = &sdkws.MsgData{} err = proto.Unmarshal(msgInfo.Msg, msgPb) if err != nil { @@ -628,7 +629,7 @@ func (db *notificationDatabase) deleteMsgRecursion(ctx context.Context, sourceID return delStruct.getSetMinSeq() + 1, nil } //log.NewDebug(operationID, "ID:", sourceID, "index:", index, "uid:", msgs.UID, "len:", len(msgs.Msg)) - if int64(len(msgs.Msg)) > db.msg.GetSingleGocMsgNum() { + if int64(len(msgs.Msg)) > db.msg.GetsingleGocNotificationNum() { log.ZWarn(ctx, "msgs too large", nil, "lenth", len(msgs.Msg), "docID:", msgs.DocID) } if msgs.Msg[len(msgs.Msg)-1].SendTime+(remainTime*1000) < utils.GetCurrentTimestampByMill() && msgs.IsFull() { diff --git a/pkg/common/db/controller/push.go b/pkg/common/db/controller/push.go index 940359f0c..1109970de 100644 --- a/pkg/common/db/controller/push.go +++ b/pkg/common/db/controller/push.go @@ -12,10 +12,10 @@ type PushDatabase interface { } type pushDataBase struct { - cache cache.Model + cache cache.MsgModel } -func NewPushDatabase(cache cache.Model) PushDatabase { +func NewPushDatabase(cache cache.MsgModel) PushDatabase { return &pushDataBase{cache: cache} } diff --git a/pkg/common/db/controller/third.go b/pkg/common/db/controller/third.go index 476cba21f..1eb23fad9 100644 --- a/pkg/common/db/controller/third.go +++ b/pkg/common/db/controller/third.go @@ -14,10 +14,10 @@ type ThirdDatabase interface { } type thirdDatabase struct { - cache cache.Model + cache cache.MsgModel } -func NewThirdDatabase(cache cache.Model) ThirdDatabase { +func NewThirdDatabase(cache cache.MsgModel) ThirdDatabase { return &thirdDatabase{cache: cache} } diff --git a/pkg/common/db/table/unrelation/msg.go b/pkg/common/db/table/unrelation/msg.go index 4b9c445f1..94e2c7d56 100644 --- a/pkg/common/db/table/unrelation/msg.go +++ b/pkg/common/db/table/unrelation/msg.go @@ -1,28 +1,133 @@ package unrelation -import "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" +import ( + "context" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" + "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" + "strconv" + "strings" +) -type MsgModel struct { - SendID string `bson:"send_id"` - RecvID string `bson:"recv_id"` - GroupID string `bson:"group_id"` - ClientMsgID string `bson:"client_msg_id"` // 客户端消息ID - ServerMsgID string `bson:"server_msg_id"` // 服务端消息ID - SenderPlatformID int32 `bson:"sender_platform_id"` - SenderNickname string `bson:"sender_nickname"` - SenderFaceURL string `bson:"sender_face_url"` - SessionType int32 `bson:"session_type"` - MsgFrom int32 `bson:"msg_from"` - ContentType int32 `bson:"contentType"` - Content []byte `bson:"content"` - Seq int64 `bson:"seq"` - SendTime int64 `bson:"sendTime"` - CreateTime int64 `bson:"createTime"` - Status int32 `bson:"status"` - Options map[string]bool `bson:"options"` - OfflinePushInfo *sdkws.OfflinePushInfo `bson:"offlinePushInfo"` - AtUserIDList []string `bson:"atUserIDList"` - MsgDataList []byte `bson:"msgDataList"` - AttachedInfo string `bson:"attachedInfo"` - Ex string `bson:"ex"` +const ( + singleGocMsgNum = 5000 + Msg = "msg" + OldestList = 0 + NewestList = -1 +) + +type MsgDocModel struct { + DocID string `bson:"uid"` + Msg []MsgInfoModel `bson:"msg"` +} + +type MsgInfoModel struct { + SendTime int64 `bson:"sendtime"` + Msg []byte `bson:"msg"` +} + +type MsgDocModelInterface interface { + PushMsgsToDoc(ctx context.Context, docID string, msgsToMongo []MsgInfoModel) error + Create(ctx context.Context, model *MsgDocModel) error + UpdateMsgStatusByIndexInOneDoc(ctx context.Context, docID string, msg *sdkws.MsgData, seqIndex int, status int32) error + FindOneByDocID(ctx context.Context, docID string) (*MsgDocModel, error) + GetNewestMsg(ctx context.Context, sourceID string) (*MsgInfoModel, error) + GetOldestMsg(ctx context.Context, sourceID string) (*MsgInfoModel, error) + Delete(ctx context.Context, docIDs []string) error + GetMsgsByIndex(ctx context.Context, sourceID string, index int64) (*MsgDocModel, error) + UpdateOneDoc(ctx context.Context, msg *MsgDocModel) error +} + +func (MsgDocModel) TableName() string { + return Msg +} + +func (MsgDocModel) GetSingleGocMsgNum() int64 { + return singleGocMsgNum +} + +func (m *MsgDocModel) IsFull() bool { + index, _ := strconv.Atoi(strings.Split(m.DocID, ":")[1]) + if index == 0 { + if len(m.Msg) >= singleGocMsgNum-1 { + return true + } + } + if len(m.Msg) >= singleGocMsgNum { + return true + } + + return false +} + +func (m MsgDocModel) GetDocID(sourceID string, seq int64) string { + seqSuffix := seq / singleGocMsgNum + return m.indexGen(sourceID, seqSuffix) +} + +func (m MsgDocModel) GetSeqDocIDList(userID string, maxSeq int64) []string { + seqMaxSuffix := maxSeq / singleGocMsgNum + var seqUserIDs []string + for i := 0; i <= int(seqMaxSuffix); i++ { + seqUserID := m.indexGen(userID, int64(i)) + seqUserIDs = append(seqUserIDs, seqUserID) + } + return seqUserIDs +} + +func (m MsgDocModel) getSeqSuperGroupID(groupID string, seq int64) string { + seqSuffix := seq / singleGocMsgNum + return m.superGroupIndexGen(groupID, seqSuffix) +} + +func (m MsgDocModel) superGroupIndexGen(groupID string, seqSuffix int64) string { + return "super_group_" + groupID + ":" + strconv.FormatInt(int64(seqSuffix), 10) +} + +func (m MsgDocModel) GetDocIDSeqsMap(sourceID string, seqs []int64) map[string][]int64 { + t := make(map[string][]int64) + for i := 0; i < len(seqs); i++ { + docID := m.GetDocID(sourceID, seqs[i]) + if value, ok := t[docID]; !ok { + var temp []int64 + t[docID] = append(temp, seqs[i]) + } else { + t[docID] = append(value, seqs[i]) + } + } + return t +} + +func (m MsgDocModel) getMsgIndex(seq uint32) int { + seqSuffix := seq / singleGocMsgNum + var index uint32 + if seqSuffix == 0 { + index = (seq - seqSuffix*singleGocMsgNum) - 1 + } else { + index = seq - seqSuffix*singleGocMsgNum + } + return int(index) +} + +func (m MsgDocModel) indexGen(sourceID string, seqSuffix int64) string { + return sourceID + ":" + strconv.FormatInt(seqSuffix, 10) +} + +func (MsgDocModel) GenExceptionMessageBySeqs(seqs []int64) (exceptionMsg []*sdkws.MsgData) { + for _, v := range seqs { + msg := new(sdkws.MsgData) + msg.Seq = v + exceptionMsg = append(exceptionMsg, msg) + } + return exceptionMsg +} + +func (MsgDocModel) GenExceptionSuperGroupMessageBySeqs(seqs []int64, groupID string) (exceptionMsg []*sdkws.MsgData) { + for _, v := range seqs { + msg := new(sdkws.MsgData) + msg.Seq = v + msg.GroupID = groupID + msg.SessionType = constant.SuperGroupChatType + exceptionMsg = append(exceptionMsg, msg) + } + return exceptionMsg } diff --git a/pkg/common/db/table/unrelation/notification.go b/pkg/common/db/table/unrelation/notification.go index 94e2c7d56..3c8094078 100644 --- a/pkg/common/db/table/unrelation/notification.go +++ b/pkg/common/db/table/unrelation/notification.go @@ -9,63 +9,63 @@ import ( ) const ( - singleGocMsgNum = 5000 - Msg = "msg" - OldestList = 0 - NewestList = -1 + singleGocNotificationNum = 5000 + Notification = "notification" + //OldestList = 0 + //NewestList = -1 ) -type MsgDocModel struct { - DocID string `bson:"uid"` - Msg []MsgInfoModel `bson:"msg"` +type NotificationDocModel struct { + DocID string `bson:"uid"` + Msg []NotificationInfoModel `bson:"msg"` } -type MsgInfoModel struct { +type NotificationInfoModel struct { SendTime int64 `bson:"sendtime"` Msg []byte `bson:"msg"` } -type MsgDocModelInterface interface { - PushMsgsToDoc(ctx context.Context, docID string, msgsToMongo []MsgInfoModel) error - Create(ctx context.Context, model *MsgDocModel) error +type NotificationDocModelInterface interface { + PushMsgsToDoc(ctx context.Context, docID string, msgsToMongo []NotificationInfoModel) error + Create(ctx context.Context, model *NotificationDocModel) error UpdateMsgStatusByIndexInOneDoc(ctx context.Context, docID string, msg *sdkws.MsgData, seqIndex int, status int32) error - FindOneByDocID(ctx context.Context, docID string) (*MsgDocModel, error) - GetNewestMsg(ctx context.Context, sourceID string) (*MsgInfoModel, error) - GetOldestMsg(ctx context.Context, sourceID string) (*MsgInfoModel, error) + FindOneByDocID(ctx context.Context, docID string) (*NotificationDocModel, error) + GetNewestMsg(ctx context.Context, sourceID string) (*NotificationInfoModel, error) + GetOldestMsg(ctx context.Context, sourceID string) (*NotificationInfoModel, error) Delete(ctx context.Context, docIDs []string) error - GetMsgsByIndex(ctx context.Context, sourceID string, index int64) (*MsgDocModel, error) - UpdateOneDoc(ctx context.Context, msg *MsgDocModel) error + GetMsgsByIndex(ctx context.Context, sourceID string, index int64) (*NotificationDocModel, error) + UpdateOneDoc(ctx context.Context, msg *NotificationDocModel) error } -func (MsgDocModel) TableName() string { - return Msg +func (NotificationDocModel) TableName() string { + return Notification } -func (MsgDocModel) GetSingleGocMsgNum() int64 { - return singleGocMsgNum +func (NotificationDocModel) GetsingleGocNotificationNum() int64 { + return singleGocNotificationNum } -func (m *MsgDocModel) IsFull() bool { +func (m *NotificationDocModel) IsFull() bool { index, _ := strconv.Atoi(strings.Split(m.DocID, ":")[1]) if index == 0 { - if len(m.Msg) >= singleGocMsgNum-1 { + if len(m.Msg) >= singleGocNotificationNum-1 { return true } } - if len(m.Msg) >= singleGocMsgNum { + if len(m.Msg) >= singleGocNotificationNum { return true } return false } -func (m MsgDocModel) GetDocID(sourceID string, seq int64) string { - seqSuffix := seq / singleGocMsgNum +func (m NotificationDocModel) GetDocID(sourceID string, seq int64) string { + seqSuffix := seq / singleGocNotificationNum return m.indexGen(sourceID, seqSuffix) } -func (m MsgDocModel) GetSeqDocIDList(userID string, maxSeq int64) []string { - seqMaxSuffix := maxSeq / singleGocMsgNum +func (m NotificationDocModel) GetSeqDocIDList(userID string, maxSeq int64) []string { + seqMaxSuffix := maxSeq / singleGocNotificationNum var seqUserIDs []string for i := 0; i <= int(seqMaxSuffix); i++ { seqUserID := m.indexGen(userID, int64(i)) @@ -74,16 +74,16 @@ func (m MsgDocModel) GetSeqDocIDList(userID string, maxSeq int64) []string { return seqUserIDs } -func (m MsgDocModel) getSeqSuperGroupID(groupID string, seq int64) string { - seqSuffix := seq / singleGocMsgNum +func (m NotificationDocModel) getSeqSuperGroupID(groupID string, seq int64) string { + seqSuffix := seq / singleGocNotificationNum return m.superGroupIndexGen(groupID, seqSuffix) } -func (m MsgDocModel) superGroupIndexGen(groupID string, seqSuffix int64) string { +func (m NotificationDocModel) superGroupIndexGen(groupID string, seqSuffix int64) string { return "super_group_" + groupID + ":" + strconv.FormatInt(int64(seqSuffix), 10) } -func (m MsgDocModel) GetDocIDSeqsMap(sourceID string, seqs []int64) map[string][]int64 { +func (m NotificationDocModel) GetDocIDSeqsMap(sourceID string, seqs []int64) map[string][]int64 { t := make(map[string][]int64) for i := 0; i < len(seqs); i++ { docID := m.GetDocID(sourceID, seqs[i]) @@ -97,22 +97,22 @@ func (m MsgDocModel) GetDocIDSeqsMap(sourceID string, seqs []int64) map[string][ return t } -func (m MsgDocModel) getMsgIndex(seq uint32) int { - seqSuffix := seq / singleGocMsgNum +func (m NotificationDocModel) getMsgIndex(seq uint32) int { + seqSuffix := seq / singleGocNotificationNum var index uint32 if seqSuffix == 0 { - index = (seq - seqSuffix*singleGocMsgNum) - 1 + index = (seq - seqSuffix*singleGocNotificationNum) - 1 } else { - index = seq - seqSuffix*singleGocMsgNum + index = seq - seqSuffix*singleGocNotificationNum } return int(index) } -func (m MsgDocModel) indexGen(sourceID string, seqSuffix int64) string { +func (m NotificationDocModel) indexGen(sourceID string, seqSuffix int64) string { return sourceID + ":" + strconv.FormatInt(seqSuffix, 10) } -func (MsgDocModel) GenExceptionMessageBySeqs(seqs []int64) (exceptionMsg []*sdkws.MsgData) { +func (NotificationDocModel) GenExceptionMessageBySeqs(seqs []int64) (exceptionMsg []*sdkws.MsgData) { for _, v := range seqs { msg := new(sdkws.MsgData) msg.Seq = v @@ -121,7 +121,7 @@ func (MsgDocModel) GenExceptionMessageBySeqs(seqs []int64) (exceptionMsg []*sdkw return exceptionMsg } -func (MsgDocModel) GenExceptionSuperGroupMessageBySeqs(seqs []int64, groupID string) (exceptionMsg []*sdkws.MsgData) { +func (NotificationDocModel) GenExceptionSuperGroupMessageBySeqs(seqs []int64, groupID string) (exceptionMsg []*sdkws.MsgData) { for _, v := range seqs { msg := new(sdkws.MsgData) msg.Seq = v diff --git a/pkg/common/db/unrelation/notification.go b/pkg/common/db/unrelation/notification.go new file mode 100644 index 000000000..8b62c04f1 --- /dev/null +++ b/pkg/common/db/unrelation/notification.go @@ -0,0 +1,134 @@ +package unrelation + +import ( + "context" + "errors" + "fmt" + + table "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/unrelation" + "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" + "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "google.golang.org/protobuf/proto" +) + +var ErrNotificationListNotExist = errors.New("user not have msg in mongoDB") +var ErrNotificationNotFound = errors.New("msg not found") + +type NotificationMongoDriver struct { + MsgCollection *mongo.Collection + msg table.NotificationDocModel +} + +func NewNotificationMongoDriver(database *mongo.Database) table.NotificationDocModelInterface { + return &NotificationMongoDriver{MsgCollection: database.Collection(table.NotificationDocModel{}.TableName())} +} + +func (m *NotificationMongoDriver) PushMsgsToDoc(ctx context.Context, docID string, msgsToMongo []table.NotificationInfoModel) error { + filter := bson.M{"uid": docID} + return m.MsgCollection.FindOneAndUpdate(ctx, filter, bson.M{"$push": bson.M{"msg": bson.M{"$each": msgsToMongo}}}).Err() +} + +func (m *NotificationMongoDriver) Create(ctx context.Context, model *table.NotificationDocModel) error { + _, err := m.MsgCollection.InsertOne(ctx, model) + return err +} + +func (m *NotificationMongoDriver) UpdateMsgStatusByIndexInOneDoc(ctx context.Context, docID string, msg *sdkws.MsgData, seqIndex int, status int32) error { + msg.Status = status + bytes, err := proto.Marshal(msg) + if err != nil { + return utils.Wrap(err, "") + } + _, err = m.MsgCollection.UpdateOne(ctx, bson.M{"uid": docID}, bson.M{"$set": bson.M{fmt.Sprintf("msg.%d.msg", seqIndex): bytes}}) + if err != nil { + return utils.Wrap(err, "") + } + return nil +} + +func (m *NotificationMongoDriver) FindOneByDocID(ctx context.Context, docID string) (*table.NotificationDocModel, error) { + doc := &table.NotificationDocModel{} + err := m.MsgCollection.FindOne(ctx, bson.M{"uid": docID}).Decode(doc) + return doc, err +} + +func (m *NotificationMongoDriver) GetMsgsByIndex(ctx context.Context, sourceID string, index int64) (*table.NotificationDocModel, error) { + findOpts := options.Find().SetLimit(1).SetSkip(index).SetSort(bson.M{"uid": 1}) + cursor, err := m.MsgCollection.Find(ctx, bson.M{"uid": primitive.Regex{Pattern: fmt.Sprintf("^%s:", sourceID)}}, findOpts) + if err != nil { + return nil, utils.Wrap(err, "") + } + var msgs []table.NotificationDocModel + err = cursor.All(context.Background(), &msgs) + if err != nil { + return nil, utils.Wrap(err, fmt.Sprintf("cursor is %s", cursor.Current.String())) + } + if len(msgs) > 0 { + return &msgs[0], nil + } + return nil, ErrMsgListNotExist +} + +func (m *NotificationMongoDriver) GetNewestMsg(ctx context.Context, sourceID string) (*table.NotificationInfoModel, error) { + var msgDocs []table.NotificationDocModel + cursor, err := m.MsgCollection.Find(ctx, bson.M{"uid": bson.M{"$regex": fmt.Sprintf("^%s:", sourceID)}}, options.Find().SetLimit(1).SetSort(bson.M{"uid": -1})) + if err != nil { + return nil, utils.Wrap(err, "") + } + err = cursor.All(ctx, &msgDocs) + if err != nil { + return nil, utils.Wrap(err, "") + } + if len(msgDocs) > 0 { + if len(msgDocs[0].Msg) > 0 { + return &msgDocs[0].Msg[len(msgDocs[0].Msg)-1], nil + } + return nil, errors.New("len(msgDocs[0].Msg) < 0") + } + return nil, ErrMsgNotFound +} + +func (m *NotificationMongoDriver) GetOldestMsg(ctx context.Context, sourceID string) (*table.NotificationInfoModel, error) { + var msgDocs []table.NotificationDocModel + cursor, err := m.MsgCollection.Find(ctx, bson.M{"uid": bson.M{"$regex": fmt.Sprintf("^%s:", sourceID)}}, options.Find().SetLimit(1).SetSort(bson.M{"uid": 1})) + if err != nil { + return nil, err + } + err = cursor.All(ctx, &msgDocs) + if err != nil { + return nil, utils.Wrap(err, "") + } + var oldestMsg table.NotificationInfoModel + if len(msgDocs) > 0 { + for _, v := range msgDocs[0].Msg { + if v.SendTime != 0 { + oldestMsg = v + break + } + } + if len(oldestMsg.Msg) == 0 { + if len(msgDocs[0].Msg) > 0 { + oldestMsg = msgDocs[0].Msg[0] + } + } + return &oldestMsg, nil + } + return nil, ErrMsgNotFound +} + +func (m *NotificationMongoDriver) Delete(ctx context.Context, docIDs []string) error { + if docIDs == nil { + return nil + } + _, err := m.MsgCollection.DeleteMany(ctx, bson.M{"uid": bson.M{"$in": docIDs}}) + return err +} + +func (m *NotificationMongoDriver) UpdateOneDoc(ctx context.Context, msg *table.NotificationDocModel) error { + _, err := m.MsgCollection.UpdateOne(ctx, bson.M{"uid": msg.DocID}, bson.M{"$set": bson.M{"msg": msg.Msg}}) + return err +} diff --git a/pkg/common/mw/gin.go b/pkg/common/mw/gin.go index 05c3bf6b1..ee6e0ad7e 100644 --- a/pkg/common/mw/gin.go +++ b/pkg/common/mw/gin.go @@ -109,7 +109,7 @@ func GinParseOperationID() gin.HandlerFunc { } } func GinParseToken(rdb redis.UniversalClient) gin.HandlerFunc { - dataBase := controller.NewAuthDatabase(cache.NewCacheModel(rdb), config.Config.TokenPolicy.AccessSecret, config.Config.TokenPolicy.AccessExpire) + dataBase := controller.NewAuthDatabase(cache.NewMsgCacheModel(rdb), config.Config.TokenPolicy.AccessSecret, config.Config.TokenPolicy.AccessExpire) return func(c *gin.Context) { switch c.Request.Method { case http.MethodPost: