Process the queue until no data but respect max batch size (#599)

This commit is contained in:
Joshua MacDonald 2020-03-27 16:21:20 -07:00 committed by GitHub
parent e8546e3bc5
commit 1e8e72b530
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 47 additions and 26 deletions

View File

@ -103,23 +103,24 @@ func NewBatchSpanProcessor(e export.SpanBatcher, opts ...BatchSpanProcessorOptio
bsp.stopCh = make(chan struct{})
//Start timer to export metrics
// Start timer to export spans.
ticker := time.NewTicker(bsp.o.ScheduledDelayMillis)
bsp.stopWait.Add(1)
go func(ctx context.Context) {
go func() {
defer ticker.Stop()
batch := make([]*export.SpanData, 0, bsp.o.MaxExportBatchSize)
for {
select {
case <-bsp.stopCh:
bsp.processQueue()
bsp.processQueue(&batch)
close(bsp.queue)
bsp.stopWait.Done()
return
case <-ticker.C:
bsp.processQueue()
bsp.processQueue(&batch)
}
}
}(context.Background())
}()
return bsp, nil
}
@ -166,36 +167,41 @@ func WithBlocking() BatchSpanProcessorOption {
}
}
func (bsp *BatchSpanProcessor) processQueue() {
batch := make([]*export.SpanData, 0, bsp.o.MaxExportBatchSize)
// processQueue removes spans from the `queue` channel until there is
// no more data. It calls the exporter in batches of up to
// MaxExportBatchSize until all the available data have been processed.
func (bsp *BatchSpanProcessor) processQueue(batch *[]*export.SpanData) {
for {
var sd *export.SpanData
var ok bool
select {
case sd = <-bsp.queue:
if sd != nil && sd.SpanContext.IsSampled() {
batch = append(batch, sd)
// Read spans until either the buffer fills or the
// queue is empty.
for ok := true; ok && len(*batch) < bsp.o.MaxExportBatchSize; {
select {
case sd := <-bsp.queue:
if sd != nil && sd.SpanContext.IsSampled() {
*batch = append(*batch, sd)
}
default:
ok = false
}
ok = true
default:
ok = false
}
if ok {
if len(batch) >= bsp.o.MaxExportBatchSize {
bsp.e.ExportSpans(context.Background(), batch)
batch = batch[:0]
}
} else {
if len(batch) > 0 {
bsp.e.ExportSpans(context.Background(), batch)
}
break
if len(*batch) == 0 {
return
}
// Send one batch, then continue reading until the
// buffer is empty.
bsp.e.ExportSpans(context.Background(), *batch)
*batch = (*batch)[:0]
}
}
func (bsp *BatchSpanProcessor) enqueue(sd *export.SpanData) {
select {
case <-bsp.stopCh:
return
default:
}
if bsp.o.BlockOnQueueFull {
bsp.queue <- sd
} else {

View File

@ -143,6 +143,19 @@ func TestNewBatchSpanProcessorWithOptions(t *testing.T) {
waitTime: waitTime,
parallel: true,
},
{
name: "parallel span blocking",
o: []sdktrace.BatchSpanProcessorOption{
sdktrace.WithScheduleDelayMillis(schDelay),
sdktrace.WithMaxExportBatchSize(200),
sdktrace.WithBlocking(),
},
wantNumSpans: 2000,
wantBatchCount: 10,
genNumSpans: 2000,
waitTime: waitTime,
parallel: true,
},
}
for _, option := range options {
te := testBatchExporter{}
@ -158,6 +171,8 @@ func TestNewBatchSpanProcessorWithOptions(t *testing.T) {
time.Sleep(option.waitTime)
tp.UnregisterSpanProcessor(ssp)
gotNumOfSpans := te.len()
if option.wantNumSpans != gotNumOfSpans {
t.Errorf("%s: number of exported span: got %+v, want %+v\n", option.name, gotNumOfSpans, option.wantNumSpans)