diff --git a/sdk/all/build.gradle b/sdk/all/build.gradle index d8c1fa81ee..00c505e641 100644 --- a/sdk/all/build.gradle +++ b/sdk/all/build.gradle @@ -58,4 +58,4 @@ task generateVersionResource { def propertiesFile = new File(folder.getAbsolutePath(), "version.properties") propertiesFile.write("sdk.version=${project.version}") } -} +} \ No newline at end of file diff --git a/sdk/all/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorDroppedSpansBenchmark.java b/sdk/all/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorDroppedSpansBenchmark.java new file mode 100644 index 0000000000..eed38cada5 --- /dev/null +++ b/sdk/all/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorDroppedSpansBenchmark.java @@ -0,0 +1,157 @@ +/* + * Copyright 2020, OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opentelemetry.sdk.trace.export; + +import io.opentelemetry.OpenTelemetry; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.common.export.CompletableResultCode; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.data.MetricData.LongPoint; +import io.opentelemetry.sdk.metrics.data.MetricData.Point; +import io.opentelemetry.sdk.metrics.export.MetricProducer; +import io.opentelemetry.sdk.trace.ReadableSpan; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.trace.Tracer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import org.openjdk.jmh.annotations.AuxCounters; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +public class BatchSpanProcessorDroppedSpansBenchmark { + + private static class DelayingSpanExporter implements SpanExporter { + @SuppressWarnings("FutureReturnValueIgnored") + @Override + public CompletableResultCode export(Collection spans) { + return CompletableResultCode.ofSuccess(); + } + + @Override + public CompletableResultCode flush() { + return CompletableResultCode.ofSuccess(); + } + + @Override + public void shutdown() {} + } + + @State(Scope.Benchmark) + public static class BenchmarkState { + private final MetricProducer metricProducer = + OpenTelemetrySdk.getMeterProvider().getMetricProducer(); + private BatchSpanProcessor processor; + private Tracer tracer; + private Collection allMetrics; + + @Setup(Level.Trial) + public final void setup() { + SpanExporter exporter = new DelayingSpanExporter(); + processor = BatchSpanProcessor.newBuilder(exporter).build(); + + tracer = OpenTelemetry.getTracerProvider().get("benchmarkTracer"); + } + + @TearDown(Level.Trial) + public final void tearDown() { + processor.shutdown(); + } + + @TearDown(Level.Iteration) + public final void recordMetrics() { + allMetrics = metricProducer.collectAllMetrics(); + } + } + + @State(Scope.Thread) + @AuxCounters(AuxCounters.Type.EVENTS) + public static class ThreadState { + private Collection allMetrics; + + @TearDown(Level.Iteration) + public final void recordMetrics(BenchmarkState benchmarkState) { + allMetrics = benchmarkState.allMetrics; + } + + /** Burn, checkstyle, burn. */ + public double dropRatio() { + long exported = getMetric(true); + long dropped = getMetric(false); + long total = exported + dropped; + if (total == 0) { + return 0; + } else { + // Due to peculiarities of JMH reporting we have to divide this by the number of the + // concurrent threads running the actual benchmark. + return (double) dropped / total / 5; + } + } + + public long exportedSpans() { + return getMetric(true); + } + + public long droppedSpans() { + return getMetric(false); + } + + private long getMetric(boolean dropped) { + String labelValue = String.valueOf(dropped); + for (MetricData metricData : allMetrics) { + if (metricData.getDescriptor().getName().equals("processedSpans")) { + List points = new ArrayList<>(metricData.getPoints()); + if (points.isEmpty()) { + return 0; + } else { + // Find latest point with given value of dropped label + for (int i = points.size() - 1; i >= 0; i--) { + LongPoint point = (LongPoint) points.get(i); + if (labelValue.equals(point.getLabels().get("dropped"))) { + return point.getValue(); + } + } + } + } + } + return 0; + } + } + + /** Export spans through {@link io.opentelemetry.sdk.trace.export.BatchSpanProcessor}. */ + @Benchmark + @Fork(1) + @Threads(5) + @Warmup(iterations = 5, time = 1) + @Measurement(iterations = 5, time = 20) + @BenchmarkMode(Mode.Throughput) + public void export( + BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) { + benchmarkState.processor.onEnd( + (ReadableSpan) benchmarkState.tracer.spanBuilder("span").startSpan()); + } +} diff --git a/sdk/all/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorFlushBenchmark.java b/sdk/all/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorFlushBenchmark.java new file mode 100644 index 0000000000..6ca8a5a1e1 --- /dev/null +++ b/sdk/all/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorFlushBenchmark.java @@ -0,0 +1,118 @@ +/* + * Copyright 2020, OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opentelemetry.sdk.trace.export; + +import com.google.common.collect.ImmutableList; +import io.opentelemetry.OpenTelemetry; +import io.opentelemetry.sdk.common.export.CompletableResultCode; +import io.opentelemetry.sdk.trace.ReadableSpan; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.trace.Span; +import io.opentelemetry.trace.Tracer; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +@State(Scope.Benchmark) +public class BatchSpanProcessorFlushBenchmark { + + private static class DelayingSpanExporter implements SpanExporter { + + private final ScheduledExecutorService executor; + + private final int delayMs; + + private DelayingSpanExporter(int delayMs) { + executor = Executors.newScheduledThreadPool(5); + this.delayMs = delayMs; + } + + @SuppressWarnings("FutureReturnValueIgnored") + @Override + public CompletableResultCode export(Collection spans) { + final CompletableResultCode result = new CompletableResultCode(); + executor.schedule( + new Runnable() { + @Override + public void run() { + result.succeed(); + } + }, + delayMs, + TimeUnit.MILLISECONDS); + return result; + } + + @Override + public CompletableResultCode flush() { + return CompletableResultCode.ofSuccess(); + } + + @Override + public void shutdown() {} + } + + @Param({"0", "1", "5"}) + private int delayMs; + + @Param({"1000", "2000", "5000"}) + private int spanCount; + + private List spans; + + private BatchSpanProcessor processor; + + @Setup(Level.Trial) + public final void setup() { + SpanExporter exporter = new DelayingSpanExporter(delayMs); + processor = BatchSpanProcessor.newBuilder(exporter).build(); + + ImmutableList.Builder spans = ImmutableList.builderWithExpectedSize(spanCount); + Tracer tracer = OpenTelemetry.getTracerProvider().get("benchmarkTracer"); + for (int i = 0; i < spanCount; i++) { + spans.add(tracer.spanBuilder("span").startSpan()); + } + this.spans = spans.build(); + } + + /** Export spans through {@link io.opentelemetry.sdk.trace.export.BatchSpanProcessor}. */ + @Benchmark + @Fork(1) + @Threads(5) + @Warmup(iterations = 5, time = 1) + @Measurement(iterations = 10, time = 1) + @OutputTimeUnit(TimeUnit.SECONDS) + public void export() { + for (Span span : spans) { + processor.onEnd((ReadableSpan) span); + } + processor.flush().join(10, TimeUnit.MINUTES); + } +} diff --git a/sdk/tracing/build.gradle b/sdk/tracing/build.gradle index b440b8fdcc..5bac5fffc5 100644 --- a/sdk/tracing/build.gradle +++ b/sdk/tracing/build.gradle @@ -24,7 +24,8 @@ dependencies { testCompile project(path: ':opentelemetry-sdk-common', configuration: 'testClasses') testImplementation project(':opentelemetry-testing-internal') - testImplementation libraries.junit_pioneer + testImplementation libraries.junit_pioneer, + libraries.awaitility signature "org.codehaus.mojo.signature:java17:1.0@signature" signature "net.sf.androidscents.signature:android-api-level-24:7.0_r2@signature" diff --git a/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java b/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java index 95883c177e..c59902ac58 100644 --- a/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java +++ b/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java @@ -31,28 +31,22 @@ import io.opentelemetry.sdk.trace.ReadableSpan; import io.opentelemetry.sdk.trace.SpanProcessor; import io.opentelemetry.sdk.trace.data.SpanData; import java.util.ArrayList; -import java.util.Collections; -import java.util.List; import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import java.util.logging.Logger; -import javax.annotation.concurrent.GuardedBy; /** * Implementation of the {@link SpanProcessor} that batches spans exported by the SDK then pushes * them to the exporter pipeline. * *

All spans reported by the SDK implementation are first added to a synchronized queue (with a - * {@code maxQueueSize} maximum size, after the size is reached spans are dropped) and exported - * every {@code scheduleDelayMillis} to the exporter pipeline in batches of {@code - * maxExportBatchSize}. Spans may also be dropped when it becomes time to export again, and there is - * an export in progress. - * - *

If the queue gets half full a preemptive notification is sent to the worker thread that - * exports the spans to wake up and start a new export cycle. + * {@code maxQueueSize} maximum size, if queue is full spans are dropped). Spans are exported either + * when there are {@code maxExportBatchSize} pending spans or {@code scheduleDelayMillis} has passed + * since the last export finished. * *

This batch {@link SpanProcessor} can cause high contention in a very high traffic service. * TODO: Add a link to the SpanProcessor that uses Disruptor as alternative with low contention. @@ -85,10 +79,7 @@ public final class BatchSpanProcessor implements SpanProcessor { private static final String WORKER_THREAD_NAME = BatchSpanProcessor.class.getSimpleName() + "_WorkerThread"; - private static final String TIMER_THREAD_NAME = - BatchSpanProcessor.class.getSimpleName() + "_TimerThread"; private final Worker worker; - private final Thread workerThread; private final boolean sampled; private BatchSpanProcessor( @@ -102,11 +93,11 @@ public final class BatchSpanProcessor implements SpanProcessor { new Worker( spanExporter, scheduleDelayMillis, - maxQueueSize, maxExportBatchSize, - exporterTimeoutMillis); - this.workerThread = new DaemonThreadFactory(WORKER_THREAD_NAME).newThread(worker); - this.workerThread.start(); + exporterTimeoutMillis, + new ArrayBlockingQueue(maxQueueSize)); + Thread workerThread = new DaemonThreadFactory(WORKER_THREAD_NAME).newThread(worker); + workerThread.start(); this.sampled = sampled; } @@ -133,7 +124,6 @@ public final class BatchSpanProcessor implements SpanProcessor { @Override public void shutdown() { - workerThread.interrupt(); worker.shutdown(); } @@ -142,169 +132,148 @@ public final class BatchSpanProcessor implements SpanProcessor { worker.forceFlush(); } + // TODO remove this when this.forceFlush returns CompletableResultCode + @VisibleForTesting + CompletableResultCode flush() { + return worker.forceFlush(); + } + // Worker is a thread that batches multiple spans and calls the registered SpanExporter to export // the data. - // - // The list of batched data is protected by an explicit monitor object which ensures full - // concurrency. private static final class Worker implements Runnable { static { Meter meter = OpenTelemetry.getMeter("io.opentelemetry.sdk.trace"); - LongCounter droppedSpansCounter = + LongCounter processedSpansCounter = meter - .longCounterBuilder("droppedSpans") + .longCounterBuilder("processedSpans") .setUnit("1") .setDescription( "The number of spans dropped by the BatchSpanProcessor due to high throughput.") + .setConstantLabels( + Labels.of("spanProcessorType", BatchSpanProcessor.class.getSimpleName())) .build(); - droppedSpans = - droppedSpansCounter.bind( - Labels.of("spanProcessorType", BatchSpanProcessor.class.getSimpleName())); + droppedSpans = processedSpansCounter.bind(Labels.of("dropped", "true")); + exportedSpans = processedSpansCounter.bind(Labels.of("dropped", "false")); } private static final BoundLongCounter droppedSpans; - - private final Timer timer = new Timer(TIMER_THREAD_NAME, /* isDaemon= */ true); + private static final BoundLongCounter exportedSpans; private static final Logger logger = Logger.getLogger(Worker.class.getName()); private final SpanExporter spanExporter; - private final long scheduleDelayMillis; - private final int maxQueueSize; + private final long scheduleDelayNanos; private final int maxExportBatchSize; - private final int halfMaxQueueSize; - private final Object monitor = new Object(); private final int exporterTimeoutMillis; - private final AtomicBoolean exportAvailable = new AtomicBoolean(true); - @GuardedBy("monitor") - private final List spansList; + private long nextExportTime; + + private final BlockingQueue queue; + + private final AtomicReference flushRequested = new AtomicReference<>(); + private volatile boolean continueWork = true; + private final ArrayList batch; private Worker( SpanExporter spanExporter, long scheduleDelayMillis, - int maxQueueSize, int maxExportBatchSize, - int exporterTimeoutMillis) { + int exporterTimeoutMillis, + BlockingQueue queue) { this.spanExporter = spanExporter; - this.scheduleDelayMillis = scheduleDelayMillis; - this.maxQueueSize = maxQueueSize; - this.halfMaxQueueSize = maxQueueSize >> 1; + this.scheduleDelayNanos = TimeUnit.MILLISECONDS.toNanos(scheduleDelayMillis); this.maxExportBatchSize = maxExportBatchSize; this.exporterTimeoutMillis = exporterTimeoutMillis; - this.spansList = new ArrayList<>(maxQueueSize); + this.queue = queue; + this.batch = new ArrayList<>(this.maxExportBatchSize); } private void addSpan(ReadableSpan span) { - synchronized (monitor) { - if (spansList.size() == maxQueueSize) { - droppedSpans.add(1); - return; - } - // TODO: Record a gauge for referenced spans. - spansList.add(span); - // Notify the worker thread that at half of the queue is available. It will take - // time anyway for the thread to wake up. - if (spansList.size() >= halfMaxQueueSize) { - monitor.notifyAll(); - } + if (!queue.offer(span)) { + droppedSpans.add(1); } } @Override public void run() { - while (!Thread.currentThread().isInterrupted()) { - // Copy all the batched spans in a separate list to release the monitor lock asap to - // avoid blocking the producer thread. - ArrayList spansCopy; - synchronized (monitor) { - // If still maxExportBatchSize elements in the queue better to execute an extra - if (spansList.size() < maxExportBatchSize) { - do { - // In the case of a spurious wakeup we export only if we have at least one span in - // the batch. It is acceptable because batching is a best effort mechanism here. - try { - monitor.wait(scheduleDelayMillis); - } catch (InterruptedException ie) { - // Preserve the interruption status as per guidance and stop doing any work. - Thread.currentThread().interrupt(); - return; - } - } while (spansList.isEmpty()); - } - spansCopy = new ArrayList<>(spansList); - spansList.clear(); + updateNextExportTime(); + + while (continueWork) { + if (flushRequested.get() != null) { + flush(); + } + + try { + ReadableSpan lastElement = queue.poll(100, TimeUnit.MILLISECONDS); + if (lastElement != null) { + batch.add(lastElement.toSpanData()); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + + if (batch.size() >= maxExportBatchSize || System.nanoTime() >= nextExportTime) { + exportCurrentBatch(); + updateNextExportTime(); } - // Execute the batch export outside the synchronized to not block all producers. - exportBatches(spansCopy); } } + private void flush() { + int spansToFlush = queue.size(); + while (spansToFlush > 0) { + ReadableSpan span = queue.poll(); + assert span != null; + batch.add(span.toSpanData()); + spansToFlush--; + if (batch.size() >= maxExportBatchSize) { + exportCurrentBatch(); + } + } + exportCurrentBatch(); + flushRequested.get().succeed(); + flushRequested.set(null); + } + + private void updateNextExportTime() { + nextExportTime = System.nanoTime() + scheduleDelayNanos; + } + private void shutdown() { - forceFlush(); - timer.cancel(); + long pendingBatchesCountInQueue = queue.size() / maxExportBatchSize + 1L; + long pendingBatchesCount = pendingBatchesCountInQueue + 1; + long shutdownTimeout = pendingBatchesCount * exporterTimeoutMillis; + + forceFlush().join(shutdownTimeout, TimeUnit.MILLISECONDS); + spanExporter.shutdown(); + continueWork = false; } - private void forceFlush() { - ArrayList spansCopy; - synchronized (monitor) { - spansCopy = new ArrayList<>(spansList); - spansList.clear(); + private CompletableResultCode forceFlush() { + CompletableResultCode flushResult = new CompletableResultCode(); + this.flushRequested.compareAndSet(null, flushResult); + return this.flushRequested.get(); + } + + private void exportCurrentBatch() { + if (batch.isEmpty()) { + return; } - // Execute the batch export outside the synchronized to not block all producers. - exportBatches(spansCopy); - } - private void exportBatches(ArrayList spanList) { - // TODO: Record a counter for pushed spans. - for (int i = 0; i < spanList.size(); ) { - int lastIndexToTake = Math.min(i + maxExportBatchSize, spanList.size()); - onBatchExport(createSpanDataForExport(spanList, i, lastIndexToTake)); - i = lastIndexToTake; - } - } - - private static List createSpanDataForExport( - List spanList, int startIndex, int endIndex) { - List spanDataBuffer = new ArrayList<>(endIndex - startIndex); - for (int i = startIndex; i < endIndex; i++) { - spanDataBuffer.add(spanList.get(i).toSpanData()); - // Remove the reference to the ReadableSpan to allow GC to free the memory. - spanList.set(i, null); - } - return Collections.unmodifiableList(spanDataBuffer); - } - - // Exports the list of SpanData to the SpanExporter. - @SuppressWarnings("BooleanParameter") - private void onBatchExport(final List spans) { - if (exportAvailable.compareAndSet(true, false)) { - try { - final CompletableResultCode result = spanExporter.export(spans); - result.whenComplete( - new Runnable() { - @Override - public void run() { - if (!result.isSuccess()) { - logger.log(Level.FINE, "Exporter failed"); - } - exportAvailable.set(true); - } - }); - timer.schedule( - new TimerTask() { - @Override - public void run() { - result.fail(); - } - }, - exporterTimeoutMillis); - } catch (Exception e) { - logger.log(Level.WARNING, "Exporter threw an Exception", e); + try { + final CompletableResultCode result = spanExporter.export(batch); + result.join(exporterTimeoutMillis, TimeUnit.MILLISECONDS); + if (result.isSuccess()) { + exportedSpans.add(batch.size()); + } else { + logger.log(Level.FINE, "Exporter failed"); } - } else { - logger.log(Level.FINE, "Exporter busy. Dropping spans."); + } catch (Exception e) { + logger.log(Level.WARNING, "Exporter threw an Exception", e); + } finally { + batch.clear(); } } } diff --git a/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/TracerSdkTest.java b/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/TracerSdkTest.java index 0841ebb7d4..e6d0d79a43 100644 --- a/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/TracerSdkTest.java +++ b/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/TracerSdkTest.java @@ -151,8 +151,13 @@ class TracerSdkTest { StressTestRunner.Operation.create(2_000, 1, new SimpleSpanOperation(tracer))); } + // Needs to correlate with the BatchSpanProcessor.Builder's default, which is the only thing + // this test can guarantee + final int defaultMaxQueueSize = 2048; + stressTestBuilder.build().run(); - assertThat(countingSpanExporter.numberOfSpansExported.get()).isEqualTo(8_000); + assertThat(countingSpanExporter.numberOfSpansExported.get()) + .isGreaterThanOrEqualTo(defaultMaxQueueSize); } private static class CountingSpanProcessor implements SpanProcessor { diff --git a/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorTest.java b/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorTest.java index a41b838099..25bb292beb 100644 --- a/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorTest.java +++ b/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorTest.java @@ -17,6 +17,7 @@ package io.opentelemetry.sdk.trace.export; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; import static org.mockito.Mockito.doThrow; import io.opentelemetry.sdk.common.export.CompletableResultCode; @@ -154,10 +155,9 @@ class BatchSpanProcessorTest { @Test void exportMoreSpansThanTheBufferSize() { - WaitingSpanExporter waitingSpanExporter = - new WaitingSpanExporter(6, CompletableResultCode.ofSuccess()); + CompletableSpanExporter spanExporter = new CompletableSpanExporter(); BatchSpanProcessor batchSpanProcessor = - BatchSpanProcessor.newBuilder(waitingSpanExporter) + BatchSpanProcessor.newBuilder(spanExporter) .setMaxQueueSize(6) .setMaxExportBatchSize(2) .setScheduleDelayMillis(MAX_SCHEDULE_DELAY_MILLIS) @@ -171,25 +171,32 @@ class BatchSpanProcessorTest { ReadableSpan span4 = createSampledEndedSpan(SPAN_NAME_1); ReadableSpan span5 = createSampledEndedSpan(SPAN_NAME_1); ReadableSpan span6 = createSampledEndedSpan(SPAN_NAME_1); - List exported = waitingSpanExporter.waitForExport(); - assertThat(exported) - .containsExactly( - span1.toSpanData(), - span2.toSpanData(), - span3.toSpanData(), - span4.toSpanData(), - span5.toSpanData(), - span6.toSpanData()); + + spanExporter.succeed(); + + await() + .untilAsserted( + () -> + assertThat(spanExporter.getExported()) + .containsExactly( + span1.toSpanData(), + span2.toSpanData(), + span3.toSpanData(), + span4.toSpanData(), + span5.toSpanData(), + span6.toSpanData())); } @Test void forceExport() { WaitingSpanExporter waitingSpanExporter = - new WaitingSpanExporter(1, CompletableResultCode.ofSuccess(), 1); + new WaitingSpanExporter(100, CompletableResultCode.ofSuccess(), 1); BatchSpanProcessor batchSpanProcessor = BatchSpanProcessor.newBuilder(waitingSpanExporter) .setMaxQueueSize(10_000) - .setMaxExportBatchSize(2_000) + // Force flush should send all spans, make sure the number of spans we check here is + // not divisible by the batch size. + .setMaxExportBatchSize(49) .setScheduleDelayMillis(10_000) // 10s .build(); @@ -199,11 +206,12 @@ class BatchSpanProcessorTest { } List exported = waitingSpanExporter.waitForExport(); assertThat(exported).isNotNull(); - assertThat(exported.size()).isEqualTo(0); - batchSpanProcessor.forceFlush(); - exported = waitingSpanExporter.waitForExport(); + assertThat(exported.size()).isEqualTo(98); + + batchSpanProcessor.flush().join(10, TimeUnit.SECONDS); + exported = waitingSpanExporter.getExported(); assertThat(exported).isNotNull(); - assertThat(exported.size()).isEqualTo(100); + assertThat(exported.size()).isEqualTo(2); } @Test @@ -424,20 +432,20 @@ class BatchSpanProcessorTest { @Test @Timeout(10) - public void shutdownFlushes() { + void shutdownFlushes() { WaitingSpanExporter waitingSpanExporter = new WaitingSpanExporter(1, CompletableResultCode.ofSuccess()); - // Set the export delay to zero, for no timeout, in order to confirm the #flush() below works + // Set the export delay to large value, in order to confirm the #flush() below works tracerSdkFactory.addSpanProcessor( - BatchSpanProcessor.newBuilder(waitingSpanExporter).setScheduleDelayMillis(0).build()); + BatchSpanProcessor.newBuilder(waitingSpanExporter).setScheduleDelayMillis(10_000).build()); ReadableSpan span2 = createSampledEndedSpan(SPAN_NAME_2); - // Force a shutdown, without this, the #waitForExport() call below would block indefinitely. + // Force a shutdown, which forces processing of all remaining spans. tracerSdkFactory.shutdown(); - List exported = waitingSpanExporter.waitForExport(); + List exported = waitingSpanExporter.getExported(); assertThat(exported).containsExactly(span2.toSpanData()); assertThat(waitingSpanExporter.shutDownCalled.get()).isTrue(); } @@ -502,6 +510,49 @@ class BatchSpanProcessorTest { } } + private static class CompletableSpanExporter implements SpanExporter { + + private final List results = new ArrayList<>(); + + private final List exported = new ArrayList<>(); + + private volatile boolean succeeded; + + List getExported() { + return exported; + } + + void succeed() { + succeeded = true; + results.forEach(CompletableResultCode::succeed); + } + + @Override + public CompletableResultCode export(Collection spans) { + exported.addAll(spans); + if (succeeded) { + return CompletableResultCode.ofSuccess(); + } + CompletableResultCode result = new CompletableResultCode(); + results.add(result); + return result; + } + + @Override + public CompletableResultCode flush() { + if (succeeded) { + return CompletableResultCode.ofSuccess(); + } else { + return CompletableResultCode.ofFailure(); + } + } + + @Override + public void shutdown() { + flush(); + } + } + static class WaitingSpanExporter implements SpanExporter { private final List spanDataList = new ArrayList<>(); @@ -522,6 +573,12 @@ class BatchSpanProcessorTest { this.timeout = timeout; } + List getExported() { + List result = new ArrayList<>(spanDataList); + spanDataList.clear(); + return result; + } + /** * Waits until we received numberOfSpans spans to export. Returns the list of exported {@link * SpanData} objects, otherwise {@code null} if the current thread is interrupted. @@ -538,9 +595,7 @@ class BatchSpanProcessorTest { Thread.currentThread().interrupt(); return null; } - List result = new ArrayList<>(spanDataList); - spanDataList.clear(); - return result; + return getExported(); } @Override