[chore] service/telemetry: encapsulate SDK creation (#13606)

#### Description

Encapsulate opentelemetry-go SDK creation in
service/telemetry/otelconftelemetry.

To enable this change, the Factory interface now provides
`CreateProviders` method in place of the old `CreateLogger`,
`CreateMeterProvider`, and `CreateTracerProvider` methods which have now
been removed. `CreateProviders` returns an instance of the new
`telemetry.Providers` interface, which provides access to telemetry
providers.

In a follow-up I will move the `Factory` interface from
`service/telemetry/otelconftelemetry` to `service/telemetry`, and inject
the default level-based metric views through `telemetry.Settings`.

#### Link to tracking issue

Part of #4970

#### Testing

N/A, non-functional change.

#### Documentation

N/A
This commit is contained in:
Andrew Wilkins 2025-08-27 19:21:48 +08:00 committed by GitHub
parent a0d335da98
commit 5a5da91c88
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 346 additions and 521 deletions

View File

@ -27,11 +27,9 @@ type configSettings struct {
// unmarshal the configSettings from a confmap.Conf.
// After the config is unmarshalled, `Validate()` must be called to validate.
func unmarshal(v *confmap.Conf, factories Factories) (*configSettings, error) {
// TODO remove these params once SDK and resource creation are encapsulated
// within the otelconftelemetry factory. They are not used when creating
// the default config.
// TODO: inject the telemetry factory through factories, once available.
// See https://github.com/open-telemetry/opentelemetry-collector/issues/4970
telFactory := otelconftelemetry.NewFactory(nil, nil)
telFactory := otelconftelemetry.NewFactory()
defaultTelConfig := *telFactory.CreateDefaultConfig().(*otelconftelemetry.Config)
// Unmarshal top level sections and validate.

View File

@ -79,7 +79,7 @@ func TestConfigValidate(t *testing.T) {
}
func TestConfmapMarshalConfig(t *testing.T) {
telFactory := otelconftelemetry.NewFactory(nil, nil)
telFactory := otelconftelemetry.NewFactory()
defaultTelConfig := *telFactory.CreateDefaultConfig().(*otelconftelemetry.Config)
conf := confmap.New()

View File

@ -13,9 +13,7 @@ import (
config "go.opentelemetry.io/contrib/otelconf/v0.3.0"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
noopmetric "go.opentelemetry.io/otel/metric/noop"
sdkresource "go.opentelemetry.io/otel/sdk/resource"
nooptrace "go.opentelemetry.io/otel/trace/noop"
"go.uber.org/multierr"
"go.uber.org/zap"
@ -37,7 +35,6 @@ import (
"go.opentelemetry.io/collector/service/internal/graph"
"go.opentelemetry.io/collector/service/internal/moduleinfo"
"go.opentelemetry.io/collector/service/internal/proctelemetry"
"go.opentelemetry.io/collector/service/internal/resource"
"go.opentelemetry.io/collector/service/internal/status"
"go.opentelemetry.io/collector/service/telemetry"
"go.opentelemetry.io/collector/service/telemetry/otelconftelemetry"
@ -101,11 +98,11 @@ type Settings struct {
// Service represents the implementation of a component.Host.
type Service struct {
buildInfo component.BuildInfo
telemetrySettings component.TelemetrySettings
host *graph.Host
collectorConf *confmap.Conf
sdk *config.SDK
buildInfo component.BuildInfo
telemetrySettings component.TelemetrySettings
host *graph.Host
collectorConf *confmap.Conf
telemetryProviders telemetry.Providers
}
// New creates a new Service, its telemetry, and Components.
@ -126,39 +123,31 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) {
collectorConf: set.CollectorConf,
}
// Fetch data for internal telemetry like instance id and sdk version to provide for internal telemetry.
res := resource.New(set.BuildInfo, cfg.Telemetry.Resource)
pcommonRes := pdataFromSdk(res)
mpConfig := &cfg.Telemetry.Metrics.MeterProvider
if mpConfig.Views == nil {
mpConfig.Views = configureViews(cfg.Telemetry.Metrics.Level)
}
sdk, err := otelconftelemetry.NewSDK(ctx, &cfg.Telemetry, res)
if err != nil {
return nil, fmt.Errorf("failed to create SDK: %w", err)
}
srv.sdk = sdk
defer func() {
if err != nil {
err = multierr.Append(err, sdk.Shutdown(ctx))
}
}()
telFactory := otelconftelemetry.NewFactory(sdk, res)
telFactory := otelconftelemetry.NewFactory()
telset := telemetry.Settings{
BuildInfo: set.BuildInfo,
ZapOptions: set.LoggingOptions,
}
logger, loggerProvider, err := telFactory.CreateLogger(ctx, telset, &cfg.Telemetry)
telProviders, err := telFactory.CreateProviders(ctx, telset, &cfg.Telemetry)
if err != nil {
return nil, fmt.Errorf("failed to create logger: %w", err)
return nil, fmt.Errorf("failed to create telemetry providers: %w", err)
}
srv.telemetryProviders = telProviders
defer func() {
if err != nil {
err = multierr.Append(err, telProviders.Shutdown(ctx))
}
}()
// Use initialized logger to handle any subsequent errors
// https://github.com/open-telemetry/opentelemetry-collector/pull/13081
logger := telProviders.Logger()
defer func() {
if err != nil {
logger.Error("error found during service initialization", zap.Error(err))
@ -168,6 +157,7 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) {
// Wrap the zap.Logger with componentattribute so scope attributes
// can be added and removed dynamically, and tee logs to the
// LoggerProvider.
loggerProvider := telProviders.LoggerProvider()
logger = logger.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core {
core = componentattribute.NewConsoleCoreWithAttributes(core, attribute.NewSet())
core = componentattribute.NewOTelTeeCoreWithAttributes(
@ -179,23 +169,11 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) {
return core
}))
tracerProvider, err := telFactory.CreateTracerProvider(ctx, telset, &cfg.Telemetry)
if err != nil {
return nil, fmt.Errorf("failed to create tracer provider: %w", err)
}
logger.Info("Setting up own telemetry...")
mp, err := telFactory.CreateMeterProvider(ctx, telset, &cfg.Telemetry)
if err != nil {
return nil, fmt.Errorf("failed to create meter provider: %w", err)
}
srv.telemetrySettings = component.TelemetrySettings{
Logger: logger,
MeterProvider: mp,
TracerProvider: tracerProvider,
// Construct telemetry attributes from build info and config's resource attributes.
Resource: pcommonRes,
MeterProvider: telProviders.MeterProvider(),
TracerProvider: telProviders.TracerProvider(),
Resource: telProviders.Resource(),
}
srv.host.Reporter = status.NewReporter(srv.host.NotifyComponentStatusChange, func(err error) {
if errors.Is(err, status.ErrStatusNotReady) {
@ -220,25 +198,9 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) {
return nil, fmt.Errorf("failed to register process metrics: %w", err)
}
}
logsAboutMeterProvider(logger, cfg.Telemetry.Metrics, mp)
return srv, nil
}
func logsAboutMeterProvider(logger *zap.Logger, cfg otelconftelemetry.MetricsConfig, mp metric.MeterProvider) {
if cfg.Level == configtelemetry.LevelNone || len(cfg.Readers) == 0 {
logger.Info("Skipped telemetry setup.")
return
}
if lmp, ok := mp.(interface {
LogAboutServers(logger *zap.Logger, cfg otelconftelemetry.MetricsConfig)
}); ok {
lmp.LogAboutServers(logger, cfg)
}
}
// Start starts the extensions and pipelines. If Start fails Shutdown should be called to ensure a clean state.
// Start does the following steps in order:
// 1. Start all extensions.
@ -299,8 +261,8 @@ func (srv *Service) Shutdown(ctx context.Context) error {
srv.telemetrySettings.Logger.Info("Shutdown complete.")
if err := srv.sdk.Shutdown(ctx); err != nil {
errs = multierr.Append(errs, fmt.Errorf("failed to shutdown telemetry: %w", err))
if err := srv.telemetryProviders.Shutdown(ctx); err != nil {
errs = multierr.Append(errs, fmt.Errorf("failed to shutdown telemetry providers: %w", err))
}
return errs
@ -344,17 +306,6 @@ func (srv *Service) Logger() *zap.Logger {
return srv.telemetrySettings.Logger
}
func pdataFromSdk(res *sdkresource.Resource) pcommon.Resource {
// pcommon.NewResource is the best way to generate a new resource currently and is safe to use outside of tests.
// Because the resource is signal agnostic, and we need a net new resource, not an existing one, this is the only
// method of creating it without exposing internal packages.
pcommonRes := pcommon.NewResource()
for _, keyValue := range res.Attributes() {
pcommonRes.Attributes().PutStr(string(keyValue.Key), keyValue.Value.AsString())
}
return pcommonRes
}
func dropViewOption(selector *config.ViewSelector) config.View {
return config.View{
Selector: selector,

View File

@ -22,12 +22,12 @@ import (
func TestComponentConfigStruct(t *testing.T) {
require.NoError(t, componenttest.CheckConfigStruct(
NewFactory(nil, nil).CreateDefaultConfig(),
NewFactory().CreateDefaultConfig(),
))
}
func TestUnmarshalDefaultConfig(t *testing.T) {
factory := NewFactory(nil, nil)
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
require.NoError(t, confmap.New().Unmarshal(&cfg))
assert.Equal(t, factory.CreateDefaultConfig(), cfg)

View File

@ -8,11 +8,6 @@ import (
"time"
config "go.opentelemetry.io/contrib/otelconf/v0.3.0"
"go.opentelemetry.io/otel/log"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.opentelemetry.io/collector/component"
@ -28,7 +23,7 @@ var useLocalHostAsDefaultMetricsAddressFeatureGate = featuregate.GlobalRegistry(
featuregate.WithRegisterDescription("controls whether default Prometheus metrics server use localhost as the default host for their endpoints"),
)
// Factory is factory interface for telemetry.
// Factory is factory interface for telemetry providers.
// This interface cannot be directly implemented. Implementations must
// use the NewFactory to implement it.
//
@ -36,17 +31,10 @@ var useLocalHostAsDefaultMetricsAddressFeatureGate = featuregate.GlobalRegistry(
// See https://github.com/open-telemetry/opentelemetry-collector/issues/4970
type Factory interface {
// CreateDefaultConfig creates the default configuration for the telemetry.
// TODO: Should we just inherit from component.Factory?
CreateDefaultConfig() component.Config
// CreateLogger creates a logger.
CreateLogger(context.Context, telemetry.Settings, component.Config) (*zap.Logger, log.LoggerProvider, error)
// CreateTracerProvider creates a TracerProvider.
CreateTracerProvider(context.Context, telemetry.Settings, component.Config) (trace.TracerProvider, error)
// CreateMeterProvider creates a MeterProvider.
CreateMeterProvider(context.Context, telemetry.Settings, component.Config) (metric.MeterProvider, error)
// CreateProviders creates telemetry providers.
CreateProviders(context.Context, telemetry.Settings, component.Config) (telemetry.Providers, error)
// unexportedFactoryFunc is used to prevent external implementations of Factory.
unexportedFactoryFunc()
@ -56,24 +44,8 @@ type Factory interface {
//
// NOTE This API is experimental and will change soon - use at your own risk.
// See https://github.com/open-telemetry/opentelemetry-collector/issues/4970
//
// TODO remove the parameters once the factory is fully self-contained
// and is responsible for creating the SDK and resource itself.
func NewFactory(sdk *config.SDK, res *resource.Resource) Factory {
return newFactory(createDefaultConfig,
withLogger(func(_ context.Context, set telemetry.Settings, cfg component.Config) (*zap.Logger, log.LoggerProvider, error) {
c := *cfg.(*Config)
return newLogger(set, c, sdk, res)
}),
withTracerProvider(func(_ context.Context, _ telemetry.Settings, cfg component.Config) (trace.TracerProvider, error) {
c := *cfg.(*Config)
return newTracerProvider(c, sdk)
}),
withMeterProvider(func(_ context.Context, _ telemetry.Settings, cfg component.Config) (metric.MeterProvider, error) {
c := *cfg.(*Config)
return newMeterProvider(c, sdk)
}),
)
func NewFactory() Factory {
return newFactory(createDefaultConfig, createProviders)
}
func createDefaultConfig() component.Config {

View File

@ -6,14 +6,6 @@ package otelconftelemetry // import "go.opentelemetry.io/collector/service/telem
import (
"context"
"go.opentelemetry.io/otel/log"
lognoop "go.opentelemetry.io/otel/log/noop"
"go.opentelemetry.io/otel/metric"
metricnoop "go.opentelemetry.io/otel/metric/noop"
"go.opentelemetry.io/otel/trace"
tracenoop "go.opentelemetry.io/otel/trace/noop"
"go.uber.org/zap"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/service/telemetry"
)
@ -38,72 +30,27 @@ var _ Factory = (*factory)(nil)
// Factory is the implementation of Factory.
type factory struct {
createDefaultConfig component.CreateDefaultConfigFunc
createLoggerFunc
createTracerProviderFunc
createMeterProviderFunc
createProvidersFunc
}
func (f *factory) CreateDefaultConfig() component.Config {
return f.createDefaultConfig()
}
// createLoggerFunc is the equivalent of Factory.CreateLogger.
type createLoggerFunc func(context.Context, telemetry.Settings, component.Config) (*zap.Logger, log.LoggerProvider, error)
// createProvidersFunc is the equivalent of Factory.CreateProviders.
type createProvidersFunc func(context.Context, telemetry.Settings, component.Config) (telemetry.Providers, error)
// withLogger overrides the default no-op logger.
func withLogger(createLogger createLoggerFunc) factoryOption {
return factoryOptionFunc(func(o *factory) {
o.createLoggerFunc = createLogger
})
}
func (f *factory) CreateLogger(ctx context.Context, set telemetry.Settings, cfg component.Config) (*zap.Logger, log.LoggerProvider, error) {
if f.createLoggerFunc == nil {
return zap.NewNop(), lognoop.NewLoggerProvider(), nil
}
return f.createLoggerFunc(ctx, set, cfg)
}
// createTracerProviderFunc is the equivalent of Factory.CreateTracerProvider.
type createTracerProviderFunc func(context.Context, telemetry.Settings, component.Config) (trace.TracerProvider, error)
// withTracerProvider overrides the default no-op tracer provider.
func withTracerProvider(createTracerProvider createTracerProviderFunc) factoryOption {
return factoryOptionFunc(func(o *factory) {
o.createTracerProviderFunc = createTracerProvider
})
}
func (f *factory) CreateTracerProvider(ctx context.Context, set telemetry.Settings, cfg component.Config) (trace.TracerProvider, error) {
if f.createTracerProviderFunc == nil {
return tracenoop.NewTracerProvider(), nil
}
return f.createTracerProviderFunc(ctx, set, cfg)
}
// createMeterProviderFunc is the equivalent of Factory.CreateMeterProvider.
type createMeterProviderFunc func(context.Context, telemetry.Settings, component.Config) (metric.MeterProvider, error)
// withMeterProvider overrides the default no-op meter provider.
func withMeterProvider(createMeterProvider createMeterProviderFunc) factoryOption {
return factoryOptionFunc(func(o *factory) {
o.createMeterProviderFunc = createMeterProvider
})
}
func (f *factory) CreateMeterProvider(ctx context.Context, set telemetry.Settings, cfg component.Config) (metric.MeterProvider, error) {
if f.createMeterProviderFunc == nil {
return metricnoop.NewMeterProvider(), nil
}
return f.createMeterProviderFunc(ctx, set, cfg)
func (f *factory) CreateProviders(ctx context.Context, set telemetry.Settings, cfg component.Config) (telemetry.Providers, error) {
return f.createProvidersFunc(ctx, set, cfg)
}
func (f *factory) unexportedFactoryFunc() {}
// newFactory returns a new Factory.
func newFactory(createDefaultConfig component.CreateDefaultConfigFunc, options ...factoryOption) Factory {
func newFactory(createDefaultConfig component.CreateDefaultConfigFunc, createProviders createProvidersFunc, options ...factoryOption) Factory {
f := &factory{
createDefaultConfig: createDefaultConfig,
createProvidersFunc: createProviders,
}
for _, op := range options {
op.applyTelemetryFactoryOption(f)

View File

@ -4,19 +4,13 @@
package otelconftelemetry
import (
"context"
"strconv"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
config "go.opentelemetry.io/contrib/otelconf/v0.3.0"
"go.uber.org/zap/zapcore"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/service/telemetry"
)
func TestDefaultConfig(t *testing.T) {
@ -42,133 +36,10 @@ func TestDefaultConfig(t *testing.T) {
// Restore previous value.
require.NoError(t, featuregate.GlobalRegistry().Set(useLocalHostAsDefaultMetricsAddressFeatureGate.ID(), prev))
}()
cfg := NewFactory(nil, nil).CreateDefaultConfig()
cfg := NewFactory().CreateDefaultConfig()
require.Len(t, cfg.(*Config).Metrics.Readers, 1)
assert.Equal(t, tt.expected, *cfg.(*Config).Metrics.Readers[0].Pull.Exporter.Prometheus.Host)
assert.Equal(t, 8888, *cfg.(*Config).Metrics.Readers[0].Pull.Exporter.Prometheus.Port)
})
}
}
func TestTelemetryConfiguration(t *testing.T) {
tests := []struct {
name string
cfg *Config
success bool
}{
{
name: "Valid config",
cfg: &Config{
Logs: LogsConfig{
Level: zapcore.DebugLevel,
Encoding: "console",
},
Metrics: MetricsConfig{
Level: configtelemetry.LevelBasic,
MeterProvider: config.MeterProvider{
Readers: []config.MetricReader{
{
Pull: &config.PullMetricReader{Exporter: config.PullMetricExporter{Prometheus: &config.Prometheus{
Host: ptr("127.0.0.1"),
Port: ptr(3333),
}}},
},
},
},
},
},
success: true,
},
{
name: "Invalid config",
cfg: &Config{
Logs: LogsConfig{
Level: zapcore.DebugLevel,
},
Metrics: MetricsConfig{
Level: configtelemetry.LevelBasic,
MeterProvider: config.MeterProvider{
Readers: []config.MetricReader{
{
Pull: &config.PullMetricReader{Exporter: config.PullMetricExporter{Prometheus: &config.Prometheus{
Host: ptr("127.0.0.1"),
Port: ptr(3333),
}}},
},
},
},
},
},
success: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
f := NewFactory(nil, nil)
set := telemetry.Settings{}
logger, _, err := f.CreateLogger(context.Background(), set, tt.cfg)
if tt.success {
require.NoError(t, err)
assert.NotNil(t, logger)
} else {
require.Error(t, err)
assert.Nil(t, logger)
}
})
}
}
func TestSampledLogger(t *testing.T) {
tests := []struct {
name string
cfg *Config
}{
{
name: "Default sampling",
cfg: &Config{
Logs: LogsConfig{
Encoding: "console",
},
},
},
{
name: "Custom sampling",
cfg: &Config{
Logs: LogsConfig{
Level: zapcore.DebugLevel,
Encoding: "console",
Sampling: &LogsSamplingConfig{
Enabled: true,
Tick: 1 * time.Second,
Initial: 100,
Thereafter: 100,
},
},
},
},
{
name: "Disable sampling",
cfg: &Config{
Logs: LogsConfig{
Level: zapcore.DebugLevel,
Encoding: "console",
Sampling: &LogsSamplingConfig{
Enabled: false,
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
f := NewFactory(nil, nil)
ctx := context.Background()
set := telemetry.Settings{}
logger, _, err := f.CreateLogger(ctx, set, tt.cfg)
require.NoError(t, err)
assert.NotNil(t, logger)
})
}
}

View File

@ -6,7 +6,6 @@ package otelconftelemetry // import "go.opentelemetry.io/collector/service/telem
import (
config "go.opentelemetry.io/contrib/otelconf/v0.3.0"
"go.opentelemetry.io/otel/log"
"go.opentelemetry.io/otel/log/noop"
"go.opentelemetry.io/otel/sdk/resource"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
@ -15,7 +14,7 @@ import (
)
// newLogger creates a Logger and a LoggerProvider from Config.
func newLogger(set telemetry.Settings, cfg Config, sdk *config.SDK, res *resource.Resource) (*zap.Logger, log.LoggerProvider, error) {
func newLogger(set telemetry.Settings, cfg *Config, sdk *config.SDK, res *resource.Resource) (*zap.Logger, log.LoggerProvider, error) {
// Copied from NewProductionConfig.
ec := zap.NewProductionEncoderConfig()
ec.EncodeTime = zapcore.ISO8601TimeEncoder
@ -71,11 +70,6 @@ func newLogger(set telemetry.Settings, cfg Config, sdk *config.SDK, res *resourc
}))
}
var lp log.LoggerProvider
if sdk != nil {
lp = sdk.LoggerProvider()
} else {
lp = noop.NewLoggerProvider()
}
lp := sdk.LoggerProvider()
return logger, lp, nil
}

View File

@ -16,15 +16,11 @@ import (
"github.com/stretchr/testify/require"
config "go.opentelemetry.io/contrib/otelconf/v0.3.0"
"go.opentelemetry.io/otel/log"
sdkresource "go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/plog/plogotlp"
"go.opentelemetry.io/collector/service/internal/resource"
"go.opentelemetry.io/collector/service/telemetry"
)
@ -107,17 +103,14 @@ func TestNewLogger(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
buildInfo := component.BuildInfo{}
sdk, err := NewSDK(context.Background(), &tt.cfg, resource.New(buildInfo, nil))
require.NoError(t, err)
defer func() {
require.NoError(t, sdk.Shutdown(context.Background()))
}()
factory := NewFactory()
_, _, err = newLogger(telemetry.Settings{}, tt.cfg, sdk, nil)
providers, err := factory.CreateProviders(context.Background(), telemetry.Settings{BuildInfo: buildInfo}, &tt.cfg)
if tt.wantErr != nil {
require.ErrorContains(t, err, tt.wantErr.Error())
} else {
require.NoError(t, err)
require.NoError(t, providers.Shutdown(context.Background()))
}
})
}
@ -193,35 +186,26 @@ func TestNewLoggerWithResource(t *testing.T) {
name: "resource with no attributes",
buildInfo: component.BuildInfo{},
resourceConfig: nil,
wantFields: nil,
wantFields: map[string]string{
// A random UUID is injected for service.instance.id by default
string(semconv.ServiceInstanceIDKey): "", // Just check presence
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
observerCore, observedLogs := observer.New(zap.InfoLevel)
set := telemetry.Settings{
ZapOptions: []zap.Option{
zap.WrapCore(func(core zapcore.Core) zapcore.Core {
return zapcore.NewTee(core, observerCore)
}),
},
}
var res *sdkresource.Resource
if tt.wantFields != nil {
res = resource.New(tt.buildInfo, tt.resourceConfig)
}
set := telemetry.Settings{BuildInfo: tt.buildInfo}
cfg := Config{
Logs: LogsConfig{
Level: zapcore.InfoLevel,
Encoding: "json",
},
Resource: tt.resourceConfig,
}
mylogger, _, _ := newLogger(set, cfg, nil, res)
tel, observedLogs := newTelemetryProviders(t, set, &cfg)
mylogger := tel.Logger()
mylogger.Info("Test log message")
require.Len(t, observedLogs.All(), 1)
@ -317,22 +301,15 @@ func newOTLPLoggerProvider(t *testing.T, level zapcore.Level, handler http.Handl
Encoding: "json",
Processors: processors,
},
Resource: map[string]*string{
string(semconv.ServiceNameKey): ptr(service),
string(semconv.ServiceVersionKey): ptr(version),
testAttribute: ptr(testValue),
},
}
buildInfo := component.BuildInfo{}
res := resource.New(buildInfo, map[string]*string{
string(semconv.ServiceNameKey): ptr(service),
string(semconv.ServiceVersionKey): ptr(version),
testAttribute: ptr(testValue),
})
sdk, err := NewSDK(context.Background(), &cfg, res)
require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, sdk.Shutdown(context.Background()))
})
_, lp, err := newLogger(telemetry.Settings{}, cfg, sdk, res)
require.NoError(t, err)
tel, _ := newTelemetryProviders(t, telemetry.Settings{}, &cfg)
lp := tel.LoggerProvider()
require.NotNil(t, lp)
return lp
}

View File

@ -1,26 +0,0 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package otelconftelemetry // import "go.opentelemetry.io/collector/service/telemetry/otelconftelemetry"
import (
"errors"
config "go.opentelemetry.io/contrib/otelconf/v0.3.0"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
"go.opentelemetry.io/collector/config/configtelemetry"
)
// newMeterProvider creates a new MeterProvider from Config.
func newMeterProvider(cfg Config, sdk *config.SDK) (metric.MeterProvider, error) {
if cfg.Metrics.Level == configtelemetry.LevelNone || len(cfg.Metrics.Readers) == 0 {
return noop.NewMeterProvider(), nil
}
if sdk != nil {
return sdk.MeterProvider(), nil
}
return nil, errors.New("no sdk set")
}

View File

@ -19,10 +19,9 @@ import (
"go.opentelemetry.io/otel/metric"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/service/internal/promtest"
"go.opentelemetry.io/collector/service/internal/resource"
"go.opentelemetry.io/collector/service/telemetry"
)
const (
@ -93,32 +92,27 @@ func TestTelemetryInit(t *testing.T) {
} {
prom := promtest.GetAvailableLocalAddressPrometheus(t)
endpoint := fmt.Sprintf("http://%s:%d/metrics", *prom.Host, *prom.Port)
cfg := Config{
Metrics: MetricsConfig{
Level: configtelemetry.LevelDetailed,
MeterProvider: config.MeterProvider{
Readers: []config.MetricReader{{
Pull: &config.PullMetricReader{
Exporter: config.PullMetricExporter{Prometheus: prom},
},
}},
},
cfg := createDefaultConfig().(*Config)
cfg.Metrics = MetricsConfig{
Level: configtelemetry.LevelDetailed,
MeterProvider: config.MeterProvider{
Readers: []config.MetricReader{{
Pull: &config.PullMetricReader{
Exporter: config.PullMetricExporter{Prometheus: prom},
},
}},
},
}
t.Run(tt.name, func(t *testing.T) {
res := resource.New(component.BuildInfo{}, map[string]*string{
string(semconv.ServiceNameKey): ptr("otelcol"),
string(semconv.ServiceVersionKey): ptr("latest"),
string(semconv.ServiceInstanceIDKey): ptr(testInstanceID),
})
sdk, err := NewSDK(context.Background(), &cfg, res)
require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, sdk.Shutdown(context.Background()))
})
cfg.Resource = map[string]*string{
string(semconv.ServiceNameKey): ptr("otelcol"),
string(semconv.ServiceVersionKey): ptr("latest"),
string(semconv.ServiceInstanceIDKey): ptr(testInstanceID),
}
mp, err := newMeterProvider(cfg, sdk)
require.NoError(t, err)
t.Run(tt.name, func(t *testing.T) {
providers, _ := newTelemetryProviders(t, telemetry.Settings{}, cfg)
mp := providers.MeterProvider()
createTestMetrics(t, mp)
@ -203,16 +197,14 @@ func TestTelemetryMetricsDisabled(t *testing.T) {
Periodic: &config.PeriodicMetricReader{Exporter: config.PushMetricExporter{OTLP: &config.OTLPMetric{}}},
}}
res := resource.New(component.BuildInfo{}, nil)
_, err := NewSDK(context.Background(), cfg, res)
require.EqualError(t, err, "no valid metric exporter")
factory := NewFactory()
_, err := factory.CreateProviders(context.Background(), telemetry.Settings{}, cfg)
require.EqualError(t, err, "failed to create SDK: no valid metric exporter")
// Setting Metrics.Level to LevelNone disables metrics,
// so the invalid configuration should not cause an error.
cfg.Metrics.Level = configtelemetry.LevelNone
sdk, err := NewSDK(context.Background(), cfg, res)
require.NoError(t, err)
assert.NoError(t, sdk.Shutdown(context.Background()))
_, _ = newTelemetryProviders(t, telemetry.Settings{}, cfg)
}
// Test that the MeterProvider implements the 'Enabled' functionality.
@ -224,15 +216,8 @@ func TestInstrumentEnabled(t *testing.T) {
Pull: &config.PullMetricReader{Exporter: config.PullMetricExporter{Prometheus: prom}},
}}
sdk, err := NewSDK(context.Background(), cfg, resource.New(component.BuildInfo{}, nil))
require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, sdk.Shutdown(context.Background()))
})
require.NoError(t, err)
meterProvider, err := newMeterProvider(*cfg, sdk)
require.NoError(t, err)
providers, _ := newTelemetryProviders(t, telemetry.Settings{}, cfg)
meterProvider := providers.MeterProvider()
meter := meterProvider.Meter("go.opentelemetry.io/collector/service/telemetry")

View File

@ -1,50 +0,0 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package otelconftelemetry // import "go.opentelemetry.io/collector/service/telemetry/otelconftelemetry"
import (
"context"
config "go.opentelemetry.io/contrib/otelconf/v0.3.0"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
"go.opentelemetry.io/collector/config/configtelemetry"
)
// NewSDK creates a new OpenTelemetry SDK with the provided configuration.
func NewSDK(ctx context.Context, cfg *Config, res *resource.Resource) (*config.SDK, error) {
mpConfig := cfg.Metrics.MeterProvider
if cfg.Metrics.Level == configtelemetry.LevelNone {
mpConfig.Readers = nil
}
var resourceAttrs []config.AttributeNameValue
for _, r := range res.Attributes() {
resourceAttrs = append(resourceAttrs, config.AttributeNameValue{
Name: string(r.Key), Value: r.Value.AsString(),
})
}
sdk, err := config.NewSDK(
config.WithContext(ctx),
config.WithOpenTelemetryConfiguration(
config.OpenTelemetryConfiguration{
LoggerProvider: &config.LoggerProvider{
Processors: cfg.Logs.Processors,
},
MeterProvider: &mpConfig,
TracerProvider: &cfg.Traces.TracerProvider,
Resource: &config.Resource{
SchemaUrl: ptr(semconv.SchemaURL),
Attributes: resourceAttrs,
},
},
),
)
if err != nil {
return nil, err
}
return &sdk, nil
}

View File

@ -0,0 +1,158 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package otelconftelemetry // import "go.opentelemetry.io/collector/service/telemetry/otelconftelemetry"
import (
"context"
"fmt"
config "go.opentelemetry.io/contrib/otelconf/v0.3.0"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/log"
"go.opentelemetry.io/otel/metric"
noopmetric "go.opentelemetry.io/otel/metric/noop"
sdkresource "go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
"go.opentelemetry.io/otel/trace"
"go.uber.org/multierr"
"go.uber.org/zap"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/service/internal/resource"
"go.opentelemetry.io/collector/service/telemetry"
)
type otelconfTelemetry struct {
sdk *config.SDK
resource pcommon.Resource
logger *zap.Logger
loggerProvider log.LoggerProvider
meterProvider metric.MeterProvider
tracerProvider trace.TracerProvider
}
func createProviders(
ctx context.Context, set telemetry.Settings, componentConfig component.Config,
) (_ telemetry.Providers, resultErr error) {
cfg := componentConfig.(*Config)
res := resource.New(set.BuildInfo, cfg.Resource)
sdk, err := newSDK(ctx, set, cfg, res)
if err != nil {
return nil, fmt.Errorf("failed to create SDK: %w", err)
}
defer func() {
if resultErr != nil {
resultErr = multierr.Append(resultErr, sdk.Shutdown(ctx))
}
}()
logger, loggerProvider, err := newLogger(set, cfg, sdk, res)
if err != nil {
return nil, fmt.Errorf("failed to create logger provider: %w", err)
}
var meterProvider metric.MeterProvider
if cfg.Metrics.Level == configtelemetry.LevelNone {
logger.Info("Internal metrics telemetry disabled")
meterProvider = noopmetric.NewMeterProvider()
} else {
meterProvider = sdk.MeterProvider()
}
var tracerProvider trace.TracerProvider
if cfg.Traces.Level == configtelemetry.LevelNone {
logger.Info("Internal trace telemetry disabled")
tracerProvider = &noopNoContextTracerProvider{}
} else {
propagator, err := textMapPropagatorFromConfig(cfg.Traces.Propagators)
if err != nil {
return nil, fmt.Errorf("error creating propagator: %w", err)
}
otel.SetTextMapPropagator(propagator)
tracerProvider = sdk.TracerProvider()
}
pcommonRes := pcommon.NewResource()
for _, keyValue := range res.Attributes() {
pcommonRes.Attributes().PutStr(string(keyValue.Key), keyValue.Value.AsString())
}
return &otelconfTelemetry{
sdk: sdk,
resource: pcommonRes,
logger: logger,
loggerProvider: loggerProvider,
meterProvider: meterProvider,
tracerProvider: tracerProvider,
}, nil
}
func (t *otelconfTelemetry) Resource() pcommon.Resource {
return t.resource
}
func (t *otelconfTelemetry) Logger() *zap.Logger {
return t.logger
}
func (t *otelconfTelemetry) LoggerProvider() log.LoggerProvider {
return t.loggerProvider
}
func (t *otelconfTelemetry) MeterProvider() metric.MeterProvider {
return t.meterProvider
}
func (t *otelconfTelemetry) TracerProvider() trace.TracerProvider {
return t.tracerProvider
}
func (t *otelconfTelemetry) Shutdown(ctx context.Context) error {
if t.sdk != nil {
err := t.sdk.Shutdown(ctx)
t.sdk = nil
if err != nil {
return fmt.Errorf("failed to shutdown SDK: %w", err)
}
}
return nil
}
func newSDK(ctx context.Context, _ telemetry.Settings, cfg *Config, res *sdkresource.Resource) (*config.SDK, error) {
mpConfig := cfg.Metrics.MeterProvider
if cfg.Metrics.Level == configtelemetry.LevelNone {
mpConfig.Readers = nil
}
// Merge the BuildInfo-based resource attributes with the user-defined resource attributes.
resourceAttrs := make([]config.AttributeNameValue, 0, res.Len())
for _, r := range res.Attributes() {
resourceAttrs = append(resourceAttrs, config.AttributeNameValue{
Name: string(r.Key), Value: r.Value.AsString(),
})
}
sdk, err := config.NewSDK(
config.WithContext(ctx),
config.WithOpenTelemetryConfiguration(
config.OpenTelemetryConfiguration{
LoggerProvider: &config.LoggerProvider{
Processors: cfg.Logs.Processors,
},
MeterProvider: &mpConfig,
TracerProvider: &cfg.Traces.TracerProvider,
Resource: &config.Resource{
SchemaUrl: ptr(semconv.SchemaURL),
Attributes: resourceAttrs,
},
},
),
)
if err != nil {
return nil, err
}
return &sdk, nil
}

View File

@ -0,0 +1,42 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package otelconftelemetry
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"
"go.opentelemetry.io/collector/service/telemetry"
)
func newTelemetryProviders(t *testing.T, set telemetry.Settings, cfg *Config) (telemetry.Providers, *observer.ObservedLogs) {
t.Helper()
core, observedLogs := observer.New(zapcore.DebugLevel)
set.ZapOptions = append(set.ZapOptions, zap.WrapCore(func(zapcore.Core) zapcore.Core { return core }))
if len(cfg.Metrics.Readers) == 1 && cfg.Metrics.Readers[0].Pull != nil &&
cfg.Metrics.Readers[0].Pull.Exporter.Prometheus != nil &&
cfg.Metrics.Readers[0].Pull.Exporter.Prometheus.Port != nil &&
*cfg.Metrics.Readers[0].Pull.Exporter.Prometheus.Port == 8888 {
// Replace the default port with 0 to bind to an ephemeral port,
// avoiding flaky tests.
*cfg.Metrics.Readers[0].Pull.Exporter.Prometheus.Port = 0
}
factory := NewFactory()
providers, err := factory.CreateProviders(context.Background(), set, cfg)
require.NoError(t, err)
require.NotNil(t, providers)
t.Cleanup(func() {
assert.NoError(t, providers.Shutdown(context.Background()))
})
return providers, observedLogs
}

View File

@ -7,15 +7,12 @@ import (
"context"
"errors"
config "go.opentelemetry.io/contrib/otelconf/v0.3.0"
"go.opentelemetry.io/contrib/propagators/b3"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/embedded"
"go.opentelemetry.io/otel/trace/noop"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/featuregate"
)
@ -51,24 +48,6 @@ func (n *noopNoContextTracerProvider) Tracer(_ string, _ ...trace.TracerOption)
return &noopNoContextTracer{}
}
// newTracerProvider creates a new TracerProvider from Config.
func newTracerProvider(cfg Config, sdk *config.SDK) (trace.TracerProvider, error) {
if cfg.Traces.Level == configtelemetry.LevelNone {
return &noopNoContextTracerProvider{}, nil
}
if tp, err := textMapPropagatorFromConfig(cfg.Traces.Propagators); err == nil {
otel.SetTextMapPropagator(tp)
} else {
return nil, err
}
if sdk != nil {
return sdk.TracerProvider(), nil
}
return nil, errors.New("no sdk set")
}
func textMapPropagatorFromConfig(props []string) (propagation.TextMapPropagator, error) {
var textMapPropagators []propagation.TextMapPropagator
for _, prop := range props {

View File

@ -20,7 +20,7 @@ import (
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp"
"go.opentelemetry.io/collector/service/internal/resource"
"go.opentelemetry.io/collector/service/telemetry"
)
func TestTracerProvider(t *testing.T) {
@ -42,16 +42,9 @@ func TestTracerProvider(t *testing.T) {
cfg.Traces.Processors = []config.SpanProcessor{newOTLPSimpleSpanProcessor(srv)}
buildInfo := component.BuildInfo{Command: "otelcol", Version: "latest"}
sdk, err := NewSDK(context.Background(), cfg, resource.New(buildInfo, cfg.Resource))
require.NoError(t, err)
defer func() {
assert.NoError(t, sdk.Shutdown(context.Background()))
}()
provider, err := newTracerProvider(*cfg, sdk)
require.NoError(t, err)
require.NotNil(t, provider)
providers, _ := newTelemetryProviders(t, telemetry.Settings{BuildInfo: buildInfo}, cfg)
provider := providers.TracerProvider()
tracer := provider.Tracer("test_tracer")
_, span := tracer.Start(context.Background(), "test_span")
span.End()
@ -73,14 +66,9 @@ func TestTelemetry_TracerProvider_Propagators(t *testing.T) {
cfg.Traces.Processors = []config.SpanProcessor{newOTLPSimpleSpanProcessor(srv)}
buildInfo := component.BuildInfo{Command: "otelcol", Version: "latest"}
sdk, err := NewSDK(context.Background(), cfg, resource.New(buildInfo, cfg.Resource))
require.NoError(t, err)
defer func() {
assert.NoError(t, sdk.Shutdown(context.Background()))
}()
providers, _ := newTelemetryProviders(t, telemetry.Settings{BuildInfo: buildInfo}, cfg)
provider, err := newTracerProvider(*cfg, sdk)
require.NoError(t, err)
provider := providers.TracerProvider()
propagator := otel.GetTextMapPropagator()
require.NotNil(t, propagator)
@ -111,14 +99,9 @@ func TestTelemetry_TracerProviderDisabled(t *testing.T) {
}
buildInfo := component.BuildInfo{Command: "otelcol", Version: "latest"}
sdk, err := NewSDK(context.Background(), cfg, resource.New(buildInfo, cfg.Resource))
require.NoError(t, err)
defer func() {
assert.NoError(t, sdk.Shutdown(context.Background()))
}()
providers, _ := newTelemetryProviders(t, telemetry.Settings{BuildInfo: buildInfo}, cfg)
provider, err := newTracerProvider(*cfg, sdk)
require.NoError(t, err)
provider := providers.TracerProvider()
tracer := provider.Tracer("test_tracer")
_, span := tracer.Start(context.Background(), "test_span")
span.End()
@ -126,7 +109,7 @@ func TestTelemetry_TracerProviderDisabled(t *testing.T) {
}
t.Run("level_none", func(t *testing.T) {
cfg := &Config{}
cfg := createDefaultConfig().(*Config)
cfg.Traces.Level = configtelemetry.LevelNone
test(t, cfg)
})

View File

@ -4,23 +4,67 @@
package telemetry // import "go.opentelemetry.io/collector/service/telemetry"
import (
"context"
"go.opentelemetry.io/otel/log"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/service/telemetry/internal/migration"
)
// Settings holds configuration for building Telemetry.
type Settings struct {
BuildInfo component.BuildInfo
ZapOptions []zap.Option
}
// TODO create abstract Telemetry interface and Factory interfaces
// that are implemented by otelconftelemetry.
// See https://github.com/open-telemetry/opentelemetry-collector/issues/4970
// NOTE TracesConfig will be removed once opentelemetry-collector-contrib
// has been updated to use otelconftelemetry instead; use at your own risk.
// See https://github.com/open-telemetry/opentelemetry-collector/issues/4970
type TracesConfig = migration.TracesConfigV030
// Providers is an interface for internal telemetry providers.
//
// NOTE this interface is experimental and may change in the future.
type Providers interface {
// Shutdown gracefully shuts down the telemetry providers.
Shutdown(context.Context) error
// Resource returns a pcommon.Resource representing the collector.
// This may be used by components in their internal telemetry.
Resource() pcommon.Resource
// Logger returns a zap.Logger that may be used by components to
// log their internal operations.
//
// NOTE: from the perspective of the Telemetry implementation,
// this Logger and the LoggerProvider are independent. However,
// the service package will arrange for logs written to this
// logger to be copied to the LoggerProvider. The effective
// level of the logger will be the lower of the Logger's and
// the LoggerProvider's levels.
Logger() *zap.Logger
// LoggerProvider returns a log.LoggerProvider that may be used
// for components to log their internal operations.
LoggerProvider() log.LoggerProvider
// MeterProvider returns a metric.MeterProvider that may be used
// by components to record metrics relating to their internal
// operations.
MeterProvider() metric.MeterProvider
// TracerProvider returns a trace.TracerProvider that may be used
// by components to trace their internal operations.
TracerProvider() trace.TracerProvider
}
// Settings holds configuration for building Telemetry.
type Settings struct {
// BuildInfo contains build information about the collector.
BuildInfo component.BuildInfo
// ZapOptions contains options for creating the zap logger.
ZapOptions []zap.Option
}
// TODO create abstract Factory interface that is implemented by otelconftelemetry.
// See https://github.com/open-telemetry/opentelemetry-collector/issues/4970