Add shutdown / close to OpenTelemetrySdk (#5100)
* Add shutdown / close to OpenTelemetrySdk * Shutdown hook closes instead of shutdown
This commit is contained in:
parent
50e7a17309
commit
4df4a0ae09
|
@ -1,2 +1,5 @@
|
||||||
Comparing source compatibility of against
|
Comparing source compatibility of against
|
||||||
No changes.
|
*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.sdk.OpenTelemetrySdk (not serializable)
|
||||||
|
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
|
||||||
|
+++ NEW METHOD: PUBLIC(+) void close()
|
||||||
|
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.common.CompletableResultCode shutdown()
|
||||||
|
|
|
@ -10,7 +10,6 @@ import eu.rekawek.toxiproxy.ToxiproxyClient;
|
||||||
import eu.rekawek.toxiproxy.model.ToxicDirection;
|
import eu.rekawek.toxiproxy.model.ToxicDirection;
|
||||||
import eu.rekawek.toxiproxy.model.ToxicList;
|
import eu.rekawek.toxiproxy.model.ToxicList;
|
||||||
import eu.rekawek.toxiproxy.model.toxic.Timeout;
|
import eu.rekawek.toxiproxy.model.toxic.Timeout;
|
||||||
import io.opentelemetry.api.OpenTelemetry;
|
|
||||||
import io.opentelemetry.api.common.Attributes;
|
import io.opentelemetry.api.common.Attributes;
|
||||||
import io.opentelemetry.api.trace.Span;
|
import io.opentelemetry.api.trace.Span;
|
||||||
import io.opentelemetry.api.trace.Tracer;
|
import io.opentelemetry.api.trace.Tracer;
|
||||||
|
@ -106,9 +105,7 @@ public class OtlpPipelineStressTest {
|
||||||
|
|
||||||
private final InMemoryMetricExporter metricExporter = InMemoryMetricExporter.create();
|
private final InMemoryMetricExporter metricExporter = InMemoryMetricExporter.create();
|
||||||
|
|
||||||
private SdkTracerProvider sdkTracerProvider;
|
private OpenTelemetrySdk openTelemetry;
|
||||||
private OpenTelemetry openTelemetry;
|
|
||||||
private SdkMeterProvider meterProvider;
|
|
||||||
private Proxy collectorProxy;
|
private Proxy collectorProxy;
|
||||||
private ToxiproxyClient toxiproxyClient;
|
private ToxiproxyClient toxiproxyClient;
|
||||||
|
|
||||||
|
@ -134,8 +131,7 @@ public class OtlpPipelineStressTest {
|
||||||
|
|
||||||
@AfterEach
|
@AfterEach
|
||||||
void tearDown() throws IOException {
|
void tearDown() throws IOException {
|
||||||
meterProvider.close();
|
openTelemetry.close();
|
||||||
sdkTracerProvider.shutdown();
|
|
||||||
|
|
||||||
toxiproxyClient.reset();
|
toxiproxyClient.reset();
|
||||||
collectorProxy.delete();
|
collectorProxy.delete();
|
||||||
|
@ -188,7 +184,7 @@ public class OtlpPipelineStressTest {
|
||||||
|
|
||||||
Thread.sleep(10000);
|
Thread.sleep(10000);
|
||||||
List<MetricData> finishedMetricItems = metricExporter.getFinishedMetricItems();
|
List<MetricData> finishedMetricItems = metricExporter.getFinishedMetricItems();
|
||||||
meterProvider.close();
|
openTelemetry.getSdkMeterProvider().close();
|
||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
reportMetrics(finishedMetricItems);
|
reportMetrics(finishedMetricItems);
|
||||||
Thread.sleep(10000);
|
Thread.sleep(10000);
|
||||||
|
@ -250,7 +246,7 @@ public class OtlpPipelineStressTest {
|
||||||
.build());
|
.build());
|
||||||
|
|
||||||
// set up the metric exporter and wire it into the SDK and a timed reader.
|
// set up the metric exporter and wire it into the SDK and a timed reader.
|
||||||
meterProvider =
|
SdkMeterProvider meterProvider =
|
||||||
SdkMeterProvider.builder()
|
SdkMeterProvider.builder()
|
||||||
.setResource(resource)
|
.setResource(resource)
|
||||||
.registerMetricReader(
|
.registerMetricReader(
|
||||||
|
@ -281,7 +277,9 @@ public class OtlpPipelineStressTest {
|
||||||
SdkTracerProvider tracerProvider =
|
SdkTracerProvider tracerProvider =
|
||||||
SdkTracerProvider.builder().addSpanProcessor(spanProcessor).build();
|
SdkTracerProvider.builder().addSpanProcessor(spanProcessor).build();
|
||||||
openTelemetry =
|
openTelemetry =
|
||||||
OpenTelemetrySdk.builder().setTracerProvider(tracerProvider).buildAndRegisterGlobal();
|
OpenTelemetrySdk.builder()
|
||||||
sdkTracerProvider = tracerProvider;
|
.setTracerProvider(tracerProvider)
|
||||||
|
.setMeterProvider(meterProvider)
|
||||||
|
.buildAndRegisterGlobal();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,7 +17,6 @@ import io.opentelemetry.sdk.autoconfigure.spi.AutoConfigurationCustomizer;
|
||||||
import io.opentelemetry.sdk.autoconfigure.spi.AutoConfigurationCustomizerProvider;
|
import io.opentelemetry.sdk.autoconfigure.spi.AutoConfigurationCustomizerProvider;
|
||||||
import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
|
import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
|
||||||
import io.opentelemetry.sdk.autoconfigure.spi.internal.DefaultConfigProperties;
|
import io.opentelemetry.sdk.autoconfigure.spi.internal.DefaultConfigProperties;
|
||||||
import io.opentelemetry.sdk.common.CompletableResultCode;
|
|
||||||
import io.opentelemetry.sdk.logs.SdkLoggerProvider;
|
import io.opentelemetry.sdk.logs.SdkLoggerProvider;
|
||||||
import io.opentelemetry.sdk.logs.SdkLoggerProviderBuilder;
|
import io.opentelemetry.sdk.logs.SdkLoggerProviderBuilder;
|
||||||
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
|
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
|
||||||
|
@ -35,7 +34,6 @@ import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.ServiceLoader;
|
import java.util.ServiceLoader;
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.function.BiFunction;
|
import java.util.function.BiFunction;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
|
@ -365,19 +363,6 @@ public final class AutoConfiguredOpenTelemetrySdkBuilder implements AutoConfigur
|
||||||
loggerProviderBuilder = loggerProviderCustomizer.apply(loggerProviderBuilder, config);
|
loggerProviderBuilder = loggerProviderCustomizer.apply(loggerProviderBuilder, config);
|
||||||
SdkLoggerProvider loggerProvider = loggerProviderBuilder.build();
|
SdkLoggerProvider loggerProvider = loggerProviderBuilder.build();
|
||||||
|
|
||||||
if (registerShutdownHook) {
|
|
||||||
Runtime.getRuntime()
|
|
||||||
.addShutdownHook(
|
|
||||||
new Thread(
|
|
||||||
() -> {
|
|
||||||
List<CompletableResultCode> shutdown = new ArrayList<>();
|
|
||||||
shutdown.add(tracerProvider.shutdown());
|
|
||||||
shutdown.add(meterProvider.shutdown());
|
|
||||||
shutdown.add(loggerProvider.shutdown());
|
|
||||||
CompletableResultCode.ofAll(shutdown).join(10, TimeUnit.SECONDS);
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
|
|
||||||
ContextPropagators propagators =
|
ContextPropagators propagators =
|
||||||
PropagatorConfiguration.configurePropagators(
|
PropagatorConfiguration.configurePropagators(
|
||||||
config, serviceClassLoader, propagatorCustomizer);
|
config, serviceClassLoader, propagatorCustomizer);
|
||||||
|
@ -390,6 +375,10 @@ public final class AutoConfiguredOpenTelemetrySdkBuilder implements AutoConfigur
|
||||||
.setPropagators(propagators);
|
.setPropagators(propagators);
|
||||||
|
|
||||||
openTelemetrySdk = sdkBuilder.build();
|
openTelemetrySdk = sdkBuilder.build();
|
||||||
|
|
||||||
|
if (registerShutdownHook) {
|
||||||
|
Runtime.getRuntime().addShutdownHook(new Thread(openTelemetrySdk::close));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (setResultAsGlobal) {
|
if (setResultAsGlobal) {
|
||||||
|
|
|
@ -413,7 +413,7 @@ class AutoConfiguredOpenTelemetrySdkTest {
|
||||||
assertThat(spanData.getResource().getAttribute(stringKey("cat"))).isEqualTo("meow");
|
assertThat(spanData.getResource().getAttribute(stringKey("cat"))).isEqualTo("meow");
|
||||||
|
|
||||||
// Ensures the export happened.
|
// Ensures the export happened.
|
||||||
sdk.getSdkTracerProvider().shutdown().join(10, TimeUnit.SECONDS);
|
sdk.shutdown().join(10, TimeUnit.SECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -163,21 +163,7 @@ class FullConfigTest {
|
||||||
|
|
||||||
@AfterEach
|
@AfterEach
|
||||||
void afterEach() {
|
void afterEach() {
|
||||||
autoConfiguredOpenTelemetrySdk
|
autoConfiguredOpenTelemetrySdk.getOpenTelemetrySdk().shutdown().join(10, TimeUnit.SECONDS);
|
||||||
.getOpenTelemetrySdk()
|
|
||||||
.getSdkMeterProvider()
|
|
||||||
.shutdown()
|
|
||||||
.join(10, TimeUnit.SECONDS);
|
|
||||||
autoConfiguredOpenTelemetrySdk
|
|
||||||
.getOpenTelemetrySdk()
|
|
||||||
.getSdkLoggerProvider()
|
|
||||||
.shutdown()
|
|
||||||
.join(10, TimeUnit.SECONDS);
|
|
||||||
autoConfiguredOpenTelemetrySdk
|
|
||||||
.getOpenTelemetrySdk()
|
|
||||||
.getSdkTracerProvider()
|
|
||||||
.shutdown()
|
|
||||||
.join(10, TimeUnit.SECONDS);
|
|
||||||
GlobalOpenTelemetry.resetForTest();
|
GlobalOpenTelemetry.resetForTest();
|
||||||
GlobalLoggerProvider.resetForTest();
|
GlobalLoggerProvider.resetForTest();
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,14 +12,25 @@ import io.opentelemetry.api.trace.Tracer;
|
||||||
import io.opentelemetry.api.trace.TracerBuilder;
|
import io.opentelemetry.api.trace.TracerBuilder;
|
||||||
import io.opentelemetry.api.trace.TracerProvider;
|
import io.opentelemetry.api.trace.TracerProvider;
|
||||||
import io.opentelemetry.context.propagation.ContextPropagators;
|
import io.opentelemetry.context.propagation.ContextPropagators;
|
||||||
|
import io.opentelemetry.sdk.common.CompletableResultCode;
|
||||||
import io.opentelemetry.sdk.logs.SdkLoggerProvider;
|
import io.opentelemetry.sdk.logs.SdkLoggerProvider;
|
||||||
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
|
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
|
||||||
import io.opentelemetry.sdk.trace.SdkTracerProvider;
|
import io.opentelemetry.sdk.trace.SdkTracerProvider;
|
||||||
|
import java.io.Closeable;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.logging.Logger;
|
||||||
import javax.annotation.concurrent.ThreadSafe;
|
import javax.annotation.concurrent.ThreadSafe;
|
||||||
|
|
||||||
/** The SDK implementation of {@link OpenTelemetry}. */
|
/** The SDK implementation of {@link OpenTelemetry}. */
|
||||||
@ThreadSafe
|
@ThreadSafe
|
||||||
public final class OpenTelemetrySdk implements OpenTelemetry {
|
public final class OpenTelemetrySdk implements OpenTelemetry, Closeable {
|
||||||
|
|
||||||
|
private static final Logger LOGGER = Logger.getLogger(OpenTelemetrySdk.class.getName());
|
||||||
|
|
||||||
|
private final AtomicBoolean isShutdown = new AtomicBoolean(false);
|
||||||
private final ObfuscatedTracerProvider tracerProvider;
|
private final ObfuscatedTracerProvider tracerProvider;
|
||||||
private final ObfuscatedMeterProvider meterProvider;
|
private final ObfuscatedMeterProvider meterProvider;
|
||||||
private final SdkLoggerProvider loggerProvider;
|
private final SdkLoggerProvider loggerProvider;
|
||||||
|
@ -78,6 +89,29 @@ public final class OpenTelemetrySdk implements OpenTelemetry {
|
||||||
return propagators;
|
return propagators;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Shutdown the SDK. Calls {@link SdkTracerProvider#shutdown()}, {@link
|
||||||
|
* SdkMeterProvider#shutdown()}, and {@link SdkLoggerProvider#shutdown()}.
|
||||||
|
*
|
||||||
|
* @return a {@link CompletableResultCode} which completes when all providers are shutdown
|
||||||
|
*/
|
||||||
|
public CompletableResultCode shutdown() {
|
||||||
|
if (!isShutdown.compareAndSet(false, true)) {
|
||||||
|
LOGGER.info("Multiple shutdown calls");
|
||||||
|
return CompletableResultCode.ofSuccess();
|
||||||
|
}
|
||||||
|
List<CompletableResultCode> results = new ArrayList<>();
|
||||||
|
results.add(tracerProvider.unobfuscate().shutdown());
|
||||||
|
results.add(meterProvider.unobfuscate().shutdown());
|
||||||
|
results.add(loggerProvider.shutdown());
|
||||||
|
return CompletableResultCode.ofAll(results);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
shutdown().join(10, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "OpenTelemetrySdk{"
|
return "OpenTelemetrySdk{"
|
||||||
|
|
|
@ -10,14 +10,18 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
|
||||||
import static org.assertj.core.api.InstanceOfAssertFactories.type;
|
import static org.assertj.core.api.InstanceOfAssertFactories.type;
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.never;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
import io.github.netmikey.logunit.api.LogCapturer;
|
||||||
import io.opentelemetry.api.GlobalOpenTelemetry;
|
import io.opentelemetry.api.GlobalOpenTelemetry;
|
||||||
import io.opentelemetry.api.OpenTelemetry;
|
import io.opentelemetry.api.OpenTelemetry;
|
||||||
import io.opentelemetry.api.common.AttributeKey;
|
import io.opentelemetry.api.common.AttributeKey;
|
||||||
import io.opentelemetry.context.propagation.ContextPropagators;
|
import io.opentelemetry.context.propagation.ContextPropagators;
|
||||||
import io.opentelemetry.context.propagation.TextMapPropagator;
|
import io.opentelemetry.context.propagation.TextMapPropagator;
|
||||||
import io.opentelemetry.sdk.common.Clock;
|
import io.opentelemetry.sdk.common.Clock;
|
||||||
|
import io.opentelemetry.sdk.common.CompletableResultCode;
|
||||||
import io.opentelemetry.sdk.logs.SdkLoggerProvider;
|
import io.opentelemetry.sdk.logs.SdkLoggerProvider;
|
||||||
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
|
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
|
||||||
import io.opentelemetry.sdk.logs.export.SimpleLogRecordProcessor;
|
import io.opentelemetry.sdk.logs.export.SimpleLogRecordProcessor;
|
||||||
|
@ -36,15 +40,21 @@ import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
|
||||||
import io.opentelemetry.sdk.trace.export.SpanExporter;
|
import io.opentelemetry.sdk.trace.export.SpanExporter;
|
||||||
import io.opentelemetry.sdk.trace.samplers.Sampler;
|
import io.opentelemetry.sdk.trace.samplers.Sampler;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import org.junit.jupiter.api.AfterEach;
|
import org.junit.jupiter.api.AfterEach;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.junit.jupiter.api.extension.ExtendWith;
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
|
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||||
import org.mockito.Mock;
|
import org.mockito.Mock;
|
||||||
|
import org.mockito.Mockito;
|
||||||
import org.mockito.junit.jupiter.MockitoExtension;
|
import org.mockito.junit.jupiter.MockitoExtension;
|
||||||
|
|
||||||
@ExtendWith(MockitoExtension.class)
|
@ExtendWith(MockitoExtension.class)
|
||||||
class OpenTelemetrySdkTest {
|
class OpenTelemetrySdkTest {
|
||||||
|
|
||||||
|
@RegisterExtension
|
||||||
|
LogCapturer logCapturer = LogCapturer.create().captureForLogger(OpenTelemetrySdk.class.getName());
|
||||||
|
|
||||||
@Mock private MetricExporter metricExporter;
|
@Mock private MetricExporter metricExporter;
|
||||||
@Mock private SdkTracerProvider tracerProvider;
|
@Mock private SdkTracerProvider tracerProvider;
|
||||||
@Mock private SdkMeterProvider meterProvider;
|
@Mock private SdkMeterProvider meterProvider;
|
||||||
|
@ -318,6 +328,42 @@ class OpenTelemetrySdkTest {
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void shutdown() {
|
||||||
|
when(tracerProvider.shutdown()).thenReturn(CompletableResultCode.ofSuccess());
|
||||||
|
when(meterProvider.shutdown()).thenReturn(CompletableResultCode.ofSuccess());
|
||||||
|
when(loggerProvider.shutdown()).thenReturn(CompletableResultCode.ofSuccess());
|
||||||
|
|
||||||
|
OpenTelemetrySdk sdk =
|
||||||
|
OpenTelemetrySdk.builder()
|
||||||
|
.setTracerProvider(tracerProvider)
|
||||||
|
.setMeterProvider(meterProvider)
|
||||||
|
.setLoggerProvider(loggerProvider)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
// First call should call shutdown
|
||||||
|
assertThat(sdk.shutdown().join(10, TimeUnit.SECONDS).isSuccess()).isTrue();
|
||||||
|
verify(tracerProvider).shutdown();
|
||||||
|
verify(meterProvider).shutdown();
|
||||||
|
verify(loggerProvider).shutdown();
|
||||||
|
assertThat(logCapturer.getEvents()).isEmpty();
|
||||||
|
|
||||||
|
// Subsequent calls should log not call shutdown
|
||||||
|
Mockito.reset(tracerProvider, meterProvider, loggerProvider);
|
||||||
|
assertThat(sdk.shutdown().join(10, TimeUnit.SECONDS).isSuccess()).isTrue();
|
||||||
|
sdk.close();
|
||||||
|
|
||||||
|
verify(tracerProvider, never()).shutdown();
|
||||||
|
verify(meterProvider, never()).shutdown();
|
||||||
|
verify(loggerProvider, never()).shutdown();
|
||||||
|
|
||||||
|
assertThat(logCapturer.getEvents())
|
||||||
|
.hasSize(2)
|
||||||
|
.allSatisfy(
|
||||||
|
loggingEvent ->
|
||||||
|
assertThat(loggingEvent.getMessage()).isEqualTo("Multiple shutdown calls"));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void stringRepresentation() {
|
void stringRepresentation() {
|
||||||
SpanExporter spanExporter = mock(SpanExporter.class);
|
SpanExporter spanExporter = mock(SpanExporter.class);
|
||||||
|
|
Loading…
Reference in New Issue