feat: msg local cache

This commit is contained in:
withchao 2024-01-09 17:01:50 +08:00
parent 0248fbb47d
commit bbb5ef5ccc
9 changed files with 93 additions and 66 deletions

View File

@ -31,7 +31,7 @@ envs:
# Zookeeper password # Zookeeper password
zookeeper: zookeeper:
schema: ${ZOOKEEPER_SCHEMA} schema: ${ZOOKEEPER_SCHEMA}
address: [ ${ZOOKEEPER_ADDRESS}:${ZOOKEEPER_PORT} ] address: [ ${ ZOOKEEPER_ADDRESS }:${ ZOOKEEPER_PORT } ]
username: ${ZOOKEEPER_USERNAME} username: ${ZOOKEEPER_USERNAME}
password: ${ZOOKEEPER_PASSWORD} password: ${ZOOKEEPER_PASSWORD}
@ -44,14 +44,14 @@ zookeeper:
mongo: mongo:
uri: ${MONGO_URI} uri: ${MONGO_URI}
# List of MongoDB server addresses. # List of MongoDB server addresses.
# Used for constructing the MongoDB URI if 'uri' above is empty. # Used for constructing the MongoDB URI if 'uri' above is empty.
# For a standalone setup, specify the address of the single server. # For a standalone setup, specify the address of the single server.
# For a sharded cluster, specify the addresses of the Mongos servers. # For a sharded cluster, specify the addresses of the Mongos servers.
# Example: [ '172.28.0.1:37017', '172.28.0.2:37017' ] # Example: [ '172.28.0.1:37017', '172.28.0.2:37017' ]
# Default MongoDB database name # Default MongoDB database name
# Maximum connection pool size # Maximum connection pool size
address: [ ${MONGO_ADDRESS}:${MONGO_PORT} ] address: [ ${ MONGO_ADDRESS }:${ MONGO_PORT } ]
database: ${MONGO_DATABASE} database: ${MONGO_DATABASE}
username: ${MONGO_OPENIM_USERNAME} username: ${MONGO_OPENIM_USERNAME}
password: ${MONGO_OPENIM_PASSWORD} password: ${MONGO_OPENIM_PASSWORD}
@ -62,7 +62,7 @@ mongo:
# #
# Username is required only for Redis version 6.0+ # Username is required only for Redis version 6.0+
redis: redis:
address: [ ${REDIS_ADDRESS}:${REDIS_PORT} ] address: [ ${ REDIS_ADDRESS }:${ REDIS_PORT } ]
username: ${REDIS_USERNAME} username: ${REDIS_USERNAME}
password: ${REDIS_PASSWORD} password: ${REDIS_PASSWORD}
@ -76,7 +76,7 @@ redis:
kafka: kafka:
username: ${KAFKA_USERNAME} username: ${KAFKA_USERNAME}
password: ${KAFKA_PASSWORD} password: ${KAFKA_PASSWORD}
addr: [ ${KAFKA_ADDRESS}:${KAFKA_PORT} ] addr: [ ${ KAFKA_ADDRESS }:${ KAFKA_PORT } ]
latestMsgToRedis: latestMsgToRedis:
topic: "${KAFKA_LATESTMSG_REDIS_TOPIC}" topic: "${KAFKA_LATESTMSG_REDIS_TOPIC}"
offlineMsgToMongo: offlineMsgToMongo:
@ -104,7 +104,7 @@ rpc:
# API service port # API service port
# Default listen IP is 0.0.0.0 # Default listen IP is 0.0.0.0
api: api:
openImApiPort: [ ${API_OPENIM_PORT} ] openImApiPort: [ ${ API_OPENIM_PORT } ]
listenIP: ${API_LISTEN_IP} listenIP: ${API_LISTEN_IP}
###################### Object configuration information ###################### ###################### Object configuration information ######################
@ -160,14 +160,14 @@ object:
# For launching multiple programs, just fill in multiple ports separated by commas # For launching multiple programs, just fill in multiple ports separated by commas
# For example, [10110, 10111] # For example, [10110, 10111]
rpcPort: rpcPort:
openImUserPort: [ ${OPENIM_USER_PORT} ] openImUserPort: [ ${ OPENIM_USER_PORT } ]
openImFriendPort: [ ${OPENIM_FRIEND_PORT} ] openImFriendPort: [ ${ OPENIM_FRIEND_PORT } ]
openImMessagePort: [ ${OPENIM_MESSAGE_PORT} ] openImMessagePort: [ ${ OPENIM_MESSAGE_PORT } ]
openImGroupPort: [ ${OPENIM_GROUP_PORT} ] openImGroupPort: [ ${ OPENIM_GROUP_PORT } ]
openImAuthPort: [ ${OPENIM_AUTH_PORT} ] openImAuthPort: [ ${ OPENIM_AUTH_PORT } ]
openImPushPort: [ ${OPENIM_PUSH_PORT} ] openImPushPort: [ ${ OPENIM_PUSH_PORT } ]
openImConversationPort: [ ${OPENIM_CONVERSATION_PORT} ] openImConversationPort: [ ${ OPENIM_CONVERSATION_PORT } ]
openImThirdPort: [ ${OPENIM_THIRD_PORT} ] openImThirdPort: [ ${ OPENIM_THIRD_PORT } ]
###################### RPC Register Name Configuration ###################### ###################### RPC Register Name Configuration ######################
# RPC service names for registration, it's not recommended to modify these # RPC service names for registration, it's not recommended to modify these
@ -209,9 +209,9 @@ log:
# Maximum length of websocket request package # Maximum length of websocket request package
# Websocket connection handshake timeout # Websocket connection handshake timeout
longConnSvr: longConnSvr:
openImWsPort: [ ${OPENIM_WS_PORT} ] openImWsPort: [ ${ OPENIM_WS_PORT } ]
websocketMaxConnNum: ${WEBSOCKET_MAX_CONN_NUM} websocketMaxConnNum: ${WEBSOCKET_MAX_CONN_NUM}
openImMessageGatewayPort: [ ${OPENIM_MESSAGE_GATEWAY_PORT} ] openImMessageGatewayPort: [ ${ OPENIM_MESSAGE_GATEWAY_PORT } ]
websocketMaxMsgLen: ${WEBSOCKET_MAX_MSG_LEN} websocketMaxMsgLen: ${WEBSOCKET_MAX_MSG_LEN}
websocketTimeout: ${WEBSOCKET_TIMEOUT} websocketTimeout: ${WEBSOCKET_TIMEOUT}
@ -507,15 +507,22 @@ callback:
prometheus: prometheus:
enable: ${PROMETHEUS_ENABLE} enable: ${PROMETHEUS_ENABLE}
grafanaUrl: ${GRAFANA_URL} grafanaUrl: ${GRAFANA_URL}
apiPrometheusPort: [${API_PROM_PORT}] apiPrometheusPort: [ ${ API_PROM_PORT } ]
userPrometheusPort: [ ${USER_PROM_PORT} ] userPrometheusPort: [ ${ USER_PROM_PORT } ]
friendPrometheusPort: [ ${FRIEND_PROM_PORT} ] friendPrometheusPort: [ ${ FRIEND_PROM_PORT } ]
messagePrometheusPort: [ ${MESSAGE_PROM_PORT} ] messagePrometheusPort: [ ${ MESSAGE_PROM_PORT } ]
messageGatewayPrometheusPort: [ ${MSG_GATEWAY_PROM_PORT} ] messageGatewayPrometheusPort: [ ${ MSG_GATEWAY_PROM_PORT } ]
groupPrometheusPort: [ ${GROUP_PROM_PORT} ] groupPrometheusPort: [ ${ GROUP_PROM_PORT } ]
authPrometheusPort: [ ${AUTH_PROM_PORT} ] authPrometheusPort: [ ${ AUTH_PROM_PORT } ]
pushPrometheusPort: [ ${PUSH_PROM_PORT} ] pushPrometheusPort: [ ${ PUSH_PROM_PORT } ]
conversationPrometheusPort: [ ${CONVERSATION_PROM_PORT} ] conversationPrometheusPort: [ ${ CONVERSATION_PROM_PORT } ]
rtcPrometheusPort: [ ${RTC_PROM_PORT} ] rtcPrometheusPort: [ ${ RTC_PROM_PORT } ]
thirdPrometheusPort: [ ${THIRD_PROM_PORT} ] thirdPrometheusPort: [ ${ THIRD_PROM_PORT } ]
messageTransferPrometheusPort: [ ${MSG_TRANSFER_PROM_PORT} ] # List of ports messageTransferPrometheusPort: [ ${ MSG_TRANSFER_PROM_PORT } ] # List of ports
localCache:
friend:
topic: "friend"
slotNum: 500
slotSize: 20000

