[processorhelper] report signal as attribute on incoming/outgoing items (#11144)

This updating the existing metric points that were recently added to use
signal as an attribute instead of separating the metric name. It follows
the suggestions in [otep
259](https://github.com/open-telemetry/oteps/pull/259) for the metric
and attribute names.

Putting this in draft to get some feedback from @djaglowski before
moving forward with this change

---------

Signed-off-by: Alex Boten <223565+codeboten@users.noreply.github.com>
This commit is contained in:
Alex Boten 2024-09-13 13:07:15 -07:00 committed by GitHub
parent 8a070097ec
commit 3b50b38d39
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 94 additions and 157 deletions

View File

@ -0,0 +1,36 @@
# Use this changelog template to create an entry for release notes.
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: processorhelper
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Update incoming/outgoing metrics to a single metric with a `otel.signal` attributes."
# One or more tracking issues or pull requests related to the change
issues: [11144]
# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
The following metrics were added in the previous version
- otelcol_processor_incoming_spans
- otelcol_processor_outgoing_spans
- otelcol_processor_incoming_metric_points
- otelcol_processor_outgoing_metric_points
- otelcol_processor_incoming_log_records
- otelcol_processor_outgoing_log_records
They are being replaced with the following to more closely align with OTEP 259:
- otelcol_processor_incoming_items
- otelcol_processor_outgoing_items
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []

View File

@ -54,53 +54,21 @@ Number of spans that were dropped.
| ---- | ----------- | ---------- | --------- |
| {spans} | Sum | Int | true |
### otelcol_processor_incoming_log_records
### otelcol_processor_incoming_items
Number of log records passed to the processor.
Number of items passed to the processor.
| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| {records} | Sum | Int | true |
| {items} | Sum | Int | true |
### otelcol_processor_incoming_metric_points
### otelcol_processor_outgoing_items
Number of metric points passed to the processor.
Number of items emitted from the processor.
| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| {datapoints} | Sum | Int | true |
### otelcol_processor_incoming_spans
Number of spans passed to the processor.
| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| {spans} | Sum | Int | true |
### otelcol_processor_outgoing_log_records
Number of log records emitted from the processor.
| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| {records} | Sum | Int | true |
### otelcol_processor_outgoing_metric_points
Number of metric points emitted from the processor.
| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| {datapoints} | Sum | Int | true |
### otelcol_processor_outgoing_spans
Number of spans emitted from the processor.
| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| {spans} | Sum | Int | true |
| {items} | Sum | Int | true |
### otelcol_processor_refused_log_records

View File

@ -35,12 +35,8 @@ type TelemetryBuilder struct {
ProcessorDroppedLogRecords metric.Int64Counter
ProcessorDroppedMetricPoints metric.Int64Counter
ProcessorDroppedSpans metric.Int64Counter
ProcessorIncomingLogRecords metric.Int64Counter
ProcessorIncomingMetricPoints metric.Int64Counter
ProcessorIncomingSpans metric.Int64Counter
ProcessorOutgoingLogRecords metric.Int64Counter
ProcessorOutgoingMetricPoints metric.Int64Counter
ProcessorOutgoingSpans metric.Int64Counter
ProcessorIncomingItems metric.Int64Counter
ProcessorOutgoingItems metric.Int64Counter
ProcessorRefusedLogRecords metric.Int64Counter
ProcessorRefusedMetricPoints metric.Int64Counter
ProcessorRefusedSpans metric.Int64Counter
@ -103,40 +99,16 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...Teleme
metric.WithUnit("{spans}"),
)
errs = errors.Join(errs, err)
builder.ProcessorIncomingLogRecords, err = builder.meters[configtelemetry.LevelBasic].Int64Counter(
"otelcol_processor_incoming_log_records",
metric.WithDescription("Number of log records passed to the processor."),
metric.WithUnit("{records}"),
builder.ProcessorIncomingItems, err = builder.meters[configtelemetry.LevelBasic].Int64Counter(
"otelcol_processor_incoming_items",
metric.WithDescription("Number of items passed to the processor."),
metric.WithUnit("{items}"),
)
errs = errors.Join(errs, err)
builder.ProcessorIncomingMetricPoints, err = builder.meters[configtelemetry.LevelBasic].Int64Counter(
"otelcol_processor_incoming_metric_points",
metric.WithDescription("Number of metric points passed to the processor."),
metric.WithUnit("{datapoints}"),
)
errs = errors.Join(errs, err)
builder.ProcessorIncomingSpans, err = builder.meters[configtelemetry.LevelBasic].Int64Counter(
"otelcol_processor_incoming_spans",
metric.WithDescription("Number of spans passed to the processor."),
metric.WithUnit("{spans}"),
)
errs = errors.Join(errs, err)
builder.ProcessorOutgoingLogRecords, err = builder.meters[configtelemetry.LevelBasic].Int64Counter(
"otelcol_processor_outgoing_log_records",
metric.WithDescription("Number of log records emitted from the processor."),
metric.WithUnit("{records}"),
)
errs = errors.Join(errs, err)
builder.ProcessorOutgoingMetricPoints, err = builder.meters[configtelemetry.LevelBasic].Int64Counter(
"otelcol_processor_outgoing_metric_points",
metric.WithDescription("Number of metric points emitted from the processor."),
metric.WithUnit("{datapoints}"),
)
errs = errors.Join(errs, err)
builder.ProcessorOutgoingSpans, err = builder.meters[configtelemetry.LevelBasic].Int64Counter(
"otelcol_processor_outgoing_spans",
metric.WithDescription("Number of spans emitted from the processor."),
metric.WithUnit("{spans}"),
builder.ProcessorOutgoingItems, err = builder.meters[configtelemetry.LevelBasic].Int64Counter(
"otelcol_processor_outgoing_items",
metric.WithDescription("Number of items emitted from the processor."),
metric.WithUnit("{items}"),
)
errs = errors.Join(errs, err)
builder.ProcessorRefusedLogRecords, err = builder.meters[configtelemetry.LevelBasic].Int64Counter(

View File

@ -7,6 +7,7 @@ import (
"context"
"errors"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/collector/component"
@ -46,6 +47,7 @@ func NewLogsProcessor(
if err != nil {
return nil, err
}
obs.otelAttrs = append(obs.otelAttrs, attribute.String("otel.signal", "logs"))
eventOptions := spanAttributes(set.ID)
bs := fromOptions(options)
@ -63,7 +65,7 @@ func NewLogsProcessor(
return err
}
recordsOut := ld.LogRecordCount()
obs.recordInOut(ctx, component.DataTypeLogs, recordsIn, recordsOut)
obs.recordInOut(ctx, recordsIn, recordsOut)
return nextConsumer.ConsumeLogs(ctx, ld)
}, bs.consumerOptions...)
if err != nil {

View File

@ -96,31 +96,31 @@ func TestLogsProcessor_RecordInOut(t *testing.T) {
testTelemetry.assertMetrics(t, []metricdata.Metrics{
{
Name: "otelcol_processor_incoming_log_records",
Description: "Number of log records passed to the processor.",
Unit: "{records}",
Name: "otelcol_processor_incoming_items",
Description: "Number of items passed to the processor.",
Unit: "{items}",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 3,
Attributes: attribute.NewSet(attribute.String("processor", "processorhelper")),
Attributes: attribute.NewSet(attribute.String("processor", "processorhelper"), attribute.String("otel.signal", "logs")),
},
},
},
},
{
Name: "otelcol_processor_outgoing_log_records",
Description: "Number of log records emitted from the processor.",
Unit: "{records}",
Name: "otelcol_processor_outgoing_items",
Description: "Number of items emitted from the processor.",
Unit: "{items}",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 1,
Attributes: attribute.NewSet(attribute.String("processor", "processorhelper")),
Attributes: attribute.NewSet(attribute.String("processor", "processorhelper"), attribute.String("otel.signal", "logs")),
},
},
},

View File

@ -10,50 +10,18 @@ status:
telemetry:
metrics:
processor_incoming_spans:
processor_incoming_items:
enabled: true
description: Number of spans passed to the processor.
unit: "{spans}"
description: Number of items passed to the processor.
unit: "{items}"
sum:
value_type: int
monotonic: true
processor_outgoing_spans:
processor_outgoing_items:
enabled: true
description: Number of spans emitted from the processor.
unit: "{spans}"
sum:
value_type: int
monotonic: true
processor_incoming_metric_points:
enabled: true
description: Number of metric points passed to the processor.
unit: "{datapoints}"
sum:
value_type: int
monotonic: true
processor_outgoing_metric_points:
enabled: true
description: Number of metric points emitted from the processor.
unit: "{datapoints}"
sum:
value_type: int
monotonic: true
processor_incoming_log_records:
enabled: true
description: Number of log records passed to the processor.
unit: "{records}"
sum:
value_type: int
monotonic: true
processor_outgoing_log_records:
enabled: true
description: Number of log records emitted from the processor.
unit: "{records}"
description: Number of items emitted from the processor.
unit: "{items}"
sum:
value_type: int
monotonic: true

View File

@ -7,6 +7,7 @@ import (
"context"
"errors"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/collector/component"
@ -46,6 +47,7 @@ func NewMetricsProcessor(
if err != nil {
return nil, err
}
obs.otelAttrs = append(obs.otelAttrs, attribute.String("otel.signal", "metrics"))
eventOptions := spanAttributes(set.ID)
bs := fromOptions(options)
@ -63,7 +65,7 @@ func NewMetricsProcessor(
return err
}
pointsOut := md.DataPointCount()
obs.recordInOut(ctx, component.DataTypeMetrics, pointsIn, pointsOut)
obs.recordInOut(ctx, pointsIn, pointsOut)
return nextConsumer.ConsumeMetrics(ctx, md)
}, bs.consumerOptions...)
if err != nil {

View File

@ -97,31 +97,31 @@ func TestMetricsProcessor_RecordInOut(t *testing.T) {
testTelemetry.assertMetrics(t, []metricdata.Metrics{
{
Name: "otelcol_processor_incoming_metric_points",
Description: "Number of metric points passed to the processor.",
Unit: "{datapoints}",
Name: "otelcol_processor_incoming_items",
Description: "Number of items passed to the processor.",
Unit: "{items}",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 2,
Attributes: attribute.NewSet(attribute.String("processor", "processorhelper")),
Attributes: attribute.NewSet(attribute.String("processor", "processorhelper"), attribute.String("otel.signal", "metrics")),
},
},
},
},
{
Name: "otelcol_processor_outgoing_metric_points",
Description: "Number of metric points emitted from the processor.",
Unit: "{datapoints}",
Name: "otelcol_processor_outgoing_items",
Description: "Number of items emitted from the processor.",
Unit: "{items}",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 3,
Attributes: attribute.NewSet(attribute.String("processor", "processorhelper")),
Attributes: attribute.NewSet(attribute.String("processor", "processorhelper"), attribute.String("otel.signal", "metrics")),
},
},
},

View File

@ -60,22 +60,9 @@ func newObsReport(cfg ObsReportSettings) (*ObsReport, error) {
}, nil
}
func (or *ObsReport) recordInOut(ctx context.Context, dataType component.DataType, incoming, outgoing int) {
var incomingCount, outgoingCount metric.Int64Counter
switch dataType {
case component.DataTypeTraces:
incomingCount = or.telemetryBuilder.ProcessorIncomingSpans
outgoingCount = or.telemetryBuilder.ProcessorOutgoingSpans
case component.DataTypeMetrics:
incomingCount = or.telemetryBuilder.ProcessorIncomingMetricPoints
outgoingCount = or.telemetryBuilder.ProcessorOutgoingMetricPoints
case component.DataTypeLogs:
incomingCount = or.telemetryBuilder.ProcessorIncomingLogRecords
outgoingCount = or.telemetryBuilder.ProcessorOutgoingLogRecords
}
incomingCount.Add(ctx, int64(incoming), metric.WithAttributes(or.otelAttrs...))
outgoingCount.Add(ctx, int64(outgoing), metric.WithAttributes(or.otelAttrs...))
func (or *ObsReport) recordInOut(ctx context.Context, incoming, outgoing int) {
or.telemetryBuilder.ProcessorIncomingItems.Add(ctx, int64(incoming), metric.WithAttributes(or.otelAttrs...))
or.telemetryBuilder.ProcessorOutgoingItems.Add(ctx, int64(outgoing), metric.WithAttributes(or.otelAttrs...))
}
func (or *ObsReport) recordData(ctx context.Context, dataType component.DataType, accepted, refused, dropped int64) {

View File

@ -7,6 +7,7 @@ import (
"context"
"errors"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/collector/component"
@ -46,6 +47,7 @@ func NewTracesProcessor(
if err != nil {
return nil, err
}
obs.otelAttrs = append(obs.otelAttrs, attribute.String("otel.signal", "traces"))
eventOptions := spanAttributes(set.ID)
bs := fromOptions(options)
@ -63,7 +65,7 @@ func NewTracesProcessor(
return err
}
spansOut := td.SpanCount()
obs.recordInOut(ctx, component.DataTypeTraces, spansIn, spansOut)
obs.recordInOut(ctx, spansIn, spansOut)
return nextConsumer.ConsumeTraces(ctx, td)
}, bs.consumerOptions...)

View File

@ -97,31 +97,31 @@ func TestTracesProcessor_RecordInOut(t *testing.T) {
testTelemetry.assertMetrics(t, []metricdata.Metrics{
{
Name: "otelcol_processor_incoming_spans",
Description: "Number of spans passed to the processor.",
Unit: "{spans}",
Name: "otelcol_processor_incoming_items",
Description: "Number of items passed to the processor.",
Unit: "{items}",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 4,
Attributes: attribute.NewSet(attribute.String("processor", "processorhelper")),
Attributes: attribute.NewSet(attribute.String("processor", "processorhelper"), attribute.String("otel.signal", "traces")),
},
},
},
},
{
Name: "otelcol_processor_outgoing_spans",
Description: "Number of spans emitted from the processor.",
Unit: "{spans}",
Name: "otelcol_processor_outgoing_items",
Description: "Number of items emitted from the processor.",
Unit: "{items}",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 1,
Attributes: attribute.NewSet(attribute.String("processor", "processorhelper")),
Attributes: attribute.NewSet(attribute.String("processor", "processorhelper"), attribute.String("otel.signal", "traces")),
},
},
},