Don't fail on timeout / interrupted since the result isn't really don… (#2957)

* Don't fail on timeout / interrupted since the result isn't really done yet.

* Fix test

* warning
This commit is contained in:
Anuraag Agrawal 2021-03-04 06:48:51 +09:00 committed by GitHub
parent 5de886b7ed
commit a33a4ed036
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 70 additions and 50 deletions

View File

@ -136,8 +136,9 @@ public final class CompletableResultCode {
}
/**
* Waits for the specified amount of time for this {@link CompletableResultCode} to complete. If
* it times out or is interrupted, the {@link CompletableResultCode} is failed.
* Waits up to the specified amount of time for this {@link CompletableResultCode} to complete.
* Even after this method returns, the result may not be complete yet - you should always check
* {@link #isSuccess()} or {@link #isDone()} after calling this method to determine the result.
*
* @return this {@link CompletableResultCode}
*/
@ -149,11 +150,10 @@ public final class CompletableResultCode {
whenComplete(latch::countDown);
try {
if (!latch.await(timeout, unit)) {
fail();
return this;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
fail();
}
return this;
}

View File

@ -13,6 +13,7 @@ import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.Test;
class CompletableResultCodeTest {
@ -163,17 +164,24 @@ class CompletableResultCodeTest {
void joinTimesOut() {
CompletableResultCode result = new CompletableResultCode();
assertThat(result.join(1, TimeUnit.MILLISECONDS).isSuccess()).isFalse();
assertThat(result.isDone()).isTrue();
assertThat(result.isDone()).isFalse();
}
@Test
void joinInterrupted() {
CompletableResultCode result = new CompletableResultCode();
Thread thread = new Thread(() -> result.join(10, TimeUnit.SECONDS));
AtomicReference<Boolean> interrupted = new AtomicReference<>();
Thread thread =
new Thread(
() -> {
result.join(10, TimeUnit.SECONDS);
interrupted.set(Thread.currentThread().isInterrupted());
});
thread.start();
thread.interrupt();
// Different thread so wait a bit for result to be propagated.
await().untilAsserted(() -> assertThat(result.isDone()).isTrue());
await().untilAsserted(() -> assertThat(interrupted).hasValue(true));
assertThat(result.isSuccess()).isFalse();
assertThat(result.isDone()).isFalse();
}
}

View File

@ -111,6 +111,11 @@ public final class BatchSpanProcessor implements SpanProcessor {
return worker.forceFlush();
}
// Visible for testing
ArrayList<SpanData> getBatch() {
return worker.batch;
}
// Worker is a thread that batches multiple spans and calls the registered SpanExporter to export
// the data.
private static final class Worker implements Runnable {

View File

@ -10,8 +10,10 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.when;
import io.opentelemetry.api.trace.Span;
@ -33,15 +35,19 @@ import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
@SuppressWarnings("PreferJavaTimeOverload")
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
class BatchSpanProcessorTest {
private static final String SPAN_NAME_1 = "MySpanName/1";
@ -51,6 +57,12 @@ class BatchSpanProcessorTest {
private final BlockingSpanExporter blockingSpanExporter = new BlockingSpanExporter();
@Mock private Sampler mockSampler;
@Mock private SpanExporter mockSpanExporter;
@BeforeEach
void setUp() {
when(mockSpanExporter.shutdown()).thenReturn(CompletableResultCode.ofSuccess());
}
@AfterEach
void cleanup() {
@ -317,52 +329,47 @@ class BatchSpanProcessorTest {
@Test
@Timeout(5)
public void exporterTimesOut() throws InterruptedException {
final CountDownLatch interruptMarker = new CountDownLatch(1);
WaitingSpanExporter waitingSpanExporter =
new WaitingSpanExporter(1, new CompletableResultCode()) {
@Override
public CompletableResultCode export(Collection<SpanData> spans) {
CompletableResultCode result = super.export(spans);
Thread exporterThread =
new Thread(
() -> {
try {
// sleep longer than the configured timeout of 100ms
Thread.sleep(1000);
} catch (InterruptedException e) {
interruptMarker.countDown();
}
});
exporterThread.start();
result.whenComplete(
() -> {
if (!result.isSuccess()) {
exporterThread.interrupt();
}
});
return result;
}
};
int exporterTimeoutMillis = 100;
sdkTracerProvider =
SdkTracerProvider.builder()
.addSpanProcessor(
BatchSpanProcessor.builder(waitingSpanExporter)
.setExporterTimeout(exporterTimeoutMillis, TimeUnit.MILLISECONDS)
.setScheduleDelay(1, TimeUnit.MILLISECONDS)
.setMaxQueueSize(1)
.build())
public void continuesIfExporterTimesOut() throws InterruptedException {
int exporterTimeoutMillis = 10;
BatchSpanProcessor bsp =
BatchSpanProcessor.builder(mockSpanExporter)
.setExporterTimeout(exporterTimeoutMillis, TimeUnit.MILLISECONDS)
.setScheduleDelay(1, TimeUnit.MILLISECONDS)
.setMaxQueueSize(1)
.build();
sdkTracerProvider = SdkTracerProvider.builder().addSpanProcessor(bsp).build();
ReadableSpan span = createEndedSpan(SPAN_NAME_1);
List<SpanData> exported = waitingSpanExporter.waitForExport();
assertThat(exported).containsExactly(span.toSpanData());
CountDownLatch exported = new CountDownLatch(1);
// We return a result we never complete, meaning it will timeout.
when(mockSpanExporter.export(
argThat(
spans -> {
assertThat(spans)
.anySatisfy(span -> assertThat(span.getName()).isEqualTo(SPAN_NAME_1));
exported.countDown();
return true;
})))
.thenReturn(new CompletableResultCode());
createEndedSpan(SPAN_NAME_1);
exported.await();
// Timed out so the span was dropped.
await().untilAsserted(() -> assertThat(bsp.getBatch()).isEmpty());
// since the interrupt happens outside the execution of the test method, we'll block to make
// sure that the thread was actually interrupted due to the timeout.
interruptMarker.await();
// Still processing new spans.
CountDownLatch exportedAgain = new CountDownLatch(1);
reset(mockSpanExporter);
when(mockSpanExporter.export(
argThat(
spans -> {
assertThat(spans)
.anySatisfy(span -> assertThat(span.getName()).isEqualTo(SPAN_NAME_2));
exportedAgain.countDown();
return true;
})))
.thenReturn(CompletableResultCode.ofSuccess());
createEndedSpan(SPAN_NAME_2);
exported.await();
await().untilAsserted(() -> assertThat(bsp.getBatch()).isEmpty());
}
@Test