From 45064ae5cab5224a36858455b1f42a087c1a05bd Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Fri, 12 Jan 2024 15:41:05 +0800 Subject: [PATCH 1/3] feat: local cache --- deployments/templates/openim.yaml | 12 ++- internal/push/push_rpc_server.go | 6 +- internal/push/push_to_client.go | 8 +- internal/rpc/msg/server.go | 22 +++-- pkg/common/cachekey/black.go | 8 +- pkg/common/cachekey/conversation.go | 36 ++++---- pkg/common/cachekey/friend.go | 19 ++--- pkg/common/cachekey/group.go | 28 +++---- pkg/common/cachekey/msg.go | 23 ------ pkg/common/cachekey/s3.go | 1 - pkg/common/cachekey/user.go | 21 ----- pkg/common/config/config.go | 8 +- pkg/common/db/cache/black.go | 7 +- pkg/common/db/cache/config.go | 62 ++++++++++++++ pkg/common/db/cache/conversation.go | 9 +- pkg/common/db/cache/friend.go | 2 +- pkg/common/db/cache/group.go | 9 +- pkg/common/db/cache/meta_cache.go | 8 +- pkg/common/db/localcache/conversation.go | 87 -------------------- pkg/common/db/localcache/doc.go | 15 ---- pkg/common/db/localcache/group.go | 78 ------------------ pkg/common/db/localcache/meta_local_cache.go | 15 ---- pkg/common/localcache/cache.go | 49 ++++++----- pkg/common/localcache/option.go | 22 ++--- pkg/common/localcache/option/option.go | 15 +--- pkg/rpccache/conversation.go | 28 +++++++ pkg/rpccache/friend.go | 28 +++---- pkg/rpccache/group.go | 28 +++++++ 28 files changed, 268 insertions(+), 386 deletions(-) delete mode 100644 pkg/common/cachekey/msg.go delete mode 100644 pkg/common/cachekey/s3.go delete mode 100644 pkg/common/cachekey/user.go create mode 100644 pkg/common/db/cache/config.go delete mode 100644 pkg/common/db/localcache/conversation.go delete mode 100644 pkg/common/db/localcache/doc.go delete mode 100644 pkg/common/db/localcache/group.go delete mode 100644 pkg/common/db/localcache/meta_local_cache.go create mode 100644 pkg/rpccache/conversation.go create mode 100644 pkg/rpccache/group.go diff --git a/deployments/templates/openim.yaml b/deployments/templates/openim.yaml index 69f578f81..3a416a806 100644 --- a/deployments/templates/openim.yaml +++ b/deployments/templates/openim.yaml @@ -530,6 +530,16 @@ prometheus: localCache: friend: - topic: friend + topic: delete_cache_friend + slotNum: 500 + slotSize: 20000 + + group: + topic: delete_cache_group + slotNum: 500 + slotSize: 20000 + + conversation: + topic: delete_cache_conversation slotNum: 500 slotSize: 20000 \ No newline at end of file diff --git a/internal/push/push_rpc_server.go b/internal/push/push_rpc_server.go index 9e66f8f73..8a9da0577 100644 --- a/internal/push/push_rpc_server.go +++ b/internal/push/push_rpc_server.go @@ -17,6 +17,7 @@ package push import ( "context" "github.com/OpenIMSDK/tools/utils" + "github.com/openimsdk/open-im-server/v3/pkg/rpccache" "sync" "google.golang.org/grpc" @@ -28,7 +29,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" - "github.com/openimsdk/open-im-server/v3/pkg/common/db/localcache" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" ) @@ -51,8 +51,8 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e client, offlinePusher, database, - localcache.NewGroupLocalCache(&groupRpcClient), - localcache.NewConversationLocalCache(&conversationRpcClient), + rpccache.NewGroupLocalCache(rpcclient.NewGroupRpcClient(client), rdb), + rpccache.NewConversationLocalCache(rpcclient.NewConversationRpcClient(client), rdb), &conversationRpcClient, &groupRpcClient, &msgRpcClient, diff --git a/internal/push/push_to_client.go b/internal/push/push_to_client.go index 7cee7b99d..2cf1a34b5 100644 --- a/internal/push/push_to_client.go +++ b/internal/push/push_to_client.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "errors" + "github.com/openimsdk/open-im-server/v3/pkg/rpccache" "google.golang.org/grpc" "sync" @@ -40,7 +41,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" - "github.com/openimsdk/open-im-server/v3/pkg/common/db/localcache" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" @@ -50,8 +50,8 @@ type Pusher struct { database controller.PushDatabase discov discoveryregistry.SvcDiscoveryRegistry offlinePusher offlinepush.OfflinePusher - groupLocalCache *localcache.GroupLocalCache - conversationLocalCache *localcache.ConversationLocalCache + groupLocalCache *rpccache.GroupLocalCache + conversationLocalCache *rpccache.ConversationLocalCache msgRpcClient *rpcclient.MessageRpcClient conversationRpcClient *rpcclient.ConversationRpcClient groupRpcClient *rpcclient.GroupRpcClient @@ -60,7 +60,7 @@ type Pusher struct { var errNoOfflinePusher = errors.New("no offlinePusher is configured") func NewPusher(discov discoveryregistry.SvcDiscoveryRegistry, offlinePusher offlinepush.OfflinePusher, database controller.PushDatabase, - groupLocalCache *localcache.GroupLocalCache, conversationLocalCache *localcache.ConversationLocalCache, + groupLocalCache *rpccache.GroupLocalCache, conversationLocalCache *rpccache.ConversationLocalCache, conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient, msgRpcClient *rpcclient.MessageRpcClient, ) *Pusher { return &Pusher{ diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index f5b1dd823..08786b43b 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -27,7 +27,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" - "github.com/openimsdk/open-im-server/v3/pkg/common/db/localcache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" ) @@ -35,15 +34,14 @@ import ( type ( MessageInterceptorChain []MessageInterceptorFunc msgServer struct { - RegisterCenter discoveryregistry.SvcDiscoveryRegistry - MsgDatabase controller.CommonMsgDatabase - Group *rpcclient.GroupRpcClient - User *rpcclient.UserRpcClient - Conversation *rpcclient.ConversationRpcClient - friend *rpccache.FriendLocalCache - //friend *rpcclient.FriendRpcClient - GroupLocalCache *localcache.GroupLocalCache - ConversationLocalCache *localcache.ConversationLocalCache + RegisterCenter discoveryregistry.SvcDiscoveryRegistry + MsgDatabase controller.CommonMsgDatabase + Group *rpcclient.GroupRpcClient + User *rpcclient.UserRpcClient + Conversation *rpcclient.ConversationRpcClient + friend *rpccache.FriendLocalCache + GroupLocalCache *rpccache.GroupLocalCache + ConversationLocalCache *rpccache.ConversationLocalCache Handlers MessageInterceptorChain notificationSender *rpcclient.NotificationSender } @@ -89,8 +87,8 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e Group: &groupRpcClient, MsgDatabase: msgDatabase, RegisterCenter: client, - GroupLocalCache: localcache.NewGroupLocalCache(&groupRpcClient), - ConversationLocalCache: localcache.NewConversationLocalCache(&conversationClient), + GroupLocalCache: rpccache.NewGroupLocalCache(rpcclient.NewGroupRpcClient(client), rdb), + ConversationLocalCache: rpccache.NewConversationLocalCache(rpcclient.NewConversationRpcClient(client), rdb), friend: rpccache.NewFriendLocalCache(rpcclient.NewFriendRpcClient(client), rdb), } s.notificationSender = rpcclient.NewNotificationSender(rpcclient.WithLocalSendMsg(s.SendMsg)) diff --git a/pkg/common/cachekey/black.go b/pkg/common/cachekey/black.go index 136972cb7..527ad14dc 100644 --- a/pkg/common/cachekey/black.go +++ b/pkg/common/cachekey/black.go @@ -1,15 +1,15 @@ package cachekey const ( - blackIDsKey = "BLACK_IDS:" - isBlackKey = "IS_BLACK:" + BlackIDsKey = "BLACK_IDS:" + IsBlackKey = "IS_BLACK:" // local cache ) func GetBlackIDsKey(ownerUserID string) string { - return blackIDsKey + ownerUserID + return BlackIDsKey + ownerUserID } func GetIsBlackIDsKey(possibleBlackUserID, userID string) string { - return isBlackKey + userID + "-" + possibleBlackUserID + return IsBlackKey + userID + "-" + possibleBlackUserID } diff --git a/pkg/common/cachekey/conversation.go b/pkg/common/cachekey/conversation.go index eb5b83f0e..665ca11c6 100644 --- a/pkg/common/cachekey/conversation.go +++ b/pkg/common/cachekey/conversation.go @@ -1,48 +1,44 @@ package cachekey -import "time" - const ( - conversationKey = "CONVERSATION:" - conversationIDsKey = "CONVERSATION_IDS:" - conversationIDsHashKey = "CONVERSATION_IDS_HASH:" - conversationHasReadSeqKey = "CONVERSATION_HAS_READ_SEQ:" - recvMsgOptKey = "RECV_MSG_OPT:" - superGroupRecvMsgNotNotifyUserIDsKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS:" - superGroupRecvMsgNotNotifyUserIDsHashKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS_HASH:" - conversationNotReceiveMessageUserIDsKey = "CONVERSATION_NOT_RECEIVE_MESSAGE_USER_IDS:" - - conversationExpireTime = time.Second * 60 * 60 * 12 + ConversationKey = "CONVERSATION:" + ConversationIDsKey = "CONVERSATION_IDS:" + ConversationIDsHashKey = "CONVERSATION_IDS_HASH:" + ConversationHasReadSeqKey = "CONVERSATION_HAS_READ_SEQ:" + RecvMsgOptKey = "RECV_MSG_OPT:" + SuperGroupRecvMsgNotNotifyUserIDsKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS:" + SuperGroupRecvMsgNotNotifyUserIDsHashKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS_HASH:" + ConversationNotReceiveMessageUserIDsKey = "CONVERSATION_NOT_RECEIVE_MESSAGE_USER_IDS:" ) func GetConversationKey(ownerUserID, conversationID string) string { - return conversationKey + ownerUserID + ":" + conversationID + return ConversationKey + ownerUserID + ":" + conversationID } func GetConversationIDsKey(ownerUserID string) string { - return conversationIDsKey + ownerUserID + return ConversationIDsKey + ownerUserID } func GetSuperGroupRecvNotNotifyUserIDsKey(groupID string) string { - return superGroupRecvMsgNotNotifyUserIDsKey + groupID + return SuperGroupRecvMsgNotNotifyUserIDsKey + groupID } func GetRecvMsgOptKey(ownerUserID, conversationID string) string { - return recvMsgOptKey + ownerUserID + ":" + conversationID + return RecvMsgOptKey + ownerUserID + ":" + conversationID } func GetSuperGroupRecvNotNotifyUserIDsHashKey(groupID string) string { - return superGroupRecvMsgNotNotifyUserIDsHashKey + groupID + return SuperGroupRecvMsgNotNotifyUserIDsHashKey + groupID } func GetConversationHasReadSeqKey(ownerUserID, conversationID string) string { - return conversationHasReadSeqKey + ownerUserID + ":" + conversationID + return ConversationHasReadSeqKey + ownerUserID + ":" + conversationID } func GetConversationNotReceiveMessageUserIDsKey(conversationID string) string { - return conversationNotReceiveMessageUserIDsKey + conversationID + return ConversationNotReceiveMessageUserIDsKey + conversationID } func GetUserConversationIDsHashKey(ownerUserID string) string { - return conversationIDsHashKey + ownerUserID + return ConversationIDsHashKey + ownerUserID } diff --git a/pkg/common/cachekey/friend.go b/pkg/common/cachekey/friend.go index 1f9d4e94f..f37c9da37 100644 --- a/pkg/common/cachekey/friend.go +++ b/pkg/common/cachekey/friend.go @@ -1,27 +1,24 @@ package cachekey -import "time" - const ( - friendExpireTime = time.Second * 60 * 60 * 12 - friendIDsKey = "FRIEND_IDS:" - twoWayFriendsIDsKey = "COMMON_FRIENDS_IDS:" - friendKey = "FRIEND_INFO:" - isFriendKey = "IS_FRIEND:" + FriendIDsKey = "FRIEND_IDS:" + TwoWayFriendsIDsKey = "COMMON_FRIENDS_IDS:" + FriendKey = "FRIEND_INFO:" + IsFriendKey = "IS_FRIEND:" // local cache key ) func GetFriendIDsKey(ownerUserID string) string { - return friendIDsKey + ownerUserID + return FriendIDsKey + ownerUserID } func GetTwoWayFriendsIDsKey(ownerUserID string) string { - return twoWayFriendsIDsKey + ownerUserID + return TwoWayFriendsIDsKey + ownerUserID } func GetFriendKey(ownerUserID, friendUserID string) string { - return friendKey + ownerUserID + "-" + friendUserID + return FriendKey + ownerUserID + "-" + friendUserID } func GetIsFriendKey(possibleFriendUserID, userID string) string { - return isFriendKey + possibleFriendUserID + "-" + userID + return IsFriendKey + possibleFriendUserID + "-" + userID } diff --git a/pkg/common/cachekey/group.go b/pkg/common/cachekey/group.go index 3e5431b83..1dcf0ffce 100644 --- a/pkg/common/cachekey/group.go +++ b/pkg/common/cachekey/group.go @@ -7,39 +7,39 @@ import ( const ( groupExpireTime = time.Second * 60 * 60 * 12 - groupInfoKey = "GROUP_INFO:" - groupMemberIDsKey = "GROUP_MEMBER_IDS:" - groupMembersHashKey = "GROUP_MEMBERS_HASH2:" - groupMemberInfoKey = "GROUP_MEMBER_INFO:" - joinedGroupsKey = "JOIN_GROUPS_KEY:" - groupMemberNumKey = "GROUP_MEMBER_NUM_CACHE:" - groupRoleLevelMemberIDsKey = "GROUP_ROLE_LEVEL_MEMBER_IDS:" + GroupInfoKey = "GROUP_INFO:" + GroupMemberIDsKey = "GROUP_MEMBER_IDS:" + GroupMembersHashKey = "GROUP_MEMBERS_HASH2:" + GroupMemberInfoKey = "GROUP_MEMBER_INFO:" + JoinedGroupsKey = "JOIN_GROUPS_KEY:" + GroupMemberNumKey = "GROUP_MEMBER_NUM_CACHE:" + GroupRoleLevelMemberIDsKey = "GROUP_ROLE_LEVEL_MEMBER_IDS:" ) func GetGroupInfoKey(groupID string) string { - return groupInfoKey + groupID + return GroupInfoKey + groupID } func GetJoinedGroupsKey(userID string) string { - return joinedGroupsKey + userID + return JoinedGroupsKey + userID } func GetGroupMembersHashKey(groupID string) string { - return groupMembersHashKey + groupID + return GroupMembersHashKey + groupID } func GetGroupMemberIDsKey(groupID string) string { - return groupMemberIDsKey + groupID + return GroupMemberIDsKey + groupID } func GetGroupMemberInfoKey(groupID, userID string) string { - return groupMemberInfoKey + groupID + "-" + userID + return GroupMemberInfoKey + groupID + "-" + userID } func GetGroupMemberNumKey(groupID string) string { - return groupMemberNumKey + groupID + return GroupMemberNumKey + groupID } func GetGroupRoleLevelMemberIDsKey(groupID string, roleLevel int32) string { - return groupRoleLevelMemberIDsKey + groupID + "-" + strconv.Itoa(int(roleLevel)) + return GroupRoleLevelMemberIDsKey + groupID + "-" + strconv.Itoa(int(roleLevel)) } diff --git a/pkg/common/cachekey/msg.go b/pkg/common/cachekey/msg.go deleted file mode 100644 index b19c4f473..000000000 --- a/pkg/common/cachekey/msg.go +++ /dev/null @@ -1,23 +0,0 @@ -package cachekey - -const ( - maxSeq = "MAX_SEQ:" - minSeq = "MIN_SEQ:" - conversationUserMinSeq = "CON_USER_MIN_SEQ:" - hasReadSeq = "HAS_READ_SEQ:" - - //appleDeviceToken = "DEVICE_TOKEN" - getuiToken = "GETUI_TOKEN" - getuiTaskID = "GETUI_TASK_ID" - //signalCache = "SIGNAL_CACHE:" - //signalListCache = "SIGNAL_LIST_CACHE:" - FCM_TOKEN = "FCM_TOKEN:" - - messageCache = "MESSAGE_CACHE:" - messageDelUserList = "MESSAGE_DEL_USER_LIST:" - userDelMessagesList = "USER_DEL_MESSAGES_LIST:" - sendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:" - userBadgeUnreadCountSum = "USER_BADGE_UNREAD_COUNT_SUM:" - exTypeKeyLocker = "EX_LOCK:" - uidPidToken = "UID_PID_TOKEN_STATUS:" -) diff --git a/pkg/common/cachekey/s3.go b/pkg/common/cachekey/s3.go deleted file mode 100644 index cbf8bfad5..000000000 --- a/pkg/common/cachekey/s3.go +++ /dev/null @@ -1 +0,0 @@ -package cachekey diff --git a/pkg/common/cachekey/user.go b/pkg/common/cachekey/user.go deleted file mode 100644 index ed11641d5..000000000 --- a/pkg/common/cachekey/user.go +++ /dev/null @@ -1,21 +0,0 @@ -package cachekey - -import "time" - -const ( - userExpireTime = time.Second * 60 * 60 * 12 - userInfoKey = "USER_INFO:" - userGlobalRecvMsgOptKey = "USER_GLOBAL_RECV_MSG_OPT_KEY:" - olineStatusKey = "ONLINE_STATUS:" - userOlineStatusExpireTime = time.Second * 60 * 60 * 24 - statusMod = 501 - platformID = "_PlatformIDSuffix" -) - -func GetUserInfoKey(userID string) string { - return userInfoKey + userID -} - -func GetUserGlobalRecvMsgOptKey(userID string) string { - return userGlobalRecvMsgOptKey + userID -} diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index a8d865ef5..b06209c16 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -378,8 +378,14 @@ type LocalCache struct { SlotSize int `yaml:"slotSize"` } +func (l LocalCache) Enable() bool { + return l.Topic != "" && l.SlotNum > 0 && l.SlotSize > 0 +} + type localCache struct { - Friend LocalCache `yaml:"friend"` + Friend LocalCache `yaml:"friend"` + Group LocalCache `yaml:"group"` + Conversation LocalCache `yaml:"conversation"` } func (c *configStruct) GetServiceNames() []string { diff --git a/pkg/common/db/cache/black.go b/pkg/common/db/cache/black.go index ca116fdc7..716c9eef0 100644 --- a/pkg/common/db/cache/black.go +++ b/pkg/common/db/cache/black.go @@ -16,7 +16,6 @@ package cache import ( "context" - "github.com/OpenIMSDK/tools/log" "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "time" @@ -56,9 +55,7 @@ func NewBlackCacheRedis( ) BlackCache { rcClient := rockscache.NewClient(rdb, options) mc := NewMetaCacheRedis(rcClient) - f := config.Config.LocalCache.Friend - log.ZDebug(context.Background(), "black local cache init", "Topic", f.Topic, "SlotNum", f.SlotNum, "SlotSize", f.SlotSize) - mc.SetTopic(f.Topic) + mc.SetTopic(config.Config.LocalCache.Friend.Topic) mc.SetRawRedisClient(rdb) return &BlackCacheRedis{ expireTime: blackExpireTime, @@ -73,7 +70,7 @@ func (b *BlackCacheRedis) NewCache() BlackCache { expireTime: b.expireTime, rcClient: b.rcClient, blackDB: b.blackDB, - metaCache: b.metaCache.Copy(), + metaCache: b.Copy(), } } diff --git a/pkg/common/db/cache/config.go b/pkg/common/db/cache/config.go new file mode 100644 index 000000000..dc5c55143 --- /dev/null +++ b/pkg/common/db/cache/config.go @@ -0,0 +1,62 @@ +package cache + +import ( + "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "strings" + "sync" +) + +var ( + once sync.Once + subscribe map[string][]string +) + +func getPublishKey(topic string, key []string) []string { + if topic == "" || len(key) == 0 { + return nil + } + once.Do(func() { + list := []struct { + Local config.LocalCache + Keys []string + }{ + { + Local: config.Config.LocalCache.Group, + Keys: []string{cachekey.GroupMemberIDsKey}, + }, + { + Local: config.Config.LocalCache.Friend, + Keys: []string{cachekey.FriendIDsKey, cachekey.BlackIDsKey}, + }, + { + Local: config.Config.LocalCache.Conversation, + Keys: []string{cachekey.ConversationIDsKey}, + }, + } + subscribe = make(map[string][]string) + for _, v := range list { + if v.Local.Enable() { + subscribe[v.Local.Topic] = v.Keys + } + } + }) + prefix, ok := subscribe[topic] + if !ok { + return nil + } + res := make([]string, 0, len(key)) + for _, k := range key { + var exist bool + for _, p := range prefix { + if strings.HasPrefix(k, p) { + exist = true + break + } + } + if exist { + res = append(res, k) + } + } + return res +} diff --git a/pkg/common/db/cache/conversation.go b/pkg/common/db/cache/conversation.go index cf638f38f..9c0391e67 100644 --- a/pkg/common/db/cache/conversation.go +++ b/pkg/common/db/cache/conversation.go @@ -18,6 +18,7 @@ import ( "context" "errors" "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "math/big" "strings" "time" @@ -85,10 +86,12 @@ type ConversationCache interface { func NewConversationRedis(rdb redis.UniversalClient, opts rockscache.Options, db relationtb.ConversationModelInterface) ConversationCache { rcClient := rockscache.NewClient(rdb, opts) - + mc := NewMetaCacheRedis(rcClient) + mc.SetTopic(config.Config.LocalCache.Conversation.Topic) + mc.SetRawRedisClient(rdb) return &ConversationRedisCache{ rcClient: rcClient, - metaCache: NewMetaCacheRedis(rcClient), + metaCache: mc, conversationDB: db, expireTime: conversationExpireTime, } @@ -119,7 +122,7 @@ type ConversationRedisCache struct { func (c *ConversationRedisCache) NewCache() ConversationCache { return &ConversationRedisCache{ rcClient: c.rcClient, - metaCache: NewMetaCacheRedis(c.rcClient, c.metaCache.GetPreDelKeys()...), + metaCache: c.Copy(), conversationDB: c.conversationDB, expireTime: c.expireTime, } diff --git a/pkg/common/db/cache/friend.go b/pkg/common/db/cache/friend.go index 6d287d2f5..432d08572 100644 --- a/pkg/common/db/cache/friend.go +++ b/pkg/common/db/cache/friend.go @@ -80,7 +80,7 @@ func NewFriendCacheRedis(rdb redis.UniversalClient, friendDB relationtb.FriendMo func (f *FriendCacheRedis) NewCache() FriendCache { return &FriendCacheRedis{ rcClient: f.rcClient, - metaCache: f.metaCache.Copy(), + metaCache: f.Copy(), friendDB: f.friendDB, expireTime: f.expireTime, } diff --git a/pkg/common/db/cache/group.go b/pkg/common/db/cache/group.go index 58c42ccfd..783f2e515 100644 --- a/pkg/common/db/cache/group.go +++ b/pkg/common/db/cache/group.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "time" "github.com/OpenIMSDK/protocol/constant" @@ -104,12 +105,14 @@ func NewGroupCacheRedis( opts rockscache.Options, ) GroupCache { rcClient := rockscache.NewClient(rdb, opts) - + mc := NewMetaCacheRedis(rcClient) + mc.SetTopic(config.Config.LocalCache.Group.Topic) + mc.SetRawRedisClient(rdb) return &GroupCacheRedis{ rcClient: rcClient, expireTime: groupExpireTime, groupDB: groupDB, groupMemberDB: groupMemberDB, groupRequestDB: groupRequestDB, groupHash: hashCode, - metaCache: NewMetaCacheRedis(rcClient), + metaCache: mc, } } @@ -120,7 +123,7 @@ func (g *GroupCacheRedis) NewCache() GroupCache { groupDB: g.groupDB, groupMemberDB: g.groupMemberDB, groupRequestDB: g.groupRequestDB, - metaCache: NewMetaCacheRedis(g.rcClient, g.metaCache.GetPreDelKeys()...), + metaCache: g.Copy(), } } diff --git a/pkg/common/db/cache/meta_cache.go b/pkg/common/db/cache/meta_cache.go index dbe2344f0..86ade0b68 100644 --- a/pkg/common/db/cache/meta_cache.go +++ b/pkg/common/db/cache/meta_cache.go @@ -103,13 +103,13 @@ func (m *metaCacheRedis) ExecDel(ctx context.Context, distinct ...bool) error { break } } - if m.topic != "" && m.redisClient != nil { - data, err := json.Marshal(m.keys) + if pk := getPublishKey(m.topic, m.keys); len(pk) > 0 { + data, err := json.Marshal(pk) if err != nil { - log.ZError(ctx, "keys json marshal failed", err, "topic", m.topic, "keys", m.keys) + log.ZError(ctx, "keys json marshal failed", err, "topic", m.topic, "keys", pk) } else { if err := m.redisClient.Publish(ctx, m.topic, string(data)).Err(); err != nil { - log.ZError(ctx, "redis publish cache delete error", err, "topic", m.topic, "keys", m.keys) + log.ZError(ctx, "redis publish cache delete error", err, "topic", m.topic, "keys", pk) } } } diff --git a/pkg/common/db/localcache/conversation.go b/pkg/common/db/localcache/conversation.go deleted file mode 100644 index c40bcdbce..000000000 --- a/pkg/common/db/localcache/conversation.go +++ /dev/null @@ -1,87 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package localcache - -import ( - "context" - "sync" - - "github.com/OpenIMSDK/protocol/conversation" - - "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" -) - -type ConversationLocalCache struct { - lock sync.Mutex - superGroupRecvMsgNotNotifyUserIDs map[string]Hash - conversationIDs map[string]Hash - client *rpcclient.ConversationRpcClient -} - -type Hash struct { - hash uint64 - ids []string -} - -func NewConversationLocalCache(client *rpcclient.ConversationRpcClient) *ConversationLocalCache { - return &ConversationLocalCache{ - superGroupRecvMsgNotNotifyUserIDs: make(map[string]Hash), - conversationIDs: make(map[string]Hash), - client: client, - } -} - -func (g *ConversationLocalCache) GetRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) { - resp, err := g.client.Client.GetRecvMsgNotNotifyUserIDs(ctx, &conversation.GetRecvMsgNotNotifyUserIDsReq{ - GroupID: groupID, - }) - if err != nil { - return nil, err - } - return resp.UserIDs, nil -} - -func (g *ConversationLocalCache) GetConversationIDs(ctx context.Context, userID string) ([]string, error) { - resp, err := g.client.Client.GetUserConversationIDsHash(ctx, &conversation.GetUserConversationIDsHashReq{ - OwnerUserID: userID, - }) - if err != nil { - return nil, err - } - - g.lock.Lock() - hash, ok := g.conversationIDs[userID] - g.lock.Unlock() - - if !ok || hash.hash != resp.Hash { - conversationIDsResp, err := g.client.Client.GetConversationIDs(ctx, &conversation.GetConversationIDsReq{ - UserID: userID, - }) - if err != nil { - return nil, err - } - - g.lock.Lock() - defer g.lock.Unlock() - g.conversationIDs[userID] = Hash{ - hash: resp.Hash, - ids: conversationIDsResp.ConversationIDs, - } - - return conversationIDsResp.ConversationIDs, nil - } - - return hash.ids, nil -} diff --git a/pkg/common/db/localcache/doc.go b/pkg/common/db/localcache/doc.go deleted file mode 100644 index d349373ee..000000000 --- a/pkg/common/db/localcache/doc.go +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package localcache // import "github.com/openimsdk/open-im-server/v3/pkg/common/db/localcache" diff --git a/pkg/common/db/localcache/group.go b/pkg/common/db/localcache/group.go deleted file mode 100644 index 4958d91ee..000000000 --- a/pkg/common/db/localcache/group.go +++ /dev/null @@ -1,78 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package localcache - -import ( - "context" - "sync" - - "github.com/OpenIMSDK/protocol/group" - "github.com/OpenIMSDK/tools/errs" - - "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" -) - -type GroupLocalCache struct { - lock sync.Mutex - cache map[string]GroupMemberIDsHash - client *rpcclient.GroupRpcClient -} - -type GroupMemberIDsHash struct { - memberListHash uint64 - userIDs []string -} - -func NewGroupLocalCache(client *rpcclient.GroupRpcClient) *GroupLocalCache { - return &GroupLocalCache{ - cache: make(map[string]GroupMemberIDsHash, 0), - client: client, - } -} - -func (g *GroupLocalCache) GetGroupMemberIDs(ctx context.Context, groupID string) ([]string, error) { - resp, err := g.client.Client.GetGroupAbstractInfo(ctx, &group.GetGroupAbstractInfoReq{ - GroupIDs: []string{groupID}, - }) - if err != nil { - return nil, err - } - if len(resp.GroupAbstractInfos) < 1 { - return nil, errs.ErrGroupIDNotFound - } - - g.lock.Lock() - localHashInfo, ok := g.cache[groupID] - if ok && localHashInfo.memberListHash == resp.GroupAbstractInfos[0].GroupMemberListHash { - g.lock.Unlock() - return localHashInfo.userIDs, nil - } - g.lock.Unlock() - - groupMembersResp, err := g.client.Client.GetGroupMemberUserIDs(ctx, &group.GetGroupMemberUserIDsReq{ - GroupID: groupID, - }) - if err != nil { - return nil, err - } - - g.lock.Lock() - defer g.lock.Unlock() - g.cache[groupID] = GroupMemberIDsHash{ - memberListHash: resp.GroupAbstractInfos[0].GroupMemberListHash, - userIDs: groupMembersResp.UserIDs, - } - return g.cache[groupID].userIDs, nil -} diff --git a/pkg/common/db/localcache/meta_local_cache.go b/pkg/common/db/localcache/meta_local_cache.go deleted file mode 100644 index ed9389c27..000000000 --- a/pkg/common/db/localcache/meta_local_cache.go +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package localcache diff --git a/pkg/common/localcache/cache.go b/pkg/common/localcache/cache.go index 228538fa6..56cf14ff8 100644 --- a/pkg/common/localcache/cache.go +++ b/pkg/common/localcache/cache.go @@ -4,11 +4,11 @@ import ( "context" "github.com/openimsdk/open-im-server/v3/pkg/common/localcache/link" "github.com/openimsdk/open-im-server/v3/pkg/common/localcache/local" - opt "github.com/openimsdk/open-im-server/v3/pkg/common/localcache/option" + lopt "github.com/openimsdk/open-im-server/v3/pkg/common/localcache/option" ) type Cache[V any] interface { - Get(ctx context.Context, key string, fetch func(ctx context.Context) (V, error), opts ...*opt.Option) (V, error) + Get(ctx context.Context, key string, fetch func(ctx context.Context) (V, error), opts ...*lopt.Option) (V, error) Del(ctx context.Context, key ...string) } @@ -17,12 +17,17 @@ func New[V any](opts ...Option) Cache[V] { for _, o := range opts { o(opt) } - c := &cache[V]{opt: opt, link: link.New(opt.localSlotNum)} - c.local = local.NewCache[V](opt.localSlotNum, opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict) - go func() { - c.opt.delCh(c.del) - }() - return c + c := cache[V]{opt: opt} + if opt.localSlotNum > 0 && opt.localSlotSize > 0 { + c.local = local.NewCache[V](opt.localSlotNum, opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict) + go func() { + c.opt.delCh(c.del) + }() + if opt.linkSlotNum > 0 { + c.link = link.New(opt.linkSlotNum) + } + } + return &c } type cache[V any] struct { @@ -32,10 +37,12 @@ type cache[V any] struct { } func (c *cache[V]) onEvict(key string, value V) { - lks := c.link.Del(key) - for k := range lks { - if key != k { // prevent deadlock - c.local.Del(k) + if c.link != nil { + lks := c.link.Del(key) + for k := range lks { + if key != k { // prevent deadlock + c.local.Del(k) + } } } } @@ -50,16 +57,14 @@ func (c *cache[V]) del(key ...string) { } } -func (c *cache[V]) Get(ctx context.Context, key string, fetch func(ctx context.Context) (V, error), opts ...*opt.Option) (V, error) { - enable := c.opt.enable - if len(opts) > 0 && opts[0].Enable != nil { - enable = *opts[0].Enable - } - if enable { - if len(opts) > 0 && len(opts[0].Link) > 0 { - c.link.Link(key, opts[0].Link...) - } +func (c *cache[V]) Get(ctx context.Context, key string, fetch func(ctx context.Context) (V, error), opts ...*lopt.Option) (V, error) { + if c.local != nil { return c.local.Get(key, func() (V, error) { + if c.link != nil { + for _, o := range opts { + c.link.Link(key, o.Link...) + } + } return fetch(ctx) }) } else { @@ -74,7 +79,7 @@ func (c *cache[V]) Del(ctx context.Context, key ...string) { for _, fn := range c.opt.delFn { fn(ctx, key...) } - if c.opt.enable { + if c.local != nil { c.del(key...) } } diff --git a/pkg/common/localcache/option.go b/pkg/common/localcache/option.go index 01c9ce57d..f23a04e68 100644 --- a/pkg/common/localcache/option.go +++ b/pkg/common/localcache/option.go @@ -8,9 +8,9 @@ import ( func defaultOption() *option { return &option{ - enable: true, localSlotNum: 500, localSlotSize: 20000, + linkSlotNum: 500, localSuccessTTL: time.Minute, localFailedTTL: time.Second * 5, delFn: make([]func(ctx context.Context, key ...string), 0, 2), @@ -19,9 +19,9 @@ func defaultOption() *option { } type option struct { - enable bool localSlotNum int localSlotSize int + linkSlotNum int localSuccessTTL time.Duration localFailedTTL time.Duration delFn []func(ctx context.Context, key ...string) @@ -31,25 +31,27 @@ type option struct { type Option func(o *option) -func WithDisable() Option { +func WithLocalDisable() Option { + return WithLinkSlotNum(0) +} + +func WithLinkDisable() Option { + return WithLinkSlotNum(0) +} + +func WithLinkSlotNum(linkSlotNum int) Option { return func(o *option) { - o.enable = false + o.linkSlotNum = linkSlotNum } } func WithLocalSlotNum(localSlotNum int) Option { - if localSlotNum < 1 { - panic("localSlotNum should be greater than 0") - } return func(o *option) { o.localSlotNum = localSlotNum } } func WithLocalSlotSize(localSlotSize int) Option { - if localSlotSize < 1 { - panic("localSlotSize should be greater than 0") - } return func(o *option) { o.localSlotSize = localSlotSize } diff --git a/pkg/common/localcache/option/option.go b/pkg/common/localcache/option/option.go index 798c93ba5..8547b3339 100644 --- a/pkg/common/localcache/option/option.go +++ b/pkg/common/localcache/option/option.go @@ -5,20 +5,7 @@ func NewOption() *Option { } type Option struct { - Enable *bool - Link []string -} - -func (o *Option) WithEnable() *Option { - t := true - o.Enable = &t - return o -} - -func (o *Option) WithDisable() *Option { - f := false - o.Enable = &f - return o + Link []string } func (o *Option) WithLink(key ...string) *Option { diff --git a/pkg/rpccache/conversation.go b/pkg/rpccache/conversation.go new file mode 100644 index 000000000..181b8fd8a --- /dev/null +++ b/pkg/rpccache/conversation.go @@ -0,0 +1,28 @@ +package rpccache + +import ( + "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/localcache" + "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" + "github.com/redis/go-redis/v9" +) + +func NewConversationLocalCache(client rpcclient.ConversationRpcClient, cli redis.UniversalClient) *ConversationLocalCache { + return &ConversationLocalCache{ + local: localcache.New[any](localcache.WithRedisDeleteSubscribe(config.Config.LocalCache.Conversation.Topic, cli)), + client: client, + } +} + +type ConversationLocalCache struct { + local localcache.Cache[any] + client rpcclient.ConversationRpcClient +} + +func (c *ConversationLocalCache) GetConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) { + return localcache.AnyValue[[]string](c.local.Get(ctx, cachekey.GetConversationIDsKey(ownerUserID), func(ctx context.Context) (any, error) { + return c.client.GetConversationIDs(ctx, ownerUserID) + })) +} diff --git a/pkg/rpccache/friend.go b/pkg/rpccache/friend.go index 14adf1e0a..82d0a03a5 100644 --- a/pkg/rpccache/friend.go +++ b/pkg/rpccache/friend.go @@ -23,20 +23,20 @@ type FriendLocalCache struct { client rpcclient.FriendRpcClient } -func (f *FriendLocalCache) GetFriendIDs(ctx context.Context, ownerUserID string) (val []string, err error) { - log.ZDebug(ctx, "FriendLocalCache GetFriendIDs req", "ownerUserID", ownerUserID) - defer func() { - if err == nil { - log.ZDebug(ctx, "FriendLocalCache GetFriendIDs return", "value", val) - } else { - log.ZError(ctx, "FriendLocalCache GetFriendIDs return", err) - } - }() - return localcache.AnyValue[[]string](f.local.Get(ctx, cachekey.GetFriendIDsKey(ownerUserID), func(ctx context.Context) (any, error) { - log.ZDebug(ctx, "FriendLocalCache GetFriendIDs call rpc", "ownerUserID", ownerUserID) - return f.client.GetFriendIDs(ctx, ownerUserID) - })) -} +//func (f *FriendLocalCache) GetFriendIDs(ctx context.Context, ownerUserID string) (val []string, err error) { +// log.ZDebug(ctx, "FriendLocalCache GetFriendIDs req", "ownerUserID", ownerUserID) +// defer func() { +// if err == nil { +// log.ZDebug(ctx, "FriendLocalCache GetFriendIDs return", "value", val) +// } else { +// log.ZError(ctx, "FriendLocalCache GetFriendIDs return", err) +// } +// }() +// return localcache.AnyValue[[]string](f.local.Get(ctx, cachekey.GetFriendIDsKey(ownerUserID), func(ctx context.Context) (any, error) { +// log.ZDebug(ctx, "FriendLocalCache GetFriendIDs call rpc", "ownerUserID", ownerUserID) +// return f.client.GetFriendIDs(ctx, ownerUserID) +// })) +//} func (f *FriendLocalCache) IsFriend(ctx context.Context, possibleFriendUserID, userID string) (val bool, err error) { log.ZDebug(ctx, "FriendLocalCache IsFriend req", "possibleFriendUserID", possibleFriendUserID, "userID", userID) diff --git a/pkg/rpccache/group.go b/pkg/rpccache/group.go new file mode 100644 index 000000000..0c3bc6b93 --- /dev/null +++ b/pkg/rpccache/group.go @@ -0,0 +1,28 @@ +package rpccache + +import ( + "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/localcache" + "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" + "github.com/redis/go-redis/v9" +) + +func NewGroupLocalCache(client rpcclient.GroupRpcClient, cli redis.UniversalClient) *GroupLocalCache { + return &GroupLocalCache{ + local: localcache.New[any](localcache.WithRedisDeleteSubscribe(config.Config.LocalCache.Group.Topic, cli)), + client: client, + } +} + +type GroupLocalCache struct { + local localcache.Cache[any] + client rpcclient.GroupRpcClient +} + +func (g *GroupLocalCache) GetGroupMemberIDs(ctx context.Context, groupID string) ([]string, error) { + return localcache.AnyValue[[]string](g.local.Get(ctx, cachekey.GetGroupMemberIDsKey(groupID), func(ctx context.Context) (any, error) { + return g.client.GetGroupMemberIDs(ctx, groupID) + })) +} From a81bc3fc23991208d66bb1e3b7365d9dc0015f63 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Fri, 12 Jan 2024 17:51:01 +0800 Subject: [PATCH 2/3] feat: local cache --- go.mod | 5 +- pkg/common/localcache/business.go | 70 -------------- pkg/common/localcache/lru_test.go | 55 ----------- pkg/common/localcache/singleflight.go | 43 --------- pkg/common/localcache/target.go | 59 ------------ pkg/common/localcache/timingwheel.go | 71 -------------- pkg/localcache/business.go | 71 ++++++++++++++ pkg/{common => }/localcache/cache.go | 6 +- pkg/localcache/go.mod | 5 + pkg/{common => }/localcache/link/link.go | 0 pkg/{common => }/localcache/link/link_test.go | 0 pkg/localcache/local/cache.go | 50 ++++++++++ .../local}/callback.go | 2 +- .../localcache => localcache/local}/lru.go | 9 +- pkg/localcache/local/lru_test.go | 95 +++++++++++++++++++ pkg/localcache/local/target.go | 10 ++ pkg/{common => }/localcache/option.go | 2 +- pkg/{common => }/localcache/option/option.go | 0 pkg/{common => }/localcache/tool.go | 0 pkg/rpccache/conversation.go | 2 +- pkg/rpccache/friend.go | 4 +- pkg/rpccache/group.go | 2 +- 22 files changed, 245 insertions(+), 316 deletions(-) delete mode 100644 pkg/common/localcache/business.go delete mode 100644 pkg/common/localcache/lru_test.go delete mode 100644 pkg/common/localcache/singleflight.go delete mode 100644 pkg/common/localcache/target.go delete mode 100644 pkg/common/localcache/timingwheel.go create mode 100644 pkg/localcache/business.go rename pkg/{common => }/localcache/cache.go (88%) create mode 100644 pkg/localcache/go.mod rename pkg/{common => }/localcache/link/link.go (100%) rename pkg/{common => }/localcache/link/link_test.go (100%) create mode 100644 pkg/localcache/local/cache.go rename pkg/{common/localcache => localcache/local}/callback.go (86%) rename pkg/{common/localcache => localcache/local}/lru.go (91%) create mode 100644 pkg/localcache/local/lru_test.go create mode 100644 pkg/localcache/local/target.go rename pkg/{common => }/localcache/option.go (97%) rename pkg/{common => }/localcache/option/option.go (100%) rename pkg/{common => }/localcache/tool.go (100%) diff --git a/go.mod b/go.mod index 11a87d882..1086560bc 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( github.com/minio/minio-go/v7 v7.0.63 github.com/mitchellh/mapstructure v1.5.0 github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 // indirect + github.com/openimsdk/localcache v0.0.1 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.17.0 github.com/robfig/cron/v3 v3.0.1 @@ -37,7 +38,6 @@ require ( github.com/IBM/sarama v1.41.3 github.com/aliyun/aliyun-oss-go-sdk v2.2.9+incompatible github.com/go-redis/redis v6.15.9+incompatible - github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/redis/go-redis/v9 v9.2.1 github.com/stathat/consistent v1.0.0 github.com/tencentyun/cos-go-sdk-v5 v0.7.45 @@ -82,6 +82,7 @@ require ( github.com/hashicorp/errwrap v1.0.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-uuid v1.0.3 // indirect + github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect github.com/jcmturner/aescts/v2 v2.0.0 // indirect @@ -157,3 +158,5 @@ require ( golang.org/x/crypto v0.14.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect ) + +replace github.com/openimsdk/localcache => ./pkg/localcache diff --git a/pkg/common/localcache/business.go b/pkg/common/localcache/business.go deleted file mode 100644 index f011719df..000000000 --- a/pkg/common/localcache/business.go +++ /dev/null @@ -1,70 +0,0 @@ -package localcache - -import ( - "context" - "encoding/json" - "github.com/OpenIMSDK/tools/log" - "github.com/dtm-labs/rockscache" - "github.com/redis/go-redis/v9" -) - -func WithRedisDeleteSubscribe(topic string, cli redis.UniversalClient) Option { - return WithDeleteLocal(func(fn func(key ...string)) { - if fn == nil { - log.ZDebug(context.Background(), "WithRedisDeleteSubscribe fn is nil", "topic", topic) - return - } - msg := cli.Subscribe(context.Background(), topic).Channel() - for m := range msg { - log.ZDebug(context.Background(), "WithRedisDeleteSubscribe delete", "topic", m.Channel, "payload", m.Payload) - var key []string - if err := json.Unmarshal([]byte(m.Payload), &key); err != nil { - log.ZError(context.Background(), "WithRedisDeleteSubscribe json unmarshal error", err, "topic", topic, "payload", m.Payload) - continue - } - if len(key) == 0 { - continue - } - fn(key...) - } - }) -} - -func WithRedisDeletePublish(topic string, cli redis.UniversalClient) Option { - return WithDeleteKeyBefore(func(ctx context.Context, key ...string) { - data, err := json.Marshal(key) - if err != nil { - log.ZError(ctx, "json marshal error", err, "topic", topic, "key", key) - return - } - if err := cli.Publish(ctx, topic, data).Err(); err != nil { - log.ZError(ctx, "redis publish error", err, "topic", topic, "key", key) - } else { - log.ZDebug(ctx, "redis publish success", "topic", topic, "key", key) - } - }) -} - -func WithRedisDelete(cli redis.UniversalClient) Option { - return WithDeleteKeyBefore(func(ctx context.Context, key ...string) { - for _, s := range key { - if err := cli.Del(ctx, s).Err(); err != nil { - log.ZError(ctx, "redis delete error", err, "key", s) - } else { - log.ZDebug(ctx, "redis delete success", "key", s) - } - } - }) -} - -func WithRocksCacheDelete(cli *rockscache.Client) Option { - return WithDeleteKeyBefore(func(ctx context.Context, key ...string) { - for _, k := range key { - if err := cli.TagAsDeleted2(ctx, k); err != nil { - log.ZError(ctx, "rocksdb delete error", err, "key", k) - } else { - log.ZDebug(ctx, "rocksdb delete success", "key", k) - } - } - }) -} diff --git a/pkg/common/localcache/lru_test.go b/pkg/common/localcache/lru_test.go deleted file mode 100644 index fff925eaa..000000000 --- a/pkg/common/localcache/lru_test.go +++ /dev/null @@ -1,55 +0,0 @@ -package localcache - -//func TestName(t *testing.T) { -// target := &cacheTarget{} -// l := NewCache[string](100, 1000, time.Second*20, time.Second*5, target, nil) -// //l := NewLRU[string, string](1000, time.Second*20, time.Second*5, target) -// -// fn := func(key string, n int, fetch func() (string, error)) { -// for i := 0; i < n; i++ { -// //v, err := l.Get(key, fetch) -// //if err == nil { -// // t.Log("key", key, "value", v) -// //} else { -// // t.Error("key", key, err) -// //} -// l.Get(key, fetch) -// //time.Sleep(time.Second / 100) -// } -// } -// -// tmp := make(map[string]struct{}) -// -// var wg sync.WaitGroup -// for i := 0; i < 10000; i++ { -// wg.Add(1) -// key := fmt.Sprintf("key_%d", i%200) -// tmp[key] = struct{}{} -// go func() { -// defer wg.Done() -// //t.Log(key) -// fn(key, 10000, func() (string, error) { -// //time.Sleep(time.Second * 3) -// //t.Log(time.Now(), "key", key, "fetch") -// //if rand.Uint32()%5 == 0 { -// // return "value_" + key, nil -// //} -// //return "", errors.New("rand error") -// return "value_" + key, nil -// }) -// }() -// -// //wg.Add(1) -// //go func() { -// // defer wg.Done() -// // for i := 0; i < 10; i++ { -// // l.Del(key) -// // time.Sleep(time.Second / 3) -// // } -// //}() -// } -// wg.Wait() -// t.Log(len(tmp)) -// t.Log(target.String()) -// -//} diff --git a/pkg/common/localcache/singleflight.go b/pkg/common/localcache/singleflight.go deleted file mode 100644 index 5dcd70669..000000000 --- a/pkg/common/localcache/singleflight.go +++ /dev/null @@ -1,43 +0,0 @@ -package localcache - -import "sync" - -type call[K comparable, V any] struct { - wg sync.WaitGroup - val V - err error -} - -type SingleFlight[K comparable, V any] struct { - mu sync.Mutex - m map[K]*call[K, V] -} - -func NewSingleFlight[K comparable, V any]() *SingleFlight[K, V] { - return &SingleFlight[K, V]{m: make(map[K]*call[K, V])} -} - -func (r *SingleFlight[K, V]) Do(key K, fn func() (V, error)) (V, error) { - r.mu.Lock() - if r.m == nil { - r.m = make(map[K]*call[K, V]) - } - if c, ok := r.m[key]; ok { - r.mu.Unlock() - c.wg.Wait() - return c.val, c.err - } - c := new(call[K, V]) - c.wg.Add(1) - r.m[key] = c - r.mu.Unlock() - - c.val, c.err = fn() - c.wg.Done() - - r.mu.Lock() - delete(r.m, key) - r.mu.Unlock() - - return c.val, c.err -} diff --git a/pkg/common/localcache/target.go b/pkg/common/localcache/target.go deleted file mode 100644 index 2edf51ddb..000000000 --- a/pkg/common/localcache/target.go +++ /dev/null @@ -1,59 +0,0 @@ -package localcache - -import ( - "fmt" - "sync/atomic" -) - -type Target interface { - IncrGetHit() - IncrGetSuccess() - IncrGetFailed() - - IncrDelHit() - IncrDelNotFound() -} - -type cacheTarget struct { - getHit int64 - getSuccess int64 - getFailed int64 - delHit int64 - delNotFound int64 -} - -func (r *cacheTarget) IncrGetHit() { - atomic.AddInt64(&r.getHit, 1) -} - -func (r *cacheTarget) IncrGetSuccess() { - atomic.AddInt64(&r.getSuccess, 1) -} - -func (r *cacheTarget) IncrGetFailed() { - atomic.AddInt64(&r.getFailed, 1) -} - -func (r *cacheTarget) IncrDelHit() { - atomic.AddInt64(&r.delHit, 1) -} - -func (r *cacheTarget) IncrDelNotFound() { - atomic.AddInt64(&r.delNotFound, 1) -} - -func (r *cacheTarget) String() string { - return fmt.Sprintf("getHit: %d, getSuccess: %d, getFailed: %d, delHit: %d, delNotFound: %d", r.getHit, r.getSuccess, r.getFailed, r.delHit, r.delNotFound) -} - -type emptyTarget struct{} - -func (e emptyTarget) IncrGetHit() {} - -func (e emptyTarget) IncrGetSuccess() {} - -func (e emptyTarget) IncrGetFailed() {} - -func (e emptyTarget) IncrDelHit() {} - -func (e emptyTarget) IncrDelNotFound() {} diff --git a/pkg/common/localcache/timingwheel.go b/pkg/common/localcache/timingwheel.go deleted file mode 100644 index ca1bac033..000000000 --- a/pkg/common/localcache/timingwheel.go +++ /dev/null @@ -1,71 +0,0 @@ -package localcache - -import ( - "sync" - "time" -) - -type Execute[K comparable, V any] func(K, V) - -type Task[K comparable, V any] struct { - key K - value V -} - -type TimeWheel[K comparable, V any] struct { - ticker *time.Ticker - slots [][]Task[K, V] - currentPos int - size int - slotMutex sync.Mutex - execute Execute[K, V] -} - -func NewTimeWheel[K comparable, V any](size int, tickDuration time.Duration, execute Execute[K, V]) *TimeWheel[K, V] { - return &TimeWheel[K, V]{ - ticker: time.NewTicker(tickDuration), - slots: make([][]Task[K, V], size), - currentPos: 0, - size: size, - execute: execute, - } -} - -func (tw *TimeWheel[K, V]) Start() { - for range tw.ticker.C { - tw.tick() - } -} - -func (tw *TimeWheel[K, V]) Stop() { - tw.ticker.Stop() -} - -func (tw *TimeWheel[K, V]) tick() { - tw.slotMutex.Lock() - defer tw.slotMutex.Unlock() - - tasks := tw.slots[tw.currentPos] - tw.slots[tw.currentPos] = nil - if len(tasks) > 0 { - go func(tasks []Task[K, V]) { - for _, task := range tasks { - tw.execute(task.key, task.value) - } - }(tasks) - } - - tw.currentPos = (tw.currentPos + 1) % tw.size -} - -func (tw *TimeWheel[K, V]) AddTask(delay int, task Task[K, V]) { - if delay < 0 || delay >= tw.size { - return - } - - tw.slotMutex.Lock() - defer tw.slotMutex.Unlock() - - pos := (tw.currentPos + delay) % tw.size - tw.slots[pos] = append(tw.slots[pos], task) -} diff --git a/pkg/localcache/business.go b/pkg/localcache/business.go new file mode 100644 index 000000000..c5260b3e2 --- /dev/null +++ b/pkg/localcache/business.go @@ -0,0 +1,71 @@ +package localcache + +// +//import ( +// "context" +// "encoding/json" +// "github.com/OpenIMSDK/tools/log" +// "github.com/dtm-labs/rockscache" +// "github.com/redis/go-redis/v9" +//) +// +//func WithRedisDeleteSubscribe(topic string, cli redis.UniversalClient) Option { +// return WithDeleteLocal(func(fn func(key ...string)) { +// if fn == nil { +// log.ZDebug(context.Background(), "WithRedisDeleteSubscribe fn is nil", "topic", topic) +// return +// } +// msg := cli.Subscribe(context.Background(), topic).Channel() +// for m := range msg { +// log.ZDebug(context.Background(), "WithRedisDeleteSubscribe delete", "topic", m.Channel, "payload", m.Payload) +// var key []string +// if err := json.Unmarshal([]byte(m.Payload), &key); err != nil { +// log.ZError(context.Background(), "WithRedisDeleteSubscribe json unmarshal error", err, "topic", topic, "payload", m.Payload) +// continue +// } +// if len(key) == 0 { +// continue +// } +// fn(key...) +// } +// }) +//} +// +//func WithRedisDeletePublish(topic string, cli redis.UniversalClient) Option { +// return WithDeleteKeyBefore(func(ctx context.Context, key ...string) { +// data, err := json.Marshal(key) +// if err != nil { +// log.ZError(ctx, "json marshal error", err, "topic", topic, "key", key) +// return +// } +// if err := cli.Publish(ctx, topic, data).Err(); err != nil { +// log.ZError(ctx, "redis publish error", err, "topic", topic, "key", key) +// } else { +// log.ZDebug(ctx, "redis publish success", "topic", topic, "key", key) +// } +// }) +//} +// +//func WithRedisDelete(cli redis.UniversalClient) Option { +// return WithDeleteKeyBefore(func(ctx context.Context, key ...string) { +// for _, s := range key { +// if err := cli.Del(ctx, s).Err(); err != nil { +// log.ZError(ctx, "redis delete error", err, "key", s) +// } else { +// log.ZDebug(ctx, "redis delete success", "key", s) +// } +// } +// }) +//} +// +//func WithRocksCacheDelete(cli *rockscache.Client) Option { +// return WithDeleteKeyBefore(func(ctx context.Context, key ...string) { +// for _, k := range key { +// if err := cli.TagAsDeleted2(ctx, k); err != nil { +// log.ZError(ctx, "rocksdb delete error", err, "key", k) +// } else { +// log.ZDebug(ctx, "rocksdb delete success", "key", k) +// } +// } +// }) +//} diff --git a/pkg/common/localcache/cache.go b/pkg/localcache/cache.go similarity index 88% rename from pkg/common/localcache/cache.go rename to pkg/localcache/cache.go index 56cf14ff8..2d2e8018e 100644 --- a/pkg/common/localcache/cache.go +++ b/pkg/localcache/cache.go @@ -2,9 +2,9 @@ package localcache import ( "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/localcache/link" - "github.com/openimsdk/open-im-server/v3/pkg/common/localcache/local" - lopt "github.com/openimsdk/open-im-server/v3/pkg/common/localcache/option" + "github.com/openimsdk/localcache/link" + "github.com/openimsdk/localcache/local" + lopt "github.com/openimsdk/localcache/option" ) type Cache[V any] interface { diff --git a/pkg/localcache/go.mod b/pkg/localcache/go.mod new file mode 100644 index 000000000..5f0793042 --- /dev/null +++ b/pkg/localcache/go.mod @@ -0,0 +1,5 @@ +module github.com/openimsdk/localcache + +go 1.19 + +require github.com/hashicorp/golang-lru/v2 v2.0.7 diff --git a/pkg/common/localcache/link/link.go b/pkg/localcache/link/link.go similarity index 100% rename from pkg/common/localcache/link/link.go rename to pkg/localcache/link/link.go diff --git a/pkg/common/localcache/link/link_test.go b/pkg/localcache/link/link_test.go similarity index 100% rename from pkg/common/localcache/link/link_test.go rename to pkg/localcache/link/link_test.go diff --git a/pkg/localcache/local/cache.go b/pkg/localcache/local/cache.go new file mode 100644 index 000000000..06569965e --- /dev/null +++ b/pkg/localcache/local/cache.go @@ -0,0 +1,50 @@ +package local + +import ( + "hash/fnv" + "time" + "unsafe" +) + +type Cache[V any] interface { + Get(key string, fetch func() (V, error)) (V, error) + Del(key string) bool +} + +func NewCache[V any](slotNum, slotSize int, successTTL, failedTTL time.Duration, target Target, onEvict EvictCallback[string, V]) Cache[V] { + c := &slot[V]{ + n: uint64(slotNum), + slots: make([]*LRU[string, V], slotNum), + target: target, + } + for i := 0; i < slotNum; i++ { + c.slots[i] = NewLRU[string, V](slotSize, successTTL, failedTTL, c.target, onEvict) + } + return c +} + +type slot[V any] struct { + n uint64 + slots []*LRU[string, V] + target Target +} + +func (c *slot[V]) index(s string) uint64 { + h := fnv.New64a() + _, _ = h.Write(*(*[]byte)(unsafe.Pointer(&s))) + return h.Sum64() % c.n +} + +func (c *slot[V]) Get(key string, fetch func() (V, error)) (V, error) { + return c.slots[c.index(key)].Get(key, fetch) +} + +func (c *slot[V]) Del(key string) bool { + if c.slots[c.index(key)].Del(key) { + c.target.IncrDelHit() + return true + } else { + c.target.IncrDelNotFound() + return false + } +} diff --git a/pkg/common/localcache/callback.go b/pkg/localcache/local/callback.go similarity index 86% rename from pkg/common/localcache/callback.go rename to pkg/localcache/local/callback.go index 4bd37a2c2..32aef112b 100644 --- a/pkg/common/localcache/callback.go +++ b/pkg/localcache/local/callback.go @@ -1,4 +1,4 @@ -package localcache +package local import "github.com/hashicorp/golang-lru/v2/simplelru" diff --git a/pkg/common/localcache/lru.go b/pkg/localcache/local/lru.go similarity index 91% rename from pkg/common/localcache/lru.go rename to pkg/localcache/local/lru.go index 4fd1704d2..45dc3b651 100644 --- a/pkg/common/localcache/lru.go +++ b/pkg/localcache/local/lru.go @@ -1,4 +1,4 @@ -package localcache +package local import ( "github.com/hashicorp/golang-lru/v2/simplelru" @@ -30,7 +30,6 @@ func NewLRU[K comparable, V any](size int, successTTL, failedTTL time.Duration, successTTL: successTTL, failedTTL: failedTTL, target: target, - s: NewSingleFlight[K, V](), } } @@ -40,7 +39,6 @@ type LRU[K comparable, V any] struct { successTTL time.Duration failedTTL time.Duration target Target - s *SingleFlight[K, V] } func (x *LRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) { @@ -80,10 +78,5 @@ func (x *LRU[K, V]) Del(key K) bool { x.lock.Lock() ok := x.core.Remove(key) x.lock.Unlock() - if ok { - x.target.IncrDelHit() - } else { - x.target.IncrDelNotFound() - } return ok } diff --git a/pkg/localcache/local/lru_test.go b/pkg/localcache/local/lru_test.go new file mode 100644 index 000000000..a6e7553ee --- /dev/null +++ b/pkg/localcache/local/lru_test.go @@ -0,0 +1,95 @@ +package local + +import ( + "fmt" + "sync" + "sync/atomic" + "testing" + "time" +) + +type cacheTarget struct { + getHit int64 + getSuccess int64 + getFailed int64 + delHit int64 + delNotFound int64 +} + +func (r *cacheTarget) IncrGetHit() { + atomic.AddInt64(&r.getHit, 1) +} + +func (r *cacheTarget) IncrGetSuccess() { + atomic.AddInt64(&r.getSuccess, 1) +} + +func (r *cacheTarget) IncrGetFailed() { + atomic.AddInt64(&r.getFailed, 1) +} + +func (r *cacheTarget) IncrDelHit() { + atomic.AddInt64(&r.delHit, 1) +} + +func (r *cacheTarget) IncrDelNotFound() { + atomic.AddInt64(&r.delNotFound, 1) +} + +func (r *cacheTarget) String() string { + return fmt.Sprintf("getHit: %d, getSuccess: %d, getFailed: %d, delHit: %d, delNotFound: %d", r.getHit, r.getSuccess, r.getFailed, r.delHit, r.delNotFound) +} + +func TestName(t *testing.T) { + target := &cacheTarget{} + l := NewCache[string](100, 1000, time.Second*20, time.Second*5, target, nil) + //l := NewLRU[string, string](1000, time.Second*20, time.Second*5, target) + + fn := func(key string, n int, fetch func() (string, error)) { + for i := 0; i < n; i++ { + //v, err := l.Get(key, fetch) + //if err == nil { + // t.Log("key", key, "value", v) + //} else { + // t.Error("key", key, err) + //} + l.Get(key, fetch) + //time.Sleep(time.Second / 100) + } + } + + tmp := make(map[string]struct{}) + + var wg sync.WaitGroup + for i := 0; i < 10000; i++ { + wg.Add(1) + key := fmt.Sprintf("key_%d", i%200) + tmp[key] = struct{}{} + go func() { + defer wg.Done() + //t.Log(key) + fn(key, 10000, func() (string, error) { + //time.Sleep(time.Second * 3) + //t.Log(time.Now(), "key", key, "fetch") + //if rand.Uint32()%5 == 0 { + // return "value_" + key, nil + //} + //return "", errors.New("rand error") + return "value_" + key, nil + }) + }() + + //wg.Add(1) + //go func() { + // defer wg.Done() + // for i := 0; i < 10; i++ { + // l.Del(key) + // time.Sleep(time.Second / 3) + // } + //}() + } + wg.Wait() + t.Log(len(tmp)) + t.Log(target.String()) + +} diff --git a/pkg/localcache/local/target.go b/pkg/localcache/local/target.go new file mode 100644 index 000000000..6cb134fb0 --- /dev/null +++ b/pkg/localcache/local/target.go @@ -0,0 +1,10 @@ +package local + +type Target interface { + IncrGetHit() + IncrGetSuccess() + IncrGetFailed() + + IncrDelHit() + IncrDelNotFound() +} diff --git a/pkg/common/localcache/option.go b/pkg/localcache/option.go similarity index 97% rename from pkg/common/localcache/option.go rename to pkg/localcache/option.go index f23a04e68..9f77bc502 100644 --- a/pkg/common/localcache/option.go +++ b/pkg/localcache/option.go @@ -2,7 +2,7 @@ package localcache import ( "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/localcache/local" + "github.com/openimsdk/localcache/local" "time" ) diff --git a/pkg/common/localcache/option/option.go b/pkg/localcache/option/option.go similarity index 100% rename from pkg/common/localcache/option/option.go rename to pkg/localcache/option/option.go diff --git a/pkg/common/localcache/tool.go b/pkg/localcache/tool.go similarity index 100% rename from pkg/common/localcache/tool.go rename to pkg/localcache/tool.go diff --git a/pkg/rpccache/conversation.go b/pkg/rpccache/conversation.go index 181b8fd8a..5a76990d0 100644 --- a/pkg/rpccache/conversation.go +++ b/pkg/rpccache/conversation.go @@ -2,9 +2,9 @@ package rpccache import ( "context" + "github.com/openimsdk/localcache" "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/open-im-server/v3/pkg/common/localcache" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/redis/go-redis/v9" ) diff --git a/pkg/rpccache/friend.go b/pkg/rpccache/friend.go index 82d0a03a5..6d640ea8f 100644 --- a/pkg/rpccache/friend.go +++ b/pkg/rpccache/friend.go @@ -3,10 +3,10 @@ package rpccache import ( "context" "github.com/OpenIMSDK/tools/log" + "github.com/openimsdk/localcache" + "github.com/openimsdk/localcache/option" "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/open-im-server/v3/pkg/common/localcache" - "github.com/openimsdk/open-im-server/v3/pkg/common/localcache/option" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/redis/go-redis/v9" ) diff --git a/pkg/rpccache/group.go b/pkg/rpccache/group.go index 0c3bc6b93..3c3333b92 100644 --- a/pkg/rpccache/group.go +++ b/pkg/rpccache/group.go @@ -2,9 +2,9 @@ package rpccache import ( "context" + "github.com/openimsdk/localcache" "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/open-im-server/v3/pkg/common/localcache" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/redis/go-redis/v9" ) From 749eb118307c5a2dd190c1855f68cb61dfdf06ef Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Fri, 12 Jan 2024 18:23:15 +0800 Subject: [PATCH 3/3] feat: local cache --- pkg/localcache/local/lru.go | 28 +++++++++++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/pkg/localcache/local/lru.go b/pkg/localcache/local/lru.go index 45dc3b651..5ce0f1cc4 100644 --- a/pkg/localcache/local/lru.go +++ b/pkg/localcache/local/lru.go @@ -6,10 +6,32 @@ import ( "time" ) +type LRU1[K comparable, V any] interface { + Get(key K, fetch func() (V, error)) (V, error) + Del(key K) bool + Stop() +} + +//type expirableLRU[K comparable, V any] struct { +// core expirable.LRU[K, V] +//} +// +//func (x *expirableLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) { +// +// return x.core.Get(key, fetch) +//} +// +//func (x *expirableLRU[K, V]) Del(key K) bool { +// return x.core.Remove(key) +//} +// +//func (x *expirableLRU[K, V]) Stop() { +// +//} + type waitItem[V any] struct { lock sync.Mutex expires int64 - active bool err error value V } @@ -80,3 +102,7 @@ func (x *LRU[K, V]) Del(key K) bool { x.lock.Unlock() return ok } + +func (x *LRU[K, V]) Stop() { + +}