Cleanup OTLP exporter timeout settings. (#2446)

* Cleanup OTLP exporter timeout settings.

* error prone
This commit is contained in:
Anuraag Agrawal 2021-01-09 15:32:20 +09:00 committed by GitHub
parent 890015b082
commit 620969556f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 158 additions and 46 deletions

View File

@ -57,25 +57,36 @@ import javax.annotation.concurrent.ThreadSafe;
*/
@ThreadSafe
public final class OtlpGrpcMetricExporter implements MetricExporter {
public static final String DEFAULT_ENDPOINT = "localhost:4317";
public static final long DEFAULT_DEADLINE_MS = TimeUnit.SECONDS.toMillis(10);
/**
* Default endpoint.
*
* @deprecated Will be removed without replacement
*/
@Deprecated public static final String DEFAULT_ENDPOINT = "localhost:4317";
/**
* Default timeout.
*
* @deprecated Will be removed without replacement
*/
@Deprecated public static final long DEFAULT_DEADLINE_MS = TimeUnit.SECONDS.toMillis(10);
private static final Logger logger = Logger.getLogger(OtlpGrpcMetricExporter.class.getName());
private final MetricsServiceFutureStub metricsService;
private final ManagedChannel managedChannel;
private final long deadlineMs;
private final long timeoutNanos;
/**
* Creates a new OTLP gRPC Metric Reporter with the given name, using the given channel.
*
* @param channel the channel to use when communicating with the OpenTelemetry Collector.
* @param deadlineMs max waiting time for the collector to process each metric batch. When set to
* 0 or to a negative value, the exporter will wait indefinitely.
* @param timeoutNanos max waiting time for the collector to process each metric batch. When set
* to 0 or to a negative value, the exporter will wait indefinitely.
*/
OtlpGrpcMetricExporter(ManagedChannel channel, long deadlineMs) {
OtlpGrpcMetricExporter(ManagedChannel channel, long timeoutNanos) {
this.managedChannel = channel;
this.deadlineMs = deadlineMs;
this.timeoutNanos = timeoutNanos;
metricsService = MetricsServiceGrpc.newFutureStub(channel);
}
@ -94,8 +105,8 @@ public final class OtlpGrpcMetricExporter implements MetricExporter {
final CompletableResultCode result = new CompletableResultCode();
MetricsServiceFutureStub exporter;
if (deadlineMs > 0) {
exporter = metricsService.withDeadlineAfter(deadlineMs, TimeUnit.MILLISECONDS);
if (timeoutNanos > 0) {
exporter = metricsService.withDeadlineAfter(timeoutNanos, TimeUnit.NANOSECONDS);
} else {
exporter = metricsService;
}

View File

@ -6,6 +6,8 @@
package io.opentelemetry.exporter.otlp.metrics;
import static io.grpc.Metadata.ASCII_STRING_MARSHALLER;
import static io.opentelemetry.api.internal.Utils.checkArgument;
import static java.util.Objects.requireNonNull;
import com.google.common.base.Splitter;
import io.grpc.ManagedChannel;
@ -13,22 +15,28 @@ import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.stub.MetadataUtils;
import io.opentelemetry.sdk.extension.otproto.CommonProperties;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
/** Builder utility for this exporter. */
@SuppressWarnings("deprecation") // Remove after ConfigBuilder is deleted
public final class OtlpGrpcMetricExporterBuilder
extends io.opentelemetry.sdk.common.export.ConfigBuilder<OtlpGrpcMetricExporterBuilder> {
private static final String DEFAULT_ENDPOINT = "localhost:4317";
private static final long DEFAULT_TIMEOUT_SECS = 10;
private static final String KEY_TIMEOUT = "otel.exporter.otlp.metric.timeout";
private static final String KEY_ENDPOINT = "otel.exporter.otlp.metric.endpoint";
private static final String KEY_INSECURE = "otel.exporter.otlp.metric.insecure";
private static final String KEY_HEADERS = "otel.exporter.otlp.metric.headers";
private ManagedChannel channel;
private long deadlineMs = OtlpGrpcMetricExporter.DEFAULT_DEADLINE_MS; // 10 seconds
private String endpoint = OtlpGrpcMetricExporter.DEFAULT_ENDPOINT;
private long timeoutNanos = TimeUnit.SECONDS.toNanos(DEFAULT_TIMEOUT_SECS);
private String endpoint = DEFAULT_ENDPOINT;
private boolean useTls = false;
@Nullable private Metadata metadata;
@ -45,15 +53,36 @@ public final class OtlpGrpcMetricExporterBuilder
return this;
}
/**
* Sets the maximum time to wait for the collector to process an exported batch of metrics. If
* unset, defaults to {@value DEFAULT_TIMEOUT_SECS}s.
*/
public OtlpGrpcMetricExporterBuilder setTimeout(long timeout, TimeUnit unit) {
requireNonNull(unit, "unit");
checkArgument(timeout >= 0, "timeout must be non-negative");
timeoutNanos = unit.toNanos(timeout);
return this;
}
/**
* Sets the maximum time to wait for the collector to process an exported batch of metrics. If
* unset, defaults to {@value DEFAULT_TIMEOUT_SECS}s.
*/
public OtlpGrpcMetricExporterBuilder setTimeout(Duration timeout) {
requireNonNull(timeout, "timeout");
return setTimeout(timeout.toNanos(), TimeUnit.NANOSECONDS);
}
/**
* Sets the max waiting time for the collector to process each metric batch. Optional.
*
* @param deadlineMs the max waiting time
* @return this builder's instance
* @deprecated Use {@link #setTimeout(long, TimeUnit)}
*/
@Deprecated
public OtlpGrpcMetricExporterBuilder setDeadlineMs(long deadlineMs) {
this.deadlineMs = deadlineMs;
return this;
return setTimeout(Duration.ofMillis(deadlineMs));
}
/**
@ -117,7 +146,7 @@ public final class OtlpGrpcMetricExporterBuilder
channel = managedChannelBuilder.build();
}
return new OtlpGrpcMetricExporter(channel, deadlineMs);
return new OtlpGrpcMetricExporter(channel, timeoutNanos);
}
OtlpGrpcMetricExporterBuilder() {}
@ -138,7 +167,7 @@ public final class OtlpGrpcMetricExporterBuilder
value = getLongProperty(CommonProperties.KEY_TIMEOUT, configMap);
}
if (value != null) {
this.setDeadlineMs(value);
this.setTimeout(Duration.ofMillis(value));
}
String endpointValue = getStringProperty(KEY_ENDPOINT, configMap);

View File

@ -8,6 +8,7 @@ package io.opentelemetry.exporter.otlp.metrics;
import static com.google.common.base.Charsets.US_ASCII;
import static io.grpc.Metadata.ASCII_STRING_MARSHALLER;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import com.google.common.io.Closer;
import io.grpc.ManagedChannel;
@ -29,6 +30,7 @@ import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.data.MetricData.LongPoint;
import io.opentelemetry.sdk.resources.Resource;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -65,6 +67,20 @@ class OtlpGrpcMetricExporterTest {
closer.close();
}
@Test
@SuppressWarnings("PreferJavaTimeOverload")
void invalidConfig() {
assertThatThrownBy(() -> OtlpGrpcMetricExporter.builder().setTimeout(-1, TimeUnit.MILLISECONDS))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("timeout must be non-negative");
assertThatThrownBy(() -> OtlpGrpcMetricExporter.builder().setTimeout(1, null))
.isInstanceOf(NullPointerException.class)
.hasMessage("unit");
assertThatThrownBy(() -> OtlpGrpcMetricExporter.builder().setTimeout(null))
.isInstanceOf(NullPointerException.class)
.hasMessage("timeout");
}
@Test
void configTest() {
Properties options = new Properties();
@ -73,7 +89,7 @@ class OtlpGrpcMetricExporterTest {
options.put("otel.exporter.otlp.headers", "test_1=1,test_2=2");
OtlpGrpcMetricExporterBuilder config = OtlpGrpcMetricExporter.builder().readProperties(options);
assertThat(config).extracting("useTls").isEqualTo(false);
assertThat(config).extracting("deadlineMs").isEqualTo(12L);
assertThat(config).extracting("timeoutNanos").isEqualTo(TimeUnit.MILLISECONDS.toNanos(12));
assertThat(config)
.extracting("metadata")
.extracting("namesAndValues")
@ -123,11 +139,10 @@ class OtlpGrpcMetricExporterTest {
@Test
void testExport_DeadlineSetPerExport() throws Exception {
int deadlineMs = 1500;
OtlpGrpcMetricExporter exporter =
OtlpGrpcMetricExporter.builder()
.setChannel(inProcessChannel)
.setDeadlineMs(deadlineMs)
.setTimeout(Duration.ofMillis(1500))
.build();
try {

View File

@ -63,8 +63,19 @@ import javax.annotation.concurrent.ThreadSafe;
@ThreadSafe
public final class OtlpGrpcSpanExporter implements SpanExporter {
public static final String DEFAULT_ENDPOINT = "localhost:4317";
public static final long DEFAULT_DEADLINE_MS = TimeUnit.SECONDS.toMillis(10);
/**
* Default endpoint.
*
* @deprecated Will be removed without replacement
*/
@Deprecated public static final String DEFAULT_ENDPOINT = "localhost:4317";
/**
* Default timeout.
*
* @deprecated Will be removed without replacement
*/
@Deprecated public static final long DEFAULT_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(10);
private static final Logger logger = Logger.getLogger(OtlpGrpcSpanExporter.class.getName());
private static final String EXPORTER_NAME = OtlpGrpcSpanExporter.class.getSimpleName();
@ -78,7 +89,7 @@ public final class OtlpGrpcSpanExporter implements SpanExporter {
private final TraceServiceFutureStub traceService;
private final ManagedChannel managedChannel;
private final long deadlineMs;
private final long timeoutNanos;
private final LongCounter.BoundLongCounter spansSeen;
private final LongCounter.BoundLongCounter spansExportedSuccess;
private final LongCounter.BoundLongCounter spansExportedFailure;
@ -87,10 +98,10 @@ public final class OtlpGrpcSpanExporter implements SpanExporter {
* Creates a new OTLP gRPC Span Reporter with the given name, using the given channel.
*
* @param channel the channel to use when communicating with the OpenTelemetry Collector.
* @param deadlineMs max waiting time for the collector to process each span batch. When set to 0
* or to a negative value, the exporter will wait indefinitely.
* @param timeoutNanos max waiting time for the collector to process each span batch. When set to
* 0 or to a negative value, the exporter will wait indefinitely.
*/
OtlpGrpcSpanExporter(ManagedChannel channel, long deadlineMs) {
OtlpGrpcSpanExporter(ManagedChannel channel, long timeoutNanos) {
Meter meter = GlobalMetricsProvider.getMeter("io.opentelemetry.exporters.otlp");
this.spansSeen =
meter.longCounterBuilder("spansSeenByExporter").build().bind(EXPORTER_NAME_LABELS);
@ -98,7 +109,7 @@ public final class OtlpGrpcSpanExporter implements SpanExporter {
this.spansExportedSuccess = spansExportedCounter.bind(EXPORT_SUCCESS_LABELS);
this.spansExportedFailure = spansExportedCounter.bind(EXPORT_FAILURE_LABELS);
this.managedChannel = channel;
this.deadlineMs = deadlineMs;
this.timeoutNanos = timeoutNanos;
this.traceService = TraceServiceGrpc.newFutureStub(channel);
}
@ -120,8 +131,8 @@ public final class OtlpGrpcSpanExporter implements SpanExporter {
final CompletableResultCode result = new CompletableResultCode();
TraceServiceFutureStub exporter;
if (deadlineMs > 0) {
exporter = traceService.withDeadlineAfter(deadlineMs, TimeUnit.MILLISECONDS);
if (timeoutNanos > 0) {
exporter = traceService.withDeadlineAfter(timeoutNanos, TimeUnit.NANOSECONDS);
} else {
exporter = traceService;
}
@ -193,7 +204,7 @@ public final class OtlpGrpcSpanExporter implements SpanExporter {
}
// Visible for testing
long getDeadlineMs() {
return deadlineMs;
long getTimeoutNanos() {
return timeoutNanos;
}
}

View File

@ -6,6 +6,8 @@
package io.opentelemetry.exporter.otlp.trace;
import static io.grpc.Metadata.ASCII_STRING_MARSHALLER;
import static io.opentelemetry.api.internal.Utils.checkArgument;
import static java.util.Objects.requireNonNull;
import com.google.common.base.Splitter;
import io.grpc.ManagedChannel;
@ -16,8 +18,10 @@ import io.grpc.netty.NettyChannelBuilder;
import io.grpc.stub.MetadataUtils;
import io.opentelemetry.sdk.extension.otproto.CommonProperties;
import java.io.ByteArrayInputStream;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.net.ssl.SSLException;
@ -26,14 +30,17 @@ import javax.net.ssl.SSLException;
public final class OtlpGrpcSpanExporterBuilder
extends io.opentelemetry.sdk.common.export.ConfigBuilder<OtlpGrpcSpanExporterBuilder> {
private static final String DEFAULT_ENDPOINT = "localhost:4317";
private static final long DEFAULT_TIMEOUT_SECS = 10;
private static final String KEY_TIMEOUT = "otel.exporter.otlp.span.timeout";
private static final String KEY_ENDPOINT = "otel.exporter.otlp.span.endpoint";
private static final String KEY_INSECURE = "otel.exporter.otlp.span.insecure";
private static final String KEY_HEADERS = "otel.exporter.otlp.span.headers";
private ManagedChannel channel;
private long deadlineMs = OtlpGrpcSpanExporter.DEFAULT_DEADLINE_MS; // 10 seconds
private String endpoint = OtlpGrpcSpanExporter.DEFAULT_ENDPOINT;
private long timeoutNanos = TimeUnit.SECONDS.toNanos(DEFAULT_TIMEOUT_SECS);
private String endpoint = DEFAULT_ENDPOINT;
private boolean useTls = false;
@Nullable private Metadata metadata;
@Nullable private byte[] trustedCertificatesPem;
@ -50,15 +57,36 @@ public final class OtlpGrpcSpanExporterBuilder
return this;
}
/**
* Sets the maximum time to wait for the collector to process an exported batch of spans. If
* unset, defaults to {@value DEFAULT_TIMEOUT_SECS}s.
*/
public OtlpGrpcSpanExporterBuilder setTimeout(long timeout, TimeUnit unit) {
requireNonNull(unit, "unit");
checkArgument(timeout >= 0, "timeout must be non-negative");
timeoutNanos = unit.toNanos(timeout);
return this;
}
/**
* Sets the maximum time to wait for the collector to process an exported batch of spans. If
* unset, defaults to {@value DEFAULT_TIMEOUT_SECS}s.
*/
public OtlpGrpcSpanExporterBuilder setTimeout(Duration timeout) {
requireNonNull(timeout, "timeout");
return setTimeout(timeout.toNanos(), TimeUnit.NANOSECONDS);
}
/**
* Sets the max waiting time for the collector to process each span batch. Optional.
*
* @param deadlineMs the max waiting time
* @return this builder's instance
* @deprecated Use {@link #setTimeout(long, TimeUnit)}
*/
@Deprecated
public OtlpGrpcSpanExporterBuilder setDeadlineMs(long deadlineMs) {
this.deadlineMs = deadlineMs;
return this;
return setTimeout(Duration.ofMillis(deadlineMs));
}
/**
@ -177,7 +205,7 @@ public final class OtlpGrpcSpanExporterBuilder
channel = managedChannelBuilder.build();
}
return new OtlpGrpcSpanExporter(channel, deadlineMs);
return new OtlpGrpcSpanExporter(channel, timeoutNanos);
}
OtlpGrpcSpanExporterBuilder() {}
@ -198,7 +226,7 @@ public final class OtlpGrpcSpanExporterBuilder
value = getLongProperty(CommonProperties.KEY_TIMEOUT, configMap);
}
if (value != null) {
this.setDeadlineMs(value);
this.setTimeout(Duration.ofMillis(value));
}
String endpointValue = getStringProperty(KEY_ENDPOINT, configMap);

View File

@ -6,6 +6,7 @@
package io.opentelemetry.exporter.otlp.trace;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.awaitility.Awaitility.await;
import com.google.common.io.Closer;
@ -26,6 +27,7 @@ import io.opentelemetry.sdk.extension.otproto.SpanAdapter;
import io.opentelemetry.sdk.testing.trace.TestSpanData;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -63,6 +65,20 @@ class OtlpGrpcSpanExporterTest {
closer.close();
}
@Test
@SuppressWarnings("PreferJavaTimeOverload")
void invalidConfig() {
assertThatThrownBy(() -> OtlpGrpcSpanExporter.builder().setTimeout(-1, TimeUnit.MILLISECONDS))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("timeout must be non-negative");
assertThatThrownBy(() -> OtlpGrpcSpanExporter.builder().setTimeout(1, null))
.isInstanceOf(NullPointerException.class)
.hasMessage("unit");
assertThatThrownBy(() -> OtlpGrpcSpanExporter.builder().setTimeout(null))
.isInstanceOf(NullPointerException.class)
.hasMessage("timeout");
}
@Test
void testExport() {
SpanData span = generateFakeSpan();
@ -96,11 +112,10 @@ class OtlpGrpcSpanExporterTest {
@Test
void testExport_DeadlineSetPerExport() throws InterruptedException {
int deadlineMs = 1500;
OtlpGrpcSpanExporter exporter =
OtlpGrpcSpanExporter.builder()
.setChannel(inProcessChannel)
.setDeadlineMs(deadlineMs)
.setTimeout(Duration.ofMillis(1500))
.build();
try {

View File

@ -90,7 +90,7 @@ class OtlpGrpcSpanExporterTest {
});
OtlpGrpcSpanExporter exporter = builder.build();
assertThat(exporter.getDeadlineMs()).isEqualTo(5124);
assertThat(exporter.getTimeoutNanos()).isEqualTo(5124);
assertThat(
exporter
.export(

View File

@ -13,6 +13,7 @@ import io.opentelemetry.sdk.metrics.export.IntervalMetricReader;
import io.opentelemetry.sdk.metrics.export.IntervalMetricReaderBuilder;
import io.prometheus.client.exporter.HTTPServer;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@ -70,9 +71,9 @@ final class MetricExporterConfiguration {
config.getCommaSeparatedMap("otel.exporter.otlp.headers").forEach(builder::addHeader);
Long deadlineMs = config.getLong("otel.exporter.otlp.timeout");
if (deadlineMs != null) {
builder.setDeadlineMs(deadlineMs);
Long timeoutMillis = config.getLong("otel.exporter.otlp.timeout");
if (timeoutMillis != null) {
builder.setTimeout(Duration.ofMillis(timeoutMillis));
}
OtlpGrpcMetricExporter exporter = builder.build();

View File

@ -13,6 +13,7 @@ import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporterBuilder;
import io.opentelemetry.exporter.zipkin.ZipkinSpanExporter;
import io.opentelemetry.exporter.zipkin.ZipkinSpanExporterBuilder;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import javax.annotation.Nullable;
@ -63,9 +64,9 @@ final class SpanExporterConfiguration {
config.getCommaSeparatedMap("otel.exporter.otlp.headers").forEach(builder::addHeader);
Long deadlineMs = config.getLong("otel.exporter.otlp.timeout");
if (deadlineMs != null) {
builder.setDeadlineMs(deadlineMs);
Long timeoutMillis = config.getLong("otel.exporter.otlp.timeout");
if (timeoutMillis != null) {
builder.setTimeout(Duration.ofMillis(timeoutMillis));
}
return builder.build();

View File

@ -12,6 +12,7 @@ import io.opentelemetry.sdk.trace.config.TraceConfig;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
@ -98,7 +99,7 @@ public class SpanPipelineBenchmark {
return SimpleSpanProcessor.create(
OtlpGrpcSpanExporter.builder()
.setEndpoint(collectorAddress)
.setDeadlineMs(50000)
.setTimeout(Duration.ofSeconds(50))
.build());
}
@ -115,7 +116,7 @@ public class SpanPipelineBenchmark {
return BatchSpanProcessor.builder(
OtlpGrpcSpanExporter.builder()
.setEndpoint(collectorAddress)
.setDeadlineMs(50000)
.setTimeout(Duration.ofSeconds(50))
.build())
.build();
}