diff --git a/config/example_factories.go b/config/example_factories.go index f996d89c9c..ad59c1531e 100644 --- a/config/example_factories.go +++ b/config/example_factories.go @@ -286,9 +286,18 @@ func (f *ExampleExporterFactory) CreateMetricsExporter(logger *zap.Logger, cfg c type ExampleExporterConsumer struct { Traces []consumerdata.TraceData Metrics []consumerdata.MetricsData + ExporterStarted bool ExporterShutdown bool } +// Start tells the exporter to start. The exporter may prepare for exporting +// by connecting to the endpoint. Host parameter can be used for communicating +// with the host after Start() has already returned. +func (exp *ExampleExporterConsumer) Start(host exporter.Host) error { + exp.ExporterStarted = true + return nil +} + // ConsumeTraceData receives consumerdata.TraceData for processing by the TraceConsumer. func (exp *ExampleExporterConsumer) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error { exp.Traces = append(exp.Traces, td) diff --git a/exporter/exporter.go b/exporter/exporter.go index 32bf05a4c1..76d3cb9d49 100644 --- a/exporter/exporter.go +++ b/exporter/exporter.go @@ -19,18 +19,35 @@ import ( "github.com/open-telemetry/opentelemetry-collector/consumer" ) -// TraceExporter composes TraceConsumer with some additional exporter-specific functions. -type TraceExporter interface { - consumer.TraceConsumer +// Host represents the entity where the exporter is being hosted. It is used to +// allow communication between the exporter and its host. +type Host interface { + // ReportFatalError is used to report to the host that the exporter encountered + // a fatal error (i.e.: an error that the instance can't recover from) after + // its start function has already returned. + ReportFatalError(err error) +} + +// Exporter defines functions that trace and metric exporters must implement. +type Exporter interface { + // Start tells the exporter to start. The exporter may prepare for exporting + // by connecting to the endpoint. Host parameter can be used for communicating + // with the host after Start() has already returned. If error is returned by + // Start() then the collector startup will be aborted. + Start(host Host) error // Shutdown is invoked during service shutdown. Shutdown() error } +// TraceExporter composes TraceConsumer with some additional exporter-specific functions. +type TraceExporter interface { + consumer.TraceConsumer + Exporter +} + // MetricsExporter composes MetricsConsumer with some additional exporter-specific functions. type MetricsExporter interface { consumer.MetricsConsumer - - // Shutdown is invoked during service shutdown. - Shutdown() error + Exporter } diff --git a/exporter/exporterhelper/metricshelper.go b/exporter/exporterhelper/metricshelper.go index 5dfe3535db..acd335faa0 100644 --- a/exporter/exporterhelper/metricshelper.go +++ b/exporter/exporterhelper/metricshelper.go @@ -37,6 +37,10 @@ type metricsExporter struct { var _ (exporter.MetricsExporter) = (*metricsExporter)(nil) +func (me *metricsExporter) Start(host exporter.Host) error { + return nil +} + func (me *metricsExporter) ConsumeMetricsData(ctx context.Context, md consumerdata.MetricsData) error { exporterCtx := observability.ContextWithExporterName(ctx, me.exporterFullName) _, err := me.pushMetricsData(exporterCtx, md) diff --git a/exporter/exporterhelper/tracehelper.go b/exporter/exporterhelper/tracehelper.go index 405a59de89..b379b9d9e7 100644 --- a/exporter/exporterhelper/tracehelper.go +++ b/exporter/exporterhelper/tracehelper.go @@ -37,6 +37,10 @@ type traceExporter struct { var _ (exporter.TraceExporter) = (*traceExporter)(nil) +func (te *traceExporter) Start(host exporter.Host) error { + return nil +} + func (te *traceExporter) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error { exporterCtx := observability.ContextWithExporterName(ctx, te.exporterFullName) _, err := te.pushTraceData(exporterCtx, td) diff --git a/exporter/exportertest/nop_exporter.go b/exporter/exportertest/nop_exporter.go index 2345d5c70d..72259e9c91 100644 --- a/exporter/exportertest/nop_exporter.go +++ b/exporter/exportertest/nop_exporter.go @@ -32,6 +32,10 @@ type nopExporter struct { var _ exporter.TraceExporter = (*nopExporter)(nil) var _ exporter.MetricsExporter = (*nopExporter)(nil) +func (ne *nopExporter) Start(host exporter.Host) error { + return nil +} + func (ne *nopExporter) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error { return ne.retError } diff --git a/exporter/exportertest/sink_exporter.go b/exporter/exportertest/sink_exporter.go index ed42b6447f..a3dcdfade7 100644 --- a/exporter/exportertest/sink_exporter.go +++ b/exporter/exportertest/sink_exporter.go @@ -30,6 +30,13 @@ type SinkTraceExporter struct { var _ exporter.TraceExporter = (*SinkTraceExporter)(nil) +// Start tells the exporter to start. The exporter may prepare for exporting +// by connecting to the endpoint. Host parameter can be used for communicating +// with the host after Start() has already returned. +func (ste *SinkTraceExporter) Start(host exporter.Host) error { + return nil +} + // ConsumeTraceData stores traces for tests. func (ste *SinkTraceExporter) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error { ste.mu.Lock() @@ -71,6 +78,13 @@ type SinkMetricsExporter struct { var _ exporter.MetricsExporter = (*SinkMetricsExporter)(nil) +// Start tells the exporter to start. The exporter may prepare for exporting +// by connecting to the endpoint. Host parameter can be used for communicating +// with the host after Start() has already returned. +func (sme *SinkMetricsExporter) Start(host exporter.Host) error { + return nil +} + // ConsumeMetricsData stores traces for tests. func (sme *SinkMetricsExporter) ConsumeMetricsData(ctx context.Context, md consumerdata.MetricsData) error { sme.mu.Lock() diff --git a/exporter/opencensusexporter/opencensus.go b/exporter/opencensusexporter/opencensus.go index 67c94caecb..035f0cf6e3 100644 --- a/exporter/opencensusexporter/opencensus.go +++ b/exporter/opencensusexporter/opencensus.go @@ -90,6 +90,10 @@ func createOCAgentExporter(logger *zap.Logger, config configmodels.Exporter, opt exportersChan := make(chan *ocagent.Exporter, numWorkers) for exporterIndex := 0; exporterIndex < numWorkers; exporterIndex++ { + // TODO: ocagent.NewExporter blocks for connection. Now that we have ability + // to report errors asynchronously using Host.ReportFatalError we can move this + // code to Start() and do it in background to avoid blocking Collector startup + // as we do now. exporter, serr := ocagent.NewExporter(opts...) if serr != nil { return nil, fmt.Errorf("cannot configure OpenCensus exporter: %v", serr) diff --git a/exporter/prometheusexporter/prometheus.go b/exporter/prometheusexporter/prometheus.go index de8b880949..cdb30cb668 100644 --- a/exporter/prometheusexporter/prometheus.go +++ b/exporter/prometheusexporter/prometheus.go @@ -24,6 +24,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector/consumer" "github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata" + "github.com/open-telemetry/opentelemetry-collector/exporter" "github.com/open-telemetry/opentelemetry-collector/exporter/exporterhelper" ) @@ -37,6 +38,10 @@ type prometheusExporter struct { var _ consumer.MetricsConsumer = (*prometheusExporter)(nil) +func (pe *prometheusExporter) Start(host exporter.Host) error { + return nil +} + func (pe *prometheusExporter) ConsumeMetricsData(ctx context.Context, md consumerdata.MetricsData) error { for _, metric := range md.Metrics { _ = pe.exporter.ExportMetric(ctx, md.Node, md.Resource, metric) diff --git a/exporter/zipkinexporter/zipkin.go b/exporter/zipkinexporter/zipkin.go index 8f4fdec4bd..1de31c4ea9 100644 --- a/exporter/zipkinexporter/zipkin.go +++ b/exporter/zipkinexporter/zipkin.go @@ -29,6 +29,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata" "github.com/open-telemetry/opentelemetry-collector/consumer/consumererror" + "github.com/open-telemetry/opentelemetry-collector/exporter" "github.com/open-telemetry/opentelemetry-collector/observability" tracetranslator "github.com/open-telemetry/opentelemetry-collector/translator/trace" spandatatranslator "github.com/open-telemetry/opentelemetry-collector/translator/trace/spandata" @@ -127,6 +128,10 @@ func (ze *zipkinExporter) Name() string { return ze.defaultServiceName } +func (ze *zipkinExporter) Start(host exporter.Host) error { + return nil +} + func (ze *zipkinExporter) Shutdown() error { ze.mu.Lock() defer ze.mu.Unlock() diff --git a/service/builder/exporters_builder.go b/service/builder/exporters_builder.go index 74e41995f9..a0e25de3a1 100644 --- a/service/builder/exporters_builder.go +++ b/service/builder/exporters_builder.go @@ -32,6 +32,26 @@ type builtExporter struct { me exporter.MetricsExporter } +// Start the exporter. +func (exp *builtExporter) Start(host exporter.Host) error { + var errors []error + if exp.te != nil { + err := exp.te.Start(host) + if err != nil { + errors = append(errors, err) + } + } + + if exp.me != nil { + err := exp.me.Start(host) + if err != nil { + errors = append(errors, err) + } + } + + return oterr.CombineErrors(errors) +} + // Shutdown the trace component and the metrics component of an exporter. func (exp *builtExporter) Shutdown() error { var errors []error @@ -53,6 +73,19 @@ func (exp *builtExporter) Shutdown() error { // Exporters is a map of exporters created from exporter configs. type Exporters map[configmodels.Exporter]*builtExporter +// StartAll starts all exporters. +func (exps Exporters) StartAll(logger *zap.Logger, host exporter.Host) error { + for cfg, exp := range exps { + logger.Info("Exporter is starting...", zap.String("exporter", cfg.Name())) + + if err := exp.Start(host); err != nil { + return err + } + logger.Info("Exporter started.", zap.String("exporter", cfg.Name())) + } + return nil +} + // ShutdownAll stops all exporters. func (exps Exporters) ShutdownAll() { for _, exp := range exps { diff --git a/service/builder/exporters_builder_test.go b/service/builder/exporters_builder_test.go index 51cd6f386b..0597b45255 100644 --- a/service/builder/exporters_builder_test.go +++ b/service/builder/exporters_builder_test.go @@ -25,6 +25,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector/config/configgrpc" "github.com/open-telemetry/opentelemetry-collector/config/configmodels" "github.com/open-telemetry/opentelemetry-collector/exporter/opencensusexporter" + "github.com/open-telemetry/opentelemetry-collector/receiver/receivertest" ) func TestExportersBuilder_Build(t *testing.T) { @@ -69,8 +70,12 @@ func TestExportersBuilder_Build(t *testing.T) { assert.NotNil(t, e1.te) assert.Nil(t, e1.me) - // Ensure it can be stopped. + // Ensure it can be started. + mh := receivertest.NewMockHost() + err = exporters.StartAll(zap.NewNop(), mh) + assert.NoError(t, err) + // Ensure it can be stopped. if err = e1.Shutdown(); err != nil { // TODO Find a better way to handle this case // Since the endpoint of opencensus exporter doesn't actually exist, e1 may @@ -98,6 +103,26 @@ func TestExportersBuilder_Build(t *testing.T) { // TODO: once we have an exporter that supports metrics data type test it too. } +func TestExportersBuilder_StartAll(t *testing.T) { + exporters := make(Exporters) + expCfg := &configmodels.ExporterSettings{} + traceExporter := &config.ExampleExporterConsumer{} + metricExporter := &config.ExampleExporterConsumer{} + exporters[expCfg] = &builtExporter{ + te: traceExporter, + me: metricExporter, + } + assert.False(t, traceExporter.ExporterStarted) + assert.False(t, metricExporter.ExporterStarted) + + mh := receivertest.NewMockHost() + err := exporters.StartAll(zap.NewNop(), mh) + assert.NoError(t, err) + + assert.True(t, traceExporter.ExporterStarted) + assert.True(t, metricExporter.ExporterStarted) +} + func TestExportersBuilder_StopAll(t *testing.T) { exporters := make(Exporters) expCfg := &configmodels.ExporterSettings{} diff --git a/service/builder/receivers_builder.go b/service/builder/receivers_builder.go index 673893f7de..6abcea08cc 100644 --- a/service/builder/receivers_builder.go +++ b/service/builder/receivers_builder.go @@ -93,7 +93,7 @@ func (rcvs Receivers) StartAll(logger *zap.Logger, host receiver.Host) error { if err := rcv.Start(host); err != nil { return err } - logger.Info("Receiver is started.", zap.String("receiver", cfg.Name())) + logger.Info("Receiver started.", zap.String("receiver", cfg.Name())) } return nil } diff --git a/service/service.go b/service/service.go index 72c3d7825b..bc88d927d2 100644 --- a/service/service.go +++ b/service/service.go @@ -206,20 +206,25 @@ func (app *Application) setupPipelines() { var err error app.exporters, err = builder.NewExportersBuilder(app.logger, app.config, app.factories.Exporters).Build() if err != nil { - log.Fatalf("Cannot load configuration: %v", err) + log.Fatalf("Cannot build exporters: %v", err) + } + app.logger.Info("Starting exporters...") + err = app.exporters.StartAll(app.logger, app) + if err != nil { + log.Fatalf("Cannot start exporters: %v", err) } // Create pipelines and their processors and plug exporters to the // end of the pipelines. pipelines, err := builder.NewPipelinesBuilder(app.logger, app.config, app.exporters, app.factories.Processors).Build() if err != nil { - log.Fatalf("Cannot load configuration: %v", err) + log.Fatalf("Cannot build pipelines: %v", err) } // Create receivers and plug them into the start of the pipelines. app.builtReceivers, err = builder.NewReceiversBuilder(app.logger, app.config, pipelines, app.factories.Receivers).Build() if err != nil { - log.Fatalf("Cannot load configuration: %v", err) + log.Fatalf("Cannot build receivers: %v", err) } app.logger.Info("Starting receivers...")