Implementing ForceFlush for SpanProcessor (#882)

* first implementation of ForceFlush

* Update sdk/src/main/java/io/opentelemetry/sdk/trace/SpanProcessor.java

Co-Authored-By: John Watson <jkwatson@gmail.com>

* Rename flush -> shutdown. Add test

* Better wording in Javadoc

Co-authored-by: John Watson <jkwatson@gmail.com>
This commit is contained in:
Giovanni Liva 2020-02-21 20:27:18 +01:00 committed by GitHub
parent 2ebb3ef352
commit 073ece13b8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 107 additions and 5 deletions

View File

@ -62,6 +62,13 @@ public final class MultiSpanProcessor implements SpanProcessor {
}
}
@Override
public void forceFlush() {
for (SpanProcessor spanProcessor : spanProcessors) {
spanProcessor.forceFlush();
}
}
private MultiSpanProcessor(List<SpanProcessor> spanProcessors) {
this.spanProcessors = spanProcessors;
}

View File

@ -32,5 +32,8 @@ final class NoopSpanProcessor implements SpanProcessor {
@Override
public void shutdown() {}
@Override
public void forceFlush() {}
private NoopSpanProcessor() {}
}

View File

@ -48,4 +48,12 @@ public interface SpanProcessor {
/** Called when {@link TracerSdkProvider#shutdown()} is called. */
void shutdown();
/**
* Exports all ended spans that have not yet been exported.
*
* <p>This method is called synchronously on the execution thread, and should not throw
* exceptions.
*/
void forceFlush();
}

View File

