2024-04-15 14:50:10 +08:00

464 lines
13 KiB
Go

// Copyright © 2023 OpenIM. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package msggateway
import (
"context"
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/webhook"
"github.com/openimsdk/open-im-server/v3/pkg/util/memAsyncQueue"
pbAuth "github.com/openimsdk/protocol/auth"
"net/http"
"sync"
"sync/atomic"
"time"
"github.com/go-playground/validator/v10"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/protocol/msggateway"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils/stringutil"
"github.com/redis/go-redis/v9"
"golang.org/x/sync/errgroup"
)
type LongConnServer interface {
Run(done chan error) error
wsHandler(w http.ResponseWriter, r *http.Request)
GetUserAllCons(userID string) ([]*Client, bool)
GetUserPlatformCons(userID string, platform int) ([]*Client, bool, bool)
Validate(s any) error
SetCacheHandler(cache cache.TokenModel)
SetDiscoveryRegistry(client discovery.SvcDiscoveryRegistry, config *Config)
KickUserConn(client *Client) error
UnRegister(c *Client)
SetKickHandlerInfo(i *kickHandler)
Compressor
Encoder
MessageHandler
}
const (
webhookWorkerCount = 2
webhookBufferSize = 100
)
type WsServer struct {
msgGatewayConfig *Config
port int
wsMaxConnNum int64
registerChan chan *Client
unregisterChan chan *Client
kickHandlerChan chan *kickHandler
clients *UserMap
clientPool sync.Pool
onlineUserNum atomic.Int64
onlineUserConnNum atomic.Int64
handshakeTimeout time.Duration
writeBufferSize int
validate *validator.Validate
cache cache.TokenModel
userClient *rpcclient.UserRpcClient
authClient *rpcclient.Auth
disCov discovery.SvcDiscoveryRegistry
Compressor
Encoder
MessageHandler
webhookClient *webhook.Client
}
type kickHandler struct {
clientOK bool
oldClients []*Client
newClient *Client
}
func (ws *WsServer) SetDiscoveryRegistry(disCov discovery.SvcDiscoveryRegistry, config *Config) {
ws.MessageHandler = NewGrpcHandler(ws.validate, disCov, &config.Share.RpcRegisterName)
u := rpcclient.NewUserRpcClient(disCov, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID)
ws.authClient = rpcclient.NewAuth(disCov, config.Share.RpcRegisterName.Auth)
ws.userClient = &u
ws.disCov = disCov
}
func (ws *WsServer) SetUserOnlineStatus(ctx context.Context, client *Client, status int32) {
err := ws.userClient.SetUserStatus(ctx, client.UserID, status, client.PlatformID)
if err != nil {
log.ZWarn(ctx, "SetUserStatus err", err)
}
switch status {
case constant.Online:
ws.webhookAfterUserOnline(ctx, &ws.msgGatewayConfig.WebhooksConfig.AfterUserOnline, client.UserID, client.PlatformID, client.IsBackground, client.ctx.GetConnID())
case constant.Offline:
ws.webhookAfterUserOffline(ctx, &ws.msgGatewayConfig.WebhooksConfig.AfterUserOffline, client.UserID, client.PlatformID, client.ctx.GetConnID())
}
}
func (ws *WsServer) SetCacheHandler(cache cache.TokenModel) {
ws.cache = cache
}
func (ws *WsServer) UnRegister(c *Client) {
ws.unregisterChan <- c
}
func (ws *WsServer) Validate(_ any) error {
return nil
}
func (ws *WsServer) GetUserAllCons(userID string) ([]*Client, bool) {
return ws.clients.GetAll(userID)
}
func (ws *WsServer) GetUserPlatformCons(userID string, platform int) ([]*Client, bool, bool) {
return ws.clients.Get(userID, platform)
}
func NewWsServer(msgGatewayConfig *Config, opts ...Option) (*WsServer, error) {
var config configs
for _, o := range opts {
o(&config)
}
v := validator.New()
return &WsServer{
msgGatewayConfig: msgGatewayConfig,
port: config.port,
wsMaxConnNum: config.maxConnNum,
writeBufferSize: config.writeBufferSize,
handshakeTimeout: config.handshakeTimeout,
clientPool: sync.Pool{
New: func() any {
return new(Client)
},
},
registerChan: make(chan *Client, 1000),
unregisterChan: make(chan *Client, 1000),
kickHandlerChan: make(chan *kickHandler, 1000),
validate: v,
clients: newUserMap(),
Compressor: NewGzipCompressor(),
Encoder: NewGobEncoder(),
webhookClient: webhook.NewWebhookClient(msgGatewayConfig.WebhooksConfig.URL, memAsyncQueue.NewMemoryQueue(webhookWorkerCount, webhookBufferSize)),
}, nil
}
func (ws *WsServer) Run(done chan error) error {
var (
client *Client
netErr error
shutdownDone = make(chan struct{}, 1)
)
server := http.Server{Addr: ":" + stringutil.IntToString(ws.port), Handler: nil}
go func() {
for {
select {
case <-shutdownDone:
return
case client = <-ws.registerChan:
ws.registerClient(client)
case client = <-ws.unregisterChan:
ws.unregisterClient(client)
case onlineInfo := <-ws.kickHandlerChan:
ws.multiTerminalLoginChecker(onlineInfo.clientOK, onlineInfo.oldClients, onlineInfo.newClient)
}
}
}()
netDone := make(chan struct{}, 1)
go func() {
http.HandleFunc("/", ws.wsHandler)
err := server.ListenAndServe()
defer close(netDone)
if err != nil && err != http.ErrServerClosed {
netErr = errs.WrapMsg(err, "ws start err", server.Addr)
}
}()
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
var err error
select {
case err = <-done:
sErr := server.Shutdown(ctx)
if sErr != nil {
return errs.WrapMsg(sErr, "shutdown err")
}
close(shutdownDone)
if err != nil {
return err
}
case <-netDone:
}
return netErr
}
var concurrentRequest = 3
func (ws *WsServer) sendUserOnlineInfoToOtherNode(ctx context.Context, client *Client) error {
conns, err := ws.disCov.GetConns(ctx, ws.msgGatewayConfig.Share.RpcRegisterName.MessageGateway)
if err != nil {
return err
}
wg := errgroup.Group{}
wg.SetLimit(concurrentRequest)
// Online push user online message to other node
for _, v := range conns {
v := v // safe closure var
if v.Target() == ws.disCov.GetSelfConnTarget() {
log.ZDebug(ctx, "Filter out this node", "node", v.Target())
continue
}
wg.Go(func() error {
msgClient := msggateway.NewMsgGatewayClient(v)
_, err := msgClient.MultiTerminalLoginCheck(ctx, &msggateway.MultiTerminalLoginCheckReq{
UserID: client.UserID,
PlatformID: int32(client.PlatformID), Token: client.token,
})
if err != nil {
log.ZWarn(ctx, "MultiTerminalLoginCheck err", err, "node", v.Target())
}
return nil
})
}
_ = wg.Wait()
return nil
}
func (ws *WsServer) SetKickHandlerInfo(i *kickHandler) {
ws.kickHandlerChan <- i
}
func (ws *WsServer) registerClient(client *Client) {
var (
userOK bool
clientOK bool
oldClients []*Client
)
oldClients, userOK, clientOK = ws.clients.Get(client.UserID, client.PlatformID)
if !userOK {
ws.clients.Set(client.UserID, client)
log.ZDebug(client.ctx, "user not exist", "userID", client.UserID, "platformID", client.PlatformID)
prommetrics.OnlineUserGauge.Add(1)
ws.onlineUserNum.Add(1)
ws.onlineUserConnNum.Add(1)
} else {
ws.multiTerminalLoginChecker(clientOK, oldClients, client)
log.ZDebug(client.ctx, "user exist", "userID", client.UserID, "platformID", client.PlatformID)
if clientOK {
ws.clients.Set(client.UserID, client)
// There is already a connection to the platform
log.ZInfo(client.ctx, "repeat login", "userID", client.UserID, "platformID", client.PlatformID, "old remote addr", getRemoteAdders(oldClients))
ws.onlineUserConnNum.Add(1)
} else {
ws.clients.Set(client.UserID, client)
ws.onlineUserConnNum.Add(1)
}
}
wg := sync.WaitGroup{}
if ws.msgGatewayConfig.Share.Env == "zookeeper" {
wg.Add(1)
go func() {
defer wg.Done()
_ = ws.sendUserOnlineInfoToOtherNode(client.ctx, client)
}()
}
wg.Add(1)
go func() {
defer wg.Done()
ws.SetUserOnlineStatus(client.ctx, client, constant.Online)
}()
wg.Wait()
log.ZInfo(
client.ctx,
"user online",
"online user Num",
ws.onlineUserNum.Load(),
"online user conn Num",
ws.onlineUserConnNum.Load(),
)
}
func getRemoteAdders(client []*Client) string {
var ret string
for i, c := range client {
if i == 0 {
ret = c.ctx.GetRemoteAddr()
} else {
ret += "@" + c.ctx.GetRemoteAddr()
}
}
return ret
}
func (ws *WsServer) KickUserConn(client *Client) error {
ws.clients.deleteClients(client.UserID, []*Client{client})
return client.KickOnlineMessage()
}
func (ws *WsServer) multiTerminalLoginChecker(clientOK bool, oldClients []*Client, newClient *Client) {
switch ws.msgGatewayConfig.MsgGateway.MultiLoginPolicy {
case constant.DefalutNotKick:
case constant.PCAndOther:
if constant.PlatformIDToClass(newClient.PlatformID) == constant.TerminalPC {
return
}
fallthrough
case constant.AllLoginButSameTermKick:
if !clientOK {
return
}
ws.clients.deleteClients(newClient.UserID, oldClients)
for _, c := range oldClients {
err := c.KickOnlineMessage()
if err != nil {
log.ZWarn(c.ctx, "KickOnlineMessage", err)
}
}
m, err := ws.cache.GetTokensWithoutError(
newClient.ctx,
newClient.UserID,
newClient.PlatformID,
)
if err != nil && err != redis.Nil {
log.ZWarn(
newClient.ctx,
"get token from redis err",
err,
"userID",
newClient.UserID,
"platformID",
newClient.PlatformID,
)
return
}
if m == nil {
log.ZWarn(
newClient.ctx,
"m is nil",
errs.New("m is nil"),
"userID",
newClient.UserID,
"platformID",
newClient.PlatformID,
)
return
}
log.ZDebug(
newClient.ctx,
"get token from redis",
"userID",
newClient.UserID,
"platformID",
newClient.PlatformID,
"tokenMap",
m,
)
for k := range m {
if k != newClient.ctx.GetToken() {
m[k] = constant.KickedToken
}
}
log.ZDebug(newClient.ctx, "set token map is ", "token map", m, "userID",
newClient.UserID, "token", newClient.ctx.GetToken())
err = ws.cache.SetTokenMapByUidPid(newClient.ctx, newClient.UserID, newClient.PlatformID, m)
if err != nil {
log.ZWarn(newClient.ctx, "SetTokenMapByUidPid err", err, "userID", newClient.UserID, "platformID", newClient.PlatformID)
return
}
}
}
func (ws *WsServer) unregisterClient(client *Client) {
defer ws.clientPool.Put(client)
isDeleteUser := ws.clients.delete(client.UserID, client.ctx.GetRemoteAddr())
if isDeleteUser {
ws.onlineUserNum.Add(-1)
prommetrics.OnlineUserGauge.Dec()
}
ws.onlineUserConnNum.Add(-1)
ws.SetUserOnlineStatus(client.ctx, client, constant.Offline)
log.ZInfo(client.ctx, "user offline", "close reason", client.closedErr, "online user Num", ws.onlineUserNum.Load(), "online user conn Num",
ws.onlineUserConnNum.Load(),
)
}
// validateRespWithRequest checks if the response matches the expected userID and platformID.
func (ws *WsServer) validateRespWithRequest(ctx *UserConnContext, resp *pbAuth.ParseTokenResp) error {
userID := ctx.GetUserID()
platformID := stringutil.StringToInt32(ctx.GetPlatformID())
if resp.UserID != userID {
return servererrs.ErrTokenInvalid.WrapMsg(fmt.Sprintf("token uid %s != userID %s", resp.UserID, userID))
}
if resp.PlatformID != platformID {
return servererrs.ErrTokenInvalid.WrapMsg(fmt.Sprintf("token platform %d != platformID %d", resp.PlatformID, platformID))
}
return nil
}
func (ws *WsServer) wsHandler(w http.ResponseWriter, r *http.Request) {
connContext := newContext(w, r)
if ws.onlineUserConnNum.Load() >= ws.wsMaxConnNum {
httpError(connContext, servererrs.ErrConnOverMaxNumLimit.WrapMsg("over max conn num limit"))
return
}
err := connContext.ParseEssentialArgs()
if err != nil {
httpError(connContext, err)
return
}
resp, err := ws.authClient.ParseToken(connContext, connContext.GetToken())
if err != nil {
shouldSendError := connContext.ShouldSendError()
if shouldSendError {
wsLongConn := newGWebSocket(WebSocket, ws.handshakeTimeout, ws.writeBufferSize)
if err := wsLongConn.RespErrInfo(err, w, r); err == nil {
return
}
}
httpError(connContext, err)
return
}
err = ws.validateRespWithRequest(connContext, resp)
if err != nil {
httpError(connContext, err)
return
}
wsLongConn := newGWebSocket(WebSocket, ws.handshakeTimeout, ws.writeBufferSize)
if err := wsLongConn.GenerateLongConn(w, r); err != nil {
httpError(connContext, err)
return
}
client := ws.clientPool.Get().(*Client)
client.ResetClient(connContext, wsLongConn, ws)
ws.registerChan <- client
go client.readMessage()
}