From 119a2c2247a7420b6ecd229835eac0978841e073 Mon Sep 17 00:00:00 2001 From: Gordon <46924906+FGadvancer@users.noreply.github.com> Date: Mon, 22 Jan 2024 21:14:21 +0800 Subject: [PATCH] refactor: rename cache. --- pkg/common/redispubsub/redissubscriber.go | 13 +++++++--- pkg/localcache/cache.go | 7 +++--- pkg/localcache/cache_test.go | 2 +- .../{lru_actively.go => lru_expiration.go} | 24 +++++++++---------- .../lru/{lru_inertia.go => lru_lazy.go} | 24 +++++++++---------- .../{lru_inertia_test.go => lru_lazy_test.go} | 2 +- pkg/localcache/option.go | 20 +++++++++------- 7 files changed, 51 insertions(+), 41 deletions(-) rename pkg/localcache/lru/{lru_actively.go => lru_expiration.go} (57%) rename pkg/localcache/lru/{lru_inertia.go => lru_lazy.go} (65%) rename pkg/localcache/lru/{lru_inertia_test.go => lru_lazy_test.go} (96%) diff --git a/pkg/common/redispubsub/redissubscriber.go b/pkg/common/redispubsub/redissubscriber.go index 69cfd8a69..a7029a993 100644 --- a/pkg/common/redispubsub/redissubscriber.go +++ b/pkg/common/redispubsub/redissubscriber.go @@ -16,12 +16,19 @@ func NewSubscriber(client redis.UniversalClient, channel string) *Subscriber { return &Subscriber{client: client, channel: channel} } -func (s *Subscriber) OnMessage(callback func(string)) error { +func (s *Subscriber) OnMessage(ctx context.Context, callback func(string)) error { messageChannel := s.client.Subscribe(ctx, s.channel).Channel() + go func() { - for msg := range messageChannel { - callback(msg.Payload) + for { + select { + case <-ctx.Done(): + return + case msg := <-messageChannel: + callback(msg.Payload) + } } }() + return nil } diff --git a/pkg/localcache/cache.go b/pkg/localcache/cache.go index ed0e16419..4b405b46a 100644 --- a/pkg/localcache/cache.go +++ b/pkg/localcache/cache.go @@ -21,13 +21,14 @@ func New[V any](opts ...Option) Cache[V] { for _, o := range opts { o(opt) } + c := cache[V]{opt: opt} if opt.localSlotNum > 0 && opt.localSlotSize > 0 { createSimpleLRU := func() lru.LRU[string, V] { - if opt.actively { - return lru.NewActivelyLRU[string, V](opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict) + if opt.expirationEvict { + return lru.NewExpirationLRU[string, V](opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict) } else { - return lru.NewInertiaLRU[string, V](opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict) + return lru.NewLayLRU[string, V](opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict) } } if opt.localSlotNum == 1 { diff --git a/pkg/localcache/cache_test.go b/pkg/localcache/cache_test.go index 90413fd20..c497b7b4a 100644 --- a/pkg/localcache/cache_test.go +++ b/pkg/localcache/cache_test.go @@ -11,7 +11,7 @@ import ( ) func TestName(t *testing.T) { - c := New[string](WithActively()) + c := New[string](WithExpirationEvict()) //c := New[string]() ctx := context.Background() diff --git a/pkg/localcache/lru/lru_actively.go b/pkg/localcache/lru/lru_expiration.go similarity index 57% rename from pkg/localcache/lru/lru_actively.go rename to pkg/localcache/lru/lru_expiration.go index 55fa4d0d1..3cf61f061 100644 --- a/pkg/localcache/lru/lru_actively.go +++ b/pkg/localcache/lru/lru_expiration.go @@ -6,15 +6,15 @@ import ( "time" ) -func NewActivelyLRU[K comparable, V any](size int, successTTL, failedTTL time.Duration, target Target, onEvict EvictCallback[K, V]) LRU[K, V] { - var cb expirable.EvictCallback[K, *activelyLruItem[V]] +func NewExpirationLRU[K comparable, V any](size int, successTTL, failedTTL time.Duration, target Target, onEvict EvictCallback[K, V]) LRU[K, V] { + var cb expirable.EvictCallback[K, *expirationLruItem[V]] if onEvict != nil { - cb = func(key K, value *activelyLruItem[V]) { + cb = func(key K, value *expirationLruItem[V]) { onEvict(key, value.value) } } - core := expirable.NewLRU[K, *activelyLruItem[V]](size, cb, successTTL) - return &activelyLRU[K, V]{ + core := expirable.NewLRU[K, *expirationLruItem[V]](size, cb, successTTL) + return &ExpirationLRU[K, V]{ core: core, successTTL: successTTL, failedTTL: failedTTL, @@ -22,21 +22,21 @@ func NewActivelyLRU[K comparable, V any](size int, successTTL, failedTTL time.Du } } -type activelyLruItem[V any] struct { +type expirationLruItem[V any] struct { lock sync.RWMutex err error value V } -type activelyLRU[K comparable, V any] struct { +type ExpirationLRU[K comparable, V any] struct { lock sync.Mutex - core *expirable.LRU[K, *activelyLruItem[V]] + core *expirable.LRU[K, *expirationLruItem[V]] successTTL time.Duration failedTTL time.Duration target Target } -func (x *activelyLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) { +func (x *ExpirationLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) { x.lock.Lock() v, ok := x.core.Get(key) if ok { @@ -46,7 +46,7 @@ func (x *activelyLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) { defer v.lock.RUnlock() return v.value, v.err } else { - v = &activelyLruItem[V]{} + v = &expirationLruItem[V]{} x.core.Add(key, v) v.lock.Lock() x.lock.Unlock() @@ -62,7 +62,7 @@ func (x *activelyLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) { } } -func (x *activelyLRU[K, V]) Del(key K) bool { +func (x *ExpirationLRU[K, V]) Del(key K) bool { x.lock.Lock() ok := x.core.Remove(key) x.lock.Unlock() @@ -74,5 +74,5 @@ func (x *activelyLRU[K, V]) Del(key K) bool { return ok } -func (x *activelyLRU[K, V]) Stop() { +func (x *ExpirationLRU[K, V]) Stop() { } diff --git a/pkg/localcache/lru/lru_inertia.go b/pkg/localcache/lru/lru_lazy.go similarity index 65% rename from pkg/localcache/lru/lru_inertia.go rename to pkg/localcache/lru/lru_lazy.go index b1f9f24af..a9270ea4a 100644 --- a/pkg/localcache/lru/lru_inertia.go +++ b/pkg/localcache/lru/lru_lazy.go @@ -6,25 +6,25 @@ import ( "time" ) -type inertiaLruItem[V any] struct { +type layLruItem[V any] struct { lock sync.Mutex expires int64 err error value V } -func NewInertiaLRU[K comparable, V any](size int, successTTL, failedTTL time.Duration, target Target, onEvict EvictCallback[K, V]) *InertiaLRU[K, V] { - var cb simplelru.EvictCallback[K, *inertiaLruItem[V]] +func NewLayLRU[K comparable, V any](size int, successTTL, failedTTL time.Duration, target Target, onEvict EvictCallback[K, V]) *LayLRU[K, V] { + var cb simplelru.EvictCallback[K, *layLruItem[V]] if onEvict != nil { - cb = func(key K, value *inertiaLruItem[V]) { + cb = func(key K, value *layLruItem[V]) { onEvict(key, value.value) } } - core, err := simplelru.NewLRU[K, *inertiaLruItem[V]](size, cb) + core, err := simplelru.NewLRU[K, *layLruItem[V]](size, cb) if err != nil { panic(err) } - return &InertiaLRU[K, V]{ + return &LayLRU[K, V]{ core: core, successTTL: successTTL, failedTTL: failedTTL, @@ -32,15 +32,15 @@ func NewInertiaLRU[K comparable, V any](size int, successTTL, failedTTL time.Dur } } -type InertiaLRU[K comparable, V any] struct { +type LayLRU[K comparable, V any] struct { lock sync.Mutex - core *simplelru.LRU[K, *inertiaLruItem[V]] + core *simplelru.LRU[K, *layLruItem[V]] successTTL time.Duration failedTTL time.Duration target Target } -func (x *InertiaLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) { +func (x *LayLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) { x.lock.Lock() v, ok := x.core.Get(key) if ok { @@ -53,7 +53,7 @@ func (x *InertiaLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) { return value, err } } else { - v = &inertiaLruItem[V]{} + v = &layLruItem[V]{} x.core.Add(key, v) v.lock.Lock() x.lock.Unlock() @@ -73,7 +73,7 @@ func (x *InertiaLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) { return v.value, v.err } -func (x *InertiaLRU[K, V]) Del(key K) bool { +func (x *LayLRU[K, V]) Del(key K) bool { x.lock.Lock() ok := x.core.Remove(key) x.lock.Unlock() @@ -85,6 +85,6 @@ func (x *InertiaLRU[K, V]) Del(key K) bool { return ok } -func (x *InertiaLRU[K, V]) Stop() { +func (x *LayLRU[K, V]) Stop() { } diff --git a/pkg/localcache/lru/lru_inertia_test.go b/pkg/localcache/lru/lru_lazy_test.go similarity index 96% rename from pkg/localcache/lru/lru_inertia_test.go rename to pkg/localcache/lru/lru_lazy_test.go index df4919d2d..09fd04cd3 100644 --- a/pkg/localcache/lru/lru_inertia_test.go +++ b/pkg/localcache/lru/lru_lazy_test.go @@ -49,7 +49,7 @@ func TestName(t *testing.T) { h.Write(*(*[]byte)(unsafe.Pointer(&k))) return h.Sum64() }, func() LRU[string, string] { - return NewActivelyLRU[string, string](100, time.Second*60, time.Second, target, nil) + return NewExpirationLRU[string, string](100, time.Second*60, time.Second, target, nil) }) //l := NewInertiaLRU[string, string](1000, time.Second*20, time.Second*5, target) diff --git a/pkg/localcache/option.go b/pkg/localcache/option.go index 17780161a..ecb5da0e6 100644 --- a/pkg/localcache/option.go +++ b/pkg/localcache/option.go @@ -11,7 +11,7 @@ func defaultOption() *option { localSlotNum: 500, localSlotSize: 20000, linkSlotNum: 500, - actively: false, + expirationEvict: false, localSuccessTTL: time.Minute, localFailedTTL: time.Second * 5, delFn: make([]func(ctx context.Context, key ...string), 0, 2), @@ -20,10 +20,12 @@ func defaultOption() *option { } type option struct { - localSlotNum int - localSlotSize int - linkSlotNum int - actively bool + localSlotNum int + localSlotSize int + linkSlotNum int + // expirationEvict: true means that the cache will be actively cleared when the timer expires, + // false means that the cache will be lazily deleted. + expirationEvict bool localSuccessTTL time.Duration localFailedTTL time.Duration delFn []func(ctx context.Context, key ...string) @@ -32,15 +34,15 @@ type option struct { type Option func(o *option) -func WithActively() Option { +func WithExpirationEvict() Option { return func(o *option) { - o.actively = true + o.expirationEvict = true } } -func WithInertia() Option { +func WithLazy() Option { return func(o *option) { - o.actively = false + o.expirationEvict = false } }