Autoconfigure experimental OTLP retry (#3791)

* Add retry policy class and wire into autoconfigure

* Wire up otlp retry for DefaultGrpcExporter

* Add documentation to autoconfigure readme

* Add more tests

* Move delegate accessor to :exporters:otlp:common

* Use jackson for json serialization

* PR feedback
This commit is contained in:
jack-berg 2021-11-08 19:06:47 -06:00 committed by GitHub
parent c858e1801b
commit a1a45d2657
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 713 additions and 152 deletions

View File

@ -39,9 +39,10 @@ dependencies {
testImplementation(project(":sdk:logs"))
testImplementation(project(":sdk:testing"))
testImplementation("com.fasterxml.jackson.core:jackson-core")
testImplementation("com.fasterxml.jackson.core:jackson-databind")
testImplementation("com.google.protobuf:protobuf-java-util")
testImplementation("io.opentelemetry.proto:opentelemetry-proto")
testImplementation("org.skyscreamer:jsonassert")
testImplementation("com.google.api.grpc:proto-google-common-protos")
testImplementation("io.grpc:grpc-testing")

View File

@ -0,0 +1,48 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.exporter.otlp.internal;
import com.google.auto.value.AutoValue;
import java.time.Duration;
/**
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
* any time.
*/
@AutoValue
public abstract class RetryPolicy {
private static final RetryPolicy DEFAULT = new RetryPolicyBuilder().build();
RetryPolicy() {}
/** Return the default {@link RetryPolicy}. */
public static RetryPolicy getDefault() {
return DEFAULT;
}
/** Returns a new {@link RetryPolicyBuilder} to construct a {@link RetryPolicy}. */
public static RetryPolicyBuilder builder() {
return new RetryPolicyBuilder();
}
/** Returns the max number of attempts, including the original request. */
public abstract int getMaxAttempts();
/** Returns the initial backoff. */
public abstract Duration getInitialBackoff();
/** Returns the max backoff. */
public abstract Duration getMaxBackoff();
/** Returns the backoff multiplier. */
public abstract double getBackoffMultiplier();
static RetryPolicy create(
int maxAttempts, Duration initialBackoff, Duration maxBackoff, double backoffMultiplier) {
return new AutoValue_RetryPolicy(maxAttempts, initialBackoff, maxBackoff, backoffMultiplier);
}
}

View File

@ -0,0 +1,69 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.exporter.otlp.internal;
import static io.opentelemetry.api.internal.Utils.checkArgument;
import static java.util.Objects.requireNonNull;
import java.time.Duration;
/**
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
* any time.
*/
public final class RetryPolicyBuilder {
private static final int DEFAULT_MAX_ATTEMPTS = 5;
private static final Duration DEFAULT_INITIAL_BACKOFF = Duration.ofSeconds(1);
private static final Duration DEFAULT_MAX_BACKOFF = Duration.ofSeconds(5);
private static final double DEFAULT_BACKOFF_MULTIPLIER = 1.5;
private int maxAttempts = DEFAULT_MAX_ATTEMPTS;
private Duration initialBackoff = DEFAULT_INITIAL_BACKOFF;
private Duration maxBackoff = DEFAULT_MAX_BACKOFF;
private double backoffMultiplier = DEFAULT_BACKOFF_MULTIPLIER;
RetryPolicyBuilder() {}
/**
* Set the maximum number of attempts, including the original request. Must be greater than 1 and
* less than 6.
*/
public RetryPolicyBuilder setMaxAttempts(int maxAttempts) {
checkArgument(
maxAttempts > 1 && maxAttempts < 6, "maxAttempts must be greater than 1 and less than 6");
this.maxAttempts = maxAttempts;
return this;
}
/** Set the initial backoff. Must be greater than 0. */
public RetryPolicyBuilder setInitialBackoff(Duration initialBackoff) {
requireNonNull(initialBackoff, "initialBackoff");
checkArgument(initialBackoff.toNanos() > 0, "initialBackoff must be greater than 0");
this.initialBackoff = initialBackoff;
return this;
}
/** Set the maximum backoff. Must be greater than 0. */
public RetryPolicyBuilder setMaxBackoff(Duration maxBackoff) {
requireNonNull(maxBackoff, "maxBackoff");
checkArgument(maxBackoff.toNanos() > 0, "maxBackoff must be greater than 0");
this.maxBackoff = maxBackoff;
return this;
}
/** Set the backoff multiplier. Must be greater than 0.0. */
public RetryPolicyBuilder setBackoffMultiplier(double backoffMultiplier) {
checkArgument(backoffMultiplier > 0, "backoffMultiplier must be greater than 0");
this.backoffMultiplier = backoffMultiplier;
return this;
}
/** Build and return a {@link RetryPolicy} with the values of this builder. */
public RetryPolicy build() {
return RetryPolicy.create(maxAttempts, initialBackoff, maxBackoff, backoffMultiplier);
}
}

View File

@ -6,6 +6,7 @@
package io.opentelemetry.exporter.otlp.internal.grpc;
import static io.grpc.Metadata.ASCII_STRING_MARSHALLER;
import static io.opentelemetry.exporter.otlp.internal.grpc.ManagedChannelUtil.toServiceConfig;
import io.grpc.Codec;
import io.grpc.ManagedChannel;
@ -13,6 +14,8 @@ import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.stub.MetadataUtils;
import io.opentelemetry.exporter.otlp.internal.Marshaler;
import io.opentelemetry.exporter.otlp.internal.RetryPolicy;
import java.lang.reflect.Field;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
@ -32,6 +35,7 @@ public final class DefaultGrpcExporterBuilder<T extends Marshaler>
private final String type;
private final Function<ManagedChannel, MarshalerServiceStub<T, ?, ?>> stubFactory;
private final String grpcServiceName;
@Nullable private ManagedChannel channel;
private long timeoutNanos;
@ -39,6 +43,7 @@ public final class DefaultGrpcExporterBuilder<T extends Marshaler>
private boolean compressionEnabled = false;
@Nullable private Metadata metadata;
@Nullable private byte[] trustedCertificatesPem;
@Nullable private RetryPolicy retryPolicy;
/** Creates a new {@link DefaultGrpcExporterBuilder}. */
// Visible for testing
@ -46,9 +51,11 @@ public final class DefaultGrpcExporterBuilder<T extends Marshaler>
String type,
Function<ManagedChannel, MarshalerServiceStub<T, ?, ?>> stubFactory,
long defaultTimeoutSecs,
URI defaultEndpoint) {
URI defaultEndpoint,
String grpcServiceName) {
this.type = type;
this.stubFactory = stubFactory;
this.grpcServiceName = grpcServiceName;
timeoutNanos = TimeUnit.SECONDS.toNanos(defaultTimeoutSecs);
endpoint = defaultEndpoint;
}
@ -110,6 +117,12 @@ public final class DefaultGrpcExporterBuilder<T extends Marshaler>
return this;
}
@Override
public GrpcExporterBuilder<T> addRetryPolicy(RetryPolicy retryPolicy) {
this.retryPolicy = retryPolicy;
return this;
}
@Override
public GrpcExporter<T> build() {
ManagedChannel channel = this.channel;
@ -139,6 +152,10 @@ public final class DefaultGrpcExporterBuilder<T extends Marshaler>
}
}
if (retryPolicy != null) {
managedChannelBuilder.defaultServiceConfig(toServiceConfig(grpcServiceName, retryPolicy));
}
channel = managedChannelBuilder.build();
}
@ -147,4 +164,25 @@ public final class DefaultGrpcExporterBuilder<T extends Marshaler>
stubFactory.apply(channel).withCompression(codec.getMessageEncoding());
return new DefaultGrpcExporter<>(type, channel, stub, timeoutNanos, compressionEnabled);
}
/**
* Reflectively access a {@link DefaultGrpcExporterBuilder} instance in field called "delegate" of
* the instance.
*
* @throws IllegalArgumentException if the instance does not contain a field called "delegate" of
* type {@link DefaultGrpcExporterBuilder}
*/
public static <T> DefaultGrpcExporterBuilder<?> getDelegateBuilder(Class<T> type, T instance) {
try {
Field field = type.getDeclaredField("delegate");
field.setAccessible(true);
Object value = field.get(instance);
if (!(value instanceof DefaultGrpcExporterBuilder)) {
throw new IllegalArgumentException("delegate field is not type DefaultGrpcExporterBuilder");
}
return (DefaultGrpcExporterBuilder<?>) value;
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new IllegalArgumentException("Unable to access delegate reflectively.", e);
}
}
}

