From ce71309a1dd89ec872342794c813546a0af9d831 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Wed, 16 Mar 2022 14:24:33 +0200 Subject: [PATCH] Deprecate consumerhelper, move helpers to consumer (#5006) Signed-off-by: Bogdan Drutu --- CHANGELOG.md | 1 + consumer/consumer.go | 50 +++++++++-------- consumer/consumerhelper/common.go | 38 ++----------- consumer/consumerhelper/common_test.go | 36 ------------- consumer/consumerhelper/doc.go | 17 ------ consumer/consumerhelper/logs.go | 29 ++-------- consumer/consumerhelper/metrics.go | 29 ++-------- consumer/consumerhelper/traces.go | 29 ++-------- consumer/logs.go | 53 +++++++++++++++++++ consumer/{consumerhelper => }/logs_test.go | 9 ++-- consumer/metrics.go | 53 +++++++++++++++++++ consumer/{consumerhelper => }/metrics_test.go | 9 ++-- consumer/traces.go | 53 +++++++++++++++++++ consumer/{consumerhelper => }/traces_test.go | 9 ++-- exporter/exporterhelper/common.go | 5 +- exporter/exporterhelper/common_test.go | 4 +- exporter/exporterhelper/logs.go | 11 ++-- exporter/exporterhelper/logs_test.go | 3 +- exporter/exporterhelper/metrics.go | 11 ++-- exporter/exporterhelper/metrics_test.go | 3 +- exporter/exporterhelper/traces.go | 11 ++-- exporter/exporterhelper/traces_test.go | 3 +- processor/processorhelper/logs.go | 3 +- processor/processorhelper/metrics.go | 3 +- processor/processorhelper/processor.go | 7 ++- processor/processorhelper/traces.go | 3 +- 26 files changed, 245 insertions(+), 237 deletions(-) delete mode 100644 consumer/consumerhelper/common_test.go delete mode 100644 consumer/consumerhelper/doc.go create mode 100644 consumer/logs.go rename consumer/{consumerhelper => }/logs_test.go (86%) create mode 100644 consumer/metrics.go rename consumer/{consumerhelper => }/metrics_test.go (86%) create mode 100644 consumer/traces.go rename consumer/{consumerhelper => }/traces_test.go (86%) diff --git a/CHANGELOG.md b/CHANGELOG.md index db4fb988ed..deb2f5ec75 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ ### 🚩 Deprecations 🚩 - Deprecate `pdata.AttributeMap.Delete` in favor of `pdata.AttributeMap.Remove` (#4914) +- Deprecate consumerhelper, move helpers to consumer (#5006) ### 💡 Enhancements 💡 diff --git a/consumer/consumer.go b/consumer/consumer.go index 9d446f65aa..ddfce62cdd 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -15,9 +15,7 @@ package consumer // import "go.opentelemetry.io/collector/consumer" import ( - "context" - - "go.opentelemetry.io/collector/model/pdata" + "errors" ) // Capabilities describes the capabilities of a Processor. @@ -34,26 +32,36 @@ type baseConsumer interface { Capabilities() Capabilities } -// Metrics is the new metrics consumer interface that receives pdata.Metrics, processes it -// as needed, and sends it to the next processing node if any or to the destination. -type Metrics interface { - baseConsumer - // ConsumeMetrics receives pdata.Metrics for consumption. - ConsumeMetrics(ctx context.Context, md pdata.Metrics) error +var errNilFunc = errors.New("nil consumer func") + +type baseImpl struct { + capabilities Capabilities } -// Traces is an interface that receives pdata.Traces, processes it -// as needed, and sends it to the next processing node if any or to the destination. -type Traces interface { - baseConsumer - // ConsumeTraces receives pdata.Traces for consumption. - ConsumeTraces(ctx context.Context, td pdata.Traces) error +// Option to construct new consumers. +type Option func(*baseImpl) + +// WithCapabilities overrides the default GetCapabilities function for a processor. +// The default GetCapabilities function returns mutable capabilities. +func WithCapabilities(capabilities Capabilities) Option { + return func(o *baseImpl) { + o.capabilities = capabilities + } } -// Logs is an interface that receives pdata.Logs, processes it -// as needed, and sends it to the next processing node if any or to the destination. -type Logs interface { - baseConsumer - // ConsumeLogs receives pdata.Logs for consumption. - ConsumeLogs(ctx context.Context, ld pdata.Logs) error +// Capabilities implementation of the base +func (bs baseImpl) Capabilities() Capabilities { + return bs.capabilities +} + +func newBaseImpl(options ...Option) *baseImpl { + bs := &baseImpl{ + capabilities: Capabilities{MutatesData: false}, + } + + for _, op := range options { + op(bs) + } + + return bs } diff --git a/consumer/consumerhelper/common.go b/consumer/consumerhelper/common.go index 8081e4826f..0081fc50bd 100644 --- a/consumer/consumerhelper/common.go +++ b/consumer/consumerhelper/common.go @@ -15,41 +15,11 @@ package consumerhelper // import "go.opentelemetry.io/collector/consumer/consumerhelper" import ( - "errors" - "go.opentelemetry.io/collector/consumer" ) -var errNilFunc = errors.New("nil consumer func") +// Deprecated: [v0.47.0] use consumer.Option +type Option = consumer.Option -type baseConsumer struct { - capabilities consumer.Capabilities -} - -// Option applies changes to internalOptions. -type Option func(*baseConsumer) - -// WithCapabilities overrides the default GetCapabilities function for a processor. -// The default GetCapabilities function returns mutable capabilities. -func WithCapabilities(capabilities consumer.Capabilities) Option { - return func(o *baseConsumer) { - o.capabilities = capabilities - } -} - -// Capabilities implementation of the base Consumer. -func (bs baseConsumer) Capabilities() consumer.Capabilities { - return bs.capabilities -} - -func newBaseConsumer(options ...Option) *baseConsumer { - bs := &baseConsumer{ - capabilities: consumer.Capabilities{MutatesData: false}, - } - - for _, op := range options { - op(bs) - } - - return bs -} +// Deprecated: [v0.47.0] use consumer.WithCapabilities +var WithCapabilities = consumer.WithCapabilities diff --git a/consumer/consumerhelper/common_test.go b/consumer/consumerhelper/common_test.go deleted file mode 100644 index 924531a9b8..0000000000 --- a/consumer/consumerhelper/common_test.go +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package consumerhelper - -import ( - "testing" - - "github.com/stretchr/testify/assert" - - "go.opentelemetry.io/collector/consumer" -) - -func TestDefaultOptions(t *testing.T) { - bp := newBaseConsumer() - assert.Equal(t, consumer.Capabilities{MutatesData: false}, bp.Capabilities()) -} - -func TestWithCapabilities(t *testing.T) { - bpMutate := newBaseConsumer(WithCapabilities(consumer.Capabilities{MutatesData: true})) - assert.Equal(t, consumer.Capabilities{MutatesData: true}, bpMutate.Capabilities()) - - bpNotMutate := newBaseConsumer(WithCapabilities(consumer.Capabilities{MutatesData: false})) - assert.Equal(t, consumer.Capabilities{MutatesData: false}, bpNotMutate.Capabilities()) -} diff --git a/consumer/consumerhelper/doc.go b/consumer/consumerhelper/doc.go deleted file mode 100644 index 977dc8faea..0000000000 --- a/consumer/consumerhelper/doc.go +++ /dev/null @@ -1,17 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package consumerhelper defines types and functions used to create consumer -// Logs, Metrics, and Traces. -package consumerhelper // import "go.opentelemetry.io/collector/consumer/consumerhelper" diff --git a/consumer/consumerhelper/logs.go b/consumer/consumerhelper/logs.go index 898fa7e99c..7ac046fe63 100644 --- a/consumer/consumerhelper/logs.go +++ b/consumer/consumerhelper/logs.go @@ -15,32 +15,11 @@ package consumerhelper // import "go.opentelemetry.io/collector/consumer/consumerhelper" import ( - "context" - "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/model/pdata" ) -// ConsumeLogsFunc is a helper function that is similar to ConsumeLogs. -type ConsumeLogsFunc func(ctx context.Context, ld pdata.Logs) error +// Deprecated: [v0.47.0] use consumer.ConsumeLogsFunc +type ConsumeLogsFunc = consumer.ConsumeLogsFunc -// ConsumeLogs calls f(ctx, ld). -func (f ConsumeLogsFunc) ConsumeLogs(ctx context.Context, ld pdata.Logs) error { - return f(ctx, ld) -} - -type baseLogs struct { - *baseConsumer - ConsumeLogsFunc -} - -// NewLogs returns a consumer.Logs configured with the provided options. -func NewLogs(consume ConsumeLogsFunc, options ...Option) (consumer.Logs, error) { - if consume == nil { - return nil, errNilFunc - } - return &baseLogs{ - baseConsumer: newBaseConsumer(options...), - ConsumeLogsFunc: consume, - }, nil -} +// Deprecated: [v0.47.0] use consumer.NewLogs +var NewLogs = consumer.NewLogs diff --git a/consumer/consumerhelper/metrics.go b/consumer/consumerhelper/metrics.go index 4bbd35607d..61dfe5eba5 100644 --- a/consumer/consumerhelper/metrics.go +++ b/consumer/consumerhelper/metrics.go @@ -15,32 +15,11 @@ package consumerhelper // import "go.opentelemetry.io/collector/consumer/consumerhelper" import ( - "context" - "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/model/pdata" ) -// ConsumeMetricsFunc is a helper function that is similar to ConsumeMetrics. -type ConsumeMetricsFunc func(ctx context.Context, ld pdata.Metrics) error +// Deprecated: [v0.47.0] use consumer.ConsumeMetricsFunc +type ConsumeMetricsFunc = consumer.ConsumeMetricsFunc -// ConsumeMetrics calls f(ctx, ld). -func (f ConsumeMetricsFunc) ConsumeMetrics(ctx context.Context, ld pdata.Metrics) error { - return f(ctx, ld) -} - -type baseMetrics struct { - *baseConsumer - ConsumeMetricsFunc -} - -// NewMetrics returns a consumer.Metrics configured with the provided options. -func NewMetrics(consume ConsumeMetricsFunc, options ...Option) (consumer.Metrics, error) { - if consume == nil { - return nil, errNilFunc - } - return &baseMetrics{ - baseConsumer: newBaseConsumer(options...), - ConsumeMetricsFunc: consume, - }, nil -} +// Deprecated: [v0.47.0] use consumer.NewMetrics +var NewMetrics = consumer.NewMetrics diff --git a/consumer/consumerhelper/traces.go b/consumer/consumerhelper/traces.go index d71f5a5606..b232f01433 100644 --- a/consumer/consumerhelper/traces.go +++ b/consumer/consumerhelper/traces.go @@ -15,32 +15,11 @@ package consumerhelper // import "go.opentelemetry.io/collector/consumer/consumerhelper" import ( - "context" - "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/model/pdata" ) -// ConsumeTracesFunc is a helper function that is similar to ConsumeTraces. -type ConsumeTracesFunc func(ctx context.Context, ld pdata.Traces) error +// Deprecated: [v0.47.0] use consumer.ConsumeTracesFunc +type ConsumeTracesFunc = consumer.ConsumeTracesFunc -// ConsumeTraces calls f(ctx, ld). -func (f ConsumeTracesFunc) ConsumeTraces(ctx context.Context, ld pdata.Traces) error { - return f(ctx, ld) -} - -type baseTraces struct { - *baseConsumer - ConsumeTracesFunc -} - -// NewTraces returns a consumer.Traces configured with the provided options. -func NewTraces(consume ConsumeTracesFunc, options ...Option) (consumer.Traces, error) { - if consume == nil { - return nil, errNilFunc - } - return &baseTraces{ - baseConsumer: newBaseConsumer(options...), - ConsumeTracesFunc: consume, - }, nil -} +// Deprecated: [v0.47.0] use consumer.NewTraces +var NewTraces = consumer.NewTraces diff --git a/consumer/logs.go b/consumer/logs.go new file mode 100644 index 0000000000..2876c14978 --- /dev/null +++ b/consumer/logs.go @@ -0,0 +1,53 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumer // import "go.opentelemetry.io/collector/consumer" + +import ( + "context" + + "go.opentelemetry.io/collector/model/pdata" +) + +// Logs is an interface that receives pdata.Logs, processes it +// as needed, and sends it to the next processing node if any or to the destination. +type Logs interface { + baseConsumer + // ConsumeLogs receives pdata.Logs for consumption. + ConsumeLogs(ctx context.Context, ld pdata.Logs) error +} + +// ConsumeLogsFunc is a helper function that is similar to ConsumeLogs. +type ConsumeLogsFunc func(ctx context.Context, ld pdata.Logs) error + +// ConsumeLogs calls f(ctx, ld). +func (f ConsumeLogsFunc) ConsumeLogs(ctx context.Context, ld pdata.Logs) error { + return f(ctx, ld) +} + +type baseLogs struct { + *baseImpl + ConsumeLogsFunc +} + +// NewLogs returns a Logs configured with the provided options. +func NewLogs(consume ConsumeLogsFunc, options ...Option) (Logs, error) { + if consume == nil { + return nil, errNilFunc + } + return &baseLogs{ + baseImpl: newBaseImpl(options...), + ConsumeLogsFunc: consume, + }, nil +} diff --git a/consumer/consumerhelper/logs_test.go b/consumer/logs_test.go similarity index 86% rename from consumer/consumerhelper/logs_test.go rename to consumer/logs_test.go index 695a398b7e..d3815937d9 100644 --- a/consumer/consumerhelper/logs_test.go +++ b/consumer/logs_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package consumerhelper +package consumer import ( "context" @@ -21,7 +21,6 @@ import ( "github.com/stretchr/testify/assert" - "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/model/pdata" ) @@ -29,7 +28,7 @@ func TestDefaultLogs(t *testing.T) { cp, err := NewLogs(func(context.Context, pdata.Logs) error { return nil }) assert.NoError(t, err) assert.NoError(t, cp.ConsumeLogs(context.Background(), pdata.NewLogs())) - assert.Equal(t, consumer.Capabilities{MutatesData: false}, cp.Capabilities()) + assert.Equal(t, Capabilities{MutatesData: false}, cp.Capabilities()) } func TestNilFuncLogs(t *testing.T) { @@ -40,10 +39,10 @@ func TestNilFuncLogs(t *testing.T) { func TestWithCapabilitiesLogs(t *testing.T) { cp, err := NewLogs( func(context.Context, pdata.Logs) error { return nil }, - WithCapabilities(consumer.Capabilities{MutatesData: true})) + WithCapabilities(Capabilities{MutatesData: true})) assert.NoError(t, err) assert.NoError(t, cp.ConsumeLogs(context.Background(), pdata.NewLogs())) - assert.Equal(t, consumer.Capabilities{MutatesData: true}, cp.Capabilities()) + assert.Equal(t, Capabilities{MutatesData: true}, cp.Capabilities()) } func TestConsumeLogs(t *testing.T) { diff --git a/consumer/metrics.go b/consumer/metrics.go new file mode 100644 index 0000000000..43102cee7f --- /dev/null +++ b/consumer/metrics.go @@ -0,0 +1,53 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumer // import "go.opentelemetry.io/collector/consumer" + +import ( + "context" + + "go.opentelemetry.io/collector/model/pdata" +) + +// Metrics is the new metrics consumer interface that receives pdata.Metrics, processes it +// as needed, and sends it to the next processing node if any or to the destination. +type Metrics interface { + baseConsumer + // ConsumeMetrics receives pdata.Metrics for consumption. + ConsumeMetrics(ctx context.Context, md pdata.Metrics) error +} + +// ConsumeMetricsFunc is a helper function that is similar to ConsumeMetrics. +type ConsumeMetricsFunc func(ctx context.Context, ld pdata.Metrics) error + +// ConsumeMetrics calls f(ctx, ld). +func (f ConsumeMetricsFunc) ConsumeMetrics(ctx context.Context, ld pdata.Metrics) error { + return f(ctx, ld) +} + +type baseMetrics struct { + *baseImpl + ConsumeMetricsFunc +} + +// NewMetrics returns a Metrics configured with the provided options. +func NewMetrics(consume ConsumeMetricsFunc, options ...Option) (Metrics, error) { + if consume == nil { + return nil, errNilFunc + } + return &baseMetrics{ + baseImpl: newBaseImpl(options...), + ConsumeMetricsFunc: consume, + }, nil +} diff --git a/consumer/consumerhelper/metrics_test.go b/consumer/metrics_test.go similarity index 86% rename from consumer/consumerhelper/metrics_test.go rename to consumer/metrics_test.go index c74544f7ef..f6b8856886 100644 --- a/consumer/consumerhelper/metrics_test.go +++ b/consumer/metrics_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package consumerhelper +package consumer import ( "context" @@ -21,7 +21,6 @@ import ( "github.com/stretchr/testify/assert" - "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/model/pdata" ) @@ -29,7 +28,7 @@ func TestDefaultMetrics(t *testing.T) { cp, err := NewMetrics(func(context.Context, pdata.Metrics) error { return nil }) assert.NoError(t, err) assert.NoError(t, cp.ConsumeMetrics(context.Background(), pdata.NewMetrics())) - assert.Equal(t, consumer.Capabilities{MutatesData: false}, cp.Capabilities()) + assert.Equal(t, Capabilities{MutatesData: false}, cp.Capabilities()) } func TestNilFuncMetrics(t *testing.T) { @@ -40,10 +39,10 @@ func TestNilFuncMetrics(t *testing.T) { func TestWithCapabilitiesMetrics(t *testing.T) { cp, err := NewMetrics( func(context.Context, pdata.Metrics) error { return nil }, - WithCapabilities(consumer.Capabilities{MutatesData: true})) + WithCapabilities(Capabilities{MutatesData: true})) assert.NoError(t, err) assert.NoError(t, cp.ConsumeMetrics(context.Background(), pdata.NewMetrics())) - assert.Equal(t, consumer.Capabilities{MutatesData: true}, cp.Capabilities()) + assert.Equal(t, Capabilities{MutatesData: true}, cp.Capabilities()) } func TestConsumeMetrics(t *testing.T) { diff --git a/consumer/traces.go b/consumer/traces.go new file mode 100644 index 0000000000..3852d7d253 --- /dev/null +++ b/consumer/traces.go @@ -0,0 +1,53 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumer // import "go.opentelemetry.io/collector/consumer" + +import ( + "context" + + "go.opentelemetry.io/collector/model/pdata" +) + +// Traces is an interface that receives pdata.Traces, processes it +// as needed, and sends it to the next processing node if any or to the destination. +type Traces interface { + baseConsumer + // ConsumeTraces receives pdata.Traces for consumption. + ConsumeTraces(ctx context.Context, td pdata.Traces) error +} + +// ConsumeTracesFunc is a helper function that is similar to ConsumeTraces. +type ConsumeTracesFunc func(ctx context.Context, ld pdata.Traces) error + +// ConsumeTraces calls f(ctx, ld). +func (f ConsumeTracesFunc) ConsumeTraces(ctx context.Context, ld pdata.Traces) error { + return f(ctx, ld) +} + +type baseTraces struct { + *baseImpl + ConsumeTracesFunc +} + +// NewTraces returns a Traces configured with the provided options. +func NewTraces(consume ConsumeTracesFunc, options ...Option) (Traces, error) { + if consume == nil { + return nil, errNilFunc + } + return &baseTraces{ + baseImpl: newBaseImpl(options...), + ConsumeTracesFunc: consume, + }, nil +} diff --git a/consumer/consumerhelper/traces_test.go b/consumer/traces_test.go similarity index 86% rename from consumer/consumerhelper/traces_test.go rename to consumer/traces_test.go index af3114b1c5..41cd269cfe 100644 --- a/consumer/consumerhelper/traces_test.go +++ b/consumer/traces_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package consumerhelper +package consumer import ( "context" @@ -21,7 +21,6 @@ import ( "github.com/stretchr/testify/assert" - "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/model/pdata" ) @@ -29,7 +28,7 @@ func TestDefaultTraces(t *testing.T) { cp, err := NewTraces(func(context.Context, pdata.Traces) error { return nil }) assert.NoError(t, err) assert.NoError(t, cp.ConsumeTraces(context.Background(), pdata.NewTraces())) - assert.Equal(t, consumer.Capabilities{MutatesData: false}, cp.Capabilities()) + assert.Equal(t, Capabilities{MutatesData: false}, cp.Capabilities()) } func TestNilFuncTraces(t *testing.T) { @@ -40,10 +39,10 @@ func TestNilFuncTraces(t *testing.T) { func TestWithCapabilitiesTraces(t *testing.T) { cp, err := NewTraces( func(context.Context, pdata.Traces) error { return nil }, - WithCapabilities(consumer.Capabilities{MutatesData: true})) + WithCapabilities(Capabilities{MutatesData: true})) assert.NoError(t, err) assert.NoError(t, cp.ConsumeTraces(context.Background(), pdata.NewTraces())) - assert.Equal(t, consumer.Capabilities{MutatesData: true}, cp.Capabilities()) + assert.Equal(t, Capabilities{MutatesData: true}, cp.Capabilities()) } func TestConsumeTraces(t *testing.T) { diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index 6f6efbce5b..c48af91c96 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -21,7 +21,6 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/consumer/consumerhelper" "go.opentelemetry.io/collector/exporter/exporterhelper/internal" "go.opentelemetry.io/collector/obsreport" ) @@ -89,7 +88,7 @@ func (req *baseRequest) OnProcessingFinished() { type baseSettings struct { component.StartFunc component.ShutdownFunc - consumerOptions []consumerhelper.Option + consumerOptions []consumer.Option TimeoutSettings QueueSettings RetrySettings @@ -161,7 +160,7 @@ func WithQueue(queueSettings QueueSettings) Option { // TODO: Verify if we can change the default to be mutable as we do for processors. func WithCapabilities(capabilities consumer.Capabilities) Option { return func(o *baseSettings) { - o.consumerOptions = append(o.consumerOptions, consumerhelper.WithCapabilities(capabilities)) + o.consumerOptions = append(o.consumerOptions, consumer.WithCapabilities(capabilities)) } } diff --git a/exporter/exporterhelper/common_test.go b/exporter/exporterhelper/common_test.go index 3dc7ed3ae1..42559e212a 100644 --- a/exporter/exporterhelper/common_test.go +++ b/exporter/exporterhelper/common_test.go @@ -26,7 +26,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config" - "go.opentelemetry.io/collector/consumer/consumerhelper" + "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/exporter/exporterhelper/internal" "go.opentelemetry.io/collector/model/pdata" ) @@ -70,7 +70,7 @@ func checkStatus(t *testing.T, sd sdktrace.ReadOnlySpan, err error) { } } -func nopTracePusher() consumerhelper.ConsumeTracesFunc { +func nopTracePusher() consumer.ConsumeTracesFunc { return func(ctx context.Context, ld pdata.Traces) error { return nil } diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go index c89514f13b..3837d9a901 100644 --- a/exporter/exporterhelper/logs.go +++ b/exporter/exporterhelper/logs.go @@ -22,7 +22,6 @@ import ( "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" - "go.opentelemetry.io/collector/consumer/consumerhelper" "go.opentelemetry.io/collector/exporter/exporterhelper/internal" "go.opentelemetry.io/collector/model/otlp" "go.opentelemetry.io/collector/model/pdata" @@ -34,10 +33,10 @@ var logsUnmarshaler = otlp.NewProtobufLogsUnmarshaler() type logsRequest struct { baseRequest ld pdata.Logs - pusher consumerhelper.ConsumeLogsFunc + pusher consumer.ConsumeLogsFunc } -func newLogsRequest(ctx context.Context, ld pdata.Logs, pusher consumerhelper.ConsumeLogsFunc) request { +func newLogsRequest(ctx context.Context, ld pdata.Logs, pusher consumer.ConsumeLogsFunc) request { return &logsRequest{ baseRequest: baseRequest{ctx: ctx}, ld: ld, @@ -45,7 +44,7 @@ func newLogsRequest(ctx context.Context, ld pdata.Logs, pusher consumerhelper.Co } } -func newLogsRequestUnmarshalerFunc(pusher consumerhelper.ConsumeLogsFunc) internal.RequestUnmarshaler { +func newLogsRequestUnmarshalerFunc(pusher consumer.ConsumeLogsFunc) internal.RequestUnmarshaler { return func(bytes []byte) (internal.PersistentRequest, error) { logs, err := logsUnmarshaler.UnmarshalLogs(bytes) if err != nil { @@ -84,7 +83,7 @@ type logsExporter struct { func NewLogsExporter( cfg config.Exporter, set component.ExporterCreateSettings, - pusher consumerhelper.ConsumeLogsFunc, + pusher consumer.ConsumeLogsFunc, options ...Option, ) (component.LogsExporter, error) { if cfg == nil { @@ -108,7 +107,7 @@ func NewLogsExporter( } }) - lc, err := consumerhelper.NewLogs(func(ctx context.Context, ld pdata.Logs) error { + lc, err := consumer.NewLogs(func(ctx context.Context, ld pdata.Logs) error { req := newLogsRequest(ctx, ld, pusher) err := be.sender.send(req) if errors.Is(err, errSendingQueueIsFull) { diff --git a/exporter/exporterhelper/logs_test.go b/exporter/exporterhelper/logs_test.go index 79a0fe940e..3bd0dcc9b3 100644 --- a/exporter/exporterhelper/logs_test.go +++ b/exporter/exporterhelper/logs_test.go @@ -31,7 +31,6 @@ import ( "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" - "go.opentelemetry.io/collector/consumer/consumerhelper" "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" "go.opentelemetry.io/collector/internal/testdata" "go.opentelemetry.io/collector/model/pdata" @@ -199,7 +198,7 @@ func TestLogsExporter_WithShutdown_ReturnError(t *testing.T) { assert.Equal(t, le.Shutdown(context.Background()), want) } -func newPushLogsData(retError error) consumerhelper.ConsumeLogsFunc { +func newPushLogsData(retError error) consumer.ConsumeLogsFunc { return func(ctx context.Context, td pdata.Logs) error { return retError } diff --git a/exporter/exporterhelper/metrics.go b/exporter/exporterhelper/metrics.go index 9cc46c438d..f17eefee95 100644 --- a/exporter/exporterhelper/metrics.go +++ b/exporter/exporterhelper/metrics.go @@ -22,7 +22,6 @@ import ( "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" - "go.opentelemetry.io/collector/consumer/consumerhelper" "go.opentelemetry.io/collector/exporter/exporterhelper/internal" "go.opentelemetry.io/collector/model/otlp" "go.opentelemetry.io/collector/model/pdata" @@ -34,10 +33,10 @@ var metricsUnmarshaler = otlp.NewProtobufMetricsUnmarshaler() type metricsRequest struct { baseRequest md pdata.Metrics - pusher consumerhelper.ConsumeMetricsFunc + pusher consumer.ConsumeMetricsFunc } -func newMetricsRequest(ctx context.Context, md pdata.Metrics, pusher consumerhelper.ConsumeMetricsFunc) request { +func newMetricsRequest(ctx context.Context, md pdata.Metrics, pusher consumer.ConsumeMetricsFunc) request { return &metricsRequest{ baseRequest: baseRequest{ctx: ctx}, md: md, @@ -45,7 +44,7 @@ func newMetricsRequest(ctx context.Context, md pdata.Metrics, pusher consumerhel } } -func newMetricsRequestUnmarshalerFunc(pusher consumerhelper.ConsumeMetricsFunc) internal.RequestUnmarshaler { +func newMetricsRequestUnmarshalerFunc(pusher consumer.ConsumeMetricsFunc) internal.RequestUnmarshaler { return func(bytes []byte) (internal.PersistentRequest, error) { metrics, err := metricsUnmarshaler.UnmarshalMetrics(bytes) if err != nil { @@ -85,7 +84,7 @@ type metricsExporter struct { func NewMetricsExporter( cfg config.Exporter, set component.ExporterCreateSettings, - pusher consumerhelper.ConsumeMetricsFunc, + pusher consumer.ConsumeMetricsFunc, options ...Option, ) (component.MetricsExporter, error) { if cfg == nil { @@ -109,7 +108,7 @@ func NewMetricsExporter( } }) - mc, err := consumerhelper.NewMetrics(func(ctx context.Context, md pdata.Metrics) error { + mc, err := consumer.NewMetrics(func(ctx context.Context, md pdata.Metrics) error { req := newMetricsRequest(ctx, md, pusher) err := be.sender.send(req) if errors.Is(err, errSendingQueueIsFull) { diff --git a/exporter/exporterhelper/metrics_test.go b/exporter/exporterhelper/metrics_test.go index 9a725a0f09..0dfc62db76 100644 --- a/exporter/exporterhelper/metrics_test.go +++ b/exporter/exporterhelper/metrics_test.go @@ -31,7 +31,6 @@ import ( "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" - "go.opentelemetry.io/collector/consumer/consumerhelper" "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" "go.opentelemetry.io/collector/internal/testdata" "go.opentelemetry.io/collector/model/pdata" @@ -200,7 +199,7 @@ func TestMetricsExporter_WithShutdown_ReturnError(t *testing.T) { assert.Equal(t, want, me.Shutdown(context.Background())) } -func newPushMetricsData(retError error) consumerhelper.ConsumeMetricsFunc { +func newPushMetricsData(retError error) consumer.ConsumeMetricsFunc { return func(ctx context.Context, td pdata.Metrics) error { return retError } diff --git a/exporter/exporterhelper/traces.go b/exporter/exporterhelper/traces.go index 5b6618496a..1a10fb4c9c 100644 --- a/exporter/exporterhelper/traces.go +++ b/exporter/exporterhelper/traces.go @@ -22,7 +22,6 @@ import ( "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" - "go.opentelemetry.io/collector/consumer/consumerhelper" "go.opentelemetry.io/collector/exporter/exporterhelper/internal" "go.opentelemetry.io/collector/model/otlp" "go.opentelemetry.io/collector/model/pdata" @@ -34,10 +33,10 @@ var tracesUnmarshaler = otlp.NewProtobufTracesUnmarshaler() type tracesRequest struct { baseRequest td pdata.Traces - pusher consumerhelper.ConsumeTracesFunc + pusher consumer.ConsumeTracesFunc } -func newTracesRequest(ctx context.Context, td pdata.Traces, pusher consumerhelper.ConsumeTracesFunc) request { +func newTracesRequest(ctx context.Context, td pdata.Traces, pusher consumer.ConsumeTracesFunc) request { return &tracesRequest{ baseRequest: baseRequest{ctx: ctx}, td: td, @@ -45,7 +44,7 @@ func newTracesRequest(ctx context.Context, td pdata.Traces, pusher consumerhelpe } } -func newTraceRequestUnmarshalerFunc(pusher consumerhelper.ConsumeTracesFunc) internal.RequestUnmarshaler { +func newTraceRequestUnmarshalerFunc(pusher consumer.ConsumeTracesFunc) internal.RequestUnmarshaler { return func(bytes []byte) (internal.PersistentRequest, error) { traces, err := tracesUnmarshaler.UnmarshalTraces(bytes) if err != nil { @@ -85,7 +84,7 @@ type traceExporter struct { func NewTracesExporter( cfg config.Exporter, set component.ExporterCreateSettings, - pusher consumerhelper.ConsumeTracesFunc, + pusher consumer.ConsumeTracesFunc, options ...Option, ) (component.TracesExporter, error) { @@ -110,7 +109,7 @@ func NewTracesExporter( } }) - tc, err := consumerhelper.NewTraces(func(ctx context.Context, td pdata.Traces) error { + tc, err := consumer.NewTraces(func(ctx context.Context, td pdata.Traces) error { req := newTracesRequest(ctx, td, pusher) err := be.sender.send(req) if errors.Is(err, errSendingQueueIsFull) { diff --git a/exporter/exporterhelper/traces_test.go b/exporter/exporterhelper/traces_test.go index 93a9db81d8..f27c6026d3 100644 --- a/exporter/exporterhelper/traces_test.go +++ b/exporter/exporterhelper/traces_test.go @@ -31,7 +31,6 @@ import ( "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" - "go.opentelemetry.io/collector/consumer/consumerhelper" "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" "go.opentelemetry.io/collector/internal/testdata" "go.opentelemetry.io/collector/model/pdata" @@ -200,7 +199,7 @@ func TestTracesExporter_WithShutdown_ReturnError(t *testing.T) { assert.Equal(t, te.Shutdown(context.Background()), want) } -func newTraceDataPusher(retError error) consumerhelper.ConsumeTracesFunc { +func newTraceDataPusher(retError error) consumer.ConsumeTracesFunc { return func(ctx context.Context, td pdata.Traces) error { return retError } diff --git a/processor/processorhelper/logs.go b/processor/processorhelper/logs.go index 8d049c1fc9..822a0c99a4 100644 --- a/processor/processorhelper/logs.go +++ b/processor/processorhelper/logs.go @@ -24,7 +24,6 @@ import ( "go.opentelemetry.io/collector/component/componenterror" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/consumer/consumerhelper" "go.opentelemetry.io/collector/model/pdata" ) @@ -56,7 +55,7 @@ func NewLogsProcessor( eventOptions := spanAttributes(cfg.ID()) bs := fromOptions(options) - logsConsumer, err := consumerhelper.NewLogs(func(ctx context.Context, ld pdata.Logs) error { + logsConsumer, err := consumer.NewLogs(func(ctx context.Context, ld pdata.Logs) error { span := trace.SpanFromContext(ctx) span.AddEvent("Start processing.", eventOptions) var err error diff --git a/processor/processorhelper/metrics.go b/processor/processorhelper/metrics.go index e7c2d3eacd..4585e51fda 100644 --- a/processor/processorhelper/metrics.go +++ b/processor/processorhelper/metrics.go @@ -24,7 +24,6 @@ import ( "go.opentelemetry.io/collector/component/componenterror" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/consumer/consumerhelper" "go.opentelemetry.io/collector/model/pdata" ) @@ -56,7 +55,7 @@ func NewMetricsProcessor( eventOptions := spanAttributes(cfg.ID()) bs := fromOptions(options) - metricsConsumer, err := consumerhelper.NewMetrics(func(ctx context.Context, md pdata.Metrics) error { + metricsConsumer, err := consumer.NewMetrics(func(ctx context.Context, md pdata.Metrics) error { span := trace.SpanFromContext(ctx) span.AddEvent("Start processing.", eventOptions) var err error diff --git a/processor/processorhelper/processor.go b/processor/processorhelper/processor.go index ad60ef32a6..1797692bdc 100644 --- a/processor/processorhelper/processor.go +++ b/processor/processorhelper/processor.go @@ -23,7 +23,6 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/consumer/consumerhelper" "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" ) @@ -55,21 +54,21 @@ func WithShutdown(shutdown component.ShutdownFunc) Option { // The default GetCapabilities function returns mutable capabilities. func WithCapabilities(capabilities consumer.Capabilities) Option { return func(o *baseSettings) { - o.consumerOptions = append(o.consumerOptions, consumerhelper.WithCapabilities(capabilities)) + o.consumerOptions = append(o.consumerOptions, consumer.WithCapabilities(capabilities)) } } type baseSettings struct { component.StartFunc component.ShutdownFunc - consumerOptions []consumerhelper.Option + consumerOptions []consumer.Option } // fromOptions returns the internal settings starting from the default and applying all options. func fromOptions(options []Option) *baseSettings { // Start from the default options: opts := &baseSettings{ - consumerOptions: []consumerhelper.Option{consumerhelper.WithCapabilities(consumer.Capabilities{MutatesData: true})}, + consumerOptions: []consumer.Option{consumer.WithCapabilities(consumer.Capabilities{MutatesData: true})}, } for _, op := range options { diff --git a/processor/processorhelper/traces.go b/processor/processorhelper/traces.go index e044f1af3e..b802c539ea 100644 --- a/processor/processorhelper/traces.go +++ b/processor/processorhelper/traces.go @@ -24,7 +24,6 @@ import ( "go.opentelemetry.io/collector/component/componenterror" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/consumer/consumerhelper" "go.opentelemetry.io/collector/model/pdata" ) @@ -56,7 +55,7 @@ func NewTracesProcessor( eventOptions := spanAttributes(cfg.ID()) bs := fromOptions(options) - traceConsumer, err := consumerhelper.NewTraces(func(ctx context.Context, td pdata.Traces) error { + traceConsumer, err := consumer.NewTraces(func(ctx context.Context, td pdata.Traces) error { span := trace.SpanFromContext(ctx) span.AddEvent("Start processing.", eventOptions) var err error