diff --git a/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java b/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java index 8cd3119727..d804954ecf 100644 --- a/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java +++ b/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java @@ -199,6 +199,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi for (Map.Entry> watcherEntry : watchers.watchers.entrySet()) { xdsClient.cancelXdsResourceWatch(watchers.resourceType, watcherEntry.getKey(), watcherEntry.getValue()); + watcherEntry.getValue().cancelled = true; } } @@ -591,6 +592,9 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi @Override public void onError(Status error) { checkNotNull(error, "error"); + if (cancelled) { + return; + } // Don't update configuration on error, if we've already received configuration if (!hasDataValue()) { setDataAsStatus(Status.UNAVAILABLE.withDescription( @@ -659,6 +663,9 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi @Override public void onChanged(XdsListenerResource.LdsUpdate update) { checkNotNull(update, "update"); + if (cancelled) { + return; + } HttpConnectionManager httpConnectionManager = update.httpConnectionManager(); List virtualHosts; @@ -787,6 +794,9 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi @Override public void onChanged(RdsUpdate update) { checkNotNull(update, "update"); + if (cancelled) { + return; + } List oldVirtualHosts = hasDataValue() ? getData().getValue().virtualHosts : Collections.emptyList(); @@ -815,6 +825,9 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi @Override public void onChanged(XdsClusterResource.CdsUpdate update) { checkNotNull(update, "update"); + if (cancelled) { + return; + } switch (update.clusterType()) { case EDS: setData(update); @@ -895,6 +908,9 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi @Override public void onChanged(XdsEndpointResource.EdsUpdate update) { + if (cancelled) { + return; + } setData(checkNotNull(update, "update")); maybePublishConfig(); } diff --git a/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java b/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java index 2af04a3aed..1f3d8511ec 100644 --- a/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java @@ -41,6 +41,7 @@ import static org.mockito.Mockito.verify; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.Message; import io.envoyproxy.envoy.config.cluster.v3.Cluster; import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; @@ -65,7 +66,7 @@ import io.grpc.xds.XdsClusterResource.CdsUpdate; import io.grpc.xds.XdsConfig.XdsClusterConfig; import io.grpc.xds.XdsEndpointResource.EdsUpdate; import io.grpc.xds.client.CommonBootstrapperTestUtils; -import io.grpc.xds.client.XdsClientImpl; +import io.grpc.xds.client.XdsClient; import io.grpc.xds.client.XdsClientMetricReporter; import io.grpc.xds.client.XdsTransportFactory; import java.io.Closeable; @@ -115,7 +116,7 @@ public class XdsDependencyManagerTest { }); private ManagedChannel channel; - private XdsClientImpl xdsClient; + private XdsClient xdsClient; private XdsDependencyManager xdsDependencyManager; private TestWatcher xdsConfigWatcher; private Server xdsServer; @@ -715,6 +716,138 @@ public class XdsDependencyManagerTest { assertThat(status.getDescription()).contains(XdsTestUtils.CLUSTER_NAME); } + @Test + public void ldsUpdateAfterShutdown() { + XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS", + ENDPOINT_HOSTNAME, ENDPOINT_PORT); + + xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, + serverName, serverName, nameResolverArgs, scheduler); + + verify(xdsConfigWatcher, timeout(1000)).onUpdate(any()); + + @SuppressWarnings("unchecked") + XdsClient.ResourceWatcher resourceWatcher = + mock(XdsClient.ResourceWatcher.class); + xdsClient.watchXdsResource( + XdsListenerResource.getInstance(), + serverName, + resourceWatcher, + MoreExecutors.directExecutor()); + verify(resourceWatcher, timeout(5000)).onChanged(any()); + + syncContext.execute(() -> { + // Shutdown before any updates. This will unsubscribe from XdsClient, but only after this + // Runnable returns + xdsDependencyManager.shutdown(); + + XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS2", "CDS", "EDS", + ENDPOINT_HOSTNAME, ENDPOINT_PORT); + verify(resourceWatcher, timeout(5000).times(2)).onChanged(any()); + xdsClient.cancelXdsResourceWatch( + XdsListenerResource.getInstance(), serverName, resourceWatcher); + }); + } + + @Test + public void rdsUpdateAfterShutdown() { + XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS", + ENDPOINT_HOSTNAME, ENDPOINT_PORT); + + xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, + serverName, serverName, nameResolverArgs, scheduler); + + verify(xdsConfigWatcher, timeout(1000)).onUpdate(any()); + + @SuppressWarnings("unchecked") + XdsClient.ResourceWatcher resourceWatcher = + mock(XdsClient.ResourceWatcher.class); + xdsClient.watchXdsResource( + XdsRouteConfigureResource.getInstance(), + "RDS", + resourceWatcher, + MoreExecutors.directExecutor()); + verify(resourceWatcher, timeout(5000)).onChanged(any()); + + syncContext.execute(() -> { + // Shutdown before any updates. This will unsubscribe from XdsClient, but only after this + // Runnable returns + xdsDependencyManager.shutdown(); + + XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS2", "EDS", + ENDPOINT_HOSTNAME, ENDPOINT_PORT); + verify(resourceWatcher, timeout(5000).times(2)).onChanged(any()); + xdsClient.cancelXdsResourceWatch( + XdsRouteConfigureResource.getInstance(), serverName, resourceWatcher); + }); + } + + @Test + public void cdsUpdateAfterShutdown() { + XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS", + ENDPOINT_HOSTNAME, ENDPOINT_PORT); + + xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, + serverName, serverName, nameResolverArgs, scheduler); + + verify(xdsConfigWatcher, timeout(1000)).onUpdate(any()); + + @SuppressWarnings("unchecked") + XdsClient.ResourceWatcher resourceWatcher = + mock(XdsClient.ResourceWatcher.class); + xdsClient.watchXdsResource( + XdsClusterResource.getInstance(), + "CDS", + resourceWatcher, + MoreExecutors.directExecutor()); + verify(resourceWatcher, timeout(5000)).onChanged(any()); + + syncContext.execute(() -> { + // Shutdown before any updates. This will unsubscribe from XdsClient, but only after this + // Runnable returns + xdsDependencyManager.shutdown(); + + XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS2", + ENDPOINT_HOSTNAME, ENDPOINT_PORT); + verify(resourceWatcher, timeout(5000).times(2)).onChanged(any()); + xdsClient.cancelXdsResourceWatch( + XdsClusterResource.getInstance(), serverName, resourceWatcher); + }); + } + + @Test + public void edsUpdateAfterShutdown() { + XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS", + ENDPOINT_HOSTNAME, ENDPOINT_PORT); + + xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, + serverName, serverName, nameResolverArgs, scheduler); + + verify(xdsConfigWatcher, timeout(1000)).onUpdate(any()); + + @SuppressWarnings("unchecked") + XdsClient.ResourceWatcher resourceWatcher = + mock(XdsClient.ResourceWatcher.class); + xdsClient.watchXdsResource( + XdsEndpointResource.getInstance(), + "EDS", + resourceWatcher, + MoreExecutors.directExecutor()); + verify(resourceWatcher, timeout(5000)).onChanged(any()); + + syncContext.execute(() -> { + // Shutdown before any updates. This will unsubscribe from XdsClient, but only after this + // Runnable returns + xdsDependencyManager.shutdown(); + + XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS", + ENDPOINT_HOSTNAME + "2", ENDPOINT_PORT); + verify(resourceWatcher, timeout(5000).times(2)).onChanged(any()); + xdsClient.cancelXdsResourceWatch( + XdsEndpointResource.getInstance(), serverName, resourceWatcher); + }); + } + private Listener buildInlineClientListener(String rdsName, String clusterName) { return XdsTestUtils.buildInlineClientListener(rdsName, clusterName, serverName); }