From 4c509ed67e1e66424a526ec020d690e4764b4b45 Mon Sep 17 00:00:00 2001 From: Gordon <46924906+FGadvancer@users.noreply.github.com> Date: Fri, 25 Aug 2023 11:14:35 +0800 Subject: [PATCH] fix: multiple gateway kick user each other. (#943) * fix: to start im or chat, ZooKeeper must be started first. * fix: msg gateway start output err info Signed-off-by: Gordon <1432970085@qq.com> * fix: msg gateway start output err info Signed-off-by: Gordon <1432970085@qq.com> * chore: package path changes Signed-off-by: withchao <993506633@qq.com> * fix: go mod update Signed-off-by: Gordon <1432970085@qq.com> * fix: token update Signed-off-by: Gordon <1432970085@qq.com> * chore: package path changes Signed-off-by: withchao <993506633@qq.com> * chore: package path changes Signed-off-by: withchao <993506633@qq.com> * fix: token update Signed-off-by: Gordon <1432970085@qq.com> * fix: token update Signed-off-by: Gordon <1432970085@qq.com> * fix: token update Signed-off-by: Gordon <1432970085@qq.com> * fix: token update Signed-off-by: Gordon <1432970085@qq.com> * fix: token update Signed-off-by: Gordon <1432970085@qq.com> * fix: token update Signed-off-by: Gordon <1432970085@qq.com> * fix: get all userID Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: msggateway add online status call Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * refactor: log change Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * refactor: log change Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * chore: network mode change Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * cicd: robot automated Change Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> * cicd: robot automated Change Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> * cicd: robot automated Change Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> * feat: add api of get server time Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * feat: remove go work sum Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * cicd: robot automated Change Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> * fix: pull message add isRead field Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: check msg-transfer script Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: script update Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: start don't kill old process Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * cicd: robot automated Change Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> * fix: check component Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: pull message set isRead only message come from single. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * cicd: robot automated Change Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> * fix: multiple gateway kick user each other. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: multiple gateway kick user each other. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: multiple gateway kick user each other. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: multiple gateway kick user each other. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: multiple gateway kick user each other. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * fix: multiple gateway kick user each other. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> * cicd: robot automated Change Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> * fix: multiple gateway kick user each other. Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> --------- Signed-off-by: Gordon <1432970085@qq.com> Signed-off-by: withchao <993506633@qq.com> Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com> Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> Co-authored-by: withchao <993506633@qq.com> Co-authored-by: Xinwei Xiong <3293172751NSS@gmail.com> Co-authored-by: FGadvancer --- go.mod | 2 +- go.sum | 4 +-- internal/msggateway/callback.go | 8 +++--- internal/msggateway/context.go | 12 ++++++++ internal/msggateway/hub_server.go | 24 +++++++++++++--- internal/msggateway/n_ws_server.go | 45 ++++++++++++++++++++++++++---- internal/rpc/auth/auth.go | 2 +- 7 files changed, 80 insertions(+), 17 deletions(-) diff --git a/go.mod b/go.mod index 01ff50b18..d282b0960 100644 --- a/go.mod +++ b/go.mod @@ -38,7 +38,7 @@ require github.com/google/uuid v1.3.0 require ( github.com/OpenIMSDK/protocol v0.0.14 - github.com/OpenIMSDK/tools v0.0.13 + github.com/OpenIMSDK/tools v0.0.14 github.com/aliyun/aliyun-oss-go-sdk v2.2.8+incompatible github.com/go-redis/redis v6.15.9+incompatible github.com/go-sql-driver/mysql v1.7.1 diff --git a/go.sum b/go.sum index 26e009e28..9f1c2d072 100644 --- a/go.sum +++ b/go.sum @@ -19,8 +19,8 @@ firebase.google.com/go v3.13.0+incompatible/go.mod h1:xlah6XbEyW6tbfSklcfe5FHJIw github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/OpenIMSDK/protocol v0.0.14 h1:cvQ3f8MTcyYygAnZ7Exq6zIbvHGCEV0fWdpzjQEDDBQ= github.com/OpenIMSDK/protocol v0.0.14/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y= -github.com/OpenIMSDK/tools v0.0.13 h1:rcw4HS8S2DPZR9UOBxD8/ol9UBMzXBypzOVEytDRIMo= -github.com/OpenIMSDK/tools v0.0.13/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI= +github.com/OpenIMSDK/tools v0.0.14 h1:WLof/+WxyPyRST+QkoTKubYCiV73uCLiL8pgnpH/yKQ= +github.com/OpenIMSDK/tools v0.0.14/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI= github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM= github.com/Shopify/sarama v1.29.0 h1:ARid8o8oieau9XrHI55f/L3EoRAhm9px6sonbD7yuUE= github.com/Shopify/sarama v1.29.0/go.mod h1:2QpgD79wpdAESqNQMxNc0KYMkycd4slxGdV3TWSVqrU= diff --git a/internal/msggateway/callback.go b/internal/msggateway/callback.go index 11fdd0298..b2999f385 100644 --- a/internal/msggateway/callback.go +++ b/internal/msggateway/callback.go @@ -26,7 +26,7 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/common/http" ) -func url() string { +func callBackURL() string { return config.Config.Callback.CallbackUrl } @@ -49,7 +49,7 @@ func CallbackUserOnline(ctx context.Context, userID string, platformID int, isAp ConnID: connID, } resp := cbapi.CommonCallbackResp{} - return http.CallBackPostReturn(ctx, url(), &req, &resp, config.Config.Callback.CallbackUserOnline) + return http.CallBackPostReturn(ctx, callBackURL(), &req, &resp, config.Config.Callback.CallbackUserOnline) } func CallbackUserOffline(ctx context.Context, userID string, platformID int, connID string) error { @@ -70,7 +70,7 @@ func CallbackUserOffline(ctx context.Context, userID string, platformID int, con ConnID: connID, } resp := &cbapi.CallbackUserOfflineResp{} - return http.CallBackPostReturn(ctx, url(), req, resp, config.Config.Callback.CallbackUserOffline) + return http.CallBackPostReturn(ctx, callBackURL(), req, resp, config.Config.Callback.CallbackUserOffline) } func CallbackUserKickOff(ctx context.Context, userID string, platformID int) error { @@ -90,7 +90,7 @@ func CallbackUserKickOff(ctx context.Context, userID string, platformID int) err Seq: time.Now().UnixMilli(), } resp := &cbapi.CommonCallbackResp{} - return http.CallBackPostReturn(ctx, url(), req, resp, config.Config.Callback.CallbackUserOffline) + return http.CallBackPostReturn(ctx, callBackURL(), req, resp, config.Config.Callback.CallbackUserOffline) } // func callbackUserOnline(operationID, userID string, platformID int, token string, isAppBackground bool, connID diff --git a/internal/msggateway/context.go b/internal/msggateway/context.go index 601c28a34..1d32ff71c 100644 --- a/internal/msggateway/context.go +++ b/internal/msggateway/context.go @@ -16,6 +16,7 @@ package msggateway import ( "net/http" + "net/url" "strconv" "time" @@ -71,6 +72,11 @@ func newContext(respWriter http.ResponseWriter, req *http.Request) *UserConnCont ConnID: utils.Md5(req.RemoteAddr + "_" + strconv.Itoa(int(utils.GetCurrentTimestampByMill()))), } } +func newTempContext() *UserConnContext { + return &UserConnContext{ + Req: &http.Request{URL: &url.URL{}}, + } +} func (c *UserConnContext) GetRemoteAddr() string { return c.RemoteAddr @@ -116,9 +122,15 @@ func (c *UserConnContext) GetOperationID() string { return c.Req.URL.Query().Get(OperationID) } +func (c *UserConnContext) SetOperationID(operationID string) { + c.Req.URL.Query().Set(OperationID, operationID) +} func (c *UserConnContext) GetToken() string { return c.Req.URL.Query().Get(Token) } +func (c *UserConnContext) SetToken(token string) { + c.Req.URL.RawQuery = Token + "=" + token +} func (c *UserConnContext) GetBackground() bool { b, err := strconv.ParseBool(c.Req.URL.Query().Get(BackgroundStatus)) diff --git a/internal/msggateway/hub_server.go b/internal/msggateway/hub_server.go index d57d41aec..1ebbb5902 100644 --- a/internal/msggateway/hub_server.go +++ b/internal/msggateway/hub_server.go @@ -17,6 +17,8 @@ package msggateway import ( "context" + "github.com/OpenIMSDK/tools/mcontext" + "github.com/OpenIMSDK/Open-IM-Server/pkg/authverify" "github.com/OpenIMSDK/tools/errs" @@ -35,13 +37,13 @@ import ( "github.com/OpenIMSDK/Open-IM-Server/pkg/common/startrpc" ) -func (s *Server) InitServer(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error { +func (s *Server) InitServer(disCov discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error { rdb, err := cache.NewRedis() if err != nil { return err } msgModel := cache.NewMsgCacheModel(rdb) - s.LongConnServer.SetDiscoveryRegistry(client) + s.LongConnServer.SetDiscoveryRegistry(disCov) s.LongConnServer.SetCacheHandler(msgModel) msggateway.RegisterMsgGatewayServer(server, s) return nil @@ -198,6 +200,20 @@ func (s *Server) MultiTerminalLoginCheck( ctx context.Context, req *msggateway.MultiTerminalLoginCheckReq, ) (*msggateway.MultiTerminalLoginCheckResp, error) { - // TODO implement me - panic("implement me") + if oldClients, userOK, clientOK := s.LongConnServer.GetUserPlatformCons(req.UserID, int(req.PlatformID)); userOK { + tempUserCtx := newTempContext() + tempUserCtx.SetToken(req.Token) + tempUserCtx.SetOperationID(mcontext.GetOperationID(ctx)) + client := &Client{} + client.ctx = tempUserCtx + client.UserID = req.UserID + client.PlatformID = int(req.PlatformID) + i := &kickHandler{ + clientOK: clientOK, + oldClients: oldClients, + newClient: client, + } + s.LongConnServer.SetKickHandlerInfo(i) + } + return &msggateway.MultiTerminalLoginCheckResp{}, nil } diff --git a/internal/msggateway/n_ws_server.go b/internal/msggateway/n_ws_server.go index ee8853af6..124898f7a 100644 --- a/internal/msggateway/n_ws_server.go +++ b/internal/msggateway/n_ws_server.go @@ -23,6 +23,8 @@ import ( "sync/atomic" "time" + "github.com/OpenIMSDK/protocol/msggateway" + "github.com/OpenIMSDK/Open-IM-Server/pkg/authverify" "github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient" @@ -52,6 +54,7 @@ type LongConnServer interface { SetDiscoveryRegistry(client discoveryregistry.SvcDiscoveryRegistry) KickUserConn(client *Client) error UnRegister(c *Client) + SetKickHandlerInfo(i *kickHandler) Compressor Encoder MessageHandler @@ -78,6 +81,7 @@ type WsServer struct { validate *validator.Validate cache cache.MsgModel userClient *rpcclient.UserRpcClient + disCov discoveryregistry.SvcDiscoveryRegistry Compressor Encoder MessageHandler @@ -88,10 +92,11 @@ type kickHandler struct { newClient *Client } -func (ws *WsServer) SetDiscoveryRegistry(client discoveryregistry.SvcDiscoveryRegistry) { - ws.MessageHandler = NewGrpcHandler(ws.validate, client) - u := rpcclient.NewUserRpcClient(client) +func (ws *WsServer) SetDiscoveryRegistry(disCov discoveryregistry.SvcDiscoveryRegistry) { + ws.MessageHandler = NewGrpcHandler(ws.validate, disCov) + u := rpcclient.NewUserRpcClient(disCov) ws.userClient = &u + ws.disCov = disCov } func (ws *WsServer) SetUserOnlineStatus(ctx context.Context, client *Client, status int32) { @@ -180,6 +185,31 @@ func (ws *WsServer) Run() error { return http.ListenAndServe(":"+utils.IntToString(ws.port), nil) // Start listening } +func (ws *WsServer) sendUserOnlineInfoToOtherNode(ctx context.Context, client *Client) error { + conns, err := ws.disCov.GetConns(ctx, config.Config.RpcRegisterName.OpenImMessageGatewayName) + if err != nil { + return err + } + // Online push user online message to other node + for _, v := range conns { + if v.Target() == ws.disCov.GetSelfConnTarget() { + log.ZDebug(ctx, "Filter out this node", "node", v.Target()) + continue + } + msgClient := msggateway.NewMsgGatewayClient(v) + _, err := msgClient.MultiTerminalLoginCheck(ctx, &msggateway.MultiTerminalLoginCheckReq{UserID: client.UserID, + PlatformID: int32(client.PlatformID), Token: client.token}) + if err != nil { + log.ZWarn(ctx, "MultiTerminalLoginCheck err", err, "node", v.Target()) + continue + } + } + return nil +} +func (ws *WsServer) SetKickHandlerInfo(i *kickHandler) { + ws.kickHandlerChan <- i +} + func (ws *WsServer) registerClient(client *Client) { var ( userOK bool @@ -211,6 +241,7 @@ func (ws *WsServer) registerClient(client *Client) { atomic.AddInt64(&ws.onlineUserConnNum, 1) } } + ws.sendUserOnlineInfoToOtherNode(client.ctx, client) ws.SetUserOnlineStatus(client.ctx, client, constant.Online) log.ZInfo( client.ctx, @@ -249,7 +280,10 @@ func (ws *WsServer) multiTerminalLoginChecker(clientOK bool, oldClients []*Clien fallthrough case constant.AllLoginButSameTermKick: if clientOK { - ws.clients.deleteClients(newClient.UserID, oldClients) + isDeleteUser := ws.clients.deleteClients(newClient.UserID, oldClients) + if isDeleteUser { + atomic.AddInt64(&ws.onlineUserNum, -1) + } for _, c := range oldClients { err := c.KickOnlineMessage() if err != nil { @@ -301,7 +335,8 @@ func (ws *WsServer) multiTerminalLoginChecker(clientOK bool, oldClients []*Clien m[k] = constant.KickedToken } } - log.ZDebug(newClient.ctx, "set token map is ", "token map", m, "userID", newClient.UserID) + log.ZDebug(newClient.ctx, "set token map is ", "token map", m, "userID", + newClient.UserID, "token", newClient.ctx.GetToken()) err = ws.cache.SetTokenMapByUidPid(newClient.ctx, newClient.UserID, newClient.PlatformID, m) if err != nil { log.ZWarn(newClient.ctx, "SetTokenMapByUidPid err", err, "userID", newClient.UserID, "platformID", newClient.PlatformID) diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index 1babb0ca5..c1fa0d6e6 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -134,7 +134,7 @@ func (s *authServer) forceKickOff(ctx context.Context, userID string, platformID return err } for _, v := range conns { - log.ZDebug(ctx, "forceKickOff", "conn", v.(*grpc.ClientConn).Target()) + log.ZDebug(ctx, "forceKickOff", "conn", v.Target()) } for _, v := range conns { client := msggateway.NewMsgGatewayClient(v)