Deprecated everything in exporterqueue, alias from exporterhelper (#12706)
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
Parent: 622ce90d91
Commit: 2bc783c7d3
@@ -0,0 +1,25 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: deprecation
+
+# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
+component: exporterqueue
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Deprecated Config, NewDefaultConfig, Encoding, ErrQueueIsFull. Use the aliases from exporterhelper instead.
+
+# One or more tracking issues or pull requests related to the change
+issues: [12706]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [api]
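Migration note: for code that used the deprecated package, the change is a rename to the aliases this commit introduces. A minimal sketch, assuming only the symbols added below (note that the old and new default constructors differ in their Blocking default, as the deprecation comment further down points out):

package main

import (
	"fmt"

	"go.opentelemetry.io/collector/exporter/exporterhelper"
)

func main() {
	// Previously: qCfg := exporterqueue.NewDefaultConfig()
	// The equivalent constructor now lives in exporterhelper; its default is non-blocking.
	qCfg := exporterhelper.NewDefaultQueueConfig()
	qCfg.QueueSize = 5_000 // illustrative override, not a recommendation

	fmt.Printf("enabled=%v consumers=%d size=%d blocking=%v\n",
		qCfg.Enabled, qCfg.NumConsumers, qCfg.QueueSize, qCfg.Blocking)
}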
@@ -7,7 +7,6 @@ import (
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/config/configretry"
 	"go.opentelemetry.io/collector/consumer"
-	"go.opentelemetry.io/collector/exporter/exporterbatcher"
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
 )

@@ -38,25 +37,9 @@ func WithRetry(config configretry.BackOffConfig) Option {
 	return internal.WithRetry(config)
 }

-// WithQueue overrides the default QueueConfig for an exporter.
-// The default QueueConfig is to disable queueing.
-// This option cannot be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
-func WithQueue(config QueueConfig) Option {
-	return internal.WithQueue(config)
-}
-
 // WithCapabilities overrides the default Capabilities() function for a Consumer.
 // The default is non-mutable data.
 // TODO: Verify if we can change the default to be mutable as we do for processors.
 func WithCapabilities(capabilities consumer.Capabilities) Option {
 	return internal.WithCapabilities(capabilities)
 }
-
-// WithBatcher enables batching for an exporter based on custom request types.
-// For now, it can be used only with the New[Traces|Metrics|Logs]RequestExporter exporter helpers and
-// WithRequestBatchFuncs provided.
-// This API is at the early stage of development and may change without backward compatibility
-// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
-func WithBatcher(cfg exporterbatcher.Config) Option {
-	return internal.WithBatcher(cfg)
-}
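The WithQueue and WithBatcher options removed above are not dropped from the public API; this commit re-adds them next to the queue/batch aliases (see the queue_batch hunks below). For orientation, a hedged sketch of how an exporter author keeps composing them after this change, with illustrative values only:

package queueexample

import (
	"go.opentelemetry.io/collector/config/configretry"
	"go.opentelemetry.io/collector/exporter/exporterhelper"
)

// buildOptions composes the queue and retry options an exporter author would
// pass to an exporterhelper constructor; the values are illustrative.
func buildOptions() []exporterhelper.Option {
	qCfg := exporterhelper.NewDefaultQueueConfig()
	qCfg.NumConsumers = 4

	return []exporterhelper.Option{
		exporterhelper.WithQueue(qCfg),
		exporterhelper.WithRetry(configretry.NewDefaultBackOffConfig()),
	}
}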
@@ -18,7 +18,6 @@ import (
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
-	"go.opentelemetry.io/collector/exporter/exporterqueue"
 	"go.opentelemetry.io/collector/pipeline"
 )

@@ -49,7 +48,7 @@ type BaseExporter struct {
 	retryCfg configretry.BackOffConfig

 	queueBatchSettings QueueBatchSettings[request.Request]
-	queueCfg           exporterqueue.Config
+	queueCfg           QueueConfig
 	batcherCfg         exporterbatcher.Config
 }

@@ -195,7 +194,7 @@ func WithRetry(config configretry.BackOffConfig) Option {
 // WithQueue overrides the default QueueConfig for an exporter.
 // The default QueueConfig is to disable queueing.
 // This option cannot be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
-func WithQueue(cfg exporterqueue.Config) Option {
+func WithQueue(cfg QueueConfig) Option {
 	return func(o *BaseExporter) error {
 		if o.queueBatchSettings.Encoding == nil {
 			return errors.New("WithQueue option is not available for the new request exporters, use WithQueueBatch instead")

@@ -208,7 +207,7 @@ func WithQueue(cfg exporterqueue.Config) Option {
 // This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
 // Experimental: This API is at the early stage of development and may change without backward compatibility
 // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
-func WithQueueBatch(cfg exporterqueue.Config, set QueueBatchSettings[request.Request]) Option {
+func WithQueueBatch(cfg QueueConfig, set QueueBatchSettings[request.Request]) Option {
 	return func(o *BaseExporter) error {
 		if !cfg.Enabled {
 			o.ExportFailureMessage += " Try enabling sending_queue to survive temporary failures."
@@ -20,7 +20,6 @@ import (
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest"
-	"go.opentelemetry.io/collector/exporter/exporterqueue"
 	"go.opentelemetry.io/collector/exporter/exportertest"
 	"go.opentelemetry.io/collector/pipeline"
 )

@@ -51,10 +50,10 @@ func TestQueueOptionsWithRequestExporter(t *testing.T) {
 	require.NoError(t, err)
 	require.Nil(t, bs.queueBatchSettings.Encoding)
 	_, err = NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), pipeline.SignalMetrics, noopExport,
-		WithRetry(configretry.NewDefaultBackOffConfig()), WithQueue(exporterqueue.NewDefaultConfig()))
+		WithRetry(configretry.NewDefaultBackOffConfig()), WithQueue(NewDefaultQueueConfig()))
 	require.Error(t, err)

-	qCfg := exporterqueue.NewDefaultConfig()
+	qCfg := NewDefaultQueueConfig()
 	storageID := component.NewID(component.MustNewType("test"))
 	qCfg.StorageID = &storageID
 	_, err = NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), pipeline.SignalMetrics, noopExport,

@@ -70,7 +69,7 @@ func TestBaseExporterLogging(t *testing.T) {
 	set.Logger = zap.New(logger)
 	rCfg := configretry.NewDefaultBackOffConfig()
 	rCfg.Enabled = false
-	qCfg := exporterqueue.NewDefaultConfig()
+	qCfg := NewDefaultQueueConfig()
 	qCfg.Enabled = false
 	bs, err := NewBaseExporter(set, pipeline.SignalMetrics, errExport,
 		WithQueueBatchSettings(newFakeQueueBatch()),

@@ -100,7 +99,7 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) {
 			queueOptions: []Option{
 				WithQueueBatchSettings(newFakeQueueBatch()),
 				func() Option {
-					qs := exporterqueue.NewDefaultConfig()
+					qs := NewDefaultQueueConfig()
 					qs.Enabled = false
 					return WithQueue(qs)
 				}(),

@@ -115,7 +114,7 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) {
 			name: "WithRequestQueue",
 			queueOptions: []Option{
 				func() Option {
-					qs := exporterqueue.NewDefaultConfig()
+					qs := NewDefaultQueueConfig()
 					qs.Enabled = false
 					return WithQueueBatch(qs, newFakeQueueBatch())
 				}(),
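The test above also exercises the persistent-queue knob: StorageID points the queue at a storage extension. A hedged sketch of wiring it through the public alias; the "file_storage" extension name is an assumption about the collector setup and must match an extension configured on the host:

package queueexample

import (
	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/exporter/exporterhelper"
)

// persistentQueueConfig enables the persistent queue by referencing a storage
// extension by ID; "file_storage" is illustrative, not a requirement.
func persistentQueueConfig() exporterhelper.QueueConfig {
	qCfg := exporterhelper.NewDefaultQueueConfig()
	storageID := component.NewID(component.MustNewType("file_storage"))
	qCfg.StorageID = &storageID
	return qCfg
}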
@@ -5,25 +5,73 @@ package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal"

 import (
 	"context"
+	"errors"

 	"go.uber.org/zap"

+	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/exporter/exporterbatcher"
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
-	"go.opentelemetry.io/collector/exporter/exporterqueue"
 )

 // QueueBatchSettings is a subset of the queuebatch.Settings that are needed when used within an Exporter.
 type QueueBatchSettings[K any] struct {
-	Encoding exporterqueue.Encoding[K]
+	Encoding queuebatch.Encoding[K]
 	Sizers   map[exporterbatcher.SizerType]queuebatch.Sizer[K]
 }

+// NewDefaultQueueConfig returns the default config for QueueConfig.
+// By default, the queue stores 1000 items of telemetry and is non-blocking when full.
+func NewDefaultQueueConfig() QueueConfig {
+	return QueueConfig{
+		Enabled:      true,
+		NumConsumers: 10,
+		// By default, batches are 8192 spans, for a total of up to 8 million spans in the queue
+		// This can be estimated at 1-4 GB worth of maximum memory usage
+		// This default is probably still too high, and may be adjusted further down in a future release
+		QueueSize: 1_000,
+		Blocking:  false,
+	}
+}
+
+// QueueConfig defines configuration for queueing requests before exporting.
+// It's supposed to be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
+// Experimental: This API is at the early stage of development and may change without backward compatibility
+// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
+type QueueConfig struct {
+	// Enabled indicates whether to not enqueue batches before exporting.
+	Enabled bool `mapstructure:"enabled"`
+	// NumConsumers is the number of consumers from the queue.
+	NumConsumers int `mapstructure:"num_consumers"`
+	// QueueSize is the maximum number of requests allowed in queue at any given time.
+	QueueSize int `mapstructure:"queue_size"`
+	// Blocking controls the queue behavior when full.
+	// If true it blocks until enough space to add the new request to the queue.
+	Blocking bool `mapstructure:"blocking"`
+	// StorageID if not empty, enables the persistent storage and uses the component specified
+	// as a storage extension for the persistent queue
+	StorageID *component.ID `mapstructure:"storage"`
+}
+
+// Validate checks if the Config is valid
+func (qCfg *QueueConfig) Validate() error {
+	if !qCfg.Enabled {
+		return nil
+	}
+	if qCfg.NumConsumers <= 0 {
+		return errors.New("`num_consumers` must be positive")
+	}
+	if qCfg.QueueSize <= 0 {
+		return errors.New("`queue_size` must be positive")
+	}
+	return nil
+}
+
 func NewQueueSender(
 	qSet queuebatch.Settings[request.Request],
-	qCfg exporterqueue.Config,
+	qCfg QueueConfig,
 	bCfg exporterbatcher.Config,
 	exportFailureMessage string,
 	next sender.Sender[request.Request],

@@ -43,7 +91,7 @@ func NewQueueSender(
 	return queuebatch.NewQueueBatch(qSet, newQueueBatchConfig(qCfg, bCfg), exportFunc)
 }

-func newQueueBatchConfig(qCfg exporterqueue.Config, bCfg exporterbatcher.Config) queuebatch.Config {
+func newQueueBatchConfig(qCfg QueueConfig, bCfg exporterbatcher.Config) queuebatch.Config {
 	qbCfg := queuebatch.Config{
 		Enabled:       true,
 		WaitForResult: !qCfg.Enabled,
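Because the public exporterhelper.QueueConfig becomes an alias for this internal type, the Validate behaviour added here is reachable from user code. A small illustrative check, assuming only the fields and error messages shown above:

package queueexample

import (
	"fmt"

	"go.opentelemetry.io/collector/exporter/exporterhelper"
)

// validateExample shows that non-positive values are rejected while a
// disabled queue skips validation entirely.
func validateExample() {
	qCfg := exporterhelper.NewDefaultQueueConfig()
	qCfg.NumConsumers = 0
	fmt.Println(qCfg.Validate()) // prints: `num_consumers` must be positive

	qCfg.Enabled = false
	fmt.Println(qCfg.Validate()) // prints: <nil>, validation is skipped when the queue is disabled
}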
@@ -20,7 +20,6 @@ import (
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest"
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
-	"go.opentelemetry.io/collector/exporter/exporterqueue"
 	"go.opentelemetry.io/collector/exporter/exportertest"
 	"go.opentelemetry.io/collector/pipeline"
 )

@@ -37,7 +36,7 @@ func TestNewQueueSenderFailedRequestDropped(t *testing.T) {
 	logger, observed := observer.New(zap.ErrorLevel)
 	qSet.Telemetry.Logger = zap.New(logger)
 	be, err := NewQueueSender(
-		qSet, exporterqueue.NewDefaultConfig(), exporterbatcher.Config{}, "", sender.NewSender(func(context.Context, request.Request) error { return errors.New("some error") }))
+		qSet, NewDefaultQueueConfig(), exporterbatcher.Config{}, "", sender.NewSender(func(context.Context, request.Request) error { return errors.New("some error") }))
 	require.NoError(t, err)
 	require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))

@@ -46,3 +45,19 @@ func TestNewQueueSenderFailedRequestDropped(t *testing.T) {
 	assert.Len(t, observed.All(), 1)
 	assert.Equal(t, "Exporting failed. Dropping data.", observed.All()[0].Message)
 }
+
+func TestQueueConfig_Validate(t *testing.T) {
+	qCfg := NewDefaultQueueConfig()
+	require.NoError(t, qCfg.Validate())
+
+	qCfg.NumConsumers = 0
+	require.EqualError(t, qCfg.Validate(), "`num_consumers` must be positive")
+
+	qCfg = NewDefaultQueueConfig()
+	qCfg.QueueSize = 0
+	require.EqualError(t, qCfg.Validate(), "`queue_size` must be positive")
+
+	// Confirm Validate doesn't return error with invalid config when feature is disabled
+	qCfg.Enabled = false
+	assert.NoError(t, qCfg.Validate())
+}
@@ -16,7 +16,6 @@ import (
 	"go.opentelemetry.io/collector/exporter/exporterbatcher"
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
-	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
 	"go.opentelemetry.io/collector/pdata/plog"
 	"go.opentelemetry.io/collector/pipeline"

@@ -37,7 +36,7 @@ func NewLogsQueueBatchSettings() QueueBatchSettings {
 		exporterbatcher.SizerTypeRequests: NewRequestsSizer(),
 		exporterbatcher.SizerTypeItems:    queuebatch.NewItemsSizer(),
 		exporterbatcher.SizerTypeBytes: queuebatch.BaseSizer{
-			SizeofFunc: func(req request.Request) int64 {
+			SizeofFunc: func(req Request) int64 {
 				return int64(logsMarshaler.LogsSize(req.(*logsRequest).ld))
 			},
 		},
@@ -16,7 +16,6 @@ import (
 	"go.opentelemetry.io/collector/exporter/exporterbatcher"
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
-	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
 	"go.opentelemetry.io/collector/pdata/pmetric"
 	"go.opentelemetry.io/collector/pipeline"

@@ -37,7 +36,7 @@ func NewMetricsQueueBatchSettings() QueueBatchSettings {
 		exporterbatcher.SizerTypeRequests: NewRequestsSizer(),
 		exporterbatcher.SizerTypeItems:    queuebatch.NewItemsSizer(),
 		exporterbatcher.SizerTypeBytes: queuebatch.BaseSizer{
-			SizeofFunc: func(req request.Request) int64 {
+			SizeofFunc: func(req Request) int64 {
 				return int64(metricsMarshaler.MetricsSize(req.(*metricsRequest).md))
 			},
 		},
@@ -4,20 +4,49 @@
 package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper"

 import (
+	"go.opentelemetry.io/collector/exporter/exporterbatcher"
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
-	"go.opentelemetry.io/collector/exporter/exporterqueue"
+	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
 )

 // QueueConfig defines configuration for queueing batches before sending to the consumerSender.
-type QueueConfig = exporterqueue.Config
+type QueueConfig = internal.QueueConfig

 // Deprecated: [v0.123.0] use WithQueueBatch.
-func WithRequestQueue(cfg exporterqueue.Config, encoding exporterqueue.Encoding[Request]) Option {
+func WithRequestQueue(cfg QueueConfig, encoding QueueBatchEncoding[Request]) Option {
 	return WithQueueBatch(cfg, QueueBatchSettings{Encoding: encoding})
 }

+// WithQueue overrides the default QueueConfig for an exporter.
+// The default QueueConfig is to disable queueing.
+// This option cannot be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
+func WithQueue(config QueueConfig) Option {
+	return internal.WithQueue(config)
+}
+
+// WithBatcher enables batching for an exporter based on custom request types.
+// For now, it can be used only with the New[Traces|Metrics|Logs]RequestExporter exporter helpers and
+// WithRequestBatchFuncs provided.
+// This API is at the early stage of development and may change without backward compatibility
+// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
+func WithBatcher(cfg exporterbatcher.Config) Option {
+	return internal.WithBatcher(cfg)
+}
+
 // QueueBatchConfig defines configuration for queueing and batching for the exporter.
-type QueueBatchConfig = exporterqueue.Config
+type QueueBatchConfig = internal.QueueConfig

+// QueueBatchEncoding defines the encoding to be used if persistent queue is configured.
+// Duplicate definition with queuebatch.Encoding since aliasing generics is not supported by default.
+type QueueBatchEncoding[T any] interface {
+	// Marshal is a function that can marshal a request into bytes.
+	Marshal(T) ([]byte, error)
+
+	// Unmarshal is a function that can unmarshal bytes into a request.
+	Unmarshal([]byte) (T, error)
+}
+
+var ErrQueueIsFull = queuebatch.ErrQueueIsFull
+
 // QueueBatchSettings are settings for the QueueBatch component.
 // They include things line Encoding to be used with persistent queue, or the available Sizers, etc.

@@ -32,14 +61,5 @@ func WithQueueBatch(cfg QueueBatchConfig, set QueueBatchSettings) Option {
 }

 // NewDefaultQueueConfig returns the default config for QueueConfig.
-func NewDefaultQueueConfig() QueueConfig {
-	return exporterqueue.Config{
-		Enabled:      true,
-		NumConsumers: 10,
-		// By default, batches are 8192 spans, for a total of up to 8 million spans in the queue
-		// This can be estimated at 1-4 GB worth of maximum memory usage
-		// This default is probably still too high, and may be adjusted further down in a future release
-		QueueSize: 1_000,
-		Blocking:  false,
-	}
-}
+// By default, the queue stores 1000 items of telemetry and is non-blocking when full.
+var NewDefaultQueueConfig = internal.NewDefaultQueueConfig
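QueueBatchEncoding duplicates queuebatch.Encoding because generic type aliases are not available. To make the shape of the interface concrete, here is a hedged sketch that encodes a hypothetical payload type with JSON; a real exporter would implement this for its own Request type, typically with a more compact marshaler:

package queueexample

import (
	"encoding/json"

	"go.opentelemetry.io/collector/exporter/exporterhelper"
)

// payload is a hypothetical request body used only for this illustration.
type payload struct {
	Items []string `json:"items"`
}

// jsonEncoding marshals payloads so a persistent queue could store and restore them.
type jsonEncoding struct{}

func (jsonEncoding) Marshal(p *payload) ([]byte, error) { return json.Marshal(p) }

func (jsonEncoding) Unmarshal(b []byte) (*payload, error) {
	p := &payload{}
	if err := json.Unmarshal(b, p); err != nil {
		return nil, err
	}
	return p, nil
}

// Compile-time check that jsonEncoding satisfies the new interface.
var _ exporterhelper.QueueBatchEncoding[*payload] = jsonEncoding{}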
@@ -31,12 +31,12 @@ type RequestConverterFunc[K any] func(context.Context, K) (Request, error)

 // RequestConsumeFunc processes the request. After the function returns, the request is no longer accessible,
 // and accessing it is considered undefined behavior.
-type RequestConsumeFunc = sender.SendFunc[request.Request]
+type RequestConsumeFunc = sender.SendFunc[Request]

 // RequestSizer is an interface that returns the size of the given request.
-type RequestSizer = queuebatch.Sizer[request.Request]
+type RequestSizer = queuebatch.Sizer[Request]

 // NewRequestsSizer returns a RequestSizer that counts the requests by the number of requests, always returning 1.
 func NewRequestsSizer() RequestSizer {
-	return queuebatch.RequestsSizer[request.Request]{}
+	return queuebatch.RequestsSizer[Request]{}
 }
@@ -16,7 +16,6 @@ import (
 	"go.opentelemetry.io/collector/exporter/exporterbatcher"
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
-	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
 	"go.opentelemetry.io/collector/pdata/ptrace"
 	"go.opentelemetry.io/collector/pipeline"

@@ -37,7 +36,7 @@ func NewTracesQueueBatchSettings() QueueBatchSettings {
 		exporterbatcher.SizerTypeRequests: NewRequestsSizer(),
 		exporterbatcher.SizerTypeItems:    queuebatch.NewItemsSizer(),
 		exporterbatcher.SizerTypeBytes: queuebatch.BaseSizer{
-			SizeofFunc: func(req request.Request) int64 {
+			SizeofFunc: func(req Request) int64 {
 				return int64(tracesMarshaler.TracesSize(req.(*tracesRequest).td))
 			},
 		},
@@ -4,33 +4,14 @@
 package exporterqueue // import "go.opentelemetry.io/collector/exporter/exporterqueue"

 import (
-	"errors"
-
-	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/exporter/exporterhelper"
 )

-// Config defines configuration for queueing requests before exporting.
-// It's supposed to be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
-// Experimental: This API is at the early stage of development and may change without backward compatibility
-// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
-type Config struct {
-	// Enabled indicates whether to not enqueue batches before exporting.
-	Enabled bool `mapstructure:"enabled"`
-	// NumConsumers is the number of consumers from the queue.
-	NumConsumers int `mapstructure:"num_consumers"`
-	// QueueSize is the maximum number of requests allowed in queue at any given time.
-	QueueSize int `mapstructure:"queue_size"`
-	// Blocking controls the queue behavior when full.
-	// If true it blocks until enough space to add the new request to the queue.
-	Blocking bool `mapstructure:"blocking"`
-	// StorageID if not empty, enables the persistent storage and uses the component specified
-	// as a storage extension for the persistent queue
-	StorageID *component.ID `mapstructure:"storage"`
-}
+// Deprecated: [v0.123.0] Use exporterhelper.QueueConfig
+type Config = exporterhelper.QueueConfig

-// NewDefaultConfig returns the default Config.
-// Experimental: This API is at the early stage of development and may change without backward compatibility
-// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
+// Deprecated: [v0.123.0] Use exporterhelper.NewDefaultQueueConfig.
+// Small difference that this is blocking vs the other one which is not blocking.
 func NewDefaultConfig() Config {
 	return Config{
 		Enabled: true,

@@ -39,17 +20,3 @@ func NewDefaultConfig() Config {
 		Blocking: true,
 	}
 }
-
-// Validate checks if the Config is valid
-func (qCfg *Config) Validate() error {
-	if !qCfg.Enabled {
-		return nil
-	}
-	if qCfg.NumConsumers <= 0 {
-		return errors.New("`num_consumers` must be positive")
-	}
-	if qCfg.QueueSize <= 0 {
-		return errors.New("`queue_size` must be positive")
-	}
-	return nil
-}
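The deprecation comment above calls out the one behavioural difference: the old exporterqueue default was blocking, while exporterhelper.NewDefaultQueueConfig is non-blocking. A hedged sketch of preserving the old behaviour during migration:

package queueexample

import "go.opentelemetry.io/collector/exporter/exporterhelper"

// blockingQueueConfig starts from the new non-blocking defaults and restores
// the blocking-when-full behaviour that exporterqueue.NewDefaultConfig used.
func blockingQueueConfig() exporterhelper.QueueConfig {
	qCfg := exporterhelper.NewDefaultQueueConfig()
	qCfg.Blocking = true
	return qCfg
}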
@@ -1,27 +0,0 @@
-// Copyright The OpenTelemetry Authors
-// SPDX-License-Identifier: Apache-2.0
-
-package exporterqueue
-
-import (
-	"testing"
-
-	"github.com/stretchr/testify/assert"
-	"github.com/stretchr/testify/require"
-)
-
-func TestQueueConfig_Validate(t *testing.T) {
-	qCfg := NewDefaultConfig()
-	require.NoError(t, qCfg.Validate())
-
-	qCfg.NumConsumers = 0
-	require.EqualError(t, qCfg.Validate(), "`num_consumers` must be positive")
-
-	qCfg = NewDefaultConfig()
-	qCfg.QueueSize = 0
-	require.EqualError(t, qCfg.Validate(), "`queue_size` must be positive")
-
-	// Confirm Validate doesn't return error with invalid config when feature is disabled
-	qCfg.Enabled = false
-	assert.NoError(t, qCfg.Validate())
-}
@@ -2,7 +2,15 @@
 // SPDX-License-Identifier: Apache-2.0

 package exporterqueue // import "go.opentelemetry.io/collector/exporter/exporterqueue"

+import (
+	"go.opentelemetry.io/collector/exporter/exporterhelper"
+)
+
+// Deprecated: [v0.123.0] Use exporterhelper.ErrQueueIsFull
+var ErrQueueIsFull = exporterhelper.ErrQueueIsFull
+
+// Deprecated: [v0.123.0] Use exporterhelper.QueueBatchEncoding
+// Duplicate definition with queuebatch.Encoding since aliasing generics is not supported by default.
 type Encoding[T any] interface {
 	// Marshal is a function that can marshal a request into bytes.
 	Marshal(T) ([]byte, error)