Add ability to enable gzip compression to OtlpGrpcSpanExporter and OtlpGrpcMetricExporter (#3585)

This commit is contained in:
jack-berg 2021-09-09 11:51:47 -05:00 committed by GitHub
parent 02ee514292
commit 68c2d48080
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 263 additions and 9 deletions

View File

@ -1,2 +1,4 @@
Comparing source compatibility of against Comparing source compatibility of against
No changes. *** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporterBuilder (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporterBuilder setCompression(java.lang.String)

View File

@ -45,6 +45,7 @@ dependencies {
jmhImplementation(project(":proto")) jmhImplementation(project(":proto"))
jmhImplementation(project(":sdk:testing")) jmhImplementation(project(":sdk:testing"))
jmhImplementation(project(":sdk-extensions:resources")) jmhImplementation(project(":sdk-extensions:resources"))
jmhImplementation("io.grpc:grpc-api")
} }
wire { wire {

View File

@ -0,0 +1,141 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.exporter.otlp.internal;
import io.grpc.Codec;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleCounter;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.DoubleUpDownCounter;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.LongUpDownCounter;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.resources.Resource;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Warmup;
@BenchmarkMode({Mode.AverageTime})
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 10, time = 1)
@Fork(1)
public class GrpcGzipBenchmark {
private static final ExportMetricsServiceRequest METRICS_REQUEST;
private static final Codec GZIP_CODEC = new Codec.Gzip();
private static final Codec IDENTITY_CODEC = Codec.Identity.NONE;
static {
SdkMeterProvider meterProvider =
SdkMeterProvider.builder()
.setResource(
Resource.create(
Attributes.builder()
.put(AttributeKey.booleanKey("key_bool"), true)
.put(AttributeKey.stringKey("key_string"), "string")
.put(AttributeKey.longKey("key_int"), 100L)
.put(AttributeKey.doubleKey("key_double"), 100.3)
.put(
AttributeKey.stringArrayKey("key_string_array"),
Arrays.asList("string", "string"))
.put(AttributeKey.longArrayKey("key_long_array"), Arrays.asList(12L, 23L))
.put(
AttributeKey.doubleArrayKey("key_double_array"),
Arrays.asList(12.3, 23.1))
.put(
AttributeKey.booleanArrayKey("key_boolean_array"),
Arrays.asList(true, false))
.build()))
.build();
Meter meter1 = meterProvider.get("longinstrumentation");
meter1
.gaugeBuilder("gauge")
.setDescription("gauge description")
.setUnit("unit")
.ofLongs()
.buildWithCallback(
measurement ->
measurement.observe(5, Attributes.of(AttributeKey.stringKey("key"), "value")));
LongCounter longCounter =
meter1
.counterBuilder("counter")
.setDescription("counter description")
.setUnit("unit")
.build();
longCounter.add(1);
longCounter.add(2, Attributes.of(AttributeKey.longKey("lives"), 9L));
longCounter.add(3);
LongUpDownCounter longUpDownCounter =
meter1
.upDownCounterBuilder("updowncounter")
.setDescription("updowncounter description")
.setUnit("unit")
.build();
longUpDownCounter.add(1);
longUpDownCounter.add(-1, Attributes.of(AttributeKey.booleanKey("on"), true));
longUpDownCounter.add(1);
Meter meter2 = meterProvider.get("doubleinstrumentation");
meter2
.gaugeBuilder("doublegauge")
.setDescription("doublegauge")
.setUnit("unit")
.buildWithCallback(measurement -> measurement.observe(5.0));
DoubleCounter doubleCounter = meter2.counterBuilder("doublecounter").ofDoubles().build();
doubleCounter.add(1.0);
doubleCounter.add(2.0);
DoubleUpDownCounter doubleUpDownCounter =
meter2.upDownCounterBuilder("doubleupdown").ofDoubles().build();
doubleUpDownCounter.add(1.0);
doubleUpDownCounter.add(-1.0);
DoubleHistogram histogram = meter2.histogramBuilder("histogram").build();
histogram.record(1.0);
histogram.record(2.0);
histogram.record(3.0);
histogram.record(4.0);
histogram.record(5.0);
Collection<MetricData> metricData = meterProvider.collectAllMetrics();
METRICS_REQUEST =
ExportMetricsServiceRequest.newBuilder()
.addAllResourceMetrics(MetricAdapter.toProtoResourceMetrics(metricData))
.build();
}
@Benchmark
public ByteArrayOutputStream gzipCompressor() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
OutputStream gzos = GZIP_CODEC.compress(baos);
METRICS_REQUEST.writeTo(gzos);
gzos.close();
return baos;
}
@Benchmark
public ByteArrayOutputStream identityCompressor() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
OutputStream gzos = IDENTITY_CODEC.compress(baos);
METRICS_REQUEST.writeTo(gzos);
gzos.close();
return baos;
}
}

