feat: optimize openim config code

This commit is contained in:
Xinwei Xiong (cubxxw) 2024-03-18 14:45:01 +08:00
parent aca8a5020a
commit 999d0dfd85
11 changed files with 67 additions and 54 deletions

View File

@ -179,18 +179,19 @@ func (m *MessageApi) getSendMsgReq(c *gin.Context, req apistruct.SendMsg) (sendM
return nil, err
}
default:
return nil, errs.ErrArgs.WithDetail("not support err contentType")
return nil, errs.WrapMsg(errs.ErrArgs, "unsupported content type", "contentType", req.ContentType)
}
if err := mapstructure.WeakDecode(req.Content, &data); err != nil {
return nil, err
return nil, errs.WrapMsg(err, "failed to decode message content")
}
log.ZDebug(c, "getSendMsgReq", "req", req.Content)
log.ZDebug(c, "getSendMsgReq", "decodedContent", data)
if err := m.validate.Struct(data); err != nil {
return nil, err
return nil, errs.WrapMsg(err, "validation error")
}
return m.newUserSendMsgReq(c, &req), nil
}
// SendMessage handles the sending of a message. It's an HTTP handler function to be used with Gin framework.
func (m *MessageApi) SendMessage(c *gin.Context) {
// Initialize a request struct for sending a message.

View File

@ -53,9 +53,11 @@ import (
func Start(config *config.GlobalConfig, port int, proPort int) error {
if port == 0 || proPort == 0 {
err := errors.New("port or proPort is empty:" + strconv.Itoa(port) + "," + strconv.Itoa(proPort))
return errs.Wrap(err)
err := errors.New("port or proPort is empty")
wrappedErr := errs.WrapMsg(err, "validation error", "port", port, "proPort", proPort)
return wrappedErr
}
rdb, err := cache.NewRedis(&config.Redis)
if err != nil {
return err
@ -66,20 +68,22 @@ func Start(config *config.GlobalConfig, port int, proPort int) error {
// Determine whether zk is passed according to whether it is a clustered deployment
client, err = kdisc.NewDiscoveryRegister(config)
if err != nil {
return errs.WrapMsg(err, "register discovery err")
return errs.WrapMsg(err, "failed to register discovery service")
}
if err = client.CreateRpcRootNodes(config.GetServiceNames()); err != nil {
return errs.WrapMsg(err, "create rpc root nodes error")
return errs.WrapMsg(err, "failed to create RPC root nodes")
}
if err = client.RegisterConf2Registry(constant.OpenIMCommonConfigKey, config.EncodeConfig()); err != nil {
return errs.Wrap(err)
return errs.WrapMsg(err, "failed to register configuration to registry")
}
var (
netDone = make(chan struct{}, 1)
netErr error
)
router := newGinRouter(client, rdb, config)
if config.Prometheus.Enable {
go func() {

View File

@ -268,7 +268,7 @@ func (c *Client) replyMessage(ctx context.Context, binaryReq *Req, err error, re
}
if binaryReq.ReqIdentifier == WsLogoutMsg {
return errs.Wrap(errors.New("user logout"))
return errs.WrapMsg(errors.New("user logout"), "user requested logout", "operationID", binaryReq.OperationID)
}
return nil
}

View File

@ -34,6 +34,7 @@ type Compressor interface {
DeCompress(compressedData []byte) ([]byte, error)
DecompressWithPool(compressedData []byte) ([]byte, error)
}
type GzipCompressor struct {
compressProtocol string
}
@ -47,7 +48,21 @@ func (g *GzipCompressor) Compress(rawData []byte) ([]byte, error) {
gz := gzip.NewWriter(&gzipBuffer)
if _, err := gz.Write(rawData); err != nil {
return nil, errs.WrapMsg(err, "GzipCompressor.Compress: writing to gzip writer failed")
return nil, errs.WrapMsg(err, func (g *GzipCompressor) CompressWithPool(rawData []byte) ([]byte, error) {
gz := gzipWriterPool.Get().(*gzip.Writer)
defer gzipWriterPool.Put(gz)
gzipBuffer := bytes.Buffer{}
gz.Reset(&gzipBuffer)
if _, err := gz.Write(rawData); err != nil {
return nil, errs.WrapMsg(err, "GzipCompressor.CompressWithPool: error writing data")
}
if err := gz.Close(); err != nil {
return nil, errs.WrapMsg(err, "GzipCompressor.CompressWithPool: error closing gzip writer")
}
return gzipBuffer.Bytes(), nil
}"GzipCompressor.Compress: writing to gzip writer failed")
}
if err := gz.Close(); err != nil {

View File

@ -141,7 +141,6 @@ func (c *UserConnContext) GetBackground() bool {
b, err := strconv.ParseBool(c.Req.URL.Query().Get(BackgroundStatus))
if err != nil {
return false
} else {
}
return b
}
}

View File

@ -35,9 +35,8 @@ func NewGobEncoder() *GobEncoder {
func (g *GobEncoder) Encode(data any) ([]byte, error) {
buff := bytes.Buffer{}
enc := gob.NewEncoder(&buff)
err := enc.Encode(data)
if err != nil {
return nil, errs.WrapMsg(err, "GobEncoder.Encode failed")
if err := enc.Encode(data); err != nil {
return nil, errs.WrapMsg(err, "GobEncoder.Encode failed", "action", "encode")
}
return buff.Bytes(), nil
}
@ -45,9 +44,8 @@ func (g *GobEncoder) Encode(data any) ([]byte, error) {
func (g *GobEncoder) Decode(encodeData []byte, decodeData any) error {
buff := bytes.NewBuffer(encodeData)
dec := gob.NewDecoder(buff)
err := dec.Decode(decodeData)
if err != nil {
return errs.WrapMsg(err, "GobEncoder.Decode failed")
if err := dec.Decode(decodeData); err != nil {
return errs.WrapMsg(err, "GobEncoder.Decode failed", "action", "decode")
}
return nil
}

View File

@ -123,10 +123,7 @@ func (s *Server) GetUsersOnlineStatus(
return &resp, nil
}
func (s *Server) OnlineBatchPushOneMsg(
ctx context.Context,
req *msggateway.OnlineBatchPushOneMsgReq,
) (*msggateway.OnlineBatchPushOneMsgResp, error) {
func (s *Server) OnlineBatchPushOneMsg(ctx context.Context, req *msggateway.OnlineBatchPushOneMsgReq) (*msggateway.OnlineBatchPushOneMsgResp, error) {
panic("implement me")
}
@ -204,10 +201,7 @@ func (s *Server) KickUserOffline(
return &msggateway.KickUserOfflineResp{}, nil
}
func (s *Server) MultiTerminalLoginCheck(
ctx context.Context,
req *msggateway.MultiTerminalLoginCheckReq,
) (*msggateway.MultiTerminalLoginCheckResp, error) {
func (s *Server) MultiTerminalLoginCheck(ctx context.Context, req *msggateway.MultiTerminalLoginCheckReq) (*msggateway.MultiTerminalLoginCheckResp, error) {
if oldClients, userOK, clientOK := s.LongConnServer.GetUserPlatformCons(req.UserID, int(req.PlatformID)); userOK {
tempUserCtx := newTempContext()
tempUserCtx.SetToken(req.Token)

View File

@ -113,11 +113,13 @@ func (d *GWebSocket) SetWriteDeadline(timeout time.Duration) error {
func (d *GWebSocket) Dial(urlStr string, requestHeader http.Header) (*http.Response, error) {
conn, httpResp, err := websocket.DefaultDialer.Dial(urlStr, requestHeader)
if err == nil {
if err != nil {
return httpResp, errs.WrapMsg(err, "GWebSocket.Dial failed", "url", urlStr)
}
d.conn = conn
return httpResp, nil
}
return httpResp, err
}
func (d *GWebSocket) IsNil() bool {
return d.conn == nil

View File

@ -116,21 +116,21 @@ func NewGrpcHandler(validate *validator.Validate, client discoveryregistry.SvcDi
}
}
func (g GrpcHandler) GetSeq(context context.Context, data *Req) ([]byte, error) {
func (g GrpcHandler) GetSeq(ctx context.Context, data *Req) ([]byte, error) {
req := sdkws.GetMaxSeqReq{}
if err := proto.Unmarshal(data.Data, &req); err != nil {
return nil, errs.WrapMsg(err, "GetSeq: error unmarshaling request")
return nil, errs.WrapMsg(err, "GetSeq: error unmarshaling request", "action", "unmarshal", "dataType", "GetMaxSeqReq")
}
if err := g.validate.Struct(&req); err != nil {
return nil, errs.WrapMsg(err, "GetSeq: validation failed")
return nil, errs.WrapMsg(err, "GetSeq: validation failed", "action", "validate", "dataType", "GetMaxSeqReq")
}
resp, err := g.msgRpcClient.GetMaxSeq(context, &req)
resp, err := g.msgRpcClient.GetMaxSeq(ctx, &req)
if err != nil {
return nil, err
}
c, err := proto.Marshal(resp)
if err != nil {
return nil, errs.WrapMsg(err, "GetSeq: error marshaling response")
return nil, errs.WrapMsg(err, "GetSeq: error marshaling response", "action", "marshal", "dataType", "GetMaxSeqResp")
}
return c, nil
}
@ -138,19 +138,16 @@ func (g GrpcHandler) GetSeq(context context.Context, data *Req) ([]byte, error)
// SendMessage handles the sending of messages through gRPC. It unmarshals the request data,
// validates the message, and then sends it using the message RPC client.
func (g GrpcHandler) SendMessage(ctx context.Context, data *Req) ([]byte, error) {
// Unmarshal the message data from the request.
var msgData sdkws.MsgData
if err := proto.Unmarshal(data.Data, &msgData); err != nil {
return nil, errs.WrapMsg(err, "error unmarshalling message data")
return nil, errs.WrapMsg(err, "SendMessage: error unmarshaling message data", "action", "unmarshal", "dataType", "MsgData")
}
// Validate the message data structure.
if err := g.validate.Struct(&msgData); err != nil {
return nil, errs.WrapMsg(err, "message data validation failed")
return nil, errs.WrapMsg(err, "SendMessage: message data validation failed", "action", "validate", "dataType", "MsgData")
}
req := msg.SendMsgReq{MsgData: &msgData}
resp, err := g.msgRpcClient.SendMsg(ctx, &req)
if err != nil {
return nil, err
@ -158,12 +155,13 @@ func (g GrpcHandler) SendMessage(ctx context.Context, data *Req) ([]byte, error)
c, err := proto.Marshal(resp)
if err != nil {
return nil, errs.WrapMsg(err, "error marshaling response")
return nil, errs.WrapMsg(err, "SendMessage: error marshaling response", "action", "marshal", "dataType", "SendMsgResp")
}
return c, nil
}
func (g GrpcHandler) SendSignalMessage(context context.Context, data *Req) ([]byte, error) {
resp, err := g.msgRpcClient.SendMsg(context, nil)
if err != nil {
@ -171,7 +169,7 @@ func (g GrpcHandler) SendSignalMessage(context context.Context, data *Req) ([]by
}
c, err := proto.Marshal(resp)
if err != nil {
return nil, errs.WrapMsg(err, "error marshaling response")
return nil, errs.WrapMsg(err, "error marshaling response", "action", "marshal", "dataType", "SendMsgResp")
}
return c, nil
}
@ -179,10 +177,10 @@ func (g GrpcHandler) SendSignalMessage(context context.Context, data *Req) ([]by
func (g GrpcHandler) PullMessageBySeqList(context context.Context, data *Req) ([]byte, error) {
req := sdkws.PullMessageBySeqsReq{}
if err := proto.Unmarshal(data.Data, &req); err != nil {
return nil, errs.WrapMsg(err, "error unmarshaling request")
return nil, errs.WrapMsg(err, "error unmarshaling request", "action", "unmarshal", "dataType", "PullMessageBySeqsReq")
}
if err := g.validate.Struct(data); err != nil {
return nil, errs.WrapMsg(err, "validation failed")
return nil, errs.WrapMsg(err, "validation failed", "action", "validate", "dataType", "PullMessageBySeqsReq")
}
resp, err := g.msgRpcClient.PullMessageBySeqList(context, &req)
if err != nil {
@ -190,7 +188,7 @@ func (g GrpcHandler) PullMessageBySeqList(context context.Context, data *Req) ([
}
c, err := proto.Marshal(resp)
if err != nil {
return nil, errs.WrapMsg(err, "error marshaling response")
return nil, errs.WrapMsg(err, "error marshaling response", "action", "marshal", "dataType", "PullMessageBySeqsResp")
}
return c, nil
}
@ -198,7 +196,7 @@ func (g GrpcHandler) PullMessageBySeqList(context context.Context, data *Req) ([
func (g GrpcHandler) UserLogout(context context.Context, data *Req) ([]byte, error) {
req := push.DelUserPushTokenReq{}
if err := proto.Unmarshal(data.Data, &req); err != nil {
return nil, errs.WrapMsg(err, "error unmarshaling request")
return nil, errs.WrapMsg(err, "error unmarshaling request", "action", "unmarshal", "dataType", "DelUserPushTokenReq")
}
resp, err := g.pushClient.DelUserPushToken(context, &req)
if err != nil {
@ -206,7 +204,7 @@ func (g GrpcHandler) UserLogout(context context.Context, data *Req) ([]byte, err
}
c, err := proto.Marshal(resp)
if err != nil {
return nil, errs.WrapMsg(err, "error marshaling response")
return nil, errs.WrapMsg(err, "error marshaling response", "action", "marshal", "dataType", "DelUserPushTokenResp")
}
return c, nil
}
@ -214,10 +212,10 @@ func (g GrpcHandler) UserLogout(context context.Context, data *Req) ([]byte, err
func (g GrpcHandler) SetUserDeviceBackground(_ context.Context, data *Req) ([]byte, bool, error) {
req := sdkws.SetAppBackgroundStatusReq{}
if err := proto.Unmarshal(data.Data, &req); err != nil {
return nil, false, errs.WrapMsg(err, "error unmarshaling request")
return nil, false, errs.WrapMsg(err, "error unmarshaling request", "action", "unmarshal", "dataType", "SetAppBackgroundStatusReq")
}
if err := g.validate.Struct(data); err != nil {
return nil, false, errs.WrapMsg(err, "validation failed")
return nil, false, errs.WrapMsg(err, "validation failed", "action", "validate", "dataType", "SetAppBackgroundStatusReq")
}
return nil, req.IsBackground, nil
}

View File

@ -130,7 +130,7 @@ func (ws *WsServer) UnRegister(c *Client) {
func (ws *WsServer) Validate(s any) error {
if s == nil {
return errs.Wrap(errors.New("input cannot be nil"))
return errs.WrapMsg(errors.New("input cannot be nil"), "Validate: input is nil", "action", "validate", "dataType", "any")
}
return nil
}
@ -197,9 +197,9 @@ func (ws *WsServer) Run(done chan error) error {
go func() {
http.HandleFunc("/", ws.wsHandler)
err := server.ListenAndServe()
defer close(netDone)
if err != nil && err != http.ErrServerClosed {
netErr = errs.WrapMsg(err, "ws start err", server.Addr)
close(netDone)
}
}()
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
@ -296,6 +296,7 @@ func (ws *WsServer) registerClient(client *Client) {
_ = ws.sendUserOnlineInfoToOtherNode(client.ctx, client)
}()
}
wg.Add(1)
go func() {
defer wg.Done()

View File

@ -42,6 +42,7 @@ func NewDiscoveryRegister(config *config.GlobalConfig) (discoveryregistry.SvcDis
case "direct":
return direct.NewConnDirect(config)
default:
return nil, errs.Wrap(errors.New("envType not correct"))
errMsg := "unsupported discovery type"
return nil, errs.WrapMsg(errors.New(errMsg), errMsg, "type", config.Envs.Discovery)
}
}