change send message by ws and optimization

This commit is contained in:
Gordon 2021-10-21 19:10:55 +08:00
parent c8155553b8
commit 098c7cfa9e
24 changed files with 603 additions and 227 deletions

View File

@ -8,18 +8,22 @@ source ./path_info.cfg
#Check if the service exists #Check if the service exists
#If it is exists,kill this process #If it is exists,kill this process
check=`ps aux | grep -w ./${msg_transfer_name} | grep -v grep| wc -l` check=`ps aux | grep -w ./${msg_transfer_name} | grep -v grep| wc -l`
if [ $check -eq 1 ] if [ $check -ge 1 ]
then then
oldPid=`ps aux | grep -w ./${msg_transfer_name} | grep -v grep|awk '{print $2}'` oldPid=`ps aux | grep -w ./${msg_transfer_name} | grep -v grep|awk '{print $2}'`
kill -9 $oldPid kill -9 $oldPid
fi fi
#Waiting port recycling #Waiting port recycling
sleep 1 sleep 1
cd ${msg_transfer_binary_root} cd ${msg_transfer_binary_root}
nohup ./${msg_transfer_name} >>../logs/openIM.log 2>&1 & for ((i = 0; i < ${msg_transfer_service_num}; i++)); do
nohup ./${msg_transfer_name} >>../logs/openIM.log 2>&1 &
done
#Check launched service process #Check launched service process
check=`ps aux | grep -w ./${msg_transfer_name} | grep -v grep| wc -l` check=`ps aux | grep -w ./${msg_transfer_name} | grep -v grep| wc -l`
if [ $check -eq 1 ] if [ $check -ge 1 ]
then then
newPid=`ps aux | grep -w ./${msg_transfer_name} | grep -v grep|awk '{print $2}'` newPid=`ps aux | grep -w ./${msg_transfer_name} | grep -v grep|awk '{print $2}'`
ports=`netstat -netulp | grep -w ${newPid}|awk '{print $4}'|awk -F '[:]' '{print $NF}'` ports=`netstat -netulp | grep -w ${newPid}|awk '{print $4}'|awk -F '[:]' '{print $NF}'`

View File

@ -16,6 +16,7 @@ push_source_root="../src/push/"
msg_transfer_name="open_im_msg_transfer" msg_transfer_name="open_im_msg_transfer"
msg_transfer_binary_root="../bin/" msg_transfer_binary_root="../bin/"
msg_transfer_source_root="../src/msg_transfer/" msg_transfer_source_root="../src/msg_transfer/"
msg_transfer_service_num=2
sdk_server_name="open_im_sdk_server" sdk_server_name="open_im_sdk_server"

View File

@ -13,7 +13,7 @@ rpc_ports=($ports_array)
#Check if the service exists #Check if the service exists
#If it is exists,kill this process #If it is exists,kill this process
check=$(ps aux | grep -w ./${push_name} | grep -v grep | wc -l) check=$(ps aux | grep -w ./${push_name} | grep -v grep | wc -l)
if [ $check -eq 1 ]; then if [ $check -ge 1 ]; then
oldPid=$(ps aux | grep -w ./${push_name} | grep -v grep | awk '{print $2}') oldPid=$(ps aux | grep -w ./${push_name} | grep -v grep | awk '{print $2}')
kill -9 $oldPid kill -9 $oldPid
fi fi
@ -28,7 +28,7 @@ done
sleep 3 sleep 3
#Check launched service process #Check launched service process
check=$(ps aux | grep -w ./${push_name} | grep -v grep | wc -l) check=$(ps aux | grep -w ./${push_name} | grep -v grep | wc -l)
if [ $check -eq 1 ]; then if [ $check -ge 1 ]; then
newPid=$(ps aux | grep -w ./${push_name} | grep -v grep | awk '{print $2}') newPid=$(ps aux | grep -w ./${push_name} | grep -v grep | awk '{print $2}')
ports=$(netstat -netulp | grep -w ${newPid} | awk '{print $4}' | awk -F '[:]' '{print $NF}') ports=$(netstat -netulp | grep -w ${newPid} | awk '{print $4}' | awk -F '[:]' '{print $NF}')
allPorts="" allPorts=""

View File

@ -70,3 +70,58 @@ func UserPullMsg(c *gin.Context) {
}) })
} }
type paramsUserPullMsgBySeqList struct {
ReqIdentifier int `json:"reqIdentifier" binding:"required"`
SendID string `json:"sendID" binding:"required"`
OperationID string `json:"operationID" binding:"required"`
SeqList []int64 `json:"seqList"`
}
func UserPullMsgBySeqList(c *gin.Context) {
params := paramsUserPullMsgBySeqList{}
if err := c.BindJSON(&params); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()})
return
}
token := c.Request.Header.Get("token")
if !utils.VerifyToken(token, params.SendID) {
c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": "token validate err"})
return
}
pbData := pbChat.PullMessageBySeqListReq{}
pbData.UserID = params.SendID
pbData.OperationID = params.OperationID
pbData.SeqList = params.SeqList
grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName)
msgClient := pbChat.NewChatClient(grpcConn)
reply, err := msgClient.PullMessageBySeqList(context.Background(), &pbData)
if err != nil {
log.ErrorByKv("PullMessageBySeqList error", pbData.OperationID, "err", err.Error())
return
}
log.InfoByKv("rpc call success to PullMessageBySeqList", pbData.OperationID, "ReplyArgs", reply.String(), "maxSeq", reply.GetMaxSeq(),
"MinSeq", reply.GetMinSeq(), "singLen", len(reply.GetSingleUserMsg()), "groupLen", len(reply.GetGroupUserMsg()))
msg := make(map[string]interface{})
if v := reply.GetSingleUserMsg(); v != nil {
msg["single"] = v
} else {
msg["single"] = []pbChat.GatherFormat{}
}
if v := reply.GetGroupUserMsg(); v != nil {
msg["group"] = v
} else {
msg["group"] = []pbChat.GatherFormat{}
}
msg["maxSeq"] = reply.GetMaxSeq()
msg["minSeq"] = reply.GetMinSeq()
c.JSON(http.StatusOK, gin.H{
"errCode": reply.ErrCode,
"errMsg": reply.ErrMsg,
"reqIdentifier": params.ReqIdentifier,
"data": msg,
})
}

View File

@ -87,7 +87,7 @@ func UserSendMsg(c *gin.Context) {
"data": gin.H{ "data": gin.H{
"clientMsgID": reply.ClientMsgID, "clientMsgID": reply.ClientMsgID,
"serverMsgID": reply.ServerMsgID, "serverMsgID": reply.ServerMsgID,
"sendTime": reply.SendTime, "sendTime": 0,
}, },
}) })

View File

@ -41,7 +41,9 @@ func ImportFriend(c *gin.Context) {
OwnerUid: params.OwnerUid, OwnerUid: params.OwnerUid,
Token: c.Request.Header.Get("token"), Token: c.Request.Header.Get("token"),
} }
log.ErrorByKv("ImportFriend Test start", params.OperationID)
RpcResp, err := client.ImportFriend(context.Background(), req) RpcResp, err := client.ImportFriend(context.Background(), req)
//log.ErrorByKv("ImportFriend Test end", params.OperationID, "resp", RpcResp, "err", err.Error())
if err != nil { if err != nil {
log.Error(req.Token, req.OperationID, "err=%s,ImportFriend failed", err) log.Error(req.Token, req.OperationID, "err=%s,ImportFriend failed", err)
c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": "cImportFriend failed" + err.Error()}) c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": "cImportFriend failed" + err.Error()})

View File

@ -125,7 +125,7 @@ func ManagementSendMsg(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{ c.JSON(http.StatusOK, gin.H{
"errCode": reply.ErrCode, "errCode": reply.ErrCode,
"errMsg": reply.ErrMsg, "errMsg": reply.ErrMsg,
"sendTime": reply.SendTime, "sendTime": "",
"msgID": reply.ClientMsgID, "msgID": reply.ClientMsgID,
}) })

View File

@ -86,6 +86,7 @@ func main() {
chatGroup.POST("/newest_seq", apiChat.UserNewestSeq) chatGroup.POST("/newest_seq", apiChat.UserNewestSeq)
chatGroup.POST("/pull_msg", apiChat.UserPullMsg) chatGroup.POST("/pull_msg", apiChat.UserPullMsg)
chatGroup.POST("/send_msg", apiChat.UserSendMsg) chatGroup.POST("/send_msg", apiChat.UserSendMsg)
chatGroup.POST("/pull_msg_by_seq", apiChat.UserPullMsgBySeqList)
} }
//Manager //Manager
managementGroup := r.Group("/manager") managementGroup := r.Group("/manager")

View File

@ -17,10 +17,12 @@ const (
RefuseFriendFlag = -1 RefuseFriendFlag = -1
//Websocket Protocol //Websocket Protocol
WSGetNewestSeq = 1001 WSGetNewestSeq = 1001
WSPullMsg = 1002 WSPullMsg = 1002
WSSendMsg = 1003 WSSendMsg = 1003
WSPushMsg = 2001 WSPullMsgBySeqList = 1004
WSPushMsg = 2001
WSDataError = 3001
///ContentType ///ContentType
//UserRelated //UserRelated

View File

@ -3,6 +3,7 @@ package db
import ( import (
"Open_IM/src/common/config" "Open_IM/src/common/config"
"Open_IM/src/common/constant" "Open_IM/src/common/constant"
"Open_IM/src/common/log"
pbMsg "Open_IM/src/proto/chat" pbMsg "Open_IM/src/proto/chat"
"errors" "errors"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
@ -28,8 +29,8 @@ type GroupMember struct {
UIDList []string UIDList []string
} }
func (d *DataBases) GetUserChat(uid string, seqBegin, seqEnd int64) (SingleMsg []*pbMsg.MsgFormat, GroupMsg []*pbMsg.MsgFormat, MaxSeq int64, MinSeq int64, err error) { func (d *DataBases) GetMsgBySeqRange(uid string, seqBegin, seqEnd int64) (SingleMsg []*pbMsg.MsgFormat, GroupMsg []*pbMsg.MsgFormat, MaxSeq int64, MinSeq int64, err error) {
count := 0 var count int64
session := d.mgoSession.Clone() session := d.mgoSession.Clone()
if session == nil { if session == nil {
return nil, nil, MaxSeq, MinSeq, errors.New("session == nil") return nil, nil, MaxSeq, MinSeq, errors.New("session == nil")
@ -76,27 +77,85 @@ func (d *DataBases) GetUserChat(uid string, seqBegin, seqEnd int64) (SingleMsg [
GroupMsg = append(GroupMsg, temp) GroupMsg = append(GroupMsg, temp)
} }
count++ count++
if count == (seqEnd - seqBegin + 1) {
break
}
} }
} }
return SingleMsg, GroupMsg, MaxSeq, MinSeq, nil return SingleMsg, GroupMsg, MaxSeq, MinSeq, nil
} }
func (d *DataBases) GetMsgBySeqList(uid string, seqList []int64) (SingleMsg []*pbMsg.MsgFormat, GroupMsg []*pbMsg.MsgFormat, MaxSeq int64, MinSeq int64, err error) {
count := 0
session := d.mgoSession.Clone()
if session == nil {
return nil, nil, MaxSeq, MinSeq, errors.New("session == nil")
}
defer session.Close()
c := session.DB(config.Config.Mongo.DBDatabase).C(cChat)
sChat := UserChat{}
if err = c.Find(bson.M{"uid": uid}).One(&sChat); err != nil {
return nil, nil, MaxSeq, MinSeq, err
}
pChat := pbMsg.MsgSvrToPushSvrChatMsg{}
for i := 0; i < len(sChat.Msg); i++ {
temp := new(pbMsg.MsgFormat)
if err = proto.Unmarshal(sChat.Msg[i].Msg, &pChat); err != nil {
return nil, nil, MaxSeq, MinSeq, err
}
if isContainInt64(pChat.RecvSeq, seqList) {
temp.SendID = pChat.SendID
temp.RecvID = pChat.RecvID
temp.MsgFrom = pChat.MsgFrom
temp.Seq = pChat.RecvSeq
temp.ServerMsgID = pChat.MsgID
temp.SendTime = pChat.SendTime
temp.Content = pChat.Content
temp.ContentType = pChat.ContentType
temp.SenderPlatformID = pChat.PlatformID
temp.ClientMsgID = pChat.ClientMsgID
temp.SenderFaceURL = pChat.SenderFaceURL
temp.SenderNickName = pChat.SenderNickName
if pChat.RecvSeq > MaxSeq {
MaxSeq = pChat.RecvSeq
}
if count == 0 {
MinSeq = pChat.RecvSeq
}
if pChat.RecvSeq < MinSeq {
MinSeq = pChat.RecvSeq
}
if pChat.SessionType == constant.SingleChatType {
SingleMsg = append(SingleMsg, temp)
} else {
GroupMsg = append(GroupMsg, temp)
}
count++
if count == len(seqList) {
break
}
}
}
return SingleMsg, GroupMsg, MaxSeq, MinSeq, nil
}
func (d *DataBases) SaveUserChat(uid string, sendTime int64, m proto.Message) error { func (d *DataBases) SaveUserChat(uid string, sendTime int64, m proto.Message) error {
newTime := getCurrentTimestampByMill()
session := d.mgoSession.Clone() session := d.mgoSession.Clone()
if session == nil { if session == nil {
return errors.New("session == nil") return errors.New("session == nil")
} }
defer session.Close() defer session.Close()
log.NewInfo("", "get mgoSession cost time", getCurrentTimestampByMill()-newTime)
c := session.DB(config.Config.Mongo.DBDatabase).C(cChat) c := session.DB(config.Config.Mongo.DBDatabase).C(cChat)
n, err := c.Find(bson.M{"uid": uid}).Count() n, err := c.Find(bson.M{"uid": uid}).Count()
if err != nil { if err != nil {
return err return err
} }
log.NewInfo("", "find mgo uid cost time", getCurrentTimestampByMill()-newTime)
sMsg := MsgInfo{} sMsg := MsgInfo{}
sMsg.SendTime = sendTime sMsg.SendTime = sendTime
if sMsg.Msg, err = proto.Marshal(m); err != nil { if sMsg.Msg, err = proto.Marshal(m); err != nil {
@ -117,7 +176,7 @@ func (d *DataBases) SaveUserChat(uid string, sendTime int64, m proto.Message) er
return err return err
} }
} }
log.NewInfo("", "insert mgo data cost time", getCurrentTimestampByMill()-newTime)
return nil return nil
} }
@ -231,3 +290,17 @@ func (d *DataBases) DelGroupMember(groupID, uid string) error {
return nil return nil
} }
func isContainInt64(target int64, List []int64) bool {
for _, element := range List {
if target == element {
return true
}
}
return false
}
func getCurrentTimestampByMill() int64 {
return time.Now().UnixNano() / 1e6
}

