Use OkHttpGrpcExporter unless upstream Channel provided by user (#4538)

* Use OkHttp-based gRPC exporter unless setChannel is called.
This commit is contained in:
Anuraag Agrawal 2022-06-17 09:45:03 +09:00 committed by GitHub
parent ccfc9ea918
commit 852e755eeb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
37 changed files with 830 additions and 1265 deletions

View File

@ -38,7 +38,6 @@ public final class JaegerGrpcSpanExporterBuilder {
DEFAULT_TIMEOUT_SECS,
DEFAULT_ENDPOINT,
() -> MarshalerCollectorServiceGrpc::newFutureStub,
GRPC_SERVICE_NAME,
GRPC_ENDPOINT_PATH);
}

View File

@ -7,10 +7,10 @@ package io.opentelemetry.exporter.otlp.trace;
import com.linecorp.armeria.server.Server;
import com.linecorp.armeria.server.grpc.GrpcService;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import io.opentelemetry.exporter.internal.grpc.DefaultGrpcExporterBuilder;
import io.opentelemetry.exporter.internal.grpc.GrpcExporter;
import io.opentelemetry.exporter.internal.grpc.OkHttpGrpcExporterBuilder;
import io.opentelemetry.exporter.internal.otlp.traces.TraceRequestMarshaler;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse;
@ -56,6 +56,8 @@ public class GrpcExporterBenchmark {
.http(0)
.build();
private static ManagedChannel defaultGrpcChannel;
private static GrpcExporter<TraceRequestMarshaler> defaultGrpcExporter;
private static GrpcExporter<TraceRequestMarshaler> okhttpGrpcExporter;
@ -63,23 +65,27 @@ public class GrpcExporterBenchmark {
public void setUp() {
server.start().join();
defaultGrpcChannel =
ManagedChannelBuilder.forAddress("localhost", server.activeLocalPort()).build();
defaultGrpcExporter =
new DefaultGrpcExporterBuilder<>(
GrpcExporter.builder(
"otlp",
"span",
MarshalerTraceServiceGrpc::newFutureStub,
10,
URI.create("http://localhost:" + server.activeLocalPort()),
OtlpGrpcSpanExporterBuilder.GRPC_SERVICE_NAME)
() -> MarshalerTraceServiceGrpc::newFutureStub,
OtlpGrpcSpanExporterBuilder.GRPC_ENDPOINT_PATH)
.setChannel(defaultGrpcChannel)
.build();
okhttpGrpcExporter =
new OkHttpGrpcExporterBuilder<TraceRequestMarshaler>(
GrpcExporter.builder(
"otlp",
"span",
OtlpGrpcSpanExporterBuilder.GRPC_ENDPOINT_PATH,
10,
URI.create("http://localhost:" + server.activeLocalPort()))
URI.create("http://localhost:" + server.activeLocalPort()),
() -> MarshalerTraceServiceGrpc::newFutureStub,
OtlpGrpcSpanExporterBuilder.GRPC_ENDPOINT_PATH)
.build();
}
@ -87,6 +93,7 @@ public class GrpcExporterBenchmark {
public void tearDown() {
defaultGrpcExporter.shutdown().join(10, TimeUnit.SECONDS);
okhttpGrpcExporter.shutdown().join(10, TimeUnit.SECONDS);
defaultGrpcChannel.shutdownNow();
server.stop().join();
}

View File

@ -51,7 +51,6 @@ public final class OtlpGrpcMetricExporterBuilder {
DEFAULT_TIMEOUT_SECS,
DEFAULT_ENDPOINT,
() -> MarshalerMetricsServiceGrpc::newFutureStub,
GRPC_SERVICE_NAME,
GRPC_ENDPOINT_PATH);
}

View File

@ -40,7 +40,6 @@ public final class OtlpGrpcSpanExporterBuilder {
DEFAULT_TIMEOUT_SECS,
DEFAULT_ENDPOINT,
() -> MarshalerTraceServiceGrpc::newFutureStub,
GRPC_SERVICE_NAME,
GRPC_ENDPOINT_PATH);
}

View File

@ -10,13 +10,12 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.exporter.internal.grpc.OkHttpGrpcExporterBuilder;
import io.opentelemetry.exporter.internal.grpc.OkHttpGrpcExporter;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.exporter.internal.otlp.metrics.ResourceMetricsMarshaler;
import io.opentelemetry.exporter.internal.retry.RetryPolicy;
import io.opentelemetry.exporter.internal.retry.RetryUtil;
import io.opentelemetry.exporter.otlp.testing.internal.AbstractGrpcTelemetryExporterTest;
import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporter;
import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporterBuilder;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
@ -26,7 +25,7 @@ import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongPointData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableSumData;
import io.opentelemetry.sdk.resources.Resource;
import java.time.Duration;
import java.io.Closeable;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
@ -49,69 +48,15 @@ class OtlpGrpcMetricExporterTest
}
@Test
void usingOkHttp() {
assertThat(OtlpGrpcMetricExporter.builder().delegate)
.isInstanceOf(OkHttpGrpcExporterBuilder.class);
void usingOkHttp() throws Exception {
try (Closeable exporter = OtlpGrpcMetricExporter.builder().build()) {
assertThat(exporter).extracting("delegate").isInstanceOf(OkHttpGrpcExporter.class);
}
}
@Override
protected TelemetryExporterBuilder<MetricData> exporterBuilder() {
OtlpGrpcMetricExporterBuilder builder = OtlpGrpcMetricExporter.builder();
return new TelemetryExporterBuilder<MetricData>() {
@Override
public TelemetryExporterBuilder<MetricData> setEndpoint(String endpoint) {
builder.setEndpoint(endpoint);
return this;
}
@Override
public TelemetryExporterBuilder<MetricData> setTimeout(long timeout, TimeUnit unit) {
builder.setTimeout(timeout, unit);
return this;
}
@Override
public TelemetryExporterBuilder<MetricData> setTimeout(Duration timeout) {
builder.setTimeout(timeout);
return this;
}
@Override
public TelemetryExporterBuilder<MetricData> setCompression(String compression) {
builder.setCompression(compression);
return this;
}
@Override
public TelemetryExporterBuilder<MetricData> addHeader(String key, String value) {
builder.addHeader(key, value);
return this;
}
@Override
public TelemetryExporterBuilder<MetricData> setTrustedCertificates(byte[] certificates) {
builder.setTrustedCertificates(certificates);
return this;
}
@Override
public TelemetryExporterBuilder<MetricData> setClientTls(
byte[] privateKeyPem, byte[] certificatePem) {
builder.setClientTls(privateKeyPem, certificatePem);
return this;
}
@Override
public TelemetryExporterBuilder<MetricData> setRetryPolicy(RetryPolicy retryPolicy) {
builder.delegate.setRetryPolicy(retryPolicy);
return this;
}
@Override
public TelemetryExporter<MetricData> build() {
return TelemetryExporter.wrap(builder.build());
}
};
return TelemetryExporterBuilder.wrap(OtlpGrpcMetricExporter.builder());
}
@Override

View File

@ -12,20 +12,19 @@ import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.TraceFlags;
import io.opentelemetry.api.trace.TraceState;
import io.opentelemetry.exporter.internal.grpc.OkHttpGrpcExporterBuilder;
import io.opentelemetry.exporter.internal.grpc.OkHttpGrpcExporter;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.exporter.internal.otlp.traces.ResourceSpansMarshaler;
import io.opentelemetry.exporter.internal.retry.RetryPolicy;
import io.opentelemetry.exporter.internal.retry.RetryUtil;
import io.opentelemetry.exporter.otlp.testing.internal.AbstractGrpcTelemetryExporterTest;
import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporter;
import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporterBuilder;
import io.opentelemetry.proto.trace.v1.ResourceSpans;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.testing.trace.TestSpanData;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.data.StatusData;
import java.time.Duration;
import java.io.Closeable;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
@ -50,69 +49,15 @@ class OtlpGrpcSpanExporterTest extends AbstractGrpcTelemetryExporterTest<SpanDat
}
@Test
void usingOkHttp() {
assertThat(OtlpGrpcSpanExporter.builder().delegate)
.isInstanceOf(OkHttpGrpcExporterBuilder.class);
void usingOkHttp() throws Exception {
try (Closeable exporter = OtlpGrpcSpanExporter.builder().build()) {
assertThat(exporter).extracting("delegate").isInstanceOf(OkHttpGrpcExporter.class);
}
}
@Override
protected TelemetryExporterBuilder<SpanData> exporterBuilder() {
OtlpGrpcSpanExporterBuilder builder = OtlpGrpcSpanExporter.builder();
return new TelemetryExporterBuilder<SpanData>() {
@Override
public TelemetryExporterBuilder<SpanData> setEndpoint(String endpoint) {
builder.setEndpoint(endpoint);
return this;
}
@Override
public TelemetryExporterBuilder<SpanData> setTimeout(long timeout, TimeUnit unit) {
builder.setTimeout(timeout, unit);
return this;
}
@Override
public TelemetryExporterBuilder<SpanData> setTimeout(Duration timeout) {
builder.setTimeout(timeout);
return this;
}
@Override
public TelemetryExporterBuilder<SpanData> setCompression(String compression) {
builder.setCompression(compression);
return this;
}
@Override
public TelemetryExporterBuilder<SpanData> addHeader(String key, String value) {
builder.addHeader(key, value);
return this;
}
@Override
public TelemetryExporterBuilder<SpanData> setTrustedCertificates(byte[] certificates) {
builder.setTrustedCertificates(certificates);
return this;
}
@Override
public TelemetryExporterBuilder<SpanData> setClientTls(
byte[] privateKeyPem, byte[] certificatePem) {
builder.setClientTls(privateKeyPem, certificatePem);
return this;
}
@Override
public TelemetryExporterBuilder<SpanData> setRetryPolicy(RetryPolicy retryPolicy) {
builder.delegate.setRetryPolicy(retryPolicy);
return this;
}
@Override
public TelemetryExporter<SpanData> build() {
return TelemetryExporter.wrap(builder.build());
}
};
return TelemetryExporterBuilder.wrap(OtlpGrpcSpanExporter.builder());
}
@Override

View File

@ -9,14 +9,15 @@ import static io.opentelemetry.api.common.AttributeKey.stringKey;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.exporter.internal.grpc.DefaultGrpcExporterBuilder;
import io.opentelemetry.exporter.internal.grpc.UpstreamGrpcExporter;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.exporter.internal.otlp.metrics.ResourceMetricsMarshaler;
import io.opentelemetry.exporter.internal.retry.RetryPolicy;
import io.opentelemetry.exporter.internal.retry.RetryUtil;
import io.opentelemetry.exporter.otlp.testing.internal.AbstractGrpcTelemetryExporterTest;
import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporter;
import io.opentelemetry.exporter.otlp.testing.internal.ManagedChannelTelemetryExporterBuilder;
import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporterBuilder;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
@ -26,7 +27,7 @@ import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongPointData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableSumData;
import io.opentelemetry.sdk.resources.Resource;
import java.time.Duration;
import java.io.Closeable;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
@ -49,69 +50,20 @@ class OtlpGrpcNettyMetricExporterTest
}
@Test
void usingGrpc() {
assertThat(OtlpGrpcMetricExporter.builder().delegate)
.isInstanceOf(DefaultGrpcExporterBuilder.class);
@SuppressWarnings("deprecation") // testing deprecated feature
void usingGrpc() throws Exception {
try (Closeable exporter =
OtlpGrpcMetricExporter.builder()
.setChannel(InProcessChannelBuilder.forName("test").build())
.build()) {
assertThat(exporter).extracting("delegate").isInstanceOf(UpstreamGrpcExporter.class);
}
}
@Override
protected TelemetryExporterBuilder<MetricData> exporterBuilder() {
OtlpGrpcMetricExporterBuilder builder = OtlpGrpcMetricExporter.builder();
return new TelemetryExporterBuilder<MetricData>() {
@Override
public TelemetryExporterBuilder<MetricData> setEndpoint(String endpoint) {
builder.setEndpoint(endpoint);
return this;
}
@Override
public TelemetryExporterBuilder<MetricData> setTimeout(long timeout, TimeUnit unit) {
builder.setTimeout(timeout, unit);
return this;
}
@Override
public TelemetryExporterBuilder<MetricData> setTimeout(Duration timeout) {
builder.setTimeout(timeout);
return this;
}
@Override
public TelemetryExporterBuilder<MetricData> setCompression(String compression) {
builder.setCompression(compression);
return this;
}
@Override
public TelemetryExporterBuilder<MetricData> addHeader(String key, String value) {
builder.addHeader(key, value);
return this;
}
@Override
public TelemetryExporterBuilder<MetricData> setTrustedCertificates(byte[] certificates) {
builder.setTrustedCertificates(certificates);
return this;
}
@Override
public TelemetryExporterBuilder<MetricData> setClientTls(
byte[] privateKeyPem, byte[] certificatePem) {
builder.setClientTls(privateKeyPem, certificatePem);
return this;
}
@Override
public TelemetryExporterBuilder<MetricData> setRetryPolicy(RetryPolicy retryPolicy) {
builder.delegate.setRetryPolicy(retryPolicy);
return this;
}
@Override
public TelemetryExporter<MetricData> build() {
return TelemetryExporter.wrap(builder.build());
}
};
return ManagedChannelTelemetryExporterBuilder.wrap(
TelemetryExporterBuilder.wrap(OtlpGrpcMetricExporter.builder()));
}
@Override

View File

@ -8,24 +8,25 @@ package io.opentelemetry.exporter.otlp.trace;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.TraceFlags;
import io.opentelemetry.api.trace.TraceState;
import io.opentelemetry.exporter.internal.grpc.DefaultGrpcExporterBuilder;
import io.opentelemetry.exporter.internal.grpc.UpstreamGrpcExporter;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.exporter.internal.otlp.traces.ResourceSpansMarshaler;
import io.opentelemetry.exporter.internal.retry.RetryPolicy;
import io.opentelemetry.exporter.internal.retry.RetryUtil;
import io.opentelemetry.exporter.otlp.testing.internal.AbstractGrpcTelemetryExporterTest;
import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporter;
import io.opentelemetry.exporter.otlp.testing.internal.ManagedChannelTelemetryExporterBuilder;
import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporterBuilder;
import io.opentelemetry.proto.trace.v1.ResourceSpans;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.testing.trace.TestSpanData;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.data.StatusData;
import java.time.Duration;
import java.io.Closeable;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
@ -51,69 +52,20 @@ class OtlpGrpcNettySpanExporterTest
}
@Test
void usingGrpc() {
assertThat(OtlpGrpcSpanExporter.builder().delegate)
.isInstanceOf(DefaultGrpcExporterBuilder.class);
@SuppressWarnings("deprecation") // testing deprecated feature
void usingGrpc() throws Exception {
try (Closeable exporter =
OtlpGrpcSpanExporter.builder()
.setChannel(InProcessChannelBuilder.forName("test").build())
.build()) {
assertThat(exporter).extracting("delegate").isInstanceOf(UpstreamGrpcExporter.class);
}
}
@Override
protected TelemetryExporterBuilder<SpanData> exporterBuilder() {
OtlpGrpcSpanExporterBuilder builder = OtlpGrpcSpanExporter.builder();
return new TelemetryExporterBuilder<SpanData>() {
@Override
public TelemetryExporterBuilder<SpanData> setEndpoint(String endpoint) {
builder.setEndpoint(endpoint);
return this;
}
@Override
public TelemetryExporterBuilder<SpanData> setTimeout(long timeout, TimeUnit unit) {
builder.setTimeout(timeout, unit);
return this;
}
@Override
public TelemetryExporterBuilder<SpanData> setTimeout(Duration timeout) {
builder.setTimeout(timeout);
return this;
}
@Override
public TelemetryExporterBuilder<SpanData> setCompression(String compression) {
builder.setCompression(compression);
return this;
}
@Override
public TelemetryExporterBuilder<SpanData> addHeader(String key, String value) {
builder.addHeader(key, value);
return this;
}
@Override
public TelemetryExporterBuilder<SpanData> setTrustedCertificates(byte[] certificates) {
builder.setTrustedCertificates(certificates);
return this;
}
@Override
public TelemetryExporterBuilder<SpanData> setClientTls(
byte[] privateKeyPem, byte[] certificatePem) {
builder.setClientTls(privateKeyPem, certificatePem);
return this;
}
@Override
public TelemetryExporterBuilder<SpanData> setRetryPolicy(RetryPolicy retryPolicy) {
builder.delegate.setRetryPolicy(retryPolicy);
return this;
}
@Override
public TelemetryExporter<SpanData> build() {
return TelemetryExporter.wrap(builder.build());
}
};
return ManagedChannelTelemetryExporterBuilder.wrap(
TelemetryExporterBuilder.wrap(OtlpGrpcSpanExporter.builder()));
}
@Override

View File

@ -8,13 +8,13 @@ package io.opentelemetry.exporter.otlp.metrics;
import static io.opentelemetry.api.common.AttributeKey.stringKey;
import static org.assertj.core.api.Assertions.assertThat;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.exporter.internal.grpc.DefaultGrpcExporterBuilder;
import io.opentelemetry.exporter.internal.grpc.UpstreamGrpcExporter;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.exporter.internal.otlp.metrics.ResourceMetricsMarshaler;
import io.opentelemetry.exporter.internal.retry.RetryPolicy;
import io.opentelemetry.exporter.otlp.testing.internal.AbstractGrpcTelemetryExporterTest;
import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporter;
import io.opentelemetry.exporter.otlp.testing.internal.ManagedChannelTelemetryExporterBuilder;
import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporterBuilder;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
@ -24,7 +24,7 @@ import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongPointData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableSumData;
import io.opentelemetry.sdk.resources.Resource;
import java.time.Duration;
import java.io.Closeable;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
@ -38,69 +38,20 @@ class OtlpGrpcNettyShadedMetricExporterTest
}
@Test
void usingGrpc() {
assertThat(OtlpGrpcMetricExporter.builder().delegate)
.isInstanceOf(DefaultGrpcExporterBuilder.class);
@SuppressWarnings("deprecation") // testing deprecated feature
void usingGrpc() throws Exception {
try (Closeable exporter =
OtlpGrpcMetricExporter.builder()
.setChannel(InProcessChannelBuilder.forName("test").build())
.build()) {
assertThat(exporter).extracting("delegate").isInstanceOf(UpstreamGrpcExporter.class);
}
}
@Override
protected TelemetryExporterBuilder<MetricData> exporterBuilder() {
OtlpGrpcMetricExporterBuilder builder = OtlpGrpcMetricExporter.builder();
return new TelemetryExporterBuilder<MetricData>() {
@Override
public TelemetryExporterBuilder<MetricData> setEndpoint(String endpoint) {
builder.setEndpoint(endpoint);
return this;
}
@Override
public TelemetryExporterBuilder<MetricData> setTimeout(long timeout, TimeUnit unit) {
builder.setTimeout(timeout, unit);
return this;
}
@Override
public TelemetryExporterBuilder<MetricData> setTimeout(Duration timeout) {
builder.setTimeout(timeout);
return this;
}
@Override
public TelemetryExporterBuilder<MetricData> setCompression(String compression) {
builder.setCompression(compression);
return this;
}
@Override
public TelemetryExporterBuilder<MetricData> addHeader(String key, String value) {
builder.addHeader(key, value);
return this;
}
@Override
public TelemetryExporterBuilder<MetricData> setTrustedCertificates(byte[] certificates) {
builder.setTrustedCertificates(certificates);
return this;
}
@Override
public TelemetryExporterBuilder<MetricData> setClientTls(
byte[] privateKeyPem, byte[] certificatePem) {
builder.setClientTls(privateKeyPem, certificatePem);
return this;
}
@Override
public TelemetryExporterBuilder<MetricData> setRetryPolicy(RetryPolicy retryPolicy) {
builder.delegate.setRetryPolicy(retryPolicy);
return this;
}
@Override
public TelemetryExporter<MetricData> build() {
return TelemetryExporter.wrap(builder.build());
}
};
return ManagedChannelTelemetryExporterBuilder.wrap(
TelemetryExporterBuilder.wrap(OtlpGrpcMetricExporter.builder()));
}
@Override

View File

@ -7,23 +7,23 @@ package io.opentelemetry.exporter.otlp.trace;
import static org.assertj.core.api.Assertions.assertThat;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.TraceFlags;
import io.opentelemetry.api.trace.TraceState;
import io.opentelemetry.exporter.internal.grpc.DefaultGrpcExporterBuilder;
import io.opentelemetry.exporter.internal.grpc.UpstreamGrpcExporter;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.exporter.internal.otlp.traces.ResourceSpansMarshaler;
import io.opentelemetry.exporter.internal.retry.RetryPolicy;
import io.opentelemetry.exporter.otlp.testing.internal.AbstractGrpcTelemetryExporterTest;
import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporter;
import io.opentelemetry.exporter.otlp.testing.internal.ManagedChannelTelemetryExporterBuilder;
import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporterBuilder;
import io.opentelemetry.proto.trace.v1.ResourceSpans;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.testing.trace.TestSpanData;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.data.StatusData;
import java.time.Duration;
import java.io.Closeable;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
@ -40,69 +40,20 @@ class OtlpGrpcNettyShadedSpanExporterTest
}
@Test
void usingGrpc() {
assertThat(OtlpGrpcSpanExporter.builder().delegate)
.isInstanceOf(DefaultGrpcExporterBuilder.class);
@SuppressWarnings("deprecation") // testing deprecated feature
void usingGrpc() throws Exception {
try (Closeable exporter =
OtlpGrpcSpanExporter.builder()
.setChannel(InProcessChannelBuilder.forName("test").build())
.build()) {
assertThat(exporter).extracting("delegate").isInstanceOf(UpstreamGrpcExporter.class);
}
}
@Override
protected TelemetryExporterBuilder<SpanData> exporterBuilder() {
OtlpGrpcSpanExporterBuilder builder = OtlpGrpcSpanExporter.builder();
return new TelemetryExporterBuilder<SpanData>() {
@Override
public TelemetryExporterBuilder<SpanData> setEndpoint(String endpoint) {
builder.setEndpoint(endpoint);
return this;
}
@Override
public TelemetryExporterBuilder<SpanData> setTimeout(long timeout, TimeUnit unit) {
builder.setTimeout(timeout, unit);
return this;
}
@Override
public TelemetryExporterBuilder<SpanData> setTimeout(Duration timeout) {
builder.setTimeout(timeout);
return this;
}
@Override
public TelemetryExporterBuilder<SpanData> setCompression(String compression) {
builder.setCompression(compression);
return this;
}
@Override
public TelemetryExporterBuilder<SpanData> addHeader(String key, String value) {
builder.addHeader(key, value);
return this;
}
@Override
public TelemetryExporterBuilder<SpanData> setTrustedCertificates(byte[] certificates) {
builder.setTrustedCertificates(certificates);
return this;
}
@Override
public TelemetryExporterBuilder<SpanData> setClientTls(
byte[] privateKeyPem, byte[] certificatePem) {
builder.setClientTls(privateKeyPem, certificatePem);
return this;
}
@Override
public TelemetryExporterBuilder<SpanData> setRetryPolicy(RetryPolicy retryPolicy) {
builder.delegate.setRetryPolicy(retryPolicy);
return this;
}
@Override
public TelemetryExporter<SpanData> build() {
return TelemetryExporter.wrap(builder.build());
}
};
return ManagedChannelTelemetryExporterBuilder.wrap(
TelemetryExporterBuilder.wrap(OtlpGrpcSpanExporter.builder()));
}
@Override

View File

@ -8,13 +8,13 @@ package io.opentelemetry.exporter.otlp.metrics;
import static io.opentelemetry.api.common.AttributeKey.stringKey;
import static org.assertj.core.api.Assertions.assertThat;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.exporter.internal.grpc.DefaultGrpcExporterBuilder;
import io.opentelemetry.exporter.internal.grpc.UpstreamGrpcExporter;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.exporter.internal.otlp.metrics.ResourceMetricsMarshaler;
import io.opentelemetry.exporter.internal.retry.RetryPolicy;
import io.opentelemetry.exporter.otlp.testing.internal.AbstractGrpcTelemetryExporterTest;
import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporter;
import io.opentelemetry.exporter.otlp.testing.internal.ManagedChannelTelemetryExporterBuilder;
import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporterBuilder;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
@ -24,7 +24,7 @@ import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongPointData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableSumData;
import io.opentelemetry.sdk.resources.Resource;
import java.time.Duration;
import java.io.Closeable;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
@ -38,69 +38,20 @@ class OtlpGrpcOkHttpMetricExporterTest
}
@Test
void usingGrpc() {
assertThat(OtlpGrpcMetricExporter.builder().delegate)
.isInstanceOf(DefaultGrpcExporterBuilder.class);
@SuppressWarnings("deprecation") // testing deprecated featurea
void usingGrpc() throws Exception {
try (Closeable exporter =
OtlpGrpcMetricExporter.builder()
.setChannel(InProcessChannelBuilder.forName("test").build())
.build()) {
assertThat(exporter).extracting("delegate").isInstanceOf(UpstreamGrpcExporter.class);
}
}
@Override
protected TelemetryExporterBuilder<MetricData> exporterBuilder() {
OtlpGrpcMetricExporterBuilder builder = OtlpGrpcMetricExporter.builder();
return new TelemetryExporterBuilder<MetricData>() {
@Override
public TelemetryExporterBuilder<MetricData> setEndpoint(String endpoint) {
builder.setEndpoint(endpoint);
return this;
}
@Override
public TelemetryExporterBuilder<MetricData> setTimeout(long timeout, TimeUnit unit) {
builder.setTimeout(timeout, unit);
return this;
}
@Override
public TelemetryExporterBuilder<MetricData> setTimeout(Duration timeout) {
builder.setTimeout(timeout);
return this;
}
@Override
public TelemetryExporterBuilder<MetricData> setCompression(String compression) {
builder.setCompression(compression);
return this;
}
@Override
public TelemetryExporterBuilder<MetricData> addHeader(String key, String value) {
builder.addHeader(key, value);
return this;
}
@Override
public TelemetryExporterBuilder<MetricData> setTrustedCertificates(byte[] certificates) {
builder.setTrustedCertificates(certificates);
return this;
}
@Override
public TelemetryExporterBuilder<MetricData> setClientTls(
byte[] privateKeyPem, byte[] certificatePem) {
builder.setClientTls(privateKeyPem, certificatePem);
return this;
}
@Override
public TelemetryExporterBuilder<MetricData> setRetryPolicy(RetryPolicy retryPolicy) {
builder.delegate.setRetryPolicy(retryPolicy);
return this;
}
@Override
public TelemetryExporter<MetricData> build() {
return TelemetryExporter.wrap(builder.build());
}
};
return ManagedChannelTelemetryExporterBuilder.wrap(
TelemetryExporterBuilder.wrap(OtlpGrpcMetricExporter.builder()));
}
@Override

View File

@ -7,23 +7,23 @@ package io.opentelemetry.exporter.otlp.trace;
import static org.assertj.core.api.Assertions.assertThat;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.TraceFlags;
import io.opentelemetry.api.trace.TraceState;
import io.opentelemetry.exporter.internal.grpc.DefaultGrpcExporterBuilder;
import io.opentelemetry.exporter.internal.grpc.UpstreamGrpcExporter;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.exporter.internal.otlp.traces.ResourceSpansMarshaler;
import io.opentelemetry.exporter.internal.retry.RetryPolicy;
import io.opentelemetry.exporter.otlp.testing.internal.AbstractGrpcTelemetryExporterTest;
import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporter;
import io.opentelemetry.exporter.otlp.testing.internal.ManagedChannelTelemetryExporterBuilder;
import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporterBuilder;
import io.opentelemetry.proto.trace.v1.ResourceSpans;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.testing.trace.TestSpanData;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.data.StatusData;
import java.time.Duration;
import java.io.Closeable;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
@ -40,69 +40,20 @@ class OtlpGrpcOkHttpSpanExporterTest
}
@Test
void usingGrpc() {
assertThat(OtlpGrpcSpanExporter.builder().delegate)
.isInstanceOf(DefaultGrpcExporterBuilder.class);
@SuppressWarnings("deprecation") // testing deprecated feature
void usingGrpc() throws Exception {
try (Closeable exporter =
OtlpGrpcSpanExporter.builder()
.setChannel(InProcessChannelBuilder.forName("test").build())
.build()) {
assertThat(exporter).extracting("delegate").isInstanceOf(UpstreamGrpcExporter.class);
}
}
@Override
protected TelemetryExporterBuilder<SpanData> exporterBuilder() {
OtlpGrpcSpanExporterBuilder builder = OtlpGrpcSpanExporter.builder();
return new TelemetryExporterBuilder<SpanData>() {
@Override
public TelemetryExporterBuilder<SpanData> setEndpoint(String endpoint) {
builder.setEndpoint(endpoint);
return this;
}
@Override
public TelemetryExporterBuilder<SpanData> setTimeout(long timeout, TimeUnit unit) {
builder.setTimeout(timeout, unit);
return this;
}
@Override
public TelemetryExporterBuilder<SpanData> setTimeout(Duration timeout) {
builder.setTimeout(timeout);
return this;
}
@Override
public TelemetryExporterBuilder<SpanData> setCompression(String compression) {
builder.setCompression(compression);
return this;
}
@Override
public TelemetryExporterBuilder<SpanData> addHeader(String key, String value) {
builder.addHeader(key, value);
return this;
}
@Override
public TelemetryExporterBuilder<SpanData> setTrustedCertificates(byte[] certificates) {
builder.setTrustedCertificates(certificates);
return this;
}
@Override
public TelemetryExporterBuilder<SpanData> setClientTls(
byte[] privateKeyPem, byte[] certificatePem) {
builder.setClientTls(privateKeyPem, certificatePem);
return this;
}
@Override
public TelemetryExporterBuilder<SpanData> setRetryPolicy(RetryPolicy retryPolicy) {
builder.delegate.setRetryPolicy(retryPolicy);
return this;
}
@Override
public TelemetryExporter<SpanData> build() {
return TelemetryExporter.wrap(builder.build());
}
};
return ManagedChannelTelemetryExporterBuilder.wrap(
TelemetryExporterBuilder.wrap(OtlpGrpcSpanExporter.builder()));
}
@Override

View File

@ -1,179 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.exporter.internal.grpc;
import static io.grpc.Metadata.ASCII_STRING_MARSHALLER;
import static io.opentelemetry.exporter.internal.grpc.ManagedChannelUtil.toServiceConfig;
import io.grpc.Codec;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.stub.MetadataUtils;
import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.exporter.internal.ExporterBuilderUtil;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.exporter.internal.retry.RetryPolicy;
import java.net.URI;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import javax.annotation.Nullable;
import javax.net.ssl.SSLException;
/**
* A builder for {@link DefaultGrpcExporter}.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public final class DefaultGrpcExporterBuilder<T extends Marshaler>
implements GrpcExporterBuilder<T> {
private final String exporterName;
private final String type;
private final BiFunction<ManagedChannel, String, MarshalerServiceStub<T, ?, ?>> stubFactory;
private final String grpcServiceName;
@Nullable private ManagedChannel channel;
private long timeoutNanos;
private URI endpoint;
private boolean compressionEnabled = false;
@Nullable private Metadata metadata;
@Nullable private byte[] trustedCertificatesPem;
@Nullable private byte[] privateKeyPem;
@Nullable private byte[] certificatePem;
@Nullable RetryPolicy retryPolicy;
@Nullable String authorityOverride;
private MeterProvider meterProvider = MeterProvider.noop();
/** Creates a new {@link DefaultGrpcExporterBuilder}. */
// Visible for testing
public DefaultGrpcExporterBuilder(
String exporterName,
String type,
BiFunction<ManagedChannel, String, MarshalerServiceStub<T, ?, ?>> stubFactory,
long defaultTimeoutSecs,
URI defaultEndpoint,
String grpcServiceName) {
this.exporterName = exporterName;
this.type = type;
this.stubFactory = stubFactory;
this.grpcServiceName = grpcServiceName;
timeoutNanos = TimeUnit.SECONDS.toNanos(defaultTimeoutSecs);
endpoint = defaultEndpoint;
}
@Override
public DefaultGrpcExporterBuilder<T> setChannel(ManagedChannel channel) {
this.channel = channel;
return this;
}
@Override
public DefaultGrpcExporterBuilder<T> setTimeout(long timeout, TimeUnit unit) {
timeoutNanos = unit.toNanos(timeout);
return this;
}
@Override
public DefaultGrpcExporterBuilder<T> setTimeout(Duration timeout) {
return setTimeout(timeout.toNanos(), TimeUnit.NANOSECONDS);
}
@Override
public DefaultGrpcExporterBuilder<T> setEndpoint(String endpoint) {
this.endpoint = ExporterBuilderUtil.validateEndpoint(endpoint);
return this;
}
@Override
public DefaultGrpcExporterBuilder<T> setCompression(String compressionMethod) {
this.compressionEnabled = true;
return this;
}
@Override
public DefaultGrpcExporterBuilder<T> setTrustedCertificates(byte[] trustedCertificatesPem) {
this.trustedCertificatesPem = trustedCertificatesPem;
return this;
}
@Override
public GrpcExporterBuilder<T> setClientTls(byte[] privateKeyPem, byte[] certificatePem) {
this.privateKeyPem = privateKeyPem;
this.certificatePem = certificatePem;
return this;
}
@Override
public DefaultGrpcExporterBuilder<T> addHeader(String key, String value) {
if (key.equals("host")) {
authorityOverride = value;
return this;
}
if (metadata == null) {
metadata = new Metadata();
}
metadata.put(Metadata.Key.of(key, ASCII_STRING_MARSHALLER), value);
return this;
}
@Override
public GrpcExporterBuilder<T> setRetryPolicy(RetryPolicy retryPolicy) {
this.retryPolicy = retryPolicy;
return this;
}
@Override
public GrpcExporterBuilder<T> setMeterProvider(MeterProvider meterProvider) {
this.meterProvider = meterProvider;
return this;
}
@Override
public GrpcExporter<T> build() {
ManagedChannel channel = this.channel;
if (channel == null) {
ManagedChannelBuilder<?> managedChannelBuilder =
ManagedChannelBuilder.forTarget(endpoint.getAuthority());
if (endpoint.getScheme().equals("https")) {
managedChannelBuilder.useTransportSecurity();
} else {
managedChannelBuilder.usePlaintext();
}
if (metadata != null) {
managedChannelBuilder.intercept(MetadataUtils.newAttachHeadersInterceptor(metadata));
}
if (trustedCertificatesPem != null) {
try {
ManagedChannelUtil.setClientKeysAndTrustedCertificatesPem(
managedChannelBuilder, privateKeyPem, certificatePem, trustedCertificatesPem);
} catch (SSLException e) {
throw new IllegalStateException(
"Could not set trusted certificates for gRPC TLS connection, are they valid "
+ "X.509 in PEM format?",
e);
}
}
if (retryPolicy != null) {
managedChannelBuilder.defaultServiceConfig(toServiceConfig(grpcServiceName, retryPolicy));
}
channel = managedChannelBuilder.build();
}
Codec codec = compressionEnabled ? new Codec.Gzip() : Codec.Identity.NONE;
MarshalerServiceStub<T, ?, ?> stub =
stubFactory.apply(channel, authorityOverride).withCompression(codec.getMessageEncoding());
return new DefaultGrpcExporter<>(
exporterName, type, channel, stub, meterProvider, timeoutNanos);
}
}

View File

@ -5,7 +5,7 @@
package io.opentelemetry.exporter.internal.grpc;
import io.grpc.ManagedChannel;
import io.grpc.Channel;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.sdk.common.CompletableResultCode;
import java.net.URI;
@ -26,17 +26,10 @@ public interface GrpcExporter<T extends Marshaler> {
String type,
long defaultTimeoutSecs,
URI defaultEndpoint,
Supplier<BiFunction<ManagedChannel, String, MarshalerServiceStub<T, ?, ?>>> stubFactory,
String grpcServiceName,
Supplier<BiFunction<Channel, String, MarshalerServiceStub<T, ?, ?>>> stubFactory,
String grpcEndpointPath) {
return GrpcExporterUtil.exporterBuilder(
exporterName,
type,
defaultTimeoutSecs,
defaultEndpoint,
stubFactory,
grpcServiceName,
grpcEndpointPath);
return new GrpcExporterBuilder<>(
exporterName, type, defaultTimeoutSecs, defaultEndpoint, stubFactory, grpcEndpointPath);
}
/**

View File

@ -5,12 +5,35 @@
package io.opentelemetry.exporter.internal.grpc;
import io.grpc.Channel;
import io.grpc.ClientInterceptors;
import io.grpc.Codec;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.stub.MetadataUtils;
import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.exporter.internal.ExporterBuilderUtil;
import io.opentelemetry.exporter.internal.TlsUtil;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.exporter.internal.okhttp.OkHttpUtil;
import io.opentelemetry.exporter.internal.retry.RetryInterceptor;
import io.opentelemetry.exporter.internal.retry.RetryPolicy;
import java.net.URI;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import javax.net.ssl.SSLException;
import javax.net.ssl.X509KeyManager;
import javax.net.ssl.X509TrustManager;
import okhttp3.Headers;
import okhttp3.OkHttpClient;
import okhttp3.Protocol;
/**
* A builder for {@link GrpcExporter}.
@ -18,26 +41,175 @@ import java.util.concurrent.TimeUnit;
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public interface GrpcExporterBuilder<T extends Marshaler> {
GrpcExporterBuilder<T> setChannel(ManagedChannel channel);
@SuppressWarnings("JavadocMethod")
public class GrpcExporterBuilder<T extends Marshaler> {
GrpcExporterBuilder<T> setTimeout(long timeout, TimeUnit unit);
private final String exporterName;
private final String type;
private final String grpcEndpointPath;
private final Supplier<BiFunction<Channel, String, MarshalerServiceStub<T, ?, ?>>>
grpcStubFactory;
GrpcExporterBuilder<T> setTimeout(Duration timeout);
private long timeoutNanos;
private URI endpoint;
private boolean compressionEnabled = false;
private final Map<String, String> headers = new HashMap<>();
@Nullable private byte[] trustedCertificatesPem;
@Nullable private byte[] privateKeyPem;
@Nullable private byte[] certificatePem;
@Nullable private RetryPolicy retryPolicy;
private MeterProvider meterProvider = MeterProvider.noop();
GrpcExporterBuilder<T> setEndpoint(String endpoint);
// Use Object type since gRPC may not be on the classpath.
@Nullable private Object grpcChannel;
GrpcExporterBuilder<T> setCompression(String compressionMethod);
GrpcExporterBuilder(
String exporterName,
String type,
long defaultTimeoutSecs,
URI defaultEndpoint,
Supplier<BiFunction<Channel, String, MarshalerServiceStub<T, ?, ?>>> grpcStubFactory,
String grpcEndpointPath) {
this.exporterName = exporterName;
this.type = type;
this.grpcEndpointPath = grpcEndpointPath;
timeoutNanos = TimeUnit.SECONDS.toNanos(defaultTimeoutSecs);
endpoint = defaultEndpoint;
this.grpcStubFactory = grpcStubFactory;
}
GrpcExporterBuilder<T> setTrustedCertificates(byte[] trustedCertificatesPem);
public GrpcExporterBuilder<T> setChannel(ManagedChannel channel) {
this.grpcChannel = channel;
return this;
}
GrpcExporterBuilder<T> setClientTls(byte[] privateKeyPem, byte[] certificatePem);
public GrpcExporterBuilder<T> setTimeout(long timeout, TimeUnit unit) {
timeoutNanos = unit.toNanos(timeout);
return this;
}
GrpcExporterBuilder<T> addHeader(String key, String value);
public GrpcExporterBuilder<T> setTimeout(Duration timeout) {
return setTimeout(timeout.toNanos(), TimeUnit.NANOSECONDS);
}
GrpcExporterBuilder<T> setRetryPolicy(RetryPolicy retryPolicy);
public GrpcExporterBuilder<T> setEndpoint(String endpoint) {
this.endpoint = ExporterBuilderUtil.validateEndpoint(endpoint);
return this;
}
GrpcExporterBuilder<T> setMeterProvider(MeterProvider meterProvider);
public GrpcExporterBuilder<T> setCompression(String compressionMethod) {
this.compressionEnabled = true;
return this;
}
GrpcExporter<T> build();
public GrpcExporterBuilder<T> setTrustedCertificates(byte[] trustedCertificatesPem) {
this.trustedCertificatesPem = trustedCertificatesPem;
return this;
}
public GrpcExporterBuilder<T> setClientTls(byte[] privateKeyPem, byte[] certificatePem) {
this.privateKeyPem = privateKeyPem;
this.certificatePem = certificatePem;
return this;
}
public GrpcExporterBuilder<T> addHeader(String key, String value) {
headers.put(key, value);
return this;
}
public GrpcExporterBuilder<T> setRetryPolicy(RetryPolicy retryPolicy) {
this.retryPolicy = retryPolicy;
return this;
}
public GrpcExporterBuilder<T> setMeterProvider(MeterProvider meterProvider) {
this.meterProvider = meterProvider;
return this;
}
public GrpcExporter<T> build() {
if (grpcChannel != null) {
return new UpstreamGrpcExporterFactory().buildWithChannel((Channel) grpcChannel);
}
OkHttpClient.Builder clientBuilder =
new OkHttpClient.Builder().dispatcher(OkHttpUtil.newDispatcher());
clientBuilder.callTimeout(Duration.ofNanos(timeoutNanos));
if (trustedCertificatesPem != null) {
try {
X509TrustManager trustManager = TlsUtil.trustManager(trustedCertificatesPem);
X509KeyManager keyManager = null;
if (privateKeyPem != null && certificatePem != null) {
keyManager = TlsUtil.keyManager(privateKeyPem, certificatePem);
}
clientBuilder.sslSocketFactory(
TlsUtil.sslSocketFactory(keyManager, trustManager), trustManager);
} catch (SSLException e) {
throw new IllegalStateException(
"Could not set trusted certificates, are they valid X.509 in PEM format?", e);
}
}
String endpoint = this.endpoint.resolve(grpcEndpointPath).toString();
if (endpoint.startsWith("http://")) {
clientBuilder.protocols(Collections.singletonList(Protocol.H2_PRIOR_KNOWLEDGE));
} else {
clientBuilder.protocols(Arrays.asList(Protocol.HTTP_2, Protocol.HTTP_1_1));
}
Headers.Builder headers = new Headers.Builder();
this.headers.forEach(headers::add);
headers.add("te", "trailers");
if (compressionEnabled) {
headers.add("grpc-encoding", "gzip");
}
if (retryPolicy != null) {
clientBuilder.addInterceptor(
new RetryInterceptor(retryPolicy, OkHttpGrpcExporter::isRetryable));
}
return new OkHttpGrpcExporter<>(
exporterName,
type,
clientBuilder.build(),
meterProvider,
endpoint,
headers.build(),
compressionEnabled);
}
// Use an inner class to ensure GrpcExporterBuilder does not have classloading dependencies on
// upstream gRPC.
private class UpstreamGrpcExporterFactory {
private GrpcExporter<T> buildWithChannel(Channel channel) {
Metadata metadata = new Metadata();
String authorityOverride = null;
for (Map.Entry<String, String> entry : headers.entrySet()) {
String name = entry.getKey();
String value = entry.getValue();
if (name.equals("host")) {
authorityOverride = value;
continue;
}
metadata.put(Metadata.Key.of(name, Metadata.ASCII_STRING_MARSHALLER), value);
}
channel =
ClientInterceptors.intercept(
channel, MetadataUtils.newAttachHeadersInterceptor(metadata));
Codec codec = compressionEnabled ? new Codec.Gzip() : Codec.Identity.NONE;
MarshalerServiceStub<T, ?, ?> stub =
grpcStubFactory
.get()
.apply(channel, authorityOverride)
.withCompression(codec.getMessageEncoding());
return new UpstreamGrpcExporter<>(exporterName, type, stub, meterProvider, timeoutNanos);
}
}
}

View File

@ -5,53 +5,12 @@
package io.opentelemetry.exporter.internal.grpc;
import io.grpc.ManagedChannel;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import java.net.URI;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
final class GrpcExporterUtil {
private static final boolean USE_OKHTTP;
static {
boolean useOkhttp = true;
// Use the OkHttp exporter unless grpc-stub is on the classpath.
try {
Class.forName("io.grpc.stub.AbstractStub");
useOkhttp = false;
} catch (ClassNotFoundException e) {
// Fall through
}
USE_OKHTTP = useOkhttp;
}
static <T extends Marshaler> GrpcExporterBuilder<T> exporterBuilder(
String exporterName,
String type,
long defaultTimeoutSecs,
URI defaultEndpoint,
Supplier<BiFunction<ManagedChannel, String, MarshalerServiceStub<T, ?, ?>>> stubFactory,
String grpcServiceName,
String grpcEndpointPath) {
if (USE_OKHTTP) {
return new OkHttpGrpcExporterBuilder<>(
exporterName, type, grpcEndpointPath, defaultTimeoutSecs, defaultEndpoint);
} else {
return new DefaultGrpcExporterBuilder<>(
exporterName,
type,
stubFactory.get(),
defaultTimeoutSecs,
defaultEndpoint,
grpcServiceName);
}
}
static void logUnimplemented(Logger logger, String type, @Nullable String fullErrorMessage) {
String envVar;
switch (type) {

View File

@ -1,174 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.exporter.internal.grpc;
import io.grpc.ManagedChannel;
import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.exporter.internal.ExporterBuilderUtil;
import io.opentelemetry.exporter.internal.TlsUtil;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.exporter.internal.okhttp.OkHttpUtil;
import io.opentelemetry.exporter.internal.retry.RetryInterceptor;
import io.opentelemetry.exporter.internal.retry.RetryPolicy;
import java.net.URI;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.net.ssl.SSLException;
import javax.net.ssl.X509KeyManager;
import javax.net.ssl.X509TrustManager;
import okhttp3.Headers;
import okhttp3.OkHttpClient;
import okhttp3.Protocol;
/**
* A builder for {@link OkHttpGrpcExporter}.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public final class OkHttpGrpcExporterBuilder<T extends Marshaler>
implements GrpcExporterBuilder<T> {
private final String exporterName;
private final String type;
private final String grpcEndpointPath;
private long timeoutNanos;
private URI endpoint;
private boolean compressionEnabled = false;
private final Headers.Builder headers = new Headers.Builder();
@Nullable private byte[] trustedCertificatesPem;
@Nullable private byte[] privateKeyPem;
@Nullable private byte[] certificatePem;
@Nullable private RetryPolicy retryPolicy;
private MeterProvider meterProvider = MeterProvider.noop();
/** Creates a new {@link OkHttpGrpcExporterBuilder}. */
// Visible for testing
public OkHttpGrpcExporterBuilder(
String exporterName,
String type,
String grpcEndpointPath,
long defaultTimeoutSecs,
URI defaultEndpoint) {
this.exporterName = exporterName;
this.type = type;
this.grpcEndpointPath = grpcEndpointPath;
timeoutNanos = TimeUnit.SECONDS.toNanos(defaultTimeoutSecs);
endpoint = defaultEndpoint;
}
@Override
public OkHttpGrpcExporterBuilder<T> setChannel(ManagedChannel channel) {
throw new UnsupportedOperationException("Only available on DefaultGrpcExporter");
}
@Override
public OkHttpGrpcExporterBuilder<T> setTimeout(long timeout, TimeUnit unit) {
timeoutNanos = unit.toNanos(timeout);
return this;
}
@Override
public OkHttpGrpcExporterBuilder<T> setTimeout(Duration timeout) {
return setTimeout(timeout.toNanos(), TimeUnit.NANOSECONDS);
}
@Override
public OkHttpGrpcExporterBuilder<T> setEndpoint(String endpoint) {
this.endpoint = ExporterBuilderUtil.validateEndpoint(endpoint);
return this;
}
@Override
public OkHttpGrpcExporterBuilder<T> setCompression(String compressionMethod) {
this.compressionEnabled = true;
return this;
}
@Override
public OkHttpGrpcExporterBuilder<T> setTrustedCertificates(byte[] trustedCertificatesPem) {
this.trustedCertificatesPem = trustedCertificatesPem;
return this;
}
@Override
public GrpcExporterBuilder<T> setClientTls(byte[] privateKeyPem, byte[] certificatePem) {
this.privateKeyPem = privateKeyPem;
this.certificatePem = certificatePem;
return this;
}
@Override
public OkHttpGrpcExporterBuilder<T> addHeader(String key, String value) {
headers.add(key, value);
return this;
}
@Override
public GrpcExporterBuilder<T> setRetryPolicy(RetryPolicy retryPolicy) {
this.retryPolicy = retryPolicy;
return this;
}
@Override
public GrpcExporterBuilder<T> setMeterProvider(MeterProvider meterProvider) {
this.meterProvider = meterProvider;
return this;
}
@Override
public GrpcExporter<T> build() {
OkHttpClient.Builder clientBuilder =
new OkHttpClient.Builder().dispatcher(OkHttpUtil.newDispatcher());
clientBuilder.callTimeout(Duration.ofNanos(timeoutNanos));
if (trustedCertificatesPem != null) {
try {
X509TrustManager trustManager = TlsUtil.trustManager(trustedCertificatesPem);
X509KeyManager keyManager = null;
if (privateKeyPem != null && certificatePem != null) {
keyManager = TlsUtil.keyManager(privateKeyPem, certificatePem);
}
clientBuilder.sslSocketFactory(
TlsUtil.sslSocketFactory(keyManager, trustManager), trustManager);
} catch (SSLException e) {
throw new IllegalStateException(
"Could not set trusted certificates, are they valid X.509 in PEM format?", e);
}
}
String endpoint = this.endpoint.resolve(grpcEndpointPath).toString();
if (endpoint.startsWith("http://")) {
clientBuilder.protocols(Collections.singletonList(Protocol.H2_PRIOR_KNOWLEDGE));
} else {
clientBuilder.protocols(Arrays.asList(Protocol.HTTP_2, Protocol.HTTP_1_1));
}
headers.add("te", "trailers");
if (compressionEnabled) {
headers.add("grpc-encoding", "gzip");
}
if (retryPolicy != null) {
clientBuilder.addInterceptor(
new RetryInterceptor(retryPolicy, OkHttpGrpcExporter::isRetryable));
}
return new OkHttpGrpcExporter<>(
exporterName,
type,
clientBuilder.build(),
meterProvider,
endpoint,
headers.build(),
compressionEnabled);
}
}

