xds: XdsDepManager should ignore updates after shutdown

This prevents a NPE and subsequent channel panic when trying to build a
config (because there are no watchers, so waitingOnResource==false)
without any listener and route.
```
java.lang.NullPointerException: Cannot invoke "io.grpc.xds.XdsDependencyManager$RdsUpdateSupplier.getRdsUpdate()" because "routeSource" is null
    at io.grpc.xds.XdsDependencyManager.buildUpdate(XdsDependencyManager.java:295)
    at io.grpc.xds.XdsDependencyManager.maybePublishConfig(XdsDependencyManager.java:266)
    at io.grpc.xds.XdsDependencyManager$EdsWatcher.onChanged(XdsDependencyManager.java:899)
    at io.grpc.xds.XdsDependencyManager$EdsWatcher.onChanged(XdsDependencyManager.java:888)
    at io.grpc.xds.client.XdsClientImpl$ResourceSubscriber.notifyWatcher(XdsClientImpl.java:929)
    at io.grpc.xds.client.XdsClientImpl$ResourceSubscriber.lambda$onData$0(XdsClientImpl.java:837)
    at io.grpc.SynchronizationContext.drain(SynchronizationContext.java:96)
```

I think this fully-fixes the problem today, but not tomorrow.
subscribeToCluster() is racy as well, but not yet used.

This was noticed when idleTimeout was firing, with some other code
calling getState(true) to wake the channel back up. That may have made
this panic more visible than it would be otherwise, but that has not
been investigated.

b/412474567
This commit is contained in:
Eric Anderson 2025-04-23 09:18:08 -07:00 committed by GitHub
parent 7952afdd56
commit 25199e9df9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 151 additions and 2 deletions

View File

