Add capabilities to consumer, remove from processor (#2770)

* Add capabilities to consumer, remove from processor

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>

* Rename baseConsumer

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
This commit is contained in:
Bogdan Drutu 2021-05-11 16:13:47 -07:00 committed by GitHub
parent dea2f6e19f
commit ffb332b37b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
44 changed files with 242 additions and 141 deletions

View File

@ -20,6 +20,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenthelper"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
)
@ -86,3 +87,7 @@ type nopExporter struct {
component.Component
consumertest.Consumer
}
func (ne *nopExporter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

View File

@ -93,6 +93,6 @@ type nopProcessor struct {
consumertest.Consumer
}
func (*nopProcessor) GetCapabilities() component.ProcessorCapabilities {
return component.ProcessorCapabilities{MutatesConsumedData: false}
func (*nopProcessor) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

View File

@ -23,6 +23,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/consumer/pdata"
)
@ -36,21 +37,21 @@ func TestNewNopProcessorFactory(t *testing.T) {
traces, err := factory.CreateTracesProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewNop())
require.NoError(t, err)
assert.Equal(t, component.ProcessorCapabilities{MutatesConsumedData: false}, traces.GetCapabilities())
assert.Equal(t, consumer.Capabilities{MutatesData: false}, traces.Capabilities())
assert.NoError(t, traces.Start(context.Background(), NewNopHost()))
assert.NoError(t, traces.ConsumeTraces(context.Background(), pdata.NewTraces()))
assert.NoError(t, traces.Shutdown(context.Background()))
metrics, err := factory.CreateMetricsProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewNop())
require.NoError(t, err)
assert.Equal(t, component.ProcessorCapabilities{MutatesConsumedData: false}, metrics.GetCapabilities())
assert.Equal(t, consumer.Capabilities{MutatesData: false}, metrics.Capabilities())
assert.NoError(t, metrics.Start(context.Background(), NewNopHost()))
assert.NoError(t, metrics.ConsumeMetrics(context.Background(), pdata.NewMetrics()))
assert.NoError(t, metrics.Shutdown(context.Background()))
logs, err := factory.CreateLogsProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewNop())
require.NoError(t, err)
assert.Equal(t, component.ProcessorCapabilities{MutatesConsumedData: false}, logs.GetCapabilities())
assert.Equal(t, consumer.Capabilities{MutatesData: false}, logs.Capabilities())
assert.NoError(t, logs.Start(context.Background(), NewNopHost()))
assert.NoError(t, logs.ConsumeLogs(context.Background(), pdata.NewLogs()))
assert.NoError(t, logs.Shutdown(context.Background()))

View File