View File

@ -8,7 +8,6 @@ package io.opentelemetry.exporter.internal.grpc;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.exporter.internal.ExporterMetrics;
@ -22,15 +21,15 @@ import java.util.logging.Logger;
import org.checkerframework.checker.nullness.qual.Nullable;
/**
* A {@link GrpcExporter} which uses the standard grpc-java library.
* A {@link GrpcExporter} which uses the upstream grpc-java library.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public final class DefaultGrpcExporter<T extends Marshaler> implements GrpcExporter<T> {
public final class UpstreamGrpcExporter<T extends Marshaler> implements GrpcExporter<T> {
private static final Logger internalLogger =
Logger.getLogger(DefaultGrpcExporter.class.getName());
Logger.getLogger(UpstreamGrpcExporter.class.getName());
private final ThrottlingLogger logger = new ThrottlingLogger(internalLogger);
@ -39,21 +38,18 @@ public final class DefaultGrpcExporter<T extends Marshaler> implements GrpcExpor
private final String type;
private final ExporterMetrics exporterMetrics;
private final ManagedChannel managedChannel;
private final MarshalerServiceStub<T, ?, ?> stub;
private final long timeoutNanos;
/** Creates a new {@link DefaultGrpcExporter}. */
DefaultGrpcExporter(
/** Creates a new {@link UpstreamGrpcExporter}. */
UpstreamGrpcExporter(
String exporterName,
String type,
ManagedChannel channel,
MarshalerServiceStub<T, ?, ?> stub,
MeterProvider meterProvider,
long timeoutNanos) {
this.type = type;
this.exporterMetrics = ExporterMetrics.createGrpc(exporterName, type, meterProvider);
this.managedChannel = channel;
this.timeoutNanos = timeoutNanos;
this.stub = stub;
}
@ -121,9 +117,6 @@ public final class DefaultGrpcExporter<T extends Marshaler> implements GrpcExpor
@Override
public CompletableResultCode shutdown() {
if (managedChannel.isTerminated()) {
return CompletableResultCode.ofSuccess();
}
return ManagedChannelUtil.shutdownChannel(managedChannel);
return CompletableResultCode.ofSuccess();
}
}

