feat: local cache

This commit is contained in:
withchao 2024-01-15 15:23:42 +08:00
parent 3e30e50a09
commit abbb701192
13 changed files with 158 additions and 140 deletions

View File

@ -3,13 +3,17 @@ package localcache
import (
"context"
"github.com/openimsdk/localcache/link"
"github.com/openimsdk/localcache/local"
lopt "github.com/openimsdk/localcache/option"
"github.com/openimsdk/localcache/lru"
"hash/fnv"
"unsafe"
)
type Cache[V any] interface {
Get(ctx context.Context, key string, fetch func(ctx context.Context) (V, error), opts ...*lopt.Option) (V, error)
Get(ctx context.Context, key string, fetch func(ctx context.Context) (V, error)) (V, error)
GetLink(ctx context.Context, key string, fetch func(ctx context.Context) (V, error), link ...string) (V, error)
Del(ctx context.Context, key ...string)
DelLocal(ctx context.Context, key ...string)
Stop()
}
func New[V any](opts ...Option) Cache[V] {
@ -19,10 +23,22 @@ func New[V any](opts ...Option) Cache[V] {
}
c := cache[V]{opt: opt}
if opt.localSlotNum > 0 && opt.localSlotSize > 0 {
c.local = local.NewCache[V](opt.localSlotNum, opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict)
go func() {
c.opt.delCh(c.del)
}()
createSimpleLRU := func() lru.LRU[string, V] {
if opt.actively {
return lru.NewActivelyLRU[string, V](opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict)
} else {
return lru.NewInertiaLRU[string, V](opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict)
}
}
if opt.localSlotNum == 1 {
c.local = createSimpleLRU()
} else {
c.local = lru.NewSlotLRU[string, V](opt.localSlotNum, func(key string) uint64 {
h := fnv.New64a()
h.Write(*(*[]byte)(unsafe.Pointer(&key)))
return h.Sum64()
}, createSimpleLRU)
}
if opt.linkSlotNum > 0 {
c.link = link.New(opt.linkSlotNum)
}
@ -33,7 +49,7 @@ func New[V any](opts ...Option) Cache[V] {
type cache[V any] struct {
opt *option
link link.Link
local local.Cache[V]
local lru.LRU[string, V]
}
func (c *cache[V]) onEvict(key string, value V) {
@ -48,22 +64,29 @@ func (c *cache[V]) onEvict(key string, value V) {
}
func (c *cache[V]) del(key ...string) {
if c.local == nil {
return
}
for _, k := range key {
lks := c.link.Del(k)
c.local.Del(k)
if c.link != nil {
lks := c.link.Del(k)
for k := range lks {
c.local.Del(k)
}
}
}
}
func (c *cache[V]) Get(ctx context.Context, key string, fetch func(ctx context.Context) (V, error), opts ...*lopt.Option) (V, error) {
func (c *cache[V]) Get(ctx context.Context, key string, fetch func(ctx context.Context) (V, error)) (V, error) {
return c.GetLink(ctx, key, fetch)
}
func (c *cache[V]) GetLink(ctx context.Context, key string, fetch func(ctx context.Context) (V, error), link ...string) (V, error) {
if c.local != nil {
return c.local.Get(key, func() (V, error) {
if c.link != nil {
for _, o := range opts {
c.link.Link(key, o.Link...)
}
if len(link) > 0 {
c.link.Link(key, link...)
}
return fetch(ctx)
})
@ -73,13 +96,16 @@ func (c *cache[V]) Get(ctx context.Context, key string, fetch func(ctx context.C
}
func (c *cache[V]) Del(ctx context.Context, key ...string) {
if len(key) == 0 {
return
}
for _, fn := range c.opt.delFn {
fn(ctx, key...)
}
if c.local != nil {
c.del(key...)
}
}
func (c *cache[V]) DelLocal(ctx context.Context, key ...string) {
c.del(key...)
}
func (c *cache[V]) Stop() {
c.local.Stop()
}

View File

@ -1,50 +0,0 @@
package local
import (
"hash/fnv"
"time"
"unsafe"
)
type Cache[V any] interface {
Get(key string, fetch func() (V, error)) (V, error)
Del(key string) bool
}
func NewCache[V any](slotNum, slotSize int, successTTL, failedTTL time.Duration, target Target, onEvict EvictCallback[string, V]) Cache[V] {
c := &slot[V]{
n: uint64(slotNum),
slots: make([]*InertiaLRU[string, V], slotNum),
target: target,
}
for i := 0; i < slotNum; i++ {
c.slots[i] = NewInertiaLRU[string, V](slotSize, successTTL, failedTTL, c.target, onEvict)
}
return c
}
type slot[V any] struct {
n uint64
slots []*InertiaLRU[string, V]
target Target
}
func (c *slot[V]) index(s string) uint64 {
h := fnv.New64a()
_, _ = h.Write(*(*[]byte)(unsafe.Pointer(&s)))
return h.Sum64() % c.n
}
func (c *slot[V]) Get(key string, fetch func() (V, error)) (V, error) {
return c.slots[c.index(key)].Get(key, fetch)
}
func (c *slot[V]) Del(key string) bool {
if c.slots[c.index(key)].Del(key) {
c.target.IncrDelHit()
return true
} else {
c.target.IncrDelNotFound()
return false
}
}

View File

@ -1 +0,0 @@
package local

View File

@ -1,4 +1,4 @@
package local
package lru
import "github.com/hashicorp/golang-lru/v2/simplelru"

View File

@ -1,4 +1,4 @@
package local
package lru
import (
"github.com/hashicorp/golang-lru/v2/expirable"
@ -6,15 +6,15 @@ import (
"time"
)
func NewExpirableLRU[K comparable, V any](size int, successTTL, failedTTL time.Duration, target Target, onEvict EvictCallback[K, V]) LRU[K, V] {
var cb expirable.EvictCallback[K, *expirableLruItem[V]]
func NewActivelyLRU[K comparable, V any](size int, successTTL, failedTTL time.Duration, target Target, onEvict EvictCallback[K, V]) LRU[K, V] {
var cb expirable.EvictCallback[K, *activelyLruItem[V]]
if onEvict != nil {
cb = func(key K, value *expirableLruItem[V]) {
cb = func(key K, value *activelyLruItem[V]) {
onEvict(key, value.value)
}
}
core := expirable.NewLRU[K, *expirableLruItem[V]](size, cb, successTTL)
return &expirableLRU[K, V]{
core := expirable.NewLRU[K, *activelyLruItem[V]](size, cb, successTTL)
return &activelyLRU[K, V]{
core: core,
successTTL: successTTL,
failedTTL: failedTTL,
@ -22,21 +22,21 @@ func NewExpirableLRU[K comparable, V any](size int, successTTL, failedTTL time.D
}
}
type expirableLruItem[V any] struct {
type activelyLruItem[V any] struct {
lock sync.RWMutex
err error
value V
}
type expirableLRU[K comparable, V any] struct {
type activelyLRU[K comparable, V any] struct {
lock sync.Mutex
core *expirable.LRU[K, *expirableLruItem[V]]
core *expirable.LRU[K, *activelyLruItem[V]]
successTTL time.Duration
failedTTL time.Duration
target Target
}
func (x *expirableLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) {
func (x *activelyLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) {
x.lock.Lock()
v, ok := x.core.Get(key)
if ok {
@ -46,7 +46,7 @@ func (x *expirableLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) {
defer v.lock.RUnlock()
return v.value, v.err
} else {
v = &expirableLruItem[V]{}
v = &activelyLruItem[V]{}
x.core.Add(key, v)
v.lock.Lock()
x.lock.Unlock()
@ -62,12 +62,17 @@ func (x *expirableLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) {
}
}
func (x *expirableLRU[K, V]) Del(key K) bool {
func (x *activelyLRU[K, V]) Del(key K) bool {
x.lock.Lock()
ok := x.core.Remove(key)
x.lock.Unlock()
if ok {
x.target.IncrDelHit()
} else {
x.target.IncrDelNotFound()
}
return ok
}
func (x *expirableLRU[K, V]) Stop() {
func (x *activelyLRU[K, V]) Stop() {
}

View File

@ -1,4 +1,4 @@
package local
package lru
import (
"github.com/hashicorp/golang-lru/v2/simplelru"
@ -77,6 +77,11 @@ func (x *InertiaLRU[K, V]) Del(key K) bool {
x.lock.Lock()
ok := x.core.Remove(key)
x.lock.Unlock()
if ok {
x.target.IncrDelHit()
} else {
x.target.IncrDelNotFound()
}
return ok
}

View File

@ -1,11 +1,13 @@
package local
package lru
import (
"fmt"
"hash/fnv"
"sync"
"sync/atomic"
"testing"
"time"
"unsafe"
)
type cacheTarget struct {
@ -42,7 +44,13 @@ func (r *cacheTarget) String() string {
func TestName(t *testing.T) {
target := &cacheTarget{}
l := NewCache[string](100, 1000, time.Second*20, time.Second*5, target, nil)
l := NewSlotLRU[string, string](100, func(k string) uint64 {
h := fnv.New64a()
h.Write(*(*[]byte)(unsafe.Pointer(&k)))
return h.Sum64()
}, func() LRU[string, string] {
return NewActivelyLRU[string, string](100, time.Second*60, time.Second, target, nil)
})
//l := NewInertiaLRU[string, string](1000, time.Second*20, time.Second*5, target)
fn := func(key string, n int, fetch func() (string, error)) {
@ -53,8 +61,9 @@ func TestName(t *testing.T) {
//} else {
// t.Error("key", key, err)
//}
l.Get(key, fetch)
v, err := l.Get(key, fetch)
//time.Sleep(time.Second / 100)
func(v ...any) {}(v, err)
}
}

View File

@ -0,0 +1,37 @@
package lru
func NewSlotLRU[K comparable, V any](slotNum int, hash func(K) uint64, create func() LRU[K, V]) LRU[K, V] {
x := &slotLRU[K, V]{
n: uint64(slotNum),
slots: make([]LRU[K, V], slotNum),
hash: hash,
}
for i := 0; i < slotNum; i++ {
x.slots[i] = create()
}
return x
}
type slotLRU[K comparable, V any] struct {
n uint64
slots []LRU[K, V]
hash func(k K) uint64
}
func (x *slotLRU[K, V]) getIndex(k K) uint64 {
return x.hash(k) % x.n
}
func (x *slotLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) {
return x.slots[x.getIndex(key)].Get(key, fetch)
}
func (x *slotLRU[K, V]) Del(key K) bool {
return x.slots[x.getIndex(key)].Del(key)
}
func (x *slotLRU[K, V]) Stop() {
for _, slot := range x.slots {
slot.Stop()
}
}

View File

@ -2,7 +2,7 @@ package localcache
import (
"context"
"github.com/openimsdk/localcache/local"
"github.com/openimsdk/localcache/lru"
"time"
)
@ -11,6 +11,7 @@ func defaultOption() *option {
localSlotNum: 500,
localSlotSize: 20000,
linkSlotNum: 500,
actively: false,
localSuccessTTL: time.Minute,
localFailedTTL: time.Second * 5,
delFn: make([]func(ctx context.Context, key ...string), 0, 2),
@ -22,15 +23,28 @@ type option struct {
localSlotNum int
localSlotSize int
linkSlotNum int
actively bool
localSuccessTTL time.Duration
localFailedTTL time.Duration
delFn []func(ctx context.Context, key ...string)
delCh func(fn func(key ...string))
target local.Target
target lru.Target
}
type Option func(o *option)
func WithActively() Option {
return func(o *option) {
o.actively = true
}
}
func WithInertia() Option {
return func(o *option) {
o.actively = false
}
}
func WithLocalDisable() Option {
return WithLinkSlotNum(0)
}
@ -75,7 +89,7 @@ func WithLocalFailedTTL(localFailedTTL time.Duration) Option {
}
}
func WithTarget(target local.Target) Option {
func WithTarget(target lru.Target) Option {
if target == nil {
panic("target should not be nil")
}

View File

@ -1,20 +0,0 @@
package option
func NewOption() *Option {
return &Option{}
}
type Option struct {
Link []string
}
func (o *Option) WithLink(key ...string) *Option {
if len(key) > 0 {
if len(o.Link) == 0 {
o.Link = key
} else {
o.Link = append(o.Link, key...)
}
}
return o
}

View File

@ -11,14 +11,17 @@ import (
func NewConversationLocalCache(client rpcclient.ConversationRpcClient, cli redis.UniversalClient) *ConversationLocalCache {
return &ConversationLocalCache{
local: localcache.New[any](localcache.WithRedisDeleteSubscribe(config.Config.LocalCache.Conversation.Topic, cli)),
client: client,
local: localcache.New[any](
localcache.WithLocalSlotNum(config.Config.LocalCache.Conversation.SlotNum),
localcache.WithLocalSlotSize(config.Config.LocalCache.Conversation.SlotSize),
),
}
}
type ConversationLocalCache struct {
local localcache.Cache[any]
client rpcclient.ConversationRpcClient
local localcache.Cache[any]
}
func (c *ConversationLocalCache) GetConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) {

View File

@ -4,7 +4,6 @@ import (
"context"
"github.com/OpenIMSDK/tools/log"
"github.com/openimsdk/localcache"
"github.com/openimsdk/localcache/option"
"github.com/openimsdk/open-im-server/v3/pkg/common/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
@ -13,31 +12,19 @@ import (
func NewFriendLocalCache(client rpcclient.FriendRpcClient, cli redis.UniversalClient) *FriendLocalCache {
return &FriendLocalCache{
local: localcache.New[any](localcache.WithRedisDeleteSubscribe(config.Config.LocalCache.Friend.Topic, cli)),
client: client,
local: localcache.New[any](
localcache.WithLocalSlotNum(config.Config.LocalCache.Friend.SlotNum),
localcache.WithLocalSlotSize(config.Config.LocalCache.Friend.SlotSize),
),
}
}
type FriendLocalCache struct {
local localcache.Cache[any]
client rpcclient.FriendRpcClient
local localcache.Cache[any]
}
//func (f *FriendLocalCache) GetFriendIDs(ctx context.Context, ownerUserID string) (val []string, err error) {
// log.ZDebug(ctx, "FriendLocalCache GetFriendIDs req", "ownerUserID", ownerUserID)
// defer func() {
// if err == nil {
// log.ZDebug(ctx, "FriendLocalCache GetFriendIDs return", "value", val)
// } else {
// log.ZError(ctx, "FriendLocalCache GetFriendIDs return", err)
// }
// }()
// return localcache.AnyValue[[]string](f.local.Get(ctx, cachekey.GetFriendIDsKey(ownerUserID), func(ctx context.Context) (any, error) {
// log.ZDebug(ctx, "FriendLocalCache GetFriendIDs call rpc", "ownerUserID", ownerUserID)
// return f.client.GetFriendIDs(ctx, ownerUserID)
// }))
//}
func (f *FriendLocalCache) IsFriend(ctx context.Context, possibleFriendUserID, userID string) (val bool, err error) {
log.ZDebug(ctx, "FriendLocalCache IsFriend req", "possibleFriendUserID", possibleFriendUserID, "userID", userID)
defer func() {
@ -47,10 +34,10 @@ func (f *FriendLocalCache) IsFriend(ctx context.Context, possibleFriendUserID, u
log.ZError(ctx, "FriendLocalCache IsFriend return", err)
}
}()
return localcache.AnyValue[bool](f.local.Get(ctx, cachekey.GetIsFriendKey(possibleFriendUserID, userID), func(ctx context.Context) (any, error) {
return localcache.AnyValue[bool](f.local.GetLink(ctx, cachekey.GetIsFriendKey(possibleFriendUserID, userID), func(ctx context.Context) (any, error) {
log.ZDebug(ctx, "FriendLocalCache IsFriend rpc", "possibleFriendUserID", possibleFriendUserID, "userID", userID)
return f.client.IsFriend(ctx, possibleFriendUserID, userID)
}, option.NewOption().WithLink(cachekey.GetFriendIDsKey(possibleFriendUserID))))
}, cachekey.GetFriendIDsKey(possibleFriendUserID)))
}
// IsBlack possibleBlackUserID selfUserID
@ -63,8 +50,8 @@ func (f *FriendLocalCache) IsBlack(ctx context.Context, possibleBlackUserID, use
log.ZError(ctx, "FriendLocalCache IsBlack return", err)
}
}()
return localcache.AnyValue[bool](f.local.Get(ctx, cachekey.GetIsBlackIDsKey(possibleBlackUserID, userID), func(ctx context.Context) (any, error) {
return localcache.AnyValue[bool](f.local.GetLink(ctx, cachekey.GetIsBlackIDsKey(possibleBlackUserID, userID), func(ctx context.Context) (any, error) {
log.ZDebug(ctx, "FriendLocalCache IsBlack rpc", "possibleBlackUserID", possibleBlackUserID, "userID", userID)
return f.client.IsBlack(ctx, possibleBlackUserID, userID)
}, option.NewOption().WithLink(cachekey.GetBlackIDsKey(userID))))
}, cachekey.GetBlackIDsKey(userID)))
}

View File

@ -11,14 +11,17 @@ import (
func NewGroupLocalCache(client rpcclient.GroupRpcClient, cli redis.UniversalClient) *GroupLocalCache {
return &GroupLocalCache{
local: localcache.New[any](localcache.WithRedisDeleteSubscribe(config.Config.LocalCache.Group.Topic, cli)),
client: client,
local: localcache.New[any](
localcache.WithLocalSlotNum(config.Config.LocalCache.Group.SlotNum),
localcache.WithLocalSlotSize(config.Config.LocalCache.Group.SlotSize),
),
}
}
type GroupLocalCache struct {
local localcache.Cache[any]
client rpcclient.GroupRpcClient
local localcache.Cache[any]
}
func (g *GroupLocalCache) GetGroupMemberIDs(ctx context.Context, groupID string) ([]string, error) {