Notify extensions of the Collector's effective configuration (#6833)
**Description:** This adds an optional `ConfigWatcher` interface that extensions can implement if they want to be notified of the effective configuration that is used by the Collector. I don't feel very strongly about any of the decisions I made in this PR, so I am open to input if we would like to take a different approach anywhere. I will leave some comments to explain the decisions I made. **Link to tracking Issue:** Closes https://github.com/open-telemetry/opentelemetry-collector/issues/6596 **Testing:** I've made minimal unit test changes, but I expect to write more tests once there is consensus on the direction for implementing this functionality. I have done some manual testing to show that an extension can get a YAML representation of the effective config using two YAML input files. --------- Co-authored-by: Evan Bradley <evan-bradley@users.noreply.github.com>
This commit is contained in:
parent
3f9beca962
commit
49a090ba92
|
|
@ -0,0 +1,16 @@
|
|||
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
|
||||
change_type: enhancement
|
||||
|
||||
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
|
||||
component: extension
|
||||
|
||||
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
|
||||
note: Add optional `ConfigWatcher` interface
|
||||
|
||||
# One or more tracking issues or pull requests related to the change
|
||||
issues: [6596]
|
||||
|
||||
# (Optional) One or more lines of additional information to render under the primary note.
|
||||
# These lines will be padded with 2 spaces and then inserted directly into the document.
|
||||
# Use pipe (|) for multiline entries.
|
||||
subtext: Extensions implementing this interface will be notified of the Collector's effective config.
|
||||
|
|
@ -0,0 +1,18 @@
|
|||
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
|
||||
change_type: enhancement
|
||||
|
||||
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
|
||||
component: otelcol
|
||||
|
||||
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
|
||||
note: Add optional `ConfmapProvider` interface for Config Providers
|
||||
|
||||
# One or more tracking issues or pull requests related to the change
|
||||
issues: [6596]
|
||||
|
||||
# (Optional) One or more lines of additional information to render under the primary note.
|
||||
# These lines will be padded with 2 spaces and then inserted directly into the document.
|
||||
# Use pipe (|) for multiline entries.
|
||||
subtext: |
|
||||
This allows providing the Collector's configuration as a marshaled confmap.Conf object
|
||||
from a ConfigProvider
|
||||
|
|
@ -0,0 +1,17 @@
|
|||
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
|
||||
change_type: enhancement
|
||||
|
||||
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
|
||||
component: service
|
||||
|
||||
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
|
||||
note: Add `CollectorConf` field to `service.Settings`
|
||||
|
||||
# One or more tracking issues or pull requests related to the change
|
||||
issues: [6596]
|
||||
|
||||
# (Optional) One or more lines of additional information to render under the primary note.
|
||||
# These lines will be padded with 2 spaces and then inserted directly into the document.
|
||||
# Use pipe (|) for multiline entries.
|
||||
subtext: |
|
||||
This field is intended to be used by the Collector to pass its effective configuration to the service.
|
||||
|
|
@ -8,6 +8,7 @@ import (
|
|||
"fmt"
|
||||
|
||||
"go.opentelemetry.io/collector/component"
|
||||
"go.opentelemetry.io/collector/confmap"
|
||||
)
|
||||
|
||||
// Extension is the interface for objects hosted by the OpenTelemetry Collector that
|
||||
|
|
@ -32,6 +33,13 @@ type PipelineWatcher interface {
|
|||
NotReady() error
|
||||
}
|
||||
|
||||
// ConfigWatcher is an interface that should be implemented by an extension that
|
||||
// wishes to be notified of the Collector's effective configuration.
|
||||
type ConfigWatcher interface {
|
||||
// NotifyConfig notifies the extension of the Collector's current effective configuration.
|
||||
NotifyConfig(ctx context.Context, conf *confmap.Conf) error
|
||||
}
|
||||
|
||||
// CreateSettings is passed to Factory.Create(...) function.
|
||||
type CreateSettings struct {
|
||||
// ID returns the ID of the component that will be created.
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@ import (
|
|||
"go.uber.org/zap"
|
||||
|
||||
"go.opentelemetry.io/collector/component"
|
||||
"go.opentelemetry.io/collector/confmap"
|
||||
"go.opentelemetry.io/collector/connector"
|
||||
"go.opentelemetry.io/collector/exporter"
|
||||
"go.opentelemetry.io/collector/extension"
|
||||
|
|
@ -143,6 +144,17 @@ func (col *Collector) Shutdown() {
|
|||
func (col *Collector) setupConfigurationComponents(ctx context.Context) error {
|
||||
col.setCollectorState(StateStarting)
|
||||
|
||||
var conf *confmap.Conf
|
||||
|
||||
if cp, ok := col.set.ConfigProvider.(ConfmapProvider); ok {
|
||||
var err error
|
||||
conf, err = cp.GetConfmap(ctx)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to resolve config: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
cfg, err := col.set.ConfigProvider.Get(ctx, col.set.Factories)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get config: %w", err)
|
||||
|
|
@ -154,6 +166,7 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error {
|
|||
|
||||
col.service, err = service.New(ctx, service.Settings{
|
||||
BuildInfo: col.set.BuildInfo,
|
||||
CollectorConf: conf,
|
||||
Receivers: receiver.NewBuilder(cfg.Receivers, col.set.Factories.Receivers),
|
||||
Processors: processor.NewBuilder(cfg.Processors, col.set.Factories.Processors),
|
||||
Exporters: exporter.NewBuilder(cfg.Exporters, col.set.Factories.Exporters),
|
||||
|
|
@ -174,6 +187,7 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error {
|
|||
return multierr.Combine(err, col.service.Shutdown(ctx))
|
||||
}
|
||||
col.setCollectorState(StateRunning)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -17,6 +17,8 @@ import (
|
|||
"github.com/stretchr/testify/require"
|
||||
|
||||
"go.opentelemetry.io/collector/component"
|
||||
"go.opentelemetry.io/collector/confmap"
|
||||
"go.opentelemetry.io/collector/confmap/converter/expandconverter"
|
||||
)
|
||||
|
||||
func TestStateString(t *testing.T) {
|
||||
|
|
@ -369,6 +371,31 @@ func TestCollectorDryRun(t *testing.T) {
|
|||
require.Error(t, col.DryRun(context.Background()))
|
||||
}
|
||||
|
||||
func TestPassConfmapToServiceFailure(t *testing.T) {
|
||||
factories, err := nopFactories()
|
||||
require.NoError(t, err)
|
||||
|
||||
cfgProvider, err := NewConfigProvider(ConfigProviderSettings{
|
||||
ResolverSettings: confmap.ResolverSettings{
|
||||
URIs: []string{filepath.Join("testdata", "otelcol-invalid.yaml")},
|
||||
Providers: makeMapProvidersMap(newFailureProvider()),
|
||||
Converters: []confmap.Converter{expandconverter.New()},
|
||||
},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
set := CollectorSettings{
|
||||
BuildInfo: component.NewDefaultBuildInfo(),
|
||||
Factories: factories,
|
||||
ConfigProvider: cfgProvider,
|
||||
}
|
||||
col, err := NewCollector(set)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = col.Run(context.Background())
|
||||
require.Error(t, err)
|
||||
}
|
||||
|
||||
func startCollector(ctx context.Context, t *testing.T, col *Collector) *sync.WaitGroup {
|
||||
wg := &sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
|
|
@ -378,3 +405,21 @@ func startCollector(ctx context.Context, t *testing.T, col *Collector) *sync.Wai
|
|||
}()
|
||||
return wg
|
||||
}
|
||||
|
||||
type failureProvider struct{}
|
||||
|
||||
func newFailureProvider() confmap.Provider {
|
||||
return &failureProvider{}
|
||||
}
|
||||
|
||||
func (fmp *failureProvider) Retrieve(_ context.Context, _ string, _ confmap.WatcherFunc) (*confmap.Retrieved, error) {
|
||||
return nil, errors.New("a failure occurred during configuration retrieval")
|
||||
}
|
||||
|
||||
func (*failureProvider) Scheme() string {
|
||||
return "file"
|
||||
}
|
||||
|
||||
func (*failureProvider) Shutdown(context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -50,10 +50,26 @@ type ConfigProvider interface {
|
|||
Shutdown(ctx context.Context) error
|
||||
}
|
||||
|
||||
// ConfmapProvider is an optional interface to be implemented by ConfigProviders
|
||||
// to provide confmap.Conf objects representing a marshaled version of the
|
||||
// Collector's configuration.
|
||||
//
|
||||
// The purpose of this interface is that otelcol.ConfigProvider structs do not
|
||||
// necessarily need to use confmap.Conf as their underlying config structure.
|
||||
type ConfmapProvider interface {
|
||||
// GetConfmap resolves the Collector's configuration and provides it as a confmap.Conf object.
|
||||
//
|
||||
// Should never be called concurrently with itself or any ConfigProvider method.
|
||||
GetConfmap(ctx context.Context) (*confmap.Conf, error)
|
||||
}
|
||||
|
||||
type configProvider struct {
|
||||
mapResolver *confmap.Resolver
|
||||
}
|
||||
|
||||
var _ ConfigProvider = &configProvider{}
|
||||
var _ ConfmapProvider = &configProvider{}
|
||||
|
||||
// ConfigProviderSettings are the settings to configure the behavior of the ConfigProvider.
|
||||
type ConfigProviderSettings struct {
|
||||
// ResolverSettings are the settings to configure the behavior of the confmap.Resolver.
|
||||
|
|
@ -106,6 +122,15 @@ func (cm *configProvider) Shutdown(ctx context.Context) error {
|
|||
return cm.mapResolver.Shutdown(ctx)
|
||||
}
|
||||
|
||||
func (cm *configProvider) GetConfmap(ctx context.Context) (*confmap.Conf, error) {
|
||||
conf, err := cm.mapResolver.Resolve(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot resolve the configuration: %w", err)
|
||||
}
|
||||
|
||||
return conf, nil
|
||||
}
|
||||
|
||||
func newDefaultConfigProviderSettings(uris []string) ConfigProviderSettings {
|
||||
return ConfigProviderSettings{
|
||||
ResolverSettings: confmap.ResolverSettings{
|
||||
|
|
|
|||
|
|
@ -11,69 +11,36 @@ import (
|
|||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap/zapcore"
|
||||
"gopkg.in/yaml.v3"
|
||||
|
||||
"go.opentelemetry.io/collector/component"
|
||||
"go.opentelemetry.io/collector/config/configtelemetry"
|
||||
"go.opentelemetry.io/collector/confmap"
|
||||
"go.opentelemetry.io/collector/confmap/provider/fileprovider"
|
||||
"go.opentelemetry.io/collector/confmap/provider/yamlprovider"
|
||||
"go.opentelemetry.io/collector/connector/connectortest"
|
||||
"go.opentelemetry.io/collector/exporter/exportertest"
|
||||
"go.opentelemetry.io/collector/extension/extensiontest"
|
||||
"go.opentelemetry.io/collector/processor/processortest"
|
||||
"go.opentelemetry.io/collector/receiver/receivertest"
|
||||
"go.opentelemetry.io/collector/service"
|
||||
"go.opentelemetry.io/collector/service/pipelines"
|
||||
"go.opentelemetry.io/collector/service/telemetry"
|
||||
)
|
||||
|
||||
var configNop = &Config{
|
||||
Receivers: map[component.ID]component.Config{component.NewID("nop"): receivertest.NewNopFactory().CreateDefaultConfig()},
|
||||
Processors: map[component.ID]component.Config{component.NewID("nop"): processortest.NewNopFactory().CreateDefaultConfig()},
|
||||
Exporters: map[component.ID]component.Config{component.NewID("nop"): exportertest.NewNopFactory().CreateDefaultConfig()},
|
||||
Connectors: map[component.ID]component.Config{component.NewIDWithName("nop", "con"): connectortest.NewNopFactory().CreateDefaultConfig()},
|
||||
Extensions: map[component.ID]component.Config{component.NewID("nop"): extensiontest.NewNopFactory().CreateDefaultConfig()},
|
||||
Service: service.Config{
|
||||
Extensions: []component.ID{component.NewID("nop")},
|
||||
Pipelines: pipelines.Config{
|
||||
component.NewID("traces"): {
|
||||
Receivers: []component.ID{component.NewID("nop")},
|
||||
Processors: []component.ID{component.NewID("nop")},
|
||||
Exporters: []component.ID{component.NewID("nop"), component.NewIDWithName("nop", "con")},
|
||||
},
|
||||
component.NewID("metrics"): {
|
||||
Receivers: []component.ID{component.NewID("nop")},
|
||||
Processors: []component.ID{component.NewID("nop")},
|
||||
Exporters: []component.ID{component.NewID("nop")},
|
||||
},
|
||||
component.NewID("logs"): {
|
||||
Receivers: []component.ID{component.NewID("nop"), component.NewIDWithName("nop", "con")},
|
||||
Processors: []component.ID{component.NewID("nop")},
|
||||
Exporters: []component.ID{component.NewID("nop")},
|
||||
},
|
||||
},
|
||||
Telemetry: telemetry.Config{
|
||||
Logs: telemetry.LogsConfig{
|
||||
Level: zapcore.InfoLevel,
|
||||
Development: false,
|
||||
Encoding: "console",
|
||||
Sampling: &telemetry.LogsSamplingConfig{
|
||||
Initial: 100,
|
||||
Thereafter: 100,
|
||||
},
|
||||
OutputPaths: []string{"stderr"},
|
||||
ErrorOutputPaths: []string{"stderr"},
|
||||
DisableCaller: false,
|
||||
DisableStacktrace: false,
|
||||
InitialFields: map[string]any(nil),
|
||||
},
|
||||
Metrics: telemetry.MetricsConfig{
|
||||
Level: configtelemetry.LevelBasic,
|
||||
Address: "localhost:8888",
|
||||
},
|
||||
},
|
||||
},
|
||||
func newConfig(yamlBytes []byte, factories Factories) (*Config, error) {
|
||||
var stringMap = map[string]interface{}{}
|
||||
err := yaml.Unmarshal(yamlBytes, stringMap)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
conf := confmap.NewFromStringMap(stringMap)
|
||||
|
||||
cfg, err := unmarshal(conf, factories)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Config{
|
||||
Receivers: cfg.Receivers.Configs(),
|
||||
Processors: cfg.Processors.Configs(),
|
||||
Exporters: cfg.Exporters.Configs(),
|
||||
Connectors: cfg.Connectors.Configs(),
|
||||
Extensions: cfg.Extensions.Configs(),
|
||||
Service: cfg.Service,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func TestConfigProviderYaml(t *testing.T) {
|
||||
|
|
@ -97,6 +64,10 @@ func TestConfigProviderYaml(t *testing.T) {
|
|||
|
||||
cfg, err := cp.Get(context.Background(), factories)
|
||||
require.NoError(t, err)
|
||||
|
||||
configNop, err := newConfig(yamlBytes, factories)
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.EqualValues(t, configNop, cfg)
|
||||
}
|
||||
|
||||
|
|
@ -118,5 +89,41 @@ func TestConfigProviderFile(t *testing.T) {
|
|||
|
||||
cfg, err := cp.Get(context.Background(), factories)
|
||||
require.NoError(t, err)
|
||||
|
||||
yamlBytes, err := os.ReadFile(filepath.Join("testdata", "otelcol-nop.yaml"))
|
||||
require.NoError(t, err)
|
||||
|
||||
configNop, err := newConfig(yamlBytes, factories)
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.EqualValues(t, configNop, cfg)
|
||||
}
|
||||
|
||||
func TestGetConfmap(t *testing.T) {
|
||||
uriLocation := "file:" + filepath.Join("testdata", "otelcol-nop.yaml")
|
||||
provider := fileprovider.New()
|
||||
set := ConfigProviderSettings{
|
||||
ResolverSettings: confmap.ResolverSettings{
|
||||
URIs: []string{uriLocation},
|
||||
Providers: map[string]confmap.Provider{provider.Scheme(): provider},
|
||||
},
|
||||
}
|
||||
|
||||
configBytes, err := os.ReadFile(filepath.Join("testdata", "otelcol-nop.yaml"))
|
||||
require.NoError(t, err)
|
||||
|
||||
yamlMap := map[string]any{}
|
||||
err = yaml.Unmarshal(configBytes, yamlMap)
|
||||
require.NoError(t, err)
|
||||
|
||||
cp, err := NewConfigProvider(set)
|
||||
require.NoError(t, err)
|
||||
|
||||
cmp, ok := cp.(ConfmapProvider)
|
||||
require.True(t, ok)
|
||||
|
||||
cmap, err := cmp.GetConfmap(context.Background())
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.EqualValues(t, yamlMap, cmap.ToStringMap())
|
||||
}
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ import (
|
|||
"go.uber.org/multierr"
|
||||
|
||||
"go.opentelemetry.io/collector/component"
|
||||
"go.opentelemetry.io/collector/confmap"
|
||||
"go.opentelemetry.io/collector/extension"
|
||||
"go.opentelemetry.io/collector/service/internal/components"
|
||||
"go.opentelemetry.io/collector/service/internal/zpages"
|
||||
|
|
@ -72,6 +73,17 @@ func (bes *Extensions) NotifyPipelineNotReady() error {
|
|||
return errs
|
||||
}
|
||||
|
||||
func (bes *Extensions) NotifyConfig(ctx context.Context, conf *confmap.Conf) error {
|
||||
var errs error
|
||||
for _, ext := range bes.extMap {
|
||||
if cw, ok := ext.(extension.ConfigWatcher); ok {
|
||||
clonedConf := confmap.NewFromStringMap(conf.ToStringMap())
|
||||
errs = multierr.Append(errs, cw.NotifyConfig(ctx, clonedConf))
|
||||
}
|
||||
}
|
||||
return errs
|
||||
}
|
||||
|
||||
func (bes *Extensions) GetExtensions() map[component.ID]component.Component {
|
||||
result := make(map[component.ID]component.Component, len(bes.extMap))
|
||||
for extID, v := range bes.extMap {
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ import (
|
|||
|
||||
"go.opentelemetry.io/collector/component"
|
||||
"go.opentelemetry.io/collector/component/componenttest"
|
||||
"go.opentelemetry.io/collector/confmap"
|
||||
"go.opentelemetry.io/collector/extension"
|
||||
"go.opentelemetry.io/collector/extension/extensiontest"
|
||||
)
|
||||
|
|
@ -91,6 +92,132 @@ func TestBuildExtensions(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestNotifyConfig(t *testing.T) {
|
||||
notificationError := errors.New("Error processing config")
|
||||
nopExtensionFactory := extensiontest.NewNopFactory()
|
||||
nopExtensionConfig := nopExtensionFactory.CreateDefaultConfig()
|
||||
n1ExtensionFactory := newConfigWatcherExtensionFactory("notifiable1", func() error { return nil })
|
||||
n1ExtensionConfig := n1ExtensionFactory.CreateDefaultConfig()
|
||||
n2ExtensionFactory := newConfigWatcherExtensionFactory("notifiable2", func() error { return nil })
|
||||
n2ExtensionConfig := n1ExtensionFactory.CreateDefaultConfig()
|
||||
nErrExtensionFactory := newConfigWatcherExtensionFactory("notifiableErr", func() error { return notificationError })
|
||||
nErrExtensionConfig := nErrExtensionFactory.CreateDefaultConfig()
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
factories map[component.Type]extension.Factory
|
||||
extensionsConfigs map[component.ID]component.Config
|
||||
serviceExtensions []component.ID
|
||||
wantErrMsg string
|
||||
want error
|
||||
}{
|
||||
{
|
||||
name: "No notifiable extensions",
|
||||
factories: map[component.Type]extension.Factory{
|
||||
"nop": nopExtensionFactory,
|
||||
},
|
||||
extensionsConfigs: map[component.ID]component.Config{
|
||||
component.NewID("nop"): nopExtensionConfig,
|
||||
},
|
||||
serviceExtensions: []component.ID{
|
||||
component.NewID("nop"),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "One notifiable extension",
|
||||
factories: map[component.Type]extension.Factory{
|
||||
"notifiable1": n1ExtensionFactory,
|
||||
},
|
||||
extensionsConfigs: map[component.ID]component.Config{
|
||||
component.NewID("notifiable1"): n1ExtensionConfig,
|
||||
},
|
||||
serviceExtensions: []component.ID{
|
||||
component.NewID("notifiable1"),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Multiple notifiable extensions",
|
||||
factories: map[component.Type]extension.Factory{
|
||||
"notifiable1": n1ExtensionFactory,
|
||||
"notifiable2": n2ExtensionFactory,
|
||||
},
|
||||
extensionsConfigs: map[component.ID]component.Config{
|
||||
component.NewID("notifiable1"): n1ExtensionConfig,
|
||||
component.NewID("notifiable2"): n2ExtensionConfig,
|
||||
},
|
||||
serviceExtensions: []component.ID{
|
||||
component.NewID("notifiable1"),
|
||||
component.NewID("notifiable2"),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Errors in extension notification",
|
||||
factories: map[component.Type]extension.Factory{
|
||||
"notifiableErr": nErrExtensionFactory,
|
||||
},
|
||||
extensionsConfigs: map[component.ID]component.Config{
|
||||
component.NewID("notifiableErr"): nErrExtensionConfig,
|
||||
},
|
||||
serviceExtensions: []component.ID{
|
||||
component.NewID("notifiableErr"),
|
||||
},
|
||||
want: notificationError,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
extensions, err := New(context.Background(), Settings{
|
||||
Telemetry: componenttest.NewNopTelemetrySettings(),
|
||||
BuildInfo: component.NewDefaultBuildInfo(),
|
||||
Configs: tt.extensionsConfigs,
|
||||
Factories: tt.factories,
|
||||
}, tt.serviceExtensions)
|
||||
assert.NoError(t, err)
|
||||
errs := extensions.NotifyConfig(context.Background(), confmap.NewFromStringMap(map[string]interface{}{}))
|
||||
assert.Equal(t, tt.want, errs)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type configWatcherExtension struct {
|
||||
fn func() error
|
||||
}
|
||||
|
||||
func (comp *configWatcherExtension) Start(_ context.Context, _ component.Host) error {
|
||||
return comp.fn()
|
||||
}
|
||||
|
||||
func (comp *configWatcherExtension) Shutdown(_ context.Context) error {
|
||||
return comp.fn()
|
||||
}
|
||||
|
||||
func (comp *configWatcherExtension) NotifyConfig(_ context.Context, _ *confmap.Conf) error {
|
||||
return comp.fn()
|
||||
}
|
||||
|
||||
func newConfigWatcherExtension(fn func() error) *configWatcherExtension {
|
||||
comp := &configWatcherExtension{
|
||||
fn: fn,
|
||||
}
|
||||
|
||||
return comp
|
||||
|
||||
}
|
||||
|
||||
func newConfigWatcherExtensionFactory(name component.Type, fn func() error) extension.Factory {
|
||||
return extension.NewFactory(
|
||||
name,
|
||||
func() component.Config {
|
||||
return &struct{}{}
|
||||
},
|
||||
func(ctx context.Context, set extension.CreateSettings, extension component.Config) (extension.Extension, error) {
|
||||
return newConfigWatcherExtension(fn), nil
|
||||
},
|
||||
component.StabilityLevelDevelopment,
|
||||
)
|
||||
}
|
||||
|
||||
func newBadExtensionFactory() extension.Factory {
|
||||
return extension.NewFactory(
|
||||
"bf",
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ import (
|
|||
|
||||
"go.opentelemetry.io/collector/component"
|
||||
"go.opentelemetry.io/collector/config/configtelemetry"
|
||||
"go.opentelemetry.io/collector/confmap"
|
||||
"go.opentelemetry.io/collector/connector"
|
||||
"go.opentelemetry.io/collector/exporter"
|
||||
"go.opentelemetry.io/collector/extension"
|
||||
|
|
@ -36,6 +37,9 @@ type Settings struct {
|
|||
// BuildInfo provides collector start information.
|
||||
BuildInfo component.BuildInfo
|
||||
|
||||
// CollectorConf contains the Collector's current configuration
|
||||
CollectorConf *confmap.Conf
|
||||
|
||||
// Receivers builder for receivers.
|
||||
Receivers *receiver.Builder
|
||||
|
||||
|
|
@ -68,6 +72,7 @@ type Service struct {
|
|||
telemetrySettings component.TelemetrySettings
|
||||
host *serviceHost
|
||||
telemetryInitializer *telemetryInitializer
|
||||
collectorConf *confmap.Conf
|
||||
}
|
||||
|
||||
func New(ctx context.Context, set Settings, cfg Config) (*Service, error) {
|
||||
|
|
@ -89,6 +94,7 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) {
|
|||
asyncErrorChannel: set.AsyncErrorChannel,
|
||||
},
|
||||
telemetryInitializer: newColTelemetry(useOtel, disableHighCard, extendedConfig),
|
||||
collectorConf: set.CollectorConf,
|
||||
}
|
||||
var err error
|
||||
srv.telemetry, err = telemetry.New(ctx, telemetry.Settings{ZapOptions: set.LoggingOptions}, cfg.Telemetry)
|
||||
|
|
@ -138,6 +144,12 @@ func (srv *Service) Start(ctx context.Context) error {
|
|||
return fmt.Errorf("failed to start extensions: %w", err)
|
||||
}
|
||||
|
||||
if srv.collectorConf != nil {
|
||||
if err := srv.host.serviceExtensions.NotifyConfig(ctx, srv.collectorConf); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err := srv.host.pipelines.StartAll(ctx, srv.host); err != nil {
|
||||
return fmt.Errorf("cannot start pipelines: %w", err)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ package service
|
|||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"errors"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
|
|
@ -21,6 +22,7 @@ import (
|
|||
"go.opentelemetry.io/collector/component"
|
||||
"go.opentelemetry.io/collector/config/confignet"
|
||||
"go.opentelemetry.io/collector/config/configtelemetry"
|
||||
"go.opentelemetry.io/collector/confmap"
|
||||
"go.opentelemetry.io/collector/connector/connectortest"
|
||||
"go.opentelemetry.io/collector/exporter/exportertest"
|
||||
"go.opentelemetry.io/collector/extension"
|
||||
|
|
@ -356,6 +358,51 @@ func TestServiceTelemetryRestart(t *testing.T) {
|
|||
assert.NoError(t, srvTwo.Shutdown(context.Background()))
|
||||
}
|
||||
|
||||
func TestExtensionNotificationFailure(t *testing.T) {
|
||||
set := newNopSettings()
|
||||
cfg := newNopConfig()
|
||||
|
||||
var extName component.Type = "configWatcher"
|
||||
configWatcherExtensionFactory := newConfigWatcherExtensionFactory(extName)
|
||||
set.Extensions = extension.NewBuilder(
|
||||
map[component.ID]component.Config{component.NewID(extName): configWatcherExtensionFactory.CreateDefaultConfig()},
|
||||
map[component.Type]extension.Factory{extName: configWatcherExtensionFactory})
|
||||
cfg.Extensions = []component.ID{component.NewID(extName)}
|
||||
|
||||
// Create a service
|
||||
srv, err := New(context.Background(), set, cfg)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Start the service
|
||||
require.Error(t, srv.Start(context.Background()))
|
||||
|
||||
// Shut down the service
|
||||
require.NoError(t, srv.Shutdown(context.Background()))
|
||||
}
|
||||
|
||||
func TestNilCollectorEffectiveConfig(t *testing.T) {
|
||||
set := newNopSettings()
|
||||
set.CollectorConf = nil
|
||||
cfg := newNopConfig()
|
||||
|
||||
var extName component.Type = "configWatcher"
|
||||
configWatcherExtensionFactory := newConfigWatcherExtensionFactory(extName)
|
||||
set.Extensions = extension.NewBuilder(
|
||||
map[component.ID]component.Config{component.NewID(extName): configWatcherExtensionFactory.CreateDefaultConfig()},
|
||||
map[component.Type]extension.Factory{extName: configWatcherExtensionFactory})
|
||||
cfg.Extensions = []component.ID{component.NewID(extName)}
|
||||
|
||||
// Create a service
|
||||
srv, err := New(context.Background(), set, cfg)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Start the service
|
||||
require.NoError(t, srv.Start(context.Background()))
|
||||
|
||||
// Shut down the service
|
||||
require.NoError(t, srv.Shutdown(context.Background()))
|
||||
}
|
||||
|
||||
func assertResourceLabels(t *testing.T, res pcommon.Resource, expectedLabels map[string]labelValue) {
|
||||
for key, labelValue := range expectedLabels {
|
||||
lookupKey, ok := prometheusToOtelConv[key]
|
||||
|
|
@ -446,12 +493,13 @@ func assertZPages(t *testing.T, zpagesAddr string) {
|
|||
|
||||
func newNopSettings() Settings {
|
||||
return Settings{
|
||||
BuildInfo: component.NewDefaultBuildInfo(),
|
||||
Receivers: receivertest.NewNopBuilder(),
|
||||
Processors: processortest.NewNopBuilder(),
|
||||
Exporters: exportertest.NewNopBuilder(),
|
||||
Connectors: connectortest.NewNopBuilder(),
|
||||
Extensions: extensiontest.NewNopBuilder(),
|
||||
BuildInfo: component.NewDefaultBuildInfo(),
|
||||
CollectorConf: confmap.New(),
|
||||
Receivers: receivertest.NewNopBuilder(),
|
||||
Processors: processortest.NewNopBuilder(),
|
||||
Exporters: exportertest.NewNopBuilder(),
|
||||
Connectors: connectortest.NewNopBuilder(),
|
||||
Extensions: extensiontest.NewNopBuilder(),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -501,3 +549,30 @@ func newNopConfigPipelineConfigs(pipelineCfgs pipelines.Config) Config {
|
|||
},
|
||||
}
|
||||
}
|
||||
|
||||
type configWatcherExtension struct{}
|
||||
|
||||
func (comp *configWatcherExtension) Start(_ context.Context, _ component.Host) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (comp *configWatcherExtension) Shutdown(_ context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (comp *configWatcherExtension) NotifyConfig(_ context.Context, _ *confmap.Conf) error {
|
||||
return errors.New("Failed to resolve config")
|
||||
}
|
||||
|
||||
func newConfigWatcherExtensionFactory(name component.Type) extension.Factory {
|
||||
return extension.NewFactory(
|
||||
name,
|
||||
func() component.Config {
|
||||
return &struct{}{}
|
||||
},
|
||||
func(ctx context.Context, set extension.CreateSettings, extension component.Config) (extension.Extension, error) {
|
||||
return &configWatcherExtension{}, nil
|
||||
},
|
||||
component.StabilityLevelDevelopment,
|
||||
)
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue