Handle factories returning nil objects without error (#381)

The factories can be implemented by 3rd party and this gives a better error message instead of a crash.
This commit is contained in:
Paulo Janotti 2019-10-07 19:45:01 -07:00 committed by Tigran Najaryan
parent ecd2d5db38
commit 3a0aa77705
12 changed files with 493 additions and 26 deletions

View File

@ -17,6 +17,9 @@ package processortest
import (
"context"
"go.uber.org/zap"
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
"github.com/open-telemetry/opentelemetry-collector/consumer"
"github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata"
"github.com/open-telemetry/opentelemetry-collector/processor"
@ -51,3 +54,44 @@ func NewNopTraceProcessor(nextTraceProcessor consumer.TraceConsumer) consumer.Tr
func NewNopMetricsProcessor(nextMetricsProcessor consumer.MetricsConsumer) consumer.MetricsConsumer {
return &nopProcessor{nextMetricsProcessor: nextMetricsProcessor}
}
// NopProcessorFactory allows the creation of the no operation processor via
// config, so it can be used in tests that cannot create it directly.
type NopProcessorFactory struct{}
var _ processor.Factory = (*NopProcessorFactory)(nil)
// Type gets the type of the Processor created by this factory.
func (npf *NopProcessorFactory) Type() string {
return "nop"
}
// CreateDefaultConfig creates the default configuration for the Processor.
func (npf *NopProcessorFactory) CreateDefaultConfig() configmodels.Processor {
return &configmodels.ProcessorSettings{
TypeVal: npf.Type(),
NameVal: npf.Type(),
}
}
// CreateTraceProcessor creates a trace processor based on this config.
// If the processor type does not support tracing or if the config is not valid
// error will be returned instead.
func (npf *NopProcessorFactory) CreateTraceProcessor(
logger *zap.Logger,
nextConsumer consumer.TraceConsumer,
cfg configmodels.Processor,
) (processor.TraceProcessor, error) {
return &nopProcessor{nextTraceProcessor: nextConsumer}, nil
}
// CreateMetricsProcessor creates a metrics processor based on this config.
// If the processor type does not support metrics or if the config is not valid
// error will be returned instead.
func (npf *NopProcessorFactory) CreateMetricsProcessor(
logger *zap.Logger,
nextConsumer consumer.MetricsConsumer,
cfg configmodels.Processor,
) (processor.MetricsProcessor, error) {
return &nopProcessor{nextMetricsProcessor: nextConsumer}, nil
}

View File

@ -15,11 +15,12 @@ package processortest
import (
"context"
"reflect"
"testing"
metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
"github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata"
"github.com/open-telemetry/opentelemetry-collector/exporter/exportertest"
@ -31,14 +32,11 @@ func TestNopTraceProcessorNoErrors(t *testing.T) {
want := consumerdata.TraceData{
Spans: make([]*tracepb.Span, 7),
}
if err := ntp.ConsumeTraceData(context.Background(), want); err != nil {
t.Errorf("Wanted nil got error")
return
}
assert.NoError(t, ntp.ConsumeTraceData(context.Background(), want))
got := sink.AllTraces()[0]
if !reflect.DeepEqual(got, want) {
t.Errorf("Mismatches responses\nGot:\n\t%v\nWant:\n\t%v\n", got, want)
}
assert.Equal(t, want, got)
}
func TestNopMetricsProcessorNoErrors(t *testing.T) {
@ -47,12 +45,28 @@ func TestNopMetricsProcessorNoErrors(t *testing.T) {
want := consumerdata.MetricsData{
Metrics: make([]*metricspb.Metric, 7),
}
if err := nmp.ConsumeMetricsData(context.Background(), want); err != nil {
t.Errorf("Wanted nil got error")
return
}
assert.NoError(t, nmp.ConsumeMetricsData(context.Background(), want))
got := sink.AllMetrics()[0]
if !reflect.DeepEqual(got, want) {
t.Errorf("Mismatches responses\nGot:\n\t%v\nWant:\n\t%v\n", got, want)
}
assert.Equal(t, want, got)
}
func TestNopProcessorFactory(t *testing.T) {
f := &NopProcessorFactory{}
cfg := f.CreateDefaultConfig()
tp, err := f.CreateTraceProcessor(
zap.NewNop(),
new(exportertest.SinkTraceExporter),
cfg)
assert.NoError(t, err)
assert.NotNil(t, tp)
mp, err := f.CreateMetricsProcessor(
zap.NewNop(),
new(exportertest.SinkMetricsExporter),
cfg)
assert.NoError(t, err)
assert.NotNil(t, mp)
}

View File

@ -134,6 +134,7 @@ func (eb *ExportersBuilder) Build() (Exporters, error) {
if err != nil {
return nil, err
}
exporters[cfg] = exp
}
@ -206,6 +207,11 @@ func (eb *ExportersBuilder) buildExporter(
return nil, fmt.Errorf("error creating %s exporter: %v", config.Name(), err)
}
// Check if the factory really created the exporter.
if te == nil {
return nil, fmt.Errorf("factory for %q produced a nil exporter", config.Name())
}
exporter.te = te
}
@ -220,6 +226,12 @@ func (eb *ExportersBuilder) buildExporter(
return nil, fmt.Errorf("error creating %s exporter: %v", config.Name(), err)
}
// The factories can be implemented by third parties, check if they really
// created the exporter.
if me == nil {
return nil, fmt.Errorf("factory for %q produced a nil exporter", config.Name())
}
exporter.me = me
}

View File

@ -24,6 +24,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector/config"
"github.com/open-telemetry/opentelemetry-collector/config/configgrpc"
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
"github.com/open-telemetry/opentelemetry-collector/exporter"
"github.com/open-telemetry/opentelemetry-collector/exporter/opencensusexporter"
"github.com/open-telemetry/opentelemetry-collector/receiver/receivertest"
)
@ -139,3 +140,67 @@ func TestExportersBuilder_StopAll(t *testing.T) {
assert.True(t, traceExporter.ExporterShutdown)
assert.True(t, metricExporter.ExporterShutdown)
}
func TestExportersBuilder_ErrorOnNilExporter(t *testing.T) {
bf := &badExporterFactory{}
fm := map[string]exporter.Factory{
bf.Type(): bf,
}
pipelines := []*configmodels.Pipeline{
{
Name: "trace",
InputType: configmodels.TracesDataType,
Exporters: []string{bf.Type()},
},
{
Name: "metrics",
InputType: configmodels.MetricsDataType,
Exporters: []string{bf.Type()},
},
}
for _, pipeline := range pipelines {
t.Run(pipeline.Name, func(t *testing.T) {
cfg := &configmodels.Config{
Exporters: map[string]configmodels.Exporter{
bf.Type(): &configmodels.ExporterSettings{
TypeVal: bf.Type(),
},
},
Service: configmodels.Service{
Pipelines: map[string]*configmodels.Pipeline{
pipeline.Name: pipeline,
},
},
}
exporters, err := NewExportersBuilder(zap.NewNop(), cfg, fm).Build()
assert.Error(t, err)
assert.Zero(t, len(exporters))
})
}
}
// badExporterFactory is a factory that returns no error but returns a nil object.
type badExporterFactory struct{}
var _ exporter.Factory = (*badExporterFactory)(nil)
func (b *badExporterFactory) Type() string {
return "bf"
}
func (b *badExporterFactory) CreateDefaultConfig() configmodels.Exporter {
return &configmodels.ExporterSettings{}
}
func (b *badExporterFactory) CreateTraceExporter(logger *zap.Logger, cfg configmodels.Exporter) (exporter.TraceExporter, error) {
return nil, nil
}
func (b *badExporterFactory) CreateMetricsExporter(logger *zap.Logger, cfg configmodels.Exporter) (exporter.MetricsExporter, error) {
return nil, nil
}

View File

@ -130,6 +130,11 @@ func (pb *PipelinesBuilder) buildPipeline(
return nil, fmt.Errorf("error creating processor %q in pipeline %q: %v",
procName, pipelineCfg.Name, err)
}
// Check if the factory really created the processor.
if tc == nil && mc == nil {
return nil, fmt.Errorf("factory for %q produced a nil processor", procCfg.Name())
}
}
pb.logger.Info("Pipeline is enabled.", zap.String("pipelines", pipelineCfg.Name))

View File

@ -28,7 +28,9 @@ import (
"github.com/open-telemetry/opentelemetry-collector/config"
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
"github.com/open-telemetry/opentelemetry-collector/consumer"
"github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata"
"github.com/open-telemetry/opentelemetry-collector/processor"
"github.com/open-telemetry/opentelemetry-collector/processor/attributesprocessor"
)
@ -176,3 +178,64 @@ func TestPipelinesBuilder_Error(t *testing.T) {
assert.NotNil(t, err)
}
func TestProcessorsBuilder_ErrorOnNilProcessor(t *testing.T) {
factories, err := config.ExampleComponents()
assert.Nil(t, err)
bf := &badProcessorFactory{}
factories.Processors[bf.Type()] = bf
cfg, err := config.LoadConfigFile(t, "testdata/bad_processor_factory.yaml", factories)
require.Nil(t, err)
allExporters, err := NewExportersBuilder(zap.NewNop(), cfg, factories.Exporters).Build()
assert.NoError(t, err)
// First test only trace receivers by removing the metrics pipeline.
metricsPipeline := cfg.Service.Pipelines["metrics"]
delete(cfg.Service.Pipelines, "metrics")
require.Equal(t, 1, len(cfg.Service.Pipelines))
pipelineProcessors, err := NewPipelinesBuilder(zap.NewNop(), cfg, allExporters, factories.Processors).Build()
assert.Error(t, err)
assert.Zero(t, len(pipelineProcessors))
// Now test the metric pipeline.
delete(cfg.Service.Pipelines, "traces")
cfg.Service.Pipelines["metrics"] = metricsPipeline
require.Equal(t, 1, len(cfg.Service.Pipelines))
pipelineProcessors, err = NewPipelinesBuilder(zap.NewNop(), cfg, allExporters, factories.Processors).Build()
assert.Error(t, err)
assert.Zero(t, len(pipelineProcessors))
}
// badProcessorFactory is a factory that returns no error but returns a nil object.
type badProcessorFactory struct{}
var _ processor.Factory = (*badProcessorFactory)(nil)
func (b *badProcessorFactory) Type() string {
return "bf"
}
func (b *badProcessorFactory) CreateDefaultConfig() configmodels.Processor {
return &configmodels.ProcessorSettings{}
}
func (b *badProcessorFactory) CreateTraceProcessor(
logger *zap.Logger,
nextConsumer consumer.TraceConsumer,
cfg configmodels.Processor,
) (processor.TraceProcessor, error) {
return nil, nil
}
func (b *badProcessorFactory) CreateMetricsProcessor(
logger *zap.Logger,
consumer consumer.MetricsConsumer,
cfg configmodels.Processor,
) (processor.MetricsProcessor, error) {
return nil, nil
}

