From 1b8a3b0b75a59b762c7a3ea25b4a0edcc992a25f Mon Sep 17 00:00:00 2001
From: chao <48119764+withchao@users.noreply.github.com>
Date: Fri, 12 Dec 2025 16:24:39 +0800
Subject: [PATCH] fix: resolve deadlock in cache eviction and improve GetBatch
 implementation and full id version (#3591)

* fix: performance issues with Kafka caused by encapsulating the MQ interface

* fix: admin token in standalone mode

* fix: full id version

* fix: resolve deadlock in cache eviction and improve GetBatch implementation
---
 pkg/localcache/cache.go              | 27 ++++++-----
 pkg/localcache/cache_test.go         | 67 ++++++++++++++++++++++++++++
 pkg/localcache/init.go               |  4 --
 pkg/localcache/lru/lru_expiration.go | 49 +++++++++++++++++++-
 pkg/localcache/lru/lru_slot.go       |  6 +--
 5 files changed, 133 insertions(+), 20 deletions(-)

diff --git a/pkg/localcache/cache.go b/pkg/localcache/cache.go
index 07d36cf46..b2376d6f1 100644
--- a/pkg/localcache/cache.go
+++ b/pkg/localcache/cache.go
@@ -47,15 +47,15 @@ func New[V any](opts ...Option) Cache[V] {
 	if opt.localSlotNum > 0 && opt.localSlotSize > 0 {
 		createSimpleLRU := func() lru.LRU[string, V] {
 			if opt.expirationEvict {
-				return lru.NewExpirationLRU(opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict)
+				return lru.NewExpirationLRU[string, V](opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict)
 			} else {
-				return lru.NewLazyLRU(opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict)
+				return lru.NewLazyLRU[string, V](opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict)
 			}
 		}
 		if opt.localSlotNum == 1 {
 			c.local = createSimpleLRU()
 		} else {
-			c.local = lru.NewSlotLRU(opt.localSlotNum, LRUStringHash, createSimpleLRU)
+			c.local = lru.NewSlotLRU[string, V](opt.localSlotNum, LRUStringHash, createSimpleLRU)
 		}
 		if opt.linkSlotNum > 0 {
 			c.link = link.New(opt.linkSlotNum)
@@ -71,14 +71,19 @@ type cache[V any] struct {
 }
 
 func (c *cache[V]) onEvict(key string, value V) {
-	_ = value
-	if c.link != nil {
-		lks := c.link.Del(key)
-		for k := range lks {
-			if key != k { // prevent deadlock
-				c.local.Del(k)
-			}
+	// The underlying LRU still holds its lock here; defer linked deletions to avoid re-entering the same slot and deadlocking.
+	if c.link != nil {
+		if lks := c.link.Del(key); len(lks) > 0 {
+			go c.delLinked(key, lks)
+		}
+	}
+}
+
+func (c *cache[V]) delLinked(src string, keys map[string]struct{}) {
+	for k := range keys {
+		if src != k {
+			c.local.Del(k)
 		}
 	}
 }
 
@@ -105,7 +110,7 @@ func (c *cache[V]) Get(ctx context.Context, key string, fetch func(ctx context.C
 func (c *cache[V]) GetLink(ctx context.Context, key string, fetch func(ctx context.Context) (V, error), link ...string) (V, error) {
 	if c.local != nil {
 		return c.local.Get(key, func() (V, error) {
-			if len(link) > 0 {
+			if len(link) > 0 && c.link != nil {
 				c.link.Link(key, link...)
 			}
 			return fetch(ctx)
diff --git a/pkg/localcache/cache_test.go b/pkg/localcache/cache_test.go
index c206e6799..13eb20797 100644
--- a/pkg/localcache/cache_test.go
+++ b/pkg/localcache/cache_test.go
@@ -22,6 +22,8 @@ import (
 	"sync/atomic"
 	"testing"
 	"time"
+
+	"github.com/openimsdk/open-im-server/v3/pkg/localcache/lru"
 )
 
 func TestName(t *testing.T) {
@@ -91,3 +93,68 @@ func TestName(t *testing.T) {
 	t.Log("del", del.Load())
 	// 137.35s
 }
+
+// Test deadlock scenario when eviction callback deletes a linked key that hashes to the same slot.
+func TestCacheEvictDeadlock(t *testing.T) {
+	ctx := context.Background()
+	c := New[string](WithLocalSlotNum(1), WithLocalSlotSize(1), WithLazy())
+
+	if _, err := c.GetLink(ctx, "k1", func(ctx context.Context) (string, error) {
+		return "v1", nil
+	}, "k2"); err != nil {
+		t.Fatalf("seed cache failed: %v", err)
+	}
+
+	done := make(chan struct{})
+	go func() {
+		defer close(done)
+		_, _ = c.GetLink(ctx, "k2", func(ctx context.Context) (string, error) {
+			return "v2", nil
+		}, "k1")
+	}()
+
+	select {
+	case <-done:
+		// finished quickly as expected; before this fix the eviction path deadlocked here.
+	case <-time.After(time.Second):
+		t.Fatal("GetLink deadlocked during eviction of linked key")
+	}
+}
+
+func TestExpirationLRUGetBatch(t *testing.T) {
+	l := lru.NewExpirationLRU[string, string](2, time.Minute, time.Second*5, EmptyTarget{}, nil)
+
+	keys := []string{"a", "b"}
+	values, err := l.GetBatch(keys, func(keys []string) (map[string]string, error) {
+		res := make(map[string]string)
+		for _, k := range keys {
+			res[k] = k + "_v"
+		}
+		return res, nil
+	})
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+	if len(values) != len(keys) {
+		t.Fatalf("expected %d values, got %d", len(keys), len(values))
+	}
+	for _, k := range keys {
+		if v, ok := values[k]; !ok || v != k+"_v" {
+			t.Fatalf("unexpected value for %s: %q, ok=%v", k, v, ok)
+		}
+	}
+
+	// second batch should hit the cache without calling fetch
+	values, err = l.GetBatch(keys, func(keys []string) (map[string]string, error) {
+		t.Fatalf("should not fetch on cache hit")
+		return nil, nil
+	})
+	if err != nil {
+		t.Fatalf("unexpected error on cache hit: %v", err)
+	}
+	for _, k := range keys {
+		if v, ok := values[k]; !ok || v != k+"_v" {
+			t.Fatalf("unexpected cached value for %s: %q, ok=%v", k, v, ok)
+		}
+	}
+}
diff --git a/pkg/localcache/init.go b/pkg/localcache/init.go
index ad339da7c..d0bccaa7e 100644
--- a/pkg/localcache/init.go
+++ b/pkg/localcache/init.go
@@ -33,10 +33,6 @@ func InitLocalCache(localCache *config.LocalCache) {
 		Local config.CacheConfig
 		Keys  []string
 	}{
-		{
-			Local: localCache.Auth,
-			Keys:  []string{cachekey.UidPidToken},
-		},
 		{
 			Local: localCache.User,
 			Keys:  []string{cachekey.UserInfoKey, cachekey.UserGlobalRecvMsgOptKey},
diff --git a/pkg/localcache/lru/lru_expiration.go b/pkg/localcache/lru/lru_expiration.go
index df6bacbf4..4197cacec 100644
--- a/pkg/localcache/lru/lru_expiration.go
+++ b/pkg/localcache/lru/lru_expiration.go
@@ -52,8 +52,53 @@ type ExpirationLRU[K comparable, V any] struct {
 }
 
 func (x *ExpirationLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)) (map[K]V, error) {
-	//TODO implement me
-	panic("implement me")
+	var (
+		err     error
+		results = make(map[K]V)
+		misses  = make([]K, 0, len(keys))
+	)
+
+	for _, key := range keys {
+		x.lock.Lock()
+		v, ok := x.core.Get(key)
+		x.lock.Unlock()
+		if ok {
+			x.target.IncrGetHit()
+			v.lock.RLock()
+			results[key] = v.value
+			if v.err != nil && err == nil {
+				err = v.err
+			}
+			v.lock.RUnlock()
+			continue
+		}
+		misses = append(misses, key)
+	}
+
+	if len(misses) == 0 {
+		return results, err
+	}
+
+	fetchValues, fetchErr := fetch(misses)
+	if fetchErr != nil && err == nil {
+		err = fetchErr
+	}
+
+	for key, val := range fetchValues {
+		results[key] = val
+		if fetchErr != nil {
+			x.target.IncrGetFailed()
+			continue
+		}
+		x.target.IncrGetSuccess()
+		item := &expirationLruItem[V]{value: val}
+		x.lock.Lock()
+		x.core.Add(key, item)
+		x.lock.Unlock()
+	}
+
+	// any keys not returned from fetch remain absent (no cache write)
+	return results, err
 }
 
 func (x *ExpirationLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) {
diff --git a/pkg/localcache/lru/lru_slot.go b/pkg/localcache/lru/lru_slot.go
index 14ee3b50f..077219b75 100644
--- a/pkg/localcache/lru/lru_slot.go
+++ b/pkg/localcache/lru/lru_slot.go
@@ -35,7 +35,7 @@ type slotLRU[K comparable, V any] struct {
 func (x *slotLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)) (map[K]V, error) {
 	var (
 		slotKeys = make(map[uint64][]K)
-		kVs      = make(map[K]V)
+		vs       = make(map[K]V)
 	)
 
 	for _, k := range keys {
@@ -49,10 +49,10 @@ func (x *slotLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)
 			return nil, err
 		}
 		for key, value := range batches {
-			kVs[key] = value
+			vs[key] = value
 		}
 	}
-	return kVs, nil
+	return vs, nil
 }
 
 func (x *slotLRU[K, V]) getIndex(k K) uint64 {
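-- 
Note on the eviction deadlock (a minimal sketch, not code from this repo):
Go's sync.Mutex is not reentrant. The eviction callback fires while the LRU
slot still holds its lock, so synchronously deleting another key that maps to
the same slot re-acquires that lock and blocks forever. The hypothetical
tinyCache below reproduces the pattern and the goroutine-deferred deletion the
patch applies in delLinked; the names and the capacity-1 eviction policy are
illustrative assumptions, not the localcache implementation.

package main

import (
	"fmt"
	"sync"
)

// tinyCache is a hypothetical, capacity-1 cache used only to illustrate the
// locking pattern.
type tinyCache struct {
	mu      sync.Mutex
	items   map[string]string
	onEvict func(key string)
}

func (c *tinyCache) Set(key, value string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for k := range c.items { // capacity 1: evict whatever is already stored
		delete(c.items, k)
		if c.onEvict != nil {
			c.onEvict(k) // callback runs while c.mu is still held
		}
	}
	c.items[key] = value
}

func (c *tinyCache) Del(key string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.items, key)
}

func main() {
	c := &tinyCache{items: map[string]string{}}
	// Deadlock variant: c.onEvict = func(k string) { c.Del(k + "_linked") }
	// re-locks c.mu from inside Set and never returns.
	// Deferred variant (what the patch does via delLinked): run it asynchronously.
	c.onEvict = func(k string) { go c.Del(k + "_linked") }
	c.Set("k1", "v1")
	c.Set("k2", "v2") // evicts k1; the linked delete runs outside the lock
	fmt.Println("no deadlock")
}

The patch also keeps the src != k guard in delLinked, so the deferred goroutine
never re-deletes the key whose eviction triggered it.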