mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-24 02:16:16 +08:00
cache
This commit is contained in:
parent
df68e1cb4f
commit
dacee9e86f
@ -173,6 +173,9 @@ func (c *conversationServer) ModifyConversationField(ctx context.Context, req *p
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *conversationServer) SetConversations(ctx context.Context, req *pbConversation.SetConversationsReq) (*pbConversation.SetConversationsResp, error) {
|
func (c *conversationServer) SetConversations(ctx context.Context, req *pbConversation.SetConversationsReq) (*pbConversation.SetConversationsResp, error) {
|
||||||
|
if req.Conversation == nil {
|
||||||
|
return nil, errs.ErrArgs.Wrap("conversation must not be nil")
|
||||||
|
}
|
||||||
isSyncConversation := true
|
isSyncConversation := true
|
||||||
if req.Conversation.ConversationType == constant.GroupChatType {
|
if req.Conversation.ConversationType == constant.GroupChatType {
|
||||||
groupInfo, err := c.groupRpcClient.GetGroupInfo(ctx, req.Conversation.GroupID)
|
groupInfo, err := c.groupRpcClient.GetGroupInfo(ctx, req.Conversation.GroupID)
|
||||||
|
@ -7,7 +7,9 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func (m *msgServer) GetConversationMaxSeq(ctx context.Context, req *pbMsg.GetConversationMaxSeqReq) (resp *pbMsg.GetConversationMaxSeqResp, err error) {
|
func (m *msgServer) GetConversationMaxSeq(ctx context.Context, req *pbMsg.GetConversationMaxSeqReq) (resp *pbMsg.GetConversationMaxSeqResp, err error) {
|
||||||
resp = &pbMsg.GetConversationMaxSeqResp{}
|
maxSeq, err := m.MsgDatabase.GetMaxSeq(ctx, req.ConversationID)
|
||||||
resp.MaxSeq, err = m.MsgDatabase.GetMaxSeq(ctx, req.ConversationID)
|
if err != nil {
|
||||||
return resp, err
|
return nil, err
|
||||||
|
}
|
||||||
|
return &pbMsg.GetConversationMaxSeqResp{MaxSeq: maxSeq}, nil
|
||||||
}
|
}
|
||||||
|
@ -510,24 +510,41 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin
|
|||||||
delSeqs, err := db.cache.GetUserDelList(ctx, userID, conversationID)
|
delSeqs, err := db.cache.GetUserDelList(ctx, userID, conversationID)
|
||||||
if err != nil && errs.Unwrap(err) != redis.Nil {
|
if err != nil && errs.Unwrap(err) != redis.Nil {
|
||||||
return 0, 0, nil, err
|
return 0, 0, nil, err
|
||||||
} else {
|
|
||||||
log.ZDebug(ctx, "get delSeqs from redis", "delSeqs", delSeqs, "userID", userID, "conversationID", conversationID)
|
|
||||||
}
|
}
|
||||||
|
var cacheDelNum int
|
||||||
for _, msg := range cachedMsgs {
|
for _, msg := range cachedMsgs {
|
||||||
if !utils.Contain(msg.Seq, delSeqs...) {
|
if !utils.Contain(msg.Seq, delSeqs...) {
|
||||||
successMsgs = append(successMsgs, msg)
|
successMsgs = append(successMsgs, msg)
|
||||||
|
} else {
|
||||||
|
cacheDelNum += 1
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for i := 1; i <= len(delSeqs); i++ {
|
log.ZDebug(ctx, "get delSeqs from redis", "delSeqs", delSeqs, "userID", userID, "conversationID", conversationID, "cacheDelNum", cacheDelNum)
|
||||||
|
var reGetSeqsCache []int64
|
||||||
|
for i := 1; i <= cacheDelNum; {
|
||||||
newSeq := newBegin - int64(i)
|
newSeq := newBegin - int64(i)
|
||||||
if newSeq >= begin {
|
if newSeq >= begin {
|
||||||
|
if !utils.Contain(newSeq, delSeqs...) {
|
||||||
log.ZDebug(ctx, "seq del in cache, a new seq in range append", "new seq", newSeq)
|
log.ZDebug(ctx, "seq del in cache, a new seq in range append", "new seq", newSeq)
|
||||||
failedSeqs = append(failedSeqs, newSeq)
|
reGetSeqsCache = append(reGetSeqsCache, newSeq)
|
||||||
|
i++
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if len(reGetSeqsCache) > 0 {
|
||||||
|
log.ZDebug(ctx, "reGetSeqsCache", "reGetSeqsCache", reGetSeqsCache)
|
||||||
|
cachedMsgs, failedSeqs2, err := db.cache.GetMessagesBySeq(ctx, conversationID, reGetSeqsCache)
|
||||||
|
if err != nil {
|
||||||
|
if err != redis.Nil {
|
||||||
|
prome.Add(prome.MsgPullFromRedisFailedCounter, len(failedSeqs2))
|
||||||
|
log.ZError(ctx, "get message from redis exception", err, "conversationID", conversationID, "seqs", reGetSeqsCache)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
failedSeqs = append(failedSeqs, failedSeqs2...)
|
||||||
|
successMsgs = append(successMsgs, cachedMsgs...)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
log.ZDebug(ctx, "get msgs from cache", "successMsgs", successMsgs)
|
log.ZDebug(ctx, "get msgs from cache", "successMsgs", successMsgs)
|
||||||
if len(failedSeqs) != 0 {
|
if len(failedSeqs) != 0 {
|
||||||
@ -544,6 +561,7 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin
|
|||||||
prome.Add(prome.MsgPullFromMongoSuccessCounter, len(mongoMsgs))
|
prome.Add(prome.MsgPullFromMongoSuccessCounter, len(mongoMsgs))
|
||||||
successMsgs = append(successMsgs, mongoMsgs...)
|
successMsgs = append(successMsgs, mongoMsgs...)
|
||||||
}
|
}
|
||||||
|
|
||||||
return minSeq, maxSeq, successMsgs, nil
|
return minSeq, maxSeq, successMsgs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user