Add Start() function to Exporter interface (#380)

This is done to have uniformness with Receivers and allow
exporters to communicate with the host (Collector) in the
future.

Implements https://github.com/open-telemetry/opentelemetry-collector/issues/373
This commit is contained in:
Tigran Najaryan 2019-10-07 13:28:06 -04:00 committed by GitHub
parent 6db8faefb1
commit b125d052db
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 140 additions and 11 deletions

View File

@ -286,9 +286,18 @@ func (f *ExampleExporterFactory) CreateMetricsExporter(logger *zap.Logger, cfg c
type ExampleExporterConsumer struct { type ExampleExporterConsumer struct {
Traces []consumerdata.TraceData Traces []consumerdata.TraceData
Metrics []consumerdata.MetricsData Metrics []consumerdata.MetricsData
ExporterStarted bool
ExporterShutdown bool ExporterShutdown bool
} }
// Start tells the exporter to start. The exporter may prepare for exporting
// by connecting to the endpoint. Host parameter can be used for communicating
// with the host after Start() has already returned.
func (exp *ExampleExporterConsumer) Start(host exporter.Host) error {
exp.ExporterStarted = true
return nil
}
// ConsumeTraceData receives consumerdata.TraceData for processing by the TraceConsumer. // ConsumeTraceData receives consumerdata.TraceData for processing by the TraceConsumer.
func (exp *ExampleExporterConsumer) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error { func (exp *ExampleExporterConsumer) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error {
exp.Traces = append(exp.Traces, td) exp.Traces = append(exp.Traces, td)

View File

@ -19,18 +19,35 @@ import (
"github.com/open-telemetry/opentelemetry-collector/consumer" "github.com/open-telemetry/opentelemetry-collector/consumer"
) )
// TraceExporter composes TraceConsumer with some additional exporter-specific functions. // Host represents the entity where the exporter is being hosted. It is used to
type TraceExporter interface { // allow communication between the exporter and its host.
consumer.TraceConsumer type Host interface {
// ReportFatalError is used to report to the host that the exporter encountered
// a fatal error (i.e.: an error that the instance can't recover from) after
// its start function has already returned.
ReportFatalError(err error)
}
// Exporter defines functions that trace and metric exporters must implement.
type Exporter interface {
// Start tells the exporter to start. The exporter may prepare for exporting
// by connecting to the endpoint. Host parameter can be used for communicating
// with the host after Start() has already returned. If error is returned by
// Start() then the collector startup will be aborted.
Start(host Host) error
// Shutdown is invoked during service shutdown. // Shutdown is invoked during service shutdown.
Shutdown() error Shutdown() error
} }
// TraceExporter composes TraceConsumer with some additional exporter-specific functions.
type TraceExporter interface {
consumer.TraceConsumer
Exporter
}
// MetricsExporter composes MetricsConsumer with some additional exporter-specific functions. // MetricsExporter composes MetricsConsumer with some additional exporter-specific functions.
type MetricsExporter interface { type MetricsExporter interface {
consumer.MetricsConsumer consumer.MetricsConsumer
Exporter
// Shutdown is invoked during service shutdown.
Shutdown() error
} }

View File

@ -37,6 +37,10 @@ type metricsExporter struct {
var _ (exporter.MetricsExporter) = (*metricsExporter)(nil) var _ (exporter.MetricsExporter) = (*metricsExporter)(nil)
func (me *metricsExporter) Start(host exporter.Host) error {
return nil
}
func (me *metricsExporter) ConsumeMetricsData(ctx context.Context, md consumerdata.MetricsData) error { func (me *metricsExporter) ConsumeMetricsData(ctx context.Context, md consumerdata.MetricsData) error {
exporterCtx := observability.ContextWithExporterName(ctx, me.exporterFullName) exporterCtx := observability.ContextWithExporterName(ctx, me.exporterFullName)
_, err := me.pushMetricsData(exporterCtx, md) _, err := me.pushMetricsData(exporterCtx, md)

View File

@ -37,6 +37,10 @@ type traceExporter struct {
var _ (exporter.TraceExporter) = (*traceExporter)(nil) var _ (exporter.TraceExporter) = (*traceExporter)(nil)
func (te *traceExporter) Start(host exporter.Host) error {
return nil
}
func (te *traceExporter) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error { func (te *traceExporter) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error {
exporterCtx := observability.ContextWithExporterName(ctx, te.exporterFullName) exporterCtx := observability.ContextWithExporterName(ctx, te.exporterFullName)
_, err := te.pushTraceData(exporterCtx, td) _, err := te.pushTraceData(exporterCtx, td)

View File

@ -32,6 +32,10 @@ type nopExporter struct {
var _ exporter.TraceExporter = (*nopExporter)(nil) var _ exporter.TraceExporter = (*nopExporter)(nil)
var _ exporter.MetricsExporter = (*nopExporter)(nil) var _ exporter.MetricsExporter = (*nopExporter)(nil)
func (ne *nopExporter) Start(host exporter.Host) error {
return nil
}
func (ne *nopExporter) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error { func (ne *nopExporter) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error {
return ne.retError return ne.retError
} }

View File

@ -30,6 +30,13 @@ type SinkTraceExporter struct {
var _ exporter.TraceExporter = (*SinkTraceExporter)(nil) var _ exporter.TraceExporter = (*SinkTraceExporter)(nil)
// Start tells the exporter to start. The exporter may prepare for exporting
// by connecting to the endpoint. Host parameter can be used for communicating
// with the host after Start() has already returned.
func (ste *SinkTraceExporter) Start(host exporter.Host) error {
return nil
}
// ConsumeTraceData stores traces for tests. // ConsumeTraceData stores traces for tests.
func (ste *SinkTraceExporter) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error { func (ste *SinkTraceExporter) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error {
ste.mu.Lock() ste.mu.Lock()
@ -71,6 +78,13 @@ type SinkMetricsExporter struct {
var _ exporter.MetricsExporter = (*SinkMetricsExporter)(nil) var _ exporter.MetricsExporter = (*SinkMetricsExporter)(nil)
// Start tells the exporter to start. The exporter may prepare for exporting
// by connecting to the endpoint. Host parameter can be used for communicating
// with the host after Start() has already returned.
func (sme *SinkMetricsExporter) Start(host exporter.Host) error {
return nil
}
// ConsumeMetricsData stores traces for tests. // ConsumeMetricsData stores traces for tests.
func (sme *SinkMetricsExporter) ConsumeMetricsData(ctx context.Context, md consumerdata.MetricsData) error { func (sme *SinkMetricsExporter) ConsumeMetricsData(ctx context.Context, md consumerdata.MetricsData) error {
sme.mu.Lock() sme.mu.Lock()

View File

@ -90,6 +90,10 @@ func createOCAgentExporter(logger *zap.Logger, config configmodels.Exporter, opt
exportersChan := make(chan *ocagent.Exporter, numWorkers) exportersChan := make(chan *ocagent.Exporter, numWorkers)
for exporterIndex := 0; exporterIndex < numWorkers; exporterIndex++ { for exporterIndex := 0; exporterIndex < numWorkers; exporterIndex++ {
// TODO: ocagent.NewExporter blocks for connection. Now that we have ability
// to report errors asynchronously using Host.ReportFatalError we can move this
// code to Start() and do it in background to avoid blocking Collector startup
// as we do now.
exporter, serr := ocagent.NewExporter(opts...) exporter, serr := ocagent.NewExporter(opts...)
if serr != nil { if serr != nil {
return nil, fmt.Errorf("cannot configure OpenCensus exporter: %v", serr) return nil, fmt.Errorf("cannot configure OpenCensus exporter: %v", serr)

View File

@ -24,6 +24,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector/consumer" "github.com/open-telemetry/opentelemetry-collector/consumer"
"github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata" "github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata"
"github.com/open-telemetry/opentelemetry-collector/exporter"
"github.com/open-telemetry/opentelemetry-collector/exporter/exporterhelper" "github.com/open-telemetry/opentelemetry-collector/exporter/exporterhelper"
) )
@ -37,6 +38,10 @@ type prometheusExporter struct {
var _ consumer.MetricsConsumer = (*prometheusExporter)(nil) var _ consumer.MetricsConsumer = (*prometheusExporter)(nil)
func (pe *prometheusExporter) Start(host exporter.Host) error {
return nil
}
func (pe *prometheusExporter) ConsumeMetricsData(ctx context.Context, md consumerdata.MetricsData) error { func (pe *prometheusExporter) ConsumeMetricsData(ctx context.Context, md consumerdata.MetricsData) error {
for _, metric := range md.Metrics { for _, metric := range md.Metrics {
_ = pe.exporter.ExportMetric(ctx, md.Node, md.Resource, metric) _ = pe.exporter.ExportMetric(ctx, md.Node, md.Resource, metric)

View File

@ -29,6 +29,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata" "github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata"
"github.com/open-telemetry/opentelemetry-collector/consumer/consumererror" "github.com/open-telemetry/opentelemetry-collector/consumer/consumererror"
"github.com/open-telemetry/opentelemetry-collector/exporter"
"github.com/open-telemetry/opentelemetry-collector/observability" "github.com/open-telemetry/opentelemetry-collector/observability"
tracetranslator "github.com/open-telemetry/opentelemetry-collector/translator/trace" tracetranslator "github.com/open-telemetry/opentelemetry-collector/translator/trace"
spandatatranslator "github.com/open-telemetry/opentelemetry-collector/translator/trace/spandata" spandatatranslator "github.com/open-telemetry/opentelemetry-collector/translator/trace/spandata"
@ -127,6 +128,10 @@ func (ze *zipkinExporter) Name() string {
return ze.defaultServiceName return ze.defaultServiceName
} }
func (ze *zipkinExporter) Start(host exporter.Host) error {
return nil
}
func (ze *zipkinExporter) Shutdown() error { func (ze *zipkinExporter) Shutdown() error {
ze.mu.Lock() ze.mu.Lock()
defer ze.mu.Unlock() defer ze.mu.Unlock()

View File

@ -32,6 +32,26 @@ type builtExporter struct {
me exporter.MetricsExporter me exporter.MetricsExporter
} }
// Start the exporter.
func (exp *builtExporter) Start(host exporter.Host) error {
var errors []error
if exp.te != nil {
err := exp.te.Start(host)
if err != nil {
errors = append(errors, err)
}
}
if exp.me != nil {
err := exp.me.Start(host)
if err != nil {
errors = append(errors, err)
}
}
return oterr.CombineErrors(errors)
}
// Shutdown the trace component and the metrics component of an exporter. // Shutdown the trace component and the metrics component of an exporter.
func (exp *builtExporter) Shutdown() error { func (exp *builtExporter) Shutdown() error {
var errors []error var errors []error
@ -53,6 +73,19 @@ func (exp *builtExporter) Shutdown() error {
// Exporters is a map of exporters created from exporter configs. // Exporters is a map of exporters created from exporter configs.
type Exporters map[configmodels.Exporter]*builtExporter type Exporters map[configmodels.Exporter]*builtExporter
// StartAll starts all exporters.
func (exps Exporters) StartAll(logger *zap.Logger, host exporter.Host) error {
for cfg, exp := range exps {
logger.Info("Exporter is starting...", zap.String("exporter", cfg.Name()))
if err := exp.Start(host); err != nil {
return err
}
logger.Info("Exporter started.", zap.String("exporter", cfg.Name()))
}
return nil
}
// ShutdownAll stops all exporters. // ShutdownAll stops all exporters.
func (exps Exporters) ShutdownAll() { func (exps Exporters) ShutdownAll() {
for _, exp := range exps { for _, exp := range exps {

View File

@ -25,6 +25,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector/config/configgrpc" "github.com/open-telemetry/opentelemetry-collector/config/configgrpc"
"github.com/open-telemetry/opentelemetry-collector/config/configmodels" "github.com/open-telemetry/opentelemetry-collector/config/configmodels"
"github.com/open-telemetry/opentelemetry-collector/exporter/opencensusexporter" "github.com/open-telemetry/opentelemetry-collector/exporter/opencensusexporter"
"github.com/open-telemetry/opentelemetry-collector/receiver/receivertest"
) )
func TestExportersBuilder_Build(t *testing.T) { func TestExportersBuilder_Build(t *testing.T) {
@ -69,8 +70,12 @@ func TestExportersBuilder_Build(t *testing.T) {
assert.NotNil(t, e1.te) assert.NotNil(t, e1.te)
assert.Nil(t, e1.me) assert.Nil(t, e1.me)
// Ensure it can be stopped. // Ensure it can be started.
mh := receivertest.NewMockHost()
err = exporters.StartAll(zap.NewNop(), mh)
assert.NoError(t, err)
// Ensure it can be stopped.
if err = e1.Shutdown(); err != nil { if err = e1.Shutdown(); err != nil {
// TODO Find a better way to handle this case // TODO Find a better way to handle this case
// Since the endpoint of opencensus exporter doesn't actually exist, e1 may // Since the endpoint of opencensus exporter doesn't actually exist, e1 may
@ -98,6 +103,26 @@ func TestExportersBuilder_Build(t *testing.T) {
// TODO: once we have an exporter that supports metrics data type test it too. // TODO: once we have an exporter that supports metrics data type test it too.
} }
func TestExportersBuilder_StartAll(t *testing.T) {
exporters := make(Exporters)
expCfg := &configmodels.ExporterSettings{}
traceExporter := &config.ExampleExporterConsumer{}
metricExporter := &config.ExampleExporterConsumer{}
exporters[expCfg] = &builtExporter{
te: traceExporter,
me: metricExporter,
}
assert.False(t, traceExporter.ExporterStarted)
assert.False(t, metricExporter.ExporterStarted)
mh := receivertest.NewMockHost()
err := exporters.StartAll(zap.NewNop(), mh)
assert.NoError(t, err)
assert.True(t, traceExporter.ExporterStarted)
assert.True(t, metricExporter.ExporterStarted)
}
func TestExportersBuilder_StopAll(t *testing.T) { func TestExportersBuilder_StopAll(t *testing.T) {
exporters := make(Exporters) exporters := make(Exporters)
expCfg := &configmodels.ExporterSettings{} expCfg := &configmodels.ExporterSettings{}

View File

@ -93,7 +93,7 @@ func (rcvs Receivers) StartAll(logger *zap.Logger, host receiver.Host) error {
if err := rcv.Start(host); err != nil { if err := rcv.Start(host); err != nil {
return err return err
} }
logger.Info("Receiver is started.", zap.String("receiver", cfg.Name())) logger.Info("Receiver started.", zap.String("receiver", cfg.Name()))
} }
return nil return nil
} }

View File

@ -206,20 +206,25 @@ func (app *Application) setupPipelines() {
var err error var err error
app.exporters, err = builder.NewExportersBuilder(app.logger, app.config, app.factories.Exporters).Build() app.exporters, err = builder.NewExportersBuilder(app.logger, app.config, app.factories.Exporters).Build()
if err != nil { if err != nil {
log.Fatalf("Cannot load configuration: %v", err) log.Fatalf("Cannot build exporters: %v", err)
}
app.logger.Info("Starting exporters...")
err = app.exporters.StartAll(app.logger, app)
if err != nil {
log.Fatalf("Cannot start exporters: %v", err)
} }
// Create pipelines and their processors and plug exporters to the // Create pipelines and their processors and plug exporters to the
// end of the pipelines. // end of the pipelines.
pipelines, err := builder.NewPipelinesBuilder(app.logger, app.config, app.exporters, app.factories.Processors).Build() pipelines, err := builder.NewPipelinesBuilder(app.logger, app.config, app.exporters, app.factories.Processors).Build()
if err != nil { if err != nil {
log.Fatalf("Cannot load configuration: %v", err) log.Fatalf("Cannot build pipelines: %v", err)
} }
// Create receivers and plug them into the start of the pipelines. // Create receivers and plug them into the start of the pipelines.
app.builtReceivers, err = builder.NewReceiversBuilder(app.logger, app.config, pipelines, app.factories.Receivers).Build() app.builtReceivers, err = builder.NewReceiversBuilder(app.logger, app.config, pipelines, app.factories.Receivers).Build()
if err != nil { if err != nil {
log.Fatalf("Cannot load configuration: %v", err) log.Fatalf("Cannot build receivers: %v", err)
} }
app.logger.Info("Starting receivers...") app.logger.Info("Starting receivers...")