505 lines
		
	
	
		
			17 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			505 lines
		
	
	
		
			17 KiB
		
	
	
	
		
			Go
		
	
	
	
| // Copyright The OpenTelemetry Authors
 | |
| // SPDX-License-Identifier: Apache-2.0
 | |
| 
 | |
| //go:generate mdatagen metadata.yaml
 | |
| 
 | |
| package service // import "go.opentelemetry.io/collector/service"
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"errors"
 | |
| 	"fmt"
 | |
| 	"runtime"
 | |
| 
 | |
| 	config "go.opentelemetry.io/contrib/otelconf/v0.3.0"
 | |
| 	"go.opentelemetry.io/otel/attribute"
 | |
| 	"go.opentelemetry.io/otel/metric"
 | |
| 	noopmetric "go.opentelemetry.io/otel/metric/noop"
 | |
| 	sdkresource "go.opentelemetry.io/otel/sdk/resource"
 | |
| 	nooptrace "go.opentelemetry.io/otel/trace/noop"
 | |
| 	"go.uber.org/multierr"
 | |
| 	"go.uber.org/zap"
 | |
| 	"go.uber.org/zap/zapcore"
 | |
| 
 | |
| 	"go.opentelemetry.io/collector/component"
 | |
| 	"go.opentelemetry.io/collector/config/configtelemetry"
 | |
| 	"go.opentelemetry.io/collector/confmap"
 | |
| 	"go.opentelemetry.io/collector/connector"
 | |
| 	"go.opentelemetry.io/collector/exporter"
 | |
| 	"go.opentelemetry.io/collector/extension"
 | |
| 	"go.opentelemetry.io/collector/featuregate"
 | |
| 	"go.opentelemetry.io/collector/internal/telemetry/componentattribute"
 | |
| 	"go.opentelemetry.io/collector/pdata/pcommon"
 | |
| 	"go.opentelemetry.io/collector/processor"
 | |
| 	"go.opentelemetry.io/collector/receiver"
 | |
| 	"go.opentelemetry.io/collector/service/extensions"
 | |
| 	"go.opentelemetry.io/collector/service/internal/builders"
 | |
| 	"go.opentelemetry.io/collector/service/internal/graph"
 | |
| 	"go.opentelemetry.io/collector/service/internal/moduleinfo"
 | |
| 	"go.opentelemetry.io/collector/service/internal/proctelemetry"
 | |
| 	"go.opentelemetry.io/collector/service/internal/resource"
 | |
| 	"go.opentelemetry.io/collector/service/internal/status"
 | |
| 	"go.opentelemetry.io/collector/service/telemetry"
 | |
| 	"go.opentelemetry.io/collector/service/telemetry/otelconftelemetry"
 | |
| )
 | |
| 
 | |
// This feature gate is deprecated and will be removed in 1.40.0. Views can now be configured.
//
// The gate is registered on the global registry at StageDeprecated. The value
// returned by MustRegister is discarded: nothing in this file reads the gate,
// it is registered only so that the identifier stays known to the registry.
var _ = featuregate.GlobalRegistry().MustRegister(
	"telemetry.disableHighCardinalityMetrics",
	featuregate.StageDeprecated,
	featuregate.WithRegisterToVersion("0.133.0"),
	featuregate.WithRegisterDescription(
		"Controls whether the collector should enable potentially high "+
			"cardinality metrics. Deprecated, configure service::telemetry::metrics::views instead."))
 | |
| 
 | |
// ModuleInfo describes the Go module for a particular component.
// It is an alias of moduleinfo.ModuleInfo.
type ModuleInfo = moduleinfo.ModuleInfo
 | |
| 
 | |
// ModuleInfos describes the Go module for all components.
// It is an alias of moduleinfo.ModuleInfos.
type ModuleInfos = moduleinfo.ModuleInfos
 | |
| 
 | |
