send msg file modify

This commit is contained in:
Gordon 2021-12-23 19:37:41 +08:00
parent d40e1e47fd
commit ba249d294e
6 changed files with 109 additions and 88 deletions

View File

@ -77,10 +77,10 @@ func main() {
//Message //Message
chatGroup := r.Group("/msg") chatGroup := r.Group("/msg")
{ {
chatGroup.POST("/newest_seq", apiChat.UserGetSeq) chatGroup.POST("/newest_seq", apiChat.GetSeq)
chatGroup.POST("/pull_msg", apiChat.UserPullMsg) chatGroup.POST("/pull_msg", apiChat.PullMsg)
chatGroup.POST("/send_msg", apiChat.UserSendMsg) chatGroup.POST("/send_msg", apiChat.SendMsg)
chatGroup.POST("/pull_msg_by_seq", apiChat.UserPullMsgBySeqList) chatGroup.POST("/pull_msg_by_seq", apiChat.PullMsgBySeqList)
} }
//Manager //Manager
managementGroup := r.Group("/manager") managementGroup := r.Group("/manager")

View File

@ -19,7 +19,7 @@ type paramsUserNewestSeq struct {
MsgIncr int `json:"msgIncr" binding:"required"` MsgIncr int `json:"msgIncr" binding:"required"`
} }
func UserGetSeq(c *gin.Context) { func GetSeq(c *gin.Context) {
params := paramsUserNewestSeq{} params := paramsUserNewestSeq{}
if err := c.BindJSON(&params); err != nil { if err := c.BindJSON(&params); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()}) c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()})

View File

