diff --git a/CHANGELOG.md b/CHANGELOG.md index b5b402cf3..6ee9ba0ca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Changed - Metric SDK/API implementation type `InstrumentKind` moves into `sdkapi` sub-package. (#2091) +- The Metrics SDK export record no longer contains a Resource pointer, the SDK `"go.opentelemetry.io/otel/sdk/trace/export/metric".Exporter.Export()` function for push-based exporters now takes a single Resource argument, pull-based exporters use `"go.opentelemetry.io/otel/sdk/metric/controller/basic".Controller.Resource()`. (#2120) ### Deprecated @@ -35,6 +36,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - The `fromEnv` detector no longer throws an error when `OTEL_RESOURCE_ATTRIBUTES` environment variable is not set or empty. (#2138) - Setting the global `ErrorHandler` with `"go.opentelemetry.io/otel".SetErrorHandler` multiple times is now supported. (#2160, #2140) - The `"go.opentelemetry.io/otel/attribute".Any` function now supports `int32` values. (#2169) +- Multiple calls to `"go.opentelemetry.io/otel/sdk/metric/controller/basic".WithResource()` are handled correctly, and when no resources are provided `"go.opentelemetry.io/otel/sdk/resource".Default()` is used. (#2120) ### Security diff --git a/bridge/opencensus/exporter.go b/bridge/opencensus/exporter.go index bc709a69c..c5a0986b4 100644 --- a/bridge/opencensus/exporter.go +++ b/bridge/opencensus/exporter.go @@ -51,7 +51,11 @@ type exporter struct { // ExportMetrics implements the OpenCensus metric Exporter interface func (e *exporter) ExportMetrics(ctx context.Context, metrics []*metricdata.Metric) error { - return e.base.Export(ctx, &checkpointSet{metrics: metrics}) + res := resource.Empty() + if len(metrics) != 0 { + res = convertResource(metrics[0].Resource) + } + return e.base.Export(ctx, res, &checkpointSet{metrics: metrics}) } type checkpointSet struct { @@ -69,7 +73,6 @@ func (d *checkpointSet) ForEach(exporter export.ExportKindSelector, f func(expor otel.Handle(err) continue } - res := convertResource(m.Resource) for _, ts := range m.TimeSeries { if len(ts.Points) == 0 { continue @@ -87,7 +90,6 @@ func (d *checkpointSet) ForEach(exporter export.ExportKindSelector, f func(expor if err := f(export.NewRecord( &descriptor, &ls, - res, agg, ts.StartTime, agg.end(), @@ -119,6 +121,7 @@ func convertLabels(keys []metricdata.LabelKey, values []metricdata.LabelValue) ( } // convertResource converts an OpenCensus Resource to an OpenTelemetry Resource +// Note: the ocresource.Resource Type field is not used. func convertResource(res *ocresource.Resource) *resource.Resource { labels := []attribute.KeyValue{} if res == nil { diff --git a/bridge/opencensus/exporter_test.go b/bridge/opencensus/exporter_test.go index 8e6b98631..eace2b063 100644 --- a/bridge/opencensus/exporter_test.go +++ b/bridge/opencensus/exporter_test.go @@ -39,12 +39,14 @@ import ( type fakeExporter struct { export.Exporter - records []export.Record - err error + records []export.Record + resource *resource.Resource + err error } -func (f *fakeExporter) Export(ctx context.Context, cps exportmetric.CheckpointSet) error { +func (f *fakeExporter) Export(ctx context.Context, res *resource.Resource, cps exportmetric.CheckpointSet) error { return cps.ForEach(f, func(record exportmetric.Record) error { + f.resource = res f.records = append(f.records, record) return f.err }) @@ -82,6 +84,7 @@ func TestExportMetrics(t *testing.T) { input []*metricdata.Metric exportErr error expected []export.Record + expectedResource *resource.Resource expectedHandledError error }{ { @@ -142,6 +145,12 @@ func TestExportMetrics(t *testing.T) { desc: "success", input: []*metricdata.Metric{ { + Resource: &ocresource.Resource{ + Labels: map[string]string{ + "R1": "V1", + "R2": "V2", + }, + }, TimeSeries: []*metricdata.TimeSeries{ { StartTime: now, @@ -152,11 +161,14 @@ func TestExportMetrics(t *testing.T) { }, }, }, + expectedResource: resource.NewSchemaless( + attribute.String("R1", "V1"), + attribute.String("R2", "V2"), + ), expected: []export.Record{ export.NewRecord( &basicDesc, attribute.EmptySet(), - resource.NewSchemaless(), &ocExactAggregator{ points: []aggregation.Point{ { @@ -188,7 +200,6 @@ func TestExportMetrics(t *testing.T) { export.NewRecord( &basicDesc, attribute.EmptySet(), - resource.NewSchemaless(), &ocExactAggregator{ points: []aggregation.Point{ { @@ -223,7 +234,6 @@ func TestExportMetrics(t *testing.T) { export.NewRecord( &basicDesc, attribute.EmptySet(), - resource.NewSchemaless(), &ocExactAggregator{ points: []aggregation.Point{ { @@ -255,6 +265,9 @@ func TestExportMetrics(t *testing.T) { if len(tc.expected) != len(output) { t.Fatalf("ExportMetrics(%+v) = %d records, want %d records", tc.input, len(output), len(tc.expected)) } + if fakeExporter.resource.String() != tc.expectedResource.String() { + t.Errorf("ExportMetrics(%+v)[i].Resource() = %+v, want %+v", tc.input, fakeExporter.resource.String(), tc.expectedResource.String()) + } for i, expected := range tc.expected { if output[i].StartTime() != expected.StartTime() { t.Errorf("ExportMetrics(%+v)[i].StartTime() = %+v, want %+v", tc.input, output[i].StartTime(), expected.StartTime()) @@ -262,9 +275,6 @@ func TestExportMetrics(t *testing.T) { if output[i].EndTime() != expected.EndTime() { t.Errorf("ExportMetrics(%+v)[i].EndTime() = %+v, want %+v", tc.input, output[i].EndTime(), expected.EndTime()) } - if output[i].Resource().String() != expected.Resource().String() { - t.Errorf("ExportMetrics(%+v)[i].Resource() = %+v, want %+v", tc.input, output[i].Resource().String(), expected.Resource().String()) - } if output[i].Descriptor().Name() != expected.Descriptor().Name() { t.Errorf("ExportMetrics(%+v)[i].Descriptor() = %+v, want %+v", tc.input, output[i].Descriptor().Name(), expected.Descriptor().Name()) } diff --git a/exporters/otlp/otlpmetric/exporter.go b/exporters/otlp/otlpmetric/exporter.go index 673223c99..548a4afa3 100644 --- a/exporters/otlp/otlpmetric/exporter.go +++ b/exporters/otlp/otlpmetric/exporter.go @@ -23,6 +23,7 @@ import ( "go.opentelemetry.io/otel/metric" metricsdk "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/export/metric/aggregation" + "go.opentelemetry.io/otel/sdk/resource" ) var ( @@ -42,8 +43,8 @@ type Exporter struct { } // Export exports a batch of metrics. -func (e *Exporter) Export(ctx context.Context, checkpointSet metricsdk.CheckpointSet) error { - rms, err := metrictransform.CheckpointSet(ctx, e, checkpointSet, 1) +func (e *Exporter) Export(ctx context.Context, res *resource.Resource, checkpointSet metricsdk.CheckpointSet) error { + rms, err := metrictransform.CheckpointSet(ctx, e, res, checkpointSet, 1) if err != nil { return err } diff --git a/exporters/otlp/otlpmetric/exporter_test.go b/exporters/otlp/otlpmetric/exporter_test.go index 9f767e355..c98cff6ec 100644 --- a/exporters/otlp/otlpmetric/exporter_test.go +++ b/exporters/otlp/otlpmetric/exporter_test.go @@ -16,27 +16,29 @@ package otlpmetric_test import ( "context" + "fmt" "sync" "testing" "time" - "go.opentelemetry.io/otel/exporters/otlp/otlpmetric" - "go.opentelemetry.io/otel/sdk/export/metric/aggregation" - "go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" - "go.opentelemetry.io/otel/sdk/metric/aggregator/sum" - + "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "google.golang.org/protobuf/testing/protocmp" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/metrictransform" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/number" "go.opentelemetry.io/otel/metric/sdkapi" metricsdk "go.opentelemetry.io/otel/sdk/export/metric" + "go.opentelemetry.io/otel/sdk/export/metric/aggregation" + "go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" + "go.opentelemetry.io/otel/sdk/metric/aggregator/sum" "go.opentelemetry.io/otel/sdk/resource" commonpb "go.opentelemetry.io/proto/otlp/common/v1" metricpb "go.opentelemetry.io/proto/otlp/metrics/v1" - resourcepb "go.opentelemetry.io/proto/otlp/resource/v1" ) var ( @@ -98,21 +100,17 @@ func (m *checkpointSet) ForEach(_ metricsdk.ExportKindSelector, fn func(metricsd } type record struct { - name string - iKind sdkapi.InstrumentKind - nKind number.Kind - resource *resource.Resource - opts []metric.InstrumentOption - labels []attribute.KeyValue + name string + iKind sdkapi.InstrumentKind + nKind number.Kind + opts []metric.InstrumentOption + labels []attribute.KeyValue } var ( baseKeyValues = []attribute.KeyValue{attribute.String("host", "test.com")} cpuKey = attribute.Key("CPU") - testInstA = resource.NewSchemaless(attribute.String("instance", "tester-a")) - testInstB = resource.NewSchemaless(attribute.String("instance", "tester-b")) - testHistogramBoundaries = []float64{2.0, 4.0, 8.0} cpu1Labels = []*commonpb.KeyValue{ @@ -152,43 +150,21 @@ var ( }, } - testerAResource = &resourcepb.Resource{ - Attributes: []*commonpb.KeyValue{ - { - Key: "instance", - Value: &commonpb.AnyValue{ - Value: &commonpb.AnyValue_StringValue{ - StringValue: "tester-a", - }, - }, - }, - }, - } - testerBResource = &resourcepb.Resource{ - Attributes: []*commonpb.KeyValue{ - { - Key: "instance", - Value: &commonpb.AnyValue{ - Value: &commonpb.AnyValue_StringValue{ - StringValue: "tester-b", - }, - }, - }, - }, - } + testerAResource = resource.NewSchemaless(attribute.String("instance", "tester-a")) + testerAResourcePb = metrictransform.Resource(testerAResource) ) func TestNoGroupingExport(t *testing.T) { runMetricExportTests( t, nil, + nil, []record{ { "int64-count", sdkapi.CounterInstrumentKind, number.Int64Kind, nil, - nil, append(baseKeyValues, cpuKey.Int(1)), }, { @@ -196,7 +172,6 @@ func TestNoGroupingExport(t *testing.T) { sdkapi.CounterInstrumentKind, number.Int64Kind, nil, - nil, append(baseKeyValues, cpuKey.Int(2)), }, }, @@ -243,7 +218,6 @@ func TestValuerecorderMetricGroupingExport(t *testing.T) { sdkapi.ValueRecorderInstrumentKind, number.Int64Kind, nil, - nil, append(baseKeyValues, cpuKey.Int(1)), } expected := []*metricpb.ResourceMetrics{ @@ -285,7 +259,7 @@ func TestValuerecorderMetricGroupingExport(t *testing.T) { }, }, } - runMetricExportTests(t, nil, []record{r, r}, expected) + runMetricExportTests(t, nil, nil, []record{r, r}, expected) } func TestCountInt64MetricGroupingExport(t *testing.T) { @@ -294,12 +268,12 @@ func TestCountInt64MetricGroupingExport(t *testing.T) { sdkapi.CounterInstrumentKind, number.Int64Kind, nil, - nil, append(baseKeyValues, cpuKey.Int(1)), } runMetricExportTests( t, nil, + nil, []record{r, r}, []*metricpb.ResourceMetrics{ { @@ -344,12 +318,12 @@ func TestCountFloat64MetricGroupingExport(t *testing.T) { sdkapi.CounterInstrumentKind, number.Float64Kind, nil, - nil, append(baseKeyValues, cpuKey.Int(1)), } runMetricExportTests( t, nil, + nil, []record{r, r}, []*metricpb.ResourceMetrics{ { @@ -392,12 +366,12 @@ func TestResourceMetricGroupingExport(t *testing.T) { runMetricExportTests( t, nil, + testerAResource, []record{ { "int64-count", sdkapi.CounterInstrumentKind, number.Int64Kind, - testInstA, nil, append(baseKeyValues, cpuKey.Int(1)), }, @@ -405,7 +379,6 @@ func TestResourceMetricGroupingExport(t *testing.T) { "int64-count", sdkapi.CounterInstrumentKind, number.Int64Kind, - testInstA, nil, append(baseKeyValues, cpuKey.Int(1)), }, @@ -413,7 +386,6 @@ func TestResourceMetricGroupingExport(t *testing.T) { "int64-count", sdkapi.CounterInstrumentKind, number.Int64Kind, - testInstA, nil, append(baseKeyValues, cpuKey.Int(2)), }, @@ -421,14 +393,13 @@ func TestResourceMetricGroupingExport(t *testing.T) { "int64-count", sdkapi.CounterInstrumentKind, number.Int64Kind, - testInstB, nil, append(baseKeyValues, cpuKey.Int(1)), }, }, []*metricpb.ResourceMetrics{ { - Resource: testerAResource, + Resource: testerAResourcePb, InstrumentationLibraryMetrics: []*metricpb.InstrumentationLibraryMetrics{ { Metrics: []*metricpb.Metric{ @@ -457,26 +428,6 @@ func TestResourceMetricGroupingExport(t *testing.T) { StartTimeUnixNano: startTime(), TimeUnixNano: pointTime(), }, - }, - }, - }, - }, - }, - }, - }, - }, - { - Resource: testerBResource, - InstrumentationLibraryMetrics: []*metricpb.InstrumentationLibraryMetrics{ - { - Metrics: []*metricpb.Metric{ - { - Name: "int64-count", - Data: &metricpb.Metric_Sum{ - Sum: &metricpb.Sum{ - IsMonotonic: true, - AggregationTemporality: metricpb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE, - DataPoints: []*metricpb.NumberDataPoint{ { Value: &metricpb.NumberDataPoint_AsInt{AsInt: 11}, Attributes: cpu1Labels, @@ -510,12 +461,12 @@ func TestResourceInstLibMetricGroupingExport(t *testing.T) { runMetricExportTests( t, nil, + testerAResource, []record{ { "int64-count", sdkapi.CounterInstrumentKind, number.Int64Kind, - testInstA, countingLib1, append(baseKeyValues, cpuKey.Int(1)), }, @@ -523,7 +474,6 @@ func TestResourceInstLibMetricGroupingExport(t *testing.T) { "int64-count", sdkapi.CounterInstrumentKind, number.Int64Kind, - testInstA, countingLib2, append(baseKeyValues, cpuKey.Int(1)), }, @@ -531,7 +481,6 @@ func TestResourceInstLibMetricGroupingExport(t *testing.T) { "int64-count", sdkapi.CounterInstrumentKind, number.Int64Kind, - testInstA, countingLib1, append(baseKeyValues, cpuKey.Int(1)), }, @@ -539,7 +488,6 @@ func TestResourceInstLibMetricGroupingExport(t *testing.T) { "int64-count", sdkapi.CounterInstrumentKind, number.Int64Kind, - testInstA, countingLib1, append(baseKeyValues, cpuKey.Int(2)), }, @@ -547,22 +495,13 @@ func TestResourceInstLibMetricGroupingExport(t *testing.T) { "int64-count", sdkapi.CounterInstrumentKind, number.Int64Kind, - testInstA, summingLib, append(baseKeyValues, cpuKey.Int(1)), }, - { - "int64-count", - sdkapi.CounterInstrumentKind, - number.Int64Kind, - testInstB, - countingLib1, - append(baseKeyValues, cpuKey.Int(1)), - }, }, []*metricpb.ResourceMetrics{ { - Resource: testerAResource, + Resource: testerAResourcePb, InstrumentationLibraryMetrics: []*metricpb.InstrumentationLibraryMetrics{ { InstrumentationLibrary: &commonpb.InstrumentationLibrary{ @@ -652,36 +591,6 @@ func TestResourceInstLibMetricGroupingExport(t *testing.T) { }, }, }, - { - Resource: testerBResource, - InstrumentationLibraryMetrics: []*metricpb.InstrumentationLibraryMetrics{ - { - InstrumentationLibrary: &commonpb.InstrumentationLibrary{ - Name: "counting-lib", - Version: "v1", - }, - Metrics: []*metricpb.Metric{ - { - Name: "int64-count", - Data: &metricpb.Metric_Sum{ - Sum: &metricpb.Sum{ - IsMonotonic: true, - AggregationTemporality: metricpb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE, - DataPoints: []*metricpb.NumberDataPoint{ - { - Value: &metricpb.NumberDataPoint_AsInt{AsInt: 11}, - Attributes: cpu1Labels, - StartTimeUnixNano: startTime(), - TimeUnixNano: pointTime(), - }, - }, - }, - }, - }, - }, - }, - }, - }, }, ) } @@ -708,19 +617,19 @@ func TestStatelessExportKind(t *testing.T) { metricsdk.StatelessExportKindSelector(), ), }, + testerAResource, []record{ { "instrument", k.instrumentKind, number.Int64Kind, - testInstA, nil, append(baseKeyValues, cpuKey.Int(1)), }, }, []*metricpb.ResourceMetrics{ { - Resource: testerAResource, + Resource: testerAResourcePb, InstrumentationLibraryMetrics: []*metricpb.InstrumentationLibraryMetrics{ { Metrics: []*metricpb.Metric{ @@ -751,12 +660,11 @@ func TestStatelessExportKind(t *testing.T) { } } -func runMetricExportTests(t *testing.T, opts []otlpmetric.Option, rs []record, expected []*metricpb.ResourceMetrics) { +func runMetricExportTests(t *testing.T, opts []otlpmetric.Option, res *resource.Resource, records []record, expected []*metricpb.ResourceMetrics) { exp, driver := newExporter(t, opts...) - recs := map[attribute.Distinct][]metricsdk.Record{} - resources := map[attribute.Distinct]*resource.Resource{} - for _, r := range rs { + recs := []metricsdk.Record{} + for _, r := range records { lcopy := make([]attribute.KeyValue, len(r.labels)) copy(lcopy, r.labels) desc := metric.NewDescriptor(r.name, r.iKind, r.nKind, r.opts...) @@ -797,83 +705,47 @@ func runMetricExportTests(t *testing.T, opts []otlpmetric.Option, rs []record, e } require.NoError(t, agg.SynchronizedMove(ckpt, &desc)) - equiv := r.resource.Equivalent() - resources[equiv] = r.resource - recs[equiv] = append(recs[equiv], metricsdk.NewRecord(&desc, &labs, r.resource, ckpt.Aggregation(), intervalStart, intervalEnd)) - } - for _, records := range recs { - assert.NoError(t, exp.Export(context.Background(), &checkpointSet{records: records})) + recs = append(recs, metricsdk.NewRecord(&desc, &labs, ckpt.Aggregation(), intervalStart, intervalEnd)) } + assert.NoError(t, exp.Export(context.Background(), res, &checkpointSet{records: recs})) // assert.ElementsMatch does not equate nested slices of different order, // therefore this requires the top level slice to be broken down. // Build a map of Resource/InstrumentationLibrary pairs to Metrics, from // that validate the metric elements match for all expected pairs. Finally, // make we saw all expected pairs. - type key struct { - resource, instrumentationLibrary string + keyFor := func(ilm *metricpb.InstrumentationLibraryMetrics) string { + return fmt.Sprintf("%s/%s", ilm.GetInstrumentationLibrary().GetName(), ilm.GetInstrumentationLibrary().GetVersion()) } - got := map[key][]*metricpb.Metric{} + got := map[string][]*metricpb.Metric{} for _, rm := range driver.rm { for _, ilm := range rm.InstrumentationLibraryMetrics { - k := key{ - resource: rm.GetResource().String(), - instrumentationLibrary: ilm.GetInstrumentationLibrary().String(), - } - got[k] = ilm.GetMetrics() + k := keyFor(ilm) + got[k] = append(got[k], ilm.GetMetrics()...) } } - seen := map[key]struct{}{} + + seen := map[string]struct{}{} for _, rm := range expected { for _, ilm := range rm.InstrumentationLibraryMetrics { - k := key{ - resource: rm.GetResource().String(), - instrumentationLibrary: ilm.GetInstrumentationLibrary().String(), - } + k := keyFor(ilm) seen[k] = struct{}{} g, ok := got[k] if !ok { - t.Errorf("missing metrics for:\n\tResource: %s\n\tInstrumentationLibrary: %s\n", k.resource, k.instrumentationLibrary) + t.Errorf("missing metrics for:\n\tInstrumentationLibrary: %q\n", k) continue } if !assert.Len(t, g, len(ilm.GetMetrics())) { continue } for i, expected := range ilm.GetMetrics() { - assert.Equal(t, expected.Name, g[i].Name) - assert.Equal(t, expected.Unit, g[i].Unit) - assert.Equal(t, expected.Description, g[i].Description) - switch g[i].Data.(type) { - case *metricpb.Metric_Gauge: - assert.ElementsMatch(t, expected.GetGauge().GetDataPoints(), g[i].GetGauge().GetDataPoints()) - case *metricpb.Metric_Sum: - assert.Equal(t, - expected.GetSum().GetAggregationTemporality(), - g[i].GetSum().GetAggregationTemporality(), - ) - assert.Equal(t, - expected.GetSum().GetIsMonotonic(), - g[i].GetSum().GetIsMonotonic(), - ) - assert.ElementsMatch(t, expected.GetSum().GetDataPoints(), g[i].GetSum().GetDataPoints()) - case *metricpb.Metric_Histogram: - assert.Equal( - t, - expected.GetHistogram().GetAggregationTemporality(), - g[i].GetHistogram().GetAggregationTemporality(), - ) - assert.ElementsMatch(t, expected.GetHistogram().GetDataPoints(), g[i].GetHistogram().GetDataPoints()) - case *metricpb.Metric_Summary: - assert.ElementsMatch(t, expected.GetSummary().GetDataPoints(), g[i].GetSummary().GetDataPoints()) - default: - assert.Failf(t, "unknown data type", g[i].Name) - } + assert.Equal(t, "", cmp.Diff(expected, g[i], protocmp.Transform())) } } } for k := range got { if _, ok := seen[k]; !ok { - t.Errorf("did not expect metrics for:\n\tResource: %s\n\tInstrumentationLibrary: %s\n", k.resource, k.instrumentationLibrary) + t.Errorf("did not expect metrics for:\n\tInstrumentationLibrary: %s\n", k) } } } @@ -895,7 +767,7 @@ func TestEmptyMetricExport(t *testing.T) { }, } { driver.Reset() - require.NoError(t, exp.Export(context.Background(), &checkpointSet{records: test.records})) + require.NoError(t, exp.Export(context.Background(), resource.Empty(), &checkpointSet{records: test.records})) assert.Equal(t, test.want, driver.rm) } } diff --git a/exporters/otlp/otlpmetric/go.mod b/exporters/otlp/otlpmetric/go.mod index 89498fda0..7ab9856f9 100644 --- a/exporters/otlp/otlpmetric/go.mod +++ b/exporters/otlp/otlpmetric/go.mod @@ -4,6 +4,7 @@ go 1.15 require ( github.com/cenkalti/backoff/v4 v4.1.1 + github.com/google/go-cmp v0.5.6 github.com/stretchr/testify v1.7.0 go.opentelemetry.io/otel v1.0.0-RC2 go.opentelemetry.io/otel/metric v0.22.0 diff --git a/exporters/otlp/otlpmetric/internal/metrictransform/metric.go b/exporters/otlp/otlpmetric/internal/metrictransform/metric.go index cd24c7a7b..44d9e4700 100644 --- a/exporters/otlp/otlpmetric/internal/metrictransform/metric.go +++ b/exporters/otlp/otlpmetric/internal/metrictransform/metric.go @@ -24,10 +24,8 @@ import ( "sync" "time" - "go.opentelemetry.io/otel/attribute" commonpb "go.opentelemetry.io/proto/otlp/common/v1" metricpb "go.opentelemetry.io/proto/otlp/metrics/v1" - resourcepb "go.opentelemetry.io/proto/otlp/resource/v1" "go.opentelemetry.io/otel/metric/number" export "go.opentelemetry.io/otel/sdk/export/metric" @@ -60,7 +58,6 @@ var ( // result is the product of transforming Records into OTLP Metrics. type result struct { - Resource *resource.Resource InstrumentationLibrary instrumentation.Library Metric *metricpb.Metric Err error @@ -76,7 +73,7 @@ func toNanos(t time.Time) uint64 { // CheckpointSet transforms all records contained in a checkpoint into // batched OTLP ResourceMetrics. -func CheckpointSet(ctx context.Context, exportSelector export.ExportKindSelector, cps export.CheckpointSet, numWorkers uint) ([]*metricpb.ResourceMetrics, error) { +func CheckpointSet(ctx context.Context, exportSelector export.ExportKindSelector, res *resource.Resource, cps export.CheckpointSet, numWorkers uint) ([]*metricpb.ResourceMetrics, error) { records, errc := source(ctx, exportSelector, cps) // Start a fixed number of goroutines to transform records. @@ -95,7 +92,7 @@ func CheckpointSet(ctx context.Context, exportSelector export.ExportKindSelector }() // Synchronously collect the transformed records and transmit. - rms, err := sink(ctx, transformed) + rms, err := sink(ctx, res, transformed) if err != nil { return nil, err } @@ -139,7 +136,6 @@ func transformer(ctx context.Context, exportSelector export.ExportKindSelector, continue } res := result{ - Resource: r.Resource(), InstrumentationLibrary: instrumentation.Library{ Name: r.Descriptor().InstrumentationName(), Version: r.Descriptor().InstrumentationVersion(), @@ -160,41 +156,21 @@ func transformer(ctx context.Context, exportSelector export.ExportKindSelector, // Any errors encoutered transforming input will be reported with an // ErrTransforming as well as the completed ResourceMetrics. It is up to the // caller to handle any incorrect data in these ResourceMetrics. -func sink(ctx context.Context, in <-chan result) ([]*metricpb.ResourceMetrics, error) { +func sink(ctx context.Context, res *resource.Resource, in <-chan result) ([]*metricpb.ResourceMetrics, error) { var errStrings []string - type resourceBatch struct { - Resource *resourcepb.Resource - // Group by instrumentation library name and then the MetricDescriptor. - InstrumentationLibraryBatches map[instrumentation.Library]map[string]*metricpb.Metric - SchemaURL string - } - - // group by unique Resource string. - grouped := make(map[attribute.Distinct]resourceBatch) + // Group by instrumentation library name and then the MetricDescriptor. + grouped := map[instrumentation.Library]map[string]*metricpb.Metric{} for res := range in { if res.Err != nil { errStrings = append(errStrings, res.Err.Error()) continue } - rID := res.Resource.Equivalent() - rb, ok := grouped[rID] - if !ok { - rb = resourceBatch{ - Resource: Resource(res.Resource), - InstrumentationLibraryBatches: make(map[instrumentation.Library]map[string]*metricpb.Metric), - } - if res.Resource != nil { - rb.SchemaURL = res.Resource.SchemaURL() - } - grouped[rID] = rb - } - - mb, ok := rb.InstrumentationLibraryBatches[res.InstrumentationLibrary] + mb, ok := grouped[res.InstrumentationLibrary] if !ok { mb = make(map[string]*metricpb.Metric) - rb.InstrumentationLibraryBatches[res.InstrumentationLibrary] = mb + grouped[res.InstrumentationLibrary] = mb } mID := res.Metric.GetName() @@ -222,26 +198,26 @@ func sink(ctx context.Context, in <-chan result) ([]*metricpb.ResourceMetrics, e return nil, nil } - var rms []*metricpb.ResourceMetrics - for _, rb := range grouped { - // TODO: populate ResourceMetrics.SchemaURL when the field is added to the Protobuf message. - rm := &metricpb.ResourceMetrics{Resource: rb.Resource} - for il, mb := range rb.InstrumentationLibraryBatches { - ilm := &metricpb.InstrumentationLibraryMetrics{ - Metrics: make([]*metricpb.Metric, 0, len(mb)), - } - if il != (instrumentation.Library{}) { - ilm.InstrumentationLibrary = &commonpb.InstrumentationLibrary{ - Name: il.Name, - Version: il.Version, - } - } - for _, m := range mb { - ilm.Metrics = append(ilm.Metrics, m) - } - rm.InstrumentationLibraryMetrics = append(rm.InstrumentationLibraryMetrics, ilm) + rm := &metricpb.ResourceMetrics{ + Resource: Resource(res), + } + if res != nil { + rm.SchemaUrl = res.SchemaURL() + } + + rms := []*metricpb.ResourceMetrics{rm} + for il, mb := range grouped { + ilm := &metricpb.InstrumentationLibraryMetrics{ + Metrics: make([]*metricpb.Metric, 0, len(mb)), + InstrumentationLibrary: &commonpb.InstrumentationLibrary{ + Name: il.Name, + Version: il.Version, + }, } - rms = append(rms, rm) + for _, m := range mb { + ilm.Metrics = append(ilm.Metrics, m) + } + rm.InstrumentationLibraryMetrics = append(rm.InstrumentationLibraryMetrics, ilm) } // Report any transform errors. diff --git a/exporters/otlp/otlpmetric/internal/metrictransform/metric_test.go b/exporters/otlp/otlpmetric/internal/metrictransform/metric_test.go index 428f68c79..4a8ec9288 100644 --- a/exporters/otlp/otlpmetric/internal/metrictransform/metric_test.go +++ b/exporters/otlp/otlpmetric/internal/metrictransform/metric_test.go @@ -36,7 +36,6 @@ import ( "go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount" "go.opentelemetry.io/otel/sdk/metric/aggregator/sum" sumAgg "go.opentelemetry.io/otel/sdk/metric/aggregator/sum" - "go.opentelemetry.io/otel/sdk/resource" commonpb "go.opentelemetry.io/proto/otlp/common/v1" metricpb "go.opentelemetry.io/proto/otlp/metrics/v1" ) @@ -155,7 +154,7 @@ func TestMinMaxSumCountDatapoints(t *testing.T) { }, }, } - record := export.NewRecord(&desc, &labels, nil, ckpt.Aggregation(), intervalStart, intervalEnd) + record := export.NewRecord(&desc, &labels, ckpt.Aggregation(), intervalStart, intervalEnd) m, err := minMaxSumCount(record, ckpt) if assert.NoError(t, err) { assert.Nil(t, m.GetGauge()) @@ -186,7 +185,7 @@ func TestSumIntDataPoints(t *testing.T) { assert.NoError(t, s.Update(context.Background(), number.Number(1), &desc)) require.NoError(t, s.SynchronizedMove(ckpt, &desc)) - record := export.NewRecord(&desc, &labels, nil, ckpt.Aggregation(), intervalStart, intervalEnd) + record := export.NewRecord(&desc, &labels, ckpt.Aggregation(), intervalStart, intervalEnd) value, err := ckpt.Sum() require.NoError(t, err) @@ -226,7 +225,7 @@ func TestSumFloatDataPoints(t *testing.T) { assert.NoError(t, s.Update(context.Background(), number.NewFloat64Number(1), &desc)) require.NoError(t, s.SynchronizedMove(ckpt, &desc)) - record := export.NewRecord(&desc, &labels, nil, ckpt.Aggregation(), intervalStart, intervalEnd) + record := export.NewRecord(&desc, &labels, ckpt.Aggregation(), intervalStart, intervalEnd) value, err := ckpt.Sum() require.NoError(t, err) @@ -264,7 +263,7 @@ func TestLastValueIntDataPoints(t *testing.T) { assert.NoError(t, lv.Update(context.Background(), number.Number(100), &desc)) require.NoError(t, lv.SynchronizedMove(ckpt, &desc)) - record := export.NewRecord(&desc, &labels, nil, ckpt.Aggregation(), intervalStart, intervalEnd) + record := export.NewRecord(&desc, &labels, ckpt.Aggregation(), intervalStart, intervalEnd) value, timestamp, err := ckpt.LastValue() require.NoError(t, err) @@ -299,7 +298,7 @@ func TestExactIntDataPoints(t *testing.T) { assert.NoError(t, e.Update(context.Background(), number.Number(100), &desc)) require.NoError(t, e.SynchronizedMove(ckpt, &desc)) - record := export.NewRecord(&desc, &labels, nil, ckpt.Aggregation(), intervalStart, intervalEnd) + record := export.NewRecord(&desc, &labels, ckpt.Aggregation(), intervalStart, intervalEnd) pts, err := ckpt.Points() require.NoError(t, err) @@ -333,7 +332,7 @@ func TestExactFloatDataPoints(t *testing.T) { e, ckpt := &arrs[0], &arrs[1] assert.NoError(t, e.Update(context.Background(), number.NewFloat64Number(100), &desc)) require.NoError(t, e.SynchronizedMove(ckpt, &desc)) - record := export.NewRecord(&desc, &labels, nil, ckpt.Aggregation(), intervalStart, intervalEnd) + record := export.NewRecord(&desc, &labels, ckpt.Aggregation(), intervalStart, intervalEnd) pts, err := ckpt.Points() require.NoError(t, err) @@ -364,7 +363,7 @@ func TestSumErrUnknownValueType(t *testing.T) { desc := metric.NewDescriptor("", sdkapi.ValueRecorderInstrumentKind, number.Kind(-1)) labels := attribute.NewSet() s := &sumAgg.New(1)[0] - record := export.NewRecord(&desc, &labels, nil, s, intervalStart, intervalEnd) + record := export.NewRecord(&desc, &labels, s, intervalStart, intervalEnd) value, err := s.Sum() require.NoError(t, err) @@ -448,12 +447,11 @@ func TestRecordAggregatorIncompatibleErrors(t *testing.T) { makeMpb := func(kind aggregation.Kind, agg aggregation.Aggregation) (*metricpb.Metric, error) { desc := metric.NewDescriptor("things", sdkapi.CounterInstrumentKind, number.Int64Kind) labels := attribute.NewSet() - res := resource.Empty() test := &testAgg{ kind: kind, agg: agg, } - return Record(export.CumulativeExportKindSelector(), export.NewRecord(&desc, &labels, res, test, intervalStart, intervalEnd)) + return Record(export.CumulativeExportKindSelector(), export.NewRecord(&desc, &labels, test, intervalStart, intervalEnd)) } mpb, err := makeMpb(aggregation.SumKind, &lastvalue.New(1)[0]) @@ -485,8 +483,7 @@ func TestRecordAggregatorUnexpectedErrors(t *testing.T) { makeMpb := func(kind aggregation.Kind, agg aggregation.Aggregation) (*metricpb.Metric, error) { desc := metric.NewDescriptor("things", sdkapi.CounterInstrumentKind, number.Int64Kind) labels := attribute.NewSet() - res := resource.Empty() - return Record(export.CumulativeExportKindSelector(), export.NewRecord(&desc, &labels, res, agg, intervalStart, intervalEnd)) + return Record(export.CumulativeExportKindSelector(), export.NewRecord(&desc, &labels, agg, intervalStart, intervalEnd)) } errEx := fmt.Errorf("timeout") diff --git a/exporters/otlp/otlpmetric/internal/otlpmetrictest/data.go b/exporters/otlp/otlpmetric/internal/otlpmetrictest/data.go index e9c5c0cc0..1e7144ad7 100644 --- a/exporters/otlp/otlpmetric/internal/otlpmetrictest/data.go +++ b/exporters/otlp/otlpmetric/internal/otlpmetrictest/data.go @@ -25,7 +25,6 @@ import ( "go.opentelemetry.io/otel/metric/sdkapi" exportmetric "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/metric/aggregator/sum" - "go.opentelemetry.io/otel/sdk/resource" ) // Used to avoid implementing locking functions for test @@ -63,7 +62,6 @@ func (OneRecordCheckpointSet) ForEach(kindSelector exportmetric.ExportKindSelect sdkapi.CounterInstrumentKind, number.Int64Kind, ) - res := resource.NewSchemaless(attribute.String("a", "b")) agg := sum.New(1) if err := agg[0].Update(context.Background(), number.NewInt64Number(42), &desc); err != nil { return err @@ -71,7 +69,7 @@ func (OneRecordCheckpointSet) ForEach(kindSelector exportmetric.ExportKindSelect start := time.Date(2020, time.December, 8, 19, 15, 0, 0, time.UTC) end := time.Date(2020, time.December, 8, 19, 16, 0, 0, time.UTC) labels := attribute.NewSet(attribute.String("abc", "def"), attribute.Int64("one", 1)) - rec := exportmetric.NewRecord(&desc, &labels, res, agg[0].Aggregation(), start, end) + rec := exportmetric.NewRecord(&desc, &labels, agg[0].Aggregation(), start, end) return recordFunc(rec) } diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/client_test.go b/exporters/otlp/otlpmetric/otlpmetricgrpc/client_test.go index 11465a058..e55d240d9 100644 --- a/exporters/otlp/otlpmetric/otlpmetricgrpc/client_test.go +++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/client_test.go @@ -25,6 +25,7 @@ import ( "go.opentelemetry.io/otel/exporters/otlp/otlpmetric" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/otlpmetrictest" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" + "go.opentelemetry.io/otel/sdk/resource" "google.golang.org/genproto/googleapis/rpc/errdetails" "google.golang.org/grpc/codes" @@ -40,6 +41,8 @@ import ( var ( oneRecord = otlpmetrictest.OneRecordCheckpointSet{} + + testResource = resource.Empty() ) func TestNewExporter_endToEnd(t *testing.T) { @@ -183,11 +186,11 @@ func TestNewExporter_collectorConnectionDiesThenReconnectsWhenInRestMode(t *test // first export, it will send disconnected message to the channel on export failure, // trigger almost immediate reconnection - require.Error(t, exp.Export(ctx, oneRecord)) + require.Error(t, exp.Export(ctx, testResource, oneRecord)) // second export, it will detect connection issue, change state of exporter to disconnected and // send message to disconnected channel but this time reconnection gouroutine will be in (rest mode, not listening to the disconnected channel) - require.Error(t, exp.Export(ctx, oneRecord)) + require.Error(t, exp.Export(ctx, testResource, oneRecord)) // as a result we have exporter in disconnected state waiting for disconnection message to reconnect @@ -202,7 +205,7 @@ func TestNewExporter_collectorConnectionDiesThenReconnectsWhenInRestMode(t *test for i := 0; i < n; i++ { // when disconnected exp.Export doesnt send disconnected messages again // it just quits and return last connection error - require.NoError(t, exp.Export(ctx, oneRecord)) + require.NoError(t, exp.Export(ctx, testResource, oneRecord)) } nmaMetrics := nmc.getMetrics() @@ -231,7 +234,7 @@ func TestExporterExportFailureAndRecoveryModes(t *testing.T) { { name: "Do not retry if succeeded", fn: func(t *testing.T, ctx context.Context, exp *otlpmetric.Exporter, mc *mockCollector) { - require.NoError(t, exp.Export(ctx, oneRecord)) + require.NoError(t, exp.Export(ctx, testResource, oneRecord)) metrics := mc.getMetrics() @@ -245,7 +248,7 @@ func TestExporterExportFailureAndRecoveryModes(t *testing.T) { status.Error(codes.OK, ""), }, fn: func(t *testing.T, ctx context.Context, exp *otlpmetric.Exporter, mc *mockCollector) { - require.NoError(t, exp.Export(ctx, oneRecord)) + require.NoError(t, exp.Export(ctx, testResource, oneRecord)) metrics := mc.getMetrics() @@ -267,7 +270,7 @@ func TestExporterExportFailureAndRecoveryModes(t *testing.T) { status.Error(codes.Unavailable, "backend under pressure"), }, fn: func(t *testing.T, ctx context.Context, exp *otlpmetric.Exporter, mc *mockCollector) { - require.NoError(t, exp.Export(ctx, oneRecord)) + require.NoError(t, exp.Export(ctx, testResource, oneRecord)) metrics := mc.getMetrics() @@ -287,7 +290,7 @@ func TestExporterExportFailureAndRecoveryModes(t *testing.T) { status.Error(codes.InvalidArgument, "invalid arguments"), }, fn: func(t *testing.T, ctx context.Context, exp *otlpmetric.Exporter, mc *mockCollector) { - require.Error(t, exp.Export(ctx, oneRecord)) + require.Error(t, exp.Export(ctx, testResource, oneRecord)) metric := mc.getMetrics() @@ -313,7 +316,7 @@ func TestExporterExportFailureAndRecoveryModes(t *testing.T) { status.Error(codes.DataLoss, ""), }, fn: func(t *testing.T, ctx context.Context, exp *otlpmetric.Exporter, mc *mockCollector) { - require.NoError(t, exp.Export(ctx, oneRecord)) + require.NoError(t, exp.Export(ctx, testResource, oneRecord)) metrics := mc.getMetrics() @@ -336,7 +339,7 @@ func TestExporterExportFailureAndRecoveryModes(t *testing.T) { newThrottlingError(codes.ResourceExhausted, time.Second*30), }, fn: func(t *testing.T, ctx context.Context, exp *otlpmetric.Exporter, mc *mockCollector) { - err := exp.Export(ctx, oneRecord) + err := exp.Export(ctx, testResource, oneRecord) require.Error(t, err) require.Equal(t, "context deadline exceeded", err.Error()) @@ -358,7 +361,7 @@ func TestExporterExportFailureAndRecoveryModes(t *testing.T) { newThrottlingError(codes.ResourceExhausted, time.Minute), }, fn: func(t *testing.T, ctx context.Context, exp *otlpmetric.Exporter, mc *mockCollector) { - err := exp.Export(ctx, oneRecord) + err := exp.Export(ctx, testResource, oneRecord) require.Error(t, err) require.Equal(t, "max elapsed time expired when respecting server throttle: rpc error: code = ResourceExhausted desc = ", err.Error()) @@ -385,7 +388,7 @@ func TestExporterExportFailureAndRecoveryModes(t *testing.T) { status.Error(codes.Unavailable, "unavailable"), }, fn: func(t *testing.T, ctx context.Context, exp *otlpmetric.Exporter, mc *mockCollector) { - err := exp.Export(ctx, oneRecord) + err := exp.Export(ctx, testResource, oneRecord) require.Error(t, err) require.Equal(t, "max elapsed time expired: rpc error: code = Unavailable desc = unavailable", err.Error()) @@ -405,7 +408,7 @@ func TestExporterExportFailureAndRecoveryModes(t *testing.T) { status.Error(codes.Unavailable, "unavailable"), }, fn: func(t *testing.T, ctx context.Context, exp *otlpmetric.Exporter, mc *mockCollector) { - err := exp.Export(ctx, oneRecord) + err := exp.Export(ctx, testResource, oneRecord) require.Error(t, err) require.Equal(t, "rpc error: code = Unavailable desc = unavailable", err.Error()) @@ -468,7 +471,7 @@ func TestPermanentErrorsShouldNotBeRetried(t *testing.T) { exp := newGRPCExporter(t, ctx, mc.endpoint) - err := exp.Export(ctx, oneRecord) + err := exp.Export(ctx, testResource, oneRecord) require.Error(t, err) require.Len(t, mc.getMetrics(), 0) require.Equal(t, 1, mc.metricSvc.requests, "metric service must receive 1 permanent error requests.") @@ -509,7 +512,7 @@ func TestNewExporter_collectorConnectionDiesThenReconnects(t *testing.T) { for j := 0; j < 3; j++ { // No endpoint up. - require.Error(t, exp.Export(ctx, oneRecord)) + require.Error(t, exp.Export(ctx, testResource, oneRecord)) // Now resurrect the collector by making a new one but reusing the // old endpoint, and the collector should reconnect automatically. @@ -520,7 +523,7 @@ func TestNewExporter_collectorConnectionDiesThenReconnects(t *testing.T) { n := 10 for i := 0; i < n; i++ { - require.NoError(t, exp.Export(ctx, oneRecord)) + require.NoError(t, exp.Export(ctx, testResource, oneRecord)) } nmaMetrics := nmc.getMetrics() @@ -582,7 +585,7 @@ func TestNewExporter_withHeaders(t *testing.T) { ctx := context.Background() exp := newGRPCExporter(t, ctx, mc.endpoint, otlpmetricgrpc.WithHeaders(map[string]string{"header1": "value1"})) - require.NoError(t, exp.Export(ctx, oneRecord)) + require.NoError(t, exp.Export(ctx, testResource, oneRecord)) defer func() { _ = exp.Shutdown(ctx) @@ -606,7 +609,7 @@ func TestNewExporter_WithTimeout(t *testing.T) { { name: "Timeout Metrics", fn: func(exp *otlpmetric.Exporter) error { - return exp.Export(context.Background(), oneRecord) + return exp.Export(context.Background(), testResource, oneRecord) }, timeout: time.Millisecond * 100, code: codes.DeadlineExceeded, @@ -616,7 +619,7 @@ func TestNewExporter_WithTimeout(t *testing.T) { { name: "No Timeout Metrics", fn: func(exp *otlpmetric.Exporter) error { - return exp.Export(context.Background(), oneRecord) + return exp.Export(context.Background(), testResource, oneRecord) }, timeout: time.Minute, metrics: 1, @@ -670,7 +673,7 @@ func TestNewExporter_withInvalidSecurityConfiguration(t *testing.T) { t.Fatalf("failed to create a new collector exporter: %v", err) } - err = exp.Export(ctx, oneRecord) + err = exp.Export(ctx, testResource, oneRecord) expectedErr := fmt.Sprintf("metrics exporter is disconnected from the server %s: grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)", mc.endpoint) @@ -698,7 +701,7 @@ func TestDisconnected(t *testing.T) { assert.NoError(t, exp.Shutdown(ctx)) }() - assert.Error(t, exp.Export(ctx, oneRecord)) + assert.Error(t, exp.Export(ctx, testResource, oneRecord)) } func TestEmptyData(t *testing.T) { @@ -716,7 +719,7 @@ func TestEmptyData(t *testing.T) { assert.NoError(t, exp.Shutdown(ctx)) }() - assert.NoError(t, exp.Export(ctx, otlpmetrictest.EmptyCheckpointSet{})) + assert.NoError(t, exp.Export(ctx, testResource, otlpmetrictest.EmptyCheckpointSet{})) } func TestFailedMetricTransform(t *testing.T) { @@ -734,5 +737,5 @@ func TestFailedMetricTransform(t *testing.T) { assert.NoError(t, exp.Shutdown(ctx)) }() - assert.Error(t, exp.Export(ctx, otlpmetrictest.FailCheckpointSet{})) + assert.Error(t, exp.Export(ctx, testResource, otlpmetrictest.FailCheckpointSet{})) } diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/go.mod b/exporters/otlp/otlpmetric/otlpmetricgrpc/go.mod index 143a41e82..f2a001c8f 100644 --- a/exporters/otlp/otlpmetric/otlpmetricgrpc/go.mod +++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/go.mod @@ -7,6 +7,7 @@ require ( go.opentelemetry.io/otel v1.0.0-RC2 go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.22.0 go.opentelemetry.io/otel/metric v0.22.0 + go.opentelemetry.io/otel/sdk v1.0.0-RC2 go.opentelemetry.io/otel/sdk/metric v0.22.0 go.opentelemetry.io/proto/otlp v0.9.0 google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go b/exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go index 763a3475d..55bd67e11 100644 --- a/exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go @@ -23,6 +23,7 @@ import ( "time" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" + "go.opentelemetry.io/otel/sdk/resource" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -38,6 +39,8 @@ const ( var ( oneRecord = otlpmetrictest.OneRecordCheckpointSet{} + + testResource = resource.Empty() ) var ( @@ -163,7 +166,7 @@ func TestRetry(t *testing.T) { defer func() { assert.NoError(t, exporter.Shutdown(ctx)) }() - err = exporter.Export(ctx, oneRecord) + err = exporter.Export(ctx, testResource, oneRecord) assert.NoError(t, err) assert.Len(t, mc.GetMetrics(), 1) } @@ -185,7 +188,7 @@ func TestTimeout(t *testing.T) { defer func() { assert.NoError(t, exporter.Shutdown(ctx)) }() - err = exporter.Export(ctx, oneRecord) + err = exporter.Export(ctx, testResource, oneRecord) assert.Equal(t, true, os.IsTimeout(err)) } @@ -210,7 +213,7 @@ func TestRetryFailed(t *testing.T) { defer func() { assert.NoError(t, exporter.Shutdown(ctx)) }() - err = exporter.Export(ctx, oneRecord) + err = exporter.Export(ctx, testResource, oneRecord) assert.Error(t, err) assert.Empty(t, mc.GetMetrics()) } @@ -235,7 +238,7 @@ func TestNoRetry(t *testing.T) { defer func() { assert.NoError(t, exporter.Shutdown(ctx)) }() - err = exporter.Export(ctx, oneRecord) + err = exporter.Export(ctx, testResource, oneRecord) assert.Error(t, err) assert.Equal(t, fmt.Sprintf("failed to send metrics to http://%s/v1/metrics with HTTP status 400 Bad Request", mc.endpoint), err.Error()) assert.Empty(t, mc.GetMetrics()) @@ -256,7 +259,7 @@ func TestEmptyData(t *testing.T) { assert.NoError(t, exporter.Shutdown(ctx)) }() assert.NoError(t, err) - err = exporter.Export(ctx, oneRecord) + err = exporter.Export(ctx, testResource, oneRecord) assert.NoError(t, err) assert.NotEmpty(t, mc.GetMetrics()) } @@ -302,7 +305,7 @@ func TestUnreasonableMaxAttempts(t *testing.T) { defer func() { assert.NoError(t, exporter.Shutdown(ctx)) }() - err = exporter.Export(ctx, oneRecord) + err = exporter.Export(ctx, testResource, oneRecord) assert.Error(t, err) assert.Empty(t, mc.GetMetrics()) }) @@ -338,7 +341,7 @@ func TestUnreasonableBackoff(t *testing.T) { defer func() { assert.NoError(t, exporter.Shutdown(context.Background())) }() - err = exporter.Export(ctx, oneRecord) + err = exporter.Export(ctx, testResource, oneRecord) assert.Error(t, err) assert.Empty(t, mc.GetMetrics()) } @@ -363,7 +366,7 @@ func TestCancelledContext(t *testing.T) { assert.NoError(t, exporter.Shutdown(context.Background())) }() cancel() - _ = exporter.Export(ctx, oneRecord) + _ = exporter.Export(ctx, testResource, oneRecord) assert.Empty(t, mc.GetMetrics()) } @@ -390,7 +393,7 @@ func TestDeadlineContext(t *testing.T) { }() ctx, cancel := context.WithTimeout(ctx, time.Second) defer cancel() - err = exporter.Export(ctx, oneRecord) + err = exporter.Export(ctx, testResource, oneRecord) assert.Error(t, err) assert.Empty(t, mc.GetMetrics()) } @@ -418,7 +421,7 @@ func TestStopWhileExporting(t *testing.T) { }() doneCh := make(chan struct{}) go func() { - err := exporter.Export(ctx, oneRecord) + err := exporter.Export(ctx, testResource, oneRecord) assert.Error(t, err) assert.Empty(t, mc.GetMetrics()) close(doneCh) diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/go.mod b/exporters/otlp/otlpmetric/otlpmetrichttp/go.mod index 61340430b..22d8a4060 100644 --- a/exporters/otlp/otlpmetric/otlpmetrichttp/go.mod +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/go.mod @@ -6,6 +6,7 @@ require ( github.com/stretchr/testify v1.7.0 go.opentelemetry.io/otel v1.0.0-RC2 go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.22.0 + go.opentelemetry.io/otel/sdk v1.0.0-RC2 go.opentelemetry.io/proto/otlp v0.9.0 google.golang.org/protobuf v1.27.1 ) diff --git a/exporters/prometheus/prometheus.go b/exporters/prometheus/prometheus.go index c9dbe6c92..1b6fd1f3c 100644 --- a/exporters/prometheus/prometheus.go +++ b/exporters/prometheus/prometheus.go @@ -33,6 +33,7 @@ import ( export "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/export/metric/aggregation" controller "go.opentelemetry.io/otel/sdk/metric/controller/basic" + "go.opentelemetry.io/otel/sdk/resource" ) // Exporter supports Prometheus pulls. It does not implement the @@ -153,7 +154,7 @@ func (c *collector) Describe(ch chan<- *prometheus.Desc) { _ = c.exp.Controller().ForEach(c.exp, func(record export.Record) error { var labelKeys []string - mergeLabels(record, &labelKeys, nil) + mergeLabels(record, c.exp.controller.Resource(), &labelKeys, nil) ch <- c.toDesc(record, labelKeys) return nil }) @@ -178,7 +179,7 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) { instrumentKind := record.Descriptor().InstrumentKind() var labelKeys, labels []string - mergeLabels(record, &labelKeys, &labels) + mergeLabels(record, c.exp.controller.Resource(), &labelKeys, &labels) desc := c.toDesc(record, labelKeys) @@ -294,17 +295,17 @@ func (c *collector) toDesc(record export.Record, labelKeys []string) *prometheus // duplicate keys. This outputs one or both of the keys and the // values as a slice, and either argument may be nil to avoid // allocating an unnecessary slice. -func mergeLabels(record export.Record, keys, values *[]string) { +func mergeLabels(record export.Record, res *resource.Resource, keys, values *[]string) { if keys != nil { - *keys = make([]string, 0, record.Labels().Len()+record.Resource().Len()) + *keys = make([]string, 0, record.Labels().Len()+res.Len()) } if values != nil { - *values = make([]string, 0, record.Labels().Len()+record.Resource().Len()) + *values = make([]string, 0, record.Labels().Len()+res.Len()) } // Duplicate keys are resolved by taking the record label value over // the resource value. - mi := attribute.NewMergeIterator(record.Labels(), record.Resource().Set()) + mi := attribute.NewMergeIterator(record.Labels(), res.Set()) for mi.Next() { label := mi.Label() if keys != nil { diff --git a/exporters/stdout/stdoutmetric/metric.go b/exporters/stdout/stdoutmetric/metric.go index 9355e143d..922d473f8 100644 --- a/exporters/stdout/stdoutmetric/metric.go +++ b/exporters/stdout/stdoutmetric/metric.go @@ -25,6 +25,7 @@ import ( "go.opentelemetry.io/otel/metric" exportmetric "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/export/metric/aggregation" + "go.opentelemetry.io/otel/sdk/resource" ) type metricExporter struct { @@ -49,14 +50,14 @@ func (e *metricExporter) ExportKindFor(desc *metric.Descriptor, kind aggregation return exportmetric.StatelessExportKindSelector().ExportKindFor(desc, kind) } -func (e *metricExporter) Export(_ context.Context, checkpointSet exportmetric.CheckpointSet) error { +func (e *metricExporter) Export(_ context.Context, res *resource.Resource, checkpointSet exportmetric.CheckpointSet) error { var aggError error var batch []line aggError = checkpointSet.ForEach(e, func(record exportmetric.Record) error { desc := record.Descriptor() agg := record.Aggregation() kind := desc.NumberKind() - encodedResource := record.Resource().Encoded(e.config.LabelEncoder) + encodedResource := res.Encoded(e.config.LabelEncoder) var instLabels []attribute.KeyValue if name := desc.InstrumentationName(); name != "" { diff --git a/exporters/stdout/stdoutmetric/metric_test.go b/exporters/stdout/stdoutmetric/metric_test.go index c30c3d446..d1f3c2265 100644 --- a/exporters/stdout/stdoutmetric/metric_test.go +++ b/exporters/stdout/stdoutmetric/metric_test.go @@ -207,47 +207,62 @@ func TestStdoutNoData(t *testing.T) { func TestStdoutResource(t *testing.T) { type testCase struct { + name string expect string res *resource.Resource attrs []attribute.KeyValue } - newCase := func(expect string, res *resource.Resource, attrs ...attribute.KeyValue) testCase { + newCase := func(name, expect string, res *resource.Resource, attrs ...attribute.KeyValue) testCase { return testCase{ + name: name, expect: expect, res: res, attrs: attrs, } } testCases := []testCase{ - newCase("R1=V1,R2=V2,instrumentation.name=test,A=B,C=D", + newCase("resource and attribute", + "R1=V1,R2=V2,instrumentation.name=test,A=B,C=D", resource.NewSchemaless(attribute.String("R1", "V1"), attribute.String("R2", "V2")), attribute.String("A", "B"), attribute.String("C", "D")), - newCase("R1=V1,R2=V2,instrumentation.name=test", + newCase("only resource", + "R1=V1,R2=V2,instrumentation.name=test", resource.NewSchemaless(attribute.String("R1", "V1"), attribute.String("R2", "V2")), ), - newCase("instrumentation.name=test,A=B,C=D", - nil, + newCase("empty resource", + "instrumentation.name=test,A=B,C=D", + resource.Empty(), + attribute.String("A", "B"), + attribute.String("C", "D"), + ), + newCase("default resource", + fmt.Sprint(resource.Default().Encoded(attribute.DefaultEncoder()), + ",instrumentation.name=test,A=B,C=D"), + resource.Default(), attribute.String("A", "B"), attribute.String("C", "D"), ), // We explicitly do not de-duplicate between resources // and metric labels in this exporter. - newCase("R1=V1,R2=V2,instrumentation.name=test,R1=V3,R2=V4", + newCase("resource deduplication", + "R1=V1,R2=V2,instrumentation.name=test,R1=V3,R2=V4", resource.NewSchemaless(attribute.String("R1", "V1"), attribute.String("R2", "V2")), attribute.String("R1", "V3"), attribute.String("R2", "V4")), } for _, tc := range testCases { - ctx := context.Background() - fix := newFixtureWithResource(t, tc.res) + t.Run(tc.name, func(t *testing.T) { + ctx := context.Background() + fix := newFixtureWithResource(t, tc.res) - counter := metric.Must(fix.meter).NewFloat64Counter("name.lastvalue") - counter.Add(ctx, 123.456, tc.attrs...) + counter := metric.Must(fix.meter).NewFloat64Counter("name.lastvalue") + counter.Add(ctx, 123.456, tc.attrs...) - require.NoError(t, fix.cont.Stop(fix.ctx)) + require.NoError(t, fix.cont.Stop(fix.ctx)) - require.Equal(t, `[{"Name":"name.lastvalue{`+tc.expect+`}","Last":123.456}]`, fix.Output()) + require.Equal(t, `[{"Name":"name.lastvalue{`+tc.expect+`}","Last":123.456}]`, fix.Output()) + }) } } diff --git a/sdk/export/metric/metric.go b/sdk/export/metric/metric.go index b10813ba4..77b42fa1c 100644 --- a/sdk/export/metric/metric.go +++ b/sdk/export/metric/metric.go @@ -211,7 +211,7 @@ type Exporter interface { // // The CheckpointSet interface refers to the Processor that just // completed collection. - Export(ctx context.Context, checkpointSet CheckpointSet) error + Export(ctx context.Context, resource *resource.Resource, checkpointSet CheckpointSet) error // ExportKindSelector is an interface used by the Processor // in deciding whether to compute Delta or Cumulative @@ -269,7 +269,6 @@ type CheckpointSet interface { type Metadata struct { descriptor *metric.Descriptor labels *attribute.Set - resource *resource.Resource } // Accumulation contains the exported data for a single metric instrument @@ -300,21 +299,15 @@ func (m Metadata) Labels() *attribute.Set { return m.labels } -// Resource contains common attributes that apply to this metric event. -func (m Metadata) Resource() *resource.Resource { - return m.resource -} - // NewAccumulation allows Accumulator implementations to construct new -// Accumulations to send to Processors. The Descriptor, Labels, Resource, +// Accumulations to send to Processors. The Descriptor, Labels, // and Aggregator represent aggregate metric events received over a single // collection period. -func NewAccumulation(descriptor *metric.Descriptor, labels *attribute.Set, resource *resource.Resource, aggregator Aggregator) Accumulation { +func NewAccumulation(descriptor *metric.Descriptor, labels *attribute.Set, aggregator Aggregator) Accumulation { return Accumulation{ Metadata: Metadata{ descriptor: descriptor, labels: labels, - resource: resource, }, aggregator: aggregator, } @@ -329,12 +322,11 @@ func (r Accumulation) Aggregator() Aggregator { // NewRecord allows Processor implementations to construct export // records. The Descriptor, Labels, and Aggregator represent // aggregate metric events received over a single collection period. -func NewRecord(descriptor *metric.Descriptor, labels *attribute.Set, resource *resource.Resource, aggregation aggregation.Aggregation, start, end time.Time) Record { +func NewRecord(descriptor *metric.Descriptor, labels *attribute.Set, aggregation aggregation.Aggregation, start, end time.Time) Record { return Record{ Metadata: Metadata{ descriptor: descriptor, labels: labels, - resource: resource, }, aggregation: aggregation, start: start, diff --git a/sdk/metric/benchmark_test.go b/sdk/metric/benchmark_test.go index 63c421f01..b29adf0a0 100644 --- a/sdk/metric/benchmark_test.go +++ b/sdk/metric/benchmark_test.go @@ -42,7 +42,7 @@ func newFixture(b *testing.B) *benchFixture { AggregatorSelector: processortest.AggregatorSelector(), } - bf.accumulator = sdk.NewAccumulator(bf, nil) + bf.accumulator = sdk.NewAccumulator(bf) bf.meter = metric.WrapMeterImpl(bf.accumulator, "benchmarks") return bf } diff --git a/sdk/metric/controller/basic/config.go b/sdk/metric/controller/basic/config.go index 77b5b9024..cbc91a8f5 100644 --- a/sdk/metric/controller/basic/config.go +++ b/sdk/metric/controller/basic/config.go @@ -68,17 +68,17 @@ type Option interface { // WithResource sets the Resource configuration option of a Config by merging it // with the Resource configuration in the environment. func WithResource(r *resource.Resource) Option { - res, err := resource.Merge(resource.Environment(), r) - if err != nil { - otel.Handle(err) - } - return resourceOption{res} + return resourceOption{r} } type resourceOption struct{ *resource.Resource } func (o resourceOption) apply(cfg *config) { - cfg.Resource = o.Resource + res, err := resource.Merge(cfg.Resource, o.Resource) + if err != nil { + otel.Handle(err) + } + cfg.Resource = res } // WithCollectPeriod sets the CollectPeriod configuration option of a Config. diff --git a/sdk/metric/controller/basic/controller.go b/sdk/metric/controller/basic/controller.go index 985980212..0f79e0250 100644 --- a/sdk/metric/controller/basic/controller.go +++ b/sdk/metric/controller/basic/controller.go @@ -59,6 +59,7 @@ type Controller struct { accumulator *sdk.Accumulator provider *registry.MeterProvider checkpointer export.Checkpointer + resource *resource.Resource exporter export.Exporter wg sync.WaitGroup stopCh chan struct{} @@ -88,16 +89,19 @@ func New(checkpointer export.Checkpointer, opts ...Option) *Controller { } if c.Resource == nil { c.Resource = resource.Default() + } else { + var err error + c.Resource, err = resource.Merge(resource.Environment(), c.Resource) + if err != nil { + otel.Handle(err) + } } - - impl := sdk.NewAccumulator( - checkpointer, - c.Resource, - ) + impl := sdk.NewAccumulator(checkpointer) return &Controller{ provider: registry.NewMeterProvider(impl), accumulator: impl, checkpointer: checkpointer, + resource: c.Resource, exporter: c.Exporter, stopCh: nil, clock: controllerTime.RealClock{}, @@ -121,6 +125,12 @@ func (c *Controller) MeterProvider() metric.MeterProvider { return c.provider } +// Resource returns the *resource.Resource associated with this +// controller. +func (c *Controller) Resource() *resource.Resource { + return c.resource +} + // Start begins a ticker that periodically collects and exports // metrics with the configured interval. This is required for calling // a configured Exporter (see WithExporter) and is otherwise optional @@ -257,7 +267,7 @@ func (c *Controller) export(ctx context.Context) error { defer cancel() } - return c.exporter.Export(ctx, ckpt) + return c.exporter.Export(ctx, c.resource, ckpt) } // ForEach gives the caller read-locked access to the current diff --git a/sdk/metric/controller/basic/controller_test.go b/sdk/metric/controller/basic/controller_test.go index 9bfced75b..b67ce74ee 100644 --- a/sdk/metric/controller/basic/controller_test.go +++ b/sdk/metric/controller/basic/controller_test.go @@ -61,9 +61,11 @@ func checkTestContext(t *testing.T, ctx context.Context) { } func TestControllerUsesResource(t *testing.T) { + const envVal = "T=U,key=value" store, err := ottest.SetEnvVariables(map[string]string{ - envVar: "key=value,T=U", + envVar: envVal, }) + require.NoError(t, err) defer func() { require.NoError(t, store.Restore()) }() @@ -75,48 +77,62 @@ func TestControllerUsesResource(t *testing.T) { { name: "explicitly empty resource", options: []controller.Option{controller.WithResource(resource.Empty())}, - wanted: resource.Environment().Encoded(attribute.DefaultEncoder())}, + wanted: envVal, + }, { name: "uses default if no resource option", options: nil, - wanted: resource.Default().Encoded(attribute.DefaultEncoder())}, + wanted: resource.Default().Encoded(attribute.DefaultEncoder()), + }, { name: "explicit resource", options: []controller.Option{controller.WithResource(resource.NewSchemaless(attribute.String("R", "S")))}, - wanted: "R=S,T=U,key=value"}, + wanted: "R=S," + envVal, + }, { - name: "last resource wins", + name: "multi resource", options: []controller.Option{ - controller.WithResource(resource.Default()), + controller.WithResource(resource.NewSchemaless(attribute.String("R", "WRONG"))), controller.WithResource(resource.NewSchemaless(attribute.String("R", "S"))), + controller.WithResource(resource.NewSchemaless(attribute.String("W", "X"))), + controller.WithResource(resource.NewSchemaless(attribute.String("T", "V"))), }, - wanted: "R=S,T=U,key=value"}, + wanted: "R=S,T=V,W=X,key=value", + }, { - name: "overlapping attributes with environment resource", - options: []controller.Option{controller.WithResource(resource.NewSchemaless(attribute.String("T", "V")))}, - wanted: "T=V,key=value"}, + name: "user override environment", + options: []controller.Option{ + controller.WithResource(resource.NewSchemaless(attribute.String("T", "V"))), + controller.WithResource(resource.NewSchemaless(attribute.String("key", "I win"))), + }, + wanted: "T=V,key=I win", + }, } for _, c := range cases { t.Run(fmt.Sprintf("case-%s", c.name), func(t *testing.T) { + sel := export.CumulativeExportKindSelector() + exp := processortest.New(sel, attribute.DefaultEncoder()) cont := controller.New( processor.New( processortest.AggregatorSelector(), - export.CumulativeExportKindSelector(), + exp, ), - c.options..., + append(c.options, controller.WithExporter(exp))..., ) + ctx := context.Background() + require.NoError(t, cont.Start(ctx)) prov := cont.MeterProvider() ctr := metric.Must(prov.Meter("named")).NewFloat64Counter("calls.sum") ctr.Add(context.Background(), 1.) // Collect once - require.NoError(t, cont.Collect(context.Background())) + require.NoError(t, cont.Stop(ctx)) expect := map[string]float64{ "calls.sum//" + c.wanted: 1., } - require.EqualValues(t, expect, getMap(t, cont)) + require.EqualValues(t, expect, exp.Values()) }) } } @@ -268,9 +284,9 @@ func newBlockingExporter() *blockingExporter { } } -func (b *blockingExporter) Export(ctx context.Context, output export.CheckpointSet) error { +func (b *blockingExporter) Export(ctx context.Context, res *resource.Resource, output export.CheckpointSet) error { var err error - _ = b.exporter.Export(ctx, output) + _ = b.exporter.Export(ctx, res, output) if b.calls == 0 { // timeout once <-ctx.Done() diff --git a/sdk/metric/correct_test.go b/sdk/metric/correct_test.go index 7081f8ff8..98f2512c7 100644 --- a/sdk/metric/correct_test.go +++ b/sdk/metric/correct_test.go @@ -30,11 +30,9 @@ import ( "go.opentelemetry.io/otel/sdk/export/metric/aggregation" metricsdk "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/processor/processortest" - "go.opentelemetry.io/otel/sdk/resource" ) var Must = metric.Must -var testResource = resource.NewSchemaless(attribute.String("R", "V")) type handler struct { sync.Mutex @@ -87,7 +85,6 @@ func newSDK(t *testing.T) (metric.Meter, *metricsdk.Accumulator, *testSelector, ) accum := metricsdk.NewAccumulator( processor, - testResource, ) meter := metric.WrapMeterImpl(accum, "test") return meter, accum, testSelector, processor @@ -109,7 +106,7 @@ func TestInputRangeCounter(t *testing.T) { counter.Add(ctx, 1) checkpointed = sdk.Collect(ctx) require.Equal(t, map[string]float64{ - "name.sum//R=V": 1, + "name.sum//": 1, }, processor.Values()) require.Equal(t, 1, checkpointed) require.Nil(t, testHandler.Flush()) @@ -128,7 +125,7 @@ func TestInputRangeUpDownCounter(t *testing.T) { checkpointed := sdk.Collect(ctx) require.Equal(t, map[string]float64{ - "name.sum//R=V": 1, + "name.sum//": 1, }, processor.Values()) require.Equal(t, 1, checkpointed) require.Nil(t, testHandler.Flush()) @@ -153,7 +150,7 @@ func TestInputRangeValueRecorder(t *testing.T) { checkpointed = sdk.Collect(ctx) require.Equal(t, map[string]float64{ - "name.exact//R=V": 3, + "name.exact//": 3, }, processor.Values()) require.Equal(t, 1, checkpointed) require.Nil(t, testHandler.Flush()) @@ -225,7 +222,7 @@ func TestSDKLabelsDeduplication(t *testing.T) { counter.Add(ctx, 1, kvsA...) format := func(attrs []attribute.KeyValue) string { str := attribute.DefaultEncoder().Encode(newSetIter(attrs...)) - return fmt.Sprint("name.sum/", str, "/R=V") + return fmt.Sprint("name.sum/", str, "/") } allExpect[format(expectA)] += 2 @@ -329,20 +326,20 @@ func TestObserverCollection(t *testing.T) { mult := float64(mult) require.EqualValues(t, map[string]float64{ - "float.valueobserver.lastvalue/A=B/R=V": -mult, - "float.valueobserver.lastvalue/C=D/R=V": -mult, - "int.valueobserver.lastvalue//R=V": mult, - "int.valueobserver.lastvalue/A=B/R=V": mult, + "float.valueobserver.lastvalue/A=B/": -mult, + "float.valueobserver.lastvalue/C=D/": -mult, + "int.valueobserver.lastvalue//": mult, + "int.valueobserver.lastvalue/A=B/": mult, - "float.sumobserver.sum/A=B/R=V": 2 * mult, - "float.sumobserver.sum/C=D/R=V": mult, - "int.sumobserver.sum//R=V": mult, - "int.sumobserver.sum/A=B/R=V": mult, + "float.sumobserver.sum/A=B/": 2 * mult, + "float.sumobserver.sum/C=D/": mult, + "int.sumobserver.sum//": mult, + "int.sumobserver.sum/A=B/": mult, - "float.updownsumobserver.sum/A=B/R=V": -2 * mult, - "float.updownsumobserver.sum/C=D/R=V": mult, - "int.updownsumobserver.sum//R=V": -mult, - "int.updownsumobserver.sum/A=B/R=V": mult, + "float.updownsumobserver.sum/A=B/": -2 * mult, + "float.updownsumobserver.sum/C=D/": mult, + "int.updownsumobserver.sum//": -mult, + "int.updownsumobserver.sum/A=B/": mult, }, processor.Values()) } } @@ -429,20 +426,20 @@ func TestObserverBatch(t *testing.T) { require.Equal(t, collected, len(processor.Values())) require.EqualValues(t, map[string]float64{ - "float.sumobserver.sum//R=V": 1.1, - "float.sumobserver.sum/A=B/R=V": 1000, - "int.sumobserver.sum//R=V": 10, - "int.sumobserver.sum/A=B/R=V": 100, + "float.sumobserver.sum//": 1.1, + "float.sumobserver.sum/A=B/": 1000, + "int.sumobserver.sum//": 10, + "int.sumobserver.sum/A=B/": 100, - "int.updownsumobserver.sum/A=B/R=V": -100, - "float.updownsumobserver.sum/A=B/R=V": -1000, - "int.updownsumobserver.sum//R=V": 10, - "float.updownsumobserver.sum/C=D/R=V": -1, + "int.updownsumobserver.sum/A=B/": -100, + "float.updownsumobserver.sum/A=B/": -1000, + "int.updownsumobserver.sum//": 10, + "float.updownsumobserver.sum/C=D/": -1, - "float.valueobserver.lastvalue/A=B/R=V": -1, - "float.valueobserver.lastvalue/C=D/R=V": -1, - "int.valueobserver.lastvalue//R=V": 1, - "int.valueobserver.lastvalue/A=B/R=V": 1, + "float.valueobserver.lastvalue/A=B/": -1, + "float.valueobserver.lastvalue/C=D/": -1, + "int.valueobserver.lastvalue//": 1, + "int.valueobserver.lastvalue/A=B/": 1, }, processor.Values()) } @@ -470,10 +467,10 @@ func TestRecordBatch(t *testing.T) { sdk.Collect(ctx) require.EqualValues(t, map[string]float64{ - "int64.sum/A=B,C=D/R=V": 1, - "float64.sum/A=B,C=D/R=V": 2, - "int64.exact/A=B,C=D/R=V": 3, - "float64.exact/A=B,C=D/R=V": 4, + "int64.sum/A=B,C=D/": 1, + "float64.sum/A=B,C=D/": 2, + "int64.exact/A=B,C=D/": 3, + "float64.exact/A=B,C=D/": 4, }, processor.Values()) } @@ -549,7 +546,7 @@ func TestSyncInAsync(t *testing.T) { sdk.Collect(ctx) require.EqualValues(t, map[string]float64{ - "counter.sum//R=V": 100, - "observer.lastvalue//R=V": 10, + "counter.sum//": 100, + "observer.lastvalue//": 10, }, processor.Values()) } diff --git a/sdk/metric/processor/basic/basic.go b/sdk/metric/processor/basic/basic.go index 4cb8d6728..8fb24d091 100644 --- a/sdk/metric/processor/basic/basic.go +++ b/sdk/metric/processor/basic/basic.go @@ -24,7 +24,6 @@ import ( "go.opentelemetry.io/otel/metric" export "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/export/metric/aggregation" - "go.opentelemetry.io/otel/sdk/resource" ) type ( @@ -38,27 +37,23 @@ type ( stateKey struct { // TODO: This code is organized to support multiple // accumulators which could theoretically produce the - // data for the same instrument with the same - // resources, and this code has logic to combine data - // properly from multiple accumulators. However, the - // use of *metric.Descriptor in the stateKey makes - // such combination impossible, because each - // accumulator allocates its own instruments. This - // can be fixed by using the instrument name and kind - // instead of the descriptor pointer. See + // data for the same instrument, and this code has + // logic to combine data properly from multiple + // accumulators. However, the use of + // *metric.Descriptor in the stateKey makes such + // combination impossible, because each accumulator + // allocates its own instruments. This can be fixed + // by using the instrument name and kind instead of + // the descriptor pointer. See // https://github.com/open-telemetry/opentelemetry-go/issues/862. descriptor *metric.Descriptor distinct attribute.Distinct - resource attribute.Distinct } stateValue struct { // labels corresponds to the stateKey.distinct field. labels *attribute.Set - // resource corresponds to the stateKey.resource field. - resource *resource.Resource - // updated indicates the last sequence number when this value had // Process() called by an accumulator. updated int64 @@ -157,7 +152,6 @@ func (b *Processor) Process(accum export.Accumulation) error { key := stateKey{ descriptor: desc, distinct: accum.Labels().Equivalent(), - resource: accum.Resource().Equivalent(), } agg := accum.Aggregator() @@ -168,7 +162,6 @@ func (b *Processor) Process(accum export.Accumulation) error { newValue := &stateValue{ labels: accum.Labels(), - resource: accum.Resource(), updated: b.state.finishedCollection, stateful: stateful, current: agg, @@ -369,7 +362,6 @@ func (b *state) ForEach(exporter export.ExportKindSelector, f func(export.Record if err := f(export.NewRecord( key.descriptor, value.labels, - value.resource, agg, start, b.intervalEnd, diff --git a/sdk/metric/processor/basic/basic_test.go b/sdk/metric/processor/basic/basic_test.go index 41f7f0708..0bb3a6386 100644 --- a/sdk/metric/processor/basic/basic_test.go +++ b/sdk/metric/processor/basic/basic_test.go @@ -108,13 +108,13 @@ func asNumber(nkind number.Kind, value int64) number.Number { return number.NewFloat64Number(float64(value)) } -func updateFor(t *testing.T, desc *metric.Descriptor, selector export.AggregatorSelector, res *resource.Resource, value int64, labs ...attribute.KeyValue) export.Accumulation { +func updateFor(t *testing.T, desc *metric.Descriptor, selector export.AggregatorSelector, value int64, labs ...attribute.KeyValue) export.Accumulation { ls := attribute.NewSet(labs...) var agg export.Aggregator selector.AggregatorFor(desc, &agg) require.NoError(t, agg.Update(context.Background(), asNumber(desc.NumberKind(), value), desc)) - return export.NewAccumulation(desc, &ls, res, agg) + return export.NewAccumulation(desc, &ls, agg) } func testProcessor( @@ -127,7 +127,6 @@ func testProcessor( // Note: this selector uses the instrument name to dictate // aggregation kind. selector := processorTest.AggregatorSelector() - res := resource.NewSchemaless(attribute.String("R", "V")) labs1 := []attribute.KeyValue{attribute.String("L1", "V")} labs2 := []attribute.KeyValue{attribute.String("L2", "V")} @@ -154,8 +153,8 @@ func testProcessor( processor.StartCollection() for na := 0; na < nAccum; na++ { - _ = processor.Process(updateFor(t, &desc1, selector, res, input, labs1...)) - _ = processor.Process(updateFor(t, &desc2, selector, res, input, labs2...)) + _ = processor.Process(updateFor(t, &desc1, selector, input, labs1...)) + _ = processor.Process(updateFor(t, &desc2, selector, input, labs2...)) } err := processor.FinishCollection() @@ -235,8 +234,8 @@ func testProcessor( exp := map[string]float64{} if hasMemory || !repetitionAfterEmptyInterval { exp = map[string]float64{ - fmt.Sprintf("inst1%s/L1=V/R=V", instSuffix): float64(multiplier * 10), // labels1 - fmt.Sprintf("inst2%s/L2=V/R=V", instSuffix): float64(multiplier * 10), // labels2 + fmt.Sprintf("inst1%s/L1=V/", instSuffix): float64(multiplier * 10), // labels1 + fmt.Sprintf("inst2%s/L2=V/", instSuffix): float64(multiplier * 10), // labels2 } } @@ -302,7 +301,7 @@ func TestBasicInconsistent(t *testing.T) { b = basic.New(processorTest.AggregatorSelector(), export.StatelessExportKindSelector()) desc := metric.NewDescriptor("inst", sdkapi.CounterInstrumentKind, number.Int64Kind) - accum := export.NewAccumulation(&desc, attribute.EmptySet(), resource.Empty(), aggregatortest.NoopAggregator{}) + accum := export.NewAccumulation(&desc, attribute.EmptySet(), aggregatortest.NoopAggregator{}) require.Equal(t, basic.ErrInconsistentState, b.Process(accum)) // Test invalid kind: @@ -327,7 +326,7 @@ func TestBasicTimestamps(t *testing.T) { afterNew := time.Now() desc := metric.NewDescriptor("inst", sdkapi.CounterInstrumentKind, number.Int64Kind) - accum := export.NewAccumulation(&desc, attribute.EmptySet(), resource.Empty(), aggregatortest.NoopAggregator{}) + accum := export.NewAccumulation(&desc, attribute.EmptySet(), aggregatortest.NoopAggregator{}) b.StartCollection() _ = b.Process(accum) @@ -369,7 +368,6 @@ func TestBasicTimestamps(t *testing.T) { } func TestStatefulNoMemoryCumulative(t *testing.T) { - res := resource.NewSchemaless(attribute.String("R", "V")) ekindSel := export.CumulativeExportKindSelector() desc := metric.NewDescriptor("inst.sum", sdkapi.CounterInstrumentKind, number.Int64Kind) @@ -390,20 +388,19 @@ func TestStatefulNoMemoryCumulative(t *testing.T) { // Add 10 processor.StartCollection() - _ = processor.Process(updateFor(t, &desc, selector, res, 10, attribute.String("A", "B"))) + _ = processor.Process(updateFor(t, &desc, selector, 10, attribute.String("A", "B"))) require.NoError(t, processor.FinishCollection()) // Verify one element records = processorTest.NewOutput(attribute.DefaultEncoder()) require.NoError(t, checkpointSet.ForEach(ekindSel, records.AddRecord)) require.EqualValues(t, map[string]float64{ - "inst.sum/A=B/R=V": float64(i * 10), + "inst.sum/A=B/": float64(i * 10), }, records.Map()) } } func TestStatefulNoMemoryDelta(t *testing.T) { - res := resource.NewSchemaless(attribute.String("R", "V")) ekindSel := export.DeltaExportKindSelector() desc := metric.NewDescriptor("inst.sum", sdkapi.SumObserverInstrumentKind, number.Int64Kind) @@ -424,14 +421,14 @@ func TestStatefulNoMemoryDelta(t *testing.T) { // Add 10 processor.StartCollection() - _ = processor.Process(updateFor(t, &desc, selector, res, int64(i*10), attribute.String("A", "B"))) + _ = processor.Process(updateFor(t, &desc, selector, int64(i*10), attribute.String("A", "B"))) require.NoError(t, processor.FinishCollection()) // Verify one element records = processorTest.NewOutput(attribute.DefaultEncoder()) require.NoError(t, checkpointSet.ForEach(ekindSel, records.AddRecord)) require.EqualValues(t, map[string]float64{ - "inst.sum/A=B/R=V": 10, + "inst.sum/A=B/": 10, }, records.Map()) } } @@ -442,7 +439,6 @@ func TestMultiObserverSum(t *testing.T) { export.DeltaExportKindSelector(), } { - res := resource.NewSchemaless(attribute.String("R", "V")) desc := metric.NewDescriptor("observe.sum", sdkapi.SumObserverInstrumentKind, number.Int64Kind) selector := processorTest.AggregatorSelector() @@ -452,9 +448,9 @@ func TestMultiObserverSum(t *testing.T) { for i := 1; i < 3; i++ { // Add i*10*3 times processor.StartCollection() - _ = processor.Process(updateFor(t, &desc, selector, res, int64(i*10), attribute.String("A", "B"))) - _ = processor.Process(updateFor(t, &desc, selector, res, int64(i*10), attribute.String("A", "B"))) - _ = processor.Process(updateFor(t, &desc, selector, res, int64(i*10), attribute.String("A", "B"))) + _ = processor.Process(updateFor(t, &desc, selector, int64(i*10), attribute.String("A", "B"))) + _ = processor.Process(updateFor(t, &desc, selector, int64(i*10), attribute.String("A", "B"))) + _ = processor.Process(updateFor(t, &desc, selector, int64(i*10), attribute.String("A", "B"))) require.NoError(t, processor.FinishCollection()) // Multiplier is 1 for deltas, otherwise i. @@ -467,7 +463,7 @@ func TestMultiObserverSum(t *testing.T) { records := processorTest.NewOutput(attribute.DefaultEncoder()) require.NoError(t, checkpointSet.ForEach(ekindSel, records.AddRecord)) require.EqualValues(t, map[string]float64{ - "observe.sum/A=B/R=V": float64(3 * 10 * multiplier), + "observe.sum/A=B/": float64(3 * 10 * multiplier), }, records.Map()) } } @@ -480,7 +476,7 @@ func TestSumObserverEndToEnd(t *testing.T) { processorTest.AggregatorSelector(), eselector, ) - accum := sdk.NewAccumulator(proc, resource.Empty()) + accum := sdk.NewAccumulator(proc) meter := metric.WrapMeterImpl(accum, "testing") var calls int64 @@ -502,7 +498,7 @@ func TestSumObserverEndToEnd(t *testing.T) { require.NoError(t, proc.FinishCollection()) exporter := processortest.New(eselector, attribute.DefaultEncoder()) - require.NoError(t, exporter.Export(ctx, data)) + require.NoError(t, exporter.Export(ctx, resource.Empty(), data)) require.EqualValues(t, map[string]float64{ "observer.sum//": float64(i + 1), diff --git a/sdk/metric/processor/processortest/test.go b/sdk/metric/processor/processortest/test.go index 8c008c617..a58eeef47 100644 --- a/sdk/metric/processor/processortest/test.go +++ b/sdk/metric/processor/processortest/test.go @@ -220,7 +220,6 @@ func (o *Output) ForEach(_ export.ExportKindSelector, ff func(export.Record) err if err := ff(export.NewRecord( key.desc, value.labels, - value.resource, value.aggregator.Aggregation(), time.Time{}, time.Time{}, @@ -236,10 +235,14 @@ func (o *Output) ForEach(_ export.ExportKindSelector, ff func(export.Record) err // either the Sum() or the LastValue() of its Aggregation(), whichever // is defined. Record timestamps are ignored. func (o *Output) AddRecord(rec export.Record) error { + return o.AddRecordWithResource(rec, resource.Empty()) +} + +func (o *Output) AddRecordWithResource(rec export.Record, res *resource.Resource) error { key := mapKey{ desc: rec.Descriptor(), labels: rec.Labels().Equivalent(), - resource: rec.Resource().Equivalent(), + resource: res.Equivalent(), } if _, ok := o.m[key]; !ok { var agg export.Aggregator @@ -247,7 +250,7 @@ func (o *Output) AddRecord(rec export.Record) error { o.m[key] = mapValue{ aggregator: agg, labels: rec.Labels(), - resource: rec.Resource(), + resource: res, } } return o.m[key].aggregator.Merge(rec.Aggregation().(export.Aggregator), rec.Descriptor()) @@ -306,7 +309,6 @@ func (o *Output) AddAccumulation(acc export.Accumulation) error { export.NewRecord( acc.Descriptor(), acc.Labels(), - acc.Resource(), acc.Aggregator().Aggregation(), time.Time{}, time.Time{}, @@ -330,7 +332,7 @@ func New(selector export.ExportKindSelector, encoder attribute.Encoder) *Exporte } } -func (e *Exporter) Export(_ context.Context, ckpt export.CheckpointSet) error { +func (e *Exporter) Export(_ context.Context, res *resource.Resource, ckpt export.CheckpointSet) error { e.output.Lock() defer e.output.Unlock() e.exportCount++ @@ -340,7 +342,7 @@ func (e *Exporter) Export(_ context.Context, ckpt export.CheckpointSet) error { return err } } - return e.output.AddRecord(r) + return e.output.AddRecordWithResource(r, res) }) } diff --git a/sdk/metric/processor/processortest/test_test.go b/sdk/metric/processor/processortest/test_test.go index 0b650ac4f..0c0b244f3 100644 --- a/sdk/metric/processor/processortest/test_test.go +++ b/sdk/metric/processor/processortest/test_test.go @@ -30,10 +30,7 @@ import ( func generateTestData(proc export.Processor) { ctx := context.Background() - accum := metricsdk.NewAccumulator( - proc, - resource.NewSchemaless(attribute.String("R", "V")), - ) + accum := metricsdk.NewAccumulator(proc) meter := metric.WrapMeterImpl(accum, "testing") counter := metric.Must(meter).NewFloat64Counter("counter.sum") @@ -62,6 +59,7 @@ func TestProcessorTesting(t *testing.T) { generateTestData(checkpointer) + res := resource.NewSchemaless(attribute.String("R", "V")) expect := map[string]float64{ "counter.sum/K1=V1/R=V": 100, "counter.sum/K1=V2/R=V": 101, @@ -69,16 +67,13 @@ func TestProcessorTesting(t *testing.T) { "observer.sum/K1=V2/R=V": 11, } - // Validate the processor's checkpoint directly. - require.EqualValues(t, expect, testProc.Values()) - // Export the data and validate it again. exporter := processorTest.New( export.StatelessExportKindSelector(), attribute.DefaultEncoder(), ) - err := exporter.Export(context.Background(), checkpointer.CheckpointSet()) + err := exporter.Export(context.Background(), res, checkpointer.CheckpointSet()) require.NoError(t, err) require.EqualValues(t, expect, exporter.Values()) } diff --git a/sdk/metric/processor/reducer/reducer.go b/sdk/metric/processor/reducer/reducer.go index 9a9947067..6b8f3cd6e 100644 --- a/sdk/metric/processor/reducer/reducer.go +++ b/sdk/metric/processor/reducer/reducer.go @@ -60,7 +60,6 @@ func (p *Processor) Process(accum export.Accumulation) error { export.NewAccumulation( accum.Descriptor(), &reduced, - accum.Resource(), accum.Aggregator(), ), ) diff --git a/sdk/metric/processor/reducer/reducer_test.go b/sdk/metric/processor/reducer/reducer_test.go index 0acfe8701..cd429c68c 100644 --- a/sdk/metric/processor/reducer/reducer_test.go +++ b/sdk/metric/processor/reducer/reducer_test.go @@ -75,15 +75,14 @@ func TestFilterProcessor(t *testing.T) { ) accum := metricsdk.NewAccumulator( reducer.New(testFilter{}, processorTest.Checkpointer(testProc)), - resource.NewSchemaless(attribute.String("R", "V")), ) generateData(accum) accum.Collect(context.Background()) require.EqualValues(t, map[string]float64{ - "counter.sum/A=1,C=3/R=V": 200, - "observer.sum/A=1,C=3/R=V": 20, + "counter.sum/A=1,C=3/": 200, + "observer.sum/A=1,C=3/": 20, }, testProc.Values()) } @@ -92,7 +91,6 @@ func TestFilterBasicProcessor(t *testing.T) { basicProc := basic.New(processorTest.AggregatorSelector(), export.CumulativeExportKindSelector()) accum := metricsdk.NewAccumulator( reducer.New(testFilter{}, basicProc), - resource.NewSchemaless(attribute.String("R", "V")), ) exporter := processorTest.New(basicProc, attribute.DefaultEncoder()) @@ -104,7 +102,8 @@ func TestFilterBasicProcessor(t *testing.T) { t.Error(err) } - require.NoError(t, exporter.Export(context.Background(), basicProc.CheckpointSet())) + res := resource.NewSchemaless(attribute.String("R", "V")) + require.NoError(t, exporter.Export(context.Background(), res, basicProc.CheckpointSet())) require.EqualValues(t, map[string]float64{ "counter.sum/A=1,C=3/R=V": 200, diff --git a/sdk/metric/sdk.go b/sdk/metric/sdk.go index c0f333273..d06596f91 100644 --- a/sdk/metric/sdk.go +++ b/sdk/metric/sdk.go @@ -28,7 +28,6 @@ import ( "go.opentelemetry.io/otel/metric/number" export "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/metric/aggregator" - "go.opentelemetry.io/otel/sdk/resource" ) type ( @@ -64,9 +63,6 @@ type ( // place for sorting during labels creation to avoid // allocation. It is cleared after use. asyncSortSlice attribute.Sortable - - // resource is applied to all records in this Accumulator. - resource *resource.Resource } syncInstrument struct { @@ -304,11 +300,10 @@ func (s *syncInstrument) RecordOne(ctx context.Context, num number.Number, kvs [ // processor will call Collect() when it receives a request to scrape // current metric values. A push-based processor should configure its // own periodic collection. -func NewAccumulator(processor export.Processor, resource *resource.Resource) *Accumulator { +func NewAccumulator(processor export.Processor) *Accumulator { return &Accumulator{ processor: processor, asyncInstruments: internal.NewAsyncInstrumentState(), - resource: resource, } } @@ -437,7 +432,7 @@ func (m *Accumulator) checkpointRecord(r *record) int { return 0 } - a := export.NewAccumulation(&r.inst.descriptor, r.labels, m.resource, r.checkpoint) + a := export.NewAccumulation(&r.inst.descriptor, r.labels, r.checkpoint) err = m.processor.Process(a) if err != nil { otel.Handle(err) @@ -455,7 +450,7 @@ func (m *Accumulator) checkpointAsync(a *asyncInstrument) int { epochDiff := m.currentEpoch - lrec.observedEpoch if epochDiff == 0 { if lrec.observed != nil { - a := export.NewAccumulation(&a.descriptor, lrec.labels, m.resource, lrec.observed) + a := export.NewAccumulation(&a.descriptor, lrec.labels, lrec.observed) err := m.processor.Process(a) if err != nil { otel.Handle(err) diff --git a/sdk/metric/stress_test.go b/sdk/metric/stress_test.go index 469b89e38..9783ea0fc 100644 --- a/sdk/metric/stress_test.go +++ b/sdk/metric/stress_test.go @@ -295,7 +295,7 @@ func stressTest(t *testing.T, impl testImpl) { } cc := concurrency() - sdk := NewAccumulator(fixture, nil) + sdk := NewAccumulator(fixture) meter := metric.WrapMeterImpl(sdk, "stress_test") fixture.wg.Add(cc + 1)