ws modify

This commit is contained in:
Gordon 2021-10-25 16:03:14 +08:00
parent d98253db43
commit 5dabd9bf4b
4 changed files with 98 additions and 69 deletions

View File

@ -6,11 +6,14 @@ import (
"Open_IM/src/common/log" "Open_IM/src/common/log"
"Open_IM/src/grpc-etcdv3/getcdv3" "Open_IM/src/grpc-etcdv3/getcdv3"
pbChat "Open_IM/src/proto/chat" pbChat "Open_IM/src/proto/chat"
pbWs "Open_IM/src/proto/sdk_ws"
"Open_IM/src/utils" "Open_IM/src/utils"
"bytes" "bytes"
"context" "context"
"encoding/gob" "encoding/gob"
"encoding/json"
"fmt" "fmt"
"github.com/golang/protobuf/proto"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"runtime" "runtime"
"strings" "strings"
@ -66,14 +69,17 @@ func (ws *WServer) msgParse(conn *UserConn, binaryMsg []byte) {
} }
func (ws *WServer) newestSeqResp(conn *UserConn, m *Req, pb *pbChat.GetNewSeqResp) { func (ws *WServer) newestSeqResp(conn *UserConn, m *Req, pb *pbChat.GetNewSeqResp) {
mReply := make(map[string]interface{}) var mReplyData pbWs.GetNewSeqResp
mData := make(map[string]interface{}) mReplyData.Seq = pb.GetSeq()
mReply["reqIdentifier"] = m.ReqIdentifier b, _ := proto.Marshal(&mReplyData)
mReply["msgIncr"] = m.MsgIncr mReply := Resp{
mReply["errCode"] = pb.GetErrCode() ReqIdentifier: m.ReqIdentifier,
mReply["errMsg"] = pb.GetErrMsg() MsgIncr: m.MsgIncr,
mData["seq"] = pb.GetSeq() ErrCode: pb.GetErrCode(),
mReply["data"] = mData ErrMsg: pb.GetErrMsg(),
OperationID: m.OperationID,
Data: b,
}
ws.sendMsg(conn, mReply) ws.sendMsg(conn, mReply)
} }
func (ws *WServer) newestSeqReq(conn *UserConn, m *Req) { func (ws *WServer) newestSeqReq(conn *UserConn, m *Req) {
@ -97,26 +103,28 @@ func (ws *WServer) newestSeqReq(conn *UserConn, m *Req) {
} }
func (ws *WServer) pullMsgResp(conn *UserConn, m *Req, pb *pbChat.PullMessageResp) { func (ws *WServer) pullMsgResp(conn *UserConn, m *Req, pb *pbChat.PullMessageResp) {
mReply := make(map[string]interface{}) var mReplyData pbWs.PullMessageBySeqListResp
msg := make(map[string]interface{}) mReplyData.MaxSeq = pb.GetMaxSeq()
mReply["reqIdentifier"] = m.ReqIdentifier mReplyData.MinSeq = pb.GetMinSeq()
mReply["msgIncr"] = m.MsgIncr b, _ := json.Marshal(pb.GetSingleUserMsg)
mReply["errCode"] = pb.GetErrCode() err := json.Unmarshal(b, &mReplyData.SingleUserMsg)
mReply["errMsg"] = pb.GetErrMsg() if err != nil {
//空切片 log.NewError(m.OperationID, "SingleUserMsg,json Unmarshal,err", err.Error())
if v := pb.GetSingleUserMsg(); v != nil {
msg["single"] = v
} else {
msg["single"] = []pbChat.GatherFormat{}
} }
if v := pb.GetGroupUserMsg(); v != nil { b, _ = json.Marshal(pb.GetGroupUserMsg())
msg["group"] = v err = json.Unmarshal(b, &mReplyData.GroupUserMsg)
} else { if err != nil {
msg["group"] = []pbChat.GatherFormat{} log.NewError(m.OperationID, "GroupUserMsg,json Unmarshal,err", err.Error())
}
c, _ := proto.Marshal(&mReplyData)
mReply := Resp{
ReqIdentifier: m.ReqIdentifier,
MsgIncr: m.MsgIncr,
ErrCode: pb.GetErrCode(),
ErrMsg: pb.GetErrMsg(),
OperationID: m.OperationID,
Data: c,
} }
msg["maxSeq"] = pb.GetMaxSeq()
msg["minSeq"] = pb.GetMinSeq()
mReply["data"] = msg
ws.sendMsg(conn, mReply) ws.sendMsg(conn, mReply)
} }
@ -153,7 +161,7 @@ func (ws *WServer) pullMsgBySeqListReq(conn *UserConn, m *Req) {
isPass, errCode, errMsg, data := ws.argsValidate(m, constant.WSPullMsgBySeqList) isPass, errCode, errMsg, data := ws.argsValidate(m, constant.WSPullMsgBySeqList)
if isPass { if isPass {
pbData := pbChat.PullMessageBySeqListReq{} pbData := pbChat.PullMessageBySeqListReq{}
pbData.SeqList = data.(SeqListData).SeqList pbData.SeqList = data.(pbWs.PullMessageBySeqListReq).SeqList
pbData.UserID = m.SendID pbData.UserID = m.SendID
pbData.OperationID = m.OperationID pbData.OperationID = m.OperationID
grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName) grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName)
@ -174,22 +182,19 @@ func (ws *WServer) pullMsgBySeqListReq(conn *UserConn, m *Req) {
func (ws *WServer) sendMsgResp(conn *UserConn, m *Req, pb *pbChat.UserSendMsgResp, sendTime int64) { func (ws *WServer) sendMsgResp(conn *UserConn, m *Req, pb *pbChat.UserSendMsgResp, sendTime int64) {
// := make(map[string]interface{}) // := make(map[string]interface{})
mReplyData := make(map[string]interface{})
//mReply["reqIdentifier"] = m.ReqIdentifier var mReplyData pbWs.UserSendMsgResp
//mReply["msgIncr"] = m.MsgIncr mReplyData.ClientMsgID = pb.GetClientMsgID()
//mReply["errCode"] = pb.GetErrCode() mReplyData.ServerMsgID = pb.GetServerMsgID()
//mReply["errMsg"] = pb.GetErrMsg() mReplyData.SendTime = sendTime
mReplyData["clientMsgID"] = pb.GetClientMsgID() b, _ := proto.Marshal(&mReplyData)
mReplyData["serverMsgID"] = pb.GetServerMsgID()
mReplyData["sendTime"] = utils.Int64ToString(sendTime)
//mReply["data"] = mReplyData
mReply := Resp{ mReply := Resp{
ReqIdentifier: m.ReqIdentifier, ReqIdentifier: m.ReqIdentifier,
MsgIncr: m.MsgIncr, MsgIncr: m.MsgIncr,
ErrCode: pb.GetErrCode(), ErrCode: pb.GetErrCode(),
ErrMsg: pb.GetErrMsg(), ErrMsg: pb.GetErrMsg(),
OperationID: m.OperationID, OperationID: m.OperationID,
Data: mReplyData, Data: b,
} }
fmt.Println("test fmt send msg resp", m.OperationID, "reqIdentifier", m.ReqIdentifier, "sendID", m.SendID) fmt.Println("test fmt send msg resp", m.OperationID, "reqIdentifier", m.ReqIdentifier, "sendID", m.SendID)
ws.sendMsg(conn, mReply) ws.sendMsg(conn, mReply)
@ -200,7 +205,7 @@ func (ws *WServer) sendMsgReq(conn *UserConn, m *Req, sendTime int64) {
reply := new(pbChat.UserSendMsgResp) reply := new(pbChat.UserSendMsgResp)
isPass, errCode, errMsg, pData := ws.argsValidate(m, constant.WSSendMsg) isPass, errCode, errMsg, pData := ws.argsValidate(m, constant.WSSendMsg)
if isPass { if isPass {
data := pData.(MsgData) data := pData.(pbWs.UserSendMsgReq)
pbData := pbChat.UserSendMsgReq{ pbData := pbChat.UserSendMsgReq{
ReqIdentifier: m.ReqIdentifier, ReqIdentifier: m.ReqIdentifier,
Token: m.Token, Token: m.Token,
@ -213,9 +218,8 @@ func (ws *WServer) sendMsgReq(conn *UserConn, m *Req, sendTime int64) {
RecvID: data.RecvID, RecvID: data.RecvID,
ForceList: data.ForceList, ForceList: data.ForceList,
Content: data.Content, Content: data.Content,
Options: utils.MapToJsonString(data.Options), Options: utils.MapIntToJsonString(data.Options),
ClientMsgID: data.ClientMsgID, ClientMsgID: data.ClientMsgID,
OffLineInfo: utils.MapToJsonString(data.OfflineInfo),
SendTime: sendTime, SendTime: sendTime,
} }
log.InfoByKv("Ws call success to sendMsgReq", m.OperationID, "Parameters", m) log.InfoByKv("Ws call success to sendMsgReq", m.OperationID, "Parameters", m)

View File

@ -9,8 +9,8 @@ package gate
import ( import (
"Open_IM/src/common/constant" "Open_IM/src/common/constant"
"Open_IM/src/common/log" "Open_IM/src/common/log"
"bytes" pbWs "Open_IM/src/proto/sdk_ws"
"encoding/gob" "github.com/golang/protobuf/proto"
) )
type Req struct { type Req struct {
@ -27,7 +27,7 @@ type Resp struct {
OperationID string `json:"operationID"` OperationID string `json:"operationID"`
ErrCode int32 `json:"errCode"` ErrCode int32 `json:"errCode"`
ErrMsg string `json:"errMsg"` ErrMsg string `json:"errMsg"`
Data interface{} `json:"data"` Data []byte `json:"data"`
} }
type SeqData struct { type SeqData struct {
@ -47,37 +47,57 @@ type MsgData struct {
OfflineInfo map[string]interface{} `mapstructure:"offlineInfo" validate:"required"` OfflineInfo map[string]interface{} `mapstructure:"offlineInfo" validate:"required"`
Ext map[string]interface{} `mapstructure:"ext"` Ext map[string]interface{} `mapstructure:"ext"`
} }
type MaxSeqResp struct {
MaxSeq int64 `json:"maxSeq"`
}
type PullMessageResp struct {
}
type SeqListData struct { type SeqListData struct {
SeqList []int64 `mapstructure:"seqList" validate:"required"` SeqList []int64 `mapstructure:"seqList" validate:"required"`
} }
func (ws *WServer) argsValidate(m *Req, r int32) (isPass bool, errCode int32, errMsg string, data interface{}) { func (ws *WServer) argsValidate(m *Req, r int32) (isPass bool, errCode int32, errMsg string, returnData interface{}) {
switch r { switch r {
case constant.WSPullMsg:
data = SeqData{}
case constant.WSSendMsg: case constant.WSSendMsg:
data = MsgData{} data := pbWs.UserSendMsgReq{}
case constant.WSPullMsgBySeqList: if err := proto.Unmarshal(m.Data, &data); err != nil {
data = SeqListData{}
default:
}
b := bytes.NewBuffer(m.Data)
dec := gob.NewDecoder(b)
err := dec.Decode(&data)
if err != nil {
log.ErrorByKv("Decode Data struct err", "", "err", err.Error(), "reqIdentifier", r) log.ErrorByKv("Decode Data struct err", "", "err", err.Error(), "reqIdentifier", r)
return false, 203, err.Error(), nil return false, 203, err.Error(), nil
} }
if err := validate.Struct(data); err != nil {
log.ErrorByKv("data args validate err", "", "err", err.Error(), "reqIdentifier", r)
return false, 204, err.Error(), nil
}
return true, 0, "", data
case constant.WSPullMsgBySeqList:
data := pbWs.PullMessageBySeqListReq{}
if err := proto.Unmarshal(m.Data, &data); err != nil {
log.ErrorByKv("Decode Data struct err", "", "err", err.Error(), "reqIdentifier", r)
return false, 203, err.Error(), nil
}
if err := validate.Struct(data); err != nil {
log.ErrorByKv("data args validate err", "", "err", err.Error(), "reqIdentifier", r)
return false, 204, err.Error(), nil
}
return true, 0, "", data
default:
}
return false, 204, "args err", nil
//b := bytes.NewBuffer(m.Data)
//dec := gob.NewDecoder(b)
//err := dec.Decode(&data)
//if err != nil {
// log.ErrorByKv("Decode Data struct err", "", "err", err.Error(), "reqIdentifier", r)
// return false, 203, err.Error(), nil
//}
//if err := mapstructure.WeakDecode(m.Data, &data); err != nil { //if err := mapstructure.WeakDecode(m.Data, &data); err != nil {
// log.ErrorByKv("map to Data struct err", "", "err", err.Error(), "reqIdentifier", r) // log.ErrorByKv("map to Data struct err", "", "err", err.Error(), "reqIdentifier", r)
// return false, 203, err.Error(), nil // return false, 203, err.Error(), nil
//} else //} else
if err = validate.Struct(data); err != nil {
log.ErrorByKv("data args validate err", "", "err", err.Error(), "reqIdentifier", r)
return false, 204, err.Error(), nil
} else {
return true, 0, "", data
}
} }

View File

@ -1,6 +1,6 @@
syntax = "proto3"; syntax = "proto3";
package pbWs;//The package name to which the proto file belongs package open_im_sdk;//The package name to which the proto file belongs
option go_package = "./sdk_ws;pbWs";//The generated go pb file is in the current directory, and the package name is pbChat option go_package = "./sdk_ws;open_im_sdk";//The generated go pb file is in the current directory, and the package name is open_im_sdk
message PullMessageBySeqListResp { message PullMessageBySeqListResp {

View File

@ -106,6 +106,11 @@ func MapToJsonString(param map[string]interface{}) string {
dataString := string(dataType) dataString := string(dataType)
return dataString return dataString
} }
func MapIntToJsonString(param map[string]int32) string {
dataType, _ := json.Marshal(param)
dataString := string(dataType)
return dataString
}
func JsonStringToMap(str string) (tempMap map[string]interface{}) { func JsonStringToMap(str string) (tempMap map[string]interface{}) {
_ = json.Unmarshal([]byte(str), &tempMap) _ = json.Unmarshal([]byte(str), &tempMap)
return tempMap return tempMap