This commit is contained in:
wangchuxiao 2022-11-28 19:09:38 +08:00
parent cf8dc4e529
commit 1be16380da
11 changed files with 1168 additions and 1522 deletions

View File

@ -195,7 +195,7 @@ func checkMaxSeqWithMongo(operationID, ID string, diffusionType int) error {
if math.Abs(float64(seqMongo-uint32(seqRedis))) > 10 { if math.Abs(float64(seqMongo-uint32(seqRedis))) > 10 {
log.NewWarn(operationID, utils.GetSelfFuncName(), "seqMongo, seqRedis", seqMongo, seqRedis, ID, "redis maxSeq is different with msg.Seq > 10", "status: ", msgPb.Status, msg.SendTime) log.NewWarn(operationID, utils.GetSelfFuncName(), "seqMongo, seqRedis", seqMongo, seqRedis, ID, "redis maxSeq is different with msg.Seq > 10", "status: ", msgPb.Status, msg.SendTime)
} else { } else {
log.NewInfo(operationID, utils.GetSelfFuncName(), "seqMongo, seqRedis", seqMongo, seqRedis, ID, "seq and msg OK", "status: ", msgPb.Status, msg.SendTime) log.NewInfo(operationID, utils.GetSelfFuncName(), "seqMongo, seqRedis", seqMongo, seqRedis, ID, "seq and msg OK", "status:", msgPb.Status, msg.SendTime)
} }
return nil return nil
} }

View File

@ -65,6 +65,9 @@ func (ws *WServer) msgParse(conn *UserConn, binaryMsg []byte) {
case constant.WsLogoutMsg: case constant.WsLogoutMsg:
log.NewInfo(m.OperationID, "conn.Close()", m.SendID, m.MsgIncr, m.ReqIdentifier) log.NewInfo(m.OperationID, "conn.Close()", m.SendID, m.MsgIncr, m.ReqIdentifier)
ws.userLogoutReq(conn, &m) ws.userLogoutReq(conn, &m)
case constant.WsSetBackgroundStatus:
log.NewInfo(m.OperationID, "WsSetBackgroundStatus", m.SendID, m.MsgIncr, m.ReqIdentifier)
ws.setUserDeviceBackground(conn, &m)
default: default:
log.Error(m.OperationID, "ReqIdentifier failed ", m.SendID, m.MsgIncr, m.ReqIdentifier) log.Error(m.OperationID, "ReqIdentifier failed ", m.SendID, m.MsgIncr, m.ReqIdentifier)
} }
@ -394,3 +397,26 @@ func SetTokenKicked(userID string, platformID int, operationID string) {
return return
} }
} }
func (ws *WServer) setUserDeviceBackground(conn *UserConn, m *Req) {
isPass, errCode, errMsg, pData := ws.argsValidate(m, constant.WsSetBackgroundStatus, m.OperationID)
if isPass {
req := pData.(*sdk_ws.SetAppBackgroundStatusReq)
conn.IsBackground = req.IsBackground
log.NewInfo(m.OperationID, "SetUserDeviceBackground", "success", *conn, req.IsBackground)
ws.setUserDeviceBackgroundResp(conn, m, 0, "")
}
ws.setUserDeviceBackgroundResp(conn, m, errCode, errMsg)
}
func (ws *WServer) setUserDeviceBackgroundResp(conn *UserConn, m *Req, errCode int32, errMsg string) {
mReply := Resp{
ReqIdentifier: m.ReqIdentifier,
MsgIncr: m.MsgIncr,
OperationID: m.OperationID,
ErrCode: errCode,
ErrMsg: errMsg,
}
ws.sendMsg(conn, mReply)
_ = conn.Close()
}

View File

