From 16231ee077dc32ac9912c11c2b22d6a4565a8abe Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 26 Apr 2023 14:10:12 +0800 Subject: [PATCH] conversation --- config/notification.yaml | 8 +- internal/rpc/conversation/conversaion.go | 22 +-- pkg/common/config/config.go | 14 +- pkg/common/constant/constant.go | 3 +- pkg/rpcclient/meta.go | 71 ++++++++-- pkg/rpcclient/msg.go | 35 ++--- pkg/rpcclient/notification/conevrsation.go | 24 +--- pkg/utils/options.go | 149 +++++++++++++++++++++ 8 files changed, 261 insertions(+), 65 deletions(-) diff --git a/config/notification.yaml b/config/notification.yaml index 1e1f9e467..8bd20b2d2 100644 --- a/config/notification.yaml +++ b/config/notification.yaml @@ -244,14 +244,14 @@ userInfoUpdated: ext: "Remove a blocked user" #####################conversation######################### -conversationOptUpdate: +conversationChanged: isSendMsg: true unreadCount: false offlinePush: enable: true - title: "conversation opt update" - desc: "conversation opt update" - ext: "conversation opt update" + title: "conversation changed" + desc: "conversation changed" + ext: "conversation changed" conversationSetPrivate: isSendMsg: true diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index 0b3bf222a..3f3c00a3e 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -21,7 +21,7 @@ import ( type conversationServer struct { group *rpcclient.GroupClient - ConversationDatabase controller.ConversationDatabase + conversationDatabase controller.ConversationDatabase conversationNotificationSender *notification.ConversationNotificationSender } @@ -41,14 +41,14 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e pbConversation.RegisterConversationServer(server, &conversationServer{ conversationNotificationSender: notification.NewConversationNotificationSender(client), group: rpcclient.NewGroupClient(client), - ConversationDatabase: controller.NewConversationDatabase(conversationDB, cache.NewConversationRedis(rdb, cache.GetDefaultOpt(), conversationDB), tx.NewGorm(db)), + conversationDatabase: controller.NewConversationDatabase(conversationDB, cache.NewConversationRedis(rdb, cache.GetDefaultOpt(), conversationDB), tx.NewGorm(db)), }) return nil } func (c *conversationServer) GetConversation(ctx context.Context, req *pbConversation.GetConversationReq) (*pbConversation.GetConversationResp, error) { resp := &pbConversation.GetConversationResp{Conversation: &pbConversation.Conversation{}} - conversations, err := c.ConversationDatabase.FindConversations(ctx, req.OwnerUserID, []string{req.ConversationID}) + conversations, err := c.conversationDatabase.FindConversations(ctx, req.OwnerUserID, []string{req.ConversationID}) if err != nil { return nil, err } @@ -61,7 +61,7 @@ func (c *conversationServer) GetConversation(ctx context.Context, req *pbConvers func (c *conversationServer) GetAllConversations(ctx context.Context, req *pbConversation.GetAllConversationsReq) (*pbConversation.GetAllConversationsResp, error) { resp := &pbConversation.GetAllConversationsResp{Conversations: []*pbConversation.Conversation{}} - conversations, err := c.ConversationDatabase.GetUserAllConversation(ctx, req.OwnerUserID) + conversations, err := c.conversationDatabase.GetUserAllConversation(ctx, req.OwnerUserID) if err != nil { return nil, err } @@ -70,7 +70,7 @@ func (c *conversationServer) GetAllConversations(ctx context.Context, req *pbCon } func (c *conversationServer) GetConversations(ctx context.Context, req *pbConversation.GetConversationsReq) (*pbConversation.GetConversationsResp, error) { - conversations, err := c.ConversationDatabase.FindConversations(ctx, req.OwnerUserID, req.ConversationIDs) + conversations, err := c.conversationDatabase.FindConversations(ctx, req.OwnerUserID, req.ConversationIDs) if err != nil { return nil, err } @@ -81,7 +81,7 @@ func (c *conversationServer) GetConversations(ctx context.Context, req *pbConver func (c *conversationServer) BatchSetConversations(ctx context.Context, req *pbConversation.BatchSetConversationsReq) (*pbConversation.BatchSetConversationsResp, error) { conversations := convert.ConversationsPb2DB(req.Conversations) - err := c.ConversationDatabase.SetUserConversations(ctx, req.OwnerUserID, conversations) + err := c.conversationDatabase.SetUserConversations(ctx, req.OwnerUserID, conversations) if err != nil { return nil, err } @@ -94,7 +94,7 @@ func (c *conversationServer) SetConversation(ctx context.Context, req *pbConvers if err := utils.CopyStructFields(&conversation, req.Conversation); err != nil { return nil, err } - err := c.ConversationDatabase.SetUserConversations(ctx, req.Conversation.OwnerUserID, []*tableRelation.ConversationModel{&conversation}) + err := c.conversationDatabase.SetUserConversations(ctx, req.Conversation.OwnerUserID, []*tableRelation.ConversationModel{&conversation}) if err != nil { return nil, err } @@ -104,7 +104,7 @@ func (c *conversationServer) SetConversation(ctx context.Context, req *pbConvers } func (c *conversationServer) SetRecvMsgOpt(ctx context.Context, req *pbConversation.SetRecvMsgOptReq) (*pbConversation.SetRecvMsgOptResp, error) { - if err := c.ConversationDatabase.SetUsersConversationFiledTx(ctx, []string{req.OwnerUserID}, &tableRelation.ConversationModel{OwnerUserID: req.OwnerUserID, ConversationID: req.ConversationID, RecvMsgOpt: req.RecvMsgOpt}, map[string]interface{}{"recv_msg_opt": req.RecvMsgOpt}); err != nil { + if err := c.conversationDatabase.SetUsersConversationFiledTx(ctx, []string{req.OwnerUserID}, &tableRelation.ConversationModel{OwnerUserID: req.OwnerUserID, ConversationID: req.ConversationID, RecvMsgOpt: req.RecvMsgOpt}, map[string]interface{}{"recv_msg_opt": req.RecvMsgOpt}); err != nil { return nil, err } c.conversationNotificationSender.ConversationChangeNotification(ctx, req.OwnerUserID) @@ -129,7 +129,7 @@ func (c *conversationServer) ModifyConversationField(ctx context.Context, req *p return nil, err } if req.FieldType == constant.FieldIsPrivateChat { - err := c.ConversationDatabase.SyncPeerUserPrivateConversationTx(ctx, &conversation) + err := c.conversationDatabase.SyncPeerUserPrivateConversationTx(ctx, &conversation) if err != nil { return nil, err } @@ -156,7 +156,7 @@ func (c *conversationServer) ModifyConversationField(ctx context.Context, req *p case constant.FieldBurnDuration: filedMap["burn_duration"] = req.Conversation.BurnDuration } - err = c.ConversationDatabase.SetUsersConversationFiledTx(ctx, req.UserIDList, &conversation, filedMap) + err = c.conversationDatabase.SetUsersConversationFiledTx(ctx, req.UserIDList, &conversation, filedMap) if err != nil { return nil, err } @@ -175,7 +175,7 @@ func (c *conversationServer) ModifyConversationField(ctx context.Context, req *p // 获取超级大群开启免打扰的用户ID func (c *conversationServer) GetRecvMsgNotNotifyUserIDs(ctx context.Context, req *pbConversation.GetRecvMsgNotNotifyUserIDsReq) (*pbConversation.GetRecvMsgNotNotifyUserIDsResp, error) { - userIDs, err := c.ConversationDatabase.FindRecvMsgNotNotifyUserIDs(ctx, req.GroupID) + userIDs, err := c.conversationDatabase.FindRecvMsgNotNotifyUserIDs(ctx, req.GroupID) if err != nil { return nil, err } diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 5351c80d4..1ac97cc51 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -8,6 +8,7 @@ import ( "runtime" "github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry" + "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" _ "embed" @@ -348,10 +349,21 @@ type Notification struct { BlackDeleted NotificationConf `yaml:"blackDeleted"` FriendInfoUpdated NotificationConf `yaml:"friendInfoUpdated"` //////////////////////conversation/////////////////////// - ConversationOptUpdate NotificationConf `yaml:"conversationOptUpdate"` + ConversationChanged NotificationConf `yaml:"conversationChanged"` ConversationSetPrivate NotificationConf `yaml:"conversationSetPrivate"` } +func GetOptionsByNotification(cfg NotificationConf) utils.Options { + opts := utils.NewOptions() + if cfg.UnreadCount { + opts = utils.WithOptions(opts, utils.WithUnreadCount()) + } + if cfg.OfflinePush.Enable { + opts = utils.WithOptions(opts, utils.WithOfflinePush()) + } + return opts +} + func (c *config) unmarshalConfig(config interface{}, configPath string) error { bytes, err := ioutil.ReadFile(configPath) if err != nil { diff --git a/pkg/common/constant/constant.go b/pkg/common/constant/constant.go index 3b8d62560..d6efe31b3 100644 --- a/pkg/common/constant/constant.go +++ b/pkg/common/constant/constant.go @@ -44,7 +44,7 @@ const ( BlackDeletedNotification = 1208 //remove_black FriendInfoUpdatedNotification = 1209 - ConversationOptChangeNotification = 1300 // change conversation opt + ConversationChangeNotification = 1300 // change conversation opt UserNotificationBegin = 1301 UserInfoUpdatedNotification = 1303 //SetSelfInfoTip = 204 @@ -142,6 +142,7 @@ const ( IsSenderConversationUpdate = "senderConversationUpdate" IsSenderNotificationPush = "senderNotificationPush" IsReactionFromCache = "reactionFromCache" + IsNotification = "isNotification" //GroupStatus GroupOk = 0 diff --git a/pkg/rpcclient/meta.go b/pkg/rpcclient/meta.go index 55f2f4ccf..f6c81f76a 100644 --- a/pkg/rpcclient/meta.go +++ b/pkg/rpcclient/meta.go @@ -1,7 +1,13 @@ package rpcclient import ( + "context" + "fmt" + + relationTb "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/table/relation" "github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry" + "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" + sdkws "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" "google.golang.org/grpc" ) @@ -9,30 +15,69 @@ type MetaClient struct { // contains filtered or unexported fields client discoveryregistry.SvcDiscoveryRegistry rpcRegisterName string + getUsersInfo func(ctx context.Context, userIDs []string) ([]CommonUser, error) } -func NewMetaClient(client discoveryregistry.SvcDiscoveryRegistry, rpcRegisterName string) *MetaClient { - return &MetaClient{ +func NewMetaClient(client discoveryregistry.SvcDiscoveryRegistry, rpcRegisterName string, opts ...MetaClientOptions) *MetaClient { + c := &MetaClient{ client: client, rpcRegisterName: rpcRegisterName, } + for _, opt := range opts { + opt(c) + } + return c +} + +type MetaClientOptions func(*MetaClient) + +func WithDBFunc(fn func(ctx context.Context, userIDs []string) (users []*relationTb.UserModel, err error)) MetaClientOptions { + return func(s *MetaClient) { + f := func(ctx context.Context, userIDs []string) (result []CommonUser, err error) { + users, err := fn(ctx, userIDs) + if err != nil { + return nil, err + } + for _, user := range users { + result = append(result, user) + } + return result, nil + } + s.getUsersInfo = f + } +} + +func WithRpcFunc(fn func(ctx context.Context, userIDs []string) ([]*sdkws.UserInfo, error)) MetaClientOptions { + return func(s *MetaClient) { + f := func(ctx context.Context, userIDs []string) (result []CommonUser, err error) { + users, err := fn(ctx, userIDs) + if err != nil { + return nil, err + } + for _, user := range users { + result = append(result, user) + } + return result, err + } + s.getUsersInfo = f + } +} + +func (m *MetaClient) getFaceURLAndName(userID string) (faceURL, nickname string, err error) { + users, err := m.getUsersInfo(context.Background(), []string{userID}) + if err != nil { + return "", "", err + } + if len(users) == 0 { + return "", "", errs.ErrRecordNotFound.Wrap(fmt.Sprintf("notification user %s not found", userID)) + } + return users[0].GetFaceURL(), users[0].GetNickname(), nil } func (m *MetaClient) getConn() (*grpc.ClientConn, error) { return m.client.GetConn(m.rpcRegisterName) } -type NotificationMsg struct { - SendID string - RecvID string - Content []byte - MsgFrom int32 - ContentType int32 - SessionType int32 - SenderNickname string - SenderFaceURL string -} - type CommonUser interface { GetNickname() string GetFaceURL() string diff --git a/pkg/rpcclient/msg.go b/pkg/rpcclient/msg.go index c2f3cd251..f406c701f 100644 --- a/pkg/rpcclient/msg.go +++ b/pkg/rpcclient/msg.go @@ -2,6 +2,7 @@ package rpcclient import ( "context" + "encoding/json" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" @@ -9,6 +10,7 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" + "github.com/golang/protobuf/proto" ) type MsgClient struct { @@ -46,26 +48,29 @@ func (m *MsgClient) PullMessageBySeqList(ctx context.Context, req *sdkws.PullMes return resp, err } -func (c *MsgClient) Notification(ctx context.Context, notificationMsg *NotificationMsg) error { - var err error +func (c *MsgClient) Notification(ctx context.Context, sendID, recvID string, contentType, sessionType int32, m proto.Message, cfg config.NotificationConf, opts ...utils.OptionsOpt) error { + content, err := json.Marshal(m) + if err != nil { + return err + } var req msg.SendMsgReq var msg sdkws.MsgData var offlineInfo sdkws.OfflinePushInfo var title, desc, ex string - var pushEnable, unReadCount bool - msg.SendID = notificationMsg.SendID - msg.RecvID = notificationMsg.RecvID - msg.Content = notificationMsg.Content - msg.MsgFrom = notificationMsg.MsgFrom - msg.ContentType = notificationMsg.ContentType - msg.SessionType = notificationMsg.SessionType + msg.SendID = sendID + msg.RecvID = recvID + msg.Content = content + msg.MsgFrom = constant.SysMsgType + msg.ContentType = contentType + msg.SessionType = sessionType msg.CreateTime = utils.GetCurrentTimestampByMill() - msg.ClientMsgID = utils.GetMsgID(notificationMsg.SendID) - msg.Options = make(map[string]bool, 7) - msg.SenderNickname = notificationMsg.SenderNickname - msg.SenderFaceURL = notificationMsg.SenderFaceURL - utils.SetSwitchFromOptions(msg.Options, constant.IsUnreadCount, unReadCount) - utils.SetSwitchFromOptions(msg.Options, constant.IsOfflinePush, pushEnable) + msg.ClientMsgID = utils.GetMsgID(sendID) + // msg.Options = make(map[string]bool, 7) + // todo notification get sender name and face url + // msg.SenderNickname, msg.SenderFaceURL, err = c.getFaceURLAndName(sendID) + options := config.GetOptionsByNotification(cfg) + options = utils.WithOptions(options, opts...) + msg.Options = options offlineInfo.Title = title offlineInfo.Desc = desc offlineInfo.Ex = ex diff --git a/pkg/rpcclient/notification/conevrsation.go b/pkg/rpcclient/notification/conevrsation.go index 646fedd0d..4e8f739ba 100644 --- a/pkg/rpcclient/notification/conevrsation.go +++ b/pkg/rpcclient/notification/conevrsation.go @@ -2,13 +2,12 @@ package notification import ( "context" - "encoding/json" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" "github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" "github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient" - "github.com/golang/protobuf/proto" ) type ConversationNotificationSender struct { @@ -19,21 +18,6 @@ func NewConversationNotificationSender(client discoveryregistry.SvcDiscoveryRegi return &ConversationNotificationSender{rpcclient.NewMsgClient(client)} } -func (c *ConversationNotificationSender) SetConversationNotification(ctx context.Context, sendID, recvID string, contentType int, m proto.Message) { - var err error - var n rpcclient.NotificationMsg - n.SendID = sendID - n.RecvID = recvID - n.ContentType = int32(contentType) - n.SessionType = constant.SingleChatType - n.MsgFrom = constant.SysMsgType - n.Content, err = json.Marshal(m) - if err != nil { - return - } - c.Notification(ctx, &n) -} - // SetPrivate调用 func (c *ConversationNotificationSender) ConversationSetPrivateNotification(ctx context.Context, sendID, recvID string, isPrivateChat bool) { tips := &sdkws.ConversationSetPrivateTips{ @@ -41,7 +25,7 @@ func (c *ConversationNotificationSender) ConversationSetPrivateNotification(ctx SendID: sendID, IsPrivate: isPrivateChat, } - c.SetConversationNotification(ctx, sendID, recvID, constant.ConversationPrivateChatNotification, tips) + c.Notification(ctx, sendID, recvID, constant.ConversationPrivateChatNotification, constant.SingleChatType, tips, config.Config.Notification.ConversationSetPrivate) } // 会话改变 @@ -49,7 +33,7 @@ func (c *ConversationNotificationSender) ConversationChangeNotification(ctx cont tips := &sdkws.ConversationUpdateTips{ UserID: userID, } - c.SetConversationNotification(ctx, userID, userID, constant.ConversationOptChangeNotification, tips) + c.Notification(ctx, userID, userID, constant.ConversationChangeNotification, constant.SingleChatType, tips, config.Config.Notification.ConversationChanged) } // 会话未读数同步 @@ -59,5 +43,5 @@ func (c *ConversationNotificationSender) ConversationUnreadChangeNotification(ct ConversationIDList: []string{conversationID}, UpdateUnreadCountTime: updateUnreadCountTime, } - c.SetConversationNotification(ctx, userID, userID, constant.ConversationUnreadNotification, tips) + c.Notification(ctx, userID, userID, constant.ConversationUnreadNotification, constant.SingleChatType, tips, config.Config.Notification.ConversationChanged) } diff --git a/pkg/utils/options.go b/pkg/utils/options.go index d4b585bf7..5a551e4f4 100644 --- a/pkg/utils/options.go +++ b/pkg/utils/options.go @@ -1 +1,150 @@ package utils + +import "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" + +type Options map[string]bool +type OptionsOpt func(Options) + +func NewOptions(opts ...OptionsOpt) Options { + options := make(map[string]bool, 11) + options[constant.IsNotification] = false + options[constant.IsHistory] = false + options[constant.IsPersistent] = false + options[constant.IsOfflinePush] = false + options[constant.IsUnreadCount] = false + options[constant.IsConversationUpdate] = false + options[constant.IsSenderSync] = false + options[constant.IsNotPrivate] = false + options[constant.IsSenderConversationUpdate] = false + options[constant.IsSenderNotificationPush] = false + options[constant.IsReactionFromCache] = false + for _, opt := range opts { + opt(options) + } + return options +} + +func WithOptions(options Options, opts ...OptionsOpt) Options { + for _, opt := range opts { + opt(options) + } + return options +} + +func WithNotification() OptionsOpt { + return func(options Options) { + options[constant.IsNotification] = true + } +} + +func WithHistory() OptionsOpt { + return func(options Options) { + options[constant.IsHistory] = true + } +} + +func WithPersistent() OptionsOpt { + return func(options Options) { + options[constant.IsPersistent] = true + } +} + +func WithOfflinePush() OptionsOpt { + return func(options Options) { + options[constant.IsOfflinePush] = true + } +} + +func WithUnreadCount() OptionsOpt { + return func(options Options) { + options[constant.IsUnreadCount] = true + } +} + +func WithConversationUpdate() OptionsOpt { + return func(options Options) { + options[constant.IsConversationUpdate] = true + } +} + +func WithSenderSync() OptionsOpt { + return func(options Options) { + options[constant.IsSenderSync] = true + } +} + +func WithNotPrivate() OptionsOpt { + return func(options Options) { + options[constant.IsNotPrivate] = true + } +} + +func WithSenderConversationUpdate() OptionsOpt { + return func(options Options) { + options[constant.IsSenderConversationUpdate] = true + } +} + +func WithSenderNotificationPush() OptionsOpt { + return func(options Options) { + options[constant.IsSenderNotificationPush] = true + } +} + +func WithReactionFromCache() OptionsOpt { + return func(options Options) { + options[constant.IsReactionFromCache] = true + } +} + +func (o Options) Is(notification string) bool { + v, ok := o[notification] + if !ok || v { + return true + } + return false +} + +func (o Options) IsNotification() bool { + return o.Is(constant.IsNotification) +} + +func (o Options) IsHistory(options Options) bool { + return o.Is(constant.IsHistory) +} + +func (o Options) IsPersistent(options Options) bool { + return o.Is(constant.IsPersistent) +} + +func (o Options) IsOfflinePush(options Options) bool { + return o.Is(constant.IsOfflinePush) +} + +func (o Options) IsUnreadCount(options Options) bool { + return o.Is(constant.IsUnreadCount) +} + +func (o Options) IsConversationUpdate(options Options) bool { + return o.Is(constant.IsConversationUpdate) +} + +func (o Options) IsSenderSync(options Options) bool { + return o.Is(constant.IsSenderSync) +} + +func (o Options) IsNotPrivate(options Options) bool { + return o.Is(constant.IsNotPrivate) +} + +func (o Options) IsSenderConversationUpdate(options Options) bool { + return o.Is(constant.IsSenderConversationUpdate) +} + +func (o Options) IsSenderNotificationPush(options Options) bool { + return o.Is(constant.IsSenderNotificationPush) +} + +func (o Options) IsReactionFromCache(options Options) bool { + return o.Is(constant.IsReactionFromCache) +}