diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index ad871d7b2..36c1bdd70 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -74,6 +74,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg groupRpcClient := rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group) msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg) userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, &config.Share.IMAdmin) + cache.InitLocalCache(&config.LocalCacheConfig) pbconversation.RegisterConversationServer(server, &conversationServer{ msgRpcClient: &msgRpcClient, user: &userRpcClient, diff --git a/internal/rpc/friend/friend.go b/internal/rpc/friend/friend.go index 201883d67..fb7a14276 100644 --- a/internal/rpc/friend/friend.go +++ b/internal/rpc/friend/friend.go @@ -94,6 +94,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg &msgRpcClient, notification.WithRpcFunc(userRpcClient.GetUsersInfo), ) + cache.InitLocalCache(&config.LocalCacheConfig) // Register Friend server with refactored MongoDB and Redis integrations pbfriend.RegisterFriendServer(server, &friendServer{ diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 9e02c4058..b80a2cfca 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "math/big" "math/rand" "strconv" @@ -70,8 +71,6 @@ type Config struct { Share config.Share WebhooksConfig config.Webhooks LocalCacheConfig config.LocalCache - - } func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { @@ -99,7 +98,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg) conversationRpcClient := rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation) var gs groupServer - database := controller.NewGroupDatabase(rdb,&config.LocalCacheConfig groupDB, groupMemberDB, groupRequestDB, mgocli.GetTx(), grouphash.NewGroupHashFromGroupServer(&gs)) + database := controller.NewGroupDatabase(rdb, &config.LocalCacheConfig, groupDB, groupMemberDB, groupRequestDB, mgocli.GetTx(), grouphash.NewGroupHashFromGroupServer(&gs)) gs.db = database gs.user = userRpcClient gs.notification = notification.NewGroupNotificationSender(database, &msgRpcClient, &userRpcClient, config, func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error) { @@ -109,6 +108,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg } return datautil.Slice(users, func(e *sdkws.UserInfo) notification.CommonUser { return e }), nil }) + cache.InitLocalCache(&config.LocalCacheConfig) gs.conversationRpcClient = conversationRpcClient gs.msgRpcClient = msgRpcClient gs.config = config diff --git a/internal/rpc/third/third.go b/internal/rpc/third/third.go index 6aa214a34..e73c0adab 100644 --- a/internal/rpc/third/third.go +++ b/internal/rpc/third/third.go @@ -53,6 +53,7 @@ type Config struct { NotificationConfig config.Notification Share config.Share MinioConfig config.Minio + LocalCacheConfig config.LocalCache } func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { @@ -100,6 +101,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg if err != nil { return err } + cache.InitLocalCache(&config.LocalCacheConfig) third.RegisterThirdServer(server, &thirdServer{ apiURL: apiURL, thirdDatabase: controller.NewThirdDatabase(cache.NewThirdCache(rdb), logdb), diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index 969e536db..ba39f24e7 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -86,12 +86,13 @@ func Start(ctx context.Context, config *Config, client registry.SvcDiscoveryRegi if err != nil { return err } - cache := cache.NewUserCacheRedis(rdb, &config.LocalCacheConfig, userDB, cache.GetDefaultOpt()) + userCache := cache.NewUserCacheRedis(rdb, &config.LocalCacheConfig, userDB, cache.GetDefaultOpt()) userMongoDB := mgo.NewUserMongoDriver(mgocli.GetDB()) - database := controller.NewUserDatabase(userDB, cache, mgocli.GetTx(), userMongoDB) + database := controller.NewUserDatabase(userDB, userCache, mgocli.GetTx(), userMongoDB) friendRpcClient := rpcclient.NewFriendRpcClient(client, config.Share.RpcRegisterName.Friend) groupRpcClient := rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group) msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg) + cache.InitLocalCache(&config.LocalCacheConfig) u := &userServer{ db: database, RegisterCenter: client, diff --git a/pkg/common/cmd/third.go b/pkg/common/cmd/third.go index b6531e215..38e7935b3 100644 --- a/pkg/common/cmd/third.go +++ b/pkg/common/cmd/third.go @@ -41,6 +41,7 @@ func NewThirdRpcCmd() *ThirdRpcCmd { ShareFileName: &thirdConfig.Share, NotificationFileName: &thirdConfig.NotificationConfig, MinioConfigFileName: &thirdConfig.MinioConfig, + LocalCacheConfigFileName: &thirdConfig.LocalCacheConfig, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", config.Version) diff --git a/pkg/common/db/cache/config.go b/pkg/common/db/cache/config.go index c760e0c03..bb5bd449b 100644 --- a/pkg/common/db/cache/config.go +++ b/pkg/common/db/cache/config.go @@ -27,29 +27,26 @@ var ( subscribe map[string][]string ) -func getPublishKey(topic string, key []string) []string { - if topic == "" || len(key) == 0 { - return nil - } +func InitLocalCache(localCache *config.LocalCache) { once.Do(func() { list := []struct { - Local config.LocalCache + Local config.CacheConfig Keys []string }{ { - Local: config.Config.LocalCache.User, + Local: localCache.User, Keys: []string{cachekey.UserInfoKey, cachekey.UserGlobalRecvMsgOptKey}, }, { - Local: config.Config.LocalCache.Group, + Local: localCache.Group, Keys: []string{cachekey.GroupMemberIDsKey, cachekey.GroupInfoKey, cachekey.GroupMemberInfoKey}, }, { - Local: config.Config.LocalCache.Friend, + Local: localCache.Friend, Keys: []string{cachekey.FriendIDsKey, cachekey.BlackIDsKey}, }, { - Local: config.Config.LocalCache.Conversation, + Local: localCache.Conversation, Keys: []string{cachekey.ConversationKey, cachekey.ConversationIDsKey, cachekey.ConversationNotReceiveMessageUserIDsKey}, }, } @@ -60,6 +57,12 @@ func getPublishKey(topic string, key []string) []string { } } }) +} + +func getPublishKey(topic string, key []string) []string { + if topic == "" || len(key) == 0 { + return nil + } prefix, ok := subscribe[topic] if !ok { return nil