diff --git a/deployments/templates/openim.yaml b/deployments/templates/openim.yaml index 69f578f81..3a416a806 100644 --- a/deployments/templates/openim.yaml +++ b/deployments/templates/openim.yaml @@ -530,6 +530,16 @@ prometheus: localCache: friend: - topic: friend + topic: delete_cache_friend + slotNum: 500 + slotSize: 20000 + + group: + topic: delete_cache_group + slotNum: 500 + slotSize: 20000 + + conversation: + topic: delete_cache_conversation slotNum: 500 slotSize: 20000 \ No newline at end of file diff --git a/internal/push/push_rpc_server.go b/internal/push/push_rpc_server.go index 9e66f8f73..8a9da0577 100644 --- a/internal/push/push_rpc_server.go +++ b/internal/push/push_rpc_server.go @@ -17,6 +17,7 @@ package push import ( "context" "github.com/OpenIMSDK/tools/utils" + "github.com/openimsdk/open-im-server/v3/pkg/rpccache" "sync" "google.golang.org/grpc" @@ -28,7 +29,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" - "github.com/openimsdk/open-im-server/v3/pkg/common/db/localcache" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" ) @@ -51,8 +51,8 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e client, offlinePusher, database, - localcache.NewGroupLocalCache(&groupRpcClient), - localcache.NewConversationLocalCache(&conversationRpcClient), + rpccache.NewGroupLocalCache(rpcclient.NewGroupRpcClient(client), rdb), + rpccache.NewConversationLocalCache(rpcclient.NewConversationRpcClient(client), rdb), &conversationRpcClient, &groupRpcClient, &msgRpcClient, diff --git a/internal/push/push_to_client.go b/internal/push/push_to_client.go index 7cee7b99d..2cf1a34b5 100644 --- a/internal/push/push_to_client.go +++ b/internal/push/push_to_client.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "errors" + "github.com/openimsdk/open-im-server/v3/pkg/rpccache" "google.golang.org/grpc" "sync" @@ -40,7 +41,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" - "github.com/openimsdk/open-im-server/v3/pkg/common/db/localcache" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" @@ -50,8 +50,8 @@ type Pusher struct { database controller.PushDatabase discov discoveryregistry.SvcDiscoveryRegistry offlinePusher offlinepush.OfflinePusher - groupLocalCache *localcache.GroupLocalCache - conversationLocalCache *localcache.ConversationLocalCache + groupLocalCache *rpccache.GroupLocalCache + conversationLocalCache *rpccache.ConversationLocalCache msgRpcClient *rpcclient.MessageRpcClient conversationRpcClient *rpcclient.ConversationRpcClient groupRpcClient *rpcclient.GroupRpcClient @@ -60,7 +60,7 @@ type Pusher struct { var errNoOfflinePusher = errors.New("no offlinePusher is configured") func NewPusher(discov discoveryregistry.SvcDiscoveryRegistry, offlinePusher offlinepush.OfflinePusher, database controller.PushDatabase, - groupLocalCache *localcache.GroupLocalCache, conversationLocalCache *localcache.ConversationLocalCache, + groupLocalCache *rpccache.GroupLocalCache, conversationLocalCache *rpccache.ConversationLocalCache, conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient, msgRpcClient *rpcclient.MessageRpcClient, ) *Pusher { return &Pusher{ diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index f5b1dd823..08786b43b 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -27,7 +27,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" - "github.com/openimsdk/open-im-server/v3/pkg/common/db/localcache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" ) @@ -35,15 +34,14 @@ import ( type ( MessageInterceptorChain []MessageInterceptorFunc msgServer struct { - RegisterCenter discoveryregistry.SvcDiscoveryRegistry - MsgDatabase controller.CommonMsgDatabase - Group *rpcclient.GroupRpcClient - User *rpcclient.UserRpcClient - Conversation *rpcclient.ConversationRpcClient - friend *rpccache.FriendLocalCache - //friend *rpcclient.FriendRpcClient - GroupLocalCache *localcache.GroupLocalCache - ConversationLocalCache *localcache.ConversationLocalCache + RegisterCenter discoveryregistry.SvcDiscoveryRegistry + MsgDatabase controller.CommonMsgDatabase + Group *rpcclient.GroupRpcClient + User *rpcclient.UserRpcClient + Conversation *rpcclient.ConversationRpcClient + friend *rpccache.FriendLocalCache + GroupLocalCache *rpccache.GroupLocalCache + ConversationLocalCache *rpccache.ConversationLocalCache Handlers MessageInterceptorChain notificationSender *rpcclient.NotificationSender } @@ -89,8 +87,8 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e Group: &groupRpcClient, MsgDatabase: msgDatabase, RegisterCenter: client, - GroupLocalCache: localcache.NewGroupLocalCache(&groupRpcClient), - ConversationLocalCache: localcache.NewConversationLocalCache(&conversationClient), + GroupLocalCache: rpccache.NewGroupLocalCache(rpcclient.NewGroupRpcClient(client), rdb), + ConversationLocalCache: rpccache.NewConversationLocalCache(rpcclient.NewConversationRpcClient(client), rdb), friend: rpccache.NewFriendLocalCache(rpcclient.NewFriendRpcClient(client), rdb), } s.notificationSender = rpcclient.NewNotificationSender(rpcclient.WithLocalSendMsg(s.SendMsg)) diff --git a/pkg/common/cachekey/black.go b/pkg/common/cachekey/black.go index 136972cb7..527ad14dc 100644 --- a/pkg/common/cachekey/black.go +++ b/pkg/common/cachekey/black.go @@ -1,15 +1,15 @@ package cachekey const ( - blackIDsKey = "BLACK_IDS:" - isBlackKey = "IS_BLACK:" + BlackIDsKey = "BLACK_IDS:" + IsBlackKey = "IS_BLACK:" // local cache ) func GetBlackIDsKey(ownerUserID string) string { - return blackIDsKey + ownerUserID + return BlackIDsKey + ownerUserID } func GetIsBlackIDsKey(possibleBlackUserID, userID string) string { - return isBlackKey + userID + "-" + possibleBlackUserID + return IsBlackKey + userID + "-" + possibleBlackUserID } diff --git a/pkg/common/cachekey/conversation.go b/pkg/common/cachekey/conversation.go index eb5b83f0e..665ca11c6 100644 --- a/pkg/common/cachekey/conversation.go +++ b/pkg/common/cachekey/conversation.go @@ -1,48 +1,44 @@ package cachekey -import "time" - const ( - conversationKey = "CONVERSATION:" - conversationIDsKey = "CONVERSATION_IDS:" - conversationIDsHashKey = "CONVERSATION_IDS_HASH:" - conversationHasReadSeqKey = "CONVERSATION_HAS_READ_SEQ:" - recvMsgOptKey = "RECV_MSG_OPT:" - superGroupRecvMsgNotNotifyUserIDsKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS:" - superGroupRecvMsgNotNotifyUserIDsHashKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS_HASH:" - conversationNotReceiveMessageUserIDsKey = "CONVERSATION_NOT_RECEIVE_MESSAGE_USER_IDS:" - - conversationExpireTime = time.Second * 60 * 60 * 12 + ConversationKey = "CONVERSATION:" + ConversationIDsKey = "CONVERSATION_IDS:" + ConversationIDsHashKey = "CONVERSATION_IDS_HASH:" + ConversationHasReadSeqKey = "CONVERSATION_HAS_READ_SEQ:" + RecvMsgOptKey = "RECV_MSG_OPT:" + SuperGroupRecvMsgNotNotifyUserIDsKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS:" + SuperGroupRecvMsgNotNotifyUserIDsHashKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS_HASH:" + ConversationNotReceiveMessageUserIDsKey = "CONVERSATION_NOT_RECEIVE_MESSAGE_USER_IDS:" ) func GetConversationKey(ownerUserID, conversationID string) string { - return conversationKey + ownerUserID + ":" + conversationID + return ConversationKey + ownerUserID + ":" + conversationID } func GetConversationIDsKey(ownerUserID string) string { - return conversationIDsKey + ownerUserID + return ConversationIDsKey + ownerUserID } func GetSuperGroupRecvNotNotifyUserIDsKey(groupID string) string { - return superGroupRecvMsgNotNotifyUserIDsKey + groupID + return SuperGroupRecvMsgNotNotifyUserIDsKey + groupID } func GetRecvMsgOptKey(ownerUserID, conversationID string) string { - return recvMsgOptKey + ownerUserID + ":" + conversationID + return RecvMsgOptKey + ownerUserID + ":" + conversationID } func GetSuperGroupRecvNotNotifyUserIDsHashKey(groupID string) string { - return superGroupRecvMsgNotNotifyUserIDsHashKey + groupID + return SuperGroupRecvMsgNotNotifyUserIDsHashKey + groupID } func GetConversationHasReadSeqKey(ownerUserID, conversationID string) string { - return conversationHasReadSeqKey + ownerUserID + ":" + conversationID + return ConversationHasReadSeqKey + ownerUserID + ":" + conversationID } func GetConversationNotReceiveMessageUserIDsKey(conversationID string) string { - return conversationNotReceiveMessageUserIDsKey + conversationID + return ConversationNotReceiveMessageUserIDsKey + conversationID } func GetUserConversationIDsHashKey(ownerUserID string) string { - return conversationIDsHashKey + ownerUserID + return ConversationIDsHashKey + ownerUserID } diff --git a/pkg/common/cachekey/friend.go b/pkg/common/cachekey/friend.go index 1f9d4e94f..f37c9da37 100644 --- a/pkg/common/cachekey/friend.go +++ b/pkg/common/cachekey/friend.go @@ -1,27 +1,24 @@ package cachekey -import "time" - const ( - friendExpireTime = time.Second * 60 * 60 * 12 - friendIDsKey = "FRIEND_IDS:" - twoWayFriendsIDsKey = "COMMON_FRIENDS_IDS:" - friendKey = "FRIEND_INFO:" - isFriendKey = "IS_FRIEND:" + FriendIDsKey = "FRIEND_IDS:" + TwoWayFriendsIDsKey = "COMMON_FRIENDS_IDS:" + FriendKey = "FRIEND_INFO:" + IsFriendKey = "IS_FRIEND:" // local cache key ) func GetFriendIDsKey(ownerUserID string) string { - return friendIDsKey + ownerUserID + return FriendIDsKey + ownerUserID } func GetTwoWayFriendsIDsKey(ownerUserID string) string { - return twoWayFriendsIDsKey + ownerUserID + return TwoWayFriendsIDsKey + ownerUserID } func GetFriendKey(ownerUserID, friendUserID string) string { - return friendKey + ownerUserID + "-" + friendUserID + return FriendKey + ownerUserID + "-" + friendUserID } func GetIsFriendKey(possibleFriendUserID, userID string) string { - return isFriendKey + possibleFriendUserID + "-" + userID + return IsFriendKey + possibleFriendUserID + "-" + userID } diff --git a/pkg/common/cachekey/group.go b/pkg/common/cachekey/group.go index 3e5431b83..1dcf0ffce 100644 --- a/pkg/common/cachekey/group.go +++ b/pkg/common/cachekey/group.go @@ -7,39 +7,39 @@ import ( const ( groupExpireTime = time.Second * 60 * 60 * 12 - groupInfoKey = "GROUP_INFO:" - groupMemberIDsKey = "GROUP_MEMBER_IDS:" - groupMembersHashKey = "GROUP_MEMBERS_HASH2:" - groupMemberInfoKey = "GROUP_MEMBER_INFO:" - joinedGroupsKey = "JOIN_GROUPS_KEY:" - groupMemberNumKey = "GROUP_MEMBER_NUM_CACHE:" - groupRoleLevelMemberIDsKey = "GROUP_ROLE_LEVEL_MEMBER_IDS:" + GroupInfoKey = "GROUP_INFO:" + GroupMemberIDsKey = "GROUP_MEMBER_IDS:" + GroupMembersHashKey = "GROUP_MEMBERS_HASH2:" + GroupMemberInfoKey = "GROUP_MEMBER_INFO:" + JoinedGroupsKey = "JOIN_GROUPS_KEY:" + GroupMemberNumKey = "GROUP_MEMBER_NUM_CACHE:" + GroupRoleLevelMemberIDsKey = "GROUP_ROLE_LEVEL_MEMBER_IDS:" ) func GetGroupInfoKey(groupID string) string { - return groupInfoKey + groupID + return GroupInfoKey + groupID } func GetJoinedGroupsKey(userID string) string { - return joinedGroupsKey + userID + return JoinedGroupsKey + userID } func GetGroupMembersHashKey(groupID string) string { - return groupMembersHashKey + groupID + return GroupMembersHashKey + groupID } func GetGroupMemberIDsKey(groupID string) string { - return groupMemberIDsKey + groupID + return GroupMemberIDsKey + groupID } func GetGroupMemberInfoKey(groupID, userID string) string { - return groupMemberInfoKey + groupID + "-" + userID + return GroupMemberInfoKey + groupID + "-" + userID } func GetGroupMemberNumKey(groupID string) string { - return groupMemberNumKey + groupID + return GroupMemberNumKey + groupID } func GetGroupRoleLevelMemberIDsKey(groupID string, roleLevel int32) string { - return groupRoleLevelMemberIDsKey + groupID + "-" + strconv.Itoa(int(roleLevel)) + return GroupRoleLevelMemberIDsKey + groupID + "-" + strconv.Itoa(int(roleLevel)) } diff --git a/pkg/common/cachekey/msg.go b/pkg/common/cachekey/msg.go deleted file mode 100644 index b19c4f473..000000000 --- a/pkg/common/cachekey/msg.go +++ /dev/null @@ -1,23 +0,0 @@ -package cachekey - -const ( - maxSeq = "MAX_SEQ:" - minSeq = "MIN_SEQ:" - conversationUserMinSeq = "CON_USER_MIN_SEQ:" - hasReadSeq = "HAS_READ_SEQ:" - - //appleDeviceToken = "DEVICE_TOKEN" - getuiToken = "GETUI_TOKEN" - getuiTaskID = "GETUI_TASK_ID" - //signalCache = "SIGNAL_CACHE:" - //signalListCache = "SIGNAL_LIST_CACHE:" - FCM_TOKEN = "FCM_TOKEN:" - - messageCache = "MESSAGE_CACHE:" - messageDelUserList = "MESSAGE_DEL_USER_LIST:" - userDelMessagesList = "USER_DEL_MESSAGES_LIST:" - sendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:" - userBadgeUnreadCountSum = "USER_BADGE_UNREAD_COUNT_SUM:" - exTypeKeyLocker = "EX_LOCK:" - uidPidToken = "UID_PID_TOKEN_STATUS:" -) diff --git a/pkg/common/cachekey/s3.go b/pkg/common/cachekey/s3.go deleted file mode 100644 index cbf8bfad5..000000000 --- a/pkg/common/cachekey/s3.go +++ /dev/null @@ -1 +0,0 @@ -package cachekey diff --git a/pkg/common/cachekey/user.go b/pkg/common/cachekey/user.go deleted file mode 100644 index ed11641d5..000000000 --- a/pkg/common/cachekey/user.go +++ /dev/null @@ -1,21 +0,0 @@ -package cachekey - -import "time" - -const ( - userExpireTime = time.Second * 60 * 60 * 12 - userInfoKey = "USER_INFO:" - userGlobalRecvMsgOptKey = "USER_GLOBAL_RECV_MSG_OPT_KEY:" - olineStatusKey = "ONLINE_STATUS:" - userOlineStatusExpireTime = time.Second * 60 * 60 * 24 - statusMod = 501 - platformID = "_PlatformIDSuffix" -) - -func GetUserInfoKey(userID string) string { - return userInfoKey + userID -} - -func GetUserGlobalRecvMsgOptKey(userID string) string { - return userGlobalRecvMsgOptKey + userID -} diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index a8d865ef5..b06209c16 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -378,8 +378,14 @@ type LocalCache struct { SlotSize int `yaml:"slotSize"` } +func (l LocalCache) Enable() bool { + return l.Topic != "" && l.SlotNum > 0 && l.SlotSize > 0 +} + type localCache struct { - Friend LocalCache `yaml:"friend"` + Friend LocalCache `yaml:"friend"` + Group LocalCache `yaml:"group"` + Conversation LocalCache `yaml:"conversation"` } func (c *configStruct) GetServiceNames() []string { diff --git a/pkg/common/db/cache/black.go b/pkg/common/db/cache/black.go index ca116fdc7..716c9eef0 100644 --- a/pkg/common/db/cache/black.go +++ b/pkg/common/db/cache/black.go @@ -16,7 +16,6 @@ package cache import ( "context" - "github.com/OpenIMSDK/tools/log" "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "time" @@ -56,9 +55,7 @@ func NewBlackCacheRedis( ) BlackCache { rcClient := rockscache.NewClient(rdb, options) mc := NewMetaCacheRedis(rcClient) - f := config.Config.LocalCache.Friend - log.ZDebug(context.Background(), "black local cache init", "Topic", f.Topic, "SlotNum", f.SlotNum, "SlotSize", f.SlotSize) - mc.SetTopic(f.Topic) + mc.SetTopic(config.Config.LocalCache.Friend.Topic) mc.SetRawRedisClient(rdb) return &BlackCacheRedis{ expireTime: blackExpireTime, @@ -73,7 +70,7 @@ func (b *BlackCacheRedis) NewCache() BlackCache { expireTime: b.expireTime, rcClient: b.rcClient, blackDB: b.blackDB, - metaCache: b.metaCache.Copy(), + metaCache: b.Copy(), } } diff --git a/pkg/common/db/cache/config.go b/pkg/common/db/cache/config.go new file mode 100644 index 000000000..dc5c55143 --- /dev/null +++ b/pkg/common/db/cache/config.go @@ -0,0 +1,62 @@ +package cache + +import ( + "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "strings" + "sync" +) + +var ( + once sync.Once + subscribe map[string][]string +) + +func getPublishKey(topic string, key []string) []string { + if topic == "" || len(key) == 0 { + return nil + } + once.Do(func() { + list := []struct { + Local config.LocalCache + Keys []string + }{ + { + Local: config.Config.LocalCache.Group, + Keys: []string{cachekey.GroupMemberIDsKey}, + }, + { + Local: config.Config.LocalCache.Friend, + Keys: []string{cachekey.FriendIDsKey, cachekey.BlackIDsKey}, + }, + { + Local: config.Config.LocalCache.Conversation, + Keys: []string{cachekey.ConversationIDsKey}, + }, + } + subscribe = make(map[string][]string) + for _, v := range list { + if v.Local.Enable() { + subscribe[v.Local.Topic] = v.Keys + } + } + }) + prefix, ok := subscribe[topic] + if !ok { + return nil + } + res := make([]string, 0, len(key)) + for _, k := range key { + var exist bool + for _, p := range prefix { + if strings.HasPrefix(k, p) { + exist = true + break + } + } + if exist { + res = append(res, k) + } + } + return res +} diff --git a/pkg/common/db/cache/conversation.go b/pkg/common/db/cache/conversation.go index cf638f38f..9c0391e67 100644 --- a/pkg/common/db/cache/conversation.go +++ b/pkg/common/db/cache/conversation.go @@ -18,6 +18,7 @@ import ( "context" "errors" "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "math/big" "strings" "time" @@ -85,10 +86,12 @@ type ConversationCache interface { func NewConversationRedis(rdb redis.UniversalClient, opts rockscache.Options, db relationtb.ConversationModelInterface) ConversationCache { rcClient := rockscache.NewClient(rdb, opts) - + mc := NewMetaCacheRedis(rcClient) + mc.SetTopic(config.Config.LocalCache.Conversation.Topic) + mc.SetRawRedisClient(rdb) return &ConversationRedisCache{ rcClient: rcClient, - metaCache: NewMetaCacheRedis(rcClient), + metaCache: mc, conversationDB: db, expireTime: conversationExpireTime, } @@ -119,7 +122,7 @@ type ConversationRedisCache struct { func (c *ConversationRedisCache) NewCache() ConversationCache { return &ConversationRedisCache{ rcClient: c.rcClient, - metaCache: NewMetaCacheRedis(c.rcClient, c.metaCache.GetPreDelKeys()...), + metaCache: c.Copy(), conversationDB: c.conversationDB, expireTime: c.expireTime, } diff --git a/pkg/common/db/cache/friend.go b/pkg/common/db/cache/friend.go index 6d287d2f5..432d08572 100644 --- a/pkg/common/db/cache/friend.go +++ b/pkg/common/db/cache/friend.go @@ -80,7 +80,7 @@ func NewFriendCacheRedis(rdb redis.UniversalClient, friendDB relationtb.FriendMo func (f *FriendCacheRedis) NewCache() FriendCache { return &FriendCacheRedis{ rcClient: f.rcClient, - metaCache: f.metaCache.Copy(), + metaCache: f.Copy(), friendDB: f.friendDB, expireTime: f.expireTime, } diff --git a/pkg/common/db/cache/group.go b/pkg/common/db/cache/group.go index 58c42ccfd..783f2e515 100644 --- a/pkg/common/db/cache/group.go +++ b/pkg/common/db/cache/group.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "time" "github.com/OpenIMSDK/protocol/constant" @@ -104,12 +105,14 @@ func NewGroupCacheRedis( opts rockscache.Options, ) GroupCache { rcClient := rockscache.NewClient(rdb, opts) - + mc := NewMetaCacheRedis(rcClient) + mc.SetTopic(config.Config.LocalCache.Group.Topic) + mc.SetRawRedisClient(rdb) return &GroupCacheRedis{ rcClient: rcClient, expireTime: groupExpireTime, groupDB: groupDB, groupMemberDB: groupMemberDB, groupRequestDB: groupRequestDB, groupHash: hashCode, - metaCache: NewMetaCacheRedis(rcClient), + metaCache: mc, } } @@ -120,7 +123,7 @@ func (g *GroupCacheRedis) NewCache() GroupCache { groupDB: g.groupDB, groupMemberDB: g.groupMemberDB, groupRequestDB: g.groupRequestDB, - metaCache: NewMetaCacheRedis(g.rcClient, g.metaCache.GetPreDelKeys()...), + metaCache: g.Copy(), } } diff --git a/pkg/common/db/cache/meta_cache.go b/pkg/common/db/cache/meta_cache.go index dbe2344f0..86ade0b68 100644 --- a/pkg/common/db/cache/meta_cache.go +++ b/pkg/common/db/cache/meta_cache.go @@ -103,13 +103,13 @@ func (m *metaCacheRedis) ExecDel(ctx context.Context, distinct ...bool) error { break } } - if m.topic != "" && m.redisClient != nil { - data, err := json.Marshal(m.keys) + if pk := getPublishKey(m.topic, m.keys); len(pk) > 0 { + data, err := json.Marshal(pk) if err != nil { - log.ZError(ctx, "keys json marshal failed", err, "topic", m.topic, "keys", m.keys) + log.ZError(ctx, "keys json marshal failed", err, "topic", m.topic, "keys", pk) } else { if err := m.redisClient.Publish(ctx, m.topic, string(data)).Err(); err != nil { - log.ZError(ctx, "redis publish cache delete error", err, "topic", m.topic, "keys", m.keys) + log.ZError(ctx, "redis publish cache delete error", err, "topic", m.topic, "keys", pk) } } } diff --git a/pkg/common/db/localcache/conversation.go b/pkg/common/db/localcache/conversation.go deleted file mode 100644 index c40bcdbce..000000000 --- a/pkg/common/db/localcache/conversation.go +++ /dev/null @@ -1,87 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package localcache - -import ( - "context" - "sync" - - "github.com/OpenIMSDK/protocol/conversation" - - "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" -) - -type ConversationLocalCache struct { - lock sync.Mutex - superGroupRecvMsgNotNotifyUserIDs map[string]Hash - conversationIDs map[string]Hash - client *rpcclient.ConversationRpcClient -} - -type Hash struct { - hash uint64 - ids []string -} - -func NewConversationLocalCache(client *rpcclient.ConversationRpcClient) *ConversationLocalCache { - return &ConversationLocalCache{ - superGroupRecvMsgNotNotifyUserIDs: make(map[string]Hash), - conversationIDs: make(map[string]Hash), - client: client, - } -} - -func (g *ConversationLocalCache) GetRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) { - resp, err := g.client.Client.GetRecvMsgNotNotifyUserIDs(ctx, &conversation.GetRecvMsgNotNotifyUserIDsReq{ - GroupID: groupID, - }) - if err != nil { - return nil, err - } - return resp.UserIDs, nil -} - -func (g *ConversationLocalCache) GetConversationIDs(ctx context.Context, userID string) ([]string, error) { - resp, err := g.client.Client.GetUserConversationIDsHash(ctx, &conversation.GetUserConversationIDsHashReq{ - OwnerUserID: userID, - }) - if err != nil { - return nil, err - } - - g.lock.Lock() - hash, ok := g.conversationIDs[userID] - g.lock.Unlock() - - if !ok || hash.hash != resp.Hash { - conversationIDsResp, err := g.client.Client.GetConversationIDs(ctx, &conversation.GetConversationIDsReq{ - UserID: userID, - }) - if err != nil { - return nil, err - } - - g.lock.Lock() - defer g.lock.Unlock() - g.conversationIDs[userID] = Hash{ - hash: resp.Hash, - ids: conversationIDsResp.ConversationIDs, - } - - return conversationIDsResp.ConversationIDs, nil - } - - return hash.ids, nil -} diff --git a/pkg/common/db/localcache/doc.go b/pkg/common/db/localcache/doc.go deleted file mode 100644 index d349373ee..000000000 --- a/pkg/common/db/localcache/doc.go +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package localcache // import "github.com/openimsdk/open-im-server/v3/pkg/common/db/localcache" diff --git a/pkg/common/db/localcache/group.go b/pkg/common/db/localcache/group.go deleted file mode 100644 index 4958d91ee..000000000 --- a/pkg/common/db/localcache/group.go +++ /dev/null @@ -1,78 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package localcache - -import ( - "context" - "sync" - - "github.com/OpenIMSDK/protocol/group" - "github.com/OpenIMSDK/tools/errs" - - "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" -) - -type GroupLocalCache struct { - lock sync.Mutex - cache map[string]GroupMemberIDsHash - client *rpcclient.GroupRpcClient -} - -type GroupMemberIDsHash struct { - memberListHash uint64 - userIDs []string -} - -func NewGroupLocalCache(client *rpcclient.GroupRpcClient) *GroupLocalCache { - return &GroupLocalCache{ - cache: make(map[string]GroupMemberIDsHash, 0), - client: client, - } -} - -func (g *GroupLocalCache) GetGroupMemberIDs(ctx context.Context, groupID string) ([]string, error) { - resp, err := g.client.Client.GetGroupAbstractInfo(ctx, &group.GetGroupAbstractInfoReq{ - GroupIDs: []string{groupID}, - }) - if err != nil { - return nil, err - } - if len(resp.GroupAbstractInfos) < 1 { - return nil, errs.ErrGroupIDNotFound - } - - g.lock.Lock() - localHashInfo, ok := g.cache[groupID] - if ok && localHashInfo.memberListHash == resp.GroupAbstractInfos[0].GroupMemberListHash { - g.lock.Unlock() - return localHashInfo.userIDs, nil - } - g.lock.Unlock() - - groupMembersResp, err := g.client.Client.GetGroupMemberUserIDs(ctx, &group.GetGroupMemberUserIDsReq{ - GroupID: groupID, - }) - if err != nil { - return nil, err - } - - g.lock.Lock() - defer g.lock.Unlock() - g.cache[groupID] = GroupMemberIDsHash{ - memberListHash: resp.GroupAbstractInfos[0].GroupMemberListHash, - userIDs: groupMembersResp.UserIDs, - } - return g.cache[groupID].userIDs, nil -} diff --git a/pkg/common/db/localcache/meta_local_cache.go b/pkg/common/db/localcache/meta_local_cache.go deleted file mode 100644 index ed9389c27..000000000 --- a/pkg/common/db/localcache/meta_local_cache.go +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package localcache diff --git a/pkg/common/localcache/cache.go b/pkg/common/localcache/cache.go index 228538fa6..56cf14ff8 100644 --- a/pkg/common/localcache/cache.go +++ b/pkg/common/localcache/cache.go @@ -4,11 +4,11 @@ import ( "context" "github.com/openimsdk/open-im-server/v3/pkg/common/localcache/link" "github.com/openimsdk/open-im-server/v3/pkg/common/localcache/local" - opt "github.com/openimsdk/open-im-server/v3/pkg/common/localcache/option" + lopt "github.com/openimsdk/open-im-server/v3/pkg/common/localcache/option" ) type Cache[V any] interface { - Get(ctx context.Context, key string, fetch func(ctx context.Context) (V, error), opts ...*opt.Option) (V, error) + Get(ctx context.Context, key string, fetch func(ctx context.Context) (V, error), opts ...*lopt.Option) (V, error) Del(ctx context.Context, key ...string) } @@ -17,12 +17,17 @@ func New[V any](opts ...Option) Cache[V] { for _, o := range opts { o(opt) } - c := &cache[V]{opt: opt, link: link.New(opt.localSlotNum)} - c.local = local.NewCache[V](opt.localSlotNum, opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict) - go func() { - c.opt.delCh(c.del) - }() - return c + c := cache[V]{opt: opt} + if opt.localSlotNum > 0 && opt.localSlotSize > 0 { + c.local = local.NewCache[V](opt.localSlotNum, opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict) + go func() { + c.opt.delCh(c.del) + }() + if opt.linkSlotNum > 0 { + c.link = link.New(opt.linkSlotNum) + } + } + return &c } type cache[V any] struct { @@ -32,10 +37,12 @@ type cache[V any] struct { } func (c *cache[V]) onEvict(key string, value V) { - lks := c.link.Del(key) - for k := range lks { - if key != k { // prevent deadlock - c.local.Del(k) + if c.link != nil { + lks := c.link.Del(key) + for k := range lks { + if key != k { // prevent deadlock + c.local.Del(k) + } } } } @@ -50,16 +57,14 @@ func (c *cache[V]) del(key ...string) { } } -func (c *cache[V]) Get(ctx context.Context, key string, fetch func(ctx context.Context) (V, error), opts ...*opt.Option) (V, error) { - enable := c.opt.enable - if len(opts) > 0 && opts[0].Enable != nil { - enable = *opts[0].Enable - } - if enable { - if len(opts) > 0 && len(opts[0].Link) > 0 { - c.link.Link(key, opts[0].Link...) - } +func (c *cache[V]) Get(ctx context.Context, key string, fetch func(ctx context.Context) (V, error), opts ...*lopt.Option) (V, error) { + if c.local != nil { return c.local.Get(key, func() (V, error) { + if c.link != nil { + for _, o := range opts { + c.link.Link(key, o.Link...) + } + } return fetch(ctx) }) } else { @@ -74,7 +79,7 @@ func (c *cache[V]) Del(ctx context.Context, key ...string) { for _, fn := range c.opt.delFn { fn(ctx, key...) } - if c.opt.enable { + if c.local != nil { c.del(key...) } } diff --git a/pkg/common/localcache/option.go b/pkg/common/localcache/option.go index 01c9ce57d..f23a04e68 100644 --- a/pkg/common/localcache/option.go +++ b/pkg/common/localcache/option.go @@ -8,9 +8,9 @@ import ( func defaultOption() *option { return &option{ - enable: true, localSlotNum: 500, localSlotSize: 20000, + linkSlotNum: 500, localSuccessTTL: time.Minute, localFailedTTL: time.Second * 5, delFn: make([]func(ctx context.Context, key ...string), 0, 2), @@ -19,9 +19,9 @@ func defaultOption() *option { } type option struct { - enable bool localSlotNum int localSlotSize int + linkSlotNum int localSuccessTTL time.Duration localFailedTTL time.Duration delFn []func(ctx context.Context, key ...string) @@ -31,25 +31,27 @@ type option struct { type Option func(o *option) -func WithDisable() Option { +func WithLocalDisable() Option { + return WithLinkSlotNum(0) +} + +func WithLinkDisable() Option { + return WithLinkSlotNum(0) +} + +func WithLinkSlotNum(linkSlotNum int) Option { return func(o *option) { - o.enable = false + o.linkSlotNum = linkSlotNum } } func WithLocalSlotNum(localSlotNum int) Option { - if localSlotNum < 1 { - panic("localSlotNum should be greater than 0") - } return func(o *option) { o.localSlotNum = localSlotNum } } func WithLocalSlotSize(localSlotSize int) Option { - if localSlotSize < 1 { - panic("localSlotSize should be greater than 0") - } return func(o *option) { o.localSlotSize = localSlotSize } diff --git a/pkg/common/localcache/option/option.go b/pkg/common/localcache/option/option.go index 798c93ba5..8547b3339 100644 --- a/pkg/common/localcache/option/option.go +++ b/pkg/common/localcache/option/option.go @@ -5,20 +5,7 @@ func NewOption() *Option { } type Option struct { - Enable *bool - Link []string -} - -func (o *Option) WithEnable() *Option { - t := true - o.Enable = &t - return o -} - -func (o *Option) WithDisable() *Option { - f := false - o.Enable = &f - return o + Link []string } func (o *Option) WithLink(key ...string) *Option { diff --git a/pkg/rpccache/conversation.go b/pkg/rpccache/conversation.go new file mode 100644 index 000000000..181b8fd8a --- /dev/null +++ b/pkg/rpccache/conversation.go @@ -0,0 +1,28 @@ +package rpccache + +import ( + "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/localcache" + "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" + "github.com/redis/go-redis/v9" +) + +func NewConversationLocalCache(client rpcclient.ConversationRpcClient, cli redis.UniversalClient) *ConversationLocalCache { + return &ConversationLocalCache{ + local: localcache.New[any](localcache.WithRedisDeleteSubscribe(config.Config.LocalCache.Conversation.Topic, cli)), + client: client, + } +} + +type ConversationLocalCache struct { + local localcache.Cache[any] + client rpcclient.ConversationRpcClient +} + +func (c *ConversationLocalCache) GetConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) { + return localcache.AnyValue[[]string](c.local.Get(ctx, cachekey.GetConversationIDsKey(ownerUserID), func(ctx context.Context) (any, error) { + return c.client.GetConversationIDs(ctx, ownerUserID) + })) +} diff --git a/pkg/rpccache/friend.go b/pkg/rpccache/friend.go index 14adf1e0a..82d0a03a5 100644 --- a/pkg/rpccache/friend.go +++ b/pkg/rpccache/friend.go @@ -23,20 +23,20 @@ type FriendLocalCache struct { client rpcclient.FriendRpcClient } -func (f *FriendLocalCache) GetFriendIDs(ctx context.Context, ownerUserID string) (val []string, err error) { - log.ZDebug(ctx, "FriendLocalCache GetFriendIDs req", "ownerUserID", ownerUserID) - defer func() { - if err == nil { - log.ZDebug(ctx, "FriendLocalCache GetFriendIDs return", "value", val) - } else { - log.ZError(ctx, "FriendLocalCache GetFriendIDs return", err) - } - }() - return localcache.AnyValue[[]string](f.local.Get(ctx, cachekey.GetFriendIDsKey(ownerUserID), func(ctx context.Context) (any, error) { - log.ZDebug(ctx, "FriendLocalCache GetFriendIDs call rpc", "ownerUserID", ownerUserID) - return f.client.GetFriendIDs(ctx, ownerUserID) - })) -} +//func (f *FriendLocalCache) GetFriendIDs(ctx context.Context, ownerUserID string) (val []string, err error) { +// log.ZDebug(ctx, "FriendLocalCache GetFriendIDs req", "ownerUserID", ownerUserID) +// defer func() { +// if err == nil { +// log.ZDebug(ctx, "FriendLocalCache GetFriendIDs return", "value", val) +// } else { +// log.ZError(ctx, "FriendLocalCache GetFriendIDs return", err) +// } +// }() +// return localcache.AnyValue[[]string](f.local.Get(ctx, cachekey.GetFriendIDsKey(ownerUserID), func(ctx context.Context) (any, error) { +// log.ZDebug(ctx, "FriendLocalCache GetFriendIDs call rpc", "ownerUserID", ownerUserID) +// return f.client.GetFriendIDs(ctx, ownerUserID) +// })) +//} func (f *FriendLocalCache) IsFriend(ctx context.Context, possibleFriendUserID, userID string) (val bool, err error) { log.ZDebug(ctx, "FriendLocalCache IsFriend req", "possibleFriendUserID", possibleFriendUserID, "userID", userID) diff --git a/pkg/rpccache/group.go b/pkg/rpccache/group.go new file mode 100644 index 000000000..0c3bc6b93 --- /dev/null +++ b/pkg/rpccache/group.go @@ -0,0 +1,28 @@ +package rpccache + +import ( + "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/localcache" + "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" + "github.com/redis/go-redis/v9" +) + +func NewGroupLocalCache(client rpcclient.GroupRpcClient, cli redis.UniversalClient) *GroupLocalCache { + return &GroupLocalCache{ + local: localcache.New[any](localcache.WithRedisDeleteSubscribe(config.Config.LocalCache.Group.Topic, cli)), + client: client, + } +} + +type GroupLocalCache struct { + local localcache.Cache[any] + client rpcclient.GroupRpcClient +} + +func (g *GroupLocalCache) GetGroupMemberIDs(ctx context.Context, groupID string) ([]string, error) { + return localcache.AnyValue[[]string](g.local.Get(ctx, cachekey.GetGroupMemberIDsKey(groupID), func(ctx context.Context) (any, error) { + return g.client.GetGroupMemberIDs(ctx, groupID) + })) +}