mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-05-05 09:46:49 +08:00
refactor: rename cache.
This commit is contained in:
parent
9660556f35
commit
119a2c2247
@ -16,12 +16,19 @@ func NewSubscriber(client redis.UniversalClient, channel string) *Subscriber {
|
|||||||
return &Subscriber{client: client, channel: channel}
|
return &Subscriber{client: client, channel: channel}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Subscriber) OnMessage(callback func(string)) error {
|
func (s *Subscriber) OnMessage(ctx context.Context, callback func(string)) error {
|
||||||
messageChannel := s.client.Subscribe(ctx, s.channel).Channel()
|
messageChannel := s.client.Subscribe(ctx, s.channel).Channel()
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
for msg := range messageChannel {
|
for {
|
||||||
callback(msg.Payload)
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case msg := <-messageChannel:
|
||||||
|
callback(msg.Payload)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -21,13 +21,14 @@ func New[V any](opts ...Option) Cache[V] {
|
|||||||
for _, o := range opts {
|
for _, o := range opts {
|
||||||
o(opt)
|
o(opt)
|
||||||
}
|
}
|
||||||
|
|
||||||
c := cache[V]{opt: opt}
|
c := cache[V]{opt: opt}
|
||||||
if opt.localSlotNum > 0 && opt.localSlotSize > 0 {
|
if opt.localSlotNum > 0 && opt.localSlotSize > 0 {
|
||||||
createSimpleLRU := func() lru.LRU[string, V] {
|
createSimpleLRU := func() lru.LRU[string, V] {
|
||||||
if opt.actively {
|
if opt.expirationEvict {
|
||||||
return lru.NewActivelyLRU[string, V](opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict)
|
return lru.NewExpirationLRU[string, V](opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict)
|
||||||
} else {
|
} else {
|
||||||
return lru.NewInertiaLRU[string, V](opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict)
|
return lru.NewLayLRU[string, V](opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if opt.localSlotNum == 1 {
|
if opt.localSlotNum == 1 {
|
||||||
|
@ -11,7 +11,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func TestName(t *testing.T) {
|
func TestName(t *testing.T) {
|
||||||
c := New[string](WithActively())
|
c := New[string](WithExpirationEvict())
|
||||||
//c := New[string]()
|
//c := New[string]()
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
|
@ -6,15 +6,15 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
func NewActivelyLRU[K comparable, V any](size int, successTTL, failedTTL time.Duration, target Target, onEvict EvictCallback[K, V]) LRU[K, V] {
|
func NewExpirationLRU[K comparable, V any](size int, successTTL, failedTTL time.Duration, target Target, onEvict EvictCallback[K, V]) LRU[K, V] {
|
||||||
var cb expirable.EvictCallback[K, *activelyLruItem[V]]
|
var cb expirable.EvictCallback[K, *expirationLruItem[V]]
|
||||||
if onEvict != nil {
|
if onEvict != nil {
|
||||||
cb = func(key K, value *activelyLruItem[V]) {
|
cb = func(key K, value *expirationLruItem[V]) {
|
||||||
onEvict(key, value.value)
|
onEvict(key, value.value)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
core := expirable.NewLRU[K, *activelyLruItem[V]](size, cb, successTTL)
|
core := expirable.NewLRU[K, *expirationLruItem[V]](size, cb, successTTL)
|
||||||
return &activelyLRU[K, V]{
|
return &ExpirationLRU[K, V]{
|
||||||
core: core,
|
core: core,
|
||||||
successTTL: successTTL,
|
successTTL: successTTL,
|
||||||
failedTTL: failedTTL,
|
failedTTL: failedTTL,
|
||||||
@ -22,21 +22,21 @@ func NewActivelyLRU[K comparable, V any](size int, successTTL, failedTTL time.Du
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type activelyLruItem[V any] struct {
|
type expirationLruItem[V any] struct {
|
||||||
lock sync.RWMutex
|
lock sync.RWMutex
|
||||||
err error
|
err error
|
||||||
value V
|
value V
|
||||||
}
|
}
|
||||||
|
|
||||||
type activelyLRU[K comparable, V any] struct {
|
type ExpirationLRU[K comparable, V any] struct {
|
||||||
lock sync.Mutex
|
lock sync.Mutex
|
||||||
core *expirable.LRU[K, *activelyLruItem[V]]
|
core *expirable.LRU[K, *expirationLruItem[V]]
|
||||||
successTTL time.Duration
|
successTTL time.Duration
|
||||||
failedTTL time.Duration
|
failedTTL time.Duration
|
||||||
target Target
|
target Target
|
||||||
}
|
}
|
||||||
|
|
||||||
func (x *activelyLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) {
|
func (x *ExpirationLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) {
|
||||||
x.lock.Lock()
|
x.lock.Lock()
|
||||||
v, ok := x.core.Get(key)
|
v, ok := x.core.Get(key)
|
||||||
if ok {
|
if ok {
|
||||||
@ -46,7 +46,7 @@ func (x *activelyLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) {
|
|||||||
defer v.lock.RUnlock()
|
defer v.lock.RUnlock()
|
||||||
return v.value, v.err
|
return v.value, v.err
|
||||||
} else {
|
} else {
|
||||||
v = &activelyLruItem[V]{}
|
v = &expirationLruItem[V]{}
|
||||||
x.core.Add(key, v)
|
x.core.Add(key, v)
|
||||||
v.lock.Lock()
|
v.lock.Lock()
|
||||||
x.lock.Unlock()
|
x.lock.Unlock()
|
||||||
@ -62,7 +62,7 @@ func (x *activelyLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (x *activelyLRU[K, V]) Del(key K) bool {
|
func (x *ExpirationLRU[K, V]) Del(key K) bool {
|
||||||
x.lock.Lock()
|
x.lock.Lock()
|
||||||
ok := x.core.Remove(key)
|
ok := x.core.Remove(key)
|
||||||
x.lock.Unlock()
|
x.lock.Unlock()
|
||||||
@ -74,5 +74,5 @@ func (x *activelyLRU[K, V]) Del(key K) bool {
|
|||||||
return ok
|
return ok
|
||||||
}
|
}
|
||||||
|
|
||||||
func (x *activelyLRU[K, V]) Stop() {
|
func (x *ExpirationLRU[K, V]) Stop() {
|
||||||
}
|
}
|
@ -6,25 +6,25 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
type inertiaLruItem[V any] struct {
|
type layLruItem[V any] struct {
|
||||||
lock sync.Mutex
|
lock sync.Mutex
|
||||||
expires int64
|
expires int64
|
||||||
err error
|
err error
|
||||||
value V
|
value V
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewInertiaLRU[K comparable, V any](size int, successTTL, failedTTL time.Duration, target Target, onEvict EvictCallback[K, V]) *InertiaLRU[K, V] {
|
func NewLayLRU[K comparable, V any](size int, successTTL, failedTTL time.Duration, target Target, onEvict EvictCallback[K, V]) *LayLRU[K, V] {
|
||||||
var cb simplelru.EvictCallback[K, *inertiaLruItem[V]]
|
var cb simplelru.EvictCallback[K, *layLruItem[V]]
|
||||||
if onEvict != nil {
|
if onEvict != nil {
|
||||||
cb = func(key K, value *inertiaLruItem[V]) {
|
cb = func(key K, value *layLruItem[V]) {
|
||||||
onEvict(key, value.value)
|
onEvict(key, value.value)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
core, err := simplelru.NewLRU[K, *inertiaLruItem[V]](size, cb)
|
core, err := simplelru.NewLRU[K, *layLruItem[V]](size, cb)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
return &InertiaLRU[K, V]{
|
return &LayLRU[K, V]{
|
||||||
core: core,
|
core: core,
|
||||||
successTTL: successTTL,
|
successTTL: successTTL,
|
||||||
failedTTL: failedTTL,
|
failedTTL: failedTTL,
|
||||||
@ -32,15 +32,15 @@ func NewInertiaLRU[K comparable, V any](size int, successTTL, failedTTL time.Dur
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type InertiaLRU[K comparable, V any] struct {
|
type LayLRU[K comparable, V any] struct {
|
||||||
lock sync.Mutex
|
lock sync.Mutex
|
||||||
core *simplelru.LRU[K, *inertiaLruItem[V]]
|
core *simplelru.LRU[K, *layLruItem[V]]
|
||||||
successTTL time.Duration
|
successTTL time.Duration
|
||||||
failedTTL time.Duration
|
failedTTL time.Duration
|
||||||
target Target
|
target Target
|
||||||
}
|
}
|
||||||
|
|
||||||
func (x *InertiaLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) {
|
func (x *LayLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) {
|
||||||
x.lock.Lock()
|
x.lock.Lock()
|
||||||
v, ok := x.core.Get(key)
|
v, ok := x.core.Get(key)
|
||||||
if ok {
|
if ok {
|
||||||
@ -53,7 +53,7 @@ func (x *InertiaLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) {
|
|||||||
return value, err
|
return value, err
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
v = &inertiaLruItem[V]{}
|
v = &layLruItem[V]{}
|
||||||
x.core.Add(key, v)
|
x.core.Add(key, v)
|
||||||
v.lock.Lock()
|
v.lock.Lock()
|
||||||
x.lock.Unlock()
|
x.lock.Unlock()
|
||||||
@ -73,7 +73,7 @@ func (x *InertiaLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) {
|
|||||||
return v.value, v.err
|
return v.value, v.err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (x *InertiaLRU[K, V]) Del(key K) bool {
|
func (x *LayLRU[K, V]) Del(key K) bool {
|
||||||
x.lock.Lock()
|
x.lock.Lock()
|
||||||
ok := x.core.Remove(key)
|
ok := x.core.Remove(key)
|
||||||
x.lock.Unlock()
|
x.lock.Unlock()
|
||||||
@ -85,6 +85,6 @@ func (x *InertiaLRU[K, V]) Del(key K) bool {
|
|||||||
return ok
|
return ok
|
||||||
}
|
}
|
||||||
|
|
||||||
func (x *InertiaLRU[K, V]) Stop() {
|
func (x *LayLRU[K, V]) Stop() {
|
||||||
|
|
||||||
}
|
}
|
@ -49,7 +49,7 @@ func TestName(t *testing.T) {
|
|||||||
h.Write(*(*[]byte)(unsafe.Pointer(&k)))
|
h.Write(*(*[]byte)(unsafe.Pointer(&k)))
|
||||||
return h.Sum64()
|
return h.Sum64()
|
||||||
}, func() LRU[string, string] {
|
}, func() LRU[string, string] {
|
||||||
return NewActivelyLRU[string, string](100, time.Second*60, time.Second, target, nil)
|
return NewExpirationLRU[string, string](100, time.Second*60, time.Second, target, nil)
|
||||||
})
|
})
|
||||||
//l := NewInertiaLRU[string, string](1000, time.Second*20, time.Second*5, target)
|
//l := NewInertiaLRU[string, string](1000, time.Second*20, time.Second*5, target)
|
||||||
|
|
@ -11,7 +11,7 @@ func defaultOption() *option {
|
|||||||
localSlotNum: 500,
|
localSlotNum: 500,
|
||||||
localSlotSize: 20000,
|
localSlotSize: 20000,
|
||||||
linkSlotNum: 500,
|
linkSlotNum: 500,
|
||||||
actively: false,
|
expirationEvict: false,
|
||||||
localSuccessTTL: time.Minute,
|
localSuccessTTL: time.Minute,
|
||||||
localFailedTTL: time.Second * 5,
|
localFailedTTL: time.Second * 5,
|
||||||
delFn: make([]func(ctx context.Context, key ...string), 0, 2),
|
delFn: make([]func(ctx context.Context, key ...string), 0, 2),
|
||||||
@ -20,10 +20,12 @@ func defaultOption() *option {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type option struct {
|
type option struct {
|
||||||
localSlotNum int
|
localSlotNum int
|
||||||
localSlotSize int
|
localSlotSize int
|
||||||
linkSlotNum int
|
linkSlotNum int
|
||||||
actively bool
|
// expirationEvict: true means that the cache will be actively cleared when the timer expires,
|
||||||
|
// false means that the cache will be lazily deleted.
|
||||||
|
expirationEvict bool
|
||||||
localSuccessTTL time.Duration
|
localSuccessTTL time.Duration
|
||||||
localFailedTTL time.Duration
|
localFailedTTL time.Duration
|
||||||
delFn []func(ctx context.Context, key ...string)
|
delFn []func(ctx context.Context, key ...string)
|
||||||
@ -32,15 +34,15 @@ type option struct {
|
|||||||
|
|
||||||
type Option func(o *option)
|
type Option func(o *option)
|
||||||
|
|
||||||
func WithActively() Option {
|
func WithExpirationEvict() Option {
|
||||||
return func(o *option) {
|
return func(o *option) {
|
||||||
o.actively = true
|
o.expirationEvict = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func WithInertia() Option {
|
func WithLazy() Option {
|
||||||
return func(o *option) {
|
return func(o *option) {
|
||||||
o.actively = false
|
o.expirationEvict = false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user