View File

@ -90,7 +90,7 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
RegisterCenter: client, RegisterCenter: client,
GroupLocalCache: localcache.NewGroupLocalCache(&groupRpcClient), GroupLocalCache: localcache.NewGroupLocalCache(&groupRpcClient),
ConversationLocalCache: localcache.NewConversationLocalCache(&conversationClient), ConversationLocalCache: localcache.NewConversationLocalCache(&conversationClient),
friend: rpccache.NewFriendLocalCache(rpcclient.NewFriendRpcClient(client)), friend: rpccache.NewFriendLocalCache(rpcclient.NewFriendRpcClient(client), rdb),
} }
s.notificationSender = rpcclient.NewNotificationSender(rpcclient.WithLocalSendMsg(s.SendMsg)) s.notificationSender = rpcclient.NewNotificationSender(rpcclient.WithLocalSendMsg(s.SendMsg))
s.addInterceptorHandler(MessageHasReadEnabled) s.addInterceptorHandler(MessageHasReadEnabled)

View File

@ -258,6 +258,8 @@ type configStruct struct {
FriendVerify *bool `yaml:"friendVerify"` FriendVerify *bool `yaml:"friendVerify"`
} `yaml:"messageVerify"` } `yaml:"messageVerify"`
LocalCache localCache `yaml:"localCache"`
IOSPush struct { IOSPush struct {
PushSound string `yaml:"pushSound"` PushSound string `yaml:"pushSound"`
BadgeCount bool `yaml:"badgeCount"` BadgeCount bool `yaml:"badgeCount"`
@ -370,6 +372,16 @@ type notification struct {
ConversationSetPrivate NotificationConf `yaml:"conversationSetPrivate"` ConversationSetPrivate NotificationConf `yaml:"conversationSetPrivate"`
} }
type LocalCache struct {
Topic string `yaml:"topic"`
SlotNum int `yaml:"slotNum"`
SlotSize int `yaml:"slotSize"`
}
type localCache struct {
Friend LocalCache `yaml:"friend"`
}
func (c *configStruct) GetServiceNames() []string { func (c *configStruct) GetServiceNames() []string {
return []string{ return []string{
c.RpcRegisterName.OpenImUserName, c.RpcRegisterName.OpenImUserName,

View File

@ -17,6 +17,7 @@ package cache
import ( import (
"context" "context"
"github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"time" "time"
"github.com/dtm-labs/rockscache" "github.com/dtm-labs/rockscache"
@ -53,11 +54,13 @@ func NewBlackCacheRedis(
options rockscache.Options, options rockscache.Options,
) BlackCache { ) BlackCache {
rcClient := rockscache.NewClient(rdb, options) rcClient := rockscache.NewClient(rdb, options)
mc := NewMetaCacheRedis(rcClient)
mc.SetTopic(config.Config.LocalCache.Friend.Topic)
mc.SetRawRedisClient(rdb)
return &BlackCacheRedis{ return &BlackCacheRedis{
expireTime: blackExpireTime, expireTime: blackExpireTime,
rcClient: rcClient, rcClient: rcClient,
metaCache: NewMetaCacheRedis(rcClient), metaCache: mc,
blackDB: blackDB, blackDB: blackDB,
} }
} }

View File

@ -17,6 +17,7 @@ package cache
import ( import (
"context" "context"
"github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"time" "time"
"github.com/dtm-labs/rockscache" "github.com/dtm-labs/rockscache"
@ -59,8 +60,11 @@ type FriendCacheRedis struct {
func NewFriendCacheRedis(rdb redis.UniversalClient, friendDB relationtb.FriendModelInterface, func NewFriendCacheRedis(rdb redis.UniversalClient, friendDB relationtb.FriendModelInterface,
options rockscache.Options) FriendCache { options rockscache.Options) FriendCache {
rcClient := rockscache.NewClient(rdb, options) rcClient := rockscache.NewClient(rdb, options)
mc := NewMetaCacheRedis(rcClient)
mc.SetTopic(config.Config.LocalCache.Friend.Topic)
mc.SetRawRedisClient(rdb)
return &FriendCacheRedis{ return &FriendCacheRedis{
metaCache: NewMetaCacheRedis(rcClient), metaCache: mc,
friendDB: friendDB, friendDB: friendDB,
expireTime: friendExpireTime, expireTime: friendExpireTime,
rcClient: rcClient, rcClient: rcClient,

View File

@ -18,6 +18,7 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"errors" "errors"
"github.com/redis/go-redis/v9"
"time" "time"
"github.com/OpenIMSDK/tools/mw/specialerror" "github.com/OpenIMSDK/tools/mw/specialerror"
@ -44,6 +45,8 @@ type metaCache interface {
AddKeys(keys ...string) AddKeys(keys ...string)
ClearKeys() ClearKeys()
GetPreDelKeys() []string GetPreDelKeys() []string
SetTopic(topic string)
SetRawRedisClient(cli redis.UniversalClient)
} }
func NewMetaCacheRedis(rcClient *rockscache.Client, keys ...string) metaCache { func NewMetaCacheRedis(rcClient *rockscache.Client, keys ...string) metaCache {
@ -51,10 +54,20 @@ func NewMetaCacheRedis(rcClient *rockscache.Client, keys ...string) metaCache {
} }
type metaCacheRedis struct { type metaCacheRedis struct {
topic string
rcClient *rockscache.Client rcClient *rockscache.Client
keys []string keys []string
maxRetryTimes int maxRetryTimes int
retryInterval time.Duration retryInterval time.Duration
redisClient redis.UniversalClient
}
func (m *metaCacheRedis) SetTopic(topic string) {
m.topic = topic
}
func (m *metaCacheRedis) SetRawRedisClient(cli redis.UniversalClient) {
m.redisClient = cli
} }
func (m *metaCacheRedis) ExecDel(ctx context.Context, distinct ...bool) error { func (m *metaCacheRedis) ExecDel(ctx context.Context, distinct ...bool) error {
@ -72,31 +85,18 @@ func (m *metaCacheRedis) ExecDel(ctx context.Context, distinct ...bool) error {
} }
break break
} }
}
//retryTimes := 0 if m.topic != "" && m.redisClient != nil {
//for { data, err := json.Marshal(m.keys)
// m.rcClient.TagAsDeleted() if err != nil {
// if err := m.rcClient.TagAsDeletedBatch2(ctx, []string{key}); err != nil { log.ZError(ctx, "keys json marshal failed", err, "topic", m.topic, "keys", m.keys)
// if retryTimes >= m.maxRetryTimes { } else {
// err = errs.ErrInternalServer.Wrap( if err := m.redisClient.Publish(ctx, m.topic, string(data)).Err(); err != nil {
// fmt.Sprintf( log.ZError(ctx, "redis publish cache delete error", err, "topic", m.topic, "keys", m.keys)
// "delete cache error: %v, keys: %v, retry times %d, please check redis server", }
// err, }
// key,
// retryTimes,
// ),
// )
// log.ZWarn(ctx, "delete cache failed, please handle keys", err, "keys", key)
// return err
// }
// retryTimes++
// } else {
// break
// }
//}
} }
} }
return nil return nil
} }

View File

@ -16,6 +16,7 @@ func WithRedisDeleteSubscribe(topic string, cli redis.UniversalClient) Option {
} }
msg := cli.Subscribe(context.Background(), topic).Channel() msg := cli.Subscribe(context.Background(), topic).Channel()
for m := range msg { for m := range msg {
log.ZDebug(context.Background(), "WithRedisDeleteSubscribe delete", "topic", m.Channel, "payload", m.Payload)
var key []string var key []string
if err := json.Unmarshal([]byte(m.Payload), &key); err != nil { if err := json.Unmarshal([]byte(m.Payload), &key); err != nil {
log.ZError(context.Background(), "WithRedisDeleteSubscribe json unmarshal error", err, "topic", topic, "payload", m.Payload) log.ZError(context.Background(), "WithRedisDeleteSubscribe json unmarshal error", err, "topic", topic, "payload", m.Payload)

View File

@ -33,10 +33,8 @@ type cache[V any] struct {
func (c *cache[V]) onEvict(key string, value V) { func (c *cache[V]) onEvict(key string, value V) {
for k := range c.link.Del(key) { for k := range c.link.Del(key) {
if key != k {
c.local.Del(k) c.local.Del(k)
} }
}
} }
func (c *cache[V]) del(key ...string) { func (c *cache[V]) del(key ...string) {

View File

@ -3,14 +3,16 @@ package rpccache
import ( import (
"context" "context"
"github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/localcache" "github.com/openimsdk/open-im-server/v3/pkg/common/localcache"
"github.com/openimsdk/open-im-server/v3/pkg/common/localcache/option" "github.com/openimsdk/open-im-server/v3/pkg/common/localcache/option"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/redis/go-redis/v9"
) )
func NewFriendLocalCache(client rpcclient.FriendRpcClient) *FriendLocalCache { func NewFriendLocalCache(client rpcclient.FriendRpcClient, cli redis.UniversalClient) *FriendLocalCache {
return &FriendLocalCache{ return &FriendLocalCache{
local: localcache.New[any](), local: localcache.New[any](localcache.WithRedisDeleteSubscribe(config.Config.LocalCache.Friend.Topic, cli)),
client: client, client: client,
} }
} }