@ -28,9 +28,6 @@ import (
// and MetricsProcessor.
type Processor interface {
Component
// GetCapabilities must return the capabilities of the processor.
GetCapabilities() ProcessorCapabilities
}
// TracesProcessor is a processor that can consume traces.
@ -51,16 +48,6 @@ type LogsProcessor interface {
consumer.Logs
}
// ProcessorCapabilities describes the capabilities of a Processor.
type ProcessorCapabilities struct {
// MutatesConsumedData is set to true if Consume* function of the
// processor modifies the input TraceData or MetricsData argument.
// Processors which modify the input data MUST set this flag to true. If the processor
// does not modify the data it MUST set this flag to false. If the processor creates
// a copy of the data before modifying then this flag can be safely set to false.
MutatesConsumedData bool
}
// ProcessorCreateParams is passed to Create* functions in ProcessorFactory.
type ProcessorCreateParams struct {
// Logger that the factory can use during creation and can pass to the created

View File

@ -21,9 +21,24 @@ import (
"go.opentelemetry.io/collector/consumer/pdata"
)
// Capabilities describes the capabilities of a Processor.
type Capabilities struct {
// MutatesData is set to true if Consume* function of the
// processor modifies the input TraceData or MetricsData argument.
// Processors which modify the input data MUST set this flag to true. If the processor
// does not modify the data it MUST set this flag to false. If the processor creates
// a copy of the data before modifying then this flag can be safely set to false.
MutatesData bool
}
type baseConsumer interface {
Capabilities() Capabilities
}
// Metrics is the new metrics consumer interface that receives pdata.Metrics, processes it
// as needed, and sends it to the next processing node if any or to the destination.
type Metrics interface {
baseConsumer
// ConsumeMetrics receives pdata.Metrics for consumption.
ConsumeMetrics(ctx context.Context, md pdata.Metrics) error
}
@ -31,6 +46,7 @@ type Metrics interface {
// Traces is an interface that receives pdata.Traces, processes it
// as needed, and sends it to the next processing node if any or to the destination.
type Traces interface {
baseConsumer
// ConsumeTraces receives pdata.Traces for consumption.
ConsumeTraces(ctx context.Context, td pdata.Traces) error
}
@ -38,6 +54,7 @@ type Traces interface {
// Logs is an interface that receives pdata.Logs, processes it
// as needed, and sends it to the next processing node if any or to the destination.
type Logs interface {
baseConsumer
// ConsumeLogs receives pdata.Logs for consumption.
ConsumeLogs(ctx context.Context, ld pdata.Logs) error
}

View File

@ -0,0 +1,25 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package consumertest
import (
"go.opentelemetry.io/collector/consumer"
)
type nonMutatingConsumer struct{}
func (bc nonMutatingConsumer) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

View File

@ -26,6 +26,8 @@ import (
// to allow us to add extra functions without breaking compatibility because
// nobody else implements this interface.
type Consumer interface {
// Capabilities to implement the base consumer functionality.
Capabilities() consumer.Capabilities
// ConsumeTraces to implement the consumer.Traces.
ConsumeTraces(context.Context, pdata.Traces) error
// ConsumeMetrics to implement the consumer.Metrics.

View File

@ -21,6 +21,7 @@ import (
)
type errConsumer struct {
nonMutatingConsumer
err error
}

View File

@ -24,7 +24,9 @@ var (
nopInstance = &nopConsumer{}
)
type nopConsumer struct{}
type nopConsumer struct {
nonMutatingConsumer
}
func (nc *nopConsumer) unexported() {}

View File

@ -25,6 +25,7 @@ import (
// TracesSink is a consumer.Traces that acts like a sink that
// stores all traces and allows querying them for testing.
type TracesSink struct {
nonMutatingConsumer
mu sync.Mutex
traces []pdata.Traces
spansCount int
@ -72,6 +73,7 @@ func (ste *TracesSink) Reset() {
// MetricsSink is a consumer.Metrics that acts like a sink that
// stores all metrics and allows querying them for testing.
type MetricsSink struct {
nonMutatingConsumer
mu sync.Mutex
metrics []pdata.Metrics
metricsCount int
@ -119,6 +121,7 @@ func (sme *MetricsSink) Reset() {
// LogsSink is a consumer.Logs that acts like a sink that
// stores all logs and allows querying them for testing.
type LogsSink struct {
nonMutatingConsumer
mu sync.Mutex
logs []pdata.Logs
logRecordsCount int

View File

@ -36,6 +36,10 @@ type metricsCloningConsumer []consumer.Metrics
var _ consumer.Metrics = (*metricsCloningConsumer)(nil)
func (mfc metricsCloningConsumer) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: true}
}
// ConsumeMetrics exports the pdata.Metrics to all consumers wrapped by the current one.
func (mfc metricsCloningConsumer) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error {
var errs []error
@ -73,6 +77,10 @@ type tracesCloningConsumer []consumer.Traces
var _ consumer.Traces = (*tracesCloningConsumer)(nil)
func (tfc tracesCloningConsumer) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: true}
}
// ConsumeTraces exports the pdata.Traces to all consumers wrapped by the current one.
func (tfc tracesCloningConsumer) ConsumeTraces(ctx context.Context, td pdata.Traces) error {
var errs []error
@ -110,6 +118,10 @@ type logsCloningConsumer []consumer.Logs
var _ consumer.Logs = (*logsCloningConsumer)(nil)
func (lfc logsCloningConsumer) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: true}
}
// ConsumeLogs exports the pdata.Logs to all consumers wrapped by the current one.
func (lfc logsCloningConsumer) ConsumeLogs(ctx context.Context, ld pdata.Logs) error {
var errs []error

View File

@ -40,6 +40,10 @@ type metricsConsumer []consumer.Metrics
var _ consumer.Metrics = (*metricsConsumer)(nil)
func (mfc metricsConsumer) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}
// ConsumeMetrics exports the pdata.Metrics to all consumers wrapped by the current one.
func (mfc metricsConsumer) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error {
var errs []error
@ -64,6 +68,10 @@ type traceConsumer []consumer.Traces
var _ consumer.Traces = (*traceConsumer)(nil)
func (tfc traceConsumer) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}
// ConsumeTraces exports the pdata.Traces to all consumers wrapped by the current one.
func (tfc traceConsumer) ConsumeTraces(ctx context.Context, td pdata.Traces) error {
var errs []error
@ -88,6 +96,10 @@ type logsConsumer []consumer.Logs
var _ consumer.Logs = (*logsConsumer)(nil)
func (lfc logsConsumer) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}
// ConsumeLogs exports the pdata.Logs to all consumers wrapped by the current one.
func (lfc logsConsumer) ConsumeLogs(ctx context.Context, ld pdata.Logs) error {
var errs []error

View File

@ -22,6 +22,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/obsreport"
@ -66,6 +67,10 @@ type logsExporter struct {
pusher PushLogs
}
func (lexp *logsExporter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}
func (lexp *logsExporter) ConsumeLogs(ctx context.Context, ld pdata.Logs) error {
return lexp.sender.send(newLogsRequest(ctx, ld, lexp.pusher))
}

View File

@ -22,6 +22,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/obsreport"
@ -67,6 +68,10 @@ type metricsExporter struct {
pusher PushMetrics
}
func (mexp *metricsExporter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}
func (mexp *metricsExporter) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error {
if mexp.baseExporter.convertResourceToTelemetry {
md = convertResourceToLabels(md)

View File

@ -22,6 +22,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/obsreport"
@ -66,6 +67,10 @@ type traceExporter struct {
pusher PushTraces
}
func (texp *traceExporter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}
func (texp *traceExporter) ConsumeTraces(ctx context.Context, td pdata.Traces) error {
return texp.sender.send(newTracesRequest(ctx, td, texp.pusher))
}

View File

@ -23,6 +23,7 @@ import (
"github.com/gogo/protobuf/proto"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal"
)
@ -37,6 +38,10 @@ type fileExporter struct {
mutex sync.Mutex
}
func (e *fileExporter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}
func (e *fileExporter) ConsumeTraces(_ context.Context, td pdata.Traces) error {
return exportMessageAsLine(e, internal.TracesToOtlp(td.InternalRep()))
}
@ -46,8 +51,7 @@ func (e *fileExporter) ConsumeMetrics(_ context.Context, md pdata.Metrics) error
}
func (e *fileExporter) ConsumeLogs(_ context.Context, ld pdata.Logs) error {
request := internal.LogsToOtlp(ld.InternalRep())
return exportMessageAsLine(e, request)
return exportMessageAsLine(e, internal.LogsToOtlp(ld.InternalRep()))
}
func exportMessageAsLine(e *fileExporter, message proto.Message) error {

View File

@ -27,6 +27,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/obsreport"
)
@ -92,6 +93,10 @@ func (pe *prometheusExporter) Start(_ context.Context, _ component.Host) error {
return nil
}
func (pe *prometheusExporter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}
func (pe *prometheusExporter) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error {
pe.obsrep.StartMetricsExportOp(ctx)
n := 0
@ -104,6 +109,6 @@ func (pe *prometheusExporter) ConsumeMetrics(ctx context.Context, md pdata.Metri
return nil
}
func (pe *prometheusExporter) Shutdown(_ context.Context) error {
func (pe *prometheusExporter) Shutdown(context.Context) error {
return pe.shutdownFunc()
}

View File

@ -18,6 +18,7 @@ import (
"context"
"sync"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/consumer/pdata"
)
@ -36,6 +37,10 @@ func (esc *ErrOrSinkConsumer) SetConsumeError(err error) {
esc.consumeError = err
}
func (esc *ErrOrSinkConsumer) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}
// ConsumeTraces stores traces to this sink.
func (esc *ErrOrSinkConsumer) ConsumeTraces(ctx context.Context, td pdata.Traces) error {
esc.mu.Lock()

View File

@ -19,6 +19,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)
@ -107,6 +108,10 @@ func (exp *ExampleExporterConsumer) ConsumeTraces(_ context.Context, td pdata.Tr
return nil
}
func (exp *ExampleExporterConsumer) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}
// ConsumeMetrics receives pdata.Metrics for processing by the Metrics.
func (exp *ExampleExporterConsumer) ConsumeMetrics(_ context.Context, md pdata.Metrics) error {
exp.Metrics = append(exp.Metrics, md)

View File

@ -78,6 +78,6 @@ func (ep *exampleProcessor) Shutdown(_ context.Context) error {
return nil
}
func (ep *exampleProcessor) GetCapabilities() component.ProcessorCapabilities {
return component.ProcessorCapabilities{MutatesConsumedData: false}
func (ep *exampleProcessor) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

View File

@ -62,8 +62,8 @@ From data ownership perspective pipelines can work in 2 modes:
* Shared data ownership
The mode is defined during startup based on data modification intent reported by the
processors. The intent is reported by each processor via `MutatesConsumedData` field of
the struct returned by `GetCapabilities` function. If any processor in the pipeline
processors. The intent is reported by each processor via `MutatesData` field of
the struct returned by `Capabilities` function. If any processor in the pipeline
declares an intent to modify the data then that pipeline will work in exclusive ownership
mode. In addition, any other pipeline that receives data from a receiver that is attached
to a pipeline with exclusive ownership mode will be also operating in exclusive ownership
@ -112,7 +112,7 @@ the processor can implement copy-on-write approach for individual sub-parts of
mutate the original `pdata.Traces`/`pdata.Metrics`/`pdata.Logs` is allowed.
If the processor uses such technique it should declare that it does not intend
to modify the original data by setting `MutatesConsumedData=false` in its capabilities
to modify the original data by setting `MutatesData=false` in its capabilities
to avoid marking the pipeline for Exclusive ownership and to avoid the cost of
data cloning described in Exclusive Ownership section.

View File

@ -31,7 +31,7 @@ const (
typeStr = "attributes"
)
var processorCapabilities = component.ProcessorCapabilities{MutatesConsumedData: true}
var processorCapabilities = consumer.Capabilities{MutatesData: true}
// NewFactory returns a new factory for the Attributes processor.
func NewFactory() component.ProcessorFactory {

View File

@ -92,8 +92,8 @@ func newBatchProcessor(params component.ProcessorCreateParams, cfg *Config, batc
}, nil
}
func (bp *batchProcessor) GetCapabilities() component.ProcessorCapabilities {
return component.ProcessorCapabilities{MutatesConsumedData: true}
func (bp *batchProcessor) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: true}
}
// Start is invoked during service startup.

View File

@ -28,7 +28,7 @@ const (
typeStr = "filter"
)
var processorCapabilities = component.ProcessorCapabilities{MutatesConsumedData: true}
var processorCapabilities = consumer.Capabilities{MutatesData: true}
// NewFactory returns a new factory for the Filter processor.
func NewFactory() component.ProcessorFactory {

View File

@ -324,8 +324,8 @@ func TestFilterMetricProcessor(t *testing.T) {
assert.NotNil(t, fmp)
assert.Nil(t, err)
caps := fmp.GetCapabilities()
assert.True(t, caps.MutatesConsumedData)
caps := fmp.Capabilities()
assert.True(t, caps.MutatesData)
ctx := context.Background()
assert.NoError(t, fmp.Start(ctx, nil))

View File

@ -28,7 +28,7 @@ const (
typeStr = "memory_limiter"
)
var processorCapabilities = component.ProcessorCapabilities{MutatesConsumedData: false}
var processorCapabilities = consumer.Capabilities{MutatesData: false}
// NewFactory returns a new factory for the Memory Limiter processor.
func NewFactory() component.ProcessorFactory {

View File

@ -66,7 +66,7 @@ func newTracesProcessor(nextConsumer consumer.Traces, cfg *Config) (component.Tr
cfg,
nextConsumer,
tsp,
processorhelper.WithCapabilities(component.ProcessorCapabilities{MutatesConsumedData: true}))
processorhelper.WithCapabilities(consumer.Capabilities{MutatesData: true}))
}
func (tsp *tracesamplerprocessor) ProcessTraces(_ context.Context, td pdata.Traces) (pdata.Traces, error) {

View File

@ -76,7 +76,7 @@ func WithShutdown(shutdown componenthelper.ShutdownFunc) Option {
// WithCapabilities overrides the default GetCapabilities function for an processor.
// The default GetCapabilities function returns mutable capabilities.
func WithCapabilities(capabilities component.ProcessorCapabilities) Option {
func WithCapabilities(capabilities consumer.Capabilities) Option {
return func(o *baseSettings) {
o.capabilities = capabilities
}
@ -84,14 +84,14 @@ func WithCapabilities(capabilities component.ProcessorCapabilities) Option {
type baseSettings struct {
componentOptions []componenthelper.Option
capabilities component.ProcessorCapabilities
capabilities consumer.Capabilities
}
// fromOptions returns the internal settings starting from the default and applying all options.
func fromOptions(options []Option) *baseSettings {
// Start from the default options:
opts := &baseSettings{
capabilities: component.ProcessorCapabilities{MutatesConsumedData: true},
capabilities: consumer.Capabilities{MutatesData: true},
}
for _, op := range options {
@ -104,7 +104,7 @@ func fromOptions(options []Option) *baseSettings {
// internalOptions contains internalOptions concerning how an Processor is configured.
type baseProcessor struct {
component.Component
capabilities component.ProcessorCapabilities
capabilities consumer.Capabilities
traceAttributes []trace.Attribute
}
@ -122,7 +122,7 @@ func newBaseProcessor(id config.ComponentID, options ...Option) baseProcessor {
return be
}
func (bp *baseProcessor) GetCapabilities() component.ProcessorCapabilities {
func (bp *baseProcessor) Capabilities() consumer.Capabilities {
return bp.capabilities
}

View File

@ -26,6 +26,7 @@ import (
"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/testdata"
@ -35,7 +36,7 @@ var testCfg = config.NewProcessorSettings(config.NewID(typeStr))
func TestDefaultOptions(t *testing.T) {
bp := newBaseProcessor(config.NewID(typeStr))
assert.True(t, bp.GetCapabilities().MutatesConsumedData)
assert.True(t, bp.Capabilities().MutatesData)
assert.NoError(t, bp.Start(context.Background(), componenttest.NewNopHost()))
assert.NoError(t, bp.Shutdown(context.Background()))
}
@ -45,10 +46,10 @@ func TestWithOptions(t *testing.T) {
bp := newBaseProcessor(config.NewID(typeStr),
WithStart(func(context.Context, component.Host) error { return want }),
WithShutdown(func(context.Context) error { return want }),
WithCapabilities(component.ProcessorCapabilities{MutatesConsumedData: false}))
WithCapabilities(consumer.Capabilities{MutatesData: false}))
assert.Equal(t, want, bp.Start(context.Background(), componenttest.NewNopHost()))
assert.Equal(t, want, bp.Shutdown(context.Background()))
assert.False(t, bp.GetCapabilities().MutatesConsumedData)
assert.False(t, bp.Capabilities().MutatesData)
}
func TestNewTracesProcessor(t *testing.T) {

View File

@ -29,7 +29,7 @@ const (
typeStr = "resource"
)
var processorCapabilities = component.ProcessorCapabilities{MutatesConsumedData: true}
var processorCapabilities = consumer.Capabilities{MutatesData: true}
// NewFactory returns a new factory for the Resource processor.
func NewFactory() component.ProcessorFactory {

View File

@ -23,6 +23,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/testdata"
"go.opentelemetry.io/collector/processor/processorhelper"
@ -97,68 +98,74 @@ func TestResourceProcessorAttributesUpsert(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Test trace consumer
ttn := &testTraceConsumer{}
ttn := new(consumertest.TracesSink)
factory := NewFactory()
rtp, err := factory.CreateTracesProcessor(context.Background(), component.ProcessorCreateParams{}, tt.config, ttn)
require.NoError(t, err)
assert.True(t, rtp.GetCapabilities().MutatesConsumedData)
assert.True(t, rtp.Capabilities().MutatesData)
sourceTraceData := generateTraceData(tt.sourceAttributes)
wantTraceData := generateTraceData(tt.wantAttributes)
err = rtp.ConsumeTraces(context.Background(), sourceTraceData)
require.NoError(t, err)
assert.EqualValues(t, wantTraceData, ttn.td)
traces := ttn.AllTraces()
require.Len(t, traces, 1)
traces[0].ResourceSpans().At(0).Resource().Attributes().Sort()
assert.EqualValues(t, wantTraceData, traces[0])
// Test metrics consumer
tmn := &testMetricsConsumer{}
tmn := new(consumertest.MetricsSink)
rmp, err := factory.CreateMetricsProcessor(context.Background(), component.ProcessorCreateParams{}, tt.config, tmn)
require.NoError(t, err)
assert.True(t, rtp.GetCapabilities().MutatesConsumedData)
assert.True(t, rtp.Capabilities().MutatesData)
sourceMetricData := generateMetricData(tt.sourceAttributes)
wantMetricData := generateMetricData(tt.wantAttributes)
err = rmp.ConsumeMetrics(context.Background(), sourceMetricData)
require.NoError(t, err)
assert.EqualValues(t, wantMetricData, tmn.md)
metrics := tmn.AllMetrics()
require.Len(t, metrics, 1)
metrics[0].ResourceMetrics().At(0).Resource().Attributes().Sort()
assert.EqualValues(t, wantMetricData, metrics[0])
// Test logs consumer
tln := &testLogsConsumer{}
tln := new(consumertest.LogsSink)
rlp, err := factory.CreateLogsProcessor(context.Background(), component.ProcessorCreateParams{}, tt.config, tln)
require.NoError(t, err)
assert.True(t, rtp.GetCapabilities().MutatesConsumedData)
assert.True(t, rtp.Capabilities().MutatesData)
sourceLogData := generateLogData(tt.sourceAttributes)
wantLogData := generateLogData(tt.wantAttributes)
err = rlp.ConsumeLogs(context.Background(), sourceLogData)
require.NoError(t, err)
assert.EqualValues(t, wantLogData, tln.ld)
logs := tln.AllLogs()
require.Len(t, logs, 1)
logs[0].ResourceLogs().At(0).Resource().Attributes().Sort()
assert.EqualValues(t, wantLogData, logs[0])
})
}
}
func TestResourceProcessorError(t *testing.T) {
ttn := &testTraceConsumer{}
badCfg := &Config{
ProcessorSettings: config.NewProcessorSettings(config.NewID(typeStr)),
AttributesActions: nil,
}
// Test traces consumer
factory := NewFactory()
rtp, err := factory.CreateTracesProcessor(context.Background(), component.ProcessorCreateParams{}, badCfg, ttn)
rtp, err := factory.CreateTracesProcessor(context.Background(), component.ProcessorCreateParams{}, badCfg, consumertest.NewNop())
require.Error(t, err)
require.Nil(t, rtp)
// Test metrics consumer
tmn := &testMetricsConsumer{}
rmp, err := factory.CreateMetricsProcessor(context.Background(), component.ProcessorCreateParams{}, badCfg, tmn)
rmp, err := factory.CreateMetricsProcessor(context.Background(), component.ProcessorCreateParams{}, badCfg, consumertest.NewNop())
require.Error(t, err)
require.Nil(t, rmp)
// Test logs consumer
tln := &testLogsConsumer{}
rlp, err := factory.CreateLogsProcessor(context.Background(), component.ProcessorCreateParams{}, badCfg, tln)
rlp, err := factory.CreateLogsProcessor(context.Background(), component.ProcessorCreateParams{}, badCfg, consumertest.NewNop())
require.Error(t, err)
require.Nil(t, rlp)
}
@ -201,42 +208,3 @@ func generateLogData(attributes map[string]string) pdata.Logs {
resource.Attributes().Sort()
return ld
}
type testTraceConsumer struct {
td pdata.Traces
}
func (ttn *testTraceConsumer) ConsumeTraces(_ context.Context, td pdata.Traces) error {
// sort attributes to be able to compare traces
for i := 0; i < td.ResourceSpans().Len(); i++ {
td.ResourceSpans().At(i).Resource().Attributes().Sort()
}
ttn.td = td
return nil
}
type testMetricsConsumer struct {
md pdata.Metrics
}
func (tmn *testMetricsConsumer) ConsumeMetrics(_ context.Context, md pdata.Metrics) error {
// sort attributes to be able to compare traces
for i := 0; i < md.ResourceMetrics().Len(); i++ {
md.ResourceMetrics().At(i).Resource().Attributes().Sort()
}
tmn.md = md
return nil
}
type testLogsConsumer struct {
ld pdata.Logs
}
func (tln *testLogsConsumer) ConsumeLogs(_ context.Context, ld pdata.Logs) error {
// sort attributes to be able to compare traces
for i := 0; i < ld.ResourceLogs().Len(); i++ {
ld.ResourceLogs().At(i).Resource().Attributes().Sort()
}
tln.ld = ld
return nil
}

View File

@ -29,7 +29,7 @@ const (
typeStr = "span"
)
var processorCapabilities = component.ProcessorCapabilities{MutatesConsumedData: true}
var processorCapabilities = consumer.Capabilities{MutatesData: true}
// errMissingRequiredField is returned when a required field in the config
// is not specified.

View File

@ -29,6 +29,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal"
@ -278,7 +279,11 @@ type notifyingSink struct {
ch chan int
}
func (s *notifyingSink) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error {
func (s *notifyingSink) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}
func (s *notifyingSink) ConsumeMetrics(_ context.Context, md pdata.Metrics) error {
if md.MetricCount() > 0 {
s.receivedMetrics = true
}

View File

@ -47,6 +47,7 @@ import (
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/testutil"
@ -67,6 +68,10 @@ type traceConsumer struct {
cb func(context.Context, pdata.Traces)
}
func (t traceConsumer) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}
func (t traceConsumer) ConsumeTraces(ctx context.Context, td pdata.Traces) error {
go t.cb(ctx, td)
return nil

View File

@ -515,6 +515,10 @@ type zipkinMockTraceConsumer struct {
err error
}
func (m *zipkinMockTraceConsumer) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}
func (m *zipkinMockTraceConsumer) ConsumeTraces(_ context.Context, td pdata.Traces) error {
m.ch <- td
return m.err

View File

@ -36,9 +36,9 @@ type builtPipeline struct {
firstMC consumer.Metrics
firstLC consumer.Logs
// MutatesConsumedData is set to true if any processors in the pipeline
// MutatesData is set to true if any processors in the pipeline
// can mutate the TraceData or MetricsData input argument.
MutatesConsumedData bool
MutatesData bool
processors []component.Processor
}
@ -161,7 +161,7 @@ func (pb *pipelinesBuilder) buildPipeline(ctx context.Context, pipelineCfg *conf
var proc component.TracesProcessor
proc, err = factory.CreateTracesProcessor(ctx, creationParams, procCfg, tc)
if proc != nil {
mutatesConsumedData = mutatesConsumedData || proc.GetCapabilities().MutatesConsumedData
mutatesConsumedData = mutatesConsumedData || proc.Capabilities().MutatesData
}
processors[i] = proc
tc = proc
@ -169,7 +169,7 @@ func (pb *pipelinesBuilder) buildPipeline(ctx context.Context, pipelineCfg *conf
var proc component.MetricsProcessor
proc, err = factory.CreateMetricsProcessor(ctx, creationParams, procCfg, mc)
if proc != nil {
mutatesConsumedData = mutatesConsumedData || proc.GetCapabilities().MutatesConsumedData
mutatesConsumedData = mutatesConsumedData || proc.Capabilities().MutatesData
}
processors[i] = proc
mc = proc
@ -178,7 +178,7 @@ func (pb *pipelinesBuilder) buildPipeline(ctx context.Context, pipelineCfg *conf
var proc component.LogsProcessor
proc, err = factory.CreateLogsProcessor(ctx, creationParams, procCfg, lc)
if proc != nil {
mutatesConsumedData = mutatesConsumedData || proc.GetCapabilities().MutatesConsumedData
mutatesConsumedData = mutatesConsumedData || proc.Capabilities().MutatesData
}
processors[i] = proc
lc = proc

View File

@ -281,7 +281,7 @@ func buildFanoutTraceConsumer(pipelines []*builtPipeline) consumer.Traces {
anyPipelineMutatesData := false
for _, pipeline := range pipelines {
pipelineConsumers = append(pipelineConsumers, pipeline.firstTC)
anyPipelineMutatesData = anyPipelineMutatesData || pipeline.MutatesConsumedData
anyPipelineMutatesData = anyPipelineMutatesData || pipeline.MutatesData
}
// Create a junction point that fans out to all pipelines.
@ -306,7 +306,7 @@ func buildFanoutMetricConsumer(pipelines []*builtPipeline) consumer.Metrics {
anyPipelineMutatesData := false
for _, pipeline := range pipelines {
pipelineConsumers = append(pipelineConsumers, pipeline.firstMC)
anyPipelineMutatesData = anyPipelineMutatesData || pipeline.MutatesConsumedData
anyPipelineMutatesData = anyPipelineMutatesData || pipeline.MutatesData
}
// Create a junction point that fans out to all pipelines.
@ -331,7 +331,7 @@ func buildFanoutLogConsumer(pipelines []*builtPipeline) consumer.Logs {
anyPipelineMutatesData := false
for _, pipeline := range pipelines {
pipelineConsumers = append(pipelineConsumers, pipeline.firstLC)
anyPipelineMutatesData = anyPipelineMutatesData || pipeline.MutatesConsumedData
anyPipelineMutatesData = anyPipelineMutatesData || pipeline.MutatesData
}
// Create a junction point that fans out to all pipelines.

View File

@ -91,12 +91,12 @@ type SummaryPipelinesTableData struct {
// SummaryPipelinesTableRowData contains data for one row in pipelines summary table template.
type SummaryPipelinesTableRowData struct {
FullName string
InputType string
MutatesConsumedData bool
Receivers []string
Processors []string
Exporters []string
FullName string
InputType string
MutatesData bool
Receivers []string
Processors []string
Exporters []string
}
// WriteHTMLPipelinesSummaryTable writes the summary table for one component type (receivers, processors, exporters).

View File

@ -4,7 +4,7 @@
<td>&nbsp;&nbsp;|&nbsp;&nbsp;</td>
<td colspan=1 align=center><b>InputType</b></td>
<td>&nbsp;&nbsp;|&nbsp;&nbsp;</td>
<td colspan=1 align=center><b>MutatesConsumedData</b></td>
<td colspan=1 align=center><b>MutatesData</b></td>
<td>&nbsp;&nbsp;|&nbsp;&nbsp;</td>
<td colspan=1 align=center><b>Receivers</b></td>
<td>&nbsp;&nbsp;|&nbsp;&nbsp;</td>
@ -20,7 +20,7 @@
<tr>{{end -}}
<td>{{$row.FullName}}</td><td>&nbsp;&nbsp;|&nbsp;&nbsp;</td>
<td>{{$row.InputType}}</td><td>&nbsp;&nbsp;|&nbsp;&nbsp;</td>
<td>{{$row.MutatesConsumedData}}</td><td>&nbsp;&nbsp;|&nbsp;&nbsp;</td>
<td>{{$row.MutatesData}}</td><td>&nbsp;&nbsp;|&nbsp;&nbsp;</td>
<td align="center">
{{range $recindex, $rec := $row.Receivers}}
<a href="{{$a}}?zpipelinename={{$row.FullName}}&zcomponentname={{$rec}}&zcomponentkind=receiver">{{$rec}}</a>

View File

@ -62,12 +62,12 @@ func TestNoCrash(t *testing.T) {
WriteHTMLPipelinesSummaryTable(buf, SummaryPipelinesTableData{
ComponentEndpoint: "pagez",
Rows: []SummaryPipelinesTableRowData{{
FullName: "test",
InputType: "metrics",
MutatesConsumedData: false,
Receivers: []string{"oc"},
Processors: []string{"nop"},
Exporters: []string{"oc"},
FullName: "test",
InputType: "metrics",
MutatesData: false,
Receivers: []string{"oc"},
Processors: []string{"nop"},
Exporters: []string{"oc"},
}},
})
})

View File

@ -275,17 +275,17 @@ vuDEoocBiqjF/5RszGuV1uhFsCujl0bMC/Vz62vzZe1hY98DAAD//7qRGmLTAQAA
"/templates/pipelines_table.html": {
name: "pipelines_table.html",
local: "../templates/pipelines_table.html",
size: 1946,
size: 1930,
modtime: 0,
compressed: `
H4sIAAAAAAAC/7SVwXLTMBCG7zyFxnRyIjVcU1scSpnhAMN0eAFZ2gRNlZVmJbdujd+dsWyrTp0LtL5k
rOjX/tlv/8hFEJUB5sOjgTKrLCmgrXdCajzs2MeMv2OMsSLQ8DAsFJPWeCew/MSE0QcsDewDLyr+tTbm
hzhCkVe8yIM6OcU3WHl3NXz+mS8W0oWBBAxAvcU3dHX49ejW9PheBxHAX1v09RHUFxHEim63IEHfA/kV
PX6SleC9XdXkpnGWwqKRIp/i07YXgu1Kdnltj84iYLhB5azG0HWjgAQegF2QfdCooPkQH+OZW/vgR9kg
3TK9Z3AP+Cyf7Y+5TdEW8u5Atka1Y+8BIOOzSmA8LI/ytgVUbDvb6VG1bW93OUW962Kn/wZxKpLC/Koq
Z+L6X/XGgWbDRGeETkcDMo0GZD+a+CNSil+AjLUF+02wL7M+AF33+clpB0YjoDhCuQC6eZJTPpIA5Mn3
dxpVSaNlxidFkQu+dK/oZSuAaj7VN0a1IUF0dZ6eIzvRc2QTvef/5yr4HNklPjd5Rn5Rcpbf2XbWJZhw
QeMmXNC4hCvdNKvQgsYtacFoGWFFxSvCNl+lu3HQFXl8JfO/AQAA//9We3KLmgcAAA==
H4sIAAAAAAAC/7SVwXLTMBCG7zyFxnRyIjVcU1scoMxwgGE6vIAsbYKmykqzklu3xu/OWLZVp84Fgi8Z
Kfq1/+y3f5QiiMoA8+HJQJlVlhTQ1jshNR527H3G3zDGWBFoWAwbxaQ13gksPzBh9AFLA/vAi4p/qY35
Lo5Q5BUv8qBObvENVt7dDJ+/55uFdGEgAQNQb/EVXR1+Prk1Pb7VQQTwn0UQK7rcgQT9AORX9PhBVoL3
dlWT28ZZCotGinyKTdteCbYr2fUne3QWAcMtKmc1hq4bBSTwAOyK7KNGBc27uIx37uyjH2WDdMv0nsED
4It8dj7mNUVayPsD2RrVjr0FgIzPKoHxsLzK2xZQse3spEfVtr3d9RTxroud/h3EqUgK8UVVZjH9pzrj
ILNhkjMypyMBmUYCsh9JNE/pfQUw1hbsF8G+zPrBd93HZ6cdGI2A4gjlAuTmWU65SAKQJ9/fa1QljZYZ
nxRFLvjSvaLXrQCq+TT/M6oNCaKb8/Qc2YmeI5vovfwuV8HnyC7xuckz8ouSs/zOtrMuwYQLGjfhgsYl
XOmFWYUWNG5JC0bLCCsqLgjbfJfexEFX5PEvmP8JAAD//50711CKBwAA
`,
},

View File

@ -99,12 +99,12 @@ func (srv *service) getPipelinesSummaryTableData() zpages.SummaryPipelinesTableD
exps = append(exps, expID.String())
}
row := zpages.SummaryPipelinesTableRowData{
FullName: c.Name,
InputType: string(c.InputType),
MutatesConsumedData: p.MutatesConsumedData,
Receivers: recvs,
Processors: procs,
Exporters: exps,
FullName: c.Name,
InputType: string(c.InputType),
MutatesData: p.MutatesData,
Receivers: recvs,
Processors: procs,
Exporters: exps,
}
data.Rows = append(data.Rows, row)
}

View File

@ -20,6 +20,7 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/testbed/testbed"
)
@ -59,6 +60,10 @@ func newTestHarness(
}
}
func (h *testHarness) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}
func (h *testHarness) ConsumeMetrics(_ context.Context, pdm pdata.Metrics) error {
h.compare(pdm)
if h.metricIndex.allReceived() {

View File

@ -165,6 +165,10 @@ type MockTraceConsumer struct {
backend *MockBackend
}
func (tc *MockTraceConsumer) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}
func (tc *MockTraceConsumer) ConsumeTraces(_ context.Context, td pdata.Traces) error {
tc.numSpansReceived.Add(uint64(td.SpanCount()))
@ -208,6 +212,10 @@ type MockMetricConsumer struct {
backend *MockBackend
}
func (mc *MockMetricConsumer) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}
func (mc *MockMetricConsumer) ConsumeMetrics(_ context.Context, md pdata.Metrics) error {
_, dataPoints := md.MetricAndDataPointCount()
mc.numMetricsReceived.Add(uint64(dataPoints))
@ -230,9 +238,13 @@ type MockLogConsumer struct {
backend *MockBackend
}
func (mc *MockLogConsumer) ConsumeLogs(_ context.Context, ld pdata.Logs) error {
func (lc *MockLogConsumer) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}
func (lc *MockLogConsumer) ConsumeLogs(_ context.Context, ld pdata.Logs) error {
recordCount := ld.LogRecordCount()
mc.numLogRecordsReceived.Add(uint64(recordCount))
mc.backend.ConsumeLogs(ld)
lc.numLogRecordsReceived.Add(uint64(recordCount))
lc.backend.ConsumeLogs(ld)
return nil
}