Replace ArrayBlockingQueue with jctools queue. (#3034)

* Replace ArrayBlockingQueue with jctools queue.

* Finish

* ArrayQueue

* Fix dependency

* Drift

* Memory note

* Iteration
This commit is contained in:
Anuraag Agrawal 2021-03-31 15:09:25 +09:00 committed by GitHub
parent c667636643
commit 2f2af19358
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 82 additions and 16 deletions

View File

@ -92,6 +92,7 @@ val DEPENDENCIES = listOf(
"org.awaitility:awaitility:4.0.3",
"org.codehaus.mojo:animal-sniffer-annotations:1.20",
"org.curioswitch.curiostack:protobuf-jackson:1.2.0",
"org.jctools:jctools-core:3.3.0",
"org.junit-pioneer:junit-pioneer:1.3.8",
"org.skyscreamer:jsonassert:1.5.0",
"org.slf4j:slf4j-simple:1.7.30"

View File

@ -35,6 +35,8 @@ dependencies {
compileOnly("io.prometheus:simpleclient_httpserver")
compileOnly(project(":exporters:zipkin"))
testImplementation(project(path=":sdk:trace-shaded-deps"))
testImplementation(project(":proto"))
testImplementation(project(":sdk:testing"))
testImplementation("com.linecorp.armeria:armeria-junit5")

View File

@ -17,11 +17,12 @@ import io.opentelemetry.sdk.trace.SpanLimits;
import io.opentelemetry.sdk.trace.SpanProcessor;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import io.opentelemetry.sdk.trace.internal.JcTools;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -97,8 +98,7 @@ class TracerProviderConfigurationTest {
assertThat(worker)
.extracting("queue")
.isInstanceOfSatisfying(
ArrayBlockingQueue.class,
queue -> assertThat(queue.remainingCapacity()).isEqualTo(2048));
Queue.class, queue -> assertThat(JcTools.capacity(queue)).isEqualTo(2048));
assertThat(worker).extracting("spanExporter").isEqualTo(mockSpanExporter);
});
} finally {
@ -133,8 +133,7 @@ class TracerProviderConfigurationTest {
assertThat(worker)
.extracting("queue")
.isInstanceOfSatisfying(
ArrayBlockingQueue.class,
queue -> assertThat(queue.remainingCapacity()).isEqualTo(2));
Queue.class, queue -> assertThat(JcTools.capacity(queue)).isEqualTo(2));
assertThat(worker).extracting("spanExporter").isEqualTo(mockSpanExporter);
});
} finally {

View File

@ -0,0 +1,22 @@
plugins {
`java-library`
id("com.github.johnrengelman.shadow")
}
// This project is not published, it is bundled into :sdk:trace
description = "Internal use only - shaded dependencies of OpenTelemetry SDK for Tracing"
extra["moduleName"] = "io.opentelemetry.sdk.trace.internal"
dependencies {
implementation("org.jctools:jctools-core")
}
tasks {
shadowJar {
minimize()
relocate("org.jctools", "io.opentelemetry.internal.shaded.jctools")
}
}

View File

@ -0,0 +1,31 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.trace.internal;
import java.util.Queue;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.MpscArrayQueue;
/** Internal accessor of JCTools package for fast queues. */
public final class JcTools {
/**
* Returns a new {@link Queue} appropriate for use with multiple producers and a single consumer.
*/
public static <T> Queue<T> newMpscArrayQueue(int capacity) {
return new MpscArrayQueue<>(capacity);
}
/**
* Returns the capacity of the {@link Queue}, which must be a JcTools queue. We cast to the
* implementation so callers do not need to use the shaded classes.
*/
public static long capacity(Queue<?> queue) {
return ((MessagePassingQueue<?>) queue).capacity();
}
private JcTools() {}
}

View File

@ -9,10 +9,14 @@ plugins {
description = "OpenTelemetry SDK For Tracing"
extra["moduleName"] = "io.opentelemetry.sdk.trace"
evaluationDependsOn(":sdk:trace-shaded-deps")
dependencies {
api(project(":api:all"))
api(project(":sdk:common"))
compileOnly(project(":sdk:trace-shaded-deps"))
implementation(project(":api:metrics"))
implementation(project(":semconv"))
@ -24,6 +28,7 @@ dependencies {
testImplementation("com.google.guava:guava")
jmh(project(":sdk:metrics"))
jmh(project(":sdk:trace-shaded-deps"))
jmh(project(":sdk:testing")) {
// JMH doesn"t handle dependencies that are duplicated between the main and jmh
// configurations properly, but luckily here it"s simple enough to just exclude transitive
@ -62,4 +67,11 @@ tasks {
File(propertiesDir, "version.properties").writeText("sdk.version=${project.version}")
}
}
jar {
inputs.files(project(":sdk:trace-shaded-deps").file("src"))
val shadowJar = project(":sdk:trace-shaded-deps").tasks.named<Jar>("shadowJar")
from(zipTree(shadowJar.get().archiveFile))
dependsOn(shadowJar)
}
}

View File

@ -57,10 +57,6 @@ public class BatchSpanProcessorMultiThreadBenchmark {
new BatchSpanProcessorMetrics(sdkMeterProvider.collectAllMetrics(), numThreads);
exportedSpans = metrics.exportedSpans();
droppedSpans = metrics.droppedSpans();
}
@TearDown(Level.Trial)
public final void tearDown() {
processor.shutdown().join(10, TimeUnit.SECONDS);
}
}

View File

@ -38,6 +38,7 @@ public class DelayingSpanExporter implements SpanExporter {
@Override
public CompletableResultCode shutdown() {
executor.shutdown();
return CompletableResultCode.ofSuccess();
}
}

View File

@ -17,8 +17,10 @@ import io.opentelemetry.sdk.trace.ReadWriteSpan;
import io.opentelemetry.sdk.trace.ReadableSpan;
import io.opentelemetry.sdk.trace.SpanProcessor;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.internal.JcTools;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
@ -36,9 +38,6 @@ import java.util.logging.Logger;
* {@code maxQueueSize} maximum size, if queue is full spans are dropped). Spans are exported either
* when there are {@code maxExportBatchSize} pending spans or {@code scheduleDelayNanos} has passed
* since the last export finished.
*
* <p>This batch {@link SpanProcessor} can cause high contention in a very high traffic service.
* TODO: Add a link to the SpanProcessor that uses Disruptor as alternative with low contention.
*/
public final class BatchSpanProcessor implements SpanProcessor {
@ -73,7 +72,7 @@ public final class BatchSpanProcessor implements SpanProcessor {
scheduleDelayNanos,
maxExportBatchSize,
exporterTimeoutNanos,
new ArrayBlockingQueue<>(maxQueueSize));
JcTools.newMpscArrayQueue(maxQueueSize));
Thread workerThread = new DaemonThreadFactory(WORKER_THREAD_NAME).newThread(worker);
workerThread.start();
}
@ -131,7 +130,8 @@ public final class BatchSpanProcessor implements SpanProcessor {
private final long exporterTimeoutNanos;
private long nextExportTime;
private final BlockingQueue<ReadableSpan> queue;
private final Queue<ReadableSpan> queue;
// When waiting on the spans queue, exporter thread sets this atomic to the number of more
// spans it needs before doing an export. Writer threads would then wait for the queue to reach
// spansNeeded size before notifying the exporter thread about new entries.
@ -149,7 +149,7 @@ public final class BatchSpanProcessor implements SpanProcessor {
long scheduleDelayNanos,
int maxExportBatchSize,
long exporterTimeoutNanos,
BlockingQueue<ReadableSpan> queue) {
Queue<ReadableSpan> queue) {
this.spanExporter = spanExporter;
this.scheduleDelayNanos = scheduleDelayNanos;
this.maxExportBatchSize = maxExportBatchSize;

View File

@ -86,7 +86,8 @@ public final class BatchSpanProcessorBuilder {
}
/**
* Sets the maximum number of Spans that are kept in the queue before start dropping.
* Sets the maximum number of Spans that are kept in the queue before start dropping. More memory
* than this value may be allocated to optimize queue access.
*
* <p>See the BatchSampledSpansProcessor class description for a high-level design description of
* this class.

View File

@ -63,6 +63,7 @@ include(":sdk:common")
include(":sdk:metrics")
include(":sdk:testing")
include(":sdk:trace")
include(":sdk:trace-shaded-deps")
include(":sdk-extensions:async-processor")
include(":sdk-extensions:autoconfigure")
include(":sdk-extensions:aws")