mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-08-20 11:49:57 +08:00
Merge remote-tracking branch 'origin/localcache' into localcache
This commit is contained in:
commit
1e81d29e86
@ -95,13 +95,13 @@ func (m *msgServer) messageVerification(ctx context.Context, data *msg.SendMsgRe
|
||||
data.MsgData.ContentType >= constant.NotificationBegin {
|
||||
return nil
|
||||
}
|
||||
// memberIDs, err := m.GroupLocalCache.GetGroupMemberIDs(ctx, data.MsgData.GroupID)
|
||||
// if err != nil {
|
||||
// return err
|
||||
// }
|
||||
// if !utils.IsContain(data.MsgData.SendID, memberIDs) {
|
||||
// return errs.ErrNotInGroupYet.Wrap()
|
||||
// }
|
||||
memberIDs, err := m.GroupLocalCache.GetGroupMemberIDMap(ctx, data.MsgData.GroupID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if _, ok := memberIDs[data.MsgData.SendID]; !ok {
|
||||
return errs.ErrNotInGroupYet.Wrap()
|
||||
}
|
||||
|
||||
groupMemberInfo, err := m.Group.GetGroupMemberCache(ctx, data.MsgData.GroupID, data.MsgData.SendID)
|
||||
if err != nil {
|
||||
|
14
pkg/common/cachekey/user.go
Normal file
14
pkg/common/cachekey/user.go
Normal file
@ -0,0 +1,14 @@
|
||||
package cachekey
|
||||
|
||||
const (
|
||||
userInfoKey = "USER_INFO:"
|
||||
userGlobalRecvMsgOptKey = "USER_GLOBAL_RECV_MSG_OPT_KEY:"
|
||||
)
|
||||
|
||||
func GetUserInfoKey(userID string) string {
|
||||
return userInfoKey + userID
|
||||
}
|
||||
|
||||
func GetUserGlobalRecvMsgOptKey(userID string) string {
|
||||
return userGlobalRecvMsgOptKey + userID
|
||||
}
|
4
pkg/common/db/cache/msg.go
vendored
4
pkg/common/db/cache/msg.go
vendored
@ -17,6 +17,7 @@ package cache
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"github.com/dtm-labs/rockscache"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
@ -128,7 +129,8 @@ type MsgModel interface {
|
||||
}
|
||||
|
||||
func NewMsgCacheModel(client redis.UniversalClient) MsgModel {
|
||||
return &msgCache{rdb: client}
|
||||
rcClient := rockscache.NewClient(client, rockscache.NewDefaultOptions())
|
||||
return &msgCache{metaCache: NewMetaCacheRedis(rcClient), rdb: client}
|
||||
}
|
||||
|
||||
type msgCache struct {
|
||||
|
@ -27,7 +27,6 @@ type option struct {
|
||||
localSuccessTTL time.Duration
|
||||
localFailedTTL time.Duration
|
||||
delFn []func(ctx context.Context, key ...string)
|
||||
delCh func(fn func(key ...string))
|
||||
target lru.Target
|
||||
}
|
||||
|
||||
@ -107,15 +106,6 @@ func WithDeleteKeyBefore(fn func(ctx context.Context, key ...string)) Option {
|
||||
}
|
||||
}
|
||||
|
||||
func WithDeleteLocal(fn func(fn func(key ...string))) Option {
|
||||
if fn == nil {
|
||||
panic("fn should not be nil")
|
||||
}
|
||||
return func(o *option) {
|
||||
o.delCh = fn
|
||||
}
|
||||
}
|
||||
|
||||
type emptyTarget struct{}
|
||||
|
||||
func (e emptyTarget) IncrGetHit() {}
|
||||
|
@ -2,6 +2,7 @@ package rpccache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/OpenIMSDK/tools/log"
|
||||
"github.com/openimsdk/localcache"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/cachekey"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
@ -10,13 +11,16 @@ import (
|
||||
)
|
||||
|
||||
func NewConversationLocalCache(client rpcclient.ConversationRpcClient, cli redis.UniversalClient) *ConversationLocalCache {
|
||||
return &ConversationLocalCache{
|
||||
lc := config.Config.LocalCache.Conversation
|
||||
x := &ConversationLocalCache{
|
||||
client: client,
|
||||
local: localcache.New[any](
|
||||
localcache.WithLocalSlotNum(config.Config.LocalCache.Conversation.SlotNum),
|
||||
localcache.WithLocalSlotSize(config.Config.LocalCache.Conversation.SlotSize),
|
||||
localcache.WithLocalSlotNum(lc.SlotNum),
|
||||
localcache.WithLocalSlotSize(lc.SlotSize),
|
||||
),
|
||||
}
|
||||
go subscriberRedisDeleteCache(context.Background(), cli, lc.Topic, x.local.DelLocal)
|
||||
return x
|
||||
}
|
||||
|
||||
type ConversationLocalCache struct {
|
||||
@ -24,8 +28,17 @@ type ConversationLocalCache struct {
|
||||
local localcache.Cache[any]
|
||||
}
|
||||
|
||||
func (c *ConversationLocalCache) GetConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) {
|
||||
func (c *ConversationLocalCache) GetConversationIDs(ctx context.Context, ownerUserID string) (val []string, err error) {
|
||||
log.ZDebug(ctx, "ConversationLocalCache GetConversationIDs req", "ownerUserID", ownerUserID)
|
||||
defer func() {
|
||||
if err == nil {
|
||||
log.ZDebug(ctx, "ConversationLocalCache GetConversationIDs return", "value", val)
|
||||
} else {
|
||||
log.ZError(ctx, "ConversationLocalCache GetConversationIDs return", err)
|
||||
}
|
||||
}()
|
||||
return localcache.AnyValue[[]string](c.local.Get(ctx, cachekey.GetConversationIDsKey(ownerUserID), func(ctx context.Context) (any, error) {
|
||||
log.ZDebug(ctx, "ConversationLocalCache GetConversationIDs rpc", "ownerUserID", ownerUserID)
|
||||
return c.client.GetConversationIDs(ctx, ownerUserID)
|
||||
}))
|
||||
}
|
||||
|
@ -11,13 +11,16 @@ import (
|
||||
)
|
||||
|
||||
func NewFriendLocalCache(client rpcclient.FriendRpcClient, cli redis.UniversalClient) *FriendLocalCache {
|
||||
return &FriendLocalCache{
|
||||
lc := config.Config.LocalCache.Friend
|
||||
x := &FriendLocalCache{
|
||||
client: client,
|
||||
local: localcache.New[any](
|
||||
localcache.WithLocalSlotNum(config.Config.LocalCache.Friend.SlotNum),
|
||||
localcache.WithLocalSlotSize(config.Config.LocalCache.Friend.SlotSize),
|
||||
localcache.WithLocalSlotNum(lc.SlotNum),
|
||||
localcache.WithLocalSlotSize(lc.SlotSize),
|
||||
),
|
||||
}
|
||||
go subscriberRedisDeleteCache(context.Background(), cli, lc.Topic, x.local.DelLocal)
|
||||
return x
|
||||
}
|
||||
|
||||
type FriendLocalCache struct {
|
||||
|
@ -2,6 +2,7 @@ package rpccache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/OpenIMSDK/tools/log"
|
||||
"github.com/openimsdk/localcache"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/cachekey"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
@ -10,13 +11,16 @@ import (
|
||||
)
|
||||
|
||||
func NewGroupLocalCache(client rpcclient.GroupRpcClient, cli redis.UniversalClient) *GroupLocalCache {
|
||||
return &GroupLocalCache{
|
||||
lc := config.Config.LocalCache.Group
|
||||
x := &GroupLocalCache{
|
||||
client: client,
|
||||
local: localcache.New[any](
|
||||
localcache.WithLocalSlotNum(config.Config.LocalCache.Group.SlotNum),
|
||||
localcache.WithLocalSlotSize(config.Config.LocalCache.Group.SlotSize),
|
||||
localcache.WithLocalSlotNum(lc.SlotNum),
|
||||
localcache.WithLocalSlotSize(lc.SlotSize),
|
||||
),
|
||||
}
|
||||
go subscriberRedisDeleteCache(context.Background(), cli, lc.Topic, x.local.DelLocal)
|
||||
return x
|
||||
}
|
||||
|
||||
type GroupLocalCache struct {
|
||||
@ -24,8 +28,52 @@ type GroupLocalCache struct {
|
||||
local localcache.Cache[any]
|
||||
}
|
||||
|
||||
func (g *GroupLocalCache) GetGroupMemberIDs(ctx context.Context, groupID string) ([]string, error) {
|
||||
return localcache.AnyValue[[]string](g.local.Get(ctx, cachekey.GetGroupMemberIDsKey(groupID), func(ctx context.Context) (any, error) {
|
||||
return g.client.GetGroupMemberIDs(ctx, groupID)
|
||||
type listMap[V comparable] struct {
|
||||
List []V
|
||||
Map map[V]struct{}
|
||||
}
|
||||
|
||||
func newListMap[V comparable](values []V, err error) (*listMap[V], error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
lm := &listMap[V]{
|
||||
List: values,
|
||||
Map: make(map[V]struct{}, len(values)),
|
||||
}
|
||||
for _, value := range values {
|
||||
lm.Map[value] = struct{}{}
|
||||
}
|
||||
return lm, nil
|
||||
}
|
||||
|
||||
func (g *GroupLocalCache) getGroupMemberIDs(ctx context.Context, groupID string) (val *listMap[string], err error) {
|
||||
log.ZDebug(ctx, "GroupLocalCache getGroupMemberIDs req", "groupID", groupID)
|
||||
defer func() {
|
||||
if err == nil {
|
||||
log.ZDebug(ctx, "GroupLocalCache getGroupMemberIDs return", "value", val.List)
|
||||
} else {
|
||||
log.ZError(ctx, "GroupLocalCache getGroupMemberIDs return", err)
|
||||
}
|
||||
}()
|
||||
return localcache.AnyValue[*listMap[string]](g.local.Get(ctx, cachekey.GetGroupMemberIDsKey(groupID), func(ctx context.Context) (any, error) {
|
||||
log.ZDebug(ctx, "GroupLocalCache getGroupMemberIDs rpc", "groupID", groupID)
|
||||
return newListMap(g.client.GetGroupMemberIDs(ctx, groupID))
|
||||
}))
|
||||
}
|
||||
|
||||
func (g *GroupLocalCache) GetGroupMemberIDs(ctx context.Context, groupID string) ([]string, error) {
|
||||
res, err := g.getGroupMemberIDs(ctx, groupID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return res.List, nil
|
||||
}
|
||||
|
||||
func (g *GroupLocalCache) GetGroupMemberIDMap(ctx context.Context, groupID string) (map[string]struct{}, error) {
|
||||
res, err := g.getGroupMemberIDs(ctx, groupID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return res.Map, nil
|
||||
}
|
||||
|
23
pkg/rpccache/subscriber.go
Normal file
23
pkg/rpccache/subscriber.go
Normal file
@ -0,0 +1,23 @@
|
||||
package rpccache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"github.com/OpenIMSDK/tools/log"
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
func subscriberRedisDeleteCache(ctx context.Context, client redis.UniversalClient, channel string, del func(ctx context.Context, key ...string)) {
|
||||
for message := range client.Subscribe(ctx, channel).Channel() {
|
||||
log.ZDebug(ctx, "subscriberRedisDeleteCache", "channel", channel, "payload", message.Payload)
|
||||
var keys []string
|
||||
if err := json.Unmarshal([]byte(message.Payload), &keys); err != nil {
|
||||
log.ZError(ctx, "subscriberRedisDeleteCache json.Unmarshal error", err)
|
||||
continue
|
||||
}
|
||||
if len(keys) == 0 {
|
||||
continue
|
||||
}
|
||||
del(ctx, keys...)
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user