From 07a2af016975e0847202200f27db3ecb682b6a18 Mon Sep 17 00:00:00 2001 From: Gordon <46924906+FGadvancer@users.noreply.github.com> Date: Fri, 21 Mar 2025 14:54:34 +0800 Subject: [PATCH] feat: add filtering for invalid messages and invalid conversations to prevent data-fetching exceptions after conversations are deleted. --- internal/api/jssdk/jssdk.go | 80 +++++++++++++++++++++++++++---------- pkg/rpcli/msg.go | 4 +- 2 files changed, 62 insertions(+), 22 deletions(-) diff --git a/internal/api/jssdk/jssdk.go b/internal/api/jssdk/jssdk.go index 3c0911207..0d30b1ea0 100644 --- a/internal/api/jssdk/jssdk.go +++ b/internal/api/jssdk/jssdk.go @@ -2,10 +2,14 @@ package jssdk import ( "context" - "github.com/openimsdk/open-im-server/v3/pkg/rpcli" "sort" + "github.com/openimsdk/open-im-server/v3/pkg/rpcli" + "github.com/openimsdk/protocol/constant" + "github.com/openimsdk/tools/log" + "github.com/gin-gonic/gin" + "github.com/openimsdk/protocol/conversation" "github.com/openimsdk/protocol/jssdk" "github.com/openimsdk/protocol/msg" @@ -109,10 +113,7 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive if len(conversationIDs) == 0 { return &jssdk.GetActiveConversationsResp{}, nil } - readSeq, err := x.msgClient.GetHasReadSeqs(ctx, conversationIDs, req.OwnerUserID) - if err != nil { - return nil, err - } + activeConversation, err := x.msgClient.GetActiveConversation(ctx, conversationIDs) if err != nil { return nil, err @@ -120,6 +121,10 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive if len(activeConversation) == 0 { return &jssdk.GetActiveConversationsResp{}, nil } + readSeq, err := x.msgClient.GetHasReadSeqs(ctx, conversationIDs, req.OwnerUserID) + if err != nil { + return nil, err + } sortConversations := sortActiveConversations{ Conversation: activeConversation, } @@ -147,6 +152,7 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive if err != nil { return nil, err } + x.checkMessagesAndGetLastMessage(ctx, req.OwnerUserID, msgs) conversationMap := datautil.SliceToMap(conversations, func(c *conversation.Conversation) string { return c.ConversationID }) @@ -156,16 +162,15 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive if !ok { continue } - var lastMsg *sdkws.MsgData if msgList, ok := msgs[c.ConversationID]; ok && len(msgList.Msgs) > 0 { - lastMsg = msgList.Msgs[0] + resp = append(resp, &jssdk.ConversationMsg{ + Conversation: conv, + LastMsg: msgList.Msgs[0], + MaxSeq: c.MaxSeq, + ReadSeq: readSeq[c.ConversationID], + }) } - resp = append(resp, &jssdk.ConversationMsg{ - Conversation: conv, - LastMsg: lastMsg, - MaxSeq: c.MaxSeq, - ReadSeq: readSeq[c.ConversationID], - }) + } if err := x.fillConversations(ctx, resp); err != nil { return nil, err @@ -219,18 +224,18 @@ func (x *JSSdk) getConversations(ctx context.Context, req *jssdk.GetConversation return nil, err } } + x.checkMessagesAndGetLastMessage(ctx, req.OwnerUserID, msgs) resp := make([]*jssdk.ConversationMsg, 0, len(conversations)) for _, c := range conversations { - var lastMsg *sdkws.MsgData if msgList, ok := msgs[c.ConversationID]; ok && len(msgList.Msgs) > 0 { - lastMsg = msgList.Msgs[0] + resp = append(resp, &jssdk.ConversationMsg{ + Conversation: c, + LastMsg: msgList.Msgs[0], + MaxSeq: maxSeqs[c.ConversationID], + ReadSeq: readSeqs[c.ConversationID], + }) } - resp = append(resp, &jssdk.ConversationMsg{ - Conversation: c, - LastMsg: lastMsg, - MaxSeq: maxSeqs[c.ConversationID], - ReadSeq: readSeqs[c.ConversationID], - }) + } if err := x.fillConversations(ctx, resp); err != nil { return nil, err @@ -247,3 +252,36 @@ func (x *JSSdk) getConversations(ctx context.Context, req *jssdk.GetConversation UnreadCount: unreadCount, }, nil } + +// This function checks whether the latest MaxSeq message is valid. +// If not, it needs to fetch a valid message again. +func (x *JSSdk) checkMessagesAndGetLastMessage(ctx context.Context, userID string, messages map[string]*sdkws.PullMsgs) { + var conversationIDs []string + + for conversationID, message := range messages { + allInValid := true + for _, data := range message.Msgs { + if data.Status < constant.MsgStatusHasDeleted { + allInValid = false + break + } + } + if allInValid { + conversationIDs = append(conversationIDs, conversationID) + } + } + if len(conversationIDs) > 0 { + resp, err := x.msgClient.GetLastMessage(ctx, &msg.GetLastMessageReq{ + UserID: userID, + ConversationIDs: conversationIDs, + }) + if err != nil { + log.ZError(ctx, "fetchLatestValidMessages", err, "conversationIDs", conversationIDs) + return + } + for conversationID, message := range resp.Msgs { + messages[conversationID] = &sdkws.PullMsgs{Msgs: []*sdkws.MsgData{message}} + } + } + +} diff --git a/pkg/rpcli/msg.go b/pkg/rpcli/msg.go index 0c44b7c8b..e4d1ece6e 100644 --- a/pkg/rpcli/msg.go +++ b/pkg/rpcli/msg.go @@ -2,9 +2,11 @@ package rpcli import ( "context" + + "google.golang.org/grpc" + "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/sdkws" - "google.golang.org/grpc" ) func NewMsgClient(cc grpc.ClientConnInterface) *MsgClient {