ws update

This commit is contained in:
Gordon 2023-03-23 15:14:50 +08:00
parent 6382528360
commit e08a127213
5 changed files with 55 additions and 15 deletions

View File

@ -37,7 +37,7 @@ type Client struct {
isCompress bool
userID string
isBackground bool
connID string
ctx *UserConnContext
onlineAt int64 // 上线时间戳(毫秒)
longConnServer LongConnServer
closed bool
@ -50,7 +50,7 @@ func newClient(ctx *UserConnContext, conn LongConn, isCompress bool) *Client {
platformID: utils.StringToInt(ctx.GetPlatformID()),
isCompress: isCompress,
userID: ctx.GetUserID(),
connID: ctx.GetConnID(),
ctx: ctx,
onlineAt: utils.GetCurrentTimestampByMill(),
}
}
@ -60,7 +60,7 @@ func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, isCompress boo
c.platformID = utils.StringToInt(ctx.GetPlatformID())
c.isCompress = isCompress
c.userID = ctx.GetUserID()
c.connID = ctx.GetConnID()
c.ctx = ctx
c.onlineAt = utils.GetCurrentTimestampByMill()
c.longConnServer = longConnServer
}
@ -120,7 +120,7 @@ func (c *Client) handleMessage(message []byte) error {
return errors.New("exception conn userID not same to req userID")
}
ctx := context.Background()
ctx = context.WithValue(ctx, ConnID, c.connID)
ctx = context.WithValue(ctx, ConnID, c.ctx.GetConnID())
ctx = context.WithValue(ctx, OperationID, binaryReq.OperationID)
ctx = context.WithValue(ctx, CommonUserID, binaryReq.SendID)
ctx = context.WithValue(ctx, PlatformID, c.platformID)

View File

@ -1,9 +1,11 @@
package msggateway
import (
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/constant"
"github.com/OpenIMSDK/Open-IM-Server/pkg/utils"
"net/http"
"strconv"
"time"
)
type UserConnContext struct {
@ -12,6 +14,35 @@ type UserConnContext struct {
Path string
Method string
RemoteAddr string
ConnID string
}
func (c *UserConnContext) Deadline() (deadline time.Time, ok bool) {
return
}
func (c *UserConnContext) Done() <-chan struct{} {
return nil
}
func (c *UserConnContext) Err() error {
return nil
}
func (c *UserConnContext) Value(key any) any {
switch key {
case constant.OpUserID:
c.GetUserID()
case constant.OperationID:
c.GetOperationID()
case constant.ConnID:
c.GetConnID()
case constant.OpUserPlatform:
constant.PlatformIDToName(utils.StringToInt(c.GetPlatformID()))
default:
return ""
}
return ""
}
func newContext(respWriter http.ResponseWriter, req *http.Request) *UserConnContext {
@ -21,6 +52,7 @@ func newContext(respWriter http.ResponseWriter, req *http.Request) *UserConnCont
Path: req.URL.Path,
Method: req.Method,
RemoteAddr: req.RemoteAddr,
ConnID: utils.Md5(req.RemoteAddr + "_" + strconv.Itoa(int(utils.GetCurrentTimestampByMill()))),
}
}
func (c *UserConnContext) Query(key string) (string, bool) {
@ -44,7 +76,7 @@ func (c *UserConnContext) ErrReturn(error string, code int) {
http.Error(c.RespWriter, error, code)
}
func (c *UserConnContext) GetConnID() string {
return utils.Md5(c.RemoteAddr + "_" + strconv.Itoa(int(utils.GetCurrentTimestampByMill())))
return c.ConnID
}
func (c *UserConnContext) GetUserID() string {
return c.Req.URL.Query().Get(WsUserID)
@ -52,3 +84,6 @@ func (c *UserConnContext) GetUserID() string {
func (c *UserConnContext) GetPlatformID() string {
return c.Req.URL.Query().Get(PlatformID)
}
func (c *UserConnContext) GetOperationID() string {
return c.Req.URL.Query().Get(OperationID)
}

View File

@ -67,7 +67,7 @@ func (s *Server) GetUsersOnlineStatus(ctx context.Context, req *msggateway.GetUs
ps := new(msggateway.GetUsersOnlineStatusResp_SuccessDetail)
ps.Platform = constant.PlatformIDToName(client.platformID)
ps.Status = constant.OnlineStatus
ps.ConnID = client.connID
ps.ConnID = client.ctx.GetConnID()
ps.IsBackground = client.isBackground
temp.Status = constant.OnlineStatus
temp.DetailPlatformStatus = append(temp.DetailPlatformStatus, ps)

View File

@ -2,7 +2,7 @@ package msggateway
import (
"errors"
"fmt"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/log"
"github.com/OpenIMSDK/Open-IM-Server/pkg/common/tokenverify"
"github.com/OpenIMSDK/Open-IM-Server/pkg/errs"
"github.com/OpenIMSDK/Open-IM-Server/pkg/rpcclient/notification"
@ -97,7 +97,6 @@ func NewWsServer(opts ...Option) (*WsServer, error) {
clients: newUserMap(),
Compressor: NewGzipCompressor(),
Encoder: NewGobEncoder(),
//handler: NewGrpcHandler(validate),
}, nil
}
func (ws *WsServer) Run() error {
@ -127,8 +126,7 @@ func (ws *WsServer) registerClient(client *Client) {
ws.clients.Set(client.userID, client)
atomic.AddInt64(&ws.onlineUserNum, 1)
atomic.AddInt64(&ws.onlineUserConnNum, 1)
fmt.Println("R在线用户数量:", ws.onlineUserNum)
fmt.Println("R在线用户连接数量:", ws.onlineUserConnNum)
} else {
if clientOK { //已经有同平台的连接存在
ws.clients.Set(client.userID, client)
@ -136,11 +134,9 @@ func (ws *WsServer) registerClient(client *Client) {
} else {
ws.clients.Set(client.userID, client)
atomic.AddInt64(&ws.onlineUserConnNum, 1)
fmt.Println("R在线用户数量:", ws.onlineUserNum)
fmt.Println("R在线用户连接数量:", ws.onlineUserConnNum)
}
}
log.ZInfo(client.ctx, "user online", "online user Num", ws.onlineUserNum, "online user conn Num", ws.onlineUserConnNum)
}
func (ws *WsServer) multiTerminalLoginChecker(client []*Client) {
@ -153,8 +149,7 @@ func (ws *WsServer) unregisterClient(client *Client) {
atomic.AddInt64(&ws.onlineUserNum, -1)
}
atomic.AddInt64(&ws.onlineUserConnNum, -1)
fmt.Println("R在线用户数量:", ws.onlineUserNum)
fmt.Println("R在线用户连接数量:", ws.onlineUserConnNum)
log.ZInfo(client.ctx, "user offline", "online user Num", ws.onlineUserNum, "online user conn Num", ws.onlineUserConnNum)
}
func (ws *WsServer) wsHandler(w http.ResponseWriter, r *http.Request) {

View File

@ -72,6 +72,16 @@ func GetTriggerID(ctx context.Context) string {
}
return ""
}
func GetOpUserPlatform(ctx context.Context) string {
if ctx.Value(constant.OpUserPlatform) != "" {
s, ok := ctx.Value(constant.OpUserPlatform).(string)
if ok {
return s
}
}
return ""
}
func GetMustCtxInfo(ctx context.Context) (operationID, opUserID, platform, connID string, err error) {
operationID, ok := ctx.Value(constant.OperationID).(string)
if !ok {