message update

This commit is contained in:
Gordon 2023-02-15 19:57:16 +08:00
parent f517e63fca
commit 1ff8ba7fc2
3 changed files with 146 additions and 57 deletions

View File

@ -2,21 +2,20 @@ package new
import ( import (
"Open_IM/pkg/common/constant" "Open_IM/pkg/common/constant"
promePkg "Open_IM/pkg/common/prometheus" "Open_IM/pkg/utils"
"bytes"
"context" "context"
"errors" "errors"
"fmt" "fmt"
"github.com/envoyproxy/protoc-gen-validate/validate"
"github.com/go-playground/validator/v10" "github.com/go-playground/validator/v10"
"open_im_sdk/pkg/log"
"open_im_sdk/pkg/utils"
"runtime/debug" "runtime/debug"
"sync" "sync"
"time"
) )
const ( const (
// MessageText is for UTF-8 encoded text messages like JSON. // MessageText is for UTF-8 encoded text messages like JSON.
MessageText = iota + 1 MessageText = iota + 1
// MessageBinary is for binary messages like protobufs. // MessageBinary is for binary messages like protobufs.
MessageBinary MessageBinary
// CloseMessage denotes a close control message. The optional message // CloseMessage denotes a close control message. The optional message
@ -34,48 +33,51 @@ const (
) )
type Client struct { type Client struct {
w *sync.Mutex w *sync.Mutex
conn LongConn conn LongConn
PlatformID int32 PlatformID int32
PushedMaxSeq uint32 PushedMaxSeq uint32
IsCompress bool IsCompress bool
userID string userID string
IsBackground bool IsBackground bool
token string token string
connID string connID string
onlineAt int64 // 上线时间戳(毫秒) onlineAt int64 // 上线时间戳(毫秒)
handler MessageHandler handler MessageHandler
unregisterChan chan *Client unregisterChan chan *Client
compressor Compressor compressor Compressor
encoder Encoder encoder Encoder
userContext UserConnContext userContext UserConnContext
validate *validator.Validate validate *validator.Validate
closed bool
} }
func newClient( conn LongConn,isCompress bool, userID string, isBackground bool, token string, func newClient(conn LongConn, isCompress bool, userID string, isBackground bool, token string,
connID string, onlineAt int64, handler MessageHandler,unregisterChan chan *Client) *Client { connID string, onlineAt int64, handler MessageHandler, unregisterChan chan *Client) *Client {
return &Client{ return &Client{
conn: conn, conn: conn,
IsCompress: isCompress, IsCompress: isCompress,
userID: userID, IsBackground: userID: userID, IsBackground: isBackground, token: token,
isBackground, token: token, connID: connID,
connID: connID, onlineAt: onlineAt,
onlineAt: onlineAt, handler: handler,
handler: handler, unregisterChan: unregisterChan,
unregisterChan: unregisterChan,
} }
} }
func(c *Client) readMessage(){ func (c *Client) readMessage() {
defer func() { defer func() {
if r:=recover(); r != nil { if r := recover(); r != nil {
fmt.Println("socket have panic err:", r, string(debug.Stack())) fmt.Println("socket have panic err:", r, string(debug.Stack()))
} }
//c.close() //c.close()
}() }()
var returnErr error var returnErr error
for { for {
messageType, message, returnErr := c.conn.ReadMessage() messageType, message, returnErr := c.conn.ReadMessage()
if returnErr!=nil{ if returnErr != nil {
break
}
if c.closed == true {
break break
} }
switch messageType { switch messageType {
@ -89,7 +91,7 @@ func(c *Client) readMessage(){
continue continue
} }
returnErr = c.handleMessage(message) returnErr = c.handleMessage(message)
if returnErr!=nil{ if returnErr != nil {
break break
} }
@ -97,52 +99,88 @@ func(c *Client) readMessage(){
} }
} }
func (c *Client) handleMessage(message []byte)error { func (c *Client) handleMessage(message []byte) error {
if c.IsCompress { if c.IsCompress {
var decompressErr error var decompressErr error
message,decompressErr = c.compressor.DeCompress(message) message, decompressErr = c.compressor.DeCompress(message)
if decompressErr != nil { if decompressErr != nil {
return utils.Wrap(decompressErr,"") return utils.Wrap(decompressErr, "")
} }
} }
var binaryReq Req var binaryReq Req
err := c.encoder.Decode(message, &binaryReq) err := c.encoder.Decode(message, &binaryReq)
if err != nil { if err != nil {
return utils.Wrap(err,"") return utils.Wrap(err, "")
} }
if err := c.validate.Struct(binaryReq); err != nil { if err := c.validate.Struct(binaryReq); err != nil {
return utils.Wrap(err,"") return utils.Wrap(err, "")
} }
if binaryReq.SendID != c.userID { if binaryReq.SendID != c.userID {
return errors.New("exception conn userID not same to req userID") return errors.New("exception conn userID not same to req userID")
} }
ctx:=context.Background() ctx := context.Background()
ctx =context.WithValue(ctx,"operationID",binaryReq.OperationID) ctx = context.WithValue(ctx, "operationID", binaryReq.OperationID)
ctx = context.WithValue(ctx,"userID",binaryReq.SendID) ctx = context.WithValue(ctx, "userID", binaryReq.SendID)
var messageErr error var messageErr error
var resp []byte var resp []byte
switch binaryReq.ReqIdentifier { switch binaryReq.ReqIdentifier {
case constant.WSGetNewestSeq: case constant.WSGetNewestSeq:
resp,messageErr=c.handler.GetSeq(ctx,binaryReq) resp, messageErr = c.handler.GetSeq(ctx, binaryReq)
case constant.WSSendMsg: case constant.WSSendMsg:
resp,messageErr=c.handler.SendMessage(ctx,binaryReq) resp, messageErr = c.handler.SendMessage(ctx, binaryReq)
case constant.WSSendSignalMsg: case constant.WSSendSignalMsg:
resp,messageErr=c.handler.SendSignalMessage(ctx,binaryReq) resp, messageErr = c.handler.SendSignalMessage(ctx, binaryReq)
case constant.WSPullMsgBySeqList: case constant.WSPullMsgBySeqList:
resp,messageErr=c.handler.PullMessageBySeqList(ctx,binaryReq) resp, messageErr = c.handler.PullMessageBySeqList(ctx, binaryReq)
case constant.WsLogoutMsg: case constant.WsLogoutMsg:
resp,messageErr=c.handler.UserLogout(ctx,binaryReq) resp, messageErr = c.handler.UserLogout(ctx, binaryReq)
case constant.WsSetBackgroundStatus: case constant.WsSetBackgroundStatus:
resp,messageErr=c.handler.SetUserDeviceBackground(ctx,binaryReq) resp, messageErr = c.handler.SetUserDeviceBackground(ctx, binaryReq)
default: default:
return errors.New(fmt.Sprintf("ReqIdentifier failed,sendID:%d,msgIncr:%s,reqIdentifier:%s",binaryReq.SendID,binaryReq.MsgIncr,binaryReq.ReqIdentifier)) return errors.New(fmt.Sprintf("ReqIdentifier failed,sendID:%d,msgIncr:%s,reqIdentifier:%s", binaryReq.SendID, binaryReq.MsgIncr, binaryReq.ReqIdentifier))
} }
c.replyMessage(binaryReq, messageErr, resp)
return nil
} }
func (c *Client) close() { func (c *Client) close() {
c.w.Lock()
defer c.w.Unlock()
c.conn.Close()
c.unregisterChan <- c
} }
func () { func (c *Client) replyMessage(binaryReq Req, err error, resp []byte) {
mReply := Resp{
ReqIdentifier: binaryReq.ReqIdentifier,
MsgIncr: binaryReq.MsgIncr,
OperationID: binaryReq.OperationID,
Data: resp,
}
_ = c.writeMsg(mReply)
}
func (c *Client) writeMsg(resp Resp) error {
c.w.Lock()
defer c.w.Unlock()
if c.closed == true {
return nil
}
encodedBuf := bufferPool.Get().([]byte)
resultBuf := bufferPool.Get().([]byte)
encodeBuf, err := c.encoder.Encode(resp)
if err != nil {
return utils.Wrap(err, "")
}
_ = c.conn.SetWriteTimeout(60)
if c.IsCompress {
var compressErr error
resultBuf, compressErr = c.compressor.Compress(encodeBuf)
if compressErr != nil {
return utils.Wrap(compressErr, "")
}
return c.conn.WriteMessage(MessageBinary, resultBuf)
} else {
return c.conn.WriteMessage(MessageBinary, encodedBuf)
}
} }

View File

@ -10,6 +10,14 @@ type Req struct {
MsgIncr string `json:"msgIncr" validate:"required"` MsgIncr string `json:"msgIncr" validate:"required"`
Data []byte `json:"data"` Data []byte `json:"data"`
} }
type Resp struct {
ReqIdentifier int32 `json:"reqIdentifier"`
MsgIncr string `json:"msgIncr"`
OperationID string `json:"operationID"`
ErrCode int32 `json:"errCode"`
ErrMsg string `json:"errMsg"`
Data []byte `json:"data"`
}
type MessageHandler interface { type MessageHandler interface {
GetSeq(context context.Context, data Req) ([]byte, error) GetSeq(context context.Context, data Req) ([]byte, error)
SendMessage(context context.Context, data Req) ([]byte, error) SendMessage(context context.Context, data Req) ([]byte, error)

View File

@ -1,14 +1,22 @@
package new package new
import ( import (
"bytes"
"errors" "errors"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"net/http" "net/http"
"open_im_sdk/pkg/utils" "open_im_sdk/pkg/utils"
"sync" "sync"
"sync/atomic"
"time" "time"
) )
var bufferPool = sync.Pool{
New: func() interface{} {
return make([]byte, 1000)
},
}
type LongConnServer interface { type LongConnServer interface {
Run() error Run() error
} }
@ -58,6 +66,41 @@ func newWsServer(opts ...Option) (*WsServer, error) {
}, nil }, nil
} }
func (ws *WsServer) Run() error { func (ws *WsServer) Run() error {
var client *Client
go func() {
for {
select {
case client = <-ws.registerChan:
ws.registerClient(client)
case client = <-h.unregisterChan:
h.unregisterClient(client)
case msg = <-h.readChan:
h.messageHandler(msg)
}
}
}()
}
func (ws *WsServer) registerClient(client *Client) {
var (
ok bool
cli *Client
)
if cli, ok = h.clients.Get(client.key); ok == false {
h.clients.Set(client.key, client)
atomic.AddInt64(&h.onlineConnections, 1)
fmt.Println("R在线用户数量:", h.onlineConnections)
return
}
if client.onlineAt > cli.onlineAt {
h.clients.Set(client.key, client)
h.close(cli)
return
}
h.close(client)
}
http.HandleFunc("/", ws.wsHandler) //Get request from client to handle by wsHandler http.HandleFunc("/", ws.wsHandler) //Get request from client to handle by wsHandler
return http.ListenAndServe(":"+utils.IntToString(ws.port), nil) //Start listening return http.ListenAndServe(":"+utils.IntToString(ws.port), nil) //Start listening