From e08a127213030bc7f37257a4cbc8164c939ff79a Mon Sep 17 00:00:00 2001 From: Gordon <1432970085@qq.com> Date: Thu, 23 Mar 2023 15:14:50 +0800 Subject: [PATCH] ws update --- internal/msggateway/client.go | 8 +++---- internal/msggateway/context.go | 37 +++++++++++++++++++++++++++++- internal/msggateway/hub_server.go | 2 +- internal/msggateway/n_ws_server.go | 13 ++++------- pkg/common/mcontext/ctx.go | 10 ++++++++ 5 files changed, 55 insertions(+), 15 deletions(-) diff --git a/internal/msggateway/client.go b/internal/msggateway/client.go index 3299be1ca..6e84ac25b 100644 --- a/internal/msggateway/client.go +++ b/internal/msggateway/client.go @@ -37,7 +37,7 @@ type Client struct { isCompress bool userID string isBackground bool - connID string + ctx *UserConnContext onlineAt int64 // 上线时间戳(毫秒) longConnServer LongConnServer closed bool @@ -50,7 +50,7 @@ func newClient(ctx *UserConnContext, conn LongConn, isCompress bool) *Client { platformID: utils.StringToInt(ctx.GetPlatformID()), isCompress: isCompress, userID: ctx.GetUserID(), - connID: ctx.GetConnID(), + ctx: ctx, onlineAt: utils.GetCurrentTimestampByMill(), } } @@ -60,7 +60,7 @@ func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, isCompress boo c.platformID = utils.StringToInt(ctx.GetPlatformID()) c.isCompress = isCompress c.userID = ctx.GetUserID() - c.connID = ctx.GetConnID() + c.ctx = ctx c.onlineAt = utils.GetCurrentTimestampByMill() c.longConnServer = longConnServer } @@ -120,7 +120,7 @@ func (c *Client) handleMessage(message []byte) error { return errors.New("exception conn userID not same to req userID") } ctx := context.Background() - ctx = context.WithValue(ctx, ConnID, c.connID) + ctx = context.WithValue(ctx, ConnID, c.ctx.GetConnID()) ctx = context.WithValue(ctx, OperationID, binaryReq.OperationID) ctx = context.WithValue(ctx, CommonUserID, binaryReq.SendID) ctx = context.WithValue(ctx, PlatformID, c.platformID) diff --git a/internal/msggateway/context.go b/internal/msggateway/context.go index 164b697e0..781a321ef 100644 --- a/internal/msggateway/context.go +++ b/internal/msggateway/context.go @@ -1,9 +1,11 @@ package msggateway import ( + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant" "github.com/OpenIMSDK/Open-IM-Server/pkg/utils" "net/http" "strconv" + "time" ) type UserConnContext struct { @@ -12,6 +14,35 @@ type UserConnContext struct { Path string Method string RemoteAddr string + ConnID string +} + +func (c *UserConnContext) Deadline() (deadline time.Time, ok bool) { + return +} + +func (c *UserConnContext) Done() <-chan struct{} { + return nil +} + +func (c *UserConnContext) Err() error { + return nil +} + +func (c *UserConnContext) Value(key any) any { + switch key { + case constant.OpUserID: + c.GetUserID() + case constant.OperationID: + c.GetOperationID() + case constant.ConnID: + c.GetConnID() + case constant.OpUserPlatform: + constant.PlatformIDToName(utils.StringToInt(c.GetPlatformID())) + default: + return "" + } + return "" } func newContext(respWriter http.ResponseWriter, req *http.Request) *UserConnContext { @@ -21,6 +52,7 @@ func newContext(respWriter http.ResponseWriter, req *http.Request) *UserConnCont Path: req.URL.Path, Method: req.Method, RemoteAddr: req.RemoteAddr, + ConnID: utils.Md5(req.RemoteAddr + "_" + strconv.Itoa(int(utils.GetCurrentTimestampByMill()))), } } func (c *UserConnContext) Query(key string) (string, bool) { @@ -44,7 +76,7 @@ func (c *UserConnContext) ErrReturn(error string, code int) { http.Error(c.RespWriter, error, code) } func (c *UserConnContext) GetConnID() string { - return utils.Md5(c.RemoteAddr + "_" + strconv.Itoa(int(utils.GetCurrentTimestampByMill()))) + return c.ConnID } func (c *UserConnContext) GetUserID() string { return c.Req.URL.Query().Get(WsUserID) @@ -52,3 +84,6 @@ func (c *UserConnContext) GetUserID() string { func (c *UserConnContext) GetPlatformID() string { return c.Req.URL.Query().Get(PlatformID) } +func (c *UserConnContext) GetOperationID() string { + return c.Req.URL.Query().Get(OperationID) +} diff --git a/internal/msggateway/hub_server.go b/internal/msggateway/hub_server.go index f9f7bfe8d..9f20143ef 100644 --- a/internal/msggateway/hub_server.go +++ b/internal/msggateway/hub_server.go @@ -67,7 +67,7 @@ func (s *Server) GetUsersOnlineStatus(ctx context.Context, req *msggateway.GetUs ps := new(msggateway.GetUsersOnlineStatusResp_SuccessDetail) ps.Platform = constant.PlatformIDToName(client.platformID) ps.Status = constant.OnlineStatus - ps.ConnID = client.connID + ps.ConnID = client.ctx.GetConnID() ps.IsBackground = client.isBackground temp.Status = constant.OnlineStatus temp.DetailPlatformStatus = append(temp.DetailPlatformStatus, ps) diff --git a/internal/msggateway/n_ws_server.go b/internal/msggateway/n_ws_server.go index 655ba0c89..26d3ea4fa 100644 --- a/internal/msggateway/n_ws_server.go +++ b/internal/msggateway/n_ws_server.go @@ -2,7 +2,7 @@ package msggateway import ( "errors" - "fmt" + "github.com/OpenIMSDK/Open-IM-Server/pkg/common/log" "github.com/OpenIMSDK/Open-IM-Server/pkg/common/tokenverify" "github.com/OpenIMSDK/Open-IM-Server/pkg/errs" "github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient/notification" @@ -97,7 +97,6 @@ func NewWsServer(opts ...Option) (*WsServer, error) { clients: newUserMap(), Compressor: NewGzipCompressor(), Encoder: NewGobEncoder(), - //handler: NewGrpcHandler(validate), }, nil } func (ws *WsServer) Run() error { @@ -127,8 +126,7 @@ func (ws *WsServer) registerClient(client *Client) { ws.clients.Set(client.userID, client) atomic.AddInt64(&ws.onlineUserNum, 1) atomic.AddInt64(&ws.onlineUserConnNum, 1) - fmt.Println("R在线用户数量:", ws.onlineUserNum) - fmt.Println("R在线用户连接数量:", ws.onlineUserConnNum) + } else { if clientOK { //已经有同平台的连接存在 ws.clients.Set(client.userID, client) @@ -136,11 +134,9 @@ func (ws *WsServer) registerClient(client *Client) { } else { ws.clients.Set(client.userID, client) atomic.AddInt64(&ws.onlineUserConnNum, 1) - fmt.Println("R在线用户数量:", ws.onlineUserNum) - fmt.Println("R在线用户连接数量:", ws.onlineUserConnNum) } } - + log.ZInfo(client.ctx, "user online", "online user Num", ws.onlineUserNum, "online user conn Num", ws.onlineUserConnNum) } func (ws *WsServer) multiTerminalLoginChecker(client []*Client) { @@ -153,8 +149,7 @@ func (ws *WsServer) unregisterClient(client *Client) { atomic.AddInt64(&ws.onlineUserNum, -1) } atomic.AddInt64(&ws.onlineUserConnNum, -1) - fmt.Println("R在线用户数量:", ws.onlineUserNum) - fmt.Println("R在线用户连接数量:", ws.onlineUserConnNum) + log.ZInfo(client.ctx, "user offline", "online user Num", ws.onlineUserNum, "online user conn Num", ws.onlineUserConnNum) } func (ws *WsServer) wsHandler(w http.ResponseWriter, r *http.Request) { diff --git a/pkg/common/mcontext/ctx.go b/pkg/common/mcontext/ctx.go index c1191bdaa..fe4344616 100644 --- a/pkg/common/mcontext/ctx.go +++ b/pkg/common/mcontext/ctx.go @@ -72,6 +72,16 @@ func GetTriggerID(ctx context.Context) string { } return "" } +func GetOpUserPlatform(ctx context.Context) string { + if ctx.Value(constant.OpUserPlatform) != "" { + s, ok := ctx.Value(constant.OpUserPlatform).(string) + if ok { + return s + } + } + return "" +} + func GetMustCtxInfo(ctx context.Context) (operationID, opUserID, platform, connID string, err error) { operationID, ok := ctx.Value(constant.OperationID).(string) if !ok {