Make sure forceFlush / shutdown can have callers wait for them to be done by returning CompletableResultCode. (#1571)

* Make sure forceFlush / shutdown can have callers wait for them to be done by returning CompletableResultCode.

* Merge
This commit is contained in:
Anuraag Agrawal 2020-08-29 02:51:39 +09:00 committed by GitHub
parent f680094039
commit c6c179c267
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
40 changed files with 524 additions and 231 deletions

View File

@ -16,7 +16,7 @@
package io.opentelemetry.exporters.inmemory;
import io.opentelemetry.sdk.common.export.CompletableResultCode;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import java.util.ArrayList;

View File

@ -16,7 +16,7 @@
package io.opentelemetry.exporters.inmemory;
import io.opentelemetry.sdk.common.export.CompletableResultCode;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.util.ArrayList;
@ -108,11 +108,12 @@ public final class InMemorySpanExporter implements SpanExporter {
}
@Override
public void shutdown() {
public CompletableResultCode shutdown() {
synchronized (this) {
finishedSpanItems.clear();
isStopped = true;
}
return CompletableResultCode.ofSuccess();
}
private InMemorySpanExporter() {}

View File

@ -19,13 +19,14 @@ package io.opentelemetry.exporters.jaeger;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.ConnectivityState;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.opentelemetry.exporters.jaeger.proto.api_v2.Collector;
import io.opentelemetry.exporters.jaeger.proto.api_v2.Collector.PostSpansResponse;
import io.opentelemetry.exporters.jaeger.proto.api_v2.CollectorServiceGrpc;
import io.opentelemetry.exporters.jaeger.proto.api_v2.Model;
import io.opentelemetry.sdk.common.export.CompletableResultCode;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.export.ConfigBuilder;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
@ -168,15 +169,21 @@ public final class JaegerGrpcSpanExporter implements SpanExporter {
/**
* Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately
* cancelled. The channel is forcefully closed after a timeout.
* cancelled.
*/
@Override
public void shutdown() {
try {
managedChannel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.log(Level.WARNING, "Failed to shutdown the gRPC channel", e);
}
public CompletableResultCode shutdown() {
final CompletableResultCode result = new CompletableResultCode();
managedChannel.notifyWhenStateChanged(
ConnectivityState.SHUTDOWN,
new Runnable() {
@Override
public void run() {
result.succeed();
}
});
managedChannel.shutdown();
return result;
}
/** Builder utility for this exporter. */

View File

@ -16,7 +16,7 @@
package io.opentelemetry.exporters.logging;
import io.opentelemetry.sdk.common.export.CompletableResultCode;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import java.util.Collection;

View File

@ -16,7 +16,7 @@
package io.opentelemetry.exporters.logging;
import io.opentelemetry.sdk.common.export.CompletableResultCode;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.util.Collection;
@ -55,7 +55,7 @@ public class LoggingSpanExporter implements SpanExporter {
}
@Override
public void shutdown() {
this.flush();
public CompletableResultCode shutdown() {
return flush();
}
}

View File

@ -21,7 +21,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import io.opentelemetry.common.AttributeValue;
import io.opentelemetry.common.Attributes;
import io.opentelemetry.sdk.common.export.CompletableResultCode;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.TestSpanData;
import io.opentelemetry.sdk.trace.data.EventImpl;
import io.opentelemetry.sdk.trace.data.SpanData;

View File

@ -30,7 +30,7 @@ import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse;
import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc;
import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc.MetricsServiceFutureStub;
import io.opentelemetry.sdk.common.export.CompletableResultCode;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.export.ConfigBuilder;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricExporter;

View File

@ -22,6 +22,7 @@ import com.google.common.base.Splitter;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.ConnectivityState;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
@ -30,7 +31,7 @@ import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse;
import io.opentelemetry.proto.collector.trace.v1.TraceServiceGrpc;
import io.opentelemetry.proto.collector.trace.v1.TraceServiceGrpc.TraceServiceFutureStub;
import io.opentelemetry.sdk.common.export.CompletableResultCode;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.export.ConfigBuilder;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
@ -169,15 +170,21 @@ public final class OtlpGrpcSpanExporter implements SpanExporter {
/**
* Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately
* cancelled. The channel is forcefully closed after a timeout.
* cancelled.
*/
@Override
public void shutdown() {
try {
managedChannel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.log(Level.WARNING, "Failed to shutdown the gRPC channel", e);
}
public CompletableResultCode shutdown() {
final CompletableResultCode result = new CompletableResultCode();
managedChannel.notifyWhenStateChanged(
ConnectivityState.SHUTDOWN,
new Runnable() {
@Override
public void run() {
result.succeed();
}
});
managedChannel.shutdown();
return result;
}
/** Builder utility for this exporter. */

View File

@ -21,8 +21,8 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS;
import io.opentelemetry.common.AttributeValue;
import io.opentelemetry.common.ReadableAttributes;
import io.opentelemetry.common.ReadableKeyValuePairs.KeyValueConsumer;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.common.export.CompletableResultCode;
import io.opentelemetry.sdk.common.export.ConfigBuilder;
import io.opentelemetry.sdk.resources.ResourceAttributes;
import io.opentelemetry.sdk.trace.data.SpanData;
@ -289,12 +289,13 @@ public final class ZipkinSpanExporter implements SpanExporter {
}
@Override
public void shutdown() {
public CompletableResultCode shutdown() {
try {
sender.close();
} catch (IOException e) {
logger.log(Level.WARNING, "Exception while closing the Zipkin Sender instance", e);
}
return CompletableResultCode.ofSuccess();
}
/**

View File

@ -20,7 +20,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import com.google.common.collect.ImmutableList;
import io.opentelemetry.common.Attributes;
import io.opentelemetry.sdk.common.export.CompletableResultCode;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.TestSpanData;
import io.opentelemetry.sdk.trace.data.EventImpl;
import io.opentelemetry.sdk.trace.data.SpanData;

View File

@ -26,8 +26,8 @@ import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableList;
import io.opentelemetry.common.AttributeValue;
import io.opentelemetry.common.Attributes;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.common.export.CompletableResultCode;
import io.opentelemetry.sdk.common.export.ConfigBuilder;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.resources.ResourceAttributes;

View File

@ -21,7 +21,7 @@ import static io.opentelemetry.common.AttributeValue.stringAttributeValue;
import io.opentelemetry.common.Attributes;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.common.export.CompletableResultCode;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import io.opentelemetry.sdk.trace.export.SpanExporter;
@ -98,8 +98,9 @@ public class SpanPipelineBenchmark {
}
@Override
public void shutdown() {
public CompletableResultCode shutdown() {
// no-op
return CompletableResultCode.ofSuccess();
}
}

View File

@ -0,0 +1,120 @@
/*
* Copyright 2020, OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.opentelemetry.sdk.trace.export;
import com.google.common.collect.ImmutableList;
import io.opentelemetry.OpenTelemetry;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.ReadableSpan;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.trace.Span;
import io.opentelemetry.trace.Tracer;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
@State(Scope.Benchmark)
public class BatchSpanProcessorBenchmark {
private static class DelayingSpanExporter implements SpanExporter {
private final ScheduledExecutorService executor;
private final int delayMs;
private DelayingSpanExporter(int delayMs) {
executor = Executors.newScheduledThreadPool(5);
this.delayMs = delayMs;
}
@SuppressWarnings("FutureReturnValueIgnored")
@Override
public CompletableResultCode export(Collection<SpanData> spans) {
final CompletableResultCode result = new CompletableResultCode();
executor.schedule(
new Runnable() {
@Override
public void run() {
result.succeed();
}
},
delayMs,
TimeUnit.MILLISECONDS);
return result;
}
@Override
public CompletableResultCode flush() {
return CompletableResultCode.ofSuccess();
}
@Override
public CompletableResultCode shutdown() {
return CompletableResultCode.ofSuccess();
}
}
@Param({"0", "1", "5"})
private int delayMs;
@Param({"1000", "2000", "5000"})
private int spanCount;
private List<Span> spans;
private BatchSpanProcessor processor;
@Setup(Level.Trial)
public final void setup() {
SpanExporter exporter = new DelayingSpanExporter(delayMs);
processor = BatchSpanProcessor.newBuilder(exporter).build();
ImmutableList.Builder<Span> spans = ImmutableList.builderWithExpectedSize(spanCount);
Tracer tracer = OpenTelemetry.getTracerProvider().get("benchmarkTracer");
for (int i = 0; i < spanCount; i++) {
spans.add(tracer.spanBuilder("span").startSpan());
}
this.spans = spans.build();
}
/** Export spans through {@link io.opentelemetry.sdk.trace.export.BatchSpanProcessor}. */
@Benchmark
@Fork(1)
@Threads(5)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 10, time = 1)
@OutputTimeUnit(TimeUnit.SECONDS)
public void export() {
for (Span span : spans) {
processor.onEnd((ReadableSpan) span);
}
processor.forceFlush().join(10, TimeUnit.MINUTES);
}
}

View File

@ -18,7 +18,7 @@ package io.opentelemetry.sdk.trace.export;
import io.opentelemetry.OpenTelemetry;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.common.export.CompletableResultCode;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.data.MetricData.LongPoint;
import io.opentelemetry.sdk.metrics.data.MetricData.Point;
@ -58,7 +58,9 @@ public class BatchSpanProcessorDroppedSpansBenchmark {
}
@Override
public void shutdown() {}
public CompletableResultCode shutdown() {
return CompletableResultCode.ofSuccess();
}
}
@State(Scope.Benchmark)

View File

@ -18,7 +18,7 @@ package io.opentelemetry.sdk.trace.export;
import com.google.common.collect.ImmutableList;
import io.opentelemetry.OpenTelemetry;
import io.opentelemetry.sdk.common.export.CompletableResultCode;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.ReadableSpan;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.trace.Span;
@ -76,7 +76,9 @@ public class BatchSpanProcessorFlushBenchmark {
}
@Override
public void shutdown() {}
public CompletableResultCode shutdown() {
return CompletableResultCode.ofSuccess();
}
}
@Param({"0", "1", "5"})
@ -113,6 +115,6 @@ public class BatchSpanProcessorFlushBenchmark {
for (Span span : spans) {
processor.onEnd((ReadableSpan) span);
}
processor.flush().join(10, TimeUnit.MINUTES);
processor.forceFlush().join(10, TimeUnit.MINUTES);
}
}

View File

@ -16,7 +16,7 @@
package io.opentelemetry.sdk.trace.export;
import io.opentelemetry.sdk.common.export.CompletableResultCode;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.TestSpanData;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.trace.Span;
@ -54,7 +54,9 @@ public class MultiSpanExporterBenchmark {
}
@Override
public void shutdown() {}
public CompletableResultCode shutdown() {
return CompletableResultCode.ofSuccess();
}
}
@Param({"1", "3"})

View File

@ -14,12 +14,15 @@
* limitations under the License.
*/
package io.opentelemetry.sdk.common.export;
package io.opentelemetry.sdk.common;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
@ -41,6 +44,35 @@ public class CompletableResultCode {
return FAILURE;
}
/**
* Returns a {@link CompletableResultCode} that completes after all the provided {@link
* CompletableResultCode}s complete. If any of the results fail, the result will be failed.
*/
public static CompletableResultCode ofAll(final Collection<CompletableResultCode> codes) {
final CompletableResultCode result = new CompletableResultCode();
final AtomicInteger pending = new AtomicInteger(codes.size());
final AtomicBoolean failed = new AtomicBoolean();
for (final CompletableResultCode code : codes) {
code.whenComplete(
new Runnable() {
@Override
public void run() {
if (!code.isSuccess()) {
failed.set(true);
}
if (pending.decrementAndGet() == 0) {
if (failed.get()) {
result.fail();
} else {
result.succeed();
}
}
}
});
}
return result;
}
private static final CompletableResultCode SUCCESS = new CompletableResultCode().succeed();
private static final CompletableResultCode FAILURE = new CompletableResultCode().fail();

View File

@ -14,12 +14,13 @@
* limitations under the License.
*/
package io.opentelemetry.sdk.common.export;
package io.opentelemetry.sdk.common;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
@ -200,6 +201,36 @@ class CompletableResultCodeTest {
assertThat(result.isDone()).isTrue();
}
@Test
void ofAll() {
CompletableResultCode result1 = new CompletableResultCode();
CompletableResultCode result2 = new CompletableResultCode();
CompletableResultCode result3 = new CompletableResultCode();
CompletableResultCode all =
CompletableResultCode.ofAll(Arrays.asList(result1, result2, result3));
assertThat(all.isDone()).isFalse();
result1.succeed();
assertThat(all.isDone()).isFalse();
result2.succeed();
assertThat(all.isDone()).isFalse();
result3.succeed();
assertThat(all.isDone()).isTrue();
assertThat(all.isSuccess()).isTrue();
}
@Test
void ofAllWithFailure() {
assertThat(
CompletableResultCode.ofAll(
Arrays.asList(
CompletableResultCode.ofSuccess(),
CompletableResultCode.ofFailure(),
CompletableResultCode.ofSuccess()))
.isSuccess())
.isFalse();
}
@Test
void join() {
CompletableResultCode result = new CompletableResultCode();

View File

@ -18,8 +18,8 @@ package io.opentelemetry.sdk.metrics.export;
import com.google.auto.value.AutoValue;
import io.opentelemetry.internal.Utils;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.DaemonThreadFactory;
import io.opentelemetry.sdk.common.export.CompletableResultCode;
import io.opentelemetry.sdk.common.export.ConfigBuilder;
import io.opentelemetry.sdk.metrics.data.MetricData;
import java.util.ArrayList;

View File

@ -16,7 +16,7 @@
package io.opentelemetry.sdk.metrics.export;
import io.opentelemetry.sdk.common.export.CompletableResultCode;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.metrics.data.MetricData;
import java.util.Collection;

View File

@ -20,8 +20,8 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.when;
import io.opentelemetry.common.Labels;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.common.export.CompletableResultCode;
import io.opentelemetry.sdk.common.export.ConfigBuilderTest.ConfigTester;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.data.MetricData.Descriptor;

View File

@ -16,6 +16,7 @@
package io.opentelemetry.sdk.trace;
import io.opentelemetry.sdk.common.CompletableResultCode;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
@ -66,17 +67,21 @@ public final class MultiSpanProcessor implements SpanProcessor {
}
@Override
public void shutdown() {
public CompletableResultCode shutdown() {
List<CompletableResultCode> results = new ArrayList<>(spanProcessorsAll.size());
for (SpanProcessor spanProcessor : spanProcessorsAll) {
spanProcessor.shutdown();
results.add(spanProcessor.shutdown());
}
return CompletableResultCode.ofAll(results);
}
@Override
public void forceFlush() {
public CompletableResultCode forceFlush() {
List<CompletableResultCode> results = new ArrayList<>(spanProcessorsAll.size());
for (SpanProcessor spanProcessor : spanProcessorsAll) {
spanProcessor.forceFlush();
results.add(spanProcessor.forceFlush());
}
return CompletableResultCode.ofAll(results);
}
private MultiSpanProcessor(List<SpanProcessor> spanProcessors) {

View File

@ -16,6 +16,8 @@
package io.opentelemetry.sdk.trace;
import io.opentelemetry.sdk.common.CompletableResultCode;
final class NoopSpanProcessor implements SpanProcessor {
private static final NoopSpanProcessor INSTANCE = new NoopSpanProcessor();
@ -40,10 +42,14 @@ final class NoopSpanProcessor implements SpanProcessor {
}
@Override
public void shutdown() {}
public CompletableResultCode shutdown() {
return CompletableResultCode.ofSuccess();
}
@Override
public void forceFlush() {}
public CompletableResultCode forceFlush() {
return CompletableResultCode.ofSuccess();
}
private NoopSpanProcessor() {}
}

View File

@ -16,6 +16,7 @@
package io.opentelemetry.sdk.trace;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.trace.Span;
/**
@ -60,17 +61,17 @@ public interface SpanProcessor {
boolean isEndRequired();
/**
* Called when {@link TracerSdkProvider#shutdown()} is called.
* Processes all span events that have not yet been processed and closes used resources.
*
* <p>Implementations must ensure that all span events are processed before returning.
* @return a {@link CompletableResultCode} which completes when shutdown is finished.
*/
void shutdown();
CompletableResultCode shutdown();
/**
* Processes all span events that have not yet been processed.
*
* <p>This method is executed synchronously on the calling thread, and should not throw
* exceptions.
* @return a {@link CompletableResultCode} which completes when currently queued spans are
* finished processing.
*/
void forceFlush();
CompletableResultCode forceFlush();
}

View File

@ -21,6 +21,7 @@ import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.config.TraceConfig;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.GuardedBy;
// Represents the shared state/config between all Tracers created by the same TracerProvider.
@ -113,7 +114,7 @@ final class TracerSharedState {
if (isStopped) {
return;
}
activeSpanProcessor.shutdown();
activeSpanProcessor.shutdown().join(10, TimeUnit.SECONDS);
isStopped = true;
}
}

View File

@ -23,8 +23,8 @@ import io.opentelemetry.internal.Utils;
import io.opentelemetry.metrics.LongCounter;
import io.opentelemetry.metrics.LongCounter.BoundLongCounter;
import io.opentelemetry.metrics.Meter;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.DaemonThreadFactory;
import io.opentelemetry.sdk.common.export.CompletableResultCode;
import io.opentelemetry.sdk.common.export.ConfigBuilder;
import io.opentelemetry.sdk.trace.ReadWriteSpan;
import io.opentelemetry.sdk.trace.ReadableSpan;
@ -123,18 +123,12 @@ public final class BatchSpanProcessor implements SpanProcessor {
}
@Override
public void shutdown() {
worker.shutdown();
public CompletableResultCode shutdown() {
return worker.shutdown();
}
@Override
public void forceFlush() {
worker.forceFlush();
}
// TODO remove this when this.forceFlush returns CompletableResultCode
@VisibleForTesting
CompletableResultCode flush() {
public CompletableResultCode forceFlush() {
return worker.forceFlush();
}
@ -241,15 +235,31 @@ public final class BatchSpanProcessor implements SpanProcessor {
nextExportTime = System.nanoTime() + scheduleDelayNanos;
}
private void shutdown() {
long pendingBatchesCountInQueue = queue.size() / maxExportBatchSize + 1L;
long pendingBatchesCount = pendingBatchesCountInQueue + 1;
long shutdownTimeout = pendingBatchesCount * exporterTimeoutMillis;
private CompletableResultCode shutdown() {
final CompletableResultCode result = new CompletableResultCode();
forceFlush().join(shutdownTimeout, TimeUnit.MILLISECONDS);
final CompletableResultCode flushResult = forceFlush();
flushResult.whenComplete(
new Runnable() {
@Override
public void run() {
continueWork = false;
final CompletableResultCode shutdownResult = spanExporter.shutdown();
shutdownResult.whenComplete(
new Runnable() {
@Override
public void run() {
if (!flushResult.isSuccess() || !shutdownResult.isSuccess()) {
result.fail();
} else {
result.succeed();
}
}
});
}
});
spanExporter.shutdown();
continueWork = false;
return result;
}
private CompletableResultCode forceFlush() {

View File

@ -16,11 +16,11 @@
package io.opentelemetry.sdk.trace.export;
import io.opentelemetry.sdk.common.export.CompletableResultCode;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -33,6 +33,7 @@ import java.util.logging.Logger;
*/
public final class MultiSpanExporter implements SpanExporter {
private static final Logger logger = Logger.getLogger(MultiSpanExporter.class.getName());
private final SpanExporter[] spanExporters;
/**
@ -47,19 +48,20 @@ public final class MultiSpanExporter implements SpanExporter {
@Override
public CompletableResultCode export(Collection<SpanData> spans) {
final CompletableResultCode compositeResultCode = new CompletableResultCode();
final AtomicInteger completionsToProcess = new AtomicInteger(spanExporters.length);
List<CompletableResultCode> results = new ArrayList<>(spanExporters.length);
for (SpanExporter spanExporter : spanExporters) {
final CompletableResultCode exportResult;
try {
final CompletableResultCode singleResult = spanExporter.export(spans);
mergeResultCode(compositeResultCode, singleResult, completionsToProcess);
exportResult = spanExporter.export(spans);
} catch (Exception e) {
// If an exception was thrown by the exporter
logger.log(Level.WARNING, "Exception thrown by the export.", e);
compositeResultCode.fail();
results.add(CompletableResultCode.ofFailure());
continue;
}
results.add(exportResult);
}
return compositeResultCode;
return CompletableResultCode.ofAll(results);
}
/**
@ -69,45 +71,38 @@ public final class MultiSpanExporter implements SpanExporter {
*/
@Override
public CompletableResultCode flush() {
final CompletableResultCode compositeResultCode = new CompletableResultCode();
final AtomicInteger completionsToProcess = new AtomicInteger(spanExporters.length);
List<CompletableResultCode> results = new ArrayList<>(spanExporters.length);
for (SpanExporter spanExporter : spanExporters) {
final CompletableResultCode flushResult;
try {
mergeResultCode(compositeResultCode, spanExporter.flush(), completionsToProcess);
flushResult = spanExporter.flush();
} catch (Exception e) {
// If an exception was thrown by the exporter
logger.log(Level.WARNING, "Exception thrown by the flush.", e);
compositeResultCode.fail();
results.add(CompletableResultCode.ofFailure());
continue;
}
results.add(flushResult);
}
return compositeResultCode;
return CompletableResultCode.ofAll(results);
}
@Override
public void shutdown() {
public CompletableResultCode shutdown() {
List<CompletableResultCode> results = new ArrayList<>(spanExporters.length);
for (SpanExporter spanExporter : spanExporters) {
spanExporter.shutdown();
final CompletableResultCode shutdownResult;
try {
shutdownResult = spanExporter.shutdown();
} catch (Exception e) {
// If an exception was thrown by the exporter
logger.log(Level.WARNING, "Exception thrown by the shutdown.", e);
results.add(CompletableResultCode.ofFailure());
continue;
}
results.add(shutdownResult);
}
}
private static void mergeResultCode(
final CompletableResultCode compositeResultCode,
final CompletableResultCode singleResultCode,
final AtomicInteger completionsToProcess) {
singleResultCode.whenComplete(
new Runnable() {
@Override
public void run() {
int completionsRemaining = completionsToProcess.decrementAndGet();
if (singleResultCode.isSuccess()) {
if (completionsRemaining == 0) {
compositeResultCode.succeed();
}
} else {
compositeResultCode.fail();
}
}
});
return CompletableResultCode.ofAll(results);
}
private MultiSpanExporter(List<SpanExporter> spanExporters) {

View File

@ -17,7 +17,7 @@
package io.opentelemetry.sdk.trace.export;
import com.google.common.annotations.VisibleForTesting;
import io.opentelemetry.sdk.common.export.CompletableResultCode;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.export.ConfigBuilder;
import io.opentelemetry.sdk.trace.ReadWriteSpan;
import io.opentelemetry.sdk.trace.ReadableSpan;
@ -102,13 +102,14 @@ public final class SimpleSpanProcessor implements SpanProcessor {
}
@Override
public void shutdown() {
spanExporter.shutdown();
public CompletableResultCode shutdown() {
return spanExporter.shutdown();
}
@Override
public void forceFlush() {
public CompletableResultCode forceFlush() {
// Do nothing.
return CompletableResultCode.ofSuccess();
}
/**

View File

@ -16,7 +16,7 @@
package io.opentelemetry.sdk.trace.export;
import io.opentelemetry.sdk.common.export.CompletableResultCode;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.TracerSdkProvider;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.util.Collection;
@ -53,6 +53,8 @@ public interface SpanExporter {
/**
* Called when {@link TracerSdkProvider#shutdown()} is called, if this {@code SpanExporter} is
* register to a {@code TracerSdkProvider} object.
*
* @return a {@link CompletableResultCode} which is completed when shutdown completes.
*/
void shutdown();
CompletableResultCode shutdown();
}

View File

@ -24,6 +24,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import io.opentelemetry.sdk.common.CompletableResultCode;
import java.util.Arrays;
import java.util.Collections;
import org.junit.jupiter.api.BeforeEach;
@ -42,8 +43,12 @@ class MultiSpanProcessorTest {
MockitoAnnotations.initMocks(this);
when(spanProcessor1.isStartRequired()).thenReturn(true);
when(spanProcessor1.isEndRequired()).thenReturn(true);
when(spanProcessor1.forceFlush()).thenReturn(CompletableResultCode.ofSuccess());
when(spanProcessor1.shutdown()).thenReturn(CompletableResultCode.ofSuccess());
when(spanProcessor2.isStartRequired()).thenReturn(true);
when(spanProcessor2.isEndRequired()).thenReturn(true);
when(spanProcessor2.forceFlush()).thenReturn(CompletableResultCode.ofSuccess());
when(spanProcessor2.shutdown()).thenReturn(CompletableResultCode.ofSuccess());
}
@Test

View File

@ -60,7 +60,8 @@ abstract class StressTestRunner {
for (Thread thread : operationThreads) {
Uninterruptibles.joinUninterruptibly(thread);
}
getSpanProcessor().shutdown();
getSpanProcessor().shutdown().join(1, TimeUnit.MINUTES);
}
static Builder builder() {

View File

@ -19,8 +19,10 @@ package io.opentelemetry.sdk.trace;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import io.opentelemetry.sdk.common.Clock;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.config.TraceConfig;
@ -28,18 +30,24 @@ import io.opentelemetry.trace.DefaultSpan;
import io.opentelemetry.trace.Span;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
/** Unit tests for {@link TracerSdkProvider}. */
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
class TracerSdkProviderTest {
@Mock private SpanProcessor spanProcessor;
private final TracerSdkProvider tracerFactory = TracerSdkProvider.builder().build();
@BeforeEach
void setUp() {
MockitoAnnotations.initMocks(this);
when(spanProcessor.forceFlush()).thenReturn(CompletableResultCode.ofSuccess());
when(spanProcessor.shutdown()).thenReturn(CompletableResultCode.ofSuccess());
tracerFactory.addSpanProcessor(spanProcessor);
}

View File

@ -21,8 +21,8 @@ import static org.assertj.core.api.Assertions.assertThat;
import io.grpc.Context;
import io.opentelemetry.common.AttributeValue;
import io.opentelemetry.context.Scope;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.common.export.CompletableResultCode;
import io.opentelemetry.sdk.trace.StressTestRunner.OperationUpdater;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
@ -185,13 +185,15 @@ class TracerSdkTest {
}
@Override
public void shutdown() {
public CompletableResultCode shutdown() {
// no-op
return CompletableResultCode.ofSuccess();
}
@Override
public void forceFlush() {
public CompletableResultCode forceFlush() {
// no-op
return CompletableResultCode.ofSuccess();
}
}
@ -229,8 +231,9 @@ class TracerSdkTest {
}
@Override
public void shutdown() {
public CompletableResultCode shutdown() {
// no-op
return CompletableResultCode.ofSuccess();
}
}
}

View File

@ -19,8 +19,9 @@ package io.opentelemetry.sdk.trace.export;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.when;
import io.opentelemetry.sdk.common.export.CompletableResultCode;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.export.ConfigBuilderTest.ConfigTester;
import io.opentelemetry.sdk.trace.ReadableSpan;
import io.opentelemetry.sdk.trace.Samplers;
@ -208,7 +209,7 @@ class BatchSpanProcessorTest {
assertThat(exported).isNotNull();
assertThat(exported.size()).isEqualTo(98);
batchSpanProcessor.flush().join(10, TimeUnit.SECONDS);
batchSpanProcessor.forceFlush().join(10, TimeUnit.SECONDS);
exported = waitingSpanExporter.getExported();
assertThat(exported).isNotNull();
assertThat(exported.size()).isEqualTo(2);
@ -450,6 +451,24 @@ class BatchSpanProcessorTest {
assertThat(waitingSpanExporter.shutDownCalled.get()).isTrue();
}
@Test
void shutdownPropagatesSuccess() {
when(mockServiceHandler.shutdown()).thenReturn(CompletableResultCode.ofSuccess());
BatchSpanProcessor processor = BatchSpanProcessor.newBuilder(mockServiceHandler).build();
CompletableResultCode result = processor.shutdown();
result.join(1, TimeUnit.SECONDS);
assertThat(result.isSuccess()).isTrue();
}
@Test
void shutdownPropagatesFailure() {
when(mockServiceHandler.shutdown()).thenReturn(CompletableResultCode.ofFailure());
BatchSpanProcessor processor = BatchSpanProcessor.newBuilder(mockServiceHandler).build();
CompletableResultCode result = processor.shutdown();
result.join(1, TimeUnit.SECONDS);
assertThat(result.isSuccess()).isFalse();
}
private static final class BlockingSpanExporter implements SpanExporter {
final Object monitor = new Object();
@ -498,8 +517,9 @@ class BatchSpanProcessorTest {
}
@Override
public void shutdown() {
public CompletableResultCode shutdown() {
// Do nothing;
return CompletableResultCode.ofSuccess();
}
private void unblock() {
@ -548,8 +568,8 @@ class BatchSpanProcessorTest {
}
@Override
public void shutdown() {
flush();
public CompletableResultCode shutdown() {
return flush();
}
}
@ -613,8 +633,9 @@ class BatchSpanProcessorTest {
}
@Override
public void shutdown() {
public CompletableResultCode shutdown() {
shutDownCalled.set(true);
return CompletableResultCode.ofSuccess();
}
public void reset() {

View File

@ -18,32 +18,29 @@ package io.opentelemetry.sdk.trace.export;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.when;
import io.opentelemetry.sdk.common.export.CompletableResultCode;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.TestUtils;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.junit.jupiter.MockitoExtension;
/** Unit tests for {@link MultiSpanExporterTest}. */
@ExtendWith(MockitoExtension.class)
class MultiSpanExporterTest {
@Mock private SpanExporter spanExporter1;
@Mock private SpanExporter spanExporter2;
private static final List<SpanData> SPAN_LIST =
Collections.singletonList(TestUtils.makeBasicSpan());
@BeforeEach
void setUp() {
MockitoAnnotations.initMocks(this);
}
@Test
void empty() {
SpanExporter multiSpanExporter = MultiSpanExporter.create(Collections.emptyList());
@ -56,15 +53,15 @@ class MultiSpanExporterTest {
SpanExporter multiSpanExporter =
MultiSpanExporter.create(Collections.singletonList(spanExporter1));
Mockito.when(spanExporter1.export(same(SPAN_LIST)))
.thenReturn(CompletableResultCode.ofSuccess());
when(spanExporter1.export(same(SPAN_LIST))).thenReturn(CompletableResultCode.ofSuccess());
assertThat(multiSpanExporter.export(SPAN_LIST).isSuccess()).isTrue();
Mockito.verify(spanExporter1).export(same(SPAN_LIST));
Mockito.when(spanExporter1.flush()).thenReturn(CompletableResultCode.ofSuccess());
when(spanExporter1.flush()).thenReturn(CompletableResultCode.ofSuccess());
assertThat(multiSpanExporter.flush().isSuccess()).isTrue();
Mockito.verify(spanExporter1).flush();
when(spanExporter1.shutdown()).thenReturn(CompletableResultCode.ofSuccess());
multiSpanExporter.shutdown();
Mockito.verify(spanExporter1).shutdown();
}
@ -74,20 +71,20 @@ class MultiSpanExporterTest {
SpanExporter multiSpanExporter =
MultiSpanExporter.create(Arrays.asList(spanExporter1, spanExporter2));
Mockito.when(spanExporter1.export(same(SPAN_LIST)))
.thenReturn(CompletableResultCode.ofSuccess());
Mockito.when(spanExporter2.export(same(SPAN_LIST)))
.thenReturn(CompletableResultCode.ofSuccess());
when(spanExporter1.export(same(SPAN_LIST))).thenReturn(CompletableResultCode.ofSuccess());
when(spanExporter2.export(same(SPAN_LIST))).thenReturn(CompletableResultCode.ofSuccess());
assertThat(multiSpanExporter.export(SPAN_LIST).isSuccess()).isTrue();
Mockito.verify(spanExporter1).export(same(SPAN_LIST));
Mockito.verify(spanExporter2).export(same(SPAN_LIST));
Mockito.when(spanExporter1.flush()).thenReturn(CompletableResultCode.ofSuccess());
Mockito.when(spanExporter2.flush()).thenReturn(CompletableResultCode.ofSuccess());
when(spanExporter1.flush()).thenReturn(CompletableResultCode.ofSuccess());
when(spanExporter2.flush()).thenReturn(CompletableResultCode.ofSuccess());
assertThat(multiSpanExporter.flush().isSuccess()).isTrue();
Mockito.verify(spanExporter1).flush();
Mockito.verify(spanExporter2).flush();
when(spanExporter1.shutdown()).thenReturn(CompletableResultCode.ofSuccess());
when(spanExporter2.shutdown()).thenReturn(CompletableResultCode.ofSuccess());
multiSpanExporter.shutdown();
Mockito.verify(spanExporter1).shutdown();
Mockito.verify(spanExporter2).shutdown();
@ -98,19 +95,23 @@ class MultiSpanExporterTest {
SpanExporter multiSpanExporter =
MultiSpanExporter.create(Arrays.asList(spanExporter1, spanExporter2));
Mockito.when(spanExporter1.export(same(SPAN_LIST)))
.thenReturn(CompletableResultCode.ofSuccess());
Mockito.when(spanExporter2.export(same(SPAN_LIST)))
.thenReturn(CompletableResultCode.ofFailure());
when(spanExporter1.export(same(SPAN_LIST))).thenReturn(CompletableResultCode.ofSuccess());
when(spanExporter2.export(same(SPAN_LIST))).thenReturn(CompletableResultCode.ofFailure());
assertThat(multiSpanExporter.export(SPAN_LIST).isSuccess()).isFalse();
Mockito.verify(spanExporter1).export(same(SPAN_LIST));
Mockito.verify(spanExporter2).export(same(SPAN_LIST));
Mockito.when(spanExporter1.flush()).thenReturn(CompletableResultCode.ofSuccess());
Mockito.when(spanExporter2.flush()).thenReturn(CompletableResultCode.ofFailure());
when(spanExporter1.flush()).thenReturn(CompletableResultCode.ofSuccess());
when(spanExporter2.flush()).thenReturn(CompletableResultCode.ofFailure());
assertThat(multiSpanExporter.flush().isSuccess()).isFalse();
Mockito.verify(spanExporter1).flush();
Mockito.verify(spanExporter2).flush();
when(spanExporter1.shutdown()).thenReturn(CompletableResultCode.ofSuccess());
when(spanExporter2.shutdown()).thenReturn(CompletableResultCode.ofFailure());
assertThat(multiSpanExporter.shutdown().isSuccess()).isFalse();
Mockito.verify(spanExporter1).shutdown();
Mockito.verify(spanExporter2).shutdown();
}
@Test
@ -121,16 +122,23 @@ class MultiSpanExporterTest {
Mockito.doThrow(new IllegalArgumentException("No export for you."))
.when(spanExporter1)
.export(ArgumentMatchers.anyList());
Mockito.when(spanExporter2.export(same(SPAN_LIST)))
.thenReturn(CompletableResultCode.ofSuccess());
when(spanExporter2.export(same(SPAN_LIST))).thenReturn(CompletableResultCode.ofSuccess());
assertThat(multiSpanExporter.export(SPAN_LIST).isSuccess()).isFalse();
Mockito.verify(spanExporter1).export(same(SPAN_LIST));
Mockito.verify(spanExporter2).export(same(SPAN_LIST));
Mockito.doThrow(new IllegalArgumentException("No flush for you.")).when(spanExporter1).flush();
Mockito.when(spanExporter2.flush()).thenReturn(CompletableResultCode.ofSuccess());
when(spanExporter2.flush()).thenReturn(CompletableResultCode.ofSuccess());
assertThat(multiSpanExporter.flush().isSuccess()).isFalse();
Mockito.verify(spanExporter1).flush();
Mockito.verify(spanExporter2).flush();
Mockito.doThrow(new IllegalArgumentException("No shutdown for you."))
.when(spanExporter1)
.shutdown();
when(spanExporter2.shutdown()).thenReturn(CompletableResultCode.ofSuccess());
assertThat(multiSpanExporter.shutdown().isSuccess()).isFalse();
Mockito.verify(spanExporter1).shutdown();
Mockito.verify(spanExporter2).shutdown();
}
}

View File

@ -22,7 +22,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
import io.opentelemetry.sdk.common.export.CompletableResultCode;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.export.ConfigBuilderTest.ConfigTester;
import io.opentelemetry.sdk.trace.ReadWriteSpan;
import io.opentelemetry.sdk.trace.ReadableSpan;
@ -45,10 +45,15 @@ import java.util.Map;
import java.util.Properties;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
/** Unit tests for {@link SimpleSpanProcessor}. */
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
class SimpleSpanProcessorTest {
private static final long MAX_SCHEDULE_DELAY_MILLIS = 500;
private static final String SPAN_NAME = "MySpanName";
@ -69,7 +74,6 @@ class SimpleSpanProcessorTest {
@BeforeEach
void setUp() {
MockitoAnnotations.initMocks(this);
simpleSampledSpansProcessor = SimpleSpanProcessor.newBuilder(spanExporter).build();
}

View File

@ -19,6 +19,7 @@ package io.opentelemetry.sdk.extensions.trace.export;
import com.google.common.base.Preconditions;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.WaitStrategy;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.export.ConfigBuilder;
import io.opentelemetry.sdk.trace.ReadWriteSpan;
import io.opentelemetry.sdk.trace.ReadableSpan;
@ -91,13 +92,13 @@ public final class DisruptorAsyncSpanProcessor implements SpanProcessor {
}
@Override
public void shutdown() {
disruptorEventQueue.shutdown();
public CompletableResultCode shutdown() {
return disruptorEventQueue.shutdown();
}
@Override
public void forceFlush() {
disruptorEventQueue.forceFlush();
public CompletableResultCode forceFlush() {
return disruptorEventQueue.forceFlush();
}
/**

View File

@ -23,11 +23,11 @@ import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.DaemonThreadFactory;
import io.opentelemetry.sdk.trace.ReadWriteSpan;
import io.opentelemetry.sdk.trace.ReadableSpan;
import io.opentelemetry.sdk.trace.SpanProcessor;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import javax.annotation.Nullable;
@ -42,17 +42,19 @@ import javax.annotation.concurrent.ThreadSafe;
final class DisruptorEventQueue {
private static final Logger logger = Logger.getLogger(DisruptorEventQueue.class.getName());
private static final String WORKER_THREAD_NAME = "DisruptorEventQueue_WorkerThread";
private static final EventTranslatorThreeArg<DisruptorEvent, EventType, Object, CountDownLatch>
private static final EventTranslatorThreeArg<
DisruptorEvent, EventType, Object, CompletableResultCode>
TRANSLATOR_THREE_ARG =
new EventTranslatorThreeArg<DisruptorEvent, EventType, Object, CountDownLatch>() {
new EventTranslatorThreeArg<DisruptorEvent, EventType, Object, CompletableResultCode>() {
@Override
public void translateTo(
DisruptorEvent event,
long sequence,
EventType eventType,
Object span,
CountDownLatch countDownLatch) {
event.setEntry(eventType, span, countDownLatch);
CompletableResultCode result) {
event.setEntry(eventType, span, result);
}
};
private static final EventFactory<DisruptorEvent> EVENT_FACTORY =
@ -68,12 +70,6 @@ final class DisruptorEventQueue {
private volatile boolean isShutdown = false;
private final boolean blocking;
/**
* Only one consumer for {@link DisruptorEventQueue#forceFlush()} and {@link
* DisruptorEventQueue#shutdown()} invocation.
*/
private static final byte NUM_CONSUMERS = 1;
private enum EventType {
ON_START,
ON_END,
@ -121,47 +117,41 @@ final class DisruptorEventQueue {
// Shuts down the underlying disruptor. Ensures that when this method returns the disruptor is
// shutdown.
void shutdown() {
CompletableResultCode shutdown() {
synchronized (this) {
if (isShutdown) {
// Race condition between two calls to shutdown. The other call already finished.
return;
return CompletableResultCode.ofSuccess();
}
isShutdown = true;
enqueueAndLock(EventType.ON_SHUTDOWN);
return enqueueWithResult(EventType.ON_SHUTDOWN);
}
}
// Force to publish the ended spans to the SpanProcessor
void forceFlush() {
CompletableResultCode forceFlush() {
if (isShutdown) {
if (!loggedShutdownMessage.getAndSet(true)) {
logger.info("Attempted to flush after Disruptor shutdown.");
}
return;
return CompletableResultCode.ofFailure();
}
enqueueAndLock(EventType.ON_FORCE_FLUSH);
return enqueueWithResult(EventType.ON_FORCE_FLUSH);
}
private void enqueueAndLock(EventType event) {
CountDownLatch waitingCounter = new CountDownLatch(NUM_CONSUMERS); // only one processor.
enqueue(event, null, waitingCounter);
try {
waitingCounter.await();
} catch (InterruptedException e) {
// Preserve the interruption.
Thread.currentThread().interrupt();
logger.warning("Thread interrupted, shutdown may not finished.");
}
private CompletableResultCode enqueueWithResult(EventType event) {
CompletableResultCode result = new CompletableResultCode();
enqueue(event, null, result);
return result;
}
// Enqueues an event on the {@link DisruptorEventQueue}.
private void enqueue(EventType eventType, Object span, CountDownLatch flushLatch) {
private void enqueue(EventType eventType, Object span, CompletableResultCode result) {
if (blocking) {
ringBuffer.publishEvent(TRANSLATOR_THREE_ARG, eventType, span, flushLatch);
ringBuffer.publishEvent(TRANSLATOR_THREE_ARG, eventType, span, result);
} else {
// TODO: Record metrics if element not added.
ringBuffer.tryPublishEvent(TRANSLATOR_THREE_ARG, eventType, span, flushLatch);
ringBuffer.tryPublishEvent(TRANSLATOR_THREE_ARG, eventType, span, result);
}
}
@ -169,13 +159,15 @@ final class DisruptorEventQueue {
private static final class DisruptorEvent {
@Nullable private Object span = null;
@Nullable private EventType eventType = null;
@Nullable private CountDownLatch waitingCounter = null;
@Nullable private CompletableResultCode result = null;
void setEntry(
@Nullable EventType eventType, @Nullable Object span, @Nullable CountDownLatch flushLatch) {
@Nullable EventType eventType,
@Nullable Object span,
@Nullable CompletableResultCode result) {
this.span = span;
this.eventType = eventType;
this.waitingCounter = flushLatch;
this.result = result;
}
@Nullable
@ -188,9 +180,15 @@ final class DisruptorEventQueue {
return eventType;
}
void countDownWaitingCounter() {
if (waitingCounter != null) {
waitingCounter.countDown();
void succeed() {
if (result != null) {
result.succeed();
}
}
void fail() {
if (result != null) {
result.fail();
}
}
}
@ -203,7 +201,7 @@ final class DisruptorEventQueue {
}
@Override
public void onEvent(DisruptorEvent event, long sequence, boolean endOfBatch) {
public void onEvent(final DisruptorEvent event, long sequence, boolean endOfBatch) {
final Object readableSpan = event.getSpan();
final EventType eventType = event.getEventType();
if (eventType == null) {
@ -219,12 +217,11 @@ final class DisruptorEventQueue {
spanProcessor.onEnd((ReadableSpan) readableSpan);
break;
case ON_SHUTDOWN:
spanProcessor.shutdown();
event.countDownWaitingCounter();
propagateResult(spanProcessor.shutdown(), event);
break;
case ON_FORCE_FLUSH:
spanProcessor.forceFlush();
event.countDownWaitingCounter();
propagateResult(spanProcessor.forceFlush(), event);
break;
}
} finally {
@ -233,4 +230,19 @@ final class DisruptorEventQueue {
}
}
}
private static void propagateResult(
final CompletableResultCode result, final DisruptorEvent event) {
result.whenComplete(
new Runnable() {
@Override
public void run() {
if (result.isSuccess()) {
event.succeed();
} else {
event.fail();
}
}
});
}
}

View File

@ -18,6 +18,7 @@ package io.opentelemetry.sdk.extensions.trace.export;
import static org.assertj.core.api.Assertions.assertThat;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.export.ConfigBuilder;
import io.opentelemetry.sdk.trace.MultiSpanProcessor;
import io.opentelemetry.sdk.trace.ReadWriteSpan;
@ -26,14 +27,16 @@ import io.opentelemetry.sdk.trace.SpanProcessor;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.junit.jupiter.MockitoExtension;
/** Unit tests for {@link DisruptorAsyncSpanProcessor}. */
@ExtendWith(MockitoExtension.class)
class DisruptorAsyncSpanProcessorTest {
private static final boolean REQUIRED = true;
private static final boolean NOT_REQUIRED = false;
@ -41,11 +44,6 @@ class DisruptorAsyncSpanProcessorTest {
@Mock private ReadableSpan readableSpan;
@Mock private ReadWriteSpan readWriteSpan;
@BeforeEach
void setUp() {
MockitoAnnotations.initMocks(this);
}
// EventQueueEntry for incrementing a Counter.
private static class IncrementSpanProcessor implements SpanProcessor {
private final AtomicInteger counterOnStart = new AtomicInteger(0);
@ -85,15 +83,17 @@ class DisruptorAsyncSpanProcessorTest {
}
@Override
public void shutdown() {
public CompletableResultCode shutdown() {
counterOnShutdown.incrementAndGet();
return CompletableResultCode.ofSuccess();
}
@Override
public void forceFlush() {
public CompletableResultCode forceFlush() {
counterOnForceFlush.incrementAndGet();
deltaExportedForceFlushSpans.set(counterEndSpans.getAndSet(0));
counterOnExportedForceFlushSpans.addAndGet(deltaExportedForceFlushSpans.get());
return CompletableResultCode.ofSuccess();
}
private int getCounterOnStart() {
@ -132,8 +132,8 @@ class DisruptorAsyncSpanProcessorTest {
assertThat(incrementSpanProcessor.getCounterOnEnd()).isEqualTo(0);
disruptorAsyncSpanProcessor.onStart(readWriteSpan);
disruptorAsyncSpanProcessor.onEnd(readableSpan);
disruptorAsyncSpanProcessor.forceFlush();
disruptorAsyncSpanProcessor.shutdown();
disruptorAsyncSpanProcessor.forceFlush().join(10, TimeUnit.SECONDS);
disruptorAsyncSpanProcessor.shutdown().join(10, TimeUnit.SECONDS);
assertThat(incrementSpanProcessor.getCounterOnStart()).isEqualTo(1);
assertThat(incrementSpanProcessor.getCounterOnEnd()).isEqualTo(1);
assertThat(incrementSpanProcessor.getCounterOnForceFlush()).isEqualTo(1);
@ -152,8 +152,8 @@ class DisruptorAsyncSpanProcessorTest {
assertThat(incrementSpanProcessor.getCounterOnEnd()).isEqualTo(0);
disruptorAsyncSpanProcessor.onStart(readWriteSpan);
disruptorAsyncSpanProcessor.onEnd(readableSpan);
disruptorAsyncSpanProcessor.forceFlush();
disruptorAsyncSpanProcessor.shutdown();
disruptorAsyncSpanProcessor.forceFlush().join(10, TimeUnit.SECONDS);
disruptorAsyncSpanProcessor.shutdown().join(10, TimeUnit.SECONDS);
assertThat(incrementSpanProcessor.getCounterOnStart()).isEqualTo(0);
assertThat(incrementSpanProcessor.getCounterOnEnd()).isEqualTo(1);
assertThat(incrementSpanProcessor.getCounterOnForceFlush()).isEqualTo(1);
@ -172,8 +172,8 @@ class DisruptorAsyncSpanProcessorTest {
assertThat(incrementSpanProcessor.getCounterOnEnd()).isEqualTo(0);
disruptorAsyncSpanProcessor.onStart(readWriteSpan);
disruptorAsyncSpanProcessor.onEnd(readableSpan);
disruptorAsyncSpanProcessor.forceFlush();
disruptorAsyncSpanProcessor.shutdown();
disruptorAsyncSpanProcessor.forceFlush().join(10, TimeUnit.SECONDS);
disruptorAsyncSpanProcessor.shutdown().join(10, TimeUnit.SECONDS);
assertThat(incrementSpanProcessor.getCounterOnStart()).isEqualTo(1);
assertThat(incrementSpanProcessor.getCounterOnEnd()).isEqualTo(0);
assertThat(incrementSpanProcessor.getCounterOnForceFlush()).isEqualTo(1);
@ -185,11 +185,11 @@ class DisruptorAsyncSpanProcessorTest {
IncrementSpanProcessor incrementSpanProcessor = new IncrementSpanProcessor(REQUIRED, REQUIRED);
DisruptorAsyncSpanProcessor disruptorAsyncSpanProcessor =
DisruptorAsyncSpanProcessor.newBuilder(incrementSpanProcessor).build();
disruptorAsyncSpanProcessor.shutdown();
disruptorAsyncSpanProcessor.shutdown();
disruptorAsyncSpanProcessor.shutdown();
disruptorAsyncSpanProcessor.shutdown();
disruptorAsyncSpanProcessor.shutdown();
disruptorAsyncSpanProcessor.shutdown().join(10, TimeUnit.SECONDS);
disruptorAsyncSpanProcessor.shutdown().join(10, TimeUnit.SECONDS);
disruptorAsyncSpanProcessor.shutdown().join(10, TimeUnit.SECONDS);
disruptorAsyncSpanProcessor.shutdown().join(10, TimeUnit.SECONDS);
disruptorAsyncSpanProcessor.shutdown().join(10, TimeUnit.SECONDS);
assertThat(incrementSpanProcessor.getCounterOnShutdown()).isEqualTo(1);
}
@ -198,14 +198,14 @@ class DisruptorAsyncSpanProcessorTest {
IncrementSpanProcessor incrementSpanProcessor = new IncrementSpanProcessor(REQUIRED, REQUIRED);
DisruptorAsyncSpanProcessor disruptorAsyncSpanProcessor =
DisruptorAsyncSpanProcessor.newBuilder(incrementSpanProcessor).build();
disruptorAsyncSpanProcessor.shutdown();
disruptorAsyncSpanProcessor.shutdown().join(10, TimeUnit.SECONDS);
disruptorAsyncSpanProcessor.onStart(readWriteSpan);
disruptorAsyncSpanProcessor.onEnd(readableSpan);
disruptorAsyncSpanProcessor.forceFlush();
disruptorAsyncSpanProcessor.forceFlush().join(10, TimeUnit.SECONDS);
assertThat(incrementSpanProcessor.getCounterOnStart()).isEqualTo(0);
assertThat(incrementSpanProcessor.getCounterOnEnd()).isEqualTo(0);
assertThat(incrementSpanProcessor.getCounterOnForceFlush()).isEqualTo(0);
disruptorAsyncSpanProcessor.shutdown();
disruptorAsyncSpanProcessor.shutdown().join(10, TimeUnit.SECONDS);
assertThat(incrementSpanProcessor.getCounterOnShutdown()).isEqualTo(1);
}
@ -219,13 +219,13 @@ class DisruptorAsyncSpanProcessorTest {
disruptorAsyncSpanProcessor.onStart(readWriteSpan);
disruptorAsyncSpanProcessor.onEnd(readableSpan);
if (i % 10 == 0) {
disruptorAsyncSpanProcessor.forceFlush();
disruptorAsyncSpanProcessor.forceFlush().join(10, TimeUnit.SECONDS);
}
}
assertThat(incrementSpanProcessor.getCounterOnStart()).isEqualTo(tenK);
assertThat(incrementSpanProcessor.getCounterOnEnd()).isEqualTo(tenK);
assertThat(incrementSpanProcessor.getCounterOnForceFlush()).isEqualTo(tenK / 10);
disruptorAsyncSpanProcessor.shutdown();
disruptorAsyncSpanProcessor.shutdown().join(10, TimeUnit.SECONDS);
assertThat(incrementSpanProcessor.getCounterOnShutdown()).isEqualTo(1);
}
@ -240,7 +240,7 @@ class DisruptorAsyncSpanProcessorTest {
.build();
disruptorAsyncSpanProcessor.onStart(readWriteSpan);
disruptorAsyncSpanProcessor.onEnd(readableSpan);
disruptorAsyncSpanProcessor.shutdown();
disruptorAsyncSpanProcessor.shutdown().join(10, TimeUnit.SECONDS);
assertThat(incrementSpanProcessor1.getCounterOnStart()).isEqualTo(1);
assertThat(incrementSpanProcessor1.getCounterOnEnd()).isEqualTo(1);
assertThat(incrementSpanProcessor1.getCounterOnShutdown()).isEqualTo(1);
@ -261,11 +261,11 @@ class DisruptorAsyncSpanProcessorTest {
disruptorAsyncSpanProcessor.onStart(readWriteSpan);
disruptorAsyncSpanProcessor.onEnd(readableSpan);
if (i % 100 == 0) {
disruptorAsyncSpanProcessor.forceFlush();
disruptorAsyncSpanProcessor.forceFlush().join(10, TimeUnit.SECONDS);
assertThat(incrementSpanProcessor.getDeltaExportedForceFlushSpans()).isEqualTo(100);
}
}
disruptorAsyncSpanProcessor.shutdown();
disruptorAsyncSpanProcessor.shutdown().join(10, TimeUnit.SECONDS);
assertThat(incrementSpanProcessor.getCounterOnStart()).isEqualTo(tenK);
assertThat(incrementSpanProcessor.getCounterOnEnd()).isEqualTo(tenK);
assertThat(incrementSpanProcessor.getCounterOnExportedForceFlushSpans()).isEqualTo(tenK);

View File

@ -16,6 +16,7 @@
package io.opentelemetry.sdk.extensions.zpages;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.export.ConfigBuilder;
import io.opentelemetry.sdk.trace.ReadWriteSpan;
import io.opentelemetry.sdk.trace.ReadableSpan;
@ -89,13 +90,15 @@ final class TracezSpanProcessor implements SpanProcessor {
}
@Override
public void shutdown() {
public CompletableResultCode shutdown() {
// Do nothing.
return CompletableResultCode.ofSuccess();
}
@Override
public void forceFlush() {
public CompletableResultCode forceFlush() {
// Do nothing.
return CompletableResultCode.ofSuccess();
}
/**