633 lines
22 KiB
Go
633 lines
22 KiB
Go
// Copyright The OpenTelemetry Authors
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
|
|
// Package graph contains the internal graph representation of the pipelines.
|
|
//
|
|
// [Build] is the constructor for a [Graph] object. The method calls out to helpers that transform the graph from a config
|
|
// to a DAG of components. The configuration undergoes additional validation here as well, and is used to instantiate
|
|
// the components of the pipeline.
|
|
//
|
|
// [Graph.StartAll] starts all components in each pipeline.
|
|
//
|
|
// [Graph.ShutdownAll] stops all components in each pipeline.
|
|
package graph // import "go.opentelemetry.io/collector/service/internal/graph"
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
|
|
"go.uber.org/multierr"
|
|
"go.uber.org/zap"
|
|
"gonum.org/v1/gonum/graph"
|
|
"gonum.org/v1/gonum/graph/simple"
|
|
"gonum.org/v1/gonum/graph/topo"
|
|
|
|
"go.opentelemetry.io/collector/component"
|
|
"go.opentelemetry.io/collector/component/componentstatus"
|
|
"go.opentelemetry.io/collector/connector"
|
|
"go.opentelemetry.io/collector/connector/xconnector"
|
|
"go.opentelemetry.io/collector/consumer"
|
|
"go.opentelemetry.io/collector/consumer/xconsumer"
|
|
"go.opentelemetry.io/collector/internal/fanoutconsumer"
|
|
"go.opentelemetry.io/collector/pipeline"
|
|
"go.opentelemetry.io/collector/pipeline/xpipeline"
|
|
"go.opentelemetry.io/collector/service/hostcapabilities"
|
|
"go.opentelemetry.io/collector/service/internal/builders"
|
|
"go.opentelemetry.io/collector/service/internal/capabilityconsumer"
|
|
"go.opentelemetry.io/collector/service/internal/status"
|
|
"go.opentelemetry.io/collector/service/pipelines"
|
|
)
|
|
|
|
// Settings holds configuration for building builtPipelines.
|
|
type Settings struct {
|
|
Telemetry component.TelemetrySettings
|
|
BuildInfo component.BuildInfo
|
|
|
|
ReceiverBuilder *builders.ReceiverBuilder
|
|
ProcessorBuilder *builders.ProcessorBuilder
|
|
ExporterBuilder *builders.ExporterBuilder
|
|
ConnectorBuilder *builders.ConnectorBuilder
|
|
|
|
// PipelineConfigs is a map of component.ID to PipelineConfig.
|
|
PipelineConfigs pipelines.Config
|
|
|
|
ReportStatus status.ServiceStatusFunc
|
|
}
|
|
|
|
type Graph struct {
|
|
// All component instances represented as nodes, with directed edges indicating data flow.
|
|
componentGraph *simple.DirectedGraph
|
|
|
|
// Keep track of how nodes relate to pipelines, so we can declare edges in the graph.
|
|
pipelines map[pipeline.ID]*pipelineNodes
|
|
|
|
// Keep track of status source per node
|
|
instanceIDs map[int64]*componentstatus.InstanceID
|
|
|
|
telemetry component.TelemetrySettings
|
|
}
|
|
|
|
// Build builds a full pipeline graph.
|
|
// Build also validates the configuration of the pipelines and does the actual initialization of each Component in the Graph.
|
|
func Build(ctx context.Context, set Settings) (*Graph, error) {
|
|
pipelines := &Graph{
|
|
componentGraph: simple.NewDirectedGraph(),
|
|
pipelines: make(map[pipeline.ID]*pipelineNodes, len(set.PipelineConfigs)),
|
|
instanceIDs: make(map[int64]*componentstatus.InstanceID),
|
|
telemetry: set.Telemetry,
|
|
}
|
|
for pipelineID := range set.PipelineConfigs {
|
|
pipelines.pipelines[pipelineID] = &pipelineNodes{
|
|
receivers: make(map[int64]graph.Node),
|
|
exporters: make(map[int64]graph.Node),
|
|
}
|
|
}
|
|
if err := pipelines.createNodes(set); err != nil {
|
|
return nil, err
|
|
}
|
|
pipelines.createEdges()
|
|
err := pipelines.buildComponents(ctx, set)
|
|
return pipelines, err
|
|
}
|
|
|
|
// Creates a node for each instance of a component and adds it to the graph.
|
|
// Validates that connectors are configured to export and receive correctly.
|
|
func (g *Graph) createNodes(set Settings) error {
|
|
// Build a list of all connectors for easy reference.
|
|
connectors := make(map[component.ID]struct{})
|
|
|
|
// Keep track of connectors and where they are used. (map[connectorID][]pipelineID).
|
|
connectorsAsExporter := make(map[component.ID][]pipeline.ID)
|
|
connectorsAsReceiver := make(map[component.ID][]pipeline.ID)
|
|
|
|
// Build each pipelineNodes struct for each pipeline by parsing the pipelineCfg.
|
|
// Also populates the connectors, connectorsAsExporter and connectorsAsReceiver maps.
|
|
for pipelineID, pipelineCfg := range set.PipelineConfigs {
|
|
pipe := g.pipelines[pipelineID]
|
|
for _, recvID := range pipelineCfg.Receivers {
|
|
// Checks if this receiver is a connector or a regular receiver.
|
|
if set.ConnectorBuilder.IsConfigured(recvID) {
|
|
connectors[recvID] = struct{}{}
|
|
connectorsAsReceiver[recvID] = append(connectorsAsReceiver[recvID], pipelineID)
|
|
continue
|
|
}
|
|
rcvrNode := g.createReceiver(pipelineID, recvID)
|
|
pipe.receivers[rcvrNode.ID()] = rcvrNode
|
|
}
|
|
|
|
pipe.capabilitiesNode = newCapabilitiesNode(pipelineID)
|
|
|
|
for _, procID := range pipelineCfg.Processors {
|
|
procNode := g.createProcessor(pipelineID, procID)
|
|
pipe.processors = append(pipe.processors, procNode)
|
|
}
|
|
|
|
pipe.fanOutNode = newFanOutNode(pipelineID)
|
|
|
|
for _, exprID := range pipelineCfg.Exporters {
|
|
if set.ConnectorBuilder.IsConfigured(exprID) {
|
|
connectors[exprID] = struct{}{}
|
|
connectorsAsExporter[exprID] = append(connectorsAsExporter[exprID], pipelineID)
|
|
continue
|
|
}
|
|
expNode := g.createExporter(pipelineID, exprID)
|
|
pipe.exporters[expNode.ID()] = expNode
|
|
}
|
|
}
|
|
|
|
for connID := range connectors {
|
|
factory := set.ConnectorBuilder.Factory(connID.Type())
|
|
if factory == nil {
|
|
return fmt.Errorf("connector factory not available for: %q", connID.Type())
|
|
}
|
|
connFactory := factory.(connector.Factory)
|
|
|
|
expTypes := make(map[pipeline.Signal]bool)
|
|
for _, pipelineID := range connectorsAsExporter[connID] {
|
|
// The presence of each key indicates how the connector is used as an exporter.
|
|
// The value is initially set to false. Later we will set the value to true *if* we
|
|
// confirm that there is a supported corresponding use as a receiver.
|
|
expTypes[pipelineID.Signal()] = false
|
|
}
|
|
recTypes := make(map[pipeline.Signal]bool)
|
|
for _, pipelineID := range connectorsAsReceiver[connID] {
|
|
// The presence of each key indicates how the connector is used as a receiver.
|
|
// The value is initially set to false. Later we will set the value to true *if* we
|
|
// confirm that there is a supported corresponding use as an exporter.
|
|
recTypes[pipelineID.Signal()] = false
|
|
}
|
|
|
|
for expType := range expTypes {
|
|
for recType := range recTypes {
|
|
// Typechecks the connector's receiving and exporting datatypes.
|
|
if connectorStability(connFactory, expType, recType) != component.StabilityLevelUndefined {
|
|
expTypes[expType] = true
|
|
recTypes[recType] = true
|
|
}
|
|
}
|
|
}
|
|
|
|
for expType, supportedUse := range expTypes {
|
|
if supportedUse {
|
|
continue
|
|
}
|
|
return fmt.Errorf("connector %q used as exporter in %v pipeline but not used in any supported receiver pipeline", connID, formatPipelineNamesWithSignal(connectorsAsExporter[connID], expType))
|
|
}
|
|
for recType, supportedUse := range recTypes {
|
|
if supportedUse {
|
|
continue
|
|
}
|
|
return fmt.Errorf("connector %q used as receiver in %v pipeline but not used in any supported exporter pipeline", connID, formatPipelineNamesWithSignal(connectorsAsReceiver[connID], recType))
|
|
}
|
|
|
|
for _, eID := range connectorsAsExporter[connID] {
|
|
for _, rID := range connectorsAsReceiver[connID] {
|
|
if connectorStability(connFactory, eID.Signal(), rID.Signal()) == component.StabilityLevelUndefined {
|
|
// Connector is not supported for this combination, but we know it is used correctly elsewhere
|
|
continue
|
|
}
|
|
connNode := g.createConnector(eID, rID, connID)
|
|
|
|
g.pipelines[eID].exporters[connNode.ID()] = connNode
|
|
g.pipelines[rID].receivers[connNode.ID()] = connNode
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// formatPipelineNamesWithSignal formats pipeline name with signal as "signal[/name]" format.
|
|
func formatPipelineNamesWithSignal(pipelineIDs []pipeline.ID, signal pipeline.Signal) []string {
|
|
var formatted []string
|
|
for _, pid := range pipelineIDs {
|
|
if pid.Signal() == signal {
|
|
formatted = append(formatted, pid.String())
|
|
}
|
|
}
|
|
return formatted
|
|
}
|
|
|
|
func (g *Graph) createReceiver(pipelineID pipeline.ID, recvID component.ID) *receiverNode {
|
|
rcvrNode := newReceiverNode(pipelineID.Signal(), recvID)
|
|
if node := g.componentGraph.Node(rcvrNode.ID()); node != nil {
|
|
instanceID := g.instanceIDs[node.ID()]
|
|
g.instanceIDs[node.ID()] = instanceID.WithPipelines(pipelineID)
|
|
return node.(*receiverNode)
|
|
}
|
|
g.componentGraph.AddNode(rcvrNode)
|
|
g.instanceIDs[rcvrNode.ID()] = componentstatus.NewInstanceID(
|
|
recvID, component.KindReceiver, pipelineID,
|
|
)
|
|
return rcvrNode
|
|
}
|
|
|
|
func (g *Graph) createProcessor(pipelineID pipeline.ID, procID component.ID) *processorNode {
|
|
procNode := newProcessorNode(pipelineID, procID)
|
|
g.componentGraph.AddNode(procNode)
|
|
g.instanceIDs[procNode.ID()] = componentstatus.NewInstanceID(
|
|
procID, component.KindProcessor, pipelineID,
|
|
)
|
|
return procNode
|
|
}
|
|
|
|
func (g *Graph) createExporter(pipelineID pipeline.ID, exprID component.ID) *exporterNode {
|
|
expNode := newExporterNode(pipelineID.Signal(), exprID)
|
|
if node := g.componentGraph.Node(expNode.ID()); node != nil {
|
|
instanceID := g.instanceIDs[expNode.ID()]
|
|
g.instanceIDs[expNode.ID()] = instanceID.WithPipelines(pipelineID)
|
|
return node.(*exporterNode)
|
|
}
|
|
g.componentGraph.AddNode(expNode)
|
|
g.instanceIDs[expNode.ID()] = componentstatus.NewInstanceID(
|
|
expNode.componentID, component.KindExporter, pipelineID,
|
|
)
|
|
return expNode
|
|
}
|
|
|
|
func (g *Graph) createConnector(exprPipelineID, rcvrPipelineID pipeline.ID, connID component.ID) *connectorNode {
|
|
connNode := newConnectorNode(exprPipelineID.Signal(), rcvrPipelineID.Signal(), connID)
|
|
if node := g.componentGraph.Node(connNode.ID()); node != nil {
|
|
instanceID := g.instanceIDs[connNode.ID()]
|
|
g.instanceIDs[connNode.ID()] = instanceID.WithPipelines(exprPipelineID, rcvrPipelineID)
|
|
return node.(*connectorNode)
|
|
}
|
|
g.componentGraph.AddNode(connNode)
|
|
g.instanceIDs[connNode.ID()] = componentstatus.NewInstanceID(
|
|
connNode.componentID, component.KindConnector, exprPipelineID, rcvrPipelineID,
|
|
)
|
|
return connNode
|
|
}
|
|
|
|
// Iterates through the pipelines and creates edges between components.
|
|
func (g *Graph) createEdges() {
|
|
for _, pg := range g.pipelines {
|
|
// Draw edges from each receiver to the capability node.
|
|
for _, receiver := range pg.receivers {
|
|
g.componentGraph.SetEdge(g.componentGraph.NewEdge(receiver, pg.capabilitiesNode))
|
|
}
|
|
|
|
// Iterates through processors, chaining them together. starts with the capabilities node.
|
|
var from, to graph.Node
|
|
from = pg.capabilitiesNode
|
|
for _, processor := range pg.processors {
|
|
to = processor
|
|
g.componentGraph.SetEdge(g.componentGraph.NewEdge(from, to))
|
|
from = processor
|
|
}
|
|
// Always inserts a fanout node before any exporters. If there is only one
|
|
// exporter, the fanout node is still created and acts as a noop.
|
|
to = pg.fanOutNode
|
|
g.componentGraph.SetEdge(g.componentGraph.NewEdge(from, to))
|
|
|
|
for _, exporter := range pg.exporters {
|
|
g.componentGraph.SetEdge(g.componentGraph.NewEdge(pg.fanOutNode, exporter))
|
|
}
|
|
}
|
|
}
|
|
|
|
// Uses the already built graph g to instantiate the actual components for each component of each pipeline.
|
|
// Handles calling the factories for each component - and hooking up each component to the next.
|
|
// Also calculates whether each pipeline mutates data so the receiver can know whether it needs to clone the data.
|
|
func (g *Graph) buildComponents(ctx context.Context, set Settings) error {
|
|
nodes, err := topo.Sort(g.componentGraph)
|
|
if err != nil {
|
|
return cycleErr(err, topo.DirectedCyclesIn(g.componentGraph))
|
|
}
|
|
|
|
for i := len(nodes) - 1; i >= 0; i-- {
|
|
node := nodes[i]
|
|
|
|
switch n := node.(type) {
|
|
case *receiverNode:
|
|
err = n.buildComponent(ctx, set.Telemetry, set.BuildInfo, set.ReceiverBuilder, g.nextConsumers(n.ID()))
|
|
case *processorNode:
|
|
// nextConsumers is guaranteed to be length 1. Either it is the next processor or it is the fanout node for the exporters.
|
|
err = n.buildComponent(ctx, set.Telemetry, set.BuildInfo, set.ProcessorBuilder, g.nextConsumers(n.ID())[0])
|
|
case *exporterNode:
|
|
err = n.buildComponent(ctx, set.Telemetry, set.BuildInfo, set.ExporterBuilder)
|
|
case *connectorNode:
|
|
err = n.buildComponent(ctx, set.Telemetry, set.BuildInfo, set.ConnectorBuilder, g.nextConsumers(n.ID()))
|
|
case *capabilitiesNode:
|
|
capability := consumer.Capabilities{
|
|
// The fanOutNode represents the aggregate capabilities of the exporters in the pipeline.
|
|
MutatesData: g.pipelines[n.pipelineID].fanOutNode.getConsumer().Capabilities().MutatesData,
|
|
}
|
|
for _, proc := range g.pipelines[n.pipelineID].processors {
|
|
capability.MutatesData = capability.MutatesData || proc.(*processorNode).getConsumer().Capabilities().MutatesData
|
|
}
|
|
next := g.nextConsumers(n.ID())[0]
|
|
switch n.pipelineID.Signal() {
|
|
case pipeline.SignalTraces:
|
|
cc := capabilityconsumer.NewTraces(next.(consumer.Traces), capability)
|
|
n.baseConsumer = cc
|
|
n.ConsumeTracesFunc = cc.ConsumeTraces
|
|
case pipeline.SignalMetrics:
|
|
cc := capabilityconsumer.NewMetrics(next.(consumer.Metrics), capability)
|
|
n.baseConsumer = cc
|
|
n.ConsumeMetricsFunc = cc.ConsumeMetrics
|
|
case pipeline.SignalLogs:
|
|
cc := capabilityconsumer.NewLogs(next.(consumer.Logs), capability)
|
|
n.baseConsumer = cc
|
|
n.ConsumeLogsFunc = cc.ConsumeLogs
|
|
case xpipeline.SignalProfiles:
|
|
cc := capabilityconsumer.NewProfiles(next.(xconsumer.Profiles), capability)
|
|
n.baseConsumer = cc
|
|
n.ConsumeProfilesFunc = cc.ConsumeProfiles
|
|
}
|
|
case *fanOutNode:
|
|
nexts := g.nextConsumers(n.ID())
|
|
switch n.pipelineID.Signal() {
|
|
case pipeline.SignalTraces:
|
|
consumers := make([]consumer.Traces, 0, len(nexts))
|
|
for _, next := range nexts {
|
|
consumers = append(consumers, next.(consumer.Traces))
|
|
}
|
|
n.baseConsumer = fanoutconsumer.NewTraces(consumers)
|
|
case pipeline.SignalMetrics:
|
|
consumers := make([]consumer.Metrics, 0, len(nexts))
|
|
for _, next := range nexts {
|
|
consumers = append(consumers, next.(consumer.Metrics))
|
|
}
|
|
n.baseConsumer = fanoutconsumer.NewMetrics(consumers)
|
|
case pipeline.SignalLogs:
|
|
consumers := make([]consumer.Logs, 0, len(nexts))
|
|
for _, next := range nexts {
|
|
consumers = append(consumers, next.(consumer.Logs))
|
|
}
|
|
n.baseConsumer = fanoutconsumer.NewLogs(consumers)
|
|
case xpipeline.SignalProfiles:
|
|
consumers := make([]xconsumer.Profiles, 0, len(nexts))
|
|
for _, next := range nexts {
|
|
consumers = append(consumers, next.(xconsumer.Profiles))
|
|
}
|
|
n.baseConsumer = fanoutconsumer.NewProfiles(consumers)
|
|
}
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Find all nodes
|
|
func (g *Graph) nextConsumers(nodeID int64) []baseConsumer {
|
|
nextNodes := g.componentGraph.From(nodeID)
|
|
nexts := make([]baseConsumer, 0, nextNodes.Len())
|
|
for nextNodes.Next() {
|
|
nexts = append(nexts, nextNodes.Node().(consumerNode).getConsumer())
|
|
}
|
|
return nexts
|
|
}
|
|
|
|
// A node-based representation of a pipeline configuration.
|
|
type pipelineNodes struct {
|
|
// Use map to assist with deduplication of connector instances.
|
|
receivers map[int64]graph.Node
|
|
|
|
// The node to which receivers emit. Passes through to processors.
|
|
// Easily accessible as the first node in a pipeline.
|
|
*capabilitiesNode
|
|
|
|
// The order of processors is very important. Therefore use a slice for processors.
|
|
processors []graph.Node
|
|
|
|
// Emits to exporters.
|
|
*fanOutNode
|
|
|
|
// Use map to assist with deduplication of connector instances.
|
|
exporters map[int64]graph.Node
|
|
}
|
|
|
|
func (g *Graph) StartAll(ctx context.Context, host *Host) error {
|
|
if host == nil {
|
|
return errors.New("host cannot be nil")
|
|
}
|
|
|
|
nodes, err := topo.Sort(g.componentGraph)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Start in reverse topological order so that downstream components
|
|
// are started before upstream components. This ensures that each
|
|
// component's consumer is ready to consume.
|
|
for i := len(nodes) - 1; i >= 0; i-- {
|
|
node := nodes[i]
|
|
comp, ok := node.(component.Component)
|
|
|
|
if !ok {
|
|
// Skip capabilities/fanout nodes
|
|
continue
|
|
}
|
|
|
|
instanceID := g.instanceIDs[node.ID()]
|
|
host.Reporter.ReportStatus(
|
|
instanceID,
|
|
componentstatus.NewEvent(componentstatus.StatusStarting),
|
|
)
|
|
|
|
if compErr := comp.Start(ctx, &HostWrapper{Host: host, InstanceID: instanceID}); compErr != nil {
|
|
host.Reporter.ReportStatus(
|
|
instanceID,
|
|
componentstatus.NewPermanentErrorEvent(compErr),
|
|
)
|
|
// We log with zap.AddStacktrace(zap.DPanicLevel) to avoid adding the stack trace to the error log
|
|
g.telemetry.Logger.WithOptions(zap.AddStacktrace(zap.DPanicLevel)).
|
|
Error("Failed to start component",
|
|
zap.Error(compErr),
|
|
zap.String("type", instanceID.Kind().String()),
|
|
zap.String("id", instanceID.ComponentID().String()),
|
|
)
|
|
return fmt.Errorf("failed to start %q %s: %w", instanceID.ComponentID().String(), strings.ToLower(instanceID.Kind().String()), compErr)
|
|
}
|
|
|
|
host.Reporter.ReportOKIfStarting(instanceID)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (g *Graph) ShutdownAll(ctx context.Context, reporter status.Reporter) error {
|
|
nodes, err := topo.Sort(g.componentGraph)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Stop in topological order so that upstream components
|
|
// are stopped before downstream components. This ensures
|
|
// that each component has a chance to drain to its consumer
|
|
// before the consumer is stopped.
|
|
var errs error
|
|
for i := 0; i < len(nodes); i++ {
|
|
node := nodes[i]
|
|
comp, ok := node.(component.Component)
|
|
|
|
if !ok {
|
|
// Skip capabilities/fanout nodes
|
|
continue
|
|
}
|
|
|
|
instanceID := g.instanceIDs[node.ID()]
|
|
reporter.ReportStatus(
|
|
instanceID,
|
|
componentstatus.NewEvent(componentstatus.StatusStopping),
|
|
)
|
|
|
|
if compErr := comp.Shutdown(ctx); compErr != nil {
|
|
errs = multierr.Append(errs, compErr)
|
|
reporter.ReportStatus(
|
|
instanceID,
|
|
componentstatus.NewPermanentErrorEvent(compErr),
|
|
)
|
|
continue
|
|
}
|
|
|
|
reporter.ReportStatus(
|
|
instanceID,
|
|
componentstatus.NewEvent(componentstatus.StatusStopped),
|
|
)
|
|
}
|
|
return errs
|
|
}
|
|
|
|
func (g *Graph) GetExporters() map[pipeline.Signal]map[component.ID]component.Component {
|
|
exportersMap := make(map[pipeline.Signal]map[component.ID]component.Component)
|
|
exportersMap[pipeline.SignalTraces] = make(map[component.ID]component.Component)
|
|
exportersMap[pipeline.SignalMetrics] = make(map[component.ID]component.Component)
|
|
exportersMap[pipeline.SignalLogs] = make(map[component.ID]component.Component)
|
|
exportersMap[xpipeline.SignalProfiles] = make(map[component.ID]component.Component)
|
|
|
|
for _, pg := range g.pipelines {
|
|
for _, expNode := range pg.exporters {
|
|
// Skip connectors, otherwise individual components can introduce cycles
|
|
if expNode, ok := g.componentGraph.Node(expNode.ID()).(*exporterNode); ok {
|
|
exportersMap[expNode.pipelineType][expNode.componentID] = expNode.Component
|
|
}
|
|
}
|
|
}
|
|
return exportersMap
|
|
}
|
|
|
|
func cycleErr(err error, cycles [][]graph.Node) error {
|
|
var topoErr topo.Unorderable
|
|
if !errors.As(err, &topoErr) || len(cycles) == 0 || len(cycles[0]) == 0 {
|
|
return err
|
|
}
|
|
|
|
// There may be multiple cycles, but report only the first one.
|
|
cycle := cycles[0]
|
|
|
|
// The last node is a duplicate of the first node.
|
|
// Remove it because we may start from a different node.
|
|
cycle = cycle[:len(cycle)-1]
|
|
|
|
// A cycle always contains a connector. For the sake of consistent
|
|
// error messages report the cycle starting from a connector.
|
|
for i := 0; i < len(cycle); i++ {
|
|
if _, ok := cycle[i].(*connectorNode); ok {
|
|
cycle = append(cycle[i:], cycle[:i]...)
|
|
break
|
|
}
|
|
}
|
|
|
|
// Repeat the first node at the end to clarify the cycle
|
|
cycle = append(cycle, cycle[0])
|
|
|
|
// Build the error message
|
|
componentDetails := make([]string, 0, len(cycle))
|
|
for _, node := range cycle {
|
|
switch n := node.(type) {
|
|
case *processorNode:
|
|
componentDetails = append(componentDetails, fmt.Sprintf("processor %q in pipeline %q", n.componentID, n.pipelineID.String()))
|
|
case *connectorNode:
|
|
componentDetails = append(componentDetails, fmt.Sprintf("connector %q (%s to %s)", n.componentID, n.exprPipelineType, n.rcvrPipelineType))
|
|
default:
|
|
continue // skip capabilities/fanout nodes
|
|
}
|
|
}
|
|
return fmt.Errorf("cycle detected: %s", strings.Join(componentDetails, " -> "))
|
|
}
|
|
|
|
func connectorStability(f connector.Factory, expType, recType pipeline.Signal) component.StabilityLevel {
|
|
switch expType {
|
|
case pipeline.SignalTraces:
|
|
switch recType {
|
|
case pipeline.SignalTraces:
|
|
return f.TracesToTracesStability()
|
|
case pipeline.SignalMetrics:
|
|
return f.TracesToMetricsStability()
|
|
case pipeline.SignalLogs:
|
|
return f.TracesToLogsStability()
|
|
case xpipeline.SignalProfiles:
|
|
fprof, ok := f.(xconnector.Factory)
|
|
if !ok {
|
|
return component.StabilityLevelUndefined
|
|
}
|
|
return fprof.TracesToProfilesStability()
|
|
}
|
|
case pipeline.SignalMetrics:
|
|
switch recType {
|
|
case pipeline.SignalTraces:
|
|
return f.MetricsToTracesStability()
|
|
case pipeline.SignalMetrics:
|
|
return f.MetricsToMetricsStability()
|
|
case pipeline.SignalLogs:
|
|
return f.MetricsToLogsStability()
|
|
case xpipeline.SignalProfiles:
|
|
fprof, ok := f.(xconnector.Factory)
|
|
if !ok {
|
|
return component.StabilityLevelUndefined
|
|
}
|
|
return fprof.MetricsToProfilesStability()
|
|
}
|
|
case pipeline.SignalLogs:
|
|
switch recType {
|
|
case pipeline.SignalTraces:
|
|
return f.LogsToTracesStability()
|
|
case pipeline.SignalMetrics:
|
|
return f.LogsToMetricsStability()
|
|
case pipeline.SignalLogs:
|
|
return f.LogsToLogsStability()
|
|
case xpipeline.SignalProfiles:
|
|
fprof, ok := f.(xconnector.Factory)
|
|
if !ok {
|
|
return component.StabilityLevelUndefined
|
|
}
|
|
return fprof.LogsToProfilesStability()
|
|
}
|
|
case xpipeline.SignalProfiles:
|
|
fprof, ok := f.(xconnector.Factory)
|
|
if !ok {
|
|
return component.StabilityLevelUndefined
|
|
}
|
|
switch recType {
|
|
case pipeline.SignalTraces:
|
|
return fprof.ProfilesToTracesStability()
|
|
case pipeline.SignalMetrics:
|
|
return fprof.ProfilesToMetricsStability()
|
|
case pipeline.SignalLogs:
|
|
return fprof.ProfilesToLogsStability()
|
|
case xpipeline.SignalProfiles:
|
|
return fprof.ProfilesToProfilesStability()
|
|
}
|
|
}
|
|
return component.StabilityLevelUndefined
|
|
}
|
|
|
|
var (
|
|
_ component.Host = (*HostWrapper)(nil)
|
|
_ componentstatus.Reporter = (*HostWrapper)(nil)
|
|
_ hostcapabilities.ExposeExporters = (*HostWrapper)(nil)
|
|
)
|
|
|
|
type HostWrapper struct {
|
|
*Host
|
|
InstanceID *componentstatus.InstanceID
|
|
}
|
|
|
|
func (host *HostWrapper) Report(event *componentstatus.Event) {
|
|
host.Reporter.ReportStatus(host.InstanceID, event)
|
|
}
|