Propagate serialization IOException instead of rethrowing as runtime (#6082)

Co-authored-by: Ricardo Mestre <ricardom57@hotmail.com>
This commit is contained in:
jack-berg 2024-01-03 17:06:47 -06:00 committed by GitHub
parent 63fe7084d2
commit b4ed53211b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 94 additions and 50 deletions

View File

@ -12,9 +12,7 @@ import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.internal.ThrottlingLogger;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -37,7 +35,6 @@ public final class HttpExporter<T extends Marshaler> {
private final String type;
private final HttpSender httpSender;
private final ExporterMetrics exporterMetrics;
private final boolean exportAsJson;
public HttpExporter(
String exporterName,
@ -51,7 +48,6 @@ public final class HttpExporter<T extends Marshaler> {
exportAsJson
? ExporterMetrics.createHttpJson(exporterName, type, meterProviderSupplier)
: ExporterMetrics.createHttpProtobuf(exporterName, type, meterProviderSupplier);
this.exportAsJson = exportAsJson;
}
public CompletableResultCode export(T exportRequest, int numItems) {
@ -63,21 +59,8 @@ public final class HttpExporter<T extends Marshaler> {
CompletableResultCode result = new CompletableResultCode();
Consumer<OutputStream> marshaler =
os -> {
try {
if (exportAsJson) {
exportRequest.writeJsonTo(os);
} else {
exportRequest.writeBinaryTo(os);
}
} catch (IOException e) {
throw new IllegalStateException(e);
}
};
httpSender.send(
marshaler,
exportRequest,
exportRequest.getBinarySerializedSize(),
httpResponse -> {
int statusCode = httpResponse.statusCode();
@ -90,11 +73,11 @@ public final class HttpExporter<T extends Marshaler> {
exporterMetrics.addFailed(numItems);
byte[] body;
byte[] body = null;
try {
body = httpResponse.responseBody();
} catch (IOException ex) {
throw new IllegalStateException(ex);
logger.log(Level.FINE, "Unable to obtain response body", ex);
}
String status = extractErrorStatus(httpResponse.statusMessage(), body);

View File

@ -182,6 +182,7 @@ public final class HttpExporterBuilder<T extends Marshaler> {
httpSenderProvider.createSender(
endpoint,
compressor,
exportAsJson,
exportAsJson ? "application/json" : "application/x-protobuf",
timeoutNanos,
connectTimeoutNanos,

View File

@ -5,9 +5,9 @@
package io.opentelemetry.exporter.internal.http;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.sdk.common.CompletableResultCode;
import java.io.IOException;
import java.io.OutputStream;
import java.util.function.Consumer;
/**
@ -33,7 +33,7 @@ public interface HttpSender {
* @param onError the callback to invoke when the HTTP request could not be executed
*/
void send(
Consumer<OutputStream> marshaler,
Marshaler marshaler,
int contentLength,
Consumer<Response> onResponse,
Consumer<Throwable> onError);

View File

@ -29,6 +29,7 @@ public interface HttpSenderProvider {
HttpSender createSender(
String endpoint,
@Nullable Compressor compressor,
boolean exportAsJson,
String contentType,
long timeoutNanos,
long connectTimeout,

View File

@ -9,6 +9,8 @@ otelJava.moduleName.set("io.opentelemetry.exporter.sender.jdk.internal")
dependencies {
implementation(project(":exporters:common"))
implementation(project(":sdk:common"))
compileOnly("com.fasterxml.jackson.core:jackson-core")
}
tasks {

View File

@ -7,11 +7,13 @@ package io.opentelemetry.exporter.sender.jdk.internal;
import io.opentelemetry.exporter.internal.compression.Compressor;
import io.opentelemetry.exporter.internal.http.HttpSender;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.export.RetryPolicy;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.http.HttpClient;
@ -53,6 +55,7 @@ public final class JdkHttpSender implements HttpSender {
private final HttpClient client;
private final URI uri;
@Nullable private final Compressor compressor;
private final boolean exportAsJson;
private final String contentType;
private final long timeoutNanos;
private final Supplier<Map<String, List<String>>> headerSupplier;
@ -63,6 +66,7 @@ public final class JdkHttpSender implements HttpSender {
HttpClient client,
String endpoint,
@Nullable Compressor compressor,
boolean exportAsJson,
String contentType,
long timeoutNanos,
Supplier<Map<String, List<String>>> headerSupplier,
@ -74,6 +78,7 @@ public final class JdkHttpSender implements HttpSender {
throw new IllegalArgumentException(e);
}
this.compressor = compressor;
this.exportAsJson = exportAsJson;
this.contentType = contentType;
this.timeoutNanos = timeoutNanos;
this.headerSupplier = headerSupplier;
@ -83,6 +88,7 @@ public final class JdkHttpSender implements HttpSender {
JdkHttpSender(
String endpoint,
@Nullable Compressor compressor,
boolean exportAsJson,
String contentType,
long timeoutNanos,
long connectTimeoutNanos,
@ -93,6 +99,7 @@ public final class JdkHttpSender implements HttpSender {
configureClient(sslContext, connectTimeoutNanos),
endpoint,
compressor,
exportAsJson,
contentType,
timeoutNanos,
headerSupplier,
@ -111,7 +118,7 @@ public final class JdkHttpSender implements HttpSender {
@Override
public void send(
Consumer<OutputStream> marshaler,
Marshaler marshaler,
int contentLength,
Consumer<Response> onResponse,
Consumer<Throwable> onError) {
@ -121,7 +128,7 @@ public final class JdkHttpSender implements HttpSender {
try {
return sendInternal(marshaler);
} catch (IOException e) {
throw new IllegalStateException(e);
throw new UncheckedIOException(e);
}
},
executorService)
@ -136,7 +143,7 @@ public final class JdkHttpSender implements HttpSender {
}
// Visible for testing
HttpResponse<byte[]> sendInternal(Consumer<OutputStream> marshaler) throws IOException {
HttpResponse<byte[]> sendInternal(Marshaler marshaler) throws IOException {
long startTimeNanos = System.nanoTime();
HttpRequest.Builder requestBuilder =
HttpRequest.newBuilder().uri(uri).timeout(Duration.ofNanos(timeoutNanos));
@ -151,12 +158,12 @@ public final class JdkHttpSender implements HttpSender {
if (compressor != null) {
requestBuilder.header("Content-Encoding", compressor.getEncoding());
try (OutputStream compressed = compressor.compress(os)) {
marshaler.accept(compressed);
write(marshaler, compressed);
} catch (IOException e) {
throw new IllegalStateException(e);
}
} else {
marshaler.accept(os);
write(marshaler, os);
}
ByteBufferPool byteBufferPool = threadLocalByteBufPool.get();
@ -211,6 +218,14 @@ public final class JdkHttpSender implements HttpSender {
throw exception;
}
private void write(Marshaler marshaler, OutputStream os) throws IOException {
if (exportAsJson) {
marshaler.writeJsonTo(os);
} else {
marshaler.writeBinaryTo(os);
}
}
private HttpResponse<byte[]> sendRequest(
HttpRequest.Builder requestBuilder, ByteBufferPool byteBufferPool) throws IOException {
try {

View File

@ -29,6 +29,7 @@ public final class JdkHttpSenderProvider implements HttpSenderProvider {
public HttpSender createSender(
String endpoint,
@Nullable Compressor compressor,
boolean exportAsJson,
String contentType,
long timeoutNanos,
long connectTimeout,
@ -40,6 +41,7 @@ public final class JdkHttpSenderProvider implements HttpSenderProvider {
return new JdkHttpSender(
endpoint,
compressor,
exportAsJson,
contentType,
timeoutNanos,
connectTimeout,

View File

@ -14,6 +14,8 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.exporter.internal.marshal.Serializer;
import io.opentelemetry.sdk.common.export.RetryPolicy;
import java.io.IOException;
import java.net.http.HttpClient;
@ -54,6 +56,7 @@ class JdkHttpSenderTest {
"http://10.255.255.1", // Connecting to a non-routable IP address to trigger connection
// timeout
null,
false,
"text/plain",
Duration.ofSeconds(10).toNanos(),
Collections::emptyMap,
@ -65,7 +68,7 @@ class JdkHttpSenderTest {
@Test
void sendInternal_RetryableConnectTimeoutException() throws IOException, InterruptedException {
assertThatThrownBy(() -> sender.sendInternal(marshaler -> {}))
assertThatThrownBy(() -> sender.sendInternal(new NoOpMarshaler()))
.isInstanceOf(HttpConnectTimeoutException.class);
verify(mockHttpClient, times(2)).send(any(), any());
@ -75,7 +78,7 @@ class JdkHttpSenderTest {
void sendInternal_RetryableIoException() throws IOException, InterruptedException {
doThrow(new IOException("error!")).when(mockHttpClient).send(any(), any());
assertThatThrownBy(() -> sender.sendInternal(marshaler -> {}))
assertThatThrownBy(() -> sender.sendInternal(new NoOpMarshaler()))
.isInstanceOf(IOException.class)
.hasMessage("error!");
@ -86,7 +89,7 @@ class JdkHttpSenderTest {
void sendInternal_NonRetryableException() throws IOException, InterruptedException {
doThrow(new SSLException("unknown error")).when(mockHttpClient).send(any(), any());
assertThatThrownBy(() -> sender.sendInternal(marshaler -> {}))
assertThatThrownBy(() -> sender.sendInternal(new NoOpMarshaler()))
.isInstanceOf(IOException.class)
.hasMessage("unknown error");
@ -99,6 +102,7 @@ class JdkHttpSenderTest {
new JdkHttpSender(
"http://localhost",
null,
false,
"text/plain",
1,
TimeUnit.SECONDS.toNanos(10),
@ -112,4 +116,15 @@ class JdkHttpSenderTest {
httpClient ->
assertThat(httpClient.connectTimeout().get()).isEqualTo(Duration.ofSeconds(10)));
}
private static class NoOpMarshaler extends Marshaler {
@Override
public int getBinarySerializedSize() {
return 0;
}
@Override
protected void writeTo(Serializer output) {}
}
}

View File

@ -15,6 +15,7 @@ dependencies {
implementation("com.squareup.okhttp3:okhttp")
compileOnly("io.grpc:grpc-stub")
compileOnly("com.fasterxml.jackson.core:jackson-core")
testImplementation("com.linecorp.armeria:armeria-junit5")
}

View File

@ -10,10 +10,10 @@ import io.opentelemetry.exporter.internal.RetryUtil;
import io.opentelemetry.exporter.internal.auth.Authenticator;
import io.opentelemetry.exporter.internal.compression.Compressor;
import io.opentelemetry.exporter.internal.http.HttpSender;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.export.RetryPolicy;
import java.io.IOException;
import java.io.OutputStream;
import java.time.Duration;
import java.util.List;
import java.util.Map;
@ -44,6 +44,7 @@ public final class OkHttpHttpSender implements HttpSender {
private final OkHttpClient client;
private final HttpUrl url;
@Nullable private final Compressor compressor;
private final boolean exportAsJson;
private final Supplier<Map<String, List<String>>> headerSupplier;
private final MediaType mediaType;
@ -52,6 +53,7 @@ public final class OkHttpHttpSender implements HttpSender {
public OkHttpHttpSender(
String endpoint,
@Nullable Compressor compressor,
boolean exportAsJson,
String contentType,
long timeoutNanos,
long connectionTimeoutNanos,
@ -86,13 +88,14 @@ public final class OkHttpHttpSender implements HttpSender {
this.client = builder.build();
this.url = HttpUrl.get(endpoint);
this.compressor = compressor;
this.exportAsJson = exportAsJson;
this.mediaType = MediaType.parse(contentType);
this.headerSupplier = headerSupplier;
}
@Override
public void send(
Consumer<OutputStream> marshaler,
Marshaler marshaler,
int contentLength,
Consumer<Response> onResponse,
Consumer<Throwable> onError) {
@ -103,7 +106,7 @@ public final class OkHttpHttpSender implements HttpSender {
headers.forEach(
(key, values) -> values.forEach(value -> requestBuilder.addHeader(key, value)));
}
RequestBody body = new RawRequestBody(marshaler, contentLength, mediaType);
RequestBody body = new RawRequestBody(marshaler, exportAsJson, contentLength, mediaType);
if (compressor != null) {
requestBuilder.addHeader("Content-Encoding", compressor.getEncoding());
requestBuilder.post(new CompressedRequestBody(compressor, body));
@ -161,13 +164,15 @@ public final class OkHttpHttpSender implements HttpSender {
private static class RawRequestBody extends RequestBody {
private final Consumer<OutputStream> marshaler;
private final Marshaler marshaler;
private final boolean exportAsJson;
private final int contentLength;
private final MediaType mediaType;
private RawRequestBody(
Consumer<OutputStream> marshaler, int contentLength, MediaType mediaType) {
Marshaler marshaler, boolean exportAsJson, int contentLength, MediaType mediaType) {
this.marshaler = marshaler;
this.exportAsJson = exportAsJson;
this.contentLength = contentLength;
this.mediaType = mediaType;
}
@ -183,8 +188,12 @@ public final class OkHttpHttpSender implements HttpSender {
}
@Override
public void writeTo(BufferedSink bufferedSink) {
marshaler.accept(bufferedSink.outputStream());
public void writeTo(BufferedSink bufferedSink) throws IOException {
if (exportAsJson) {
marshaler.writeJsonTo(bufferedSink.outputStream());
} else {
marshaler.writeBinaryTo(bufferedSink.outputStream());
}
}
}

View File

@ -29,6 +29,7 @@ public final class OkHttpHttpSenderProvider implements HttpSenderProvider {
public HttpSender createSender(
String endpoint,
@Nullable Compressor compressor,
boolean exportAsJson,
String contentType,
long timeoutNanos,
long connectTimeout,
@ -40,6 +41,7 @@ public final class OkHttpHttpSenderProvider implements HttpSenderProvider {
return new OkHttpHttpSender(
endpoint,
compressor,
exportAsJson,
contentType,
timeoutNanos,
connectTimeout,

View File

@ -76,6 +76,7 @@ class AuthenticatingExporterTest {
server.enqueue(HttpResponse.of(HttpStatus.UNAUTHORIZED));
return Collections.emptyMap();
})
.exportAsJson()
.build();
server.enqueue(HttpResponse.of(HttpStatus.UNAUTHORIZED));
assertThat(exporter.export(marshaler, 0).join(1, TimeUnit.MINUTES).isSuccess()).isFalse();

View File

@ -5,35 +5,47 @@
package io.opentelemetry.exporter.sender.okhttp.internal;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.exporter.internal.marshal.ProtoFieldInfo;
import io.opentelemetry.exporter.internal.marshal.Serializer;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.function.Consumer;
class OkHttpHttpSuppressionTest extends AbstractOkHttpSuppressionTest<OkHttpHttpSender> {
@Override
void send(OkHttpHttpSender sender, Runnable onSuccess, Runnable onFailure) {
byte[] content = "A".getBytes(StandardCharsets.UTF_8);
Consumer<OutputStream> outputStreamConsumer =
outputStream -> {
try {
outputStream.write(content);
} catch (IOException e) {
throw new RuntimeException(e);
Marshaler marshaler =
new Marshaler() {
@Override
public int getBinarySerializedSize() {
return content.length;
}
@Override
protected void writeTo(Serializer output) throws IOException {
output.serializeBytes(ProtoFieldInfo.create(1, 1, "field"), content);
}
};
sender.send(
outputStreamConsumer,
content.length,
(response) -> onSuccess.run(),
(error) -> onFailure.run());
marshaler, content.length, (response) -> onSuccess.run(), (error) -> onFailure.run());
}
@Override
OkHttpHttpSender createSender(String endpoint) {
return new OkHttpHttpSender(
endpoint, null, "text/plain", 10L, 10L, Collections::emptyMap, null, null, null, null);
endpoint,
null,
false,
"text/plain",
10L,
10L,
Collections::emptyMap,
null,
null,
null,
null);
}
}