fix: multiple gateway kick user each other. (#943)

* fix: to start im or chat, ZooKeeper must be started first.

* fix: msg gateway start output err info

Signed-off-by: Gordon <1432970085@qq.com>

* fix: msg gateway start output err info

Signed-off-by: Gordon <1432970085@qq.com>

* chore: package path changes

Signed-off-by: withchao <993506633@qq.com>

* fix: go mod update

Signed-off-by: Gordon <1432970085@qq.com>

* fix: token update

Signed-off-by: Gordon <1432970085@qq.com>

* chore: package path changes

Signed-off-by: withchao <993506633@qq.com>

* chore: package path changes

Signed-off-by: withchao <993506633@qq.com>

* fix: token update

Signed-off-by: Gordon <1432970085@qq.com>

* fix: token update

Signed-off-by: Gordon <1432970085@qq.com>

* fix: token update

Signed-off-by: Gordon <1432970085@qq.com>

* fix: token update

Signed-off-by: Gordon <1432970085@qq.com>

* fix: token update

Signed-off-by: Gordon <1432970085@qq.com>

* fix: token update

Signed-off-by: Gordon <1432970085@qq.com>

* fix: get all userID

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: msggateway add online status call

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* refactor: log change

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* refactor: log change

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* chore: network mode change

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* cicd: robot automated Change

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

* cicd: robot automated Change

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

* cicd: robot automated Change

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

* feat: add api of get server time

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* feat: remove go work sum

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* cicd: robot automated Change

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

* fix: pull message add isRead field

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: check msg-transfer script

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix:  script update

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: start don't kill old process

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* cicd: robot automated Change

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

* fix: check component

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: pull message set isRead only message come from single.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* cicd: robot automated Change

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

* fix: multiple gateway kick user each other.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: multiple gateway kick user each other.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: multiple gateway kick user each other.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: multiple gateway kick user each other.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: multiple gateway kick user each other.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* fix: multiple gateway kick user each other.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

* cicd: robot automated Change

Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

* fix: multiple gateway kick user each other.

Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>

---------

Signed-off-by: Gordon <1432970085@qq.com>
Signed-off-by: withchao <993506633@qq.com>
Signed-off-by: Gordon <46924906+FGadvancer@users.noreply.github.com>
Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: withchao <993506633@qq.com>
Co-authored-by: Xinwei Xiong <3293172751NSS@gmail.com>
Co-authored-by: FGadvancer <FGadvancer@users.noreply.github.com>
This commit is contained in:
Gordon 2023-08-25 11:14:35 +08:00 committed by GitHub
parent 19343fef62
commit 4c509ed67e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 80 additions and 17 deletions

2
go.mod
View File

@ -38,7 +38,7 @@ require github.com/google/uuid v1.3.0
require ( require (
github.com/OpenIMSDK/protocol v0.0.14 github.com/OpenIMSDK/protocol v0.0.14
github.com/OpenIMSDK/tools v0.0.13 github.com/OpenIMSDK/tools v0.0.14
github.com/aliyun/aliyun-oss-go-sdk v2.2.8+incompatible github.com/aliyun/aliyun-oss-go-sdk v2.2.8+incompatible
github.com/go-redis/redis v6.15.9+incompatible github.com/go-redis/redis v6.15.9+incompatible
github.com/go-sql-driver/mysql v1.7.1 github.com/go-sql-driver/mysql v1.7.1

4
go.sum
View File

@ -19,8 +19,8 @@ firebase.google.com/go v3.13.0+incompatible/go.mod h1:xlah6XbEyW6tbfSklcfe5FHJIw
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/OpenIMSDK/protocol v0.0.14 h1:cvQ3f8MTcyYygAnZ7Exq6zIbvHGCEV0fWdpzjQEDDBQ= github.com/OpenIMSDK/protocol v0.0.14 h1:cvQ3f8MTcyYygAnZ7Exq6zIbvHGCEV0fWdpzjQEDDBQ=
github.com/OpenIMSDK/protocol v0.0.14/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y= github.com/OpenIMSDK/protocol v0.0.14/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y=
github.com/OpenIMSDK/tools v0.0.13 h1:rcw4HS8S2DPZR9UOBxD8/ol9UBMzXBypzOVEytDRIMo= github.com/OpenIMSDK/tools v0.0.14 h1:WLof/+WxyPyRST+QkoTKubYCiV73uCLiL8pgnpH/yKQ=
github.com/OpenIMSDK/tools v0.0.13/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI= github.com/OpenIMSDK/tools v0.0.14/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI=
github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM= github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM=
github.com/Shopify/sarama v1.29.0 h1:ARid8o8oieau9XrHI55f/L3EoRAhm9px6sonbD7yuUE= github.com/Shopify/sarama v1.29.0 h1:ARid8o8oieau9XrHI55f/L3EoRAhm9px6sonbD7yuUE=
github.com/Shopify/sarama v1.29.0/go.mod h1:2QpgD79wpdAESqNQMxNc0KYMkycd4slxGdV3TWSVqrU= github.com/Shopify/sarama v1.29.0/go.mod h1:2QpgD79wpdAESqNQMxNc0KYMkycd4slxGdV3TWSVqrU=

View File

@ -26,7 +26,7 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/http" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/http"
) )
func url() string { func callBackURL() string {
return config.Config.Callback.CallbackUrl return config.Config.Callback.CallbackUrl
} }
@ -49,7 +49,7 @@ func CallbackUserOnline(ctx context.Context, userID string, platformID int, isAp
ConnID: connID, ConnID: connID,
} }
resp := cbapi.CommonCallbackResp{} resp := cbapi.CommonCallbackResp{}
return http.CallBackPostReturn(ctx, url(), &req, &resp, config.Config.Callback.CallbackUserOnline) return http.CallBackPostReturn(ctx, callBackURL(), &req, &resp, config.Config.Callback.CallbackUserOnline)
} }
func CallbackUserOffline(ctx context.Context, userID string, platformID int, connID string) error { func CallbackUserOffline(ctx context.Context, userID string, platformID int, connID string) error {
@ -70,7 +70,7 @@ func CallbackUserOffline(ctx context.Context, userID string, platformID int, con
ConnID: connID, ConnID: connID,
} }
resp := &cbapi.CallbackUserOfflineResp{} resp := &cbapi.CallbackUserOfflineResp{}
return http.CallBackPostReturn(ctx, url(), req, resp, config.Config.Callback.CallbackUserOffline) return http.CallBackPostReturn(ctx, callBackURL(), req, resp, config.Config.Callback.CallbackUserOffline)
} }
func CallbackUserKickOff(ctx context.Context, userID string, platformID int) error { func CallbackUserKickOff(ctx context.Context, userID string, platformID int) error {
@ -90,7 +90,7 @@ func CallbackUserKickOff(ctx context.Context, userID string, platformID int) err
Seq: time.Now().UnixMilli(), Seq: time.Now().UnixMilli(),
} }
resp := &cbapi.CommonCallbackResp{} resp := &cbapi.CommonCallbackResp{}
return http.CallBackPostReturn(ctx, url(), req, resp, config.Config.Callback.CallbackUserOffline) return http.CallBackPostReturn(ctx, callBackURL(), req, resp, config.Config.Callback.CallbackUserOffline)
} }
// func callbackUserOnline(operationID, userID string, platformID int, token string, isAppBackground bool, connID // func callbackUserOnline(operationID, userID string, platformID int, token string, isAppBackground bool, connID

View File

@ -16,6 +16,7 @@ package msggateway
import ( import (
"net/http" "net/http"
"net/url"
"strconv" "strconv"
"time" "time"
@ -71,6 +72,11 @@ func newContext(respWriter http.ResponseWriter, req *http.Request) *UserConnCont
ConnID: utils.Md5(req.RemoteAddr + "_" + strconv.Itoa(int(utils.GetCurrentTimestampByMill()))), ConnID: utils.Md5(req.RemoteAddr + "_" + strconv.Itoa(int(utils.GetCurrentTimestampByMill()))),
} }
} }
func newTempContext() *UserConnContext {
return &UserConnContext{
Req: &http.Request{URL: &url.URL{}},
}
}
func (c *UserConnContext) GetRemoteAddr() string { func (c *UserConnContext) GetRemoteAddr() string {
return c.RemoteAddr return c.RemoteAddr
@ -116,9 +122,15 @@ func (c *UserConnContext) GetOperationID() string {
return c.Req.URL.Query().Get(OperationID) return c.Req.URL.Query().Get(OperationID)
} }
func (c *UserConnContext) SetOperationID(operationID string) {
c.Req.URL.Query().Set(OperationID, operationID)
}
func (c *UserConnContext) GetToken() string { func (c *UserConnContext) GetToken() string {
return c.Req.URL.Query().Get(Token) return c.Req.URL.Query().Get(Token)
} }
func (c *UserConnContext) SetToken(token string) {
c.Req.URL.RawQuery = Token + "=" + token
}
func (c *UserConnContext) GetBackground() bool { func (c *UserConnContext) GetBackground() bool {
b, err := strconv.ParseBool(c.Req.URL.Query().Get(BackgroundStatus)) b, err := strconv.ParseBool(c.Req.URL.Query().Get(BackgroundStatus))

View File

@ -17,6 +17,8 @@ package msggateway
import ( import (
"context" "context"
"github.com/OpenIMSDK/tools/mcontext"
"github.com/OpenIMSDK/Open-IM-Server/pkg/authverify" "github.com/OpenIMSDK/Open-IM-Server/pkg/authverify"
"github.com/OpenIMSDK/tools/errs" "github.com/OpenIMSDK/tools/errs"
@ -35,13 +37,13 @@ import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/startrpc" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/startrpc"
) )
func (s *Server) InitServer(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error { func (s *Server) InitServer(disCov discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) error {
rdb, err := cache.NewRedis() rdb, err := cache.NewRedis()
if err != nil { if err != nil {
return err return err
} }
msgModel := cache.NewMsgCacheModel(rdb) msgModel := cache.NewMsgCacheModel(rdb)
s.LongConnServer.SetDiscoveryRegistry(client) s.LongConnServer.SetDiscoveryRegistry(disCov)
s.LongConnServer.SetCacheHandler(msgModel) s.LongConnServer.SetCacheHandler(msgModel)
msggateway.RegisterMsgGatewayServer(server, s) msggateway.RegisterMsgGatewayServer(server, s)
return nil return nil
@ -198,6 +200,20 @@ func (s *Server) MultiTerminalLoginCheck(
ctx context.Context, ctx context.Context,
req *msggateway.MultiTerminalLoginCheckReq, req *msggateway.MultiTerminalLoginCheckReq,
) (*msggateway.MultiTerminalLoginCheckResp, error) { ) (*msggateway.MultiTerminalLoginCheckResp, error) {
// TODO implement me if oldClients, userOK, clientOK := s.LongConnServer.GetUserPlatformCons(req.UserID, int(req.PlatformID)); userOK {
panic("implement me") tempUserCtx := newTempContext()
tempUserCtx.SetToken(req.Token)
tempUserCtx.SetOperationID(mcontext.GetOperationID(ctx))
client := &Client{}
client.ctx = tempUserCtx
client.UserID = req.UserID
client.PlatformID = int(req.PlatformID)
i := &kickHandler{
clientOK: clientOK,
oldClients: oldClients,
newClient: client,
}
s.LongConnServer.SetKickHandlerInfo(i)
}
return &msggateway.MultiTerminalLoginCheckResp{}, nil
} }

View File

@ -23,6 +23,8 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/OpenIMSDK/protocol/msggateway"
"github.com/OpenIMSDK/Open-IM-Server/pkg/authverify" "github.com/OpenIMSDK/Open-IM-Server/pkg/authverify"
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient" "github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient"
@ -52,6 +54,7 @@ type LongConnServer interface {
SetDiscoveryRegistry(client discoveryregistry.SvcDiscoveryRegistry) SetDiscoveryRegistry(client discoveryregistry.SvcDiscoveryRegistry)
KickUserConn(client *Client) error KickUserConn(client *Client) error
UnRegister(c *Client) UnRegister(c *Client)
SetKickHandlerInfo(i *kickHandler)
Compressor Compressor
Encoder Encoder
MessageHandler MessageHandler
@ -78,6 +81,7 @@ type WsServer struct {
validate *validator.Validate validate *validator.Validate
cache cache.MsgModel cache cache.MsgModel
userClient *rpcclient.UserRpcClient userClient *rpcclient.UserRpcClient
disCov discoveryregistry.SvcDiscoveryRegistry
Compressor Compressor
Encoder Encoder
MessageHandler MessageHandler
@ -88,10 +92,11 @@ type kickHandler struct {
newClient *Client newClient *Client
} }
func (ws *WsServer) SetDiscoveryRegistry(client discoveryregistry.SvcDiscoveryRegistry) { func (ws *WsServer) SetDiscoveryRegistry(disCov discoveryregistry.SvcDiscoveryRegistry) {
ws.MessageHandler = NewGrpcHandler(ws.validate, client) ws.MessageHandler = NewGrpcHandler(ws.validate, disCov)
u := rpcclient.NewUserRpcClient(client) u := rpcclient.NewUserRpcClient(disCov)
ws.userClient = &u ws.userClient = &u
ws.disCov = disCov
} }
func (ws *WsServer) SetUserOnlineStatus(ctx context.Context, client *Client, status int32) { func (ws *WsServer) SetUserOnlineStatus(ctx context.Context, client *Client, status int32) {
@ -180,6 +185,31 @@ func (ws *WsServer) Run() error {
return http.ListenAndServe(":"+utils.IntToString(ws.port), nil) // Start listening return http.ListenAndServe(":"+utils.IntToString(ws.port), nil) // Start listening
} }
func (ws *WsServer) sendUserOnlineInfoToOtherNode(ctx context.Context, client *Client) error {
conns, err := ws.disCov.GetConns(ctx, config.Config.RpcRegisterName.OpenImMessageGatewayName)
if err != nil {
return err
}
// Online push user online message to other node
for _, v := range conns {
if v.Target() == ws.disCov.GetSelfConnTarget() {
log.ZDebug(ctx, "Filter out this node", "node", v.Target())
continue
}
msgClient := msggateway.NewMsgGatewayClient(v)
_, err := msgClient.MultiTerminalLoginCheck(ctx, &msggateway.MultiTerminalLoginCheckReq{UserID: client.UserID,
PlatformID: int32(client.PlatformID), Token: client.token})
if err != nil {
log.ZWarn(ctx, "MultiTerminalLoginCheck err", err, "node", v.Target())
continue
}
}
return nil
}
func (ws *WsServer) SetKickHandlerInfo(i *kickHandler) {
ws.kickHandlerChan <- i
}
func (ws *WsServer) registerClient(client *Client) { func (ws *WsServer) registerClient(client *Client) {
var ( var (
userOK bool userOK bool
@ -211,6 +241,7 @@ func (ws *WsServer) registerClient(client *Client) {
atomic.AddInt64(&ws.onlineUserConnNum, 1) atomic.AddInt64(&ws.onlineUserConnNum, 1)
} }
} }
ws.sendUserOnlineInfoToOtherNode(client.ctx, client)
ws.SetUserOnlineStatus(client.ctx, client, constant.Online) ws.SetUserOnlineStatus(client.ctx, client, constant.Online)
log.ZInfo( log.ZInfo(
client.ctx, client.ctx,
@ -249,7 +280,10 @@ func (ws *WsServer) multiTerminalLoginChecker(clientOK bool, oldClients []*Clien
fallthrough fallthrough
case constant.AllLoginButSameTermKick: case constant.AllLoginButSameTermKick:
if clientOK { if clientOK {
ws.clients.deleteClients(newClient.UserID, oldClients) isDeleteUser := ws.clients.deleteClients(newClient.UserID, oldClients)
if isDeleteUser {
atomic.AddInt64(&ws.onlineUserNum, -1)
}
for _, c := range oldClients { for _, c := range oldClients {
err := c.KickOnlineMessage() err := c.KickOnlineMessage()
if err != nil { if err != nil {
@ -301,7 +335,8 @@ func (ws *WsServer) multiTerminalLoginChecker(clientOK bool, oldClients []*Clien
m[k] = constant.KickedToken m[k] = constant.KickedToken
} }
} }
log.ZDebug(newClient.ctx, "set token map is ", "token map", m, "userID", newClient.UserID) log.ZDebug(newClient.ctx, "set token map is ", "token map", m, "userID",
newClient.UserID, "token", newClient.ctx.GetToken())
err = ws.cache.SetTokenMapByUidPid(newClient.ctx, newClient.UserID, newClient.PlatformID, m) err = ws.cache.SetTokenMapByUidPid(newClient.ctx, newClient.UserID, newClient.PlatformID, m)
if err != nil { if err != nil {
log.ZWarn(newClient.ctx, "SetTokenMapByUidPid err", err, "userID", newClient.UserID, "platformID", newClient.PlatformID) log.ZWarn(newClient.ctx, "SetTokenMapByUidPid err", err, "userID", newClient.UserID, "platformID", newClient.PlatformID)

View File

@ -134,7 +134,7 @@ func (s *authServer) forceKickOff(ctx context.Context, userID string, platformID
return err return err
} }
for _, v := range conns { for _, v := range conns {
log.ZDebug(ctx, "forceKickOff", "conn", v.(*grpc.ClientConn).Target()) log.ZDebug(ctx, "forceKickOff", "conn", v.Target())
} }
for _, v := range conns { for _, v := range conns {
client := msggateway.NewMsgGatewayClient(v) client := msggateway.NewMsgGatewayClient(v)