[chore] [exporterhelper] Move shutdown error from queue package (#9554)

The error is created by the retry sender and used by the queue sender.
It doesn't belong to queue package
This commit is contained in:
Dmitrii Anoshin 2024-02-13 16:04:23 -08:00 committed by GitHub
parent d455bffaf1
commit cc88aee675
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 39 additions and 6 deletions

View File

@ -17,7 +17,7 @@ import (
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/internal/queue"
"go.opentelemetry.io/collector/exporter/internal/experr"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
)
@ -127,7 +127,7 @@ func (rs *retrySender) send(ctx context.Context, req Request) error {
case <-ctx.Done():
return fmt.Errorf("request is cancelled or timed out %w", err)
case <-rs.stopCh:
return queue.NewShutdownErr(err)
return experr.NewShutdownErr(err)
case <-time.After(backoffDelay):
}
}

View File

@ -1,7 +1,9 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package queue // import "go.opentelemetry.io/collector/exporter/internal/queue"
package experr // import "go.opentelemetry.io/collector/exporter/internal/experr"
import "errors"
type shutdownErr struct {
err error
@ -18,3 +20,8 @@ func (s shutdownErr) Error() string {
func (s shutdownErr) Unwrap() error {
return s.err
}
func IsShutdownErr(err error) bool {
var sdErr shutdownErr
return errors.As(err, &sdErr)
}

View File

@ -0,0 +1,24 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package experr
import (
"errors"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestNewShutdownErr(t *testing.T) {
err := NewShutdownErr(errors.New("some error"))
assert.Equal(t, "interrupted due to shutdown: some error", err.Error())
}
func TestIsShutdownErr(t *testing.T) {
err := errors.New("testError")
require.False(t, IsShutdownErr(err))
err = NewShutdownErr(err)
require.True(t, IsShutdownErr(err))
}

View File

@ -17,6 +17,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/internal/experr"
"go.opentelemetry.io/collector/extension/experimental/storage"
)
@ -360,7 +361,7 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(error),
pq.mu.Unlock()
}()
if errors.As(consumeErr, &shutdownErr{}) {
if experr.IsShutdownErr(consumeErr) {
// The queue is shutting down, don't mark the item as dispatched, so it's picked up again after restart.
// TODO: Handle partially delivered requests by updating their values in the storage.
return

View File

@ -18,6 +18,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/exporter/internal/experr"
"go.opentelemetry.io/collector/extension/experimental/storage"
"go.opentelemetry.io/collector/extension/extensiontest"
"go.opentelemetry.io/collector/pdata/pcommon"
@ -411,7 +412,7 @@ func TestPersistentQueue_CorruptedData(t *testing.T) {
}
assert.Equal(t, 3, ps.Size())
require.True(t, ps.Consume(func(context.Context, tracesRequest) error {
return NewShutdownErr(nil)
return experr.NewShutdownErr(nil)
}))
assert.Equal(t, 2, ps.Size())
@ -523,7 +524,7 @@ func TestPersistentQueueStartWithNonDispatched(t *testing.T) {
// put one more item in
require.NoError(t, ps.Offer(context.Background(), req))
require.Equal(t, 5, ps.Size())
return NewShutdownErr(nil)
return experr.NewShutdownErr(nil)
}))
assert.NoError(t, ps.Shutdown(context.Background()))