Add experimental JdkHttpSender (#5557)

This commit is contained in:
jack-berg 2023-07-06 15:42:08 -05:00 committed by GitHub
parent cb4d13de47
commit aab7159327
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 1067 additions and 39 deletions

View File

@ -8,10 +8,11 @@ otelJava.moduleName.set("io.opentelemetry.all")
tasks {
// We don't compile much here, just some API boundary tests. This project is mostly for
// aggregating jacoco reports and it doesn't work if this isn't at least as high as the
// highest supported Java version in any of our projects. All of our projects target
// Java 8.
// highest supported Java version in any of our projects. All of our
// projects target Java 8 except :exporters:http-sender:jdk, which targets
// Java 11
withType(JavaCompile::class) {
options.release.set(8)
options.release.set(11)
}
val testJavaVersion: String? by project

View File

@ -34,3 +34,35 @@ dependencies {
testImplementation("io.grpc:grpc-testing")
testRuntimeOnly("io.grpc:grpc-netty-shaded")
}
val testJavaVersion: String? by project
testing {
suites {
register<JvmTestSuite>("testHttpSenderProvider") {
dependencies {
implementation(project(":exporters:sender:jdk"))
implementation(project(":exporters:sender:okhttp"))
}
targets {
all {
testTask {
enabled = !testJavaVersion.equals("8")
}
}
}
}
}
}
tasks {
check {
dependsOn(testing.suites)
}
}
afterEvaluate {
tasks.named<JavaCompile>("compileTestHttpSenderProviderJava") {
options.release.set(11)
}
}

View File

@ -6,6 +6,7 @@
package io.opentelemetry.exporter.internal.http;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.internal.ConfigUtil;
import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.exporter.internal.ExporterBuilderUtil;
import io.opentelemetry.exporter.internal.TlsConfigHelper;
@ -129,29 +130,80 @@ public final class HttpExporterBuilder<T extends Marshaler> {
Map<String, String> headers = this.headers == null ? Collections.emptyMap() : this.headers;
Supplier<Map<String, String>> headerSupplier = () -> headers;
HttpSender httpSender = null;
// TODO: once we publish multiple HttpSenderProviders, log warning when multiple are found
for (HttpSenderProvider httpSenderProvider :
ServiceLoader.load(HttpSenderProvider.class, HttpExporterBuilder.class.getClassLoader())) {
httpSender =
httpSenderProvider.createSender(
endpoint,
compressionEnabled,
exportAsJson ? "application/json" : "application/x-protobuf",
timeoutNanos,
headerSupplier,
authenticator,
retryPolicy,
tlsConfigHelper.getSslContext(),
tlsConfigHelper.getTrustManager());
LOGGER.log(Level.FINE, "Using HttpSender: " + httpSender.getClass().getName());
break;
}
if (httpSender == null) {
throw new IllegalStateException(
"No HttpSenderProvider found on classpath. Please add dependency on opentelemetry-exporter-sender-okhttp");
}
HttpSenderProvider httpSenderProvider = resolveHttpSenderProvider();
HttpSender httpSender =
httpSenderProvider.createSender(
endpoint,
compressionEnabled,
exportAsJson ? "application/json" : "application/x-protobuf",
timeoutNanos,
headerSupplier,
authenticator,
retryPolicy,
tlsConfigHelper.getSslContext(),
tlsConfigHelper.getTrustManager());
LOGGER.log(Level.FINE, "Using HttpSender: " + httpSender.getClass().getName());
return new HttpExporter<>(exporterName, type, httpSender, meterProviderSupplier, exportAsJson);
}
/**
* Resolve the {@link HttpSenderProvider}.
*
* <p>If no {@link HttpSenderProvider} is available, throw {@link IllegalStateException}.
*
* <p>If only one {@link HttpSenderProvider} is available, use it.
*
* <p>If multiple are available and..
*
* <ul>
* <li>{@code io.opentelemetry.exporter.internal.http.HttpSenderProvider} is empty, use the
* first found.
* <li>{@code io.opentelemetry.exporter.internal.http.HttpSenderProvider} is set, use the
* matching provider. If none match, throw {@link IllegalStateException}.
* </ul>
*/
private static HttpSenderProvider resolveHttpSenderProvider() {
Map<String, HttpSenderProvider> httpSenderProviders = new HashMap<>();
for (HttpSenderProvider spi :
ServiceLoader.load(HttpSenderProvider.class, HttpExporterBuilder.class.getClassLoader())) {
httpSenderProviders.put(spi.getClass().getName(), spi);
}
// No provider on classpath, throw
if (httpSenderProviders.isEmpty()) {
throw new IllegalStateException(
"No HttpSenderProvider found on classpath. Please add dependency on "
+ "opentelemetry-exporter-sender-okhttp or opentelemetry-exporter-sender-jdk");
}
// Exactly one provider on classpath, use it
if (httpSenderProviders.size() == 1) {
return httpSenderProviders.values().stream().findFirst().get();
}
// If we've reached here, there are multiple HttpSenderProviders
String configuredSender =
ConfigUtil.getString("io.opentelemetry.exporter.internal.http.HttpSenderProvider", "");
// Multiple providers but none configured, use first we find and log a warning
if (configuredSender.isEmpty()) {
LOGGER.log(
Level.WARNING,
"Multiple HttpSenderProvider found. Please include only one, "
+ "or specify preference setting io.opentelemetry.exporter.internal.http.HttpSenderProvider "
+ "to the FQCN of the preferred provider.");
return httpSenderProviders.values().stream().findFirst().get();
}
// Multiple providers with configuration match, use configuration match
if (httpSenderProviders.containsKey(configuredSender)) {
return httpSenderProviders.get(configuredSender);
}
// Multiple providers, configured does not match, throw
throw new IllegalStateException(
"No HttpSenderProvider matched configured io.opentelemetry.exporter.internal.http.HttpSenderProvider: "
+ configuredSender);
}
}

View File

@ -16,6 +16,7 @@ class HttpExporterTest {
assertThatThrownBy(() -> new HttpExporterBuilder<>("name", "type", "http://localhost").build())
.isInstanceOf(IllegalStateException.class)
.hasMessage(
"No HttpSenderProvider found on classpath. Please add dependency on opentelemetry-exporter-sender-okhttp");
"No HttpSenderProvider found on classpath. Please add dependency on "
+ "opentelemetry-exporter-sender-okhttp or opentelemetry-exporter-sender-jdk");
}
}

