diff --git a/internal/rpc/msg/verify.go b/internal/rpc/msg/verify.go index e316ef88d..9f6ca727a 100644 --- a/internal/rpc/msg/verify.go +++ b/internal/rpc/msg/verify.go @@ -95,13 +95,13 @@ func (m *msgServer) messageVerification(ctx context.Context, data *msg.SendMsgRe data.MsgData.ContentType >= constant.NotificationBegin { return nil } - // memberIDs, err := m.GroupLocalCache.GetGroupMemberIDs(ctx, data.MsgData.GroupID) - // if err != nil { - // return err - // } - // if !utils.IsContain(data.MsgData.SendID, memberIDs) { - // return errs.ErrNotInGroupYet.Wrap() - // } + memberIDs, err := m.GroupLocalCache.GetGroupMemberIDMap(ctx, data.MsgData.GroupID) + if err != nil { + return err + } + if _, ok := memberIDs[data.MsgData.SendID]; !ok { + return errs.ErrNotInGroupYet.Wrap() + } groupMemberInfo, err := m.Group.GetGroupMemberCache(ctx, data.MsgData.GroupID, data.MsgData.SendID) if err != nil { diff --git a/pkg/common/cachekey/user.go b/pkg/common/cachekey/user.go new file mode 100644 index 000000000..fbea5168b --- /dev/null +++ b/pkg/common/cachekey/user.go @@ -0,0 +1,14 @@ +package cachekey + +const ( + userInfoKey = "USER_INFO:" + userGlobalRecvMsgOptKey = "USER_GLOBAL_RECV_MSG_OPT_KEY:" +) + +func GetUserInfoKey(userID string) string { + return userInfoKey + userID +} + +func GetUserGlobalRecvMsgOptKey(userID string) string { + return userGlobalRecvMsgOptKey + userID +} diff --git a/pkg/common/db/cache/msg.go b/pkg/common/db/cache/msg.go index c1749e719..768e75a27 100644 --- a/pkg/common/db/cache/msg.go +++ b/pkg/common/db/cache/msg.go @@ -17,6 +17,7 @@ package cache import ( "context" "errors" + "github.com/dtm-labs/rockscache" "strconv" "time" @@ -128,7 +129,8 @@ type MsgModel interface { } func NewMsgCacheModel(client redis.UniversalClient) MsgModel { - return &msgCache{rdb: client} + rcClient := rockscache.NewClient(client, rockscache.NewDefaultOptions()) + return &msgCache{metaCache: NewMetaCacheRedis(rcClient), rdb: client} } type msgCache struct { diff --git a/pkg/localcache/option.go b/pkg/localcache/option.go index 178faf273..17780161a 100644 --- a/pkg/localcache/option.go +++ b/pkg/localcache/option.go @@ -27,7 +27,6 @@ type option struct { localSuccessTTL time.Duration localFailedTTL time.Duration delFn []func(ctx context.Context, key ...string) - delCh func(fn func(key ...string)) target lru.Target } @@ -107,15 +106,6 @@ func WithDeleteKeyBefore(fn func(ctx context.Context, key ...string)) Option { } } -func WithDeleteLocal(fn func(fn func(key ...string))) Option { - if fn == nil { - panic("fn should not be nil") - } - return func(o *option) { - o.delCh = fn - } -} - type emptyTarget struct{} func (e emptyTarget) IncrGetHit() {} diff --git a/pkg/rpccache/conversation.go b/pkg/rpccache/conversation.go index a0aba105a..e5c28f464 100644 --- a/pkg/rpccache/conversation.go +++ b/pkg/rpccache/conversation.go @@ -2,6 +2,7 @@ package rpccache import ( "context" + "github.com/OpenIMSDK/tools/log" "github.com/openimsdk/localcache" "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/config" @@ -10,13 +11,16 @@ import ( ) func NewConversationLocalCache(client rpcclient.ConversationRpcClient, cli redis.UniversalClient) *ConversationLocalCache { - return &ConversationLocalCache{ + lc := config.Config.LocalCache.Conversation + x := &ConversationLocalCache{ client: client, local: localcache.New[any]( - localcache.WithLocalSlotNum(config.Config.LocalCache.Conversation.SlotNum), - localcache.WithLocalSlotSize(config.Config.LocalCache.Conversation.SlotSize), + localcache.WithLocalSlotNum(lc.SlotNum), + localcache.WithLocalSlotSize(lc.SlotSize), ), } + go subscriberRedisDeleteCache(context.Background(), cli, lc.Topic, x.local.DelLocal) + return x } type ConversationLocalCache struct { @@ -24,8 +28,17 @@ type ConversationLocalCache struct { local localcache.Cache[any] } -func (c *ConversationLocalCache) GetConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) { +func (c *ConversationLocalCache) GetConversationIDs(ctx context.Context, ownerUserID string) (val []string, err error) { + log.ZDebug(ctx, "ConversationLocalCache GetConversationIDs req", "ownerUserID", ownerUserID) + defer func() { + if err == nil { + log.ZDebug(ctx, "ConversationLocalCache GetConversationIDs return", "value", val) + } else { + log.ZError(ctx, "ConversationLocalCache GetConversationIDs return", err) + } + }() return localcache.AnyValue[[]string](c.local.Get(ctx, cachekey.GetConversationIDsKey(ownerUserID), func(ctx context.Context) (any, error) { + log.ZDebug(ctx, "ConversationLocalCache GetConversationIDs rpc", "ownerUserID", ownerUserID) return c.client.GetConversationIDs(ctx, ownerUserID) })) } diff --git a/pkg/rpccache/friend.go b/pkg/rpccache/friend.go index fc5f0e96c..0e1faa78b 100644 --- a/pkg/rpccache/friend.go +++ b/pkg/rpccache/friend.go @@ -11,13 +11,16 @@ import ( ) func NewFriendLocalCache(client rpcclient.FriendRpcClient, cli redis.UniversalClient) *FriendLocalCache { - return &FriendLocalCache{ + lc := config.Config.LocalCache.Friend + x := &FriendLocalCache{ client: client, local: localcache.New[any]( - localcache.WithLocalSlotNum(config.Config.LocalCache.Friend.SlotNum), - localcache.WithLocalSlotSize(config.Config.LocalCache.Friend.SlotSize), + localcache.WithLocalSlotNum(lc.SlotNum), + localcache.WithLocalSlotSize(lc.SlotSize), ), } + go subscriberRedisDeleteCache(context.Background(), cli, lc.Topic, x.local.DelLocal) + return x } type FriendLocalCache struct { diff --git a/pkg/rpccache/group.go b/pkg/rpccache/group.go index d3617bfc4..4b9281366 100644 --- a/pkg/rpccache/group.go +++ b/pkg/rpccache/group.go @@ -2,6 +2,7 @@ package rpccache import ( "context" + "github.com/OpenIMSDK/tools/log" "github.com/openimsdk/localcache" "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/config" @@ -10,13 +11,16 @@ import ( ) func NewGroupLocalCache(client rpcclient.GroupRpcClient, cli redis.UniversalClient) *GroupLocalCache { - return &GroupLocalCache{ + lc := config.Config.LocalCache.Group + x := &GroupLocalCache{ client: client, local: localcache.New[any]( - localcache.WithLocalSlotNum(config.Config.LocalCache.Group.SlotNum), - localcache.WithLocalSlotSize(config.Config.LocalCache.Group.SlotSize), + localcache.WithLocalSlotNum(lc.SlotNum), + localcache.WithLocalSlotSize(lc.SlotSize), ), } + go subscriberRedisDeleteCache(context.Background(), cli, lc.Topic, x.local.DelLocal) + return x } type GroupLocalCache struct { @@ -24,8 +28,52 @@ type GroupLocalCache struct { local localcache.Cache[any] } -func (g *GroupLocalCache) GetGroupMemberIDs(ctx context.Context, groupID string) ([]string, error) { - return localcache.AnyValue[[]string](g.local.Get(ctx, cachekey.GetGroupMemberIDsKey(groupID), func(ctx context.Context) (any, error) { - return g.client.GetGroupMemberIDs(ctx, groupID) +type listMap[V comparable] struct { + List []V + Map map[V]struct{} +} + +func newListMap[V comparable](values []V, err error) (*listMap[V], error) { + if err != nil { + return nil, err + } + lm := &listMap[V]{ + List: values, + Map: make(map[V]struct{}, len(values)), + } + for _, value := range values { + lm.Map[value] = struct{}{} + } + return lm, nil +} + +func (g *GroupLocalCache) getGroupMemberIDs(ctx context.Context, groupID string) (val *listMap[string], err error) { + log.ZDebug(ctx, "GroupLocalCache getGroupMemberIDs req", "groupID", groupID) + defer func() { + if err == nil { + log.ZDebug(ctx, "GroupLocalCache getGroupMemberIDs return", "value", val.List) + } else { + log.ZError(ctx, "GroupLocalCache getGroupMemberIDs return", err) + } + }() + return localcache.AnyValue[*listMap[string]](g.local.Get(ctx, cachekey.GetGroupMemberIDsKey(groupID), func(ctx context.Context) (any, error) { + log.ZDebug(ctx, "GroupLocalCache getGroupMemberIDs rpc", "groupID", groupID) + return newListMap(g.client.GetGroupMemberIDs(ctx, groupID)) })) } + +func (g *GroupLocalCache) GetGroupMemberIDs(ctx context.Context, groupID string) ([]string, error) { + res, err := g.getGroupMemberIDs(ctx, groupID) + if err != nil { + return nil, err + } + return res.List, nil +} + +func (g *GroupLocalCache) GetGroupMemberIDMap(ctx context.Context, groupID string) (map[string]struct{}, error) { + res, err := g.getGroupMemberIDs(ctx, groupID) + if err != nil { + return nil, err + } + return res.Map, nil +} diff --git a/pkg/rpccache/subscriber.go b/pkg/rpccache/subscriber.go new file mode 100644 index 000000000..571ff6d2d --- /dev/null +++ b/pkg/rpccache/subscriber.go @@ -0,0 +1,23 @@ +package rpccache + +import ( + "context" + "encoding/json" + "github.com/OpenIMSDK/tools/log" + "github.com/redis/go-redis/v9" +) + +func subscriberRedisDeleteCache(ctx context.Context, client redis.UniversalClient, channel string, del func(ctx context.Context, key ...string)) { + for message := range client.Subscribe(ctx, channel).Channel() { + log.ZDebug(ctx, "subscriberRedisDeleteCache", "channel", channel, "payload", message.Payload) + var keys []string + if err := json.Unmarshal([]byte(message.Payload), &keys); err != nil { + log.ZError(ctx, "subscriberRedisDeleteCache json.Unmarshal error", err) + continue + } + if len(keys) == 0 { + continue + } + del(ctx, keys...) + } +}