diff --git a/pkg/common/mw/rpc_client_interceptor.go b/pkg/common/mw/rpc_client_interceptor.go new file mode 100644 index 000000000..dd0052658 --- /dev/null +++ b/pkg/common/mw/rpc_client_interceptor.go @@ -0,0 +1,54 @@ +package mw + +import ( + "OpenIM/pkg/common/constant" + "OpenIM/pkg/common/log" + "OpenIM/pkg/errs" + "context" + "errors" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/wrapperspb" +) + +func GrpcClient() grpc.DialOption { + return grpc.WithUnaryInterceptor(rpcClientInterceptor) +} + +func rpcClientInterceptor(ctx context.Context, method string, req, resp interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) (err error) { + if ctx == nil { + return errs.ErrInternalServer.Wrap("call rpc request context is nil") + } + log.ZInfo(ctx, "rpc client req", "req", "funcName", method, rpcString(req)) + operationID, ok := ctx.Value(constant.OperationID).(string) + if !ok { + log.ZWarn(ctx, "ctx missing operationID", errors.New("ctx missing operationID")) + return errs.ErrArgs.Wrap("ctx missing operationID") + } + md := metadata.Pairs(constant.OperationID, operationID) + opUserID, ok := ctx.Value(constant.OpUserID).(string) + if ok { + md.Append(constant.OpUserID, opUserID) + } + err = invoker(metadata.NewOutgoingContext(ctx, md), method, req, resp, cc, opts...) + if err == nil { + log.ZInfo(ctx, "rpc client resp", "funcName", method, rpcString(resp)) + return nil + } + log.ZError(ctx, "rpc result error:", err) + rpcErr, ok := err.(interface{ GRPCStatus() *status.Status }) + if !ok { + return errs.ErrInternalServer.Wrap(err.Error()) + } + sta := rpcErr.GRPCStatus() + if sta.Code() == 0 { + return errs.NewCodeError(errs.ServerInternalError, err.Error()).Wrap() + } + if details := sta.Details(); len(details) > 0 { + if v, ok := details[0].(*wrapperspb.StringValue); ok { + return errs.NewCodeError(int(sta.Code()), sta.Message()).Wrap(v.String()) + } + } + return errs.NewCodeError(int(sta.Code()), sta.Message()).Wrap() +} diff --git a/pkg/common/mw/rpc.go b/pkg/common/mw/rpc_server_interceptor.go similarity index 54% rename from pkg/common/mw/rpc.go rename to pkg/common/mw/rpc_server_interceptor.go index a96e4ca70..346d7e556 100644 --- a/pkg/common/mw/rpc.go +++ b/pkg/common/mw/rpc_server_interceptor.go @@ -1,7 +1,6 @@ package mw import ( - "OpenIM/pkg/common/constant" "OpenIM/pkg/common/log" "OpenIM/pkg/common/mw/specialerror" "OpenIM/pkg/errs" @@ -14,8 +13,6 @@ import ( "google.golang.org/protobuf/types/known/wrapperspb" "math" "runtime/debug" - - "errors" ) const OperationID = "operationID" @@ -36,7 +33,7 @@ func rpcServerInterceptor(ctx context.Context, req interface{}, info *grpc.Unary } }() funcName := info.FullMethod - log.ZInfo(ctx, "rpc input", "funcName", funcName, "req", rpcString(req)) + log.ZInfo(ctx, "rpc req", "funcName", funcName, "req", rpcString(req)) md, ok := metadata.FromIncomingContext(ctx) if !ok { return nil, status.New(codes.InvalidArgument, "missing metadata").Err() @@ -57,74 +54,28 @@ func rpcServerInterceptor(ctx context.Context, req interface{}, info *grpc.Unary log.ZInfo(ctx, "server handle rpc success", "funcName", funcName, "resp", rpcString(resp)) return resp, nil } - log.ZError(ctx, "rpc InternalServer:", err, "req", req) unwrap := errs.Unwrap(err) codeErr := specialerror.ErrCode(unwrap) if codeErr == nil { log.ZError(ctx, "rpc InternalServer:", err, "req", req) codeErr = errs.ErrInternalServer } - var stack string - if unwrap != err { - stack = fmt.Sprintf("%+v", err) - log.ZError(ctx, "rpc error stack:", err) - } code := codeErr.Code() if code <= 0 || code > math.MaxUint32 { log.ZError(ctx, "rpc UnknownError", err, "rpc UnknownCode:", code) code = errs.ServerInternalError } grpcStatus := status.New(codes.Code(code), codeErr.Msg()) - if errs.Unwrap(err) != err { + if unwrap != err { + stack := fmt.Sprintf("%+v", err) if details, err := grpcStatus.WithDetails(wrapperspb.String(stack)); err == nil { grpcStatus = details } } - log.ZInfo(ctx, "rpc resp", "funcName", funcName, "Resp", rpcString(resp)) + log.ZWarn(ctx, "rpc resp", err, "funcName", funcName) return nil, grpcStatus.Err() } -func rpcClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) (err error) { - if ctx == nil { - return errs.ErrInternalServer.Wrap("call rpc request context is nil") - } - log.ZInfo(ctx, "rpc input", "req", req) - operationID, ok := ctx.Value(constant.OperationID).(string) - if !ok { - log.ZError(ctx, "ctx missing operationID", errors.New("ctx missing operationID")) - return errs.ErrArgs.Wrap("ctx missing operationID") - } - md := metadata.Pairs(constant.OperationID, operationID) - opUserID, ok := ctx.Value(constant.OpUserID).(string) - if ok { - md.Append(constant.OpUserID, opUserID) - } - err = invoker(metadata.NewOutgoingContext(ctx, md), method, req, reply, cc, opts...) - if err == nil { - log.ZInfo(ctx, "rpc return", "resp", rpcString(reply)) - return nil - } - log.ZError(ctx, "rpc result error:", err) - rpcErr, ok := err.(interface{ GRPCStatus() *status.Status }) - if !ok { - return errs.ErrInternalServer.Wrap(err.Error()) - } - sta := rpcErr.GRPCStatus() - if sta.Code() == 0 { - return errs.NewCodeError(errs.ServerInternalError, err.Error()).Wrap() - } - if details := sta.Details(); len(details) > 0 { - if v, ok := details[0].(*wrapperspb.StringValue); ok { - return errs.NewCodeError(int(sta.Code()), sta.Message()).Wrap(v.String()) - } - } - return errs.NewCodeError(int(sta.Code()), sta.Message()).Wrap() -} - func GrpcServer() grpc.ServerOption { return grpc.UnaryInterceptor(rpcServerInterceptor) } - -func GrpcClient() grpc.DialOption { - return grpc.WithUnaryInterceptor(rpcClientInterceptor) -}