// Settings holds configuration for building a new Service.
type Settings struct {
	// BuildInfo provides collector start information.
	BuildInfo component.BuildInfo

	// CollectorConf contains the Collector's current configuration.
	// When it is non-nil, Start passes it to the service extensions.
	CollectorConf *confmap.Conf

	// ReceiversConfigs and ReceiversFactories are the receiver configurations
	// and factories from which the receiver builder is created.
	ReceiversConfigs   map[component.ID]component.Config
	ReceiversFactories map[component.Type]receiver.Factory

	// ProcessorsConfigs and ProcessorsFactories are the processor configurations
	// and factories from which the processor builder is created.
	ProcessorsConfigs   map[component.ID]component.Config
	ProcessorsFactories map[component.Type]processor.Factory

	// ExportersConfigs and ExportersFactories are the exporter configurations
	// and factories from which the exporter builder is created.
	ExportersConfigs   map[component.ID]component.Config
	ExportersFactories map[component.Type]exporter.Factory

	// ConnectorsConfigs and ConnectorsFactories are the connector configurations
	// and factories from which the connector builder is created.
	ConnectorsConfigs   map[component.ID]component.Config
	ConnectorsFactories map[component.Type]connector.Factory

	// Extensions builder for extensions.
	// NOTE(review): New and Validate in this file never read this field; the
	// extension builder is created from ExtensionsConfigs and
	// ExtensionsFactories instead. Confirm whether it is still needed.
	Extensions builders.Extension

	// ExtensionsConfigs and ExtensionsFactories are the extension configurations
	// and factories from which the extension builder is created.
	ExtensionsConfigs   map[component.ID]component.Config
	ExtensionsFactories map[component.Type]extension.Factory

	// ModuleInfos describes the Go module for each component.
	ModuleInfos ModuleInfos

	// AsyncErrorChannel is the channel that is used to report fatal errors.
	AsyncErrorChannel chan error

	// LoggingOptions provides a way to change behavior of zap logging.
	// New forwards these options to the telemetry factory as ZapOptions.
	LoggingOptions []zap.Option
}
 | |
| 
 | |
// Service represents the implementation of a component.Host.
type Service struct {
	// buildInfo is copied from Settings.BuildInfo by New.
	buildInfo component.BuildInfo
	// telemetrySettings holds the logger, meter provider, tracer provider and
	// resource created by New; they are handed to the graph and the extensions.
	telemetrySettings component.TelemetrySettings
	// host owns the component builders, the pipeline graph, the service
	// extensions and the status reporter.
	host *graph.Host
	// collectorConf is the collector configuration passed to extensions on
	// Start; it may be nil, in which case that notification is skipped.
	collectorConf *confmap.Conf
	// sdk is the telemetry SDK created by New; Shutdown shuts it down last.
	sdk *config.SDK
}
 | |
| 
 | |
| // New creates a new Service, its telemetry, and Components.
 | |
| func New(ctx context.Context, set Settings, cfg Config) (*Service, error) {
 | |
| 	srv := &Service{
 | |
| 		buildInfo: set.BuildInfo,
 | |
| 		host: &graph.Host{
 | |
| 			Receivers:  builders.NewReceiver(set.ReceiversConfigs, set.ReceiversFactories),
 | |
| 			Processors: builders.NewProcessor(set.ProcessorsConfigs, set.ProcessorsFactories),
 | |
| 			Exporters:  builders.NewExporter(set.ExportersConfigs, set.ExportersFactories),
 | |
| 			Connectors: builders.NewConnector(set.ConnectorsConfigs, set.ConnectorsFactories),
 | |
| 			Extensions: builders.NewExtension(set.ExtensionsConfigs, set.ExtensionsFactories),
 | |
| 
 | |
| 			ModuleInfos:       set.ModuleInfos,
 | |
| 			BuildInfo:         set.BuildInfo,
 | |
| 			AsyncErrorChannel: set.AsyncErrorChannel,
 | |
| 		},
 | |
| 		collectorConf: set.CollectorConf,
 | |
| 	}
 | |
| 
 | |
| 	// Fetch data for internal telemetry like instance id and sdk version to provide for internal telemetry.
 | |
| 	res := resource.New(set.BuildInfo, cfg.Telemetry.Resource)
 | |
| 	pcommonRes := pdataFromSdk(res)
 | |
| 
 | |
| 	mpConfig := &cfg.Telemetry.Metrics.MeterProvider
 | |
| 	if mpConfig.Views == nil {
 | |
| 		mpConfig.Views = configureViews(cfg.Telemetry.Metrics.Level)
 | |
| 	}
 | |
| 
 | |
| 	sdk, err := otelconftelemetry.NewSDK(ctx, &cfg.Telemetry, res)
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("failed to create SDK: %w", err)
 | |
| 	}
 | |
| 	srv.sdk = sdk
 | |