View File

@ -192,9 +192,20 @@ func (rb *ReceiversBuilder) attachReceiverToPipelines(
// Now create the receiver and tell it to send to the junction point.
rcv.trace, err = factory.CreateTraceReceiver(context.Background(), rb.logger, config, junction)
// Check if the factory really created the receiver.
if rcv.trace == nil {
return fmt.Errorf("factory for %q produced a nil receiver", config.Name())
}
case configmodels.MetricsDataType:
junction := buildFanoutMetricConsumer(builtPipelines)
rcv.metrics, err = factory.CreateMetricsReceiver(rb.logger, config, junction)
// The factories can be implemented by third parties, check if they really
// created the exporter.
if rcv.metrics == nil {
return fmt.Errorf("factory for %q produced a nil receiver", config.Name())
}
}
if err != nil {

View File

@ -18,18 +18,19 @@ import (
"context"
"testing"
"go.uber.org/zap"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"github.com/open-telemetry/opentelemetry-collector/config"
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
"github.com/open-telemetry/opentelemetry-collector/consumer"
"github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata"
"github.com/open-telemetry/opentelemetry-collector/processor/attributesprocessor"
"github.com/open-telemetry/opentelemetry-collector/processor/processortest"
"github.com/open-telemetry/opentelemetry-collector/receiver"
"github.com/open-telemetry/opentelemetry-collector/receiver/receivertest"
)
@ -270,3 +271,75 @@ func TestReceiversBuilder_StopAll(t *testing.T) {
assert.Equal(t, true, receiver.TraceStopped)
assert.Equal(t, true, receiver.MetricsStopped)
}
func TestReceiversBuilder_ErrorOnNilReceiver(t *testing.T) {
factories, err := config.ExampleComponents()
assert.Nil(t, err)
npf := &processortest.NopProcessorFactory{}
factories.Processors[npf.Type()] = npf
bf := &badReceiverFactory{}
factories.Receivers[bf.Type()] = bf
cfg, err := config.LoadConfigFile(t, "testdata/bad_receiver_factory.yaml", factories)
require.Nil(t, err)
// Build the pipeline
allExporters, err := NewExportersBuilder(zap.NewNop(), cfg, factories.Exporters).Build()
assert.NoError(t, err)
pipelineProcessors, err := NewPipelinesBuilder(zap.NewNop(), cfg, allExporters, factories.Processors).Build()
assert.NoError(t, err)
// First test only trace receivers by removing the metrics pipeline.
metricsPipeline := cfg.Service.Pipelines["metrics"]
delete(cfg.Service.Pipelines, "metrics")
require.Equal(t, 1, len(cfg.Service.Pipelines))
receivers, err := NewReceiversBuilder(zap.NewNop(), cfg, pipelineProcessors, factories.Receivers).Build()
assert.Error(t, err)
assert.Zero(t, len(receivers))
// Now test the metric pipeline.
delete(cfg.Service.Pipelines, "traces")
cfg.Service.Pipelines["metrics"] = metricsPipeline
require.Equal(t, 1, len(cfg.Service.Pipelines))
receivers, err = NewReceiversBuilder(zap.NewNop(), cfg, pipelineProcessors, factories.Receivers).Build()
assert.Error(t, err)
assert.Zero(t, len(receivers))
}
// badReceiverFactory is a factory that returns no error but returns a nil object.
type badReceiverFactory struct{}
var _ receiver.Factory = (*badReceiverFactory)(nil)
func (b *badReceiverFactory) Type() string {
return "bf"
}
func (b *badReceiverFactory) CreateDefaultConfig() configmodels.Receiver {
return &configmodels.ReceiverSettings{}
}
func (b *badReceiverFactory) CustomUnmarshaler() receiver.CustomUnmarshaler {
return nil
}
func (b *badReceiverFactory) CreateTraceReceiver(
ctx context.Context,
logger *zap.Logger,
cfg configmodels.Receiver,
nextConsumer consumer.TraceConsumer,
) (receiver.TraceReceiver, error) {
return nil, nil
}
func (b *badReceiverFactory) CreateMetricsReceiver(
logger *zap.Logger,
cfg configmodels.Receiver,
consumer consumer.MetricsConsumer,
) (receiver.MetricsReceiver, error) {
return nil, nil
}

View File

@ -0,0 +1,18 @@
receivers:
examplereceiver:
processors:
bf/traces: # this is the bad processor factory
bf/metrics:
exporters:
exampleexporter:
service:
pipelines:
traces:
receivers: [examplereceiver]
processors: [bf/traces]
exporters: [exampleexporter]
metrics:
receivers: [examplereceiver]
processors: [bf/metrics]
exporters: [exampleexporter]

View File

@ -0,0 +1,16 @@
receivers:
bf: # this is the bad receiver factory
processors:
nop:
exporters:
exampleexporter:
service:
pipelines:
traces:
receivers: [bf]
processors: [nop] # trace pipeline requires a processor
exporters: [exampleexporter]
metrics:
receivers: [bf]
exporters: [exampleexporter]

View File

@ -19,6 +19,7 @@ package service
import (
"context"
"flag"
"fmt"
"log"
"os"
"os/signal"
@ -170,32 +171,43 @@ func (app *Application) setupConfigurationComponents() {
app.logger.Info("Applying configuration...")
app.setupExtensions()
if err := app.setupExtensions(); err != nil {
log.Fatalf("Cannot setup extensions: %v", err)
}
app.setupPipelines()
}
func (app *Application) setupExtensions() {
func (app *Application) setupExtensions() error {
for _, extName := range app.config.Service.Extensions {
extCfg, exists := app.config.Extensions[extName]
if !exists {
log.Fatalf("Cannot load configuration: extension %q is not configured", extName)
return fmt.Errorf("extension %q is not configured", extName)
}
factory, exists := app.factories.Extensions[extCfg.Type()]
if !exists {
log.Fatalf("Cannot load configuration: extension factory for type %q is not configured", extCfg.Type())
return fmt.Errorf("extension factory for type %q is not configured", extCfg.Type())
}
ext, err := factory.CreateExtension(app.logger, extCfg)
if err != nil {
log.Fatalf("Cannot load configuration: failed to create extension %q: %v", extName, err)
return fmt.Errorf("failed to create extension %q: %v", extName, err)
}
// Check if the factory really created the extension.
if ext == nil {
return fmt.Errorf("factory for %q produced a nil extension", extName)
}
if err := ext.Start(app); err != nil {
log.Fatalf("Cannot start extension %q: %v", extName, err)
return fmt.Errorf("error starting extension %q: %v", extName, err)
}
app.extensions = append(app.extensions, ext)
}
return nil
}
func (app *Application) setupPipelines() {

View File

@ -21,8 +21,12 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
"github.com/open-telemetry/opentelemetry-collector/config"
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
"github.com/open-telemetry/opentelemetry-collector/defaults"
"github.com/open-telemetry/opentelemetry-collector/extension"
"github.com/open-telemetry/opentelemetry-collector/internal/testutils"
)
@ -69,3 +73,133 @@ func isAppAvailable(t *testing.T, healthCheckEndPoint string) bool {
defer resp.Body.Close()
return resp.StatusCode == http.StatusOK
}
func TestApplication_setupExtensions(t *testing.T) {
exampleExtensionFactory := &config.ExampleExtensionFactory{}
exampleExtensionConfig := &config.ExampleExtension{
ExtensionSettings: configmodels.ExtensionSettings{
TypeVal: exampleExtensionFactory.Type(),
NameVal: exampleExtensionFactory.Type(),
},
}
badExtensionFactory := &badExtensionFactory{}
badExtensionFactoryConfig := &configmodels.ExtensionSettings{
TypeVal: "bf",
NameVal: "bf",
}
tests := []struct {
name string
factories config.Factories
config *configmodels.Config
wantErrMsg string
}{
{
name: "extension_not_configured",
config: &configmodels.Config{
Service: configmodels.Service{
Extensions: []string{
"myextension",
},
},
},
wantErrMsg: "extension \"myextension\" is not configured",
},
{
name: "missing_extension_factory",
config: &configmodels.Config{
Extensions: map[string]configmodels.Extension{
exampleExtensionFactory.Type(): exampleExtensionConfig,
},
Service: configmodels.Service{
Extensions: []string{
exampleExtensionFactory.Type(),
},
},
},
wantErrMsg: "extension factory for type \"exampleextension\" is not configured",
},
{
name: "error_on_create_extension",
factories: config.Factories{
Extensions: map[string]extension.Factory{
exampleExtensionFactory.Type(): exampleExtensionFactory,
},
},
config: &configmodels.Config{
Extensions: map[string]configmodels.Extension{
exampleExtensionFactory.Type(): exampleExtensionConfig,
},
Service: configmodels.Service{
Extensions: []string{
exampleExtensionFactory.Type(),
},
},
},
wantErrMsg: "failed to create extension \"exampleextension\": cannot create \"exampleextension\" extension type",
},
{
name: "bad_factory",
factories: config.Factories{
Extensions: map[string]extension.Factory{
badExtensionFactory.Type(): badExtensionFactory,
},
},
config: &configmodels.Config{
Extensions: map[string]configmodels.Extension{
badExtensionFactory.Type(): badExtensionFactoryConfig,
},
Service: configmodels.Service{
Extensions: []string{
badExtensionFactory.Type(),
},
},
},
wantErrMsg: "factory for \"bf\" produced a nil extension",
},
}
nopLogger := zap.NewNop()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
app := &Application{
logger: nopLogger,
factories: tt.factories,
config: tt.config,
}
err := app.setupExtensions()
if tt.wantErrMsg == "" {
assert.NoError(t, err)
assert.Equal(t, 1, len(app.extensions))
assert.NotNil(t, app.extensions[0])
} else {
assert.Error(t, err)
assert.Equal(t, tt.wantErrMsg, err.Error())
assert.Equal(t, 0, len(app.extensions))
}
})
}
}
// badExtensionFactory is a factory that returns no error but returns a nil object.
type badExtensionFactory struct{}
var _ extension.Factory = (*badExtensionFactory)(nil)
func (b badExtensionFactory) Type() string {
return "bf"
}
func (b badExtensionFactory) CreateDefaultConfig() configmodels.Extension {
return &configmodels.ExtensionSettings{}
}
func (b badExtensionFactory) CreateExtension(
logger *zap.Logger,
cfg configmodels.Extension,
) (extension.ServiceExtension, error) {
return nil, nil
}