[chore][service] Drop component metrics depending on level (#12143)

<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
#### Description

<!-- Issue number if applicable -->

Drops metrics that depend on the metrics level:
- Batch processor metric
- otelarrow metrics (see open-telemetry/otel-arrow/issues/280 for
limitation).
- internal/otelarrow/netstats metrics. I did not implement
a25f058256/internal/otelarrow/netstats/netstats.go (L133-L136)
since `LevelNone` drops all metrics.

This attemps to unblock #11601 by hardcoding the metrics here since
there is a small number of them. Once we do #11754 we can move this back
to the individual components

#### Link to tracking issue

Updates #11061
This commit is contained in:
Pablo Baeyens 2025-01-22 09:51:27 +00:00 committed by GitHub
parent 6740a28a80
commit 70f9fe94fe
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 155 additions and 6 deletions

View File

@ -264,10 +264,17 @@ func (b *shard[T]) sendItems(trigger trigger) {
return return
} }
var bytes int var bytes int
if b.processor.telemetry.detailed { bpt := b.processor.telemetry
// Check if the instrument is enabled to calculate the size of the batch in bytes.
// See https://pkg.go.dev/go.opentelemetry.io/otel/sdk/metric/internal/x#readme-instrument-enabled
batchSendSizeBytes := bpt.telemetryBuilder.ProcessorBatchBatchSendSizeBytes
instr, ok := batchSendSizeBytes.(interface{ Enabled(context.Context) bool })
if !ok || instr.Enabled(bpt.exportCtx) {
bytes = b.batch.sizeBytes(req) bytes = b.batch.sizeBytes(req)
} }
b.processor.telemetry.record(trigger, int64(sent), int64(bytes))
bpt.record(trigger, int64(sent), int64(bytes))
} }
// singleShardBatcher is used when metadataKeys is empty, to avoid the // singleShardBatcher is used when metadataKeys is empty, to avoid the

View File

