opentelemetry-collector/service/service_test.go

910 lines
30 KiB
Go

// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package service
import (
"bufio"
"context"
"errors"
"fmt"
"net/http"
"strings"
"sync"
"testing"
"time"
"github.com/prometheus/common/expfmt"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
config "go.opentelemetry.io/contrib/otelconf/v0.3.0"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componentstatus"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/extension/zpagesextension"
"go.opentelemetry.io/collector/internal/testutil"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pipeline"
"go.opentelemetry.io/collector/pipeline/xpipeline"
"go.opentelemetry.io/collector/service/extensions"
"go.opentelemetry.io/collector/service/internal/builders"
"go.opentelemetry.io/collector/service/internal/promtest"
"go.opentelemetry.io/collector/service/pipelines"
"go.opentelemetry.io/collector/service/telemetry"
)
type labelState int
const (
labelNotPresent labelState = iota
labelSpecificValue
labelAnyValue
)
type labelValue struct {
label string
state labelState
}
type ownMetricsTestCase struct {
name string
userDefinedResource map[string]*string
expectedLabels map[string]labelValue
}
var (
testResourceAttrValue = "resource_attr_test_value"
testInstanceID = "test_instance_id"
testServiceVersion = "2022-05-20"
testServiceName = "test name"
)
// prometheusToOtelConv is used to check that the expected resource labels exist as
// part of the otel resource attributes.
var prometheusToOtelConv = map[string]string{
"service_instance_id": "service.instance.id",
"service_name": "service.name",
"service_version": "service.version",
}
const (
metricsVersion = "test version"
otelCommand = "otelcoltest"
)
func ownMetricsTestCases() []ownMetricsTestCase {
return []ownMetricsTestCase{
{
name: "no resource",
userDefinedResource: nil,
// All labels added to all collector metrics by default are listed below.
// These labels are hard coded here in order to avoid inadvertent changes:
// at this point changing labels should be treated as a breaking changing
// and requires a good justification. The reason is that changes to metric
// names or labels can break alerting, dashboards, etc that are used to
// monitor the Collector in production deployments.
expectedLabels: map[string]labelValue{
"service_instance_id": {state: labelAnyValue},
"service_name": {label: otelCommand, state: labelSpecificValue},
"service_version": {label: metricsVersion, state: labelSpecificValue},
},
},
{
name: "resource with custom attr",
userDefinedResource: map[string]*string{
"custom_resource_attr": &testResourceAttrValue,
},
expectedLabels: map[string]labelValue{
"service_instance_id": {state: labelAnyValue},
"service_name": {label: otelCommand, state: labelSpecificValue},
"service_version": {label: metricsVersion, state: labelSpecificValue},
"custom_resource_attr": {label: "resource_attr_test_value", state: labelSpecificValue},
},
},
{
name: "override service.name",
userDefinedResource: map[string]*string{
"service.name": &testServiceName,
},
expectedLabels: map[string]labelValue{
"service_instance_id": {state: labelAnyValue},
"service_name": {label: testServiceName, state: labelSpecificValue},
"service_version": {label: metricsVersion, state: labelSpecificValue},
},
},
{
name: "suppress service.name",
userDefinedResource: map[string]*string{
"service.name": nil,
},
expectedLabels: map[string]labelValue{
"service_instance_id": {state: labelAnyValue},
"service_name": {state: labelNotPresent},
"service_version": {label: metricsVersion, state: labelSpecificValue},
},
},
{
name: "override service.instance.id",
userDefinedResource: map[string]*string{
"service.instance.id": &testInstanceID,
},
expectedLabels: map[string]labelValue{
"service_instance_id": {label: "test_instance_id", state: labelSpecificValue},
"service_name": {label: otelCommand, state: labelSpecificValue},
"service_version": {label: metricsVersion, state: labelSpecificValue},
},
},
{
name: "suppress service.instance.id",
userDefinedResource: map[string]*string{
"service.instance.id": nil, // nil value in config is used to suppress attributes.
},
expectedLabels: map[string]labelValue{
"service_instance_id": {state: labelNotPresent},
"service_name": {label: otelCommand, state: labelSpecificValue},
"service_version": {label: metricsVersion, state: labelSpecificValue},
},
},
{
name: "override service.version",
userDefinedResource: map[string]*string{
"service.version": &testServiceVersion,
},
expectedLabels: map[string]labelValue{
"service_instance_id": {state: labelAnyValue},
"service_name": {label: otelCommand, state: labelSpecificValue},
"service_version": {label: "2022-05-20", state: labelSpecificValue},
},
},
{
name: "suppress service.version",
userDefinedResource: map[string]*string{
"service.version": nil, // nil value in config is used to suppress attributes.
},
expectedLabels: map[string]labelValue{
"service_instance_id": {state: labelAnyValue},
"service_name": {label: otelCommand, state: labelSpecificValue},
"service_version": {state: labelNotPresent},
},
},
}
}
var (
nopType = component.MustNewType("nop")
wrongType = component.MustNewType("wrong")
)
func TestServiceGetFactory(t *testing.T) {
set := newNopSettings()
srv, err := New(context.Background(), set, newNopConfig())
require.NoError(t, err)
assert.NoError(t, srv.Start(context.Background()))
t.Cleanup(func() {
assert.NoError(t, srv.Shutdown(context.Background()))
})
assert.Nil(t, srv.host.GetFactory(component.KindReceiver, wrongType))
assert.Equal(t, srv.host.Receivers.Factory(nopType), srv.host.GetFactory(component.KindReceiver, nopType))
assert.Nil(t, srv.host.GetFactory(component.KindProcessor, wrongType))
assert.Equal(t, srv.host.Processors.Factory(nopType), srv.host.GetFactory(component.KindProcessor, nopType))
assert.Nil(t, srv.host.GetFactory(component.KindExporter, wrongType))
assert.Equal(t, srv.host.Exporters.Factory(nopType), srv.host.GetFactory(component.KindExporter, nopType))
assert.Nil(t, srv.host.GetFactory(component.KindConnector, wrongType))
assert.Equal(t, srv.host.Connectors.Factory(nopType), srv.host.GetFactory(component.KindConnector, nopType))
assert.Nil(t, srv.host.GetFactory(component.KindExtension, wrongType))
assert.Equal(t, srv.host.Extensions.Factory(nopType), srv.host.GetFactory(component.KindExtension, nopType))
// Try retrieve non existing component.Kind.
assert.Nil(t, srv.host.GetFactory(component.Kind{}, nopType))
}
func TestServiceGetExtensions(t *testing.T) {
srv, err := New(context.Background(), newNopSettings(), newNopConfig())
require.NoError(t, err)
assert.NoError(t, srv.Start(context.Background()))
t.Cleanup(func() {
assert.NoError(t, srv.Shutdown(context.Background()))
})
extMap := srv.host.GetExtensions()
assert.Len(t, extMap, 1)
assert.Contains(t, extMap, component.NewID(nopType))
}
func TestServiceGetExporters(t *testing.T) {
srv, err := New(context.Background(), newNopSettings(), newNopConfig())
require.NoError(t, err)
assert.NoError(t, srv.Start(context.Background()))
t.Cleanup(func() {
assert.NoError(t, srv.Shutdown(context.Background()))
})
//nolint:staticcheck
expMap := srv.host.GetExporters()
v, ok := expMap[pipeline.SignalTraces]
assert.True(t, ok)
assert.NotNil(t, v)
assert.Len(t, expMap, 4)
assert.Len(t, expMap[pipeline.SignalTraces], 1)
assert.Contains(t, expMap[pipeline.SignalTraces], component.NewID(nopType))
assert.Len(t, expMap[pipeline.SignalMetrics], 1)
assert.Contains(t, expMap[pipeline.SignalMetrics], component.NewID(nopType))
assert.Len(t, expMap[pipeline.SignalLogs], 1)
assert.Contains(t, expMap[pipeline.SignalLogs], component.NewID(nopType))
assert.Len(t, expMap[xpipeline.SignalProfiles], 1)
assert.Contains(t, expMap[xpipeline.SignalProfiles], component.NewID(nopType))
}
// TestServiceTelemetryCleanupOnError tests that if newService errors due to an invalid config telemetry is cleaned up
// and another service with a valid config can be started right after.
func TestServiceTelemetryCleanupOnError(t *testing.T) {
invalidCfg := newNopConfig()
invalidCfg.Pipelines[pipeline.NewID(pipeline.SignalTraces)].Processors[0] = component.MustNewID("invalid")
// Create a service with an invalid config and expect an error
_, err := New(context.Background(), newNopSettings(), invalidCfg)
require.Error(t, err)
// Create a service with a valid config and expect no error
srv, err := New(context.Background(), newNopSettings(), newNopConfig())
require.NoError(t, err)
assert.NoError(t, srv.Shutdown(context.Background()))
}
func TestServiceTelemetry(t *testing.T) {
for _, tc := range ownMetricsTestCases() {
t.Run("ipv4_"+tc.name, func(t *testing.T) {
testCollectorStartHelperWithReaders(t, tc, "tcp4")
})
t.Run("ipv6_"+tc.name, func(t *testing.T) {
testCollectorStartHelperWithReaders(t, tc, "tcp6")
})
}
}
func testCollectorStartHelperWithReaders(t *testing.T, tc ownMetricsTestCase, network string) {
var once sync.Once
loggingHookCalled := false
hook := func(zapcore.Entry) error {
once.Do(func() {
loggingHookCalled = true
})
return nil
}
var (
metricsAddr *config.Prometheus
zpagesAddr string
)
switch network {
case "tcp", "tcp4":
metricsAddr = promtest.GetAvailableLocalAddressPrometheus(t)
zpagesAddr = testutil.GetAvailableLocalAddress(t)
case "tcp6":
metricsAddr = promtest.GetAvailableLocalIPv6AddressPrometheus(t)
zpagesAddr = testutil.GetAvailableLocalIPv6Address(t)
}
require.NotZero(t, metricsAddr, "network must be either of tcp, tcp4 or tcp6")
require.NotEmpty(t, zpagesAddr, "network must be either of tcp, tcp4 or tcp6")
set := newNopSettings()
set.BuildInfo = component.BuildInfo{Version: "test version", Command: otelCommand}
set.ExtensionsConfigs = map[component.ID]component.Config{
component.MustNewID("zpages"): &zpagesextension.Config{
ServerConfig: confighttp.ServerConfig{Endpoint: zpagesAddr},
},
}
set.ExtensionsFactories = map[component.Type]extension.Factory{component.MustNewType("zpages"): zpagesextension.NewFactory()}
set.LoggingOptions = []zap.Option{zap.Hooks(hook)}
cfg := newNopConfig()
cfg.Extensions = []component.ID{component.MustNewID("zpages")}
cfg.Telemetry.Metrics.Readers = []config.MetricReader{
{
Pull: &config.PullMetricReader{
Exporter: config.PullMetricExporter{
Prometheus: metricsAddr,
},
},
},
}
cfg.Telemetry.Resource = make(map[string]*string)
// Include resource attributes under the service::telemetry::resource key.
for k, v := range tc.userDefinedResource {
cfg.Telemetry.Resource[k] = v
}
// Create a service, check for metrics, shutdown and repeat to ensure that telemetry can be started/shutdown and started again.
for i := 0; i < 2; i++ {
srv, err := New(context.Background(), set, cfg)
require.NoError(t, err)
require.NoError(t, srv.Start(context.Background()))
// Sleep for 1 second to ensure the http server is started.
time.Sleep(1 * time.Second)
assert.True(t, loggingHookCalled)
assertResourceLabels(t, srv.telemetrySettings.Resource, tc.expectedLabels)
assertMetrics(t, fmt.Sprintf("%s:%d", *metricsAddr.Host, *metricsAddr.Port), tc.expectedLabels)
assertZPages(t, zpagesAddr)
require.NoError(t, srv.Shutdown(context.Background()))
}
}
// TestServiceTelemetryRestart tests that the service correctly restarts the telemetry server.
func TestServiceTelemetryRestart(t *testing.T) {
metricsAddr := promtest.GetAvailableLocalAddressPrometheus(t)
cfg := newNopConfig()
cfg.Telemetry.Metrics.Readers = []config.MetricReader{
{
Pull: &config.PullMetricReader{
Exporter: config.PullMetricExporter{
Prometheus: metricsAddr,
},
},
},
}
// Create a service
srvOne, err := New(context.Background(), newNopSettings(), cfg)
require.NoError(t, err)
// URL of the telemetry service metrics endpoint
telemetryURL := fmt.Sprintf("http://%s:%d/metrics", *metricsAddr.Host, *metricsAddr.Port)
// Start the service
require.NoError(t, srvOne.Start(context.Background()))
// check telemetry server to ensure we get a response
var resp *http.Response
//nolint:gosec
resp, err = http.Get(telemetryURL)
assert.NoError(t, err)
assert.NoError(t, resp.Body.Close())
assert.Equal(t, http.StatusOK, resp.StatusCode)
// Response body must be closed now instead of defer as the test
// restarts the server on the same port. Leaving response open
// leaks a goroutine.
resp.Body.Close()
// Shutdown the service
require.NoError(t, srvOne.Shutdown(context.Background()))
// Create a new service with the same telemetry
srvTwo, err := New(context.Background(), newNopSettings(), cfg)
require.NoError(t, err)
// Start the new service
require.NoError(t, srvTwo.Start(context.Background()))
// check telemetry server to ensure we get a response
require.Eventually(t,
func() bool {
//nolint:gosec
resp, err = http.Get(telemetryURL)
assert.NoError(t, resp.Body.Close())
return err == nil
},
500*time.Millisecond,
100*time.Millisecond,
"Must get a valid response from the service",
)
defer resp.Body.Close()
assert.Equal(t, http.StatusOK, resp.StatusCode)
// Shutdown the new service
assert.NoError(t, srvTwo.Shutdown(context.Background()))
}
func TestExtensionNotificationFailure(t *testing.T) {
set := newNopSettings()
cfg := newNopConfig()
extName := component.MustNewType("configWatcher")
configWatcherExtensionFactory := newConfigWatcherExtensionFactory(extName)
set.ExtensionsConfigs = map[component.ID]component.Config{component.NewID(extName): configWatcherExtensionFactory.CreateDefaultConfig()}
set.ExtensionsFactories = map[component.Type]extension.Factory{extName: configWatcherExtensionFactory}
cfg.Extensions = []component.ID{component.NewID(extName)}
// Create a service
srv, err := New(context.Background(), set, cfg)
require.NoError(t, err)
// Start the service
require.Error(t, srv.Start(context.Background()))
// Shut down the service
require.NoError(t, srv.Shutdown(context.Background()))
}
func TestNilCollectorEffectiveConfig(t *testing.T) {
set := newNopSettings()
set.CollectorConf = nil
cfg := newNopConfig()
extName := component.MustNewType("configWatcher")
configWatcherExtensionFactory := newConfigWatcherExtensionFactory(extName)
set.ExtensionsConfigs = map[component.ID]component.Config{component.NewID(extName): configWatcherExtensionFactory.CreateDefaultConfig()}
set.ExtensionsFactories = map[component.Type]extension.Factory{extName: configWatcherExtensionFactory}
cfg.Extensions = []component.ID{component.NewID(extName)}
// Create a service
srv, err := New(context.Background(), set, cfg)
require.NoError(t, err)
// Start the service
require.NoError(t, srv.Start(context.Background()))
// Shut down the service
require.NoError(t, srv.Shutdown(context.Background()))
}
func TestServiceTelemetryLogger(t *testing.T) {
srv, err := New(context.Background(), newNopSettings(), newNopConfig())
require.NoError(t, err)
assert.NoError(t, srv.Start(context.Background()))
t.Cleanup(func() {
assert.NoError(t, srv.Shutdown(context.Background()))
})
assert.NotNil(t, srv.telemetrySettings.Logger)
}
func TestServiceFatalError(t *testing.T) {
set := newNopSettings()
set.AsyncErrorChannel = make(chan error)
srv, err := New(context.Background(), set, newNopConfig())
require.NoError(t, err)
assert.NoError(t, srv.Start(context.Background()))
t.Cleanup(func() {
assert.NoError(t, srv.Shutdown(context.Background()))
})
go func() {
ev := componentstatus.NewFatalErrorEvent(assert.AnError)
srv.host.NotifyComponentStatusChange(&componentstatus.InstanceID{}, ev)
}()
err = <-srv.host.AsyncErrorChannel
require.ErrorIs(t, err, assert.AnError)
}
func TestServiceInvalidTelemetryConfiguration(t *testing.T) {
tests := []struct {
name string
wantErr error
cfg telemetry.Config
}{
{
name: "log config with processors and invalid config",
cfg: telemetry.Config{
Logs: telemetry.LogsConfig{
Encoding: "console",
Processors: []config.LogRecordProcessor{
{
Batch: &config.BatchLogRecordProcessor{
Exporter: config.LogRecordExporter{
OTLP: &config.OTLP{},
},
},
},
},
},
},
wantErr: errors.New("no valid log exporter"),
},
}
for _, tt := range tests {
set := newNopSettings()
set.AsyncErrorChannel = make(chan error)
cfg := newNopConfig()
cfg.Telemetry = tt.cfg
_, err := New(context.Background(), set, cfg)
if tt.wantErr != nil {
require.ErrorContains(t, err, tt.wantErr.Error())
} else {
require.NoError(t, err)
}
}
}
func assertResourceLabels(t *testing.T, res pcommon.Resource, expectedLabels map[string]labelValue) {
for key, labelValue := range expectedLabels {
lookupKey, ok := prometheusToOtelConv[key]
if !ok {
lookupKey = key
}
value, ok := res.Attributes().Get(lookupKey)
switch labelValue.state {
case labelNotPresent:
assert.False(t, ok)
case labelAnyValue:
assert.True(t, ok)
default:
assert.Equal(t, labelValue.label, value.AsString())
}
}
}
func assertMetrics(t *testing.T, metricsAddr string, expectedLabels map[string]labelValue) {
client := &http.Client{}
resp, err := client.Get("http://" + metricsAddr + "/metrics")
require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, resp.Body.Close())
})
reader := bufio.NewReader(resp.Body)
var parser expfmt.TextParser
parsed, err := parser.TextToMetricFamilies(reader)
require.NoError(t, err)
prefix := "otelcol"
expectedMetrics := map[string]bool{
"target_info": false,
"otelcol_process_memory_rss": false,
"otelcol_process_cpu_seconds": false,
"otelcol_process_runtime_total_sys_memory_bytes": false,
"otelcol_process_runtime_heap_alloc_bytes": false,
"otelcol_process_runtime_total_alloc_bytes": false,
"otelcol_process_uptime": false,
"promhttp_metric_handler_errors_total": false,
}
for metricName, metricFamily := range parsed {
if _, ok := expectedMetrics[metricName]; !ok {
require.True(t, ok, "unexpected metric: %s", metricName)
}
expectedMetrics[metricName] = true
if metricName == "promhttp_metric_handler_errors_total" {
continue
}
if metricName != "target_info" {
// require is used here so test fails with a single message.
require.True(
t,
strings.HasPrefix(metricName, prefix),
"expected prefix %q but string starts with %q",
prefix,
metricName[:len(prefix)+1]+"...")
}
for _, metric := range metricFamily.Metric {
labelMap := map[string]string{}
for _, labelPair := range metric.Label {
labelMap[*labelPair.Name] = *labelPair.Value
}
for k, v := range expectedLabels {
switch v.state {
case labelNotPresent:
_, present := labelMap[k]
assert.Falsef(t, present, "label %q must not be present", k)
case labelSpecificValue:
require.Equalf(t, v.label, labelMap[k], "mandatory label %q value mismatch", k)
case labelAnyValue:
assert.NotEmptyf(t, labelMap[k], "mandatory label %q not present", k)
}
}
}
}
for k, val := range expectedMetrics {
require.True(t, val, "missing metric: %s", k)
}
}
func assertZPages(t *testing.T, zpagesAddr string) {
paths := []string{
"/debug/tracez",
"/debug/pipelinez",
"/debug/servicez",
"/debug/extensionz",
}
testZPagePathFn := func(t *testing.T, path string) {
client := &http.Client{}
resp, err := client.Get("http://" + zpagesAddr + path)
require.NoError(t, err, "error retrieving zpage at %q", path)
assert.Equal(t, http.StatusOK, resp.StatusCode, "unsuccessful zpage %q GET", path)
assert.NoError(t, resp.Body.Close())
}
for _, path := range paths {
testZPagePathFn(t, path)
}
}
func newNopSettings() Settings {
receiversConfigs, receiversFactories := builders.NewNopReceiverConfigsAndFactories()
processorsConfigs, processorsFactories := builders.NewNopProcessorConfigsAndFactories()
connectorsConfigs, connectorsFactories := builders.NewNopConnectorConfigsAndFactories()
exportersConfigs, exportersFactories := builders.NewNopExporterConfigsAndFactories()
extensionsConfigs, extensionsFactories := builders.NewNopExtensionConfigsAndFactories()
return Settings{
BuildInfo: component.NewDefaultBuildInfo(),
CollectorConf: confmap.New(),
ReceiversConfigs: receiversConfigs,
ReceiversFactories: receiversFactories,
ProcessorsConfigs: processorsConfigs,
ProcessorsFactories: processorsFactories,
ExportersConfigs: exportersConfigs,
ExportersFactories: exportersFactories,
ConnectorsConfigs: connectorsConfigs,
ConnectorsFactories: connectorsFactories,
ExtensionsConfigs: extensionsConfigs,
ExtensionsFactories: extensionsFactories,
AsyncErrorChannel: make(chan error),
}
}
func newNopConfig() Config {
return newNopConfigPipelineConfigs(pipelines.Config{
pipeline.NewID(pipeline.SignalTraces): {
Receivers: []component.ID{component.NewID(nopType)},
Processors: []component.ID{component.NewID(nopType)},
Exporters: []component.ID{component.NewID(nopType)},
},
pipeline.NewID(pipeline.SignalMetrics): {
Receivers: []component.ID{component.NewID(nopType)},
Processors: []component.ID{component.NewID(nopType)},
Exporters: []component.ID{component.NewID(nopType)},
},
pipeline.NewID(pipeline.SignalLogs): {
Receivers: []component.ID{component.NewID(nopType)},
Processors: []component.ID{component.NewID(nopType)},
Exporters: []component.ID{component.NewID(nopType)},
},
pipeline.NewID(xpipeline.SignalProfiles): {
Receivers: []component.ID{component.NewID(nopType)},
Processors: []component.ID{component.NewID(nopType)},
Exporters: []component.ID{component.NewID(nopType)},
},
})
}
func newNopConfigPipelineConfigs(pipelineCfgs pipelines.Config) Config {
return Config{
Extensions: extensions.Config{component.NewID(nopType)},
Pipelines: pipelineCfgs,
Telemetry: telemetry.Config{
Logs: telemetry.LogsConfig{
Level: zapcore.InfoLevel,
Development: false,
Encoding: "console",
Sampling: &telemetry.LogsSamplingConfig{
Enabled: true,
Tick: 10 * time.Second,
Initial: 100,
Thereafter: 100,
},
OutputPaths: []string{"stderr"},
ErrorOutputPaths: []string{"stderr"},
DisableCaller: false,
DisableStacktrace: false,
InitialFields: map[string]any(nil),
},
Metrics: telemetry.MetricsConfig{
Level: configtelemetry.LevelBasic,
},
},
}
}
type configWatcherExtension struct{}
func (comp *configWatcherExtension) Start(context.Context, component.Host) error {
return nil
}
func (comp *configWatcherExtension) Shutdown(context.Context) error {
return nil
}
func (comp *configWatcherExtension) NotifyConfig(context.Context, *confmap.Conf) error {
return errors.New("Failed to resolve config")
}
func newConfigWatcherExtensionFactory(name component.Type) extension.Factory {
return extension.NewFactory(
name,
func() component.Config {
return &struct{}{}
},
func(context.Context, extension.Settings, component.Config) (extension.Extension, error) {
return &configWatcherExtension{}, nil
},
component.StabilityLevelDevelopment,
)
}
func newPtr[T int | string](str T) *T {
return &str
}
func TestValidateGraph(t *testing.T) {
testCases := map[string]struct {
connectorCfg map[component.ID]component.Config
receiverCfg map[component.ID]component.Config
exporterCfg map[component.ID]component.Config
pipelinesCfg pipelines.Config
expectedError string
}{
"Valid connector usage": {
connectorCfg: map[component.ID]component.Config{
component.NewIDWithName(nopType, "connector1"): &struct{}{},
},
receiverCfg: map[component.ID]component.Config{
component.NewID(nopType): &struct{}{},
},
exporterCfg: map[component.ID]component.Config{
component.NewID(nopType): &struct{}{},
},
pipelinesCfg: pipelines.Config{
pipeline.NewIDWithName(pipeline.SignalLogs, "in"): {
Receivers: []component.ID{component.NewID(nopType)},
Processors: []component.ID{},
Exporters: []component.ID{component.NewIDWithName(nopType, "connector1")},
},
pipeline.NewIDWithName(pipeline.SignalLogs, "out"): {
Receivers: []component.ID{component.NewIDWithName(nopType, "connector1")},
Processors: []component.ID{},
Exporters: []component.ID{component.NewID(nopType)},
},
},
expectedError: "",
},
"Valid without Connector": {
receiverCfg: map[component.ID]component.Config{
component.NewID(nopType): &struct{}{},
},
exporterCfg: map[component.ID]component.Config{
component.NewID(nopType): &struct{}{},
},
pipelinesCfg: pipelines.Config{
pipeline.NewIDWithName(pipeline.SignalLogs, "in"): {
Receivers: []component.ID{component.NewID(nopType)},
Processors: []component.ID{},
Exporters: []component.ID{component.NewID(nopType)},
},
pipeline.NewIDWithName(pipeline.SignalLogs, "out"): {
Receivers: []component.ID{component.NewID(nopType)},
Processors: []component.ID{},
Exporters: []component.ID{component.NewID(nopType)},
},
},
expectedError: "",
},
"Connector used as exporter but not as receiver": {
connectorCfg: map[component.ID]component.Config{
component.NewIDWithName(nopType, "connector1"): &struct{}{},
},
receiverCfg: map[component.ID]component.Config{
component.NewID(nopType): &struct{}{},
},
exporterCfg: map[component.ID]component.Config{
component.NewID(nopType): &struct{}{},
},
pipelinesCfg: pipelines.Config{
pipeline.NewIDWithName(pipeline.SignalLogs, "in1"): {
Receivers: []component.ID{component.NewID(nopType)},
Processors: []component.ID{},
Exporters: []component.ID{component.NewID(nopType)},
},
pipeline.NewIDWithName(pipeline.SignalLogs, "in2"): {
Receivers: []component.ID{component.NewID(nopType)},
Processors: []component.ID{},
Exporters: []component.ID{component.NewIDWithName(nopType, "connector1")},
},
pipeline.NewIDWithName(pipeline.SignalLogs, "out"): {
Receivers: []component.ID{component.NewID(nopType)},
Processors: []component.ID{},
Exporters: []component.ID{component.NewID(nopType)},
},
},
expectedError: `failed to build pipelines: connector "nop/connector1" used as exporter in [logs/in2] pipeline but not used in any supported receiver pipeline`,
},
"Connector used as receiver but not as exporter": {
connectorCfg: map[component.ID]component.Config{
component.NewIDWithName(nopType, "connector1"): &struct{}{},
},
receiverCfg: map[component.ID]component.Config{
component.NewID(nopType): &struct{}{},
},
exporterCfg: map[component.ID]component.Config{
component.NewID(nopType): &struct{}{},
},
pipelinesCfg: pipelines.Config{
pipeline.NewIDWithName(pipeline.SignalLogs, "in1"): {
Receivers: []component.ID{component.NewID(nopType)},
Processors: []component.ID{},
Exporters: []component.ID{component.NewID(nopType)},
},
pipeline.NewIDWithName(pipeline.SignalLogs, "in2"): {
Receivers: []component.ID{component.NewIDWithName(nopType, "connector1")},
Processors: []component.ID{},
Exporters: []component.ID{component.NewID(nopType)},
},
pipeline.NewIDWithName(pipeline.SignalLogs, "out"): {
Receivers: []component.ID{component.NewID(nopType)},
Processors: []component.ID{},
Exporters: []component.ID{component.NewID(nopType)},
},
},
expectedError: `failed to build pipelines: connector "nop/connector1" used as receiver in [logs/in2] pipeline but not used in any supported exporter pipeline`,
},
"Connector creates direct cycle between pipelines": {
connectorCfg: map[component.ID]component.Config{
component.NewIDWithName(nopType, "forward"): &struct{}{},
},
receiverCfg: map[component.ID]component.Config{
component.NewID(nopType): &struct{}{},
},
exporterCfg: map[component.ID]component.Config{
component.NewID(nopType): &struct{}{},
},
pipelinesCfg: pipelines.Config{
pipeline.NewIDWithName(pipeline.SignalTraces, "in"): {
Receivers: []component.ID{component.NewIDWithName(nopType, "forward")},
Processors: []component.ID{},
Exporters: []component.ID{component.NewIDWithName(nopType, "forward")},
},
pipeline.NewIDWithName(pipeline.SignalTraces, "out"): {
Receivers: []component.ID{component.NewIDWithName(nopType, "forward")},
Processors: []component.ID{},
Exporters: []component.ID{component.NewIDWithName(nopType, "forward")},
},
},
expectedError: `failed to build pipelines: cycle detected: connector "nop/forward" (traces to traces) -> connector "nop/forward" (traces to traces)`,
},
}
_, connectorsFactories := builders.NewNopConnectorConfigsAndFactories()
_, receiversFactories := builders.NewNopReceiverConfigsAndFactories()
_, exportersFactories := builders.NewNopExporterConfigsAndFactories()
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
settings := Settings{
ConnectorsConfigs: tc.connectorCfg,
ConnectorsFactories: connectorsFactories,
ReceiversConfigs: tc.receiverCfg,
ReceiversFactories: receiversFactories,
ExportersConfigs: tc.exporterCfg,
ExportersFactories: exportersFactories,
}
cfg := Config{
Pipelines: tc.pipelinesCfg,
}
err := Validate(context.Background(), settings, cfg)
if tc.expectedError == "" {
require.NoError(t, err)
} else {
require.Error(t, err)
assert.Equal(t, tc.expectedError, err.Error())
}
})
}
}