Manual cherrypick for commits 8bbb0e25da, b2873186e3 (#4104)

This commit is contained in:
jack-berg 2022-01-21 08:35:29 -06:00 committed by GitHub
parent 89675582a1
commit 3938d96ce2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 180 additions and 47 deletions

View File

@ -43,6 +43,7 @@ dependencies {
testImplementation("com.linecorp.armeria:armeria-junit5")
testImplementation("io.opentelemetry.proto:opentelemetry-proto")
testImplementation("org.skyscreamer:jsonassert")
testImplementation("org.assertj:assertj-core")
testImplementation("com.google.api.grpc:proto-google-common-protos")
testImplementation("io.grpc:grpc-testing")

View File

@ -16,7 +16,6 @@ import io.grpc.stub.MetadataUtils;
import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.exporter.otlp.internal.Marshaler;
import io.opentelemetry.exporter.otlp.internal.retry.RetryPolicy;
import java.lang.reflect.Field;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
@ -44,7 +43,7 @@ public final class DefaultGrpcExporterBuilder<T extends Marshaler>
private boolean compressionEnabled = false;
@Nullable private Metadata metadata;
@Nullable private byte[] trustedCertificatesPem;
@Nullable private RetryPolicy retryPolicy;
@Nullable RetryPolicy retryPolicy;
private MeterProvider meterProvider = MeterProvider.noop();
/** Creates a new {@link DefaultGrpcExporterBuilder}. */
@ -173,25 +172,4 @@ public final class DefaultGrpcExporterBuilder<T extends Marshaler>
return new DefaultGrpcExporter<>(
type, channel, stub, meterProvider, timeoutNanos, compressionEnabled);
}
/**
* Reflectively access a {@link DefaultGrpcExporterBuilder} instance in field called "delegate" of
* the instance.
*
* @throws IllegalArgumentException if the instance does not contain a field called "delegate" of
* type {@link DefaultGrpcExporterBuilder}
*/
public static <T> DefaultGrpcExporterBuilder<?> getDelegateBuilder(Class<T> type, T instance) {
try {
Field field = type.getDeclaredField("delegate");
field.setAccessible(true);
Object value = field.get(instance);
if (!(value instanceof DefaultGrpcExporterBuilder)) {
throw new IllegalArgumentException("delegate field is not type DefaultGrpcExporterBuilder");
}
return (DefaultGrpcExporterBuilder<?>) value;
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new IllegalArgumentException("Unable to access delegate reflectively.", e);
}
}
}

View File

@ -5,7 +5,10 @@
package io.opentelemetry.exporter.otlp.internal.retry;
import io.opentelemetry.exporter.otlp.internal.grpc.DefaultGrpcExporterBuilder;
import io.opentelemetry.exporter.otlp.internal.grpc.GrpcStatusUtil;
import io.opentelemetry.exporter.otlp.internal.grpc.OkHttpGrpcExporterBuilder;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
@ -44,4 +47,29 @@ public class RetryUtil {
public static Set<Integer> retryableHttpResponseCodes() {
return RETRYABLE_HTTP_STATUS_CODES;
}
/**
* Reflectively access a {@link DefaultGrpcExporterBuilder} or {@link OkHttpGrpcExporterBuilder}
* instance in field called "delegate" of the instance, and set the {@link RetryPolicy}.
*
* @throws IllegalArgumentException if the instance does not contain a field called "delegate" of
* type {@link DefaultGrpcExporterBuilder}
*/
public static void setRetryPolicyOnDelegate(Object instance, RetryPolicy retryPolicy) {
try {
Field field = instance.getClass().getDeclaredField("delegate");
field.setAccessible(true);
Object value = field.get(instance);
if (value instanceof DefaultGrpcExporterBuilder) {
((DefaultGrpcExporterBuilder<?>) value).setRetryPolicy(retryPolicy);
} else if (value instanceof OkHttpGrpcExporterBuilder) {
((OkHttpGrpcExporterBuilder<?>) value).setRetryPolicy(retryPolicy);
} else {
throw new IllegalArgumentException(
"delegate field is not type DefaultGrpcExporterBuilder or OkHttpGrpcExporterBuilder");
}
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new IllegalArgumentException("Unable to access delegate reflectively.", e);
}
}
}

