mirror of https://github.com/grpc/grpc-java.git
xds: create singleton XdsClient object (promote ClientXdsClient) (#7500)
Use a global factory to create a shared XdsClient object pool that can be used by multiple client channels. The object pool is thread-safe and holds a single XdsClient returning to each client channel. So at most one XdsClient instance will be created per process, and it is shared between client channels.
This commit is contained in:
parent
34ef76704a
commit
80631db7a8
|
|
@ -0,0 +1,146 @@
|
|||
/*
|
||||
* Copyright 2020 The gRPC Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.grpc.xds;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import io.grpc.internal.ExponentialBackoffPolicy;
|
||||
import io.grpc.internal.GrpcUtil;
|
||||
import io.grpc.internal.ObjectPool;
|
||||
import io.grpc.internal.SharedResourceHolder;
|
||||
import io.grpc.xds.Bootstrapper.BootstrapInfo;
|
||||
import io.grpc.xds.EnvoyProtoData.Node;
|
||||
import io.grpc.xds.XdsClient.XdsChannel;
|
||||
import io.grpc.xds.XdsNameResolverProvider.XdsClientPoolFactory;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import javax.annotation.concurrent.GuardedBy;
|
||||
import javax.annotation.concurrent.ThreadSafe;
|
||||
|
||||
/**
|
||||
* The global factory for creating a singleton {@link XdsClient} instance to be used by all gRPC
|
||||
* clients in the process.
|
||||
*/
|
||||
@ThreadSafe
|
||||
final class SharedXdsClientPoolProvider implements XdsClientPoolFactory {
|
||||
private final Bootstrapper bootstrapper;
|
||||
private final XdsChannelFactory channelFactory;
|
||||
private final Object lock = new Object();
|
||||
private volatile ObjectPool<XdsClient> xdsClientPool;
|
||||
|
||||
private SharedXdsClientPoolProvider() {
|
||||
this(Bootstrapper.getInstance(), XdsChannelFactory.getInstance());
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
SharedXdsClientPoolProvider(
|
||||
Bootstrapper bootstrapper, XdsChannelFactory channelFactory) {
|
||||
this.bootstrapper = checkNotNull(bootstrapper, "bootstrapper");
|
||||
this.channelFactory = checkNotNull(channelFactory, "channelFactory");
|
||||
}
|
||||
|
||||
static SharedXdsClientPoolProvider getDefaultProvider() {
|
||||
return SharedXdsClientPoolProviderHolder.instance;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ObjectPool<XdsClient> getXdsClientPool() throws XdsInitializationException {
|
||||
ObjectPool<XdsClient> ref = xdsClientPool;
|
||||
if (ref == null) {
|
||||
synchronized (lock) {
|
||||
ref = xdsClientPool;
|
||||
if (ref == null) {
|
||||
BootstrapInfo bootstrapInfo = bootstrapper.readBootstrap();
|
||||
XdsChannel channel = channelFactory.createChannel(bootstrapInfo.getServers());
|
||||
ref = xdsClientPool = new RefCountedXdsClientObjectPool(channel, bootstrapInfo.getNode());
|
||||
}
|
||||
}
|
||||
}
|
||||
return ref;
|
||||
}
|
||||
|
||||
private static class SharedXdsClientPoolProviderHolder {
|
||||
private static final SharedXdsClientPoolProvider instance = new SharedXdsClientPoolProvider();
|
||||
}
|
||||
|
||||
@ThreadSafe
|
||||
@VisibleForTesting
|
||||
static class RefCountedXdsClientObjectPool implements ObjectPool<XdsClient> {
|
||||
private final XdsChannel channel;
|
||||
private final Node node;
|
||||
private final XdsClientFactory factory;
|
||||
private final Object lock = new Object();
|
||||
@GuardedBy("lock")
|
||||
private ScheduledExecutorService scheduler;
|
||||
@GuardedBy("lock")
|
||||
private XdsClient xdsClient;
|
||||
@GuardedBy("lock")
|
||||
private int refCount;
|
||||
|
||||
RefCountedXdsClientObjectPool(XdsChannel channel, Node node) {
|
||||
this(channel, node, XdsClientFactory.INSTANCE);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
RefCountedXdsClientObjectPool(XdsChannel channel, Node node, XdsClientFactory factory) {
|
||||
this.channel = checkNotNull(channel, "channel");
|
||||
this.node = checkNotNull(node, "node");
|
||||
this.factory = checkNotNull(factory, "factory");
|
||||
}
|
||||
|
||||
@Override
|
||||
public XdsClient getObject() {
|
||||
synchronized (lock) {
|
||||
if (xdsClient == null) {
|
||||
scheduler = SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE);
|
||||
xdsClient = factory.newXdsClient(channel, node, scheduler);
|
||||
}
|
||||
refCount++;
|
||||
return xdsClient;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public XdsClient returnObject(Object object) {
|
||||
synchronized (lock) {
|
||||
refCount--;
|
||||
if (refCount == 0) {
|
||||
xdsClient.shutdown();
|
||||
xdsClient = null;
|
||||
scheduler = SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, scheduler);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
// Introduced for testing.
|
||||
@VisibleForTesting
|
||||
abstract static class XdsClientFactory {
|
||||
private static final XdsClientFactory INSTANCE = new XdsClientFactory() {
|
||||
@Override
|
||||
XdsClient newXdsClient(XdsChannel channel, Node node,
|
||||
ScheduledExecutorService timeService) {
|
||||
return new ClientXdsClient(channel, node, timeService,
|
||||
new ExponentialBackoffPolicy.Provider(), GrpcUtil.STOPWATCH_SUPPLIER);
|
||||
}
|
||||
};
|
||||
|
||||
abstract XdsClient newXdsClient(XdsChannel channel, Node node,
|
||||
ScheduledExecutorService timeService);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -19,15 +19,12 @@ package io.grpc.xds;
|
|||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
import static com.google.common.base.Preconditions.checkState;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.MoreObjects;
|
||||
import com.google.common.base.MoreObjects.ToStringHelper;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.internal.ObjectPool;
|
||||
import io.grpc.xds.EnvoyProtoData.DropOverload;
|
||||
import io.grpc.xds.EnvoyProtoData.Locality;
|
||||
import io.grpc.xds.EnvoyProtoData.LocalityLbEndpoints;
|
||||
|
|
@ -537,7 +534,8 @@ abstract class XdsClient {
|
|||
/**
|
||||
* Shutdown this {@link XdsClient} and release resources.
|
||||
*/
|
||||
abstract void shutdown();
|
||||
void shutdown() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers a data watcher for the given LDS resource.
|
||||
|
|
@ -611,74 +609,10 @@ abstract class XdsClient {
|
|||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
// TODO(chengyuanzhang): eliminate this factory
|
||||
abstract static class XdsClientFactory {
|
||||
abstract XdsClient createXdsClient();
|
||||
}
|
||||
|
||||
/**
|
||||
* An {@link ObjectPool} holding reference and ref-count of an {@link XdsClient} instance.
|
||||
* Initially the instance is null and the ref-count is zero. {@link #getObject()} will create a
|
||||
* new XdsClient instance if the ref-count is zero when calling the method. {@code #getObject()}
|
||||
* increments the ref-count and {@link #returnObject(Object)} decrements it. Anytime when the
|
||||
* ref-count gets back to zero, the XdsClient instance will be shutdown and de-referenced.
|
||||
*/
|
||||
static final class RefCountedXdsClientObjectPool implements ObjectPool<XdsClient> {
|
||||
|
||||
private final XdsClientFactory xdsClientFactory;
|
||||
|
||||
@VisibleForTesting
|
||||
@Nullable
|
||||
XdsClient xdsClient;
|
||||
|
||||
private int refCount;
|
||||
|
||||
RefCountedXdsClientObjectPool(XdsClientFactory xdsClientFactory) {
|
||||
this.xdsClientFactory = Preconditions.checkNotNull(xdsClientFactory, "xdsClientFactory");
|
||||
}
|
||||
|
||||
/**
|
||||
* See {@link RefCountedXdsClientObjectPool}.
|
||||
*/
|
||||
@Override
|
||||
public synchronized XdsClient getObject() {
|
||||
if (xdsClient == null) {
|
||||
checkState(
|
||||
refCount == 0,
|
||||
"Bug: refCount should be zero while xdsClient is null");
|
||||
xdsClient = xdsClientFactory.createXdsClient();
|
||||
}
|
||||
refCount++;
|
||||
return xdsClient;
|
||||
}
|
||||
|
||||
/**
|
||||
* See {@link RefCountedXdsClientObjectPool}.
|
||||
*/
|
||||
@Override
|
||||
public synchronized XdsClient returnObject(Object object) {
|
||||
checkState(
|
||||
object == xdsClient,
|
||||
"Bug: the returned object '%s' does not match current XdsClient '%s'",
|
||||
object,
|
||||
xdsClient);
|
||||
|
||||
refCount--;
|
||||
checkState(refCount >= 0, "Bug: refCount of XdsClient less than 0");
|
||||
if (refCount == 0) {
|
||||
xdsClient.shutdown();
|
||||
xdsClient = null;
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
static final class XdsChannel {
|
||||
private final ManagedChannel managedChannel;
|
||||
private final boolean useProtocolV3;
|
||||
|
||||
@VisibleForTesting
|
||||
XdsChannel(ManagedChannel managedChannel, boolean useProtocolV3) {
|
||||
this.managedChannel = managedChannel;
|
||||
this.useProtocolV3 = useProtocolV3;
|
||||
|
|
|
|||
|
|
@ -33,7 +33,6 @@ import io.grpc.Status;
|
|||
import io.grpc.SynchronizationContext;
|
||||
import io.grpc.internal.GrpcUtil;
|
||||
import io.grpc.internal.ObjectPool;
|
||||
import io.grpc.xds.Bootstrapper.BootstrapInfo;
|
||||
import io.grpc.xds.EnvoyProtoData.ClusterWeight;
|
||||
import io.grpc.xds.EnvoyProtoData.Route;
|
||||
import io.grpc.xds.EnvoyProtoData.RouteAction;
|
||||
|
|
@ -43,7 +42,6 @@ import io.grpc.xds.XdsClient.LdsResourceWatcher;
|
|||
import io.grpc.xds.XdsClient.LdsUpdate;
|
||||
import io.grpc.xds.XdsClient.RdsResourceWatcher;
|
||||
import io.grpc.xds.XdsClient.RdsUpdate;
|
||||
import io.grpc.xds.XdsClient.XdsChannel;
|
||||
import io.grpc.xds.XdsLogger.XdsLogLevel;
|
||||
import io.grpc.xds.XdsNameResolverProvider.XdsClientPoolFactory;
|
||||
import java.util.Collection;
|
||||
|
|
@ -79,8 +77,6 @@ final class XdsNameResolver extends NameResolver {
|
|||
private final String authority;
|
||||
private final ServiceConfigParser serviceConfigParser;
|
||||
private final SynchronizationContext syncContext;
|
||||
private final Bootstrapper bootstrapper;
|
||||
private final XdsChannelFactory channelFactory;
|
||||
private final XdsClientPoolFactory xdsClientPoolFactory;
|
||||
private final ThreadSafeRandom random;
|
||||
private final ConcurrentMap<String, AtomicInteger> clusterRefs = new ConcurrentHashMap<>();
|
||||
|
|
@ -92,28 +88,19 @@ final class XdsNameResolver extends NameResolver {
|
|||
private XdsClient xdsClient;
|
||||
private ResolveState resolveState;
|
||||
|
||||
XdsNameResolver(String name,
|
||||
ServiceConfigParser serviceConfigParser,
|
||||
SynchronizationContext syncContext,
|
||||
XdsClientPoolFactory xdsClientPoolFactory) {
|
||||
this(name, serviceConfigParser, syncContext, Bootstrapper.getInstance(),
|
||||
XdsChannelFactory.getInstance(), xdsClientPoolFactory, ThreadSafeRandomImpl.instance);
|
||||
XdsNameResolver(String name, ServiceConfigParser serviceConfigParser,
|
||||
SynchronizationContext syncContext) {
|
||||
this(name, serviceConfigParser, syncContext, SharedXdsClientPoolProvider.getDefaultProvider(),
|
||||
ThreadSafeRandomImpl.instance);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
XdsNameResolver(
|
||||
String name,
|
||||
ServiceConfigParser serviceConfigParser,
|
||||
SynchronizationContext syncContext,
|
||||
Bootstrapper bootstrapper,
|
||||
XdsChannelFactory channelFactory,
|
||||
XdsClientPoolFactory xdsClientPoolFactory,
|
||||
XdsNameResolver(String name, ServiceConfigParser serviceConfigParser,
|
||||
SynchronizationContext syncContext, XdsClientPoolFactory xdsClientPoolFactory,
|
||||
ThreadSafeRandom random) {
|
||||
authority = GrpcUtil.checkAuthority(checkNotNull(name, "name"));
|
||||
this.serviceConfigParser = checkNotNull(serviceConfigParser, "serviceConfigParser");
|
||||
this.syncContext = checkNotNull(syncContext, "syncContext");
|
||||
this.bootstrapper = checkNotNull(bootstrapper, "bootstrapper");
|
||||
this.channelFactory = checkNotNull(channelFactory, "channelFactory");
|
||||
this.xdsClientPoolFactory = checkNotNull(xdsClientPoolFactory, "xdsClientPoolFactory");
|
||||
this.random = checkNotNull(random, "random");
|
||||
logger = XdsLogger.withLogId(InternalLogId.allocate("xds-resolver", name));
|
||||
|
|
@ -128,17 +115,13 @@ final class XdsNameResolver extends NameResolver {
|
|||
@Override
|
||||
public void start(Listener2 listener) {
|
||||
this.listener = checkNotNull(listener, "listener");
|
||||
BootstrapInfo bootstrapInfo;
|
||||
XdsChannel channel;
|
||||
try {
|
||||
bootstrapInfo = bootstrapper.readBootstrap();
|
||||
channel = channelFactory.createChannel(bootstrapInfo.getServers());
|
||||
xdsClientPool = xdsClientPoolFactory.getXdsClientPool();
|
||||
} catch (Exception e) {
|
||||
listener.onError(
|
||||
Status.UNAVAILABLE.withDescription("Failed to initialize xDS").withCause(e));
|
||||
return;
|
||||
}
|
||||
xdsClientPool = xdsClientPoolFactory.newXdsClientObjectPool(bootstrapInfo, channel);
|
||||
xdsClient = xdsClientPool.getObject();
|
||||
resolveState = new ResolveState();
|
||||
resolveState.start();
|
||||
|
|
|
|||
|
|
@ -19,22 +19,11 @@ package io.grpc.xds;
|
|||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Stopwatch;
|
||||
import com.google.common.base.Supplier;
|
||||
import io.grpc.Internal;
|
||||
import io.grpc.NameResolver.Args;
|
||||
import io.grpc.NameResolverProvider;
|
||||
import io.grpc.SynchronizationContext;
|
||||
import io.grpc.internal.BackoffPolicy;
|
||||
import io.grpc.internal.ExponentialBackoffPolicy;
|
||||
import io.grpc.internal.GrpcUtil;
|
||||
import io.grpc.internal.ObjectPool;
|
||||
import io.grpc.xds.Bootstrapper.BootstrapInfo;
|
||||
import io.grpc.xds.XdsClient.RefCountedXdsClientObjectPool;
|
||||
import io.grpc.xds.XdsClient.XdsChannel;
|
||||
import io.grpc.xds.XdsClient.XdsClientFactory;
|
||||
import java.net.URI;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
||||
/**
|
||||
* A provider for {@link XdsNameResolver}.
|
||||
|
|
@ -61,13 +50,8 @@ public final class XdsNameResolverProvider extends NameResolverProvider {
|
|||
targetPath,
|
||||
targetUri);
|
||||
String name = targetPath.substring(1);
|
||||
XdsClientPoolFactory xdsClientPoolFactory =
|
||||
new RefCountedXdsClientPoolFactory(args.getSynchronizationContext(),
|
||||
args.getScheduledExecutorService(), new ExponentialBackoffPolicy.Provider(),
|
||||
GrpcUtil.STOPWATCH_SUPPLIER);
|
||||
return new XdsNameResolver(
|
||||
name, args.getServiceConfigParser(),
|
||||
args.getSynchronizationContext(), xdsClientPoolFactory);
|
||||
return new XdsNameResolver(name, args.getServiceConfigParser(),
|
||||
args.getSynchronizationContext());
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
|
@ -89,38 +73,7 @@ public final class XdsNameResolverProvider extends NameResolverProvider {
|
|||
return 4;
|
||||
}
|
||||
|
||||
static class RefCountedXdsClientPoolFactory implements XdsClientPoolFactory {
|
||||
private final SynchronizationContext syncContext;
|
||||
private final ScheduledExecutorService timeService;
|
||||
private final BackoffPolicy.Provider backoffPolicyProvider;
|
||||
private final Supplier<Stopwatch> stopwatchSupplier;
|
||||
|
||||
RefCountedXdsClientPoolFactory(
|
||||
SynchronizationContext syncContext,
|
||||
ScheduledExecutorService timeService,
|
||||
BackoffPolicy.Provider backoffPolicyProvider,
|
||||
Supplier<Stopwatch> stopwatchSupplier) {
|
||||
this.syncContext = checkNotNull(syncContext, "syncContext");
|
||||
this.timeService = checkNotNull(timeService, "timeService");
|
||||
this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
|
||||
this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
|
||||
}
|
||||
|
||||
@Override
|
||||
public ObjectPool<XdsClient> newXdsClientObjectPool(
|
||||
final BootstrapInfo bootstrapInfo, final XdsChannel channel) {
|
||||
XdsClientFactory xdsClientFactory = new XdsClientFactory() {
|
||||
@Override
|
||||
XdsClient createXdsClient() {
|
||||
return new XdsClientImpl2(channel, bootstrapInfo.getNode(), syncContext, timeService,
|
||||
backoffPolicyProvider, stopwatchSupplier);
|
||||
}
|
||||
};
|
||||
return new RefCountedXdsClientObjectPool(xdsClientFactory);
|
||||
}
|
||||
}
|
||||
|
||||
interface XdsClientPoolFactory {
|
||||
ObjectPool<XdsClient> newXdsClientObjectPool(BootstrapInfo bootstrapInfo, XdsChannel channel);
|
||||
ObjectPool<XdsClient> getXdsClientPool() throws XdsInitializationException;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,121 @@
|
|||
/*
|
||||
* Copyright 2020 The gRPC Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.grpc.xds;
|
||||
|
||||
|
||||
import static com.google.common.truth.Truth.assertThat;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.internal.ObjectPool;
|
||||
import io.grpc.xds.Bootstrapper.BootstrapInfo;
|
||||
import io.grpc.xds.Bootstrapper.ChannelCreds;
|
||||
import io.grpc.xds.Bootstrapper.ServerInfo;
|
||||
import io.grpc.xds.EnvoyProtoData.Node;
|
||||
import io.grpc.xds.SharedXdsClientPoolProvider.RefCountedXdsClientObjectPool;
|
||||
import io.grpc.xds.SharedXdsClientPoolProvider.RefCountedXdsClientObjectPool.XdsClientFactory;
|
||||
import io.grpc.xds.XdsClient.XdsChannel;
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.JUnit4;
|
||||
import org.mockito.ArgumentMatchers;
|
||||
|
||||
/** Tests for {@link SharedXdsClientPoolProvider}. */
|
||||
@RunWith(JUnit4.class)
|
||||
public class SharedXdsClientPoolProviderTest {
|
||||
|
||||
private final XdsChannel channel = new XdsChannel(mock(ManagedChannel.class), false);
|
||||
private final Node node = Node.newBuilder().setId("SharedXdsClientPoolProviderTest").build();
|
||||
private final AtomicReference<XdsClient> xdsClientRef = new AtomicReference<>();
|
||||
private final XdsClientFactory factory = new XdsClientFactory() {
|
||||
@Override
|
||||
XdsClient newXdsClient(XdsChannel channel, Node node, ScheduledExecutorService timeService) {
|
||||
XdsClient xdsClient = mock(XdsClient.class);
|
||||
xdsClientRef.set(xdsClient);
|
||||
return xdsClient;
|
||||
}
|
||||
};
|
||||
|
||||
@Test
|
||||
public void getXdsClientPool_sharedInstance() throws XdsInitializationException {
|
||||
ServerInfo server =
|
||||
new ServerInfo("trafficdirector.googleapis.com",
|
||||
Collections.singletonList(new ChannelCreds("insecure", null)),
|
||||
Collections.<String>emptyList());
|
||||
BootstrapInfo bootstrapInfo = new BootstrapInfo(Collections.singletonList(server), node, null);
|
||||
Bootstrapper bootstrapper = mock(Bootstrapper.class);
|
||||
when(bootstrapper.readBootstrap()).thenReturn(bootstrapInfo);
|
||||
XdsChannelFactory channelFactory = mock(XdsChannelFactory.class);
|
||||
when(channelFactory.createChannel(ArgumentMatchers.<ServerInfo>anyList())).thenReturn(channel);
|
||||
|
||||
SharedXdsClientPoolProvider provider =
|
||||
new SharedXdsClientPoolProvider(bootstrapper, channelFactory);
|
||||
|
||||
ObjectPool<XdsClient> xdsClientPool = provider.getXdsClientPool();
|
||||
verify(bootstrapper).readBootstrap();
|
||||
verify(channelFactory).createChannel(Collections.singletonList(server));
|
||||
assertThat(provider.getXdsClientPool()).isSameInstanceAs(xdsClientPool);
|
||||
verifyNoMoreInteractions(bootstrapper, channelFactory);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void refCountedXdsClientObjectPool_delayedCreation() {
|
||||
RefCountedXdsClientObjectPool xdsClientPool =
|
||||
new RefCountedXdsClientObjectPool(channel, node, factory);
|
||||
assertThat(xdsClientRef.get()).isNull();
|
||||
xdsClientPool.getObject();
|
||||
assertThat(xdsClientRef.get()).isNotNull();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void refCountedXdsClientObjectPool_refCounted() {
|
||||
RefCountedXdsClientObjectPool xdsClientPool =
|
||||
new RefCountedXdsClientObjectPool(channel, node, factory);
|
||||
|
||||
// getObject once
|
||||
XdsClient xdsClient = xdsClientPool.getObject();
|
||||
assertThat(xdsClient).isNotNull();
|
||||
// getObject twice
|
||||
assertThat(xdsClientPool.getObject()).isSameInstanceAs(xdsClient);
|
||||
// returnObject once
|
||||
assertThat(xdsClientPool.returnObject(xdsClient)).isNull();
|
||||
verify(xdsClient, never()).shutdown();
|
||||
// returnObject twice
|
||||
assertThat(xdsClientPool.returnObject(xdsClient)).isNull();
|
||||
verify(xdsClient).shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void refCountedXdsClientObjectPool_getObjectCreatesNewInstanceIfAlreadyShutdown() {
|
||||
RefCountedXdsClientObjectPool xdsClientPool =
|
||||
new RefCountedXdsClientObjectPool(channel, node, factory);
|
||||
XdsClient xdsClient1 = xdsClientPool.getObject();
|
||||
verify(xdsClient1, never()).shutdown();
|
||||
assertThat(xdsClientPool.returnObject(xdsClient1)).isNull();
|
||||
verify(xdsClient1).shutdown();
|
||||
|
||||
XdsClient xdsClient2 = xdsClientPool.getObject();
|
||||
assertThat(xdsClient2).isNotSameInstanceAs(xdsClient1);
|
||||
}
|
||||
}
|
||||
|
|
@ -1,106 +0,0 @@
|
|||
/*
|
||||
* Copyright 2019 The gRPC Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.grpc.xds;
|
||||
|
||||
import static com.google.common.truth.Truth.assertThat;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
import io.grpc.xds.XdsClient.RefCountedXdsClientObjectPool;
|
||||
import io.grpc.xds.XdsClient.XdsClientFactory;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.JUnit4;
|
||||
|
||||
/**
|
||||
* Unit tests for {@link XdsClient}.
|
||||
*/
|
||||
@RunWith(JUnit4.class)
|
||||
public class XdsClientTest {
|
||||
@SuppressWarnings("deprecation") // https://github.com/grpc/grpc-java/issues/7467
|
||||
@Rule
|
||||
public final ExpectedException thrown = ExpectedException.none();
|
||||
|
||||
@Test
|
||||
public void refCountedXdsClientObjectPool_getObjectShouldMatchReturnObject() {
|
||||
XdsClientFactory xdsClientFactory = new XdsClientFactory() {
|
||||
@Override
|
||||
XdsClient createXdsClient() {
|
||||
return mock(XdsClient.class);
|
||||
}
|
||||
};
|
||||
RefCountedXdsClientObjectPool xdsClientPool =
|
||||
new RefCountedXdsClientObjectPool(xdsClientFactory);
|
||||
|
||||
// getObject once
|
||||
XdsClient xdsClient = xdsClientPool.getObject();
|
||||
assertThat(xdsClient).isNotNull();
|
||||
// getObject twice
|
||||
assertThat(xdsClientPool.getObject()).isSameInstanceAs(xdsClient);
|
||||
// returnObject once
|
||||
assertThat(xdsClientPool.returnObject(xdsClient)).isNull();
|
||||
verify(xdsClient, never()).shutdown();
|
||||
// returnObject twice
|
||||
assertThat(xdsClientPool.returnObject(xdsClient)).isNull();
|
||||
verify(xdsClient).shutdown();
|
||||
assertThat(xdsClientPool.xdsClient).isNull();
|
||||
|
||||
thrown.expect(IllegalStateException.class);
|
||||
// returnOject for the 3rd time
|
||||
xdsClientPool.returnObject(xdsClient);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void refCountedXdsClientObjectPool_returnWrongObjectShouldThrow() {
|
||||
XdsClientFactory xdsClientFactory = new XdsClientFactory() {
|
||||
@Override
|
||||
XdsClient createXdsClient() {
|
||||
return mock(XdsClient.class);
|
||||
}
|
||||
};
|
||||
RefCountedXdsClientObjectPool xdsClientPool =
|
||||
new RefCountedXdsClientObjectPool(xdsClientFactory);
|
||||
|
||||
xdsClientPool.getObject();
|
||||
|
||||
thrown.expect(IllegalStateException.class);
|
||||
xdsClientPool.returnObject(mock(XdsClient.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void refCountedXdsClientObjectPool_getObjectCreatesNewInstanceIfAlreadyShutdown() {
|
||||
XdsClientFactory xdsClientFactory = new XdsClientFactory() {
|
||||
@Override
|
||||
XdsClient createXdsClient() {
|
||||
return mock(XdsClient.class);
|
||||
}
|
||||
};
|
||||
RefCountedXdsClientObjectPool xdsClientPool =
|
||||
new RefCountedXdsClientObjectPool(xdsClientFactory);
|
||||
|
||||
XdsClient xdsClient1 = xdsClientPool.getObject();
|
||||
verify(xdsClient1, never()).shutdown();
|
||||
assertThat(xdsClientPool.returnObject(xdsClient1)).isNull();
|
||||
verify(xdsClient1).shutdown();
|
||||
|
||||
XdsClient xdsClient2 = xdsClientPool.getObject();
|
||||
assertThat(xdsClient2).isNotSameInstanceAs(xdsClient1);
|
||||
}
|
||||
}
|
||||
|
|
@ -18,7 +18,6 @@ package io.grpc.xds;
|
|||
|
||||
import static com.google.common.truth.Truth.assertThat;
|
||||
import static org.mockito.ArgumentMatchers.anyInt;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.reset;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
|
@ -26,12 +25,10 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
|
|||
import static org.mockito.Mockito.when;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Iterables;
|
||||
import io.grpc.CallOptions;
|
||||
import io.grpc.InternalConfigSelector;
|
||||
import io.grpc.InternalConfigSelector.Result;
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.Metadata;
|
||||
import io.grpc.MethodDescriptor;
|
||||
import io.grpc.MethodDescriptor.MethodType;
|
||||
|
|
@ -47,15 +44,11 @@ import io.grpc.internal.JsonUtil;
|
|||
import io.grpc.internal.ObjectPool;
|
||||
import io.grpc.internal.PickSubchannelArgsImpl;
|
||||
import io.grpc.testing.TestMethodDescriptors;
|
||||
import io.grpc.xds.Bootstrapper.BootstrapInfo;
|
||||
import io.grpc.xds.Bootstrapper.ServerInfo;
|
||||
import io.grpc.xds.EnvoyProtoData.ClusterWeight;
|
||||
import io.grpc.xds.EnvoyProtoData.Node;
|
||||
import io.grpc.xds.EnvoyProtoData.Route;
|
||||
import io.grpc.xds.EnvoyProtoData.RouteAction;
|
||||
import io.grpc.xds.EnvoyProtoData.VirtualHost;
|
||||
import io.grpc.xds.XdsClient.RdsResourceWatcher;
|
||||
import io.grpc.xds.XdsClient.XdsChannel;
|
||||
import io.grpc.xds.XdsNameResolverProvider.XdsClientPoolFactory;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
|
|
@ -95,12 +88,6 @@ public class XdsNameResolverTest {
|
|||
return ConfigOrError.fromConfig(rawServiceConfig);
|
||||
}
|
||||
};
|
||||
private final XdsChannelFactory channelFactory = new XdsChannelFactory() {
|
||||
@Override
|
||||
XdsChannel createChannel(List<ServerInfo> servers) throws XdsInitializationException {
|
||||
return new XdsChannel(mock(ManagedChannel.class), false);
|
||||
}
|
||||
};
|
||||
private final FakeXdsClientPoolFactory xdsClientPoolFactory = new FakeXdsClientPoolFactory();
|
||||
private final String cluster1 = "cluster-foo.googleapis.com";
|
||||
private final String cluster2 = "cluster-bar.googleapis.com";
|
||||
|
|
@ -120,20 +107,8 @@ public class XdsNameResolverTest {
|
|||
@Before
|
||||
public void setUp() {
|
||||
XdsNameResolver.enableTimeout = true;
|
||||
Bootstrapper bootstrapper = new Bootstrapper() {
|
||||
@Override
|
||||
public BootstrapInfo readBootstrap() {
|
||||
return new BootstrapInfo(
|
||||
ImmutableList.of(
|
||||
new ServerInfo(
|
||||
"trafficdirector.googleapis.com",
|
||||
ImmutableList.<ChannelCreds>of(), ImmutableList.<String>of())),
|
||||
Node.newBuilder().build(),
|
||||
null);
|
||||
}
|
||||
};
|
||||
resolver = new XdsNameResolver(AUTHORITY, serviceConfigParser, syncContext, bootstrapper,
|
||||
channelFactory, xdsClientPoolFactory, mockRandom);
|
||||
resolver = new XdsNameResolver(AUTHORITY, serviceConfigParser, syncContext,
|
||||
xdsClientPoolFactory, mockRandom);
|
||||
}
|
||||
|
||||
@After
|
||||
|
|
@ -147,15 +122,15 @@ public class XdsNameResolverTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void resolving_failToBootstrap() {
|
||||
Bootstrapper bootstrapper = new Bootstrapper() {
|
||||
public void resolving_failToCreateXdsClientPool() {
|
||||
XdsClientPoolFactory xdsClientPoolFactory = new XdsClientPoolFactory() {
|
||||
@Override
|
||||
public BootstrapInfo readBootstrap() throws XdsInitializationException {
|
||||
public ObjectPool<XdsClient> getXdsClientPool() throws XdsInitializationException {
|
||||
throw new XdsInitializationException("Fail to read bootstrap file");
|
||||
}
|
||||
};
|
||||
resolver = new XdsNameResolver(AUTHORITY, serviceConfigParser, syncContext, bootstrapper,
|
||||
channelFactory, xdsClientPoolFactory, mockRandom);
|
||||
resolver = new XdsNameResolver(AUTHORITY, serviceConfigParser, syncContext,
|
||||
xdsClientPoolFactory, mockRandom);
|
||||
resolver.start(mockListener);
|
||||
verify(mockListener).onError(errorCaptor.capture());
|
||||
Status error = errorCaptor.getValue();
|
||||
|
|
@ -164,37 +139,6 @@ public class XdsNameResolverTest {
|
|||
assertThat(error.getCause()).hasMessageThat().isEqualTo("Fail to read bootstrap file");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void resolving_failToCreateXdsChannel() {
|
||||
Bootstrapper bootstrapper = new Bootstrapper() {
|
||||
@Override
|
||||
public BootstrapInfo readBootstrap() {
|
||||
return new BootstrapInfo(
|
||||
ImmutableList.of(
|
||||
new ServerInfo(
|
||||
"trafficdirector.googleapis.com",
|
||||
ImmutableList.<ChannelCreds>of(), ImmutableList.<String>of())),
|
||||
Node.newBuilder().build(),
|
||||
null);
|
||||
}
|
||||
};
|
||||
XdsChannelFactory channelFactory = new XdsChannelFactory() {
|
||||
@Override
|
||||
XdsChannel createChannel(List<ServerInfo> servers) throws XdsInitializationException {
|
||||
throw new XdsInitializationException("No server with supported channel creds found");
|
||||
}
|
||||
};
|
||||
resolver = new XdsNameResolver(AUTHORITY, serviceConfigParser, syncContext, bootstrapper,
|
||||
channelFactory, xdsClientPoolFactory, mockRandom);
|
||||
resolver.start(mockListener);
|
||||
verify(mockListener).onError(errorCaptor.capture());
|
||||
Status error = errorCaptor.getValue();
|
||||
assertThat(error.getCode()).isEqualTo(Code.UNAVAILABLE);
|
||||
assertThat(error.getDescription()).isEqualTo("Failed to initialize xDS");
|
||||
assertThat(error.getCause()).hasMessageThat()
|
||||
.isEqualTo("No server with supported channel creds found");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void resolving_ldsResourceNotFound() {
|
||||
resolver.start(mockListener);
|
||||
|
|
@ -740,9 +684,9 @@ public class XdsNameResolverTest {
|
|||
}
|
||||
|
||||
private final class FakeXdsClientPoolFactory implements XdsClientPoolFactory {
|
||||
|
||||
@Override
|
||||
public ObjectPool<XdsClient> newXdsClientObjectPool(
|
||||
BootstrapInfo bootstrapInfo, XdsChannel channel) {
|
||||
public ObjectPool<XdsClient> getXdsClientPool() throws XdsInitializationException {
|
||||
return new ObjectPool<XdsClient>() {
|
||||
@Override
|
||||
public XdsClient getObject() {
|
||||
|
|
|
|||
Loading…
Reference in New Issue