diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index 88be287fd..9b1965b00 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -16,6 +16,7 @@ package msg import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/rpccache" "google.golang.org/grpc" @@ -34,12 +35,13 @@ import ( type ( MessageInterceptorChain []MessageInterceptorFunc msgServer struct { - RegisterCenter discoveryregistry.SvcDiscoveryRegistry - MsgDatabase controller.CommonMsgDatabase - Group *rpcclient.GroupRpcClient - User *rpcclient.UserRpcClient - Conversation *rpcclient.ConversationRpcClient - friend *rpcclient.FriendRpcClient + RegisterCenter discoveryregistry.SvcDiscoveryRegistry + MsgDatabase controller.CommonMsgDatabase + Group *rpcclient.GroupRpcClient + User *rpcclient.UserRpcClient + Conversation *rpcclient.ConversationRpcClient + friend *rpccache.FriendLocalCache + //friend *rpcclient.FriendRpcClient GroupLocalCache *localcache.GroupLocalCache ConversationLocalCache *localcache.ConversationLocalCache Handlers MessageInterceptorChain @@ -79,7 +81,6 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e conversationClient := rpcclient.NewConversationRpcClient(client) userRpcClient := rpcclient.NewUserRpcClient(client) groupRpcClient := rpcclient.NewGroupRpcClient(client) - friendRpcClient := rpcclient.NewFriendRpcClient(client) msgDatabase := controller.NewCommonMsgDatabase(msgDocModel, cacheModel) s := &msgServer{ Conversation: &conversationClient, @@ -89,7 +90,7 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e RegisterCenter: client, GroupLocalCache: localcache.NewGroupLocalCache(&groupRpcClient), ConversationLocalCache: localcache.NewConversationLocalCache(&conversationClient), - friend: &friendRpcClient, + friend: rpccache.NewFriendLocalCache(rpcclient.NewFriendRpcClient(client)), } s.notificationSender = rpcclient.NewNotificationSender(rpcclient.WithLocalSendMsg(s.SendMsg)) s.addInterceptorHandler(MessageHasReadEnabled) diff --git a/pkg/common/cachekey/black.go b/pkg/common/cachekey/black.go new file mode 100644 index 000000000..841e10c6a --- /dev/null +++ b/pkg/common/cachekey/black.go @@ -0,0 +1,15 @@ +package cachekey + +const ( + blackIDsKey = "BLACK_IDS:" + isBlackKey = "IS_BLACK:" +) + +func GetBlackIDsKey(ownerUserID string) string { + return blackIDsKey + ownerUserID + +} + +func GetIsBlackIDsKey(possibleBlackUserID, userID string) string { + return isBlackKey + possibleBlackUserID + "-" + userID +} diff --git a/pkg/common/cachekey/conversation.go b/pkg/common/cachekey/conversation.go new file mode 100644 index 000000000..eb5b83f0e --- /dev/null +++ b/pkg/common/cachekey/conversation.go @@ -0,0 +1,48 @@ +package cachekey + +import "time" + +const ( + conversationKey = "CONVERSATION:" + conversationIDsKey = "CONVERSATION_IDS:" + conversationIDsHashKey = "CONVERSATION_IDS_HASH:" + conversationHasReadSeqKey = "CONVERSATION_HAS_READ_SEQ:" + recvMsgOptKey = "RECV_MSG_OPT:" + superGroupRecvMsgNotNotifyUserIDsKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS:" + superGroupRecvMsgNotNotifyUserIDsHashKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS_HASH:" + conversationNotReceiveMessageUserIDsKey = "CONVERSATION_NOT_RECEIVE_MESSAGE_USER_IDS:" + + conversationExpireTime = time.Second * 60 * 60 * 12 +) + +func GetConversationKey(ownerUserID, conversationID string) string { + return conversationKey + ownerUserID + ":" + conversationID +} + +func GetConversationIDsKey(ownerUserID string) string { + return conversationIDsKey + ownerUserID +} + +func GetSuperGroupRecvNotNotifyUserIDsKey(groupID string) string { + return superGroupRecvMsgNotNotifyUserIDsKey + groupID +} + +func GetRecvMsgOptKey(ownerUserID, conversationID string) string { + return recvMsgOptKey + ownerUserID + ":" + conversationID +} + +func GetSuperGroupRecvNotNotifyUserIDsHashKey(groupID string) string { + return superGroupRecvMsgNotNotifyUserIDsHashKey + groupID +} + +func GetConversationHasReadSeqKey(ownerUserID, conversationID string) string { + return conversationHasReadSeqKey + ownerUserID + ":" + conversationID +} + +func GetConversationNotReceiveMessageUserIDsKey(conversationID string) string { + return conversationNotReceiveMessageUserIDsKey + conversationID +} + +func GetUserConversationIDsHashKey(ownerUserID string) string { + return conversationIDsHashKey + ownerUserID +} diff --git a/pkg/common/cachekey/friend.go b/pkg/common/cachekey/friend.go new file mode 100644 index 000000000..1f9d4e94f --- /dev/null +++ b/pkg/common/cachekey/friend.go @@ -0,0 +1,27 @@ +package cachekey + +import "time" + +const ( + friendExpireTime = time.Second * 60 * 60 * 12 + friendIDsKey = "FRIEND_IDS:" + twoWayFriendsIDsKey = "COMMON_FRIENDS_IDS:" + friendKey = "FRIEND_INFO:" + isFriendKey = "IS_FRIEND:" +) + +func GetFriendIDsKey(ownerUserID string) string { + return friendIDsKey + ownerUserID +} + +func GetTwoWayFriendsIDsKey(ownerUserID string) string { + return twoWayFriendsIDsKey + ownerUserID +} + +func GetFriendKey(ownerUserID, friendUserID string) string { + return friendKey + ownerUserID + "-" + friendUserID +} + +func GetIsFriendKey(possibleFriendUserID, userID string) string { + return isFriendKey + possibleFriendUserID + "-" + userID +} diff --git a/pkg/common/cachekey/group.go b/pkg/common/cachekey/group.go new file mode 100644 index 000000000..3e5431b83 --- /dev/null +++ b/pkg/common/cachekey/group.go @@ -0,0 +1,45 @@ +package cachekey + +import ( + "strconv" + "time" +) + +const ( + groupExpireTime = time.Second * 60 * 60 * 12 + groupInfoKey = "GROUP_INFO:" + groupMemberIDsKey = "GROUP_MEMBER_IDS:" + groupMembersHashKey = "GROUP_MEMBERS_HASH2:" + groupMemberInfoKey = "GROUP_MEMBER_INFO:" + joinedGroupsKey = "JOIN_GROUPS_KEY:" + groupMemberNumKey = "GROUP_MEMBER_NUM_CACHE:" + groupRoleLevelMemberIDsKey = "GROUP_ROLE_LEVEL_MEMBER_IDS:" +) + +func GetGroupInfoKey(groupID string) string { + return groupInfoKey + groupID +} + +func GetJoinedGroupsKey(userID string) string { + return joinedGroupsKey + userID +} + +func GetGroupMembersHashKey(groupID string) string { + return groupMembersHashKey + groupID +} + +func GetGroupMemberIDsKey(groupID string) string { + return groupMemberIDsKey + groupID +} + +func GetGroupMemberInfoKey(groupID, userID string) string { + return groupMemberInfoKey + groupID + "-" + userID +} + +func GetGroupMemberNumKey(groupID string) string { + return groupMemberNumKey + groupID +} + +func GetGroupRoleLevelMemberIDsKey(groupID string, roleLevel int32) string { + return groupRoleLevelMemberIDsKey + groupID + "-" + strconv.Itoa(int(roleLevel)) +} diff --git a/pkg/common/cachekey/msg.go b/pkg/common/cachekey/msg.go new file mode 100644 index 000000000..b19c4f473 --- /dev/null +++ b/pkg/common/cachekey/msg.go @@ -0,0 +1,23 @@ +package cachekey + +const ( + maxSeq = "MAX_SEQ:" + minSeq = "MIN_SEQ:" + conversationUserMinSeq = "CON_USER_MIN_SEQ:" + hasReadSeq = "HAS_READ_SEQ:" + + //appleDeviceToken = "DEVICE_TOKEN" + getuiToken = "GETUI_TOKEN" + getuiTaskID = "GETUI_TASK_ID" + //signalCache = "SIGNAL_CACHE:" + //signalListCache = "SIGNAL_LIST_CACHE:" + FCM_TOKEN = "FCM_TOKEN:" + + messageCache = "MESSAGE_CACHE:" + messageDelUserList = "MESSAGE_DEL_USER_LIST:" + userDelMessagesList = "USER_DEL_MESSAGES_LIST:" + sendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:" + userBadgeUnreadCountSum = "USER_BADGE_UNREAD_COUNT_SUM:" + exTypeKeyLocker = "EX_LOCK:" + uidPidToken = "UID_PID_TOKEN_STATUS:" +) diff --git a/pkg/common/cachekey/s3.go b/pkg/common/cachekey/s3.go new file mode 100644 index 000000000..cbf8bfad5 --- /dev/null +++ b/pkg/common/cachekey/s3.go @@ -0,0 +1 @@ +package cachekey diff --git a/pkg/common/cachekey/user.go b/pkg/common/cachekey/user.go new file mode 100644 index 000000000..ed11641d5 --- /dev/null +++ b/pkg/common/cachekey/user.go @@ -0,0 +1,21 @@ +package cachekey + +import "time" + +const ( + userExpireTime = time.Second * 60 * 60 * 12 + userInfoKey = "USER_INFO:" + userGlobalRecvMsgOptKey = "USER_GLOBAL_RECV_MSG_OPT_KEY:" + olineStatusKey = "ONLINE_STATUS:" + userOlineStatusExpireTime = time.Second * 60 * 60 * 24 + statusMod = 501 + platformID = "_PlatformIDSuffix" +) + +func GetUserInfoKey(userID string) string { + return userInfoKey + userID +} + +func GetUserGlobalRecvMsgOptKey(userID string) string { + return userGlobalRecvMsgOptKey + userID +} diff --git a/pkg/common/db/cache/black.go b/pkg/common/db/cache/black.go index d1abe945c..f69b83afe 100644 --- a/pkg/common/db/cache/black.go +++ b/pkg/common/db/cache/black.go @@ -16,6 +16,7 @@ package cache import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" "time" "github.com/dtm-labs/rockscache" @@ -71,7 +72,7 @@ func (b *BlackCacheRedis) NewCache() BlackCache { } func (b *BlackCacheRedis) getBlackIDsKey(ownerUserID string) string { - return blackIDsKey + ownerUserID + return cachekey.GetBlackIDsKey(ownerUserID) } func (b *BlackCacheRedis) GetBlackIDs(ctx context.Context, userID string) (blackIDs []string, err error) { diff --git a/pkg/common/db/cache/conversation.go b/pkg/common/db/cache/conversation.go index a7018bc18..cf638f38f 100644 --- a/pkg/common/db/cache/conversation.go +++ b/pkg/common/db/cache/conversation.go @@ -17,6 +17,7 @@ package cache import ( "context" "errors" + "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" "math/big" "strings" "time" @@ -30,14 +31,14 @@ import ( ) const ( - conversationKey = "CONVERSATION:" - conversationIDsKey = "CONVERSATION_IDS:" - conversationIDsHashKey = "CONVERSATION_IDS_HASH:" - conversationHasReadSeqKey = "CONVERSATION_HAS_READ_SEQ:" - recvMsgOptKey = "RECV_MSG_OPT:" - superGroupRecvMsgNotNotifyUserIDsKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS:" - superGroupRecvMsgNotNotifyUserIDsHashKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS_HASH:" - conversationNotReceiveMessageUserIDsKey = "CONVERSATION_NOT_RECEIVE_MESSAGE_USER_IDS:" + //conversationKey = "CONVERSATION:" + //conversationIDsKey = "CONVERSATION_IDS:" + //conversationIDsHashKey = "CONVERSATION_IDS_HASH:" + //conversationHasReadSeqKey = "CONVERSATION_HAS_READ_SEQ:" + //recvMsgOptKey = "RECV_MSG_OPT:" + //superGroupRecvMsgNotNotifyUserIDsKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS:" + //superGroupRecvMsgNotNotifyUserIDsHashKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS_HASH:" + //conversationNotReceiveMessageUserIDsKey = "CONVERSATION_NOT_RECEIVE_MESSAGE_USER_IDS:" conversationExpireTime = time.Second * 60 * 60 * 12 ) @@ -125,31 +126,35 @@ func (c *ConversationRedisCache) NewCache() ConversationCache { } func (c *ConversationRedisCache) getConversationKey(ownerUserID, conversationID string) string { - return conversationKey + ownerUserID + ":" + conversationID + return cachekey.GetConversationKey(ownerUserID, conversationID) } func (c *ConversationRedisCache) getConversationIDsKey(ownerUserID string) string { - return conversationIDsKey + ownerUserID + return cachekey.GetConversationIDsKey(ownerUserID) } func (c *ConversationRedisCache) getSuperGroupRecvNotNotifyUserIDsKey(groupID string) string { - return superGroupRecvMsgNotNotifyUserIDsKey + groupID + return cachekey.GetSuperGroupRecvNotNotifyUserIDsKey(groupID) } func (c *ConversationRedisCache) getRecvMsgOptKey(ownerUserID, conversationID string) string { - return recvMsgOptKey + ownerUserID + ":" + conversationID + return cachekey.GetRecvMsgOptKey(ownerUserID, conversationID) } func (c *ConversationRedisCache) getSuperGroupRecvNotNotifyUserIDsHashKey(groupID string) string { - return superGroupRecvMsgNotNotifyUserIDsHashKey + groupID + return cachekey.GetSuperGroupRecvNotNotifyUserIDsHashKey(groupID) } func (c *ConversationRedisCache) getConversationHasReadSeqKey(ownerUserID, conversationID string) string { - return conversationHasReadSeqKey + ownerUserID + ":" + conversationID + return cachekey.GetConversationHasReadSeqKey(ownerUserID, conversationID) } func (c *ConversationRedisCache) getConversationNotReceiveMessageUserIDsKey(conversationID string) string { - return conversationNotReceiveMessageUserIDsKey + conversationID + return cachekey.GetConversationNotReceiveMessageUserIDsKey(conversationID) +} + +func (c *ConversationRedisCache) getUserConversationIDsHashKey(ownerUserID string) string { + return cachekey.GetUserConversationIDsHashKey(ownerUserID) } func (c *ConversationRedisCache) GetUserConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) { @@ -169,10 +174,6 @@ func (c *ConversationRedisCache) DelConversationIDs(userIDs ...string) Conversat return cache } -func (c *ConversationRedisCache) getUserConversationIDsHashKey(ownerUserID string) string { - return conversationIDsHashKey + ownerUserID -} - func (c *ConversationRedisCache) GetUserConversationIDsHash(ctx context.Context, ownerUserID string) (hash uint64, err error) { return getCache( ctx, diff --git a/pkg/common/db/cache/friend.go b/pkg/common/db/cache/friend.go index 1708f7664..4805271a3 100644 --- a/pkg/common/db/cache/friend.go +++ b/pkg/common/db/cache/friend.go @@ -16,6 +16,7 @@ package cache import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" "time" "github.com/dtm-labs/rockscache" @@ -27,10 +28,10 @@ import ( ) const ( - friendExpireTime = time.Second * 60 * 60 * 12 - friendIDsKey = "FRIEND_IDS:" - TwoWayFriendsIDsKey = "COMMON_FRIENDS_IDS:" - friendKey = "FRIEND_INFO:" + friendExpireTime = time.Second * 60 * 60 * 12 + //friendIDsKey = "FRIEND_IDS:" + //TwoWayFriendsIDsKey = "COMMON_FRIENDS_IDS:" + //friendKey = "FRIEND_INFO:" ) // FriendCache is an interface for caching friend-related data. @@ -78,17 +79,17 @@ func (f *FriendCacheRedis) NewCache() FriendCache { // getFriendIDsKey returns the key for storing friend IDs in the cache. func (f *FriendCacheRedis) getFriendIDsKey(ownerUserID string) string { - return friendIDsKey + ownerUserID + return cachekey.GetFriendIDsKey(ownerUserID) } // getTwoWayFriendsIDsKey returns the key for storing two-way friend IDs in the cache. func (f *FriendCacheRedis) getTwoWayFriendsIDsKey(ownerUserID string) string { - return TwoWayFriendsIDsKey + ownerUserID + return cachekey.GetTwoWayFriendsIDsKey(ownerUserID) } // getFriendKey returns the key for storing friend info in the cache. func (f *FriendCacheRedis) getFriendKey(ownerUserID, friendUserID string) string { - return friendKey + ownerUserID + "-" + friendUserID + return cachekey.GetFriendKey(ownerUserID, friendUserID) } // GetFriendIDs retrieves friend IDs from the cache or the database if not found. diff --git a/pkg/common/db/cache/group.go b/pkg/common/db/cache/group.go index 57fcf1a9b..58c42ccfd 100644 --- a/pkg/common/db/cache/group.go +++ b/pkg/common/db/cache/group.go @@ -17,7 +17,7 @@ package cache import ( "context" "fmt" - "strconv" + "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" "time" "github.com/OpenIMSDK/protocol/constant" @@ -34,15 +34,14 @@ import ( ) const ( - groupExpireTime = time.Second * 60 * 60 * 12 - groupInfoKey = "GROUP_INFO:" - groupMemberIDsKey = "GROUP_MEMBER_IDS:" - groupMembersHashKey = "GROUP_MEMBERS_HASH2:" - groupMemberInfoKey = "GROUP_MEMBER_INFO:" - //groupOwnerInfoKey = "GROUP_OWNER_INFO:". - joinedGroupsKey = "JOIN_GROUPS_KEY:" - groupMemberNumKey = "GROUP_MEMBER_NUM_CACHE:" - groupRoleLevelMemberIDsKey = "GROUP_ROLE_LEVEL_MEMBER_IDS:" + groupExpireTime = time.Second * 60 * 60 * 12 + //groupInfoKey = "GROUP_INFO:" + //groupMemberIDsKey = "GROUP_MEMBER_IDS:" + //groupMembersHashKey = "GROUP_MEMBERS_HASH2:" + //groupMemberInfoKey = "GROUP_MEMBER_INFO:" + //joinedGroupsKey = "JOIN_GROUPS_KEY:" + //groupMemberNumKey = "GROUP_MEMBER_NUM_CACHE:" + //groupRoleLevelMemberIDsKey = "GROUP_ROLE_LEVEL_MEMBER_IDS:" ) type GroupHash interface { @@ -126,31 +125,31 @@ func (g *GroupCacheRedis) NewCache() GroupCache { } func (g *GroupCacheRedis) getGroupInfoKey(groupID string) string { - return groupInfoKey + groupID + return cachekey.GetGroupInfoKey(groupID) } func (g *GroupCacheRedis) getJoinedGroupsKey(userID string) string { - return joinedGroupsKey + userID + return cachekey.GetJoinedGroupsKey(userID) } func (g *GroupCacheRedis) getGroupMembersHashKey(groupID string) string { - return groupMembersHashKey + groupID + return cachekey.GetGroupMembersHashKey(groupID) } func (g *GroupCacheRedis) getGroupMemberIDsKey(groupID string) string { - return groupMemberIDsKey + groupID + return cachekey.GetGroupMemberIDsKey(groupID) } func (g *GroupCacheRedis) getGroupMemberInfoKey(groupID, userID string) string { - return groupMemberInfoKey + groupID + "-" + userID + return cachekey.GetGroupMemberInfoKey(groupID, userID) } func (g *GroupCacheRedis) getGroupMemberNumKey(groupID string) string { - return groupMemberNumKey + groupID + return cachekey.GetGroupMemberNumKey(groupID) } func (g *GroupCacheRedis) getGroupRoleLevelMemberIDsKey(groupID string, roleLevel int32) string { - return groupRoleLevelMemberIDsKey + groupID + "-" + strconv.Itoa(int(roleLevel)) + return cachekey.GetGroupRoleLevelMemberIDsKey(groupID, roleLevel) } func (g *GroupCacheRedis) GetGroupIndex(group *relationtb.GroupModel, keys []string) (int, error) { diff --git a/pkg/common/db/cache/msg.go b/pkg/common/db/cache/msg.go index 5cd3cb22c..c1749e719 100644 --- a/pkg/common/db/cache/msg.go +++ b/pkg/common/db/cache/msg.go @@ -44,12 +44,12 @@ const ( conversationUserMinSeq = "CON_USER_MIN_SEQ:" hasReadSeq = "HAS_READ_SEQ:" - appleDeviceToken = "DEVICE_TOKEN" - getuiToken = "GETUI_TOKEN" - getuiTaskID = "GETUI_TASK_ID" - signalCache = "SIGNAL_CACHE:" - signalListCache = "SIGNAL_LIST_CACHE:" - FCM_TOKEN = "FCM_TOKEN:" + //appleDeviceToken = "DEVICE_TOKEN" + getuiToken = "GETUI_TOKEN" + getuiTaskID = "GETUI_TASK_ID" + //signalCache = "SIGNAL_CACHE:" + //signalListCache = "SIGNAL_LIST_CACHE:" + FCM_TOKEN = "FCM_TOKEN:" messageCache = "MESSAGE_CACHE:" messageDelUserList = "MESSAGE_DEL_USER_LIST:" @@ -148,6 +148,10 @@ func (c *msgCache) getHasReadSeqKey(conversationID string, userID string) string return hasReadSeq + userID + ":" + conversationID } +func (c *msgCache) getConversationUserMinSeqKey(conversationID, userID string) string { + return conversationUserMinSeq + conversationID + "u:" + userID +} + func (c *msgCache) setSeq(ctx context.Context, conversationID string, seq int64, getkey func(conversationID string) string) error { return utils.Wrap1(c.rdb.Set(ctx, getkey(conversationID), seq, 0).Err()) } @@ -209,10 +213,6 @@ func (c *msgCache) GetMinSeq(ctx context.Context, conversationID string) (int64, return c.getSeq(ctx, conversationID, c.getMinSeqKey) } -func (c *msgCache) getConversationUserMinSeqKey(conversationID, userID string) string { - return conversationUserMinSeq + conversationID + "u:" + userID -} - func (c *msgCache) GetConversationUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) { return utils.Wrap2(c.rdb.Get(ctx, c.getConversationUserMinSeqKey(conversationID, userID)).Int64()) } diff --git a/pkg/common/db/cache/user.go b/pkg/common/db/cache/user.go index 979bd06e4..14ed7988e 100644 --- a/pkg/common/db/cache/user.go +++ b/pkg/common/db/cache/user.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "errors" + "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" "hash/crc32" "strconv" "time" @@ -36,8 +37,8 @@ import ( ) const ( - userExpireTime = time.Second * 60 * 60 * 12 - userInfoKey = "USER_INFO:" + userExpireTime = time.Second * 60 * 60 * 12 + //userInfoKey = "USER_INFO:" userGlobalRecvMsgOptKey = "USER_GLOBAL_RECV_MSG_OPT_KEY:" olineStatusKey = "ONLINE_STATUS:" userOlineStatusExpireTime = time.Second * 60 * 60 * 24 @@ -93,18 +94,17 @@ func (u *UserCacheRedis) NewCache() UserCache { } func (u *UserCacheRedis) getUserInfoKey(userID string) string { - return userInfoKey + userID + return cachekey.GetUserInfoKey(userID) } func (u *UserCacheRedis) getUserGlobalRecvMsgOptKey(userID string) string { - return userGlobalRecvMsgOptKey + userID + return cachekey.GetUserGlobalRecvMsgOptKey(userID) } func (u *UserCacheRedis) GetUserInfo(ctx context.Context, userID string) (userInfo *relationtb.UserModel, err error) { return getCache(ctx, u.rcClient, u.getUserInfoKey(userID), u.expireTime, func(ctx context.Context) (*relationtb.UserModel, error) { return u.userDB.Take(ctx, userID) - }, - ) + }) } func (u *UserCacheRedis) GetUsersInfo(ctx context.Context, userIDs []string) ([]*relationtb.UserModel, error) { diff --git a/pkg/common/localcache/business.go b/pkg/common/localcache/business.go new file mode 100644 index 000000000..221eb2664 --- /dev/null +++ b/pkg/common/localcache/business.go @@ -0,0 +1,69 @@ +package localcache + +import ( + "context" + "encoding/json" + "github.com/OpenIMSDK/tools/log" + "github.com/dtm-labs/rockscache" + "github.com/redis/go-redis/v9" +) + +func WithRedisDeleteSubscribe(topic string, cli redis.UniversalClient) Option { + return WithDeleteLocal(func(fn func(key ...string)) { + if fn == nil { + log.ZDebug(context.Background(), "WithRedisDeleteSubscribe fn is nil", "topic", topic) + return + } + msg := cli.Subscribe(context.Background(), topic).Channel() + for m := range msg { + var key []string + if err := json.Unmarshal([]byte(m.Payload), &key); err != nil { + log.ZError(context.Background(), "WithRedisDeleteSubscribe json unmarshal error", err, "topic", topic, "payload", m.Payload) + continue + } + if len(key) == 0 { + continue + } + fn(key...) + } + }) +} + +func WithRedisDeletePublish(topic string, cli redis.UniversalClient) Option { + return WithDeleteKeyBefore(func(ctx context.Context, key ...string) { + data, err := json.Marshal(key) + if err != nil { + log.ZError(ctx, "json marshal error", err, "topic", topic, "key", key) + return + } + if err := cli.Publish(ctx, topic, data).Err(); err != nil { + log.ZError(ctx, "redis publish error", err, "topic", topic, "key", key) + } else { + log.ZDebug(ctx, "redis publish success", "topic", topic, "key", key) + } + }) +} + +func WithRedisDelete(cli redis.UniversalClient) Option { + return WithDeleteKeyBefore(func(ctx context.Context, key ...string) { + for _, s := range key { + if err := cli.Del(ctx, s).Err(); err != nil { + log.ZError(ctx, "redis delete error", err, "key", s) + } else { + log.ZDebug(ctx, "redis delete success", "key", s) + } + } + }) +} + +func WithRocksCacheDelete(cli *rockscache.Client) Option { + return WithDeleteKeyBefore(func(ctx context.Context, key ...string) { + for _, k := range key { + if err := cli.TagAsDeleted2(ctx, k); err != nil { + log.ZError(ctx, "rocksdb delete error", err, "key", k) + } else { + log.ZDebug(ctx, "rocksdb delete success", "key", k) + } + } + }) +} diff --git a/pkg/common/localcache/cache.go b/pkg/common/localcache/cache.go new file mode 100644 index 000000000..f70f9435b --- /dev/null +++ b/pkg/common/localcache/cache.go @@ -0,0 +1,66 @@ +package localcache + +import ( + "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/localcache/local" +) + +type Cache[V any] interface { + Get(ctx context.Context, key string, fetch func(ctx context.Context) (V, error)) (V, error) + Del(ctx context.Context, key ...string) +} + +func New[V any](opts ...Option) Cache[V] { + opt := defaultOption() + for _, o := range opts { + o(opt) + } + if opt.enable { + lc := local.NewCache[V](opt.localSlotNum, opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target) + c := &cache[V]{ + opt: opt, + local: lc, + } + go func() { + c.opt.delCh(c.del) + }() + return c + } else { + return &cache[V]{ + opt: opt, + } + } +} + +type cache[V any] struct { + opt *option + local local.Cache[V] +} + +func (c *cache[V]) del(key ...string) { + for _, k := range key { + c.local.Del(k) + } +} + +func (c *cache[V]) Get(ctx context.Context, key string, fetch func(ctx context.Context) (V, error)) (V, error) { + if c.opt.enable { + return c.local.Get(key, func() (V, error) { + return fetch(ctx) + }) + } else { + return fetch(ctx) + } +} + +func (c *cache[V]) Del(ctx context.Context, key ...string) { + if len(key) == 0 { + return + } + for _, fn := range c.opt.delFn { + fn(ctx, key...) + } + if c.opt.enable { + c.del(key...) + } +} diff --git a/pkg/common/localcache/local/cache.go b/pkg/common/localcache/local/cache.go new file mode 100644 index 000000000..a957166fe --- /dev/null +++ b/pkg/common/localcache/local/cache.go @@ -0,0 +1,51 @@ +package local + +import ( + "hash/fnv" + "time" + "unsafe" +) + +type Cache[V any] interface { + Get(key string, fetch func() (V, error)) (V, error) + Del(key string) bool +} + +func NewCache[V any](slotNum, slotSize int, successTTL, failedTTL time.Duration, target Target) Cache[V] { + c := &cache[V]{ + n: uint64(slotNum), + slots: make([]*LRU[string, V], slotNum), + target: target, + } + for i := 0; i < slotNum; i++ { + c.slots[i] = NewLRU[string, V](slotSize, successTTL, failedTTL, c.target) + } + return c +} + +type cache[V any] struct { + n uint64 + slots []*LRU[string, V] + target Target +} + +func (c *cache[V]) index(s string) uint64 { + h := fnv.New64a() + _, _ = h.Write(*(*[]byte)(unsafe.Pointer(&s))) + //_, _ = h.Write([]byte(s)) + return h.Sum64() % c.n +} + +func (c *cache[V]) Get(key string, fetch func() (V, error)) (V, error) { + return c.slots[c.index(key)].Get(key, fetch) +} + +func (c *cache[V]) Del(key string) bool { + if c.slots[c.index(key)].Del(key) { + c.target.IncrDelHit() + return true + } else { + c.target.IncrDelNotFound() + return false + } +} diff --git a/pkg/common/localcache/local/lru.go b/pkg/common/localcache/local/lru.go new file mode 100644 index 000000000..0befdfcab --- /dev/null +++ b/pkg/common/localcache/local/lru.go @@ -0,0 +1,76 @@ +package local + +import ( + "github.com/hashicorp/golang-lru/v2/simplelru" + "sync" + "time" +) + +type waitItem[V any] struct { + lock sync.Mutex + expires int64 + active bool + err error + value V +} + +func NewLRU[K comparable, V any](size int, successTTL, failedTTL time.Duration, target Target) *LRU[K, V] { + core, err := simplelru.NewLRU[K, *waitItem[V]](size, nil) + if err != nil { + panic(err) + } + return &LRU[K, V]{ + core: core, + successTTL: successTTL, + failedTTL: failedTTL, + target: target, + } +} + +type LRU[K comparable, V any] struct { + lock sync.Mutex + core *simplelru.LRU[K, *waitItem[V]] + successTTL time.Duration + failedTTL time.Duration + target Target +} + +func (x *LRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) { + x.lock.Lock() + v, ok := x.core.Get(key) + if ok { + x.lock.Unlock() + v.lock.Lock() + expires, value, err := v.expires, v.value, v.err + if expires != 0 && expires > time.Now().UnixMilli() { + v.lock.Unlock() + x.target.IncrGetHit() + return value, err + } + } else { + v = &waitItem[V]{} + x.core.Add(key, v) + v.lock.Lock() + x.lock.Unlock() + } + defer v.lock.Unlock() + if v.expires > time.Now().UnixMilli() { + return v.value, v.err + } + v.value, v.err = fetch() + if v.err == nil { + v.expires = time.Now().Add(x.successTTL).UnixMilli() + x.target.IncrGetSuccess() + } else { + v.expires = time.Now().Add(x.failedTTL).UnixMilli() + x.target.IncrGetFailed() + } + return v.value, v.err +} + +func (x *LRU[K, V]) Del(key K) bool { + x.lock.Lock() + ok := x.core.Remove(key) + x.lock.Unlock() + return ok +} diff --git a/pkg/common/localcache/local/lru_test.go b/pkg/common/localcache/local/lru_test.go new file mode 100644 index 000000000..adbd52277 --- /dev/null +++ b/pkg/common/localcache/local/lru_test.go @@ -0,0 +1,95 @@ +package local + +import ( + "fmt" + "sync" + "sync/atomic" + "testing" + "time" +) + +type cacheTarget struct { + getHit int64 + getSuccess int64 + getFailed int64 + delHit int64 + delNotFound int64 +} + +func (r *cacheTarget) IncrGetHit() { + atomic.AddInt64(&r.getHit, 1) +} + +func (r *cacheTarget) IncrGetSuccess() { + atomic.AddInt64(&r.getSuccess, 1) +} + +func (r *cacheTarget) IncrGetFailed() { + atomic.AddInt64(&r.getFailed, 1) +} + +func (r *cacheTarget) IncrDelHit() { + atomic.AddInt64(&r.delHit, 1) +} + +func (r *cacheTarget) IncrDelNotFound() { + atomic.AddInt64(&r.delNotFound, 1) +} + +func (r *cacheTarget) String() string { + return fmt.Sprintf("getHit: %d, getSuccess: %d, getFailed: %d, delHit: %d, delNotFound: %d", r.getHit, r.getSuccess, r.getFailed, r.delHit, r.delNotFound) +} + +func TestName(t *testing.T) { + target := &cacheTarget{} + l := NewCache[string](100, 1000, time.Second*20, time.Second*5, target) + //l := NewLRU[string, string](1000, time.Second*20, time.Second*5, target) + + fn := func(key string, n int, fetch func() (string, error)) { + for i := 0; i < n; i++ { + //v, err := l.Get(key, fetch) + //if err == nil { + // t.Log("key", key, "value", v) + //} else { + // t.Error("key", key, err) + //} + l.Get(key, fetch) + //time.Sleep(time.Second / 100) + } + } + + tmp := make(map[string]struct{}) + + var wg sync.WaitGroup + for i := 0; i < 10000; i++ { + wg.Add(1) + key := fmt.Sprintf("key_%d", i%200) + tmp[key] = struct{}{} + go func() { + defer wg.Done() + //t.Log(key) + fn(key, 10000, func() (string, error) { + //time.Sleep(time.Second * 3) + //t.Log(time.Now(), "key", key, "fetch") + //if rand.Uint32()%5 == 0 { + // return "value_" + key, nil + //} + //return "", errors.New("rand error") + return "value_" + key, nil + }) + }() + + //wg.Add(1) + //go func() { + // defer wg.Done() + // for i := 0; i < 10; i++ { + // l.Del(key) + // time.Sleep(time.Second / 3) + // } + //}() + } + wg.Wait() + t.Log(len(tmp)) + t.Log(target.String()) + +} diff --git a/pkg/common/localcache/local/target.go b/pkg/common/localcache/local/target.go new file mode 100644 index 000000000..6cb134fb0 --- /dev/null +++ b/pkg/common/localcache/local/target.go @@ -0,0 +1,10 @@ +package local + +type Target interface { + IncrGetHit() + IncrGetSuccess() + IncrGetFailed() + + IncrDelHit() + IncrDelNotFound() +} diff --git a/pkg/common/localcache/option.go b/pkg/common/localcache/option.go new file mode 100644 index 000000000..01c9ce57d --- /dev/null +++ b/pkg/common/localcache/option.go @@ -0,0 +1,113 @@ +package localcache + +import ( + "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/localcache/local" + "time" +) + +func defaultOption() *option { + return &option{ + enable: true, + localSlotNum: 500, + localSlotSize: 20000, + localSuccessTTL: time.Minute, + localFailedTTL: time.Second * 5, + delFn: make([]func(ctx context.Context, key ...string), 0, 2), + target: emptyTarget{}, + } +} + +type option struct { + enable bool + localSlotNum int + localSlotSize int + localSuccessTTL time.Duration + localFailedTTL time.Duration + delFn []func(ctx context.Context, key ...string) + delCh func(fn func(key ...string)) + target local.Target +} + +type Option func(o *option) + +func WithDisable() Option { + return func(o *option) { + o.enable = false + } +} + +func WithLocalSlotNum(localSlotNum int) Option { + if localSlotNum < 1 { + panic("localSlotNum should be greater than 0") + } + return func(o *option) { + o.localSlotNum = localSlotNum + } +} + +func WithLocalSlotSize(localSlotSize int) Option { + if localSlotSize < 1 { + panic("localSlotSize should be greater than 0") + } + return func(o *option) { + o.localSlotSize = localSlotSize + } +} + +func WithLocalSuccessTTL(localSuccessTTL time.Duration) Option { + if localSuccessTTL < 0 { + panic("localSuccessTTL should be greater than 0") + } + return func(o *option) { + o.localSuccessTTL = localSuccessTTL + } +} + +func WithLocalFailedTTL(localFailedTTL time.Duration) Option { + if localFailedTTL < 0 { + panic("localFailedTTL should be greater than 0") + } + return func(o *option) { + o.localFailedTTL = localFailedTTL + } +} + +func WithTarget(target local.Target) Option { + if target == nil { + panic("target should not be nil") + } + return func(o *option) { + o.target = target + } +} + +func WithDeleteKeyBefore(fn func(ctx context.Context, key ...string)) Option { + if fn == nil { + panic("fn should not be nil") + } + return func(o *option) { + o.delFn = append(o.delFn, fn) + } +} + +func WithDeleteLocal(fn func(fn func(key ...string))) Option { + if fn == nil { + panic("fn should not be nil") + } + return func(o *option) { + o.delCh = fn + } +} + +type emptyTarget struct{} + +func (e emptyTarget) IncrGetHit() {} + +func (e emptyTarget) IncrGetSuccess() {} + +func (e emptyTarget) IncrGetFailed() {} + +func (e emptyTarget) IncrDelHit() {} + +func (e emptyTarget) IncrDelNotFound() {} diff --git a/pkg/common/localcache/tool.go b/pkg/common/localcache/tool.go new file mode 100644 index 000000000..ea3590823 --- /dev/null +++ b/pkg/common/localcache/tool.go @@ -0,0 +1,9 @@ +package localcache + +func AnyValue[V any](v any, err error) (V, error) { + if err != nil { + var zero V + return zero, err + } + return v.(V), nil +} diff --git a/pkg/rpccache/friend.go b/pkg/rpccache/friend.go new file mode 100644 index 000000000..0585b2e9d --- /dev/null +++ b/pkg/rpccache/friend.go @@ -0,0 +1,38 @@ +package rpccache + +import ( + "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" + "github.com/openimsdk/open-im-server/v3/pkg/common/localcache" + "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" +) + +func NewFriendLocalCache(client rpcclient.FriendRpcClient) *FriendLocalCache { + return &FriendLocalCache{ + local: localcache.New[any](), + client: client, + } +} + +type FriendLocalCache struct { + local localcache.Cache[any] + client rpcclient.FriendRpcClient +} + +func (f *FriendLocalCache) GetFriendIDs(ctx context.Context, ownerUserID string) ([]string, error) { + return localcache.AnyValue[[]string](f.local.Get(ctx, cachekey.GetFriendIDsKey(ownerUserID), func(ctx context.Context) (any, error) { + return f.client.GetFriendIDs(ctx, ownerUserID) + })) +} + +func (f *FriendLocalCache) IsFriend(ctx context.Context, possibleFriendUserID, userID string) (bool, error) { + return localcache.AnyValue[bool](f.local.Get(ctx, cachekey.GetIsFriendKey(possibleFriendUserID, userID), func(ctx context.Context) (any, error) { + return f.client.IsFriend(ctx, possibleFriendUserID, userID) + })) +} + +func (f *FriendLocalCache) IsBlocked(ctx context.Context, possibleBlackUserID, userID string) (bool, error) { + return localcache.AnyValue[bool](f.local.Get(ctx, cachekey.GetIsBlackIDsKey(possibleBlackUserID, userID), func(ctx context.Context) (any, error) { + return f.client.IsFriend(ctx, possibleBlackUserID, userID) + })) +}