// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 //go:generate mdatagen metadata.yaml package service // import "go.opentelemetry.io/collector/service" import ( "context" "errors" "fmt" "runtime" config "go.opentelemetry.io/contrib/otelconf/v0.3.0" "go.opentelemetry.io/otel/log" "go.opentelemetry.io/otel/metric" noopmetric "go.opentelemetry.io/otel/metric/noop" sdkresource "go.opentelemetry.io/otel/sdk/resource" semconv118 "go.opentelemetry.io/otel/semconv/v1.18.0" semconv "go.opentelemetry.io/otel/semconv/v1.26.0" nooptrace "go.opentelemetry.io/otel/trace/noop" "go.uber.org/multierr" "go.uber.org/zap" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/connector" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/extension" "go.opentelemetry.io/collector/featuregate" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/service/extensions" "go.opentelemetry.io/collector/service/internal/builders" "go.opentelemetry.io/collector/service/internal/graph" "go.opentelemetry.io/collector/service/internal/moduleinfo" "go.opentelemetry.io/collector/service/internal/proctelemetry" "go.opentelemetry.io/collector/service/internal/resource" "go.opentelemetry.io/collector/service/internal/status" "go.opentelemetry.io/collector/service/telemetry" ) // disableHighCardinalityMetricsFeatureGate is the feature gate that controls whether the collector should enable // potentially high cardinality metrics. The gate will be removed when the collector allows for view configuration. var disableHighCardinalityMetricsFeatureGate = featuregate.GlobalRegistry().MustRegister( "telemetry.disableHighCardinalityMetrics", featuregate.StageAlpha, featuregate.WithRegisterDescription("controls whether the collector should enable potentially high"+ "cardinality metrics. The gate will be removed when the collector allows for view configuration.")) // ModuleInfo describes the Go module for a particular component. type ModuleInfo = moduleinfo.ModuleInfo // ModuleInfos describes the go module for all components. type ModuleInfos = moduleinfo.ModuleInfos // Settings holds configuration for building a new Service. type Settings struct { // BuildInfo provides collector start information. BuildInfo component.BuildInfo // CollectorConf contains the Collector's current configuration CollectorConf *confmap.Conf // Receivers configuration to its builder. ReceiversConfigs map[component.ID]component.Config ReceiversFactories map[component.Type]receiver.Factory // Processors configuration to its builder. ProcessorsConfigs map[component.ID]component.Config ProcessorsFactories map[component.Type]processor.Factory // exporters configuration to its builder. ExportersConfigs map[component.ID]component.Config ExportersFactories map[component.Type]exporter.Factory // Connectors configuration to its builder. ConnectorsConfigs map[component.ID]component.Config ConnectorsFactories map[component.Type]connector.Factory // Extensions builder for extensions. Extensions builders.Extension // Extensions configuration to its builder. ExtensionsConfigs map[component.ID]component.Config ExtensionsFactories map[component.Type]extension.Factory // ModuleInfo describes the go module for each component. ModuleInfos ModuleInfos // AsyncErrorChannel is the channel that is used to report fatal errors. AsyncErrorChannel chan error // LoggingOptions provides a way to change behavior of zap logging. LoggingOptions []zap.Option } // Service represents the implementation of a component.Host. type Service struct { buildInfo component.BuildInfo telemetrySettings component.TelemetrySettings host *graph.Host collectorConf *confmap.Conf loggerProvider log.LoggerProvider } // New creates a new Service, its telemetry, and Components. func New(ctx context.Context, set Settings, cfg Config) (*Service, error) { srv := &Service{ buildInfo: set.BuildInfo, host: &graph.Host{ Receivers: builders.NewReceiver(set.ReceiversConfigs, set.ReceiversFactories), Processors: builders.NewProcessor(set.ProcessorsConfigs, set.ProcessorsFactories), Exporters: builders.NewExporter(set.ExportersConfigs, set.ExportersFactories), Connectors: builders.NewConnector(set.ConnectorsConfigs, set.ConnectorsFactories), Extensions: builders.NewExtension(set.ExtensionsConfigs, set.ExtensionsFactories), ModuleInfos: set.ModuleInfos, BuildInfo: set.BuildInfo, AsyncErrorChannel: set.AsyncErrorChannel, }, collectorConf: set.CollectorConf, } // Fetch data for internal telemetry like instance id and sdk version to provide for internal telemetry. res := resource.New(set.BuildInfo, cfg.Telemetry.Resource) pcommonRes := pdataFromSdk(res) sch := semconv.SchemaURL mpConfig := &cfg.Telemetry.Metrics.MeterProvider if mpConfig.Views != nil { if disableHighCardinalityMetricsFeatureGate.IsEnabled() { return nil, errors.New("telemetry.disableHighCardinalityMetrics gate is incompatible with service::telemetry::metrics::views") } } else { mpConfig.Views = configureViews(cfg.Telemetry.Metrics.Level) } if cfg.Telemetry.Metrics.Level == configtelemetry.LevelNone { mpConfig.Readers = []config.MetricReader{} } sdk, err := config.NewSDK( config.WithContext(ctx), config.WithOpenTelemetryConfiguration( config.OpenTelemetryConfiguration{ LoggerProvider: &config.LoggerProvider{ Processors: cfg.Telemetry.Logs.Processors, }, MeterProvider: mpConfig, TracerProvider: &config.TracerProvider{ Processors: cfg.Telemetry.Traces.Processors, }, Resource: &config.Resource{ SchemaUrl: &sch, Attributes: attributes(res, cfg.Telemetry), }, }, ), ) if err != nil { return nil, fmt.Errorf("failed to create SDK: %w", err) } telFactory := telemetry.NewFactory() telset := telemetry.Settings{ AsyncErrorChannel: set.AsyncErrorChannel, BuildInfo: set.BuildInfo, ZapOptions: set.LoggingOptions, SDK: &sdk, Resource: res, } logger, lp, err := telFactory.CreateLogger(ctx, telset, &cfg.Telemetry) if err != nil { err = multierr.Append(err, sdk.Shutdown(ctx)) return nil, fmt.Errorf("failed to create logger: %w", err) } srv.loggerProvider = lp // Use initialized logger to handle any subsequent errors // https://github.com/open-telemetry/opentelemetry-collector/pull/13081 defer func() { if err != nil { logger.Error("error found during service initialization", zap.Error(err)) _ = sdk.Shutdown(ctx) } }() tracerProvider, err := telFactory.CreateTracerProvider(ctx, telset, &cfg.Telemetry) if err != nil { return nil, fmt.Errorf("failed to create tracer provider: %w", err) } logger.Info("Setting up own telemetry...") mp, err := telFactory.CreateMeterProvider(ctx, telset, &cfg.Telemetry) if err != nil { return nil, fmt.Errorf("failed to create meter provider: %w", err) } srv.telemetrySettings = component.TelemetrySettings{ Logger: logger, MeterProvider: mp, TracerProvider: tracerProvider, // Construct telemetry attributes from build info and config's resource attributes. Resource: pcommonRes, } srv.host.Reporter = status.NewReporter(srv.host.NotifyComponentStatusChange, func(err error) { if errors.Is(err, status.ErrStatusNotReady) { logger.Warn("Invalid transition", zap.Error(err)) } // ignore other errors as they represent invalid state transitions and are considered benign. }) if err = srv.initGraph(ctx, cfg); err != nil { return nil, err } // process the configuration and initialize the pipeline if err = srv.initExtensions(ctx, cfg.Extensions); err != nil { return nil, err } if cfg.Telemetry.Metrics.Level != configtelemetry.LevelNone && len(mpConfig.Readers) != 0 { if err = proctelemetry.RegisterProcessMetrics(srv.telemetrySettings); err != nil { return nil, fmt.Errorf("failed to register process metrics: %w", err) } } logsAboutMeterProvider(logger, cfg.Telemetry.Metrics, mp) return srv, nil } func logsAboutMeterProvider(logger *zap.Logger, cfg telemetry.MetricsConfig, mp metric.MeterProvider) { if cfg.Level == configtelemetry.LevelNone || len(cfg.Readers) == 0 { logger.Info("Skipped telemetry setup.") return } if lmp, ok := mp.(interface { LogAboutServers(logger *zap.Logger, cfg telemetry.MetricsConfig) }); ok { lmp.LogAboutServers(logger, cfg) } } // Start starts the extensions and pipelines. If Start fails Shutdown should be called to ensure a clean state. // Start does the following steps in order: // 1. Start all extensions. // 2. Notify extensions about Collector configuration // 3. Start all pipelines. // 4. Notify extensions that the pipeline is ready. func (srv *Service) Start(ctx context.Context) error { srv.telemetrySettings.Logger.Info("Starting "+srv.buildInfo.Command+"...", zap.String("Version", srv.buildInfo.Version), zap.Int("NumCPU", runtime.NumCPU()), ) if err := srv.host.ServiceExtensions.Start(ctx, srv.host); err != nil { return fmt.Errorf("failed to start extensions: %w", err) } if srv.collectorConf != nil { if err := srv.host.ServiceExtensions.NotifyConfig(ctx, srv.collectorConf); err != nil { return err } } if err := srv.host.Pipelines.StartAll(ctx, srv.host); err != nil { return fmt.Errorf("cannot start pipelines: %w", err) } if err := srv.host.ServiceExtensions.NotifyPipelineReady(); err != nil { return err } srv.telemetrySettings.Logger.Info("Everything is ready. Begin running and processing data.") return nil } func (srv *Service) shutdownTelemetry(ctx context.Context) error { // The metric.MeterProvider and trace.TracerProvider interfaces do not have a Shutdown method. // To shutdown the providers we try to cast to this interface, which matches the type signature used in the SDK. type shutdownable interface { Shutdown(context.Context) error } var err error if prov, ok := srv.telemetrySettings.MeterProvider.(shutdownable); ok { if shutdownErr := prov.Shutdown(ctx); shutdownErr != nil { err = multierr.Append(err, fmt.Errorf("failed to shutdown meter provider: %w", shutdownErr)) } } if prov, ok := srv.telemetrySettings.TracerProvider.(shutdownable); ok { if shutdownErr := prov.Shutdown(ctx); shutdownErr != nil { err = multierr.Append(err, fmt.Errorf("failed to shutdown tracer provider: %w", shutdownErr)) } } if prov, ok := srv.loggerProvider.(shutdownable); ok { if shutdownErr := prov.Shutdown(ctx); shutdownErr != nil { err = multierr.Append(err, fmt.Errorf("failed to shutdown logger provider: %w", shutdownErr)) } } return err } // Shutdown the service. Shutdown will do the following steps in order: // 1. Notify extensions that the pipeline is shutting down. // 2. Shutdown all pipelines. // 3. Shutdown all extensions. // 4. Shutdown telemetry. func (srv *Service) Shutdown(ctx context.Context) error { // Accumulate errors and proceed with shutting down remaining components. var errs error // Begin shutdown sequence. srv.telemetrySettings.Logger.Info("Starting shutdown...") if err := srv.host.ServiceExtensions.NotifyPipelineNotReady(); err != nil { errs = multierr.Append(errs, fmt.Errorf("failed to notify that pipeline is not ready: %w", err)) } if err := srv.host.Pipelines.ShutdownAll(ctx, srv.host.Reporter); err != nil { errs = multierr.Append(errs, fmt.Errorf("failed to shutdown pipelines: %w", err)) } if err := srv.host.ServiceExtensions.Shutdown(ctx); err != nil { errs = multierr.Append(errs, fmt.Errorf("failed to shutdown extensions: %w", err)) } srv.telemetrySettings.Logger.Info("Shutdown complete.") errs = multierr.Append(errs, srv.shutdownTelemetry(ctx)) return errs } // Creates extensions. func (srv *Service) initExtensions(ctx context.Context, cfg extensions.Config) error { var err error extensionsSettings := extensions.Settings{ Telemetry: srv.telemetrySettings, BuildInfo: srv.buildInfo, Extensions: srv.host.Extensions, } if srv.host.ServiceExtensions, err = extensions.New(ctx, extensionsSettings, cfg, extensions.WithReporter(srv.host.Reporter)); err != nil { return fmt.Errorf("failed to build extensions: %w", err) } return nil } // Creates the pipeline graph. func (srv *Service) initGraph(ctx context.Context, cfg Config) error { var err error if srv.host.Pipelines, err = graph.Build(ctx, graph.Settings{ Telemetry: srv.telemetrySettings, BuildInfo: srv.buildInfo, ReceiverBuilder: srv.host.Receivers, ProcessorBuilder: srv.host.Processors, ExporterBuilder: srv.host.Exporters, ConnectorBuilder: srv.host.Connectors, PipelineConfigs: cfg.Pipelines, ReportStatus: srv.host.Reporter.ReportStatus, }); err != nil { return fmt.Errorf("failed to build pipelines: %w", err) } return nil } // Logger returns the logger created for this service. // This is a temporary API that may be removed soon after investigating how the collector should record different events. func (srv *Service) Logger() *zap.Logger { return srv.telemetrySettings.Logger } func pdataFromSdk(res *sdkresource.Resource) pcommon.Resource { // pcommon.NewResource is the best way to generate a new resource currently and is safe to use outside of tests. // Because the resource is signal agnostic, and we need a net new resource, not an existing one, this is the only // method of creating it without exposing internal packages. pcommonRes := pcommon.NewResource() for _, keyValue := range res.Attributes() { pcommonRes.Attributes().PutStr(string(keyValue.Key), keyValue.Value.AsString()) } return pcommonRes } func dropViewOption(selector *config.ViewSelector) config.View { return config.View{ Selector: selector, Stream: &config.ViewStream{ Aggregation: &config.ViewStreamAggregation{ Drop: config.ViewStreamAggregationDrop{}, }, }, } } func configureViews(level configtelemetry.Level) []config.View { views := []config.View{} if level < configtelemetry.LevelDetailed { // Drop all otelhttp and otelgrpc metrics if the level is not detailed. views = append(views, dropViewOption(&config.ViewSelector{ MeterName: ptr("go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"), }), dropViewOption(&config.ViewSelector{ MeterName: ptr("go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"), }), ) } // Make sure to add the AttributeKeys view after the AggregationDrop view: // Only the first view outputting a given metric identity is actually used, so placing the // AttributeKeys view first would never drop the metrics regadless of level. if disableHighCardinalityMetricsFeatureGate.IsEnabled() { views = append(views, []config.View{ { Selector: &config.ViewSelector{ MeterName: ptr("go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"), }, Stream: &config.ViewStream{ AttributeKeys: &config.IncludeExclude{ Excluded: []string{ string(semconv118.NetSockPeerAddrKey), string(semconv118.NetSockPeerPortKey), string(semconv118.NetSockPeerNameKey), }, }, }, }, { Selector: &config.ViewSelector{ MeterName: ptr("go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"), }, Stream: &config.ViewStream{ AttributeKeys: &config.IncludeExclude{ Excluded: []string{ string(semconv118.NetHostNameKey), string(semconv118.NetHostPortKey), }, }, }, }, }...) } // otel-arrow library metrics // See https://github.com/open-telemetry/otel-arrow/blob/c39257/pkg/otel/arrow_record/consumer.go#L174-L176 if level < configtelemetry.LevelNormal { scope := ptr("otel-arrow/pkg/otel/arrow_record") views = append(views, dropViewOption(&config.ViewSelector{ MeterName: scope, InstrumentName: ptr("arrow_batch_records"), }), dropViewOption(&config.ViewSelector{ MeterName: scope, InstrumentName: ptr("arrow_schema_resets"), }), dropViewOption(&config.ViewSelector{ MeterName: scope, InstrumentName: ptr("arrow_memory_inuse"), }), ) } // contrib's internal/otelarrow/netstats metrics // See // - https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/a25f05/internal/otelarrow/netstats/netstats.go#L130 // - https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/a25f05/internal/otelarrow/netstats/netstats.go#L165 if level < configtelemetry.LevelDetailed { scope := ptr("github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/netstats") views = append(views, // Compressed size metrics. dropViewOption(&config.ViewSelector{ MeterName: scope, InstrumentName: ptr("otelcol_*_compressed_size"), }), dropViewOption(&config.ViewSelector{ MeterName: scope, InstrumentName: ptr("otelcol_*_compressed_size"), }), // makeRecvMetrics for exporters. dropViewOption(&config.ViewSelector{ MeterName: scope, InstrumentName: ptr("otelcol_exporter_recv"), }), dropViewOption(&config.ViewSelector{ MeterName: scope, InstrumentName: ptr("otelcol_exporter_recv_wire"), }), // makeSentMetrics for receivers. dropViewOption(&config.ViewSelector{ MeterName: scope, InstrumentName: ptr("otelcol_receiver_sent"), }), dropViewOption(&config.ViewSelector{ MeterName: scope, InstrumentName: ptr("otelcol_receiver_sent_wire"), }), ) } // Batch processor metrics scope := ptr("go.opentelemetry.io/collector/processor/batchprocessor") if level < configtelemetry.LevelNormal { views = append(views, dropViewOption(&config.ViewSelector{ MeterName: scope, })) } else if level < configtelemetry.LevelDetailed { views = append(views, dropViewOption(&config.ViewSelector{ MeterName: scope, InstrumentName: ptr("otelcol_processor_batch_batch_send_size_bytes"), })) } // Internal graph metrics graphScope := ptr("go.opentelemetry.io/collector/service") if level < configtelemetry.LevelDetailed { views = append(views, dropViewOption(&config.ViewSelector{ MeterName: graphScope, InstrumentName: ptr("otelcol.*.consumed.size"), })) views = append(views, dropViewOption(&config.ViewSelector{ MeterName: graphScope, InstrumentName: ptr("otelcol.*.produced.size"), })) } return views } func ptr[T any](v T) *T { return &v } // Validate verifies the graph by calling the internal graph.Build. func Validate(ctx context.Context, set Settings, cfg Config) error { tel := component.TelemetrySettings{ Logger: zap.NewNop(), TracerProvider: nooptrace.NewTracerProvider(), MeterProvider: noopmetric.NewMeterProvider(), Resource: pcommon.NewResource(), } _, err := graph.Build(ctx, graph.Settings{ Telemetry: tel, BuildInfo: set.BuildInfo, ReceiverBuilder: builders.NewReceiver(set.ReceiversConfigs, set.ReceiversFactories), ProcessorBuilder: builders.NewProcessor(set.ProcessorsConfigs, set.ProcessorsFactories), ExporterBuilder: builders.NewExporter(set.ExportersConfigs, set.ExportersFactories), ConnectorBuilder: builders.NewConnector(set.ConnectorsConfigs, set.ConnectorsFactories), PipelineConfigs: cfg.Pipelines, }) if err != nil { return fmt.Errorf("failed to build pipelines: %w", err) } return nil }