View File

@ -27,9 +27,10 @@ public interface GrpcExporter<T extends Marshaler> {
long defaultTimeoutSecs,
URI defaultEndpoint,
Supplier<Function<ManagedChannel, MarshalerServiceStub<T, ?, ?>>> stubFactory,
String grpcServiceName,
String grpcEndpointPath) {
return GrpcExporterUtil.exporterBuilder(
type, defaultTimeoutSecs, defaultEndpoint, stubFactory, grpcEndpointPath);
type, defaultTimeoutSecs, defaultEndpoint, stubFactory, grpcServiceName, grpcEndpointPath);
}
/**

View File

@ -7,6 +7,7 @@ package io.opentelemetry.exporter.otlp.internal.grpc;
import io.grpc.ManagedChannel;
import io.opentelemetry.exporter.otlp.internal.Marshaler;
import io.opentelemetry.exporter.otlp.internal.RetryPolicy;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
@ -26,5 +27,7 @@ public interface GrpcExporterBuilder<T extends Marshaler> {
GrpcExporterBuilder<T> addHeader(String key, String value);
GrpcExporterBuilder<T> addRetryPolicy(RetryPolicy retryPolicy);
GrpcExporter<T> build();
}

View File

@ -32,13 +32,14 @@ final class GrpcExporterUtil {
long defaultTimeoutSecs,
URI defaultEndpoint,
Supplier<Function<ManagedChannel, MarshalerServiceStub<T, ?, ?>>> stubFactory,
String grpcServiceName,
String grpcEndpointPath) {
if (USE_OKHTTP) {
return new OkHttpGrpcExporterBuilder<>(
type, grpcEndpointPath, defaultTimeoutSecs, defaultEndpoint);
} else {
return new DefaultGrpcExporterBuilder<>(
type, stubFactory.get(), defaultTimeoutSecs, defaultEndpoint);
type, stubFactory.get(), defaultTimeoutSecs, defaultEndpoint, grpcServiceName);
}
}

View File

@ -6,13 +6,21 @@
package io.opentelemetry.exporter.otlp.internal.grpc;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Status.Code;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NettyChannelBuilder;
import io.opentelemetry.exporter.otlp.internal.RetryPolicy;
import io.opentelemetry.exporter.otlp.internal.TlsUtil;
import io.opentelemetry.sdk.common.CompletableResultCode;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -29,6 +37,16 @@ public final class ManagedChannelUtil {
private static final Logger logger = Logger.getLogger(ManagedChannelUtil.class.getName());
private static final List<Code> RETRYABLE_STATUS_CODES =
Arrays.asList(
Code.CANCELLED,
Code.DEADLINE_EXCEEDED,
Code.RESOURCE_EXHAUSTED,
Code.ABORTED,
Code.OUT_OF_RANGE,
Code.UNAVAILABLE,
Code.DATA_LOSS);
/**
* Configure the channel builder to trust the certificates. The {@code byte[]} should contain an
* X.509 certificate collection in PEM format.
@ -70,6 +88,34 @@ public final class ManagedChannelUtil {
}
}
/**
* Convert the {@link RetryPolicy} into a gRPC service config for the {@code serviceName}. The
* resulting map can be passed to {@link ManagedChannelBuilder#defaultServiceConfig(Map)}.
*/
public static Map<String, ?> toServiceConfig(String serviceName, RetryPolicy retryPolicy) {
List<Double> retryableStatusCodes =
RETRYABLE_STATUS_CODES.stream().map(Code::value).map(i -> (double) i).collect(toList());
Map<String, Object> retryConfig = new HashMap<>();
retryConfig.put("retryableStatusCodes", retryableStatusCodes);
retryConfig.put("maxAttempts", (double) retryPolicy.getMaxAttempts());
retryConfig.put("initialBackoff", retryPolicy.getInitialBackoff().toMillis() / 1000.0 + "s");
retryConfig.put("maxBackoff", retryPolicy.getMaxBackoff().toMillis() / 1000.0 + "s");
retryConfig.put("backoffMultiplier", retryPolicy.getBackoffMultiplier());
Map<String, Object> methodConfig = new HashMap<>();
methodConfig.put(
"name", Collections.singletonList(Collections.singletonMap("service", serviceName)));
methodConfig.put("retryPolicy", retryConfig);
return Collections.singletonMap("methodConfig", Collections.singletonList(methodConfig));
}
/** Return the list of gRPC status codes that are retryable in OTLP. */
public static List<Code> retryableStatusCodes() {
return RETRYABLE_STATUS_CODES;
}
/** Shutdown the gRPC channel. */
public static CompletableResultCode shutdownChannel(ManagedChannel managedChannel) {
final CompletableResultCode result = new CompletableResultCode();

View File

@ -7,6 +7,7 @@ package io.opentelemetry.exporter.otlp.internal.grpc;
import io.grpc.ManagedChannel;
import io.opentelemetry.exporter.otlp.internal.Marshaler;
import io.opentelemetry.exporter.otlp.internal.RetryPolicy;
import io.opentelemetry.exporter.otlp.internal.TlsUtil;
import java.net.URI;
import java.net.URISyntaxException;
@ -102,6 +103,11 @@ public final class OkHttpGrpcExporterBuilder<T extends Marshaler>
return this;
}
@Override
public GrpcExporterBuilder<T> addRetryPolicy(RetryPolicy retryPolicy) {
throw new UnsupportedOperationException("Only available on DefaultGrpcExporter");
}
@Override
public GrpcExporter<T> build() {
OkHttpClient.Builder clientBuilder = new OkHttpClient.Builder();

View File

@ -0,0 +1,56 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.exporter.otlp.internal;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.time.Duration;
import org.junit.jupiter.api.Test;
class RetryPolicyTest {
@Test
void defaultRetryPolicy() {
assertThat(RetryPolicy.getDefault().getMaxAttempts()).isEqualTo(5);
assertThat(RetryPolicy.getDefault().getInitialBackoff()).isEqualTo(Duration.ofSeconds(1));
assertThat(RetryPolicy.getDefault().getMaxBackoff()).isEqualTo(Duration.ofSeconds(5));
assertThat(RetryPolicy.getDefault().getBackoffMultiplier()).isEqualTo(1.5);
}
@Test
void build() {
RetryPolicy retryPolicy =
RetryPolicy.builder()
.setMaxAttempts(2)
.setInitialBackoff(Duration.ofMillis(2))
.setMaxBackoff(Duration.ofSeconds(1))
.setBackoffMultiplier(1.1)
.build();
assertThat(retryPolicy.getMaxAttempts()).isEqualTo(2);
assertThat(retryPolicy.getInitialBackoff()).isEqualTo(Duration.ofMillis(2));
assertThat(retryPolicy.getMaxBackoff()).isEqualTo(Duration.ofSeconds(1));
assertThat(retryPolicy.getBackoffMultiplier()).isEqualTo(1.1);
}
@Test
void invalidRetryPolicy() {
assertThatThrownBy(() -> RetryPolicy.builder().setMaxAttempts(1))
.isInstanceOf(IllegalArgumentException.class);
assertThatThrownBy(() -> RetryPolicy.builder().setMaxAttempts(6))
.isInstanceOf(IllegalArgumentException.class);
assertThatThrownBy(() -> RetryPolicy.builder().setInitialBackoff(null))
.isInstanceOf(NullPointerException.class);
assertThatThrownBy(() -> RetryPolicy.builder().setInitialBackoff(Duration.ofMillis(0)))
.isInstanceOf(IllegalArgumentException.class);
assertThatThrownBy(() -> RetryPolicy.builder().setMaxBackoff(null))
.isInstanceOf(NullPointerException.class);
assertThatThrownBy(() -> RetryPolicy.builder().setMaxBackoff(Duration.ofMillis(0)))
.isInstanceOf(IllegalArgumentException.class);
assertThatThrownBy(() -> RetryPolicy.builder().setBackoffMultiplier(0))
.isInstanceOf(IllegalArgumentException.class);
}
}

View File

@ -0,0 +1,50 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.exporter.otlp.internal.grpc;
import static org.assertj.core.api.Assertions.assertThatCode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.grpc.ManagedChannelBuilder;
import io.opentelemetry.exporter.otlp.internal.RetryPolicy;
import java.util.Map;
import org.junit.jupiter.api.Test;
import org.skyscreamer.jsonassert.JSONAssert;
class ManagedChannelUtilTest {
@Test
void toServiceConfig() throws Exception {
// Validate that the map matches the protobuf to JSON translation of the
// grpc.service_config.ServiceConfig protobuf definition described at:
// https://github.com/grpc/grpc/blob/master/doc/service_config.md
Map<String, ?> serviceConfig =
ManagedChannelUtil.toServiceConfig(
"opentelemetry.proto.MyService", RetryPolicy.getDefault());
String expectedServiceConfig =
"{\n"
+ " \"methodConfig\": [{\n"
+ " \"retryPolicy\": {\n"
+ " \"backoffMultiplier\": 1.5,\n"
+ " \"maxAttempts\": 5.0,\n"
+ " \"initialBackoff\": \"1.0s\",\n"
+ " \"retryableStatusCodes\": [1.0, 4.0, 8.0, 10.0, 11.0, 14.0, 15.0],\n"
+ " \"maxBackoff\": \"5.0s\"\n"
+ " },\n"
+ " \"name\": [{\n"
+ " \"service\": \"opentelemetry.proto.MyService\"\n"
+ " }]\n"
+ " }]\n"
+ "}";
JSONAssert.assertEquals(
expectedServiceConfig, new ObjectMapper().writeValueAsString(serviceConfig), false);
// Validate that the map format does not throw when passed to managed channel builder.
// Any type mismatch will throw.
ManagedChannelBuilder<?> builder = ManagedChannelBuilder.forTarget("localhost");
assertThatCode(() -> builder.defaultServiceConfig(serviceConfig)).doesNotThrowAnyException();
}
}

View File

@ -19,8 +19,7 @@ testSets {
dependencies {
api(project(":sdk:logs"))
implementation(project(":exporters:otlp:common"))
api(project(":exporters:otlp:common"))
compileOnly("io.grpc:grpc-stub")

View File

@ -19,9 +19,10 @@ import java.util.concurrent.TimeUnit;
/** Builder for {@link OtlpGrpcLogExporter}. */
public final class OtlpGrpcLogExporterBuilder {
private static final String GRPC_SERVICE_NAME =
"opentelemetry.proto.collector.logs.v1.LogsService";
// Visible for testing
static final String GRPC_ENDPOINT_PATH =
"/opentelemetry.proto.collector.logs.v1.LogsService/Export";
static final String GRPC_ENDPOINT_PATH = "/" + GRPC_SERVICE_NAME + "/Export";
private static final String DEFAULT_ENDPOINT_URL = "http://localhost:4317";
private static final URI DEFAULT_ENDPOINT = URI.create(DEFAULT_ENDPOINT_URL);
@ -37,6 +38,7 @@ public final class OtlpGrpcLogExporterBuilder {
DEFAULT_TIMEOUT_SECS,
DEFAULT_ENDPOINT,
() -> MarshalerLogsServiceGrpc::newFutureStub,
GRPC_SERVICE_NAME,
GRPC_ENDPOINT_PATH);
}

View File

@ -22,6 +22,7 @@ import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.SpanId;
import io.opentelemetry.api.trace.TraceFlags;
import io.opentelemetry.api.trace.TraceId;
import io.opentelemetry.exporter.otlp.internal.RetryPolicy;
import io.opentelemetry.exporter.otlp.internal.grpc.DefaultGrpcExporter;
import io.opentelemetry.exporter.otlp.internal.grpc.DefaultGrpcExporterBuilder;
import io.opentelemetry.exporter.otlp.internal.logs.ResourceLogsMarshaler;
@ -154,6 +155,16 @@ class OtlpGrpcLogsExporterTest {
"Unsupported compression method. Supported compression methods include: gzip, none.");
}
@Test
void testBuilderDelegate() {
assertThatCode(
() ->
DefaultGrpcExporterBuilder.getDelegateBuilder(
OtlpGrpcLogExporterBuilder.class, OtlpGrpcLogExporter.builder())
.addRetryPolicy(RetryPolicy.getDefault()))
.doesNotThrowAnyException();
}
@Test
void testExport() {
LogData log = generateFakeLog();

View File

@ -18,8 +18,7 @@ testSets {
dependencies {
api(project(":sdk:metrics"))
implementation(project(":exporters:otlp:common"))
api(project(":exporters:otlp:common"))
compileOnly("io.grpc:grpc-stub")

View File

@ -19,9 +19,10 @@ import java.util.concurrent.TimeUnit;
/** Builder utility for this exporter. */
public final class OtlpGrpcMetricExporterBuilder {
private static final String GRPC_SERVICE_NAME =
"opentelemetry.proto.collector.metrics.v1.MetricsService";
// Visible for testing
static final String GRPC_ENDPOINT_PATH =
"/opentelemetry.proto.collector.metrics.v1.MetricsService/Export";
static final String GRPC_ENDPOINT_PATH = "/" + GRPC_SERVICE_NAME + "/Export";
private static final String DEFAULT_ENDPOINT_URL = "http://localhost:4317";
private static final URI DEFAULT_ENDPOINT = URI.create(DEFAULT_ENDPOINT_URL);
@ -37,6 +38,7 @@ public final class OtlpGrpcMetricExporterBuilder {
DEFAULT_TIMEOUT_SECS,
DEFAULT_ENDPOINT,
() -> MarshalerMetricsServiceGrpc::newFutureStub,
GRPC_SERVICE_NAME,
GRPC_ENDPOINT_PATH);
}

View File

@ -20,6 +20,7 @@ import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.stub.StreamObserver;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.exporter.otlp.internal.RetryPolicy;
import io.opentelemetry.exporter.otlp.internal.grpc.DefaultGrpcExporter;
import io.opentelemetry.exporter.otlp.internal.grpc.DefaultGrpcExporterBuilder;
import io.opentelemetry.exporter.otlp.internal.metrics.ResourceMetricsMarshaler;
@ -156,6 +157,16 @@ class OtlpGrpcMetricExporterTest {
"Unsupported compression method. Supported compression methods include: gzip, none.");
}
@Test
void testBuilderDelegate() {
assertThatCode(
() ->
DefaultGrpcExporterBuilder.getDelegateBuilder(
OtlpGrpcMetricExporterBuilder.class, OtlpGrpcMetricExporter.builder())
.addRetryPolicy(RetryPolicy.getDefault()))
.doesNotThrowAnyException();
}
@Test
void testExport() {
MetricData metric = generateFakeMetric();

View File

@ -24,11 +24,10 @@ testSets {
dependencies {
api(project(":sdk:trace"))
api(project(":exporters:otlp:common"))
implementation(project(":api:metrics"))
implementation(project(":exporters:otlp:common"))
compileOnly("io.grpc:grpc-stub")
testImplementation(project(":sdk:testing"))

View File

@ -68,7 +68,8 @@ public class GrpcExporterBenchmark {
"span",
MarshalerTraceServiceGrpc::newFutureStub,
10,
URI.create("http://localhost:" + server.activeLocalPort()))
URI.create("http://localhost:" + server.activeLocalPort()),
OtlpGrpcSpanExporterBuilder.GRPC_SERVICE_NAME)
.build();
okhttpGrpcExporter =

View File

@ -20,8 +20,9 @@ import java.util.concurrent.TimeUnit;
public final class OtlpGrpcSpanExporterBuilder {
// Visible for testing
static final String GRPC_ENDPOINT_PATH =
"/opentelemetry.proto.collector.trace.v1.TraceService/Export";
static final String GRPC_SERVICE_NAME = "opentelemetry.proto.collector.trace.v1.TraceService";
// Visible for testing
static final String GRPC_ENDPOINT_PATH = "/" + GRPC_SERVICE_NAME + "/Export";
private static final String DEFAULT_ENDPOINT_URL = "http://localhost:4317";
private static final URI DEFAULT_ENDPOINT = URI.create(DEFAULT_ENDPOINT_URL);
@ -37,6 +38,7 @@ public final class OtlpGrpcSpanExporterBuilder {
DEFAULT_TIMEOUT_SECS,
DEFAULT_ENDPOINT,
() -> MarshalerTraceServiceGrpc::newFutureStub,
GRPC_SERVICE_NAME,
GRPC_ENDPOINT_PATH);
}

View File

@ -23,6 +23,7 @@ import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.TraceFlags;
import io.opentelemetry.api.trace.TraceState;
import io.opentelemetry.exporter.otlp.internal.RetryPolicy;
import io.opentelemetry.exporter.otlp.internal.grpc.DefaultGrpcExporter;
import io.opentelemetry.exporter.otlp.internal.grpc.DefaultGrpcExporterBuilder;
import io.opentelemetry.exporter.otlp.internal.traces.ResourceSpansMarshaler;
@ -157,6 +158,16 @@ class OtlpGrpcSpanExporterTest {
"Unsupported compression method. Supported compression methods include: gzip, none.");
}
@Test
void testBuilderDelegate() {
assertThatCode(
() ->
DefaultGrpcExporterBuilder.getDelegateBuilder(
OtlpGrpcSpanExporterBuilder.class, OtlpGrpcSpanExporter.builder())
.addRetryPolicy(RetryPolicy.getDefault()))
.doesNotThrowAnyException();
}
@Test
void testExport() {
SpanData span = generateFakeSpan();

View File

@ -70,10 +70,22 @@ The [OpenTelemetry Protocol (OTLP)](https://github.com/open-telemetry/openteleme
| otel.exporter.otlp.protocol | OTEL_EXPORTER_OTLP_PROTOCOL | The transport protocol to use on OTLP trace and metrics requests. Options include `grpc` and `http/protobuf`. Default is `grpc`. |
| otel.exporter.otlp.traces.protocol | OTEL_EXPORTER_OTLP_TRACES_PROTOCOL | The transport protocol to use on OTLP trace requests. Options include `grpc` and `http/protobuf`. Default is `grpc`. |
| otel.exporter.otlp.metrics.protocol | OTEL_EXPORTER_OTLP_METRICS_PROTOCOL | The transport protocol to use on OTLP metrics requests. Options include `grpc` and `http/protobuf`. Default is `grpc`. |
| otel.experimental.exporter.otlp.retry.enabled | OTEL_EXPERIMENTAL_EXPORTER_OTLP_RETRY_ENABLED | If `true`, enable [experimental retry support](#otlp-exporter-retry). Default is `false`. |
To configure the service name for the OTLP exporter, add the `service.name` key
to the OpenTelemetry Resource ([see below](#opentelemetry-resource)), e.g. `OTEL_RESOURCE_ATTRIBUTES=service.name=myservice`.
#### OTLP exporter retry
[OTLP](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/otlp.md#otlpgrpc-response) requires that [transient](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/exporter.md#retry) errors be handled with a retry strategy. When retry is enabled, retryable gRPC status codes will be retried using an exponential backoff with jitter algorithm as described in the [gRPC Retry Design](https://github.com/grpc/proposal/blob/master/A6-client-retries.md#exponential-backoff).
The policy has the following configuration, which there is currently no way to customize.
- `maxAttempts`: The maximum number of attempts, including the original request. Defaults to `5`.
- `initialBackoff`: The initial backoff duration. Defaults to `1s`
- `maxBackoff`: The maximum backoff duration. Defaults to `5s`.
- `backoffMultiplier` THe backoff multiplier. Defaults to `1.5`.
### Jaeger exporter
The [Jaeger](https://www.jaegertracing.io/docs/1.21/apis/#protobuf-via-grpc-stable) exporter. This exporter uses gRPC for its communications protocol.

View File

@ -12,6 +12,7 @@ import static io.opentelemetry.sdk.autoconfigure.OtlpConfigUtil.PROTOCOL_HTTP_PR
import io.opentelemetry.exporter.logging.LoggingMetricExporter;
import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter;
import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporterBuilder;
import io.opentelemetry.exporter.otlp.internal.grpc.DefaultGrpcExporterBuilder;
import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter;
import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder;
import io.opentelemetry.exporter.prometheus.PrometheusHttpServer;
@ -98,7 +99,8 @@ final class MetricExporterConfiguration {
builder::addHeader,
builder::setCompression,
builder::setTimeout,
builder::setTrustedCertificates);
builder::setTrustedCertificates,
(unused) -> {});
exporter = builder.build();
} else if (protocol.equals(PROTOCOL_GRPC)) {
@ -122,7 +124,11 @@ final class MetricExporterConfiguration {
builder::addHeader,
builder::setCompression,
builder::setTimeout,
builder::setTrustedCertificates);
builder::setTrustedCertificates,
retryPolicy ->
DefaultGrpcExporterBuilder.getDelegateBuilder(
OtlpGrpcMetricExporterBuilder.class, builder)
.addRetryPolicy(retryPolicy));
exporter = builder.build();
} else {

View File

@ -5,6 +5,7 @@
package io.opentelemetry.sdk.autoconfigure;
import io.opentelemetry.exporter.otlp.internal.RetryPolicy;
import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
import io.opentelemetry.sdk.autoconfigure.spi.ConfigurationException;
import java.io.IOException;
@ -41,7 +42,8 @@ final class OtlpConfigUtil {
BiConsumer<String, String> addHeader,
Consumer<String> setCompression,
Consumer<Duration> setTimeout,
Consumer<byte[]> setTrustedCertificates) {
Consumer<byte[]> setTrustedCertificates,
Consumer<RetryPolicy> setRetryPolicy) {
String protocol = getOtlpProtocol(dataType, config);
boolean isHttpProtobuf = protocol.equals(PROTOCOL_HTTP_PROTOBUF);
URL endpoint =
@ -105,6 +107,11 @@ final class OtlpConfigUtil {
}
setTrustedCertificates.accept(certificateBytes);
}
Boolean retryEnabled = config.getBoolean("otel.experimental.exporter.otlp.retry.enabled");
if (retryEnabled != null && retryEnabled) {
setRetryPolicy.accept(RetryPolicy.getDefault());
}
}
private static URL createUrl(URL context, String spec) {

View File

@ -18,6 +18,7 @@ import io.opentelemetry.exporter.jaeger.JaegerGrpcSpanExporterBuilder;
import io.opentelemetry.exporter.logging.LoggingSpanExporter;
import io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporter;
import io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporterBuilder;
import io.opentelemetry.exporter.otlp.internal.grpc.DefaultGrpcExporterBuilder;
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter;
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporterBuilder;
import io.opentelemetry.exporter.zipkin.ZipkinSpanExporter;
@ -134,7 +135,8 @@ final class SpanExporterConfiguration {
builder::addHeader,
builder::setCompression,
builder::setTimeout,
builder::setTrustedCertificates);
builder::setTrustedCertificates,
unused -> {});
return builder.build();
} else if (protocol.equals(PROTOCOL_GRPC)) {
@ -151,8 +153,11 @@ final class SpanExporterConfiguration {
builder::addHeader,
builder::setCompression,
builder::setTimeout,
builder::setTrustedCertificates);
builder::setTrustedCertificates,
retryPolicy ->
DefaultGrpcExporterBuilder.getDelegateBuilder(
OtlpGrpcSpanExporterBuilder.class, builder)
.addRetryPolicy(retryPolicy));
return builder.build();
} else {
throw new ConfigurationException("Unsupported OTLP traces protocol: " + protocol);

View File

@ -261,6 +261,7 @@ class OtlpConfigUtilTest {
(value1, value2) -> {},
value -> {},
value -> {},
value -> {},
value -> {});
return endpoint.get();

View File

@ -12,21 +12,10 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.awaitility.Awaitility.await;
import com.google.common.collect.Lists;
import com.linecorp.armeria.common.RequestHeaders;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.server.grpc.GrpcService;
import com.linecorp.armeria.testing.junit5.server.SelfSignedCertificateExtension;
import com.linecorp.armeria.testing.junit5.server.ServerExtension;
import io.grpc.stub.StreamObserver;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse;
import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse;
import io.opentelemetry.proto.collector.trace.v1.TraceServiceGrpc;
import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
import io.opentelemetry.sdk.autoconfigure.spi.ConfigurationException;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
@ -44,9 +33,8 @@ import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@ -56,11 +44,35 @@ import org.junit.jupiter.api.extension.RegisterExtension;
class OtlpGrpcConfigTest {
private static final BlockingQueue<ExportTraceServiceRequest> traceRequests =
new LinkedBlockingDeque<>();
private static final BlockingQueue<ExportMetricsServiceRequest> metricRequests =
new LinkedBlockingDeque<>();
private static final BlockingQueue<RequestHeaders> requestHeaders = new LinkedBlockingDeque<>();
private static final List<SpanData> SPAN_DATA =
Lists.newArrayList(
TestSpanData.builder()
.setHasEnded(true)
.setName("name")
.setStartEpochNanos(MILLISECONDS.toNanos(System.currentTimeMillis()))
.setEndEpochNanos(MILLISECONDS.toNanos(System.currentTimeMillis()))
.setKind(SpanKind.SERVER)
.setStatus(StatusData.error())
.setTotalRecordedEvents(0)
.setTotalRecordedLinks(0)
.build());
private static final List<MetricData> METRIC_DATA =
Lists.newArrayList(
MetricData.createLongSum(
Resource.empty(),
InstrumentationLibraryInfo.empty(),
"metric_name",
"metric_description",
"ms",
LongSumData.create(
false,
AggregationTemporality.CUMULATIVE,
Collections.singletonList(
LongPointData.create(
MILLISECONDS.toNanos(System.currentTimeMillis()),
MILLISECONDS.toNanos(System.currentTimeMillis()),
Attributes.of(stringKey("key"), "value"),
10)))));
@RegisterExtension
@Order(1)
@ -69,60 +81,16 @@ class OtlpGrpcConfigTest {
@RegisterExtension
@Order(2)
public static final ServerExtension server =
new ServerExtension() {
@Override
protected void configure(ServerBuilder sb) {
sb.service(
GrpcService.builder()
// OTLP spans
.addService(
new TraceServiceGrpc.TraceServiceImplBase() {
@Override
public void export(
ExportTraceServiceRequest request,
StreamObserver<ExportTraceServiceResponse> responseObserver) {
traceRequests.add(request);
responseObserver.onNext(ExportTraceServiceResponse.getDefaultInstance());
responseObserver.onCompleted();
}
})
// OTLP metrics
.addService(
new MetricsServiceGrpc.MetricsServiceImplBase() {
@Override
public void export(
ExportMetricsServiceRequest request,
StreamObserver<ExportMetricsServiceResponse> responseObserver) {
if (request.getResourceMetricsCount() > 0) {
metricRequests.add(request);
}
responseObserver.onNext(
ExportMetricsServiceResponse.getDefaultInstance());
responseObserver.onCompleted();
}
})
.useBlockingTaskExecutor(true)
.build());
sb.decorator(
(delegate, ctx, req) -> {
requestHeaders.add(req.headers());
return delegate.serve(ctx, req);
});
sb.tls(certificate.certificateFile(), certificate.privateKeyFile());
}
};
public static final OtlpGrpcServerExtension server = new OtlpGrpcServerExtension(certificate);
@BeforeEach
void setUp() {
traceRequests.clear();
metricRequests.clear();
requestHeaders.clear();
GlobalOpenTelemetry.resetForTest();
}
@AfterEach
public void tearDown() {
server.reset();
GlobalOpenTelemetry.resetForTest();
}
@ -143,14 +111,9 @@ class OtlpGrpcConfigTest {
assertThat(spanExporter)
.extracting("delegate.timeoutNanos")
.isEqualTo(TimeUnit.SECONDS.toNanos(15));
assertThat(
spanExporter
.export(Lists.newArrayList(generateFakeSpan()))
.join(15, TimeUnit.SECONDS)
.isSuccess())
.isTrue();
assertThat(traceRequests).hasSize(1);
assertThat(requestHeaders)
assertThat(spanExporter.export(SPAN_DATA).join(15, TimeUnit.SECONDS).isSuccess()).isTrue();
assertThat(server.traceRequests).hasSize(1);
assertThat(server.requestHeaders)
.anyMatch(
headers ->
headers.contains(
@ -161,14 +124,9 @@ class OtlpGrpcConfigTest {
assertThat(metricExporter)
.extracting("delegate.timeoutNanos")
.isEqualTo(TimeUnit.SECONDS.toNanos(15));
assertThat(
metricExporter
.export(Lists.newArrayList(generateFakeMetric()))
.join(15, TimeUnit.SECONDS)
.isSuccess())
.isTrue();
assertThat(metricRequests).hasSize(1);
assertThat(requestHeaders)
assertThat(metricExporter.export(METRIC_DATA).join(15, TimeUnit.SECONDS).isSuccess()).isTrue();
assertThat(server.metricRequests).hasSize(1);
assertThat(server.requestHeaders)
.anyMatch(
headers ->
headers.contains(
@ -200,14 +158,9 @@ class OtlpGrpcConfigTest {
assertThat(spanExporter)
.extracting("delegate.timeoutNanos")
.isEqualTo(TimeUnit.SECONDS.toNanos(15));
assertThat(
spanExporter
.export(Lists.newArrayList(generateFakeSpan()))
.join(10, TimeUnit.SECONDS)
.isSuccess())
.isTrue();
assertThat(traceRequests).hasSize(1);
assertThat(requestHeaders)
assertThat(spanExporter.export(SPAN_DATA).join(10, TimeUnit.SECONDS).isSuccess()).isTrue();
assertThat(server.traceRequests).hasSize(1);
assertThat(server.requestHeaders)
.anyMatch(
headers ->
headers.contains(
@ -238,14 +191,9 @@ class OtlpGrpcConfigTest {
assertThat(metricExporter)
.extracting("delegate.timeoutNanos")
.isEqualTo(TimeUnit.SECONDS.toNanos(15));
assertThat(
metricExporter
.export(Lists.newArrayList(generateFakeMetric()))
.join(15, TimeUnit.SECONDS)
.isSuccess())
.isTrue();
assertThat(metricRequests).hasSize(1);
assertThat(requestHeaders)
assertThat(metricExporter.export(METRIC_DATA).join(15, TimeUnit.SECONDS).isSuccess()).isTrue();
assertThat(server.metricRequests).hasSize(1);
assertThat(server.requestHeaders)
.anyMatch(
headers ->
headers.contains(
@ -275,37 +223,6 @@ class OtlpGrpcConfigTest {
.hasMessageContaining("Invalid OTLP certificate path:");
}
private static SpanData generateFakeSpan() {
return TestSpanData.builder()
.setHasEnded(true)
.setName("name")
.setStartEpochNanos(MILLISECONDS.toNanos(System.currentTimeMillis()))
.setEndEpochNanos(MILLISECONDS.toNanos(System.currentTimeMillis()))
.setKind(SpanKind.SERVER)
.setStatus(StatusData.error())
.setTotalRecordedEvents(0)
.setTotalRecordedLinks(0)
.build();
}
private static MetricData generateFakeMetric() {
return MetricData.createLongSum(
Resource.empty(),
InstrumentationLibraryInfo.empty(),
"metric_name",
"metric_description",
"ms",
LongSumData.create(
false,
AggregationTemporality.CUMULATIVE,
Collections.singletonList(
LongPointData.create(
MILLISECONDS.toNanos(System.currentTimeMillis()),
MILLISECONDS.toNanos(System.currentTimeMillis()),
Attributes.of(stringKey("key"), "value"),
10))));
}
@Test
void configuresGlobal() {
System.setProperty("otel.exporter.otlp.endpoint", "https://localhost:" + server.httpsPort());
@ -318,12 +235,12 @@ class OtlpGrpcConfigTest {
await()
.untilAsserted(
() -> {
assertThat(traceRequests).hasSize(1);
assertThat(server.traceRequests).hasSize(1);
// Not well defined how many metric exports would have happened by now, check that
// any did. Metrics are recorded by OtlpGrpcSpanExporter, BatchSpanProcessor, and
// potentially others.
assertThat(metricRequests).isNotEmpty();
assertThat(server.metricRequests).isNotEmpty();
});
}
}

View File

@ -0,0 +1,160 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.autoconfigure;
import static io.opentelemetry.api.common.AttributeKey.stringKey;
import static io.opentelemetry.exporter.otlp.internal.grpc.ManagedChannelUtil.retryableStatusCodes;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import com.google.common.collect.Lists;
import com.linecorp.armeria.testing.junit5.server.SelfSignedCertificateExtension;
import io.grpc.Status;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.exporter.otlp.internal.RetryPolicy;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.LongPointData;
import io.opentelemetry.sdk.metrics.data.LongSumData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.testing.trace.TestSpanData;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.data.StatusData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
class OtlpGrpcRetryTest {
private static final List<SpanData> SPAN_DATA =
Lists.newArrayList(
TestSpanData.builder()
.setHasEnded(true)
.setName("name")
.setStartEpochNanos(MILLISECONDS.toNanos(System.currentTimeMillis()))
.setEndEpochNanos(MILLISECONDS.toNanos(System.currentTimeMillis()))
.setKind(SpanKind.SERVER)
.setStatus(StatusData.error())
.setTotalRecordedEvents(0)
.setTotalRecordedLinks(0)
.build());
private static final List<MetricData> METRIC_DATA =
Lists.newArrayList(
MetricData.createLongSum(
Resource.empty(),
InstrumentationLibraryInfo.empty(),
"metric_name",
"metric_description",
"ms",
LongSumData.create(
false,
AggregationTemporality.CUMULATIVE,
Collections.singletonList(
LongPointData.create(
MILLISECONDS.toNanos(System.currentTimeMillis()),
MILLISECONDS.toNanos(System.currentTimeMillis()),
Attributes.of(stringKey("key"), "value"),
10)))));
@RegisterExtension
@Order(1)
public static final SelfSignedCertificateExtension certificate =
new SelfSignedCertificateExtension();
@RegisterExtension
@Order(2)
public static final OtlpGrpcServerExtension server = new OtlpGrpcServerExtension(certificate);
@Test
void configureSpanExporterRetryPolicy() {
Map<String, String> props = new HashMap<>();
props.put("otel.exporter.otlp.traces.endpoint", "https://localhost:" + server.httpsPort());
props.put(
"otel.exporter.otlp.traces.certificate", certificate.certificateFile().getAbsolutePath());
props.put("otel.experimental.exporter.otlp.retry.enabled", "true");
SpanExporter spanExporter =
SpanExporterConfiguration.configureExporter(
"otlp", DefaultConfigProperties.createForTest(props), Collections.emptyMap());
testRetryableStatusCodes(() -> SPAN_DATA, spanExporter::export, server.traceRequests::size);
testDefaultRetryPolicy(() -> SPAN_DATA, spanExporter::export, server.traceRequests::size);
}
@Test
void configureMetricExporterRetryPolicy() {
Map<String, String> props = new HashMap<>();
props.put("otel.exporter.otlp.metrics.endpoint", "https://localhost:" + server.httpsPort());
props.put(
"otel.exporter.otlp.metrics.certificate", certificate.certificateFile().getAbsolutePath());
props.put("otel.experimental.exporter.otlp.retry.enabled", "true");
MetricExporter metricExporter =
MetricExporterConfiguration.configureOtlpMetrics(
DefaultConfigProperties.createForTest(props), SdkMeterProvider.builder());
testRetryableStatusCodes(
() -> METRIC_DATA, metricExporter::export, server.metricRequests::size);
testDefaultRetryPolicy(() -> METRIC_DATA, metricExporter::export, server.metricRequests::size);
}
private static <T> void testRetryableStatusCodes(
Supplier<T> dataSupplier,
Function<T, CompletableResultCode> exporter,
Supplier<Integer> serverRequestCountSupplier) {
for (Status.Code code : Status.Code.values()) {
server.reset();
server.responseStatuses.add(Status.fromCode(code));
server.responseStatuses.add(Status.OK);
CompletableResultCode resultCode =
exporter.apply(dataSupplier.get()).join(10, TimeUnit.SECONDS);
boolean retryable = retryableStatusCodes().contains(code);
boolean expectedResult = retryable || code == Status.Code.OK;
assertThat(resultCode.isSuccess())
.as(
"status code %s should export %s",
code, expectedResult ? "successfully" : "unsuccessfully")
.isEqualTo(expectedResult);
int expectedRequests = retryable ? 2 : 1;
assertThat(serverRequestCountSupplier.get())
.as("status code %s should make %s requests", code, expectedRequests)
.isEqualTo(expectedRequests);
}
}
private static <T> void testDefaultRetryPolicy(
Supplier<T> dataSupplier,
Function<T, CompletableResultCode> exporter,
Supplier<Integer> serverRequestCountSupplier) {
server.reset();
// Set the server to fail with a retryable status code for the max attempts
int maxAttempts = RetryPolicy.getDefault().getMaxAttempts();
Status.Code retryableCode = retryableStatusCodes().get(0);
for (int i = 0; i < maxAttempts; i++) {
server.responseStatuses.add(Status.fromCode(retryableCode));
}
// Result should be failure, sever should have received maxAttempts requests
CompletableResultCode resultCode =
exporter.apply(dataSupplier.get()).join(10, TimeUnit.SECONDS);
assertThat(resultCode.isSuccess()).isFalse();
assertThat(serverRequestCountSupplier.get()).isEqualTo(maxAttempts);
}
}

View File

@ -0,0 +1,97 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.autoconfigure;
import com.linecorp.armeria.common.RequestHeaders;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.server.grpc.GrpcService;
import com.linecorp.armeria.testing.junit5.server.SelfSignedCertificateExtension;
import com.linecorp.armeria.testing.junit5.server.ServerExtension;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse;
import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse;
import io.opentelemetry.proto.collector.trace.v1.TraceServiceGrpc;
import java.util.ArrayDeque;
import java.util.Queue;
class OtlpGrpcServerExtension extends ServerExtension {
final Queue<ExportTraceServiceRequest> traceRequests = new ArrayDeque<>();
final Queue<ExportMetricsServiceRequest> metricRequests = new ArrayDeque<>();
final Queue<Status> responseStatuses = new ArrayDeque<>();
final Queue<RequestHeaders> requestHeaders = new ArrayDeque<>();
private final SelfSignedCertificateExtension certificate;
OtlpGrpcServerExtension(SelfSignedCertificateExtension certificate) {
this.certificate = certificate;
}
@Override
protected void configure(ServerBuilder sb) {
sb.service(
GrpcService.builder()
.addService(
new TraceServiceGrpc.TraceServiceImplBase() {
@Override
public void export(
ExportTraceServiceRequest request,
StreamObserver<ExportTraceServiceResponse> responseObserver) {
exportHelper(
traceRequests,
ExportTraceServiceResponse.getDefaultInstance(),
request,
responseObserver);
}
})
.addService(
new MetricsServiceGrpc.MetricsServiceImplBase() {
@Override
public void export(
ExportMetricsServiceRequest request,
StreamObserver<ExportMetricsServiceResponse> responseObserver) {
exportHelper(
metricRequests,
ExportMetricsServiceResponse.getDefaultInstance(),
request,
responseObserver);
}
})
.useBlockingTaskExecutor(true)
.build());
sb.decorator(
(delegate, ctx, req) -> {
requestHeaders.add(req.headers());
return delegate.serve(ctx, req);
});
sb.tls(certificate.certificateFile(), certificate.privateKeyFile());
}
private <RequestT, ResponseT> void exportHelper(
Queue<RequestT> requests,
ResponseT defaultResponse,
RequestT request,
StreamObserver<ResponseT> responseObserver) {
requests.add(request);
Status responseStatus = responseStatuses.peek() != null ? responseStatuses.poll() : Status.OK;
if (responseStatus.isOk()) {
responseObserver.onNext(defaultResponse);
responseObserver.onCompleted();
return;
}
responseObserver.onError(responseStatus.asRuntimeException());
}
void reset() {
traceRequests.clear();
metricRequests.clear();
requestHeaders.clear();
responseStatuses.clear();
}
}