View File

@ -0,0 +1,69 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.exporter.otlp.internal.retry;
import static org.assertj.core.api.Assertions.as;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import io.opentelemetry.exporter.otlp.internal.grpc.DefaultGrpcExporterBuilder;
import io.opentelemetry.exporter.otlp.internal.grpc.OkHttpGrpcExporterBuilder;
import java.net.URI;
import java.net.URISyntaxException;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.jupiter.api.Test;
class RetryUtilTest {
@Test
void setRetryPolicyOnDelegate_DefaultGrpcExporterBuilder() throws URISyntaxException {
RetryPolicy retryPolicy = RetryPolicy.getDefault();
DefaultGrpcExporterBuilder<?> builder =
new DefaultGrpcExporterBuilder<>(
"test", unused -> null, 0, new URI("http://localhost"), "test");
RetryUtil.setRetryPolicyOnDelegate(new WithDelegate(builder), retryPolicy);
assertThat(builder)
.extracting("retryPolicy", as(InstanceOfAssertFactories.type(RetryPolicy.class)))
.isEqualTo(retryPolicy);
}
@Test
void setRetryPolicyOnDelegate_OkHttpGrpcExporterBuilder() throws URISyntaxException {
RetryPolicy retryPolicy = RetryPolicy.getDefault();
OkHttpGrpcExporterBuilder<?> builder =
new OkHttpGrpcExporterBuilder<>("test", "/test", 0, new URI("http://localhost"));
RetryUtil.setRetryPolicyOnDelegate(new WithDelegate(builder), retryPolicy);
assertThat(builder)
.extracting("retryPolicy", as(InstanceOfAssertFactories.type(RetryPolicy.class)))
.isEqualTo(retryPolicy);
}
@Test
void setRetryPolicyOnDelegate_InvalidUsage() {
assertThatThrownBy(
() -> RetryUtil.setRetryPolicyOnDelegate(new Object(), RetryPolicy.getDefault()))
.hasMessageContaining("Unable to access delegate reflectively");
assertThatThrownBy(
() ->
RetryUtil.setRetryPolicyOnDelegate(
new WithDelegate(new Object()), RetryPolicy.getDefault()))
.hasMessageContaining(
"delegate field is not type DefaultGrpcExporterBuilder or OkHttpGrpcExporterBuilder");
}
@SuppressWarnings({"UnusedVariable", "FieldCanBeLocal"})
private static class WithDelegate {
private final Object delegate;
private WithDelegate(Object delegate) {
this.delegate = delegate;
}
}
}

View File

@ -6,12 +6,14 @@
package io.opentelemetry.exporter.otlp.logs;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.exporter.otlp.internal.Marshaler;
import io.opentelemetry.exporter.otlp.internal.grpc.OkHttpGrpcExporterBuilder;
import io.opentelemetry.exporter.otlp.internal.logs.ResourceLogsMarshaler;
import io.opentelemetry.exporter.otlp.internal.retry.RetryPolicy;
import io.opentelemetry.exporter.otlp.internal.retry.RetryUtil;
import io.opentelemetry.exporter.otlp.testing.internal.AbstractGrpcTelemetryExporterTest;
import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporter;
import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporterBuilder;
@ -33,6 +35,15 @@ class OtlpGrpcLogExporterTest extends AbstractGrpcTelemetryExporterTest<LogData,
super("log", ResourceLogs.getDefaultInstance());
}
@Test
void testBuilderDelegate() {
assertThatCode(
() ->
RetryUtil.setRetryPolicyOnDelegate(
OtlpGrpcLogExporter.builder(), RetryPolicy.getDefault()))
.doesNotThrowAnyException();
}
@Test
void usingOkHttp() {
assertThat(OtlpGrpcLogExporter.builder().delegate)

View File

@ -13,6 +13,7 @@ import io.opentelemetry.exporter.otlp.internal.Marshaler;
import io.opentelemetry.exporter.otlp.internal.grpc.DefaultGrpcExporterBuilder;
import io.opentelemetry.exporter.otlp.internal.logs.ResourceLogsMarshaler;
import io.opentelemetry.exporter.otlp.internal.retry.RetryPolicy;
import io.opentelemetry.exporter.otlp.internal.retry.RetryUtil;
import io.opentelemetry.exporter.otlp.testing.internal.AbstractGrpcTelemetryExporterTest;
import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporter;
import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporterBuilder;
@ -39,9 +40,8 @@ class OtlpGrpcNettyLogExporterTest
void testBuilderDelegate() {
assertThatCode(
() ->
DefaultGrpcExporterBuilder.getDelegateBuilder(
OtlpGrpcLogExporterBuilder.class, OtlpGrpcLogExporter.builder())
.setRetryPolicy(RetryPolicy.getDefault()))
RetryUtil.setRetryPolicyOnDelegate(
OtlpGrpcLogExporter.builder(), RetryPolicy.getDefault()))
.doesNotThrowAnyException();
}

