From dacee9e86fe240bfb6d8f7ca64e305aae65f30b1 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Mon, 12 Jun 2023 11:54:44 +0800 Subject: [PATCH] cache --- internal/rpc/conversation/conversaion.go | 3 +++ internal/rpc/msg/seq.go | 8 ++++--- pkg/common/db/controller/msg.go | 30 +++++++++++++++++++----- 3 files changed, 32 insertions(+), 9 deletions(-) diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index 326be49da..7a8775981 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -173,6 +173,9 @@ func (c *conversationServer) ModifyConversationField(ctx context.Context, req *p } func (c *conversationServer) SetConversations(ctx context.Context, req *pbConversation.SetConversationsReq) (*pbConversation.SetConversationsResp, error) { + if req.Conversation == nil { + return nil, errs.ErrArgs.Wrap("conversation must not be nil") + } isSyncConversation := true if req.Conversation.ConversationType == constant.GroupChatType { groupInfo, err := c.groupRpcClient.GetGroupInfo(ctx, req.Conversation.GroupID) diff --git a/internal/rpc/msg/seq.go b/internal/rpc/msg/seq.go index c73cd1d06..0d5e23d38 100644 --- a/internal/rpc/msg/seq.go +++ b/internal/rpc/msg/seq.go @@ -7,7 +7,9 @@ import ( ) func (m *msgServer) GetConversationMaxSeq(ctx context.Context, req *pbMsg.GetConversationMaxSeqReq) (resp *pbMsg.GetConversationMaxSeqResp, err error) { - resp = &pbMsg.GetConversationMaxSeqResp{} - resp.MaxSeq, err = m.MsgDatabase.GetMaxSeq(ctx, req.ConversationID) - return resp, err + maxSeq, err := m.MsgDatabase.GetMaxSeq(ctx, req.ConversationID) + if err != nil { + return nil, err + } + return &pbMsg.GetConversationMaxSeqResp{MaxSeq: maxSeq}, nil } diff --git a/pkg/common/db/controller/msg.go b/pkg/common/db/controller/msg.go index 53448682b..58b27d3f3 100644 --- a/pkg/common/db/controller/msg.go +++ b/pkg/common/db/controller/msg.go @@ -510,24 +510,41 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin delSeqs, err := db.cache.GetUserDelList(ctx, userID, conversationID) if err != nil && errs.Unwrap(err) != redis.Nil { return 0, 0, nil, err - } else { - log.ZDebug(ctx, "get delSeqs from redis", "delSeqs", delSeqs, "userID", userID, "conversationID", conversationID) } - + var cacheDelNum int for _, msg := range cachedMsgs { if !utils.Contain(msg.Seq, delSeqs...) { successMsgs = append(successMsgs, msg) + } else { + cacheDelNum += 1 } } - for i := 1; i <= len(delSeqs); i++ { + log.ZDebug(ctx, "get delSeqs from redis", "delSeqs", delSeqs, "userID", userID, "conversationID", conversationID, "cacheDelNum", cacheDelNum) + var reGetSeqsCache []int64 + for i := 1; i <= cacheDelNum; { newSeq := newBegin - int64(i) if newSeq >= begin { - log.ZDebug(ctx, "seq del in cache, a new seq in range append", "new seq", newSeq) - failedSeqs = append(failedSeqs, newSeq) + if !utils.Contain(newSeq, delSeqs...) { + log.ZDebug(ctx, "seq del in cache, a new seq in range append", "new seq", newSeq) + reGetSeqsCache = append(reGetSeqsCache, newSeq) + i++ + } } else { break } } + if len(reGetSeqsCache) > 0 { + log.ZDebug(ctx, "reGetSeqsCache", "reGetSeqsCache", reGetSeqsCache) + cachedMsgs, failedSeqs2, err := db.cache.GetMessagesBySeq(ctx, conversationID, reGetSeqsCache) + if err != nil { + if err != redis.Nil { + prome.Add(prome.MsgPullFromRedisFailedCounter, len(failedSeqs2)) + log.ZError(ctx, "get message from redis exception", err, "conversationID", conversationID, "seqs", reGetSeqsCache) + } + } + failedSeqs = append(failedSeqs, failedSeqs2...) + successMsgs = append(successMsgs, cachedMsgs...) + } } log.ZDebug(ctx, "get msgs from cache", "successMsgs", successMsgs) if len(failedSeqs) != 0 { @@ -544,6 +561,7 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin prome.Add(prome.MsgPullFromMongoSuccessCounter, len(mongoMsgs)) successMsgs = append(successMsgs, mongoMsgs...) } + return minSeq, maxSeq, successMsgs, nil }