Use minimal fallback managed channel when none is specified (#6110)

This commit is contained in:
jack-berg 2024-01-04 09:36:03 -06:00 committed by GitHub
parent b4ed53211b
commit da7796b3b5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 75 additions and 10 deletions

View File

@ -85,6 +85,7 @@ public class OltpExporterBenchmark {
"span",
new UpstreamGrpcSender<>(
MarshalerTraceServiceGrpc.newFutureStub(defaultGrpcChannel, null),
/* shutdownChannel= */ false,
10,
Collections::emptyMap),
MeterProvider::noop);

View File

@ -5,6 +5,7 @@
package io.opentelemetry.exporter.otlp.testing.internal;
import static org.assertj.core.api.Assertions.as;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@ -23,8 +24,10 @@ import com.linecorp.armeria.server.logging.LoggingService;
import com.linecorp.armeria.testing.junit5.server.SelfSignedCertificateExtension;
import com.linecorp.armeria.testing.junit5.server.ServerExtension;
import io.github.netmikey.logunit.api.LogCapturer;
import io.grpc.ManagedChannel;
import io.opentelemetry.exporter.internal.TlsUtil;
import io.opentelemetry.exporter.internal.grpc.GrpcExporter;
import io.opentelemetry.exporter.internal.grpc.MarshalerServiceStub;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.internal.testing.slf4j.SuppressLogger;
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest;
@ -61,6 +64,7 @@ import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509KeyManager;
import javax.net.ssl.X509TrustManager;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.assertj.core.api.iterable.ThrowingExtractor;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
@ -215,6 +219,23 @@ public abstract class AbstractGrpcTelemetryExporterTest<T, U extends Message> {
httpRequests.clear();
}
@Test
void minimalChannel() {
// Test that UpstreamGrpcSender uses minimal fallback managed channel, so skip for
// OkHttpGrpcSender
assumeThat(exporter.unwrap())
.extracting("delegate.grpcSender")
.matches(sender -> sender.getClass().getSimpleName().equals("UpstreamGrpcSender"));
// When no channel is explicitly set, should fall back to a minimally configured managed channel
TelemetryExporter<?> exporter = exporterBuilder().build();
assertThat(exporter.shutdown().join(10, TimeUnit.SECONDS).isSuccess()).isTrue();
assertThat(exporter.unwrap())
.extracting(
"delegate.grpcSender.stub",
as(InstanceOfAssertFactories.type(MarshalerServiceStub.class)))
.satisfies(stub -> assertThat(((ManagedChannel) stub.getChannel()).isShutdown()).isTrue());
}
@Test
void export() {
List<T> telemetry = Collections.singletonList(generateFakeTelemetry());

View File

@ -158,16 +158,21 @@ public final class ManagedChannelTelemetryExporterBuilder<T>
@Override
public TelemetryExporter<T> build() {
requireNonNull(channelBuilder, "channel");
Runnable shutdownCallback;
if (channelBuilder != null) {
try {
setSslContext(channelBuilder, tlsConfigHelper);
} catch (SSLException e) {
throw new IllegalStateException(e);
}
try {
setSslContext(channelBuilder, tlsConfigHelper);
} catch (SSLException e) {
throw new IllegalStateException(e);
ManagedChannel channel = channelBuilder.build();
delegate.setChannel(channel);
shutdownCallback = channel::shutdownNow;
} else {
shutdownCallback = () -> {};
}
ManagedChannel channel = channelBuilder.build();
delegate.setChannel(channel);
TelemetryExporter<T> delegateExporter = delegate.build();
return new TelemetryExporter<T>() {
@Override
@ -182,7 +187,7 @@ public final class ManagedChannelTelemetryExporterBuilder<T>
@Override
public CompletableResultCode shutdown() {
channel.shutdownNow();
shutdownCallback.run();
return delegateExporter.shutdown();
}
};

View File

@ -8,6 +8,7 @@ package io.opentelemetry.exporter.sender.grpc.managedchannel.internal;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.stub.MetadataUtils;
@ -32,16 +33,19 @@ import org.checkerframework.checker.nullness.qual.Nullable;
public final class UpstreamGrpcSender<T extends Marshaler> implements GrpcSender<T> {
private final MarshalerServiceStub<T, ?, ?> stub;
private final boolean shutdownChannel;
private final long timeoutNanos;
private final Supplier<Map<String, List<String>>> headersSupplier;
/** Creates a new {@link UpstreamGrpcSender}. */
public UpstreamGrpcSender(
MarshalerServiceStub<T, ?, ?> stub,
boolean shutdownChannel,
long timeoutNanos,
Supplier<Map<String, List<String>>> headersSupplier) {
this.timeoutNanos = timeoutNanos;
this.stub = stub;
this.shutdownChannel = shutdownChannel;
this.timeoutNanos = timeoutNanos;
this.headersSupplier = headersSupplier;
}
@ -82,6 +86,10 @@ public final class UpstreamGrpcSender<T extends Marshaler> implements GrpcSender
@Override
public CompletableResultCode shutdown() {
if (shutdownChannel) {
ManagedChannel channel = (ManagedChannel) stub.getChannel();
channel.shutdownNow();
}
return CompletableResultCode.ofSuccess();
}
}

View File

@ -7,6 +7,8 @@ package io.opentelemetry.exporter.sender.grpc.managedchannel.internal;
import io.grpc.Channel;
import io.grpc.Codec;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.opentelemetry.exporter.internal.grpc.GrpcSender;
import io.opentelemetry.exporter.internal.grpc.GrpcSenderProvider;
import io.opentelemetry.exporter.internal.grpc.MarshalerServiceStub;
@ -41,6 +43,13 @@ public class UpstreamGrpcSenderProvider implements GrpcSenderProvider {
@Nullable RetryPolicy retryPolicy,
@Nullable SSLContext sslContext,
@Nullable X509TrustManager trustManager) {
boolean shutdownChannel = false;
if (managedChannel == null) {
// Shutdown the channel as part of the exporter shutdown sequence if
shutdownChannel = true;
managedChannel = minimalFallbackManagedChannel(endpoint);
}
String authorityOverride = null;
Map<String, List<String>> headers = headersSupplier.get();
if (headers != null) {
@ -58,6 +67,27 @@ public class UpstreamGrpcSenderProvider implements GrpcSenderProvider {
.apply((Channel) managedChannel, authorityOverride)
.withCompression(codec.getMessageEncoding());
return new UpstreamGrpcSender<>(stub, timeoutNanos, headersSupplier);
return new UpstreamGrpcSender<>(stub, shutdownChannel, timeoutNanos, headersSupplier);
}
/**
* If {@link ManagedChannel} is not explicitly set, provide a minimally configured fallback
* channel to avoid failing initialization.
*
* <p>This is required to accommodate autoconfigure with {@code
* opentelemetry-exporter-sender-grpc-managed-channel} which will always fail to initialize
* without a fallback channel since there isn't an opportunity to explicitly set the channel.
*
* <p>This only incorporates the target address, port, and whether to use plain text. All
* additional settings are intentionally ignored and must be configured with an explicitly set
* {@link ManagedChannel}.
*/
private static ManagedChannel minimalFallbackManagedChannel(URI endpoint) {
ManagedChannelBuilder<?> channelBuilder =
ManagedChannelBuilder.forAddress(endpoint.getHost(), endpoint.getPort());
if (!endpoint.getScheme().equals("https")) {
channelBuilder.usePlaintext();
}
return channelBuilder.build();
}
}