feat: msg local cache

This commit is contained in:
withchao 2024-01-08 20:36:41 +08:00
parent 3d77c1c8cf
commit 938622b1fe
6 changed files with 174 additions and 25 deletions

View File

@ -105,7 +105,7 @@ func CallbackUserKickOff(ctx context.Context, userID string, platformID int) err
// func callbackUserOnline(operationID, userID string, platformID int, token string, isAppBackground bool, connID // func callbackUserOnline(operationID, userID string, platformID int, token string, isAppBackground bool, connID
// string) cbApi.CommonCallbackResp { // string) cbApi.CommonCallbackResp {
// callbackResp := cbApi.CommonCallbackResp{OperationID: operationID} // callbackResp := cbApi.CommonCallbackResp{OperationID: operationID}
// if !config.Config.Callback.CallbackUserOnline.Enable { // if !config.Config.Callback.CallbackUserOnline.WithEnable {
// return callbackResp // return callbackResp
// } // }
// callbackUserOnlineReq := cbApi.CallbackUserOnlineReq{ // callbackUserOnlineReq := cbApi.CallbackUserOnlineReq{
@ -134,7 +134,7 @@ func CallbackUserKickOff(ctx context.Context, userID string, platformID int) err
//} //}
//func callbackUserOffline(operationID, userID string, platformID int, connID string) cbApi.CommonCallbackResp { //func callbackUserOffline(operationID, userID string, platformID int, connID string) cbApi.CommonCallbackResp {
// callbackResp := cbApi.CommonCallbackResp{OperationID: operationID} // callbackResp := cbApi.CommonCallbackResp{OperationID: operationID}
// if !config.Config.Callback.CallbackUserOffline.Enable { // if !config.Config.Callback.CallbackUserOffline.WithEnable {
// return callbackResp // return callbackResp
// } // }
// callbackOfflineReq := cbApi.CallbackUserOfflineReq{ // callbackOfflineReq := cbApi.CallbackUserOfflineReq{
@ -161,7 +161,7 @@ func CallbackUserKickOff(ctx context.Context, userID string, platformID int) err
//} //}
//func callbackUserKickOff(operationID string, userID string, platformID int) cbApi.CommonCallbackResp { //func callbackUserKickOff(operationID string, userID string, platformID int) cbApi.CommonCallbackResp {
// callbackResp := cbApi.CommonCallbackResp{OperationID: operationID} // callbackResp := cbApi.CommonCallbackResp{OperationID: operationID}
// if !config.Config.Callback.CallbackUserKickOff.Enable { // if !config.Config.Callback.CallbackUserKickOff.WithEnable {
// return callbackResp // return callbackResp
// } // }
// callbackUserKickOffReq := cbApi.CallbackUserKickOffReq{ // callbackUserKickOffReq := cbApi.CallbackUserKickOffReq{

View File