@ -196,24 +196,27 @@ func (r *RPCServer) SuperGroupOnlineBatchPushOneMsg(_ context.Context, req *pbRe
userConnMap := ws.getUserAllCons(v) userConnMap := ws.getUserAllCons(v)
for platform, userConn := range userConnMap { for platform, userConn := range userConnMap {
if userConn != nil { if userConn != nil {
resultCode := sendMsgBatchToUser(userConn, replyBytes.Bytes(), req, platform, v) temp := &pbRelay.SingleMsgToUserPlatform{
if resultCode == 0 && utils.IsContainInt(platform, r.pushTerminal) { RecvID: v,
tempT.OnlinePush = true RecvPlatFormID: int32(platform),
promePkg.PromeInc(promePkg.MsgOnlinePushSuccessCounter) }
log.Info(req.OperationID, "PushSuperMsgToUser is success By Ws", "args", req.String(), "recvPlatForm", constant.PlatformIDToName(platform), "recvID", v) if !userConn.IsBackground {
temp := &pbRelay.SingleMsgToUserPlatform{ resultCode := sendMsgBatchToUser(userConn, replyBytes.Bytes(), req, platform, v)
ResultCode: resultCode, if resultCode == 0 && utils.IsContainInt(platform, r.pushTerminal) {
RecvID: v, tempT.OnlinePush = true
RecvPlatFormID: int32(platform), promePkg.PromeInc(promePkg.MsgOnlinePushSuccessCounter)
log.Info(req.OperationID, "PushSuperMsgToUser is success By Ws", "args", req.String(), "recvPlatForm", constant.PlatformIDToName(platform), "recvID", v)
temp.ResultCode = resultCode
resp = append(resp, temp)
} }
} else {
temp.ResultCode = -2
resp = append(resp, temp) resp = append(resp, temp)
} }
} }
} }
tempT.Resp = resp tempT.Resp = resp
singleUserResult = append(singleUserResult, tempT) singleUserResult = append(singleUserResult, tempT)
} }
return &pbRelay.OnlineBatchPushOneMsgResp{ return &pbRelay.OnlineBatchPushOneMsgResp{

View File

@ -105,6 +105,18 @@ func (ws *WServer) argsValidate(m *Req, r int32, operationID string) (isPass boo
log.Error(operationID, "data args validate err", err.Error(), r) log.Error(operationID, "data args validate err", err.Error(), r)
return false, 204, err.Error(), nil return false, 204, err.Error(), nil
}
return true, 0, "", data
case constant.WsSetBackgroundStatus:
data := open_im_sdk.SetAppBackgroundStatusReq{}
if err := proto.Unmarshal(m.Data, &data); err != nil {
log.Error(operationID, "Decode Data struct err", err.Error(), r)
return false, 203, err.Error(), nil
}
if err := validate.Struct(data); err != nil {
log.Error(operationID, "data args validate err", err.Error(), r)
return false, 204, err.Error(), nil
} }
return true, 0, "", data return true, 0, "", data
default: default:

View File

@ -35,7 +35,9 @@ type UserConn struct {
PushedMaxSeq uint32 PushedMaxSeq uint32
IsCompress bool IsCompress bool
userID string userID string
IsBackground bool
} }
type WServer struct { type WServer struct {
wsAddr string wsAddr string
wsMaxConnNum int wsMaxConnNum int
@ -84,7 +86,7 @@ func (ws *WServer) wsHandler(w http.ResponseWriter, r *http.Request) {
log.NewDebug(operationID, query["sendID"][0], "enable compression") log.NewDebug(operationID, query["sendID"][0], "enable compression")
isCompress = true isCompress = true
} }
newConn := &UserConn{conn, new(sync.Mutex), utils.StringToInt32(query["platformID"][0]), 0, isCompress, query["sendID"][0]} newConn := &UserConn{conn, new(sync.Mutex), utils.StringToInt32(query["platformID"][0]), 0, isCompress, query["sendID"][0], false}
userCount++ userCount++
ws.addUserConn(query["sendID"][0], utils.StringToInt(query["platformID"][0]), newConn, query["token"][0], operationID) ws.addUserConn(query["sendID"][0], utils.StringToInt(query["platformID"][0]), newConn, query["token"][0], operationID)
go ws.readMsg(newConn) go ws.readMsg(newConn)

View File

@ -17,14 +17,15 @@ const (
RefuseFriendFlag = -1 RefuseFriendFlag = -1
//Websocket Protocol //Websocket Protocol
WSGetNewestSeq = 1001 WSGetNewestSeq = 1001
WSPullMsgBySeqList = 1002 WSPullMsgBySeqList = 1002
WSSendMsg = 1003 WSSendMsg = 1003
WSSendSignalMsg = 1004 WSSendSignalMsg = 1004
WSPushMsg = 2001 WSPushMsg = 2001
WSKickOnlineMsg = 2002 WSKickOnlineMsg = 2002
WsLogoutMsg = 2003 WsLogoutMsg = 2003
WSDataError = 3001 WsSetBackgroundStatus = 2004
WSDataError = 3001
///ContentType ///ContentType
//UserRelated //UserRelated

View File

@ -33,6 +33,7 @@ var (
ErrSendLimit = ErrInfo{ErrCode: 810, ErrMsg: "send msg limit, to many request, try again later"} ErrSendLimit = ErrInfo{ErrCode: 810, ErrMsg: "send msg limit, to many request, try again later"}
ErrMessageHasReadDisable = ErrInfo{ErrCode: 811, ErrMsg: "message has read disable"} ErrMessageHasReadDisable = ErrInfo{ErrCode: 811, ErrMsg: "message has read disable"}
ErrInternal = ErrInfo{ErrCode: 812, ErrMsg: "internal error"} ErrInternal = ErrInfo{ErrCode: 812, ErrMsg: "internal error"}
ErrWsConnNotExist = ErrInfo{ErrCode: 813, ErrMsg: "ws conn not exist"}
) )
var ( var (

File diff suppressed because it is too large Load Diff

View File

@ -89,7 +89,6 @@ message MultiTerminalLoginCheckResp{
string errMsg = 2; string errMsg = 2;
} }
service relay { service relay {
rpc OnlinePushMsg(OnlinePushMsgReq) returns(OnlinePushMsgResp); rpc OnlinePushMsg(OnlinePushMsgReq) returns(OnlinePushMsgResp);
rpc GetUsersOnlineStatus(GetUsersOnlineStatusReq) returns(GetUsersOnlineStatusResp); rpc GetUsersOnlineStatus(GetUsersOnlineStatusReq) returns(GetUsersOnlineStatusResp);

File diff suppressed because it is too large Load Diff

View File

@ -696,4 +696,12 @@ message DelMsgListResp{
string errMsg = 2; string errMsg = 2;
} }
message SetAppBackgroundStatusReq {
string userID = 1;
bool isBackground = 2;
}
message SetAppBackgroundStatusResp {
int32 errCode = 1;
string errMsg = 2;
}