From 05d83ebe8b6fa3215a8554f9f2a81e36b16ba1a1 Mon Sep 17 00:00:00 2001 From: wangchuxiao Date: Wed, 16 Nov 2022 18:55:42 +0800 Subject: [PATCH] gzip msg --- internal/msg_gateway/gate/ws_server.go | 37 +++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/internal/msg_gateway/gate/ws_server.go b/internal/msg_gateway/gate/ws_server.go index eb101f2ff..53f2807cd 100644 --- a/internal/msg_gateway/gate/ws_server.go +++ b/internal/msg_gateway/gate/ws_server.go @@ -11,8 +11,10 @@ import ( pbRelay "Open_IM/pkg/proto/relay" "Open_IM/pkg/utils" "bytes" + "compress/gzip" "context" "encoding/gob" + "io/ioutil" "strings" go_redis "github.com/go-redis/redis/v8" @@ -31,6 +33,7 @@ type UserConn struct { w *sync.Mutex platformID int32 PushedMaxSeq uint32 + IsCompress bool } type WServer struct { wsAddr string @@ -75,7 +78,11 @@ func (ws *WServer) wsHandler(w http.ResponseWriter, r *http.Request) { log.Error(operationID, "upgrade http conn err", err.Error(), query) return } else { - newConn := &UserConn{conn, new(sync.Mutex), utils.StringToInt32(query["platformID"][0]), 0} + var isCompress = false + if r.Header.Get("compression") == "gzip" { + isCompress = true + } + newConn := &UserConn{conn, new(sync.Mutex), utils.StringToInt32(query["platformID"][0]), 0, isCompress} userCount++ ws.addUserConn(query["sendID"][0], utils.StringToInt(query["platformID"][0]), newConn, query["token"][0], operationID) go ws.readMsg(newConn) @@ -97,6 +104,23 @@ func (ws *WServer) readMsg(conn *UserConn) { ws.delUserConn(conn) return } + if conn.IsCompress { + buff := bytes.NewBuffer(msg) + reader, err := gzip.NewReader(buff) + if err != nil { + log.NewWarn("", "un gzip read failed") + continue + } + msg, err = ioutil.ReadAll(reader) + if err != nil { + log.NewWarn("", "ReadAll failed") + continue + } + err = reader.Close() + if err != nil { + log.NewWarn("", "reader close failed") + } + } ws.msgParse(conn, msg) } } @@ -110,6 +134,17 @@ func (ws *WServer) SetWriteTimeout(conn *UserConn, timeout int) { func (ws *WServer) writeMsg(conn *UserConn, a int, msg []byte) error { conn.w.Lock() defer conn.w.Unlock() + if conn.IsCompress { + buff := bytes.NewBuffer(msg) + gz := gzip.NewWriter(buff) + if _, err := gz.Write(buff.Bytes()); err != nil { + return utils.Wrap(err, "") + } + if err := gz.Close(); err != nil { + return utils.Wrap(err, "") + } + msg = buff.Bytes() + } conn.SetWriteDeadline(time.Now().Add(time.Duration(60) * time.Second)) return conn.WriteMessage(a, msg) }