mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-05-14 23:59:02 +08:00
feat: cache add single-flight and timing-wheel.
This commit is contained in:
parent
006e4a1e93
commit
1f1ab65375
@ -3,10 +3,14 @@ package localcache
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/localcache/link"
|
"github.com/openimsdk/open-im-server/v3/pkg/common/localcache/link"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/localcache/local"
|
|
||||||
opt "github.com/openimsdk/open-im-server/v3/pkg/common/localcache/option"
|
opt "github.com/openimsdk/open-im-server/v3/pkg/common/localcache/option"
|
||||||
|
"hash/fnv"
|
||||||
|
"time"
|
||||||
|
"unsafe"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const TimingWheelSize = 500
|
||||||
|
|
||||||
type Cache[V any] interface {
|
type Cache[V any] interface {
|
||||||
Get(ctx context.Context, key string, fetch func(ctx context.Context) (V, error), opts ...*opt.Option) (V, error)
|
Get(ctx context.Context, key string, fetch func(ctx context.Context) (V, error), opts ...*opt.Option) (V, error)
|
||||||
Del(ctx context.Context, key ...string)
|
Del(ctx context.Context, key ...string)
|
||||||
@ -17,8 +21,15 @@ func New[V any](opts ...Option) Cache[V] {
|
|||||||
for _, o := range opts {
|
for _, o := range opts {
|
||||||
o(opt)
|
o(opt)
|
||||||
}
|
}
|
||||||
c := &cache[V]{opt: opt, link: link.New(opt.localSlotNum)}
|
c := &cache[V]{
|
||||||
c.local = local.NewCache[V](opt.localSlotNum, opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict)
|
opt: opt,
|
||||||
|
link: link.New(opt.localSlotNum),
|
||||||
|
n: uint64(opt.localSlotNum),
|
||||||
|
}
|
||||||
|
c.timingWheel = NewTimeWheel[string, V](TimingWheelSize, time.Second, c.exec)
|
||||||
|
for i := 0; i < opt.localSlotNum; i++ {
|
||||||
|
c.slots[i] = NewLRU[string, V](opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict)
|
||||||
|
}
|
||||||
go func() {
|
go func() {
|
||||||
c.opt.delCh(c.del)
|
c.opt.delCh(c.del)
|
||||||
}()
|
}()
|
||||||
@ -26,16 +37,24 @@ func New[V any](opts ...Option) Cache[V] {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type cache[V any] struct {
|
type cache[V any] struct {
|
||||||
|
n uint64
|
||||||
|
slots []*LRU[string, V]
|
||||||
opt *option
|
opt *option
|
||||||
link link.Link
|
link link.Link
|
||||||
local local.Cache[V]
|
timingWheel *TimeWheel[string, V]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cache[V]) index(key string) uint64 {
|
||||||
|
h := fnv.New64a()
|
||||||
|
_, _ = h.Write(*(*[]byte)(unsafe.Pointer(&key)))
|
||||||
|
return h.Sum64() % c.n
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cache[V]) onEvict(key string, value V) {
|
func (c *cache[V]) onEvict(key string, value V) {
|
||||||
lks := c.link.Del(key)
|
lks := c.link.Del(key)
|
||||||
for k := range lks {
|
for k := range lks {
|
||||||
if key != k { // prevent deadlock
|
if key != k { // prevent deadlock
|
||||||
c.local.Del(k)
|
c.slots[c.index(k)].Del(k)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -43,9 +62,9 @@ func (c *cache[V]) onEvict(key string, value V) {
|
|||||||
func (c *cache[V]) del(key ...string) {
|
func (c *cache[V]) del(key ...string) {
|
||||||
for _, k := range key {
|
for _, k := range key {
|
||||||
lks := c.link.Del(k)
|
lks := c.link.Del(k)
|
||||||
c.local.Del(k)
|
c.slots[c.index(k)].Del(k)
|
||||||
for k := range lks {
|
for k := range lks {
|
||||||
c.local.Del(k)
|
c.slots[c.index(k)].Del(k)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -59,7 +78,7 @@ func (c *cache[V]) Get(ctx context.Context, key string, fetch func(ctx context.C
|
|||||||
if len(opts) > 0 && len(opts[0].Link) > 0 {
|
if len(opts) > 0 && len(opts[0].Link) > 0 {
|
||||||
c.link.Link(key, opts[0].Link...)
|
c.link.Link(key, opts[0].Link...)
|
||||||
}
|
}
|
||||||
return c.local.Get(key, func() (V, error) {
|
return c.slots[c.index(key)].Get(key, func() (V, error) {
|
||||||
return fetch(ctx)
|
return fetch(ctx)
|
||||||
})
|
})
|
||||||
} else {
|
} else {
|
||||||
@ -78,3 +97,6 @@ func (c *cache[V]) Del(ctx context.Context, key ...string) {
|
|||||||
c.del(key...)
|
c.del(key...)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
func (c *cache[V]) exec(key string, value V) {
|
||||||
|
|
||||||
|
}
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
package local
|
package localcache
|
||||||
|
|
||||||
import "github.com/hashicorp/golang-lru/v2/simplelru"
|
import "github.com/hashicorp/golang-lru/v2/simplelru"
|
||||||
|
|
@ -1,50 +0,0 @@
|
|||||||
package local
|
|
||||||
|
|
||||||
import (
|
|
||||||
"hash/fnv"
|
|
||||||
"time"
|
|
||||||
"unsafe"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Cache[V any] interface {
|
|
||||||
Get(key string, fetch func() (V, error)) (V, error)
|
|
||||||
Del(key string) bool
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewCache[V any](slotNum, slotSize int, successTTL, failedTTL time.Duration, target Target, onEvict EvictCallback[string, V]) Cache[V] {
|
|
||||||
c := &slot[V]{
|
|
||||||
n: uint64(slotNum),
|
|
||||||
slots: make([]*LRU[string, V], slotNum),
|
|
||||||
target: target,
|
|
||||||
}
|
|
||||||
for i := 0; i < slotNum; i++ {
|
|
||||||
c.slots[i] = NewLRU[string, V](slotSize, successTTL, failedTTL, c.target, onEvict)
|
|
||||||
}
|
|
||||||
return c
|
|
||||||
}
|
|
||||||
|
|
||||||
type slot[V any] struct {
|
|
||||||
n uint64
|
|
||||||
slots []*LRU[string, V]
|
|
||||||
target Target
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *slot[V]) index(s string) uint64 {
|
|
||||||
h := fnv.New64a()
|
|
||||||
_, _ = h.Write(*(*[]byte)(unsafe.Pointer(&s)))
|
|
||||||
return h.Sum64() % c.n
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *slot[V]) Get(key string, fetch func() (V, error)) (V, error) {
|
|
||||||
return c.slots[c.index(key)].Get(key, fetch)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *slot[V]) Del(key string) bool {
|
|
||||||
if c.slots[c.index(key)].Del(key) {
|
|
||||||
c.target.IncrDelHit()
|
|
||||||
return true
|
|
||||||
} else {
|
|
||||||
c.target.IncrDelNotFound()
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,95 +0,0 @@
|
|||||||
package local
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"sync"
|
|
||||||
"sync/atomic"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
type cacheTarget struct {
|
|
||||||
getHit int64
|
|
||||||
getSuccess int64
|
|
||||||
getFailed int64
|
|
||||||
delHit int64
|
|
||||||
delNotFound int64
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *cacheTarget) IncrGetHit() {
|
|
||||||
atomic.AddInt64(&r.getHit, 1)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *cacheTarget) IncrGetSuccess() {
|
|
||||||
atomic.AddInt64(&r.getSuccess, 1)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *cacheTarget) IncrGetFailed() {
|
|
||||||
atomic.AddInt64(&r.getFailed, 1)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *cacheTarget) IncrDelHit() {
|
|
||||||
atomic.AddInt64(&r.delHit, 1)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *cacheTarget) IncrDelNotFound() {
|
|
||||||
atomic.AddInt64(&r.delNotFound, 1)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *cacheTarget) String() string {
|
|
||||||
return fmt.Sprintf("getHit: %d, getSuccess: %d, getFailed: %d, delHit: %d, delNotFound: %d", r.getHit, r.getSuccess, r.getFailed, r.delHit, r.delNotFound)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestName(t *testing.T) {
|
|
||||||
target := &cacheTarget{}
|
|
||||||
l := NewCache[string](100, 1000, time.Second*20, time.Second*5, target, nil)
|
|
||||||
//l := NewLRU[string, string](1000, time.Second*20, time.Second*5, target)
|
|
||||||
|
|
||||||
fn := func(key string, n int, fetch func() (string, error)) {
|
|
||||||
for i := 0; i < n; i++ {
|
|
||||||
//v, err := l.Get(key, fetch)
|
|
||||||
//if err == nil {
|
|
||||||
// t.Log("key", key, "value", v)
|
|
||||||
//} else {
|
|
||||||
// t.Error("key", key, err)
|
|
||||||
//}
|
|
||||||
l.Get(key, fetch)
|
|
||||||
//time.Sleep(time.Second / 100)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
tmp := make(map[string]struct{})
|
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
for i := 0; i < 10000; i++ {
|
|
||||||
wg.Add(1)
|
|
||||||
key := fmt.Sprintf("key_%d", i%200)
|
|
||||||
tmp[key] = struct{}{}
|
|
||||||
go func() {
|
|
||||||
defer wg.Done()
|
|
||||||
//t.Log(key)
|
|
||||||
fn(key, 10000, func() (string, error) {
|
|
||||||
//time.Sleep(time.Second * 3)
|
|
||||||
//t.Log(time.Now(), "key", key, "fetch")
|
|
||||||
//if rand.Uint32()%5 == 0 {
|
|
||||||
// return "value_" + key, nil
|
|
||||||
//}
|
|
||||||
//return "", errors.New("rand error")
|
|
||||||
return "value_" + key, nil
|
|
||||||
})
|
|
||||||
}()
|
|
||||||
|
|
||||||
//wg.Add(1)
|
|
||||||
//go func() {
|
|
||||||
// defer wg.Done()
|
|
||||||
// for i := 0; i < 10; i++ {
|
|
||||||
// l.Del(key)
|
|
||||||
// time.Sleep(time.Second / 3)
|
|
||||||
// }
|
|
||||||
//}()
|
|
||||||
}
|
|
||||||
wg.Wait()
|
|
||||||
t.Log(len(tmp))
|
|
||||||
t.Log(target.String())
|
|
||||||
|
|
||||||
}
|
|
@ -1,10 +0,0 @@
|
|||||||
package local
|
|
||||||
|
|
||||||
type Target interface {
|
|
||||||
IncrGetHit()
|
|
||||||
IncrGetSuccess()
|
|
||||||
IncrGetFailed()
|
|
||||||
|
|
||||||
IncrDelHit()
|
|
||||||
IncrDelNotFound()
|
|
||||||
}
|
|
@ -1,4 +1,4 @@
|
|||||||
package local
|
package localcache
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/hashicorp/golang-lru/v2/simplelru"
|
"github.com/hashicorp/golang-lru/v2/simplelru"
|
||||||
@ -30,6 +30,7 @@ func NewLRU[K comparable, V any](size int, successTTL, failedTTL time.Duration,
|
|||||||
successTTL: successTTL,
|
successTTL: successTTL,
|
||||||
failedTTL: failedTTL,
|
failedTTL: failedTTL,
|
||||||
target: target,
|
target: target,
|
||||||
|
s: NewSingleFlight[K, V](),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -39,6 +40,7 @@ type LRU[K comparable, V any] struct {
|
|||||||
successTTL time.Duration
|
successTTL time.Duration
|
||||||
failedTTL time.Duration
|
failedTTL time.Duration
|
||||||
target Target
|
target Target
|
||||||
|
s *SingleFlight[K, V]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (x *LRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) {
|
func (x *LRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) {
|
||||||
@ -78,5 +80,10 @@ func (x *LRU[K, V]) Del(key K) bool {
|
|||||||
x.lock.Lock()
|
x.lock.Lock()
|
||||||
ok := x.core.Remove(key)
|
ok := x.core.Remove(key)
|
||||||
x.lock.Unlock()
|
x.lock.Unlock()
|
||||||
|
if ok {
|
||||||
|
x.target.IncrDelHit()
|
||||||
|
} else {
|
||||||
|
x.target.IncrDelNotFound()
|
||||||
|
}
|
||||||
return ok
|
return ok
|
||||||
}
|
}
|
55
pkg/common/localcache/lru_test.go
Normal file
55
pkg/common/localcache/lru_test.go
Normal file
@ -0,0 +1,55 @@
|
|||||||
|
package localcache
|
||||||
|
|
||||||
|
//func TestName(t *testing.T) {
|
||||||
|
// target := &cacheTarget{}
|
||||||
|
// l := NewCache[string](100, 1000, time.Second*20, time.Second*5, target, nil)
|
||||||
|
// //l := NewLRU[string, string](1000, time.Second*20, time.Second*5, target)
|
||||||
|
//
|
||||||
|
// fn := func(key string, n int, fetch func() (string, error)) {
|
||||||
|
// for i := 0; i < n; i++ {
|
||||||
|
// //v, err := l.Get(key, fetch)
|
||||||
|
// //if err == nil {
|
||||||
|
// // t.Log("key", key, "value", v)
|
||||||
|
// //} else {
|
||||||
|
// // t.Error("key", key, err)
|
||||||
|
// //}
|
||||||
|
// l.Get(key, fetch)
|
||||||
|
// //time.Sleep(time.Second / 100)
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// tmp := make(map[string]struct{})
|
||||||
|
//
|
||||||
|
// var wg sync.WaitGroup
|
||||||
|
// for i := 0; i < 10000; i++ {
|
||||||
|
// wg.Add(1)
|
||||||
|
// key := fmt.Sprintf("key_%d", i%200)
|
||||||
|
// tmp[key] = struct{}{}
|
||||||
|
// go func() {
|
||||||
|
// defer wg.Done()
|
||||||
|
// //t.Log(key)
|
||||||
|
// fn(key, 10000, func() (string, error) {
|
||||||
|
// //time.Sleep(time.Second * 3)
|
||||||
|
// //t.Log(time.Now(), "key", key, "fetch")
|
||||||
|
// //if rand.Uint32()%5 == 0 {
|
||||||
|
// // return "value_" + key, nil
|
||||||
|
// //}
|
||||||
|
// //return "", errors.New("rand error")
|
||||||
|
// return "value_" + key, nil
|
||||||
|
// })
|
||||||
|
// }()
|
||||||
|
//
|
||||||
|
// //wg.Add(1)
|
||||||
|
// //go func() {
|
||||||
|
// // defer wg.Done()
|
||||||
|
// // for i := 0; i < 10; i++ {
|
||||||
|
// // l.Del(key)
|
||||||
|
// // time.Sleep(time.Second / 3)
|
||||||
|
// // }
|
||||||
|
// //}()
|
||||||
|
// }
|
||||||
|
// wg.Wait()
|
||||||
|
// t.Log(len(tmp))
|
||||||
|
// t.Log(target.String())
|
||||||
|
//
|
||||||
|
//}
|
@ -2,15 +2,14 @@ package localcache
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"github.com/openimsdk/open-im-server/v3/pkg/common/localcache/local"
|
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
func defaultOption() *option {
|
func defaultOption() *option {
|
||||||
return &option{
|
return &option{
|
||||||
enable: true,
|
enable: true,
|
||||||
localSlotNum: 500,
|
localSlotNum: 500, //LRU的slot数量
|
||||||
localSlotSize: 20000,
|
localSlotSize: 20000, //每个LRU的大小
|
||||||
localSuccessTTL: time.Minute,
|
localSuccessTTL: time.Minute,
|
||||||
localFailedTTL: time.Second * 5,
|
localFailedTTL: time.Second * 5,
|
||||||
delFn: make([]func(ctx context.Context, key ...string), 0, 2),
|
delFn: make([]func(ctx context.Context, key ...string), 0, 2),
|
||||||
@ -26,7 +25,7 @@ type option struct {
|
|||||||
localFailedTTL time.Duration
|
localFailedTTL time.Duration
|
||||||
delFn []func(ctx context.Context, key ...string)
|
delFn []func(ctx context.Context, key ...string)
|
||||||
delCh func(fn func(key ...string))
|
delCh func(fn func(key ...string))
|
||||||
target local.Target
|
target Target
|
||||||
}
|
}
|
||||||
|
|
||||||
type Option func(o *option)
|
type Option func(o *option)
|
||||||
@ -73,7 +72,7 @@ func WithLocalFailedTTL(localFailedTTL time.Duration) Option {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func WithTarget(target local.Target) Option {
|
func WithTarget(target Target) Option {
|
||||||
if target == nil {
|
if target == nil {
|
||||||
panic("target should not be nil")
|
panic("target should not be nil")
|
||||||
}
|
}
|
||||||
@ -99,15 +98,3 @@ func WithDeleteLocal(fn func(fn func(key ...string))) Option {
|
|||||||
o.delCh = fn
|
o.delCh = fn
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type emptyTarget struct{}
|
|
||||||
|
|
||||||
func (e emptyTarget) IncrGetHit() {}
|
|
||||||
|
|
||||||
func (e emptyTarget) IncrGetSuccess() {}
|
|
||||||
|
|
||||||
func (e emptyTarget) IncrGetFailed() {}
|
|
||||||
|
|
||||||
func (e emptyTarget) IncrDelHit() {}
|
|
||||||
|
|
||||||
func (e emptyTarget) IncrDelNotFound() {}
|
|
||||||
|
43
pkg/common/localcache/singleflight.go
Normal file
43
pkg/common/localcache/singleflight.go
Normal file
@ -0,0 +1,43 @@
|
|||||||
|
package localcache
|
||||||
|
|
||||||
|
import "sync"
|
||||||
|
|
||||||
|
type call[K comparable, V any] struct {
|
||||||
|
wg sync.WaitGroup
|
||||||
|
val V
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
type SingleFlight[K comparable, V any] struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
m map[K]*call[K, V]
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewSingleFlight[K comparable, V any]() *SingleFlight[K, V] {
|
||||||
|
return &SingleFlight[K, V]{m: make(map[K]*call[K, V])}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *SingleFlight[K, V]) Do(key K, fn func() (V, error)) (V, error) {
|
||||||
|
r.mu.Lock()
|
||||||
|
if r.m == nil {
|
||||||
|
r.m = make(map[K]*call[K, V])
|
||||||
|
}
|
||||||
|
if c, ok := r.m[key]; ok {
|
||||||
|
r.mu.Unlock()
|
||||||
|
c.wg.Wait()
|
||||||
|
return c.val, c.err
|
||||||
|
}
|
||||||
|
c := new(call[K, V])
|
||||||
|
c.wg.Add(1)
|
||||||
|
r.m[key] = c
|
||||||
|
r.mu.Unlock()
|
||||||
|
|
||||||
|
c.val, c.err = fn()
|
||||||
|
c.wg.Done()
|
||||||
|
|
||||||
|
r.mu.Lock()
|
||||||
|
delete(r.m, key)
|
||||||
|
r.mu.Unlock()
|
||||||
|
|
||||||
|
return c.val, c.err
|
||||||
|
}
|
59
pkg/common/localcache/target.go
Normal file
59
pkg/common/localcache/target.go
Normal file
@ -0,0 +1,59 @@
|
|||||||
|
package localcache
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"sync/atomic"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Target interface {
|
||||||
|
IncrGetHit()
|
||||||
|
IncrGetSuccess()
|
||||||
|
IncrGetFailed()
|
||||||
|
|
||||||
|
IncrDelHit()
|
||||||
|
IncrDelNotFound()
|
||||||
|
}
|
||||||
|
|
||||||
|
type cacheTarget struct {
|
||||||
|
getHit int64
|
||||||
|
getSuccess int64
|
||||||
|
getFailed int64
|
||||||
|
delHit int64
|
||||||
|
delNotFound int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *cacheTarget) IncrGetHit() {
|
||||||
|
atomic.AddInt64(&r.getHit, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *cacheTarget) IncrGetSuccess() {
|
||||||
|
atomic.AddInt64(&r.getSuccess, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *cacheTarget) IncrGetFailed() {
|
||||||
|
atomic.AddInt64(&r.getFailed, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *cacheTarget) IncrDelHit() {
|
||||||
|
atomic.AddInt64(&r.delHit, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *cacheTarget) IncrDelNotFound() {
|
||||||
|
atomic.AddInt64(&r.delNotFound, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *cacheTarget) String() string {
|
||||||
|
return fmt.Sprintf("getHit: %d, getSuccess: %d, getFailed: %d, delHit: %d, delNotFound: %d", r.getHit, r.getSuccess, r.getFailed, r.delHit, r.delNotFound)
|
||||||
|
}
|
||||||
|
|
||||||
|
type emptyTarget struct{}
|
||||||
|
|
||||||
|
func (e emptyTarget) IncrGetHit() {}
|
||||||
|
|
||||||
|
func (e emptyTarget) IncrGetSuccess() {}
|
||||||
|
|
||||||
|
func (e emptyTarget) IncrGetFailed() {}
|
||||||
|
|
||||||
|
func (e emptyTarget) IncrDelHit() {}
|
||||||
|
|
||||||
|
func (e emptyTarget) IncrDelNotFound() {}
|
71
pkg/common/localcache/timingwheel.go
Normal file
71
pkg/common/localcache/timingwheel.go
Normal file
@ -0,0 +1,71 @@
|
|||||||
|
package localcache
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Execute[K comparable, V any] func(K, V)
|
||||||
|
|
||||||
|
type Task[K comparable, V any] struct {
|
||||||
|
key K
|
||||||
|
value V
|
||||||
|
}
|
||||||
|
|
||||||
|
type TimeWheel[K comparable, V any] struct {
|
||||||
|
ticker *time.Ticker
|
||||||
|
slots [][]Task[K, V]
|
||||||
|
currentPos int
|
||||||
|
size int
|
||||||
|
slotMutex sync.Mutex
|
||||||
|
execute Execute[K, V]
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewTimeWheel[K comparable, V any](size int, tickDuration time.Duration, execute Execute[K, V]) *TimeWheel[K, V] {
|
||||||
|
return &TimeWheel[K, V]{
|
||||||
|
ticker: time.NewTicker(tickDuration),
|
||||||
|
slots: make([][]Task[K, V], size),
|
||||||
|
currentPos: 0,
|
||||||
|
size: size,
|
||||||
|
execute: execute,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tw *TimeWheel[K, V]) Start() {
|
||||||
|
for range tw.ticker.C {
|
||||||
|
tw.tick()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tw *TimeWheel[K, V]) Stop() {
|
||||||
|
tw.ticker.Stop()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tw *TimeWheel[K, V]) tick() {
|
||||||
|
tw.slotMutex.Lock()
|
||||||
|
defer tw.slotMutex.Unlock()
|
||||||
|
|
||||||
|
tasks := tw.slots[tw.currentPos]
|
||||||
|
tw.slots[tw.currentPos] = nil
|
||||||
|
if len(tasks) > 0 {
|
||||||
|
go func(tasks []Task[K, V]) {
|
||||||
|
for _, task := range tasks {
|
||||||
|
tw.execute(task.key, task.value)
|
||||||
|
}
|
||||||
|
}(tasks)
|
||||||
|
}
|
||||||
|
|
||||||
|
tw.currentPos = (tw.currentPos + 1) % tw.size
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tw *TimeWheel[K, V]) AddTask(delay int, task Task[K, V]) {
|
||||||
|
if delay < 0 || delay >= tw.size {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
tw.slotMutex.Lock()
|
||||||
|
defer tw.slotMutex.Unlock()
|
||||||
|
|
||||||
|
pos := (tw.currentPos + delay) % tw.size
|
||||||
|
tw.slots[pos] = append(tw.slots[pos], task)
|
||||||
|
}
|
16
pkg/common/redispubsub/redispubliser.go
Normal file
16
pkg/common/redispubsub/redispubliser.go
Normal file
@ -0,0 +1,16 @@
|
|||||||
|
package redispubsub
|
||||||
|
|
||||||
|
import "github.com/redis/go-redis/v9"
|
||||||
|
|
||||||
|
type Publisher struct {
|
||||||
|
client redis.UniversalClient
|
||||||
|
channel string
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewPublisher(client redis.UniversalClient, channel string) *Publisher {
|
||||||
|
return &Publisher{client: client, channel: channel}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Publisher) Publish(message string) error {
|
||||||
|
return p.client.Publish(ctx, p.channel, message).Err()
|
||||||
|
}
|
27
pkg/common/redispubsub/redissubscriber.go
Normal file
27
pkg/common/redispubsub/redissubscriber.go
Normal file
@ -0,0 +1,27 @@
|
|||||||
|
package redispubsub
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"github.com/redis/go-redis/v9"
|
||||||
|
)
|
||||||
|
|
||||||
|
var ctx = context.Background()
|
||||||
|
|
||||||
|
type Subscriber struct {
|
||||||
|
client redis.UniversalClient
|
||||||
|
channel string
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewSubscriber(client redis.UniversalClient, channel string) *Subscriber {
|
||||||
|
return &Subscriber{client: client, channel: channel}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Subscriber) OnMessage(callback func(string)) error {
|
||||||
|
messageChannel := s.client.Subscribe(ctx, s.channel).Channel()
|
||||||
|
go func() {
|
||||||
|
for msg := range messageChannel {
|
||||||
|
callback(msg.Payload)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return nil
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user