diff --git a/xds/src/main/java/io/grpc/xds/XdsClient.java b/xds/src/main/java/io/grpc/xds/XdsClient.java index 925ed2de35..7cdf6c85cc 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClient.java +++ b/xds/src/main/java/io/grpc/xds/XdsClient.java @@ -16,14 +16,21 @@ package io.grpc.xds; +import static com.google.common.base.Preconditions.checkArgument; + import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; // TODO(sanjaypujare): remove dependency on envoy data types. import io.envoyproxy.envoy.api.v2.auth.UpstreamTlsContext; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; import io.grpc.Status; +import io.grpc.alts.GoogleDefaultChannelBuilder; import io.grpc.internal.ObjectPool; +import io.grpc.xds.Bootstrapper.ChannelCreds; +import io.grpc.xds.Bootstrapper.ServerInfo; import io.grpc.xds.EnvoyProtoData.DropOverload; import io.grpc.xds.EnvoyProtoData.Locality; import io.grpc.xds.EnvoyProtoData.LocalityLbEndpoints; @@ -468,4 +475,45 @@ abstract class XdsClient { return null; } } + + /** + * Factory for creating channels to xDS severs. + */ + abstract static class XdsChannelFactory { + private static XdsChannelFactory DEFAULT_INSTANCE = new XdsChannelFactory() { + + /** + * Creates a channel to the first server in the given list. + */ + @Override + ManagedChannel createChannel(List servers) { + checkArgument(!servers.isEmpty(), "No management server provided."); + ServerInfo serverInfo = servers.get(0); + String serverUri = serverInfo.getServerUri(); + List channelCredsList = serverInfo.getChannelCredentials(); + ManagedChannel ch = null; + // Use the first supported channel credentials configuration. + // Currently, only "google_default" is supported. + for (ChannelCreds creds : channelCredsList) { + if (creds.getType().equals("google_default")) { + ch = GoogleDefaultChannelBuilder.forTarget(serverUri).build(); + break; + } + } + if (ch == null) { + ch = ManagedChannelBuilder.forTarget(serverUri).build(); + } + return ch; + } + }; + + static XdsChannelFactory getInstance() { + return DEFAULT_INSTANCE; + } + + /** + * Creates a channel to one of the provided management servers. + */ + abstract ManagedChannel createChannel(List servers); + } } diff --git a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java index c18e92921a..6460ccae21 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java +++ b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java @@ -41,14 +41,11 @@ import io.envoyproxy.envoy.config.filter.network.http_connection_manager.v2.Http import io.envoyproxy.envoy.config.filter.network.http_connection_manager.v2.Rds; import io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc; import io.grpc.ManagedChannel; -import io.grpc.ManagedChannelBuilder; import io.grpc.Status; import io.grpc.SynchronizationContext; import io.grpc.SynchronizationContext.ScheduledHandle; -import io.grpc.alts.GoogleDefaultChannelBuilder; import io.grpc.internal.BackoffPolicy; import io.grpc.stub.StreamObserver; -import io.grpc.xds.Bootstrapper.ChannelCreds; import io.grpc.xds.Bootstrapper.ServerInfo; import io.grpc.xds.EnvoyProtoData.DropOverload; import io.grpc.xds.EnvoyProtoData.Locality; @@ -137,29 +134,15 @@ final class XdsClientImpl extends XdsClient { XdsClientImpl( List servers, // list of management servers + XdsChannelFactory channelFactory, Node node, SynchronizationContext syncContext, ScheduledExecutorService timeService, BackoffPolicy.Provider backoffPolicyProvider, Stopwatch stopwatch) { - this( - buildChannel(checkNotNull(servers, "servers")), - node, - syncContext, - timeService, - backoffPolicyProvider, - stopwatch); - } - - @VisibleForTesting - XdsClientImpl( - ManagedChannel channel, - Node node, - SynchronizationContext syncContext, - ScheduledExecutorService timeService, - BackoffPolicy.Provider backoffPolicyProvider, - Stopwatch stopwatch) { - this.channel = checkNotNull(channel, "channel"); + this.channel = + checkNotNull(channelFactory, "channelFactory") + .createChannel(checkNotNull(servers, "servers")); this.node = checkNotNull(node, "node"); this.syncContext = checkNotNull(syncContext, "syncContext"); this.timeService = checkNotNull(timeService, "timeService"); @@ -314,31 +297,6 @@ final class XdsClientImpl extends XdsClient { } } - /** - * Builds a channel to one of the provided management servers. - * - *

