Use directly the consumer helper funcs in the other helpers packages. (#4719)
Try to understand if the Component helper is necessary, or only having the helper funcs is enough. Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
This commit is contained in:
parent
5ee1fec9fe
commit
0ef0a9ce7b
|
|
@ -88,12 +88,12 @@ func (f *nopExporterFactory) CreateLogsExporter(
|
|||
}
|
||||
|
||||
var nopExporterInstance = &nopExporter{
|
||||
Component: componenthelper.New(),
|
||||
Consumer: consumertest.NewNop(),
|
||||
Consumer: consumertest.NewNop(),
|
||||
}
|
||||
|
||||
// nopExporter stores consumed traces and metrics for testing purposes.
|
||||
type nopExporter struct {
|
||||
component.Component
|
||||
componenthelper.StartFunc
|
||||
componenthelper.ShutdownFunc
|
||||
consumertest.Consumer
|
||||
}
|
||||
|
|
|
|||
|
|
@ -68,11 +68,10 @@ func (f *nopExtensionFactory) CreateExtension(
|
|||
return nopExtensionInstance, nil
|
||||
}
|
||||
|
||||
var nopExtensionInstance = &nopExtension{
|
||||
Component: componenthelper.New(),
|
||||
}
|
||||
var nopExtensionInstance = &nopExtension{}
|
||||
|
||||
// nopExtension stores consumed traces and metrics for testing purposes.
|
||||
type nopExtension struct {
|
||||
component.Component
|
||||
componenthelper.StartFunc
|
||||
componenthelper.ShutdownFunc
|
||||
}
|
||||
|
|
|
|||
|
|
@ -92,12 +92,12 @@ func (f *nopProcessorFactory) CreateLogsProcessor(
|
|||
}
|
||||
|
||||
var nopProcessorInstance = &nopProcessor{
|
||||
Component: componenthelper.New(),
|
||||
Consumer: consumertest.NewNop(),
|
||||
Consumer: consumertest.NewNop(),
|
||||
}
|
||||
|
||||
// nopProcessor stores consumed traces and metrics for testing purposes.
|
||||
type nopProcessor struct {
|
||||
component.Component
|
||||
componenthelper.StartFunc
|
||||
componenthelper.ShutdownFunc
|
||||
consumertest.Consumer
|
||||
}
|
||||
|
|
|
|||
|
|
@ -90,11 +90,10 @@ func (f *nopReceiverFactory) CreateLogsReceiver(
|
|||
return nopReceiverInstance, nil
|
||||
}
|
||||
|
||||
var nopReceiverInstance = &nopReceiver{
|
||||
Component: componenthelper.New(),
|
||||
}
|
||||
var nopReceiverInstance = &nopReceiver{}
|
||||
|
||||
// nopReceiver stores consumed traces and metrics for testing purposes.
|
||||
type nopReceiver struct {
|
||||
component.Component
|
||||
componenthelper.StartFunc
|
||||
componenthelper.ShutdownFunc
|
||||
}
|
||||
|
|
|
|||
|
|
@ -88,8 +88,9 @@ func (req *baseRequest) OnProcessingFinished() {
|
|||
|
||||
// baseSettings represents all the options that users can configure.
|
||||
type baseSettings struct {
|
||||
componentOptions []componenthelper.Option
|
||||
consumerOptions []consumerhelper.Option
|
||||
componenthelper.StartFunc
|
||||
componenthelper.ShutdownFunc
|
||||
consumerOptions []consumerhelper.Option
|
||||
TimeoutSettings
|
||||
QueueSettings
|
||||
RetrySettings
|
||||
|
|
@ -120,7 +121,7 @@ type Option func(*baseSettings)
|
|||
// The default start function does nothing and always returns nil.
|
||||
func WithStart(start componenthelper.StartFunc) Option {
|
||||
return func(o *baseSettings) {
|
||||
o.componentOptions = append(o.componentOptions, componenthelper.WithStart(start))
|
||||
o.StartFunc = start
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -128,7 +129,7 @@ func WithStart(start componenthelper.StartFunc) Option {
|
|||
// The default shutdown function does nothing and always returns nil.
|
||||
func WithShutdown(shutdown componenthelper.ShutdownFunc) Option {
|
||||
return func(o *baseSettings) {
|
||||
o.componentOptions = append(o.componentOptions, componenthelper.WithShutdown(shutdown))
|
||||
o.ShutdownFunc = shutdown
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -167,16 +168,15 @@ func WithCapabilities(capabilities consumer.Capabilities) Option {
|
|||
|
||||
// baseExporter contains common fields between different exporter types.
|
||||
type baseExporter struct {
|
||||
component.Component
|
||||
componenthelper.StartFunc
|
||||
componenthelper.ShutdownFunc
|
||||
obsrep *obsExporter
|
||||
sender requestSender
|
||||
qrSender *queuedRetrySender
|
||||
}
|
||||
|
||||
func newBaseExporter(cfg config.Exporter, set component.ExporterCreateSettings, bs *baseSettings, signal config.DataType, reqUnmarshaler internal.RequestUnmarshaler) *baseExporter {
|
||||
be := &baseExporter{
|
||||
Component: componenthelper.New(bs.componentOptions...),
|
||||
}
|
||||
be := &baseExporter{}
|
||||
|
||||
be.obsrep = newObsExporter(obsreport.ExporterSettings{
|
||||
Level: set.MetricsLevel,
|
||||
|
|
@ -185,7 +185,21 @@ func newBaseExporter(cfg config.Exporter, set component.ExporterCreateSettings,
|
|||
}, globalInstruments)
|
||||
be.qrSender = newQueuedRetrySender(cfg.ID(), signal, bs.QueueSettings, bs.RetrySettings, reqUnmarshaler, &timeoutSender{cfg: bs.TimeoutSettings}, set.Logger)
|
||||
be.sender = be.qrSender
|
||||
be.StartFunc = func(ctx context.Context, host component.Host) error {
|
||||
// First start the wrapped exporter.
|
||||
if err := bs.StartFunc.Start(ctx, host); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// If no error then start the queuedRetrySender.
|
||||
return be.qrSender.start(ctx, host)
|
||||
}
|
||||
be.ShutdownFunc = func(ctx context.Context) error {
|
||||
// First shutdown the queued retry sender
|
||||
be.qrSender.shutdown()
|
||||
// Last shutdown the wrapped exporter itself.
|
||||
return bs.ShutdownFunc.Shutdown(ctx)
|
||||
}
|
||||
return be
|
||||
}
|
||||
|
||||
|
|
@ -195,25 +209,6 @@ func (be *baseExporter) wrapConsumerSender(f func(consumer requestSender) reques
|
|||
be.qrSender.consumerSender = f(be.qrSender.consumerSender)
|
||||
}
|
||||
|
||||
// Start all senders and exporter and is invoked during service start.
|
||||
func (be *baseExporter) Start(ctx context.Context, host component.Host) error {
|
||||
// First start the wrapped exporter.
|
||||
if err := be.Component.Start(ctx, host); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// If no error then start the queuedRetrySender.
|
||||
return be.qrSender.start(ctx, host)
|
||||
}
|
||||
|
||||
// Shutdown all senders and exporter and is invoked during service shutdown.
|
||||
func (be *baseExporter) Shutdown(ctx context.Context) error {
|
||||
// First shutdown the queued retry sender
|
||||
be.qrSender.shutdown()
|
||||
// Last shutdown the wrapped exporter itself.
|
||||
return be.Component.Shutdown(ctx)
|
||||
}
|
||||
|
||||
// timeoutSender is a request sender that adds a `timeout` to every request that passes this sender.
|
||||
type timeoutSender struct {
|
||||
cfg TimeoutSettings
|
||||
|
|
|
|||
|
|
@ -34,7 +34,8 @@ import (
|
|||
type ProcessLogsFunc func(context.Context, pdata.Logs) (pdata.Logs, error)
|
||||
|
||||
type logProcessor struct {
|
||||
component.Component
|
||||
componenthelper.StartFunc
|
||||
componenthelper.ShutdownFunc
|
||||
consumer.Logs
|
||||
}
|
||||
|
||||
|
|
@ -75,7 +76,8 @@ func NewLogsProcessor(
|
|||
}
|
||||
|
||||
return &logProcessor{
|
||||
Component: componenthelper.New(bs.componentOptions...),
|
||||
Logs: logsConsumer,
|
||||
StartFunc: bs.StartFunc,
|
||||
ShutdownFunc: bs.ShutdownFunc,
|
||||
Logs: logsConsumer,
|
||||
}, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -34,7 +34,8 @@ import (
|
|||
type ProcessMetricsFunc func(context.Context, pdata.Metrics) (pdata.Metrics, error)
|
||||
|
||||
type metricsProcessor struct {
|
||||
component.Component
|
||||
componenthelper.StartFunc
|
||||
componenthelper.ShutdownFunc
|
||||
consumer.Metrics
|
||||
}
|
||||
|
||||
|
|
@ -75,7 +76,8 @@ func NewMetricsProcessor(
|
|||
}
|
||||
|
||||
return &metricsProcessor{
|
||||
Component: componenthelper.New(bs.componentOptions...),
|
||||
Metrics: metricsConsumer,
|
||||
StartFunc: bs.StartFunc,
|
||||
ShutdownFunc: bs.ShutdownFunc,
|
||||
Metrics: metricsConsumer,
|
||||
}, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -39,7 +39,7 @@ type Option func(*baseSettings)
|
|||
// The default shutdown function does nothing and always returns nil.
|
||||
func WithStart(start componenthelper.StartFunc) Option {
|
||||
return func(o *baseSettings) {
|
||||
o.componentOptions = append(o.componentOptions, componenthelper.WithStart(start))
|
||||
o.StartFunc = start
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -47,7 +47,7 @@ func WithStart(start componenthelper.StartFunc) Option {
|
|||
// The default shutdown function does nothing and always returns nil.
|
||||
func WithShutdown(shutdown componenthelper.ShutdownFunc) Option {
|
||||
return func(o *baseSettings) {
|
||||
o.componentOptions = append(o.componentOptions, componenthelper.WithShutdown(shutdown))
|
||||
o.ShutdownFunc = shutdown
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -60,8 +60,9 @@ func WithCapabilities(capabilities consumer.Capabilities) Option {
|
|||
}
|
||||
|
||||
type baseSettings struct {
|
||||
componentOptions []componenthelper.Option
|
||||
consumerOptions []consumerhelper.Option
|
||||
componenthelper.StartFunc
|
||||
componenthelper.ShutdownFunc
|
||||
consumerOptions []consumerhelper.Option
|
||||
}
|
||||
|
||||
// fromOptions returns the internal settings starting from the default and applying all options.
|
||||
|
|
|
|||
|
|
@ -34,7 +34,8 @@ import (
|
|||
type ProcessTracesFunc func(context.Context, pdata.Traces) (pdata.Traces, error)
|
||||
|
||||
type tracesProcessor struct {
|
||||
component.Component
|
||||
componenthelper.StartFunc
|
||||
componenthelper.ShutdownFunc
|
||||
consumer.Traces
|
||||
}
|
||||
|
||||
|
|
@ -76,7 +77,8 @@ func NewTracesProcessor(
|
|||
}
|
||||
|
||||
return &tracesProcessor{
|
||||
Component: componenthelper.New(bs.componentOptions...),
|
||||
Traces: traceConsumer,
|
||||
StartFunc: bs.StartFunc,
|
||||
ShutdownFunc: bs.ShutdownFunc,
|
||||
Traces: traceConsumer,
|
||||
}, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -42,31 +42,28 @@ type Scraper interface {
|
|||
Scrape(context.Context) (pdata.Metrics, error)
|
||||
}
|
||||
|
||||
type baseSettings struct {
|
||||
componentOptions []componenthelper.Option
|
||||
}
|
||||
|
||||
// ScraperOption apply changes to internal options.
|
||||
type ScraperOption func(*baseSettings)
|
||||
type ScraperOption func(*baseScraper)
|
||||
|
||||
// WithStart sets the function that will be called on startup.
|
||||
func WithStart(start componenthelper.StartFunc) ScraperOption {
|
||||
return func(o *baseSettings) {
|
||||
o.componentOptions = append(o.componentOptions, componenthelper.WithStart(start))
|
||||
return func(o *baseScraper) {
|
||||
o.StartFunc = start
|
||||
}
|
||||
}
|
||||
|
||||
// WithShutdown sets the function that will be called on shutdown.
|
||||
func WithShutdown(shutdown componenthelper.ShutdownFunc) ScraperOption {
|
||||
return func(o *baseSettings) {
|
||||
o.componentOptions = append(o.componentOptions, componenthelper.WithShutdown(shutdown))
|
||||
return func(o *baseScraper) {
|
||||
o.ShutdownFunc = shutdown
|
||||
}
|
||||
}
|
||||
|
||||
var _ Scraper = (*baseScraper)(nil)
|
||||
|
||||
type baseScraper struct {
|
||||
component.Component
|
||||
componenthelper.StartFunc
|
||||
componenthelper.ShutdownFunc
|
||||
ScrapeFunc
|
||||
id config.ComponentID
|
||||
}
|
||||
|
|
@ -81,16 +78,13 @@ func NewScraper(name string, scrape ScrapeFunc, options ...ScraperOption) (Scrap
|
|||
if scrape == nil {
|
||||
return nil, errNilFunc
|
||||
}
|
||||
set := &baseSettings{}
|
||||
for _, op := range options {
|
||||
op(set)
|
||||
}
|
||||
|
||||
ms := &baseScraper{
|
||||
Component: componenthelper.New(set.componentOptions...),
|
||||
bs := &baseScraper{
|
||||
ScrapeFunc: scrape,
|
||||
id: config.NewComponentID(config.Type(name)),
|
||||
}
|
||||
for _, op := range options {
|
||||
op(bs)
|
||||
}
|
||||
|
||||
return ms, nil
|
||||
return bs, nil
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue