From bbb5ef5cccbff2f4ff2c6f64538b1cda02509a16 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Tue, 9 Jan 2024 17:01:50 +0800 Subject: [PATCH] feat: msg local cache --- deployments/templates/openim.yaml | 75 +++++++++++++++++-------------- internal/rpc/msg/server.go | 2 +- pkg/common/config/config.go | 12 +++++ pkg/common/db/cache/black.go | 7 ++- pkg/common/db/cache/friend.go | 6 ++- pkg/common/db/cache/meta_cache.go | 46 +++++++++---------- pkg/common/localcache/business.go | 1 + pkg/common/localcache/cache.go | 4 +- pkg/rpccache/friend.go | 6 ++- 9 files changed, 93 insertions(+), 66 deletions(-) diff --git a/deployments/templates/openim.yaml b/deployments/templates/openim.yaml index 6880e4c4e..7f33fa015 100644 --- a/deployments/templates/openim.yaml +++ b/deployments/templates/openim.yaml @@ -31,7 +31,7 @@ envs: # Zookeeper password zookeeper: schema: ${ZOOKEEPER_SCHEMA} - address: [ ${ZOOKEEPER_ADDRESS}:${ZOOKEEPER_PORT} ] + address: [ ${ ZOOKEEPER_ADDRESS }:${ ZOOKEEPER_PORT } ] username: ${ZOOKEEPER_USERNAME} password: ${ZOOKEEPER_PASSWORD} @@ -44,14 +44,14 @@ zookeeper: mongo: uri: ${MONGO_URI} -# List of MongoDB server addresses. -# Used for constructing the MongoDB URI if 'uri' above is empty. -# For a standalone setup, specify the address of the single server. -# For a sharded cluster, specify the addresses of the Mongos servers. -# Example: [ '172.28.0.1:37017', '172.28.0.2:37017' ] -# Default MongoDB database name -# Maximum connection pool size - address: [ ${MONGO_ADDRESS}:${MONGO_PORT} ] + # List of MongoDB server addresses. + # Used for constructing the MongoDB URI if 'uri' above is empty. + # For a standalone setup, specify the address of the single server. + # For a sharded cluster, specify the addresses of the Mongos servers. + # Example: [ '172.28.0.1:37017', '172.28.0.2:37017' ] + # Default MongoDB database name + # Maximum connection pool size + address: [ ${ MONGO_ADDRESS }:${ MONGO_PORT } ] database: ${MONGO_DATABASE} username: ${MONGO_OPENIM_USERNAME} password: ${MONGO_OPENIM_PASSWORD} @@ -62,7 +62,7 @@ mongo: # # Username is required only for Redis version 6.0+ redis: - address: [ ${REDIS_ADDRESS}:${REDIS_PORT} ] + address: [ ${ REDIS_ADDRESS }:${ REDIS_PORT } ] username: ${REDIS_USERNAME} password: ${REDIS_PASSWORD} @@ -76,7 +76,7 @@ redis: kafka: username: ${KAFKA_USERNAME} password: ${KAFKA_PASSWORD} - addr: [ ${KAFKA_ADDRESS}:${KAFKA_PORT} ] + addr: [ ${ KAFKA_ADDRESS }:${ KAFKA_PORT } ] latestMsgToRedis: topic: "${KAFKA_LATESTMSG_REDIS_TOPIC}" offlineMsgToMongo: @@ -104,7 +104,7 @@ rpc: # API service port # Default listen IP is 0.0.0.0 api: - openImApiPort: [ ${API_OPENIM_PORT} ] + openImApiPort: [ ${ API_OPENIM_PORT } ] listenIP: ${API_LISTEN_IP} ###################### Object configuration information ###################### @@ -160,14 +160,14 @@ object: # For launching multiple programs, just fill in multiple ports separated by commas # For example, [10110, 10111] rpcPort: - openImUserPort: [ ${OPENIM_USER_PORT} ] - openImFriendPort: [ ${OPENIM_FRIEND_PORT} ] - openImMessagePort: [ ${OPENIM_MESSAGE_PORT} ] - openImGroupPort: [ ${OPENIM_GROUP_PORT} ] - openImAuthPort: [ ${OPENIM_AUTH_PORT} ] - openImPushPort: [ ${OPENIM_PUSH_PORT} ] - openImConversationPort: [ ${OPENIM_CONVERSATION_PORT} ] - openImThirdPort: [ ${OPENIM_THIRD_PORT} ] + openImUserPort: [ ${ OPENIM_USER_PORT } ] + openImFriendPort: [ ${ OPENIM_FRIEND_PORT } ] + openImMessagePort: [ ${ OPENIM_MESSAGE_PORT } ] + openImGroupPort: [ ${ OPENIM_GROUP_PORT } ] + openImAuthPort: [ ${ OPENIM_AUTH_PORT } ] + openImPushPort: [ ${ OPENIM_PUSH_PORT } ] + openImConversationPort: [ ${ OPENIM_CONVERSATION_PORT } ] + openImThirdPort: [ ${ OPENIM_THIRD_PORT } ] ###################### RPC Register Name Configuration ###################### # RPC service names for registration, it's not recommended to modify these @@ -209,9 +209,9 @@ log: # Maximum length of websocket request package # Websocket connection handshake timeout longConnSvr: - openImWsPort: [ ${OPENIM_WS_PORT} ] + openImWsPort: [ ${ OPENIM_WS_PORT } ] websocketMaxConnNum: ${WEBSOCKET_MAX_CONN_NUM} - openImMessageGatewayPort: [ ${OPENIM_MESSAGE_GATEWAY_PORT} ] + openImMessageGatewayPort: [ ${ OPENIM_MESSAGE_GATEWAY_PORT } ] websocketMaxMsgLen: ${WEBSOCKET_MAX_MSG_LEN} websocketTimeout: ${WEBSOCKET_TIMEOUT} @@ -507,15 +507,22 @@ callback: prometheus: enable: ${PROMETHEUS_ENABLE} grafanaUrl: ${GRAFANA_URL} - apiPrometheusPort: [${API_PROM_PORT}] - userPrometheusPort: [ ${USER_PROM_PORT} ] - friendPrometheusPort: [ ${FRIEND_PROM_PORT} ] - messagePrometheusPort: [ ${MESSAGE_PROM_PORT} ] - messageGatewayPrometheusPort: [ ${MSG_GATEWAY_PROM_PORT} ] - groupPrometheusPort: [ ${GROUP_PROM_PORT} ] - authPrometheusPort: [ ${AUTH_PROM_PORT} ] - pushPrometheusPort: [ ${PUSH_PROM_PORT} ] - conversationPrometheusPort: [ ${CONVERSATION_PROM_PORT} ] - rtcPrometheusPort: [ ${RTC_PROM_PORT} ] - thirdPrometheusPort: [ ${THIRD_PROM_PORT} ] - messageTransferPrometheusPort: [ ${MSG_TRANSFER_PROM_PORT} ] # List of ports + apiPrometheusPort: [ ${ API_PROM_PORT } ] + userPrometheusPort: [ ${ USER_PROM_PORT } ] + friendPrometheusPort: [ ${ FRIEND_PROM_PORT } ] + messagePrometheusPort: [ ${ MESSAGE_PROM_PORT } ] + messageGatewayPrometheusPort: [ ${ MSG_GATEWAY_PROM_PORT } ] + groupPrometheusPort: [ ${ GROUP_PROM_PORT } ] + authPrometheusPort: [ ${ AUTH_PROM_PORT } ] + pushPrometheusPort: [ ${ PUSH_PROM_PORT } ] + conversationPrometheusPort: [ ${ CONVERSATION_PROM_PORT } ] + rtcPrometheusPort: [ ${ RTC_PROM_PORT } ] + thirdPrometheusPort: [ ${ THIRD_PROM_PORT } ] + messageTransferPrometheusPort: [ ${ MSG_TRANSFER_PROM_PORT } ] # List of ports + +localCache: + friend: + topic: "friend" + slotNum: 500 + slotSize: 20000 + diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index 9b1965b00..0600031cd 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -90,7 +90,7 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e RegisterCenter: client, GroupLocalCache: localcache.NewGroupLocalCache(&groupRpcClient), ConversationLocalCache: localcache.NewConversationLocalCache(&conversationClient), - friend: rpccache.NewFriendLocalCache(rpcclient.NewFriendRpcClient(client)), + friend: rpccache.NewFriendLocalCache(rpcclient.NewFriendRpcClient(client), rdb), } s.notificationSender = rpcclient.NewNotificationSender(rpcclient.WithLocalSendMsg(s.SendMsg)) s.addInterceptorHandler(MessageHasReadEnabled) diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 88e87e709..0b31220c9 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -258,6 +258,8 @@ type configStruct struct { FriendVerify *bool `yaml:"friendVerify"` } `yaml:"messageVerify"` + LocalCache localCache `yaml:"localCache"` + IOSPush struct { PushSound string `yaml:"pushSound"` BadgeCount bool `yaml:"badgeCount"` @@ -370,6 +372,16 @@ type notification struct { ConversationSetPrivate NotificationConf `yaml:"conversationSetPrivate"` } +type LocalCache struct { + Topic string `yaml:"topic"` + SlotNum int `yaml:"slotNum"` + SlotSize int `yaml:"slotSize"` +} + +type localCache struct { + Friend LocalCache `yaml:"friend"` +} + func (c *configStruct) GetServiceNames() []string { return []string{ c.RpcRegisterName.OpenImUserName, diff --git a/pkg/common/db/cache/black.go b/pkg/common/db/cache/black.go index f69b83afe..e98f7de7b 100644 --- a/pkg/common/db/cache/black.go +++ b/pkg/common/db/cache/black.go @@ -17,6 +17,7 @@ package cache import ( "context" "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "time" "github.com/dtm-labs/rockscache" @@ -53,11 +54,13 @@ func NewBlackCacheRedis( options rockscache.Options, ) BlackCache { rcClient := rockscache.NewClient(rdb, options) - + mc := NewMetaCacheRedis(rcClient) + mc.SetTopic(config.Config.LocalCache.Friend.Topic) + mc.SetRawRedisClient(rdb) return &BlackCacheRedis{ expireTime: blackExpireTime, rcClient: rcClient, - metaCache: NewMetaCacheRedis(rcClient), + metaCache: mc, blackDB: blackDB, } } diff --git a/pkg/common/db/cache/friend.go b/pkg/common/db/cache/friend.go index 4805271a3..160c6df95 100644 --- a/pkg/common/db/cache/friend.go +++ b/pkg/common/db/cache/friend.go @@ -17,6 +17,7 @@ package cache import ( "context" "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "time" "github.com/dtm-labs/rockscache" @@ -59,8 +60,11 @@ type FriendCacheRedis struct { func NewFriendCacheRedis(rdb redis.UniversalClient, friendDB relationtb.FriendModelInterface, options rockscache.Options) FriendCache { rcClient := rockscache.NewClient(rdb, options) + mc := NewMetaCacheRedis(rcClient) + mc.SetTopic(config.Config.LocalCache.Friend.Topic) + mc.SetRawRedisClient(rdb) return &FriendCacheRedis{ - metaCache: NewMetaCacheRedis(rcClient), + metaCache: mc, friendDB: friendDB, expireTime: friendExpireTime, rcClient: rcClient, diff --git a/pkg/common/db/cache/meta_cache.go b/pkg/common/db/cache/meta_cache.go index 4bc2a046a..863f6d243 100644 --- a/pkg/common/db/cache/meta_cache.go +++ b/pkg/common/db/cache/meta_cache.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "errors" + "github.com/redis/go-redis/v9" "time" "github.com/OpenIMSDK/tools/mw/specialerror" @@ -44,6 +45,8 @@ type metaCache interface { AddKeys(keys ...string) ClearKeys() GetPreDelKeys() []string + SetTopic(topic string) + SetRawRedisClient(cli redis.UniversalClient) } func NewMetaCacheRedis(rcClient *rockscache.Client, keys ...string) metaCache { @@ -51,10 +54,20 @@ func NewMetaCacheRedis(rcClient *rockscache.Client, keys ...string) metaCache { } type metaCacheRedis struct { + topic string rcClient *rockscache.Client keys []string maxRetryTimes int retryInterval time.Duration + redisClient redis.UniversalClient +} + +func (m *metaCacheRedis) SetTopic(topic string) { + m.topic = topic +} + +func (m *metaCacheRedis) SetRawRedisClient(cli redis.UniversalClient) { + m.redisClient = cli } func (m *metaCacheRedis) ExecDel(ctx context.Context, distinct ...bool) error { @@ -72,31 +85,18 @@ func (m *metaCacheRedis) ExecDel(ctx context.Context, distinct ...bool) error { } break } - - //retryTimes := 0 - //for { - // m.rcClient.TagAsDeleted() - // if err := m.rcClient.TagAsDeletedBatch2(ctx, []string{key}); err != nil { - // if retryTimes >= m.maxRetryTimes { - // err = errs.ErrInternalServer.Wrap( - // fmt.Sprintf( - // "delete cache error: %v, keys: %v, retry times %d, please check redis server", - // err, - // key, - // retryTimes, - // ), - // ) - // log.ZWarn(ctx, "delete cache failed, please handle keys", err, "keys", key) - // return err - // } - // retryTimes++ - // } else { - // break - // } - //} + } + if m.topic != "" && m.redisClient != nil { + data, err := json.Marshal(m.keys) + if err != nil { + log.ZError(ctx, "keys json marshal failed", err, "topic", m.topic, "keys", m.keys) + } else { + if err := m.redisClient.Publish(ctx, m.topic, string(data)).Err(); err != nil { + log.ZError(ctx, "redis publish cache delete error", err, "topic", m.topic, "keys", m.keys) + } + } } } - return nil } diff --git a/pkg/common/localcache/business.go b/pkg/common/localcache/business.go index 221eb2664..f011719df 100644 --- a/pkg/common/localcache/business.go +++ b/pkg/common/localcache/business.go @@ -16,6 +16,7 @@ func WithRedisDeleteSubscribe(topic string, cli redis.UniversalClient) Option { } msg := cli.Subscribe(context.Background(), topic).Channel() for m := range msg { + log.ZDebug(context.Background(), "WithRedisDeleteSubscribe delete", "topic", m.Channel, "payload", m.Payload) var key []string if err := json.Unmarshal([]byte(m.Payload), &key); err != nil { log.ZError(context.Background(), "WithRedisDeleteSubscribe json unmarshal error", err, "topic", topic, "payload", m.Payload) diff --git a/pkg/common/localcache/cache.go b/pkg/common/localcache/cache.go index 64df1fe02..77e77ce5f 100644 --- a/pkg/common/localcache/cache.go +++ b/pkg/common/localcache/cache.go @@ -33,9 +33,7 @@ type cache[V any] struct { func (c *cache[V]) onEvict(key string, value V) { for k := range c.link.Del(key) { - if key != k { - c.local.Del(k) - } + c.local.Del(k) } } diff --git a/pkg/rpccache/friend.go b/pkg/rpccache/friend.go index 68c6a736a..737f674ab 100644 --- a/pkg/rpccache/friend.go +++ b/pkg/rpccache/friend.go @@ -3,14 +3,16 @@ package rpccache import ( "context" "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/localcache" "github.com/openimsdk/open-im-server/v3/pkg/common/localcache/option" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" + "github.com/redis/go-redis/v9" ) -func NewFriendLocalCache(client rpcclient.FriendRpcClient) *FriendLocalCache { +func NewFriendLocalCache(client rpcclient.FriendRpcClient, cli redis.UniversalClient) *FriendLocalCache { return &FriendLocalCache{ - local: localcache.New[any](), + local: localcache.New[any](localcache.WithRedisDeleteSubscribe(config.Config.LocalCache.Friend.Topic, cli)), client: client, } }