mirror of https://github.com/grpc/grpc-java.git
xds: have cluster balancer cancel cluster watcher only during shutdown
Previously when CdsConfig is changed, the old cluster watcher is canceled immediately even it's in graceful switch period, so the old cluster balancer won't receive any new updates. This behavior is not as good/clean as cancelling the old watch only once the old cluster balancer is shutdown.
This commit is contained in:
parent
5555ec9a35
commit
cd049ed48b
|
|
@ -59,12 +59,9 @@ public final class CdsLoadBalancer extends LoadBalancer {
|
|||
|
||||
// The following fields become non-null once handleResolvedAddresses() successfully.
|
||||
|
||||
// Most recent CdsConfig.
|
||||
// Most recent cluster name.
|
||||
@Nullable
|
||||
private CdsConfig cdsConfig;
|
||||
// Most recent ClusterWatcher.
|
||||
@Nullable
|
||||
private ClusterWatcher clusterWatcher;
|
||||
private String clusterName;
|
||||
@Nullable
|
||||
private ObjectPool<XdsClient> xdsClientPool;
|
||||
@Nullable
|
||||
|
|
@ -110,29 +107,19 @@ public final class CdsLoadBalancer extends LoadBalancer {
|
|||
}
|
||||
CdsConfig newCdsConfig = (CdsConfig) lbConfig;
|
||||
|
||||
// If CdsConfig is changed, do a graceful switch.
|
||||
if (!newCdsConfig.equals(cdsConfig)) {
|
||||
LoadBalancer.Factory fixedCdsConfigBalancerFactory =
|
||||
new FixedCdsConfigBalancerFactory(newCdsConfig);
|
||||
switchingLoadBalancer.switchTo(fixedCdsConfigBalancerFactory);
|
||||
// If cluster is changed, do a graceful switch.
|
||||
if (!newCdsConfig.name.equals(clusterName)) {
|
||||
LoadBalancer.Factory clusterBalancerFactory = new ClusterBalancerFactory(newCdsConfig.name);
|
||||
switchingLoadBalancer.switchTo(clusterBalancerFactory);
|
||||
}
|
||||
|
||||
switchingLoadBalancer.handleResolvedAddresses(resolvedAddresses);
|
||||
|
||||
// The clusterWatcher is also updated after switchingLoadBalancer.handleResolvedAddresses().
|
||||
cdsConfig = newCdsConfig;
|
||||
clusterName = newCdsConfig.name;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleNameResolutionError(Status error) {
|
||||
channelLogger.log(ChannelLogLevel.ERROR, "Name resolution error: {0}", error);
|
||||
// Go into TRANSIENT_FAILURE if we have not yet received any cluster resource. Otherwise,
|
||||
// we keep running with the data we had previously.
|
||||
if (clusterWatcher == null) {
|
||||
helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(error));
|
||||
} else {
|
||||
switchingLoadBalancer.handleNameResolutionError(error);
|
||||
}
|
||||
switchingLoadBalancer.handleNameResolutionError(error);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -151,32 +138,28 @@ public final class CdsLoadBalancer extends LoadBalancer {
|
|||
}
|
||||
|
||||
/**
|
||||
* A load balancer factory that provides a load balancer for a given CdsConfig.
|
||||
* A load balancer factory that provides a load balancer for a given cluster.
|
||||
*/
|
||||
private final class FixedCdsConfigBalancerFactory extends LoadBalancer.Factory {
|
||||
private final class ClusterBalancerFactory extends LoadBalancer.Factory {
|
||||
|
||||
final CdsConfig cdsConfig;
|
||||
final CdsConfig oldCdsConfig;
|
||||
final ClusterWatcher oldClusterWatcher;
|
||||
final String clusterName;
|
||||
|
||||
FixedCdsConfigBalancerFactory(CdsConfig cdsConfig) {
|
||||
this.cdsConfig = cdsConfig;
|
||||
oldCdsConfig = CdsLoadBalancer.this.cdsConfig;
|
||||
oldClusterWatcher = CdsLoadBalancer.this.clusterWatcher;
|
||||
ClusterBalancerFactory(String clusterName) {
|
||||
this.clusterName = clusterName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (!(o instanceof FixedCdsConfigBalancerFactory)) {
|
||||
if (!(o instanceof ClusterBalancerFactory)) {
|
||||
return false;
|
||||
}
|
||||
FixedCdsConfigBalancerFactory that = (FixedCdsConfigBalancerFactory) o;
|
||||
return cdsConfig.equals(that.cdsConfig);
|
||||
ClusterBalancerFactory that = (ClusterBalancerFactory) o;
|
||||
return clusterName.equals(that.clusterName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(super.hashCode(), cdsConfig);
|
||||
return Objects.hash(super.hashCode(), clusterName);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -207,7 +190,7 @@ public final class CdsLoadBalancer extends LoadBalancer {
|
|||
if (clusterWatcher.edsBalancer != null) {
|
||||
clusterWatcher.edsBalancer.shutdown();
|
||||
}
|
||||
xdsClient.cancelClusterDataWatch(cdsConfig.name, clusterWatcher);
|
||||
xdsClient.cancelClusterDataWatch(clusterName, clusterWatcher);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -215,11 +198,7 @@ public final class CdsLoadBalancer extends LoadBalancer {
|
|||
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
|
||||
if (clusterWatcher == null) {
|
||||
clusterWatcher = new ClusterWatcherImpl(helper, resolvedAddresses);
|
||||
xdsClient.watchClusterData(cdsConfig.name, clusterWatcher);
|
||||
if (oldCdsConfig != null) {
|
||||
xdsClient.cancelClusterDataWatch(oldCdsConfig.name, oldClusterWatcher);
|
||||
}
|
||||
CdsLoadBalancer.this.clusterWatcher = clusterWatcher;
|
||||
xdsClient.watchClusterData(clusterName, clusterWatcher);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
|
|
|||
|
|
@ -264,7 +264,6 @@ public class CdsLoadBalancerTest {
|
|||
|
||||
ArgumentCaptor<ClusterWatcher> clusterWatcherCaptor2 = ArgumentCaptor.forClass(null);
|
||||
verify(xdsClient).watchClusterData(eq("bar.googleapis.com"), clusterWatcherCaptor2.capture());
|
||||
verify(xdsClient).cancelClusterDataWatch("foo.googleapis.com", clusterWatcher1);
|
||||
|
||||
ClusterWatcher clusterWatcher2 = clusterWatcherCaptor2.getValue();
|
||||
clusterWatcher2.onClusterChanged(
|
||||
|
|
@ -301,6 +300,7 @@ public class CdsLoadBalancerTest {
|
|||
edsLbHelper2.updateBalancingState(ConnectivityState.READY, picker2);
|
||||
verify(helper).updateBalancingState(ConnectivityState.READY, picker2);
|
||||
verify(edsLoadBalancer1).shutdown();
|
||||
verify(xdsClient).cancelClusterDataWatch("foo.googleapis.com", clusterWatcher1);
|
||||
|
||||
clusterWatcher2.onClusterChanged(
|
||||
ClusterUpdate.newBuilder()
|
||||
|
|
|
|||
Loading…
Reference in New Issue