From 7e48d1a00f3f7a7cc7b2d911db3b8d222d3c75a9 Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Thu, 2 Mar 2023 18:47:11 +0800 Subject: [PATCH] ws support same terminal login --- internal/msg_gateway/gate/relay_rpc_server.go | 306 +++++++++--------- internal/msg_gateway/gate/ws_server.go | 131 ++++---- 2 files changed, 226 insertions(+), 211 deletions(-) diff --git a/internal/msg_gateway/gate/relay_rpc_server.go b/internal/msg_gateway/gate/relay_rpc_server.go index 89a63a293..8e7589ab8 100644 --- a/internal/msg_gateway/gate/relay_rpc_server.go +++ b/internal/msg_gateway/gate/relay_rpc_server.go @@ -101,47 +101,48 @@ func (r *RPCServer) run() { } } func (r *RPCServer) OnlinePushMsg(_ context.Context, in *pbRelay.OnlinePushMsgReq) (*pbRelay.OnlinePushMsgResp, error) { - log.NewInfo(in.OperationID, "PushMsgToUser is arriving", in.String()) - var resp []*pbRelay.SingleMsgToUserPlatform - msgBytes, _ := proto.Marshal(in.MsgData) - mReply := Resp{ - ReqIdentifier: constant.WSPushMsg, - OperationID: in.OperationID, - Data: msgBytes, - } - var replyBytes bytes.Buffer - enc := gob.NewEncoder(&replyBytes) - err := enc.Encode(mReply) - if err != nil { - log.NewError(in.OperationID, "data encode err", err.Error()) - } - var tag bool - recvID := in.PushToUserID - for _, v := range r.platformList { - if conn := ws.getUserConn(recvID, v); conn != nil { - tag = true - resultCode := sendMsgToUser(conn, replyBytes.Bytes(), in, v, recvID) - temp := &pbRelay.SingleMsgToUserPlatform{ - ResultCode: resultCode, - RecvID: recvID, - RecvPlatFormID: int32(v), - } - resp = append(resp, temp) - } else { - temp := &pbRelay.SingleMsgToUserPlatform{ - ResultCode: -1, - RecvID: recvID, - RecvPlatFormID: int32(v), - } - resp = append(resp, temp) - } - } - if !tag { - log.NewDebug(in.OperationID, "push err ,no matched ws conn not in map", in.String()) - } - return &pbRelay.OnlinePushMsgResp{ - Resp: resp, - }, nil + //log.NewInfo(in.OperationID, "PushMsgToUser is arriving", in.String()) + //var resp []*pbRelay.SingleMsgToUserPlatform + //msgBytes, _ := proto.Marshal(in.MsgData) + //mReply := Resp{ + // ReqIdentifier: constant.WSPushMsg, + // OperationID: in.OperationID, + // Data: msgBytes, + //} + //var replyBytes bytes.Buffer + //enc := gob.NewEncoder(&replyBytes) + //err := enc.Encode(mReply) + //if err != nil { + // log.NewError(in.OperationID, "data encode err", err.Error()) + //} + //var tag bool + //recvID := in.PushToUserID + //for _, v := range r.platformList { + // if conn := ws.getUserConn(recvID, v); conn != nil { + // tag = true + // resultCode := sendMsgToUser(conn, replyBytes.Bytes(), in, v, recvID) + // temp := &pbRelay.SingleMsgToUserPlatform{ + // ResultCode: resultCode, + // RecvID: recvID, + // RecvPlatFormID: int32(v), + // } + // resp = append(resp, temp) + // } else { + // temp := &pbRelay.SingleMsgToUserPlatform{ + // ResultCode: -1, + // RecvID: recvID, + // RecvPlatFormID: int32(v), + // } + // resp = append(resp, temp) + // } + //} + //if !tag { + // log.NewDebug(in.OperationID, "push err ,no matched ws conn not in map", in.String()) + //} + //return &pbRelay.OnlinePushMsgResp{ + // Resp: resp, + //}, nil + return nil, nil } func (r *RPCServer) GetUsersOnlineStatus(_ context.Context, req *pbRelay.GetUsersOnlineStatusReq) (*pbRelay.GetUsersOnlineStatusResp, error) { log.NewInfo(req.OperationID, "rpc GetUsersOnlineStatus arrived server", req.String()) @@ -154,13 +155,13 @@ func (r *RPCServer) GetUsersOnlineStatus(_ context.Context, req *pbRelay.GetUser temp := new(pbRelay.GetUsersOnlineStatusResp_SuccessResult) temp.UserID = userID userConnMap := ws.getUserAllCons(userID) - for platform, userConn := range userConnMap { - if userConn != nil { + for platform, userConns := range userConnMap { + if len(userConns) != 0 { ps := new(pbRelay.GetUsersOnlineStatusResp_SuccessDetail) ps.Platform = constant.PlatformIDToName(platform) ps.Status = constant.OnlineStatus - ps.ConnID = userConn.connID - ps.IsBackground = userConn.IsBackground + ps.ConnID = userConns[0].connID + ps.IsBackground = userConns[0].IsBackground temp.Status = constant.OnlineStatus temp.DetailPlatformStatus = append(temp.DetailPlatformStatus, ps) } @@ -196,25 +197,28 @@ func (r *RPCServer) SuperGroupOnlineBatchPushOneMsg(_ context.Context, req *pbRe UserID: v, } userConnMap := ws.getUserAllCons(v) - for platform, userConn := range userConnMap { - if userConn != nil { - temp := &pbRelay.SingleMsgToUserPlatform{ - RecvID: v, - RecvPlatFormID: int32(platform), - } - if !userConn.IsBackground { - resultCode := sendMsgBatchToUser(userConn, replyBytes.Bytes(), req, platform, v) - if resultCode == 0 && utils.IsContainInt(platform, r.pushTerminal) { - tempT.OnlinePush = true - promePkg.PromeInc(promePkg.MsgOnlinePushSuccessCounter) - log.Info(req.OperationID, "PushSuperMsgToUser is success By Ws", "args", req.String(), "recvPlatForm", constant.PlatformIDToName(platform), "recvID", v) - temp.ResultCode = resultCode + for platform, userConns := range userConnMap { + if len(userConns) != 0 { + for _, userConn := range userConns { + temp := &pbRelay.SingleMsgToUserPlatform{ + RecvID: v, + RecvPlatFormID: int32(platform), + } + if !userConn.IsBackground { + resultCode := sendMsgBatchToUser(userConn, replyBytes.Bytes(), req, platform, v) + if resultCode == 0 && utils.IsContainInt(platform, r.pushTerminal) { + tempT.OnlinePush = true + promePkg.PromeInc(promePkg.MsgOnlinePushSuccessCounter) + log.Info(req.OperationID, "PushSuperMsgToUser is success By Ws", "args", req.String(), "recvPlatForm", constant.PlatformIDToName(platform), "recvID", v) + temp.ResultCode = resultCode + resp = append(resp, temp) + } + } else { + temp.ResultCode = -2 resp = append(resp, temp) } - } else { - temp.ResultCode = -2 - resp = append(resp, temp) } + } } tempT.Resp = resp @@ -247,22 +251,28 @@ func (r *RPCServer) SuperGroupBackgroundOnlinePush(_ context.Context, req *pbRel UserID: v, } userConnMap := ws.getUserAllCons(v) - for platform, userConn := range userConnMap { - if userConn != nil && userConn.IsBackground { - temp := &pbRelay.SingleMsgToUserPlatform{ - RecvID: v, - RecvPlatFormID: int32(platform), - } - if constant.PlatformIDToClass(int(userConn.PlatformID)) == constant.TerminalPC || userConn.PlatformID == constant.WebPlatformID { - resultCode := sendMsgBatchToUser(userConn, replyBytes.Bytes(), req, platform, v) - if resultCode == 0 && utils.IsContainInt(platform, r.pushTerminal) { - tempT.OnlinePush = true - promePkg.PromeInc(promePkg.MsgOnlinePushSuccessCounter) - log.Info(req.OperationID, "PushSuperMsgToUser is success By Ws", "args", req.String(), "recvPlatForm", constant.PlatformIDToName(platform), "recvID", v) - temp.ResultCode = resultCode + for platform, userConns := range userConnMap { + if len(userConns) != 0 { + for _, userConn := range userConns { + temp := &pbRelay.SingleMsgToUserPlatform{ + RecvID: v, + RecvPlatFormID: int32(platform), + } + if !userConn.IsBackground { + resultCode := sendMsgBatchToUser(userConn, replyBytes.Bytes(), req, platform, v) + if resultCode == 0 && utils.IsContainInt(platform, r.pushTerminal) { + tempT.OnlinePush = true + promePkg.PromeInc(promePkg.MsgOnlinePushSuccessCounter) + log.Info(req.OperationID, "PushSuperMsgToUser is success By Ws", "args", req.String(), "recvPlatForm", constant.PlatformIDToName(platform), "recvID", v) + temp.ResultCode = resultCode + resp = append(resp, temp) + } + } else { + temp.ResultCode = -2 resp = append(resp, temp) } } + } } tempT.Resp = resp @@ -274,76 +284,77 @@ func (r *RPCServer) SuperGroupBackgroundOnlinePush(_ context.Context, req *pbRel }, nil } func (r *RPCServer) OnlineBatchPushOneMsg(_ context.Context, req *pbRelay.OnlineBatchPushOneMsgReq) (*pbRelay.OnlineBatchPushOneMsgResp, error) { - log.NewInfo(req.OperationID, "BatchPushMsgToUser is arriving", req.String()) - var singleUserResult []*pbRelay.SingelMsgToUserResultList - - for _, v := range req.PushToUserIDList { - var resp []*pbRelay.SingleMsgToUserPlatform - tempT := &pbRelay.SingelMsgToUserResultList{ - UserID: v, - } - userConnMap := ws.getUserAllCons(v) - var platformList []int - for k, _ := range userConnMap { - platformList = append(platformList, k) - } - log.Debug(req.OperationID, "GetSingleUserMsgForPushPlatforms begin", req.MsgData.Seq, v, platformList, req.MsgData.String()) - needPushMapList := r.GetSingleUserMsgForPushPlatforms(req.OperationID, req.MsgData, v, platformList) - log.Debug(req.OperationID, "GetSingleUserMsgForPushPlatforms end", req.MsgData.Seq, v, platformList, len(needPushMapList)) - for platform, list := range needPushMapList { - if list != nil { - log.Debug(req.OperationID, "needPushMapList ", "userID: ", v, "platform: ", platform, "push msg num:") - //for _, v := range list { - // log.Debug(req.OperationID, "req.MsgData.MsgDataList begin", "len: ", len(req.MsgData.MsgDataList), v.String()) - // req.MsgData.MsgDataList = append(req.MsgData.MsgDataList, v) - // log.Debug(req.OperationID, "req.MsgData.MsgDataList end", "len: ", len(req.MsgData.MsgDataList)) - //} - msgBytes, err := proto.Marshal(list) - if err != nil { - log.Error(req.OperationID, "proto marshal err", err.Error()) - continue - } - req.MsgData.MsgDataList = msgBytes - //req.MsgData.MsgDataList = append(req.MsgData.MsgDataList, v) - log.Debug(req.OperationID, "r.encodeWsData no string") - //log.Debug(req.OperationID, "r.encodeWsData data0 list ", req.MsgData.MsgDataList[0].String()) - - log.Debug(req.OperationID, "r.encodeWsData ", req.MsgData.String()) - replyBytes, err := r.encodeWsData(req.MsgData, req.OperationID) - if err != nil { - log.Error(req.OperationID, "encodeWsData failed ", req.MsgData.String()) - continue - } - log.Debug(req.OperationID, "encodeWsData", "len: ", replyBytes.Len()) - resultCode := sendMsgBatchToUser(userConnMap[platform], replyBytes.Bytes(), req, platform, v) - if resultCode == 0 && utils.IsContainInt(platform, r.pushTerminal) { - tempT.OnlinePush = true - log.Info(req.OperationID, "PushSuperMsgToUser is success By Ws", "args", req.String(), "recv PlatForm", constant.PlatformIDToName(platform), "recvID", v) - temp := &pbRelay.SingleMsgToUserPlatform{ - ResultCode: resultCode, - RecvID: v, - RecvPlatFormID: int32(platform), - } - resp = append(resp, temp) - } - } else { - if utils.IsContainInt(platform, r.pushTerminal) { - tempT.OnlinePush = true - temp := &pbRelay.SingleMsgToUserPlatform{ - ResultCode: 0, - RecvID: v, - RecvPlatFormID: int32(platform), - } - resp = append(resp, temp) - } - } - } - tempT.Resp = resp - singleUserResult = append(singleUserResult, tempT) - } - return &pbRelay.OnlineBatchPushOneMsgResp{ - SinglePushResult: singleUserResult, - }, nil + //log.NewInfo(req.OperationID, "BatchPushMsgToUser is arriving", req.String()) + //var singleUserResult []*pbRelay.SingelMsgToUserResultList + // + //for _, v := range req.PushToUserIDList { + // var resp []*pbRelay.SingleMsgToUserPlatform + // tempT := &pbRelay.SingelMsgToUserResultList{ + // UserID: v, + // } + // userConnMap := ws.getUserAllCons(v) + // var platformList []int + // for k, _ := range userConnMap { + // platformList = append(platformList, k) + // } + // log.Debug(req.OperationID, "GetSingleUserMsgForPushPlatforms begin", req.MsgData.Seq, v, platformList, req.MsgData.String()) + // needPushMapList := r.GetSingleUserMsgForPushPlatforms(req.OperationID, req.MsgData, v, platformList) + // log.Debug(req.OperationID, "GetSingleUserMsgForPushPlatforms end", req.MsgData.Seq, v, platformList, len(needPushMapList)) + // for platform, list := range needPushMapList { + // if list != nil { + // log.Debug(req.OperationID, "needPushMapList ", "userID: ", v, "platform: ", platform, "push msg num:") + // //for _, v := range list { + // // log.Debug(req.OperationID, "req.MsgData.MsgDataList begin", "len: ", len(req.MsgData.MsgDataList), v.String()) + // // req.MsgData.MsgDataList = append(req.MsgData.MsgDataList, v) + // // log.Debug(req.OperationID, "req.MsgData.MsgDataList end", "len: ", len(req.MsgData.MsgDataList)) + // //} + // msgBytes, err := proto.Marshal(list) + // if err != nil { + // log.Error(req.OperationID, "proto marshal err", err.Error()) + // continue + // } + // req.MsgData.MsgDataList = msgBytes + // //req.MsgData.MsgDataList = append(req.MsgData.MsgDataList, v) + // log.Debug(req.OperationID, "r.encodeWsData no string") + // //log.Debug(req.OperationID, "r.encodeWsData data0 list ", req.MsgData.MsgDataList[0].String()) + // + // log.Debug(req.OperationID, "r.encodeWsData ", req.MsgData.String()) + // replyBytes, err := r.encodeWsData(req.MsgData, req.OperationID) + // if err != nil { + // log.Error(req.OperationID, "encodeWsData failed ", req.MsgData.String()) + // continue + // } + // log.Debug(req.OperationID, "encodeWsData", "len: ", replyBytes.Len()) + // resultCode := sendMsgBatchToUser(userConnMap[platform], replyBytes.Bytes(), req, platform, v) + // if resultCode == 0 && utils.IsContainInt(platform, r.pushTerminal) { + // tempT.OnlinePush = true + // log.Info(req.OperationID, "PushSuperMsgToUser is success By Ws", "args", req.String(), "recv PlatForm", constant.PlatformIDToName(platform), "recvID", v) + // temp := &pbRelay.SingleMsgToUserPlatform{ + // ResultCode: resultCode, + // RecvID: v, + // RecvPlatFormID: int32(platform), + // } + // resp = append(resp, temp) + // } + // } else { + // if utils.IsContainInt(platform, r.pushTerminal) { + // tempT.OnlinePush = true + // temp := &pbRelay.SingleMsgToUserPlatform{ + // ResultCode: 0, + // RecvID: v, + // RecvPlatFormID: int32(platform), + // } + // resp = append(resp, temp) + // } + // } + // } + // tempT.Resp = resp + // singleUserResult = append(singleUserResult, tempT) + //} + //return &pbRelay.OnlineBatchPushOneMsgResp{ + // SinglePushResult: singleUserResult, + //}, nil + return nil, nil } func (r *RPCServer) encodeWsData(wsData *sdk_ws.MsgData, operationID string) (bytes.Buffer, error) { log.Debug(operationID, "encodeWsData begin", wsData.String()) @@ -374,10 +385,11 @@ func (r *RPCServer) KickUserOffline(_ context.Context, req *pbRelay.KickUserOffl log.NewWarn(req.OperationID, "SetTokenKicked ", v, req.PlatformID, req.OperationID) SetTokenKicked(v, int(req.PlatformID), req.OperationID) oldConnMap := ws.getUserAllCons(v) - if conn, ok := oldConnMap[int(req.PlatformID)]; ok { // user->map[platform->conn] + if conns, ok := oldConnMap[int(req.PlatformID)]; ok { // user->map[platform->conn] log.NewWarn(req.OperationID, "send kick msg, close connection ", req.PlatformID, v) - ws.sendKickMsg(conn, req.OperationID) - conn.Close() + for _, conn := range conns { + ws.sendKickMsg(conn, req.OperationID) + } } } return &pbRelay.KickUserOfflineResp{}, nil diff --git a/internal/msg_gateway/gate/ws_server.go b/internal/msg_gateway/gate/ws_server.go index 9025b287b..7ffe7b94b 100644 --- a/internal/msg_gateway/gate/ws_server.go +++ b/internal/msg_gateway/gate/ws_server.go @@ -45,15 +45,13 @@ type WServer struct { wsAddr string wsMaxConnNum int wsUpGrader *websocket.Upgrader - wsConnToUser map[*UserConn]map[int]string - wsUserToConn map[string]map[int]*UserConn + wsUserToConn map[string]map[int][]*UserConn } func (ws *WServer) onInit(wsPort int) { ws.wsAddr = ":" + utils.IntToString(wsPort) ws.wsMaxConnNum = config.Config.LongConnSvr.WebsocketMaxConnNum - ws.wsConnToUser = make(map[*UserConn]map[int]string) - ws.wsUserToConn = make(map[string]map[int]*UserConn) + ws.wsUserToConn = make(map[string]map[int][]*UserConn) ws.wsUpGrader = &websocket.Upgrader{ HandshakeTimeout: time.Duration(config.Config.LongConnSvr.WebsocketTimeOut) * time.Second, ReadBufferSize: config.Config.LongConnSvr.WebsocketMaxMsgLen, @@ -203,8 +201,11 @@ func (ws *WServer) MultiTerminalLoginCheckerWithLock(uid string, platformID int, fallthrough case constant.AllLoginButSameTermKick: if oldConnMap, ok := ws.wsUserToConn[uid]; ok { // user->map[platform->conn] - if oldConn, ok := oldConnMap[platformID]; ok { + if oldConns, ok := oldConnMap[platformID]; ok { log.NewDebug(operationID, uid, platformID, "kick old conn") + for _, conn := range oldConns { + ws.sendKickMsg(conn, operationID) + } m, err := db.DB.GetTokenMapByUidPid(uid, constant.PlatformIDToName(platformID)) if err != nil && err != go_redis.Nil { log.NewError(operationID, "get token from redis err", err.Error(), uid, constant.PlatformIDToName(platformID)) @@ -227,16 +228,12 @@ func (ws *WServer) MultiTerminalLoginCheckerWithLock(uid string, platformID int, log.NewError(operationID, "SetTokenMapByUidPid err", err.Error(), uid, platformID, m) return } - err = oldConn.Close() - //delete(oldConnMap, platformID) + + delete(oldConnMap, platformID) ws.wsUserToConn[uid] = oldConnMap if len(oldConnMap) == 0 { delete(ws.wsUserToConn, uid) } - delete(ws.wsConnToUser, oldConn) - if err != nil { - log.NewError(operationID, "conn close err", err.Error(), uid, platformID) - } } else { log.NewWarn(operationID, "abnormal uid-conn ", uid, platformID, oldConnMap[platformID]) } @@ -259,9 +256,11 @@ func (ws *WServer) MultiTerminalLoginChecker(uid string, platformID int, newConn fallthrough case constant.AllLoginButSameTermKick: if oldConnMap, ok := ws.wsUserToConn[uid]; ok { // user->map[platform->conn] - if oldConn, ok := oldConnMap[platformID]; ok { + if oldConns, ok := oldConnMap[platformID]; ok { log.NewDebug(operationID, uid, platformID, "kick old conn") - ws.sendKickMsg(oldConn, operationID) + for _, conn := range oldConns { + ws.sendKickMsg(conn, operationID) + } m, err := db.DB.GetTokenMapByUidPid(uid, constant.PlatformIDToName(platformID)) if err != nil && err != go_redis.Nil { log.NewError(operationID, "get token from redis err", err.Error(), uid, constant.PlatformIDToName(platformID)) @@ -284,16 +283,11 @@ func (ws *WServer) MultiTerminalLoginChecker(uid string, platformID int, newConn log.NewError(operationID, "SetTokenMapByUidPid err", err.Error(), uid, platformID, m) return } - err = oldConn.Close() delete(oldConnMap, platformID) ws.wsUserToConn[uid] = oldConnMap if len(oldConnMap) == 0 { delete(ws.wsUserToConn, uid) } - delete(ws.wsConnToUser, oldConn) - if err != nil { - log.NewError(operationID, "conn close err", err.Error(), uid, platformID) - } callbackResp := callbackUserKickOff(operationID, uid, platformID) if callbackResp.ErrCode != 0 { log.NewError(operationID, utils.GetSelfFuncName(), "callbackUserOffline failed", callbackResp) @@ -328,6 +322,11 @@ func (ws *WServer) sendKickMsg(oldConn *UserConn, operationID string) { if err != nil { log.NewError(mReply.OperationID, mReply.ReqIdentifier, mReply.ErrCode, mReply.ErrMsg, "sendKickMsg WS WriteMsg error", oldConn.RemoteAddr().String(), err.Error()) } + errClose := oldConn.Close() + if errClose != nil { + log.NewError(mReply.OperationID, mReply.ReqIdentifier, mReply.ErrCode, mReply.ErrMsg, "close old conn error", oldConn.RemoteAddr().String(), err.Error()) + + } } func (ws *WServer) addUserConn(uid string, platformID int, conn *UserConn, token string, connID, operationID string) { @@ -341,23 +340,24 @@ func (ws *WServer) addUserConn(uid string, platformID int, conn *UserConn, token go ws.MultiTerminalLoginRemoteChecker(uid, int32(platformID), token, operationID) ws.MultiTerminalLoginChecker(uid, platformID, conn, token, operationID) if oldConnMap, ok := ws.wsUserToConn[uid]; ok { - oldConnMap[platformID] = conn + if conns, ok := oldConnMap[platformID]; ok { + conns = append(conns, conn) + oldConnMap[platformID] = conns + } else { + conns := make([]*UserConn, 2) + conns = append(conns, conn) + oldConnMap[platformID] = conns + } ws.wsUserToConn[uid] = oldConnMap log.Debug(operationID, "user not first come in, add conn ", uid, platformID, conn, oldConnMap) } else { - i := make(map[int]*UserConn) - i[platformID] = conn + i := make(map[int][]*UserConn) + conns := make([]*UserConn, 2) + conns = append(conns, conn) + i[platformID] = conns ws.wsUserToConn[uid] = i log.Debug(operationID, "user first come in, new user, conn", uid, platformID, conn, ws.wsUserToConn[uid]) } - if oldStringMap, ok := ws.wsConnToUser[conn]; ok { - oldStringMap[platformID] = uid - ws.wsConnToUser[conn] = oldStringMap - } else { - i := make(map[int]string) - i[platformID] = uid - ws.wsConnToUser[conn] = i - } count := 0 for _, v := range ws.wsUserToConn { count = count + len(v) @@ -370,36 +370,39 @@ func (ws *WServer) delUserConn(conn *UserConn) { rwLock.Lock() defer rwLock.Unlock() operationID := utils.OperationIDGenerator() - var uid string - var platform int - if oldStringMap, okg := ws.wsConnToUser[conn]; okg { - for k, v := range oldStringMap { - platform = k - uid = v - } - if oldConnMap, ok := ws.wsUserToConn[uid]; ok { // only recycle self conn - if oldconn, okMap := oldConnMap[platform]; okMap { - if oldconn == conn { - delete(oldConnMap, platform) + platform := int(conn.PlatformID) + + if oldConnMap, ok := ws.wsUserToConn[conn.userID]; ok { // only recycle self conn + if oldconns, okMap := oldConnMap[platform]; okMap { + var flag bool + a := make([]*UserConn, 2) + for _, client := range oldconns { + if client != conn { + a = append(a, client) + flag = true } } - ws.wsUserToConn[uid] = oldConnMap - if len(oldConnMap) == 0 { - delete(ws.wsUserToConn, uid) + if flag { + oldConnMap[platform] = a + } else { + delete(oldConnMap, platform) } - count := 0 - for _, v := range ws.wsUserToConn { - count = count + len(v) - } - log.Debug(operationID, "WS delete operation", "", "wsUser deleted", ws.wsUserToConn, "disconnection_uid", uid, "disconnection_platform", platform, "online_user_num", len(ws.wsUserToConn), "online_conn_num", count) - } else { - log.Debug(operationID, "WS delete operation", "", "wsUser deleted", ws.wsUserToConn, "disconnection_uid", uid, "disconnection_platform", platform, "online_user_num", len(ws.wsUserToConn)) + } - delete(ws.wsConnToUser, conn) + ws.wsUserToConn[conn.userID] = oldConnMap + if len(oldConnMap) == 0 { + delete(ws.wsUserToConn, conn.userID) + } + count := 0 + for _, v := range ws.wsUserToConn { + count = count + len(v) + } + log.Debug(operationID, "WS delete operation", "", "wsUser deleted", ws.wsUserToConn, "disconnection_uid", conn.userID, "disconnection_platform", platform, "online_user_num", len(ws.wsUserToConn), "online_conn_num", count) } + err := conn.Close() if err != nil { - log.Error(operationID, " close err", "", "uid", uid, "platform", platform) + log.Error(operationID, " close err", "", "uid", conn.userID, "platform", platform) } if conn.PlatformID == 0 || conn.connID == "" { log.NewWarn(operationID, utils.GetSelfFuncName(), "PlatformID or connID is null", conn.PlatformID, conn.connID) @@ -412,21 +415,21 @@ func (ws *WServer) delUserConn(conn *UserConn) { } -func (ws *WServer) getUserConn(uid string, platform int) *UserConn { +// func (ws *WServer) getUserConn(uid string, platform int) *UserConn { +// rwLock.RLock() +// defer rwLock.RUnlock() +// if connMap, ok := ws.wsUserToConn[uid]; ok { +// if conn, flag := connMap[platform]; flag { +// return conn +// } +// } +// return nil +// } +func (ws *WServer) getUserAllCons(uid string) map[int][]*UserConn { rwLock.RLock() defer rwLock.RUnlock() if connMap, ok := ws.wsUserToConn[uid]; ok { - if conn, flag := connMap[platform]; flag { - return conn - } - } - return nil -} -func (ws *WServer) getUserAllCons(uid string) map[int]*UserConn { - rwLock.RLock() - defer rwLock.RUnlock() - if connMap, ok := ws.wsUserToConn[uid]; ok { - newConnMap := make(map[int]*UserConn) + newConnMap := make(map[int][]*UserConn) for k, v := range connMap { newConnMap[k] = v }