diff --git a/script/msg_transfer_start.sh b/script/msg_transfer_start.sh index 30caeb466..9caeed9f2 100644 --- a/script/msg_transfer_start.sh +++ b/script/msg_transfer_start.sh @@ -8,18 +8,22 @@ source ./path_info.cfg #Check if the service exists #If it is exists,kill this process check=`ps aux | grep -w ./${msg_transfer_name} | grep -v grep| wc -l` -if [ $check -eq 1 ] +if [ $check -ge 1 ] then oldPid=`ps aux | grep -w ./${msg_transfer_name} | grep -v grep|awk '{print $2}'` kill -9 $oldPid fi #Waiting port recycling sleep 1 + cd ${msg_transfer_binary_root} - nohup ./${msg_transfer_name} >>../logs/openIM.log 2>&1 & +for ((i = 0; i < ${msg_transfer_service_num}; i++)); do + nohup ./${msg_transfer_name} >>../logs/openIM.log 2>&1 & +done + #Check launched service process check=`ps aux | grep -w ./${msg_transfer_name} | grep -v grep| wc -l` -if [ $check -eq 1 ] +if [ $check -ge 1 ] then newPid=`ps aux | grep -w ./${msg_transfer_name} | grep -v grep|awk '{print $2}'` ports=`netstat -netulp | grep -w ${newPid}|awk '{print $4}'|awk -F '[:]' '{print $NF}'` diff --git a/script/path_info.cfg b/script/path_info.cfg index ed250d29d..f67e017c2 100644 --- a/script/path_info.cfg +++ b/script/path_info.cfg @@ -16,6 +16,7 @@ push_source_root="../src/push/" msg_transfer_name="open_im_msg_transfer" msg_transfer_binary_root="../bin/" msg_transfer_source_root="../src/msg_transfer/" +msg_transfer_service_num=2 sdk_server_name="open_im_sdk_server" diff --git a/script/push_start.sh b/script/push_start.sh index 668c0068f..9c7812dab 100644 --- a/script/push_start.sh +++ b/script/push_start.sh @@ -13,7 +13,7 @@ rpc_ports=($ports_array) #Check if the service exists #If it is exists,kill this process check=$(ps aux | grep -w ./${push_name} | grep -v grep | wc -l) -if [ $check -eq 1 ]; then +if [ $check -ge 1 ]; then oldPid=$(ps aux | grep -w ./${push_name} | grep -v grep | awk '{print $2}') kill -9 $oldPid fi @@ -28,7 +28,7 @@ done sleep 3 #Check launched service process check=$(ps aux | grep -w ./${push_name} | grep -v grep | wc -l) -if [ $check -eq 1 ]; then +if [ $check -ge 1 ]; then newPid=$(ps aux | grep -w ./${push_name} | grep -v grep | awk '{print $2}') ports=$(netstat -netulp | grep -w ${newPid} | awk '{print $4}' | awk -F '[:]' '{print $NF}') allPorts="" diff --git a/src/api/chat/pull_msg.go b/src/api/chat/pull_msg.go index 7e62d4f7d..e7a038ac6 100644 --- a/src/api/chat/pull_msg.go +++ b/src/api/chat/pull_msg.go @@ -70,3 +70,58 @@ func UserPullMsg(c *gin.Context) { }) } + +type paramsUserPullMsgBySeqList struct { + ReqIdentifier int `json:"reqIdentifier" binding:"required"` + SendID string `json:"sendID" binding:"required"` + OperationID string `json:"operationID" binding:"required"` + SeqList []int64 `json:"seqList"` +} + +func UserPullMsgBySeqList(c *gin.Context) { + params := paramsUserPullMsgBySeqList{} + if err := c.BindJSON(¶ms); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": err.Error()}) + return + } + + token := c.Request.Header.Get("token") + if !utils.VerifyToken(token, params.SendID) { + c.JSON(http.StatusBadRequest, gin.H{"errCode": 400, "errMsg": "token validate err"}) + return + } + pbData := pbChat.PullMessageBySeqListReq{} + pbData.UserID = params.SendID + pbData.OperationID = params.OperationID + pbData.SeqList = params.SeqList + + grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName) + msgClient := pbChat.NewChatClient(grpcConn) + reply, err := msgClient.PullMessageBySeqList(context.Background(), &pbData) + if err != nil { + log.ErrorByKv("PullMessageBySeqList error", pbData.OperationID, "err", err.Error()) + return + } + log.InfoByKv("rpc call success to PullMessageBySeqList", pbData.OperationID, "ReplyArgs", reply.String(), "maxSeq", reply.GetMaxSeq(), + "MinSeq", reply.GetMinSeq(), "singLen", len(reply.GetSingleUserMsg()), "groupLen", len(reply.GetGroupUserMsg())) + + msg := make(map[string]interface{}) + if v := reply.GetSingleUserMsg(); v != nil { + msg["single"] = v + } else { + msg["single"] = []pbChat.GatherFormat{} + } + if v := reply.GetGroupUserMsg(); v != nil { + msg["group"] = v + } else { + msg["group"] = []pbChat.GatherFormat{} + } + msg["maxSeq"] = reply.GetMaxSeq() + msg["minSeq"] = reply.GetMinSeq() + c.JSON(http.StatusOK, gin.H{ + "errCode": reply.ErrCode, + "errMsg": reply.ErrMsg, + "reqIdentifier": params.ReqIdentifier, + "data": msg, + }) +} diff --git a/src/api/chat/send_msg.go b/src/api/chat/send_msg.go index ede81abdd..b6c1c75ac 100644 --- a/src/api/chat/send_msg.go +++ b/src/api/chat/send_msg.go @@ -87,7 +87,7 @@ func UserSendMsg(c *gin.Context) { "data": gin.H{ "clientMsgID": reply.ClientMsgID, "serverMsgID": reply.ServerMsgID, - "sendTime": reply.SendTime, + "sendTime": 0, }, }) diff --git a/src/api/friend/add_friend.go b/src/api/friend/add_friend.go index 40fee7dac..d2a67cafd 100644 --- a/src/api/friend/add_friend.go +++ b/src/api/friend/add_friend.go @@ -41,7 +41,9 @@ func ImportFriend(c *gin.Context) { OwnerUid: params.OwnerUid, Token: c.Request.Header.Get("token"), } + log.ErrorByKv("ImportFriend Test start", params.OperationID) RpcResp, err := client.ImportFriend(context.Background(), req) + //log.ErrorByKv("ImportFriend Test end", params.OperationID, "resp", RpcResp, "err", err.Error()) if err != nil { log.Error(req.Token, req.OperationID, "err=%s,ImportFriend failed", err) c.JSON(http.StatusInternalServerError, gin.H{"errCode": 500, "errMsg": "cImportFriend failed" + err.Error()}) diff --git a/src/api/manage/management_chat.go b/src/api/manage/management_chat.go index fd9c89b27..19c6713dd 100644 --- a/src/api/manage/management_chat.go +++ b/src/api/manage/management_chat.go @@ -125,7 +125,7 @@ func ManagementSendMsg(c *gin.Context) { c.JSON(http.StatusOK, gin.H{ "errCode": reply.ErrCode, "errMsg": reply.ErrMsg, - "sendTime": reply.SendTime, + "sendTime": "", "msgID": reply.ClientMsgID, }) diff --git a/src/api/open_im_api.go b/src/api/open_im_api.go index 6065ea7e7..674597203 100644 --- a/src/api/open_im_api.go +++ b/src/api/open_im_api.go @@ -86,6 +86,7 @@ func main() { chatGroup.POST("/newest_seq", apiChat.UserNewestSeq) chatGroup.POST("/pull_msg", apiChat.UserPullMsg) chatGroup.POST("/send_msg", apiChat.UserSendMsg) + chatGroup.POST("/pull_msg_by_seq", apiChat.UserPullMsgBySeqList) } //Manager managementGroup := r.Group("/manager") diff --git a/src/common/constant/constant.go b/src/common/constant/constant.go index 7a8f0d1ee..2d3d0d799 100644 --- a/src/common/constant/constant.go +++ b/src/common/constant/constant.go @@ -17,10 +17,12 @@ const ( RefuseFriendFlag = -1 //Websocket Protocol - WSGetNewestSeq = 1001 - WSPullMsg = 1002 - WSSendMsg = 1003 - WSPushMsg = 2001 + WSGetNewestSeq = 1001 + WSPullMsg = 1002 + WSSendMsg = 1003 + WSPullMsgBySeqList = 1004 + WSPushMsg = 2001 + WSDataError = 3001 ///ContentType //UserRelated diff --git a/src/common/db/mongoModel.go b/src/common/db/mongoModel.go index 3d0a7b4aa..a6e2651fe 100644 --- a/src/common/db/mongoModel.go +++ b/src/common/db/mongoModel.go @@ -3,6 +3,7 @@ package db import ( "Open_IM/src/common/config" "Open_IM/src/common/constant" + "Open_IM/src/common/log" pbMsg "Open_IM/src/proto/chat" "errors" "github.com/golang/protobuf/proto" @@ -28,8 +29,8 @@ type GroupMember struct { UIDList []string } -func (d *DataBases) GetUserChat(uid string, seqBegin, seqEnd int64) (SingleMsg []*pbMsg.MsgFormat, GroupMsg []*pbMsg.MsgFormat, MaxSeq int64, MinSeq int64, err error) { - count := 0 +func (d *DataBases) GetMsgBySeqRange(uid string, seqBegin, seqEnd int64) (SingleMsg []*pbMsg.MsgFormat, GroupMsg []*pbMsg.MsgFormat, MaxSeq int64, MinSeq int64, err error) { + var count int64 session := d.mgoSession.Clone() if session == nil { return nil, nil, MaxSeq, MinSeq, errors.New("session == nil") @@ -76,27 +77,85 @@ func (d *DataBases) GetUserChat(uid string, seqBegin, seqEnd int64) (SingleMsg [ GroupMsg = append(GroupMsg, temp) } count++ + if count == (seqEnd - seqBegin + 1) { + break + } } } return SingleMsg, GroupMsg, MaxSeq, MinSeq, nil } +func (d *DataBases) GetMsgBySeqList(uid string, seqList []int64) (SingleMsg []*pbMsg.MsgFormat, GroupMsg []*pbMsg.MsgFormat, MaxSeq int64, MinSeq int64, err error) { + count := 0 + session := d.mgoSession.Clone() + if session == nil { + return nil, nil, MaxSeq, MinSeq, errors.New("session == nil") + } + defer session.Close() + c := session.DB(config.Config.Mongo.DBDatabase).C(cChat) + + sChat := UserChat{} + if err = c.Find(bson.M{"uid": uid}).One(&sChat); err != nil { + return nil, nil, MaxSeq, MinSeq, err + } + pChat := pbMsg.MsgSvrToPushSvrChatMsg{} + for i := 0; i < len(sChat.Msg); i++ { + temp := new(pbMsg.MsgFormat) + if err = proto.Unmarshal(sChat.Msg[i].Msg, &pChat); err != nil { + return nil, nil, MaxSeq, MinSeq, err + } + if isContainInt64(pChat.RecvSeq, seqList) { + temp.SendID = pChat.SendID + temp.RecvID = pChat.RecvID + temp.MsgFrom = pChat.MsgFrom + temp.Seq = pChat.RecvSeq + temp.ServerMsgID = pChat.MsgID + temp.SendTime = pChat.SendTime + temp.Content = pChat.Content + temp.ContentType = pChat.ContentType + temp.SenderPlatformID = pChat.PlatformID + temp.ClientMsgID = pChat.ClientMsgID + temp.SenderFaceURL = pChat.SenderFaceURL + temp.SenderNickName = pChat.SenderNickName + if pChat.RecvSeq > MaxSeq { + MaxSeq = pChat.RecvSeq + } + if count == 0 { + MinSeq = pChat.RecvSeq + } + if pChat.RecvSeq < MinSeq { + MinSeq = pChat.RecvSeq + } + if pChat.SessionType == constant.SingleChatType { + SingleMsg = append(SingleMsg, temp) + } else { + GroupMsg = append(GroupMsg, temp) + } + count++ + if count == len(seqList) { + break + } + } + } + + return SingleMsg, GroupMsg, MaxSeq, MinSeq, nil +} func (d *DataBases) SaveUserChat(uid string, sendTime int64, m proto.Message) error { - + newTime := getCurrentTimestampByMill() session := d.mgoSession.Clone() if session == nil { return errors.New("session == nil") } defer session.Close() - + log.NewInfo("", "get mgoSession cost time", getCurrentTimestampByMill()-newTime) c := session.DB(config.Config.Mongo.DBDatabase).C(cChat) n, err := c.Find(bson.M{"uid": uid}).Count() if err != nil { return err } - + log.NewInfo("", "find mgo uid cost time", getCurrentTimestampByMill()-newTime) sMsg := MsgInfo{} sMsg.SendTime = sendTime if sMsg.Msg, err = proto.Marshal(m); err != nil { @@ -117,7 +176,7 @@ func (d *DataBases) SaveUserChat(uid string, sendTime int64, m proto.Message) er return err } } - + log.NewInfo("", "insert mgo data cost time", getCurrentTimestampByMill()-newTime) return nil } @@ -231,3 +290,17 @@ func (d *DataBases) DelGroupMember(groupID, uid string) error { return nil } +func isContainInt64(target int64, List []int64) bool { + + for _, element := range List { + + if target == element { + return true + } + } + return false + +} +func getCurrentTimestampByMill() int64 { + return time.Now().UnixNano() / 1e6 +} diff --git a/src/msg_gateway/gate/logic.go b/src/msg_gateway/gate/logic.go index a79fd25c6..2f99fa8e7 100644 --- a/src/msg_gateway/gate/logic.go +++ b/src/msg_gateway/gate/logic.go @@ -9,11 +9,13 @@ import ( "Open_IM/src/utils" "context" "encoding/json" + "fmt" "github.com/gorilla/websocket" + "runtime" "strings" ) -func (ws *WServer) msgParse(conn *websocket.Conn, jsonMsg []byte) { +func (ws *WServer) msgParse(conn *UserConn, jsonMsg []byte) { //ws online debug data //{"ReqIdentifier":1001,"Token":"123","SendID":"c4ca4238a0b923820dcc509a6f75849b","Time":"123","OperationID":"123","MsgIncr":0} //{"ReqIdentifier":1002,"Token":"123","SendID":"c4ca4238a0b923820dcc509a6f75849b","Time":"123","OperationID":"123","MsgIncr":0,"SeqBegin":1,"SeqEnd":6} @@ -23,32 +25,43 @@ func (ws *WServer) msgParse(conn *websocket.Conn, jsonMsg []byte) { m := Req{} if err := json.Unmarshal(jsonMsg, &m); err != nil { log.ErrorByKv("ws json Unmarshal err", "", "err", err.Error()) - ws.sendErrMsg(conn, 200, err.Error()) + ws.sendErrMsg(conn, 200, err.Error(), constant.WSDataError, "") + err = conn.Close() + if err != nil { + log.NewError("", "ws close err", err.Error()) + } return } if err := validate.Struct(m); err != nil { log.ErrorByKv("ws args validate err", "", "err", err.Error()) - ws.sendErrMsg(conn, 201, err.Error()) + ws.sendErrMsg(conn, 201, err.Error(), m.ReqIdentifier, m.MsgIncr) return } if !utils.VerifyToken(m.Token, m.SendID) { - ws.sendErrMsg(conn, 202, "token validate err") + ws.sendErrMsg(conn, 202, "token validate err", m.ReqIdentifier, m.MsgIncr) return } + fmt.Println("test fmt Basic Info Authentication Success", m.OperationID, "reqIdentifier", m.ReqIdentifier, "sendID", m.SendID) log.InfoByKv("Basic Info Authentication Success", m.OperationID, "reqIdentifier", m.ReqIdentifier, "sendID", m.SendID) switch m.ReqIdentifier { case constant.WSGetNewestSeq: - ws.newestSeqReq(conn, &m) + go ws.newestSeqReq(conn, &m) case constant.WSPullMsg: - ws.pullMsgReq(conn, &m) + go ws.pullMsgReq(conn, &m) case constant.WSSendMsg: - ws.sendMsgReq(conn, &m) + sendTime := utils.GetCurrentTimestampByNano() + go ws.sendMsgReq(conn, &m, sendTime) + case constant.WSPullMsgBySeqList: + go ws.pullMsgBySeqListReq(conn, &m) default: } + + log.NewInfo("", "goroutine num is ", runtime.NumGoroutine()) + } -func (ws *WServer) newestSeqResp(conn *websocket.Conn, m *Req, pb *pbChat.GetNewSeqResp) { +func (ws *WServer) newestSeqResp(conn *UserConn, m *Req, pb *pbChat.GetNewSeqResp) { mReply := make(map[string]interface{}) mData := make(map[string]interface{}) mReply["reqIdentifier"] = m.ReqIdentifier @@ -59,7 +72,7 @@ func (ws *WServer) newestSeqResp(conn *websocket.Conn, m *Req, pb *pbChat.GetNew mReply["data"] = mData ws.sendMsg(conn, mReply) } -func (ws *WServer) newestSeqReq(conn *websocket.Conn, m *Req) { +func (ws *WServer) newestSeqReq(conn *UserConn, m *Req) { log.InfoByKv("Ws call success to getNewSeq", m.OperationID, "Parameters", m) pbData := pbChat.GetNewSeqReq{} pbData.UserID = m.SendID @@ -79,7 +92,7 @@ func (ws *WServer) newestSeqReq(conn *websocket.Conn, m *Req) { } -func (ws *WServer) pullMsgResp(conn *websocket.Conn, m *Req, pb *pbChat.PullMessageResp) { +func (ws *WServer) pullMsgResp(conn *UserConn, m *Req, pb *pbChat.PullMessageResp) { mReply := make(map[string]interface{}) msg := make(map[string]interface{}) mReply["reqIdentifier"] = m.ReqIdentifier @@ -104,7 +117,7 @@ func (ws *WServer) pullMsgResp(conn *websocket.Conn, m *Req, pb *pbChat.PullMess } -func (ws *WServer) pullMsgReq(conn *websocket.Conn, m *Req) { +func (ws *WServer) pullMsgReq(conn *UserConn, m *Req) { log.InfoByKv("Ws call success to pullMsgReq", m.OperationID, "Parameters", m) reply := new(pbChat.PullMessageResp) isPass, errCode, errMsg, data := ws.argsValidate(m, constant.WSPullMsg) @@ -130,21 +143,55 @@ func (ws *WServer) pullMsgReq(conn *websocket.Conn, m *Req) { ws.pullMsgResp(conn, m, reply) } } +func (ws *WServer) pullMsgBySeqListReq(conn *UserConn, m *Req) { + log.NewError(m.OperationID, "Ws call success to pullMsgBySeqListReq", m) + reply := new(pbChat.PullMessageResp) + isPass, errCode, errMsg, data := ws.argsValidate(m, constant.WSPullMsgBySeqList) + if isPass { + pbData := pbChat.PullMessageBySeqListReq{} + pbData.SeqList = data.(SeqListData).SeqList + pbData.UserID = m.SendID + pbData.OperationID = m.OperationID + grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName) + msgClient := pbChat.NewChatClient(grpcConn) + reply, err := msgClient.PullMessageBySeqList(context.Background(), &pbData) + if err != nil { + log.NewError(pbData.OperationID, "pullMsgBySeqListReq err", err.Error()) + return + } + log.NewInfo(pbData.OperationID, "rpc call success to pullMsgBySeqListReq", reply.String(), reply.GetMaxSeq(), reply.GetMinSeq(), len(reply.GetSingleUserMsg()), len(reply.GetGroupUserMsg())) + ws.pullMsgResp(conn, m, reply) + } else { + reply.ErrCode = errCode + reply.ErrMsg = errMsg + ws.pullMsgResp(conn, m, reply) + } +} -func (ws *WServer) sendMsgResp(conn *websocket.Conn, m *Req, pb *pbChat.UserSendMsgResp) { - mReply := make(map[string]interface{}) +func (ws *WServer) sendMsgResp(conn *UserConn, m *Req, pb *pbChat.UserSendMsgResp, sendTime int64) { + // := make(map[string]interface{}) mReplyData := make(map[string]interface{}) - mReply["reqIdentifier"] = m.ReqIdentifier - mReply["msgIncr"] = m.MsgIncr - mReply["errCode"] = pb.GetErrCode() - mReply["errMsg"] = pb.GetErrMsg() + //mReply["reqIdentifier"] = m.ReqIdentifier + //mReply["msgIncr"] = m.MsgIncr + //mReply["errCode"] = pb.GetErrCode() + //mReply["errMsg"] = pb.GetErrMsg() mReplyData["clientMsgID"] = pb.GetClientMsgID() mReplyData["serverMsgID"] = pb.GetServerMsgID() - mReply["data"] = mReplyData + mReplyData["sendTime"] = utils.Int64ToString(sendTime) + //mReply["data"] = mReplyData + mReply := Resp{ + ReqIdentifier: m.ReqIdentifier, + MsgIncr: m.MsgIncr, + ErrCode: pb.GetErrCode(), + ErrMsg: pb.GetErrMsg(), + OperationID: m.OperationID, + Data: mReplyData, + } + fmt.Println("test fmt send msg resp", m.OperationID, "reqIdentifier", m.ReqIdentifier, "sendID", m.SendID) ws.sendMsg(conn, mReply) } -func (ws *WServer) sendMsgReq(conn *websocket.Conn, m *Req) { +func (ws *WServer) sendMsgReq(conn *UserConn, m *Req, sendTime int64) { log.InfoByKv("Ws call success to sendMsgReq", m.OperationID, "Parameters", m) reply := new(pbChat.UserSendMsgResp) isPass, errCode, errMsg, pData := ws.argsValidate(m, constant.WSSendMsg) @@ -165,32 +212,43 @@ func (ws *WServer) sendMsgReq(conn *websocket.Conn, m *Req) { Options: utils.MapToJsonString(data.Options), ClientMsgID: data.ClientMsgID, OffLineInfo: utils.MapToJsonString(data.OfflineInfo), + SendTime: sendTime, } log.InfoByKv("Ws call success to sendMsgReq", m.OperationID, "Parameters", m) + time := utils.GetCurrentTimestampBySecond() etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOfflineMessageName) client := pbChat.NewChatClient(etcdConn) - log.Info("", "", "api UserSendMsg call, api call rpc...") - reply, _ := client.UserSendMsg(context.Background(), &pbData) + log.Info("", "", "ws UserSendMsg call, api call rpc...") + reply, err := client.UserSendMsg(context.Background(), &pbData) + if err != nil { + log.NewError(pbData.OperationID, "UserSendMsg err", err.Error()) + reply.ErrCode = 100 + reply.ErrMsg = "rpc err" + } + log.NewInfo(pbData.OperationID, "sendMsgReq call rpc cost time ", utils.GetCurrentTimestampBySecond()-time) log.Info("", "", "api UserSendMsg call end..., [data: %s] [reply: %s]", pbData.String(), reply.String()) - ws.sendMsgResp(conn, m, reply) + ws.sendMsgResp(conn, m, reply, sendTime) + log.NewInfo(pbData.OperationID, "sendMsgResp end cost time ", utils.GetCurrentTimestampBySecond()-time) } else { reply.ErrCode = errCode reply.ErrMsg = errMsg - ws.sendMsgResp(conn, m, reply) + ws.sendMsgResp(conn, m, reply, sendTime) } } -func (ws *WServer) sendMsg(conn *websocket.Conn, mReply map[string]interface{}) { +func (ws *WServer) sendMsg(conn *UserConn, mReply interface{}) { bMsg, _ := json.Marshal(mReply) err := ws.writeMsg(conn, websocket.TextMessage, bMsg) if err != nil { log.ErrorByKv("WS WriteMsg error", "", "userIP", conn.RemoteAddr().String(), "userUid", ws.getUserUid(conn), "error", err, "mReply", mReply) } } -func (ws *WServer) sendErrMsg(conn *websocket.Conn, errCode int32, errMsg string) { +func (ws *WServer) sendErrMsg(conn *UserConn, errCode int32, errMsg string, reqIdentifier int32, msgIncr string) { mReply := make(map[string]interface{}) mReply["errCode"] = errCode mReply["errMsg"] = errMsg + mReply["reqIdentifier"] = reqIdentifier + mReply["msgIncr"] = msgIncr ws.sendMsg(conn, mReply) } diff --git a/src/msg_gateway/gate/rpc_server.go b/src/msg_gateway/gate/rpc_server.go index e9aa28033..bab04bb96 100644 --- a/src/msg_gateway/gate/rpc_server.go +++ b/src/msg_gateway/gate/rpc_server.go @@ -81,10 +81,12 @@ func (r *RPCServer) MsgToUser(_ context.Context, in *pbRelay.MsgToUserReq) (*pbR case constant.GroupChatType: RecvID = strings.Split(in.GetRecvID(), " ")[0] } - log.InfoByKv("test", in.OperationID, "wsUserToConn", ws.wsUserToConn) - for key, conn := range ws.wsUserToConn { - UIDAndPID := strings.Split(key, " ") - if UIDAndPID[0] == RecvID { + var tag bool + a := genUidPlatformArray(RecvID) + for _, v := range a { + if conn := ws.getUserConn(v); conn != nil { + UIDAndPID := strings.Split(v, " ") + tag = true resultCode := sendMsgToUser(conn, bMsg, in, UIDAndPID[1], UIDAndPID[0]) temp := &pbRelay.SingleMsgToUser{ ResultCode: resultCode, @@ -94,6 +96,25 @@ func (r *RPCServer) MsgToUser(_ context.Context, in *pbRelay.MsgToUserReq) (*pbR resp = append(resp, temp) } } + if !tag { + log.NewError(in.OperationID, "push err ,ws conn not in map", in.String()) + } + //for key, conn := range ws.wsUserToConn { + // UIDAndPID := strings.Split(key, " ") + // if UIDAndPID[0] == RecvID { + // tag = true + // resultCode := sendMsgToUser(conn, bMsg, in, UIDAndPID[1], UIDAndPID[0]) + // temp := &pbRelay.SingleMsgToUser{ + // ResultCode: resultCode, + // RecvID: UIDAndPID[0], + // RecvPlatFormID: utils.PlatformNameToID(UIDAndPID[1]), + // } + // resp = append(resp, temp) + // } + //} + //if !tag { + // log.NewError(in.OperationID, "push err ,ws conn not in map", in.String()) + //} //switch in.GetContentType() { //case constant.SyncSenderMsg: // log.InfoByKv("come sync", in.OperationID, "args", in.String()) @@ -143,7 +164,7 @@ func (r *RPCServer) MsgToUser(_ context.Context, in *pbRelay.MsgToUserReq) (*pbR }, nil } -func sendMsgToUser(conn *websocket.Conn, bMsg []byte, in *pbRelay.MsgToUserReq, RecvPlatForm, RecvID string) (ResultCode int64) { +func sendMsgToUser(conn *UserConn, bMsg []byte, in *pbRelay.MsgToUserReq, RecvPlatForm, RecvID string) (ResultCode int64) { err := ws.writeMsg(conn, websocket.TextMessage, bMsg) if err != nil { log.ErrorByKv("PushMsgToUser is failed By Ws", "", "Addr", conn.RemoteAddr().String(), @@ -157,3 +178,9 @@ func sendMsgToUser(conn *websocket.Conn, bMsg []byte, in *pbRelay.MsgToUserReq, } } +func genUidPlatformArray(uid string) (array []string) { + for i := 1; i <= utils.LinuxPlatformID; i++ { + array = append(array, uid+" "+utils.PlatformIDToName(int32(i))) + } + return array +} diff --git a/src/msg_gateway/gate/validate.go b/src/msg_gateway/gate/validate.go index a7a75f8e8..57ec1877d 100644 --- a/src/msg_gateway/gate/validate.go +++ b/src/msg_gateway/gate/validate.go @@ -13,13 +13,22 @@ import ( ) type Req struct { - ReqIdentifier int32 `json:"reqIdentifier" validate:"required"` - Token string `json:"token" validate:"required"` - SendID string `json:"sendID" validate:"required"` - OperationID string `json:"operationID" validate:"required"` - MsgIncr int32 `json:"msgIncr" validate:"required"` - Data map[string]interface{} `json:"data"` + ReqIdentifier int32 `json:"reqIdentifier" validate:"required"` + Token string `json:"token" validate:"required"` + SendID string `json:"sendID" validate:"required"` + OperationID string `json:"operationID" validate:"required"` + MsgIncr string `json:"msgIncr" validate:"required"` + Data interface{} `json:"data"` } +type Resp struct { + ReqIdentifier int32 `json:"reqIdentifier"` + MsgIncr string `json:"msgIncr"` + OperationID string `json:"operationID"` + ErrCode int32 `json:"errCode"` + ErrMsg string `json:"errMsg"` + Data interface{} `json:"data"` +} + type SeqData struct { SeqBegin int64 `mapstructure:"seqBegin" validate:"required"` SeqEnd int64 `mapstructure:"seqEnd" validate:"required"` @@ -30,13 +39,16 @@ type MsgData struct { MsgFrom int32 `mapstructure:"msgFrom" validate:"required"` ContentType int32 `mapstructure:"contentType" validate:"required"` RecvID string `mapstructure:"recvID" validate:"required"` - ForceList []string `mapstructure:"forceList" validate:"required"` + ForceList []string `mapstructure:"forceList"` Content string `mapstructure:"content" validate:"required"` Options map[string]interface{} `mapstructure:"options" validate:"required"` ClientMsgID string `mapstructure:"clientMsgID" validate:"required"` OfflineInfo map[string]interface{} `mapstructure:"offlineInfo" validate:"required"` Ext map[string]interface{} `mapstructure:"ext"` } +type SeqListData struct { + SeqList []int64 `mapstructure:"seqList" validate:"required"` +} func (ws *WServer) argsValidate(m *Req, r int32) (isPass bool, errCode int32, errMsg string, data interface{}) { switch r { @@ -44,6 +56,8 @@ func (ws *WServer) argsValidate(m *Req, r int32) (isPass bool, errCode int32, er data = SeqData{} case constant.WSSendMsg: data = MsgData{} + case constant.WSPullMsgBySeqList: + data = SeqListData{} default: } if err := mapstructure.WeakDecode(m.Data, &data); err != nil { diff --git a/src/msg_gateway/gate/ws_server.go b/src/msg_gateway/gate/ws_server.go index ffc14680d..122e525cb 100644 --- a/src/msg_gateway/gate/ws_server.go +++ b/src/msg_gateway/gate/ws_server.go @@ -6,23 +6,28 @@ import ( "Open_IM/src/utils" "github.com/gorilla/websocket" "net/http" + "sync" "time" ) +type UserConn struct { + *websocket.Conn + w *sync.Mutex +} type WServer struct { wsAddr string wsMaxConnNum int wsUpGrader *websocket.Upgrader - wsConnToUser map[*websocket.Conn]string - wsUserToConn map[string]*websocket.Conn + wsConnToUser map[*UserConn]string + wsUserToConn map[string]*UserConn } func (ws *WServer) onInit(wsPort int) { ip := utils.ServerIP ws.wsAddr = ip + ":" + utils.IntToString(wsPort) ws.wsMaxConnNum = config.Config.LongConnSvr.WebsocketMaxConnNum - ws.wsConnToUser = make(map[*websocket.Conn]string) - ws.wsUserToConn = make(map[string]*websocket.Conn) + ws.wsConnToUser = make(map[*UserConn]string) + ws.wsUserToConn = make(map[string]*UserConn) ws.wsUpGrader = &websocket.Upgrader{ HandshakeTimeout: time.Duration(config.Config.LongConnSvr.WebsocketTimeOut) * time.Second, ReadBufferSize: config.Config.LongConnSvr.WebsocketMaxMsgLen, @@ -49,35 +54,36 @@ func (ws *WServer) wsHandler(w http.ResponseWriter, r *http.Request) { //Connection mapping relationship, //userID+" "+platformID->conn SendID := query["sendID"][0] + " " + utils.PlatformIDToName(int32(utils.StringToInt64(query["platformID"][0]))) - ws.addUserConn(SendID, conn) - go ws.readMsg(conn) + //Initialize a lock for each user + newConn := &UserConn{conn, new(sync.Mutex)} + ws.addUserConn(SendID, newConn) + go ws.readMsg(newConn) } } } -func (ws *WServer) readMsg(conn *websocket.Conn) { +func (ws *WServer) readMsg(conn *UserConn) { for { - msgType, msg, err := conn.ReadMessage() + _, msg, err := conn.ReadMessage() if err != nil { log.ErrorByKv("WS ReadMsg error", "", "userIP", conn.RemoteAddr().String(), "userUid", ws.getUserUid(conn), "error", err) ws.delUserConn(conn) return } else { - log.ErrorByKv("test", "", "msgType", msgType, "userIP", conn.RemoteAddr().String(), "userUid", ws.getUserUid(conn)) + //log.ErrorByKv("test", "", "msgType", msgType, "userIP", conn.RemoteAddr().String(), "userUid", ws.getUserUid(conn)) } ws.msgParse(conn, msg) //ws.writeMsg(conn, 1, chat) } } - -func (ws *WServer) writeMsg(conn *websocket.Conn, a int, msg []byte) error { - rwLock.Lock() - defer rwLock.Unlock() +func (ws *WServer) writeMsg(conn *UserConn, a int, msg []byte) error { + conn.w.Lock() + defer conn.w.Unlock() return conn.WriteMessage(a, msg) } -func (ws *WServer) addUserConn(uid string, conn *websocket.Conn) { +func (ws *WServer) addUserConn(uid string, conn *UserConn) { rwLock.Lock() defer rwLock.Unlock() if oldConn, ok := ws.wsUserToConn[uid]; ok { @@ -95,7 +101,7 @@ func (ws *WServer) addUserConn(uid string, conn *websocket.Conn) { } -func (ws *WServer) delUserConn(conn *websocket.Conn) { +func (ws *WServer) delUserConn(conn *UserConn) { rwLock.Lock() defer rwLock.Unlock() var uidPlatform string @@ -111,12 +117,12 @@ func (ws *WServer) delUserConn(conn *websocket.Conn) { } err := conn.Close() if err != nil { - log.ErrorByKv("close err", "", "uid", uidPlatform, "conn", conn) + log.ErrorByKv("close err", "", "uid", uidPlatform) } } -func (ws *WServer) getUserConn(uid string) *websocket.Conn { +func (ws *WServer) getUserConn(uid string) *UserConn { rwLock.RLock() defer rwLock.RUnlock() if conn, ok := ws.wsUserToConn[uid]; ok { @@ -124,7 +130,7 @@ func (ws *WServer) getUserConn(uid string) *websocket.Conn { } return nil } -func (ws *WServer) getUserUid(conn *websocket.Conn) string { +func (ws *WServer) getUserUid(conn *UserConn) string { rwLock.RLock() defer rwLock.RUnlock() diff --git a/src/msg_transfer/logic/db.go b/src/msg_transfer/logic/db.go index 93dccfc2b..65eaf691e 100644 --- a/src/msg_transfer/logic/db.go +++ b/src/msg_transfer/logic/db.go @@ -3,15 +3,20 @@ package logic import ( "Open_IM/src/common/db" "Open_IM/src/common/db/mysql_model/im_mysql_model" + "Open_IM/src/common/log" pbMsg "Open_IM/src/proto/chat" + "Open_IM/src/utils" ) func saveUserChat(uid string, pbMsg *pbMsg.MsgSvrToPushSvrChatMsg) error { + time := utils.GetCurrentTimestampByMill() seq, err := db.DB.IncrUserSeq(uid) if err != nil { + log.NewError(pbMsg.OperationID, "data insert to redis err", err.Error(), pbMsg.String()) return err } pbMsg.RecvSeq = seq + log.NewInfo(pbMsg.OperationID, "IncrUserSeq cost time", utils.GetCurrentTimestampByMill()-time) return db.DB.SaveUserChat(uid, pbMsg.SendTime, pbMsg) } diff --git a/src/msg_transfer/logic/history_msg_handler.go b/src/msg_transfer/logic/history_msg_handler.go index bea5c20b4..c547e449b 100644 --- a/src/msg_transfer/logic/history_msg_handler.go +++ b/src/msg_transfer/logic/history_msg_handler.go @@ -33,6 +33,7 @@ func (mc *HistoryConsumerHandler) Init() { func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string) { log.InfoByKv("chat come mongo!!!", "", "chat", string(msg)) + time := utils.GetCurrentTimestampBySecond() pbData := pbMsg.WSToMsgSvrChatMsg{} err := proto.Unmarshal(msg, &pbData) if err != nil { @@ -78,11 +79,13 @@ func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string) //} } + log.NewInfo(pbSaveData.OperationID, "saveUserChat cost time ", utils.GetCurrentTimestampBySecond()-time) } if msgKey == pbSaveData.RecvID { pbSaveData.Options = pbData.Options pbSaveData.OfflineInfo = pbData.OfflineInfo sendMessageToPush(&pbSaveData) + log.NewInfo(pbSaveData.OperationID, "sendMessageToPush cost time ", utils.GetCurrentTimestampBySecond()-time) } log.InfoByKv("msg_transfer handle topic success...", "", "") diff --git a/src/msg_transfer/logic/init.go b/src/msg_transfer/logic/init.go index 03a7d2c33..cd7f10106 100644 --- a/src/msg_transfer/logic/init.go +++ b/src/msg_transfer/logic/init.go @@ -20,6 +20,6 @@ func Init() { } func Run() { //register mysqlConsumerHandler to - go persistentCH.persistentConsumerGroup.RegisterHandleAndConsumer(&persistentCH) + //go persistentCH.persistentConsumerGroup.RegisterHandleAndConsumer(&persistentCH) go historyCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyCH) } diff --git a/src/proto/chat/chat.pb.go b/src/proto/chat/chat.pb.go index f98855332..87867262a 100644 --- a/src/proto/chat/chat.pb.go +++ b/src/proto/chat/chat.pb.go @@ -50,7 +50,7 @@ func (m *WSToMsgSvrChatMsg) Reset() { *m = WSToMsgSvrChatMsg{} } func (m *WSToMsgSvrChatMsg) String() string { return proto.CompactTextString(m) } func (*WSToMsgSvrChatMsg) ProtoMessage() {} func (*WSToMsgSvrChatMsg) Descriptor() ([]byte, []int) { - return fileDescriptor_chat_34beadf7348900d2, []int{0} + return fileDescriptor_chat_4f51bfe711d80421, []int{0} } func (m *WSToMsgSvrChatMsg) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_WSToMsgSvrChatMsg.Unmarshal(m, b) @@ -215,7 +215,7 @@ func (m *MsgSvrToPushSvrChatMsg) Reset() { *m = MsgSvrToPushSvrChatMsg{} func (m *MsgSvrToPushSvrChatMsg) String() string { return proto.CompactTextString(m) } func (*MsgSvrToPushSvrChatMsg) ProtoMessage() {} func (*MsgSvrToPushSvrChatMsg) Descriptor() ([]byte, []int) { - return fileDescriptor_chat_34beadf7348900d2, []int{1} + return fileDescriptor_chat_4f51bfe711d80421, []int{1} } func (m *MsgSvrToPushSvrChatMsg) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_MsgSvrToPushSvrChatMsg.Unmarshal(m, b) @@ -361,7 +361,7 @@ func (m *PullMessageReq) Reset() { *m = PullMessageReq{} } func (m *PullMessageReq) String() string { return proto.CompactTextString(m) } func (*PullMessageReq) ProtoMessage() {} func (*PullMessageReq) Descriptor() ([]byte, []int) { - return fileDescriptor_chat_34beadf7348900d2, []int{2} + return fileDescriptor_chat_4f51bfe711d80421, []int{2} } func (m *PullMessageReq) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_PullMessageReq.Unmarshal(m, b) @@ -425,7 +425,7 @@ func (m *PullMessageResp) Reset() { *m = PullMessageResp{} } func (m *PullMessageResp) String() string { return proto.CompactTextString(m) } func (*PullMessageResp) ProtoMessage() {} func (*PullMessageResp) Descriptor() ([]byte, []int) { - return fileDescriptor_chat_34beadf7348900d2, []int{3} + return fileDescriptor_chat_4f51bfe711d80421, []int{3} } func (m *PullMessageResp) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_PullMessageResp.Unmarshal(m, b) @@ -487,6 +487,60 @@ func (m *PullMessageResp) GetGroupUserMsg() []*GatherFormat { return nil } +type PullMessageBySeqListReq struct { + UserID string `protobuf:"bytes,1,opt,name=UserID" json:"UserID,omitempty"` + OperationID string `protobuf:"bytes,2,opt,name=OperationID" json:"OperationID,omitempty"` + SeqList []int64 `protobuf:"varint,3,rep,packed,name=seqList" json:"seqList,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *PullMessageBySeqListReq) Reset() { *m = PullMessageBySeqListReq{} } +func (m *PullMessageBySeqListReq) String() string { return proto.CompactTextString(m) } +func (*PullMessageBySeqListReq) ProtoMessage() {} +func (*PullMessageBySeqListReq) Descriptor() ([]byte, []int) { + return fileDescriptor_chat_4f51bfe711d80421, []int{4} +} +func (m *PullMessageBySeqListReq) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_PullMessageBySeqListReq.Unmarshal(m, b) +} +func (m *PullMessageBySeqListReq) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_PullMessageBySeqListReq.Marshal(b, m, deterministic) +} +func (dst *PullMessageBySeqListReq) XXX_Merge(src proto.Message) { + xxx_messageInfo_PullMessageBySeqListReq.Merge(dst, src) +} +func (m *PullMessageBySeqListReq) XXX_Size() int { + return xxx_messageInfo_PullMessageBySeqListReq.Size(m) +} +func (m *PullMessageBySeqListReq) XXX_DiscardUnknown() { + xxx_messageInfo_PullMessageBySeqListReq.DiscardUnknown(m) +} + +var xxx_messageInfo_PullMessageBySeqListReq proto.InternalMessageInfo + +func (m *PullMessageBySeqListReq) GetUserID() string { + if m != nil { + return m.UserID + } + return "" +} + +func (m *PullMessageBySeqListReq) GetOperationID() string { + if m != nil { + return m.OperationID + } + return "" +} + +func (m *PullMessageBySeqListReq) GetSeqList() []int64 { + if m != nil { + return m.SeqList + } + return nil +} + type GetNewSeqReq struct { UserID string `protobuf:"bytes,1,opt,name=UserID" json:"UserID,omitempty"` OperationID string `protobuf:"bytes,2,opt,name=OperationID" json:"OperationID,omitempty"` @@ -499,7 +553,7 @@ func (m *GetNewSeqReq) Reset() { *m = GetNewSeqReq{} } func (m *GetNewSeqReq) String() string { return proto.CompactTextString(m) } func (*GetNewSeqReq) ProtoMessage() {} func (*GetNewSeqReq) Descriptor() ([]byte, []int) { - return fileDescriptor_chat_34beadf7348900d2, []int{4} + return fileDescriptor_chat_4f51bfe711d80421, []int{5} } func (m *GetNewSeqReq) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_GetNewSeqReq.Unmarshal(m, b) @@ -546,7 +600,7 @@ func (m *GetNewSeqResp) Reset() { *m = GetNewSeqResp{} } func (m *GetNewSeqResp) String() string { return proto.CompactTextString(m) } func (*GetNewSeqResp) ProtoMessage() {} func (*GetNewSeqResp) Descriptor() ([]byte, []int) { - return fileDescriptor_chat_34beadf7348900d2, []int{5} + return fileDescriptor_chat_4f51bfe711d80421, []int{6} } func (m *GetNewSeqResp) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_GetNewSeqResp.Unmarshal(m, b) @@ -601,7 +655,7 @@ func (m *GatherFormat) Reset() { *m = GatherFormat{} } func (m *GatherFormat) String() string { return proto.CompactTextString(m) } func (*GatherFormat) ProtoMessage() {} func (*GatherFormat) Descriptor() ([]byte, []int) { - return fileDescriptor_chat_34beadf7348900d2, []int{6} + return fileDescriptor_chat_4f51bfe711d80421, []int{7} } func (m *GatherFormat) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_GatherFormat.Unmarshal(m, b) @@ -669,7 +723,7 @@ func (m *MsgFormat) Reset() { *m = MsgFormat{} } func (m *MsgFormat) String() string { return proto.CompactTextString(m) } func (*MsgFormat) ProtoMessage() {} func (*MsgFormat) Descriptor() ([]byte, []int) { - return fileDescriptor_chat_34beadf7348900d2, []int{7} + return fileDescriptor_chat_4f51bfe711d80421, []int{8} } func (m *MsgFormat) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_MsgFormat.Unmarshal(m, b) @@ -791,6 +845,7 @@ type UserSendMsgReq struct { ClientMsgID string `protobuf:"bytes,15,opt,name=ClientMsgID" json:"ClientMsgID,omitempty"` OffLineInfo string `protobuf:"bytes,16,opt,name=OffLineInfo" json:"OffLineInfo,omitempty"` Ex string `protobuf:"bytes,17,opt,name=Ex" json:"Ex,omitempty"` + SendTime int64 `protobuf:"varint,18,opt,name=sendTime" json:"sendTime,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -800,7 +855,7 @@ func (m *UserSendMsgReq) Reset() { *m = UserSendMsgReq{} } func (m *UserSendMsgReq) String() string { return proto.CompactTextString(m) } func (*UserSendMsgReq) ProtoMessage() {} func (*UserSendMsgReq) Descriptor() ([]byte, []int) { - return fileDescriptor_chat_34beadf7348900d2, []int{8} + return fileDescriptor_chat_4f51bfe711d80421, []int{9} } func (m *UserSendMsgReq) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_UserSendMsgReq.Unmarshal(m, b) @@ -939,13 +994,19 @@ func (m *UserSendMsgReq) GetEx() string { return "" } +func (m *UserSendMsgReq) GetSendTime() int64 { + if m != nil { + return m.SendTime + } + return 0 +} + type UserSendMsgResp struct { ErrCode int32 `protobuf:"varint,1,opt,name=ErrCode" json:"ErrCode,omitempty"` ErrMsg string `protobuf:"bytes,2,opt,name=ErrMsg" json:"ErrMsg,omitempty"` ReqIdentifier int32 `protobuf:"varint,3,opt,name=ReqIdentifier" json:"ReqIdentifier,omitempty"` - SendTime int64 `protobuf:"varint,5,opt,name=SendTime" json:"SendTime,omitempty"` - ServerMsgID string `protobuf:"bytes,6,opt,name=ServerMsgID" json:"ServerMsgID,omitempty"` - ClientMsgID string `protobuf:"bytes,7,opt,name=ClientMsgID" json:"ClientMsgID,omitempty"` + ServerMsgID string `protobuf:"bytes,4,opt,name=ServerMsgID" json:"ServerMsgID,omitempty"` + ClientMsgID string `protobuf:"bytes,5,opt,name=ClientMsgID" json:"ClientMsgID,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -955,7 +1016,7 @@ func (m *UserSendMsgResp) Reset() { *m = UserSendMsgResp{} } func (m *UserSendMsgResp) String() string { return proto.CompactTextString(m) } func (*UserSendMsgResp) ProtoMessage() {} func (*UserSendMsgResp) Descriptor() ([]byte, []int) { - return fileDescriptor_chat_34beadf7348900d2, []int{9} + return fileDescriptor_chat_4f51bfe711d80421, []int{10} } func (m *UserSendMsgResp) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_UserSendMsgResp.Unmarshal(m, b) @@ -996,13 +1057,6 @@ func (m *UserSendMsgResp) GetReqIdentifier() int32 { return 0 } -func (m *UserSendMsgResp) GetSendTime() int64 { - if m != nil { - return m.SendTime - } - return 0 -} - func (m *UserSendMsgResp) GetServerMsgID() string { if m != nil { return m.ServerMsgID @@ -1022,6 +1076,7 @@ func init() { proto.RegisterType((*MsgSvrToPushSvrChatMsg)(nil), "pbChat.MsgSvrToPushSvrChatMsg") proto.RegisterType((*PullMessageReq)(nil), "pbChat.PullMessageReq") proto.RegisterType((*PullMessageResp)(nil), "pbChat.PullMessageResp") + proto.RegisterType((*PullMessageBySeqListReq)(nil), "pbChat.PullMessageBySeqListReq") proto.RegisterType((*GetNewSeqReq)(nil), "pbChat.GetNewSeqReq") proto.RegisterType((*GetNewSeqResp)(nil), "pbChat.GetNewSeqResp") proto.RegisterType((*GatherFormat)(nil), "pbChat.GatherFormat") @@ -1043,6 +1098,7 @@ const _ = grpc.SupportPackageIsVersion4 type ChatClient interface { GetNewSeq(ctx context.Context, in *GetNewSeqReq, opts ...grpc.CallOption) (*GetNewSeqResp, error) PullMessage(ctx context.Context, in *PullMessageReq, opts ...grpc.CallOption) (*PullMessageResp, error) + PullMessageBySeqList(ctx context.Context, in *PullMessageBySeqListReq, opts ...grpc.CallOption) (*PullMessageResp, error) UserSendMsg(ctx context.Context, in *UserSendMsgReq, opts ...grpc.CallOption) (*UserSendMsgResp, error) } @@ -1072,6 +1128,15 @@ func (c *chatClient) PullMessage(ctx context.Context, in *PullMessageReq, opts . return out, nil } +func (c *chatClient) PullMessageBySeqList(ctx context.Context, in *PullMessageBySeqListReq, opts ...grpc.CallOption) (*PullMessageResp, error) { + out := new(PullMessageResp) + err := grpc.Invoke(ctx, "/pbChat.Chat/PullMessageBySeqList", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *chatClient) UserSendMsg(ctx context.Context, in *UserSendMsgReq, opts ...grpc.CallOption) (*UserSendMsgResp, error) { out := new(UserSendMsgResp) err := grpc.Invoke(ctx, "/pbChat.Chat/UserSendMsg", in, out, c.cc, opts...) @@ -1086,6 +1151,7 @@ func (c *chatClient) UserSendMsg(ctx context.Context, in *UserSendMsgReq, opts . type ChatServer interface { GetNewSeq(context.Context, *GetNewSeqReq) (*GetNewSeqResp, error) PullMessage(context.Context, *PullMessageReq) (*PullMessageResp, error) + PullMessageBySeqList(context.Context, *PullMessageBySeqListReq) (*PullMessageResp, error) UserSendMsg(context.Context, *UserSendMsgReq) (*UserSendMsgResp, error) } @@ -1129,6 +1195,24 @@ func _Chat_PullMessage_Handler(srv interface{}, ctx context.Context, dec func(in return interceptor(ctx, in, info, handler) } +func _Chat_PullMessageBySeqList_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(PullMessageBySeqListReq) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ChatServer).PullMessageBySeqList(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/pbChat.Chat/PullMessageBySeqList", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ChatServer).PullMessageBySeqList(ctx, req.(*PullMessageBySeqListReq)) + } + return interceptor(ctx, in, info, handler) +} + func _Chat_UserSendMsg_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(UserSendMsgReq) if err := dec(in); err != nil { @@ -1159,6 +1243,10 @@ var _Chat_serviceDesc = grpc.ServiceDesc{ MethodName: "PullMessage", Handler: _Chat_PullMessage_Handler, }, + { + MethodName: "PullMessageBySeqList", + Handler: _Chat_PullMessageBySeqList_Handler, + }, { MethodName: "UserSendMsg", Handler: _Chat_UserSendMsg_Handler, @@ -1168,66 +1256,69 @@ var _Chat_serviceDesc = grpc.ServiceDesc{ Metadata: "chat/chat.proto", } -func init() { proto.RegisterFile("chat/chat.proto", fileDescriptor_chat_34beadf7348900d2) } +func init() { proto.RegisterFile("chat/chat.proto", fileDescriptor_chat_4f51bfe711d80421) } -var fileDescriptor_chat_34beadf7348900d2 = []byte{ - // 919 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x56, 0x5f, 0x6b, 0xe3, 0x46, - 0x10, 0xc7, 0x96, 0x2d, 0x5b, 0xe3, 0xbf, 0x59, 0xae, 0xe9, 0x12, 0x4a, 0x31, 0xe6, 0x5a, 0x4c, - 0x1f, 0x52, 0xb8, 0xbe, 0x1c, 0x2d, 0xf4, 0xe1, 0x62, 0x27, 0x15, 0x44, 0xb9, 0x20, 0xf9, 0x28, - 0xf4, 0x4d, 0xe7, 0xac, 0x15, 0x71, 0xb6, 0x24, 0xef, 0x2a, 0x69, 0xfa, 0xd2, 0xaf, 0xd0, 0x7e, - 0xa4, 0xbe, 0xf6, 0xa3, 0xf4, 0x2b, 0x14, 0x0a, 0x65, 0x77, 0x25, 0x6b, 0x57, 0x72, 0x1c, 0x13, - 0xc8, 0x4b, 0xc8, 0xfc, 0x34, 0xb3, 0xbb, 0x33, 0xf3, 0x9b, 0xdf, 0x18, 0x06, 0x8b, 0x5b, 0x3f, - 0xfd, 0x96, 0xff, 0x39, 0x4d, 0x68, 0x9c, 0xc6, 0xc8, 0x4c, 0x3e, 0x9e, 0xdd, 0xfa, 0xe9, 0xf8, - 0x8f, 0x06, 0x1c, 0xfd, 0xec, 0xcd, 0x63, 0x87, 0x05, 0xde, 0x3d, 0xe5, 0x90, 0xc3, 0x02, 0x74, - 0x0c, 0xa6, 0x47, 0xa2, 0x1b, 0x7b, 0x8a, 0x6b, 0xa3, 0xda, 0xc4, 0x72, 0x33, 0x8b, 0xe3, 0x2e, - 0x59, 0xdc, 0xdb, 0x53, 0x5c, 0x97, 0xb8, 0xb4, 0x10, 0x86, 0xd6, 0x59, 0x1c, 0xa5, 0x24, 0x4a, - 0xb1, 0x21, 0x3e, 0xe4, 0x26, 0x3a, 0x81, 0x36, 0x8f, 0x9d, 0x87, 0x6b, 0x82, 0x1b, 0xa3, 0xda, - 0xc4, 0x70, 0xb7, 0x36, 0x8f, 0x72, 0x58, 0x70, 0x4e, 0xe3, 0x35, 0x6e, 0x8e, 0x6a, 0x93, 0xa6, - 0x9b, 0x9b, 0xe8, 0x6b, 0xe8, 0x73, 0x2f, 0x42, 0xaf, 0xc2, 0xc5, 0xa7, 0x2b, 0x7f, 0x4d, 0xb0, - 0x29, 0x8e, 0x2d, 0xa1, 0xe8, 0x35, 0xf4, 0x24, 0x72, 0xee, 0x2f, 0xc8, 0x07, 0xf7, 0x12, 0xb7, - 0x84, 0x9b, 0x0e, 0xa2, 0x11, 0x74, 0xb2, 0xe7, 0xcc, 0x7f, 0x4b, 0x08, 0x6e, 0x8b, 0xbb, 0x54, - 0x88, 0x7b, 0x78, 0x84, 0xb1, 0x30, 0x8e, 0x84, 0x87, 0x25, 0x3d, 0x14, 0x88, 0x7b, 0xbc, 0x4f, - 0x08, 0xf5, 0xd3, 0x30, 0x8e, 0xec, 0x29, 0x06, 0x71, 0x8f, 0x0a, 0xa1, 0x57, 0xd0, 0x74, 0x58, - 0x60, 0x4f, 0x71, 0x47, 0x7c, 0x93, 0x06, 0x47, 0xe7, 0xf1, 0x27, 0x12, 0xe1, 0xae, 0x44, 0x85, - 0x21, 0x4e, 0x5b, 0x2e, 0x57, 0x61, 0x44, 0xec, 0x68, 0x19, 0xe3, 0x5e, 0x76, 0x5a, 0x01, 0xf1, - 0xda, 0xbc, 0x4f, 0xf8, 0xc9, 0x0c, 0xf7, 0x65, 0x45, 0x33, 0x13, 0x7d, 0x09, 0x70, 0xbd, 0xf2, - 0xd3, 0x65, 0x4c, 0xd7, 0xf6, 0x14, 0x0f, 0xc4, 0x53, 0x15, 0x04, 0x7d, 0x01, 0xd6, 0x79, 0x4c, - 0x17, 0xe4, 0x32, 0x64, 0x29, 0x1e, 0x8e, 0x8c, 0x89, 0xe5, 0x16, 0x80, 0xa8, 0xc5, 0x2a, 0x24, - 0x51, 0x2a, 0xdf, 0x7a, 0x24, 0x6f, 0x56, 0xa0, 0xf1, 0xbf, 0x06, 0x1c, 0x4b, 0x36, 0xcc, 0xe3, - 0xeb, 0x3b, 0x76, 0xfb, 0x22, 0xb4, 0xc0, 0xd0, 0xe2, 0x3e, 0x1e, 0xd9, 0x64, 0xac, 0xc8, 0x4d, - 0x8d, 0x30, 0xcd, 0xc7, 0x09, 0x63, 0x3e, 0x45, 0x98, 0xd6, 0x61, 0x84, 0x69, 0x1f, 0x40, 0x18, - 0xeb, 0x49, 0xc2, 0xc0, 0x93, 0x84, 0xe9, 0xec, 0x21, 0x4c, 0x57, 0x25, 0xcc, 0x4b, 0x52, 0xa3, - 0xd4, 0xfc, 0x61, 0xb5, 0xf9, 0xbf, 0x43, 0xff, 0xfa, 0x6e, 0xb5, 0x72, 0x08, 0x63, 0x7e, 0x40, - 0x5c, 0xb2, 0xe1, 0xbd, 0xfd, 0xc0, 0x08, 0x2d, 0x7a, 0x2e, 0x2d, 0xd9, 0xa7, 0xcd, 0x3b, 0x12, - 0x84, 0x91, 0xe8, 0xba, 0xe8, 0x93, 0xb4, 0x25, 0x4f, 0x36, 0xb3, 0xe8, 0x46, 0xb4, 0xdd, 0x70, - 0x33, 0xab, 0x5c, 0x93, 0x46, 0xa5, 0x26, 0xe3, 0x7f, 0x6a, 0x30, 0xd0, 0x1e, 0xc0, 0x12, 0x9e, - 0xef, 0x8c, 0xd2, 0xb3, 0xf8, 0x86, 0x88, 0x27, 0x34, 0xdd, 0xdc, 0xe4, 0xf7, 0xcc, 0x28, 0x75, - 0x58, 0x90, 0xf3, 0x4e, 0x5a, 0x1c, 0x77, 0xfc, 0x07, 0x4e, 0xae, 0xec, 0x7e, 0x69, 0x09, 0x3c, - 0x8c, 0x0a, 0xd2, 0x65, 0x16, 0xfa, 0x1e, 0x7a, 0x5e, 0x18, 0x05, 0x2b, 0xc2, 0x73, 0xe3, 0xc7, - 0x35, 0x47, 0xc6, 0xa4, 0xf3, 0xe6, 0xd5, 0xa9, 0x14, 0xc9, 0xd3, 0x0b, 0x3f, 0xbd, 0x25, 0xf4, - 0x3c, 0xa6, 0x6b, 0x3f, 0x75, 0x75, 0x57, 0xf4, 0x16, 0xba, 0x17, 0x34, 0xbe, 0x4b, 0xf2, 0x50, - 0x73, 0x4f, 0xa8, 0xe6, 0x39, 0xfe, 0x09, 0xba, 0x17, 0x24, 0xbd, 0x22, 0xbf, 0x7a, 0x64, 0xb3, - 0xaf, 0xd2, 0xa5, 0xaa, 0xd5, 0xab, 0x55, 0xf3, 0xa0, 0xa7, 0x9c, 0xf4, 0xac, 0x92, 0x0d, 0xc1, - 0x28, 0xea, 0xc5, 0xff, 0x1d, 0xcf, 0xa0, 0xab, 0x3e, 0x1e, 0xf5, 0xa1, 0xbe, 0x7d, 0x5a, 0xdd, - 0x9e, 0xa2, 0xaf, 0xa0, 0x21, 0x24, 0xa6, 0x2e, 0x12, 0x3e, 0xca, 0x13, 0xe6, 0x13, 0x29, 0xb3, - 0x15, 0x9f, 0xc7, 0xff, 0xd5, 0xc1, 0xda, 0x62, 0xcf, 0x51, 0x90, 0x7c, 0xe2, 0x0d, 0x7d, 0xe2, - 0x4b, 0x33, 0xda, 0x78, 0x64, 0x46, 0xe9, 0xbd, 0x28, 0xb6, 0x3d, 0x15, 0x62, 0x62, 0xb9, 0x2a, - 0xa4, 0xea, 0x93, 0xa9, 0xeb, 0x53, 0x56, 0x8e, 0xd6, 0xb6, 0x1c, 0x9a, 0x2e, 0xb5, 0x4b, 0xba, - 0xf4, 0x0d, 0x0c, 0xa5, 0x80, 0x28, 0xd3, 0x27, 0x45, 0xa3, 0x82, 0xef, 0x50, 0x2a, 0x38, 0x4c, - 0xa9, 0x3a, 0x8f, 0x29, 0x95, 0x32, 0xd1, 0xdd, 0xea, 0x44, 0xff, 0xd9, 0x80, 0x3e, 0x27, 0x12, - 0x8f, 0x73, 0x58, 0xc0, 0x89, 0xf6, 0x1a, 0x7a, 0x2e, 0xd9, 0xd8, 0x37, 0x24, 0x4a, 0xc3, 0x65, - 0x48, 0x68, 0xc6, 0x11, 0x1d, 0x2c, 0x36, 0x57, 0x5d, 0xdd, 0x5c, 0x45, 0x03, 0x0d, 0xad, 0x81, - 0x4f, 0x8e, 0xf6, 0x8e, 0xc4, 0x9b, 0x87, 0x25, 0x6e, 0xee, 0x4a, 0x5c, 0x97, 0xba, 0xd6, 0x2e, - 0xa9, 0x53, 0x05, 0xba, 0x5d, 0x15, 0x68, 0x85, 0x5a, 0xd6, 0x5e, 0x6a, 0x41, 0x95, 0x5a, 0x05, - 0x5d, 0x3b, 0x1a, 0x5d, 0xb5, 0xdd, 0xdb, 0x2d, 0xef, 0x5e, 0x85, 0x6e, 0xbd, 0xca, 0x3a, 0x7c, - 0x44, 0xd2, 0x4b, 0x0d, 0x1e, 0x54, 0x1a, 0x9c, 0x2d, 0x8c, 0xcb, 0x7c, 0x61, 0x0c, 0xb7, 0x0b, - 0x23, 0x87, 0xf8, 0xe4, 0xce, 0x1e, 0xb2, 0x55, 0x5f, 0x9f, 0x3d, 0x8c, 0xff, 0xae, 0xc1, 0x40, - 0xa3, 0xc4, 0xb3, 0x14, 0xa3, 0xc2, 0x22, 0x63, 0x17, 0x8b, 0xf6, 0xad, 0xf3, 0xd2, 0x80, 0x9a, - 0xd5, 0x01, 0x2d, 0x65, 0xdf, 0xaa, 0x64, 0xff, 0xe6, 0xaf, 0x1a, 0x34, 0xb8, 0xee, 0xa0, 0xb7, - 0x60, 0x6d, 0x35, 0x10, 0x15, 0xf2, 0xab, 0x08, 0xec, 0xc9, 0x67, 0x3b, 0x50, 0x96, 0xa0, 0x1f, - 0xa1, 0xa3, 0xac, 0x1c, 0x74, 0x9c, 0x7b, 0xe9, 0x8b, 0xf0, 0xe4, 0xf3, 0x9d, 0xb8, 0x8c, 0x57, - 0xaa, 0x59, 0xc4, 0xeb, 0x53, 0x57, 0xc4, 0x97, 0x4a, 0xff, 0xae, 0xf7, 0x4b, 0x87, 0xff, 0x30, - 0xff, 0x41, 0x7e, 0xfe, 0x68, 0x8a, 0x1f, 0xe8, 0xdf, 0xfd, 0x1f, 0x00, 0x00, 0xff, 0xff, 0xfe, - 0xa0, 0x14, 0x71, 0xb3, 0x0b, 0x00, 0x00, +var fileDescriptor_chat_4f51bfe711d80421 = []byte{ + // 970 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x57, 0xdd, 0x6a, 0xe3, 0x46, + 0x14, 0xc6, 0x92, 0x7f, 0xa2, 0x23, 0xff, 0x65, 0x48, 0xb3, 0x43, 0x28, 0xad, 0x11, 0xdb, 0x62, + 0x7a, 0x91, 0xc2, 0xf6, 0x66, 0x69, 0x61, 0x2f, 0x12, 0x3b, 0xa9, 0x20, 0xce, 0x06, 0xc9, 0x4b, + 0xa1, 0x77, 0x5a, 0x67, 0xac, 0x88, 0xb5, 0x25, 0x79, 0x46, 0x49, 0xb3, 0x37, 0x7d, 0x85, 0x42, + 0x9f, 0xa3, 0x7d, 0xb0, 0xbe, 0x42, 0xa1, 0x50, 0x66, 0x46, 0xb2, 0x46, 0x3f, 0x4e, 0x42, 0x96, + 0xbd, 0x59, 0xf6, 0xfb, 0x74, 0xe6, 0xe7, 0x9c, 0xf3, 0xcd, 0x77, 0x62, 0x18, 0x2c, 0x6e, 0xbc, + 0xe4, 0x7b, 0xfe, 0xcf, 0x71, 0x4c, 0xa3, 0x24, 0x42, 0xed, 0xf8, 0xfd, 0xe9, 0x8d, 0x97, 0x58, + 0x7f, 0x34, 0x61, 0xff, 0x17, 0x77, 0x1e, 0xcd, 0x98, 0xef, 0xde, 0x51, 0x4e, 0xcd, 0x98, 0x8f, + 0x0e, 0xa1, 0xed, 0x92, 0xf0, 0xda, 0x9e, 0xe0, 0xc6, 0xa8, 0x31, 0x36, 0x9c, 0x14, 0x71, 0xde, + 0x21, 0x8b, 0x3b, 0x7b, 0x82, 0x35, 0xc9, 0x4b, 0x84, 0x30, 0x74, 0x4e, 0xa3, 0x30, 0x21, 0x61, + 0x82, 0x75, 0xf1, 0x21, 0x83, 0xe8, 0x08, 0xf6, 0xf8, 0xda, 0x79, 0xb0, 0x26, 0xb8, 0x39, 0x6a, + 0x8c, 0x75, 0x67, 0x8b, 0xf9, 0xaa, 0x19, 0xf3, 0xcf, 0x68, 0xb4, 0xc6, 0xad, 0x51, 0x63, 0xdc, + 0x72, 0x32, 0x88, 0xbe, 0x85, 0x3e, 0x8f, 0x22, 0xf4, 0x32, 0x58, 0x7c, 0xb8, 0xf4, 0xd6, 0x04, + 0xb7, 0xc5, 0xb6, 0x25, 0x16, 0xbd, 0x84, 0x9e, 0x64, 0xce, 0xbc, 0x05, 0x79, 0xe7, 0x5c, 0xe0, + 0x8e, 0x08, 0x2b, 0x92, 0x68, 0x04, 0x66, 0x7a, 0x9d, 0xf9, 0xc7, 0x98, 0xe0, 0x3d, 0x71, 0x96, + 0x4a, 0xf1, 0x08, 0x97, 0x30, 0x16, 0x44, 0xa1, 0x88, 0x30, 0x64, 0x84, 0x42, 0xf1, 0x88, 0xb7, + 0x31, 0xa1, 0x5e, 0x12, 0x44, 0xa1, 0x3d, 0xc1, 0x20, 0xce, 0x51, 0x29, 0x74, 0x00, 0xad, 0x19, + 0xf3, 0xed, 0x09, 0x36, 0xc5, 0x37, 0x09, 0x38, 0x3b, 0x8f, 0x3e, 0x90, 0x10, 0x77, 0x25, 0x2b, + 0x80, 0xd8, 0x6d, 0xb9, 0x5c, 0x05, 0x21, 0xb1, 0xc3, 0x65, 0x84, 0x7b, 0xe9, 0x6e, 0x39, 0xc5, + 0x6b, 0xf3, 0x36, 0xe6, 0x3b, 0x33, 0xdc, 0x97, 0x15, 0x4d, 0x21, 0xfa, 0x0a, 0xe0, 0x6a, 0xe5, + 0x25, 0xcb, 0x88, 0xae, 0xed, 0x09, 0x1e, 0x88, 0xab, 0x2a, 0x0c, 0xfa, 0x12, 0x8c, 0xb3, 0x88, + 0x2e, 0xc8, 0x45, 0xc0, 0x12, 0x3c, 0x1c, 0xe9, 0x63, 0xc3, 0xc9, 0x09, 0x51, 0x8b, 0x55, 0x40, + 0xc2, 0x44, 0xde, 0x75, 0x5f, 0x9e, 0xac, 0x50, 0xd6, 0xbf, 0x3a, 0x1c, 0x4a, 0x35, 0xcc, 0xa3, + 0xab, 0x5b, 0x76, 0xf3, 0x59, 0x64, 0x81, 0xa1, 0xc3, 0x63, 0x5c, 0xb2, 0x49, 0x55, 0x91, 0xc1, + 0x82, 0x60, 0x5a, 0xbb, 0x05, 0xd3, 0x7e, 0x4c, 0x30, 0x9d, 0xa7, 0x09, 0x66, 0xef, 0x09, 0x82, + 0x31, 0x1e, 0x15, 0x0c, 0x3c, 0x2a, 0x18, 0xf3, 0x01, 0xc1, 0x74, 0x55, 0xc1, 0x7c, 0x4e, 0x69, + 0x94, 0x9a, 0x3f, 0xac, 0x36, 0xff, 0x77, 0xe8, 0x5f, 0xdd, 0xae, 0x56, 0x33, 0xc2, 0x98, 0xe7, + 0x13, 0x87, 0x6c, 0x78, 0x6f, 0xdf, 0x31, 0x42, 0xf3, 0x9e, 0x4b, 0x24, 0xfb, 0xb4, 0x39, 0x21, + 0x7e, 0x10, 0x8a, 0xae, 0x8b, 0x3e, 0x49, 0x2c, 0x75, 0xb2, 0x99, 0x86, 0xd7, 0xa2, 0xed, 0xba, + 0x93, 0xa2, 0x72, 0x4d, 0x9a, 0x95, 0x9a, 0x58, 0xff, 0x34, 0x60, 0x50, 0xb8, 0x00, 0x8b, 0x79, + 0xbe, 0x53, 0x4a, 0x4f, 0xa3, 0x6b, 0x22, 0xae, 0xd0, 0x72, 0x32, 0xc8, 0xcf, 0x99, 0x52, 0x3a, + 0x63, 0x7e, 0xa6, 0x3b, 0x89, 0x38, 0x3f, 0xf3, 0xee, 0xb9, 0xb8, 0xd2, 0xf3, 0x25, 0x12, 0x7c, + 0x10, 0xe6, 0xa2, 0x4b, 0x11, 0xfa, 0x11, 0x7a, 0x6e, 0x10, 0xfa, 0x2b, 0xc2, 0x73, 0xe3, 0xdb, + 0xb5, 0x46, 0xfa, 0xd8, 0x7c, 0x75, 0x70, 0x2c, 0x4d, 0xf2, 0xf8, 0xdc, 0x4b, 0x6e, 0x08, 0x3d, + 0x8b, 0xe8, 0xda, 0x4b, 0x9c, 0x62, 0x28, 0x7a, 0x0d, 0xdd, 0x73, 0x1a, 0xdd, 0xc6, 0xd9, 0xd2, + 0xf6, 0x03, 0x4b, 0x0b, 0x91, 0xd6, 0x1a, 0x5e, 0x28, 0xa9, 0x9e, 0x7c, 0x74, 0xc9, 0x86, 0x3f, + 0xd1, 0x87, 0x8a, 0x5e, 0x2a, 0xa0, 0x56, 0x15, 0x15, 0x86, 0x0e, 0x93, 0xfb, 0x60, 0x7d, 0xa4, + 0xf3, 0x87, 0x95, 0x42, 0xeb, 0x67, 0xe8, 0x9e, 0x93, 0xe4, 0x92, 0xfc, 0xe6, 0x92, 0xcd, 0x27, + 0x9d, 0x61, 0xb9, 0xd0, 0x53, 0x76, 0x7a, 0x56, 0x87, 0x86, 0xa0, 0xe7, 0xed, 0xe1, 0xff, 0xb5, + 0xa6, 0xd0, 0x55, 0x6b, 0x85, 0xfa, 0xa0, 0x6d, 0xaf, 0xa6, 0xd9, 0x13, 0xf4, 0x0d, 0x34, 0x45, + 0x56, 0x9a, 0xa8, 0xef, 0x7e, 0x56, 0x5f, 0x6e, 0x00, 0xb2, 0xb8, 0xe2, 0xb3, 0xf5, 0x9f, 0x06, + 0xc6, 0x96, 0x7b, 0x8e, 0x61, 0x65, 0x06, 0xa3, 0x17, 0x0d, 0xa6, 0x64, 0x09, 0xcd, 0x1d, 0x96, + 0x40, 0xef, 0x44, 0x6f, 0xed, 0x89, 0xf0, 0x2e, 0xc3, 0x51, 0x29, 0xd5, 0x0e, 0xdb, 0x45, 0x3b, + 0x4c, 0xcb, 0xd1, 0xd9, 0x96, 0xa3, 0x60, 0x83, 0x7b, 0x25, 0x1b, 0xfc, 0x0e, 0x86, 0xd2, 0xaf, + 0x94, 0xc7, 0x2e, 0x3d, 0xaa, 0xc2, 0xd7, 0x18, 0x23, 0x3c, 0xcd, 0x18, 0xcd, 0x5d, 0xc6, 0xa8, + 0x18, 0x48, 0xb7, 0x6a, 0x20, 0x7f, 0x35, 0xa1, 0xcf, 0x85, 0xc4, 0xd7, 0xcd, 0x98, 0xcf, 0x85, + 0xf6, 0x12, 0x7a, 0x0e, 0xd9, 0xd8, 0xd7, 0x24, 0x4c, 0x82, 0x65, 0x40, 0x68, 0xaa, 0x91, 0x22, + 0x99, 0x0f, 0x4a, 0x4d, 0x1d, 0x94, 0x79, 0x03, 0xf5, 0x42, 0x03, 0x1f, 0x75, 0x92, 0x9a, 0xc4, + 0x5b, 0x4f, 0x4b, 0xbc, 0x5d, 0x97, 0x78, 0xd1, 0x59, 0x3b, 0x75, 0xce, 0xaa, 0xce, 0x83, 0xbd, + 0xea, 0x3c, 0x50, 0xa4, 0x65, 0x3c, 0x28, 0x2d, 0xa8, 0x4a, 0x2b, 0x97, 0xab, 0x59, 0x90, 0x6b, + 0x61, 0xd4, 0x77, 0xcb, 0xa3, 0x5e, 0x91, 0x5b, 0xaf, 0x32, 0x7d, 0x77, 0x4c, 0x90, 0x52, 0x83, + 0x07, 0x95, 0x06, 0xa7, 0xf3, 0xe9, 0x22, 0x9b, 0x4f, 0xc3, 0xed, 0x7c, 0xca, 0x28, 0xfe, 0x72, + 0xa7, 0xf7, 0xe9, 0x5f, 0x16, 0xda, 0xf4, 0x9e, 0x4b, 0x99, 0x65, 0x52, 0x46, 0x52, 0xca, 0x19, + 0xb6, 0xfe, 0x6e, 0xc0, 0xa0, 0x20, 0x97, 0x67, 0xb9, 0x49, 0x45, 0x61, 0x7a, 0x9d, 0xc2, 0x4a, + 0x0f, 0xb4, 0x59, 0x7d, 0xa0, 0xa5, 0xec, 0x5b, 0x95, 0xec, 0x5f, 0xfd, 0xa9, 0x41, 0x93, 0xfb, + 0x0e, 0x7a, 0x0d, 0xc6, 0xd6, 0x03, 0x51, 0xee, 0xf6, 0x8a, 0xc1, 0x1e, 0x7d, 0x51, 0xc3, 0xb2, + 0x18, 0xbd, 0x01, 0x53, 0xb1, 0x7d, 0x74, 0x98, 0x45, 0x15, 0xe7, 0xee, 0xd1, 0x8b, 0x5a, 0x9e, + 0xc5, 0xe8, 0x0a, 0x0e, 0xea, 0xc6, 0x06, 0xfa, 0xba, 0x66, 0x81, 0x3a, 0x54, 0x76, 0xef, 0xf8, + 0x06, 0x4c, 0xa5, 0x07, 0xf9, 0x8d, 0x8a, 0xef, 0x38, 0x5f, 0x5f, 0x6a, 0xd8, 0x49, 0xef, 0x57, + 0x93, 0xff, 0xb2, 0xf8, 0x49, 0x7e, 0x7e, 0xdf, 0x16, 0xbf, 0x30, 0x7e, 0xf8, 0x3f, 0x00, 0x00, + 0xff, 0xff, 0x6c, 0xad, 0x53, 0x41, 0x74, 0x0c, 0x00, 0x00, } diff --git a/src/proto/chat/chat.proto b/src/proto/chat/chat.proto index 1273682c7..6faa654c2 100644 --- a/src/proto/chat/chat.proto +++ b/src/proto/chat/chat.proto @@ -58,6 +58,11 @@ message PullMessageResp { repeated GatherFormat SingleUserMsg = 5; repeated GatherFormat GroupUserMsg = 6; } +message PullMessageBySeqListReq{ + string UserID = 1; + string OperationID = 2; + repeated int64 seqList =3; +} message GetNewSeqReq { string UserID = 1; string OperationID = 2; @@ -119,6 +124,7 @@ message UserSendMsgReq { string ClientMsgID = 15; string OffLineInfo = 16; string Ex = 17; + int64 sendTime = 18; } @@ -127,13 +133,13 @@ message UserSendMsgResp { int32 ErrCode = 1; string ErrMsg = 2; int32 ReqIdentifier = 3; - int64 SendTime = 5; - string ServerMsgID = 6; - string ClientMsgID = 7; + string ServerMsgID = 4; + string ClientMsgID = 5; } service Chat { rpc GetNewSeq(GetNewSeqReq) returns(GetNewSeqResp); rpc PullMessage(PullMessageReq) returns(PullMessageResp); + rpc PullMessageBySeqList(PullMessageBySeqListReq) returns(PullMessageResp); rpc UserSendMsg(UserSendMsgReq) returns(UserSendMsgResp); } diff --git a/src/push/logic/push_to_client.go b/src/push/logic/push_to_client.go index c52ff0a61..7dad81d5b 100644 --- a/src/push/logic/push_to_client.go +++ b/src/push/logic/push_to_client.go @@ -51,7 +51,7 @@ func MsgToUser(sendPbData *pbRelay.MsgToUserReq, OfflineInfo, Options string) { wsResult = append(wsResult, reply.Resp...) } } - log.InfoByKv("push_result", sendPbData.OperationID, "result", wsResult) + log.InfoByKv("push_result", sendPbData.OperationID, "result", wsResult, "sendData", sendPbData) if isOfflinePush { for _, t := range pushTerminal { diff --git a/src/rpc/chat/chat/pull_message.go b/src/rpc/chat/chat/pull_message.go index 744ae1a36..359b3709f 100644 --- a/src/rpc/chat/chat/pull_message.go +++ b/src/rpc/chat/chat/pull_message.go @@ -41,7 +41,7 @@ func (rpc *rpcChat) PullMessage(_ context.Context, in *pbMsg.PullMessageReq) (*p resp := new(pbMsg.PullMessageResp) var respSingleMsgFormat []*pbMsg.GatherFormat var respGroupMsgFormat []*pbMsg.GatherFormat - SingleMsgFormat, GroupMsgFormat, MaxSeq, MinSeq, err := commonDB.DB.GetUserChat(in.UserID, in.SeqBegin, in.SeqEnd) + SingleMsgFormat, GroupMsgFormat, MaxSeq, MinSeq, err := commonDB.DB.GetMsgBySeqRange(in.UserID, in.SeqBegin, in.SeqEnd) if err != nil { log.ErrorByKv("pullMsg data error", in.OperationID, in.String()) resp.ErrCode = 1 @@ -59,6 +59,30 @@ func (rpc *rpcChat) PullMessage(_ context.Context, in *pbMsg.PullMessageReq) (*p GroupUserMsg: respGroupMsgFormat, }, nil } +func (rpc *rpcChat) PullMessageBySeqList(_ context.Context, in *pbMsg.PullMessageBySeqListReq) (*pbMsg.PullMessageResp, error) { + log.NewInfo(in.OperationID, "rpc PullMessageBySeqList is arriving", in.String()) + resp := new(pbMsg.PullMessageResp) + var respSingleMsgFormat []*pbMsg.GatherFormat + var respGroupMsgFormat []*pbMsg.GatherFormat + SingleMsgFormat, GroupMsgFormat, MaxSeq, MinSeq, err := commonDB.DB.GetMsgBySeqList(in.UserID, in.SeqList) + if err != nil { + log.ErrorByKv("PullMessageBySeqList data error", in.OperationID, in.String()) + resp.ErrCode = 1 + resp.ErrMsg = err.Error() + return resp, nil + } + respSingleMsgFormat = singleMsgHandleByUser(SingleMsgFormat, in.UserID) + respGroupMsgFormat = groupMsgHandleByUser(GroupMsgFormat) + return &pbMsg.PullMessageResp{ + ErrCode: 0, + ErrMsg: "", + MaxSeq: MaxSeq, + MinSeq: MinSeq, + SingleUserMsg: respSingleMsgFormat, + GroupUserMsg: respGroupMsgFormat, + }, nil + panic("implement me") +} func singleMsgHandleByUser(allMsg []*pbMsg.MsgFormat, ownerId string) []*pbMsg.GatherFormat { var userid string var respMsgFormat []*pbMsg.GatherFormat diff --git a/src/rpc/chat/chat/send_msg.go b/src/rpc/chat/chat/send_msg.go index 21a1a7be8..a11f929e6 100644 --- a/src/rpc/chat/chat/send_msg.go +++ b/src/rpc/chat/chat/send_msg.go @@ -43,9 +43,11 @@ type MsgCallBackResp struct { func (rpc *rpcChat) UserSendMsg(_ context.Context, pb *pbChat.UserSendMsgReq) (*pbChat.UserSendMsgResp, error) { replay := pbChat.UserSendMsgResp{} log.InfoByKv("sendMsg", pb.OperationID, "args", pb.String()) + time := utils.GetCurrentTimestampByMill() if !utils.VerifyToken(pb.Token, pb.SendID) { return returnMsg(&replay, pb, http.StatusUnauthorized, "token validate err,not authorized", "", 0) } + log.NewInfo(pb.OperationID, "VerifyToken cost time ", utils.GetCurrentTimestampByMill()-time) serverMsgID := GetMsgID(pb.SendID) pbData := pbChat.WSToMsgSvrChatMsg{} pbData.MsgFrom = pb.MsgFrom @@ -64,7 +66,7 @@ func (rpc *rpcChat) UserSendMsg(_ context.Context, pb *pbChat.UserSendMsgReq) (* pbData.MsgID = serverMsgID pbData.OperationID = pb.OperationID pbData.Token = pb.Token - pbData.SendTime = utils.GetCurrentTimestampByNano() + pbData.SendTime = pb.SendTime m := MsgCallBackResp{} if config.Config.MessageCallBack.CallbackSwitch { bMsg, err := http2.Post(config.Config.MessageCallBack.CallbackUrl, MsgCallBackReq{ @@ -88,85 +90,79 @@ func (rpc *rpcChat) UserSendMsg(_ context.Context, pb *pbChat.UserSendMsgReq) (* return returnMsg(&replay, pb, m.ResponseErrCode, m.ErrMsg, "", 0) } else { pbData.Content = m.ResponseResult.ModifiedMsg - err1 := rpc.sendMsgToKafka(&pbData, pbData.RecvID) - err2 := rpc.sendMsgToKafka(&pbData, pbData.SendID) - if err1 != nil || err2 != nil { - return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) - } - return returnMsg(&replay, pb, 0, "", serverMsgID, pbData.SendTime) } } - } else { - switch pbData.SessionType { - case constant.SingleChatType: - err1 := rpc.sendMsgToKafka(&pbData, pbData.RecvID) - err2 := rpc.sendMsgToKafka(&pbData, pbData.SendID) - if err1 != nil || err2 != nil { - return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) - } - return returnMsg(&replay, pb, 0, "", serverMsgID, pbData.SendTime) - case constant.GroupChatType: - etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImGroupName) - client := pbGroup.NewGroupClient(etcdConn) - req := &pbGroup.GetGroupAllMemberReq{ - GroupID: pbData.RecvID, - Token: pbData.Token, - OperationID: pbData.OperationID, - } - reply, err := client.GetGroupAllMember(context.Background(), req) + } + switch pbData.SessionType { + case constant.SingleChatType: + time := utils.GetCurrentTimestampByMill() + err1 := rpc.sendMsgToKafka(&pbData, pbData.RecvID) + err2 := rpc.sendMsgToKafka(&pbData, pbData.SendID) + log.NewInfo(pb.OperationID, "send kafka cost time ", utils.GetCurrentTimestampByMill()-time) + if err1 != nil || err2 != nil { + return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) + } + return returnMsg(&replay, pb, 0, "", serverMsgID, pbData.SendTime) + case constant.GroupChatType: + etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImGroupName) + client := pbGroup.NewGroupClient(etcdConn) + req := &pbGroup.GetGroupAllMemberReq{ + GroupID: pbData.RecvID, + Token: pbData.Token, + OperationID: pbData.OperationID, + } + reply, err := client.GetGroupAllMember(context.Background(), req) + if err != nil { + log.Error(pbData.Token, pbData.OperationID, "rpc send_msg getGroupInfo failed, err = %s", err.Error()) + return returnMsg(&replay, pb, 201, err.Error(), "", 0) + } + if reply.ErrorCode != 0 { + log.Error(pbData.Token, pbData.OperationID, "rpc send_msg getGroupInfo failed, err = %s", reply.ErrorMsg) + return returnMsg(&replay, pb, reply.ErrorCode, reply.ErrorMsg, "", 0) + } + var addUidList []string + switch pbData.ContentType { + case constant.KickGroupMemberTip: + var notification content_struct.NotificationContent + var kickContent group.KickGroupMemberReq + err := utils.JsonStringToStruct(pbData.Content, ¬ification) if err != nil { - log.Error(pbData.Token, pbData.OperationID, "rpc send_msg getGroupInfo failed, err = %s", err.Error()) - return returnMsg(&replay, pb, 201, err.Error(), "", 0) - } - if reply.ErrorCode != 0 { - log.Error(pbData.Token, pbData.OperationID, "rpc send_msg getGroupInfo failed, err = %s", reply.ErrorMsg) - return returnMsg(&replay, pb, reply.ErrorCode, reply.ErrorMsg, "", 0) - } - var addUidList []string - switch pbData.ContentType { - case constant.KickGroupMemberTip: - var notification content_struct.NotificationContent - var kickContent group.KickGroupMemberReq - err := utils.JsonStringToStruct(pbData.Content, ¬ification) + log.ErrorByKv("json unmarshall err", pbData.OperationID, "err", err.Error()) + return returnMsg(&replay, pb, 200, err.Error(), "", 0) + } else { + err := utils.JsonStringToStruct(notification.Detail, &kickContent) if err != nil { log.ErrorByKv("json unmarshall err", pbData.OperationID, "err", err.Error()) return returnMsg(&replay, pb, 200, err.Error(), "", 0) - } else { - err := utils.JsonStringToStruct(notification.Detail, &kickContent) - if err != nil { - log.ErrorByKv("json unmarshall err", pbData.OperationID, "err", err.Error()) - return returnMsg(&replay, pb, 200, err.Error(), "", 0) - } - for _, v := range kickContent.UidListInfo { - addUidList = append(addUidList, v.UserId) - } } - case constant.QuitGroupTip: - addUidList = append(addUidList, pbData.SendID) - default: - } - groupID := pbData.RecvID - for i, v := range reply.MemberList { - pbData.RecvID = v.UserId + " " + groupID - err := rpc.sendMsgToKafka(&pbData, utils.IntToString(i)) - if err != nil { - return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) + for _, v := range kickContent.UidListInfo { + addUidList = append(addUidList, v.UserId) } } - for i, v := range addUidList { - pbData.RecvID = v + " " + groupID - err := rpc.sendMsgToKafka(&pbData, utils.IntToString(i+1)) - if err != nil { - return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) - } - } - return returnMsg(&replay, pb, 0, "", serverMsgID, pbData.SendTime) + case constant.QuitGroupTip: + addUidList = append(addUidList, pbData.SendID) default: - } + groupID := pbData.RecvID + for i, v := range reply.MemberList { + pbData.RecvID = v.UserId + " " + groupID + err := rpc.sendMsgToKafka(&pbData, utils.IntToString(i)) + if err != nil { + return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) + } + } + for i, v := range addUidList { + pbData.RecvID = v + " " + groupID + err := rpc.sendMsgToKafka(&pbData, utils.IntToString(i+1)) + if err != nil { + return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) + } + } + return returnMsg(&replay, pb, 0, "", serverMsgID, pbData.SendTime) + default: + return returnMsg(&replay, pb, 203, "unkonwn sessionType", "", 0) } - return returnMsg(&replay, pb, 203, "unkonwn sessionType", "", 0) } func (rpc *rpcChat) sendMsgToKafka(m *pbChat.WSToMsgSvrChatMsg, key string) error { @@ -186,6 +182,5 @@ func returnMsg(replay *pbChat.UserSendMsgResp, pb *pbChat.UserSendMsgReq, errCod replay.ReqIdentifier = pb.ReqIdentifier replay.ClientMsgID = pb.ClientMsgID replay.ServerMsgID = serverMsgID - replay.SendTime = sendTime return replay, nil } diff --git a/src/rpc/group/group/group.go b/src/rpc/group/group/group.go index 749932194..87f904802 100644 --- a/src/rpc/group/group/group.go +++ b/src/rpc/group/group/group.go @@ -70,6 +70,12 @@ func (s *groupServer) InviteUserToGroup(ctx context.Context, req *pbGroup.Invite return &pbGroup.InviteUserToGroupResp{ErrorCode: config.ErrAccess.ErrCode, ErrorMsg: config.ErrAccess.ErrMsg}, nil } + groupInfoFromMysql, err := imdb.FindGroupInfoByGroupId(req.GroupID) + if err != nil || groupInfoFromMysql == nil { + log.NewError(req.OperationID, "get group info error", req.GroupID, req.UidList) + return &pbGroup.InviteUserToGroupResp{ErrorCode: config.ErrAccess.ErrCode, ErrorMsg: config.ErrAccess.ErrMsg}, nil + } + // //from User: invite: applicant //to user: invite: invited diff --git a/src/utils/strings.go b/src/utils/strings.go index f676da884..c6d676dff 100644 --- a/src/utils/strings.go +++ b/src/utils/strings.go @@ -62,3 +62,6 @@ func GetMsgID(sendID string) string { func int64ToString(i int64) string { return strconv.FormatInt(i, 10) } +func Int64ToString(i int64) string { + return strconv.FormatInt(i, 10) +}