@ -2,11 +2,13 @@ package localcache
import ( import (
"context" "context"
"github.com/openimsdk/open-im-server/v3/pkg/common/localcache/link"
"github.com/openimsdk/open-im-server/v3/pkg/common/localcache/local" "github.com/openimsdk/open-im-server/v3/pkg/common/localcache/local"
opt "github.com/openimsdk/open-im-server/v3/pkg/common/localcache/option"
) )
type Cache[V any] interface { type Cache[V any] interface {
Get(ctx context.Context, key string, fetch func(ctx context.Context) (V, error)) (V, error) Get(ctx context.Context, key string, fetch func(ctx context.Context) (V, error), opts ...*opt.Option) (V, error)
Del(ctx context.Context, key ...string) Del(ctx context.Context, key ...string)
} }
@ -15,7 +17,7 @@ func New[V any](opts ...Option) Cache[V] {
for _, o := range opts { for _, o := range opts {
o(opt) o(opt)
} }
c := &cache[V]{opt: opt} c := &cache[V]{opt: opt, link: link.New(opt.localSlotNum)}
c.local = local.NewCache[V](opt.localSlotNum, opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict) c.local = local.NewCache[V](opt.localSlotNum, opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict)
go func() { go func() {
c.opt.delCh(c.del) c.opt.delCh(c.del)
@ -25,11 +27,16 @@ func New[V any](opts ...Option) Cache[V] {
type cache[V any] struct { type cache[V any] struct {
opt *option opt *option
link link.Link
local local.Cache[V] local local.Cache[V]
} }
func (c *cache[V]) onEvict(key string, value V) { func (c *cache[V]) onEvict(key string, value V) {
for k := range c.link.Del(key) {
if key != k {
c.local.Del(k)
}
}
} }
func (c *cache[V]) del(key ...string) { func (c *cache[V]) del(key ...string) {
@ -38,8 +45,15 @@ func (c *cache[V]) del(key ...string) {
} }
} }
func (c *cache[V]) Get(ctx context.Context, key string, fetch func(ctx context.Context) (V, error)) (V, error) { func (c *cache[V]) Get(ctx context.Context, key string, fetch func(ctx context.Context) (V, error), opts ...*opt.Option) (V, error) {
if c.opt.enable { enable := c.opt.enable
if len(opts) > 0 && opts[0].Enable != nil {
enable = *opts[0].Enable
}
if enable {
if len(opts) > 0 && len(opts[0].Link) > 0 {
c.link.Link(key, opts[0].Link...)
}
return c.local.Get(key, func() (V, error) { return c.local.Get(key, func() (V, error) {
return fetch(ctx) return fetch(ctx)
}) })

View File

@ -0,0 +1,109 @@
package link
import (
"hash/fnv"
"sync"
"unsafe"
)
type Link interface {
Link(key string, link ...string)
Del(key string) map[string]struct{}
}
func newLinkKey() *linkKey {
return &linkKey{
data: make(map[string]map[string]struct{}),
}
}
type linkKey struct {
lock sync.Mutex
data map[string]map[string]struct{}
}
func (x *linkKey) link(key string, link ...string) {
x.lock.Lock()
defer x.lock.Unlock()
v, ok := x.data[key]
if !ok {
v = make(map[string]struct{})
x.data[key] = v
}
for _, k := range link {
v[k] = struct{}{}
}
}
func (x *linkKey) del(key string) map[string]struct{} {
x.lock.Lock()
defer x.lock.Unlock()
ks, ok := x.data[key]
if !ok {
return nil
}
delete(x.data, key)
return ks
}
func New(n int) Link {
if n <= 0 {
panic("must be greater than 0")
}
slots := make([]*linkKey, n)
for i := 0; i < len(slots); i++ {
slots[i] = newLinkKey()
}
return &slot{
n: uint64(n),
slots: slots,
}
}
type slot struct {
n uint64
slots []*linkKey
}
func (x *slot) index(s string) uint64 {
h := fnv.New64a()
_, _ = h.Write(*(*[]byte)(unsafe.Pointer(&s)))
return h.Sum64() % x.n
}
func (x *slot) Link(key string, link ...string) {
if len(link) == 0 {
return
}
mk := key
lks := make([]string, len(link))
for i, k := range link {
lks[i] = k
}
x.slots[x.index(mk)].link(mk, lks...)
for _, lk := range lks {
x.slots[x.index(lk)].link(lk, mk)
}
}
func (x *slot) Del(key string) map[string]struct{} {
return x.delKey(key)
}
func (x *slot) delKey(k string) map[string]struct{} {
del := make(map[string]struct{})
stack := []string{k}
for len(stack) > 0 {
curr := stack[len(stack)-1]
stack = stack[:len(stack)-1]
if _, ok := del[curr]; ok {
continue
}
del[curr] = struct{}{}
childKeys := x.slots[x.index(curr)].del(curr)
for ck := range childKeys {
stack = append(stack, ck)
}
}
return del
}

View File

@ -0,0 +1,24 @@
package link
import (
"testing"
)
func TestName(t *testing.T) {
v := New(1)
//v.Link("a:1", "b:1", "c:1", "d:1")
v.Link("a:1", "b:1", "c:1")
v.Link("z:1", "b:1")
//v.DelKey("a:1")
v.Del("z:1")
t.Log(v.slots[0].data)
for k, v := range v.slots[0].data {
t.Log(k, v)
}
}

View File

@ -1,31 +1,32 @@
package option package option
var ( func NewOption() *Option {
t = true return &Option{}
f = false }
)
type Option struct { type Option struct {
enable *bool Enable *bool
key []string Link []string
} }
func (o *Option) Enable() *Option { func (o *Option) WithEnable() *Option {
o.enable = &t t := true
o.Enable = &t
return o return o
} }
func (o *Option) Disable() *Option { func (o *Option) WithDisable() *Option {
o.enable = &f f := false
o.Enable = &f
return o return o
} }
func (o *Option) DelKey(key ...string) *Option { func (o *Option) WithLink(key ...string) *Option {
if len(key) > 0 { if len(key) > 0 {
if o.key == nil { if len(o.Link) == 0 {
o.key = key o.Link = key
} else { } else {
o.key = append(o.key, key...) o.Link = append(o.Link, key...)
} }
} }
return o return o

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/localcache" "github.com/openimsdk/open-im-server/v3/pkg/common/localcache"
"github.com/openimsdk/open-im-server/v3/pkg/common/localcache/option"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
) )
@ -28,11 +29,11 @@ func (f *FriendLocalCache) GetFriendIDs(ctx context.Context, ownerUserID string)
func (f *FriendLocalCache) IsFriend(ctx context.Context, possibleFriendUserID, userID string) (bool, error) { func (f *FriendLocalCache) IsFriend(ctx context.Context, possibleFriendUserID, userID string) (bool, error) {
return localcache.AnyValue[bool](f.local.Get(ctx, cachekey.GetIsFriendKey(possibleFriendUserID, userID), func(ctx context.Context) (any, error) { return localcache.AnyValue[bool](f.local.Get(ctx, cachekey.GetIsFriendKey(possibleFriendUserID, userID), func(ctx context.Context) (any, error) {
return f.client.IsFriend(ctx, possibleFriendUserID, userID) return f.client.IsFriend(ctx, possibleFriendUserID, userID)
})) }, option.NewOption().WithLink(cachekey.GetFriendIDsKey(possibleFriendUserID), cachekey.GetFriendIDsKey(userID))))
} }
func (f *FriendLocalCache) IsBlocked(ctx context.Context, possibleBlackUserID, userID string) (bool, error) { func (f *FriendLocalCache) IsBlocked(ctx context.Context, possibleBlackUserID, userID string) (bool, error) {
return localcache.AnyValue[bool](f.local.Get(ctx, cachekey.GetIsBlackIDsKey(possibleBlackUserID, userID), func(ctx context.Context) (any, error) { return localcache.AnyValue[bool](f.local.Get(ctx, cachekey.GetIsBlackIDsKey(possibleBlackUserID, userID), func(ctx context.Context) (any, error) {
return f.client.IsFriend(ctx, possibleBlackUserID, userID) return f.client.IsBlocked(ctx, possibleBlackUserID, userID)
})) }, option.NewOption().WithLink(cachekey.GetBlackIDsKey(possibleBlackUserID), cachekey.GetBlackIDsKey(userID))))
} }