xds: fix ServerXdsClient to return subscribed resources only for LDS (#7689)

This commit is contained in:
sanjaypujare 2020-12-09 17:42:12 -08:00 committed by GitHub
parent f5865d5bf2
commit 20fc907b21
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 34 additions and 17 deletions

View File

@ -56,20 +56,25 @@ final class ServerXdsClient extends AbstractXdsClient {
@Nullable
private ListenerWatcher listenerWatcher;
private int listenerPort = -1;
private final boolean newServerApi;
private final boolean useNewApiForListenerQuery;
@Nullable private final String instanceIp;
private String grpcServerResourceId;
@Nullable
private ScheduledHandle ldsRespTimer;
ServerXdsClient(XdsChannel channel, Node node, ScheduledExecutorService timeService,
BackoffPolicy.Provider backoffPolicyProvider, Supplier<Stopwatch> stopwatchSupplier,
boolean newServerApi, String instanceIp, String grpcServerResourceId) {
ServerXdsClient(
XdsChannel channel,
Node node,
ScheduledExecutorService timeService,
BackoffPolicy.Provider backoffPolicyProvider,
Supplier<Stopwatch> stopwatchSupplier,
boolean useNewApiForListenerQuery,
String instanceIp,
String grpcServerResourceId) {
super(channel, node, timeService, backoffPolicyProvider, stopwatchSupplier);
this.newServerApi = channel.isUseProtocolV3() && newServerApi;
this.useNewApiForListenerQuery = channel.isUseProtocolV3() && useNewApiForListenerQuery;
this.instanceIp = (instanceIp != null ? instanceIp : "0.0.0.0");
this.grpcServerResourceId =
(grpcServerResourceId != null) ? grpcServerResourceId : "grpc/server";
this.grpcServerResourceId = grpcServerResourceId != null ? grpcServerResourceId : "grpc/server";
}
@Override
@ -78,7 +83,7 @@ final class ServerXdsClient extends AbstractXdsClient {
listenerWatcher = checkNotNull(watcher, "watcher");
checkArgument(port > 0, "port needs to be > 0");
listenerPort = port;
if (newServerApi) {
if (useNewApiForListenerQuery) {
String listeningAddress = instanceIp + ":" + listenerPort;
grpcServerResourceId =
grpcServerResourceId + "?udpa.resource.listening_address=" + listeningAddress;
@ -89,7 +94,7 @@ final class ServerXdsClient extends AbstractXdsClient {
@Override
public void run() {
getLogger().log(XdsLogLevel.INFO, "Started watching listener for port {0}", port);
if (!newServerApi) {
if (!useNewApiForListenerQuery) {
updateNodeMetadataForListenerRequest(port);
}
adjustResourceSubscription(ResourceType.LDS);
@ -107,7 +112,10 @@ final class ServerXdsClient extends AbstractXdsClient {
@Nullable
@Override
Collection<String> getSubscribedResources(ResourceType type) {
if (newServerApi) {
if (type != ResourceType.LDS) {
return null;
}
if (useNewApiForListenerQuery) {
return ImmutableList.<String>of(grpcServerResourceId);
} else {
return Collections.emptyList();
@ -175,17 +183,17 @@ final class ServerXdsClient extends AbstractXdsClient {
}
private boolean isRequestedListener(Listener listener) {
if (newServerApi) {
if (useNewApiForListenerQuery) {
return grpcServerResourceId.equals(listener.getName())
&& listener.getTrafficDirection().equals(TrafficDirection.INBOUND)
&& isAddressMatching(listener.getAddress(), listenerPort);
&& listener.getTrafficDirection().equals(TrafficDirection.INBOUND)
&& isAddressMatching(listener.getAddress(), listenerPort);
}
return isAddressMatching(listener.getAddress(), 15001)
&& hasMatchingFilter(listener.getFilterChainsList());
}
private boolean isAddressMatching(Address address, int portToMatch) {
return address.hasSocketAddress() && (address.getSocketAddress().getPortValue() == portToMatch);
return address.hasSocketAddress() && address.getSocketAddress().getPortValue() == portToMatch;
}
private boolean hasMatchingFilter(List<FilterChain> filterChainsList) {

View File

@ -141,7 +141,7 @@ public class ServerXdsClientNewServerApiTest {
private ListenerWatcher listenerWatcher;
private ManagedChannel channel;
private XdsClient xdsClient;
private ServerXdsClient xdsClient;
@Before
public void setUp() throws IOException {
@ -531,6 +531,7 @@ public class ServerXdsClientNewServerApiTest {
.onNext(eq(buildDiscoveryRequest(NODE, "0",
ImmutableList.of("test/value?udpa.resource.listening_address=192.168.3.7:7000"),
ResourceType.LDS.typeUrl(), "")));
verifyNoMoreInteractions(requestObserver);
// Management server becomes unreachable.
responseObserver.onError(Status.UNAVAILABLE.asException());
@ -551,6 +552,7 @@ public class ServerXdsClientNewServerApiTest {
.onNext(eq(buildDiscoveryRequest(NODE, "0",
ImmutableList.of("test/value?udpa.resource.listening_address=192.168.3.7:7000"),
ResourceType.LDS.typeUrl(), "")));
verifyNoMoreInteractions(requestObserver);
// Management server is still not reachable.
responseObserver.onError(Status.UNAVAILABLE.asException());
@ -571,6 +573,7 @@ public class ServerXdsClientNewServerApiTest {
.onNext(eq(buildDiscoveryRequest(NODE, "0",
ImmutableList.of("test/value?udpa.resource.listening_address=192.168.3.7:7000"),
ResourceType.LDS.typeUrl(), "")));
verifyNoMoreInteractions(requestObserver);
// Management server sends back a LDS response.
response = buildDiscoveryResponse("1", listeners,
@ -595,6 +598,7 @@ public class ServerXdsClientNewServerApiTest {
.onNext(eq(buildDiscoveryRequest(NODE, "1",
ImmutableList.of("test/value?udpa.resource.listening_address=192.168.3.7:7000"),
ResourceType.LDS.typeUrl(), "")));
verifyNoMoreInteractions(requestObserver);
// Management server becomes unreachable again.
responseObserver.onError(Status.UNAVAILABLE.asException());
@ -616,7 +620,7 @@ public class ServerXdsClientNewServerApiTest {
ResourceType.LDS.typeUrl(), "")));
verifyNoMoreInteractions(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1,
backoffPolicy2);
backoffPolicy2, requestObserver);
}
static Listener buildListenerWithFilterChain(String name, int portValue, String address,

View File

@ -636,6 +636,7 @@ public class ServerXdsClientTest {
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "",
ResourceType.LDS.typeUrlV2(), "")));
verifyNoMoreInteractions(requestObserver);
final FilterChain filterChainOutbound = buildFilterChain(buildFilterChainMatch(8000), null);
final FilterChain filterChainInbound = buildFilterChain(buildFilterChainMatch(PORT,
@ -675,6 +676,7 @@ public class ServerXdsClientTest {
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0",
ResourceType.LDS.typeUrlV2(), "")));
verifyNoMoreInteractions(requestObserver);
// Management server becomes unreachable.
responseObserver.onError(Status.UNAVAILABLE.asException());
@ -694,6 +696,7 @@ public class ServerXdsClientTest {
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0",
ResourceType.LDS.typeUrlV2(), "")));
verifyNoMoreInteractions(requestObserver);
// Management server is still not reachable.
responseObserver.onError(Status.UNAVAILABLE.asException());
@ -713,6 +716,7 @@ public class ServerXdsClientTest {
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "0",
ResourceType.LDS.typeUrlV2(), "")));
verifyNoMoreInteractions(requestObserver);
// Management server sends back a LDS response.
response = buildDiscoveryResponseV2("1", listeners,
@ -736,6 +740,7 @@ public class ServerXdsClientTest {
verify(requestObserver)
.onNext(eq(buildDiscoveryRequest(getNodeToVerify(), "1",
ResourceType.LDS.typeUrlV2(), "")));
verifyNoMoreInteractions(requestObserver);
// Management server becomes unreachable again.
responseObserver.onError(Status.UNAVAILABLE.asException());
@ -756,7 +761,7 @@ public class ServerXdsClientTest {
ResourceType.LDS.typeUrlV2(), "")));
verifyNoMoreInteractions(mockedDiscoveryService, backoffPolicyProvider, backoffPolicy1,
backoffPolicy2);
backoffPolicy2, requestObserver);
}
static Listener buildListenerWithFilterChain(String name, int portValue, String address,