diff --git a/.chloggen/processor-internal-duration.yaml b/.chloggen/processor-internal-duration.yaml new file mode 100644 index 0000000000..4a67a953bd --- /dev/null +++ b/.chloggen/processor-internal-duration.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: processorhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add processor internal duration metric. + +# One or more tracking issues or pull requests related to the change +issues: [13231] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/processor/processorhelper/documentation.md b/processor/processorhelper/documentation.md index 1c1b10f9f4..28ada41b36 100644 --- a/processor/processorhelper/documentation.md +++ b/processor/processorhelper/documentation.md @@ -14,6 +14,14 @@ Number of items passed to the processor. [alpha] | ---- | ----------- | ---------- | --------- | | {items} | Sum | Int | true | +### otelcol_processor_internal_duration + +Duration of time taken to process a batch of telemetry data through the processor. [alpha] + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| s | Histogram | Double | + ### otelcol_processor_outgoing_items Number of items emitted from the processor. [alpha] diff --git a/processor/processorhelper/internal/metadata/generated_telemetry.go b/processor/processorhelper/internal/metadata/generated_telemetry.go index dd60965bf9..7ea57e264c 100644 --- a/processor/processorhelper/internal/metadata/generated_telemetry.go +++ b/processor/processorhelper/internal/metadata/generated_telemetry.go @@ -23,11 +23,12 @@ func Tracer(settings component.TelemetrySettings) trace.Tracer { // TelemetryBuilder provides an interface for components to report telemetry // as defined in metadata and user config. type TelemetryBuilder struct { - meter metric.Meter - mu sync.Mutex - registrations []metric.Registration - ProcessorIncomingItems metric.Int64Counter - ProcessorOutgoingItems metric.Int64Counter + meter metric.Meter + mu sync.Mutex + registrations []metric.Registration + ProcessorIncomingItems metric.Int64Counter + ProcessorInternalDuration metric.Float64Histogram + ProcessorOutgoingItems metric.Int64Counter } // TelemetryBuilderOption applies changes to default builder. @@ -65,6 +66,12 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...Teleme metric.WithUnit("{items}"), ) errs = errors.Join(errs, err) + builder.ProcessorInternalDuration, err = builder.meter.Float64Histogram( + "otelcol_processor_internal_duration", + metric.WithDescription("Duration of time taken to process a batch of telemetry data through the processor. [alpha]"), + metric.WithUnit("s"), + ) + errs = errors.Join(errs, err) builder.ProcessorOutgoingItems, err = builder.meter.Int64Counter( "otelcol_processor_outgoing_items", metric.WithDescription("Number of items emitted from the processor. [alpha]"), diff --git a/processor/processorhelper/internal/metadatatest/generated_telemetrytest.go b/processor/processorhelper/internal/metadatatest/generated_telemetrytest.go index 75a7c3f0ad..77ec5636f2 100644 --- a/processor/processorhelper/internal/metadatatest/generated_telemetrytest.go +++ b/processor/processorhelper/internal/metadatatest/generated_telemetrytest.go @@ -28,6 +28,21 @@ func AssertEqualProcessorIncomingItems(t *testing.T, tt *componenttest.Telemetry metricdatatest.AssertEqual(t, want, got, opts...) } +func AssertEqualProcessorInternalDuration(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.HistogramDataPoint[float64], opts ...metricdatatest.Option) { + want := metricdata.Metrics{ + Name: "otelcol_processor_internal_duration", + Description: "Duration of time taken to process a batch of telemetry data through the processor. [alpha]", + Unit: "s", + Data: metricdata.Histogram[float64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: dps, + }, + } + got, err := tt.GetMetric("otelcol_processor_internal_duration") + require.NoError(t, err) + metricdatatest.AssertEqual(t, want, got, opts...) +} + func AssertEqualProcessorOutgoingItems(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { want := metricdata.Metrics{ Name: "otelcol_processor_outgoing_items", diff --git a/processor/processorhelper/internal/metadatatest/generated_telemetrytest_test.go b/processor/processorhelper/internal/metadatatest/generated_telemetrytest_test.go index 701bc100bd..a59c1eb824 100644 --- a/processor/processorhelper/internal/metadatatest/generated_telemetrytest_test.go +++ b/processor/processorhelper/internal/metadatatest/generated_telemetrytest_test.go @@ -20,10 +20,14 @@ func TestSetupTelemetry(t *testing.T) { require.NoError(t, err) defer tb.Shutdown() tb.ProcessorIncomingItems.Add(context.Background(), 1) + tb.ProcessorInternalDuration.Record(context.Background(), 1) tb.ProcessorOutgoingItems.Add(context.Background(), 1) AssertEqualProcessorIncomingItems(t, testTel, []metricdata.DataPoint[int64]{{Value: 1}}, metricdatatest.IgnoreTimestamp()) + AssertEqualProcessorInternalDuration(t, testTel, + []metricdata.HistogramDataPoint[float64]{{}}, metricdatatest.IgnoreValue(), + metricdatatest.IgnoreTimestamp()) AssertEqualProcessorOutgoingItems(t, testTel, []metricdata.DataPoint[int64]{{Value: 1}}, metricdatatest.IgnoreTimestamp()) diff --git a/processor/processorhelper/logs.go b/processor/processorhelper/logs.go index 23c702cd2a..9ea284e35f 100644 --- a/processor/processorhelper/logs.go +++ b/processor/processorhelper/logs.go @@ -6,6 +6,7 @@ package processorhelper // import "go.opentelemetry.io/collector/processor/proce import ( "context" "errors" + "time" "go.opentelemetry.io/otel/trace" @@ -49,10 +50,14 @@ func NewLogs( logsConsumer, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error { span := trace.SpanFromContext(ctx) span.AddEvent("Start processing.", eventOptions) + + startTime := time.Now() + recordsIn := ld.LogRecordCount() var errFunc error ld, errFunc = logsFunc(ctx, ld) + obs.recordInternalDuration(ctx, startTime) span.AddEvent("End processing.", eventOptions) if errFunc != nil { obs.recordInOut(ctx, recordsIn, 0) diff --git a/processor/processorhelper/logs_test.go b/processor/processorhelper/logs_test.go index 440090098d..8b62c68cc6 100644 --- a/processor/processorhelper/logs_test.go +++ b/processor/processorhelper/logs_test.go @@ -183,6 +183,33 @@ func TestLogs_RecordIn_ErrorOut(t *testing.T) { }, metricdatatest.IgnoreTimestamp()) } +func TestLogs_ProcessInternalDuration(t *testing.T) { + mockAggregate := func(_ context.Context, _ plog.Logs) (plog.Logs, error) { + ld := plog.NewLogs() + ld.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() + return ld, nil + } + + incomingLogs := plog.NewLogs() + + tel := componenttest.NewTelemetry() + lp, err := NewLogs(context.Background(), newSettings(tel), &testLogsCfg, consumertest.NewNop(), mockAggregate) + require.NoError(t, err) + + assert.NoError(t, lp.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, lp.ConsumeLogs(context.Background(), incomingLogs)) + assert.NoError(t, lp.Shutdown(context.Background())) + + metadatatest.AssertEqualProcessorInternalDuration(t, tel, + []metricdata.HistogramDataPoint[float64]{ + { + Count: 1, + BucketCounts: []uint64{1}, + Attributes: attribute.NewSet(attribute.String("processor", "processorhelper"), attribute.String("otel.signal", "logs")), + }, + }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue()) +} + func newSettings(tel *componenttest.Telemetry) processor.Settings { set := processortest.NewNopSettings(component.MustNewType("processorhelper")) set.TelemetrySettings = tel.NewTelemetrySettings() diff --git a/processor/processorhelper/metadata.yaml b/processor/processorhelper/metadata.yaml index fbfdc15443..89f2775dd2 100644 --- a/processor/processorhelper/metadata.yaml +++ b/processor/processorhelper/metadata.yaml @@ -28,3 +28,13 @@ telemetry: sum: value_type: int monotonic: true + + processor_internal_duration: + enabled: true + stability: + level: alpha + description: Duration of time taken to process a batch of telemetry data through the processor. + unit: s + histogram: + async: false + value_type: double diff --git a/processor/processorhelper/metrics.go b/processor/processorhelper/metrics.go index 0bd3255333..07c54187e9 100644 --- a/processor/processorhelper/metrics.go +++ b/processor/processorhelper/metrics.go @@ -6,6 +6,7 @@ package processorhelper // import "go.opentelemetry.io/collector/processor/proce import ( "context" "errors" + "time" "go.opentelemetry.io/otel/trace" @@ -49,10 +50,14 @@ func NewMetrics( metricsConsumer, err := consumer.NewMetrics(func(ctx context.Context, md pmetric.Metrics) error { span := trace.SpanFromContext(ctx) span.AddEvent("Start processing.", eventOptions) + + startTime := time.Now() + pointsIn := md.DataPointCount() var errFunc error md, errFunc = metricsFunc(ctx, md) + obs.recordInternalDuration(ctx, startTime) span.AddEvent("End processing.", eventOptions) if errFunc != nil { obs.recordInOut(ctx, pointsIn, 0) diff --git a/processor/processorhelper/metrics_test.go b/processor/processorhelper/metrics_test.go index 12b77643ee..a2d7ef5ecd 100644 --- a/processor/processorhelper/metrics_test.go +++ b/processor/processorhelper/metrics_test.go @@ -180,3 +180,32 @@ func TestMetrics_RecordIn_ErrorOut(t *testing.T) { }, }, metricdatatest.IgnoreTimestamp()) } + +func TestMetrics_ProcessInternalDuration(t *testing.T) { + mockAggregate := func(_ context.Context, _ pmetric.Metrics) (pmetric.Metrics, error) { + md := pmetric.NewMetrics() + md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptySum().DataPoints().AppendEmpty() + md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptySum().DataPoints().AppendEmpty() + md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptySum().DataPoints().AppendEmpty() + return md, nil + } + + incomingMetrics := pmetric.NewMetrics() + + tel := componenttest.NewTelemetry() + mp, err := NewMetrics(context.Background(), newSettings(tel), &testMetricsCfg, consumertest.NewNop(), mockAggregate) + require.NoError(t, err) + + assert.NoError(t, mp.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, mp.ConsumeMetrics(context.Background(), incomingMetrics)) + assert.NoError(t, mp.Shutdown(context.Background())) + + metadatatest.AssertEqualProcessorInternalDuration(t, tel, + []metricdata.HistogramDataPoint[float64]{ + { + Count: 1, + BucketCounts: []uint64{1}, + Attributes: attribute.NewSet(attribute.String("processor", "processorhelper"), attribute.String("otel.signal", "metrics")), + }, + }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue()) +} diff --git a/processor/processorhelper/obsreport.go b/processor/processorhelper/obsreport.go index a342e021d7..4ba1be5a85 100644 --- a/processor/processorhelper/obsreport.go +++ b/processor/processorhelper/obsreport.go @@ -5,6 +5,7 @@ package processorhelper // import "go.opentelemetry.io/collector/processor/proce import ( "context" + "time" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" @@ -40,3 +41,8 @@ func (or *obsReport) recordInOut(ctx context.Context, incoming, outgoing int) { or.telemetryBuilder.ProcessorIncomingItems.Add(ctx, int64(incoming), or.otelAttrs) or.telemetryBuilder.ProcessorOutgoingItems.Add(ctx, int64(outgoing), or.otelAttrs) } + +func (or *obsReport) recordInternalDuration(ctx context.Context, startTime time.Time) { + duration := time.Since(startTime) + or.telemetryBuilder.ProcessorInternalDuration.Record(ctx, duration.Seconds(), or.otelAttrs) +} diff --git a/processor/processorhelper/traces.go b/processor/processorhelper/traces.go index f6389a2272..765b955534 100644 --- a/processor/processorhelper/traces.go +++ b/processor/processorhelper/traces.go @@ -6,6 +6,7 @@ package processorhelper // import "go.opentelemetry.io/collector/processor/proce import ( "context" "errors" + "time" "go.opentelemetry.io/otel/trace" @@ -49,10 +50,14 @@ func NewTraces( traceConsumer, err := consumer.NewTraces(func(ctx context.Context, td ptrace.Traces) error { span := trace.SpanFromContext(ctx) span.AddEvent("Start processing.", eventOptions) + + startTime := time.Now() + spansIn := td.SpanCount() var errFunc error td, errFunc = tracesFunc(ctx, td) + obs.recordInternalDuration(ctx, startTime) span.AddEvent("End processing.", eventOptions) if errFunc != nil { obs.recordInOut(ctx, spansIn, 0) diff --git a/processor/processorhelper/traces_test.go b/processor/processorhelper/traces_test.go index 17825096ff..524c7e795e 100644 --- a/processor/processorhelper/traces_test.go +++ b/processor/processorhelper/traces_test.go @@ -184,3 +184,30 @@ func TestTraces_RecordIn_ErrorOut(t *testing.T) { }, }, metricdatatest.IgnoreTimestamp()) } + +func TestTraces_ProcessInternalDuration(t *testing.T) { + mockAggregate := func(_ context.Context, _ ptrace.Traces) (ptrace.Traces, error) { + td := ptrace.NewTraces() + td.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty() + return td, nil + } + + incomingTraces := ptrace.NewTraces() + + tel := componenttest.NewTelemetry() + tp, err := NewTraces(context.Background(), newSettings(tel), &testLogsCfg, consumertest.NewNop(), mockAggregate) + require.NoError(t, err) + + assert.NoError(t, tp.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, tp.ConsumeTraces(context.Background(), incomingTraces)) + assert.NoError(t, tp.Shutdown(context.Background())) + + metadatatest.AssertEqualProcessorInternalDuration(t, tel, + []metricdata.HistogramDataPoint[float64]{ + { + Count: 1, + BucketCounts: []uint64{1}, + Attributes: attribute.NewSet(attribute.String("processor", "processorhelper"), attribute.String("otel.signal", "traces")), + }, + }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue()) +} diff --git a/service/service.go b/service/service.go index ce63f153a0..642622dc28 100644 --- a/service/service.go +++ b/service/service.go @@ -379,6 +379,11 @@ func configureViews(level configtelemetry.Level) []config.View { dropViewOption(&config.ViewSelector{ MeterName: ptr("go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"), }), + // Drop duration metric if the level is not detailed + dropViewOption(&config.ViewSelector{ + MeterName: ptr("go.opentelemetry.io/collector/processor/processorhelper"), + InstrumentName: ptr("otelcol_processor_internal_duration"), + }), ) }