Refactor processorhelper to use consumerhelper, split by signal type (#3180)

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
This commit is contained in:
Bogdan Drutu 2021-05-13 19:27:39 -07:00 committed by GitHub
parent 65a43fe399
commit 9a1d11aeaf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 533 additions and 349 deletions

View File

@ -0,0 +1,84 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package processorhelper
import (
"context"
"errors"
"go.opencensus.io/trace"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/component/componenthelper"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumerhelper"
"go.opentelemetry.io/collector/consumer/pdata"
)
// LProcessor is a helper interface that allows avoiding implementing all functions in LogsProcessor by using NewLogsProcessor.
type LProcessor interface {
// ProcessLogs is a helper function that processes the incoming data and returns the data to be sent to the next component.
// If error is returned then returned data are ignored. It MUST not call the next component.
ProcessLogs(context.Context, pdata.Logs) (pdata.Logs, error)
}
type logProcessor struct {
component.Component
consumer.Logs
}
// NewLogsProcessor creates a LogsProcessor that ensure context propagation and the right tags are set.
// TODO: Add observability metrics support
func NewLogsProcessor(
cfg config.Processor,
nextConsumer consumer.Logs,
processor LProcessor,
options ...Option,
) (component.LogsProcessor, error) {
if processor == nil {
return nil, errors.New("nil processor")
}
if nextConsumer == nil {
return nil, componenterror.ErrNilNextConsumer
}
traceAttributes := spanAttributes(cfg.ID())
bs := fromOptions(options)
logsConsumer, err := consumerhelper.NewLogs(func(ctx context.Context, ld pdata.Logs) error {
span := trace.FromContext(ctx)
span.Annotate(traceAttributes, "Start processing.")
var err error
ld, err = processor.ProcessLogs(ctx, ld)
span.Annotate(traceAttributes, "End processing.")
if err != nil {
if errors.Is(err, ErrSkipProcessingData) {
return nil
}
return err
}
return nextConsumer.ConsumeLogs(ctx, ld)
}, bs.consumerOptions...)
if err != nil {
return nil, err
}
return &logProcessor{
Component: componenthelper.New(bs.componentOptions...),
Logs: logsConsumer,
}, nil
}

View File

@ -0,0 +1,91 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package processorhelper
import (
"context"
"errors"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/testdata"
)
var testLogsCfg = config.NewProcessorSettings(config.NewID(typeStr))
func TestNewLogsProcessor(t *testing.T) {
lp, err := NewLogsProcessor(&testLogsCfg, consumertest.NewNop(), newTestLProcessor(nil))
require.NoError(t, err)
assert.True(t, lp.Capabilities().MutatesData)
assert.NoError(t, lp.Start(context.Background(), componenttest.NewNopHost()))
assert.NoError(t, lp.ConsumeLogs(context.Background(), testdata.GenerateLogDataEmpty()))
assert.NoError(t, lp.Shutdown(context.Background()))
}
func TestNewLogsProcessor_WithOptions(t *testing.T) {
want := errors.New("my_error")
lp, err := NewLogsProcessor(&testLogsCfg, consumertest.NewNop(), newTestLProcessor(nil),
WithStart(func(context.Context, component.Host) error { return want }),
WithShutdown(func(context.Context) error { return want }),
WithCapabilities(consumer.Capabilities{MutatesData: false}))
assert.NoError(t, err)
assert.Equal(t, want, lp.Start(context.Background(), componenttest.NewNopHost()))
assert.Equal(t, want, lp.Shutdown(context.Background()))
assert.False(t, lp.Capabilities().MutatesData)
}
func TestNewLogsProcessor_NilRequiredFields(t *testing.T) {
_, err := NewLogsProcessor(&testLogsCfg, consumertest.NewNop(), nil)
assert.Error(t, err)
_, err = NewLogsProcessor(&testLogsCfg, nil, newTestLProcessor(nil))
assert.Equal(t, componenterror.ErrNilNextConsumer, err)
}
func TestNewLogsProcessor_ProcessLogError(t *testing.T) {
want := errors.New("my_error")
lp, err := NewLogsProcessor(&testLogsCfg, consumertest.NewNop(), newTestLProcessor(want))
require.NoError(t, err)
assert.Equal(t, want, lp.ConsumeLogs(context.Background(), testdata.GenerateLogDataEmpty()))
}
func TestNewLogsProcessor_ProcessLogsErrSkipProcessingData(t *testing.T) {
lp, err := NewLogsProcessor(&testLogsCfg, consumertest.NewNop(), newTestLProcessor(ErrSkipProcessingData))
require.NoError(t, err)
assert.Equal(t, nil, lp.ConsumeLogs(context.Background(), testdata.GenerateLogDataEmpty()))
}
type testLProcessor struct {
retError error
}
func newTestLProcessor(retError error) LProcessor {
return &testLProcessor{retError: retError}
}
func (tlp *testLProcessor) ProcessLogs(_ context.Context, ld pdata.Logs) (pdata.Logs, error) {
return ld, tlp.retError
}

View File

@ -0,0 +1,84 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package processorhelper
import (
"context"
"errors"
"go.opencensus.io/trace"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/component/componenthelper"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumerhelper"
"go.opentelemetry.io/collector/consumer/pdata"
)
// MProcessor is a helper interface that allows avoiding implementing all functions in MetricsProcessor by using NewTracesProcessor.
type MProcessor interface {
// ProcessMetrics is a helper function that processes the incoming data and returns the data to be sent to the next component.
// If error is returned then returned data are ignored. It MUST not call the next component.
ProcessMetrics(context.Context, pdata.Metrics) (pdata.Metrics, error)
}
type metricsProcessor struct {
component.Component
consumer.Metrics
}
// NewMetricsProcessor creates a MetricsProcessor that ensure context propagation and the right tags are set.
// TODO: Add observability metrics support
func NewMetricsProcessor(
cfg config.Processor,
nextConsumer consumer.Metrics,
processor MProcessor,
options ...Option,
) (component.MetricsProcessor, error) {
if processor == nil {
return nil, errors.New("nil processor")
}
if nextConsumer == nil {
return nil, componenterror.ErrNilNextConsumer
}
traceAttributes := spanAttributes(cfg.ID())
bs := fromOptions(options)
metricsConsumer, err := consumerhelper.NewMetrics(func(ctx context.Context, md pdata.Metrics) error {
span := trace.FromContext(ctx)
span.Annotate(traceAttributes, "Start processing.")
var err error
md, err = processor.ProcessMetrics(ctx, md)
span.Annotate(traceAttributes, "End processing.")
if err != nil {
if errors.Is(err, ErrSkipProcessingData) {
return nil
}
return err
}
return nextConsumer.ConsumeMetrics(ctx, md)
}, bs.consumerOptions...)
if err != nil {
return nil, err
}
return &metricsProcessor{
Component: componenthelper.New(bs.componentOptions...),
Metrics: metricsConsumer,
}, nil
}

View File

@ -0,0 +1,91 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package processorhelper
import (
"context"
"errors"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/testdata"
)
var testMetricsCfg = config.NewProcessorSettings(config.NewID(typeStr))
func TestNewMetricsProcessor(t *testing.T) {
mp, err := NewMetricsProcessor(&testMetricsCfg, consumertest.NewNop(), newTestMProcessor(nil))
require.NoError(t, err)
assert.True(t, mp.Capabilities().MutatesData)
assert.NoError(t, mp.Start(context.Background(), componenttest.NewNopHost()))
assert.NoError(t, mp.ConsumeMetrics(context.Background(), testdata.GenerateMetricsEmpty()))
assert.NoError(t, mp.Shutdown(context.Background()))
}
func TestNewMetricsProcessor_WithOptions(t *testing.T) {
want := errors.New("my_error")
mp, err := NewMetricsProcessor(&testMetricsCfg, consumertest.NewNop(), newTestMProcessor(nil),
WithStart(func(context.Context, component.Host) error { return want }),
WithShutdown(func(context.Context) error { return want }),
WithCapabilities(consumer.Capabilities{MutatesData: false}))
assert.NoError(t, err)
assert.Equal(t, want, mp.Start(context.Background(), componenttest.NewNopHost()))
assert.Equal(t, want, mp.Shutdown(context.Background()))
assert.False(t, mp.Capabilities().MutatesData)
}
func TestNewMetricsProcessor_NilRequiredFields(t *testing.T) {
_, err := NewMetricsProcessor(&testMetricsCfg, consumertest.NewNop(), nil)
assert.Error(t, err)
_, err = NewMetricsProcessor(&testMetricsCfg, nil, newTestMProcessor(nil))
assert.Equal(t, componenterror.ErrNilNextConsumer, err)
}
func TestNewMetricsProcessor_ProcessMetricsError(t *testing.T) {
want := errors.New("my_error")
mp, err := NewMetricsProcessor(&testMetricsCfg, consumertest.NewNop(), newTestMProcessor(want))
require.NoError(t, err)
assert.Equal(t, want, mp.ConsumeMetrics(context.Background(), testdata.GenerateMetricsEmpty()))
}
func TestNewMetricsProcessor_ProcessMetricsErrSkipProcessingData(t *testing.T) {
mp, err := NewMetricsProcessor(&testMetricsCfg, consumertest.NewNop(), newTestMProcessor(ErrSkipProcessingData))
require.NoError(t, err)
assert.Equal(t, nil, mp.ConsumeMetrics(context.Background(), testdata.GenerateMetricsEmpty()))
}
type testMProcessor struct {
retError error
}
func newTestMProcessor(retError error) MProcessor {
return &testMProcessor{retError: retError}
}
func (tmp *testMProcessor) ProcessMetrics(_ context.Context, md pdata.Metrics) (pdata.Metrics, error) {
return md, tmp.retError
}

View File

@ -15,17 +15,14 @@
package processorhelper
import (
"context"
"errors"
"go.opencensus.io/trace"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/component/componenthelper"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/consumer/consumerhelper"
"go.opentelemetry.io/collector/obsreport"
)
@ -34,27 +31,6 @@ import (
// to stop further processing without propagating an error back up the pipeline to logs.
var ErrSkipProcessingData = errors.New("sentinel error to skip processing data from the remainder of the pipeline")
// TProcessor is a helper interface that allows avoiding implementing all functions in TracesProcessor by using NewTracesProcessor.
type TProcessor interface {
// ProcessTraces is a helper function that processes the incoming data and returns the data to be sent to the next component.
// If error is returned then returned data are ignored. It MUST not call the next component.
ProcessTraces(context.Context, pdata.Traces) (pdata.Traces, error)
}
// MProcessor is a helper interface that allows avoiding implementing all functions in MetricsProcessor by using NewTracesProcessor.
type MProcessor interface {
// ProcessMetrics is a helper function that processes the incoming data and returns the data to be sent to the next component.
// If error is returned then returned data are ignored. It MUST not call the next component.
ProcessMetrics(context.Context, pdata.Metrics) (pdata.Metrics, error)
}
// LProcessor is a helper interface that allows avoiding implementing all functions in LogsProcessor by using NewLogsProcessor.
type LProcessor interface {
// ProcessLogs is a helper function that processes the incoming data and returns the data to be sent to the next component.
// If error is returned then returned data are ignored. It MUST not call the next component.
ProcessLogs(context.Context, pdata.Logs) (pdata.Logs, error)
}
// Option apply changes to internalOptions.
type Option func(*baseSettings)
@ -78,20 +54,20 @@ func WithShutdown(shutdown componenthelper.ShutdownFunc) Option {
// The default GetCapabilities function returns mutable capabilities.
func WithCapabilities(capabilities consumer.Capabilities) Option {
return func(o *baseSettings) {
o.capabilities = capabilities
o.consumerOptions = append(o.consumerOptions, consumerhelper.WithCapabilities(capabilities))
}
}
type baseSettings struct {
componentOptions []componenthelper.Option
capabilities consumer.Capabilities
consumerOptions []consumerhelper.Option
}
// fromOptions returns the internal settings starting from the default and applying all options.
func fromOptions(options []Option) *baseSettings {
// Start from the default options:
opts := &baseSettings{
capabilities: consumer.Capabilities{MutatesData: true},
consumerOptions: []consumerhelper.Option{consumerhelper.WithCapabilities(consumer.Capabilities{MutatesData: true})},
}
for _, op := range options {
@ -101,159 +77,8 @@ func fromOptions(options []Option) *baseSettings {
return opts
}
// internalOptions contains internalOptions concerning how an Processor is configured.
type baseProcessor struct {
component.Component
capabilities consumer.Capabilities
traceAttributes []trace.Attribute
}
// Construct the internalOptions from multiple Option.
func newBaseProcessor(id config.ComponentID, options ...Option) baseProcessor {
bs := fromOptions(options)
be := baseProcessor{
Component: componenthelper.New(bs.componentOptions...),
capabilities: bs.capabilities,
traceAttributes: []trace.Attribute{
trace.StringAttribute(obsreport.ProcessorKey, id.String()),
},
func spanAttributes(id config.ComponentID) []trace.Attribute {
return []trace.Attribute{
trace.StringAttribute(obsreport.ProcessorKey, id.String()),
}
return be
}
func (bp *baseProcessor) Capabilities() consumer.Capabilities {
return bp.capabilities
}
type tracesProcessor struct {
baseProcessor
processor TProcessor
nextConsumer consumer.Traces
}
func (tp *tracesProcessor) ConsumeTraces(ctx context.Context, td pdata.Traces) error {
span := trace.FromContext(ctx)
span.Annotate(tp.traceAttributes, "Start processing.")
var err error
td, err = tp.processor.ProcessTraces(ctx, td)
span.Annotate(tp.traceAttributes, "End processing.")
if err != nil {
if errors.Is(err, ErrSkipProcessingData) {
return nil
}
return err
}
return tp.nextConsumer.ConsumeTraces(ctx, td)
}
// NewTracesProcessor creates a TracesProcessor that ensure context propagation and the right tags are set.
// TODO: Add observability metrics support
func NewTracesProcessor(
cfg config.Processor,
nextConsumer consumer.Traces,
processor TProcessor,
options ...Option,
) (component.TracesProcessor, error) {
if processor == nil {
return nil, errors.New("nil processor")
}
if nextConsumer == nil {
return nil, componenterror.ErrNilNextConsumer
}
return &tracesProcessor{
baseProcessor: newBaseProcessor(cfg.ID(), options...),
processor: processor,
nextConsumer: nextConsumer,
}, nil
}
type metricsProcessor struct {
baseProcessor
processor MProcessor
nextConsumer consumer.Metrics
}
func (mp *metricsProcessor) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error {
span := trace.FromContext(ctx)
span.Annotate(mp.traceAttributes, "Start processing.")
var err error
md, err = mp.processor.ProcessMetrics(ctx, md)
span.Annotate(mp.traceAttributes, "End processing.")
if err != nil {
if errors.Is(err, ErrSkipProcessingData) {
return nil
}
return err
}
return mp.nextConsumer.ConsumeMetrics(ctx, md)
}
// NewMetricsProcessor creates a MetricsProcessor that ensure context propagation and the right tags are set.
// TODO: Add observability metrics support
func NewMetricsProcessor(
cfg config.Processor,
nextConsumer consumer.Metrics,
processor MProcessor,
options ...Option,
) (component.MetricsProcessor, error) {
if processor == nil {
return nil, errors.New("nil processor")
}
if nextConsumer == nil {
return nil, componenterror.ErrNilNextConsumer
}
return &metricsProcessor{
baseProcessor: newBaseProcessor(cfg.ID(), options...),
processor: processor,
nextConsumer: nextConsumer,
}, nil
}
type logProcessor struct {
baseProcessor
processor LProcessor
nextConsumer consumer.Logs
}
func (lp *logProcessor) ConsumeLogs(ctx context.Context, ld pdata.Logs) error {
span := trace.FromContext(ctx)
span.Annotate(lp.traceAttributes, "Start processing.")
var err error
ld, err = lp.processor.ProcessLogs(ctx, ld)
span.Annotate(lp.traceAttributes, "End processing.")
if err != nil {
if errors.Is(err, ErrSkipProcessingData) {
return nil
}
return err
}
return lp.nextConsumer.ConsumeLogs(ctx, ld)
}
// NewLogsProcessor creates a LogsProcessor that ensure context propagation and the right tags are set.
// TODO: Add observability metrics support
func NewLogsProcessor(
cfg config.Processor,
nextConsumer consumer.Logs,
processor LProcessor,
options ...Option,
) (component.LogsProcessor, error) {
if processor == nil {
return nil, errors.New("nil processor")
}
if nextConsumer == nil {
return nil, componenterror.ErrNilNextConsumer
}
return &logProcessor{
baseProcessor: newBaseProcessor(cfg.ID(), options...),
processor: processor,
nextConsumer: nextConsumer,
}, nil
}

View File

@ -1,167 +0,0 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package processorhelper
import (
"context"
"errors"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/testdata"
)
var testCfg = config.NewProcessorSettings(config.NewID(typeStr))
func TestDefaultOptions(t *testing.T) {
bp := newBaseProcessor(config.NewID(typeStr))
assert.True(t, bp.Capabilities().MutatesData)
assert.NoError(t, bp.Start(context.Background(), componenttest.NewNopHost()))
assert.NoError(t, bp.Shutdown(context.Background()))
}
func TestWithOptions(t *testing.T) {
want := errors.New("my_error")
bp := newBaseProcessor(config.NewID(typeStr),
WithStart(func(context.Context, component.Host) error { return want }),
WithShutdown(func(context.Context) error { return want }),
WithCapabilities(consumer.Capabilities{MutatesData: false}))
assert.Equal(t, want, bp.Start(context.Background(), componenttest.NewNopHost()))
assert.Equal(t, want, bp.Shutdown(context.Background()))
assert.False(t, bp.Capabilities().MutatesData)
}
func TestNewTracesProcessor(t *testing.T) {
me, err := NewTracesProcessor(&testCfg, consumertest.NewNop(), newTestTProcessor(nil))
require.NoError(t, err)
assert.NoError(t, me.Start(context.Background(), componenttest.NewNopHost()))
assert.NoError(t, me.ConsumeTraces(context.Background(), testdata.GenerateTraceDataEmpty()))
assert.NoError(t, me.Shutdown(context.Background()))
}
func TestNewTracesProcessor_NilRequiredFields(t *testing.T) {
_, err := NewTracesProcessor(&testCfg, consumertest.NewNop(), nil)
assert.Error(t, err)
_, err = NewTracesProcessor(&testCfg, nil, newTestTProcessor(nil))
assert.Equal(t, componenterror.ErrNilNextConsumer, err)
}
func TestNewTracesProcessor_ProcessTraceError(t *testing.T) {
want := errors.New("my_error")
me, err := NewTracesProcessor(&testCfg, consumertest.NewNop(), newTestTProcessor(want))
require.NoError(t, err)
assert.Equal(t, want, me.ConsumeTraces(context.Background(), testdata.GenerateTraceDataEmpty()))
}
func TestNewMetricsProcessor(t *testing.T) {
me, err := NewMetricsProcessor(&testCfg, consumertest.NewNop(), newTestMProcessor(nil))
require.NoError(t, err)
assert.NoError(t, me.Start(context.Background(), componenttest.NewNopHost()))
assert.NoError(t, me.ConsumeMetrics(context.Background(), testdata.GenerateMetricsEmpty()))
assert.NoError(t, me.Shutdown(context.Background()))
}
func TestNewMetricsProcessor_NilRequiredFields(t *testing.T) {
_, err := NewMetricsProcessor(&testCfg, consumertest.NewNop(), nil)
assert.Error(t, err)
_, err = NewMetricsProcessor(&testCfg, nil, newTestMProcessor(nil))
assert.Equal(t, componenterror.ErrNilNextConsumer, err)
}
func TestNewMetricsProcessor_ProcessMetricsError(t *testing.T) {
want := errors.New("my_error")
me, err := NewMetricsProcessor(&testCfg, consumertest.NewNop(), newTestMProcessor(want))
require.NoError(t, err)
assert.Equal(t, want, me.ConsumeMetrics(context.Background(), testdata.GenerateMetricsEmpty()))
}
func TestNewMetricsProcessor_ProcessMetricsErrSkipProcessingData(t *testing.T) {
me, err := NewMetricsProcessor(&testCfg, consumertest.NewNop(), newTestMProcessor(ErrSkipProcessingData))
require.NoError(t, err)
assert.Equal(t, nil, me.ConsumeMetrics(context.Background(), testdata.GenerateMetricsEmpty()))
}
func TestNewLogsProcessor(t *testing.T) {
me, err := NewLogsProcessor(&testCfg, consumertest.NewNop(), newTestLProcessor(nil))
require.NoError(t, err)
assert.NoError(t, me.Start(context.Background(), componenttest.NewNopHost()))
assert.NoError(t, me.ConsumeLogs(context.Background(), testdata.GenerateLogDataEmpty()))
assert.NoError(t, me.Shutdown(context.Background()))
}
func TestNewLogsProcessor_NilRequiredFields(t *testing.T) {
_, err := NewLogsProcessor(&testCfg, consumertest.NewNop(), nil)
assert.Error(t, err)
_, err = NewLogsProcessor(&testCfg, nil, newTestLProcessor(nil))
assert.Equal(t, componenterror.ErrNilNextConsumer, err)
}
func TestNewLogsProcessor_ProcessLogError(t *testing.T) {
want := errors.New("my_error")
me, err := NewLogsProcessor(&testCfg, consumertest.NewNop(), newTestLProcessor(want))
require.NoError(t, err)
assert.Equal(t, want, me.ConsumeLogs(context.Background(), testdata.GenerateLogDataEmpty()))
}
type testTProcessor struct {
retError error
}
func newTestTProcessor(retError error) TProcessor {
return &testTProcessor{retError: retError}
}
func (ttp *testTProcessor) ProcessTraces(_ context.Context, td pdata.Traces) (pdata.Traces, error) {
return td, ttp.retError
}
type testMProcessor struct {
retError error
}
func newTestMProcessor(retError error) MProcessor {
return &testMProcessor{retError: retError}
}
func (tmp *testMProcessor) ProcessMetrics(_ context.Context, md pdata.Metrics) (pdata.Metrics, error) {
return md, tmp.retError
}
type testLProcessor struct {
retError error
}
func newTestLProcessor(retError error) LProcessor {
return &testLProcessor{retError: retError}
}
func (tlp *testLProcessor) ProcessLogs(_ context.Context, ld pdata.Logs) (pdata.Logs, error) {
return ld, tlp.retError
}

View File

@ -0,0 +1,85 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package processorhelper
import (
"context"
"errors"
"go.opencensus.io/trace"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/component/componenthelper"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumerhelper"
"go.opentelemetry.io/collector/consumer/pdata"
)
// TProcessor is a helper interface that allows avoiding implementing all functions in TracesProcessor by using NewTracesProcessor.
type TProcessor interface {
// ProcessTraces is a helper function that processes the incoming data and returns the data to be sent to the next component.
// If error is returned then returned data are ignored. It MUST not call the next component.
ProcessTraces(context.Context, pdata.Traces) (pdata.Traces, error)
}
type tracesProcessor struct {
component.Component
consumer.Traces
}
// NewTracesProcessor creates a TracesProcessor that ensure context propagation and the right tags are set.
// TODO: Add observability metrics support
func NewTracesProcessor(
cfg config.Processor,
nextConsumer consumer.Traces,
processor TProcessor,
options ...Option,
) (component.TracesProcessor, error) {
if processor == nil {
return nil, errors.New("nil processor")
}
if nextConsumer == nil {
return nil, componenterror.ErrNilNextConsumer
}
traceAttributes := spanAttributes(cfg.ID())
bs := fromOptions(options)
traceConsumer, err := consumerhelper.NewTraces(func(ctx context.Context, td pdata.Traces) error {
span := trace.FromContext(ctx)
span.Annotate(traceAttributes, "Start processing.")
var err error
td, err = processor.ProcessTraces(ctx, td)
span.Annotate(traceAttributes, "End processing.")
if err != nil {
if errors.Is(err, ErrSkipProcessingData) {
return nil
}
return err
}
return nextConsumer.ConsumeTraces(ctx, td)
}, bs.consumerOptions...)
if err != nil {
return nil, err
}
return &tracesProcessor{
Component: componenthelper.New(bs.componentOptions...),
Traces: traceConsumer,
}, nil
}

View File

@ -0,0 +1,91 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package processorhelper
import (
"context"
"errors"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/testdata"
)
var testTracesCfg = config.NewProcessorSettings(config.NewID(typeStr))
func TestNewTracesProcessor(t *testing.T) {
tp, err := NewTracesProcessor(&testTracesCfg, consumertest.NewNop(), newTestTProcessor(nil))
require.NoError(t, err)
assert.True(t, tp.Capabilities().MutatesData)
assert.NoError(t, tp.Start(context.Background(), componenttest.NewNopHost()))
assert.NoError(t, tp.ConsumeTraces(context.Background(), testdata.GenerateTraceDataEmpty()))
assert.NoError(t, tp.Shutdown(context.Background()))
}
func TestNewTracesProcessor_WithOptions(t *testing.T) {
want := errors.New("my_error")
tp, err := NewTracesProcessor(&testTracesCfg, consumertest.NewNop(), newTestTProcessor(nil),
WithStart(func(context.Context, component.Host) error { return want }),
WithShutdown(func(context.Context) error { return want }),
WithCapabilities(consumer.Capabilities{MutatesData: false}))
assert.NoError(t, err)
assert.Equal(t, want, tp.Start(context.Background(), componenttest.NewNopHost()))
assert.Equal(t, want, tp.Shutdown(context.Background()))
assert.False(t, tp.Capabilities().MutatesData)
}
func TestNewTracesProcessor_NilRequiredFields(t *testing.T) {
_, err := NewTracesProcessor(&testTracesCfg, consumertest.NewNop(), nil)
assert.Error(t, err)
_, err = NewTracesProcessor(&testTracesCfg, nil, newTestTProcessor(nil))
assert.Equal(t, componenterror.ErrNilNextConsumer, err)
}
func TestNewTracesProcessor_ProcessTraceError(t *testing.T) {
want := errors.New("my_error")
tp, err := NewTracesProcessor(&testTracesCfg, consumertest.NewNop(), newTestTProcessor(want))
require.NoError(t, err)
assert.Equal(t, want, tp.ConsumeTraces(context.Background(), testdata.GenerateTraceDataEmpty()))
}
func TestNewTracesProcessor_ProcessTracesErrSkipProcessingData(t *testing.T) {
tp, err := NewTracesProcessor(&testTracesCfg, consumertest.NewNop(), newTestTProcessor(ErrSkipProcessingData))
require.NoError(t, err)
assert.Equal(t, nil, tp.ConsumeTraces(context.Background(), testdata.GenerateTraceDataEmpty()))
}
type testTProcessor struct {
retError error
}
func newTestTProcessor(retError error) TProcessor {
return &testTProcessor{retError: retError}
}
func (ttp *testTProcessor) ProcessTraces(_ context.Context, td pdata.Traces) (pdata.Traces, error) {
return td, ttp.retError
}