This commit is contained in:
wangchuxiao 2023-05-09 11:35:59 +08:00
parent f5c80ffd50
commit 22e0fc216b
3 changed files with 8 additions and 6 deletions

View File

@ -16,10 +16,10 @@ import (
) )
func GrpcClient() grpc.DialOption { func GrpcClient() grpc.DialOption {
return grpc.WithUnaryInterceptor(rpcClientInterceptor) return grpc.WithUnaryInterceptor(RpcClientInterceptor)
} }
func rpcClientInterceptor(ctx context.Context, method string, req, resp interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) (err error) { func RpcClientInterceptor(ctx context.Context, method string, req, resp interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) (err error) {
if ctx == nil { if ctx == nil {
return errs.ErrInternalServer.Wrap("call rpc request context is nil") return errs.ErrInternalServer.Wrap("call rpc request context is nil")
} }

View File

@ -29,7 +29,7 @@ func rpcString(v interface{}) string {
return fmt.Sprintf("%+v", v) return fmt.Sprintf("%+v", v)
} }
func rpcServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { func RpcServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
log.ZError(ctx, "rpc panic", nil, "FullMethod", info.FullMethod, "type:", fmt.Sprintf("%T", r), "panic:", r) log.ZError(ctx, "rpc panic", nil, "FullMethod", info.FullMethod, "type:", fmt.Sprintf("%T", r), "panic:", r)
@ -144,5 +144,5 @@ func rpcServerInterceptor(ctx context.Context, req interface{}, info *grpc.Unary
} }
func GrpcServer() grpc.ServerOption { func GrpcServer() grpc.ServerOption {
return grpc.UnaryInterceptor(rpcServerInterceptor) return grpc.UnaryInterceptor(RpcServerInterceptor)
} }

View File

@ -40,16 +40,18 @@ func Start(rpcPort int, rpcRegisterName string, prometheusPort int, rpcFn func(c
if err != nil { if err != nil {
return err return err
} }
options = append(options, mw.GrpcServer()) // ctx 中间件 // ctx 中间件
if config.Config.Prometheus.Enable { if config.Config.Prometheus.Enable {
prome.NewGrpcRequestCounter() prome.NewGrpcRequestCounter()
prome.NewGrpcRequestFailedCounter() prome.NewGrpcRequestFailedCounter()
prome.NewGrpcRequestSuccessCounter() prome.NewGrpcRequestSuccessCounter()
unaryInterceptor := mw.InterceptChain(grpcPrometheus.UnaryServerInterceptor, grpcPrometheus.UnaryServerInterceptor) unaryInterceptor := mw.InterceptChain(grpcPrometheus.UnaryServerInterceptor, mw.RpcServerInterceptor)
options = append(options, []grpc.ServerOption{ options = append(options, []grpc.ServerOption{
grpc.StreamInterceptor(grpcPrometheus.StreamServerInterceptor), grpc.StreamInterceptor(grpcPrometheus.StreamServerInterceptor),
grpc.UnaryInterceptor(unaryInterceptor), grpc.UnaryInterceptor(unaryInterceptor),
}...) }...)
} else {
options = append(options, mw.GrpcServer())
} }
srv := grpc.NewServer(options...) srv := grpc.NewServer(options...)
defer srv.GracefulStop() defer srv.GracefulStop()