[exporterhelper] Do not re-enqueue failed requests (#9090)

The current re-enqueuing behavior is not obvious and cannot be configured. It
applies only to the persistent queue, and only when `retry_on_failure::enabled=true`,
even though `retry_on_failure` configures a different mechanism: the backoff retry
strategy.

This change removes the re-enqueuing behavior in favor of the `retry_on_failure`
option. Consider increasing `retry_on_failure::max_elapsed_time` to reduce the
chances of data loss, or set it to `0` to keep retrying until requests succeed.
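
As a rough illustration, an exporter built on `exporterhelper` now controls durability
through the retry settings alone. The sketch below is hypothetical (the factory name and
the `push` callback are illustrative) and assumes the `NewDefaultRetrySettings`,
`WithRetry` and `WithQueue` helpers that appear elsewhere in this change:

```go
package myexporter

import (
	"context"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/consumer"
	"go.opentelemetry.io/collector/exporter"
	"go.opentelemetry.io/collector/exporter/exporterhelper"
)

// newTracesExporter is a hypothetical factory showing where the compensation lives
// now: the retry sender, not the queue, decides how long to keep failing requests.
func newTracesExporter(ctx context.Context, set exporter.CreateSettings, cfg component.Config,
	push consumer.ConsumeTracesFunc) (exporter.Traces, error) {

	rCfg := exporterhelper.NewDefaultRetrySettings()
	rCfg.MaxElapsedTime = 0 // retry until success (or shutdown); use a finite value to bound retries

	qCfg := exporterhelper.NewDefaultQueueSettings()
	// Optional persistence; re-enqueuing used to apply only in this mode, but retries
	// are now the single mechanism either way. The extension ID is illustrative.
	storageID := component.NewIDWithName("file_storage", "storage")
	qCfg.StorageID = &storageID

	return exporterhelper.NewTracesExporter(ctx, set, cfg, push,
		exporterhelper.WithRetry(rCfg),
		exporterhelper.WithQueue(qCfg),
	)
}
```

With `MaxElapsedTime = 0`, a failing request occupies a consumer until it goes through,
so size the queue and `NumConsumers` accordingly.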

Resolves
https://github.com/open-telemetry/opentelemetry-collector/issues/8382
Author: Dmitrii Anoshin, 2023-12-14 20:41:02 -08:00 (committed by GitHub)
Commit: ef4a5be399 (parent: d7116dc4f6)
12 changed files with 97 additions and 209 deletions

View File

@ -0,0 +1,29 @@
# Use this changelog template to create an entry for release notes.
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporters/sending_queue
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Do not re-enqueue failed batches, rely on the retry_on_failure strategy instead.
# One or more tracking issues or pull requests related to the change
issues: [8382]
# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
The current re-enqueuing behavior is not obvious and cannot be configured. It takes place only for persistent queue
and only if `retry_on_failure::enabled=true` even if `retry_on_failure` is a setting for a different backoff retry
strategy. This change removes the re-enqueuing behavior. Consider increasing `retry_on_failure::max_elapsed_time`
to reduce chances of data loss or set it to 0 to keep retrying until requests succeed.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]

View File

@ -176,15 +176,6 @@ func newBaseExporter(set exporter.CreateSettings, signal component.DataType, req
}
be.connectSenders()
// If retry sender is disabled then disable requeuing in the queue sender.
// TODO: Make re-enqueuing configurable on queue sender instead of relying on retry sender.
if qs, ok := be.queueSender.(*queueSender); ok {
// if it's not retrySender, then it is disabled.
if _, ok = be.retrySender.(*retrySender); !ok {
qs.requeuingEnabled = false
}
}
return be, nil
}
@ -212,7 +203,7 @@ func (be *baseExporter) Start(ctx context.Context, host component.Host) error {
func (be *baseExporter) Shutdown(ctx context.Context) error {
return multierr.Combine(
// First shutdown the retry sender, so it can push any pending requests to back the queue.
// First shutdown the retry sender, so the queue sender can flush the queue without retries.
be.retrySender.Shutdown(ctx),
// Then shutdown the queue sender.
be.queueSender.Shutdown(ctx),
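
The ordering above matters because `multierr.Combine` evaluates its arguments left to
right: stopping the retry sender first makes in-flight requests return immediately
(wrapped as shutdown errors), and only then is the queue sender stopped. A minimal
sketch of that evaluation order, with `shutdown` as a hypothetical stand-in for the
senders' `Shutdown` methods:

```go
package main

import (
	"fmt"

	"go.uber.org/multierr"
)

// shutdown is a hypothetical stand-in for a sender's Shutdown method.
func shutdown(name string) error {
	fmt.Println("shutting down:", name)
	return nil
}

func main() {
	// Arguments are evaluated left to right, so the retry sender stops before the
	// queue sender, which can then flush the queue without further retries.
	err := multierr.Combine(
		shutdown("retry sender"),
		shutdown("queue sender"),
	)
	fmt.Println("combined shutdown error:", err)
}
```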

View File

@ -40,12 +40,13 @@ func (q *boundedMemoryQueue[T]) Offer(ctx context.Context, req T) error {
// Consume applies the provided function on the head of queue.
// The call blocks until there is an item available or the queue is stopped.
// The function returns true when an item is consumed or false if the queue is stopped and emptied.
func (q *boundedMemoryQueue[T]) Consume(consumeFunc func(context.Context, T) bool) bool {
func (q *boundedMemoryQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool {
item, ok := <-q.items
if !ok {
return false
}
consumeFunc(item.ctx, item.req)
// the memory queue doesn't handle consume errors
_ = consumeFunc(item.ctx, item.req)
return true
}
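
For the in-memory queue the returned error is deliberately ignored: the item has
already been taken off the channel and there is no durable copy to fall back to. A toy
sketch of that shape (not the actual `boundedMemoryQueue`, which also tracks size and
capacity):

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// memQueue is a toy bounded queue: Offer fails fast when the buffer is full, and
// Consume drops the callback's error because the item already left the channel.
type memQueue[T any] struct{ items chan T }

func newMemQueue[T any](capacity int) *memQueue[T] {
	return &memQueue[T]{items: make(chan T, capacity)}
}

func (q *memQueue[T]) Offer(_ context.Context, item T) error {
	select {
	case q.items <- item:
		return nil
	default:
		return errors.New("sending queue is full")
	}
}

func (q *memQueue[T]) Consume(consume func(context.Context, T) error) bool {
	item, ok := <-q.items
	if !ok {
		return false // queue stopped and drained
	}
	_ = consume(context.Background(), item) // consume errors are not handled here
	return true
}

func main() {
	q := newMemQueue[string](1)
	fmt.Println(q.Offer(context.Background(), "batch-1")) // <nil>
	fmt.Println(q.Offer(context.Background(), "batch-2")) // sending queue is full
	q.Consume(func(_ context.Context, item string) error {
		fmt.Println("exporting", item)
		return errors.New("transient failure") // with re-enqueuing removed, nothing is put back
	})
}
```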

View File

@ -30,10 +30,10 @@ func TestBoundedQueue(t *testing.T) {
consumerState := newConsumerState(t)
consumers := NewQueueConsumers(q, 1, func(_ context.Context, item string) bool {
consumers := NewQueueConsumers(q, 1, func(_ context.Context, item string) error {
consumerState.record(item)
<-waitCh
return true
return nil
})
assert.NoError(t, consumers.Start(context.Background(), componenttest.NewNopHost()))
@ -89,10 +89,10 @@ func TestShutdownWhileNotEmpty(t *testing.T) {
consumerState := newConsumerState(t)
waitChan := make(chan struct{})
consumers := NewQueueConsumers(q, 5, func(_ context.Context, item string) bool {
consumers := NewQueueConsumers(q, 5, func(_ context.Context, item string) error {
<-waitChan
consumerState.record(item)
return true
return nil
})
assert.NoError(t, consumers.Start(context.Background(), componenttest.NewNopHost()))
@ -176,9 +176,9 @@ func queueUsage(b *testing.B, capacity int, numConsumers int, numberOfItems int)
b.ReportAllocs()
for i := 0; i < b.N; i++ {
q := NewBoundedMemoryQueue[string](capacity)
consumers := NewQueueConsumers(q, numConsumers, func(context.Context, string) bool {
consumers := NewQueueConsumers(q, numConsumers, func(context.Context, string) error {
time.Sleep(1 * time.Millisecond)
return true
return nil
})
require.NoError(b, consumers.Start(context.Background(), componenttest.NewNopHost()))
for j := 0; j < numberOfItems; j++ {

View File

@ -13,11 +13,11 @@ import (
type QueueConsumers[T any] struct {
queue Queue[T]
numConsumers int
consumeFunc func(context.Context, T) bool
consumeFunc func(context.Context, T) error
stopWG sync.WaitGroup
}
func NewQueueConsumers[T any](q Queue[T], numConsumers int, consumeFunc func(context.Context, T) bool) *QueueConsumers[T] {
func NewQueueConsumers[T any](q Queue[T], numConsumers int, consumeFunc func(context.Context, T) error) *QueueConsumers[T] {
return &QueueConsumers[T]{
queue: q,
numConsumers: numConsumers,

View File

@ -0,0 +1,20 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal"
type shutdownErr struct {
err error
}
func NewShutdownErr(err error) error {
return shutdownErr{err: err}
}
func (s shutdownErr) Error() string {
return "interrupted due to shutdown: " + s.err.Error()
}
func (s shutdownErr) Unwrap() error {
return s.err
}
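
The retry sender wraps the in-flight error with `NewShutdownErr` when it is interrupted
(see `retry_sender.go` below), and the persistent queue's completion callback checks for
it with `errors.As` before deleting the stored item. A self-contained sketch of that
round trip, re-declaring the wrapper locally since the `internal` package is not
importable from outside:

```go
package main

import (
	"errors"
	"fmt"
)

// shutdownErr re-declares the wrapper above for a runnable, standalone example.
type shutdownErr struct{ err error }

func (s shutdownErr) Error() string { return "interrupted due to shutdown: " + s.err.Error() }
func (s shutdownErr) Unwrap() error { return s.err }

func main() {
	exportErr := errors.New("connection refused")
	interrupted := shutdownErr{err: exportErr} // what the retry sender returns on stopCh

	for _, consumeErr := range []error{exportErr, interrupted} {
		if errors.As(consumeErr, &shutdownErr{}) {
			fmt.Println("shutdown: keep the item in storage so it is redelivered after restart")
		} else {
			fmt.Println("normal completion: delete the item from storage")
		}
	}

	// Unwrap keeps the original cause visible to errors.Is/errors.As callers.
	fmt.Println(errors.Is(interrupted, exportErr)) // true
}
```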

View File

@ -141,10 +141,10 @@ func (pq *persistentQueue[T]) initPersistentContiguousStorage(ctx context.Contex
// Consume applies the provided function on the head of queue.
// The call blocks until there is an item available or the queue is stopped.
// The function returns true when an item is consumed or false if the queue is stopped.
func (pq *persistentQueue[T]) Consume(consumeFunc func(context.Context, T) bool) bool {
func (pq *persistentQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool {
var (
req T
onProcessingFinished func()
onProcessingFinished func(error)
consumed bool
)
@ -157,9 +157,7 @@ func (pq *persistentQueue[T]) Consume(consumeFunc func(context.Context, T) bool)
}
if consumed {
if ok := consumeFunc(context.Background(), req); ok {
onProcessingFinished()
}
onProcessingFinished(consumeFunc(context.Background(), req))
return true
}
}
@ -241,7 +239,7 @@ func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error {
// getNextItem pulls the next available item from the persistent storage along with a callback function that should be
// called after the item is processed to clean up the storage. If no new item is available, returns false.
func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(), bool) {
func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(error), bool) {
pq.mu.Lock()
defer pq.mu.Unlock()
@ -282,7 +280,13 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(), bool)
// Increase the reference count, so the client is not closed while the request is being processed.
pq.refClient++
return request, func() {
return request, func(consumeErr error) {
if errors.As(consumeErr, &shutdownErr{}) {
// The queue is shutting down, don't mark the item as dispatched, so it's picked up again after restart.
// TODO: Handle partially delivered requests by updating their values in the storage.
return
}
// Delete the item from the persistent storage after it was processed.
pq.mu.Lock()
defer pq.mu.Unlock()

View File

@ -39,7 +39,7 @@ func (nh *mockHost) GetExtensions() map[component.ID]component.Component {
}
// createAndStartTestPersistentQueue creates and starts a fake queue with the given capacity and number of consumers.
func createAndStartTestPersistentQueue(t *testing.T, capacity, numConsumers int, consumeFunc func(_ context.Context, item ptrace.Traces) bool) Queue[ptrace.Traces] {
func createAndStartTestPersistentQueue(t *testing.T, capacity, numConsumers int, consumeFunc func(_ context.Context, item ptrace.Traces) error) Queue[ptrace.Traces] {
pq := NewPersistentQueue[ptrace.Traces](capacity, component.DataTypeTraces, component.ID{}, marshaler.MarshalTraces,
unmarshaler.UnmarshalTraces, exportertest.NewNopCreateSettings())
host := &mockHost{ext: map[component.ID]component.Component{
@ -73,10 +73,10 @@ func createTestPersistentQueue(client storage.Client) *persistentQueue[ptrace.Tr
func TestPersistentQueue_FullCapacity(t *testing.T) {
start := make(chan struct{})
done := make(chan struct{})
pq := createAndStartTestPersistentQueue(t, 5, 1, func(context.Context, ptrace.Traces) bool {
pq := createAndStartTestPersistentQueue(t, 5, 1, func(context.Context, ptrace.Traces) error {
<-start
<-done
return true
return nil
})
assert.Equal(t, 0, pq.Size())
@ -100,7 +100,7 @@ func TestPersistentQueue_FullCapacity(t *testing.T) {
}
func TestPersistentQueue_Shutdown(t *testing.T) {
pq := createAndStartTestPersistentQueue(t, 1001, 100, func(context.Context, ptrace.Traces) bool { return true })
pq := createAndStartTestPersistentQueue(t, 1001, 100, func(context.Context, ptrace.Traces) error { return nil })
req := newTraces(1, 10)
for i := 0; i < 1000; i++ {
@ -140,9 +140,9 @@ func TestPersistentQueue_ConsumersProducers(t *testing.T) {
req := newTraces(1, 10)
numMessagesConsumed := &atomic.Int32{}
pq := createAndStartTestPersistentQueue(t, 1000, c.numConsumers, func(context.Context, ptrace.Traces) bool {
pq := createAndStartTestPersistentQueue(t, 1000, c.numConsumers, func(context.Context, ptrace.Traces) error {
numMessagesConsumed.Add(int32(1))
return true
return nil
})
for i := 0; i < c.numMessagesProduced; i++ {
@ -401,7 +401,7 @@ func TestPersistentQueue_CurrentlyProcessedItems(t *testing.T) {
requireCurrentlyDispatchedItemsEqual(t, ps, []uint64{0, 1})
// Lets mark item 1 as finished, it will remove it from the currently dispatched items list.
onProcessingFinished()
onProcessingFinished(nil)
requireCurrentlyDispatchedItemsEqual(t, ps, []uint64{0})
// Reload the storage. Since items 0 was not finished, this should be re-enqueued at the end.
@ -415,7 +415,7 @@ func TestPersistentQueue_CurrentlyProcessedItems(t *testing.T) {
r, onProcessingFinished, found := newPs.getNextItem(context.Background())
require.True(t, found)
assert.Equal(t, req, r)
onProcessingFinished()
onProcessingFinished(nil)
}
// The queue should be now empty
@ -524,7 +524,7 @@ func BenchmarkPersistentQueue_TraceSpans(b *testing.B) {
}
for i := 0; i < bb.N; i++ {
require.True(bb, ps.Consume(func(context.Context, ptrace.Traces) bool { return true }))
require.True(bb, ps.Consume(func(context.Context, ptrace.Traces) error { return nil }))
}
require.NoError(b, ext.Shutdown(context.Background()))
})
@ -603,7 +603,7 @@ func TestPersistentQueue_ShutdownWhileConsuming(t *testing.T) {
assert.False(t, client.(*mockStorageClient).isClosed())
assert.NoError(t, ps.Shutdown(context.Background()))
assert.False(t, client.(*mockStorageClient).isClosed())
onProcessingFinished()
onProcessingFinished(nil)
assert.True(t, client.(*mockStorageClient).isClosed())
}
@ -643,7 +643,7 @@ func TestPersistentQueue_StorageFull(t *testing.T) {
// Subsequent items succeed, as deleting the first item frees enough space for the state update
reqCount--
for i := reqCount; i > 0; i-- {
require.True(t, ps.Consume(func(context.Context, ptrace.Traces) bool { return true }))
require.True(t, ps.Consume(func(context.Context, ptrace.Traces) error { return nil }))
}
// We should be able to put a new item in

View File

@ -30,8 +30,7 @@ type Queue[T any] interface {
// Consume applies the provided function on the head of queue.
// The call blocks until there is an item available or the queue is stopped.
// The function returns true when an item is consumed or false if the queue is stopped.
// The provided callback function returns true if the item was consumed or false if the consumer is stopped.
Consume(func(ctx context.Context, item T) bool) bool
Consume(func(ctx context.Context, item T) error) bool
// Size returns the current Size of the queue
Size() int
// Capacity returns the capacity of the queue.

View File

@ -7,7 +7,6 @@ import (
"context"
"errors"
"fmt"
"sync/atomic"
"time"
"go.opencensus.io/metric/metricdata"
@ -75,14 +74,12 @@ func (qCfg *QueueSettings) Validate() error {
type queueSender struct {
baseRequestSender
fullName string
queue internal.Queue[Request]
traceAttribute attribute.KeyValue
logger *zap.Logger
meter otelmetric.Meter
consumers *internal.QueueConsumers[Request]
requeuingEnabled bool
stopped *atomic.Bool
fullName string
queue internal.Queue[Request]
traceAttribute attribute.KeyValue
logger *zap.Logger
meter otelmetric.Meter
consumers *internal.QueueConsumers[Request]
metricCapacity otelmetric.Int64ObservableGauge
metricSize otelmetric.Int64ObservableGauge
@ -105,50 +102,22 @@ func newQueueSender(config QueueSettings, set exporter.CreateSettings, signal co
traceAttribute: attribute.String(obsmetrics.ExporterKey, set.ID.String()),
logger: set.TelemetrySettings.Logger,
meter: set.TelemetrySettings.MeterProvider.Meter(scopeName),
// TODO: this can be further exposed as a config param rather than relying on a type of queue
requeuingEnabled: isPersistent,
stopped: &atomic.Bool{},
}
qs.consumers = internal.NewQueueConsumers(queue, config.NumConsumers, qs.consume)
return qs
}
// consume is the function that is executed by the queue consumers to send the data to the next consumerSender.
func (qs *queueSender) consume(ctx context.Context, req Request) bool {
func (qs *queueSender) consume(ctx context.Context, req Request) error {
err := qs.nextSender.send(ctx, req)
// Nothing to do if the error is nil or permanent. Permanent errors are already logged by retrySender.
if err == nil || consumererror.IsPermanent(err) {
return true
}
// Do not requeue if the queue sender is stopped.
if qs.stopped.Load() {
return false
}
if !qs.requeuingEnabled {
if err != nil && !consumererror.IsPermanent(err) {
qs.logger.Error(
"Exporting failed. No more retries left. Dropping data.",
zap.Error(err),
zap.Int("dropped_items", req.ItemsCount()),
)
return true
}
if qs.queue.Offer(ctx, extractPartialRequest(req, err)) == nil {
qs.logger.Error(
"Exporting failed. Putting back to the end of the queue.",
zap.Error(err),
)
} else {
qs.logger.Error(
"Exporting failed. Queue did not accept requeuing request. Dropping data.",
zap.Error(err),
zap.Int("dropped_items", req.ItemsCount()),
)
}
return true
return err
}
// Start is invoked during service startup.
@ -212,8 +181,6 @@ func (qs *queueSender) recordWithOC() error {
// Shutdown is invoked during service shutdown.
func (qs *queueSender) Shutdown(ctx context.Context) error {
qs.stopped.Store(true)
// Cleanup queue metrics reporting
_ = globalInstruments.queueSize.UpsertEntry(func() int64 {
return int64(0)

View File

@ -11,8 +11,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest/observer"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
@ -222,112 +220,6 @@ func TestQueueSettings_Validate(t *testing.T) {
assert.NoError(t, qCfg.Validate())
}
// if requeueing is enabled, we eventually retry even if we failed at first
func TestQueuedRetry_RequeuingEnabled(t *testing.T) {
qCfg := NewDefaultQueueSettings()
qCfg.NumConsumers = 1
rCfg := NewDefaultRetrySettings()
rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry at all, but requeue instead
be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)
ocs := be.obsrepSender.(*observabilityConsumerSender)
be.queueSender.(*queueSender).requeuingEnabled = true
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
assert.NoError(t, be.Shutdown(context.Background()))
})
mockR := newMockRequest(4, errors.New("transient error"))
ocs.run(func() {
ocs.waitGroup.Add(1) // necessary because we'll call send() again after requeueing
// This is asynchronous so it should just enqueue, no errors expected.
require.NoError(t, be.send(context.Background(), mockR))
})
ocs.awaitAsyncProcessing()
// In the newMockConcurrentExporter we count requests and items even for failed requests
mockR.checkNumRequests(t, 2)
// ensure that only 1 item was sent which correspond to items count in the error returned by mockRequest.OnError()
ocs.checkSendItemsCount(t, 1)
ocs.checkDroppedItemsCount(t, 4) // not actually dropped, but ocs counts each failed send here
}
// disabling retry sender should disable requeuing.
func TestQueuedRetry_RequeuingDisabled(t *testing.T) {
mockR := newMockRequest(2, errors.New("transient error"))
// use persistent storage as it expected to be used with requeuing unless the retry sender is disabled
qCfg := NewDefaultQueueSettings()
storageID := component.NewIDWithName("file_storage", "storage")
qCfg.StorageID = &storageID // enable persistence
rCfg := NewDefaultRetrySettings()
rCfg.Enabled = false
be, err := newBaseExporter(defaultSettings, "", false, mockRequestMarshaler, mockRequestUnmarshaler(mockR), newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)
ocs := be.obsrepSender.(*observabilityConsumerSender)
var extensions = map[component.ID]component.Component{
storageID: internal.NewMockStorageExtension(nil),
}
host := &mockHost{ext: extensions}
require.NoError(t, be.Start(context.Background(), host))
ocs.run(func() {
// This is asynchronous so it should just enqueue, no errors expected.
require.NoError(t, be.send(context.Background(), mockR))
})
ocs.awaitAsyncProcessing()
// one failed request, no retries, two items dropped.
mockR.checkNumRequests(t, 1)
ocs.checkSendItemsCount(t, 0)
ocs.checkDroppedItemsCount(t, 2)
}
// if requeueing is enabled, but the queue is full, we get an error
func TestQueuedRetry_RequeuingEnabledQueueFull(t *testing.T) {
qCfg := NewDefaultQueueSettings()
qCfg.NumConsumers = 1
qCfg.QueueSize = 1
rCfg := NewDefaultRetrySettings()
rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry at all, but requeue instead
set := exportertest.NewNopCreateSettings()
logger, observedLogs := observer.New(zap.ErrorLevel)
set.Logger = zap.New(logger)
be, err := newBaseExporter(set, "", false, nil, nil, newNoopObsrepSender, WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)
be.queueSender.(*queueSender).requeuingEnabled = true
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
assert.NoError(t, be.Shutdown(context.Background()))
})
// send a request that will fail after waitReq1 is unblocked
waitReq1 := make(chan struct{})
req1 := newMockExportRequest(func(ctx context.Context) error {
waitReq1 <- struct{}{}
return errors.New("some error")
})
require.NoError(t, be.queueSender.send(context.Background(), req1))
// send another request to fill the queue
req2 := newMockRequest(1, nil)
require.NoError(t, be.queueSender.send(context.Background(), req2))
<-waitReq1
// req1 cannot be put back to the queue and should be dropped, check the log message
assert.Eventually(t, func() bool {
return observedLogs.FilterMessageSnippet("Queue did not accept requeuing request. Dropping data.").Len() == 1
}, time.Second, 1*time.Millisecond)
// req2 should be sent out after that
req2.checkNumRequests(t, 1)
}
func TestQueueRetryWithDisabledQueue(t *testing.T) {
qs := NewDefaultQueueSettings()
qs.Enabled = false
@ -393,7 +285,7 @@ func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) {
require.Error(t, be.Start(context.Background(), host), "could not get storage client")
}
func TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued(t *testing.T) {
func TestQueuedRetryPersistentEnabled_NoDataLossOnShutdown(t *testing.T) {
qCfg := NewDefaultQueueSettings()
qCfg.NumConsumers = 1
storageID := component.NewIDWithName("file_storage", "storage")
@ -450,19 +342,3 @@ type mockHost struct {
func (nh *mockHost) GetExtensions() map[component.ID]component.Component {
return nh.ext
}
type mockExportRequest struct {
exportFunc func(context.Context) error
}
func newMockExportRequest(exportFunc func(context.Context) error) *mockExportRequest {
return &mockExportRequest{exportFunc: exportFunc}
}
func (m *mockExportRequest) ItemsCount() int {
return 1
}
func (m *mockExportRequest) Export(ctx context.Context) error {
return m.exportFunc(ctx)
}

View File

@ -16,6 +16,7 @@ import (
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
)
@ -35,7 +36,7 @@ type RetrySettings struct {
// consecutive retries will always be `MaxInterval`.
MaxInterval time.Duration `mapstructure:"max_interval"`
// MaxElapsedTime is the maximum amount of time (including retries) spent trying to send a request/batch.
// Once this value is reached, the data is discarded.
// Once this value is reached, the data is discarded. If set to 0, the retries are never stopped.
MaxElapsedTime time.Duration `mapstructure:"max_elapsed_time"`
}
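
The `MaxElapsedTime == 0` semantics come from the underlying exponential backoff: when
the field is zero, the elapsed-time check is skipped and the next delay never degrades
to `Stop` for that reason. The sketch below assumes the `github.com/cenkalti/backoff/v4`
package that the retry sender is built on:

```go
package main

import (
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
)

func main() {
	expBackoff := backoff.NewExponentialBackOff()
	expBackoff.InitialInterval = 50 * time.Millisecond
	expBackoff.MaxInterval = 200 * time.Millisecond
	expBackoff.MaxElapsedTime = 0 // zero disables the elapsed-time cutoff entirely

	for attempt := 1; attempt <= 5; attempt++ {
		delay := expBackoff.NextBackOff()
		// With a non-zero MaxElapsedTime, this would eventually return backoff.Stop.
		fmt.Printf("attempt %d: wait %v (stop=%v)\n", attempt, delay, delay == backoff.Stop)
	}
}
```
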
@ -185,7 +186,7 @@ func (rs *retrySender) send(ctx context.Context, req Request) error {
case <-ctx.Done():
return fmt.Errorf("request is cancelled or timed out %w", err)
case <-rs.stopCh:
return fmt.Errorf("interrupted due to shutdown %w", err)
return internal.NewShutdownErr(err)
case <-time.After(backoffDelay):
}
}