// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// Package graph contains the internal graph representation of the pipelines.
//
// [Build] is the constructor for a [Graph] object.  The method calls out to helpers that transform the graph from a config
// to a DAG of components.  The configuration undergoes additional validation here as well, and is used to instantiate
// the components of the pipeline.
//
// [Graph.StartAll] starts all components in each pipeline.
//
// [Graph.ShutdownAll] stops all components in each pipeline.
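//
// As an illustrative sketch only (the component names here are hypothetical), a service
// configuration such as
//
//	pipelines:
//	  traces:
//	    receivers: [otlp]
//	    processors: [batch]
//	    exporters: [debug]
//
// is represented as a per-pipeline DAG of the form
//
//	receiver(s) -> capabilitiesNode -> processor(s) -> fanOutNode -> exporter(s)
//
// with connector nodes acting as an exporter in one pipeline and a receiver in another.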
package graph // import "go.opentelemetry.io/collector/service/internal/graph"

import (
	"context"
	"errors"
	"fmt"
	"strings"

	"go.uber.org/multierr"
	"go.uber.org/zap"
	"gonum.org/v1/gonum/graph"
	"gonum.org/v1/gonum/graph/simple"
	"gonum.org/v1/gonum/graph/topo"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/component/componentstatus"
	"go.opentelemetry.io/collector/connector"
	"go.opentelemetry.io/collector/connector/xconnector"
	"go.opentelemetry.io/collector/consumer"
	"go.opentelemetry.io/collector/consumer/xconsumer"
	"go.opentelemetry.io/collector/internal/fanoutconsumer"
	"go.opentelemetry.io/collector/pipeline"
	"go.opentelemetry.io/collector/pipeline/xpipeline"
	"go.opentelemetry.io/collector/service/hostcapabilities"
	"go.opentelemetry.io/collector/service/internal/builders"
	"go.opentelemetry.io/collector/service/internal/capabilityconsumer"
	"go.opentelemetry.io/collector/service/internal/status"
	"go.opentelemetry.io/collector/service/pipelines"
)

// Settings holds the configuration required to build the pipeline graph.
type Settings struct {
	Telemetry component.TelemetrySettings
	BuildInfo component.BuildInfo

	ReceiverBuilder  *builders.ReceiverBuilder
	ProcessorBuilder *builders.ProcessorBuilder
	ExporterBuilder  *builders.ExporterBuilder
	ConnectorBuilder *builders.ConnectorBuilder

	// PipelineConfigs is a map of pipeline.ID to the corresponding pipeline configuration.
	PipelineConfigs pipelines.Config

	ReportStatus status.ServiceStatusFunc
}

type Graph struct {
	// All component instances represented as nodes, with directed edges indicating data flow.
	componentGraph *simple.DirectedGraph

	// Keep track of how nodes relate to pipelines, so we can declare edges in the graph.
	pipelines map[pipeline.ID]*pipelineNodes

	// Keep track of the status source per node.
	instanceIDs map[int64]*componentstatus.InstanceID

	telemetry component.TelemetrySettings
}

// Build builds a full pipeline graph.
// Build also validates the configuration of the pipelines and does the actual initialization of each Component in the Graph.
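//
// A minimal sketch of the intended call sequence (error handling elided):
//
//	g, err := Build(ctx, set)
//	err = g.StartAll(ctx, host)        // host is the service's *Host
//	err = g.ShutdownAll(ctx, reporter) // reporter is a status.Reporter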
func Build(ctx context.Context, set Settings) (*Graph, error) {
	pipelines := &Graph{
		componentGraph: simple.NewDirectedGraph(),
		pipelines:      make(map[pipeline.ID]*pipelineNodes, len(set.PipelineConfigs)),
		instanceIDs:    make(map[int64]*componentstatus.InstanceID),
		telemetry:      set.Telemetry,
	}
	for pipelineID := range set.PipelineConfigs {
		pipelines.pipelines[pipelineID] = &pipelineNodes{
			receivers: make(map[int64]graph.Node),
			exporters: make(map[int64]graph.Node),
		}
	}
	if err := pipelines.createNodes(set); err != nil {
		return nil, err
	}
	pipelines.createEdges()
	err := pipelines.buildComponents(ctx, set)
	return pipelines, err
}

// Creates a node for each instance of a component and adds it to the graph.
// Validates that connectors are configured to export and receive correctly.
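// For example (component names are hypothetical), a connector "count" used as an exporter in a
// traces pipeline and as a receiver in a metrics pipeline is accepted only if its factory
// declares traces-to-metrics support; otherwise createNodes returns an error.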
func (g *Graph) createNodes(set Settings) error {
	// Build a list of all connectors for easy reference.
	connectors := make(map[component.ID]struct{})

	// Keep track of connectors and where they are used. (map[connectorID][]pipelineID).
	connectorsAsExporter := make(map[component.ID][]pipeline.ID)
	connectorsAsReceiver := make(map[component.ID][]pipeline.ID)

	// Build each pipelineNodes struct for each pipeline by parsing the pipelineCfg.
	// Also populates the connectors, connectorsAsExporter and connectorsAsReceiver maps.
	for pipelineID, pipelineCfg := range set.PipelineConfigs {
		pipe := g.pipelines[pipelineID]
		for _, recvID := range pipelineCfg.Receivers {
			// Checks if this receiver is a connector or a regular receiver.
			if set.ConnectorBuilder.IsConfigured(recvID) {
				connectors[recvID] = struct{}{}
				connectorsAsReceiver[recvID] = append(connectorsAsReceiver[recvID], pipelineID)
				continue
			}
			rcvrNode := g.createReceiver(pipelineID, recvID)
			pipe.receivers[rcvrNode.ID()] = rcvrNode
		}

		pipe.capabilitiesNode = newCapabilitiesNode(pipelineID)

		for _, procID := range pipelineCfg.Processors {
			procNode := g.createProcessor(pipelineID, procID)
			pipe.processors = append(pipe.processors, procNode)
		}

		pipe.fanOutNode = newFanOutNode(pipelineID)

		for _, exprID := range pipelineCfg.Exporters {
			if set.ConnectorBuilder.IsConfigured(exprID) {
				connectors[exprID] = struct{}{}
				connectorsAsExporter[exprID] = append(connectorsAsExporter[exprID], pipelineID)
				continue
			}
			expNode := g.createExporter(pipelineID, exprID)
			pipe.exporters[expNode.ID()] = expNode
		}
	}

	for connID := range connectors {
		factory := set.ConnectorBuilder.Factory(connID.Type())
		if factory == nil {
			return fmt.Errorf("connector factory not available for: %q", connID.Type())
		}
		connFactory := factory.(connector.Factory)

		expTypes := make(map[pipeline.Signal]bool)
		for _, pipelineID := range connectorsAsExporter[connID] {
			// The presence of each key indicates how the connector is used as an exporter.
			// The value is initially set to false. Later we will set the value to true *if* we
			// confirm that there is a supported corresponding use as a receiver.
			expTypes[pipelineID.Signal()] = false
		}
		recTypes := make(map[pipeline.Signal]bool)
		for _, pipelineID := range connectorsAsReceiver[connID] {
			// The presence of each key indicates how the connector is used as a receiver.
			// The value is initially set to false. Later we will set the value to true *if* we
			// confirm that there is a supported corresponding use as an exporter.
			recTypes[pipelineID.Signal()] = false
		}

		for expType := range expTypes {
			for recType := range recTypes {
				// Typechecks the connector's receiving and exporting datatypes.
				if connectorStability(connFactory, expType, recType) != component.StabilityLevelUndefined {
					expTypes[expType] = true
					recTypes[recType] = true
				}
			}
		}

		for expType, supportedUse := range expTypes {
			if supportedUse {
				continue
			}
			return fmt.Errorf("connector %q used as exporter in %v pipeline but not used in any supported receiver pipeline", connID, formatPipelineNamesWithSignal(connectorsAsExporter[connID], expType))
		}
		for recType, supportedUse := range recTypes {
			if supportedUse {
				continue
			}
			return fmt.Errorf("connector %q used as receiver in %v pipeline but not used in any supported exporter pipeline", connID, formatPipelineNamesWithSignal(connectorsAsReceiver[connID], recType))
		}

		for _, eID := range connectorsAsExporter[connID] {
			for _, rID := range connectorsAsReceiver[connID] {
				if connectorStability(connFactory, eID.Signal(), rID.Signal()) == component.StabilityLevelUndefined {
					// Connector is not supported for this combination, but we know it is used correctly elsewhere
					continue
				}
				connNode := g.createConnector(eID, rID, connID)

				g.pipelines[eID].exporters[connNode.ID()] = connNode
				g.pipelines[rID].receivers[connNode.ID()] = connNode
			}
		}
	}
	return nil
}

// formatPipelineNamesWithSignal returns, in "signal[/name]" form, the names of the given pipelines that match the provided signal.
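// For example (hypothetical pipeline names), given the pipelines [traces, traces/backend, metrics]
// and the traces signal, it returns ["traces", "traces/backend"].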
func formatPipelineNamesWithSignal(pipelineIDs []pipeline.ID, signal pipeline.Signal) []string {
	var formatted []string
	for _, pid := range pipelineIDs {
		if pid.Signal() == signal {
			formatted = append(formatted, pid.String())
		}
	}
	return formatted
}

func (g *Graph) createReceiver(pipelineID pipeline.ID, recvID component.ID) *receiverNode {
	rcvrNode := newReceiverNode(pipelineID.Signal(), recvID)
	if node := g.componentGraph.Node(rcvrNode.ID()); node != nil {
		instanceID := g.instanceIDs[node.ID()]
		g.instanceIDs[node.ID()] = instanceID.WithPipelines(pipelineID)
		return node.(*receiverNode)
	}
	g.componentGraph.AddNode(rcvrNode)
	g.instanceIDs[rcvrNode.ID()] = componentstatus.NewInstanceID(
		recvID, component.KindReceiver, pipelineID,
	)
	return rcvrNode
}

func (g *Graph) createProcessor(pipelineID pipeline.ID, procID component.ID) *processorNode {
	procNode := newProcessorNode(pipelineID, procID)
	g.componentGraph.AddNode(procNode)
	g.instanceIDs[procNode.ID()] = componentstatus.NewInstanceID(
		procID, component.KindProcessor, pipelineID,
	)
	return procNode
}

func (g *Graph) createExporter(pipelineID pipeline.ID, exprID component.ID) *exporterNode {
	expNode := newExporterNode(pipelineID.Signal(), exprID)
	if node := g.componentGraph.Node(expNode.ID()); node != nil {
		instanceID := g.instanceIDs[expNode.ID()]
		g.instanceIDs[expNode.ID()] = instanceID.WithPipelines(pipelineID)
		return node.(*exporterNode)
	}
	g.componentGraph.AddNode(expNode)
	g.instanceIDs[expNode.ID()] = componentstatus.NewInstanceID(
		expNode.componentID, component.KindExporter, pipelineID,
	)
	return expNode
}

func (g *Graph) createConnector(exprPipelineID, rcvrPipelineID pipeline.ID, connID component.ID) *connectorNode {
	connNode := newConnectorNode(exprPipelineID.Signal(), rcvrPipelineID.Signal(), connID)
	if node := g.componentGraph.Node(connNode.ID()); node != nil {
		instanceID := g.instanceIDs[connNode.ID()]
		g.instanceIDs[connNode.ID()] = instanceID.WithPipelines(exprPipelineID, rcvrPipelineID)
		return node.(*connectorNode)
	}
	g.componentGraph.AddNode(connNode)
	g.instanceIDs[connNode.ID()] = componentstatus.NewInstanceID(
		connNode.componentID, component.KindConnector, exprPipelineID, rcvrPipelineID,
	)
	return connNode
}

// Iterates through the pipelines and creates edges between components.
func (g *Graph) createEdges() {
	for _, pg := range g.pipelines {
		// Draw edges from each receiver to the capability node.
		for _, receiver := range pg.receivers {
			g.componentGraph.SetEdge(g.componentGraph.NewEdge(receiver, pg.capabilitiesNode))
		}

		// Iterates through processors, chaining them together. Starts with the capabilities node.
		var from, to graph.Node
		from = pg.capabilitiesNode
		for _, processor := range pg.processors {
			to = processor
			g.componentGraph.SetEdge(g.componentGraph.NewEdge(from, to))
			from = processor
		}
		// Always inserts a fanout node before any exporters. If there is only one
		// exporter, the fanout node is still created and acts as a noop.
		to = pg.fanOutNode
		g.componentGraph.SetEdge(g.componentGraph.NewEdge(from, to))

		for _, exporter := range pg.exporters {
			g.componentGraph.SetEdge(g.componentGraph.NewEdge(pg.fanOutNode, exporter))
		}
	}
}

// buildComponents uses the already built graph to instantiate the actual component for each node in each pipeline.
// It handles calling the factory for each component and hooking each component up to the next.
// It also calculates whether each pipeline mutates data so the receiver can know whether it needs to clone the data.
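// Components are built in reverse topological order so that each component's downstream
// consumer already exists by the time the component itself is constructed.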
func (g *Graph) buildComponents(ctx context.Context, set Settings) error {
	nodes, err := topo.Sort(g.componentGraph)
	if err != nil {
		return cycleErr(err, topo.DirectedCyclesIn(g.componentGraph))
	}

	for i := len(nodes) - 1; i >= 0; i-- {
		node := nodes[i]

		switch n := node.(type) {
		case *receiverNode:
			err = n.buildComponent(ctx, set.Telemetry, set.BuildInfo, set.ReceiverBuilder, g.nextConsumers(n.ID()))
		case *processorNode:
			// nextConsumers is guaranteed to be length 1.  Either it is the next processor or it is the fanout node for the exporters.
			err = n.buildComponent(ctx, set.Telemetry, set.BuildInfo, set.ProcessorBuilder, g.nextConsumers(n.ID())[0])
		case *exporterNode:
			err = n.buildComponent(ctx, set.Telemetry, set.BuildInfo, set.ExporterBuilder)
		case *connectorNode:
			err = n.buildComponent(ctx, set.Telemetry, set.BuildInfo, set.ConnectorBuilder, g.nextConsumers(n.ID()))
		case *capabilitiesNode:
			capability := consumer.Capabilities{
				// The fanOutNode represents the aggregate capabilities of the exporters in the pipeline.
				MutatesData: g.pipelines[n.pipelineID].fanOutNode.getConsumer().Capabilities().MutatesData,
			}
			for _, proc := range g.pipelines[n.pipelineID].processors {
				capability.MutatesData = capability.MutatesData || proc.(*processorNode).getConsumer().Capabilities().MutatesData
			}
			next := g.nextConsumers(n.ID())[0]
			switch n.pipelineID.Signal() {
			case pipeline.SignalTraces:
				cc := capabilityconsumer.NewTraces(next.(consumer.Traces), capability)
				n.baseConsumer = cc
				n.ConsumeTracesFunc = cc.ConsumeTraces
			case pipeline.SignalMetrics:
				cc := capabilityconsumer.NewMetrics(next.(consumer.Metrics), capability)
				n.baseConsumer = cc
				n.ConsumeMetricsFunc = cc.ConsumeMetrics
			case pipeline.SignalLogs:
				cc := capabilityconsumer.NewLogs(next.(consumer.Logs), capability)
				n.baseConsumer = cc
				n.ConsumeLogsFunc = cc.ConsumeLogs
			case xpipeline.SignalProfiles:
				cc := capabilityconsumer.NewProfiles(next.(xconsumer.Profiles), capability)
				n.baseConsumer = cc
				n.ConsumeProfilesFunc = cc.ConsumeProfiles
			}
		case *fanOutNode:
			nexts := g.nextConsumers(n.ID())
			switch n.pipelineID.Signal() {
			case pipeline.SignalTraces:
				consumers := make([]consumer.Traces, 0, len(nexts))
				for _, next := range nexts {
					consumers = append(consumers, next.(consumer.Traces))
				}
				n.baseConsumer = fanoutconsumer.NewTraces(consumers)
			case pipeline.SignalMetrics:
				consumers := make([]consumer.Metrics, 0, len(nexts))
				for _, next := range nexts {
					consumers = append(consumers, next.(consumer.Metrics))
				}
				n.baseConsumer = fanoutconsumer.NewMetrics(consumers)
			case pipeline.SignalLogs:
				consumers := make([]consumer.Logs, 0, len(nexts))
				for _, next := range nexts {
					consumers = append(consumers, next.(consumer.Logs))
				}
				n.baseConsumer = fanoutconsumer.NewLogs(consumers)
			case xpipeline.SignalProfiles:
				consumers := make([]xconsumer.Profiles, 0, len(nexts))
				for _, next := range nexts {
					consumers = append(consumers, next.(xconsumer.Profiles))
				}
				n.baseConsumer = fanoutconsumer.NewProfiles(consumers)
			}
		}
		if err != nil {
			return err
		}
	}
	return nil
}

// nextConsumers returns the consumers of all downstream nodes connected to the given node.
func (g *Graph) nextConsumers(nodeID int64) []baseConsumer {
	nextNodes := g.componentGraph.From(nodeID)
	nexts := make([]baseConsumer, 0, nextNodes.Len())
	for nextNodes.Next() {
		nexts = append(nexts, nextNodes.Node().(consumerNode).getConsumer())
	}
	return nexts
}

// A node-based representation of a pipeline configuration.
type pipelineNodes struct {
	// Use map to assist with deduplication of connector instances.
	receivers map[int64]graph.Node

	// The node to which receivers emit. Passes through to processors.
	// Easily accessible as the first node in a pipeline.
	*capabilitiesNode

	// The order of processors is very important. Therefore use a slice for processors.
	processors []graph.Node

	// Emits to exporters.
	*fanOutNode

	// Use map to assist with deduplication of connector instances.
	exporters map[int64]graph.Node
}

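// StartAll starts all components in the graph in reverse topological order (downstream
// components first), reporting a status event for each component as it starts.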
func (g *Graph) StartAll(ctx context.Context, host *Host) error {
	if host == nil {
		return errors.New("host cannot be nil")
	}

	nodes, err := topo.Sort(g.componentGraph)
	if err != nil {
		return err
	}

	// Start in reverse topological order so that downstream components
	// are started before upstream components. This ensures that each
	// component's consumer is ready to consume.
	for i := len(nodes) - 1; i >= 0; i-- {
		node := nodes[i]
		comp, ok := node.(component.Component)

		if !ok {
			// Skip capabilities/fanout nodes
			continue
		}

		instanceID := g.instanceIDs[node.ID()]
		host.Reporter.ReportStatus(
			instanceID,
			componentstatus.NewEvent(componentstatus.StatusStarting),
		)

		if compErr := comp.Start(ctx, &HostWrapper{Host: host, InstanceID: instanceID}); compErr != nil {
			host.Reporter.ReportStatus(
				instanceID,
				componentstatus.NewPermanentErrorEvent(compErr),
			)
			// We log with zap.AddStacktrace(zap.DPanicLevel) to avoid adding the stack trace to the error log
			g.telemetry.Logger.WithOptions(zap.AddStacktrace(zap.DPanicLevel)).
				Error("Failed to start component",
					zap.Error(compErr),
					zap.String("type", instanceID.Kind().String()),
					zap.String("id", instanceID.ComponentID().String()),
				)
			return fmt.Errorf("failed to start %q %s: %w", instanceID.ComponentID().String(), strings.ToLower(instanceID.Kind().String()), compErr)
		}

		host.Reporter.ReportOKIfStarting(instanceID)
	}
	return nil
}

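// ShutdownAll stops all components in the graph in topological order (upstream components
// first), collecting and returning any errors encountered along the way.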
func (g *Graph) ShutdownAll(ctx context.Context, reporter status.Reporter) error {
	nodes, err := topo.Sort(g.componentGraph)
	if err != nil {
		return err
	}

	// Stop in topological order so that upstream components
	// are stopped before downstream components.  This ensures
	// that each component has a chance to drain to its consumer
	// before the consumer is stopped.
	var errs error
	for i := 0; i < len(nodes); i++ {
		node := nodes[i]
		comp, ok := node.(component.Component)

		if !ok {
			// Skip capabilities/fanout nodes
			continue
		}

		instanceID := g.instanceIDs[node.ID()]
		reporter.ReportStatus(
			instanceID,
			componentstatus.NewEvent(componentstatus.StatusStopping),
		)

		if compErr := comp.Shutdown(ctx); compErr != nil {
			errs = multierr.Append(errs, compErr)
			reporter.ReportStatus(
				instanceID,
				componentstatus.NewPermanentErrorEvent(compErr),
			)
			continue
		}

		reporter.ReportStatus(
			instanceID,
			componentstatus.NewEvent(componentstatus.StatusStopped),
		)
	}
	return errs
}

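// GetExporters returns the built exporter instances, grouped by signal type and component ID.
// Connector instances are intentionally excluded.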
func (g *Graph) GetExporters() map[pipeline.Signal]map[component.ID]component.Component {
	exportersMap := make(map[pipeline.Signal]map[component.ID]component.Component)
	exportersMap[pipeline.SignalTraces] = make(map[component.ID]component.Component)
	exportersMap[pipeline.SignalMetrics] = make(map[component.ID]component.Component)
	exportersMap[pipeline.SignalLogs] = make(map[component.ID]component.Component)
	exportersMap[xpipeline.SignalProfiles] = make(map[component.ID]component.Component)

	for _, pg := range g.pipelines {
		for _, expNode := range pg.exporters {
			// Skip connectors, otherwise individual components can introduce cycles
			if expNode, ok := g.componentGraph.Node(expNode.ID()).(*exporterNode); ok {
				exportersMap[expNode.pipelineType][expNode.componentID] = expNode.Component
			}
		}
	}
	return exportersMap
}

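// cycleErr converts a topo.Unorderable error into a readable message describing the first
// detected cycle, starting from a connector. For example (hypothetical components), the
// resulting message might read:
//
//	cycle detected: connector "forward" (traces to traces) -> processor "batch" in pipeline "traces/2" -> connector "forward" (traces to traces)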
func cycleErr(err error, cycles [][]graph.Node) error {
	var topoErr topo.Unorderable
	if !errors.As(err, &topoErr) || len(cycles) == 0 || len(cycles[0]) == 0 {
		return err
	}

	// There may be multiple cycles, but report only the first one.
	cycle := cycles[0]

	// The last node is a duplicate of the first node.
	// Remove it because we may start from a different node.
	cycle = cycle[:len(cycle)-1]

	// A cycle always contains a connector. For the sake of consistent
	// error messages report the cycle starting from a connector.
	for i := 0; i < len(cycle); i++ {
		if _, ok := cycle[i].(*connectorNode); ok {
			cycle = append(cycle[i:], cycle[:i]...)
			break
		}
	}

	// Repeat the first node at the end to clarify the cycle
	cycle = append(cycle, cycle[0])

	// Build the error message
	componentDetails := make([]string, 0, len(cycle))
	for _, node := range cycle {
		switch n := node.(type) {
		case *processorNode:
			componentDetails = append(componentDetails, fmt.Sprintf("processor %q in pipeline %q", n.componentID, n.pipelineID.String()))
		case *connectorNode:
			componentDetails = append(componentDetails, fmt.Sprintf("connector %q (%s to %s)", n.componentID, n.exprPipelineType, n.rcvrPipelineType))
		default:
			continue // skip capabilities/fanout nodes
		}
	}
	return fmt.Errorf("cycle detected: %s", strings.Join(componentDetails, " -> "))
}

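// connectorStability returns the stability level declared by the factory for the given
// exporter-signal to receiver-signal combination, or StabilityLevelUndefined if the factory
// does not support that combination.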
func connectorStability(f connector.Factory, expType, recType pipeline.Signal) component.StabilityLevel {
	switch expType {
	case pipeline.SignalTraces:
		switch recType {
		case pipeline.SignalTraces:
			return f.TracesToTracesStability()
		case pipeline.SignalMetrics:
			return f.TracesToMetricsStability()
		case pipeline.SignalLogs:
			return f.TracesToLogsStability()
		case xpipeline.SignalProfiles:
			fprof, ok := f.(xconnector.Factory)
			if !ok {
				return component.StabilityLevelUndefined
			}
			return fprof.TracesToProfilesStability()
		}
	case pipeline.SignalMetrics:
		switch recType {
		case pipeline.SignalTraces:
			return f.MetricsToTracesStability()
		case pipeline.SignalMetrics:
			return f.MetricsToMetricsStability()
		case pipeline.SignalLogs:
			return f.MetricsToLogsStability()
		case xpipeline.SignalProfiles:
			fprof, ok := f.(xconnector.Factory)
			if !ok {
				return component.StabilityLevelUndefined
			}
			return fprof.MetricsToProfilesStability()
		}
	case pipeline.SignalLogs:
		switch recType {
		case pipeline.SignalTraces:
			return f.LogsToTracesStability()
		case pipeline.SignalMetrics:
			return f.LogsToMetricsStability()
		case pipeline.SignalLogs:
			return f.LogsToLogsStability()
		case xpipeline.SignalProfiles:
			fprof, ok := f.(xconnector.Factory)
			if !ok {
				return component.StabilityLevelUndefined
			}
			return fprof.LogsToProfilesStability()
		}
	case xpipeline.SignalProfiles:
		fprof, ok := f.(xconnector.Factory)
		if !ok {
			return component.StabilityLevelUndefined
		}
		switch recType {
		case pipeline.SignalTraces:
			return fprof.ProfilesToTracesStability()
		case pipeline.SignalMetrics:
			return fprof.ProfilesToMetricsStability()
		case pipeline.SignalLogs:
			return fprof.ProfilesToLogsStability()
		case xpipeline.SignalProfiles:
			return fprof.ProfilesToProfilesStability()
		}
	}
	return component.StabilityLevelUndefined
}

var (
	_ component.Host                   = (*HostWrapper)(nil)
	_ componentstatus.Reporter         = (*HostWrapper)(nil)
	_ hostcapabilities.ExposeExporters = (*HostWrapper)(nil)
)

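// HostWrapper couples the service Host with the instance ID of the component it is handed to,
// so that status events reported via [HostWrapper.Report] are attributed to that component.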
type HostWrapper struct {
	*Host
	InstanceID *componentstatus.InstanceID
}

func (host *HostWrapper) Report(event *componentstatus.Event) {
	host.Reporter.ReportStatus(host.InstanceID, event)
}