View File

@ -8,6 +8,7 @@ package io.opentelemetry.exporter.otlp.metrics;
import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.Codec;
import io.grpc.ManagedChannel; import io.grpc.ManagedChannel;
import io.grpc.Status; import io.grpc.Status;
import io.opentelemetry.exporter.otlp.internal.MetricsRequestMarshaler; import io.opentelemetry.exporter.otlp.internal.MetricsRequestMarshaler;
@ -42,11 +43,15 @@ public final class OtlpGrpcMetricExporter implements MetricExporter {
* @param channel the channel to use when communicating with the OpenTelemetry Collector. * @param channel the channel to use when communicating with the OpenTelemetry Collector.
* @param timeoutNanos max waiting time for the collector to process each metric batch. When set * @param timeoutNanos max waiting time for the collector to process each metric batch. When set
* to 0 or to a negative value, the exporter will wait indefinitely. * to 0 or to a negative value, the exporter will wait indefinitely.
* @param compressionEnabled whether or not to enable gzip compression.
*/ */
OtlpGrpcMetricExporter(ManagedChannel channel, long timeoutNanos) { OtlpGrpcMetricExporter(ManagedChannel channel, long timeoutNanos, boolean compressionEnabled) {
this.managedChannel = channel; this.managedChannel = channel;
this.timeoutNanos = timeoutNanos; this.timeoutNanos = timeoutNanos;
metricsService = MarshalerMetricsServiceGrpc.newFutureStub(channel); Codec codec = compressionEnabled ? new Codec.Gzip() : Codec.Identity.NONE;
this.metricsService =
MarshalerMetricsServiceGrpc.newFutureStub(channel)
.withCompression(codec.getMessageEncoding());
} }
/** /**

View File

@ -31,7 +31,7 @@ public final class OtlpGrpcMetricExporterBuilder {
@Nullable private ManagedChannel channel; @Nullable private ManagedChannel channel;
private long timeoutNanos = TimeUnit.SECONDS.toNanos(DEFAULT_TIMEOUT_SECS); private long timeoutNanos = TimeUnit.SECONDS.toNanos(DEFAULT_TIMEOUT_SECS);
private URI endpoint = DEFAULT_ENDPOINT; private URI endpoint = DEFAULT_ENDPOINT;
private boolean compressionEnabled = false;
@Nullable private Metadata metadata; @Nullable private Metadata metadata;
@Nullable private byte[] trustedCertificatesPem; @Nullable private byte[] trustedCertificatesPem;
@ -91,6 +91,19 @@ public final class OtlpGrpcMetricExporterBuilder {
return this; return this;
} }
/**
* Sets the method used to compress payloads. If unset, compression is disabled. Currently the
* only supported compression method is "gzip".
*/
public OtlpGrpcMetricExporterBuilder setCompression(String compressionMethod) {
requireNonNull(compressionMethod, "compressionMethod");
checkArgument(
compressionMethod.equals("gzip"),
"Unsupported compression method. Supported compression methods include: gzip.");
this.compressionEnabled = true;
return this;
}
/** /**
* Sets the certificate chain to use for verifying servers when TLS is enabled. The {@code byte[]} * Sets the certificate chain to use for verifying servers when TLS is enabled. The {@code byte[]}
* should contain an X.509 certificate collection in PEM format. If not set, TLS connections will * should contain an X.509 certificate collection in PEM format. If not set, TLS connections will
@ -152,7 +165,7 @@ public final class OtlpGrpcMetricExporterBuilder {
channel = managedChannelBuilder.build(); channel = managedChannelBuilder.build();
} }
return new OtlpGrpcMetricExporter(channel, timeoutNanos); return new OtlpGrpcMetricExporter(channel, timeoutNanos, compressionEnabled);
} }
OtlpGrpcMetricExporterBuilder() {} OtlpGrpcMetricExporterBuilder() {}

View File

@ -101,6 +101,13 @@ class OtlpGrpcMetricExporterTest {
assertThatThrownBy(() -> OtlpGrpcMetricExporter.builder().setEndpoint("gopher://localhost")) assertThatThrownBy(() -> OtlpGrpcMetricExporter.builder().setEndpoint("gopher://localhost"))
.isInstanceOf(IllegalArgumentException.class) .isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid endpoint, must start with http:// or https://: gopher://localhost"); .hasMessage("Invalid endpoint, must start with http:// or https://: gopher://localhost");
assertThatThrownBy(() -> OtlpGrpcMetricExporter.builder().setCompression(null))
.isInstanceOf(NullPointerException.class)
.hasMessage("compressionMethod");
assertThatThrownBy(() -> OtlpGrpcMetricExporter.builder().setCompression("foo"))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Unsupported compression method. Supported compression methods include: gzip.");
} }
@Test @Test

View File

@ -84,6 +84,16 @@ class ExportTest {
} }
}; };
@Test
void gzipCompressionExport() {
OtlpGrpcMetricExporter exporter =
OtlpGrpcMetricExporter.builder()
.setEndpoint("http://localhost:" + server.httpPort())
.setCompression("gzip")
.build();
assertThat(exporter.export(METRICS).join(10, TimeUnit.SECONDS).isSuccess()).isTrue();
}
@Test @Test
void plainTextExport() { void plainTextExport() {
OtlpGrpcMetricExporter exporter = OtlpGrpcMetricExporter exporter =

View File

@ -84,6 +84,16 @@ class ExportTest {
} }
}; };
@Test
void gzipCompressionExport() {
OtlpGrpcMetricExporter exporter =
OtlpGrpcMetricExporter.builder()
.setEndpoint("http://localhost:" + server.httpPort())
.setCompression("gzip")
.build();
assertThat(exporter.export(METRICS).join(10, TimeUnit.SECONDS).isSuccess()).isTrue();
}
@Test @Test
void plainTextExport() { void plainTextExport() {
OtlpGrpcMetricExporter exporter = OtlpGrpcMetricExporter exporter =

View File

@ -84,6 +84,16 @@ class ExportTest {
} }
}; };
@Test
void gzipCompressionExport() {
OtlpGrpcMetricExporter exporter =
OtlpGrpcMetricExporter.builder()
.setEndpoint("http://localhost:" + server.httpPort())
.setCompression("gzip")
.build();
assertThat(exporter.export(METRICS).join(10, TimeUnit.SECONDS).isSuccess()).isTrue();
}
@Test @Test
void plainTextExport() { void plainTextExport() {
OtlpGrpcMetricExporter exporter = OtlpGrpcMetricExporter exporter =

View File

@ -8,6 +8,7 @@ package io.opentelemetry.exporter.otlp.trace;
import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.Codec;
import io.grpc.ManagedChannel; import io.grpc.ManagedChannel;
import io.grpc.Status; import io.grpc.Status;
import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.AttributeKey;
@ -61,8 +62,9 @@ public final class OtlpGrpcSpanExporter implements SpanExporter {
* @param channel the channel to use when communicating with the OpenTelemetry Collector. * @param channel the channel to use when communicating with the OpenTelemetry Collector.
* @param timeoutNanos max waiting time for the collector to process each span batch. When set to * @param timeoutNanos max waiting time for the collector to process each span batch. When set to
* 0 or to a negative value, the exporter will wait indefinitely. * 0 or to a negative value, the exporter will wait indefinitely.
* @param compressionEnabled whether or not to enable gzip compression.
*/ */
OtlpGrpcSpanExporter(ManagedChannel channel, long timeoutNanos) { OtlpGrpcSpanExporter(ManagedChannel channel, long timeoutNanos, boolean compressionEnabled) {
// TODO: telemetry schema version. // TODO: telemetry schema version.
Meter meter = GlobalMeterProvider.get().meterBuilder("io.opentelemetry.exporters.otlp").build(); Meter meter = GlobalMeterProvider.get().meterBuilder("io.opentelemetry.exporters.otlp").build();
this.spansSeen = this.spansSeen =
@ -72,8 +74,10 @@ public final class OtlpGrpcSpanExporter implements SpanExporter {
this.spansExportedFailure = spansExportedCounter.bind(EXPORT_FAILURE_ATTRIBUTES); this.spansExportedFailure = spansExportedCounter.bind(EXPORT_FAILURE_ATTRIBUTES);
this.managedChannel = channel; this.managedChannel = channel;
this.timeoutNanos = timeoutNanos; this.timeoutNanos = timeoutNanos;
Codec codec = compressionEnabled ? new Codec.Gzip() : Codec.Identity.NONE;
this.traceService = MarshalerTraceServiceGrpc.newFutureStub(channel); this.traceService =
MarshalerTraceServiceGrpc.newFutureStub(channel)
.withCompression(codec.getMessageEncoding());
} }
/** /**

View File

@ -31,6 +31,7 @@ public final class OtlpGrpcSpanExporterBuilder {
@Nullable private ManagedChannel channel; @Nullable private ManagedChannel channel;
private long timeoutNanos = TimeUnit.SECONDS.toNanos(DEFAULT_TIMEOUT_SECS); private long timeoutNanos = TimeUnit.SECONDS.toNanos(DEFAULT_TIMEOUT_SECS);
private URI endpoint = DEFAULT_ENDPOINT; private URI endpoint = DEFAULT_ENDPOINT;
private boolean compressionEnabled = false;
@Nullable private Metadata metadata; @Nullable private Metadata metadata;
@Nullable private byte[] trustedCertificatesPem; @Nullable private byte[] trustedCertificatesPem;
@ -90,6 +91,19 @@ public final class OtlpGrpcSpanExporterBuilder {
return this; return this;
} }
/**
* Sets the method used to compress payloads. If unset, compression is disabled. Currently the
* only supported compression method is "gzip".
*/
public OtlpGrpcSpanExporterBuilder setCompression(String compressionMethod) {
requireNonNull(compressionMethod, "compressionMethod");
checkArgument(
compressionMethod.equals("gzip"),
"Unsupported compression method. Supported compression methods include: gzip.");
this.compressionEnabled = true;
return this;
}
/** /**
* Sets the certificate chain to use for verifying servers when TLS is enabled. The {@code byte[]} * Sets the certificate chain to use for verifying servers when TLS is enabled. The {@code byte[]}
* should contain an X.509 certificate collection in PEM format. If not set, TLS connections will * should contain an X.509 certificate collection in PEM format. If not set, TLS connections will
@ -151,7 +165,7 @@ public final class OtlpGrpcSpanExporterBuilder {
channel = managedChannelBuilder.build(); channel = managedChannelBuilder.build();
} }
return new OtlpGrpcSpanExporter(channel, timeoutNanos); return new OtlpGrpcSpanExporter(channel, timeoutNanos, compressionEnabled);
} }
OtlpGrpcSpanExporterBuilder() {} OtlpGrpcSpanExporterBuilder() {}

View File

@ -102,6 +102,13 @@ class OtlpGrpcSpanExporterTest {
assertThatThrownBy(() -> OtlpGrpcSpanExporter.builder().setEndpoint("gopher://localhost")) assertThatThrownBy(() -> OtlpGrpcSpanExporter.builder().setEndpoint("gopher://localhost"))
.isInstanceOf(IllegalArgumentException.class) .isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid endpoint, must start with http:// or https://: gopher://localhost"); .hasMessage("Invalid endpoint, must start with http:// or https://: gopher://localhost");
assertThatThrownBy(() -> OtlpGrpcSpanExporter.builder().setCompression(null))
.isInstanceOf(NullPointerException.class)
.hasMessage("compressionMethod");
assertThatThrownBy(() -> OtlpGrpcSpanExporter.builder().setCompression("foo"))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Unsupported compression method. Supported compression methods include: gzip.");
} }
@Test @Test

View File

@ -71,6 +71,16 @@ class ExportTest {
} }
}; };
@Test
void gzipCompressionExport() {
OtlpGrpcSpanExporter exporter =
OtlpGrpcSpanExporter.builder()
.setEndpoint("http://localhost:" + server.httpPort())
.setCompression("gzip")
.build();
assertThat(exporter.export(SPANS).join(10, TimeUnit.SECONDS).isSuccess()).isTrue();
}
@Test @Test
void plainTextExport() { void plainTextExport() {
OtlpGrpcSpanExporter exporter = OtlpGrpcSpanExporter exporter =

View File

@ -71,6 +71,16 @@ class ExportTest {
} }
}; };
@Test
void gzipCompressionExport() {
OtlpGrpcSpanExporter exporter =
OtlpGrpcSpanExporter.builder()
.setEndpoint("http://localhost:" + server.httpPort())
.setCompression("gzip")
.build();
assertThat(exporter.export(SPANS).join(10, TimeUnit.SECONDS).isSuccess()).isTrue();
}
@Test @Test
void plainTextExport() { void plainTextExport() {
OtlpGrpcSpanExporter exporter = OtlpGrpcSpanExporter exporter =

View File

@ -71,6 +71,16 @@ class ExportTest {
} }
}; };
@Test
void gzipCompressionExport() {
OtlpGrpcSpanExporter exporter =
OtlpGrpcSpanExporter.builder()
.setEndpoint("http://localhost:" + server.httpPort())
.setCompression("gzip")
.build();
assertThat(exporter.export(SPANS).join(10, TimeUnit.SECONDS).isSuccess()).isTrue();
}
@Test @Test
void plainTextExport() { void plainTextExport() {
OtlpGrpcSpanExporter exporter = OtlpGrpcSpanExporter exporter =