Use Func pattern in processorhelper, consistent with others (#3570)

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
This commit is contained in:
Bogdan Drutu 2021-07-13 11:35:47 -07:00 committed by GitHub
parent 56c7382b7b
commit 28091a2384
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 62 additions and 94 deletions

View File

@ -39,8 +39,7 @@ func newLogAttributesProcessor(attrProc *processorhelper.AttrProc, include, excl
}
}
// ProcessLogs implements the LogsProcessor
func (a *logAttributesProcessor) ProcessLogs(_ context.Context, ld pdata.Logs) (pdata.Logs, error) {
func (a *logAttributesProcessor) processLogs(_ context.Context, ld pdata.Logs) (pdata.Logs, error) {
rls := ld.ResourceLogs()
for i := 0; i < rls.Len(); i++ {
rs := rls.At(i)

View File

@ -39,8 +39,7 @@ func newSpanAttributesProcessor(attrProc *processorhelper.AttrProc, include, exc
}
}
// ProcessTraces implements the TProcessor
func (a *spanAttributesProcessor) ProcessTraces(_ context.Context, td pdata.Traces) (pdata.Traces, error) {
func (a *spanAttributesProcessor) processTraces(_ context.Context, td pdata.Traces) (pdata.Traces, error) {
rss := td.ResourceSpans()
for i := 0; i < rss.Len(); i++ {
rs := rss.At(i)

View File

@ -75,7 +75,7 @@ func createTracesProcessor(
return processorhelper.NewTracesProcessor(
cfg,
nextConsumer,
newSpanAttributesProcessor(attrProc, include, exclude),
newSpanAttributesProcessor(attrProc, include, exclude).processTraces,
processorhelper.WithCapabilities(processorCapabilities))
}
@ -105,6 +105,6 @@ func createLogProcessor(
return processorhelper.NewLogsProcessor(
cfg,
nextConsumer,
newLogAttributesProcessor(attrProc, include, exclude),
newLogAttributesProcessor(attrProc, include, exclude).processLogs,
processorhelper.WithCapabilities(processorCapabilities))
}

View File

@ -57,6 +57,6 @@ func createMetricsProcessor(
return processorhelper.NewMetricsProcessor(
cfg,
nextConsumer,
fp,
fp.processMetrics,
processorhelper.WithCapabilities(processorCapabilities))
}

View File

@ -113,8 +113,8 @@ func createMatcher(mp *filtermetric.MatchProperties) (filtermetric.Matcher, filt
return nameMatcher, attributeMatcher, err
}
// ProcessMetrics filters the given metrics based off the filterMetricProcessor's filters.
func (fmp *filterMetricProcessor) ProcessMetrics(_ context.Context, pdm pdata.Metrics) (pdata.Metrics, error) {
// processMetrics filters the given metrics based off the filterMetricProcessor's filters.
func (fmp *filterMetricProcessor) processMetrics(_ context.Context, pdm pdata.Metrics) (pdata.Metrics, error) {
pdm.ResourceMetrics().RemoveIf(func(rm pdata.ResourceMetrics) bool {
keepMetricsForResource := fmp.shouldKeepMetricsForResource(rm.Resource())
if !keepMetricsForResource {

View File

@ -61,7 +61,7 @@ func createTracesProcessor(
return processorhelper.NewTracesProcessor(
cfg,
nextConsumer,
ml,
ml.processTraces,
processorhelper.WithCapabilities(processorCapabilities),
processorhelper.WithShutdown(ml.shutdown))
}
@ -79,7 +79,7 @@ func createMetricsProcessor(
return processorhelper.NewMetricsProcessor(
cfg,
nextConsumer,
ml,
ml.processMetrics,
processorhelper.WithCapabilities(processorCapabilities),
processorhelper.WithShutdown(ml.shutdown))
}
@ -97,7 +97,7 @@ func createLogsProcessor(
return processorhelper.NewLogsProcessor(
cfg,
nextConsumer,
ml,
ml.processLogs,
processorhelper.WithCapabilities(processorCapabilities),
processorhelper.WithShutdown(ml.shutdown))
}

View File

@ -147,8 +147,7 @@ func (ml *memoryLimiter) shutdown(context.Context) error {
return nil
}
// ProcessTraces implements the TProcessor interface
func (ml *memoryLimiter) ProcessTraces(ctx context.Context, td pdata.Traces) (pdata.Traces, error) {
func (ml *memoryLimiter) processTraces(ctx context.Context, td pdata.Traces) (pdata.Traces, error) {
numSpans := td.SpanCount()
if ml.forcingDrop() {
// TODO: actually to be 100% sure that this is "refused" and not "dropped"
@ -167,8 +166,7 @@ func (ml *memoryLimiter) ProcessTraces(ctx context.Context, td pdata.Traces) (pd
return td, nil
}
// ProcessMetrics implements the MProcessor interface
func (ml *memoryLimiter) ProcessMetrics(ctx context.Context, md pdata.Metrics) (pdata.Metrics, error) {
func (ml *memoryLimiter) processMetrics(ctx context.Context, md pdata.Metrics) (pdata.Metrics, error) {
numDataPoints := md.DataPointCount()
if ml.forcingDrop() {
// TODO: actually to be 100% sure that this is "refused" and not "dropped"
@ -186,8 +184,7 @@ func (ml *memoryLimiter) ProcessMetrics(ctx context.Context, md pdata.Metrics) (
return md, nil
}
// ProcessLogs implements the LProcessor interface
func (ml *memoryLimiter) ProcessLogs(ctx context.Context, ld pdata.Logs) (pdata.Logs, error) {
func (ml *memoryLimiter) processLogs(ctx context.Context, ld pdata.Logs) (pdata.Logs, error) {
numRecords := ld.LogRecordCount()
if ml.forcingDrop() {
// TODO: actually to be 100% sure that this is "refused" and not "dropped"

View File

@ -122,7 +122,7 @@ func TestMetricsMemoryPressureResponse(t *testing.T) {
ProcessorSettings: config.NewProcessorSettings(config.NewID(typeStr)),
},
consumertest.NewNop(),
ml,
ml.processMetrics,
processorhelper.WithCapabilities(processorCapabilities),
processorhelper.WithShutdown(ml.shutdown))
require.NoError(t, err)
@ -193,7 +193,7 @@ func TestTraceMemoryPressureResponse(t *testing.T) {
ProcessorSettings: config.NewProcessorSettings(config.NewID(typeStr)),
},
consumertest.NewNop(),
ml,
ml.processTraces,
processorhelper.WithCapabilities(processorCapabilities),
processorhelper.WithShutdown(ml.shutdown))
require.NoError(t, err)
@ -264,7 +264,7 @@ func TestLogMemoryPressureResponse(t *testing.T) {
ProcessorSettings: config.NewProcessorSettings(config.NewID(typeStr)),
},
consumertest.NewNop(),
ml,
ml.processLogs,
processorhelper.WithCapabilities(processorCapabilities),
processorhelper.WithShutdown(ml.shutdown))
require.NoError(t, err)

View File

@ -65,11 +65,11 @@ func newTracesProcessor(nextConsumer consumer.Traces, cfg *Config) (component.Tr
return processorhelper.NewTracesProcessor(
cfg,
nextConsumer,
tsp,
tsp.processTraces,
processorhelper.WithCapabilities(consumer.Capabilities{MutatesData: true}))
}
func (tsp *tracesamplerprocessor) ProcessTraces(_ context.Context, td pdata.Traces) (pdata.Traces, error) {
func (tsp *tracesamplerprocessor) processTraces(_ context.Context, td pdata.Traces) (pdata.Traces, error) {
td.ResourceSpans().RemoveIf(func(rs pdata.ResourceSpans) bool {
rs.InstrumentationLibrarySpans().RemoveIf(func(ils pdata.InstrumentationLibrarySpans) bool {
ils.Spans().RemoveIf(func(s pdata.Span) bool {

View File

@ -29,12 +29,9 @@ import (
"go.opentelemetry.io/collector/model/pdata"
)
// LProcessor is a helper interface that allows avoiding implementing all functions in LogsProcessor by using NewLogsProcessor.
type LProcessor interface {
// ProcessLogs is a helper function that processes the incoming data and returns the data to be sent to the next component.
// If error is returned then returned data are ignored. It MUST not call the next component.
ProcessLogs(context.Context, pdata.Logs) (pdata.Logs, error)
}
// ProcessLogsFunc is a helper function that processes the incoming data and returns the data to be sent to the next component.
// If error is returned then returned data are ignored. It MUST not call the next component.
type ProcessLogsFunc func(context.Context, pdata.Logs) (pdata.Logs, error)
type logProcessor struct {
component.Component
@ -46,11 +43,11 @@ type logProcessor struct {
func NewLogsProcessor(
cfg config.Processor,
nextConsumer consumer.Logs,
processor LProcessor,
logsFunc ProcessLogsFunc,
options ...Option,
) (component.LogsProcessor, error) {
if processor == nil {
return nil, errors.New("nil processor")
if logsFunc == nil {
return nil, errors.New("nil logsFunc")
}
if nextConsumer == nil {
@ -63,7 +60,7 @@ func NewLogsProcessor(
span := trace.SpanFromContext(ctx)
span.AddEvent("Start processing.", eventOptions)
var err error
ld, err = processor.ProcessLogs(ctx, ld)
ld, err = logsFunc(ctx, ld)
span.AddEvent("End processing.", eventOptions)
if err != nil {
if errors.Is(err, ErrSkipProcessingData) {

View File

@ -77,14 +77,8 @@ func TestNewLogsProcessor_ProcessLogsErrSkipProcessingData(t *testing.T) {
assert.Equal(t, nil, lp.ConsumeLogs(context.Background(), pdata.NewLogs()))
}
type testLProcessor struct {
retError error
}
func newTestLProcessor(retError error) LProcessor {
return &testLProcessor{retError: retError}
}
func (tlp *testLProcessor) ProcessLogs(_ context.Context, ld pdata.Logs) (pdata.Logs, error) {
return ld, tlp.retError
func newTestLProcessor(retError error) ProcessLogsFunc {
return func(_ context.Context, ld pdata.Logs) (pdata.Logs, error) {
return ld, retError
}
}

View File

@ -29,12 +29,9 @@ import (
"go.opentelemetry.io/collector/model/pdata"
)
// MProcessor is a helper interface that allows avoiding implementing all functions in MetricsProcessor by using NewTracesProcessor.
type MProcessor interface {
// ProcessMetrics is a helper function that processes the incoming data and returns the data to be sent to the next component.
// If error is returned then returned data are ignored. It MUST not call the next component.
ProcessMetrics(context.Context, pdata.Metrics) (pdata.Metrics, error)
}
// ProcessMetricsFunc is a helper function that processes the incoming data and returns the data to be sent to the next component.
// If error is returned then returned data are ignored. It MUST not call the next component.
type ProcessMetricsFunc func(context.Context, pdata.Metrics) (pdata.Metrics, error)
type metricsProcessor struct {
component.Component
@ -46,11 +43,11 @@ type metricsProcessor struct {
func NewMetricsProcessor(
cfg config.Processor,
nextConsumer consumer.Metrics,
processor MProcessor,
metricsFunc ProcessMetricsFunc,
options ...Option,
) (component.MetricsProcessor, error) {
if processor == nil {
return nil, errors.New("nil processor")
if metricsFunc == nil {
return nil, errors.New("nil metricsFunc")
}
if nextConsumer == nil {
@ -63,7 +60,7 @@ func NewMetricsProcessor(
span := trace.SpanFromContext(ctx)
span.AddEvent("Start processing.", eventOptions)
var err error
md, err = processor.ProcessMetrics(ctx, md)
md, err = metricsFunc(ctx, md)
span.AddEvent("End processing.", eventOptions)
if err != nil {
if errors.Is(err, ErrSkipProcessingData) {

View File

@ -77,14 +77,8 @@ func TestNewMetricsProcessor_ProcessMetricsErrSkipProcessingData(t *testing.T) {
assert.Equal(t, nil, mp.ConsumeMetrics(context.Background(), pdata.NewMetrics()))
}
type testMProcessor struct {
retError error
}
func newTestMProcessor(retError error) MProcessor {
return &testMProcessor{retError: retError}
}
func (tmp *testMProcessor) ProcessMetrics(_ context.Context, md pdata.Metrics) (pdata.Metrics, error) {
return md, tmp.retError
func newTestMProcessor(retError error) ProcessMetricsFunc {
return func(_ context.Context, md pdata.Metrics) (pdata.Metrics, error) {
return md, retError
}
}

View File

@ -29,12 +29,9 @@ import (
"go.opentelemetry.io/collector/model/pdata"
)
// TProcessor is a helper interface that allows avoiding implementing all functions in TracesProcessor by using NewTracesProcessor.
type TProcessor interface {
// ProcessTraces is a helper function that processes the incoming data and returns the data to be sent to the next component.
// If error is returned then returned data are ignored. It MUST not call the next component.
ProcessTraces(context.Context, pdata.Traces) (pdata.Traces, error)
}
// ProcessTracesFunc is a helper function that processes the incoming data and returns the data to be sent to the next component.
// If error is returned then returned data are ignored. It MUST not call the next component.
type ProcessTracesFunc func(context.Context, pdata.Traces) (pdata.Traces, error)
type tracesProcessor struct {
component.Component
@ -46,11 +43,11 @@ type tracesProcessor struct {
func NewTracesProcessor(
cfg config.Processor,
nextConsumer consumer.Traces,
processor TProcessor,
tracesFunc ProcessTracesFunc,
options ...Option,
) (component.TracesProcessor, error) {
if processor == nil {
return nil, errors.New("nil processor")
if tracesFunc == nil {
return nil, errors.New("nil tracesFunc")
}
if nextConsumer == nil {
@ -63,7 +60,7 @@ func NewTracesProcessor(
span := trace.SpanFromContext(ctx)
span.AddEvent("Start processing.", eventOptions)
var err error
td, err = processor.ProcessTraces(ctx, td)
td, err = tracesFunc(ctx, td)
span.AddEvent("End processing.", eventOptions)
if err != nil {
if errors.Is(err, ErrSkipProcessingData) {

View File

@ -77,14 +77,8 @@ func TestNewTracesProcessor_ProcessTracesErrSkipProcessingData(t *testing.T) {
assert.Equal(t, nil, tp.ConsumeTraces(context.Background(), pdata.NewTraces()))
}
type testTProcessor struct {
retError error
}
func newTestTProcessor(retError error) TProcessor {
return &testTProcessor{retError: retError}
}
func (ttp *testTProcessor) ProcessTraces(_ context.Context, td pdata.Traces) (pdata.Traces, error) {
return td, ttp.retError
func newTestTProcessor(retError error) ProcessTracesFunc {
return func(_ context.Context, td pdata.Traces) (pdata.Traces, error) {
return td, retError
}
}

View File

@ -57,10 +57,11 @@ func createTracesProcessor(
if err != nil {
return nil, err
}
proc := &resourceProcessor{attrProc: attrProc}
return processorhelper.NewTracesProcessor(
cfg,
nextConsumer,
&resourceProcessor{attrProc: attrProc},
proc.processTraces,
processorhelper.WithCapabilities(processorCapabilities))
}
@ -73,10 +74,11 @@ func createMetricsProcessor(
if err != nil {
return nil, err
}
proc := &resourceProcessor{attrProc: attrProc}
return processorhelper.NewMetricsProcessor(
cfg,
nextConsumer,
&resourceProcessor{attrProc: attrProc},
proc.processMetrics,
processorhelper.WithCapabilities(processorCapabilities))
}
@ -89,10 +91,11 @@ func createLogsProcessor(
if err != nil {
return nil, err
}
proc := &resourceProcessor{attrProc: attrProc}
return processorhelper.NewLogsProcessor(
cfg,
nextConsumer,
&resourceProcessor{attrProc: attrProc},
proc.processLogs,
processorhelper.WithCapabilities(processorCapabilities))
}

View File

@ -25,8 +25,7 @@ type resourceProcessor struct {
attrProc *processorhelper.AttrProc
}
// ProcessTraces implements the TProcessor interface
func (rp *resourceProcessor) ProcessTraces(_ context.Context, td pdata.Traces) (pdata.Traces, error) {
func (rp *resourceProcessor) processTraces(_ context.Context, td pdata.Traces) (pdata.Traces, error) {
rss := td.ResourceSpans()
for i := 0; i < rss.Len(); i++ {
rp.attrProc.Process(rss.At(i).Resource().Attributes())
@ -34,8 +33,7 @@ func (rp *resourceProcessor) ProcessTraces(_ context.Context, td pdata.Traces) (
return td, nil
}
// ProcessMetrics implements the MProcessor interface
func (rp *resourceProcessor) ProcessMetrics(_ context.Context, md pdata.Metrics) (pdata.Metrics, error) {
func (rp *resourceProcessor) processMetrics(_ context.Context, md pdata.Metrics) (pdata.Metrics, error) {
rms := md.ResourceMetrics()
for i := 0; i < rms.Len(); i++ {
rp.attrProc.Process(rms.At(i).Resource().Attributes())
@ -43,8 +41,7 @@ func (rp *resourceProcessor) ProcessMetrics(_ context.Context, md pdata.Metrics)
return md, nil
}
// ProcessLogs implements the LProcessor interface
func (rp *resourceProcessor) ProcessLogs(_ context.Context, ld pdata.Logs) (pdata.Logs, error) {
func (rp *resourceProcessor) processLogs(_ context.Context, ld pdata.Logs) (pdata.Logs, error) {
rls := ld.ResourceLogs()
for i := 0; i < rls.Len(); i++ {
rp.attrProc.Process(rls.At(i).Resource().Attributes())

View File

@ -73,6 +73,6 @@ func createTracesProcessor(
return processorhelper.NewTracesProcessor(
cfg,
nextConsumer,
sp,
sp.processTraces,
processorhelper.WithCapabilities(processorCapabilities))
}

View File

@ -79,7 +79,7 @@ func newSpanProcessor(config Config) (*spanProcessor, error) {
return sp, nil
}
func (sp *spanProcessor) ProcessTraces(_ context.Context, td pdata.Traces) (pdata.Traces, error) {
func (sp *spanProcessor) processTraces(_ context.Context, td pdata.Traces) (pdata.Traces, error) {
rss := td.ResourceSpans()
for i := 0; i < rss.Len(); i++ {
rs := rss.At(i)