mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-10-26 05:02:11 +08:00 
			
		
		
		
	feat: add a function for business info change to update related conve… (#3225)
* feat: add a function for business info change to update related conversation's ex info. * feat: add a function for business info change to update related conversation's ex info. * feat: add a function for business info change to update related conversation's ex info. * feat: add a function for business info change to update related conversation's ex info. (cherry picked from commit 11044eac586ee2bec8870cc638cb0688631cfdb5)
This commit is contained in:
		
							parent
							
								
									4c16294f67
								
							
						
					
					
						commit
						388a945cc5
					
				| @ -16,6 +16,7 @@ package api | |||||||
| 
 | 
 | ||||||
| import ( | import ( | ||||||
| 	"github.com/gin-gonic/gin" | 	"github.com/gin-gonic/gin" | ||||||
|  | 
 | ||||||
| 	"github.com/openimsdk/protocol/conversation" | 	"github.com/openimsdk/protocol/conversation" | ||||||
| 	"github.com/openimsdk/tools/a2r" | 	"github.com/openimsdk/tools/a2r" | ||||||
| ) | ) | ||||||
| @ -71,3 +72,7 @@ func (o *ConversationApi) GetNotNotifyConversationIDs(c *gin.Context) { | |||||||
| func (o *ConversationApi) GetPinnedConversationIDs(c *gin.Context) { | func (o *ConversationApi) GetPinnedConversationIDs(c *gin.Context) { | ||||||
| 	a2r.Call(c, conversation.ConversationClient.GetPinnedConversationIDs, o.Client) | 	a2r.Call(c, conversation.ConversationClient.GetPinnedConversationIDs, o.Client) | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | func (o *ConversationApi) UpdateConversationsByUser(c *gin.Context) { | ||||||
|  | 	a2r.Call(c, conversation.ConversationClient.UpdateConversationsByUser, o.Client) | ||||||
|  | } | ||||||
|  | |||||||
| @ -9,6 +9,8 @@ import ( | |||||||
| 	"github.com/gin-gonic/gin" | 	"github.com/gin-gonic/gin" | ||||||
| 	"github.com/gin-gonic/gin/binding" | 	"github.com/gin-gonic/gin/binding" | ||||||
| 	"github.com/go-playground/validator/v10" | 	"github.com/go-playground/validator/v10" | ||||||
|  | 	clientv3 "go.etcd.io/etcd/client/v3" | ||||||
|  | 
 | ||||||
| 	"github.com/openimsdk/open-im-server/v3/internal/api/jssdk" | 	"github.com/openimsdk/open-im-server/v3/internal/api/jssdk" | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/authverify" | 	"github.com/openimsdk/open-im-server/v3/pkg/authverify" | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/config" | 	"github.com/openimsdk/open-im-server/v3/pkg/common/config" | ||||||
| @ -28,7 +30,6 @@ import ( | |||||||
| 	"github.com/openimsdk/tools/discovery/etcd" | 	"github.com/openimsdk/tools/discovery/etcd" | ||||||
| 	"github.com/openimsdk/tools/log" | 	"github.com/openimsdk/tools/log" | ||||||
| 	"github.com/openimsdk/tools/mw" | 	"github.com/openimsdk/tools/mw" | ||||||
| 	clientv3 "go.etcd.io/etcd/client/v3" |  | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| const ( | const ( | ||||||
| @ -268,6 +269,7 @@ func newGinRouter(ctx context.Context, client discovery.Conn, cfg *Config) (*gin | |||||||
| 		conversationGroup.POST("/get_owner_conversation", c.GetOwnerConversation) | 		conversationGroup.POST("/get_owner_conversation", c.GetOwnerConversation) | ||||||
| 		conversationGroup.POST("/get_not_notify_conversation_ids", c.GetNotNotifyConversationIDs) | 		conversationGroup.POST("/get_not_notify_conversation_ids", c.GetNotNotifyConversationIDs) | ||||||
| 		conversationGroup.POST("/get_pinned_conversation_ids", c.GetPinnedConversationIDs) | 		conversationGroup.POST("/get_pinned_conversation_ids", c.GetPinnedConversationIDs) | ||||||
|  | 		conversationGroup.POST("/update_conversations_by_user", c.UpdateConversationsByUser) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	{ | 	{ | ||||||
|  | |||||||
| @ -46,6 +46,9 @@ type ConversationDatabase interface { | |||||||
| 	// SetUsersConversationFieldTx updates a specific field for multiple users' conversations, creating new conversations if they do not exist, or updates them otherwise. This operation is | 	// SetUsersConversationFieldTx updates a specific field for multiple users' conversations, creating new conversations if they do not exist, or updates them otherwise. This operation is | ||||||
| 	// transactional. | 	// transactional. | ||||||
| 	SetUsersConversationFieldTx(ctx context.Context, userIDs []string, conversation *relationtb.Conversation, fieldMap map[string]any) error | 	SetUsersConversationFieldTx(ctx context.Context, userIDs []string, conversation *relationtb.Conversation, fieldMap map[string]any) error | ||||||
|  | 	// UpdateUserConversations updates all conversations related to a specified user. | ||||||
|  | 	// This function does NOT update the user's own conversations but rather the conversations where this user is involved (e.g., other users' conversations referencing this user). | ||||||
|  | 	UpdateUserConversations(ctx context.Context, userID string, args map[string]any) error | ||||||
| 	// CreateGroupChatConversation creates a group chat conversation for the specified group ID and user IDs. | 	// CreateGroupChatConversation creates a group chat conversation for the specified group ID and user IDs. | ||||||
| 	CreateGroupChatConversation(ctx context.Context, groupID string, userIDs []string, conversations *relationtb.Conversation) error | 	CreateGroupChatConversation(ctx context.Context, groupID string, userIDs []string, conversations *relationtb.Conversation) error | ||||||
| 	// GetConversationIDs retrieves conversation IDs for a given user. | 	// GetConversationIDs retrieves conversation IDs for a given user. | ||||||
| @ -145,6 +148,18 @@ func (c *conversationDatabase) SetUsersConversationFieldTx(ctx context.Context, | |||||||
| 	}) | 	}) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | func (c *conversationDatabase) UpdateUserConversations(ctx context.Context, userID string, args map[string]any) error { | ||||||
|  | 	conversations, err := c.conversationDB.UpdateUserConversations(ctx, userID, args) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 	cache := c.cache.CloneConversationCache() | ||||||
|  | 	for _, conversation := range conversations { | ||||||
|  | 		cache = cache.DelUsersConversation(conversation.ConversationID, conversation.OwnerUserID).DelConversationVersionUserIDs(conversation.OwnerUserID) | ||||||
|  | 	} | ||||||
|  | 	return cache.ChainExecDel(ctx) | ||||||
|  | } | ||||||
|  | 
 | ||||||
| func (c *conversationDatabase) UpdateUsersConversationField(ctx context.Context, userIDs []string, conversationID string, args map[string]any) error { | func (c *conversationDatabase) UpdateUsersConversationField(ctx context.Context, userIDs []string, conversationID string, args map[string]any) error { | ||||||
| 	_, err := c.conversationDB.UpdateByMap(ctx, userIDs, conversationID, args) | 	_, err := c.conversationDB.UpdateByMap(ctx, userIDs, conversationID, args) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
|  | |||||||
| @ -24,6 +24,7 @@ import ( | |||||||
| type Conversation interface { | type Conversation interface { | ||||||
| 	Create(ctx context.Context, conversations []*model.Conversation) (err error) | 	Create(ctx context.Context, conversations []*model.Conversation) (err error) | ||||||
| 	UpdateByMap(ctx context.Context, userIDs []string, conversationID string, args map[string]any) (rows int64, err error) | 	UpdateByMap(ctx context.Context, userIDs []string, conversationID string, args map[string]any) (rows int64, err error) | ||||||
|  | 	UpdateUserConversations(ctx context.Context, userID string, args map[string]any) ([]*model.Conversation, error) | ||||||
| 	Update(ctx context.Context, conversation *model.Conversation) (err error) | 	Update(ctx context.Context, conversation *model.Conversation) (err error) | ||||||
| 	Find(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []*model.Conversation, err error) | 	Find(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []*model.Conversation, err error) | ||||||
| 	FindUserID(ctx context.Context, userIDs []string, conversationIDs []string) ([]string, error) | 	FindUserID(ctx context.Context, userIDs []string, conversationIDs []string) ([]string, error) | ||||||
|  | |||||||
| @ -21,23 +21,32 @@ import ( | |||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" | 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" | ||||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" | 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" | ||||||
| 
 | 
 | ||||||
|  | 	"go.mongodb.org/mongo-driver/bson" | ||||||
|  | 	"go.mongodb.org/mongo-driver/mongo" | ||||||
|  | 	"go.mongodb.org/mongo-driver/mongo/options" | ||||||
|  | 
 | ||||||
| 	"github.com/openimsdk/protocol/constant" | 	"github.com/openimsdk/protocol/constant" | ||||||
| 	"github.com/openimsdk/tools/db/mongoutil" | 	"github.com/openimsdk/tools/db/mongoutil" | ||||||
| 	"github.com/openimsdk/tools/db/pagination" | 	"github.com/openimsdk/tools/db/pagination" | ||||||
| 	"github.com/openimsdk/tools/errs" | 	"github.com/openimsdk/tools/errs" | ||||||
| 	"go.mongodb.org/mongo-driver/bson" |  | ||||||
| 	"go.mongodb.org/mongo-driver/mongo" |  | ||||||
| 	"go.mongodb.org/mongo-driver/mongo/options" |  | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| func NewConversationMongo(db *mongo.Database) (*ConversationMgo, error) { | func NewConversationMongo(db *mongo.Database) (*ConversationMgo, error) { | ||||||
| 	coll := db.Collection(database.ConversationName) | 	coll := db.Collection(database.ConversationName) | ||||||
| 	_, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{ | 	_, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{ | ||||||
| 		Keys: bson.D{ | 		{ | ||||||
| 			{Key: "owner_user_id", Value: 1}, | 			Keys: bson.D{ | ||||||
| 			{Key: "conversation_id", Value: 1}, | 				{Key: "owner_user_id", Value: 1}, | ||||||
|  | 				{Key: "conversation_id", Value: 1}, | ||||||
|  | 			}, | ||||||
|  | 			Options: options.Index().SetUnique(true), | ||||||
|  | 		}, | ||||||
|  | 		{ | ||||||
|  | 			Keys: bson.D{ | ||||||
|  | 				{Key: "user_id", Value: 1}, | ||||||
|  | 			}, | ||||||
|  | 			Options: options.Index(), | ||||||
| 		}, | 		}, | ||||||
| 		Options: options.Index().SetUnique(true), |  | ||||||
| 	}) | 	}) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, errs.Wrap(err) | 		return nil, errs.Wrap(err) | ||||||
| @ -101,6 +110,38 @@ func (c *ConversationMgo) UpdateByMap(ctx context.Context, userIDs []string, con | |||||||
| 	return rows, nil | 	return rows, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | func (c *ConversationMgo) UpdateUserConversations(ctx context.Context, userID string, args map[string]any) ([]*model.Conversation, error) { | ||||||
|  | 	if len(args) == 0 { | ||||||
|  | 		return nil, nil | ||||||
|  | 	} | ||||||
|  | 	filter := bson.M{ | ||||||
|  | 		"user_id": userID, | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	conversations, err := mongoutil.Find[*model.Conversation](ctx, c.coll, filter, options.Find().SetProjection(bson.M{"_id": 0, "owner_user_id": 1, "conversation_id": 1})) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 	err = mongoutil.IncrVersion(func() error { | ||||||
|  | 		_, err := mongoutil.UpdateMany(ctx, c.coll, filter, bson.M{"$set": args}) | ||||||
|  | 		if err != nil { | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  | 		return nil | ||||||
|  | 	}, func() error { | ||||||
|  | 		for _, conversation := range conversations { | ||||||
|  | 			if err := c.version.IncrVersion(ctx, conversation.OwnerUserID, []string{conversation.ConversationID}, model.VersionStateUpdate); err != nil { | ||||||
|  | 				return err | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 		return nil | ||||||
|  | 	}) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 	return conversations, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
| func (c *ConversationMgo) Update(ctx context.Context, conversation *model.Conversation) (err error) { | func (c *ConversationMgo) Update(ctx context.Context, conversation *model.Conversation) (err error) { | ||||||
| 	return mongoutil.IncrVersion(func() error { | 	return mongoutil.IncrVersion(func() error { | ||||||
| 		return mongoutil.UpdateOne(ctx, c.coll, bson.M{"owner_user_id": conversation.OwnerUserID, "conversation_id": conversation.ConversationID}, bson.M{"$set": conversation}, true) | 		return mongoutil.UpdateOne(ctx, c.coll, bson.M{"owner_user_id": conversation.OwnerUserID, "conversation_id": conversation.ConversationID}, bson.M{"$set": conversation}, true) | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user