View File

@ -9,11 +9,13 @@ import (
"Open_IM/src/utils" "Open_IM/src/utils"
"context" "context"
"encoding/json" "encoding/json"
"fmt"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"runtime"
"strings" "strings"
) )
func (ws *WServer) msgParse(conn *websocket.Conn, jsonMsg []byte) { func (ws *WServer) msgParse(conn *UserConn, jsonMsg []byte) {
//ws online debug data //ws online debug data
//{"ReqIdentifier":1001,"Token":"123","SendID":"c4ca4238a0b923820dcc509a6f75849b","Time":"123","OperationID":"123","MsgIncr":0} //{"ReqIdentifier":1001,"Token":"123","SendID":"c4ca4238a0b923820dcc509a6f75849b","Time":"123","OperationID":"123","MsgIncr":0}
//{"ReqIdentifier":1002,"Token":"123","SendID":"c4ca4238a0b923820dcc509a6f75849b","Time":"123","OperationID":"123","MsgIncr":0,"SeqBegin":1,"SeqEnd":6} //{"ReqIdentifier":1002,"Token":"123","SendID":"c4ca4238a0b923820dcc509a6f75849b","Time":"123","OperationID":"123","MsgIncr":0,"SeqBegin":1,"SeqEnd":6}
@ -23,32 +25,43 @@ func (ws *WServer) msgParse(conn *websocket.Conn, jsonMsg []byte) {
m := Req{} m := Req{}
if err := json.Unmarshal(jsonMsg, &m); err != nil { if err := json.Unmarshal(jsonMsg, &m); err != nil {
log.ErrorByKv("ws json Unmarshal err", "", "err", err.Error()) log.ErrorByKv("ws json Unmarshal err", "", "err", err.Error())
ws.sendErrMsg(conn, 200, err.Error()) ws.sendErrMsg(conn, 200, err.Error(), constant.WSDataError, "")
err = conn.Close()
if err != nil {
log.NewError("", "ws close err", err.Error())
}
return return
} }
if err := validate.Struct(m); err != nil { if err := validate.Struct(m); err != nil {
log.ErrorByKv("ws args validate err", "", "err", err.Error()) log.ErrorByKv("ws args validate err", "", "err", err.Error())
ws.sendErrMsg(conn, 201, err.Error()) ws.sendErrMsg(conn, 201, err.Error(), m.ReqIdentifier, m.MsgIncr)
return return
} }
if !utils.VerifyToken(m.Token, m.SendID) { if !utils.VerifyToken(m.Token, m.SendID) {
ws.sendErrMsg(conn, 202, "token validate err") ws.sendErrMsg(conn, 202, "token validate err", m.ReqIdentifier, m.MsgIncr)
return return
} }
fmt.Println("test fmt Basic Info Authentication Success", m.OperationID, "reqIdentifier", m.ReqIdentifier, "sendID", m.SendID)
log.InfoByKv("Basic Info Authentication Success", m.OperationID, "reqIdentifier", m.ReqIdentifier, "sendID", m.SendID) log.InfoByKv("Basic Info Authentication Success", m.OperationID, "reqIdentifier", m.ReqIdentifier, "sendID", m.SendID)
switch m.ReqIdentifier { switch m.ReqIdentifier {
case constant.WSGetNewestSeq: case constant.WSGetNewestSeq:
ws.newestSeqReq(conn, &m) go ws.newestSeqReq(conn, &m)
case constant.WSPullMsg: case constant.WSPullMsg:
ws.pullMsgReq(conn, &m) go ws.pullMsgReq(conn, &m)
case constant.WSSendMsg: case constant.WSSendMsg:
ws.sendMsgReq(conn, &m) sendTime := utils.GetCurrentTimestampByNano()
go ws.sendMsgReq(conn, &m, sendTime)
case constant.WSPullMsgBySeqList:
go ws.pullMsgBySeqListReq(conn, &m)
default: default:
} }
log.NewInfo("", "goroutine num is ", runtime.NumGoroutine())
} }
func (ws *WServer) newestSeqResp(conn *websocket.Conn, m *Req, pb *pbChat.GetNewSeqResp) { func (ws *WServer) newestSeqResp(conn *UserConn, m *Req, pb *pbChat.GetNewSeqResp) {
mReply := make(map[string]interface{}) mReply := make(map[string]interface{})
mData := make(map[string]interface{}) mData := make(map[string]interface{})
mReply["reqIdentifier"] = m.ReqIdentifier mReply["reqIdentifier"] = m.ReqIdentifier
@ -59,7 +72,7 @@ func (ws *WServer) newestSeqResp(conn *websocket.Conn, m *Req, pb *pbChat.GetNew
mReply["data"] = mData mReply["data"] = mData
ws.sendMsg(conn, mReply) ws.sendMsg(conn, mReply)
} }
func (ws *WServer) newestSeqReq(conn *websocket.Conn, m *Req) { func (ws *WServer) newestSeqReq(conn *UserConn, m *Req) {
log.InfoByKv("Ws call success to getNewSeq", m.OperationID, "Parameters", m) log.InfoByKv("Ws call success to getNewSeq", m.OperationID, "Parameters", m)
pbData := pbChat.GetNewSeqReq{} pbData := pbChat.GetNewSeqReq{}
pbData.UserID = m.SendID pbData.UserID = m.SendID
@ -79,7 +92,7 @@ func (ws *WServer) newestSeqReq(conn *websocket.Conn, m *Req) {
} }
func (ws *WServer) pullMsgResp(conn *websocket.Conn, m *Req, pb *pbChat.PullMessageResp) { func (ws *WServer) pullMsgResp(conn *UserConn, m *Req, pb *pbChat.PullMessageResp) {
mReply := make(map[string]interface{}) mReply := make(map[string]interface{})
msg := make(map[string]interface{}) msg := make(map[string]interface{})
mReply["reqIdentifier"] = m.ReqIdentifier mReply["reqIdentifier"] = m.ReqIdentifier
@ -104,7 +117,7 @@ func (ws *WServer) pullMsgResp(conn *websocket.Conn, m *Req, pb *pbChat.PullMess
} }
func (ws *WServer) pullMsgReq(conn *websocket.Conn, m *Req) { func (ws *WServer) pullMsgReq(conn *UserConn, m *Req) {
log.InfoByKv("Ws call success to pullMsgReq", m.OperationID, "Parameters", m) log.InfoByKv("Ws call success to pullMsgReq", m.OperationID, "Parameters", m)
reply := new(pbChat.PullMessageResp) reply := new(pbChat.PullMessageResp)
isPass, errCode, errMsg, data := ws.argsValidate(m, constant.WSPullMsg) isPass, errCode, errMsg, data := ws.argsValidate(m, constant.WSPullMsg)
@ -130,21 +143,55 @@ func (ws *WServer) pullMsgReq(conn *websocket.Conn, m *Req) {
ws.pullMsgResp(conn, m, reply) ws.pullMsgResp(conn, m, reply)
} }
} }
func (ws *WServer) pullMsgBySeqListReq(conn *UserConn, m *Req) {
log.NewError(m.OperationID, "Ws call success to pullMsgBySeqListReq", m)
reply := new(pbChat.PullMessageResp)
isPass, errCode, errMsg, data := ws.argsValidate(m, constant.WSPullMsgBySeqList)
if isPass {
pbData := pbChat.PullMessageBySeqListReq{}
pbData.SeqList = data.(SeqListData).SeqList
pbData.UserID = m.SendID
pbData.OperationID = m.OperationID
grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName)
msgClient := pbChat.NewChatClient(grpcConn)
reply, err := msgClient.PullMessageBySeqList(context.Background(), &pbData)
if err != nil {
log.NewError(pbData.OperationID, "pullMsgBySeqListReq err", err.Error())
return
}
log.NewInfo(pbData.OperationID, "rpc call success to pullMsgBySeqListReq", reply.String(), reply.GetMaxSeq(), reply.GetMinSeq(), len(reply.GetSingleUserMsg()), len(reply.GetGroupUserMsg()))
ws.pullMsgResp(conn, m, reply)
} else {
reply.ErrCode = errCode
reply.ErrMsg = errMsg
ws.pullMsgResp(conn, m, reply)
}
}
func (ws *WServer) sendMsgResp(conn *websocket.Conn, m *Req, pb *pbChat.UserSendMsgResp) { func (ws *WServer) sendMsgResp(conn *UserConn, m *Req, pb *pbChat.UserSendMsgResp, sendTime int64) {
mReply := make(map[string]interface{}) // := make(map[string]interface{})
mReplyData := make(map[string]interface{}) mReplyData := make(map[string]interface{})
mReply["reqIdentifier"] = m.ReqIdentifier //mReply["reqIdentifier"] = m.ReqIdentifier
mReply["msgIncr"] = m.MsgIncr //mReply["msgIncr"] = m.MsgIncr
mReply["errCode"] = pb.GetErrCode() //mReply["errCode"] = pb.GetErrCode()
mReply["errMsg"] = pb.GetErrMsg() //mReply["errMsg"] = pb.GetErrMsg()
mReplyData["clientMsgID"] = pb.GetClientMsgID() mReplyData["clientMsgID"] = pb.GetClientMsgID()
mReplyData["serverMsgID"] = pb.GetServerMsgID() mReplyData["serverMsgID"] = pb.GetServerMsgID()
mReply["data"] = mReplyData mReplyData["sendTime"] = utils.Int64ToString(sendTime)
//mReply["data"] = mReplyData
mReply := Resp{
ReqIdentifier: m.ReqIdentifier,
MsgIncr: m.MsgIncr,
ErrCode: pb.GetErrCode(),
ErrMsg: pb.GetErrMsg(),
OperationID: m.OperationID,
Data: mReplyData,
}
fmt.Println("test fmt send msg resp", m.OperationID, "reqIdentifier", m.ReqIdentifier, "sendID", m.SendID)
ws.sendMsg(conn, mReply) ws.sendMsg(conn, mReply)
} }
func (ws *WServer) sendMsgReq(conn *websocket.Conn, m *Req) { func (ws *WServer) sendMsgReq(conn *UserConn, m *Req, sendTime int64) {
log.InfoByKv("Ws call success to sendMsgReq", m.OperationID, "Parameters", m) log.InfoByKv("Ws call success to sendMsgReq", m.OperationID, "Parameters", m)
reply := new(pbChat.UserSendMsgResp) reply := new(pbChat.UserSendMsgResp)
isPass, errCode, errMsg, pData := ws.argsValidate(m, constant.WSSendMsg) isPass, errCode, errMsg, pData := ws.argsValidate(m, constant.WSSendMsg)
@ -165,32 +212,43 @@ func (ws *WServer) sendMsgReq(conn *websocket.Conn, m *Req) {
Options: utils.MapToJsonString(data.Options), Options: utils.MapToJsonString(data.Options),
ClientMsgID: data.ClientMsgID, ClientMsgID: data.ClientMsgID,
OffLineInfo: utils.MapToJsonString(data.OfflineInfo), OffLineInfo: utils.MapToJsonString(data.OfflineInfo),
SendTime: sendTime,
} }
log.InfoByKv("Ws call success to sendMsgReq", m.OperationID, "Parameters", m) log.InfoByKv("Ws call success to sendMsgReq", m.OperationID, "Parameters", m)
time := utils.GetCurrentTimestampBySecond()
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName) etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName)
client := pbChat.NewChatClient(etcdConn) client := pbChat.NewChatClient(etcdConn)
log.Info("", "", "api UserSendMsg call, api call rpc...") log.Info("", "", "ws UserSendMsg call, api call rpc...")
reply, _ := client.UserSendMsg(context.Background(), &pbData) reply, err := client.UserSendMsg(context.Background(), &pbData)
if err != nil {
log.NewError(pbData.OperationID, "UserSendMsg err", err.Error())
reply.ErrCode = 100
reply.ErrMsg = "rpc err"
}
log.NewInfo(pbData.OperationID, "sendMsgReq call rpc cost time ", utils.GetCurrentTimestampBySecond()-time)
log.Info("", "", "api UserSendMsg call end..., [data: %s] [reply: %s]", pbData.String(), reply.String()) log.Info("", "", "api UserSendMsg call end..., [data: %s] [reply: %s]", pbData.String(), reply.String())
ws.sendMsgResp(conn, m, reply) ws.sendMsgResp(conn, m, reply, sendTime)
log.NewInfo(pbData.OperationID, "sendMsgResp end cost time ", utils.GetCurrentTimestampBySecond()-time)
} else { } else {
reply.ErrCode = errCode reply.ErrCode = errCode
reply.ErrMsg = errMsg reply.ErrMsg = errMsg
ws.sendMsgResp(conn, m, reply) ws.sendMsgResp(conn, m, reply, sendTime)
} }
} }
func (ws *WServer) sendMsg(conn *websocket.Conn, mReply map[string]interface{}) { func (ws *WServer) sendMsg(conn *UserConn, mReply interface{}) {
bMsg, _ := json.Marshal(mReply) bMsg, _ := json.Marshal(mReply)
err := ws.writeMsg(conn, websocket.TextMessage, bMsg) err := ws.writeMsg(conn, websocket.TextMessage, bMsg)
if err != nil { if err != nil {
log.ErrorByKv("WS WriteMsg error", "", "userIP", conn.RemoteAddr().String(), "userUid", ws.getUserUid(conn), "error", err, "mReply", mReply) log.ErrorByKv("WS WriteMsg error", "", "userIP", conn.RemoteAddr().String(), "userUid", ws.getUserUid(conn), "error", err, "mReply", mReply)
} }
} }
func (ws *WServer) sendErrMsg(conn *websocket.Conn, errCode int32, errMsg string) { func (ws *WServer) sendErrMsg(conn *UserConn, errCode int32, errMsg string, reqIdentifier int32, msgIncr string) {
mReply := make(map[string]interface{}) mReply := make(map[string]interface{})
mReply["errCode"] = errCode mReply["errCode"] = errCode
mReply["errMsg"] = errMsg mReply["errMsg"] = errMsg
mReply["reqIdentifier"] = reqIdentifier
mReply["msgIncr"] = msgIncr
ws.sendMsg(conn, mReply) ws.sendMsg(conn, mReply)
} }