View File

@ -0,0 +1,74 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.exporter.internal.http;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import io.github.netmikey.logunit.api.LogCapturer;
import io.opentelemetry.exporter.sender.jdk.internal.JdkHttpSender;
import io.opentelemetry.exporter.sender.okhttp.internal.OkHttpHttpSender;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junitpioneer.jupiter.SetSystemProperty;
class HttpExporterTest {
@RegisterExtension
LogCapturer logCapturer =
LogCapturer.create().captureForLogger(HttpExporterBuilder.class.getName());
@Test
void build_multipleSendersNoConfiguration() {
Assertions.assertThatCode(
() -> new HttpExporterBuilder<>("exporter", "type", "http://localhost").build())
.doesNotThrowAnyException();
logCapturer.assertContains(
"Multiple HttpSenderProvider found. Please include only one, "
+ "or specify preference setting io.opentelemetry.exporter.internal.http.HttpSenderProvider "
+ "to the FQCN of the preferred provider.");
}
@Test
@SetSystemProperty(
key = "io.opentelemetry.exporter.internal.http.HttpSenderProvider",
value = "io.opentelemetry.exporter.sender.jdk.internal.JdkHttpSenderProvider")
void build_multipleSendersWithJdk() {
assertThat(new HttpExporterBuilder<>("exporter", "type", "http://localhost").build())
.extracting("httpSender")
.isInstanceOf(JdkHttpSender.class);
assertThat(logCapturer.getEvents()).isEmpty();
}
@Test
@SetSystemProperty(
key = "io.opentelemetry.exporter.internal.http.HttpSenderProvider",
value = "io.opentelemetry.exporter.sender.okhttp.internal.OkHttpHttpSenderProvider")
void build_multipleSendersWithOkHttp() {
assertThat(new HttpExporterBuilder<>("exporter", "type", "http://localhost").build())
.extracting("httpSender")
.isInstanceOf(OkHttpHttpSender.class);
assertThat(logCapturer.getEvents()).isEmpty();
}
@Test
@SetSystemProperty(
key = "io.opentelemetry.exporter.internal.http.HttpSenderProvider",
value = "foo")
void build_multipleSendersNoMatch() {
assertThatThrownBy(
() -> new HttpExporterBuilder<>("exporter", "type", "http://localhost").build())
.isInstanceOf(IllegalStateException.class)
.hasMessage(
"No HttpSenderProvider matched configured io.opentelemetry.exporter.internal.http.HttpSenderProvider: foo");
assertThat(logCapturer.getEvents()).isEmpty();
}
}

View File