| 	defer func() {
 | |
| 		if err != nil {
 | |
| 			err = multierr.Append(err, sdk.Shutdown(ctx))
 | |
| 		}
 | |
| 	}()
 | |
| 
 | |
| 	telFactory := otelconftelemetry.NewFactory(sdk, res)
 | |
| 	telset := telemetry.Settings{
 | |
| 		BuildInfo:  set.BuildInfo,
 | |
| 		ZapOptions: set.LoggingOptions,
 | |
| 	}
 | |
| 
 | |
| 	logger, loggerProvider, err := telFactory.CreateLogger(ctx, telset, &cfg.Telemetry)
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("failed to create logger: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	// Use initialized logger to handle any subsequent errors
 | |
| 	// https://github.com/open-telemetry/opentelemetry-collector/pull/13081
 | |
| 	defer func() {
 | |
| 		if err != nil {
 | |
| 			logger.Error("error found during service initialization", zap.Error(err))
 | |
| 		}
 | |
| 	}()
 | |
| 
 | |
| 	// Wrap the zap.Logger with componentattribute so scope attributes
 | |
| 	// can be added and removed dynamically, and tee logs to the
 | |
| 	// LoggerProvider.
 | |
| 	logger = logger.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core {
 | |
| 		core = componentattribute.NewConsoleCoreWithAttributes(core, attribute.NewSet())
 | |
| 		core = componentattribute.NewOTelTeeCoreWithAttributes(
 | |
| 			core,
 | |
| 			loggerProvider,
 | |
| 			"go.opentelemetry.io/collector/service",
 | |
| 			attribute.NewSet(),
 | |
| 		)
 | |
| 		return core
 | |
| 	}))
 | |
| 
 | |
| 	tracerProvider, err := telFactory.CreateTracerProvider(ctx, telset, &cfg.Telemetry)
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("failed to create tracer provider: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	logger.Info("Setting up own telemetry...")
 | |
| 
 | |
| 	mp, err := telFactory.CreateMeterProvider(ctx, telset, &cfg.Telemetry)
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("failed to create meter provider: %w", err)
 | |
| 	}
 | |
| 	srv.telemetrySettings = component.TelemetrySettings{
 | |
| 		Logger:         logger,
 | |
| 		MeterProvider:  mp,
 | |
| 		TracerProvider: tracerProvider,
 | |
| 		// Construct telemetry attributes from build info and config's resource attributes.
 | |
| 		Resource: pcommonRes,
 | |
| 	}
 | |
| 	srv.host.Reporter = status.NewReporter(srv.host.NotifyComponentStatusChange, func(err error) {
 | |
| 		if errors.Is(err, status.ErrStatusNotReady) {
 | |
| 			logger.Warn("Invalid transition", zap.Error(err))
 | |
| 		}
 | |
| 		// ignore other errors as they represent invalid state transitions and are considered benign.
 | |
| 	})
 | |
| 
 | |
| 	err = srv.initGraph(ctx, cfg)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	// process the configuration and initialize the pipeline
 | |
