diff --git a/go.mod b/go.mod index 6df88e549..e34d69ab0 100644 --- a/go.mod +++ b/go.mod @@ -32,7 +32,6 @@ require ( github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 github.com/olivere/elastic/v7 v7.0.23 github.com/pkg/errors v0.9.1 - github.com/prometheus/client_golang v1.11.1 github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5 github.com/sirupsen/logrus v1.8.1 github.com/stretchr/testify v1.7.0 @@ -51,3 +50,5 @@ require ( gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b ) + +replace github.com/Shopify/sarama => github.com/Shopify/sarama v1.29.0 diff --git a/internal/msg_gateway/gate/logic.go b/internal/msg_gateway/gate/logic.go index e12062338..95b6fd85c 100644 --- a/internal/msg_gateway/gate/logic.go +++ b/internal/msg_gateway/gate/logic.go @@ -19,19 +19,12 @@ import ( ) func (ws *WServer) msgParse(conn *UserConn, binaryMsg []byte) { - //ws online debug data - //{"ReqIdentifier":1001,"Token":"123","SendID":"c4ca4238a0b923820dcc509a6f75849b","Time":"123","OperationID":"123","MsgIncr":0} - //{"ReqIdentifier":1002,"Token":"123","SendID":"c4ca4238a0b923820dcc509a6f75849b","Time":"123","OperationID":"123","MsgIncr":0,"SeqBegin":1,"SeqEnd":6} - //{"ReqIdentifier":1003,"Token":"123","SendID":"c4ca4238a0b923820dcc509a6f75849b", - //"RecvID":"a87ff679a2f3e71d9181a67b7542122c","ClientMsgID":"2343","Time":"147878787","OperationID": - //"123","MsgIncr":0,"SubMsgType":101,"MsgType":100,"MsgFrom":1,"Content":"sdfsdf"} b := bytes.NewBuffer(binaryMsg) m := Req{} dec := gob.NewDecoder(b) err := dec.Decode(&m) if err != nil { log.NewError("", "ws Decode err", err.Error()) - ws.sendErrMsg(conn, 200, err.Error(), constant.WSDataError, "", "") err = conn.Close() if err != nil { log.NewError("", "ws close err", err.Error()) @@ -43,29 +36,32 @@ func (ws *WServer) msgParse(conn *UserConn, binaryMsg []byte) { ws.sendErrMsg(conn, 201, err.Error(), m.ReqIdentifier, m.MsgIncr, m.OperationID) return } - //if !utils.VerifyToken(m.Token, m.SendID) { - // ws.sendErrMsg(conn, 202, "token validate err", m.ReqIdentifier, m.MsgIncr,m.OperationID) - // return - //} - log.NewInfo(m.OperationID, "Basic Info Authentication Success", m) + log.NewInfo(m.OperationID, "Basic Info Authentication Success", m.SendID, m.MsgIncr, m.ReqIdentifier) switch m.ReqIdentifier { case constant.WSGetNewestSeq: + log.NewInfo(m.OperationID, "getSeqReq ", m.SendID, m.MsgIncr, m.ReqIdentifier) ws.getSeqReq(conn, &m) case constant.WSSendMsg: + log.NewInfo(m.OperationID, "sendMsgReq ", m.SendID, m.MsgIncr, m.ReqIdentifier) ws.sendMsgReq(conn, &m) case constant.WSSendSignalMsg: + log.NewInfo(m.OperationID, "sendSignalMsgReq ", m.SendID, m.MsgIncr, m.ReqIdentifier) ws.sendSignalMsgReq(conn, &m) case constant.WSPullMsgBySeqList: + log.NewInfo(m.OperationID, "pullMsgBySeqListReq ", m.SendID, m.MsgIncr, m.ReqIdentifier) ws.pullMsgBySeqListReq(conn, &m) default: + log.Error(m.OperationID, "ReqIdentifier failed ", m.SendID, m.MsgIncr, m.ReqIdentifier) } log.NewInfo(m.OperationID, "goroutine num is ", runtime.NumGoroutine()) } + func (ws *WServer) getSeqReq(conn *UserConn, m *Req) { - log.NewInfo(m.OperationID, "Ws call success to getNewSeq", m.MsgIncr, m.SendID, m.ReqIdentifier, m.Data) + log.NewInfo(m.OperationID, "Ws call success to getNewSeq", m.MsgIncr, m.SendID, m.ReqIdentifier) nReply := new(sdk_ws.GetMaxAndMinSeqResp) - isPass, errCode, errMsg, data := ws.argsValidate(m, constant.WSGetNewestSeq) + isPass, errCode, errMsg, data := ws.argsValidate(m, constant.WSGetNewestSeq, m.OperationID) + log.Info(m.OperationID, "argsValidate ", isPass, errCode, errMsg) if isPass { rpcReq := sdk_ws.GetMaxAndMinSeqReq{} rpcReq.GroupIDList = data.(sdk_ws.GetMaxAndMinSeqReq).GroupIDList @@ -76,9 +72,9 @@ func (ws *WServer) getSeqReq(conn *UserConn, m *Req) { msgClient := pbChat.NewChatClient(grpcConn) rpcReply, err := msgClient.GetMaxAndMinSeq(context.Background(), &rpcReq) if err != nil { - log.Error(rpcReq.OperationID, "rpc call failed to getSeqReq", err.Error(), rpcReq.String()) nReply.ErrCode = 500 nReply.ErrMsg = err.Error() + log.Error(rpcReq.OperationID, "rpc call failed to GetMaxAndMinSeq ", nReply.String()) ws.getSeqResp(conn, m, nReply) } else { log.NewInfo(rpcReq.OperationID, "rpc call success to getSeqReq", rpcReply.String()) @@ -87,13 +83,13 @@ func (ws *WServer) getSeqReq(conn *UserConn, m *Req) { } else { nReply.ErrCode = errCode nReply.ErrMsg = errMsg + log.Error(m.OperationID, "argsValidate failed send resp: ", nReply.String()) ws.getSeqResp(conn, m, nReply) - } } func (ws *WServer) getSeqResp(conn *UserConn, m *Req, pb *sdk_ws.GetMaxAndMinSeqResp) { - log.Debug(m.OperationID, "getSeqResp come here ", pb.String()) + b, _ := proto.Marshal(pb) mReply := Resp{ ReqIdentifier: m.ReqIdentifier, @@ -103,13 +99,15 @@ func (ws *WServer) getSeqResp(conn *UserConn, m *Req, pb *sdk_ws.GetMaxAndMinSeq OperationID: m.OperationID, Data: b, } + log.Debug(m.OperationID, "getSeqResp come here req: ", pb.String(), "send resp: ", + mReply.ReqIdentifier, mReply.MsgIncr, mReply.ErrCode, mReply.ErrMsg) ws.sendMsg(conn, mReply) } func (ws *WServer) pullMsgBySeqListReq(conn *UserConn, m *Req) { log.NewInfo(m.OperationID, "Ws call success to pullMsgBySeqListReq start", m.SendID, m.ReqIdentifier, m.MsgIncr, string(m.Data)) nReply := new(sdk_ws.PullMessageBySeqListResp) - isPass, errCode, errMsg, data := ws.argsValidate(m, constant.WSPullMsgBySeqList) + isPass, errCode, errMsg, data := ws.argsValidate(m, constant.WSPullMsgBySeqList, m.OperationID) if isPass { rpcReq := sdk_ws.PullMessageBySeqListReq{} rpcReq.SeqList = data.(sdk_ws.PullMessageBySeqListReq).SeqList @@ -159,7 +157,7 @@ func (ws *WServer) sendMsgReq(conn *UserConn, m *Req) { log.NewInfo(m.OperationID, "Ws call success to sendMsgReq start", m.MsgIncr, m.ReqIdentifier, m.SendID, m.Data) nReply := new(pbChat.SendMsgResp) - isPass, errCode, errMsg, pData := ws.argsValidate(m, constant.WSSendMsg) + isPass, errCode, errMsg, pData := ws.argsValidate(m, constant.WSSendMsg, m.OperationID) if isPass { data := pData.(sdk_ws.MsgData) pbData := pbChat.SendMsgReq{ @@ -189,8 +187,6 @@ func (ws *WServer) sendMsgReq(conn *UserConn, m *Req) { } func (ws *WServer) sendMsgResp(conn *UserConn, m *Req, pb *pbChat.SendMsgResp) { - // := make(map[string]interface{}) - var mReplyData sdk_ws.UserSendMsgResp mReplyData.ClientMsgID = pb.GetClientMsgID() mReplyData.ServerMsgID = pb.GetServerMsgID() @@ -210,7 +206,7 @@ func (ws *WServer) sendMsgResp(conn *UserConn, m *Req, pb *pbChat.SendMsgResp) { func (ws *WServer) sendSignalMsgReq(conn *UserConn, m *Req) { log.NewInfo(m.OperationID, "Ws call success to sendSignalMsgReq start", m.MsgIncr, m.ReqIdentifier, m.SendID, string(m.Data)) nReply := new(pbChat.SendMsgResp) - isPass, errCode, errMsg, pData := ws.argsValidate(m, constant.WSSendSignalMsg) + isPass, errCode, errMsg, pData := ws.argsValidate(m, constant.WSSendSignalMsg, m.OperationID) if isPass { signalResp := pbRtc.SignalResp{} etcdConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImRealTimeCommName) @@ -260,7 +256,7 @@ func (ws *WServer) sendSignalMsgReq(conn *UserConn, m *Req) { } func (ws *WServer) sendSignalMsgResp(conn *UserConn, errCode int32, errMsg string, m *Req, pb *pbRtc.SignalResp) { // := make(map[string]interface{}) - log.Debug(m.OperationID, "SignalMsgResp is", pb.String()) + log.Debug(m.OperationID, "sendSignalMsgResp is", pb.String()) b, _ := proto.Marshal(pb) mReply := Resp{ ReqIdentifier: m.ReqIdentifier, @@ -277,14 +273,14 @@ func (ws *WServer) sendMsg(conn *UserConn, mReply interface{}) { enc := gob.NewEncoder(&b) err := enc.Encode(mReply) if err != nil { - uid, platform := ws.getUserUid(conn) - log.NewError(mReply.(Resp).OperationID, mReply.(Resp).ReqIdentifier, mReply.(Resp).ErrCode, mReply.(Resp).ErrMsg, "Encode Msg error", conn.RemoteAddr().String(), uid, platform, err.Error()) + // uid, platform := ws.getUserUid(conn) + log.NewError(mReply.(Resp).OperationID, mReply.(Resp).ReqIdentifier, mReply.(Resp).ErrCode, mReply.(Resp).ErrMsg, "Encode Msg error", conn.RemoteAddr().String(), err.Error()) return } err = ws.writeMsg(conn, websocket.BinaryMessage, b.Bytes()) if err != nil { - uid, platform := ws.getUserUid(conn) - log.NewError(mReply.(Resp).OperationID, mReply.(Resp).ReqIdentifier, mReply.(Resp).ErrCode, mReply.(Resp).ErrMsg, "ws writeMsg error", conn.RemoteAddr().String(), uid, platform, err.Error()) + // uid, platform := ws.getUserUid(conn) + log.NewError(mReply.(Resp).OperationID, mReply.(Resp).ReqIdentifier, mReply.(Resp).ErrCode, mReply.(Resp).ErrMsg, "ws writeMsg error", conn.RemoteAddr().String(), err.Error()) } else { log.Debug(mReply.(Resp).OperationID, mReply.(Resp).ReqIdentifier, mReply.(Resp).ErrCode, mReply.(Resp).ErrMsg, "ws write response success") } diff --git a/internal/msg_gateway/gate/validate.go b/internal/msg_gateway/gate/validate.go index 0f4950728..9a5558528 100644 --- a/internal/msg_gateway/gate/validate.go +++ b/internal/msg_gateway/gate/validate.go @@ -57,16 +57,16 @@ type SeqListData struct { SeqList []int64 `mapstructure:"seqList" validate:"required"` } -func (ws *WServer) argsValidate(m *Req, r int32) (isPass bool, errCode int32, errMsg string, returnData interface{}) { +func (ws *WServer) argsValidate(m *Req, r int32, operationID string) (isPass bool, errCode int32, errMsg string, returnData interface{}) { switch r { case constant.WSGetNewestSeq: data := open_im_sdk.GetMaxAndMinSeqReq{} if err := proto.Unmarshal(m.Data, &data); err != nil { - log.Error("", "Decode Data struct err", err.Error(), r) + log.Error(operationID, "Decode Data struct err", err.Error(), r) return false, 203, err.Error(), nil } if err := validate.Struct(data); err != nil { - log.Error("", "data args validate err", err.Error(), r) + log.Error(operationID, "data args validate err", err.Error(), r) return false, 204, err.Error(), nil } @@ -74,11 +74,11 @@ func (ws *WServer) argsValidate(m *Req, r int32) (isPass bool, errCode int32, er case constant.WSSendMsg: data := open_im_sdk.MsgData{} if err := proto.Unmarshal(m.Data, &data); err != nil { - log.Error("", "Decode Data struct err", err.Error(), r) + log.Error(operationID, "Decode Data struct err", err.Error(), r) return false, 203, err.Error(), nil } if err := validate.Struct(data); err != nil { - log.Error("", "data args validate err", err.Error(), r) + log.Error(operationID, "data args validate err", err.Error(), r) return false, 204, err.Error(), nil } @@ -86,11 +86,11 @@ func (ws *WServer) argsValidate(m *Req, r int32) (isPass bool, errCode int32, er case constant.WSSendSignalMsg: data := pbRtc.SignalReq{} if err := proto.Unmarshal(m.Data, &data); err != nil { - log.Error("", "Decode Data struct err", err.Error(), r) + log.Error(operationID, "Decode Data struct err", err.Error(), r) return false, 203, err.Error(), nil } if err := validate.Struct(data); err != nil { - log.Error("", "data args validate err", err.Error(), r) + log.Error(operationID, "data args validate err", err.Error(), r) return false, 204, err.Error(), nil } @@ -98,31 +98,16 @@ func (ws *WServer) argsValidate(m *Req, r int32) (isPass bool, errCode int32, er case constant.WSPullMsgBySeqList: data := open_im_sdk.PullMessageBySeqListReq{} if err := proto.Unmarshal(m.Data, &data); err != nil { - log.Error("", "Decode Data struct err", err.Error(), r) + log.Error(operationID, "Decode Data struct err", err.Error(), r) return false, 203, err.Error(), nil } if err := validate.Struct(data); err != nil { - log.Error("", "data args validate err", err.Error(), r) + log.Error(operationID, "data args validate err", err.Error(), r) return false, 204, err.Error(), nil } return true, 0, "", data - default: } - return false, 204, "args err", nil - - //b := bytes.NewBuffer(m.Data) - //dec := gob.NewDecoder(b) - //err := dec.Decode(&data) - //if err != nil { - // log.ErrorByKv("Decode Data struct err", "", "err", err.Error(), "reqIdentifier", r) - // return false, 203, err.Error(), nil - //} - //if err := mapstructure.WeakDecode(m.Data, &data); err != nil { - // log.ErrorByKv("map to Data struct err", "", "err", err.Error(), "reqIdentifier", r) - // return false, 203, err.Error(), nil - //} else - } diff --git a/internal/msg_gateway/gate/ws_server.go b/internal/msg_gateway/gate/ws_server.go index 54ea7138e..223270197 100644 --- a/internal/msg_gateway/gate/ws_server.go +++ b/internal/msg_gateway/gate/ws_server.go @@ -51,21 +51,27 @@ func (ws *WServer) run() { } func (ws *WServer) wsHandler(w http.ResponseWriter, r *http.Request) { - if ws.headerCheck(w, r) { - query := r.URL.Query() + query := r.URL.Query() + operationID := "" + if len(query["operationID"]) != 0 { + operationID = query["operationID"][0] + } else { + operationID = utils.OperationIDGenerator() + } + log.Debug(operationID, utils.GetSelfFuncName(), " args: ", query) + if ws.headerCheck(w, r, operationID) { conn, err := ws.wsUpGrader.Upgrade(w, r, nil) //Conn is obtained through the upgraded escalator if err != nil { - log.Error("", "upgrade http conn err", err, query) + log.Error(operationID, "upgrade http conn err", err.Error(), query) return } else { - //Connection mapping relationship, - //userID+" "+platformID->conn - //Initialize a lock for each user newConn := &UserConn{conn, new(sync.Mutex), 0} userCount++ - ws.addUserConn(query["sendID"][0], utils.StringToInt(query["platformID"][0]), newConn, query["token"][0]) + ws.addUserConn(query["sendID"][0], utils.StringToInt(query["platformID"][0]), newConn, query["token"][0], operationID) go ws.readMsg(newConn) } + } else { + log.Error(operationID, "headerCheck failed ") } } @@ -76,18 +82,13 @@ func (ws *WServer) readMsg(conn *UserConn) { log.NewInfo("", "this is a pingMessage") } if err != nil { - uid, platform := ws.getUserUid(conn) - log.Error("", "WS ReadMsg error", "userIP", conn.RemoteAddr().String(), "userUid", uid, "platform", platform, "error", err.Error()) + log.Error("", "WS ReadMsg error ", " userIP", conn.RemoteAddr().String(), "userUid", "platform", "error", err.Error()) userCount-- ws.delUserConn(conn) return - } else { - //log.ErrorByKv("test", "", "msgType", msgType, "userIP", conn.RemoteAddr().String(), "userUid", ws.getUserUid(conn)) } ws.msgParse(conn, msg) - //ws.writeMsg(conn, 1, chat) } - } func (ws *WServer) SetWriteTimeout(conn *UserConn, timeout int) { @@ -115,25 +116,27 @@ func (ws *WServer) MultiTerminalLoginChecker(uid string, platformID int, newConn if oldConnMap, ok := ws.wsUserToConn[uid]; ok { // user->map[platform->conn] if oldConn, ok := oldConnMap[platformID]; ok { log.NewDebug(operationID, uid, platformID, "kick old conn") - ws.sendKickMsg(oldConn, newConn) + // ws.sendKickMsg(oldConn, newConn) m, err := db.DB.GetTokenMapByUidPid(uid, constant.PlatformIDToName(platformID)) if err != nil && err != go_redis.Nil { - log.NewError(operationID, "get token from redis err", err.Error(), uid) + log.NewError(operationID, "get token from redis err", err.Error(), uid, constant.PlatformIDToName(platformID)) return } if m == nil { - log.NewError(operationID, "get token from redis err", "m is nil") + log.NewError(operationID, "get token from redis err", "m is nil", uid, constant.PlatformIDToName(platformID)) return } + log.NewDebug(operationID, "get token map is ", m, uid, constant.PlatformIDToName(platformID)) + for k, _ := range m { if k != token { m[k] = constant.KickedToken } } - log.NewDebug(operationID, "get map is ", m) + log.NewDebug(operationID, "set token map is ", m, uid, constant.PlatformIDToName(platformID)) err = db.DB.SetTokenMapByUidPid(uid, platformID, m) if err != nil { - log.NewError(operationID, "SetTokenMapByUidPid err", err.Error()) + log.NewError(operationID, "SetTokenMapByUidPid err", err.Error(), uid, platformID, m) return } err = oldConn.Close() @@ -146,7 +149,6 @@ func (ws *WServer) MultiTerminalLoginChecker(uid string, platformID int, newConn if err != nil { log.NewError(operationID, "conn close err", err.Error(), uid, platformID) } - } else { log.NewWarn(operationID, "abnormal uid-conn ", uid, platformID, oldConnMap[platformID]) } @@ -177,10 +179,11 @@ func (ws *WServer) sendKickMsg(oldConn, newConn *UserConn) { log.NewError(mReply.OperationID, mReply.ReqIdentifier, mReply.ErrCode, mReply.ErrMsg, "sendKickMsg WS WriteMsg error", oldConn.RemoteAddr().String(), newConn.RemoteAddr().String(), err.Error()) } } -func (ws *WServer) addUserConn(uid string, platformID int, conn *UserConn, token string) { + +func (ws *WServer) addUserConn(uid string, platformID int, conn *UserConn, token string, operationID string) { rwLock.Lock() defer rwLock.Unlock() - operationID := utils.OperationIDGenerator() + log.Info(operationID, utils.GetSelfFuncName(), " args: ", uid, platformID, conn, token) callbackResp := callbackUserOnline(operationID, uid, platformID, token) if callbackResp.ErrCode != 0 { log.NewError(operationID, utils.GetSelfFuncName(), "callbackUserOnline resp:", callbackResp) @@ -268,43 +271,39 @@ func (ws *WServer) getUserAllCons(uid string) map[int]*UserConn { return nil } -func (ws *WServer) getUserUid(conn *UserConn) (uid string, platform int) { - rwLock.RLock() - defer rwLock.RUnlock() - - if stringMap, ok := ws.wsConnToUser[conn]; ok { - for k, v := range stringMap { - platform = k - uid = v - } - return uid, platform - } - return "", 0 -} -func (ws *WServer) headerCheck(w http.ResponseWriter, r *http.Request) bool { +//func (ws *WServer) getUserUid(conn *UserConn) (uid string, platform int) { +// rwLock.RLock() +// defer rwLock.RUnlock() +// +// if stringMap, ok := ws.wsConnToUser[conn]; ok { +// for k, v := range stringMap { +// platform = k +// uid = v +// } +// return uid, platform +// } +// return "", 0 +//} +func (ws *WServer) headerCheck(w http.ResponseWriter, r *http.Request, operationID string) bool { status := http.StatusUnauthorized query := r.URL.Query() - operationID := "" - if len(query["operationID"]) != 0 { - operationID = query["operationID"][0] - } if len(query["token"]) != 0 && len(query["sendID"]) != 0 && len(query["platformID"]) != 0 { if ok, err, msg := token_verify.WsVerifyToken(query["token"][0], query["sendID"][0], query["platformID"][0], operationID); !ok { - // e := err.(*constant.ErrInfo) log.Error(operationID, "Token verify failed ", "query ", query, msg, err.Error()) w.Header().Set("Sec-Websocket-Version", "13") w.Header().Set("ws_err_msg", err.Error()) http.Error(w, err.Error(), status) return false } else { - log.Info(operationID, "Connection Authentication Success", "", "token", query["token"][0], "userID", query["sendID"][0]) + log.Info(operationID, "Connection Authentication Success", "", "token ", query["token"][0], "userID ", query["sendID"][0], "platformID ", query["platformID"][0]) return true } } else { - log.Error(operationID, "Args err", "query", query) + log.Error(operationID, "Args err ", "query ", query) w.Header().Set("Sec-Websocket-Version", "13") - w.Header().Set("ws_err_msg", "args err, need token, sendID, platformID") - http.Error(w, http.StatusText(status), status) + errMsg := "args err, need token, sendID, platformID" + w.Header().Set("ws_err_msg", errMsg) + http.Error(w, errMsg, status) return false } } diff --git a/internal/rpc/msg/send_msg.go b/internal/rpc/msg/send_msg.go index fffbf6789..3dc98c3a7 100644 --- a/internal/rpc/msg/send_msg.go +++ b/internal/rpc/msg/send_msg.go @@ -160,29 +160,29 @@ func (rpc *rpcChat) encapsulateMsgData(msg *sdk_ws.MsgData) { utils.SetSwitchFromOptions(msg.Options, constant.IsSenderConversationUpdate, false) utils.SetSwitchFromOptions(msg.Options, constant.IsUnreadCount, false) utils.SetSwitchFromOptions(msg.Options, constant.IsOfflinePush, false) - } } func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.SendMsgResp, error) { replay := pbChat.SendMsgResp{} newTime := db.GetCurrentTimestampByMill() - log.NewWarn(pb.OperationID, "rpc sendMsg come here", pb.String(), pb.MsgData.ClientMsgID) + log.Info(pb.OperationID, "rpc sendMsg come here ", pb.String()) flag, errCode, errMsg := isMessageHasReadEnabled(pb) + log.Info(pb.OperationID, "isMessageHasReadEnabled ", flag) if !flag { return returnMsg(&replay, pb, errCode, errMsg, "", 0) } flag, errCode, errMsg = userRelationshipVerification(pb) + log.Info(pb.OperationID, "userRelationshipVerification ", flag) if !flag { return returnMsg(&replay, pb, errCode, errMsg, "", 0) } rpc.encapsulateMsgData(pb.MsgData) - log.Info("", "this is a test MsgData ", pb.MsgData) msgToMQSingle := pbChat.MsgDataToMQ{Token: pb.Token, OperationID: pb.OperationID, MsgData: pb.MsgData} - // callback callbackResp := callbackWordFilter(pb) + log.Info(pb.OperationID, "callbackWordFilter ", callbackResp) if callbackResp.ErrCode != 0 { - log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackWordFilter resp: ", callbackResp) + log.Error(pb.OperationID, utils.GetSelfFuncName(), "callbackWordFilter resp: ", callbackResp) } log.NewDebug(pb.OperationID, utils.GetSelfFuncName(), "callbackResp: ", callbackResp) if callbackResp.ActionCode != constant.ActionAllow { @@ -212,7 +212,7 @@ func (rpc *rpcChat) SendMsg(_ context.Context, pb *pbChat.SendMsgReq) (*pbChat.S log.NewInfo(msgToMQSingle.OperationID, msgToMQSingle) err1 := rpc.sendMsgToKafka(&msgToMQSingle, msgToMQSingle.MsgData.RecvID, constant.OnlineStatus) if err1 != nil { - log.NewError(msgToMQSingle.OperationID, "kafka send msg err:RecvID", msgToMQSingle.MsgData.RecvID, msgToMQSingle.String()) + log.NewError(msgToMQSingle.OperationID, "kafka send msg err :RecvID", msgToMQSingle.MsgData.RecvID, msgToMQSingle.String(), err1.Error()) return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0) } } diff --git a/pkg/common/db/newRedisModel.go b/pkg/common/db/newRedisModel.go index 9c279b69f..da5ec2127 100644 --- a/pkg/common/db/newRedisModel.go +++ b/pkg/common/db/newRedisModel.go @@ -155,6 +155,7 @@ func (d *DataBases) GetMessageListBySeq(userID string, seqList []uint32, operati } func (d *DataBases) SetMessageToCache(msgList []*pbChat.MsgDataToMQ, uid string, operationID string) error { ctx := context.Background() + pipe := d.rdb.Pipeline() var failedList []pbChat.MsgDataToMQ for _, msg := range msgList { key := messageCache + uid + "_" + strconv.Itoa(int(msg.MsgData.Seq)) @@ -164,7 +165,7 @@ func (d *DataBases) SetMessageToCache(msgList []*pbChat.MsgDataToMQ, uid string, continue } log2.NewDebug(operationID, "convert string is ", s) - err = d.rdb.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err() + err = pipe.Set(ctx, key, s, time.Duration(config.Config.MsgCacheTimeout)*time.Second).Err() //err = d.rdb.HMSet(context.Background(), "12", map[string]interface{}{"1": 2, "343": false}).Err() if err != nil { log2.NewWarn(operationID, utils.GetSelfFuncName(), "redis failed", "args:", key, *msg, uid, s, err.Error()) @@ -174,7 +175,8 @@ func (d *DataBases) SetMessageToCache(msgList []*pbChat.MsgDataToMQ, uid string, if len(failedList) != 0 { return errors.New(fmt.Sprintf("set msg to cache failed, failed lists: %q,%s", failedList, operationID)) } - return nil + _, err := pipe.Exec(ctx) + return err } func (d *DataBases) CleanUpOneUserAllMsgFromRedis(userID string, operationID string) error { diff --git a/pkg/common/kafka/producer.go b/pkg/common/kafka/producer.go index 3d8ca02f5..1ebaaab10 100644 --- a/pkg/common/kafka/producer.go +++ b/pkg/common/kafka/producer.go @@ -2,6 +2,7 @@ package kafka import ( log2 "Open_IM/pkg/common/log" + "Open_IM/pkg/utils" "errors" "github.com/Shopify/sarama" "github.com/golang/protobuf/proto" @@ -16,8 +17,9 @@ type Producer struct { func NewKafkaProducer(addr []string, topic string) *Producer { p := Producer{} - p.config = sarama.NewConfig() //Instantiate a sarama Config - p.config.Producer.Return.Successes = true //Whether to enable the successes channel to be notified after the message is sent successfully + p.config = sarama.NewConfig() //Instantiate a sarama Config + p.config.Producer.Return.Successes = true //Whether to enable the successes channel to be notified after the message is sent successfully + p.config.Producer.Return.Errors = true p.config.Producer.RequiredAcks = sarama.WaitForAll //Set producer Message Reply level 0 1 all p.config.Producer.Partitioner = sarama.NewHashPartitioner //Set the hash-key automatic hash partition. When sending a message, you must specify the key value of the message. If there is no key, the partition will be selected randomly @@ -44,11 +46,16 @@ func (p *Producer) SendMessage(m proto.Message, key string, operationID string) return -1, -1, err } if len(bMsg) == 0 { - return 0, 0, errors.New("msg content is nil") + log2.Error(operationID, "len(bMsg) == 0 ") + return 0, 0, errors.New("len(bMsg) == 0 ") } kMsg.Value = sarama.ByteEncoder(bMsg) - log2.Info(operationID, "ByteEncoder SendMessage begin", "key ", kMsg, p.producer) + log2.Info(operationID, "ByteEncoder SendMessage begin", "key ", kMsg, p.producer, "len: ", kMsg.Key.Length(), kMsg.Value.Length()) + if kMsg.Key.Length() == 0 || kMsg.Value.Length() == 0 { + log2.Error(operationID, "kMsg.Key.Length() == 0 || kMsg.Value.Length() == 0 ", kMsg) + return -1, -1, errors.New("key or value == 0") + } a, b, c := p.producer.SendMessage(kMsg) - log2.Info(operationID, "ByteEncoder SendMessage end", "key ", kMsg, p.producer) - return a, b, c + log2.Info(operationID, "ByteEncoder SendMessage end", "key ", kMsg.Key.Length(), kMsg.Value.Length(), p.producer) + return a, b, utils.Wrap(c, "") } diff --git a/pkg/common/token_verify/jwt_token.go b/pkg/common/token_verify/jwt_token.go index aed09190f..54a1b4274 100644 --- a/pkg/common/token_verify/jwt_token.go +++ b/pkg/common/token_verify/jwt_token.go @@ -245,15 +245,19 @@ func VerifyToken(token, uid string) (bool, error) { return true, nil } func WsVerifyToken(token, uid string, platformID string, operationID string) (bool, error, string) { + argMsg := "token: " + token + " operationID: " + operationID + " userID: " + uid + " platformID: " + platformID claims, err := ParseToken(token, operationID) if err != nil { - return false, utils.Wrap(err, "parse token err"), "parse token err" + errMsg := "parse token err " + argMsg + return false, utils.Wrap(err, errMsg), errMsg } if claims.UID != uid { - return false, utils.Wrap(&constant.ErrTokenUnknown, "uid is not same to token uid"), "uid is not same to token uid" + errMsg := " uid is not same to token uid " + " claims.UID " + claims.UID + argMsg + return false, utils.Wrap(&constant.ErrTokenUnknown, errMsg), errMsg } if claims.Platform != constant.PlatformIDToName(utils.StringToInt(platformID)) { - return false, utils.Wrap(&constant.ErrTokenUnknown, "platform is not same to token platform"), "platform is not same to token platform" + errMsg := " platform is not same to token platform " + argMsg + "claims platformID " + claims.Platform + return false, utils.Wrap(&constant.ErrTokenUnknown, errMsg), errMsg } log.NewDebug(operationID, utils.GetSelfFuncName(), " check ok ", claims.UID, uid, claims.Platform) return true, nil, ""