// Copyright GoFrame Author(https://goframe.org). All Rights Reserved. // // This Source Code Form is subject to the terms of the MIT License. // If a copy of the MIT was not distributed with this file, // You can obtain one at https://github.com/gogf/gf. // https://github.com/open-telemetry/opentelemetry-go-contrib/blob/master/instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go package tracing // gRPC tracing middleware // https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/trace/semantic_conventions/rpc.md import ( "context" "errors" "io" "net" "strings" "go.opentelemetry.io/otel/attribute" semconv "go.opentelemetry.io/otel/semconv/v1.7.0" "go.opentelemetry.io/otel/trace" "google.golang.org/grpc" grpcCodes "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/peer" "google.golang.org/protobuf/proto" ) type messageType attribute.KeyValue // Event adds an event of the messageType to the span associated with the // passed context with id and size (if message is a proto message). func (m messageType) Event(ctx context.Context, id int, message interface{}) { span := trace.SpanFromContext(ctx) if p, ok := message.(proto.Message); ok { span.AddEvent("message", trace.WithAttributes( attribute.KeyValue(m), attribute.Key("message.id").Int(id), attribute.Key("message.uncompressed_size").Int(proto.Size(p)), )) } else { span.AddEvent("message", trace.WithAttributes( attribute.KeyValue(m), attribute.Key("message.id").Int(id), )) } } var ( messageSent = messageType(attribute.Key("message.type").String("SENT")) messageReceived = messageType(attribute.Key("message.type").String("RECEIVED")) ) type streamEventType int type streamEvent struct { Type streamEventType Err error } const ( closeEvent streamEventType = iota receiveEndEvent errorEvent ) // clientStream wraps around the embedded grpc.ClientStream, and intercepts the RecvMsg and // SendMsg method call. type clientStream struct { grpc.ClientStream desc *grpc.StreamDesc events chan streamEvent eventsDone chan struct{} finished chan error receivedMessageID int sentMessageID int } var _ = proto.Marshal func (w *clientStream) RecvMsg(m interface{}) error { err := w.ClientStream.RecvMsg(m) if err == nil && !w.desc.ServerStreams { w.sendStreamEvent(receiveEndEvent, nil) } else if errors.Is(err, io.EOF) { w.sendStreamEvent(receiveEndEvent, nil) } else if err != nil { w.sendStreamEvent(errorEvent, err) } else { w.receivedMessageID++ messageReceived.Event(w.Context(), w.receivedMessageID, m) } return err } func (w *clientStream) SendMsg(m interface{}) error { err := w.ClientStream.SendMsg(m) w.sentMessageID++ messageSent.Event(w.Context(), w.sentMessageID, m) if err != nil { w.sendStreamEvent(errorEvent, err) } return err } func (w *clientStream) Header() (metadata.MD, error) { md, err := w.ClientStream.Header() if err != nil { w.sendStreamEvent(errorEvent, err) } return md, err } func (w *clientStream) CloseSend() error { err := w.ClientStream.CloseSend() if err != nil { w.sendStreamEvent(errorEvent, err) } else { w.sendStreamEvent(closeEvent, nil) } return err } const ( clientClosedState byte = 1 << iota receiveEndedState ) func wrapClientStream(s grpc.ClientStream, desc *grpc.StreamDesc) *clientStream { var ( events = make(chan streamEvent) eventsDone = make(chan struct{}) finished = make(chan error) ) go func() { defer close(eventsDone) // Both streams have to be closed state := byte(0) for event := range events { switch event.Type { case closeEvent: state |= clientClosedState case receiveEndEvent: state |= receiveEndedState case errorEvent: finished <- event.Err return } if state == clientClosedState|receiveEndedState { finished <- nil return } } }() return &clientStream{ ClientStream: s, desc: desc, events: events, eventsDone: eventsDone, finished: finished, } } func (w *clientStream) sendStreamEvent(eventType streamEventType, err error) { select { case <-w.eventsDone: case w.events <- streamEvent{Type: eventType, Err: err}: } } // serverStream wraps around the embedded grpc.ServerStream, and intercepts the RecvMsg and // SendMsg method call. type serverStream struct { grpc.ServerStream ctx context.Context receivedMessageID int sentMessageID int } func (w *serverStream) Context() context.Context { return w.ctx } func (w *serverStream) RecvMsg(m interface{}) error { err := w.ServerStream.RecvMsg(m) if err == nil { w.receivedMessageID++ messageReceived.Event(w.Context(), w.receivedMessageID, m) } return err } func (w *serverStream) SendMsg(m interface{}) error { err := w.ServerStream.SendMsg(m) w.sentMessageID++ messageSent.Event(w.Context(), w.sentMessageID, m) return err } func wrapServerStream(ctx context.Context, ss grpc.ServerStream) *serverStream { return &serverStream{ ServerStream: ss, ctx: ctx, } } // spanInfo returns a span name and all appropriate attributes from the gRPC // method and peer address. func spanInfo(fullMethod, peerAddress string) (string, []attribute.KeyValue) { attrs := []attribute.KeyValue{attribute.Key("rpc.system").String("grpc")} name, mAttrs := parseFullMethod(fullMethod) attrs = append(attrs, mAttrs...) attrs = append(attrs, peerAttr(peerAddress)...) return name, attrs } // peerAttr returns attributes about the peer address. func peerAttr(addr string) []attribute.KeyValue { host, port, err := net.SplitHostPort(addr) if err != nil { return []attribute.KeyValue(nil) } if host == "" { host = "127.0.0.1" } return []attribute.KeyValue{ semconv.NetPeerIPKey.String(host), semconv.NetPeerPortKey.String(port), } } // peerFromCtx returns a peer address from a context, if one exists. func peerFromCtx(ctx context.Context) string { p, ok := peer.FromContext(ctx) if !ok { return "" } return p.Addr.String() } // parseFullMethod returns a span name following the OpenTelemetry semantic // conventions as well as all applicable span attribute.KeyValue attributes based // on a gRPC's FullMethod. func parseFullMethod(fullMethod string) (string, []attribute.KeyValue) { name := strings.TrimLeft(fullMethod, "/") parts := strings.SplitN(name, "/", 2) if len(parts) != 2 { // Invalid format, does not follow `/package.service/method`. return name, []attribute.KeyValue(nil) } var attrs []attribute.KeyValue if service := parts[0]; service != "" { attrs = append(attrs, semconv.RPCServiceKey.String(service)) } if method := parts[1]; method != "" { attrs = append(attrs, semconv.RPCMethodKey.String(method)) } return name, attrs } // statusCodeAttr returns status code attribute based on given gRPC code. func statusCodeAttr(c grpcCodes.Code) attribute.KeyValue { return GRPCStatusCodeKey.Int64(int64(c)) }