More aggresive version of BatchSpanProcessor (#1579)

* More aggresive version of BatchSpanProcessor

* Add benchmark

* Polish

* Polish

* Incorporated some changes from #1571

* Rollback one test change

* Polish

* Polish
This commit is contained in:
Nikita Salnikov-Tarnovski 2020-08-26 18:22:39 +03:00 committed by GitHub
parent 88582202c4
commit af39f35086
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 471 additions and 166 deletions

View File

@ -0,0 +1,157 @@
/*
* Copyright 2020, OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.opentelemetry.sdk.trace.export;
import io.opentelemetry.OpenTelemetry;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.common.export.CompletableResultCode;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.data.MetricData.LongPoint;
import io.opentelemetry.sdk.metrics.data.MetricData.Point;
import io.opentelemetry.sdk.metrics.export.MetricProducer;
import io.opentelemetry.sdk.trace.ReadableSpan;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.trace.Tracer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.openjdk.jmh.annotations.AuxCounters;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
public class BatchSpanProcessorDroppedSpansBenchmark {
private static class DelayingSpanExporter implements SpanExporter {
@SuppressWarnings("FutureReturnValueIgnored")
@Override
public CompletableResultCode export(Collection<SpanData> spans) {
return CompletableResultCode.ofSuccess();
}
@Override
public CompletableResultCode flush() {
return CompletableResultCode.ofSuccess();
}
@Override
public void shutdown() {}
}
@State(Scope.Benchmark)
public static class BenchmarkState {
private final MetricProducer metricProducer =
OpenTelemetrySdk.getMeterProvider().getMetricProducer();
private BatchSpanProcessor processor;
private Tracer tracer;
private Collection<MetricData> allMetrics;
@Setup(Level.Trial)
public final void setup() {
SpanExporter exporter = new DelayingSpanExporter();
processor = BatchSpanProcessor.newBuilder(exporter).build();
tracer = OpenTelemetry.getTracerProvider().get("benchmarkTracer");
}
@TearDown(Level.Trial)
public final void tearDown() {
processor.shutdown();
}
@TearDown(Level.Iteration)
public final void recordMetrics() {
allMetrics = metricProducer.collectAllMetrics();
}
}
@State(Scope.Thread)
@AuxCounters(AuxCounters.Type.EVENTS)
public static class ThreadState {
private Collection<MetricData> allMetrics;
@TearDown(Level.Iteration)
public final void recordMetrics(BenchmarkState benchmarkState) {
allMetrics = benchmarkState.allMetrics;
}
/** Burn, checkstyle, burn. */
public double dropRatio() {
long exported = getMetric(true);
long dropped = getMetric(false);
long total = exported + dropped;
if (total == 0) {
return 0;
} else {
// Due to peculiarities of JMH reporting we have to divide this by the number of the
// concurrent threads running the actual benchmark.
return (double) dropped / total / 5;
}
}
public long exportedSpans() {
return getMetric(true);
}
public long droppedSpans() {
return getMetric(false);
}
private long getMetric(boolean dropped) {
String labelValue = String.valueOf(dropped);
for (MetricData metricData : allMetrics) {
if (metricData.getDescriptor().getName().equals("processedSpans")) {
List<Point> points = new ArrayList<>(metricData.getPoints());
if (points.isEmpty()) {
return 0;
} else {
// Find latest point with given value of dropped label
for (int i = points.size() - 1; i >= 0; i--) {
LongPoint point = (LongPoint) points.get(i);
if (labelValue.equals(point.getLabels().get("dropped"))) {
return point.getValue();
}
}
}
}
}
return 0;
}
}
/** Export spans through {@link io.opentelemetry.sdk.trace.export.BatchSpanProcessor}. */
@Benchmark
@Fork(1)
@Threads(5)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 5, time = 20)
@BenchmarkMode(Mode.Throughput)
public void export(
BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) {
benchmarkState.processor.onEnd(
(ReadableSpan) benchmarkState.tracer.spanBuilder("span").startSpan());
}
}

View File

