ExporterHelper: Add ability to configure start function and remove duplicate code (#1337)
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
This commit is contained in:
parent
08e53d4653
commit
68c4e05c29
|
|
@ -18,36 +18,70 @@ import (
|
|||
"context"
|
||||
|
||||
"go.opencensus.io/trace"
|
||||
|
||||
"go.opentelemetry.io/collector/component"
|
||||
)
|
||||
|
||||
var (
|
||||
okStatus = trace.Status{Code: trace.StatusCodeOK}
|
||||
)
|
||||
|
||||
// Start specifies the function invoked when the exporter is being started.
|
||||
type Start func(context.Context, component.Host) error
|
||||
|
||||
// Shutdown specifies the function invoked when the exporter is being shutdown.
|
||||
type Shutdown func(context.Context) error
|
||||
|
||||
// ExporterOptions contains options concerning how an Exporter is configured.
|
||||
type ExporterOptions struct {
|
||||
shutdown Shutdown
|
||||
}
|
||||
|
||||
// ExporterOption apply changes to ExporterOptions.
|
||||
type ExporterOption func(*ExporterOptions)
|
||||
// ExporterOption apply changes to internalOptions.
|
||||
type ExporterOption func(*baseExporter)
|
||||
|
||||
// WithShutdown overrides the default Shutdown function for an exporter.
|
||||
// The default shutdown function does nothing and always returns nil.
|
||||
func WithShutdown(shutdown Shutdown) ExporterOption {
|
||||
return func(o *ExporterOptions) {
|
||||
return func(o *baseExporter) {
|
||||
o.shutdown = shutdown
|
||||
}
|
||||
}
|
||||
|
||||
// Construct the ExporterOptions from multiple ExporterOption.
|
||||
func newExporterOptions(options ...ExporterOption) ExporterOptions {
|
||||
var opts ExporterOptions
|
||||
for _, op := range options {
|
||||
op(&opts)
|
||||
// WithStart overrides the default Start function for an exporter.
|
||||
// The default shutdown function does nothing and always returns nil.
|
||||
func WithStart(start Start) ExporterOption {
|
||||
return func(o *baseExporter) {
|
||||
o.start = start
|
||||
}
|
||||
return opts
|
||||
}
|
||||
|
||||
// internalOptions contains internalOptions concerning how an Exporter is configured.
|
||||
type baseExporter struct {
|
||||
exporterFullName string
|
||||
start Start
|
||||
shutdown Shutdown
|
||||
}
|
||||
|
||||
// Construct the internalOptions from multiple ExporterOption.
|
||||
func newBaseExporter(exporterFullName string, options ...ExporterOption) baseExporter {
|
||||
be := baseExporter{
|
||||
exporterFullName: exporterFullName,
|
||||
}
|
||||
|
||||
for _, op := range options {
|
||||
op(&be)
|
||||
}
|
||||
|
||||
return be
|
||||
}
|
||||
|
||||
func (be *baseExporter) Start(ctx context.Context, host component.Host) error {
|
||||
if be.start != nil {
|
||||
return be.start(ctx, host)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Shutdown stops the exporter and is invoked during shutdown.
|
||||
func (be *baseExporter) Shutdown(ctx context.Context) error {
|
||||
if be.shutdown != nil {
|
||||
return be.shutdown(ctx)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,11 +14,15 @@
|
|||
package exporterhelper
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.opencensus.io/trace"
|
||||
|
||||
"go.opentelemetry.io/collector/component"
|
||||
"go.opentelemetry.io/collector/component/componenttest"
|
||||
)
|
||||
|
||||
func TestErrorToStatus(t *testing.T) {
|
||||
|
|
@ -26,6 +30,21 @@ func TestErrorToStatus(t *testing.T) {
|
|||
require.Equal(t, trace.Status{Code: trace.StatusCodeUnknown, Message: "my_error"}, errToStatus(errors.New("my_error")))
|
||||
}
|
||||
|
||||
func TestBaseExporter(t *testing.T) {
|
||||
be := newBaseExporter("test")
|
||||
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
|
||||
require.NoError(t, be.Shutdown(context.Background()))
|
||||
}
|
||||
|
||||
func TestBaseExporterWithOptions(t *testing.T) {
|
||||
be := newBaseExporter(
|
||||
"test",
|
||||
WithStart(func(ctx context.Context, host component.Host) error { return errors.New("my error") }),
|
||||
WithShutdown(func(ctx context.Context) error { return errors.New("my error") }))
|
||||
require.Error(t, be.Start(context.Background(), componenttest.NewNopHost()))
|
||||
require.Error(t, be.Shutdown(context.Background()))
|
||||
}
|
||||
|
||||
func errToStatus(err error) trace.Status {
|
||||
if err != nil {
|
||||
return trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()}
|
||||
|
|
|
|||
|
|
@ -28,13 +28,8 @@ import (
|
|||
type PushLogsData func(ctx context.Context, md data.Logs) (droppedTimeSeries int, err error)
|
||||
|
||||
type logsExporter struct {
|
||||
exporterFullName string
|
||||
pushLogsData PushLogsData
|
||||
shutdown Shutdown
|
||||
}
|
||||
|
||||
func (me *logsExporter) Start(ctx context.Context, host component.Host) error {
|
||||
return nil
|
||||
baseExporter
|
||||
pushLogsData PushLogsData
|
||||
}
|
||||
|
||||
func (me *logsExporter) ConsumeLogs(ctx context.Context, md data.Logs) error {
|
||||
|
|
@ -43,11 +38,6 @@ func (me *logsExporter) ConsumeLogs(ctx context.Context, md data.Logs) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// Shutdown stops the exporter and is invoked during shutdown.
|
||||
func (me *logsExporter) Shutdown(ctx context.Context) error {
|
||||
return me.shutdown(ctx)
|
||||
}
|
||||
|
||||
// NewLogsExporter creates an LogsExporter that can record logs and can wrap every request with a Span.
|
||||
// TODO: Add support for retries.
|
||||
func NewLogsExporter(config configmodels.Exporter, pushLogsData PushLogsData, options ...ExporterOption) (component.LogExporter, error) {
|
||||
|
|
@ -59,19 +49,11 @@ func NewLogsExporter(config configmodels.Exporter, pushLogsData PushLogsData, op
|
|||
return nil, errNilPushLogsData
|
||||
}
|
||||
|
||||
opts := newExporterOptions(options...)
|
||||
|
||||
pushLogsData = pushLogsWithObservability(pushLogsData, config.Name())
|
||||
|
||||
// The default shutdown method always returns nil.
|
||||
if opts.shutdown == nil {
|
||||
opts.shutdown = func(context.Context) error { return nil }
|
||||
}
|
||||
|
||||
return &logsExporter{
|
||||
exporterFullName: config.Name(),
|
||||
pushLogsData: pushLogsData,
|
||||
shutdown: opts.shutdown,
|
||||
baseExporter: newBaseExporter(config.Name(), options...),
|
||||
pushLogsData: pushLogsData,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -30,13 +30,8 @@ import (
|
|||
type PushMetricsDataOld func(ctx context.Context, td consumerdata.MetricsData) (droppedTimeSeries int, err error)
|
||||
|
||||
type metricsExporterOld struct {
|
||||
exporterFullName string
|
||||
pushMetricsData PushMetricsDataOld
|
||||
shutdown Shutdown
|
||||
}
|
||||
|
||||
func (me *metricsExporterOld) Start(ctx context.Context, host component.Host) error {
|
||||
return nil
|
||||
baseExporter
|
||||
pushMetricsData PushMetricsDataOld
|
||||
}
|
||||
|
||||
func (me *metricsExporterOld) ConsumeMetricsData(ctx context.Context, md consumerdata.MetricsData) error {
|
||||
|
|
@ -45,13 +40,8 @@ func (me *metricsExporterOld) ConsumeMetricsData(ctx context.Context, md consume
|
|||
return err
|
||||
}
|
||||
|
||||
// Shutdown stops the exporter and is invoked during shutdown.
|
||||
func (me *metricsExporterOld) Shutdown(ctx context.Context) error {
|
||||
return me.shutdown(ctx)
|
||||
}
|
||||
|
||||
// NewMetricsExporterOld creates an MetricsExporter that can record metrics and can wrap every request with a Span.
|
||||
// If no options are passed it just adds the exporter format as a tag in the Context.
|
||||
// If no internalOptions are passed it just adds the exporter format as a tag in the Context.
|
||||
// TODO: Add support for retries.
|
||||
func NewMetricsExporterOld(config configmodels.Exporter, pushMetricsData PushMetricsDataOld, options ...ExporterOption) (component.MetricsExporterOld, error) {
|
||||
if config == nil {
|
||||
|
|
@ -62,19 +52,11 @@ func NewMetricsExporterOld(config configmodels.Exporter, pushMetricsData PushMet
|
|||
return nil, errNilPushMetricsData
|
||||
}
|
||||
|
||||
opts := newExporterOptions(options...)
|
||||
|
||||
pushMetricsData = pushMetricsWithObservabilityOld(pushMetricsData, config.Name())
|
||||
|
||||
// The default shutdown method always returns nil.
|
||||
if opts.shutdown == nil {
|
||||
opts.shutdown = func(context.Context) error { return nil }
|
||||
}
|
||||
|
||||
return &metricsExporterOld{
|
||||
exporterFullName: config.Name(),
|
||||
pushMetricsData: pushMetricsData,
|
||||
shutdown: opts.shutdown,
|
||||
baseExporter: newBaseExporter(config.Name(), options...),
|
||||
pushMetricsData: pushMetricsData,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
|
@ -107,13 +89,8 @@ func NumTimeSeries(md consumerdata.MetricsData) int {
|
|||
type PushMetricsData func(ctx context.Context, md pdata.Metrics) (droppedTimeSeries int, err error)
|
||||
|
||||
type metricsExporter struct {
|
||||
exporterFullName string
|
||||
pushMetricsData PushMetricsData
|
||||
shutdown Shutdown
|
||||
}
|
||||
|
||||
func (me *metricsExporter) Start(ctx context.Context, host component.Host) error {
|
||||
return nil
|
||||
baseExporter
|
||||
pushMetricsData PushMetricsData
|
||||
}
|
||||
|
||||
func (me *metricsExporter) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error {
|
||||
|
|
@ -122,13 +99,8 @@ func (me *metricsExporter) ConsumeMetrics(ctx context.Context, md pdata.Metrics)
|
|||
return err
|
||||
}
|
||||
|
||||
// Shutdown stops the exporter and is invoked during shutdown.
|
||||
func (me *metricsExporter) Shutdown(ctx context.Context) error {
|
||||
return me.shutdown(ctx)
|
||||
}
|
||||
|
||||
// NewMetricsExporter creates an MetricsExporter that can record metrics and can wrap every request with a Span.
|
||||
// If no options are passed it just adds the exporter format as a tag in the Context.
|
||||
// If no internalOptions are passed it just adds the exporter format as a tag in the Context.
|
||||
// TODO: Add support for retries.
|
||||
func NewMetricsExporter(config configmodels.Exporter, pushMetricsData PushMetricsData, options ...ExporterOption) (component.MetricsExporter, error) {
|
||||
if config == nil {
|
||||
|
|
@ -139,19 +111,11 @@ func NewMetricsExporter(config configmodels.Exporter, pushMetricsData PushMetric
|
|||
return nil, errNilPushMetricsData
|
||||
}
|
||||
|
||||
opts := newExporterOptions(options...)
|
||||
|
||||
pushMetricsData = pushMetricsWithObservability(pushMetricsData, config.Name())
|
||||
|
||||
// The default shutdown method always returns nil.
|
||||
if opts.shutdown == nil {
|
||||
opts.shutdown = func(context.Context) error { return nil }
|
||||
}
|
||||
|
||||
return &metricsExporter{
|
||||
exporterFullName: config.Name(),
|
||||
pushMetricsData: pushMetricsData,
|
||||
shutdown: opts.shutdown,
|
||||
baseExporter: newBaseExporter(config.Name(), options...),
|
||||
pushMetricsData: pushMetricsData,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -32,15 +32,10 @@ type traceDataPusherOld func(ctx context.Context, td consumerdata.TraceData) (dr
|
|||
// returns the number of dropped spans.
|
||||
type traceDataPusher func(ctx context.Context, td pdata.Traces) (droppedSpans int, err error)
|
||||
|
||||
// traceExporterOld implements the exporter with additional helper options.
|
||||
// traceExporterOld implements the exporter with additional helper internalOptions.
|
||||
type traceExporterOld struct {
|
||||
exporterFullName string
|
||||
dataPusher traceDataPusherOld
|
||||
shutdown Shutdown
|
||||
}
|
||||
|
||||
func (te *traceExporterOld) Start(_ context.Context, _ component.Host) error {
|
||||
return nil
|
||||
baseExporter
|
||||
dataPusher traceDataPusherOld
|
||||
}
|
||||
|
||||
func (te *traceExporterOld) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error {
|
||||
|
|
@ -49,13 +44,8 @@ func (te *traceExporterOld) ConsumeTraceData(ctx context.Context, td consumerdat
|
|||
return err
|
||||
}
|
||||
|
||||
// Shutdown stops the exporter and is invoked during shutdown.
|
||||
func (te *traceExporterOld) Shutdown(ctx context.Context) error {
|
||||
return te.shutdown(ctx)
|
||||
}
|
||||
|
||||
// NewTraceExporterOld creates an TraceExporterOld that can record metrics and can wrap every
|
||||
// request with a Span. If no options are passed it just adds the exporter format as a
|
||||
// request with a Span. If no internalOptions are passed it just adds the exporter format as a
|
||||
// tag in the Context.
|
||||
func NewTraceExporterOld(
|
||||
config configmodels.Exporter,
|
||||
|
|
@ -71,19 +61,11 @@ func NewTraceExporterOld(
|
|||
return nil, errNilPushTraceData
|
||||
}
|
||||
|
||||
opts := newExporterOptions(options...)
|
||||
|
||||
dataPusher = dataPusher.withObservability(config.Name())
|
||||
|
||||
// The default shutdown function does nothing.
|
||||
if opts.shutdown == nil {
|
||||
opts.shutdown = func(context.Context) error { return nil }
|
||||
}
|
||||
|
||||
return &traceExporterOld{
|
||||
exporterFullName: config.Name(),
|
||||
dataPusher: dataPusher,
|
||||
shutdown: opts.shutdown,
|
||||
baseExporter: newBaseExporter(config.Name(), options...),
|
||||
dataPusher: dataPusher,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
|
@ -105,13 +87,8 @@ func (p traceDataPusherOld) withObservability(exporterName string) traceDataPush
|
|||
}
|
||||
|
||||
type traceExporter struct {
|
||||
exporterFullName string
|
||||
dataPusher traceDataPusher
|
||||
shutdown Shutdown
|
||||
}
|
||||
|
||||
func (te *traceExporter) Start(_ context.Context, _ component.Host) error {
|
||||
return nil
|
||||
baseExporter
|
||||
dataPusher traceDataPusher
|
||||
}
|
||||
|
||||
func (te *traceExporter) ConsumeTraces(
|
||||
|
|
@ -123,11 +100,6 @@ func (te *traceExporter) ConsumeTraces(
|
|||
return err
|
||||
}
|
||||
|
||||
// Shutdown stops the exporter and is invoked during shutdown.
|
||||
func (te *traceExporter) Shutdown(ctx context.Context) error {
|
||||
return te.shutdown(ctx)
|
||||
}
|
||||
|
||||
// NewTraceExporter creates a TraceExporter that can record metrics and can wrap
|
||||
// every request with a Span.
|
||||
func NewTraceExporter(
|
||||
|
|
@ -144,19 +116,11 @@ func NewTraceExporter(
|
|||
return nil, errNilPushTraceData
|
||||
}
|
||||
|
||||
opts := newExporterOptions(options...)
|
||||
|
||||
dataPusher = dataPusher.withObservability(config.Name())
|
||||
|
||||
// The default shutdown function does nothing.
|
||||
if opts.shutdown == nil {
|
||||
opts.shutdown = func(context.Context) error { return nil }
|
||||
}
|
||||
|
||||
return &traceExporter{
|
||||
exporterFullName: config.Name(),
|
||||
dataPusher: dataPusher,
|
||||
shutdown: opts.shutdown,
|
||||
baseExporter: newBaseExporter(config.Name(), options...),
|
||||
dataPusher: dataPusher,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue