ws support same terminal login

This commit is contained in:
Gordon 2023-03-02 18:47:11 +08:00
parent a64e5a776a
commit 7e48d1a00f
2 changed files with 226 additions and 211 deletions

View File

@ -101,47 +101,48 @@ func (r *RPCServer) run() {
} }
} }
func (r *RPCServer) OnlinePushMsg(_ context.Context, in *pbRelay.OnlinePushMsgReq) (*pbRelay.OnlinePushMsgResp, error) { func (r *RPCServer) OnlinePushMsg(_ context.Context, in *pbRelay.OnlinePushMsgReq) (*pbRelay.OnlinePushMsgResp, error) {
log.NewInfo(in.OperationID, "PushMsgToUser is arriving", in.String()) //log.NewInfo(in.OperationID, "PushMsgToUser is arriving", in.String())
var resp []*pbRelay.SingleMsgToUserPlatform //var resp []*pbRelay.SingleMsgToUserPlatform
msgBytes, _ := proto.Marshal(in.MsgData) //msgBytes, _ := proto.Marshal(in.MsgData)
mReply := Resp{ //mReply := Resp{
ReqIdentifier: constant.WSPushMsg, // ReqIdentifier: constant.WSPushMsg,
OperationID: in.OperationID, // OperationID: in.OperationID,
Data: msgBytes, // Data: msgBytes,
} //}
var replyBytes bytes.Buffer //var replyBytes bytes.Buffer
enc := gob.NewEncoder(&replyBytes) //enc := gob.NewEncoder(&replyBytes)
err := enc.Encode(mReply) //err := enc.Encode(mReply)
if err != nil { //if err != nil {
log.NewError(in.OperationID, "data encode err", err.Error()) // log.NewError(in.OperationID, "data encode err", err.Error())
} //}
var tag bool //var tag bool
recvID := in.PushToUserID //recvID := in.PushToUserID
for _, v := range r.platformList { //for _, v := range r.platformList {
if conn := ws.getUserConn(recvID, v); conn != nil { // if conn := ws.getUserConn(recvID, v); conn != nil {
tag = true // tag = true
resultCode := sendMsgToUser(conn, replyBytes.Bytes(), in, v, recvID) // resultCode := sendMsgToUser(conn, replyBytes.Bytes(), in, v, recvID)
temp := &pbRelay.SingleMsgToUserPlatform{ // temp := &pbRelay.SingleMsgToUserPlatform{
ResultCode: resultCode, // ResultCode: resultCode,
RecvID: recvID, // RecvID: recvID,
RecvPlatFormID: int32(v), // RecvPlatFormID: int32(v),
} // }
resp = append(resp, temp) // resp = append(resp, temp)
} else { // } else {
temp := &pbRelay.SingleMsgToUserPlatform{ // temp := &pbRelay.SingleMsgToUserPlatform{
ResultCode: -1, // ResultCode: -1,
RecvID: recvID, // RecvID: recvID,
RecvPlatFormID: int32(v), // RecvPlatFormID: int32(v),
} // }
resp = append(resp, temp) // resp = append(resp, temp)
} // }
} //}
if !tag { //if !tag {
log.NewDebug(in.OperationID, "push err ,no matched ws conn not in map", in.String()) // log.NewDebug(in.OperationID, "push err ,no matched ws conn not in map", in.String())
} //}
return &pbRelay.OnlinePushMsgResp{ //return &pbRelay.OnlinePushMsgResp{
Resp: resp, // Resp: resp,
}, nil //}, nil
return nil, nil
} }
func (r *RPCServer) GetUsersOnlineStatus(_ context.Context, req *pbRelay.GetUsersOnlineStatusReq) (*pbRelay.GetUsersOnlineStatusResp, error) { func (r *RPCServer) GetUsersOnlineStatus(_ context.Context, req *pbRelay.GetUsersOnlineStatusReq) (*pbRelay.GetUsersOnlineStatusResp, error) {
log.NewInfo(req.OperationID, "rpc GetUsersOnlineStatus arrived server", req.String()) log.NewInfo(req.OperationID, "rpc GetUsersOnlineStatus arrived server", req.String())
@ -154,13 +155,13 @@ func (r *RPCServer) GetUsersOnlineStatus(_ context.Context, req *pbRelay.GetUser
temp := new(pbRelay.GetUsersOnlineStatusResp_SuccessResult) temp := new(pbRelay.GetUsersOnlineStatusResp_SuccessResult)
temp.UserID = userID temp.UserID = userID
userConnMap := ws.getUserAllCons(userID) userConnMap := ws.getUserAllCons(userID)
for platform, userConn := range userConnMap { for platform, userConns := range userConnMap {
if userConn != nil { if len(userConns) != 0 {
ps := new(pbRelay.GetUsersOnlineStatusResp_SuccessDetail) ps := new(pbRelay.GetUsersOnlineStatusResp_SuccessDetail)
ps.Platform = constant.PlatformIDToName(platform) ps.Platform = constant.PlatformIDToName(platform)
ps.Status = constant.OnlineStatus ps.Status = constant.OnlineStatus
ps.ConnID = userConn.connID ps.ConnID = userConns[0].connID
ps.IsBackground = userConn.IsBackground ps.IsBackground = userConns[0].IsBackground
temp.Status = constant.OnlineStatus temp.Status = constant.OnlineStatus
temp.DetailPlatformStatus = append(temp.DetailPlatformStatus, ps) temp.DetailPlatformStatus = append(temp.DetailPlatformStatus, ps)
} }
@ -196,25 +197,28 @@ func (r *RPCServer) SuperGroupOnlineBatchPushOneMsg(_ context.Context, req *pbRe
UserID: v, UserID: v,
} }
userConnMap := ws.getUserAllCons(v) userConnMap := ws.getUserAllCons(v)
for platform, userConn := range userConnMap { for platform, userConns := range userConnMap {
if userConn != nil { if len(userConns) != 0 {
temp := &pbRelay.SingleMsgToUserPlatform{ for _, userConn := range userConns {
RecvID: v, temp := &pbRelay.SingleMsgToUserPlatform{
RecvPlatFormID: int32(platform), RecvID: v,
} RecvPlatFormID: int32(platform),
if !userConn.IsBackground { }
resultCode := sendMsgBatchToUser(userConn, replyBytes.Bytes(), req, platform, v) if !userConn.IsBackground {
if resultCode == 0 && utils.IsContainInt(platform, r.pushTerminal) { resultCode := sendMsgBatchToUser(userConn, replyBytes.Bytes(), req, platform, v)
tempT.OnlinePush = true if resultCode == 0 && utils.IsContainInt(platform, r.pushTerminal) {
promePkg.PromeInc(promePkg.MsgOnlinePushSuccessCounter) tempT.OnlinePush = true
log.Info(req.OperationID, "PushSuperMsgToUser is success By Ws", "args", req.String(), "recvPlatForm", constant.PlatformIDToName(platform), "recvID", v) promePkg.PromeInc(promePkg.MsgOnlinePushSuccessCounter)
temp.ResultCode = resultCode log.Info(req.OperationID, "PushSuperMsgToUser is success By Ws", "args", req.String(), "recvPlatForm", constant.PlatformIDToName(platform), "recvID", v)
temp.ResultCode = resultCode
resp = append(resp, temp)
}
} else {
temp.ResultCode = -2
resp = append(resp, temp) resp = append(resp, temp)
} }
} else {
temp.ResultCode = -2
resp = append(resp, temp)
} }
} }
} }
tempT.Resp = resp tempT.Resp = resp
@ -247,22 +251,28 @@ func (r *RPCServer) SuperGroupBackgroundOnlinePush(_ context.Context, req *pbRel
UserID: v, UserID: v,
} }
userConnMap := ws.getUserAllCons(v) userConnMap := ws.getUserAllCons(v)
for platform, userConn := range userConnMap { for platform, userConns := range userConnMap {
if userConn != nil && userConn.IsBackground { if len(userConns) != 0 {
temp := &pbRelay.SingleMsgToUserPlatform{ for _, userConn := range userConns {
RecvID: v, temp := &pbRelay.SingleMsgToUserPlatform{
RecvPlatFormID: int32(platform), RecvID: v,
} RecvPlatFormID: int32(platform),
if constant.PlatformIDToClass(int(userConn.PlatformID)) == constant.TerminalPC || userConn.PlatformID == constant.WebPlatformID { }
resultCode := sendMsgBatchToUser(userConn, replyBytes.Bytes(), req, platform, v) if !userConn.IsBackground {
if resultCode == 0 && utils.IsContainInt(platform, r.pushTerminal) { resultCode := sendMsgBatchToUser(userConn, replyBytes.Bytes(), req, platform, v)
tempT.OnlinePush = true if resultCode == 0 && utils.IsContainInt(platform, r.pushTerminal) {
promePkg.PromeInc(promePkg.MsgOnlinePushSuccessCounter) tempT.OnlinePush = true
log.Info(req.OperationID, "PushSuperMsgToUser is success By Ws", "args", req.String(), "recvPlatForm", constant.PlatformIDToName(platform), "recvID", v) promePkg.PromeInc(promePkg.MsgOnlinePushSuccessCounter)
temp.ResultCode = resultCode log.Info(req.OperationID, "PushSuperMsgToUser is success By Ws", "args", req.String(), "recvPlatForm", constant.PlatformIDToName(platform), "recvID", v)
temp.ResultCode = resultCode
resp = append(resp, temp)
}
} else {
temp.ResultCode = -2
resp = append(resp, temp) resp = append(resp, temp)
} }
} }
} }
} }
tempT.Resp = resp tempT.Resp = resp
@ -274,76 +284,77 @@ func (r *RPCServer) SuperGroupBackgroundOnlinePush(_ context.Context, req *pbRel
}, nil }, nil
} }
func (r *RPCServer) OnlineBatchPushOneMsg(_ context.Context, req *pbRelay.OnlineBatchPushOneMsgReq) (*pbRelay.OnlineBatchPushOneMsgResp, error) { func (r *RPCServer) OnlineBatchPushOneMsg(_ context.Context, req *pbRelay.OnlineBatchPushOneMsgReq) (*pbRelay.OnlineBatchPushOneMsgResp, error) {
log.NewInfo(req.OperationID, "BatchPushMsgToUser is arriving", req.String()) //log.NewInfo(req.OperationID, "BatchPushMsgToUser is arriving", req.String())
var singleUserResult []*pbRelay.SingelMsgToUserResultList //var singleUserResult []*pbRelay.SingelMsgToUserResultList
//
for _, v := range req.PushToUserIDList { //for _, v := range req.PushToUserIDList {
var resp []*pbRelay.SingleMsgToUserPlatform // var resp []*pbRelay.SingleMsgToUserPlatform
tempT := &pbRelay.SingelMsgToUserResultList{ // tempT := &pbRelay.SingelMsgToUserResultList{
UserID: v, // UserID: v,
} // }
userConnMap := ws.getUserAllCons(v) // userConnMap := ws.getUserAllCons(v)
var platformList []int // var platformList []int
for k, _ := range userConnMap { // for k, _ := range userConnMap {
platformList = append(platformList, k) // platformList = append(platformList, k)
} // }
log.Debug(req.OperationID, "GetSingleUserMsgForPushPlatforms begin", req.MsgData.Seq, v, platformList, req.MsgData.String()) // log.Debug(req.OperationID, "GetSingleUserMsgForPushPlatforms begin", req.MsgData.Seq, v, platformList, req.MsgData.String())
needPushMapList := r.GetSingleUserMsgForPushPlatforms(req.OperationID, req.MsgData, v, platformList) // needPushMapList := r.GetSingleUserMsgForPushPlatforms(req.OperationID, req.MsgData, v, platformList)
log.Debug(req.OperationID, "GetSingleUserMsgForPushPlatforms end", req.MsgData.Seq, v, platformList, len(needPushMapList)) // log.Debug(req.OperationID, "GetSingleUserMsgForPushPlatforms end", req.MsgData.Seq, v, platformList, len(needPushMapList))
for platform, list := range needPushMapList { // for platform, list := range needPushMapList {
if list != nil { // if list != nil {
log.Debug(req.OperationID, "needPushMapList ", "userID: ", v, "platform: ", platform, "push msg num:") // log.Debug(req.OperationID, "needPushMapList ", "userID: ", v, "platform: ", platform, "push msg num:")
//for _, v := range list { // //for _, v := range list {
// log.Debug(req.OperationID, "req.MsgData.MsgDataList begin", "len: ", len(req.MsgData.MsgDataList), v.String()) // // log.Debug(req.OperationID, "req.MsgData.MsgDataList begin", "len: ", len(req.MsgData.MsgDataList), v.String())
// req.MsgData.MsgDataList = append(req.MsgData.MsgDataList, v) // // req.MsgData.MsgDataList = append(req.MsgData.MsgDataList, v)
// log.Debug(req.OperationID, "req.MsgData.MsgDataList end", "len: ", len(req.MsgData.MsgDataList)) // // log.Debug(req.OperationID, "req.MsgData.MsgDataList end", "len: ", len(req.MsgData.MsgDataList))
//} // //}
msgBytes, err := proto.Marshal(list) // msgBytes, err := proto.Marshal(list)
if err != nil { // if err != nil {
log.Error(req.OperationID, "proto marshal err", err.Error()) // log.Error(req.OperationID, "proto marshal err", err.Error())
continue // continue
} // }
req.MsgData.MsgDataList = msgBytes // req.MsgData.MsgDataList = msgBytes
//req.MsgData.MsgDataList = append(req.MsgData.MsgDataList, v) // //req.MsgData.MsgDataList = append(req.MsgData.MsgDataList, v)
log.Debug(req.OperationID, "r.encodeWsData no string") // log.Debug(req.OperationID, "r.encodeWsData no string")
//log.Debug(req.OperationID, "r.encodeWsData data0 list ", req.MsgData.MsgDataList[0].String()) // //log.Debug(req.OperationID, "r.encodeWsData data0 list ", req.MsgData.MsgDataList[0].String())
//
log.Debug(req.OperationID, "r.encodeWsData ", req.MsgData.String()) // log.Debug(req.OperationID, "r.encodeWsData ", req.MsgData.String())
replyBytes, err := r.encodeWsData(req.MsgData, req.OperationID) // replyBytes, err := r.encodeWsData(req.MsgData, req.OperationID)
if err != nil { // if err != nil {
log.Error(req.OperationID, "encodeWsData failed ", req.MsgData.String()) // log.Error(req.OperationID, "encodeWsData failed ", req.MsgData.String())
continue // continue
} // }
log.Debug(req.OperationID, "encodeWsData", "len: ", replyBytes.Len()) // log.Debug(req.OperationID, "encodeWsData", "len: ", replyBytes.Len())
resultCode := sendMsgBatchToUser(userConnMap[platform], replyBytes.Bytes(), req, platform, v) // resultCode := sendMsgBatchToUser(userConnMap[platform], replyBytes.Bytes(), req, platform, v)
if resultCode == 0 && utils.IsContainInt(platform, r.pushTerminal) { // if resultCode == 0 && utils.IsContainInt(platform, r.pushTerminal) {
tempT.OnlinePush = true // tempT.OnlinePush = true
log.Info(req.OperationID, "PushSuperMsgToUser is success By Ws", "args", req.String(), "recv PlatForm", constant.PlatformIDToName(platform), "recvID", v) // log.Info(req.OperationID, "PushSuperMsgToUser is success By Ws", "args", req.String(), "recv PlatForm", constant.PlatformIDToName(platform), "recvID", v)
temp := &pbRelay.SingleMsgToUserPlatform{ // temp := &pbRelay.SingleMsgToUserPlatform{
ResultCode: resultCode, // ResultCode: resultCode,
RecvID: v, // RecvID: v,
RecvPlatFormID: int32(platform), // RecvPlatFormID: int32(platform),
} // }
resp = append(resp, temp) // resp = append(resp, temp)
} // }
} else { // } else {
if utils.IsContainInt(platform, r.pushTerminal) { // if utils.IsContainInt(platform, r.pushTerminal) {
tempT.OnlinePush = true // tempT.OnlinePush = true
temp := &pbRelay.SingleMsgToUserPlatform{ // temp := &pbRelay.SingleMsgToUserPlatform{
ResultCode: 0, // ResultCode: 0,
RecvID: v, // RecvID: v,
RecvPlatFormID: int32(platform), // RecvPlatFormID: int32(platform),
} // }
resp = append(resp, temp) // resp = append(resp, temp)
} // }
} // }
} // }
tempT.Resp = resp // tempT.Resp = resp
singleUserResult = append(singleUserResult, tempT) // singleUserResult = append(singleUserResult, tempT)
} //}
return &pbRelay.OnlineBatchPushOneMsgResp{ //return &pbRelay.OnlineBatchPushOneMsgResp{
SinglePushResult: singleUserResult, // SinglePushResult: singleUserResult,
}, nil //}, nil
return nil, nil
} }
func (r *RPCServer) encodeWsData(wsData *sdk_ws.MsgData, operationID string) (bytes.Buffer, error) { func (r *RPCServer) encodeWsData(wsData *sdk_ws.MsgData, operationID string) (bytes.Buffer, error) {
log.Debug(operationID, "encodeWsData begin", wsData.String()) log.Debug(operationID, "encodeWsData begin", wsData.String())
@ -374,10 +385,11 @@ func (r *RPCServer) KickUserOffline(_ context.Context, req *pbRelay.KickUserOffl
log.NewWarn(req.OperationID, "SetTokenKicked ", v, req.PlatformID, req.OperationID) log.NewWarn(req.OperationID, "SetTokenKicked ", v, req.PlatformID, req.OperationID)
SetTokenKicked(v, int(req.PlatformID), req.OperationID) SetTokenKicked(v, int(req.PlatformID), req.OperationID)
oldConnMap := ws.getUserAllCons(v) oldConnMap := ws.getUserAllCons(v)
if conn, ok := oldConnMap[int(req.PlatformID)]; ok { // user->map[platform->conn] if conns, ok := oldConnMap[int(req.PlatformID)]; ok { // user->map[platform->conn]
log.NewWarn(req.OperationID, "send kick msg, close connection ", req.PlatformID, v) log.NewWarn(req.OperationID, "send kick msg, close connection ", req.PlatformID, v)
ws.sendKickMsg(conn, req.OperationID) for _, conn := range conns {
conn.Close() ws.sendKickMsg(conn, req.OperationID)
}
} }
} }
return &pbRelay.KickUserOfflineResp{}, nil return &pbRelay.KickUserOfflineResp{}, nil

View File

@ -45,15 +45,13 @@ type WServer struct {
wsAddr string wsAddr string
wsMaxConnNum int wsMaxConnNum int
wsUpGrader *websocket.Upgrader wsUpGrader *websocket.Upgrader
wsConnToUser map[*UserConn]map[int]string wsUserToConn map[string]map[int][]*UserConn
wsUserToConn map[string]map[int]*UserConn
} }
func (ws *WServer) onInit(wsPort int) { func (ws *WServer) onInit(wsPort int) {
ws.wsAddr = ":" + utils.IntToString(wsPort) ws.wsAddr = ":" + utils.IntToString(wsPort)
ws.wsMaxConnNum = config.Config.LongConnSvr.WebsocketMaxConnNum ws.wsMaxConnNum = config.Config.LongConnSvr.WebsocketMaxConnNum
ws.wsConnToUser = make(map[*UserConn]map[int]string) ws.wsUserToConn = make(map[string]map[int][]*UserConn)
ws.wsUserToConn = make(map[string]map[int]*UserConn)
ws.wsUpGrader = &websocket.Upgrader{ ws.wsUpGrader = &websocket.Upgrader{
HandshakeTimeout: time.Duration(config.Config.LongConnSvr.WebsocketTimeOut) * time.Second, HandshakeTimeout: time.Duration(config.Config.LongConnSvr.WebsocketTimeOut) * time.Second,
ReadBufferSize: config.Config.LongConnSvr.WebsocketMaxMsgLen, ReadBufferSize: config.Config.LongConnSvr.WebsocketMaxMsgLen,
@ -203,8 +201,11 @@ func (ws *WServer) MultiTerminalLoginCheckerWithLock(uid string, platformID int,
fallthrough fallthrough
case constant.AllLoginButSameTermKick: case constant.AllLoginButSameTermKick:
if oldConnMap, ok := ws.wsUserToConn[uid]; ok { // user->map[platform->conn] if oldConnMap, ok := ws.wsUserToConn[uid]; ok { // user->map[platform->conn]
if oldConn, ok := oldConnMap[platformID]; ok { if oldConns, ok := oldConnMap[platformID]; ok {
log.NewDebug(operationID, uid, platformID, "kick old conn") log.NewDebug(operationID, uid, platformID, "kick old conn")
for _, conn := range oldConns {
ws.sendKickMsg(conn, operationID)
}
m, err := db.DB.GetTokenMapByUidPid(uid, constant.PlatformIDToName(platformID)) m, err := db.DB.GetTokenMapByUidPid(uid, constant.PlatformIDToName(platformID))
if err != nil && err != go_redis.Nil { if err != nil && err != go_redis.Nil {
log.NewError(operationID, "get token from redis err", err.Error(), uid, constant.PlatformIDToName(platformID)) log.NewError(operationID, "get token from redis err", err.Error(), uid, constant.PlatformIDToName(platformID))
@ -227,16 +228,12 @@ func (ws *WServer) MultiTerminalLoginCheckerWithLock(uid string, platformID int,
log.NewError(operationID, "SetTokenMapByUidPid err", err.Error(), uid, platformID, m) log.NewError(operationID, "SetTokenMapByUidPid err", err.Error(), uid, platformID, m)
return return
} }
err = oldConn.Close()
//delete(oldConnMap, platformID) delete(oldConnMap, platformID)
ws.wsUserToConn[uid] = oldConnMap ws.wsUserToConn[uid] = oldConnMap
if len(oldConnMap) == 0 { if len(oldConnMap) == 0 {
delete(ws.wsUserToConn, uid) delete(ws.wsUserToConn, uid)
} }
delete(ws.wsConnToUser, oldConn)
if err != nil {
log.NewError(operationID, "conn close err", err.Error(), uid, platformID)
}
} else { } else {
log.NewWarn(operationID, "abnormal uid-conn ", uid, platformID, oldConnMap[platformID]) log.NewWarn(operationID, "abnormal uid-conn ", uid, platformID, oldConnMap[platformID])
} }
@ -259,9 +256,11 @@ func (ws *WServer) MultiTerminalLoginChecker(uid string, platformID int, newConn
fallthrough fallthrough
case constant.AllLoginButSameTermKick: case constant.AllLoginButSameTermKick:
if oldConnMap, ok := ws.wsUserToConn[uid]; ok { // user->map[platform->conn] if oldConnMap, ok := ws.wsUserToConn[uid]; ok { // user->map[platform->conn]
if oldConn, ok := oldConnMap[platformID]; ok { if oldConns, ok := oldConnMap[platformID]; ok {
log.NewDebug(operationID, uid, platformID, "kick old conn") log.NewDebug(operationID, uid, platformID, "kick old conn")
ws.sendKickMsg(oldConn, operationID) for _, conn := range oldConns {
ws.sendKickMsg(conn, operationID)
}
m, err := db.DB.GetTokenMapByUidPid(uid, constant.PlatformIDToName(platformID)) m, err := db.DB.GetTokenMapByUidPid(uid, constant.PlatformIDToName(platformID))
if err != nil && err != go_redis.Nil { if err != nil && err != go_redis.Nil {
log.NewError(operationID, "get token from redis err", err.Error(), uid, constant.PlatformIDToName(platformID)) log.NewError(operationID, "get token from redis err", err.Error(), uid, constant.PlatformIDToName(platformID))
@ -284,16 +283,11 @@ func (ws *WServer) MultiTerminalLoginChecker(uid string, platformID int, newConn
log.NewError(operationID, "SetTokenMapByUidPid err", err.Error(), uid, platformID, m) log.NewError(operationID, "SetTokenMapByUidPid err", err.Error(), uid, platformID, m)
return return
} }
err = oldConn.Close()
delete(oldConnMap, platformID) delete(oldConnMap, platformID)
ws.wsUserToConn[uid] = oldConnMap ws.wsUserToConn[uid] = oldConnMap
if len(oldConnMap) == 0 { if len(oldConnMap) == 0 {
delete(ws.wsUserToConn, uid) delete(ws.wsUserToConn, uid)
} }
delete(ws.wsConnToUser, oldConn)
if err != nil {
log.NewError(operationID, "conn close err", err.Error(), uid, platformID)
}
callbackResp := callbackUserKickOff(operationID, uid, platformID) callbackResp := callbackUserKickOff(operationID, uid, platformID)
if callbackResp.ErrCode != 0 { if callbackResp.ErrCode != 0 {
log.NewError(operationID, utils.GetSelfFuncName(), "callbackUserOffline failed", callbackResp) log.NewError(operationID, utils.GetSelfFuncName(), "callbackUserOffline failed", callbackResp)
@ -328,6 +322,11 @@ func (ws *WServer) sendKickMsg(oldConn *UserConn, operationID string) {
if err != nil { if err != nil {
log.NewError(mReply.OperationID, mReply.ReqIdentifier, mReply.ErrCode, mReply.ErrMsg, "sendKickMsg WS WriteMsg error", oldConn.RemoteAddr().String(), err.Error()) log.NewError(mReply.OperationID, mReply.ReqIdentifier, mReply.ErrCode, mReply.ErrMsg, "sendKickMsg WS WriteMsg error", oldConn.RemoteAddr().String(), err.Error())
} }
errClose := oldConn.Close()
if errClose != nil {
log.NewError(mReply.OperationID, mReply.ReqIdentifier, mReply.ErrCode, mReply.ErrMsg, "close old conn error", oldConn.RemoteAddr().String(), err.Error())
}
} }
func (ws *WServer) addUserConn(uid string, platformID int, conn *UserConn, token string, connID, operationID string) { func (ws *WServer) addUserConn(uid string, platformID int, conn *UserConn, token string, connID, operationID string) {
@ -341,23 +340,24 @@ func (ws *WServer) addUserConn(uid string, platformID int, conn *UserConn, token
go ws.MultiTerminalLoginRemoteChecker(uid, int32(platformID), token, operationID) go ws.MultiTerminalLoginRemoteChecker(uid, int32(platformID), token, operationID)
ws.MultiTerminalLoginChecker(uid, platformID, conn, token, operationID) ws.MultiTerminalLoginChecker(uid, platformID, conn, token, operationID)
if oldConnMap, ok := ws.wsUserToConn[uid]; ok { if oldConnMap, ok := ws.wsUserToConn[uid]; ok {
oldConnMap[platformID] = conn if conns, ok := oldConnMap[platformID]; ok {
conns = append(conns, conn)
oldConnMap[platformID] = conns
} else {
conns := make([]*UserConn, 2)
conns = append(conns, conn)
oldConnMap[platformID] = conns
}
ws.wsUserToConn[uid] = oldConnMap ws.wsUserToConn[uid] = oldConnMap
log.Debug(operationID, "user not first come in, add conn ", uid, platformID, conn, oldConnMap) log.Debug(operationID, "user not first come in, add conn ", uid, platformID, conn, oldConnMap)
} else { } else {
i := make(map[int]*UserConn) i := make(map[int][]*UserConn)
i[platformID] = conn conns := make([]*UserConn, 2)
conns = append(conns, conn)
i[platformID] = conns
ws.wsUserToConn[uid] = i ws.wsUserToConn[uid] = i
log.Debug(operationID, "user first come in, new user, conn", uid, platformID, conn, ws.wsUserToConn[uid]) log.Debug(operationID, "user first come in, new user, conn", uid, platformID, conn, ws.wsUserToConn[uid])
} }
if oldStringMap, ok := ws.wsConnToUser[conn]; ok {
oldStringMap[platformID] = uid
ws.wsConnToUser[conn] = oldStringMap
} else {
i := make(map[int]string)
i[platformID] = uid
ws.wsConnToUser[conn] = i
}
count := 0 count := 0
for _, v := range ws.wsUserToConn { for _, v := range ws.wsUserToConn {
count = count + len(v) count = count + len(v)
@ -370,36 +370,39 @@ func (ws *WServer) delUserConn(conn *UserConn) {
rwLock.Lock() rwLock.Lock()
defer rwLock.Unlock() defer rwLock.Unlock()
operationID := utils.OperationIDGenerator() operationID := utils.OperationIDGenerator()
var uid string platform := int(conn.PlatformID)
var platform int
if oldStringMap, okg := ws.wsConnToUser[conn]; okg { if oldConnMap, ok := ws.wsUserToConn[conn.userID]; ok { // only recycle self conn
for k, v := range oldStringMap { if oldconns, okMap := oldConnMap[platform]; okMap {
platform = k var flag bool
uid = v a := make([]*UserConn, 2)
} for _, client := range oldconns {
if oldConnMap, ok := ws.wsUserToConn[uid]; ok { // only recycle self conn if client != conn {
if oldconn, okMap := oldConnMap[platform]; okMap { a = append(a, client)
if oldconn == conn { flag = true
delete(oldConnMap, platform)
} }
} }
ws.wsUserToConn[uid] = oldConnMap if flag {
if len(oldConnMap) == 0 { oldConnMap[platform] = a
delete(ws.wsUserToConn, uid) } else {
delete(oldConnMap, platform)
} }
count := 0
for _, v := range ws.wsUserToConn {
count = count + len(v)
}
log.Debug(operationID, "WS delete operation", "", "wsUser deleted", ws.wsUserToConn, "disconnection_uid", uid, "disconnection_platform", platform, "online_user_num", len(ws.wsUserToConn), "online_conn_num", count)
} else {
log.Debug(operationID, "WS delete operation", "", "wsUser deleted", ws.wsUserToConn, "disconnection_uid", uid, "disconnection_platform", platform, "online_user_num", len(ws.wsUserToConn))
} }
delete(ws.wsConnToUser, conn) ws.wsUserToConn[conn.userID] = oldConnMap
if len(oldConnMap) == 0 {
delete(ws.wsUserToConn, conn.userID)
}
count := 0
for _, v := range ws.wsUserToConn {
count = count + len(v)
}
log.Debug(operationID, "WS delete operation", "", "wsUser deleted", ws.wsUserToConn, "disconnection_uid", conn.userID, "disconnection_platform", platform, "online_user_num", len(ws.wsUserToConn), "online_conn_num", count)
} }
err := conn.Close() err := conn.Close()
if err != nil { if err != nil {
log.Error(operationID, " close err", "", "uid", uid, "platform", platform) log.Error(operationID, " close err", "", "uid", conn.userID, "platform", platform)
} }
if conn.PlatformID == 0 || conn.connID == "" { if conn.PlatformID == 0 || conn.connID == "" {
log.NewWarn(operationID, utils.GetSelfFuncName(), "PlatformID or connID is null", conn.PlatformID, conn.connID) log.NewWarn(operationID, utils.GetSelfFuncName(), "PlatformID or connID is null", conn.PlatformID, conn.connID)
@ -412,21 +415,21 @@ func (ws *WServer) delUserConn(conn *UserConn) {
} }
func (ws *WServer) getUserConn(uid string, platform int) *UserConn { // func (ws *WServer) getUserConn(uid string, platform int) *UserConn {
// rwLock.RLock()
// defer rwLock.RUnlock()
// if connMap, ok := ws.wsUserToConn[uid]; ok {
// if conn, flag := connMap[platform]; flag {
// return conn
// }
// }
// return nil
// }
func (ws *WServer) getUserAllCons(uid string) map[int][]*UserConn {
rwLock.RLock() rwLock.RLock()
defer rwLock.RUnlock() defer rwLock.RUnlock()
if connMap, ok := ws.wsUserToConn[uid]; ok { if connMap, ok := ws.wsUserToConn[uid]; ok {
if conn, flag := connMap[platform]; flag { newConnMap := make(map[int][]*UserConn)
return conn
}
}
return nil
}
func (ws *WServer) getUserAllCons(uid string) map[int]*UserConn {
rwLock.RLock()
defer rwLock.RUnlock()
if connMap, ok := ws.wsUserToConn[uid]; ok {
newConnMap := make(map[int]*UserConn)
for k, v := range connMap { for k, v := range connMap {
newConnMap[k] = v newConnMap[k] = v
} }