Repository: https://github.com/openimsdk/open-im-server.git
fix: optimize to lru local cache. (#3514)

* fix: optimize to lru local cache.
* revert lock timing.

parent d542df7000
commit 34971c8b96
Changed files:
pkg/common/storage/cache/mcache/msg_cache.go (2 lines changed)

@@ -24,7 +24,7 @@ var (
 
 func NewMsgCache(cache database.Cache, msgDocDatabase database.Msg) cache.MsgCache {
 	initMemMsgCache.Do(func() {
-		memMsgCache = lru.NewLayLRU[string, *model.MsgInfoModel](1024*8, time.Hour, time.Second*10, localcache.EmptyTarget{}, nil)
+		memMsgCache = lru.NewLazyLRU[string, *model.MsgInfoModel](1024*8, time.Hour, time.Second*10, localcache.EmptyTarget{}, nil)
 	})
 	return &msgCache{
 		cache: cache,
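
Context for the hunk above: memMsgCache is a package-level cache guarded by initMemMsgCache (a sync.Once), so every msgCache created in the process shares one in-memory LRU in front of the database layer. A minimal sketch of that once-initialized shared-cache pattern, using hypothetical names (sharedCache, newBackingStore, Lookup) rather than the repository's types:

package main

import (
	"fmt"
	"sync"
)

// The shared cache is built at most once per process, mirroring how
// memMsgCache is assigned inside initMemMsgCache.Do above. Names here are
// illustrative; the real code stores a *lru.LazyLRU, which is safe for
// concurrent use, whereas this plain map is only read after initialization.
var (
	initOnce    sync.Once
	sharedCache map[string]string
)

func newBackingStore() map[string]string {
	return make(map[string]string, 1024)
}

// Lookup lazily initializes the shared cache, then reads from it.
func Lookup(key string) (string, bool) {
	initOnce.Do(func() {
		sharedCache = newBackingStore()
	})
	v, ok := sharedCache[key]
	return v, ok
}

func main() {
	fmt.Println(Lookup("msg:1")) // "", false on a cold cache
}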

@@ -49,7 +49,7 @@ func New[V any](opts ...Option) Cache[V] {
 		if opt.expirationEvict {
 			return lru.NewExpirationLRU[string, V](opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict)
 		} else {
-			return lru.NewLayLRU[string, V](opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict)
+			return lru.NewLazyLRU[string, V](opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict)
 		}
 	}
 	if opt.localSlotNum == 1 {

@@ -21,25 +21,25 @@ import (
 	"github.com/hashicorp/golang-lru/v2/simplelru"
 )
 
-type layLruItem[V any] struct {
+type lazyLruItem[V any] struct {
 	lock    sync.Mutex
 	expires int64
 	err     error
 	value   V
 }
 
-func NewLayLRU[K comparable, V any](size int, successTTL, failedTTL time.Duration, target Target, onEvict EvictCallback[K, V]) *LayLRU[K, V] {
-	var cb simplelru.EvictCallback[K, *layLruItem[V]]
+func NewLazyLRU[K comparable, V any](size int, successTTL, failedTTL time.Duration, target Target, onEvict EvictCallback[K, V]) *LazyLRU[K, V] {
+	var cb simplelru.EvictCallback[K, *lazyLruItem[V]]
 	if onEvict != nil {
-		cb = func(key K, value *layLruItem[V]) {
+		cb = func(key K, value *lazyLruItem[V]) {
 			onEvict(key, value.value)
 		}
 	}
-	core, err := simplelru.NewLRU[K, *layLruItem[V]](size, cb)
+	core, err := simplelru.NewLRU[K, *lazyLruItem[V]](size, cb)
 	if err != nil {
 		panic(err)
 	}
-	return &LayLRU[K, V]{
+	return &LazyLRU[K, V]{
 		core:       core,
 		successTTL: successTTL,
 		failedTTL:  failedTTL,

@@ -47,15 +47,15 @@ func NewLayLRU[K comparable, V any](size int, successTTL, failedTTL time.Duratio
 	}
 }
 
-type LayLRU[K comparable, V any] struct {
+type LazyLRU[K comparable, V any] struct {
 	lock       sync.Mutex
-	core       *simplelru.LRU[K, *layLruItem[V]]
+	core       *simplelru.LRU[K, *lazyLruItem[V]]
 	successTTL time.Duration
 	failedTTL  time.Duration
 	target     Target
 }
 
-func (x *LayLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) {
+func (x *LazyLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) {
 	x.lock.Lock()
 	v, ok := x.core.Get(key)
 	if ok {
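
Taken together, the constructor and struct above keep the same shape after the rename; only the identifiers change from LayLRU to LazyLRU. A minimal usage sketch, assuming the lru and localcache packages live under pkg/localcache as in this repository's layout (the import paths and the loader callback are assumptions, not taken from this diff):

package main

import (
	"fmt"
	"time"

	"github.com/openimsdk/open-im-server/v3/pkg/localcache"
	"github.com/openimsdk/open-im-server/v3/pkg/localcache/lru"
)

func main() {
	// 1024 entries, 1h TTL for values fetched successfully, 10s TTL for
	// failed fetches, the no-op EmptyTarget for metrics, no eviction callback.
	cache := lru.NewLazyLRU[string, string](1024, time.Hour, 10*time.Second, localcache.EmptyTarget{}, nil)

	// Get runs the loader only on a miss or an expired entry and caches
	// whatever it returns, including the error, until the matching TTL passes.
	name, err := cache.Get("user:1", func() (string, error) {
		return "loaded-from-db", nil // hypothetical loader
	})
	fmt.Println(name, err)
}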

@@ -68,7 +68,7 @@ func (x *LayLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) {
 			return value, err
 		}
 	} else {
-		v = &layLruItem[V]{}
+		v = &lazyLruItem[V]{}
 		x.core.Add(key, v)
 		v.lock.Lock()
 		x.lock.Unlock()

@@ -88,15 +88,15 @@ func (x *LayLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) {
 	return v.value, v.err
 }
 
-func (x *LayLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)) (map[K]V, error) {
+func (x *LazyLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)) (map[K]V, error) {
 	var (
 		err  error
 		once sync.Once
 	)
 
 	res := make(map[K]V)
-	queries := make([]K, 0)
-	setVs := make(map[K]*layLruItem[V])
+	queries := make([]K, 0, len(keys))
+
 	for _, key := range keys {
 		x.lock.Lock()
 		v, ok := x.core.Get(key)

@@ -118,14 +118,20 @@ func (x *LayLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error))
 		}
 		queries = append(queries, key)
 	}
-	values, err1 := fetch(queries)
-	if err1 != nil {
+
+	if len(queries) == 0 {
+		return res, err
+	}
+
+	values, fetchErr := fetch(queries)
+	if fetchErr != nil {
 		once.Do(func() {
-			err = err1
+			err = fetchErr
 		})
 	}
 
 	for key, val := range values {
-		v := &layLruItem[V]{}
+		v := &lazyLruItem[V]{}
 		v.value = val
 
 		if err == nil {

@@ -135,7 +141,7 @@ func (x *LayLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error))
 			v.expires = time.Now().Add(x.failedTTL).UnixMilli()
 			x.target.IncrGetFailed()
 		}
-		setVs[key] = v
+
 		x.lock.Lock()
 		x.core.Add(key, v)
 		x.lock.Unlock()
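
The behavioral change in the two hunks above (beyond the rename and the removal of the unused setVs map) is the early return: when every requested key is served from the cache, queries stays empty and the fetch callback is no longer invoked with an empty slice. A short sketch of GetBatch under the same assumed import paths as the previous example; the keys and loader are hypothetical:

package main

import (
	"fmt"
	"time"

	"github.com/openimsdk/open-im-server/v3/pkg/localcache"
	"github.com/openimsdk/open-im-server/v3/pkg/localcache/lru"
)

func main() {
	userNames := lru.NewLazyLRU[string, string](1024, time.Hour, 10*time.Second, localcache.EmptyTarget{}, nil)

	// Warm the cache so the batch read below has no misses.
	userNames.Set("u1", "Alice")
	userNames.Set("u2", "Bob")

	res, err := userNames.GetBatch([]string{"u1", "u2"}, func(missing []string) (map[string]string, error) {
		// Only keys that were absent or expired reach this loader; with the
		// early return added above, it is not called at all when every key
		// is already cached.
		out := make(map[string]string, len(missing))
		for _, k := range missing {
			out[k] = "loaded:" + k
		}
		return out, nil
	})
	fmt.Println(res, err) // map[u1:Alice u2:Bob] <nil>, loader never invoked
}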

@@ -145,29 +151,29 @@ func (x *LayLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error))
 	return res, err
 }
 
-//func (x *LayLRU[K, V]) Has(key K) bool {
+//func (x *LazyLRU[K, V]) Has(key K) bool {
 //	x.lock.Lock()
 //	defer x.lock.Unlock()
 //	return x.core.Contains(key)
 //}
 
-func (x *LayLRU[K, V]) Set(key K, value V) {
+func (x *LazyLRU[K, V]) Set(key K, value V) {
 	x.lock.Lock()
 	defer x.lock.Unlock()
-	x.core.Add(key, &layLruItem[V]{value: value, expires: time.Now().Add(x.successTTL).UnixMilli()})
+	x.core.Add(key, &lazyLruItem[V]{value: value, expires: time.Now().Add(x.successTTL).UnixMilli()})
 }
 
-func (x *LayLRU[K, V]) SetHas(key K, value V) bool {
+func (x *LazyLRU[K, V]) SetHas(key K, value V) bool {
 	x.lock.Lock()
 	defer x.lock.Unlock()
 	if x.core.Contains(key) {
-		x.core.Add(key, &layLruItem[V]{value: value, expires: time.Now().Add(x.successTTL).UnixMilli()})
+		x.core.Add(key, &lazyLruItem[V]{value: value, expires: time.Now().Add(x.successTTL).UnixMilli()})
 		return true
 	}
 	return false
 }
 
-func (x *LayLRU[K, V]) Del(key K) bool {
+func (x *LazyLRU[K, V]) Del(key K) bool {
 	x.lock.Lock()
 	ok := x.core.Remove(key)
 	x.lock.Unlock()

@@ -179,6 +185,6 @@ func (x *LayLRU[K, V]) Del(key K) bool {
 	return ok
 }
 
-func (x *LayLRU[K, V]) Stop() {
+func (x *LazyLRU[K, V]) Stop() {
 
 }

@@ -35,7 +35,7 @@ type slotLRU[K comparable, V any] struct {
 func (x *slotLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)) (map[K]V, error) {
 	var (
 		slotKeys = make(map[uint64][]K)
-		vs       = make(map[K]V)
+		kVs      = make(map[K]V)
 	)
 
 	for _, k := range keys {

@@ -49,10 +49,10 @@ func (x *slotLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)
 			return nil, err
 		}
 		for key, value := range batches {
-			vs[key] = value
+			kVs[key] = value
 		}
 	}
-	return vs, nil
+	return kVs, nil
 }
 
 func (x *slotLRU[K, V]) getIndex(k K) uint64 {
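
For context, slotLRU splits keys across several independent LRUs: getIndex hashes each key to a slot, GetBatch groups the requested keys per slot, fetches each group through that slot's cache, and merges the per-slot results into the single map renamed to kVs in this hunk. A minimal, self-contained sketch of that grouping-and-merging step; the hash function, slot count, and values are illustrative, not the repository's implementation:

package main

import (
	"fmt"
	"hash/fnv"
)

// slotIndex mimics the role of slotLRU.getIndex: map a key onto one of n slots.
func slotIndex(key string, slots uint64) uint64 {
	h := fnv.New64a()
	h.Write([]byte(key))
	return h.Sum64() % slots
}

func main() {
	const slots = 4
	keys := []string{"u1", "u2", "u3", "u4", "u5"}

	// Group keys per slot, as slotLRU.GetBatch does before fetching each
	// group through the corresponding per-slot LRU.
	slotKeys := make(map[uint64][]string)
	for _, k := range keys {
		idx := slotIndex(k, slots)
		slotKeys[idx] = append(slotKeys[idx], k)
	}

	// Merge per-slot results into one map, analogous to the kVs map above.
	kVs := make(map[string]string)
	for idx, group := range slotKeys {
		for _, k := range group {
			kVs[k] = fmt.Sprintf("value-from-slot-%d", idx)
		}
	}
	fmt.Println(kVs)
}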

@@ -64,7 +64,7 @@ func NewOnlineCache(client *rpcli.UserClient, group *GroupLocalCache, rdb redis.
 		case false:
 			log.ZDebug(ctx, "fullUserCache is false")
 			x.lruCache = lru.NewSlotLRU(1024, localcache.LRUStringHash, func() lru.LRU[string, []int32] {
-				return lru.NewLayLRU[string, []int32](2048, cachekey.OnlineExpire/2, time.Second*3, localcache.EmptyTarget{}, func(key string, value []int32) {})
+				return lru.NewLazyLRU[string, []int32](2048, cachekey.OnlineExpire/2, time.Second*3, localcache.EmptyTarget{}, func(key string, value []int32) {})
 			})
 			x.CurrentPhase.Store(DoSubscribeOver)
 			x.Cond.Broadcast()