View File

@ -81,10 +81,12 @@ func (r *RPCServer) MsgToUser(_ context.Context, in *pbRelay.MsgToUserReq) (*pbR
case constant.GroupChatType: case constant.GroupChatType:
RecvID = strings.Split(in.GetRecvID(), " ")[0] RecvID = strings.Split(in.GetRecvID(), " ")[0]
} }
log.InfoByKv("test", in.OperationID, "wsUserToConn", ws.wsUserToConn) var tag bool
for key, conn := range ws.wsUserToConn { a := genUidPlatformArray(RecvID)
UIDAndPID := strings.Split(key, " ") for _, v := range a {
if UIDAndPID[0] == RecvID { if conn := ws.getUserConn(v); conn != nil {
UIDAndPID := strings.Split(v, " ")
tag = true
resultCode := sendMsgToUser(conn, bMsg, in, UIDAndPID[1], UIDAndPID[0]) resultCode := sendMsgToUser(conn, bMsg, in, UIDAndPID[1], UIDAndPID[0])
temp := &pbRelay.SingleMsgToUser{ temp := &pbRelay.SingleMsgToUser{
ResultCode: resultCode, ResultCode: resultCode,
@ -94,6 +96,25 @@ func (r *RPCServer) MsgToUser(_ context.Context, in *pbRelay.MsgToUserReq) (*pbR
resp = append(resp, temp) resp = append(resp, temp)
} }
} }
if !tag {
log.NewError(in.OperationID, "push err ,ws conn not in map", in.String())
}
//for key, conn := range ws.wsUserToConn {
// UIDAndPID := strings.Split(key, " ")
// if UIDAndPID[0] == RecvID {
// tag = true
// resultCode := sendMsgToUser(conn, bMsg, in, UIDAndPID[1], UIDAndPID[0])
// temp := &pbRelay.SingleMsgToUser{
// ResultCode: resultCode,
// RecvID: UIDAndPID[0],
// RecvPlatFormID: utils.PlatformNameToID(UIDAndPID[1]),
// }
// resp = append(resp, temp)
// }
//}
//if !tag {
// log.NewError(in.OperationID, "push err ,ws conn not in map", in.String())
//}
//switch in.GetContentType() { //switch in.GetContentType() {
//case constant.SyncSenderMsg: //case constant.SyncSenderMsg:
// log.InfoByKv("come sync", in.OperationID, "args", in.String()) // log.InfoByKv("come sync", in.OperationID, "args", in.String())
@ -143,7 +164,7 @@ func (r *RPCServer) MsgToUser(_ context.Context, in *pbRelay.MsgToUserReq) (*pbR
}, nil }, nil
} }
func sendMsgToUser(conn *websocket.Conn, bMsg []byte, in *pbRelay.MsgToUserReq, RecvPlatForm, RecvID string) (ResultCode int64) { func sendMsgToUser(conn *UserConn, bMsg []byte, in *pbRelay.MsgToUserReq, RecvPlatForm, RecvID string) (ResultCode int64) {
err := ws.writeMsg(conn, websocket.TextMessage, bMsg) err := ws.writeMsg(conn, websocket.TextMessage, bMsg)
if err != nil { if err != nil {
log.ErrorByKv("PushMsgToUser is failed By Ws", "", "Addr", conn.RemoteAddr().String(), log.ErrorByKv("PushMsgToUser is failed By Ws", "", "Addr", conn.RemoteAddr().String(),
@ -157,3 +178,9 @@ func sendMsgToUser(conn *websocket.Conn, bMsg []byte, in *pbRelay.MsgToUserReq,
} }
} }
func genUidPlatformArray(uid string) (array []string) {
for i := 1; i <= utils.LinuxPlatformID; i++ {
array = append(array, uid+" "+utils.PlatformIDToName(int32(i)))
}
return array
}

View File

@ -13,13 +13,22 @@ import (
) )
type Req struct { type Req struct {
ReqIdentifier int32 `json:"reqIdentifier" validate:"required"` ReqIdentifier int32 `json:"reqIdentifier" validate:"required"`
Token string `json:"token" validate:"required"` Token string `json:"token" validate:"required"`
SendID string `json:"sendID" validate:"required"` SendID string `json:"sendID" validate:"required"`
OperationID string `json:"operationID" validate:"required"` OperationID string `json:"operationID" validate:"required"`
MsgIncr int32 `json:"msgIncr" validate:"required"` MsgIncr string `json:"msgIncr" validate:"required"`
Data map[string]interface{} `json:"data"` Data interface{} `json:"data"`
} }
type Resp struct {
ReqIdentifier int32 `json:"reqIdentifier"`
MsgIncr string `json:"msgIncr"`
OperationID string `json:"operationID"`
ErrCode int32 `json:"errCode"`
ErrMsg string `json:"errMsg"`
Data interface{} `json:"data"`
}
type SeqData struct { type SeqData struct {
SeqBegin int64 `mapstructure:"seqBegin" validate:"required"` SeqBegin int64 `mapstructure:"seqBegin" validate:"required"`
SeqEnd int64 `mapstructure:"seqEnd" validate:"required"` SeqEnd int64 `mapstructure:"seqEnd" validate:"required"`
@ -30,13 +39,16 @@ type MsgData struct {
MsgFrom int32 `mapstructure:"msgFrom" validate:"required"` MsgFrom int32 `mapstructure:"msgFrom" validate:"required"`
ContentType int32 `mapstructure:"contentType" validate:"required"` ContentType int32 `mapstructure:"contentType" validate:"required"`
RecvID string `mapstructure:"recvID" validate:"required"` RecvID string `mapstructure:"recvID" validate:"required"`
ForceList []string `mapstructure:"forceList" validate:"required"` ForceList []string `mapstructure:"forceList"`
Content string `mapstructure:"content" validate:"required"` Content string `mapstructure:"content" validate:"required"`
Options map[string]interface{} `mapstructure:"options" validate:"required"` Options map[string]interface{} `mapstructure:"options" validate:"required"`
ClientMsgID string `mapstructure:"clientMsgID" validate:"required"` ClientMsgID string `mapstructure:"clientMsgID" validate:"required"`
OfflineInfo map[string]interface{} `mapstructure:"offlineInfo" validate:"required"` OfflineInfo map[string]interface{} `mapstructure:"offlineInfo" validate:"required"`
Ext map[string]interface{} `mapstructure:"ext"` Ext map[string]interface{} `mapstructure:"ext"`
} }
type SeqListData struct {
SeqList []int64 `mapstructure:"seqList" validate:"required"`
}
func (ws *WServer) argsValidate(m *Req, r int32) (isPass bool, errCode int32, errMsg string, data interface{}) { func (ws *WServer) argsValidate(m *Req, r int32) (isPass bool, errCode int32, errMsg string, data interface{}) {
switch r { switch r {
@ -44,6 +56,8 @@ func (ws *WServer) argsValidate(m *Req, r int32) (isPass bool, errCode int32, er
data = SeqData{} data = SeqData{}
case constant.WSSendMsg: case constant.WSSendMsg:
data = MsgData{} data = MsgData{}
case constant.WSPullMsgBySeqList:
data = SeqListData{}
default: default:
} }
if err := mapstructure.WeakDecode(m.Data, &data); err != nil { if err := mapstructure.WeakDecode(m.Data, &data); err != nil {

View File

@ -6,23 +6,28 @@ import (
"Open_IM/src/utils" "Open_IM/src/utils"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"net/http" "net/http"
"sync"
"time" "time"
) )
type UserConn struct {
*websocket.Conn
w *sync.Mutex
}
type WServer struct { type WServer struct {
wsAddr string wsAddr string
wsMaxConnNum int wsMaxConnNum int
wsUpGrader *websocket.Upgrader wsUpGrader *websocket.Upgrader
wsConnToUser map[*websocket.Conn]string wsConnToUser map[*UserConn]string
wsUserToConn map[string]*websocket.Conn wsUserToConn map[string]*UserConn
} }
func (ws *WServer) onInit(wsPort int) { func (ws *WServer) onInit(wsPort int) {
ip := utils.ServerIP ip := utils.ServerIP
ws.wsAddr = ip + ":" + utils.IntToString(wsPort) ws.wsAddr = ip + ":" + utils.IntToString(wsPort)
ws.wsMaxConnNum = config.Config.LongConnSvr.WebsocketMaxConnNum ws.wsMaxConnNum = config.Config.LongConnSvr.WebsocketMaxConnNum
ws.wsConnToUser = make(map[*websocket.Conn]string) ws.wsConnToUser = make(map[*UserConn]string)
ws.wsUserToConn = make(map[string]*websocket.Conn) ws.wsUserToConn = make(map[string]*UserConn)
ws.wsUpGrader = &websocket.Upgrader{ ws.wsUpGrader = &websocket.Upgrader{
HandshakeTimeout: time.Duration(config.Config.LongConnSvr.WebsocketTimeOut) * time.Second, HandshakeTimeout: time.Duration(config.Config.LongConnSvr.WebsocketTimeOut) * time.Second,
ReadBufferSize: config.Config.LongConnSvr.WebsocketMaxMsgLen, ReadBufferSize: config.Config.LongConnSvr.WebsocketMaxMsgLen,
@ -49,35 +54,36 @@ func (ws *WServer) wsHandler(w http.ResponseWriter, r *http.Request) {
//Connection mapping relationship, //Connection mapping relationship,
//userID+" "+platformID->conn //userID+" "+platformID->conn
SendID := query["sendID"][0] + " " + utils.PlatformIDToName(int32(utils.StringToInt64(query["platformID"][0]))) SendID := query["sendID"][0] + " " + utils.PlatformIDToName(int32(utils.StringToInt64(query["platformID"][0])))
ws.addUserConn(SendID, conn) //Initialize a lock for each user
go ws.readMsg(conn) newConn := &UserConn{conn, new(sync.Mutex)}
ws.addUserConn(SendID, newConn)
go ws.readMsg(newConn)
} }
} }
} }
func (ws *WServer) readMsg(conn *websocket.Conn) { func (ws *WServer) readMsg(conn *UserConn) {
for { for {
msgType, msg, err := conn.ReadMessage() _, msg, err := conn.ReadMessage()
if err != nil { if err != nil {
log.ErrorByKv("WS ReadMsg error", "", "userIP", conn.RemoteAddr().String(), "userUid", ws.getUserUid(conn), "error", err) log.ErrorByKv("WS ReadMsg error", "", "userIP", conn.RemoteAddr().String(), "userUid", ws.getUserUid(conn), "error", err)
ws.delUserConn(conn) ws.delUserConn(conn)
return return
} else { } else {
log.ErrorByKv("test", "", "msgType", msgType, "userIP", conn.RemoteAddr().String(), "userUid", ws.getUserUid(conn)) //log.ErrorByKv("test", "", "msgType", msgType, "userIP", conn.RemoteAddr().String(), "userUid", ws.getUserUid(conn))
} }
ws.msgParse(conn, msg) ws.msgParse(conn, msg)
//ws.writeMsg(conn, 1, chat) //ws.writeMsg(conn, 1, chat)
} }
} }
func (ws *WServer) writeMsg(conn *UserConn, a int, msg []byte) error {
func (ws *WServer) writeMsg(conn *websocket.Conn, a int, msg []byte) error { conn.w.Lock()
rwLock.Lock() defer conn.w.Unlock()
defer rwLock.Unlock()
return conn.WriteMessage(a, msg) return conn.WriteMessage(a, msg)
} }
func (ws *WServer) addUserConn(uid string, conn *websocket.Conn) { func (ws *WServer) addUserConn(uid string, conn *UserConn) {
rwLock.Lock() rwLock.Lock()
defer rwLock.Unlock() defer rwLock.Unlock()
if oldConn, ok := ws.wsUserToConn[uid]; ok { if oldConn, ok := ws.wsUserToConn[uid]; ok {
@ -95,7 +101,7 @@ func (ws *WServer) addUserConn(uid string, conn *websocket.Conn) {
} }
func (ws *WServer) delUserConn(conn *websocket.Conn) { func (ws *WServer) delUserConn(conn *UserConn) {
rwLock.Lock() rwLock.Lock()
defer rwLock.Unlock() defer rwLock.Unlock()
var uidPlatform string var uidPlatform string
@ -111,12 +117,12 @@ func (ws *WServer) delUserConn(conn *websocket.Conn) {
} }
err := conn.Close() err := conn.Close()
if err != nil { if err != nil {
log.ErrorByKv("close err", "", "uid", uidPlatform, "conn", conn) log.ErrorByKv("close err", "", "uid", uidPlatform)
} }
} }
func (ws *WServer) getUserConn(uid string) *websocket.Conn { func (ws *WServer) getUserConn(uid string) *UserConn {
rwLock.RLock() rwLock.RLock()
defer rwLock.RUnlock() defer rwLock.RUnlock()
if conn, ok := ws.wsUserToConn[uid]; ok { if conn, ok := ws.wsUserToConn[uid]; ok {
@ -124,7 +130,7 @@ func (ws *WServer) getUserConn(uid string) *websocket.Conn {
} }
return nil return nil
} }
func (ws *WServer) getUserUid(conn *websocket.Conn) string { func (ws *WServer) getUserUid(conn *UserConn) string {
rwLock.RLock() rwLock.RLock()
defer rwLock.RUnlock() defer rwLock.RUnlock()

View File

@ -3,15 +3,20 @@ package logic
import ( import (
"Open_IM/src/common/db" "Open_IM/src/common/db"
"Open_IM/src/common/db/mysql_model/im_mysql_model" "Open_IM/src/common/db/mysql_model/im_mysql_model"
"Open_IM/src/common/log"
pbMsg "Open_IM/src/proto/chat" pbMsg "Open_IM/src/proto/chat"
"Open_IM/src/utils"
) )
func saveUserChat(uid string, pbMsg *pbMsg.MsgSvrToPushSvrChatMsg) error { func saveUserChat(uid string, pbMsg *pbMsg.MsgSvrToPushSvrChatMsg) error {
time := utils.GetCurrentTimestampByMill()
seq, err := db.DB.IncrUserSeq(uid) seq, err := db.DB.IncrUserSeq(uid)
if err != nil { if err != nil {
log.NewError(pbMsg.OperationID, "data insert to redis err", err.Error(), pbMsg.String())
return err return err
} }
pbMsg.RecvSeq = seq pbMsg.RecvSeq = seq
log.NewInfo(pbMsg.OperationID, "IncrUserSeq cost time", utils.GetCurrentTimestampByMill()-time)
return db.DB.SaveUserChat(uid, pbMsg.SendTime, pbMsg) return db.DB.SaveUserChat(uid, pbMsg.SendTime, pbMsg)
} }

View File

@ -33,6 +33,7 @@ func (mc *HistoryConsumerHandler) Init() {
func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string) { func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string) {
log.InfoByKv("chat come mongo!!!", "", "chat", string(msg)) log.InfoByKv("chat come mongo!!!", "", "chat", string(msg))
time := utils.GetCurrentTimestampBySecond()
pbData := pbMsg.WSToMsgSvrChatMsg{} pbData := pbMsg.WSToMsgSvrChatMsg{}
err := proto.Unmarshal(msg, &pbData) err := proto.Unmarshal(msg, &pbData)
if err != nil { if err != nil {
@ -78,11 +79,13 @@ func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string)
//} //}
} }
log.NewInfo(pbSaveData.OperationID, "saveUserChat cost time ", utils.GetCurrentTimestampBySecond()-time)
} }
if msgKey == pbSaveData.RecvID { if msgKey == pbSaveData.RecvID {
pbSaveData.Options = pbData.Options pbSaveData.Options = pbData.Options
pbSaveData.OfflineInfo = pbData.OfflineInfo pbSaveData.OfflineInfo = pbData.OfflineInfo
sendMessageToPush(&pbSaveData) sendMessageToPush(&pbSaveData)
log.NewInfo(pbSaveData.OperationID, "sendMessageToPush cost time ", utils.GetCurrentTimestampBySecond()-time)
} }
log.InfoByKv("msg_transfer handle topic success...", "", "") log.InfoByKv("msg_transfer handle topic success...", "", "")

View File

@ -20,6 +20,6 @@ func Init() {
} }
func Run() { func Run() {
//register mysqlConsumerHandler to //register mysqlConsumerHandler to
go persistentCH.persistentConsumerGroup.RegisterHandleAndConsumer(&persistentCH) //go persistentCH.persistentConsumerGroup.RegisterHandleAndConsumer(&persistentCH)
go historyCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyCH) go historyCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyCH)
} }

View File

@ -50,7 +50,7 @@ func (m *WSToMsgSvrChatMsg) Reset() { *m = WSToMsgSvrChatMsg{} }
func (m *WSToMsgSvrChatMsg) String() string { return proto.CompactTextString(m) } func (m *WSToMsgSvrChatMsg) String() string { return proto.CompactTextString(m) }
func (*WSToMsgSvrChatMsg) ProtoMessage() {} func (*WSToMsgSvrChatMsg) ProtoMessage() {}
func (*WSToMsgSvrChatMsg) Descriptor() ([]byte, []int) { func (*WSToMsgSvrChatMsg) Descriptor() ([]byte, []int) {
return fileDescriptor_chat_34beadf7348900d2, []int{0} return fileDescriptor_chat_4f51bfe711d80421, []int{0}
} }
func (m *WSToMsgSvrChatMsg) XXX_Unmarshal(b []byte) error { func (m *WSToMsgSvrChatMsg) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_WSToMsgSvrChatMsg.Unmarshal(m, b) return xxx_messageInfo_WSToMsgSvrChatMsg.Unmarshal(m, b)
@ -215,7 +215,7 @@ func (m *MsgSvrToPushSvrChatMsg) Reset() { *m = MsgSvrToPushSvrChatMsg{}
func (m *MsgSvrToPushSvrChatMsg) String() string { return proto.CompactTextString(m) } func (m *MsgSvrToPushSvrChatMsg) String() string { return proto.CompactTextString(m) }
func (*MsgSvrToPushSvrChatMsg) ProtoMessage() {} func (*MsgSvrToPushSvrChatMsg) ProtoMessage() {}
func (*MsgSvrToPushSvrChatMsg) Descriptor() ([]byte, []int) { func (*MsgSvrToPushSvrChatMsg) Descriptor() ([]byte, []int) {
return fileDescriptor_chat_34beadf7348900d2, []int{1} return fileDescriptor_chat_4f51bfe711d80421, []int{1}
} }
func (m *MsgSvrToPushSvrChatMsg) XXX_Unmarshal(b []byte) error { func (m *MsgSvrToPushSvrChatMsg) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_MsgSvrToPushSvrChatMsg.Unmarshal(m, b) return xxx_messageInfo_MsgSvrToPushSvrChatMsg.Unmarshal(m, b)
@ -361,7 +361,7 @@ func (m *PullMessageReq) Reset() { *m = PullMessageReq{} }
func (m *PullMessageReq) String() string { return proto.CompactTextString(m) } func (m *PullMessageReq) String() string { return proto.CompactTextString(m) }
func (*PullMessageReq) ProtoMessage() {} func (*PullMessageReq) ProtoMessage() {}
func (*PullMessageReq) Descriptor() ([]byte, []int) { func (*PullMessageReq) Descriptor() ([]byte, []int) {
return fileDescriptor_chat_34beadf7348900d2, []int{2} return fileDescriptor_chat_4f51bfe711d80421, []int{2}
} }
func (m *PullMessageReq) XXX_Unmarshal(b []byte) error { func (m *PullMessageReq) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_PullMessageReq.Unmarshal(m, b) return xxx_messageInfo_PullMessageReq.Unmarshal(m, b)
@ -425,7 +425,7 @@ func (m *PullMessageResp) Reset() { *m = PullMessageResp{} }
func (m *PullMessageResp) String() string { return proto.CompactTextString(m) } func (m *PullMessageResp) String() string { return proto.CompactTextString(m) }
func (*PullMessageResp) ProtoMessage() {} func (*PullMessageResp) ProtoMessage() {}
func (*PullMessageResp) Descriptor() ([]byte, []int) { func (*PullMessageResp) Descriptor() ([]byte, []int) {
return fileDescriptor_chat_34beadf7348900d2, []int{3} return fileDescriptor_chat_4f51bfe711d80421, []int{3}
} }
func (m *PullMessageResp) XXX_Unmarshal(b []byte) error { func (m *PullMessageResp) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_PullMessageResp.Unmarshal(m, b) return xxx_messageInfo_PullMessageResp.Unmarshal(m, b)
@ -487,6 +487,60 @@ func (m *PullMessageResp) GetGroupUserMsg() []*GatherFormat {
return nil return nil
} }
type PullMessageBySeqListReq struct {
UserID string `protobuf:"bytes,1,opt,name=UserID" json:"UserID,omitempty"`
OperationID string `protobuf:"bytes,2,opt,name=OperationID" json:"OperationID,omitempty"`
SeqList []int64 `protobuf:"varint,3,rep,packed,name=seqList" json:"seqList,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *PullMessageBySeqListReq) Reset() { *m = PullMessageBySeqListReq{} }
func (m *PullMessageBySeqListReq) String() string { return proto.CompactTextString(m) }
func (*PullMessageBySeqListReq) ProtoMessage() {}
func (*PullMessageBySeqListReq) Descriptor() ([]byte, []int) {
return fileDescriptor_chat_4f51bfe711d80421, []int{4}
}
func (m *PullMessageBySeqListReq) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_PullMessageBySeqListReq.Unmarshal(m, b)
}
func (m *PullMessageBySeqListReq) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_PullMessageBySeqListReq.Marshal(b, m, deterministic)
}
func (dst *PullMessageBySeqListReq) XXX_Merge(src proto.Message) {
xxx_messageInfo_PullMessageBySeqListReq.Merge(dst, src)
}
func (m *PullMessageBySeqListReq) XXX_Size() int {
return xxx_messageInfo_PullMessageBySeqListReq.Size(m)
}
func (m *PullMessageBySeqListReq) XXX_DiscardUnknown() {
xxx_messageInfo_PullMessageBySeqListReq.DiscardUnknown(m)
}
var xxx_messageInfo_PullMessageBySeqListReq proto.InternalMessageInfo
func (m *PullMessageBySeqListReq) GetUserID() string {
if m != nil {
return m.UserID
}
return ""
}
func (m *PullMessageBySeqListReq) GetOperationID() string {
if m != nil {
return m.OperationID
}
return ""
}
func (m *PullMessageBySeqListReq) GetSeqList() []int64 {
if m != nil {
return m.SeqList
}
return nil
}
type GetNewSeqReq struct { type GetNewSeqReq struct {
UserID string `protobuf:"bytes,1,opt,name=UserID" json:"UserID,omitempty"` UserID string `protobuf:"bytes,1,opt,name=UserID" json:"UserID,omitempty"`
OperationID string `protobuf:"bytes,2,opt,name=OperationID" json:"OperationID,omitempty"` OperationID string `protobuf:"bytes,2,opt,name=OperationID" json:"OperationID,omitempty"`
@ -499,7 +553,7 @@ func (m *GetNewSeqReq) Reset() { *m = GetNewSeqReq{} }
func (m *GetNewSeqReq) String() string { return proto.CompactTextString(m) } func (m *GetNewSeqReq) String() string { return proto.CompactTextString(m) }
func (*GetNewSeqReq) ProtoMessage() {} func (*GetNewSeqReq) ProtoMessage() {}
func (*GetNewSeqReq) Descriptor() ([]byte, []int) { func (*GetNewSeqReq) Descriptor() ([]byte, []int) {
return fileDescriptor_chat_34beadf7348900d2, []int{4} return fileDescriptor_chat_4f51bfe711d80421, []int{5}
} }
func (m *GetNewSeqReq) XXX_Unmarshal(b []byte) error { func (m *GetNewSeqReq) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_GetNewSeqReq.Unmarshal(m, b) return xxx_messageInfo_GetNewSeqReq.Unmarshal(m, b)
@ -546,7 +600,7 @@ func (m *GetNewSeqResp) Reset() { *m = GetNewSeqResp{} }
func (m *GetNewSeqResp) String() string { return proto.CompactTextString(m) } func (m *GetNewSeqResp) String() string { return proto.CompactTextString(m) }
func (*GetNewSeqResp) ProtoMessage() {} func (*GetNewSeqResp) ProtoMessage() {}
func (*GetNewSeqResp) Descriptor() ([]byte, []int) { func (*GetNewSeqResp) Descriptor() ([]byte, []int) {
return fileDescriptor_chat_34beadf7348900d2, []int{5} return fileDescriptor_chat_4f51bfe711d80421, []int{6}
} }
func (m *GetNewSeqResp) XXX_Unmarshal(b []byte) error { func (m *GetNewSeqResp) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_GetNewSeqResp.Unmarshal(m, b) return xxx_messageInfo_GetNewSeqResp.Unmarshal(m, b)
@ -601,7 +655,7 @@ func (m *GatherFormat) Reset() { *m = GatherFormat{} }
func (m *GatherFormat) String() string { return proto.CompactTextString(m) } func (m *GatherFormat) String() string { return proto.CompactTextString(m) }
func (*GatherFormat) ProtoMessage() {} func (*GatherFormat) ProtoMessage() {}
func (*GatherFormat) Descriptor() ([]byte, []int) { func (*GatherFormat) Descriptor() ([]byte, []int) {
return fileDescriptor_chat_34beadf7348900d2, []int{6} return fileDescriptor_chat_4f51bfe711d80421, []int{7}
} }
func (m *GatherFormat) XXX_Unmarshal(b []byte) error { func (m *GatherFormat) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_GatherFormat.Unmarshal(m, b) return xxx_messageInfo_GatherFormat.Unmarshal(m, b)
@ -669,7 +723,7 @@ func (m *MsgFormat) Reset() { *m = MsgFormat{} }
func (m *MsgFormat) String() string { return proto.CompactTextString(m) } func (m *MsgFormat) String() string { return proto.CompactTextString(m) }
func (*MsgFormat) ProtoMessage() {} func (*MsgFormat) ProtoMessage() {}
func (*MsgFormat) Descriptor() ([]byte, []int) { func (*MsgFormat) Descriptor() ([]byte, []int) {
return fileDescriptor_chat_34beadf7348900d2, []int{7} return fileDescriptor_chat_4f51bfe711d80421, []int{8}
} }
func (m *MsgFormat) XXX_Unmarshal(b []byte) error { func (m *MsgFormat) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_MsgFormat.Unmarshal(m, b) return xxx_messageInfo_MsgFormat.Unmarshal(m, b)
@ -791,6 +845,7 @@ type UserSendMsgReq struct {
ClientMsgID string `protobuf:"bytes,15,opt,name=ClientMsgID" json:"ClientMsgID,omitempty"` ClientMsgID string `protobuf:"bytes,15,opt,name=ClientMsgID" json:"ClientMsgID,omitempty"`
OffLineInfo string `protobuf:"bytes,16,opt,name=OffLineInfo" json:"OffLineInfo,omitempty"` OffLineInfo string `protobuf:"bytes,16,opt,name=OffLineInfo" json:"OffLineInfo,omitempty"`
Ex string `protobuf:"bytes,17,opt,name=Ex" json:"Ex,omitempty"` Ex string `protobuf:"bytes,17,opt,name=Ex" json:"Ex,omitempty"`
SendTime int64 `protobuf:"varint,18,opt,name=sendTime" json:"sendTime,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"` XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"` XXX_sizecache int32 `json:"-"`
@ -800,7 +855,7 @@ func (m *UserSendMsgReq) Reset() { *m = UserSendMsgReq{} }
func (m *UserSendMsgReq) String() string { return proto.CompactTextString(m) } func (m *UserSendMsgReq) String() string { return proto.CompactTextString(m) }
func (*UserSendMsgReq) ProtoMessage() {} func (*UserSendMsgReq) ProtoMessage() {}
func (*UserSendMsgReq) Descriptor() ([]byte, []int) { func (*UserSendMsgReq) Descriptor() ([]byte, []int) {
return fileDescriptor_chat_34beadf7348900d2, []int{8} return fileDescriptor_chat_4f51bfe711d80421, []int{9}
} }
func (m *UserSendMsgReq) XXX_Unmarshal(b []byte) error { func (m *UserSendMsgReq) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_UserSendMsgReq.Unmarshal(m, b) return xxx_messageInfo_UserSendMsgReq.Unmarshal(m, b)
@ -939,13 +994,19 @@ func (m *UserSendMsgReq) GetEx() string {
return "" return ""
} }
func (m *UserSendMsgReq) GetSendTime() int64 {
if m != nil {
return m.SendTime
}
return 0
}
type UserSendMsgResp struct { type UserSendMsgResp struct {
ErrCode int32 `protobuf:"varint,1,opt,name=ErrCode" json:"ErrCode,omitempty"` ErrCode int32 `protobuf:"varint,1,opt,name=ErrCode" json:"ErrCode,omitempty"`
ErrMsg string `protobuf:"bytes,2,opt,name=ErrMsg" json:"ErrMsg,omitempty"` ErrMsg string `protobuf:"bytes,2,opt,name=ErrMsg" json:"ErrMsg,omitempty"`
ReqIdentifier int32 `protobuf:"varint,3,opt,name=ReqIdentifier" json:"ReqIdentifier,omitempty"` ReqIdentifier int32 `protobuf:"varint,3,opt,name=ReqIdentifier" json:"ReqIdentifier,omitempty"`
SendTime int64 `protobuf:"varint,5,opt,name=SendTime" json:"SendTime,omitempty"` ServerMsgID string `protobuf:"bytes,4,opt,name=ServerMsgID" json:"ServerMsgID,omitempty"`
ServerMsgID string `protobuf:"bytes,6,opt,name=ServerMsgID" json:"ServerMsgID,omitempty"` ClientMsgID string `protobuf:"bytes,5,opt,name=ClientMsgID" json:"ClientMsgID,omitempty"`
ClientMsgID string `protobuf:"bytes,7,opt,name=ClientMsgID" json:"ClientMsgID,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"` XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"` XXX_sizecache int32 `json:"-"`
@ -955,7 +1016,7 @@ func (m *UserSendMsgResp) Reset() { *m = UserSendMsgResp{} }
func (m *UserSendMsgResp) String() string { return proto.CompactTextString(m) } func (m *UserSendMsgResp) String() string { return proto.CompactTextString(m) }
func (*UserSendMsgResp) ProtoMessage() {} func (*UserSendMsgResp) ProtoMessage() {}
func (*UserSendMsgResp) Descriptor() ([]byte, []int) { func (*UserSendMsgResp) Descriptor() ([]byte, []int) {
return fileDescriptor_chat_34beadf7348900d2, []int{9} return fileDescriptor_chat_4f51bfe711d80421, []int{10}
} }
func (m *UserSendMsgResp) XXX_Unmarshal(b []byte) error { func (m *UserSendMsgResp) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_UserSendMsgResp.Unmarshal(m, b) return xxx_messageInfo_UserSendMsgResp.Unmarshal(m, b)
@ -996,13 +1057,6 @@ func (m *UserSendMsgResp) GetReqIdentifier() int32 {
return 0 return 0
} }
func (m *UserSendMsgResp) GetSendTime() int64 {
if m != nil {
return m.SendTime
}
return 0
}
func (m *UserSendMsgResp) GetServerMsgID() string { func (m *UserSendMsgResp) GetServerMsgID() string {
if m != nil { if m != nil {
return m.ServerMsgID return m.ServerMsgID
@ -1022,6 +1076,7 @@ func init() {
proto.RegisterType((*MsgSvrToPushSvrChatMsg)(nil), "pbChat.MsgSvrToPushSvrChatMsg") proto.RegisterType((*MsgSvrToPushSvrChatMsg)(nil), "pbChat.MsgSvrToPushSvrChatMsg")
proto.RegisterType((*PullMessageReq)(nil), "pbChat.PullMessageReq") proto.RegisterType((*PullMessageReq)(nil), "pbChat.PullMessageReq")
proto.RegisterType((*PullMessageResp)(nil), "pbChat.PullMessageResp") proto.RegisterType((*PullMessageResp)(nil), "pbChat.PullMessageResp")
proto.RegisterType((*PullMessageBySeqListReq)(nil), "pbChat.PullMessageBySeqListReq")
proto.RegisterType((*GetNewSeqReq)(nil), "pbChat.GetNewSeqReq") proto.RegisterType((*GetNewSeqReq)(nil), "pbChat.GetNewSeqReq")
proto.RegisterType((*GetNewSeqResp)(nil), "pbChat.GetNewSeqResp") proto.RegisterType((*GetNewSeqResp)(nil), "pbChat.GetNewSeqResp")
proto.RegisterType((*GatherFormat)(nil), "pbChat.GatherFormat") proto.RegisterType((*GatherFormat)(nil), "pbChat.GatherFormat")
@ -1043,6 +1098,7 @@ const _ = grpc.SupportPackageIsVersion4
type ChatClient interface { type ChatClient interface {
GetNewSeq(ctx context.Context, in *GetNewSeqReq, opts ...grpc.CallOption) (*GetNewSeqResp, error) GetNewSeq(ctx context.Context, in *GetNewSeqReq, opts ...grpc.CallOption) (*GetNewSeqResp, error)
PullMessage(ctx context.Context, in *PullMessageReq, opts ...grpc.CallOption) (*PullMessageResp, error) PullMessage(ctx context.Context, in *PullMessageReq, opts ...grpc.CallOption) (*PullMessageResp, error)
PullMessageBySeqList(ctx context.Context, in *PullMessageBySeqListReq, opts ...grpc.CallOption) (*PullMessageResp, error)
UserSendMsg(ctx context.Context, in *UserSendMsgReq, opts ...grpc.CallOption) (*UserSendMsgResp, error) UserSendMsg(ctx context.Context, in *UserSendMsgReq, opts ...grpc.CallOption) (*UserSendMsgResp, error)
} }
@ -1072,6 +1128,15 @@ func (c *chatClient) PullMessage(ctx context.Context, in *PullMessageReq, opts .
return out, nil return out, nil
} }
func (c *chatClient) PullMessageBySeqList(ctx context.Context, in *PullMessageBySeqListReq, opts ...grpc.CallOption) (*PullMessageResp, error) {
out := new(PullMessageResp)
err := grpc.Invoke(ctx, "/pbChat.Chat/PullMessageBySeqList", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *chatClient) UserSendMsg(ctx context.Context, in *UserSendMsgReq, opts ...grpc.CallOption) (*UserSendMsgResp, error) { func (c *chatClient) UserSendMsg(ctx context.Context, in *UserSendMsgReq, opts ...grpc.CallOption) (*UserSendMsgResp, error) {
out := new(UserSendMsgResp) out := new(UserSendMsgResp)
err := grpc.Invoke(ctx, "/pbChat.Chat/UserSendMsg", in, out, c.cc, opts...) err := grpc.Invoke(ctx, "/pbChat.Chat/UserSendMsg", in, out, c.cc, opts...)
@ -1086,6 +1151,7 @@ func (c *chatClient) UserSendMsg(ctx context.Context, in *UserSendMsgReq, opts .
type ChatServer interface { type ChatServer interface {
GetNewSeq(context.Context, *GetNewSeqReq) (*GetNewSeqResp, error) GetNewSeq(context.Context, *GetNewSeqReq) (*GetNewSeqResp, error)
PullMessage(context.Context, *PullMessageReq) (*PullMessageResp, error) PullMessage(context.Context, *PullMessageReq) (*PullMessageResp, error)
PullMessageBySeqList(context.Context, *PullMessageBySeqListReq) (*PullMessageResp, error)
UserSendMsg(context.Context, *UserSendMsgReq) (*UserSendMsgResp, error) UserSendMsg(context.Context, *UserSendMsgReq) (*UserSendMsgResp, error)
} }
@ -1129,6 +1195,24 @@ func _Chat_PullMessage_Handler(srv interface{}, ctx context.Context, dec func(in
return interceptor(ctx, in, info, handler) return interceptor(ctx, in, info, handler)
} }
func _Chat_PullMessageBySeqList_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(PullMessageBySeqListReq)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ChatServer).PullMessageBySeqList(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/pbChat.Chat/PullMessageBySeqList",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ChatServer).PullMessageBySeqList(ctx, req.(*PullMessageBySeqListReq))
}
return interceptor(ctx, in, info, handler)
}
func _Chat_UserSendMsg_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { func _Chat_UserSendMsg_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(UserSendMsgReq) in := new(UserSendMsgReq)
if err := dec(in); err != nil { if err := dec(in); err != nil {
@ -1159,6 +1243,10 @@ var _Chat_serviceDesc = grpc.ServiceDesc{
MethodName: "PullMessage", MethodName: "PullMessage",
Handler: _Chat_PullMessage_Handler, Handler: _Chat_PullMessage_Handler,
}, },
{
MethodName: "PullMessageBySeqList",
Handler: _Chat_PullMessageBySeqList_Handler,
},
{ {
MethodName: "UserSendMsg", MethodName: "UserSendMsg",
Handler: _Chat_UserSendMsg_Handler, Handler: _Chat_UserSendMsg_Handler,
@ -1168,66 +1256,69 @@ var _Chat_serviceDesc = grpc.ServiceDesc{
Metadata: "chat/chat.proto", Metadata: "chat/chat.proto",
} }
func init() { proto.RegisterFile("chat/chat.proto", fileDescriptor_chat_34beadf7348900d2) } func init() { proto.RegisterFile("chat/chat.proto", fileDescriptor_chat_4f51bfe711d80421) }
var fileDescriptor_chat_34beadf7348900d2 = []byte{ var fileDescriptor_chat_4f51bfe711d80421 = []byte{
// 919 bytes of a gzipped FileDescriptorProto // 970 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x56, 0x5f, 0x6b, 0xe3, 0x46, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x57, 0xdd, 0x6a, 0xe3, 0x46,
0x10, 0xc7, 0x96, 0x2d, 0x5b, 0xe3, 0xbf, 0x59, 0xae, 0xe9, 0x12, 0x4a, 0x31, 0xe6, 0x5a, 0x4c, 0x14, 0xc6, 0x92, 0x7f, 0xa2, 0x23, 0xff, 0x65, 0x48, 0xb3, 0x43, 0x28, 0xad, 0x11, 0xdb, 0x62,
0x1f, 0x52, 0xb8, 0xbe, 0x1c, 0x2d, 0xf4, 0xe1, 0x62, 0x27, 0x15, 0x44, 0xb9, 0x20, 0xf9, 0x28, 0x7a, 0x91, 0xc2, 0xf6, 0x66, 0x69, 0x61, 0x2f, 0x12, 0x3b, 0xa9, 0x20, 0xce, 0x06, 0xc9, 0x4b,
0xf4, 0x4d, 0xe7, 0xac, 0x15, 0x71, 0xb6, 0x24, 0xef, 0x2a, 0x69, 0xfa, 0xd2, 0xaf, 0xd0, 0x7e, 0xa1, 0x77, 0x5a, 0x67, 0xac, 0x88, 0xb5, 0x25, 0x79, 0x46, 0x49, 0xb3, 0x37, 0x7d, 0x85, 0x42,
0xa4, 0xbe, 0xf6, 0xa3, 0xf4, 0x2b, 0x14, 0x0a, 0x65, 0x77, 0x25, 0x6b, 0x57, 0x72, 0x1c, 0x13, 0x9f, 0xa3, 0x7d, 0xb0, 0xbe, 0x42, 0xa1, 0x50, 0x66, 0x46, 0xb2, 0x46, 0x3f, 0x4e, 0x42, 0x96,
0xc8, 0x4b, 0xc8, 0xfc, 0x34, 0xb3, 0xbb, 0x33, 0xf3, 0x9b, 0xdf, 0x18, 0x06, 0x8b, 0x5b, 0x3f, 0xbd, 0x59, 0xf6, 0xfb, 0x74, 0xe6, 0xe7, 0x9c, 0xf3, 0xcd, 0x77, 0x62, 0x18, 0x2c, 0x6e, 0xbc,
0xfd, 0x96, 0xff, 0x39, 0x4d, 0x68, 0x9c, 0xc6, 0xc8, 0x4c, 0x3e, 0x9e, 0xdd, 0xfa, 0xe9, 0xf8, 0xe4, 0x7b, 0xfe, 0xcf, 0x71, 0x4c, 0xa3, 0x24, 0x42, 0xed, 0xf8, 0xfd, 0xe9, 0x8d, 0x97, 0x58,
0x8f, 0x06, 0x1c, 0xfd, 0xec, 0xcd, 0x63, 0x87, 0x05, 0xde, 0x3d, 0xe5, 0x90, 0xc3, 0x02, 0x74, 0x7f, 0x34, 0x61, 0xff, 0x17, 0x77, 0x1e, 0xcd, 0x98, 0xef, 0xde, 0x51, 0x4e, 0xcd, 0x98, 0x8f,
0x0c, 0xa6, 0x47, 0xa2, 0x1b, 0x7b, 0x8a, 0x6b, 0xa3, 0xda, 0xc4, 0x72, 0x33, 0x8b, 0xe3, 0x2e, 0x0e, 0xa1, 0xed, 0x92, 0xf0, 0xda, 0x9e, 0xe0, 0xc6, 0xa8, 0x31, 0x36, 0x9c, 0x14, 0x71, 0xde,
0x59, 0xdc, 0xdb, 0x53, 0x5c, 0x97, 0xb8, 0xb4, 0x10, 0x86, 0xd6, 0x59, 0x1c, 0xa5, 0x24, 0x4a, 0x21, 0x8b, 0x3b, 0x7b, 0x82, 0x35, 0xc9, 0x4b, 0x84, 0x30, 0x74, 0x4e, 0xa3, 0x30, 0x21, 0x61,
0xb1, 0x21, 0x3e, 0xe4, 0x26, 0x3a, 0x81, 0x36, 0x8f, 0x9d, 0x87, 0x6b, 0x82, 0x1b, 0xa3, 0xda, 0x82, 0x75, 0xf1, 0x21, 0x83, 0xe8, 0x08, 0xf6, 0xf8, 0xda, 0x79, 0xb0, 0x26, 0xb8, 0x39, 0x6a,
0xc4, 0x70, 0xb7, 0x36, 0x8f, 0x72, 0x58, 0x70, 0x4e, 0xe3, 0x35, 0x6e, 0x8e, 0x6a, 0x93, 0xa6, 0x8c, 0x75, 0x67, 0x8b, 0xf9, 0xaa, 0x19, 0xf3, 0xcf, 0x68, 0xb4, 0xc6, 0xad, 0x51, 0x63, 0xdc,
0x9b, 0x9b, 0xe8, 0x6b, 0xe8, 0x73, 0x2f, 0x42, 0xaf, 0xc2, 0xc5, 0xa7, 0x2b, 0x7f, 0x4d, 0xb0, 0x72, 0x32, 0x88, 0xbe, 0x85, 0x3e, 0x8f, 0x22, 0xf4, 0x32, 0x58, 0x7c, 0xb8, 0xf4, 0xd6, 0x04,
0x29, 0x8e, 0x2d, 0xa1, 0xe8, 0x35, 0xf4, 0x24, 0x72, 0xee, 0x2f, 0xc8, 0x07, 0xf7, 0x12, 0xb7, 0xb7, 0xc5, 0xb6, 0x25, 0x16, 0xbd, 0x84, 0x9e, 0x64, 0xce, 0xbc, 0x05, 0x79, 0xe7, 0x5c, 0xe0,
0x84, 0x9b, 0x0e, 0xa2, 0x11, 0x74, 0xb2, 0xe7, 0xcc, 0x7f, 0x4b, 0x08, 0x6e, 0x8b, 0xbb, 0x54, 0x8e, 0x08, 0x2b, 0x92, 0x68, 0x04, 0x66, 0x7a, 0x9d, 0xf9, 0xc7, 0x98, 0xe0, 0x3d, 0x71, 0x96,
0x88, 0x7b, 0x78, 0x84, 0xb1, 0x30, 0x8e, 0x84, 0x87, 0x25, 0x3d, 0x14, 0x88, 0x7b, 0xbc, 0x4f, 0x4a, 0xf1, 0x08, 0x97, 0x30, 0x16, 0x44, 0xa1, 0x88, 0x30, 0x64, 0x84, 0x42, 0xf1, 0x88, 0xb7,
0x08, 0xf5, 0xd3, 0x30, 0x8e, 0xec, 0x29, 0x06, 0x71, 0x8f, 0x0a, 0xa1, 0x57, 0xd0, 0x74, 0x58, 0x31, 0xa1, 0x5e, 0x12, 0x44, 0xa1, 0x3d, 0xc1, 0x20, 0xce, 0x51, 0x29, 0x74, 0x00, 0xad, 0x19,
0x60, 0x4f, 0x71, 0x47, 0x7c, 0x93, 0x06, 0x47, 0xe7, 0xf1, 0x27, 0x12, 0xe1, 0xae, 0x44, 0x85, 0xf3, 0xed, 0x09, 0x36, 0xc5, 0x37, 0x09, 0x38, 0x3b, 0x8f, 0x3e, 0x90, 0x10, 0x77, 0x25, 0x2b,
0x21, 0x4e, 0x5b, 0x2e, 0x57, 0x61, 0x44, 0xec, 0x68, 0x19, 0xe3, 0x5e, 0x76, 0x5a, 0x01, 0xf1, 0x80, 0xd8, 0x6d, 0xb9, 0x5c, 0x05, 0x21, 0xb1, 0xc3, 0x65, 0x84, 0x7b, 0xe9, 0x6e, 0x39, 0xc5,
0xda, 0xbc, 0x4f, 0xf8, 0xc9, 0x0c, 0xf7, 0x65, 0x45, 0x33, 0x13, 0x7d, 0x09, 0x70, 0xbd, 0xf2, 0x6b, 0xf3, 0x36, 0xe6, 0x3b, 0x33, 0xdc, 0x97, 0x15, 0x4d, 0x21, 0xfa, 0x0a, 0xe0, 0x6a, 0xe5,
0xd3, 0x65, 0x4c, 0xd7, 0xf6, 0x14, 0x0f, 0xc4, 0x53, 0x15, 0x04, 0x7d, 0x01, 0xd6, 0x79, 0x4c, 0x25, 0xcb, 0x88, 0xae, 0xed, 0x09, 0x1e, 0x88, 0xab, 0x2a, 0x0c, 0xfa, 0x12, 0x8c, 0xb3, 0x88,
0x17, 0xe4, 0x32, 0x64, 0x29, 0x1e, 0x8e, 0x8c, 0x89, 0xe5, 0x16, 0x80, 0xa8, 0xc5, 0x2a, 0x24, 0x2e, 0xc8, 0x45, 0xc0, 0x12, 0x3c, 0x1c, 0xe9, 0x63, 0xc3, 0xc9, 0x09, 0x51, 0x8b, 0x55, 0x40,
0x51, 0x2a, 0xdf, 0x7a, 0x24, 0x6f, 0x56, 0xa0, 0xf1, 0xbf, 0x06, 0x1c, 0x4b, 0x36, 0xcc, 0xe3, 0xc2, 0x44, 0xde, 0x75, 0x5f, 0x9e, 0xac, 0x50, 0xd6, 0xbf, 0x3a, 0x1c, 0x4a, 0x35, 0xcc, 0xa3,
0xeb, 0x3b, 0x76, 0xfb, 0x22, 0xb4, 0xc0, 0xd0, 0xe2, 0x3e, 0x1e, 0xd9, 0x64, 0xac, 0xc8, 0x4d, 0xab, 0x5b, 0x76, 0xf3, 0x59, 0x64, 0x81, 0xa1, 0xc3, 0x63, 0x5c, 0xb2, 0x49, 0x55, 0x91, 0xc1,
0x8d, 0x30, 0xcd, 0xc7, 0x09, 0x63, 0x3e, 0x45, 0x98, 0xd6, 0x61, 0x84, 0x69, 0x1f, 0x40, 0x18, 0x82, 0x60, 0x5a, 0xbb, 0x05, 0xd3, 0x7e, 0x4c, 0x30, 0x9d, 0xa7, 0x09, 0x66, 0xef, 0x09, 0x82,
0xeb, 0x49, 0xc2, 0xc0, 0x93, 0x84, 0xe9, 0xec, 0x21, 0x4c, 0x57, 0x25, 0xcc, 0x4b, 0x52, 0xa3, 0x31, 0x1e, 0x15, 0x0c, 0x3c, 0x2a, 0x18, 0xf3, 0x01, 0xc1, 0x74, 0x55, 0xc1, 0x7c, 0x4e, 0x69,
0xd4, 0xfc, 0x61, 0xb5, 0xf9, 0xbf, 0x43, 0xff, 0xfa, 0x6e, 0xb5, 0x72, 0x08, 0x63, 0x7e, 0x40, 0x94, 0x9a, 0x3f, 0xac, 0x36, 0xff, 0x77, 0xe8, 0x5f, 0xdd, 0xae, 0x56, 0x33, 0xc2, 0x98, 0xe7,
0x5c, 0xb2, 0xe1, 0xbd, 0xfd, 0xc0, 0x08, 0x2d, 0x7a, 0x2e, 0x2d, 0xd9, 0xa7, 0xcd, 0x3b, 0x12, 0x13, 0x87, 0x6c, 0x78, 0x6f, 0xdf, 0x31, 0x42, 0xf3, 0x9e, 0x4b, 0x24, 0xfb, 0xb4, 0x39, 0x21,
0x84, 0x91, 0xe8, 0xba, 0xe8, 0x93, 0xb4, 0x25, 0x4f, 0x36, 0xb3, 0xe8, 0x46, 0xb4, 0xdd, 0x70, 0x7e, 0x10, 0x8a, 0xae, 0x8b, 0x3e, 0x49, 0x2c, 0x75, 0xb2, 0x99, 0x86, 0xd7, 0xa2, 0xed, 0xba,
0x33, 0xab, 0x5c, 0x93, 0x46, 0xa5, 0x26, 0xe3, 0x7f, 0x6a, 0x30, 0xd0, 0x1e, 0xc0, 0x12, 0x9e, 0x93, 0xa2, 0x72, 0x4d, 0x9a, 0x95, 0x9a, 0x58, 0xff, 0x34, 0x60, 0x50, 0xb8, 0x00, 0x8b, 0x79,
0xef, 0x8c, 0xd2, 0xb3, 0xf8, 0x86, 0x88, 0x27, 0x34, 0xdd, 0xdc, 0xe4, 0xf7, 0xcc, 0x28, 0x75, 0xbe, 0x53, 0x4a, 0x4f, 0xa3, 0x6b, 0x22, 0xae, 0xd0, 0x72, 0x32, 0xc8, 0xcf, 0x99, 0x52, 0x3a,
0x58, 0x90, 0xf3, 0x4e, 0x5a, 0x1c, 0x77, 0xfc, 0x07, 0x4e, 0xae, 0xec, 0x7e, 0x69, 0x09, 0x3c, 0x63, 0x7e, 0xa6, 0x3b, 0x89, 0x38, 0x3f, 0xf3, 0xee, 0xb9, 0xb8, 0xd2, 0xf3, 0x25, 0x12, 0x7c,
0x8c, 0x0a, 0xd2, 0x65, 0x16, 0xfa, 0x1e, 0x7a, 0x5e, 0x18, 0x05, 0x2b, 0xc2, 0x73, 0xe3, 0xc7, 0x10, 0xe6, 0xa2, 0x4b, 0x11, 0xfa, 0x11, 0x7a, 0x6e, 0x10, 0xfa, 0x2b, 0xc2, 0x73, 0xe3, 0xdb,
0x35, 0x47, 0xc6, 0xa4, 0xf3, 0xe6, 0xd5, 0xa9, 0x14, 0xc9, 0xd3, 0x0b, 0x3f, 0xbd, 0x25, 0xf4, 0xb5, 0x46, 0xfa, 0xd8, 0x7c, 0x75, 0x70, 0x2c, 0x4d, 0xf2, 0xf8, 0xdc, 0x4b, 0x6e, 0x08, 0x3d,
0x3c, 0xa6, 0x6b, 0x3f, 0x75, 0x75, 0x57, 0xf4, 0x16, 0xba, 0x17, 0x34, 0xbe, 0x4b, 0xf2, 0x50, 0x8b, 0xe8, 0xda, 0x4b, 0x9c, 0x62, 0x28, 0x7a, 0x0d, 0xdd, 0x73, 0x1a, 0xdd, 0xc6, 0xd9, 0xd2,
0x73, 0x4f, 0xa8, 0xe6, 0x39, 0xfe, 0x09, 0xba, 0x17, 0x24, 0xbd, 0x22, 0xbf, 0x7a, 0x64, 0xb3, 0xf6, 0x03, 0x4b, 0x0b, 0x91, 0xd6, 0x1a, 0x5e, 0x28, 0xa9, 0x9e, 0x7c, 0x74, 0xc9, 0x86, 0x3f,
0xaf, 0xd2, 0xa5, 0xaa, 0xd5, 0xab, 0x55, 0xf3, 0xa0, 0xa7, 0x9c, 0xf4, 0xac, 0x92, 0x0d, 0xc1, 0xd1, 0x87, 0x8a, 0x5e, 0x2a, 0xa0, 0x56, 0x15, 0x15, 0x86, 0x0e, 0x93, 0xfb, 0x60, 0x7d, 0xa4,
0x28, 0xea, 0xc5, 0xff, 0x1d, 0xcf, 0xa0, 0xab, 0x3e, 0x1e, 0xf5, 0xa1, 0xbe, 0x7d, 0x5a, 0xdd, 0xf3, 0x87, 0x95, 0x42, 0xeb, 0x67, 0xe8, 0x9e, 0x93, 0xe4, 0x92, 0xfc, 0xe6, 0x92, 0xcd, 0x27,
0x9e, 0xa2, 0xaf, 0xa0, 0x21, 0x24, 0xa6, 0x2e, 0x12, 0x3e, 0xca, 0x13, 0xe6, 0x13, 0x29, 0xb3, 0x9d, 0x61, 0xb9, 0xd0, 0x53, 0x76, 0x7a, 0x56, 0x87, 0x86, 0xa0, 0xe7, 0xed, 0xe1, 0xff, 0xb5,
0x15, 0x9f, 0xc7, 0xff, 0xd5, 0xc1, 0xda, 0x62, 0xcf, 0x51, 0x90, 0x7c, 0xe2, 0x0d, 0x7d, 0xe2, 0xa6, 0xd0, 0x55, 0x6b, 0x85, 0xfa, 0xa0, 0x6d, 0xaf, 0xa6, 0xd9, 0x13, 0xf4, 0x0d, 0x34, 0x45,
0x4b, 0x33, 0xda, 0x78, 0x64, 0x46, 0xe9, 0xbd, 0x28, 0xb6, 0x3d, 0x15, 0x62, 0x62, 0xb9, 0x2a, 0x56, 0x9a, 0xa8, 0xef, 0x7e, 0x56, 0x5f, 0x6e, 0x00, 0xb2, 0xb8, 0xe2, 0xb3, 0xf5, 0x9f, 0x06,
0xa4, 0xea, 0x93, 0xa9, 0xeb, 0x53, 0x56, 0x8e, 0xd6, 0xb6, 0x1c, 0x9a, 0x2e, 0xb5, 0x4b, 0xba, 0xc6, 0x96, 0x7b, 0x8e, 0x61, 0x65, 0x06, 0xa3, 0x17, 0x0d, 0xa6, 0x64, 0x09, 0xcd, 0x1d, 0x96,
0xf4, 0x0d, 0x0c, 0xa5, 0x80, 0x28, 0xd3, 0x27, 0x45, 0xa3, 0x82, 0xef, 0x50, 0x2a, 0x38, 0x4c, 0x40, 0xef, 0x44, 0x6f, 0xed, 0x89, 0xf0, 0x2e, 0xc3, 0x51, 0x29, 0xd5, 0x0e, 0xdb, 0x45, 0x3b,
0xa9, 0x3a, 0x8f, 0x29, 0x95, 0x32, 0xd1, 0xdd, 0xea, 0x44, 0xff, 0xd9, 0x80, 0x3e, 0x27, 0x12, 0x4c, 0xcb, 0xd1, 0xd9, 0x96, 0xa3, 0x60, 0x83, 0x7b, 0x25, 0x1b, 0xfc, 0x0e, 0x86, 0xd2, 0xaf,
0x8f, 0x73, 0x58, 0xc0, 0x89, 0xf6, 0x1a, 0x7a, 0x2e, 0xd9, 0xd8, 0x37, 0x24, 0x4a, 0xc3, 0x65, 0x94, 0xc7, 0x2e, 0x3d, 0xaa, 0xc2, 0xd7, 0x18, 0x23, 0x3c, 0xcd, 0x18, 0xcd, 0x5d, 0xc6, 0xa8,
0x48, 0x68, 0xc6, 0x11, 0x1d, 0x2c, 0x36, 0x57, 0x5d, 0xdd, 0x5c, 0x45, 0x03, 0x0d, 0xad, 0x81, 0x18, 0x48, 0xb7, 0x6a, 0x20, 0x7f, 0x35, 0xa1, 0xcf, 0x85, 0xc4, 0xd7, 0xcd, 0x98, 0xcf, 0x85,
0x4f, 0x8e, 0xf6, 0x8e, 0xc4, 0x9b, 0x87, 0x25, 0x6e, 0xee, 0x4a, 0x5c, 0x97, 0xba, 0xd6, 0x2e, 0xf6, 0x12, 0x7a, 0x0e, 0xd9, 0xd8, 0xd7, 0x24, 0x4c, 0x82, 0x65, 0x40, 0x68, 0xaa, 0x91, 0x22,
0xa9, 0x53, 0x05, 0xba, 0x5d, 0x15, 0x68, 0x85, 0x5a, 0xd6, 0x5e, 0x6a, 0x41, 0x95, 0x5a, 0x05, 0x99, 0x0f, 0x4a, 0x4d, 0x1d, 0x94, 0x79, 0x03, 0xf5, 0x42, 0x03, 0x1f, 0x75, 0x92, 0x9a, 0xc4,
0x5d, 0x3b, 0x1a, 0x5d, 0xb5, 0xdd, 0xdb, 0x2d, 0xef, 0x5e, 0x85, 0x6e, 0xbd, 0xca, 0x3a, 0x7c, 0x5b, 0x4f, 0x4b, 0xbc, 0x5d, 0x97, 0x78, 0xd1, 0x59, 0x3b, 0x75, 0xce, 0xaa, 0xce, 0x83, 0xbd,
0x44, 0xd2, 0x4b, 0x0d, 0x1e, 0x54, 0x1a, 0x9c, 0x2d, 0x8c, 0xcb, 0x7c, 0x61, 0x0c, 0xb7, 0x0b, 0xea, 0x3c, 0x50, 0xa4, 0x65, 0x3c, 0x28, 0x2d, 0xa8, 0x4a, 0x2b, 0x97, 0xab, 0x59, 0x90, 0x6b,
0x23, 0x87, 0xf8, 0xe4, 0xce, 0x1e, 0xb2, 0x55, 0x5f, 0x9f, 0x3d, 0x8c, 0xff, 0xae, 0xc1, 0x40, 0x61, 0xd4, 0x77, 0xcb, 0xa3, 0x5e, 0x91, 0x5b, 0xaf, 0x32, 0x7d, 0x77, 0x4c, 0x90, 0x52, 0x83,
0xa3, 0xc4, 0xb3, 0x14, 0xa3, 0xc2, 0x22, 0x63, 0x17, 0x8b, 0xf6, 0xad, 0xf3, 0xd2, 0x80, 0x9a, 0x07, 0x95, 0x06, 0xa7, 0xf3, 0xe9, 0x22, 0x9b, 0x4f, 0xc3, 0xed, 0x7c, 0xca, 0x28, 0xfe, 0x72,
0xd5, 0x01, 0x2d, 0x65, 0xdf, 0xaa, 0x64, 0xff, 0xe6, 0xaf, 0x1a, 0x34, 0xb8, 0xee, 0xa0, 0xb7, 0xa7, 0xf7, 0xe9, 0x5f, 0x16, 0xda, 0xf4, 0x9e, 0x4b, 0x99, 0x65, 0x52, 0x46, 0x52, 0xca, 0x19,
0x60, 0x6d, 0x35, 0x10, 0x15, 0xf2, 0xab, 0x08, 0xec, 0xc9, 0x67, 0x3b, 0x50, 0x96, 0xa0, 0x1f, 0xb6, 0xfe, 0x6e, 0xc0, 0xa0, 0x20, 0x97, 0x67, 0xb9, 0x49, 0x45, 0x61, 0x7a, 0x9d, 0xc2, 0x4a,
0xa1, 0xa3, 0xac, 0x1c, 0x74, 0x9c, 0x7b, 0xe9, 0x8b, 0xf0, 0xe4, 0xf3, 0x9d, 0xb8, 0x8c, 0x57, 0x0f, 0xb4, 0x59, 0x7d, 0xa0, 0xa5, 0xec, 0x5b, 0x95, 0xec, 0x5f, 0xfd, 0xa9, 0x41, 0x93, 0xfb,
0xaa, 0x59, 0xc4, 0xeb, 0x53, 0x57, 0xc4, 0x97, 0x4a, 0xff, 0xae, 0xf7, 0x4b, 0x87, 0xff, 0x30, 0x0e, 0x7a, 0x0d, 0xc6, 0xd6, 0x03, 0x51, 0xee, 0xf6, 0x8a, 0xc1, 0x1e, 0x7d, 0x51, 0xc3, 0xb2,
0xff, 0x41, 0x7e, 0xfe, 0x68, 0x8a, 0x1f, 0xe8, 0xdf, 0xfd, 0x1f, 0x00, 0x00, 0xff, 0xff, 0xfe, 0x18, 0xbd, 0x01, 0x53, 0xb1, 0x7d, 0x74, 0x98, 0x45, 0x15, 0xe7, 0xee, 0xd1, 0x8b, 0x5a, 0x9e,
0xa0, 0x14, 0x71, 0xb3, 0x0b, 0x00, 0x00, 0xc5, 0xe8, 0x0a, 0x0e, 0xea, 0xc6, 0x06, 0xfa, 0xba, 0x66, 0x81, 0x3a, 0x54, 0x76, 0xef, 0xf8,
0x06, 0x4c, 0xa5, 0x07, 0xf9, 0x8d, 0x8a, 0xef, 0x38, 0x5f, 0x5f, 0x6a, 0xd8, 0x49, 0xef, 0x57,
0x93, 0xff, 0xb2, 0xf8, 0x49, 0x7e, 0x7e, 0xdf, 0x16, 0xbf, 0x30, 0x7e, 0xf8, 0x3f, 0x00, 0x00,
0xff, 0xff, 0x6c, 0xad, 0x53, 0x41, 0x74, 0x0c, 0x00, 0x00,
} }

View File

@ -58,6 +58,11 @@ message PullMessageResp {
repeated GatherFormat SingleUserMsg = 5; repeated GatherFormat SingleUserMsg = 5;
repeated GatherFormat GroupUserMsg = 6; repeated GatherFormat GroupUserMsg = 6;
} }
message PullMessageBySeqListReq{
string UserID = 1;
string OperationID = 2;
repeated int64 seqList =3;
}
message GetNewSeqReq { message GetNewSeqReq {
string UserID = 1; string UserID = 1;
string OperationID = 2; string OperationID = 2;
@ -119,6 +124,7 @@ message UserSendMsgReq {
string ClientMsgID = 15; string ClientMsgID = 15;
string OffLineInfo = 16; string OffLineInfo = 16;
string Ex = 17; string Ex = 17;
int64 sendTime = 18;
} }
@ -127,13 +133,13 @@ message UserSendMsgResp {
int32 ErrCode = 1; int32 ErrCode = 1;
string ErrMsg = 2; string ErrMsg = 2;
int32 ReqIdentifier = 3; int32 ReqIdentifier = 3;
int64 SendTime = 5; string ServerMsgID = 4;
string ServerMsgID = 6; string ClientMsgID = 5;
string ClientMsgID = 7;
} }
service Chat { service Chat {
rpc GetNewSeq(GetNewSeqReq) returns(GetNewSeqResp); rpc GetNewSeq(GetNewSeqReq) returns(GetNewSeqResp);
rpc PullMessage(PullMessageReq) returns(PullMessageResp); rpc PullMessage(PullMessageReq) returns(PullMessageResp);
rpc PullMessageBySeqList(PullMessageBySeqListReq) returns(PullMessageResp);
rpc UserSendMsg(UserSendMsgReq) returns(UserSendMsgResp); rpc UserSendMsg(UserSendMsgReq) returns(UserSendMsgResp);
} }

View File

@ -51,7 +51,7 @@ func MsgToUser(sendPbData *pbRelay.MsgToUserReq, OfflineInfo, Options string) {
wsResult = append(wsResult, reply.Resp...) wsResult = append(wsResult, reply.Resp...)
} }
} }
log.InfoByKv("push_result", sendPbData.OperationID, "result", wsResult) log.InfoByKv("push_result", sendPbData.OperationID, "result", wsResult, "sendData", sendPbData)
if isOfflinePush { if isOfflinePush {
for _, t := range pushTerminal { for _, t := range pushTerminal {

View File

@ -41,7 +41,7 @@ func (rpc *rpcChat) PullMessage(_ context.Context, in *pbMsg.PullMessageReq) (*p
resp := new(pbMsg.PullMessageResp) resp := new(pbMsg.PullMessageResp)
var respSingleMsgFormat []*pbMsg.GatherFormat var respSingleMsgFormat []*pbMsg.GatherFormat
var respGroupMsgFormat []*pbMsg.GatherFormat var respGroupMsgFormat []*pbMsg.GatherFormat
SingleMsgFormat, GroupMsgFormat, MaxSeq, MinSeq, err := commonDB.DB.GetUserChat(in.UserID, in.SeqBegin, in.SeqEnd) SingleMsgFormat, GroupMsgFormat, MaxSeq, MinSeq, err := commonDB.DB.GetMsgBySeqRange(in.UserID, in.SeqBegin, in.SeqEnd)
if err != nil { if err != nil {
log.ErrorByKv("pullMsg data error", in.OperationID, in.String()) log.ErrorByKv("pullMsg data error", in.OperationID, in.String())
resp.ErrCode = 1 resp.ErrCode = 1
@ -59,6 +59,30 @@ func (rpc *rpcChat) PullMessage(_ context.Context, in *pbMsg.PullMessageReq) (*p
GroupUserMsg: respGroupMsgFormat, GroupUserMsg: respGroupMsgFormat,
}, nil }, nil
} }
func (rpc *rpcChat) PullMessageBySeqList(_ context.Context, in *pbMsg.PullMessageBySeqListReq) (*pbMsg.PullMessageResp, error) {
log.NewInfo(in.OperationID, "rpc PullMessageBySeqList is arriving", in.String())
resp := new(pbMsg.PullMessageResp)
var respSingleMsgFormat []*pbMsg.GatherFormat
var respGroupMsgFormat []*pbMsg.GatherFormat
SingleMsgFormat, GroupMsgFormat, MaxSeq, MinSeq, err := commonDB.DB.GetMsgBySeqList(in.UserID, in.SeqList)
if err != nil {
log.ErrorByKv("PullMessageBySeqList data error", in.OperationID, in.String())
resp.ErrCode = 1
resp.ErrMsg = err.Error()
return resp, nil
}
respSingleMsgFormat = singleMsgHandleByUser(SingleMsgFormat, in.UserID)
respGroupMsgFormat = groupMsgHandleByUser(GroupMsgFormat)
return &pbMsg.PullMessageResp{
ErrCode: 0,
ErrMsg: "",
MaxSeq: MaxSeq,
MinSeq: MinSeq,
SingleUserMsg: respSingleMsgFormat,
GroupUserMsg: respGroupMsgFormat,
}, nil
panic("implement me")
}
func singleMsgHandleByUser(allMsg []*pbMsg.MsgFormat, ownerId string) []*pbMsg.GatherFormat { func singleMsgHandleByUser(allMsg []*pbMsg.MsgFormat, ownerId string) []*pbMsg.GatherFormat {
var userid string var userid string
var respMsgFormat []*pbMsg.GatherFormat var respMsgFormat []*pbMsg.GatherFormat

View File

@ -43,9 +43,11 @@ type MsgCallBackResp struct {
func (rpc *rpcChat) UserSendMsg(_ context.Context, pb *pbChat.UserSendMsgReq) (*pbChat.UserSendMsgResp, error) { func (rpc *rpcChat) UserSendMsg(_ context.Context, pb *pbChat.UserSendMsgReq) (*pbChat.UserSendMsgResp, error) {
replay := pbChat.UserSendMsgResp{} replay := pbChat.UserSendMsgResp{}
log.InfoByKv("sendMsg", pb.OperationID, "args", pb.String()) log.InfoByKv("sendMsg", pb.OperationID, "args", pb.String())
time := utils.GetCurrentTimestampByMill()
if !utils.VerifyToken(pb.Token, pb.SendID) { if !utils.VerifyToken(pb.Token, pb.SendID) {
return returnMsg(&replay, pb, http.StatusUnauthorized, "token validate err,not authorized", "", 0) return returnMsg(&replay, pb, http.StatusUnauthorized, "token validate err,not authorized", "", 0)
} }
log.NewInfo(pb.OperationID, "VerifyToken cost time ", utils.GetCurrentTimestampByMill()-time)
serverMsgID := GetMsgID(pb.SendID) serverMsgID := GetMsgID(pb.SendID)
pbData := pbChat.WSToMsgSvrChatMsg{} pbData := pbChat.WSToMsgSvrChatMsg{}
pbData.MsgFrom = pb.MsgFrom pbData.MsgFrom = pb.MsgFrom
@ -64,7 +66,7 @@ func (rpc *rpcChat) UserSendMsg(_ context.Context, pb *pbChat.UserSendMsgReq) (*
pbData.MsgID = serverMsgID pbData.MsgID = serverMsgID
pbData.OperationID = pb.OperationID pbData.OperationID = pb.OperationID
pbData.Token = pb.Token pbData.Token = pb.Token
pbData.SendTime = utils.GetCurrentTimestampByNano() pbData.SendTime = pb.SendTime
m := MsgCallBackResp{} m := MsgCallBackResp{}
if config.Config.MessageCallBack.CallbackSwitch { if config.Config.MessageCallBack.CallbackSwitch {
bMsg, err := http2.Post(config.Config.MessageCallBack.CallbackUrl, MsgCallBackReq{ bMsg, err := http2.Post(config.Config.MessageCallBack.CallbackUrl, MsgCallBackReq{
@ -88,85 +90,79 @@ func (rpc *rpcChat) UserSendMsg(_ context.Context, pb *pbChat.UserSendMsgReq) (*
return returnMsg(&replay, pb, m.ResponseErrCode, m.ErrMsg, "", 0) return returnMsg(&replay, pb, m.ResponseErrCode, m.ErrMsg, "", 0)
} else { } else {
pbData.Content = m.ResponseResult.ModifiedMsg pbData.Content = m.ResponseResult.ModifiedMsg
err1 := rpc.sendMsgToKafka(&pbData, pbData.RecvID)
err2 := rpc.sendMsgToKafka(&pbData, pbData.SendID)
if err1 != nil || err2 != nil {
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
}
return returnMsg(&replay, pb, 0, "", serverMsgID, pbData.SendTime)
} }
} }
} else { }
switch pbData.SessionType { switch pbData.SessionType {
case constant.SingleChatType: case constant.SingleChatType:
err1 := rpc.sendMsgToKafka(&pbData, pbData.RecvID) time := utils.GetCurrentTimestampByMill()
err2 := rpc.sendMsgToKafka(&pbData, pbData.SendID) err1 := rpc.sendMsgToKafka(&pbData, pbData.RecvID)
if err1 != nil || err2 != nil { err2 := rpc.sendMsgToKafka(&pbData, pbData.SendID)
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) log.NewInfo(pb.OperationID, "send kafka cost time ", utils.GetCurrentTimestampByMill()-time)
} if err1 != nil || err2 != nil {
return returnMsg(&replay, pb, 0, "", serverMsgID, pbData.SendTime) return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
case constant.GroupChatType: }
etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImGroupName) return returnMsg(&replay, pb, 0, "", serverMsgID, pbData.SendTime)
client := pbGroup.NewGroupClient(etcdConn) case constant.GroupChatType:
req := &pbGroup.GetGroupAllMemberReq{ etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImGroupName)
GroupID: pbData.RecvID, client := pbGroup.NewGroupClient(etcdConn)
Token: pbData.Token, req := &pbGroup.GetGroupAllMemberReq{
OperationID: pbData.OperationID, GroupID: pbData.RecvID,
} Token: pbData.Token,
reply, err := client.GetGroupAllMember(context.Background(), req) OperationID: pbData.OperationID,
}
reply, err := client.GetGroupAllMember(context.Background(), req)
if err != nil {
log.Error(pbData.Token, pbData.OperationID, "rpc send_msg getGroupInfo failed, err = %s", err.Error())
return returnMsg(&replay, pb, 201, err.Error(), "", 0)
}
if reply.ErrorCode != 0 {
log.Error(pbData.Token, pbData.OperationID, "rpc send_msg getGroupInfo failed, err = %s", reply.ErrorMsg)
return returnMsg(&replay, pb, reply.ErrorCode, reply.ErrorMsg, "", 0)
}
var addUidList []string
switch pbData.ContentType {
case constant.KickGroupMemberTip:
var notification content_struct.NotificationContent
var kickContent group.KickGroupMemberReq
err := utils.JsonStringToStruct(pbData.Content, &notification)
if err != nil { if err != nil {
log.Error(pbData.Token, pbData.OperationID, "rpc send_msg getGroupInfo failed, err = %s", err.Error()) log.ErrorByKv("json unmarshall err", pbData.OperationID, "err", err.Error())
return returnMsg(&replay, pb, 201, err.Error(), "", 0) return returnMsg(&replay, pb, 200, err.Error(), "", 0)
} } else {
if reply.ErrorCode != 0 { err := utils.JsonStringToStruct(notification.Detail, &kickContent)
log.Error(pbData.Token, pbData.OperationID, "rpc send_msg getGroupInfo failed, err = %s", reply.ErrorMsg)
return returnMsg(&replay, pb, reply.ErrorCode, reply.ErrorMsg, "", 0)
}
var addUidList []string
switch pbData.ContentType {
case constant.KickGroupMemberTip:
var notification content_struct.NotificationContent
var kickContent group.KickGroupMemberReq
err := utils.JsonStringToStruct(pbData.Content, &notification)
if err != nil { if err != nil {
log.ErrorByKv("json unmarshall err", pbData.OperationID, "err", err.Error()) log.ErrorByKv("json unmarshall err", pbData.OperationID, "err", err.Error())
return returnMsg(&replay, pb, 200, err.Error(), "", 0) return returnMsg(&replay, pb, 200, err.Error(), "", 0)
} else {
err := utils.JsonStringToStruct(notification.Detail, &kickContent)
if err != nil {
log.ErrorByKv("json unmarshall err", pbData.OperationID, "err", err.Error())
return returnMsg(&replay, pb, 200, err.Error(), "", 0)
}
for _, v := range kickContent.UidListInfo {
addUidList = append(addUidList, v.UserId)
}
} }
case constant.QuitGroupTip: for _, v := range kickContent.UidListInfo {
addUidList = append(addUidList, pbData.SendID) addUidList = append(addUidList, v.UserId)
default:
}
groupID := pbData.RecvID
for i, v := range reply.MemberList {
pbData.RecvID = v.UserId + " " + groupID
err := rpc.sendMsgToKafka(&pbData, utils.IntToString(i))
if err != nil {
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
} }
} }
for i, v := range addUidList { case constant.QuitGroupTip:
pbData.RecvID = v + " " + groupID addUidList = append(addUidList, pbData.SendID)
err := rpc.sendMsgToKafka(&pbData, utils.IntToString(i+1))
if err != nil {
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
}
}
return returnMsg(&replay, pb, 0, "", serverMsgID, pbData.SendTime)
default: default:
} }
groupID := pbData.RecvID
for i, v := range reply.MemberList {
pbData.RecvID = v.UserId + " " + groupID
err := rpc.sendMsgToKafka(&pbData, utils.IntToString(i))
if err != nil {
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
}
}
for i, v := range addUidList {
pbData.RecvID = v + " " + groupID
err := rpc.sendMsgToKafka(&pbData, utils.IntToString(i+1))
if err != nil {
return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
}
}
return returnMsg(&replay, pb, 0, "", serverMsgID, pbData.SendTime)
default:
return returnMsg(&replay, pb, 203, "unkonwn sessionType", "", 0)
} }
return returnMsg(&replay, pb, 203, "unkonwn sessionType", "", 0)
} }
func (rpc *rpcChat) sendMsgToKafka(m *pbChat.WSToMsgSvrChatMsg, key string) error { func (rpc *rpcChat) sendMsgToKafka(m *pbChat.WSToMsgSvrChatMsg, key string) error {
@ -186,6 +182,5 @@ func returnMsg(replay *pbChat.UserSendMsgResp, pb *pbChat.UserSendMsgReq, errCod
replay.ReqIdentifier = pb.ReqIdentifier replay.ReqIdentifier = pb.ReqIdentifier
replay.ClientMsgID = pb.ClientMsgID replay.ClientMsgID = pb.ClientMsgID
replay.ServerMsgID = serverMsgID replay.ServerMsgID = serverMsgID
replay.SendTime = sendTime
return replay, nil return replay, nil
} }

View File

@ -70,6 +70,12 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbGroup.Invite
return &pbGroup.InviteUserToGroupResp{ErrorCode: config.ErrAccess.ErrCode, ErrorMsg: config.ErrAccess.ErrMsg}, nil return &pbGroup.InviteUserToGroupResp{ErrorCode: config.ErrAccess.ErrCode, ErrorMsg: config.ErrAccess.ErrMsg}, nil
} }
groupInfoFromMysql, err := imdb.FindGroupInfoByGroupId(req.GroupID)
if err != nil || groupInfoFromMysql == nil {
log.NewError(req.OperationID, "get group info error", req.GroupID, req.UidList)
return &pbGroup.InviteUserToGroupResp{ErrorCode: config.ErrAccess.ErrCode, ErrorMsg: config.ErrAccess.ErrMsg}, nil
}
// //
//from User: invite: applicant //from User: invite: applicant
//to user: invite: invited //to user: invite: invited

View File

@ -62,3 +62,6 @@ func GetMsgID(sendID string) string {
func int64ToString(i int64) string { func int64ToString(i int64) string {
return strconv.FormatInt(i, 10) return strconv.FormatInt(i, 10)
} }
func Int64ToString(i int64) string {
return strconv.FormatInt(i, 10)
}