diff --git a/sdk/trace/batch_span_processor.go b/sdk/trace/batch_span_processor.go index a66d2d65e..2b06dac86 100644 --- a/sdk/trace/batch_span_processor.go +++ b/sdk/trace/batch_span_processor.go @@ -103,23 +103,10 @@ func NewBatchSpanProcessor(e export.SpanBatcher, opts ...BatchSpanProcessorOptio bsp.stopCh = make(chan struct{}) - // Start timer to export spans. - ticker := time.NewTicker(bsp.o.ScheduledDelayMillis) bsp.stopWait.Add(1) go func() { - defer ticker.Stop() - batch := make([]*export.SpanData, 0, bsp.o.MaxExportBatchSize) - for { - select { - case <-bsp.stopCh: - bsp.processQueue(&batch) - close(bsp.queue) - bsp.stopWait.Done() - return - case <-ticker.C: - bsp.processQueue(&batch) - } - } + defer bsp.stopWait.Done() + bsp.processQueue() }() return bsp, nil @@ -167,32 +154,58 @@ func WithBlocking() BatchSpanProcessorOption { } } -// processQueue removes spans from the `queue` channel until there is -// no more data. It calls the exporter in batches of up to -// MaxExportBatchSize until all the available data have been processed. -func (bsp *BatchSpanProcessor) processQueue(batch *[]*export.SpanData) { +// processQueue removes spans from the `queue` channel until processor +// is shut down. It calls the exporter in batches of up to MaxExportBatchSize +// waiting up to ScheduledDelayMillis to form a batch. +func (bsp *BatchSpanProcessor) processQueue() { + ticker := time.NewTicker(bsp.o.ScheduledDelayMillis) + defer ticker.Stop() + + batch := make([]*export.SpanData, 0, bsp.o.MaxExportBatchSize) + + exportSpans := func() { + if len(batch) > 0 { + bsp.e.ExportSpans(context.Background(), batch) + batch = batch[:0] + } + } + +loop: for { - // Read spans until either the buffer fills or the - // queue is empty. - for ok := true; ok && len(*batch) < bsp.o.MaxExportBatchSize; { - select { - case sd := <-bsp.queue: - if sd != nil && sd.SpanContext.IsSampled() { - *batch = append(*batch, sd) + select { + case <-bsp.stopCh: + break loop + case <-ticker.C: + exportSpans() + case sd := <-bsp.queue: + if sd.SpanContext.IsSampled() { + batch = append(batch, sd) + if len(batch) == bsp.o.MaxExportBatchSize { + ticker.Reset(bsp.o.ScheduledDelayMillis) + exportSpans() } - default: - ok = false } } + } - if len(*batch) == 0 { - return + // Consume queue before close to unblock enqueue and prevent + // "panic: send on closed channel". + for { + select { + case sd := <-bsp.queue: + if sd == nil { + exportSpans() + return + } + if sd.SpanContext.IsSampled() { + batch = append(batch, sd) + if len(batch) == bsp.o.MaxExportBatchSize { + exportSpans() + } + } + default: + close(bsp.queue) } - - // Send one batch, then continue reading until the - // buffer is empty. - bsp.e.ExportSpans(context.Background(), *batch) - *batch = (*batch)[:0] } }