feat: local cache

This commit is contained in:
withchao 2024-01-12 15:41:05 +08:00
parent 006e4a1e93
commit 45064ae5ca
28 changed files with 268 additions and 386 deletions

View File

@ -530,6 +530,16 @@ prometheus:
localCache: localCache:
friend: friend:
topic: friend topic: delete_cache_friend
slotNum: 500
slotSize: 20000
group:
topic: delete_cache_group
slotNum: 500
slotSize: 20000
conversation:
topic: delete_cache_conversation
slotNum: 500 slotNum: 500
slotSize: 20000 slotSize: 20000

View File

@ -17,6 +17,7 @@ package push
import ( import (
"context" "context"
"github.com/OpenIMSDK/tools/utils" "github.com/OpenIMSDK/tools/utils"
"github.com/openimsdk/open-im-server/v3/pkg/rpccache"
"sync" "sync"
"google.golang.org/grpc" "google.golang.org/grpc"
@ -28,7 +29,6 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/localcache"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
) )
@ -51,8 +51,8 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
client, client,
offlinePusher, offlinePusher,
database, database,
localcache.NewGroupLocalCache(&groupRpcClient), rpccache.NewGroupLocalCache(rpcclient.NewGroupRpcClient(client), rdb),
localcache.NewConversationLocalCache(&conversationRpcClient), rpccache.NewConversationLocalCache(rpcclient.NewConversationRpcClient(client), rdb),
&conversationRpcClient, &conversationRpcClient,
&groupRpcClient, &groupRpcClient,
&msgRpcClient, &msgRpcClient,

View File

