Change exporterhelper from observability to obsreport package (#568)
This is the first part of moving all components to use the obsreport package (with the goal of making uniform the metrics used by various components). - introduce the command-line option to select the new/legacy metrics, the default for now is legacy metrics. - remove the options to have or not metrics and tracing from exporterhelper since all usages were enabling both (the only not using it on contrib was by mistake)
This commit is contained in:
parent
d17176da05
commit
d6147389a6
|
|
@ -27,32 +27,12 @@ type Shutdown func() error
|
|||
|
||||
// ExporterOptions contains options concerning how an Exporter is configured.
|
||||
type ExporterOptions struct {
|
||||
// TODO: Retry logic must be in the same place as metrics recording because
|
||||
// if a request is retried we should not record metrics otherwise number of
|
||||
// spans received + dropped will be different than the number of received spans
|
||||
// in the receiver.
|
||||
recordMetrics bool
|
||||
recordTrace bool
|
||||
shutdown Shutdown
|
||||
shutdown Shutdown
|
||||
}
|
||||
|
||||
// ExporterOption apply changes to ExporterOptions.
|
||||
type ExporterOption func(*ExporterOptions)
|
||||
|
||||
// WithMetrics makes new Exporter to record metrics for every request.
|
||||
func WithMetrics(recordMetrics bool) ExporterOption {
|
||||
return func(o *ExporterOptions) {
|
||||
o.recordMetrics = recordMetrics
|
||||
}
|
||||
}
|
||||
|
||||
// WithTracing makes new Exporter to wrap every request with a trace Span.
|
||||
func WithTracing(recordTrace bool) ExporterOption {
|
||||
return func(o *ExporterOptions) {
|
||||
o.recordTrace = recordTrace
|
||||
}
|
||||
}
|
||||
|
||||
// WithShutdown overrides the default Shutdown function for an exporter.
|
||||
// The default shutdown function does nothing and always returns nil.
|
||||
func WithShutdown(shutdown Shutdown) ExporterOption {
|
||||
|
|
|
|||
|
|
@ -17,35 +17,11 @@ import (
|
|||
"errors"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.opencensus.io/trace"
|
||||
)
|
||||
|
||||
func TestDefaultOptions(t *testing.T) {
|
||||
checkRecordMetrics(t, newExporterOptions(), false)
|
||||
checkRecordTrace(t, newExporterOptions(), false)
|
||||
}
|
||||
|
||||
func TestWithRecordMetrics(t *testing.T) {
|
||||
checkRecordMetrics(t, newExporterOptions(WithMetrics(true)), true)
|
||||
checkRecordMetrics(t, newExporterOptions(WithMetrics(false)), false)
|
||||
}
|
||||
|
||||
func TestWithSpanName(t *testing.T) {
|
||||
checkRecordTrace(t, newExporterOptions(WithTracing(true)), true)
|
||||
checkRecordTrace(t, newExporterOptions(WithTracing(false)), false)
|
||||
}
|
||||
|
||||
func TestErrorToStatus(t *testing.T) {
|
||||
require.Equal(t, okStatus, errToStatus(nil))
|
||||
require.Equal(t, trace.Status{Code: trace.StatusCodeUnknown, Message: "my_error"}, errToStatus(errors.New("my_error")))
|
||||
}
|
||||
|
||||
func checkRecordMetrics(t *testing.T, opts ExporterOptions, recordMetrics bool) {
|
||||
assert.Equalf(t, opts.recordMetrics, recordMetrics, "Wrong recordMetrics Want: %t Got: %t", opts.recordMetrics, recordMetrics)
|
||||
}
|
||||
|
||||
func checkRecordTrace(t *testing.T, opts ExporterOptions, recordTrace bool) {
|
||||
assert.Equalf(t, opts.recordTrace, recordTrace, "Wrong spanName Want: %s Got: %s", opts.recordTrace, recordTrace)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -26,10 +26,3 @@ var (
|
|||
// errNilPushMetricsData is returned when a nil pushMetricsData is given.
|
||||
errNilPushMetricsData = errors.New("nil pushMetricsData")
|
||||
)
|
||||
|
||||
const (
|
||||
numDroppedTimeSeriesAttribute = "num_dropped_timeseries"
|
||||
numReceivedTimeSeriesAttribute = "num_received_timeseries"
|
||||
numDroppedSpansAttribute = "num_dropped_spans"
|
||||
numReceivedSpansAttribute = "num_received_spans"
|
||||
)
|
||||
|
|
|
|||
|
|
@ -17,13 +17,11 @@ package exporterhelper
|
|||
import (
|
||||
"context"
|
||||
|
||||
"go.opencensus.io/trace"
|
||||
|
||||
"github.com/open-telemetry/opentelemetry-collector/component"
|
||||
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
|
||||
"github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata"
|
||||
"github.com/open-telemetry/opentelemetry-collector/exporter"
|
||||
"github.com/open-telemetry/opentelemetry-collector/observability"
|
||||
"github.com/open-telemetry/opentelemetry-collector/obsreport"
|
||||
)
|
||||
|
||||
// PushMetricsData is a helper function that is similar to ConsumeMetricsData but also returns
|
||||
|
|
@ -43,7 +41,7 @@ func (me *metricsExporter) Start(host component.Host) error {
|
|||
}
|
||||
|
||||
func (me *metricsExporter) ConsumeMetricsData(ctx context.Context, md consumerdata.MetricsData) error {
|
||||
exporterCtx := observability.ContextWithExporterName(ctx, me.exporterFullName)
|
||||
exporterCtx := obsreport.ExporterContext(ctx, me.exporterFullName)
|
||||
_, err := me.pushMetricsData(exporterCtx, md)
|
||||
return err
|
||||
}
|
||||
|
|
@ -66,13 +64,8 @@ func NewMetricsExporter(config configmodels.Exporter, pushMetricsData PushMetric
|
|||
}
|
||||
|
||||
opts := newExporterOptions(options...)
|
||||
if opts.recordMetrics {
|
||||
pushMetricsData = pushMetricsDataWithMetrics(pushMetricsData)
|
||||
}
|
||||
|
||||
if opts.recordTrace {
|
||||
pushMetricsData = pushMetricsDataWithSpan(pushMetricsData, config.Name()+".ExportMetricsData")
|
||||
}
|
||||
pushMetricsData = pushMetricsWithObservability(pushMetricsData, config.Name())
|
||||
|
||||
// The default shutdown method always returns nil.
|
||||
if opts.shutdown == nil {
|
||||
|
|
@ -86,33 +79,19 @@ func NewMetricsExporter(config configmodels.Exporter, pushMetricsData PushMetric
|
|||
}, nil
|
||||
}
|
||||
|
||||
func pushMetricsDataWithMetrics(next PushMetricsData) PushMetricsData {
|
||||
func pushMetricsWithObservability(next PushMetricsData, exporterName string) PushMetricsData {
|
||||
return func(ctx context.Context, md consumerdata.MetricsData) (int, error) {
|
||||
// TODO: Add retry logic here if we want to support because we need to record special metrics.
|
||||
droppedTimeSeries, err := next(ctx, md)
|
||||
// TODO: How to record the reason of dropping?
|
||||
observability.RecordMetricsForMetricsExporter(ctx, NumTimeSeries(md), droppedTimeSeries)
|
||||
return droppedTimeSeries, err
|
||||
}
|
||||
}
|
||||
exporterCtx, span := obsreport.StartMetricsExportOp(ctx, exporterName)
|
||||
numDroppedTimeSeries, err := next(exporterCtx, md)
|
||||
|
||||
func pushMetricsDataWithSpan(next PushMetricsData, spanName string) PushMetricsData {
|
||||
return func(ctx context.Context, md consumerdata.MetricsData) (int, error) {
|
||||
ctx, span := trace.StartSpan(ctx, spanName)
|
||||
defer span.End()
|
||||
// Call next stage.
|
||||
droppedTimeSeries, err := next(ctx, md)
|
||||
if span.IsRecordingEvents() {
|
||||
receivedTimeSeries := NumTimeSeries(md)
|
||||
span.AddAttributes(
|
||||
trace.Int64Attribute(numReceivedTimeSeriesAttribute, int64(receivedTimeSeries)),
|
||||
trace.Int64Attribute(numDroppedTimeSeriesAttribute, int64(droppedTimeSeries)),
|
||||
)
|
||||
if err != nil {
|
||||
span.SetStatus(errToStatus(err))
|
||||
}
|
||||
}
|
||||
return droppedTimeSeries, err
|
||||
// TODO: this is not ideal: it should come from the next function itself.
|
||||
// temporarily loading it from internal format. Once full switch is done
|
||||
// to new metrics will remove this.
|
||||
numReceivedTimeSeries, numPoints := measureMetricsExport(md)
|
||||
|
||||
obsreport.EndMetricsExportOp(
|
||||
exporterCtx, span, numPoints, numReceivedTimeSeries, numDroppedTimeSeries, err)
|
||||
return numDroppedTimeSeries, err
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -124,3 +103,16 @@ func NumTimeSeries(md consumerdata.MetricsData) int {
|
|||
}
|
||||
return receivedTimeSeries
|
||||
}
|
||||
|
||||
func measureMetricsExport(md consumerdata.MetricsData) (int, int) {
|
||||
numTimeSeries := 0
|
||||
numPoints := 0
|
||||
for _, metric := range md.Metrics {
|
||||
tss := metric.GetTimeseries()
|
||||
numTimeSeries += len(metric.GetTimeseries())
|
||||
for _, ts := range tss {
|
||||
numPoints += len(ts.GetPoints())
|
||||
}
|
||||
}
|
||||
return numTimeSeries, numPoints
|
||||
}
|
||||
|
|
|
|||
|
|
@ -28,6 +28,7 @@ import (
|
|||
"github.com/open-telemetry/opentelemetry-collector/exporter"
|
||||
"github.com/open-telemetry/opentelemetry-collector/observability"
|
||||
"github.com/open-telemetry/opentelemetry-collector/observability/observabilitytest"
|
||||
"github.com/open-telemetry/opentelemetry-collector/obsreport"
|
||||
)
|
||||
|
||||
const (
|
||||
|
|
@ -77,7 +78,7 @@ func TestMetricsExporter_Default_ReturnError(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestMetricsExporter_WithRecordMetrics(t *testing.T) {
|
||||
me, err := NewMetricsExporter(fakeMetricsExporterConfig, newPushMetricsData(0, nil), WithMetrics(true))
|
||||
me, err := NewMetricsExporter(fakeMetricsExporterConfig, newPushMetricsData(0, nil))
|
||||
require.Nil(t, err)
|
||||
require.NotNil(t, me)
|
||||
|
||||
|
|
@ -85,7 +86,7 @@ func TestMetricsExporter_WithRecordMetrics(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestMetricsExporter_WithRecordMetrics_NonZeroDropped(t *testing.T) {
|
||||
me, err := NewMetricsExporter(fakeMetricsExporterConfig, newPushMetricsData(1, nil), WithMetrics(true))
|
||||
me, err := NewMetricsExporter(fakeMetricsExporterConfig, newPushMetricsData(1, nil))
|
||||
require.Nil(t, err)
|
||||
require.NotNil(t, me)
|
||||
|
||||
|
|
@ -94,7 +95,7 @@ func TestMetricsExporter_WithRecordMetrics_NonZeroDropped(t *testing.T) {
|
|||
|
||||
func TestMetricsExporter_WithRecordMetrics_ReturnError(t *testing.T) {
|
||||
want := errors.New("my_error")
|
||||
me, err := NewMetricsExporter(fakeMetricsExporterConfig, newPushMetricsData(0, want), WithMetrics(true))
|
||||
me, err := NewMetricsExporter(fakeMetricsExporterConfig, newPushMetricsData(0, want))
|
||||
require.Nil(t, err)
|
||||
require.NotNil(t, me)
|
||||
|
||||
|
|
@ -102,14 +103,14 @@ func TestMetricsExporter_WithRecordMetrics_ReturnError(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestMetricsExporter_WithSpan(t *testing.T) {
|
||||
me, err := NewMetricsExporter(fakeMetricsExporterConfig, newPushMetricsData(0, nil), WithTracing(true))
|
||||
me, err := NewMetricsExporter(fakeMetricsExporterConfig, newPushMetricsData(0, nil))
|
||||
require.Nil(t, err)
|
||||
require.NotNil(t, me)
|
||||
checkWrapSpanForMetricsExporter(t, me, nil, 0)
|
||||
checkWrapSpanForMetricsExporter(t, me, nil, 1)
|
||||
}
|
||||
|
||||
func TestMetricsExporter_WithSpan_NonZeroDropped(t *testing.T) {
|
||||
me, err := NewMetricsExporter(fakeMetricsExporterConfig, newPushMetricsData(1, nil), WithTracing(true))
|
||||
me, err := NewMetricsExporter(fakeMetricsExporterConfig, newPushMetricsData(1, nil))
|
||||
require.Nil(t, err)
|
||||
require.NotNil(t, me)
|
||||
checkWrapSpanForMetricsExporter(t, me, nil, 1)
|
||||
|
|
@ -117,10 +118,10 @@ func TestMetricsExporter_WithSpan_NonZeroDropped(t *testing.T) {
|
|||
|
||||
func TestMetricsExporter_WithSpan_ReturnError(t *testing.T) {
|
||||
want := errors.New("my_error")
|
||||
me, err := NewMetricsExporter(fakeMetricsExporterConfig, newPushMetricsData(0, want), WithTracing(true))
|
||||
me, err := NewMetricsExporter(fakeMetricsExporterConfig, newPushMetricsData(0, want))
|
||||
require.Nil(t, err)
|
||||
require.NotNil(t, me)
|
||||
checkWrapSpanForMetricsExporter(t, me, want, 0)
|
||||
checkWrapSpanForMetricsExporter(t, me, want, 1)
|
||||
}
|
||||
|
||||
func TestMetricsExporter_WithShutdown(t *testing.T) {
|
||||
|
|
@ -180,7 +181,12 @@ func checkRecordedMetricsForMetricsExporter(t *testing.T, me exporter.MetricsExp
|
|||
func generateMetricsTraffic(t *testing.T, me exporter.MetricsExporter, numRequests int, wantError error) {
|
||||
md := consumerdata.MetricsData{Metrics: []*metricspb.Metric{
|
||||
{
|
||||
Timeseries: make([]*metricspb.TimeSeries, 1),
|
||||
// Create a empty timeseries with one point.
|
||||
Timeseries: []*metricspb.TimeSeries{
|
||||
{
|
||||
Points: []*metricspb.Point{{}},
|
||||
},
|
||||
},
|
||||
},
|
||||
}}
|
||||
ctx, span := trace.StartSpan(context.Background(), fakeMetricsParentSpanName, trace.WithSampler(trace.AlwaysSample()))
|
||||
|
|
@ -190,7 +196,7 @@ func generateMetricsTraffic(t *testing.T, me exporter.MetricsExporter, numReques
|
|||
}
|
||||
}
|
||||
|
||||
func checkWrapSpanForMetricsExporter(t *testing.T, me exporter.MetricsExporter, wantError error, droppedSpans int) {
|
||||
func checkWrapSpanForMetricsExporter(t *testing.T, me exporter.MetricsExporter, wantError error, numMetricPoints int64) {
|
||||
ocSpansSaver := new(testOCTraceExporter)
|
||||
trace.RegisterExporter(ocSpansSaver)
|
||||
defer trace.UnregisterExporter(ocSpansSaver)
|
||||
|
|
@ -213,7 +219,14 @@ func checkWrapSpanForMetricsExporter(t *testing.T, me exporter.MetricsExporter,
|
|||
for _, sd := range gotSpanData[:numRequests] {
|
||||
require.Equalf(t, parentSpan.SpanContext.SpanID, sd.ParentSpanID, "Exporter span not a child\nSpanData %v", sd)
|
||||
require.Equalf(t, errToStatus(wantError), sd.Status, "SpanData %v", sd)
|
||||
require.Equalf(t, int64(1), sd.Attributes[numReceivedTimeSeriesAttribute], "SpanData %v", sd)
|
||||
require.Equalf(t, int64(droppedSpans), sd.Attributes[numDroppedTimeSeriesAttribute], "SpanData %v", sd)
|
||||
|
||||
sentMetricPoints := numMetricPoints
|
||||
var failedToSendMetricPoints int64
|
||||
if wantError != nil {
|
||||
sentMetricPoints = 0
|
||||
failedToSendMetricPoints = numMetricPoints
|
||||
}
|
||||
require.Equalf(t, sentMetricPoints, sd.Attributes[obsreport.SentMetricPointsKey], "SpanData %v", sd)
|
||||
require.Equalf(t, failedToSendMetricPoints, sd.Attributes[obsreport.FailedToSendMetricPointsKey], "SpanData %v", sd)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,18 +17,13 @@ package exporterhelper
|
|||
import (
|
||||
"context"
|
||||
|
||||
"go.opencensus.io/trace"
|
||||
|
||||
"github.com/open-telemetry/opentelemetry-collector/component"
|
||||
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
|
||||
"github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata"
|
||||
"github.com/open-telemetry/opentelemetry-collector/exporter"
|
||||
"github.com/open-telemetry/opentelemetry-collector/observability"
|
||||
"github.com/open-telemetry/opentelemetry-collector/obsreport"
|
||||
)
|
||||
|
||||
// Suffix to use for span names emitted by exporter for observability purposes.
|
||||
const spanNameSuffix = ".ExportTraceData"
|
||||
|
||||
// traceDataPusher is a helper function that is similar to ConsumeTraceData but also
|
||||
// returns the number of dropped spans.
|
||||
type traceDataPusher func(ctx context.Context, td consumerdata.TraceData) (droppedSpans int, err error)
|
||||
|
|
@ -51,7 +46,7 @@ func (te *traceExporter) Start(host component.Host) error {
|
|||
}
|
||||
|
||||
func (te *traceExporter) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error {
|
||||
exporterCtx := observability.ContextWithExporterName(ctx, te.exporterFullName)
|
||||
exporterCtx := obsreport.ExporterContext(ctx, te.exporterFullName)
|
||||
_, err := te.dataPusher(exporterCtx, td)
|
||||
return err
|
||||
}
|
||||
|
|
@ -79,14 +74,8 @@ func NewTraceExporter(
|
|||
}
|
||||
|
||||
opts := newExporterOptions(options...)
|
||||
if opts.recordMetrics {
|
||||
dataPusher = dataPusher.withMetrics()
|
||||
}
|
||||
|
||||
if opts.recordTrace {
|
||||
spanName := config.Name() + spanNameSuffix
|
||||
dataPusher = dataPusher.withSpan(spanName)
|
||||
}
|
||||
dataPusher = dataPusher.withObservability(config.Name())
|
||||
|
||||
// The default shutdown function does nothing.
|
||||
if opts.shutdown == nil {
|
||||
|
|
@ -102,35 +91,19 @@ func NewTraceExporter(
|
|||
}, nil
|
||||
}
|
||||
|
||||
// withMetrics wraps the current pusher into a function that records the metrics of the
|
||||
// pusher execution.
|
||||
func (p traceDataPusher) withMetrics() traceDataPusher {
|
||||
// withObservability wraps the current pusher into a function that records
|
||||
// the observability signals during the pusher execution.
|
||||
func (p traceDataPusher) withObservability(exporterName string) traceDataPusher {
|
||||
return func(ctx context.Context, td consumerdata.TraceData) (int, error) {
|
||||
exporterCtx, span := obsreport.StartTraceDataExportOp(ctx, exporterName)
|
||||
// Forward the data to the next consumer (this pusher is the next).
|
||||
droppedSpans, err := p(ctx, td)
|
||||
// TODO: How to record the reason of dropping?
|
||||
observability.RecordMetricsForTraceExporter(ctx, len(td.Spans), droppedSpans)
|
||||
return droppedSpans, err
|
||||
}
|
||||
}
|
||||
droppedSpans, err := p(exporterCtx, td)
|
||||
|
||||
// withSpan wraps the current pusher into a function that records a span during
|
||||
// pusher execution.
|
||||
func (p traceDataPusher) withSpan(spanName string) traceDataPusher {
|
||||
return func(ctx context.Context, td consumerdata.TraceData) (int, error) {
|
||||
ctx, span := trace.StartSpan(ctx, spanName)
|
||||
defer span.End()
|
||||
// Forward the data to the next consumer (this pusher is the next).
|
||||
droppedSpans, err := p(ctx, td)
|
||||
if span.IsRecordingEvents() {
|
||||
span.AddAttributes(
|
||||
trace.Int64Attribute(numReceivedSpansAttribute, int64(len(td.Spans))),
|
||||
trace.Int64Attribute(numDroppedSpansAttribute, int64(droppedSpans)),
|
||||
)
|
||||
if err != nil {
|
||||
span.SetStatus(errToStatus(err))
|
||||
}
|
||||
}
|
||||
// TODO: this is not ideal: it should come from the next function itself.
|
||||
// temporarily loading it from internal format. Once full switch is done
|
||||
// to new metrics will remove this.
|
||||
numSpans := len(td.Spans)
|
||||
obsreport.EndTraceDataExportOp(exporterCtx, span, numSpans, droppedSpans, err)
|
||||
return droppedSpans, err
|
||||
}
|
||||
}
|
||||
|
|
@ -151,7 +124,7 @@ func (te *otlpTraceExporter) ConsumeOTLPTrace(
|
|||
ctx context.Context,
|
||||
td consumerdata.OTLPTraceData,
|
||||
) error {
|
||||
exporterCtx := observability.ContextWithExporterName(ctx, te.exporterFullName)
|
||||
exporterCtx := obsreport.ExporterContext(ctx, te.exporterFullName)
|
||||
_, err := te.dataPusher(exporterCtx, td)
|
||||
return err
|
||||
}
|
||||
|
|
@ -178,14 +151,8 @@ func NewOTLPTraceExporter(
|
|||
}
|
||||
|
||||
opts := newExporterOptions(options...)
|
||||
if opts.recordMetrics {
|
||||
dataPusher = dataPusher.withMetrics()
|
||||
}
|
||||
|
||||
if opts.recordTrace {
|
||||
spanName := config.Name() + spanNameSuffix
|
||||
dataPusher = dataPusher.withSpan(spanName)
|
||||
}
|
||||
dataPusher = dataPusher.withObservability(config.Name())
|
||||
|
||||
// The default shutdown function does nothing.
|
||||
if opts.shutdown == nil {
|
||||
|
|
@ -201,41 +168,19 @@ func NewOTLPTraceExporter(
|
|||
}, nil
|
||||
}
|
||||
|
||||
// withMetrics wraps the current pusher into a function that records the metrics of the
|
||||
// pusher execution.
|
||||
func (p otlpTraceDataPusher) withMetrics() otlpTraceDataPusher {
|
||||
// withObservability wraps the current pusher into a function that records
|
||||
// the observability signals during the pusher execution.
|
||||
func (p otlpTraceDataPusher) withObservability(exporterName string) otlpTraceDataPusher {
|
||||
return func(ctx context.Context, td consumerdata.OTLPTraceData) (int, error) {
|
||||
exporterCtx, span := obsreport.StartTraceDataExportOp(ctx, exporterName)
|
||||
// Forward the data to the next consumer (this pusher is the next).
|
||||
droppedSpans, err := p(ctx, td)
|
||||
|
||||
// Record the results as metrics.
|
||||
observability.RecordMetricsForTraceExporter(ctx, td.SpanCount(), droppedSpans)
|
||||
droppedSpans, err := p(exporterCtx, td)
|
||||
|
||||
return droppedSpans, err
|
||||
}
|
||||
}
|
||||
|
||||
// withSpan wraps the current pusher into a function that records a span during
|
||||
// pusher execution.
|
||||
func (p otlpTraceDataPusher) withSpan(spanName string) otlpTraceDataPusher {
|
||||
return func(ctx context.Context, td consumerdata.OTLPTraceData) (int, error) {
|
||||
// Start a span.
|
||||
ctx, span := trace.StartSpan(ctx, spanName)
|
||||
|
||||
// End the span after this function is done.
|
||||
defer span.End()
|
||||
|
||||
// Forward the data to the next consumer (this pusher is the next).
|
||||
droppedSpans, err := p(ctx, td)
|
||||
if span.IsRecordingEvents() {
|
||||
span.AddAttributes(
|
||||
trace.Int64Attribute(numReceivedSpansAttribute, int64(td.SpanCount())),
|
||||
trace.Int64Attribute(numDroppedSpansAttribute, int64(droppedSpans)),
|
||||
)
|
||||
if err != nil {
|
||||
span.SetStatus(errToStatus(err))
|
||||
}
|
||||
}
|
||||
// TODO: this is not ideal: it should come from the next function itself.
|
||||
// temporarily loading it from internal format. Once full switch is done
|
||||
// to new metrics will remove this.
|
||||
numSpans := td.SpanCount()
|
||||
obsreport.EndTraceDataExportOp(exporterCtx, span, numSpans, droppedSpans, err)
|
||||
return droppedSpans, err
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -30,6 +30,7 @@ import (
|
|||
"github.com/open-telemetry/opentelemetry-collector/exporter"
|
||||
"github.com/open-telemetry/opentelemetry-collector/observability"
|
||||
"github.com/open-telemetry/opentelemetry-collector/observability/observabilitytest"
|
||||
"github.com/open-telemetry/opentelemetry-collector/obsreport"
|
||||
)
|
||||
|
||||
const (
|
||||
|
|
@ -83,7 +84,7 @@ func TestTraceExporter_Default_ReturnError(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestTraceExporter_WithRecordMetrics(t *testing.T) {
|
||||
te, err := NewTraceExporter(fakeTraceExporterConfig, newPushTraceData(0, nil), WithMetrics(true))
|
||||
te, err := NewTraceExporter(fakeTraceExporterConfig, newPushTraceData(0, nil))
|
||||
require.Nil(t, err)
|
||||
require.NotNil(t, te)
|
||||
|
||||
|
|
@ -91,7 +92,7 @@ func TestTraceExporter_WithRecordMetrics(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestTraceExporter_WithRecordMetrics_NonZeroDropped(t *testing.T) {
|
||||
te, err := NewTraceExporter(fakeTraceExporterConfig, newPushTraceData(1, nil), WithMetrics(true))
|
||||
te, err := NewTraceExporter(fakeTraceExporterConfig, newPushTraceData(1, nil))
|
||||
require.Nil(t, err)
|
||||
require.NotNil(t, te)
|
||||
|
||||
|
|
@ -100,7 +101,7 @@ func TestTraceExporter_WithRecordMetrics_NonZeroDropped(t *testing.T) {
|
|||
|
||||
func TestTraceExporter_WithRecordMetrics_ReturnError(t *testing.T) {
|
||||
want := errors.New("my_error")
|
||||
te, err := NewTraceExporter(fakeTraceExporterConfig, newPushTraceData(0, want), WithMetrics(true))
|
||||
te, err := NewTraceExporter(fakeTraceExporterConfig, newPushTraceData(0, want))
|
||||
require.Nil(t, err)
|
||||
require.NotNil(t, te)
|
||||
|
||||
|
|
@ -108,15 +109,15 @@ func TestTraceExporter_WithRecordMetrics_ReturnError(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestTraceExporter_WithSpan(t *testing.T) {
|
||||
te, err := NewTraceExporter(fakeTraceExporterConfig, newPushTraceData(0, nil), WithTracing(true))
|
||||
te, err := NewTraceExporter(fakeTraceExporterConfig, newPushTraceData(0, nil))
|
||||
require.Nil(t, err)
|
||||
require.NotNil(t, te)
|
||||
|
||||
checkWrapSpanForTraceExporter(t, te, nil, 0)
|
||||
checkWrapSpanForTraceExporter(t, te, nil, 1)
|
||||
}
|
||||
|
||||
func TestTraceExporter_WithSpan_NonZeroDropped(t *testing.T) {
|
||||
te, err := NewTraceExporter(fakeTraceExporterConfig, newPushTraceData(1, nil), WithTracing(true))
|
||||
te, err := NewTraceExporter(fakeTraceExporterConfig, newPushTraceData(1, nil))
|
||||
require.Nil(t, err)
|
||||
require.NotNil(t, te)
|
||||
|
||||
|
|
@ -125,11 +126,11 @@ func TestTraceExporter_WithSpan_NonZeroDropped(t *testing.T) {
|
|||
|
||||
func TestTraceExporter_WithSpan_ReturnError(t *testing.T) {
|
||||
want := errors.New("my_error")
|
||||
te, err := NewTraceExporter(fakeTraceExporterConfig, newPushTraceData(0, want), WithTracing(true))
|
||||
te, err := NewTraceExporter(fakeTraceExporterConfig, newPushTraceData(0, want))
|
||||
require.Nil(t, err)
|
||||
require.NotNil(t, te)
|
||||
|
||||
checkWrapSpanForTraceExporter(t, te, want, 0)
|
||||
checkWrapSpanForTraceExporter(t, te, want, 1)
|
||||
}
|
||||
|
||||
func TestTraceExporter_WithShutdown(t *testing.T) {
|
||||
|
|
@ -189,7 +190,7 @@ func generateTraceTraffic(t *testing.T, te exporter.TraceExporter, numRequests i
|
|||
}
|
||||
}
|
||||
|
||||
func checkWrapSpanForTraceExporter(t *testing.T, te exporter.TraceExporter, wantError error, droppedSpans int) {
|
||||
func checkWrapSpanForTraceExporter(t *testing.T, te exporter.TraceExporter, wantError error, numSpans int64) {
|
||||
ocSpansSaver := new(testOCTraceExporter)
|
||||
trace.RegisterExporter(ocSpansSaver)
|
||||
defer trace.UnregisterExporter(ocSpansSaver)
|
||||
|
|
@ -212,8 +213,15 @@ func checkWrapSpanForTraceExporter(t *testing.T, te exporter.TraceExporter, want
|
|||
for _, sd := range gotSpanData[:numRequests] {
|
||||
require.Equalf(t, parentSpan.SpanContext.SpanID, sd.ParentSpanID, "Exporter span not a child\nSpanData %v", sd)
|
||||
require.Equalf(t, errToStatus(wantError), sd.Status, "SpanData %v", sd)
|
||||
require.Equalf(t, int64(1), sd.Attributes[numReceivedSpansAttribute], "SpanData %v", sd)
|
||||
require.Equalf(t, int64(droppedSpans), sd.Attributes[numDroppedSpansAttribute], "SpanData %v", sd)
|
||||
|
||||
sentSpans := numSpans
|
||||
var failedToSendSpans int64
|
||||
if wantError != nil {
|
||||
sentSpans = 0
|
||||
failedToSendSpans = numSpans
|
||||
}
|
||||
require.Equalf(t, sentSpans, sd.Attributes[obsreport.SentSpansKey], "SpanData %v", sd)
|
||||
require.Equalf(t, failedToSendSpans, sd.Attributes[obsreport.FailedToSendSpansKey], "SpanData %v", sd)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -263,7 +271,7 @@ func TestOTLPTraceExporter_Default_ReturnError(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestOTLPTraceExporter_WithRecordMetrics(t *testing.T) {
|
||||
te, err := NewOTLPTraceExporter(fakeTraceExporterConfig, newPushOTLPTrace(0, nil), WithMetrics(true))
|
||||
te, err := NewOTLPTraceExporter(fakeTraceExporterConfig, newPushOTLPTrace(0, nil))
|
||||
require.Nil(t, err)
|
||||
require.NotNil(t, te)
|
||||
|
||||
|
|
@ -271,7 +279,7 @@ func TestOTLPTraceExporter_WithRecordMetrics(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestOTLPTraceExporter_WithRecordMetrics_NonZeroDropped(t *testing.T) {
|
||||
te, err := NewOTLPTraceExporter(fakeTraceExporterConfig, newPushOTLPTrace(1, nil), WithMetrics(true))
|
||||
te, err := NewOTLPTraceExporter(fakeTraceExporterConfig, newPushOTLPTrace(1, nil))
|
||||
require.Nil(t, err)
|
||||
require.NotNil(t, te)
|
||||
|
||||
|
|
@ -280,7 +288,7 @@ func TestOTLPTraceExporter_WithRecordMetrics_NonZeroDropped(t *testing.T) {
|
|||
|
||||
func TestOTLPTraceExporter_WithRecordMetrics_ReturnError(t *testing.T) {
|
||||
want := errors.New("my_error")
|
||||
te, err := NewOTLPTraceExporter(fakeTraceExporterConfig, newPushOTLPTrace(0, want), WithMetrics(true))
|
||||
te, err := NewOTLPTraceExporter(fakeTraceExporterConfig, newPushOTLPTrace(0, want))
|
||||
require.Nil(t, err)
|
||||
require.NotNil(t, te)
|
||||
|
||||
|
|
@ -288,15 +296,15 @@ func TestOTLPTraceExporter_WithRecordMetrics_ReturnError(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestOTLPTraceExporter_WithSpan(t *testing.T) {
|
||||
te, err := NewOTLPTraceExporter(fakeTraceExporterConfig, newPushOTLPTrace(0, nil), WithTracing(true))
|
||||
te, err := NewOTLPTraceExporter(fakeTraceExporterConfig, newPushOTLPTrace(0, nil))
|
||||
require.Nil(t, err)
|
||||
require.NotNil(t, te)
|
||||
|
||||
checkWrapSpanForOTLPTraceExporter(t, te, nil, 0)
|
||||
checkWrapSpanForOTLPTraceExporter(t, te, nil, 1)
|
||||
}
|
||||
|
||||
func TestOTLPTraceExporter_WithSpan_NonZeroDropped(t *testing.T) {
|
||||
te, err := NewOTLPTraceExporter(fakeTraceExporterConfig, newPushOTLPTrace(1, nil), WithTracing(true))
|
||||
te, err := NewOTLPTraceExporter(fakeTraceExporterConfig, newPushOTLPTrace(1, nil))
|
||||
require.Nil(t, err)
|
||||
require.NotNil(t, te)
|
||||
|
||||
|
|
@ -305,11 +313,11 @@ func TestOTLPTraceExporter_WithSpan_NonZeroDropped(t *testing.T) {
|
|||
|
||||
func TestOTLPTraceExporter_WithSpan_ReturnError(t *testing.T) {
|
||||
want := errors.New("my_error")
|
||||
te, err := NewOTLPTraceExporter(fakeTraceExporterConfig, newPushOTLPTrace(0, want), WithTracing(true))
|
||||
te, err := NewOTLPTraceExporter(fakeTraceExporterConfig, newPushOTLPTrace(0, want))
|
||||
require.Nil(t, err)
|
||||
require.NotNil(t, te)
|
||||
|
||||
checkWrapSpanForOTLPTraceExporter(t, te, want, 0)
|
||||
checkWrapSpanForOTLPTraceExporter(t, te, want, 1)
|
||||
}
|
||||
|
||||
func TestOTLPTraceExporter_WithShutdown(t *testing.T) {
|
||||
|
|
@ -369,7 +377,7 @@ func generateOTLPTraceTraffic(t *testing.T, te exporter.OTLPTraceExporter, numRe
|
|||
}
|
||||
}
|
||||
|
||||
func checkWrapSpanForOTLPTraceExporter(t *testing.T, te exporter.OTLPTraceExporter, wantError error, droppedSpans int) {
|
||||
func checkWrapSpanForOTLPTraceExporter(t *testing.T, te exporter.OTLPTraceExporter, wantError error, numSpans int64) {
|
||||
ocSpansSaver := new(testOCTraceExporter)
|
||||
trace.RegisterExporter(ocSpansSaver)
|
||||
defer trace.UnregisterExporter(ocSpansSaver)
|
||||
|
|
@ -392,7 +400,15 @@ func checkWrapSpanForOTLPTraceExporter(t *testing.T, te exporter.OTLPTraceExport
|
|||
for _, sd := range gotSpanData[:numRequests] {
|
||||
require.Equalf(t, parentSpan.SpanContext.SpanID, sd.ParentSpanID, "Exporter span not a child\nSpanData %v", sd)
|
||||
require.Equalf(t, errToStatus(wantError), sd.Status, "SpanData %v", sd)
|
||||
require.Equalf(t, int64(1), sd.Attributes[numReceivedSpansAttribute], "SpanData %v", sd)
|
||||
require.Equalf(t, int64(droppedSpans), sd.Attributes[numDroppedSpansAttribute], "SpanData %v", sd)
|
||||
|
||||
sentSpans := numSpans
|
||||
var failedToSendSpans int64
|
||||
if wantError != nil {
|
||||
sentSpans = 0
|
||||
failedToSendSpans = numSpans
|
||||
}
|
||||
|
||||
require.Equalf(t, sentSpans, sd.Attributes[obsreport.SentSpansKey], "SpanData %v", sd)
|
||||
require.Equalf(t, failedToSendSpans, sd.Attributes[obsreport.FailedToSendSpansKey], "SpanData %v", sd)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -52,9 +52,7 @@ func New(config *Config) (exporter.TraceExporter, error) {
|
|||
|
||||
exp, err := exporterhelper.NewTraceExporter(
|
||||
config,
|
||||
s.pushTraceData,
|
||||
exporterhelper.WithTracing(true),
|
||||
exporterhelper.WithMetrics(true))
|
||||
s.pushTraceData)
|
||||
|
||||
return exp, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -63,9 +63,7 @@ func New(
|
|||
|
||||
exp, err := exporterhelper.NewTraceExporter(
|
||||
config,
|
||||
s.pushTraceData,
|
||||
exporterhelper.WithTracing(true),
|
||||
exporterhelper.WithMetrics(true))
|
||||
s.pushTraceData)
|
||||
|
||||
return exp, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -37,8 +37,6 @@ func NewTraceExporter(config configmodels.Exporter, logger *zap.Logger) (exporte
|
|||
// TODO: Add ability to record the received data
|
||||
return 0, nil
|
||||
},
|
||||
exporterhelper.WithTracing(true),
|
||||
exporterhelper.WithMetrics(true),
|
||||
exporterhelper.WithShutdown(logger.Sync),
|
||||
)
|
||||
}
|
||||
|
|
@ -55,8 +53,6 @@ func NewMetricsExporter(config configmodels.Exporter, logger *zap.Logger) (expor
|
|||
// TODO: Add ability to record the received data
|
||||
return 0, nil
|
||||
},
|
||||
exporterhelper.WithTracing(true),
|
||||
exporterhelper.WithMetrics(true),
|
||||
exporterhelper.WithShutdown(logger.Sync),
|
||||
)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -70,8 +70,6 @@ func NewTraceExporter(logger *zap.Logger, config configmodels.Exporter, opts ...
|
|||
oexp, err := exporterhelper.NewTraceExporter(
|
||||
config,
|
||||
oce.PushTraceData,
|
||||
exporterhelper.WithTracing(true),
|
||||
exporterhelper.WithMetrics(true),
|
||||
exporterhelper.WithShutdown(oce.Shutdown))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
@ -113,8 +111,6 @@ func NewMetricsExporter(logger *zap.Logger, config configmodels.Exporter, opts .
|
|||
oexp, err := exporterhelper.NewMetricsExporter(
|
||||
config,
|
||||
oce.PushMetricsData,
|
||||
exporterhelper.WithTracing(true),
|
||||
exporterhelper.WithMetrics(true),
|
||||
exporterhelper.WithShutdown(oce.Shutdown))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
|
|||
|
|
@ -68,9 +68,7 @@ func NewTraceExporter(logger *zap.Logger, config configmodels.Exporter) (exporte
|
|||
}
|
||||
zexp, err := exporterhelper.NewTraceExporter(
|
||||
config,
|
||||
ze.PushTraceData,
|
||||
exporterhelper.WithTracing(true),
|
||||
exporterhelper.WithMetrics(true))
|
||||
ze.PushTraceData)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -68,9 +68,12 @@ var (
|
|||
// StartTraceDataExportOp is called at the start of an Export operation.
|
||||
// The returned context should be used in other calls to the obsreport functions
|
||||
// dealing with the same export operation.
|
||||
func StartTraceDataExportOp(ctx context.Context, exporter string) (context.Context, *trace.Span) {
|
||||
func StartTraceDataExportOp(
|
||||
operationCtx context.Context,
|
||||
exporter string,
|
||||
) (context.Context, *trace.Span) {
|
||||
return traceExportDataOp(
|
||||
exporterContext(ctx, exporter),
|
||||
operationCtx,
|
||||
exporter,
|
||||
exportTraceDataOperationSuffix)
|
||||
}
|
||||
|
|
@ -78,7 +81,7 @@ func StartTraceDataExportOp(ctx context.Context, exporter string) (context.Conte
|
|||
// EndTraceDataExportOp completes the export operation that was started with
|
||||
// StartTraceDataExportOp.
|
||||
func EndTraceDataExportOp(
|
||||
ctx context.Context,
|
||||
exporterCtx context.Context,
|
||||
span *trace.Span,
|
||||
numExportedSpans int,
|
||||
numDroppedSpans int, // TODO: For legacy measurements, to be removed in the future.
|
||||
|
|
@ -86,11 +89,11 @@ func EndTraceDataExportOp(
|
|||
) {
|
||||
if useLegacy {
|
||||
observability.RecordMetricsForTraceExporter(
|
||||
ctx, numExportedSpans, numDroppedSpans)
|
||||
exporterCtx, numExportedSpans, numDroppedSpans)
|
||||
}
|
||||
|
||||
endExportOp(
|
||||
ctx,
|
||||
exporterCtx,
|
||||
span,
|
||||
numExportedSpans,
|
||||
err,
|
||||
|
|
@ -102,11 +105,11 @@ func EndTraceDataExportOp(
|
|||
// The returned context should be used in other calls to the obsreport functions
|
||||
// dealing with the same export operation.
|
||||
func StartMetricsExportOp(
|
||||
ctx context.Context,
|
||||
operationCtx context.Context,
|
||||
exporter string,
|
||||
) (context.Context, *trace.Span) {
|
||||
return traceExportDataOp(
|
||||
exporterContext(ctx, exporter),
|
||||
operationCtx,
|
||||
exporter,
|
||||
exportMetricsOperationSuffix)
|
||||
}
|
||||
|
|
@ -114,7 +117,7 @@ func StartMetricsExportOp(
|
|||
// EndMetricsExportOp completes the export operation that was started with
|
||||
// StartMetricsExportOp.
|
||||
func EndMetricsExportOp(
|
||||
ctx context.Context,
|
||||
exporterCtx context.Context,
|
||||
span *trace.Span,
|
||||
numExportedPoints int,
|
||||
numExportedTimeSeries int, // TODO: For legacy measurements, to be removed in the future.
|
||||
|
|
@ -123,11 +126,11 @@ func EndMetricsExportOp(
|
|||
) {
|
||||
if useLegacy {
|
||||
observability.RecordMetricsForMetricsExporter(
|
||||
ctx, numExportedTimeSeries, numDroppedTimeSeries)
|
||||
exporterCtx, numExportedTimeSeries, numDroppedTimeSeries)
|
||||
}
|
||||
|
||||
endExportOp(
|
||||
ctx,
|
||||
exporterCtx,
|
||||
span,
|
||||
numExportedPoints,
|
||||
err,
|
||||
|
|
@ -135,11 +138,11 @@ func EndMetricsExportOp(
|
|||
)
|
||||
}
|
||||
|
||||
// exporterContext adds the keys used when recording observability metrics to
|
||||
// ExporterContext adds the keys used when recording observability metrics to
|
||||
// the given context returning the newly created context. This context should
|
||||
// be used in related calls to the obsreport functions so metrics are properly
|
||||
// recorded.
|
||||
func exporterContext(
|
||||
func ExporterContext(
|
||||
ctx context.Context,
|
||||
exporter string,
|
||||
) context.Context {
|
||||
|
|
@ -181,22 +184,17 @@ func endExportOp(
|
|||
numFailedToSend = numExportedItems
|
||||
}
|
||||
|
||||
var sentMeasure, failedToSendMeasure *stats.Int64Measure
|
||||
var sentItemsKey, failedToSendItemsKey string
|
||||
switch dataType {
|
||||
case configmodels.TracesDataType:
|
||||
sentMeasure = mExporterSentSpans
|
||||
failedToSendMeasure = mExporterFailedToSendSpans
|
||||
sentItemsKey = SentSpansKey
|
||||
failedToSendItemsKey = FailedToSendSpansKey
|
||||
case configmodels.MetricsDataType:
|
||||
sentMeasure = mExporterSentMetricPoints
|
||||
failedToSendMeasure = mExporterFailedToSendMetricPoints
|
||||
sentItemsKey = SentMetricPointsKey
|
||||
failedToSendItemsKey = FailedToSendMetricPointsKey
|
||||
}
|
||||
|
||||
if useNew {
|
||||
var sentMeasure, failedToSendMeasure *stats.Int64Measure
|
||||
switch dataType {
|
||||
case configmodels.TracesDataType:
|
||||
sentMeasure = mExporterSentSpans
|
||||
failedToSendMeasure = mExporterFailedToSendSpans
|
||||
case configmodels.MetricsDataType:
|
||||
sentMeasure = mExporterSentMetricPoints
|
||||
failedToSendMeasure = mExporterFailedToSendMetricPoints
|
||||
}
|
||||
|
||||
stats.Record(
|
||||
exporterCtx,
|
||||
sentMeasure.M(int64(numSent)),
|
||||
|
|
@ -205,6 +203,16 @@ func endExportOp(
|
|||
|
||||
// End span according to errors.
|
||||
if span.IsRecordingEvents() {
|
||||
var sentItemsKey, failedToSendItemsKey string
|
||||
switch dataType {
|
||||
case configmodels.TracesDataType:
|
||||
sentItemsKey = SentSpansKey
|
||||
failedToSendItemsKey = FailedToSendSpansKey
|
||||
case configmodels.MetricsDataType:
|
||||
sentItemsKey = SentMetricPointsKey
|
||||
failedToSendItemsKey = FailedToSendMetricPointsKey
|
||||
}
|
||||
|
||||
span.AddAttributes(
|
||||
trace.Int64Attribute(
|
||||
sentItemsKey, int64(numSent)),
|
||||
|
|
|
|||
|
|
@ -72,13 +72,12 @@ var (
|
|||
// The returned context should be used in other calls to the obsreport functions
|
||||
// dealing with the same receive operation.
|
||||
func StartTraceDataReceiveOp(
|
||||
ctx context.Context,
|
||||
operationCtx context.Context,
|
||||
receiver string,
|
||||
transport string,
|
||||
legacyName string,
|
||||
) (context.Context, *trace.Span) {
|
||||
return traceReceiveTraceDataOp(
|
||||
receiverContext(ctx, receiver, transport, legacyName),
|
||||
operationCtx,
|
||||
receiver,
|
||||
transport,
|
||||
receiveTraceDataOperationSuffix)
|
||||
|
|
@ -87,7 +86,7 @@ func StartTraceDataReceiveOp(
|
|||
// EndTraceDataReceiveOp completes the receive operation that was started with
|
||||
// StartTraceDataReceiveOp.
|
||||
func EndTraceDataReceiveOp(
|
||||
ctx context.Context,
|
||||
receiverCtx context.Context,
|
||||
span *trace.Span,
|
||||
format string,
|
||||
numReceivedSpans int,
|
||||
|
|
@ -101,11 +100,11 @@ func EndTraceDataReceiveOp(
|
|||
numReceivedLegacy = 0
|
||||
}
|
||||
observability.RecordMetricsForTraceReceiver(
|
||||
ctx, numReceivedLegacy, numDroppedSpans)
|
||||
receiverCtx, numReceivedLegacy, numDroppedSpans)
|
||||
}
|
||||
|
||||
endReceiveOp(
|
||||
ctx,
|
||||
receiverCtx,
|
||||
span,
|
||||
format,
|
||||
numReceivedSpans,
|
||||
|
|
@ -118,13 +117,12 @@ func EndTraceDataReceiveOp(
|
|||
// The returned context should be used in other calls to the obsreport functions
|
||||
// dealing with the same receive operation.
|
||||
func StartMetricsReceiveOp(
|
||||
ctx context.Context,
|
||||
operationCtx context.Context,
|
||||
receiver string,
|
||||
transport string,
|
||||
legacyName string,
|
||||
) (context.Context, *trace.Span) {
|
||||
return traceReceiveTraceDataOp(
|
||||
receiverContext(ctx, receiver, transport, legacyName),
|
||||
operationCtx,
|
||||
receiver,
|
||||
transport,
|
||||
receiverMetricsOperationSuffix)
|
||||
|
|
@ -133,7 +131,7 @@ func StartMetricsReceiveOp(
|
|||
// EndMetricsReceiveOp completes the receive operation that was started with
|
||||
// StartMetricsReceiveOp.
|
||||
func EndMetricsReceiveOp(
|
||||
ctx context.Context,
|
||||
receiverCtx context.Context,
|
||||
span *trace.Span,
|
||||
format string,
|
||||
numReceivedPoints int,
|
||||
|
|
@ -147,11 +145,11 @@ func EndMetricsReceiveOp(
|
|||
numReceivedTimeSeries = 0
|
||||
}
|
||||
observability.RecordMetricsForMetricsReceiver(
|
||||
ctx, numReceivedTimeSeries, numDroppedTimeSeries)
|
||||
receiverCtx, numReceivedTimeSeries, numDroppedTimeSeries)
|
||||
}
|
||||
|
||||
endReceiveOp(
|
||||
ctx,
|
||||
receiverCtx,
|
||||
span,
|
||||
format,
|
||||
numReceivedPoints,
|
||||
|
|
@ -160,11 +158,11 @@ func EndMetricsReceiveOp(
|
|||
)
|
||||
}
|
||||
|
||||
// receiverContext adds the keys used when recording observability metrics to
|
||||
// ReceiverContext adds the keys used when recording observability metrics to
|
||||
// the given context returning the newly created context. This context should
|
||||
// be used in related calls to the obsreport functions so metrics are properly
|
||||
// recorded.
|
||||
func receiverContext(
|
||||
func ReceiverContext(
|
||||
ctx context.Context,
|
||||
receiver string,
|
||||
transport string,
|
||||
|
|
@ -220,22 +218,17 @@ func endReceiveOp(
|
|||
numRefused = numReceivedItems
|
||||
}
|
||||
|
||||
var acceptedMeasure, refusedMeasure *stats.Int64Measure
|
||||
var acceptedItemsKey, refusedItemsKey string
|
||||
switch dataType {
|
||||
case configmodels.TracesDataType:
|
||||
acceptedMeasure = mReceiverAcceptedSpans
|
||||
refusedMeasure = mReceiverRefusedSpans
|
||||
acceptedItemsKey = AcceptedSpansKey
|
||||
refusedItemsKey = RefusedSpansKey
|
||||
case configmodels.MetricsDataType:
|
||||
acceptedMeasure = mReceiverAcceptedMetricPoints
|
||||
refusedMeasure = mReceiverRefusedMetricPoints
|
||||
acceptedItemsKey = AcceptedMetricPointsKey
|
||||
refusedItemsKey = RefusedMetricPointsKey
|
||||
}
|
||||
|
||||
if useNew {
|
||||
var acceptedMeasure, refusedMeasure *stats.Int64Measure
|
||||
switch dataType {
|
||||
case configmodels.TracesDataType:
|
||||
acceptedMeasure = mReceiverAcceptedSpans
|
||||
refusedMeasure = mReceiverRefusedSpans
|
||||
case configmodels.MetricsDataType:
|
||||
acceptedMeasure = mReceiverAcceptedMetricPoints
|
||||
refusedMeasure = mReceiverRefusedMetricPoints
|
||||
}
|
||||
|
||||
stats.Record(
|
||||
receiverCtx,
|
||||
acceptedMeasure.M(int64(numAccepted)),
|
||||
|
|
@ -244,6 +237,16 @@ func endReceiveOp(
|
|||
|
||||
// end span according to errors
|
||||
if span.IsRecordingEvents() {
|
||||
var acceptedItemsKey, refusedItemsKey string
|
||||
switch dataType {
|
||||
case configmodels.TracesDataType:
|
||||
acceptedItemsKey = AcceptedSpansKey
|
||||
refusedItemsKey = RefusedSpansKey
|
||||
case configmodels.MetricsDataType:
|
||||
acceptedItemsKey = AcceptedMetricPointsKey
|
||||
refusedItemsKey = RefusedMetricPointsKey
|
||||
}
|
||||
|
||||
span.AddAttributes(
|
||||
trace.StringAttribute(
|
||||
FormatKey, format),
|
||||
|
|
|
|||
|
|
@ -103,14 +103,11 @@ func Test_obsreport_ReceiveTraceDataOp(t *testing.T) {
|
|||
t.Name(), trace.WithSampler(trace.AlwaysSample()))
|
||||
defer parentSpan.End()
|
||||
|
||||
receiverCtx := ReceiverContext(parentCtx, receiver, transport, legacyName)
|
||||
errs := []error{nil, errFake}
|
||||
rcvdSpans := []int{13, 42}
|
||||
for i, err := range errs {
|
||||
ctx, span := StartTraceDataReceiveOp(
|
||||
parentCtx,
|
||||
receiver,
|
||||
transport,
|
||||
legacyName)
|
||||
ctx, span := StartTraceDataReceiveOp(receiverCtx, receiver, transport)
|
||||
assert.NotNil(t, ctx)
|
||||
assert.NotNil(t, span)
|
||||
|
||||
|
|
@ -167,15 +164,12 @@ func Test_obsreport_ReceiveMetricsOp(t *testing.T) {
|
|||
t.Name(), trace.WithSampler(trace.AlwaysSample()))
|
||||
defer parentSpan.End()
|
||||
|
||||
receiverCtx := ReceiverContext(parentCtx, receiver, transport, legacyName)
|
||||
errs := []error{errFake, nil}
|
||||
rcvdMetricPts := []int{23, 29}
|
||||
rcvdTimeSeries := []int{2, 3}
|
||||
for i, err := range errs {
|
||||
ctx, span := StartMetricsReceiveOp(
|
||||
parentCtx,
|
||||
receiver,
|
||||
transport,
|
||||
legacyName)
|
||||
ctx, span := StartMetricsReceiveOp(receiverCtx, receiver, transport)
|
||||
assert.NotNil(t, ctx)
|
||||
assert.NotNil(t, span)
|
||||
|
||||
|
|
@ -240,10 +234,11 @@ func Test_obsreport_ExportTraceDataOp(t *testing.T) {
|
|||
// receiver tags, adding that to parent span.
|
||||
parentCtx = observability.ContextWithReceiverName(parentCtx, receiver)
|
||||
|
||||
exporterCtx := ExporterContext(parentCtx, exporter)
|
||||
errs := []error{nil, errFake}
|
||||
numExportedSpans := []int{22, 14}
|
||||
for i, err := range errs {
|
||||
ctx, span := StartTraceDataExportOp(parentCtx, exporter)
|
||||
ctx, span := StartTraceDataExportOp(exporterCtx, exporter)
|
||||
assert.NotNil(t, ctx)
|
||||
assert.NotNil(t, span)
|
||||
|
||||
|
|
@ -304,11 +299,12 @@ func Test_obsreport_ExportMetricsOp(t *testing.T) {
|
|||
// receiver tags, adding that to parent span.
|
||||
parentCtx = observability.ContextWithReceiverName(parentCtx, receiver)
|
||||
|
||||
exporterCtx := ExporterContext(parentCtx, exporter)
|
||||
errs := []error{nil, errFake}
|
||||
toSendMetricPts := []int{17, 23}
|
||||
toSendTimeSeries := []int{3, 5}
|
||||
for i, err := range errs {
|
||||
ctx, span := StartMetricsExportOp(parentCtx, exporter)
|
||||
ctx, span := StartMetricsExportOp(exporterCtx, exporter)
|
||||
assert.NotNil(t, ctx)
|
||||
assert.NotNil(t, span)
|
||||
|
||||
|
|
|
|||
|
|
@ -25,7 +25,7 @@ import (
|
|||
"go.uber.org/zap"
|
||||
|
||||
"github.com/open-telemetry/opentelemetry-collector/internal/collector/telemetry"
|
||||
"github.com/open-telemetry/opentelemetry-collector/observability"
|
||||
"github.com/open-telemetry/opentelemetry-collector/obsreport"
|
||||
"github.com/open-telemetry/opentelemetry-collector/processor"
|
||||
"github.com/open-telemetry/opentelemetry-collector/processor/batchprocessor"
|
||||
"github.com/open-telemetry/opentelemetry-collector/processor/queuedprocessor"
|
||||
|
|
@ -46,6 +46,9 @@ var (
|
|||
metricsLevelPtr *string
|
||||
metricsPortPtr *uint
|
||||
metricsPrefixPtr *string
|
||||
|
||||
useLegacyMetricsPtr *bool
|
||||
useNewMetricsPtr *bool
|
||||
)
|
||||
|
||||
type appTelemetry struct {
|
||||
|
|
@ -69,6 +72,18 @@ func telemetryFlags(flags *flag.FlagSet) {
|
|||
metricsPrefixCfg,
|
||||
"otelcol",
|
||||
"Prefix to the metrics generated by the collector.")
|
||||
|
||||
useLegacyMetricsPtr = flags.Bool(
|
||||
"legacy-metrics",
|
||||
true,
|
||||
"Flag to control usage of legacy metrics",
|
||||
)
|
||||
|
||||
useNewMetricsPtr = flags.Bool(
|
||||
"new-metrics",
|
||||
false,
|
||||
"Flag to control usage of new metrics",
|
||||
)
|
||||
}
|
||||
|
||||
func (tel *appTelemetry) init(asyncErrorChannel chan<- error, ballastSizeBytes uint64, logger *zap.Logger) error {
|
||||
|
|
@ -84,9 +99,9 @@ func (tel *appTelemetry) init(asyncErrorChannel chan<- error, ballastSizeBytes u
|
|||
port := int(*metricsPortPtr)
|
||||
|
||||
views := processor.MetricViews(level)
|
||||
views = append(views, obsreport.Configure(*useLegacyMetricsPtr, *useNewMetricsPtr)...)
|
||||
views = append(views, queuedprocessor.MetricViews(level)...)
|
||||
views = append(views, batchprocessor.MetricViews(level)...)
|
||||
views = append(views, observability.AllViews...)
|
||||
views = append(views, tailsamplingprocessor.SamplingProcessorMetricViews(level)...)
|
||||
processMetricsViews := telemetry.NewProcessMetricsViews(ballastSizeBytes)
|
||||
views = append(views, processMetricsViews.Views()...)
|
||||
|
|
|
|||
Loading…
Reference in New Issue