Added a static method "drain" under JcTools with a generic consumer (#4582)

* Added a static method "drain" under JcTools with a generic consumer

* Rename spanT to T

* Update sdk/trace-shaded-deps/src/main/java/io/opentelemetry/sdk/trace/internal/JcTools.java

* Rename Test methods

Co-authored-by: Anuraag Agrawal <anuraaga@gmail.com>
This commit is contained in:
Mohit Palriwal 2022-07-07 23:30:35 -07:00 committed by GitHub
parent 198aecdfa4
commit aa873a05be
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 206 additions and 3 deletions

View File

@ -7,6 +7,7 @@ package io.opentelemetry.sdk.trace.internal;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.function.Consumer;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.MpscArrayQueue;
@ -42,5 +43,29 @@ public final class JcTools {
}
}
/**
* Remove up to <i>limit</i> elements from the {@link Queue} and hand to consume.
*
* @throws IllegalArgumentException consumer is {@code null}
* @throws IllegalArgumentException if maxExportBatchSize is negative
*/
@SuppressWarnings("unchecked")
public static <T> void drain(Queue<T> queue, int limit, Consumer<T> consumer) {
if (queue instanceof MessagePassingQueue) {
((MessagePassingQueue<T>) queue).drain(consumer::accept, limit);
} else {
drainNonJcQueue(queue, limit, consumer);
}
}
private static <T> void drainNonJcQueue(
Queue<T> queue, int maxExportBatchSize, Consumer<T> consumer) {
int polledCount = 0;
T item;
while (polledCount++ < maxExportBatchSize && (item = queue.poll()) != null) {
consumer.accept(item);
}
}
private JcTools() {}
}

View File

@ -0,0 +1,109 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.trace.internal;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.ArrayList;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import org.jctools.queues.MpscArrayQueue;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
class JcToolsTest {
ArrayList<String> batch = new ArrayList<>(10);
@Test
void drain_ArrayBlockingQueue() {
// Arrange
batch.add("Test3");
Queue<String> queue = new ArrayBlockingQueue<>(10);
queue.add("Test1");
queue.add("Test2");
// Act
JcTools.drain(queue, 5, batch::add);
// Assert
assertThat(batch).hasSize(3);
assertThat(queue).hasSize(0);
}
@Test
void drain_MessagePassingQueue() {
// Arrange
batch.add("Test3");
Queue<String> queue = new MpscArrayQueue<>(10);
queue.add("Test1");
queue.add("Test2");
// Act
JcTools.drain(queue, 5, batch::add);
// Assert
assertThat(batch).hasSize(3);
assertThat(queue).hasSize(0);
}
@Test
void drain_MaxBatch() {
// Arrange
Queue<String> queue = new MpscArrayQueue<>(10);
queue.add("Test1");
queue.add("Test2");
// Act
JcTools.drain(queue, 1, batch::add);
// Assert
assertThat(batch).hasSize(1);
assertThat(queue).hasSize(1);
}
@Test
void newFixedSize_MpscQueue() {
// Arrange
int capacity = 10;
// Act
Queue<Object> objects = JcTools.newFixedSizeQueue(capacity);
// Assert
assertThat(objects).isInstanceOf(MpscArrayQueue.class);
}
@Test
void capacity_MpscQueue() {
// Arrange
int capacity = 10;
Queue<Object> queue = JcTools.newFixedSizeQueue(capacity);
// Act
long queueSize = JcTools.capacity(queue);
// Assert
assertThat(queueSize).isGreaterThan(capacity);
}
@Test
void capacity_ArrayBlockingQueue() {
// Arrange
Queue<String> queue = new ArrayBlockingQueue<>(10);
// Act
long queueSize = JcTools.capacity(queue);
// Assert
assertThat(queueSize).isEqualTo(10);
}
}

View File

@ -237,9 +237,9 @@ public final class BatchSpanProcessor implements SpanProcessor {
if (flushRequested.get() != null) {
flush();
}
while (!queue.isEmpty() && batch.size() < maxExportBatchSize) {
batch.add(queue.poll().toSpanData());
}
JcTools.drain(
queue, maxExportBatchSize - batch.size(), span -> batch.add(span.toSpanData()));
if (batch.size() >= maxExportBatchSize || System.nanoTime() >= nextExportTime) {
exportCurrentBatch();
updateNextExportTime();

View File

@ -220,6 +220,75 @@ class BatchSpanProcessorTest {
assertThat(exported.size()).isEqualTo(2);
}
@Test
void testEmptyQueue() {
// Arrange
WaitingSpanExporter waitingSpanExporter =
new WaitingSpanExporter(100, CompletableResultCode.ofSuccess(), 1);
BatchSpanProcessor batchSpanProcessor =
BatchSpanProcessor.builder(waitingSpanExporter)
.setMaxExportBatchSize(10)
.setScheduleDelay(10, TimeUnit.SECONDS)
.setMaxQueueSize(10_000)
.build();
// Act
sdkTracerProvider = SdkTracerProvider.builder().addSpanProcessor(batchSpanProcessor).build();
List<SpanData> exported = waitingSpanExporter.waitForExport();
// Assert
await().untilAsserted(() -> assertThat(batchSpanProcessor.getQueue()).isEmpty());
assertThat(exported).isNotNull();
assertThat(exported.size()).isEqualTo(0);
}
@Test
void testQueueSizeSmallerThanMaxBatch() {
// Arrange
WaitingSpanExporter waitingSpanExporter =
new WaitingSpanExporter(100, CompletableResultCode.ofSuccess(), 1);
BatchSpanProcessor batchSpanProcessor =
BatchSpanProcessor.builder(waitingSpanExporter)
.setMaxExportBatchSize(11)
.setScheduleDelay(10, TimeUnit.SECONDS)
.setMaxQueueSize(10_000)
.build();
// Act
sdkTracerProvider = SdkTracerProvider.builder().addSpanProcessor(batchSpanProcessor).build();
for (int i = 0; i < 10; i++) {
createEndedSpan("notExported");
}
List<SpanData> exported = waitingSpanExporter.waitForExport();
// Assert
assertThat(exported).isNotNull();
assertThat(exported.size()).isEqualTo(0);
}
@Test
void testQueueSizeSmallerThanMaxBatchWithForceFlush() {
// Arrange
WaitingSpanExporter waitingSpanExporter =
new WaitingSpanExporter(100, CompletableResultCode.ofSuccess(), 1);
BatchSpanProcessor batchSpanProcessor =
BatchSpanProcessor.builder(waitingSpanExporter)
.setMaxExportBatchSize(11)
.setScheduleDelay(10, TimeUnit.SECONDS)
.setMaxQueueSize(10_000)
.build();
// Act
sdkTracerProvider = SdkTracerProvider.builder().addSpanProcessor(batchSpanProcessor).build();
for (int i = 0; i < 10; i++) {
createEndedSpan("notExported");
}
batchSpanProcessor.forceFlush().join(10, TimeUnit.SECONDS);
List<SpanData> exported = waitingSpanExporter.waitForExport();
// Assert
assertThat(exported).isNotNull();
assertThat(exported.size()).isEqualTo(10);
await().untilAsserted(() -> assertThat(batchSpanProcessor.getQueue()).isEmpty());
}
@Test
void exportSpansToMultipleExporters() {
WaitingSpanExporter waitingSpanExporter =