From abbb701192b79a67104e73138ffdd62e3e9a4384 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Mon, 15 Jan 2024 15:23:42 +0800 Subject: [PATCH] feat: local cache --- pkg/localcache/cache.go | 70 +++++++++++++------ pkg/localcache/local/cache.go | 50 ------------- pkg/localcache/local/callback.go | 1 - pkg/localcache/{local => lru}/lru.go | 2 +- .../lru_expirable.go => lru/lru_actively.go} | 31 ++++---- pkg/localcache/{local => lru}/lru_inertia.go | 7 +- .../{local => lru}/lru_inertia_test.go | 15 +++- pkg/localcache/lru/lru_slot.go | 37 ++++++++++ pkg/localcache/option.go | 20 +++++- pkg/localcache/option/option.go | 20 ------ pkg/rpccache/conversation.go | 7 +- pkg/rpccache/friend.go | 31 +++----- pkg/rpccache/group.go | 7 +- 13 files changed, 158 insertions(+), 140 deletions(-) delete mode 100644 pkg/localcache/local/cache.go delete mode 100644 pkg/localcache/local/callback.go rename pkg/localcache/{local => lru}/lru.go (96%) rename pkg/localcache/{local/lru_expirable.go => lru/lru_actively.go} (53%) rename pkg/localcache/{local => lru}/lru_inertia.go (95%) rename pkg/localcache/{local => lru}/lru_inertia_test.go (83%) create mode 100644 pkg/localcache/lru/lru_slot.go delete mode 100644 pkg/localcache/option/option.go diff --git a/pkg/localcache/cache.go b/pkg/localcache/cache.go index 2d2e8018e..ed0e16419 100644 --- a/pkg/localcache/cache.go +++ b/pkg/localcache/cache.go @@ -3,13 +3,17 @@ package localcache import ( "context" "github.com/openimsdk/localcache/link" - "github.com/openimsdk/localcache/local" - lopt "github.com/openimsdk/localcache/option" + "github.com/openimsdk/localcache/lru" + "hash/fnv" + "unsafe" ) type Cache[V any] interface { - Get(ctx context.Context, key string, fetch func(ctx context.Context) (V, error), opts ...*lopt.Option) (V, error) + Get(ctx context.Context, key string, fetch func(ctx context.Context) (V, error)) (V, error) + GetLink(ctx context.Context, key string, fetch func(ctx context.Context) (V, error), link ...string) (V, error) Del(ctx context.Context, key ...string) + DelLocal(ctx context.Context, key ...string) + Stop() } func New[V any](opts ...Option) Cache[V] { @@ -19,10 +23,22 @@ func New[V any](opts ...Option) Cache[V] { } c := cache[V]{opt: opt} if opt.localSlotNum > 0 && opt.localSlotSize > 0 { - c.local = local.NewCache[V](opt.localSlotNum, opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict) - go func() { - c.opt.delCh(c.del) - }() + createSimpleLRU := func() lru.LRU[string, V] { + if opt.actively { + return lru.NewActivelyLRU[string, V](opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict) + } else { + return lru.NewInertiaLRU[string, V](opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict) + } + } + if opt.localSlotNum == 1 { + c.local = createSimpleLRU() + } else { + c.local = lru.NewSlotLRU[string, V](opt.localSlotNum, func(key string) uint64 { + h := fnv.New64a() + h.Write(*(*[]byte)(unsafe.Pointer(&key))) + return h.Sum64() + }, createSimpleLRU) + } if opt.linkSlotNum > 0 { c.link = link.New(opt.linkSlotNum) } @@ -33,7 +49,7 @@ func New[V any](opts ...Option) Cache[V] { type cache[V any] struct { opt *option link link.Link - local local.Cache[V] + local lru.LRU[string, V] } func (c *cache[V]) onEvict(key string, value V) { @@ -48,22 +64,29 @@ func (c *cache[V]) onEvict(key string, value V) { } func (c *cache[V]) del(key ...string) { + if c.local == nil { + return + } for _, k := range key { - lks := c.link.Del(k) c.local.Del(k) - for k := range lks { - c.local.Del(k) + if c.link != nil { + lks := c.link.Del(k) + for k := range lks { + c.local.Del(k) + } } } } -func (c *cache[V]) Get(ctx context.Context, key string, fetch func(ctx context.Context) (V, error), opts ...*lopt.Option) (V, error) { +func (c *cache[V]) Get(ctx context.Context, key string, fetch func(ctx context.Context) (V, error)) (V, error) { + return c.GetLink(ctx, key, fetch) +} + +func (c *cache[V]) GetLink(ctx context.Context, key string, fetch func(ctx context.Context) (V, error), link ...string) (V, error) { if c.local != nil { return c.local.Get(key, func() (V, error) { - if c.link != nil { - for _, o := range opts { - c.link.Link(key, o.Link...) - } + if len(link) > 0 { + c.link.Link(key, link...) } return fetch(ctx) }) @@ -73,13 +96,16 @@ func (c *cache[V]) Get(ctx context.Context, key string, fetch func(ctx context.C } func (c *cache[V]) Del(ctx context.Context, key ...string) { - if len(key) == 0 { - return - } for _, fn := range c.opt.delFn { fn(ctx, key...) } - if c.local != nil { - c.del(key...) - } + c.del(key...) +} + +func (c *cache[V]) DelLocal(ctx context.Context, key ...string) { + c.del(key...) +} + +func (c *cache[V]) Stop() { + c.local.Stop() } diff --git a/pkg/localcache/local/cache.go b/pkg/localcache/local/cache.go deleted file mode 100644 index e21d72425..000000000 --- a/pkg/localcache/local/cache.go +++ /dev/null @@ -1,50 +0,0 @@ -package local - -import ( - "hash/fnv" - "time" - "unsafe" -) - -type Cache[V any] interface { - Get(key string, fetch func() (V, error)) (V, error) - Del(key string) bool -} - -func NewCache[V any](slotNum, slotSize int, successTTL, failedTTL time.Duration, target Target, onEvict EvictCallback[string, V]) Cache[V] { - c := &slot[V]{ - n: uint64(slotNum), - slots: make([]*InertiaLRU[string, V], slotNum), - target: target, - } - for i := 0; i < slotNum; i++ { - c.slots[i] = NewInertiaLRU[string, V](slotSize, successTTL, failedTTL, c.target, onEvict) - } - return c -} - -type slot[V any] struct { - n uint64 - slots []*InertiaLRU[string, V] - target Target -} - -func (c *slot[V]) index(s string) uint64 { - h := fnv.New64a() - _, _ = h.Write(*(*[]byte)(unsafe.Pointer(&s))) - return h.Sum64() % c.n -} - -func (c *slot[V]) Get(key string, fetch func() (V, error)) (V, error) { - return c.slots[c.index(key)].Get(key, fetch) -} - -func (c *slot[V]) Del(key string) bool { - if c.slots[c.index(key)].Del(key) { - c.target.IncrDelHit() - return true - } else { - c.target.IncrDelNotFound() - return false - } -} diff --git a/pkg/localcache/local/callback.go b/pkg/localcache/local/callback.go deleted file mode 100644 index 469c3dc0d..000000000 --- a/pkg/localcache/local/callback.go +++ /dev/null @@ -1 +0,0 @@ -package local diff --git a/pkg/localcache/local/lru.go b/pkg/localcache/lru/lru.go similarity index 96% rename from pkg/localcache/local/lru.go rename to pkg/localcache/lru/lru.go index 9cb984b81..2995ec028 100644 --- a/pkg/localcache/local/lru.go +++ b/pkg/localcache/lru/lru.go @@ -1,4 +1,4 @@ -package local +package lru import "github.com/hashicorp/golang-lru/v2/simplelru" diff --git a/pkg/localcache/local/lru_expirable.go b/pkg/localcache/lru/lru_actively.go similarity index 53% rename from pkg/localcache/local/lru_expirable.go rename to pkg/localcache/lru/lru_actively.go index 72acb4ffa..55fa4d0d1 100644 --- a/pkg/localcache/local/lru_expirable.go +++ b/pkg/localcache/lru/lru_actively.go @@ -1,4 +1,4 @@ -package local +package lru import ( "github.com/hashicorp/golang-lru/v2/expirable" @@ -6,15 +6,15 @@ import ( "time" ) -func NewExpirableLRU[K comparable, V any](size int, successTTL, failedTTL time.Duration, target Target, onEvict EvictCallback[K, V]) LRU[K, V] { - var cb expirable.EvictCallback[K, *expirableLruItem[V]] +func NewActivelyLRU[K comparable, V any](size int, successTTL, failedTTL time.Duration, target Target, onEvict EvictCallback[K, V]) LRU[K, V] { + var cb expirable.EvictCallback[K, *activelyLruItem[V]] if onEvict != nil { - cb = func(key K, value *expirableLruItem[V]) { + cb = func(key K, value *activelyLruItem[V]) { onEvict(key, value.value) } } - core := expirable.NewLRU[K, *expirableLruItem[V]](size, cb, successTTL) - return &expirableLRU[K, V]{ + core := expirable.NewLRU[K, *activelyLruItem[V]](size, cb, successTTL) + return &activelyLRU[K, V]{ core: core, successTTL: successTTL, failedTTL: failedTTL, @@ -22,21 +22,21 @@ func NewExpirableLRU[K comparable, V any](size int, successTTL, failedTTL time.D } } -type expirableLruItem[V any] struct { +type activelyLruItem[V any] struct { lock sync.RWMutex err error value V } -type expirableLRU[K comparable, V any] struct { +type activelyLRU[K comparable, V any] struct { lock sync.Mutex - core *expirable.LRU[K, *expirableLruItem[V]] + core *expirable.LRU[K, *activelyLruItem[V]] successTTL time.Duration failedTTL time.Duration target Target } -func (x *expirableLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) { +func (x *activelyLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) { x.lock.Lock() v, ok := x.core.Get(key) if ok { @@ -46,7 +46,7 @@ func (x *expirableLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) { defer v.lock.RUnlock() return v.value, v.err } else { - v = &expirableLruItem[V]{} + v = &activelyLruItem[V]{} x.core.Add(key, v) v.lock.Lock() x.lock.Unlock() @@ -62,12 +62,17 @@ func (x *expirableLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) { } } -func (x *expirableLRU[K, V]) Del(key K) bool { +func (x *activelyLRU[K, V]) Del(key K) bool { x.lock.Lock() ok := x.core.Remove(key) x.lock.Unlock() + if ok { + x.target.IncrDelHit() + } else { + x.target.IncrDelNotFound() + } return ok } -func (x *expirableLRU[K, V]) Stop() { +func (x *activelyLRU[K, V]) Stop() { } diff --git a/pkg/localcache/local/lru_inertia.go b/pkg/localcache/lru/lru_inertia.go similarity index 95% rename from pkg/localcache/local/lru_inertia.go rename to pkg/localcache/lru/lru_inertia.go index cd161e16e..b1f9f24af 100644 --- a/pkg/localcache/local/lru_inertia.go +++ b/pkg/localcache/lru/lru_inertia.go @@ -1,4 +1,4 @@ -package local +package lru import ( "github.com/hashicorp/golang-lru/v2/simplelru" @@ -77,6 +77,11 @@ func (x *InertiaLRU[K, V]) Del(key K) bool { x.lock.Lock() ok := x.core.Remove(key) x.lock.Unlock() + if ok { + x.target.IncrDelHit() + } else { + x.target.IncrDelNotFound() + } return ok } diff --git a/pkg/localcache/local/lru_inertia_test.go b/pkg/localcache/lru/lru_inertia_test.go similarity index 83% rename from pkg/localcache/local/lru_inertia_test.go rename to pkg/localcache/lru/lru_inertia_test.go index 6bff90011..df4919d2d 100644 --- a/pkg/localcache/local/lru_inertia_test.go +++ b/pkg/localcache/lru/lru_inertia_test.go @@ -1,11 +1,13 @@ -package local +package lru import ( "fmt" + "hash/fnv" "sync" "sync/atomic" "testing" "time" + "unsafe" ) type cacheTarget struct { @@ -42,7 +44,13 @@ func (r *cacheTarget) String() string { func TestName(t *testing.T) { target := &cacheTarget{} - l := NewCache[string](100, 1000, time.Second*20, time.Second*5, target, nil) + l := NewSlotLRU[string, string](100, func(k string) uint64 { + h := fnv.New64a() + h.Write(*(*[]byte)(unsafe.Pointer(&k))) + return h.Sum64() + }, func() LRU[string, string] { + return NewActivelyLRU[string, string](100, time.Second*60, time.Second, target, nil) + }) //l := NewInertiaLRU[string, string](1000, time.Second*20, time.Second*5, target) fn := func(key string, n int, fetch func() (string, error)) { @@ -53,8 +61,9 @@ func TestName(t *testing.T) { //} else { // t.Error("key", key, err) //} - l.Get(key, fetch) + v, err := l.Get(key, fetch) //time.Sleep(time.Second / 100) + func(v ...any) {}(v, err) } } diff --git a/pkg/localcache/lru/lru_slot.go b/pkg/localcache/lru/lru_slot.go new file mode 100644 index 000000000..c1b8b94d0 --- /dev/null +++ b/pkg/localcache/lru/lru_slot.go @@ -0,0 +1,37 @@ +package lru + +func NewSlotLRU[K comparable, V any](slotNum int, hash func(K) uint64, create func() LRU[K, V]) LRU[K, V] { + x := &slotLRU[K, V]{ + n: uint64(slotNum), + slots: make([]LRU[K, V], slotNum), + hash: hash, + } + for i := 0; i < slotNum; i++ { + x.slots[i] = create() + } + return x +} + +type slotLRU[K comparable, V any] struct { + n uint64 + slots []LRU[K, V] + hash func(k K) uint64 +} + +func (x *slotLRU[K, V]) getIndex(k K) uint64 { + return x.hash(k) % x.n +} + +func (x *slotLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) { + return x.slots[x.getIndex(key)].Get(key, fetch) +} + +func (x *slotLRU[K, V]) Del(key K) bool { + return x.slots[x.getIndex(key)].Del(key) +} + +func (x *slotLRU[K, V]) Stop() { + for _, slot := range x.slots { + slot.Stop() + } +} diff --git a/pkg/localcache/option.go b/pkg/localcache/option.go index 9f77bc502..178faf273 100644 --- a/pkg/localcache/option.go +++ b/pkg/localcache/option.go @@ -2,7 +2,7 @@ package localcache import ( "context" - "github.com/openimsdk/localcache/local" + "github.com/openimsdk/localcache/lru" "time" ) @@ -11,6 +11,7 @@ func defaultOption() *option { localSlotNum: 500, localSlotSize: 20000, linkSlotNum: 500, + actively: false, localSuccessTTL: time.Minute, localFailedTTL: time.Second * 5, delFn: make([]func(ctx context.Context, key ...string), 0, 2), @@ -22,15 +23,28 @@ type option struct { localSlotNum int localSlotSize int linkSlotNum int + actively bool localSuccessTTL time.Duration localFailedTTL time.Duration delFn []func(ctx context.Context, key ...string) delCh func(fn func(key ...string)) - target local.Target + target lru.Target } type Option func(o *option) +func WithActively() Option { + return func(o *option) { + o.actively = true + } +} + +func WithInertia() Option { + return func(o *option) { + o.actively = false + } +} + func WithLocalDisable() Option { return WithLinkSlotNum(0) } @@ -75,7 +89,7 @@ func WithLocalFailedTTL(localFailedTTL time.Duration) Option { } } -func WithTarget(target local.Target) Option { +func WithTarget(target lru.Target) Option { if target == nil { panic("target should not be nil") } diff --git a/pkg/localcache/option/option.go b/pkg/localcache/option/option.go deleted file mode 100644 index 8547b3339..000000000 --- a/pkg/localcache/option/option.go +++ /dev/null @@ -1,20 +0,0 @@ -package option - -func NewOption() *Option { - return &Option{} -} - -type Option struct { - Link []string -} - -func (o *Option) WithLink(key ...string) *Option { - if len(key) > 0 { - if len(o.Link) == 0 { - o.Link = key - } else { - o.Link = append(o.Link, key...) - } - } - return o -} diff --git a/pkg/rpccache/conversation.go b/pkg/rpccache/conversation.go index 5a76990d0..a0aba105a 100644 --- a/pkg/rpccache/conversation.go +++ b/pkg/rpccache/conversation.go @@ -11,14 +11,17 @@ import ( func NewConversationLocalCache(client rpcclient.ConversationRpcClient, cli redis.UniversalClient) *ConversationLocalCache { return &ConversationLocalCache{ - local: localcache.New[any](localcache.WithRedisDeleteSubscribe(config.Config.LocalCache.Conversation.Topic, cli)), client: client, + local: localcache.New[any]( + localcache.WithLocalSlotNum(config.Config.LocalCache.Conversation.SlotNum), + localcache.WithLocalSlotSize(config.Config.LocalCache.Conversation.SlotSize), + ), } } type ConversationLocalCache struct { - local localcache.Cache[any] client rpcclient.ConversationRpcClient + local localcache.Cache[any] } func (c *ConversationLocalCache) GetConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) { diff --git a/pkg/rpccache/friend.go b/pkg/rpccache/friend.go index 6d640ea8f..fc5f0e96c 100644 --- a/pkg/rpccache/friend.go +++ b/pkg/rpccache/friend.go @@ -4,7 +4,6 @@ import ( "context" "github.com/OpenIMSDK/tools/log" "github.com/openimsdk/localcache" - "github.com/openimsdk/localcache/option" "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" @@ -13,31 +12,19 @@ import ( func NewFriendLocalCache(client rpcclient.FriendRpcClient, cli redis.UniversalClient) *FriendLocalCache { return &FriendLocalCache{ - local: localcache.New[any](localcache.WithRedisDeleteSubscribe(config.Config.LocalCache.Friend.Topic, cli)), client: client, + local: localcache.New[any]( + localcache.WithLocalSlotNum(config.Config.LocalCache.Friend.SlotNum), + localcache.WithLocalSlotSize(config.Config.LocalCache.Friend.SlotSize), + ), } } type FriendLocalCache struct { - local localcache.Cache[any] client rpcclient.FriendRpcClient + local localcache.Cache[any] } -//func (f *FriendLocalCache) GetFriendIDs(ctx context.Context, ownerUserID string) (val []string, err error) { -// log.ZDebug(ctx, "FriendLocalCache GetFriendIDs req", "ownerUserID", ownerUserID) -// defer func() { -// if err == nil { -// log.ZDebug(ctx, "FriendLocalCache GetFriendIDs return", "value", val) -// } else { -// log.ZError(ctx, "FriendLocalCache GetFriendIDs return", err) -// } -// }() -// return localcache.AnyValue[[]string](f.local.Get(ctx, cachekey.GetFriendIDsKey(ownerUserID), func(ctx context.Context) (any, error) { -// log.ZDebug(ctx, "FriendLocalCache GetFriendIDs call rpc", "ownerUserID", ownerUserID) -// return f.client.GetFriendIDs(ctx, ownerUserID) -// })) -//} - func (f *FriendLocalCache) IsFriend(ctx context.Context, possibleFriendUserID, userID string) (val bool, err error) { log.ZDebug(ctx, "FriendLocalCache IsFriend req", "possibleFriendUserID", possibleFriendUserID, "userID", userID) defer func() { @@ -47,10 +34,10 @@ func (f *FriendLocalCache) IsFriend(ctx context.Context, possibleFriendUserID, u log.ZError(ctx, "FriendLocalCache IsFriend return", err) } }() - return localcache.AnyValue[bool](f.local.Get(ctx, cachekey.GetIsFriendKey(possibleFriendUserID, userID), func(ctx context.Context) (any, error) { + return localcache.AnyValue[bool](f.local.GetLink(ctx, cachekey.GetIsFriendKey(possibleFriendUserID, userID), func(ctx context.Context) (any, error) { log.ZDebug(ctx, "FriendLocalCache IsFriend rpc", "possibleFriendUserID", possibleFriendUserID, "userID", userID) return f.client.IsFriend(ctx, possibleFriendUserID, userID) - }, option.NewOption().WithLink(cachekey.GetFriendIDsKey(possibleFriendUserID)))) + }, cachekey.GetFriendIDsKey(possibleFriendUserID))) } // IsBlack possibleBlackUserID selfUserID @@ -63,8 +50,8 @@ func (f *FriendLocalCache) IsBlack(ctx context.Context, possibleBlackUserID, use log.ZError(ctx, "FriendLocalCache IsBlack return", err) } }() - return localcache.AnyValue[bool](f.local.Get(ctx, cachekey.GetIsBlackIDsKey(possibleBlackUserID, userID), func(ctx context.Context) (any, error) { + return localcache.AnyValue[bool](f.local.GetLink(ctx, cachekey.GetIsBlackIDsKey(possibleBlackUserID, userID), func(ctx context.Context) (any, error) { log.ZDebug(ctx, "FriendLocalCache IsBlack rpc", "possibleBlackUserID", possibleBlackUserID, "userID", userID) return f.client.IsBlack(ctx, possibleBlackUserID, userID) - }, option.NewOption().WithLink(cachekey.GetBlackIDsKey(userID)))) + }, cachekey.GetBlackIDsKey(userID))) } diff --git a/pkg/rpccache/group.go b/pkg/rpccache/group.go index 3c3333b92..d3617bfc4 100644 --- a/pkg/rpccache/group.go +++ b/pkg/rpccache/group.go @@ -11,14 +11,17 @@ import ( func NewGroupLocalCache(client rpcclient.GroupRpcClient, cli redis.UniversalClient) *GroupLocalCache { return &GroupLocalCache{ - local: localcache.New[any](localcache.WithRedisDeleteSubscribe(config.Config.LocalCache.Group.Topic, cli)), client: client, + local: localcache.New[any]( + localcache.WithLocalSlotNum(config.Config.LocalCache.Group.SlotNum), + localcache.WithLocalSlotSize(config.Config.LocalCache.Group.SlotSize), + ), } } type GroupLocalCache struct { - local localcache.Cache[any] client rpcclient.GroupRpcClient + local localcache.Cache[any] } func (g *GroupLocalCache) GetGroupMemberIDs(ctx context.Context, groupID string) ([]string, error) {