View File

@ -7,12 +7,14 @@ package io.opentelemetry.exporter.otlp.metrics;
import static io.opentelemetry.api.common.AttributeKey.stringKey;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.exporter.otlp.internal.Marshaler;
import io.opentelemetry.exporter.otlp.internal.grpc.OkHttpGrpcExporterBuilder;
import io.opentelemetry.exporter.otlp.internal.metrics.ResourceMetricsMarshaler;
import io.opentelemetry.exporter.otlp.internal.retry.RetryPolicy;
import io.opentelemetry.exporter.otlp.internal.retry.RetryUtil;
import io.opentelemetry.exporter.otlp.testing.internal.AbstractGrpcTelemetryExporterTest;
import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporter;
import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporterBuilder;
@ -36,6 +38,15 @@ class OtlpGrpcMetricExporterTest
super("metric", ResourceMetrics.getDefaultInstance());
}
@Test
void testBuilderDelegate() {
assertThatCode(
() ->
RetryUtil.setRetryPolicyOnDelegate(
OtlpGrpcMetricExporter.builder(), RetryPolicy.getDefault()))
.doesNotThrowAnyException();
}
@Test
void usingOkHttp() {
assertThat(OtlpGrpcMetricExporter.builder().delegate)

View File

@ -14,6 +14,7 @@ import io.opentelemetry.exporter.otlp.internal.Marshaler;
import io.opentelemetry.exporter.otlp.internal.grpc.DefaultGrpcExporterBuilder;
import io.opentelemetry.exporter.otlp.internal.metrics.ResourceMetricsMarshaler;
import io.opentelemetry.exporter.otlp.internal.retry.RetryPolicy;
import io.opentelemetry.exporter.otlp.internal.retry.RetryUtil;
import io.opentelemetry.exporter.otlp.testing.internal.AbstractGrpcTelemetryExporterTest;
import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporter;
import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporterBuilder;
@ -41,9 +42,8 @@ class OtlpGrpcNettyMetricExporterTest
void testBuilderDelegate() {
assertThatCode(
() ->
DefaultGrpcExporterBuilder.getDelegateBuilder(
OtlpGrpcMetricExporterBuilder.class, OtlpGrpcMetricExporter.builder())
.setRetryPolicy(RetryPolicy.getDefault()))
RetryUtil.setRetryPolicyOnDelegate(
OtlpGrpcMetricExporter.builder(), RetryPolicy.getDefault()))
.doesNotThrowAnyException();
}

View File