@ -6,6 +6,7 @@ import (
"Open_IM/pkg/common/token_verify" "Open_IM/pkg/common/token_verify"
"Open_IM/pkg/grpc-etcdv3/getcdv3" "Open_IM/pkg/grpc-etcdv3/getcdv3"
"Open_IM/pkg/proto/chat" "Open_IM/pkg/proto/chat"
"Open_IM/pkg/proto/sdk_ws"
"context" "context"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"net/http" "net/http"
@ -22,7 +23,7 @@ type paramsUserPullMsg struct {
} }
} }
func UserPullMsg(c *gin.Context) { func PullMsg(c *gin.Context) {
params := paramsUserPullMsg{} params := paramsUserPullMsg{}
if err := c.BindJSON(&params); err != nil { if err := c.BindJSON(&params); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()}) c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()})
@ -34,7 +35,7 @@ func UserPullMsg(c *gin.Context) {
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": "token validate err" + err.Error()}) c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": "token validate err" + err.Error()})
return return
} }
pbData := pbChat.PullMessageReq{} pbData := open_im_sdk.PullMessageReq{}
pbData.UserID = params.SendID pbData.UserID = params.SendID
pbData.OperationID = params.OperationID pbData.OperationID = params.OperationID
pbData.SeqBegin = *params.Data.SeqBegin pbData.SeqBegin = *params.Data.SeqBegin
@ -54,12 +55,12 @@ func UserPullMsg(c *gin.Context) {
if v := reply.GetSingleUserMsg(); v != nil { if v := reply.GetSingleUserMsg(); v != nil {
msg["single"] = v msg["single"] = v
} else { } else {
msg["single"] = []pbChat.GatherFormat{} msg["single"] = []open_im_sdk.GatherFormat{}
} }
if v := reply.GetGroupUserMsg(); v != nil { if v := reply.GetGroupUserMsg(); v != nil {
msg["group"] = v msg["group"] = v
} else { } else {
msg["group"] = []pbChat.GatherFormat{} msg["group"] = []open_im_sdk.GatherFormat{}
} }
msg["maxSeq"] = reply.GetMaxSeq() msg["maxSeq"] = reply.GetMaxSeq()
msg["minSeq"] = reply.GetMinSeq() msg["minSeq"] = reply.GetMinSeq()
@ -79,7 +80,7 @@ type paramsUserPullMsgBySeqList struct {
SeqList []int64 `json:"seqList"` SeqList []int64 `json:"seqList"`
} }
func UserPullMsgBySeqList(c *gin.Context) { func PullMsgBySeqList(c *gin.Context) {
params := paramsUserPullMsgBySeqList{} params := paramsUserPullMsgBySeqList{}
if err := c.BindJSON(&params); err != nil { if err := c.BindJSON(&params); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()}) c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()})
@ -91,7 +92,7 @@ func UserPullMsgBySeqList(c *gin.Context) {
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": "token validate err" + err.Error()}) c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": "token validate err" + err.Error()})
return return
} }
pbData := pbChat.PullMessageBySeqListReq{} pbData := open_im_sdk.PullMessageBySeqListReq{}
pbData.UserID = params.SendID pbData.UserID = params.SendID
pbData.OperationID = params.OperationID pbData.OperationID = params.OperationID
pbData.SeqList = params.SeqList pbData.SeqList = params.SeqList
@ -110,12 +111,12 @@ func UserPullMsgBySeqList(c *gin.Context) {
if v := reply.GetSingleUserMsg(); v != nil { if v := reply.GetSingleUserMsg(); v != nil {
msg["single"] = v msg["single"] = v
} else { } else {
msg["single"] = []pbChat.GatherFormat{} msg["single"] = []open_im_sdk.GatherFormat{}
} }
if v := reply.GetGroupUserMsg(); v != nil { if v := reply.GetGroupUserMsg(); v != nil {
msg["group"] = v msg["group"] = v
} else { } else {
msg["group"] = []pbChat.GatherFormat{} msg["group"] = []open_im_sdk.GatherFormat{}
} }
msg["maxSeq"] = reply.GetMaxSeq() msg["maxSeq"] = reply.GetMaxSeq()
msg["minSeq"] = reply.GetMinSeq() msg["minSeq"] = reply.GetMinSeq()

View File

@ -4,7 +4,7 @@ import (
"Open_IM/pkg/common/config" "Open_IM/pkg/common/config"
"Open_IM/pkg/common/log" "Open_IM/pkg/common/log"
pbChat "Open_IM/pkg/proto/chat" pbChat "Open_IM/pkg/proto/chat"
"Open_IM/pkg/utils" "Open_IM/pkg/proto/sdk_ws"
"context" "context"
"Open_IM/pkg/grpc-etcdv3/getcdv3" "Open_IM/pkg/grpc-etcdv3/getcdv3"
@ -14,8 +14,7 @@ import (
) )
type paramsUserSendMsg struct { type paramsUserSendMsg struct {
ReqIdentifier int32 `json:"reqIdentifier" binding:"required"` SenderPlatformID int32 `json:"senderPlatformID" binding:"required"`
PlatformID int32 `json:"platformID" binding:"required"`
SendID string `json:"sendID" binding:"required"` SendID string `json:"sendID" binding:"required"`
SenderNickName string `json:"senderNickName"` SenderNickName string `json:"senderNickName"`
SenderFaceURL string `json:"senderFaceUrl"` SenderFaceURL string `json:"senderFaceUrl"`
@ -24,40 +23,42 @@ type paramsUserSendMsg struct {
SessionType int32 `json:"sessionType" binding:"required"` SessionType int32 `json:"sessionType" binding:"required"`
MsgFrom int32 `json:"msgFrom" binding:"required"` MsgFrom int32 `json:"msgFrom" binding:"required"`
ContentType int32 `json:"contentType" binding:"required"` ContentType int32 `json:"contentType" binding:"required"`
RecvID string `json:"recvID" binding:"required"` RecvID string `json:"recvID" `
GroupID string `json:"groupID" `
ForceList []string `json:"forceList"` ForceList []string `json:"forceList"`
Content string `json:"content" binding:"required"` Content []byte `json:"content" binding:"required"`
Options map[string]int32 `json:"options" ` Options map[string]bool `json:"options" `
ClientMsgID string `json:"clientMsgID" binding:"required"` ClientMsgID string `json:"clientMsgID" binding:"required"`
OffLineInfo map[string]interface{} `json:"offlineInfo" ` CreateTime int64 `json:"createTime" binding:"required"`
Ex map[string]interface{} `json:"ext"` OffLineInfo *open_im_sdk.OfflinePushInfo `json:"offlineInfo" `
} }
} }
func newUserSendMsgReq(token string, params *paramsUserSendMsg) *pbChat.SendMsgReq { func newUserSendMsgReq(token string, params *paramsUserSendMsg) *pbChat.SendMsgReq {
pbData := pbChat.SendMsgReq{ pbData := pbChat.SendMsgReq{
ReqIdentifier: params.ReqIdentifier,
Token: token, Token: token,
OperationID: params.OperationID,
MsgData: &open_im_sdk.MsgData{
SendID: params.SendID, SendID: params.SendID,
RecvID: params.Data.RecvID,
GroupID: params.Data.GroupID,
ClientMsgID: params.Data.ClientMsgID,
SenderPlatformID: params.SenderPlatformID,
SenderNickName: params.SenderNickName, SenderNickName: params.SenderNickName,
SenderFaceURL: params.SenderFaceURL, SenderFaceURL: params.SenderFaceURL,
OperationID: params.OperationID,
PlatformID: params.PlatformID,
SessionType: params.Data.SessionType, SessionType: params.Data.SessionType,
MsgFrom: params.Data.MsgFrom, MsgFrom: params.Data.MsgFrom,
ContentType: params.Data.ContentType, ContentType: params.Data.ContentType,
RecvID: params.Data.RecvID,
ForceList: params.Data.ForceList,
Content: params.Data.Content, Content: params.Data.Content,
Options: utils.MapIntToJsonString(params.Data.Options), CreateTime: params.Data.CreateTime,
ClientMsgID: params.Data.ClientMsgID, Options: params.Data.Options,
OffLineInfo: utils.MapToJsonString(params.Data.OffLineInfo), OfflinePushInfo: params.Data.OffLineInfo,
Ex: utils.MapToJsonString(params.Data.Ex), },
} }
return &pbData return &pbData
} }
func UserSendMsg(c *gin.Context) { func SendMsg(c *gin.Context) {
params := paramsUserSendMsg{} params := paramsUserSendMsg{}
if err := c.BindJSON(&params); err != nil { if err := c.BindJSON(&params); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()}) c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()})
@ -88,7 +89,6 @@ func UserSendMsg(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{ c.JSON(http.StatusOK, gin.H{
"errCode": reply.ErrCode, "errCode": reply.ErrCode,
"errMsg": reply.ErrMsg, "errMsg": reply.ErrMsg,
"reqIdentifier": reply.ReqIdentifier,
"data": gin.H{ "data": gin.H{
"clientMsgID": reply.ClientMsgID, "clientMsgID": reply.ClientMsgID,
"serverMsgID": reply.ServerMsgID, "serverMsgID": reply.ServerMsgID,

View File

@ -13,6 +13,7 @@ import (
"Open_IM/pkg/common/token_verify" "Open_IM/pkg/common/token_verify"
"Open_IM/pkg/grpc-etcdv3/getcdv3" "Open_IM/pkg/grpc-etcdv3/getcdv3"
pbChat "Open_IM/pkg/proto/chat" pbChat "Open_IM/pkg/proto/chat"
"Open_IM/pkg/proto/sdk_ws"
"Open_IM/pkg/utils" "Open_IM/pkg/utils"
"context" "context"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
@ -27,7 +28,8 @@ var validate *validator.Validate
type paramsManagementSendMsg struct { type paramsManagementSendMsg struct {
OperationID string `json:"operationID" binding:"required"` OperationID string `json:"operationID" binding:"required"`
SendID string `json:"sendID" binding:"required"` SendID string `json:"sendID" binding:"required"`
RecvID string `json:"recvID" binding:"required"` RecvID string `json:"recvID" `
GroupID string `json:"groupID" `
SenderNickName string `json:"senderNickName" ` SenderNickName string `json:"senderNickName" `
SenderFaceURL string `json:"senderFaceURL" ` SenderFaceURL string `json:"senderFaceURL" `
SenderPlatformID int32 `json:"senderPlatformID"` SenderPlatformID int32 `json:"senderPlatformID"`
@ -36,9 +38,10 @@ type paramsManagementSendMsg struct {
ContentType int32 `json:"contentType" binding:"required"` ContentType int32 `json:"contentType" binding:"required"`
SessionType int32 `json:"sessionType" binding:"required"` SessionType int32 `json:"sessionType" binding:"required"`
IsOnlineOnly bool `json:"isOnlineOnly"` IsOnlineOnly bool `json:"isOnlineOnly"`
OfflinePushInfo *open_im_sdk.OfflinePushInfo `json:"offlinePushInfo"`
} }
func newUserSendMsgReq(params *paramsManagementSendMsg) *pbChat.UserSendMsgReq { func newUserSendMsgReq(params *paramsManagementSendMsg) *pbChat.SendMsgReq {
var newContent string var newContent string
switch params.ContentType { switch params.ContentType {
case constant.Text: case constant.Text:
@ -53,26 +56,31 @@ func newUserSendMsgReq(params *paramsManagementSendMsg) *pbChat.UserSendMsgReq {
newContent = utils.StructToJsonString(params.Content) newContent = utils.StructToJsonString(params.Content)
default: default:
} }
options := make(map[string]int32, 2) options := make(map[string]bool, 2)
if params.IsOnlineOnly { if params.IsOnlineOnly {
options["history"] = 0 utils.SetSwitchFromOptions(options, constant.IsOfflinePush, false)
options["persistent"] = 0 utils.SetSwitchFromOptions(options, constant.IsHistory, false)
utils.SetSwitchFromOptions(options, constant.IsPersistent, false)
} }
pbData := pbChat.UserSendMsgReq{ pbData := pbChat.SendMsgReq{
ReqIdentifier: constant.WSSendMsg, OperationID: params.OperationID,
MsgData: &open_im_sdk.MsgData{
SendID: params.SendID, SendID: params.SendID,
RecvID: params.RecvID,
GroupID: params.GroupID,
ClientMsgID: utils.GetMsgID(params.SendID),
SenderPlatformID: params.SenderPlatformID,
SenderNickName: params.SenderNickName, SenderNickName: params.SenderNickName,
SenderFaceURL: params.SenderFaceURL, SenderFaceURL: params.SenderFaceURL,
OperationID: params.OperationID,
PlatformID: params.SenderPlatformID,
SessionType: params.SessionType, SessionType: params.SessionType,
MsgFrom: constant.UserMsgType, MsgFrom: constant.SysMsgType,
ContentType: params.ContentType, ContentType: params.ContentType,
RecvID: params.RecvID, Content: []byte(newContent),
ForceList: params.ForceList, ForceList: params.ForceList,
Content: newContent, CreateTime: utils.GetCurrentTimestampByNano(),
ClientMsgID: utils.GetMsgID(params.SendID), Options: options,
Options: utils.MapIntToJsonString(options), OfflinePushInfo: params.OfflinePushInfo,
},
} }
return &pbData return &pbData
} }
@ -135,6 +143,19 @@ func ManagementSendMsg(c *gin.Context) {
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": "not authorized", "sendTime": 0, "MsgID": ""}) c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": "not authorized", "sendTime": 0, "MsgID": ""})
return return
}
switch params.SessionType {
case constant.SingleChatType:
if len(params.RecvID) == 0 {
log.NewError(params.OperationID, "recvID is a null string")
c.JSON(http.StatusBadRequest, gin.H{"errCode": 405, "errMsg": "recvID is a null string", "sendTime": 0, "MsgID": ""})
}
case constant.GroupChatType:
if len(params.GroupID) == 0 {
log.NewError(params.OperationID, "groupID is a null string")
c.JSON(http.StatusBadRequest, gin.H{"errCode": 405, "errMsg": "groupID is a null string", "sendTime": 0, "MsgID": ""})
}
} }
log.InfoByKv("Ws call success to ManagementSendMsgReq", params.OperationID, "Parameters", params) log.InfoByKv("Ws call success to ManagementSendMsgReq", params.OperationID, "Parameters", params)
@ -146,7 +167,7 @@ func ManagementSendMsg(c *gin.Context) {
log.Info("", "", "api ManagementSendMsg call, api call rpc...") log.Info("", "", "api ManagementSendMsg call, api call rpc...")
reply, err := client.UserSendMsg(context.Background(), pbData) reply, err := client.SendMsg(context.Background(), pbData)
if err != nil { if err != nil {
log.NewError(params.OperationID, "call delete UserSendMsg rpc server failed", err.Error()) log.NewError(params.OperationID, "call delete UserSendMsg rpc server failed", err.Error())
c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": "call UserSendMsg rpc server failed"}) c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": "call UserSendMsg rpc server failed"})

View File

@ -155,7 +155,6 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
client := pbGroup.NewGroupClient(etcdConn) client := pbGroup.NewGroupClient(etcdConn)
req := &pbGroup.GetGroupAllMemberReq{ req := &pbGroup.GetGroupAllMemberReq{
GroupID: pb.MsgData.GroupID, GroupID: pb.MsgData.GroupID,
Token: pb.Token,
OperationID: pb.OperationID, OperationID: pb.OperationID,
} }
reply, err := client.GetGroupAllMember(context.Background(), req) reply, err := client.GetGroupAllMember(context.Background(), req)
@ -169,13 +168,13 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
} }
groupID := pb.MsgData.GroupID groupID := pb.MsgData.GroupID
for _, v := range reply.MemberList { for _, v := range reply.MemberList {
pb.MsgData.RecvID = v.UserId pb.MsgData.RecvID = v.UserID
isSend := modifyMessageByUserMessageReceiveOpt(v.UserId, groupID, constant.GroupChatType, pb) isSend := modifyMessageByUserMessageReceiveOpt(v.UserID, groupID, constant.GroupChatType, pb)
if isSend { if isSend {
msgToMQ.MsgData = pb.MsgData msgToMQ.MsgData = pb.MsgData
err := rpc.sendMsgToKafka(&msgToMQ, v.UserId) err := rpc.sendMsgToKafka(&msgToMQ, v.UserID)
if err != nil { if err != nil {
log.NewError(msgToMQ.OperationID, "kafka send msg err:UserId", v.UserId, msgToMQ.String()) log.NewError(msgToMQ.OperationID, "kafka send msg err:UserId", v.UserID, msgToMQ.String())
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
} }
} }