From 34971c8b96ce0ac893b16631fde3cc3fd54ed9d2 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Thu, 14 Aug 2025 10:19:11 +0800 Subject: [PATCH] fix: optimize to lru local cache. (#3514) * fix: optimize to lru local cache. * revert lock timing. --- pkg/common/storage/cache/mcache/msg_cache.go | 2 +- pkg/localcache/cache.go | 2 +- pkg/localcache/lru/lru_lazy.go | 56 +++++++++++--------- pkg/localcache/lru/lru_slot.go | 6 +-- pkg/rpccache/online.go | 2 +- 5 files changed, 37 insertions(+), 31 deletions(-) diff --git a/pkg/common/storage/cache/mcache/msg_cache.go b/pkg/common/storage/cache/mcache/msg_cache.go index 3846be3f8..6fd5f80a1 100644 --- a/pkg/common/storage/cache/mcache/msg_cache.go +++ b/pkg/common/storage/cache/mcache/msg_cache.go @@ -24,7 +24,7 @@ var ( func NewMsgCache(cache database.Cache, msgDocDatabase database.Msg) cache.MsgCache { initMemMsgCache.Do(func() { - memMsgCache = lru.NewLayLRU[string, *model.MsgInfoModel](1024*8, time.Hour, time.Second*10, localcache.EmptyTarget{}, nil) + memMsgCache = lru.NewLazyLRU[string, *model.MsgInfoModel](1024*8, time.Hour, time.Second*10, localcache.EmptyTarget{}, nil) }) return &msgCache{ cache: cache, diff --git a/pkg/localcache/cache.go b/pkg/localcache/cache.go index ba849f892..92695c05d 100644 --- a/pkg/localcache/cache.go +++ b/pkg/localcache/cache.go @@ -49,7 +49,7 @@ func New[V any](opts ...Option) Cache[V] { if opt.expirationEvict { return lru.NewExpirationLRU[string, V](opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict) } else { - return lru.NewLayLRU[string, V](opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict) + return lru.NewLazyLRU[string, V](opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict) } } if opt.localSlotNum == 1 { diff --git a/pkg/localcache/lru/lru_lazy.go b/pkg/localcache/lru/lru_lazy.go index b4f0377a7..4a3db46c9 100644 --- a/pkg/localcache/lru/lru_lazy.go +++ b/pkg/localcache/lru/lru_lazy.go @@ -21,25 +21,25 @@ import ( "github.com/hashicorp/golang-lru/v2/simplelru" ) -type layLruItem[V any] struct { +type lazyLruItem[V any] struct { lock sync.Mutex expires int64 err error value V } -func NewLayLRU[K comparable, V any](size int, successTTL, failedTTL time.Duration, target Target, onEvict EvictCallback[K, V]) *LayLRU[K, V] { - var cb simplelru.EvictCallback[K, *layLruItem[V]] +func NewLazyLRU[K comparable, V any](size int, successTTL, failedTTL time.Duration, target Target, onEvict EvictCallback[K, V]) *LazyLRU[K, V] { + var cb simplelru.EvictCallback[K, *lazyLruItem[V]] if onEvict != nil { - cb = func(key K, value *layLruItem[V]) { + cb = func(key K, value *lazyLruItem[V]) { onEvict(key, value.value) } } - core, err := simplelru.NewLRU[K, *layLruItem[V]](size, cb) + core, err := simplelru.NewLRU[K, *lazyLruItem[V]](size, cb) if err != nil { panic(err) } - return &LayLRU[K, V]{ + return &LazyLRU[K, V]{ core: core, successTTL: successTTL, failedTTL: failedTTL, @@ -47,15 +47,15 @@ func NewLayLRU[K comparable, V any](size int, successTTL, failedTTL time.Duratio } } -type LayLRU[K comparable, V any] struct { +type LazyLRU[K comparable, V any] struct { lock sync.Mutex - core *simplelru.LRU[K, *layLruItem[V]] + core *simplelru.LRU[K, *lazyLruItem[V]] successTTL time.Duration failedTTL time.Duration target Target } -func (x *LayLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) { +func (x *LazyLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) { x.lock.Lock() v, ok := x.core.Get(key) if ok { @@ -68,7 +68,7 @@ func (x *LayLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) { return value, err } } else { - v = &layLruItem[V]{} + v = &lazyLruItem[V]{} x.core.Add(key, v) v.lock.Lock() x.lock.Unlock() @@ -88,15 +88,15 @@ func (x *LayLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) { return v.value, v.err } -func (x *LayLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)) (map[K]V, error) { +func (x *LazyLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)) (map[K]V, error) { var ( err error once sync.Once ) res := make(map[K]V) - queries := make([]K, 0) - setVs := make(map[K]*layLruItem[V]) + queries := make([]K, 0, len(keys)) + for _, key := range keys { x.lock.Lock() v, ok := x.core.Get(key) @@ -118,14 +118,20 @@ func (x *LayLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)) } queries = append(queries, key) } - values, err1 := fetch(queries) - if err1 != nil { + + if len(queries) == 0 { + return res, err + } + + values, fetchErr := fetch(queries) + if fetchErr != nil { once.Do(func() { - err = err1 + err = fetchErr }) } + for key, val := range values { - v := &layLruItem[V]{} + v := &lazyLruItem[V]{} v.value = val if err == nil { @@ -135,7 +141,7 @@ func (x *LayLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)) v.expires = time.Now().Add(x.failedTTL).UnixMilli() x.target.IncrGetFailed() } - setVs[key] = v + x.lock.Lock() x.core.Add(key, v) x.lock.Unlock() @@ -145,29 +151,29 @@ func (x *LayLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)) return res, err } -//func (x *LayLRU[K, V]) Has(key K) bool { +//func (x *LazyLRU[K, V]) Has(key K) bool { // x.lock.Lock() // defer x.lock.Unlock() // return x.core.Contains(key) //} -func (x *LayLRU[K, V]) Set(key K, value V) { +func (x *LazyLRU[K, V]) Set(key K, value V) { x.lock.Lock() defer x.lock.Unlock() - x.core.Add(key, &layLruItem[V]{value: value, expires: time.Now().Add(x.successTTL).UnixMilli()}) + x.core.Add(key, &lazyLruItem[V]{value: value, expires: time.Now().Add(x.successTTL).UnixMilli()}) } -func (x *LayLRU[K, V]) SetHas(key K, value V) bool { +func (x *LazyLRU[K, V]) SetHas(key K, value V) bool { x.lock.Lock() defer x.lock.Unlock() if x.core.Contains(key) { - x.core.Add(key, &layLruItem[V]{value: value, expires: time.Now().Add(x.successTTL).UnixMilli()}) + x.core.Add(key, &lazyLruItem[V]{value: value, expires: time.Now().Add(x.successTTL).UnixMilli()}) return true } return false } -func (x *LayLRU[K, V]) Del(key K) bool { +func (x *LazyLRU[K, V]) Del(key K) bool { x.lock.Lock() ok := x.core.Remove(key) x.lock.Unlock() @@ -179,6 +185,6 @@ func (x *LayLRU[K, V]) Del(key K) bool { return ok } -func (x *LayLRU[K, V]) Stop() { +func (x *LazyLRU[K, V]) Stop() { } diff --git a/pkg/localcache/lru/lru_slot.go b/pkg/localcache/lru/lru_slot.go index 077219b75..14ee3b50f 100644 --- a/pkg/localcache/lru/lru_slot.go +++ b/pkg/localcache/lru/lru_slot.go @@ -35,7 +35,7 @@ type slotLRU[K comparable, V any] struct { func (x *slotLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)) (map[K]V, error) { var ( slotKeys = make(map[uint64][]K) - vs = make(map[K]V) + kVs = make(map[K]V) ) for _, k := range keys { @@ -49,10 +49,10 @@ func (x *slotLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error) return nil, err } for key, value := range batches { - vs[key] = value + kVs[key] = value } } - return vs, nil + return kVs, nil } func (x *slotLRU[K, V]) getIndex(k K) uint64 { diff --git a/pkg/rpccache/online.go b/pkg/rpccache/online.go index 9b8c03101..fb7f18393 100644 --- a/pkg/rpccache/online.go +++ b/pkg/rpccache/online.go @@ -64,7 +64,7 @@ func NewOnlineCache(client *rpcli.UserClient, group *GroupLocalCache, rdb redis. case false: log.ZDebug(ctx, "fullUserCache is false") x.lruCache = lru.NewSlotLRU(1024, localcache.LRUStringHash, func() lru.LRU[string, []int32] { - return lru.NewLayLRU[string, []int32](2048, cachekey.OnlineExpire/2, time.Second*3, localcache.EmptyTarget{}, func(key string, value []int32) {}) + return lru.NewLazyLRU[string, []int32](2048, cachekey.OnlineExpire/2, time.Second*3, localcache.EmptyTarget{}, func(key string, value []int32) {}) }) x.CurrentPhase.Store(DoSubscribeOver) x.Cond.Broadcast()