mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-10-26 21:22:16 +08:00 
			
		
		
		
	feat: get not notify conversationIDs
This commit is contained in:
		
							parent
							
								
									7090c99fa5
								
							
						
					
					
						commit
						db183688f0
					
				
							
								
								
									
										2
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								go.mod
									
									
									
									
									
								
							| @ -12,7 +12,7 @@ require ( | ||||
| 	github.com/gorilla/websocket v1.5.1 | ||||
| 	github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 | ||||
| 	github.com/mitchellh/mapstructure v1.5.0 | ||||
| 	github.com/openimsdk/protocol v0.0.72-alpha.25 | ||||
| 	github.com/openimsdk/protocol v0.0.72-alpha.27 | ||||
| 	github.com/openimsdk/tools v0.0.50-alpha.12 | ||||
| 	github.com/pkg/errors v0.9.1 // indirect | ||||
| 	github.com/prometheus/client_golang v1.18.0 | ||||
|  | ||||
							
								
								
									
										4
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										4
									
								
								go.sum
									
									
									
									
									
								
							| @ -322,8 +322,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= | ||||
| github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= | ||||
| github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= | ||||
| github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= | ||||
| github.com/openimsdk/protocol v0.0.72-alpha.25 h1:W8E6gnwt5V6anr/8lYOf5v/Lcsggf7gIAzJbw7YU6So= | ||||
| github.com/openimsdk/protocol v0.0.72-alpha.25/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= | ||||
| github.com/openimsdk/protocol v0.0.72-alpha.27 h1:S6n3uj7YhKjo2NCHHSnUijaJ9YYiy8TTMquc4EJOm50= | ||||
| github.com/openimsdk/protocol v0.0.72-alpha.27/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= | ||||
| github.com/openimsdk/tools v0.0.50-alpha.12 h1:rV3BxgqN+F79vZvdoQ+97Eob8ScsRVEM8D+Wrcl23uo= | ||||
| github.com/openimsdk/tools v0.0.50-alpha.12/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4= | ||||
| github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= | ||||
|  | ||||
| @ -710,3 +710,11 @@ func (c *conversationServer) GetConversationsNeedDestructMsgs(ctx context.Contex | ||||
| 
 | ||||
| 	return &pbconversation.GetConversationsNeedDestructMsgsResp{Conversations: convert.ConversationsDB2Pb(temp)}, nil | ||||
| } | ||||
| 
 | ||||
| func (c *conversationServer) GetNotNotifyConversationIDs(ctx context.Context, req *pbconversation.GetNotNotifyConversationIDsReq) (*pbconversation.GetNotNotifyConversationIDsResp, error) { | ||||
| 	conversationIDs, err := c.conversationDatabase.GetNotNotifyConversationIDs(ctx, req.UserID) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	return &pbconversation.GetNotNotifyConversationIDsResp{ConversationIDs: conversationIDs}, nil | ||||
| } | ||||
|  | ||||
| @ -17,6 +17,7 @@ package cachekey | ||||
| const ( | ||||
| 	ConversationKey                          = "CONVERSATION:" | ||||
| 	ConversationIDsKey                       = "CONVERSATION_IDS:" | ||||
| 	NotNotifyConversationIDsKey              = "NOT_NOTIFY_CONVERSATION_IDS:" | ||||
| 	ConversationIDsHashKey                   = "CONVERSATION_IDS_HASH:" | ||||
| 	ConversationHasReadSeqKey                = "CONVERSATION_HAS_READ_SEQ:" | ||||
| 	RecvMsgOptKey                            = "RECV_MSG_OPT:" | ||||
| @ -34,6 +35,10 @@ func GetConversationIDsKey(ownerUserID string) string { | ||||
| 	return ConversationIDsKey + ownerUserID | ||||
| } | ||||
| 
 | ||||
| func GetNotNotifyConversationIDsKey(ownerUserID string) string { | ||||
| 	return NotNotifyConversationIDsKey + ownerUserID | ||||
| } | ||||
| 
 | ||||
| func GetSuperGroupRecvNotNotifyUserIDsKey(groupID string) string { | ||||
| 	return SuperGroupRecvMsgNotNotifyUserIDsKey + groupID | ||||
| } | ||||
|  | ||||
							
								
								
									
										3
									
								
								pkg/common/storage/cache/conversation.go
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										3
									
								
								pkg/common/storage/cache/conversation.go
									
									
									
									
										vendored
									
									
								
							| @ -25,6 +25,7 @@ type ConversationCache interface { | ||||
| 	CloneConversationCache() ConversationCache | ||||
| 	// get user's conversationIDs from msgCache | ||||
| 	GetUserConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) | ||||
| 	GetUserNotNotifyConversationIDs(ctx context.Context, userID string) ([]string, error) | ||||
| 	DelConversationIDs(userIDs ...string) ConversationCache | ||||
| 
 | ||||
| 	GetUserConversationIDsHash(ctx context.Context, ownerUserID string) (hash uint64, err error) | ||||
| @ -54,7 +55,7 @@ type ConversationCache interface { | ||||
| 
 | ||||
| 	GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) | ||||
| 	DelConversationNotReceiveMessageUserIDs(conversationIDs ...string) ConversationCache | ||||
| 
 | ||||
| 	DelConversationNotNotifyMessageUserIDs(userIDs ...string) ConversationCache | ||||
| 	DelConversationVersionUserIDs(userIDs ...string) ConversationCache | ||||
| 
 | ||||
| 	FindMaxConversationUserVersion(ctx context.Context, userID string) (*relationtb.VersionLog, error) | ||||
|  | ||||
							
								
								
									
										20
									
								
								pkg/common/storage/cache/redis/conversation.go
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										20
									
								
								pkg/common/storage/cache/redis/conversation.go
									
									
									
									
										vendored
									
									
								
							| @ -71,6 +71,10 @@ func (c *ConversationRedisCache) getConversationIDsKey(ownerUserID string) strin | ||||
| 	return cachekey.GetConversationIDsKey(ownerUserID) | ||||
| } | ||||
| 
 | ||||
| func (c *ConversationRedisCache) getNotNotifyConversationIDsKey(ownerUserID string) string { | ||||
| 	return cachekey.GetNotNotifyConversationIDsKey(ownerUserID) | ||||
| } | ||||
| 
 | ||||
| func (c *ConversationRedisCache) getSuperGroupRecvNotNotifyUserIDsKey(groupID string) string { | ||||
| 	return cachekey.GetSuperGroupRecvNotNotifyUserIDsKey(groupID) | ||||
| } | ||||
| @ -100,11 +104,17 @@ func (c *ConversationRedisCache) getConversationUserMaxVersionKey(ownerUserID st | ||||
| } | ||||
| 
 | ||||
| func (c *ConversationRedisCache) GetUserConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) { | ||||
| 	return getCache(ctx, c.rcClient, c.getConversationIDsKey(ownerUserID), c.expireTime, func(ctx context.Context) ([]string, error) { | ||||
| 	return getCache(ctx, c.rcClient, c.getNotNotifyConversationIDsKey(ownerUserID), c.expireTime, func(ctx context.Context) ([]string, error) { | ||||
| 		return c.conversationDB.FindUserIDAllConversationID(ctx, ownerUserID) | ||||
| 	}) | ||||
| } | ||||
| 
 | ||||
| func (c *ConversationRedisCache) GetUserNotNotifyConversationIDs(ctx context.Context, userID string) ([]string, error) { | ||||
| 	return getCache(ctx, c.rcClient, c.getConversationIDsKey(userID), c.expireTime, func(ctx context.Context) ([]string, error) { | ||||
| 		return c.conversationDB.FindUserIDAllNotNotifyConversationID(ctx, userID) | ||||
| 	}) | ||||
| } | ||||
| 
 | ||||
| func (c *ConversationRedisCache) DelConversationIDs(userIDs ...string) cache.ConversationCache { | ||||
| 	keys := make([]string, 0, len(userIDs)) | ||||
| 	for _, userID := range userIDs { | ||||
| @ -242,6 +252,14 @@ func (c *ConversationRedisCache) DelConversationNotReceiveMessageUserIDs(convers | ||||
| 	return cache | ||||
| } | ||||
| 
 | ||||
| func (c *ConversationRedisCache) DelConversationNotNotifyMessageUserIDs(userIDs ...string) cache.ConversationCache { | ||||
| 	cache := c.CloneConversationCache() | ||||
| 	for _, userID := range userIDs { | ||||
| 		cache.AddKeys(c.getNotNotifyConversationIDsKey(userID)) | ||||
| 	} | ||||
| 	return cache | ||||
| } | ||||
| 
 | ||||
| func (c *ConversationRedisCache) DelConversationVersionUserIDs(userIDs ...string) cache.ConversationCache { | ||||
| 	cache := c.CloneConversationCache() | ||||
| 	for _, userID := range userIDs { | ||||
|  | ||||
| @ -69,6 +69,8 @@ type ConversationDatabase interface { | ||||
| 	FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*relationtb.VersionLog, error) | ||||
| 	FindMaxConversationUserVersionCache(ctx context.Context, userID string) (*relationtb.VersionLog, error) | ||||
| 	GetOwnerConversation(ctx context.Context, ownerUserID string, pagination pagination.Pagination) (int64, []*relationtb.Conversation, error) | ||||
| 	// GetNotNotifyConversationIDs gets not notify conversationIDs by userID | ||||
| 	GetNotNotifyConversationIDs(ctx context.Context, userID string) ([]string, error) | ||||
| } | ||||
| 
 | ||||
| func NewConversationDatabase(conversation database.Conversation, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase { | ||||
| @ -108,6 +110,7 @@ func (c *conversationDatabase) SetUsersConversationFieldTx(ctx context.Context, | ||||
| 			} | ||||
| 			if _, ok := fieldMap["recv_msg_opt"]; ok { | ||||
| 				cache = cache.DelConversationNotReceiveMessageUserIDs(conversation.ConversationID) | ||||
| 				cache = cache.DelConversationNotNotifyMessageUserIDs(userIDs...) | ||||
| 			} | ||||
| 			cache = cache.DelConversationVersionUserIDs(haveUserIDs...) | ||||
| 		} | ||||
| @ -144,6 +147,7 @@ func (c *conversationDatabase) UpdateUsersConversationField(ctx context.Context, | ||||
| 	cache = cache.DelUsersConversation(conversationID, userIDs...).DelConversationVersionUserIDs(userIDs...) | ||||
| 	if _, ok := args["recv_msg_opt"]; ok { | ||||
| 		cache = cache.DelConversationNotReceiveMessageUserIDs(conversationID) | ||||
| 		cache = cache.DelConversationNotNotifyMessageUserIDs(userIDs...) | ||||
| 	} | ||||
| 	return cache.ChainExecDel(ctx) | ||||
| } | ||||
| @ -152,14 +156,22 @@ func (c *conversationDatabase) CreateConversation(ctx context.Context, conversat | ||||
| 	if err := c.conversationDB.Create(ctx, conversations); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	var userIDs []string | ||||
| 	var ( | ||||
| 		userIDs          []string | ||||
| 		notNotifyUserIDs []string | ||||
| 	) | ||||
| 
 | ||||
| 	cache := c.cache.CloneConversationCache() | ||||
| 	for _, conversation := range conversations { | ||||
| 		cache = cache.DelConversations(conversation.OwnerUserID, conversation.ConversationID) | ||||
| 		cache = cache.DelConversationNotReceiveMessageUserIDs(conversation.ConversationID) | ||||
| 		userIDs = append(userIDs, conversation.OwnerUserID) | ||||
| 		if conversation.RecvMsgOpt == constant.ReceiveNotNotifyMessage { | ||||
| 			notNotifyUserIDs = append(notNotifyUserIDs, conversation.OwnerUserID) | ||||
| 		} | ||||
| 	} | ||||
| 	return cache.DelConversationIDs(userIDs...).DelUserConversationIDsHash(userIDs...).DelConversationVersionUserIDs(userIDs...).ChainExecDel(ctx) | ||||
| 	return cache.DelConversationIDs(userIDs...).DelUserConversationIDsHash(userIDs...).DelConversationVersionUserIDs(userIDs...). | ||||
| 		DelConversationNotNotifyMessageUserIDs(notNotifyUserIDs...).ChainExecDel(ctx) | ||||
| } | ||||
| 
 | ||||
| func (c *conversationDatabase) SyncPeerUserPrivateConversationTx(ctx context.Context, conversations []*relationtb.Conversation) error { | ||||
| @ -212,7 +224,8 @@ func (c *conversationDatabase) GetUserAllConversation(ctx context.Context, owner | ||||
| func (c *conversationDatabase) SetUserConversations(ctx context.Context, ownerUserID string, conversations []*relationtb.Conversation) error { | ||||
| 	return c.tx.Transaction(ctx, func(ctx context.Context) error { | ||||
| 		cache := c.cache.CloneConversationCache() | ||||
| 		cache = cache.DelConversationVersionUserIDs(ownerUserID) | ||||
| 		cache = cache.DelConversationVersionUserIDs(ownerUserID).DelConversationNotNotifyMessageUserIDs(ownerUserID) | ||||
| 
 | ||||
| 		groupIDs := datautil.Distinct(datautil.Filter(conversations, func(e *relationtb.Conversation) (string, bool) { | ||||
| 			return e.GroupID, e.GroupID != "" | ||||
| 		})) | ||||
| @ -353,3 +366,11 @@ func (c *conversationDatabase) GetOwnerConversation(ctx context.Context, ownerUs | ||||
| 	} | ||||
| 	return int64(len(conversationIDs)), conversations, nil | ||||
| } | ||||
| 
 | ||||
| func (c *conversationDatabase) GetNotNotifyConversationIDs(ctx context.Context, userID string) ([]string, error) { | ||||
| 	conversationIDs, err := c.cache.GetUserNotNotifyConversationIDs(ctx, userID) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	return conversationIDs, nil | ||||
| } | ||||
|  | ||||
| @ -27,6 +27,7 @@ type Conversation interface { | ||||
| 	Find(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []*model.Conversation, err error) | ||||
| 	FindUserID(ctx context.Context, userIDs []string, conversationIDs []string) ([]string, error) | ||||
| 	FindUserIDAllConversationID(ctx context.Context, userID string) ([]string, error) | ||||
| 	FindUserIDAllNotNotifyConversationID(ctx context.Context, userID string) ([]string, error) | ||||
| 	Take(ctx context.Context, userID, conversationID string) (conversation *model.Conversation, err error) | ||||
| 	FindConversationID(ctx context.Context, userID string, conversationIDs []string) (existConversationID []string, err error) | ||||
| 	FindUserIDAllConversations(ctx context.Context, userID string) (conversations []*model.Conversation, err error) | ||||
|  | ||||
| @ -124,6 +124,13 @@ func (c *ConversationMgo) FindUserIDAllConversationID(ctx context.Context, userI | ||||
| 	return mongoutil.Find[string](ctx, c.coll, bson.M{"owner_user_id": userID}, options.Find().SetProjection(bson.M{"_id": 0, "conversation_id": 1})) | ||||
| } | ||||
| 
 | ||||
| func (c *ConversationMgo) FindUserIDAllNotNotifyConversationID(ctx context.Context, userID string) ([]string, error) { | ||||
| 	return mongoutil.Find[string](ctx, c.coll, bson.M{ | ||||
| 		"owner_user_id": userID, | ||||
| 		"recv_msg_opt":  constant.ReceiveNotNotifyMessage, | ||||
| 	}, options.Find().SetProjection(bson.M{"_id": 0, "conversation_id": 1})) | ||||
| } | ||||
| 
 | ||||
| func (c *ConversationMgo) Take(ctx context.Context, userID, conversationID string) (conversation *model.Conversation, err error) { | ||||
| 	return mongoutil.FindOne[*model.Conversation](ctx, c.coll, bson.M{"owner_user_id": userID, "conversation_id": conversationID}) | ||||
| } | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user