diff --git a/pkg/common/localcache/cache.go b/pkg/common/localcache/cache.go index 228538fa6..f81a1a214 100644 --- a/pkg/common/localcache/cache.go +++ b/pkg/common/localcache/cache.go @@ -3,10 +3,14 @@ package localcache import ( "context" "github.com/openimsdk/open-im-server/v3/pkg/common/localcache/link" - "github.com/openimsdk/open-im-server/v3/pkg/common/localcache/local" opt "github.com/openimsdk/open-im-server/v3/pkg/common/localcache/option" + "hash/fnv" + "time" + "unsafe" ) +const TimingWheelSize = 500 + type Cache[V any] interface { Get(ctx context.Context, key string, fetch func(ctx context.Context) (V, error), opts ...*opt.Option) (V, error) Del(ctx context.Context, key ...string) @@ -17,8 +21,15 @@ func New[V any](opts ...Option) Cache[V] { for _, o := range opts { o(opt) } - c := &cache[V]{opt: opt, link: link.New(opt.localSlotNum)} - c.local = local.NewCache[V](opt.localSlotNum, opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict) + c := &cache[V]{ + opt: opt, + link: link.New(opt.localSlotNum), + n: uint64(opt.localSlotNum), + } + c.timingWheel = NewTimeWheel[string, V](TimingWheelSize, time.Second, c.exec) + for i := 0; i < opt.localSlotNum; i++ { + c.slots[i] = NewLRU[string, V](opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict) + } go func() { c.opt.delCh(c.del) }() @@ -26,16 +37,24 @@ func New[V any](opts ...Option) Cache[V] { } type cache[V any] struct { - opt *option - link link.Link - local local.Cache[V] + n uint64 + slots []*LRU[string, V] + opt *option + link link.Link + timingWheel *TimeWheel[string, V] +} + +func (c *cache[V]) index(key string) uint64 { + h := fnv.New64a() + _, _ = h.Write(*(*[]byte)(unsafe.Pointer(&key))) + return h.Sum64() % c.n } func (c *cache[V]) onEvict(key string, value V) { lks := c.link.Del(key) for k := range lks { if key != k { // prevent deadlock - c.local.Del(k) + c.slots[c.index(k)].Del(k) } } } @@ -43,9 +62,9 @@ func (c *cache[V]) onEvict(key string, value V) { func (c *cache[V]) del(key ...string) { for _, k := range key { lks := c.link.Del(k) - c.local.Del(k) + c.slots[c.index(k)].Del(k) for k := range lks { - c.local.Del(k) + c.slots[c.index(k)].Del(k) } } } @@ -59,7 +78,7 @@ func (c *cache[V]) Get(ctx context.Context, key string, fetch func(ctx context.C if len(opts) > 0 && len(opts[0].Link) > 0 { c.link.Link(key, opts[0].Link...) } - return c.local.Get(key, func() (V, error) { + return c.slots[c.index(key)].Get(key, func() (V, error) { return fetch(ctx) }) } else { @@ -78,3 +97,6 @@ func (c *cache[V]) Del(ctx context.Context, key ...string) { c.del(key...) } } +func (c *cache[V]) exec(key string, value V) { + +} diff --git a/pkg/common/localcache/local/callback.go b/pkg/common/localcache/callback.go similarity index 86% rename from pkg/common/localcache/local/callback.go rename to pkg/common/localcache/callback.go index 32aef112b..4bd37a2c2 100644 --- a/pkg/common/localcache/local/callback.go +++ b/pkg/common/localcache/callback.go @@ -1,4 +1,4 @@ -package local +package localcache import "github.com/hashicorp/golang-lru/v2/simplelru" diff --git a/pkg/common/localcache/local/cache.go b/pkg/common/localcache/local/cache.go deleted file mode 100644 index 06569965e..000000000 --- a/pkg/common/localcache/local/cache.go +++ /dev/null @@ -1,50 +0,0 @@ -package local - -import ( - "hash/fnv" - "time" - "unsafe" -) - -type Cache[V any] interface { - Get(key string, fetch func() (V, error)) (V, error) - Del(key string) bool -} - -func NewCache[V any](slotNum, slotSize int, successTTL, failedTTL time.Duration, target Target, onEvict EvictCallback[string, V]) Cache[V] { - c := &slot[V]{ - n: uint64(slotNum), - slots: make([]*LRU[string, V], slotNum), - target: target, - } - for i := 0; i < slotNum; i++ { - c.slots[i] = NewLRU[string, V](slotSize, successTTL, failedTTL, c.target, onEvict) - } - return c -} - -type slot[V any] struct { - n uint64 - slots []*LRU[string, V] - target Target -} - -func (c *slot[V]) index(s string) uint64 { - h := fnv.New64a() - _, _ = h.Write(*(*[]byte)(unsafe.Pointer(&s))) - return h.Sum64() % c.n -} - -func (c *slot[V]) Get(key string, fetch func() (V, error)) (V, error) { - return c.slots[c.index(key)].Get(key, fetch) -} - -func (c *slot[V]) Del(key string) bool { - if c.slots[c.index(key)].Del(key) { - c.target.IncrDelHit() - return true - } else { - c.target.IncrDelNotFound() - return false - } -} diff --git a/pkg/common/localcache/local/lru_test.go b/pkg/common/localcache/local/lru_test.go deleted file mode 100644 index a6e7553ee..000000000 --- a/pkg/common/localcache/local/lru_test.go +++ /dev/null @@ -1,95 +0,0 @@ -package local - -import ( - "fmt" - "sync" - "sync/atomic" - "testing" - "time" -) - -type cacheTarget struct { - getHit int64 - getSuccess int64 - getFailed int64 - delHit int64 - delNotFound int64 -} - -func (r *cacheTarget) IncrGetHit() { - atomic.AddInt64(&r.getHit, 1) -} - -func (r *cacheTarget) IncrGetSuccess() { - atomic.AddInt64(&r.getSuccess, 1) -} - -func (r *cacheTarget) IncrGetFailed() { - atomic.AddInt64(&r.getFailed, 1) -} - -func (r *cacheTarget) IncrDelHit() { - atomic.AddInt64(&r.delHit, 1) -} - -func (r *cacheTarget) IncrDelNotFound() { - atomic.AddInt64(&r.delNotFound, 1) -} - -func (r *cacheTarget) String() string { - return fmt.Sprintf("getHit: %d, getSuccess: %d, getFailed: %d, delHit: %d, delNotFound: %d", r.getHit, r.getSuccess, r.getFailed, r.delHit, r.delNotFound) -} - -func TestName(t *testing.T) { - target := &cacheTarget{} - l := NewCache[string](100, 1000, time.Second*20, time.Second*5, target, nil) - //l := NewLRU[string, string](1000, time.Second*20, time.Second*5, target) - - fn := func(key string, n int, fetch func() (string, error)) { - for i := 0; i < n; i++ { - //v, err := l.Get(key, fetch) - //if err == nil { - // t.Log("key", key, "value", v) - //} else { - // t.Error("key", key, err) - //} - l.Get(key, fetch) - //time.Sleep(time.Second / 100) - } - } - - tmp := make(map[string]struct{}) - - var wg sync.WaitGroup - for i := 0; i < 10000; i++ { - wg.Add(1) - key := fmt.Sprintf("key_%d", i%200) - tmp[key] = struct{}{} - go func() { - defer wg.Done() - //t.Log(key) - fn(key, 10000, func() (string, error) { - //time.Sleep(time.Second * 3) - //t.Log(time.Now(), "key", key, "fetch") - //if rand.Uint32()%5 == 0 { - // return "value_" + key, nil - //} - //return "", errors.New("rand error") - return "value_" + key, nil - }) - }() - - //wg.Add(1) - //go func() { - // defer wg.Done() - // for i := 0; i < 10; i++ { - // l.Del(key) - // time.Sleep(time.Second / 3) - // } - //}() - } - wg.Wait() - t.Log(len(tmp)) - t.Log(target.String()) - -} diff --git a/pkg/common/localcache/local/target.go b/pkg/common/localcache/local/target.go deleted file mode 100644 index 6cb134fb0..000000000 --- a/pkg/common/localcache/local/target.go +++ /dev/null @@ -1,10 +0,0 @@ -package local - -type Target interface { - IncrGetHit() - IncrGetSuccess() - IncrGetFailed() - - IncrDelHit() - IncrDelNotFound() -} diff --git a/pkg/common/localcache/local/lru.go b/pkg/common/localcache/lru.go similarity index 91% rename from pkg/common/localcache/local/lru.go rename to pkg/common/localcache/lru.go index 45dc3b651..4fd1704d2 100644 --- a/pkg/common/localcache/local/lru.go +++ b/pkg/common/localcache/lru.go @@ -1,4 +1,4 @@ -package local +package localcache import ( "github.com/hashicorp/golang-lru/v2/simplelru" @@ -30,6 +30,7 @@ func NewLRU[K comparable, V any](size int, successTTL, failedTTL time.Duration, successTTL: successTTL, failedTTL: failedTTL, target: target, + s: NewSingleFlight[K, V](), } } @@ -39,6 +40,7 @@ type LRU[K comparable, V any] struct { successTTL time.Duration failedTTL time.Duration target Target + s *SingleFlight[K, V] } func (x *LRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) { @@ -78,5 +80,10 @@ func (x *LRU[K, V]) Del(key K) bool { x.lock.Lock() ok := x.core.Remove(key) x.lock.Unlock() + if ok { + x.target.IncrDelHit() + } else { + x.target.IncrDelNotFound() + } return ok } diff --git a/pkg/common/localcache/lru_test.go b/pkg/common/localcache/lru_test.go new file mode 100644 index 000000000..fff925eaa --- /dev/null +++ b/pkg/common/localcache/lru_test.go @@ -0,0 +1,55 @@ +package localcache + +//func TestName(t *testing.T) { +// target := &cacheTarget{} +// l := NewCache[string](100, 1000, time.Second*20, time.Second*5, target, nil) +// //l := NewLRU[string, string](1000, time.Second*20, time.Second*5, target) +// +// fn := func(key string, n int, fetch func() (string, error)) { +// for i := 0; i < n; i++ { +// //v, err := l.Get(key, fetch) +// //if err == nil { +// // t.Log("key", key, "value", v) +// //} else { +// // t.Error("key", key, err) +// //} +// l.Get(key, fetch) +// //time.Sleep(time.Second / 100) +// } +// } +// +// tmp := make(map[string]struct{}) +// +// var wg sync.WaitGroup +// for i := 0; i < 10000; i++ { +// wg.Add(1) +// key := fmt.Sprintf("key_%d", i%200) +// tmp[key] = struct{}{} +// go func() { +// defer wg.Done() +// //t.Log(key) +// fn(key, 10000, func() (string, error) { +// //time.Sleep(time.Second * 3) +// //t.Log(time.Now(), "key", key, "fetch") +// //if rand.Uint32()%5 == 0 { +// // return "value_" + key, nil +// //} +// //return "", errors.New("rand error") +// return "value_" + key, nil +// }) +// }() +// +// //wg.Add(1) +// //go func() { +// // defer wg.Done() +// // for i := 0; i < 10; i++ { +// // l.Del(key) +// // time.Sleep(time.Second / 3) +// // } +// //}() +// } +// wg.Wait() +// t.Log(len(tmp)) +// t.Log(target.String()) +// +//} diff --git a/pkg/common/localcache/option.go b/pkg/common/localcache/option.go index 01c9ce57d..6f0b973d4 100644 --- a/pkg/common/localcache/option.go +++ b/pkg/common/localcache/option.go @@ -2,15 +2,14 @@ package localcache import ( "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/localcache/local" "time" ) func defaultOption() *option { return &option{ enable: true, - localSlotNum: 500, - localSlotSize: 20000, + localSlotNum: 500, //LRU的slot数量 + localSlotSize: 20000, //每个LRU的大小 localSuccessTTL: time.Minute, localFailedTTL: time.Second * 5, delFn: make([]func(ctx context.Context, key ...string), 0, 2), @@ -26,7 +25,7 @@ type option struct { localFailedTTL time.Duration delFn []func(ctx context.Context, key ...string) delCh func(fn func(key ...string)) - target local.Target + target Target } type Option func(o *option) @@ -73,7 +72,7 @@ func WithLocalFailedTTL(localFailedTTL time.Duration) Option { } } -func WithTarget(target local.Target) Option { +func WithTarget(target Target) Option { if target == nil { panic("target should not be nil") } @@ -99,15 +98,3 @@ func WithDeleteLocal(fn func(fn func(key ...string))) Option { o.delCh = fn } } - -type emptyTarget struct{} - -func (e emptyTarget) IncrGetHit() {} - -func (e emptyTarget) IncrGetSuccess() {} - -func (e emptyTarget) IncrGetFailed() {} - -func (e emptyTarget) IncrDelHit() {} - -func (e emptyTarget) IncrDelNotFound() {} diff --git a/pkg/common/localcache/singleflight.go b/pkg/common/localcache/singleflight.go new file mode 100644 index 000000000..5dcd70669 --- /dev/null +++ b/pkg/common/localcache/singleflight.go @@ -0,0 +1,43 @@ +package localcache + +import "sync" + +type call[K comparable, V any] struct { + wg sync.WaitGroup + val V + err error +} + +type SingleFlight[K comparable, V any] struct { + mu sync.Mutex + m map[K]*call[K, V] +} + +func NewSingleFlight[K comparable, V any]() *SingleFlight[K, V] { + return &SingleFlight[K, V]{m: make(map[K]*call[K, V])} +} + +func (r *SingleFlight[K, V]) Do(key K, fn func() (V, error)) (V, error) { + r.mu.Lock() + if r.m == nil { + r.m = make(map[K]*call[K, V]) + } + if c, ok := r.m[key]; ok { + r.mu.Unlock() + c.wg.Wait() + return c.val, c.err + } + c := new(call[K, V]) + c.wg.Add(1) + r.m[key] = c + r.mu.Unlock() + + c.val, c.err = fn() + c.wg.Done() + + r.mu.Lock() + delete(r.m, key) + r.mu.Unlock() + + return c.val, c.err +} diff --git a/pkg/common/localcache/target.go b/pkg/common/localcache/target.go new file mode 100644 index 000000000..2edf51ddb --- /dev/null +++ b/pkg/common/localcache/target.go @@ -0,0 +1,59 @@ +package localcache + +import ( + "fmt" + "sync/atomic" +) + +type Target interface { + IncrGetHit() + IncrGetSuccess() + IncrGetFailed() + + IncrDelHit() + IncrDelNotFound() +} + +type cacheTarget struct { + getHit int64 + getSuccess int64 + getFailed int64 + delHit int64 + delNotFound int64 +} + +func (r *cacheTarget) IncrGetHit() { + atomic.AddInt64(&r.getHit, 1) +} + +func (r *cacheTarget) IncrGetSuccess() { + atomic.AddInt64(&r.getSuccess, 1) +} + +func (r *cacheTarget) IncrGetFailed() { + atomic.AddInt64(&r.getFailed, 1) +} + +func (r *cacheTarget) IncrDelHit() { + atomic.AddInt64(&r.delHit, 1) +} + +func (r *cacheTarget) IncrDelNotFound() { + atomic.AddInt64(&r.delNotFound, 1) +} + +func (r *cacheTarget) String() string { + return fmt.Sprintf("getHit: %d, getSuccess: %d, getFailed: %d, delHit: %d, delNotFound: %d", r.getHit, r.getSuccess, r.getFailed, r.delHit, r.delNotFound) +} + +type emptyTarget struct{} + +func (e emptyTarget) IncrGetHit() {} + +func (e emptyTarget) IncrGetSuccess() {} + +func (e emptyTarget) IncrGetFailed() {} + +func (e emptyTarget) IncrDelHit() {} + +func (e emptyTarget) IncrDelNotFound() {} diff --git a/pkg/common/localcache/timingwheel.go b/pkg/common/localcache/timingwheel.go new file mode 100644 index 000000000..ca1bac033 --- /dev/null +++ b/pkg/common/localcache/timingwheel.go @@ -0,0 +1,71 @@ +package localcache + +import ( + "sync" + "time" +) + +type Execute[K comparable, V any] func(K, V) + +type Task[K comparable, V any] struct { + key K + value V +} + +type TimeWheel[K comparable, V any] struct { + ticker *time.Ticker + slots [][]Task[K, V] + currentPos int + size int + slotMutex sync.Mutex + execute Execute[K, V] +} + +func NewTimeWheel[K comparable, V any](size int, tickDuration time.Duration, execute Execute[K, V]) *TimeWheel[K, V] { + return &TimeWheel[K, V]{ + ticker: time.NewTicker(tickDuration), + slots: make([][]Task[K, V], size), + currentPos: 0, + size: size, + execute: execute, + } +} + +func (tw *TimeWheel[K, V]) Start() { + for range tw.ticker.C { + tw.tick() + } +} + +func (tw *TimeWheel[K, V]) Stop() { + tw.ticker.Stop() +} + +func (tw *TimeWheel[K, V]) tick() { + tw.slotMutex.Lock() + defer tw.slotMutex.Unlock() + + tasks := tw.slots[tw.currentPos] + tw.slots[tw.currentPos] = nil + if len(tasks) > 0 { + go func(tasks []Task[K, V]) { + for _, task := range tasks { + tw.execute(task.key, task.value) + } + }(tasks) + } + + tw.currentPos = (tw.currentPos + 1) % tw.size +} + +func (tw *TimeWheel[K, V]) AddTask(delay int, task Task[K, V]) { + if delay < 0 || delay >= tw.size { + return + } + + tw.slotMutex.Lock() + defer tw.slotMutex.Unlock() + + pos := (tw.currentPos + delay) % tw.size + tw.slots[pos] = append(tw.slots[pos], task) +} diff --git a/pkg/common/redispubsub/redispubliser.go b/pkg/common/redispubsub/redispubliser.go new file mode 100644 index 000000000..822b25bf9 --- /dev/null +++ b/pkg/common/redispubsub/redispubliser.go @@ -0,0 +1,16 @@ +package redispubsub + +import "github.com/redis/go-redis/v9" + +type Publisher struct { + client redis.UniversalClient + channel string +} + +func NewPublisher(client redis.UniversalClient, channel string) *Publisher { + return &Publisher{client: client, channel: channel} +} + +func (p *Publisher) Publish(message string) error { + return p.client.Publish(ctx, p.channel, message).Err() +} diff --git a/pkg/common/redispubsub/redissubscriber.go b/pkg/common/redispubsub/redissubscriber.go new file mode 100644 index 000000000..69cfd8a69 --- /dev/null +++ b/pkg/common/redispubsub/redissubscriber.go @@ -0,0 +1,27 @@ +package redispubsub + +import ( + "context" + "github.com/redis/go-redis/v9" +) + +var ctx = context.Background() + +type Subscriber struct { + client redis.UniversalClient + channel string +} + +func NewSubscriber(client redis.UniversalClient, channel string) *Subscriber { + return &Subscriber{client: client, channel: channel} +} + +func (s *Subscriber) OnMessage(callback func(string)) error { + messageChannel := s.client.Subscribe(ctx, s.channel).Channel() + go func() { + for msg := range messageChannel { + callback(msg.Payload) + } + }() + return nil +}