diff --git a/internal/api/msg.go b/internal/api/msg.go index cff83ac87..dbedf85a6 100644 --- a/internal/api/msg.go +++ b/internal/api/msg.go @@ -179,18 +179,19 @@ func (m *MessageApi) getSendMsgReq(c *gin.Context, req apistruct.SendMsg) (sendM return nil, err } default: - return nil, errs.ErrArgs.WithDetail("not support err contentType") + return nil, errs.WrapMsg(errs.ErrArgs, "unsupported content type", "contentType", req.ContentType) } if err := mapstructure.WeakDecode(req.Content, &data); err != nil { - return nil, err + return nil, errs.WrapMsg(err, "failed to decode message content") } - log.ZDebug(c, "getSendMsgReq", "req", req.Content) + log.ZDebug(c, "getSendMsgReq", "decodedContent", data) if err := m.validate.Struct(data); err != nil { - return nil, err + return nil, errs.WrapMsg(err, "validation error") } return m.newUserSendMsgReq(c, &req), nil } + // SendMessage handles the sending of a message. It's an HTTP handler function to be used with Gin framework. func (m *MessageApi) SendMessage(c *gin.Context) { // Initialize a request struct for sending a message. diff --git a/internal/api/route.go b/internal/api/route.go index c136d1b98..d1f35ddf7 100644 --- a/internal/api/route.go +++ b/internal/api/route.go @@ -53,9 +53,11 @@ import ( func Start(config *config.GlobalConfig, port int, proPort int) error { if port == 0 || proPort == 0 { - err := errors.New("port or proPort is empty:" + strconv.Itoa(port) + "," + strconv.Itoa(proPort)) - return errs.Wrap(err) + err := errors.New("port or proPort is empty") + wrappedErr := errs.WrapMsg(err, "validation error", "port", port, "proPort", proPort) + return wrappedErr } + rdb, err := cache.NewRedis(&config.Redis) if err != nil { return err @@ -66,20 +68,22 @@ func Start(config *config.GlobalConfig, port int, proPort int) error { // Determine whether zk is passed according to whether it is a clustered deployment client, err = kdisc.NewDiscoveryRegister(config) if err != nil { - return errs.WrapMsg(err, "register discovery err") + return errs.WrapMsg(err, "failed to register discovery service") } - + if err = client.CreateRpcRootNodes(config.GetServiceNames()); err != nil { - return errs.WrapMsg(err, "create rpc root nodes error") + return errs.WrapMsg(err, "failed to create RPC root nodes") } - + if err = client.RegisterConf2Registry(constant.OpenIMCommonConfigKey, config.EncodeConfig()); err != nil { - return errs.Wrap(err) + return errs.WrapMsg(err, "failed to register configuration to registry") } + var ( netDone = make(chan struct{}, 1) netErr error ) + router := newGinRouter(client, rdb, config) if config.Prometheus.Enable { go func() { diff --git a/internal/msggateway/client.go b/internal/msggateway/client.go index cbddf3d87..5ed3cb3a2 100644 --- a/internal/msggateway/client.go +++ b/internal/msggateway/client.go @@ -268,7 +268,7 @@ func (c *Client) replyMessage(ctx context.Context, binaryReq *Req, err error, re } if binaryReq.ReqIdentifier == WsLogoutMsg { - return errs.Wrap(errors.New("user logout")) + return errs.WrapMsg(errors.New("user logout"), "user requested logout", "operationID", binaryReq.OperationID) } return nil } diff --git a/internal/msggateway/compressor.go b/internal/msggateway/compressor.go index 399a972a0..7d64b1b86 100644 --- a/internal/msggateway/compressor.go +++ b/internal/msggateway/compressor.go @@ -34,6 +34,7 @@ type Compressor interface { DeCompress(compressedData []byte) ([]byte, error) DecompressWithPool(compressedData []byte) ([]byte, error) } + type GzipCompressor struct { compressProtocol string } @@ -47,7 +48,21 @@ func (g *GzipCompressor) Compress(rawData []byte) ([]byte, error) { gz := gzip.NewWriter(&gzipBuffer) if _, err := gz.Write(rawData); err != nil { - return nil, errs.WrapMsg(err, "GzipCompressor.Compress: writing to gzip writer failed") + return nil, errs.WrapMsg(err, func (g *GzipCompressor) CompressWithPool(rawData []byte) ([]byte, error) { + gz := gzipWriterPool.Get().(*gzip.Writer) + defer gzipWriterPool.Put(gz) + + gzipBuffer := bytes.Buffer{} + gz.Reset(&gzipBuffer) + + if _, err := gz.Write(rawData); err != nil { + return nil, errs.WrapMsg(err, "GzipCompressor.CompressWithPool: error writing data") + } + if err := gz.Close(); err != nil { + return nil, errs.WrapMsg(err, "GzipCompressor.CompressWithPool: error closing gzip writer") + } + return gzipBuffer.Bytes(), nil + }"GzipCompressor.Compress: writing to gzip writer failed") } if err := gz.Close(); err != nil { diff --git a/internal/msggateway/context.go b/internal/msggateway/context.go index 85fe5c734..ad679c1a1 100644 --- a/internal/msggateway/context.go +++ b/internal/msggateway/context.go @@ -141,7 +141,6 @@ func (c *UserConnContext) GetBackground() bool { b, err := strconv.ParseBool(c.Req.URL.Query().Get(BackgroundStatus)) if err != nil { return false - } else { - return b } + return b } diff --git a/internal/msggateway/encoder.go b/internal/msggateway/encoder.go index 342c99f3f..caf91deec 100644 --- a/internal/msggateway/encoder.go +++ b/internal/msggateway/encoder.go @@ -35,9 +35,8 @@ func NewGobEncoder() *GobEncoder { func (g *GobEncoder) Encode(data any) ([]byte, error) { buff := bytes.Buffer{} enc := gob.NewEncoder(&buff) - err := enc.Encode(data) - if err != nil { - return nil, errs.WrapMsg(err, "GobEncoder.Encode failed") + if err := enc.Encode(data); err != nil { + return nil, errs.WrapMsg(err, "GobEncoder.Encode failed", "action", "encode") } return buff.Bytes(), nil } @@ -45,9 +44,8 @@ func (g *GobEncoder) Encode(data any) ([]byte, error) { func (g *GobEncoder) Decode(encodeData []byte, decodeData any) error { buff := bytes.NewBuffer(encodeData) dec := gob.NewDecoder(buff) - err := dec.Decode(decodeData) - if err != nil { - return errs.WrapMsg(err, "GobEncoder.Decode failed") + if err := dec.Decode(decodeData); err != nil { + return errs.WrapMsg(err, "GobEncoder.Decode failed", "action", "decode") } return nil } diff --git a/internal/msggateway/hub_server.go b/internal/msggateway/hub_server.go index 18b0f0880..2ba872946 100644 --- a/internal/msggateway/hub_server.go +++ b/internal/msggateway/hub_server.go @@ -123,10 +123,7 @@ func (s *Server) GetUsersOnlineStatus( return &resp, nil } -func (s *Server) OnlineBatchPushOneMsg( - ctx context.Context, - req *msggateway.OnlineBatchPushOneMsgReq, -) (*msggateway.OnlineBatchPushOneMsgResp, error) { +func (s *Server) OnlineBatchPushOneMsg(ctx context.Context, req *msggateway.OnlineBatchPushOneMsgReq) (*msggateway.OnlineBatchPushOneMsgResp, error) { panic("implement me") } @@ -204,10 +201,7 @@ func (s *Server) KickUserOffline( return &msggateway.KickUserOfflineResp{}, nil } -func (s *Server) MultiTerminalLoginCheck( - ctx context.Context, - req *msggateway.MultiTerminalLoginCheckReq, -) (*msggateway.MultiTerminalLoginCheckResp, error) { +func (s *Server) MultiTerminalLoginCheck(ctx context.Context, req *msggateway.MultiTerminalLoginCheckReq) (*msggateway.MultiTerminalLoginCheckResp, error) { if oldClients, userOK, clientOK := s.LongConnServer.GetUserPlatformCons(req.UserID, int(req.PlatformID)); userOK { tempUserCtx := newTempContext() tempUserCtx.SetToken(req.Token) diff --git a/internal/msggateway/long_conn.go b/internal/msggateway/long_conn.go index c5ea10558..baa1e6d22 100644 --- a/internal/msggateway/long_conn.go +++ b/internal/msggateway/long_conn.go @@ -113,12 +113,14 @@ func (d *GWebSocket) SetWriteDeadline(timeout time.Duration) error { func (d *GWebSocket) Dial(urlStr string, requestHeader http.Header) (*http.Response, error) { conn, httpResp, err := websocket.DefaultDialer.Dial(urlStr, requestHeader) - if err == nil { - d.conn = conn + if err != nil { + return httpResp, errs.WrapMsg(err, "GWebSocket.Dial failed", "url", urlStr) } - return httpResp, err + d.conn = conn + return httpResp, nil } + func (d *GWebSocket) IsNil() bool { return d.conn == nil // diff --git a/internal/msggateway/message_handler.go b/internal/msggateway/message_handler.go index 56f88ce05..7bcaebcb8 100644 --- a/internal/msggateway/message_handler.go +++ b/internal/msggateway/message_handler.go @@ -116,21 +116,21 @@ func NewGrpcHandler(validate *validator.Validate, client discoveryregistry.SvcDi } } -func (g GrpcHandler) GetSeq(context context.Context, data *Req) ([]byte, error) { +func (g GrpcHandler) GetSeq(ctx context.Context, data *Req) ([]byte, error) { req := sdkws.GetMaxSeqReq{} if err := proto.Unmarshal(data.Data, &req); err != nil { - return nil, errs.WrapMsg(err, "GetSeq: error unmarshaling request") + return nil, errs.WrapMsg(err, "GetSeq: error unmarshaling request", "action", "unmarshal", "dataType", "GetMaxSeqReq") } if err := g.validate.Struct(&req); err != nil { - return nil, errs.WrapMsg(err, "GetSeq: validation failed") + return nil, errs.WrapMsg(err, "GetSeq: validation failed", "action", "validate", "dataType", "GetMaxSeqReq") } - resp, err := g.msgRpcClient.GetMaxSeq(context, &req) + resp, err := g.msgRpcClient.GetMaxSeq(ctx, &req) if err != nil { return nil, err } c, err := proto.Marshal(resp) if err != nil { - return nil, errs.WrapMsg(err, "GetSeq: error marshaling response") + return nil, errs.WrapMsg(err, "GetSeq: error marshaling response", "action", "marshal", "dataType", "GetMaxSeqResp") } return c, nil } @@ -138,19 +138,16 @@ func (g GrpcHandler) GetSeq(context context.Context, data *Req) ([]byte, error) // SendMessage handles the sending of messages through gRPC. It unmarshals the request data, // validates the message, and then sends it using the message RPC client. func (g GrpcHandler) SendMessage(ctx context.Context, data *Req) ([]byte, error) { - // Unmarshal the message data from the request. var msgData sdkws.MsgData if err := proto.Unmarshal(data.Data, &msgData); err != nil { - return nil, errs.WrapMsg(err, "error unmarshalling message data") + return nil, errs.WrapMsg(err, "SendMessage: error unmarshaling message data", "action", "unmarshal", "dataType", "MsgData") } - // Validate the message data structure. if err := g.validate.Struct(&msgData); err != nil { - return nil, errs.WrapMsg(err, "message data validation failed") + return nil, errs.WrapMsg(err, "SendMessage: message data validation failed", "action", "validate", "dataType", "MsgData") } req := msg.SendMsgReq{MsgData: &msgData} - resp, err := g.msgRpcClient.SendMsg(ctx, &req) if err != nil { return nil, err @@ -158,12 +155,13 @@ func (g GrpcHandler) SendMessage(ctx context.Context, data *Req) ([]byte, error) c, err := proto.Marshal(resp) if err != nil { - return nil, errs.WrapMsg(err, "error marshaling response") + return nil, errs.WrapMsg(err, "SendMessage: error marshaling response", "action", "marshal", "dataType", "SendMsgResp") } return c, nil } + func (g GrpcHandler) SendSignalMessage(context context.Context, data *Req) ([]byte, error) { resp, err := g.msgRpcClient.SendMsg(context, nil) if err != nil { @@ -171,7 +169,7 @@ func (g GrpcHandler) SendSignalMessage(context context.Context, data *Req) ([]by } c, err := proto.Marshal(resp) if err != nil { - return nil, errs.WrapMsg(err, "error marshaling response") + return nil, errs.WrapMsg(err, "error marshaling response", "action", "marshal", "dataType", "SendMsgResp") } return c, nil } @@ -179,10 +177,10 @@ func (g GrpcHandler) SendSignalMessage(context context.Context, data *Req) ([]by func (g GrpcHandler) PullMessageBySeqList(context context.Context, data *Req) ([]byte, error) { req := sdkws.PullMessageBySeqsReq{} if err := proto.Unmarshal(data.Data, &req); err != nil { - return nil, errs.WrapMsg(err, "error unmarshaling request") + return nil, errs.WrapMsg(err, "error unmarshaling request", "action", "unmarshal", "dataType", "PullMessageBySeqsReq") } if err := g.validate.Struct(data); err != nil { - return nil, errs.WrapMsg(err, "validation failed") + return nil, errs.WrapMsg(err, "validation failed", "action", "validate", "dataType", "PullMessageBySeqsReq") } resp, err := g.msgRpcClient.PullMessageBySeqList(context, &req) if err != nil { @@ -190,7 +188,7 @@ func (g GrpcHandler) PullMessageBySeqList(context context.Context, data *Req) ([ } c, err := proto.Marshal(resp) if err != nil { - return nil, errs.WrapMsg(err, "error marshaling response") + return nil, errs.WrapMsg(err, "error marshaling response", "action", "marshal", "dataType", "PullMessageBySeqsResp") } return c, nil } @@ -198,7 +196,7 @@ func (g GrpcHandler) PullMessageBySeqList(context context.Context, data *Req) ([ func (g GrpcHandler) UserLogout(context context.Context, data *Req) ([]byte, error) { req := push.DelUserPushTokenReq{} if err := proto.Unmarshal(data.Data, &req); err != nil { - return nil, errs.WrapMsg(err, "error unmarshaling request") + return nil, errs.WrapMsg(err, "error unmarshaling request", "action", "unmarshal", "dataType", "DelUserPushTokenReq") } resp, err := g.pushClient.DelUserPushToken(context, &req) if err != nil { @@ -206,7 +204,7 @@ func (g GrpcHandler) UserLogout(context context.Context, data *Req) ([]byte, err } c, err := proto.Marshal(resp) if err != nil { - return nil, errs.WrapMsg(err, "error marshaling response") + return nil, errs.WrapMsg(err, "error marshaling response", "action", "marshal", "dataType", "DelUserPushTokenResp") } return c, nil } @@ -214,10 +212,10 @@ func (g GrpcHandler) UserLogout(context context.Context, data *Req) ([]byte, err func (g GrpcHandler) SetUserDeviceBackground(_ context.Context, data *Req) ([]byte, bool, error) { req := sdkws.SetAppBackgroundStatusReq{} if err := proto.Unmarshal(data.Data, &req); err != nil { - return nil, false, errs.WrapMsg(err, "error unmarshaling request") + return nil, false, errs.WrapMsg(err, "error unmarshaling request", "action", "unmarshal", "dataType", "SetAppBackgroundStatusReq") } if err := g.validate.Struct(data); err != nil { - return nil, false, errs.WrapMsg(err, "validation failed") + return nil, false, errs.WrapMsg(err, "validation failed", "action", "validate", "dataType", "SetAppBackgroundStatusReq") } return nil, req.IsBackground, nil } diff --git a/internal/msggateway/n_ws_server.go b/internal/msggateway/n_ws_server.go index 965464fe7..061321180 100644 --- a/internal/msggateway/n_ws_server.go +++ b/internal/msggateway/n_ws_server.go @@ -130,7 +130,7 @@ func (ws *WsServer) UnRegister(c *Client) { func (ws *WsServer) Validate(s any) error { if s == nil { - return errs.Wrap(errors.New("input cannot be nil")) + return errs.WrapMsg(errors.New("input cannot be nil"), "Validate: input is nil", "action", "validate", "dataType", "any") } return nil } @@ -197,9 +197,9 @@ func (ws *WsServer) Run(done chan error) error { go func() { http.HandleFunc("/", ws.wsHandler) err := server.ListenAndServe() + defer close(netDone) if err != nil && err != http.ErrServerClosed { netErr = errs.WrapMsg(err, "ws start err", server.Addr) - close(netDone) } }() ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) @@ -296,6 +296,7 @@ func (ws *WsServer) registerClient(client *Client) { _ = ws.sendUserOnlineInfoToOtherNode(client.ctx, client) }() } + wg.Add(1) go func() { defer wg.Done() diff --git a/pkg/common/discoveryregister/discoveryregister.go b/pkg/common/discoveryregister/discoveryregister.go index 2d1bee25e..832c1ed94 100644 --- a/pkg/common/discoveryregister/discoveryregister.go +++ b/pkg/common/discoveryregister/discoveryregister.go @@ -42,6 +42,7 @@ func NewDiscoveryRegister(config *config.GlobalConfig) (discoveryregistry.SvcDis case "direct": return direct.NewConnDirect(config) default: - return nil, errs.Wrap(errors.New("envType not correct")) + errMsg := "unsupported discovery type" + return nil, errs.WrapMsg(errors.New(errMsg), errMsg, "type", config.Envs.Discovery) } }