From 25199e9df9b7f3f0db664883abb103a6abbfe120 Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Wed, 23 Apr 2025 09:18:08 -0700 Subject: [PATCH] xds: XdsDepManager should ignore updates after shutdown This prevents an NPE and subsequent channel panic when trying to build a config without any listener and route (the build is attempted because there are no watchers, so waitingOnResource==false). ``` java.lang.NullPointerException: Cannot invoke "io.grpc.xds.XdsDependencyManager$RdsUpdateSupplier.getRdsUpdate()" because "routeSource" is null at io.grpc.xds.XdsDependencyManager.buildUpdate(XdsDependencyManager.java:295) at io.grpc.xds.XdsDependencyManager.maybePublishConfig(XdsDependencyManager.java:266) at io.grpc.xds.XdsDependencyManager$EdsWatcher.onChanged(XdsDependencyManager.java:899) at io.grpc.xds.XdsDependencyManager$EdsWatcher.onChanged(XdsDependencyManager.java:888) at io.grpc.xds.client.XdsClientImpl$ResourceSubscriber.notifyWatcher(XdsClientImpl.java:929) at io.grpc.xds.client.XdsClientImpl$ResourceSubscriber.lambda$onData$0(XdsClientImpl.java:837) at io.grpc.SynchronizationContext.drain(SynchronizationContext.java:96) ``` I think this fully fixes the problem today, but not tomorrow: subscribeToCluster() is racy as well, but is not yet used. This was noticed when idleTimeout was firing, with some other code calling getState(true) to wake the channel back up. That may have made this panic more visible than it would be otherwise, but that has not been investigated. 
b/412474567 --- .../io/grpc/xds/XdsDependencyManager.java | 16 ++ .../io/grpc/xds/XdsDependencyManagerTest.java | 137 +++++++++++++++++- 2 files changed, 151 insertions(+), 2 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java b/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java index 8cd3119727..d804954ecf 100644 --- a/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java +++ b/xds/src/main/java/io/grpc/xds/XdsDependencyManager.java @@ -199,6 +199,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi for (Map.Entry> watcherEntry : watchers.watchers.entrySet()) { xdsClient.cancelXdsResourceWatch(watchers.resourceType, watcherEntry.getKey(), watcherEntry.getValue()); + watcherEntry.getValue().cancelled = true; } } @@ -591,6 +592,9 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi @Override public void onError(Status error) { checkNotNull(error, "error"); + if (cancelled) { + return; + } // Don't update configuration on error, if we've already received configuration if (!hasDataValue()) { setDataAsStatus(Status.UNAVAILABLE.withDescription( @@ -659,6 +663,9 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi @Override public void onChanged(XdsListenerResource.LdsUpdate update) { checkNotNull(update, "update"); + if (cancelled) { + return; + } HttpConnectionManager httpConnectionManager = update.httpConnectionManager(); List virtualHosts; @@ -787,6 +794,9 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi @Override public void onChanged(RdsUpdate update) { checkNotNull(update, "update"); + if (cancelled) { + return; + } List oldVirtualHosts = hasDataValue() ? 
getData().getValue().virtualHosts : Collections.emptyList(); @@ -815,6 +825,9 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi @Override public void onChanged(XdsClusterResource.CdsUpdate update) { checkNotNull(update, "update"); + if (cancelled) { + return; + } switch (update.clusterType()) { case EDS: setData(update); @@ -895,6 +908,9 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi @Override public void onChanged(XdsEndpointResource.EdsUpdate update) { + if (cancelled) { + return; + } setData(checkNotNull(update, "update")); maybePublishConfig(); } diff --git a/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java b/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java index 2af04a3aed..1f3d8511ec 100644 --- a/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java @@ -41,6 +41,7 @@ import static org.mockito.Mockito.verify; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.Message; import io.envoyproxy.envoy.config.cluster.v3.Cluster; import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; @@ -65,7 +66,7 @@ import io.grpc.xds.XdsClusterResource.CdsUpdate; import io.grpc.xds.XdsConfig.XdsClusterConfig; import io.grpc.xds.XdsEndpointResource.EdsUpdate; import io.grpc.xds.client.CommonBootstrapperTestUtils; -import io.grpc.xds.client.XdsClientImpl; +import io.grpc.xds.client.XdsClient; import io.grpc.xds.client.XdsClientMetricReporter; import io.grpc.xds.client.XdsTransportFactory; import java.io.Closeable; @@ -115,7 +116,7 @@ public class XdsDependencyManagerTest { }); private ManagedChannel channel; - private XdsClientImpl xdsClient; + private XdsClient xdsClient; private XdsDependencyManager xdsDependencyManager; private TestWatcher xdsConfigWatcher; private Server 
xdsServer; @@ -715,6 +716,138 @@ public class XdsDependencyManagerTest { assertThat(status.getDescription()).contains(XdsTestUtils.CLUSTER_NAME); } + @Test + public void ldsUpdateAfterShutdown() { + XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS", + ENDPOINT_HOSTNAME, ENDPOINT_PORT); + + xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, + serverName, serverName, nameResolverArgs, scheduler); + + verify(xdsConfigWatcher, timeout(1000)).onUpdate(any()); + + @SuppressWarnings("unchecked") + XdsClient.ResourceWatcher resourceWatcher = + mock(XdsClient.ResourceWatcher.class); + xdsClient.watchXdsResource( + XdsListenerResource.getInstance(), + serverName, + resourceWatcher, + MoreExecutors.directExecutor()); + verify(resourceWatcher, timeout(5000)).onChanged(any()); + + syncContext.execute(() -> { + // Shutdown before any updates. This will unsubscribe from XdsClient, but only after this + // Runnable returns + xdsDependencyManager.shutdown(); + + XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS2", "CDS", "EDS", + ENDPOINT_HOSTNAME, ENDPOINT_PORT); + verify(resourceWatcher, timeout(5000).times(2)).onChanged(any()); + xdsClient.cancelXdsResourceWatch( + XdsListenerResource.getInstance(), serverName, resourceWatcher); + }); + } + + @Test + public void rdsUpdateAfterShutdown() { + XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS", + ENDPOINT_HOSTNAME, ENDPOINT_PORT); + + xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, + serverName, serverName, nameResolverArgs, scheduler); + + verify(xdsConfigWatcher, timeout(1000)).onUpdate(any()); + + @SuppressWarnings("unchecked") + XdsClient.ResourceWatcher resourceWatcher = + mock(XdsClient.ResourceWatcher.class); + xdsClient.watchXdsResource( + XdsRouteConfigureResource.getInstance(), + "RDS", + resourceWatcher, + MoreExecutors.directExecutor()); + verify(resourceWatcher, 
timeout(5000)).onChanged(any()); + + syncContext.execute(() -> { + // Shutdown before any updates. This will unsubscribe from XdsClient, but only after this + // Runnable returns + xdsDependencyManager.shutdown(); + + XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS2", "EDS", + ENDPOINT_HOSTNAME, ENDPOINT_PORT); + verify(resourceWatcher, timeout(5000).times(2)).onChanged(any()); + xdsClient.cancelXdsResourceWatch( + XdsRouteConfigureResource.getInstance(), serverName, resourceWatcher); + }); + } + + @Test + public void cdsUpdateAfterShutdown() { + XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS", + ENDPOINT_HOSTNAME, ENDPOINT_PORT); + + xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, + serverName, serverName, nameResolverArgs, scheduler); + + verify(xdsConfigWatcher, timeout(1000)).onUpdate(any()); + + @SuppressWarnings("unchecked") + XdsClient.ResourceWatcher resourceWatcher = + mock(XdsClient.ResourceWatcher.class); + xdsClient.watchXdsResource( + XdsClusterResource.getInstance(), + "CDS", + resourceWatcher, + MoreExecutors.directExecutor()); + verify(resourceWatcher, timeout(5000)).onChanged(any()); + + syncContext.execute(() -> { + // Shutdown before any updates. 
This will unsubscribe from XdsClient, but only after this + // Runnable returns + xdsDependencyManager.shutdown(); + + XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS2", + ENDPOINT_HOSTNAME, ENDPOINT_PORT); + verify(resourceWatcher, timeout(5000).times(2)).onChanged(any()); + xdsClient.cancelXdsResourceWatch( + XdsClusterResource.getInstance(), serverName, resourceWatcher); + }); + } + + @Test + public void edsUpdateAfterShutdown() { + XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS", + ENDPOINT_HOSTNAME, ENDPOINT_PORT); + + xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext, + serverName, serverName, nameResolverArgs, scheduler); + + verify(xdsConfigWatcher, timeout(1000)).onUpdate(any()); + + @SuppressWarnings("unchecked") + XdsClient.ResourceWatcher resourceWatcher = + mock(XdsClient.ResourceWatcher.class); + xdsClient.watchXdsResource( + XdsEndpointResource.getInstance(), + "EDS", + resourceWatcher, + MoreExecutors.directExecutor()); + verify(resourceWatcher, timeout(5000)).onChanged(any()); + + syncContext.execute(() -> { + // Shutdown before any updates. This will unsubscribe from XdsClient, but only after this + // Runnable returns + xdsDependencyManager.shutdown(); + + XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS", + ENDPOINT_HOSTNAME + "2", ENDPOINT_PORT); + verify(resourceWatcher, timeout(5000).times(2)).onChanged(any()); + xdsClient.cancelXdsResourceWatch( + XdsEndpointResource.getInstance(), serverName, resourceWatcher); + }); + } + private Listener buildInlineClientListener(String rdsName, String clusterName) { return XdsTestUtils.buildInlineClientListener(rdsName, clusterName, serverName); }