mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-05 05:12:45 +08:00
feat: add filtering for invalid messages and invalid conversations to prevent data-fetching exceptions after conversations are deleted. (#3239) (#3247)
Co-authored-by: OpenIM-Gordon <1432970085@qq.com>
This commit is contained in:
parent
22ba315acd
commit
fac3a5b1d8
@ -2,10 +2,14 @@ package jssdk
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
|
||||
"sort"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
|
||||
"github.com/openimsdk/protocol/constant"
|
||||
"github.com/openimsdk/tools/log"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
|
||||
"github.com/openimsdk/protocol/conversation"
|
||||
"github.com/openimsdk/protocol/jssdk"
|
||||
"github.com/openimsdk/protocol/msg"
|
||||
@ -109,10 +113,7 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive
|
||||
if len(conversationIDs) == 0 {
|
||||
return &jssdk.GetActiveConversationsResp{}, nil
|
||||
}
|
||||
readSeq, err := x.msgClient.GetHasReadSeqs(ctx, conversationIDs, req.OwnerUserID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
activeConversation, err := x.msgClient.GetActiveConversation(ctx, conversationIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -120,6 +121,10 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive
|
||||
if len(activeConversation) == 0 {
|
||||
return &jssdk.GetActiveConversationsResp{}, nil
|
||||
}
|
||||
readSeq, err := x.msgClient.GetHasReadSeqs(ctx, conversationIDs, req.OwnerUserID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sortConversations := sortActiveConversations{
|
||||
Conversation: activeConversation,
|
||||
}
|
||||
@ -147,6 +152,7 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
x.checkMessagesAndGetLastMessage(ctx, req.OwnerUserID, msgs)
|
||||
conversationMap := datautil.SliceToMap(conversations, func(c *conversation.Conversation) string {
|
||||
return c.ConversationID
|
||||
})
|
||||
@ -156,16 +162,15 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
var lastMsg *sdkws.MsgData
|
||||
if msgList, ok := msgs[c.ConversationID]; ok && len(msgList.Msgs) > 0 {
|
||||
lastMsg = msgList.Msgs[0]
|
||||
resp = append(resp, &jssdk.ConversationMsg{
|
||||
Conversation: conv,
|
||||
LastMsg: msgList.Msgs[0],
|
||||
MaxSeq: c.MaxSeq,
|
||||
ReadSeq: readSeq[c.ConversationID],
|
||||
})
|
||||
}
|
||||
resp = append(resp, &jssdk.ConversationMsg{
|
||||
Conversation: conv,
|
||||
LastMsg: lastMsg,
|
||||
MaxSeq: c.MaxSeq,
|
||||
ReadSeq: readSeq[c.ConversationID],
|
||||
})
|
||||
|
||||
}
|
||||
if err := x.fillConversations(ctx, resp); err != nil {
|
||||
return nil, err
|
||||
@ -219,18 +224,18 @@ func (x *JSSdk) getConversations(ctx context.Context, req *jssdk.GetConversation
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
x.checkMessagesAndGetLastMessage(ctx, req.OwnerUserID, msgs)
|
||||
resp := make([]*jssdk.ConversationMsg, 0, len(conversations))
|
||||
for _, c := range conversations {
|
||||
var lastMsg *sdkws.MsgData
|
||||
if msgList, ok := msgs[c.ConversationID]; ok && len(msgList.Msgs) > 0 {
|
||||
lastMsg = msgList.Msgs[0]
|
||||
resp = append(resp, &jssdk.ConversationMsg{
|
||||
Conversation: c,
|
||||
LastMsg: msgList.Msgs[0],
|
||||
MaxSeq: maxSeqs[c.ConversationID],
|
||||
ReadSeq: readSeqs[c.ConversationID],
|
||||
})
|
||||
}
|
||||
resp = append(resp, &jssdk.ConversationMsg{
|
||||
Conversation: c,
|
||||
LastMsg: lastMsg,
|
||||
MaxSeq: maxSeqs[c.ConversationID],
|
||||
ReadSeq: readSeqs[c.ConversationID],
|
||||
})
|
||||
|
||||
}
|
||||
if err := x.fillConversations(ctx, resp); err != nil {
|
||||
return nil, err
|
||||
@ -247,3 +252,36 @@ func (x *JSSdk) getConversations(ctx context.Context, req *jssdk.GetConversation
|
||||
UnreadCount: unreadCount,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// This function checks whether the latest MaxSeq message is valid.
|
||||
// If not, it needs to fetch a valid message again.
|
||||
func (x *JSSdk) checkMessagesAndGetLastMessage(ctx context.Context, userID string, messages map[string]*sdkws.PullMsgs) {
|
||||
var conversationIDs []string
|
||||
|
||||
for conversationID, message := range messages {
|
||||
allInValid := true
|
||||
for _, data := range message.Msgs {
|
||||
if data.Status < constant.MsgStatusHasDeleted {
|
||||
allInValid = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if allInValid {
|
||||
conversationIDs = append(conversationIDs, conversationID)
|
||||
}
|
||||
}
|
||||
if len(conversationIDs) > 0 {
|
||||
resp, err := x.msgClient.GetLastMessage(ctx, &msg.GetLastMessageReq{
|
||||
UserID: userID,
|
||||
ConversationIDs: conversationIDs,
|
||||
})
|
||||
if err != nil {
|
||||
log.ZError(ctx, "fetchLatestValidMessages", err, "conversationIDs", conversationIDs)
|
||||
return
|
||||
}
|
||||
for conversationID, message := range resp.Msgs {
|
||||
messages[conversationID] = &sdkws.PullMsgs{Msgs: []*sdkws.MsgData{message}}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -2,9 +2,11 @@ package rpcli
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/openimsdk/protocol/msg"
|
||||
"github.com/openimsdk/protocol/sdkws"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
func NewMsgClient(cc grpc.ClientConnInterface) *MsgClient {
|
||||
|
Loading…
x
Reference in New Issue
Block a user