test recover

This commit is contained in:
Gordon 2022-03-04 16:11:04 +08:00
parent c108115c83
commit 8dc04667b0
4 changed files with 80 additions and 76 deletions

View File

@ -5,6 +5,7 @@ import (
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/db"
"Open_IM/pkg/common/log"
"Open_IM/pkg/common/token_verify"
"Open_IM/pkg/utils"
"bytes"
"encoding/gob"
@ -257,16 +258,16 @@ func (ws *WServer) headerCheck(w http.ResponseWriter, r *http.Request) bool {
status := http.StatusUnauthorized
query := r.URL.Query()
if len(query["token"]) != 0 && len(query["sendID"]) != 0 && len(query["platformID"]) != 0 {
//if ok, err := token_verify.VerifyToken(query["token"][0], query["sendID"][0]); !ok {
// e := err.(*constant.ErrInfo)
// log.ErrorByKv("Token verify failed", "", "query", query)
// w.Header().Set("Sec-Websocket-Version", "13")
// http.Error(w, e.ErrMsg, int(e.ErrCode))
// return false
//} else {
log.InfoByKv("Connection Authentication Success", "", "token", query["token"][0], "userID", query["sendID"][0])
return true
//}
if ok, err := token_verify.VerifyToken(query["token"][0], query["sendID"][0]); !ok {
e := err.(*constant.ErrInfo)
log.ErrorByKv("Token verify failed", "", "query", query)
w.Header().Set("Sec-Websocket-Version", "13")
http.Error(w, e.ErrMsg, int(e.ErrCode))
return false
} else {
log.InfoByKv("Connection Authentication Success", "", "token", query["token"][0], "userID", query["sendID"][0])
return true
}
} else {
log.ErrorByKv("Args err", "", "query", query)
w.Header().Set("Sec-Websocket-Version", "13")

View File

@ -20,6 +20,6 @@ func Init() {
}
func Run() {
//register mysqlConsumerHandler to
//go persistentCH.persistentConsumerGroup.RegisterHandleAndConsumer(&persistentCH)
go persistentCH.persistentConsumerGroup.RegisterHandleAndConsumer(&persistentCH)
go historyCH.historyConsumerGroup.RegisterHandleAndConsumer(&historyCH)
}

View File

@ -7,12 +7,16 @@
package logic
import (
push "Open_IM/internal/push/jpush"
"Open_IM/pkg/common/config"
"Open_IM/pkg/common/constant"
"Open_IM/pkg/common/log"
"Open_IM/pkg/grpc-etcdv3/getcdv3"
pbPush "Open_IM/pkg/proto/push"
pbRelay "Open_IM/pkg/proto/relay"
"Open_IM/pkg/utils"
"context"
"encoding/json"
"strings"
)
@ -30,7 +34,7 @@ type AtContent struct {
func MsgToUser(pushMsg *pbPush.PushMsgReq) {
var wsResult []*pbRelay.SingleMsgToUser
//isOfflinePush := utils.GetSwitchFromOptions(pushMsg.MsgData.Options, constant.IsOfflinePush)
isOfflinePush := utils.GetSwitchFromOptions(pushMsg.MsgData.Options, constant.IsOfflinePush)
log.Debug("Get msg from msg_transfer And push msg", pushMsg.OperationID, "PushData", pushMsg.String())
grpcCons := getcdv3.GetConn4Unique(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImOnlineMessageRelayName)
//Online push message
@ -48,66 +52,66 @@ func MsgToUser(pushMsg *pbPush.PushMsgReq) {
}
log.InfoByKv("push_result", pushMsg.OperationID, "result", wsResult, "sendData", pushMsg.MsgData)
count++
//if isOfflinePush && pushMsg.PushToUserID != pushMsg.MsgData.SendID {
// for _, v := range wsResult {
// if v.ResultCode == 0 {
// continue
// }
// //supported terminal
// for _, t := range pushTerminal {
// if v.RecvPlatFormID == t {
// //Use offline push messaging
// var UIDList []string
// UIDList = append(UIDList, v.RecvID)
// customContent := OpenIMContent{
// SessionType: int(pushMsg.MsgData.SessionType),
// From: pushMsg.MsgData.SendID,
// To: pushMsg.MsgData.RecvID,
// Seq: pushMsg.MsgData.Seq,
// }
// bCustomContent, _ := json.Marshal(customContent)
// jsonCustomContent := string(bCustomContent)
// var content string
// if pushMsg.MsgData.OfflinePushInfo != nil {
// content = pushMsg.MsgData.OfflinePushInfo.Title
//
// } else {
// switch pushMsg.MsgData.ContentType {
// case constant.Text:
// content = constant.ContentType2PushContent[constant.Text]
// case constant.Picture:
// content = constant.ContentType2PushContent[constant.Picture]
// case constant.Voice:
// content = constant.ContentType2PushContent[constant.Voice]
// case constant.Video:
// content = constant.ContentType2PushContent[constant.Video]
// case constant.File:
// content = constant.ContentType2PushContent[constant.File]
// case constant.AtText:
// a := AtContent{}
// _ = utils.JsonStringToStruct(string(pushMsg.MsgData.Content), &a)
// if utils.IsContain(v.RecvID, a.AtUserList) {
// content = constant.ContentType2PushContent[constant.AtText] + constant.ContentType2PushContent[constant.Common]
// } else {
// content = constant.ContentType2PushContent[constant.GroupMsg]
// }
// default:
// content = constant.ContentType2PushContent[constant.Common]
// }
// }
//
// pushResult, err := push.JGAccountListPush(UIDList, content, jsonCustomContent, constant.PlatformIDToName(t))
// if err != nil {
// log.NewError(pushMsg.OperationID, "offline push error", pushMsg.String(), err.Error(), constant.PlatformIDToName(t))
// } else {
// log.NewDebug(pushMsg.OperationID, "offline push return result is ", string(pushResult), pushMsg.MsgData, constant.PlatformIDToName(t))
// }
//
// }
// }
// }
//
//}
if isOfflinePush && pushMsg.PushToUserID != pushMsg.MsgData.SendID {
for _, v := range wsResult {
if v.ResultCode == 0 {
continue
}
//supported terminal
for _, t := range pushTerminal {
if v.RecvPlatFormID == t {
//Use offline push messaging
var UIDList []string
UIDList = append(UIDList, v.RecvID)
customContent := OpenIMContent{
SessionType: int(pushMsg.MsgData.SessionType),
From: pushMsg.MsgData.SendID,
To: pushMsg.MsgData.RecvID,
Seq: pushMsg.MsgData.Seq,
}
bCustomContent, _ := json.Marshal(customContent)
jsonCustomContent := string(bCustomContent)
var content string
if pushMsg.MsgData.OfflinePushInfo != nil {
content = pushMsg.MsgData.OfflinePushInfo.Title
} else {
switch pushMsg.MsgData.ContentType {
case constant.Text:
content = constant.ContentType2PushContent[constant.Text]
case constant.Picture:
content = constant.ContentType2PushContent[constant.Picture]
case constant.Voice:
content = constant.ContentType2PushContent[constant.Voice]
case constant.Video:
content = constant.ContentType2PushContent[constant.Video]
case constant.File:
content = constant.ContentType2PushContent[constant.File]
case constant.AtText:
a := AtContent{}
_ = utils.JsonStringToStruct(string(pushMsg.MsgData.Content), &a)
if utils.IsContain(v.RecvID, a.AtUserList) {
content = constant.ContentType2PushContent[constant.AtText] + constant.ContentType2PushContent[constant.Common]
} else {
content = constant.ContentType2PushContent[constant.GroupMsg]
}
default:
content = constant.ContentType2PushContent[constant.Common]
}
}
pushResult, err := push.JGAccountListPush(UIDList, content, jsonCustomContent, constant.PlatformIDToName(t))
if err != nil {
log.NewError(pushMsg.OperationID, "offline push error", pushMsg.String(), err.Error(), constant.PlatformIDToName(t))
} else {
log.NewDebug(pushMsg.OperationID, "offline push return result is ", string(pushResult), pushMsg.MsgData, constant.PlatformIDToName(t))
}
}
}
}
}
}
//func SendMsgByWS(m *pbChat.WSToMsgSvrChatMsg) {

View File

@ -1,7 +1,6 @@
package statistics
import (
"Open_IM/pkg/common/log"
"time"
)
@ -15,13 +14,13 @@ type Statistics struct {
func (s *Statistics) output() {
t := time.NewTicker(time.Duration(s.SleepTime) * time.Second)
defer t.Stop()
var sum uint64
//var sum uint64
for {
sum = *s.Count
//sum = *s.Count
select {
case <-t.C:
}
log.NewWarn("", " system stat ", s.ModuleName, s.PrintArgs, *s.Count-sum, "total:", *s.Count)
//log.NewWarn("", " system stat ", s.ModuleName, s.PrintArgs, *s.Count-sum, "total:", *s.Count)
}
}