diff --git a/CHANGELOG.md b/CHANGELOG.md index 627d088a9..7778bd889 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,6 +41,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm The existing `ParentSpanID` and `HasRemoteParent` fields are removed in favor of this. (#1748) - The `ParentContext` field of the `"go.opentelemetry.io/otel/sdk/trace".SamplingParameters` is updated to hold a `context.Context` containing the parent span. This changes it to make `SamplingParameters` conform with the OpenTelemetry specification. (#1749) +- Modify `BatchSpanProcessor.ForceFlush` to abort after timeout/cancellation. (#1757) - Improve OTLP/gRPC exporter connection errors. (#1737) ### Removed diff --git a/sdk/trace/batch_span_processor.go b/sdk/trace/batch_span_processor.go index aa322ae64..25d556e5f 100644 --- a/sdk/trace/batch_span_processor.go +++ b/sdk/trace/batch_span_processor.go @@ -148,7 +148,23 @@ func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error { // ForceFlush exports all ended spans that have not yet been exported. func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error { - return bsp.exportSpans(ctx) + var err error + if bsp.e != nil { + wait := make(chan struct{}) + go func() { + if err := bsp.exportSpans(ctx); err != nil { + otel.Handle(err) + } + close(wait) + }() + // Wait until the export is finished or the context is cancelled/timed out + select { + case <-wait: + case <-ctx.Done(): + err = ctx.Err() + } + } + return err } func WithMaxQueueSize(size int) BatchSpanProcessorOption { diff --git a/sdk/trace/batch_span_processor_test.go b/sdk/trace/batch_span_processor_test.go index 6b721ffe3..f2bf428e2 100644 --- a/sdk/trace/batch_span_processor_test.go +++ b/sdk/trace/batch_span_processor_test.go @@ -17,6 +17,7 @@ package trace_test import ( "context" "encoding/binary" + "errors" "sync" "testing" "time" @@ -257,3 +258,71 @@ func TestBatchSpanProcessorShutdown(t *testing.T) { } assert.Equal(t, 1, bp.shutdownCount) } + +func TestBatchSpanProcessorForceFlushSucceeds(t *testing.T) { + te := testBatchExporter{} + tp := basicTracerProvider(t) + option := testOption{ + name: "default BatchSpanProcessorOptions", + o: []sdktrace.BatchSpanProcessorOption{ + sdktrace.WithMaxQueueSize(0), + sdktrace.WithMaxExportBatchSize(3000), + }, + wantNumSpans: 2053, + wantBatchCount: 1, + genNumSpans: 2053, + } + ssp := createAndRegisterBatchSP(option, &te) + if ssp == nil { + t.Fatalf("%s: Error creating new instance of BatchSpanProcessor\n", option.name) + } + tp.RegisterSpanProcessor(ssp) + tr := tp.Tracer("BatchSpanProcessorWithOption") + generateSpan(t, option.parallel, tr, option) + + // Force flush any held span batches + err := ssp.ForceFlush(context.Background()) + + gotNumOfSpans := te.len() + spanDifference := option.wantNumSpans - gotNumOfSpans + if spanDifference > 10 || spanDifference < 0 { + t.Errorf("number of exported span not equal to or within 10 less than: got %+v, want %+v\n", + gotNumOfSpans, option.wantNumSpans) + } + gotBatchCount := te.getBatchCount() + if gotBatchCount < option.wantBatchCount { + t.Errorf("number batches: got %+v, want >= %+v\n", + gotBatchCount, option.wantBatchCount) + t.Errorf("Batches %v\n", te.sizes) + } + assert.NoError(t, err) +} + +func TestBatchSpanProcessorForceFlushTimeout(t *testing.T) { + var bp testBatchExporter + bsp := sdktrace.NewBatchSpanProcessor(&bp) + // Add timeout to context to test deadline + ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond) + defer cancel() + <-ctx.Done() + + if err := bsp.ForceFlush(ctx); err == nil { + t.Error("expected context DeadlineExceeded error, got nil") + } else if !errors.Is(err, context.DeadlineExceeded) { + t.Errorf("expected context DeadlineExceeded error, got %v", err) + } +} + +func TestBatchSpanProcessorForceFlushCancellation(t *testing.T) { + var bp testBatchExporter + bsp := sdktrace.NewBatchSpanProcessor(&bp) + ctx, cancel := context.WithCancel(context.Background()) + // Cancel the context + cancel() + + if err := bsp.ForceFlush(ctx); err == nil { + t.Error("expected context canceled error, got nil") + } else if !errors.Is(err, context.Canceled) { + t.Errorf("expected context canceled error, got %v", err) + } +}