xds: use singleton XdsClient for server side (#8130)

This commit is contained in:
sanjaypujare 2021-04-30 09:52:56 -07:00 committed by GitHub
parent 5d99bb07b8
commit 02ff64fa21
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 139 additions and 159 deletions

View File

@ -22,14 +22,10 @@ import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.protobuf.UInt32Value;
import io.grpc.Grpc;
import io.grpc.Internal;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.internal.ExponentialBackoffPolicy;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.SharedResourceHolder;
import io.grpc.internal.TimeProvider;
import io.grpc.xds.EnvoyServerProtoData.CidrRange;
import io.grpc.xds.EnvoyServerProtoData.DownstreamTlsContext;
import io.grpc.xds.EnvoyServerProtoData.FilterChain;
@ -38,7 +34,6 @@ import io.netty.channel.Channel;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.math.BigInteger;
import java.net.Inet6Address;
import java.net.InetAddress;
@ -72,7 +67,8 @@ public final class XdsClientWrapperForServerSds {
new TimeServiceResource("GrpcServerXdsClient");
private AtomicReference<EnvoyServerProtoData.Listener> curListener = new AtomicReference<>();
@SuppressWarnings("unused")
private ObjectPool<XdsClient> xdsClientPool;
private final XdsNameResolverProvider.XdsClientPoolFactory xdsClientPoolFactory;
@Nullable private XdsClient xdsClient;
private final int port;
private ScheduledExecutorService timeService;
@ -85,55 +81,31 @@ public final class XdsClientWrapperForServerSds {
*
* @param port server's port for which listener config is needed.
*/
public XdsClientWrapperForServerSds(int port) {
XdsClientWrapperForServerSds(int port) {
this(port, SharedXdsClientPoolProvider.getDefaultProvider());
}
@VisibleForTesting
XdsClientWrapperForServerSds(int port,
XdsNameResolverProvider.XdsClientPoolFactory xdsClientPoolFactory) {
this.port = port;
this.xdsClientPoolFactory = checkNotNull(xdsClientPoolFactory, "xdsClientPoolFactory");
}
public boolean hasXdsClient() {
return xdsClient != null;
}
/** Creates an XdsClient and starts a watch. */
public void createXdsClientAndStart() throws IOException {
checkState(xdsClient == null, "start() called more than once");
Bootstrapper.BootstrapInfo bootstrapInfo;
try {
bootstrapInfo = new BootstrapperImpl().bootstrap();
List<Bootstrapper.ServerInfo> serverList = bootstrapInfo.getServers();
if (serverList.isEmpty()) {
throw new XdsInitializationException("No management server provided by bootstrap");
}
} catch (XdsInitializationException e) {
throw new IOException(e);
}
Bootstrapper.ServerInfo serverInfo = bootstrapInfo.getServers().get(0); // use first server
ManagedChannel channel =
Grpc.newChannelBuilder(serverInfo.getTarget(), serverInfo.getChannelCredentials())
.keepAliveTime(5, TimeUnit.MINUTES).build();
timeService = SharedResourceHolder.get(timeServiceResource);
newServerApi = serverInfo.isUseProtocolV3();
String grpcServerResourceId = bootstrapInfo.getServerListenerResourceNameTemplate();
if (newServerApi && grpcServerResourceId == null) {
throw new IOException(
"missing server_listener_resource_name_template value in xds bootstrap");
}
XdsClient xdsClientImpl =
new ClientXdsClient(
channel,
bootstrapInfo,
timeService,
new ExponentialBackoffPolicy.Provider(),
GrpcUtil.STOPWATCH_SUPPLIER,
TimeProvider.SYSTEM_TIME_PROVIDER);
start(xdsClientImpl, grpcServerResourceId);
@VisibleForTesting XdsClient getXdsClient() {
return xdsClient;
}
/** Accepts an XdsClient and starts a watch. */
@VisibleForTesting
public void start(XdsClient xdsClient, String grpcServerResourceId) {
checkState(this.xdsClient == null, "start() called more than once");
checkNotNull(xdsClient, "xdsClient");
this.xdsClient = xdsClient;
public void start() {
try {
xdsClientPool = xdsClientPoolFactory.getOrCreate();
} catch (Exception e) {
reportError(e, true);
return;
}
xdsClient = xdsClientPool.getObject();
this.listenerWatcher =
new XdsClient.LdsResourceWatcher() {
@Override
@ -156,6 +128,15 @@ public final class XdsClientWrapperForServerSds {
reportError(error.asException(), isResourceAbsent(error));
}
};
String grpcServerResourceId = xdsClient.getBootstrapInfo()
.getServerListenerResourceNameTemplate();
newServerApi = xdsClient.getBootstrapInfo().getServers().get(0).isUseProtocolV3();
if (newServerApi && grpcServerResourceId == null) {
reportError(
new XdsInitializationException(
"missing server_listener_resource_name_template value in xds bootstrap"),
true);
}
grpcServerResourceId = grpcServerResourceId.replaceAll("%s", "0.0.0.0:" + port);
xdsClient.watchLdsResource(grpcServerResourceId, listenerWatcher);
}
@ -438,8 +419,7 @@ public final class XdsClientWrapperForServerSds {
public void shutdown() {
logger.log(Level.FINER, "Shutdown");
if (xdsClient != null) {
xdsClient.shutdown();
xdsClient = null;
xdsClient = xdsClientPool.returnObject(xdsClient);
}
if (timeService != null) {
timeService = SharedResourceHolder.release(timeServiceResource, timeService);

View File

@ -113,9 +113,7 @@ public final class ServerWrapperForXds extends Server {
checkState(started.compareAndSet(false, true), "Already started");
currentServingState = ServingState.STARTING;
SettableFuture<Throwable> future = addServerWatcher();
if (!xdsClientWrapperForServerSds.hasXdsClient()) {
xdsClientWrapperForServerSds.createXdsClientAndStart();
}
xdsClientWrapperForServerSds.start();
try {
Throwable throwable = future.get();
if (throwable != null) {

View File

@ -45,7 +45,6 @@ public class FilterChainMatchTest {
private static final String LOCAL_IP = "10.1.2.3"; // dest
private static final String REMOTE_IP = "10.4.2.3"; // source
@Mock private XdsClient xdsClient;
@Mock private Channel channel;
private XdsClientWrapperForServerSds xdsClientWrapperForServerSds;
@ -54,9 +53,9 @@ public class FilterChainMatchTest {
@Before
public void setUp() throws IOException {
MockitoAnnotations.initMocks(this);
xdsClientWrapperForServerSds = new XdsClientWrapperForServerSds(PORT);
xdsClientWrapperForServerSds = XdsServerTestHelper.createXdsClientWrapperForServerSds(PORT);
registeredWatcher =
XdsServerTestHelper.startAndGetWatcher(xdsClientWrapperForServerSds, xdsClient, PORT);
XdsServerTestHelper.startAndGetWatcher(xdsClientWrapperForServerSds);
}
@After

View File

@ -64,7 +64,6 @@ public class ServerWrapperForXdsTest {
private int port;
private XdsClientWrapperForServerSds xdsClientWrapperForServerSds;
private XdsServerBuilder.XdsServingStatusListener mockXdsServingStatusListener;
private XdsClient mockXdsClient;
private XdsClient.LdsResourceWatcher listenerWatcher;
private Server mockServer;
@ -72,11 +71,10 @@ public class ServerWrapperForXdsTest {
public void setUp() throws IOException {
port = XdsServerTestHelper.findFreePort();
mockDelegateBuilder = mock(ServerBuilder.class);
xdsClientWrapperForServerSds = new XdsClientWrapperForServerSds(port);
xdsClientWrapperForServerSds = XdsServerTestHelper.createXdsClientWrapperForServerSds(port);
mockXdsServingStatusListener = mock(XdsServerBuilder.XdsServingStatusListener.class);
mockXdsClient = mock(XdsClient.class);
listenerWatcher =
XdsServerTestHelper.startAndGetWatcher(xdsClientWrapperForServerSds, mockXdsClient, port);
XdsServerTestHelper.startAndGetWatcher(xdsClientWrapperForServerSds);
mockServer = mock(Server.class);
when(mockDelegateBuilder.build()).thenReturn(mockServer);
serverWrapperForXds = new ServerWrapperForXds(mockDelegateBuilder,

View File

@ -18,7 +18,6 @@ package io.grpc.xds;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@ -53,7 +52,6 @@ public class XdsClientWrapperForServerSdsTestMisc {
private static final int PORT = 7000;
@Mock private XdsClient xdsClient;
@Mock private Channel channel;
private XdsClientWrapperForServerSds xdsClientWrapperForServerSds;
@ -62,7 +60,7 @@ public class XdsClientWrapperForServerSdsTestMisc {
@Before
public void setUp() throws IOException {
MockitoAnnotations.initMocks(this);
xdsClientWrapperForServerSds = new XdsClientWrapperForServerSds(PORT);
xdsClientWrapperForServerSds = XdsServerTestHelper.createXdsClientWrapperForServerSds(PORT);
}
@After
@ -73,14 +71,14 @@ public class XdsClientWrapperForServerSdsTestMisc {
@Test
public void nonInetSocketAddress_expectNull() throws UnknownHostException {
registeredWatcher =
XdsServerTestHelper.startAndGetWatcher(xdsClientWrapperForServerSds, xdsClient, PORT);
XdsServerTestHelper.startAndGetWatcher(xdsClientWrapperForServerSds);
assertThat(sendListenerUpdate(new InProcessSocketAddress("test1"), null)).isNull();
}
@Test
public void nonMatchingPort_expectException() throws UnknownHostException {
registeredWatcher =
XdsServerTestHelper.startAndGetWatcher(xdsClientWrapperForServerSds, xdsClient, PORT);
XdsServerTestHelper.startAndGetWatcher(xdsClientWrapperForServerSds);
try {
InetAddress ipLocalAddress = InetAddress.getByName("10.1.2.3");
InetSocketAddress localAddress = new InetSocketAddress(ipLocalAddress, PORT + 1);
@ -96,11 +94,12 @@ public class XdsClientWrapperForServerSdsTestMisc {
@Test
public void emptyFilterChain_expectNull() throws UnknownHostException {
registeredWatcher =
XdsServerTestHelper.startAndGetWatcher(xdsClientWrapperForServerSds, xdsClient, PORT);
XdsServerTestHelper.startAndGetWatcher(xdsClientWrapperForServerSds);
InetAddress ipLocalAddress = InetAddress.getByName("10.1.2.3");
InetSocketAddress localAddress = new InetSocketAddress(ipLocalAddress, PORT);
ArgumentCaptor<XdsClient.LdsResourceWatcher> listenerWatcherCaptor = ArgumentCaptor
.forClass(null);
XdsClient xdsClient = xdsClientWrapperForServerSds.getXdsClient();
verify(xdsClient)
.watchLdsResource(eq("grpc/server?udpa.resource.listening_address=0.0.0.0:" + PORT),
listenerWatcherCaptor.capture());
@ -121,7 +120,7 @@ public class XdsClientWrapperForServerSdsTestMisc {
@Test
public void registerServerWatcher() throws UnknownHostException {
registeredWatcher =
XdsServerTestHelper.startAndGetWatcher(xdsClientWrapperForServerSds, xdsClient, PORT);
XdsServerTestHelper.startAndGetWatcher(xdsClientWrapperForServerSds);
XdsClientWrapperForServerSds.ServerWatcher mockServerWatcher =
mock(XdsClientWrapperForServerSds.ServerWatcher.class);
xdsClientWrapperForServerSds.addServerWatcher(mockServerWatcher);
@ -140,7 +139,7 @@ public class XdsClientWrapperForServerSdsTestMisc {
@Test
public void registerServerWatcher_afterListenerUpdate() throws UnknownHostException {
registeredWatcher =
XdsServerTestHelper.startAndGetWatcher(xdsClientWrapperForServerSds, xdsClient, PORT);
XdsServerTestHelper.startAndGetWatcher(xdsClientWrapperForServerSds);
InetAddress ipLocalAddress = InetAddress.getByName("10.1.2.3");
InetSocketAddress localAddress = new InetSocketAddress(ipLocalAddress, PORT);
EnvoyServerProtoData.DownstreamTlsContext tlsContext =
@ -156,7 +155,7 @@ public class XdsClientWrapperForServerSdsTestMisc {
@Test
public void registerServerWatcher_notifyError() throws UnknownHostException {
registeredWatcher =
XdsServerTestHelper.startAndGetWatcher(xdsClientWrapperForServerSds, xdsClient, PORT);
XdsServerTestHelper.startAndGetWatcher(xdsClientWrapperForServerSds);
XdsClientWrapperForServerSds.ServerWatcher mockServerWatcher =
mock(XdsClientWrapperForServerSds.ServerWatcher.class);
xdsClientWrapperForServerSds.addServerWatcher(mockServerWatcher);
@ -186,33 +185,6 @@ public class XdsClientWrapperForServerSdsTestMisc {
verify(mockServerWatcher).onListenerUpdate();
}
@Test
public void startXdsClient_expectException() {
XdsClientWrapperForServerSds.ServerWatcher mockServerWatcher =
mock(XdsClientWrapperForServerSds.ServerWatcher.class);
xdsClientWrapperForServerSds.addServerWatcher(mockServerWatcher);
try {
xdsClientWrapperForServerSds.createXdsClientAndStart();
fail("exception expected");
} catch (IOException expected) {
assertThat(expected)
.hasMessageThat()
.contains("Cannot find bootstrap configuration");
}
verify(mockServerWatcher, never()).onError(any(Throwable.class), eq(false));
}
@Test
public void xdsClientStart_multipleReplacements() {
xdsClientWrapperForServerSds.start(xdsClient, "grpc/server?%s and %s and one more %s");
ArgumentCaptor<XdsClient.LdsResourceWatcher> listenerWatcherCaptor =
ArgumentCaptor.forClass(null);
verify(xdsClient)
.watchLdsResource(
eq("grpc/server?0.0.0.0:7000 and 0.0.0.0:7000 and one more 0.0.0.0:7000"),
listenerWatcherCaptor.capture());
}
private DownstreamTlsContext sendListenerUpdate(
SocketAddress localAddress, DownstreamTlsContext tlsContext) throws UnknownHostException {
when(channel.localAddress()).thenReturn(localAddress);
@ -226,11 +198,9 @@ public class XdsClientWrapperForServerSdsTestMisc {
/** Creates XdsClientWrapperForServerSds: also used by other classes. */
public static XdsClientWrapperForServerSds createXdsClientWrapperForServerSds(
int port, DownstreamTlsContext downstreamTlsContext) {
XdsClient mockXdsClient = mock(XdsClient.class);
XdsClientWrapperForServerSds xdsClientWrapperForServerSds =
new XdsClientWrapperForServerSds(port);
xdsClientWrapperForServerSds.start(
mockXdsClient, "grpc/server?udpa.resource.listening_address=%s");
XdsServerTestHelper.createXdsClientWrapperForServerSds(port);
xdsClientWrapperForServerSds.start();
XdsSdsClientServerTest.generateListenerUpdateToWatcher(
downstreamTlsContext, xdsClientWrapperForServerSds.getListenerWatcher());
return xdsClientWrapperForServerSds;

View File

@ -333,10 +333,9 @@ public class XdsSdsClientServerTest {
private void buildServerWithTlsContext(
DownstreamTlsContext downstreamTlsContext, ServerCredentials fallbackCredentials)
throws IOException {
XdsClient mockXdsClient = mock(XdsClient.class);
XdsClientWrapperForServerSds xdsClientWrapperForServerSds =
new XdsClientWrapperForServerSds(port);
xdsClientWrapperForServerSds.start(mockXdsClient, "grpc/server");
createXdsClientWrapperForServerSds(port);
xdsClientWrapperForServerSds.start();
buildServerWithFallbackServerCredentials(
xdsClientWrapperForServerSds, fallbackCredentials, downstreamTlsContext);
}
@ -352,10 +351,9 @@ public class XdsSdsClientServerTest {
/** Creates XdsClientWrapperForServerSds. */
private static XdsClientWrapperForServerSds createXdsClientWrapperForServerSds(int port) {
XdsClient mockXdsClient = mock(XdsClient.class);
XdsClientWrapperForServerSds xdsClientWrapperForServerSds =
new XdsClientWrapperForServerSds(port);
xdsClientWrapperForServerSds.start(mockXdsClient, "grpc/server");
XdsServerTestHelper.createXdsClientWrapperForServerSds(port);
xdsClientWrapperForServerSds.start();
return xdsClientWrapperForServerSds;
}

View File

@ -58,23 +58,20 @@ import org.mockito.ArgumentCaptor;
public class XdsServerBuilderTest {
@Rule public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule();
private XdsClient mockXdsClient;
private XdsServerBuilder builder;
private ServerWrapperForXds xdsServer;
private XdsClient.LdsResourceWatcher listenerWatcher;
private int port;
private XdsClientWrapperForServerSds xdsClientWrapperForServerSds;
private void buildServer(
XdsServerBuilder.XdsServingStatusListener xdsServingStatusListener,
boolean injectMockXdsClient)
private void buildServer(XdsServerBuilder.XdsServingStatusListener xdsServingStatusListener)
throws IOException {
buildBuilder(xdsServingStatusListener, injectMockXdsClient);
buildBuilder(xdsServingStatusListener);
xdsServer = cleanupRule.register(builder.buildServer(xdsClientWrapperForServerSds));
}
private void buildBuilder(XdsServerBuilder.XdsServingStatusListener xdsServingStatusListener,
boolean injectMockXdsClient) throws IOException {
private void buildBuilder(XdsServerBuilder.XdsServingStatusListener xdsServingStatusListener)
throws IOException {
port = XdsServerTestHelper.findFreePort();
builder =
XdsServerBuilder.forPort(
@ -82,12 +79,8 @@ public class XdsServerBuilderTest {
if (xdsServingStatusListener != null) {
builder = builder.xdsServingStatusListener(xdsServingStatusListener);
}
xdsClientWrapperForServerSds = new XdsClientWrapperForServerSds(port);
if (injectMockXdsClient) {
mockXdsClient = mock(XdsClient.class);
listenerWatcher =
XdsServerTestHelper.startAndGetWatcher(xdsClientWrapperForServerSds, mockXdsClient, port);
}
xdsClientWrapperForServerSds = XdsServerTestHelper.createXdsClientWrapperForServerSds(port);
listenerWatcher = XdsServerTestHelper.startAndGetWatcher(xdsClientWrapperForServerSds);
}
private void verifyServer(
@ -121,7 +114,7 @@ public class XdsServerBuilderTest {
private void verifyShutdown() throws InterruptedException {
xdsServer.shutdown();
xdsServer.awaitTermination(500L, TimeUnit.MILLISECONDS);
verify(mockXdsClient, times(1)).shutdown();
assertThat(xdsClientWrapperForServerSds.getXdsClient()).isNull();
}
private Future<Throwable> startServerAsync() throws InterruptedException {
@ -147,7 +140,7 @@ public class XdsServerBuilderTest {
@Test
public void xdsServerStartAndShutdown()
throws IOException, InterruptedException, TimeoutException, ExecutionException {
buildServer(null, true);
buildServer(null);
Future<Throwable> future = startServerAsync();
XdsServerTestHelper.generateListenerUpdate(
listenerWatcher,
@ -160,7 +153,7 @@ public class XdsServerBuilderTest {
@Test
public void xdsServerStartAfterListenerUpdate()
throws IOException, InterruptedException, TimeoutException, ExecutionException {
buildServer(null, true);
buildServer(null);
XdsServerTestHelper.generateListenerUpdate(
listenerWatcher,
CommonTlsContextTestsUtil.buildTestInternalDownstreamTlsContext("CERT1", "VA1")
@ -173,7 +166,6 @@ public class XdsServerBuilderTest {
assertThat(expected).hasMessageThat().contains("Already started");
}
verifyServer(null,null, null);
verifyShutdown();
}
@Test
@ -181,14 +173,13 @@ public class XdsServerBuilderTest {
throws IOException, InterruptedException, TimeoutException, ExecutionException {
XdsServerBuilder.XdsServingStatusListener mockXdsServingStatusListener =
mock(XdsServerBuilder.XdsServingStatusListener.class);
buildServer(mockXdsServingStatusListener, true);
buildServer(mockXdsServingStatusListener);
Future<Throwable> future = startServerAsync();
XdsServerTestHelper.generateListenerUpdate(
listenerWatcher,
CommonTlsContextTestsUtil.buildTestInternalDownstreamTlsContext("CERT1", "VA1")
);
verifyServer(future, mockXdsServingStatusListener, null);
verifyShutdown();
}
@Test
@ -196,7 +187,7 @@ public class XdsServerBuilderTest {
throws IOException, InterruptedException, TimeoutException, ExecutionException {
XdsServerBuilder.XdsServingStatusListener mockXdsServingStatusListener =
mock(XdsServerBuilder.XdsServingStatusListener.class);
buildServer(mockXdsServingStatusListener, true);
buildServer(mockXdsServingStatusListener);
Future<Throwable> future = startServerAsync();
listenerWatcher.onError(Status.ABORTED);
ArgumentCaptor<Throwable> argCaptor = ArgumentCaptor.forClass(null);
@ -230,7 +221,6 @@ public class XdsServerBuilderTest {
CommonTlsContextTestsUtil.buildTestInternalDownstreamTlsContext("CERT1", "VA1")
);
verifyServer(future, mockXdsServingStatusListener, null);
verifyShutdown();
}
@Test
@ -238,7 +228,7 @@ public class XdsServerBuilderTest {
throws IOException, InterruptedException, TimeoutException, ExecutionException {
XdsServerBuilder.XdsServingStatusListener mockXdsServingStatusListener =
mock(XdsServerBuilder.XdsServingStatusListener.class);
buildServer(mockXdsServingStatusListener, true);
buildServer(mockXdsServingStatusListener);
Future<Throwable> future = startServerAsync();
// create port conflict for start to fail
ServerSocket serverSocket = new ServerSocket(port);
@ -253,27 +243,12 @@ public class XdsServerBuilderTest {
serverSocket.close();
}
@Test
public void xdsServerWithoutMockXdsClient_startError()
throws IOException, InterruptedException, TimeoutException, ExecutionException {
XdsServerBuilder.XdsServingStatusListener mockXdsServingStatusListener =
mock(XdsServerBuilder.XdsServingStatusListener.class);
buildServer(mockXdsServingStatusListener, false);
try {
xdsServer.start();
fail("exception expected");
} catch (IOException expected) {
assertThat(expected).hasMessageThat().contains("Cannot find bootstrap configuration");
}
verify(mockXdsServingStatusListener, never()).onNotServing(any(Throwable.class));
}
@Test
public void xdsServerStartSecondUpdateAndError()
throws IOException, InterruptedException, TimeoutException, ExecutionException {
XdsServerBuilder.XdsServingStatusListener mockXdsServingStatusListener =
mock(XdsServerBuilder.XdsServingStatusListener.class);
buildServer(mockXdsServingStatusListener, true);
buildServer(mockXdsServingStatusListener);
Future<Throwable> future = startServerAsync();
XdsServerTestHelper.generateListenerUpdate(
listenerWatcher,
@ -287,14 +262,13 @@ public class XdsServerBuilderTest {
verifyServer(future, mockXdsServingStatusListener, null);
listenerWatcher.onError(Status.ABORTED);
verifyServer(null, mockXdsServingStatusListener, null);
verifyShutdown();
}
@Test
public void xdsServer_2ndBuild_expectException() throws IOException {
XdsServerBuilder.XdsServingStatusListener mockXdsServingStatusListener =
mock(XdsServerBuilder.XdsServingStatusListener.class);
buildServer(mockXdsServingStatusListener, true);
buildServer(mockXdsServingStatusListener);
try {
builder.build();
fail("exception expected");
@ -307,7 +281,7 @@ public class XdsServerBuilderTest {
public void xdsServer_2ndSetter_expectException() throws IOException {
XdsServerBuilder.XdsServingStatusListener mockXdsServingStatusListener =
mock(XdsServerBuilder.XdsServingStatusListener.class);
buildBuilder(mockXdsServingStatusListener, true);
buildBuilder(mockXdsServingStatusListener);
BindableService mockBindableService = mock(BindableService.class);
ServerServiceDefinition serverServiceDefinition = io.grpc.ServerServiceDefinition
.builder("mock").build();

View File

@ -16,12 +16,18 @@
package io.grpc.xds;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import io.grpc.InsecureChannelCredentials;
import io.grpc.internal.ObjectPool;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.Arrays;
import java.util.Map;
import javax.annotation.Nullable;
import org.mockito.ArgumentCaptor;
/**
@ -29,17 +35,74 @@ import org.mockito.ArgumentCaptor;
*/
class XdsServerTestHelper {
private static final String SERVER_URI = "trafficdirector.googleapis.com";
private static final String NODE_ID =
"projects/42/networks/default/nodes/5c85b298-6f5b-4722-b74a-f7d1f0ccf5ad";
private static final EnvoyProtoData.Node BOOTSTRAP_NODE =
EnvoyProtoData.Node.newBuilder().setId(NODE_ID).build();
static final Bootstrapper.BootstrapInfo BOOTSTRAP_INFO =
new Bootstrapper.BootstrapInfo(
Arrays.asList(
new Bootstrapper.ServerInfo(SERVER_URI, InsecureChannelCredentials.create(), false)),
BOOTSTRAP_NODE,
null,
"grpc/server?udpa.resource.listening_address=%s");
static final class FakeXdsClientPoolFactory
implements XdsNameResolverProvider.XdsClientPoolFactory {
private XdsClient xdsClient;
FakeXdsClientPoolFactory(XdsClient xdsClient) {
this.xdsClient = xdsClient;
}
@Override
public void setBootstrapOverride(Map<String, ?> bootstrap) {
throw new UnsupportedOperationException("Should not be called");
}
@Override
@Nullable
public ObjectPool<XdsClient> get() {
throw new UnsupportedOperationException("Should not be called");
}
@Override
public ObjectPool<XdsClient> getOrCreate() throws XdsInitializationException {
return new ObjectPool<XdsClient>() {
@Override
public XdsClient getObject() {
return xdsClient;
}
@Override
public XdsClient returnObject(Object object) {
return null;
}
};
}
}
static XdsClientWrapperForServerSds createXdsClientWrapperForServerSds(int port) {
FakeXdsClientPoolFactory fakeXdsClientPoolFactory = new FakeXdsClientPoolFactory(
buildMockXdsClient());
return new XdsClientWrapperForServerSds(port, fakeXdsClientPoolFactory);
}
private static XdsClient buildMockXdsClient() {
XdsClient xdsClient = mock(XdsClient.class);
when(xdsClient.getBootstrapInfo()).thenReturn(BOOTSTRAP_INFO);
return xdsClient;
}
static XdsClient.LdsResourceWatcher startAndGetWatcher(
XdsClientWrapperForServerSds xdsClientWrapperForServerSds,
XdsClient mockXdsClient,
int port) {
xdsClientWrapperForServerSds.start(
mockXdsClient, "grpc/server?udpa.resource.listening_address=%s");
ArgumentCaptor<XdsClient.LdsResourceWatcher> listenerWatcherCaptor = ArgumentCaptor
.forClass(null);
verify(mockXdsClient)
.watchLdsResource(eq("grpc/server?udpa.resource.listening_address=0.0.0.0:" + port),
listenerWatcherCaptor.capture());
XdsClientWrapperForServerSds xdsClientWrapperForServerSds) {
xdsClientWrapperForServerSds.start();
XdsClient mockXdsClient = xdsClientWrapperForServerSds.getXdsClient();
ArgumentCaptor<XdsClient.LdsResourceWatcher> listenerWatcherCaptor =
ArgumentCaptor.forClass(null);
verify(mockXdsClient).watchLdsResource(any(String.class), listenerWatcherCaptor.capture());
return listenerWatcherCaptor.getValue();
}