Remove already deprecates config.Config (#6394)

* Remove already deprecates config.Config

Signed-off-by: Bogdan <bogdandrutu@gmail.com>

* Update service/internal/configunmarshaler/defaultunmarshaler.go

Co-authored-by: Alex Boten <alex@boten.ca>

Signed-off-by: Bogdan <bogdandrutu@gmail.com>
Co-authored-by: Alex Boten <alex@boten.ca>
This commit is contained in:
Bogdan Drutu 2022-10-26 15:01:35 -07:00 committed by GitHub
parent 99fdd1f563
commit 3262fd8f8b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 195 additions and 210 deletions

11
.chloggen/rmconfigconfig.yaml Executable file
View File

@ -0,0 +1,11 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: config
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Remove already deprecates `config.Config`.
# One or more tracking issues or pull requests related to the change
issues: [6394]

View File

@ -15,145 +15,9 @@
package config // import "go.opentelemetry.io/collector/config"
import (
"errors"
"fmt"
"go.opentelemetry.io/collector/service/telemetry"
)
var (
errMissingExporters = errors.New("no enabled exporters specified in config")
errMissingReceivers = errors.New("no enabled receivers specified in config")
errMissingServicePipelines = errors.New("service must have at least one pipeline")
)
// Config defines the configuration for the various elements of collector or agent.
// Deprecated: [v0.52.0] Use service.Config
type Config struct {
// Receivers is a map of ComponentID to Receivers.
Receivers map[ComponentID]Receiver
// Exporters is a map of ComponentID to Exporters.
Exporters map[ComponentID]Exporter
// Processors is a map of ComponentID to Processors.
Processors map[ComponentID]Processor
// Extensions is a map of ComponentID to extensions.
Extensions map[ComponentID]Extension
Service
}
var _ validatable = (*Config)(nil)
// Validate returns an error if the config is invalid.
//
// This function performs basic validation of configuration. There may be more subtle
// invalid cases that we currently don't check for but which we may want to add in
// the future (e.g. disallowing receiving and exporting on the same endpoint).
func (cfg *Config) Validate() error {
// Currently, there is no default receiver enabled.
// The configuration must specify at least one receiver to be valid.
if len(cfg.Receivers) == 0 {
return errMissingReceivers
}
// Validate the receiver configuration.
for recvID, recvCfg := range cfg.Receivers {
if err := recvCfg.Validate(); err != nil {
return fmt.Errorf("receiver %q has invalid configuration: %w", recvID, err)
}
}
// Currently, there is no default exporter enabled.
// The configuration must specify at least one exporter to be valid.
if len(cfg.Exporters) == 0 {
return errMissingExporters
}
// Validate the exporter configuration.
for expID, expCfg := range cfg.Exporters {
if err := expCfg.Validate(); err != nil {
return fmt.Errorf("exporter %q has invalid configuration: %w", expID, err)
}
}
// Validate the processor configuration.
for procID, procCfg := range cfg.Processors {
if err := procCfg.Validate(); err != nil {
return fmt.Errorf("processor %q has invalid configuration: %w", procID, err)
}
}
// Validate the extension configuration.
for extID, extCfg := range cfg.Extensions {
if err := extCfg.Validate(); err != nil {
return fmt.Errorf("extension %q has invalid configuration: %w", extID, err)
}
}
return cfg.validateService()
}
func (cfg *Config) validateService() error {
// Check that all enabled extensions in the service are configured.
for _, ref := range cfg.Service.Extensions {
// Check that the name referenced in the Service extensions exists in the top-level extensions.
if cfg.Extensions[ref] == nil {
return fmt.Errorf("service references extension %q which does not exist", ref)
}
}
// Must have at least one pipeline.
if len(cfg.Service.Pipelines) == 0 {
return errMissingServicePipelines
}
// Check that all pipelines have at least one receiver and one exporter, and they reference
// only configured components.
for pipelineID, pipeline := range cfg.Service.Pipelines {
if pipelineID.Type() != TracesDataType && pipelineID.Type() != MetricsDataType && pipelineID.Type() != LogsDataType {
return fmt.Errorf("unknown pipeline datatype %q for %v", pipelineID.Type(), pipelineID)
}
// Validate pipeline has at least one receiver.
if len(pipeline.Receivers) == 0 {
return fmt.Errorf("pipeline %q must have at least one receiver", pipelineID)
}
// Validate pipeline receiver name references.
for _, ref := range pipeline.Receivers {
// Check that the name referenced in the pipeline's receivers exists in the top-level receivers.
if cfg.Receivers[ref] == nil {
return fmt.Errorf("pipeline %q references receiver %q which does not exist", pipelineID, ref)
}
}
// Validate pipeline processor name references.
for _, ref := range pipeline.Processors {
// Check that the name referenced in the pipeline's processors exists in the top-level processors.
if cfg.Processors[ref] == nil {
return fmt.Errorf("pipeline %q references processor %q which does not exist", pipelineID, ref)
}
}
// Validate pipeline has at least one exporter.
if len(pipeline.Exporters) == 0 {
return fmt.Errorf("pipeline %q must have at least one exporter", pipelineID)
}
// Validate pipeline exporter name references.
for _, ref := range pipeline.Exporters {
// Check that the name referenced in the pipeline's Exporters exists in the top-level Exporters.
if cfg.Exporters[ref] == nil {
return fmt.Errorf("pipeline %q references exporter %q which does not exist", pipelineID, ref)
}
}
}
return nil
}
// Service defines the configurable components of the service.
// Deprecated: [v0.52.0] Use service.ConfigService
type Service struct {

View File

@ -15,10 +15,141 @@
package service // import "go.opentelemetry.io/collector/service"
import (
"errors"
"fmt"
"go.opentelemetry.io/collector/config"
)
type Config = config.Config
var (
errMissingExporters = errors.New("no enabled exporters specified in config")
errMissingReceivers = errors.New("no enabled receivers specified in config")
errMissingServicePipelines = errors.New("service must have at least one pipeline")
)
// Config defines the configuration for the various elements of collector or agent.
type Config struct {
// Receivers is a map of ComponentID to Receivers.
Receivers map[config.ComponentID]config.Receiver
// Exporters is a map of ComponentID to Exporters.
Exporters map[config.ComponentID]config.Exporter
// Processors is a map of ComponentID to Processors.
Processors map[config.ComponentID]config.Processor
// Extensions is a map of ComponentID to extensions.
Extensions map[config.ComponentID]config.Extension
Service ConfigService
}
// Validate returns an error if the config is invalid.
//
// This function performs basic validation of configuration. There may be more subtle
// invalid cases that we currently don't check for but which we may want to add in
// the future (e.g. disallowing receiving and exporting on the same endpoint).
func (cfg *Config) Validate() error {
// Currently, there is no default receiver enabled.
// The configuration must specify at least one receiver to be valid.
if len(cfg.Receivers) == 0 {
return errMissingReceivers
}
// Validate the receiver configuration.
for recvID, recvCfg := range cfg.Receivers {
if err := recvCfg.Validate(); err != nil {
return fmt.Errorf("receiver %q has invalid configuration: %w", recvID, err)
}
}
// Currently, there is no default exporter enabled.
// The configuration must specify at least one exporter to be valid.
if len(cfg.Exporters) == 0 {
return errMissingExporters
}
// Validate the exporter configuration.
for expID, expCfg := range cfg.Exporters {
if err := expCfg.Validate(); err != nil {
return fmt.Errorf("exporter %q has invalid configuration: %w", expID, err)
}
}
// Validate the processor configuration.
for procID, procCfg := range cfg.Processors {
if err := procCfg.Validate(); err != nil {
return fmt.Errorf("processor %q has invalid configuration: %w", procID, err)
}
}
// Validate the extension configuration.
for extID, extCfg := range cfg.Extensions {
if err := extCfg.Validate(); err != nil {
return fmt.Errorf("extension %q has invalid configuration: %w", extID, err)
}
}
return cfg.validateService()
}
func (cfg *Config) validateService() error {
// Check that all enabled extensions in the service are configured.
for _, ref := range cfg.Service.Extensions {
// Check that the name referenced in the Service extensions exists in the top-level extensions.
if cfg.Extensions[ref] == nil {
return fmt.Errorf("service references extension %q which does not exist", ref)
}
}
// Must have at least one pipeline.
if len(cfg.Service.Pipelines) == 0 {
return errMissingServicePipelines
}
// Check that all pipelines have at least one receiver and one exporter, and they reference
// only configured components.
for pipelineID, pipeline := range cfg.Service.Pipelines {
if pipelineID.Type() != config.TracesDataType && pipelineID.Type() != config.MetricsDataType && pipelineID.Type() != config.LogsDataType {
return fmt.Errorf("unknown pipeline datatype %q for %v", pipelineID.Type(), pipelineID)
}
// Validate pipeline has at least one receiver.
if len(pipeline.Receivers) == 0 {
return fmt.Errorf("pipeline %q must have at least one receiver", pipelineID)
}
// Validate pipeline receiver name references.
for _, ref := range pipeline.Receivers {
// Check that the name referenced in the pipeline's receivers exists in the top-level receivers.
if cfg.Receivers[ref] == nil {
return fmt.Errorf("pipeline %q references receiver %q which does not exist", pipelineID, ref)
}
}
// Validate pipeline processor name references.
for _, ref := range pipeline.Processors {
// Check that the name referenced in the pipeline's processors exists in the top-level processors.
if cfg.Processors[ref] == nil {
return fmt.Errorf("pipeline %q references processor %q which does not exist", pipelineID, ref)
}
}
// Validate pipeline has at least one exporter.
if len(pipeline.Exporters) == 0 {
return fmt.Errorf("pipeline %q must have at least one exporter", pipelineID)
}
// Validate pipeline exporter name references.
for _, ref := range pipeline.Exporters {
// Check that the name referenced in the pipeline's Exporters exists in the top-level Exporters.
if cfg.Exporters[ref] == nil {
return fmt.Errorf("pipeline %q references exporter %q which does not exist", pipelineID, ref)
}
}
}
return nil
}
type ConfigService = config.Service

View File

@ -100,17 +100,23 @@ func NewConfigProvider(set ConfigProviderSettings) (ConfigProvider, error) {
}
func (cm *configProvider) Get(ctx context.Context, factories component.Factories) (*Config, error) {
retMap, err := cm.mapResolver.Resolve(ctx)
conf, err := cm.mapResolver.Resolve(ctx)
if err != nil {
return nil, fmt.Errorf("cannot resolve the configuration: %w", err)
}
var cfg *Config
if cfg, err = configunmarshaler.Unmarshal(retMap, factories); err != nil {
cfg, err := configunmarshaler.Unmarshal(conf, factories)
if err != nil {
return nil, fmt.Errorf("cannot unmarshal the configuration: %w", err)
}
return cfg, nil
return &Config{
Receivers: cfg.Receivers.GetReceivers(),
Processors: cfg.Processors.GetProcessors(),
Exporters: cfg.Exporters.GetExporters(),
Extensions: cfg.Extensions.GetExtensions(),
Service: cfg.Service,
}, nil
}
func (cm *configProvider) Watch() <-chan error {

View File

@ -28,10 +28,6 @@ import (
)
var (
errMissingExporters = errors.New("no enabled exporters specified in config")
errMissingReceivers = errors.New("no enabled receivers specified in config")
errMissingServicePipelines = errors.New("service must have at least one pipeline")
errInvalidRecvConfig = errors.New("invalid receiver config")
errInvalidExpConfig = errors.New("invalid exporter config")
errInvalidProcConfig = errors.New("invalid processor config")

View File

@ -46,7 +46,7 @@ type configError struct {
code configErrorCode
}
type configSettings struct {
type Config struct {
Receivers *Receivers `mapstructure:"receivers"`
Processors *Processors `mapstructure:"processors"`
Exporters *Exporters `mapstructure:"exporters"`
@ -54,11 +54,11 @@ type configSettings struct {
Service config.Service `mapstructure:"service"`
}
// Unmarshal the config.Config from a confmap.Conf.
// Unmarshal the Config from a confmap.Conf.
// After the config is unmarshalled, `Validate()` must be called to validate.
func Unmarshal(v *confmap.Conf, factories component.Factories) (*config.Config, error) {
func Unmarshal(v *confmap.Conf, factories component.Factories) (*Config, error) {
// Unmarshal top level sections and validate.
rawCfg := configSettings{
cfg := &Config{
Receivers: NewReceivers(factories.Receivers),
Processors: NewProcessors(factories.Processors),
Exporters: NewExporters(factories.Exporters),
@ -83,21 +83,13 @@ func Unmarshal(v *confmap.Conf, factories component.Factories) (*config.Config,
},
},
}
if err := v.Unmarshal(&rawCfg, confmap.WithErrorUnused()); err != nil {
if err := v.Unmarshal(&cfg, confmap.WithErrorUnused()); err != nil {
return nil, configError{
error: fmt.Errorf("error reading top level configuration sections: %w", err),
code: errUnmarshalTopLevelStructure,
}
}
cfg := &config.Config{
Receivers: rawCfg.Receivers.GetReceivers(),
Processors: rawCfg.Processors.GetProcessors(),
Exporters: rawCfg.Exporters.GetExporters(),
Extensions: rawCfg.Extensions.GetExtensions(),
Service: rawCfg.Service,
}
return cfg, nil
}

View File

@ -75,47 +75,38 @@ func TestUnmarshal(t *testing.T) {
cfg, err := loadConfigFile(t, filepath.Join("testdata", "valid-config.yaml"), factories)
require.NoError(t, err, "Unable to load config")
assert.Equal(t, map[config.ComponentID]config.Receiver{config.NewComponentID("nop"): factories.Receivers["nop"].CreateDefaultConfig()}, cfg.Receivers.GetReceivers())
assert.Equal(t, map[config.ComponentID]config.Processor{config.NewComponentID("nop"): factories.Processors["nop"].CreateDefaultConfig()}, cfg.Processors.GetProcessors())
assert.Equal(t, map[config.ComponentID]config.Exporter{config.NewComponentID("nop"): factories.Exporters["nop"].CreateDefaultConfig()}, cfg.Exporters.GetExporters())
assert.Equal(t, map[config.ComponentID]config.Extension{config.NewComponentID("nop"): factories.Extensions["nop"].CreateDefaultConfig()}, cfg.Extensions.GetExtensions())
assert.Equal(t,
&config.Config{
Receivers: map[config.ComponentID]config.Receiver{
config.NewComponentID("nop"): factories.Receivers["nop"].CreateDefaultConfig(),
},
Exporters: map[config.ComponentID]config.Exporter{
config.NewComponentID("nop"): factories.Exporters["nop"].CreateDefaultConfig(),
},
Processors: map[config.ComponentID]config.Processor{
config.NewComponentID("nop"): factories.Processors["nop"].CreateDefaultConfig(),
},
Extensions: map[config.ComponentID]config.Extension{
config.NewComponentID("nop"): factories.Extensions["nop"].CreateDefaultConfig(),
},
Service: config.Service{
Extensions: []config.ComponentID{config.NewComponentID("nop")},
Pipelines: map[config.ComponentID]*config.Pipeline{
config.NewComponentID("traces"): {
Receivers: []config.ComponentID{config.NewComponentID("nop")},
Processors: []config.ComponentID{config.NewComponentID("nop")},
Exporters: []config.ComponentID{config.NewComponentID("nop")},
},
},
Telemetry: telemetry.Config{
Logs: telemetry.LogsConfig{
Level: zapcore.DebugLevel,
Development: true,
Encoding: "console",
DisableCaller: true,
DisableStacktrace: true,
OutputPaths: []string{"stderr", "./output-logs"},
ErrorOutputPaths: []string{"stderr", "./error-output-logs"},
InitialFields: map[string]interface{}{"field_key": "filed_value"},
},
Metrics: telemetry.MetricsConfig{
Level: configtelemetry.LevelNormal,
Address: ":8081",
},
config.Service{
Extensions: []config.ComponentID{config.NewComponentID("nop")},
Pipelines: map[config.ComponentID]*config.Pipeline{
config.NewComponentID("traces"): {
Receivers: []config.ComponentID{config.NewComponentID("nop")},
Processors: []config.ComponentID{config.NewComponentID("nop")},
Exporters: []config.ComponentID{config.NewComponentID("nop")},
},
},
}, cfg)
Telemetry: telemetry.Config{
Logs: telemetry.LogsConfig{
Level: zapcore.DebugLevel,
Development: true,
Encoding: "console",
DisableCaller: true,
DisableStacktrace: true,
OutputPaths: []string{"stderr", "./output-logs"},
ErrorOutputPaths: []string{"stderr", "./error-output-logs"},
InitialFields: map[string]interface{}{"field_key": "filed_value"},
},
Metrics: telemetry.MetricsConfig{
Level: configtelemetry.LevelNormal,
Address: ":8081",
},
},
}, cfg.Service)
}
func TestUnmarshalError(t *testing.T) {
@ -157,7 +148,7 @@ func TestUnmarshalError(t *testing.T) {
}
}
func loadConfigFile(t *testing.T, fileName string, factories component.Factories) (*config.Config, error) {
func loadConfigFile(t *testing.T, fileName string, factories component.Factories) (*Config, error) {
cm, err := confmaptest.LoadConf(fileName)
require.NoError(t, err)

View File

@ -88,7 +88,7 @@ func TestBuild(t *testing.T) {
factories, err := testcomponents.ExampleComponents()
assert.NoError(t, err)
cfg := loadConfigAndValidate(t, filepath.Join("testdata", test.name), factories)
cfg := loadConfig(t, filepath.Join("testdata", test.name), factories)
// Build the pipeline
pipelines, err := Build(context.Background(), toSettings(factories, cfg))
@ -434,16 +434,16 @@ func newErrExporterFactory() component.ExporterFactory {
)
}
func toSettings(factories component.Factories, cfg *config.Config) Settings {
func toSettings(factories component.Factories, cfg *configunmarshaler.Config) Settings {
return Settings{
Telemetry: componenttest.NewNopTelemetrySettings(),
BuildInfo: component.NewDefaultBuildInfo(),
ReceiverFactories: factories.Receivers,
ReceiverConfigs: cfg.Receivers,
ReceiverConfigs: cfg.Receivers.GetReceivers(),
ProcessorFactories: factories.Processors,
ProcessorConfigs: cfg.Processors,
ProcessorConfigs: cfg.Processors.GetProcessors(),
ExporterFactories: factories.Exporters,
ExporterConfigs: cfg.Exporters,
ExporterConfigs: cfg.Exporters.GetExporters(),
PipelineConfigs: cfg.Service.Pipelines,
}
}
@ -464,7 +464,7 @@ func (e errComponent) Shutdown(context.Context) error {
return errors.New("my error")
}
func loadConfig(t *testing.T, fileName string, factories component.Factories) *config.Config {
func loadConfig(t *testing.T, fileName string, factories component.Factories) *configunmarshaler.Config {
// Read yaml config from file
conf, err := confmaptest.LoadConf(fileName)
require.NoError(t, err)
@ -472,9 +472,3 @@ func loadConfig(t *testing.T, fileName string, factories component.Factories) *c
require.NoError(t, err)
return cfg
}
func loadConfigAndValidate(t *testing.T, fileName string, factories component.Factories) *config.Config {
cfg := loadConfig(t, fileName, factories)
require.NoError(t, cfg.Validate())
return cfg
}

View File

@ -163,7 +163,7 @@ func (srv *service) initExtensionsAndPipeline(set *settings) error {
return fmt.Errorf("cannot build pipelines: %w", err)
}
if set.Config.Telemetry.Metrics.Level != configtelemetry.LevelNone && set.Config.Telemetry.Metrics.Address != "" {
if set.Config.Service.Telemetry.Metrics.Level != configtelemetry.LevelNone && set.Config.Service.Telemetry.Metrics.Address != "" {
// The process telemetry initialization requires the ballast size, which is available after the extensions are initialized.
if err = proctelemetry.RegisterProcessMetrics(srv.telemetryInitializer.ocRegistry, getBallastSize(srv.host)); err != nil {
return fmt.Errorf("failed to register process metrics: %w", err)