Avoid using config2 in imports, remove duplicate code in exporter builder (#2863)
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
This commit is contained in:
parent
929f737475
commit
196df24cda
|
|
@ -29,7 +29,7 @@ import (
|
|||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
config2 "go.opentelemetry.io/collector/config"
|
||||
"go.opentelemetry.io/collector/config"
|
||||
"go.opentelemetry.io/collector/config/confighttp"
|
||||
"go.opentelemetry.io/collector/consumer/pdata"
|
||||
"go.opentelemetry.io/collector/exporter/exporterhelper"
|
||||
|
|
@ -42,8 +42,8 @@ import (
|
|||
|
||||
// Test_ NewPrwExporter checks that a new exporter instance with non-nil fields is initialized
|
||||
func Test_NewPrwExporter(t *testing.T) {
|
||||
config := &Config{
|
||||
ExporterSettings: config2.ExporterSettings{},
|
||||
cfg := &Config{
|
||||
ExporterSettings: config.ExporterSettings{},
|
||||
TimeoutSettings: exporterhelper.TimeoutSettings{},
|
||||
QueueSettings: exporterhelper.QueueSettings{},
|
||||
RetrySettings: exporterhelper.RetrySettings{},
|
||||
|
|
@ -62,7 +62,7 @@ func Test_NewPrwExporter(t *testing.T) {
|
|||
}{
|
||||
{
|
||||
"invalid_URL",
|
||||
config,
|
||||
cfg,
|
||||
"test",
|
||||
"invalid URL",
|
||||
map[string]string{"Key1": "Val1"},
|
||||
|
|
@ -71,7 +71,7 @@ func Test_NewPrwExporter(t *testing.T) {
|
|||
},
|
||||
{
|
||||
"nil_client",
|
||||
config,
|
||||
cfg,
|
||||
"test",
|
||||
"http://some.url:9411/api/prom/push",
|
||||
map[string]string{"Key1": "Val1"},
|
||||
|
|
@ -80,7 +80,7 @@ func Test_NewPrwExporter(t *testing.T) {
|
|||
},
|
||||
{
|
||||
"invalid_labels_case",
|
||||
config,
|
||||
cfg,
|
||||
"test",
|
||||
"http://some.url:9411/api/prom/push",
|
||||
map[string]string{"Key1": ""},
|
||||
|
|
@ -89,7 +89,7 @@ func Test_NewPrwExporter(t *testing.T) {
|
|||
},
|
||||
{
|
||||
"success_case",
|
||||
config,
|
||||
cfg,
|
||||
"test",
|
||||
"http://some.url:9411/api/prom/push",
|
||||
map[string]string{"Key1": "Val1"},
|
||||
|
|
@ -98,7 +98,7 @@ func Test_NewPrwExporter(t *testing.T) {
|
|||
},
|
||||
{
|
||||
"success_case_no_labels",
|
||||
config,
|
||||
cfg,
|
||||
"test",
|
||||
"http://some.url:9411/api/prom/push",
|
||||
map[string]string{},
|
||||
|
|
@ -685,7 +685,7 @@ func Test_PushMetrics(t *testing.T) {
|
|||
assert.NoError(t, uErr)
|
||||
|
||||
config := &Config{
|
||||
ExporterSettings: config2.ExporterSettings{
|
||||
ExporterSettings: config.ExporterSettings{
|
||||
TypeVal: "prometheusremotewrite",
|
||||
NameVal: "prometheusremotewrite",
|
||||
},
|
||||
|
|
|
|||
|
|
@ -34,7 +34,7 @@ import (
|
|||
|
||||
"go.opentelemetry.io/collector/component"
|
||||
"go.opentelemetry.io/collector/component/componenttest"
|
||||
config2 "go.opentelemetry.io/collector/config"
|
||||
"go.opentelemetry.io/collector/config"
|
||||
"go.opentelemetry.io/collector/config/confighttp"
|
||||
"go.opentelemetry.io/collector/receiver/zipkinreceiver"
|
||||
"go.opentelemetry.io/collector/testutil"
|
||||
|
|
@ -59,13 +59,13 @@ func TestZipkinExporter_roundtripJSON(t *testing.T) {
|
|||
}))
|
||||
defer cst.Close()
|
||||
|
||||
config := &Config{
|
||||
cfg := &Config{
|
||||
HTTPClientSettings: confighttp.HTTPClientSettings{
|
||||
Endpoint: cst.URL,
|
||||
},
|
||||
Format: "json",
|
||||
}
|
||||
zexp, err := NewFactory().CreateTracesExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, config)
|
||||
zexp, err := NewFactory().CreateTracesExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, cfg)
|
||||
assert.NoError(t, err)
|
||||
require.NotNil(t, zexp)
|
||||
|
||||
|
|
@ -75,15 +75,15 @@ func TestZipkinExporter_roundtripJSON(t *testing.T) {
|
|||
|
||||
// Run the Zipkin receiver to "receive spans upload from a client application"
|
||||
addr := testutil.GetAvailableLocalAddress(t)
|
||||
cfg := &zipkinreceiver.Config{
|
||||
ReceiverSettings: config2.ReceiverSettings{
|
||||
recvCfg := &zipkinreceiver.Config{
|
||||
ReceiverSettings: config.ReceiverSettings{
|
||||
NameVal: "zipkin_receiver",
|
||||
},
|
||||
HTTPServerSettings: confighttp.HTTPServerSettings{
|
||||
Endpoint: addr,
|
||||
},
|
||||
}
|
||||
zi, err := zipkinreceiver.New(cfg, zexp)
|
||||
zi, err := zipkinreceiver.New(recvCfg, zexp)
|
||||
assert.NoError(t, err)
|
||||
require.NotNil(t, zi)
|
||||
|
||||
|
|
@ -307,13 +307,13 @@ func TestZipkinExporter_roundtripProto(t *testing.T) {
|
|||
}))
|
||||
defer cst.Close()
|
||||
|
||||
config := &Config{
|
||||
cfg := &Config{
|
||||
HTTPClientSettings: confighttp.HTTPClientSettings{
|
||||
Endpoint: cst.URL,
|
||||
},
|
||||
Format: "proto",
|
||||
}
|
||||
zexp, err := NewFactory().CreateTracesExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, config)
|
||||
zexp, err := NewFactory().CreateTracesExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, cfg)
|
||||
require.NoError(t, err)
|
||||
|
||||
// The test requires the spans from zipkinSpansJSONJavaLibrary to be sent in a single batch, use
|
||||
|
|
@ -324,15 +324,15 @@ func TestZipkinExporter_roundtripProto(t *testing.T) {
|
|||
|
||||
// Run the Zipkin receiver to "receive spans upload from a client application"
|
||||
port := testutil.GetAvailablePort(t)
|
||||
cfg := &zipkinreceiver.Config{
|
||||
ReceiverSettings: config2.ReceiverSettings{
|
||||
recvCfg := &zipkinreceiver.Config{
|
||||
ReceiverSettings: config.ReceiverSettings{
|
||||
NameVal: "zipkin_receiver",
|
||||
},
|
||||
HTTPServerSettings: confighttp.HTTPServerSettings{
|
||||
Endpoint: fmt.Sprintf(":%d", port),
|
||||
},
|
||||
}
|
||||
zi, err := zipkinreceiver.New(cfg, zexp)
|
||||
zi, err := zipkinreceiver.New(recvCfg, zexp)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = zi.Start(context.Background(), componenttest.NewNopHost())
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@ import (
|
|||
"github.com/stretchr/testify/require"
|
||||
|
||||
"go.opentelemetry.io/collector/component/componenttest"
|
||||
config2 "go.opentelemetry.io/collector/config"
|
||||
"go.opentelemetry.io/collector/config"
|
||||
"go.opentelemetry.io/collector/config/configtest"
|
||||
"go.opentelemetry.io/collector/internal/processor/filtermetric"
|
||||
fsregexp "go.opentelemetry.io/collector/internal/processor/filterset/regexp"
|
||||
|
|
@ -45,11 +45,11 @@ func TestLoadingConfigStrict(t *testing.T) {
|
|||
assert.Nil(t, err)
|
||||
|
||||
factory := NewFactory()
|
||||
factories.Processors[config2.Type(typeStr)] = factory
|
||||
config, err := configtest.LoadConfigFile(t, path.Join(".", "testdata", "config_strict.yaml"), factories)
|
||||
factories.Processors[config.Type(typeStr)] = factory
|
||||
cfg, err := configtest.LoadConfigFile(t, path.Join(".", "testdata", "config_strict.yaml"), factories)
|
||||
|
||||
assert.Nil(t, err)
|
||||
require.NotNil(t, config)
|
||||
require.NotNil(t, cfg)
|
||||
|
||||
tests := []struct {
|
||||
filterName string
|
||||
|
|
@ -58,7 +58,7 @@ func TestLoadingConfigStrict(t *testing.T) {
|
|||
{
|
||||
filterName: "filter/empty",
|
||||
expCfg: &Config{
|
||||
ProcessorSettings: config2.ProcessorSettings{
|
||||
ProcessorSettings: config.ProcessorSettings{
|
||||
NameVal: "filter/empty",
|
||||
TypeVal: typeStr,
|
||||
},
|
||||
|
|
@ -71,7 +71,7 @@ func TestLoadingConfigStrict(t *testing.T) {
|
|||
}, {
|
||||
filterName: "filter/include",
|
||||
expCfg: &Config{
|
||||
ProcessorSettings: config2.ProcessorSettings{
|
||||
ProcessorSettings: config.ProcessorSettings{
|
||||
NameVal: "filter/include",
|
||||
TypeVal: typeStr,
|
||||
},
|
||||
|
|
@ -82,7 +82,7 @@ func TestLoadingConfigStrict(t *testing.T) {
|
|||
}, {
|
||||
filterName: "filter/exclude",
|
||||
expCfg: &Config{
|
||||
ProcessorSettings: config2.ProcessorSettings{
|
||||
ProcessorSettings: config.ProcessorSettings{
|
||||
NameVal: "filter/exclude",
|
||||
TypeVal: typeStr,
|
||||
},
|
||||
|
|
@ -93,7 +93,7 @@ func TestLoadingConfigStrict(t *testing.T) {
|
|||
}, {
|
||||
filterName: "filter/includeexclude",
|
||||
expCfg: &Config{
|
||||
ProcessorSettings: config2.ProcessorSettings{
|
||||
ProcessorSettings: config.ProcessorSettings{
|
||||
NameVal: "filter/includeexclude",
|
||||
TypeVal: typeStr,
|
||||
},
|
||||
|
|
@ -110,7 +110,7 @@ func TestLoadingConfigStrict(t *testing.T) {
|
|||
|
||||
for _, test := range tests {
|
||||
t.Run(test.filterName, func(t *testing.T) {
|
||||
cfg := config.Processors[test.filterName]
|
||||
cfg := cfg.Processors[test.filterName]
|
||||
assert.Equal(t, test.expCfg, cfg)
|
||||
})
|
||||
}
|
||||
|
|
@ -140,10 +140,10 @@ func TestLoadingConfigRegexp(t *testing.T) {
|
|||
|
||||
factory := NewFactory()
|
||||
factories.Processors[typeStr] = factory
|
||||
config, err := configtest.LoadConfigFile(t, path.Join(".", "testdata", "config_regexp.yaml"), factories)
|
||||
cfg, err := configtest.LoadConfigFile(t, path.Join(".", "testdata", "config_regexp.yaml"), factories)
|
||||
|
||||
assert.Nil(t, err)
|
||||
require.NotNil(t, config)
|
||||
require.NotNil(t, cfg)
|
||||
|
||||
tests := []struct {
|
||||
filterName string
|
||||
|
|
@ -152,7 +152,7 @@ func TestLoadingConfigRegexp(t *testing.T) {
|
|||
{
|
||||
filterName: "filter/include",
|
||||
expCfg: &Config{
|
||||
ProcessorSettings: config2.ProcessorSettings{
|
||||
ProcessorSettings: config.ProcessorSettings{
|
||||
NameVal: "filter/include",
|
||||
TypeVal: typeStr,
|
||||
},
|
||||
|
|
@ -163,7 +163,7 @@ func TestLoadingConfigRegexp(t *testing.T) {
|
|||
}, {
|
||||
filterName: "filter/exclude",
|
||||
expCfg: &Config{
|
||||
ProcessorSettings: config2.ProcessorSettings{
|
||||
ProcessorSettings: config.ProcessorSettings{
|
||||
NameVal: "filter/exclude",
|
||||
TypeVal: typeStr,
|
||||
},
|
||||
|
|
@ -174,7 +174,7 @@ func TestLoadingConfigRegexp(t *testing.T) {
|
|||
}, {
|
||||
filterName: "filter/unlimitedcache",
|
||||
expCfg: &Config{
|
||||
ProcessorSettings: config2.ProcessorSettings{
|
||||
ProcessorSettings: config.ProcessorSettings{
|
||||
NameVal: "filter/unlimitedcache",
|
||||
TypeVal: typeStr,
|
||||
},
|
||||
|
|
@ -191,7 +191,7 @@ func TestLoadingConfigRegexp(t *testing.T) {
|
|||
}, {
|
||||
filterName: "filter/limitedcache",
|
||||
expCfg: &Config{
|
||||
ProcessorSettings: config2.ProcessorSettings{
|
||||
ProcessorSettings: config.ProcessorSettings{
|
||||
NameVal: "filter/limitedcache",
|
||||
TypeVal: typeStr,
|
||||
},
|
||||
|
|
@ -211,7 +211,7 @@ func TestLoadingConfigRegexp(t *testing.T) {
|
|||
|
||||
for _, test := range tests {
|
||||
t.Run(test.filterName, func(t *testing.T) {
|
||||
cfg := config.Processors[test.filterName]
|
||||
cfg := cfg.Processors[test.filterName]
|
||||
assert.Equal(t, test.expCfg, cfg)
|
||||
})
|
||||
}
|
||||
|
|
@ -221,19 +221,19 @@ func TestLoadingConfigExpr(t *testing.T) {
|
|||
factories, err := componenttest.NopFactories()
|
||||
require.NoError(t, err)
|
||||
factory := NewFactory()
|
||||
factories.Processors[config2.Type(typeStr)] = factory
|
||||
config, err := configtest.LoadConfigFile(t, path.Join(".", "testdata", "config_expr.yaml"), factories)
|
||||
factories.Processors[config.Type(typeStr)] = factory
|
||||
cfg, err := configtest.LoadConfigFile(t, path.Join(".", "testdata", "config_expr.yaml"), factories)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, config)
|
||||
require.NotNil(t, cfg)
|
||||
|
||||
tests := []struct {
|
||||
filterName string
|
||||
expCfg config2.Processor
|
||||
expCfg config.Processor
|
||||
}{
|
||||
{
|
||||
filterName: "filter/empty",
|
||||
expCfg: &Config{
|
||||
ProcessorSettings: config2.ProcessorSettings{
|
||||
ProcessorSettings: config.ProcessorSettings{
|
||||
NameVal: "filter/empty",
|
||||
TypeVal: typeStr,
|
||||
},
|
||||
|
|
@ -247,7 +247,7 @@ func TestLoadingConfigExpr(t *testing.T) {
|
|||
{
|
||||
filterName: "filter/include",
|
||||
expCfg: &Config{
|
||||
ProcessorSettings: config2.ProcessorSettings{
|
||||
ProcessorSettings: config.ProcessorSettings{
|
||||
NameVal: "filter/include",
|
||||
TypeVal: typeStr,
|
||||
},
|
||||
|
|
@ -265,7 +265,7 @@ func TestLoadingConfigExpr(t *testing.T) {
|
|||
{
|
||||
filterName: "filter/exclude",
|
||||
expCfg: &Config{
|
||||
ProcessorSettings: config2.ProcessorSettings{
|
||||
ProcessorSettings: config.ProcessorSettings{
|
||||
NameVal: "filter/exclude",
|
||||
TypeVal: typeStr,
|
||||
},
|
||||
|
|
@ -283,7 +283,7 @@ func TestLoadingConfigExpr(t *testing.T) {
|
|||
{
|
||||
filterName: "filter/includeexclude",
|
||||
expCfg: &Config{
|
||||
ProcessorSettings: config2.ProcessorSettings{
|
||||
ProcessorSettings: config.ProcessorSettings{
|
||||
NameVal: "filter/includeexclude",
|
||||
TypeVal: typeStr,
|
||||
},
|
||||
|
|
@ -306,7 +306,7 @@ func TestLoadingConfigExpr(t *testing.T) {
|
|||
}
|
||||
for _, test := range tests {
|
||||
t.Run(test.filterName, func(t *testing.T) {
|
||||
cfg := config.Processors[test.filterName]
|
||||
cfg := cfg.Processors[test.filterName]
|
||||
assert.Equal(t, test.expCfg, cfg)
|
||||
})
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,7 +21,7 @@ import (
|
|||
"go.uber.org/zap"
|
||||
|
||||
"go.opentelemetry.io/collector/component"
|
||||
config2 "go.opentelemetry.io/collector/config"
|
||||
"go.opentelemetry.io/collector/config"
|
||||
"go.opentelemetry.io/collector/config/configerror"
|
||||
"go.opentelemetry.io/collector/consumer/consumererror"
|
||||
)
|
||||
|
|
@ -30,7 +30,7 @@ import (
|
|||
// a trace and/or a metrics consumer and have a shutdown function.
|
||||
type builtExporter struct {
|
||||
logger *zap.Logger
|
||||
expByDataType map[config2.DataType]component.Exporter
|
||||
expByDataType map[config.DataType]component.Exporter
|
||||
}
|
||||
|
||||
// Start the exporter.
|
||||
|
|
@ -60,7 +60,7 @@ func (bexp *builtExporter) Shutdown(ctx context.Context) error {
|
|||
}
|
||||
|
||||
func (bexp *builtExporter) getTraceExporter() component.TracesExporter {
|
||||
exp := bexp.expByDataType[config2.TracesDataType]
|
||||
exp := bexp.expByDataType[config.TracesDataType]
|
||||
if exp == nil {
|
||||
return nil
|
||||
}
|
||||
|
|
@ -68,7 +68,7 @@ func (bexp *builtExporter) getTraceExporter() component.TracesExporter {
|
|||
}
|
||||
|
||||
func (bexp *builtExporter) getMetricExporter() component.MetricsExporter {
|
||||
exp := bexp.expByDataType[config2.MetricsDataType]
|
||||
exp := bexp.expByDataType[config.MetricsDataType]
|
||||
if exp == nil {
|
||||
return nil
|
||||
}
|
||||
|
|
@ -76,7 +76,7 @@ func (bexp *builtExporter) getMetricExporter() component.MetricsExporter {
|
|||
}
|
||||
|
||||
func (bexp *builtExporter) getLogExporter() component.LogsExporter {
|
||||
exp := bexp.expByDataType[config2.LogsDataType]
|
||||
exp := bexp.expByDataType[config.LogsDataType]
|
||||
if exp == nil {
|
||||
return nil
|
||||
}
|
||||
|
|
@ -84,7 +84,7 @@ func (bexp *builtExporter) getLogExporter() component.LogsExporter {
|
|||
}
|
||||
|
||||
// Exporters is a map of exporters created from exporter configs.
|
||||
type Exporters map[config2.Exporter]*builtExporter
|
||||
type Exporters map[config.Exporter]*builtExporter
|
||||
|
||||
// StartAll starts all exporters.
|
||||
func (exps Exporters) StartAll(ctx context.Context, host component.Host) error {
|
||||
|
|
@ -112,13 +112,13 @@ func (exps Exporters) ShutdownAll(ctx context.Context) error {
|
|||
return consumererror.Combine(errs)
|
||||
}
|
||||
|
||||
func (exps Exporters) ToMapByDataType() map[config2.DataType]map[config2.NamedEntity]component.Exporter {
|
||||
func (exps Exporters) ToMapByDataType() map[config.DataType]map[config.NamedEntity]component.Exporter {
|
||||
|
||||
exportersMap := make(map[config2.DataType]map[config2.NamedEntity]component.Exporter)
|
||||
exportersMap := make(map[config.DataType]map[config.NamedEntity]component.Exporter)
|
||||
|
||||
exportersMap[config2.TracesDataType] = make(map[config2.NamedEntity]component.Exporter, len(exps))
|
||||
exportersMap[config2.MetricsDataType] = make(map[config2.NamedEntity]component.Exporter, len(exps))
|
||||
exportersMap[config2.LogsDataType] = make(map[config2.NamedEntity]component.Exporter, len(exps))
|
||||
exportersMap[config.TracesDataType] = make(map[config.NamedEntity]component.Exporter, len(exps))
|
||||
exportersMap[config.MetricsDataType] = make(map[config.NamedEntity]component.Exporter, len(exps))
|
||||
exportersMap[config.LogsDataType] = make(map[config.NamedEntity]component.Exporter, len(exps))
|
||||
|
||||
for cfg, bexp := range exps {
|
||||
for t, exp := range bexp.expByDataType {
|
||||
|
|
@ -131,29 +131,29 @@ func (exps Exporters) ToMapByDataType() map[config2.DataType]map[config2.NamedEn
|
|||
|
||||
type dataTypeRequirement struct {
|
||||
// Pipeline that requires the data type.
|
||||
requiredBy *config2.Pipeline
|
||||
requiredBy *config.Pipeline
|
||||
}
|
||||
|
||||
// Map of data type requirements.
|
||||
type dataTypeRequirements map[config2.DataType]dataTypeRequirement
|
||||
type dataTypeRequirements map[config.DataType]dataTypeRequirement
|
||||
|
||||
// Data type requirements for all exporters.
|
||||
type exportersRequiredDataTypes map[config2.Exporter]dataTypeRequirements
|
||||
type exportersRequiredDataTypes map[config.Exporter]dataTypeRequirements
|
||||
|
||||
// exportersBuilder builds exporters from config.
|
||||
type exportersBuilder struct {
|
||||
logger *zap.Logger
|
||||
appInfo component.ApplicationStartInfo
|
||||
config *config2.Config
|
||||
factories map[config2.Type]component.ExporterFactory
|
||||
config *config.Config
|
||||
factories map[config.Type]component.ExporterFactory
|
||||
}
|
||||
|
||||
// BuildExporters builds Exporters from config.
|
||||
func BuildExporters(
|
||||
logger *zap.Logger,
|
||||
appInfo component.ApplicationStartInfo,
|
||||
config *config2.Config,
|
||||
factories map[config2.Type]component.ExporterFactory,
|
||||
config *config.Config,
|
||||
factories map[config.Type]component.ExporterFactory,
|
||||
) (Exporters, error) {
|
||||
eb := &exportersBuilder{logger.With(zap.String(kindLogKey, kindLogsExporter)), appInfo, config, factories}
|
||||
|
||||
|
|
@ -213,20 +213,20 @@ func (eb *exportersBuilder) buildExporter(
|
|||
ctx context.Context,
|
||||
logger *zap.Logger,
|
||||
appInfo component.ApplicationStartInfo,
|
||||
config config2.Exporter,
|
||||
cfg config.Exporter,
|
||||
exportersInputDataTypes exportersRequiredDataTypes,
|
||||
) (*builtExporter, error) {
|
||||
factory := eb.factories[config.Type()]
|
||||
factory := eb.factories[cfg.Type()]
|
||||
if factory == nil {
|
||||
return nil, fmt.Errorf("exporter factory not found for type: %s", config.Type())
|
||||
return nil, fmt.Errorf("exporter factory not found for type: %s", cfg.Type())
|
||||
}
|
||||
|
||||
exporter := &builtExporter{
|
||||
logger: logger,
|
||||
expByDataType: make(map[config2.DataType]component.Exporter, 3),
|
||||
expByDataType: make(map[config.DataType]component.Exporter, 3),
|
||||
}
|
||||
|
||||
inputDataTypes := exportersInputDataTypes[config]
|
||||
inputDataTypes := exportersInputDataTypes[cfg]
|
||||
if inputDataTypes == nil {
|
||||
eb.logger.Info("Ignoring exporter as it is not used by any pipeline")
|
||||
return exporter, nil
|
||||
|
|
@ -237,77 +237,49 @@ func (eb *exportersBuilder) buildExporter(
|
|||
ApplicationStartInfo: appInfo,
|
||||
}
|
||||
|
||||
var err error
|
||||
var createdExporter component.Exporter
|
||||
for dataType, requirement := range inputDataTypes {
|
||||
switch dataType {
|
||||
case config2.TracesDataType:
|
||||
// Traces data type is required. Create a trace exporter based on config.
|
||||
te, err := factory.CreateTracesExporter(ctx, creationParams, config)
|
||||
if err != nil {
|
||||
if err == configerror.ErrDataTypeIsNotSupported {
|
||||
// Could not create because this exporter does not support this data type.
|
||||
return nil, exporterTypeMismatchErr(config, requirement.requiredBy, dataType)
|
||||
}
|
||||
return nil, fmt.Errorf("error creating %s exporter: %v", config.Name(), err)
|
||||
}
|
||||
case config.TracesDataType:
|
||||
createdExporter, err = factory.CreateTracesExporter(ctx, creationParams, cfg)
|
||||
|
||||
// Check if the factory really created the exporter.
|
||||
if te == nil {
|
||||
return nil, fmt.Errorf("factory for %q produced a nil exporter", config.Name())
|
||||
}
|
||||
case config.MetricsDataType:
|
||||
createdExporter, err = factory.CreateMetricsExporter(ctx, creationParams, cfg)
|
||||
|
||||
exporter.expByDataType[config2.TracesDataType] = te
|
||||
|
||||
case config2.MetricsDataType:
|
||||
// Metrics data type is required. Create a trace exporter based on config.
|
||||
me, err := factory.CreateMetricsExporter(ctx, creationParams, config)
|
||||
if err != nil {
|
||||
if err == configerror.ErrDataTypeIsNotSupported {
|
||||
// Could not create because this exporter does not support this data type.
|
||||
return nil, exporterTypeMismatchErr(config, requirement.requiredBy, dataType)
|
||||
}
|
||||
return nil, fmt.Errorf("error creating %s exporter: %v", config.Name(), err)
|
||||
}
|
||||
|
||||
// The factories can be implemented by third parties, check if they really
|
||||
// created the exporter.
|
||||
if me == nil {
|
||||
return nil, fmt.Errorf("factory for %q produced a nil exporter", config.Name())
|
||||
}
|
||||
|
||||
exporter.expByDataType[config2.MetricsDataType] = me
|
||||
|
||||
case config2.LogsDataType:
|
||||
le, err := factory.CreateLogsExporter(ctx, creationParams, config)
|
||||
if err != nil {
|
||||
if err == configerror.ErrDataTypeIsNotSupported {
|
||||
// Could not create because this exporter does not support this data type.
|
||||
return nil, exporterTypeMismatchErr(config, requirement.requiredBy, dataType)
|
||||
}
|
||||
return nil, fmt.Errorf("error creating %s exporter: %v", config.Name(), err)
|
||||
}
|
||||
|
||||
// Check if the factory really created the exporter.
|
||||
if le == nil {
|
||||
return nil, fmt.Errorf("factory for %q produced a nil exporter", config.Name())
|
||||
}
|
||||
|
||||
exporter.expByDataType[config2.LogsDataType] = le
|
||||
case config.LogsDataType:
|
||||
createdExporter, err = factory.CreateLogsExporter(ctx, creationParams, cfg)
|
||||
|
||||
default:
|
||||
// Could not create because this exporter does not support this data type.
|
||||
return nil, exporterTypeMismatchErr(config, requirement.requiredBy, dataType)
|
||||
return nil, exporterTypeMismatchErr(cfg, requirement.requiredBy, dataType)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
if err == configerror.ErrDataTypeIsNotSupported {
|
||||
// Could not create because this exporter does not support this data type.
|
||||
return nil, exporterTypeMismatchErr(cfg, requirement.requiredBy, dataType)
|
||||
}
|
||||
return nil, fmt.Errorf("error creating %s exporter: %v", cfg.Name(), err)
|
||||
}
|
||||
|
||||
// Check if the factory really created the exporter.
|
||||
if createdExporter == nil {
|
||||
return nil, fmt.Errorf("factory for %q produced a nil exporter", cfg.Name())
|
||||
}
|
||||
|
||||
exporter.expByDataType[dataType] = createdExporter
|
||||
}
|
||||
|
||||
eb.logger.Info("Exporter was built.", zap.String("exporter", config.Name()))
|
||||
eb.logger.Info("Exporter was built.", zap.String("exporter", cfg.Name()))
|
||||
|
||||
return exporter, nil
|
||||
}
|
||||
|
||||
func exporterTypeMismatchErr(
|
||||
config config2.Exporter,
|
||||
requiredByPipeline *config2.Pipeline,
|
||||
dataType config2.DataType,
|
||||
config config.Exporter,
|
||||
requiredByPipeline *config.Pipeline,
|
||||
dataType config.DataType,
|
||||
) error {
|
||||
return fmt.Errorf("pipeline %q of data type %q has an exporter %q, which does not support that data type",
|
||||
requiredByPipeline.Name, dataType,
|
||||
|
|
|
|||
|
|
@ -21,7 +21,7 @@ import (
|
|||
"go.uber.org/zap"
|
||||
|
||||
"go.opentelemetry.io/collector/component"
|
||||
config2 "go.opentelemetry.io/collector/config"
|
||||
"go.opentelemetry.io/collector/config"
|
||||
"go.opentelemetry.io/collector/consumer"
|
||||
"go.opentelemetry.io/collector/consumer/consumererror"
|
||||
"go.opentelemetry.io/collector/consumer/fanoutconsumer"
|
||||
|
|
@ -44,7 +44,7 @@ type builtPipeline struct {
|
|||
}
|
||||
|
||||
// BuiltPipelines is a map of build pipelines created from pipeline configs.
|
||||
type BuiltPipelines map[*config2.Pipeline]*builtPipeline
|
||||
type BuiltPipelines map[*config.Pipeline]*builtPipeline
|
||||
|
||||
func (bps BuiltPipelines) StartProcessors(ctx context.Context, host component.Host) error {
|
||||
for _, bp := range bps {
|
||||
|
|
@ -83,9 +83,9 @@ func (bps BuiltPipelines) ShutdownProcessors(ctx context.Context) error {
|
|||
type pipelinesBuilder struct {
|
||||
logger *zap.Logger
|
||||
appInfo component.ApplicationStartInfo
|
||||
config *config2.Config
|
||||
config *config.Config
|
||||
exporters Exporters
|
||||
factories map[config2.Type]component.ProcessorFactory
|
||||
factories map[config.Type]component.ProcessorFactory
|
||||
}
|
||||
|
||||
// BuildPipelines builds pipeline processors from config. Requires exporters to be already
|
||||
|
|
@ -93,9 +93,9 @@ type pipelinesBuilder struct {
|
|||
func BuildPipelines(
|
||||
logger *zap.Logger,
|
||||
appInfo component.ApplicationStartInfo,
|
||||
config *config2.Config,
|
||||
config *config.Config,
|
||||
exporters Exporters,
|
||||
factories map[config2.Type]component.ProcessorFactory,
|
||||
factories map[config.Type]component.ProcessorFactory,
|
||||
) (BuiltPipelines, error) {
|
||||
pb := &pipelinesBuilder{logger, appInfo, config, exporters, factories}
|
||||
|
||||
|
|
@ -114,7 +114,7 @@ func BuildPipelines(
|
|||
// Builds a pipeline of processors. Returns the first processor in the pipeline.
|
||||
// The last processor in the pipeline will be plugged to fan out the data into exporters
|
||||
// that are configured for this pipeline.
|
||||
func (pb *pipelinesBuilder) buildPipeline(ctx context.Context, pipelineCfg *config2.Pipeline) (*builtPipeline, error) {
|
||||
func (pb *pipelinesBuilder) buildPipeline(ctx context.Context, pipelineCfg *config.Pipeline) (*builtPipeline, error) {
|
||||
|
||||
// BuildProcessors the pipeline backwards.
|
||||
|
||||
|
|
@ -124,11 +124,11 @@ func (pb *pipelinesBuilder) buildPipeline(ctx context.Context, pipelineCfg *conf
|
|||
var lc consumer.Logs
|
||||
|
||||
switch pipelineCfg.InputType {
|
||||
case config2.TracesDataType:
|
||||
case config.TracesDataType:
|
||||
tc = pb.buildFanoutExportersTraceConsumer(pipelineCfg.Exporters)
|
||||
case config2.MetricsDataType:
|
||||
case config.MetricsDataType:
|
||||
mc = pb.buildFanoutExportersMetricsConsumer(pipelineCfg.Exporters)
|
||||
case config2.LogsDataType:
|
||||
case config.LogsDataType:
|
||||
lc = pb.buildFanoutExportersLogConsumer(pipelineCfg.Exporters)
|
||||
}
|
||||
|
||||
|
|
@ -157,7 +157,7 @@ func (pb *pipelinesBuilder) buildPipeline(ctx context.Context, pipelineCfg *conf
|
|||
}
|
||||
|
||||
switch pipelineCfg.InputType {
|
||||
case config2.TracesDataType:
|
||||
case config.TracesDataType:
|
||||
var proc component.TracesProcessor
|
||||
proc, err = factory.CreateTracesProcessor(ctx, creationParams, procCfg, tc)
|
||||
if proc != nil {
|
||||
|
|
@ -165,7 +165,7 @@ func (pb *pipelinesBuilder) buildPipeline(ctx context.Context, pipelineCfg *conf
|
|||
}
|
||||
processors[i] = proc
|
||||
tc = proc
|
||||
case config2.MetricsDataType:
|
||||
case config.MetricsDataType:
|
||||
var proc component.MetricsProcessor
|
||||
proc, err = factory.CreateMetricsProcessor(ctx, creationParams, procCfg, mc)
|
||||
if proc != nil {
|
||||
|
|
@ -174,7 +174,7 @@ func (pb *pipelinesBuilder) buildPipeline(ctx context.Context, pipelineCfg *conf
|
|||
processors[i] = proc
|
||||
mc = proc
|
||||
|
||||
case config2.LogsDataType:
|
||||
case config.LogsDataType:
|
||||
var proc component.LogsProcessor
|
||||
proc, err = factory.CreateLogsProcessor(ctx, creationParams, procCfg, lc)
|
||||
if proc != nil {
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@ import (
|
|||
"go.uber.org/zap"
|
||||
|
||||
"go.opentelemetry.io/collector/component"
|
||||
config2 "go.opentelemetry.io/collector/config"
|
||||
"go.opentelemetry.io/collector/config"
|
||||
"go.opentelemetry.io/collector/config/configerror"
|
||||
"go.opentelemetry.io/collector/consumer"
|
||||
"go.opentelemetry.io/collector/consumer/consumererror"
|
||||
|
|
@ -49,7 +49,7 @@ func (rcv *builtReceiver) Shutdown(ctx context.Context) error {
|
|||
}
|
||||
|
||||
// Receivers is a map of receivers created from receiver configs.
|
||||
type Receivers map[config2.Receiver]*builtReceiver
|
||||
type Receivers map[config.Receiver]*builtReceiver
|
||||
|
||||
// StopAll stops all receivers.
|
||||
func (rcvs Receivers) ShutdownAll(ctx context.Context) error {
|
||||
|
|
@ -81,18 +81,18 @@ func (rcvs Receivers) StartAll(ctx context.Context, host component.Host) error {
|
|||
type receiversBuilder struct {
|
||||
logger *zap.Logger
|
||||
appInfo component.ApplicationStartInfo
|
||||
config *config2.Config
|
||||
config *config.Config
|
||||
builtPipelines BuiltPipelines
|
||||
factories map[config2.Type]component.ReceiverFactory
|
||||
factories map[config.Type]component.ReceiverFactory
|
||||
}
|
||||
|
||||
// BuildReceivers builds Receivers from config.
|
||||
func BuildReceivers(
|
||||
logger *zap.Logger,
|
||||
appInfo component.ApplicationStartInfo,
|
||||
config *config2.Config,
|
||||
config *config.Config,
|
||||
builtPipelines BuiltPipelines,
|
||||
factories map[config2.Type]component.ReceiverFactory,
|
||||
factories map[config.Type]component.ReceiverFactory,
|
||||
) (Receivers, error) {
|
||||
rb := &receiversBuilder{logger.With(zap.String(kindLogKey, kindLogsReceiver)), appInfo, config, builtPipelines, factories}
|
||||
|
||||
|
|
@ -114,7 +114,7 @@ func BuildReceivers(
|
|||
}
|
||||
|
||||
// hasReceiver returns true if the pipeline is attached to specified receiver.
|
||||
func hasReceiver(pipeline *config2.Pipeline, receiverName string) bool {
|
||||
func hasReceiver(pipeline *config.Pipeline, receiverName string) bool {
|
||||
for _, name := range pipeline.Receivers {
|
||||
if name == receiverName {
|
||||
return true
|
||||
|
|
@ -123,16 +123,16 @@ func hasReceiver(pipeline *config2.Pipeline, receiverName string) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
type attachedPipelines map[config2.DataType][]*builtPipeline
|
||||
type attachedPipelines map[config.DataType][]*builtPipeline
|
||||
|
||||
func (rb *receiversBuilder) findPipelinesToAttach(config config2.Receiver) (attachedPipelines, error) {
|
||||
func (rb *receiversBuilder) findPipelinesToAttach(cfg config.Receiver) (attachedPipelines, error) {
|
||||
// A receiver may be attached to multiple pipelines. Pipelines may consume different
|
||||
// data types. We need to compile the list of pipelines of each type that must be
|
||||
// attached to this receiver according to configuration.
|
||||
|
||||
pipelinesToAttach := make(attachedPipelines)
|
||||
pipelinesToAttach[config2.TracesDataType] = make([]*builtPipeline, 0)
|
||||
pipelinesToAttach[config2.MetricsDataType] = make([]*builtPipeline, 0)
|
||||
pipelinesToAttach[config.TracesDataType] = make([]*builtPipeline, 0)
|
||||
pipelinesToAttach[config.MetricsDataType] = make([]*builtPipeline, 0)
|
||||
|
||||
// Iterate over all pipelines.
|
||||
for _, pipelineCfg := range rb.config.Service.Pipelines {
|
||||
|
|
@ -144,7 +144,7 @@ func (rb *receiversBuilder) findPipelinesToAttach(config config2.Receiver) (atta
|
|||
}
|
||||
|
||||
// Is this receiver attached to the pipeline?
|
||||
if hasReceiver(pipelineCfg, config.Name()) {
|
||||
if hasReceiver(pipelineCfg, cfg.Name()) {
|
||||
if _, exists := pipelinesToAttach[pipelineCfg.InputType]; !exists {
|
||||
pipelinesToAttach[pipelineCfg.InputType] = make([]*builtPipeline, 0)
|
||||
}
|
||||
|
|
@ -163,8 +163,8 @@ func (rb *receiversBuilder) attachReceiverToPipelines(
|
|||
logger *zap.Logger,
|
||||
appInfo component.ApplicationStartInfo,
|
||||
factory component.ReceiverFactory,
|
||||
dataType config2.DataType,
|
||||
config config2.Receiver,
|
||||
dataType config.DataType,
|
||||
cfg config.Receiver,
|
||||
rcv *builtReceiver,
|
||||
builtPipelines []*builtPipeline,
|
||||
) error {
|
||||
|
|
@ -179,17 +179,17 @@ func (rb *receiversBuilder) attachReceiverToPipelines(
|
|||
}
|
||||
|
||||
switch dataType {
|
||||
case config2.TracesDataType:
|
||||
case config.TracesDataType:
|
||||
junction := buildFanoutTraceConsumer(builtPipelines)
|
||||
createdReceiver, err = factory.CreateTracesReceiver(ctx, creationParams, config, junction)
|
||||
createdReceiver, err = factory.CreateTracesReceiver(ctx, creationParams, cfg, junction)
|
||||
|
||||
case config2.MetricsDataType:
|
||||
case config.MetricsDataType:
|
||||
junction := buildFanoutMetricConsumer(builtPipelines)
|
||||
createdReceiver, err = factory.CreateMetricsReceiver(ctx, creationParams, config, junction)
|
||||
createdReceiver, err = factory.CreateMetricsReceiver(ctx, creationParams, cfg, junction)
|
||||
|
||||
case config2.LogsDataType:
|
||||
case config.LogsDataType:
|
||||
junction := buildFanoutLogConsumer(builtPipelines)
|
||||
createdReceiver, err = factory.CreateLogsReceiver(ctx, creationParams, config, junction)
|
||||
createdReceiver, err = factory.CreateLogsReceiver(ctx, creationParams, cfg, junction)
|
||||
|
||||
default:
|
||||
err = configerror.ErrDataTypeIsNotSupported
|
||||
|
|
@ -200,16 +200,16 @@ func (rb *receiversBuilder) attachReceiverToPipelines(
|
|||
return fmt.Errorf(
|
||||
"receiver %s does not support %s but it was used in a "+
|
||||
"%s pipeline",
|
||||
config.Name(),
|
||||
cfg.Name(),
|
||||
dataType,
|
||||
dataType)
|
||||
}
|
||||
return fmt.Errorf("cannot create receiver %s: %s", config.Name(), err.Error())
|
||||
return fmt.Errorf("cannot create receiver %s: %s", cfg.Name(), err.Error())
|
||||
}
|
||||
|
||||
// Check if the factory really created the receiver.
|
||||
if createdReceiver == nil {
|
||||
return fmt.Errorf("factory for %q produced a nil receiver", config.Name())
|
||||
return fmt.Errorf("factory for %q produced a nil receiver", cfg.Name())
|
||||
}
|
||||
|
||||
if rcv.receiver != nil {
|
||||
|
|
@ -221,7 +221,7 @@ func (rb *receiversBuilder) attachReceiverToPipelines(
|
|||
"factory for %q is implemented incorrectly: "+
|
||||
"CreateTracesReceiver and CreateMetricsReceiver must return the same "+
|
||||
"receiver pointer when creating receivers of different data types",
|
||||
config.Name(),
|
||||
cfg.Name(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
|
@ -232,7 +232,7 @@ func (rb *receiversBuilder) attachReceiverToPipelines(
|
|||
return nil
|
||||
}
|
||||
|
||||
func (rb *receiversBuilder) buildReceiver(ctx context.Context, logger *zap.Logger, appInfo component.ApplicationStartInfo, config config2.Receiver) (*builtReceiver, error) {
|
||||
func (rb *receiversBuilder) buildReceiver(ctx context.Context, logger *zap.Logger, appInfo component.ApplicationStartInfo, config config.Receiver) (*builtReceiver, error) {
|
||||
|
||||
// First find pipelines that must be attached to this receiver.
|
||||
pipelinesToAttach, err := rb.findPipelinesToAttach(config)
|
||||
|
|
|
|||
|
|
@ -21,12 +21,12 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/prometheus/common/model"
|
||||
"github.com/prometheus/prometheus/config"
|
||||
promconfig "github.com/prometheus/prometheus/config"
|
||||
"github.com/prometheus/prometheus/discovery"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"go.opentelemetry.io/collector/component"
|
||||
config2 "go.opentelemetry.io/collector/config"
|
||||
"go.opentelemetry.io/collector/config"
|
||||
"go.opentelemetry.io/collector/config/configgrpc"
|
||||
"go.opentelemetry.io/collector/config/confignet"
|
||||
"go.opentelemetry.io/collector/consumer"
|
||||
|
|
@ -67,16 +67,16 @@ func (mb *DataReceiverBase) ReportFatalError(err error) {
|
|||
}
|
||||
|
||||
// GetFactory of the specified kind. Returns the factory for a component type.
|
||||
func (mb *DataReceiverBase) GetFactory(_ component.Kind, _ config2.Type) component.Factory {
|
||||
func (mb *DataReceiverBase) GetFactory(_ component.Kind, _ config.Type) component.Factory {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Return map of extensions. Only enabled and created extensions will be returned.
|
||||
func (mb *DataReceiverBase) GetExtensions() map[config2.NamedEntity]component.Extension {
|
||||
func (mb *DataReceiverBase) GetExtensions() map[config.NamedEntity]component.Extension {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mb *DataReceiverBase) GetExporters() map[config2.DataType]map[config2.NamedEntity]component.Exporter {
|
||||
func (mb *DataReceiverBase) GetExporters() map[config.DataType]map[config.NamedEntity]component.Exporter {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -349,8 +349,8 @@ func (dr *PrometheusDataReceiver) Start(_ consumer.Traces, mc consumer.Metrics,
|
|||
factory := prometheusreceiver.NewFactory()
|
||||
cfg := factory.CreateDefaultConfig().(*prometheusreceiver.Config)
|
||||
addr := fmt.Sprintf("0.0.0.0:%d", dr.Port)
|
||||
cfg.PrometheusConfig = &config.Config{
|
||||
ScrapeConfigs: []*config.ScrapeConfig{{
|
||||
cfg.PrometheusConfig = &promconfig.Config{
|
||||
ScrapeConfigs: []*promconfig.ScrapeConfig{{
|
||||
JobName: "testbed-job",
|
||||
ScrapeInterval: model.Duration(100 * time.Millisecond),
|
||||
ScrapeTimeout: model.Duration(time.Second),
|
||||
|
|
|
|||
Loading…
Reference in New Issue