@ -0,0 +1,118 @@
/*
* Copyright 2020, OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.opentelemetry.sdk.trace.export;
import com.google.common.collect.ImmutableList;
import io.opentelemetry.OpenTelemetry;
import io.opentelemetry.sdk.common.export.CompletableResultCode;
import io.opentelemetry.sdk.trace.ReadableSpan;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.trace.Span;
import io.opentelemetry.trace.Tracer;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
@State(Scope.Benchmark)
public class BatchSpanProcessorFlushBenchmark {
private static class DelayingSpanExporter implements SpanExporter {
private final ScheduledExecutorService executor;
private final int delayMs;
private DelayingSpanExporter(int delayMs) {
executor = Executors.newScheduledThreadPool(5);
this.delayMs = delayMs;
}
@SuppressWarnings("FutureReturnValueIgnored")
@Override
public CompletableResultCode export(Collection<SpanData> spans) {
final CompletableResultCode result = new CompletableResultCode();
executor.schedule(
new Runnable() {
@Override
public void run() {
result.succeed();
}
},
delayMs,
TimeUnit.MILLISECONDS);
return result;
}
@Override
public CompletableResultCode flush() {
return CompletableResultCode.ofSuccess();
}
@Override
public void shutdown() {}
}
@Param({"0", "1", "5"})
private int delayMs;
@Param({"1000", "2000", "5000"})
private int spanCount;
private List<Span> spans;
private BatchSpanProcessor processor;
@Setup(Level.Trial)
public final void setup() {
SpanExporter exporter = new DelayingSpanExporter(delayMs);
processor = BatchSpanProcessor.newBuilder(exporter).build();
ImmutableList.Builder<Span> spans = ImmutableList.builderWithExpectedSize(spanCount);
Tracer tracer = OpenTelemetry.getTracerProvider().get("benchmarkTracer");
for (int i = 0; i < spanCount; i++) {
spans.add(tracer.spanBuilder("span").startSpan());
}
this.spans = spans.build();
}
/** Export spans through {@link io.opentelemetry.sdk.trace.export.BatchSpanProcessor}. */
@Benchmark
@Fork(1)
@Threads(5)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 10, time = 1)
@OutputTimeUnit(TimeUnit.SECONDS)
public void export() {
for (Span span : spans) {
processor.onEnd((ReadableSpan) span);
}
processor.flush().join(10, TimeUnit.MINUTES);
}
}

View File