@ -6,6 +6,7 @@
package io.opentelemetry.exporter.otlp.trace;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.SpanKind;
@ -14,6 +15,7 @@ import io.opentelemetry.api.trace.TraceState;
import io.opentelemetry.exporter.otlp.internal.Marshaler;
import io.opentelemetry.exporter.otlp.internal.grpc.OkHttpGrpcExporterBuilder;
import io.opentelemetry.exporter.otlp.internal.retry.RetryPolicy;
import io.opentelemetry.exporter.otlp.internal.retry.RetryUtil;
import io.opentelemetry.exporter.otlp.internal.traces.ResourceSpansMarshaler;
import io.opentelemetry.exporter.otlp.testing.internal.AbstractGrpcTelemetryExporterTest;
import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporter;
@ -38,6 +40,15 @@ class OtlpGrpcSpanExporterTest extends AbstractGrpcTelemetryExporterTest<SpanDat
super("span", ResourceSpans.getDefaultInstance());
}
@Test
void testBuilderDelegate() {
assertThatCode(
() ->
RetryUtil.setRetryPolicyOnDelegate(
OtlpGrpcSpanExporter.builder(), RetryPolicy.getDefault()))
.doesNotThrowAnyException();
}
@Test
void usingOkHttp() {
assertThat(OtlpGrpcSpanExporter.builder().delegate)

View File

@ -15,6 +15,7 @@ import io.opentelemetry.api.trace.TraceState;
import io.opentelemetry.exporter.otlp.internal.Marshaler;
import io.opentelemetry.exporter.otlp.internal.grpc.DefaultGrpcExporterBuilder;
import io.opentelemetry.exporter.otlp.internal.retry.RetryPolicy;
import io.opentelemetry.exporter.otlp.internal.retry.RetryUtil;
import io.opentelemetry.exporter.otlp.internal.traces.ResourceSpansMarshaler;
import io.opentelemetry.exporter.otlp.testing.internal.AbstractGrpcTelemetryExporterTest;
import io.opentelemetry.exporter.otlp.testing.internal.TelemetryExporter;
@ -44,9 +45,8 @@ class OtlpGrpcNettySpanExporterTest
void builderDelegate() {
assertThatCode(
() ->
DefaultGrpcExporterBuilder.getDelegateBuilder(
OtlpGrpcSpanExporterBuilder.class, OtlpGrpcSpanExporter.builder())
.setRetryPolicy(RetryPolicy.getDefault()))
RetryUtil.setRetryPolicyOnDelegate(
OtlpGrpcSpanExporter.builder(), RetryPolicy.getDefault()))
.doesNotThrowAnyException();
}

View File

