diff --git a/exporters/inmemory/src/main/java/io/opentelemetry/exporters/inmemory/InMemoryMetricExporter.java b/exporters/inmemory/src/main/java/io/opentelemetry/exporters/inmemory/InMemoryMetricExporter.java index 23596e7f1d..5730286457 100644 --- a/exporters/inmemory/src/main/java/io/opentelemetry/exporters/inmemory/InMemoryMetricExporter.java +++ b/exporters/inmemory/src/main/java/io/opentelemetry/exporters/inmemory/InMemoryMetricExporter.java @@ -16,7 +16,7 @@ package io.opentelemetry.exporters.inmemory; -import io.opentelemetry.sdk.common.export.CompletableResultCode; +import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.export.MetricExporter; import java.util.ArrayList; diff --git a/exporters/inmemory/src/main/java/io/opentelemetry/exporters/inmemory/InMemorySpanExporter.java b/exporters/inmemory/src/main/java/io/opentelemetry/exporters/inmemory/InMemorySpanExporter.java index 83c6872d56..571ec51b38 100644 --- a/exporters/inmemory/src/main/java/io/opentelemetry/exporters/inmemory/InMemorySpanExporter.java +++ b/exporters/inmemory/src/main/java/io/opentelemetry/exporters/inmemory/InMemorySpanExporter.java @@ -16,7 +16,7 @@ package io.opentelemetry.exporters.inmemory; -import io.opentelemetry.sdk.common.export.CompletableResultCode; +import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.export.SpanExporter; import java.util.ArrayList; @@ -108,11 +108,12 @@ public final class InMemorySpanExporter implements SpanExporter { } @Override - public void shutdown() { + public CompletableResultCode shutdown() { synchronized (this) { finishedSpanItems.clear(); isStopped = true; } + return CompletableResultCode.ofSuccess(); } private InMemorySpanExporter() {} diff --git a/exporters/jaeger/src/main/java/io/opentelemetry/exporters/jaeger/JaegerGrpcSpanExporter.java b/exporters/jaeger/src/main/java/io/opentelemetry/exporters/jaeger/JaegerGrpcSpanExporter.java index 7521b8a137..cdd578076c 100644 --- a/exporters/jaeger/src/main/java/io/opentelemetry/exporters/jaeger/JaegerGrpcSpanExporter.java +++ b/exporters/jaeger/src/main/java/io/opentelemetry/exporters/jaeger/JaegerGrpcSpanExporter.java @@ -19,13 +19,14 @@ package io.opentelemetry.exporters.jaeger; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.MoreExecutors; +import io.grpc.ConnectivityState; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.opentelemetry.exporters.jaeger.proto.api_v2.Collector; import io.opentelemetry.exporters.jaeger.proto.api_v2.Collector.PostSpansResponse; import io.opentelemetry.exporters.jaeger.proto.api_v2.CollectorServiceGrpc; import io.opentelemetry.exporters.jaeger.proto.api_v2.Model; -import io.opentelemetry.sdk.common.export.CompletableResultCode; +import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.common.export.ConfigBuilder; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.export.SpanExporter; @@ -168,15 +169,21 @@ public final class JaegerGrpcSpanExporter implements SpanExporter { /** * Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately - * cancelled. The channel is forcefully closed after a timeout. + * cancelled. */ @Override - public void shutdown() { - try { - managedChannel.shutdown().awaitTermination(5, TimeUnit.SECONDS); - } catch (InterruptedException e) { - logger.log(Level.WARNING, "Failed to shutdown the gRPC channel", e); - } + public CompletableResultCode shutdown() { + final CompletableResultCode result = new CompletableResultCode(); + managedChannel.notifyWhenStateChanged( + ConnectivityState.SHUTDOWN, + new Runnable() { + @Override + public void run() { + result.succeed(); + } + }); + managedChannel.shutdown(); + return result; } /** Builder utility for this exporter. */ diff --git a/exporters/logging/src/main/java/io/opentelemetry/exporters/logging/LoggingMetricExporter.java b/exporters/logging/src/main/java/io/opentelemetry/exporters/logging/LoggingMetricExporter.java index bbde39d65d..9d35d1032f 100644 --- a/exporters/logging/src/main/java/io/opentelemetry/exporters/logging/LoggingMetricExporter.java +++ b/exporters/logging/src/main/java/io/opentelemetry/exporters/logging/LoggingMetricExporter.java @@ -16,7 +16,7 @@ package io.opentelemetry.exporters.logging; -import io.opentelemetry.sdk.common.export.CompletableResultCode; +import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.export.MetricExporter; import java.util.Collection; diff --git a/exporters/logging/src/main/java/io/opentelemetry/exporters/logging/LoggingSpanExporter.java b/exporters/logging/src/main/java/io/opentelemetry/exporters/logging/LoggingSpanExporter.java index 77dbf9b2d7..3fecd6a838 100644 --- a/exporters/logging/src/main/java/io/opentelemetry/exporters/logging/LoggingSpanExporter.java +++ b/exporters/logging/src/main/java/io/opentelemetry/exporters/logging/LoggingSpanExporter.java @@ -16,7 +16,7 @@ package io.opentelemetry.exporters.logging; -import io.opentelemetry.sdk.common.export.CompletableResultCode; +import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.export.SpanExporter; import java.util.Collection; @@ -55,7 +55,7 @@ public class LoggingSpanExporter implements SpanExporter { } @Override - public void shutdown() { - this.flush(); + public CompletableResultCode shutdown() { + return flush(); } } diff --git a/exporters/logging/src/test/java/io/opentelemetry/exporters/logging/LoggingSpanExporterTest.java b/exporters/logging/src/test/java/io/opentelemetry/exporters/logging/LoggingSpanExporterTest.java index 2cbdb3a2ff..424ea7e345 100644 --- a/exporters/logging/src/test/java/io/opentelemetry/exporters/logging/LoggingSpanExporterTest.java +++ b/exporters/logging/src/test/java/io/opentelemetry/exporters/logging/LoggingSpanExporterTest.java @@ -21,7 +21,7 @@ import static org.assertj.core.api.Assertions.assertThat; import io.opentelemetry.common.AttributeValue; import io.opentelemetry.common.Attributes; -import io.opentelemetry.sdk.common.export.CompletableResultCode; +import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.trace.TestSpanData; import io.opentelemetry.sdk.trace.data.EventImpl; import io.opentelemetry.sdk.trace.data.SpanData; diff --git a/exporters/otlp/src/main/java/io/opentelemetry/exporters/otlp/OtlpGrpcMetricExporter.java b/exporters/otlp/src/main/java/io/opentelemetry/exporters/otlp/OtlpGrpcMetricExporter.java index 7da0f7583a..3852dbf19f 100644 --- a/exporters/otlp/src/main/java/io/opentelemetry/exporters/otlp/OtlpGrpcMetricExporter.java +++ b/exporters/otlp/src/main/java/io/opentelemetry/exporters/otlp/OtlpGrpcMetricExporter.java @@ -30,7 +30,7 @@ import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse; import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc; import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc.MetricsServiceFutureStub; -import io.opentelemetry.sdk.common.export.CompletableResultCode; +import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.common.export.ConfigBuilder; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.export.MetricExporter; diff --git a/exporters/otlp/src/main/java/io/opentelemetry/exporters/otlp/OtlpGrpcSpanExporter.java b/exporters/otlp/src/main/java/io/opentelemetry/exporters/otlp/OtlpGrpcSpanExporter.java index c14528757d..53bc2b6ee0 100644 --- a/exporters/otlp/src/main/java/io/opentelemetry/exporters/otlp/OtlpGrpcSpanExporter.java +++ b/exporters/otlp/src/main/java/io/opentelemetry/exporters/otlp/OtlpGrpcSpanExporter.java @@ -22,6 +22,7 @@ import com.google.common.base.Splitter; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.MoreExecutors; +import io.grpc.ConnectivityState; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.Metadata; @@ -30,7 +31,7 @@ import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse; import io.opentelemetry.proto.collector.trace.v1.TraceServiceGrpc; import io.opentelemetry.proto.collector.trace.v1.TraceServiceGrpc.TraceServiceFutureStub; -import io.opentelemetry.sdk.common.export.CompletableResultCode; +import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.common.export.ConfigBuilder; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.export.SpanExporter; @@ -169,15 +170,21 @@ public final class OtlpGrpcSpanExporter implements SpanExporter { /** * Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately - * cancelled. The channel is forcefully closed after a timeout. + * cancelled. */ @Override - public void shutdown() { - try { - managedChannel.shutdown().awaitTermination(5, TimeUnit.SECONDS); - } catch (InterruptedException e) { - logger.log(Level.WARNING, "Failed to shutdown the gRPC channel", e); - } + public CompletableResultCode shutdown() { + final CompletableResultCode result = new CompletableResultCode(); + managedChannel.notifyWhenStateChanged( + ConnectivityState.SHUTDOWN, + new Runnable() { + @Override + public void run() { + result.succeed(); + } + }); + managedChannel.shutdown(); + return result; } /** Builder utility for this exporter. */ diff --git a/exporters/zipkin/src/main/java/io/opentelemetry/exporters/zipkin/ZipkinSpanExporter.java b/exporters/zipkin/src/main/java/io/opentelemetry/exporters/zipkin/ZipkinSpanExporter.java index c29aa3ff15..39732b9b35 100644 --- a/exporters/zipkin/src/main/java/io/opentelemetry/exporters/zipkin/ZipkinSpanExporter.java +++ b/exporters/zipkin/src/main/java/io/opentelemetry/exporters/zipkin/ZipkinSpanExporter.java @@ -21,8 +21,8 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS; import io.opentelemetry.common.AttributeValue; import io.opentelemetry.common.ReadableAttributes; import io.opentelemetry.common.ReadableKeyValuePairs.KeyValueConsumer; +import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; -import io.opentelemetry.sdk.common.export.CompletableResultCode; import io.opentelemetry.sdk.common.export.ConfigBuilder; import io.opentelemetry.sdk.resources.ResourceAttributes; import io.opentelemetry.sdk.trace.data.SpanData; @@ -289,12 +289,13 @@ public final class ZipkinSpanExporter implements SpanExporter { } @Override - public void shutdown() { + public CompletableResultCode shutdown() { try { sender.close(); } catch (IOException e) { logger.log(Level.WARNING, "Exception while closing the Zipkin Sender instance", e); } + return CompletableResultCode.ofSuccess(); } /** diff --git a/exporters/zipkin/src/test/java/io/opentelemetry/exporters/zipkin/ZipkinSpanExporterEndToEndHttpTest.java b/exporters/zipkin/src/test/java/io/opentelemetry/exporters/zipkin/ZipkinSpanExporterEndToEndHttpTest.java index d101daadfb..f0a2f3f704 100644 --- a/exporters/zipkin/src/test/java/io/opentelemetry/exporters/zipkin/ZipkinSpanExporterEndToEndHttpTest.java +++ b/exporters/zipkin/src/test/java/io/opentelemetry/exporters/zipkin/ZipkinSpanExporterEndToEndHttpTest.java @@ -20,7 +20,7 @@ import static org.assertj.core.api.Assertions.assertThat; import com.google.common.collect.ImmutableList; import io.opentelemetry.common.Attributes; -import io.opentelemetry.sdk.common.export.CompletableResultCode; +import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.trace.TestSpanData; import io.opentelemetry.sdk.trace.data.EventImpl; import io.opentelemetry.sdk.trace.data.SpanData; diff --git a/exporters/zipkin/src/test/java/io/opentelemetry/exporters/zipkin/ZipkinSpanExporterTest.java b/exporters/zipkin/src/test/java/io/opentelemetry/exporters/zipkin/ZipkinSpanExporterTest.java index 6d59e2f92d..cef65deeaa 100644 --- a/exporters/zipkin/src/test/java/io/opentelemetry/exporters/zipkin/ZipkinSpanExporterTest.java +++ b/exporters/zipkin/src/test/java/io/opentelemetry/exporters/zipkin/ZipkinSpanExporterTest.java @@ -26,8 +26,8 @@ import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableList; import io.opentelemetry.common.AttributeValue; import io.opentelemetry.common.Attributes; +import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; -import io.opentelemetry.sdk.common.export.CompletableResultCode; import io.opentelemetry.sdk.common.export.ConfigBuilder; import io.opentelemetry.sdk.resources.Resource; import io.opentelemetry.sdk.resources.ResourceAttributes; diff --git a/sdk/all/src/jmh/java/io/opentelemetry/sdk/trace/SpanPipelineBenchmark.java b/sdk/all/src/jmh/java/io/opentelemetry/sdk/trace/SpanPipelineBenchmark.java index df8236197d..72e55e60f0 100644 --- a/sdk/all/src/jmh/java/io/opentelemetry/sdk/trace/SpanPipelineBenchmark.java +++ b/sdk/all/src/jmh/java/io/opentelemetry/sdk/trace/SpanPipelineBenchmark.java @@ -21,7 +21,7 @@ import static io.opentelemetry.common.AttributeValue.stringAttributeValue; import io.opentelemetry.common.Attributes; import io.opentelemetry.sdk.OpenTelemetrySdk; -import io.opentelemetry.sdk.common.export.CompletableResultCode; +import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; import io.opentelemetry.sdk.trace.export.SpanExporter; @@ -98,8 +98,9 @@ public class SpanPipelineBenchmark { } @Override - public void shutdown() { + public CompletableResultCode shutdown() { // no-op + return CompletableResultCode.ofSuccess(); } } diff --git a/sdk/all/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorBenchmark.java b/sdk/all/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorBenchmark.java new file mode 100644 index 0000000000..6f3994e4b7 --- /dev/null +++ b/sdk/all/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorBenchmark.java @@ -0,0 +1,120 @@ +/* + * Copyright 2020, OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opentelemetry.sdk.trace.export; + +import com.google.common.collect.ImmutableList; +import io.opentelemetry.OpenTelemetry; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.trace.ReadableSpan; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.trace.Span; +import io.opentelemetry.trace.Tracer; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +@State(Scope.Benchmark) +public class BatchSpanProcessorBenchmark { + + private static class DelayingSpanExporter implements SpanExporter { + + private final ScheduledExecutorService executor; + + private final int delayMs; + + private DelayingSpanExporter(int delayMs) { + executor = Executors.newScheduledThreadPool(5); + this.delayMs = delayMs; + } + + @SuppressWarnings("FutureReturnValueIgnored") + @Override + public CompletableResultCode export(Collection spans) { + final CompletableResultCode result = new CompletableResultCode(); + executor.schedule( + new Runnable() { + @Override + public void run() { + result.succeed(); + } + }, + delayMs, + TimeUnit.MILLISECONDS); + return result; + } + + @Override + public CompletableResultCode flush() { + return CompletableResultCode.ofSuccess(); + } + + @Override + public CompletableResultCode shutdown() { + return CompletableResultCode.ofSuccess(); + } + } + + @Param({"0", "1", "5"}) + private int delayMs; + + @Param({"1000", "2000", "5000"}) + private int spanCount; + + private List spans; + + private BatchSpanProcessor processor; + + @Setup(Level.Trial) + public final void setup() { + SpanExporter exporter = new DelayingSpanExporter(delayMs); + processor = BatchSpanProcessor.newBuilder(exporter).build(); + + ImmutableList.Builder spans = ImmutableList.builderWithExpectedSize(spanCount); + Tracer tracer = OpenTelemetry.getTracerProvider().get("benchmarkTracer"); + for (int i = 0; i < spanCount; i++) { + spans.add(tracer.spanBuilder("span").startSpan()); + } + this.spans = spans.build(); + } + + /** Export spans through {@link io.opentelemetry.sdk.trace.export.BatchSpanProcessor}. */ + @Benchmark + @Fork(1) + @Threads(5) + @Warmup(iterations = 5, time = 1) + @Measurement(iterations = 10, time = 1) + @OutputTimeUnit(TimeUnit.SECONDS) + public void export() { + for (Span span : spans) { + processor.onEnd((ReadableSpan) span); + } + processor.forceFlush().join(10, TimeUnit.MINUTES); + } +} diff --git a/sdk/all/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorDroppedSpansBenchmark.java b/sdk/all/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorDroppedSpansBenchmark.java index eed38cada5..7c1f36fc3b 100644 --- a/sdk/all/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorDroppedSpansBenchmark.java +++ b/sdk/all/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorDroppedSpansBenchmark.java @@ -18,7 +18,7 @@ package io.opentelemetry.sdk.trace.export; import io.opentelemetry.OpenTelemetry; import io.opentelemetry.sdk.OpenTelemetrySdk; -import io.opentelemetry.sdk.common.export.CompletableResultCode; +import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.data.MetricData.LongPoint; import io.opentelemetry.sdk.metrics.data.MetricData.Point; @@ -58,7 +58,9 @@ public class BatchSpanProcessorDroppedSpansBenchmark { } @Override - public void shutdown() {} + public CompletableResultCode shutdown() { + return CompletableResultCode.ofSuccess(); + } } @State(Scope.Benchmark) diff --git a/sdk/all/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorFlushBenchmark.java b/sdk/all/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorFlushBenchmark.java index 6ca8a5a1e1..fe1e30dc5d 100644 --- a/sdk/all/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorFlushBenchmark.java +++ b/sdk/all/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorFlushBenchmark.java @@ -18,7 +18,7 @@ package io.opentelemetry.sdk.trace.export; import com.google.common.collect.ImmutableList; import io.opentelemetry.OpenTelemetry; -import io.opentelemetry.sdk.common.export.CompletableResultCode; +import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.trace.ReadableSpan; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.trace.Span; @@ -76,7 +76,9 @@ public class BatchSpanProcessorFlushBenchmark { } @Override - public void shutdown() {} + public CompletableResultCode shutdown() { + return CompletableResultCode.ofSuccess(); + } } @Param({"0", "1", "5"}) @@ -113,6 +115,6 @@ public class BatchSpanProcessorFlushBenchmark { for (Span span : spans) { processor.onEnd((ReadableSpan) span); } - processor.flush().join(10, TimeUnit.MINUTES); + processor.forceFlush().join(10, TimeUnit.MINUTES); } } diff --git a/sdk/all/src/jmh/java/io/opentelemetry/sdk/trace/export/MultiSpanExporterBenchmark.java b/sdk/all/src/jmh/java/io/opentelemetry/sdk/trace/export/MultiSpanExporterBenchmark.java index 1b00c58780..a268eb35c8 100644 --- a/sdk/all/src/jmh/java/io/opentelemetry/sdk/trace/export/MultiSpanExporterBenchmark.java +++ b/sdk/all/src/jmh/java/io/opentelemetry/sdk/trace/export/MultiSpanExporterBenchmark.java @@ -16,7 +16,7 @@ package io.opentelemetry.sdk.trace.export; -import io.opentelemetry.sdk.common.export.CompletableResultCode; +import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.trace.TestSpanData; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.trace.Span; @@ -54,7 +54,9 @@ public class MultiSpanExporterBenchmark { } @Override - public void shutdown() {} + public CompletableResultCode shutdown() { + return CompletableResultCode.ofSuccess(); + } } @Param({"1", "3"}) diff --git a/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/CompletableResultCode.java b/sdk/common/src/main/java/io/opentelemetry/sdk/common/CompletableResultCode.java similarity index 78% rename from sdk/common/src/main/java/io/opentelemetry/sdk/common/export/CompletableResultCode.java rename to sdk/common/src/main/java/io/opentelemetry/sdk/common/CompletableResultCode.java index f516566c22..f34e0c93cb 100644 --- a/sdk/common/src/main/java/io/opentelemetry/sdk/common/export/CompletableResultCode.java +++ b/sdk/common/src/main/java/io/opentelemetry/sdk/common/CompletableResultCode.java @@ -14,12 +14,15 @@ * limitations under the License. */ -package io.opentelemetry.sdk.common.export; +package io.opentelemetry.sdk.common; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; @@ -41,6 +44,35 @@ public class CompletableResultCode { return FAILURE; } + /** + * Returns a {@link CompletableResultCode} that completes after all the provided {@link + * CompletableResultCode}s complete. If any of the results fail, the result will be failed. + */ + public static CompletableResultCode ofAll(final Collection codes) { + final CompletableResultCode result = new CompletableResultCode(); + final AtomicInteger pending = new AtomicInteger(codes.size()); + final AtomicBoolean failed = new AtomicBoolean(); + for (final CompletableResultCode code : codes) { + code.whenComplete( + new Runnable() { + @Override + public void run() { + if (!code.isSuccess()) { + failed.set(true); + } + if (pending.decrementAndGet() == 0) { + if (failed.get()) { + result.fail(); + } else { + result.succeed(); + } + } + } + }); + } + return result; + } + private static final CompletableResultCode SUCCESS = new CompletableResultCode().succeed(); private static final CompletableResultCode FAILURE = new CompletableResultCode().fail(); diff --git a/sdk/common/src/test/java/io/opentelemetry/sdk/common/export/CompletableResultCodeTest.java b/sdk/common/src/test/java/io/opentelemetry/sdk/common/CompletableResultCodeTest.java similarity index 85% rename from sdk/common/src/test/java/io/opentelemetry/sdk/common/export/CompletableResultCodeTest.java rename to sdk/common/src/test/java/io/opentelemetry/sdk/common/CompletableResultCodeTest.java index 101c6527bb..429907c71f 100644 --- a/sdk/common/src/test/java/io/opentelemetry/sdk/common/export/CompletableResultCodeTest.java +++ b/sdk/common/src/test/java/io/opentelemetry/sdk/common/CompletableResultCodeTest.java @@ -14,12 +14,13 @@ * limitations under the License. */ -package io.opentelemetry.sdk.common.export; +package io.opentelemetry.sdk.common; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; import com.google.common.util.concurrent.Uninterruptibles; +import java.util.Arrays; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.Test; @@ -200,6 +201,36 @@ class CompletableResultCodeTest { assertThat(result.isDone()).isTrue(); } + @Test + void ofAll() { + CompletableResultCode result1 = new CompletableResultCode(); + CompletableResultCode result2 = new CompletableResultCode(); + CompletableResultCode result3 = new CompletableResultCode(); + + CompletableResultCode all = + CompletableResultCode.ofAll(Arrays.asList(result1, result2, result3)); + assertThat(all.isDone()).isFalse(); + result1.succeed(); + assertThat(all.isDone()).isFalse(); + result2.succeed(); + assertThat(all.isDone()).isFalse(); + result3.succeed(); + assertThat(all.isDone()).isTrue(); + assertThat(all.isSuccess()).isTrue(); + } + + @Test + void ofAllWithFailure() { + assertThat( + CompletableResultCode.ofAll( + Arrays.asList( + CompletableResultCode.ofSuccess(), + CompletableResultCode.ofFailure(), + CompletableResultCode.ofSuccess())) + .isSuccess()) + .isFalse(); + } + @Test void join() { CompletableResultCode result = new CompletableResultCode(); diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/IntervalMetricReader.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/IntervalMetricReader.java index 2a5b6dca28..a4e65572b0 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/IntervalMetricReader.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/IntervalMetricReader.java @@ -18,8 +18,8 @@ package io.opentelemetry.sdk.metrics.export; import com.google.auto.value.AutoValue; import io.opentelemetry.internal.Utils; +import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.common.DaemonThreadFactory; -import io.opentelemetry.sdk.common.export.CompletableResultCode; import io.opentelemetry.sdk.common.export.ConfigBuilder; import io.opentelemetry.sdk.metrics.data.MetricData; import java.util.ArrayList; diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExporter.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExporter.java index 7657d531da..17d6d67904 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExporter.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/MetricExporter.java @@ -16,7 +16,7 @@ package io.opentelemetry.sdk.metrics.export; -import io.opentelemetry.sdk.common.export.CompletableResultCode; +import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.metrics.data.MetricData; import java.util.Collection; diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/IntervalMetricReaderTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/IntervalMetricReaderTest.java index 8bd2956cb1..0542996b67 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/IntervalMetricReaderTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/IntervalMetricReaderTest.java @@ -20,8 +20,8 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.when; import io.opentelemetry.common.Labels; +import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; -import io.opentelemetry.sdk.common.export.CompletableResultCode; import io.opentelemetry.sdk.common.export.ConfigBuilderTest.ConfigTester; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.data.MetricData.Descriptor; diff --git a/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/MultiSpanProcessor.java b/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/MultiSpanProcessor.java index 0194571b38..3908b399b9 100644 --- a/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/MultiSpanProcessor.java +++ b/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/MultiSpanProcessor.java @@ -16,6 +16,7 @@ package io.opentelemetry.sdk.trace; +import io.opentelemetry.sdk.common.CompletableResultCode; import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -66,17 +67,21 @@ public final class MultiSpanProcessor implements SpanProcessor { } @Override - public void shutdown() { + public CompletableResultCode shutdown() { + List results = new ArrayList<>(spanProcessorsAll.size()); for (SpanProcessor spanProcessor : spanProcessorsAll) { - spanProcessor.shutdown(); + results.add(spanProcessor.shutdown()); } + return CompletableResultCode.ofAll(results); } @Override - public void forceFlush() { + public CompletableResultCode forceFlush() { + List results = new ArrayList<>(spanProcessorsAll.size()); for (SpanProcessor spanProcessor : spanProcessorsAll) { - spanProcessor.forceFlush(); + results.add(spanProcessor.forceFlush()); } + return CompletableResultCode.ofAll(results); } private MultiSpanProcessor(List spanProcessors) { diff --git a/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/NoopSpanProcessor.java b/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/NoopSpanProcessor.java index 3b4abc3861..9960aa9fdf 100644 --- a/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/NoopSpanProcessor.java +++ b/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/NoopSpanProcessor.java @@ -16,6 +16,8 @@ package io.opentelemetry.sdk.trace; +import io.opentelemetry.sdk.common.CompletableResultCode; + final class NoopSpanProcessor implements SpanProcessor { private static final NoopSpanProcessor INSTANCE = new NoopSpanProcessor(); @@ -40,10 +42,14 @@ final class NoopSpanProcessor implements SpanProcessor { } @Override - public void shutdown() {} + public CompletableResultCode shutdown() { + return CompletableResultCode.ofSuccess(); + } @Override - public void forceFlush() {} + public CompletableResultCode forceFlush() { + return CompletableResultCode.ofSuccess(); + } private NoopSpanProcessor() {} } diff --git a/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/SpanProcessor.java b/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/SpanProcessor.java index 341f726d42..d14049aa28 100644 --- a/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/SpanProcessor.java +++ b/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/SpanProcessor.java @@ -16,6 +16,7 @@ package io.opentelemetry.sdk.trace; +import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.trace.Span; /** @@ -60,17 +61,17 @@ public interface SpanProcessor { boolean isEndRequired(); /** - * Called when {@link TracerSdkProvider#shutdown()} is called. + * Processes all span events that have not yet been processed and closes used resources. * - *

Implementations must ensure that all span events are processed before returning. + * @return a {@link CompletableResultCode} which completes when shutdown is finished. */ - void shutdown(); + CompletableResultCode shutdown(); /** * Processes all span events that have not yet been processed. * - *

This method is executed synchronously on the calling thread, and should not throw - * exceptions. + * @return a {@link CompletableResultCode} which completes when currently queued spans are + * finished processing. */ - void forceFlush(); + CompletableResultCode forceFlush(); } diff --git a/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/TracerSharedState.java b/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/TracerSharedState.java index fc80f00ec4..07b3df2949 100644 --- a/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/TracerSharedState.java +++ b/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/TracerSharedState.java @@ -21,6 +21,7 @@ import io.opentelemetry.sdk.resources.Resource; import io.opentelemetry.sdk.trace.config.TraceConfig; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; import javax.annotation.concurrent.GuardedBy; // Represents the shared state/config between all Tracers created by the same TracerProvider. @@ -113,7 +114,7 @@ final class TracerSharedState { if (isStopped) { return; } - activeSpanProcessor.shutdown(); + activeSpanProcessor.shutdown().join(10, TimeUnit.SECONDS); isStopped = true; } } diff --git a/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java b/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java index 484e5d68cf..4ceb19cc1c 100644 --- a/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java +++ b/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java @@ -23,8 +23,8 @@ import io.opentelemetry.internal.Utils; import io.opentelemetry.metrics.LongCounter; import io.opentelemetry.metrics.LongCounter.BoundLongCounter; import io.opentelemetry.metrics.Meter; +import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.common.DaemonThreadFactory; -import io.opentelemetry.sdk.common.export.CompletableResultCode; import io.opentelemetry.sdk.common.export.ConfigBuilder; import io.opentelemetry.sdk.trace.ReadWriteSpan; import io.opentelemetry.sdk.trace.ReadableSpan; @@ -123,18 +123,12 @@ public final class BatchSpanProcessor implements SpanProcessor { } @Override - public void shutdown() { - worker.shutdown(); + public CompletableResultCode shutdown() { + return worker.shutdown(); } @Override - public void forceFlush() { - worker.forceFlush(); - } - - // TODO remove this when this.forceFlush returns CompletableResultCode - @VisibleForTesting - CompletableResultCode flush() { + public CompletableResultCode forceFlush() { return worker.forceFlush(); } @@ -241,15 +235,31 @@ public final class BatchSpanProcessor implements SpanProcessor { nextExportTime = System.nanoTime() + scheduleDelayNanos; } - private void shutdown() { - long pendingBatchesCountInQueue = queue.size() / maxExportBatchSize + 1L; - long pendingBatchesCount = pendingBatchesCountInQueue + 1; - long shutdownTimeout = pendingBatchesCount * exporterTimeoutMillis; + private CompletableResultCode shutdown() { + final CompletableResultCode result = new CompletableResultCode(); - forceFlush().join(shutdownTimeout, TimeUnit.MILLISECONDS); + final CompletableResultCode flushResult = forceFlush(); + flushResult.whenComplete( + new Runnable() { + @Override + public void run() { + continueWork = false; + final CompletableResultCode shutdownResult = spanExporter.shutdown(); + shutdownResult.whenComplete( + new Runnable() { + @Override + public void run() { + if (!flushResult.isSuccess() || !shutdownResult.isSuccess()) { + result.fail(); + } else { + result.succeed(); + } + } + }); + } + }); - spanExporter.shutdown(); - continueWork = false; + return result; } private CompletableResultCode forceFlush() { diff --git a/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/export/MultiSpanExporter.java b/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/export/MultiSpanExporter.java index 4246df349e..4d90d17507 100644 --- a/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/export/MultiSpanExporter.java +++ b/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/export/MultiSpanExporter.java @@ -16,11 +16,11 @@ package io.opentelemetry.sdk.trace.export; -import io.opentelemetry.sdk.common.export.CompletableResultCode; +import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.trace.data.SpanData; +import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import java.util.logging.Logger; @@ -33,6 +33,7 @@ import java.util.logging.Logger; */ public final class MultiSpanExporter implements SpanExporter { private static final Logger logger = Logger.getLogger(MultiSpanExporter.class.getName()); + private final SpanExporter[] spanExporters; /** @@ -47,19 +48,20 @@ public final class MultiSpanExporter implements SpanExporter { @Override public CompletableResultCode export(Collection spans) { - final CompletableResultCode compositeResultCode = new CompletableResultCode(); - final AtomicInteger completionsToProcess = new AtomicInteger(spanExporters.length); + List results = new ArrayList<>(spanExporters.length); for (SpanExporter spanExporter : spanExporters) { + final CompletableResultCode exportResult; try { - final CompletableResultCode singleResult = spanExporter.export(spans); - mergeResultCode(compositeResultCode, singleResult, completionsToProcess); + exportResult = spanExporter.export(spans); } catch (Exception e) { // If an exception was thrown by the exporter logger.log(Level.WARNING, "Exception thrown by the export.", e); - compositeResultCode.fail(); + results.add(CompletableResultCode.ofFailure()); + continue; } + results.add(exportResult); } - return compositeResultCode; + return CompletableResultCode.ofAll(results); } /** @@ -69,45 +71,38 @@ public final class MultiSpanExporter implements SpanExporter { */ @Override public CompletableResultCode flush() { - final CompletableResultCode compositeResultCode = new CompletableResultCode(); - final AtomicInteger completionsToProcess = new AtomicInteger(spanExporters.length); + List results = new ArrayList<>(spanExporters.length); for (SpanExporter spanExporter : spanExporters) { + final CompletableResultCode flushResult; try { - mergeResultCode(compositeResultCode, spanExporter.flush(), completionsToProcess); + flushResult = spanExporter.flush(); } catch (Exception e) { // If an exception was thrown by the exporter logger.log(Level.WARNING, "Exception thrown by the flush.", e); - compositeResultCode.fail(); + results.add(CompletableResultCode.ofFailure()); + continue; } + results.add(flushResult); } - return compositeResultCode; + return CompletableResultCode.ofAll(results); } @Override - public void shutdown() { + public CompletableResultCode shutdown() { + List results = new ArrayList<>(spanExporters.length); for (SpanExporter spanExporter : spanExporters) { - spanExporter.shutdown(); + final CompletableResultCode shutdownResult; + try { + shutdownResult = spanExporter.shutdown(); + } catch (Exception e) { + // If an exception was thrown by the exporter + logger.log(Level.WARNING, "Exception thrown by the shutdown.", e); + results.add(CompletableResultCode.ofFailure()); + continue; + } + results.add(shutdownResult); } - } - - private static void mergeResultCode( - final CompletableResultCode compositeResultCode, - final CompletableResultCode singleResultCode, - final AtomicInteger completionsToProcess) { - singleResultCode.whenComplete( - new Runnable() { - @Override - public void run() { - int completionsRemaining = completionsToProcess.decrementAndGet(); - if (singleResultCode.isSuccess()) { - if (completionsRemaining == 0) { - compositeResultCode.succeed(); - } - } else { - compositeResultCode.fail(); - } - } - }); + return CompletableResultCode.ofAll(results); } private MultiSpanExporter(List spanExporters) { diff --git a/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/export/SimpleSpanProcessor.java b/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/export/SimpleSpanProcessor.java index 7db54fbaa5..323940ecd1 100644 --- a/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/export/SimpleSpanProcessor.java +++ b/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/export/SimpleSpanProcessor.java @@ -17,7 +17,7 @@ package io.opentelemetry.sdk.trace.export; import com.google.common.annotations.VisibleForTesting; -import io.opentelemetry.sdk.common.export.CompletableResultCode; +import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.common.export.ConfigBuilder; import io.opentelemetry.sdk.trace.ReadWriteSpan; import io.opentelemetry.sdk.trace.ReadableSpan; @@ -102,13 +102,14 @@ public final class SimpleSpanProcessor implements SpanProcessor { } @Override - public void shutdown() { - spanExporter.shutdown(); + public CompletableResultCode shutdown() { + return spanExporter.shutdown(); } @Override - public void forceFlush() { + public CompletableResultCode forceFlush() { // Do nothing. + return CompletableResultCode.ofSuccess(); } /** diff --git a/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/export/SpanExporter.java b/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/export/SpanExporter.java index fec6e78a5f..d693eb1a70 100644 --- a/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/export/SpanExporter.java +++ b/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/export/SpanExporter.java @@ -16,7 +16,7 @@ package io.opentelemetry.sdk.trace.export; -import io.opentelemetry.sdk.common.export.CompletableResultCode; +import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.trace.TracerSdkProvider; import io.opentelemetry.sdk.trace.data.SpanData; import java.util.Collection; @@ -53,6 +53,8 @@ public interface SpanExporter { /** * Called when {@link TracerSdkProvider#shutdown()} is called, if this {@code SpanExporter} is * register to a {@code TracerSdkProvider} object. + * + * @return a {@link CompletableResultCode} which is completed when shutdown completes. */ - void shutdown(); + CompletableResultCode shutdown(); } diff --git a/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/MultiSpanProcessorTest.java b/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/MultiSpanProcessorTest.java index 1f1b53dc12..1f44a1f556 100644 --- a/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/MultiSpanProcessorTest.java +++ b/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/MultiSpanProcessorTest.java @@ -24,6 +24,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import io.opentelemetry.sdk.common.CompletableResultCode; import java.util.Arrays; import java.util.Collections; import org.junit.jupiter.api.BeforeEach; @@ -42,8 +43,12 @@ class MultiSpanProcessorTest { MockitoAnnotations.initMocks(this); when(spanProcessor1.isStartRequired()).thenReturn(true); when(spanProcessor1.isEndRequired()).thenReturn(true); + when(spanProcessor1.forceFlush()).thenReturn(CompletableResultCode.ofSuccess()); + when(spanProcessor1.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); when(spanProcessor2.isStartRequired()).thenReturn(true); when(spanProcessor2.isEndRequired()).thenReturn(true); + when(spanProcessor2.forceFlush()).thenReturn(CompletableResultCode.ofSuccess()); + when(spanProcessor2.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); } @Test diff --git a/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/StressTestRunner.java b/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/StressTestRunner.java index c45c70092e..350018fe09 100644 --- a/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/StressTestRunner.java +++ b/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/StressTestRunner.java @@ -60,7 +60,8 @@ abstract class StressTestRunner { for (Thread thread : operationThreads) { Uninterruptibles.joinUninterruptibly(thread); } - getSpanProcessor().shutdown(); + + getSpanProcessor().shutdown().join(1, TimeUnit.MINUTES); } static Builder builder() { diff --git a/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/TracerSdkProviderTest.java b/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/TracerSdkProviderTest.java index b13e0f0858..b165dce4d5 100644 --- a/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/TracerSdkProviderTest.java +++ b/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/TracerSdkProviderTest.java @@ -19,8 +19,10 @@ package io.opentelemetry.sdk.trace; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import io.opentelemetry.sdk.common.Clock; +import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; import io.opentelemetry.sdk.resources.Resource; import io.opentelemetry.sdk.trace.config.TraceConfig; @@ -28,18 +30,24 @@ import io.opentelemetry.trace.DefaultSpan; import io.opentelemetry.trace.Span; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.Mockito; -import org.mockito.MockitoAnnotations; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; /** Unit tests for {@link TracerSdkProvider}. */ +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) class TracerSdkProviderTest { @Mock private SpanProcessor spanProcessor; private final TracerSdkProvider tracerFactory = TracerSdkProvider.builder().build(); @BeforeEach void setUp() { - MockitoAnnotations.initMocks(this); + when(spanProcessor.forceFlush()).thenReturn(CompletableResultCode.ofSuccess()); + when(spanProcessor.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); tracerFactory.addSpanProcessor(spanProcessor); } diff --git a/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/TracerSdkTest.java b/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/TracerSdkTest.java index e6d0d79a43..ec51015e93 100644 --- a/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/TracerSdkTest.java +++ b/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/TracerSdkTest.java @@ -21,8 +21,8 @@ import static org.assertj.core.api.Assertions.assertThat; import io.grpc.Context; import io.opentelemetry.common.AttributeValue; import io.opentelemetry.context.Scope; +import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.common.InstrumentationLibraryInfo; -import io.opentelemetry.sdk.common.export.CompletableResultCode; import io.opentelemetry.sdk.trace.StressTestRunner.OperationUpdater; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.export.BatchSpanProcessor; @@ -185,13 +185,15 @@ class TracerSdkTest { } @Override - public void shutdown() { + public CompletableResultCode shutdown() { // no-op + return CompletableResultCode.ofSuccess(); } @Override - public void forceFlush() { + public CompletableResultCode forceFlush() { // no-op + return CompletableResultCode.ofSuccess(); } } @@ -229,8 +231,9 @@ class TracerSdkTest { } @Override - public void shutdown() { + public CompletableResultCode shutdown() { // no-op + return CompletableResultCode.ofSuccess(); } } } diff --git a/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorTest.java b/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorTest.java index 25bb292beb..bb191fe410 100644 --- a/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorTest.java +++ b/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorTest.java @@ -19,8 +19,9 @@ package io.opentelemetry.sdk.trace.export; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.when; -import io.opentelemetry.sdk.common.export.CompletableResultCode; +import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.common.export.ConfigBuilderTest.ConfigTester; import io.opentelemetry.sdk.trace.ReadableSpan; import io.opentelemetry.sdk.trace.Samplers; @@ -208,7 +209,7 @@ class BatchSpanProcessorTest { assertThat(exported).isNotNull(); assertThat(exported.size()).isEqualTo(98); - batchSpanProcessor.flush().join(10, TimeUnit.SECONDS); + batchSpanProcessor.forceFlush().join(10, TimeUnit.SECONDS); exported = waitingSpanExporter.getExported(); assertThat(exported).isNotNull(); assertThat(exported.size()).isEqualTo(2); @@ -450,6 +451,24 @@ class BatchSpanProcessorTest { assertThat(waitingSpanExporter.shutDownCalled.get()).isTrue(); } + @Test + void shutdownPropagatesSuccess() { + when(mockServiceHandler.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); + BatchSpanProcessor processor = BatchSpanProcessor.newBuilder(mockServiceHandler).build(); + CompletableResultCode result = processor.shutdown(); + result.join(1, TimeUnit.SECONDS); + assertThat(result.isSuccess()).isTrue(); + } + + @Test + void shutdownPropagatesFailure() { + when(mockServiceHandler.shutdown()).thenReturn(CompletableResultCode.ofFailure()); + BatchSpanProcessor processor = BatchSpanProcessor.newBuilder(mockServiceHandler).build(); + CompletableResultCode result = processor.shutdown(); + result.join(1, TimeUnit.SECONDS); + assertThat(result.isSuccess()).isFalse(); + } + private static final class BlockingSpanExporter implements SpanExporter { final Object monitor = new Object(); @@ -498,8 +517,9 @@ class BatchSpanProcessorTest { } @Override - public void shutdown() { + public CompletableResultCode shutdown() { // Do nothing; + return CompletableResultCode.ofSuccess(); } private void unblock() { @@ -548,8 +568,8 @@ class BatchSpanProcessorTest { } @Override - public void shutdown() { - flush(); + public CompletableResultCode shutdown() { + return flush(); } } @@ -613,8 +633,9 @@ class BatchSpanProcessorTest { } @Override - public void shutdown() { + public CompletableResultCode shutdown() { shutDownCalled.set(true); + return CompletableResultCode.ofSuccess(); } public void reset() { diff --git a/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/export/MultiSpanExporterTest.java b/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/export/MultiSpanExporterTest.java index 5d6e861dcc..6e5fa14a8b 100644 --- a/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/export/MultiSpanExporterTest.java +++ b/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/export/MultiSpanExporterTest.java @@ -18,32 +18,29 @@ package io.opentelemetry.sdk.trace.export; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.same; +import static org.mockito.Mockito.when; -import io.opentelemetry.sdk.common.export.CompletableResultCode; +import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.trace.TestUtils; import io.opentelemetry.sdk.trace.data.SpanData; import java.util.Arrays; import java.util.Collections; import java.util.List; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.ArgumentMatchers; import org.mockito.Mock; import org.mockito.Mockito; -import org.mockito.MockitoAnnotations; +import org.mockito.junit.jupiter.MockitoExtension; /** Unit tests for {@link MultiSpanExporterTest}. */ +@ExtendWith(MockitoExtension.class) class MultiSpanExporterTest { @Mock private SpanExporter spanExporter1; @Mock private SpanExporter spanExporter2; private static final List SPAN_LIST = Collections.singletonList(TestUtils.makeBasicSpan()); - @BeforeEach - void setUp() { - MockitoAnnotations.initMocks(this); - } - @Test void empty() { SpanExporter multiSpanExporter = MultiSpanExporter.create(Collections.emptyList()); @@ -56,15 +53,15 @@ class MultiSpanExporterTest { SpanExporter multiSpanExporter = MultiSpanExporter.create(Collections.singletonList(spanExporter1)); - Mockito.when(spanExporter1.export(same(SPAN_LIST))) - .thenReturn(CompletableResultCode.ofSuccess()); + when(spanExporter1.export(same(SPAN_LIST))).thenReturn(CompletableResultCode.ofSuccess()); assertThat(multiSpanExporter.export(SPAN_LIST).isSuccess()).isTrue(); Mockito.verify(spanExporter1).export(same(SPAN_LIST)); - Mockito.when(spanExporter1.flush()).thenReturn(CompletableResultCode.ofSuccess()); + when(spanExporter1.flush()).thenReturn(CompletableResultCode.ofSuccess()); assertThat(multiSpanExporter.flush().isSuccess()).isTrue(); Mockito.verify(spanExporter1).flush(); + when(spanExporter1.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); multiSpanExporter.shutdown(); Mockito.verify(spanExporter1).shutdown(); } @@ -74,20 +71,20 @@ class MultiSpanExporterTest { SpanExporter multiSpanExporter = MultiSpanExporter.create(Arrays.asList(spanExporter1, spanExporter2)); - Mockito.when(spanExporter1.export(same(SPAN_LIST))) - .thenReturn(CompletableResultCode.ofSuccess()); - Mockito.when(spanExporter2.export(same(SPAN_LIST))) - .thenReturn(CompletableResultCode.ofSuccess()); + when(spanExporter1.export(same(SPAN_LIST))).thenReturn(CompletableResultCode.ofSuccess()); + when(spanExporter2.export(same(SPAN_LIST))).thenReturn(CompletableResultCode.ofSuccess()); assertThat(multiSpanExporter.export(SPAN_LIST).isSuccess()).isTrue(); Mockito.verify(spanExporter1).export(same(SPAN_LIST)); Mockito.verify(spanExporter2).export(same(SPAN_LIST)); - Mockito.when(spanExporter1.flush()).thenReturn(CompletableResultCode.ofSuccess()); - Mockito.when(spanExporter2.flush()).thenReturn(CompletableResultCode.ofSuccess()); + when(spanExporter1.flush()).thenReturn(CompletableResultCode.ofSuccess()); + when(spanExporter2.flush()).thenReturn(CompletableResultCode.ofSuccess()); assertThat(multiSpanExporter.flush().isSuccess()).isTrue(); Mockito.verify(spanExporter1).flush(); Mockito.verify(spanExporter2).flush(); + when(spanExporter1.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); + when(spanExporter2.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); multiSpanExporter.shutdown(); Mockito.verify(spanExporter1).shutdown(); Mockito.verify(spanExporter2).shutdown(); @@ -98,19 +95,23 @@ class MultiSpanExporterTest { SpanExporter multiSpanExporter = MultiSpanExporter.create(Arrays.asList(spanExporter1, spanExporter2)); - Mockito.when(spanExporter1.export(same(SPAN_LIST))) - .thenReturn(CompletableResultCode.ofSuccess()); - Mockito.when(spanExporter2.export(same(SPAN_LIST))) - .thenReturn(CompletableResultCode.ofFailure()); + when(spanExporter1.export(same(SPAN_LIST))).thenReturn(CompletableResultCode.ofSuccess()); + when(spanExporter2.export(same(SPAN_LIST))).thenReturn(CompletableResultCode.ofFailure()); assertThat(multiSpanExporter.export(SPAN_LIST).isSuccess()).isFalse(); Mockito.verify(spanExporter1).export(same(SPAN_LIST)); Mockito.verify(spanExporter2).export(same(SPAN_LIST)); - Mockito.when(spanExporter1.flush()).thenReturn(CompletableResultCode.ofSuccess()); - Mockito.when(spanExporter2.flush()).thenReturn(CompletableResultCode.ofFailure()); + when(spanExporter1.flush()).thenReturn(CompletableResultCode.ofSuccess()); + when(spanExporter2.flush()).thenReturn(CompletableResultCode.ofFailure()); assertThat(multiSpanExporter.flush().isSuccess()).isFalse(); Mockito.verify(spanExporter1).flush(); Mockito.verify(spanExporter2).flush(); + + when(spanExporter1.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); + when(spanExporter2.shutdown()).thenReturn(CompletableResultCode.ofFailure()); + assertThat(multiSpanExporter.shutdown().isSuccess()).isFalse(); + Mockito.verify(spanExporter1).shutdown(); + Mockito.verify(spanExporter2).shutdown(); } @Test @@ -121,16 +122,23 @@ class MultiSpanExporterTest { Mockito.doThrow(new IllegalArgumentException("No export for you.")) .when(spanExporter1) .export(ArgumentMatchers.anyList()); - Mockito.when(spanExporter2.export(same(SPAN_LIST))) - .thenReturn(CompletableResultCode.ofSuccess()); + when(spanExporter2.export(same(SPAN_LIST))).thenReturn(CompletableResultCode.ofSuccess()); assertThat(multiSpanExporter.export(SPAN_LIST).isSuccess()).isFalse(); Mockito.verify(spanExporter1).export(same(SPAN_LIST)); Mockito.verify(spanExporter2).export(same(SPAN_LIST)); Mockito.doThrow(new IllegalArgumentException("No flush for you.")).when(spanExporter1).flush(); - Mockito.when(spanExporter2.flush()).thenReturn(CompletableResultCode.ofSuccess()); + when(spanExporter2.flush()).thenReturn(CompletableResultCode.ofSuccess()); assertThat(multiSpanExporter.flush().isSuccess()).isFalse(); Mockito.verify(spanExporter1).flush(); Mockito.verify(spanExporter2).flush(); + + Mockito.doThrow(new IllegalArgumentException("No shutdown for you.")) + .when(spanExporter1) + .shutdown(); + when(spanExporter2.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); + assertThat(multiSpanExporter.shutdown().isSuccess()).isFalse(); + Mockito.verify(spanExporter1).shutdown(); + Mockito.verify(spanExporter2).shutdown(); } } diff --git a/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/export/SimpleSpanProcessorTest.java b/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/export/SimpleSpanProcessorTest.java index 7c0e9da3c6..3d949b108d 100644 --- a/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/export/SimpleSpanProcessorTest.java +++ b/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/export/SimpleSpanProcessorTest.java @@ -22,7 +22,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; -import io.opentelemetry.sdk.common.export.CompletableResultCode; +import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.common.export.ConfigBuilderTest.ConfigTester; import io.opentelemetry.sdk.trace.ReadWriteSpan; import io.opentelemetry.sdk.trace.ReadableSpan; @@ -45,10 +45,15 @@ import java.util.Map; import java.util.Properties; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; -import org.mockito.MockitoAnnotations; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; /** Unit tests for {@link SimpleSpanProcessor}. */ +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) class SimpleSpanProcessorTest { private static final long MAX_SCHEDULE_DELAY_MILLIS = 500; private static final String SPAN_NAME = "MySpanName"; @@ -69,7 +74,6 @@ class SimpleSpanProcessorTest { @BeforeEach void setUp() { - MockitoAnnotations.initMocks(this); simpleSampledSpansProcessor = SimpleSpanProcessor.newBuilder(spanExporter).build(); } diff --git a/sdk_extensions/async_processor/src/main/java/io/opentelemetry/sdk/extensions/trace/export/DisruptorAsyncSpanProcessor.java b/sdk_extensions/async_processor/src/main/java/io/opentelemetry/sdk/extensions/trace/export/DisruptorAsyncSpanProcessor.java index 8496c93ec7..20800058fe 100644 --- a/sdk_extensions/async_processor/src/main/java/io/opentelemetry/sdk/extensions/trace/export/DisruptorAsyncSpanProcessor.java +++ b/sdk_extensions/async_processor/src/main/java/io/opentelemetry/sdk/extensions/trace/export/DisruptorAsyncSpanProcessor.java @@ -19,6 +19,7 @@ package io.opentelemetry.sdk.extensions.trace.export; import com.google.common.base.Preconditions; import com.lmax.disruptor.SleepingWaitStrategy; import com.lmax.disruptor.WaitStrategy; +import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.common.export.ConfigBuilder; import io.opentelemetry.sdk.trace.ReadWriteSpan; import io.opentelemetry.sdk.trace.ReadableSpan; @@ -91,13 +92,13 @@ public final class DisruptorAsyncSpanProcessor implements SpanProcessor { } @Override - public void shutdown() { - disruptorEventQueue.shutdown(); + public CompletableResultCode shutdown() { + return disruptorEventQueue.shutdown(); } @Override - public void forceFlush() { - disruptorEventQueue.forceFlush(); + public CompletableResultCode forceFlush() { + return disruptorEventQueue.forceFlush(); } /** diff --git a/sdk_extensions/async_processor/src/main/java/io/opentelemetry/sdk/extensions/trace/export/DisruptorEventQueue.java b/sdk_extensions/async_processor/src/main/java/io/opentelemetry/sdk/extensions/trace/export/DisruptorEventQueue.java index 93b814b69f..e5ab322662 100644 --- a/sdk_extensions/async_processor/src/main/java/io/opentelemetry/sdk/extensions/trace/export/DisruptorEventQueue.java +++ b/sdk_extensions/async_processor/src/main/java/io/opentelemetry/sdk/extensions/trace/export/DisruptorEventQueue.java @@ -23,11 +23,11 @@ import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.WaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; +import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.common.DaemonThreadFactory; import io.opentelemetry.sdk.trace.ReadWriteSpan; import io.opentelemetry.sdk.trace.ReadableSpan; import io.opentelemetry.sdk.trace.SpanProcessor; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -42,17 +42,19 @@ import javax.annotation.concurrent.ThreadSafe; final class DisruptorEventQueue { private static final Logger logger = Logger.getLogger(DisruptorEventQueue.class.getName()); private static final String WORKER_THREAD_NAME = "DisruptorEventQueue_WorkerThread"; - private static final EventTranslatorThreeArg + + private static final EventTranslatorThreeArg< + DisruptorEvent, EventType, Object, CompletableResultCode> TRANSLATOR_THREE_ARG = - new EventTranslatorThreeArg() { + new EventTranslatorThreeArg() { @Override public void translateTo( DisruptorEvent event, long sequence, EventType eventType, Object span, - CountDownLatch countDownLatch) { - event.setEntry(eventType, span, countDownLatch); + CompletableResultCode result) { + event.setEntry(eventType, span, result); } }; private static final EventFactory EVENT_FACTORY = @@ -68,12 +70,6 @@ final class DisruptorEventQueue { private volatile boolean isShutdown = false; private final boolean blocking; - /** - * Only one consumer for {@link DisruptorEventQueue#forceFlush()} and {@link - * DisruptorEventQueue#shutdown()} invocation. - */ - private static final byte NUM_CONSUMERS = 1; - private enum EventType { ON_START, ON_END, @@ -121,47 +117,41 @@ final class DisruptorEventQueue { // Shuts down the underlying disruptor. Ensures that when this method returns the disruptor is // shutdown. - void shutdown() { + CompletableResultCode shutdown() { synchronized (this) { if (isShutdown) { // Race condition between two calls to shutdown. The other call already finished. - return; + return CompletableResultCode.ofSuccess(); } isShutdown = true; - enqueueAndLock(EventType.ON_SHUTDOWN); + return enqueueWithResult(EventType.ON_SHUTDOWN); } } // Force to publish the ended spans to the SpanProcessor - void forceFlush() { + CompletableResultCode forceFlush() { if (isShutdown) { if (!loggedShutdownMessage.getAndSet(true)) { logger.info("Attempted to flush after Disruptor shutdown."); } - return; + return CompletableResultCode.ofFailure(); } - enqueueAndLock(EventType.ON_FORCE_FLUSH); + return enqueueWithResult(EventType.ON_FORCE_FLUSH); } - private void enqueueAndLock(EventType event) { - CountDownLatch waitingCounter = new CountDownLatch(NUM_CONSUMERS); // only one processor. - enqueue(event, null, waitingCounter); - try { - waitingCounter.await(); - } catch (InterruptedException e) { - // Preserve the interruption. - Thread.currentThread().interrupt(); - logger.warning("Thread interrupted, shutdown may not finished."); - } + private CompletableResultCode enqueueWithResult(EventType event) { + CompletableResultCode result = new CompletableResultCode(); + enqueue(event, null, result); + return result; } // Enqueues an event on the {@link DisruptorEventQueue}. - private void enqueue(EventType eventType, Object span, CountDownLatch flushLatch) { + private void enqueue(EventType eventType, Object span, CompletableResultCode result) { if (blocking) { - ringBuffer.publishEvent(TRANSLATOR_THREE_ARG, eventType, span, flushLatch); + ringBuffer.publishEvent(TRANSLATOR_THREE_ARG, eventType, span, result); } else { // TODO: Record metrics if element not added. - ringBuffer.tryPublishEvent(TRANSLATOR_THREE_ARG, eventType, span, flushLatch); + ringBuffer.tryPublishEvent(TRANSLATOR_THREE_ARG, eventType, span, result); } } @@ -169,13 +159,15 @@ final class DisruptorEventQueue { private static final class DisruptorEvent { @Nullable private Object span = null; @Nullable private EventType eventType = null; - @Nullable private CountDownLatch waitingCounter = null; + @Nullable private CompletableResultCode result = null; void setEntry( - @Nullable EventType eventType, @Nullable Object span, @Nullable CountDownLatch flushLatch) { + @Nullable EventType eventType, + @Nullable Object span, + @Nullable CompletableResultCode result) { this.span = span; this.eventType = eventType; - this.waitingCounter = flushLatch; + this.result = result; } @Nullable @@ -188,9 +180,15 @@ final class DisruptorEventQueue { return eventType; } - void countDownWaitingCounter() { - if (waitingCounter != null) { - waitingCounter.countDown(); + void succeed() { + if (result != null) { + result.succeed(); + } + } + + void fail() { + if (result != null) { + result.fail(); } } } @@ -203,7 +201,7 @@ final class DisruptorEventQueue { } @Override - public void onEvent(DisruptorEvent event, long sequence, boolean endOfBatch) { + public void onEvent(final DisruptorEvent event, long sequence, boolean endOfBatch) { final Object readableSpan = event.getSpan(); final EventType eventType = event.getEventType(); if (eventType == null) { @@ -219,12 +217,11 @@ final class DisruptorEventQueue { spanProcessor.onEnd((ReadableSpan) readableSpan); break; case ON_SHUTDOWN: - spanProcessor.shutdown(); - event.countDownWaitingCounter(); + propagateResult(spanProcessor.shutdown(), event); break; case ON_FORCE_FLUSH: - spanProcessor.forceFlush(); - event.countDownWaitingCounter(); + propagateResult(spanProcessor.forceFlush(), event); + break; } } finally { @@ -233,4 +230,19 @@ final class DisruptorEventQueue { } } } + + private static void propagateResult( + final CompletableResultCode result, final DisruptorEvent event) { + result.whenComplete( + new Runnable() { + @Override + public void run() { + if (result.isSuccess()) { + event.succeed(); + } else { + event.fail(); + } + } + }); + } } diff --git a/sdk_extensions/async_processor/src/test/java/io/opentelemetry/sdk/extensions/trace/export/DisruptorAsyncSpanProcessorTest.java b/sdk_extensions/async_processor/src/test/java/io/opentelemetry/sdk/extensions/trace/export/DisruptorAsyncSpanProcessorTest.java index 2234bf1a0f..5b9656e90d 100644 --- a/sdk_extensions/async_processor/src/test/java/io/opentelemetry/sdk/extensions/trace/export/DisruptorAsyncSpanProcessorTest.java +++ b/sdk_extensions/async_processor/src/test/java/io/opentelemetry/sdk/extensions/trace/export/DisruptorAsyncSpanProcessorTest.java @@ -18,6 +18,7 @@ package io.opentelemetry.sdk.extensions.trace.export; import static org.assertj.core.api.Assertions.assertThat; +import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.common.export.ConfigBuilder; import io.opentelemetry.sdk.trace.MultiSpanProcessor; import io.opentelemetry.sdk.trace.ReadWriteSpan; @@ -26,14 +27,16 @@ import io.opentelemetry.sdk.trace.SpanProcessor; import java.util.Arrays; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.Mockito; -import org.mockito.MockitoAnnotations; +import org.mockito.junit.jupiter.MockitoExtension; /** Unit tests for {@link DisruptorAsyncSpanProcessor}. */ +@ExtendWith(MockitoExtension.class) class DisruptorAsyncSpanProcessorTest { private static final boolean REQUIRED = true; private static final boolean NOT_REQUIRED = false; @@ -41,11 +44,6 @@ class DisruptorAsyncSpanProcessorTest { @Mock private ReadableSpan readableSpan; @Mock private ReadWriteSpan readWriteSpan; - @BeforeEach - void setUp() { - MockitoAnnotations.initMocks(this); - } - // EventQueueEntry for incrementing a Counter. private static class IncrementSpanProcessor implements SpanProcessor { private final AtomicInteger counterOnStart = new AtomicInteger(0); @@ -85,15 +83,17 @@ class DisruptorAsyncSpanProcessorTest { } @Override - public void shutdown() { + public CompletableResultCode shutdown() { counterOnShutdown.incrementAndGet(); + return CompletableResultCode.ofSuccess(); } @Override - public void forceFlush() { + public CompletableResultCode forceFlush() { counterOnForceFlush.incrementAndGet(); deltaExportedForceFlushSpans.set(counterEndSpans.getAndSet(0)); counterOnExportedForceFlushSpans.addAndGet(deltaExportedForceFlushSpans.get()); + return CompletableResultCode.ofSuccess(); } private int getCounterOnStart() { @@ -132,8 +132,8 @@ class DisruptorAsyncSpanProcessorTest { assertThat(incrementSpanProcessor.getCounterOnEnd()).isEqualTo(0); disruptorAsyncSpanProcessor.onStart(readWriteSpan); disruptorAsyncSpanProcessor.onEnd(readableSpan); - disruptorAsyncSpanProcessor.forceFlush(); - disruptorAsyncSpanProcessor.shutdown(); + disruptorAsyncSpanProcessor.forceFlush().join(10, TimeUnit.SECONDS); + disruptorAsyncSpanProcessor.shutdown().join(10, TimeUnit.SECONDS); assertThat(incrementSpanProcessor.getCounterOnStart()).isEqualTo(1); assertThat(incrementSpanProcessor.getCounterOnEnd()).isEqualTo(1); assertThat(incrementSpanProcessor.getCounterOnForceFlush()).isEqualTo(1); @@ -152,8 +152,8 @@ class DisruptorAsyncSpanProcessorTest { assertThat(incrementSpanProcessor.getCounterOnEnd()).isEqualTo(0); disruptorAsyncSpanProcessor.onStart(readWriteSpan); disruptorAsyncSpanProcessor.onEnd(readableSpan); - disruptorAsyncSpanProcessor.forceFlush(); - disruptorAsyncSpanProcessor.shutdown(); + disruptorAsyncSpanProcessor.forceFlush().join(10, TimeUnit.SECONDS); + disruptorAsyncSpanProcessor.shutdown().join(10, TimeUnit.SECONDS); assertThat(incrementSpanProcessor.getCounterOnStart()).isEqualTo(0); assertThat(incrementSpanProcessor.getCounterOnEnd()).isEqualTo(1); assertThat(incrementSpanProcessor.getCounterOnForceFlush()).isEqualTo(1); @@ -172,8 +172,8 @@ class DisruptorAsyncSpanProcessorTest { assertThat(incrementSpanProcessor.getCounterOnEnd()).isEqualTo(0); disruptorAsyncSpanProcessor.onStart(readWriteSpan); disruptorAsyncSpanProcessor.onEnd(readableSpan); - disruptorAsyncSpanProcessor.forceFlush(); - disruptorAsyncSpanProcessor.shutdown(); + disruptorAsyncSpanProcessor.forceFlush().join(10, TimeUnit.SECONDS); + disruptorAsyncSpanProcessor.shutdown().join(10, TimeUnit.SECONDS); assertThat(incrementSpanProcessor.getCounterOnStart()).isEqualTo(1); assertThat(incrementSpanProcessor.getCounterOnEnd()).isEqualTo(0); assertThat(incrementSpanProcessor.getCounterOnForceFlush()).isEqualTo(1); @@ -185,11 +185,11 @@ class DisruptorAsyncSpanProcessorTest { IncrementSpanProcessor incrementSpanProcessor = new IncrementSpanProcessor(REQUIRED, REQUIRED); DisruptorAsyncSpanProcessor disruptorAsyncSpanProcessor = DisruptorAsyncSpanProcessor.newBuilder(incrementSpanProcessor).build(); - disruptorAsyncSpanProcessor.shutdown(); - disruptorAsyncSpanProcessor.shutdown(); - disruptorAsyncSpanProcessor.shutdown(); - disruptorAsyncSpanProcessor.shutdown(); - disruptorAsyncSpanProcessor.shutdown(); + disruptorAsyncSpanProcessor.shutdown().join(10, TimeUnit.SECONDS); + disruptorAsyncSpanProcessor.shutdown().join(10, TimeUnit.SECONDS); + disruptorAsyncSpanProcessor.shutdown().join(10, TimeUnit.SECONDS); + disruptorAsyncSpanProcessor.shutdown().join(10, TimeUnit.SECONDS); + disruptorAsyncSpanProcessor.shutdown().join(10, TimeUnit.SECONDS); assertThat(incrementSpanProcessor.getCounterOnShutdown()).isEqualTo(1); } @@ -198,14 +198,14 @@ class DisruptorAsyncSpanProcessorTest { IncrementSpanProcessor incrementSpanProcessor = new IncrementSpanProcessor(REQUIRED, REQUIRED); DisruptorAsyncSpanProcessor disruptorAsyncSpanProcessor = DisruptorAsyncSpanProcessor.newBuilder(incrementSpanProcessor).build(); - disruptorAsyncSpanProcessor.shutdown(); + disruptorAsyncSpanProcessor.shutdown().join(10, TimeUnit.SECONDS); disruptorAsyncSpanProcessor.onStart(readWriteSpan); disruptorAsyncSpanProcessor.onEnd(readableSpan); - disruptorAsyncSpanProcessor.forceFlush(); + disruptorAsyncSpanProcessor.forceFlush().join(10, TimeUnit.SECONDS); assertThat(incrementSpanProcessor.getCounterOnStart()).isEqualTo(0); assertThat(incrementSpanProcessor.getCounterOnEnd()).isEqualTo(0); assertThat(incrementSpanProcessor.getCounterOnForceFlush()).isEqualTo(0); - disruptorAsyncSpanProcessor.shutdown(); + disruptorAsyncSpanProcessor.shutdown().join(10, TimeUnit.SECONDS); assertThat(incrementSpanProcessor.getCounterOnShutdown()).isEqualTo(1); } @@ -219,13 +219,13 @@ class DisruptorAsyncSpanProcessorTest { disruptorAsyncSpanProcessor.onStart(readWriteSpan); disruptorAsyncSpanProcessor.onEnd(readableSpan); if (i % 10 == 0) { - disruptorAsyncSpanProcessor.forceFlush(); + disruptorAsyncSpanProcessor.forceFlush().join(10, TimeUnit.SECONDS); } } assertThat(incrementSpanProcessor.getCounterOnStart()).isEqualTo(tenK); assertThat(incrementSpanProcessor.getCounterOnEnd()).isEqualTo(tenK); assertThat(incrementSpanProcessor.getCounterOnForceFlush()).isEqualTo(tenK / 10); - disruptorAsyncSpanProcessor.shutdown(); + disruptorAsyncSpanProcessor.shutdown().join(10, TimeUnit.SECONDS); assertThat(incrementSpanProcessor.getCounterOnShutdown()).isEqualTo(1); } @@ -240,7 +240,7 @@ class DisruptorAsyncSpanProcessorTest { .build(); disruptorAsyncSpanProcessor.onStart(readWriteSpan); disruptorAsyncSpanProcessor.onEnd(readableSpan); - disruptorAsyncSpanProcessor.shutdown(); + disruptorAsyncSpanProcessor.shutdown().join(10, TimeUnit.SECONDS); assertThat(incrementSpanProcessor1.getCounterOnStart()).isEqualTo(1); assertThat(incrementSpanProcessor1.getCounterOnEnd()).isEqualTo(1); assertThat(incrementSpanProcessor1.getCounterOnShutdown()).isEqualTo(1); @@ -261,11 +261,11 @@ class DisruptorAsyncSpanProcessorTest { disruptorAsyncSpanProcessor.onStart(readWriteSpan); disruptorAsyncSpanProcessor.onEnd(readableSpan); if (i % 100 == 0) { - disruptorAsyncSpanProcessor.forceFlush(); + disruptorAsyncSpanProcessor.forceFlush().join(10, TimeUnit.SECONDS); assertThat(incrementSpanProcessor.getDeltaExportedForceFlushSpans()).isEqualTo(100); } } - disruptorAsyncSpanProcessor.shutdown(); + disruptorAsyncSpanProcessor.shutdown().join(10, TimeUnit.SECONDS); assertThat(incrementSpanProcessor.getCounterOnStart()).isEqualTo(tenK); assertThat(incrementSpanProcessor.getCounterOnEnd()).isEqualTo(tenK); assertThat(incrementSpanProcessor.getCounterOnExportedForceFlushSpans()).isEqualTo(tenK); diff --git a/sdk_extensions/zpages/src/main/java/io/opentelemetry/sdk/extensions/zpages/TracezSpanProcessor.java b/sdk_extensions/zpages/src/main/java/io/opentelemetry/sdk/extensions/zpages/TracezSpanProcessor.java index c952255b00..0f4e71de08 100644 --- a/sdk_extensions/zpages/src/main/java/io/opentelemetry/sdk/extensions/zpages/TracezSpanProcessor.java +++ b/sdk_extensions/zpages/src/main/java/io/opentelemetry/sdk/extensions/zpages/TracezSpanProcessor.java @@ -16,6 +16,7 @@ package io.opentelemetry.sdk.extensions.zpages; +import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.common.export.ConfigBuilder; import io.opentelemetry.sdk.trace.ReadWriteSpan; import io.opentelemetry.sdk.trace.ReadableSpan; @@ -89,13 +90,15 @@ final class TracezSpanProcessor implements SpanProcessor { } @Override - public void shutdown() { + public CompletableResultCode shutdown() { // Do nothing. + return CompletableResultCode.ofSuccess(); } @Override - public void forceFlush() { + public CompletableResultCode forceFlush() { // Do nothing. + return CompletableResultCode.ofSuccess(); } /**