Add connectTimeout configuration option OtlpGrpc{Signal}Exporters (#6079)
This commit is contained in:
parent
96fe54fc16
commit
1d4a2f4a50
|
|
@ -1,2 +1,13 @@
|
|||
Comparing source compatibility of against
|
||||
No changes.
|
||||
*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.exporter.otlp.logs.OtlpGrpcLogRecordExporterBuilder (not serializable)
|
||||
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
|
||||
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.exporter.otlp.logs.OtlpGrpcLogRecordExporterBuilder setConnectTimeout(long, java.util.concurrent.TimeUnit)
|
||||
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.exporter.otlp.logs.OtlpGrpcLogRecordExporterBuilder setConnectTimeout(java.time.Duration)
|
||||
*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder (not serializable)
|
||||
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
|
||||
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder setConnectTimeout(long, java.util.concurrent.TimeUnit)
|
||||
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder setConnectTimeout(java.time.Duration)
|
||||
*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporterBuilder (not serializable)
|
||||
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
|
||||
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporterBuilder setConnectTimeout(long, java.util.concurrent.TimeUnit)
|
||||
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporterBuilder setConnectTimeout(java.time.Duration)
|
||||
|
|
|
|||
|
|
@ -43,6 +43,8 @@ import javax.net.ssl.X509TrustManager;
|
|||
@SuppressWarnings("JavadocMethod")
|
||||
public class GrpcExporterBuilder<T extends Marshaler> {
|
||||
|
||||
public static final long DEFAULT_CONNECT_TIMEOUT_SECS = 10;
|
||||
|
||||
private static final Logger LOGGER = Logger.getLogger(GrpcExporterBuilder.class.getName());
|
||||
|
||||
private final String exporterName;
|
||||
|
|
@ -52,6 +54,7 @@ public class GrpcExporterBuilder<T extends Marshaler> {
|
|||
grpcStubFactory;
|
||||
|
||||
private long timeoutNanos;
|
||||
private long connectTimeoutNanos = TimeUnit.SECONDS.toNanos(DEFAULT_CONNECT_TIMEOUT_SECS);
|
||||
private URI endpoint;
|
||||
@Nullable private Compressor compressor;
|
||||
private final Map<String, String> constantHeaders = new HashMap<>();
|
||||
|
|
@ -92,6 +95,11 @@ public class GrpcExporterBuilder<T extends Marshaler> {
|
|||
return setTimeout(timeout.toNanos(), TimeUnit.NANOSECONDS);
|
||||
}
|
||||
|
||||
public GrpcExporterBuilder<T> setConnectTimeout(long timeout, TimeUnit unit) {
|
||||
connectTimeoutNanos = unit.toNanos(timeout);
|
||||
return this;
|
||||
}
|
||||
|
||||
public GrpcExporterBuilder<T> setEndpoint(String endpoint) {
|
||||
this.endpoint = ExporterBuilderUtil.validateEndpoint(endpoint);
|
||||
return this;
|
||||
|
|
@ -151,6 +159,7 @@ public class GrpcExporterBuilder<T extends Marshaler> {
|
|||
grpcEndpointPath);
|
||||
|
||||
copy.timeoutNanos = timeoutNanos;
|
||||
copy.connectTimeoutNanos = connectTimeoutNanos;
|
||||
copy.endpoint = endpoint;
|
||||
copy.compressor = compressor;
|
||||
copy.constantHeaders.putAll(constantHeaders);
|
||||
|
|
@ -193,6 +202,7 @@ public class GrpcExporterBuilder<T extends Marshaler> {
|
|||
grpcEndpointPath,
|
||||
compressor,
|
||||
timeoutNanos,
|
||||
connectTimeoutNanos,
|
||||
headerSupplier,
|
||||
grpcChannel,
|
||||
grpcStubFactory,
|
||||
|
|
@ -214,6 +224,7 @@ public class GrpcExporterBuilder<T extends Marshaler> {
|
|||
joiner.add("endpoint=" + endpoint.toString());
|
||||
joiner.add("endpointPath=" + grpcEndpointPath);
|
||||
joiner.add("timeoutNanos=" + timeoutNanos);
|
||||
joiner.add("connectTimeoutNanos=" + connectTimeoutNanos);
|
||||
joiner.add(
|
||||
"compressorEncoding="
|
||||
+ Optional.ofNullable(compressor).map(Compressor::getEncoding).orElse(null));
|
||||
|
|
|
|||
|
|
@ -34,6 +34,7 @@ public interface GrpcSenderProvider {
|
|||
String endpointPath,
|
||||
@Nullable Compressor compressor,
|
||||
long timeoutNanos,
|
||||
long connectTimeoutNanos,
|
||||
Supplier<Map<String, List<String>>> headersSupplier,
|
||||
@Nullable Object managedChannel,
|
||||
Supplier<BiFunction<Channel, String, MarshalerServiceStub<T, ?, ?>>> stubFactory,
|
||||
|
|
|
|||
|
|
@ -100,6 +100,7 @@ public class OltpExporterBenchmark {
|
|||
.toString(),
|
||||
null,
|
||||
10,
|
||||
10,
|
||||
Collections::emptyMap,
|
||||
null,
|
||||
null,
|
||||
|
|
|
|||
|
|
@ -101,6 +101,26 @@ public final class OtlpGrpcLogRecordExporterBuilder {
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the maximum time to wait for new connections to be established. If unset, defaults to
|
||||
* {@value GrpcExporterBuilder#DEFAULT_CONNECT_TIMEOUT_SECS}s.
|
||||
*/
|
||||
public OtlpGrpcLogRecordExporterBuilder setConnectTimeout(long timeout, TimeUnit unit) {
|
||||
requireNonNull(unit, "unit");
|
||||
checkArgument(timeout >= 0, "timeout must be non-negative");
|
||||
delegate.setConnectTimeout(timeout, unit);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the maximum time to wait for new connections to be established. If unset, defaults to
|
||||
* {@value GrpcExporterBuilder#DEFAULT_CONNECT_TIMEOUT_SECS}s.
|
||||
*/
|
||||
public OtlpGrpcLogRecordExporterBuilder setConnectTimeout(Duration timeout) {
|
||||
requireNonNull(timeout, "timeout");
|
||||
return setConnectTimeout(timeout.toNanos(), TimeUnit.NANOSECONDS);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the OTLP endpoint to connect to. If unset, defaults to {@value DEFAULT_ENDPOINT_URL}. The
|
||||
* endpoint must start with either http:// or https://.
|
||||
|
|
|
|||
|
|
@ -113,6 +113,26 @@ public final class OtlpGrpcMetricExporterBuilder {
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the maximum time to wait for new connections to be established. If unset, defaults to
|
||||
* {@value GrpcExporterBuilder#DEFAULT_CONNECT_TIMEOUT_SECS}s.
|
||||
*/
|
||||
public OtlpGrpcMetricExporterBuilder setConnectTimeout(long timeout, TimeUnit unit) {
|
||||
requireNonNull(unit, "unit");
|
||||
checkArgument(timeout >= 0, "timeout must be non-negative");
|
||||
delegate.setConnectTimeout(timeout, unit);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the maximum time to wait for new connections to be established. If unset, defaults to
|
||||
* {@value GrpcExporterBuilder#DEFAULT_CONNECT_TIMEOUT_SECS}s.
|
||||
*/
|
||||
public OtlpGrpcMetricExporterBuilder setConnectTimeout(Duration timeout) {
|
||||
requireNonNull(timeout, "timeout");
|
||||
return setConnectTimeout(timeout.toNanos(), TimeUnit.NANOSECONDS);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the OTLP endpoint to connect to. If unset, defaults to {@value DEFAULT_ENDPOINT_URL}. The
|
||||
* endpoint must start with either http:// or https://.
|
||||
|
|
|
|||
|
|
@ -97,6 +97,26 @@ public final class OtlpGrpcSpanExporterBuilder {
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the maximum time to wait for new connections to be established. If unset, defaults to
|
||||
* {@value GrpcExporterBuilder#DEFAULT_CONNECT_TIMEOUT_SECS}s.
|
||||
*/
|
||||
public OtlpGrpcSpanExporterBuilder setConnectTimeout(long timeout, TimeUnit unit) {
|
||||
requireNonNull(unit, "unit");
|
||||
checkArgument(timeout >= 0, "timeout must be non-negative");
|
||||
delegate.setConnectTimeout(timeout, unit);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the maximum time to wait for new connections to be established. If unset, defaults to
|
||||
* {@value GrpcExporterBuilder#DEFAULT_CONNECT_TIMEOUT_SECS}s.
|
||||
*/
|
||||
public OtlpGrpcSpanExporterBuilder setConnectTimeout(Duration timeout) {
|
||||
requireNonNull(timeout, "timeout");
|
||||
return setConnectTimeout(timeout.toNanos(), TimeUnit.NANOSECONDS);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the OTLP endpoint to connect to. If unset, defaults to {@value DEFAULT_ENDPOINT_URL}. The
|
||||
* endpoint must start with either http:// or https://.
|
||||
|
|
|
|||
|
|
@ -456,6 +456,33 @@ public abstract class AbstractGrpcTelemetryExporterTest<T, U extends Message> {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressLogger(GrpcExporter.class)
|
||||
void connectTimeout() {
|
||||
// UpstreamGrpcSender doesn't support connectTimeout, so we skip the test
|
||||
assumeThat(exporter.unwrap())
|
||||
.extracting("delegate.grpcSender")
|
||||
.matches(sender -> sender.getClass().getSimpleName().equals("OkHttpGrpcSender"));
|
||||
|
||||
TelemetryExporter<T> exporter =
|
||||
exporterBuilder()
|
||||
// Connecting to a non-routable IP address to trigger connection error
|
||||
.setEndpoint("http://10.255.255.1")
|
||||
.setConnectTimeout(Duration.ofMillis(1))
|
||||
.build();
|
||||
try {
|
||||
long startTimeMillis = System.currentTimeMillis();
|
||||
CompletableResultCode result =
|
||||
exporter.export(Collections.singletonList(generateFakeTelemetry()));
|
||||
assertThat(result.join(10, TimeUnit.SECONDS).isSuccess()).isFalse();
|
||||
// Assert that the export request fails well before the default connect timeout of 10s
|
||||
assertThat(System.currentTimeMillis() - startTimeMillis)
|
||||
.isLessThan(TimeUnit.SECONDS.toMillis(1));
|
||||
} finally {
|
||||
exporter.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void deadlineSetPerExport() throws InterruptedException {
|
||||
TelemetryExporter<T> exporter =
|
||||
|
|
@ -840,6 +867,9 @@ public abstract class AbstractGrpcTelemetryExporterTest<T, U extends Message> {
|
|||
+ "timeoutNanos="
|
||||
+ TimeUnit.SECONDS.toNanos(10)
|
||||
+ ", "
|
||||
+ "connectTimeoutNanos="
|
||||
+ TimeUnit.SECONDS.toNanos(10)
|
||||
+ ", "
|
||||
+ "compressorEncoding=null, "
|
||||
+ "headers=Headers\\{User-Agent=OBFUSCATED\\}"
|
||||
+ ".*" // Maybe additional grpcChannel field
|
||||
|
|
@ -851,6 +881,7 @@ public abstract class AbstractGrpcTelemetryExporterTest<T, U extends Message> {
|
|||
telemetryExporter =
|
||||
exporterBuilder()
|
||||
.setTimeout(Duration.ofSeconds(5))
|
||||
.setConnectTimeout(Duration.ofSeconds(4))
|
||||
.setEndpoint("http://example:4317")
|
||||
.setCompression("gzip")
|
||||
.addHeader("foo", "bar")
|
||||
|
|
@ -877,6 +908,9 @@ public abstract class AbstractGrpcTelemetryExporterTest<T, U extends Message> {
|
|||
+ "timeoutNanos="
|
||||
+ TimeUnit.SECONDS.toNanos(5)
|
||||
+ ", "
|
||||
+ "connectTimeoutNanos="
|
||||
+ TimeUnit.SECONDS.toNanos(4)
|
||||
+ ", "
|
||||
+ "compressorEncoding=gzip, "
|
||||
+ "headers=Headers\\{.*foo=OBFUSCATED.*\\}, "
|
||||
+ "retryPolicy=RetryPolicy\\{maxAttempts=2, initialBackoff=PT0\\.05S, maxBackoff=PT3S, backoffMultiplier=1\\.3\\}"
|
||||
|
|
|
|||
|
|
@ -49,7 +49,8 @@ final class GrpcLogRecordExporterBuilderWrapper implements TelemetryExporterBuil
|
|||
|
||||
@Override
|
||||
public TelemetryExporterBuilder<LogRecordData> setConnectTimeout(Duration timeout) {
|
||||
throw new UnsupportedOperationException();
|
||||
builder.setConnectTimeout(timeout);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -49,7 +49,8 @@ final class GrpcMetricExporterBuilderWrapper implements TelemetryExporterBuilder
|
|||
|
||||
@Override
|
||||
public TelemetryExporterBuilder<MetricData> setConnectTimeout(Duration timeout) {
|
||||
throw new UnsupportedOperationException();
|
||||
builder.setConnectTimeout(timeout);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -50,7 +50,8 @@ final class GrpcSpanExporterBuilderWrapper implements TelemetryExporterBuilder<S
|
|||
|
||||
@Override
|
||||
public TelemetryExporterBuilder<SpanData> setConnectTimeout(Duration timeout) {
|
||||
throw new UnsupportedOperationException();
|
||||
builder.setConnectTimeout(timeout);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -41,6 +41,7 @@ public class UpstreamGrpcSenderProvider implements GrpcSenderProvider {
|
|||
String endpointPath,
|
||||
@Nullable Compressor compressor,
|
||||
long timeoutNanos,
|
||||
long connectTimeoutNanos,
|
||||
Supplier<Map<String, List<String>>> headersSupplier,
|
||||
@Nullable Object managedChannel,
|
||||
Supplier<BiFunction<Channel, String, MarshalerServiceStub<T, ?, ?>>> stubFactory,
|
||||
|
|
|
|||
|
|
@ -75,6 +75,7 @@ public final class OkHttpGrpcSender<T extends Marshaler> implements GrpcSender<T
|
|||
String endpoint,
|
||||
@Nullable Compressor compressor,
|
||||
long timeoutNanos,
|
||||
long connectTimeoutNanos,
|
||||
Supplier<Map<String, List<String>>> headersSupplier,
|
||||
@Nullable RetryPolicy retryPolicy,
|
||||
@Nullable SSLContext sslContext,
|
||||
|
|
@ -82,7 +83,8 @@ public final class OkHttpGrpcSender<T extends Marshaler> implements GrpcSender<T
|
|||
OkHttpClient.Builder clientBuilder =
|
||||
new OkHttpClient.Builder()
|
||||
.dispatcher(OkHttpUtil.newDispatcher())
|
||||
.callTimeout(Duration.ofNanos(timeoutNanos));
|
||||
.callTimeout(Duration.ofNanos(timeoutNanos))
|
||||
.connectTimeout(Duration.ofNanos(connectTimeoutNanos));
|
||||
if (retryPolicy != null) {
|
||||
clientBuilder.addInterceptor(
|
||||
new RetryInterceptor(retryPolicy, OkHttpGrpcSender::isRetryable));
|
||||
|
|
|
|||
|
|
@ -35,6 +35,7 @@ public class OkHttpGrpcSenderProvider implements GrpcSenderProvider {
|
|||
String endpointPath,
|
||||
@Nullable Compressor compressor,
|
||||
long timeoutNanos,
|
||||
long connectTimeoutNanos,
|
||||
Supplier<Map<String, List<String>>> headersSupplier,
|
||||
@Nullable Object managedChannel,
|
||||
Supplier<BiFunction<Channel, String, MarshalerServiceStub<T, ?, ?>>> stubFactory,
|
||||
|
|
@ -45,6 +46,7 @@ public class OkHttpGrpcSenderProvider implements GrpcSenderProvider {
|
|||
endpoint.resolve(endpointPath).toString(),
|
||||
compressor,
|
||||
timeoutNanos,
|
||||
connectTimeoutNanos,
|
||||
headersSupplier,
|
||||
retryPolicy,
|
||||
sslContext,
|
||||
|
|
|
|||
|
|
@ -21,7 +21,7 @@ class OkHttpGrpcSuppressionTest
|
|||
@Override
|
||||
OkHttpGrpcSender<DummyMarshaler> createSender(String endpoint) {
|
||||
return new OkHttpGrpcSender<>(
|
||||
"https://localhost", null, 10L, Collections::emptyMap, null, null, null);
|
||||
"https://localhost", null, 10L, 10L, Collections::emptyMap, null, null, null);
|
||||
}
|
||||
|
||||
protected static class DummyMarshaler extends MarshalerWithSize {
|
||||
|
|
|
|||
Loading…
Reference in New Issue