conversationLocalCache

This commit is contained in:
wangchuxiao 2023-05-17 17:09:26 +08:00
parent 9b633b6729
commit dacf6cca31
8 changed files with 651 additions and 421 deletions

View File

@ -195,9 +195,6 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbConver
if req.Conversation.DraftTextTime != nil {
m["draft_text_time"] = req.Conversation.DraftTextTime.Value
}
if req.Conversation.UnreadCount != nil {
m["unread_count"] = req.Conversation.UnreadCount.Value
}
if req.Conversation.AttachedInfo != nil {
m["attached_info"] = req.Conversation.AttachedInfo.Value
}
@ -283,7 +280,8 @@ func (c *conversationServer) CreateGroupChatConversations(ctx context.Context, r
func (c *conversationServer) DelGroupChatConversations(ctx context.Context, req *pbConversation.DelGroupChatConversationsReq) (*pbConversation.DelGroupChatConversationsResp, error) {
if err := c.conversationDatabase.UpdateUsersConversationFiled(ctx, req.OwnerUserID,
utils.GetConversationIDBySessionType(constant.SuperGroupChatType, req.GroupID), map[string]interface{}{"max_seq": req.MaxSeq}); err != nil {
utils.GetConversationIDBySessionType(constant.SuperGroupChatType, req.GroupID),
map[string]interface{}{"max_seq": req.MaxSeq}); err != nil {
return nil, err
}
return &pbConversation.DelGroupChatConversationsResp{}, nil
@ -315,3 +313,11 @@ func (c *conversationServer) GetConversationsHasReadAndMaxSeq(ctx context.Contex
}
return resp, nil
}
func (c *conversationServer) GetUserConversationIDsHash(ctx context.Context, req *pbConversation.GetUserConversationIDsHashReq) (*pbConversation.GetUserConversationIDsHashResp, error) {
hash, err := c.conversationDatabase.GetUserConversationIDsHash(ctx, req.OwnerUserID)
if err != nil {
return nil, err
}
return &pbConversation.GetUserConversationIDsHashResp{Hash: hash}, nil
}

View File

@ -23,22 +23,24 @@ import (
type MessageInterceptorChain []MessageInterceptorFunc
type msgServer struct {
RegisterCenter discoveryregistry.SvcDiscoveryRegistry
MsgDatabase controller.CommonMsgDatabase
ExtendMsgDatabase controller.ExtendMsgDatabase
Group *rpcclient.GroupClient
User *rpcclient.UserClient
Conversation *rpcclient.ConversationClient
friend *rpcclient.FriendClient
black *rpcclient.BlackClient
GroupLocalCache *localcache.GroupLocalCache
MessageLocker MessageLocker
Handlers MessageInterceptorChain
RegisterCenter discoveryregistry.SvcDiscoveryRegistry
MsgDatabase controller.CommonMsgDatabase
ExtendMsgDatabase controller.ExtendMsgDatabase
Group *rpcclient.GroupClient
User *rpcclient.UserClient
Conversation *rpcclient.ConversationClient
friend *rpcclient.FriendClient
black *rpcclient.BlackClient
GroupLocalCache *localcache.GroupLocalCache
ConversationLocalCache *localcache.ConversationLocalCache
MessageLocker MessageLocker
Handlers MessageInterceptorChain
}
func (m *msgServer) addInterceptorHandler(interceptorFunc ...MessageInterceptorFunc) {
m.Handlers = append(m.Handlers, interceptorFunc...)
}
func (m *msgServer) execInterceptorHandler(ctx context.Context, req *msg.SendMsgReq) error {
for _, handler := range m.Handlers {
msgData, err := handler(ctx, req)
@ -49,6 +51,7 @@ func (m *msgServer) execInterceptorHandler(ctx context.Context, req *msg.SendMsg
}
return nil
}
func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
rdb, err := cache.NewRedis()
if err != nil {
@ -66,16 +69,17 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
msgDatabase := controller.NewCommonMsgDatabase(msgDocModel, cacheModel)
s := &msgServer{
Conversation: rpcclient.NewConversationClient(client),
User: rpcclient.NewUserClient(client),
Group: rpcclient.NewGroupClient(client),
MsgDatabase: msgDatabase,
ExtendMsgDatabase: extendMsgDatabase,
RegisterCenter: client,
GroupLocalCache: localcache.NewGroupLocalCache(client),
black: rpcclient.NewBlackClient(client),
friend: rpcclient.NewFriendClient(client),
MessageLocker: NewLockerMessage(cacheModel),
Conversation: rpcclient.NewConversationClient(client),
User: rpcclient.NewUserClient(client),
Group: rpcclient.NewGroupClient(client),
MsgDatabase: msgDatabase,
ExtendMsgDatabase: extendMsgDatabase,
RegisterCenter: client,
GroupLocalCache: localcache.NewGroupLocalCache(client),
ConversationLocalCache: localcache.NewConversationLocalCache(client),
black: rpcclient.NewBlackClient(client),
friend: rpcclient.NewFriendClient(client),
MessageLocker: NewLockerMessage(cacheModel),
}
s.addInterceptorHandler(MessageHasReadEnabled, MessageModifyCallback)
s.initPrometheus()
@ -127,7 +131,7 @@ func (m *msgServer) GetMaxSeq(ctx context.Context, req *sdkws.GetMaxSeqReq) (*sd
if err := tokenverify.CheckAccessV3(ctx, req.UserID); err != nil {
return nil, err
}
conversationIDs, err := m.Conversation.GetConversationIDs(ctx, req.UserID)
conversationIDs, err := m.ConversationLocalCache.GetConversationIDs(ctx, req.UserID)
if err != nil {
return nil, err
}

View File

@ -17,6 +17,7 @@ import (
const (
conversationKey = "CONVERSATION:"
conversationIDsKey = "CONVERSATION_IDS:"
conversationIDsHashKey = "CONVERSATION_IDS_HASH:"
recvMsgOptKey = "RECV_MSG_OPT:"
superGroupRecvMsgNotNotifyUserIDsKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS:"
superGroupRecvMsgNotNotifyUserIDsHashKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS_HASH:"
@ -31,6 +32,10 @@ type ConversationCache interface {
// get user's conversationIDs from msgCache
GetUserConversationIDs(ctx context.Context, ownerUserID string) ([]string, error)
DelConversationIDs(userIDs ...string) ConversationCache
GetUserConversationIDsHash(ctx context.Context, ownerUserID string) (hash uint64, err error)
DelUserConversationIDsHash(ownerUserIDs ...string) ConversationCache
// get one conversation from msgCache
GetConversation(ctx context.Context, ownerUserID, conversationID string) (*relationTb.ConversationModel, error)
DelConvsersations(ownerUserID string, conversationIDs ...string) ConversationCache
@ -107,6 +112,33 @@ func (c *ConversationRedisCache) DelConversationIDs(userIDs ...string) Conversat
return cache
}
func (c *ConversationRedisCache) getUserConversationIDsHashKey(ownerUserID string) string {
return conversationIDsHashKey + ownerUserID
}
func (c *ConversationRedisCache) GetUserConversationIDsHash(ctx context.Context, ownerUserID string) (hash uint64, err error) {
return getCache(ctx, c.rcClient, c.getUserConversationIDsHashKey(ownerUserID), c.expireTime, func(ctx context.Context) (uint64, error) {
conversationIDs, err := c.GetUserConversationIDs(ctx, ownerUserID)
if err != nil {
return 0, err
}
utils.Sort(conversationIDs, true)
bi := big.NewInt(0)
bi.SetString(utils.Md5(strings.Join(conversationIDs, ";"))[0:8], 16)
return bi.Uint64(), nil
})
}
func (c *ConversationRedisCache) DelUserConversationIDsHash(ownerUserIDs ...string) ConversationCache {
var keys []string
for _, ownerUserID := range ownerUserIDs {
keys = append(keys, c.getUserConversationIDsHashKey(ownerUserID))
}
cache := c.NewCache()
cache.AddKeys(keys...)
return cache
}
func (c *ConversationRedisCache) GetConversation(ctx context.Context, ownerUserID, conversationID string) (*relationTb.ConversationModel, error) {
return getCache(ctx, c.rcClient, c.getConversationKey(ownerUserID, conversationID), c.expireTime, func(ctx context.Context) (*relationTb.ConversationModel, error) {
return c.conversationDB.Take(ctx, ownerUserID, conversationID)

View File

@ -30,6 +30,7 @@ type ConversationDatabase interface {
SetUsersConversationFiledTx(ctx context.Context, userIDs []string, conversation *relationTb.ConversationModel, filedMap map[string]interface{}) error
CreateGroupChatConversation(ctx context.Context, groupID string, userIDs []string) error
GetConversationIDs(ctx context.Context, userID string) ([]string, error)
GetUserConversationIDsHash(ctx context.Context, ownerUserID string) (hash uint64, err error)
}
func NewConversationDatabase(conversation relationTb.ConversationModelInterface, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase {
@ -78,7 +79,7 @@ func (c *ConversationDataBase) SetUsersConversationFiledTx(ctx context.Context,
if err != nil {
return err
}
cache = cache.DelConversationIDs(NotUserIDs...)
cache = cache.DelConversationIDs(NotUserIDs...).DelUserConversationIDsHash(NotUserIDs...)
}
return nil
}); err != nil {
@ -103,7 +104,7 @@ func (c *ConversationDataBase) CreateConversation(ctx context.Context, conversat
for _, conversation := range conversations {
userIDs = append(userIDs, conversation.OwnerUserID)
}
return c.cache.DelConversationIDs(userIDs...).ExecDel(ctx)
return c.cache.DelConversationIDs(userIDs...).DelUserConversationIDsHash(userIDs...).ExecDel(ctx)
}
func (c *ConversationDataBase) SyncPeerUserPrivateConversationTx(ctx context.Context, conversations []*relationTb.ConversationModel) error {
@ -131,7 +132,7 @@ func (c *ConversationDataBase) SyncPeerUserPrivateConversationTx(ctx context.Con
if err := conversationTx.Create(ctx, []*relationTb.ConversationModel{&newConversation}); err != nil {
return err
}
cache = cache.DelConversationIDs([]string{v[0]}...)
cache = cache.DelConversationIDs(v[0]).DelUserConversationIDsHash(v[0])
}
}
}
@ -190,7 +191,7 @@ func (c *ConversationDataBase) SetUserConversations(ctx context.Context, ownerUs
if err != nil {
return err
}
cache = cache.DelConversationIDs([]string{ownerUserID}...)
cache = cache.DelConversationIDs(ownerUserID).DelUserConversationIDsHash(ownerUserID)
}
cache = cache.DelConvsersations(ownerUserID, existConversationIDs...)
return nil
@ -218,7 +219,7 @@ func (c *ConversationDataBase) CreateGroupChatConversation(ctx context.Context,
conversation := relationTb.ConversationModel{ConversationType: constant.SuperGroupChatType, GroupID: groupID, OwnerUserID: v, ConversationID: conversationID}
conversations = append(conversations, &conversation)
}
cache = cache.DelConversationIDs(notExistUserIDs...)
cache = cache.DelConversationIDs(notExistUserIDs...).DelUserConversationIDsHash(notExistUserIDs...)
err = c.conversationDB.Create(ctx, conversations)
if err != nil {
return err
@ -240,3 +241,7 @@ func (c *ConversationDataBase) CreateGroupChatConversation(ctx context.Context,
func (c *ConversationDataBase) GetConversationIDs(ctx context.Context, userID string) ([]string, error) {
return c.cache.GetUserConversationIDs(ctx, userID)
}
func (c *ConversationDataBase) GetUserConversationIDsHash(ctx context.Context, ownerUserID string) (hash uint64, err error) {
return c.cache.GetUserConversationIDsHash(ctx, ownerUserID)
}

View File

@ -9,22 +9,22 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/conversation"
)
type ConversationLocalCacheInterface interface {
GetRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error)
GetConversationIDs(ctx context.Context, userID string) ([]string, error)
}
type ConversationLocalCache struct {
lock sync.Mutex
SuperGroupRecvMsgNotNotifyUserIDs map[string][]string
ConversationIDs map[string][]string
SuperGroupRecvMsgNotNotifyUserIDs map[string]Hash
ConversationIDs map[string]Hash
client discoveryregistry.SvcDiscoveryRegistry
}
type Hash struct {
hash uint64
ids []string
}
func NewConversationLocalCache(client discoveryregistry.SvcDiscoveryRegistry) *ConversationLocalCache {
return &ConversationLocalCache{
SuperGroupRecvMsgNotNotifyUserIDs: make(map[string][]string, 0),
ConversationIDs: make(map[string][]string, 0),
SuperGroupRecvMsgNotNotifyUserIDs: make(map[string]Hash),
ConversationIDs: make(map[string]Hash),
client: client,
}
}
@ -50,11 +50,28 @@ func (g *ConversationLocalCache) GetConversationIDs(ctx context.Context, userID
return nil, err
}
client := conversation.NewConversationClient(conn)
resp, err := client.GetConversationIDs(ctx, &conversation.GetConversationIDsReq{
UserID: userID,
resp, err := client.GetUserConversationIDsHash(ctx, &conversation.GetUserConversationIDsHashReq{
OwnerUserID: userID,
})
if err != nil {
return nil, err
}
return resp.ConversationIDs, nil
g.lock.Lock()
defer g.lock.Unlock()
hash, ok := g.ConversationIDs[userID]
if !ok || hash.hash != resp.Hash {
conversationIDsResp, err := client.GetConversationIDs(ctx, &conversation.GetConversationIDsReq{
UserID: userID,
})
if err != nil {
return nil, err
}
g.ConversationIDs[userID] = Hash{
hash: resp.Hash,
ids: conversationIDsResp.ConversationIDs,
}
return conversationIDsResp.ConversationIDs, nil
}
return hash.ids, nil
}

View File

@ -10,10 +10,6 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/group"
)
type GroupLocalCacheInterface interface {
GetGroupMemberIDs(ctx context.Context, groupID string) ([]string, error)
}
type GroupLocalCache struct {
lock sync.Mutex
cache map[string]GroupMemberIDsHash

File diff suppressed because it is too large Load Diff

View File

@ -165,6 +165,14 @@ message SetConversationsReq {
message SetConversationsResp {
}
message GetUserConversationIDsHashReq {
string ownerUserID = 1;
}
message GetUserConversationIDsHashResp {
uint64 hash = 1;
}
service conversation {
rpc ModifyConversationField(ModifyConversationFieldReq)returns(ModifyConversationFieldResp);
rpc GetConversation(GetConversationReq)returns(GetConversationResp);
@ -180,4 +188,5 @@ service conversation {
rpc GetConversationIDs(GetConversationIDsReq) returns(GetConversationIDsResp);
rpc GetConversationsHasReadAndMaxSeq(GetConversationsHasReadAndMaxSeqReq) returns(GetConversationsHasReadAndMaxSeqResp);
rpc SetConversations(SetConversationsReq) returns(SetConversationsResp);
rpc GetUserConversationIDsHash(GetUserConversationIDsHashReq) returns(GetUserConversationIDsHashResp);
}