mirror of https://github.com/grpc/grpc-java.git
xds: XdsClient should unsubscribe on last resource (#11264)
Otherwise, the server will continue sending updates and if we re-subscribe to the last resource, the server won't re-send it. Also completely remove the per-type state, as it could only add confusion.
This commit is contained in:
parent
752a045f10
commit
19c9b998b1
|
|
@ -152,8 +152,14 @@ final class ControlPlaneClient {
|
|||
startRpcStream();
|
||||
}
|
||||
Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, resourceType);
|
||||
if (resources != null) {
|
||||
adsStream.sendDiscoveryRequest(resourceType, resources);
|
||||
if (resources == null) {
|
||||
resources = Collections.emptyList();
|
||||
}
|
||||
adsStream.sendDiscoveryRequest(resourceType, resources);
|
||||
if (resources.isEmpty()) {
|
||||
// The resource type no longer has subscribing resources; clean up references to it
|
||||
versions.remove(resourceType);
|
||||
adsStream.respNonces.remove(resourceType);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -281,7 +281,7 @@ public final class XdsClientImpl extends XdsClient implements XdsResponseHandler
|
|||
@SuppressWarnings("unchecked")
|
||||
public void run() {
|
||||
ResourceSubscriber<T> subscriber =
|
||||
(ResourceSubscriber<T>) resourceSubscribers.get(type).get(resourceName);;
|
||||
(ResourceSubscriber<T>) resourceSubscribers.get(type).get(resourceName);
|
||||
subscriber.removeWatcher(watcher);
|
||||
if (!subscriber.isWatched()) {
|
||||
subscriber.cancelResourceWatch();
|
||||
|
|
|
|||
|
|
@ -133,6 +133,7 @@ import org.mockito.invocation.InvocationOnMock;
|
|||
import org.mockito.junit.MockitoJUnit;
|
||||
import org.mockito.junit.MockitoRule;
|
||||
import org.mockito.stubbing.Answer;
|
||||
import org.mockito.verification.VerificationMode;
|
||||
|
||||
/**
|
||||
* Tests for {@link XdsClientImpl}.
|
||||
|
|
@ -2757,6 +2758,37 @@ public abstract class GrpcXdsClientImplTestBase {
|
|||
verifySubscribedResourcesMetadataSizes(0, 0, 0, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void edsCleanupNonceAfterUnsubscription() {
|
||||
Assume.assumeFalse(ignoreResourceDeletion());
|
||||
|
||||
// Suppose we have an EDS subscription A.1
|
||||
xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), "A.1", edsResourceWatcher);
|
||||
DiscoveryRpcCall call = resourceDiscoveryCalls.poll();
|
||||
assertThat(call).isNotNull();
|
||||
call.verifyRequest(EDS, "A.1", "", "", NODE);
|
||||
|
||||
// EDS -> {A.1}, version 1
|
||||
List<Message> dropOverloads = ImmutableList.of();
|
||||
List<Message> endpointsV1 = ImmutableList.of(lbEndpointHealthy);
|
||||
ImmutableMap<String, Any> resourcesV1 = ImmutableMap.of(
|
||||
"A.1", Any.pack(mf.buildClusterLoadAssignment("A.1", endpointsV1, dropOverloads)));
|
||||
call.sendResponse(EDS, resourcesV1.values().asList(), VERSION_1, "0000");
|
||||
// {A.1} -> ACK, version 1
|
||||
call.verifyRequest(EDS, "A.1", VERSION_1, "0000", NODE);
|
||||
verify(edsResourceWatcher, times(1)).onChanged(any());
|
||||
|
||||
// trigger an EDS resource unsubscription.
|
||||
xdsClient.cancelXdsResourceWatch(XdsEndpointResource.getInstance(), "A.1", edsResourceWatcher);
|
||||
verifySubscribedResourcesMetadataSizes(0, 0, 0, 0);
|
||||
call.verifyRequest(EDS, Arrays.asList(), VERSION_1, "0000", NODE);
|
||||
|
||||
// When re-subscribing, the version and nonce were properly forgotten, so the request is the
|
||||
// same as the initial request
|
||||
xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), "A.1", edsResourceWatcher);
|
||||
call.verifyRequest(EDS, "A.1", "", "", NODE, Mockito.timeout(2000).times(2));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void edsResponseErrorHandling_allResourcesFailedUnpack() {
|
||||
DiscoveryRpcCall call = startResourceWatcher(XdsEndpointResource.getInstance(), EDS_RESOURCE,
|
||||
|
|
@ -3787,10 +3819,22 @@ public abstract class GrpcXdsClientImplTestBase {
|
|||
|
||||
protected void verifyRequest(
|
||||
XdsResourceType<?> type, List<String> resources, String versionInfo, String nonce,
|
||||
Node node) {
|
||||
Node node, VerificationMode verificationMode) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
protected void verifyRequest(
|
||||
XdsResourceType<?> type, List<String> resources, String versionInfo, String nonce,
|
||||
Node node) {
|
||||
verifyRequest(type, resources, versionInfo, nonce, node, Mockito.timeout(2000));
|
||||
}
|
||||
|
||||
protected void verifyRequest(
|
||||
XdsResourceType<?> type, String resource, String versionInfo, String nonce,
|
||||
Node node, VerificationMode verificationMode) {
|
||||
verifyRequest(type, ImmutableList.of(resource), versionInfo, nonce, node, verificationMode);
|
||||
}
|
||||
|
||||
protected void verifyRequest(
|
||||
XdsResourceType<?> type, String resource, String versionInfo, String nonce, Node node) {
|
||||
verifyRequest(type, ImmutableList.of(resource), versionInfo, nonce, node);
|
||||
|
|
|
|||
|
|
@ -118,6 +118,7 @@ import org.junit.runners.Parameterized.Parameters;
|
|||
import org.mockito.ArgumentMatcher;
|
||||
import org.mockito.InOrder;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.verification.VerificationMode;
|
||||
|
||||
/**
|
||||
* Tests for {@link XdsClientImpl} with protocol version v3.
|
||||
|
|
@ -205,8 +206,8 @@ public class GrpcXdsClientImplV3Test extends GrpcXdsClientImplTestBase {
|
|||
@Override
|
||||
protected void verifyRequest(
|
||||
XdsResourceType<?> type, List<String> resources, String versionInfo, String nonce,
|
||||
EnvoyProtoData.Node node) {
|
||||
verify(requestObserver, Mockito.timeout(2000)).onNext(argThat(new DiscoveryRequestMatcher(
|
||||
EnvoyProtoData.Node node, VerificationMode verificationMode) {
|
||||
verify(requestObserver, verificationMode).onNext(argThat(new DiscoveryRequestMatcher(
|
||||
node.toEnvoyProtoNode(), versionInfo, resources, type.typeUrl(), nonce, null, null)));
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue