From b4da35a0abaafd461f708b2f0cfd5211b606908c Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Thu, 26 May 2022 18:02:00 +0800 Subject: [PATCH] ingress --- internal/api/manage/management_chat.go | 135 ++++++++++++++++++++----- internal/msg_gateway/gate/callback.go | 51 ++++++++++ internal/msg_gateway/gate/ws_server.go | 8 +- internal/push/logic/callback.go | 33 ++++++ internal/push/logic/push_to_client.go | 8 ++ internal/rpc/msg/callback.go | 109 ++++++++++---------- internal/rpc/msg/send_msg.go | 65 +++++------- pkg/base_info/manage_api_struct.go | 30 ++++++ pkg/call_back_struct/common.go | 8 ++ pkg/call_back_struct/msg_gateway.go | 18 ++++ pkg/call_back_struct/push.go | 12 +++ pkg/common/config/config.go | 5 +- pkg/common/constant/constant.go | 3 + 13 files changed, 359 insertions(+), 126 deletions(-) create mode 100644 internal/msg_gateway/gate/callback.go create mode 100644 internal/push/logic/callback.go create mode 100644 pkg/call_back_struct/msg_gateway.go create mode 100644 pkg/call_back_struct/push.go diff --git a/internal/api/manage/management_chat.go b/internal/api/manage/management_chat.go index 94caa5e2a..9ee8c7874 100644 --- a/internal/api/manage/management_chat.go +++ b/internal/api/manage/management_chat.go @@ -28,7 +28,7 @@ import ( var validate *validator.Validate -func newUserSendMsgReq(params *ManagementSendMsgReq) *pbChat.SendMsgReq { +func newUserSendMsgReq(params *api.ManagementSendMsgReq) *pbChat.SendMsgReq { var newContent string var err error switch params.ContentType { @@ -91,7 +91,7 @@ func init() { func ManagementSendMsg(c *gin.Context) { var data interface{} - params := ManagementSendMsgReq{} + params := api.ManagementSendMsgReq{} if err := c.BindJSON(¶ms); err != nil { c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()}) log.Error(c.PostForm("operationID"), "json unmarshal err", err.Error(), c.PostForm("content")) @@ -186,34 +186,113 @@ func ManagementSendMsg(c *gin.Context) { resp := api.ManagementSendMsgResp{CommResp: api.CommResp{ErrCode: RpcResp.ErrCode, ErrMsg: RpcResp.ErrMsg}, ResultList: server_api_params.UserSendMsgResp{ServerMsgID: RpcResp.ServerMsgID, ClientMsgID: RpcResp.ClientMsgID, SendTime: RpcResp.SendTime}} log.Info(params.OperationID, "ManagementSendMsg return", resp) c.JSON(http.StatusOK, resp) - } -// -//type MergeElem struct { -// Title string `json:"title"` -// AbstractList []string `json:"abstractList"` -// MultiMessage []*MsgStruct `json:"multiMessage"` -//} -// -//type QuoteElem struct { -// Text string `json:"text"` -// QuoteMessage *MsgStruct `json:"quoteMessage"` -//} -type ManagementSendMsgReq struct { - OperationID string `json:"operationID" binding:"required"` - SendID string `json:"sendID" binding:"required"` - RecvID string `json:"recvID" ` - GroupID string `json:"groupID" ` - SenderNickname string `json:"senderNickname" ` - SenderFaceURL string `json:"senderFaceURL" ` - SenderPlatformID int32 `json:"senderPlatformID"` - ForceList []string `json:"forceList" ` - Content map[string]interface{} `json:"content" binding:"required"` - ContentType int32 `json:"contentType" binding:"required"` - SessionType int32 `json:"sessionType" binding:"required"` - IsOnlineOnly bool `json:"isOnlineOnly"` - OfflinePushInfo *open_im_sdk.OfflinePushInfo `json:"offlinePushInfo"` +func ManagementBatchSendMsg(c *gin.Context) { + var data interface{} + params := api.ManagementBatchSendMsgReq{} + resp := api.ManagementBatchSendMsgResp{} + if err := c.BindJSON(¶ms); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()}) + log.Error(c.PostForm("operationID"), "json unmarshal err", err.Error(), c.PostForm("content")) + return + } + switch params.ContentType { + case constant.Text: + data = TextElem{} + case constant.Picture: + data = PictureElem{} + case constant.Voice: + data = SoundElem{} + case constant.Video: + data = VideoElem{} + case constant.File: + data = FileElem{} + //case constant.AtText: + // data = AtElem{} + //case constant.Merger: + // data = + //case constant.Card: + //case constant.Location: + case constant.Custom: + data = CustomElem{} + case constant.Revoke: + data = RevokeElem{} + case constant.OANotification: + data = OANotificationElem{} + params.SessionType = constant.NotificationChatType + //case constant.HasReadReceipt: + //case constant.Typing: + //case constant.Quote: + default: + c.JSON(http.StatusBadRequest, gin.H{"errCode": 404, "errMsg": "contentType err"}) + log.Error(c.PostForm("operationID"), "contentType err", c.PostForm("content")) + return + } + if err := mapstructure.WeakDecode(params.Content, &data); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"errCode": 401, "errMsg": err.Error()}) + log.Error(c.PostForm("operationID"), "content to Data struct err", err.Error()) + return + } else if err := validate.Struct(data); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"errCode": 403, "errMsg": err.Error()}) + log.Error(c.PostForm("operationID"), "data args validate err", err.Error()) + return + } + log.NewInfo("", data, params) + token := c.Request.Header.Get("token") + claims, err := token_verify.ParseToken(token, params.OperationID) + if err != nil { + log.NewError(params.OperationID, "parse token failed", err.Error()) + c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": "parse token failed", "sendTime": 0, "MsgID": ""}) + return + } + if !utils.IsContain(claims.UID, config.Config.Manager.AppManagerUid) { + c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": "not authorized", "sendTime": 0, "MsgID": ""}) + return + + } + switch params.SessionType { + case constant.SingleChatType: + if len(params.RecvID) == 0 { + log.NewError(params.OperationID, "recvID is a null string") + c.JSON(http.StatusBadRequest, gin.H{"errCode": 405, "errMsg": "recvID is a null string", "sendTime": 0, "MsgID": ""}) + return + } + case constant.GroupChatType: + if len(params.GroupID) == 0 { + log.NewError(params.OperationID, "groupID is a null string") + c.JSON(http.StatusBadRequest, gin.H{"errCode": 405, "errMsg": "groupID is a null string", "sendTime": 0, "MsgID": ""}) + return + } + + } + log.NewInfo(params.OperationID, "Ws call success to ManagementSendMsgReq", params) + for _, recvID := range params.RecvIDList { + pbData := newUserSendMsgReq(¶ms.ManagementSendMsgReq) + pbData.MsgData.RecvID = recvID + log.Info("", "", "api ManagementSendMsg call start..., [data: %s]", pbData.String()) + etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName) + client := pbChat.NewChatClient(etcdConn) + rpcResp, err := client.SendMsg(context.Background(), pbData) + if err != nil { + log.NewError(params.OperationID, "call delete UserSendMsg rpc server failed", err.Error()) + resp.Data.FailedIDList = append(resp.Data.FailedIDList, recvID) + continue + } + if rpcResp.ErrCode != 0 { + log.NewError(params.OperationID, utils.GetSelfFuncName(), "rpc failed", pbData) + resp.Data.FailedIDList = append(resp.Data.FailedIDList, recvID) + continue + } + resp.Data.ResultList = append(resp.Data.ResultList, server_api_params.UserSendMsgResp{ + ServerMsgID: rpcResp.ServerMsgID, + ClientMsgID: rpcResp.ClientMsgID, + SendTime: rpcResp.SendTime, + }) + } + + log.NewInfo(params.OperationID, utils.GetSelfFuncName(), "resp: ", resp) + c.JSON(http.StatusOK, resp) } type PictureBaseInfo struct { diff --git a/internal/msg_gateway/gate/callback.go b/internal/msg_gateway/gate/callback.go new file mode 100644 index 000000000..58d3a98b2 --- /dev/null +++ b/internal/msg_gateway/gate/callback.go @@ -0,0 +1,51 @@ +package gate + +import ( + cbApi "Open_IM/pkg/call_back_struct" + "Open_IM/pkg/common/config" + "Open_IM/pkg/common/constant" + "Open_IM/pkg/common/http" + http2 "net/http" +) + +func callbackUserOnline(operationID, userID string, platformID int32, token string) cbApi.CommonCallbackResp { + callbackResp := cbApi.CommonCallbackResp{OperationID: operationID} + if !config.Config.Callback.CallbackUserOnline.Enable { + return callbackResp + } + callbackUserOnlineReq := cbApi.CallbackUserOnlineReq{ + Token: token, + UserStatusCallbackReq: cbApi.UserStatusCallbackReq{ + CallbackCommand: constant.CallbackUserOnlineCommand, + OperationID: operationID, + UserID: userID, + PlatformID: platformID, + Platform: constant.PlatformIDToName(platformID), + }} + callbackUserOnlineResp := &cbApi.CallbackUserOnlineResp{CommonCallbackResp: callbackResp} + if err := http.PostReturn(config.Config.Callback.CallbackUrl, callbackUserOnlineReq, callbackUserOnlineResp, config.Config.Callback.CallbackUserOnline.CallbackTimeOut); err != nil { + callbackResp.ErrCode = http2.StatusInternalServerError + callbackResp.ErrMsg = err.Error() + } + return callbackResp +} + +func callbackUserOffline(operationID, userID string, platform string) cbApi.CommonCallbackResp { + callbackResp := cbApi.CommonCallbackResp{OperationID: operationID} + if !config.Config.Callback.CallbackUserOffline.Enable { + return callbackResp + } + callbackOfflineReq := cbApi.CallbackUserOfflineReq{UserStatusCallbackReq: cbApi.UserStatusCallbackReq{ + CallbackCommand: constant.CallbackUserOfflineCommand, + OperationID: operationID, + UserID: userID, + PlatformID: constant.PlatformNameToID(platform), + Platform: platform, + }} + callbackUserOfflineResp := &cbApi.CallbackUserOfflineResp{CommonCallbackResp: callbackResp} + if err := http.PostReturn(config.Config.Callback.CallbackUrl, callbackOfflineReq, callbackUserOfflineResp, config.Config.Callback.CallbackUserOffline.CallbackTimeOut); err != nil { + callbackResp.ErrCode = http2.StatusInternalServerError + callbackResp.ErrMsg = err.Error() + } + return callbackResp +} diff --git a/internal/msg_gateway/gate/ws_server.go b/internal/msg_gateway/gate/ws_server.go index c14277c9c..daae9e793 100644 --- a/internal/msg_gateway/gate/ws_server.go +++ b/internal/msg_gateway/gate/ws_server.go @@ -59,7 +59,6 @@ func (ws *WServer) wsHandler(w http.ResponseWriter, r *http.Request) { } else { //Connection mapping relationship, //userID+" "+platformID->conn - //Initialize a lock for each user newConn := &UserConn{conn, new(sync.Mutex)} userCount++ @@ -167,6 +166,10 @@ func (ws *WServer) addUserConn(uid string, platformID int32, conn *UserConn, tok rwLock.Lock() defer rwLock.Unlock() operationID := utils.OperationIDGenerator() + callbackResp := callbackUserOnline(operationID, uid, platformID, token) + if callbackResp.ErrCode != 0 { + log.NewError(operationID, utils.GetSelfFuncName(), "callbackUserOnline resp:", callbackResp) + } ws.MultiTerminalLoginChecker(uid, platformID, conn, token, operationID) if oldConnMap, ok := ws.wsUserToConn[uid]; ok { oldConnMap[constant.PlatformIDToName(platformID)] = conn @@ -191,7 +194,6 @@ func (ws *WServer) addUserConn(uid string, platformID int32, conn *UserConn, tok count = count + len(v) } log.Debug(operationID, "WS Add operation", "", "wsUser added", ws.wsUserToConn, "connection_uid", uid, "connection_platform", constant.PlatformIDToName(platformID), "online_user_num", len(ws.wsUserToConn), "online_conn_num", count) - } func (ws *WServer) delUserConn(conn *UserConn) { @@ -225,7 +227,7 @@ func (ws *WServer) delUserConn(conn *UserConn) { if err != nil { log.Error(operationID, " close err", "", "uid", uid, "platform", platform) } - + callbaclResp := callbackUserOffline(operationID, uid, platform) } func (ws *WServer) getUserConn(uid string, platform string) *UserConn { diff --git a/internal/push/logic/callback.go b/internal/push/logic/callback.go new file mode 100644 index 000000000..a706ef006 --- /dev/null +++ b/internal/push/logic/callback.go @@ -0,0 +1,33 @@ +package logic + +import ( + cbApi "Open_IM/pkg/call_back_struct" + "Open_IM/pkg/common/config" + "Open_IM/pkg/common/constant" + "Open_IM/pkg/common/http" + commonPb "Open_IM/pkg/proto/sdk_ws" + http2 "net/http" +) + +func callbackOfflinePush(operationID, userID string, info *commonPb.OfflinePushInfo, platformID int32) cbApi.CommonCallbackResp { + callbackResp := cbApi.CommonCallbackResp{OperationID: operationID} + if !config.Config.Callback.CallbackUserOnline.Enable { + return callbackResp + } + callbackOfflinePushReq := cbApi.CallbackOfflinePushReq{ + UserStatusCallbackReq: cbApi.UserStatusCallbackReq{ + CallbackCommand: constant.CallbackOfflinePushCommand, + OperationID: operationID, + UserID: userID, + PlatformID: platformID, + Platform: constant.PlatformIDToName(platformID), + }, + OfflinePushInfo: info, + } + callbackOfflinePushResp := &cbApi.CallbackOfflinePushResp{CommonCallbackResp: callbackResp} + if err := http.PostReturn(config.Config.Callback.CallbackUrl, callbackOfflinePushReq, callbackOfflinePushResp, config.Config.Callback.CallbackOfflinePush.CallbackTimeOut); err != nil { + callbackResp.ErrCode = http2.StatusInternalServerError + callbackResp.ErrMsg = err.Error() + } + return callbackResp +} diff --git a/internal/push/logic/push_to_client.go b/internal/push/logic/push_to_client.go index 6e9b8787b..8911218f2 100644 --- a/internal/push/logic/push_to_client.go +++ b/internal/push/logic/push_to_client.go @@ -106,6 +106,14 @@ func MsgToUser(pushMsg *pbPush.PushMsgReq) { content = constant.ContentType2PushContent[constant.Common] } } + callbackResp := callbackOfflinePush(pushMsg.OperationID, UIDList[0], pushMsg.MsgData.OfflinePushInfo, v.RecvPlatFormID) + if callbackResp.ErrCode != 0 { + log.NewError(pushMsg.OperationID, utils.GetSelfFuncName(), "callbackOfflinePush result: ", callbackResp) + } + if callbackResp.ActionCode != constant.ActionAllow { + break + } + if offlinePusher == nil { offlinePusher = jpush.JPushClient } diff --git a/internal/rpc/msg/callback.go b/internal/rpc/msg/callback.go index 452846799..af8f7c334 100644 --- a/internal/rpc/msg/callback.go +++ b/internal/rpc/msg/callback.go @@ -8,6 +8,7 @@ import ( "Open_IM/pkg/common/log" pbChat "Open_IM/pkg/proto/chat" "Open_IM/pkg/utils" + http2 "net/http" ) func copyCallbackCommonReqStruct(msg *pbChat.SendMsgReq) cbApi.CommonCallbackReq { @@ -27,11 +28,11 @@ func copyCallbackCommonReqStruct(msg *pbChat.SendMsgReq) cbApi.CommonCallbackReq } } -func callbackBeforeSendSingleMsg(msg *pbChat.SendMsgReq) (canSend bool, err error) { +func callbackBeforeSendSingleMsg(msg *pbChat.SendMsgReq) cbApi.CommonCallbackResp { + callbackResp := cbApi.CommonCallbackResp{OperationID: msg.OperationID} if !config.Config.Callback.CallbackBeforeSendSingleMsg.Enable { - return true, nil + return callbackResp } - log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), msg) commonCallbackReq := copyCallbackCommonReqStruct(msg) commonCallbackReq.CallbackCommand = constant.CallbackBeforeSendSingleMsgCommand @@ -40,27 +41,28 @@ func callbackBeforeSendSingleMsg(msg *pbChat.SendMsgReq) (canSend bool, err erro RecvID: msg.MsgData.RecvID, } resp := &cbApi.CallbackBeforeSendSingleMsgResp{ - CommonCallbackResp: cbApi.CommonCallbackResp{}, + CommonCallbackResp: callbackResp, } //utils.CopyStructFields(req, msg.MsgData) defer log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), req, *resp) if err := http.PostReturn(config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackBeforeSendSingleMsg.CallbackTimeOut); err != nil { + callbackResp.ErrCode = http2.StatusInternalServerError + callbackResp.ErrMsg = err.Error() if !config.Config.Callback.CallbackBeforeSendSingleMsg.CallbackFailedContinue { - return false, err + callbackResp.ActionCode = constant.ActionForbidden + return callbackResp } else { - return true, err - } - } else { - if resp.ActionCode == constant.ActionForbidden && resp.ErrCode == constant.CallbackHandleSuccess { - return false, nil + callbackResp.ActionCode = constant.ActionAllow + return callbackResp } } - return true, err + return callbackResp } -func callbackAfterSendSingleMsg(msg *pbChat.SendMsgReq) error { +func callbackAfterSendSingleMsg(msg *pbChat.SendMsgReq) cbApi.CommonCallbackResp { + callbackResp := cbApi.CommonCallbackResp{OperationID: msg.OperationID} if !config.Config.Callback.CallbackAfterSendSingleMsg.Enable { - return nil + return callbackResp } log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), msg) commonCallbackReq := copyCallbackCommonReqStruct(msg) @@ -69,18 +71,20 @@ func callbackAfterSendSingleMsg(msg *pbChat.SendMsgReq) error { CommonCallbackReq: commonCallbackReq, RecvID: msg.MsgData.RecvID, } - resp := &cbApi.CallbackAfterSendSingleMsgResp{CommonCallbackResp: cbApi.CommonCallbackResp{}} - //utils.CopyStructFields(req, msg.MsgData) + resp := &cbApi.CallbackAfterSendSingleMsgResp{CommonCallbackResp: callbackResp} defer log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), req, *resp) if err := http.PostReturn(config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackAfterSendSingleMsg.CallbackTimeOut); err != nil { - return err + callbackResp.ErrCode = http2.StatusInternalServerError + callbackResp.ErrMsg = err.Error() + return callbackResp } - return nil + return callbackResp } -func callbackBeforeSendGroupMsg(msg *pbChat.SendMsgReq) (canSend bool, err error) { +func callbackBeforeSendGroupMsg(msg *pbChat.SendMsgReq) cbApi.CommonCallbackResp { + callbackResp := cbApi.CommonCallbackResp{OperationID: msg.OperationID} if !config.Config.Callback.CallbackBeforeSendGroupMsg.Enable { - return true, nil + return callbackResp } log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), msg) commonCallbackReq := copyCallbackCommonReqStruct(msg) @@ -89,26 +93,26 @@ func callbackBeforeSendGroupMsg(msg *pbChat.SendMsgReq) (canSend bool, err error CommonCallbackReq: commonCallbackReq, GroupID: msg.MsgData.GroupID, } - resp := &cbApi.CallbackBeforeSendGroupMsgResp{CommonCallbackResp: cbApi.CommonCallbackResp{}} - //utils.CopyStructFields(req, msg.MsgData) + resp := &cbApi.CallbackBeforeSendGroupMsgResp{CommonCallbackResp: callbackResp} defer log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), req, *resp) if err := http.PostReturn(config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackBeforeSendGroupMsg.CallbackTimeOut); err != nil { - if !config.Config.Callback.CallbackBeforeSendGroupMsg.CallbackFailedContinue { - return false, err + callbackResp.ErrCode = http2.StatusInternalServerError + callbackResp.ErrMsg = err.Error() + if !config.Config.Callback.CallbackBeforeSendSingleMsg.CallbackFailedContinue { + callbackResp.ActionCode = constant.ActionForbidden + return callbackResp } else { - return true, err - } - } else { - if resp.ActionCode == constant.ActionForbidden && resp.ErrCode == constant.CallbackHandleSuccess { - return false, nil + callbackResp.ActionCode = constant.ActionAllow + return callbackResp } } - return true, err + return callbackResp } -func callbackAfterSendGroupMsg(msg *pbChat.SendMsgReq) error { +func callbackAfterSendGroupMsg(msg *pbChat.SendMsgReq) cbApi.CommonCallbackResp { + callbackResp := cbApi.CommonCallbackResp{OperationID: msg.OperationID} if !config.Config.Callback.CallbackAfterSendGroupMsg.Enable { - return nil + return callbackResp } log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), msg) commonCallbackReq := copyCallbackCommonReqStruct(msg) @@ -117,19 +121,20 @@ func callbackAfterSendGroupMsg(msg *pbChat.SendMsgReq) error { CommonCallbackReq: commonCallbackReq, GroupID: msg.MsgData.GroupID, } - resp := &cbApi.CallbackAfterSendGroupMsgResp{CommonCallbackResp: cbApi.CommonCallbackResp{}} - - //utils.CopyStructFields(req, msg.MsgData) + resp := &cbApi.CallbackAfterSendGroupMsgResp{CommonCallbackResp: callbackResp} defer log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), req, *resp) if err := http.PostReturn(config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackAfterSendGroupMsg.CallbackTimeOut); err != nil { - return err + callbackResp.ErrCode = http2.StatusInternalServerError + callbackResp.ErrMsg = err.Error() + return callbackResp } - return nil + return callbackResp } -func callbackWordFilter(msg *pbChat.SendMsgReq) (canSend bool, err error) { +func callbackWordFilter(msg *pbChat.SendMsgReq) cbApi.CommonCallbackResp { + callbackResp := cbApi.CommonCallbackResp{OperationID: msg.OperationID} if !config.Config.Callback.CallbackWordFilter.Enable || msg.MsgData.ContentType != constant.Text { - return true, nil + return callbackResp } log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), msg) commonCallbackReq := copyCallbackCommonReqStruct(msg) @@ -137,24 +142,22 @@ func callbackWordFilter(msg *pbChat.SendMsgReq) (canSend bool, err error) { req := cbApi.CallbackWordFilterReq{ CommonCallbackReq: commonCallbackReq, } - resp := &cbApi.CallbackWordFilterResp{CommonCallbackResp: cbApi.CommonCallbackResp{}} - //utils.CopyStructFields(&req., msg.MsgData) + resp := &cbApi.CallbackWordFilterResp{CommonCallbackResp: callbackResp} defer log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), req, *resp) if err := http.PostReturn(config.Config.Callback.CallbackUrl, req, resp, config.Config.Callback.CallbackWordFilter.CallbackTimeOut); err != nil { - if !config.Config.Callback.CallbackWordFilter.CallbackFailedContinue { - log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), "callback failed and config disable, stop this operation") - return false, err + callbackResp.ErrCode = http2.StatusInternalServerError + callbackResp.ErrMsg = err.Error() + if !config.Config.Callback.CallbackBeforeSendSingleMsg.CallbackFailedContinue { + callbackResp.ActionCode = constant.ActionForbidden + return callbackResp } else { - return true, err + callbackResp.ActionCode = constant.ActionAllow + return callbackResp } - } else { - if resp.ActionCode == constant.ActionForbidden && resp.ErrCode == constant.CallbackHandleSuccess { - return false, nil - } - if resp.ErrCode == constant.CallbackHandleSuccess { - msg.MsgData.Content = []byte(resp.Content) - } - log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), string(msg.MsgData.Content)) } - return true, err + if resp.ErrCode == constant.CallbackHandleSuccess && resp.ActionCode == constant.ActionAllow { + msg.MsgData.Content = []byte(resp.Content) + } + log.NewDebug(msg.OperationID, utils.GetSelfFuncName(), string(msg.MsgData.Content)) + return callbackResp } diff --git a/internal/rpc/msg/send_msg.go b/internal/rpc/msg/send_msg.go index 93f488729..549de2fc9 100644 --- a/internal/rpc/msg/send_msg.go +++ b/internal/rpc/msg/send_msg.go @@ -148,47 +148,29 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S if !flag { return returnMsg(&replay, pb, errCode, errMsg, "", 0) } - //if !utils.VerifyToken(pb.Token, pb.SendID) { - // return returnMsg(&replay, pb, http.StatusUnauthorized, "token validate err,not authorized", "", 0) rpc.encapsulateMsgData(pb.MsgData) log.Info("", "this is a test MsgData ", pb.MsgData) msgToMQSingle := pbChat.MsgDataToMQ{Token: pb.Token, OperationID: pb.OperationID, MsgData: pb.MsgData} - //options := utils.JsonStringToMap(pbData.Options) - isHistory := utils.GetSwitchFromOptions(pb.MsgData.Options, constant.IsHistory) - mReq := MsgCallBackReq{ - SendID: pb.MsgData.SendID, - RecvID: pb.MsgData.RecvID, - Content: string(pb.MsgData.Content), - SendTime: pb.MsgData.SendTime, - MsgFrom: pb.MsgData.MsgFrom, - ContentType: pb.MsgData.ContentType, - SessionType: pb.MsgData.SessionType, - PlatformID: pb.MsgData.SenderPlatformID, - MsgID: pb.MsgData.ClientMsgID, - } - if !isHistory { - mReq.IsOnlineOnly = true - } // callback - canSend, err := callbackWordFilter(pb) - if err != nil { - log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackWordFilter failed", err.Error(), pb.MsgData) + callbackResp := callbackWordFilter(pb) + if callbackResp.ErrCode != 0 { + log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackWordFilter resp: ", callbackResp) } - if !canSend { - log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackWordFilter result", canSend, "end rpc and return", pb.MsgData) - return returnMsg(&replay, pb, 201, "callbackWordFilter result stop rpc and return", "", 0) + if callbackResp.ActionCode != constant.ActionAllow { + log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackWordFilter result", "end rpc and return", pb.MsgData) + return returnMsg(&replay, pb, int32(callbackResp.ErrCode), callbackResp.ErrMsg, "", 0) } switch pb.MsgData.SessionType { case constant.SingleChatType: // callback - canSend, err := callbackBeforeSendSingleMsg(pb) - if err != nil { - log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSingleMsg failed", err.Error()) + callbackResp := callbackBeforeSendSingleMsg(pb) + if callbackResp.ErrCode != 0 { + log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSingleMsg resp: ", callbackResp) } - if !canSend { - log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSingleMsg result", canSend, "end rpc and return") - return returnMsg(&replay, pb, 201, "callbackBeforeSendSingleMsg result stop rpc and return", "", 0) + if callbackResp.ActionCode != constant.ActionAllow { + log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSingleMsg result", "end rpc and return", callbackResp) + return returnMsg(&replay, pb, int32(callbackResp.ActionCode), callbackResp.ErrMsg, "", 0) } isSend := modifyMessageByUserMessageReceiveOpt(pb.MsgData.RecvID, pb.MsgData.SendID, constant.SingleChatType, pb) if isSend { @@ -208,19 +190,20 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S } } // callback - if err := callbackAfterSendSingleMsg(pb); err != nil { - log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackAfterSendSingleMsg failed", err.Error()) + callbackResp = callbackAfterSendSingleMsg(pb) + if callbackResp.ErrCode != 0 { + log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackAfterSendSingleMsg resp: ", callbackResp) } return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime) case constant.GroupChatType: // callback - canSend, err := callbackBeforeSendGroupMsg(pb) - if err != nil { - log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendGroupMsg failed", err.Error()) + callbackResp := callbackBeforeSendGroupMsg(pb) + if callbackResp.ErrCode != 0 { + log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendGroupMsg resp:", callbackResp) } - if !canSend { - log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendGroupMsg result", canSend, "end rpc and return") - return returnMsg(&replay, pb, 201, "callbackBeforeSendGroupMsg result stop rpc and return", "", 0) + if callbackResp.ActionCode != constant.ActionAllow { + log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackBeforeSendSingleMsg result", "end rpc and return", callbackResp) + return returnMsg(&replay, pb, int32(callbackResp.ActionCode), callbackResp.ErrMsg, "", 0) } getGroupMemberIDListFromCacheReq := &pbCache.GetGroupMemberIDListFromCacheReq{OperationID: pb.OperationID, GroupID: pb.MsgData.GroupID} etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImCacheName) @@ -295,8 +278,9 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S log.Debug(pb.OperationID, "send msg cost time2 ", db.GetCurrentTimestampByMill()-newTime, pb.MsgData.ClientMsgID) newTime = db.GetCurrentTimestampByMill() // callback - if err := callbackAfterSendGroupMsg(pb); err != nil { - log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackAfterSendGroupMsg failed", err.Error()) + callbackResp = callbackAfterSendGroupMsg(pb) + if callbackResp.ErrCode != 0 { + log.NewError(pb.OperationID, utils.GetSelfFuncName(), "callbackAfterSendGroupMsg resp: ", callbackResp) } if !sendTag { return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) @@ -354,7 +338,6 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S } log.Debug(pb.OperationID, "send msg cost time3 ", db.GetCurrentTimestampByMill()-newTime, pb.MsgData.ClientMsgID) return returnMsg(&replay, pb, 0, "", msgToMQSingle.MsgData.ServerMsgID, msgToMQSingle.MsgData.SendTime) - } case constant.NotificationChatType: msgToMQSingle.MsgData = pb.MsgData diff --git a/pkg/base_info/manage_api_struct.go b/pkg/base_info/manage_api_struct.go index 4b2061b57..f8515dd5b 100644 --- a/pkg/base_info/manage_api_struct.go +++ b/pkg/base_info/manage_api_struct.go @@ -3,6 +3,7 @@ package base_info import ( pbRelay "Open_IM/pkg/proto/relay" "Open_IM/pkg/proto/sdk_ws" + open_im_sdk "Open_IM/pkg/proto/sdk_ws" pbUser "Open_IM/pkg/proto/user" ) @@ -38,7 +39,36 @@ type AccountCheckResp struct { ResultList []*pbUser.AccountCheckResp_SingleUserStatus `json:"data"` } +type ManagementSendMsgReq struct { + OperationID string `json:"operationID" binding:"required"` + SendID string `json:"sendID" binding:"required"` + RecvID string `json:"recvID" ` + GroupID string `json:"groupID" ` + SenderNickname string `json:"senderNickname" ` + SenderFaceURL string `json:"senderFaceURL" ` + SenderPlatformID int32 `json:"senderPlatformID"` + ForceList []string `json:"forceList" ` + Content map[string]interface{} `json:"content" binding:"required"` + ContentType int32 `json:"contentType" binding:"required"` + SessionType int32 `json:"sessionType" binding:"required"` + IsOnlineOnly bool `json:"isOnlineOnly"` + OfflinePushInfo *open_im_sdk.OfflinePushInfo `json:"offlinePushInfo"` +} + type ManagementSendMsgResp struct { CommResp ResultList server_api_params.UserSendMsgResp `json:"data"` } + +type ManagementBatchSendMsgReq struct { + ManagementSendMsgReq + RecvIDList []string `json:"recvIDList"` +} + +type ManagementBatchSendMsgResp struct { + CommResp + Data struct { + ResultList []server_api_params.UserSendMsgResp `json:"resultList"` + FailedIDList []string + } `json:"data"` +} diff --git a/pkg/call_back_struct/common.go b/pkg/call_back_struct/common.go index bc9ad5dac..14aed5b97 100644 --- a/pkg/call_back_struct/common.go +++ b/pkg/call_back_struct/common.go @@ -22,3 +22,11 @@ type CommonCallbackResp struct { ErrMsg string `json:"errMsg"` OperationID string `json:"operationID"` } + +type UserStatusCallbackReq struct { + CallbackCommand string `json:"callbackCommand"` + OperationID string `json:"operationID"` + UserID string `json:"userID"` + PlatformID int32 `json:"platformID"` + Platform string `json:"platform"` +} diff --git a/pkg/call_back_struct/msg_gateway.go b/pkg/call_back_struct/msg_gateway.go new file mode 100644 index 000000000..e6c3ec389 --- /dev/null +++ b/pkg/call_back_struct/msg_gateway.go @@ -0,0 +1,18 @@ +package call_back_struct + +type CallbackUserOnlineReq struct { + UserStatusCallbackReq + Token string `json:"token"` +} + +type CallbackUserOnlineResp struct { + CommonCallbackResp +} + +type CallbackUserOfflineReq struct { + UserStatusCallbackReq +} + +type CallbackUserOfflineResp struct { + CommonCallbackResp +} diff --git a/pkg/call_back_struct/push.go b/pkg/call_back_struct/push.go new file mode 100644 index 000000000..a63b2930d --- /dev/null +++ b/pkg/call_back_struct/push.go @@ -0,0 +1,12 @@ +package call_back_struct + +import commonPb "Open_IM/pkg/proto/sdk_ws" + +type CallbackOfflinePushReq struct { + UserStatusCallbackReq + *commonPb.OfflinePushInfo +} + +type CallbackOfflinePushResp struct { + CommonCallbackResp +} diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 76e80767e..a14b268a6 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -234,11 +234,14 @@ type config struct { Callback struct { CallbackUrl string `yaml:"callbackUrl"` - CallbackBeforeSendSingleMsg callBackConfig `yaml:"callbackbeforeSendSingleMsg"` + CallbackBeforeSendSingleMsg callBackConfig `yaml:"callbackBeforeSendSingleMsg"` CallbackAfterSendSingleMsg callBackConfig `yaml:"callbackAfterSendSingleMsg"` CallbackBeforeSendGroupMsg callBackConfig `yaml:"callbackBeforeSendGroupMsg"` CallbackAfterSendGroupMsg callBackConfig `yaml:"callbackAfterSendGroupMsg"` CallbackWordFilter callBackConfig `yaml:"callbackWordFilter"` + CallbackUserOnline callBackConfig `yaml:"callbackUserOnline"` + CallbackUserOffline callBackConfig `yaml:"callbackUserOffline"` + CallbackOfflinePush callBackConfig `yaml:"callbackOfflinePush"` } `yaml:"callback"` Notification struct { ///////////////////////group///////////////////////////// diff --git a/pkg/common/constant/constant.go b/pkg/common/constant/constant.go index d8d305f95..e27be4d8e 100644 --- a/pkg/common/constant/constant.go +++ b/pkg/common/constant/constant.go @@ -177,6 +177,9 @@ const ( CallbackBeforeSendGroupMsgCommand = "callbackBeforeSendGroupMsgCommand" CallbackAfterSendGroupMsgCommand = "callbackAfterSendGroupMsgCommand" CallbackWordFilterCommand = "callbackWordFilterCommand" + CallbackUserOnlineCommand = "callbackUserOnlineCommand" + CallbackUserOfflineCommand = "callbackUserOfflineCommand" + CallbackOfflinePushCommand = "callbackOfflinePushCommand" //callback actionCode ActionAllow = 0 ActionForbidden = 1