diff --git a/internal/rpc/user/online.go b/internal/rpc/user/online.go index 99b272006..5ac4085a9 100644 --- a/internal/rpc/user/online.go +++ b/internal/rpc/user/online.go @@ -2,6 +2,7 @@ package user import ( "context" + "github.com/openimsdk/protocol/constant" pbuser "github.com/openimsdk/protocol/user" ) diff --git a/pkg/localcache/lru/lru.go b/pkg/localcache/lru/lru.go index 41de3c8fd..943151246 100644 --- a/pkg/localcache/lru/lru.go +++ b/pkg/localcache/lru/lru.go @@ -21,6 +21,7 @@ type EvictCallback[K comparable, V any] simplelru.EvictCallback[K, V] type LRU[K comparable, V any] interface { Get(key K, fetch func() (V, error)) (V, error) Set(key K, value V) + GetBatch(key []K, fetchBatch func([]K) (map[string]V, error)) (map[string]V, error) SetHas(key K, value V) bool Del(key K) bool Stop() diff --git a/pkg/localcache/lru/lru_lazy.go b/pkg/localcache/lru/lru_lazy.go index 942773ed6..0c74f8343 100644 --- a/pkg/localcache/lru/lru_lazy.go +++ b/pkg/localcache/lru/lru_lazy.go @@ -15,10 +15,14 @@ package lru import ( + "context" "sync" "time" "github.com/hashicorp/golang-lru/v2/simplelru" + "github.com/openimsdk/tools/errs" + "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/utils/datautil" ) type layLruItem[V any] struct { @@ -88,7 +92,70 @@ func (x *LayLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) { return v.value, v.err } -func (x *LayLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)) ([]V, error) { +func (x *LayLRU[K, V]) GetBatch(keys []K, fetchBatch func([]K) (map[K]V, error)) (map[K]V, error) { + ctx := context.Background() + resultMap := make(map[K]V) + // errorMap := make(map[K]error) + missingKeys := []K{} + lazyLruItemMap := make(map[K]*layLruItem[V]) + + for _, key := range keys { + x.lock.Lock() + v, ok := x.core.Get(key) + lazyLruItemMap[key] = v + if ok { + x.lock.Unlock() + v.lock.Lock() + expires, value, err := v.expires, v.value, v.err + if expires != 0 && expires > time.Now().UnixMilli() { + v.lock.Unlock() + resultMap[key] = value + x.target.IncrGetHit() + } else { + missingKeys = append(missingKeys, key) + v.lock.Unlock() + } + if err != nil { + log.ZWarn(ctx, "Get Local LRU is failed.", errs.Wrap(err)) + } + continue + } else { + // initialize and insert new lazyLruItem + v = &layLruItem[V]{} + lazyLruItemMap[key] = v + x.core.Add(key, v) + v.lock.Lock() + missingKeys = append(missingKeys, key) + x.lock.Unlock() + } + defer v.lock.Unlock() + } + + x.lock.Unlock() + + // Fetch missing Key + if len(missingKeys) > 0 { + failedKey := missingKeys + fetchMap, err := fetchBatch(missingKeys) + if err != nil { + log.ZWarn(ctx, "fetch Key is failed.", errs.Wrap(err)) + } + + for key, value := range fetchMap { + resultMap[key] = value + lazyLruItemMap[key].expires = time.Now().Add(x.successTTL).UnixMilli() + x.target.IncrGetSuccess() + failedKey = datautil.DeleteElems(failedKey, key) + } + for _, key := range failedKey { + lazyLruItemMap[key].expires = time.Now().Add(x.failedTTL).UnixMilli() + x.target.IncrGetFailed() + } + } + return resultMap, nil +} + +func (x *LayLRU[K, V]) GetBatchs(keys []K, fetch func(keys []K) (map[K]V, error)) ([]V, error) { x.lock.Lock() res := make([]V, 0) queries := make([]K, 0) @@ -134,13 +201,10 @@ func (x *LayLRU[K, V]) GetBatch(keys []K, fetch func(keys []K) (map[K]V, error)) func (x *LayLRU[K, V]) SetBatch(data map[K]V) bool { x.lock.Lock() defer x.lock.Unlock() + for key, value := range data { - if x.core.Contains(key) { - x.core.Add(key, &layLruItem[V]{value: value, expires: time.Now().Add(x.successTTL).UnixMilli()}) - return true - } + x.core.Add(key, &layLruItem[V]{value: value, expires: time.Now().Add(x.successTTL).UnixMilli()}) } - return false } //func (x *LayLRU[K, V]) Set(key K, value V) { diff --git a/pkg/rpccache/online.go b/pkg/rpccache/online.go index 640e95d1b..5fe8f97a1 100644 --- a/pkg/rpccache/online.go +++ b/pkg/rpccache/online.go @@ -2,6 +2,10 @@ package rpccache import ( "context" + "math/rand" + "strconv" + "time" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/localcache" "github.com/openimsdk/open-im-server/v3/pkg/localcache/lru" @@ -12,9 +16,6 @@ import ( "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mcontext" "github.com/redis/go-redis/v9" - "math/rand" - "strconv" - "time" ) func NewOnlineCache(user rpcclient.UserRpcClient, group *GroupLocalCache, rdb redis.UniversalClient, fullUserCache bool, fn func(ctx context.Context, userID string, platformIDs []int32)) (*OnlineCache, error) { @@ -136,6 +137,16 @@ func (o *OnlineCache) GetUserOnlinePlatform(ctx context.Context, userID string) return platformIDs, nil } +// func (o *OnlineCache) GetUserOnlinePlatformBatch(ctx context.Context, userIDs []string) (map[string]int32, error) { +// platformIDs, err := o.getUserOnlinePlatform(ctx, userIDs) +// if err != nil { +// return nil, err +// } +// tmp := make([]int32, len(platformIDs)) +// copy(tmp, platformIDs) +// return platformIDs, nil +// } + func (o *OnlineCache) GetUserOnline(ctx context.Context, userID string) (bool, error) { platformIDs, err := o.getUserOnlinePlatform(ctx, userID) if err != nil { @@ -144,18 +155,57 @@ func (o *OnlineCache) GetUserOnline(ctx context.Context, userID string) (bool, e return len(platformIDs) > 0, nil } +func (o *OnlineCache) getUserOnlinePlatformBatch(ctx context.Context, userIDs []string) (map[string][]int32, error) { + platformIDsMap, err := o.lruCache.GetBatch(userIDs, func(missingUsers []string) (map[string][]int32, error) { + platformIDsMap := make(map[string][]int32) + + usersStatus, err := o.user.GetUsersOnlinePlatform(ctx, missingUsers) + if err != nil { + return nil, err + } + + for _, user := range usersStatus { + platformIDsMap[user.UserID] = user.PlatformIDs + } + + return platformIDsMap, nil + }) + if err != nil { + log.ZError(ctx, "OnlineCache GetUserOnlinePlatform", err, "userID", userIDs) + return nil, err + } + + //log.ZDebug(ctx, "OnlineCache GetUserOnlinePlatform", "userID", userID, "platformIDs", platformIDs) + return platformIDsMap, nil +} + + func (o *OnlineCache) GetUsersOnline(ctx context.Context, usersID []string) ([]string, []string, error) { t := time.Now() + var ( - onlineUserIDS []string + onlineUserIDs []string offlineUserIDs []string ) + userOnlineMap, err := o.getUserOnlinePlatformBatch(ctx, userIDs) + if err != nil { + return nil, nil, err + } + + for key, value := range userOnlineMap { + if len(value) > 0 { + onlineUserIDs = append(onlineUserIDs, key) + } else { + offlineUserIDs = append(offlineUserIDs, key) + } + } + switch o.fullUserCache { case true: - for _, userID := range usersID { + for _, userID := range userIDs { if _, ok := o.mapCache.Load(userID); ok { - onlineUserIDS = append(onlineUserIDS, userID) + onlineUserIDs = append(onlineUserIDs, userID) } else { offlineUserIDs = append(offlineUserIDs, userID) } @@ -170,7 +220,7 @@ func (o *OnlineCache) GetUsersOnline(ctx context.Context, usersID []string) ([]s //func (o *OnlineCache) GetUsersOnline(ctx context.Context, userIDs []string) ([]string, error) { // onlineUserIDs := make([]string, 0, len(userIDs)) // for _, userID := range userIDs { -// online, err := o.GetUserOnline(ctx, userID) +// online, err := o.GetUserOnline(ctx, userID) // if err != nil { // return nil, err // }