Note: currently we only support using the first server. - */ - private static ManagedChannel buildChannel(List servers) { - checkArgument(!servers.isEmpty(), "No management server provided."); - ServerInfo serverInfo = servers.get(0); - String serverUri = serverInfo.getServerUri(); - List channelCredsList = serverInfo.getChannelCredentials(); - ManagedChannel ch = null; - // Use the first supported channel credentials configuration. - // Currently, only "google_default" is supported. - for (ChannelCreds creds : channelCredsList) { - if (creds.getType().equals("google_default")) { - ch = GoogleDefaultChannelBuilder.forTarget(serverUri).build(); - break; - } - } - if (ch == null) { - ch = ManagedChannelBuilder.forTarget(serverUri).build(); - } - return ch; - } - /** * Establishes the RPC connection by creating a new RPC stream on the given channel for * xDS protocol communication. diff --git a/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java b/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java index e821af17e4..4efaad98aa 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientImplTest.java @@ -30,6 +30,7 @@ import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.Any; import com.google.protobuf.UInt32Value; @@ -79,6 +80,8 @@ import io.grpc.internal.BackoffPolicy; import io.grpc.internal.FakeClock; import io.grpc.stub.StreamObserver; import io.grpc.testing.GrpcCleanupRule; +import io.grpc.xds.Bootstrapper.ChannelCreds; +import io.grpc.xds.Bootstrapper.ServerInfo; import io.grpc.xds.EnvoyProtoData.DropOverload; import io.grpc.xds.EnvoyProtoData.LbEndpoint; import io.grpc.xds.EnvoyProtoData.Locality; @@ -89,6 +92,7 @@ import io.grpc.xds.XdsClient.ConfigUpdate; import io.grpc.xds.XdsClient.ConfigWatcher; import io.grpc.xds.XdsClient.EndpointUpdate; import io.grpc.xds.XdsClient.EndpointWatcher; +import io.grpc.xds.XdsClient.XdsChannelFactory; import java.io.IOException; import java.util.ArrayDeque; import java.util.HashSet; @@ -169,7 +173,7 @@ public class XdsClientImplTest { when(backoffPolicy1.nextBackoffNanos()).thenReturn(10L, 100L); when(backoffPolicy2.nextBackoffNanos()).thenReturn(20L, 200L); - String serverName = InProcessServerBuilder.generateName(); + final String serverName = InProcessServerBuilder.generateName(); AggregatedDiscoveryServiceImplBase serviceImpl = new AggregatedDiscoveryServiceImplBase() { @Override public StreamObserver streamAggregatedResources( @@ -202,8 +206,20 @@ public class XdsClientImplTest { .start()); channel = cleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build()); + + List servers = + ImmutableList.of(new ServerInfo(serverName, ImmutableList.of())); + XdsChannelFactory channelFactory = new XdsChannelFactory() { + @Override + ManagedChannel createChannel(List servers) { + assertThat(Iterables.getOnlyElement(servers).getServerUri()).isEqualTo(serverName); + assertThat(Iterables.getOnlyElement(servers).getChannelCredentials()).isEmpty(); + return channel; + } + }; + xdsClient = - new XdsClientImpl(channel, NODE, syncContext, + new XdsClientImpl(servers, channelFactory, NODE, syncContext, fakeClock.getScheduledExecutorService(), backoffPolicyProvider, fakeClock.getStopwatchSupplier().get()); // Only the connection to management server is established, no RPC request is sent until at