mirror of
				https://github.com/openimsdk/open-im-server.git
				synced 2025-10-26 05:02:11 +08:00 
			
		
		
		
	feat: add a function for business info change to update related conversation's ex info.
This commit is contained in:
		
							parent
							
								
									e76e02fdba
								
							
						
					
					
						commit
						d2008e3e5c
					
				
							
								
								
									
										2
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								go.mod
									
									
									
									
									
								
							| @ -12,7 +12,7 @@ require ( | ||||
| 	github.com/gorilla/websocket v1.5.1 | ||||
| 	github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 | ||||
| 	github.com/mitchellh/mapstructure v1.5.0 | ||||
| 	github.com/openimsdk/protocol v0.0.72-alpha.79 | ||||
| 	github.com/openimsdk/protocol v0.0.72-alpha.80 | ||||
| 	github.com/openimsdk/tools v0.0.50-alpha.74 | ||||
| 	github.com/pkg/errors v0.9.1 // indirect | ||||
| 	github.com/prometheus/client_golang v1.18.0 | ||||
|  | ||||
							
								
								
									
										4
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										4
									
								
								go.sum
									
									
									
									
									
								
							| @ -347,8 +347,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= | ||||
| github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= | ||||
| github.com/openimsdk/gomake v0.0.15-alpha.2 h1:5Q8yl8ezy2yx+q8/ucU/t4kJnDfCzNOrkXcDACCqtyM= | ||||
| github.com/openimsdk/gomake v0.0.15-alpha.2/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= | ||||
| github.com/openimsdk/protocol v0.0.72-alpha.79 h1:e46no8WVAsmTzyy405klrdoUiG7u+1ohDsXvQuFng4s= | ||||
| github.com/openimsdk/protocol v0.0.72-alpha.79/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= | ||||
| github.com/openimsdk/protocol v0.0.72-alpha.80 h1:30v74YZVF012YKAKTIy7HfOk/W5mILm+HpJtxwPPCPM= | ||||
| github.com/openimsdk/protocol v0.0.72-alpha.80/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= | ||||
| github.com/openimsdk/tools v0.0.50-alpha.74 h1:yh10SiMiivMEjicEQg+QAsH4pvaO+4noMPdlw+ew0Kc= | ||||
| github.com/openimsdk/tools v0.0.50-alpha.74/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo= | ||||
| github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= | ||||
|  | ||||
| @ -16,6 +16,7 @@ package api | ||||
| 
 | ||||
| import ( | ||||
| 	"github.com/gin-gonic/gin" | ||||
| 
 | ||||
| 	"github.com/openimsdk/protocol/conversation" | ||||
| 	"github.com/openimsdk/tools/a2r" | ||||
| ) | ||||
| @ -71,3 +72,7 @@ func (o *ConversationApi) GetNotNotifyConversationIDs(c *gin.Context) { | ||||
| func (o *ConversationApi) GetPinnedConversationIDs(c *gin.Context) { | ||||
| 	a2r.Call(c, conversation.ConversationClient.GetPinnedConversationIDs, o.Client) | ||||
| } | ||||
| 
 | ||||
| func (o *ConversationApi) UpdateConversationsByUser(c *gin.Context) { | ||||
| 	a2r.Call(conversation.ConversationClient.UpdateConversationsByUser, o.Client, c) | ||||
| } | ||||
|  | ||||
| @ -9,6 +9,8 @@ import ( | ||||
| 	"github.com/gin-gonic/gin" | ||||
| 	"github.com/gin-gonic/gin/binding" | ||||
| 	"github.com/go-playground/validator/v10" | ||||
| 	clientv3 "go.etcd.io/etcd/client/v3" | ||||
| 
 | ||||
| 	"github.com/openimsdk/open-im-server/v3/internal/api/jssdk" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/config" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" | ||||
| @ -27,7 +29,6 @@ import ( | ||||
| 	"github.com/openimsdk/tools/discovery/etcd" | ||||
| 	"github.com/openimsdk/tools/log" | ||||
| 	"github.com/openimsdk/tools/mw" | ||||
| 	clientv3 "go.etcd.io/etcd/client/v3" | ||||
| ) | ||||
| 
 | ||||
| const ( | ||||
| @ -264,6 +265,7 @@ func newGinRouter(ctx context.Context, client discovery.Conn, cfg *Config) (*gin | ||||
| 		conversationGroup.POST("/get_owner_conversation", c.GetOwnerConversation) | ||||
| 		conversationGroup.POST("/get_not_notify_conversation_ids", c.GetNotNotifyConversationIDs) | ||||
| 		conversationGroup.POST("/get_pinned_conversation_ids", c.GetPinnedConversationIDs) | ||||
| 		conversationGroup.POST("/update_conversations_by_user", c.UpdateConversationsByUser) | ||||
| 	} | ||||
| 
 | ||||
| 	{ | ||||
|  | ||||
| @ -22,6 +22,8 @@ import ( | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/dbbuild" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/rpcli" | ||||
| 
 | ||||
| 	"google.golang.org/grpc" | ||||
| 
 | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/config" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/convert" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" | ||||
| @ -39,7 +41,6 @@ import ( | ||||
| 	"github.com/openimsdk/tools/errs" | ||||
| 	"github.com/openimsdk/tools/log" | ||||
| 	"github.com/openimsdk/tools/utils/datautil" | ||||
| 	"google.golang.org/grpc" | ||||
| ) | ||||
| 
 | ||||
| type conversationServer struct { | ||||
| @ -317,6 +318,19 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver | ||||
| 	return &pbconversation.SetConversationsResp{}, nil | ||||
| } | ||||
| 
 | ||||
| func (c *conversationServer) UpdateConversationsByUser(ctx context.Context, req *pbconversation.UpdateConversationsByUserReq) (*pbconversation.UpdateConversationsByUserResp, error) { | ||||
| 	m := make(map[string]any) | ||||
| 	if req.Ex != nil { | ||||
| 		m["ex"] = req.Ex.Value | ||||
| 	} | ||||
| 	if len(m) > 0 { | ||||
| 		if err := c.conversationDatabase.UpdateUserConversations(ctx, req.UserID, m); err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 	} | ||||
| 	return &pbconversation.UpdateConversationsByUserResp{}, nil | ||||
| } | ||||
| 
 | ||||
| // Get user IDs with "Do Not Disturb" enabled in super large groups. | ||||
| func (c *conversationServer) GetRecvMsgNotNotifyUserIDs(ctx context.Context, req *pbconversation.GetRecvMsgNotNotifyUserIDsReq) (*pbconversation.GetRecvMsgNotNotifyUserIDsResp, error) { | ||||
| 	return nil, errs.New("deprecated") | ||||
|  | ||||
| @ -47,6 +47,9 @@ type ConversationDatabase interface { | ||||
| 	// SetUsersConversationFieldTx updates a specific field for multiple users' conversations, creating new conversations if they do not exist, or updates them otherwise. This operation is | ||||
| 	// transactional. | ||||
| 	SetUsersConversationFieldTx(ctx context.Context, userIDs []string, conversation *relationtb.Conversation, fieldMap map[string]any) error | ||||
| 	// UpdateUserConversations updates all conversations related to a specified user. | ||||
| 	// This function does NOT update the user's own conversations but rather the conversations where this user is involved (e.g., other users' conversations referencing this user). | ||||
| 	UpdateUserConversations(ctx context.Context, userID string, args map[string]any) error | ||||
| 	// CreateGroupChatConversation creates a group chat conversation for the specified group ID and user IDs. | ||||
| 	CreateGroupChatConversation(ctx context.Context, groupID string, userIDs []string) error | ||||
| 	// GetConversationIDs retrieves conversation IDs for a given user. | ||||
| @ -146,6 +149,18 @@ func (c *conversationDatabase) SetUsersConversationFieldTx(ctx context.Context, | ||||
| 	}) | ||||
| } | ||||
| 
 | ||||
| func (c *conversationDatabase) UpdateUserConversations(ctx context.Context, userID string, args map[string]any) error { | ||||
| 	conversations, err := c.conversationDB.UpdateUserConversations(ctx, userID, args) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	cache := c.cache.CloneConversationCache() | ||||
| 	for _, conversation := range conversations { | ||||
| 		cache = cache.DelUsersConversation(conversation.ConversationID, conversation.OwnerUserID).DelConversationVersionUserIDs(conversation.OwnerUserID) | ||||
| 	} | ||||
| 	return cache.ChainExecDel(ctx) | ||||
| } | ||||
| 
 | ||||
| func (c *conversationDatabase) UpdateUsersConversationField(ctx context.Context, userIDs []string, conversationID string, args map[string]any) error { | ||||
| 	_, err := c.conversationDB.UpdateByMap(ctx, userIDs, conversationID, args) | ||||
| 	if err != nil { | ||||
|  | ||||
| @ -24,6 +24,7 @@ import ( | ||||
| type Conversation interface { | ||||
| 	Create(ctx context.Context, conversations []*model.Conversation) (err error) | ||||
| 	UpdateByMap(ctx context.Context, userIDs []string, conversationID string, args map[string]any) (rows int64, err error) | ||||
| 	UpdateUserConversations(ctx context.Context, userID string, args map[string]any) ([]*model.Conversation, error) | ||||
| 	Update(ctx context.Context, conversation *model.Conversation) (err error) | ||||
| 	Find(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []*model.Conversation, err error) | ||||
| 	FindUserID(ctx context.Context, userIDs []string, conversationIDs []string) ([]string, error) | ||||
|  | ||||
| @ -21,23 +21,32 @@ import ( | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" | ||||
| 	"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" | ||||
| 
 | ||||
| 	"go.mongodb.org/mongo-driver/bson" | ||||
| 	"go.mongodb.org/mongo-driver/mongo" | ||||
| 	"go.mongodb.org/mongo-driver/mongo/options" | ||||
| 
 | ||||
| 	"github.com/openimsdk/protocol/constant" | ||||
| 	"github.com/openimsdk/tools/db/mongoutil" | ||||
| 	"github.com/openimsdk/tools/db/pagination" | ||||
| 	"github.com/openimsdk/tools/errs" | ||||
| 	"go.mongodb.org/mongo-driver/bson" | ||||
| 	"go.mongodb.org/mongo-driver/mongo" | ||||
| 	"go.mongodb.org/mongo-driver/mongo/options" | ||||
| ) | ||||
| 
 | ||||
| func NewConversationMongo(db *mongo.Database) (*ConversationMgo, error) { | ||||
| 	coll := db.Collection(database.ConversationName) | ||||
| 	_, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{ | ||||
| 		Keys: bson.D{ | ||||
| 			{Key: "owner_user_id", Value: 1}, | ||||
| 			{Key: "conversation_id", Value: 1}, | ||||
| 	_, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{ | ||||
| 		{ | ||||
| 			Keys: bson.D{ | ||||
| 				{Key: "owner_user_id", Value: 1}, | ||||
| 				{Key: "conversation_id", Value: 1}, | ||||
| 			}, | ||||
| 			Options: options.Index().SetUnique(true), | ||||
| 		}, | ||||
| 		{ | ||||
| 			Keys: bson.D{ | ||||
| 				{Key: "user_id", Value: 1}, | ||||
| 			}, | ||||
| 			Options: options.Index(), | ||||
| 		}, | ||||
| 		Options: options.Index().SetUnique(true), | ||||
| 	}) | ||||
| 	if err != nil { | ||||
| 		return nil, errs.Wrap(err) | ||||
| @ -101,6 +110,38 @@ func (c *ConversationMgo) UpdateByMap(ctx context.Context, userIDs []string, con | ||||
| 	return rows, nil | ||||
| } | ||||
| 
 | ||||
| func (c *ConversationMgo) UpdateUserConversations(ctx context.Context, userID string, args map[string]any) ([]*model.Conversation, error) { | ||||
| 	if len(args) == 0 { | ||||
| 		return nil, nil | ||||
| 	} | ||||
| 	filter := bson.M{ | ||||
| 		"user_id": userID, | ||||
| 	} | ||||
| 
 | ||||
| 	conversations, err := mongoutil.Find[*model.Conversation](ctx, c.coll, filter, options.Find().SetProjection(bson.M{"_id": 0, "owner_user_id": 1, "conversation_id": 1})) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	err = mongoutil.IncrVersion(func() error { | ||||
| 		_, err := mongoutil.UpdateMany(ctx, c.coll, filter, bson.M{"$set": args}) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 		return nil | ||||
| 	}, func() error { | ||||
| 		for _, conversation := range conversations { | ||||
| 			if err := c.version.IncrVersion(ctx, conversation.OwnerUserID, []string{conversation.ConversationID}, model.VersionStateUpdate); err != nil { | ||||
| 				return err | ||||
| 			} | ||||
| 		} | ||||
| 		return nil | ||||
| 	}) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	return conversations, nil | ||||
| } | ||||
| 
 | ||||
| func (c *ConversationMgo) Update(ctx context.Context, conversation *model.Conversation) (err error) { | ||||
| 	return mongoutil.IncrVersion(func() error { | ||||
| 		return mongoutil.UpdateOne(ctx, c.coll, bson.M{"owner_user_id": conversation.OwnerUserID, "conversation_id": conversation.ConversationID}, bson.M{"$set": conversation}, true) | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user