mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-10-27 05:52:29 +08:00 
			
		
		
		
	update set seq implement.
This commit is contained in:
		
							parent
							
								
									83ac4e83b9
								
							
						
					
					
						commit
						dda7e13dce
					
				| @ -436,19 +436,37 @@ func (c *conversationServer) CreateGroupChatConversations(ctx context.Context, r | |||||||
| 	return &pbconversation.CreateGroupChatConversationsResp{}, nil | 	return &pbconversation.CreateGroupChatConversationsResp{}, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | // func (c *conversationServer) SetConversationMaxSeq(ctx context.Context, req *pbconversation.SetConversationMaxSeqReq) (*pbconversation.SetConversationMaxSeqResp, error) { | ||||||
|  | // 	if err := c.conversationDatabase.UpdateUsersConversationField(ctx, req.OwnerUserID, req.ConversationID, | ||||||
|  | // 		map[string]any{"max_seq": req.MaxSeq}); err != nil { | ||||||
|  | // 		return nil, err | ||||||
|  | // 	} | ||||||
|  | 
 | ||||||
|  | // 	return &pbconversation.SetConversationMaxSeqResp{}, nil | ||||||
|  | // } | ||||||
|  | 
 | ||||||
| func (c *conversationServer) SetConversationMaxSeq(ctx context.Context, req *pbconversation.SetConversationMaxSeqReq) (*pbconversation.SetConversationMaxSeqResp, error) { | func (c *conversationServer) SetConversationMaxSeq(ctx context.Context, req *pbconversation.SetConversationMaxSeqReq) (*pbconversation.SetConversationMaxSeqResp, error) { | ||||||
| 	if err := c.conversationDatabase.UpdateUsersConversationField(ctx, req.OwnerUserID, req.ConversationID, | 
 | ||||||
| 		map[string]any{"max_seq": req.MaxSeq}); err != nil { | 	if err := c.conversationDatabase.SetUserConversationMaxSeq(ctx, req.ConversationID, req.OwnerUserID[0], req.MaxSeq); err != nil { | ||||||
| 		return nil, err | 		return nil, errs.Wrap(err) | ||||||
| 	} | 	} | ||||||
|  | 
 | ||||||
| 	return &pbconversation.SetConversationMaxSeqResp{}, nil | 	return &pbconversation.SetConversationMaxSeqResp{}, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | // func (c *conversationServer) SetConversationMinSeq(ctx context.Context, req *pbconversation.SetConversationMinSeqReq) (*pbconversation.SetConversationMinSeqResp, error) { | ||||||
|  | // 	if err := c.conversationDatabase.UpdateUsersConversationField(ctx, req.OwnerUserID, req.ConversationID, | ||||||
|  | // 		map[string]any{"min_seq": req.MinSeq}); err != nil { | ||||||
|  | // 		return nil, err | ||||||
|  | // 	} | ||||||
|  | // 	return &pbconversation.SetConversationMinSeqResp{}, nil | ||||||
|  | // } | ||||||
|  | 
 | ||||||
| func (c *conversationServer) SetConversationMinSeq(ctx context.Context, req *pbconversation.SetConversationMinSeqReq) (*pbconversation.SetConversationMinSeqResp, error) { | func (c *conversationServer) SetConversationMinSeq(ctx context.Context, req *pbconversation.SetConversationMinSeqReq) (*pbconversation.SetConversationMinSeqResp, error) { | ||||||
| 	if err := c.conversationDatabase.UpdateUsersConversationField(ctx, req.OwnerUserID, req.ConversationID, | 	if err := c.conversationDatabase.SetUserConversationMinSeq(ctx, req.ConversationID, req.OwnerUserID[0], req.MinSeq); err != nil { | ||||||
| 		map[string]any{"min_seq": req.MinSeq}); err != nil { | 		return nil, errs.Wrap(err) | ||||||
| 		return nil, err |  | ||||||
| 	} | 	} | ||||||
|  | 
 | ||||||
| 	return &pbconversation.SetConversationMinSeqResp{}, nil | 	return &pbconversation.SetConversationMinSeqResp{}, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | |||||||
| @ -12,6 +12,7 @@ import ( | |||||||
| 	"github.com/openimsdk/tools/errs" | 	"github.com/openimsdk/tools/errs" | ||||||
| 	"github.com/openimsdk/tools/log" | 	"github.com/openimsdk/tools/log" | ||||||
| 	"github.com/openimsdk/tools/mcontext" | 	"github.com/openimsdk/tools/mcontext" | ||||||
|  | 	"github.com/openimsdk/tools/utils/datautil" | ||||||
| 	"github.com/openimsdk/tools/utils/idutil" | 	"github.com/openimsdk/tools/utils/idutil" | ||||||
| 	"github.com/openimsdk/tools/utils/stringutil" | 	"github.com/openimsdk/tools/utils/stringutil" | ||||||
| 	"golang.org/x/sync/errgroup" | 	"golang.org/x/sync/errgroup" | ||||||
| @ -100,6 +101,8 @@ func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) | |||||||
| 				} | 				} | ||||||
| 
 | 
 | ||||||
| 				if len(seqs) > 0 { | 				if len(seqs) > 0 { | ||||||
|  | 					minseq := datautil.Max(seqs...) | ||||||
|  | 
 | ||||||
| 					if err := m.Conversation.UpdateConversation(handleCtx, | 					if err := m.Conversation.UpdateConversation(handleCtx, | ||||||
| 						&pbconversation.UpdateConversationReq{ | 						&pbconversation.UpdateConversationReq{ | ||||||
| 							UserIDs:               []string{conversation.OwnerUserID}, | 							UserIDs:               []string{conversation.OwnerUserID}, | ||||||
| @ -109,6 +112,10 @@ func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) | |||||||
| 						continue | 						continue | ||||||
| 					} | 					} | ||||||
| 
 | 
 | ||||||
|  | 					if err := m.Conversation.SetConversationMinSeq(handleCtx, []string{conversation.OwnerUserID}, conversation.ConversationID, minseq); err != nil { | ||||||
|  | 						return err | ||||||
|  | 					} | ||||||
|  | 
 | ||||||
| 					// if you need Notify SDK client userseq is update. | 					// if you need Notify SDK client userseq is update. | ||||||
| 					// m.msgNotificationSender.UserDeleteMsgsNotification(handleCtx, conversation.OwnerUserID, conversation.ConversationID, seqs) | 					// m.msgNotificationSender.UserDeleteMsgsNotification(handleCtx, conversation.OwnerUserID, conversation.ConversationID, seqs) | ||||||
| 				} | 				} | ||||||
|  | |||||||
| @ -74,6 +74,15 @@ type ConversationDatabase interface { | |||||||
| 	GetNotNotifyConversationIDs(ctx context.Context, userID string) ([]string, error) | 	GetNotNotifyConversationIDs(ctx context.Context, userID string) ([]string, error) | ||||||
| 	// GetPinnedConversationIDs gets pinned conversationIDs by userID | 	// GetPinnedConversationIDs gets pinned conversationIDs by userID | ||||||
| 	GetPinnedConversationIDs(ctx context.Context, userID string) ([]string, error) | 	GetPinnedConversationIDs(ctx context.Context, userID string) ([]string, error) | ||||||
|  | 
 | ||||||
|  | 	// GetUserConversationMinSeq is get user specific conversation min seq | ||||||
|  | 	GetUserConversationMinSeq(ctx context.Context, conversationID, userID string) (int64, error) | ||||||
|  | 	// SetUserConversationMinSeq is set user specific conversation min seq | ||||||
|  | 	SetUserConversationMinSeq(ctx context.Context, conversationID, userID string, seq int64) error | ||||||
|  | 	// GetUserConversationMaxSeq is get user specific conversation max seq | ||||||
|  | 	GetUserConversationMaxSeq(ctx context.Context, conversationID, userID string) (int64, error) | ||||||
|  | 	// SetUserConversationMaxSeq is set user specific conversation max seq | ||||||
|  | 	SetUserConversationMaxSeq(ctx context.Context, conversationID, userID string, seq int64) error | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func NewConversationDatabase(conversation database.Conversation, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase { | func NewConversationDatabase(conversation database.Conversation, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase { | ||||||
| @ -401,3 +410,28 @@ func (c *conversationDatabase) GetPinnedConversationIDs(ctx context.Context, use | |||||||
| 	} | 	} | ||||||
| 	return conversationIDs, nil | 	return conversationIDs, nil | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | func (c *conversationDatabase) GetUserConversationMinSeq(ctx context.Context, conversationID, userID string) (int64, error) { | ||||||
|  | 	seq, err := c.conversationDB.GetUserConversationMinSeq(ctx, conversationID, userID) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return 0, err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return seq, nil | ||||||
|  | } | ||||||
|  | func (c *conversationDatabase) SetUserConversationMinSeq(ctx context.Context, conversationID, userID string, seq int64) error { | ||||||
|  | 	return c.conversationDB.SetUserConversationMinSeq(ctx, conversationID, userID, seq) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (c *conversationDatabase) GetUserConversationMaxSeq(ctx context.Context, conversationID, userID string) (int64, error) { | ||||||
|  | 	seq, err := c.conversationDB.GetUserConversationMaxSeq(ctx, conversationID, userID) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return 0, err | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	return seq, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (c *conversationDatabase) SetUserConversationMaxSeq(ctx context.Context, conversationID, userID string, seq int64) error { | ||||||
|  | 	return c.conversationDB.SetUserConversationMaxSeq(ctx, conversationID, userID, seq) | ||||||
|  | } | ||||||
|  | |||||||
| @ -16,6 +16,7 @@ package database | |||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| 	"context" | 	"context" | ||||||
|  | 
 | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" | 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" | ||||||
| 	"github.com/openimsdk/tools/db/pagination" | 	"github.com/openimsdk/tools/db/pagination" | ||||||
| ) | ) | ||||||
| @ -41,4 +42,9 @@ type Conversation interface { | |||||||
| 	GetConversationIDsNeedDestruct(ctx context.Context) ([]*model.Conversation, error) | 	GetConversationIDsNeedDestruct(ctx context.Context) ([]*model.Conversation, error) | ||||||
| 	GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) | 	GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) | ||||||
| 	FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error) | 	FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error) | ||||||
|  | 
 | ||||||
|  | 	GetUserConversationMinSeq(ctx context.Context, conversationID, userID string) (int64, error) | ||||||
|  | 	SetUserConversationMinSeq(ctx context.Context, conversationID, userID string, seq int64) error | ||||||
|  | 	GetUserConversationMaxSeq(ctx context.Context, conversationID, userID string) (int64, error) | ||||||
|  | 	SetUserConversationMaxSeq(ctx context.Context, conversationID, userID string, seq int64) error | ||||||
| } | } | ||||||
|  | |||||||
| @ -16,9 +16,11 @@ package mgo | |||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| 	"context" | 	"context" | ||||||
|  | 	"errors" | ||||||
|  | 	"time" | ||||||
|  | 
 | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" | 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" | 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" | ||||||
| 	"time" |  | ||||||
| 
 | 
 | ||||||
| 	"github.com/openimsdk/protocol/constant" | 	"github.com/openimsdk/protocol/constant" | ||||||
| 	"github.com/openimsdk/tools/db/mongoutil" | 	"github.com/openimsdk/tools/db/mongoutil" | ||||||
| @ -227,3 +229,49 @@ func (c *ConversationMgo) GetConversationNotReceiveMessageUserIDs(ctx context.Co | |||||||
| func (c *ConversationMgo) FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error) { | func (c *ConversationMgo) FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error) { | ||||||
| 	return c.version.FindChangeLog(ctx, userID, version, limit) | 	return c.version.FindChangeLog(ctx, userID, version, limit) | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | func (s *ConversationMgo) setSeq(ctx context.Context, conversationID string, userID string, seq int64, field string) error { | ||||||
|  | 	filter := map[string]any{ | ||||||
|  | 		"owner_user_id":   userID, | ||||||
|  | 		"conversation_id": conversationID, | ||||||
|  | 	} | ||||||
|  | 	update := map[string]any{ | ||||||
|  | 		"$set": bson.M{ | ||||||
|  | 			field: seq, | ||||||
|  | 		}, | ||||||
|  | 	} | ||||||
|  | 	return mongoutil.UpdateOne(ctx, s.coll, filter, update, false, nil) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (s *ConversationMgo) getSeq(ctx context.Context, conversationID string, userID string, failed string) (int64, error) { | ||||||
|  | 	filter := map[string]any{ | ||||||
|  | 		"owner_user_id":   userID, | ||||||
|  | 		"conversation_id": conversationID, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	opt := options.FindOne().SetProjection(bson.M{"_id": 0, failed: 1}) | ||||||
|  | 	seq, err := mongoutil.FindOne[int64](ctx, s.coll, filter, opt) | ||||||
|  | 	if err == nil { | ||||||
|  | 		return seq, nil | ||||||
|  | 	} else if errors.Is(err, mongo.ErrNoDocuments) { | ||||||
|  | 		return 0, nil | ||||||
|  | 	} else { | ||||||
|  | 		return 0, err | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (s *ConversationMgo) GetUserConversationMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) { | ||||||
|  | 	return s.getSeq(ctx, conversationID, userID, "min_seq") | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (s *ConversationMgo) SetUserConversationMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error { | ||||||
|  | 	return s.setSeq(ctx, conversationID, userID, seq, "min_seq") | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (s *ConversationMgo) GetUserConversationMaxSeq(ctx context.Context, conversationID string, userID string) (int64, error) { | ||||||
|  | 	return s.getSeq(ctx, conversationID, userID, "max_seq") | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (s *ConversationMgo) SetUserConversationMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error { | ||||||
|  | 	return s.setSeq(ctx, conversationID, userID, seq, "max_seq") | ||||||
|  | } | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user