@ -199,6 +199,7 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
for (Map.Entry<String, XdsWatcherBase<T>> watcherEntry : watchers.watchers.entrySet()) {
xdsClient.cancelXdsResourceWatch(watchers.resourceType, watcherEntry.getKey(),
watcherEntry.getValue());
watcherEntry.getValue().cancelled = true;
}
}
@ -591,6 +592,9 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
@Override
public void onError(Status error) {
checkNotNull(error, "error");
if (cancelled) {
return;
}
// Don't update configuration on error, if we've already received configuration
if (!hasDataValue()) {
setDataAsStatus(Status.UNAVAILABLE.withDescription(
@ -659,6 +663,9 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
@Override
public void onChanged(XdsListenerResource.LdsUpdate update) {
checkNotNull(update, "update");
if (cancelled) {
return;
}
HttpConnectionManager httpConnectionManager = update.httpConnectionManager();
List<VirtualHost> virtualHosts;
@ -787,6 +794,9 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
@Override
public void onChanged(RdsUpdate update) {
checkNotNull(update, "update");
if (cancelled) {
return;
}
List<VirtualHost> oldVirtualHosts = hasDataValue()
? getData().getValue().virtualHosts
: Collections.emptyList();
@ -815,6 +825,9 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
@Override
public void onChanged(XdsClusterResource.CdsUpdate update) {
checkNotNull(update, "update");
if (cancelled) {
return;
}
switch (update.clusterType()) {
case EDS:
setData(update);
@ -895,6 +908,9 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
@Override
public void onChanged(XdsEndpointResource.EdsUpdate update) {
if (cancelled) {
return;
}
setData(checkNotNull(update, "update"));
maybePublishConfig();
}

View File

@ -41,6 +41,7 @@ import static org.mockito.Mockito.verify;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Message;
import io.envoyproxy.envoy.config.cluster.v3.Cluster;
import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;
@ -65,7 +66,7 @@ import io.grpc.xds.XdsClusterResource.CdsUpdate;
import io.grpc.xds.XdsConfig.XdsClusterConfig;
import io.grpc.xds.XdsEndpointResource.EdsUpdate;
import io.grpc.xds.client.CommonBootstrapperTestUtils;
import io.grpc.xds.client.XdsClientImpl;
import io.grpc.xds.client.XdsClient;
import io.grpc.xds.client.XdsClientMetricReporter;
import io.grpc.xds.client.XdsTransportFactory;
import java.io.Closeable;
@ -115,7 +116,7 @@ public class XdsDependencyManagerTest {
});
private ManagedChannel channel;
private XdsClientImpl xdsClient;
private XdsClient xdsClient;
private XdsDependencyManager xdsDependencyManager;
private TestWatcher xdsConfigWatcher;
private Server xdsServer;
@ -715,6 +716,138 @@ public class XdsDependencyManagerTest {
assertThat(status.getDescription()).contains(XdsTestUtils.CLUSTER_NAME);
}
@Test
public void ldsUpdateAfterShutdown() {
XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS",
ENDPOINT_HOSTNAME, ENDPOINT_PORT);
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
serverName, serverName, nameResolverArgs, scheduler);
verify(xdsConfigWatcher, timeout(1000)).onUpdate(any());
@SuppressWarnings("unchecked")
XdsClient.ResourceWatcher<XdsListenerResource.LdsUpdate> resourceWatcher =
mock(XdsClient.ResourceWatcher.class);
xdsClient.watchXdsResource(
XdsListenerResource.getInstance(),
serverName,
resourceWatcher,
MoreExecutors.directExecutor());
verify(resourceWatcher, timeout(5000)).onChanged(any());
syncContext.execute(() -> {
// Shutdown before any updates. This will unsubscribe from XdsClient, but only after this
// Runnable returns
xdsDependencyManager.shutdown();
XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS2", "CDS", "EDS",
ENDPOINT_HOSTNAME, ENDPOINT_PORT);
verify(resourceWatcher, timeout(5000).times(2)).onChanged(any());
xdsClient.cancelXdsResourceWatch(
XdsListenerResource.getInstance(), serverName, resourceWatcher);
});
}
@Test
public void rdsUpdateAfterShutdown() {
XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS",
ENDPOINT_HOSTNAME, ENDPOINT_PORT);
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
serverName, serverName, nameResolverArgs, scheduler);
verify(xdsConfigWatcher, timeout(1000)).onUpdate(any());
@SuppressWarnings("unchecked")
XdsClient.ResourceWatcher<XdsRouteConfigureResource.RdsUpdate> resourceWatcher =
mock(XdsClient.ResourceWatcher.class);
xdsClient.watchXdsResource(
XdsRouteConfigureResource.getInstance(),
"RDS",
resourceWatcher,
MoreExecutors.directExecutor());
verify(resourceWatcher, timeout(5000)).onChanged(any());
syncContext.execute(() -> {
// Shutdown before any updates. This will unsubscribe from XdsClient, but only after this
// Runnable returns
xdsDependencyManager.shutdown();
XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS2", "EDS",
ENDPOINT_HOSTNAME, ENDPOINT_PORT);
verify(resourceWatcher, timeout(5000).times(2)).onChanged(any());
xdsClient.cancelXdsResourceWatch(
XdsRouteConfigureResource.getInstance(), serverName, resourceWatcher);
});
}
@Test
public void cdsUpdateAfterShutdown() {
XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS",
ENDPOINT_HOSTNAME, ENDPOINT_PORT);
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
serverName, serverName, nameResolverArgs, scheduler);
verify(xdsConfigWatcher, timeout(1000)).onUpdate(any());
@SuppressWarnings("unchecked")
XdsClient.ResourceWatcher<XdsClusterResource.CdsUpdate> resourceWatcher =
mock(XdsClient.ResourceWatcher.class);
xdsClient.watchXdsResource(
XdsClusterResource.getInstance(),
"CDS",
resourceWatcher,
MoreExecutors.directExecutor());
verify(resourceWatcher, timeout(5000)).onChanged(any());
syncContext.execute(() -> {
// Shutdown before any updates. This will unsubscribe from XdsClient, but only after this
// Runnable returns
xdsDependencyManager.shutdown();
XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS2",
ENDPOINT_HOSTNAME, ENDPOINT_PORT);
verify(resourceWatcher, timeout(5000).times(2)).onChanged(any());
xdsClient.cancelXdsResourceWatch(
XdsClusterResource.getInstance(), serverName, resourceWatcher);
});
}
@Test
public void edsUpdateAfterShutdown() {
XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS",
ENDPOINT_HOSTNAME, ENDPOINT_PORT);
xdsDependencyManager = new XdsDependencyManager(xdsClient, xdsConfigWatcher, syncContext,
serverName, serverName, nameResolverArgs, scheduler);
verify(xdsConfigWatcher, timeout(1000)).onUpdate(any());
@SuppressWarnings("unchecked")
XdsClient.ResourceWatcher<XdsEndpointResource.EdsUpdate> resourceWatcher =
mock(XdsClient.ResourceWatcher.class);
xdsClient.watchXdsResource(
XdsEndpointResource.getInstance(),
"EDS",
resourceWatcher,
MoreExecutors.directExecutor());
verify(resourceWatcher, timeout(5000)).onChanged(any());
syncContext.execute(() -> {
// Shutdown before any updates. This will unsubscribe from XdsClient, but only after this
// Runnable returns
xdsDependencyManager.shutdown();
XdsTestUtils.setAdsConfig(controlPlaneService, serverName, "RDS", "CDS", "EDS",
ENDPOINT_HOSTNAME + "2", ENDPOINT_PORT);
verify(resourceWatcher, timeout(5000).times(2)).onChanged(any());
xdsClient.cancelXdsResourceWatch(
XdsEndpointResource.getInstance(), serverName, resourceWatcher);
});
}
private Listener buildInlineClientListener(String rdsName, String clusterName) {
return XdsTestUtils.buildInlineClientListener(rdsName, clusterName, serverName);
}