View File

@ -5,9 +5,8 @@
package io.opentelemetry.exporter.internal.retry;
import io.opentelemetry.exporter.internal.grpc.DefaultGrpcExporterBuilder;
import io.opentelemetry.exporter.internal.grpc.GrpcExporterBuilder;
import io.opentelemetry.exporter.internal.grpc.GrpcStatusUtil;
import io.opentelemetry.exporter.internal.grpc.OkHttpGrpcExporterBuilder;
import io.opentelemetry.exporter.internal.okhttp.OkHttpExporterBuilder;
import java.lang.reflect.Field;
import java.util.Arrays;
@ -50,22 +49,19 @@ public class RetryUtil {
}
/**
* Reflectively access a {@link DefaultGrpcExporterBuilder}, {@link OkHttpGrpcExporterBuilder}, or
* {@link OkHttpExporterBuilder} instance in field called "delegate" of the instance, and set the
* {@link RetryPolicy}.
* Reflectively access a {@link GrpcExporterBuilder}, or {@link OkHttpExporterBuilder} instance in
* field called "delegate" of the instance, and set the {@link RetryPolicy}.
*
* @throws IllegalArgumentException if the instance does not contain a field called "delegate" of
* type {@link DefaultGrpcExporterBuilder}
* a supported type.
*/
public static void setRetryPolicyOnDelegate(Object instance, RetryPolicy retryPolicy) {
try {
Field field = instance.getClass().getDeclaredField("delegate");
field.setAccessible(true);
Object value = field.get(instance);
if (value instanceof DefaultGrpcExporterBuilder) {
((DefaultGrpcExporterBuilder<?>) value).setRetryPolicy(retryPolicy);
} else if (value instanceof OkHttpGrpcExporterBuilder) {
((OkHttpGrpcExporterBuilder<?>) value).setRetryPolicy(retryPolicy);
if (value instanceof GrpcExporterBuilder) {
((GrpcExporterBuilder<?>) value).setRetryPolicy(retryPolicy);
} else if (value instanceof OkHttpExporterBuilder) {
((OkHttpExporterBuilder<?>) value).setRetryPolicy(retryPolicy);
} else {

View File

@ -145,7 +145,7 @@ class RetryInterceptorTest {
when(random.get(anyLong())).thenReturn(1L);
doNothing().when(sleeper).sleep(anyLong());
// Connecting to a non-routable IP address to trigger connection timeout
// Connecting to a non-routable IP address to trigger connection error
assertThatThrownBy(
() ->
client.newCall(new Request.Builder().url("http://10.255.255.1").build()).execute())

View File

@ -9,8 +9,8 @@ import static org.assertj.core.api.Assertions.as;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import io.opentelemetry.exporter.internal.grpc.DefaultGrpcExporterBuilder;
import io.opentelemetry.exporter.internal.grpc.OkHttpGrpcExporterBuilder;
import io.opentelemetry.exporter.internal.grpc.GrpcExporter;
import io.opentelemetry.exporter.internal.grpc.GrpcExporterBuilder;
import io.opentelemetry.exporter.internal.okhttp.OkHttpExporterBuilder;
import java.net.URI;
import java.net.URISyntaxException;
@ -20,24 +20,11 @@ import org.junit.jupiter.api.Test;
class RetryUtilTest {
@Test
void setRetryPolicyOnDelegate_DefaultGrpcExporterBuilder() throws URISyntaxException {
void setRetryPolicyOnDelegate_GrpcExporterBuilder() throws URISyntaxException {
RetryPolicy retryPolicy = RetryPolicy.getDefault();
DefaultGrpcExporterBuilder<?> builder =
new DefaultGrpcExporterBuilder<>(
"otlp", "test", (u1, u2) -> null, 0, new URI("http://localhost"), "test");
RetryUtil.setRetryPolicyOnDelegate(new WithDelegate(builder), retryPolicy);
assertThat(builder)
.extracting("retryPolicy", as(InstanceOfAssertFactories.type(RetryPolicy.class)))
.isEqualTo(retryPolicy);
}
@Test
void setRetryPolicyOnDelegate_OkHttpGrpcExporterBuilder() throws URISyntaxException {
RetryPolicy retryPolicy = RetryPolicy.getDefault();
OkHttpGrpcExporterBuilder<?> builder =
new OkHttpGrpcExporterBuilder<>("otlp", "test", "/test", 0, new URI("http://localhost"));
GrpcExporterBuilder<?> builder =
GrpcExporter.builder(
"otlp", "test", 0, new URI("http://localhost"), () -> (u1, u2) -> null, "/test");
RetryUtil.setRetryPolicyOnDelegate(new WithDelegate(builder), retryPolicy);

View File

@ -40,7 +40,6 @@ public final class OtlpGrpcLogExporterBuilder {
DEFAULT_TIMEOUT_SECS,
DEFAULT_ENDPOINT,
() -> MarshalerLogsServiceGrpc::newFutureStub,
GRPC_SERVICE_NAME,
GRPC_ENDPOINT_PATH);
}

View File

@ -9,13 +9,12 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.exporter.internal.grpc.OkHttpGrpcExporterBuilder;
import io.opentelemetry.exporter.internal.grpc.OkHttpGrpcExporter;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.exporter.internal.otlp.logs.ResourceLogsMarshaler;
import io.opentelemetry.exporter.internal.retry.RetryPolicy;
import io.opentelemetry.exporter.internal.retry.RetryUtil;
import io.opentelemetry.exporter.otlp.testing.internal.AbstractGrpcTelemetryExporterTest;
import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporter;
import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporterBuilder;
import io.opentelemetry.proto.logs.v1.ResourceLogs;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
@ -23,10 +22,9 @@ import io.opentelemetry.sdk.logs.data.LogData;
import io.opentelemetry.sdk.logs.data.LogDataBuilder;
import io.opentelemetry.sdk.logs.data.Severity;
import io.opentelemetry.sdk.resources.Resource;
import java.time.Duration;
import java.io.Closeable;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
class OtlpGrpcLogExporterTest extends AbstractGrpcTelemetryExporterTest<LogData, ResourceLogs> {
@ -45,69 +43,15 @@ class OtlpGrpcLogExporterTest extends AbstractGrpcTelemetryExporterTest<LogData,
}
@Test
void usingOkHttp() {
assertThat(OtlpGrpcLogExporter.builder().delegate)
.isInstanceOf(OkHttpGrpcExporterBuilder.class);
void usingOkHttp() throws Exception {
try (Closeable exporter = OtlpGrpcLogExporter.builder().build()) {
assertThat(exporter).extracting("delegate").isInstanceOf(OkHttpGrpcExporter.class);
}
}
@Override
protected TelemetryExporterBuilder<LogData> exporterBuilder() {
OtlpGrpcLogExporterBuilder builder = OtlpGrpcLogExporter.builder();
return new TelemetryExporterBuilder<LogData>() {
@Override
public TelemetryExporterBuilder<LogData> setEndpoint(String endpoint) {
builder.setEndpoint(endpoint);
return this;
}
@Override
public TelemetryExporterBuilder<LogData> setTimeout(long timeout, TimeUnit unit) {
builder.setTimeout(timeout, unit);
return this;
}
@Override
public TelemetryExporterBuilder<LogData> setTimeout(Duration timeout) {
builder.setTimeout(timeout);
return this;
}
@Override
public TelemetryExporterBuilder<LogData> setCompression(String compression) {
builder.setCompression(compression);
return this;
}
@Override
public TelemetryExporterBuilder<LogData> addHeader(String key, String value) {
builder.addHeader(key, value);
return this;
}
@Override
public TelemetryExporterBuilder<LogData> setTrustedCertificates(byte[] certificates) {
builder.setTrustedCertificates(certificates);
return this;
}
@Override
public TelemetryExporterBuilder<LogData> setClientTls(
byte[] privateKeyPem, byte[] certificatePem) {
builder.setClientTls(privateKeyPem, certificatePem);
return this;
}
@Override
public TelemetryExporterBuilder<LogData> setRetryPolicy(RetryPolicy retryPolicy) {
builder.delegate.setRetryPolicy(retryPolicy);
return this;
}
@Override
public TelemetryExporter<LogData> build() {
return TelemetryExporter.wrap(builder.build());
}
};
return TelemetryExporterBuilder.wrap(OtlpGrpcLogExporter.builder());
}
@Override

View File

@ -8,14 +8,15 @@ package io.opentelemetry.exporter.otlp.logs;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.exporter.internal.grpc.DefaultGrpcExporterBuilder;
import io.opentelemetry.exporter.internal.grpc.UpstreamGrpcExporter;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.exporter.internal.otlp.logs.ResourceLogsMarshaler;
import io.opentelemetry.exporter.internal.retry.RetryPolicy;
import io.opentelemetry.exporter.internal.retry.RetryUtil;
import io.opentelemetry.exporter.otlp.testing.internal.AbstractGrpcTelemetryExporterTest;
import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporter;
import io.opentelemetry.exporter.otlp.testing.internal.ManagedChannelTelemetryExporterBuilder;
import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporterBuilder;
import io.opentelemetry.proto.logs.v1.ResourceLogs;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
@ -23,10 +24,9 @@ import io.opentelemetry.sdk.logs.data.LogData;
import io.opentelemetry.sdk.logs.data.LogDataBuilder;
import io.opentelemetry.sdk.logs.data.Severity;
import io.opentelemetry.sdk.resources.Resource;
import java.time.Duration;
import java.io.Closeable;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
class OtlpGrpcNettyLogExporterTest
@ -46,69 +46,20 @@ class OtlpGrpcNettyLogExporterTest
}
@Test
void usingGrpc() {
assertThat(OtlpGrpcLogExporter.builder().delegate)
.isInstanceOf(DefaultGrpcExporterBuilder.class);
@SuppressWarnings("deprecation") // testing deprecated feature
void usingGrpc() throws Exception {
try (Closeable exporter =
OtlpGrpcLogExporter.builder()
.setChannel(InProcessChannelBuilder.forName("test").build())
.build()) {
assertThat(exporter).extracting("delegate").isInstanceOf(UpstreamGrpcExporter.class);
}
}
@Override
protected TelemetryExporterBuilder<LogData> exporterBuilder() {
OtlpGrpcLogExporterBuilder builder = OtlpGrpcLogExporter.builder();
return new TelemetryExporterBuilder<LogData>() {
@Override
public TelemetryExporterBuilder<LogData> setEndpoint(String endpoint) {
builder.setEndpoint(endpoint);
return this;
}
@Override
public TelemetryExporterBuilder<LogData> setTimeout(long timeout, TimeUnit unit) {
builder.setTimeout(timeout, unit);
return this;
}
@Override
public TelemetryExporterBuilder<LogData> setTimeout(Duration timeout) {
builder.setTimeout(timeout);
return this;
}
@Override
public TelemetryExporterBuilder<LogData> setCompression(String compression) {
builder.setCompression(compression);
return this;
}
@Override
public TelemetryExporterBuilder<LogData> addHeader(String key, String value) {
builder.addHeader(key, value);
return this;
}
@Override
public TelemetryExporterBuilder<LogData> setTrustedCertificates(byte[] certificates) {
builder.setTrustedCertificates(certificates);
return this;
}
@Override
public TelemetryExporterBuilder<LogData> setClientTls(
byte[] privateKeyPem, byte[] certificatePem) {
builder.setClientTls(privateKeyPem, certificatePem);
return this;
}
@Override
public TelemetryExporterBuilder<LogData> setRetryPolicy(RetryPolicy retryPolicy) {
builder.delegate.setRetryPolicy(retryPolicy);
return this;
}
@Override
public TelemetryExporter<LogData> build() {
return TelemetryExporter.wrap(builder.build());
}
};
return ManagedChannelTelemetryExporterBuilder.wrap(
TelemetryExporterBuilder.wrap(OtlpGrpcLogExporter.builder()));
}
@Override

View File

@ -7,13 +7,13 @@ package io.opentelemetry.exporter.otlp.logs;
import static org.assertj.core.api.Assertions.assertThat;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.exporter.internal.grpc.DefaultGrpcExporterBuilder;
import io.opentelemetry.exporter.internal.grpc.UpstreamGrpcExporter;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.exporter.internal.otlp.logs.ResourceLogsMarshaler;
import io.opentelemetry.exporter.internal.retry.RetryPolicy;
import io.opentelemetry.exporter.otlp.testing.internal.AbstractGrpcTelemetryExporterTest;
import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporter;
import io.opentelemetry.exporter.otlp.testing.internal.ManagedChannelTelemetryExporterBuilder;
import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporterBuilder;
import io.opentelemetry.proto.logs.v1.ResourceLogs;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
@ -21,10 +21,9 @@ import io.opentelemetry.sdk.logs.data.LogData;
import io.opentelemetry.sdk.logs.data.LogDataBuilder;
import io.opentelemetry.sdk.logs.data.Severity;
import io.opentelemetry.sdk.resources.Resource;
import java.time.Duration;
import java.io.Closeable;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
class OtlpGrpcNettyShadedLogExporterTest
@ -35,69 +34,20 @@ class OtlpGrpcNettyShadedLogExporterTest
}
@Test
void usingGrpc() {
assertThat(OtlpGrpcLogExporter.builder().delegate)
.isInstanceOf(DefaultGrpcExporterBuilder.class);
@SuppressWarnings("deprecation") // testing deprecated feature
void usingGrpc() throws Exception {
try (Closeable exporter =
OtlpGrpcLogExporter.builder()
.setChannel(InProcessChannelBuilder.forName("test").build())
.build()) {
assertThat(exporter).extracting("delegate").isInstanceOf(UpstreamGrpcExporter.class);
}
}
@Override
protected TelemetryExporterBuilder<LogData> exporterBuilder() {
OtlpGrpcLogExporterBuilder builder = OtlpGrpcLogExporter.builder();
return new TelemetryExporterBuilder<LogData>() {
@Override
public TelemetryExporterBuilder<LogData> setEndpoint(String endpoint) {
builder.setEndpoint(endpoint);
return this;
}
@Override
public TelemetryExporterBuilder<LogData> setTimeout(long timeout, TimeUnit unit) {
builder.setTimeout(timeout, unit);
return this;
}
@Override
public TelemetryExporterBuilder<LogData> setTimeout(Duration timeout) {
builder.setTimeout(timeout);
return this;
}
@Override
public TelemetryExporterBuilder<LogData> setCompression(String compression) {
builder.setCompression(compression);
return this;
}
@Override
public TelemetryExporterBuilder<LogData> addHeader(String key, String value) {
builder.addHeader(key, value);
return this;
}
@Override
public TelemetryExporterBuilder<LogData> setTrustedCertificates(byte[] certificates) {
builder.setTrustedCertificates(certificates);
return this;
}
@Override
public TelemetryExporterBuilder<LogData> setClientTls(
byte[] privateKeyPem, byte[] certificatePem) {
builder.setClientTls(privateKeyPem, certificatePem);
return this;
}
@Override
public TelemetryExporterBuilder<LogData> setRetryPolicy(RetryPolicy retryPolicy) {
builder.delegate.setRetryPolicy(retryPolicy);
return this;
}
@Override
public TelemetryExporter<LogData> build() {
return TelemetryExporter.wrap(builder.build());
}
};
return ManagedChannelTelemetryExporterBuilder.wrap(
TelemetryExporterBuilder.wrap(OtlpGrpcLogExporter.builder()));
}
@Override

View File

@ -7,13 +7,13 @@ package io.opentelemetry.exporter.otlp.logs;
import static org.assertj.core.api.Assertions.assertThat;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.exporter.internal.grpc.DefaultGrpcExporterBuilder;
import io.opentelemetry.exporter.internal.grpc.UpstreamGrpcExporter;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.exporter.internal.otlp.logs.ResourceLogsMarshaler;
import io.opentelemetry.exporter.internal.retry.RetryPolicy;
import io.opentelemetry.exporter.otlp.testing.internal.AbstractGrpcTelemetryExporterTest;
import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporter;
import io.opentelemetry.exporter.otlp.testing.internal.ManagedChannelTelemetryExporterBuilder;
import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporterBuilder;
import io.opentelemetry.proto.logs.v1.ResourceLogs;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
@ -21,10 +21,9 @@ import io.opentelemetry.sdk.logs.data.LogData;
import io.opentelemetry.sdk.logs.data.LogDataBuilder;
import io.opentelemetry.sdk.logs.data.Severity;
import io.opentelemetry.sdk.resources.Resource;
import java.time.Duration;
import java.io.Closeable;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
class OtlpGrpcNettyOkHttpLogExporterTest
@ -35,69 +34,20 @@ class OtlpGrpcNettyOkHttpLogExporterTest
}
@Test
void usingGrpc() {
assertThat(OtlpGrpcLogExporter.builder().delegate)
.isInstanceOf(DefaultGrpcExporterBuilder.class);
@SuppressWarnings("deprecation") // testing deprecated feature
void usingGrpc() throws Exception {
try (Closeable exporter =
OtlpGrpcLogExporter.builder()
.setChannel(InProcessChannelBuilder.forName("test").build())
.build()) {
assertThat(exporter).extracting("delegate").isInstanceOf(UpstreamGrpcExporter.class);
}
}
@Override
protected TelemetryExporterBuilder<LogData> exporterBuilder() {
OtlpGrpcLogExporterBuilder builder = OtlpGrpcLogExporter.builder();
return new TelemetryExporterBuilder<LogData>() {
@Override
public TelemetryExporterBuilder<LogData> setEndpoint(String endpoint) {
builder.setEndpoint(endpoint);
return this;
}
@Override
public TelemetryExporterBuilder<LogData> setTimeout(long timeout, TimeUnit unit) {
builder.setTimeout(timeout, unit);
return this;
}
@Override
public TelemetryExporterBuilder<LogData> setTimeout(Duration timeout) {
builder.setTimeout(timeout);
return this;
}
@Override
public TelemetryExporterBuilder<LogData> setCompression(String compression) {
builder.setCompression(compression);
return this;
}
@Override
public TelemetryExporterBuilder<LogData> addHeader(String key, String value) {
builder.addHeader(key, value);
return this;
}
@Override
public TelemetryExporterBuilder<LogData> setTrustedCertificates(byte[] certificates) {
builder.setTrustedCertificates(certificates);
return this;
}
@Override
public TelemetryExporterBuilder<LogData> setClientTls(
byte[] privateKeyPem, byte[] certificatePem) {
builder.setClientTls(privateKeyPem, certificatePem);
return this;
}
@Override
public TelemetryExporterBuilder<LogData> setRetryPolicy(RetryPolicy retryPolicy) {
builder.delegate.setRetryPolicy(retryPolicy);
return this;
}
@Override
public TelemetryExporter<LogData> build() {
return TelemetryExporter.wrap(builder.build());
}
};
return ManagedChannelTelemetryExporterBuilder.wrap(
TelemetryExporterBuilder.wrap(OtlpGrpcLogExporter.builder()));
}
@Override

View File

@ -12,6 +12,12 @@ dependencies {
api(project(":sdk:trace"))
api(project(":sdk:testing"))
api(project(":exporters:otlp:all"))
api(project(":exporters:otlp:logs"))
// Must be compileOnly so gRPC isn't on the classpath for non-gRPC tests.
compileOnly("io.grpc:grpc-stub")
implementation(project(":testing-internal"))
api("io.opentelemetry.proto:opentelemetry-proto")

View File

@ -22,8 +22,8 @@ import com.linecorp.armeria.server.logging.LoggingService;
import com.linecorp.armeria.testing.junit5.server.SelfSignedCertificateExtension;
import com.linecorp.armeria.testing.junit5.server.ServerExtension;
import io.github.netmikey.logunit.api.LogCapturer;
import io.opentelemetry.exporter.internal.grpc.DefaultGrpcExporter;
import io.opentelemetry.exporter.internal.grpc.OkHttpGrpcExporter;
import io.opentelemetry.exporter.internal.grpc.UpstreamGrpcExporter;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.exporter.internal.retry.RetryPolicy;
import io.opentelemetry.internal.testing.slf4j.SuppressLogger;
@ -162,7 +162,7 @@ public abstract class AbstractGrpcTelemetryExporterTest<T, U extends Message> {
@RegisterExtension
LogCapturer logs =
LogCapturer.create()
.captureForType(usingOkHttp() ? OkHttpGrpcExporter.class : DefaultGrpcExporter.class);
.captureForType(usingOkHttp() ? OkHttpGrpcExporter.class : UpstreamGrpcExporter.class);
private final String type;
private final U resourceTelemetryInstance;
@ -260,7 +260,7 @@ public abstract class AbstractGrpcTelemetryExporterTest<T, U extends Message> {
@Test
@SuppressLogger(OkHttpGrpcExporter.class)
@SuppressLogger(DefaultGrpcExporter.class)
@SuppressLogger(UpstreamGrpcExporter.class)
void tls_untrusted() {
TelemetryExporter<T> exporter =
exporterBuilder().setEndpoint(server.httpsUri().toString()).build();
@ -275,11 +275,12 @@ public abstract class AbstractGrpcTelemetryExporterTest<T, U extends Message> {
@Test
@SuppressLogger(OkHttpGrpcExporter.class)
@SuppressLogger(DefaultGrpcExporter.class)
@SuppressLogger(UpstreamGrpcExporter.class)
void tls_badCert() {
assertThatThrownBy(
() ->
exporterBuilder()
.setEndpoint(server.httpsUri().toString())
.setTrustedCertificates("foobar".getBytes(StandardCharsets.UTF_8))
.build())
.isInstanceOf(IllegalStateException.class)
@ -334,7 +335,7 @@ public abstract class AbstractGrpcTelemetryExporterTest<T, U extends Message> {
@Test
@SuppressLogger(OkHttpGrpcExporter.class)
@SuppressLogger(DefaultGrpcExporter.class)
@SuppressLogger(UpstreamGrpcExporter.class)
void exportAfterShutdown() {
TelemetryExporter<T> exporter =
exporterBuilder().setEndpoint(server.httpUri().toString()).build();
@ -357,7 +358,7 @@ public abstract class AbstractGrpcTelemetryExporterTest<T, U extends Message> {
@Test
@SuppressLogger(OkHttpGrpcExporter.class)
@SuppressLogger(DefaultGrpcExporter.class)
@SuppressLogger(UpstreamGrpcExporter.class)
void error() {
addGrpcError(13, null);
assertThat(
@ -376,7 +377,7 @@ public abstract class AbstractGrpcTelemetryExporterTest<T, U extends Message> {
@Test
@SuppressLogger(OkHttpGrpcExporter.class)
@SuppressLogger(DefaultGrpcExporter.class)
@SuppressLogger(UpstreamGrpcExporter.class)
void errorWithMessage() {
addGrpcError(8, "out of quota");
assertThat(
@ -395,7 +396,7 @@ public abstract class AbstractGrpcTelemetryExporterTest<T, U extends Message> {
@Test
@SuppressLogger(OkHttpGrpcExporter.class)
@SuppressLogger(DefaultGrpcExporter.class)
@SuppressLogger(UpstreamGrpcExporter.class)
void errorWithEscapedMessage() {
addGrpcError(5, "クマ🐻");
assertThat(
@ -414,7 +415,7 @@ public abstract class AbstractGrpcTelemetryExporterTest<T, U extends Message> {
@Test
@SuppressLogger(OkHttpGrpcExporter.class)
@SuppressLogger(DefaultGrpcExporter.class)
@SuppressLogger(UpstreamGrpcExporter.class)
void testExport_Unavailable() {
addGrpcError(14, null);
assertThat(
@ -434,7 +435,7 @@ public abstract class AbstractGrpcTelemetryExporterTest<T, U extends Message> {
@Test
@SuppressLogger(OkHttpGrpcExporter.class)
@SuppressLogger(DefaultGrpcExporter.class)
@SuppressLogger(UpstreamGrpcExporter.class)
void testExport_Unimplemented() {
addGrpcError(12, "UNIMPLEMENTED");
assertThat(
@ -475,7 +476,7 @@ public abstract class AbstractGrpcTelemetryExporterTest<T, U extends Message> {
@ParameterizedTest
@ValueSource(ints = {1, 4, 8, 10, 11, 14, 15})
@SuppressLogger(OkHttpGrpcExporter.class)
@SuppressLogger(DefaultGrpcExporter.class)
@SuppressLogger(UpstreamGrpcExporter.class)
void retryableError(int code) {
addGrpcError(code, null);
@ -497,7 +498,7 @@ public abstract class AbstractGrpcTelemetryExporterTest<T, U extends Message> {
@Test
@SuppressLogger(OkHttpGrpcExporter.class)
@SuppressLogger(DefaultGrpcExporter.class)
@SuppressLogger(UpstreamGrpcExporter.class)
void retryableError_tooManyAttempts() {
addGrpcError(1, null);
addGrpcError(1, null);
@ -521,7 +522,7 @@ public abstract class AbstractGrpcTelemetryExporterTest<T, U extends Message> {
@ParameterizedTest
@ValueSource(ints = {2, 3, 5, 6, 7, 9, 12, 13, 16})
@SuppressLogger(OkHttpGrpcExporter.class)
@SuppressLogger(DefaultGrpcExporter.class)
@SuppressLogger(UpstreamGrpcExporter.class)
void nonRetryableError(int code) {
addGrpcError(code, null);

View File

@ -0,0 +1,83 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.exporter.otlp.testing.internal;
import io.grpc.ManagedChannel;
import io.opentelemetry.exporter.internal.retry.RetryPolicy;
import io.opentelemetry.exporter.internal.retry.RetryUtil;
import io.opentelemetry.exporter.otlp.logs.OtlpGrpcLogExporterBuilder;
import io.opentelemetry.sdk.logs.data.LogData;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
final class GrpcLogExporterBuilderWrapper implements TelemetryExporterBuilder<LogData> {
private final OtlpGrpcLogExporterBuilder builder;
GrpcLogExporterBuilderWrapper(OtlpGrpcLogExporterBuilder builder) {
this.builder = builder;
}
@Override
public TelemetryExporterBuilder<LogData> setEndpoint(String endpoint) {
builder.setEndpoint(endpoint);
return this;
}
@Override
public TelemetryExporterBuilder<LogData> setTimeout(long timeout, TimeUnit unit) {
builder.setTimeout(timeout, unit);
return this;
}
@Override
public TelemetryExporterBuilder<LogData> setTimeout(Duration timeout) {
builder.setTimeout(timeout);
return this;
}
@Override
public TelemetryExporterBuilder<LogData> setCompression(String compression) {
builder.setCompression(compression);
return this;
}
@Override
public TelemetryExporterBuilder<LogData> addHeader(String key, String value) {
builder.addHeader(key, value);
return this;
}
@Override
public TelemetryExporterBuilder<LogData> setTrustedCertificates(byte[] certificates) {
builder.setTrustedCertificates(certificates);
return this;
}
@Override
public TelemetryExporterBuilder<LogData> setClientTls(
byte[] privateKeyPem, byte[] certificatePem) {
builder.setClientTls(privateKeyPem, certificatePem);
return this;
}
@Override
public TelemetryExporterBuilder<LogData> setRetryPolicy(RetryPolicy retryPolicy) {
RetryUtil.setRetryPolicyOnDelegate(builder, retryPolicy);
return this;
}
@Override
@SuppressWarnings("deprecation") // testing deprecated functionality
public TelemetryExporterBuilder<LogData> setChannel(ManagedChannel channel) {
builder.setChannel(channel);
return this;
}
@Override
public TelemetryExporter<LogData> build() {
return TelemetryExporter.wrap(builder.build());
}
}

View File

@ -0,0 +1,83 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.exporter.otlp.testing.internal;
import io.grpc.ManagedChannel;
import io.opentelemetry.exporter.internal.retry.RetryPolicy;
import io.opentelemetry.exporter.internal.retry.RetryUtil;
import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder;
import io.opentelemetry.sdk.metrics.data.MetricData;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
final class GrpcMetricExporterBuilderWrapper implements TelemetryExporterBuilder<MetricData> {
private final OtlpGrpcMetricExporterBuilder builder;
GrpcMetricExporterBuilderWrapper(OtlpGrpcMetricExporterBuilder builder) {
this.builder = builder;
}
@Override
public TelemetryExporterBuilder<MetricData> setEndpoint(String endpoint) {
builder.setEndpoint(endpoint);
return this;
}
@Override
public TelemetryExporterBuilder<MetricData> setTimeout(long timeout, TimeUnit unit) {
builder.setTimeout(timeout, unit);
return this;
}
@Override
public TelemetryExporterBuilder<MetricData> setTimeout(Duration timeout) {
builder.setTimeout(timeout);
return this;
}
@Override
public TelemetryExporterBuilder<MetricData> setCompression(String compression) {
builder.setCompression(compression);
return this;
}
@Override
public TelemetryExporterBuilder<MetricData> addHeader(String key, String value) {
builder.addHeader(key, value);
return this;
}
@Override
public TelemetryExporterBuilder<MetricData> setTrustedCertificates(byte[] certificates) {
builder.setTrustedCertificates(certificates);
return this;
}
@Override
public TelemetryExporterBuilder<MetricData> setClientTls(
byte[] privateKeyPem, byte[] certificatePem) {
builder.setClientTls(privateKeyPem, certificatePem);
return this;
}
@Override
public TelemetryExporterBuilder<MetricData> setRetryPolicy(RetryPolicy retryPolicy) {
RetryUtil.setRetryPolicyOnDelegate(builder, retryPolicy);
return this;
}
@Override
@SuppressWarnings("deprecation") // testing deprecated functionality
public TelemetryExporterBuilder<MetricData> setChannel(ManagedChannel channel) {
builder.setChannel(channel);
return this;
}
@Override
public TelemetryExporter<MetricData> build() {
return TelemetryExporter.wrap(builder.build());
}
}

View File

@ -0,0 +1,84 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.exporter.otlp.testing.internal;
import io.grpc.ManagedChannel;
import io.opentelemetry.exporter.internal.retry.RetryPolicy;
import io.opentelemetry.exporter.internal.retry.RetryUtil;
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporterBuilder;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
/** Wrapper of {@link OtlpGrpcSpanExporterBuilder} for use in integration tests. */
final class GrpcSpanExporterBuilderWrapper implements TelemetryExporterBuilder<SpanData> {
private final OtlpGrpcSpanExporterBuilder builder;
GrpcSpanExporterBuilderWrapper(OtlpGrpcSpanExporterBuilder builder) {
this.builder = builder;
}
@Override
public TelemetryExporterBuilder<SpanData> setEndpoint(String endpoint) {
builder.setEndpoint(endpoint);
return this;
}
@Override
public TelemetryExporterBuilder<SpanData> setTimeout(long timeout, TimeUnit unit) {
builder.setTimeout(timeout, unit);
return this;
}
@Override
public TelemetryExporterBuilder<SpanData> setTimeout(Duration timeout) {
builder.setTimeout(timeout);
return this;
}
@Override
public TelemetryExporterBuilder<SpanData> setCompression(String compression) {
builder.setCompression(compression);
return this;
}
@Override
public TelemetryExporterBuilder<SpanData> addHeader(String key, String value) {
builder.addHeader(key, value);
return this;
}
@Override
public TelemetryExporterBuilder<SpanData> setTrustedCertificates(byte[] certificates) {
builder.setTrustedCertificates(certificates);
return this;
}
@Override
public TelemetryExporterBuilder<SpanData> setClientTls(
byte[] privateKeyPem, byte[] certificatePem) {
builder.setClientTls(privateKeyPem, certificatePem);
return this;
}
@Override
public TelemetryExporterBuilder<SpanData> setRetryPolicy(RetryPolicy retryPolicy) {
RetryUtil.setRetryPolicyOnDelegate(builder, retryPolicy);
return this;
}
@Override
@SuppressWarnings("deprecation") // testing deprecated functionality
public TelemetryExporterBuilder<SpanData> setChannel(ManagedChannel channel) {
builder.setChannel(channel);
return this;
}
@Override
public TelemetryExporter<SpanData> build() {
return TelemetryExporter.wrap(builder.build());
}
}

View File

@ -0,0 +1,149 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.exporter.otlp.testing.internal;
import static java.util.Objects.requireNonNull;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.opentelemetry.exporter.internal.grpc.ManagedChannelUtil;
import io.opentelemetry.exporter.internal.retry.RetryPolicy;
import io.opentelemetry.sdk.common.CompletableResultCode;
import java.net.URI;
import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.net.ssl.SSLException;
/**
* Wraps a {@link TelemetryExporterBuilder}, delegating methods to upstream gRPC's {@link
* ManagedChannel} where appropriate.
*/
public final class ManagedChannelTelemetryExporterBuilder<T>
implements TelemetryExporterBuilder<T> {
public static <T> ManagedChannelTelemetryExporterBuilder<T> wrap(
TelemetryExporterBuilder<T> delegate) {
return new ManagedChannelTelemetryExporterBuilder<>(delegate);
}
private ManagedChannelTelemetryExporterBuilder(TelemetryExporterBuilder<T> delegate) {
this.delegate = delegate;
}
private final TelemetryExporterBuilder<T> delegate;
@Nullable private ManagedChannelBuilder<?> channelBuilder;
@Nullable private byte[] privateKeyPem;
@Nullable private byte[] certificatePem;
@Nullable private byte[] trustedCertificatesPem;
@Override
public TelemetryExporterBuilder<T> setEndpoint(String endpoint) {
delegate.setEndpoint(endpoint);
URI uri = URI.create(endpoint);
channelBuilder = ManagedChannelBuilder.forAddress(uri.getHost(), uri.getPort());
if (!uri.getScheme().equals("https")) {
channelBuilder.usePlaintext();
}
return this;
}
@Override
public TelemetryExporterBuilder<T> setTimeout(long timeout, TimeUnit unit) {
delegate.setTimeout(timeout, unit);
return this;
}
@Override
public TelemetryExporterBuilder<T> setTimeout(Duration timeout) {
delegate.setTimeout(timeout);
return this;
}
@Override
public TelemetryExporterBuilder<T> setCompression(String compression) {
delegate.setCompression(compression);
return this;
}
@Override
public TelemetryExporterBuilder<T> addHeader(String key, String value) {
delegate.addHeader(key, value);
return this;
}
// When a user provides a Channel, we are not in control of TLS or retry config and reimplement it
// here for use in tests. Technically we don't have to test them since they are out of the SDK's
// control, but it's probably worth verifying the baseline functionality anyways.
@Override
public TelemetryExporterBuilder<T> setTrustedCertificates(byte[] certificates) {
this.trustedCertificatesPem = certificates;
return this;
}
@Override
public TelemetryExporterBuilder<T> setClientTls(byte[] privateKeyPem, byte[] certificatePem) {
this.privateKeyPem = privateKeyPem;
this.certificatePem = certificatePem;
return this;
}
@Override
public TelemetryExporterBuilder<T> setRetryPolicy(RetryPolicy retryPolicy) {
String grpcServiceName;
if (delegate instanceof GrpcLogExporterBuilderWrapper) {
grpcServiceName = "opentelemetry.proto.collector.logs.v1.LogsService";
} else if (delegate instanceof GrpcMetricExporterBuilderWrapper) {
grpcServiceName = "opentelemetry.proto.collector.metrics.v1.MetricsService";
} else if (delegate instanceof GrpcSpanExporterBuilderWrapper) {
grpcServiceName = "opentelemetry.proto.collector.trace.v1.TraceService";
} else {
throw new IllegalStateException("Can't happen");
}
requireNonNull(channelBuilder, "channel");
channelBuilder.defaultServiceConfig(
ManagedChannelUtil.toServiceConfig(grpcServiceName, retryPolicy));
return this;
}
@Override
public TelemetryExporterBuilder<T> setChannel(ManagedChannel channel) {
throw new UnsupportedOperationException();
}
@Override
public TelemetryExporter<T> build() {
requireNonNull(channelBuilder, "channel");
if (trustedCertificatesPem != null) {
try {
ManagedChannelUtil.setClientKeysAndTrustedCertificatesPem(
channelBuilder, privateKeyPem, certificatePem, trustedCertificatesPem);
} catch (SSLException e) {
throw new IllegalStateException(
"Could not set trusted certificates, are they valid X.509 in PEM format?", e);
}
}
ManagedChannel channel = channelBuilder.build();
delegate.setChannel(channel);
TelemetryExporter<T> delegateExporter = delegate.build();
return new TelemetryExporter<T>() {
@Override
public CompletableResultCode export(Collection<T> items) {
return delegateExporter.export(items);
}
@Override
public CompletableResultCode shutdown() {
channel.shutdownNow();
return delegateExporter.shutdown();
}
};
}
}

View File

@ -5,11 +5,31 @@
package io.opentelemetry.exporter.otlp.testing.internal;
import io.grpc.ManagedChannel;
import io.opentelemetry.exporter.internal.retry.RetryPolicy;
import io.opentelemetry.exporter.otlp.logs.OtlpGrpcLogExporterBuilder;
import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder;
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporterBuilder;
import io.opentelemetry.sdk.logs.data.LogData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
public interface TelemetryExporterBuilder<T> {
static TelemetryExporterBuilder<SpanData> wrap(OtlpGrpcSpanExporterBuilder builder) {
return new GrpcSpanExporterBuilderWrapper(builder);
}
static TelemetryExporterBuilder<MetricData> wrap(OtlpGrpcMetricExporterBuilder builder) {
return new GrpcMetricExporterBuilderWrapper(builder);
}
static TelemetryExporterBuilder<LogData> wrap(OtlpGrpcLogExporterBuilder builder) {
return new GrpcLogExporterBuilderWrapper(builder);
}
TelemetryExporterBuilder<T> setEndpoint(String endpoint);
TelemetryExporterBuilder<T> setTimeout(long timeout, TimeUnit unit);
@ -26,5 +46,7 @@ public interface TelemetryExporterBuilder<T> {
TelemetryExporterBuilder<T> setRetryPolicy(RetryPolicy retryPolicy);
TelemetryExporterBuilder<T> setChannel(ManagedChannel channel);
TelemetryExporter<T> build();
}

View File

@ -15,7 +15,6 @@ import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter;
import io.opentelemetry.sdk.autoconfigure.spi.ConfigurationException;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
class SpanExporterConfigurationTest {
@ -47,9 +46,7 @@ class SpanExporterConfigurationTest {
.isInstanceOfSatisfying(
OtlpGrpcSpanExporter.class,
otlp ->
assertThat(otlp)
.extracting("delegate.timeoutNanos")
.isEqualTo(TimeUnit.MILLISECONDS.toNanos(10L)));
assertThat(otlp).extracting("delegate.client.callTimeoutMillis").isEqualTo(10));
} finally {
exporter.shutdown();
}
@ -70,9 +67,7 @@ class SpanExporterConfigurationTest {
.isInstanceOfSatisfying(
JaegerGrpcSpanExporter.class,
jaeger ->
assertThat(jaeger)
.extracting("delegate.timeoutNanos")
.isEqualTo(TimeUnit.MILLISECONDS.toNanos(10L)));
assertThat(jaeger).extracting("delegate.client.callTimeoutMillis").isEqualTo(10));
} finally {
exporter.shutdown();
}

View File

@ -10,6 +10,7 @@ import static io.opentelemetry.sdk.autoconfigure.OtlpGrpcServerExtension.generat
import static io.opentelemetry.sdk.autoconfigure.OtlpGrpcServerExtension.generateFakeSpan;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.InstanceOfAssertFactories.INTEGER;
import static org.awaitility.Awaitility.await;
import com.google.common.collect.Lists;
@ -83,8 +84,8 @@ class OtlpGrpcConfigTest {
LogExporter logExporter =
LogExporterConfiguration.configureOtlpLogs(properties, MeterProvider.noop())) {
assertThat(spanExporter)
.extracting("delegate.timeoutNanos")
.isEqualTo(TimeUnit.SECONDS.toNanos(15));
.extracting("delegate.client.callTimeoutMillis", INTEGER)
.isEqualTo(TimeUnit.SECONDS.toMillis(15));
assertThat(spanExporter.export(SPAN_DATA).join(15, TimeUnit.SECONDS).isSuccess()).isTrue();
assertThat(server.traceRequests).hasSize(1);
assertThat(server.requestHeaders)
@ -96,8 +97,8 @@ class OtlpGrpcConfigTest {
&& headers.contains("grpc-encoding", "gzip"));
assertThat(metricExporter)
.extracting("delegate.timeoutNanos")
.isEqualTo(TimeUnit.SECONDS.toNanos(15));
.extracting("delegate.client.callTimeoutMillis", INTEGER)
.isEqualTo(TimeUnit.SECONDS.toMillis(15));
assertThat(metricExporter.export(METRIC_DATA).join(15, TimeUnit.SECONDS).isSuccess())
.isTrue();
assertThat(server.metricRequests).hasSize(1);
@ -111,8 +112,8 @@ class OtlpGrpcConfigTest {
&& headers.contains("grpc-encoding", "gzip"));
assertThat(logExporter)
.extracting("delegate.timeoutNanos")
.isEqualTo(TimeUnit.SECONDS.toNanos(15));
.extracting("delegate.client.callTimeoutMillis", INTEGER)
.isEqualTo(TimeUnit.SECONDS.toMillis(15));
assertThat(logExporter.export(LOG_DATA).join(15, TimeUnit.SECONDS).isSuccess()).isTrue();
assertThat(server.logRequests).hasSize(1);
assertThat(server.requestHeaders)
@ -148,8 +149,8 @@ class OtlpGrpcConfigTest {
NamedSpiManager.createEmpty(),
MeterProvider.noop())) {
assertThat(spanExporter)
.extracting("delegate.timeoutNanos")
.isEqualTo(TimeUnit.SECONDS.toNanos(15));
.extracting("delegate.client.callTimeoutMillis", INTEGER)
.isEqualTo(TimeUnit.SECONDS.toMillis(15));
assertThat(spanExporter.export(SPAN_DATA).join(10, TimeUnit.SECONDS).isSuccess()).isTrue();
assertThat(server.traceRequests).hasSize(1);
assertThat(server.requestHeaders)
@ -183,8 +184,8 @@ class OtlpGrpcConfigTest {
DefaultConfigProperties.createForTest(props))) {
assertThat(metricExporter)
.extracting("delegate.timeoutNanos")
.isEqualTo(TimeUnit.SECONDS.toNanos(15));
.extracting("delegate.client.callTimeoutMillis", INTEGER)
.isEqualTo(TimeUnit.SECONDS.toMillis(15));
assertThat(metricExporter.getAggregationTemporality(InstrumentType.COUNTER))
.isEqualTo(AggregationTemporality.DELTA);
assertThat(metricExporter.getAggregationTemporality(InstrumentType.UP_DOWN_COUNTER))
@ -224,8 +225,8 @@ class OtlpGrpcConfigTest {
DefaultConfigProperties.createForTest(props), MeterProvider.noop())) {
assertThat(logExporter)
.extracting("delegate.timeoutNanos")
.isEqualTo(TimeUnit.SECONDS.toNanos(15));
.extracting("delegate.client.callTimeoutMillis", INTEGER)
.isEqualTo(TimeUnit.SECONDS.toMillis(15));
assertThat(logExporter.export(LOG_DATA).join(15, TimeUnit.SECONDS).isSuccess()).isTrue();
assertThat(server.logRequests).hasSize(1);
assertThat(server.requestHeaders)

View File

@ -14,7 +14,7 @@ import com.google.common.collect.Lists;
import com.linecorp.armeria.testing.junit5.server.SelfSignedCertificateExtension;
import io.grpc.Status;
import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.exporter.internal.grpc.DefaultGrpcExporter;
import io.opentelemetry.exporter.internal.grpc.OkHttpGrpcExporter;
import io.opentelemetry.exporter.internal.retry.RetryPolicy;
import io.opentelemetry.exporter.internal.retry.RetryUtil;
import io.opentelemetry.internal.testing.slf4j.SuppressLogger;
@ -35,7 +35,7 @@ import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@SuppressLogger(DefaultGrpcExporter.class)
@SuppressLogger(OkHttpGrpcExporter.class)
class OtlpGrpcRetryTest {
private static final List<SpanData> SPAN_DATA = Lists.newArrayList(generateFakeSpan());
@ -52,7 +52,7 @@ class OtlpGrpcRetryTest {
public static final OtlpGrpcServerExtension server = new OtlpGrpcServerExtension(certificate);
@Test
@SuppressLogger(DefaultGrpcExporter.class)
@SuppressLogger(OkHttpGrpcExporter.class)
void configureSpanExporterRetryPolicy() {
Map<String, String> props = new HashMap<>();
props.put("otel.exporter.otlp.traces.endpoint", "https://localhost:" + server.httpsPort());
@ -72,7 +72,7 @@ class OtlpGrpcRetryTest {
}
@Test
@SuppressLogger(DefaultGrpcExporter.class)
@SuppressLogger(OkHttpGrpcExporter.class)
void configureMetricExporterRetryPolicy() {
Map<String, String> props = new HashMap<>();
props.put("otel.exporter.otlp.metrics.endpoint", "https://localhost:" + server.httpsPort());
@ -91,7 +91,7 @@ class OtlpGrpcRetryTest {
}
@Test
@SuppressLogger(DefaultGrpcExporter.class)
@SuppressLogger(OkHttpGrpcExporter.class)
void configureLogExporterRetryPolicy() {
Map<String, String> props = new HashMap<>();
props.put("otel.exporter.otlp.logs.endpoint", "https://localhost:" + server.httpsPort());

View File

@ -18,7 +18,6 @@ import io.grpc.stub.MetadataUtils;
import io.opentelemetry.exporter.internal.ExporterBuilderUtil;
import io.opentelemetry.exporter.internal.grpc.ManagedChannelUtil;
import io.opentelemetry.exporter.internal.grpc.MarshalerServiceStub;
import io.opentelemetry.exporter.internal.grpc.OkHttpGrpcExporterBuilder;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.exporter.internal.retry.RetryPolicy;
import java.net.URI;
@ -45,7 +44,6 @@ final class DefaultGrpcServiceBuilder<ReqT extends Marshaler, ResT extends UnMar
@Nullable private byte[] certificatePem;
@Nullable private RetryPolicy retryPolicy;
/** Creates a new {@link OkHttpGrpcExporterBuilder}. */
// Visible for testing
DefaultGrpcServiceBuilder(
String type,