xds: XdsClient should unsubscribe on last resource (#11264)

Otherwise, the server will continue sending updates and if we
re-subscribe to the last resource, the server won't re-send it. Also
completely remove the per-type state, as it could only add confusion.
This commit is contained in:
Jiajing LU 2024-07-30 23:46:01 +08:00 committed by Eric Anderson
parent 752a045f10
commit 19c9b998b1
4 changed files with 57 additions and 6 deletions

View File

@ -152,8 +152,14 @@ final class ControlPlaneClient {
startRpcStream();
}
Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, resourceType);
if (resources != null) {
adsStream.sendDiscoveryRequest(resourceType, resources);
if (resources == null) {
resources = Collections.emptyList();
}
adsStream.sendDiscoveryRequest(resourceType, resources);
if (resources.isEmpty()) {
// The resource type no longer has subscribing resources; clean up references to it
versions.remove(resourceType);
adsStream.respNonces.remove(resourceType);
}
}

View File

@ -281,7 +281,7 @@ public final class XdsClientImpl extends XdsClient implements XdsResponseHandler
@SuppressWarnings("unchecked")
public void run() {
ResourceSubscriber<T> subscriber =
(ResourceSubscriber<T>) resourceSubscribers.get(type).get(resourceName);;
(ResourceSubscriber<T>) resourceSubscribers.get(type).get(resourceName);
subscriber.removeWatcher(watcher);
if (!subscriber.isWatched()) {
subscriber.cancelResourceWatch();

View File

@ -133,6 +133,7 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;
/**
* Tests for {@link XdsClientImpl}.
@ -2757,6 +2758,37 @@ public abstract class GrpcXdsClientImplTestBase {
verifySubscribedResourcesMetadataSizes(0, 0, 0, 1);
}
@Test
public void edsCleanupNonceAfterUnsubscription() {
Assume.assumeFalse(ignoreResourceDeletion());
// Suppose we have an EDS subscription A.1
xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), "A.1", edsResourceWatcher);
DiscoveryRpcCall call = resourceDiscoveryCalls.poll();
assertThat(call).isNotNull();
call.verifyRequest(EDS, "A.1", "", "", NODE);
// EDS -> {A.1}, version 1
List<Message> dropOverloads = ImmutableList.of();
List<Message> endpointsV1 = ImmutableList.of(lbEndpointHealthy);
ImmutableMap<String, Any> resourcesV1 = ImmutableMap.of(
"A.1", Any.pack(mf.buildClusterLoadAssignment("A.1", endpointsV1, dropOverloads)));
call.sendResponse(EDS, resourcesV1.values().asList(), VERSION_1, "0000");
// {A.1} -> ACK, version 1
call.verifyRequest(EDS, "A.1", VERSION_1, "0000", NODE);
verify(edsResourceWatcher, times(1)).onChanged(any());
// trigger an EDS resource unsubscription.
xdsClient.cancelXdsResourceWatch(XdsEndpointResource.getInstance(), "A.1", edsResourceWatcher);
verifySubscribedResourcesMetadataSizes(0, 0, 0, 0);
call.verifyRequest(EDS, Arrays.asList(), VERSION_1, "0000", NODE);
// When re-subscribing, the version and nonce were properly forgotten, so the request is the
// same as the initial request
xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), "A.1", edsResourceWatcher);
call.verifyRequest(EDS, "A.1", "", "", NODE, Mockito.timeout(2000).times(2));
}
@Test
public void edsResponseErrorHandling_allResourcesFailedUnpack() {
DiscoveryRpcCall call = startResourceWatcher(XdsEndpointResource.getInstance(), EDS_RESOURCE,
@ -3787,10 +3819,22 @@ public abstract class GrpcXdsClientImplTestBase {
protected void verifyRequest(
XdsResourceType<?> type, List<String> resources, String versionInfo, String nonce,
Node node) {
Node node, VerificationMode verificationMode) {
throw new UnsupportedOperationException();
}
protected void verifyRequest(
XdsResourceType<?> type, List<String> resources, String versionInfo, String nonce,
Node node) {
verifyRequest(type, resources, versionInfo, nonce, node, Mockito.timeout(2000));
}
protected void verifyRequest(
XdsResourceType<?> type, String resource, String versionInfo, String nonce,
Node node, VerificationMode verificationMode) {
verifyRequest(type, ImmutableList.of(resource), versionInfo, nonce, node, verificationMode);
}
protected void verifyRequest(
XdsResourceType<?> type, String resource, String versionInfo, String nonce, Node node) {
verifyRequest(type, ImmutableList.of(resource), versionInfo, nonce, node);

View File

@ -118,6 +118,7 @@ import org.junit.runners.Parameterized.Parameters;
import org.mockito.ArgumentMatcher;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
/**
* Tests for {@link XdsClientImpl} with protocol version v3.
@ -205,8 +206,8 @@ public class GrpcXdsClientImplV3Test extends GrpcXdsClientImplTestBase {
@Override
protected void verifyRequest(
XdsResourceType<?> type, List<String> resources, String versionInfo, String nonce,
EnvoyProtoData.Node node) {
verify(requestObserver, Mockito.timeout(2000)).onNext(argThat(new DiscoveryRequestMatcher(
EnvoyProtoData.Node node, VerificationMode verificationMode) {
verify(requestObserver, verificationMode).onNext(argThat(new DiscoveryRequestMatcher(
node.toEnvoyProtoNode(), versionInfo, resources, type.typeUrl(), nonce, null, null)));
}