Add helper processor factory to reduce boilerplate (#1339)
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
This commit is contained in:
parent
2195bbc09d
commit
0c7de066d7
|
|
@ -135,7 +135,7 @@ func TestSpanProcessor_NilEmptyData(t *testing.T) {
|
|||
output: testdata.GenerateTraceDataOneEmptyOneNilInstrumentationLibrary(),
|
||||
},
|
||||
}
|
||||
factory := Factory{}
|
||||
factory := NewFactory()
|
||||
cfg := factory.CreateDefaultConfig()
|
||||
oCfg := cfg.(*Config)
|
||||
oCfg.Settings.Actions = []attraction.ActionKeyValue{
|
||||
|
|
@ -195,7 +195,7 @@ func TestAttributes_FilterSpans(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
factory := Factory{}
|
||||
factory := NewFactory()
|
||||
cfg := factory.CreateDefaultConfig()
|
||||
oCfg := cfg.(*Config)
|
||||
oCfg.Actions = []attraction.ActionKeyValue{
|
||||
|
|
@ -265,7 +265,7 @@ func TestAttributes_FilterSpansByNameStrict(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
factory := Factory{}
|
||||
factory := NewFactory()
|
||||
cfg := factory.CreateDefaultConfig()
|
||||
oCfg := cfg.(*Config)
|
||||
oCfg.Actions = []attraction.ActionKeyValue{
|
||||
|
|
@ -333,7 +333,7 @@ func TestAttributes_FilterSpansByNameRegexp(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
factory := Factory{}
|
||||
factory := NewFactory()
|
||||
cfg := factory.CreateDefaultConfig()
|
||||
oCfg := cfg.(*Config)
|
||||
oCfg.Actions = []attraction.ActionKeyValue{
|
||||
|
|
@ -396,7 +396,7 @@ func TestAttributes_Hash(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
factory := Factory{}
|
||||
factory := NewFactory()
|
||||
cfg := factory.CreateDefaultConfig()
|
||||
oCfg := cfg.(*Config)
|
||||
oCfg.Actions = []attraction.ActionKeyValue{
|
||||
|
|
@ -441,7 +441,7 @@ func BenchmarkAttributes_FilterSpansByName(b *testing.B) {
|
|||
},
|
||||
}
|
||||
|
||||
factory := Factory{}
|
||||
factory := NewFactory()
|
||||
cfg := factory.CreateDefaultConfig()
|
||||
oCfg := cfg.(*Config)
|
||||
oCfg.Actions = []attraction.ActionKeyValue{
|
||||
|
|
|
|||
|
|
@ -32,7 +32,7 @@ func TestLoadingConifg(t *testing.T) {
|
|||
factories, err := config.ExampleComponents()
|
||||
assert.NoError(t, err)
|
||||
|
||||
factory := &Factory{}
|
||||
factory := NewFactory()
|
||||
factories.Processors[typeStr] = factory
|
||||
cfg, err := config.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)
|
||||
assert.NoError(t, err)
|
||||
|
|
|
|||
|
|
@ -19,11 +19,11 @@ import (
|
|||
"fmt"
|
||||
|
||||
"go.opentelemetry.io/collector/component"
|
||||
"go.opentelemetry.io/collector/config/configerror"
|
||||
"go.opentelemetry.io/collector/config/configmodels"
|
||||
"go.opentelemetry.io/collector/consumer"
|
||||
"go.opentelemetry.io/collector/internal/processor/attraction"
|
||||
"go.opentelemetry.io/collector/internal/processor/filterspan"
|
||||
"go.opentelemetry.io/collector/processor/processorhelper"
|
||||
)
|
||||
|
||||
const (
|
||||
|
|
@ -31,18 +31,16 @@ const (
|
|||
typeStr = "attributes"
|
||||
)
|
||||
|
||||
// Factory is the factory for Attributes processor.
|
||||
type Factory struct {
|
||||
// NewFactory returns a new factory for the Attributes processor.
|
||||
func NewFactory() component.ProcessorFactory {
|
||||
return processorhelper.NewFactory(
|
||||
typeStr,
|
||||
createDefaultConfig,
|
||||
processorhelper.WithTraceProcessor(createTraceProcessor))
|
||||
}
|
||||
|
||||
// Type gets the type of the config created by this factory.
|
||||
func (f *Factory) Type() configmodels.Type {
|
||||
return typeStr
|
||||
}
|
||||
|
||||
// CreateDefaultConfig creates the default configuration for the processor.
|
||||
// Note: This isn't a valid configuration because the processor would do no work.
|
||||
func (f *Factory) CreateDefaultConfig() configmodels.Processor {
|
||||
func createDefaultConfig() configmodels.Processor {
|
||||
return &Config{
|
||||
ProcessorSettings: configmodels.ProcessorSettings{
|
||||
TypeVal: typeStr,
|
||||
|
|
@ -51,14 +49,12 @@ func (f *Factory) CreateDefaultConfig() configmodels.Processor {
|
|||
}
|
||||
}
|
||||
|
||||
// CreateTraceProcessor creates a trace processor based on this config.
|
||||
func (f *Factory) CreateTraceProcessor(
|
||||
func createTraceProcessor(
|
||||
_ context.Context,
|
||||
_ component.ProcessorCreateParams,
|
||||
nextConsumer consumer.TraceConsumer,
|
||||
cfg configmodels.Processor,
|
||||
nextConsumer consumer.TraceConsumer,
|
||||
) (component.TraceProcessor, error) {
|
||||
|
||||
oCfg := cfg.(*Config)
|
||||
if len(oCfg.Actions) == 0 {
|
||||
return nil, fmt.Errorf("error creating \"attributes\" processor due to missing required field \"actions\" of processor %q", cfg.Name())
|
||||
|
|
@ -77,13 +73,3 @@ func (f *Factory) CreateTraceProcessor(
|
|||
}
|
||||
return newTraceProcessor(nextConsumer, attrProc, include, exclude)
|
||||
}
|
||||
|
||||
// CreateMetricsProcessor creates a metrics processor based on this config.
|
||||
func (f *Factory) CreateMetricsProcessor(
|
||||
_ context.Context,
|
||||
_ component.ProcessorCreateParams,
|
||||
_ consumer.MetricsConsumer,
|
||||
_ configmodels.Processor,
|
||||
) (component.MetricsProcessor, error) {
|
||||
return nil, configerror.ErrDataTypeIsNotSupported
|
||||
}
|
||||
|
|
|
|||
|
|
@ -30,12 +30,12 @@ import (
|
|||
)
|
||||
|
||||
func TestFactory_Type(t *testing.T) {
|
||||
factory := Factory{}
|
||||
factory := NewFactory()
|
||||
assert.Equal(t, factory.Type(), configmodels.Type(typeStr))
|
||||
}
|
||||
|
||||
func TestFactory_CreateDefaultConfig(t *testing.T) {
|
||||
factory := Factory{}
|
||||
factory := NewFactory()
|
||||
cfg := factory.CreateDefaultConfig()
|
||||
assert.Equal(t, cfg, &Config{
|
||||
ProcessorSettings: configmodels.ProcessorSettings{
|
||||
|
|
@ -47,7 +47,7 @@ func TestFactory_CreateDefaultConfig(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestFactoryCreateTraceProcessor_EmptyActions(t *testing.T) {
|
||||
factory := Factory{}
|
||||
factory := NewFactory()
|
||||
cfg := factory.CreateDefaultConfig()
|
||||
ap, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{}, exportertest.NewNopTraceExporter(), cfg)
|
||||
assert.Error(t, err)
|
||||
|
|
@ -55,7 +55,7 @@ func TestFactoryCreateTraceProcessor_EmptyActions(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestFactoryCreateTraceProcessor_InvalidActions(t *testing.T) {
|
||||
factory := Factory{}
|
||||
factory := NewFactory()
|
||||
cfg := factory.CreateDefaultConfig()
|
||||
oCfg := cfg.(*Config)
|
||||
// Missing key
|
||||
|
|
@ -68,7 +68,7 @@ func TestFactoryCreateTraceProcessor_InvalidActions(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestFactoryCreateTraceProcessor(t *testing.T) {
|
||||
factory := Factory{}
|
||||
factory := NewFactory()
|
||||
cfg := factory.CreateDefaultConfig()
|
||||
oCfg := cfg.(*Config)
|
||||
oCfg.Actions = []attraction.ActionKeyValue{
|
||||
|
|
@ -95,7 +95,7 @@ func TestFactoryCreateTraceProcessor(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestFactory_CreateMetricsProcessor(t *testing.T) {
|
||||
factory := Factory{}
|
||||
factory := NewFactory()
|
||||
cfg := factory.CreateDefaultConfig()
|
||||
|
||||
mp, err := factory.CreateMetricsProcessor(
|
||||
|
|
|
|||
|
|
@ -36,7 +36,7 @@ import (
|
|||
|
||||
func TestBatchProcessorSpansDelivered(t *testing.T) {
|
||||
sink := &exportertest.SinkTraceExporter{}
|
||||
cfg := generateDefaultConfig()
|
||||
cfg := createDefaultConfig().(*Config)
|
||||
cfg.SendBatchSize = 128
|
||||
creationParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
|
||||
batcher := newBatchTracesProcessor(creationParams, sink, cfg)
|
||||
|
|
@ -80,7 +80,7 @@ func TestBatchProcessorSentBySize(t *testing.T) {
|
|||
defer view.Unregister(views...)
|
||||
|
||||
sink := &exportertest.SinkTraceExporter{}
|
||||
cfg := generateDefaultConfig()
|
||||
cfg := createDefaultConfig().(*Config)
|
||||
sendBatchSize := 20
|
||||
cfg.SendBatchSize = uint32(sendBatchSize)
|
||||
cfg.Timeout = 500 * time.Millisecond
|
||||
|
|
@ -128,7 +128,7 @@ func TestBatchProcessorSentBySize(t *testing.T) {
|
|||
|
||||
func TestBatchProcessorSentByTimeout(t *testing.T) {
|
||||
sink := &exportertest.SinkTraceExporter{}
|
||||
cfg := generateDefaultConfig()
|
||||
cfg := createDefaultConfig().(*Config)
|
||||
sendBatchSize := 100
|
||||
cfg.SendBatchSize = uint32(sendBatchSize)
|
||||
cfg.Timeout = 100 * time.Millisecond
|
||||
|
|
|
|||
|
|
@ -30,8 +30,8 @@ func TestLoadConfig(t *testing.T) {
|
|||
factories, err := config.ExampleComponents()
|
||||
assert.NoError(t, err)
|
||||
|
||||
factory := &Factory{}
|
||||
factories.Processors[typeStr] = &Factory{}
|
||||
factory := NewFactory()
|
||||
factories.Processors[typeStr] = factory
|
||||
cfg, err := config.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)
|
||||
|
||||
require.Nil(t, err)
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ import (
|
|||
"go.opentelemetry.io/collector/component"
|
||||
"go.opentelemetry.io/collector/config/configmodels"
|
||||
"go.opentelemetry.io/collector/consumer"
|
||||
"go.opentelemetry.io/collector/processor/processorhelper"
|
||||
)
|
||||
|
||||
const (
|
||||
|
|
@ -31,43 +32,16 @@ const (
|
|||
defaultTimeout = 200 * time.Millisecond
|
||||
)
|
||||
|
||||
// Factory is the factory for batch processor.
|
||||
type Factory struct {
|
||||
// NewFactory returns a new factory for the Batch processor.
|
||||
func NewFactory() component.ProcessorFactory {
|
||||
return processorhelper.NewFactory(
|
||||
typeStr,
|
||||
createDefaultConfig,
|
||||
processorhelper.WithTraceProcessor(createTraceProcessor),
|
||||
processorhelper.WithMetricsProcessor(createMetricsProcessor))
|
||||
}
|
||||
|
||||
// Type gets the type of the config created by this factory.
|
||||
func (f *Factory) Type() configmodels.Type {
|
||||
return typeStr
|
||||
}
|
||||
|
||||
// CreateDefaultConfig creates the default configuration for processor.
|
||||
func (f *Factory) CreateDefaultConfig() configmodels.Processor {
|
||||
return generateDefaultConfig()
|
||||
}
|
||||
|
||||
// CreateTraceProcessor creates a trace processor based on this config.
|
||||
func (f *Factory) CreateTraceProcessor(
|
||||
ctx context.Context,
|
||||
params component.ProcessorCreateParams,
|
||||
nextConsumer consumer.TraceConsumer,
|
||||
c configmodels.Processor,
|
||||
) (component.TraceProcessor, error) {
|
||||
cfg := c.(*Config)
|
||||
return newBatchTracesProcessor(params, nextConsumer, cfg), nil
|
||||
}
|
||||
|
||||
// CreateMetricsProcessor creates a metrics processor based on this config.
|
||||
func (f *Factory) CreateMetricsProcessor(
|
||||
ctx context.Context,
|
||||
params component.ProcessorCreateParams,
|
||||
nextConsumer consumer.MetricsConsumer,
|
||||
c configmodels.Processor,
|
||||
) (component.MetricsProcessor, error) {
|
||||
cfg := c.(*Config)
|
||||
return newBatchMetricsProcessor(params, nextConsumer, cfg), nil
|
||||
}
|
||||
|
||||
func generateDefaultConfig() *Config {
|
||||
func createDefaultConfig() configmodels.Processor {
|
||||
return &Config{
|
||||
ProcessorSettings: configmodels.ProcessorSettings{
|
||||
TypeVal: typeStr,
|
||||
|
|
@ -77,3 +51,23 @@ func generateDefaultConfig() *Config {
|
|||
Timeout: defaultTimeout,
|
||||
}
|
||||
}
|
||||
|
||||
func createTraceProcessor(
|
||||
_ context.Context,
|
||||
params component.ProcessorCreateParams,
|
||||
cfg configmodels.Processor,
|
||||
nextConsumer consumer.TraceConsumer,
|
||||
) (component.TraceProcessor, error) {
|
||||
oCfg := cfg.(*Config)
|
||||
return newBatchTracesProcessor(params, nextConsumer, oCfg), nil
|
||||
}
|
||||
|
||||
func createMetricsProcessor(
|
||||
_ context.Context,
|
||||
params component.ProcessorCreateParams,
|
||||
cfg configmodels.Processor,
|
||||
nextConsumer consumer.MetricsConsumer,
|
||||
) (component.MetricsProcessor, error) {
|
||||
oCfg := cfg.(*Config)
|
||||
return newBatchMetricsProcessor(params, nextConsumer, oCfg), nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -26,7 +26,7 @@ import (
|
|||
)
|
||||
|
||||
func TestCreateDefaultConfig(t *testing.T) {
|
||||
factory := &Factory{}
|
||||
factory := NewFactory()
|
||||
|
||||
cfg := factory.CreateDefaultConfig()
|
||||
assert.NotNil(t, cfg, "failed to create default config")
|
||||
|
|
@ -34,7 +34,7 @@ func TestCreateDefaultConfig(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestCreateProcessor(t *testing.T) {
|
||||
factory := &Factory{}
|
||||
factory := NewFactory()
|
||||
|
||||
cfg := factory.CreateDefaultConfig()
|
||||
creationParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
|
||||
|
|
|
|||
|
|
@ -46,7 +46,7 @@ func TestLoadingConfigStrict(t *testing.T) {
|
|||
factories, err := config.ExampleComponents()
|
||||
assert.Nil(t, err)
|
||||
|
||||
factory := &Factory{}
|
||||
factory := NewFactory()
|
||||
factories.Processors[configmodels.Type(typeStr)] = factory
|
||||
config, err := config.LoadConfigFile(t, path.Join(".", "testdata", "config_strict.yaml"), factories)
|
||||
|
||||
|
|
@ -146,7 +146,7 @@ func TestLoadingConfigRegexp(t *testing.T) {
|
|||
factories, err := config.ExampleComponents()
|
||||
assert.Nil(t, err)
|
||||
|
||||
factory := &Factory{}
|
||||
factory := NewFactory()
|
||||
factories.Processors[typeStr] = factory
|
||||
config, err := config.LoadConfigFile(t, path.Join(".", "testdata", "config_regexp.yaml"), factories)
|
||||
|
||||
|
|
|
|||
|
|
@ -18,9 +18,9 @@ import (
|
|||
"context"
|
||||
|
||||
"go.opentelemetry.io/collector/component"
|
||||
"go.opentelemetry.io/collector/config/configerror"
|
||||
"go.opentelemetry.io/collector/config/configmodels"
|
||||
"go.opentelemetry.io/collector/consumer"
|
||||
"go.opentelemetry.io/collector/processor/processorhelper"
|
||||
)
|
||||
|
||||
const (
|
||||
|
|
@ -28,17 +28,15 @@ const (
|
|||
typeStr = "filter"
|
||||
)
|
||||
|
||||
// Factory is the factory for filter processor.
|
||||
type Factory struct {
|
||||
// NewFactory returns a new factory for the Filter processor.
|
||||
func NewFactory() component.ProcessorFactory {
|
||||
return processorhelper.NewFactory(
|
||||
typeStr,
|
||||
createDefaultConfig,
|
||||
processorhelper.WithMetricsProcessor(createMetricsProcessor))
|
||||
}
|
||||
|
||||
// Type gets the type of the Option config created by this factory.
|
||||
func (f Factory) Type() configmodels.Type {
|
||||
return typeStr
|
||||
}
|
||||
|
||||
// CreateDefaultConfig creates the default configuration for processor.
|
||||
func (f Factory) CreateDefaultConfig() configmodels.Processor {
|
||||
func createDefaultConfig() configmodels.Processor {
|
||||
return &Config{
|
||||
ProcessorSettings: configmodels.ProcessorSettings{
|
||||
TypeVal: typeStr,
|
||||
|
|
@ -47,23 +45,12 @@ func (f Factory) CreateDefaultConfig() configmodels.Processor {
|
|||
}
|
||||
}
|
||||
|
||||
// CreateTraceProcessor creates a trace processor based on this config.
|
||||
func (f *Factory) CreateTraceProcessor(
|
||||
ctx context.Context,
|
||||
params component.ProcessorCreateParams,
|
||||
nextConsumer consumer.TraceConsumer,
|
||||
c configmodels.Processor,
|
||||
) (component.TraceProcessor, error) {
|
||||
return nil, configerror.ErrDataTypeIsNotSupported
|
||||
}
|
||||
|
||||
// CreateMetricsProcessor creates a metrics processor based on this config.
|
||||
func (f *Factory) CreateMetricsProcessor(
|
||||
ctx context.Context,
|
||||
params component.ProcessorCreateParams,
|
||||
func createMetricsProcessor(
|
||||
_ context.Context,
|
||||
_ component.ProcessorCreateParams,
|
||||
cfg configmodels.Processor,
|
||||
nextConsumer consumer.MetricsConsumer,
|
||||
c configmodels.Processor,
|
||||
) (component.MetricsProcessor, error) {
|
||||
oCfg := c.(*Config)
|
||||
oCfg := cfg.(*Config)
|
||||
return newFilterMetricProcessor(nextConsumer, oCfg)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -30,14 +30,14 @@ import (
|
|||
)
|
||||
|
||||
func TestType(t *testing.T) {
|
||||
factory := Factory{}
|
||||
factory := NewFactory()
|
||||
pType := factory.Type()
|
||||
|
||||
assert.Equal(t, pType, configmodels.Type("filter"))
|
||||
}
|
||||
|
||||
func TestCreateDefaultConfig(t *testing.T) {
|
||||
factory := Factory{}
|
||||
factory := NewFactory()
|
||||
cfg := factory.CreateDefaultConfig()
|
||||
assert.Equal(t, cfg, &Config{
|
||||
ProcessorSettings: configmodels.ProcessorSettings{
|
||||
|
|
@ -69,14 +69,14 @@ func TestCreateProcessors(t *testing.T) {
|
|||
factories, err := config.ExampleComponents()
|
||||
assert.Nil(t, err)
|
||||
|
||||
factory := &Factory{}
|
||||
factory := NewFactory()
|
||||
factories.Processors[typeStr] = factory
|
||||
cfg, err := config.LoadConfigFile(t, path.Join(".", "testdata", test.configName), factories)
|
||||
assert.Nil(t, err)
|
||||
|
||||
for name, cfg := range cfg.Processors {
|
||||
t.Run(fmt.Sprintf("%s/%s", test.configName, name), func(t *testing.T) {
|
||||
factory := &Factory{}
|
||||
factory := NewFactory()
|
||||
|
||||
tp, tErr := factory.CreateTraceProcessor(
|
||||
context.Background(),
|
||||
|
|
|
|||
|
|
@ -29,19 +29,19 @@ import (
|
|||
func TestLoadConfig(t *testing.T) {
|
||||
factories, err := config.ExampleComponents()
|
||||
require.NoError(t, err)
|
||||
factory := &Factory{}
|
||||
factory := NewFactory()
|
||||
factories.Processors[typeStr] = factory
|
||||
require.NoError(t, err)
|
||||
|
||||
config, err := config.LoadConfigFile(
|
||||
cfg, err := config.LoadConfigFile(
|
||||
t,
|
||||
path.Join(".", "testdata", "config.yaml"),
|
||||
factories)
|
||||
|
||||
require.Nil(t, err)
|
||||
require.NotNil(t, config)
|
||||
require.NotNil(t, cfg)
|
||||
|
||||
p0 := config.Processors["memory_limiter"]
|
||||
p0 := cfg.Processors["memory_limiter"]
|
||||
assert.Equal(t, p0,
|
||||
&Config{
|
||||
ProcessorSettings: configmodels.ProcessorSettings{
|
||||
|
|
@ -50,7 +50,7 @@ func TestLoadConfig(t *testing.T) {
|
|||
},
|
||||
})
|
||||
|
||||
p1 := config.Processors["memory_limiter/with-settings"]
|
||||
p1 := cfg.Processors["memory_limiter/with-settings"]
|
||||
assert.Equal(t, p1,
|
||||
&Config{
|
||||
ProcessorSettings: configmodels.ProcessorSettings{
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ import (
|
|||
"go.opentelemetry.io/collector/component"
|
||||
"go.opentelemetry.io/collector/config/configmodels"
|
||||
"go.opentelemetry.io/collector/consumer"
|
||||
"go.opentelemetry.io/collector/processor/processorhelper"
|
||||
)
|
||||
|
||||
const (
|
||||
|
|
@ -29,53 +30,54 @@ const (
|
|||
typeStr = "memory_limiter"
|
||||
)
|
||||
|
||||
// Factory is the factory for Attribute Key processor.
|
||||
type Factory struct {
|
||||
}
|
||||
|
||||
// Type gets the type of the config created by this factory.
|
||||
func (f *Factory) Type() configmodels.Type {
|
||||
return typeStr
|
||||
// NewFactory returns a new factory for the Memory Limiter processor.
|
||||
func NewFactory() component.ProcessorFactory {
|
||||
return processorhelper.NewFactory(
|
||||
typeStr,
|
||||
createDefaultConfig,
|
||||
processorhelper.WithTraceProcessor(createTraceProcessor),
|
||||
processorhelper.WithMetricsProcessor(createMetricsProcessor),
|
||||
processorhelper.WithLogProcessor(createLogProcessor))
|
||||
}
|
||||
|
||||
// CreateDefaultConfig creates the default configuration for processor. Notice
|
||||
// that the default configuration is expected to fail for this processor.
|
||||
func (f *Factory) CreateDefaultConfig() configmodels.Processor {
|
||||
return generateDefaultConfig()
|
||||
func createDefaultConfig() configmodels.Processor {
|
||||
return &Config{
|
||||
ProcessorSettings: configmodels.ProcessorSettings{
|
||||
TypeVal: typeStr,
|
||||
NameVal: typeStr,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// CreateTraceProcessor creates a trace processor based on this config.
|
||||
func (f *Factory) CreateTraceProcessor(
|
||||
func createTraceProcessor(
|
||||
_ context.Context,
|
||||
params component.ProcessorCreateParams,
|
||||
cfg configmodels.Processor,
|
||||
nextConsumer consumer.TraceConsumer,
|
||||
cfg configmodels.Processor,
|
||||
) (component.TraceProcessor, error) {
|
||||
return f.createProcessor(params.Logger, nextConsumer, nil, nil, cfg)
|
||||
return createProcessor(params.Logger, nextConsumer, nil, nil, cfg)
|
||||
}
|
||||
|
||||
// CreateMetricsProcessor creates a metrics processor based on this config.
|
||||
func (f *Factory) CreateMetricsProcessor(
|
||||
func createMetricsProcessor(
|
||||
_ context.Context,
|
||||
params component.ProcessorCreateParams,
|
||||
nextConsumer consumer.MetricsConsumer,
|
||||
cfg configmodels.Processor,
|
||||
nextConsumer consumer.MetricsConsumer,
|
||||
) (component.MetricsProcessor, error) {
|
||||
return f.createProcessor(params.Logger, nil, nextConsumer, nil, cfg)
|
||||
return createProcessor(params.Logger, nil, nextConsumer, nil, cfg)
|
||||
}
|
||||
|
||||
// CreateLogsProcessor creates a metrics processor based on this config.
|
||||
func (f *Factory) CreateLogProcessor(
|
||||
func createLogProcessor(
|
||||
_ context.Context,
|
||||
params component.ProcessorCreateParams,
|
||||
cfg configmodels.Processor,
|
||||
nextConsumer consumer.LogConsumer,
|
||||
) (component.LogProcessor, error) {
|
||||
return f.createProcessor(params.Logger, nil, nil, nextConsumer, cfg)
|
||||
return createProcessor(params.Logger, nil, nil, nextConsumer, cfg)
|
||||
}
|
||||
|
||||
var _ component.LogProcessorFactory = new(Factory)
|
||||
|
||||
type TripleTypeProcessor interface {
|
||||
consumer.TraceConsumer
|
||||
consumer.MetricsConsumer
|
||||
|
|
@ -83,7 +85,7 @@ type TripleTypeProcessor interface {
|
|||
component.Processor
|
||||
}
|
||||
|
||||
func (f *Factory) createProcessor(
|
||||
func createProcessor(
|
||||
logger *zap.Logger,
|
||||
traceConsumer consumer.TraceConsumer,
|
||||
metricConsumer consumer.MetricsConsumer,
|
||||
|
|
@ -99,12 +101,3 @@ func (f *Factory) createProcessor(
|
|||
pCfg,
|
||||
)
|
||||
}
|
||||
|
||||
func generateDefaultConfig() *Config {
|
||||
return &Config{
|
||||
ProcessorSettings: configmodels.ProcessorSettings{
|
||||
TypeVal: typeStr,
|
||||
NameVal: typeStr,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -29,7 +29,7 @@ import (
|
|||
)
|
||||
|
||||
func TestCreateDefaultConfig(t *testing.T) {
|
||||
factory := &Factory{}
|
||||
factory := NewFactory()
|
||||
require.NotNil(t, factory)
|
||||
|
||||
cfg := factory.CreateDefaultConfig()
|
||||
|
|
@ -38,7 +38,7 @@ func TestCreateDefaultConfig(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestCreateProcessor(t *testing.T) {
|
||||
factory := &Factory{}
|
||||
factory := NewFactory()
|
||||
require.NotNil(t, factory)
|
||||
|
||||
cfg := factory.CreateDefaultConfig()
|
||||
|
|
@ -52,7 +52,8 @@ func TestCreateProcessor(t *testing.T) {
|
|||
assert.Nil(t, mp)
|
||||
assert.Error(t, err, "created processor with invalid settings")
|
||||
|
||||
lp, err := factory.CreateLogProcessor(context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, cfg, exportertest.NewNopLogsExporter())
|
||||
lfactory := factory.(component.LogProcessorFactory)
|
||||
lp, err := lfactory.CreateLogProcessor(context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, cfg, exportertest.NewNopLogsExporter())
|
||||
assert.Nil(t, lp)
|
||||
assert.Error(t, err, "created processor with invalid settings")
|
||||
|
||||
|
|
@ -73,7 +74,7 @@ func TestCreateProcessor(t *testing.T) {
|
|||
assert.NotNil(t, mp)
|
||||
assert.NoError(t, mp.Shutdown(context.Background()))
|
||||
|
||||
lp, err = factory.CreateLogProcessor(context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, cfg, exportertest.NewNopLogsExporter())
|
||||
lp, err = lfactory.CreateLogProcessor(context.Background(), component.ProcessorCreateParams{Logger: zap.NewNop()}, cfg, exportertest.NewNopLogsExporter())
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, lp)
|
||||
assert.NoError(t, lp.Shutdown(context.Background()))
|
||||
|
|
|
|||
|
|
@ -83,7 +83,7 @@ func TestNew(t *testing.T) {
|
|||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
cfg := generateDefaultConfig()
|
||||
cfg := createDefaultConfig().(*Config)
|
||||
cfg.CheckInterval = tt.args.checkInterval
|
||||
cfg.MemoryLimitMiB = tt.args.memoryLimitMiB
|
||||
cfg.MemorySpikeLimitMiB = tt.args.memorySpikeLimitMiB
|
||||
|
|
|
|||
|
|
@ -0,0 +1,130 @@
|
|||
// Copyright OpenTelemetry Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package processorhelper
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"go.opentelemetry.io/collector/component"
|
||||
"go.opentelemetry.io/collector/config/configerror"
|
||||
"go.opentelemetry.io/collector/config/configmodels"
|
||||
"go.opentelemetry.io/collector/consumer"
|
||||
)
|
||||
|
||||
// FactoryOption apply changes to ProcessorOptions.
|
||||
type FactoryOption func(o *factory)
|
||||
|
||||
// CreateDefaultConfig is the equivalent of component.ProcessorFactory.CreateDefaultConfig()
|
||||
type CreateDefaultConfig func() configmodels.Processor
|
||||
|
||||
// CreateTraceProcessor is the equivalent of component.ProcessorFactory.CreateTraceProcessor()
|
||||
type CreateTraceProcessor func(ctx context.Context, params component.ProcessorCreateParams, config configmodels.Processor, nextConsumer consumer.TraceConsumer) (component.TraceProcessor, error)
|
||||
|
||||
// CreateMetricsProcessor is the equivalent of component.ProcessorFactory.CreateMetricsProcessor()
|
||||
type CreateMetricsProcessor func(ctx context.Context, params component.ProcessorCreateParams, config configmodels.Processor, nextConsumer consumer.MetricsConsumer) (component.MetricsProcessor, error)
|
||||
|
||||
// CreateMetricsProcessor is the equivalent of component.ProcessorFactory.CreateLogProcessor()
|
||||
type CreateLogProcessor func(ctx context.Context, params component.ProcessorCreateParams, cfg configmodels.Processor, nextConsumer consumer.LogConsumer) (component.LogProcessor, error)
|
||||
|
||||
// factory is the factory for Jaeger gRPC exporter.
|
||||
type factory struct {
|
||||
cfgType configmodels.Type
|
||||
createDefaultConfig CreateDefaultConfig
|
||||
createTraceProcessor CreateTraceProcessor
|
||||
createMetricsProcessor CreateMetricsProcessor
|
||||
createLogProcessor CreateLogProcessor
|
||||
}
|
||||
|
||||
var _ component.LogProcessorFactory = new(factory)
|
||||
|
||||
func WithTraceProcessor(createTraceProcessor CreateTraceProcessor) FactoryOption {
|
||||
return func(o *factory) {
|
||||
o.createTraceProcessor = createTraceProcessor
|
||||
}
|
||||
}
|
||||
|
||||
func WithMetricsProcessor(createMetricsProcessor CreateMetricsProcessor) FactoryOption {
|
||||
return func(o *factory) {
|
||||
o.createMetricsProcessor = createMetricsProcessor
|
||||
}
|
||||
}
|
||||
|
||||
func WithLogProcessor(createLogProcessor CreateLogProcessor) FactoryOption {
|
||||
return func(o *factory) {
|
||||
o.createLogProcessor = createLogProcessor
|
||||
}
|
||||
}
|
||||
|
||||
// NewFactory returns a component.ProcessorFactory that only supports all types.
|
||||
func NewFactory(
|
||||
cfgType configmodels.Type,
|
||||
createDefaultConfig CreateDefaultConfig,
|
||||
options ...FactoryOption) component.ProcessorFactory {
|
||||
f := &factory{
|
||||
cfgType: cfgType,
|
||||
createDefaultConfig: createDefaultConfig,
|
||||
}
|
||||
for _, opt := range options {
|
||||
opt(f)
|
||||
}
|
||||
return f
|
||||
}
|
||||
|
||||
// Type gets the type of the Processor config created by this factory.
|
||||
func (f *factory) Type() configmodels.Type {
|
||||
return f.cfgType
|
||||
}
|
||||
|
||||
// CreateDefaultConfig creates the default configuration for processor.
|
||||
func (f *factory) CreateDefaultConfig() configmodels.Processor {
|
||||
return f.createDefaultConfig()
|
||||
}
|
||||
|
||||
// CreateTraceProcessor creates a component.TraceProcessor based on this config.
|
||||
func (f *factory) CreateTraceProcessor(
|
||||
ctx context.Context,
|
||||
params component.ProcessorCreateParams,
|
||||
nextConsumer consumer.TraceConsumer,
|
||||
cfg configmodels.Processor) (component.TraceProcessor, error) {
|
||||
if f.createTraceProcessor != nil {
|
||||
return f.createTraceProcessor(ctx, params, cfg, nextConsumer)
|
||||
}
|
||||
return nil, configerror.ErrDataTypeIsNotSupported
|
||||
}
|
||||
|
||||
// CreateMetricsProcessor creates a consumer.MetricsConsumer based on this config.
|
||||
func (f *factory) CreateMetricsProcessor(
|
||||
ctx context.Context,
|
||||
params component.ProcessorCreateParams,
|
||||
nextConsumer consumer.MetricsConsumer,
|
||||
cfg configmodels.Processor) (component.MetricsProcessor, error) {
|
||||
if f.createMetricsProcessor != nil {
|
||||
return f.createMetricsProcessor(ctx, params, cfg, nextConsumer)
|
||||
}
|
||||
return nil, configerror.ErrDataTypeIsNotSupported
|
||||
}
|
||||
|
||||
// CreateLogProcessor creates a metrics processor based on this config.
|
||||
func (f *factory) CreateLogProcessor(
|
||||
ctx context.Context,
|
||||
params component.ProcessorCreateParams,
|
||||
cfg configmodels.Processor,
|
||||
nextConsumer consumer.LogConsumer,
|
||||
) (component.LogProcessor, error) {
|
||||
if f.createLogProcessor != nil {
|
||||
return f.createLogProcessor(ctx, params, cfg, nextConsumer)
|
||||
}
|
||||
return nil, configerror.ErrDataTypeIsNotSupported
|
||||
}
|
||||
|
|
@ -0,0 +1,86 @@
|
|||
// Copyright OpenTelemetry Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package processorhelper
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"go.opentelemetry.io/collector/component"
|
||||
"go.opentelemetry.io/collector/config/configmodels"
|
||||
"go.opentelemetry.io/collector/consumer"
|
||||
)
|
||||
|
||||
const typeStr = "test"
|
||||
|
||||
var defaultCfg = &configmodels.ProcessorSettings{
|
||||
TypeVal: typeStr,
|
||||
NameVal: typeStr,
|
||||
}
|
||||
|
||||
func TestNewTrace(t *testing.T) {
|
||||
factory := NewFactory(
|
||||
typeStr,
|
||||
defaultConfig)
|
||||
assert.EqualValues(t, typeStr, factory.Type())
|
||||
assert.EqualValues(t, defaultCfg, factory.CreateDefaultConfig())
|
||||
_, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{}, nil, defaultCfg)
|
||||
assert.Error(t, err)
|
||||
_, err = factory.CreateMetricsProcessor(context.Background(), component.ProcessorCreateParams{}, nil, defaultCfg)
|
||||
assert.Error(t, err)
|
||||
|
||||
lfactory := factory.(component.LogProcessorFactory)
|
||||
_, err = lfactory.CreateLogProcessor(context.Background(), component.ProcessorCreateParams{}, defaultCfg, nil)
|
||||
assert.Error(t, err)
|
||||
}
|
||||
|
||||
func TestNewMetrics_WithConstructors(t *testing.T) {
|
||||
factory := NewFactory(
|
||||
typeStr,
|
||||
defaultConfig,
|
||||
WithTraceProcessor(createTraceProcessor),
|
||||
WithMetricsProcessor(createMetricsProcessor),
|
||||
WithLogProcessor(createLogProcessor))
|
||||
assert.EqualValues(t, typeStr, factory.Type())
|
||||
assert.EqualValues(t, defaultCfg, factory.CreateDefaultConfig())
|
||||
|
||||
_, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{}, nil, defaultCfg)
|
||||
assert.NoError(t, err)
|
||||
|
||||
_, err = factory.CreateMetricsProcessor(context.Background(), component.ProcessorCreateParams{}, nil, defaultCfg)
|
||||
assert.NoError(t, err)
|
||||
|
||||
lfactory := factory.(component.LogProcessorFactory)
|
||||
_, err = lfactory.CreateLogProcessor(context.Background(), component.ProcessorCreateParams{}, defaultCfg, nil)
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
func defaultConfig() configmodels.Processor {
|
||||
return defaultCfg
|
||||
}
|
||||
|
||||
func createTraceProcessor(context.Context, component.ProcessorCreateParams, configmodels.Processor, consumer.TraceConsumer) (component.TraceProcessor, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func createMetricsProcessor(context.Context, component.ProcessorCreateParams, configmodels.Processor, consumer.MetricsConsumer) (component.MetricsProcessor, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func createLogProcessor(context.Context, component.ProcessorCreateParams, configmodels.Processor, consumer.LogConsumer) (component.LogProcessor, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
|
@ -30,8 +30,8 @@ func TestLoadConfig(t *testing.T) {
|
|||
factories, err := config.ExampleComponents()
|
||||
assert.NoError(t, err)
|
||||
|
||||
factory := &Factory{}
|
||||
factories.Processors[typeStr] = &Factory{}
|
||||
factory := NewFactory()
|
||||
factories.Processors[typeStr] = factory
|
||||
cfg, err := config.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)
|
||||
|
||||
require.Nil(t, err)
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ import (
|
|||
"go.opentelemetry.io/collector/component"
|
||||
"go.opentelemetry.io/collector/config/configmodels"
|
||||
"go.opentelemetry.io/collector/consumer"
|
||||
"go.opentelemetry.io/collector/processor/processorhelper"
|
||||
)
|
||||
|
||||
const (
|
||||
|
|
@ -28,43 +29,16 @@ const (
|
|||
typeStr = "queued_retry"
|
||||
)
|
||||
|
||||
// Factory is the factory for OpenCensus exporter.
|
||||
type Factory struct {
|
||||
// NewFactory returns a new factory for the Queued processor.
|
||||
func NewFactory() component.ProcessorFactory {
|
||||
return processorhelper.NewFactory(
|
||||
typeStr,
|
||||
createDefaultConfig,
|
||||
processorhelper.WithTraceProcessor(createTraceProcessor),
|
||||
processorhelper.WithMetricsProcessor(createMetricsProcessor))
|
||||
}
|
||||
|
||||
// Type gets the type of the Option config created by this factory.
|
||||
func (f *Factory) Type() configmodels.Type {
|
||||
return typeStr
|
||||
}
|
||||
|
||||
// CreateDefaultConfig creates the default configuration for exporter.
|
||||
func (f *Factory) CreateDefaultConfig() configmodels.Processor {
|
||||
return generateDefaultConfig()
|
||||
}
|
||||
|
||||
// CreateTraceProcessor creates a trace processor based on this config.
|
||||
func (f *Factory) CreateTraceProcessor(
|
||||
_ context.Context,
|
||||
params component.ProcessorCreateParams,
|
||||
nextConsumer consumer.TraceConsumer,
|
||||
cfg configmodels.Processor,
|
||||
) (component.TraceProcessor, error) {
|
||||
oCfg := cfg.(*Config)
|
||||
return newQueuedTracesProcessor(params, nextConsumer, oCfg), nil
|
||||
}
|
||||
|
||||
// CreateMetricsProcessor creates a metrics processor based on this config.
|
||||
func (f *Factory) CreateMetricsProcessor(
|
||||
_ context.Context,
|
||||
params component.ProcessorCreateParams,
|
||||
nextConsumer consumer.MetricsConsumer,
|
||||
cfg configmodels.Processor,
|
||||
) (component.MetricsProcessor, error) {
|
||||
oCfg := cfg.(*Config)
|
||||
return newQueuedMetricsProcessor(params, nextConsumer, oCfg), nil
|
||||
}
|
||||
|
||||
func generateDefaultConfig() *Config {
|
||||
func createDefaultConfig() configmodels.Processor {
|
||||
return &Config{
|
||||
ProcessorSettings: configmodels.ProcessorSettings{
|
||||
TypeVal: typeStr,
|
||||
|
|
@ -76,3 +50,21 @@ func generateDefaultConfig() *Config {
|
|||
BackoffDelay: time.Second * 5,
|
||||
}
|
||||
}
|
||||
|
||||
func createTraceProcessor(
|
||||
_ context.Context,
|
||||
params component.ProcessorCreateParams,
|
||||
cfg configmodels.Processor,
|
||||
nextConsumer consumer.TraceConsumer,
|
||||
) (component.TraceProcessor, error) {
|
||||
return newQueuedTracesProcessor(params, nextConsumer, cfg.(*Config)), nil
|
||||
}
|
||||
|
||||
func createMetricsProcessor(
|
||||
_ context.Context,
|
||||
params component.ProcessorCreateParams,
|
||||
cfg configmodels.Processor,
|
||||
nextConsumer consumer.MetricsConsumer,
|
||||
) (component.MetricsProcessor, error) {
|
||||
return newQueuedMetricsProcessor(params, nextConsumer, cfg.(*Config)), nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -26,14 +26,14 @@ import (
|
|||
)
|
||||
|
||||
func TestCreateDefaultConfig(t *testing.T) {
|
||||
factory := &Factory{}
|
||||
factory := NewFactory()
|
||||
cfg := factory.CreateDefaultConfig()
|
||||
assert.NotNil(t, cfg, "failed to create default config")
|
||||
assert.NoError(t, configcheck.ValidateConfig(cfg))
|
||||
}
|
||||
|
||||
func TestCreateProcessor(t *testing.T) {
|
||||
factory := &Factory{}
|
||||
factory := NewFactory()
|
||||
cfg := factory.CreateDefaultConfig()
|
||||
creationParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
|
||||
|
||||
|
|
|
|||
|
|
@ -50,7 +50,7 @@ func TestTraceQueueProcessor_NoEnqueueOnPermanentError(t *testing.T) {
|
|||
mockP := newMockConcurrentSpanProcessor()
|
||||
mockP.updateError(consumererror.Permanent(errors.New("bad data")))
|
||||
|
||||
cfg := generateDefaultConfig()
|
||||
cfg := createDefaultConfig().(*Config)
|
||||
cfg.RetryOnFailure = true
|
||||
cfg.BackoffDelay = time.Hour
|
||||
creationParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
|
||||
|
|
@ -82,7 +82,7 @@ func TestTraceQueueProcessor_EnqueueOnNoRetry(t *testing.T) {
|
|||
mockP := newMockConcurrentSpanProcessor()
|
||||
mockP.updateError(errors.New("transient error"))
|
||||
|
||||
cfg := generateDefaultConfig()
|
||||
cfg := createDefaultConfig().(*Config)
|
||||
cfg.RetryOnFailure = false
|
||||
cfg.BackoffDelay = 0
|
||||
creationParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
|
||||
|
|
@ -115,7 +115,7 @@ func TestTraceQueueProcessor_PartialError(t *testing.T) {
|
|||
mockP := newMockConcurrentSpanProcessor()
|
||||
mockP.updateError(partialErr)
|
||||
|
||||
cfg := generateDefaultConfig()
|
||||
cfg := createDefaultConfig().(*Config)
|
||||
cfg.NumWorkers = 1
|
||||
cfg.RetryOnFailure = true
|
||||
cfg.BackoffDelay = time.Second
|
||||
|
|
@ -153,7 +153,7 @@ func TestTraceQueueProcessor_EnqueueOnError(t *testing.T) {
|
|||
mockP := newMockConcurrentSpanProcessor()
|
||||
mockP.updateError(errors.New("transient error"))
|
||||
|
||||
cfg := generateDefaultConfig()
|
||||
cfg := createDefaultConfig().(*Config)
|
||||
cfg.NumWorkers = 1
|
||||
cfg.QueueSize = 1
|
||||
cfg.RetryOnFailure = true
|
||||
|
|
@ -192,7 +192,7 @@ func TestMetricsQueueProcessor_NoEnqueueOnPermanentError(t *testing.T) {
|
|||
mockP := newMockConcurrentSpanProcessor()
|
||||
mockP.updateError(consumererror.Permanent(errors.New("bad data")))
|
||||
|
||||
cfg := generateDefaultConfig()
|
||||
cfg := createDefaultConfig().(*Config)
|
||||
cfg.RetryOnFailure = true
|
||||
cfg.BackoffDelay = time.Hour
|
||||
creationParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
|
||||
|
|
@ -224,7 +224,7 @@ func TestMetricsQueueProcessor_NoEnqueueOnNoRetry(t *testing.T) {
|
|||
mockP := newMockConcurrentSpanProcessor()
|
||||
mockP.updateError(errors.New("transient error"))
|
||||
|
||||
cfg := generateDefaultConfig()
|
||||
cfg := createDefaultConfig().(*Config)
|
||||
cfg.RetryOnFailure = false
|
||||
cfg.BackoffDelay = 0
|
||||
creationParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
|
||||
|
|
@ -256,7 +256,7 @@ func TestMetricsQueueProcessor_EnqueueOnError(t *testing.T) {
|
|||
mockP := newMockConcurrentSpanProcessor()
|
||||
mockP.updateError(errors.New("transient error"))
|
||||
|
||||
cfg := generateDefaultConfig()
|
||||
cfg := createDefaultConfig().(*Config)
|
||||
cfg.NumWorkers = 1
|
||||
cfg.QueueSize = 1
|
||||
cfg.RetryOnFailure = true
|
||||
|
|
@ -296,7 +296,7 @@ func TestTraceQueueProcessorHappyPath(t *testing.T) {
|
|||
|
||||
mockP := newMockConcurrentSpanProcessor()
|
||||
creationParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
|
||||
cfg := generateDefaultConfig()
|
||||
cfg := createDefaultConfig().(*Config)
|
||||
qp := newQueuedTracesProcessor(creationParams, mockP, cfg)
|
||||
require.NoError(t, qp.Start(context.Background(), componenttest.NewNopHost()))
|
||||
t.Cleanup(func() {
|
||||
|
|
@ -340,7 +340,7 @@ func TestMetricsQueueProcessorHappyPath(t *testing.T) {
|
|||
|
||||
mockP := newMockConcurrentSpanProcessor()
|
||||
creationParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
|
||||
cfg := generateDefaultConfig()
|
||||
cfg := createDefaultConfig().(*Config)
|
||||
qp := newQueuedMetricsProcessor(creationParams, mockP, cfg)
|
||||
require.NoError(t, qp.Start(context.Background(), componenttest.NewNopHost()))
|
||||
t.Cleanup(func() {
|
||||
|
|
|
|||
|
|
@ -29,7 +29,7 @@ func TestLoadConfig(t *testing.T) {
|
|||
factories, err := config.ExampleComponents()
|
||||
assert.NoError(t, err)
|
||||
|
||||
factories.Processors[typeStr] = &Factory{}
|
||||
factories.Processors[typeStr] = NewFactory()
|
||||
|
||||
cfg, err := config.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)
|
||||
assert.NoError(t, err)
|
||||
|
|
|
|||
|
|
@ -24,6 +24,7 @@ import (
|
|||
"go.opentelemetry.io/collector/config/configmodels"
|
||||
"go.opentelemetry.io/collector/consumer"
|
||||
"go.opentelemetry.io/collector/internal/processor/attraction"
|
||||
"go.opentelemetry.io/collector/processor/processorhelper"
|
||||
"go.opentelemetry.io/collector/translator/conventions"
|
||||
)
|
||||
|
||||
|
|
@ -32,18 +33,17 @@ const (
|
|||
typeStr = "resource"
|
||||
)
|
||||
|
||||
// Factory is the factory for OpenCensus exporter.
|
||||
type Factory struct {
|
||||
// NewFactory returns a new factory for the Resource processor.
|
||||
func NewFactory() component.ProcessorFactory {
|
||||
return processorhelper.NewFactory(
|
||||
typeStr,
|
||||
createDefaultConfig,
|
||||
processorhelper.WithTraceProcessor(createTraceProcessor),
|
||||
processorhelper.WithMetricsProcessor(createMetricsProcessor))
|
||||
}
|
||||
|
||||
// Type gets the type of the Option config created by this factory.
|
||||
func (*Factory) Type() configmodels.Type {
|
||||
return typeStr
|
||||
}
|
||||
|
||||
// CreateDefaultConfig creates the default configuration for processor.
|
||||
// Note: This isn't a valid configuration because the processor would do no work.
|
||||
func (*Factory) CreateDefaultConfig() configmodels.Processor {
|
||||
func createDefaultConfig() configmodels.Processor {
|
||||
return &Config{
|
||||
ProcessorSettings: configmodels.ProcessorSettings{
|
||||
TypeVal: typeStr,
|
||||
|
|
@ -52,13 +52,11 @@ func (*Factory) CreateDefaultConfig() configmodels.Processor {
|
|||
}
|
||||
}
|
||||
|
||||
// CreateTraceProcessor creates a trace processor based on this config.
|
||||
func (*Factory) CreateTraceProcessor(
|
||||
ctx context.Context,
|
||||
func createTraceProcessor(
|
||||
_ context.Context,
|
||||
params component.ProcessorCreateParams,
|
||||
nextConsumer consumer.TraceConsumer,
|
||||
cfg configmodels.Processor,
|
||||
) (component.TraceProcessor, error) {
|
||||
nextConsumer consumer.TraceConsumer) (component.TraceProcessor, error) {
|
||||
attrProc, err := createAttrProcessor(cfg.(*Config), params.Logger)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
@ -66,13 +64,11 @@ func (*Factory) CreateTraceProcessor(
|
|||
return newResourceTraceProcessor(nextConsumer, attrProc), nil
|
||||
}
|
||||
|
||||
// CreateMetricsProcessor creates a metrics processor based on this config.
|
||||
func (*Factory) CreateMetricsProcessor(
|
||||
ctx context.Context,
|
||||
func createMetricsProcessor(
|
||||
_ context.Context,
|
||||
params component.ProcessorCreateParams,
|
||||
nextConsumer consumer.MetricsConsumer,
|
||||
cfg configmodels.Processor,
|
||||
) (component.MetricsProcessor, error) {
|
||||
nextConsumer consumer.MetricsConsumer) (component.MetricsProcessor, error) {
|
||||
attrProc, err := createAttrProcessor(cfg.(*Config), params.Logger)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
|
|||
|
|
@ -28,14 +28,14 @@ import (
|
|||
)
|
||||
|
||||
func TestCreateDefaultConfig(t *testing.T) {
|
||||
var factory Factory
|
||||
factory := NewFactory()
|
||||
cfg := factory.CreateDefaultConfig()
|
||||
assert.NoError(t, configcheck.ValidateConfig(cfg))
|
||||
assert.NotNil(t, cfg)
|
||||
}
|
||||
|
||||
func TestCreateProcessor(t *testing.T) {
|
||||
var factory Factory
|
||||
factory := NewFactory()
|
||||
cfg := &Config{
|
||||
ProcessorSettings: configmodels.ProcessorSettings{
|
||||
TypeVal: "resource",
|
||||
|
|
@ -56,7 +56,7 @@ func TestCreateProcessor(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestInvalidEmptyActions(t *testing.T) {
|
||||
var factory Factory
|
||||
factory := NewFactory()
|
||||
cfg := factory.CreateDefaultConfig()
|
||||
|
||||
_, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{}, nil, cfg)
|
||||
|
|
@ -67,7 +67,7 @@ func TestInvalidEmptyActions(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestInvalidAttributeActions(t *testing.T) {
|
||||
var factory Factory
|
||||
factory := NewFactory()
|
||||
cfg := &Config{
|
||||
ProcessorSettings: configmodels.ProcessorSettings{
|
||||
TypeVal: "resource",
|
||||
|
|
|
|||
|
|
@ -30,15 +30,15 @@ func TestLoadConfig(t *testing.T) {
|
|||
factories, err := config.ExampleComponents()
|
||||
assert.NoError(t, err)
|
||||
|
||||
factory := &Factory{}
|
||||
factory := NewFactory()
|
||||
factories.Processors[typeStr] = factory
|
||||
|
||||
config, err := config.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)
|
||||
cfg, err := config.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)
|
||||
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, config)
|
||||
assert.NotNil(t, cfg)
|
||||
|
||||
p0 := config.Processors["span/custom"]
|
||||
p0 := cfg.Processors["span/custom"]
|
||||
assert.Equal(t, p0, &Config{
|
||||
ProcessorSettings: configmodels.ProcessorSettings{
|
||||
TypeVal: typeStr,
|
||||
|
|
@ -50,7 +50,7 @@ func TestLoadConfig(t *testing.T) {
|
|||
},
|
||||
})
|
||||
|
||||
p1 := config.Processors["span/no-separator"]
|
||||
p1 := cfg.Processors["span/no-separator"]
|
||||
assert.Equal(t, p1, &Config{
|
||||
ProcessorSettings: configmodels.ProcessorSettings{
|
||||
TypeVal: typeStr,
|
||||
|
|
@ -62,7 +62,7 @@ func TestLoadConfig(t *testing.T) {
|
|||
},
|
||||
})
|
||||
|
||||
p2 := config.Processors["span/to_attributes"]
|
||||
p2 := cfg.Processors["span/to_attributes"]
|
||||
assert.Equal(t, p2, &Config{
|
||||
ProcessorSettings: configmodels.ProcessorSettings{
|
||||
TypeVal: typeStr,
|
||||
|
|
@ -75,7 +75,7 @@ func TestLoadConfig(t *testing.T) {
|
|||
},
|
||||
})
|
||||
|
||||
p3 := config.Processors["span/includeexclude"]
|
||||
p3 := cfg.Processors["span/includeexclude"]
|
||||
assert.Equal(t, p3, &Config{
|
||||
ProcessorSettings: configmodels.ProcessorSettings{
|
||||
TypeVal: typeStr,
|
||||
|
|
|
|||
|
|
@ -19,9 +19,9 @@ import (
|
|||
"errors"
|
||||
|
||||
"go.opentelemetry.io/collector/component"
|
||||
"go.opentelemetry.io/collector/config/configerror"
|
||||
"go.opentelemetry.io/collector/config/configmodels"
|
||||
"go.opentelemetry.io/collector/consumer"
|
||||
"go.opentelemetry.io/collector/processor/processorhelper"
|
||||
)
|
||||
|
||||
const (
|
||||
|
|
@ -31,23 +31,19 @@ const (
|
|||
|
||||
// errMissingRequiredField is returned when a required field in the config
|
||||
// is not specified.
|
||||
// TODO https://go.opentelemetry.io/collector/issues/215
|
||||
// TODO https://github.com/open-telemetry/opentelemetry-collector/issues/215
|
||||
// Move this to the error package that allows for span name and field to be specified.
|
||||
var errMissingRequiredField = errors.New("error creating \"span\" processor: either \"from_attributes\" or \"to_attributes\" must be specified in \"name:\"")
|
||||
|
||||
// Factory is the factory for the Span processor.
|
||||
type Factory struct {
|
||||
// NewFactory returns a new factory for the Span processor.
|
||||
func NewFactory() component.ProcessorFactory {
|
||||
return processorhelper.NewFactory(
|
||||
typeStr,
|
||||
createDefaultConfig,
|
||||
processorhelper.WithTraceProcessor(createTraceProcessor))
|
||||
}
|
||||
|
||||
var _ component.ProcessorFactory = (*Factory)(nil)
|
||||
|
||||
// Type gets the type of the config created by this factory.
|
||||
func (f *Factory) Type() configmodels.Type {
|
||||
return typeStr
|
||||
}
|
||||
|
||||
// CreateDefaultConfig creates the default configuration for processor.
|
||||
func (f *Factory) CreateDefaultConfig() configmodels.Processor {
|
||||
func createDefaultConfig() configmodels.Processor {
|
||||
return &Config{
|
||||
ProcessorSettings: configmodels.ProcessorSettings{
|
||||
TypeVal: typeStr,
|
||||
|
|
@ -56,12 +52,12 @@ func (f *Factory) CreateDefaultConfig() configmodels.Processor {
|
|||
}
|
||||
}
|
||||
|
||||
// CreateTraceProcessor creates a trace processor based on this config.
|
||||
func (f *Factory) CreateTraceProcessor(
|
||||
func createTraceProcessor(
|
||||
_ context.Context,
|
||||
_ component.ProcessorCreateParams,
|
||||
cfg configmodels.Processor,
|
||||
nextConsumer consumer.TraceConsumer,
|
||||
cfg configmodels.Processor) (component.TraceProcessor, error) {
|
||||
) (component.TraceProcessor, error) {
|
||||
|
||||
// 'from_attributes' or 'to_attributes' under 'name' has to be set for the span
|
||||
// processor to be valid. If not set and not enforced, the processor would do no work.
|
||||
|
|
@ -73,13 +69,3 @@ func (f *Factory) CreateTraceProcessor(
|
|||
|
||||
return newSpanProcessor(nextConsumer, *oCfg)
|
||||
}
|
||||
|
||||
// CreateMetricsProcessor creates a metric processor based on this config.
|
||||
func (f *Factory) CreateMetricsProcessor(
|
||||
_ context.Context,
|
||||
_ component.ProcessorCreateParams,
|
||||
_ consumer.MetricsConsumer,
|
||||
_ configmodels.Processor) (component.MetricsProcessor, error) {
|
||||
// Span Processor does not support Metrics.
|
||||
return nil, configerror.ErrDataTypeIsNotSupported
|
||||
}
|
||||
|
|
|
|||
|
|
@ -31,12 +31,12 @@ import (
|
|||
)
|
||||
|
||||
func TestFactory_Type(t *testing.T) {
|
||||
factory := &Factory{}
|
||||
factory := NewFactory()
|
||||
assert.Equal(t, factory.Type(), configmodels.Type(typeStr))
|
||||
}
|
||||
|
||||
func TestFactory_CreateDefaultConfig(t *testing.T) {
|
||||
factory := &Factory{}
|
||||
factory := NewFactory()
|
||||
cfg := factory.CreateDefaultConfig()
|
||||
assert.NoError(t, configcheck.ValidateConfig(cfg))
|
||||
|
||||
|
|
@ -47,7 +47,7 @@ func TestFactory_CreateDefaultConfig(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestFactory_CreateTraceProcessor(t *testing.T) {
|
||||
factory := &Factory{}
|
||||
factory := NewFactory()
|
||||
cfg := factory.CreateDefaultConfig()
|
||||
oCfg := cfg.(*Config)
|
||||
|
||||
|
|
@ -63,7 +63,7 @@ func TestFactory_CreateTraceProcessor(t *testing.T) {
|
|||
// TestFactory_CreateTraceProcessor_InvalidConfig ensures the default configuration
|
||||
// returns an error.
|
||||
func TestFactory_CreateTraceProcessor_InvalidConfig(t *testing.T) {
|
||||
factory := &Factory{}
|
||||
factory := NewFactory()
|
||||
|
||||
testcases := []struct {
|
||||
name string
|
||||
|
|
@ -100,7 +100,7 @@ func TestFactory_CreateTraceProcessor_InvalidConfig(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestFactory_CreateMetricProcessor(t *testing.T) {
|
||||
factory := &Factory{}
|
||||
factory := NewFactory()
|
||||
cfg := factory.CreateDefaultConfig()
|
||||
|
||||
mp, err := factory.CreateMetricsProcessor(
|
||||
|
|
|
|||
|
|
@ -33,7 +33,7 @@ import (
|
|||
)
|
||||
|
||||
func TestNewTraceProcessor(t *testing.T) {
|
||||
factory := Factory{}
|
||||
factory := NewFactory()
|
||||
cfg := factory.CreateDefaultConfig()
|
||||
oCfg := cfg.(*Config)
|
||||
tp, err := newSpanProcessor(nil, *oCfg)
|
||||
|
|
@ -146,7 +146,7 @@ func TestSpanProcessor_NilEmptyData(t *testing.T) {
|
|||
output: testdata.GenerateTraceDataOneEmptyOneNilInstrumentationLibrary(),
|
||||
},
|
||||
}
|
||||
factory := Factory{}
|
||||
factory := NewFactory()
|
||||
cfg := factory.CreateDefaultConfig()
|
||||
oCfg := cfg.(*Config)
|
||||
oCfg.Include = &filterspan.MatchProperties{
|
||||
|
|
@ -255,7 +255,7 @@ func TestSpanProcessor_Values(t *testing.T) {
|
|||
},*/
|
||||
}
|
||||
|
||||
factory := Factory{}
|
||||
factory := NewFactory()
|
||||
cfg := factory.CreateDefaultConfig()
|
||||
oCfg := cfg.(*Config)
|
||||
oCfg.Rename.FromAttributes = []string{"key1"}
|
||||
|
|
@ -331,7 +331,7 @@ func TestSpanProcessor_MissingKeys(t *testing.T) {
|
|||
},
|
||||
},
|
||||
}
|
||||
factory := Factory{}
|
||||
factory := NewFactory()
|
||||
cfg := factory.CreateDefaultConfig()
|
||||
oCfg := cfg.(*Config)
|
||||
oCfg.Rename.FromAttributes = []string{"key1", "key2", "key3", "key4"}
|
||||
|
|
@ -350,7 +350,7 @@ func TestSpanProcessor_MissingKeys(t *testing.T) {
|
|||
// the single key.
|
||||
func TestSpanProcessor_Separator(t *testing.T) {
|
||||
|
||||
factory := Factory{}
|
||||
factory := NewFactory()
|
||||
cfg := factory.CreateDefaultConfig()
|
||||
oCfg := cfg.(*Config)
|
||||
oCfg.Rename.FromAttributes = []string{"key1"}
|
||||
|
|
@ -380,7 +380,7 @@ func TestSpanProcessor_Separator(t *testing.T) {
|
|||
// TestSpanProcessor_NoSeparatorMultipleKeys tests naming a span using multiple keys and no separator.
|
||||
func TestSpanProcessor_NoSeparatorMultipleKeys(t *testing.T) {
|
||||
|
||||
factory := Factory{}
|
||||
factory := NewFactory()
|
||||
cfg := factory.CreateDefaultConfig()
|
||||
oCfg := cfg.(*Config)
|
||||
oCfg.Rename.FromAttributes = []string{"key1", "key2"}
|
||||
|
|
@ -411,7 +411,7 @@ func TestSpanProcessor_NoSeparatorMultipleKeys(t *testing.T) {
|
|||
// TestSpanProcessor_SeparatorMultipleKeys tests naming a span with multiple keys and a separator.
|
||||
func TestSpanProcessor_SeparatorMultipleKeys(t *testing.T) {
|
||||
|
||||
factory := Factory{}
|
||||
factory := NewFactory()
|
||||
cfg := factory.CreateDefaultConfig()
|
||||
oCfg := cfg.(*Config)
|
||||
oCfg.Rename.FromAttributes = []string{"key1", "key2", "key3", "key4"}
|
||||
|
|
@ -447,7 +447,7 @@ func TestSpanProcessor_SeparatorMultipleKeys(t *testing.T) {
|
|||
// TestSpanProcessor_NilName tests naming a span when the input span had no name.
|
||||
func TestSpanProcessor_NilName(t *testing.T) {
|
||||
|
||||
factory := Factory{}
|
||||
factory := NewFactory()
|
||||
cfg := factory.CreateDefaultConfig()
|
||||
oCfg := cfg.(*Config)
|
||||
oCfg.Rename.FromAttributes = []string{"key1"}
|
||||
|
|
@ -543,7 +543,7 @@ func TestSpanProcessor_ToAttributes(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
factory := Factory{}
|
||||
factory := NewFactory()
|
||||
cfg := factory.CreateDefaultConfig()
|
||||
oCfg := cfg.(*Config)
|
||||
oCfg.Rename.ToAttributes = &ToAttributes{}
|
||||
|
|
@ -604,7 +604,7 @@ func TestSpanProcessor_skipSpan(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
factory := Factory{}
|
||||
factory := NewFactory()
|
||||
cfg := factory.CreateDefaultConfig()
|
||||
oCfg := cfg.(*Config)
|
||||
oCfg.Include = &filterspan.MatchProperties{
|
||||
|
|
|
|||
|
|
@ -288,7 +288,7 @@ func assertEqualMetricsData(t *testing.T, expected consumerdata.MetricsData, act
|
|||
func testPipeline(t *testing.T, pipelineName string, exporterNames []string) {
|
||||
factories, err := config.ExampleComponents()
|
||||
assert.NoError(t, err)
|
||||
attrFactory := &attributesprocessor.Factory{}
|
||||
attrFactory := attributesprocessor.NewFactory()
|
||||
factories.Processors[attrFactory.Type()] = attrFactory
|
||||
cfg, err := config.LoadConfigFile(t, "testdata/pipelines_builder.yaml", factories)
|
||||
// Load the config
|
||||
|
|
@ -348,7 +348,7 @@ func testPipeline(t *testing.T, pipelineName string, exporterNames []string) {
|
|||
func TestPipelinesBuilder_Error(t *testing.T) {
|
||||
factories, err := config.ExampleComponents()
|
||||
assert.NoError(t, err)
|
||||
attrFactory := &attributesprocessor.Factory{}
|
||||
attrFactory := attributesprocessor.NewFactory()
|
||||
factories.Processors[attrFactory.Type()] = attrFactory
|
||||
cfg, err := config.LoadConfigFile(t, "testdata/pipelines_builder.yaml", factories)
|
||||
require.Nil(t, err)
|
||||
|
|
|
|||
|
|
@ -96,7 +96,7 @@ func testReceivers(
|
|||
factories, err := config.ExampleComponents()
|
||||
assert.NoError(t, err)
|
||||
|
||||
attrFactory := &attributesprocessor.Factory{}
|
||||
attrFactory := attributesprocessor.NewFactory()
|
||||
factories.Processors[attrFactory.Type()] = attrFactory
|
||||
cfg, err := config.LoadConfigFile(t, "testdata/pipelines_builder.yaml", factories)
|
||||
require.Nil(t, err)
|
||||
|
|
@ -268,7 +268,7 @@ func TestReceiversBuilder_DataTypeError(t *testing.T) {
|
|||
factories, err := config.ExampleComponents()
|
||||
assert.NoError(t, err)
|
||||
|
||||
attrFactory := &attributesprocessor.Factory{}
|
||||
attrFactory := attributesprocessor.NewFactory()
|
||||
factories.Processors[attrFactory.Type()] = attrFactory
|
||||
cfg, err := config.LoadConfigFile(t, "testdata/pipelines_builder.yaml", factories)
|
||||
assert.NoError(t, err)
|
||||
|
|
@ -369,7 +369,7 @@ func TestReceiversBuilder_Unused(t *testing.T) {
|
|||
factories, err := config.ExampleComponents()
|
||||
assert.NoError(t, err)
|
||||
|
||||
attrFactory := &attributesprocessor.Factory{}
|
||||
attrFactory := attributesprocessor.NewFactory()
|
||||
factories.Processors[attrFactory.Type()] = attrFactory
|
||||
|
||||
zpkFactory := &zipkinreceiver.Factory{}
|
||||
|
|
|
|||
|
|
@ -89,15 +89,15 @@ func Components() (
|
|||
}
|
||||
|
||||
processors, err := component.MakeProcessorFactoryMap(
|
||||
&attributesprocessor.Factory{},
|
||||
&resourceprocessor.Factory{},
|
||||
&queuedprocessor.Factory{},
|
||||
&batchprocessor.Factory{},
|
||||
&memorylimiter.Factory{},
|
||||
attributesprocessor.NewFactory(),
|
||||
resourceprocessor.NewFactory(),
|
||||
queuedprocessor.NewFactory(),
|
||||
batchprocessor.NewFactory(),
|
||||
memorylimiter.NewFactory(),
|
||||
&tailsamplingprocessor.Factory{},
|
||||
&probabilisticsamplerprocessor.Factory{},
|
||||
&spanprocessor.Factory{},
|
||||
&filterprocessor.Factory{},
|
||||
spanprocessor.NewFactory(),
|
||||
filterprocessor.NewFactory(),
|
||||
)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
|
|
|
|||
Loading…
Reference in New Issue