From c9193302a83dccb760febba742f5946c0897ad20 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Mon, 15 Jan 2024 15:53:40 +0800 Subject: [PATCH 1/6] feat: local cache --- internal/rpc/msg/verify.go | 14 ++++++------- pkg/localcache/option.go | 9 --------- pkg/rpccache/group.go | 41 +++++++++++++++++++++++++++++++++++--- 3 files changed, 45 insertions(+), 19 deletions(-) diff --git a/internal/rpc/msg/verify.go b/internal/rpc/msg/verify.go index e316ef88d..9f6ca727a 100644 --- a/internal/rpc/msg/verify.go +++ b/internal/rpc/msg/verify.go @@ -95,13 +95,13 @@ func (m *msgServer) messageVerification(ctx context.Context, data *msg.SendMsgRe data.MsgData.ContentType >= constant.NotificationBegin { return nil } - // memberIDs, err := m.GroupLocalCache.GetGroupMemberIDs(ctx, data.MsgData.GroupID) - // if err != nil { - // return err - // } - // if !utils.IsContain(data.MsgData.SendID, memberIDs) { - // return errs.ErrNotInGroupYet.Wrap() - // } + memberIDs, err := m.GroupLocalCache.GetGroupMemberIDMap(ctx, data.MsgData.GroupID) + if err != nil { + return err + } + if _, ok := memberIDs[data.MsgData.SendID]; !ok { + return errs.ErrNotInGroupYet.Wrap() + } groupMemberInfo, err := m.Group.GetGroupMemberCache(ctx, data.MsgData.GroupID, data.MsgData.SendID) if err != nil { diff --git a/pkg/localcache/option.go b/pkg/localcache/option.go index 178faf273..40f56f2d0 100644 --- a/pkg/localcache/option.go +++ b/pkg/localcache/option.go @@ -107,15 +107,6 @@ func WithDeleteKeyBefore(fn func(ctx context.Context, key ...string)) Option { } } -func WithDeleteLocal(fn func(fn func(key ...string))) Option { - if fn == nil { - panic("fn should not be nil") - } - return func(o *option) { - o.delCh = fn - } -} - type emptyTarget struct{} func (e emptyTarget) IncrGetHit() {} diff --git a/pkg/rpccache/group.go b/pkg/rpccache/group.go index d3617bfc4..333966fbb 100644 --- a/pkg/rpccache/group.go +++ b/pkg/rpccache/group.go @@ -24,8 +24,43 @@ type GroupLocalCache struct { local localcache.Cache[any] } -func (g *GroupLocalCache) GetGroupMemberIDs(ctx context.Context, groupID string) ([]string, error) { - return localcache.AnyValue[[]string](g.local.Get(ctx, cachekey.GetGroupMemberIDsKey(groupID), func(ctx context.Context) (any, error) { - return g.client.GetGroupMemberIDs(ctx, groupID) +type listMap[V comparable] struct { + List []V + Map map[V]struct{} +} + +func newListMap[V comparable](values []V, err error) (*listMap[V], error) { + if err != nil { + return nil, err + } + lm := &listMap[V]{ + List: values, + Map: make(map[V]struct{}, len(values)), + } + for _, value := range values { + lm.Map[value] = struct{}{} + } + return lm, nil +} + +func (g *GroupLocalCache) getGroupMemberIDs(ctx context.Context, groupID string) (*listMap[string], error) { + return localcache.AnyValue[*listMap[string]](g.local.Get(ctx, cachekey.GetGroupMemberIDsKey(groupID), func(ctx context.Context) (any, error) { + return newListMap(g.client.GetGroupMemberIDs(ctx, groupID)) })) } + +func (g *GroupLocalCache) GetGroupMemberIDs(ctx context.Context, groupID string) ([]string, error) { + res, err := g.getGroupMemberIDs(ctx, groupID) + if err != nil { + return nil, err + } + return res.List, nil +} + +func (g *GroupLocalCache) GetGroupMemberIDMap(ctx context.Context, groupID string) (map[string]struct{}, error) { + res, err := g.getGroupMemberIDs(ctx, groupID) + if err != nil { + return nil, err + } + return res.Map, nil +} From ad5e4e15408d6ff1ccc51e02775f648a14863da8 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Mon, 15 Jan 2024 16:02:34 +0800 Subject: [PATCH 2/6] feat: local cache --- pkg/rpccache/conversation.go | 9 ++++++--- pkg/rpccache/friend.go | 9 ++++++--- pkg/rpccache/group.go | 9 ++++++--- pkg/rpccache/subscriber.go | 23 +++++++++++++++++++++++ 4 files changed, 41 insertions(+), 9 deletions(-) create mode 100644 pkg/rpccache/subscriber.go diff --git a/pkg/rpccache/conversation.go b/pkg/rpccache/conversation.go index a0aba105a..b6e190bb9 100644 --- a/pkg/rpccache/conversation.go +++ b/pkg/rpccache/conversation.go @@ -10,13 +10,16 @@ import ( ) func NewConversationLocalCache(client rpcclient.ConversationRpcClient, cli redis.UniversalClient) *ConversationLocalCache { - return &ConversationLocalCache{ + lc := config.Config.LocalCache.Conversation + x := &ConversationLocalCache{ client: client, local: localcache.New[any]( - localcache.WithLocalSlotNum(config.Config.LocalCache.Conversation.SlotNum), - localcache.WithLocalSlotSize(config.Config.LocalCache.Conversation.SlotSize), + localcache.WithLocalSlotNum(lc.SlotNum), + localcache.WithLocalSlotSize(lc.SlotSize), ), } + go subscriberRedisDeleteCache(context.Background(), cli, lc.Topic, x.local.DelLocal) + return x } type ConversationLocalCache struct { diff --git a/pkg/rpccache/friend.go b/pkg/rpccache/friend.go index fc5f0e96c..0e1faa78b 100644 --- a/pkg/rpccache/friend.go +++ b/pkg/rpccache/friend.go @@ -11,13 +11,16 @@ import ( ) func NewFriendLocalCache(client rpcclient.FriendRpcClient, cli redis.UniversalClient) *FriendLocalCache { - return &FriendLocalCache{ + lc := config.Config.LocalCache.Friend + x := &FriendLocalCache{ client: client, local: localcache.New[any]( - localcache.WithLocalSlotNum(config.Config.LocalCache.Friend.SlotNum), - localcache.WithLocalSlotSize(config.Config.LocalCache.Friend.SlotSize), + localcache.WithLocalSlotNum(lc.SlotNum), + localcache.WithLocalSlotSize(lc.SlotSize), ), } + go subscriberRedisDeleteCache(context.Background(), cli, lc.Topic, x.local.DelLocal) + return x } type FriendLocalCache struct { diff --git a/pkg/rpccache/group.go b/pkg/rpccache/group.go index 333966fbb..6e5595a42 100644 --- a/pkg/rpccache/group.go +++ b/pkg/rpccache/group.go @@ -10,13 +10,16 @@ import ( ) func NewGroupLocalCache(client rpcclient.GroupRpcClient, cli redis.UniversalClient) *GroupLocalCache { - return &GroupLocalCache{ + lc := config.Config.LocalCache.Group + x := &GroupLocalCache{ client: client, local: localcache.New[any]( - localcache.WithLocalSlotNum(config.Config.LocalCache.Group.SlotNum), - localcache.WithLocalSlotSize(config.Config.LocalCache.Group.SlotSize), + localcache.WithLocalSlotNum(lc.SlotNum), + localcache.WithLocalSlotSize(lc.SlotSize), ), } + go subscriberRedisDeleteCache(context.Background(), cli, lc.Topic, x.local.DelLocal) + return x } type GroupLocalCache struct { diff --git a/pkg/rpccache/subscriber.go b/pkg/rpccache/subscriber.go new file mode 100644 index 000000000..571ff6d2d --- /dev/null +++ b/pkg/rpccache/subscriber.go @@ -0,0 +1,23 @@ +package rpccache + +import ( + "context" + "encoding/json" + "github.com/OpenIMSDK/tools/log" + "github.com/redis/go-redis/v9" +) + +func subscriberRedisDeleteCache(ctx context.Context, client redis.UniversalClient, channel string, del func(ctx context.Context, key ...string)) { + for message := range client.Subscribe(ctx, channel).Channel() { + log.ZDebug(ctx, "subscriberRedisDeleteCache", "channel", channel, "payload", message.Payload) + var keys []string + if err := json.Unmarshal([]byte(message.Payload), &keys); err != nil { + log.ZError(ctx, "subscriberRedisDeleteCache json.Unmarshal error", err) + continue + } + if len(keys) == 0 { + continue + } + del(ctx, keys...) + } +} From 32221da613bd42b67a2e556567580cff097dc35e Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Mon, 15 Jan 2024 16:27:02 +0800 Subject: [PATCH 3/6] feat: local cache --- pkg/localcache/option.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/localcache/option.go b/pkg/localcache/option.go index 40f56f2d0..17780161a 100644 --- a/pkg/localcache/option.go +++ b/pkg/localcache/option.go @@ -27,7 +27,6 @@ type option struct { localSuccessTTL time.Duration localFailedTTL time.Duration delFn []func(ctx context.Context, key ...string) - delCh func(fn func(key ...string)) target lru.Target } From 1db6e3b3897888da5f8f5e959f0d6b9c8e9c41f3 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Mon, 15 Jan 2024 16:33:25 +0800 Subject: [PATCH 4/6] feat: local cache --- pkg/common/cachekey/user.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) create mode 100644 pkg/common/cachekey/user.go diff --git a/pkg/common/cachekey/user.go b/pkg/common/cachekey/user.go new file mode 100644 index 000000000..fbea5168b --- /dev/null +++ b/pkg/common/cachekey/user.go @@ -0,0 +1,14 @@ +package cachekey + +const ( + userInfoKey = "USER_INFO:" + userGlobalRecvMsgOptKey = "USER_GLOBAL_RECV_MSG_OPT_KEY:" +) + +func GetUserInfoKey(userID string) string { + return userInfoKey + userID +} + +func GetUserGlobalRecvMsgOptKey(userID string) string { + return userGlobalRecvMsgOptKey + userID +} From 656bfef069bb0206d1a53fd2c1f22f4d5af1f5af Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Mon, 15 Jan 2024 16:48:21 +0800 Subject: [PATCH 5/6] feat: local cache --- pkg/rpccache/conversation.go | 12 +++++++++++- pkg/rpccache/group.go | 12 +++++++++++- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/pkg/rpccache/conversation.go b/pkg/rpccache/conversation.go index b6e190bb9..e5c28f464 100644 --- a/pkg/rpccache/conversation.go +++ b/pkg/rpccache/conversation.go @@ -2,6 +2,7 @@ package rpccache import ( "context" + "github.com/OpenIMSDK/tools/log" "github.com/openimsdk/localcache" "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/config" @@ -27,8 +28,17 @@ type ConversationLocalCache struct { local localcache.Cache[any] } -func (c *ConversationLocalCache) GetConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) { +func (c *ConversationLocalCache) GetConversationIDs(ctx context.Context, ownerUserID string) (val []string, err error) { + log.ZDebug(ctx, "ConversationLocalCache GetConversationIDs req", "ownerUserID", ownerUserID) + defer func() { + if err == nil { + log.ZDebug(ctx, "ConversationLocalCache GetConversationIDs return", "value", val) + } else { + log.ZError(ctx, "ConversationLocalCache GetConversationIDs return", err) + } + }() return localcache.AnyValue[[]string](c.local.Get(ctx, cachekey.GetConversationIDsKey(ownerUserID), func(ctx context.Context) (any, error) { + log.ZDebug(ctx, "ConversationLocalCache GetConversationIDs rpc", "ownerUserID", ownerUserID) return c.client.GetConversationIDs(ctx, ownerUserID) })) } diff --git a/pkg/rpccache/group.go b/pkg/rpccache/group.go index 6e5595a42..4b9281366 100644 --- a/pkg/rpccache/group.go +++ b/pkg/rpccache/group.go @@ -2,6 +2,7 @@ package rpccache import ( "context" + "github.com/OpenIMSDK/tools/log" "github.com/openimsdk/localcache" "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/config" @@ -46,8 +47,17 @@ func newListMap[V comparable](values []V, err error) (*listMap[V], error) { return lm, nil } -func (g *GroupLocalCache) getGroupMemberIDs(ctx context.Context, groupID string) (*listMap[string], error) { +func (g *GroupLocalCache) getGroupMemberIDs(ctx context.Context, groupID string) (val *listMap[string], err error) { + log.ZDebug(ctx, "GroupLocalCache getGroupMemberIDs req", "groupID", groupID) + defer func() { + if err == nil { + log.ZDebug(ctx, "GroupLocalCache getGroupMemberIDs return", "value", val.List) + } else { + log.ZError(ctx, "GroupLocalCache getGroupMemberIDs return", err) + } + }() return localcache.AnyValue[*listMap[string]](g.local.Get(ctx, cachekey.GetGroupMemberIDsKey(groupID), func(ctx context.Context) (any, error) { + log.ZDebug(ctx, "GroupLocalCache getGroupMemberIDs rpc", "groupID", groupID) return newListMap(g.client.GetGroupMemberIDs(ctx, groupID)) })) } From 6e1f96a7206ad08035f8c91345701372b1ef5211 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Mon, 15 Jan 2024 17:08:02 +0800 Subject: [PATCH 6/6] feat: local cache --- pkg/common/db/cache/msg.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/common/db/cache/msg.go b/pkg/common/db/cache/msg.go index c1749e719..768e75a27 100644 --- a/pkg/common/db/cache/msg.go +++ b/pkg/common/db/cache/msg.go @@ -17,6 +17,7 @@ package cache import ( "context" "errors" + "github.com/dtm-labs/rockscache" "strconv" "time" @@ -128,7 +129,8 @@ type MsgModel interface { } func NewMsgCacheModel(client redis.UniversalClient) MsgModel { - return &msgCache{rdb: client} + rcClient := rockscache.NewClient(client, rockscache.NewDefaultOptions()) + return &msgCache{metaCache: NewMetaCacheRedis(rcClient), rdb: client} } type msgCache struct {