Unify compression configuration for exporters (#4775)

* Fix handling of compressionMethod `none` in GrpcExporterBuilder

* Fix handling of compressionMethod `none` in OkHttpExporterBuilder

* Add compression configuration assertions to AbstractGrpcTelemetryExporterTest

* Add compression configuration to JaegerGrpcSpanExporterBuilder

* Add compression configuration to ZipkinSpanExporterBuilder

* Specify that zipkin default compression is gzip

Co-authored-by: Jack Berg <jberg@newrelic.com>
This commit is contained in:
Donnerbart 2022-11-01 19:00:35 +01:00 committed by GitHub
parent b6c58c5d90
commit fa46f19d39
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 445 additions and 9 deletions

View File

@ -1,2 +1,4 @@
Comparing source compatibility of against
No changes.
*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.exporter.jaeger.JaegerGrpcSpanExporterBuilder (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.exporter.jaeger.JaegerGrpcSpanExporterBuilder setCompression(java.lang.String)

View File

@ -1,2 +1,4 @@
Comparing source compatibility of against
No changes.
*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.exporter.zipkin.ZipkinSpanExporterBuilder (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.exporter.zipkin.ZipkinSpanExporterBuilder setCompression(java.lang.String)

View File

@ -98,7 +98,7 @@ public class GrpcExporterBuilder<T extends Marshaler> {
}
public GrpcExporterBuilder<T> setCompression(String compressionMethod) {
this.compressionEnabled = true;
this.compressionEnabled = compressionMethod.equals("gzip");
return this;
}

View File

@ -72,9 +72,7 @@ public final class OkHttpExporterBuilder<T extends Marshaler> {
}
public OkHttpExporterBuilder<T> setCompression(String compressionMethod) {
if (compressionMethod.equals("gzip")) {
this.compressionEnabled = true;
}
this.compressionEnabled = compressionMethod.equals("gzip");
return this;
}

View File

@ -0,0 +1,183 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.exporter.internal.grpc;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.Codec;
import io.grpc.ManagedChannel;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import java.net.URI;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
class GrpcExporterBuilderTest {
private final ManagedChannel channel = mock(ManagedChannel.class);
private GrpcExporterBuilder<Marshaler> builder;
@BeforeEach
@SuppressWarnings("unchecked")
void setUp() {
Supplier<BiFunction<Channel, String, MarshalerServiceStub<Marshaler, ?, ?>>> grpcStubFactory =
mock(Supplier.class);
when(grpcStubFactory.get())
.thenReturn((c, s) -> new TestMarshalerServiceStub(c, CallOptions.DEFAULT));
builder =
GrpcExporter.builder(
"otlp", "span", 0, URI.create("http://localhost:4317"), grpcStubFactory, "/test");
}
@Test
void compressionDefault() {
GrpcExporter<Marshaler> exporter = builder.build();
try {
assertThat(exporter)
.isInstanceOfSatisfying(
OkHttpGrpcExporter.class,
otlp -> assertThat(otlp).extracting("compressionEnabled").isEqualTo(false));
} finally {
exporter.shutdown();
}
}
@Test
void compressionNone() {
GrpcExporter<Marshaler> exporter = builder.setCompression("none").build();
try {
assertThat(exporter)
.isInstanceOfSatisfying(
OkHttpGrpcExporter.class,
otlp -> assertThat(otlp).extracting("compressionEnabled").isEqualTo(false));
} finally {
exporter.shutdown();
}
}
@Test
void compressionGzip() {
GrpcExporter<Marshaler> exporter = builder.setCompression("gzip").build();
try {
assertThat(exporter)
.isInstanceOfSatisfying(
OkHttpGrpcExporter.class,
otlp -> assertThat(otlp).extracting("compressionEnabled").isEqualTo(true));
} finally {
exporter.shutdown();
}
}
@Test
void compressionEnabledAndDisabled() {
GrpcExporter<Marshaler> exporter =
builder.setCompression("gzip").setCompression("none").build();
try {
assertThat(exporter)
.isInstanceOfSatisfying(
OkHttpGrpcExporter.class,
otlp -> assertThat(otlp).extracting("compressionEnabled").isEqualTo(false));
} finally {
exporter.shutdown();
}
}
@Test
void compressionDefaultWithChannel() {
GrpcExporter<Marshaler> exporter = builder.setChannel(channel).build();
try {
assertThat(exporter)
.isInstanceOfSatisfying(
UpstreamGrpcExporter.class,
otlp ->
assertThat(otlp)
.extracting("stub")
.extracting("callOptions.compressorName")
.isEqualTo(Codec.Identity.NONE.getMessageEncoding()));
} finally {
exporter.shutdown();
}
}
@Test
void compressionNoneWithChannel() {
GrpcExporter<Marshaler> exporter = builder.setChannel(channel).setCompression("none").build();
try {
assertThat(exporter)
.isInstanceOfSatisfying(
UpstreamGrpcExporter.class,
otlp ->
assertThat(otlp)
.extracting("stub")
.extracting("callOptions.compressorName")
.isEqualTo(Codec.Identity.NONE.getMessageEncoding()));
} finally {
exporter.shutdown();
}
}
@Test
void compressionGzipWithChannel() {
GrpcExporter<Marshaler> exporter = builder.setChannel(channel).setCompression("gzip").build();
try {
assertThat(exporter)
.isInstanceOfSatisfying(
UpstreamGrpcExporter.class,
otlp ->
assertThat(otlp)
.extracting("stub")
.extracting("callOptions.compressorName")
.isEqualTo(new Codec.Gzip().getMessageEncoding()));
} finally {
exporter.shutdown();
}
}
@Test
void compressionEnabledAndDisabledWithChannel() {
GrpcExporter<Marshaler> exporter =
builder.setChannel(channel).setCompression("gzip").setCompression("none").build();
try {
assertThat(exporter)
.isInstanceOfSatisfying(
UpstreamGrpcExporter.class,
otlp ->
assertThat(otlp)
.extracting("stub")
.extracting("callOptions.compressorName")
.isEqualTo(Codec.Identity.NONE.getMessageEncoding()));
} finally {
exporter.shutdown();
}
}
private final class TestMarshalerServiceStub
extends MarshalerServiceStub<Marshaler, Void, TestMarshalerServiceStub> {
private TestMarshalerServiceStub(Channel channel, CallOptions callOptions) {
super(channel, callOptions);
}
@Override
protected TestMarshalerServiceStub build(Channel channel, CallOptions callOptions) {
return new TestMarshalerServiceStub(channel, callOptions);
}
@Override
public ListenableFuture<Void> export(Marshaler request) {
return Futures.immediateVoidFuture();
}
}
}

View File

@ -0,0 +1,70 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.exporter.internal.okhttp;
import static org.assertj.core.api.Assertions.assertThat;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import org.junit.jupiter.api.Test;
class OkHttpExporterBuilderTest {
private final OkHttpExporterBuilder<Marshaler> builder =
new OkHttpExporterBuilder<>("otlp", "span", "http://localhost:4318/v1/traces");
@Test
void compressionDefault() {
OkHttpExporter<Marshaler> exporter = builder.build();
try {
assertThat(exporter)
.isInstanceOfSatisfying(
OkHttpExporter.class,
otlp -> assertThat(otlp).extracting("compressionEnabled").isEqualTo(false));
} finally {
exporter.shutdown();
}
}
@Test
void compressionNone() {
OkHttpExporter<Marshaler> exporter = builder.setCompression("none").build();
try {
assertThat(exporter)
.isInstanceOfSatisfying(
OkHttpExporter.class,
otlp -> assertThat(otlp).extracting("compressionEnabled").isEqualTo(false));
} finally {
exporter.shutdown();
}
}
@Test
void compressionGzip() {
OkHttpExporter<Marshaler> exporter = builder.setCompression("gzip").build();
try {
assertThat(exporter)
.isInstanceOfSatisfying(
OkHttpExporter.class,
otlp -> assertThat(otlp).extracting("compressionEnabled").isEqualTo(true));
} finally {
exporter.shutdown();
}
}
@Test
void compressionEnabledAndDisabled() {
OkHttpExporter<Marshaler> exporter =
builder.setCompression("gzip").setCompression("none").build();
try {
assertThat(exporter)
.isInstanceOfSatisfying(
OkHttpExporter.class,
otlp -> assertThat(otlp).extracting("compressionEnabled").isEqualTo(false));
} finally {
exporter.shutdown();
}
}
}

View File

@ -66,6 +66,19 @@ public final class JaegerGrpcSpanExporterBuilder {
return this;
}
/**
* Sets the method used to compress payloads. If unset, compression is disabled. Currently
* supported compression methods include "gzip" and "none".
*/
public JaegerGrpcSpanExporterBuilder setCompression(String compressionMethod) {
requireNonNull(compressionMethod, "compressionMethod");
checkArgument(
compressionMethod.equals("gzip") || compressionMethod.equals("none"),
"Unsupported compression method. Supported compression methods include: gzip, none.");
delegate.setCompression(compressionMethod);
return this;
}
/**
* Sets the maximum time to wait for the collector to process an exported batch of spans. If
* unset, defaults to {@value DEFAULT_TIMEOUT_SECS}s.

View File

@ -357,6 +357,57 @@ class JaegerGrpcSpanExporterTest {
assertThatThrownBy(() -> JaegerGrpcSpanExporter.builder().setEndpoint("gopher://localhost"))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid endpoint, must start with http:// or https://: gopher://localhost");
assertThatThrownBy(() -> JaegerGrpcSpanExporter.builder().setCompression(null))
.isInstanceOf(NullPointerException.class)
.hasMessage("compressionMethod");
assertThatThrownBy(() -> JaegerGrpcSpanExporter.builder().setCompression("foo"))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage(
"Unsupported compression method. Supported compression methods include: gzip, none.");
}
@Test
void compressionDefault() {
JaegerGrpcSpanExporter exporter = JaegerGrpcSpanExporter.builder().build();
try {
assertThat(exporter).extracting("delegate.compressionEnabled").isEqualTo(false);
} finally {
exporter.shutdown();
}
}
@Test
void compressionNone() {
JaegerGrpcSpanExporter exporter =
JaegerGrpcSpanExporter.builder().setCompression("none").build();
try {
assertThat(exporter).extracting("delegate.compressionEnabled").isEqualTo(false);
} finally {
exporter.shutdown();
}
}
@Test
void compressionGzip() {
JaegerGrpcSpanExporter exporter =
JaegerGrpcSpanExporter.builder().setCompression("gzip").build();
try {
assertThat(exporter).extracting("delegate.compressionEnabled").isEqualTo(true);
} finally {
exporter.shutdown();
}
}
@Test
void compressionEnabledAndDisabled() {
JaegerGrpcSpanExporter exporter =
JaegerGrpcSpanExporter.builder().setCompression("gzip").setCompression("none").build();
try {
assertThat(exporter).extracting("delegate.compressionEnabled").isEqualTo(false);
} finally {
exporter.shutdown();
}
}
@Test

View File

@ -8,6 +8,7 @@ package io.opentelemetry.exporter.otlp.testing.internal;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assumptions.assumeThat;
import static org.junit.jupiter.api.Named.named;
import static org.junit.jupiter.params.provider.Arguments.arguments;
@ -237,6 +238,28 @@ public abstract class AbstractGrpcTelemetryExporterTest<T, U extends Message> {
assertThat(exportedResourceTelemetry).containsExactlyElementsOf(expectedResourceTelemetry);
}
@Test
void compressionWithNone() {
TelemetryExporter<T> exporter =
exporterBuilder().setEndpoint(server.httpUri().toString()).setCompression("none").build();
// UpstreamGrpcExporter doesn't support compression, so we skip the assertion
assumeThat(exporter.unwrap())
.extracting("delegate")
.isNotInstanceOf(UpstreamGrpcExporter.class);
assertThat(exporter.unwrap()).extracting("delegate.compressionEnabled").isEqualTo(false);
}
@Test
void compressionWithGzip() {
TelemetryExporter<T> exporter =
exporterBuilder().setEndpoint(server.httpUri().toString()).setCompression("gzip").build();
// UpstreamGrpcExporter doesn't support compression, so we skip the assertion
assumeThat(exporter.unwrap())
.extracting("delegate")
.isNotInstanceOf(UpstreamGrpcExporter.class);
assertThat(exporter.unwrap()).extracting("delegate.compressionEnabled").isEqualTo(true);
}
@Test
void authorityWithAuth() {
TelemetryExporter<T> exporter =

View File

@ -139,6 +139,11 @@ public final class ManagedChannelTelemetryExporterBuilder<T>
delegate.setChannel(channel);
TelemetryExporter<T> delegateExporter = delegate.build();
return new TelemetryExporter<T>() {
@Override
public Object unwrap() {
return delegateExporter.unwrap();
}
@Override
public CompletableResultCode export(Collection<T> items) {
return delegateExporter.export(items);

View File

@ -19,6 +19,11 @@ public interface TelemetryExporter<T> {
/** Wraps a SpanExporter. */
static TelemetryExporter<SpanData> wrap(SpanExporter exporter) {
return new TelemetryExporter<SpanData>() {
@Override
public Object unwrap() {
return exporter;
}
@Override
public CompletableResultCode export(Collection<SpanData> items) {
return exporter.export(items);
@ -34,6 +39,11 @@ public interface TelemetryExporter<T> {
/** Wraps a MetricExporter. */
static TelemetryExporter<MetricData> wrap(MetricExporter exporter) {
return new TelemetryExporter<MetricData>() {
@Override
public Object unwrap() {
return exporter;
}
@Override
public CompletableResultCode export(Collection<MetricData> items) {
return exporter.export(items);
@ -49,6 +59,11 @@ public interface TelemetryExporter<T> {
/** Wraps a LogRecordExporter. */
static TelemetryExporter<LogRecordData> wrap(LogRecordExporter exporter) {
return new TelemetryExporter<LogRecordData>() {
@Override
public Object unwrap() {
return exporter;
}
@Override
public CompletableResultCode export(Collection<LogRecordData> items) {
return exporter.export(items);
@ -61,6 +76,8 @@ public interface TelemetryExporter<T> {
};
}
Object unwrap();
CompletableResultCode export(Collection<T> items);
CompletableResultCode shutdown();

View File

@ -26,11 +26,14 @@ public final class ZipkinSpanExporterBuilder {
private Supplier<InetAddress> localIpAddressSupplier = LocalInetAddressSupplier.getInstance();
@Nullable private Sender sender;
private String endpoint = ZipkinSpanExporter.DEFAULT_ENDPOINT;
// compression is enabled by default, because this is the default of OkHttpSender,
// which is created when no custom sender is set (see OkHttpSender.Builder)
private boolean compressionEnabled = true;
private long readTimeoutMillis = TimeUnit.SECONDS.toMillis(10);
private MeterProvider meterProvider = MeterProvider.noop();
/**
* Sets the Zipkin sender. Implements the client side of the span transport. A {@link
* Sets the Zipkin sender. Implements the client side of the span transport. An {@link
* OkHttpSender} is a good default.
*
* <p>The {@link Sender#close()} method will be called when the exporter is shut down.
@ -75,7 +78,7 @@ public final class ZipkinSpanExporterBuilder {
}
/**
* Sets the zipkin endpoint. This will use the endpoint to assign a {@link OkHttpSender} instance
* Sets the zipkin endpoint. This will use the endpoint to assign an {@link OkHttpSender} instance
* to this builder.
*
* @param endpoint The Zipkin endpoint URL, ex. "http://zipkinhost:9411/api/v2/spans".
@ -88,6 +91,26 @@ public final class ZipkinSpanExporterBuilder {
return this;
}
/**
* Sets the method used to compress payloads. If unset, gzip compression is enabled. Currently
* supported compression methods include "gzip" and "none".
*
* <p>The compression method is ignored when a custom Zipkin sender is set via {@link
* #setSender(Sender)}.
*
* @param compressionMethod The compression method, ex. "gzip".
* @return this.
* @see OkHttpSender
*/
public ZipkinSpanExporterBuilder setCompression(String compressionMethod) {
requireNonNull(compressionMethod, "compressionMethod");
checkArgument(
compressionMethod.equals("gzip") || compressionMethod.equals("none"),
"Unsupported compression method. Supported compression methods include: gzip, none.");
this.compressionEnabled = compressionMethod.equals("gzip");
return this;
}
/**
* Sets the maximum time to wait for the export of a batch of spans. If unset, defaults to 10s.
*
@ -135,7 +158,11 @@ public final class ZipkinSpanExporterBuilder {
Sender sender = this.sender;
if (sender == null) {
sender =
OkHttpSender.newBuilder().endpoint(endpoint).readTimeout((int) readTimeoutMillis).build();
OkHttpSender.newBuilder()
.endpoint(endpoint)
.compressionEnabled(compressionEnabled)
.readTimeout((int) readTimeoutMillis)
.build();
}
OtelToZipkinSpanTransformer transformer =
OtelToZipkinSpanTransformer.create(localIpAddressSupplier);

View File

@ -136,6 +136,10 @@ class ZipkinSpanExporterTest {
.isInstanceOf(NullPointerException.class)
.hasMessage("endpoint");
assertThatThrownBy(() -> ZipkinSpanExporter.builder().setCompression(null))
.isInstanceOf(NullPointerException.class)
.hasMessage("compressionMethod");
assertThatThrownBy(() -> ZipkinSpanExporter.builder().setSender(null))
.isInstanceOf(NullPointerException.class)
.hasMessage("sender");
@ -144,4 +148,45 @@ class ZipkinSpanExporterTest {
.isInstanceOf(NullPointerException.class)
.hasMessage("encoder");
}
@Test
void compressionDefault() {
ZipkinSpanExporter exporter = ZipkinSpanExporter.builder().build();
try {
assertThat(exporter).extracting("sender.compressionEnabled").isEqualTo(true);
} finally {
exporter.shutdown();
}
}
@Test
void compressionNone() {
ZipkinSpanExporter exporter = ZipkinSpanExporter.builder().setCompression("none").build();
try {
assertThat(exporter).extracting("sender.compressionEnabled").isEqualTo(false);
} finally {
exporter.shutdown();
}
}
@Test
void compressionGzip() {
ZipkinSpanExporter exporter = ZipkinSpanExporter.builder().setCompression("gzip").build();
try {
assertThat(exporter).extracting("sender.compressionEnabled").isEqualTo(true);
} finally {
exporter.shutdown();
}
}
@Test
void compressionEnabledAndDisabled() {
ZipkinSpanExporter exporter =
ZipkinSpanExporter.builder().setCompression("gzip").setCompression("none").build();
try {
assertThat(exporter).extracting("sender.compressionEnabled").isEqualTo(false);
} finally {
exporter.shutdown();
}
}
}