Rename processor component.TraceProcessor to component.TracesProcessor, and equivalent Create method feature request (#2026)
Fixes #1975 Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
This commit is contained in:
parent
1ec1aa6975
commit
a3fa0a9166
|
|
@ -9,6 +9,7 @@
|
|||
## 🛑 Breaking changes 🛑
|
||||
|
||||
- Rename consumer.TraceConsumer to consumer.TracesConsumer #1974
|
||||
- Rename component.TraceProcessor to component.TracesProcessor #1976
|
||||
|
||||
## v0.13.0 Beta
|
||||
|
||||
|
|
|
|||
|
|
@ -385,7 +385,7 @@ func (f *ExampleProcessorFactory) CreateDefaultConfig() configmodels.Processor {
|
|||
}
|
||||
|
||||
// CreateTraceProcessor creates a trace processor based on this config.
|
||||
func (f *ExampleProcessorFactory) CreateTraceProcessor(ctx context.Context, params component.ProcessorCreateParams, cfg configmodels.Processor, nextConsumer consumer.TracesConsumer) (component.TraceProcessor, error) {
|
||||
func (f *ExampleProcessorFactory) CreateTracesProcessor(ctx context.Context, params component.ProcessorCreateParams, cfg configmodels.Processor, nextConsumer consumer.TracesConsumer) (component.TracesProcessor, error) {
|
||||
return &ExampleProcessor{nextTraces: nextConsumer}, nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -23,7 +23,7 @@ import (
|
|||
"go.opentelemetry.io/collector/consumer"
|
||||
)
|
||||
|
||||
// Processor defines the common functions that must be implemented by TraceProcessor
|
||||
// Processor defines the common functions that must be implemented by TracesProcessor
|
||||
// and MetricsProcessor.
|
||||
type Processor interface {
|
||||
Component
|
||||
|
|
@ -32,8 +32,8 @@ type Processor interface {
|
|||
GetCapabilities() ProcessorCapabilities
|
||||
}
|
||||
|
||||
// TraceProcessor is a processor that can consume traces.
|
||||
type TraceProcessor interface {
|
||||
// TracesProcessor is a processor that can consume traces.
|
||||
type TracesProcessor interface {
|
||||
Processor
|
||||
consumer.TracesConsumer
|
||||
}
|
||||
|
|
@ -87,12 +87,12 @@ type ProcessorFactory interface {
|
|||
// CreateTraceProcessor creates a trace processor based on this config.
|
||||
// If the processor type does not support tracing or if the config is not valid
|
||||
// error will be returned instead.
|
||||
CreateTraceProcessor(
|
||||
CreateTracesProcessor(
|
||||
ctx context.Context,
|
||||
params ProcessorCreateParams,
|
||||
cfg configmodels.Processor,
|
||||
nextConsumer consumer.TracesConsumer,
|
||||
) (TraceProcessor, error)
|
||||
) (TracesProcessor, error)
|
||||
|
||||
// CreateMetricsProcessor creates a metrics processor based on this config.
|
||||
// If the processor type does not support metrics or if the config is not valid
|
||||
|
|
|
|||
|
|
@ -40,7 +40,7 @@ func (f *TestProcessorFactory) CreateDefaultConfig() configmodels.Processor {
|
|||
}
|
||||
|
||||
// CreateTraceProcessor creates a trace processor based on this config.
|
||||
func (f *TestProcessorFactory) CreateTraceProcessor(context.Context, ProcessorCreateParams, configmodels.Processor, consumer.TracesConsumer) (TraceProcessor, error) {
|
||||
func (f *TestProcessorFactory) CreateTracesProcessor(context.Context, ProcessorCreateParams, configmodels.Processor, consumer.TracesConsumer) (TracesProcessor, error) {
|
||||
return nil, configerror.ErrDataTypeIsNotSupported
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -41,7 +41,7 @@ type testCase struct {
|
|||
}
|
||||
|
||||
// runIndividualTestCase is the common logic of passing trace data through a configured attributes processor.
|
||||
func runIndividualTestCase(t *testing.T, tt testCase, tp component.TraceProcessor) {
|
||||
func runIndividualTestCase(t *testing.T, tt testCase, tp component.TracesProcessor) {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
td := generateTraceData(tt.serviceName, tt.name, tt.inputAttributes)
|
||||
assert.NoError(t, tp.ConsumeTraces(context.Background(), td))
|
||||
|
|
@ -143,7 +143,7 @@ func TestSpanProcessor_NilEmptyData(t *testing.T) {
|
|||
{Key: "attribute1", Action: processorhelper.DELETE},
|
||||
}
|
||||
|
||||
tp, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, oCfg, consumertest.NewTracesNop())
|
||||
tp, err := factory.CreateTracesProcessor(context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, oCfg, consumertest.NewTracesNop())
|
||||
require.Nil(t, err)
|
||||
require.NotNil(t, tp)
|
||||
for i := range testCases {
|
||||
|
|
@ -210,7 +210,7 @@ func TestAttributes_FilterSpans(t *testing.T) {
|
|||
},
|
||||
Config: *createConfig(filterset.Strict),
|
||||
}
|
||||
tp, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewTracesNop())
|
||||
tp, err := factory.CreateTracesProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewTracesNop())
|
||||
require.Nil(t, err)
|
||||
require.NotNil(t, tp)
|
||||
|
||||
|
|
@ -278,7 +278,7 @@ func TestAttributes_FilterSpansByNameStrict(t *testing.T) {
|
|||
SpanNames: []string{"dont_apply"},
|
||||
Config: *createConfig(filterset.Strict),
|
||||
}
|
||||
tp, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewTracesNop())
|
||||
tp, err := factory.CreateTracesProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewTracesNop())
|
||||
require.Nil(t, err)
|
||||
require.NotNil(t, tp)
|
||||
|
||||
|
|
@ -346,7 +346,7 @@ func TestAttributes_FilterSpansByNameRegexp(t *testing.T) {
|
|||
SpanNames: []string{".*dont_apply$"},
|
||||
Config: *createConfig(filterset.Regexp),
|
||||
}
|
||||
tp, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewTracesNop())
|
||||
tp, err := factory.CreateTracesProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewTracesNop())
|
||||
require.Nil(t, err)
|
||||
require.NotNil(t, tp)
|
||||
|
||||
|
|
@ -405,7 +405,7 @@ func TestAttributes_Hash(t *testing.T) {
|
|||
{Key: "user.authenticated", Action: processorhelper.HASH},
|
||||
}
|
||||
|
||||
tp, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewTracesNop())
|
||||
tp, err := factory.CreateTracesProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewTracesNop())
|
||||
require.Nil(t, err)
|
||||
require.NotNil(t, tp)
|
||||
|
||||
|
|
@ -449,7 +449,7 @@ func BenchmarkAttributes_FilterSpansByName(b *testing.B) {
|
|||
oCfg.Include = &filterconfig.MatchProperties{
|
||||
SpanNames: []string{"^apply.*"},
|
||||
}
|
||||
tp, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewTracesNop())
|
||||
tp, err := factory.CreateTracesProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewTracesNop())
|
||||
require.Nil(b, err)
|
||||
require.NotNil(b, tp)
|
||||
|
||||
|
|
|
|||
|
|
@ -57,7 +57,7 @@ func createTraceProcessor(
|
|||
_ component.ProcessorCreateParams,
|
||||
cfg configmodels.Processor,
|
||||
nextConsumer consumer.TracesConsumer,
|
||||
) (component.TraceProcessor, error) {
|
||||
) (component.TracesProcessor, error) {
|
||||
oCfg := cfg.(*Config)
|
||||
if len(oCfg.Actions) == 0 {
|
||||
return nil, fmt.Errorf("error creating \"attributes\" processor due to missing required field \"actions\" of processor %q", cfg.Name())
|
||||
|
|
|
|||
|
|
@ -49,7 +49,7 @@ func TestFactory_CreateDefaultConfig(t *testing.T) {
|
|||
func TestFactoryCreateTraceProcessor_EmptyActions(t *testing.T) {
|
||||
factory := NewFactory()
|
||||
cfg := factory.CreateDefaultConfig()
|
||||
ap, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewTracesNop())
|
||||
ap, err := factory.CreateTracesProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewTracesNop())
|
||||
assert.Error(t, err)
|
||||
assert.Nil(t, ap)
|
||||
}
|
||||
|
|
@ -62,7 +62,7 @@ func TestFactoryCreateTraceProcessor_InvalidActions(t *testing.T) {
|
|||
oCfg.Actions = []processorhelper.ActionKeyValue{
|
||||
{Key: "", Value: 123, Action: processorhelper.UPSERT},
|
||||
}
|
||||
ap, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewTracesNop())
|
||||
ap, err := factory.CreateTracesProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewTracesNop())
|
||||
assert.Error(t, err)
|
||||
assert.Nil(t, ap)
|
||||
}
|
||||
|
|
@ -75,18 +75,18 @@ func TestFactoryCreateTraceProcessor(t *testing.T) {
|
|||
{Key: "a key", Action: processorhelper.DELETE},
|
||||
}
|
||||
|
||||
tp, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewTracesNop())
|
||||
tp, err := factory.CreateTracesProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewTracesNop())
|
||||
assert.NotNil(t, tp)
|
||||
assert.NoError(t, err)
|
||||
|
||||
tp, err = factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, nil)
|
||||
tp, err = factory.CreateTracesProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, nil)
|
||||
assert.Nil(t, tp)
|
||||
assert.Error(t, err)
|
||||
|
||||
oCfg.Actions = []processorhelper.ActionKeyValue{
|
||||
{Action: processorhelper.DELETE},
|
||||
}
|
||||
tp, err = factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewTracesNop())
|
||||
tp, err = factory.CreateTracesProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewTracesNop())
|
||||
assert.Nil(t, tp)
|
||||
assert.Error(t, err)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -168,7 +168,7 @@ func (bp *batchProcessor) sendItems(measure *stats.Int64Measure) {
|
|||
bp.batch.reset()
|
||||
}
|
||||
|
||||
// ConsumeTraces implements TraceProcessor
|
||||
// ConsumeTraces implements TracesProcessor
|
||||
func (bp *batchProcessor) ConsumeTraces(_ context.Context, td pdata.Traces) error {
|
||||
bp.newItem <- td
|
||||
return nil
|
||||
|
|
|
|||
|
|
@ -59,7 +59,7 @@ func createTraceProcessor(
|
|||
params component.ProcessorCreateParams,
|
||||
cfg configmodels.Processor,
|
||||
nextConsumer consumer.TracesConsumer,
|
||||
) (component.TraceProcessor, error) {
|
||||
) (component.TracesProcessor, error) {
|
||||
oCfg := cfg.(*Config)
|
||||
// error can be ignored, level is parsed at the service startup
|
||||
level, _ := telemetry.GetLevel()
|
||||
|
|
|
|||
|
|
@ -38,7 +38,7 @@ func TestCreateProcessor(t *testing.T) {
|
|||
|
||||
cfg := factory.CreateDefaultConfig()
|
||||
creationParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
|
||||
tp, err := factory.CreateTraceProcessor(context.Background(), creationParams, cfg, nil)
|
||||
tp, err := factory.CreateTracesProcessor(context.Background(), creationParams, cfg, nil)
|
||||
assert.NotNil(t, tp)
|
||||
assert.NoError(t, err, "cannot create trace processor")
|
||||
|
||||
|
|
|
|||
|
|
@ -80,7 +80,7 @@ func TestCreateProcessors(t *testing.T) {
|
|||
t.Run(fmt.Sprintf("%s/%s", test.configName, name), func(t *testing.T) {
|
||||
factory := NewFactory()
|
||||
|
||||
tp, tErr := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, cfg, consumertest.NewTracesNop())
|
||||
tp, tErr := factory.CreateTracesProcessor(context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, cfg, consumertest.NewTracesNop())
|
||||
// Not implemented error
|
||||
assert.NotNil(t, tErr)
|
||||
assert.Nil(t, tp)
|
||||
|
|
|
|||
|
|
@ -56,7 +56,7 @@ func createTraceProcessor(
|
|||
params component.ProcessorCreateParams,
|
||||
cfg configmodels.Processor,
|
||||
nextConsumer consumer.TracesConsumer,
|
||||
) (component.TraceProcessor, error) {
|
||||
) (component.TracesProcessor, error) {
|
||||
ml, err := newMemoryLimiter(params.Logger, cfg.(*Config))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
|
|||
|
|
@ -44,7 +44,7 @@ func TestCreateProcessor(t *testing.T) {
|
|||
cfg := factory.CreateDefaultConfig()
|
||||
|
||||
// This processor can't be created with the default config.
|
||||
tp, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, cfg, consumertest.NewTracesNop())
|
||||
tp, err := factory.CreateTracesProcessor(context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, cfg, consumertest.NewTracesNop())
|
||||
assert.Nil(t, tp)
|
||||
assert.Error(t, err, "created processor with invalid settings")
|
||||
|
||||
|
|
@ -63,7 +63,7 @@ func TestCreateProcessor(t *testing.T) {
|
|||
pCfg.BallastSizeMiB = 2048
|
||||
pCfg.CheckInterval = 100 * time.Millisecond
|
||||
|
||||
tp, err = factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, cfg, consumertest.NewTracesNop())
|
||||
tp, err = factory.CreateTracesProcessor(context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, cfg, consumertest.NewTracesNop())
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, tp)
|
||||
assert.NoError(t, tp.Shutdown(context.Background()))
|
||||
|
|
|
|||
|
|
@ -31,8 +31,8 @@ type FactoryOption func(o *factory)
|
|||
// CreateDefaultConfig is the equivalent of component.ProcessorFactory.CreateDefaultConfig()
|
||||
type CreateDefaultConfig func() configmodels.Processor
|
||||
|
||||
// CreateTraceProcessor is the equivalent of component.ProcessorFactory.CreateTraceProcessor()
|
||||
type CreateTraceProcessor func(context.Context, component.ProcessorCreateParams, configmodels.Processor, consumer.TracesConsumer) (component.TraceProcessor, error)
|
||||
// CreateTraceProcessor is the equivalent of component.ProcessorFactory.CreateTracesProcessor()
|
||||
type CreateTraceProcessor func(context.Context, component.ProcessorCreateParams, configmodels.Processor, consumer.TracesConsumer) (component.TracesProcessor, error)
|
||||
|
||||
// CreateMetricsProcessor is the equivalent of component.ProcessorFactory.CreateMetricsProcessor()
|
||||
type CreateMetricsProcessor func(context.Context, component.ProcessorCreateParams, configmodels.Processor, consumer.MetricsConsumer) (component.MetricsProcessor, error)
|
||||
|
|
@ -108,8 +108,8 @@ func (f *factory) CreateDefaultConfig() configmodels.Processor {
|
|||
return f.createDefaultConfig()
|
||||
}
|
||||
|
||||
// CreateTraceProcessor creates a component.TraceProcessor based on this config.
|
||||
func (f *factory) CreateTraceProcessor(ctx context.Context, params component.ProcessorCreateParams, cfg configmodels.Processor, nextConsumer consumer.TracesConsumer) (component.TraceProcessor, error) {
|
||||
// CreateTraceProcessor creates a component.TracesProcessor based on this config.
|
||||
func (f *factory) CreateTracesProcessor(ctx context.Context, params component.ProcessorCreateParams, cfg configmodels.Processor, nextConsumer consumer.TracesConsumer) (component.TracesProcessor, error) {
|
||||
if f.createTraceProcessor != nil {
|
||||
return f.createTraceProcessor(ctx, params, cfg, nextConsumer)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -42,7 +42,7 @@ func TestNewTrace(t *testing.T) {
|
|||
assert.EqualValues(t, defaultCfg, factory.CreateDefaultConfig())
|
||||
_, ok := factory.(component.ConfigUnmarshaler)
|
||||
assert.False(t, ok)
|
||||
_, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{}, defaultCfg, nil)
|
||||
_, err := factory.CreateTracesProcessor(context.Background(), component.ProcessorCreateParams{}, defaultCfg, nil)
|
||||
assert.Error(t, err)
|
||||
_, err = factory.CreateMetricsProcessor(context.Background(), component.ProcessorCreateParams{}, defaultCfg, nil)
|
||||
assert.Error(t, err)
|
||||
|
|
@ -65,7 +65,7 @@ func TestNewMetrics_WithConstructors(t *testing.T) {
|
|||
assert.True(t, ok)
|
||||
assert.Equal(t, errors.New("my error"), fu.Unmarshal(nil, nil))
|
||||
|
||||
_, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{}, defaultCfg, nil)
|
||||
_, err := factory.CreateTracesProcessor(context.Background(), component.ProcessorCreateParams{}, defaultCfg, nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
_, err = factory.CreateMetricsProcessor(context.Background(), component.ProcessorCreateParams{}, defaultCfg, nil)
|
||||
|
|
@ -79,7 +79,7 @@ func defaultConfig() configmodels.Processor {
|
|||
return defaultCfg
|
||||
}
|
||||
|
||||
func createTraceProcessor(context.Context, component.ProcessorCreateParams, configmodels.Processor, consumer.TracesConsumer) (component.TraceProcessor, error) {
|
||||
func createTraceProcessor(context.Context, component.ProcessorCreateParams, configmodels.Processor, consumer.TracesConsumer) (component.TracesProcessor, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -37,7 +37,7 @@ type Start func(context.Context, component.Host) error
|
|||
// Shutdown specifies the function invoked when the processor is being shutdown.
|
||||
type Shutdown func(context.Context) error
|
||||
|
||||
// TProcessor is a helper interface that allows avoiding implementing all functions in TraceProcessor by using NewTraceProcessor.
|
||||
// TProcessor is a helper interface that allows avoiding implementing all functions in TracesProcessor by using NewTraceProcessor.
|
||||
type TProcessor interface {
|
||||
// ProcessTraces is a helper function that processes the incoming data and returns the data to be sent to the next component.
|
||||
// If error is returned then returned data are ignored. It MUST not call the next component.
|
||||
|
|
@ -143,14 +143,14 @@ func (mp *tracesProcessor) ConsumeTraces(ctx context.Context, td pdata.Traces) e
|
|||
return mp.nextConsumer.ConsumeTraces(ctx, td)
|
||||
}
|
||||
|
||||
// NewTraceProcessor creates a TraceProcessor that ensure context propagation and the right tags are set.
|
||||
// NewTraceProcessor creates a TracesProcessor that ensure context propagation and the right tags are set.
|
||||
// TODO: Add observability metrics support
|
||||
func NewTraceProcessor(
|
||||
config configmodels.Processor,
|
||||
nextConsumer consumer.TracesConsumer,
|
||||
processor TProcessor,
|
||||
options ...Option,
|
||||
) (component.TraceProcessor, error) {
|
||||
) (component.TracesProcessor, error) {
|
||||
if processor == nil {
|
||||
return nil, errors.New("nil processor")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -56,7 +56,7 @@ func createTraceProcessor(
|
|||
params component.ProcessorCreateParams,
|
||||
cfg configmodels.Processor,
|
||||
nextConsumer consumer.TracesConsumer,
|
||||
) (component.TraceProcessor, error) {
|
||||
) (component.TracesProcessor, error) {
|
||||
params.Logger.Warn("QueuedRetry processor is deprecated. Use exporter's queued retry config.")
|
||||
return newQueuedTracesProcessor(params, nextConsumer, cfg.(*Config)), nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -37,7 +37,7 @@ func TestCreateProcessor(t *testing.T) {
|
|||
cfg := factory.CreateDefaultConfig()
|
||||
creationParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
|
||||
|
||||
tp, err := factory.CreateTraceProcessor(context.Background(), creationParams, cfg, nil)
|
||||
tp, err := factory.CreateTracesProcessor(context.Background(), creationParams, cfg, nil)
|
||||
assert.NotNil(t, tp)
|
||||
assert.NoError(t, err, "cannot create trace processor")
|
||||
|
||||
|
|
|
|||
|
|
@ -58,7 +58,7 @@ func createTraceProcessor(
|
|||
_ context.Context,
|
||||
params component.ProcessorCreateParams,
|
||||
cfg configmodels.Processor,
|
||||
nextConsumer consumer.TracesConsumer) (component.TraceProcessor, error) {
|
||||
nextConsumer consumer.TracesConsumer) (component.TracesProcessor, error) {
|
||||
attrProc, err := createAttrProcessor(cfg.(*Config), params.Logger)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
|
|||
|
|
@ -47,7 +47,7 @@ func TestCreateProcessor(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
tp, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewTracesNop())
|
||||
tp, err := factory.CreateTracesProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewTracesNop())
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, tp)
|
||||
|
||||
|
|
@ -60,7 +60,7 @@ func TestInvalidEmptyActions(t *testing.T) {
|
|||
factory := NewFactory()
|
||||
cfg := factory.CreateDefaultConfig()
|
||||
|
||||
_, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewTracesNop())
|
||||
_, err := factory.CreateTracesProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewTracesNop())
|
||||
assert.Error(t, err)
|
||||
|
||||
_, err = factory.CreateMetricsProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewMetricsNop())
|
||||
|
|
@ -79,7 +79,7 @@ func TestInvalidAttributeActions(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
_, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, nil)
|
||||
_, err := factory.CreateTracesProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, nil)
|
||||
assert.Error(t, err)
|
||||
|
||||
_, err = factory.CreateMetricsProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, nil)
|
||||
|
|
|
|||
|
|
@ -105,7 +105,7 @@ func TestResourceProcessorAttributesUpsert(t *testing.T) {
|
|||
ttn := &testTraceConsumer{}
|
||||
|
||||
factory := NewFactory()
|
||||
rtp, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{}, tt.config, ttn)
|
||||
rtp, err := factory.CreateTracesProcessor(context.Background(), component.ProcessorCreateParams{}, tt.config, ttn)
|
||||
require.NoError(t, err)
|
||||
assert.True(t, rtp.GetCapabilities().MutatesConsumedData)
|
||||
|
||||
|
|
@ -151,7 +151,7 @@ func TestResourceProcessorError(t *testing.T) {
|
|||
}
|
||||
|
||||
factory := NewFactory()
|
||||
rtp, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{}, badCfg, ttn)
|
||||
rtp, err := factory.CreateTracesProcessor(context.Background(), component.ProcessorCreateParams{}, badCfg, ttn)
|
||||
require.Error(t, err)
|
||||
require.Nil(t, rtp)
|
||||
|
||||
|
|
|
|||
|
|
@ -45,13 +45,13 @@ func createDefaultConfig() configmodels.Processor {
|
|||
}
|
||||
}
|
||||
|
||||
// CreateTraceProcessor creates a trace processor based on this config.
|
||||
// CreateTracesProcessor creates a trace processor based on this config.
|
||||
func createTraceProcessor(
|
||||
_ context.Context,
|
||||
_ component.ProcessorCreateParams,
|
||||
cfg configmodels.Processor,
|
||||
nextConsumer consumer.TracesConsumer,
|
||||
) (component.TraceProcessor, error) {
|
||||
) (component.TracesProcessor, error) {
|
||||
oCfg := cfg.(*Config)
|
||||
return newTraceProcessor(nextConsumer, *oCfg)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -54,9 +54,9 @@ type tracesamplerprocessor struct {
|
|||
hashSeed uint32
|
||||
}
|
||||
|
||||
// newTraceProcessor returns a processor.TraceProcessor that will perform head sampling according to the given
|
||||
// newTraceProcessor returns a processor.TracesProcessor that will perform head sampling according to the given
|
||||
// configuration.
|
||||
func newTraceProcessor(nextConsumer consumer.TracesConsumer, cfg Config) (component.TraceProcessor, error) {
|
||||
func newTraceProcessor(nextConsumer consumer.TracesConsumer, cfg Config) (component.TracesProcessor, error) {
|
||||
if nextConsumer == nil {
|
||||
return nil, componenterror.ErrNilNextConsumer
|
||||
}
|
||||
|
|
|
|||
|
|
@ -36,7 +36,7 @@ func TestNewTraceProcessor(t *testing.T) {
|
|||
name string
|
||||
nextConsumer consumer.TracesConsumer
|
||||
cfg Config
|
||||
want component.TraceProcessor
|
||||
want component.TracesProcessor
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
|
|
|
|||
|
|
@ -59,7 +59,7 @@ func createTraceProcessor(
|
|||
_ component.ProcessorCreateParams,
|
||||
cfg configmodels.Processor,
|
||||
nextConsumer consumer.TracesConsumer,
|
||||
) (component.TraceProcessor, error) {
|
||||
) (component.TracesProcessor, error) {
|
||||
|
||||
// 'from_attributes' or 'to_attributes' under 'name' has to be set for the span
|
||||
// processor to be valid. If not set and not enforced, the processor would do no work.
|
||||
|
|
|
|||
|
|
@ -53,7 +53,7 @@ func TestFactory_CreateTraceProcessor(t *testing.T) {
|
|||
|
||||
// Name.FromAttributes field needs to be set for the configuration to be valid.
|
||||
oCfg.Rename.FromAttributes = []string{"test-key"}
|
||||
tp, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, oCfg, consumertest.NewTracesNop())
|
||||
tp, err := factory.CreateTracesProcessor(context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, oCfg, consumertest.NewTracesNop())
|
||||
|
||||
require.Nil(t, err)
|
||||
assert.NotNil(t, tp)
|
||||
|
|
@ -90,7 +90,7 @@ func TestFactory_CreateTraceProcessor_InvalidConfig(t *testing.T) {
|
|||
cfg := factory.CreateDefaultConfig().(*Config)
|
||||
cfg.Rename = test.cfg
|
||||
|
||||
tp, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, cfg, consumertest.NewTracesNop())
|
||||
tp, err := factory.CreateTracesProcessor(context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, cfg, consumertest.NewTracesNop())
|
||||
require.Nil(t, tp)
|
||||
assert.EqualValues(t, err, test.err)
|
||||
})
|
||||
|
|
|
|||
|
|
@ -37,11 +37,11 @@ func TestNewTraceProcessor(t *testing.T) {
|
|||
cfg := factory.CreateDefaultConfig()
|
||||
oCfg := cfg.(*Config)
|
||||
oCfg.Rename.FromAttributes = []string{"foo"}
|
||||
tp, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, nil)
|
||||
tp, err := factory.CreateTracesProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, nil)
|
||||
require.Error(t, componenterror.ErrNilNextConsumer, err)
|
||||
require.Nil(t, tp)
|
||||
|
||||
tp, err = factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewTracesNop())
|
||||
tp, err = factory.CreateTracesProcessor(context.Background(), component.ProcessorCreateParams{}, cfg, consumertest.NewTracesNop())
|
||||
require.Nil(t, err)
|
||||
require.NotNil(t, tp)
|
||||
}
|
||||
|
|
@ -56,7 +56,7 @@ type testCase struct {
|
|||
}
|
||||
|
||||
// runIndividualTestCase is the common logic of passing trace data through a configured attributes processor.
|
||||
func runIndividualTestCase(t *testing.T, tt testCase, tp component.TraceProcessor) {
|
||||
func runIndividualTestCase(t *testing.T, tt testCase, tp component.TracesProcessor) {
|
||||
t.Run(tt.inputName, func(t *testing.T) {
|
||||
td := generateTraceData(tt.serviceName, tt.inputName, tt.inputAttributes)
|
||||
|
||||
|
|
@ -156,7 +156,7 @@ func TestSpanProcessor_NilEmptyData(t *testing.T) {
|
|||
}
|
||||
oCfg.Rename.FromAttributes = []string{"key"}
|
||||
|
||||
tp, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, oCfg, consumertest.NewTracesNop())
|
||||
tp, err := factory.CreateTracesProcessor(context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, oCfg, consumertest.NewTracesNop())
|
||||
require.Nil(t, err)
|
||||
require.NotNil(t, tp)
|
||||
for i := range testCases {
|
||||
|
|
@ -260,7 +260,7 @@ func TestSpanProcessor_Values(t *testing.T) {
|
|||
oCfg := cfg.(*Config)
|
||||
oCfg.Rename.FromAttributes = []string{"key1"}
|
||||
|
||||
tp, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, oCfg, consumertest.NewTracesNop())
|
||||
tp, err := factory.CreateTracesProcessor(context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, oCfg, consumertest.NewTracesNop())
|
||||
require.Nil(t, err)
|
||||
require.NotNil(t, tp)
|
||||
for _, tc := range testCases {
|
||||
|
|
@ -336,7 +336,7 @@ func TestSpanProcessor_MissingKeys(t *testing.T) {
|
|||
oCfg.Rename.FromAttributes = []string{"key1", "key2", "key3", "key4"}
|
||||
oCfg.Rename.Separator = "::"
|
||||
|
||||
tp, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, oCfg, consumertest.NewTracesNop())
|
||||
tp, err := factory.CreateTracesProcessor(context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, oCfg, consumertest.NewTracesNop())
|
||||
require.Nil(t, err)
|
||||
require.NotNil(t, tp)
|
||||
for _, tc := range testCases {
|
||||
|
|
@ -354,7 +354,7 @@ func TestSpanProcessor_Separator(t *testing.T) {
|
|||
oCfg.Rename.FromAttributes = []string{"key1"}
|
||||
oCfg.Rename.Separator = "::"
|
||||
|
||||
tp, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, oCfg, consumertest.NewTracesNop())
|
||||
tp, err := factory.CreateTracesProcessor(context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, oCfg, consumertest.NewTracesNop())
|
||||
require.Nil(t, err)
|
||||
require.NotNil(t, tp)
|
||||
|
||||
|
|
@ -383,7 +383,7 @@ func TestSpanProcessor_NoSeparatorMultipleKeys(t *testing.T) {
|
|||
oCfg.Rename.FromAttributes = []string{"key1", "key2"}
|
||||
oCfg.Rename.Separator = ""
|
||||
|
||||
tp, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, oCfg, consumertest.NewTracesNop())
|
||||
tp, err := factory.CreateTracesProcessor(context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, oCfg, consumertest.NewTracesNop())
|
||||
require.Nil(t, err)
|
||||
require.NotNil(t, tp)
|
||||
|
||||
|
|
@ -413,7 +413,7 @@ func TestSpanProcessor_SeparatorMultipleKeys(t *testing.T) {
|
|||
oCfg.Rename.FromAttributes = []string{"key1", "key2", "key3", "key4"}
|
||||
oCfg.Rename.Separator = "::"
|
||||
|
||||
tp, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, oCfg, consumertest.NewTracesNop())
|
||||
tp, err := factory.CreateTracesProcessor(context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, oCfg, consumertest.NewTracesNop())
|
||||
require.Nil(t, err)
|
||||
require.NotNil(t, tp)
|
||||
|
||||
|
|
@ -448,7 +448,7 @@ func TestSpanProcessor_NilName(t *testing.T) {
|
|||
oCfg.Rename.FromAttributes = []string{"key1"}
|
||||
oCfg.Rename.Separator = "::"
|
||||
|
||||
tp, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, oCfg, consumertest.NewTracesNop())
|
||||
tp, err := factory.CreateTracesProcessor(context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, oCfg, consumertest.NewTracesNop())
|
||||
require.Nil(t, err)
|
||||
require.NotNil(t, tp)
|
||||
|
||||
|
|
@ -545,7 +545,7 @@ func TestSpanProcessor_ToAttributes(t *testing.T) {
|
|||
for _, tc := range testCases {
|
||||
oCfg.Rename.ToAttributes.Rules = tc.rules
|
||||
oCfg.Rename.ToAttributes.BreakAfterMatch = tc.breakAfterMatch
|
||||
tp, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, oCfg, consumertest.NewTracesNop())
|
||||
tp, err := factory.CreateTracesProcessor(context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, oCfg, consumertest.NewTracesNop())
|
||||
require.Nil(t, err)
|
||||
require.NotNil(t, tp)
|
||||
|
||||
|
|
@ -612,7 +612,7 @@ func TestSpanProcessor_skipSpan(t *testing.T) {
|
|||
oCfg.Rename.ToAttributes = &ToAttributes{
|
||||
Rules: []string{`(?P<operation_website>.*?)$`},
|
||||
}
|
||||
tp, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, oCfg, consumertest.NewTracesNop())
|
||||
tp, err := factory.CreateTracesProcessor(context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, oCfg, consumertest.NewTracesNop())
|
||||
require.Nil(t, err)
|
||||
require.NotNil(t, tp)
|
||||
|
||||
|
|
|
|||
|
|
@ -161,8 +161,8 @@ func (pb *PipelinesBuilder) buildPipeline(ctx context.Context, pipelineCfg *conf
|
|||
|
||||
switch pipelineCfg.InputType {
|
||||
case configmodels.TracesDataType:
|
||||
var proc component.TraceProcessor
|
||||
proc, err = factory.CreateTraceProcessor(ctx, creationParams, procCfg, tc)
|
||||
var proc component.TracesProcessor
|
||||
proc, err = factory.CreateTracesProcessor(ctx, creationParams, procCfg, tc)
|
||||
if proc != nil {
|
||||
mutatesConsumedData = mutatesConsumedData || proc.GetCapabilities().MutatesConsumedData
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue