Add proper shutdown implementations for all exporters (#5113)

This commit is contained in:
jack-berg 2023-01-13 21:27:03 -06:00 committed by GitHub
parent 40d9c64164
commit e82ab27582
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 409 additions and 301 deletions

View File

@ -63,6 +63,7 @@ public final class OkHttpGrpcExporter<T extends Marshaler> implements GrpcExport
// We only log unimplemented once since it's a configuration issue that won't be recovered.
private final AtomicBoolean loggedUnimplemented = new AtomicBoolean();
private final AtomicBoolean isShutdown = new AtomicBoolean();
private final String type;
private final ExporterMetrics exporterMetrics;
@ -91,6 +92,10 @@ public final class OkHttpGrpcExporter<T extends Marshaler> implements GrpcExport
@Override
public CompletableResultCode export(T exportRequest, int numItems) {
if (isShutdown.get()) {
return CompletableResultCode.ofFailure();
}
exporterMetrics.addSeen(numItems);
Request.Builder requestBuilder = new Request.Builder().url(url).headers(headers);
@ -209,6 +214,10 @@ public final class OkHttpGrpcExporter<T extends Marshaler> implements GrpcExport
@Override
public CompletableResultCode shutdown() {
if (!isShutdown.compareAndSet(false, true)) {
logger.log(Level.WARNING, "Calling shutdown() multiple times.");
return CompletableResultCode.ofSuccess();
}
client.dispatcher().cancelAll();
client.dispatcher().executorService().shutdownNow();
client.connectionPool().evictAll();

View File

@ -36,6 +36,7 @@ public final class UpstreamGrpcExporter<T extends Marshaler> implements GrpcExpo
// We only log unavailable once since it's a configuration issue that won't be recovered.
private final AtomicBoolean loggedUnimplemented = new AtomicBoolean();
private final AtomicBoolean isShutdown = new AtomicBoolean();
private final String type;
private final ExporterMetrics exporterMetrics;
@ -57,6 +58,10 @@ public final class UpstreamGrpcExporter<T extends Marshaler> implements GrpcExpo
@Override
public CompletableResultCode export(T exportRequest, int numItems) {
if (isShutdown.get()) {
return CompletableResultCode.ofFailure();
}
exporterMetrics.addSeen(numItems);
CompletableResultCode result = new CompletableResultCode();
@ -118,6 +123,9 @@ public final class UpstreamGrpcExporter<T extends Marshaler> implements GrpcExpo
@Override
public CompletableResultCode shutdown() {
if (!isShutdown.compareAndSet(false, true)) {
logger.log(Level.WARNING, "Calling shutdown() multiple times.");
}
return CompletableResultCode.ofSuccess();
}
}

View File

@ -13,6 +13,7 @@ import io.opentelemetry.exporter.internal.retry.RetryUtil;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.internal.ThrottlingLogger;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.logging.Level;
@ -44,6 +45,7 @@ public final class OkHttpExporter<T extends Marshaler> {
private static final Logger internalLogger = Logger.getLogger(OkHttpExporter.class.getName());
private final ThrottlingLogger logger = new ThrottlingLogger(internalLogger);
private final AtomicBoolean isShutdown = new AtomicBoolean();
private final String type;
private final OkHttpClient client;
@ -76,6 +78,10 @@ public final class OkHttpExporter<T extends Marshaler> {
}
public CompletableResultCode export(T exportRequest, int numItems) {
if (isShutdown.get()) {
return CompletableResultCode.ofFailure();
}
exporterMetrics.addSeen(numItems);
Request.Builder requestBuilder = new Request.Builder().url(url);
@ -139,11 +145,14 @@ public final class OkHttpExporter<T extends Marshaler> {
}
public CompletableResultCode shutdown() {
CompletableResultCode result = CompletableResultCode.ofSuccess();
if (!isShutdown.compareAndSet(false, true)) {
logger.log(Level.WARNING, "Calling shutdown() multiple times.");
return CompletableResultCode.ofSuccess();
}
client.dispatcher().cancelAll();
client.dispatcher().executorService().shutdownNow();
client.connectionPool().evictAll();
return result;
return CompletableResultCode.ofSuccess();
}
static boolean isRetryable(Response response) {

View File

@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
@ -43,6 +44,7 @@ public final class JaegerThriftSpanExporter implements SpanExporter {
private final ThrottlingLogger logger =
new ThrottlingLogger(Logger.getLogger(JaegerThriftSpanExporter.class.getName()));
private final AtomicBoolean isShutdown = new AtomicBoolean();
private final ThriftSender thriftSender;
private final Process process;
@ -82,13 +84,16 @@ public final class JaegerThriftSpanExporter implements SpanExporter {
*/
@Override
public CompletableResultCode export(Collection<SpanData> spans) {
if (isShutdown.get()) {
return CompletableResultCode.ofFailure();
}
Map<Process, List<Span>> batches =
spans.stream().collect(Collectors.groupingBy(SpanData::getResource)).entrySet().stream()
.collect(
Collectors.toMap(
entry -> createProcess(entry.getKey()),
entry -> Adapter.toJaeger(entry.getValue())));
List<CompletableResultCode> batchResults = new ArrayList<>(batches.size());
batches.forEach(
(process, jaegerSpans) -> {
@ -148,13 +153,9 @@ public final class JaegerThriftSpanExporter implements SpanExporter {
*/
@Override
public CompletableResultCode shutdown() {
CompletableResultCode result = new CompletableResultCode();
// todo
return result.succeed();
}
// Visible for testing
Process getProcess() {
return process;
if (!isShutdown.compareAndSet(false, true)) {
logger.log(Level.WARNING, "Calling shutdown() multiple times.");
}
return CompletableResultCode.ofSuccess();
}
}

View File

@ -8,6 +8,7 @@ package io.opentelemetry.exporter.jaeger.thrift;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.verify;
import io.github.netmikey.logunit.api.LogCapturer;
import io.jaegertracing.internal.exceptions.SenderException;
import io.jaegertracing.thrift.internal.senders.ThriftSender;
import io.jaegertracing.thriftjava.Process;
@ -22,6 +23,7 @@ import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.TraceFlags;
import io.opentelemetry.api.trace.TraceState;
import io.opentelemetry.internal.testing.slf4j.SuppressLogger;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.resources.Resource;
@ -31,6 +33,7 @@ import io.opentelemetry.sdk.trace.data.StatusData;
import io.opentelemetry.semconv.resource.attributes.ResourceAttributes;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@ -38,6 +41,7 @@ import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@ -55,6 +59,10 @@ class JaegerThriftSpanExporterTest {
SpanContext.create(TRACE_ID, SPAN_ID, TraceFlags.getDefault(), TraceState.getDefault());
private static final SpanContext SPAN_CONTEXT_2 =
SpanContext.create(TRACE_ID, SPAN_ID_2, TraceFlags.getDefault(), TraceState.getDefault());
private static final Duration DURATION = Duration.ofMillis(900);
@RegisterExtension
LogCapturer logs = LogCapturer.create().captureForType(JaegerThriftSpanExporter.class);
private JaegerThriftSpanExporter exporter;
@Mock private ThriftSender thriftSender;
@ -66,34 +74,17 @@ class JaegerThriftSpanExporterTest {
@Test
void testExport() throws SenderException, UnknownHostException {
long duration = 900; // ms
long startMs = System.currentTimeMillis();
long endMs = startMs + duration;
SpanData span =
TestSpanData.builder()
.setHasEnded(true)
.setSpanContext(SPAN_CONTEXT)
.setParentSpanContext(SPAN_CONTEXT_2)
.setName("GET /api/endpoint")
.setStartEpochNanos(TimeUnit.MILLISECONDS.toNanos(startMs))
.setEndEpochNanos(TimeUnit.MILLISECONDS.toNanos(endMs))
.setStatus(StatusData.ok())
.setKind(SpanKind.CONSUMER)
.setLinks(Collections.emptyList())
.setTotalRecordedLinks(0)
.setTotalRecordedEvents(0)
.setInstrumentationScopeInfo(
InstrumentationScopeInfo.builder("io.opentelemetry.auto")
.setVersion("1.0.0")
.build())
.setResource(
Resource.create(
Attributes.of(
ResourceAttributes.SERVICE_NAME,
"myServiceName",
AttributeKey.stringKey("resource-attr-key"),
"resource-attr-value")))
.build();
testSpanData(
Resource.create(
Attributes.of(
ResourceAttributes.SERVICE_NAME,
"myServiceName",
AttributeKey.stringKey("resource-attr-key"),
"resource-attr-value")),
"GET /api/endpoint",
SPAN_CONTEXT,
SPAN_CONTEXT_2);
// test
CompletableResultCode result = exporter.export(Collections.singletonList(span));
@ -126,8 +117,8 @@ class JaegerThriftSpanExporterTest {
.setTraceIdLow(TRACE_ID_LOW)
.setRefType(SpanRefType.CHILD_OF)))
.setParentSpanId(SPAN_ID_2_LONG)
.setStartTime(TimeUnit.MILLISECONDS.toMicros(startMs))
.setDuration(TimeUnit.MILLISECONDS.toMicros(duration))
.setStartTime(TimeUnit.NANOSECONDS.toMicros(span.getStartEpochNanos()))
.setDuration(DURATION.toMillis() * 1000)
.setLogs(Collections.emptyList());
expectedSpan.addToTags(new Tag("span.kind", TagType.STRING).setVStr("consumer"));
expectedSpan.addToTags(new Tag("otel.status_code", TagType.STRING).setVStr("OK"));
@ -144,58 +135,29 @@ class JaegerThriftSpanExporterTest {
@Test
void testExportMultipleResources() throws SenderException, UnknownHostException {
long duration = 900; // ms
long startMs = System.currentTimeMillis();
long endMs = startMs + duration;
SpanData span =
TestSpanData.builder()
.setHasEnded(true)
.setSpanContext(SPAN_CONTEXT)
.setName("GET /api/endpoint/1")
.setStartEpochNanos(TimeUnit.MILLISECONDS.toNanos(startMs))
.setEndEpochNanos(TimeUnit.MILLISECONDS.toNanos(endMs))
.setStatus(StatusData.ok())
.setKind(SpanKind.CONSUMER)
.setLinks(Collections.emptyList())
.setTotalRecordedLinks(0)
.setTotalRecordedEvents(0)
.setInstrumentationScopeInfo(
InstrumentationScopeInfo.builder("io.opentelemetry.auto")
.setVersion("1.0.0")
.build())
.setResource(
Resource.create(
Attributes.of(
ResourceAttributes.SERVICE_NAME,
"myServiceName1",
AttributeKey.stringKey("resource-attr-key-1"),
"resource-attr-value-1")))
.build();
testSpanData(
Resource.create(
Attributes.of(
ResourceAttributes.SERVICE_NAME,
"myServiceName1",
AttributeKey.stringKey("resource-attr-key-1"),
"resource-attr-value-1")),
"GET /api/endpoint/1",
SPAN_CONTEXT,
SpanContext.getInvalid());
SpanData span2 =
TestSpanData.builder()
.setHasEnded(true)
.setSpanContext(SPAN_CONTEXT_2)
.setName("GET /api/endpoint/2")
.setStartEpochNanos(TimeUnit.MILLISECONDS.toNanos(startMs))
.setEndEpochNanos(TimeUnit.MILLISECONDS.toNanos(endMs))
.setStatus(StatusData.ok())
.setKind(SpanKind.CONSUMER)
.setLinks(Collections.emptyList())
.setTotalRecordedLinks(0)
.setTotalRecordedEvents(0)
.setInstrumentationScopeInfo(
InstrumentationScopeInfo.builder("io.opentelemetry.auto")
.setVersion("1.0.0")
.build())
.setResource(
Resource.create(
Attributes.of(
ResourceAttributes.SERVICE_NAME,
"myServiceName2",
AttributeKey.stringKey("resource-attr-key-2"),
"resource-attr-value-2")))
.build();
testSpanData(
Resource.create(
Attributes.of(
ResourceAttributes.SERVICE_NAME,
"myServiceName2",
AttributeKey.stringKey("resource-attr-key-2"),
"resource-attr-value-2")),
"GET /api/endpoint/2",
SPAN_CONTEXT_2,
SpanContext.getInvalid());
// test
CompletableResultCode result = exporter.export(Arrays.asList(span, span2));
@ -232,8 +194,8 @@ class JaegerThriftSpanExporterTest {
.setSpanId(SPAN_ID_LONG)
.setOperationName("GET /api/endpoint/1")
.setReferences(Collections.emptyList())
.setStartTime(TimeUnit.MILLISECONDS.toMicros(startMs))
.setDuration(TimeUnit.MILLISECONDS.toMicros(duration))
.setStartTime(TimeUnit.NANOSECONDS.toMicros(span.getStartEpochNanos()))
.setDuration(DURATION.toMillis() * 1000)
.setLogs(Collections.emptyList());
expectedSpan1.addToTags(new Tag("span.kind", TagType.STRING).setVStr("consumer"));
expectedSpan1.addToTags(new Tag("otel.status_code", TagType.STRING).setVStr("OK"));
@ -251,8 +213,8 @@ class JaegerThriftSpanExporterTest {
.setSpanId(SPAN_ID_2_LONG)
.setOperationName("GET /api/endpoint/2")
.setReferences(Collections.emptyList())
.setStartTime(TimeUnit.MILLISECONDS.toMicros(startMs))
.setDuration(TimeUnit.MILLISECONDS.toMicros(duration))
.setStartTime(TimeUnit.NANOSECONDS.toMicros(span2.getStartEpochNanos()))
.setDuration(DURATION.toMillis() * 1000)
.setLogs(Collections.emptyList());
expectedSpan2.addToTags(new Tag("span.kind", TagType.STRING).setVStr("consumer"));
expectedSpan2.addToTags(new Tag("otel.status_code", TagType.STRING).setVStr("OK"));
@ -266,4 +228,47 @@ class JaegerThriftSpanExporterTest {
verify(thriftSender).send(expectedProcess2, Collections.singletonList(expectedSpan2));
verify(thriftSender).send(expectedProcess1, Collections.singletonList(expectedSpan1));
}
@Test
@SuppressLogger(JaegerThriftSpanExporter.class)
void shutdown() {
assertThat(exporter.shutdown().join(1, TimeUnit.SECONDS).isSuccess()).isTrue();
assertThat(logs.getEvents()).isEmpty();
assertThat(
exporter
.export(
Collections.singletonList(
testSpanData(
Resource.getDefault(),
"span name",
SPAN_CONTEXT,
SpanContext.getInvalid())))
.join(10, TimeUnit.SECONDS)
.isSuccess())
.isFalse();
assertThat(exporter.shutdown().join(1, TimeUnit.SECONDS).isSuccess()).isTrue();
logs.assertContains("Calling shutdown() multiple times.");
}
private static SpanData testSpanData(
Resource resource, String spanName, SpanContext spanContext, SpanContext parentContext) {
long startMs = System.currentTimeMillis();
long endMs = startMs + DURATION.toMillis();
return TestSpanData.builder()
.setHasEnded(true)
.setSpanContext(spanContext)
.setParentSpanContext(parentContext)
.setName(spanName)
.setStartEpochNanos(TimeUnit.MILLISECONDS.toNanos(startMs))
.setEndEpochNanos(TimeUnit.MILLISECONDS.toNanos(endMs))
.setStatus(StatusData.ok())
.setKind(SpanKind.CONSUMER)
.setLinks(Collections.emptyList())
.setTotalRecordedLinks(0)
.setTotalRecordedEvents(0)
.setInstrumentationScopeInfo(
InstrumentationScopeInfo.builder("io.opentelemetry.auto").setVersion("1.0.0").build())
.setResource(resource)
.build();
}
}

View File

@ -15,6 +15,7 @@ import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.server.ServiceRequestContext;
import com.linecorp.armeria.server.grpc.protocol.AbstractUnaryGrpcService;
import com.linecorp.armeria.testing.junit5.server.ServerExtension;
import io.github.netmikey.logunit.api.LogCapturer;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.MeterProvider;
@ -24,12 +25,15 @@ import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.TraceFlags;
import io.opentelemetry.api.trace.TraceId;
import io.opentelemetry.api.trace.TraceState;
import io.opentelemetry.exporter.internal.grpc.OkHttpGrpcExporter;
import io.opentelemetry.exporter.jaeger.proto.api_v2.Collector;
import io.opentelemetry.exporter.jaeger.proto.api_v2.Model;
import io.opentelemetry.internal.testing.slf4j.SuppressLogger;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.testing.trace.TestSpanData;
import io.opentelemetry.sdk.trace.IdGenerator;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.data.StatusData;
import io.opentelemetry.semconv.resource.attributes.ResourceAttributes;
@ -53,10 +57,6 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
class JaegerGrpcSpanExporterTest {
private static final String TRACE_ID = "00000000000000000000000000abc123";
private static final String SPAN_ID = "0000000000def456";
private static final String SPAN_ID_2 = "0000000000aef789";
private static final BlockingQueue<Collector.PostSpansRequest> postedRequests =
new LinkedBlockingDeque<>();
@ -85,6 +85,9 @@ class JaegerGrpcSpanExporterTest {
}
};
@RegisterExtension
LogCapturer logs = LogCapturer.create().captureForType(OkHttpGrpcExporter.class);
private static JaegerGrpcSpanExporter exporter;
@BeforeAll
@ -108,35 +111,15 @@ class JaegerGrpcSpanExporterTest {
@Test
void testExport() throws Exception {
long duration = 900; // ms
long startMs = System.currentTimeMillis();
long endMs = startMs + duration;
SpanData span =
TestSpanData.builder()
.setHasEnded(true)
.setSpanContext(
SpanContext.create(
TRACE_ID, SPAN_ID, TraceFlags.getSampled(), TraceState.getDefault()))
.setName("GET /api/endpoint")
.setStartEpochNanos(TimeUnit.MILLISECONDS.toNanos(startMs))
.setEndEpochNanos(TimeUnit.MILLISECONDS.toNanos(endMs))
.setStatus(StatusData.ok())
.setKind(SpanKind.CONSUMER)
.setLinks(Collections.emptyList())
.setTotalRecordedLinks(0)
.setTotalRecordedEvents(0)
.setInstrumentationScopeInfo(
InstrumentationScopeInfo.builder("io.opentelemetry.auto")
.setVersion("1.0.0")
.build())
.setResource(
Resource.create(
Attributes.of(
ResourceAttributes.SERVICE_NAME,
"myServiceName",
AttributeKey.stringKey("resource-attr-key"),
"resource-attr-value")))
.build();
testSpanData(
Resource.create(
Attributes.of(
ResourceAttributes.SERVICE_NAME,
"myServiceName",
AttributeKey.stringKey("resource-attr-key"),
"resource-attr-value")),
"GET /api/endpoint");
// test
CompletableResultCode result = exporter.export(Collections.singletonList(span));
@ -147,7 +130,8 @@ class JaegerGrpcSpanExporterTest {
assertThat(postedRequests).hasSize(1);
Model.Batch batch = postedRequests.poll().getBatch();
assertThat(batch.getSpans(0).getOperationName()).isEqualTo("GET /api/endpoint");
assertThat(SpanId.fromBytes(batch.getSpans(0).getSpanId().toByteArray())).isEqualTo(SPAN_ID);
assertThat(SpanId.fromBytes(batch.getSpans(0).getSpanId().toByteArray()))
.isEqualTo(span.getSpanContext().getSpanId());
assertThat(
getTagValue(batch.getProcess().getTagsList(), "resource-attr-key")
@ -161,62 +145,25 @@ class JaegerGrpcSpanExporterTest {
@Test
void testExportMultipleResources() throws Exception {
long duration = 900; // ms
long startMs = System.currentTimeMillis();
long endMs = startMs + duration;
SpanData span =
TestSpanData.builder()
.setHasEnded(true)
.setSpanContext(
SpanContext.create(
TRACE_ID, SPAN_ID, TraceFlags.getSampled(), TraceState.getDefault()))
.setName("GET /api/endpoint/1")
.setStartEpochNanos(TimeUnit.MILLISECONDS.toNanos(startMs))
.setEndEpochNanos(TimeUnit.MILLISECONDS.toNanos(endMs))
.setStatus(StatusData.ok())
.setKind(SpanKind.CONSUMER)
.setLinks(Collections.emptyList())
.setTotalRecordedLinks(0)
.setTotalRecordedEvents(0)
.setInstrumentationScopeInfo(
InstrumentationScopeInfo.builder("io.opentelemetry.auto")
.setVersion("1.0.0")
.build())
.setResource(
Resource.create(
Attributes.of(
ResourceAttributes.SERVICE_NAME,
"myServiceName1",
AttributeKey.stringKey("resource-attr-key-1"),
"resource-attr-value-1")))
.build();
testSpanData(
Resource.create(
Attributes.of(
ResourceAttributes.SERVICE_NAME,
"myServiceName1",
AttributeKey.stringKey("resource-attr-key-1"),
"resource-attr-value-1")),
"GET /api/endpoint/1");
SpanData span2 =
TestSpanData.builder()
.setHasEnded(true)
.setSpanContext(
SpanContext.create(
TRACE_ID, SPAN_ID_2, TraceFlags.getSampled(), TraceState.getDefault()))
.setName("GET /api/endpoint/2")
.setStartEpochNanos(TimeUnit.MILLISECONDS.toNanos(startMs))
.setEndEpochNanos(TimeUnit.MILLISECONDS.toNanos(endMs))
.setStatus(StatusData.ok())
.setKind(SpanKind.CONSUMER)
.setLinks(Collections.emptyList())
.setTotalRecordedLinks(0)
.setTotalRecordedEvents(0)
.setInstrumentationScopeInfo(
InstrumentationScopeInfo.builder("io.opentelemetry.auto")
.setVersion("1.0.0")
.build())
.setResource(
Resource.create(
Attributes.of(
ResourceAttributes.SERVICE_NAME,
"myServiceName2",
AttributeKey.stringKey("resource-attr-key-2"),
"resource-attr-value-2")))
.build();
testSpanData(
Resource.create(
Attributes.of(
ResourceAttributes.SERVICE_NAME,
"myServiceName2",
AttributeKey.stringKey("resource-attr-key-2"),
"resource-attr-value-2")),
"GET /api/endpoint/2");
// test
CompletableResultCode result = exporter.export(Arrays.asList(span, span2));
@ -240,13 +187,13 @@ class JaegerGrpcSpanExporterTest {
assertThat(processTag2.isPresent()).isFalse();
assertThat(batch.getSpans(0).getOperationName()).isEqualTo("GET /api/endpoint/1");
assertThat(SpanId.fromBytes(batch.getSpans(0).getSpanId().toByteArray()))
.isEqualTo(SPAN_ID);
.isEqualTo(span.getSpanContext().getSpanId());
assertThat(processTag.get().getVStr()).isEqualTo("resource-attr-value-1");
assertThat(batch.getProcess().getServiceName()).isEqualTo("myServiceName1");
} else if (processTag2.isPresent()) {
assertThat(batch.getSpans(0).getOperationName()).isEqualTo("GET /api/endpoint/2");
assertThat(SpanId.fromBytes(batch.getSpans(0).getSpanId().toByteArray()))
.isEqualTo(SPAN_ID_2);
.isEqualTo(span2.getSpanContext().getSpanId());
assertThat(processTag2.get().getVStr()).isEqualTo("resource-attr-value-2");
assertThat(batch.getProcess().getServiceName()).isEqualTo("myServiceName2");
} else {
@ -257,7 +204,7 @@ class JaegerGrpcSpanExporterTest {
private static void verifyBatch(Model.Batch batch) throws Exception {
assertThat(batch.getSpansCount()).isEqualTo(1);
assertThat(TraceId.fromBytes(batch.getSpans(0).getTraceId().toByteArray())).isEqualTo(TRACE_ID);
assertThat(TraceId.fromBytes(batch.getSpans(0).getTraceId().toByteArray())).isNotNull();
assertThat(batch.getProcess().getTagsCount()).isEqualTo(5);
assertThat(
@ -311,6 +258,32 @@ class JaegerGrpcSpanExporterTest {
return tags.stream().filter(kv -> kv.getKey().equals(tagKey)).findFirst();
}
private static SpanData testSpanData(Resource resource, String spanName) {
long duration = 900; // ms
long startMs = System.currentTimeMillis();
long endMs = startMs + duration;
return TestSpanData.builder()
.setHasEnded(true)
.setSpanContext(
SpanContext.create(
IdGenerator.random().generateTraceId(),
IdGenerator.random().generateSpanId(),
TraceFlags.getSampled(),
TraceState.getDefault()))
.setName(spanName)
.setStartEpochNanos(TimeUnit.MILLISECONDS.toNanos(startMs))
.setEndEpochNanos(TimeUnit.MILLISECONDS.toNanos(endMs))
.setStatus(StatusData.ok())
.setKind(SpanKind.CONSUMER)
.setLinks(Collections.emptyList())
.setTotalRecordedLinks(0)
.setTotalRecordedEvents(0)
.setInstrumentationScopeInfo(
InstrumentationScopeInfo.builder("io.opentelemetry.auto").setVersion("1.0.0").build())
.setResource(resource)
.build();
}
@Test
void validTrustedConfig() {
assertThatCode(
@ -411,10 +384,19 @@ class JaegerGrpcSpanExporterTest {
}
@Test
void doubleShutdown() {
@SuppressLogger(OkHttpGrpcExporter.class)
void shutdown() {
JaegerGrpcSpanExporter exporter =
JaegerGrpcSpanExporter.builder().setEndpoint(server.httpUri().toString()).build();
assertThat(exporter.shutdown().join(1, TimeUnit.SECONDS).isSuccess()).isTrue();
assertThat(logs.getEvents()).isEmpty();
assertThat(
exporter
.export(Collections.singletonList(testSpanData(Resource.getDefault(), "span name")))
.join(10, TimeUnit.SECONDS)
.isSuccess())
.isFalse();
assertThat(exporter.shutdown().join(1, TimeUnit.SECONDS).isSuccess()).isTrue();
logs.assertContains("Calling shutdown() multiple times.");
}
}

View File

@ -15,6 +15,7 @@ import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -29,6 +30,8 @@ public final class OtlpJsonLoggingLogRecordExporter implements LogRecordExporter
private static final Logger logger =
Logger.getLogger(OtlpJsonLoggingLogRecordExporter.class.getName());
private final AtomicBoolean isShutdown = new AtomicBoolean();
/** Returns a new {@link OtlpJsonLoggingLogRecordExporter}. */
public static LogRecordExporter create() {
return new OtlpJsonLoggingLogRecordExporter();
@ -38,6 +41,10 @@ public final class OtlpJsonLoggingLogRecordExporter implements LogRecordExporter
@Override
public CompletableResultCode export(Collection<LogRecordData> logs) {
if (isShutdown.get()) {
return CompletableResultCode.ofFailure();
}
ResourceLogsMarshaler[] allResourceLogs = ResourceLogsMarshaler.create(logs);
for (ResourceLogsMarshaler resourceLogs : allResourceLogs) {
SegmentedStringWriter sw = new SegmentedStringWriter(JSON_FACTORY._getBufferRecycler());
@ -59,6 +66,9 @@ public final class OtlpJsonLoggingLogRecordExporter implements LogRecordExporter
@Override
public CompletableResultCode shutdown() {
if (!isShutdown.compareAndSet(false, true)) {
logger.log(Level.WARNING, "Calling shutdown() multiple times.");
}
return CompletableResultCode.ofSuccess();
}
}

View File

@ -17,6 +17,7 @@ import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -29,6 +30,8 @@ public final class OtlpJsonLoggingMetricExporter implements MetricExporter {
private static final Logger logger =
Logger.getLogger(OtlpJsonLoggingMetricExporter.class.getName());
private final AtomicBoolean isShutdown = new AtomicBoolean();
private final AggregationTemporality aggregationTemporality;
/**
@ -68,6 +71,10 @@ public final class OtlpJsonLoggingMetricExporter implements MetricExporter {
@Override
public CompletableResultCode export(Collection<MetricData> metrics) {
if (isShutdown.get()) {
return CompletableResultCode.ofFailure();
}
ResourceMetricsMarshaler[] allResourceMetrics = ResourceMetricsMarshaler.create(metrics);
for (ResourceMetricsMarshaler resourceMetrics : allResourceMetrics) {
SegmentedStringWriter sw = new SegmentedStringWriter(JSON_FACTORY._getBufferRecycler());
@ -89,6 +96,9 @@ public final class OtlpJsonLoggingMetricExporter implements MetricExporter {
@Override
public CompletableResultCode shutdown() {
if (!isShutdown.compareAndSet(false, true)) {
logger.log(Level.WARNING, "Calling shutdown() multiple times.");
}
return CompletableResultCode.ofSuccess();
}
}

View File

@ -13,6 +13,7 @@ import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -25,6 +26,8 @@ public final class OtlpJsonLoggingSpanExporter implements SpanExporter {
private static final Logger logger =
Logger.getLogger(OtlpJsonLoggingSpanExporter.class.getName());
private final AtomicBoolean isShutdown = new AtomicBoolean();
/** Returns a new {@link OtlpJsonLoggingSpanExporter}. */
public static SpanExporter create() {
return new OtlpJsonLoggingSpanExporter();
@ -34,6 +37,10 @@ public final class OtlpJsonLoggingSpanExporter implements SpanExporter {
@Override
public CompletableResultCode export(Collection<SpanData> spans) {
if (isShutdown.get()) {
return CompletableResultCode.ofFailure();
}
ResourceSpansMarshaler[] allResourceSpans = ResourceSpansMarshaler.create(spans);
for (ResourceSpansMarshaler resourceSpans : allResourceSpans) {
SegmentedStringWriter sw =
@ -56,6 +63,9 @@ public final class OtlpJsonLoggingSpanExporter implements SpanExporter {
@Override
public CompletableResultCode shutdown() {
if (!isShutdown.compareAndSet(false, true)) {
logger.log(Level.WARNING, "Calling shutdown() multiple times.");
}
return CompletableResultCode.ofSuccess();
}
}

View File

@ -23,6 +23,7 @@ import io.opentelemetry.sdk.logs.export.LogRecordExporter;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.testing.logs.TestLogRecordData;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -166,5 +167,11 @@ class OtlpJsonLoggingLogRecordExporterTest {
@Test
void shutdown() {
assertThat(exporter.shutdown().isSuccess()).isTrue();
assertThat(
exporter.export(Collections.singletonList(LOG1)).join(10, TimeUnit.SECONDS).isSuccess())
.isFalse();
assertThat(logs.getEvents()).isEmpty();
assertThat(exporter.shutdown().isSuccess()).isTrue();
logs.assertContains("Calling shutdown() multiple times.");
}
}

View File

@ -21,6 +21,8 @@ import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableSumData;
import io.opentelemetry.sdk.resources.Resource;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@ -170,5 +172,14 @@ class OtlpJsonLoggingMetricExporterTest {
@Test
void shutdown() {
assertThat(exporter.shutdown().isSuccess()).isTrue();
assertThat(
exporter
.export(Collections.singletonList(METRIC1))
.join(10, TimeUnit.SECONDS)
.isSuccess())
.isFalse();
assertThat(logs.getEvents()).isEmpty();
assertThat(exporter.shutdown().isSuccess()).isTrue();
logs.assertContains("Calling shutdown() multiple times.");
}
}

View File

@ -26,6 +26,7 @@ import io.opentelemetry.sdk.trace.data.StatusData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@ -191,5 +192,14 @@ class OtlpJsonLoggingSpanExporterTest {
@Test
void shutdown() {
assertThat(exporter.shutdown().isSuccess()).isTrue();
assertThat(
exporter
.export(Collections.singletonList(SPAN1))
.join(10, TimeUnit.SECONDS)
.isSuccess())
.isFalse();
assertThat(logs.getEvents()).isEmpty();
assertThat(exporter.shutdown().isSuccess()).isTrue();
logs.assertContains("Calling shutdown() multiple times.");
}
}

View File

@ -11,6 +11,7 @@ import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -18,6 +19,7 @@ import java.util.logging.Logger;
public final class LoggingMetricExporter implements MetricExporter {
private static final Logger logger = Logger.getLogger(LoggingMetricExporter.class.getName());
private final AtomicBoolean isShutdown = new AtomicBoolean();
private final AggregationTemporality aggregationTemporality;
/**
@ -64,6 +66,10 @@ public final class LoggingMetricExporter implements MetricExporter {
@Override
public CompletableResultCode export(Collection<MetricData> metrics) {
if (isShutdown.get()) {
return CompletableResultCode.ofFailure();
}
logger.info("Received a collection of " + metrics.size() + " metrics for export.");
for (MetricData metricData : metrics) {
logger.log(Level.INFO, "metric: {0}", metricData);
@ -91,8 +97,10 @@ public final class LoggingMetricExporter implements MetricExporter {
@Override
public CompletableResultCode shutdown() {
// no-op
this.flush();
return CompletableResultCode.ofSuccess();
if (!isShutdown.compareAndSet(false, true)) {
logger.log(Level.WARNING, "Calling shutdown() multiple times.");
return CompletableResultCode.ofSuccess();
}
return flush();
}
}

View File

@ -10,6 +10,7 @@ import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -18,6 +19,8 @@ import java.util.logging.Logger;
public final class LoggingSpanExporter implements SpanExporter {
private static final Logger logger = Logger.getLogger(LoggingSpanExporter.class.getName());
private final AtomicBoolean isShutdown = new AtomicBoolean();
/** Returns a new {@link LoggingSpanExporter}. */
public static LoggingSpanExporter create() {
return new LoggingSpanExporter();
@ -33,6 +36,10 @@ public final class LoggingSpanExporter implements SpanExporter {
@Override
public CompletableResultCode export(Collection<SpanData> spans) {
if (isShutdown.get()) {
return CompletableResultCode.ofFailure();
}
// We always have 32 + 16 + name + several whitespace, 60 seems like an OK initial guess.
StringBuilder sb = new StringBuilder(60);
for (SpanData span : spans) {
@ -80,6 +87,10 @@ public final class LoggingSpanExporter implements SpanExporter {
@Override
public CompletableResultCode shutdown() {
if (!isShutdown.compareAndSet(false, true)) {
logger.log(Level.WARNING, "Calling shutdown() multiple times.");
return CompletableResultCode.ofSuccess();
}
return flush();
}
}

View File

@ -15,6 +15,7 @@ import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* A {@link LogRecordExporter} implementation that outputs log records to standard out. The output
@ -29,6 +30,8 @@ import java.util.Collection;
public class SystemOutLogRecordExporter implements LogRecordExporter {
private static final DateTimeFormatter ISO_FORMAT = DateTimeFormatter.ISO_DATE_TIME;
private final AtomicBoolean isShutdown = new AtomicBoolean();
/** Returns a new {@link SystemOutLogRecordExporter}. */
public static SystemOutLogRecordExporter create() {
return new SystemOutLogRecordExporter();
@ -38,6 +41,10 @@ public class SystemOutLogRecordExporter implements LogRecordExporter {
@Override
public CompletableResultCode export(Collection<LogRecordData> logs) {
if (isShutdown.get()) {
return CompletableResultCode.ofFailure();
}
StringBuilder stringBuilder = new StringBuilder(60);
for (LogRecordData log : logs) {
@ -82,6 +89,10 @@ public class SystemOutLogRecordExporter implements LogRecordExporter {
@Override
public CompletableResultCode shutdown() {
if (!isShutdown.compareAndSet(false, true)) {
System.out.println("Calling shutdown() multiple times.");
return CompletableResultCode.ofSuccess();
}
return CompletableResultCode.ofSuccess();
}
}

View File

@ -8,46 +8,60 @@ package io.opentelemetry.exporter.logging;
import static io.opentelemetry.api.common.AttributeKey.stringKey;
import static org.assertj.core.api.Assertions.assertThat;
import io.github.netmikey.logunit.api.LogCapturer;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.internal.testing.slf4j.SuppressLogger;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableDoublePointData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongPointData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableSumData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableSummaryData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableSummaryPointData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableValueAtQuantile;
import io.opentelemetry.sdk.resources.Resource;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.Arrays;
import java.time.Instant;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;
import java.util.logging.StreamHandler;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@SuppressLogger(LoggingMetricExporter.class)
class LoggingMetricExporterTest {
LoggingMetricExporter exporter;
private static final MetricData METRIC_DATA =
ImmutableMetricData.createLongSum(
Resource.create(Attributes.of(stringKey("host"), "localhost")),
InstrumentationScopeInfo.builder("manualInstrumentation").setVersion("1.0").build(),
"counterOne",
"A simple counter",
"one",
ImmutableSumData.create(
true,
AggregationTemporality.CUMULATIVE,
Collections.singletonList(
ImmutableLongPointData.create(
TimeUnit.MILLISECONDS.toNanos(Instant.now().toEpochMilli()),
TimeUnit.MILLISECONDS.toNanos(Instant.now().plusMillis(245).toEpochMilli()),
Attributes.of(stringKey("z"), "y", stringKey("x"), "w"),
1010))));
@RegisterExtension
LogCapturer logs = LogCapturer.create().captureForType(LoggingMetricExporter.class);
private LoggingMetricExporter exporter;
@BeforeEach
void setUp() {
exporter = LoggingMetricExporter.create();
}
@AfterEach
void tearDown() {
exporter.shutdown();
}
@Test
void preferredTemporality() {
assertThat(LoggingMetricExporter.create().getAggregationTemporality(InstrumentType.COUNTER))
@ -59,64 +73,21 @@ class LoggingMetricExporterTest {
}
@Test
void testExport() {
long nowEpochNanos = System.currentTimeMillis() * 1000 * 1000;
Resource resource = Resource.create(Attributes.of(stringKey("host"), "localhost"));
InstrumentationScopeInfo instrumentationScopeInfo =
InstrumentationScopeInfo.builder("manualInstrumentation").setVersion("1.0").build();
exporter.export(
Arrays.asList(
ImmutableMetricData.createDoubleSummary(
resource,
instrumentationScopeInfo,
"measureOne",
"A summarized test measure",
"ms",
ImmutableSummaryData.create(
Collections.singletonList(
ImmutableSummaryPointData.create(
nowEpochNanos,
nowEpochNanos + 245,
Attributes.of(stringKey("a"), "b", stringKey("c"), "d"),
1010,
50000,
Arrays.asList(
ImmutableValueAtQuantile.create(0.0, 25),
ImmutableValueAtQuantile.create(1.0, 433)))))),
ImmutableMetricData.createLongSum(
resource,
instrumentationScopeInfo,
"counterOne",
"A simple counter",
"one",
ImmutableSumData.create(
true,
AggregationTemporality.CUMULATIVE,
Collections.singletonList(
ImmutableLongPointData.create(
nowEpochNanos,
nowEpochNanos + 245,
Attributes.of(stringKey("z"), "y", stringKey("x"), "w"),
1010)))),
ImmutableMetricData.createDoubleSum(
resource,
instrumentationScopeInfo,
"observedValue",
"an observer gauge",
"kb",
ImmutableSumData.create(
true,
AggregationTemporality.CUMULATIVE,
Collections.singletonList(
ImmutableDoublePointData.create(
nowEpochNanos,
nowEpochNanos + 245,
Attributes.of(stringKey("1"), "2", stringKey("3"), "4"),
33.7767))))));
void export() {
assertThat(exporter.export(Collections.singletonList(METRIC_DATA)).isSuccess()).isTrue();
assertThat(logs.getEvents())
.satisfiesExactly(
loggingEvent ->
assertThat(loggingEvent.getMessage())
.isEqualTo("Received a collection of 1 metrics for export."),
loggingEvent -> {
assertThat(loggingEvent.getMessage()).isEqualTo("metric: {0}");
assertThat(loggingEvent.getArgumentArray()).isEqualTo(new MetricData[] {METRIC_DATA});
});
}
@Test
void testFlush() {
void flush() {
AtomicBoolean flushed = new AtomicBoolean(false);
Logger.getLogger(LoggingMetricExporter.class.getName())
.addHandler(
@ -129,4 +100,18 @@ class LoggingMetricExporterTest {
exporter.flush();
assertThat(flushed.get()).isTrue();
}
@Test
void shutdown() {
assertThat(exporter.shutdown().isSuccess()).isTrue();
assertThat(
exporter
.export(Collections.singletonList(METRIC_DATA))
.join(10, TimeUnit.SECONDS)
.isSuccess())
.isFalse();
assertThat(logs.getEvents()).isEmpty();
assertThat(exporter.shutdown().isSuccess()).isTrue();
logs.assertContains("Calling shutdown() multiple times.");
}
}

View File

@ -8,7 +8,6 @@ package io.opentelemetry.exporter.logging;
import static io.opentelemetry.api.common.AttributeKey.booleanKey;
import static io.opentelemetry.api.common.AttributeKey.longKey;
import static io.opentelemetry.api.common.AttributeKey.stringKey;
import static java.util.Collections.singletonList;
import static org.assertj.core.api.Assertions.assertThat;
import io.github.netmikey.logunit.api.LogCapturer;
@ -18,7 +17,6 @@ import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.TraceFlags;
import io.opentelemetry.api.trace.TraceState;
import io.opentelemetry.internal.testing.slf4j.SuppressLogger;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.testing.trace.TestSpanData;
import io.opentelemetry.sdk.trace.data.EventData;
@ -33,7 +31,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;
import java.util.logging.StreamHandler;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@ -96,14 +93,9 @@ class LoggingSpanExporterTest {
exporter = LoggingSpanExporter.create();
}
@AfterEach
void tearDown() {
exporter.close();
}
@Test
void log() {
exporter.export(Arrays.asList(SPAN1, SPAN2));
void export() {
assertThat(exporter.export(Arrays.asList(SPAN1, SPAN2)).isSuccess()).isTrue();
assertThat(logs.getEvents())
.hasSize(2)
@ -120,37 +112,7 @@ class LoggingSpanExporterTest {
}
@Test
void returnCode() {
long epochNanos = TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis());
SpanData spanData =
TestSpanData.builder()
.setHasEnded(true)
.setSpanContext(
SpanContext.create(
"12345678876543211234567887654321",
"8765432112345678",
TraceFlags.getSampled(),
TraceState.getDefault()))
.setStartEpochNanos(epochNanos)
.setEndEpochNanos(epochNanos + 1000)
.setStatus(StatusData.ok())
.setName("testSpan")
.setKind(SpanKind.INTERNAL)
.setEvents(
Collections.singletonList(
EventData.create(
epochNanos + 500,
"somethingHappenedHere",
Attributes.of(booleanKey("important"), true))))
.setTotalRecordedEvents(1)
.setTotalRecordedLinks(0)
.build();
CompletableResultCode resultCode = exporter.export(singletonList(spanData));
assertThat(resultCode.isSuccess()).isTrue();
}
@Test
void testFlush() {
void flush() {
AtomicBoolean flushed = new AtomicBoolean(false);
Logger.getLogger(LoggingSpanExporter.class.getName())
.addHandler(
@ -163,4 +125,18 @@ class LoggingSpanExporterTest {
exporter.flush();
assertThat(flushed.get()).isTrue();
}
@Test
void shutdown() {
assertThat(exporter.shutdown().isSuccess()).isTrue();
assertThat(
exporter
.export(Collections.singletonList(SPAN1))
.join(10, TimeUnit.SECONDS)
.isSuccess())
.isFalse();
assertThat(logs.getEvents()).isEmpty();
assertThat(exporter.shutdown().isSuccess()).isTrue();
logs.assertContains("Calling shutdown() multiple times.");
}
}

View File

@ -15,7 +15,6 @@ import io.opentelemetry.api.logs.Severity;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.TraceFlags;
import io.opentelemetry.api.trace.TraceState;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.resources.Resource;
@ -23,18 +22,17 @@ import io.opentelemetry.sdk.testing.logs.TestLogRecordData;
import java.time.LocalDateTime;
import java.time.Month;
import java.time.ZoneOffset;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
class SystemOutLogRecordExporterTest {
@Test
void returnCodes() {
void export() {
SystemOutLogRecordExporter exporter = SystemOutLogRecordExporter.create();
CompletableResultCode resultCode =
exporter.export(singletonList(sampleLog(System.currentTimeMillis())));
assertThat(resultCode).isSameAs(CompletableResultCode.ofSuccess());
assertThat(exporter.shutdown()).isSameAs(CompletableResultCode.ofSuccess());
assertThat(exporter.export(singletonList(sampleLog(System.currentTimeMillis()))).isSuccess())
.isTrue();
}
@Test
@ -50,6 +48,19 @@ class SystemOutLogRecordExporterTest {
+ "[scopeInfo: logTest:1.0] {amount=1, cheese=\"cheddar\"}");
}
@Test
void shutdown() {
SystemOutLogRecordExporter exporter = SystemOutLogRecordExporter.create();
assertThat(exporter.shutdown().isSuccess()).isTrue();
assertThat(
exporter
.export(Collections.singletonList(sampleLog(System.currentTimeMillis())))
.join(10, TimeUnit.SECONDS)
.isSuccess())
.isFalse();
assertThat(exporter.shutdown().isSuccess()).isTrue();
}
private static LogRecordData sampleLog(long timestamp) {
return TestLogRecordData.builder()
.setResource(Resource.empty())

View File

@ -402,14 +402,19 @@ public abstract class AbstractGrpcTelemetryExporterTest<T, U extends Message> {
.join(10, TimeUnit.SECONDS)
.isSuccess())
.isFalse();
assertThat(httpRequests).isEmpty();
}
@Test
@SuppressLogger(OkHttpGrpcExporter.class)
@SuppressLogger(UpstreamGrpcExporter.class)
void doubleShutdown() {
TelemetryExporter<T> exporter =
exporterBuilder().setEndpoint(server.httpUri().toString()).build();
assertThat(exporter.shutdown().join(10, TimeUnit.SECONDS).isSuccess()).isTrue();
assertThat(logs.getEvents()).isEmpty();
assertThat(exporter.shutdown().join(10, TimeUnit.SECONDS).isSuccess()).isTrue();
logs.assertContains("Calling shutdown() multiple times.");
}
@Test

View File

@ -431,13 +431,17 @@ public abstract class AbstractHttpTelemetryExporterTest<T, U extends Message> {
.join(10, TimeUnit.SECONDS)
.isSuccess())
.isFalse();
assertThat(httpRequests).isEmpty();
}
@Test
@SuppressLogger(OkHttpExporter.class)
void doubleShutdown() {
TelemetryExporter<T> exporter = exporterBuilder().setEndpoint(server.httpUri() + path).build();
assertThat(exporter.shutdown().join(10, TimeUnit.SECONDS).isSuccess()).isTrue();
assertThat(logs.getEvents()).isEmpty();
assertThat(exporter.shutdown().join(10, TimeUnit.SECONDS).isSuccess()).isTrue();
logs.assertContains("Calling shutdown() multiple times.");
}
@Test

View File

@ -15,6 +15,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -36,6 +37,7 @@ public final class ZipkinSpanExporter implements SpanExporter {
public static final String DEFAULT_ENDPOINT = "http://localhost:9411/api/v2/spans";
private final ThrottlingLogger logger = new ThrottlingLogger(baseLogger);
private final AtomicBoolean isShutdown = new AtomicBoolean();
private final BytesEncoder<Span> encoder;
private final Sender sender;
private final ExporterMetrics exporterMetrics;
@ -58,6 +60,10 @@ public final class ZipkinSpanExporter implements SpanExporter {
@Override
public CompletableResultCode export(Collection<SpanData> spanDataList) {
if (isShutdown.get()) {
return CompletableResultCode.ofFailure();
}
int numItems = spanDataList.size();
exporterMetrics.addSeen(numItems);
@ -96,6 +102,10 @@ public final class ZipkinSpanExporter implements SpanExporter {
@Override
public CompletableResultCode shutdown() {
if (!isShutdown.compareAndSet(false, true)) {
logger.log(Level.WARNING, "Calling shutdown() multiple times.");
return CompletableResultCode.ofSuccess();
}
try {
sender.close();
} catch (IOException e) {

View File

@ -14,6 +14,7 @@ import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import io.github.netmikey.logunit.api.LogCapturer;
import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.internal.testing.slf4j.SuppressLogger;
import io.opentelemetry.sdk.common.CompletableResultCode;
@ -24,6 +25,7 @@ import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import zipkin2.Call;
@ -41,6 +43,9 @@ class ZipkinSpanExporterTest {
@Mock private OtelToZipkinSpanTransformer mockTransformer;
@Mock private InetAddress localIp;
@RegisterExtension
LogCapturer logs = LogCapturer.create().captureForType(ZipkinSpanExporter.class);
@Test
void testExport() {
TestSpanData testSpanData = spanBuilder().build();
@ -110,11 +115,21 @@ class ZipkinSpanExporterTest {
}
@Test
@SuppressLogger(ZipkinSpanExporter.class)
void testShutdown() throws IOException {
ZipkinSpanExporter exporter = ZipkinSpanExporter.builder().setSender(mockSender).build();
exporter.shutdown();
assertThat(exporter.shutdown().isSuccess()).isTrue();
verify(mockSender).close();
assertThat(logs.getEvents()).isEmpty();
assertThat(
exporter
.export(Collections.singletonList(spanBuilder().build()))
.join(10, TimeUnit.SECONDS)
.isSuccess())
.isFalse();
assertThat(exporter.shutdown().isSuccess()).isTrue();
logs.assertContains("Calling shutdown() multiple times.");
}
@Test