xds: implement ignore_resource_deletion server feature (#9339)

As defined in the gRFC [A53: Option for Ignoring xDS Resource Deletion](https://github.com/grpc/proposal/blob/master/A53-xds-ignore-resource-deletion.md).

This includes semi-related changes:
* Refactor ClientXdsClientTestBase: extract verify methods for golden resources
* Parameterize ClientXdsClientV2Test and ClientXdsClientV3Test with ignoreResourceDeletion enabled and disabled
* Add FORCE_INFO and FORCE_WARNING levels to XdsLogLevel
This commit is contained in:
Sergii Tkachenko 2022-07-08 13:09:38 -07:00 committed by GitHub
parent 5f9ef98173
commit ac23d33d72
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 469 additions and 155 deletions

View File

@ -62,10 +62,21 @@ public abstract class Bootstrapper {
abstract boolean useProtocolV3();
abstract boolean ignoreResourceDeletion();
@VisibleForTesting
static ServerInfo create(
String target, ChannelCredentials channelCredentials, boolean useProtocolV3) {
return new AutoValue_Bootstrapper_ServerInfo(target, channelCredentials, useProtocolV3);
return new AutoValue_Bootstrapper_ServerInfo(target, channelCredentials, useProtocolV3,
false);
}
@VisibleForTesting
static ServerInfo create(
String target, ChannelCredentials channelCredentials, boolean useProtocolV3,
boolean ignoreResourceDeletion) {
return new AutoValue_Bootstrapper_ServerInfo(target, channelCredentials, useProtocolV3,
ignoreResourceDeletion);
}
}

View File

@ -54,15 +54,22 @@ class BootstrapperImpl extends Bootstrapper {
private static final String BOOTSTRAP_CONFIG_SYS_PROPERTY = "io.grpc.xds.bootstrapConfig";
@VisibleForTesting
static String bootstrapConfigFromSysProp = System.getProperty(BOOTSTRAP_CONFIG_SYS_PROPERTY);
private static final String XDS_V3_SERVER_FEATURE = "xds_v3";
// Feature-gating environment variables.
static boolean enableFederation =
!Strings.isNullOrEmpty(System.getenv("GRPC_EXPERIMENTAL_XDS_FEDERATION"))
&& Boolean.parseBoolean(System.getenv("GRPC_EXPERIMENTAL_XDS_FEDERATION"));
// Client features.
@VisibleForTesting
static final String CLIENT_FEATURE_DISABLE_OVERPROVISIONING =
"envoy.lb.does_not_support_overprovisioning";
@VisibleForTesting
static final String CLIENT_FEATURE_RESOURCE_IN_SOTW = "xds.config.resource-in-sotw";
static boolean enableFederation =
!Strings.isNullOrEmpty(System.getenv("GRPC_EXPERIMENTAL_XDS_FEDERATION"))
&& Boolean.parseBoolean(System.getenv("GRPC_EXPERIMENTAL_XDS_FEDERATION"));
// Server features.
private static final String SERVER_FEATURE_XDS_V3 = "xds_v3";
private static final String SERVER_FEATURE_IGNORE_RESOURCE_DELETION = "ignore_resource_deletion";
private final XdsLogger logger;
private FileReader reader = LocalFileReader.INSTANCE;
@ -275,12 +282,15 @@ class BootstrapperImpl extends Bootstrapper {
}
boolean useProtocolV3 = false;
boolean ignoreResourceDeletion = false;
List<String> serverFeatures = JsonUtil.getListOfStrings(serverConfig, "server_features");
if (serverFeatures != null) {
logger.log(XdsLogLevel.INFO, "Server features: {0}", serverFeatures);
useProtocolV3 = serverFeatures.contains(XDS_V3_SERVER_FEATURE);
useProtocolV3 = serverFeatures.contains(SERVER_FEATURE_XDS_V3);
ignoreResourceDeletion = serverFeatures.contains(SERVER_FEATURE_IGNORE_RESOURCE_DELETION);
}
servers.add(ServerInfo.create(serverUri, channelCredentials, useProtocolV3));
servers.add(
ServerInfo.create(serverUri, channelCredentials, useProtocolV3, ignoreResourceDeletion));
}
return servers.build();
}

View File

@ -2156,8 +2156,7 @@ final class ClientXdsClient extends XdsClient implements XdsResponseHandler, Res
ResourceSubscriber subscriber = ldsResourceSubscribers.get(resourceName);
subscriber.removeWatcher(watcher);
if (!subscriber.isWatched()) {
subscriber.stopTimer();
logger.log(XdsLogLevel.INFO, "Unsubscribe LDS resource {0}", resourceName);
subscriber.cancelResourceWatch();
ldsResourceSubscribers.remove(resourceName);
if (subscriber.xdsChannel != null) {
subscriber.xdsChannel.adjustResourceSubscription(ResourceType.LDS);
@ -2194,8 +2193,7 @@ final class ClientXdsClient extends XdsClient implements XdsResponseHandler, Res
ResourceSubscriber subscriber = rdsResourceSubscribers.get(resourceName);
subscriber.removeWatcher(watcher);
if (!subscriber.isWatched()) {
subscriber.stopTimer();
logger.log(XdsLogLevel.INFO, "Unsubscribe RDS resource {0}", resourceName);
subscriber.cancelResourceWatch();
rdsResourceSubscribers.remove(resourceName);
if (subscriber.xdsChannel != null) {
subscriber.xdsChannel.adjustResourceSubscription(ResourceType.RDS);
@ -2232,8 +2230,7 @@ final class ClientXdsClient extends XdsClient implements XdsResponseHandler, Res
ResourceSubscriber subscriber = cdsResourceSubscribers.get(resourceName);
subscriber.removeWatcher(watcher);
if (!subscriber.isWatched()) {
subscriber.stopTimer();
logger.log(XdsLogLevel.INFO, "Unsubscribe CDS resource {0}", resourceName);
subscriber.cancelResourceWatch();
cdsResourceSubscribers.remove(resourceName);
if (subscriber.xdsChannel != null) {
subscriber.xdsChannel.adjustResourceSubscription(ResourceType.CDS);
@ -2270,8 +2267,7 @@ final class ClientXdsClient extends XdsClient implements XdsResponseHandler, Res
ResourceSubscriber subscriber = edsResourceSubscribers.get(resourceName);
subscriber.removeWatcher(watcher);
if (!subscriber.isWatched()) {
subscriber.stopTimer();
logger.log(XdsLogLevel.INFO, "Unsubscribe EDS resource {0}", resourceName);
subscriber.cancelResourceWatch();
edsResourceSubscribers.remove(resourceName);
if (subscriber.xdsChannel != null) {
subscriber.xdsChannel.adjustResourceSubscription(ResourceType.EDS);
@ -2320,7 +2316,7 @@ final class ClientXdsClient extends XdsClient implements XdsResponseHandler, Res
Bootstrapper.BootstrapInfo getBootstrapInfo() {
return bootstrapInfo;
}
@Override
public String toString() {
return logId.toString();
@ -2370,29 +2366,18 @@ final class ClientXdsClient extends XdsClient implements XdsResponseHandler, Res
} else if (type == ResourceType.LDS || type == ResourceType.CDS) {
if (subscriber.data != null && invalidResources.contains(resourceName)) {
// Update is rejected but keep using the cached data.
if (type == ResourceType.LDS) {
LdsUpdate ldsUpdate = (LdsUpdate) subscriber.data;
io.grpc.xds.HttpConnectionManager hcm = ldsUpdate.httpConnectionManager();
if (hcm != null) {
String rdsName = hcm.rdsName();
if (rdsName != null) {
retainedResources.add(rdsName);
}
}
} else {
CdsUpdate cdsUpdate = (CdsUpdate) subscriber.data;
String edsName = cdsUpdate.edsServiceName();
if (edsName == null) {
edsName = cdsUpdate.clusterName();
}
retainedResources.add(edsName);
}
retainDependentResource(subscriber, retainedResources);
} else if (invalidResources.contains(resourceName)) {
subscriber.onError(Status.UNAVAILABLE.withDescription(errorDetail));
} else {
// For State of the World services, notify watchers when their watched resource is missing
// from the ADS update.
subscriber.onAbsent();
// Retain any dependent resources if the resource deletion is ignored
// per bootstrap ignore_resource_deletion server feature.
if (!subscriber.absent) {
retainDependentResource(subscriber, retainedResources);
}
}
}
}
@ -2409,6 +2394,28 @@ final class ClientXdsClient extends XdsClient implements XdsResponseHandler, Res
}
}
private void retainDependentResource(
ResourceSubscriber subscriber, Set<String> retainedResources) {
if (subscriber.data == null) {
return;
}
String resourceName = null;
if (subscriber.type == ResourceType.LDS) {
LdsUpdate ldsUpdate = (LdsUpdate) subscriber.data;
io.grpc.xds.HttpConnectionManager hcm = ldsUpdate.httpConnectionManager();
if (hcm != null) {
resourceName = hcm.rdsName();
}
} else if (subscriber.type == ResourceType.CDS) {
CdsUpdate cdsUpdate = (CdsUpdate) subscriber.data;
resourceName = cdsUpdate.edsServiceName();
}
if (resourceName != null) {
retainedResources.add(resourceName);
}
}
private static final class ParsedResource {
private final ResourceUpdate resourceUpdate;
private final Any rawResource;
@ -2431,15 +2438,18 @@ final class ClientXdsClient extends XdsClient implements XdsResponseHandler, Res
* Tracks a single subscribed resource.
*/
private final class ResourceSubscriber {
private final ServerInfo serverInfo;
@Nullable private final ServerInfo serverInfo;
@Nullable private final AbstractXdsClient xdsChannel;
private final ResourceType type;
private final String resource;
private final Set<ResourceWatcher> watchers = new HashSet<>();
private ResourceUpdate data;
@Nullable private ResourceUpdate data;
private boolean absent;
private ScheduledHandle respTimer;
private ResourceMetadata metadata;
// Tracks whether the deletion has been ignored per bootstrap server feature.
// See https://github.com/grpc/proposal/blob/master/A53-xds-ignore-resource-deletion.md
private boolean resourceDeletionIgnored;
@Nullable private ScheduledHandle respTimer;
@Nullable private ResourceMetadata metadata;
@Nullable private String errorDescription;
ResourceSubscriber(ResourceType type, String resource) {
@ -2533,6 +2543,21 @@ final class ClientXdsClient extends XdsClient implements XdsResponseHandler, Res
}
}
void cancelResourceWatch() {
if (isWatched()) {
throw new IllegalStateException("Can't cancel resource watch with active watchers present");
}
stopTimer();
String message = "Unsubscribing {0} resource {1} from server {2}";
XdsLogLevel logLevel = XdsLogLevel.INFO;
if (resourceDeletionIgnored) {
message += " for which we previously ignored a deletion";
logLevel = XdsLogLevel.FORCE_INFO;
}
logger.log(logLevel, message, type, resource,
serverInfo != null ? serverInfo.target() : "unknown");
}
boolean isWatched() {
return !watchers.isEmpty();
}
@ -2547,6 +2572,12 @@ final class ClientXdsClient extends XdsClient implements XdsResponseHandler, Res
ResourceUpdate oldData = this.data;
this.data = parsedResource.getResourceUpdate();
absent = false;
if (resourceDeletionIgnored) {
logger.log(XdsLogLevel.FORCE_INFO, "xds server {0}: server returned new version "
+ "of resource for which we previously ignored a deletion: type {1} name {2}",
serverInfo != null ? serverInfo.target() : "unknown", type, resource);
resourceDeletionIgnored = false;
}
if (!Objects.equals(oldData, data)) {
for (ResourceWatcher watcher : watchers) {
notifyWatcher(watcher, data);
@ -2558,6 +2589,22 @@ final class ClientXdsClient extends XdsClient implements XdsResponseHandler, Res
if (respTimer != null && respTimer.isPending()) { // too early to conclude absence
return;
}
// Ignore deletion of State of the World resources when this feature is on,
// and the resource is reusable.
boolean ignoreResourceDeletionEnabled =
serverInfo != null && serverInfo.ignoreResourceDeletion();
boolean isStateOfTheWorld = (type == ResourceType.LDS || type == ResourceType.CDS);
if (ignoreResourceDeletionEnabled && isStateOfTheWorld && data != null) {
if (!resourceDeletionIgnored) {
logger.log(XdsLogLevel.FORCE_WARNING,
"xds server {0}: ignoring deletion for resource type {1} name {2}}",
serverInfo.target(), type, resource);
resourceDeletionIgnored = true;
}
return;
}
logger.log(XdsLogLevel.INFO, "Conclude {0} resource {1} not exist", type, resource);
if (!absent) {
data = null;

View File

@ -81,6 +81,10 @@ final class XdsLogger {
return Level.FINE;
case INFO:
return Level.FINER;
case FORCE_INFO:
return Level.INFO;
case FORCE_WARNING:
return Level.WARNING;
default:
return Level.FINEST;
}
@ -89,6 +93,11 @@ final class XdsLogger {
/**
* Log levels. See the table below for the mapping from the XdsLogger levels to
* Java logger levels.
*
* <p><b>NOTE:</b>
* Please use {@code FORCE_} levels with care, only when the message is expected to be
* surfaced to the library user. Normally libraries should minimize the usage
* of highly visible logs.
* <pre>
* +---------------------+-------------------+
* | XdsLogger Level | Java Logger Level |
@ -97,6 +106,8 @@ final class XdsLogger {
* | INFO | FINER |
* | WARNING | FINE |
* | ERROR | FINE |
* | FORCE_INFO | INFO |
* | FORCE_WARNING | WARNING |
* +---------------------+-------------------+
* </pre>
*/
@ -104,6 +115,8 @@ final class XdsLogger {
DEBUG,
INFO,
WARNING,
ERROR
ERROR,
FORCE_INFO,
FORCE_WARNING,
}
}

View File

@ -578,6 +578,8 @@ public class BootstrapperImplTest {
ServerInfo serverInfo = Iterables.getOnlyElement(info.servers());
assertThat(serverInfo.target()).isEqualTo(SERVER_URI);
assertThat(serverInfo.channelCredentials()).isInstanceOf(InsecureChannelCredentials.class);
assertThat(serverInfo.ignoreResourceDeletion()).isFalse();
// xds v2: xds v3 disabled
assertThat(serverInfo.useProtocolV3()).isFalse();
}
@ -600,9 +602,59 @@ public class BootstrapperImplTest {
ServerInfo serverInfo = Iterables.getOnlyElement(info.servers());
assertThat(serverInfo.target()).isEqualTo(SERVER_URI);
assertThat(serverInfo.channelCredentials()).isInstanceOf(InsecureChannelCredentials.class);
assertThat(serverInfo.ignoreResourceDeletion()).isFalse();
// xds_v3 enabled
assertThat(serverInfo.useProtocolV3()).isTrue();
}
@Test
public void serverFeatureIgnoreResourceDeletion() throws XdsInitializationException {
String rawData = "{\n"
+ " \"xds_servers\": [\n"
+ " {\n"
+ " \"server_uri\": \"" + SERVER_URI + "\",\n"
+ " \"channel_creds\": [\n"
+ " {\"type\": \"insecure\"}\n"
+ " ],\n"
+ " \"server_features\": [\"ignore_resource_deletion\"]\n"
+ " }\n"
+ " ]\n"
+ "}";
bootstrapper.setFileReader(createFileReader(BOOTSTRAP_FILE_PATH, rawData));
BootstrapInfo info = bootstrapper.bootstrap();
ServerInfo serverInfo = Iterables.getOnlyElement(info.servers());
assertThat(serverInfo.target()).isEqualTo(SERVER_URI);
assertThat(serverInfo.channelCredentials()).isInstanceOf(InsecureChannelCredentials.class);
// Only ignore_resource_deletion feature enabled: confirm it's on, and xds_v3 is off.
assertThat(serverInfo.useProtocolV3()).isFalse();
assertThat(serverInfo.ignoreResourceDeletion()).isTrue();
}
@Test
public void serverFeatureIgnoreResourceDeletion_xdsV3() throws XdsInitializationException {
String rawData = "{\n"
+ " \"xds_servers\": [\n"
+ " {\n"
+ " \"server_uri\": \"" + SERVER_URI + "\",\n"
+ " \"channel_creds\": [\n"
+ " {\"type\": \"insecure\"}\n"
+ " ],\n"
+ " \"server_features\": [\"xds_v3\", \"ignore_resource_deletion\"]\n"
+ " }\n"
+ " ]\n"
+ "}";
bootstrapper.setFileReader(createFileReader(BOOTSTRAP_FILE_PATH, rawData));
BootstrapInfo info = bootstrapper.bootstrap();
ServerInfo serverInfo = Iterables.getOnlyElement(info.servers());
assertThat(serverInfo.target()).isEqualTo(SERVER_URI);
assertThat(serverInfo.channelCredentials()).isInstanceOf(InsecureChannelCredentials.class);
// xds_v3 and ignore_resource_deletion features enabled: confirm both are on.
assertThat(serverInfo.useProtocolV3()).isTrue();
assertThat(serverInfo.ignoreResourceDeletion()).isTrue();
}
@Test
public void notFound() {
BootstrapperImpl.bootstrapPathFromEnvVar = null;

View File

@ -133,8 +133,9 @@ public abstract class ClientXdsClientTestBase {
private static final Node NODE = Node.newBuilder().setId(NODE_ID).build();
private static final Any FAILING_ANY = MessageFactory.FAILING_ANY;
private static final ChannelCredentials CHANNEL_CREDENTIALS = InsecureChannelCredentials.create();
private final ServerInfo lrsServerInfo =
ServerInfo.create(SERVER_URI, CHANNEL_CREDENTIALS, useProtocolV3());
// xDS control plane server info.
private ServerInfo xdsServerInfo;
private static final FakeClock.TaskFilter RPC_RETRY_TASK_FILTER =
new FakeClock.TaskFilter() {
@ -316,10 +317,11 @@ public abstract class ClientXdsClientTestBase {
}
};
xdsServerInfo = ServerInfo.create(SERVER_URI, CHANNEL_CREDENTIALS, useProtocolV3(),
ignoreResourceDeletion());
Bootstrapper.BootstrapInfo bootstrapInfo =
Bootstrapper.BootstrapInfo.builder()
.servers(Arrays.asList(
Bootstrapper.ServerInfo.create(SERVER_URI, CHANNEL_CREDENTIALS, useProtocolV3())))
.servers(Collections.singletonList(xdsServerInfo))
.node(NODE)
.authorities(ImmutableMap.of(
"authority.xds.com",
@ -365,6 +367,9 @@ public abstract class ClientXdsClientTestBase {
protected abstract boolean useProtocolV3();
/** Whether ignore_resource_deletion server feature is enabled for the given test. */
protected abstract boolean ignoreResourceDeletion();
protected abstract BindableService createAdsService();
protected abstract BindableService createLrsService();
@ -484,10 +489,73 @@ public abstract class ClientXdsClientTestBase {
}
/**
* Helper method to validate {@link XdsClient.EdsUpdate} created for the test CDS resource
* {@link ClientXdsClientTestBase#testClusterLoadAssignment}.
* Verifies the LDS update against the golden Listener with vhosts {@link #testListenerVhosts}.
*/
private void validateTestClusterLoadAssigment(EdsUpdate edsUpdate) {
private void verifyGoldenListenerVhosts(LdsUpdate ldsUpdate) {
assertThat(ldsUpdate.listener()).isNull();
HttpConnectionManager hcm = ldsUpdate.httpConnectionManager();
assertThat(hcm.rdsName()).isNull();
assertThat(hcm.virtualHosts()).hasSize(VHOST_SIZE);
verifyGoldenHcm(hcm);
}
/**
* Verifies the LDS update against the golden Listener with RDS name {@link #testListenerRds}.
*/
private void verifyGoldenListenerRds(LdsUpdate ldsUpdate) {
assertThat(ldsUpdate.listener()).isNull();
HttpConnectionManager hcm = ldsUpdate.httpConnectionManager();
assertThat(hcm.rdsName()).isEqualTo(RDS_RESOURCE);
assertThat(hcm.virtualHosts()).isNull();
verifyGoldenHcm(hcm);
}
private void verifyGoldenHcm(HttpConnectionManager hcm) {
if (useProtocolV3()) {
// The last configured filter has to be a terminal filter.
assertThat(hcm.httpFilterConfigs()).isNotNull();
assertThat(hcm.httpFilterConfigs()).hasSize(1);
assertThat(hcm.httpFilterConfigs().get(0).name).isEqualTo("terminal");
assertThat(hcm.httpFilterConfigs().get(0).filterConfig).isEqualTo(RouterFilter.ROUTER_CONFIG);
} else {
assertThat(hcm.httpFilterConfigs()).isNull();
}
}
/**
* Verifies the RDS update against the golden route config {@link #testRouteConfig}.
*/
private void verifyGoldenRouteConfig(RdsUpdate rdsUpdate) {
assertThat(rdsUpdate.virtualHosts).hasSize(VHOST_SIZE);
for (VirtualHost vhost : rdsUpdate.virtualHosts) {
assertThat(vhost.name()).contains("do not care");
assertThat(vhost.domains()).hasSize(1);
assertThat(vhost.routes()).hasSize(1);
}
}
/**
* Verifies the CDS update against the golden Round Robin Cluster {@link #testClusterRoundRobin}.
*/
private void verifyGoldenClusterRoundRobin(CdsUpdate cdsUpdate) {
assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
assertThat(cdsUpdate.edsServiceName()).isNull();
LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig());
assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental");
List<LbConfig> childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList(
JsonUtil.getListOfObjects(lbConfig.getRawConfigValue(), "childPolicy"));
assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin");
assertThat(cdsUpdate.lrsServerInfo()).isNull();
assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
assertThat(cdsUpdate.upstreamTlsContext()).isNull();
}
/**
* Verifies the EDS update against the golden Cluster with load assignment
* {@link #testClusterLoadAssignment}.
*/
private void validateGoldenClusterLoadAssignment(EdsUpdate edsUpdate) {
assertThat(edsUpdate.clusterName).isEqualTo(EDS_RESOURCE);
assertThat(edsUpdate.dropPolicies)
.containsExactly(
@ -620,7 +688,12 @@ public abstract class ClientXdsClientTestBase {
verifyResourceMetadataAcked(LDS, "A", resourcesV2.get("A"), VERSION_2, TIME_INCREMENT * 2);
verifyResourceMetadataNacked(LDS, "B", resourcesV1.get("B"), VERSION_1, TIME_INCREMENT,
VERSION_2, TIME_INCREMENT * 2, errorsV2);
verifyResourceMetadataDoesNotExist(LDS, "C");
if (!ignoreResourceDeletion()) {
verifyResourceMetadataDoesNotExist(LDS, "C");
} else {
// When resource deletion is disabled, {C} stays ACKed in the previous version VERSION_1.
verifyResourceMetadataAcked(LDS, "C", resourcesV1.get("C"), VERSION_1, TIME_INCREMENT);
}
call.verifyRequestNack(LDS, subscribedResourceNames, VERSION_1, "0001", NODE, errorsV2);
// LDS -> {B, C} version 3
@ -630,7 +703,12 @@ public abstract class ClientXdsClientTestBase {
call.sendResponse(LDS, resourcesV3.values().asList(), VERSION_3, "0002");
// {A} -> does not exist
// {B, C} -> ACK, version 3
verifyResourceMetadataDoesNotExist(LDS, "A");
if (!ignoreResourceDeletion()) {
verifyResourceMetadataDoesNotExist(LDS, "A");
} else {
// When resource deletion is disabled, {A} stays ACKed in the previous version VERSION_2.
verifyResourceMetadataAcked(LDS, "A", resourcesV2.get("A"), VERSION_2, TIME_INCREMENT * 2);
}
verifyResourceMetadataAcked(LDS, "B", resourcesV3.get("B"), VERSION_3, TIME_INCREMENT * 3);
verifyResourceMetadataAcked(LDS, "C", resourcesV3.get("C"), VERSION_3, TIME_INCREMENT * 3);
call.verifyRequest(LDS, subscribedResourceNames, VERSION_3, "0002", NODE);
@ -638,7 +716,7 @@ public abstract class ClientXdsClientTestBase {
}
@Test
public void ldsResponseErrorHandling_subscribedResourceInvalid_withRdsSubscriptioin() {
public void ldsResponseErrorHandling_subscribedResourceInvalid_withRdsSubscription() {
List<String> subscribedResourceNames = ImmutableList.of("A", "B", "C");
xdsClient.watchLdsResource("A", ldsResourceWatcher);
xdsClient.watchRdsResource("A.1", rdsResourceWatcher);
@ -694,14 +772,26 @@ public abstract class ClientXdsClientTestBase {
verifyResourceMetadataNacked(
LDS, "B", resourcesV1.get("B"), VERSION_1, TIME_INCREMENT, VERSION_2, TIME_INCREMENT * 3,
errorsV2);
verifyResourceMetadataDoesNotExist(LDS, "C");
if (!ignoreResourceDeletion()) {
verifyResourceMetadataDoesNotExist(LDS, "C");
} else {
// When resource deletion is disabled, {C} stays ACKed in the previous version VERSION_1.
verifyResourceMetadataAcked(LDS, "C", resourcesV1.get("C"), VERSION_1, TIME_INCREMENT);
}
call.verifyRequestNack(LDS, subscribedResourceNames, VERSION_1, "0001", NODE, errorsV2);
// {A.1} -> does not exist
// {A.1} -> does not exist, missing from {A}
// {B.1} -> version 1
// {C.1} -> does not exist
// {C.1} -> does not exist because {C} does not exist
verifyResourceMetadataDoesNotExist(RDS, "A.1");
verifyResourceMetadataAcked(RDS, "B.1", resourcesV11.get("B.1"), VERSION_1, TIME_INCREMENT * 2);
verifyResourceMetadataDoesNotExist(RDS, "C.1");
if (!ignoreResourceDeletion()) {
verifyResourceMetadataDoesNotExist(RDS, "C.1");
} else {
// When resource deletion is disabled, {C.1} is not deleted when {C} is deleted.
// Verify {C.1} stays in the previous version VERSION_1.
verifyResourceMetadataAcked(RDS, "C.1", resourcesV11.get("C.1"), VERSION_1,
TIME_INCREMENT * 2);
}
}
@Test
@ -712,8 +802,7 @@ public abstract class ClientXdsClientTestBase {
call.sendResponse(LDS, testListenerVhosts, VERSION_1, "0000");
call.verifyRequest(LDS, LDS_RESOURCE, VERSION_1, "0000", NODE);
verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture());
assertThat(ldsUpdateCaptor.getValue().httpConnectionManager().virtualHosts())
.hasSize(VHOST_SIZE);
verifyGoldenListenerVhosts(ldsUpdateCaptor.getValue());
assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
verifyResourceMetadataAcked(LDS, LDS_RESOURCE, testListenerVhosts, VERSION_1, TIME_INCREMENT);
verifySubscribedResourcesMetadataSizes(1, 0, 0, 0);
@ -727,8 +816,7 @@ public abstract class ClientXdsClientTestBase {
call.sendResponse(LDS, mf.buildWrappedResource(testListenerVhosts), VERSION_1, "0000");
call.verifyRequest(LDS, LDS_RESOURCE, VERSION_1, "0000", NODE);
verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture());
assertThat(ldsUpdateCaptor.getValue().httpConnectionManager().virtualHosts())
.hasSize(VHOST_SIZE);
verifyGoldenListenerVhosts(ldsUpdateCaptor.getValue());
assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
verifyResourceMetadataAcked(LDS, LDS_RESOURCE, testListenerVhosts, VERSION_1, TIME_INCREMENT);
verifySubscribedResourcesMetadataSizes(1, 0, 0, 0);
@ -742,8 +830,7 @@ public abstract class ClientXdsClientTestBase {
// Client sends an ACK LDS request.
call.verifyRequest(LDS, LDS_RESOURCE, VERSION_1, "0000", NODE);
verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture());
assertThat(ldsUpdateCaptor.getValue().httpConnectionManager().rdsName())
.isEqualTo(RDS_RESOURCE);
verifyGoldenListenerRds(ldsUpdateCaptor.getValue());
assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
verifyResourceMetadataAcked(LDS, LDS_RESOURCE, testListenerRds, VERSION_1, TIME_INCREMENT);
verifySubscribedResourcesMetadataSizes(1, 0, 0, 0);
@ -760,8 +847,7 @@ public abstract class ClientXdsClientTestBase {
LdsResourceWatcher watcher = mock(LdsResourceWatcher.class);
xdsClient.watchLdsResource(LDS_RESOURCE, watcher);
verify(watcher).onChanged(ldsUpdateCaptor.capture());
assertThat(ldsUpdateCaptor.getValue().httpConnectionManager().rdsName())
.isEqualTo(RDS_RESOURCE);
verifyGoldenListenerRds(ldsUpdateCaptor.getValue());
call.verifyNoMoreRequest();
verifyResourceMetadataAcked(LDS, LDS_RESOURCE, testListenerRds, VERSION_1, TIME_INCREMENT);
verifySubscribedResourcesMetadataSizes(1, 0, 0, 0);
@ -790,16 +876,14 @@ public abstract class ClientXdsClientTestBase {
call.sendResponse(LDS, testListenerVhosts, VERSION_1, "0000");
call.verifyRequest(LDS, LDS_RESOURCE, VERSION_1, "0000", NODE);
verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture());
assertThat(ldsUpdateCaptor.getValue().httpConnectionManager().virtualHosts())
.hasSize(VHOST_SIZE);
verifyGoldenListenerVhosts(ldsUpdateCaptor.getValue());
verifyResourceMetadataAcked(LDS, LDS_RESOURCE, testListenerVhosts, VERSION_1, TIME_INCREMENT);
// Updated LDS response.
call.sendResponse(LDS, testListenerRds, VERSION_2, "0001");
call.verifyRequest(LDS, LDS_RESOURCE, VERSION_2, "0001", NODE);
verify(ldsResourceWatcher, times(2)).onChanged(ldsUpdateCaptor.capture());
assertThat(ldsUpdateCaptor.getValue().httpConnectionManager().rdsName())
.isEqualTo(RDS_RESOURCE);
verifyGoldenListenerRds(ldsUpdateCaptor.getValue());
verifyResourceMetadataAcked(LDS, LDS_RESOURCE, testListenerRds, VERSION_2, TIME_INCREMENT * 2);
verifySubscribedResourcesMetadataSizes(1, 0, 0, 0);
assertThat(channelForCustomAuthority).isNull();
@ -821,8 +905,7 @@ public abstract class ClientXdsClientTestBase {
call.sendResponse(LDS, testListenerVhosts, VERSION_1, "0000");
call.verifyRequest(LDS, ldsResourceName, VERSION_1, "0000", NODE);
verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture());
assertThat(ldsUpdateCaptor.getValue().httpConnectionManager().virtualHosts())
.hasSize(VHOST_SIZE);
verifyGoldenListenerVhosts(ldsUpdateCaptor.getValue());
verifyResourceMetadataAcked(
LDS, ldsResourceName, testListenerVhosts, VERSION_1, TIME_INCREMENT);
}
@ -842,8 +925,7 @@ public abstract class ClientXdsClientTestBase {
call.sendResponse(LDS, testListenerVhosts, VERSION_1, "0000");
call.verifyRequest(LDS, ldsResourceName, VERSION_1, "0000", NODE);
verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture());
assertThat(ldsUpdateCaptor.getValue().httpConnectionManager().virtualHosts())
.hasSize(VHOST_SIZE);
verifyGoldenListenerVhosts(ldsUpdateCaptor.getValue());
verifyResourceMetadataAcked(
LDS, ldsResourceName, testListenerVhosts, VERSION_1, TIME_INCREMENT);
}
@ -1072,6 +1154,8 @@ public abstract class ClientXdsClientTestBase {
@Test
public void ldsResourceDeleted() {
Assume.assumeFalse(ignoreResourceDeletion());
DiscoveryRpcCall call = startResourceWatcher(LDS, LDS_RESOURCE, ldsResourceWatcher);
verifyResourceMetadataRequested(LDS, LDS_RESOURCE);
@ -1079,8 +1163,7 @@ public abstract class ClientXdsClientTestBase {
call.sendResponse(LDS, testListenerVhosts, VERSION_1, "0000");
call.verifyRequest(LDS, LDS_RESOURCE, VERSION_1, "0000", NODE);
verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture());
assertThat(ldsUpdateCaptor.getValue().httpConnectionManager().virtualHosts())
.hasSize(VHOST_SIZE);
verifyGoldenListenerVhosts(ldsUpdateCaptor.getValue());
verifyResourceMetadataAcked(LDS, LDS_RESOURCE, testListenerVhosts, VERSION_1, TIME_INCREMENT);
verifySubscribedResourcesMetadataSizes(1, 0, 0, 0);
@ -1092,6 +1175,46 @@ public abstract class ClientXdsClientTestBase {
verifySubscribedResourcesMetadataSizes(1, 0, 0, 0);
}
/**
* When ignore_resource_deletion server feature is on, xDS client should keep the deleted listener
* on empty response, and resume the normal work when LDS contains the listener again.
* */
@Test
public void ldsResourceDeleted_ignoreResourceDeletion() {
Assume.assumeTrue(ignoreResourceDeletion());
DiscoveryRpcCall call = startResourceWatcher(LDS, LDS_RESOURCE, ldsResourceWatcher);
verifyResourceMetadataRequested(LDS, LDS_RESOURCE);
// Initial LDS response.
call.sendResponse(LDS, testListenerVhosts, VERSION_1, "0000");
call.verifyRequest(LDS, LDS_RESOURCE, VERSION_1, "0000", NODE);
verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture());
verifyGoldenListenerVhosts(ldsUpdateCaptor.getValue());
verifyResourceMetadataAcked(LDS, LDS_RESOURCE, testListenerVhosts, VERSION_1, TIME_INCREMENT);
verifySubscribedResourcesMetadataSizes(1, 0, 0, 0);
// Empty LDS response does not delete the listener.
call.sendResponse(LDS, Collections.emptyList(), VERSION_2, "0001");
call.verifyRequest(LDS, LDS_RESOURCE, VERSION_2, "0001", NODE);
// The resource is still ACKED at VERSION_1 (no changes).
verifyResourceMetadataAcked(LDS, LDS_RESOURCE, testListenerVhosts, VERSION_1, TIME_INCREMENT);
verifySubscribedResourcesMetadataSizes(1, 0, 0, 0);
// onResourceDoesNotExist not called
verify(ldsResourceWatcher, never()).onResourceDoesNotExist(LDS_RESOURCE);
// Next update is correct, and contains the listener again.
call.sendResponse(LDS, testListenerVhosts, VERSION_3, "0003");
call.verifyRequest(LDS, LDS_RESOURCE, VERSION_3, "0003", NODE);
verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture());
verifyGoldenListenerVhosts(ldsUpdateCaptor.getValue());
// LDS is now ACKEd at VERSION_3.
verifyResourceMetadataAcked(LDS, LDS_RESOURCE, testListenerVhosts, VERSION_3,
TIME_INCREMENT * 3);
verifySubscribedResourcesMetadataSizes(1, 0, 0, 0);
verifyNoMoreInteractions(ldsResourceWatcher);
}
@Test
public void multipleLdsWatchers() {
String ldsResourceTwo = "bar.googleapis.com";
@ -1119,17 +1242,14 @@ public abstract class ClientXdsClientTestBase {
call.sendResponse(LDS, ImmutableList.of(testListenerVhosts, listenerTwo), VERSION_1, "0000");
// ldsResourceWatcher called with listenerVhosts.
verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture());
assertThat(ldsUpdateCaptor.getValue().httpConnectionManager().virtualHosts())
.hasSize(VHOST_SIZE);
verifyGoldenListenerVhosts(ldsUpdateCaptor.getValue());
// watcher1 called with listenerTwo.
verify(watcher1).onChanged(ldsUpdateCaptor.capture());
assertThat(ldsUpdateCaptor.getValue().httpConnectionManager().rdsName())
.isEqualTo(RDS_RESOURCE);
verifyGoldenListenerRds(ldsUpdateCaptor.getValue());
assertThat(ldsUpdateCaptor.getValue().httpConnectionManager().virtualHosts()).isNull();
// watcher2 called with listenerTwo.
verify(watcher2).onChanged(ldsUpdateCaptor.capture());
assertThat(ldsUpdateCaptor.getValue().httpConnectionManager().rdsName())
.isEqualTo(RDS_RESOURCE);
verifyGoldenListenerRds(ldsUpdateCaptor.getValue());
assertThat(ldsUpdateCaptor.getValue().httpConnectionManager().virtualHosts()).isNull();
// Metadata of both listeners is stored.
verifyResourceMetadataAcked(LDS, LDS_RESOURCE, testListenerVhosts, VERSION_1, TIME_INCREMENT);
@ -1265,7 +1385,7 @@ public abstract class ClientXdsClientTestBase {
// Client sends an ACK RDS request.
call.verifyRequest(RDS, RDS_RESOURCE, VERSION_1, "0000", NODE);
verify(rdsResourceWatcher).onChanged(rdsUpdateCaptor.capture());
assertThat(rdsUpdateCaptor.getValue().virtualHosts).hasSize(VHOST_SIZE);
verifyGoldenRouteConfig(rdsUpdateCaptor.getValue());
assertThat(fakeClock.getPendingTasks(RDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
verifyResourceMetadataAcked(RDS, RDS_RESOURCE, testRouteConfig, VERSION_1, TIME_INCREMENT);
verifySubscribedResourcesMetadataSizes(0, 0, 1, 0);
@ -1279,7 +1399,7 @@ public abstract class ClientXdsClientTestBase {
// Client sends an ACK RDS request.
call.verifyRequest(RDS, RDS_RESOURCE, VERSION_1, "0000", NODE);
verify(rdsResourceWatcher).onChanged(rdsUpdateCaptor.capture());
assertThat(rdsUpdateCaptor.getValue().virtualHosts).hasSize(VHOST_SIZE);
verifyGoldenRouteConfig(rdsUpdateCaptor.getValue());
assertThat(fakeClock.getPendingTasks(RDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
verifyResourceMetadataAcked(RDS, RDS_RESOURCE, testRouteConfig, VERSION_1, TIME_INCREMENT);
verifySubscribedResourcesMetadataSizes(0, 0, 1, 0);
@ -1296,7 +1416,7 @@ public abstract class ClientXdsClientTestBase {
RdsResourceWatcher watcher = mock(RdsResourceWatcher.class);
xdsClient.watchRdsResource(RDS_RESOURCE, watcher);
verify(watcher).onChanged(rdsUpdateCaptor.capture());
assertThat(rdsUpdateCaptor.getValue().virtualHosts).hasSize(VHOST_SIZE);
verifyGoldenRouteConfig(rdsUpdateCaptor.getValue());
call.verifyNoMoreRequest();
verifyResourceMetadataAcked(RDS, RDS_RESOURCE, testRouteConfig, VERSION_1, TIME_INCREMENT);
verifySubscribedResourcesMetadataSizes(0, 0, 1, 0);
@ -1325,7 +1445,7 @@ public abstract class ClientXdsClientTestBase {
call.sendResponse(RDS, testRouteConfig, VERSION_1, "0000");
call.verifyRequest(RDS, RDS_RESOURCE, VERSION_1, "0000", NODE);
verify(rdsResourceWatcher).onChanged(rdsUpdateCaptor.capture());
assertThat(rdsUpdateCaptor.getValue().virtualHosts).hasSize(VHOST_SIZE);
verifyGoldenRouteConfig(rdsUpdateCaptor.getValue());
verifyResourceMetadataAcked(RDS, RDS_RESOURCE, testRouteConfig, VERSION_1, TIME_INCREMENT);
// Updated RDS response.
@ -1353,23 +1473,25 @@ public abstract class ClientXdsClientTestBase {
DiscoveryRpcCall call = resourceDiscoveryCalls.poll();
call.sendResponse(LDS, testListenerRds, VERSION_1, "0000");
verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture());
assertThat(ldsUpdateCaptor.getValue().httpConnectionManager().rdsName())
.isEqualTo(RDS_RESOURCE);
verifyGoldenListenerRds(ldsUpdateCaptor.getValue());
verifyResourceMetadataAcked(LDS, LDS_RESOURCE, testListenerRds, VERSION_1, TIME_INCREMENT);
verifyResourceMetadataRequested(RDS, RDS_RESOURCE);
verifySubscribedResourcesMetadataSizes(1, 0, 1, 0);
call.sendResponse(RDS, testRouteConfig, VERSION_1, "0000");
verify(rdsResourceWatcher).onChanged(rdsUpdateCaptor.capture());
assertThat(rdsUpdateCaptor.getValue().virtualHosts).hasSize(VHOST_SIZE);
verifyGoldenRouteConfig(rdsUpdateCaptor.getValue());
verifyResourceMetadataAcked(LDS, LDS_RESOURCE, testListenerRds, VERSION_1, TIME_INCREMENT);
verifyResourceMetadataAcked(RDS, RDS_RESOURCE, testRouteConfig, VERSION_1, TIME_INCREMENT * 2);
verifySubscribedResourcesMetadataSizes(1, 0, 1, 0);
// The Listener is getting replaced configured with an RDS name, to the one configured with
// vhosts. Expect the RDS resources to be discarded.
// Note that this must work the same despite the ignore_resource_deletion feature is on.
// This happens because the Listener is getting replaced, and not deleted.
call.sendResponse(LDS, testListenerVhosts, VERSION_2, "0001");
verify(ldsResourceWatcher, times(2)).onChanged(ldsUpdateCaptor.capture());
assertThat(ldsUpdateCaptor.getValue().httpConnectionManager().virtualHosts())
.hasSize(VHOST_SIZE);
verifyGoldenListenerVhosts(ldsUpdateCaptor.getValue());
verify(rdsResourceWatcher).onResourceDoesNotExist(RDS_RESOURCE);
verifyResourceMetadataDoesNotExist(RDS, RDS_RESOURCE);
verifyResourceMetadataAcked(
@ -1413,11 +1535,13 @@ public abstract class ClientXdsClientTestBase {
// Simulates receiving the requested RDS resource.
call.sendResponse(RDS, testRouteConfig, VERSION_1, "0000");
verify(rdsResourceWatcher).onChanged(rdsUpdateCaptor.capture());
assertThat(rdsUpdateCaptor.getValue().virtualHosts).hasSize(VHOST_SIZE);
verifyGoldenRouteConfig(rdsUpdateCaptor.getValue());
verifyResourceMetadataAcked(RDS, RDS_RESOURCE, testRouteConfig, VERSION_1, TIME_INCREMENT * 2);
// Simulates receiving an updated version of the requested LDS resource as a TCP listener
// with a filter chain containing inlined RouteConfiguration.
// Note that this must work the same despite the ignore_resource_deletion feature is on.
// This happens because the Listener is getting replaced, and not deleted.
hcmFilter = mf.buildHttpConnectionManagerFilter(
null,
mf.buildRouteConfiguration(
@ -1466,7 +1590,7 @@ public abstract class ClientXdsClientTestBase {
call.sendResponse(RDS, testRouteConfig, VERSION_1, "0000");
verify(rdsResourceWatcher).onChanged(rdsUpdateCaptor.capture());
assertThat(rdsUpdateCaptor.getValue().virtualHosts).hasSize(VHOST_SIZE);
verifyGoldenRouteConfig(rdsUpdateCaptor.getValue());
verifyNoMoreInteractions(watcher1, watcher2);
verifyResourceMetadataAcked(RDS, RDS_RESOURCE, testRouteConfig, VERSION_1, TIME_INCREMENT);
verifyResourceMetadataDoesNotExist(RDS, rdsResourceTwo);
@ -1598,7 +1722,12 @@ public abstract class ClientXdsClientTestBase {
verifyResourceMetadataAcked(CDS, "A", resourcesV2.get("A"), VERSION_2, TIME_INCREMENT * 2);
verifyResourceMetadataNacked(CDS, "B", resourcesV1.get("B"), VERSION_1, TIME_INCREMENT,
VERSION_2, TIME_INCREMENT * 2, errorsV2);
verifyResourceMetadataDoesNotExist(CDS, "C");
if (!ignoreResourceDeletion()) {
verifyResourceMetadataDoesNotExist(CDS, "C");
} else {
// When resource deletion is disabled, {C} stays ACKed in the previous version VERSION_1.
verifyResourceMetadataAcked(CDS, "C", resourcesV1.get("C"), VERSION_1, TIME_INCREMENT);
}
call.verifyRequestNack(CDS, subscribedResourceNames, VERSION_1, "0001", NODE, errorsV2);
// CDS -> {B, C} version 3
@ -1612,7 +1741,12 @@ public abstract class ClientXdsClientTestBase {
call.sendResponse(CDS, resourcesV3.values().asList(), VERSION_3, "0002");
// {A} -> does not exit
// {B, C} -> ACK, version 3
verifyResourceMetadataDoesNotExist(CDS, "A");
if (!ignoreResourceDeletion()) {
verifyResourceMetadataDoesNotExist(CDS, "A");
} else {
// When resource deletion is disabled, {A} stays ACKed in the previous version VERSION_2.
verifyResourceMetadataAcked(CDS, "A", resourcesV2.get("A"), VERSION_2, TIME_INCREMENT * 2);
}
verifyResourceMetadataAcked(CDS, "B", resourcesV3.get("B"), VERSION_3, TIME_INCREMENT * 3);
verifyResourceMetadataAcked(CDS, "C", resourcesV3.get("C"), VERSION_3, TIME_INCREMENT * 3);
call.verifyRequest(CDS, subscribedResourceNames, VERSION_3, "0002", NODE);
@ -1684,14 +1818,26 @@ public abstract class ClientXdsClientTestBase {
verifyResourceMetadataNacked(
CDS, "B", resourcesV1.get("B"), VERSION_1, TIME_INCREMENT, VERSION_2, TIME_INCREMENT * 3,
errorsV2);
verifyResourceMetadataDoesNotExist(CDS, "C");
if (!ignoreResourceDeletion()) {
verifyResourceMetadataDoesNotExist(CDS, "C");
} else {
// When resource deletion is disabled, {C} stays ACKed in the previous version VERSION_1.
verifyResourceMetadataAcked(CDS, "C", resourcesV1.get("C"), VERSION_1, TIME_INCREMENT);
}
call.verifyRequestNack(CDS, subscribedResourceNames, VERSION_1, "0001", NODE, errorsV2);
// {A.1} -> does not exist
// {A.1} -> does not exist, missing from {A}
// {B.1} -> version 1
// {C.1} -> does not exist
// {C.1} -> does not exist because {C} does not exist
verifyResourceMetadataDoesNotExist(EDS, "A.1");
verifyResourceMetadataAcked(EDS, "B.1", resourcesV11.get("B.1"), VERSION_1, TIME_INCREMENT * 2);
verifyResourceMetadataDoesNotExist(EDS, "C.1");
if (!ignoreResourceDeletion()) {
verifyResourceMetadataDoesNotExist(EDS, "C.1");
} else {
// When resource deletion is disabled, {C.1} is not deleted when {C} is deleted.
// Verify {C.1} stays in the previous version VERSION_1.
verifyResourceMetadataAcked(EDS, "C.1", resourcesV11.get("C.1"), VERSION_1,
TIME_INCREMENT * 2);
}
}
@Test
@ -1702,18 +1848,7 @@ public abstract class ClientXdsClientTestBase {
// Client sent an ACK CDS request.
call.verifyRequest(CDS, CDS_RESOURCE, VERSION_1, "0000", NODE);
verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture());
CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue();
assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
assertThat(cdsUpdate.edsServiceName()).isNull();
LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig());
assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental");
List<LbConfig> childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList(
JsonUtil.getListOfObjects(lbConfig.getRawConfigValue(), "childPolicy"));
assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin");
assertThat(cdsUpdate.lrsServerInfo()).isNull();
assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
assertThat(cdsUpdate.upstreamTlsContext()).isNull();
verifyGoldenClusterRoundRobin(cdsUpdateCaptor.getValue());
assertThat(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
verifyResourceMetadataAcked(CDS, CDS_RESOURCE, testClusterRoundRobin, VERSION_1,
TIME_INCREMENT);
@ -1728,18 +1863,7 @@ public abstract class ClientXdsClientTestBase {
// Client sent an ACK CDS request.
call.verifyRequest(CDS, CDS_RESOURCE, VERSION_1, "0000", NODE);
verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture());
CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue();
assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
assertThat(cdsUpdate.edsServiceName()).isNull();
LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig());
assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental");
List<LbConfig> childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList(
JsonUtil.getListOfObjects(lbConfig.getRawConfigValue(), "childPolicy"));
assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin");
assertThat(cdsUpdate.lrsServerInfo()).isNull();
assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
assertThat(cdsUpdate.upstreamTlsContext()).isNull();
verifyGoldenClusterRoundRobin(cdsUpdateCaptor.getValue());
assertThat(fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty();
verifyResourceMetadataAcked(CDS, CDS_RESOURCE, testClusterRoundRobin, VERSION_1,
TIME_INCREMENT);
@ -1992,18 +2116,7 @@ public abstract class ClientXdsClientTestBase {
CdsResourceWatcher watcher = mock(CdsResourceWatcher.class);
xdsClient.watchCdsResource(CDS_RESOURCE, watcher);
verify(watcher).onChanged(cdsUpdateCaptor.capture());
CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue();
assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
assertThat(cdsUpdate.edsServiceName()).isNull();
LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig());
assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental");
List<LbConfig> childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList(
JsonUtil.getListOfObjects(lbConfig.getRawConfigValue(), "childPolicy"));
assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin");
assertThat(cdsUpdate.lrsServerInfo()).isNull();
assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
assertThat(cdsUpdate.upstreamTlsContext()).isNull();
verifyGoldenClusterRoundRobin(cdsUpdateCaptor.getValue());
call.verifyNoMoreRequest();
verifyResourceMetadataAcked(CDS, CDS_RESOURCE, testClusterRoundRobin, VERSION_1,
TIME_INCREMENT);
@ -2070,7 +2183,7 @@ public abstract class ClientXdsClientTestBase {
childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList(
JsonUtil.getListOfObjects(lbConfig.getRawConfigValue(), "childPolicy"));
assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin");
assertThat(cdsUpdate.lrsServerInfo()).isEqualTo(lrsServerInfo);
assertThat(cdsUpdate.lrsServerInfo()).isEqualTo(xdsServerInfo);
assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
assertThat(cdsUpdate.upstreamTlsContext()).isNull();
verifyResourceMetadataAcked(CDS, CDS_RESOURCE, clusterEds, VERSION_2, TIME_INCREMENT * 2);
@ -2126,6 +2239,8 @@ public abstract class ClientXdsClientTestBase {
@Test
public void cdsResourceDeleted() {
Assume.assumeFalse(ignoreResourceDeletion());
DiscoveryRpcCall call = startResourceWatcher(CDS, CDS_RESOURCE, cdsResourceWatcher);
verifyResourceMetadataRequested(CDS, CDS_RESOURCE);
@ -2133,18 +2248,7 @@ public abstract class ClientXdsClientTestBase {
call.sendResponse(CDS, testClusterRoundRobin, VERSION_1, "0000");
call.verifyRequest(CDS, CDS_RESOURCE, VERSION_1, "0000", NODE);
verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture());
CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue();
assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE);
assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS);
assertThat(cdsUpdate.edsServiceName()).isNull();
LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig());
assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental");
List<LbConfig> childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList(
JsonUtil.getListOfObjects(lbConfig.getRawConfigValue(), "childPolicy"));
assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin");
assertThat(cdsUpdate.lrsServerInfo()).isNull();
assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
assertThat(cdsUpdate.upstreamTlsContext()).isNull();
verifyGoldenClusterRoundRobin(cdsUpdateCaptor.getValue());
verifyResourceMetadataAcked(CDS, CDS_RESOURCE, testClusterRoundRobin, VERSION_1,
TIME_INCREMENT);
verifySubscribedResourcesMetadataSizes(0, 1, 0, 0);
@ -2157,6 +2261,48 @@ public abstract class ClientXdsClientTestBase {
verifySubscribedResourcesMetadataSizes(0, 1, 0, 0);
}
/**
* When ignore_resource_deletion server feature is on, xDS client should keep the deleted cluster
* on empty response, and resume the normal work when CDS contains the cluster again.
* */
@Test
public void cdsResourceDeleted_ignoreResourceDeletion() {
Assume.assumeTrue(ignoreResourceDeletion());
DiscoveryRpcCall call = startResourceWatcher(CDS, CDS_RESOURCE, cdsResourceWatcher);
verifyResourceMetadataRequested(CDS, CDS_RESOURCE);
// Initial CDS response.
call.sendResponse(CDS, testClusterRoundRobin, VERSION_1, "0000");
call.verifyRequest(CDS, CDS_RESOURCE, VERSION_1, "0000", NODE);
verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture());
verifyGoldenClusterRoundRobin(cdsUpdateCaptor.getValue());
verifyResourceMetadataAcked(CDS, CDS_RESOURCE, testClusterRoundRobin, VERSION_1,
TIME_INCREMENT);
verifySubscribedResourcesMetadataSizes(0, 1, 0, 0);
// Empty LDS response does not delete the cluster.
call.sendResponse(CDS, Collections.emptyList(), VERSION_2, "0001");
call.verifyRequest(CDS, CDS_RESOURCE, VERSION_2, "0001", NODE);
// The resource is still ACKED at VERSION_1 (no changes).
verifyResourceMetadataAcked(CDS, CDS_RESOURCE, testClusterRoundRobin, VERSION_1,
TIME_INCREMENT);
verifySubscribedResourcesMetadataSizes(0, 1, 0, 0);
// onResourceDoesNotExist must not be called.
verify(cdsResourceWatcher, never()).onResourceDoesNotExist(CDS_RESOURCE);
// Next update is correct, and contains the cluster again.
call.sendResponse(CDS, testClusterRoundRobin, VERSION_3, "0003");
call.verifyRequest(CDS, CDS_RESOURCE, VERSION_3, "0003", NODE);
verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture());
verifyGoldenClusterRoundRobin(cdsUpdateCaptor.getValue());
verifyResourceMetadataAcked(CDS, CDS_RESOURCE, testClusterRoundRobin, VERSION_3,
TIME_INCREMENT * 3);
verifySubscribedResourcesMetadataSizes(0, 1, 0, 0);
verifyNoMoreInteractions(cdsResourceWatcher);
}
@Test
public void multipleCdsWatchers() {
String cdsResourceTwo = "cluster-bar.googleapis.com";
@ -2211,7 +2357,7 @@ public abstract class ClientXdsClientTestBase {
childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList(
JsonUtil.getListOfObjects(lbConfig.getRawConfigValue(), "childPolicy"));
assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin");
assertThat(cdsUpdate.lrsServerInfo()).isEqualTo(lrsServerInfo);
assertThat(cdsUpdate.lrsServerInfo()).isEqualTo(xdsServerInfo);
assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
assertThat(cdsUpdate.upstreamTlsContext()).isNull();
verify(watcher2).onChanged(cdsUpdateCaptor.capture());
@ -2224,7 +2370,7 @@ public abstract class ClientXdsClientTestBase {
childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList(
JsonUtil.getListOfObjects(lbConfig.getRawConfigValue(), "childPolicy"));
assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin");
assertThat(cdsUpdate.lrsServerInfo()).isEqualTo(lrsServerInfo);
assertThat(cdsUpdate.lrsServerInfo()).isEqualTo(xdsServerInfo);
assertThat(cdsUpdate.maxConcurrentRequests()).isNull();
assertThat(cdsUpdate.upstreamTlsContext()).isNull();
// Metadata of both clusters is stored.
@ -2369,7 +2515,7 @@ public abstract class ClientXdsClientTestBase {
// Client sent an ACK EDS request.
call.verifyRequest(EDS, EDS_RESOURCE, VERSION_1, "0000", NODE);
verify(edsResourceWatcher).onChanged(edsUpdateCaptor.capture());
validateTestClusterLoadAssigment(edsUpdateCaptor.getValue());
validateGoldenClusterLoadAssignment(edsUpdateCaptor.getValue());
verifyResourceMetadataAcked(EDS, EDS_RESOURCE, testClusterLoadAssignment, VERSION_1,
TIME_INCREMENT);
verifySubscribedResourcesMetadataSizes(0, 0, 0, 1);
@ -2383,7 +2529,7 @@ public abstract class ClientXdsClientTestBase {
// Client sent an ACK EDS request.
call.verifyRequest(EDS, EDS_RESOURCE, VERSION_1, "0000", NODE);
verify(edsResourceWatcher).onChanged(edsUpdateCaptor.capture());
validateTestClusterLoadAssigment(edsUpdateCaptor.getValue());
validateGoldenClusterLoadAssignment(edsUpdateCaptor.getValue());
verifyResourceMetadataAcked(EDS, EDS_RESOURCE, testClusterLoadAssignment, VERSION_1,
TIME_INCREMENT);
verifySubscribedResourcesMetadataSizes(0, 0, 0, 1);
@ -2400,7 +2546,7 @@ public abstract class ClientXdsClientTestBase {
EdsResourceWatcher watcher = mock(EdsResourceWatcher.class);
xdsClient.watchEdsResource(EDS_RESOURCE, watcher);
verify(watcher).onChanged(edsUpdateCaptor.capture());
validateTestClusterLoadAssigment(edsUpdateCaptor.getValue());
validateGoldenClusterLoadAssignment(edsUpdateCaptor.getValue());
call.verifyNoMoreRequest();
verifyResourceMetadataAcked(EDS, EDS_RESOURCE, testClusterLoadAssignment, VERSION_1,
TIME_INCREMENT);
@ -2430,7 +2576,7 @@ public abstract class ClientXdsClientTestBase {
call.verifyRequest(EDS, EDS_RESOURCE, VERSION_1, "0000", NODE);
verify(edsResourceWatcher).onChanged(edsUpdateCaptor.capture());
EdsUpdate edsUpdate = edsUpdateCaptor.getValue();
validateTestClusterLoadAssigment(edsUpdate);
validateGoldenClusterLoadAssignment(edsUpdate);
verifyResourceMetadataAcked(EDS, EDS_RESOURCE, testClusterLoadAssignment, VERSION_1,
TIME_INCREMENT);
@ -2505,7 +2651,7 @@ public abstract class ClientXdsClientTestBase {
verify(cdsWatcher).onChanged(cdsUpdateCaptor.capture());
CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue();
assertThat(cdsUpdate.edsServiceName()).isEqualTo(null);
assertThat(cdsUpdate.lrsServerInfo()).isEqualTo(lrsServerInfo);
assertThat(cdsUpdate.lrsServerInfo()).isEqualTo(xdsServerInfo);
verify(cdsResourceWatcher).onChanged(cdsUpdateCaptor.capture());
cdsUpdate = cdsUpdateCaptor.getValue();
assertThat(cdsUpdate.edsServiceName()).isEqualTo(EDS_RESOURCE);
@ -2553,7 +2699,10 @@ public abstract class ClientXdsClientTestBase {
call.sendResponse(CDS, clusters, VERSION_2, "0001");
verify(cdsResourceWatcher, times(2)).onChanged(cdsUpdateCaptor.capture());
assertThat(cdsUpdateCaptor.getValue().edsServiceName()).isNull();
// Note that the endpoint must be deleted even if the ignore_resource_deletion feature.
// This happens because the cluster CDS_RESOURCE is getting replaced, and not deleted.
verify(edsResourceWatcher).onResourceDoesNotExist(EDS_RESOURCE);
verify(edsResourceWatcher, never()).onResourceDoesNotExist(resource);
verifyNoMoreInteractions(cdsWatcher, edsWatcher);
verifyResourceMetadataDoesNotExist(EDS, EDS_RESOURCE);
verifyResourceMetadataAcked(
@ -2588,7 +2737,7 @@ public abstract class ClientXdsClientTestBase {
call.sendResponse(EDS, testClusterLoadAssignment, VERSION_1, "0000");
verify(edsResourceWatcher).onChanged(edsUpdateCaptor.capture());
EdsUpdate edsUpdate = edsUpdateCaptor.getValue();
validateTestClusterLoadAssigment(edsUpdate);
validateGoldenClusterLoadAssignment(edsUpdate);
verifyNoMoreInteractions(watcher1, watcher2);
verifyResourceMetadataAcked(
EDS, EDS_RESOURCE, testClusterLoadAssignment, VERSION_1, TIME_INCREMENT);
@ -2848,7 +2997,7 @@ public abstract class ClientXdsClientTestBase {
public void reportLoadStatsToServer() {
xdsClient.watchLdsResource(LDS_RESOURCE, ldsResourceWatcher);
String clusterName = "cluster-foo.googleapis.com";
ClusterDropStats dropStats = xdsClient.addClusterDropStats(lrsServerInfo, clusterName, null);
ClusterDropStats dropStats = xdsClient.addClusterDropStats(xdsServerInfo, clusterName, null);
LrsRpcCall lrsCall = loadReportCalls.poll();
lrsCall.verifyNextReportClusters(Collections.<String[]>emptyList()); // initial LRS request

View File

@ -100,16 +100,27 @@ import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.mockito.ArgumentMatcher;
import org.mockito.InOrder;
/**
* Tests for {@link ClientXdsClient} with protocol version v2.
*/
@RunWith(JUnit4.class)
@RunWith(Parameterized.class)
public class ClientXdsClientV2Test extends ClientXdsClientTestBase {
/** Parameterized test cases. */
@Parameters(name = "ignoreResourceDeletion={0}")
public static Iterable<? extends Boolean> data() {
return ImmutableList.of(false, true);
}
@Parameter
public boolean ignoreResourceDeletion;
@Override
protected BindableService createAdsService() {
return new AggregatedDiscoveryServiceImplBase() {
@ -168,6 +179,11 @@ public class ClientXdsClientV2Test extends ClientXdsClientTestBase {
return false;
}
@Override
protected boolean ignoreResourceDeletion() {
return ignoreResourceDeletion;
}
private static class DiscoveryRpcCallV2 extends DiscoveryRpcCall {
StreamObserver<DiscoveryRequest> requestObserver;
StreamObserver<DiscoveryResponse> responseObserver;

View File

@ -108,16 +108,27 @@ import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.mockito.ArgumentMatcher;
import org.mockito.InOrder;
/**
* Tests for {@link ClientXdsClient} with protocol version v3.
*/
@RunWith(JUnit4.class)
@RunWith(Parameterized.class)
public class ClientXdsClientV3Test extends ClientXdsClientTestBase {
/** Parameterized test cases. */
@Parameters(name = "ignoreResourceDeletion={0}")
public static Iterable<? extends Boolean> data() {
return ImmutableList.of(false, true);
}
@Parameter
public boolean ignoreResourceDeletion;
@Override
protected BindableService createAdsService() {
return new AggregatedDiscoveryServiceImplBase() {
@ -176,6 +187,11 @@ public class ClientXdsClientV3Test extends ClientXdsClientTestBase {
return true;
}
@Override
protected boolean ignoreResourceDeletion() {
return ignoreResourceDeletion;
}
private static class DiscoveryRpcCallV3 extends DiscoveryRpcCall {
StreamObserver<DiscoveryRequest> requestObserver;
StreamObserver<DiscoveryResponse> responseObserver;