Remove usage and the global funcs in obsreport (#2161)

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
This commit is contained in:
Bogdan Drutu 2020-11-16 15:44:09 -05:00 committed by GitHub
parent 9bf78853b2
commit 8e8cb7078a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 75 additions and 113 deletions

View File

@ -117,23 +117,23 @@ func ProcessorMetricViews(configType string, legacyViews []*view.View) []*view.V
var gProcessorObsReport = &ProcessorObsReport{level: configtelemetry.LevelNone} var gProcessorObsReport = &ProcessorObsReport{level: configtelemetry.LevelNone}
type ProcessorObsReport struct { type ProcessorObsReport struct {
level configtelemetry.Level level configtelemetry.Level
mutators []tag.Mutator
} }
// WrapContext adds the keys used when recording observability metrics to func NewProcessorObsReport(level configtelemetry.Level, processorName string) *ProcessorObsReport {
// the given context returning the newly created context. This context should return &ProcessorObsReport{
// be used in related calls to the obsreport functions so metrics are properly level: level,
// recorded. mutators: []tag.Mutator{tag.Upsert(tagKeyProcessor, processorName, tag.WithTTL(tag.TTLNoPropagation))},
func (*ProcessorObsReport) WrapContext(ctx context.Context, processor string) context.Context { }
ctx, _ = tag.New(ctx, tag.Upsert(tagKeyProcessor, processor, tag.WithTTL(tag.TTLNoPropagation)))
return ctx
} }
// TracesAccepted reports that the trace data was accepted. // TracesAccepted reports that the trace data was accepted.
func (por *ProcessorObsReport) TracesAccepted(ctx context.Context, numSpans int) { func (por *ProcessorObsReport) TracesAccepted(ctx context.Context, numSpans int) {
if por.level != configtelemetry.LevelNone { if por.level != configtelemetry.LevelNone {
stats.Record( stats.RecordWithTags(
ctx, ctx,
por.mutators,
mProcessorAcceptedSpans.M(int64(numSpans)), mProcessorAcceptedSpans.M(int64(numSpans)),
mProcessorRefusedSpans.M(0), mProcessorRefusedSpans.M(0),
mProcessorDroppedSpans.M(0), mProcessorDroppedSpans.M(0),
@ -144,8 +144,9 @@ func (por *ProcessorObsReport) TracesAccepted(ctx context.Context, numSpans int)
// TracesRefused reports that the trace data was refused. // TracesRefused reports that the trace data was refused.
func (por *ProcessorObsReport) TracesRefused(ctx context.Context, numSpans int) { func (por *ProcessorObsReport) TracesRefused(ctx context.Context, numSpans int) {
if por.level != configtelemetry.LevelNone { if por.level != configtelemetry.LevelNone {
stats.Record( stats.RecordWithTags(
ctx, ctx,
por.mutators,
mProcessorAcceptedSpans.M(0), mProcessorAcceptedSpans.M(0),
mProcessorRefusedSpans.M(int64(numSpans)), mProcessorRefusedSpans.M(int64(numSpans)),
mProcessorDroppedSpans.M(0), mProcessorDroppedSpans.M(0),
@ -156,8 +157,9 @@ func (por *ProcessorObsReport) TracesRefused(ctx context.Context, numSpans int)
// TracesDropped reports that the trace data was dropped. // TracesDropped reports that the trace data was dropped.
func (por *ProcessorObsReport) TracesDropped(ctx context.Context, numSpans int) { func (por *ProcessorObsReport) TracesDropped(ctx context.Context, numSpans int) {
if por.level != configtelemetry.LevelNone { if por.level != configtelemetry.LevelNone {
stats.Record( stats.RecordWithTags(
ctx, ctx,
por.mutators,
mProcessorAcceptedSpans.M(0), mProcessorAcceptedSpans.M(0),
mProcessorRefusedSpans.M(0), mProcessorRefusedSpans.M(0),
mProcessorDroppedSpans.M(int64(numSpans)), mProcessorDroppedSpans.M(int64(numSpans)),
@ -168,8 +170,9 @@ func (por *ProcessorObsReport) TracesDropped(ctx context.Context, numSpans int)
// MetricsAccepted reports that the metrics were accepted. // MetricsAccepted reports that the metrics were accepted.
func (por *ProcessorObsReport) MetricsAccepted(ctx context.Context, numPoints int) { func (por *ProcessorObsReport) MetricsAccepted(ctx context.Context, numPoints int) {
if por.level != configtelemetry.LevelNone { if por.level != configtelemetry.LevelNone {
stats.Record( stats.RecordWithTags(
ctx, ctx,
por.mutators,
mProcessorAcceptedMetricPoints.M(int64(numPoints)), mProcessorAcceptedMetricPoints.M(int64(numPoints)),
mProcessorRefusedMetricPoints.M(0), mProcessorRefusedMetricPoints.M(0),
mProcessorDroppedMetricPoints.M(0), mProcessorDroppedMetricPoints.M(0),
@ -180,8 +183,9 @@ func (por *ProcessorObsReport) MetricsAccepted(ctx context.Context, numPoints in
// MetricsRefused reports that the metrics were refused. // MetricsRefused reports that the metrics were refused.
func (por *ProcessorObsReport) MetricsRefused(ctx context.Context, numPoints int) { func (por *ProcessorObsReport) MetricsRefused(ctx context.Context, numPoints int) {
if por.level != configtelemetry.LevelNone { if por.level != configtelemetry.LevelNone {
stats.Record( stats.RecordWithTags(
ctx, ctx,
por.mutators,
mProcessorAcceptedMetricPoints.M(0), mProcessorAcceptedMetricPoints.M(0),
mProcessorRefusedMetricPoints.M(int64(numPoints)), mProcessorRefusedMetricPoints.M(int64(numPoints)),
mProcessorDroppedMetricPoints.M(0), mProcessorDroppedMetricPoints.M(0),
@ -192,8 +196,9 @@ func (por *ProcessorObsReport) MetricsRefused(ctx context.Context, numPoints int
// MetricsDropped reports that the metrics were dropped. // MetricsDropped reports that the metrics were dropped.
func (por *ProcessorObsReport) MetricsDropped(ctx context.Context, numPoints int) { func (por *ProcessorObsReport) MetricsDropped(ctx context.Context, numPoints int) {
if por.level != configtelemetry.LevelNone { if por.level != configtelemetry.LevelNone {
stats.Record( stats.RecordWithTags(
ctx, ctx,
por.mutators,
mProcessorAcceptedMetricPoints.M(0), mProcessorAcceptedMetricPoints.M(0),
mProcessorRefusedMetricPoints.M(0), mProcessorRefusedMetricPoints.M(0),
mProcessorDroppedMetricPoints.M(int64(numPoints)), mProcessorDroppedMetricPoints.M(int64(numPoints)),
@ -204,8 +209,9 @@ func (por *ProcessorObsReport) MetricsDropped(ctx context.Context, numPoints int
// LogsAccepted reports that the logs were accepted. // LogsAccepted reports that the logs were accepted.
func (por *ProcessorObsReport) LogsAccepted(ctx context.Context, numRecords int) { func (por *ProcessorObsReport) LogsAccepted(ctx context.Context, numRecords int) {
if por.level != configtelemetry.LevelNone { if por.level != configtelemetry.LevelNone {
stats.Record( stats.RecordWithTags(
ctx, ctx,
por.mutators,
mProcessorAcceptedLogRecords.M(int64(numRecords)), mProcessorAcceptedLogRecords.M(int64(numRecords)),
mProcessorRefusedLogRecords.M(0), mProcessorRefusedLogRecords.M(0),
mProcessorDroppedLogRecords.M(0), mProcessorDroppedLogRecords.M(0),
@ -216,8 +222,9 @@ func (por *ProcessorObsReport) LogsAccepted(ctx context.Context, numRecords int)
// LogsRefused reports that the logs were refused. // LogsRefused reports that the logs were refused.
func (por *ProcessorObsReport) LogsRefused(ctx context.Context, numRecords int) { func (por *ProcessorObsReport) LogsRefused(ctx context.Context, numRecords int) {
if por.level != configtelemetry.LevelNone { if por.level != configtelemetry.LevelNone {
stats.Record( stats.RecordWithTags(
ctx, ctx,
por.mutators,
mProcessorAcceptedLogRecords.M(0), mProcessorAcceptedLogRecords.M(0),
mProcessorRefusedLogRecords.M(int64(numRecords)), mProcessorRefusedLogRecords.M(int64(numRecords)),
mProcessorDroppedMetricPoints.M(0), mProcessorDroppedMetricPoints.M(0),
@ -228,64 +235,12 @@ func (por *ProcessorObsReport) LogsRefused(ctx context.Context, numRecords int)
// LogsDropped reports that the logs were dropped. // LogsDropped reports that the logs were dropped.
func (por *ProcessorObsReport) LogsDropped(ctx context.Context, numRecords int) { func (por *ProcessorObsReport) LogsDropped(ctx context.Context, numRecords int) {
if por.level != configtelemetry.LevelNone { if por.level != configtelemetry.LevelNone {
stats.Record( stats.RecordWithTags(
ctx, ctx,
por.mutators,
mProcessorAcceptedLogRecords.M(0), mProcessorAcceptedLogRecords.M(0),
mProcessorRefusedLogRecords.M(0), mProcessorRefusedLogRecords.M(0),
mProcessorDroppedLogRecords.M(int64(numRecords)), mProcessorDroppedLogRecords.M(int64(numRecords)),
) )
} }
} }
// ProcessorContext adds the keys used when recording observability metrics to
// the given context returning the newly created context. This context should
// be used in related calls to the obsreport functions so metrics are properly
// recorded.
func ProcessorContext(ctx context.Context, processor string) context.Context {
return gProcessorObsReport.WrapContext(ctx, processor)
}
// ProcessorTraceDataAccepted reports that the trace data was accepted.
func ProcessorTraceDataAccepted(ctx context.Context, numSpans int) {
gProcessorObsReport.TracesAccepted(ctx, numSpans)
}
// ProcessorTraceDataRefused reports that the trace data was refused.
func ProcessorTraceDataRefused(ctx context.Context, numSpans int) {
gProcessorObsReport.TracesRefused(ctx, numSpans)
}
// ProcessorTraceDataDropped reports that the trace data was dropped.
func ProcessorTraceDataDropped(ctx context.Context, numSpans int) {
gProcessorObsReport.TracesDropped(ctx, numSpans)
}
// ProcessorMetricsDataAccepted reports that the metrics were accepted.
func ProcessorMetricsDataAccepted(ctx context.Context, numPoints int) {
gProcessorObsReport.MetricsAccepted(ctx, numPoints)
}
// ProcessorMetricsDataRefused reports that the metrics were refused.
func ProcessorMetricsDataRefused(ctx context.Context, numPoints int) {
gProcessorObsReport.MetricsRefused(ctx, numPoints)
}
// ProcessorMetricsDataDropped reports that the metrics were dropped.
func ProcessorMetricsDataDropped(ctx context.Context, numPoints int) {
gProcessorObsReport.MetricsDropped(ctx, numPoints)
}
// ProcessorLogRecordsAccepted reports that the metrics were accepted.
func ProcessorLogRecordsAccepted(ctx context.Context, numRecords int) {
gProcessorObsReport.LogsAccepted(ctx, numRecords)
}
// ProcessorLogRecordsRefused reports that the metrics were refused.
func ProcessorLogRecordsRefused(ctx context.Context, numRecords int) {
gProcessorObsReport.LogsRefused(ctx, numRecords)
}
// ProcessorLogRecordsDropped reports that the metrics were dropped.
func ProcessorLogRecordsDropped(ctx context.Context, numRecords int) {
gProcessorObsReport.LogsDropped(ctx, numRecords)
}

View File

@ -501,11 +501,10 @@ func TestProcessorTraceData(t *testing.T) {
const refusedSpans = 19 const refusedSpans = 19
const droppedSpans = 13 const droppedSpans = 13
processorCtx := obsreport.ProcessorContext(context.Background(), processor) obsrep := obsreport.NewProcessorObsReport(configtelemetry.LevelNormal, processor)
obsrep.TracesAccepted(context.Background(), acceptedSpans)
obsreport.ProcessorTraceDataAccepted(processorCtx, acceptedSpans) obsrep.TracesRefused(context.Background(), refusedSpans)
obsreport.ProcessorTraceDataRefused(processorCtx, refusedSpans) obsrep.TracesDropped(context.Background(), droppedSpans)
obsreport.ProcessorTraceDataDropped(processorCtx, droppedSpans)
obsreporttest.CheckProcessorTracesViews(t, processor, acceptedSpans, refusedSpans, droppedSpans) obsreporttest.CheckProcessorTracesViews(t, processor, acceptedSpans, refusedSpans, droppedSpans)
} }
@ -519,10 +518,10 @@ func TestProcessorMetricsData(t *testing.T) {
const refusedPoints = 11 const refusedPoints = 11
const droppedPoints = 17 const droppedPoints = 17
processorCtx := obsreport.ProcessorContext(context.Background(), processor) obsrep := obsreport.NewProcessorObsReport(configtelemetry.LevelNormal, processor)
obsreport.ProcessorMetricsDataAccepted(processorCtx, acceptedPoints) obsrep.MetricsAccepted(context.Background(), acceptedPoints)
obsreport.ProcessorMetricsDataRefused(processorCtx, refusedPoints) obsrep.MetricsRefused(context.Background(), refusedPoints)
obsreport.ProcessorMetricsDataDropped(processorCtx, droppedPoints) obsrep.MetricsDropped(context.Background(), droppedPoints)
obsreporttest.CheckProcessorMetricsViews(t, processor, acceptedPoints, refusedPoints, droppedPoints) obsreporttest.CheckProcessorMetricsViews(t, processor, acceptedPoints, refusedPoints, droppedPoints)
} }
@ -590,10 +589,10 @@ func TestProcessorLogRecords(t *testing.T) {
const refusedRecords = 11 const refusedRecords = 11
const droppedRecords = 17 const droppedRecords = 17
processorCtx := obsreport.ProcessorContext(context.Background(), processor) obsrep := obsreport.NewProcessorObsReport(configtelemetry.LevelNormal, processor)
obsreport.ProcessorLogRecordsAccepted(processorCtx, acceptedRecords) obsrep.LogsAccepted(context.Background(), acceptedRecords)
obsreport.ProcessorLogRecordsRefused(processorCtx, refusedRecords) obsrep.LogsRefused(context.Background(), refusedRecords)
obsreport.ProcessorLogRecordsDropped(processorCtx, droppedRecords) obsrep.LogsDropped(context.Background(), droppedRecords)
obsreporttest.CheckProcessorLogsViews(t, processor, acceptedRecords, refusedRecords, droppedRecords) obsreporttest.CheckProcessorLogsViews(t, processor, acceptedRecords, refusedRecords, droppedRecords)
} }

View File

@ -25,6 +25,7 @@ import (
"go.opencensus.io/stats" "go.opencensus.io/stats"
"go.uber.org/zap" "go.uber.org/zap"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/obsreport" "go.opentelemetry.io/collector/obsreport"
"go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/processor"
@ -78,6 +79,8 @@ type memoryLimiter struct {
procName string procName string
logger *zap.Logger logger *zap.Logger
configMismatchedLogged bool configMismatchedLogged bool
obsrep *obsreport.ProcessorObsReport
} }
// newMemoryLimiter returns a new memorylimiter processor. // newMemoryLimiter returns a new memorylimiter processor.
@ -109,6 +112,7 @@ func newMemoryLimiter(logger *zap.Logger, cfg *Config) (*memoryLimiter, error) {
readMemStatsFn: runtime.ReadMemStats, readMemStatsFn: runtime.ReadMemStats,
procName: cfg.Name(), procName: cfg.Name(),
logger: logger, logger: logger,
obsrep: obsreport.NewProcessorObsReport(configtelemetry.GetMetricsLevelFlagValue(), cfg.Name()),
} }
ml.startMonitoring() ml.startMonitoring()
@ -152,14 +156,14 @@ func (ml *memoryLimiter) ProcessTraces(ctx context.Context, td pdata.Traces) (pd
// to a receiver (ie.: a receiver is on the call stack). For now it // to a receiver (ie.: a receiver is on the call stack). For now it
// assumes that the pipeline is properly configured and a receiver is on the // assumes that the pipeline is properly configured and a receiver is on the
// callstack. // callstack.
obsreport.ProcessorTraceDataRefused(ctx, numSpans) ml.obsrep.TracesRefused(ctx, numSpans)
return td, errForcedDrop return td, errForcedDrop
} }
// Even if the next consumer returns error record the data as accepted by // Even if the next consumer returns error record the data as accepted by
// this processor. // this processor.
obsreport.ProcessorTraceDataAccepted(ctx, numSpans) ml.obsrep.TracesAccepted(ctx, numSpans)
return td, nil return td, nil
} }
@ -172,14 +176,14 @@ func (ml *memoryLimiter) ProcessMetrics(ctx context.Context, md pdata.Metrics) (
// to a receiver (ie.: a receiver is on the call stack). For now it // to a receiver (ie.: a receiver is on the call stack). For now it
// assumes that the pipeline is properly configured and a receiver is on the // assumes that the pipeline is properly configured and a receiver is on the
// callstack. // callstack.
obsreport.ProcessorMetricsDataRefused(ctx, numDataPoints) ml.obsrep.MetricsRefused(ctx, numDataPoints)
return md, errForcedDrop return md, errForcedDrop
} }
// Even if the next consumer returns error record the data as accepted by // Even if the next consumer returns error record the data as accepted by
// this processor. // this processor.
obsreport.ProcessorMetricsDataAccepted(ctx, numDataPoints) ml.obsrep.MetricsAccepted(ctx, numDataPoints)
return md, nil return md, nil
} }
@ -192,14 +196,14 @@ func (ml *memoryLimiter) ProcessLogs(ctx context.Context, ld pdata.Logs) (pdata.
// to a receiver (ie.: a receiver is on the call stack). For now it // to a receiver (ie.: a receiver is on the call stack). For now it
// assumes that the pipeline is properly configured and a receiver is on the // assumes that the pipeline is properly configured and a receiver is on the
// callstack. // callstack.
obsreport.ProcessorLogRecordsRefused(ctx, numRecords) ml.obsrep.LogsRefused(ctx, numRecords)
return ld, errForcedDrop return ld, errForcedDrop
} }
// Even if the next consumer returns error record the data as accepted by // Even if the next consumer returns error record the data as accepted by
// this processor. // this processor.
obsreport.ProcessorLogRecordsAccepted(ctx, numRecords) ml.obsrep.LogsAccepted(ctx, numRecords)
return ld, nil return ld, nil
} }

View File

@ -25,9 +25,11 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
"go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/obsreport"
"go.opentelemetry.io/collector/processor/memorylimiter/internal/iruntime" "go.opentelemetry.io/collector/processor/memorylimiter/internal/iruntime"
"go.opentelemetry.io/collector/processor/processorhelper" "go.opentelemetry.io/collector/processor/processorhelper"
) )
@ -108,6 +110,7 @@ func TestMetricsMemoryPressureResponse(t *testing.T) {
readMemStatsFn: func(ms *runtime.MemStats) { readMemStatsFn: func(ms *runtime.MemStats) {
ms.Alloc = currentMemAlloc ms.Alloc = currentMemAlloc
}, },
obsrep: obsreport.NewProcessorObsReport(configtelemetry.LevelNone, ""),
} }
mp, err := processorhelper.NewMetricsProcessor( mp, err := processorhelper.NewMetricsProcessor(
&Config{ &Config{
@ -177,6 +180,7 @@ func TestTraceMemoryPressureResponse(t *testing.T) {
readMemStatsFn: func(ms *runtime.MemStats) { readMemStatsFn: func(ms *runtime.MemStats) {
ms.Alloc = currentMemAlloc ms.Alloc = currentMemAlloc
}, },
obsrep: obsreport.NewProcessorObsReport(configtelemetry.LevelNone, ""),
} }
tp, err := processorhelper.NewTraceProcessor( tp, err := processorhelper.NewTraceProcessor(
&Config{ &Config{
@ -246,6 +250,7 @@ func TestLogMemoryPressureResponse(t *testing.T) {
readMemStatsFn: func(ms *runtime.MemStats) { readMemStatsFn: func(ms *runtime.MemStats) {
ms.Alloc = currentMemAlloc ms.Alloc = currentMemAlloc
}, },
obsrep: obsreport.NewProcessorObsReport(configtelemetry.LevelNone, ""),
} }
lp, err := processorhelper.NewLogsProcessor( lp, err := processorhelper.NewLogsProcessor(
&Config{ &Config{

View File

@ -23,7 +23,6 @@ import (
"go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/obsreport"
) )
// ErrSkipProcessingData is a sentinel value to indicate when traces or metrics should intentionally be dropped // ErrSkipProcessingData is a sentinel value to indicate when traces or metrics should intentionally be dropped
@ -134,9 +133,8 @@ type tracesProcessor struct {
} }
func (mp *tracesProcessor) ConsumeTraces(ctx context.Context, td pdata.Traces) error { func (mp *tracesProcessor) ConsumeTraces(ctx context.Context, td pdata.Traces) error {
processorCtx := obsreport.ProcessorContext(ctx, mp.fullName)
var err error var err error
td, err = mp.processor.ProcessTraces(processorCtx, td) td, err = mp.processor.ProcessTraces(ctx, td)
if err != nil { if err != nil {
return err return err
} }
@ -173,9 +171,8 @@ type metricsProcessor struct {
} }
func (mp *metricsProcessor) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error { func (mp *metricsProcessor) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error {
processorCtx := obsreport.ProcessorContext(ctx, mp.fullName)
var err error var err error
md, err = mp.processor.ProcessMetrics(processorCtx, md) md, err = mp.processor.ProcessMetrics(ctx, md)
if err != nil { if err != nil {
if err == ErrSkipProcessingData { if err == ErrSkipProcessingData {
return nil return nil
@ -215,9 +212,8 @@ type logProcessor struct {
} }
func (lp *logProcessor) ConsumeLogs(ctx context.Context, ld pdata.Logs) error { func (lp *logProcessor) ConsumeLogs(ctx context.Context, ld pdata.Logs) error {
processorCtx := obsreport.ProcessorContext(ctx, lp.fullName)
var err error var err error
ld, err = lp.processor.ProcessLogs(processorCtx, ld) ld, err = lp.processor.ProcessLogs(ctx, ld)
if err != nil { if err != nil {
return err return err
} }

View File

@ -28,6 +28,7 @@ import (
"go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenterror" "go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/consumer/pdata"
@ -46,6 +47,7 @@ type queuedProcessor struct {
backoffDelay time.Duration backoffDelay time.Duration
stopCh chan struct{} stopCh chan struct{}
stopOnce sync.Once stopOnce sync.Once
obsrep *obsreport.ProcessorObsReport
} }
var _ consumer.TracesConsumer = (*queuedProcessor)(nil) var _ consumer.TracesConsumer = (*queuedProcessor)(nil)
@ -63,8 +65,9 @@ type queueItem interface {
} }
type baseQueueItem struct { type baseQueueItem struct {
ctx context.Context ctx context.Context
qt time.Time qt time.Time
obsrep *obsreport.ProcessorObsReport
} }
func (item *baseQueueItem) context() context.Context { func (item *baseQueueItem) context() context.Context {
@ -81,9 +84,9 @@ type traceQueueItem struct {
spanCountStats *processor.SpanCountStats spanCountStats *processor.SpanCountStats
} }
func newTraceQueueItem(ctx context.Context, td pdata.Traces) queueItem { func newTraceQueueItem(ctx context.Context, td pdata.Traces, obsrep *obsreport.ProcessorObsReport) queueItem {
return &traceQueueItem{ return &traceQueueItem{
baseQueueItem: baseQueueItem{ctx: ctx, qt: time.Now()}, baseQueueItem: baseQueueItem{ctx: ctx, qt: time.Now(), obsrep: obsrep},
td: td, td: td,
spanCountStats: processor.NewSpanCountStats(td), spanCountStats: processor.NewSpanCountStats(td),
} }
@ -91,18 +94,18 @@ func newTraceQueueItem(ctx context.Context, td pdata.Traces) queueItem {
func (item *traceQueueItem) onAccepted() { func (item *traceQueueItem) onAccepted() {
processor.RecordsSpanCountMetrics(item.ctx, item.spanCountStats, processor.StatReceivedSpanCount) processor.RecordsSpanCountMetrics(item.ctx, item.spanCountStats, processor.StatReceivedSpanCount)
obsreport.ProcessorTraceDataAccepted(item.ctx, item.spanCountStats.GetAllSpansCount()) item.obsrep.TracesAccepted(item.ctx, item.spanCountStats.GetAllSpansCount())
} }
func (item *traceQueueItem) onPartialError(partialErr consumererror.PartialError) queueItem { func (item *traceQueueItem) onPartialError(partialErr consumererror.PartialError) queueItem {
return newTraceQueueItem(item.ctx, partialErr.GetTraces()) return newTraceQueueItem(item.ctx, partialErr.GetTraces(), item.obsrep)
} }
func (item *traceQueueItem) onRefused(logger *zap.Logger, err error) { func (item *traceQueueItem) onRefused(logger *zap.Logger, err error) {
// Count the StatReceivedSpanCount even if items were refused. // Count the StatReceivedSpanCount even if items were refused.
processor.RecordsSpanCountMetrics(item.ctx, item.spanCountStats, processor.StatReceivedSpanCount) processor.RecordsSpanCountMetrics(item.ctx, item.spanCountStats, processor.StatReceivedSpanCount)
obsreport.ProcessorTraceDataRefused(item.ctx, item.spanCountStats.GetAllSpansCount()) item.obsrep.TracesRefused(item.ctx, item.spanCountStats.GetAllSpansCount())
// TODO: in principle this may not end in data loss because this can be // TODO: in principle this may not end in data loss because this can be
// in the same call stack as the receiver, ie.: the call from the receiver // in the same call stack as the receiver, ie.: the call from the receiver
@ -115,7 +118,7 @@ func (item *traceQueueItem) onRefused(logger *zap.Logger, err error) {
} }
func (item *traceQueueItem) onDropped(logger *zap.Logger, err error) { func (item *traceQueueItem) onDropped(logger *zap.Logger, err error) {
obsreport.ProcessorTraceDataDropped(item.ctx, item.spanCountStats.GetAllSpansCount()) item.obsrep.TracesDropped(item.ctx, item.spanCountStats.GetAllSpansCount())
stats.Record(item.ctx, processor.StatTraceBatchesDroppedCount.M(int64(1))) stats.Record(item.ctx, processor.StatTraceBatchesDroppedCount.M(int64(1)))
processor.RecordsSpanCountMetrics(item.ctx, item.spanCountStats, processor.StatDroppedSpanCount) processor.RecordsSpanCountMetrics(item.ctx, item.spanCountStats, processor.StatDroppedSpanCount)
@ -132,17 +135,17 @@ type metricsQueueItem struct {
numPoints int numPoints int
} }
func newMetricsQueueItem(ctx context.Context, md pdata.Metrics) queueItem { func newMetricsQueueItem(ctx context.Context, md pdata.Metrics, obsrep *obsreport.ProcessorObsReport) queueItem {
_, numPoints := md.MetricAndDataPointCount() _, numPoints := md.MetricAndDataPointCount()
return &metricsQueueItem{ return &metricsQueueItem{
baseQueueItem: baseQueueItem{ctx: ctx, qt: time.Now()}, baseQueueItem: baseQueueItem{ctx: ctx, qt: time.Now(), obsrep: obsrep},
md: md, md: md,
numPoints: numPoints, numPoints: numPoints,
} }
} }
func (item *metricsQueueItem) onAccepted() { func (item *metricsQueueItem) onAccepted() {
obsreport.ProcessorMetricsDataAccepted(item.ctx, item.numPoints) item.obsrep.MetricsAccepted(item.ctx, item.numPoints)
} }
func (item *metricsQueueItem) onPartialError(consumererror.PartialError) queueItem { func (item *metricsQueueItem) onPartialError(consumererror.PartialError) queueItem {
@ -151,14 +154,14 @@ func (item *metricsQueueItem) onPartialError(consumererror.PartialError) queueIt
} }
func (item *metricsQueueItem) onRefused(logger *zap.Logger, err error) { func (item *metricsQueueItem) onRefused(logger *zap.Logger, err error) {
obsreport.ProcessorMetricsDataRefused(item.ctx, item.numPoints) item.obsrep.MetricsRefused(item.ctx, item.numPoints)
logger.Error("Failed to process batch, refused", zap.Int("#points", item.numPoints), zap.Error(err)) logger.Error("Failed to process batch, refused", zap.Int("#points", item.numPoints), zap.Error(err))
} }
func (item *metricsQueueItem) onDropped(logger *zap.Logger, err error) { func (item *metricsQueueItem) onDropped(logger *zap.Logger, err error) {
stats.Record(item.ctx, processor.StatTraceBatchesDroppedCount.M(int64(1))) stats.Record(item.ctx, processor.StatTraceBatchesDroppedCount.M(int64(1)))
obsreport.ProcessorMetricsDataDropped(item.ctx, item.numPoints) item.obsrep.MetricsDropped(item.ctx, item.numPoints)
logger.Error("Failed to process batch, discarding", zap.Int("#points", item.numPoints), zap.Error(err)) logger.Error("Failed to process batch, discarding", zap.Int("#points", item.numPoints), zap.Error(err))
} }
@ -182,6 +185,7 @@ func newQueuedTracesProcessor(
retryOnProcessingFailure: cfg.RetryOnFailure, retryOnProcessingFailure: cfg.RetryOnFailure,
backoffDelay: cfg.BackoffDelay, backoffDelay: cfg.BackoffDelay,
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
obsrep: obsreport.NewProcessorObsReport(configtelemetry.GetMetricsLevelFlagValue(), cfg.Name()),
} }
} }
@ -200,6 +204,7 @@ func newQueuedMetricsProcessor(
retryOnProcessingFailure: cfg.RetryOnFailure, retryOnProcessingFailure: cfg.RetryOnFailure,
backoffDelay: cfg.BackoffDelay, backoffDelay: cfg.BackoffDelay,
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
obsrep: obsreport.NewProcessorObsReport(configtelemetry.GetMetricsLevelFlagValue(), cfg.Name()),
} }
} }
@ -240,8 +245,7 @@ func (sp *queuedProcessor) Start(ctx context.Context, _ component.Host) error {
// ConsumeTraces implements the TracesProcessor interface // ConsumeTraces implements the TracesProcessor interface
func (sp *queuedProcessor) ConsumeTraces(ctx context.Context, td pdata.Traces) error { func (sp *queuedProcessor) ConsumeTraces(ctx context.Context, td pdata.Traces) error {
ctx = obsreport.ProcessorContext(ctx, sp.name) item := newTraceQueueItem(ctx, td, sp.obsrep)
item := newTraceQueueItem(ctx, td)
addedToQueue := sp.queue.Produce(item) addedToQueue := sp.queue.Produce(item)
if !addedToQueue { if !addedToQueue {
@ -255,8 +259,7 @@ func (sp *queuedProcessor) ConsumeTraces(ctx context.Context, td pdata.Traces) e
// ConsumeMetrics implements the MetricsProcessor interface // ConsumeMetrics implements the MetricsProcessor interface
func (sp *queuedProcessor) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error { func (sp *queuedProcessor) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error {
ctx = obsreport.ProcessorContext(ctx, sp.name) item := newMetricsQueueItem(ctx, md, sp.obsrep)
item := newMetricsQueueItem(ctx, md)
addedToQueue := sp.queue.Produce(item) addedToQueue := sp.queue.Produce(item)
if !addedToQueue { if !addedToQueue {