diff --git a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java
index 260b45efea..05f29e2011 100644
--- a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java
+++ b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java
@@ -191,9 +191,59 @@ final class RingHashLoadBalancer extends LoadBalancer {
subchannels.clear();
}
+ /**
+ * Updates the overall balancing state by aggregating the connectivity states of all subchannels.
+ *
+ *
Aggregation rules (in order of dominance):
+ *
+ * - If there is at least one subchannel in READY state, overall state is READY
+ * - If there are 2 or more subchannels in TRANSIENT_FAILURE, overall state is
+ * TRANSIENT_FAILURE
+ * - If there is at least one subchannel in CONNECTING state, overall state is
+ * CONNECTING
+ * - If there is at least one subchannel in IDLE state, overall state is IDLE
+ * - Otherwise, overall state is TRANSIENT_FAILURE
+ *
+ */
private void updateBalancingState() {
checkState(!subchannels.isEmpty(), "no subchannel has been created");
- ConnectivityState overallState = aggregateState(subchannels.values());
+ int failureCount = 0;
+ boolean hasConnecting = false;
+ Subchannel idleSubchannel = null;
+ ConnectivityState overallState = null;
+ for (Subchannel subchannel : subchannels.values()) {
+ ConnectivityState state = getSubchannelStateInfoRef(subchannel).value.getState();
+ if (state == READY) {
+ overallState = READY;
+ break;
+ }
+ if (state == TRANSIENT_FAILURE) {
+ failureCount++;
+ } else if (state == CONNECTING) {
+ hasConnecting = true;
+ } else if (state == IDLE) {
+ if (idleSubchannel == null) {
+ idleSubchannel = subchannel;
+ }
+ }
+ }
+ if (overallState == null) {
+ if (failureCount >= 2) {
+ // This load balancer may not get any pick requests from the upstream if it's reporting
+ // TRANSIENT_FAILURE. It needs to recover by itself by attempting to connect to at least
+ // one subchannel that has not failed at any given time.
+ if (!hasConnecting && idleSubchannel != null) {
+ idleSubchannel.requestConnection();
+ }
+ overallState = TRANSIENT_FAILURE;
+ } else if (hasConnecting) {
+ overallState = CONNECTING;
+ } else if (idleSubchannel != null) {
+ overallState = IDLE;
+ } else {
+ overallState = TRANSIENT_FAILURE;
+ }
+ }
RingHashPicker picker = new RingHashPicker(syncContext, ring, subchannels);
// TODO(chengyuanzhang): avoid unnecessary reprocess caused by duplicated server addr updates
helper.updateBalancingState(overallState, picker);
@@ -221,46 +271,6 @@ final class RingHashLoadBalancer extends LoadBalancer {
updateBalancingState();
}
- /**
- * Aggregates the connectivity states of a group of subchannels for overall connectivity state.
- *
- * Aggregation rules (in order of dominance):
- *
- * - If there is at least one subchannel in READY state, overall state is READY
- * - If there are 2 or more subchannels in TRANSIENT_FAILURE, overall state is
- * TRANSIENT_FAILURE
- * - If there is at least one subchannel in CONNECTING state, overall state is
- * CONNECTING
- * - If there is at least one subchannel in IDLE state, overall state is IDLE
- * - Otherwise, overall state is TRANSIENT_FAILURE
- *
- */
- private static ConnectivityState aggregateState(Iterable subchannels) {
- int failureCount = 0;
- boolean hasIdle = false;
- boolean hasConnecting = false;
- for (Subchannel subchannel : subchannels) {
- ConnectivityState state = getSubchannelStateInfoRef(subchannel).value.getState();
- if (state == READY) {
- return state;
- }
- if (state == TRANSIENT_FAILURE) {
- failureCount++;
- } else if (state == CONNECTING) {
- hasConnecting = true;
- } else if (state == IDLE) {
- hasIdle = true;
- }
- }
- if (failureCount >= 2) {
- return TRANSIENT_FAILURE;
- }
- if (hasConnecting) {
- return CONNECTING;
- }
- return hasIdle ? IDLE : TRANSIENT_FAILURE;
- }
-
private static void shutdownSubchannel(Subchannel subchannel) {
subchannel.shutdown();
getSubchannelStateInfoRef(subchannel).value = ConnectivityStateInfo.forNonError(SHUTDOWN);
diff --git a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java
index eaa2160c54..6b4fd2a215 100644
--- a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java
+++ b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java
@@ -525,13 +525,15 @@ public class RingHashLoadBalancerTest {
ConnectivityStateInfo.forTransientFailure(
Status.PERMISSION_DENIED.withDescription("permission denied")));
verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
+ verify(subchannels.get(Collections.singletonList(servers.get(3))))
+ .requestConnection(); // LB attempts to recover by itself
PickResult result = pickerCaptor.getValue().pickSubchannel(args);
assertThat(result.getStatus().isOk()).isFalse(); // fail the RPC
assertThat(result.getStatus().getCode())
.isEqualTo(Code.UNAVAILABLE); // with error status for the original server hit by hash
assertThat(result.getStatus().getDescription()).isEqualTo("unreachable");
- verify(subchannels.get(Collections.singletonList(servers.get(3))))
+ verify(subchannels.get(Collections.singletonList(servers.get(3))), times(2))
.requestConnection(); // kickoff connection to server3 (next first non-failing)
verify(subchannels.get(Collections.singletonList(servers.get(1))), never())
.requestConnection(); // no excessive connection