From 505ce8aad193b592f94e2cdbe6aeb9c4b9eb9a04 Mon Sep 17 00:00:00 2001 From: chao <48119764+withchao@users.noreply.github.com> Date: Tue, 20 May 2025 11:30:00 +0800 Subject: [PATCH 1/4] feat: add rpc interface permission check (#3366) * pb * fix: Modifying other fields while setting IsPrivateChat does not take effect * fix: quote message error revoke * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * upgrading pkg tools * fix * fix * optimize log output * feat: support GetLastMessage * feat: support GetLastMessage * feat: s3 switch * feat: s3 switch * fix: GetUsersOnline * feat: SendBusinessNotification supported configuration parameters * feat: SendBusinessNotification supported configuration parameters * feat: SendBusinessNotification supported configuration parameters * feat: seq conversion failed without exiting * fix: DeleteDoc crash * fix: fill send time * fix: fill send time * fix: crash caused by withdrawing messages from users who have left the group * fix: user msg timestamp * seq read config * seq read config * fix: the source message of the reference is withdrawn, and the referenced message is deleted * feat: optimize the default notification.yml * fix: shouldPushOffline * fix: the sorting is wrong after canceling the administrator in group settings * feat: Sending messages supports returning fields modified by webhook * feat: Sending messages supports returning fields modified by webhook * feat: Sending messages supports returning fields modified by webhook * fix: oss specifies content-type when uploading * fix: the version number contains a line break * fix: the version number contains a line break * feat: GetConversationsHasReadAndMaxSeq support pinned * feat: GetConversationsHasReadAndMaxSeq support pinned * feat: GetConversationsHasReadAndMaxSeq support pinned * fix: transferring the group owner to a muted member, incremental version error * feat: unified conversion code * feat: update gomake * fix: in standalone mode, the user online status is wrong * fix: add permission check * fix: add permission check (cherry picked from commit 748d783d36fc3f1e4eacaee64203b10e8e2b8744) # Conflicts: # internal/rpc/conversation/conversation.go # internal/rpc/group/cache.go # internal/rpc/group/statistics.go # internal/rpc/msg/send.go --- internal/api/conversation.go | 6 +- internal/api/router.go | 2 +- internal/rpc/conversation/callback.go | 117 ++++++++++ internal/rpc/conversation/conversation.go | 250 +++++++++++----------- internal/rpc/conversation/db_map.go | 85 ++++++++ internal/rpc/conversation/notification.go | 2 +- internal/rpc/conversation/sync.go | 3 + internal/rpc/group/cache.go | 12 +- internal/rpc/group/group.go | 48 ++++- internal/rpc/group/statistics.go | 12 +- internal/rpc/group/sync.go | 15 +- internal/rpc/msg/as_read.go | 15 +- internal/rpc/msg/delete.go | 3 + internal/rpc/msg/send.go | 19 +- internal/rpc/msg/seq.go | 3 +- internal/rpc/msg/statistics.go | 10 +- internal/rpc/msg/sync_msg.go | 3 + internal/rpc/relation/black.go | 3 + internal/rpc/relation/friend.go | 28 ++- internal/rpc/third/log.go | 3 +- internal/rpc/third/third.go | 4 + pkg/authverify/token.go | 49 ++++- pkg/tools/batcher/batcher.go | 4 +- 23 files changed, 516 insertions(+), 180 deletions(-) create mode 100644 internal/rpc/conversation/callback.go create mode 100644 internal/rpc/conversation/db_map.go diff --git a/internal/api/conversation.go b/internal/api/conversation.go index f7dbc133c..39d9859f0 100644 --- a/internal/api/conversation.go +++ b/internal/api/conversation.go @@ -48,9 +48,9 @@ func (o *ConversationApi) SetConversations(c *gin.Context) { a2r.Call(c, conversation.ConversationClient.SetConversations, o.Client) } -func (o *ConversationApi) GetConversationOfflinePushUserIDs(c *gin.Context) { - a2r.Call(c, conversation.ConversationClient.GetConversationOfflinePushUserIDs, o.Client) -} +//func (o *ConversationApi) GetConversationOfflinePushUserIDs(c *gin.Context) { +// a2r.Call(c, conversation.ConversationClient.GetConversationOfflinePushUserIDs, o.Client) +//} func (o *ConversationApi) GetFullOwnerConversationIDs(c *gin.Context) { a2r.Call(c, conversation.ConversationClient.GetFullOwnerConversationIDs, o.Client) diff --git a/internal/api/router.go b/internal/api/router.go index add8ef36b..700d8392e 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -262,7 +262,7 @@ func newGinRouter(ctx context.Context, client discovery.Conn, cfg *Config) (*gin conversationGroup.POST("/get_conversation", c.GetConversation) conversationGroup.POST("/get_conversations", c.GetConversations) conversationGroup.POST("/set_conversations", c.SetConversations) - conversationGroup.POST("/get_conversation_offline_push_user_ids", c.GetConversationOfflinePushUserIDs) + //conversationGroup.POST("/get_conversation_offline_push_user_ids", c.GetConversationOfflinePushUserIDs) conversationGroup.POST("/get_full_conversation_ids", c.GetFullOwnerConversationIDs) conversationGroup.POST("/get_incremental_conversations", c.GetIncrementalConversation) conversationGroup.POST("/get_owner_conversation", c.GetOwnerConversation) diff --git a/internal/rpc/conversation/callback.go b/internal/rpc/conversation/callback.go new file mode 100644 index 000000000..93e925afd --- /dev/null +++ b/internal/rpc/conversation/callback.go @@ -0,0 +1,117 @@ +package conversation + +import ( + "context" + + "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + dbModel "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" + "github.com/openimsdk/tools/utils/datautil" +) + +func (c *conversationServer) webhookBeforeCreateSingleChatConversations(ctx context.Context, before *config.BeforeConfig, req *dbModel.Conversation) error { + return webhook.WithCondition(ctx, before, func(ctx context.Context) error { + cbReq := &callbackstruct.CallbackBeforeCreateSingleChatConversationsReq{ + CallbackCommand: callbackstruct.CallbackBeforeCreateSingleChatConversationsCommand, + OwnerUserID: req.OwnerUserID, + ConversationID: req.ConversationID, + ConversationType: req.ConversationType, + UserID: req.UserID, + RecvMsgOpt: req.RecvMsgOpt, + IsPinned: req.IsPinned, + IsPrivateChat: req.IsPrivateChat, + BurnDuration: req.BurnDuration, + GroupAtType: req.GroupAtType, + AttachedInfo: req.AttachedInfo, + Ex: req.Ex, + } + + resp := &callbackstruct.CallbackBeforeCreateSingleChatConversationsResp{} + + if err := c.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil { + return err + } + + datautil.NotNilReplace(&req.RecvMsgOpt, resp.RecvMsgOpt) + datautil.NotNilReplace(&req.IsPinned, resp.IsPinned) + datautil.NotNilReplace(&req.IsPrivateChat, resp.IsPrivateChat) + datautil.NotNilReplace(&req.BurnDuration, resp.BurnDuration) + datautil.NotNilReplace(&req.GroupAtType, resp.GroupAtType) + datautil.NotNilReplace(&req.AttachedInfo, resp.AttachedInfo) + datautil.NotNilReplace(&req.Ex, resp.Ex) + return nil + }) +} + +func (c *conversationServer) webhookAfterCreateSingleChatConversations(ctx context.Context, after *config.AfterConfig, req *dbModel.Conversation) error { + cbReq := &callbackstruct.CallbackAfterCreateSingleChatConversationsReq{ + CallbackCommand: callbackstruct.CallbackAfterCreateSingleChatConversationsCommand, + OwnerUserID: req.OwnerUserID, + ConversationID: req.ConversationID, + ConversationType: req.ConversationType, + UserID: req.UserID, + RecvMsgOpt: req.RecvMsgOpt, + IsPinned: req.IsPinned, + IsPrivateChat: req.IsPrivateChat, + BurnDuration: req.BurnDuration, + GroupAtType: req.GroupAtType, + AttachedInfo: req.AttachedInfo, + Ex: req.Ex, + } + + c.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &callbackstruct.CallbackAfterCreateSingleChatConversationsResp{}, after) + return nil +} + +func (c *conversationServer) webhookBeforeCreateGroupChatConversations(ctx context.Context, before *config.BeforeConfig, req *dbModel.Conversation) error { + return webhook.WithCondition(ctx, before, func(ctx context.Context) error { + cbReq := &callbackstruct.CallbackBeforeCreateGroupChatConversationsReq{ + CallbackCommand: callbackstruct.CallbackBeforeCreateGroupChatConversationsCommand, + ConversationID: req.ConversationID, + ConversationType: req.ConversationType, + GroupID: req.GroupID, + RecvMsgOpt: req.RecvMsgOpt, + IsPinned: req.IsPinned, + IsPrivateChat: req.IsPrivateChat, + BurnDuration: req.BurnDuration, + GroupAtType: req.GroupAtType, + AttachedInfo: req.AttachedInfo, + Ex: req.Ex, + } + + resp := &callbackstruct.CallbackBeforeCreateGroupChatConversationsResp{} + + if err := c.webhookClient.SyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, resp, before); err != nil { + return err + } + + datautil.NotNilReplace(&req.RecvMsgOpt, resp.RecvMsgOpt) + datautil.NotNilReplace(&req.IsPinned, resp.IsPinned) + datautil.NotNilReplace(&req.IsPrivateChat, resp.IsPrivateChat) + datautil.NotNilReplace(&req.BurnDuration, resp.BurnDuration) + datautil.NotNilReplace(&req.GroupAtType, resp.GroupAtType) + datautil.NotNilReplace(&req.AttachedInfo, resp.AttachedInfo) + datautil.NotNilReplace(&req.Ex, resp.Ex) + return nil + }) +} + +func (c *conversationServer) webhookAfterCreateGroupChatConversations(ctx context.Context, after *config.AfterConfig, req *dbModel.Conversation) error { + cbReq := &callbackstruct.CallbackAfterCreateGroupChatConversationsReq{ + CallbackCommand: callbackstruct.CallbackAfterCreateGroupChatConversationsCommand, + ConversationID: req.ConversationID, + ConversationType: req.ConversationType, + GroupID: req.GroupID, + RecvMsgOpt: req.RecvMsgOpt, + IsPinned: req.IsPinned, + IsPrivateChat: req.IsPrivateChat, + BurnDuration: req.BurnDuration, + GroupAtType: req.GroupAtType, + AttachedInfo: req.AttachedInfo, + Ex: req.Ex, + } + + c.webhookClient.AsyncPost(ctx, cbReq.GetCallbackCommand(), cbReq, &callbackstruct.CallbackAfterCreateGroupChatConversationsResp{}, after) + return nil +} diff --git a/internal/rpc/conversation/conversation.go b/internal/rpc/conversation/conversation.go index cd7839234..b0b1053ed 100644 --- a/internal/rpc/conversation/conversation.go +++ b/internal/rpc/conversation/conversation.go @@ -19,9 +19,12 @@ import ( "sort" "time" + "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/dbbuild" "github.com/openimsdk/open-im-server/v3/pkg/rpcli" + "google.golang.org/grpc" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/convert" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" @@ -30,6 +33,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" dbModel "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" "github.com/openimsdk/open-im-server/v3/pkg/localcache" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/protocol/constant" @@ -39,7 +43,6 @@ import ( "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/utils/datautil" - "google.golang.org/grpc" ) type conversationServer struct { @@ -49,9 +52,10 @@ type conversationServer struct { conversationNotificationSender *ConversationNotificationSender config *Config - userClient *rpcli.UserClient - msgClient *rpcli.MsgClient - groupClient *rpcli.GroupClient + webhookClient *webhook.Client + userClient *rpcli.UserClient + msgClient *rpcli.MsgClient + groupClient *rpcli.GroupClient } type Config struct { @@ -60,6 +64,7 @@ type Config struct { MongodbConfig config.Mongo NotificationConfig config.Notification Share config.Share + WebhooksConfig config.Webhooks LocalCacheConfig config.LocalCache Discovery config.Discovery } @@ -90,20 +95,32 @@ func Start(ctx context.Context, config *Config, client discovery.Conn, server gr if err != nil { return err } + msgClient := rpcli.NewMsgClient(msgConn) + + cs := conversationServer{ + config: config, + webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL), + userClient: rpcli.NewUserClient(userConn), + groupClient: rpcli.NewGroupClient(groupConn), + msgClient: msgClient, + } + + cs.conversationNotificationSender = NewConversationNotificationSender(&config.NotificationConfig, msgClient) + cs.conversationDatabase = controller.NewConversationDatabase( + conversationDB, + redis.NewConversationRedis(rdb, &config.LocalCacheConfig, conversationDB), + mgocli.GetTx()) + localcache.InitLocalCache(&config.LocalCacheConfig) - pbconversation.RegisterConversationServer(server, &conversationServer{ - conversationNotificationSender: NewConversationNotificationSender(&config.NotificationConfig, msgClient), - conversationDatabase: controller.NewConversationDatabase(conversationDB, - redis.NewConversationRedis(rdb, &config.LocalCacheConfig, conversationDB), mgocli.GetTx()), - userClient: rpcli.NewUserClient(userConn), - groupClient: rpcli.NewGroupClient(groupConn), - msgClient: msgClient, - }) + pbconversation.RegisterConversationServer(server, &cs) return nil } func (c *conversationServer) GetConversation(ctx context.Context, req *pbconversation.GetConversationReq) (*pbconversation.GetConversationResp, error) { + if err := authverify.CheckAccess(ctx, req.OwnerUserID); err != nil { + return nil, err + } conversations, err := c.conversationDatabase.FindConversations(ctx, req.OwnerUserID, []string{req.ConversationID}) if err != nil { return nil, err @@ -117,7 +134,9 @@ func (c *conversationServer) GetConversation(ctx context.Context, req *pbconvers } func (c *conversationServer) GetSortedConversationList(ctx context.Context, req *pbconversation.GetSortedConversationListReq) (resp *pbconversation.GetSortedConversationListResp, err error) { - log.ZDebug(ctx, "GetSortedConversationList", "seqs", req, "userID", req.UserID) + if err := authverify.CheckAccess(ctx, req.UserID); err != nil { + return nil, err + } var conversationIDs []string if len(req.ConversationIDs) == 0 { conversationIDs, err = c.conversationDatabase.GetConversationIDs(ctx, req.UserID) @@ -190,6 +209,9 @@ func (c *conversationServer) GetSortedConversationList(ctx context.Context, req } func (c *conversationServer) GetAllConversations(ctx context.Context, req *pbconversation.GetAllConversationsReq) (*pbconversation.GetAllConversationsResp, error) { + if err := authverify.CheckAccess(ctx, req.OwnerUserID); err != nil { + return nil, err + } conversations, err := c.conversationDatabase.GetUserAllConversation(ctx, req.OwnerUserID) if err != nil { return nil, err @@ -200,6 +222,9 @@ func (c *conversationServer) GetAllConversations(ctx context.Context, req *pbcon } func (c *conversationServer) GetConversations(ctx context.Context, req *pbconversation.GetConversationsReq) (*pbconversation.GetConversationsResp, error) { + if err := authverify.CheckAccess(ctx, req.OwnerUserID); err != nil { + return nil, err + } conversations, err := c.getConversations(ctx, req.OwnerUserID, req.ConversationIDs) if err != nil { return nil, err @@ -220,6 +245,9 @@ func (c *conversationServer) getConversations(ctx context.Context, ownerUserID s } func (c *conversationServer) SetConversation(ctx context.Context, req *pbconversation.SetConversationReq) (*pbconversation.SetConversationResp, error) { + if err := authverify.CheckAccess(ctx, req.GetConversation().GetUserID()); err != nil { + return nil, err + } var conversation dbModel.Conversation if err := datautil.CopyStructFields(&conversation, req.Conversation); err != nil { return nil, err @@ -234,8 +262,10 @@ func (c *conversationServer) SetConversation(ctx context.Context, req *pbconvers } func (c *conversationServer) SetConversations(ctx context.Context, req *pbconversation.SetConversationsReq) (*pbconversation.SetConversationsResp, error) { - if req.Conversation == nil { - return nil, errs.ErrArgs.WrapMsg("conversation must not be nil") + for _, userID := range req.UserIDs { + if err := authverify.CheckAccess(ctx, userID); err != nil { + return nil, err + } } if req.Conversation.ConversationType == constant.WriteGroupChatType { groupInfo, err := c.groupClient.GetGroupInfo(ctx, req.Conversation.GroupID) @@ -271,109 +301,29 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver conversation.UserID = req.Conversation.UserID conversation.GroupID = req.Conversation.GroupID - m := make(map[string]any) - - setConversationFieldsFunc := func() { - if req.Conversation.RecvMsgOpt != nil { - conversation.RecvMsgOpt = req.Conversation.RecvMsgOpt.Value - m["recv_msg_opt"] = req.Conversation.RecvMsgOpt.Value - } - if req.Conversation.AttachedInfo != nil { - conversation.AttachedInfo = req.Conversation.AttachedInfo.Value - m["attached_info"] = req.Conversation.AttachedInfo.Value - } - if req.Conversation.Ex != nil { - conversation.Ex = req.Conversation.Ex.Value - m["ex"] = req.Conversation.Ex.Value - } - if req.Conversation.IsPinned != nil { - conversation.IsPinned = req.Conversation.IsPinned.Value - m["is_pinned"] = req.Conversation.IsPinned.Value - } - if req.Conversation.GroupAtType != nil { - conversation.GroupAtType = req.Conversation.GroupAtType.Value - m["group_at_type"] = req.Conversation.GroupAtType.Value - } - if req.Conversation.MsgDestructTime != nil { - conversation.MsgDestructTime = req.Conversation.MsgDestructTime.Value - m["msg_destruct_time"] = req.Conversation.MsgDestructTime.Value - } - if req.Conversation.IsMsgDestruct != nil { - conversation.IsMsgDestruct = req.Conversation.IsMsgDestruct.Value - m["is_msg_destruct"] = req.Conversation.IsMsgDestruct.Value - } - if req.Conversation.BurnDuration != nil { - conversation.BurnDuration = req.Conversation.BurnDuration.Value - m["burn_duration"] = req.Conversation.BurnDuration.Value - } + m, conversation, err := UpdateConversationsMap(ctx, req) + if err != nil { + return nil, err } - // set need set field in conversation - setConversationFieldsFunc() - for userID := range conversationMap { - unequal := len(m) + unequal := UserUpdateCheckMap(ctx, userID, req.Conversation, conversationMap[userID]) - if req.Conversation.RecvMsgOpt != nil { - if req.Conversation.RecvMsgOpt.Value == conversationMap[userID].RecvMsgOpt { - unequal-- - } - } - - if req.Conversation.AttachedInfo != nil { - if req.Conversation.AttachedInfo.Value == conversationMap[userID].AttachedInfo { - unequal-- - } - } - - if req.Conversation.Ex != nil { - if req.Conversation.Ex.Value == conversationMap[userID].Ex { - unequal-- - } - } - if req.Conversation.IsPinned != nil { - if req.Conversation.IsPinned.Value == conversationMap[userID].IsPinned { - unequal-- - } - } - - if req.Conversation.GroupAtType != nil { - if req.Conversation.GroupAtType.Value == conversationMap[userID].GroupAtType { - unequal-- - } - } - - if req.Conversation.MsgDestructTime != nil { - if req.Conversation.MsgDestructTime.Value == conversationMap[userID].MsgDestructTime { - unequal-- - } - } - - if req.Conversation.IsMsgDestruct != nil { - if req.Conversation.IsMsgDestruct.Value == conversationMap[userID].IsMsgDestruct { - unequal-- - } - } - - if req.Conversation.BurnDuration != nil { - if req.Conversation.BurnDuration.Value == conversationMap[userID].BurnDuration { - unequal-- - } - } - - if unequal > 0 { + if unequal { needUpdateUsersList = append(needUpdateUsersList, userID) } } + if len(m) != 0 && len(needUpdateUsersList) != 0 { if err := c.conversationDatabase.SetUsersConversationFieldTx(ctx, needUpdateUsersList, &conversation, m); err != nil { return nil, err } - for _, v := range needUpdateUsersList { - c.conversationNotificationSender.ConversationChangeNotification(ctx, v, []string{req.Conversation.ConversationID}) + for _, userID := range needUpdateUsersList { + c.conversationNotificationSender.ConversationChangeNotification(ctx, userID, []string{req.Conversation.ConversationID}) } } + if req.Conversation.IsPrivateChat != nil && req.Conversation.ConversationType != constant.ReadGroupChatType { var conversations []*dbModel.Conversation for _, ownerUserID := range req.UserIDs { @@ -396,58 +346,94 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver return &pbconversation.SetConversationsResp{}, nil } -// Get user IDs with "Do Not Disturb" enabled in super large groups. -func (c *conversationServer) GetRecvMsgNotNotifyUserIDs(ctx context.Context, req *pbconversation.GetRecvMsgNotNotifyUserIDsReq) (*pbconversation.GetRecvMsgNotNotifyUserIDsResp, error) { - return nil, errs.New("deprecated") +func (c *conversationServer) UpdateConversationsByUser(ctx context.Context, req *pbconversation.UpdateConversationsByUserReq) (*pbconversation.UpdateConversationsByUserResp, error) { + if err := authverify.CheckAccess(ctx, req.UserID); err != nil { + return nil, err + } + m := make(map[string]any) + if req.Ex != nil { + m["ex"] = req.Ex.Value + } + if len(m) > 0 { + if err := c.conversationDatabase.UpdateUserConversations(ctx, req.UserID, m); err != nil { + return nil, err + } + } + return &pbconversation.UpdateConversationsByUserResp{}, nil } // create conversation without notification for msg redis transfer. -func (c *conversationServer) CreateSingleChatConversations(ctx context.Context, - req *pbconversation.CreateSingleChatConversationsReq, -) (*pbconversation.CreateSingleChatConversationsResp, error) { +func (c *conversationServer) CreateSingleChatConversations(ctx context.Context, req *pbconversation.CreateSingleChatConversationsReq) (*pbconversation.CreateSingleChatConversationsResp, error) { + var conversation dbModel.Conversation switch req.ConversationType { case constant.SingleChatType: - var conversation dbModel.Conversation + // sendUser create conversation.ConversationID = req.ConversationID conversation.ConversationType = req.ConversationType conversation.OwnerUserID = req.SendID conversation.UserID = req.RecvID + if err := c.webhookBeforeCreateSingleChatConversations(ctx, &c.config.WebhooksConfig.BeforeCreateSingleChatConversations, &conversation); err != nil && err != servererrs.ErrCallbackContinue { + return nil, err + } + err := c.conversationDatabase.CreateConversation(ctx, []*dbModel.Conversation{&conversation}) if err != nil { log.ZWarn(ctx, "create conversation failed", err, "conversation", conversation) } + c.webhookAfterCreateSingleChatConversations(ctx, &c.config.WebhooksConfig.AfterCreateSingleChatConversations, &conversation) + + // recvUser create conversation2 := conversation conversation2.OwnerUserID = req.RecvID conversation2.UserID = req.SendID + if err := c.webhookBeforeCreateSingleChatConversations(ctx, &c.config.WebhooksConfig.BeforeCreateSingleChatConversations, &conversation); err != nil && err != servererrs.ErrCallbackContinue { + return nil, err + } + err = c.conversationDatabase.CreateConversation(ctx, []*dbModel.Conversation{&conversation2}) if err != nil { log.ZWarn(ctx, "create conversation failed", err, "conversation2", conversation) } + + c.webhookAfterCreateSingleChatConversations(ctx, &c.config.WebhooksConfig.AfterCreateSingleChatConversations, &conversation2) case constant.NotificationChatType: - var conversation dbModel.Conversation conversation.ConversationID = req.ConversationID conversation.ConversationType = req.ConversationType conversation.OwnerUserID = req.RecvID conversation.UserID = req.SendID + if err := c.webhookBeforeCreateSingleChatConversations(ctx, &c.config.WebhooksConfig.BeforeCreateSingleChatConversations, &conversation); err != nil && err != servererrs.ErrCallbackContinue { + return nil, err + } + err := c.conversationDatabase.CreateConversation(ctx, []*dbModel.Conversation{&conversation}) if err != nil { log.ZWarn(ctx, "create conversation failed", err, "conversation2", conversation) } + + c.webhookAfterCreateSingleChatConversations(ctx, &c.config.WebhooksConfig.AfterCreateSingleChatConversations, &conversation) } return &pbconversation.CreateSingleChatConversationsResp{}, nil } func (c *conversationServer) CreateGroupChatConversations(ctx context.Context, req *pbconversation.CreateGroupChatConversationsReq) (*pbconversation.CreateGroupChatConversationsResp, error) { - err := c.conversationDatabase.CreateGroupChatConversation(ctx, req.GroupID, req.UserIDs) + var conversation dbModel.Conversation + + conversation.ConversationID = msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, req.GroupID) + conversation.GroupID = req.GroupID + conversation.ConversationType = constant.ReadGroupChatType + + if err := c.webhookBeforeCreateGroupChatConversations(ctx, &c.config.WebhooksConfig.BeforeCreateGroupChatConversations, &conversation); err != nil { + return nil, err + } + + err := c.conversationDatabase.CreateGroupChatConversation(ctx, req.GroupID, req.UserIDs, &conversation) if err != nil { return nil, err } - conversationID := msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, req.GroupID) - if err := c.msgClient.SetUserConversationMaxSeq(ctx, conversationID, req.UserIDs, 0); err != nil { - return nil, err - } + + c.webhookAfterCreateGroupChatConversations(ctx, &c.config.WebhooksConfig.AfterCreateGroupChatConversations, &conversation) return &pbconversation.CreateGroupChatConversationsResp{}, nil } @@ -480,6 +466,9 @@ func (c *conversationServer) SetConversationMinSeq(ctx context.Context, req *pbc } func (c *conversationServer) GetConversationIDs(ctx context.Context, req *pbconversation.GetConversationIDsReq) (*pbconversation.GetConversationIDsResp, error) { + if err := authverify.CheckAccess(ctx, req.UserID); err != nil { + return nil, err + } conversationIDs, err := c.conversationDatabase.GetConversationIDs(ctx, req.UserID) if err != nil { return nil, err @@ -488,6 +477,9 @@ func (c *conversationServer) GetConversationIDs(ctx context.Context, req *pbconv } func (c *conversationServer) GetUserConversationIDsHash(ctx context.Context, req *pbconversation.GetUserConversationIDsHashReq) (*pbconversation.GetUserConversationIDsHashResp, error) { + if err := authverify.CheckAccess(ctx, req.OwnerUserID); err != nil { + return nil, err + } hash, err := c.conversationDatabase.GetUserConversationIDsHash(ctx, req.OwnerUserID) if err != nil { return nil, err @@ -495,10 +487,7 @@ func (c *conversationServer) GetUserConversationIDsHash(ctx context.Context, req return &pbconversation.GetUserConversationIDsHashResp{Hash: hash}, nil } -func (c *conversationServer) GetConversationsByConversationID( - ctx context.Context, - req *pbconversation.GetConversationsByConversationIDReq, -) (*pbconversation.GetConversationsByConversationIDResp, error) { +func (c *conversationServer) GetConversationsByConversationID(ctx context.Context, req *pbconversation.GetConversationsByConversationIDReq) (*pbconversation.GetConversationsByConversationIDResp, error) { conversations, err := c.conversationDatabase.GetConversationsByConversationID(ctx, req.ConversationIDs) if err != nil { return nil, err @@ -552,10 +541,7 @@ func (c *conversationServer) conversationSort(conversations map[int64]string, re resp.ConversationElems = append(resp.ConversationElems, cons...) } -func (c *conversationServer) getConversationInfo( - ctx context.Context, - chatLogs map[string]*sdkws.MsgData, - userID string) (map[string]*pbconversation.ConversationElem, error) { +func (c *conversationServer) getConversationInfo(ctx context.Context, chatLogs map[string]*sdkws.MsgData, userID string) (map[string]*pbconversation.ConversationElem, error) { var ( sendIDs []string groupIDs []string @@ -641,6 +627,11 @@ func (c *conversationServer) GetConversationNotReceiveMessageUserIDs(ctx context } func (c *conversationServer) UpdateConversation(ctx context.Context, req *pbconversation.UpdateConversationReq) (*pbconversation.UpdateConversationResp, error) { + for _, userID := range req.UserIDs { + if err := authverify.CheckAccess(ctx, userID); err != nil { + return nil, err + } + } m := make(map[string]any) if req.RecvMsgOpt != nil { m["recv_msg_opt"] = req.RecvMsgOpt.Value @@ -687,6 +678,9 @@ func (c *conversationServer) UpdateConversation(ctx context.Context, req *pbconv } func (c *conversationServer) GetOwnerConversation(ctx context.Context, req *pbconversation.GetOwnerConversationReq) (*pbconversation.GetOwnerConversationResp, error) { + if err := authverify.CheckAccess(ctx, req.UserID); err != nil { + return nil, err + } total, conversations, err := c.conversationDatabase.GetOwnerConversation(ctx, req.UserID, req.Pagination) if err != nil { return nil, err @@ -748,6 +742,9 @@ func (c *conversationServer) GetConversationsNeedClearMsg(ctx context.Context, _ } func (c *conversationServer) GetNotNotifyConversationIDs(ctx context.Context, req *pbconversation.GetNotNotifyConversationIDsReq) (*pbconversation.GetNotNotifyConversationIDsResp, error) { + if err := authverify.CheckAccess(ctx, req.UserID); err != nil { + return nil, err + } conversationIDs, err := c.conversationDatabase.GetNotNotifyConversationIDs(ctx, req.UserID) if err != nil { return nil, err @@ -756,6 +753,9 @@ func (c *conversationServer) GetNotNotifyConversationIDs(ctx context.Context, re } func (c *conversationServer) GetPinnedConversationIDs(ctx context.Context, req *pbconversation.GetPinnedConversationIDsReq) (*pbconversation.GetPinnedConversationIDsResp, error) { + if err := authverify.CheckAccess(ctx, req.UserID); err != nil { + return nil, err + } conversationIDs, err := c.conversationDatabase.GetPinnedConversationIDs(ctx, req.UserID) if err != nil { return nil, err diff --git a/internal/rpc/conversation/db_map.go b/internal/rpc/conversation/db_map.go new file mode 100644 index 000000000..4acb16e4b --- /dev/null +++ b/internal/rpc/conversation/db_map.go @@ -0,0 +1,85 @@ +package conversation + +import ( + "context" + + dbModel "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "github.com/openimsdk/protocol/conversation" +) + +func UpdateConversationsMap(ctx context.Context, req *conversation.SetConversationsReq) (m map[string]any, conversation dbModel.Conversation, err error) { + m = make(map[string]any) + + conversation.ConversationID = req.Conversation.ConversationID + conversation.ConversationType = req.Conversation.ConversationType + conversation.UserID = req.Conversation.UserID + conversation.GroupID = req.Conversation.GroupID + + if req.Conversation.RecvMsgOpt != nil { + conversation.RecvMsgOpt = req.Conversation.RecvMsgOpt.Value + m["recv_msg_opt"] = req.Conversation.RecvMsgOpt.Value + } + + if req.Conversation.AttachedInfo != nil { + conversation.AttachedInfo = req.Conversation.AttachedInfo.Value + m["attached_info"] = req.Conversation.AttachedInfo.Value + } + + if req.Conversation.Ex != nil { + conversation.Ex = req.Conversation.Ex.Value + m["ex"] = req.Conversation.Ex.Value + } + if req.Conversation.IsPinned != nil { + conversation.IsPinned = req.Conversation.IsPinned.Value + m["is_pinned"] = req.Conversation.IsPinned.Value + } + if req.Conversation.GroupAtType != nil { + conversation.GroupAtType = req.Conversation.GroupAtType.Value + m["group_at_type"] = req.Conversation.GroupAtType.Value + } + if req.Conversation.MsgDestructTime != nil { + conversation.MsgDestructTime = req.Conversation.MsgDestructTime.Value + m["msg_destruct_time"] = req.Conversation.MsgDestructTime.Value + } + if req.Conversation.IsMsgDestruct != nil { + conversation.IsMsgDestruct = req.Conversation.IsMsgDestruct.Value + m["is_msg_destruct"] = req.Conversation.IsMsgDestruct.Value + } + if req.Conversation.BurnDuration != nil { + conversation.BurnDuration = req.Conversation.BurnDuration.Value + m["burn_duration"] = req.Conversation.BurnDuration.Value + } + + return m, conversation, nil +} + +func UserUpdateCheckMap(ctx context.Context, userID string, req *conversation.ConversationReq, conversation *dbModel.Conversation) (unequal bool) { + unequal = false + + if req.RecvMsgOpt != nil && conversation.RecvMsgOpt != req.RecvMsgOpt.Value { + unequal = true + } + if req.AttachedInfo != nil && conversation.AttachedInfo != req.AttachedInfo.Value { + unequal = true + } + if req.Ex != nil && conversation.Ex != req.Ex.Value { + unequal = true + } + if req.IsPinned != nil && conversation.IsPinned != req.IsPinned.Value { + unequal = true + } + if req.GroupAtType != nil && conversation.GroupAtType != req.GroupAtType.Value { + unequal = true + } + if req.MsgDestructTime != nil && conversation.MsgDestructTime != req.MsgDestructTime.Value { + unequal = true + } + if req.IsMsgDestruct != nil && conversation.IsMsgDestruct != req.IsMsgDestruct.Value { + unequal = true + } + if req.BurnDuration != nil && conversation.BurnDuration != req.BurnDuration.Value { + unequal = true + } + + return unequal +} diff --git a/internal/rpc/conversation/notification.go b/internal/rpc/conversation/notification.go index 370865c1a..6e865ba42 100644 --- a/internal/rpc/conversation/notification.go +++ b/internal/rpc/conversation/notification.go @@ -17,11 +17,11 @@ package conversation import ( "context" - "github.com/openimsdk/open-im-server/v3/pkg/notification" "github.com/openimsdk/open-im-server/v3/pkg/rpcli" "github.com/openimsdk/protocol/msg" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/notification" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/sdkws" ) diff --git a/internal/rpc/conversation/sync.go b/internal/rpc/conversation/sync.go index cee74b319..a24dd85c6 100644 --- a/internal/rpc/conversation/sync.go +++ b/internal/rpc/conversation/sync.go @@ -35,6 +35,9 @@ func (c *conversationServer) GetFullOwnerConversationIDs(ctx context.Context, re } func (c *conversationServer) GetIncrementalConversation(ctx context.Context, req *conversation.GetIncrementalConversationReq) (*conversation.GetIncrementalConversationResp, error) { + if err := authverify.CheckAccess(ctx, req.UserID); err != nil { + return nil, err + } opt := incrversion.Option[*conversation.Conversation, conversation.GetIncrementalConversationResp]{ Ctx: ctx, VersionKey: req.UserID, diff --git a/internal/rpc/group/cache.go b/internal/rpc/group/cache.go index 022a0f4ef..ec0e5b566 100644 --- a/internal/rpc/group/cache.go +++ b/internal/rpc/group/cache.go @@ -17,13 +17,14 @@ package group import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/common/convert" pbgroup "github.com/openimsdk/protocol/group" ) // GetGroupInfoCache get group info from cache. -func (s *groupServer) GetGroupInfoCache(ctx context.Context, req *pbgroup.GetGroupInfoCacheReq) (*pbgroup.GetGroupInfoCacheResp, error) { - group, err := s.db.TakeGroup(ctx, req.GroupID) +func (g *groupServer) GetGroupInfoCache(ctx context.Context, req *pbgroup.GetGroupInfoCacheReq) (*pbgroup.GetGroupInfoCacheResp, error) { + group, err := g.db.TakeGroup(ctx, req.GroupID) if err != nil { return nil, err } @@ -32,8 +33,11 @@ func (s *groupServer) GetGroupInfoCache(ctx context.Context, req *pbgroup.GetGro }, nil } -func (s *groupServer) GetGroupMemberCache(ctx context.Context, req *pbgroup.GetGroupMemberCacheReq) (*pbgroup.GetGroupMemberCacheResp, error) { - members, err := s.db.TakeGroupMember(ctx, req.GroupID, req.GroupMemberID) +func (g *groupServer) GetGroupMemberCache(ctx context.Context, req *pbgroup.GetGroupMemberCacheReq) (*pbgroup.GetGroupMemberCacheResp, error) { + if err := authverify.CheckAccess(ctx, req.GroupMemberID); err != nil { + return nil, err + } + members, err := g.db.TakeGroupMember(ctx, req.GroupID, req.GroupMemberID) if err != nil { return nil, err } diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 10cdc2546..2026ba71b 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -476,6 +476,19 @@ func (g *groupServer) GetGroupAllMember(ctx context.Context, req *pbgroup.GetGro if err != nil { return nil, err } + if !authverify.IsAdmin(ctx) { + var inGroup bool + opUserID := mcontext.GetOpUserID(ctx) + for _, member := range members { + if member.UserID == opUserID { + inGroup = true + break + } + } + if !inGroup { + return nil, errs.ErrNoPermission.WrapMsg("opuser not in group") + } + } if err := g.PopulateGroupMember(ctx, members...); err != nil { return nil, err } @@ -486,11 +499,24 @@ func (g *groupServer) GetGroupAllMember(ctx context.Context, req *pbgroup.GetGro return &resp, nil } +func (g *groupServer) checkAdminOrInGroup(ctx context.Context, groupID string) error { + if authverify.IsAdmin(ctx) { + return nil + } + opUserID := mcontext.GetOpUserID(ctx) + members, err := g.db.FindGroupMembers(ctx, groupID, []string{opUserID}) + if err != nil { + return err + } + if len(members) == 0 { + return errs.ErrNoPermission.WrapMsg("op user not in group") + } + return nil +} + func (g *groupServer) GetGroupMemberList(ctx context.Context, req *pbgroup.GetGroupMemberListReq) (*pbgroup.GetGroupMemberListResp, error) { - if opUserID := mcontext.GetOpUserID(ctx); !datautil.Contain(opUserID, g.config.Share.IMAdminUserID...) { - if _, err := g.db.TakeGroupMember(ctx, req.GroupID, opUserID); err != nil { - return nil, err - } + if err := g.checkAdminOrInGroup(ctx, req.GroupID); err != nil { + return nil, err } var ( total int64 @@ -631,6 +657,9 @@ func (g *groupServer) GetGroupMembersInfo(ctx context.Context, req *pbgroup.GetG if req.GroupID == "" { return nil, errs.ErrArgs.WrapMsg("groupID empty") } + if err := g.checkAdminOrInGroup(ctx, req.GroupID); err != nil { + return nil, err + } members, err := g.getGroupMembersInfo(ctx, req.GroupID, req.UserIDs) if err != nil { return nil, err @@ -658,6 +687,9 @@ func (g *groupServer) getGroupMembersInfo(ctx context.Context, groupID string, u // GetGroupApplicationList handles functions that get a list of group requests. func (g *groupServer) GetGroupApplicationList(ctx context.Context, req *pbgroup.GetGroupApplicationListReq) (*pbgroup.GetGroupApplicationListResp, error) { + if err := authverify.CheckAccess(ctx, req.FromUserID); err != nil { + return nil, err + } groupIDs, err := g.db.FindUserManagedGroupID(ctx, req.FromUserID) if err != nil { return nil, err @@ -1652,6 +1684,11 @@ func (g *groupServer) GetGroupAbstractInfo(ctx context.Context, req *pbgroup.Get if datautil.Duplicate(req.GroupIDs) { return nil, errs.ErrArgs.WrapMsg("groupIDs duplicate") } + for _, groupID := range req.GroupIDs { + if err := g.checkAdminOrInGroup(ctx, groupID); err != nil { + return nil, err + } + } groups, err := g.db.FindGroup(ctx, req.GroupIDs) if err != nil { return nil, err @@ -1699,6 +1736,9 @@ func (g *groupServer) GetGroupMemberUserIDs(ctx context.Context, req *pbgroup.Ge if err != nil { return nil, err } + if err := authverify.CheckAccessIn(ctx, userIDs...); err != nil { + return nil, err + } return &pbgroup.GetGroupMemberUserIDsResp{ UserIDs: userIDs, }, nil diff --git a/internal/rpc/group/statistics.go b/internal/rpc/group/statistics.go index 6adb1261a..4ee3396da 100644 --- a/internal/rpc/group/statistics.go +++ b/internal/rpc/group/statistics.go @@ -18,24 +18,28 @@ import ( "context" "time" + "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/protocol/group" "github.com/openimsdk/tools/errs" ) -func (s *groupServer) GroupCreateCount(ctx context.Context, req *group.GroupCreateCountReq) (*group.GroupCreateCountResp, error) { +func (g *groupServer) GroupCreateCount(ctx context.Context, req *group.GroupCreateCountReq) (*group.GroupCreateCountResp, error) { if req.Start > req.End { return nil, errs.ErrArgs.WrapMsg("start > end: %d > %d", req.Start, req.End) } - total, err := s.db.CountTotal(ctx, nil) + if err := authverify.CheckAdmin(ctx); err != nil { + return nil, err + } + total, err := g.db.CountTotal(ctx, nil) if err != nil { return nil, err } start := time.UnixMilli(req.Start) - before, err := s.db.CountTotal(ctx, &start) + before, err := g.db.CountTotal(ctx, &start) if err != nil { return nil, err } - count, err := s.db.CountRangeEverydayTotal(ctx, start, time.UnixMilli(req.End)) + count, err := g.db.CountRangeEverydayTotal(ctx, start, time.UnixMilli(req.End)) if err != nil { return nil, err } diff --git a/internal/rpc/group/sync.go b/internal/rpc/group/sync.go index 822b15307..baee2f2d4 100644 --- a/internal/rpc/group/sync.go +++ b/internal/rpc/group/sync.go @@ -11,9 +11,6 @@ import ( "github.com/openimsdk/protocol/constant" pbgroup "github.com/openimsdk/protocol/group" "github.com/openimsdk/protocol/sdkws" - "github.com/openimsdk/tools/errs" - "github.com/openimsdk/tools/mcontext" - "github.com/openimsdk/tools/utils/datautil" ) const versionSyncLimit = 500 @@ -23,10 +20,8 @@ func (g *groupServer) GetFullGroupMemberUserIDs(ctx context.Context, req *pbgrou if err != nil { return nil, err } - if opUserID := mcontext.GetOpUserID(ctx); !datautil.Contain(opUserID, g.config.Share.IMAdminUserID...) { - if !datautil.Contain(opUserID, userIDs...) { - return nil, errs.ErrNoPermission.WrapMsg("user not in group") - } + if err := authverify.CheckAccessIn(ctx, userIDs...); err != nil { + return nil, err } vl, err := g.db.FindMaxGroupMemberVersionCache(ctx, req.GroupID) if err != nil { @@ -69,6 +64,9 @@ func (g *groupServer) GetFullJoinGroupIDs(ctx context.Context, req *pbgroup.GetF } func (g *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgroup.GetIncrementalGroupMemberReq) (*pbgroup.GetIncrementalGroupMemberResp, error) { + if err := g.checkAdminOrInGroup(ctx, req.GroupID); err != nil { + return nil, err + } group, err := g.db.TakeGroup(ctx, req.GroupID) if err != nil { return nil, err @@ -76,9 +74,6 @@ func (g *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgrou if group.Status == constant.GroupStatusDismissed { return nil, servererrs.ErrDismissedAlready.Wrap() } - if _, err := g.db.TakeGroupMember(ctx, req.GroupID, mcontext.GetOpUserID(ctx)); err != nil { - return nil, err - } var ( hasGroupUpdate bool sortVersion uint64 diff --git a/internal/rpc/msg/as_read.go b/internal/rpc/msg/as_read.go index b25eae6b1..c52ce9c07 100644 --- a/internal/rpc/msg/as_read.go +++ b/internal/rpc/msg/as_read.go @@ -18,6 +18,7 @@ import ( "context" "errors" + "github.com/openimsdk/open-im-server/v3/pkg/authverify" cbapi "github.com/openimsdk/open-im-server/v3/pkg/callbackstruct" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/msg" @@ -29,6 +30,9 @@ import ( ) func (m *msgServer) GetConversationsHasReadAndMaxSeq(ctx context.Context, req *msg.GetConversationsHasReadAndMaxSeqReq) (*msg.GetConversationsHasReadAndMaxSeqResp, error) { + if err := authverify.CheckAccess(ctx, req.UserID); err != nil { + return nil, err + } var conversationIDs []string if len(req.ConversationIDs) == 0 { var err error @@ -82,6 +86,9 @@ func (m *msgServer) GetConversationsHasReadAndMaxSeq(ctx context.Context, req *m } func (m *msgServer) SetConversationHasReadSeq(ctx context.Context, req *msg.SetConversationHasReadSeqReq) (*msg.SetConversationHasReadSeqResp, error) { + if err := authverify.CheckAccess(ctx, req.UserID); err != nil { + return nil, err + } maxSeq, err := m.MsgDatabase.GetMaxSeq(ctx, req.ConversationID) if err != nil { return nil, err @@ -97,8 +104,8 @@ func (m *msgServer) SetConversationHasReadSeq(ctx context.Context, req *msg.SetC } func (m *msgServer) MarkMsgsAsRead(ctx context.Context, req *msg.MarkMsgsAsReadReq) (*msg.MarkMsgsAsReadResp, error) { - if len(req.Seqs) < 1 { - return nil, errs.ErrArgs.WrapMsg("seqs must not be empty") + if err := authverify.CheckAccess(ctx, req.UserID); err != nil { + return nil, err } maxSeq, err := m.MsgDatabase.GetMaxSeq(ctx, req.ConversationID) if err != nil { @@ -139,6 +146,9 @@ func (m *msgServer) MarkMsgsAsRead(ctx context.Context, req *msg.MarkMsgsAsReadR } func (m *msgServer) MarkConversationAsRead(ctx context.Context, req *msg.MarkConversationAsReadReq) (*msg.MarkConversationAsReadResp, error) { + if err := authverify.CheckAccess(ctx, req.UserID); err != nil { + return nil, err + } conversation, err := m.ConversationLocalCache.GetConversation(ctx, req.UserID, req.ConversationID) if err != nil { return nil, err @@ -216,5 +226,4 @@ func (m *msgServer) sendMarkAsReadNotification(ctx context.Context, conversation HasReadSeq: hasReadSeq, } m.notificationSender.NotificationWithSessionType(ctx, sendID, recvID, constant.HasReadReceipt, sessionType, tips) - } diff --git a/internal/rpc/msg/delete.go b/internal/rpc/msg/delete.go index 4590523d5..d24420ebb 100644 --- a/internal/rpc/msg/delete.go +++ b/internal/rpc/msg/delete.go @@ -94,6 +94,9 @@ func (m *msgServer) DeleteMsgs(ctx context.Context, req *msg.DeleteMsgsReq) (*ms } func (m *msgServer) DeleteMsgPhysicalBySeq(ctx context.Context, req *msg.DeleteMsgPhysicalBySeqReq) (*msg.DeleteMsgPhysicalBySeqResp, error) { + if err := authverify.CheckAdmin(ctx); err != nil { + return nil, err + } err := m.MsgDatabase.DeleteMsgsPhysicalBySeqs(ctx, req.ConversationID, req.Seqs) if err != nil { return nil, err diff --git a/internal/rpc/msg/send.go b/internal/rpc/msg/send.go index 2b66f7a9a..0e3a9950b 100644 --- a/internal/rpc/msg/send.go +++ b/internal/rpc/msg/send.go @@ -17,11 +17,14 @@ package msg import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/authverify" + "google.golang.org/protobuf/proto" + "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/open-im-server/v3/pkg/util/conversationutil" "github.com/openimsdk/protocol/constant" - pbconversation "github.com/openimsdk/protocol/conversation" + pbconv "github.com/openimsdk/protocol/conversation" pbmsg "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/protocol/wrapperspb" @@ -29,13 +32,15 @@ import ( "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/utils/datautil" - "google.golang.org/protobuf/proto" ) func (m *msgServer) SendMsg(ctx context.Context, req *pbmsg.SendMsgReq) (*pbmsg.SendMsgResp, error) { if req.MsgData == nil { return nil, errs.ErrArgs.WrapMsg("msgData is nil") } + if err := authverify.CheckAccess(ctx, req.MsgData.SendID); err != nil { + return nil, err + } before := new(*sdkws.MsgData) resp, err := m.sendMsg(ctx, req, before) if err != nil { @@ -104,7 +109,7 @@ func (m *msgServer) setConversationAtInfo(nctx context.Context, msg *sdkws.MsgDa var atUserID []string - conversation := &pbconversation.ConversationReq{ + conversation := &pbconv.ConversationReq{ ConversationID: msgprocessor.GetConversationIDByMsg(msg), ConversationType: msg.SessionType, GroupID: msg.GroupID, @@ -171,13 +176,7 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbmsg.SendMsgReq isSend := true isNotification := msgprocessor.IsNotificationByMsg(req.MsgData) if !isNotification { - isSend, err = m.modifyMessageByUserMessageReceiveOpt( - ctx, - req.MsgData.RecvID, - conversationutil.GenConversationIDForSingle(req.MsgData.SendID, req.MsgData.RecvID), - constant.SingleChatType, - req, - ) + isSend, err = m.modifyMessageByUserMessageReceiveOpt(authverify.WithTempAdmin(ctx), req.MsgData.RecvID, conversationutil.GenConversationIDForSingle(req.MsgData.SendID, req.MsgData.RecvID), constant.SingleChatType, req) if err != nil { return nil, err } diff --git a/internal/rpc/msg/seq.go b/internal/rpc/msg/seq.go index bd68138fb..6a0461dc8 100644 --- a/internal/rpc/msg/seq.go +++ b/internal/rpc/msg/seq.go @@ -17,9 +17,10 @@ package msg import ( "context" "errors" + "sort" + pbmsg "github.com/openimsdk/protocol/msg" "github.com/redis/go-redis/v9" - "sort" ) func (m *msgServer) GetConversationMaxSeq(ctx context.Context, req *pbmsg.GetConversationMaxSeqReq) (*pbmsg.GetConversationMaxSeqResp, error) { diff --git a/internal/rpc/msg/statistics.go b/internal/rpc/msg/statistics.go index 01c0f1c46..b1f90cae4 100644 --- a/internal/rpc/msg/statistics.go +++ b/internal/rpc/msg/statistics.go @@ -16,15 +16,20 @@ package msg import ( "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "time" + "github.com/openimsdk/open-im-server/v3/pkg/authverify" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/utils/datautil" ) func (m *msgServer) GetActiveUser(ctx context.Context, req *msg.GetActiveUserReq) (*msg.GetActiveUserResp, error) { + if err := authverify.CheckAdmin(ctx); err != nil { + return nil, err + } msgCount, userCount, users, dateCount, err := m.MsgDatabase.RangeUserSendCount(ctx, time.UnixMilli(req.Start), time.UnixMilli(req.End), req.Group, req.Ase, req.Pagination.PageNumber, req.Pagination.ShowNumber) if err != nil { return nil, err @@ -60,6 +65,9 @@ func (m *msgServer) GetActiveUser(ctx context.Context, req *msg.GetActiveUserReq } func (m *msgServer) GetActiveGroup(ctx context.Context, req *msg.GetActiveGroupReq) (*msg.GetActiveGroupResp, error) { + if err := authverify.CheckAdmin(ctx); err != nil { + return nil, err + } msgCount, groupCount, groups, dateCount, err := m.MsgDatabase.RangeGroupSendCount(ctx, time.UnixMilli(req.Start), time.UnixMilli(req.End), req.Ase, req.Pagination.PageNumber, req.Pagination.ShowNumber) if err != nil { return nil, err diff --git a/internal/rpc/msg/sync_msg.go b/internal/rpc/msg/sync_msg.go index 38eed93bc..259c7f85d 100644 --- a/internal/rpc/msg/sync_msg.go +++ b/internal/rpc/msg/sync_msg.go @@ -29,6 +29,9 @@ import ( ) func (m *msgServer) PullMessageBySeqs(ctx context.Context, req *sdkws.PullMessageBySeqsReq) (*sdkws.PullMessageBySeqsResp, error) { + if err := authverify.CheckAccess(ctx, req.UserID); err != nil { + return nil, err + } resp := &sdkws.PullMessageBySeqsResp{} resp.Msgs = make(map[string]*sdkws.PullMsgs) resp.NotificationMsgs = make(map[string]*sdkws.PullMsgs) diff --git a/internal/rpc/relation/black.go b/internal/rpc/relation/black.go index 381a56273..dddb9b4c2 100644 --- a/internal/rpc/relation/black.go +++ b/internal/rpc/relation/black.go @@ -47,6 +47,9 @@ func (s *friendServer) GetPaginationBlacks(ctx context.Context, req *relation.Ge } func (s *friendServer) IsBlack(ctx context.Context, req *relation.IsBlackReq) (*relation.IsBlackResp, error) { + if err := authverify.CheckAccessIn(ctx, req.UserID1, req.UserID2); err != nil { + return nil, err + } in1, in2, err := s.blackDatabase.CheckIn(ctx, req.UserID1, req.UserID2) if err != nil { return nil, err diff --git a/internal/rpc/relation/friend.go b/internal/rpc/relation/friend.go index 050cd5ffe..06661c79d 100644 --- a/internal/rpc/relation/friend.go +++ b/internal/rpc/relation/friend.go @@ -280,6 +280,9 @@ func (s *friendServer) SetFriendRemark(ctx context.Context, req *relation.SetFri } func (s *friendServer) GetFriendInfo(ctx context.Context, req *relation.GetFriendInfoReq) (*relation.GetFriendInfoResp, error) { + if err := authverify.CheckAccess(ctx, req.OwnerUserID); err != nil { + return nil, err + } friends, err := s.db.FindFriendsWithError(ctx, req.OwnerUserID, req.FriendUserIDs) if err != nil { return nil, err @@ -288,6 +291,9 @@ func (s *friendServer) GetFriendInfo(ctx context.Context, req *relation.GetFrien } func (s *friendServer) GetDesignatedFriends(ctx context.Context, req *relation.GetDesignatedFriendsReq) (resp *relation.GetDesignatedFriendsResp, err error) { + if err := authverify.CheckAccess(ctx, req.OwnerUserID); err != nil { + return nil, err + } resp = &relation.GetDesignatedFriendsResp{} if datautil.Duplicate(req.FriendUserIDs) { return nil, errs.ErrArgs.WrapMsg("friend userID repeated") @@ -313,9 +319,10 @@ func (s *friendServer) getFriend(ctx context.Context, ownerUserID string, friend } // Get the list of friend requests sent out proactively. -func (s *friendServer) GetDesignatedFriendsApply(ctx context.Context, - req *relation.GetDesignatedFriendsApplyReq, -) (resp *relation.GetDesignatedFriendsApplyResp, err error) { +func (s *friendServer) GetDesignatedFriendsApply(ctx context.Context, req *relation.GetDesignatedFriendsApplyReq) (resp *relation.GetDesignatedFriendsApplyResp, err error) { + if err := authverify.CheckAccessIn(ctx, req.FromUserID, req.ToUserID); err != nil { + return nil, err + } friendRequests, err := s.db.FindBothFriendRequests(ctx, req.FromUserID, req.ToUserID) if err != nil { return nil, err @@ -374,6 +381,9 @@ func (s *friendServer) GetPaginationFriendsApplyFrom(ctx context.Context, req *r // ok. func (s *friendServer) IsFriend(ctx context.Context, req *relation.IsFriendReq) (resp *relation.IsFriendResp, err error) { + if err := authverify.CheckAccessIn(ctx, req.UserID1, req.UserID2); err != nil { + return nil, err + } resp = &relation.IsFriendResp{} resp.InUser1Friends, resp.InUser2Friends, err = s.db.CheckIn(ctx, req.UserID1, req.UserID2) if err != nil { @@ -426,6 +436,9 @@ func (s *friendServer) GetSpecifiedFriendsInfo(ctx context.Context, req *relatio return nil, errs.ErrArgs.WrapMsg("userIDList repeated") } + if err := authverify.CheckAccess(ctx, req.OwnerUserID); err != nil { + return nil, err + } userMap, err := s.userClient.GetUsersInfoMap(ctx, req.UserIDList) if err != nil { return nil, err @@ -494,10 +507,7 @@ func (s *friendServer) GetSpecifiedFriendsInfo(ctx context.Context, req *relatio return resp, nil } -func (s *friendServer) UpdateFriends( - ctx context.Context, - req *relation.UpdateFriendsReq, -) (*relation.UpdateFriendsResp, error) { +func (s *friendServer) UpdateFriends(ctx context.Context, req *relation.UpdateFriendsReq) (*relation.UpdateFriendsResp, error) { if len(req.FriendUserIDs) == 0 { return nil, errs.ErrArgs.WrapMsg("friendIDList is empty") } @@ -505,6 +515,10 @@ func (s *friendServer) UpdateFriends( return nil, errs.ErrArgs.WrapMsg("friendIDList repeated") } + if err := authverify.CheckAccess(ctx, req.OwnerUserID); err != nil { + return nil, err + } + _, err := s.db.FindFriendsWithError(ctx, req.OwnerUserID, req.FriendUserIDs) if err != nil { return nil, err diff --git a/internal/rpc/third/log.go b/internal/rpc/third/log.go index fba3ecb88..9a4995ace 100644 --- a/internal/rpc/third/log.go +++ b/internal/rpc/third/log.go @@ -25,6 +25,7 @@ import ( "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/third" "github.com/openimsdk/tools/errs" + "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/utils/datautil" ) @@ -45,7 +46,7 @@ func genLogID() string { func (t *thirdServer) UploadLogs(ctx context.Context, req *third.UploadLogsReq) (*third.UploadLogsResp, error) { var dbLogs []*relationtb.Log - userID := ctx.Value(constant.OpUserID).(string) + userID := mcontext.GetOpUserID(ctx) platform := constant.PlatformID2Name[int(req.Platform)] for _, fileURL := range req.FileURLs { log := relationtb.Log{ diff --git a/internal/rpc/third/third.go b/internal/rpc/third/third.go index 0afd54014..c6dcb2ea4 100644 --- a/internal/rpc/third/third.go +++ b/internal/rpc/third/third.go @@ -19,6 +19,7 @@ import ( "fmt" "time" + "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/mcache" "github.com/openimsdk/open-im-server/v3/pkg/dbbuild" @@ -148,6 +149,9 @@ func (t *thirdServer) FcmUpdateToken(ctx context.Context, req *third.FcmUpdateTo } func (t *thirdServer) SetAppBadge(ctx context.Context, req *third.SetAppBadgeReq) (resp *third.SetAppBadgeResp, err error) { + if err := authverify.CheckAccess(ctx, req.UserID); err != nil { + return nil, err + } err = t.thirdDatabase.SetAppBadge(ctx, req.UserID, int(req.AppUnreadCount)) if err != nil { return nil, err diff --git a/pkg/authverify/token.go b/pkg/authverify/token.go index 2e3639776..a82eba30a 100644 --- a/pkg/authverify/token.go +++ b/pkg/authverify/token.go @@ -64,16 +64,57 @@ func GetIMAdminUserIDs(ctx context.Context) []string { } func IsAdmin(ctx context.Context) bool { - return datautil.Contain(mcontext.GetOpUserID(ctx), GetIMAdminUserIDs(ctx)...) + return IsTempAdmin(ctx) || IsSystemAdmin(ctx) } func CheckAccess(ctx context.Context, ownerUserID string) error { - opUserID := mcontext.GetOpUserID(ctx) - if opUserID == ownerUserID { + if mcontext.GetOpUserID(ctx) == ownerUserID { return nil } - if datautil.Contain(mcontext.GetOpUserID(ctx), GetIMAdminUserIDs(ctx)...) { + if IsAdmin(ctx) { return nil } return servererrs.ErrNoPermission.WrapMsg("ownerUserID", ownerUserID) } + +func CheckAccessIn(ctx context.Context, ownerUserIDs ...string) error { + opUserID := mcontext.GetOpUserID(ctx) + for _, userID := range ownerUserIDs { + if opUserID == userID { + return nil + } + } + if IsAdmin(ctx) { + return nil + } + return servererrs.ErrNoPermission.WrapMsg("opUser in ownerUserIDs") +} + +var tempAdminValue = []string{"1"} + +const ctxTempAdminKey = "ctxImTempAdminKey" + +func WithTempAdmin(ctx context.Context) context.Context { + keys, _ := ctx.Value(constant.RpcCustomHeader).([]string) + if datautil.Contain(ctxTempAdminKey, keys...) { + return ctx + } + if len(keys) > 0 { + temp := make([]string, 0, len(keys)+1) + temp = append(temp, keys...) + keys = append(temp, ctxTempAdminKey) + } else { + keys = []string{ctxTempAdminKey} + } + ctx = context.WithValue(ctx, constant.RpcCustomHeader, keys) + return context.WithValue(ctx, ctxTempAdminKey, tempAdminValue) +} + +func IsTempAdmin(ctx context.Context) bool { + values, _ := ctx.Value(ctxTempAdminKey).([]string) + return datautil.Equal(tempAdminValue, values) +} + +func IsSystemAdmin(ctx context.Context) bool { + return datautil.Contain(mcontext.GetOpUserID(ctx), GetIMAdminUserIDs(ctx)...) +} diff --git a/pkg/tools/batcher/batcher.go b/pkg/tools/batcher/batcher.go index dcf5d07ad..93a31ed8f 100644 --- a/pkg/tools/batcher/batcher.go +++ b/pkg/tools/batcher/batcher.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/utils/idutil" ) @@ -253,13 +254,14 @@ func (b *Batcher[T]) distributeMessage(messages map[string][]*T, totalCount int, func (b *Batcher[T]) run(channelID int, ch <-chan *Msg[T]) { defer b.wait.Done() + ctx := authverify.WithTempAdmin(context.Background()) for { select { case messages, ok := <-ch: if !ok { return } - b.Do(context.Background(), channelID, messages) + b.Do(ctx, channelID, messages) if b.config.syncWait { b.counter.Done() } From 1ef55cda051694a6cc4d43dee19664b2c74f2020 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Fri, 14 Mar 2025 16:46:29 +0800 Subject: [PATCH 2/4] feat: Implement webhook in createConversation (#3228) * update test method args. * feat: implement createConversations webhook function. * improve webhookCreateConversations Implement * implement createconversation webhook. * remove unused paramaters. (cherry picked from commit b969827b9af9120b28f267b7f600e3b97c928e59) --- config/webhooks.yml | 20 +++- internal/rpc/conversation/conversation.go | 4 +- internal/tools/cron/cron_test.go | 2 +- pkg/callbackstruct/constant.go | 98 ++++++++++--------- pkg/callbackstruct/conversation.go | 91 +++++++++++++++++ pkg/common/cmd/conversation.go | 2 + pkg/common/storage/controller/conversation.go | 19 ++-- 7 files changed, 177 insertions(+), 59 deletions(-) create mode 100644 pkg/callbackstruct/conversation.go diff --git a/config/webhooks.yml b/config/webhooks.yml index 41c60e7e2..283a23ed4 100644 --- a/config/webhooks.yml +++ b/config/webhooks.yml @@ -7,11 +7,11 @@ beforeSendSingleMsg: # If not set, all contentType messages will through this filter. deniedTypes: [] beforeUpdateUserInfoEx: - enable: false + enable: false timeout: 5 failedContinue: true afterUpdateUserInfoEx: - enable: false + enable: false timeout: 5 afterSendSingleMsg: enable: false @@ -181,3 +181,19 @@ afterImportFriends: afterRemoveBlack: enable: false timeout: 5 +beforeCreateSingleChatConversations: + enable: false + timeout: 5 + failedContinue: false +afterCreateSingleChatConversations: + enable: false + timeout: 5 + failedContinue: false +beforeCreateGroupChatConversations: + enable: false + timeout: 5 + failedContinue: false +afterCreateGroupChatConversations: + enable: false + timeout: 5 + failedContinue: false diff --git a/internal/rpc/conversation/conversation.go b/internal/rpc/conversation/conversation.go index b0b1053ed..be10f4201 100644 --- a/internal/rpc/conversation/conversation.go +++ b/internal/rpc/conversation/conversation.go @@ -19,12 +19,9 @@ import ( "sort" "time" - "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/dbbuild" "github.com/openimsdk/open-im-server/v3/pkg/rpcli" - "google.golang.org/grpc" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/convert" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" @@ -43,6 +40,7 @@ import ( "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/utils/datautil" + "google.golang.org/grpc" ) type conversationServer struct { diff --git a/internal/tools/cron/cron_test.go b/internal/tools/cron/cron_test.go index b98b14f14..af827fc38 100644 --- a/internal/tools/cron/cron_test.go +++ b/internal/tools/cron/cron_test.go @@ -46,7 +46,7 @@ func TestName(t *testing.T) { srv := &cronServer{ ctx: ctx, - config: &CronTaskConfig{ + config: &Config{ CronTask: config.CronTask{ RetainChatRecords: 1, FileExpireTime: 1, diff --git a/pkg/callbackstruct/constant.go b/pkg/callbackstruct/constant.go index 73f89a719..bbc34e71f 100644 --- a/pkg/callbackstruct/constant.go +++ b/pkg/callbackstruct/constant.go @@ -15,51 +15,55 @@ package callbackstruct const ( - CallbackBeforeInviteJoinGroupCommand = "callbackBeforeInviteJoinGroupCommand" - CallbackAfterJoinGroupCommand = "callbackAfterJoinGroupCommand" - CallbackAfterSetGroupInfoCommand = "callbackAfterSetGroupInfoCommand" - CallbackAfterSetGroupInfoExCommand = "callbackAfterSetGroupInfoExCommand" - CallbackBeforeSetGroupInfoCommand = "callbackBeforeSetGroupInfoCommand" - CallbackBeforeSetGroupInfoExCommand = "callbackBeforeSetGroupInfoExCommand" - CallbackAfterRevokeMsgCommand = "callbackBeforeAfterMsgCommand" - CallbackBeforeAddBlackCommand = "callbackBeforeAddBlackCommand" - CallbackAfterAddFriendCommand = "callbackAfterAddFriendCommand" - CallbackBeforeAddFriendAgreeCommand = "callbackBeforeAddFriendAgreeCommand" - CallbackAfterAddFriendAgreeCommand = "callbackAfterAddFriendAgreeCommand" - CallbackAfterDeleteFriendCommand = "callbackAfterDeleteFriendCommand" - CallbackBeforeImportFriendsCommand = "callbackBeforeImportFriendsCommand" - CallbackAfterImportFriendsCommand = "callbackAfterImportFriendsCommand" - CallbackAfterRemoveBlackCommand = "callbackAfterRemoveBlackCommand" - CallbackAfterQuitGroupCommand = "callbackAfterQuitGroupCommand" - CallbackAfterKickGroupCommand = "callbackAfterKickGroupCommand" - CallbackAfterDisMissGroupCommand = "callbackAfterDisMissGroupCommand" - CallbackBeforeJoinGroupCommand = "callbackBeforeJoinGroupCommand" - CallbackAfterGroupMsgReadCommand = "callbackAfterGroupMsgReadCommand" - CallbackBeforeMsgModifyCommand = "callbackBeforeMsgModifyCommand" - CallbackAfterUpdateUserInfoCommand = "callbackAfterUpdateUserInfoCommand" - CallbackAfterUpdateUserInfoExCommand = "callbackAfterUpdateUserInfoExCommand" - CallbackBeforeUpdateUserInfoExCommand = "callbackBeforeUpdateUserInfoExCommand" - CallbackBeforeUserRegisterCommand = "callbackBeforeUserRegisterCommand" - CallbackAfterUserRegisterCommand = "callbackAfterUserRegisterCommand" - CallbackAfterTransferGroupOwnerCommand = "callbackAfterTransferGroupOwnerCommand" - CallbackBeforeSetFriendRemarkCommand = "callbackBeforeSetFriendRemarkCommand" - CallbackAfterSetFriendRemarkCommand = "callbackAfterSetFriendRemarkCommand" - CallbackAfterSingleMsgReadCommand = "callbackAfterSingleMsgReadCommand" - CallbackBeforeSendSingleMsgCommand = "callbackBeforeSendSingleMsgCommand" - CallbackAfterSendSingleMsgCommand = "callbackAfterSendSingleMsgCommand" - CallbackBeforeSendGroupMsgCommand = "callbackBeforeSendGroupMsgCommand" - CallbackAfterSendGroupMsgCommand = "callbackAfterSendGroupMsgCommand" - CallbackAfterUserOnlineCommand = "callbackAfterUserOnlineCommand" - CallbackAfterUserOfflineCommand = "callbackAfterUserOfflineCommand" - CallbackAfterUserKickOffCommand = "callbackAfterUserKickOffCommand" - CallbackBeforeOfflinePushCommand = "callbackBeforeOfflinePushCommand" - CallbackBeforeOnlinePushCommand = "callbackBeforeOnlinePushCommand" - CallbackBeforeGroupOnlinePushCommand = "callbackBeforeGroupOnlinePushCommand" - CallbackBeforeAddFriendCommand = "callbackBeforeAddFriendCommand" - CallbackBeforeUpdateUserInfoCommand = "callbackBeforeUpdateUserInfoCommand" - CallbackBeforeCreateGroupCommand = "callbackBeforeCreateGroupCommand" - CallbackAfterCreateGroupCommand = "callbackAfterCreateGroupCommand" - CallbackBeforeMembersJoinGroupCommand = "callbackBeforeMembersJoinGroupCommand" - CallbackBeforeSetGroupMemberInfoCommand = "callbackBeforeSetGroupMemberInfoCommand" - CallbackAfterSetGroupMemberInfoCommand = "callbackAfterSetGroupMemberInfoCommand" + CallbackBeforeInviteJoinGroupCommand = "callbackBeforeInviteJoinGroupCommand" + CallbackAfterJoinGroupCommand = "callbackAfterJoinGroupCommand" + CallbackAfterSetGroupInfoCommand = "callbackAfterSetGroupInfoCommand" + CallbackAfterSetGroupInfoExCommand = "callbackAfterSetGroupInfoExCommand" + CallbackBeforeSetGroupInfoCommand = "callbackBeforeSetGroupInfoCommand" + CallbackBeforeSetGroupInfoExCommand = "callbackBeforeSetGroupInfoExCommand" + CallbackAfterRevokeMsgCommand = "callbackBeforeAfterMsgCommand" + CallbackBeforeAddBlackCommand = "callbackBeforeAddBlackCommand" + CallbackAfterAddFriendCommand = "callbackAfterAddFriendCommand" + CallbackBeforeAddFriendAgreeCommand = "callbackBeforeAddFriendAgreeCommand" + CallbackAfterAddFriendAgreeCommand = "callbackAfterAddFriendAgreeCommand" + CallbackAfterDeleteFriendCommand = "callbackAfterDeleteFriendCommand" + CallbackBeforeImportFriendsCommand = "callbackBeforeImportFriendsCommand" + CallbackAfterImportFriendsCommand = "callbackAfterImportFriendsCommand" + CallbackAfterRemoveBlackCommand = "callbackAfterRemoveBlackCommand" + CallbackAfterQuitGroupCommand = "callbackAfterQuitGroupCommand" + CallbackAfterKickGroupCommand = "callbackAfterKickGroupCommand" + CallbackAfterDisMissGroupCommand = "callbackAfterDisMissGroupCommand" + CallbackBeforeJoinGroupCommand = "callbackBeforeJoinGroupCommand" + CallbackAfterGroupMsgReadCommand = "callbackAfterGroupMsgReadCommand" + CallbackBeforeMsgModifyCommand = "callbackBeforeMsgModifyCommand" + CallbackAfterUpdateUserInfoCommand = "callbackAfterUpdateUserInfoCommand" + CallbackAfterUpdateUserInfoExCommand = "callbackAfterUpdateUserInfoExCommand" + CallbackBeforeUpdateUserInfoExCommand = "callbackBeforeUpdateUserInfoExCommand" + CallbackBeforeUserRegisterCommand = "callbackBeforeUserRegisterCommand" + CallbackAfterUserRegisterCommand = "callbackAfterUserRegisterCommand" + CallbackAfterTransferGroupOwnerCommand = "callbackAfterTransferGroupOwnerCommand" + CallbackBeforeSetFriendRemarkCommand = "callbackBeforeSetFriendRemarkCommand" + CallbackAfterSetFriendRemarkCommand = "callbackAfterSetFriendRemarkCommand" + CallbackAfterSingleMsgReadCommand = "callbackAfterSingleMsgReadCommand" + CallbackBeforeSendSingleMsgCommand = "callbackBeforeSendSingleMsgCommand" + CallbackAfterSendSingleMsgCommand = "callbackAfterSendSingleMsgCommand" + CallbackBeforeSendGroupMsgCommand = "callbackBeforeSendGroupMsgCommand" + CallbackAfterSendGroupMsgCommand = "callbackAfterSendGroupMsgCommand" + CallbackAfterUserOnlineCommand = "callbackAfterUserOnlineCommand" + CallbackAfterUserOfflineCommand = "callbackAfterUserOfflineCommand" + CallbackAfterUserKickOffCommand = "callbackAfterUserKickOffCommand" + CallbackBeforeOfflinePushCommand = "callbackBeforeOfflinePushCommand" + CallbackBeforeOnlinePushCommand = "callbackBeforeOnlinePushCommand" + CallbackBeforeGroupOnlinePushCommand = "callbackBeforeGroupOnlinePushCommand" + CallbackBeforeAddFriendCommand = "callbackBeforeAddFriendCommand" + CallbackBeforeUpdateUserInfoCommand = "callbackBeforeUpdateUserInfoCommand" + CallbackBeforeCreateGroupCommand = "callbackBeforeCreateGroupCommand" + CallbackAfterCreateGroupCommand = "callbackAfterCreateGroupCommand" + CallbackBeforeMembersJoinGroupCommand = "callbackBeforeMembersJoinGroupCommand" + CallbackBeforeSetGroupMemberInfoCommand = "callbackBeforeSetGroupMemberInfoCommand" + CallbackAfterSetGroupMemberInfoCommand = "callbackAfterSetGroupMemberInfoCommand" + CallbackBeforeCreateSingleChatConversationsCommand = "callbackBeforeCreateSingleChatConversationsCommand" + CallbackAfterCreateSingleChatConversationsCommand = "callbackAfterCreateSingleChatConversationsCommand" + CallbackBeforeCreateGroupChatConversationsCommand = "callbackBeforeCreateGroupChatConversationsCommand" + CallbackAfterCreateGroupChatConversationsCommand = "callbackAfterCreateGroupChatConversationsCommand" ) diff --git a/pkg/callbackstruct/conversation.go b/pkg/callbackstruct/conversation.go new file mode 100644 index 000000000..14e78094c --- /dev/null +++ b/pkg/callbackstruct/conversation.go @@ -0,0 +1,91 @@ +package callbackstruct + +type CallbackBeforeCreateSingleChatConversationsReq struct { + CallbackCommand `json:"callbackCommand"` + OwnerUserID string `json:"owner_user_id"` + ConversationID string `json:"conversation_id"` + ConversationType int32 `json:"conversation_type"` + UserID string `json:"user_id"` + RecvMsgOpt int32 `json:"recv_msg_opt"` + IsPinned bool `json:"is_pinned"` + IsPrivateChat bool `json:"is_private_chat"` + BurnDuration int32 `json:"burn_duration"` + GroupAtType int32 `json:"group_at_type"` + AttachedInfo string `json:"attached_info"` + Ex string `json:"ex"` +} + +type CallbackBeforeCreateSingleChatConversationsResp struct { + CommonCallbackResp + RecvMsgOpt *int32 `json:"recv_msg_opt"` + IsPinned *bool `json:"is_pinned"` + IsPrivateChat *bool `json:"is_private_chat"` + BurnDuration *int32 `json:"burn_duration"` + GroupAtType *int32 `json:"group_at_type"` + AttachedInfo *string `json:"attached_info"` + Ex *string `json:"ex"` +} + +type CallbackAfterCreateSingleChatConversationsReq struct { + CallbackCommand `json:"callbackCommand"` + OwnerUserID string `json:"owner_user_id"` + ConversationID string `json:"conversation_id"` + ConversationType int32 `json:"conversation_type"` + UserID string `json:"user_id"` + RecvMsgOpt int32 `json:"recv_msg_opt"` + IsPinned bool `json:"is_pinned"` + IsPrivateChat bool `json:"is_private_chat"` + BurnDuration int32 `json:"burn_duration"` + GroupAtType int32 `json:"group_at_type"` + AttachedInfo string `json:"attached_info"` + Ex string `json:"ex"` +} + +type CallbackAfterCreateSingleChatConversationsResp struct { + CommonCallbackResp +} + +type CallbackBeforeCreateGroupChatConversationsReq struct { + CallbackCommand `json:"callbackCommand"` + OwnerUserID string `json:"owner_user_id"` + ConversationID string `json:"conversation_id"` + ConversationType int32 `json:"conversation_type"` + GroupID string `json:"group_id"` + RecvMsgOpt int32 `json:"recv_msg_opt"` + IsPinned bool `json:"is_pinned"` + IsPrivateChat bool `json:"is_private_chat"` + BurnDuration int32 `json:"burn_duration"` + GroupAtType int32 `json:"group_at_type"` + AttachedInfo string `json:"attached_info"` + Ex string `json:"ex"` +} + +type CallbackBeforeCreateGroupChatConversationsResp struct { + CommonCallbackResp + RecvMsgOpt *int32 `json:"recv_msg_opt"` + IsPinned *bool `json:"is_pinned"` + IsPrivateChat *bool `json:"is_private_chat"` + BurnDuration *int32 `json:"burn_duration"` + GroupAtType *int32 `json:"group_at_type"` + AttachedInfo *string `json:"attached_info"` + Ex *string `json:"ex"` +} + +type CallbackAfterCreateGroupChatConversationsReq struct { + CallbackCommand `json:"callbackCommand"` + OwnerUserID string `json:"owner_user_id"` + ConversationID string `json:"conversation_id"` + ConversationType int32 `json:"conversation_type"` + GroupID string `json:"group_id"` + RecvMsgOpt int32 `json:"recv_msg_opt"` + IsPinned bool `json:"is_pinned"` + IsPrivateChat bool `json:"is_private_chat"` + BurnDuration int32 `json:"burn_duration"` + GroupAtType int32 `json:"group_at_type"` + AttachedInfo string `json:"attached_info"` + Ex string `json:"ex"` +} + +type CallbackAfterCreateGroupChatConversationsResp struct { + CommonCallbackResp +} diff --git a/pkg/common/cmd/conversation.go b/pkg/common/cmd/conversation.go index 2f8769897..428c442da 100644 --- a/pkg/common/cmd/conversation.go +++ b/pkg/common/cmd/conversation.go @@ -41,6 +41,7 @@ func NewConversationRpcCmd() *ConversationRpcCmd { config.MongodbConfigFileName: &conversationConfig.MongodbConfig, config.ShareFileName: &conversationConfig.Share, config.NotificationFileName: &conversationConfig.NotificationConfig, + config.WebhooksConfigFileName: &conversationConfig.WebhooksConfig, config.LocalCacheConfigFileName: &conversationConfig.LocalCacheConfig, config.DiscoveryConfigFilename: &conversationConfig.Discovery, } @@ -67,6 +68,7 @@ func (a *ConversationRpcCmd) runE() error { a.conversationConfig.NotificationConfig.GetConfigFileName(), a.conversationConfig.Share.GetConfigFileName(), a.conversationConfig.LocalCacheConfig.GetConfigFileName(), + a.conversationConfig.WebhooksConfig.GetConfigFileName(), a.conversationConfig.Discovery.GetConfigFileName(), }, nil, conversation.Start) diff --git a/pkg/common/storage/controller/conversation.go b/pkg/common/storage/controller/conversation.go index d4088e0c0..7578394b5 100644 --- a/pkg/common/storage/controller/conversation.go +++ b/pkg/common/storage/controller/conversation.go @@ -22,7 +22,6 @@ import ( relationtb "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" - "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/tools/db/pagination" "github.com/openimsdk/tools/db/tx" @@ -48,7 +47,7 @@ type ConversationDatabase interface { // transactional. SetUsersConversationFieldTx(ctx context.Context, userIDs []string, conversation *relationtb.Conversation, fieldMap map[string]any) error // CreateGroupChatConversation creates a group chat conversation for the specified group ID and user IDs. - CreateGroupChatConversation(ctx context.Context, groupID string, userIDs []string) error + CreateGroupChatConversation(ctx context.Context, groupID string, userIDs []string, conversations *relationtb.Conversation) error // GetConversationIDs retrieves conversation IDs for a given user. GetConversationIDs(ctx context.Context, userID string) ([]string, error) // GetUserConversationIDsHash gets the hash of conversation IDs for a given user. @@ -298,10 +297,10 @@ func (c *conversationDatabase) SetUserConversations(ctx context.Context, ownerUs // return c.cache.GetSuperGroupRecvMsgNotNotifyUserIDs(ctx, groupID) //} -func (c *conversationDatabase) CreateGroupChatConversation(ctx context.Context, groupID string, userIDs []string) error { +func (c *conversationDatabase) CreateGroupChatConversation(ctx context.Context, groupID string, userIDs []string, conversation *relationtb.Conversation) error { return c.tx.Transaction(ctx, func(ctx context.Context) error { cache := c.cache.CloneConversationCache() - conversationID := msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, groupID) + conversationID := conversation.ConversationID existConversationUserIDs, err := c.conversationDB.FindUserID(ctx, userIDs, []string{conversationID}) if err != nil { return err @@ -309,7 +308,15 @@ func (c *conversationDatabase) CreateGroupChatConversation(ctx context.Context, notExistUserIDs := stringutil.DifferenceString(userIDs, existConversationUserIDs) var conversations []*relationtb.Conversation for _, v := range notExistUserIDs { - conversation := relationtb.Conversation{ConversationType: constant.ReadGroupChatType, GroupID: groupID, OwnerUserID: v, ConversationID: conversationID} + conversation := relationtb.Conversation{ + ConversationType: conversation.ConversationType, GroupID: groupID, OwnerUserID: v, ConversationID: conversationID, + // the parameters have default value + RecvMsgOpt: conversation.RecvMsgOpt, IsPinned: conversation.IsPinned, IsPrivateChat: conversation.IsPrivateChat, + BurnDuration: conversation.BurnDuration, GroupAtType: conversation.GroupAtType, AttachedInfo: conversation.AttachedInfo, + Ex: conversation.Ex, MaxSeq: conversation.MaxSeq, MinSeq: conversation.MinSeq, CreateTime: conversation.CreateTime, + MsgDestructTime: conversation.MsgDestructTime, IsMsgDestruct: conversation.IsMsgDestruct, LatestMsgDestructTime: conversation.LatestMsgDestructTime, + } + conversations = append(conversations, &conversation) cache = cache.DelConversations(v, conversationID).DelConversationNotReceiveMessageUserIDs(conversationID) } @@ -320,7 +327,7 @@ func (c *conversationDatabase) CreateGroupChatConversation(ctx context.Context, return err } } - _, err = c.conversationDB.UpdateByMap(ctx, existConversationUserIDs, conversationID, map[string]any{"max_seq": 0}) + _, err = c.conversationDB.UpdateByMap(ctx, existConversationUserIDs, conversationID, map[string]any{"max_seq": conversation.MaxSeq}) if err != nil { return err } From 4c16294f6733e5ffcd567154ba621df8789626d7 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Tue, 20 May 2025 16:08:35 +0800 Subject: [PATCH 3/4] feat: Implement webhook in createConversation (#3228) * update test method args. * feat: implement createConversations webhook function. * improve webhookCreateConversations Implement * implement createconversation webhook. * remove unused paramaters. (cherry picked from commit b969827b9af9120b28f267b7f600e3b97c928e59) --- internal/rpc/conversation/conversation.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/internal/rpc/conversation/conversation.go b/internal/rpc/conversation/conversation.go index be10f4201..b0b1053ed 100644 --- a/internal/rpc/conversation/conversation.go +++ b/internal/rpc/conversation/conversation.go @@ -19,9 +19,12 @@ import ( "sort" "time" + "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/dbbuild" "github.com/openimsdk/open-im-server/v3/pkg/rpcli" + "google.golang.org/grpc" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/convert" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" @@ -40,7 +43,6 @@ import ( "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/utils/datautil" - "google.golang.org/grpc" ) type conversationServer struct { From 388a945cc5c3ad816535ceea4194260de3264ffc Mon Sep 17 00:00:00 2001 From: OpenIM-Gordon <1432970085@qq.com> Date: Fri, 21 Mar 2025 15:10:31 +0800 Subject: [PATCH 4/4] =?UTF-8?q?feat:=20add=20a=20function=20for=20business?= =?UTF-8?q?=20info=20change=20to=20update=20related=20conve=E2=80=A6=20(#3?= =?UTF-8?q?225)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: add a function for business info change to update related conversation's ex info. * feat: add a function for business info change to update related conversation's ex info. * feat: add a function for business info change to update related conversation's ex info. * feat: add a function for business info change to update related conversation's ex info. (cherry picked from commit 11044eac586ee2bec8870cc638cb0688631cfdb5) --- internal/api/conversation.go | 5 ++ internal/api/router.go | 4 +- pkg/common/storage/controller/conversation.go | 15 +++++ pkg/common/storage/database/conversation.go | 1 + .../storage/database/mgo/conversation.go | 57 ++++++++++++++++--- 5 files changed, 73 insertions(+), 9 deletions(-) diff --git a/internal/api/conversation.go b/internal/api/conversation.go index 39d9859f0..5a191c0ec 100644 --- a/internal/api/conversation.go +++ b/internal/api/conversation.go @@ -16,6 +16,7 @@ package api import ( "github.com/gin-gonic/gin" + "github.com/openimsdk/protocol/conversation" "github.com/openimsdk/tools/a2r" ) @@ -71,3 +72,7 @@ func (o *ConversationApi) GetNotNotifyConversationIDs(c *gin.Context) { func (o *ConversationApi) GetPinnedConversationIDs(c *gin.Context) { a2r.Call(c, conversation.ConversationClient.GetPinnedConversationIDs, o.Client) } + +func (o *ConversationApi) UpdateConversationsByUser(c *gin.Context) { + a2r.Call(c, conversation.ConversationClient.UpdateConversationsByUser, o.Client) +} diff --git a/internal/api/router.go b/internal/api/router.go index 700d8392e..dc004a29e 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -9,6 +9,8 @@ import ( "github.com/gin-gonic/gin" "github.com/gin-gonic/gin/binding" "github.com/go-playground/validator/v10" + clientv3 "go.etcd.io/etcd/client/v3" + "github.com/openimsdk/open-im-server/v3/internal/api/jssdk" "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/common/config" @@ -28,7 +30,6 @@ import ( "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mw" - clientv3 "go.etcd.io/etcd/client/v3" ) const ( @@ -268,6 +269,7 @@ func newGinRouter(ctx context.Context, client discovery.Conn, cfg *Config) (*gin conversationGroup.POST("/get_owner_conversation", c.GetOwnerConversation) conversationGroup.POST("/get_not_notify_conversation_ids", c.GetNotNotifyConversationIDs) conversationGroup.POST("/get_pinned_conversation_ids", c.GetPinnedConversationIDs) + conversationGroup.POST("/update_conversations_by_user", c.UpdateConversationsByUser) } { diff --git a/pkg/common/storage/controller/conversation.go b/pkg/common/storage/controller/conversation.go index 7578394b5..27442ca66 100644 --- a/pkg/common/storage/controller/conversation.go +++ b/pkg/common/storage/controller/conversation.go @@ -46,6 +46,9 @@ type ConversationDatabase interface { // SetUsersConversationFieldTx updates a specific field for multiple users' conversations, creating new conversations if they do not exist, or updates them otherwise. This operation is // transactional. SetUsersConversationFieldTx(ctx context.Context, userIDs []string, conversation *relationtb.Conversation, fieldMap map[string]any) error + // UpdateUserConversations updates all conversations related to a specified user. + // This function does NOT update the user's own conversations but rather the conversations where this user is involved (e.g., other users' conversations referencing this user). + UpdateUserConversations(ctx context.Context, userID string, args map[string]any) error // CreateGroupChatConversation creates a group chat conversation for the specified group ID and user IDs. CreateGroupChatConversation(ctx context.Context, groupID string, userIDs []string, conversations *relationtb.Conversation) error // GetConversationIDs retrieves conversation IDs for a given user. @@ -145,6 +148,18 @@ func (c *conversationDatabase) SetUsersConversationFieldTx(ctx context.Context, }) } +func (c *conversationDatabase) UpdateUserConversations(ctx context.Context, userID string, args map[string]any) error { + conversations, err := c.conversationDB.UpdateUserConversations(ctx, userID, args) + if err != nil { + return err + } + cache := c.cache.CloneConversationCache() + for _, conversation := range conversations { + cache = cache.DelUsersConversation(conversation.ConversationID, conversation.OwnerUserID).DelConversationVersionUserIDs(conversation.OwnerUserID) + } + return cache.ChainExecDel(ctx) +} + func (c *conversationDatabase) UpdateUsersConversationField(ctx context.Context, userIDs []string, conversationID string, args map[string]any) error { _, err := c.conversationDB.UpdateByMap(ctx, userIDs, conversationID, args) if err != nil { diff --git a/pkg/common/storage/database/conversation.go b/pkg/common/storage/database/conversation.go index 1fb53cfed..d612dfc2d 100644 --- a/pkg/common/storage/database/conversation.go +++ b/pkg/common/storage/database/conversation.go @@ -24,6 +24,7 @@ import ( type Conversation interface { Create(ctx context.Context, conversations []*model.Conversation) (err error) UpdateByMap(ctx context.Context, userIDs []string, conversationID string, args map[string]any) (rows int64, err error) + UpdateUserConversations(ctx context.Context, userID string, args map[string]any) ([]*model.Conversation, error) Update(ctx context.Context, conversation *model.Conversation) (err error) Find(ctx context.Context, ownerUserID string, conversationIDs []string) (conversations []*model.Conversation, err error) FindUserID(ctx context.Context, userIDs []string, conversationIDs []string) ([]string, error) diff --git a/pkg/common/storage/database/mgo/conversation.go b/pkg/common/storage/database/mgo/conversation.go index 536827450..89f13ea3d 100644 --- a/pkg/common/storage/database/mgo/conversation.go +++ b/pkg/common/storage/database/mgo/conversation.go @@ -21,23 +21,32 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "github.com/openimsdk/protocol/constant" "github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/db/pagination" "github.com/openimsdk/tools/errs" - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" ) func NewConversationMongo(db *mongo.Database) (*ConversationMgo, error) { coll := db.Collection(database.ConversationName) - _, err := coll.Indexes().CreateOne(context.Background(), mongo.IndexModel{ - Keys: bson.D{ - {Key: "owner_user_id", Value: 1}, - {Key: "conversation_id", Value: 1}, + _, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{ + { + Keys: bson.D{ + {Key: "owner_user_id", Value: 1}, + {Key: "conversation_id", Value: 1}, + }, + Options: options.Index().SetUnique(true), + }, + { + Keys: bson.D{ + {Key: "user_id", Value: 1}, + }, + Options: options.Index(), }, - Options: options.Index().SetUnique(true), }) if err != nil { return nil, errs.Wrap(err) @@ -101,6 +110,38 @@ func (c *ConversationMgo) UpdateByMap(ctx context.Context, userIDs []string, con return rows, nil } +func (c *ConversationMgo) UpdateUserConversations(ctx context.Context, userID string, args map[string]any) ([]*model.Conversation, error) { + if len(args) == 0 { + return nil, nil + } + filter := bson.M{ + "user_id": userID, + } + + conversations, err := mongoutil.Find[*model.Conversation](ctx, c.coll, filter, options.Find().SetProjection(bson.M{"_id": 0, "owner_user_id": 1, "conversation_id": 1})) + if err != nil { + return nil, err + } + err = mongoutil.IncrVersion(func() error { + _, err := mongoutil.UpdateMany(ctx, c.coll, filter, bson.M{"$set": args}) + if err != nil { + return err + } + return nil + }, func() error { + for _, conversation := range conversations { + if err := c.version.IncrVersion(ctx, conversation.OwnerUserID, []string{conversation.ConversationID}, model.VersionStateUpdate); err != nil { + return err + } + } + return nil + }) + if err != nil { + return nil, err + } + return conversations, nil +} + func (c *ConversationMgo) Update(ctx context.Context, conversation *model.Conversation) (err error) { return mongoutil.IncrVersion(func() error { return mongoutil.UpdateOne(ctx, c.coll, bson.M{"owner_user_id": conversation.OwnerUserID, "conversation_id": conversation.ConversationID}, bson.M{"$set": conversation}, true)