@ -39,6 +39,8 @@ dependencies {
jmhRuntimeOnly("io.grpc:grpc-netty")
}
val testJavaVersion: String? by project
testing {
suites {
register<JvmTestSuite>("testGrpcNetty") {
@ -65,6 +67,22 @@ testing {
implementation("io.grpc:grpc-stub")
}
}
register<JvmTestSuite>("testJdkHttpSender") {
dependencies {
implementation(project(":exporters:sender:jdk"))
implementation(project(":exporters:otlp:testing-internal"))
implementation("io.grpc:grpc-stub")
}
targets {
all {
testTask {
systemProperty("io.opentelemetry.exporter.internal.http.HttpSenderProvider", "io.opentelemetry.exporter.sender.jdk.internal.JdkHttpSenderProvider")
enabled = !testJavaVersion.equals("8")
}
}
}
}
register<JvmTestSuite>("testSpanPipeline") {
dependencies {
implementation("io.opentelemetry.proto:opentelemetry-proto")
@ -86,3 +104,9 @@ tasks {
)
}
}
afterEvaluate {
tasks.named<JavaCompile>("compileTestJdkHttpSenderJava") {
options.release.set(11)
}
}

View File

@ -21,10 +21,10 @@ import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import javax.net.ssl.X509TrustManager;
class OtlpHttpLogRecordExporterTest
class OtlpHttpLogRecordExporterOkHttpSenderTest
extends AbstractHttpTelemetryExporterTest<LogRecordData, ResourceLogs> {
protected OtlpHttpLogRecordExporterTest() {
protected OtlpHttpLogRecordExporterOkHttpSenderTest() {
super("log", "/v1/logs", ResourceLogs.getDefaultInstance());
}

View File

@ -31,10 +31,10 @@ import javax.net.ssl.SSLContext;
import javax.net.ssl.X509TrustManager;
import org.junit.jupiter.api.Test;
class OtlpHttpMetricExporterTest
class OtlpHttpMetricExporterOkHttpSenderTest
extends AbstractHttpTelemetryExporterTest<MetricData, ResourceMetrics> {
protected OtlpHttpMetricExporterTest() {
protected OtlpHttpMetricExporterOkHttpSenderTest() {
super("metric", "/v1/metrics", ResourceMetrics.getDefaultInstance());
}

View File

@ -21,9 +21,10 @@ import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import javax.net.ssl.X509TrustManager;
class OtlpHttpSpanExporterTest extends AbstractHttpTelemetryExporterTest<SpanData, ResourceSpans> {
class OtlpHttpSpanExporterOkHttpSenderTest
extends AbstractHttpTelemetryExporterTest<SpanData, ResourceSpans> {
protected OtlpHttpSpanExporterTest() {
protected OtlpHttpSpanExporterOkHttpSenderTest() {
super("span", "/v1/traces", ResourceSpans.getDefaultInstance());
}

View File

@ -0,0 +1,123 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.exporter.otlp.http.logs;
import io.opentelemetry.exporter.internal.auth.Authenticator;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.exporter.internal.otlp.logs.ResourceLogsMarshaler;
import io.opentelemetry.exporter.otlp.testing.internal.AbstractHttpTelemetryExporterTest;
import io.opentelemetry.exporter.otlp.testing.internal.FakeTelemetryUtil;
import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporter;
import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporterBuilder;
import io.opentelemetry.proto.logs.v1.ResourceLogs;
import io.opentelemetry.sdk.common.export.RetryPolicy;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import javax.net.ssl.X509TrustManager;
class OtlpHttpLogRecordExporterJdkSenderTest
extends AbstractHttpTelemetryExporterTest<LogRecordData, ResourceLogs> {
protected OtlpHttpLogRecordExporterJdkSenderTest() {
super("log", "/v1/logs", ResourceLogs.getDefaultInstance());
}
@Override
protected boolean hasAuthenticatorSupport() {
return false;
}
@Override
protected TelemetryExporterBuilder<LogRecordData> exporterBuilder() {
OtlpHttpLogRecordExporterBuilder builder = OtlpHttpLogRecordExporter.builder();
return new TelemetryExporterBuilder<>() {
@Override
public TelemetryExporterBuilder<LogRecordData> setEndpoint(String endpoint) {
builder.setEndpoint(endpoint);
return this;
}
@Override
public TelemetryExporterBuilder<LogRecordData> setTimeout(long timeout, TimeUnit unit) {
builder.setTimeout(timeout, unit);
return this;
}
@Override
public TelemetryExporterBuilder<LogRecordData> setTimeout(Duration timeout) {
builder.setTimeout(timeout);
return this;
}
@Override
public TelemetryExporterBuilder<LogRecordData> setCompression(String compression) {
builder.setCompression(compression);
return this;
}
@Override
public TelemetryExporterBuilder<LogRecordData> addHeader(String key, String value) {
builder.addHeader(key, value);
return this;
}
@Override
public TelemetryExporterBuilder<LogRecordData> setAuthenticator(Authenticator authenticator) {
Authenticator.setAuthenticatorOnDelegate(builder, authenticator);
return this;
}
@Override
public TelemetryExporterBuilder<LogRecordData> setTrustedCertificates(byte[] certificates) {
builder.setTrustedCertificates(certificates);
return this;
}
@Override
public TelemetryExporterBuilder<LogRecordData> setSslContext(
SSLContext ssLContext, X509TrustManager trustManager) {
builder.setSslContext(ssLContext, trustManager);
return this;
}
@Override
public TelemetryExporterBuilder<LogRecordData> setClientTls(
byte[] privateKeyPem, byte[] certificatePem) {
builder.setClientTls(privateKeyPem, certificatePem);
return this;
}
@Override
public TelemetryExporterBuilder<LogRecordData> setRetryPolicy(RetryPolicy retryPolicy) {
builder.setRetryPolicy(retryPolicy);
return this;
}
@Override
public TelemetryExporterBuilder<LogRecordData> setChannel(io.grpc.ManagedChannel channel) {
throw new UnsupportedOperationException("Not implemented");
}
@Override
public TelemetryExporter<LogRecordData> build() {
return TelemetryExporter.wrap(builder.build());
}
};
}
@Override
protected LogRecordData generateFakeTelemetry() {
return FakeTelemetryUtil.generateFakeLogRecordData();
}
@Override
protected Marshaler[] toMarshalers(List<LogRecordData> telemetry) {
return ResourceLogsMarshaler.create(telemetry);
}
}

View File

@ -0,0 +1,177 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.exporter.otlp.http.metrics;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import io.opentelemetry.exporter.internal.auth.Authenticator;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.exporter.internal.otlp.metrics.ResourceMetricsMarshaler;
import io.opentelemetry.exporter.otlp.testing.internal.AbstractHttpTelemetryExporterTest;
import io.opentelemetry.exporter.otlp.testing.internal.FakeTelemetryUtil;
import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporter;
import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporterBuilder;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import io.opentelemetry.sdk.common.export.RetryPolicy;
import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.AggregationTemporalitySelector;
import io.opentelemetry.sdk.metrics.export.DefaultAggregationSelector;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import javax.net.ssl.X509TrustManager;
import org.junit.jupiter.api.Test;
class OtlpHttpMetricExporterJdkSenderTest
extends AbstractHttpTelemetryExporterTest<MetricData, ResourceMetrics> {
protected OtlpHttpMetricExporterJdkSenderTest() {
super("metric", "/v1/metrics", ResourceMetrics.getDefaultInstance());
}
/** Test configuration specific to metric exporter. */
@Test
void validMetricConfig() {
assertThatCode(
() ->
OtlpHttpMetricExporter.builder()
.setAggregationTemporalitySelector(
AggregationTemporalitySelector.deltaPreferred()))
.doesNotThrowAnyException();
assertThat(
OtlpHttpMetricExporter.builder()
.setAggregationTemporalitySelector(AggregationTemporalitySelector.deltaPreferred())
.build()
.getAggregationTemporality(InstrumentType.COUNTER))
.isEqualTo(AggregationTemporality.DELTA);
assertThat(
OtlpHttpMetricExporter.builder()
.build()
.getAggregationTemporality(InstrumentType.COUNTER))
.isEqualTo(AggregationTemporality.CUMULATIVE);
assertThat(
OtlpHttpMetricExporter.builder()
.setDefaultAggregationSelector(
DefaultAggregationSelector.getDefault()
.with(InstrumentType.HISTOGRAM, Aggregation.drop()))
.build()
.getDefaultAggregation(InstrumentType.HISTOGRAM))
.isEqualTo(Aggregation.drop());
}
/** Test configuration specific to metric exporter. */
@Test
void invalidMetricConfig() {
assertThatThrownBy(
() -> OtlpHttpMetricExporter.builder().setAggregationTemporalitySelector(null))
.isInstanceOf(NullPointerException.class)
.hasMessage("aggregationTemporalitySelector");
assertThatThrownBy(() -> OtlpHttpMetricExporter.builder().setDefaultAggregationSelector(null))
.isInstanceOf(NullPointerException.class)
.hasMessage("defaultAggregationSelector");
}
@Override
protected boolean hasAuthenticatorSupport() {
return false;
}
@Override
protected TelemetryExporterBuilder<MetricData> exporterBuilder() {
OtlpHttpMetricExporterBuilder builder = OtlpHttpMetricExporter.builder();
return new TelemetryExporterBuilder<>() {
@Override
public TelemetryExporterBuilder<MetricData> setEndpoint(String endpoint) {
builder.setEndpoint(endpoint);
return this;
}
@Override
public TelemetryExporterBuilder<MetricData> setTimeout(long timeout, TimeUnit unit) {
builder.setTimeout(timeout, unit);
return this;
}
@Override
public TelemetryExporterBuilder<MetricData> setTimeout(Duration timeout) {
builder.setTimeout(timeout);
return this;
}
@Override
public TelemetryExporterBuilder<MetricData> setCompression(String compression) {
builder.setCompression(compression);
return this;
}
@Override
public TelemetryExporterBuilder<MetricData> addHeader(String key, String value) {
builder.addHeader(key, value);
return this;
}
@Override
public TelemetryExporterBuilder<MetricData> setAuthenticator(Authenticator authenticator) {
Authenticator.setAuthenticatorOnDelegate(builder, authenticator);
return this;
}
@Override
public TelemetryExporterBuilder<MetricData> setTrustedCertificates(byte[] certificates) {
builder.setTrustedCertificates(certificates);
return this;
}
@Override
public TelemetryExporterBuilder<MetricData> setSslContext(
SSLContext sslContext, X509TrustManager trustManager) {
builder.setSslContext(sslContext, trustManager);
return this;
}
@Override
public TelemetryExporterBuilder<MetricData> setClientTls(
byte[] privateKeyPem, byte[] certificatePem) {
builder.setClientTls(privateKeyPem, certificatePem);
return this;
}
@Override
public TelemetryExporterBuilder<MetricData> setRetryPolicy(RetryPolicy retryPolicy) {
builder.setRetryPolicy(retryPolicy);
return this;
}
@Override
public TelemetryExporterBuilder<MetricData> setChannel(io.grpc.ManagedChannel channel) {
throw new UnsupportedOperationException("Not implemented");
}
@Override
public TelemetryExporter<MetricData> build() {
return TelemetryExporter.wrap(builder.build());
}
};
}
@Override
protected MetricData generateFakeTelemetry() {
return FakeTelemetryUtil.generateFakeMetricData();
}
@Override
protected Marshaler[] toMarshalers(List<MetricData> telemetry) {
return ResourceMetricsMarshaler.create(telemetry);
}
}

View File

@ -0,0 +1,131 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.exporter.otlp.http.trace;
import static org.assertj.core.api.Assertions.assertThat;
import io.opentelemetry.exporter.internal.auth.Authenticator;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.exporter.internal.otlp.traces.ResourceSpansMarshaler;
import io.opentelemetry.exporter.otlp.testing.internal.AbstractHttpTelemetryExporterTest;
import io.opentelemetry.exporter.otlp.testing.internal.FakeTelemetryUtil;
import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporter;
import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporterBuilder;
import io.opentelemetry.exporter.sender.jdk.internal.JdkHttpSender;
import io.opentelemetry.proto.trace.v1.ResourceSpans;
import io.opentelemetry.sdk.common.export.RetryPolicy;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import javax.net.ssl.X509TrustManager;
class OtlpHttpSpanExporterJdkSenderTest
extends AbstractHttpTelemetryExporterTest<SpanData, ResourceSpans> {
protected OtlpHttpSpanExporterJdkSenderTest() {
super("span", "/v1/traces", ResourceSpans.getDefaultInstance());
}
@Override
protected boolean hasAuthenticatorSupport() {
return false;
}
@Override
protected TelemetryExporterBuilder<SpanData> exporterBuilder() {
OtlpHttpSpanExporterBuilder builder = OtlpHttpSpanExporter.builder();
return new TelemetryExporterBuilder<>() {
@Override
public TelemetryExporterBuilder<SpanData> setEndpoint(String endpoint) {
builder.setEndpoint(endpoint);
return this;
}
@Override
public TelemetryExporterBuilder<SpanData> setTimeout(long timeout, TimeUnit unit) {
builder.setTimeout(timeout, unit);
return this;
}
@Override
public TelemetryExporterBuilder<SpanData> setTimeout(Duration timeout) {
builder.setTimeout(timeout);
return this;
}
@Override
public TelemetryExporterBuilder<SpanData> setCompression(String compression) {
builder.setCompression(compression);
return this;
}
@Override
public TelemetryExporterBuilder<SpanData> addHeader(String key, String value) {
builder.addHeader(key, value);
return this;
}
@Override
public TelemetryExporterBuilder<SpanData> setAuthenticator(Authenticator authenticator) {
Authenticator.setAuthenticatorOnDelegate(builder, authenticator);
return this;
}
@Override
public TelemetryExporterBuilder<SpanData> setTrustedCertificates(byte[] certificates) {
builder.setTrustedCertificates(certificates);
return this;
}
@Override
public TelemetryExporterBuilder<SpanData> setSslContext(
SSLContext sslContext, X509TrustManager trustManager) {
builder.setSslContext(sslContext, trustManager);
return this;
}
@Override
public TelemetryExporterBuilder<SpanData> setClientTls(
byte[] privateKeyPem, byte[] certificatePem) {
builder.setClientTls(privateKeyPem, certificatePem);
return this;
}
@Override
public TelemetryExporterBuilder<SpanData> setRetryPolicy(RetryPolicy retryPolicy) {
builder.setRetryPolicy(retryPolicy);
return this;
}
@Override
public TelemetryExporterBuilder<SpanData> setChannel(io.grpc.ManagedChannel channel) {
throw new UnsupportedOperationException("Not implemented");
}
@Override
public TelemetryExporter<SpanData> build() {
OtlpHttpSpanExporter exporter = builder.build();
assertThat(exporter)
.extracting("delegate")
.extracting("httpSender")
.isInstanceOf(JdkHttpSender.class);
return TelemetryExporter.wrap(exporter);
}
};
}
@Override
protected SpanData generateFakeTelemetry() {
return FakeTelemetryUtil.generateFakeSpanData();
}
@Override
protected Marshaler[] toMarshalers(List<SpanData> telemetry) {
return ResourceSpansMarshaler.create(telemetry);
}
}

View File

@ -27,7 +27,6 @@ import com.linecorp.armeria.testing.junit5.server.SelfSignedCertificateExtension
import com.linecorp.armeria.testing.junit5.server.ServerExtension;
import io.github.netmikey.logunit.api.LogCapturer;
import io.opentelemetry.exporter.internal.TlsUtil;
import io.opentelemetry.exporter.internal.grpc.UpstreamGrpcExporter;
import io.opentelemetry.exporter.internal.http.HttpExporter;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.internal.testing.slf4j.SuppressLogger;
@ -293,10 +292,6 @@ public abstract class AbstractHttpTelemetryExporterTest<T, U extends Message> {
void compressionWithGzip() {
TelemetryExporter<T> exporter =
exporterBuilder().setEndpoint(server.httpUri() + path).setCompression("gzip").build();
// UpstreamGrpcExporter doesn't support compression, so we skip the assertion
assumeThat(exporter.unwrap())
.extracting("delegate")
.isNotInstanceOf(UpstreamGrpcExporter.class);
assertThat(exporter.unwrap())
.extracting("delegate.httpSender.compressionEnabled")
.isEqualTo(true);
@ -345,6 +340,8 @@ public abstract class AbstractHttpTelemetryExporterTest<T, U extends Message> {
@Test
void withAuthenticator() {
assumeThat(hasAuthenticatorSupport()).isTrue();
TelemetryExporter<T> exporter =
exporterBuilder()
.setEndpoint(server.httpUri() + path)
@ -664,6 +661,11 @@ public abstract class AbstractHttpTelemetryExporterTest<T, U extends Message> {
protected abstract Marshaler[] toMarshalers(List<T> telemetry);
// TODO: remove once JdkHttpSender supports authenticator
protected boolean hasAuthenticatorSupport() {
return true;
}
private List<U> toProto(List<T> telemetry) {
return Arrays.stream(toMarshalers(telemetry))
.map(

View File

@ -0,0 +1,20 @@
plugins {
id("otel.java-conventions")
id("otel.publish-conventions")
}
description = "OpenTelemetry JDK HttpSender"
otelJava.moduleName.set("io.opentelemetry.exporter.sender.jdk.internal")
dependencies {
implementation(project(":exporters:common"))
implementation(project(":sdk:common"))
}
tasks {
withType<JavaCompile>().configureEach {
sourceCompatibility = "11"
targetCompatibility = "11"
options.release.set(11)
}
}

View File

@ -0,0 +1 @@
otel.release=alpha

View File

@ -0,0 +1,109 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.exporter.sender.jdk.internal;
import java.net.http.HttpRequest;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Supplier;
final class BodyPublisher implements HttpRequest.BodyPublisher {
private final int length;
private final byte[] content;
private final Supplier<ByteBuffer> bufSupplier;
BodyPublisher(byte[] content, int length, Supplier<ByteBuffer> bufSupplier) {
this.content = content;
this.length = length;
this.bufSupplier = bufSupplier;
}
private List<ByteBuffer> copyToBuffers() {
int offset = 0;
int length = this.length;
List<ByteBuffer> buffers = new ArrayList<>();
while (length > 0) {
ByteBuffer b = bufSupplier.get();
b.clear();
int lengthToCopy = Math.min(b.capacity(), length);
b.put(content, offset, lengthToCopy);
offset += lengthToCopy;
length -= lengthToCopy;
b.flip();
buffers.add(b);
}
return buffers;
}
@Override
public long contentLength() {
return length;
}
@Override
public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
Subscription subscription = new Subscription(copyToBuffers(), subscriber);
subscriber.onSubscribe(subscription);
}
private static class Subscription implements Flow.Subscription {
private volatile boolean isCompleted;
private final List<ByteBuffer> buffers;
private final Flow.Subscriber<? super ByteBuffer> subscriber;
private int offset = 0;
private Subscription(List<ByteBuffer> buffers, Flow.Subscriber<? super ByteBuffer> subscriber) {
this.buffers = buffers;
this.subscriber = subscriber;
}
@Override
public void request(long n) {
if (isCompleted) {
return;
}
if (n <= 0) {
subscriber.onError(new IllegalArgumentException("Subscription request must be >= 0"));
isCompleted = true;
} else {
run(n);
}
}
@Override
public void cancel() {
isCompleted = true;
}
private synchronized void run(long requestedItems) {
if (isCompleted) {
return;
}
long count = 0;
ByteBuffer next;
while (count < requestedItems) {
int nextIndex = offset++;
if (nextIndex >= buffers.size()) {
break;
}
next = buffers.get(nextIndex);
subscriber.onNext(next);
count++;
}
if (offset >= buffers.size()) {
isCompleted = true;
subscriber.onComplete();
}
}
}
}

View File

@ -0,0 +1,232 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.exporter.sender.jdk.internal;
import io.opentelemetry.exporter.internal.http.HttpSender;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.export.RetryPolicy;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.zip.GZIPOutputStream;
import javax.annotation.Nullable;
import javax.net.ssl.SSLContext;
/**
* {@link HttpSender} which is backed by JDK {@link HttpClient}.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public final class JdkHttpSender implements HttpSender {
private static final Set<Integer> retryableStatusCodes = Set.of(429, 502, 503, 504);
private static final ThreadLocal<NoCopyByteArrayOutputStream> threadLocalBaos =
ThreadLocal.withInitial(NoCopyByteArrayOutputStream::new);
private static final ThreadLocal<ByteBufferPool> threadLocalByteBufPool =
ThreadLocal.withInitial(ByteBufferPool::new);
private final ExecutorService executorService = Executors.newFixedThreadPool(5);
private final HttpClient client;
private final URI uri;
private final boolean compressionEnabled;
private final String contentType;
private final long timeoutNanos;
private final Supplier<Map<String, String>> headerSupplier;
@Nullable private final RetryPolicy retryPolicy;
JdkHttpSender(
String endpoint,
boolean compressionEnabled,
String contentType,
long timeoutNanos,
Supplier<Map<String, String>> headerSupplier,
@Nullable RetryPolicy retryPolicy,
@Nullable SSLContext sslContext) {
HttpClient.Builder builder = HttpClient.newBuilder().executor(executorService);
if (sslContext != null) {
builder.sslContext(sslContext);
}
this.client = builder.build();
try {
this.uri = new URI(endpoint);
} catch (URISyntaxException e) {
throw new IllegalArgumentException(e);
}
this.compressionEnabled = compressionEnabled;
this.contentType = contentType;
this.timeoutNanos = timeoutNanos;
this.headerSupplier = headerSupplier;
this.retryPolicy = retryPolicy;
}
@Override
public void send(
Consumer<OutputStream> marshaler,
int contentLength,
Consumer<Response> onResponse,
Consumer<Throwable> onError) {
CompletableFuture<HttpResponse<byte[]>> unused =
CompletableFuture.supplyAsync(() -> sendInternal(marshaler), executorService)
.whenComplete(
(httpResponse, throwable) -> {
if (throwable != null) {
onError.accept(throwable);
return;
}
onResponse.accept(toHttpResponse(httpResponse));
});
}
private HttpResponse<byte[]> sendInternal(Consumer<OutputStream> marshaler) {
long startTimeNanos = System.nanoTime();
HttpRequest.Builder requestBuilder =
HttpRequest.newBuilder().uri(uri).timeout(Duration.ofNanos(timeoutNanos));
headerSupplier.get().forEach(requestBuilder::setHeader);
requestBuilder.header("Content-Type", contentType);
NoCopyByteArrayOutputStream os = threadLocalBaos.get();
os.reset();
if (compressionEnabled) {
requestBuilder.header("Content-Encoding", "gzip");
try (GZIPOutputStream gzos = new GZIPOutputStream(os)) {
marshaler.accept(gzos);
} catch (IOException e) {
throw new IllegalStateException(e);
}
} else {
marshaler.accept(os);
}
ByteBufferPool byteBufferPool = threadLocalByteBufPool.get();
requestBuilder.POST(new BodyPublisher(os.buf(), os.size(), byteBufferPool::getBuffer));
// If no retry policy, short circuit
if (retryPolicy == null) {
return sendRequest(requestBuilder, byteBufferPool);
}
long attempt = 0;
long nextBackoffNanos = retryPolicy.getInitialBackoff().toNanos();
do {
requestBuilder.timeout(Duration.ofNanos(timeoutNanos - (System.nanoTime() - startTimeNanos)));
HttpResponse<byte[]> httpResponse = sendRequest(requestBuilder, byteBufferPool);
attempt++;
if (attempt >= retryPolicy.getMaxAttempts()
|| !retryableStatusCodes.contains(httpResponse.statusCode())) {
return httpResponse;
}
// Compute and sleep for backoff
long upperBoundNanos = Math.min(nextBackoffNanos, retryPolicy.getMaxBackoff().toNanos());
long backoffNanos = ThreadLocalRandom.current().nextLong(upperBoundNanos);
nextBackoffNanos = (long) (nextBackoffNanos * retryPolicy.getBackoffMultiplier());
try {
TimeUnit.NANOSECONDS.sleep(backoffNanos);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}
if ((System.nanoTime() - startTimeNanos) >= timeoutNanos) {
return httpResponse;
}
} while (true);
}
private HttpResponse<byte[]> sendRequest(
HttpRequest.Builder requestBuilder, ByteBufferPool byteBufferPool) {
try {
return client.send(requestBuilder.build(), HttpResponse.BodyHandlers.ofByteArray());
} catch (IOException | InterruptedException e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
// TODO: is throwable retryable?
throw new IllegalStateException(e);
} finally {
byteBufferPool.resetPool();
}
}
private static class NoCopyByteArrayOutputStream extends ByteArrayOutputStream {
NoCopyByteArrayOutputStream() {
super(retryableStatusCodes.size());
}
private byte[] buf() {
return buf;
}
}
private static Response toHttpResponse(HttpResponse<byte[]> response) {
return new Response() {
@Override
public int statusCode() {
return response.statusCode();
}
@Override
public String statusMessage() {
return String.valueOf(response.statusCode());
}
@Override
public byte[] responseBody() {
return response.body();
}
};
}
private static class ByteBufferPool {
// TODO: make configurable?
private static final int BUF_SIZE = 16 * 1024;
private final ConcurrentLinkedQueue<ByteBuffer> pool = new ConcurrentLinkedQueue<>();
private final ConcurrentLinkedQueue<ByteBuffer> out = new ConcurrentLinkedQueue<>();
private ByteBuffer getBuffer() {
ByteBuffer buffer = pool.poll();
if (buffer == null) {
buffer = ByteBuffer.allocate(BUF_SIZE);
}
out.offer(buffer);
return buffer;
}
private void resetPool() {
ByteBuffer buf = out.poll();
while (buf != null) {
pool.offer(buf);
buf = out.poll();
}
}
}
@Override
public CompletableResultCode shutdown() {
executorService.shutdown();
return CompletableResultCode.ofSuccess();
}
}

View File

@ -0,0 +1,46 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.exporter.sender.jdk.internal;
import io.opentelemetry.exporter.internal.auth.Authenticator;
import io.opentelemetry.exporter.internal.http.HttpSender;
import io.opentelemetry.exporter.internal.http.HttpSenderProvider;
import io.opentelemetry.sdk.common.export.RetryPolicy;
import java.util.Map;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import javax.net.ssl.SSLContext;
import javax.net.ssl.X509TrustManager;
/**
* {@link HttpSender} SPI implementation for {@link JdkHttpSender}.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public final class JdkHttpSenderProvider implements HttpSenderProvider {
@Override
public HttpSender createSender(
String endpoint,
boolean compressionEnabled,
String contentType,
long timeoutNanos,
Supplier<Map<String, String>> headerSupplier,
@Nullable Authenticator authenticator,
@Nullable RetryPolicy retryPolicy,
@Nullable SSLContext sslContext,
@Nullable X509TrustManager trustManager) {
return new JdkHttpSender(
endpoint,
compressionEnabled,
contentType,
timeoutNanos,
headerSupplier,
retryPolicy,
sslContext);
}
}

View File

@ -0,0 +1 @@
io.opentelemetry.exporter.sender.jdk.internal.JdkHttpSenderProvider

View File

@ -39,7 +39,7 @@ import okio.Okio;
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public class OkHttpHttpSender implements HttpSender {
public final class OkHttpHttpSender implements HttpSender {
private final OkHttpClient client;
private final HttpUrl url;

View File

@ -21,7 +21,7 @@ import org.jetbrains.annotations.Nullable;
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public class OkHttpHttpSenderProvider implements HttpSenderProvider {
public final class OkHttpHttpSenderProvider implements HttpSenderProvider {
@Override
public HttpSender createSender(

View File

@ -32,6 +32,7 @@ include(":extensions:incubator")
include(":extensions:kotlin")
include(":extensions:trace-propagators")
include(":exporters:common")
include(":exporters:sender:jdk")
include(":exporters:sender:okhttp")
include(":exporters:jaeger")
include(":exporters:jaeger-proto")