diff --git a/internal/rpc/msg/callback.go b/internal/rpc/msg/callback.go index 4d6d70e03..5bf3b4035 100644 --- a/internal/rpc/msg/callback.go +++ b/internal/rpc/msg/callback.go @@ -38,7 +38,7 @@ func toCommonCallback(ctx context.Context, msg *pbChat.SendMsgReq, command strin } } -func CallbackBeforeSendSingleMsg(ctx context.Context, msg *pbChat.SendMsgReq) error { +func callbackBeforeSendSingleMsg(ctx context.Context, msg *pbChat.SendMsgReq) error { if !config.Config.Callback.CallbackBeforeSendSingleMsg.Enable { return nil } @@ -50,7 +50,7 @@ func CallbackBeforeSendSingleMsg(ctx context.Context, msg *pbChat.SendMsgReq) er return http.CallBackPostReturn(cbURL(), req, resp, config.Config.Callback.CallbackBeforeSendSingleMsg) } -func CallbackAfterSendSingleMsg(ctx context.Context, msg *pbChat.SendMsgReq) error { +func callbackAfterSendSingleMsg(ctx context.Context, msg *pbChat.SendMsgReq) error { if !config.Config.Callback.CallbackAfterSendSingleMsg.Enable { return nil } @@ -62,7 +62,7 @@ func CallbackAfterSendSingleMsg(ctx context.Context, msg *pbChat.SendMsgReq) err return http.CallBackPostReturn(cbURL(), req, resp, config.Config.Callback.CallbackAfterSendSingleMsg) } -func CallbackBeforeSendGroupMsg(ctx context.Context, msg *pbChat.SendMsgReq) error { +func callbackBeforeSendGroupMsg(ctx context.Context, msg *pbChat.SendMsgReq) error { if !config.Config.Callback.CallbackAfterSendSingleMsg.Enable { return nil } @@ -74,7 +74,7 @@ func CallbackBeforeSendGroupMsg(ctx context.Context, msg *pbChat.SendMsgReq) err return http.CallBackPostReturn(cbURL(), req, resp, config.Config.Callback.CallbackAfterSendSingleMsg) } -func CallbackAfterSendGroupMsg(ctx context.Context, msg *pbChat.SendMsgReq) error { +func callbackAfterSendGroupMsg(ctx context.Context, msg *pbChat.SendMsgReq) error { if !config.Config.Callback.CallbackAfterSendGroupMsg.Enable { return nil } @@ -86,7 +86,7 @@ func CallbackAfterSendGroupMsg(ctx context.Context, msg *pbChat.SendMsgReq) erro return http.CallBackPostReturn(cbURL(), req, resp, config.Config.Callback.CallbackAfterSendGroupMsg) } -func CallbackMsgModify(ctx context.Context, msg *pbChat.SendMsgReq) error { +func callbackMsgModify(ctx context.Context, msg *pbChat.SendMsgReq) error { if !config.Config.Callback.CallbackMsgModify.Enable || msg.MsgData.ContentType != constant.Text { return nil } diff --git a/internal/rpc/msg/extend_msg_callback.go b/internal/rpc/msg/extend_msg_callback.go index 9f88e4999..ca17c69e1 100644 --- a/internal/rpc/msg/extend_msg_callback.go +++ b/internal/rpc/msg/extend_msg_callback.go @@ -11,7 +11,7 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg" ) -func CallbackSetMessageReactionExtensions(ctx context.Context, setReq *msg.SetMessageReactionExtensionsReq) error { +func callbackSetMessageReactionExtensions(ctx context.Context, setReq *msg.SetMessageReactionExtensionsReq) error { if !config.Config.Callback.CallbackAfterSendGroupMsg.Enable { return nil } @@ -35,7 +35,7 @@ func CallbackSetMessageReactionExtensions(ctx context.Context, setReq *msg.SetMe return nil } -func CallbackDeleteMessageReactionExtensions(setReq *msg.DeleteMessagesReactionExtensionsReq) error { +func callbackDeleteMessageReactionExtensions(setReq *msg.DeleteMessagesReactionExtensionsReq) error { if !config.Config.Callback.CallbackAfterSendGroupMsg.Enable { return nil } @@ -54,7 +54,7 @@ func CallbackDeleteMessageReactionExtensions(setReq *msg.DeleteMessagesReactionE return http.CallBackPostReturn(cbURL(), req, resp, config.Config.Callback.CallbackAfterSendGroupMsg) } -func CallbackGetMessageListReactionExtensions(ctx context.Context, getReq *msg.GetMessagesReactionExtensionsReq) error { +func callbackGetMessageListReactionExtensions(ctx context.Context, getReq *msg.GetMessagesReactionExtensionsReq) error { if !config.Config.Callback.CallbackAfterSendGroupMsg.Enable { return nil } @@ -70,7 +70,7 @@ func CallbackGetMessageListReactionExtensions(ctx context.Context, getReq *msg.G return http.CallBackPostReturn(cbURL(), req, resp, config.Config.Callback.CallbackAfterSendGroupMsg) } -func CallbackAddMessageReactionExtensions(ctx context.Context, setReq *msg.ModifyMessageReactionExtensionsReq) error { +func callbackAddMessageReactionExtensions(ctx context.Context, setReq *msg.ModifyMessageReactionExtensionsReq) error { req := &cbapi.CallbackAddMessageReactionExtReq{ OperationID: mcontext.GetOperationID(ctx), CallbackCommand: constant.CallbackAddMessageListReactionExtensionsCommand, diff --git a/internal/rpc/msg/message_interceptor.go b/internal/rpc/msg/message_interceptor.go index e07db2ec8..9d4314dff 100644 --- a/internal/rpc/msg/message_interceptor.go +++ b/internal/rpc/msg/message_interceptor.go @@ -2,6 +2,7 @@ package msg import ( "context" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/config" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" @@ -30,7 +31,7 @@ func MessageHasReadEnabled(_ context.Context, req *msg.SendMsgReq) (*sdkws.MsgDa return req.MsgData, nil } func MessageModifyCallback(ctx context.Context, req *msg.SendMsgReq) (*sdkws.MsgData, error) { - if err := CallbackMsgModify(ctx, req); err != nil && err != errs.ErrCallbackContinue { + if err := callbackMsgModify(ctx, req); err != nil && err != errs.ErrCallbackContinue { log.ZWarn(ctx, "CallbackMsgModify failed", err, "req", req.String()) return nil, err } @@ -39,18 +40,13 @@ func MessageModifyCallback(ctx context.Context, req *msg.SendMsgReq) (*sdkws.Msg func MessageBeforeSendCallback(ctx context.Context, req *msg.SendMsgReq) (*sdkws.MsgData, error) { switch req.MsgData.SessionType { case constant.SingleChatType: - if err := CallbackBeforeSendSingleMsg(ctx, req); err != nil && err != errs.ErrCallbackContinue { + if err := callbackBeforeSendSingleMsg(ctx, req); err != nil && err != errs.ErrCallbackContinue { log.ZWarn(ctx, "CallbackBeforeSendSingleMsg failed", err, "req", req.String()) return nil, err } - case constant.GroupChatType: - if err := CallbackBeforeSendGroupMsg(ctx, req); err != nil && err != errs.ErrCallbackContinue { - log.ZWarn(ctx, "CallbackBeforeSendGroupMsg failed", err, "req", req.String()) - return nil, err - } case constant.NotificationChatType: case constant.SuperGroupChatType: - if err := CallbackBeforeSendGroupMsg(ctx, req); err != nil && err != errs.ErrCallbackContinue { + if err := callbackBeforeSendGroupMsg(ctx, req); err != nil && err != errs.ErrCallbackContinue { log.ZWarn(ctx, "CallbackBeforeSendGroupMsg failed", err, "req", req.String()) return nil, err } diff --git a/internal/rpc/msg/send_pull.go b/internal/rpc/msg/send.go similarity index 78% rename from internal/rpc/msg/send_pull.go rename to internal/rpc/msg/send.go index ee8d60243..885487896 100644 --- a/internal/rpc/msg/send_pull.go +++ b/internal/rpc/msg/send.go @@ -12,6 +12,28 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" ) +func (m *msgServer) SendMsg(ctx context.Context, req *msg.SendMsgReq) (resp *msg.SendMsgResp, error error) { + resp = &msg.SendMsgResp{} + flag := isMessageHasReadEnabled(req.MsgData) + if !flag { + return nil, errs.ErrMessageHasReadDisable.Wrap() + } + m.encapsulateMsgData(req.MsgData) + if err := callbackMsgModify(ctx, req); err != nil && err != errs.ErrCallbackContinue { + return nil, err + } + switch req.MsgData.SessionType { + case constant.SingleChatType: + return m.sendMsgSingleChat(ctx, req) + case constant.NotificationChatType: + return m.sendMsgNotification(ctx, req) + case constant.SuperGroupChatType: + return m.sendMsgSuperGroupChat(ctx, req) + default: + return nil, errs.ErrArgs.Wrap("unknown sessionType") + } +} + func (m *msgServer) sendMsgSuperGroupChat(ctx context.Context, req *pbMsg.SendMsgReq) (resp *pbMsg.SendMsgResp, err error) { resp = &pbMsg.SendMsgResp{} promePkg.Inc(promePkg.WorkSuperGroupChatMsgRecvSuccessCounter) @@ -23,7 +45,7 @@ func (m *msgServer) sendMsgSuperGroupChat(ctx context.Context, req *pbMsg.SendMs if err != nil { return nil, err } - if err = CallbackAfterSendGroupMsg(ctx, req); err != nil { + if err = callbackAfterSendGroupMsg(ctx, req); err != nil { log.ZError(ctx, "CallbackAfterSendGroupMsg", err) } promePkg.Inc(promePkg.WorkSuperGroupChatMsgProcessSuccessCounter) @@ -69,7 +91,7 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbMsg.SendMsgReq promePkg.Inc(promePkg.SingleChatMsgProcessFailedCounter) return nil, err } - err = CallbackAfterSendSingleMsg(ctx, req) + err = callbackAfterSendSingleMsg(ctx, req) if err != nil && err != errs.ErrCallbackContinue { return nil, err } diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index f9288d312..15392ada2 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -3,21 +3,15 @@ package msg import ( "context" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/localcache" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/tx" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/unrelation" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome" - "github.com/OpenIMSDK/Open-IM-Server/pkg/common/tokenverify" "github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry" - "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg" - "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" "github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient" - "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" "google.golang.org/grpc" ) @@ -102,75 +96,3 @@ func (m *msgServer) initPrometheus() { prome.NewWorkSuperGroupChatMsgProcessSuccessCounter() prome.NewWorkSuperGroupChatMsgProcessFailedCounter() } - -func (m *msgServer) SendMsg(ctx context.Context, req *msg.SendMsgReq) (resp *msg.SendMsgResp, error error) { - resp = &msg.SendMsgResp{} - flag := isMessageHasReadEnabled(req.MsgData) - if !flag { - return nil, errs.ErrMessageHasReadDisable.Wrap() - } - m.encapsulateMsgData(req.MsgData) - if err := CallbackMsgModify(ctx, req); err != nil && err != errs.ErrCallbackContinue { - return nil, err - } - switch req.MsgData.SessionType { - case constant.SingleChatType: - return m.sendMsgSingleChat(ctx, req) - case constant.NotificationChatType: - return m.sendMsgNotification(ctx, req) - case constant.SuperGroupChatType: - return m.sendMsgSuperGroupChat(ctx, req) - default: - return nil, errs.ErrArgs.Wrap("unknown sessionType") - } -} - -func (m *msgServer) GetMaxSeq(ctx context.Context, req *sdkws.GetMaxSeqReq) (*sdkws.GetMaxSeqResp, error) { - if err := tokenverify.CheckAccessV3(ctx, req.UserID); err != nil { - return nil, err - } - conversationIDs, err := m.ConversationLocalCache.GetConversationIDs(ctx, req.UserID) - if err != nil { - return nil, err - } - for _, conversationID := range conversationIDs { - conversationIDs = append(conversationIDs, utils.GetNotificationConversationIDByConversationID(conversationID)) - } - log.ZDebug(ctx, "GetMaxSeq", "conversationIDs", conversationIDs) - maxSeqs, err := m.MsgDatabase.GetMaxSeqs(ctx, conversationIDs) - if err != nil { - log.ZWarn(ctx, "GetMaxSeqs error", err, "conversationIDs", conversationIDs, "maxSeqs", maxSeqs) - return nil, err - } - resp := new(sdkws.GetMaxSeqResp) - resp.MaxSeqs = maxSeqs - return resp, nil -} - -func (m *msgServer) PullMessageBySeqs(ctx context.Context, req *sdkws.PullMessageBySeqsReq) (*sdkws.PullMessageBySeqsResp, error) { - resp := &sdkws.PullMessageBySeqsResp{} - resp.Msgs = make(map[string]*sdkws.PullMsgs) - resp.NotificationMsgs = make(map[string]*sdkws.PullMsgs) - for _, seq := range req.SeqRanges { - if !utils.IsNotification(seq.ConversationID) { - msgs, err := m.MsgDatabase.GetMsgBySeqsRange(ctx, seq.ConversationID, seq.Begin, seq.End, seq.Num) - if err != nil { - log.ZWarn(ctx, "GetMsgBySeqsRange error", err, "conversationID", seq.ConversationID, "seq", seq) - continue - } - resp.Msgs[seq.ConversationID] = &sdkws.PullMsgs{Msgs: msgs} - } else { - var seqs []int64 - for i := seq.Begin; i <= seq.End; i++ { - seqs = append(seqs, i) - } - notificationMsgs, err := m.MsgDatabase.GetMsgBySeqs(ctx, seq.ConversationID, seqs) - if err != nil { - log.ZWarn(ctx, "GetMsgBySeqs error", err, "conversationID", seq.ConversationID, "seq", seq) - continue - } - resp.NotificationMsgs[seq.ConversationID] = &sdkws.PullMsgs{Msgs: notificationMsgs} - } - } - return resp, nil -} diff --git a/internal/rpc/msg/sync_msg.go b/internal/rpc/msg/sync_msg.go new file mode 100644 index 000000000..da010b0f5 --- /dev/null +++ b/internal/rpc/msg/sync_msg.go @@ -0,0 +1,60 @@ +package msg + +import ( + "context" + + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/tokenverify" + "github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws" + "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" +) + +func (m *msgServer) PullMessageBySeqs(ctx context.Context, req *sdkws.PullMessageBySeqsReq) (*sdkws.PullMessageBySeqsResp, error) { + resp := &sdkws.PullMessageBySeqsResp{} + resp.Msgs = make(map[string]*sdkws.PullMsgs) + resp.NotificationMsgs = make(map[string]*sdkws.PullMsgs) + for _, seq := range req.SeqRanges { + if !utils.IsNotification(seq.ConversationID) { + msgs, err := m.MsgDatabase.GetMsgBySeqsRange(ctx, seq.ConversationID, seq.Begin, seq.End, seq.Num) + if err != nil { + log.ZWarn(ctx, "GetMsgBySeqsRange error", err, "conversationID", seq.ConversationID, "seq", seq) + continue + } + resp.Msgs[seq.ConversationID] = &sdkws.PullMsgs{Msgs: msgs} + } else { + var seqs []int64 + for i := seq.Begin; i <= seq.End; i++ { + seqs = append(seqs, i) + } + notificationMsgs, err := m.MsgDatabase.GetMsgBySeqs(ctx, seq.ConversationID, seqs) + if err != nil { + log.ZWarn(ctx, "GetMsgBySeqs error", err, "conversationID", seq.ConversationID, "seq", seq) + continue + } + resp.NotificationMsgs[seq.ConversationID] = &sdkws.PullMsgs{Msgs: notificationMsgs} + } + } + return resp, nil +} + +func (m *msgServer) GetMaxSeq(ctx context.Context, req *sdkws.GetMaxSeqReq) (*sdkws.GetMaxSeqResp, error) { + if err := tokenverify.CheckAccessV3(ctx, req.UserID); err != nil { + return nil, err + } + conversationIDs, err := m.ConversationLocalCache.GetConversationIDs(ctx, req.UserID) + if err != nil { + return nil, err + } + for _, conversationID := range conversationIDs { + conversationIDs = append(conversationIDs, utils.GetNotificationConversationIDByConversationID(conversationID)) + } + log.ZDebug(ctx, "GetMaxSeq", "conversationIDs", conversationIDs) + maxSeqs, err := m.MsgDatabase.GetMaxSeqs(ctx, conversationIDs) + if err != nil { + log.ZWarn(ctx, "GetMaxSeqs error", err, "conversationIDs", conversationIDs, "maxSeqs", maxSeqs) + return nil, err + } + resp := new(sdkws.GetMaxSeqResp) + resp.MaxSeqs = maxSeqs + return resp, nil +} diff --git a/internal/rpc/msg/send_msg.go b/internal/rpc/msg/verify.go similarity index 95% rename from internal/rpc/msg/send_msg.go rename to internal/rpc/msg/verify.go index 7f963b537..446363354 100644 --- a/internal/rpc/msg/send_msg.go +++ b/internal/rpc/msg/verify.go @@ -292,20 +292,3 @@ func (m *msgServer) modifyMessageByUserMessageReceiveOpt(ctx context.Context, us } return true, nil } - -func valueCopy(pb *msg.SendMsgReq) *msg.SendMsgReq { - offlinePushInfo := sdkws.OfflinePushInfo{} - if pb.MsgData.OfflinePushInfo != nil { - offlinePushInfo = *pb.MsgData.OfflinePushInfo - } - msgData := sdkws.MsgData{} - msgData = *pb.MsgData - msgData.OfflinePushInfo = &offlinePushInfo - - options := make(map[string]bool, 10) - for key, value := range pb.MsgData.Options { - options[key] = value - } - msgData.Options = options - return &msg.SendMsgReq{MsgData: &msgData} -}