| 	err = srv.initExtensions(ctx, cfg.Extensions)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	if cfg.Telemetry.Metrics.Level != configtelemetry.LevelNone && len(mpConfig.Readers) != 0 {
 | |
| 		if err = proctelemetry.RegisterProcessMetrics(srv.telemetrySettings); err != nil {
 | |
| 			return nil, fmt.Errorf("failed to register process metrics: %w", err)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	logsAboutMeterProvider(logger, cfg.Telemetry.Metrics, mp)
 | |
| 
 | |
| 	return srv, nil
 | |
| }
 | |
| 
 | |
| func logsAboutMeterProvider(logger *zap.Logger, cfg otelconftelemetry.MetricsConfig, mp metric.MeterProvider) {
 | |
| 	if cfg.Level == configtelemetry.LevelNone || len(cfg.Readers) == 0 {
 | |
| 		logger.Info("Skipped telemetry setup.")
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	if lmp, ok := mp.(interface {
 | |
| 		LogAboutServers(logger *zap.Logger, cfg otelconftelemetry.MetricsConfig)
 | |
| 	}); ok {
 | |
| 		lmp.LogAboutServers(logger, cfg)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Start starts the extensions and pipelines. If Start fails Shutdown should be called to ensure a clean state.
 | |
| // Start does the following steps in order:
 | |
| // 1. Start all extensions.
 | |
| // 2. Notify extensions about Collector configuration
 | |
| // 3. Start all pipelines.
 | |
| // 4. Notify extensions that the pipeline is ready.
 | |
| func (srv *Service) Start(ctx context.Context) error {
 | |
| 	srv.telemetrySettings.Logger.Info("Starting "+srv.buildInfo.Command+"...",
 | |
| 		zap.String("Version", srv.buildInfo.Version),
 | |
| 		zap.Int("NumCPU", runtime.NumCPU()),
 | |
| 	)
 | |
| 
 | |
| 	if err := srv.host.ServiceExtensions.Start(ctx, srv.host); err != nil {
 | |
| 		return fmt.Errorf("failed to start extensions: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	if srv.collectorConf != nil {
 | |
| 		if err := srv.host.ServiceExtensions.NotifyConfig(ctx, srv.collectorConf); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if err := srv.host.Pipelines.StartAll(ctx, srv.host); err != nil {
 | |
| 		return fmt.Errorf("cannot start pipelines: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	if err := srv.host.ServiceExtensions.NotifyPipelineReady(); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	srv.telemetrySettings.Logger.Info("Everything is ready. Begin running and processing data.")
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Shutdown the service. Shutdown will do the following steps in order:
 | |
| // 1. Notify extensions that the pipeline is shutting down.
 | |
| // 2. Shutdown all pipelines.
 | |
| // 3. Shutdown all extensions.
 | |
| // 4. Shutdown telemetry.
 | |
| func (srv *Service) Shutdown(ctx context.Context) error {
 | |
| 	// Accumulate errors and proceed with shutting down remaining components.
 | |
| 	var errs error
 | |
| 
 | |
| 	// Begin shutdown sequence.
 | |
| 	srv.telemetrySettings.Logger.Info("Starting shutdown...")
 | |
| 
 | |
| 	if err := srv.host.ServiceExtensions.NotifyPipelineNotReady(); err != nil {
 | |
| 		errs = multierr.Append(errs, fmt.Errorf("failed to notify that pipeline is not ready: %w", err))
 | |
| 	}
 | |
| 
 | |
| 	if err := srv.host.Pipelines.ShutdownAll(ctx, srv.host.Reporter); err != nil {
 | |
| 		errs = multierr.Append(errs, fmt.Errorf("failed to shutdown pipelines: %w", err))
 | |
| 	}
 | |
| 
 | |
| 	if err := srv.host.ServiceExtensions.Shutdown(ctx); err != nil {
 | |
| 		errs = multierr.Append(errs, fmt.Errorf("failed to shutdown extensions: %w", err))
 | |
| 	}
 | |
| 
 | |
| 	srv.telemetrySettings.Logger.Info("Shutdown complete.")
 | |
| 
 | |
| 	if err := srv.sdk.Shutdown(ctx); err != nil {
 | |
| 		errs = multierr.Append(errs, fmt.Errorf("failed to shutdown telemetry: %w", err))
 | |
| 	}
 | |
| 
 | |
| 	return errs
 | |
| }
 | |
| 
 | |
| // Creates extensions.
 | |
| func (srv *Service) initExtensions(ctx context.Context, cfg extensions.Config) error {
 | |
| 	var err error
 | |
| 	extensionsSettings := extensions.Settings{
 | |
| 		Telemetry:  srv.telemetrySettings,
 | |
| 		BuildInfo:  srv.buildInfo,
 | |
| 		Extensions: srv.host.Extensions,
 | |
| 	}
 | |
| 	if srv.host.ServiceExtensions, err = extensions.New(ctx, extensionsSettings, cfg, extensions.WithReporter(srv.host.Reporter)); err != nil {
 | |
| 		return fmt.Errorf("failed to build extensions: %w", err)
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Creates the pipeline graph.
 | |
| func (srv *Service) initGraph(ctx context.Context, cfg Config) error {
 | |
| 	var err error
 | |
| 	if srv.host.Pipelines, err = graph.Build(ctx, graph.Settings{
 | |
| 		Telemetry:        srv.telemetrySettings,
 | |
| 		BuildInfo:        srv.buildInfo,
 | |
| 		ReceiverBuilder:  srv.host.Receivers,
 | |
| 		ProcessorBuilder: srv.host.Processors,
 | |
| 		ExporterBuilder:  srv.host.Exporters,
 | |
| 		ConnectorBuilder: srv.host.Connectors,
 | |
| 		PipelineConfigs:  cfg.Pipelines,
 | |
| 		ReportStatus:     srv.host.Reporter.ReportStatus,
 | |
| 	}); err != nil {
 | |
| 		return fmt.Errorf("failed to build pipelines: %w", err)
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Logger returns the logger created for this service.
 | |
| // This is a temporary API that may be removed soon after investigating how the collector should record different events.
 | |
| func (srv *Service) Logger() *zap.Logger {
 | |
| 	return srv.telemetrySettings.Logger
 | |
| }
 | |
| 
 | |
| func pdataFromSdk(res *sdkresource.Resource) pcommon.Resource {
 | |
| 	// pcommon.NewResource is the best way to generate a new resource currently and is safe to use outside of tests.
 | |
| 	// Because the resource is signal agnostic, and we need a net new resource, not an existing one, this is the only
 | |
| 	// method of creating it without exposing internal packages.
 | |
| 	pcommonRes := pcommon.NewResource()
 | |
| 	for _, keyValue := range res.Attributes() {
 | |
| 		pcommonRes.Attributes().PutStr(string(keyValue.Key), keyValue.Value.AsString())
 | |
| 	}
 | |
| 	return pcommonRes
 | |
| }
 | |
| 
 | |
| func dropViewOption(selector *config.ViewSelector) config.View {
 | |
| 	return config.View{
 | |
| 		Selector: selector,
 | |
| 		Stream: &config.ViewStream{
 | |
| 			Aggregation: &config.ViewStreamAggregation{
 | |
| 				Drop: config.ViewStreamAggregationDrop{},
 | |
| 			},
 | |
| 		},
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func configureViews(level configtelemetry.Level) []config.View {
 | |
| 	views := []config.View{}
 | |
| 
 | |
| 	if level < configtelemetry.LevelDetailed {
 | |
| 		// Drop all otelhttp and otelgrpc metrics if the level is not detailed.
 | |
| 		views = append(views,
 | |
| 			dropViewOption(&config.ViewSelector{
 | |
| 				MeterName: ptr("go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"),
 | |
| 			}),
 | |
| 			dropViewOption(&config.ViewSelector{
 | |
| 				MeterName: ptr("go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"),
 | |
| 			}),
 | |
| 			// Drop duration metric if the level is not detailed
 | |
| 			dropViewOption(&config.ViewSelector{
 | |
| 				MeterName:      ptr("go.opentelemetry.io/collector/processor/processorhelper"),
 | |
| 				InstrumentName: ptr("otelcol_processor_internal_duration"),
 | |
| 			}),
 | |
| 		)
 | |
| 	}
 | |
| 
 | |
| 	// otel-arrow library metrics
 | |
| 	// See https://github.com/open-telemetry/otel-arrow/blob/c39257/pkg/otel/arrow_record/consumer.go#L174-L176
 | |
| 	if level < configtelemetry.LevelNormal {
 | |
| 		scope := ptr("otel-arrow/pkg/otel/arrow_record")
 | |
| 		views = append(views,
 | |
| 			dropViewOption(&config.ViewSelector{
 | |
| 				MeterName:      scope,
 | |
| 				InstrumentName: ptr("arrow_batch_records"),
 | |
| 			}),
 | |
| 			dropViewOption(&config.ViewSelector{
 | |
| 				MeterName:      scope,
 | |
| 				InstrumentName: ptr("arrow_schema_resets"),
 | |
| 			}),
 | |
| 			dropViewOption(&config.ViewSelector{
 | |
| 				MeterName:      scope,
 | |
| 				InstrumentName: ptr("arrow_memory_inuse"),
 | |
| 			}),
 | |
| 		)
 | |
| 	}
 | |
| 
 | |
| 	// contrib's internal/otelarrow/netstats metrics
 | |
| 	// See
 | |
| 	// - https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/a25f05/internal/otelarrow/netstats/netstats.go#L130
 | |
| 	// - https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/a25f05/internal/otelarrow/netstats/netstats.go#L165
 | |
| 	if level < configtelemetry.LevelDetailed {
 | |
| 		scope := ptr("github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/netstats")
 | |
| 
 | |
| 		views = append(views,
 | |
| 			// Compressed size metrics.
 | |
| 			dropViewOption(&config.ViewSelector{
 | |
| 				MeterName:      scope,
 | |
| 				InstrumentName: ptr("otelcol_*_compressed_size"),
 | |
| 			}),
 | |
| 			dropViewOption(&config.ViewSelector{
 | |
| 				MeterName:      scope,
 | |
| 				InstrumentName: ptr("otelcol_*_compressed_size"),
 | |
| 			}),
 | |
| 
 | |
| 			// makeRecvMetrics for exporters.
 | |
| 			dropViewOption(&config.ViewSelector{
 | |
| 				MeterName:      scope,
 | |
| 				InstrumentName: ptr("otelcol_exporter_recv"),
 | |
| 			}),
 | |
| 			dropViewOption(&config.ViewSelector{
 | |
| 				MeterName:      scope,
 | |
| 				InstrumentName: ptr("otelcol_exporter_recv_wire"),
 | |
| 			}),
 | |
| 
 | |
| 			// makeSentMetrics for receivers.
 | |
| 			dropViewOption(&config.ViewSelector{
 | |
| 				MeterName:      scope,
 | |
| 				InstrumentName: ptr("otelcol_receiver_sent"),
 | |
| 			}),
 | |
| 			dropViewOption(&config.ViewSelector{
 | |
| 				MeterName:      scope,
 | |
| 				InstrumentName: ptr("otelcol_receiver_sent_wire"),
 | |
| 			}),
 | |
| 		)
 | |
| 	}
 | |
| 
 | |
| 	// Batch processor metrics
 | |
| 	scope := ptr("go.opentelemetry.io/collector/processor/batchprocessor")
 | |
| 	if level < configtelemetry.LevelNormal {
 | |
| 		views = append(views, dropViewOption(&config.ViewSelector{
 | |
| 			MeterName: scope,
 | |
| 		}))
 | |
| 	} else if level < configtelemetry.LevelDetailed {
 | |
| 		views = append(views, dropViewOption(&config.ViewSelector{
 | |
| 			MeterName:      scope,
 | |
| 			InstrumentName: ptr("otelcol_processor_batch_batch_send_size_bytes"),
 | |
| 		}))
 | |
| 	}
 | |
| 
 | |
| 	// Internal graph metrics
 | |
| 	graphScope := ptr("go.opentelemetry.io/collector/service")
 | |
| 	if level < configtelemetry.LevelDetailed {
 | |
| 		views = append(views,
 | |
| 			dropViewOption(&config.ViewSelector{
 | |
| 				MeterName:      graphScope,
 | |
| 				InstrumentName: ptr("otelcol.*.consumed.size"),
 | |
| 			}),
 | |
| 			dropViewOption(&config.ViewSelector{
 | |
| 				MeterName:      graphScope,
 | |
| 				InstrumentName: ptr("otelcol.*.produced.size"),
 | |
| 			}))
 | |
| 	}
 | |
| 
 | |
| 	return views
 | |
| }
 | |
| 
 | |
// ptr returns a pointer to a new variable holding a copy of v. Each call
// yields a distinct pointer.
func ptr[T any](v T) *T {
	boxed := new(T)
	*boxed = v
	return boxed
}
 | |
| 
 | |
| // Validate verifies the graph by calling the internal graph.Build.
 | |
| func Validate(ctx context.Context, set Settings, cfg Config) error {
 | |
| 	tel := component.TelemetrySettings{
 | |
| 		Logger:         zap.NewNop(),
 | |
| 		TracerProvider: nooptrace.NewTracerProvider(),
 | |
| 		MeterProvider:  noopmetric.NewMeterProvider(),
 | |
| 		Resource:       pcommon.NewResource(),
 | |
| 	}
 | |
| 	_, err := graph.Build(ctx, graph.Settings{
 | |
| 		Telemetry:        tel,
 | |
| 		BuildInfo:        set.BuildInfo,
 | |
| 		ReceiverBuilder:  builders.NewReceiver(set.ReceiversConfigs, set.ReceiversFactories),
 | |
| 		ProcessorBuilder: builders.NewProcessor(set.ProcessorsConfigs, set.ProcessorsFactories),
 | |
| 		ExporterBuilder:  builders.NewExporter(set.ExportersConfigs, set.ExportersFactories),
 | |
| 		ConnectorBuilder: builders.NewConnector(set.ConnectorsConfigs, set.ConnectorsFactories),
 | |
| 		PipelineConfigs:  cfg.Pipelines,
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("failed to build pipelines: %w", err)
 | |
| 	}
 | |
| 	return nil
 | |
| }
 |