diff --git a/internal/msggateway/callback.go b/internal/msggateway/callback.go index 7d5381754..d9507c85e 100644 --- a/internal/msggateway/callback.go +++ b/internal/msggateway/callback.go @@ -105,7 +105,7 @@ func CallbackUserKickOff(ctx context.Context, userID string, platformID int) err // func callbackUserOnline(operationID, userID string, platformID int, token string, isAppBackground bool, connID // string) cbApi.CommonCallbackResp { // callbackResp := cbApi.CommonCallbackResp{OperationID: operationID} -// if !config.Config.Callback.CallbackUserOnline.Enable { +// if !config.Config.Callback.CallbackUserOnline.WithEnable { // return callbackResp // } // callbackUserOnlineReq := cbApi.CallbackUserOnlineReq{ @@ -134,7 +134,7 @@ func CallbackUserKickOff(ctx context.Context, userID string, platformID int) err //} //func callbackUserOffline(operationID, userID string, platformID int, connID string) cbApi.CommonCallbackResp { // callbackResp := cbApi.CommonCallbackResp{OperationID: operationID} -// if !config.Config.Callback.CallbackUserOffline.Enable { +// if !config.Config.Callback.CallbackUserOffline.WithEnable { // return callbackResp // } // callbackOfflineReq := cbApi.CallbackUserOfflineReq{ @@ -161,7 +161,7 @@ func CallbackUserKickOff(ctx context.Context, userID string, platformID int) err //} //func callbackUserKickOff(operationID string, userID string, platformID int) cbApi.CommonCallbackResp { // callbackResp := cbApi.CommonCallbackResp{OperationID: operationID} -// if !config.Config.Callback.CallbackUserKickOff.Enable { +// if !config.Config.Callback.CallbackUserKickOff.WithEnable { // return callbackResp // } // callbackUserKickOffReq := cbApi.CallbackUserKickOffReq{ diff --git a/pkg/common/localcache/cache.go b/pkg/common/localcache/cache.go index 9b8613ecf..64df1fe02 100644 --- a/pkg/common/localcache/cache.go +++ b/pkg/common/localcache/cache.go @@ -2,11 +2,13 @@ package localcache import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/localcache/link" "github.com/openimsdk/open-im-server/v3/pkg/common/localcache/local" + opt "github.com/openimsdk/open-im-server/v3/pkg/common/localcache/option" ) type Cache[V any] interface { - Get(ctx context.Context, key string, fetch func(ctx context.Context) (V, error)) (V, error) + Get(ctx context.Context, key string, fetch func(ctx context.Context) (V, error), opts ...*opt.Option) (V, error) Del(ctx context.Context, key ...string) } @@ -15,7 +17,7 @@ func New[V any](opts ...Option) Cache[V] { for _, o := range opts { o(opt) } - c := &cache[V]{opt: opt} + c := &cache[V]{opt: opt, link: link.New(opt.localSlotNum)} c.local = local.NewCache[V](opt.localSlotNum, opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict) go func() { c.opt.delCh(c.del) @@ -25,11 +27,16 @@ func New[V any](opts ...Option) Cache[V] { type cache[V any] struct { opt *option + link link.Link local local.Cache[V] } func (c *cache[V]) onEvict(key string, value V) { - + for k := range c.link.Del(key) { + if key != k { + c.local.Del(k) + } + } } func (c *cache[V]) del(key ...string) { @@ -38,8 +45,15 @@ func (c *cache[V]) del(key ...string) { } } -func (c *cache[V]) Get(ctx context.Context, key string, fetch func(ctx context.Context) (V, error)) (V, error) { - if c.opt.enable { +func (c *cache[V]) Get(ctx context.Context, key string, fetch func(ctx context.Context) (V, error), opts ...*opt.Option) (V, error) { + enable := c.opt.enable + if len(opts) > 0 && opts[0].Enable != nil { + enable = *opts[0].Enable + } + if enable { + if len(opts) > 0 && len(opts[0].Link) > 0 { + c.link.Link(key, opts[0].Link...) + } return c.local.Get(key, func() (V, error) { return fetch(ctx) }) diff --git a/pkg/common/localcache/link/link.go b/pkg/common/localcache/link/link.go new file mode 100644 index 000000000..4f238907b --- /dev/null +++ b/pkg/common/localcache/link/link.go @@ -0,0 +1,109 @@ +package link + +import ( + "hash/fnv" + "sync" + "unsafe" +) + +type Link interface { + Link(key string, link ...string) + Del(key string) map[string]struct{} +} + +func newLinkKey() *linkKey { + return &linkKey{ + data: make(map[string]map[string]struct{}), + } +} + +type linkKey struct { + lock sync.Mutex + data map[string]map[string]struct{} +} + +func (x *linkKey) link(key string, link ...string) { + x.lock.Lock() + defer x.lock.Unlock() + v, ok := x.data[key] + if !ok { + v = make(map[string]struct{}) + x.data[key] = v + } + for _, k := range link { + v[k] = struct{}{} + } +} + +func (x *linkKey) del(key string) map[string]struct{} { + x.lock.Lock() + defer x.lock.Unlock() + ks, ok := x.data[key] + if !ok { + return nil + } + delete(x.data, key) + return ks +} + +func New(n int) Link { + if n <= 0 { + panic("must be greater than 0") + } + slots := make([]*linkKey, n) + for i := 0; i < len(slots); i++ { + slots[i] = newLinkKey() + } + return &slot{ + n: uint64(n), + slots: slots, + } +} + +type slot struct { + n uint64 + slots []*linkKey +} + +func (x *slot) index(s string) uint64 { + h := fnv.New64a() + _, _ = h.Write(*(*[]byte)(unsafe.Pointer(&s))) + return h.Sum64() % x.n +} + +func (x *slot) Link(key string, link ...string) { + if len(link) == 0 { + return + } + mk := key + lks := make([]string, len(link)) + for i, k := range link { + lks[i] = k + } + x.slots[x.index(mk)].link(mk, lks...) + for _, lk := range lks { + x.slots[x.index(lk)].link(lk, mk) + } +} + +func (x *slot) Del(key string) map[string]struct{} { + return x.delKey(key) +} + +func (x *slot) delKey(k string) map[string]struct{} { + del := make(map[string]struct{}) + stack := []string{k} + for len(stack) > 0 { + curr := stack[len(stack)-1] + stack = stack[:len(stack)-1] + if _, ok := del[curr]; ok { + continue + } + del[curr] = struct{}{} + childKeys := x.slots[x.index(curr)].del(curr) + for ck := range childKeys { + stack = append(stack, ck) + } + } + return del +} diff --git a/pkg/common/localcache/link/link_test.go b/pkg/common/localcache/link/link_test.go new file mode 100644 index 000000000..994c26ec7 --- /dev/null +++ b/pkg/common/localcache/link/link_test.go @@ -0,0 +1,24 @@ +package link + +import ( + "testing" +) + +func TestName(t *testing.T) { + + v := New(1) + + //v.Link("a:1", "b:1", "c:1", "d:1") + v.Link("a:1", "b:1", "c:1") + v.Link("z:1", "b:1") + + //v.DelKey("a:1") + v.Del("z:1") + + t.Log(v.slots[0].data) + + for k, v := range v.slots[0].data { + t.Log(k, v) + } + +} diff --git a/pkg/common/localcache/option/option.go b/pkg/common/localcache/option/option.go index adaaa3a45..798c93ba5 100644 --- a/pkg/common/localcache/option/option.go +++ b/pkg/common/localcache/option/option.go @@ -1,31 +1,32 @@ package option -var ( - t = true - f = false -) +func NewOption() *Option { + return &Option{} +} type Option struct { - enable *bool - key []string + Enable *bool + Link []string } -func (o *Option) Enable() *Option { - o.enable = &t +func (o *Option) WithEnable() *Option { + t := true + o.Enable = &t return o } -func (o *Option) Disable() *Option { - o.enable = &f +func (o *Option) WithDisable() *Option { + f := false + o.Enable = &f return o } -func (o *Option) DelKey(key ...string) *Option { +func (o *Option) WithLink(key ...string) *Option { if len(key) > 0 { - if o.key == nil { - o.key = key + if len(o.Link) == 0 { + o.Link = key } else { - o.key = append(o.key, key...) + o.Link = append(o.Link, key...) } } return o diff --git a/pkg/rpccache/friend.go b/pkg/rpccache/friend.go index 0585b2e9d..68c6a736a 100644 --- a/pkg/rpccache/friend.go +++ b/pkg/rpccache/friend.go @@ -4,6 +4,7 @@ import ( "context" "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/localcache" + "github.com/openimsdk/open-im-server/v3/pkg/common/localcache/option" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" ) @@ -28,11 +29,11 @@ func (f *FriendLocalCache) GetFriendIDs(ctx context.Context, ownerUserID string) func (f *FriendLocalCache) IsFriend(ctx context.Context, possibleFriendUserID, userID string) (bool, error) { return localcache.AnyValue[bool](f.local.Get(ctx, cachekey.GetIsFriendKey(possibleFriendUserID, userID), func(ctx context.Context) (any, error) { return f.client.IsFriend(ctx, possibleFriendUserID, userID) - })) + }, option.NewOption().WithLink(cachekey.GetFriendIDsKey(possibleFriendUserID), cachekey.GetFriendIDsKey(userID)))) } func (f *FriendLocalCache) IsBlocked(ctx context.Context, possibleBlackUserID, userID string) (bool, error) { return localcache.AnyValue[bool](f.local.Get(ctx, cachekey.GetIsBlackIDsKey(possibleBlackUserID, userID), func(ctx context.Context) (any, error) { - return f.client.IsFriend(ctx, possibleBlackUserID, userID) - })) + return f.client.IsBlocked(ctx, possibleBlackUserID, userID) + }, option.NewOption().WithLink(cachekey.GetBlackIDsKey(possibleBlackUserID), cachekey.GetBlackIDsKey(userID)))) }