mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-12-18 20:19:14 +08:00
fix: resolve deadlock in cache eviction and improve GetBatch implementation and full id version (#3591)
* fix: performance issues with Kafka caused by encapsulating the MQ interface * fix: admin token in standalone mode * fix: full id version * fix: resolve deadlock in cache eviction and improve GetBatch implementation
This commit is contained in:
parent
b8c4b459fa
commit
1b8a3b0b75
@ -47,15 +47,15 @@ func New[V any](opts ...Option) Cache[V] {
|
|||||||
if opt.localSlotNum > 0 && opt.localSlotSize > 0 {
|
if opt.localSlotNum > 0 && opt.localSlotSize > 0 {
|
||||||
createSimpleLRU := func() lru.LRU[string, V] {
|
createSimpleLRU := func() lru.LRU[string, V] {
|
||||||
if opt.expirationEvict {
|
if opt.expirationEvict {
|
||||||
return lru.NewExpirationLRU(opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict)
|
return lru.NewExpirationLRU[string, V](opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict)
|
||||||
} else {
|
} else {
|
||||||
return lru.NewLazyLRU(opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict)
|
return lru.NewLazyLRU[string, V](opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if opt.localSlotNum == 1 {
|
if opt.localSlotNum == 1 {
|
||||||
c.local = createSimpleLRU()
|
c.local = createSimpleLRU()
|
||||||
} else {
|
} else {
|
||||||
c.local = lru.NewSlotLRU(opt.localSlotNum, LRUStringHash, createSimpleLRU)
|
c.local = lru.NewSlotLRU[string, V](opt.localSlotNum, LRUStringHash, createSimpleLRU)
|
||||||
}
|
}
|
||||||
if opt.linkSlotNum > 0 {
|
if opt.linkSlotNum > 0 {
|
||||||
c.link = link.New(opt.linkSlotNum)
|
c.link = link.New(opt.linkSlotNum)
|
||||||
@ -71,15 +71,20 @@ type cache[V any] struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *cache[V]) onEvict(key string, value V) {
|
func (c *cache[V]) onEvict(key string, value V) {
|
||||||
_ = value
|
|
||||||
|
|
||||||
if c.link != nil {
|
if c.link != nil {
|
||||||
lks := c.link.Del(key)
|
// Do not delete other keys while the underlying LRU still holds its lock;
|
||||||
for k := range lks {
|
// defer linked deletions to avoid re-entering the same slot and deadlocking.
|
||||||
if key != k { // prevent deadlock
|
if lks := c.link.Del(key); len(lks) > 0 {
|
||||||
c.local.Del(k)
|
go c.delLinked(key, lks)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cache[V]) delLinked(src string, keys map[string]struct{}) {
|
||||||
|
for k := range keys {
|
||||||
|
if src != k {
|
||||||
|
c.local.Del(k)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -105,7 +110,7 @@ func (c *cache[V]) Get(ctx context.Context, key string, fetch func(ctx context.C
|
|||||||
func (c *cache[V]) GetLink(ctx context.Context, key string, fetch func(ctx context.Context) (V, error), link ...string) (V, error) {
|
func (c *cache[V]) GetLink(ctx context.Context, key string, fetch func(ctx context.Context) (V, error), link ...string) (V, error) {
|
||||||
if c.local != nil {
|
if c.local != nil {
|
||||||
return c.local.Get(key, func() (V, error) {
|
return c.local.Get(key, func() (V, error) {
|
||||||
if len(link) > 0 {
|
if len(link) > 0 && c.link != nil {
|
||||||
c.link.Link(key, link...)
|
c.link.Link(key, link...)
|
||||||
}
|
}
|
||||||
return fetch(ctx)
|
return fetch(ctx)
|
||||||
|
|||||||
@ -22,6 +22,8 @@ import (
|
|||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/openimsdk/open-im-server/v3/pkg/localcache/lru"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestName(t *testing.T) {
|
func TestName(t *testing.T) {
|
||||||
@ -91,3 +93,68 @@ func TestName(t *testing.T) {
|
|||||||
t.Log("del", del.Load())
|
t.Log("del", del.Load())
|
||||||
// 137.35s
|
// 137.35s
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Test deadlock scenario when eviction callback deletes a linked key that hashes to the same slot.
|
||||||
|
func TestCacheEvictDeadlock(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
c := New[string](WithLocalSlotNum(1), WithLocalSlotSize(1), WithLazy())
|
||||||
|
|
||||||
|
if _, err := c.GetLink(ctx, "k1", func(ctx context.Context) (string, error) {
|
||||||
|
return "v1", nil
|
||||||
|
}, "k2"); err != nil {
|
||||||
|
t.Fatalf("seed cache failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
defer close(done)
|
||||||
|
_, _ = c.GetLink(ctx, "k2", func(ctx context.Context) (string, error) {
|
||||||
|
return "v2", nil
|
||||||
|
}, "k1")
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
// expected to finish quickly; current implementation deadlocks here.
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Fatal("GetLink deadlocked during eviction of linked key")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestExpirationLRUGetBatch(t *testing.T) {
|
||||||
|
l := lru.NewExpirationLRU[string, string](2, time.Minute, time.Second*5, EmptyTarget{}, nil)
|
||||||
|
|
||||||
|
keys := []string{"a", "b"}
|
||||||
|
values, err := l.GetBatch(keys, func(keys []string) (map[string]string, error) {
|
||||||
|
res := make(map[string]string)
|
||||||
|
for _, k := range keys {
|
||||||
|
res[k] = k + "_v"
|
||||||
|
}
|
||||||
|
return res, nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if len(values) != len(keys) {
|
||||||
|
t.Fatalf("expected %d values, got %d", len(keys), len(values))
|
||||||
|
}
|
||||||
|
for _, k := range keys {
|
||||||
|
if v, ok := values[k]; !ok || v != k+"_v" {
|
||||||
|
t.Fatalf("unexpected value for %s: %q, ok=%v", k, v, ok)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// second batch should hit cache
|
||||||
|
values, err = l.GetBatch(keys, func(keys []string) (map[string]string, error) {
|
||||||
|
t.Fatalf("should not fetch on cache hit")
|
||||||
|
return nil, nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error on cache hit: %v", err)
|
||||||
|
}
|
||||||
|
for _, k := range keys {
|
||||||
|
if v, ok := values[k]; !ok || v != k+"_v" {
|
||||||
|
t.Fatalf("unexpected cached value for %s: %q, ok=%v", k, v, ok)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@ -33,10 +33,6 @@ func InitLocalCache(localCache *config.LocalCache) {
|
|||||||
Local config.CacheConfig
|
Local config.CacheConfig
|
||||||
Keys []string
|
Keys []string
|
||||||
}{
|
}{
|
||||||
{
|
|
||||||
Local: localCache.Auth,
|
|
||||||
Keys: []string{cachekey.UidPidToken},
|
|
||||||
},
|
|
||||||
{
|
{
|
||||||
Local: localCache.User,
|
Local: localCache.User,
|
||||||
Keys: []string{cachekey.UserInfoKey, cachekey.UserGlobalRecvMsgOptKey},
|
Keys: []string{cachekey.UserInfoKey, cachekey.UserGlobalRecvMsgOptKey},
|
||||||
|
|||||||
@ -52,8 +52,53 @@ type ExpirationLRU[K comparable, V any] struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (x *ExpirationLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)) (map[K]V, error) {
|
func (x *ExpirationLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)) (map[K]V, error) {
|
||||||
//TODO implement me
|
var (
|
||||||
panic("implement me")
|
err error
|
||||||
|
results = make(map[K]V)
|
||||||
|
misses = make([]K, 0, len(keys))
|
||||||
|
)
|
||||||
|
|
||||||
|
for _, key := range keys {
|
||||||
|
x.lock.Lock()
|
||||||
|
v, ok := x.core.Get(key)
|
||||||
|
x.lock.Unlock()
|
||||||
|
if ok {
|
||||||
|
x.target.IncrGetHit()
|
||||||
|
v.lock.RLock()
|
||||||
|
results[key] = v.value
|
||||||
|
if v.err != nil && err == nil {
|
||||||
|
err = v.err
|
||||||
|
}
|
||||||
|
v.lock.RUnlock()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
misses = append(misses, key)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(misses) == 0 {
|
||||||
|
return results, err
|
||||||
|
}
|
||||||
|
|
||||||
|
fetchValues, fetchErr := fetch(misses)
|
||||||
|
if fetchErr != nil && err == nil {
|
||||||
|
err = fetchErr
|
||||||
|
}
|
||||||
|
|
||||||
|
for key, val := range fetchValues {
|
||||||
|
results[key] = val
|
||||||
|
if fetchErr != nil {
|
||||||
|
x.target.IncrGetFailed()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
x.target.IncrGetSuccess()
|
||||||
|
item := &expirationLruItem[V]{value: val}
|
||||||
|
x.lock.Lock()
|
||||||
|
x.core.Add(key, item)
|
||||||
|
x.lock.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
// any keys not returned from fetch remain absent (no cache write)
|
||||||
|
return results, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (x *ExpirationLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) {
|
func (x *ExpirationLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) {
|
||||||
|
|||||||
@ -35,7 +35,7 @@ type slotLRU[K comparable, V any] struct {
|
|||||||
func (x *slotLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)) (map[K]V, error) {
|
func (x *slotLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)) (map[K]V, error) {
|
||||||
var (
|
var (
|
||||||
slotKeys = make(map[uint64][]K)
|
slotKeys = make(map[uint64][]K)
|
||||||
kVs = make(map[K]V)
|
vs = make(map[K]V)
|
||||||
)
|
)
|
||||||
|
|
||||||
for _, k := range keys {
|
for _, k := range keys {
|
||||||
@ -49,10 +49,10 @@ func (x *slotLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
for key, value := range batches {
|
for key, value := range batches {
|
||||||
kVs[key] = value
|
vs[key] = value
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return kVs, nil
|
return vs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (x *slotLRU[K, V]) getIndex(k K) uint64 {
|
func (x *slotLRU[K, V]) getIndex(k K) uint64 {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user