@ -24,7 +24,8 @@ dependencies {
testCompile project(path: ':opentelemetry-sdk-common', configuration: 'testClasses') testCompile project(path: ':opentelemetry-sdk-common', configuration: 'testClasses')
testImplementation project(':opentelemetry-testing-internal') testImplementation project(':opentelemetry-testing-internal')
testImplementation libraries.junit_pioneer testImplementation libraries.junit_pioneer,
libraries.awaitility
signature "org.codehaus.mojo.signature:java17:1.0@signature" signature "org.codehaus.mojo.signature:java17:1.0@signature"
signature "net.sf.androidscents.signature:android-api-level-24:7.0_r2@signature" signature "net.sf.androidscents.signature:android-api-level-24:7.0_r2@signature"

View File

@ -31,28 +31,22 @@ import io.opentelemetry.sdk.trace.ReadableSpan;
import io.opentelemetry.sdk.trace.SpanProcessor; import io.opentelemetry.sdk.trace.SpanProcessor;
import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.data.SpanData;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Timer; import java.util.concurrent.ArrayBlockingQueue;
import java.util.TimerTask; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
import javax.annotation.concurrent.GuardedBy;
/** /**
* Implementation of the {@link SpanProcessor} that batches spans exported by the SDK then pushes * Implementation of the {@link SpanProcessor} that batches spans exported by the SDK then pushes
* them to the exporter pipeline. * them to the exporter pipeline.
* *
* <p>All spans reported by the SDK implementation are first added to a synchronized queue (with a * <p>All spans reported by the SDK implementation are first added to a synchronized queue (with a
* {@code maxQueueSize} maximum size, after the size is reached spans are dropped) and exported * {@code maxQueueSize} maximum size, if queue is full spans are dropped). Spans are exported either
* every {@code scheduleDelayMillis} to the exporter pipeline in batches of {@code * when there are {@code maxExportBatchSize} pending spans or {@code scheduleDelayMillis} has passed
* maxExportBatchSize}. Spans may also be dropped when it becomes time to export again, and there is * since the last export finished.
* an export in progress.
*
* <p>If the queue gets half full a preemptive notification is sent to the worker thread that
* exports the spans to wake up and start a new export cycle.
* *
* <p>This batch {@link SpanProcessor} can cause high contention in a very high traffic service. * <p>This batch {@link SpanProcessor} can cause high contention in a very high traffic service.
* TODO: Add a link to the SpanProcessor that uses Disruptor as alternative with low contention. * TODO: Add a link to the SpanProcessor that uses Disruptor as alternative with low contention.
@ -85,10 +79,7 @@ public final class BatchSpanProcessor implements SpanProcessor {
private static final String WORKER_THREAD_NAME = private static final String WORKER_THREAD_NAME =
BatchSpanProcessor.class.getSimpleName() + "_WorkerThread"; BatchSpanProcessor.class.getSimpleName() + "_WorkerThread";
private static final String TIMER_THREAD_NAME =
BatchSpanProcessor.class.getSimpleName() + "_TimerThread";
private final Worker worker; private final Worker worker;
private final Thread workerThread;
private final boolean sampled; private final boolean sampled;
private BatchSpanProcessor( private BatchSpanProcessor(
@ -102,11 +93,11 @@ public final class BatchSpanProcessor implements SpanProcessor {
new Worker( new Worker(
spanExporter, spanExporter,
scheduleDelayMillis, scheduleDelayMillis,
maxQueueSize,
maxExportBatchSize, maxExportBatchSize,
exporterTimeoutMillis); exporterTimeoutMillis,
this.workerThread = new DaemonThreadFactory(WORKER_THREAD_NAME).newThread(worker); new ArrayBlockingQueue<ReadableSpan>(maxQueueSize));
this.workerThread.start(); Thread workerThread = new DaemonThreadFactory(WORKER_THREAD_NAME).newThread(worker);
workerThread.start();
this.sampled = sampled; this.sampled = sampled;
} }
@ -133,7 +124,6 @@ public final class BatchSpanProcessor implements SpanProcessor {
@Override @Override
public void shutdown() { public void shutdown() {
workerThread.interrupt();
worker.shutdown(); worker.shutdown();
} }
@ -142,169 +132,148 @@ public final class BatchSpanProcessor implements SpanProcessor {
worker.forceFlush(); worker.forceFlush();
} }
// TODO remove this when this.forceFlush returns CompletableResultCode
@VisibleForTesting
CompletableResultCode flush() {
return worker.forceFlush();
}
// Worker is a thread that batches multiple spans and calls the registered SpanExporter to export // Worker is a thread that batches multiple spans and calls the registered SpanExporter to export
// the data. // the data.
//
// The list of batched data is protected by an explicit monitor object which ensures full
// concurrency.
private static final class Worker implements Runnable { private static final class Worker implements Runnable {
static { static {
Meter meter = OpenTelemetry.getMeter("io.opentelemetry.sdk.trace"); Meter meter = OpenTelemetry.getMeter("io.opentelemetry.sdk.trace");
LongCounter droppedSpansCounter = LongCounter processedSpansCounter =
meter meter
.longCounterBuilder("droppedSpans") .longCounterBuilder("processedSpans")
.setUnit("1") .setUnit("1")
.setDescription( .setDescription(
"The number of spans dropped by the BatchSpanProcessor due to high throughput.") "The number of spans dropped by the BatchSpanProcessor due to high throughput.")
.setConstantLabels(
Labels.of("spanProcessorType", BatchSpanProcessor.class.getSimpleName()))
.build(); .build();
droppedSpans = droppedSpans = processedSpansCounter.bind(Labels.of("dropped", "true"));
droppedSpansCounter.bind( exportedSpans = processedSpansCounter.bind(Labels.of("dropped", "false"));
Labels.of("spanProcessorType", BatchSpanProcessor.class.getSimpleName()));
} }
private static final BoundLongCounter droppedSpans; private static final BoundLongCounter droppedSpans;
private static final BoundLongCounter exportedSpans;
private final Timer timer = new Timer(TIMER_THREAD_NAME, /* isDaemon= */ true);
private static final Logger logger = Logger.getLogger(Worker.class.getName()); private static final Logger logger = Logger.getLogger(Worker.class.getName());
private final SpanExporter spanExporter; private final SpanExporter spanExporter;
private final long scheduleDelayMillis; private final long scheduleDelayNanos;
private final int maxQueueSize;
private final int maxExportBatchSize; private final int maxExportBatchSize;
private final int halfMaxQueueSize;
private final Object monitor = new Object();
private final int exporterTimeoutMillis; private final int exporterTimeoutMillis;
private final AtomicBoolean exportAvailable = new AtomicBoolean(true);
@GuardedBy("monitor") private long nextExportTime;
private final List<ReadableSpan> spansList;
private final BlockingQueue<ReadableSpan> queue;
private final AtomicReference<CompletableResultCode> flushRequested = new AtomicReference<>();
private volatile boolean continueWork = true;
private final ArrayList<SpanData> batch;
private Worker( private Worker(
SpanExporter spanExporter, SpanExporter spanExporter,
long scheduleDelayMillis, long scheduleDelayMillis,
int maxQueueSize,
int maxExportBatchSize, int maxExportBatchSize,
int exporterTimeoutMillis) { int exporterTimeoutMillis,
BlockingQueue<ReadableSpan> queue) {
this.spanExporter = spanExporter; this.spanExporter = spanExporter;
this.scheduleDelayMillis = scheduleDelayMillis; this.scheduleDelayNanos = TimeUnit.MILLISECONDS.toNanos(scheduleDelayMillis);
this.maxQueueSize = maxQueueSize;
this.halfMaxQueueSize = maxQueueSize >> 1;
this.maxExportBatchSize = maxExportBatchSize; this.maxExportBatchSize = maxExportBatchSize;
this.exporterTimeoutMillis = exporterTimeoutMillis; this.exporterTimeoutMillis = exporterTimeoutMillis;
this.spansList = new ArrayList<>(maxQueueSize); this.queue = queue;
this.batch = new ArrayList<>(this.maxExportBatchSize);
} }
private void addSpan(ReadableSpan span) { private void addSpan(ReadableSpan span) {
synchronized (monitor) { if (!queue.offer(span)) {
if (spansList.size() == maxQueueSize) {
droppedSpans.add(1); droppedSpans.add(1);
return;
}
// TODO: Record a gauge for referenced spans.
spansList.add(span);
// Notify the worker thread that at half of the queue is available. It will take
// time anyway for the thread to wake up.
if (spansList.size() >= halfMaxQueueSize) {
monitor.notifyAll();
}
} }
} }
@Override @Override
public void run() { public void run() {
while (!Thread.currentThread().isInterrupted()) { updateNextExportTime();
// Copy all the batched spans in a separate list to release the monitor lock asap to
// avoid blocking the producer thread. while (continueWork) {
ArrayList<ReadableSpan> spansCopy; if (flushRequested.get() != null) {
synchronized (monitor) { flush();
// If still maxExportBatchSize elements in the queue better to execute an extra }
if (spansList.size() < maxExportBatchSize) {
do {
// In the case of a spurious wakeup we export only if we have at least one span in
// the batch. It is acceptable because batching is a best effort mechanism here.
try { try {
monitor.wait(scheduleDelayMillis); ReadableSpan lastElement = queue.poll(100, TimeUnit.MILLISECONDS);
} catch (InterruptedException ie) { if (lastElement != null) {
// Preserve the interruption status as per guidance and stop doing any work. batch.add(lastElement.toSpanData());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
return; return;
} }
} while (spansList.isEmpty());
if (batch.size() >= maxExportBatchSize || System.nanoTime() >= nextExportTime) {
exportCurrentBatch();
updateNextExportTime();
} }
spansCopy = new ArrayList<>(spansList);
spansList.clear();
} }
// Execute the batch export outside the synchronized to not block all producers.
exportBatches(spansCopy);
} }
private void flush() {
int spansToFlush = queue.size();
while (spansToFlush > 0) {
ReadableSpan span = queue.poll();
assert span != null;
batch.add(span.toSpanData());
spansToFlush--;
if (batch.size() >= maxExportBatchSize) {
exportCurrentBatch();
}
}
exportCurrentBatch();
flushRequested.get().succeed();
flushRequested.set(null);
}
private void updateNextExportTime() {
nextExportTime = System.nanoTime() + scheduleDelayNanos;
} }
private void shutdown() { private void shutdown() {
forceFlush(); long pendingBatchesCountInQueue = queue.size() / maxExportBatchSize + 1L;
timer.cancel(); long pendingBatchesCount = pendingBatchesCountInQueue + 1;
long shutdownTimeout = pendingBatchesCount * exporterTimeoutMillis;
forceFlush().join(shutdownTimeout, TimeUnit.MILLISECONDS);
spanExporter.shutdown(); spanExporter.shutdown();
continueWork = false;
} }
private void forceFlush() { private CompletableResultCode forceFlush() {
ArrayList<ReadableSpan> spansCopy; CompletableResultCode flushResult = new CompletableResultCode();
synchronized (monitor) { this.flushRequested.compareAndSet(null, flushResult);
spansCopy = new ArrayList<>(spansList); return this.flushRequested.get();
spansList.clear();
}
// Execute the batch export outside the synchronized to not block all producers.
exportBatches(spansCopy);
} }
private void exportBatches(ArrayList<ReadableSpan> spanList) { private void exportCurrentBatch() {
// TODO: Record a counter for pushed spans. if (batch.isEmpty()) {
for (int i = 0; i < spanList.size(); ) { return;
int lastIndexToTake = Math.min(i + maxExportBatchSize, spanList.size());
onBatchExport(createSpanDataForExport(spanList, i, lastIndexToTake));
i = lastIndexToTake;
}
} }
private static List<SpanData> createSpanDataForExport(
List<ReadableSpan> spanList, int startIndex, int endIndex) {
List<SpanData> spanDataBuffer = new ArrayList<>(endIndex - startIndex);
for (int i = startIndex; i < endIndex; i++) {
spanDataBuffer.add(spanList.get(i).toSpanData());
// Remove the reference to the ReadableSpan to allow GC to free the memory.
spanList.set(i, null);
}
return Collections.unmodifiableList(spanDataBuffer);
}
// Exports the list of SpanData to the SpanExporter.
@SuppressWarnings("BooleanParameter")
private void onBatchExport(final List<SpanData> spans) {
if (exportAvailable.compareAndSet(true, false)) {
try { try {
final CompletableResultCode result = spanExporter.export(spans); final CompletableResultCode result = spanExporter.export(batch);
result.whenComplete( result.join(exporterTimeoutMillis, TimeUnit.MILLISECONDS);
new Runnable() { if (result.isSuccess()) {
@Override exportedSpans.add(batch.size());
public void run() { } else {
if (!result.isSuccess()) {
logger.log(Level.FINE, "Exporter failed"); logger.log(Level.FINE, "Exporter failed");
} }
exportAvailable.set(true);
}
});
timer.schedule(
new TimerTask() {
@Override
public void run() {
result.fail();
}
},
exporterTimeoutMillis);
} catch (Exception e) { } catch (Exception e) {
logger.log(Level.WARNING, "Exporter threw an Exception", e); logger.log(Level.WARNING, "Exporter threw an Exception", e);
} } finally {
} else { batch.clear();
logger.log(Level.FINE, "Exporter busy. Dropping spans.");
} }
} }
} }

View File

@ -151,8 +151,13 @@ class TracerSdkTest {
StressTestRunner.Operation.create(2_000, 1, new SimpleSpanOperation(tracer))); StressTestRunner.Operation.create(2_000, 1, new SimpleSpanOperation(tracer)));
} }
// Needs to correlate with the BatchSpanProcessor.Builder's default, which is the only thing
// this test can guarantee
final int defaultMaxQueueSize = 2048;
stressTestBuilder.build().run(); stressTestBuilder.build().run();
assertThat(countingSpanExporter.numberOfSpansExported.get()).isEqualTo(8_000); assertThat(countingSpanExporter.numberOfSpansExported.get())
.isGreaterThanOrEqualTo(defaultMaxQueueSize);
} }
private static class CountingSpanProcessor implements SpanProcessor { private static class CountingSpanProcessor implements SpanProcessor {

View File

@ -17,6 +17,7 @@
package io.opentelemetry.sdk.trace.export; package io.opentelemetry.sdk.trace.export;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.doThrow;
import io.opentelemetry.sdk.common.export.CompletableResultCode; import io.opentelemetry.sdk.common.export.CompletableResultCode;
@ -154,10 +155,9 @@ class BatchSpanProcessorTest {
@Test @Test
void exportMoreSpansThanTheBufferSize() { void exportMoreSpansThanTheBufferSize() {
WaitingSpanExporter waitingSpanExporter = CompletableSpanExporter spanExporter = new CompletableSpanExporter();
new WaitingSpanExporter(6, CompletableResultCode.ofSuccess());
BatchSpanProcessor batchSpanProcessor = BatchSpanProcessor batchSpanProcessor =
BatchSpanProcessor.newBuilder(waitingSpanExporter) BatchSpanProcessor.newBuilder(spanExporter)
.setMaxQueueSize(6) .setMaxQueueSize(6)
.setMaxExportBatchSize(2) .setMaxExportBatchSize(2)
.setScheduleDelayMillis(MAX_SCHEDULE_DELAY_MILLIS) .setScheduleDelayMillis(MAX_SCHEDULE_DELAY_MILLIS)
@ -171,25 +171,32 @@ class BatchSpanProcessorTest {
ReadableSpan span4 = createSampledEndedSpan(SPAN_NAME_1); ReadableSpan span4 = createSampledEndedSpan(SPAN_NAME_1);
ReadableSpan span5 = createSampledEndedSpan(SPAN_NAME_1); ReadableSpan span5 = createSampledEndedSpan(SPAN_NAME_1);
ReadableSpan span6 = createSampledEndedSpan(SPAN_NAME_1); ReadableSpan span6 = createSampledEndedSpan(SPAN_NAME_1);
List<SpanData> exported = waitingSpanExporter.waitForExport();
assertThat(exported) spanExporter.succeed();
await()
.untilAsserted(
() ->
assertThat(spanExporter.getExported())
.containsExactly( .containsExactly(
span1.toSpanData(), span1.toSpanData(),
span2.toSpanData(), span2.toSpanData(),
span3.toSpanData(), span3.toSpanData(),
span4.toSpanData(), span4.toSpanData(),
span5.toSpanData(), span5.toSpanData(),
span6.toSpanData()); span6.toSpanData()));
} }
@Test @Test
void forceExport() { void forceExport() {
WaitingSpanExporter waitingSpanExporter = WaitingSpanExporter waitingSpanExporter =
new WaitingSpanExporter(1, CompletableResultCode.ofSuccess(), 1); new WaitingSpanExporter(100, CompletableResultCode.ofSuccess(), 1);
BatchSpanProcessor batchSpanProcessor = BatchSpanProcessor batchSpanProcessor =
BatchSpanProcessor.newBuilder(waitingSpanExporter) BatchSpanProcessor.newBuilder(waitingSpanExporter)
.setMaxQueueSize(10_000) .setMaxQueueSize(10_000)
.setMaxExportBatchSize(2_000) // Force flush should send all spans, make sure the number of spans we check here is
// not divisible by the batch size.
.setMaxExportBatchSize(49)
.setScheduleDelayMillis(10_000) // 10s .setScheduleDelayMillis(10_000) // 10s
.build(); .build();
@ -199,11 +206,12 @@ class BatchSpanProcessorTest {
} }
List<SpanData> exported = waitingSpanExporter.waitForExport(); List<SpanData> exported = waitingSpanExporter.waitForExport();
assertThat(exported).isNotNull(); assertThat(exported).isNotNull();
assertThat(exported.size()).isEqualTo(0); assertThat(exported.size()).isEqualTo(98);
batchSpanProcessor.forceFlush();
exported = waitingSpanExporter.waitForExport(); batchSpanProcessor.flush().join(10, TimeUnit.SECONDS);
exported = waitingSpanExporter.getExported();
assertThat(exported).isNotNull(); assertThat(exported).isNotNull();
assertThat(exported.size()).isEqualTo(100); assertThat(exported.size()).isEqualTo(2);
} }
@Test @Test
@ -424,20 +432,20 @@ class BatchSpanProcessorTest {
@Test @Test
@Timeout(10) @Timeout(10)
public void shutdownFlushes() { void shutdownFlushes() {
WaitingSpanExporter waitingSpanExporter = WaitingSpanExporter waitingSpanExporter =
new WaitingSpanExporter(1, CompletableResultCode.ofSuccess()); new WaitingSpanExporter(1, CompletableResultCode.ofSuccess());
// Set the export delay to zero, for no timeout, in order to confirm the #flush() below works // Set the export delay to large value, in order to confirm the #flush() below works
tracerSdkFactory.addSpanProcessor( tracerSdkFactory.addSpanProcessor(
BatchSpanProcessor.newBuilder(waitingSpanExporter).setScheduleDelayMillis(0).build()); BatchSpanProcessor.newBuilder(waitingSpanExporter).setScheduleDelayMillis(10_000).build());
ReadableSpan span2 = createSampledEndedSpan(SPAN_NAME_2); ReadableSpan span2 = createSampledEndedSpan(SPAN_NAME_2);
// Force a shutdown, without this, the #waitForExport() call below would block indefinitely. // Force a shutdown, which forces processing of all remaining spans.
tracerSdkFactory.shutdown(); tracerSdkFactory.shutdown();
List<SpanData> exported = waitingSpanExporter.waitForExport(); List<SpanData> exported = waitingSpanExporter.getExported();
assertThat(exported).containsExactly(span2.toSpanData()); assertThat(exported).containsExactly(span2.toSpanData());
assertThat(waitingSpanExporter.shutDownCalled.get()).isTrue(); assertThat(waitingSpanExporter.shutDownCalled.get()).isTrue();
} }
@ -502,6 +510,49 @@ class BatchSpanProcessorTest {
} }
} }
private static class CompletableSpanExporter implements SpanExporter {
private final List<CompletableResultCode> results = new ArrayList<>();
private final List<SpanData> exported = new ArrayList<>();
private volatile boolean succeeded;
List<SpanData> getExported() {
return exported;
}
void succeed() {
succeeded = true;
results.forEach(CompletableResultCode::succeed);
}
@Override
public CompletableResultCode export(Collection<SpanData> spans) {
exported.addAll(spans);
if (succeeded) {
return CompletableResultCode.ofSuccess();
}
CompletableResultCode result = new CompletableResultCode();
results.add(result);
return result;
}
@Override
public CompletableResultCode flush() {
if (succeeded) {
return CompletableResultCode.ofSuccess();
} else {
return CompletableResultCode.ofFailure();
}
}
@Override
public void shutdown() {
flush();
}
}
static class WaitingSpanExporter implements SpanExporter { static class WaitingSpanExporter implements SpanExporter {
private final List<SpanData> spanDataList = new ArrayList<>(); private final List<SpanData> spanDataList = new ArrayList<>();
@ -522,6 +573,12 @@ class BatchSpanProcessorTest {
this.timeout = timeout; this.timeout = timeout;
} }
List<SpanData> getExported() {
List<SpanData> result = new ArrayList<>(spanDataList);
spanDataList.clear();
return result;
}
/** /**
* Waits until we received numberOfSpans spans to export. Returns the list of exported {@link * Waits until we received numberOfSpans spans to export. Returns the list of exported {@link
* SpanData} objects, otherwise {@code null} if the current thread is interrupted. * SpanData} objects, otherwise {@code null} if the current thread is interrupted.
@ -538,9 +595,7 @@ class BatchSpanProcessorTest {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
return null; return null;
} }
List<SpanData> result = new ArrayList<>(spanDataList); return getExported();
spanDataList.clear();
return result;
} }
@Override @Override