@ -97,7 +97,12 @@ public final class BatchSpansProcessor implements SpanProcessor {
@Override
public void shutdown() {
workerThread.interrupt();
worker.flush();
worker.shutdown();
}
@Override
public void forceFlush() {
worker.forceFlush();
}
/**
@ -335,7 +340,12 @@ public final class BatchSpansProcessor implements SpanProcessor {
}
}
private void flush() {
private void shutdown() {
forceFlush();
executorService.shutdown();
}
private void forceFlush() {
ArrayList<ReadableSpan> spansCopy;
synchronized (monitor) {
spansCopy = new ArrayList<>(spansList);
@ -343,7 +353,6 @@ public final class BatchSpansProcessor implements SpanProcessor {
}
// Execute the batch export outside the synchronized to not block all producers.
exportBatches(spansCopy);
executorService.shutdown();
}
private void exportBatches(ArrayList<ReadableSpan> spanList) {

View File

@ -64,6 +64,11 @@ public final class SimpleSpansProcessor implements SpanProcessor {
spanExporter.shutdown();
}
@Override
public void forceFlush() {
// Do nothing.
}
/**
* Returns a new Builder for {@link SimpleSpansProcessor}.
*

View File

@ -128,6 +128,28 @@ public class BatchSpansProcessorTest {
span6.toSpanData());
}
@Test
public void forceExport() {
WaitingSpanExporter waitingSpanExporter = new WaitingSpanExporter(1, 1);
BatchSpansProcessor batchSpansProcessor =
BatchSpansProcessor.newBuilder(waitingSpanExporter)
.setMaxQueueSize(10_000)
.setMaxExportBatchSize(2_000)
.setScheduleDelayMillis(10_000) // 10s
.build();
tracerSdkFactory.addSpanProcessor(batchSpansProcessor);
for (int i = 0; i < 100; i++) {
createSampledEndedSpan("notExported");
}
List<SpanData> exported = waitingSpanExporter.waitForExport();
assertThat(exported).isNotNull();
assertThat(exported.size()).isEqualTo(0);
batchSpansProcessor.forceFlush();
exported = waitingSpanExporter.waitForExport();
assertThat(exported).isNotNull();
assertThat(exported.size()).isEqualTo(100);
}
@Test
public void exportSpansToMultipleServices() {
WaitingSpanExporter waitingSpanExporter = new WaitingSpanExporter(2);
@ -395,12 +417,18 @@ public class BatchSpansProcessorTest {
private final List<SpanData> spanDataList = new ArrayList<>();
private final int numberToWaitFor;
private CountDownLatch countDownLatch;
private int timeout = 10;
WaitingSpanExporter(int numberToWaitFor) {
countDownLatch = new CountDownLatch(numberToWaitFor);
this.numberToWaitFor = numberToWaitFor;
}
WaitingSpanExporter(int numberToWaitFor, int timeout) {
this(numberToWaitFor);
this.timeout = timeout;
}
/**
* Waits until we received numberOfSpans spans to export. Returns the list of exported {@link
* SpanData} objects, otherwise {@code null} if the current thread is interrupted.
@ -411,7 +439,7 @@ public class BatchSpansProcessorTest {
@Nullable
List<SpanData> waitForExport() {
try {
countDownLatch.await(10, TimeUnit.SECONDS);
countDownLatch.await(timeout, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// Preserve the interruption status as per guidance.
Thread.currentThread().interrupt();

View File

@ -59,6 +59,11 @@ public final class DisruptorAsyncSpanProcessor implements SpanProcessor {
disruptorEventQueue.shutdown();
}
@Override
public void forceFlush() {
disruptorEventQueue.forceFlush();
}
/**
* Returns a new Builder for {@link DisruptorAsyncSpanProcessor}.
*

View File

@ -70,7 +70,8 @@ final class DisruptorEventQueue {
enum EventType {
ON_START,
ON_END,
ON_SHUTDOWN
ON_SHUTDOWN,
ON_FORCE_FLUSH
}
// Creates a new EventQueue. Private to prevent creation of non-singleton instance.
@ -129,6 +130,18 @@ final class DisruptorEventQueue {
}
}
void forceFlush() {
if (isShutdown) {
return;
}
synchronized (this) {
if (isShutdown) {
return;
}
enqueue(null, EventType.ON_FORCE_FLUSH);
}
}
// An event in the {@link EventQueue}. Just holds a reference to an EventQueue.Entry.
private static final class DisruptorEvent {
@Nullable private ReadableSpan readableSpan = null;
@ -180,6 +193,9 @@ final class DisruptorEventQueue {
spanProcessor.shutdown();
shutdownCounter.countDown();
break;
case ON_FORCE_FLUSH:
spanProcessor.forceFlush();
break;
}
} finally {
// Remove the reference to the previous entry to allow the memory to be gc'ed.

View File

@ -45,6 +45,7 @@ public class DisruptorAsyncSpanProcessorTest {
private final AtomicInteger counterOnStart = new AtomicInteger(0);
private final AtomicInteger counterOnEnd = new AtomicInteger(0);
private final AtomicInteger counterOnShutdown = new AtomicInteger(0);
private final AtomicInteger counterOnForceFlush = new AtomicInteger(0);
@Override
public void onStart(ReadableSpan span) {
@ -61,6 +62,11 @@ public class DisruptorAsyncSpanProcessorTest {
counterOnShutdown.incrementAndGet();
}
@Override
public void forceFlush() {
counterOnForceFlush.incrementAndGet();
}
private int getCounterOnStart() {
return counterOnStart.get();
}
@ -72,6 +78,10 @@ public class DisruptorAsyncSpanProcessorTest {
private int getCounterOnShutdown() {
return counterOnShutdown.get();
}
private int getCounterOnForceFlush() {
return counterOnForceFlush.get();
}
}
@Test
@ -83,10 +93,12 @@ public class DisruptorAsyncSpanProcessorTest {
assertThat(incrementSpanProcessor.getCounterOnEnd()).isEqualTo(0);
disruptorAsyncSpanProcessor.onStart(readableSpan);
disruptorAsyncSpanProcessor.onEnd(readableSpan);
disruptorAsyncSpanProcessor.forceFlush();
disruptorAsyncSpanProcessor.shutdown();
assertThat(incrementSpanProcessor.getCounterOnStart()).isEqualTo(1);
assertThat(incrementSpanProcessor.getCounterOnEnd()).isEqualTo(1);
assertThat(incrementSpanProcessor.getCounterOnShutdown()).isEqualTo(1);
assertThat(incrementSpanProcessor.getCounterOnForceFlush()).isEqualTo(1);
}
@Test
@ -110,8 +122,10 @@ public class DisruptorAsyncSpanProcessorTest {
disruptorAsyncSpanProcessor.shutdown();
disruptorAsyncSpanProcessor.onStart(readableSpan);
disruptorAsyncSpanProcessor.onEnd(readableSpan);
disruptorAsyncSpanProcessor.forceFlush();
assertThat(incrementSpanProcessor.getCounterOnStart()).isEqualTo(0);
assertThat(incrementSpanProcessor.getCounterOnEnd()).isEqualTo(0);
assertThat(incrementSpanProcessor.getCounterOnForceFlush()).isEqualTo(0);
disruptorAsyncSpanProcessor.shutdown();
assertThat(incrementSpanProcessor.getCounterOnShutdown()).isEqualTo(1);
}
@ -125,10 +139,14 @@ public class DisruptorAsyncSpanProcessorTest {
for (int i = 0; i < tenK; i++) {
disruptorAsyncSpanProcessor.onStart(readableSpan);
disruptorAsyncSpanProcessor.onEnd(readableSpan);
if (i % 10 == 0) {
disruptorAsyncSpanProcessor.forceFlush();
}
}
disruptorAsyncSpanProcessor.shutdown();
assertThat(incrementSpanProcessor.getCounterOnStart()).isEqualTo(tenK);
assertThat(incrementSpanProcessor.getCounterOnEnd()).isEqualTo(tenK);
assertThat(incrementSpanProcessor.getCounterOnForceFlush()).isEqualTo(tenK / 10);
assertThat(incrementSpanProcessor.getCounterOnShutdown()).isEqualTo(1);
}
@ -143,12 +161,15 @@ public class DisruptorAsyncSpanProcessorTest {
.build();
disruptorAsyncSpanProcessor.onStart(readableSpan);
disruptorAsyncSpanProcessor.onEnd(readableSpan);
disruptorAsyncSpanProcessor.forceFlush();
disruptorAsyncSpanProcessor.shutdown();
assertThat(incrementSpanProcessor1.getCounterOnStart()).isEqualTo(1);
assertThat(incrementSpanProcessor1.getCounterOnEnd()).isEqualTo(1);
assertThat(incrementSpanProcessor1.getCounterOnShutdown()).isEqualTo(1);
assertThat(incrementSpanProcessor1.getCounterOnForceFlush()).isEqualTo(1);
assertThat(incrementSpanProcessor2.getCounterOnStart()).isEqualTo(1);
assertThat(incrementSpanProcessor2.getCounterOnEnd()).isEqualTo(1);
assertThat(incrementSpanProcessor2.getCounterOnShutdown()).isEqualTo(1);
assertThat(incrementSpanProcessor2.getCounterOnForceFlush()).isEqualTo(1);
}
}