mirror of
https://github.com/openimsdk/open-im-server.git
synced 2025-04-06 04:15:46 +08:00
Optimization for log
This commit is contained in:
parent
21051900f9
commit
f4f6ea681e
@ -19,19 +19,12 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func (ws *WServer) msgParse(conn *UserConn, binaryMsg []byte) {
|
func (ws *WServer) msgParse(conn *UserConn, binaryMsg []byte) {
|
||||||
//ws online debug data
|
|
||||||
//{"ReqIdentifier":1001,"Token":"123","SendID":"c4ca4238a0b923820dcc509a6f75849b","Time":"123","OperationID":"123","MsgIncr":0}
|
|
||||||
//{"ReqIdentifier":1002,"Token":"123","SendID":"c4ca4238a0b923820dcc509a6f75849b","Time":"123","OperationID":"123","MsgIncr":0,"SeqBegin":1,"SeqEnd":6}
|
|
||||||
//{"ReqIdentifier":1003,"Token":"123","SendID":"c4ca4238a0b923820dcc509a6f75849b",
|
|
||||||
//"RecvID":"a87ff679a2f3e71d9181a67b7542122c","ClientMsgID":"2343","Time":"147878787","OperationID":
|
|
||||||
//"123","MsgIncr":0,"SubMsgType":101,"MsgType":100,"MsgFrom":1,"Content":"sdfsdf"}
|
|
||||||
b := bytes.NewBuffer(binaryMsg)
|
b := bytes.NewBuffer(binaryMsg)
|
||||||
m := Req{}
|
m := Req{}
|
||||||
dec := gob.NewDecoder(b)
|
dec := gob.NewDecoder(b)
|
||||||
err := dec.Decode(&m)
|
err := dec.Decode(&m)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.NewError("", "ws Decode err", err.Error())
|
log.NewError("", "ws Decode err", err.Error())
|
||||||
ws.sendErrMsg(conn, 200, err.Error(), constant.WSDataError, "", "")
|
|
||||||
err = conn.Close()
|
err = conn.Close()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.NewError("", "ws close err", err.Error())
|
log.NewError("", "ws close err", err.Error())
|
||||||
@ -43,27 +36,29 @@ func (ws *WServer) msgParse(conn *UserConn, binaryMsg []byte) {
|
|||||||
ws.sendErrMsg(conn, 201, err.Error(), m.ReqIdentifier, m.MsgIncr, m.OperationID)
|
ws.sendErrMsg(conn, 201, err.Error(), m.ReqIdentifier, m.MsgIncr, m.OperationID)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
//if !utils.VerifyToken(m.Token, m.SendID) {
|
|
||||||
// ws.sendErrMsg(conn, 202, "token validate err", m.ReqIdentifier, m.MsgIncr,m.OperationID)
|
log.NewInfo(m.OperationID, "Basic Info Authentication Success", m.SendID, m.MsgIncr, m.ReqIdentifier)
|
||||||
// return
|
|
||||||
//}
|
|
||||||
log.NewInfo(m.OperationID, "Basic Info Authentication Success", m)
|
|
||||||
|
|
||||||
switch m.ReqIdentifier {
|
switch m.ReqIdentifier {
|
||||||
case constant.WSGetNewestSeq:
|
case constant.WSGetNewestSeq:
|
||||||
|
log.NewInfo(m.OperationID, "getSeqReq ", m.SendID, m.MsgIncr, m.ReqIdentifier)
|
||||||
ws.getSeqReq(conn, &m)
|
ws.getSeqReq(conn, &m)
|
||||||
case constant.WSSendMsg:
|
case constant.WSSendMsg:
|
||||||
|
log.NewInfo(m.OperationID, "sendMsgReq ", m.SendID, m.MsgIncr, m.ReqIdentifier)
|
||||||
ws.sendMsgReq(conn, &m)
|
ws.sendMsgReq(conn, &m)
|
||||||
case constant.WSSendSignalMsg:
|
case constant.WSSendSignalMsg:
|
||||||
|
log.NewInfo(m.OperationID, "sendSignalMsgReq ", m.SendID, m.MsgIncr, m.ReqIdentifier)
|
||||||
ws.sendSignalMsgReq(conn, &m)
|
ws.sendSignalMsgReq(conn, &m)
|
||||||
case constant.WSPullMsgBySeqList:
|
case constant.WSPullMsgBySeqList:
|
||||||
|
log.NewInfo(m.OperationID, "pullMsgBySeqListReq ", m.SendID, m.MsgIncr, m.ReqIdentifier)
|
||||||
ws.pullMsgBySeqListReq(conn, &m)
|
ws.pullMsgBySeqListReq(conn, &m)
|
||||||
default:
|
default:
|
||||||
|
log.Error(m.OperationID, "ReqIdentifier failed ", m.SendID, m.MsgIncr, m.ReqIdentifier)
|
||||||
}
|
}
|
||||||
log.NewInfo(m.OperationID, "goroutine num is ", runtime.NumGoroutine())
|
log.NewInfo(m.OperationID, "goroutine num is ", runtime.NumGoroutine())
|
||||||
}
|
}
|
||||||
func (ws *WServer) getSeqReq(conn *UserConn, m *Req) {
|
func (ws *WServer) getSeqReq(conn *UserConn, m *Req) {
|
||||||
log.NewInfo(m.OperationID, "Ws call success to getNewSeq", m.MsgIncr, m.SendID, m.ReqIdentifier, m.Data)
|
log.NewInfo(m.OperationID, "Ws call success to getNewSeq", m.MsgIncr, m.SendID, m.ReqIdentifier)
|
||||||
nReply := new(sdk_ws.GetMaxAndMinSeqResp)
|
nReply := new(sdk_ws.GetMaxAndMinSeqResp)
|
||||||
isPass, errCode, errMsg, data := ws.argsValidate(m, constant.WSGetNewestSeq)
|
isPass, errCode, errMsg, data := ws.argsValidate(m, constant.WSGetNewestSeq)
|
||||||
if isPass {
|
if isPass {
|
||||||
@ -88,7 +83,6 @@ func (ws *WServer) getSeqReq(conn *UserConn, m *Req) {
|
|||||||
nReply.ErrCode = errCode
|
nReply.ErrCode = errCode
|
||||||
nReply.ErrMsg = errMsg
|
nReply.ErrMsg = errMsg
|
||||||
ws.getSeqResp(conn, m, nReply)
|
ws.getSeqResp(conn, m, nReply)
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -189,8 +183,6 @@ func (ws *WServer) sendMsgReq(conn *UserConn, m *Req) {
|
|||||||
|
|
||||||
}
|
}
|
||||||
func (ws *WServer) sendMsgResp(conn *UserConn, m *Req, pb *pbChat.SendMsgResp) {
|
func (ws *WServer) sendMsgResp(conn *UserConn, m *Req, pb *pbChat.SendMsgResp) {
|
||||||
// := make(map[string]interface{})
|
|
||||||
|
|
||||||
var mReplyData sdk_ws.UserSendMsgResp
|
var mReplyData sdk_ws.UserSendMsgResp
|
||||||
mReplyData.ClientMsgID = pb.GetClientMsgID()
|
mReplyData.ClientMsgID = pb.GetClientMsgID()
|
||||||
mReplyData.ServerMsgID = pb.GetServerMsgID()
|
mReplyData.ServerMsgID = pb.GetServerMsgID()
|
||||||
@ -277,14 +269,14 @@ func (ws *WServer) sendMsg(conn *UserConn, mReply interface{}) {
|
|||||||
enc := gob.NewEncoder(&b)
|
enc := gob.NewEncoder(&b)
|
||||||
err := enc.Encode(mReply)
|
err := enc.Encode(mReply)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
uid, platform := ws.getUserUid(conn)
|
// uid, platform := ws.getUserUid(conn)
|
||||||
log.NewError(mReply.(Resp).OperationID, mReply.(Resp).ReqIdentifier, mReply.(Resp).ErrCode, mReply.(Resp).ErrMsg, "Encode Msg error", conn.RemoteAddr().String(), uid, platform, err.Error())
|
log.NewError(mReply.(Resp).OperationID, mReply.(Resp).ReqIdentifier, mReply.(Resp).ErrCode, mReply.(Resp).ErrMsg, "Encode Msg error", conn.RemoteAddr().String(), err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
err = ws.writeMsg(conn, websocket.BinaryMessage, b.Bytes())
|
err = ws.writeMsg(conn, websocket.BinaryMessage, b.Bytes())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
uid, platform := ws.getUserUid(conn)
|
// uid, platform := ws.getUserUid(conn)
|
||||||
log.NewError(mReply.(Resp).OperationID, mReply.(Resp).ReqIdentifier, mReply.(Resp).ErrCode, mReply.(Resp).ErrMsg, "ws writeMsg error", conn.RemoteAddr().String(), uid, platform, err.Error())
|
log.NewError(mReply.(Resp).OperationID, mReply.(Resp).ReqIdentifier, mReply.(Resp).ErrCode, mReply.(Resp).ErrMsg, "ws writeMsg error", conn.RemoteAddr().String(), err.Error())
|
||||||
} else {
|
} else {
|
||||||
log.Debug(mReply.(Resp).OperationID, mReply.(Resp).ReqIdentifier, mReply.(Resp).ErrCode, mReply.(Resp).ErrMsg, "ws write response success")
|
log.Debug(mReply.(Resp).OperationID, mReply.(Resp).ReqIdentifier, mReply.(Resp).ErrCode, mReply.(Resp).ErrMsg, "ws write response success")
|
||||||
}
|
}
|
||||||
|
@ -51,21 +51,27 @@ func (ws *WServer) run() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (ws *WServer) wsHandler(w http.ResponseWriter, r *http.Request) {
|
func (ws *WServer) wsHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
if ws.headerCheck(w, r) {
|
|
||||||
query := r.URL.Query()
|
query := r.URL.Query()
|
||||||
|
operationID := ""
|
||||||
|
if len(query["operationID"]) != 0 {
|
||||||
|
operationID = query["operationID"][0]
|
||||||
|
} else {
|
||||||
|
operationID = utils.OperationIDGenerator()
|
||||||
|
}
|
||||||
|
log.Debug(operationID, utils.GetSelfFuncName(), " args: ", query)
|
||||||
|
if ws.headerCheck(w, r, operationID) {
|
||||||
conn, err := ws.wsUpGrader.Upgrade(w, r, nil) //Conn is obtained through the upgraded escalator
|
conn, err := ws.wsUpGrader.Upgrade(w, r, nil) //Conn is obtained through the upgraded escalator
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("", "upgrade http conn err", err, query)
|
log.Error(operationID, "upgrade http conn err", err.Error(), query)
|
||||||
return
|
return
|
||||||
} else {
|
} else {
|
||||||
//Connection mapping relationship,
|
|
||||||
//userID+" "+platformID->conn
|
|
||||||
//Initialize a lock for each user
|
|
||||||
newConn := &UserConn{conn, new(sync.Mutex), 0}
|
newConn := &UserConn{conn, new(sync.Mutex), 0}
|
||||||
userCount++
|
userCount++
|
||||||
ws.addUserConn(query["sendID"][0], utils.StringToInt(query["platformID"][0]), newConn, query["token"][0])
|
ws.addUserConn(query["sendID"][0], utils.StringToInt(query["platformID"][0]), newConn, query["token"][0], operationID)
|
||||||
go ws.readMsg(newConn)
|
go ws.readMsg(newConn)
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
log.Error(operationID, "headerCheck failed ")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -76,18 +82,13 @@ func (ws *WServer) readMsg(conn *UserConn) {
|
|||||||
log.NewInfo("", "this is a pingMessage")
|
log.NewInfo("", "this is a pingMessage")
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
uid, platform := ws.getUserUid(conn)
|
log.Error("", "WS ReadMsg error ", " userIP", conn.RemoteAddr().String(), "userUid", "platform", "error", err.Error())
|
||||||
log.Error("", "WS ReadMsg error", "userIP", conn.RemoteAddr().String(), "userUid", uid, "platform", platform, "error", err.Error())
|
|
||||||
userCount--
|
userCount--
|
||||||
ws.delUserConn(conn)
|
ws.delUserConn(conn)
|
||||||
return
|
return
|
||||||
} else {
|
|
||||||
//log.ErrorByKv("test", "", "msgType", msgType, "userIP", conn.RemoteAddr().String(), "userUid", ws.getUserUid(conn))
|
|
||||||
}
|
}
|
||||||
ws.msgParse(conn, msg)
|
ws.msgParse(conn, msg)
|
||||||
//ws.writeMsg(conn, 1, chat)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ws *WServer) SetWriteTimeout(conn *UserConn, timeout int) {
|
func (ws *WServer) SetWriteTimeout(conn *UserConn, timeout int) {
|
||||||
@ -115,25 +116,27 @@ func (ws *WServer) MultiTerminalLoginChecker(uid string, platformID int, newConn
|
|||||||
if oldConnMap, ok := ws.wsUserToConn[uid]; ok { // user->map[platform->conn]
|
if oldConnMap, ok := ws.wsUserToConn[uid]; ok { // user->map[platform->conn]
|
||||||
if oldConn, ok := oldConnMap[platformID]; ok {
|
if oldConn, ok := oldConnMap[platformID]; ok {
|
||||||
log.NewDebug(operationID, uid, platformID, "kick old conn")
|
log.NewDebug(operationID, uid, platformID, "kick old conn")
|
||||||
ws.sendKickMsg(oldConn, newConn)
|
// ws.sendKickMsg(oldConn, newConn)
|
||||||
m, err := db.DB.GetTokenMapByUidPid(uid, constant.PlatformIDToName(platformID))
|
m, err := db.DB.GetTokenMapByUidPid(uid, constant.PlatformIDToName(platformID))
|
||||||
if err != nil && err != go_redis.Nil {
|
if err != nil && err != go_redis.Nil {
|
||||||
log.NewError(operationID, "get token from redis err", err.Error(), uid)
|
log.NewError(operationID, "get token from redis err", err.Error(), uid, constant.PlatformIDToName(platformID))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if m == nil {
|
if m == nil {
|
||||||
log.NewError(operationID, "get token from redis err", "m is nil")
|
log.NewError(operationID, "get token from redis err", "m is nil", uid, constant.PlatformIDToName(platformID))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
log.NewDebug(operationID, "get token map is ", m, uid, constant.PlatformIDToName(platformID))
|
||||||
|
|
||||||
for k, _ := range m {
|
for k, _ := range m {
|
||||||
if k != token {
|
if k != token {
|
||||||
m[k] = constant.KickedToken
|
m[k] = constant.KickedToken
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.NewDebug(operationID, "get map is ", m)
|
log.NewDebug(operationID, "set token map is ", m, uid, constant.PlatformIDToName(platformID))
|
||||||
err = db.DB.SetTokenMapByUidPid(uid, platformID, m)
|
err = db.DB.SetTokenMapByUidPid(uid, platformID, m)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.NewError(operationID, "SetTokenMapByUidPid err", err.Error())
|
log.NewError(operationID, "SetTokenMapByUidPid err", err.Error(), uid, platformID, m)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
err = oldConn.Close()
|
err = oldConn.Close()
|
||||||
@ -146,7 +149,6 @@ func (ws *WServer) MultiTerminalLoginChecker(uid string, platformID int, newConn
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
log.NewError(operationID, "conn close err", err.Error(), uid, platformID)
|
log.NewError(operationID, "conn close err", err.Error(), uid, platformID)
|
||||||
}
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
log.NewWarn(operationID, "abnormal uid-conn ", uid, platformID, oldConnMap[platformID])
|
log.NewWarn(operationID, "abnormal uid-conn ", uid, platformID, oldConnMap[platformID])
|
||||||
}
|
}
|
||||||
@ -177,10 +179,11 @@ func (ws *WServer) sendKickMsg(oldConn, newConn *UserConn) {
|
|||||||
log.NewError(mReply.OperationID, mReply.ReqIdentifier, mReply.ErrCode, mReply.ErrMsg, "sendKickMsg WS WriteMsg error", oldConn.RemoteAddr().String(), newConn.RemoteAddr().String(), err.Error())
|
log.NewError(mReply.OperationID, mReply.ReqIdentifier, mReply.ErrCode, mReply.ErrMsg, "sendKickMsg WS WriteMsg error", oldConn.RemoteAddr().String(), newConn.RemoteAddr().String(), err.Error())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
func (ws *WServer) addUserConn(uid string, platformID int, conn *UserConn, token string) {
|
|
||||||
|
func (ws *WServer) addUserConn(uid string, platformID int, conn *UserConn, token string, operationID string) {
|
||||||
rwLock.Lock()
|
rwLock.Lock()
|
||||||
defer rwLock.Unlock()
|
defer rwLock.Unlock()
|
||||||
operationID := utils.OperationIDGenerator()
|
log.Info(operationID, utils.GetSelfFuncName(), " args: ", uid, platformID, conn, token)
|
||||||
callbackResp := callbackUserOnline(operationID, uid, platformID, token)
|
callbackResp := callbackUserOnline(operationID, uid, platformID, token)
|
||||||
if callbackResp.ErrCode != 0 {
|
if callbackResp.ErrCode != 0 {
|
||||||
log.NewError(operationID, utils.GetSelfFuncName(), "callbackUserOnline resp:", callbackResp)
|
log.NewError(operationID, utils.GetSelfFuncName(), "callbackUserOnline resp:", callbackResp)
|
||||||
@ -268,43 +271,39 @@ func (ws *WServer) getUserAllCons(uid string) map[int]*UserConn {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ws *WServer) getUserUid(conn *UserConn) (uid string, platform int) {
|
//func (ws *WServer) getUserUid(conn *UserConn) (uid string, platform int) {
|
||||||
rwLock.RLock()
|
// rwLock.RLock()
|
||||||
defer rwLock.RUnlock()
|
// defer rwLock.RUnlock()
|
||||||
|
//
|
||||||
if stringMap, ok := ws.wsConnToUser[conn]; ok {
|
// if stringMap, ok := ws.wsConnToUser[conn]; ok {
|
||||||
for k, v := range stringMap {
|
// for k, v := range stringMap {
|
||||||
platform = k
|
// platform = k
|
||||||
uid = v
|
// uid = v
|
||||||
}
|
// }
|
||||||
return uid, platform
|
// return uid, platform
|
||||||
}
|
// }
|
||||||
return "", 0
|
// return "", 0
|
||||||
}
|
//}
|
||||||
func (ws *WServer) headerCheck(w http.ResponseWriter, r *http.Request) bool {
|
func (ws *WServer) headerCheck(w http.ResponseWriter, r *http.Request, operationID string) bool {
|
||||||
status := http.StatusUnauthorized
|
status := http.StatusUnauthorized
|
||||||
query := r.URL.Query()
|
query := r.URL.Query()
|
||||||
operationID := ""
|
|
||||||
if len(query["operationID"]) != 0 {
|
|
||||||
operationID = query["operationID"][0]
|
|
||||||
}
|
|
||||||
if len(query["token"]) != 0 && len(query["sendID"]) != 0 && len(query["platformID"]) != 0 {
|
if len(query["token"]) != 0 && len(query["sendID"]) != 0 && len(query["platformID"]) != 0 {
|
||||||
if ok, err, msg := token_verify.WsVerifyToken(query["token"][0], query["sendID"][0], query["platformID"][0], operationID); !ok {
|
if ok, err, msg := token_verify.WsVerifyToken(query["token"][0], query["sendID"][0], query["platformID"][0], operationID); !ok {
|
||||||
// e := err.(*constant.ErrInfo)
|
|
||||||
log.Error(operationID, "Token verify failed ", "query ", query, msg, err.Error())
|
log.Error(operationID, "Token verify failed ", "query ", query, msg, err.Error())
|
||||||
w.Header().Set("Sec-Websocket-Version", "13")
|
w.Header().Set("Sec-Websocket-Version", "13")
|
||||||
w.Header().Set("ws_err_msg", err.Error())
|
w.Header().Set("ws_err_msg", err.Error())
|
||||||
http.Error(w, err.Error(), status)
|
http.Error(w, err.Error(), status)
|
||||||
return false
|
return false
|
||||||
} else {
|
} else {
|
||||||
log.Info(operationID, "Connection Authentication Success", "", "token", query["token"][0], "userID", query["sendID"][0])
|
log.Info(operationID, "Connection Authentication Success", "", "token ", query["token"][0], "userID ", query["sendID"][0], "platformID ", query["platformID"][0])
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
log.Error(operationID, "Args err", "query", query)
|
log.Error(operationID, "Args err ", "query ", query)
|
||||||
w.Header().Set("Sec-Websocket-Version", "13")
|
w.Header().Set("Sec-Websocket-Version", "13")
|
||||||
w.Header().Set("ws_err_msg", "args err, need token, sendID, platformID")
|
errMsg := "args err, need token, sendID, platformID"
|
||||||
http.Error(w, http.StatusText(status), status)
|
w.Header().Set("ws_err_msg", errMsg)
|
||||||
|
http.Error(w, errMsg, status)
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -176,7 +176,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S
|
|||||||
return returnMsg(&replay, pb, errCode, errMsg, "", 0)
|
return returnMsg(&replay, pb, errCode, errMsg, "", 0)
|
||||||
}
|
}
|
||||||
rpc.encapsulateMsgData(pb.MsgData)
|
rpc.encapsulateMsgData(pb.MsgData)
|
||||||
log.Info("", "this is a test MsgData ", pb.MsgData)
|
log.Info(pb.OperationID, "this is a test MsgData ", pb.MsgData)
|
||||||
msgToMQSingle := pbChat.MsgDataToMQ{Token: pb.Token, OperationID: pb.OperationID, MsgData: pb.MsgData}
|
msgToMQSingle := pbChat.MsgDataToMQ{Token: pb.Token, OperationID: pb.OperationID, MsgData: pb.MsgData}
|
||||||
|
|
||||||
// callback
|
// callback
|
||||||
|
@ -18,6 +18,7 @@ func NewKafkaProducer(addr []string, topic string) *Producer {
|
|||||||
p := Producer{}
|
p := Producer{}
|
||||||
p.config = sarama.NewConfig() //Instantiate a sarama Config
|
p.config = sarama.NewConfig() //Instantiate a sarama Config
|
||||||
p.config.Producer.Return.Successes = true //Whether to enable the successes channel to be notified after the message is sent successfully
|
p.config.Producer.Return.Successes = true //Whether to enable the successes channel to be notified after the message is sent successfully
|
||||||
|
p.config.Producer.Return.Errors = true
|
||||||
p.config.Producer.RequiredAcks = sarama.WaitForAll //Set producer Message Reply level 0 1 all
|
p.config.Producer.RequiredAcks = sarama.WaitForAll //Set producer Message Reply level 0 1 all
|
||||||
p.config.Producer.Partitioner = sarama.NewHashPartitioner //Set the hash-key automatic hash partition. When sending a message, you must specify the key value of the message. If there is no key, the partition will be selected randomly
|
p.config.Producer.Partitioner = sarama.NewHashPartitioner //Set the hash-key automatic hash partition. When sending a message, you must specify the key value of the message. If there is no key, the partition will be selected randomly
|
||||||
|
|
||||||
@ -48,6 +49,10 @@ func (p *Producer) SendMessage(m proto.Message, key string, operationID string)
|
|||||||
}
|
}
|
||||||
kMsg.Value = sarama.ByteEncoder(bMsg)
|
kMsg.Value = sarama.ByteEncoder(bMsg)
|
||||||
log2.Info(operationID, "ByteEncoder SendMessage begin", "key ", kMsg, p.producer)
|
log2.Info(operationID, "ByteEncoder SendMessage begin", "key ", kMsg, p.producer)
|
||||||
|
if kMsg.Key.Length() == 0 || kMsg.Value.Length() == 0 {
|
||||||
|
log2.Error(operationID, "kMsg.Key.Length() == 0 || kMsg.Value.Length() == 0 ", kMsg)
|
||||||
|
return -1, -1, errors.New("key or value == 0")
|
||||||
|
}
|
||||||
a, b, c := p.producer.SendMessage(kMsg)
|
a, b, c := p.producer.SendMessage(kMsg)
|
||||||
log2.Info(operationID, "ByteEncoder SendMessage end", "key ", kMsg, p.producer)
|
log2.Info(operationID, "ByteEncoder SendMessage end", "key ", kMsg, p.producer)
|
||||||
return a, b, c
|
return a, b, c
|
||||||
|
@ -245,15 +245,19 @@ func VerifyToken(token, uid string) (bool, error) {
|
|||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
func WsVerifyToken(token, uid string, platformID string, operationID string) (bool, error, string) {
|
func WsVerifyToken(token, uid string, platformID string, operationID string) (bool, error, string) {
|
||||||
|
argMsg := "token: " + token + " operationID: " + operationID + " userID: " + uid + " platformID: " + platformID
|
||||||
claims, err := ParseToken(token, operationID)
|
claims, err := ParseToken(token, operationID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, utils.Wrap(err, "parse token err"), "parse token err"
|
errMsg := "parse token err " + argMsg
|
||||||
|
return false, utils.Wrap(err, errMsg), errMsg
|
||||||
}
|
}
|
||||||
if claims.UID != uid {
|
if claims.UID != uid {
|
||||||
return false, utils.Wrap(&constant.ErrTokenUnknown, "uid is not same to token uid"), "uid is not same to token uid"
|
errMsg := " uid is not same to token uid " + " claims.UID " + claims.UID + argMsg
|
||||||
|
return false, utils.Wrap(&constant.ErrTokenUnknown, errMsg), errMsg
|
||||||
}
|
}
|
||||||
if claims.Platform != constant.PlatformIDToName(utils.StringToInt(platformID)) {
|
if claims.Platform != constant.PlatformIDToName(utils.StringToInt(platformID)) {
|
||||||
return false, utils.Wrap(&constant.ErrTokenUnknown, "platform is not same to token platform"), "platform is not same to token platform"
|
errMsg := " platform is not same to token platform " + argMsg + "claims platformID " + claims.Platform
|
||||||
|
return false, utils.Wrap(&constant.ErrTokenUnknown, errMsg), errMsg
|
||||||
}
|
}
|
||||||
log.NewDebug(operationID, utils.GetSelfFuncName(), " check ok ", claims.UID, uid, claims.Platform)
|
log.NewDebug(operationID, utils.GetSelfFuncName(), " check ok ", claims.UID, uid, claims.Platform)
|
||||||
return true, nil, ""
|
return true, nil, ""
|
||||||
|
Loading…
x
Reference in New Issue
Block a user