diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index f15f5403f..50f35448d 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -197,7 +197,6 @@ func (c *ConsumerHandler) shouldPushOffline(_ context.Context, msg *sdkws.MsgDat } func (c *ConsumerHandler) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.MsgData, pushToUserIDs []string) ([]*msggateway.SingleMsgToUserResults, error) { - onlineUserIDs, offlineUserIDs, err := c.onlineCache.GetUsersOnline(ctx, pushToUserIDs) if err != nil { return nil, err diff --git a/pkg/localcache/lru/lru.go b/pkg/localcache/lru/lru.go index 41de3c8fd..726535c48 100644 --- a/pkg/localcache/lru/lru.go +++ b/pkg/localcache/lru/lru.go @@ -22,6 +22,7 @@ type LRU[K comparable, V any] interface { Get(key K, fetch func() (V, error)) (V, error) Set(key K, value V) SetHas(key K, value V) bool + GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)) (map[K]V, error) Del(key K) bool Stop() } diff --git a/pkg/localcache/lru/lru_expiration.go b/pkg/localcache/lru/lru_expiration.go index 98f170cc8..df6bacbf4 100644 --- a/pkg/localcache/lru/lru_expiration.go +++ b/pkg/localcache/lru/lru_expiration.go @@ -51,6 +51,11 @@ type ExpirationLRU[K comparable, V any] struct { target Target } +func (x *ExpirationLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)) (map[K]V, error) { + //TODO implement me + panic("implement me") +} + func (x *ExpirationLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) { x.lock.Lock() v, ok := x.core.Get(key) diff --git a/pkg/localcache/lru/lru_lazy.go b/pkg/localcache/lru/lru_lazy.go index b4e560c4d..e7f7b8bd5 100644 --- a/pkg/localcache/lru/lru_lazy.go +++ b/pkg/localcache/lru/lru_lazy.go @@ -15,14 +15,10 @@ package lru import ( - "context" "sync" "time" "github.com/hashicorp/golang-lru/v2/simplelru" - "github.com/openimsdk/tools/errs" - "github.com/openimsdk/tools/log" - "github.com/openimsdk/tools/utils/datautil" ) type layLruItem[V any] struct { @@ -92,77 +88,14 @@ func (x *LayLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) { return v.value, v.err } -func (x *LayLRU[K, V]) GetBatch(keys []K, fetchBatch func([]K) (map[K]V, error)) (map[K]V, error) { - ctx := context.Background() - resultMap := make(map[K]V) - // errorMap := make(map[K]error) - missingKeys := []K{} - lazyLruItemMap := make(map[K]*layLruItem[V]) - - for _, key := range keys { - x.lock.Lock() - v, ok := x.core.Get(key) - lazyLruItemMap[key] = v - if ok { - x.lock.Unlock() - v.lock.Lock() - expires, value, err := v.expires, v.value, v.err - if expires != 0 && expires > time.Now().UnixMilli() { - v.lock.Unlock() - resultMap[key] = value - x.target.IncrGetHit() - } else { - missingKeys = append(missingKeys, key) - v.lock.Unlock() - } - if err != nil { - log.ZWarn(ctx, "Get Local LRU is failed.", errs.Wrap(err)) - } - continue - } else { - // initialize and insert new lazyLruItem - v = &layLruItem[V]{} - lazyLruItemMap[key] = v - x.core.Add(key, v) - v.lock.Lock() - missingKeys = append(missingKeys, key) - x.lock.Unlock() - } - defer v.lock.Unlock() - } - - x.lock.Unlock() - - // Fetch missing Key - if len(missingKeys) > 0 { - failedKey := missingKeys - fetchMap, err := fetchBatch(missingKeys) - if err != nil { - log.ZWarn(ctx, "fetch Key is failed.", errs.Wrap(err)) - } - - for key, value := range fetchMap { - resultMap[key] = value - lazyLruItemMap[key].expires = time.Now().Add(x.successTTL).UnixMilli() - x.target.IncrGetSuccess() - failedKey = datautil.DeleteElems(failedKey, key) - } - for _, key := range failedKey { - lazyLruItemMap[key].expires = time.Now().Add(x.failedTTL).UnixMilli() - x.target.IncrGetFailed() - } - } - return resultMap, nil -} - -func (x *LayLRU[K, V]) GetBatchs(keys []K, fetch func(keys []K) (map[K]V, error)) ([]V, error) { +func (x *LayLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)) (map[K]V, error) { var ( err error once sync.Once ) x.lock.Lock() - res := make([]V, 0) + res := make(map[K]V) queries := make([]K, 0) setVs := make(map[K]*layLruItem[V]) for _, key := range keys { @@ -174,7 +107,7 @@ func (x *LayLRU[K, V]) GetBatchs(keys []K, fetch func(keys []K) (map[K]V, error) if expires != 0 && expires > time.Now().UnixMilli() { v.lock.Unlock() x.target.IncrGetHit() - res = append(res, value) + res[key] = value if err1 != nil { once.Do(func() { err = err1 @@ -207,27 +140,12 @@ func (x *LayLRU[K, V]) GetBatchs(keys []K, fetch func(keys []K) (map[K]V, error) x.lock.Lock() x.core.Add(key, v) x.lock.Unlock() - res = append(res, val) + res[key] = val } return res, err } -func (x *LayLRU[K, V]) SetBatch(data map[K]V) { - x.lock.Lock() - defer x.lock.Unlock() - - for key, value := range data { - x.core.Add(key, &layLruItem[V]{value: value, expires: time.Now().Add(x.successTTL).UnixMilli()}) - } -} - -//func (x *LayLRU[K, V]) Set(key K, value V) { -// x.lock.Lock() -// x.core.Add(key, &layLruItem[V]{value: value, expires: time.Now().Add(x.successTTL).UnixMilli()}) -// x.lock.Unlock() -//} -// //func (x *LayLRU[K, V]) Has(key K) bool { // x.lock.Lock() // defer x.lock.Unlock() diff --git a/pkg/localcache/lru/lru_slot.go b/pkg/localcache/lru/lru_slot.go index 30ba25c41..077219b75 100644 --- a/pkg/localcache/lru/lru_slot.go +++ b/pkg/localcache/lru/lru_slot.go @@ -32,6 +32,29 @@ type slotLRU[K comparable, V any] struct { hash func(k K) uint64 } +func (x *slotLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)) (map[K]V, error) { + var ( + slotKeys = make(map[uint64][]K) + vs = make(map[K]V) + ) + + for _, k := range keys { + index := x.getIndex(k) + slotKeys[index] = append(slotKeys[index], k) + } + + for k, v := range slotKeys { + batches, err := x.slots[k].GetBatch(v, fetch) + if err != nil { + return nil, err + } + for key, value := range batches { + vs[key] = value + } + } + return vs, nil +} + func (x *slotLRU[K, V]) getIndex(k K) uint64 { return x.hash(k) % x.n } diff --git a/pkg/rpccache/online.go b/pkg/rpccache/online.go index 88e1ad46c..9b3a27e31 100644 --- a/pkg/rpccache/online.go +++ b/pkg/rpccache/online.go @@ -205,29 +205,27 @@ func (o *OnlineCache) GetUserOnline(ctx context.Context, userID string) (bool, e return len(platformIDs) > 0, nil } -//func (o *OnlineCache) getUserOnlinePlatformBatch(ctx context.Context, userIDs []string) (map[string][]int32, error) { -// platformIDsMap, err := o.lruCache.GetBatch(userIDs, func(missingUsers []string) (map[string][]int32, error) { -// platformIDsMap := make(map[string][]int32) -// -// usersStatus, err := o.user.GetUsersOnlinePlatform(ctx, missingUsers) -// if err != nil { -// return nil, err -// } -// -// for _, user := range usersStatus { -// platformIDsMap[user.UserID] = user.PlatformIDs -// } -// -// return platformIDsMap, nil -// }) -// if err != nil { -// log.ZError(ctx, "OnlineCache GetUserOnlinePlatform", err, "userID", userIDs) -// return nil, err -// } -// -// //log.ZDebug(ctx, "OnlineCache GetUserOnlinePlatform", "userID", userID, "platformIDs", platformIDs) -// return platformIDsMap, nil -//} +func (o *OnlineCache) getUserOnlinePlatformBatch(ctx context.Context, userIDs []string) (map[string][]int32, error) { + platformIDsMap, err := o.lruCache.GetBatch(userIDs, func(missingUsers []string) (map[string][]int32, error) { + platformIDsMap := make(map[string][]int32) + + usersStatus, err := o.user.GetUsersOnlinePlatform(ctx, missingUsers) + if err != nil { + return nil, err + } + + for _, u := range usersStatus { + platformIDsMap[u.UserID] = u.PlatformIDs + } + + return platformIDsMap, nil + }) + if err != nil { + log.ZError(ctx, "OnlineCache GetUserOnlinePlatform", err, "userID", userIDs) + return nil, err + } + return platformIDsMap, nil +} func (o *OnlineCache) GetUsersOnline(ctx context.Context, userIDs []string) ([]string, []string, error) { t := time.Now() @@ -237,19 +235,6 @@ func (o *OnlineCache) GetUsersOnline(ctx context.Context, userIDs []string) ([]s offlineUserIDs = make([]string, 0, len(userIDs)) ) - //userOnlineMap, err := o.getUserOnlinePlatformBatch(ctx, userIDs) - //if err != nil { - // return nil, nil, err - //} - // - //for key, value := range userOnlineMap { - // if len(value) > 0 { - // onlineUserIDs = append(onlineUserIDs, key) - // } else { - // offlineUserIDs = append(offlineUserIDs, key) - // } - //} - switch o.fullUserCache { case true: for _, userID := range userIDs { @@ -260,6 +245,18 @@ func (o *OnlineCache) GetUsersOnline(ctx context.Context, userIDs []string) ([]s } } case false: + userOnlineMap, err := o.getUserOnlinePlatformBatch(ctx, userIDs) + if err != nil { + return nil, nil, err + } + + for key, value := range userOnlineMap { + if len(value) > 0 { + onlineUserIDs = append(onlineUserIDs, key) + } else { + offlineUserIDs = append(offlineUserIDs, key) + } + } } log.ZWarn(ctx, "get users online", nil, "online users length", len(userIDs), "offline users length", len(offlineUserIDs), "cost", time.Since(t))