diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index 4649409b7..99a18baac 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -50,7 +50,6 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e } func (c *conversationServer) GetConversation(ctx context.Context, req *pbConversation.GetConversationReq) (*pbConversation.GetConversationResp, error) { - resp := &pbConversation.GetConversationResp{Conversation: &pbConversation.Conversation{}} conversations, err := c.conversationDatabase.FindConversations(ctx, req.OwnerUserID, []string{req.ConversationID}) if err != nil { return nil, err @@ -58,16 +57,17 @@ func (c *conversationServer) GetConversation(ctx context.Context, req *pbConvers if len(conversations) < 1 { return nil, errs.ErrRecordNotFound.Wrap("conversation not found") } + resp := &pbConversation.GetConversationResp{Conversation: &pbConversation.Conversation{}} resp.Conversation = convert.ConversationDB2Pb(conversations[0]) return resp, nil } func (c *conversationServer) GetAllConversations(ctx context.Context, req *pbConversation.GetAllConversationsReq) (*pbConversation.GetAllConversationsResp, error) { - resp := &pbConversation.GetAllConversationsResp{Conversations: []*pbConversation.Conversation{}} conversations, err := c.conversationDatabase.GetUserAllConversation(ctx, req.OwnerUserID) if err != nil { return nil, err } + resp := &pbConversation.GetAllConversationsResp{Conversations: []*pbConversation.Conversation{}} resp.Conversations = convert.ConversationsDB2Pb(conversations) return resp, nil } @@ -297,7 +297,7 @@ func (c *conversationServer) GetConversationIDs(ctx context.Context, req *pbConv } func (c *conversationServer) GetConversationsHasReadAndMaxSeq(ctx context.Context, req *pbConversation.GetConversationsHasReadAndMaxSeqReq) (*pbConversation.GetConversationsHasReadAndMaxSeqResp, error) { - conversations, err := c.conversationDatabase.GetUserAllConversation(ctx, req.UserID) + conversations, err := c.conversationDatabase.GetUserAllHasReadSeqs(ctx, req.UserID) if err != nil { return nil, err } @@ -306,10 +306,10 @@ func (c *conversationServer) GetConversationsHasReadAndMaxSeq(ctx context.Contex return nil, err } resp := &pbConversation.GetConversationsHasReadAndMaxSeqResp{Seqs: make(map[string]*pbConversation.Seqs)} - for _, v := range conversations { - resp.Seqs[v.ConversationID] = &pbConversation.Seqs{ - HasReadSeq: v.HasReadSeq, - MaxSeq: maxSeqs.MaxSeqs[v.ConversationID], + for conversationID, seq := range conversations { + resp.Seqs[conversationID] = &pbConversation.Seqs{ + HasReadSeq: seq, + MaxSeq: maxSeqs.MaxSeqs[conversationID], } } return resp, nil diff --git a/internal/rpc/msg/send_pull.go b/internal/rpc/msg/send_pull.go index b2215ec6c..e156650fe 100644 --- a/internal/rpc/msg/send_pull.go +++ b/internal/rpc/msg/send_pull.go @@ -21,7 +21,7 @@ func (m *msgServer) sendMsgSuperGroupChat(ctx context.Context, req *msg.SendMsgR promePkg.Inc(promePkg.WorkSuperGroupChatMsgProcessFailedCounter) return nil, err } - err = m.MsgDatabase.MsgToMQ(ctx, req.MsgData.GroupID, req.MsgData) + err = m.MsgDatabase.MsgToMQ(ctx, utils.GetConversationIDByMsg(req.MsgData), req.MsgData) if err != nil { return nil, err } diff --git a/pkg/common/db/cache/conversation.go b/pkg/common/db/cache/conversation.go index 79d9f4a38..f50f6decb 100644 --- a/pkg/common/db/cache/conversation.go +++ b/pkg/common/db/cache/conversation.go @@ -18,6 +18,7 @@ const ( conversationKey = "CONVERSATION:" conversationIDsKey = "CONVERSATION_IDS:" conversationIDsHashKey = "CONVERSATION_IDS_HASH:" + conversationHasReadSeqKey = "CONVERSATION_HAS_READ_SEQ:" recvMsgOptKey = "RECV_MSG_OPT:" superGroupRecvMsgNotNotifyUserIDsKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS:" superGroupRecvMsgNotNotifyUserIDsHashKey = "SUPER_GROUP_RECV_MSG_NOT_NOTIFY_USER_IDS_HASH:" @@ -53,6 +54,9 @@ type ConversationCache interface { // get one super group recv msg but do not notification userID list hash GetSuperGroupRecvMsgNotNotifyUserIDsHash(ctx context.Context, groupID string) (hash uint64, err error) DelSuperGroupRecvMsgNotNotifyUserIDsHash(groupID string) ConversationCache + + GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error) + DelUserAllHasReadSeqs(ownerUserID string, conversationIDs ...string) ConversationCache } func NewConversationRedis(rdb redis.UniversalClient, opts rockscache.Options, db relationTb.ConversationModelInterface) ConversationCache { @@ -96,6 +100,22 @@ func (c *ConversationRedisCache) getSuperGroupRecvNotNotifyUserIDsHashKey(groupI return superGroupRecvMsgNotNotifyUserIDsHashKey + groupID } +func (c *ConversationRedisCache) getConversationHasReadSeqKey(ownerUserID, conversationID string) string { + return conversationHasReadSeqKey + ownerUserID + ":" + conversationID +} + +func (c *ConversationRedisCache) getAllConversationIDsKeys(ctx context.Context, ownerUserID string) ([]string, []string, error) { + conversationIDs, err := c.GetUserConversationIDs(ctx, ownerUserID) + if err != nil { + return nil, nil, err + } + var keys []string + for _, conversarionID := range conversationIDs { + keys = append(keys, c.getConversationKey(ownerUserID, conversarionID)) + } + return keys, conversationIDs, nil +} + func (c *ConversationRedisCache) GetUserConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) { return getCache(ctx, c.rcClient, c.getConversationIDsKey(ownerUserID), c.expireTime, func(ctx context.Context) ([]string, error) { return c.conversationDB.FindUserIDAllConversationID(ctx, ownerUserID) @@ -155,7 +175,7 @@ func (c *ConversationRedisCache) DelConvsersations(ownerUserID string, convsersa return cache } -func (c *ConversationRedisCache) GetConversationIndex(convsation *relationTb.ConversationModel, keys []string) (int, error) { +func (c *ConversationRedisCache) getConversationIndex(convsation *relationTb.ConversationModel, keys []string) (int, error) { key := c.getConversationKey(convsation.OwnerUserID, convsation.ConversationID) for _i, _key := range keys { if _key == key { @@ -170,21 +190,17 @@ func (c *ConversationRedisCache) GetConversations(ctx context.Context, ownerUser for _, conversarionID := range conversationIDs { keys = append(keys, c.getConversationKey(ownerUserID, conversarionID)) } - return batchGetCache(ctx, c.rcClient, keys, c.expireTime, c.GetConversationIndex, func(ctx context.Context) ([]*relationTb.ConversationModel, error) { + return batchGetCache(ctx, c.rcClient, keys, c.expireTime, c.getConversationIndex, func(ctx context.Context) ([]*relationTb.ConversationModel, error) { return c.conversationDB.Find(ctx, ownerUserID, conversationIDs) }) } func (c *ConversationRedisCache) GetUserAllConversations(ctx context.Context, ownerUserID string) ([]*relationTb.ConversationModel, error) { - conversationIDs, err := c.GetUserConversationIDs(ctx, ownerUserID) + keys, _, err := c.getAllConversationIDsKeys(ctx, ownerUserID) if err != nil { return nil, err } - var keys []string - for _, conversarionID := range conversationIDs { - keys = append(keys, c.getConversationKey(ownerUserID, conversarionID)) - } - return batchGetCache(ctx, c.rcClient, keys, c.expireTime, c.GetConversationIndex, func(ctx context.Context) ([]*relationTb.ConversationModel, error) { + return batchGetCache(ctx, c.rcClient, keys, c.expireTime, c.getConversationIndex, func(ctx context.Context) ([]*relationTb.ConversationModel, error) { return c.conversationDB.FindUserIDAllConversations(ctx, ownerUserID) }) } @@ -241,3 +257,30 @@ func (c *ConversationRedisCache) DelSuperGroupRecvMsgNotNotifyUserIDsHash(groupI cache.AddKeys(c.getSuperGroupRecvNotNotifyUserIDsHashKey(groupID)) return cache } + +func (c *ConversationRedisCache) getUserAllHasReadSeqsIndex(conversationID string, conversationIDs []string) (int, error) { + for _i, _conversationID := range conversationIDs { + if _conversationID == conversationID { + return _i, nil + } + } + return 0, errors.New("not found key:" + conversationID + " in keys") +} + +func (c *ConversationRedisCache) GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error) { + keys, conversationIDs, err := c.getAllConversationIDsKeys(ctx, ownerUserID) + if err != nil { + return nil, err + } + return batchGetCacheMap(ctx, c.rcClient, keys, conversationIDs, c.expireTime, c.getUserAllHasReadSeqsIndex, func(ctx context.Context) (map[string]int64, error) { + return c.conversationDB.GetUserAllHasReadSeqs(ctx, ownerUserID) + }) +} + +func (c *ConversationRedisCache) DelUserAllHasReadSeqs(ownerUserID string, conversationIDs ...string) ConversationCache { + cache := c.NewCache() + for _, conversationID := range conversationIDs { + cache.AddKeys(c.getConversationHasReadSeqKey(ownerUserID, conversationID)) + } + return cache +} diff --git a/pkg/common/db/cache/meta_cache.go b/pkg/common/db/cache/meta_cache.go index 5866b5b82..b001fa227 100644 --- a/pkg/common/db/cache/meta_cache.go +++ b/pkg/common/db/cache/meta_cache.go @@ -153,15 +153,15 @@ func batchGetCache[T any](ctx context.Context, rcClient *rockscache.Client, keys return tArrays, nil } -func batchGetCacheMap[T any](ctx context.Context, rcClient *rockscache.Client, keys []string, originKeys []string, expire time.Duration, keyIndexFn func(t T, keys []string) (int, error), fn func(ctx context.Context) (map[string]T, error)) (map[string]T, error) { +func batchGetCacheMap[T any](ctx context.Context, rcClient *rockscache.Client, keys, originKeys []string, expire time.Duration, keyIndexFn func(s string, keys []string) (int, error), fn func(ctx context.Context) (map[string]T, error)) (map[string]T, error) { batchMap, err := rcClient.FetchBatch2(ctx, keys, expire, func(idxs []int) (m map[int]string, err error) { - values := make(map[int]string) tArrays, err := fn(ctx) if err != nil { return nil, err } - for _, v := range tArrays { - index, err := keyIndexFn(v, keys) + values := make(map[int]string) + for k, v := range tArrays { + index, err := keyIndexFn(k, originKeys) if err != nil { continue } @@ -184,7 +184,7 @@ func batchGetCacheMap[T any](ctx context.Context, rcClient *rockscache.Client, k if err != nil { return nil, utils.Wrap(err, "unmarshal failed") } - tMap[keys[i]] = t + tMap[originKeys[i]] = t } } return tMap, nil diff --git a/pkg/common/db/cache/msg.go b/pkg/common/db/cache/msg.go index a53e5a5e8..6d56dba4a 100644 --- a/pkg/common/db/cache/msg.go +++ b/pkg/common/db/cache/msg.go @@ -251,14 +251,14 @@ func (c *msgCache) GetMessagesBySeq(ctx context.Context, conversationID string, failedSeqs = append(failedSeqs, seqs[i]) } else { msg := sdkws.MsgData{} - err = jsonpb.UnmarshalString(cmd.Val(), &msg) + err = utils.String2Pb(cmd.Val(), &msg) if err == nil { if msg.Status != constant.MsgDeleted { seqMsgs = append(seqMsgs, &msg) continue } } else { - log.ZWarn(ctx, "UnmarshalString failed", err, "conversationID", conversationID, "seq", seqs[i]) + log.ZWarn(ctx, "UnmarshalString failed", err, "conversationID", conversationID, "seq", seqs[i], "msg", cmd.Val()) } failedSeqs = append(failedSeqs, seqs[i]) } diff --git a/pkg/common/db/controller/conversation.go b/pkg/common/db/controller/conversation.go index cb2dc6f15..b5640a169 100644 --- a/pkg/common/db/controller/conversation.go +++ b/pkg/common/db/controller/conversation.go @@ -32,6 +32,7 @@ type ConversationDatabase interface { GetConversationIDs(ctx context.Context, userID string) ([]string, error) GetUserConversationIDsHash(ctx context.Context, ownerUserID string) (hash uint64, err error) GetAllConversationIDs(ctx context.Context) ([]string, error) + GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error) } func NewConversationDatabase(conversation relationTb.ConversationModelInterface, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase { @@ -250,3 +251,7 @@ func (c *ConversationDataBase) GetUserConversationIDsHash(ctx context.Context, o func (c *ConversationDataBase) GetAllConversationIDs(ctx context.Context) ([]string, error) { return c.conversationDB.GetAllConversationIDs(ctx) } + +func (c *ConversationDataBase) GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (map[string]int64, error) { + return c.cache.GetUserAllHasReadSeqs(ctx, ownerUserID) +} diff --git a/pkg/common/db/relation/conversation_model.go b/pkg/common/db/relation/conversation_model.go index 09b7599da..f6cd94317 100644 --- a/pkg/common/db/relation/conversation_model.go +++ b/pkg/common/db/relation/conversation_model.go @@ -80,3 +80,13 @@ func (c *ConversationGorm) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, c func (c *ConversationGorm) GetAllConversationIDs(ctx context.Context) (conversationIDs []string, err error) { return conversationIDs, utils.Wrap(c.db(ctx).Distinct("conversation_id").Pluck("conversation_id", &conversationIDs).Error, "") } + +func (c *ConversationGorm) GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (hasReadSeqs map[string]int64, err error) { + var conversations []*relation.ConversationModel + err = utils.Wrap(c.db(ctx).Where("owner_user_id = ?", ownerUserID).Select("conversation_id", "has_read_seq").Find(&conversations).Error, "") + hasReadSeqs = make(map[string]int64, len(conversations)) + for _, conversation := range conversations { + hasReadSeqs[conversation.ConversationID] = conversation.HasReadSeq + } + return hasReadSeqs, err +} diff --git a/pkg/common/db/table/relation/conversation.go b/pkg/common/db/table/relation/conversation.go index 0f44d096c..d5fc2bea0 100644 --- a/pkg/common/db/table/relation/conversation.go +++ b/pkg/common/db/table/relation/conversation.go @@ -45,5 +45,6 @@ type ConversationModelInterface interface { GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error) FindSuperGroupRecvMsgNotNotifyUserIDs(ctx context.Context, groupID string) ([]string, error) GetAllConversationIDs(ctx context.Context) ([]string, error) + GetUserAllHasReadSeqs(ctx context.Context, ownerUserID string) (hashReadSeqs map[string]int64, err error) NewTx(tx any) ConversationModelInterface }