This commit is contained in:
Yevhenii Solomchenko 2025-07-24 17:19:34 +02:00 committed by GitHub
commit 178aeba7ed
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 47 additions and 35 deletions

View File

@ -17,7 +17,6 @@ import (
"go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/internal"
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
"go.opentelemetry.io/otel/sdk/metric/internal/x"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/resource"
)
@ -37,17 +36,24 @@ type instrumentSync struct {
compAgg aggregate.ComputeAggregation
}
func newPipeline(res *resource.Resource, reader Reader, views []View, exemplarFilter exemplar.Filter) *pipeline {
func newPipeline(
res *resource.Resource,
reader Reader,
views []View,
exemplarFilter exemplar.Filter,
cardinalityLimit int,
) *pipeline {
if res == nil {
res = resource.Empty()
}
return &pipeline{
resource: res,
reader: reader,
views: views,
int64Measures: map[observableID[int64]][]aggregate.Measure[int64]{},
float64Measures: map[observableID[float64]][]aggregate.Measure[float64]{},
exemplarFilter: exemplarFilter,
resource: res,
reader: reader,
views: views,
int64Measures: map[observableID[int64]][]aggregate.Measure[int64]{},
float64Measures: map[observableID[float64]][]aggregate.Measure[float64]{},
exemplarFilter: exemplarFilter,
cardinalityLimit: cardinalityLimit,
// aggregations is lazy allocated when needed.
}
}
@ -65,12 +71,13 @@ type pipeline struct {
views []View
sync.Mutex
int64Measures map[observableID[int64]][]aggregate.Measure[int64]
float64Measures map[observableID[float64]][]aggregate.Measure[float64]
aggregations map[instrumentation.Scope][]instrumentSync
callbacks []func(context.Context) error
multiCallbacks list.List
exemplarFilter exemplar.Filter
int64Measures map[observableID[int64]][]aggregate.Measure[int64]
float64Measures map[observableID[float64]][]aggregate.Measure[float64]
aggregations map[instrumentation.Scope][]instrumentSync
callbacks []func(context.Context) error
multiCallbacks list.List
exemplarFilter exemplar.Filter
cardinalityLimit int
}
// addInt64Measure adds a new int64 measure to the pipeline for each observer.
@ -390,8 +397,7 @@ func (i *inserter[N]) cachedAggregator(
// limits for the builder (an all the created aggregates).
// CardinalityLimit.Lookup returns 0 by default if unset (or
// unrecognized input). Use that value directly.
b.AggregationLimit, _ = x.CardinalityLimit.Lookup()
b.AggregationLimit = i.pipeline.cardinalityLimit
in, out, err := i.aggregateFunc(b, stream.Aggregation, kind)
if err != nil {
return aggVal[N]{0, nil, err}
@ -590,10 +596,16 @@ func isAggregatorCompatible(kind InstrumentKind, agg Aggregation) error {
// measurement.
type pipelines []*pipeline
func newPipelines(res *resource.Resource, readers []Reader, views []View, exemplarFilter exemplar.Filter) pipelines {
func newPipelines(
res *resource.Resource,
readers []Reader,
views []View,
exemplarFilter exemplar.Filter,
cardinalityLimit int,
) pipelines {
pipes := make([]*pipeline, 0, len(readers))
for _, r := range readers {
p := newPipeline(res, r, views, exemplarFilter)
p := newPipeline(res, r, views, exemplarFilter, cardinalityLimit)
r.register(p)
pipes = append(pipes, p)
}

View File

@ -392,7 +392,7 @@ func testCreateAggregators[N int64 | float64](t *testing.T) {
for _, tt := range testcases {
t.Run(tt.name, func(t *testing.T) {
var c cache[string, instID]
p := newPipeline(nil, tt.reader, tt.views, exemplar.AlwaysOffFilter)
p := newPipeline(nil, tt.reader, tt.views, exemplar.AlwaysOffFilter, 0)
i := newInserter[N](p, &c)
readerAggregation := i.readerDefaultAggregation(tt.inst.Kind)
input, err := i.Instrument(tt.inst, readerAggregation)
@ -414,7 +414,7 @@ func TestCreateAggregators(t *testing.T) {
func testInvalidInstrumentShouldPanic[N int64 | float64]() {
var c cache[string, instID]
i := newInserter[N](newPipeline(nil, NewManualReader(), []View{defaultView}, exemplar.AlwaysOffFilter), &c)
i := newInserter[N](newPipeline(nil, NewManualReader(), []View{defaultView}, exemplar.AlwaysOffFilter, 0), &c)
inst := Instrument{
Name: "foo",
Kind: InstrumentKind(255),
@ -430,7 +430,7 @@ func TestInvalidInstrumentShouldPanic(t *testing.T) {
func TestPipelinesAggregatorForEachReader(t *testing.T) {
r0, r1 := NewManualReader(), NewManualReader()
pipes := newPipelines(resource.Empty(), []Reader{r0, r1}, nil, exemplar.AlwaysOffFilter)
pipes := newPipelines(resource.Empty(), []Reader{r0, r1}, nil, exemplar.AlwaysOffFilter, 0)
require.Len(t, pipes, 2, "created pipelines")
inst := Instrument{Name: "foo", Kind: InstrumentKindCounter}
@ -504,7 +504,7 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) {
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
p := newPipelines(resource.Empty(), tt.readers, tt.views, exemplar.AlwaysOffFilter)
p := newPipelines(resource.Empty(), tt.readers, tt.views, exemplar.AlwaysOffFilter, 0)
testPipelineRegistryResolveIntAggregators(t, p, tt.wantCount)
testPipelineRegistryResolveFloatAggregators(t, p, tt.wantCount)
testPipelineRegistryResolveIntHistogramAggregators(t, p, tt.wantCount)
@ -558,7 +558,7 @@ func TestPipelineRegistryResource(t *testing.T) {
readers := []Reader{NewManualReader()}
views := []View{defaultView, v}
res := resource.NewSchemaless(attribute.String("key", "val"))
pipes := newPipelines(res, readers, views, exemplar.AlwaysOffFilter)
pipes := newPipelines(res, readers, views, exemplar.AlwaysOffFilter, 0)
for _, p := range pipes {
assert.True(t, res.Equal(p.resource), "resource not set")
}
@ -571,7 +571,7 @@ func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) {
readers := []Reader{testRdrHistogram}
views := []View{defaultView}
p := newPipelines(resource.Empty(), readers, views, exemplar.AlwaysOffFilter)
p := newPipelines(resource.Empty(), readers, views, exemplar.AlwaysOffFilter, 0)
inst := Instrument{Name: "foo", Kind: InstrumentKindObservableGauge}
var vc cache[string, instID]
@ -631,7 +631,7 @@ func TestResolveAggregatorsDuplicateErrors(t *testing.T) {
fooInst := Instrument{Name: "foo", Kind: InstrumentKindCounter}
barInst := Instrument{Name: "bar", Kind: InstrumentKindCounter}
p := newPipelines(resource.Empty(), readers, views, exemplar.AlwaysOffFilter)
p := newPipelines(resource.Empty(), readers, views, exemplar.AlwaysOffFilter, 0)
var vc cache[string, instID]
ri := newResolver[int64](p, &vc)

View File

@ -42,7 +42,7 @@ func testSumAggregateOutput(dest *metricdata.Aggregation) int {
}
func TestNewPipeline(t *testing.T) {
pipe := newPipeline(nil, nil, nil, exemplar.AlwaysOffFilter)
pipe := newPipeline(nil, nil, nil, exemplar.AlwaysOffFilter, 0)
output := metricdata.ResourceMetrics{}
err := pipe.produce(context.Background(), &output)
@ -68,7 +68,7 @@ func TestNewPipeline(t *testing.T) {
func TestPipelineUsesResource(t *testing.T) {
res := resource.NewWithAttributes("noSchema", attribute.String("test", "resource"))
pipe := newPipeline(res, nil, nil, exemplar.AlwaysOffFilter)
pipe := newPipeline(res, nil, nil, exemplar.AlwaysOffFilter, 0)
output := metricdata.ResourceMetrics{}
err := pipe.produce(context.Background(), &output)
@ -77,7 +77,7 @@ func TestPipelineUsesResource(t *testing.T) {
}
func TestPipelineConcurrentSafe(t *testing.T) {
pipe := newPipeline(nil, nil, nil, exemplar.AlwaysOffFilter)
pipe := newPipeline(nil, nil, nil, exemplar.AlwaysOffFilter, 0)
ctx := context.Background()
var output metricdata.ResourceMetrics
@ -142,13 +142,13 @@ func testDefaultViewImplicit[N int64 | float64]() func(t *testing.T) {
}{
{
name: "NoView",
pipe: newPipeline(nil, reader, nil, exemplar.AlwaysOffFilter),
pipe: newPipeline(nil, reader, nil, exemplar.AlwaysOffFilter, 0),
},
{
name: "NoMatchingView",
pipe: newPipeline(nil, reader, []View{
NewView(Instrument{Name: "foo"}, Stream{Name: "bar"}),
}, exemplar.AlwaysOffFilter),
}, exemplar.AlwaysOffFilter, 0),
},
}
@ -233,7 +233,7 @@ func TestLogConflictName(t *testing.T) {
return instID{Name: tc.existing}
})
i := newInserter[int64](newPipeline(nil, nil, nil, exemplar.AlwaysOffFilter), &vc)
i := newInserter[int64](newPipeline(nil, nil, nil, exemplar.AlwaysOffFilter, 0), &vc)
i.logConflict(instID{Name: tc.name})
if tc.conflict {
@ -275,7 +275,7 @@ func TestLogConflictSuggestView(t *testing.T) {
var vc cache[string, instID]
name := strings.ToLower(orig.Name)
_ = vc.Lookup(name, func() instID { return orig })
i := newInserter[int64](newPipeline(nil, nil, nil, exemplar.AlwaysOffFilter), &vc)
i := newInserter[int64](newPipeline(nil, nil, nil, exemplar.AlwaysOffFilter, 0), &vc)
viewSuggestion := func(inst instID, stream string) string {
return `"NewView(Instrument{` +
@ -380,7 +380,7 @@ func TestInserterCachedAggregatorNameConflict(t *testing.T) {
}
var vc cache[string, instID]
pipe := newPipeline(nil, NewManualReader(), nil, exemplar.AlwaysOffFilter)
pipe := newPipeline(nil, NewManualReader(), nil, exemplar.AlwaysOffFilter, 0)
i := newInserter[int64](pipe, &vc)
readerAggregation := i.readerDefaultAggregation(kind)
@ -621,7 +621,7 @@ func TestPipelineWithMultipleReaders(t *testing.T) {
func TestPipelineProduceErrors(t *testing.T) {
// Create a test pipeline with aggregations
pipeReader := NewManualReader()
pipe := newPipeline(nil, pipeReader, nil, exemplar.AlwaysOffFilter)
pipe := newPipeline(nil, pipeReader, nil, exemplar.AlwaysOffFilter, 0)
// Set up an observable with callbacks
var testObsID observableID[int64]

View File

@ -42,7 +42,7 @@ func NewMeterProvider(options ...Option) *MeterProvider {
flush, sdown := conf.readerSignals()
mp := &MeterProvider{
pipes: newPipelines(conf.res, conf.readers, conf.views, conf.exemplarFilter),
pipes: newPipelines(conf.res, conf.readers, conf.views, conf.exemplarFilter, conf.cardinalityLimit),
forceFlush: flush,
shutdown: sdown,
}