@ -25,13 +25,31 @@ import java.util.List;
*/
public final class PrometheusCollector extends Collector implements MetricReader {
private final MetricProducer metricProducer;
private volatile boolean registered = false;
PrometheusCollector(MetricProducer metricProducer) {
this.metricProducer = metricProducer;
}
/**
* This method is called in {@link Factory#apply(MetricProducer)}. {@link Collector#register()}
* triggers a call to {@link #collect()}, which throws an error because {@link
* MetricProducer#collectAllMetrics()} is not yet read to accept calls. To get around this, we
* have {@link #collect()} exit early until registration is complete.
*/
@SuppressWarnings("TypeParameterUnusedInFormals")
@Override
public <T extends Collector> T register() {
T result = super.register();
this.registered = true;
return result;
}
@Override
public List<MetricFamilySamples> collect() {
if (!registered) {
return Collections.emptyList();
}
Collection<MetricData> allMetrics = metricProducer.collectAllMetrics();
List<MetricFamilySamples> allSamples = new ArrayList<>(allMetrics.size());
for (MetricData metricData : allMetrics) {

View File

@ -7,11 +7,13 @@ package io.opentelemetry.exporter.prometheus;
import static io.opentelemetry.api.common.AttributeKey.stringKey;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableList;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.DoublePointData;
import io.opentelemetry.sdk.metrics.data.DoubleSumData;
@ -25,6 +27,7 @@ import io.prometheus.client.exporter.common.TextFormat;
import java.io.IOException;
import java.io.StringWriter;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@ -43,6 +46,18 @@ class PrometheusCollectorTest {
prometheusCollector.register();
}
@Test
void registerWithSdkMeterProvider() {
assertThatCode(
() ->
SdkMeterProvider.builder()
.registerMetricReader(PrometheusCollector.create())
.build()
.forceFlush()
.join(10, TimeUnit.SECONDS))
.doesNotThrowAnyException();
}
@Test
void registerToDefault() throws IOException {
when(metricProducer.collectAllMetrics()).thenReturn(generateTestData());

View File

@ -13,8 +13,8 @@ import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.exporter.logging.SystemOutLogExporter;
import io.opentelemetry.exporter.otlp.http.logs.OtlpHttpLogExporter;
import io.opentelemetry.exporter.otlp.http.logs.OtlpHttpLogExporterBuilder;
import io.opentelemetry.exporter.otlp.internal.grpc.DefaultGrpcExporterBuilder;
import io.opentelemetry.exporter.otlp.internal.okhttp.OkHttpExporterBuilder;
import io.opentelemetry.exporter.otlp.internal.retry.RetryUtil;
import io.opentelemetry.exporter.otlp.logs.OtlpGrpcLogExporter;
import io.opentelemetry.exporter.otlp.logs.OtlpGrpcLogExporterBuilder;
import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
@ -129,10 +129,7 @@ class LogExporterConfiguration {
builder::setCompression,
builder::setTimeout,
builder::setTrustedCertificates,
retryPolicy ->
DefaultGrpcExporterBuilder.getDelegateBuilder(
OtlpGrpcLogExporterBuilder.class, builder)
.setRetryPolicy(retryPolicy));
retryPolicy -> RetryUtil.setRetryPolicyOnDelegate(builder, retryPolicy));
builder.setMeterProvider(meterProvider);
return builder.build();

View File

@ -12,8 +12,8 @@ import static io.opentelemetry.sdk.autoconfigure.OtlpConfigUtil.PROTOCOL_HTTP_PR
import io.opentelemetry.exporter.logging.LoggingMetricExporter;
import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter;
import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporterBuilder;
import io.opentelemetry.exporter.otlp.internal.grpc.DefaultGrpcExporterBuilder;
import io.opentelemetry.exporter.otlp.internal.okhttp.OkHttpExporterBuilder;
import io.opentelemetry.exporter.otlp.internal.retry.RetryUtil;
import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter;
import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder;
import io.opentelemetry.exporter.prometheus.PrometheusHttpServer;
@ -134,10 +134,7 @@ final class MetricExporterConfiguration {
builder::setCompression,
builder::setTimeout,
builder::setTrustedCertificates,
retryPolicy ->
DefaultGrpcExporterBuilder.getDelegateBuilder(
OtlpGrpcMetricExporterBuilder.class, builder)
.setRetryPolicy(retryPolicy));
retryPolicy -> RetryUtil.setRetryPolicyOnDelegate(builder, retryPolicy));
OtlpConfigUtil.configureOtlpAggregationTemporality(config, builder::setPreferredTemporality);
exporter = builder.build();

View File

@ -16,8 +16,8 @@ import io.opentelemetry.exporter.jaeger.JaegerGrpcSpanExporterBuilder;
import io.opentelemetry.exporter.logging.LoggingSpanExporter;
import io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporter;
import io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporterBuilder;
import io.opentelemetry.exporter.otlp.internal.grpc.DefaultGrpcExporterBuilder;
import io.opentelemetry.exporter.otlp.internal.okhttp.OkHttpExporterBuilder;
import io.opentelemetry.exporter.otlp.internal.retry.RetryUtil;
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter;
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporterBuilder;
import io.opentelemetry.exporter.zipkin.ZipkinSpanExporter;
@ -150,10 +150,7 @@ final class SpanExporterConfiguration {
builder::setCompression,
builder::setTimeout,
builder::setTrustedCertificates,
retryPolicy ->
DefaultGrpcExporterBuilder.getDelegateBuilder(
OtlpGrpcSpanExporterBuilder.class, builder)
.setRetryPolicy(retryPolicy));
retryPolicy -> RetryUtil.setRetryPolicyOnDelegate(builder, retryPolicy));
builder.setMeterProvider(meterProvider);
return builder.build();