Drop batches that failed to export and return the error when force-flushing a span processor (#1860)
* drop batches that failed to export and return the error when force-flushing a span processor * changelog * changelog * change the should-export condition * cleanup
This commit is contained in:
		
							parent
							
								
									f6a9279a86
								
							
						
					
					
						commit
						e399d355cb
					
				| 
						 | 
				
			
			@ -14,6 +14,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
 | 
			
		|||
 | 
			
		||||
- Make `NewSplitDriver` from `go.opentelemetry.io/otel/exporters/otlp` take variadic arguments instead of a `SplitConfig` item.
 | 
			
		||||
  `NewSplitDriver` now automatically implements an internal `noopDriver` for `SplitConfig` fields that are not initialized. (#1798)
 | 
			
		||||
- BatchSpanProcessor now reports export failures when the `ForceFlush()` method is called. (#1860)
 | 
			
		||||
 | 
			
		||||
### Deprecated
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -23,6 +24,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
 | 
			
		|||
 | 
			
		||||
- Only report errors from the `"go.opentelemetry.io/otel/sdk/resource".Environment` function when they are not `nil`. (#1850, #1851)
 | 
			
		||||
- The `Shutdown` method of the simple `SpanProcessor` in the `go.opentelemetry.io/otel/sdk/trace` package now honors the context deadline or cancellation. (#1616, #1856)
 | 
			
		||||
- BatchSpanProcessor now drops span batches that failed to be exported. (#1860)
 | 
			
		||||
 | 
			
		||||
### Security
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -156,16 +156,14 @@ func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error {
 | 
			
		|||
func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error {
 | 
			
		||||
	var err error
 | 
			
		||||
	if bsp.e != nil {
 | 
			
		||||
		wait := make(chan struct{})
 | 
			
		||||
		wait := make(chan error)
 | 
			
		||||
		go func() {
 | 
			
		||||
			if err := bsp.exportSpans(ctx); err != nil {
 | 
			
		||||
				otel.Handle(err)
 | 
			
		||||
			}
 | 
			
		||||
			wait <- bsp.exportSpans(ctx)
 | 
			
		||||
			close(wait)
 | 
			
		||||
		}()
 | 
			
		||||
		// Wait until the export is finished or the context is cancelled/timed out
 | 
			
		||||
		select {
 | 
			
		||||
		case <-wait:
 | 
			
		||||
		case err = <-wait:
 | 
			
		||||
		case <-ctx.Done():
 | 
			
		||||
			err = ctx.Err()
 | 
			
		||||
		}
 | 
			
		||||
| 
						 | 
				
			
			@ -216,11 +214,18 @@ func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error {
 | 
			
		|||
		defer cancel()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if len(bsp.batch) > 0 {
 | 
			
		||||
		if err := bsp.e.ExportSpans(ctx, bsp.batch); err != nil {
 | 
			
		||||
	if l := len(bsp.batch); l > 0 {
 | 
			
		||||
		err := bsp.e.ExportSpans(ctx, bsp.batch)
 | 
			
		||||
 | 
			
		||||
		// A new batch is always created after exporting, even if the batch failed to be exported.
 | 
			
		||||
		//
 | 
			
		||||
		// It is up to the exporter to implement any type of retry logic if a batch is failing
 | 
			
		||||
		// to be exported, since it is specific to the protocol and backend being sent to.
 | 
			
		||||
		bsp.batch = bsp.batch[:0]
 | 
			
		||||
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
		bsp.batch = bsp.batch[:0]
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -244,7 +249,7 @@ func (bsp *batchSpanProcessor) processQueue() {
 | 
			
		|||
		case sd := <-bsp.queue:
 | 
			
		||||
			bsp.batchMutex.Lock()
 | 
			
		||||
			bsp.batch = append(bsp.batch, sd)
 | 
			
		||||
			shouldExport := len(bsp.batch) == bsp.o.MaxExportBatchSize
 | 
			
		||||
			shouldExport := len(bsp.batch) >= bsp.o.MaxExportBatchSize
 | 
			
		||||
			bsp.batchMutex.Unlock()
 | 
			
		||||
			if shouldExport {
 | 
			
		||||
				if !bsp.timer.Stop() {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -37,6 +37,9 @@ type testBatchExporter struct {
 | 
			
		|||
	batchCount    int
 | 
			
		||||
	shutdownCount int
 | 
			
		||||
	delay         time.Duration
 | 
			
		||||
	errors        []error
 | 
			
		||||
	droppedCount  int
 | 
			
		||||
	idx           int
 | 
			
		||||
	err           error
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -44,6 +47,13 @@ func (t *testBatchExporter) ExportSpans(ctx context.Context, ss []*sdktrace.Span
 | 
			
		|||
	t.mu.Lock()
 | 
			
		||||
	defer t.mu.Unlock()
 | 
			
		||||
 | 
			
		||||
	if t.idx < len(t.errors) {
 | 
			
		||||
		t.droppedCount += len(ss)
 | 
			
		||||
		err := t.errors[t.idx]
 | 
			
		||||
		t.idx++
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	time.Sleep(t.delay)
 | 
			
		||||
 | 
			
		||||
	select {
 | 
			
		||||
| 
						 | 
				
			
			@ -338,12 +348,8 @@ func TestBatchSpanProcessorForceFlushSucceeds(t *testing.T) {
 | 
			
		|||
	// Force flush any held span batches
 | 
			
		||||
	err := ssp.ForceFlush(context.Background())
 | 
			
		||||
 | 
			
		||||
	gotNumOfSpans := te.len()
 | 
			
		||||
	spanDifference := option.wantNumSpans - gotNumOfSpans
 | 
			
		||||
	if spanDifference > 10 || spanDifference < 0 {
 | 
			
		||||
		t.Errorf("number of exported span not equal to or within 10 less than: got %+v, want %+v\n",
 | 
			
		||||
			gotNumOfSpans, option.wantNumSpans)
 | 
			
		||||
	}
 | 
			
		||||
	assertMaxSpanDiff(t, te.len(), option.wantNumSpans, 10)
 | 
			
		||||
 | 
			
		||||
	gotBatchCount := te.getBatchCount()
 | 
			
		||||
	if gotBatchCount < option.wantBatchCount {
 | 
			
		||||
		t.Errorf("number batches: got %+v, want >= %+v\n",
 | 
			
		||||
| 
						 | 
				
			
			@ -353,6 +359,65 @@ func TestBatchSpanProcessorForceFlushSucceeds(t *testing.T) {
 | 
			
		|||
	assert.NoError(t, err)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestBatchSpanProcessorDropBatchIfFailed(t *testing.T) {
 | 
			
		||||
	te := testBatchExporter{
 | 
			
		||||
		errors: []error{errors.New("fail to export")},
 | 
			
		||||
	}
 | 
			
		||||
	tp := basicTracerProvider(t)
 | 
			
		||||
	option := testOption{
 | 
			
		||||
		o: []sdktrace.BatchSpanProcessorOption{
 | 
			
		||||
			sdktrace.WithMaxQueueSize(0),
 | 
			
		||||
			sdktrace.WithMaxExportBatchSize(2000),
 | 
			
		||||
		},
 | 
			
		||||
		wantNumSpans:   1000,
 | 
			
		||||
		wantBatchCount: 1,
 | 
			
		||||
		genNumSpans:    1000,
 | 
			
		||||
	}
 | 
			
		||||
	ssp := createAndRegisterBatchSP(option, &te)
 | 
			
		||||
	if ssp == nil {
 | 
			
		||||
		t.Fatalf("%s: Error creating new instance of BatchSpanProcessor\n", option.name)
 | 
			
		||||
	}
 | 
			
		||||
	tp.RegisterSpanProcessor(ssp)
 | 
			
		||||
	tr := tp.Tracer("BatchSpanProcessorWithOption")
 | 
			
		||||
	generateSpan(t, option.parallel, tr, option)
 | 
			
		||||
 | 
			
		||||
	// Force flush any held span batches
 | 
			
		||||
	err := ssp.ForceFlush(context.Background())
 | 
			
		||||
	assert.Error(t, err)
 | 
			
		||||
	assert.EqualError(t, err, "fail to export")
 | 
			
		||||
 | 
			
		||||
	// First flush will fail, nothing should be exported.
 | 
			
		||||
	assertMaxSpanDiff(t, te.droppedCount, option.wantNumSpans, 10)
 | 
			
		||||
	assert.Equal(t, 0, te.len())
 | 
			
		||||
	assert.Equal(t, 0, te.getBatchCount())
 | 
			
		||||
 | 
			
		||||
	// Generate a new batch, this will succeed
 | 
			
		||||
	generateSpan(t, option.parallel, tr, option)
 | 
			
		||||
 | 
			
		||||
	// Force flush any held span batches
 | 
			
		||||
	err = ssp.ForceFlush(context.Background())
 | 
			
		||||
	assert.NoError(t, err)
 | 
			
		||||
 | 
			
		||||
	assertMaxSpanDiff(t, te.len(), option.wantNumSpans, 10)
 | 
			
		||||
	gotBatchCount := te.getBatchCount()
 | 
			
		||||
	if gotBatchCount < option.wantBatchCount {
 | 
			
		||||
		t.Errorf("number batches: got %+v, want >= %+v\n",
 | 
			
		||||
			gotBatchCount, option.wantBatchCount)
 | 
			
		||||
		t.Errorf("Batches %v\n", te.sizes)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// assertMaxSpanDiff reports a test error when got and want differ by more
// than maxDif in either direction.
//
// The parameters are ordered (got, want) to match how the callers in this
// file invoke the helper (observed count first, expected count second), so
// the failure message labels each value correctly.
func assertMaxSpanDiff(t *testing.T, got, want, maxDif int) {
	// Attribute failures to the calling test line rather than this helper.
	t.Helper()

	spanDifference := want - got
	if spanDifference < 0 {
		spanDifference = -spanDifference
	}
	if spanDifference > maxDif {
		t.Errorf("number of exported spans not within %d of expected: got %+v, want %+v\n",
			maxDif, got, want)
	}
}
 | 
			
		||||
 | 
			
		||||
func TestBatchSpanProcessorForceFlushTimeout(t *testing.T) {
 | 
			
		||||
	var bp testBatchExporter
 | 
			
		||||
	bsp := sdktrace.NewBatchSpanProcessor(&bp)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue