mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-06 04:15:46 +08:00
del msg
This commit is contained in:
parent
c16c78aca5
commit
6c271ef546
@ -38,7 +38,7 @@ func toCommonCallback(ctx context.Context, msg *pbChat.SendMsgReq, command strin
|
||||
}
|
||||
}
|
||||
|
||||
func CallbackBeforeSendSingleMsg(ctx context.Context, msg *pbChat.SendMsgReq) error {
|
||||
func callbackBeforeSendSingleMsg(ctx context.Context, msg *pbChat.SendMsgReq) error {
|
||||
if !config.Config.Callback.CallbackBeforeSendSingleMsg.Enable {
|
||||
return nil
|
||||
}
|
||||
@ -50,7 +50,7 @@ func CallbackBeforeSendSingleMsg(ctx context.Context, msg *pbChat.SendMsgReq) er
|
||||
return http.CallBackPostReturn(cbURL(), req, resp, config.Config.Callback.CallbackBeforeSendSingleMsg)
|
||||
}
|
||||
|
||||
func CallbackAfterSendSingleMsg(ctx context.Context, msg *pbChat.SendMsgReq) error {
|
||||
func callbackAfterSendSingleMsg(ctx context.Context, msg *pbChat.SendMsgReq) error {
|
||||
if !config.Config.Callback.CallbackAfterSendSingleMsg.Enable {
|
||||
return nil
|
||||
}
|
||||
@ -62,7 +62,7 @@ func CallbackAfterSendSingleMsg(ctx context.Context, msg *pbChat.SendMsgReq) err
|
||||
return http.CallBackPostReturn(cbURL(), req, resp, config.Config.Callback.CallbackAfterSendSingleMsg)
|
||||
}
|
||||
|
||||
func CallbackBeforeSendGroupMsg(ctx context.Context, msg *pbChat.SendMsgReq) error {
|
||||
func callbackBeforeSendGroupMsg(ctx context.Context, msg *pbChat.SendMsgReq) error {
|
||||
if !config.Config.Callback.CallbackAfterSendSingleMsg.Enable {
|
||||
return nil
|
||||
}
|
||||
@ -74,7 +74,7 @@ func CallbackBeforeSendGroupMsg(ctx context.Context, msg *pbChat.SendMsgReq) err
|
||||
return http.CallBackPostReturn(cbURL(), req, resp, config.Config.Callback.CallbackAfterSendSingleMsg)
|
||||
}
|
||||
|
||||
func CallbackAfterSendGroupMsg(ctx context.Context, msg *pbChat.SendMsgReq) error {
|
||||
func callbackAfterSendGroupMsg(ctx context.Context, msg *pbChat.SendMsgReq) error {
|
||||
if !config.Config.Callback.CallbackAfterSendGroupMsg.Enable {
|
||||
return nil
|
||||
}
|
||||
@ -86,7 +86,7 @@ func CallbackAfterSendGroupMsg(ctx context.Context, msg *pbChat.SendMsgReq) erro
|
||||
return http.CallBackPostReturn(cbURL(), req, resp, config.Config.Callback.CallbackAfterSendGroupMsg)
|
||||
}
|
||||
|
||||
func CallbackMsgModify(ctx context.Context, msg *pbChat.SendMsgReq) error {
|
||||
func callbackMsgModify(ctx context.Context, msg *pbChat.SendMsgReq) error {
|
||||
if !config.Config.Callback.CallbackMsgModify.Enable || msg.MsgData.ContentType != constant.Text {
|
||||
return nil
|
||||
}
|
||||
|
@ -11,7 +11,7 @@ import (
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
|
||||
)
|
||||
|
||||
func CallbackSetMessageReactionExtensions(ctx context.Context, setReq *msg.SetMessageReactionExtensionsReq) error {
|
||||
func callbackSetMessageReactionExtensions(ctx context.Context, setReq *msg.SetMessageReactionExtensionsReq) error {
|
||||
if !config.Config.Callback.CallbackAfterSendGroupMsg.Enable {
|
||||
return nil
|
||||
}
|
||||
@ -35,7 +35,7 @@ func CallbackSetMessageReactionExtensions(ctx context.Context, setReq *msg.SetMe
|
||||
return nil
|
||||
}
|
||||
|
||||
func CallbackDeleteMessageReactionExtensions(setReq *msg.DeleteMessagesReactionExtensionsReq) error {
|
||||
func callbackDeleteMessageReactionExtensions(setReq *msg.DeleteMessagesReactionExtensionsReq) error {
|
||||
if !config.Config.Callback.CallbackAfterSendGroupMsg.Enable {
|
||||
return nil
|
||||
}
|
||||
@ -54,7 +54,7 @@ func CallbackDeleteMessageReactionExtensions(setReq *msg.DeleteMessagesReactionE
|
||||
return http.CallBackPostReturn(cbURL(), req, resp, config.Config.Callback.CallbackAfterSendGroupMsg)
|
||||
}
|
||||
|
||||
func CallbackGetMessageListReactionExtensions(ctx context.Context, getReq *msg.GetMessagesReactionExtensionsReq) error {
|
||||
func callbackGetMessageListReactionExtensions(ctx context.Context, getReq *msg.GetMessagesReactionExtensionsReq) error {
|
||||
if !config.Config.Callback.CallbackAfterSendGroupMsg.Enable {
|
||||
return nil
|
||||
}
|
||||
@ -70,7 +70,7 @@ func CallbackGetMessageListReactionExtensions(ctx context.Context, getReq *msg.G
|
||||
return http.CallBackPostReturn(cbURL(), req, resp, config.Config.Callback.CallbackAfterSendGroupMsg)
|
||||
}
|
||||
|
||||
func CallbackAddMessageReactionExtensions(ctx context.Context, setReq *msg.ModifyMessageReactionExtensionsReq) error {
|
||||
func callbackAddMessageReactionExtensions(ctx context.Context, setReq *msg.ModifyMessageReactionExtensionsReq) error {
|
||||
req := &cbapi.CallbackAddMessageReactionExtReq{
|
||||
OperationID: mcontext.GetOperationID(ctx),
|
||||
CallbackCommand: constant.CallbackAddMessageListReactionExtensionsCommand,
|
||||
|
@ -2,6 +2,7 @@ package msg
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/config"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
|
||||
@ -30,7 +31,7 @@ func MessageHasReadEnabled(_ context.Context, req *msg.SendMsgReq) (*sdkws.MsgDa
|
||||
return req.MsgData, nil
|
||||
}
|
||||
func MessageModifyCallback(ctx context.Context, req *msg.SendMsgReq) (*sdkws.MsgData, error) {
|
||||
if err := CallbackMsgModify(ctx, req); err != nil && err != errs.ErrCallbackContinue {
|
||||
if err := callbackMsgModify(ctx, req); err != nil && err != errs.ErrCallbackContinue {
|
||||
log.ZWarn(ctx, "CallbackMsgModify failed", err, "req", req.String())
|
||||
return nil, err
|
||||
}
|
||||
@ -39,18 +40,13 @@ func MessageModifyCallback(ctx context.Context, req *msg.SendMsgReq) (*sdkws.Msg
|
||||
func MessageBeforeSendCallback(ctx context.Context, req *msg.SendMsgReq) (*sdkws.MsgData, error) {
|
||||
switch req.MsgData.SessionType {
|
||||
case constant.SingleChatType:
|
||||
if err := CallbackBeforeSendSingleMsg(ctx, req); err != nil && err != errs.ErrCallbackContinue {
|
||||
if err := callbackBeforeSendSingleMsg(ctx, req); err != nil && err != errs.ErrCallbackContinue {
|
||||
log.ZWarn(ctx, "CallbackBeforeSendSingleMsg failed", err, "req", req.String())
|
||||
return nil, err
|
||||
}
|
||||
case constant.GroupChatType:
|
||||
if err := CallbackBeforeSendGroupMsg(ctx, req); err != nil && err != errs.ErrCallbackContinue {
|
||||
log.ZWarn(ctx, "CallbackBeforeSendGroupMsg failed", err, "req", req.String())
|
||||
return nil, err
|
||||
}
|
||||
case constant.NotificationChatType:
|
||||
case constant.SuperGroupChatType:
|
||||
if err := CallbackBeforeSendGroupMsg(ctx, req); err != nil && err != errs.ErrCallbackContinue {
|
||||
if err := callbackBeforeSendGroupMsg(ctx, req); err != nil && err != errs.ErrCallbackContinue {
|
||||
log.ZWarn(ctx, "CallbackBeforeSendGroupMsg failed", err, "req", req.String())
|
||||
return nil, err
|
||||
}
|
||||
|
@ -12,6 +12,28 @@ import (
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
|
||||
)
|
||||
|
||||
func (m *msgServer) SendMsg(ctx context.Context, req *msg.SendMsgReq) (resp *msg.SendMsgResp, error error) {
|
||||
resp = &msg.SendMsgResp{}
|
||||
flag := isMessageHasReadEnabled(req.MsgData)
|
||||
if !flag {
|
||||
return nil, errs.ErrMessageHasReadDisable.Wrap()
|
||||
}
|
||||
m.encapsulateMsgData(req.MsgData)
|
||||
if err := callbackMsgModify(ctx, req); err != nil && err != errs.ErrCallbackContinue {
|
||||
return nil, err
|
||||
}
|
||||
switch req.MsgData.SessionType {
|
||||
case constant.SingleChatType:
|
||||
return m.sendMsgSingleChat(ctx, req)
|
||||
case constant.NotificationChatType:
|
||||
return m.sendMsgNotification(ctx, req)
|
||||
case constant.SuperGroupChatType:
|
||||
return m.sendMsgSuperGroupChat(ctx, req)
|
||||
default:
|
||||
return nil, errs.ErrArgs.Wrap("unknown sessionType")
|
||||
}
|
||||
}
|
||||
|
||||
func (m *msgServer) sendMsgSuperGroupChat(ctx context.Context, req *pbMsg.SendMsgReq) (resp *pbMsg.SendMsgResp, err error) {
|
||||
resp = &pbMsg.SendMsgResp{}
|
||||
promePkg.Inc(promePkg.WorkSuperGroupChatMsgRecvSuccessCounter)
|
||||
@ -23,7 +45,7 @@ func (m *msgServer) sendMsgSuperGroupChat(ctx context.Context, req *pbMsg.SendMs
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = CallbackAfterSendGroupMsg(ctx, req); err != nil {
|
||||
if err = callbackAfterSendGroupMsg(ctx, req); err != nil {
|
||||
log.ZError(ctx, "CallbackAfterSendGroupMsg", err)
|
||||
}
|
||||
promePkg.Inc(promePkg.WorkSuperGroupChatMsgProcessSuccessCounter)
|
||||
@ -69,7 +91,7 @@ func (m *msgServer) sendMsgSingleChat(ctx context.Context, req *pbMsg.SendMsgReq
|
||||
promePkg.Inc(promePkg.SingleChatMsgProcessFailedCounter)
|
||||
return nil, err
|
||||
}
|
||||
err = CallbackAfterSendSingleMsg(ctx, req)
|
||||
err = callbackAfterSendSingleMsg(ctx, req)
|
||||
if err != nil && err != errs.ErrCallbackContinue {
|
||||
return nil, err
|
||||
}
|
@ -3,21 +3,15 @@ package msg
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/cache"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/controller"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/localcache"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/tx"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/db/unrelation"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/prome"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/tokenverify"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/discoveryregistry"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/msg"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
@ -102,75 +96,3 @@ func (m *msgServer) initPrometheus() {
|
||||
prome.NewWorkSuperGroupChatMsgProcessSuccessCounter()
|
||||
prome.NewWorkSuperGroupChatMsgProcessFailedCounter()
|
||||
}
|
||||
|
||||
func (m *msgServer) SendMsg(ctx context.Context, req *msg.SendMsgReq) (resp *msg.SendMsgResp, error error) {
|
||||
resp = &msg.SendMsgResp{}
|
||||
flag := isMessageHasReadEnabled(req.MsgData)
|
||||
if !flag {
|
||||
return nil, errs.ErrMessageHasReadDisable.Wrap()
|
||||
}
|
||||
m.encapsulateMsgData(req.MsgData)
|
||||
if err := CallbackMsgModify(ctx, req); err != nil && err != errs.ErrCallbackContinue {
|
||||
return nil, err
|
||||
}
|
||||
switch req.MsgData.SessionType {
|
||||
case constant.SingleChatType:
|
||||
return m.sendMsgSingleChat(ctx, req)
|
||||
case constant.NotificationChatType:
|
||||
return m.sendMsgNotification(ctx, req)
|
||||
case constant.SuperGroupChatType:
|
||||
return m.sendMsgSuperGroupChat(ctx, req)
|
||||
default:
|
||||
return nil, errs.ErrArgs.Wrap("unknown sessionType")
|
||||
}
|
||||
}
|
||||
|
||||
func (m *msgServer) GetMaxSeq(ctx context.Context, req *sdkws.GetMaxSeqReq) (*sdkws.GetMaxSeqResp, error) {
|
||||
if err := tokenverify.CheckAccessV3(ctx, req.UserID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
conversationIDs, err := m.ConversationLocalCache.GetConversationIDs(ctx, req.UserID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, conversationID := range conversationIDs {
|
||||
conversationIDs = append(conversationIDs, utils.GetNotificationConversationIDByConversationID(conversationID))
|
||||
}
|
||||
log.ZDebug(ctx, "GetMaxSeq", "conversationIDs", conversationIDs)
|
||||
maxSeqs, err := m.MsgDatabase.GetMaxSeqs(ctx, conversationIDs)
|
||||
if err != nil {
|
||||
log.ZWarn(ctx, "GetMaxSeqs error", err, "conversationIDs", conversationIDs, "maxSeqs", maxSeqs)
|
||||
return nil, err
|
||||
}
|
||||
resp := new(sdkws.GetMaxSeqResp)
|
||||
resp.MaxSeqs = maxSeqs
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (m *msgServer) PullMessageBySeqs(ctx context.Context, req *sdkws.PullMessageBySeqsReq) (*sdkws.PullMessageBySeqsResp, error) {
|
||||
resp := &sdkws.PullMessageBySeqsResp{}
|
||||
resp.Msgs = make(map[string]*sdkws.PullMsgs)
|
||||
resp.NotificationMsgs = make(map[string]*sdkws.PullMsgs)
|
||||
for _, seq := range req.SeqRanges {
|
||||
if !utils.IsNotification(seq.ConversationID) {
|
||||
msgs, err := m.MsgDatabase.GetMsgBySeqsRange(ctx, seq.ConversationID, seq.Begin, seq.End, seq.Num)
|
||||
if err != nil {
|
||||
log.ZWarn(ctx, "GetMsgBySeqsRange error", err, "conversationID", seq.ConversationID, "seq", seq)
|
||||
continue
|
||||
}
|
||||
resp.Msgs[seq.ConversationID] = &sdkws.PullMsgs{Msgs: msgs}
|
||||
} else {
|
||||
var seqs []int64
|
||||
for i := seq.Begin; i <= seq.End; i++ {
|
||||
seqs = append(seqs, i)
|
||||
}
|
||||
notificationMsgs, err := m.MsgDatabase.GetMsgBySeqs(ctx, seq.ConversationID, seqs)
|
||||
if err != nil {
|
||||
log.ZWarn(ctx, "GetMsgBySeqs error", err, "conversationID", seq.ConversationID, "seq", seq)
|
||||
continue
|
||||
}
|
||||
resp.NotificationMsgs[seq.ConversationID] = &sdkws.PullMsgs{Msgs: notificationMsgs}
|
||||
}
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
60
internal/rpc/msg/sync_msg.go
Normal file
60
internal/rpc/msg/sync_msg.go
Normal file
@ -0,0 +1,60 @@
|
||||
package msg
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/tokenverify"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/proto/sdkws"
|
||||
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
|
||||
)
|
||||
|
||||
func (m *msgServer) PullMessageBySeqs(ctx context.Context, req *sdkws.PullMessageBySeqsReq) (*sdkws.PullMessageBySeqsResp, error) {
|
||||
resp := &sdkws.PullMessageBySeqsResp{}
|
||||
resp.Msgs = make(map[string]*sdkws.PullMsgs)
|
||||
resp.NotificationMsgs = make(map[string]*sdkws.PullMsgs)
|
||||
for _, seq := range req.SeqRanges {
|
||||
if !utils.IsNotification(seq.ConversationID) {
|
||||
msgs, err := m.MsgDatabase.GetMsgBySeqsRange(ctx, seq.ConversationID, seq.Begin, seq.End, seq.Num)
|
||||
if err != nil {
|
||||
log.ZWarn(ctx, "GetMsgBySeqsRange error", err, "conversationID", seq.ConversationID, "seq", seq)
|
||||
continue
|
||||
}
|
||||
resp.Msgs[seq.ConversationID] = &sdkws.PullMsgs{Msgs: msgs}
|
||||
} else {
|
||||
var seqs []int64
|
||||
for i := seq.Begin; i <= seq.End; i++ {
|
||||
seqs = append(seqs, i)
|
||||
}
|
||||
notificationMsgs, err := m.MsgDatabase.GetMsgBySeqs(ctx, seq.ConversationID, seqs)
|
||||
if err != nil {
|
||||
log.ZWarn(ctx, "GetMsgBySeqs error", err, "conversationID", seq.ConversationID, "seq", seq)
|
||||
continue
|
||||
}
|
||||
resp.NotificationMsgs[seq.ConversationID] = &sdkws.PullMsgs{Msgs: notificationMsgs}
|
||||
}
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (m *msgServer) GetMaxSeq(ctx context.Context, req *sdkws.GetMaxSeqReq) (*sdkws.GetMaxSeqResp, error) {
|
||||
if err := tokenverify.CheckAccessV3(ctx, req.UserID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
conversationIDs, err := m.ConversationLocalCache.GetConversationIDs(ctx, req.UserID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, conversationID := range conversationIDs {
|
||||
conversationIDs = append(conversationIDs, utils.GetNotificationConversationIDByConversationID(conversationID))
|
||||
}
|
||||
log.ZDebug(ctx, "GetMaxSeq", "conversationIDs", conversationIDs)
|
||||
maxSeqs, err := m.MsgDatabase.GetMaxSeqs(ctx, conversationIDs)
|
||||
if err != nil {
|
||||
log.ZWarn(ctx, "GetMaxSeqs error", err, "conversationIDs", conversationIDs, "maxSeqs", maxSeqs)
|
||||
return nil, err
|
||||
}
|
||||
resp := new(sdkws.GetMaxSeqResp)
|
||||
resp.MaxSeqs = maxSeqs
|
||||
return resp, nil
|
||||
}
|
@ -292,20 +292,3 @@ func (m *msgServer) modifyMessageByUserMessageReceiveOpt(ctx context.Context, us
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func valueCopy(pb *msg.SendMsgReq) *msg.SendMsgReq {
|
||||
offlinePushInfo := sdkws.OfflinePushInfo{}
|
||||
if pb.MsgData.OfflinePushInfo != nil {
|
||||
offlinePushInfo = *pb.MsgData.OfflinePushInfo
|
||||
}
|
||||
msgData := sdkws.MsgData{}
|
||||
msgData = *pb.MsgData
|
||||
msgData.OfflinePushInfo = &offlinePushInfo
|
||||
|
||||
options := make(map[string]bool, 10)
|
||||
for key, value := range pb.MsgData.Options {
|
||||
options[key] = value
|
||||
}
|
||||
msgData.Options = options
|
||||
return &msg.SendMsgReq{MsgData: &msgData}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user