From 21051900f9bb14a4fe611b5966af78f1cc5bcd66 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 15 Jun 2022 17:33:48 +0800 Subject: [PATCH 1/4] hash userID --- internal/demo/register/set_password.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/internal/demo/register/set_password.go b/internal/demo/register/set_password.go index fde97a634..545a489d3 100644 --- a/internal/demo/register/set_password.go +++ b/internal/demo/register/set_password.go @@ -11,7 +11,10 @@ import ( "Open_IM/pkg/utils" "encoding/json" "github.com/gin-gonic/gin" + "math/big" "net/http" + "strconv" + "time" ) type ParamsSetPassword struct { @@ -53,7 +56,13 @@ func SetPassword(c *gin.Context) { return } } - userID := utils.Base64Encode(account) + //userID := utils.Base64Encode(account) + + userID := utils.Md5(params.OperationID + strconv.FormatInt(time.Now().UnixNano(), 10)) + bi := big.NewInt(0) + bi.SetString(userID[0:8], 16) + userID = bi.String() + url := config.Config.Demo.ImAPIURL + "/auth/user_register" openIMRegisterReq := api.UserRegisterReq{} openIMRegisterReq.OperationID = params.OperationID From f4f6ea681e596d0a05b4cb58c80ba2c76b00b566 Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Wed, 15 Jun 2022 18:02:48 +0800 Subject: [PATCH 2/4] Optimization for log --- internal/msg_gateway/gate/logic.go | 32 ++++------ internal/msg_gateway/gate/ws_server.go | 87 +++++++++++++------------- internal/rpc/msg/send_msg.go | 2 +- pkg/common/kafka/producer.go | 9 ++- pkg/common/token_verify/jwt_token.go | 10 ++- 5 files changed, 70 insertions(+), 70 deletions(-) diff --git a/internal/msg_gateway/gate/logic.go b/internal/msg_gateway/gate/logic.go index e12062338..238d73e91 100644 --- a/internal/msg_gateway/gate/logic.go +++ b/internal/msg_gateway/gate/logic.go @@ -19,19 +19,12 @@ import ( ) func (ws *WServer) msgParse(conn *UserConn, binaryMsg []byte) { - //ws online debug data - //{"ReqIdentifier":1001,"Token":"123","SendID":"c4ca4238a0b923820dcc509a6f75849b","Time":"123","OperationID":"123","MsgIncr":0} - //{"ReqIdentifier":1002,"Token":"123","SendID":"c4ca4238a0b923820dcc509a6f75849b","Time":"123","OperationID":"123","MsgIncr":0,"SeqBegin":1,"SeqEnd":6} - //{"ReqIdentifier":1003,"Token":"123","SendID":"c4ca4238a0b923820dcc509a6f75849b", - //"RecvID":"a87ff679a2f3e71d9181a67b7542122c","ClientMsgID":"2343","Time":"147878787","OperationID": - //"123","MsgIncr":0,"SubMsgType":101,"MsgType":100,"MsgFrom":1,"Content":"sdfsdf"} b := bytes.NewBuffer(binaryMsg) m := Req{} dec := gob.NewDecoder(b) err := dec.Decode(&m) if err != nil { log.NewError("", "ws Decode err", err.Error()) - ws.sendErrMsg(conn, 200, err.Error(), constant.WSDataError, "", "") err = conn.Close() if err != nil { log.NewError("", "ws close err", err.Error()) @@ -43,27 +36,29 @@ func (ws *WServer) msgParse(conn *UserConn, binaryMsg []byte) { ws.sendErrMsg(conn, 201, err.Error(), m.ReqIdentifier, m.MsgIncr, m.OperationID) return } - //if !utils.VerifyToken(m.Token, m.SendID) { - // ws.sendErrMsg(conn, 202, "token validate err", m.ReqIdentifier, m.MsgIncr,m.OperationID) - // return - //} - log.NewInfo(m.OperationID, "Basic Info Authentication Success", m) + + log.NewInfo(m.OperationID, "Basic Info Authentication Success", m.SendID, m.MsgIncr, m.ReqIdentifier) switch m.ReqIdentifier { case constant.WSGetNewestSeq: + log.NewInfo(m.OperationID, "getSeqReq ", m.SendID, m.MsgIncr, m.ReqIdentifier) ws.getSeqReq(conn, &m) case constant.WSSendMsg: + log.NewInfo(m.OperationID, "sendMsgReq ", m.SendID, m.MsgIncr, m.ReqIdentifier) ws.sendMsgReq(conn, &m) case constant.WSSendSignalMsg: + log.NewInfo(m.OperationID, "sendSignalMsgReq ", m.SendID, m.MsgIncr, m.ReqIdentifier) ws.sendSignalMsgReq(conn, &m) case constant.WSPullMsgBySeqList: + log.NewInfo(m.OperationID, "pullMsgBySeqListReq ", m.SendID, m.MsgIncr, m.ReqIdentifier) ws.pullMsgBySeqListReq(conn, &m) default: + log.Error(m.OperationID, "ReqIdentifier failed ", m.SendID, m.MsgIncr, m.ReqIdentifier) } log.NewInfo(m.OperationID, "goroutine num is ", runtime.NumGoroutine()) } func (ws *WServer) getSeqReq(conn *UserConn, m *Req) { - log.NewInfo(m.OperationID, "Ws call success to getNewSeq", m.MsgIncr, m.SendID, m.ReqIdentifier, m.Data) + log.NewInfo(m.OperationID, "Ws call success to getNewSeq", m.MsgIncr, m.SendID, m.ReqIdentifier) nReply := new(sdk_ws.GetMaxAndMinSeqResp) isPass, errCode, errMsg, data := ws.argsValidate(m, constant.WSGetNewestSeq) if isPass { @@ -88,7 +83,6 @@ func (ws *WServer) getSeqReq(conn *UserConn, m *Req) { nReply.ErrCode = errCode nReply.ErrMsg = errMsg ws.getSeqResp(conn, m, nReply) - } } @@ -189,8 +183,6 @@ func (ws *WServer) sendMsgReq(conn *UserConn, m *Req) { } func (ws *WServer) sendMsgResp(conn *UserConn, m *Req, pb *pbChat.SendMsgResp) { - // := make(map[string]interface{}) - var mReplyData sdk_ws.UserSendMsgResp mReplyData.ClientMsgID = pb.GetClientMsgID() mReplyData.ServerMsgID = pb.GetServerMsgID() @@ -277,14 +269,14 @@ func (ws *WServer) sendMsg(conn *UserConn, mReply interface{}) { enc := gob.NewEncoder(&b) err := enc.Encode(mReply) if err != nil { - uid, platform := ws.getUserUid(conn) - log.NewError(mReply.(Resp).OperationID, mReply.(Resp).ReqIdentifier, mReply.(Resp).ErrCode, mReply.(Resp).ErrMsg, "Encode Msg error", conn.RemoteAddr().String(), uid, platform, err.Error()) + // uid, platform := ws.getUserUid(conn) + log.NewError(mReply.(Resp).OperationID, mReply.(Resp).ReqIdentifier, mReply.(Resp).ErrCode, mReply.(Resp).ErrMsg, "Encode Msg error", conn.RemoteAddr().String(), err.Error()) return } err = ws.writeMsg(conn, websocket.BinaryMessage, b.Bytes()) if err != nil { - uid, platform := ws.getUserUid(conn) - log.NewError(mReply.(Resp).OperationID, mReply.(Resp).ReqIdentifier, mReply.(Resp).ErrCode, mReply.(Resp).ErrMsg, "ws writeMsg error", conn.RemoteAddr().String(), uid, platform, err.Error()) + // uid, platform := ws.getUserUid(conn) + log.NewError(mReply.(Resp).OperationID, mReply.(Resp).ReqIdentifier, mReply.(Resp).ErrCode, mReply.(Resp).ErrMsg, "ws writeMsg error", conn.RemoteAddr().String(), err.Error()) } else { log.Debug(mReply.(Resp).OperationID, mReply.(Resp).ReqIdentifier, mReply.(Resp).ErrCode, mReply.(Resp).ErrMsg, "ws write response success") } diff --git a/internal/msg_gateway/gate/ws_server.go b/internal/msg_gateway/gate/ws_server.go index 54ea7138e..223270197 100644 --- a/internal/msg_gateway/gate/ws_server.go +++ b/internal/msg_gateway/gate/ws_server.go @@ -51,21 +51,27 @@ func (ws *WServer) run() { } func (ws *WServer) wsHandler(w http.ResponseWriter, r *http.Request) { - if ws.headerCheck(w, r) { - query := r.URL.Query() + query := r.URL.Query() + operationID := "" + if len(query["operationID"]) != 0 { + operationID = query["operationID"][0] + } else { + operationID = utils.OperationIDGenerator() + } + log.Debug(operationID, utils.GetSelfFuncName(), " args: ", query) + if ws.headerCheck(w, r, operationID) { conn, err := ws.wsUpGrader.Upgrade(w, r, nil) //Conn is obtained through the upgraded escalator if err != nil { - log.Error("", "upgrade http conn err", err, query) + log.Error(operationID, "upgrade http conn err", err.Error(), query) return } else { - //Connection mapping relationship, - //userID+" "+platformID->conn - //Initialize a lock for each user newConn := &UserConn{conn, new(sync.Mutex), 0} userCount++ - ws.addUserConn(query["sendID"][0], utils.StringToInt(query["platformID"][0]), newConn, query["token"][0]) + ws.addUserConn(query["sendID"][0], utils.StringToInt(query["platformID"][0]), newConn, query["token"][0], operationID) go ws.readMsg(newConn) } + } else { + log.Error(operationID, "headerCheck failed ") } } @@ -76,18 +82,13 @@ func (ws *WServer) readMsg(conn *UserConn) { log.NewInfo("", "this is a pingMessage") } if err != nil { - uid, platform := ws.getUserUid(conn) - log.Error("", "WS ReadMsg error", "userIP", conn.RemoteAddr().String(), "userUid", uid, "platform", platform, "error", err.Error()) + log.Error("", "WS ReadMsg error ", " userIP", conn.RemoteAddr().String(), "userUid", "platform", "error", err.Error()) userCount-- ws.delUserConn(conn) return - } else { - //log.ErrorByKv("test", "", "msgType", msgType, "userIP", conn.RemoteAddr().String(), "userUid", ws.getUserUid(conn)) } ws.msgParse(conn, msg) - //ws.writeMsg(conn, 1, chat) } - } func (ws *WServer) SetWriteTimeout(conn *UserConn, timeout int) { @@ -115,25 +116,27 @@ func (ws *WServer) MultiTerminalLoginChecker(uid string, platformID int, newConn if oldConnMap, ok := ws.wsUserToConn[uid]; ok { // user->map[platform->conn] if oldConn, ok := oldConnMap[platformID]; ok { log.NewDebug(operationID, uid, platformID, "kick old conn") - ws.sendKickMsg(oldConn, newConn) + // ws.sendKickMsg(oldConn, newConn) m, err := db.DB.GetTokenMapByUidPid(uid, constant.PlatformIDToName(platformID)) if err != nil && err != go_redis.Nil { - log.NewError(operationID, "get token from redis err", err.Error(), uid) + log.NewError(operationID, "get token from redis err", err.Error(), uid, constant.PlatformIDToName(platformID)) return } if m == nil { - log.NewError(operationID, "get token from redis err", "m is nil") + log.NewError(operationID, "get token from redis err", "m is nil", uid, constant.PlatformIDToName(platformID)) return } + log.NewDebug(operationID, "get token map is ", m, uid, constant.PlatformIDToName(platformID)) + for k, _ := range m { if k != token { m[k] = constant.KickedToken } } - log.NewDebug(operationID, "get map is ", m) + log.NewDebug(operationID, "set token map is ", m, uid, constant.PlatformIDToName(platformID)) err = db.DB.SetTokenMapByUidPid(uid, platformID, m) if err != nil { - log.NewError(operationID, "SetTokenMapByUidPid err", err.Error()) + log.NewError(operationID, "SetTokenMapByUidPid err", err.Error(), uid, platformID, m) return } err = oldConn.Close() @@ -146,7 +149,6 @@ func (ws *WServer) MultiTerminalLoginChecker(uid string, platformID int, newConn if err != nil { log.NewError(operationID, "conn close err", err.Error(), uid, platformID) } - } else { log.NewWarn(operationID, "abnormal uid-conn ", uid, platformID, oldConnMap[platformID]) } @@ -177,10 +179,11 @@ func (ws *WServer) sendKickMsg(oldConn, newConn *UserConn) { log.NewError(mReply.OperationID, mReply.ReqIdentifier, mReply.ErrCode, mReply.ErrMsg, "sendKickMsg WS WriteMsg error", oldConn.RemoteAddr().String(), newConn.RemoteAddr().String(), err.Error()) } } -func (ws *WServer) addUserConn(uid string, platformID int, conn *UserConn, token string) { + +func (ws *WServer) addUserConn(uid string, platformID int, conn *UserConn, token string, operationID string) { rwLock.Lock() defer rwLock.Unlock() - operationID := utils.OperationIDGenerator() + log.Info(operationID, utils.GetSelfFuncName(), " args: ", uid, platformID, conn, token) callbackResp := callbackUserOnline(operationID, uid, platformID, token) if callbackResp.ErrCode != 0 { log.NewError(operationID, utils.GetSelfFuncName(), "callbackUserOnline resp:", callbackResp) @@ -268,43 +271,39 @@ func (ws *WServer) getUserAllCons(uid string) map[int]*UserConn { return nil } -func (ws *WServer) getUserUid(conn *UserConn) (uid string, platform int) { - rwLock.RLock() - defer rwLock.RUnlock() - - if stringMap, ok := ws.wsConnToUser[conn]; ok { - for k, v := range stringMap { - platform = k - uid = v - } - return uid, platform - } - return "", 0 -} -func (ws *WServer) headerCheck(w http.ResponseWriter, r *http.Request) bool { +//func (ws *WServer) getUserUid(conn *UserConn) (uid string, platform int) { +// rwLock.RLock() +// defer rwLock.RUnlock() +// +// if stringMap, ok := ws.wsConnToUser[conn]; ok { +// for k, v := range stringMap { +// platform = k +// uid = v +// } +// return uid, platform +// } +// return "", 0 +//} +func (ws *WServer) headerCheck(w http.ResponseWriter, r *http.Request, operationID string) bool { status := http.StatusUnauthorized query := r.URL.Query() - operationID := "" - if len(query["operationID"]) != 0 { - operationID = query["operationID"][0] - } if len(query["token"]) != 0 && len(query["sendID"]) != 0 && len(query["platformID"]) != 0 { if ok, err, msg := token_verify.WsVerifyToken(query["token"][0], query["sendID"][0], query["platformID"][0], operationID); !ok { - // e := err.(*constant.ErrInfo) log.Error(operationID, "Token verify failed ", "query ", query, msg, err.Error()) w.Header().Set("Sec-Websocket-Version", "13") w.Header().Set("ws_err_msg", err.Error()) http.Error(w, err.Error(), status) return false } else { - log.Info(operationID, "Connection Authentication Success", "", "token", query["token"][0], "userID", query["sendID"][0]) + log.Info(operationID, "Connection Authentication Success", "", "token ", query["token"][0], "userID ", query["sendID"][0], "platformID ", query["platformID"][0]) return true } } else { - log.Error(operationID, "Args err", "query", query) + log.Error(operationID, "Args err ", "query ", query) w.Header().Set("Sec-Websocket-Version", "13") - w.Header().Set("ws_err_msg", "args err, need token, sendID, platformID") - http.Error(w, http.StatusText(status), status) + errMsg := "args err, need token, sendID, platformID" + w.Header().Set("ws_err_msg", errMsg) + http.Error(w, errMsg, status) return false } } diff --git a/internal/rpc/msg/send_msg.go b/internal/rpc/msg/send_msg.go index fffbf6789..6afbc7549 100644 --- a/internal/rpc/msg/send_msg.go +++ b/internal/rpc/msg/send_msg.go @@ -176,7 +176,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S return returnMsg(&replay, pb, errCode, errMsg, "", 0) } rpc.encapsulateMsgData(pb.MsgData) - log.Info("", "this is a test MsgData ", pb.MsgData) + log.Info(pb.OperationID, "this is a test MsgData ", pb.MsgData) msgToMQSingle := pbChat.MsgDataToMQ{Token: pb.Token, OperationID: pb.OperationID, MsgData: pb.MsgData} // callback diff --git a/pkg/common/kafka/producer.go b/pkg/common/kafka/producer.go index 3d8ca02f5..c4cd9717a 100644 --- a/pkg/common/kafka/producer.go +++ b/pkg/common/kafka/producer.go @@ -16,8 +16,9 @@ type Producer struct { func NewKafkaProducer(addr []string, topic string) *Producer { p := Producer{} - p.config = sarama.NewConfig() //Instantiate a sarama Config - p.config.Producer.Return.Successes = true //Whether to enable the successes channel to be notified after the message is sent successfully + p.config = sarama.NewConfig() //Instantiate a sarama Config + p.config.Producer.Return.Successes = true //Whether to enable the successes channel to be notified after the message is sent successfully + p.config.Producer.Return.Errors = true p.config.Producer.RequiredAcks = sarama.WaitForAll //Set producer Message Reply level 0 1 all p.config.Producer.Partitioner = sarama.NewHashPartitioner //Set the hash-key automatic hash partition. When sending a message, you must specify the key value of the message. If there is no key, the partition will be selected randomly @@ -48,6 +49,10 @@ func (p *Producer) SendMessage(m proto.Message, key string, operationID string) } kMsg.Value = sarama.ByteEncoder(bMsg) log2.Info(operationID, "ByteEncoder SendMessage begin", "key ", kMsg, p.producer) + if kMsg.Key.Length() == 0 || kMsg.Value.Length() == 0 { + log2.Error(operationID, "kMsg.Key.Length() == 0 || kMsg.Value.Length() == 0 ", kMsg) + return -1, -1, errors.New("key or value == 0") + } a, b, c := p.producer.SendMessage(kMsg) log2.Info(operationID, "ByteEncoder SendMessage end", "key ", kMsg, p.producer) return a, b, c diff --git a/pkg/common/token_verify/jwt_token.go b/pkg/common/token_verify/jwt_token.go index aed09190f..54a1b4274 100644 --- a/pkg/common/token_verify/jwt_token.go +++ b/pkg/common/token_verify/jwt_token.go @@ -245,15 +245,19 @@ func VerifyToken(token, uid string) (bool, error) { return true, nil } func WsVerifyToken(token, uid string, platformID string, operationID string) (bool, error, string) { + argMsg := "token: " + token + " operationID: " + operationID + " userID: " + uid + " platformID: " + platformID claims, err := ParseToken(token, operationID) if err != nil { - return false, utils.Wrap(err, "parse token err"), "parse token err" + errMsg := "parse token err " + argMsg + return false, utils.Wrap(err, errMsg), errMsg } if claims.UID != uid { - return false, utils.Wrap(&constant.ErrTokenUnknown, "uid is not same to token uid"), "uid is not same to token uid" + errMsg := " uid is not same to token uid " + " claims.UID " + claims.UID + argMsg + return false, utils.Wrap(&constant.ErrTokenUnknown, errMsg), errMsg } if claims.Platform != constant.PlatformIDToName(utils.StringToInt(platformID)) { - return false, utils.Wrap(&constant.ErrTokenUnknown, "platform is not same to token platform"), "platform is not same to token platform" + errMsg := " platform is not same to token platform " + argMsg + "claims platformID " + claims.Platform + return false, utils.Wrap(&constant.ErrTokenUnknown, errMsg), errMsg } log.NewDebug(operationID, utils.GetSelfFuncName(), " check ok ", claims.UID, uid, claims.Platform) return true, nil, "" From 9f7cc1ae6985b3bc51839d4a3578c56b1056c260 Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Wed, 15 Jun 2022 18:11:15 +0800 Subject: [PATCH 3/4] Optimization for log --- internal/msg_gateway/gate/logic.go | 8 ++++--- internal/msg_gateway/gate/validate.go | 33 ++++++++------------------- 2 files changed, 14 insertions(+), 27 deletions(-) diff --git a/internal/msg_gateway/gate/logic.go b/internal/msg_gateway/gate/logic.go index 238d73e91..88c4af013 100644 --- a/internal/msg_gateway/gate/logic.go +++ b/internal/msg_gateway/gate/logic.go @@ -36,7 +36,6 @@ func (ws *WServer) msgParse(conn *UserConn, binaryMsg []byte) { ws.sendErrMsg(conn, 201, err.Error(), m.ReqIdentifier, m.MsgIncr, m.OperationID) return } - log.NewInfo(m.OperationID, "Basic Info Authentication Success", m.SendID, m.MsgIncr, m.ReqIdentifier) switch m.ReqIdentifier { @@ -57,10 +56,12 @@ func (ws *WServer) msgParse(conn *UserConn, binaryMsg []byte) { } log.NewInfo(m.OperationID, "goroutine num is ", runtime.NumGoroutine()) } + func (ws *WServer) getSeqReq(conn *UserConn, m *Req) { log.NewInfo(m.OperationID, "Ws call success to getNewSeq", m.MsgIncr, m.SendID, m.ReqIdentifier) nReply := new(sdk_ws.GetMaxAndMinSeqResp) - isPass, errCode, errMsg, data := ws.argsValidate(m, constant.WSGetNewestSeq) + isPass, errCode, errMsg, data := ws.argsValidate(m, constant.WSGetNewestSeq, m.OperationID) + log.Info(m.OperationID, "argsValidate ", isPass, errCode, errMsg) if isPass { rpcReq := sdk_ws.GetMaxAndMinSeqReq{} rpcReq.GroupIDList = data.(sdk_ws.GetMaxAndMinSeqReq).GroupIDList @@ -71,9 +72,9 @@ func (ws *WServer) getSeqReq(conn *UserConn, m *Req) { msgClient := pbChat.NewChatClient(grpcConn) rpcReply, err := msgClient.GetMaxAndMinSeq(context.Background(), &rpcReq) if err != nil { - log.Error(rpcReq.OperationID, "rpc call failed to getSeqReq", err.Error(), rpcReq.String()) nReply.ErrCode = 500 nReply.ErrMsg = err.Error() + log.Error(rpcReq.OperationID, "rpc call failed to GetMaxAndMinSeq ", nReply.String()) ws.getSeqResp(conn, m, nReply) } else { log.NewInfo(rpcReq.OperationID, "rpc call success to getSeqReq", rpcReply.String()) @@ -82,6 +83,7 @@ func (ws *WServer) getSeqReq(conn *UserConn, m *Req) { } else { nReply.ErrCode = errCode nReply.ErrMsg = errMsg + log.Error(m.OperationID, "argsValidate failed send resp: ", nReply.String()) ws.getSeqResp(conn, m, nReply) } } diff --git a/internal/msg_gateway/gate/validate.go b/internal/msg_gateway/gate/validate.go index 0f4950728..9a5558528 100644 --- a/internal/msg_gateway/gate/validate.go +++ b/internal/msg_gateway/gate/validate.go @@ -57,16 +57,16 @@ type SeqListData struct { SeqList []int64 `mapstructure:"seqList" validate:"required"` } -func (ws *WServer) argsValidate(m *Req, r int32) (isPass bool, errCode int32, errMsg string, returnData interface{}) { +func (ws *WServer) argsValidate(m *Req, r int32, operationID string) (isPass bool, errCode int32, errMsg string, returnData interface{}) { switch r { case constant.WSGetNewestSeq: data := open_im_sdk.GetMaxAndMinSeqReq{} if err := proto.Unmarshal(m.Data, &data); err != nil { - log.Error("", "Decode Data struct err", err.Error(), r) + log.Error(operationID, "Decode Data struct err", err.Error(), r) return false, 203, err.Error(), nil } if err := validate.Struct(data); err != nil { - log.Error("", "data args validate err", err.Error(), r) + log.Error(operationID, "data args validate err", err.Error(), r) return false, 204, err.Error(), nil } @@ -74,11 +74,11 @@ func (ws *WServer) argsValidate(m *Req, r int32) (isPass bool, errCode int32, er case constant.WSSendMsg: data := open_im_sdk.MsgData{} if err := proto.Unmarshal(m.Data, &data); err != nil { - log.Error("", "Decode Data struct err", err.Error(), r) + log.Error(operationID, "Decode Data struct err", err.Error(), r) return false, 203, err.Error(), nil } if err := validate.Struct(data); err != nil { - log.Error("", "data args validate err", err.Error(), r) + log.Error(operationID, "data args validate err", err.Error(), r) return false, 204, err.Error(), nil } @@ -86,11 +86,11 @@ func (ws *WServer) argsValidate(m *Req, r int32) (isPass bool, errCode int32, er case constant.WSSendSignalMsg: data := pbRtc.SignalReq{} if err := proto.Unmarshal(m.Data, &data); err != nil { - log.Error("", "Decode Data struct err", err.Error(), r) + log.Error(operationID, "Decode Data struct err", err.Error(), r) return false, 203, err.Error(), nil } if err := validate.Struct(data); err != nil { - log.Error("", "data args validate err", err.Error(), r) + log.Error(operationID, "data args validate err", err.Error(), r) return false, 204, err.Error(), nil } @@ -98,31 +98,16 @@ func (ws *WServer) argsValidate(m *Req, r int32) (isPass bool, errCode int32, er case constant.WSPullMsgBySeqList: data := open_im_sdk.PullMessageBySeqListReq{} if err := proto.Unmarshal(m.Data, &data); err != nil { - log.Error("", "Decode Data struct err", err.Error(), r) + log.Error(operationID, "Decode Data struct err", err.Error(), r) return false, 203, err.Error(), nil } if err := validate.Struct(data); err != nil { - log.Error("", "data args validate err", err.Error(), r) + log.Error(operationID, "data args validate err", err.Error(), r) return false, 204, err.Error(), nil } return true, 0, "", data - default: } - return false, 204, "args err", nil - - //b := bytes.NewBuffer(m.Data) - //dec := gob.NewDecoder(b) - //err := dec.Decode(&data) - //if err != nil { - // log.ErrorByKv("Decode Data struct err", "", "err", err.Error(), "reqIdentifier", r) - // return false, 203, err.Error(), nil - //} - //if err := mapstructure.WeakDecode(m.Data, &data); err != nil { - // log.ErrorByKv("map to Data struct err", "", "err", err.Error(), "reqIdentifier", r) - // return false, 203, err.Error(), nil - //} else - } From 1af78c9bcba826e9008f407da215919bc05ed345 Mon Sep 17 00:00:00 2001 From: skiffer-git <44203734@qq.com> Date: Wed, 15 Jun 2022 18:15:00 +0800 Subject: [PATCH 4/4] Optimization for log --- internal/msg_gateway/gate/logic.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/internal/msg_gateway/gate/logic.go b/internal/msg_gateway/gate/logic.go index 88c4af013..668b3339a 100644 --- a/internal/msg_gateway/gate/logic.go +++ b/internal/msg_gateway/gate/logic.go @@ -89,7 +89,7 @@ func (ws *WServer) getSeqReq(conn *UserConn, m *Req) { } func (ws *WServer) getSeqResp(conn *UserConn, m *Req, pb *sdk_ws.GetMaxAndMinSeqResp) { - log.Debug(m.OperationID, "getSeqResp come here ", pb.String()) + b, _ := proto.Marshal(pb) mReply := Resp{ ReqIdentifier: m.ReqIdentifier, @@ -99,6 +99,8 @@ func (ws *WServer) getSeqResp(conn *UserConn, m *Req, pb *sdk_ws.GetMaxAndMinSeq OperationID: m.OperationID, Data: b, } + log.Debug(m.OperationID, "getSeqResp come here req: ", pb.String(), "send resp: ", + mReply.ReqIdentifier, mReply.MsgIncr, mReply.ErrCode, mReply.ErrMsg) ws.sendMsg(conn, mReply) }