opentelemetry-collector/service/internal/graph/graph.go

633 lines
22 KiB
Go

// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
// Package graph contains the internal graph representation of the pipelines.
//
// [Build] is the constructor for a [Graph] object. The method calls out to helpers that transform the graph from a config
// to a DAG of components. The configuration undergoes additional validation here as well, and is used to instantiate
// the components of the pipeline.
//
// [Graph.StartAll] starts all components in each pipeline.
//
// [Graph.ShutdownAll] stops all components in each pipeline.
package graph // import "go.opentelemetry.io/collector/service/internal/graph"
import (
"context"
"errors"
"fmt"
"strings"
"go.uber.org/multierr"
"go.uber.org/zap"
"gonum.org/v1/gonum/graph"
"gonum.org/v1/gonum/graph/simple"
"gonum.org/v1/gonum/graph/topo"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componentstatus"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/connector/xconnector"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/xconsumer"
"go.opentelemetry.io/collector/internal/fanoutconsumer"
"go.opentelemetry.io/collector/pipeline"
"go.opentelemetry.io/collector/pipeline/xpipeline"
"go.opentelemetry.io/collector/service/hostcapabilities"
"go.opentelemetry.io/collector/service/internal/builders"
"go.opentelemetry.io/collector/service/internal/capabilityconsumer"
"go.opentelemetry.io/collector/service/internal/status"
"go.opentelemetry.io/collector/service/pipelines"
)
// Settings holds the inputs needed to build a pipeline Graph: telemetry and build
// metadata handed to every component, the builders used to instantiate each kind of
// component, and the pipeline configurations that describe the graph.
type Settings struct {
	// Telemetry is passed to every component's buildComponent call and is kept on the
	// Graph for logging.
	Telemetry component.TelemetrySettings
	// BuildInfo is passed to every component's buildComponent call.
	BuildInfo component.BuildInfo
	// ReceiverBuilder instantiates receiver components.
	ReceiverBuilder *builders.ReceiverBuilder
	// ProcessorBuilder instantiates processor components.
	ProcessorBuilder *builders.ProcessorBuilder
	// ExporterBuilder instantiates exporter components.
	ExporterBuilder *builders.ExporterBuilder
	// ConnectorBuilder instantiates connector components. It also decides whether a
	// receiver/exporter ID listed in a pipeline refers to a configured connector.
	ConnectorBuilder *builders.ConnectorBuilder
	// PipelineConfigs maps each pipeline.ID to that pipeline's configuration.
	PipelineConfigs pipelines.Config
	// NOTE(review): ReportStatus is not read by the graph-building code shown here;
	// presumably it is used to report service/component status — confirm with callers.
	ReportStatus status.ServiceStatusFunc
}
// Graph is the built representation of all configured pipelines: a directed graph of
// component nodes (cycles are rejected by Build) plus the bookkeeping needed to start
// and stop those components.
type Graph struct {
	// All component instances represented as nodes, with directed edges indicating data flow.
	componentGraph *simple.DirectedGraph
	// Keep track of how nodes relate to pipelines, so we can declare edges in the graph.
	pipelines map[pipeline.ID]*pipelineNodes
	// Status-reporting instance ID for each component node, keyed by node ID. Nodes that
	// are reused across pipelines (receivers, exporters, connectors) have every pipeline
	// they belong to added to their instance ID.
	instanceIDs map[int64]*componentstatus.InstanceID
	// Telemetry settings copied from Settings; the logger is used when a component fails to start.
	telemetry component.TelemetrySettings
}
// Build constructs the full pipeline graph described by set.
//
// It validates the pipeline configuration (including how connectors are used and that
// the graph has no cycles) and instantiates every component in the graph. If node
// creation fails, it returns a nil graph. If component instantiation fails, the
// partially built graph is returned together with the error.
func Build(ctx context.Context, set Settings) (*Graph, error) {
	g := &Graph{
		componentGraph: simple.NewDirectedGraph(),
		pipelines:      make(map[pipeline.ID]*pipelineNodes, len(set.PipelineConfigs)),
		instanceIDs:    make(map[int64]*componentstatus.InstanceID),
		telemetry:      set.Telemetry,
	}
	// Every configured pipeline gets its (initially empty) node sets up front, so that
	// createNodes can register connector nodes in any pipeline.
	for id := range set.PipelineConfigs {
		g.pipelines[id] = &pipelineNodes{
			receivers: make(map[int64]graph.Node),
			exporters: make(map[int64]graph.Node),
		}
	}
	if err := g.createNodes(set); err != nil {
		return nil, err
	}
	g.createEdges()
	// g is returned even when building the components fails.
	return g, g.buildComponents(ctx, set)
}
// createNodes creates a graph node for every component instance referenced by the
// pipeline configurations and records which nodes belong to which pipeline.
//
// Receivers, processors and exporters are turned into nodes immediately, and every
// pipeline also gets a capabilities node (in front of its processors) and a fanout node
// (in front of its exporters). IDs that refer to a configured connector are only
// collected in the first pass. They are then validated: every signal a connector is used
// with on the exporter side must pair with at least one receiver-side signal that the
// connector's factory supports, and vice versa. A connector node is finally created for
// every supported (exporter pipeline, receiver pipeline) pair.
//
// It returns an error if a connector's factory is not available or if a connector is
// used in an unsupported way. Edges are not created here; that is done by createEdges.
func (g *Graph) createNodes(set Settings) error {
	// Build a list of all connectors for easy reference.
	connectors := make(map[component.ID]struct{})
	// Keep track of connectors and where they are used. (map[connectorID][]pipelineID).
	connectorsAsExporter := make(map[component.ID][]pipeline.ID)
	connectorsAsReceiver := make(map[component.ID][]pipeline.ID)
	// Build each pipelineNodes struct for each pipeline by parsing the pipelineCfg.
	// Also populates the connectors, connectorsAsExporter and connectorsAsReceiver maps.
	for pipelineID, pipelineCfg := range set.PipelineConfigs {
		// Pre-populated by Build for every configured pipeline.
		pipe := g.pipelines[pipelineID]
		for _, recvID := range pipelineCfg.Receivers {
			// Checks if this receiver is a connector or a regular receiver.
			if set.ConnectorBuilder.IsConfigured(recvID) {
				connectors[recvID] = struct{}{}
				connectorsAsReceiver[recvID] = append(connectorsAsReceiver[recvID], pipelineID)
				continue
			}
			rcvrNode := g.createReceiver(pipelineID, recvID)
			pipe.receivers[rcvrNode.ID()] = rcvrNode
		}
		// NOTE(review): the capabilities and fanout nodes are not passed to AddNode here;
		// presumably they enter the graph when createEdges calls SetEdge on them (gonum's
		// simple graphs add missing endpoint nodes on SetEdge) — confirm.
		pipe.capabilitiesNode = newCapabilitiesNode(pipelineID)
		for _, procID := range pipelineCfg.Processors {
			procNode := g.createProcessor(pipelineID, procID)
			pipe.processors = append(pipe.processors, procNode)
		}
		pipe.fanOutNode = newFanOutNode(pipelineID)
		for _, exprID := range pipelineCfg.Exporters {
			// Same connector check as for receivers; connector nodes are created below,
			// once both sides of each connector are known.
			if set.ConnectorBuilder.IsConfigured(exprID) {
				connectors[exprID] = struct{}{}
				connectorsAsExporter[exprID] = append(connectorsAsExporter[exprID], pipelineID)
				continue
			}
			expNode := g.createExporter(pipelineID, exprID)
			pipe.exporters[expNode.ID()] = expNode
		}
	}
	// NOTE(review): map iteration order is randomized, so when several connectors or
	// signals are misconfigured, which error is reported can vary between runs.
	for connID := range connectors {
		factory := set.ConnectorBuilder.Factory(connID.Type())
		if factory == nil {
			return fmt.Errorf("connector factory not available for: %q", connID.Type())
		}
		// Unchecked assertion: panics if the builder ever returns a non-nil factory that
		// is not a connector.Factory.
		connFactory := factory.(connector.Factory)
		expTypes := make(map[pipeline.Signal]bool)
		for _, pipelineID := range connectorsAsExporter[connID] {
			// The presence of each key indicates how the connector is used as an exporter.
			// The value is initially set to false. Later we will set the value to true *if* we
			// confirm that there is a supported corresponding use as a receiver.
			expTypes[pipelineID.Signal()] = false
		}
		recTypes := make(map[pipeline.Signal]bool)
		for _, pipelineID := range connectorsAsReceiver[connID] {
			// The presence of each key indicates how the connector is used as a receiver.
			// The value is initially set to false. Later we will set the value to true *if* we
			// confirm that there is a supported corresponding use as an exporter.
			recTypes[pipelineID.Signal()] = false
		}
		for expType := range expTypes {
			for recType := range recTypes {
				// Typechecks the connector's receiving and exporting datatypes.
				if connectorStability(connFactory, expType, recType) != component.StabilityLevelUndefined {
					expTypes[expType] = true
					recTypes[recType] = true
				}
			}
		}
		// Any signal still marked false has no supported counterpart on the other side.
		for expType, supportedUse := range expTypes {
			if supportedUse {
				continue
			}
			return fmt.Errorf("connector %q used as exporter in %v pipeline but not used in any supported receiver pipeline", connID, formatPipelineNamesWithSignal(connectorsAsExporter[connID], expType))
		}
		for recType, supportedUse := range recTypes {
			if supportedUse {
				continue
			}
			return fmt.Errorf("connector %q used as receiver in %v pipeline but not used in any supported exporter pipeline", connID, formatPipelineNamesWithSignal(connectorsAsReceiver[connID], recType))
		}
		// Create (or reuse) one connector node per supported exporter/receiver pipeline pair
		// and register it on both pipelines.
		for _, eID := range connectorsAsExporter[connID] {
			for _, rID := range connectorsAsReceiver[connID] {
				if connectorStability(connFactory, eID.Signal(), rID.Signal()) == component.StabilityLevelUndefined {
					// Connector is not supported for this combination, but we know it is used correctly elsewhere
					continue
				}
				connNode := g.createConnector(eID, rID, connID)
				g.pipelines[eID].exporters[connNode.ID()] = connNode
				g.pipelines[rID].receivers[connNode.ID()] = connNode
			}
		}
	}
	return nil
}
// formatPipelineNamesWithSignal returns, in input order, the "signal[/name]" string of
// every pipeline ID in pipelineIDs whose signal equals signal. It returns nil when no
// pipeline matches.
func formatPipelineNamesWithSignal(pipelineIDs []pipeline.ID, signal pipeline.Signal) []string {
	var names []string
	for i := range pipelineIDs {
		if pipelineIDs[i].Signal() != signal {
			continue
		}
		names = append(names, pipelineIDs[i].String())
	}
	return names
}
// createReceiver returns the receiver node for recvID serving pipelineID's signal.
// If the graph has no node with that ID yet, the node is added together with a new
// instance ID for pipelineID. Otherwise the existing node is reused and pipelineID is
// added to its instance ID.
func (g *Graph) createReceiver(pipelineID pipeline.ID, recvID component.ID) *receiverNode {
	candidate := newReceiverNode(pipelineID.Signal(), recvID)
	id := candidate.ID()
	existing := g.componentGraph.Node(id)
	if existing == nil {
		g.componentGraph.AddNode(candidate)
		g.instanceIDs[id] = componentstatus.NewInstanceID(
			recvID, component.KindReceiver, pipelineID,
		)
		return candidate
	}
	g.instanceIDs[id] = g.instanceIDs[id].WithPipelines(pipelineID)
	return existing.(*receiverNode)
}
// createProcessor adds a processor node for procID in pipelineID to the graph and
// records its instance ID. The pipeline ID is part of the node's construction, and no
// existing-node lookup is done, unlike receivers and exporters.
func (g *Graph) createProcessor(pipelineID pipeline.ID, procID component.ID) *processorNode {
	node := newProcessorNode(pipelineID, procID)
	g.componentGraph.AddNode(node)
	g.instanceIDs[node.ID()] = componentstatus.NewInstanceID(procID, component.KindProcessor, pipelineID)
	return node
}
// createExporter returns the exporter node for exprID serving pipelineID's signal.
// An existing node with the same ID is reused, and pipelineID is added to its instance
// ID. Otherwise a new node is added to the graph with an instance ID for pipelineID.
func (g *Graph) createExporter(pipelineID pipeline.ID, exprID component.ID) *exporterNode {
	candidate := newExporterNode(pipelineID.Signal(), exprID)
	id := candidate.ID()
	if existing := g.componentGraph.Node(id); existing != nil {
		g.instanceIDs[id] = g.instanceIDs[id].WithPipelines(pipelineID)
		return existing.(*exporterNode)
	}
	g.componentGraph.AddNode(candidate)
	g.instanceIDs[id] = componentstatus.NewInstanceID(candidate.componentID, component.KindExporter, pipelineID)
	return candidate
}
// createConnector returns the connector node for connID that is used as an exporter in
// exprPipelineID and as a receiver in rcvrPipelineID. An existing node with the same ID
// is reused, and both pipelines are added to its instance ID. Otherwise a new node is
// added to the graph with an instance ID listing both pipelines.
func (g *Graph) createConnector(exprPipelineID, rcvrPipelineID pipeline.ID, connID component.ID) *connectorNode {
	candidate := newConnectorNode(exprPipelineID.Signal(), rcvrPipelineID.Signal(), connID)
	id := candidate.ID()
	if existing := g.componentGraph.Node(id); existing != nil {
		g.instanceIDs[id] = g.instanceIDs[id].WithPipelines(exprPipelineID, rcvrPipelineID)
		return existing.(*connectorNode)
	}
	g.componentGraph.AddNode(candidate)
	g.instanceIDs[id] = componentstatus.NewInstanceID(
		candidate.componentID, component.KindConnector, exprPipelineID, rcvrPipelineID,
	)
	return candidate
}
// createEdges connects each pipeline's nodes in data-flow order:
//
//	each receiver -> capabilities node -> processors (configured order) -> fanout node -> each exporter
//
// The fanout node is always inserted, even with a single exporter, in which case it
// simply passes data through.
func (g *Graph) createEdges() {
	link := func(from, to graph.Node) {
		g.componentGraph.SetEdge(g.componentGraph.NewEdge(from, to))
	}
	for _, pn := range g.pipelines {
		for _, rcv := range pn.receivers {
			link(rcv, pn.capabilitiesNode)
		}
		// Chain the processors one after another, starting from the capabilities node.
		tail := graph.Node(pn.capabilitiesNode)
		for _, proc := range pn.processors {
			link(tail, proc)
			tail = proc
		}
		link(tail, pn.fanOutNode)
		for _, exp := range pn.exporters {
			link(pn.fanOutNode, exp)
		}
	}
}
// buildComponents instantiates the component behind every node of the already built
// graph g and hooks each one up to the consumers of the nodes it has edges to.
//
// Nodes are visited in reverse topological order, so every downstream node is built
// before any node that feeds it, and nextConsumers always sees built consumers.
// A capabilities node reports MutatesData when the pipeline's fanout node (which
// aggregates the pipeline's exporters) or any of its processors mutates data. This lets
// receivers know whether they need to clone the data.
//
// If the graph contains a cycle, the error describing it comes from cycleErr. Otherwise
// the first component build error is returned.
func (g *Graph) buildComponents(ctx context.Context, set Settings) error {
	nodes, err := topo.Sort(g.componentGraph)
	if err != nil {
		return cycleErr(err, topo.DirectedCyclesIn(g.componentGraph))
	}
	for i := len(nodes) - 1; i >= 0; i-- {
		node := nodes[i]
		// err is only assigned by the component cases below; for capabilities and fanout
		// nodes it keeps its previous value, which is nil at this point.
		switch n := node.(type) {
		case *receiverNode:
			err = n.buildComponent(ctx, set.Telemetry, set.BuildInfo, set.ReceiverBuilder, g.nextConsumers(n.ID()))
		case *processorNode:
			// nextConsumers is guaranteed to be length 1. Either it is the next processor or it is the fanout node for the exporters.
			err = n.buildComponent(ctx, set.Telemetry, set.BuildInfo, set.ProcessorBuilder, g.nextConsumers(n.ID())[0])
		case *exporterNode:
			err = n.buildComponent(ctx, set.Telemetry, set.BuildInfo, set.ExporterBuilder)
		case *connectorNode:
			err = n.buildComponent(ctx, set.Telemetry, set.BuildInfo, set.ConnectorBuilder, g.nextConsumers(n.ID()))
		case *capabilitiesNode:
			capability := consumer.Capabilities{
				// The fanOutNode represents the aggregate capabilities of the exporters in the pipeline.
				MutatesData: g.pipelines[n.pipelineID].fanOutNode.getConsumer().Capabilities().MutatesData,
			}
			for _, proc := range g.pipelines[n.pipelineID].processors {
				capability.MutatesData = capability.MutatesData || proc.(*processorNode).getConsumer().Capabilities().MutatesData
			}
			// The capabilities node has exactly one successor: the first processor, or the
			// fanout node when the pipeline has no processors (see createEdges).
			next := g.nextConsumers(n.ID())[0]
			// Wrap the successor in a signal-specific consumer that reports the computed capabilities.
			switch n.pipelineID.Signal() {
			case pipeline.SignalTraces:
				cc := capabilityconsumer.NewTraces(next.(consumer.Traces), capability)
				n.baseConsumer = cc
				n.ConsumeTracesFunc = cc.ConsumeTraces
			case pipeline.SignalMetrics:
				cc := capabilityconsumer.NewMetrics(next.(consumer.Metrics), capability)
				n.baseConsumer = cc
				n.ConsumeMetricsFunc = cc.ConsumeMetrics
			case pipeline.SignalLogs:
				cc := capabilityconsumer.NewLogs(next.(consumer.Logs), capability)
				n.baseConsumer = cc
				n.ConsumeLogsFunc = cc.ConsumeLogs
			case xpipeline.SignalProfiles:
				cc := capabilityconsumer.NewProfiles(next.(xconsumer.Profiles), capability)
				n.baseConsumer = cc
				n.ConsumeProfilesFunc = cc.ConsumeProfiles
			}
		case *fanOutNode:
			// Successors are every exporter (including exporter-side connectors) of the pipeline.
			nexts := g.nextConsumers(n.ID())
			switch n.pipelineID.Signal() {
			case pipeline.SignalTraces:
				consumers := make([]consumer.Traces, 0, len(nexts))
				for _, next := range nexts {
					consumers = append(consumers, next.(consumer.Traces))
				}
				n.baseConsumer = fanoutconsumer.NewTraces(consumers)
			case pipeline.SignalMetrics:
				consumers := make([]consumer.Metrics, 0, len(nexts))
				for _, next := range nexts {
					consumers = append(consumers, next.(consumer.Metrics))
				}
				n.baseConsumer = fanoutconsumer.NewMetrics(consumers)
			case pipeline.SignalLogs:
				consumers := make([]consumer.Logs, 0, len(nexts))
				for _, next := range nexts {
					consumers = append(consumers, next.(consumer.Logs))
				}
				n.baseConsumer = fanoutconsumer.NewLogs(consumers)
			case xpipeline.SignalProfiles:
				consumers := make([]xconsumer.Profiles, 0, len(nexts))
				for _, next := range nexts {
					consumers = append(consumers, next.(xconsumer.Profiles))
				}
				n.baseConsumer = fanoutconsumer.NewProfiles(consumers)
			}
		}
		if err != nil {
			return err
		}
	}
	return nil
}
// nextConsumers returns the consumers of all nodes that nodeID has an outgoing edge to,
// i.e. the components this node sends data to, in the order the graph yields them.
// Every successor must implement consumerNode.
func (g *Graph) nextConsumers(nodeID int64) []baseConsumer {
	successors := g.componentGraph.From(nodeID)
	result := make([]baseConsumer, 0, successors.Len())
	for successors.Next() {
		cn := successors.Node().(consumerNode)
		result = append(result, cn.getConsumer())
	}
	return result
}
// pipelineNodes is the node-based representation of one pipeline configuration.
// createEdges connects its parts in data-flow order:
// receivers -> capabilities node -> processors -> fanout node -> exporters.
// The receivers and exporters maps may contain connector nodes that are shared with
// other pipelines.
type pipelineNodes struct {
	// Use map to assist with deduplication of connector instances.
	receivers map[int64]graph.Node
	// The node to which receivers emit. Passes through to processors.
	// Easily accessible as the first node in a pipeline.
	*capabilitiesNode
	// The order of processors is very important. Therefore use a slice for processors.
	processors []graph.Node
	// Emits to exporters.
	*fanOutNode
	// Use map to assist with deduplication of connector instances.
	exporters map[int64]graph.Node
}
// StartAll starts every component in the graph and reports each status transition to
// host.Reporter. It returns an error if host is nil or the graph cannot be sorted
// topologically. It also stops at the first component whose Start fails: that
// component is reported as a permanent error, the failure is logged, and the remaining
// components are not started.
func (g *Graph) StartAll(ctx context.Context, host *Host) error {
	if host == nil {
		return errors.New("host cannot be nil")
	}
	nodes, err := topo.Sort(g.componentGraph)
	if err != nil {
		return err
	}
	// Walk the topological order backwards: downstream components start before the
	// upstream components that feed them, so every consumer is ready to consume.
	for idx := range nodes {
		node := nodes[len(nodes)-1-idx]
		comp, isComponent := node.(component.Component)
		if !isComponent {
			// Capabilities and fanout nodes have nothing to start.
			continue
		}
		instanceID := g.instanceIDs[node.ID()]
		host.Reporter.ReportStatus(instanceID, componentstatus.NewEvent(componentstatus.StatusStarting))
		startErr := comp.Start(ctx, &HostWrapper{Host: host, InstanceID: instanceID})
		if startErr == nil {
			host.Reporter.ReportOKIfStarting(instanceID)
			continue
		}
		host.Reporter.ReportStatus(instanceID, componentstatus.NewPermanentErrorEvent(startErr))
		// Stack traces are only attached at DPanic level and above, which keeps them out
		// of this error-level log entry.
		g.telemetry.Logger.WithOptions(zap.AddStacktrace(zap.DPanicLevel)).Error(
			"Failed to start component",
			zap.Error(startErr),
			zap.String("type", instanceID.Kind().String()),
			zap.String("id", instanceID.ComponentID().String()),
		)
		return fmt.Errorf("failed to start %q %s: %w",
			instanceID.ComponentID().String(),
			strings.ToLower(instanceID.Kind().String()),
			startErr,
		)
	}
	return nil
}
// ShutdownAll shuts down every component in the graph and reports status transitions
// through reporter. A failing Shutdown does not abort the loop: the component is
// reported as a permanent error, and all failures are combined into the returned error.
// It returns early only if the graph cannot be sorted topologically.
func (g *Graph) ShutdownAll(ctx context.Context, reporter status.Reporter) error {
	nodes, err := topo.Sort(g.componentGraph)
	if err != nil {
		return err
	}
	// Forward topological order stops upstream components first, giving each one the
	// chance to drain to its consumer before that consumer is stopped.
	var errs error
	for _, node := range nodes {
		comp, isComponent := node.(component.Component)
		if !isComponent {
			// Capabilities and fanout nodes have nothing to shut down.
			continue
		}
		instanceID := g.instanceIDs[node.ID()]
		reporter.ReportStatus(instanceID, componentstatus.NewEvent(componentstatus.StatusStopping))
		if shutdownErr := comp.Shutdown(ctx); shutdownErr != nil {
			errs = multierr.Append(errs, shutdownErr)
			reporter.ReportStatus(instanceID, componentstatus.NewPermanentErrorEvent(shutdownErr))
			continue
		}
		reporter.ReportStatus(instanceID, componentstatus.NewEvent(componentstatus.StatusStopped))
	}
	return errs
}
// GetExporters returns every exporter component in the graph, grouped by signal and
// keyed by component ID. The traces, metrics, logs and profiles signals always have
// an entry, even when they have no exporters. Connectors are left out.
func (g *Graph) GetExporters() map[pipeline.Signal]map[component.ID]component.Component {
	bySignal := map[pipeline.Signal]map[component.ID]component.Component{
		pipeline.SignalTraces:    {},
		pipeline.SignalMetrics:   {},
		pipeline.SignalLogs:      {},
		xpipeline.SignalProfiles: {},
	}
	for _, pn := range g.pipelines {
		for _, node := range pn.exporters {
			// Pipelines also list connectors as exporters; skip them, otherwise individual
			// components can introduce cycles.
			exp, isExporter := g.componentGraph.Node(node.ID()).(*exporterNode)
			if !isExporter {
				continue
			}
			bySignal[exp.pipelineType][exp.componentID] = exp.Component
		}
	}
	return bySignal
}
// cycleErr turns a topo.Sort failure into a readable "cycle detected" error.
//
// err is the error returned by topo.Sort, and cycles is the result of
// topo.DirectedCyclesIn on the same graph. If err is not a topo.Unorderable, or cycles
// holds no usable cycle, err is returned unchanged. Otherwise only the first cycle is
// reported. It is rotated to start at its first connector node, if it has one, and it
// lists the processors and connectors involved (capabilities/fanout nodes are omitted).
// The input slices are never modified.
func cycleErr(err error, cycles [][]graph.Node) error {
	var topoErr topo.Unorderable
	// A reported cycle repeats its first node at the end, so anything shorter than two
	// entries cannot describe a cycle; return err instead of indexing an empty slice.
	if !errors.As(err, &topoErr) || len(cycles) == 0 || len(cycles[0]) < 2 {
		return err
	}
	// There may be multiple cycles, but report only the first one.
	// Drop the trailing duplicate of the first node because we may start from a different node.
	cycle := cycles[0][:len(cycles[0])-1]

	// A cycle is expected to contain a connector. For the sake of consistent error
	// messages, report the cycle starting from the first connector found.
	start := 0
	for i, node := range cycle {
		if _, ok := node.(*connectorNode); ok {
			start = i
			break
		}
	}
	// Build the rotated cycle in a fresh slice, so the caller's backing array is never
	// written through, and repeat the first node at the end to make the cycle explicit.
	ordered := make([]graph.Node, 0, len(cycle)+1)
	ordered = append(ordered, cycle[start:]...)
	ordered = append(ordered, cycle[:start]...)
	ordered = append(ordered, ordered[0])

	componentDetails := make([]string, 0, len(ordered))
	for _, node := range ordered {
		switch n := node.(type) {
		case *processorNode:
			componentDetails = append(componentDetails, fmt.Sprintf("processor %q in pipeline %q", n.componentID, n.pipelineID.String()))
		case *connectorNode:
			componentDetails = append(componentDetails, fmt.Sprintf("connector %q (%s to %s)", n.componentID, n.exprPipelineType, n.rcvrPipelineType))
		default:
			// Skip capabilities/fanout nodes.
		}
	}
	return fmt.Errorf("cycle detected: %s", strings.Join(componentDetails, " -> "))
}
// connectorStability returns the stability level that factory f declares for consuming
// expType data (the connector's exporter side) and emitting recType data (its receiver
// side). Any conversion involving profiles is only available through the
// xconnector.Factory interface. component.StabilityLevelUndefined is returned when f
// does not support the combination or a signal is unknown.
func connectorStability(f connector.Factory, expType, recType pipeline.Signal) component.StabilityLevel {
	if expType == xpipeline.SignalProfiles || recType == xpipeline.SignalProfiles {
		xf, isX := f.(xconnector.Factory)
		if !isX {
			return component.StabilityLevelUndefined
		}
		switch {
		case expType == xpipeline.SignalProfiles && recType == pipeline.SignalTraces:
			return xf.ProfilesToTracesStability()
		case expType == xpipeline.SignalProfiles && recType == pipeline.SignalMetrics:
			return xf.ProfilesToMetricsStability()
		case expType == xpipeline.SignalProfiles && recType == pipeline.SignalLogs:
			return xf.ProfilesToLogsStability()
		case expType == xpipeline.SignalProfiles && recType == xpipeline.SignalProfiles:
			return xf.ProfilesToProfilesStability()
		// In the cases below expType is not profiles, so recType must be.
		case expType == pipeline.SignalTraces:
			return xf.TracesToProfilesStability()
		case expType == pipeline.SignalMetrics:
			return xf.MetricsToProfilesStability()
		case expType == pipeline.SignalLogs:
			return xf.LogsToProfilesStability()
		}
		return component.StabilityLevelUndefined
	}

	switch {
	case expType == pipeline.SignalTraces && recType == pipeline.SignalTraces:
		return f.TracesToTracesStability()
	case expType == pipeline.SignalTraces && recType == pipeline.SignalMetrics:
		return f.TracesToMetricsStability()
	case expType == pipeline.SignalTraces && recType == pipeline.SignalLogs:
		return f.TracesToLogsStability()
	case expType == pipeline.SignalMetrics && recType == pipeline.SignalTraces:
		return f.MetricsToTracesStability()
	case expType == pipeline.SignalMetrics && recType == pipeline.SignalMetrics:
		return f.MetricsToMetricsStability()
	case expType == pipeline.SignalMetrics && recType == pipeline.SignalLogs:
		return f.MetricsToLogsStability()
	case expType == pipeline.SignalLogs && recType == pipeline.SignalTraces:
		return f.LogsToTracesStability()
	case expType == pipeline.SignalLogs && recType == pipeline.SignalMetrics:
		return f.LogsToMetricsStability()
	case expType == pipeline.SignalLogs && recType == pipeline.SignalLogs:
		return f.LogsToLogsStability()
	}
	return component.StabilityLevelUndefined
}
// Compile-time checks that *HostWrapper implements the interfaces it is used through:
// component.Host (StartAll passes it to Component.Start), componentstatus.Reporter, and
// hostcapabilities.ExposeExporters.
var (
	_ component.Host                   = (*HostWrapper)(nil)
	_ componentstatus.Reporter         = (*HostWrapper)(nil)
	_ hostcapabilities.ExposeExporters = (*HostWrapper)(nil)
)
// HostWrapper is the host handed to each component's Start method by StartAll. It
// embeds the shared *Host and carries the InstanceID of the component it was created
// for, so status reported through Report is attributed to that component.
type HostWrapper struct {
	*Host
	// InstanceID identifies the component instance this wrapper was created for.
	InstanceID *componentstatus.InstanceID
}
// Report implements componentstatus.Reporter. It forwards event to the wrapped host's
// status reporter, attributed to the component instance this wrapper was created for.
func (host *HostWrapper) Report(event *componentstatus.Event) {
	host.Host.Reporter.ReportStatus(host.InstanceID, event)
}