From 23cfc8df4863a54543e2e0a6365a78fd8ba88c7b Mon Sep 17 00:00:00 2001 From: OpenIM-Gordon <1432970085@qq.com> Date: Fri, 21 Mar 2025 15:10:31 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20add=20a=20function=20for=20business=20i?= =?UTF-8?q?nfo=20change=20to=20update=20related=20conve=E2=80=A6=20(#3225)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: add a function for business info change to update related conversation's ex info. * feat: add a function for business info change to update related conversation's ex info. * feat: add a function for business info change to update related conversation's ex info. * feat: add a function for business info change to update related conversation's ex info. --- go.mod | 4 +- go.sum | 12 ++-- internal/api/conversation.go | 5 ++ internal/api/router.go | 4 +- internal/rpc/conversation/conversation.go | 16 +++++- internal/rpc/msg/send.go | 3 +- pkg/common/storage/controller/conversation.go | 15 +++++ pkg/common/storage/database/conversation.go | 1 + .../storage/database/mgo/conversation.go | 57 ++++++++++++++++--- 9 files changed, 98 insertions(+), 19 deletions(-) diff --git a/go.mod b/go.mod index e83a9984c..dd7fecf53 100644 --- a/go.mod +++ b/go.mod @@ -12,8 +12,8 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.73-alpha.6 - github.com/openimsdk/tools v0.0.50-alpha.83 + github.com/openimsdk/protocol v0.0.72-alpha.81 + github.com/openimsdk/tools v0.0.50-alpha.74 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.9.0 diff --git a/go.sum b/go.sum index 35fc3d4c6..ea5a6e5b7 100644 --- a/go.sum +++ b/go.sum @@ -345,12 +345,12 @@ github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To= github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= -github.com/openimsdk/gomake v0.0.15-alpha.5 h1:eEZCEHm+NsmcO3onXZPIUbGFCYPYbsX5beV3ZyOsGhY= -github.com/openimsdk/gomake v0.0.15-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.73-alpha.6 h1:sna9coWG7HN1zObBPtvG0Ki/vzqHXiB4qKbA5P3w7kc= -github.com/openimsdk/protocol v0.0.73-alpha.6/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= -github.com/openimsdk/tools v0.0.50-alpha.83 h1:7c1D40YGqIWUmGfCII5pduETGC/8c2DyS9SQ4LvoplU= -github.com/openimsdk/tools v0.0.50-alpha.83/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo= +github.com/openimsdk/gomake v0.0.15-alpha.2 h1:5Q8yl8ezy2yx+q8/ucU/t4kJnDfCzNOrkXcDACCqtyM= +github.com/openimsdk/gomake v0.0.15-alpha.2/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= +github.com/openimsdk/protocol v0.0.72-alpha.81 h1:6tDuZ3Anfi1uhX/V5mWxITqJnGQPnvgeaxeqJlEHIVE= +github.com/openimsdk/protocol v0.0.72-alpha.81/go.mod h1:WF7EuE55vQvpyUAzDXcqg+B+446xQyEba0X35lTINmw= +github.com/openimsdk/tools v0.0.50-alpha.74 h1:yh10SiMiivMEjicEQg+QAsH4pvaO+4noMPdlw+ew0Kc= +github.com/openimsdk/tools v0.0.50-alpha.74/go.mod h1:n2poR3asX1e1XZce4O+MOWAp+X02QJRFvhcLCXZdzRo= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= diff --git a/internal/api/conversation.go b/internal/api/conversation.go index f7dbc133c..f6eadf15a 100644 --- a/internal/api/conversation.go +++ b/internal/api/conversation.go @@ -16,6 +16,7 @@ package api import ( "github.com/gin-gonic/gin" + "github.com/openimsdk/protocol/conversation" "github.com/openimsdk/tools/a2r" ) @@ -71,3 +72,7 @@ func (o *ConversationApi) GetNotNotifyConversationIDs(c *gin.Context) { func (o *ConversationApi) GetPinnedConversationIDs(c *gin.Context) { a2r.Call(c, conversation.ConversationClient.GetPinnedConversationIDs, o.Client) } + +func (o *ConversationApi) UpdateConversationsByUser(c *gin.Context) { + a2r.Call(c, conversation.ConversationClient.UpdateConversationsByUser, o.Client) +} diff --git a/internal/api/router.go b/internal/api/router.go index add8ef36b..8ee7f9feb 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -9,6 +9,8 @@ import ( "github.com/gin-gonic/gin" "github.com/gin-gonic/gin/binding" "github.com/go-playground/validator/v10" + clientv3 "go.etcd.io/etcd/client/v3" + "github.com/openimsdk/open-im-server/v3/internal/api/jssdk" "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/common/config" @@ -28,7 +30,6 @@ import ( "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mw" - clientv3 "go.etcd.io/etcd/client/v3" ) const ( @@ -268,6 +269,7 @@ func newGinRouter(ctx context.Context, client discovery.Conn, cfg *Config) (*gin conversationGroup.POST("/get_owner_conversation", c.GetOwnerConversation) conversationGroup.POST("/get_not_notify_conversation_ids", c.GetNotNotifyConversationIDs) conversationGroup.POST("/get_pinned_conversation_ids", c.GetPinnedConversationIDs) + conversationGroup.POST("/update_conversations_by_user", c.UpdateConversationsByUser) } { diff --git a/internal/rpc/conversation/conversation.go b/internal/rpc/conversation/conversation.go index cd7839234..b95db9366 100644 --- a/internal/rpc/conversation/conversation.go +++ b/internal/rpc/conversation/conversation.go @@ -22,6 +22,8 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/dbbuild" "github.com/openimsdk/open-im-server/v3/pkg/rpcli" + "google.golang.org/grpc" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/convert" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" @@ -39,7 +41,6 @@ import ( "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/utils/datautil" - "google.golang.org/grpc" ) type conversationServer struct { @@ -396,6 +397,19 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver return &pbconversation.SetConversationsResp{}, nil } +func (c *conversationServer) UpdateConversationsByUser(ctx context.Context, req *pbconversation.UpdateConversationsByUserReq) (*pbconversation.UpdateConversationsByUserResp, error) { + m := make(map[string]any) + if req.Ex != nil { + m["ex"] = req.Ex.Value + } + if len(m) > 0 { + if err := c.conversationDatabase.UpdateUserConversations(ctx, req.UserID, m); err != nil { + return nil, err + } + } + return &pbconversation.UpdateConversationsByUserResp{}, nil +} + // Get user IDs with "Do Not Disturb" enabled in super large groups. func (c *conversationServer) GetRecvMsgNotNotifyUserIDs(ctx context.Context, req *pbconversation.GetRecvMsgNotNotifyUserIDsReq) (*pbconversation.GetRecvMsgNotNotifyUserIDsResp, error) { return nil, errs.New("deprecated") diff --git a/internal/rpc/msg/send.go b/internal/rpc/msg/send.go index 2b66f7a9a..54153ec5d 100644 --- a/internal/rpc/msg/send.go +++ b/internal/rpc/msg/send.go @@ -17,6 +17,8 @@ package msg import ( "context" + "google.golang.org/protobuf/proto" + "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/open-im-server/v3/pkg/util/conversationutil" @@ -29,7 +31,6 @@ import ( "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/utils/datautil" - "google.golang.org/protobuf/proto" ) func (m *msgServer) SendMsg(ctx context.Context, req *pbmsg.SendMsgReq) (*pbmsg.SendMsgResp, error) { diff --git a/pkg/common/storage/controller/conversation.go b/pkg/common/storage/controller/conversation.go index d4088e0c0..192af1cc9 100644 --- a/pkg/common/storage/controller/conversation.go +++ b/pkg/common/storage/controller/conversation.go @@ -47,6 +47,9 @@ type ConversationDatabase interface { // SetUsersConversationFieldTx updates a specific field for multiple users' conversations, creating new conversations if they do not exist, or updates them otherwise. This operation is // transactional. SetUsersConversationFieldTx(ctx context.Context, userIDs []string, conversation *relationtb.Conversation, fieldMap map[string]any) error + // UpdateUserConversations updates all conversations related to a specified user. + // This function does NOT update the user's own conversations but rather the conversations where this user is involved (e.g., other users' conversations referencing this user). + UpdateUserConversations(ctx context.Context, userID string, args map[string]any) error // CreateGroupChatConversation creates a group chat conversation for the specified group ID and user IDs. CreateGroupChatConversation(ctx context.Context, groupID string, userIDs []string) error // GetConversationIDs retrieves conversation IDs for a given user. @@ -146,6 +149,18 @@ func (c *conversationDatabase) SetUsersConversationFieldTx(ctx context.Context, }) } +func (c *conversationDatabase) UpdateUserConversations(ctx context.Context, userID string, args map[string]any) error { + conversations, err := c.conversationDB.UpdateUserConversations(ctx, userID, args) + if err != nil { + return err + } + cache := c.cache.CloneConversationCache() + for _, conversation := range conversations { + cache = cache.DelUsersConversation(conversation.ConversationID, conversation.OwnerUserID).DelConversationVersionUserIDs(conversation.OwnerUserID) + } + return cache.ChainExecDel(ctx) +} + func (c *conversationDatabase) UpdateUsersConversationField(ctx context.Context, userIDs []string, conversationID string, args map[string]any) error { _, err := c.conversationDB.UpdateByMap(ctx, userIDs, conversationID, args) if err != nil { diff --git a/pkg/common/storage/database/conversation.go b/pkg/common/storage/database/conversation.go index 1fb53cfed..d612dfc2d 100644 --- a/pkg/common/storage/database/conversation.go +++ b/pkg/common/storage/database/conversation.go @@ -24,6 +24,7 @@ import ( type Conversation interface { Create(ctx context.Context, conversations []*model.Conversation) (err error) UpdateByMap(ctx context.Context, userIDs []string, conversationID string, args map[string]any) (rows int64, err error) + UpdateUserConversations(ctx context.Context, userID string, args map[string]any) ([]*model.Conversation, error) Update(ctx context.Context, conversation *model.Conversation) (err error) Find(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []*model.Conversation, err error) FindUserID(ctx context.Context, userIDs []string, conversationIDs []string) ([]string, error) diff --git a/pkg/common/storage/database/mgo/conversation.go b/pkg/common/storage/database/mgo/conversation.go index 536827450..89f13ea3d 100644 --- a/pkg/common/storage/database/mgo/conversation.go +++ b/pkg/common/storage/database/mgo/conversation.go @@ -21,23 +21,32 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "github.com/openimsdk/protocol/constant" "github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/db/pagination" "github.com/openimsdk/tools/errs" - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" ) func NewConversationMongo(db *mongo.Database) (*ConversationMgo, error) { coll := db.Collection(database.ConversationName) - _, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{ - Keys: bson.D{ - {Key: "owner_user_id", Value: 1}, - {Key: "conversation_id", Value: 1}, + _, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{ + { + Keys: bson.D{ + {Key: "owner_user_id", Value: 1}, + {Key: "conversation_id", Value: 1}, + }, + Options: options.Index().SetUnique(true), + }, + { + Keys: bson.D{ + {Key: "user_id", Value: 1}, + }, + Options: options.Index(), }, - Options: options.Index().SetUnique(true), }) if err != nil { return nil, errs.Wrap(err) @@ -101,6 +110,38 @@ func (c *ConversationMgo) UpdateByMap(ctx context.Context, userIDs []string, con return rows, nil } +func (c *ConversationMgo) UpdateUserConversations(ctx context.Context, userID string, args map[string]any) ([]*model.Conversation, error) { + if len(args) == 0 { + return nil, nil + } + filter := bson.M{ + "user_id": userID, + } + + conversations, err := mongoutil.Find[*model.Conversation](ctx, c.coll, filter, options.Find().SetProjection(bson.M{"_id": 0, "owner_user_id": 1, "conversation_id": 1})) + if err != nil { + return nil, err + } + err = mongoutil.IncrVersion(func() error { + _, err := mongoutil.UpdateMany(ctx, c.coll, filter, bson.M{"$set": args}) + if err != nil { + return err + } + return nil + }, func() error { + for _, conversation := range conversations { + if err := c.version.IncrVersion(ctx, conversation.OwnerUserID, []string{conversation.ConversationID}, model.VersionStateUpdate); err != nil { + return err + } + } + return nil + }) + if err != nil { + return nil, err + } + return conversations, nil +} + func (c *ConversationMgo) Update(ctx context.Context, conversation *model.Conversation) (err error) { return mongoutil.IncrVersion(func() error { return mongoutil.UpdateOne(ctx, c.coll, bson.M{"owner_user_id": conversation.OwnerUserID, "conversation_id": conversation.ConversationID}, bson.M{"$set": conversation}, true)