mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-10-26 21:22:16 +08:00 
			
		
		
		
	feat: add filtering for invalid messages and invalid conversations to prevent data-fetching exceptions after conversations are deleted. (#3239)
This commit is contained in:
		
							parent
							
								
									22ba315acd
								
							
						
					
					
						commit
						10da5ee5d9
					
				| @ -2,10 +2,14 @@ package jssdk | |||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| 	"context" | 	"context" | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/rpcli" |  | ||||||
| 	"sort" | 	"sort" | ||||||
| 
 | 
 | ||||||
|  | 	"github.com/openimsdk/open-im-server/v3/pkg/rpcli" | ||||||
|  | 	"github.com/openimsdk/protocol/constant" | ||||||
|  | 	"github.com/openimsdk/tools/log" | ||||||
|  | 
 | ||||||
| 	"github.com/gin-gonic/gin" | 	"github.com/gin-gonic/gin" | ||||||
|  | 
 | ||||||
| 	"github.com/openimsdk/protocol/conversation" | 	"github.com/openimsdk/protocol/conversation" | ||||||
| 	"github.com/openimsdk/protocol/jssdk" | 	"github.com/openimsdk/protocol/jssdk" | ||||||
| 	"github.com/openimsdk/protocol/msg" | 	"github.com/openimsdk/protocol/msg" | ||||||
| @ -109,10 +113,7 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive | |||||||
| 	if len(conversationIDs) == 0 { | 	if len(conversationIDs) == 0 { | ||||||
| 		return &jssdk.GetActiveConversationsResp{}, nil | 		return &jssdk.GetActiveConversationsResp{}, nil | ||||||
| 	} | 	} | ||||||
| 	readSeq, err := x.msgClient.GetHasReadSeqs(ctx, conversationIDs, req.OwnerUserID) | 
 | ||||||
| 	if err != nil { |  | ||||||
| 		return nil, err |  | ||||||
| 	} |  | ||||||
| 	activeConversation, err := x.msgClient.GetActiveConversation(ctx, conversationIDs) | 	activeConversation, err := x.msgClient.GetActiveConversation(ctx, conversationIDs) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| @ -120,6 +121,10 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive | |||||||
| 	if len(activeConversation) == 0 { | 	if len(activeConversation) == 0 { | ||||||
| 		return &jssdk.GetActiveConversationsResp{}, nil | 		return &jssdk.GetActiveConversationsResp{}, nil | ||||||
| 	} | 	} | ||||||
|  | 	readSeq, err := x.msgClient.GetHasReadSeqs(ctx, conversationIDs, req.OwnerUserID) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
| 	sortConversations := sortActiveConversations{ | 	sortConversations := sortActiveConversations{ | ||||||
| 		Conversation: activeConversation, | 		Conversation: activeConversation, | ||||||
| 	} | 	} | ||||||
| @ -147,6 +152,7 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive | |||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
|  | 	x.checkMessagesAndGetLastMessage(ctx, req.OwnerUserID, msgs) | ||||||
| 	conversationMap := datautil.SliceToMap(conversations, func(c *conversation.Conversation) string { | 	conversationMap := datautil.SliceToMap(conversations, func(c *conversation.Conversation) string { | ||||||
| 		return c.ConversationID | 		return c.ConversationID | ||||||
| 	}) | 	}) | ||||||
| @ -156,17 +162,16 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive | |||||||
| 		if !ok { | 		if !ok { | ||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
| 		var lastMsg *sdkws.MsgData |  | ||||||
| 		if msgList, ok := msgs[c.ConversationID]; ok && len(msgList.Msgs) > 0 { | 		if msgList, ok := msgs[c.ConversationID]; ok && len(msgList.Msgs) > 0 { | ||||||
| 			lastMsg = msgList.Msgs[0] |  | ||||||
| 		} |  | ||||||
| 			resp = append(resp, &jssdk.ConversationMsg{ | 			resp = append(resp, &jssdk.ConversationMsg{ | ||||||
| 				Conversation: conv, | 				Conversation: conv, | ||||||
| 			LastMsg:      lastMsg, | 				LastMsg:      msgList.Msgs[0], | ||||||
| 				MaxSeq:       c.MaxSeq, | 				MaxSeq:       c.MaxSeq, | ||||||
| 				ReadSeq:      readSeq[c.ConversationID], | 				ReadSeq:      readSeq[c.ConversationID], | ||||||
| 			}) | 			}) | ||||||
| 		} | 		} | ||||||
|  | 
 | ||||||
|  | 	} | ||||||
| 	if err := x.fillConversations(ctx, resp); err != nil { | 	if err := x.fillConversations(ctx, resp); err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
| @ -219,19 +224,19 @@ func (x *JSSdk) getConversations(ctx context.Context, req *jssdk.GetConversation | |||||||
| 			return nil, err | 			return nil, err | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  | 	x.checkMessagesAndGetLastMessage(ctx, req.OwnerUserID, msgs) | ||||||
| 	resp := make([]*jssdk.ConversationMsg, 0, len(conversations)) | 	resp := make([]*jssdk.ConversationMsg, 0, len(conversations)) | ||||||
| 	for _, c := range conversations { | 	for _, c := range conversations { | ||||||
| 		var lastMsg *sdkws.MsgData |  | ||||||
| 		if msgList, ok := msgs[c.ConversationID]; ok && len(msgList.Msgs) > 0 { | 		if msgList, ok := msgs[c.ConversationID]; ok && len(msgList.Msgs) > 0 { | ||||||
| 			lastMsg = msgList.Msgs[0] |  | ||||||
| 		} |  | ||||||
| 			resp = append(resp, &jssdk.ConversationMsg{ | 			resp = append(resp, &jssdk.ConversationMsg{ | ||||||
| 				Conversation: c, | 				Conversation: c, | ||||||
| 			LastMsg:      lastMsg, | 				LastMsg:      msgList.Msgs[0], | ||||||
| 				MaxSeq:       maxSeqs[c.ConversationID], | 				MaxSeq:       maxSeqs[c.ConversationID], | ||||||
| 				ReadSeq:      readSeqs[c.ConversationID], | 				ReadSeq:      readSeqs[c.ConversationID], | ||||||
| 			}) | 			}) | ||||||
| 		} | 		} | ||||||
|  | 
 | ||||||
|  | 	} | ||||||
| 	if err := x.fillConversations(ctx, resp); err != nil { | 	if err := x.fillConversations(ctx, resp); err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
| @ -247,3 +252,36 @@ func (x *JSSdk) getConversations(ctx context.Context, req *jssdk.GetConversation | |||||||
| 		UnreadCount:   unreadCount, | 		UnreadCount:   unreadCount, | ||||||
| 	}, nil | 	}, nil | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | // This function checks whether the latest MaxSeq message is valid. | ||||||
|  | // If not, it needs to fetch a valid message again. | ||||||
|  | func (x *JSSdk) checkMessagesAndGetLastMessage(ctx context.Context, userID string, messages map[string]*sdkws.PullMsgs) { | ||||||
|  | 	var conversationIDs []string | ||||||
|  | 
 | ||||||
|  | 	for conversationID, message := range messages { | ||||||
|  | 		allInValid := true | ||||||
|  | 		for _, data := range message.Msgs { | ||||||
|  | 			if data.Status < constant.MsgStatusHasDeleted { | ||||||
|  | 				allInValid = false | ||||||
|  | 				break | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 		if allInValid { | ||||||
|  | 			conversationIDs = append(conversationIDs, conversationID) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	if len(conversationIDs) > 0 { | ||||||
|  | 		resp, err := x.msgClient.GetLastMessage(ctx, &msg.GetLastMessageReq{ | ||||||
|  | 			UserID:          userID, | ||||||
|  | 			ConversationIDs: conversationIDs, | ||||||
|  | 		}) | ||||||
|  | 		if err != nil { | ||||||
|  | 			log.ZError(ctx, "fetchLatestValidMessages", err, "conversationIDs", conversationIDs) | ||||||
|  | 			return | ||||||
|  | 		} | ||||||
|  | 		for conversationID, message := range resp.Msgs { | ||||||
|  | 			messages[conversationID] = &sdkws.PullMsgs{Msgs: []*sdkws.MsgData{message}} | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | } | ||||||
|  | |||||||
| @ -2,9 +2,11 @@ package rpcli | |||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| 	"context" | 	"context" | ||||||
|  | 
 | ||||||
|  | 	"google.golang.org/grpc" | ||||||
|  | 
 | ||||||
| 	"github.com/openimsdk/protocol/msg" | 	"github.com/openimsdk/protocol/msg" | ||||||
| 	"github.com/openimsdk/protocol/sdkws" | 	"github.com/openimsdk/protocol/sdkws" | ||||||
| 	"google.golang.org/grpc" |  | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| func NewMsgClient(cc grpc.ClientConnInterface) *MsgClient { | func NewMsgClient(cc grpc.ClientConnInterface) *MsgClient { | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user