Reduce duplicate code in components helper (#2186)

Add componenthelper package to help building components like Processors, Exporters.

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
This commit is contained in:
Bogdan Drutu 2020-11-20 10:17:46 -05:00 committed by GitHub
parent e365c9bfbc
commit 8ceddba7ea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 227 additions and 143 deletions

View File

@ -0,0 +1,64 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package componenthelper
import (
"context"
"go.opentelemetry.io/collector/component"
)
// Start specifies the function invoked when the exporter is being started.
type Start func(context.Context, component.Host) error
// Shutdown specifies the function invoked when the exporter is being shutdown.
type Shutdown func(context.Context) error
// ComponentSettings represents a settings struct to create components.
type ComponentSettings struct {
Start
Shutdown
}
// DefaultComponentSettings returns the default settings for a component. The Start and Shutdown are no-op.
func DefaultComponentSettings() *ComponentSettings {
return &ComponentSettings{
Start: func(ctx context.Context, host component.Host) error { return nil },
Shutdown: func(ctx context.Context) error { return nil },
}
}
type baseComponent struct {
start Start
shutdown Shutdown
}
// Start all senders and exporter and is invoked during service start.
func (be *baseComponent) Start(ctx context.Context, host component.Host) error {
return be.start(ctx, host)
}
// Shutdown all senders and exporter and is invoked during service shutdown.
func (be *baseComponent) Shutdown(ctx context.Context) error {
return be.shutdown(ctx)
}
// NewComponent returns a component.Component that calls the given Start and Shutdown.
func NewComponent(s *ComponentSettings) component.Component {
return &baseComponent{
start: s.Start,
shutdown: s.Shutdown,
}
}

View File

@ -0,0 +1,69 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package componenthelper
import (
"context"
"errors"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
)
func TestDefaultSettings(t *testing.T) {
st := DefaultComponentSettings()
require.NotNil(t, st)
cp := NewComponent(st)
require.NoError(t, cp.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, cp.Shutdown(context.Background()))
}
func TestWithStart(t *testing.T) {
startCalled := false
st := DefaultComponentSettings()
st.Start = func(context.Context, component.Host) error { startCalled = true; return nil }
cp := NewComponent(st)
assert.NoError(t, cp.Start(context.Background(), componenttest.NewNopHost()))
assert.True(t, startCalled)
}
func TestWithStart_ReturnError(t *testing.T) {
want := errors.New("my_error")
st := DefaultComponentSettings()
st.Start = func(context.Context, component.Host) error { return want }
cp := NewComponent(st)
assert.Equal(t, want, cp.Start(context.Background(), componenttest.NewNopHost()))
}
func TestWithShutdown(t *testing.T) {
shutdownCalled := false
st := DefaultComponentSettings()
st.Shutdown = func(context.Context) error { shutdownCalled = true; return nil }
cp := NewComponent(st)
assert.NoError(t, cp.Shutdown(context.Background()))
assert.True(t, shutdownCalled)
}
func TestWithShutdown_ReturnError(t *testing.T) {
want := errors.New("my_error")
st := DefaultComponentSettings()
st.Shutdown = func(context.Context) error { return want }
cp := NewComponent(st)
assert.Equal(t, want, cp.Shutdown(context.Background()))
}

View File

@ -16,14 +16,13 @@ package exporterhelper
import (
"context"
"sync"
"time"
"go.opencensus.io/trace"
"go.uber.org/zap"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/component/componenthelper"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer/consumererror"
)
@ -32,7 +31,7 @@ var (
okStatus = trace.Status{Code: trace.StatusCodeOK}
)
// Settings for timeout. The timeout applies to individual attempts to send data to the backend.
// ComponentSettings for timeout. The timeout applies to individual attempts to send data to the backend.
type TimeoutSettings struct {
// Timeout is the timeout for every attempt to send data to the backend.
Timeout time.Duration `mapstructure:"timeout"`
@ -76,34 +75,26 @@ func (req *baseRequest) setContext(ctx context.Context) {
req.ctx = ctx
}
// Start specifies the function invoked when the exporter is being started.
type Start func(context.Context, component.Host) error
// Shutdown specifies the function invoked when the exporter is being shutdown.
type Shutdown func(context.Context) error
// internalOptions represents all the options that users can configure.
type internalOptions struct {
// baseSettings represents all the options that users can configure.
type baseSettings struct {
*componenthelper.ComponentSettings
TimeoutSettings
QueueSettings
RetrySettings
ResourceToTelemetrySettings
Start
Shutdown
}
// fromConfiguredOptions returns the internal options starting from the default and applying all configured options.
func fromConfiguredOptions(options ...ExporterOption) *internalOptions {
// fromOptions returns the internal options starting from the default and applying all configured options.
func fromOptions(options []Option) *baseSettings {
// Start from the default options:
opts := &internalOptions{
TimeoutSettings: CreateDefaultTimeoutSettings(),
opts := &baseSettings{
ComponentSettings: componenthelper.DefaultComponentSettings(),
TimeoutSettings: CreateDefaultTimeoutSettings(),
// TODO: Enable queuing by default (call CreateDefaultQueueSettings)
QueueSettings: QueueSettings{Enabled: false},
// TODO: Enable retry by default (call CreateDefaultRetrySettings)
RetrySettings: RetrySettings{Enabled: false},
ResourceToTelemetrySettings: createDefaultResourceToTelemetrySettings(),
Start: func(ctx context.Context, host component.Host) error { return nil },
Shutdown: func(ctx context.Context) error { return nil },
}
for _, op := range options {
@ -113,79 +104,75 @@ func fromConfiguredOptions(options ...ExporterOption) *internalOptions {
return opts
}
// ExporterOption apply changes to internalOptions.
type ExporterOption func(*internalOptions)
// Option apply changes to baseSettings.
type Option func(*baseSettings)
// WithShutdown overrides the default Shutdown function for an exporter.
// The default shutdown function does nothing and always returns nil.
func WithShutdown(shutdown Shutdown) ExporterOption {
return func(o *internalOptions) {
func WithShutdown(shutdown componenthelper.Shutdown) Option {
return func(o *baseSettings) {
o.Shutdown = shutdown
}
}
// WithStart overrides the default Start function for an exporter.
// The default shutdown function does nothing and always returns nil.
func WithStart(start Start) ExporterOption {
return func(o *internalOptions) {
func WithStart(start componenthelper.Start) Option {
return func(o *baseSettings) {
o.Start = start
}
}
// WithTimeout overrides the default TimeoutSettings for an exporter.
// The default TimeoutSettings is 5 seconds.
func WithTimeout(timeoutSettings TimeoutSettings) ExporterOption {
return func(o *internalOptions) {
func WithTimeout(timeoutSettings TimeoutSettings) Option {
return func(o *baseSettings) {
o.TimeoutSettings = timeoutSettings
}
}
// WithRetry overrides the default RetrySettings for an exporter.
// The default RetrySettings is to disable retries.
func WithRetry(retrySettings RetrySettings) ExporterOption {
return func(o *internalOptions) {
func WithRetry(retrySettings RetrySettings) Option {
return func(o *baseSettings) {
o.RetrySettings = retrySettings
}
}
// WithQueue overrides the default QueueSettings for an exporter.
// The default QueueSettings is to disable queueing.
func WithQueue(queueSettings QueueSettings) ExporterOption {
return func(o *internalOptions) {
func WithQueue(queueSettings QueueSettings) Option {
return func(o *baseSettings) {
o.QueueSettings = queueSettings
}
}
// WithResourceToTelemetryConversion overrides the default ResourceToTelemetrySettings for an exporter.
// The default ResourceToTelemetrySettings is to disable resource attributes to metric labels conversion.
func WithResourceToTelemetryConversion(resourceToTelemetrySettings ResourceToTelemetrySettings) ExporterOption {
return func(o *internalOptions) {
func WithResourceToTelemetryConversion(resourceToTelemetrySettings ResourceToTelemetrySettings) Option {
return func(o *baseSettings) {
o.ResourceToTelemetrySettings = resourceToTelemetrySettings
}
}
// baseExporter contains common fields between different exporter types.
type baseExporter struct {
component.Component
cfg configmodels.Exporter
sender requestSender
qrSender *queuedRetrySender
start Start
shutdown Shutdown
startOnce sync.Once
shutdownOnce sync.Once
convertResourceToTelemetry bool
}
func newBaseExporter(cfg configmodels.Exporter, logger *zap.Logger, options ...ExporterOption) *baseExporter {
opts := fromConfiguredOptions(options...)
func newBaseExporter(cfg configmodels.Exporter, logger *zap.Logger, options ...Option) *baseExporter {
bs := fromOptions(options)
be := &baseExporter{
Component: componenthelper.NewComponent(bs.ComponentSettings),
cfg: cfg,
start: opts.Start,
shutdown: opts.Shutdown,
convertResourceToTelemetry: opts.ResourceToTelemetrySettings.Enabled,
convertResourceToTelemetry: bs.ResourceToTelemetrySettings.Enabled,
}
be.qrSender = newQueuedRetrySender(opts.QueueSettings, opts.RetrySettings, &timeoutSender{cfg: opts.TimeoutSettings}, logger)
be.qrSender = newQueuedRetrySender(bs.QueueSettings, bs.RetrySettings, &timeoutSender{cfg: bs.TimeoutSettings}, logger)
be.sender = be.qrSender
return be
@ -199,31 +186,22 @@ func (be *baseExporter) wrapConsumerSender(f func(consumer requestSender) reques
// Start all senders and exporter and is invoked during service start.
func (be *baseExporter) Start(ctx context.Context, host component.Host) error {
err := componenterror.ErrAlreadyStarted
be.startOnce.Do(func() {
// First start the wrapped exporter.
err = be.start(ctx, host)
if err != nil {
// TODO: Log errors, or check if it is recorded by the caller.
return
}
// First start the wrapped exporter.
if err := be.Component.Start(ctx, host); err != nil {
return err
}
// If no error then start the queuedRetrySender.
be.qrSender.start()
})
return err
// If no error then start the queuedRetrySender.
be.qrSender.start()
return nil
}
// Shutdown all senders and exporter and is invoked during service shutdown.
func (be *baseExporter) Shutdown(ctx context.Context) error {
err := componenterror.ErrAlreadyStopped
be.shutdownOnce.Do(func() {
// First shutdown the queued retry sender
be.qrSender.shutdown()
// Last shutdown the wrapped exporter itself.
err = be.shutdown(ctx)
})
return err
// First shutdown the queued retry sender
be.qrSender.shutdown()
// Last shutdown the wrapped exporter itself.
return be.Component.Shutdown(ctx)
}
// timeoutSender is a request sender that adds a `timeout` to every request that passes this sender.

View File

@ -44,16 +44,17 @@ func TestBaseExporter(t *testing.T) {
}
func TestBaseExporterWithOptions(t *testing.T) {
want := errors.New("my error")
be := newBaseExporter(
defaultExporterCfg,
zap.NewNop(),
WithStart(func(ctx context.Context, host component.Host) error { return errors.New("my error") }),
WithShutdown(func(ctx context.Context) error { return errors.New("my error") }),
WithStart(func(ctx context.Context, host component.Host) error { return want }),
WithShutdown(func(ctx context.Context) error { return want }),
WithResourceToTelemetryConversion(createDefaultResourceToTelemetrySettings()),
WithTimeout(CreateDefaultTimeoutSettings()),
)
require.Error(t, be.Start(context.Background(), componenttest.NewNopHost()))
require.Error(t, be.Shutdown(context.Background()))
require.Equal(t, want, be.Start(context.Background(), componenttest.NewNopHost()))
require.Equal(t, want, be.Shutdown(context.Background()))
}
func errToStatus(err error) trace.Status {

View File

@ -73,7 +73,7 @@ func NewLogsExporter(
cfg configmodels.Exporter,
logger *zap.Logger,
pushLogsData PushLogsData,
options ...ExporterOption,
options ...Option,
) (component.LogsExporter, error) {
if cfg == nil {
return nil, errNilConfig

View File

@ -78,7 +78,7 @@ func NewMetricsExporter(
cfg configmodels.Exporter,
logger *zap.Logger,
pushMetricsData PushMetricsData,
options ...ExporterOption,
options ...Option,
) (component.MetricsExporter, error) {
if cfg == nil {
return nil, errNilConfig

View File

@ -74,7 +74,7 @@ func NewTraceExporter(
cfg configmodels.Exporter,
logger *zap.Logger,
dataPusher traceDataPusher,
options ...ExporterOption,
options ...Option,
) (component.TracesExporter, error) {
if cfg == nil {

View File

@ -20,6 +20,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/component/componenthelper"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/pdata"
@ -30,12 +31,6 @@ import (
// to stop further processing without propagating an error back up the pipeline to logs.
var ErrSkipProcessingData = errors.New("sentinel error to skip processing data from the remainder of the pipeline")
// Start specifies the function invoked when the processor is being started.
type Start func(context.Context, component.Host) error
// Shutdown specifies the function invoked when the processor is being shutdown.
type Shutdown func(context.Context) error
// TProcessor is a helper interface that allows avoiding implementing all functions in TracesProcessor by using NewTraceProcessor.
type TProcessor interface {
// ProcessTraces is a helper function that processes the incoming data and returns the data to be sent to the next component.
@ -58,74 +53,75 @@ type LProcessor interface {
}
// Option apply changes to internalOptions.
type Option func(*baseProcessor)
type Option func(*baseSettings)
// WithStart overrides the default Start function for an processor.
// The default shutdown function does nothing and always returns nil.
func WithStart(start Start) Option {
return func(o *baseProcessor) {
o.start = start
func WithStart(start componenthelper.Start) Option {
return func(o *baseSettings) {
o.Start = start
}
}
// WithShutdown overrides the default Shutdown function for an processor.
// The default shutdown function does nothing and always returns nil.
func WithShutdown(shutdown Shutdown) Option {
return func(o *baseProcessor) {
o.shutdown = shutdown
func WithShutdown(shutdown componenthelper.Shutdown) Option {
return func(o *baseSettings) {
o.Shutdown = shutdown
}
}
// WithShutdown overrides the default GetCapabilities function for an processor.
// The default GetCapabilities function returns mutable capabilities.
func WithCapabilities(capabilities component.ProcessorCapabilities) Option {
return func(o *baseProcessor) {
return func(o *baseSettings) {
o.capabilities = capabilities
}
}
type baseSettings struct {
*componenthelper.ComponentSettings
capabilities component.ProcessorCapabilities
}
// fromOptions returns the internal settings starting from the default and applying all options.
func fromOptions(options []Option) *baseSettings {
// Start from the default options:
opts := &baseSettings{
ComponentSettings: componenthelper.DefaultComponentSettings(),
capabilities: component.ProcessorCapabilities{MutatesConsumedData: true},
}
for _, op := range options {
op(opts)
}
return opts
}
// internalOptions contains internalOptions concerning how an Processor is configured.
type baseProcessor struct {
component.Component
fullName string
start Start
shutdown Shutdown
capabilities component.ProcessorCapabilities
}
// Construct the internalOptions from multiple Option.
func newBaseProcessor(fullName string, options ...Option) baseProcessor {
bs := fromOptions(options)
be := baseProcessor{
Component: componenthelper.NewComponent(bs.ComponentSettings),
fullName: fullName,
capabilities: component.ProcessorCapabilities{MutatesConsumedData: true},
}
for _, op := range options {
op(&be)
capabilities: bs.capabilities,
}
return be
}
// Start the processor, invoked during service start.
func (bp *baseProcessor) Start(ctx context.Context, host component.Host) error {
if bp.start != nil {
return bp.start(ctx, host)
}
return nil
}
func (bp *baseProcessor) GetCapabilities() component.ProcessorCapabilities {
return bp.capabilities
}
// Shutdown the processor, invoked during service shutdown.
func (bp *baseProcessor) Shutdown(ctx context.Context) error {
if bp.shutdown != nil {
return bp.shutdown(ctx)
}
return nil
}
type tracesProcessor struct {
baseProcessor
processor TProcessor

View File

@ -38,45 +38,21 @@ var testCfg = &configmodels.ProcessorSettings{
NameVal: testFullName,
}
func TestWithStart(t *testing.T) {
startCalled := false
start := func(context.Context, component.Host) error { startCalled = true; return nil }
bp := newBaseProcessor(testFullName, WithStart(start))
assert.NoError(t, bp.Start(context.Background(), componenttest.NewNopHost()))
assert.True(t, startCalled)
}
func TestWithStart_ReturnError(t *testing.T) {
want := errors.New("my_error")
start := func(context.Context, component.Host) error { return want }
bp := newBaseProcessor(testFullName, WithStart(start))
assert.Equal(t, want, bp.Start(context.Background(), componenttest.NewNopHost()))
}
func TestWithShutdown(t *testing.T) {
shutdownCalled := false
shutdown := func(context.Context) error { shutdownCalled = true; return nil }
bp := newBaseProcessor(testFullName, WithShutdown(shutdown))
assert.NoError(t, bp.Shutdown(context.Background()))
assert.True(t, shutdownCalled)
}
func TestWithShutdown_ReturnError(t *testing.T) {
want := errors.New("my_error")
shutdownErr := func(context.Context) error { return want }
bp := newBaseProcessor(testFullName, WithShutdown(shutdownErr))
assert.Equal(t, want, bp.Shutdown(context.Background()))
}
func TestWithCapabilities(t *testing.T) {
func TestDefaultOptions(t *testing.T) {
bp := newBaseProcessor(testFullName)
assert.True(t, bp.GetCapabilities().MutatesConsumedData)
assert.NoError(t, bp.Start(context.Background(), componenttest.NewNopHost()))
assert.NoError(t, bp.Shutdown(context.Background()))
}
bp = newBaseProcessor(testFullName, WithCapabilities(component.ProcessorCapabilities{MutatesConsumedData: false}))
func TestWithOptions(t *testing.T) {
want := errors.New("my_error")
bp := newBaseProcessor(testFullName,
WithStart(func(context.Context, component.Host) error { return want }),
WithShutdown(func(context.Context) error { return want }),
WithCapabilities(component.ProcessorCapabilities{MutatesConsumedData: false}))
assert.Equal(t, want, bp.Start(context.Background(), componenttest.NewNopHost()))
assert.Equal(t, want, bp.Shutdown(context.Background()))
assert.False(t, bp.GetCapabilities().MutatesConsumedData)
}