stats/opentelemetry: Introduce Tracing API (#7852)

This commit is contained in:
Abhishek Ranjan 2025-01-30 09:16:28 +05:30 committed by GitHub
parent 7e1c9b2029
commit 78eebff58b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 1334 additions and 59 deletions

View File

@ -0,0 +1,36 @@
/*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Package opentelemetry is EXPERIMENTAL and will be moved to stats/opentelemetry
// package in a later release.
package opentelemetry
import (
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
)
// TraceOptions contains the tracing settings for OpenTelemetry instrumentation.
type TraceOptions struct {
// TracerProvider is the OpenTelemetry tracer which is required to
// record traces/trace spans for instrumentation. If unset, tracing
// will not be recorded.
TracerProvider trace.TracerProvider
// TextMapPropagator propagates span context through text map carrier.
// If unset, tracing will not be recorded.
TextMapPropagator propagation.TextMapPropagator
}

View File

@ -21,7 +21,10 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
otelcodes "go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc" "google.golang.org/grpc"
grpccodes "google.golang.org/grpc/codes"
estats "google.golang.org/grpc/experimental/stats" estats "google.golang.org/grpc/experimental/stats"
istats "google.golang.org/grpc/internal/stats" istats "google.golang.org/grpc/internal/stats"
"google.golang.org/grpc/metadata" "google.golang.org/grpc/metadata"
@ -85,8 +88,12 @@ func (h *clientStatsHandler) unaryInterceptor(ctx context.Context, method string
} }
startTime := time.Now() startTime := time.Now()
var span trace.Span
if h.options.isTracingEnabled() {
ctx, span = h.createCallTraceSpan(ctx, method)
}
err := invoker(ctx, method, req, reply, cc, opts...) err := invoker(ctx, method, req, reply, cc, opts...)
h.perCallMetrics(ctx, err, startTime, ci) h.perCallTracesAndMetrics(ctx, err, startTime, ci, span)
return err return err
} }
@ -119,16 +126,30 @@ func (h *clientStatsHandler) streamInterceptor(ctx context.Context, desc *grpc.S
} }
startTime := time.Now() startTime := time.Now()
var span trace.Span
if h.options.isTracingEnabled() {
ctx, span = h.createCallTraceSpan(ctx, method)
}
callback := func(err error) { callback := func(err error) {
h.perCallMetrics(ctx, err, startTime, ci) h.perCallTracesAndMetrics(ctx, err, startTime, ci, span)
} }
opts = append([]grpc.CallOption{grpc.OnFinish(callback)}, opts...) opts = append([]grpc.CallOption{grpc.OnFinish(callback)}, opts...)
return streamer(ctx, desc, cc, method, opts...) return streamer(ctx, desc, cc, method, opts...)
} }
func (h *clientStatsHandler) perCallMetrics(ctx context.Context, err error, startTime time.Time, ci *callInfo) { // perCallTracesAndMetrics records per call trace spans and metrics.
callLatency := float64(time.Since(startTime)) / float64(time.Second) // calculate ASAP func (h *clientStatsHandler) perCallTracesAndMetrics(ctx context.Context, err error, startTime time.Time, ci *callInfo, ts trace.Span) {
if h.options.isTracingEnabled() {
s := status.Convert(err)
if s.Code() == grpccodes.OK {
ts.SetStatus(otelcodes.Ok, s.Message())
} else {
ts.SetStatus(otelcodes.Error, s.Message())
}
ts.End()
}
if h.options.isMetricsEnabled() {
callLatency := float64(time.Since(startTime)) / float64(time.Second)
attrs := otelmetric.WithAttributeSet(otelattribute.NewSet( attrs := otelmetric.WithAttributeSet(otelattribute.NewSet(
otelattribute.String("grpc.method", ci.method), otelattribute.String("grpc.method", ci.method),
otelattribute.String("grpc.target", ci.target), otelattribute.String("grpc.target", ci.target),
@ -136,6 +157,7 @@ func (h *clientStatsHandler) perCallMetrics(ctx context.Context, err error, star
)) ))
h.clientMetrics.callDuration.Record(ctx, callLatency, attrs) h.clientMetrics.callDuration.Record(ctx, callLatency, attrs)
} }
}
// TagConn exists to satisfy stats.Handler. // TagConn exists to satisfy stats.Handler.
func (h *clientStatsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context { func (h *clientStatsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
@ -163,15 +185,17 @@ func (h *clientStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo)
} }
ctx = istats.SetLabels(ctx, labels) ctx = istats.SetLabels(ctx, labels)
} }
ai := &attemptInfo{ // populates information about RPC start. ai := &attemptInfo{
startTime: time.Now(), startTime: time.Now(),
xdsLabels: labels.TelemetryLabels, xdsLabels: labels.TelemetryLabels,
method: info.FullMethodName, method: removeLeadingSlash(info.FullMethodName),
} }
ri := &rpcInfo{ if h.options.isTracingEnabled() {
ctx, ai = h.traceTagRPC(ctx, ai)
}
return setRPCInfo(ctx, &rpcInfo{
ai: ai, ai: ai,
} })
return setRPCInfo(ctx, ri)
} }
func (h *clientStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { func (h *clientStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
@ -180,8 +204,13 @@ func (h *clientStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
logger.Error("ctx passed into client side stats handler metrics event handling has no client attempt data present") logger.Error("ctx passed into client side stats handler metrics event handling has no client attempt data present")
return return
} }
if h.options.isMetricsEnabled() {
h.processRPCEvent(ctx, rs, ri.ai) h.processRPCEvent(ctx, rs, ri.ai)
} }
if h.options.isTracingEnabled() {
populateSpan(rs, ri.ai)
}
}
func (h *clientStatsHandler) processRPCEvent(ctx context.Context, s stats.RPCStats, ai *attemptInfo) { func (h *clientStatsHandler) processRPCEvent(ctx context.Context, s stats.RPCStats, ai *attemptInfo) {
switch st := s.(type) { switch st := s.(type) {

View File

@ -0,0 +1,54 @@
/*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package opentelemetry
import (
"context"
"strings"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
otelinternaltracing "google.golang.org/grpc/stats/opentelemetry/internal/tracing"
)
// traceTagRPC populates provided context with a new span using the
// TextMapPropagator supplied in trace options and internal itracing.carrier.
// It creates a new outgoing carrier which serializes information about this
// span into gRPC Metadata, if TextMapPropagator is provided in the trace
// options. if TextMapPropagator is not provided, it returns the context as is.
func (h *clientStatsHandler) traceTagRPC(ctx context.Context, ai *attemptInfo) (context.Context, *attemptInfo) {
mn := "Attempt." + strings.Replace(ai.method, "/", ".", -1)
tracer := otel.Tracer("grpc-open-telemetry")
ctx, span := tracer.Start(ctx, mn)
carrier := otelinternaltracing.NewOutgoingCarrier(ctx)
otel.GetTextMapPropagator().Inject(ctx, carrier)
ai.traceSpan = span
return carrier.Context(), ai
}
// createCallTraceSpan creates a call span to put in the provided context using
// provided TraceProvider. If TraceProvider is nil, it returns context as is.
func (h *clientStatsHandler) createCallTraceSpan(ctx context.Context, method string) (context.Context, trace.Span) {
if h.options.TraceOptions.TracerProvider == nil {
logger.Error("TraceProvider is not provided in trace options")
return ctx, nil
}
mn := strings.Replace(removeLeadingSlash(method), "/", ".", -1)
tracer := otel.Tracer("grpc-open-telemetry")
ctx, span := tracer.Start(ctx, mn, trace.WithSpanKind(trace.SpanKindClient))
return ctx, span
}

File diff suppressed because it is too large Load Diff

View File

@ -27,35 +27,50 @@ import (
"strings" "strings"
"time" "time"
otelattribute "go.opentelemetry.io/otel/attribute"
otelmetric "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
experimental "google.golang.org/grpc/experimental/opentelemetry"
estats "google.golang.org/grpc/experimental/stats" estats "google.golang.org/grpc/experimental/stats"
"google.golang.org/grpc/grpclog" "google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal" "google.golang.org/grpc/internal"
"google.golang.org/grpc/stats" "google.golang.org/grpc/stats"
otelinternal "google.golang.org/grpc/stats/opentelemetry/internal" otelinternal "google.golang.org/grpc/stats/opentelemetry/internal"
otelattribute "go.opentelemetry.io/otel/attribute"
otelmetric "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
) )
func init() { func init() {
otelinternal.SetPluginOption = func(o *Options, po otelinternal.PluginOption) { otelinternal.SetPluginOption = func(o *Options, po otelinternal.PluginOption) {
o.MetricsOptions.pluginOption = po o.MetricsOptions.pluginOption = po
// Log an error if one of the options is missing.
if (o.TraceOptions.TextMapPropagator == nil) != (o.TraceOptions.TracerProvider == nil) {
logger.Warning("Tracing will not be recorded because traceOptions are not set properly: one of TextMapPropagator or TracerProvider is missing")
}
} }
} }
var logger = grpclog.Component("otel-plugin") var (
logger = grpclog.Component("otel-plugin")
var canonicalString = internal.CanonicalString.(func(codes.Code) string) canonicalString = internal.CanonicalString.(func(codes.Code) string)
joinDialOptions = internal.JoinDialOptions.(func(...grpc.DialOption) grpc.DialOption)
var joinDialOptions = internal.JoinDialOptions.(func(...grpc.DialOption) grpc.DialOption) )
// Options are the options for OpenTelemetry instrumentation. // Options are the options for OpenTelemetry instrumentation.
type Options struct { type Options struct {
// MetricsOptions are the metrics options for OpenTelemetry instrumentation. // MetricsOptions are the metrics options for OpenTelemetry instrumentation.
MetricsOptions MetricsOptions MetricsOptions MetricsOptions
// TraceOptions are the tracing options for OpenTelemetry instrumentation.
TraceOptions experimental.TraceOptions
}
func (o *Options) isMetricsEnabled() bool {
return o.MetricsOptions.MeterProvider != nil
}
func (o *Options) isTracingEnabled() bool {
return o.TraceOptions.TracerProvider != nil
} }
// MetricsOptions are the metrics options for OpenTelemetry instrumentation. // MetricsOptions are the metrics options for OpenTelemetry instrumentation.
@ -187,6 +202,15 @@ type attemptInfo struct {
pluginOptionLabels map[string]string // pluginOptionLabels to attach to metrics emitted pluginOptionLabels map[string]string // pluginOptionLabels to attach to metrics emitted
xdsLabels map[string]string xdsLabels map[string]string
// traceSpan is data used for recording traces.
traceSpan trace.Span
// message counters for sent and received messages (used for
// generating message IDs), and the number of previous RPC attempts for the
// associated call.
countSentMsg uint32
countRecvMsg uint32
previousRPCAttempts uint32
} }
type clientMetrics struct { type clientMetrics struct {

View File

@ -201,10 +201,12 @@ func (h *serverStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo)
startTime: time.Now(), startTime: time.Now(),
method: removeLeadingSlash(method), method: removeLeadingSlash(method),
} }
ri := &rpcInfo{ if h.options.isTracingEnabled() {
ai: ai, ctx, ai = h.traceTagRPC(ctx, ai)
} }
return setRPCInfo(ctx, ri) return setRPCInfo(ctx, &rpcInfo{
ai: ai,
})
} }
// HandleRPC implements per RPC tracing and stats implementation. // HandleRPC implements per RPC tracing and stats implementation.
@ -214,8 +216,13 @@ func (h *serverStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
logger.Error("ctx passed into server side stats handler metrics event handling has no server call data present") logger.Error("ctx passed into server side stats handler metrics event handling has no server call data present")
return return
} }
if h.options.isTracingEnabled() {
populateSpan(rs, ri.ai)
}
if h.options.isMetricsEnabled() {
h.processRPCData(ctx, rs, ri.ai) h.processRPCData(ctx, rs, ri.ai)
} }
}
func (h *serverStatsHandler) processRPCData(ctx context.Context, s stats.RPCStats, ai *attemptInfo) { func (h *serverStatsHandler) processRPCData(ctx context.Context, s stats.RPCStats, ai *attemptInfo) {
switch st := s.(type) { switch st := s.(type) {

View File

@ -0,0 +1,46 @@
/*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package opentelemetry
import (
"context"
"strings"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
otelinternaltracing "google.golang.org/grpc/stats/opentelemetry/internal/tracing"
)
// traceTagRPC populates context with new span data using the TextMapPropagator
// supplied in trace options and internal itracing.Carrier. It creates a new
// incoming carrier which extracts an existing span context (if present) by
// deserializing from provided context. If valid span context is extracted, it
// is set as parent of the new span otherwise new span remains the root span.
// If TextMapPropagator is not provided in the trace options, it returns context
// as is.
func (h *serverStatsHandler) traceTagRPC(ctx context.Context, ai *attemptInfo) (context.Context, *attemptInfo) {
mn := strings.Replace(ai.method, "/", ".", -1)
var span trace.Span
tracer := otel.Tracer("grpc-open-telemetry")
ctx = otel.GetTextMapPropagator().Extract(ctx, otelinternaltracing.NewIncomingCarrier(ctx))
// If the context.Context provided in `ctx` to tracer.Start(), contains a
// span then the newly-created Span will be a child of that span,
// otherwise it will be a root span.
ctx, span = tracer.Start(ctx, mn, trace.WithSpanKind(trace.SpanKindServer))
ai.traceSpan = span
return ctx, ai
}

View File

@ -0,0 +1,82 @@
/*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package opentelemetry
import (
"sync/atomic"
"go.opentelemetry.io/otel/attribute"
otelcodes "go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
)
// populateSpan populates span information based on stats passed in, representing
// invariants of the RPC lifecycle. It ends the span, triggering its export.
// This function handles attempt spans on the client-side and call spans on the
// server-side.
func populateSpan(rs stats.RPCStats, ai *attemptInfo) {
if ai == nil || ai.traceSpan == nil {
// Shouldn't happen, tagRPC call comes before this function gets called
// which populates this information.
logger.Error("ctx passed into stats handler tracing event handling has no traceSpan present")
return
}
span := ai.traceSpan
switch rs := rs.(type) {
case *stats.Begin:
// Note: Go always added Client and FailFast attributes even though they are not
// defined by the OpenCensus gRPC spec. Thus, they are unimportant for
// correctness.
span.SetAttributes(
attribute.Bool("Client", rs.Client),
attribute.Bool("FailFast", rs.Client),
attribute.Int64("previous-rpc-attempts", int64(ai.previousRPCAttempts)),
attribute.Bool("transparent-retry", rs.IsTransparentRetryAttempt),
)
// increment previous rpc attempts applicable for next attempt
atomic.AddUint32(&ai.previousRPCAttempts, 1)
case *stats.PickerUpdated:
span.AddEvent("Delayed LB pick complete")
case *stats.InPayload:
// message id - "must be calculated as two different counters starting
// from one for sent messages and one for received messages."
ai.countRecvMsg++
span.AddEvent("Inbound compressed message", trace.WithAttributes(
attribute.Int64("sequence-number", int64(ai.countRecvMsg)),
attribute.Int64("message-size", int64(rs.Length)),
attribute.Int64("message-size-compressed", int64(rs.CompressedLength)),
))
case *stats.OutPayload:
ai.countSentMsg++
span.AddEvent("Outbound compressed message", trace.WithAttributes(
attribute.Int64("sequence-number", int64(ai.countSentMsg)),
attribute.Int64("message-size", int64(rs.Length)),
attribute.Int64("message-size-compressed", int64(rs.CompressedLength)),
))
case *stats.End:
if rs.Error != nil {
s := status.Convert(rs.Error)
span.SetStatus(otelcodes.Error, s.Message())
} else {
span.SetStatus(otelcodes.Ok, "Ok")
}
span.End()
}
}