@ -18,6 +18,7 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"errors" "errors"
"github.com/openimsdk/open-im-server/v3/pkg/rpccache"
"google.golang.org/grpc" "google.golang.org/grpc"
"sync" "sync"
@ -40,7 +41,6 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/localcache"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
@ -50,8 +50,8 @@ type Pusher struct {
database controller.PushDatabase database controller.PushDatabase
discov discoveryregistry.SvcDiscoveryRegistry discov discoveryregistry.SvcDiscoveryRegistry
offlinePusher offlinepush.OfflinePusher offlinePusher offlinepush.OfflinePusher
groupLocalCache *localcache.GroupLocalCache groupLocalCache *rpccache.GroupLocalCache
conversationLocalCache *localcache.ConversationLocalCache conversationLocalCache *rpccache.ConversationLocalCache
msgRpcClient *rpcclient.MessageRpcClient msgRpcClient *rpcclient.MessageRpcClient
conversationRpcClient *rpcclient.ConversationRpcClient conversationRpcClient *rpcclient.ConversationRpcClient
groupRpcClient *rpcclient.GroupRpcClient groupRpcClient *rpcclient.GroupRpcClient
@ -60,7 +60,7 @@ type Pusher struct {
var errNoOfflinePusher = errors.New("no offlinePusher is configured") var errNoOfflinePusher = errors.New("no offlinePusher is configured")
func NewPusher(discov discoveryregistry.SvcDiscoveryRegistry, offlinePusher offlinepush.OfflinePusher, database controller.PushDatabase, func NewPusher(discov discoveryregistry.SvcDiscoveryRegistry, offlinePusher offlinepush.OfflinePusher, database controller.PushDatabase,
groupLocalCache *localcache.GroupLocalCache, conversationLocalCache *localcache.ConversationLocalCache, groupLocalCache *rpccache.GroupLocalCache, conversationLocalCache *rpccache.ConversationLocalCache,
conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient, msgRpcClient *rpcclient.MessageRpcClient, conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient, msgRpcClient *rpcclient.MessageRpcClient,
) *Pusher { ) *Pusher {
return &Pusher{ return &Pusher{

View File

@ -27,7 +27,6 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/controller" "github.com/openimsdk/open-im-server/v3/pkg/common/db/controller"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/localcache"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation" "github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
) )
@ -41,9 +40,8 @@ type (
User *rpcclient.UserRpcClient User *rpcclient.UserRpcClient
Conversation *rpcclient.ConversationRpcClient Conversation *rpcclient.ConversationRpcClient
friend *rpccache.FriendLocalCache friend *rpccache.FriendLocalCache
//friend *rpcclient.FriendRpcClient GroupLocalCache *rpccache.GroupLocalCache
GroupLocalCache *localcache.GroupLocalCache ConversationLocalCache *rpccache.ConversationLocalCache
ConversationLocalCache *localcache.ConversationLocalCache
Handlers MessageInterceptorChain Handlers MessageInterceptorChain
notificationSender *rpcclient.NotificationSender notificationSender *rpcclient.NotificationSender
} }
@ -89,8 +87,8 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
Group: &groupRpcClient, Group: &groupRpcClient,
MsgDatabase: msgDatabase, MsgDatabase: msgDatabase,
RegisterCenter: client, RegisterCenter: client,
GroupLocalCache: localcache.NewGroupLocalCache(&groupRpcClient), GroupLocalCache: rpccache.NewGroupLocalCache(rpcclient.NewGroupRpcClient(client), rdb),
ConversationLocalCache: localcache.NewConversationLocalCache(&conversationClient), ConversationLocalCache: rpccache.NewConversationLocalCache(rpcclient.NewConversationRpcClient(client), rdb),
friend: rpccache.NewFriendLocalCache(rpcclient.NewFriendRpcClient(client), rdb), friend: rpccache.NewFriendLocalCache(rpcclient.NewFriendRpcClient(client), rdb),
} }
s.notificationSender = rpcclient.NewNotificationSender(rpcclient.WithLocalSendMsg(s.SendMsg)) s.notificationSender = rpcclient.NewNotificationSender(rpcclient.WithLocalSendMsg(s.SendMsg))

View File

@ -1,15 +1,15 @@
package cachekey package cachekey
const ( const (
blackIDsKey = "BLACK_IDS:" BlackIDsKey = "BLACK_IDS:"
isBlackKey = "IS_BLACK:" IsBlackKey = "IS_BLACK:" // local cache
) )
func GetBlackIDsKey(ownerUserID string) string { func GetBlackIDsKey(ownerUserID string) string {
return blackIDsKey + ownerUserID return BlackIDsKey + ownerUserID
} }
func GetIsBlackIDsKey(possibleBlackUserID, userID string) string { func GetIsBlackIDsKey(possibleBlackUserID, userID string) string {
return isBlackKey + userID + "-" + possibleBlackUserID return IsBlackKey + userID + "-" + possibleBlackUserID
} }

View File

@ -1,48 +1,44 @@
package cachekey package cachekey
import "time"
const ( const (
conversationKey = "CONVERSATION:" ConversationKey = "CONVERSATION:"
conversationIDsKey = "CONVERSATION_IDS:" ConversationIDsKey = "CONVERSATION_IDS:"
conversationIDsHashKey = "CONVERSATION_IDS_HASH:" ConversationIDsHashKey = "CONVERSATION_IDS_HASH:"
conversationHasReadSeqKey = "CONVERSATION_HAS_READ_SEQ:" ConversationHasReadSeqKey = "CONVERSATION_HAS_READ_SEQ:"
recvMsgOptKey = "RECV_MSG_OPT:" RecvMsgOptKey = "RECV_MSG_OPT:"
superGroupRecvMsgNotNotifyUserIDsKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS:" SuperGroupRecvMsgNotNotifyUserIDsKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS:"
superGroupRecvMsgNotNotifyUserIDsHashKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS_HASH:" SuperGroupRecvMsgNotNotifyUserIDsHashKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS_HASH:"
conversationNotReceiveMessageUserIDsKey = "CONVERSATION_NOT_RECEIVE_MESSAGE_USER_IDS:" ConversationNotReceiveMessageUserIDsKey = "CONVERSATION_NOT_RECEIVE_MESSAGE_USER_IDS:"
conversationExpireTime = time.Second * 60 * 60 * 12
) )
func GetConversationKey(ownerUserID, conversationID string) string { func GetConversationKey(ownerUserID, conversationID string) string {
return conversationKey + ownerUserID + ":" + conversationID return ConversationKey + ownerUserID + ":" + conversationID
} }
func GetConversationIDsKey(ownerUserID string) string { func GetConversationIDsKey(ownerUserID string) string {
return conversationIDsKey + ownerUserID return ConversationIDsKey + ownerUserID
} }
func GetSuperGroupRecvNotNotifyUserIDsKey(groupID string) string { func GetSuperGroupRecvNotNotifyUserIDsKey(groupID string) string {
return superGroupRecvMsgNotNotifyUserIDsKey + groupID return SuperGroupRecvMsgNotNotifyUserIDsKey + groupID
} }
func GetRecvMsgOptKey(ownerUserID, conversationID string) string { func GetRecvMsgOptKey(ownerUserID, conversationID string) string {
return recvMsgOptKey + ownerUserID + ":" + conversationID return RecvMsgOptKey + ownerUserID + ":" + conversationID
} }
func GetSuperGroupRecvNotNotifyUserIDsHashKey(groupID string) string { func GetSuperGroupRecvNotNotifyUserIDsHashKey(groupID string) string {
return superGroupRecvMsgNotNotifyUserIDsHashKey + groupID return SuperGroupRecvMsgNotNotifyUserIDsHashKey + groupID
} }
func GetConversationHasReadSeqKey(ownerUserID, conversationID string) string { func GetConversationHasReadSeqKey(ownerUserID, conversationID string) string {
return conversationHasReadSeqKey + ownerUserID + ":" + conversationID return ConversationHasReadSeqKey + ownerUserID + ":" + conversationID
} }
func GetConversationNotReceiveMessageUserIDsKey(conversationID string) string { func GetConversationNotReceiveMessageUserIDsKey(conversationID string) string {
return conversationNotReceiveMessageUserIDsKey + conversationID return ConversationNotReceiveMessageUserIDsKey + conversationID
} }
func GetUserConversationIDsHashKey(ownerUserID string) string { func GetUserConversationIDsHashKey(ownerUserID string) string {
return conversationIDsHashKey + ownerUserID return ConversationIDsHashKey + ownerUserID
} }

View File

@ -1,27 +1,24 @@
package cachekey package cachekey
import "time"
const ( const (
friendExpireTime = time.Second * 60 * 60 * 12 FriendIDsKey = "FRIEND_IDS:"
friendIDsKey = "FRIEND_IDS:" TwoWayFriendsIDsKey = "COMMON_FRIENDS_IDS:"
twoWayFriendsIDsKey = "COMMON_FRIENDS_IDS:" FriendKey = "FRIEND_INFO:"
friendKey = "FRIEND_INFO:" IsFriendKey = "IS_FRIEND:" // local cache key
isFriendKey = "IS_FRIEND:"
) )
func GetFriendIDsKey(ownerUserID string) string { func GetFriendIDsKey(ownerUserID string) string {
return friendIDsKey + ownerUserID return FriendIDsKey + ownerUserID
} }
func GetTwoWayFriendsIDsKey(ownerUserID string) string { func GetTwoWayFriendsIDsKey(ownerUserID string) string {
return twoWayFriendsIDsKey + ownerUserID return TwoWayFriendsIDsKey + ownerUserID
} }
func GetFriendKey(ownerUserID, friendUserID string) string { func GetFriendKey(ownerUserID, friendUserID string) string {
return friendKey + ownerUserID + "-" + friendUserID return FriendKey + ownerUserID + "-" + friendUserID
} }
func GetIsFriendKey(possibleFriendUserID, userID string) string { func GetIsFriendKey(possibleFriendUserID, userID string) string {
return isFriendKey + possibleFriendUserID + "-" + userID return IsFriendKey + possibleFriendUserID + "-" + userID
} }

View File

@ -7,39 +7,39 @@ import (
const ( const (
groupExpireTime = time.Second * 60 * 60 * 12 groupExpireTime = time.Second * 60 * 60 * 12
groupInfoKey = "GROUP_INFO:" GroupInfoKey = "GROUP_INFO:"
groupMemberIDsKey = "GROUP_MEMBER_IDS:" GroupMemberIDsKey = "GROUP_MEMBER_IDS:"
groupMembersHashKey = "GROUP_MEMBERS_HASH2:" GroupMembersHashKey = "GROUP_MEMBERS_HASH2:"
groupMemberInfoKey = "GROUP_MEMBER_INFO:" GroupMemberInfoKey = "GROUP_MEMBER_INFO:"
joinedGroupsKey = "JOIN_GROUPS_KEY:" JoinedGroupsKey = "JOIN_GROUPS_KEY:"
groupMemberNumKey = "GROUP_MEMBER_NUM_CACHE:" GroupMemberNumKey = "GROUP_MEMBER_NUM_CACHE:"
groupRoleLevelMemberIDsKey = "GROUP_ROLE_LEVEL_MEMBER_IDS:" GroupRoleLevelMemberIDsKey = "GROUP_ROLE_LEVEL_MEMBER_IDS:"
) )
func GetGroupInfoKey(groupID string) string { func GetGroupInfoKey(groupID string) string {
return groupInfoKey + groupID return GroupInfoKey + groupID
} }
func GetJoinedGroupsKey(userID string) string { func GetJoinedGroupsKey(userID string) string {
return joinedGroupsKey + userID return JoinedGroupsKey + userID
} }
func GetGroupMembersHashKey(groupID string) string { func GetGroupMembersHashKey(groupID string) string {
return groupMembersHashKey + groupID return GroupMembersHashKey + groupID
} }
func GetGroupMemberIDsKey(groupID string) string { func GetGroupMemberIDsKey(groupID string) string {
return groupMemberIDsKey + groupID return GroupMemberIDsKey + groupID
} }
func GetGroupMemberInfoKey(groupID, userID string) string { func GetGroupMemberInfoKey(groupID, userID string) string {
return groupMemberInfoKey + groupID + "-" + userID return GroupMemberInfoKey + groupID + "-" + userID
} }
func GetGroupMemberNumKey(groupID string) string { func GetGroupMemberNumKey(groupID string) string {
return groupMemberNumKey + groupID return GroupMemberNumKey + groupID
} }
func GetGroupRoleLevelMemberIDsKey(groupID string, roleLevel int32) string { func GetGroupRoleLevelMemberIDsKey(groupID string, roleLevel int32) string {
return groupRoleLevelMemberIDsKey + groupID + "-" + strconv.Itoa(int(roleLevel)) return GroupRoleLevelMemberIDsKey + groupID + "-" + strconv.Itoa(int(roleLevel))
} }

View File

@ -1,23 +0,0 @@
package cachekey
const (
maxSeq = "MAX_SEQ:"
minSeq = "MIN_SEQ:"
conversationUserMinSeq = "CON_USER_MIN_SEQ:"
hasReadSeq = "HAS_READ_SEQ:"
//appleDeviceToken = "DEVICE_TOKEN"
getuiToken = "GETUI_TOKEN"
getuiTaskID = "GETUI_TASK_ID"
//signalCache = "SIGNAL_CACHE:"
//signalListCache = "SIGNAL_LIST_CACHE:"
FCM_TOKEN = "FCM_TOKEN:"
messageCache = "MESSAGE_CACHE:"
messageDelUserList = "MESSAGE_DEL_USER_LIST:"
userDelMessagesList = "USER_DEL_MESSAGES_LIST:"
sendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:"
userBadgeUnreadCountSum = "USER_BADGE_UNREAD_COUNT_SUM:"
exTypeKeyLocker = "EX_LOCK:"
uidPidToken = "UID_PID_TOKEN_STATUS:"
)

View File

@ -1 +0,0 @@
package cachekey

View File

@ -1,21 +0,0 @@
package cachekey
import "time"
const (
userExpireTime = time.Second * 60 * 60 * 12
userInfoKey = "USER_INFO:"
userGlobalRecvMsgOptKey = "USER_GLOBAL_RECV_MSG_OPT_KEY:"
olineStatusKey = "ONLINE_STATUS:"
userOlineStatusExpireTime = time.Second * 60 * 60 * 24
statusMod = 501
platformID = "_PlatformIDSuffix"
)
func GetUserInfoKey(userID string) string {
return userInfoKey + userID
}
func GetUserGlobalRecvMsgOptKey(userID string) string {
return userGlobalRecvMsgOptKey + userID
}

View File

@ -378,8 +378,14 @@ type LocalCache struct {
SlotSize int `yaml:"slotSize"` SlotSize int `yaml:"slotSize"`
} }
func (l LocalCache) Enable() bool {
return l.Topic != "" && l.SlotNum > 0 && l.SlotSize > 0
}
type localCache struct { type localCache struct {
Friend LocalCache `yaml:"friend"` Friend LocalCache `yaml:"friend"`
Group LocalCache `yaml:"group"`
Conversation LocalCache `yaml:"conversation"`
} }
func (c *configStruct) GetServiceNames() []string { func (c *configStruct) GetServiceNames() []string {

View File

@ -16,7 +16,6 @@ package cache
import ( import (
"context" "context"
"github.com/OpenIMSDK/tools/log"
"github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/config"
"time" "time"
@ -56,9 +55,7 @@ func NewBlackCacheRedis(
) BlackCache { ) BlackCache {
rcClient := rockscache.NewClient(rdb, options) rcClient := rockscache.NewClient(rdb, options)
mc := NewMetaCacheRedis(rcClient) mc := NewMetaCacheRedis(rcClient)
f := config.Config.LocalCache.Friend mc.SetTopic(config.Config.LocalCache.Friend.Topic)
log.ZDebug(context.Background(), "black local cache init", "Topic", f.Topic, "SlotNum", f.SlotNum, "SlotSize", f.SlotSize)
mc.SetTopic(f.Topic)
mc.SetRawRedisClient(rdb) mc.SetRawRedisClient(rdb)
return &BlackCacheRedis{ return &BlackCacheRedis{
expireTime: blackExpireTime, expireTime: blackExpireTime,
@ -73,7 +70,7 @@ func (b *BlackCacheRedis) NewCache() BlackCache {
expireTime: b.expireTime, expireTime: b.expireTime,
rcClient: b.rcClient, rcClient: b.rcClient,
blackDB: b.blackDB, blackDB: b.blackDB,
metaCache: b.metaCache.Copy(), metaCache: b.Copy(),
} }
} }

62
pkg/common/db/cache/config.go vendored Normal file
View File

@ -0,0 +1,62 @@
package cache
import (
"github.com/openimsdk/open-im-server/v3/pkg/common/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"strings"
"sync"
)
var (
once sync.Once
subscribe map[string][]string
)
func getPublishKey(topic string, key []string) []string {
if topic == "" || len(key) == 0 {
return nil
}
once.Do(func() {
list := []struct {
Local config.LocalCache
Keys []string
}{
{
Local: config.Config.LocalCache.Group,
Keys: []string{cachekey.GroupMemberIDsKey},
},
{
Local: config.Config.LocalCache.Friend,
Keys: []string{cachekey.FriendIDsKey, cachekey.BlackIDsKey},
},
{
Local: config.Config.LocalCache.Conversation,
Keys: []string{cachekey.ConversationIDsKey},
},
}
subscribe = make(map[string][]string)
for _, v := range list {
if v.Local.Enable() {
subscribe[v.Local.Topic] = v.Keys
}
}
})
prefix, ok := subscribe[topic]
if !ok {
return nil
}
res := make([]string, 0, len(key))
for _, k := range key {
var exist bool
for _, p := range prefix {
if strings.HasPrefix(k, p) {
exist = true
break
}
}
if exist {
res = append(res, k)
}
}
return res
}

View File

@ -18,6 +18,7 @@ import (
"context" "context"
"errors" "errors"
"github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"math/big" "math/big"
"strings" "strings"
"time" "time"
@ -85,10 +86,12 @@ type ConversationCache interface {
func NewConversationRedis(rdb redis.UniversalClient, opts rockscache.Options, db relationtb.ConversationModelInterface) ConversationCache { func NewConversationRedis(rdb redis.UniversalClient, opts rockscache.Options, db relationtb.ConversationModelInterface) ConversationCache {
rcClient := rockscache.NewClient(rdb, opts) rcClient := rockscache.NewClient(rdb, opts)
mc := NewMetaCacheRedis(rcClient)
mc.SetTopic(config.Config.LocalCache.Conversation.Topic)
mc.SetRawRedisClient(rdb)
return &ConversationRedisCache{ return &ConversationRedisCache{
rcClient: rcClient, rcClient: rcClient,
metaCache: NewMetaCacheRedis(rcClient), metaCache: mc,
conversationDB: db, conversationDB: db,
expireTime: conversationExpireTime, expireTime: conversationExpireTime,
} }
@ -119,7 +122,7 @@ type ConversationRedisCache struct {
func (c *ConversationRedisCache) NewCache() ConversationCache { func (c *ConversationRedisCache) NewCache() ConversationCache {
return &ConversationRedisCache{ return &ConversationRedisCache{
rcClient: c.rcClient, rcClient: c.rcClient,
metaCache: NewMetaCacheRedis(c.rcClient, c.metaCache.GetPreDelKeys()...), metaCache: c.Copy(),
conversationDB: c.conversationDB, conversationDB: c.conversationDB,
expireTime: c.expireTime, expireTime: c.expireTime,
} }

View File

@ -80,7 +80,7 @@ func NewFriendCacheRedis(rdb redis.UniversalClient, friendDB relationtb.FriendMo
func (f *FriendCacheRedis) NewCache() FriendCache { func (f *FriendCacheRedis) NewCache() FriendCache {
return &FriendCacheRedis{ return &FriendCacheRedis{
rcClient: f.rcClient, rcClient: f.rcClient,
metaCache: f.metaCache.Copy(), metaCache: f.Copy(),
friendDB: f.friendDB, friendDB: f.friendDB,
expireTime: f.expireTime, expireTime: f.expireTime,
} }

View File

@ -18,6 +18,7 @@ import (
"context" "context"
"fmt" "fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"time" "time"
"github.com/OpenIMSDK/protocol/constant" "github.com/OpenIMSDK/protocol/constant"
@ -104,12 +105,14 @@ func NewGroupCacheRedis(
opts rockscache.Options, opts rockscache.Options,
) GroupCache { ) GroupCache {
rcClient := rockscache.NewClient(rdb, opts) rcClient := rockscache.NewClient(rdb, opts)
mc := NewMetaCacheRedis(rcClient)
mc.SetTopic(config.Config.LocalCache.Group.Topic)
mc.SetRawRedisClient(rdb)
return &GroupCacheRedis{ return &GroupCacheRedis{
rcClient: rcClient, expireTime: groupExpireTime, rcClient: rcClient, expireTime: groupExpireTime,
groupDB: groupDB, groupMemberDB: groupMemberDB, groupRequestDB: groupRequestDB, groupDB: groupDB, groupMemberDB: groupMemberDB, groupRequestDB: groupRequestDB,
groupHash: hashCode, groupHash: hashCode,
metaCache: NewMetaCacheRedis(rcClient), metaCache: mc,
} }
} }
@ -120,7 +123,7 @@ func (g *GroupCacheRedis) NewCache() GroupCache {
groupDB: g.groupDB, groupDB: g.groupDB,
groupMemberDB: g.groupMemberDB, groupMemberDB: g.groupMemberDB,
groupRequestDB: g.groupRequestDB, groupRequestDB: g.groupRequestDB,
metaCache: NewMetaCacheRedis(g.rcClient, g.metaCache.GetPreDelKeys()...), metaCache: g.Copy(),
} }
} }

View File

@ -103,13 +103,13 @@ func (m *metaCacheRedis) ExecDel(ctx context.Context, distinct ...bool) error {
break break
} }
} }
if m.topic != "" && m.redisClient != nil { if pk := getPublishKey(m.topic, m.keys); len(pk) > 0 {
data, err := json.Marshal(m.keys) data, err := json.Marshal(pk)
if err != nil { if err != nil {
log.ZError(ctx, "keys json marshal failed", err, "topic", m.topic, "keys", m.keys) log.ZError(ctx, "keys json marshal failed", err, "topic", m.topic, "keys", pk)
} else { } else {
if err := m.redisClient.Publish(ctx, m.topic, string(data)).Err(); err != nil { if err := m.redisClient.Publish(ctx, m.topic, string(data)).Err(); err != nil {
log.ZError(ctx, "redis publish cache delete error", err, "topic", m.topic, "keys", m.keys) log.ZError(ctx, "redis publish cache delete error", err, "topic", m.topic, "keys", pk)
} }
} }
} }

View File

@ -1,87 +0,0 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package localcache
import (
"context"
"sync"
"github.com/OpenIMSDK/protocol/conversation"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
)
type ConversationLocalCache struct {
lock sync.Mutex
superGroupRecvMsgNotNotifyUserIDs map[string]Hash
conversationIDs map[string]Hash
client *rpcclient.ConversationRpcClient
}
type Hash struct {
hash uint64
ids []string
}
func NewConversationLocalCache(client *rpcclient.ConversationRpcClient) *ConversationLocalCache {
return &ConversationLocalCache{
superGroupRecvMsgNotNotifyUserIDs: make(map[string]Hash),
conversationIDs: make(map[string]Hash),
client: client,
}
}
func (g *ConversationLocalCache) GetRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) {
resp, err := g.client.Client.GetRecvMsgNotNotifyUserIDs(ctx, &conversation.GetRecvMsgNotNotifyUserIDsReq{
GroupID: groupID,
})
if err != nil {
return nil, err
}
return resp.UserIDs, nil
}
func (g *ConversationLocalCache) GetConversationIDs(ctx context.Context, userID string) ([]string, error) {
resp, err := g.client.Client.GetUserConversationIDsHash(ctx, &conversation.GetUserConversationIDsHashReq{
OwnerUserID: userID,
})
if err != nil {
return nil, err
}
g.lock.Lock()
hash, ok := g.conversationIDs[userID]
g.lock.Unlock()
if !ok || hash.hash != resp.Hash {
conversationIDsResp, err := g.client.Client.GetConversationIDs(ctx, &conversation.GetConversationIDsReq{
UserID: userID,
})
if err != nil {
return nil, err
}
g.lock.Lock()
defer g.lock.Unlock()
g.conversationIDs[userID] = Hash{
hash: resp.Hash,
ids: conversationIDsResp.ConversationIDs,
}
return conversationIDsResp.ConversationIDs, nil
}
return hash.ids, nil
}

View File

@ -1,15 +0,0 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package localcache // import "github.com/openimsdk/open-im-server/v3/pkg/common/db/localcache"

View File

@ -1,78 +0,0 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package localcache
import (
"context"
"sync"
"github.com/OpenIMSDK/protocol/group"
"github.com/OpenIMSDK/tools/errs"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
)
type GroupLocalCache struct {
lock sync.Mutex
cache map[string]GroupMemberIDsHash
client *rpcclient.GroupRpcClient
}
type GroupMemberIDsHash struct {
memberListHash uint64
userIDs []string
}
func NewGroupLocalCache(client *rpcclient.GroupRpcClient) *GroupLocalCache {
return &GroupLocalCache{
cache: make(map[string]GroupMemberIDsHash, 0),
client: client,
}
}
func (g *GroupLocalCache) GetGroupMemberIDs(ctx context.Context, groupID string) ([]string, error) {
resp, err := g.client.Client.GetGroupAbstractInfo(ctx, &group.GetGroupAbstractInfoReq{
GroupIDs: []string{groupID},
})
if err != nil {
return nil, err
}
if len(resp.GroupAbstractInfos) < 1 {
return nil, errs.ErrGroupIDNotFound
}
g.lock.Lock()
localHashInfo, ok := g.cache[groupID]
if ok && localHashInfo.memberListHash == resp.GroupAbstractInfos[0].GroupMemberListHash {
g.lock.Unlock()
return localHashInfo.userIDs, nil
}
g.lock.Unlock()
groupMembersResp, err := g.client.Client.GetGroupMemberUserIDs(ctx, &group.GetGroupMemberUserIDsReq{
GroupID: groupID,
})
if err != nil {
return nil, err
}
g.lock.Lock()
defer g.lock.Unlock()
g.cache[groupID] = GroupMemberIDsHash{
memberListHash: resp.GroupAbstractInfos[0].GroupMemberListHash,
userIDs: groupMembersResp.UserIDs,
}
return g.cache[groupID].userIDs, nil
}

View File

@ -1,15 +0,0 @@
// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package localcache

View File

@ -4,11 +4,11 @@ import (
"context" "context"
"github.com/openimsdk/open-im-server/v3/pkg/common/localcache/link" "github.com/openimsdk/open-im-server/v3/pkg/common/localcache/link"
"github.com/openimsdk/open-im-server/v3/pkg/common/localcache/local" "github.com/openimsdk/open-im-server/v3/pkg/common/localcache/local"
opt "github.com/openimsdk/open-im-server/v3/pkg/common/localcache/option" lopt "github.com/openimsdk/open-im-server/v3/pkg/common/localcache/option"
) )
type Cache[V any] interface { type Cache[V any] interface {
Get(ctx context.Context, key string, fetch func(ctx context.Context) (V, error), opts ...*opt.Option) (V, error) Get(ctx context.Context, key string, fetch func(ctx context.Context) (V, error), opts ...*lopt.Option) (V, error)
Del(ctx context.Context, key ...string) Del(ctx context.Context, key ...string)
} }
@ -17,12 +17,17 @@ func New[V any](opts ...Option) Cache[V] {
for _, o := range opts { for _, o := range opts {
o(opt) o(opt)
} }
c := &cache[V]{opt: opt, link: link.New(opt.localSlotNum)} c := cache[V]{opt: opt}
if opt.localSlotNum > 0 && opt.localSlotSize > 0 {
c.local = local.NewCache[V](opt.localSlotNum, opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict) c.local = local.NewCache[V](opt.localSlotNum, opt.localSlotSize, opt.localSuccessTTL, opt.localFailedTTL, opt.target, c.onEvict)
go func() { go func() {
c.opt.delCh(c.del) c.opt.delCh(c.del)
}() }()
return c if opt.linkSlotNum > 0 {
c.link = link.New(opt.linkSlotNum)
}
}
return &c
} }
type cache[V any] struct { type cache[V any] struct {
@ -32,6 +37,7 @@ type cache[V any] struct {
} }
func (c *cache[V]) onEvict(key string, value V) { func (c *cache[V]) onEvict(key string, value V) {
if c.link != nil {
lks := c.link.Del(key) lks := c.link.Del(key)
for k := range lks { for k := range lks {
if key != k { // prevent deadlock if key != k { // prevent deadlock
@ -39,6 +45,7 @@ func (c *cache[V]) onEvict(key string, value V) {
} }
} }
} }
}
func (c *cache[V]) del(key ...string) { func (c *cache[V]) del(key ...string) {
for _, k := range key { for _, k := range key {
@ -50,16 +57,14 @@ func (c *cache[V]) del(key ...string) {
} }
} }
func (c *cache[V]) Get(ctx context.Context, key string, fetch func(ctx context.Context) (V, error), opts ...*opt.Option) (V, error) { func (c *cache[V]) Get(ctx context.Context, key string, fetch func(ctx context.Context) (V, error), opts ...*lopt.Option) (V, error) {
enable := c.opt.enable if c.local != nil {
if len(opts) > 0 && opts[0].Enable != nil {
enable = *opts[0].Enable
}
if enable {
if len(opts) > 0 && len(opts[0].Link) > 0 {
c.link.Link(key, opts[0].Link...)
}
return c.local.Get(key, func() (V, error) { return c.local.Get(key, func() (V, error) {
if c.link != nil {
for _, o := range opts {
c.link.Link(key, o.Link...)
}
}
return fetch(ctx) return fetch(ctx)
}) })
} else { } else {
@ -74,7 +79,7 @@ func (c *cache[V]) Del(ctx context.Context, key ...string) {
for _, fn := range c.opt.delFn { for _, fn := range c.opt.delFn {
fn(ctx, key...) fn(ctx, key...)
} }
if c.opt.enable { if c.local != nil {
c.del(key...) c.del(key...)
} }
} }

View File

@ -8,9 +8,9 @@ import (
func defaultOption() *option { func defaultOption() *option {
return &option{ return &option{
enable: true,
localSlotNum: 500, localSlotNum: 500,
localSlotSize: 20000, localSlotSize: 20000,
linkSlotNum: 500,
localSuccessTTL: time.Minute, localSuccessTTL: time.Minute,
localFailedTTL: time.Second * 5, localFailedTTL: time.Second * 5,
delFn: make([]func(ctx context.Context, key ...string), 0, 2), delFn: make([]func(ctx context.Context, key ...string), 0, 2),
@ -19,9 +19,9 @@ func defaultOption() *option {
} }
type option struct { type option struct {
enable bool
localSlotNum int localSlotNum int
localSlotSize int localSlotSize int
linkSlotNum int
localSuccessTTL time.Duration localSuccessTTL time.Duration
localFailedTTL time.Duration localFailedTTL time.Duration
delFn []func(ctx context.Context, key ...string) delFn []func(ctx context.Context, key ...string)
@ -31,25 +31,27 @@ type option struct {
type Option func(o *option) type Option func(o *option)
func WithDisable() Option { func WithLocalDisable() Option {
return WithLinkSlotNum(0)
}
func WithLinkDisable() Option {
return WithLinkSlotNum(0)
}
func WithLinkSlotNum(linkSlotNum int) Option {
return func(o *option) { return func(o *option) {
o.enable = false o.linkSlotNum = linkSlotNum
} }
} }
func WithLocalSlotNum(localSlotNum int) Option { func WithLocalSlotNum(localSlotNum int) Option {
if localSlotNum < 1 {
panic("localSlotNum should be greater than 0")
}
return func(o *option) { return func(o *option) {
o.localSlotNum = localSlotNum o.localSlotNum = localSlotNum
} }
} }
func WithLocalSlotSize(localSlotSize int) Option { func WithLocalSlotSize(localSlotSize int) Option {
if localSlotSize < 1 {
panic("localSlotSize should be greater than 0")
}
return func(o *option) { return func(o *option) {
o.localSlotSize = localSlotSize o.localSlotSize = localSlotSize
} }

View File

@ -5,22 +5,9 @@ func NewOption() *Option {
} }
type Option struct { type Option struct {
Enable *bool
Link []string Link []string
} }
func (o *Option) WithEnable() *Option {
t := true
o.Enable = &t
return o
}
func (o *Option) WithDisable() *Option {
f := false
o.Enable = &f
return o
}
func (o *Option) WithLink(key ...string) *Option { func (o *Option) WithLink(key ...string) *Option {
if len(key) > 0 { if len(key) > 0 {
if len(o.Link) == 0 { if len(o.Link) == 0 {

View File

@ -0,0 +1,28 @@
package rpccache
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/localcache"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/redis/go-redis/v9"
)
func NewConversationLocalCache(client rpcclient.ConversationRpcClient, cli redis.UniversalClient) *ConversationLocalCache {
return &ConversationLocalCache{
local: localcache.New[any](localcache.WithRedisDeleteSubscribe(config.Config.LocalCache.Conversation.Topic, cli)),
client: client,
}
}
type ConversationLocalCache struct {
local localcache.Cache[any]
client rpcclient.ConversationRpcClient
}
func (c *ConversationLocalCache) GetConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) {
return localcache.AnyValue[[]string](c.local.Get(ctx, cachekey.GetConversationIDsKey(ownerUserID), func(ctx context.Context) (any, error) {
return c.client.GetConversationIDs(ctx, ownerUserID)
}))
}

View File

@ -23,20 +23,20 @@ type FriendLocalCache struct {
client rpcclient.FriendRpcClient client rpcclient.FriendRpcClient
} }
func (f *FriendLocalCache) GetFriendIDs(ctx context.Context, ownerUserID string) (val []string, err error) { //func (f *FriendLocalCache) GetFriendIDs(ctx context.Context, ownerUserID string) (val []string, err error) {
log.ZDebug(ctx, "FriendLocalCache GetFriendIDs req", "ownerUserID", ownerUserID) // log.ZDebug(ctx, "FriendLocalCache GetFriendIDs req", "ownerUserID", ownerUserID)
defer func() { // defer func() {
if err == nil { // if err == nil {
log.ZDebug(ctx, "FriendLocalCache GetFriendIDs return", "value", val) // log.ZDebug(ctx, "FriendLocalCache GetFriendIDs return", "value", val)
} else { // } else {
log.ZError(ctx, "FriendLocalCache GetFriendIDs return", err) // log.ZError(ctx, "FriendLocalCache GetFriendIDs return", err)
} // }
}() // }()
return localcache.AnyValue[[]string](f.local.Get(ctx, cachekey.GetFriendIDsKey(ownerUserID), func(ctx context.Context) (any, error) { // return localcache.AnyValue[[]string](f.local.Get(ctx, cachekey.GetFriendIDsKey(ownerUserID), func(ctx context.Context) (any, error) {
log.ZDebug(ctx, "FriendLocalCache GetFriendIDs call rpc", "ownerUserID", ownerUserID) // log.ZDebug(ctx, "FriendLocalCache GetFriendIDs call rpc", "ownerUserID", ownerUserID)
return f.client.GetFriendIDs(ctx, ownerUserID) // return f.client.GetFriendIDs(ctx, ownerUserID)
})) // }))
} //}
func (f *FriendLocalCache) IsFriend(ctx context.Context, possibleFriendUserID, userID string) (val bool, err error) { func (f *FriendLocalCache) IsFriend(ctx context.Context, possibleFriendUserID, userID string) (val bool, err error) {
log.ZDebug(ctx, "FriendLocalCache IsFriend req", "possibleFriendUserID", possibleFriendUserID, "userID", userID) log.ZDebug(ctx, "FriendLocalCache IsFriend req", "possibleFriendUserID", possibleFriendUserID, "userID", userID)

28
pkg/rpccache/group.go Normal file
View File

@ -0,0 +1,28 @@
package rpccache
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/localcache"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/redis/go-redis/v9"
)
func NewGroupLocalCache(client rpcclient.GroupRpcClient, cli redis.UniversalClient) *GroupLocalCache {
return &GroupLocalCache{
local: localcache.New[any](localcache.WithRedisDeleteSubscribe(config.Config.LocalCache.Group.Topic, cli)),
client: client,
}
}
type GroupLocalCache struct {
local localcache.Cache[any]
client rpcclient.GroupRpcClient
}
func (g *GroupLocalCache) GetGroupMemberIDs(ctx context.Context, groupID string) ([]string, error) {
return localcache.AnyValue[[]string](g.local.Get(ctx, cachekey.GetGroupMemberIDsKey(groupID), func(ctx context.Context) (any, error) {
return g.client.GetGroupMemberIDs(ctx, groupID)
}))
}