diff --git a/experimental/stats/metricregistry.go b/experimental/stats/metricregistry.go index 392256942..e82eda566 100644 --- a/experimental/stats/metricregistry.go +++ b/experimental/stats/metricregistry.go @@ -23,8 +23,13 @@ import ( "testing" "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/internal" ) +func init() { + internal.SnapshotMetricRegistryForTesting = snapshotMetricsRegistryForTesting +} + var logger = grpclog.Component("metrics-registry") // DefaultMetrics are the default metrics registered through global metrics @@ -54,6 +59,10 @@ type MetricDescriptor struct { // The type of metric. This is set by the metric registry, and not intended // to be set by a component registering a metric. Type MetricType + // Bounds are the bounds of this metric. This only applies to histogram + // metrics. If unset or set with length 0, stats handlers will fall back to + // default bounds. + Bounds []float64 } // MetricType is the type of metric. diff --git a/internal/internal.go b/internal/internal.go index 46ed25768..e1e1422e1 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -215,6 +215,12 @@ var ( // SetConnectedAddress sets the connected address for a SubConnState. SetConnectedAddress any // func(scs *SubConnState, addr resolver.Address) + + // SnapshotMetricRegistryForTesting snapshots the global data of the metric + // registry. Registers a cleanup function on the provided testing.T that + // sets the metric registry to its original state. Only called in testing + // functions. + SnapshotMetricRegistryForTesting any // func(t *testing.T) ) // HealthChecker defines the signature of the client-side LB channel health diff --git a/stats/opentelemetry/client_metrics.go b/stats/opentelemetry/client_metrics.go index f9bcf4665..3fbae43bf 100644 --- a/stats/opentelemetry/client_metrics.go +++ b/stats/opentelemetry/client_metrics.go @@ -33,8 +33,8 @@ import ( ) type clientStatsHandler struct { - options Options - + estats.MetricsRecorder + options Options clientMetrics clientMetrics } @@ -52,7 +52,7 @@ func (h *clientStatsHandler) initializeMetrics() { metrics := h.options.MetricsOptions.Metrics if metrics == nil { - metrics = DefaultMetrics + metrics = DefaultMetrics() } h.clientMetrics.attemptStarted = createInt64Counter(metrics.Metrics(), "grpc.client.attempt.started", meter, otelmetric.WithUnit("attempt"), otelmetric.WithDescription("Number of client call attempts started.")) @@ -60,6 +60,12 @@ func (h *clientStatsHandler) initializeMetrics() { h.clientMetrics.attemptSentTotalCompressedMessageSize = createInt64Histogram(metrics.Metrics(), "grpc.client.attempt.sent_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes sent per client call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...)) h.clientMetrics.attemptRcvdTotalCompressedMessageSize = createInt64Histogram(metrics.Metrics(), "grpc.client.attempt.rcvd_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes received per call attempt."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...)) h.clientMetrics.callDuration = createFloat64Histogram(metrics.Metrics(), "grpc.client.call.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("Time taken by gRPC to complete an RPC from application's perspective."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...)) + + rm := ®istryMetrics{ + optionalLabels: h.options.MetricsOptions.OptionalLabels, + } + h.MetricsRecorder = rm + rm.registerMetrics(metrics, meter) } func (h *clientStatsHandler) unaryInterceptor(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { diff --git a/stats/opentelemetry/csm/observability_test.go b/stats/opentelemetry/csm/observability_test.go index edb7a8585..d2b2884c2 100644 --- a/stats/opentelemetry/csm/observability_test.go +++ b/stats/opentelemetry/csm/observability_test.go @@ -206,13 +206,13 @@ func (s) TestCSMPluginOptionUnary(t *testing.T) { serverOptionWithCSMPluginOption(opentelemetry.Options{ MetricsOptions: opentelemetry.MetricsOptions{ MeterProvider: provider, - Metrics: opentelemetry.DefaultMetrics, + Metrics: opentelemetry.DefaultMetrics(), }}, po), } dopts := []grpc.DialOption{dialOptionWithCSMPluginOption(opentelemetry.Options{ MetricsOptions: opentelemetry.MetricsOptions{ MeterProvider: provider, - Metrics: opentelemetry.DefaultMetrics, + Metrics: opentelemetry.DefaultMetrics(), OptionalLabels: []string{"csm.service_name", "csm.service_namespace_name"}, }, }, po)} @@ -368,13 +368,13 @@ func (s) TestCSMPluginOptionStreaming(t *testing.T) { serverOptionWithCSMPluginOption(opentelemetry.Options{ MetricsOptions: opentelemetry.MetricsOptions{ MeterProvider: provider, - Metrics: opentelemetry.DefaultMetrics, + Metrics: opentelemetry.DefaultMetrics(), }}, po), } dopts := []grpc.DialOption{dialOptionWithCSMPluginOption(opentelemetry.Options{ MetricsOptions: opentelemetry.MetricsOptions{ MeterProvider: provider, - Metrics: opentelemetry.DefaultMetrics, + Metrics: opentelemetry.DefaultMetrics(), OptionalLabels: []string{"csm.service_name", "csm.service_namespace_name"}, }, }, po)} @@ -460,7 +460,7 @@ func (s) TestXDSLabels(t *testing.T) { dopts := []grpc.DialOption{dialOptionWithCSMPluginOption(opentelemetry.Options{ MetricsOptions: opentelemetry.MetricsOptions{ MeterProvider: provider, - Metrics: opentelemetry.DefaultMetrics, + Metrics: opentelemetry.DefaultMetrics(), OptionalLabels: []string{"csm.service_name", "csm.service_namespace_name"}, }, }, po), grpc.WithUnaryInterceptor(unaryInterceptorAttachXDSLabels)} diff --git a/stats/opentelemetry/e2e_test.go b/stats/opentelemetry/e2e_test.go index 2aaa47061..0e8558aae 100644 --- a/stats/opentelemetry/e2e_test.go +++ b/stats/opentelemetry/e2e_test.go @@ -52,9 +52,7 @@ func Test(t *testing.T) { // component and the server. func setup(t *testing.T, methodAttributeFilter func(string) bool) (*metric.ManualReader, *stubserver.StubServer) { reader := metric.NewManualReader() - provider := metric.NewMeterProvider( - metric.WithReader(reader), - ) + provider := metric.NewMeterProvider(metric.WithReader(reader)) ss := &stubserver.StubServer{ UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { return &testpb.SimpleResponse{Payload: &testpb.Payload{ @@ -74,12 +72,12 @@ func setup(t *testing.T, methodAttributeFilter func(string) bool) (*metric.Manua if err := ss.Start([]grpc.ServerOption{opentelemetry.ServerOption(opentelemetry.Options{ MetricsOptions: opentelemetry.MetricsOptions{ MeterProvider: provider, - Metrics: opentelemetry.DefaultMetrics, + Metrics: opentelemetry.DefaultMetrics(), MethodAttributeFilter: methodAttributeFilter, }})}, opentelemetry.DialOption(opentelemetry.Options{ MetricsOptions: opentelemetry.MetricsOptions{ MeterProvider: provider, - Metrics: opentelemetry.DefaultMetrics, + Metrics: opentelemetry.DefaultMetrics(), }, })); err != nil { t.Fatalf("Error starting endpoint server: %v", err) diff --git a/stats/opentelemetry/example_test.go b/stats/opentelemetry/example_test.go index 156918e24..66676f3c0 100644 --- a/stats/opentelemetry/example_test.go +++ b/stats/opentelemetry/example_test.go @@ -19,7 +19,7 @@ package opentelemetry_test import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - stats2 "google.golang.org/grpc/experimental/stats" + estats "google.golang.org/grpc/experimental/stats" "google.golang.org/grpc/stats/opentelemetry" "go.opentelemetry.io/otel/sdk/metric" @@ -53,7 +53,7 @@ func Example_dialOption() { opts := opentelemetry.Options{ MetricsOptions: opentelemetry.MetricsOptions{ MeterProvider: provider, - Metrics: opentelemetry.DefaultMetrics, // equivalent to unset - distinct from empty + Metrics: opentelemetry.DefaultMetrics(), // equivalent to unset - distinct from empty }, } do := opentelemetry.DialOption(opts) @@ -88,7 +88,7 @@ func ExampleMetrics_excludeSome() { // To exclude specific metrics, initialize Options as follows: opts := opentelemetry.Options{ MetricsOptions: opentelemetry.MetricsOptions{ - Metrics: opentelemetry.DefaultMetrics.Remove(opentelemetry.ClientAttemptDuration, opentelemetry.ClientAttemptRcvdCompressedTotalMessageSize), + Metrics: opentelemetry.DefaultMetrics().Remove(opentelemetry.ClientAttemptDuration, opentelemetry.ClientAttemptRcvdCompressedTotalMessageSize), }, } do := opentelemetry.DialOption(opts) @@ -103,7 +103,7 @@ func ExampleMetrics_disableAll() { // To disable all metrics, initialize Options as follows: opts := opentelemetry.Options{ MetricsOptions: opentelemetry.MetricsOptions{ - Metrics: stats2.NewMetrics(), // Distinct to nil, which creates default metrics. This empty set creates no metrics. + Metrics: estats.NewMetrics(), // Distinct to nil, which creates default metrics. This empty set creates no metrics. }, } do := opentelemetry.DialOption(opts) @@ -118,7 +118,7 @@ func ExampleMetrics_enableSome() { // To only create specific metrics, initialize Options as follows: opts := opentelemetry.Options{ MetricsOptions: opentelemetry.MetricsOptions{ - Metrics: stats2.NewMetrics(opentelemetry.ClientAttemptDuration, opentelemetry.ClientAttemptRcvdCompressedTotalMessageSize), // only create these metrics + Metrics: estats.NewMetrics(opentelemetry.ClientAttemptDuration, opentelemetry.ClientAttemptRcvdCompressedTotalMessageSize), // only create these metrics }, } do := opentelemetry.DialOption(opts) diff --git a/stats/opentelemetry/metricsregistry_test.go b/stats/opentelemetry/metricsregistry_test.go new file mode 100644 index 000000000..0d2bb956e --- /dev/null +++ b/stats/opentelemetry/metricsregistry_test.go @@ -0,0 +1,309 @@ +/* + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package opentelemetry + +import ( + "context" + "testing" + "time" + + estats "google.golang.org/grpc/experimental/stats" + "google.golang.org/grpc/internal" + "google.golang.org/grpc/internal/grpctest" + + "go.opentelemetry.io/otel/attribute" + otelmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" +) + +var defaultTestTimeout = 5 * time.Second + +type s struct { + grpctest.Tester +} + +func Test(t *testing.T) { + grpctest.RunSubTests(t, s{}) +} + +type metricsRecorderForTest interface { + estats.MetricsRecorder + initializeMetrics() +} + +func newClientStatsHandler(options MetricsOptions) metricsRecorderForTest { + return &clientStatsHandler{options: Options{MetricsOptions: options}} +} + +func newServerStatsHandler(options MetricsOptions) metricsRecorderForTest { + return &serverStatsHandler{options: Options{MetricsOptions: options}} +} + +// TestMetricsRegistryMetrics tests the OpenTelemetry behavior with respect to +// registered metrics. It registers metrics in the metrics registry. It then +// creates an OpenTelemetry client and server stats handler This test then makes +// measurements on those instruments using one of the stats handlers, then tests +// the expected metrics emissions, which includes default metrics and optional +// label assertions. +func (s) TestMetricsRegistryMetrics(t *testing.T) { + internal.SnapshotMetricRegistryForTesting.(func(t *testing.T))(t) + intCountHandle1 := estats.RegisterInt64Count(estats.MetricDescriptor{ + Name: "int-counter-1", + Description: "Sum of calls from test", + Unit: "int", + Labels: []string{"int counter 1 label key"}, + OptionalLabels: []string{"int counter 1 optional label key"}, + Default: true, + }) + // A non default metric. If not specified in OpenTelemetry constructor, this + // will become a no-op, so measurements recorded on it won't show up in + // emitted metrics. + intCountHandle2 := estats.RegisterInt64Count(estats.MetricDescriptor{ + Name: "int-counter-2", + Description: "Sum of calls from test", + Unit: "int", + Labels: []string{"int counter 2 label key"}, + OptionalLabels: []string{"int counter 2 optional label key"}, + Default: false, + }) + // Register another non default metric. This will get added to the default + // metrics set in the OpenTelemetry constructor options, so metrics recorded + // on this should show up in metrics emissions. + intCountHandle3 := estats.RegisterInt64Count(estats.MetricDescriptor{ + Name: "int-counter-3", + Description: "sum of calls from test", + Unit: "int", + Labels: []string{"int counter 3 label key"}, + OptionalLabels: []string{"int counter 3 optional label key"}, + Default: false, + }) + floatCountHandle := estats.RegisterFloat64Count(estats.MetricDescriptor{ + Name: "float-counter", + Description: "sum of calls from test", + Unit: "float", + Labels: []string{"float counter label key"}, + OptionalLabels: []string{"float counter optional label key"}, + Default: true, + }) + bounds := []float64{0, 5, 10} + intHistoHandle := estats.RegisterInt64Histo(estats.MetricDescriptor{ + Name: "int-histo", + Description: "histogram of call values from tests", + Unit: "int", + Labels: []string{"int histo label key"}, + OptionalLabels: []string{"int histo optional label key"}, + Default: true, + Bounds: bounds, + }) + floatHistoHandle := estats.RegisterFloat64Histo(estats.MetricDescriptor{ + Name: "float-histo", + Description: "histogram of call values from tests", + Unit: "float", + Labels: []string{"float histo label key"}, + OptionalLabels: []string{"float histo optional label key"}, + Default: true, + Bounds: bounds, + }) + intGaugeHandle := estats.RegisterInt64Gauge(estats.MetricDescriptor{ + Name: "simple-gauge", + Description: "the most recent int emitted by test", + Unit: "int", + Labels: []string{"int gauge label key"}, + OptionalLabels: []string{"int gauge optional label key"}, + Default: true, + }) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Only float optional labels are configured, so only float optional labels should show up. + // All required labels should show up. + wantMetrics := []metricdata.Metrics{ + { + Name: "int-counter-1", + Description: "Sum of calls from test", + Unit: "int", + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(attribute.String("int counter 1 label key", "int counter 1 label value")), // No optional label, not float. + Value: 1, + }, + }, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + }, + }, + { + Name: "int-counter-3", + Description: "sum of calls from test", + Unit: "int", + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(attribute.String("int counter 3 label key", "int counter 3 label value")), // No optional label, not float. + Value: 4, + }, + }, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + }, + }, + { + Name: "float-counter", + Description: "sum of calls from test", + Unit: "float", + Data: metricdata.Sum[float64]{ + DataPoints: []metricdata.DataPoint[float64]{ + { + Attributes: attribute.NewSet(attribute.String("float counter label key", "float counter label value"), attribute.String("float counter optional label key", "float counter optional label value")), + Value: 1.2, + }, + }, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + }, + }, + { + Name: "int-histo", + Description: "histogram of call values from tests", + Unit: "int", + Data: metricdata.Histogram[int64]{ + DataPoints: []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet(attribute.String("int histo label key", "int histo label value")), // No optional label, not float. + Count: 1, + Bounds: bounds, + BucketCounts: []uint64{0, 1, 0, 0}, + Min: metricdata.NewExtrema(int64(3)), + Max: metricdata.NewExtrema(int64(3)), + Sum: 3, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + { + Name: "float-histo", + Description: "histogram of call values from tests", + Unit: "float", + Data: metricdata.Histogram[float64]{ + DataPoints: []metricdata.HistogramDataPoint[float64]{ + { + Attributes: attribute.NewSet(attribute.String("float histo label key", "float histo label value"), attribute.String("float histo optional label key", "float histo optional label value")), + Count: 1, + Bounds: bounds, + BucketCounts: []uint64{0, 1, 0, 0}, + Min: metricdata.NewExtrema(float64(4.3)), + Max: metricdata.NewExtrema(float64(4.3)), + Sum: 4.3, + }, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + { + Name: "simple-gauge", + Description: "the most recent int emitted by test", + Unit: "int", + Data: metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(attribute.String("int gauge label key", "int gauge label value")), // No optional label, not float. + Value: 8, + }, + }, + }, + }, + } + + for _, test := range []struct { + name string + constructor func(options MetricsOptions) metricsRecorderForTest + }{ + { + name: "client stats handler", + constructor: newClientStatsHandler, + }, + { + name: "server stats handler", + constructor: newServerStatsHandler, + }, + } { + t.Run(test.name, func(t *testing.T) { + reader := otelmetric.NewManualReader() + provider := otelmetric.NewMeterProvider(otelmetric.WithReader(reader)) + + // This configures the defaults alongside int counter 3. All the instruments + // registered except int counter 2 and 3 are default, so all measurements + // recorded should show up in reader collected metrics except those for int + // counter 2. + // This also only toggles the float count and float histo optional labels, + // so only those should show up in metrics emissions. All the required + // labels should show up in metrics emissions. + mo := MetricsOptions{ + Metrics: DefaultMetrics().Add("int-counter-3"), + OptionalLabels: []string{"float counter optional label key", "float histo optional label key"}, + MeterProvider: provider, + } + mr := test.constructor(mo) + mr.initializeMetrics() + // These Record calls are guaranteed at a layer underneath OpenTelemetry for + // labels emitted to match the length of labels + optional labels. + intCountHandle1.Record(mr, 1, []string{"int counter 1 label value", "int counter 1 optional label value"}...) + // int-counter-2 is not part of metrics specified (not default), so this + // record call shouldn't show up in the reader. + intCountHandle2.Record(mr, 2, []string{"int counter 2 label value", "int counter 2 optional label value"}...) + // int-counter-3 is part of metrics specified, so this call should show up + // in the reader. + intCountHandle3.Record(mr, 4, []string{"int counter 3 label value", "int counter 3 optional label value"}...) + + // All future recording points should show up in emissions as all of these are defaults. + floatCountHandle.Record(mr, 1.2, []string{"float counter label value", "float counter optional label value"}...) + intHistoHandle.Record(mr, 3, []string{"int histo label value", "int histo optional label value"}...) + floatHistoHandle.Record(mr, 4.3, []string{"float histo label value", "float histo optional label value"}...) + intGaugeHandle.Record(mr, 7, []string{"int gauge label value", "int gauge optional label value"}...) + // This second gauge call should take the place of the previous gauge call. + intGaugeHandle.Record(mr, 8, []string{"int gauge label value", "int gauge optional label value"}...) + rm := &metricdata.ResourceMetrics{} + reader.Collect(ctx, rm) + gotMetrics := map[string]metricdata.Metrics{} + for _, sm := range rm.ScopeMetrics { + for _, m := range sm.Metrics { + gotMetrics[m.Name] = m + } + } + + for _, metric := range wantMetrics { + val, ok := gotMetrics[metric.Name] + if !ok { + t.Fatalf("Metric %v not present in recorded metrics", metric.Name) + } + if !metricdatatest.AssertEqual(t, metric, val, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) { + t.Fatalf("Metrics data type not equal for metric: %v", metric.Name) + } + } + + // int-counter-2 is not a default metric and wasn't specified in + // constructor, so emissions should not show up. + if _, ok := gotMetrics["int-counter-2"]; ok { + t.Fatalf("Metric int-counter-2 present in recorded metrics, was not configured") + } + }) + } +} diff --git a/stats/opentelemetry/opentelemetry.go b/stats/opentelemetry/opentelemetry.go index aa5354d7a..fbc1ec1c0 100644 --- a/stats/opentelemetry/opentelemetry.go +++ b/stats/opentelemetry/opentelemetry.go @@ -30,7 +30,8 @@ import ( "google.golang.org/grpc/internal" otelinternal "google.golang.org/grpc/stats/opentelemetry/internal" - "go.opentelemetry.io/otel/metric" + otelattribute "go.opentelemetry.io/otel/attribute" + otelmetric "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/noop" ) @@ -59,7 +60,7 @@ type MetricsOptions struct { // unset, no metrics will be recorded. Any implementation knobs (i.e. views, // bounds) set in the MeterProvider take precedence over the API calls from // this interface. (i.e. it will create default views for unset views). - MeterProvider metric.MeterProvider + MeterProvider otelmetric.MeterProvider // Metrics are the metrics to instrument. Will create instrument and record telemetry // for corresponding metric supported by the client and server @@ -184,65 +185,190 @@ type attemptInfo struct { type clientMetrics struct { // "grpc.client.attempt.started" - attemptStarted metric.Int64Counter + attemptStarted otelmetric.Int64Counter // "grpc.client.attempt.duration" - attemptDuration metric.Float64Histogram + attemptDuration otelmetric.Float64Histogram // "grpc.client.attempt.sent_total_compressed_message_size" - attemptSentTotalCompressedMessageSize metric.Int64Histogram + attemptSentTotalCompressedMessageSize otelmetric.Int64Histogram // "grpc.client.attempt.rcvd_total_compressed_message_size" - attemptRcvdTotalCompressedMessageSize metric.Int64Histogram - + attemptRcvdTotalCompressedMessageSize otelmetric.Int64Histogram // "grpc.client.call.duration" - callDuration metric.Float64Histogram + callDuration otelmetric.Float64Histogram } type serverMetrics struct { // "grpc.server.call.started" - callStarted metric.Int64Counter + callStarted otelmetric.Int64Counter // "grpc.server.call.sent_total_compressed_message_size" - callSentTotalCompressedMessageSize metric.Int64Histogram + callSentTotalCompressedMessageSize otelmetric.Int64Histogram // "grpc.server.call.rcvd_total_compressed_message_size" - callRcvdTotalCompressedMessageSize metric.Int64Histogram + callRcvdTotalCompressedMessageSize otelmetric.Int64Histogram // "grpc.server.call.duration" - callDuration metric.Float64Histogram + callDuration otelmetric.Float64Histogram } -func createInt64Counter(setOfMetrics map[estats.Metric]bool, metricName estats.Metric, meter metric.Meter, options ...metric.Int64CounterOption) metric.Int64Counter { +func createInt64Counter(setOfMetrics map[estats.Metric]bool, metricName estats.Metric, meter otelmetric.Meter, options ...otelmetric.Int64CounterOption) otelmetric.Int64Counter { if _, ok := setOfMetrics[metricName]; !ok { return noop.Int64Counter{} } ret, err := meter.Int64Counter(string(metricName), options...) if err != nil { - logger.Errorf("failed to register metric \"%v\", will not record", metricName) + logger.Errorf("failed to register metric \"%v\", will not record: %v", metricName, err) return noop.Int64Counter{} } return ret } -func createInt64Histogram(setOfMetrics map[estats.Metric]bool, metricName estats.Metric, meter metric.Meter, options ...metric.Int64HistogramOption) metric.Int64Histogram { +func createFloat64Counter(setOfMetrics map[estats.Metric]bool, metricName estats.Metric, meter otelmetric.Meter, options ...otelmetric.Float64CounterOption) otelmetric.Float64Counter { + if _, ok := setOfMetrics[metricName]; !ok { + return noop.Float64Counter{} + } + ret, err := meter.Float64Counter(string(metricName), options...) + if err != nil { + logger.Errorf("failed to register metric \"%v\", will not record: %v", metricName, err) + return noop.Float64Counter{} + } + return ret +} + +func createInt64Histogram(setOfMetrics map[estats.Metric]bool, metricName estats.Metric, meter otelmetric.Meter, options ...otelmetric.Int64HistogramOption) otelmetric.Int64Histogram { if _, ok := setOfMetrics[metricName]; !ok { return noop.Int64Histogram{} } ret, err := meter.Int64Histogram(string(metricName), options...) if err != nil { - logger.Errorf("failed to register metric \"%v\", will not record", metricName) + logger.Errorf("failed to register metric \"%v\", will not record: %v", metricName, err) return noop.Int64Histogram{} } return ret } -func createFloat64Histogram(setOfMetrics map[estats.Metric]bool, metricName estats.Metric, meter metric.Meter, options ...metric.Float64HistogramOption) metric.Float64Histogram { +func createFloat64Histogram(setOfMetrics map[estats.Metric]bool, metricName estats.Metric, meter otelmetric.Meter, options ...otelmetric.Float64HistogramOption) otelmetric.Float64Histogram { if _, ok := setOfMetrics[metricName]; !ok { return noop.Float64Histogram{} } ret, err := meter.Float64Histogram(string(metricName), options...) if err != nil { - logger.Errorf("failed to register metric \"%v\", will not record", metricName) + logger.Errorf("failed to register metric \"%v\", will not record: %v", metricName, err) return noop.Float64Histogram{} } return ret } +func createInt64Gauge(setOfMetrics map[estats.Metric]bool, metricName estats.Metric, meter otelmetric.Meter, options ...otelmetric.Int64GaugeOption) otelmetric.Int64Gauge { + if _, ok := setOfMetrics[metricName]; !ok { + return noop.Int64Gauge{} + } + ret, err := meter.Int64Gauge(string(metricName), options...) + if err != nil { + logger.Errorf("failed to register metric \"%v\", will not record: %v", metricName, err) + return noop.Int64Gauge{} + } + return ret +} + +func optionFromLabels(labelKeys []string, optionalLabelKeys []string, optionalLabels []string, labelVals ...string) otelmetric.MeasurementOption { + var attributes []otelattribute.KeyValue + + // Once it hits here lower level has guaranteed length of labelVals matches + // labelKeys + optionalLabelKeys. + for i, label := range labelKeys { + attributes = append(attributes, otelattribute.String(label, labelVals[i])) + } + + for i, label := range optionalLabelKeys { + for _, optLabel := range optionalLabels { // o(n) could build out a set but n is currently capped at < 5 + if label == optLabel { + attributes = append(attributes, otelattribute.String(label, labelVals[i+len(labelKeys)])) + } + } + } + return otelmetric.WithAttributes(attributes...) +} + +// registryMetrics implements MetricsRecorder for the client and server stats +// handlers. +type registryMetrics struct { + intCounts map[*estats.MetricDescriptor]otelmetric.Int64Counter + floatCounts map[*estats.MetricDescriptor]otelmetric.Float64Counter + intHistos map[*estats.MetricDescriptor]otelmetric.Int64Histogram + floatHistos map[*estats.MetricDescriptor]otelmetric.Float64Histogram + intGauges map[*estats.MetricDescriptor]otelmetric.Int64Gauge + + optionalLabels []string +} + +func (rm *registryMetrics) registerMetrics(metrics *estats.Metrics, meter otelmetric.Meter) { + rm.intCounts = make(map[*estats.MetricDescriptor]otelmetric.Int64Counter) + rm.floatCounts = make(map[*estats.MetricDescriptor]otelmetric.Float64Counter) + rm.intHistos = make(map[*estats.MetricDescriptor]otelmetric.Int64Histogram) + rm.floatHistos = make(map[*estats.MetricDescriptor]otelmetric.Float64Histogram) + rm.intGauges = make(map[*estats.MetricDescriptor]otelmetric.Int64Gauge) + + for metric := range metrics.Metrics() { + desc := estats.DescriptorForMetric(metric) + if desc == nil { + // Either the metric was per call or the metric is not registered. + // Thus, if this component ever receives the desc as a handle in + // record it will be a no-op. + continue + } + switch desc.Type { + case estats.MetricTypeIntCount: + rm.intCounts[desc] = createInt64Counter(metrics.Metrics(), desc.Name, meter, otelmetric.WithUnit(desc.Unit), otelmetric.WithDescription(desc.Description)) + case estats.MetricTypeFloatCount: + rm.floatCounts[desc] = createFloat64Counter(metrics.Metrics(), desc.Name, meter, otelmetric.WithUnit(desc.Unit), otelmetric.WithDescription(desc.Description)) + case estats.MetricTypeIntHisto: + rm.intHistos[desc] = createInt64Histogram(metrics.Metrics(), desc.Name, meter, otelmetric.WithUnit(desc.Unit), otelmetric.WithDescription(desc.Description), otelmetric.WithExplicitBucketBoundaries(desc.Bounds...)) + case estats.MetricTypeFloatHisto: + rm.floatHistos[desc] = createFloat64Histogram(metrics.Metrics(), desc.Name, meter, otelmetric.WithUnit(desc.Unit), otelmetric.WithDescription(desc.Description), otelmetric.WithExplicitBucketBoundaries(desc.Bounds...)) + case estats.MetricTypeIntGauge: + rm.intGauges[desc] = createInt64Gauge(metrics.Metrics(), desc.Name, meter, otelmetric.WithUnit(desc.Unit), otelmetric.WithDescription(desc.Description)) + } + } +} + +func (rm *registryMetrics) RecordInt64Count(handle *estats.Int64CountHandle, incr int64, labels ...string) { + desc := (*estats.MetricDescriptor)(handle) + ao := optionFromLabels(desc.Labels, desc.OptionalLabels, rm.optionalLabels, labels...) + + if ic, ok := rm.intCounts[desc]; ok { + ic.Add(context.TODO(), incr, ao) + } +} + +func (rm *registryMetrics) RecordFloat64Count(handle *estats.Float64CountHandle, incr float64, labels ...string) { + desc := (*estats.MetricDescriptor)(handle) + ao := optionFromLabels(desc.Labels, desc.OptionalLabels, rm.optionalLabels, labels...) + if fc, ok := rm.floatCounts[desc]; ok { + fc.Add(context.TODO(), incr, ao) + } +} + +func (rm *registryMetrics) RecordInt64Histo(handle *estats.Int64HistoHandle, incr int64, labels ...string) { + desc := (*estats.MetricDescriptor)(handle) + ao := optionFromLabels(desc.Labels, desc.OptionalLabels, rm.optionalLabels, labels...) + if ih, ok := rm.intHistos[desc]; ok { + ih.Record(context.TODO(), incr, ao) + } +} + +func (rm *registryMetrics) RecordFloat64Histo(handle *estats.Float64HistoHandle, incr float64, labels ...string) { + desc := (*estats.MetricDescriptor)(handle) + ao := optionFromLabels(desc.Labels, desc.OptionalLabels, rm.optionalLabels, labels...) + if fh, ok := rm.floatHistos[desc]; ok { + fh.Record(context.TODO(), incr, ao) + } +} + +func (rm *registryMetrics) RecordInt64Gauge(handle *estats.Int64GaugeHandle, incr int64, labels ...string) { + desc := (*estats.MetricDescriptor)(handle) + ao := optionFromLabels(desc.Labels, desc.OptionalLabels, rm.optionalLabels, labels...) + if ig, ok := rm.intGauges[desc]; ok { + ig.Record(context.TODO(), incr, ao) + } +} + // Users of this component should use these bucket boundaries as part of their // SDK MeterProvider passed in. This component sends this as "advice" to the // API, which works, however this stability is not guaranteed, so for safety the @@ -253,6 +379,13 @@ var ( DefaultLatencyBounds = []float64{0, 0.00001, 0.00005, 0.0001, 0.0003, 0.0006, 0.0008, 0.001, 0.002, 0.003, 0.004, 0.005, 0.006, 0.008, 0.01, 0.013, 0.016, 0.02, 0.025, 0.03, 0.04, 0.05, 0.065, 0.08, 0.1, 0.13, 0.16, 0.2, 0.25, 0.3, 0.4, 0.5, 0.65, 0.8, 1, 2, 5, 10, 20, 50, 100} // provide "advice" through API, SDK should set this too // DefaultSizeBounds are the default bounds for metrics which record size. DefaultSizeBounds = []float64{0, 1024, 2048, 4096, 16384, 65536, 262144, 1048576, 4194304, 16777216, 67108864, 268435456, 1073741824, 4294967296} - // DefaultMetrics are the default metrics provided by this module. - DefaultMetrics = estats.NewMetrics(ClientAttemptStarted, ClientAttemptDuration, ClientAttemptSentCompressedTotalMessageSize, ClientAttemptRcvdCompressedTotalMessageSize, ClientCallDuration, ServerCallStarted, ServerCallSentCompressedTotalMessageSize, ServerCallRcvdCompressedTotalMessageSize, ServerCallDuration) + // defaultPerCallMetrics are the default metrics provided by this module. + defaultPerCallMetrics = estats.NewMetrics(ClientAttemptStarted, ClientAttemptDuration, ClientAttemptSentCompressedTotalMessageSize, ClientAttemptRcvdCompressedTotalMessageSize, ClientCallDuration, ServerCallStarted, ServerCallSentCompressedTotalMessageSize, ServerCallRcvdCompressedTotalMessageSize, ServerCallDuration) ) + +// DefaultMetrics returns a set of default OpenTelemetry metrics. +// +// This should only be invoked after init time. +func DefaultMetrics() *estats.Metrics { + return defaultPerCallMetrics.Join(estats.DefaultMetrics) +} diff --git a/stats/opentelemetry/server_metrics.go b/stats/opentelemetry/server_metrics.go index 6f9ad7a80..0b4246c18 100644 --- a/stats/opentelemetry/server_metrics.go +++ b/stats/opentelemetry/server_metrics.go @@ -33,8 +33,8 @@ import ( ) type serverStatsHandler struct { - options Options - + estats.MetricsRecorder + options Options serverMetrics serverMetrics } @@ -51,13 +51,19 @@ func (h *serverStatsHandler) initializeMetrics() { } metrics := h.options.MetricsOptions.Metrics if metrics == nil { - metrics = DefaultMetrics + metrics = DefaultMetrics() } h.serverMetrics.callStarted = createInt64Counter(metrics.Metrics(), "grpc.server.call.started", meter, otelmetric.WithUnit("call"), otelmetric.WithDescription("Number of server calls started.")) h.serverMetrics.callSentTotalCompressedMessageSize = createInt64Histogram(metrics.Metrics(), "grpc.server.call.sent_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes sent per server call."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...)) h.serverMetrics.callRcvdTotalCompressedMessageSize = createInt64Histogram(metrics.Metrics(), "grpc.server.call.rcvd_total_compressed_message_size", meter, otelmetric.WithUnit("By"), otelmetric.WithDescription("Compressed message bytes received per server call."), otelmetric.WithExplicitBucketBoundaries(DefaultSizeBounds...)) h.serverMetrics.callDuration = createFloat64Histogram(metrics.Metrics(), "grpc.server.call.duration", meter, otelmetric.WithUnit("s"), otelmetric.WithDescription("End-to-end time taken to complete a call from server transport's perspective."), otelmetric.WithExplicitBucketBoundaries(DefaultLatencyBounds...)) + + rm := ®istryMetrics{ + optionalLabels: h.options.MetricsOptions.OptionalLabels, + } + h.MetricsRecorder = rm + rm.registerMetrics(metrics, meter) } // attachLabelsTransportStream intercepts SetHeader and SendHeader calls of the