Add http/protobuf exporter for metrics (#3440)
* Add OtlpHttpMetricExporter * Fix test flake by adjusting logging order. * Drop metric instrumentation of OtlpHttpMetricExporter * Attempt to fix log tet flake using awaitility * Rename isCompressionEnabled to compressionEnabled
This commit is contained in:
parent
8a597c6c6b
commit
8224ee4323
|
|
@ -0,0 +1,5 @@
|
||||||
|
# OpenTelemetry - OTLP Metrics Exporter - HTTP
|
||||||
|
|
||||||
|
This is the OpenTelemetry exporter, sending metric data to OpenTelemetry collector via HTTP without gRPC.
|
||||||
|
|
||||||
|
// TODO: add javadoc links once published
|
||||||
|
|
@ -0,0 +1,23 @@
|
||||||
|
plugins {
|
||||||
|
id("otel.java-conventions")
|
||||||
|
id("otel.publish-conventions")
|
||||||
|
|
||||||
|
id("otel.animalsniffer-conventions")
|
||||||
|
}
|
||||||
|
|
||||||
|
description = "OpenTelemetry Protocol HTTP Metrics Exporter"
|
||||||
|
otelJava.moduleName.set("io.opentelemetry.exporter.otlp.http.metrics")
|
||||||
|
|
||||||
|
dependencies {
|
||||||
|
api(project(":sdk:metrics"))
|
||||||
|
|
||||||
|
implementation(project(":exporters:otlp:common"))
|
||||||
|
|
||||||
|
implementation("com.squareup.okhttp3:okhttp")
|
||||||
|
implementation("com.squareup.okhttp3:okhttp-tls")
|
||||||
|
implementation("com.squareup.okio:okio")
|
||||||
|
|
||||||
|
testImplementation(project(":sdk:testing"))
|
||||||
|
|
||||||
|
testImplementation("com.linecorp.armeria:armeria-junit5")
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1 @@
|
||||||
|
otel.release=alpha
|
||||||
|
|
@ -0,0 +1,196 @@
|
||||||
|
/*
|
||||||
|
* Copyright The OpenTelemetry Authors
|
||||||
|
* SPDX-License-Identifier: Apache-2.0
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.opentelemetry.exporter.otlp.http.metric;
|
||||||
|
|
||||||
|
import com.google.rpc.Code;
|
||||||
|
import com.google.rpc.Status;
|
||||||
|
import io.opentelemetry.exporter.otlp.internal.MetricAdapter;
|
||||||
|
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
|
||||||
|
import io.opentelemetry.sdk.common.CompletableResultCode;
|
||||||
|
import io.opentelemetry.sdk.internal.ThrottlingLogger;
|
||||||
|
import io.opentelemetry.sdk.metrics.data.MetricData;
|
||||||
|
import io.opentelemetry.sdk.metrics.export.MetricExporter;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.logging.Level;
|
||||||
|
import java.util.logging.Logger;
|
||||||
|
import javax.annotation.Nullable;
|
||||||
|
import javax.annotation.concurrent.ThreadSafe;
|
||||||
|
import okhttp3.Call;
|
||||||
|
import okhttp3.Callback;
|
||||||
|
import okhttp3.Headers;
|
||||||
|
import okhttp3.MediaType;
|
||||||
|
import okhttp3.OkHttpClient;
|
||||||
|
import okhttp3.Request;
|
||||||
|
import okhttp3.RequestBody;
|
||||||
|
import okhttp3.Response;
|
||||||
|
import okhttp3.ResponseBody;
|
||||||
|
import okio.BufferedSink;
|
||||||
|
import okio.GzipSink;
|
||||||
|
import okio.Okio;
|
||||||
|
|
||||||
|
/** Exports metrics using OTLP via HTTP, using OpenTelemetry's protobuf model. */
|
||||||
|
@ThreadSafe
|
||||||
|
public final class OtlpHttpMetricExporter implements MetricExporter {
|
||||||
|
|
||||||
|
private static final MediaType PROTOBUF_MEDIA_TYPE = MediaType.parse("application/x-protobuf");
|
||||||
|
|
||||||
|
private final ThrottlingLogger logger =
|
||||||
|
new ThrottlingLogger(Logger.getLogger(OtlpHttpMetricExporter.class.getName()));
|
||||||
|
|
||||||
|
private final OkHttpClient client;
|
||||||
|
private final String endpoint;
|
||||||
|
@Nullable private final Headers headers;
|
||||||
|
private final boolean compressionEnabled;
|
||||||
|
|
||||||
|
OtlpHttpMetricExporter(
|
||||||
|
OkHttpClient client, String endpoint, @Nullable Headers headers, boolean compressionEnabled) {
|
||||||
|
this.client = client;
|
||||||
|
this.endpoint = endpoint;
|
||||||
|
this.headers = headers;
|
||||||
|
this.compressionEnabled = compressionEnabled;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Submits all the given metrics in a single batch to the OpenTelemetry collector.
|
||||||
|
*
|
||||||
|
* @param metrics the list of Metrics to be exported.
|
||||||
|
* @return the result of the operation
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public CompletableResultCode export(Collection<MetricData> metrics) {
|
||||||
|
ExportMetricsServiceRequest exportMetricsServiceRequest =
|
||||||
|
ExportMetricsServiceRequest.newBuilder()
|
||||||
|
.addAllResourceMetrics(MetricAdapter.toProtoResourceMetrics(metrics))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
Request.Builder requestBuilder = new Request.Builder().url(endpoint);
|
||||||
|
if (headers != null) {
|
||||||
|
requestBuilder.headers(headers);
|
||||||
|
}
|
||||||
|
RequestBody requestBody =
|
||||||
|
RequestBody.create(exportMetricsServiceRequest.toByteArray(), PROTOBUF_MEDIA_TYPE);
|
||||||
|
if (compressionEnabled) {
|
||||||
|
requestBuilder.addHeader("Content-Encoding", "gzip");
|
||||||
|
requestBuilder.post(gzipRequestBody(requestBody));
|
||||||
|
} else {
|
||||||
|
requestBuilder.post(requestBody);
|
||||||
|
}
|
||||||
|
|
||||||
|
CompletableResultCode result = new CompletableResultCode();
|
||||||
|
|
||||||
|
client
|
||||||
|
.newCall(requestBuilder.build())
|
||||||
|
.enqueue(
|
||||||
|
new Callback() {
|
||||||
|
@Override
|
||||||
|
public void onFailure(Call call, IOException e) {
|
||||||
|
logger.log(
|
||||||
|
Level.SEVERE,
|
||||||
|
"Failed to export metrics. The request could not be executed. Full error message: "
|
||||||
|
+ e.getMessage());
|
||||||
|
result.fail();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onResponse(Call call, Response response) {
|
||||||
|
if (response.isSuccessful()) {
|
||||||
|
result.succeed();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
int code = response.code();
|
||||||
|
|
||||||
|
Status status = extractErrorStatus(response);
|
||||||
|
|
||||||
|
logger.log(
|
||||||
|
Level.WARNING,
|
||||||
|
"Failed to export metrics. Server responded with HTTP status code "
|
||||||
|
+ code
|
||||||
|
+ ". Error message: "
|
||||||
|
+ status.getMessage());
|
||||||
|
result.fail();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static RequestBody gzipRequestBody(RequestBody requestBody) {
|
||||||
|
return new RequestBody() {
|
||||||
|
@Override
|
||||||
|
public MediaType contentType() {
|
||||||
|
return requestBody.contentType();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long contentLength() {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void writeTo(BufferedSink bufferedSink) throws IOException {
|
||||||
|
BufferedSink gzipSink = Okio.buffer(new GzipSink(bufferedSink));
|
||||||
|
requestBody.writeTo(gzipSink);
|
||||||
|
gzipSink.close();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Status extractErrorStatus(Response response) {
|
||||||
|
ResponseBody responseBody = response.body();
|
||||||
|
if (responseBody == null) {
|
||||||
|
return Status.newBuilder()
|
||||||
|
.setMessage("Response body missing, HTTP status message: " + response.message())
|
||||||
|
.setCode(Code.UNKNOWN.getNumber())
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
return Status.parseFrom(responseBody.bytes());
|
||||||
|
} catch (IOException e) {
|
||||||
|
return Status.newBuilder()
|
||||||
|
.setMessage("Unable to parse response body, HTTP status message: " + response.message())
|
||||||
|
.setCode(Code.UNKNOWN.getNumber())
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The OTLP exporter does not batch metrics, so this method will immediately return with success.
|
||||||
|
*
|
||||||
|
* @return always Success
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public CompletableResultCode flush() {
|
||||||
|
return CompletableResultCode.ofSuccess();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a new builder instance for this exporter.
|
||||||
|
*
|
||||||
|
* @return a new builder instance for this exporter.
|
||||||
|
*/
|
||||||
|
public static OtlpHttpMetricExporterBuilder builder() {
|
||||||
|
return new OtlpHttpMetricExporterBuilder();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a new {@link OtlpHttpMetricExporter} using the default values.
|
||||||
|
*
|
||||||
|
* @return a new {@link OtlpHttpMetricExporter} instance.
|
||||||
|
*/
|
||||||
|
public static OtlpHttpMetricExporter getDefault() {
|
||||||
|
return builder().build();
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Shutdown the exporter. */
|
||||||
|
@Override
|
||||||
|
public CompletableResultCode shutdown() {
|
||||||
|
final CompletableResultCode result = CompletableResultCode.ofSuccess();
|
||||||
|
client.dispatcher().cancelAll();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,160 @@
|
||||||
|
/*
|
||||||
|
* Copyright The OpenTelemetry Authors
|
||||||
|
* SPDX-License-Identifier: Apache-2.0
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.opentelemetry.exporter.otlp.http.metric;
|
||||||
|
|
||||||
|
import static io.opentelemetry.api.internal.Utils.checkArgument;
|
||||||
|
import static java.util.Objects.requireNonNull;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import java.io.ByteArrayInputStream;
|
||||||
|
import java.net.URI;
|
||||||
|
import java.net.URISyntaxException;
|
||||||
|
import java.security.cert.CertificateException;
|
||||||
|
import java.security.cert.CertificateFactory;
|
||||||
|
import java.security.cert.X509Certificate;
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import javax.annotation.Nullable;
|
||||||
|
import okhttp3.Headers;
|
||||||
|
import okhttp3.OkHttpClient;
|
||||||
|
import okhttp3.tls.HandshakeCertificates;
|
||||||
|
|
||||||
|
/** Builder utility for {@link OtlpHttpMetricExporter}. */
|
||||||
|
public final class OtlpHttpMetricExporterBuilder {
|
||||||
|
|
||||||
|
private static final long DEFAULT_TIMEOUT_SECS = 10;
|
||||||
|
private static final String DEFAULT_ENDPOINT = "http://localhost:4317/v1/metrics";
|
||||||
|
|
||||||
|
private long timeoutNanos = TimeUnit.SECONDS.toNanos(DEFAULT_TIMEOUT_SECS);
|
||||||
|
private String endpoint = DEFAULT_ENDPOINT;
|
||||||
|
private boolean compressionEnabled = false;
|
||||||
|
@Nullable private Headers.Builder headersBuilder;
|
||||||
|
@Nullable private byte[] trustedCertificatesPem;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the maximum time to wait for the collector to process an exported batch of metrics. If
|
||||||
|
* unset, defaults to {@value DEFAULT_TIMEOUT_SECS}s.
|
||||||
|
*/
|
||||||
|
public OtlpHttpMetricExporterBuilder setTimeout(long timeout, TimeUnit unit) {
|
||||||
|
requireNonNull(unit, "unit");
|
||||||
|
checkArgument(timeout >= 0, "timeout must be non-negative");
|
||||||
|
timeoutNanos = unit.toNanos(timeout);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the maximum time to wait for the collector to process an exported batch of metrics. If
|
||||||
|
* unset, defaults to {@value DEFAULT_TIMEOUT_SECS}s.
|
||||||
|
*/
|
||||||
|
public OtlpHttpMetricExporterBuilder setTimeout(Duration timeout) {
|
||||||
|
requireNonNull(timeout, "timeout");
|
||||||
|
return setTimeout(timeout.toNanos(), TimeUnit.NANOSECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the OTLP endpoint to connect to. If unset, defaults to {@value DEFAULT_ENDPOINT}. The
|
||||||
|
* endpoint must start with either http:// or https://, and include the full HTTP path.
|
||||||
|
*/
|
||||||
|
public OtlpHttpMetricExporterBuilder setEndpoint(String endpoint) {
|
||||||
|
requireNonNull(endpoint, "endpoint");
|
||||||
|
|
||||||
|
URI uri;
|
||||||
|
try {
|
||||||
|
uri = new URI(endpoint);
|
||||||
|
} catch (URISyntaxException e) {
|
||||||
|
throw new IllegalArgumentException("Invalid endpoint, must be a URL: " + endpoint, e);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (uri.getScheme() == null
|
||||||
|
|| (!uri.getScheme().equals("http") && !uri.getScheme().equals("https"))) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"Invalid endpoint, must start with http:// or https://: " + uri);
|
||||||
|
}
|
||||||
|
|
||||||
|
this.endpoint = endpoint;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the method used to compress payloads. If unset, compression is disabled. Currently the
|
||||||
|
* only supported compression method is "gzip".
|
||||||
|
*/
|
||||||
|
public OtlpHttpMetricExporterBuilder setCompression(String compressionMethod) {
|
||||||
|
requireNonNull(compressionMethod, "compressionMethod");
|
||||||
|
Preconditions.checkArgument(
|
||||||
|
compressionMethod.equals("gzip"),
|
||||||
|
"Unsupported compression method. Supported compression methods include: gzip.");
|
||||||
|
this.compressionEnabled = true;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Add header to requests. */
|
||||||
|
public OtlpHttpMetricExporterBuilder addHeader(String key, String value) {
|
||||||
|
if (headersBuilder == null) {
|
||||||
|
headersBuilder = new Headers.Builder();
|
||||||
|
}
|
||||||
|
headersBuilder.add(key, value);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the certificate chain to use for verifying servers when TLS is enabled. The {@code byte[]}
|
||||||
|
* should contain an X.509 certificate collection in PEM format. If not set, TLS connections will
|
||||||
|
* use the system default trusted certificates.
|
||||||
|
*/
|
||||||
|
public OtlpHttpMetricExporterBuilder setTrustedCertificates(byte[] trustedCertificatesPem) {
|
||||||
|
this.trustedCertificatesPem = trustedCertificatesPem;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructs a new instance of the exporter based on the builder's values.
|
||||||
|
*
|
||||||
|
* @return a new exporter's instance
|
||||||
|
*/
|
||||||
|
public OtlpHttpMetricExporter build() {
|
||||||
|
OkHttpClient.Builder clientBuilder =
|
||||||
|
new OkHttpClient.Builder().callTimeout(Duration.ofNanos(timeoutNanos));
|
||||||
|
|
||||||
|
if (trustedCertificatesPem != null) {
|
||||||
|
try {
|
||||||
|
HandshakeCertificates handshakeCertificates =
|
||||||
|
toHandshakeCertificates(trustedCertificatesPem);
|
||||||
|
clientBuilder.sslSocketFactory(
|
||||||
|
handshakeCertificates.sslSocketFactory(), handshakeCertificates.trustManager());
|
||||||
|
} catch (CertificateException e) {
|
||||||
|
throw new IllegalStateException(
|
||||||
|
"Could not set trusted certificate for OTLP HTTP connection, are they valid X.509 in PEM format?",
|
||||||
|
e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Headers headers = headersBuilder == null ? null : headersBuilder.build();
|
||||||
|
|
||||||
|
return new OtlpHttpMetricExporter(clientBuilder.build(), endpoint, headers, compressionEnabled);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Extract X.509 certificates from the bytes.
|
||||||
|
*
|
||||||
|
* @param trustedCertificatesPem bytes containing an X.509 certificate collection in PEM format.
|
||||||
|
* @return a HandshakeCertificates with the certificates
|
||||||
|
* @throws CertificateException if an error occurs extracting certificates
|
||||||
|
*/
|
||||||
|
private static HandshakeCertificates toHandshakeCertificates(byte[] trustedCertificatesPem)
|
||||||
|
throws CertificateException {
|
||||||
|
ByteArrayInputStream is = new ByteArrayInputStream(trustedCertificatesPem);
|
||||||
|
CertificateFactory factory = CertificateFactory.getInstance("X.509");
|
||||||
|
HandshakeCertificates.Builder certBuilder = new HandshakeCertificates.Builder();
|
||||||
|
while (is.available() > 0) {
|
||||||
|
X509Certificate cert = (X509Certificate) factory.generateCertificate(is);
|
||||||
|
certBuilder.addTrustedCertificate(cert);
|
||||||
|
}
|
||||||
|
return certBuilder.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
OtlpHttpMetricExporterBuilder() {}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,10 @@
|
||||||
|
/*
|
||||||
|
* Copyright The OpenTelemetry Authors
|
||||||
|
* SPDX-License-Identifier: Apache-2.0
|
||||||
|
*/
|
||||||
|
|
||||||
|
/** OpenTelemetry exporter which sends metric data to OpenTelemetry collector via OTLP HTTP. */
|
||||||
|
@ParametersAreNonnullByDefault
|
||||||
|
package io.opentelemetry.exporter.otlp.http.metric;
|
||||||
|
|
||||||
|
import javax.annotation.ParametersAreNonnullByDefault;
|
||||||
|
|
@ -0,0 +1,253 @@
|
||||||
|
/*
|
||||||
|
* Copyright The OpenTelemetry Authors
|
||||||
|
* SPDX-License-Identifier: Apache-2.0
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.opentelemetry.exporter.otlp.http.metric;
|
||||||
|
|
||||||
|
import static io.opentelemetry.api.common.AttributeKey.stringKey;
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
import static org.assertj.core.api.Assertions.assertThatThrownBy;
|
||||||
|
import static org.awaitility.Awaitility.await;
|
||||||
|
|
||||||
|
import com.google.protobuf.InvalidProtocolBufferException;
|
||||||
|
import com.google.protobuf.Message;
|
||||||
|
import com.google.rpc.Status;
|
||||||
|
import com.linecorp.armeria.common.AggregatedHttpRequest;
|
||||||
|
import com.linecorp.armeria.common.HttpMethod;
|
||||||
|
import com.linecorp.armeria.common.HttpResponse;
|
||||||
|
import com.linecorp.armeria.common.HttpStatus;
|
||||||
|
import com.linecorp.armeria.common.MediaType;
|
||||||
|
import com.linecorp.armeria.server.ServerBuilder;
|
||||||
|
import com.linecorp.armeria.testing.junit5.server.mock.MockWebServerExtension;
|
||||||
|
import io.github.netmikey.logunit.api.LogCapturer;
|
||||||
|
import io.opentelemetry.api.common.Attributes;
|
||||||
|
import io.opentelemetry.exporter.otlp.internal.MetricAdapter;
|
||||||
|
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
|
||||||
|
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse;
|
||||||
|
import io.opentelemetry.sdk.common.CompletableResultCode;
|
||||||
|
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
|
||||||
|
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
|
||||||
|
import io.opentelemetry.sdk.metrics.data.LongPointData;
|
||||||
|
import io.opentelemetry.sdk.metrics.data.LongSumData;
|
||||||
|
import io.opentelemetry.sdk.metrics.data.MetricData;
|
||||||
|
import io.opentelemetry.sdk.resources.Resource;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.InetAddress;
|
||||||
|
import java.net.UnknownHostException;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import okhttp3.tls.HeldCertificate;
|
||||||
|
import okio.Buffer;
|
||||||
|
import okio.GzipSource;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||||
|
import org.slf4j.event.Level;
|
||||||
|
import org.slf4j.event.LoggingEvent;
|
||||||
|
|
||||||
|
class OtlpHttpMetricExporterTest {
|
||||||
|
|
||||||
|
private static final MediaType APPLICATION_PROTOBUF =
|
||||||
|
MediaType.create("application", "x-protobuf");
|
||||||
|
private static final HeldCertificate HELD_CERTIFICATE;
|
||||||
|
|
||||||
|
static {
|
||||||
|
try {
|
||||||
|
HELD_CERTIFICATE =
|
||||||
|
new HeldCertificate.Builder()
|
||||||
|
.commonName("localhost")
|
||||||
|
.addSubjectAlternativeName(InetAddress.getByName("localhost").getCanonicalHostName())
|
||||||
|
.build();
|
||||||
|
} catch (UnknownHostException e) {
|
||||||
|
throw new IllegalStateException("Error building certificate.", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@RegisterExtension
|
||||||
|
static MockWebServerExtension server =
|
||||||
|
new MockWebServerExtension() {
|
||||||
|
@Override
|
||||||
|
protected void configureServer(ServerBuilder sb) {
|
||||||
|
sb.tls(HELD_CERTIFICATE.keyPair().getPrivate(), HELD_CERTIFICATE.certificate());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
@RegisterExtension
|
||||||
|
LogCapturer logs = LogCapturer.create().captureForType(OtlpHttpMetricExporter.class);
|
||||||
|
|
||||||
|
private OtlpHttpMetricExporterBuilder builder;
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
void setup() {
|
||||||
|
builder =
|
||||||
|
OtlpHttpMetricExporter.builder()
|
||||||
|
.setEndpoint("https://localhost:" + server.httpsPort() + "/v1/metrics")
|
||||||
|
.addHeader("foo", "bar")
|
||||||
|
.setTrustedCertificates(
|
||||||
|
HELD_CERTIFICATE.certificatePem().getBytes(StandardCharsets.UTF_8));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@SuppressWarnings("PreferJavaTimeOverload")
|
||||||
|
void invalidConfig() {
|
||||||
|
assertThatThrownBy(() -> OtlpHttpMetricExporter.builder().setTimeout(-1, TimeUnit.MILLISECONDS))
|
||||||
|
.isInstanceOf(IllegalArgumentException.class)
|
||||||
|
.hasMessage("timeout must be non-negative");
|
||||||
|
assertThatThrownBy(() -> OtlpHttpMetricExporter.builder().setTimeout(1, null))
|
||||||
|
.isInstanceOf(NullPointerException.class)
|
||||||
|
.hasMessage("unit");
|
||||||
|
assertThatThrownBy(() -> OtlpHttpMetricExporter.builder().setTimeout(null))
|
||||||
|
.isInstanceOf(NullPointerException.class)
|
||||||
|
.hasMessage("timeout");
|
||||||
|
|
||||||
|
assertThatThrownBy(() -> OtlpHttpMetricExporter.builder().setEndpoint(null))
|
||||||
|
.isInstanceOf(NullPointerException.class)
|
||||||
|
.hasMessage("endpoint");
|
||||||
|
assertThatThrownBy(() -> OtlpHttpMetricExporter.builder().setEndpoint("😺://localhost"))
|
||||||
|
.isInstanceOf(IllegalArgumentException.class)
|
||||||
|
.hasMessage("Invalid endpoint, must be a URL: 😺://localhost");
|
||||||
|
assertThatThrownBy(() -> OtlpHttpMetricExporter.builder().setEndpoint("localhost"))
|
||||||
|
.isInstanceOf(IllegalArgumentException.class)
|
||||||
|
.hasMessage("Invalid endpoint, must start with http:// or https://: localhost");
|
||||||
|
assertThatThrownBy(() -> OtlpHttpMetricExporter.builder().setEndpoint("gopher://localhost"))
|
||||||
|
.isInstanceOf(IllegalArgumentException.class)
|
||||||
|
.hasMessage("Invalid endpoint, must start with http:// or https://: gopher://localhost");
|
||||||
|
|
||||||
|
assertThatThrownBy(() -> OtlpHttpMetricExporter.builder().setCompression(null))
|
||||||
|
.isInstanceOf(NullPointerException.class)
|
||||||
|
.hasMessage("compressionMethod");
|
||||||
|
assertThatThrownBy(() -> OtlpHttpMetricExporter.builder().setCompression("foo"))
|
||||||
|
.isInstanceOf(IllegalArgumentException.class)
|
||||||
|
.hasMessage("Unsupported compression method. Supported compression methods include: gzip.");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testExportUncompressed() {
|
||||||
|
server.enqueue(successResponse());
|
||||||
|
OtlpHttpMetricExporter exporter = builder.build();
|
||||||
|
|
||||||
|
ExportMetricsServiceRequest payload =
|
||||||
|
exportAndAssertResult(exporter, /* expectedResult= */ true);
|
||||||
|
AggregatedHttpRequest request = server.takeRequest().request();
|
||||||
|
assertRequestCommon(request);
|
||||||
|
assertThat(parseRequestBody(request.content().array())).isEqualTo(payload);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testExportGzipCompressed() {
|
||||||
|
server.enqueue(successResponse());
|
||||||
|
OtlpHttpMetricExporter exporter = builder.setCompression("gzip").build();
|
||||||
|
|
||||||
|
ExportMetricsServiceRequest payload =
|
||||||
|
exportAndAssertResult(exporter, /* expectedResult= */ true);
|
||||||
|
AggregatedHttpRequest request = server.takeRequest().request();
|
||||||
|
assertRequestCommon(request);
|
||||||
|
assertThat(request.headers().get("Content-Encoding")).isEqualTo("gzip");
|
||||||
|
assertThat(parseRequestBody(gzipDecompress(request.content().array()))).isEqualTo(payload);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void assertRequestCommon(AggregatedHttpRequest request) {
|
||||||
|
assertThat(request.method()).isEqualTo(HttpMethod.POST);
|
||||||
|
assertThat(request.path()).isEqualTo("/v1/metrics");
|
||||||
|
assertThat(request.headers().get("foo")).isEqualTo("bar");
|
||||||
|
assertThat(request.headers().get("Content-Type")).isEqualTo(APPLICATION_PROTOBUF.toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
private static ExportMetricsServiceRequest parseRequestBody(byte[] bytes) {
|
||||||
|
try {
|
||||||
|
return ExportMetricsServiceRequest.parseFrom(bytes);
|
||||||
|
} catch (InvalidProtocolBufferException e) {
|
||||||
|
throw new IllegalStateException("Unable to parse Protobuf request body.", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static byte[] gzipDecompress(byte[] bytes) {
|
||||||
|
try {
|
||||||
|
Buffer result = new Buffer();
|
||||||
|
GzipSource source = new GzipSource(new Buffer().write(bytes));
|
||||||
|
while (source.read(result, Integer.MAX_VALUE) != -1) {}
|
||||||
|
return result.readByteArray();
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new IllegalStateException("Unable to decompress payload.", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testServerError() {
|
||||||
|
server.enqueue(
|
||||||
|
buildResponse(
|
||||||
|
HttpStatus.INTERNAL_SERVER_ERROR,
|
||||||
|
Status.newBuilder().setMessage("Server error!").build()));
|
||||||
|
OtlpHttpMetricExporter exporter = builder.build();
|
||||||
|
|
||||||
|
exportAndAssertResult(exporter, /* expectedResult= */ false);
|
||||||
|
await()
|
||||||
|
.atMost(Duration.ofSeconds(10))
|
||||||
|
.untilAsserted(
|
||||||
|
() -> {
|
||||||
|
LoggingEvent log =
|
||||||
|
logs.assertContains(
|
||||||
|
"Failed to export metrics. Server responded with HTTP status code 500. Error message: Server error!");
|
||||||
|
assertThat(log.getLevel()).isEqualTo(Level.WARN);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testServerErrorParseError() {
|
||||||
|
server.enqueue(
|
||||||
|
HttpResponse.of(HttpStatus.INTERNAL_SERVER_ERROR, APPLICATION_PROTOBUF, "Server error!"));
|
||||||
|
OtlpHttpMetricExporter exporter = builder.build();
|
||||||
|
|
||||||
|
exportAndAssertResult(exporter, /* expectedResult= */ false);
|
||||||
|
await()
|
||||||
|
.atMost(Duration.ofSeconds(10))
|
||||||
|
.untilAsserted(
|
||||||
|
() -> {
|
||||||
|
LoggingEvent log =
|
||||||
|
logs.assertContains(
|
||||||
|
"Failed to export metrics. Server responded with HTTP status code 500. Error message: Unable to parse response body, HTTP status message:");
|
||||||
|
assertThat(log.getLevel()).isEqualTo(Level.WARN);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private static ExportMetricsServiceRequest exportAndAssertResult(
|
||||||
|
OtlpHttpMetricExporter otlpHttpMetricExporter, boolean expectedResult) {
|
||||||
|
List<MetricData> metrics = Collections.singletonList(generateFakeMetric());
|
||||||
|
CompletableResultCode resultCode = otlpHttpMetricExporter.export(metrics);
|
||||||
|
resultCode.join(10, TimeUnit.SECONDS);
|
||||||
|
assertThat(resultCode.isSuccess()).isEqualTo(expectedResult);
|
||||||
|
return ExportMetricsServiceRequest.newBuilder()
|
||||||
|
.addAllResourceMetrics(MetricAdapter.toProtoResourceMetrics(metrics))
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static HttpResponse successResponse() {
|
||||||
|
ExportMetricsServiceResponse exportMetricsServiceResponse =
|
||||||
|
ExportMetricsServiceResponse.newBuilder().build();
|
||||||
|
return buildResponse(HttpStatus.OK, exportMetricsServiceResponse);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static <T extends Message> HttpResponse buildResponse(HttpStatus httpStatus, T message) {
|
||||||
|
return HttpResponse.of(httpStatus, APPLICATION_PROTOBUF, message.toByteArray());
|
||||||
|
}
|
||||||
|
|
||||||
|
private static MetricData generateFakeMetric() {
|
||||||
|
long startNs = TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis());
|
||||||
|
long endNs = startNs + TimeUnit.MILLISECONDS.toNanos(900);
|
||||||
|
return MetricData.createLongSum(
|
||||||
|
Resource.empty(),
|
||||||
|
InstrumentationLibraryInfo.empty(),
|
||||||
|
"name",
|
||||||
|
"description",
|
||||||
|
"1",
|
||||||
|
LongSumData.create(
|
||||||
|
/* isMonotonic= */ true,
|
||||||
|
AggregationTemporality.CUMULATIVE,
|
||||||
|
Collections.singletonList(
|
||||||
|
LongPointData.create(startNs, endNs, Attributes.of(stringKey("k"), "v"), 5))));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -1,8 +1,5 @@
|
||||||
# OpenTelemetry - OTLP Trace Exporter - HTTP
|
# OpenTelemetry - OTLP Trace Exporter - HTTP
|
||||||
|
|
||||||
[![Javadocs][javadoc-image]][javadoc-url]
|
|
||||||
|
|
||||||
This is the OpenTelemetry exporter, sending span data to OpenTelemetry collector via HTTP without gRPC.
|
This is the OpenTelemetry exporter, sending span data to OpenTelemetry collector via HTTP without gRPC.
|
||||||
|
|
||||||
[javadoc-image]: https://www.javadoc.io/badge/io.opentelemetry/opentelemetry-exporters-otlp.svg
|
// TODO: add javadoc links once published
|
||||||
[javadoc-url]: https://www.javadoc.io/doc/io.opentelemetry/opentelemetry-exporters-otlp
|
|
||||||
|
|
@ -22,6 +22,7 @@ import java.io.IOException;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.logging.Level;
|
import java.util.logging.Level;
|
||||||
import java.util.logging.Logger;
|
import java.util.logging.Logger;
|
||||||
|
import javax.annotation.Nullable;
|
||||||
import javax.annotation.concurrent.ThreadSafe;
|
import javax.annotation.concurrent.ThreadSafe;
|
||||||
import okhttp3.Call;
|
import okhttp3.Call;
|
||||||
import okhttp3.Callback;
|
import okhttp3.Callback;
|
||||||
|
|
@ -59,11 +60,11 @@ public final class OtlpHttpSpanExporter implements SpanExporter {
|
||||||
|
|
||||||
private final OkHttpClient client;
|
private final OkHttpClient client;
|
||||||
private final String endpoint;
|
private final String endpoint;
|
||||||
private final Headers headers;
|
@Nullable private final Headers headers;
|
||||||
private final boolean isCompressionEnabled;
|
private final boolean compressionEnabled;
|
||||||
|
|
||||||
OtlpHttpSpanExporter(
|
OtlpHttpSpanExporter(
|
||||||
OkHttpClient client, String endpoint, Headers headers, boolean isCompressionEnabled) {
|
OkHttpClient client, String endpoint, Headers headers, boolean compressionEnabled) {
|
||||||
Meter meter = GlobalMeterProvider.get().get("io.opentelemetry.exporters.otlp-http");
|
Meter meter = GlobalMeterProvider.get().get("io.opentelemetry.exporters.otlp-http");
|
||||||
this.spansSeen = meter.counterBuilder("spansSeenByExporter").build().bind(EXPORTER_NAME_LABELS);
|
this.spansSeen = meter.counterBuilder("spansSeenByExporter").build().bind(EXPORTER_NAME_LABELS);
|
||||||
LongCounter spansExportedCounter = meter.counterBuilder("spansExportedByExporter").build();
|
LongCounter spansExportedCounter = meter.counterBuilder("spansExportedByExporter").build();
|
||||||
|
|
@ -73,7 +74,7 @@ public final class OtlpHttpSpanExporter implements SpanExporter {
|
||||||
this.client = client;
|
this.client = client;
|
||||||
this.endpoint = endpoint;
|
this.endpoint = endpoint;
|
||||||
this.headers = headers;
|
this.headers = headers;
|
||||||
this.isCompressionEnabled = isCompressionEnabled;
|
this.compressionEnabled = compressionEnabled;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -96,7 +97,7 @@ public final class OtlpHttpSpanExporter implements SpanExporter {
|
||||||
}
|
}
|
||||||
RequestBody requestBody =
|
RequestBody requestBody =
|
||||||
RequestBody.create(exportTraceServiceRequest.toByteArray(), PROTOBUF_MEDIA_TYPE);
|
RequestBody.create(exportTraceServiceRequest.toByteArray(), PROTOBUF_MEDIA_TYPE);
|
||||||
if (isCompressionEnabled) {
|
if (compressionEnabled) {
|
||||||
requestBuilder.addHeader("Content-Encoding", "gzip");
|
requestBuilder.addHeader("Content-Encoding", "gzip");
|
||||||
requestBuilder.post(gzipRequestBody(requestBody));
|
requestBuilder.post(gzipRequestBody(requestBody));
|
||||||
} else {
|
} else {
|
||||||
|
|
@ -112,11 +113,11 @@ public final class OtlpHttpSpanExporter implements SpanExporter {
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(Call call, IOException e) {
|
public void onFailure(Call call, IOException e) {
|
||||||
spansExportedFailure.add(spans.size());
|
spansExportedFailure.add(spans.size());
|
||||||
result.fail();
|
|
||||||
logger.log(
|
logger.log(
|
||||||
Level.SEVERE,
|
Level.SEVERE,
|
||||||
"Failed to export spans. The request could not be executed. Full error message: "
|
"Failed to export spans. The request could not be executed. Full error message: "
|
||||||
+ e.getMessage());
|
+ e.getMessage());
|
||||||
|
result.fail();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
||||||
|
|
@ -30,7 +30,7 @@ public final class OtlpHttpSpanExporterBuilder {
|
||||||
|
|
||||||
private long timeoutNanos = TimeUnit.SECONDS.toNanos(DEFAULT_TIMEOUT_SECS);
|
private long timeoutNanos = TimeUnit.SECONDS.toNanos(DEFAULT_TIMEOUT_SECS);
|
||||||
private String endpoint = DEFAULT_ENDPOINT;
|
private String endpoint = DEFAULT_ENDPOINT;
|
||||||
private boolean isCompressionEnabled = false;
|
private boolean compressionEnabled = false;
|
||||||
@Nullable private Headers.Builder headersBuilder;
|
@Nullable private Headers.Builder headersBuilder;
|
||||||
@Nullable private byte[] trustedCertificatesPem;
|
@Nullable private byte[] trustedCertificatesPem;
|
||||||
|
|
||||||
|
|
@ -87,7 +87,7 @@ public final class OtlpHttpSpanExporterBuilder {
|
||||||
Preconditions.checkArgument(
|
Preconditions.checkArgument(
|
||||||
compressionMethod.equals("gzip"),
|
compressionMethod.equals("gzip"),
|
||||||
"Unsupported compression method. Supported compression methods include: gzip.");
|
"Unsupported compression method. Supported compression methods include: gzip.");
|
||||||
this.isCompressionEnabled = true;
|
this.compressionEnabled = true;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -134,7 +134,7 @@ public final class OtlpHttpSpanExporterBuilder {
|
||||||
|
|
||||||
Headers headers = headersBuilder == null ? null : headersBuilder.build();
|
Headers headers = headersBuilder == null ? null : headersBuilder.build();
|
||||||
|
|
||||||
return new OtlpHttpSpanExporter(clientBuilder.build(), endpoint, headers, isCompressionEnabled);
|
return new OtlpHttpSpanExporter(clientBuilder.build(), endpoint, headers, compressionEnabled);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,7 @@ package io.opentelemetry.exporter.otlp.http.trace;
|
||||||
|
|
||||||
import static org.assertj.core.api.Assertions.assertThat;
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
import static org.assertj.core.api.Assertions.assertThatThrownBy;
|
import static org.assertj.core.api.Assertions.assertThatThrownBy;
|
||||||
|
import static org.awaitility.Awaitility.await;
|
||||||
|
|
||||||
import com.google.protobuf.InvalidProtocolBufferException;
|
import com.google.protobuf.InvalidProtocolBufferException;
|
||||||
import com.google.protobuf.Message;
|
import com.google.protobuf.Message;
|
||||||
|
|
@ -35,6 +36,7 @@ import java.io.IOException;
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
import java.net.UnknownHostException;
|
import java.net.UnknownHostException;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.time.Duration;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
@ -181,10 +183,15 @@ class OtlpHttpSpanExporterTest {
|
||||||
OtlpHttpSpanExporter exporter = builder.build();
|
OtlpHttpSpanExporter exporter = builder.build();
|
||||||
|
|
||||||
exportAndAssertResult(exporter, /* expectedResult= */ false);
|
exportAndAssertResult(exporter, /* expectedResult= */ false);
|
||||||
LoggingEvent log =
|
await()
|
||||||
logs.assertContains(
|
.atMost(Duration.ofSeconds(10))
|
||||||
"Failed to export spans. Server responded with HTTP status code 500. Error message: Server error!");
|
.untilAsserted(
|
||||||
assertThat(log.getLevel()).isEqualTo(Level.WARN);
|
() -> {
|
||||||
|
LoggingEvent log =
|
||||||
|
logs.assertContains(
|
||||||
|
"Failed to export spans. Server responded with HTTP status code 500. Error message: Server error!");
|
||||||
|
assertThat(log.getLevel()).isEqualTo(Level.WARN);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
@ -194,10 +201,15 @@ class OtlpHttpSpanExporterTest {
|
||||||
OtlpHttpSpanExporter exporter = builder.build();
|
OtlpHttpSpanExporter exporter = builder.build();
|
||||||
|
|
||||||
exportAndAssertResult(exporter, /* expectedResult= */ false);
|
exportAndAssertResult(exporter, /* expectedResult= */ false);
|
||||||
LoggingEvent log =
|
await()
|
||||||
logs.assertContains(
|
.atMost(Duration.ofSeconds(10))
|
||||||
"Failed to export spans. Server responded with HTTP status code 500. Error message: Unable to parse response body, HTTP status message:");
|
.untilAsserted(
|
||||||
assertThat(log.getLevel()).isEqualTo(Level.WARN);
|
() -> {
|
||||||
|
LoggingEvent log =
|
||||||
|
logs.assertContains(
|
||||||
|
"Failed to export spans. Server responded with HTTP status code 500. Error message: Unable to parse response body, HTTP status message:");
|
||||||
|
assertThat(log.getLevel()).isEqualTo(Level.WARN);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private static ExportTraceServiceRequest exportAndAssertResult(
|
private static ExportTraceServiceRequest exportAndAssertResult(
|
||||||
|
|
|
||||||
|
|
@ -46,6 +46,7 @@ include(":exporters:otlp:all")
|
||||||
include(":exporters:otlp:common")
|
include(":exporters:otlp:common")
|
||||||
include(":exporters:otlp:metrics")
|
include(":exporters:otlp:metrics")
|
||||||
include(":exporters:otlp:trace")
|
include(":exporters:otlp:trace")
|
||||||
|
include(":exporters:otlp-http:metrics")
|
||||||
include(":exporters:otlp-http:trace")
|
include(":exporters:otlp-http:trace")
|
||||||
include(":exporters:prometheus")
|
include(":exporters:prometheus")
|
||||||
include(":exporters:zipkin")
|
include(":exporters:zipkin")
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue