From b4ed53211bb6f893bed9dbde119847d180638862 Mon Sep 17 00:00:00 2001 From: jack-berg <34418638+jack-berg@users.noreply.github.com> Date: Wed, 3 Jan 2024 17:06:47 -0600 Subject: [PATCH] Propagate serialization IOException instead of rethrowing as runtime (#6082) Co-authored-by: Ricardo Mestre --- .../exporter/internal/http/HttpExporter.java | 23 ++--------- .../internal/http/HttpExporterBuilder.java | 1 + .../exporter/internal/http/HttpSender.java | 4 +- .../internal/http/HttpSenderProvider.java | 1 + exporters/sender/jdk/build.gradle.kts | 2 + .../sender/jdk/internal/JdkHttpSender.java | 25 +++++++++--- .../jdk/internal/JdkHttpSenderProvider.java | 2 + .../jdk/internal/JdkHttpSenderTest.java | 21 ++++++++-- exporters/sender/okhttp/build.gradle.kts | 1 + .../okhttp/internal/OkHttpHttpSender.java | 23 +++++++---- .../internal/OkHttpHttpSenderProvider.java | 2 + .../internal/AuthenticatingExporterTest.java | 1 + .../internal/OkHttpHttpSuppressionTest.java | 38 ++++++++++++------- 13 files changed, 94 insertions(+), 50 deletions(-) diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporter.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporter.java index f7af7150f7..befb579d96 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporter.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporter.java @@ -12,9 +12,7 @@ import io.opentelemetry.exporter.internal.marshal.Marshaler; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.internal.ThrottlingLogger; import java.io.IOException; -import java.io.OutputStream; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.Logger; @@ -37,7 +35,6 @@ public final class HttpExporter { private final String type; private final HttpSender httpSender; private final ExporterMetrics exporterMetrics; - private final boolean exportAsJson; public HttpExporter( String exporterName, @@ -51,7 +48,6 @@ public final class HttpExporter { exportAsJson ? ExporterMetrics.createHttpJson(exporterName, type, meterProviderSupplier) : ExporterMetrics.createHttpProtobuf(exporterName, type, meterProviderSupplier); - this.exportAsJson = exportAsJson; } public CompletableResultCode export(T exportRequest, int numItems) { @@ -63,21 +59,8 @@ public final class HttpExporter { CompletableResultCode result = new CompletableResultCode(); - Consumer marshaler = - os -> { - try { - if (exportAsJson) { - exportRequest.writeJsonTo(os); - } else { - exportRequest.writeBinaryTo(os); - } - } catch (IOException e) { - throw new IllegalStateException(e); - } - }; - httpSender.send( - marshaler, + exportRequest, exportRequest.getBinarySerializedSize(), httpResponse -> { int statusCode = httpResponse.statusCode(); @@ -90,11 +73,11 @@ public final class HttpExporter { exporterMetrics.addFailed(numItems); - byte[] body; + byte[] body = null; try { body = httpResponse.responseBody(); } catch (IOException ex) { - throw new IllegalStateException(ex); + logger.log(Level.FINE, "Unable to obtain response body", ex); } String status = extractErrorStatus(httpResponse.statusMessage(), body); diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporterBuilder.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporterBuilder.java index 2285cc9772..3b8275e252 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporterBuilder.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpExporterBuilder.java @@ -182,6 +182,7 @@ public final class HttpExporterBuilder { httpSenderProvider.createSender( endpoint, compressor, + exportAsJson, exportAsJson ? "application/json" : "application/x-protobuf", timeoutNanos, connectTimeoutNanos, diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpSender.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpSender.java index f7e21cb781..dc5c775a53 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpSender.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpSender.java @@ -5,9 +5,9 @@ package io.opentelemetry.exporter.internal.http; +import io.opentelemetry.exporter.internal.marshal.Marshaler; import io.opentelemetry.sdk.common.CompletableResultCode; import java.io.IOException; -import java.io.OutputStream; import java.util.function.Consumer; /** @@ -33,7 +33,7 @@ public interface HttpSender { * @param onError the callback to invoke when the HTTP request could not be executed */ void send( - Consumer marshaler, + Marshaler marshaler, int contentLength, Consumer onResponse, Consumer onError); diff --git a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpSenderProvider.java b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpSenderProvider.java index 20d4322ae6..90259734d1 100644 --- a/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpSenderProvider.java +++ b/exporters/common/src/main/java/io/opentelemetry/exporter/internal/http/HttpSenderProvider.java @@ -29,6 +29,7 @@ public interface HttpSenderProvider { HttpSender createSender( String endpoint, @Nullable Compressor compressor, + boolean exportAsJson, String contentType, long timeoutNanos, long connectTimeout, diff --git a/exporters/sender/jdk/build.gradle.kts b/exporters/sender/jdk/build.gradle.kts index 80fb0f870d..13f77b1482 100644 --- a/exporters/sender/jdk/build.gradle.kts +++ b/exporters/sender/jdk/build.gradle.kts @@ -9,6 +9,8 @@ otelJava.moduleName.set("io.opentelemetry.exporter.sender.jdk.internal") dependencies { implementation(project(":exporters:common")) implementation(project(":sdk:common")) + + compileOnly("com.fasterxml.jackson.core:jackson-core") } tasks { diff --git a/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java b/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java index 73d895a488..838e1d5e20 100644 --- a/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java +++ b/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java @@ -7,11 +7,13 @@ package io.opentelemetry.exporter.sender.jdk.internal; import io.opentelemetry.exporter.internal.compression.Compressor; import io.opentelemetry.exporter.internal.http.HttpSender; +import io.opentelemetry.exporter.internal.marshal.Marshaler; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.common.export.RetryPolicy; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; +import java.io.UncheckedIOException; import java.net.URI; import java.net.URISyntaxException; import java.net.http.HttpClient; @@ -53,6 +55,7 @@ public final class JdkHttpSender implements HttpSender { private final HttpClient client; private final URI uri; @Nullable private final Compressor compressor; + private final boolean exportAsJson; private final String contentType; private final long timeoutNanos; private final Supplier>> headerSupplier; @@ -63,6 +66,7 @@ public final class JdkHttpSender implements HttpSender { HttpClient client, String endpoint, @Nullable Compressor compressor, + boolean exportAsJson, String contentType, long timeoutNanos, Supplier>> headerSupplier, @@ -74,6 +78,7 @@ public final class JdkHttpSender implements HttpSender { throw new IllegalArgumentException(e); } this.compressor = compressor; + this.exportAsJson = exportAsJson; this.contentType = contentType; this.timeoutNanos = timeoutNanos; this.headerSupplier = headerSupplier; @@ -83,6 +88,7 @@ public final class JdkHttpSender implements HttpSender { JdkHttpSender( String endpoint, @Nullable Compressor compressor, + boolean exportAsJson, String contentType, long timeoutNanos, long connectTimeoutNanos, @@ -93,6 +99,7 @@ public final class JdkHttpSender implements HttpSender { configureClient(sslContext, connectTimeoutNanos), endpoint, compressor, + exportAsJson, contentType, timeoutNanos, headerSupplier, @@ -111,7 +118,7 @@ public final class JdkHttpSender implements HttpSender { @Override public void send( - Consumer marshaler, + Marshaler marshaler, int contentLength, Consumer onResponse, Consumer onError) { @@ -121,7 +128,7 @@ public final class JdkHttpSender implements HttpSender { try { return sendInternal(marshaler); } catch (IOException e) { - throw new IllegalStateException(e); + throw new UncheckedIOException(e); } }, executorService) @@ -136,7 +143,7 @@ public final class JdkHttpSender implements HttpSender { } // Visible for testing - HttpResponse sendInternal(Consumer marshaler) throws IOException { + HttpResponse sendInternal(Marshaler marshaler) throws IOException { long startTimeNanos = System.nanoTime(); HttpRequest.Builder requestBuilder = HttpRequest.newBuilder().uri(uri).timeout(Duration.ofNanos(timeoutNanos)); @@ -151,12 +158,12 @@ public final class JdkHttpSender implements HttpSender { if (compressor != null) { requestBuilder.header("Content-Encoding", compressor.getEncoding()); try (OutputStream compressed = compressor.compress(os)) { - marshaler.accept(compressed); + write(marshaler, compressed); } catch (IOException e) { throw new IllegalStateException(e); } } else { - marshaler.accept(os); + write(marshaler, os); } ByteBufferPool byteBufferPool = threadLocalByteBufPool.get(); @@ -211,6 +218,14 @@ public final class JdkHttpSender implements HttpSender { throw exception; } + private void write(Marshaler marshaler, OutputStream os) throws IOException { + if (exportAsJson) { + marshaler.writeJsonTo(os); + } else { + marshaler.writeBinaryTo(os); + } + } + private HttpResponse sendRequest( HttpRequest.Builder requestBuilder, ByteBufferPool byteBufferPool) throws IOException { try { diff --git a/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderProvider.java b/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderProvider.java index 1891cf27c8..c4c6bf071c 100644 --- a/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderProvider.java +++ b/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderProvider.java @@ -29,6 +29,7 @@ public final class JdkHttpSenderProvider implements HttpSenderProvider { public HttpSender createSender( String endpoint, @Nullable Compressor compressor, + boolean exportAsJson, String contentType, long timeoutNanos, long connectTimeout, @@ -40,6 +41,7 @@ public final class JdkHttpSenderProvider implements HttpSenderProvider { return new JdkHttpSender( endpoint, compressor, + exportAsJson, contentType, timeoutNanos, connectTimeout, diff --git a/exporters/sender/jdk/src/test/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderTest.java b/exporters/sender/jdk/src/test/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderTest.java index 1e2ee93986..e4ce06f466 100644 --- a/exporters/sender/jdk/src/test/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderTest.java +++ b/exporters/sender/jdk/src/test/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderTest.java @@ -14,6 +14,8 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import io.opentelemetry.exporter.internal.marshal.Marshaler; +import io.opentelemetry.exporter.internal.marshal.Serializer; import io.opentelemetry.sdk.common.export.RetryPolicy; import java.io.IOException; import java.net.http.HttpClient; @@ -54,6 +56,7 @@ class JdkHttpSenderTest { "http://10.255.255.1", // Connecting to a non-routable IP address to trigger connection // timeout null, + false, "text/plain", Duration.ofSeconds(10).toNanos(), Collections::emptyMap, @@ -65,7 +68,7 @@ class JdkHttpSenderTest { @Test void sendInternal_RetryableConnectTimeoutException() throws IOException, InterruptedException { - assertThatThrownBy(() -> sender.sendInternal(marshaler -> {})) + assertThatThrownBy(() -> sender.sendInternal(new NoOpMarshaler())) .isInstanceOf(HttpConnectTimeoutException.class); verify(mockHttpClient, times(2)).send(any(), any()); @@ -75,7 +78,7 @@ class JdkHttpSenderTest { void sendInternal_RetryableIoException() throws IOException, InterruptedException { doThrow(new IOException("error!")).when(mockHttpClient).send(any(), any()); - assertThatThrownBy(() -> sender.sendInternal(marshaler -> {})) + assertThatThrownBy(() -> sender.sendInternal(new NoOpMarshaler())) .isInstanceOf(IOException.class) .hasMessage("error!"); @@ -86,7 +89,7 @@ class JdkHttpSenderTest { void sendInternal_NonRetryableException() throws IOException, InterruptedException { doThrow(new SSLException("unknown error")).when(mockHttpClient).send(any(), any()); - assertThatThrownBy(() -> sender.sendInternal(marshaler -> {})) + assertThatThrownBy(() -> sender.sendInternal(new NoOpMarshaler())) .isInstanceOf(IOException.class) .hasMessage("unknown error"); @@ -99,6 +102,7 @@ class JdkHttpSenderTest { new JdkHttpSender( "http://localhost", null, + false, "text/plain", 1, TimeUnit.SECONDS.toNanos(10), @@ -112,4 +116,15 @@ class JdkHttpSenderTest { httpClient -> assertThat(httpClient.connectTimeout().get()).isEqualTo(Duration.ofSeconds(10))); } + + private static class NoOpMarshaler extends Marshaler { + + @Override + public int getBinarySerializedSize() { + return 0; + } + + @Override + protected void writeTo(Serializer output) {} + } } diff --git a/exporters/sender/okhttp/build.gradle.kts b/exporters/sender/okhttp/build.gradle.kts index ef2c6c59c5..107270e9ad 100644 --- a/exporters/sender/okhttp/build.gradle.kts +++ b/exporters/sender/okhttp/build.gradle.kts @@ -15,6 +15,7 @@ dependencies { implementation("com.squareup.okhttp3:okhttp") compileOnly("io.grpc:grpc-stub") + compileOnly("com.fasterxml.jackson.core:jackson-core") testImplementation("com.linecorp.armeria:armeria-junit5") } diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java index 7724416c7f..656b9258c2 100644 --- a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSender.java @@ -10,10 +10,10 @@ import io.opentelemetry.exporter.internal.RetryUtil; import io.opentelemetry.exporter.internal.auth.Authenticator; import io.opentelemetry.exporter.internal.compression.Compressor; import io.opentelemetry.exporter.internal.http.HttpSender; +import io.opentelemetry.exporter.internal.marshal.Marshaler; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.common.export.RetryPolicy; import java.io.IOException; -import java.io.OutputStream; import java.time.Duration; import java.util.List; import java.util.Map; @@ -44,6 +44,7 @@ public final class OkHttpHttpSender implements HttpSender { private final OkHttpClient client; private final HttpUrl url; @Nullable private final Compressor compressor; + private final boolean exportAsJson; private final Supplier>> headerSupplier; private final MediaType mediaType; @@ -52,6 +53,7 @@ public final class OkHttpHttpSender implements HttpSender { public OkHttpHttpSender( String endpoint, @Nullable Compressor compressor, + boolean exportAsJson, String contentType, long timeoutNanos, long connectionTimeoutNanos, @@ -86,13 +88,14 @@ public final class OkHttpHttpSender implements HttpSender { this.client = builder.build(); this.url = HttpUrl.get(endpoint); this.compressor = compressor; + this.exportAsJson = exportAsJson; this.mediaType = MediaType.parse(contentType); this.headerSupplier = headerSupplier; } @Override public void send( - Consumer marshaler, + Marshaler marshaler, int contentLength, Consumer onResponse, Consumer onError) { @@ -103,7 +106,7 @@ public final class OkHttpHttpSender implements HttpSender { headers.forEach( (key, values) -> values.forEach(value -> requestBuilder.addHeader(key, value))); } - RequestBody body = new RawRequestBody(marshaler, contentLength, mediaType); + RequestBody body = new RawRequestBody(marshaler, exportAsJson, contentLength, mediaType); if (compressor != null) { requestBuilder.addHeader("Content-Encoding", compressor.getEncoding()); requestBuilder.post(new CompressedRequestBody(compressor, body)); @@ -161,13 +164,15 @@ public final class OkHttpHttpSender implements HttpSender { private static class RawRequestBody extends RequestBody { - private final Consumer marshaler; + private final Marshaler marshaler; + private final boolean exportAsJson; private final int contentLength; private final MediaType mediaType; private RawRequestBody( - Consumer marshaler, int contentLength, MediaType mediaType) { + Marshaler marshaler, boolean exportAsJson, int contentLength, MediaType mediaType) { this.marshaler = marshaler; + this.exportAsJson = exportAsJson; this.contentLength = contentLength; this.mediaType = mediaType; } @@ -183,8 +188,12 @@ public final class OkHttpHttpSender implements HttpSender { } @Override - public void writeTo(BufferedSink bufferedSink) { - marshaler.accept(bufferedSink.outputStream()); + public void writeTo(BufferedSink bufferedSink) throws IOException { + if (exportAsJson) { + marshaler.writeJsonTo(bufferedSink.outputStream()); + } else { + marshaler.writeBinaryTo(bufferedSink.outputStream()); + } } } diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSenderProvider.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSenderProvider.java index d969a1ee23..3acad86d03 100644 --- a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSenderProvider.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSenderProvider.java @@ -29,6 +29,7 @@ public final class OkHttpHttpSenderProvider implements HttpSenderProvider { public HttpSender createSender( String endpoint, @Nullable Compressor compressor, + boolean exportAsJson, String contentType, long timeoutNanos, long connectTimeout, @@ -40,6 +41,7 @@ public final class OkHttpHttpSenderProvider implements HttpSenderProvider { return new OkHttpHttpSender( endpoint, compressor, + exportAsJson, contentType, timeoutNanos, connectTimeout, diff --git a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/AuthenticatingExporterTest.java b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/AuthenticatingExporterTest.java index e1c948f550..a028d9c085 100644 --- a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/AuthenticatingExporterTest.java +++ b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/AuthenticatingExporterTest.java @@ -76,6 +76,7 @@ class AuthenticatingExporterTest { server.enqueue(HttpResponse.of(HttpStatus.UNAUTHORIZED)); return Collections.emptyMap(); }) + .exportAsJson() .build(); server.enqueue(HttpResponse.of(HttpStatus.UNAUTHORIZED)); assertThat(exporter.export(marshaler, 0).join(1, TimeUnit.MINUTES).isSuccess()).isFalse(); diff --git a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSuppressionTest.java b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSuppressionTest.java index 0227826d4c..046dbee521 100644 --- a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSuppressionTest.java +++ b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/OkHttpHttpSuppressionTest.java @@ -5,35 +5,47 @@ package io.opentelemetry.exporter.sender.okhttp.internal; +import io.opentelemetry.exporter.internal.marshal.Marshaler; +import io.opentelemetry.exporter.internal.marshal.ProtoFieldInfo; +import io.opentelemetry.exporter.internal.marshal.Serializer; import java.io.IOException; -import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.util.Collections; -import java.util.function.Consumer; class OkHttpHttpSuppressionTest extends AbstractOkHttpSuppressionTest { @Override void send(OkHttpHttpSender sender, Runnable onSuccess, Runnable onFailure) { byte[] content = "A".getBytes(StandardCharsets.UTF_8); - Consumer outputStreamConsumer = - outputStream -> { - try { - outputStream.write(content); - } catch (IOException e) { - throw new RuntimeException(e); + Marshaler marshaler = + new Marshaler() { + @Override + public int getBinarySerializedSize() { + return content.length; + } + + @Override + protected void writeTo(Serializer output) throws IOException { + output.serializeBytes(ProtoFieldInfo.create(1, 1, "field"), content); } }; sender.send( - outputStreamConsumer, - content.length, - (response) -> onSuccess.run(), - (error) -> onFailure.run()); + marshaler, content.length, (response) -> onSuccess.run(), (error) -> onFailure.run()); } @Override OkHttpHttpSender createSender(String endpoint) { return new OkHttpHttpSender( - endpoint, null, "text/plain", 10L, 10L, Collections::emptyMap, null, null, null, null); + endpoint, + null, + false, + "text/plain", + 10L, + 10L, + Collections::emptyMap, + null, + null, + null, + null); } }