Update exporterhelper to use the Shutdown with context (#798)
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
This commit is contained in:
parent
2d1a9b7ed1
commit
eea53c92e3
|
|
@ -15,6 +15,8 @@
|
|||
package exporterhelper
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"go.opencensus.io/trace"
|
||||
)
|
||||
|
||||
|
|
@ -23,7 +25,7 @@ var (
|
|||
)
|
||||
|
||||
// Shutdown specifies the function invoked when the exporter is being shutdown.
|
||||
type Shutdown func() error
|
||||
type Shutdown func(context.Context) error
|
||||
|
||||
// ExporterOptions contains options concerning how an Exporter is configured.
|
||||
type ExporterOptions struct {
|
||||
|
|
@ -49,10 +51,3 @@ func newExporterOptions(options ...ExporterOption) ExporterOptions {
|
|||
}
|
||||
return opts
|
||||
}
|
||||
|
||||
func errToStatus(err error) trace.Status {
|
||||
if err != nil {
|
||||
return trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()}
|
||||
}
|
||||
return okStatus
|
||||
}
|
||||
|
|
|
|||
|
|
@ -25,3 +25,10 @@ func TestErrorToStatus(t *testing.T) {
|
|||
require.Equal(t, okStatus, errToStatus(nil))
|
||||
require.Equal(t, trace.Status{Code: trace.StatusCodeUnknown, Message: "my_error"}, errToStatus(errors.New("my_error")))
|
||||
}
|
||||
|
||||
func errToStatus(err error) trace.Status {
|
||||
if err != nil {
|
||||
return trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()}
|
||||
}
|
||||
return okStatus
|
||||
}
|
||||
|
|
|
|||
|
|
@ -44,8 +44,8 @@ func (me *metricsExporter) ConsumeMetricsData(ctx context.Context, md consumerda
|
|||
}
|
||||
|
||||
// Shutdown stops the exporter and is invoked during shutdown.
|
||||
func (me *metricsExporter) Shutdown(context.Context) error {
|
||||
return me.shutdown()
|
||||
func (me *metricsExporter) Shutdown(ctx context.Context) error {
|
||||
return me.shutdown(ctx)
|
||||
}
|
||||
|
||||
// NewMetricsExporter creates an MetricsExporter that can record metrics and can wrap every request with a Span.
|
||||
|
|
@ -66,7 +66,7 @@ func NewMetricsExporter(config configmodels.Exporter, pushMetricsData PushMetric
|
|||
|
||||
// The default shutdown method always returns nil.
|
||||
if opts.shutdown == nil {
|
||||
opts.shutdown = func() error { return nil }
|
||||
opts.shutdown = func(context.Context) error { return nil }
|
||||
}
|
||||
|
||||
return &metricsExporter{
|
||||
|
|
|
|||
|
|
@ -126,7 +126,7 @@ func TestMetricsExporter_WithSpan_ReturnError(t *testing.T) {
|
|||
|
||||
func TestMetricsExporter_WithShutdown(t *testing.T) {
|
||||
shutdownCalled := false
|
||||
shutdown := func() error { shutdownCalled = true; return nil }
|
||||
shutdown := func(context.Context) error { shutdownCalled = true; return nil }
|
||||
|
||||
me, err := NewMetricsExporter(fakeMetricsExporterConfig, newPushMetricsData(0, nil), WithShutdown(shutdown))
|
||||
assert.NotNil(t, me)
|
||||
|
|
@ -138,7 +138,7 @@ func TestMetricsExporter_WithShutdown(t *testing.T) {
|
|||
|
||||
func TestMetricsExporter_WithShutdown_ReturnError(t *testing.T) {
|
||||
want := errors.New("my_error")
|
||||
shutdownErr := func() error { return want }
|
||||
shutdownErr := func(context.Context) error { return want }
|
||||
|
||||
me, err := NewMetricsExporter(fakeMetricsExporterConfig, newPushMetricsData(0, nil), WithShutdown(shutdownErr))
|
||||
assert.NotNil(t, me)
|
||||
|
|
|
|||
|
|
@ -39,7 +39,7 @@ type traceExporterOld struct {
|
|||
shutdown Shutdown
|
||||
}
|
||||
|
||||
func (te *traceExporterOld) Start(ctx context.Context, host component.Host) error {
|
||||
func (te *traceExporterOld) Start(_ context.Context, _ component.Host) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -50,8 +50,8 @@ func (te *traceExporterOld) ConsumeTraceData(ctx context.Context, td consumerdat
|
|||
}
|
||||
|
||||
// Shutdown stops the exporter and is invoked during shutdown.
|
||||
func (te *traceExporterOld) Shutdown(context.Context) error {
|
||||
return te.shutdown()
|
||||
func (te *traceExporterOld) Shutdown(ctx context.Context) error {
|
||||
return te.shutdown(ctx)
|
||||
}
|
||||
|
||||
// NewTraceExporterOld creates an TraceExporterOld that can record metrics and can wrap every
|
||||
|
|
@ -77,9 +77,7 @@ func NewTraceExporterOld(
|
|||
|
||||
// The default shutdown function does nothing.
|
||||
if opts.shutdown == nil {
|
||||
opts.shutdown = func() error {
|
||||
return nil
|
||||
}
|
||||
opts.shutdown = func(context.Context) error { return nil }
|
||||
}
|
||||
|
||||
return &traceExporterOld{
|
||||
|
|
@ -112,7 +110,7 @@ type traceExporter struct {
|
|||
shutdown Shutdown
|
||||
}
|
||||
|
||||
func (te *traceExporter) Start(ctx context.Context, host component.Host) error {
|
||||
func (te *traceExporter) Start(_ context.Context, _ component.Host) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -126,8 +124,8 @@ func (te *traceExporter) ConsumeTrace(
|
|||
}
|
||||
|
||||
// Shutdown stops the exporter and is invoked during shutdown.
|
||||
func (te *traceExporter) Shutdown(context.Context) error {
|
||||
return te.shutdown()
|
||||
func (te *traceExporter) Shutdown(ctx context.Context) error {
|
||||
return te.shutdown(ctx)
|
||||
}
|
||||
|
||||
// NewTraceExporter creates a TraceExporter that can record metrics and can wrap
|
||||
|
|
@ -152,9 +150,7 @@ func NewTraceExporter(
|
|||
|
||||
// The default shutdown function does nothing.
|
||||
if opts.shutdown == nil {
|
||||
opts.shutdown = func() error {
|
||||
return nil
|
||||
}
|
||||
opts.shutdown = func(context.Context) error { return nil }
|
||||
}
|
||||
|
||||
return &traceExporter{
|
||||
|
|
|
|||
|
|
@ -135,7 +135,7 @@ func TestTraceExporterOld_WithSpan_ReturnError(t *testing.T) {
|
|||
|
||||
func TestTraceExporterOld_WithShutdown(t *testing.T) {
|
||||
shutdownCalled := false
|
||||
shutdown := func() error { shutdownCalled = true; return nil }
|
||||
shutdown := func(context.Context) error { shutdownCalled = true; return nil }
|
||||
|
||||
te, err := NewTraceExporterOld(fakeTraceExporterConfig, newTraceDataPusherOld(0, nil), WithShutdown(shutdown))
|
||||
assert.NotNil(t, te)
|
||||
|
|
@ -147,7 +147,7 @@ func TestTraceExporterOld_WithShutdown(t *testing.T) {
|
|||
|
||||
func TestTraceExporterOld_WithShutdown_ReturnError(t *testing.T) {
|
||||
want := errors.New("my_error")
|
||||
shutdownErr := func() error { return want }
|
||||
shutdownErr := func(context.Context) error { return want }
|
||||
|
||||
te, err := NewTraceExporterOld(fakeTraceExporterConfig, newTraceDataPusherOld(0, nil), WithShutdown(shutdownErr))
|
||||
assert.NotNil(t, te)
|
||||
|
|
@ -322,7 +322,7 @@ func TestTraceExporter_WithSpan_ReturnError(t *testing.T) {
|
|||
|
||||
func TestTraceExporter_WithShutdown(t *testing.T) {
|
||||
shutdownCalled := false
|
||||
shutdown := func() error { shutdownCalled = true; return nil }
|
||||
shutdown := func(context.Context) error { shutdownCalled = true; return nil }
|
||||
|
||||
te, err := NewTraceExporter(fakeTraceExporterConfig, newTraceDataPusher(0, nil), WithShutdown(shutdown))
|
||||
assert.NotNil(t, te)
|
||||
|
|
@ -334,7 +334,7 @@ func TestTraceExporter_WithShutdown(t *testing.T) {
|
|||
|
||||
func TestTraceExporter_WithShutdown_ReturnError(t *testing.T) {
|
||||
want := errors.New("my_error")
|
||||
shutdownErr := func() error { return want }
|
||||
shutdownErr := func(context.Context) error { return want }
|
||||
|
||||
te, err := NewTraceExporter(fakeTraceExporterConfig, newTraceDataPusher(0, nil), WithShutdown(shutdownErr))
|
||||
assert.NotNil(t, te)
|
||||
|
|
|
|||
|
|
@ -180,8 +180,8 @@ func NewMetricsExporter(config configmodels.Exporter, logger *zap.Logger) (compo
|
|||
)
|
||||
}
|
||||
|
||||
func loggerSync(logger *zap.Logger) func() error {
|
||||
return func() error {
|
||||
func loggerSync(logger *zap.Logger) func(context.Context) error {
|
||||
return func(context.Context) error {
|
||||
// Currently Sync() on stdout and stderr return errors on Linux and macOS,
|
||||
// respectively:
|
||||
//
|
||||
|
|
|
|||
|
|
@ -119,7 +119,7 @@ func NewMetricsExporter(logger *zap.Logger, config configmodels.Exporter, opts .
|
|||
return oexp, nil
|
||||
}
|
||||
|
||||
func (oce *ocAgentExporter) Shutdown() error {
|
||||
func (oce *ocAgentExporter) Shutdown(context.Context) error {
|
||||
wg := &sync.WaitGroup{}
|
||||
var errors []error
|
||||
var errorsMu sync.Mutex
|
||||
|
|
|
|||
|
|
@ -59,8 +59,8 @@ const (
|
|||
)
|
||||
|
||||
// NewTraceExporter creates an OTLP trace exporter.
|
||||
func NewTraceExporter(logger *zap.Logger, config configmodels.Exporter) (component.TraceExporterOld, error) {
|
||||
oce, err := createOTLPExporter(logger, config)
|
||||
func NewTraceExporter(_ *zap.Logger, config configmodels.Exporter) (component.TraceExporterOld, error) {
|
||||
oce, err := createOTLPExporter(config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -76,8 +76,8 @@ func NewTraceExporter(logger *zap.Logger, config configmodels.Exporter) (compone
|
|||
}
|
||||
|
||||
// NewMetricsExporter creates an OTLP metrics exporter.
|
||||
func NewMetricsExporter(logger *zap.Logger, config configmodels.Exporter) (component.MetricsExporterOld, error) {
|
||||
oce, err := createOTLPExporter(logger, config)
|
||||
func NewMetricsExporter(_ *zap.Logger, config configmodels.Exporter) (component.MetricsExporterOld, error) {
|
||||
oce, err := createOTLPExporter(config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -94,7 +94,7 @@ func NewMetricsExporter(logger *zap.Logger, config configmodels.Exporter) (compo
|
|||
}
|
||||
|
||||
// createOTLPExporter creates an OTLP exporter.
|
||||
func createOTLPExporter(logger *zap.Logger, config configmodels.Exporter) (*otlpExporter, error) {
|
||||
func createOTLPExporter(config configmodels.Exporter) (*otlpExporter, error) {
|
||||
oCfg := config.(*Config)
|
||||
|
||||
if oCfg.Endpoint == "" {
|
||||
|
|
@ -125,7 +125,7 @@ func createOTLPExporter(logger *zap.Logger, config configmodels.Exporter) (*otlp
|
|||
return oce, nil
|
||||
}
|
||||
|
||||
func (oce *otlpExporter) Shutdown() error {
|
||||
func (oce *otlpExporter) Shutdown(context.Context) error {
|
||||
// Stop all exporters. Will wait until all are stopped.
|
||||
wg := &sync.WaitGroup{}
|
||||
var errors []error
|
||||
|
|
|
|||
|
|
@ -91,9 +91,9 @@ func (f *Factory) CreateMetricsExporter(logger *zap.Logger, cfg configmodels.Exp
|
|||
}()
|
||||
|
||||
pexp := &prometheusExporter{
|
||||
name: cfg.Name(),
|
||||
exporter: pe,
|
||||
shutdown: ln.Close,
|
||||
name: cfg.Name(),
|
||||
exporter: pe,
|
||||
shutdownFunc: ln.Close,
|
||||
}
|
||||
|
||||
return pexp, nil
|
||||
|
|
|
|||
|
|
@ -25,20 +25,19 @@ import (
|
|||
"github.com/open-telemetry/opentelemetry-collector/component"
|
||||
"github.com/open-telemetry/opentelemetry-collector/consumer"
|
||||
"github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata"
|
||||
"github.com/open-telemetry/opentelemetry-collector/exporter/exporterhelper"
|
||||
)
|
||||
|
||||
var errBlankPrometheusAddress = errors.New("expecting a non-blank address to run the Prometheus metrics handler")
|
||||
|
||||
type prometheusExporter struct {
|
||||
name string
|
||||
exporter *prometheus.Exporter
|
||||
shutdown exporterhelper.Shutdown
|
||||
name string
|
||||
exporter *prometheus.Exporter
|
||||
shutdownFunc func() error
|
||||
}
|
||||
|
||||
var _ consumer.MetricsConsumerOld = (*prometheusExporter)(nil)
|
||||
|
||||
func (pe *prometheusExporter) Start(ctx context.Context, host component.Host) error {
|
||||
func (pe *prometheusExporter) Start(_ context.Context, _ component.Host) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -51,5 +50,5 @@ func (pe *prometheusExporter) ConsumeMetricsData(ctx context.Context, md consume
|
|||
|
||||
// Shutdown stops the exporter and is invoked during shutdown.
|
||||
func (pe *prometheusExporter) Shutdown(context.Context) error {
|
||||
return pe.shutdown()
|
||||
return pe.shutdownFunc()
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue