Add batching capability to the old QueueConfig (#12746)

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
This commit is contained in:
Bogdan Drutu 2025-03-26 16:56:07 -07:00 committed by GitHub
parent ea0a4c60c4
commit f09817e056
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 179 additions and 169 deletions

View File

@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support to configure batching in the sending queue.
# One or more tracking issues or pull requests related to the change
issues: [12746]
# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]

View File

@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: deprecation
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Deprecate `QueueConfig` in favor of `QueueBatchConfig`.
# One or more tracking issues or pull requests related to the change
issues: [12746]
# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]

View File

@ -47,7 +47,7 @@ type BaseExporter struct {
retryCfg configretry.BackOffConfig
queueBatchSettings QueueBatchSettings[request.Request]
queueCfg QueueConfig
queueCfg queuebatch.Config
batcherCfg BatcherConfig
}
@ -190,10 +190,10 @@ func WithRetry(config configretry.BackOffConfig) Option {
}
}
// WithQueue overrides the default QueueConfig for an exporter.
// The default QueueConfig is to disable queueing.
// WithQueue overrides the default queuebatch.Config for an exporter.
// The default queuebatch.Config is to disable queueing.
// This option cannot be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
func WithQueue(cfg QueueConfig) Option {
func WithQueue(cfg queuebatch.Config) Option {
return func(o *BaseExporter) error {
if o.queueBatchSettings.Encoding == nil {
return errors.New("WithQueue option is not available for the new request exporters, use WithQueueBatch instead")
@ -206,7 +206,7 @@ func WithQueue(cfg QueueConfig) Option {
// This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func WithQueueBatch(cfg QueueConfig, set QueueBatchSettings[request.Request]) Option {
func WithQueueBatch(cfg queuebatch.Config, set QueueBatchSettings[request.Request]) Option {
return func(o *BaseExporter) error {
if !cfg.Enabled {
o.ExportFailureMessage += " Try enabling sending_queue to survive temporary failures."

View File

@ -13,8 +13,6 @@ import (
"go.uber.org/zap"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
@ -26,10 +24,10 @@ type QueueBatchSettings[K any] struct {
Sizers map[request.SizerType]request.Sizer[K]
}
// NewDefaultQueueConfig returns the default config for QueueConfig.
// NewDefaultQueueConfig returns the default config for queuebatch.Config.
// By default, the queue stores 1000 items of telemetry and is non-blocking when full.
func NewDefaultQueueConfig() QueueConfig {
return QueueConfig{
func NewDefaultQueueConfig() queuebatch.Config {
return queuebatch.Config{
Enabled: true,
Sizer: request.SizerTypeRequests,
NumConsumers: 10,
@ -38,94 +36,18 @@ func NewDefaultQueueConfig() QueueConfig {
// This default is probably still too high, and may be adjusted further down in a future release
QueueSize: 1_000,
BlockOnOverflow: false,
StorageID: nil,
Batch: nil,
}
}
// QueueConfig defines configuration for queueing requests before exporting.
// It's supposed to be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
type QueueConfig struct {
// Enabled indicates whether to not enqueue batches before exporting.
Enabled bool `mapstructure:"enabled"`
// WaitForResult determines if incoming requests are blocked until the request is processed or not.
// Currently, this option is not available when persistent queue is configured using the storage configuration.
WaitForResult bool `mapstructure:"wait_for_result"`
// Sizer determines the type of size measurement used by this component.
// It accepts "requests", "items", or "bytes".
Sizer request.SizerType `mapstructure:"sizer"`
// QueueSize represents the maximum data size allowed for concurrent storage and processing.
QueueSize int64 `mapstructure:"queue_size"`
// NumConsumers is the number of consumers from the queue.
NumConsumers int `mapstructure:"num_consumers"`
// Deprecated: [v0.123.0] use `block_on_overflow`.
Blocking bool `mapstructure:"blocking"`
// BlockOnOverflow determines the behavior when the component's QueueSize limit is reached.
// If true, the component will wait for space; otherwise, operations will immediately return a retryable error.
BlockOnOverflow bool `mapstructure:"block_on_overflow"`
// StorageID if not empty, enables the persistent storage and uses the component specified
// as a storage extension for the persistent queue
StorageID *component.ID `mapstructure:"storage"`
hasBlocking bool
}
func (qCfg *QueueConfig) Unmarshal(conf *confmap.Conf) error {
if err := conf.Unmarshal(qCfg); err != nil {
return err
}
// If user still uses the old blocking, override and will log error during initialization.
if conf.IsSet("blocking") {
qCfg.hasBlocking = true
qCfg.BlockOnOverflow = qCfg.Blocking
}
return nil
}
// Validate checks if the Config is valid
func (qCfg *QueueConfig) Validate() error {
if !qCfg.Enabled {
return nil
}
if qCfg.NumConsumers <= 0 {
return errors.New("`num_consumers` must be positive")
}
if qCfg.QueueSize <= 0 {
return errors.New("`queue_size` must be positive")
}
if qCfg.StorageID != nil && qCfg.WaitForResult {
return errors.New("`wait_for_result` is not supported with a persistent queue configured with `storage`")
}
// Only support request sizer for persistent queue at this moment.
if qCfg.StorageID != nil && qCfg.Sizer != request.SizerTypeRequests {
return errors.New("persistent queue configured with `storage` only supports `requests` sizer")
}
return nil
}
func NewQueueSender(
qSet queuebatch.Settings[request.Request],
qCfg QueueConfig,
qCfg queuebatch.Config,
bCfg BatcherConfig,
exportFailureMessage string,
next sender.Sender[request.Request],
) (sender.Sender[request.Request], error) {
if qCfg.hasBlocking {
qSet.Telemetry.Logger.Error("using deprecated field `blocking`")
}
exportFunc := func(ctx context.Context, req request.Request) error {
// Have to read the number of items before sending the request since the request can
// be modified by the downstream components like the batcher.
@ -141,47 +63,41 @@ func NewQueueSender(
return queuebatch.NewQueueBatch(qSet, newQueueBatchConfig(qCfg, bCfg), exportFunc)
}
func newQueueBatchConfig(qCfg QueueConfig, bCfg BatcherConfig) queuebatch.Config {
var qbCfg queuebatch.Config
func newQueueBatchConfig(qCfg queuebatch.Config, bCfg BatcherConfig) queuebatch.Config {
// Overwrite configuration with the legacy BatcherConfig configured via WithBatcher.
// TODO: Remove this when WithBatcher is removed.
if !bCfg.Enabled {
return qCfg
}
// User configured queueing, copy all config.
if qCfg.Enabled {
qbCfg = queuebatch.Config{
Enabled: true,
WaitForResult: qCfg.WaitForResult,
Sizer: qCfg.Sizer,
QueueSize: qCfg.QueueSize,
NumConsumers: qCfg.NumConsumers,
BlockOnOverflow: qCfg.BlockOnOverflow,
StorageID: qCfg.StorageID,
// TODO: Copy batching configuration as well when available.
}
// Overwrite configuration with the legacy BatcherConfig configured via WithBatcher.
// TODO: Remove this when WithBatcher is removed.
if bCfg.Enabled {
qbCfg.Batch = &queuebatch.BatchConfig{
FlushTimeout: bCfg.FlushTimeout,
MinSize: bCfg.MinSize,
MaxSize: bCfg.MaxSize,
}
}
} else {
// This can happen only if the deprecated way to configure batching is used with a "disabled" queue.
// TODO: Remove this when WithBatcher is removed.
qbCfg = queuebatch.Config{
Enabled: true,
WaitForResult: true,
Sizer: request.SizerTypeRequests,
QueueSize: math.MaxInt,
NumConsumers: runtime.NumCPU(),
BlockOnOverflow: true,
StorageID: nil,
Batch: &queuebatch.BatchConfig{
FlushTimeout: bCfg.FlushTimeout,
MinSize: bCfg.MinSize,
MaxSize: bCfg.MaxSize,
},
qCfg.Batch = &queuebatch.BatchConfig{
FlushTimeout: bCfg.FlushTimeout,
MinSize: bCfg.MinSize,
MaxSize: bCfg.MaxSize,
}
return qCfg
}
// This can happen only if the deprecated way to configure batching is used with a "disabled" queue.
// TODO: Remove this when WithBatcher is removed.
return queuebatch.Config{
Enabled: true,
WaitForResult: true,
Sizer: request.SizerTypeRequests,
QueueSize: math.MaxInt,
NumConsumers: runtime.NumCPU(),
BlockOnOverflow: true,
StorageID: nil,
Batch: &queuebatch.BatchConfig{
FlushTimeout: bCfg.FlushTimeout,
MinSize: bCfg.MinSize,
MaxSize: bCfg.MaxSize,
},
}
return qbCfg
}
// BatcherConfig defines a configuration for batching requests based on a timeout and a minimum number of items.

View File

@ -15,7 +15,6 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest"
@ -37,8 +36,8 @@ func TestNewQueueSenderFailedRequestDropped(t *testing.T) {
qSet.Telemetry.Logger = zap.New(logger)
be, err := NewQueueSender(
qSet, NewDefaultQueueConfig(), BatcherConfig{}, "", sender.NewSender(func(context.Context, request.Request) error { return errors.New("some error") }))
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 2}))
require.NoError(t, be.Shutdown(context.Background()))
@ -46,21 +45,6 @@ func TestNewQueueSenderFailedRequestDropped(t *testing.T) {
assert.Equal(t, "Exporting failed. Dropping data.", observed.All()[0].Message)
}
func TestQueueConfig_DeprecatedBlockingUnmarshal(t *testing.T) {
conf := confmap.NewFromStringMap(map[string]any{
"enabled": true,
"num_consumers": 2,
"queue_size": 100,
"blocking": true,
})
qCfg := QueueConfig{}
assert.False(t, qCfg.BlockOnOverflow)
require.NoError(t, conf.Unmarshal(&qCfg))
assert.True(t, qCfg.BlockOnOverflow)
assert.True(t, qCfg.hasBlocking)
}
func TestQueueConfig_Validate(t *testing.T) {
qCfg := NewDefaultQueueConfig()
require.NoError(t, qCfg.Validate())

View File

@ -8,6 +8,7 @@ import (
"time"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
)
@ -31,6 +32,9 @@ type Config struct {
// If true, the component will wait for space; otherwise, operations will immediately return a retryable error.
BlockOnOverflow bool `mapstructure:"block_on_overflow"`
// Deprecated: [v0.123.0] use `block_on_overflow`.
Blocking bool `mapstructure:"blocking"`
// StorageID if not empty, enables the persistent storage and uses the component specified
// as a storage extension for the persistent queue.
// TODO: This will be changed to Optional when available.
@ -45,6 +49,23 @@ type Config struct {
// BatchConfig it configures how the requests are consumed from the queue and batch together during consumption.
// TODO: This will be changed to Optional when available.
Batch *BatchConfig `mapstructure:"batch"`
// TODO: Remove when deprecated "blocking" is removed.
hasBlocking bool
}
func (cfg *Config) Unmarshal(conf *confmap.Conf) error {
if err := conf.Unmarshal(cfg); err != nil {
return err
}
// If user still uses the old blocking, override and will log error during initialization.
if conf.IsSet("blocking") {
cfg.hasBlocking = true
cfg.BlockOnOverflow = cfg.Blocking
}
return nil
}
// Validate checks if the Config is valid
@ -61,6 +82,7 @@ func (cfg *Config) Validate() error {
return errors.New("`queue_size` must be positive")
}
// Only support request sizer for persistent queue at this moment.
if cfg.StorageID != nil && cfg.WaitForResult {
return errors.New("`wait_for_result` is not supported with a persistent queue configured with `storage`")
}
@ -70,6 +92,11 @@ func (cfg *Config) Validate() error {
return errors.New("persistent queue configured with `storage` only supports `requests` sizer")
}
// Only support items sizer for batch at this moment.
if cfg.Batch != nil && cfg.Sizer != request.SizerTypeItems {
return errors.New("`batch` supports only `items` sizer")
}
return nil
}

View File

@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
)
@ -40,11 +41,39 @@ func TestConfig_Validate(t *testing.T) {
cfg.StorageID = &storageID
require.EqualError(t, cfg.Validate(), "persistent queue configured with `storage` only supports `requests` sizer")
cfg = newTestConfig()
cfg.Sizer = request.SizerTypeItems
cfg.StorageID = &storageID
require.EqualError(t, cfg.Validate(), "persistent queue configured with `storage` only supports `requests` sizer")
cfg = newTestConfig()
cfg.Sizer = request.SizerTypeRequests
require.EqualError(t, cfg.Validate(), "`batch` supports only `items` sizer")
cfg = newTestConfig()
cfg.Sizer = request.SizerTypeBytes
require.EqualError(t, cfg.Validate(), "`batch` supports only `items` sizer")
// Confirm Validate doesn't return error with invalid config when feature is disabled
cfg.Enabled = false
assert.NoError(t, cfg.Validate())
}
func TestConfigDeprecatedBlockingUnmarshal(t *testing.T) {
conf := confmap.NewFromStringMap(map[string]any{
"enabled": true,
"num_consumers": 2,
"queue_size": 100,
"blocking": true,
})
qCfg := Config{}
assert.False(t, qCfg.BlockOnOverflow)
require.NoError(t, conf.Unmarshal(&qCfg))
assert.True(t, qCfg.BlockOnOverflow)
assert.True(t, qCfg.hasBlocking)
}
func TestBatchConfig_Validate(t *testing.T) {
cfg := newTestBatchConfig()
require.NoError(t, cfg.Validate())

View File

@ -29,10 +29,14 @@ type QueueBatch struct {
}
func NewQueueBatch(
qSet Settings[request.Request],
set Settings[request.Request],
cfg Config,
next sender.SendFunc[request.Request],
) (*QueueBatch, error) {
if cfg.hasBlocking {
set.Telemetry.Logger.Error("using deprecated field `blocking`")
}
var b Batcher[request.Request]
switch {
case cfg.Batch == nil:
@ -49,7 +53,7 @@ func NewQueueBatch(
}
var q Queue[request.Request]
sizer, ok := qSet.Sizers[cfg.Sizer]
sizer, ok := set.Sizers[cfg.Sizer]
if !ok {
return nil, fmt.Errorf("queue_batch: unsupported sizer %q", cfg.Sizer)
}
@ -67,15 +71,15 @@ func NewQueueBatch(
sizer: sizer,
capacity: cfg.QueueSize,
blockOnOverflow: cfg.BlockOnOverflow,
signal: qSet.Signal,
signal: set.Signal,
storageID: *cfg.StorageID,
encoding: qSet.Encoding,
id: qSet.ID,
telemetry: qSet.Telemetry,
encoding: set.Encoding,
id: set.ID,
telemetry: set.Telemetry,
}), cfg.NumConsumers, b.Consume)
}
oq, err := newObsQueue(qSet, q)
oq, err := newObsQueue(set, q)
if err != nil {
return nil, err
}

View File

@ -8,18 +8,18 @@ import (
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
)
// QueueConfig defines configuration for queueing batches before sending to the consumerSender.
type QueueConfig = internal.QueueConfig
// Deprecated: [v0.123.0] use QueueBatchConfig.
type QueueConfig = QueueBatchConfig
// Deprecated: [v0.123.0] use WithQueueBatch.
func WithRequestQueue(cfg QueueConfig, encoding QueueBatchEncoding[Request]) Option {
func WithRequestQueue(cfg QueueBatchConfig, encoding QueueBatchEncoding[Request]) Option {
return WithQueueBatch(cfg, QueueBatchSettings{Encoding: encoding})
}
// WithQueue overrides the default QueueConfig for an exporter.
// The default QueueConfig is to disable queueing.
// WithQueue overrides the default QueueBatchConfig for an exporter.
// The default QueueBatchConfig is to disable queueing.
// This option cannot be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
func WithQueue(config QueueConfig) Option {
func WithQueue(config QueueBatchConfig) Option {
return internal.WithQueue(config)
}
@ -33,7 +33,7 @@ func WithBatcher(cfg BatcherConfig) Option {
}
// QueueBatchConfig defines configuration for queueing and batching for the exporter.
type QueueBatchConfig = internal.QueueConfig
type QueueBatchConfig = queuebatch.Config
// QueueBatchEncoding defines the encoding to be used if persistent queue is configured.
// Duplicate definition with queuebatch.Encoding since aliasing generics is not supported by default.
@ -59,7 +59,7 @@ func WithQueueBatch(cfg QueueBatchConfig, set QueueBatchSettings) Option {
return internal.WithQueueBatch(cfg, set)
}
// NewDefaultQueueConfig returns the default config for QueueConfig.
// NewDefaultQueueConfig returns the default config for QueueBatchConfig.
// By default, the queue stores 1000 items of telemetry and is non-blocking when full.
var NewDefaultQueueConfig = internal.NewDefaultQueueConfig

View File

@ -7,8 +7,8 @@ import (
"go.opentelemetry.io/collector/exporter/exporterhelper"
)
// Deprecated: [v0.123.0] Use exporterhelper.QueueConfig
type Config = exporterhelper.QueueConfig
// Deprecated: [v0.123.0] Use exporterhelper.QueueBatchConfigs
type Config = exporterhelper.QueueBatchConfig
// Deprecated: [v0.123.0] Use exporterhelper.NewDefaultQueueConfig.
// Small difference that this is blocking vs the other one which is not blocking.

View File

@ -19,10 +19,10 @@ import (
// Config defines configuration for OTLP exporter.
type Config struct {
TimeoutConfig exporterhelper.TimeoutConfig `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
QueueConfig exporterhelper.QueueConfig `mapstructure:"sending_queue"`
RetryConfig configretry.BackOffConfig `mapstructure:"retry_on_failure"`
ClientConfig configgrpc.ClientConfig `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
TimeoutConfig exporterhelper.TimeoutConfig `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
QueueConfig exporterhelper.QueueBatchConfig `mapstructure:"sending_queue"`
RetryConfig configretry.BackOffConfig `mapstructure:"retry_on_failure"`
ClientConfig configgrpc.ClientConfig `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
// Experimental: This configuration is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved

View File

@ -50,7 +50,7 @@ func TestUnmarshalConfig(t *testing.T) {
MaxInterval: 1 * time.Minute,
MaxElapsedTime: 10 * time.Minute,
},
QueueConfig: exporterhelper.QueueConfig{
QueueConfig: exporterhelper.QueueBatchConfig{
Enabled: true,
Sizer: exporterhelper.RequestSizerTypeRequests,
NumConsumers: 2,

View File

@ -45,9 +45,9 @@ func (e *EncodingType) UnmarshalText(text []byte) error {
// Config defines configuration for OTLP/HTTP exporter.
type Config struct {
ClientConfig confighttp.ClientConfig `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
QueueConfig exporterhelper.QueueConfig `mapstructure:"sending_queue"`
RetryConfig configretry.BackOffConfig `mapstructure:"retry_on_failure"`
ClientConfig confighttp.ClientConfig `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
QueueConfig exporterhelper.QueueBatchConfig `mapstructure:"sending_queue"`
RetryConfig configretry.BackOffConfig `mapstructure:"retry_on_failure"`
// The URL to send traces to. If omitted the Endpoint + "/v1/traces" will be used.
TracesEndpoint string `mapstructure:"traces_endpoint"`

View File

@ -52,7 +52,7 @@ func TestUnmarshalConfig(t *testing.T) {
MaxInterval: 1 * time.Minute,
MaxElapsedTime: 10 * time.Minute,
},
QueueConfig: exporterhelper.QueueConfig{
QueueConfig: exporterhelper.QueueBatchConfig{
Enabled: true,
Sizer: exporterhelper.RequestSizerTypeRequests,
NumConsumers: 2,

View File

@ -23,7 +23,7 @@ func testExporterConfig(endpoint string) component.Config {
retryConfig := configretry.NewDefaultBackOffConfig()
retryConfig.InitialInterval = time.Millisecond // interval is short for the test purposes
return &otlpexporter.Config{
QueueConfig: exporterhelper.QueueConfig{Enabled: false},
QueueConfig: exporterhelper.QueueBatchConfig{Enabled: false},
RetryConfig: retryConfig,
ClientConfig: configgrpc.ClientConfig{
Endpoint: endpoint,