From 86465b3399f38b7ca365cd026739ec6017799c49 Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Tue, 18 May 2021 13:14:37 -0700 Subject: [PATCH] xds: cluster_resolver LB policy should wait until all clusters being resolved before propagating endpoints to child LB policy (#8176) Do not propagate partial endpoint discovery results to the child LB policy of cluster_resolver LB policy. This could avoid premature RPC failures when connections to resolved endpoints fail while there are other unresolved endpoints. Also, endpoints should be attempted in the order of clusters they belong to: endpoints from a lower-priority cluster should not be used before endpoints from a higher-priority cluster are attempted. Most importantly, it should not fallback to use DNS-resolved endpoints before all EDS-resolved endpoints failed. --- .../grpc/xds/ClusterResolverLoadBalancer.java | 47 +++--- .../xds/ClusterResolverLoadBalancerTest.java | 137 ++++++++++-------- 2 files changed, 106 insertions(+), 78 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java index 6e1b17cf7a..e4a848413d 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java @@ -17,11 +17,9 @@ package io.grpc.xds; import static com.google.common.base.Preconditions.checkNotNull; -import static io.grpc.ConnectivityState.CONNECTING; import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; import static io.grpc.xds.XdsLbPolicies.PRIORITY_POLICY_NAME; import static io.grpc.xds.XdsLbPolicies.WEIGHTED_TARGET_POLICY_NAME; -import static io.grpc.xds.XdsSubchannelPickers.BUFFER_PICKER; import com.google.common.annotations.VisibleForTesting; import io.grpc.Attributes; @@ -211,30 +209,35 @@ final class ClusterResolverLoadBalancer extends LoadBalancer { List addresses = new ArrayList<>(); Map priorityChildConfigs = new HashMap<>(); List priorities = new ArrayList<>(); // totally ordered priority list - boolean allResolved = true; + Status endpointNotFound = Status.OK; for (String cluster : clusters) { ClusterState state = clusterStates.get(cluster); - if (!state.resolved) { - allResolved = false; - continue; + // Propagate endpoints to the child LB policy only after all clusters have been resolved. + if (!state.resolved && state.status.isOk()) { + return; } if (state.result != null) { addresses.addAll(state.result.addresses); priorityChildConfigs.putAll(state.result.priorityChildConfigs); priorities.addAll(state.result.priorities); + } else { + endpointNotFound = state.status; } } if (addresses.isEmpty()) { + if (endpointNotFound.isOk()) { + endpointNotFound = Status.UNAVAILABLE.withDescription( + "No usable endpoint from cluster(s): " + clusters); + } else { + endpointNotFound = + Status.UNAVAILABLE.withCause(endpointNotFound.getCause()) + .withDescription(endpointNotFound.getDescription()); + } + helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(endpointNotFound)); if (childLb != null) { childLb.shutdown(); childLb = null; } - if (allResolved) { - Status unavailable = Status.UNAVAILABLE.withDescription("No usable endpoint"); - helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(unavailable)); - } else { - helper.updateBalancingState(CONNECTING, BUFFER_PICKER); - } return; } PriorityLbConfig childConfig = @@ -252,14 +255,15 @@ final class ClusterResolverLoadBalancer extends LoadBalancer { private void handleEndpointResolutionError() { boolean allInError = true; + Status error = null; for (ClusterState state : clusterStates.values()) { if (state.status.isOk()) { allInError = false; + } else { + error = state.status; } } if (allInError) { - // Propagate the error status of the last cluster. This is the best we can do. - Status error = clusterStates.get(clusters.get(clusters.size() - 1)).status; if (childLb != null) { childLb.handleNameResolutionError(error); } else { @@ -581,10 +585,17 @@ final class ClusterResolverLoadBalancer extends LoadBalancer { return; } status = error; - // NameResolver.Listener API cannot distinguish transient errors, we should avoid - // waiting for DNS addresses indefinitely. - resolved = true; - handleEndpointResolutionError(); + // NameResolver.Listener API cannot distinguish between address-not-found and + // transient errors. If the error occurs in the first resolution, treat it as + // address not found. Otherwise, either there is previously resolved addresses + // previously encountered error, propagate the error to downstream/upstream and + // let downstream/upstream handle it. + if (!resolved) { + resolved = true; + handleEndpointResourceUpdate(); + } else { + handleEndpointResolutionError(); + } if (scheduledRefresh != null && scheduledRefresh.isPending()) { return; } diff --git a/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java index 294a33b0bf..7026cad6a4 100644 --- a/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java @@ -290,42 +290,53 @@ public class ClusterResolverLoadBalancerTest { String priority2 = CLUSTER2 + "[priority2]"; String priority3 = CLUSTER1 + "[priority1]"; - // First deliver CLUSTER2's endpoints, two priorities with each has one locality. + // CLUSTER2: locality1 with priority 1 and locality3 with priority 2. xdsClient.deliverClusterLoadAssignment( EDS_SERVICE_NAME2, ImmutableMap.of(locality1, localityLbEndpoints1, locality3, localityLbEndpoints3)); + assertThat(childBalancers).isEmpty(); // not created until all clusters resolved + + // CLUSTER1: locality2 with priority 1. + xdsClient.deliverClusterLoadAssignment( + EDS_SERVICE_NAME1, Collections.singletonMap(locality2, localityLbEndpoints2)); + + // Endpoints of all clusters have been resolved. assertThat(childBalancers).hasSize(1); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); assertThat(childBalancer.name).isEqualTo(PRIORITY_POLICY_NAME); PriorityLbConfig priorityLbConfig = (PriorityLbConfig) childBalancer.config; - assertThat(priorityLbConfig.priorities).containsExactly(priority1, priority2).inOrder(); - PriorityChildConfig priorityChildConfig = priorityLbConfig.childConfigs.get(priority1); - assertThat(priorityChildConfig.ignoreReresolution).isTrue(); - assertThat(priorityChildConfig.policySelection.getProvider().getPolicyName()) - .isEqualTo(CLUSTER_IMPL_POLICY_NAME); - ClusterImplConfig clusterImplConfig = - (ClusterImplConfig) priorityChildConfig.policySelection.getConfig(); - assertClusterImplConfig(clusterImplConfig, CLUSTER2, EDS_SERVICE_NAME2, LRS_SERVER_NAME, 200L, - tlsContext, Collections.emptyList(), WEIGHTED_TARGET_POLICY_NAME); - WeightedTargetConfig weightedTargetConfig = - (WeightedTargetConfig) clusterImplConfig.childPolicy.getConfig(); - assertThat(weightedTargetConfig.targets.keySet()).containsExactly(locality1.toString()); - WeightedPolicySelection target = weightedTargetConfig.targets.get(locality1.toString()); - assertThat(target.weight).isEqualTo(70); - assertThat(target.policySelection.getProvider().getPolicyName()).isEqualTo("round_robin"); + assertThat(priorityLbConfig.priorities) + .containsExactly(priority3, priority1, priority2).inOrder(); - priorityChildConfig = priorityLbConfig.childConfigs.get(priority2); - assertThat(priorityChildConfig.ignoreReresolution).isTrue(); - assertThat(priorityChildConfig.policySelection.getProvider().getPolicyName()) + PriorityChildConfig priorityChildConfig1 = priorityLbConfig.childConfigs.get(priority1); + assertThat(priorityChildConfig1.ignoreReresolution).isTrue(); + assertThat(priorityChildConfig1.policySelection.getProvider().getPolicyName()) .isEqualTo(CLUSTER_IMPL_POLICY_NAME); - clusterImplConfig = (ClusterImplConfig) priorityChildConfig.policySelection.getConfig(); - assertClusterImplConfig(clusterImplConfig, CLUSTER2, EDS_SERVICE_NAME2, LRS_SERVER_NAME, 200L, + ClusterImplConfig clusterImplConfig1 = + (ClusterImplConfig) priorityChildConfig1.policySelection.getConfig(); + assertClusterImplConfig(clusterImplConfig1, CLUSTER2, EDS_SERVICE_NAME2, LRS_SERVER_NAME, 200L, tlsContext, Collections.emptyList(), WEIGHTED_TARGET_POLICY_NAME); - weightedTargetConfig = (WeightedTargetConfig) clusterImplConfig.childPolicy.getConfig(); - assertThat(weightedTargetConfig.targets.keySet()).containsExactly(locality3.toString()); - target = weightedTargetConfig.targets.get(locality3.toString()); - assertThat(target.weight).isEqualTo(20); - assertThat(target.policySelection.getProvider().getPolicyName()).isEqualTo("round_robin"); + WeightedTargetConfig weightedTargetConfig1 = + (WeightedTargetConfig) clusterImplConfig1.childPolicy.getConfig(); + assertThat(weightedTargetConfig1.targets.keySet()).containsExactly(locality1.toString()); + WeightedPolicySelection target1 = weightedTargetConfig1.targets.get(locality1.toString()); + assertThat(target1.weight).isEqualTo(70); + assertThat(target1.policySelection.getProvider().getPolicyName()).isEqualTo("round_robin"); + + PriorityChildConfig priorityChildConfig2 = priorityLbConfig.childConfigs.get(priority2); + assertThat(priorityChildConfig2.ignoreReresolution).isTrue(); + assertThat(priorityChildConfig2.policySelection.getProvider().getPolicyName()) + .isEqualTo(CLUSTER_IMPL_POLICY_NAME); + ClusterImplConfig clusterImplConfig2 = + (ClusterImplConfig) priorityChildConfig2.policySelection.getConfig(); + assertClusterImplConfig(clusterImplConfig2, CLUSTER2, EDS_SERVICE_NAME2, LRS_SERVER_NAME, 200L, + tlsContext, Collections.emptyList(), WEIGHTED_TARGET_POLICY_NAME); + WeightedTargetConfig weightedTargetConfig2 = + (WeightedTargetConfig) clusterImplConfig2.childPolicy.getConfig(); + assertThat(weightedTargetConfig2.targets.keySet()).containsExactly(locality3.toString()); + WeightedPolicySelection target2 = weightedTargetConfig2.targets.get(locality3.toString()); + assertThat(target2.weight).isEqualTo(20); + assertThat(target2.policySelection.getProvider().getPolicyName()).isEqualTo("round_robin"); List priorityAddrs1 = AddressFilter.filter(childBalancer.addresses, priority1); assertThat(priorityAddrs1).hasSize(2); @@ -335,26 +346,20 @@ public class ClusterResolverLoadBalancerTest { assertThat(priorityAddrs2).hasSize(1); assertAddressesEqual(Collections.singletonList(endpoint4), priorityAddrs2); - // Then deliver CLUSTER1's endpoints, one priority with one locality. - xdsClient.deliverClusterLoadAssignment( - EDS_SERVICE_NAME1, Collections.singletonMap(locality2, localityLbEndpoints2)); - - priorityLbConfig = (PriorityLbConfig) childBalancer.config; - assertThat(priorityLbConfig.priorities) - .containsExactly(priority3, priority1, priority2).inOrder(); - - priorityChildConfig = priorityLbConfig.childConfigs.get(priority3); - assertThat(priorityChildConfig.ignoreReresolution).isTrue(); - assertThat(priorityChildConfig.policySelection.getProvider().getPolicyName()) + PriorityChildConfig priorityChildConfig3 = priorityLbConfig.childConfigs.get(priority3); + assertThat(priorityChildConfig3.ignoreReresolution).isTrue(); + assertThat(priorityChildConfig3.policySelection.getProvider().getPolicyName()) .isEqualTo(CLUSTER_IMPL_POLICY_NAME); - clusterImplConfig = (ClusterImplConfig) priorityChildConfig.policySelection.getConfig(); - assertClusterImplConfig(clusterImplConfig, CLUSTER1, EDS_SERVICE_NAME1, LRS_SERVER_NAME, 100L, + ClusterImplConfig clusterImplConfig3 = + (ClusterImplConfig) priorityChildConfig3.policySelection.getConfig(); + assertClusterImplConfig(clusterImplConfig3, CLUSTER1, EDS_SERVICE_NAME1, LRS_SERVER_NAME, 100L, tlsContext, Collections.emptyList(), WEIGHTED_TARGET_POLICY_NAME); - weightedTargetConfig = (WeightedTargetConfig) clusterImplConfig.childPolicy.getConfig(); - assertThat(weightedTargetConfig.targets.keySet()).containsExactly(locality2.toString()); - target = weightedTargetConfig.targets.get(locality2.toString()); - assertThat(target.weight).isEqualTo(10); - assertThat(target.policySelection.getProvider().getPolicyName()).isEqualTo("round_robin"); + WeightedTargetConfig weightedTargetConfig3 = + (WeightedTargetConfig) clusterImplConfig3.childPolicy.getConfig(); + assertThat(weightedTargetConfig3.targets.keySet()).containsExactly(locality2.toString()); + WeightedPolicySelection target3 = weightedTargetConfig3.targets.get(locality2.toString()); + assertThat(target3.weight).isEqualTo(10); + assertThat(target3.policySelection.getProvider().getPolicyName()).isEqualTo("round_robin"); List priorityAddrs3 = AddressFilter.filter(childBalancer.addresses, priority3); assertThat(priorityAddrs3).hasSize(1); @@ -370,16 +375,17 @@ public class ClusterResolverLoadBalancerTest { assertThat(childBalancers).isEmpty(); reset(helper); xdsClient.deliverResourceNotFound(EDS_SERVICE_NAME1); - verify(helper).updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture()); - PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); - assertThat(result.getStatus().isOk()).isTrue(); - assertThat(result.getSubchannel()).isNull(); // buffer picker expected + verify(helper, never()).updateBalancingState( + any(ConnectivityState.class), any(SubchannelPicker.class)); // wait for CLUSTER2's results xdsClient.deliverResourceNotFound(EDS_SERVICE_NAME2); verify(helper).updateBalancingState( eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); - Status expectedError = Status.UNAVAILABLE.withDescription("No usable endpoint"); - assertPicker(pickerCaptor.getValue(), expectedError, null); + assertPicker( + pickerCaptor.getValue(), + Status.UNAVAILABLE.withDescription( + "No usable endpoint from cluster(s): " + Arrays.asList(CLUSTER1, CLUSTER2)), + null); } @Test @@ -413,7 +419,8 @@ public class ClusterResolverLoadBalancerTest { xdsClient.deliverResourceNotFound(EDS_SERVICE_NAME1); verify(helper).updateBalancingState( eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); - Status expectedError = Status.UNAVAILABLE.withDescription("No usable endpoint"); + Status expectedError = Status.UNAVAILABLE.withDescription( + "No usable endpoint from cluster(s): " + Arrays.asList(CLUSTER1, CLUSTER2)); assertPicker(pickerCaptor.getValue(), expectedError, null); } @@ -507,8 +514,11 @@ public class ClusterResolverLoadBalancerTest { assertThat(childBalancers).isEmpty(); verify(helper).updateBalancingState( eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); - assertPicker(pickerCaptor.getValue(), - Status.UNAVAILABLE.withDescription("No usable endpoint"), null); + assertPicker( + pickerCaptor.getValue(), + Status.UNAVAILABLE.withDescription( + "No usable endpoint from cluster(s): " + Collections.singleton(CLUSTER1)), + null); } @Test @@ -691,10 +701,8 @@ public class ClusterResolverLoadBalancerTest { assertThat(childBalancers).isEmpty(); reset(helper); xdsClient.deliverResourceNotFound(EDS_SERVICE_NAME1); - verify(helper).updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture()); - PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); - assertThat(result.getStatus().isOk()).isTrue(); - assertThat(result.getSubchannel()).isNull(); // buffer picker expected, waiting for DNS + verify(helper, never()).updateBalancingState( + any(ConnectivityState.class), any(SubchannelPicker.class)); // wait for DNS results FakeNameResolver resolver = Iterables.getOnlyElement(resolvers); EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1"); @@ -737,7 +745,7 @@ public class ClusterResolverLoadBalancerTest { verify(helper).updateBalancingState( eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); assertPicker(pickerCaptor.getValue(), - Status.UNAVAILABLE.withDescription("No usable endpoint"), null); + Status.UNAVAILABLE.withDescription("I am lost"), null); } @Test @@ -756,10 +764,16 @@ public class ClusterResolverLoadBalancerTest { 10 /* localityWeight */, 1 /* priority */); xdsClient.deliverClusterLoadAssignment( EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints)); - FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); // child LB created + assertThat(childBalancers).isEmpty(); // not created until all clusters resolved. FakeNameResolver resolver = Iterables.getOnlyElement(resolvers); resolver.deliverError(Status.UNKNOWN.withDescription("I am lost")); + + // DNS resolution failed, but there are EDS endpoints can be used. + assertThat(childBalancers).hasSize(1); + FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); // child LB created assertThat(childBalancer.upstreamError).isNull(); // should not propagate error to child LB + assertAddressesEqual(Collections.singletonList(endpoint), childBalancer.addresses); + xdsClient.deliverError(Status.RESOURCE_EXHAUSTED.withDescription("out of memory")); assertThat(childBalancer.upstreamError).isNotNull(); // last cluster's (DNS) error propagated assertThat(childBalancer.upstreamError.getCode()).isEqualTo(Code.UNKNOWN); @@ -787,7 +801,10 @@ public class ClusterResolverLoadBalancerTest { resolver.deliverError(dnsError); verify(helper).updateBalancingState( eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); - assertPicker(pickerCaptor.getValue(), dnsError, null); + assertPicker( + pickerCaptor.getValue(), + Status.UNAVAILABLE.withDescription(dnsError.getDescription()), + null); } @Test