xds: extract channel creation out of XdsClient's implementation (#6494)

Introduce a `XdsChannelFactory` for gRPC components that instantiate `XdsClient` instances.
This commit is contained in:
Chengyuan Zhang 2019-12-08 19:10:12 -08:00 committed by GitHub
parent 30688a1eff
commit d3d977d096
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 70 additions and 48 deletions

View File

@ -16,14 +16,21 @@
package io.grpc.xds;
import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
// TODO(sanjaypujare): remove dependency on envoy data types.
import io.envoyproxy.envoy.api.v2.auth.UpstreamTlsContext;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Status;
import io.grpc.alts.GoogleDefaultChannelBuilder;
import io.grpc.internal.ObjectPool;
import io.grpc.xds.Bootstrapper.ChannelCreds;
import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.EnvoyProtoData.DropOverload;
import io.grpc.xds.EnvoyProtoData.Locality;
import io.grpc.xds.EnvoyProtoData.LocalityLbEndpoints;
@ -468,4 +475,45 @@ abstract class XdsClient {
return null;
}
}
/**
* Factory for creating channels to xDS severs.
*/
abstract static class XdsChannelFactory {
private static XdsChannelFactory DEFAULT_INSTANCE = new XdsChannelFactory() {
/**
* Creates a channel to the first server in the given list.
*/
@Override
ManagedChannel createChannel(List<ServerInfo> servers) {
checkArgument(!servers.isEmpty(), "No management server provided.");
ServerInfo serverInfo = servers.get(0);
String serverUri = serverInfo.getServerUri();
List<ChannelCreds> channelCredsList = serverInfo.getChannelCredentials();
ManagedChannel ch = null;
// Use the first supported channel credentials configuration.
// Currently, only "google_default" is supported.
for (ChannelCreds creds : channelCredsList) {
if (creds.getType().equals("google_default")) {
ch = GoogleDefaultChannelBuilder.forTarget(serverUri).build();
break;
}
}
if (ch == null) {
ch = ManagedChannelBuilder.forTarget(serverUri).build();
}
return ch;
}
};
static XdsChannelFactory getInstance() {
return DEFAULT_INSTANCE;
}
/**
* Creates a channel to one of the provided management servers.
*/
abstract ManagedChannel createChannel(List<ServerInfo> servers);
}
}

View File

@ -41,14 +41,11 @@ import io.envoyproxy.envoy.config.filter.network.http_connection_manager.v2.Http
import io.envoyproxy.envoy.config.filter.network.http_connection_manager.v2.Rds;
import io.envoyproxy.envoy.service.discovery.v2.AggregatedDiscoveryServiceGrpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.alts.GoogleDefaultChannelBuilder;
import io.grpc.internal.BackoffPolicy;
import io.grpc.stub.StreamObserver;
import io.grpc.xds.Bootstrapper.ChannelCreds;
import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.EnvoyProtoData.DropOverload;
import io.grpc.xds.EnvoyProtoData.Locality;
@ -137,29 +134,15 @@ final class XdsClientImpl extends XdsClient {
XdsClientImpl(
List<ServerInfo> servers, // list of management servers
XdsChannelFactory channelFactory,
Node node,
SynchronizationContext syncContext,
ScheduledExecutorService timeService,
BackoffPolicy.Provider backoffPolicyProvider,
Stopwatch stopwatch) {
this(
buildChannel(checkNotNull(servers, "servers")),
node,
syncContext,
timeService,
backoffPolicyProvider,
stopwatch);
}
@VisibleForTesting
XdsClientImpl(
ManagedChannel channel,
Node node,
SynchronizationContext syncContext,
ScheduledExecutorService timeService,
BackoffPolicy.Provider backoffPolicyProvider,
Stopwatch stopwatch) {
this.channel = checkNotNull(channel, "channel");
this.channel =
checkNotNull(channelFactory, "channelFactory")
.createChannel(checkNotNull(servers, "servers"));
this.node = checkNotNull(node, "node");
this.syncContext = checkNotNull(syncContext, "syncContext");
this.timeService = checkNotNull(timeService, "timeService");
@ -314,31 +297,6 @@ final class XdsClientImpl extends XdsClient {
}
}
/**
* Builds a channel to one of the provided management servers.
*
* <p>Note: currently we only support using the first server.
*/
private static ManagedChannel buildChannel(List<ServerInfo> servers) {
checkArgument(!servers.isEmpty(), "No management server provided.");
ServerInfo serverInfo = servers.get(0);
String serverUri = serverInfo.getServerUri();
List<ChannelCreds> channelCredsList = serverInfo.getChannelCredentials();
ManagedChannel ch = null;
// Use the first supported channel credentials configuration.
// Currently, only "google_default" is supported.
for (ChannelCreds creds : channelCredsList) {
if (creds.getType().equals("google_default")) {
ch = GoogleDefaultChannelBuilder.forTarget(serverUri).build();
break;
}
}
if (ch == null) {
ch = ManagedChannelBuilder.forTarget(serverUri).build();
}
return ch;
}
/**
* Establishes the RPC connection by creating a new RPC stream on the given channel for
* xDS protocol communication.

View File

@ -30,6 +30,7 @@ import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Any;
import com.google.protobuf.UInt32Value;
@ -79,6 +80,8 @@ import io.grpc.internal.BackoffPolicy;
import io.grpc.internal.FakeClock;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcCleanupRule;
import io.grpc.xds.Bootstrapper.ChannelCreds;
import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.EnvoyProtoData.DropOverload;
import io.grpc.xds.EnvoyProtoData.LbEndpoint;
import io.grpc.xds.EnvoyProtoData.Locality;
@ -89,6 +92,7 @@ import io.grpc.xds.XdsClient.ConfigUpdate;
import io.grpc.xds.XdsClient.ConfigWatcher;
import io.grpc.xds.XdsClient.EndpointUpdate;
import io.grpc.xds.XdsClient.EndpointWatcher;
import io.grpc.xds.XdsClient.XdsChannelFactory;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.HashSet;
@ -169,7 +173,7 @@ public class XdsClientImplTest {
when(backoffPolicy1.nextBackoffNanos()).thenReturn(10L, 100L);
when(backoffPolicy2.nextBackoffNanos()).thenReturn(20L, 200L);
String serverName = InProcessServerBuilder.generateName();
final String serverName = InProcessServerBuilder.generateName();
AggregatedDiscoveryServiceImplBase serviceImpl = new AggregatedDiscoveryServiceImplBase() {
@Override
public StreamObserver<DiscoveryRequest> streamAggregatedResources(
@ -202,8 +206,20 @@ public class XdsClientImplTest {
.start());
channel =
cleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build());
List<ServerInfo> servers =
ImmutableList.of(new ServerInfo(serverName, ImmutableList.<ChannelCreds>of()));
XdsChannelFactory channelFactory = new XdsChannelFactory() {
@Override
ManagedChannel createChannel(List<ServerInfo> servers) {
assertThat(Iterables.getOnlyElement(servers).getServerUri()).isEqualTo(serverName);
assertThat(Iterables.getOnlyElement(servers).getChannelCredentials()).isEmpty();
return channel;
}
};
xdsClient =
new XdsClientImpl(channel, NODE, syncContext,
new XdsClientImpl(servers, channelFactory, NODE, syncContext,
fakeClock.getScheduledExecutorService(), backoffPolicyProvider,
fakeClock.getStopwatchSupplier().get());
// Only the connection to management server is established, no RPC request is sent until at