Add extensions and service to configuration (#299)

* Add extensions and service to configuration

Adds the extensions and service to the configuration model so "extension components" can be configured in the standard way of data pipelines.

* Rename extension.Component to extension.ServiceExtension
This commit is contained in:
Paulo Janotti 2019-08-27 17:41:27 -07:00 committed by GitHub
parent 0e5f808a6d
commit 3c3718be3f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
39 changed files with 678 additions and 250 deletions

View File

@ -27,6 +27,7 @@ import (
"github.com/open-telemetry/opentelemetry-service/config/configmodels"
"github.com/open-telemetry/opentelemetry-service/exporter"
"github.com/open-telemetry/opentelemetry-service/extension"
"github.com/open-telemetry/opentelemetry-service/processor"
"github.com/open-telemetry/opentelemetry-service/receiver"
)
@ -38,10 +39,12 @@ type configErrorCode int
const (
_ configErrorCode = iota // skip 0, start errors codes from 1.
errInvalidTypeAndNameKey
errUnknownExtensionType
errUnknownReceiverType
errUnknownExporterType
errUnknownProcessorType
errInvalidPipelineType
errDuplicateExtensionName
errDuplicateReceiverName
errDuplicateExporterName
errDuplicateProcessorName
@ -50,6 +53,7 @@ const (
errPipelineMustHaveReceiver
errPipelineMustHaveExporter
errPipelineMustHaveProcessors
errExtensionNotExists
errPipelineReceiverNotExists
errPipelineProcessorNotExists
errPipelineExporterNotExists
@ -70,6 +74,12 @@ func (e *configError) Error() string {
// YAML top-level configuration keys
const (
// extensionsKeyName is the configuration key name for extensions section.
extensionsKeyName = "extensions"
// serviceKeyName is the configuration key name for service section.
serviceKeyName = "service"
// receiversKeyName is the configuration key name for receivers section.
receiversKeyName = "receivers"
@ -86,12 +96,26 @@ const (
// typeAndNameSeparator is the separator that is used between type and name in type/name composite keys.
const typeAndNameSeparator = "/"
// Factories struct holds in a single type all component factories that
// can be handled by the Config.
type Factories struct {
// Receivers maps receiver type names in the config to the respective factory.
Receivers map[string]receiver.Factory
// Processors maps processor type names in the config to the respective factory.
Processors map[string]processor.Factory
// Exporters maps exporter type names in the config to the respective factory.
Exporters map[string]exporter.Factory
// Extensions maps extension type names in the config to the respective factory.
Extensions map[string]extension.Factory
}
// Load loads a Config from Viper.
func Load(
v *viper.Viper,
receiverFactories map[string]receiver.Factory,
processorFactories map[string]processor.Factory,
exporterFactories map[string]exporter.Factory,
factories Factories,
logger *zap.Logger,
) (*configmodels.Config, error) {
@ -99,19 +123,35 @@ func Load(
// Load the config.
receivers, err := loadReceivers(v, receiverFactories)
// Start with extensions and service.
extensions, err := loadExtensions(v, factories.Extensions)
if err != nil {
return nil, err
}
config.Extensions = extensions
service, err := loadService(v)
if err != nil {
return nil, err
}
config.Service = service
// Load data components (receivers, exporters, processores, and pipelines).
receivers, err := loadReceivers(v, factories.Receivers)
if err != nil {
return nil, err
}
config.Receivers = receivers
exporters, err := loadExporters(v, exporterFactories)
exporters, err := loadExporters(v, factories.Exporters)
if err != nil {
return nil, err
}
config.Exporters = exporters
processors, err := loadProcessors(v, processorFactories)
processors, err := loadProcessors(v, factories.Processors)
if err != nil {
return nil, err
}
@ -170,6 +210,75 @@ func decodeTypeAndName(key string) (typeStr, fullName string, err error) {
return
}
func loadExtensions(v *viper.Viper, factories map[string]extension.Factory) (configmodels.Extensions, error) {
// Get the list of all "extensions" sub vipers from config source.
subViper := v.Sub(extensionsKeyName)
// Get the map of "extensions" sub-keys.
keyMap := v.GetStringMap(extensionsKeyName)
// Prepare resulting map.
extensions := make(configmodels.Extensions)
// Iterate over extensions and create a config for each.
for key := range keyMap {
// Decode the key into type and fullName components.
typeStr, fullName, err := decodeTypeAndName(key)
if err != nil || typeStr == "" {
return nil, &configError{
code: errInvalidTypeAndNameKey,
msg: fmt.Sprintf("invalid key %q: %s", key, err.Error()),
}
}
// Find extension factory based on "type" that we read from config source.
factory := factories[typeStr]
if factory == nil {
return nil, &configError{
code: errUnknownExtensionType,
msg: fmt.Sprintf("unknown extension type %q", typeStr),
}
}
// Create the default config for this extension
extensionCfg := factory.CreateDefaultConfig()
extensionCfg.SetType(typeStr)
extensionCfg.SetName(fullName)
// Now that the default config struct is created we can Unmarshal into it
// and it will apply user-defined config on top of the default.
if err := subViper.UnmarshalKey(key, extensionCfg); err != nil {
return nil, &configError{
code: errUnmarshalError,
msg: fmt.Sprintf("error reading settings for extension type %q: %v", typeStr, err),
}
}
if extensions[fullName] != nil {
return nil, &configError{
code: errDuplicateExtensionName,
msg: fmt.Sprintf("duplicate extension name %q", fullName),
}
}
extensions[fullName] = extensionCfg
}
return extensions, nil
}
func loadService(v *viper.Viper) (configmodels.Service, error) {
var service configmodels.Service
if err := v.UnmarshalKey(serviceKeyName, &service); err != nil {
return service, &configError{
code: errUnmarshalError,
msg: fmt.Sprintf("error reading settings for %q: %v", serviceKeyName, err),
}
}
return service, nil
}
func loadReceivers(v *viper.Viper, factories map[string]receiver.Factory) (configmodels.Receivers, error) {
// Get the list of all "receivers" sub vipers from config source.
subViper := v.Sub(receiversKeyName)
@ -436,6 +545,10 @@ func validateConfig(cfg *configmodels.Config, logger *zap.Logger) error {
// invalid cases that we currently don't check for but which we may want to add in
// the future (e.g. disallowing receiving and exporting on the same endpoint).
if err := validateService(cfg, logger); err != nil {
return err
}
if err := validatePipelines(cfg, logger); err != nil {
return err
}
@ -451,6 +564,49 @@ func validateConfig(cfg *configmodels.Config, logger *zap.Logger) error {
return nil
}
func validateService(cfg *configmodels.Config, logger *zap.Logger) error {
// Currently only to validate extensions.
return validateServiceExtensions(cfg, &cfg.Service, logger)
}
func validateServiceExtensions(
cfg *configmodels.Config,
service *configmodels.Service,
logger *zap.Logger,
) error {
if len(cfg.Service.Extensions) < 1 {
return nil
}
// Validate extensions.
for _, ref := range service.Extensions {
// Check that the name referenced in the service extensions exists in the top-level extensions
if cfg.Extensions[ref] == nil {
return &configError{
code: errExtensionNotExists,
msg: fmt.Sprintf("service references extension %q which does not exists", ref),
}
}
}
// Remove disabled extensions.
extensions := service.Extensions[:0]
for _, ref := range service.Extensions {
ext := cfg.Extensions[ref]
if ext.IsEnabled() {
// The extension is enabled. Keep it in the pipeline.
extensions = append(extensions, ref)
} else {
logger.Info("service references a disabled extension. Ignoring the extension.",
zap.String("extension", ref))
}
}
service.Extensions = extensions
return nil
}
func validatePipelines(cfg *configmodels.Config, logger *zap.Logger) error {
// Must have at least one pipeline.
if len(cfg.Pipelines) < 1 {

View File

@ -24,17 +24,25 @@ import (
)
func TestDecodeConfig(t *testing.T) {
receivers, processors, exporters, err := ExampleComponents()
factories, err := ExampleComponents()
assert.Nil(t, err)
// Load the config
config, err := LoadConfigFile(
t, path.Join(".", "testdata", "valid-config.yaml"), receivers, processors, exporters,
)
config, err := LoadConfigFile(t, path.Join(".", "testdata", "valid-config.yaml"), factories)
if err != nil {
t.Fatalf("unable to load config, %v", err)
}
// Verify extensions.
assert.Equal(t, 3, len(config.Extensions))
assert.False(t, config.Extensions["exampleextension/disabled"].IsEnabled())
assert.Equal(t, "some string", config.Extensions["exampleextension/1"].(*ExampleExtension).ExtraSetting)
// Verify service.
assert.Equal(t, 2, len(config.Service.Extensions))
assert.Equal(t, "exampleextension/0", config.Service.Extensions[0])
assert.Equal(t, "exampleextension/1", config.Service.Extensions[1])
// Verify receivers
assert.Equal(t, 2, len(config.Receivers), "Incorrect receivers count")
@ -117,13 +125,11 @@ func TestDecodeConfig(t *testing.T) {
}
func TestDecodeConfig_MultiProto(t *testing.T) {
receivers, processors, exporters, err := ExampleComponents()
factories, err := ExampleComponents()
assert.Nil(t, err)
// Load the config
config, err := LoadConfigFile(
t, path.Join(".", "testdata", "multiproto-config.yaml"), receivers, processors, exporters,
)
config, err := LoadConfigFile(t, path.Join(".", "testdata", "multiproto-config.yaml"), factories)
if err != nil {
t.Fatalf("unable to load config, %v", err)
}
@ -181,8 +187,10 @@ func TestDecodeConfig_Invalid(t *testing.T) {
{name: "missing-exporters", expected: errMissingExporters},
{name: "missing-receivers", expected: errMissingReceivers},
{name: "missing-processors"},
{name: "invalid-extension-name", expected: errExtensionNotExists},
{name: "invalid-receiver-name"},
{name: "invalid-receiver-reference", expected: errPipelineReceiverNotExists},
{name: "missing-extension-type", expected: errInvalidTypeAndNameKey},
{name: "missing-receiver-type", expected: errInvalidTypeAndNameKey},
{name: "missing-exporter-name-after-slash", expected: errInvalidTypeAndNameKey},
{name: "missing-processor-type", expected: errInvalidTypeAndNameKey},
@ -194,28 +202,30 @@ func TestDecodeConfig_Invalid(t *testing.T) {
{name: "pipeline-processor-not-exists", expected: errPipelineProcessorNotExists},
{name: "pipeline-must-have-processors", expected: errPipelineMustHaveProcessors},
{name: "metric-pipeline-cannot-have-processors", expected: errMetricPipelineCannotHaveProcessors},
{name: "unknown-extension-type", expected: errUnknownExtensionType},
{name: "unknown-receiver-type", expected: errUnknownReceiverType},
{name: "unknown-exporter-type", expected: errUnknownExporterType},
{name: "unknown-processor-type", expected: errUnknownProcessorType},
{name: "invalid-extension-disabled-value", expected: errUnmarshalError},
{name: "invalid-service-extensions-value", expected: errUnmarshalError},
{name: "invalid-bool-value", expected: errUnmarshalError},
{name: "invalid-sequence-value", expected: errUnmarshalError},
{name: "invalid-disabled-bool-value", expected: errUnmarshalError},
{name: "invalid-disabled-bool-value2", expected: errUnmarshalError},
{name: "invalid-pipeline-type", expected: errInvalidPipelineType},
{name: "invalid-pipeline-type-and-name", expected: errInvalidTypeAndNameKey},
{name: "duplicate-extension", expected: errDuplicateExtensionName},
{name: "duplicate-receiver", expected: errDuplicateReceiverName},
{name: "duplicate-exporter", expected: errDuplicateExporterName},
{name: "duplicate-processor", expected: errDuplicateProcessorName},
{name: "duplicate-pipeline", expected: errDuplicatePipelineName},
}
receivers, processors, exporters, err := ExampleComponents()
factories, err := ExampleComponents()
assert.Nil(t, err)
for _, test := range testCases {
_, err := LoadConfigFile(
t, path.Join(".", "testdata", test.name+".yaml"), receivers, processors, exporters,
)
_, err := LoadConfigFile(t, path.Join(".", "testdata", test.name+".yaml"), factories)
if err == nil {
t.Errorf("expected error but succeeded on invalid config case: %s", test.name)
} else if test.expected != 0 {

View File

@ -31,12 +31,14 @@ corresponding interface and if they have additional settings they must also exte
the corresponding common settings struct (the easiest approach is to embed the common struct).
*/
// Config defines the configuration V2 for the various elements of collector or agent.
// Config defines the configuration for the various elements of collector or agent.
type Config struct {
Receivers Receivers
Exporters Exporters
Processors Processors
Pipelines Pipelines
Extensions Extensions
Service Service
}
// NamedEntity is a configuration entity that has a name.
@ -125,6 +127,25 @@ type Pipeline struct {
// Pipelines is a map of names to Pipelines.
type Pipelines map[string]*Pipeline
// Extension is the configuration of a service extension. Specific extensions
// must implement this interface and will typically embed ExtensionSettings
// struct or a struct that extends it.
type Extension interface {
NamedEntity
IsEnabled() bool
Type() string
SetType(typeStr string)
}
// Extensions is a map of names to extensions.
type Extensions map[string]Extension
// Service defines the configurable components of the service.
type Service struct {
// Extensions is the ordered list of extensions configured for the service.
Extensions []string `mapstructure:"extensions"`
}
// Below are common setting structs for Receivers, Exporters and Processors.
// These are helper structs which you can embed when implementing your specific
// receiver/exporter/processor config storage.
@ -239,3 +260,38 @@ func (proc *ProcessorSettings) IsEnabled() bool {
}
var _ Processor = (*ProcessorSettings)(nil)
// ExtensionSettings defines common settings for a service extension configuration.
// Specific extensions can embed this struct and extend it with more fields if needed.
type ExtensionSettings struct {
TypeVal string `mapstructure:"-"`
NameVal string `mapstructure:"-"`
Disabled bool `mapstructure:"disabled"`
}
// Name gets the extension name.
func (ext *ExtensionSettings) Name() string {
return ext.NameVal
}
// SetName sets the extension name.
func (ext *ExtensionSettings) SetName(name string) {
ext.NameVal = name
}
// Type sets the extension type.
func (ext *ExtensionSettings) Type() string {
return ext.TypeVal
}
// SetType sets the extension type.
func (ext *ExtensionSettings) SetType(typeStr string) {
ext.TypeVal = typeStr
}
// IsEnabled returns true if the entity is enabled.
func (ext *ExtensionSettings) IsEnabled() bool {
return !ext.Disabled
}
var _ Extension = (*ExtensionSettings)(nil)

View File

@ -16,6 +16,7 @@ package config
import (
"context"
"fmt"
"go.uber.org/zap"
@ -24,6 +25,7 @@ import (
"github.com/open-telemetry/opentelemetry-service/consumer"
"github.com/open-telemetry/opentelemetry-service/consumer/consumerdata"
"github.com/open-telemetry/opentelemetry-service/exporter"
"github.com/open-telemetry/opentelemetry-service/extension"
"github.com/open-telemetry/opentelemetry-service/processor"
"github.com/open-telemetry/opentelemetry-service/receiver"
)
@ -343,14 +345,50 @@ func (f *ExampleProcessorFactory) CreateMetricsProcessor(
return nil, configerror.ErrDataTypeIsNotSupported
}
// ExampleExtension is for testing purposes. We are defining an example config and factory
// for "exampleextension" extension type.
type ExampleExtension struct {
configmodels.ExtensionSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct
ExtraSetting string `mapstructure:"extra"`
}
// ExampleExtensionFactory is factory for ExampleExtension.
type ExampleExtensionFactory struct {
}
// Type gets the type of the Extension config created by this factory.
func (f *ExampleExtensionFactory) Type() string {
return "exampleextension"
}
// CreateDefaultConfig creates the default configuration for the Extension.
func (f *ExampleExtensionFactory) CreateDefaultConfig() configmodels.Extension {
return &ExampleExtension{
ExtensionSettings: configmodels.ExtensionSettings{},
ExtraSetting: "extra string setting",
}
}
// CreateExtension creates an Extension based on this config.
func (f *ExampleExtensionFactory) CreateExtension(
logger *zap.Logger,
cfg configmodels.Extension,
) (extension.ServiceExtension, error) {
return nil, fmt.Errorf("cannot create %q extension type", f.Type())
}
var _ (extension.Factory) = (*ExampleExtensionFactory)(nil)
// ExampleComponents registers example factories. This is only used by tests.
func ExampleComponents() (
receivers map[string]receiver.Factory,
processors map[string]processor.Factory,
exporters map[string]exporter.Factory,
factories Factories,
err error,
) {
receivers, err = receiver.Build(
if factories.Extensions, err = extension.Build(&ExampleExtensionFactory{}); err != nil {
return
}
factories.Receivers, err = receiver.Build(
&ExampleReceiverFactory{},
&MultiProtoReceiverFactory{},
)
@ -358,11 +396,12 @@ func ExampleComponents() (
return
}
exporters, err = exporter.Build(&ExampleExporterFactory{})
factories.Exporters, err = exporter.Build(&ExampleExporterFactory{})
if err != nil {
return
}
processors, err = processor.Build(&ExampleProcessorFactory{})
factories.Processors, err = processor.Build(&ExampleProcessorFactory{})
return
}

View File

@ -22,19 +22,10 @@ import (
"go.uber.org/zap"
"github.com/open-telemetry/opentelemetry-service/config/configmodels"
"github.com/open-telemetry/opentelemetry-service/exporter"
"github.com/open-telemetry/opentelemetry-service/processor"
"github.com/open-telemetry/opentelemetry-service/receiver"
)
// LoadConfigFile loads a config from file.
func LoadConfigFile(
t *testing.T,
fileName string,
receivers map[string]receiver.Factory,
processors map[string]processor.Factory,
exporters map[string]exporter.Factory,
) (*configmodels.Config, error) {
func LoadConfigFile(t *testing.T, fileName string, factories Factories) (*configmodels.Config, error) {
// Open the file for reading.
file, err := os.Open(fileName)
if err != nil {
@ -51,6 +42,6 @@ func LoadConfigFile(
return nil, err
}
// Load the config from viper
return Load(v, receivers, processors, exporters, zap.NewNop())
// Load the config from viper using the given factories.
return Load(v, factories, zap.NewNop())
}

View File

@ -0,0 +1,3 @@
extensions:
exampleextension/ext:
exampleextension/ ext:

View File

@ -0,0 +1,3 @@
extensions:
exampleextension:
disabled: "not a bool value"

View File

@ -0,0 +1,16 @@
extensions:
exampleextension:
service:
extensions: [exampleextension, nosuchextension, and, another, three]
receivers:
examplereceiver:
processors:
exampleprocessor:
exporters:
exampleexporter:
pipelines:
traces:
receivers: [examplereceiver]
processors: [exampleprocessor]
exporters: [exampleexporter]

View File

@ -0,0 +1,18 @@
extensions:
exampleextension:
service:
extensions:
exampleextension:
disabled: true
receivers:
examplereceiver:
processors:
exampleprocessor:
exporters:
exampleexporter:
pipelines:
traces:
receivers: [examplereceiver]
processors: [exampleprocessor]
exporters: [exampleexporter]

View File

@ -0,0 +1,2 @@
extensions:
/exampleextension:

View File

@ -0,0 +1,2 @@
extensions:
nosuchextension:

View File

@ -23,3 +23,13 @@ pipelines:
receivers: [examplereceiver, examplereceiver/disabled]
processors: [exampleprocessor, exampleprocessor/disabled]
exporters: [exampleexporter/disabled, exampleexporter]
extensions:
exampleextension/0:
exampleextension/disabled:
disabled: true
exampleextension/1:
extra: "some string"
service:
extensions: [exampleextension/0, exampleextension/disabled, exampleextension/1]

View File

@ -1,7 +1,7 @@
# OpenTelemetry Service: Extension Components
# OpenTelemetry Service: Extensions
Besides the pipeline elements (receivers, processors, and exporters) the OTelSvc
uses various extension components (e.g.: healthcheck, z-pages, etc).
uses various service extensions (e.g.: healthcheck, z-pages, etc).
This document describes the “extensions” design and how they are implemented.
## Configuration and Interface
@ -10,8 +10,8 @@ The configuration follows the same pattern used for pipelines: a base
configuration type and the creation of factories to instantiate the extension
objects.
In order to support generic extension components an interface is defined
so the service can interact uniformly with these. At minimum extension components
In order to support generic service extensions an interface is defined
so the service can interact uniformly with these. At minimum service extensions
need to implement the interface that covers Start and Shutdown.
In addition to this base interface there is support to notify extensions when
@ -39,8 +39,8 @@ and how it will interact with the service extensions.
## Configuration
The config package will be extended to load the extension components when the
configuration is loaded. The settings for extension components will live in the
The config package will be extended to load the service extensions when the
configuration is loaded. The settings for service extensions will live in the
same configuration file as the pipeline elements. Below is an example of how
these sections would look like in the configuration file:
@ -61,7 +61,7 @@ extensions:
# The service lists extensions not directly related to data pipelines, but used
# by the service.
service:
# extensions lists the components added to the service. They are started
# extensions lists the extensions added to the service. They are started
# in the order presented below and stopped in the reverse order.
extensions: [health-check, pprof, zpages]
```
@ -79,67 +79,62 @@ The factory follows the same pattern established for pipeline configuration:
```go
// Factory is a factory interface for extensions to the service.
type Factory interface {
// Type gets the type of the extension component created by this factory.
// Type gets the type of the extension created by this factory.
Type() string
// CreateDefaultConfig creates the default configuration for the extension.
CreateDefaultConfig() configmodels.Extension
// CustomUnmarshaler returns a custom unmarshaler for the configuration or nil if
// there is no need for custom unmarshaling. This is typically used if viper.Unmarshal()
// is not sufficient to unmarshal correctly.
CustomUnmarshaler(v *viper.Viper, viperKey string, intoCfg interface{}) CustomUnmarshaler
// CreateExtension creates a service extension based on the given config.
CreateExtension(logger *zap.Logger, cfg configmodels.Extension) (Component, error)
CreateExtension(logger *zap.Logger, cfg configmodels.Extension) (ServiceExtension, error)
}
```
## Extension Interface
The interface defined below is the minimum required to keep same behavior for
ad-hoc components currently in use on the service:
The interface defined below is the minimum required for
extensions in use on the service:
```go
// Component is the interface for objects hosted by the OpenTelemetry Service that
// doesn't participate directly on data pipelines but provide some functionality
// ServiceExtension is the interface for objects hosted by the OpenTelemetry Service that
// don't participate directly on data pipelines but provide some functionality
// to the service, examples: health check endpoint, z-pages, etc.
type Component interface {
// Start the Component object hosted by the given host. At this point in the
// process life-cycle the receivers are not started and the host did not
// receive any data yet.
Start(host Host) error
type ServiceExtension interface {
// Start the ServiceExtension object hosted by the given host. At this point in the
// process life-cycle the receivers are not started and the host did not
// receive any data yet.
Start(host Host) error
// Shutdown the Component instance. This happens after the pipelines were
// shutdown.
Shutdown() error
// Shutdown the ServiceExtension instance. This happens after the pipelines were
// shutdown.
Shutdown() error
}
// PipelineWatcher is an extra interface for Components hosted by the OpenTelemetry
// Service that is to be implemented by Components interested in changes to pipeline
// states. Typically this will be used by Components that change their behavior if data is
// PipelineWatcher is an extra interface for ServiceExtension hosted by the OpenTelemetry
// Service that is to be implemented by extensions interested in changes to pipeline
// states. Typically this will be used by extensions that change their behavior if data is
// being ingested or not, e.g.: a k8s readiness probe.
type PipelineWatcher interface {
// Ready notifies the Component that all pipelines were built and the
// receivers were started, i.e.: the service is ready to receive data
// (notice that it may already have received data when this method is called).
Ready() error
// Ready notifies the ServiceExtension that all pipelines were built and the
// receivers were started, i.e.: the service is ready to receive data
// (notice that it may already have received data when this method is called).
Ready() error
// NotReady notifies the Component that all receivers are about to be stopped,
// i.e.: pipeline receivers will not accept new data.
// This is sent before receivers are stopped, so the Component can take any
// appropriate action before that happens.
NotReady() error
// NotReady notifies the ServiceExtension that all receivers are about to be stopped,
// i.e.: pipeline receivers will not accept new data.
// This is sent before receivers are stopped, so the ServiceExtension can take any
// appropriate action before that happens.
NotReady() error
}
// Host represents the entity where the extension component is being hosted.
// Host represents the entity where the extension is being hosted.
// It is used to allow communication between the extension and its host.
type Host interface {
// ReportFatalError is used to report to the host that the extension
// encountered a fatal error (i.e.: an error that the instance can't recover
// from) after its start function had already returned.
ReportFatalError(err error)
// ReportFatalError is used to report to the host that the extension
// encountered a fatal error (i.e.: an error that the instance can't recover
// from) after its start function had already returned.
ReportFatalError(err error)
}
```

View File

@ -26,14 +26,12 @@ import (
)
func TestLoadConfig(t *testing.T) {
receivers, processors, exporters, err := config.ExampleComponents()
factories, err := config.ExampleComponents()
assert.Nil(t, err)
factory := &Factory{}
exporters[typeStr] = factory
cfg, err := config.LoadConfigFile(
t, path.Join(".", "testdata", "config.yaml"), receivers, processors, exporters,
)
factories.Exporters[typeStr] = factory
cfg, err := config.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)
require.NoError(t, err)
require.NotNil(t, cfg)

View File

@ -28,14 +28,12 @@ import (
)
func TestLoadConfig(t *testing.T) {
receivers, processors, exporters, err := config.ExampleComponents()
factories, err := config.ExampleComponents()
assert.Nil(t, err)
factory := &Factory{}
exporters[typeStr] = factory
cfg, err := config.LoadConfigFile(
t, path.Join(".", "testdata", "config.yaml"), receivers, processors, exporters,
)
factories.Exporters[typeStr] = factory
cfg, err := config.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)
require.NoError(t, err)
require.NotNil(t, cfg)

View File

@ -26,14 +26,12 @@ import (
)
func TestLoadConfig(t *testing.T) {
receivers, processors, exporters, err := config.ExampleComponents()
factories, err := config.ExampleComponents()
assert.Nil(t, err)
factory := &Factory{}
exporters[typeStr] = factory
cfg, err := config.LoadConfigFile(
t, path.Join(".", "testdata", "config.yaml"), receivers, processors, exporters,
)
factories.Exporters[typeStr] = factory
cfg, err := config.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)
require.NoError(t, err)
require.NotNil(t, cfg)

View File

@ -26,14 +26,12 @@ import (
)
func TestLoadConfig(t *testing.T) {
receivers, processors, exporters, err := config.ExampleComponents()
factories, err := config.ExampleComponents()
assert.Nil(t, err)
factory := &Factory{}
exporters[typeStr] = factory
cfg, err := config.LoadConfigFile(
t, path.Join(".", "testdata", "config.yaml"), receivers, processors, exporters,
)
factories.Exporters[typeStr] = factory
cfg, err := config.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)
require.NoError(t, err)
require.NotNil(t, cfg)

View File

@ -26,14 +26,12 @@ import (
)
func TestLoadConfig(t *testing.T) {
receivers, processors, exporters, err := config.ExampleComponents()
factories, err := config.ExampleComponents()
assert.Nil(t, err)
factory := &Factory{}
exporters[typeStr] = factory
cfg, err := config.LoadConfigFile(
t, path.Join(".", "testdata", "config.yaml"), receivers, processors, exporters,
)
factories.Exporters[typeStr] = factory
cfg, err := config.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)
require.NoError(t, err)
require.NotNil(t, cfg)

View File

@ -26,14 +26,12 @@ import (
)
func TestLoadConfig(t *testing.T) {
receivers, processors, exporters, err := config.ExampleComponents()
factories, err := config.ExampleComponents()
assert.Nil(t, err)
factory := &Factory{}
exporters[typeStr] = factory
cfg, err := config.LoadConfigFile(
t, path.Join(".", "testdata", "config.yaml"), receivers, processors, exporters,
)
factories.Exporters[typeStr] = factory
cfg, err := config.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)
require.NoError(t, err)
require.NotNil(t, cfg)

58
extension/extension.go Normal file
View File

@ -0,0 +1,58 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package extension defines service extensions that can be added to the OpenTelemetry
// service but that not interact if the data pipelines, but provide some functionality
// to the service, examples: health check endpoint, z-pages, etc.
package extension
// Host represents the entity where the extension is being hosted.
// It is used to allow communication between the extension and its host.
type Host interface {
// ReportFatalError is used to report to the host that the extension
// encountered a fatal error (i.e.: an error that the instance can't recover
// from) after its start function had already returned.
ReportFatalError(err error)
}
// ServiceExtension is the interface for objects hosted by the OpenTelemetry Service that
// don't participate directly on data pipelines but provide some functionality
// to the service, examples: health check endpoint, z-pages, etc.
type ServiceExtension interface {
// Start the ServiceExtension object hosted by the given host. At this point in the
// process life-cycle the receivers are not started and the host did not
// receive any data yet.
Start(host Host) error
// Shutdown the ServiceExtension instance. This happens after the pipelines were
// shutdown.
Shutdown() error
}
// PipelineWatcher is an extra interface for ServiceExtension hosted by the OpenTelemetry
// Service that is to be implemented by extensions interested in changes to pipeline
// states. Typically this will be used by extensions that change their behavior if data is
// being ingested or not, e.g.: a k8s readiness probe.
type PipelineWatcher interface {
// Ready notifies the ServiceExtension that all pipelines were built and the
// receivers were started, i.e.: the service is ready to receive data
// (notice that it may already have received data when this method is called).
Ready() error
// NotReady notifies the ServiceExtension that all receivers are about to be stopped,
// i.e.: pipeline receivers will not accept new data.
// This is sent before receivers are stopped, so the ServiceExtension can take any
// appropriate action before that happens.
NotReady() error
}

49
extension/factory.go Normal file
View File

@ -0,0 +1,49 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package extension
import (
"fmt"
"go.uber.org/zap"
"github.com/open-telemetry/opentelemetry-service/config/configmodels"
)
// Factory is a factory interface for extensions to the service.
type Factory interface {
// Type gets the type of the extension created by this factory.
Type() string
// CreateDefaultConfig creates the default configuration for the extension.
CreateDefaultConfig() configmodels.Extension
// CreateExtension creates a service extension based on the given config.
CreateExtension(logger *zap.Logger, cfg configmodels.Extension) (ServiceExtension, error)
}
// Build takes a list of extension factories and returns a map of type map[string]Factory
// with factory type as keys. It returns a non-nil error when more than one factories
// have the same type.
func Build(factories ...Factory) (map[string]Factory, error) {
fMap := map[string]Factory{}
for _, f := range factories {
if _, ok := fMap[f.Type()]; ok {
return fMap, fmt.Errorf("duplicate extension factory %q", f.Type())
}
fMap[f.Type()] = f
}
return fMap, nil
}

86
extension/factory_test.go Normal file
View File

@ -0,0 +1,86 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package extension
import (
"errors"
"testing"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
"github.com/open-telemetry/opentelemetry-service/config/configmodels"
)
type TestFactory struct {
name string
}
// Type gets the type of the extension config created by this factory.
func (f *TestFactory) Type() string {
return f.name
}
// CreateDefaultConfig creates the default configuration for the extension.
func (f *TestFactory) CreateDefaultConfig() configmodels.Extension {
return nil
}
// CreateTraceProcessor creates a trace processor based on this config.
func (f *TestFactory) CreateExtension(
logger *zap.Logger,
cfg configmodels.Extension,
) (ServiceExtension, error) {
return nil, errors.New("cannot create extension from TestFactory")
}
func TestFactoriesBuilder(t *testing.T) {
type testCase struct {
in []Factory
out map[string]Factory
err bool
}
testCases := []testCase{
{
in: []Factory{
&TestFactory{"ext1"},
&TestFactory{"ext2"},
},
out: map[string]Factory{
"ext1": &TestFactory{"ext1"},
"ext2": &TestFactory{"ext2"},
},
err: false,
},
{
in: []Factory{
&TestFactory{"ext1"},
&TestFactory{"ext1"},
},
err: true,
},
}
for _, c := range testCases {
out, err := Build(c.in...)
if c.err {
assert.NotNil(t, err)
continue
}
assert.Nil(t, err)
assert.Equal(t, c.out, out)
}
}

View File

@ -26,14 +26,12 @@ import (
)
func TestLoadConfig(t *testing.T) {
receivers, processors, exporters, err := config.ExampleComponents()
factories, err := config.ExampleComponents()
assert.Nil(t, err)
factory := &Factory{}
processors[typeStr] = factory
cfg, err := config.LoadConfigFile(
t, path.Join(".", "testdata", "config.yaml"), receivers, processors, exporters,
)
factories.Processors[typeStr] = factory
cfg, err := config.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)
require.Nil(t, err)
require.NotNil(t, cfg)

View File

@ -27,18 +27,13 @@ import (
)
func TestLoadConfig(t *testing.T) {
receivers, _, exporters, err := config.ExampleComponents()
factories, err := config.ExampleComponents()
require.NoError(t, err)
processors, err := processor.Build(&Factory{})
require.NotNil(t, processors)
factories.Processors, err = processor.Build(&Factory{})
require.NotNil(t, factories.Processors)
require.NoError(t, err)
config, err := config.LoadConfigFile(
t,
path.Join(".", "testdata", "config.yaml"),
receivers,
processors,
exporters)
config, err := config.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)
require.Nil(t, err)
require.NotNil(t, config)
@ -76,18 +71,13 @@ func TestLoadConfig(t *testing.T) {
}
func TestLoadConfigEmpty(t *testing.T) {
receivers, _, exporters, err := config.ExampleComponents()
factories, err := config.ExampleComponents()
require.NoError(t, err)
processors, err := processor.Build(&Factory{})
require.NotNil(t, processors)
factories.Processors, err = processor.Build(&Factory{})
require.NotNil(t, factories.Processors)
require.NoError(t, err)
config, err := config.LoadConfigFile(
t,
path.Join(".", "testdata", "empty.yaml"),
receivers,
processors,
exporters)
config, err := config.LoadConfigFile(t, path.Join(".", "testdata", "empty.yaml"), factories)
require.Nil(t, err)
require.NotNil(t, config)

View File

@ -25,17 +25,12 @@ import (
)
func TestLoadingConifg(t *testing.T) {
receivers, processors, exporters, err := config.ExampleComponents()
factories, err := config.ExampleComponents()
assert.Nil(t, err)
factory := &Factory{}
processors[typeStr] = factory
config, err := config.LoadConfigFile(
t,
path.Join(".", "testdata", "config.yaml"),
receivers,
processors,
exporters)
factories.Processors[typeStr] = factory
config, err := config.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)
assert.Nil(t, err)
assert.NotNil(t, config)

View File

@ -27,14 +27,12 @@ import (
)
func TestLoadConfig(t *testing.T) {
receivers, processors, exporters, err := config.ExampleComponents()
factories, err := config.ExampleComponents()
assert.Nil(t, err)
factory := &Factory{}
processors[typeStr] = &Factory{}
cfg, err := config.LoadConfigFile(
t, path.Join(".", "testdata", "config.yaml"), receivers, processors, exporters,
)
factories.Processors[typeStr] = &Factory{}
cfg, err := config.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)
require.Nil(t, err)
require.NotNil(t, cfg)

View File

@ -27,14 +27,12 @@ import (
)
func TestLoadConfig(t *testing.T) {
receivers, processors, exporters, err := config.ExampleComponents()
factories, err := config.ExampleComponents()
assert.Nil(t, err)
factory := &Factory{}
processors[typeStr] = factory
cfg, err := config.LoadConfigFile(
t, path.Join(".", "testdata", "config.yaml"), receivers, processors, exporters,
)
factories.Processors[typeStr] = factory
cfg, err := config.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)
require.Nil(t, err)
require.NotNil(t, cfg)
@ -53,18 +51,13 @@ func TestLoadConfig(t *testing.T) {
}
func TestLoadConfigEmpty(t *testing.T) {
receivers, _, exporters, err := config.ExampleComponents()
factories, err := config.ExampleComponents()
require.NoError(t, err)
processors, err := processor.Build(&Factory{})
require.NotNil(t, processors)
factories.Processors, err = processor.Build(&Factory{})
require.NotNil(t, factories.Processors)
require.NoError(t, err)
config, err := config.LoadConfigFile(
t,
path.Join(".", "testdata", "empty.yaml"),
receivers,
processors,
exporters)
config, err := config.LoadConfigFile(t, path.Join(".", "testdata", "empty.yaml"), factories)
require.Nil(t, err)
require.NotNil(t, config)

View File

@ -27,14 +27,12 @@ import (
)
func TestLoadConfig(t *testing.T) {
receivers, processors, exporters, err := config.ExampleComponents()
factories, err := config.ExampleComponents()
assert.Nil(t, err)
factory := &Factory{}
processors[typeStr] = &Factory{}
cfg, err := config.LoadConfigFile(
t, path.Join(".", "testdata", "config.yaml"), receivers, processors, exporters,
)
factories.Processors[typeStr] = &Factory{}
cfg, err := config.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)
require.Nil(t, err)
require.NotNil(t, cfg)

View File

@ -25,18 +25,13 @@ import (
)
func TestLoadConfig(t *testing.T) {
receivers, processors, exporters, err := config.ExampleComponents()
factories, err := config.ExampleComponents()
assert.NoError(t, err)
factory := &Factory{}
processors[typeStr] = factory
factories.Processors[typeStr] = factory
config, err := config.LoadConfigFile(
t,
path.Join(".", "testdata", "config.yaml"),
receivers,
processors,
exporters)
config, err := config.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)
assert.Nil(t, err)
assert.NotNil(t, config)

View File

@ -27,15 +27,13 @@ import (
)
func TestLoadConfig(t *testing.T) {
receivers, processors, exporters, err := config.ExampleComponents()
factories, err := config.ExampleComponents()
assert.Nil(t, err)
factory := &Factory{}
processors[factory.Type()] = factory
factories.Processors[factory.Type()] = factory
cfg, err := config.LoadConfigFile(
t, path.Join(".", "testdata", "tail_sampling_config.yaml"), receivers, processors, exporters,
)
cfg, err := config.LoadConfigFile(t, path.Join(".", "testdata", "tail_sampling_config.yaml"), factories)
require.Nil(t, err)
require.NotNil(t, cfg)

View File

@ -26,14 +26,12 @@ import (
)
func TestLoadConfig(t *testing.T) {
receivers, processors, exporters, err := config.ExampleComponents()
factories, err := config.ExampleComponents()
assert.Nil(t, err)
factory := &Factory{}
receivers[typeStr] = factory
cfg, err := config.LoadConfigFile(
t, path.Join(".", "testdata", "config.yaml"), receivers, processors, exporters,
)
factories.Receivers[typeStr] = factory
cfg, err := config.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)
require.NoError(t, err)
require.NotNil(t, cfg)

View File

@ -27,14 +27,12 @@ import (
)
func TestLoadConfig(t *testing.T) {
receivers, processors, exporters, err := config.ExampleComponents()
factories, err := config.ExampleComponents()
assert.Nil(t, err)
factory := &Factory{}
receivers[typeStr] = factory
cfg, err := config.LoadConfigFile(
t, path.Join(".", "testdata", "config.yaml"), receivers, processors, exporters,
)
factories.Receivers[typeStr] = factory
cfg, err := config.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)
require.NoError(t, err)
require.NotNil(t, cfg)

View File

@ -27,14 +27,12 @@ import (
)
func TestLoadConfig(t *testing.T) {
receivers, processors, exporters, err := config.ExampleComponents()
factories, err := config.ExampleComponents()
assert.Nil(t, err)
factory := &Factory{}
receivers[typeStr] = factory
cfg, err := config.LoadConfigFile(
t, path.Join(".", "testdata", "config.yaml"), receivers, processors, exporters,
)
factories.Receivers[typeStr] = factory
cfg, err := config.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)
require.NoError(t, err)
require.NotNil(t, cfg)

View File

@ -27,14 +27,12 @@ import (
)
func TestLoadConfig(t *testing.T) {
receivers, processors, exporters, err := config.ExampleComponents()
factories, err := config.ExampleComponents()
assert.Nil(t, err)
factory := &Factory{}
receivers[typeStr] = factory
cfg, err := config.LoadConfigFile(
t, path.Join(".", "testdata", "config.yaml"), receivers, processors, exporters,
)
factories.Receivers[typeStr] = factory
cfg, err := config.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)
require.NoError(t, err)
require.NotNil(t, cfg)

View File

@ -26,14 +26,12 @@ import (
)
func TestLoadConfig(t *testing.T) {
receivers, processors, exporters, err := config.ExampleComponents()
factories, err := config.ExampleComponents()
assert.Nil(t, err)
factory := &Factory{}
receivers[typeStr] = factory
cfg, err := config.LoadConfigFile(
t, path.Join(".", "testdata", "config.yaml"), receivers, processors, exporters,
)
factories.Receivers[typeStr] = factory
cfg, err := config.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)
require.NoError(t, err)
require.NotNil(t, cfg)

View File

@ -27,11 +27,11 @@ import (
)
func TestExportersBuilder_Build(t *testing.T) {
_, _, exporterFactories, err := config.ExampleComponents()
factories, err := config.ExampleComponents()
assert.Nil(t, err)
oceFactory := &opencensusexporter.Factory{}
exporterFactories[oceFactory.Type()] = oceFactory
factories.Exporters[oceFactory.Type()] = oceFactory
cfg := &configmodels.Config{
Exporters: map[string]configmodels.Exporter{
"opencensus": &opencensusexporter.Config{
@ -52,7 +52,7 @@ func TestExportersBuilder_Build(t *testing.T) {
},
}
exporters, err := NewExportersBuilder(zap.NewNop(), cfg, exporterFactories).Build()
exporters, err := NewExportersBuilder(zap.NewNop(), cfg, factories.Exporters).Build()
assert.NoError(t, err)
require.NotNil(t, exporters)
@ -78,7 +78,7 @@ func TestExportersBuilder_Build(t *testing.T) {
// This should result in creating an exporter that has none of consumption
// functions set.
delete(cfg.Pipelines, "trace")
exporters, err = NewExportersBuilder(zap.NewNop(), cfg, exporterFactories).Build()
exporters, err = NewExportersBuilder(zap.NewNop(), cfg, factories.Exporters).Build()
assert.NotNil(t, exporters)
assert.Nil(t, err)

View File

@ -57,20 +57,18 @@ func TestPipelinesBuilder_Build(t *testing.T) {
}
func testPipeline(t *testing.T, pipelineName string, exporterNames []string) {
receiverFactories, processorsFactories, exporterFactories, err := config.ExampleComponents()
factories, err := config.ExampleComponents()
assert.Nil(t, err)
attrFactory := &addattributesprocessor.Factory{}
processorsFactories[attrFactory.Type()] = attrFactory
cfg, err := config.LoadConfigFile(
t, "testdata/pipelines_builder.yaml", receiverFactories, processorsFactories, exporterFactories,
)
factories.Processors[attrFactory.Type()] = attrFactory
cfg, err := config.LoadConfigFile(t, "testdata/pipelines_builder.yaml", factories)
// Load the config
require.Nil(t, err)
// Build the pipeline
allExporters, err := NewExportersBuilder(zap.NewNop(), cfg, exporterFactories).Build()
allExporters, err := NewExportersBuilder(zap.NewNop(), cfg, factories.Exporters).Build()
assert.NoError(t, err)
pipelineProcessors, err := NewPipelinesBuilder(zap.NewNop(), cfg, allExporters, processorsFactories).Build()
pipelineProcessors, err := NewPipelinesBuilder(zap.NewNop(), cfg, allExporters, factories.Processors).Build()
assert.NoError(t, err)
require.NotNil(t, pipelineProcessors)
@ -125,13 +123,11 @@ func testPipeline(t *testing.T, pipelineName string, exporterNames []string) {
}
func TestPipelinesBuilder_Error(t *testing.T) {
receiverFactories, processorsFactories, exporterFactories, err := config.ExampleComponents()
factories, err := config.ExampleComponents()
assert.Nil(t, err)
attrFactory := &addattributesprocessor.Factory{}
processorsFactories[attrFactory.Type()] = attrFactory
cfg, err := config.LoadConfigFile(
t, "testdata/pipelines_builder.yaml", receiverFactories, processorsFactories, exporterFactories,
)
factories.Processors[attrFactory.Type()] = attrFactory
cfg, err := config.LoadConfigFile(t, "testdata/pipelines_builder.yaml", factories)
require.Nil(t, err)
// Corrupt the pipeline, change data type to metrics. We have to forcedly do it here
@ -140,12 +136,12 @@ func TestPipelinesBuilder_Error(t *testing.T) {
pipeline := cfg.Pipelines["traces"]
pipeline.InputType = configmodels.MetricsDataType
exporters, err := NewExportersBuilder(zap.NewNop(), cfg, exporterFactories).Build()
exporters, err := NewExportersBuilder(zap.NewNop(), cfg, factories.Exporters).Build()
assert.NoError(t, err)
// This should fail because "attributes" processor defined in the config does
// not support metrics data type.
_, err = NewPipelinesBuilder(zap.NewNop(), cfg, exporters, processorsFactories).Build()
_, err = NewPipelinesBuilder(zap.NewNop(), cfg, exporters, factories.Processors).Build()
assert.NotNil(t, err)
}

View File

@ -91,22 +91,20 @@ func testReceivers(
t *testing.T,
test testCase,
) {
receiverFactories, processorsFactories, exporterFactories, err := config.ExampleComponents()
factories, err := config.ExampleComponents()
assert.Nil(t, err)
attrFactory := &addattributesprocessor.Factory{}
processorsFactories[attrFactory.Type()] = attrFactory
cfg, err := config.LoadConfigFile(
t, "testdata/pipelines_builder.yaml", receiverFactories, processorsFactories, exporterFactories,
)
factories.Processors[attrFactory.Type()] = attrFactory
cfg, err := config.LoadConfigFile(t, "testdata/pipelines_builder.yaml", factories)
require.Nil(t, err)
// Build the pipeline
allExporters, err := NewExportersBuilder(zap.NewNop(), cfg, exporterFactories).Build()
allExporters, err := NewExportersBuilder(zap.NewNop(), cfg, factories.Exporters).Build()
assert.NoError(t, err)
pipelineProcessors, err := NewPipelinesBuilder(zap.NewNop(), cfg, allExporters, processorsFactories).Build()
pipelineProcessors, err := NewPipelinesBuilder(zap.NewNop(), cfg, allExporters, factories.Processors).Build()
assert.NoError(t, err)
receivers, err := NewReceiversBuilder(zap.NewNop(), cfg, pipelineProcessors, receiverFactories).Build()
receivers, err := NewReceiversBuilder(zap.NewNop(), cfg, pipelineProcessors, factories.Receivers).Build()
assert.NoError(t, err)
require.NotNil(t, receivers)
@ -206,14 +204,12 @@ func testReceivers(
}
func TestReceiversBuilder_DataTypeError(t *testing.T) {
receiverFactories, processorsFactories, exporterFactories, err := config.ExampleComponents()
factories, err := config.ExampleComponents()
assert.Nil(t, err)
attrFactory := &addattributesprocessor.Factory{}
processorsFactories[attrFactory.Type()] = attrFactory
cfg, err := config.LoadConfigFile(
t, "testdata/pipelines_builder.yaml", receiverFactories, processorsFactories, exporterFactories,
)
factories.Processors[attrFactory.Type()] = attrFactory
cfg, err := config.LoadConfigFile(t, "testdata/pipelines_builder.yaml", factories)
assert.Nil(t, err)
// Make examplereceiver to "unsupport" trace data type.
@ -221,11 +217,11 @@ func TestReceiversBuilder_DataTypeError(t *testing.T) {
receiver.(*config.ExampleReceiver).FailTraceCreation = true
// Build the pipeline
allExporters, err := NewExportersBuilder(zap.NewNop(), cfg, exporterFactories).Build()
allExporters, err := NewExportersBuilder(zap.NewNop(), cfg, factories.Exporters).Build()
assert.NoError(t, err)
pipelineProcessors, err := NewPipelinesBuilder(zap.NewNop(), cfg, allExporters, processorsFactories).Build()
pipelineProcessors, err := NewPipelinesBuilder(zap.NewNop(), cfg, allExporters, factories.Processors).Build()
assert.NoError(t, err)
receivers, err := NewReceiversBuilder(zap.NewNop(), cfg, pipelineProcessors, receiverFactories).Build()
receivers, err := NewReceiversBuilder(zap.NewNop(), cfg, pipelineProcessors, factories.Receivers).Build()
// This should fail because "examplereceiver" is attached to "traces" pipeline
// which is a configuration error.

View File

@ -47,10 +47,7 @@ type Application struct {
exporters builder.Exporters
builtReceivers builder.Receivers
// factories
receiverFactories map[string]receiver.Factory
exporterFactories map[string]exporter.Factory
processorFactories map[string]processor.Factory
factories config.Factories
// stopTestChan is used to terminate the application in end to end tests.
stopTestChan chan struct{}
@ -89,11 +86,13 @@ func New(
exporterFactories map[string]exporter.Factory,
) *Application {
return &Application{
v: viper.New(),
readyChan: make(chan struct{}),
receiverFactories: receiverFactories,
processorFactories: processorFactories,
exporterFactories: exporterFactories,
v: viper.New(),
readyChan: make(chan struct{}),
factories: config.Factories{
Receivers: receiverFactories,
Processors: processorFactories,
Exporters: exporterFactories,
},
}
}
@ -193,7 +192,7 @@ func (app *Application) setupPipelines() {
app.logger.Info("Loading configuration...")
// Load configuration.
cfg, err := config.Load(app.v, app.receiverFactories, app.processorFactories, app.exporterFactories, app.logger)
cfg, err := config.Load(app.v, app.factories, app.logger)
if err != nil {
log.Fatalf("Cannot load configuration: %v", err)
}
@ -204,20 +203,20 @@ func (app *Application) setupPipelines() {
// which are referenced before objects which reference them.
// First create exporters.
app.exporters, err = builder.NewExportersBuilder(app.logger, cfg, app.exporterFactories).Build()
app.exporters, err = builder.NewExportersBuilder(app.logger, cfg, app.factories.Exporters).Build()
if err != nil {
log.Fatalf("Cannot load configuration: %v", err)
}
// Create pipelines and their processors and plug exporters to the
// end of the pipelines.
pipelines, err := builder.NewPipelinesBuilder(app.logger, cfg, app.exporters, app.processorFactories).Build()
pipelines, err := builder.NewPipelinesBuilder(app.logger, cfg, app.exporters, app.factories.Processors).Build()
if err != nil {
log.Fatalf("Cannot load configuration: %v", err)
}
// Create receivers and plug them into the start of the pipelines.
app.builtReceivers, err = builder.NewReceiversBuilder(app.logger, cfg, pipelines, app.receiverFactories).Build()
app.builtReceivers, err = builder.NewReceiversBuilder(app.logger, cfg, pipelines, app.factories.Receivers).Build()
if err != nil {
log.Fatalf("Cannot load configuration: %v", err)
}