@ -9,7 +9,6 @@ import (
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/processor/batchprocessor/internal/metadata" "go.opentelemetry.io/collector/processor/batchprocessor/internal/metadata"
"go.opentelemetry.io/collector/processor/internal" "go.opentelemetry.io/collector/processor/internal"
@ -23,8 +22,6 @@ const (
) )
type batchProcessorTelemetry struct { type batchProcessorTelemetry struct {
detailed bool
exportCtx context.Context exportCtx context.Context
processorAttr metric.MeasurementOption processorAttr metric.MeasurementOption
@ -44,7 +41,6 @@ func newBatchProcessorTelemetry(set processor.Settings, currentMetadataCardinali
return &batchProcessorTelemetry{ return &batchProcessorTelemetry{
exportCtx: context.Background(), exportCtx: context.Background(),
detailed: set.MetricsLevel == configtelemetry.LevelDetailed,
telemetryBuilder: telemetryBuilder, telemetryBuilder: telemetryBuilder,
processorAttr: attrs, processorAttr: attrs,
}, nil }, nil

View File

@ -10,6 +10,7 @@ import (
"go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop" "go.opentelemetry.io/otel/metric/noop"
"go.opentelemetry.io/otel/sdk/instrumentation"
sdkmetric "go.opentelemetry.io/otel/sdk/metric" sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource" "go.opentelemetry.io/otel/sdk/resource"
"go.uber.org/multierr" "go.uber.org/multierr"
@ -36,6 +37,15 @@ type meterProviderSettings struct {
asyncErrorChannel chan error asyncErrorChannel chan error
} }
func dropViewOption(instrument sdkmetric.Instrument) sdkmetric.Option {
return sdkmetric.WithView(sdkmetric.NewView(
instrument,
sdkmetric.Stream{
Aggregation: sdkmetric.AggregationDrop{},
},
))
}
// newMeterProvider creates a new MeterProvider from Config. // newMeterProvider creates a new MeterProvider from Config.
func newMeterProvider(set meterProviderSettings, disableHighCardinality bool) (metric.MeterProvider, error) { func newMeterProvider(set meterProviderSettings, disableHighCardinality bool) (metric.MeterProvider, error) {
if set.cfg.Level == configtelemetry.LevelNone || len(set.cfg.Readers) == 0 { if set.cfg.Level == configtelemetry.LevelNone || len(set.cfg.Readers) == 0 {
@ -56,6 +66,73 @@ func newMeterProvider(set meterProviderSettings, disableHighCardinality bool) (m
opts = append(opts, sdkmetric.WithReader(r)) opts = append(opts, sdkmetric.WithReader(r))
} }
// otel-arrow library metrics
// See https://github.com/open-telemetry/otel-arrow/blob/c39257/pkg/otel/arrow_record/consumer.go#L174-L176
if set.cfg.Level < configtelemetry.LevelNormal {
scope := instrumentation.Scope{Name: "otel-arrow/pkg/otel/arrow_record"}
opts = append(opts,
dropViewOption(sdkmetric.Instrument{
Name: "arrow_batch_records",
Scope: scope,
}),
dropViewOption(sdkmetric.Instrument{
Name: "arrow_schema_resets",
Scope: scope,
}),
dropViewOption(sdkmetric.Instrument{
Name: "arrow_memory_inuse",
Scope: scope,
}),
)
}
// contrib's internal/otelarrow/netstats metrics
// See
// - https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/a25f05/internal/otelarrow/netstats/netstats.go#L130
// - https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/a25f05/internal/otelarrow/netstats/netstats.go#L165
if set.cfg.Level < configtelemetry.LevelDetailed {
scope := instrumentation.Scope{Name: "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/netstats"}
// Compressed size metrics.
opts = append(opts, dropViewOption(sdkmetric.Instrument{
Name: "otelcol_*_compressed_size",
Scope: scope,
}))
opts = append(opts, dropViewOption(sdkmetric.Instrument{
Name: "otelcol_*_compressed_size",
Scope: scope,
}))
// makeRecvMetrics for exporters.
opts = append(opts, dropViewOption(sdkmetric.Instrument{
Name: "otelcol_exporter_recv",
Scope: scope,
}))
opts = append(opts, dropViewOption(sdkmetric.Instrument{
Name: "otelcol_exporter_recv_wire",
Scope: scope,
}))
// makeSentMetrics for receivers.
opts = append(opts, dropViewOption(sdkmetric.Instrument{
Name: "otelcol_receiver_sent",
Scope: scope,
}))
opts = append(opts, dropViewOption(sdkmetric.Instrument{
Name: "otelcol_receiver_sent_wire",
Scope: scope,
}))
}
// Batch processor metrics
if set.cfg.Level < configtelemetry.LevelDetailed {
scope := instrumentation.Scope{Name: "go.opentelemetry.io/collector/processor/batchprocessor"}
opts = append(opts, dropViewOption(sdkmetric.Instrument{
Name: "otelcol_processor_batch_batch_send_size_bytes",
Scope: scope,
}))
}
var err error var err error
mp.MeterProvider, err = otelinit.InitOpenTelemetry(set.res, opts, disableHighCardinality) mp.MeterProvider, err = otelinit.InitOpenTelemetry(set.res, opts, disableHighCardinality)
if err != nil { if err != nil {

View File

@ -12,9 +12,11 @@ import (
io_prometheus_client "github.com/prometheus/client_model/go" io_prometheus_client "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt" "github.com/prometheus/common/expfmt"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.opentelemetry.io/contrib/config" "go.opentelemetry.io/contrib/config"
"go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric"
sdkresource "go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/config/configtelemetry"
@ -232,3 +234,70 @@ func getMetricsFromPrometheus(t *testing.T, endpoint string) map[string]*io_prom
return parsed return parsed
} }
// Test that the MeterProvider implements the 'Enabled' functionality.
// See https://pkg.go.dev/go.opentelemetry.io/otel/sdk/metric/internal/x#readme-instrument-enabled.
func TestInstrumentEnabled(t *testing.T) {
prom := promtest.GetAvailableLocalAddressPrometheus(t)
set := meterProviderSettings{
res: sdkresource.Default(),
cfg: MetricsConfig{
Level: configtelemetry.LevelDetailed,
Readers: []config.MetricReader{{
Pull: &config.PullMetricReader{Exporter: config.MetricExporter{Prometheus: prom}},
}},
},
asyncErrorChannel: make(chan error),
}
meterProvider, err := newMeterProvider(set, false)
defer func() {
if prov, ok := meterProvider.(interface{ Shutdown(context.Context) error }); ok {
require.NoError(t, prov.Shutdown(context.Background()))
}
}()
require.NoError(t, err)
meter := meterProvider.Meter("go.opentelemetry.io/collector/service/telemetry")
type enabledInstrument interface{ Enabled(context.Context) bool }
intCnt, err := meter.Int64Counter("int64.counter")
require.NoError(t, err)
_, ok := intCnt.(enabledInstrument)
assert.True(t, ok, "Int64Counter does not implement the experimental 'Enabled' method")
intUpDownCnt, err := meter.Int64UpDownCounter("int64.updowncounter")
require.NoError(t, err)
_, ok = intUpDownCnt.(enabledInstrument)
assert.True(t, ok, "Int64UpDownCounter does not implement the experimental 'Enabled' method")
intHist, err := meter.Int64Histogram("int64.updowncounter")
require.NoError(t, err)
_, ok = intHist.(enabledInstrument)
assert.True(t, ok, "Int64Histogram does not implement the experimental 'Enabled' method")
intGauge, err := meter.Int64Gauge("int64.updowncounter")
require.NoError(t, err)
_, ok = intGauge.(enabledInstrument)
assert.True(t, ok, "Int64Gauge does not implement the experimental 'Enabled' method")
floatCnt, err := meter.Float64Counter("int64.updowncounter")
require.NoError(t, err)
_, ok = floatCnt.(enabledInstrument)
assert.True(t, ok, "Float64Counter does not implement the experimental 'Enabled' method")
floatUpDownCnt, err := meter.Float64UpDownCounter("int64.updowncounter")
require.NoError(t, err)
_, ok = floatUpDownCnt.(enabledInstrument)
assert.True(t, ok, "Float64UpDownCounter does not implement the experimental 'Enabled' method")
floatHist, err := meter.Float64Histogram("int64.updowncounter")
require.NoError(t, err)
_, ok = floatHist.(enabledInstrument)
assert.True(t, ok, "Float64Histogram does not implement the experimental 'Enabled' method")
floatGauge, err := meter.Float64Gauge("int64.updowncounter")
require.NoError(t, err)
_, ok = floatGauge.(enabledInstrument)
assert.True(t, ok, "Float64Gauge does not implement the experimental 'Enabled' method")
}