chore: enable fullUserCache

icey-yu 2024-09-05 17:22:05 +08:00
parent 7f8d26c36b
commit d9064867d5
12 changed files with 137 additions and 24 deletions


@@ -38,7 +38,7 @@ iosPush:
   badgeCount: true
   production: false
+fullUserCache: true

go.mod

@@ -12,8 +12,8 @@ require (
 	github.com/gorilla/websocket v1.5.1
 	github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
 	github.com/mitchellh/mapstructure v1.5.0
-	github.com/openimsdk/protocol v0.0.72-alpha.12
-	github.com/openimsdk/tools v0.0.50-alpha.11
+	github.com/openimsdk/protocol v0.0.72-alpha.14
+	github.com/openimsdk/tools v0.0.50-alpha.12
 	github.com/pkg/errors v0.9.1 // indirect
 	github.com/prometheus/client_golang v1.18.0
 	github.com/stretchr/testify v1.9.0

go.sum

@@ -319,10 +319,10 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
 github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
 github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
 github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
-github.com/openimsdk/protocol v0.0.72-alpha.12 h1:GXUtSFXlh1AeOmMjN1CsRfRZMTQYBWZ8mTuRoB7KxLQ=
-github.com/openimsdk/protocol v0.0.72-alpha.12/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
-github.com/openimsdk/tools v0.0.50-alpha.11 h1:ClhkRjUVJWbmOiQ14G6do/ES1a6ZueDITv40Apwq/Tc=
-github.com/openimsdk/tools v0.0.50-alpha.11/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
+github.com/openimsdk/protocol v0.0.72-alpha.14 h1:XnmTUJXxxqxVqvpaO90Y+pn6b4Sz5+kvCb73p3ot1/4=
+github.com/openimsdk/protocol v0.0.72-alpha.14/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
+github.com/openimsdk/tools v0.0.50-alpha.12 h1:rV3BxgqN+F79vZvdoQ+97Eob8ScsRVEM8D+Wrcl23uo=
+github.com/openimsdk/tools v0.0.50-alpha.12/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
 github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
 github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
 github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=


@@ -58,7 +58,7 @@ func Start(ctx context.Context, index int, conf *Config) error {
 	)
 	hubServer := NewServer(rpcPort, longServer, conf, func(srv *Server) error {
-		longServer.online = rpccache.NewOnlineCache(srv.userRcp, nil, rdb, longServer.subscriberUserOnlineStatusChanges)
+		longServer.online = rpccache.NewOnlineCache(srv.userRcp, nil, rdb, false, longServer.subscriberUserOnlineStatusChanges)
 		return nil
 	})


@@ -76,7 +76,7 @@ func NewConsumerHandler(config *Config, offlinePusher offlinepush.OfflinePusher,
 	consumerHandler.conversationLocalCache = rpccache.NewConversationLocalCache(consumerHandler.conversationRpcClient, &config.LocalCacheConfig, rdb)
 	consumerHandler.webhookClient = webhook.NewWebhookClient(config.WebhooksConfig.URL)
 	consumerHandler.config = config
-	consumerHandler.onlineCache = rpccache.NewOnlineCache(userRpcClient, consumerHandler.groupLocalCache, rdb, nil)
+	consumerHandler.onlineCache = rpccache.NewOnlineCache(userRpcClient, consumerHandler.groupLocalCache, rdb, config.RpcConfig.FullUserCache, nil)
 	return &consumerHandler, nil
 }


@@ -220,6 +220,7 @@ type Push struct {
 		BadgeCount bool `mapstructure:"badgeCount"`
 		Production bool `mapstructure:"production"`
 	} `mapstructure:"iosPush"`
+	FullUserCache bool `mapstructure:"fullUserCache"`
 }
 
 type Auth struct {
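
The new flag rides in through the same mapstructure tags as the rest of the Push config, so the `fullUserCache: true` line added to the YAML above surfaces as `config.RpcConfig.FullUserCache` at the push call site. A minimal sketch of that decoding step, assuming the YAML has already been parsed into a map (the config loader itself is not part of this diff):

package main

import (
	"fmt"

	"github.com/mitchellh/mapstructure"
)

type Push struct {
	FullUserCache bool `mapstructure:"fullUserCache"`
}

func main() {
	// raw stands in for the parsed YAML shown in the config diff above.
	raw := map[string]any{"fullUserCache": true}

	var cfg Push
	if err := mapstructure.Decode(raw, &cfg); err != nil {
		panic(err)
	}
	fmt.Println(cfg.FullUserCache) // true
}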


@@ -20,6 +20,7 @@ type EvictCallback[K comparable, V any] simplelru.EvictCallback[K, V]
 type LRU[K comparable, V any] interface {
 	Get(key K, fetch func() (V, error)) (V, error)
+	Set(key K, value V)
 	SetHas(key K, value V) bool
 	Del(key K) bool
 	Stop()
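
The interface now carries two write paths. Judging from the implementations below, Set stores unconditionally while SetHas only updates keys that are already cached; a toy model of that contract, using a plain map in place of the real LRU:

package main

import "fmt"

// toyCache models the Set/SetHas contract with a plain map.
type toyCache struct{ m map[string][]int32 }

// Set stores unconditionally, like the new LRU.Set implementations below.
func (c *toyCache) Set(key string, value []int32) { c.m[key] = value }

// SetHas updates only keys that are already cached and reports whether
// a cached entry existed.
func (c *toyCache) SetHas(key string, value []int32) bool {
	if _, ok := c.m[key]; ok {
		c.m[key] = value
		return true
	}
	return false
}

func main() {
	c := &toyCache{m: map[string][]int32{}}
	c.Set("u1", []int32{1})
	fmt.Println(c.SetHas("u1", []int32{1, 2})) // true: u1 already cached
	fmt.Println(c.SetHas("u2", []int32{3}))    // false: u2 was never cached
}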


@@ -99,5 +99,11 @@ func (x *ExpirationLRU[K, V]) SetHas(key K, value V) bool {
 	return false
 }
 
+func (x *ExpirationLRU[K, V]) Set(key K, value V) {
+	x.lock.Lock()
+	defer x.lock.Unlock()
+	x.core.Add(key, &expirationLruItem[V]{value: value})
+}
+
 func (x *ExpirationLRU[K, V]) Stop() {
 }


@@ -116,6 +116,12 @@ func (x *LayLRU[K, V]) SetHasBatch(data map[K]V) bool {
 //	return x.core.Contains(key)
 //}
 
+func (x *LayLRU[K, V]) Set(key K, value V) {
+	x.lock.Lock()
+	defer x.lock.Unlock()
+	x.core.Add(key, &layLruItem[V]{value: value, expires: time.Now().Add(x.successTTL).UnixMilli()})
+}
+
 func (x *LayLRU[K, V]) SetHas(key K, value V) bool {
 	x.lock.Lock()
 	defer x.lock.Unlock()
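
Set stamps each entry with expires = now + successTTL. A self-contained sketch of the lazy-expiry idea that stamp supports, assuming (as is conventional for this style of cache) that readers compare the stamp against the clock and treat stale entries as misses:

package main

import (
	"fmt"
	"time"
)

// item mirrors the layLruItem fields used in the Set body above.
type item struct {
	value   []int32
	expires int64 // UnixMilli deadline: time.Now().Add(successTTL)
}

// get treats absent or stale entries as misses; the expiry check is an
// assumption about how readers use the expires stamp.
func get(m map[string]item, key string) ([]int32, bool) {
	it, ok := m[key]
	if !ok || time.Now().UnixMilli() >= it.expires {
		return nil, false
	}
	return it.value, true
}

func main() {
	m := map[string]item{}
	m["u1"] = item{value: []int32{1}, expires: time.Now().Add(50 * time.Millisecond).UnixMilli()}

	_, ok := get(m, "u1")
	fmt.Println("fresh:", ok) // true
	time.Sleep(60 * time.Millisecond)
	_, ok = get(m, "u1")
	fmt.Println("after TTL:", ok) // false
}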


@@ -40,6 +40,10 @@ func (x *slotLRU[K, V]) Get(key K, fetch func() (V, error)) (V, error) {
 	return x.slots[x.getIndex(key)].Get(key, fetch)
 }
 
+func (x *slotLRU[K, V]) Set(key K, value V) {
+	x.slots[x.getIndex(key)].Set(key, value)
+}
+
 func (x *slotLRU[K, V]) SetHas(key K, value V) bool {
 	return x.slots[x.getIndex(key)].SetHas(key, value)
 }
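
Set routes through the same per-key slot as Get and SetHas, so all operations on a key contend on a single slot's lock rather than one global lock. A sketch of the hash-mod-N routing idea; the FNV hash here is an illustrative assumption, not necessarily what localcache.LRUStringHash uses:

package main

import (
	"fmt"
	"hash/fnv"
)

// stringHash is an assumed stand-in for localcache.LRUStringHash; any
// stable 64-bit hash gives the same routing behavior.
func stringHash(key string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(key))
	return h.Sum64()
}

// getIndex picks the slot for a key, as slotLRU does for Get/Set/SetHas.
func getIndex(key string, slotNum uint64) uint64 {
	return stringHash(key) % slotNum
}

func main() {
	// The same key always lands in the same slot.
	fmt.Println(getIndex("user123", 1024))
	fmt.Println(getIndex("user123", 1024)) // identical index
}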


@@ -7,6 +7,8 @@ import (
 	"github.com/openimsdk/open-im-server/v3/pkg/localcache/lru"
 	"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
 	"github.com/openimsdk/open-im-server/v3/pkg/util/useronline"
+	"github.com/openimsdk/protocol/constant"
+	"github.com/openimsdk/tools/db/cacheutil"
 	"github.com/openimsdk/tools/log"
 	"github.com/openimsdk/tools/mcontext"
 	"github.com/redis/go-redis/v9"
@@ -15,27 +17,47 @@ import (
 	"time"
 )
 
-func NewOnlineCache(user rpcclient.UserRpcClient, group *GroupLocalCache, rdb redis.UniversalClient, fn func(ctx context.Context, userID string, platformIDs []int32)) *OnlineCache {
+func NewOnlineCache(user rpcclient.UserRpcClient, group *GroupLocalCache, rdb redis.UniversalClient, fullUserCache bool, fn func(ctx context.Context, userID string, platformIDs []int32)) *OnlineCache {
 	x := &OnlineCache{
 		user: user,
 		group: group,
-		local: lru.NewSlotLRU(1024, localcache.LRUStringHash, func() lru.LRU[string, []int32] {
-			return lru.NewLayLRU[string, []int32](2048, cachekey.OnlineExpire/2, time.Second*3, localcache.EmptyTarget{}, func(key string, value []int32) {})
-		}),
+		fullUserCache: fullUserCache,
 	}
+
+	switch x.fullUserCache {
+	case true:
+		x.mapCache = cacheutil.NewCache[string, []int32]()
+	case false:
+		x.lruCache = lru.NewSlotLRU(1024, localcache.LRUStringHash, func() lru.LRU[string, []int32] {
+			return lru.NewLayLRU[string, []int32](2048, cachekey.OnlineExpire/2, time.Second*3, localcache.EmptyTarget{}, func(key string, value []int32) {})
+		})
+	}
+
 	go func() {
 		ctx := mcontext.SetOperationID(context.Background(), cachekey.OnlineChannel+strconv.FormatUint(rand.Uint64(), 10))
 		for message := range rdb.Subscribe(ctx, cachekey.OnlineChannel).Channel() {
 			userID, platformIDs, err := useronline.ParseUserOnlineStatus(message.Payload)
 			if err != nil {
-				log.ZError(ctx, "OnlineCache setUserOnline redis subscribe parseUserOnlineStatus", err, "payload", message.Payload, "channel", message.Channel)
+				log.ZError(ctx, "OnlineCache setHasUserOnline redis subscribe parseUserOnlineStatus", err, "payload", message.Payload, "channel", message.Channel)
 				continue
 			}
-			storageCache := x.setUserOnline(userID, platformIDs)
-			log.ZDebug(ctx, "OnlineCache setUserOnline", "userID", userID, "platformIDs", platformIDs, "payload", message.Payload, "storageCache", storageCache)
-			if fn != nil {
-				fn(ctx, userID, platformIDs)
-			}
+
+			switch x.fullUserCache {
+			case true:
+				if len(platformIDs) == 0 {
+					// offline
+					x.mapCache.Delete(userID)
+				} else {
+					x.mapCache.Store(userID, platformIDs)
+				}
+			case false:
+				storageCache := x.setHasUserOnline(userID, platformIDs)
+				log.ZDebug(ctx, "OnlineCache setHasUserOnline", "userID", userID, "platformIDs", platformIDs, "payload", message.Payload, "storageCache", storageCache)
+				if fn != nil {
+					fn(ctx, userID, platformIDs)
+				}
+			}
 		}
 	}()
 	return x
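
In full-cache mode the Redis subscriber is the sole writer of mapCache: a payload carrying platform IDs overwrites the user's entry, and an empty platform list (the "// offline" branch above) deletes it, so presence in the map is equivalent to being online. A toy model of that update rule, with sync.Map as an assumed stand-in for cacheutil.Cache:

package main

import (
	"fmt"
	"sync"
)

// applyFullCacheUpdate mirrors the subscriber's rule above: an empty
// platform list means offline, so the entry is deleted; otherwise the
// platform set is overwritten.
func applyFullCacheUpdate(m *sync.Map, userID string, platformIDs []int32) {
	if len(platformIDs) == 0 {
		m.Delete(userID)
	} else {
		m.Store(userID, platformIDs)
	}
}

func main() {
	var m sync.Map
	applyFullCacheUpdate(&m, "u1", []int32{1, 5}) // u1 comes online
	applyFullCacheUpdate(&m, "u1", nil)           // u1 goes offline
	_, online := m.Load("u1")
	fmt.Println(online) // false
}
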
@@ -44,11 +66,53 @@ func NewOnlineCache(user rpcclient.UserRpcClient, group *GroupLocalCache, rdb re
 type OnlineCache struct {
 	user rpcclient.UserRpcClient
 	group *GroupLocalCache
-	local lru.LRU[string, []int32]
+
+	// fullUserCache if enabled, caches the online status of all users using mapCache;
+	// otherwise, only a portion of users' online statuses (regardless of whether they are online) will be cached using lruCache.
+	fullUserCache bool
+
+	lruCache lru.LRU[string, []int32]
+	mapCache *cacheutil.Cache[string, []int32]
+}
+
+func (o *OnlineCache) initUsersOnlineStatus(ctx context.Context) error {
+	log.ZDebug(ctx, "init users online status begin")
+
+	var (
+		totalSet int
+	)
+
+	defer func(t time.Time) {
+		log.ZDebug(ctx, "init users online status end", "cost", time.Since(t), "totalSet", totalSet)
+	}(time.Now())
+
+	for page := int32(1); ; page++ {
+		resp, err := o.user.GetAllUserID(ctx, page, constant.ParamMaxLength)
+		if err != nil {
+			return err
+		}
+
+		usersStatus, err := o.user.GetUsersOnlinePlatform(ctx, resp.UserIDs)
+		if err != nil {
+			return err
+		}
+
+		for _, user := range usersStatus {
+			if user.Status == constant.Online {
+				o.setUserOnline(user.UserID, user.PlatformIDs)
+			}
+			totalSet++
+		}
+
+		if len(resp.UserIDs) < constant.ParamMaxLength {
+			break
+		}
+	}
+	return nil
 }
 
 func (o *OnlineCache) getUserOnlinePlatform(ctx context.Context, userID string) ([]int32, error) {
-	platformIDs, err := o.local.Get(userID, func() ([]int32, error) {
+	platformIDs, err := o.lruCache.Get(userID, func() ([]int32, error) {
 		return o.user.GetUserOnlinePlatform(ctx, userID)
 	})
 	if err != nil {
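
initUsersOnlineStatus seeds the full cache by walking every user with a short-page termination rule: keep requesting pages of constant.ParamMaxLength IDs until a page comes back short. The same pattern in isolation, with a hypothetical fetchPage in place of o.user.GetAllUserID:

package main

import "fmt"

const pageSize = 3 // stands in for constant.ParamMaxLength

// fetchPage is a hypothetical stand-in for o.user.GetAllUserID.
func fetchPage(all []string, page int) []string {
	start := (page - 1) * pageSize
	if start >= len(all) {
		return nil
	}
	end := start + pageSize
	if end > len(all) {
		end = len(all)
	}
	return all[start:end]
}

func main() {
	all := []string{"u1", "u2", "u3", "u4", "u5"}
	for page := 1; ; page++ {
		ids := fetchPage(all, page)
		fmt.Println(ids)
		if len(ids) < pageSize {
			break // a short page means the last page was reached
		}
	}
}
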
@@ -83,6 +147,19 @@ func (o *OnlineCache) GetUsersOnline(ctx context.Context, usersID []string) ([]s
 		offlineUserIDs []string
 	)
 
+	switch o.fullUserCache {
+	case true:
+		for _, userID := range usersID {
+			if _, ok := o.mapCache.Load(userID); ok {
+				onlineUserIDS = append(onlineUserIDS, userID)
+			} else {
+				offlineUserIDs = append(offlineUserIDs, userID)
+			}
+		}
+	case false:
+	}
+
+	log.ZDebug(ctx, "get users online", "online users length", len(onlineUserIDS), "offline users length", len(offlineUserIDs))
 	return onlineUserIDS, offlineUserIDs, nil
 }
@@ -120,6 +197,15 @@ func (o *OnlineCache) GetUsersOnline(ctx context.Context, usersID []string) ([]s
 //	return onlineUserIDs, nil
 //}
 
-func (o *OnlineCache) setUserOnline(userID string, platformIDs []int32) bool {
-	return o.local.SetHas(userID, platformIDs)
+func (o *OnlineCache) setUserOnline(userID string, platformIDs []int32) {
+	switch o.fullUserCache {
+	case true:
+		o.mapCache.Store(userID, platformIDs)
+	case false:
+		o.lruCache.Set(userID, platformIDs)
+	}
+}
+
+func (o *OnlineCache) setHasUserOnline(userID string, platformIDs []int32) bool {
+	return o.lruCache.SetHas(userID, platformIDs)
 }


@@ -169,6 +169,15 @@ func (u *UserRpcClient) Access(ctx context.Context, ownerUserID string) error {
 	return authverify.CheckAccessV3(ctx, ownerUserID, u.imAdminUserID)
 }
 
+// GetAllUserID retrieves all user IDs with pagination options.
+func (u *UserRpcClient) GetAllUserID(ctx context.Context, pageNumber, showNumber int32) (*user.GetAllUserIDResp, error) {
+	resp, err := u.Client.GetAllUserID(ctx, &user.GetAllUserIDReq{Pagination: &sdkws.RequestPagination{PageNumber: pageNumber, ShowNumber: showNumber}})
+	if err != nil {
+		return nil, err
+	}
+	return resp, nil
+}
+
 // GetAllUserIDs retrieves all user IDs with pagination options.
 func (u *UserRpcClient) GetAllUserIDs(ctx context.Context, pageNumber, showNumber int32) ([]string, error) {
 	resp, err := u.Client.GetAllUserID(ctx, &user.GetAllUserIDReq{Pagination: &sdkws.RequestPagination{PageNumber: pageNumber, ShowNumber: showNumber}})