mirror of https://github.com/grpc/grpc-java.git
xds: ring_hash self recover from TRANSIENT_FAILURE by attempting to connect one subchannel (#8144)
Kicks off a connection for one of the IDLE subchannels (if any exist) when the ring_hash LB policy is reporting TRANSIENT_FAILURE to its upstream. While the ring_hash policy is reporting TRANSIENT_FAILURE, it will not be getting any pick requests from the priority policy. However, because the ring_hash policy does not attempt to reconnect to subchannels unless it is getting pick requests, it needs special handling to ensure that it will eventually recover from the TRANSIENT_FAILURE state once the problem is resolved. Specifically, it will make sure that it is attempting to connect (after the applicable backoff period) to at least one subchannel at any given time.
This commit is contained in:
parent
0c2d8edc4c
commit
dbc5786c30
|
|
@ -191,9 +191,59 @@ final class RingHashLoadBalancer extends LoadBalancer {
|
|||
subchannels.clear();
|
||||
}
|
||||
|
||||
/**
|
||||
* Updates the overall balancing state by aggregating the connectivity states of all subchannels.
|
||||
*
|
||||
* <p>Aggregation rules (in order of dominance):
|
||||
* <ol>
|
||||
* <li>If there is at least one subchannel in READY state, overall state is READY</li>
|
||||
* <li>If there are <em>2 or more</em> subchannels in TRANSIENT_FAILURE, overall state is
|
||||
* TRANSIENT_FAILURE</li>
|
||||
* <li>If there is at least one subchannel in CONNECTING state, overall state is
|
||||
* CONNECTING</li>
|
||||
* <li>If there is at least one subchannel in IDLE state, overall state is IDLE</li>
|
||||
* <li>Otherwise, overall state is TRANSIENT_FAILURE</li>
|
||||
* </ol>
|
||||
*/
|
||||
private void updateBalancingState() {
|
||||
checkState(!subchannels.isEmpty(), "no subchannel has been created");
|
||||
ConnectivityState overallState = aggregateState(subchannels.values());
|
||||
int failureCount = 0;
|
||||
boolean hasConnecting = false;
|
||||
Subchannel idleSubchannel = null;
|
||||
ConnectivityState overallState = null;
|
||||
for (Subchannel subchannel : subchannels.values()) {
|
||||
ConnectivityState state = getSubchannelStateInfoRef(subchannel).value.getState();
|
||||
if (state == READY) {
|
||||
overallState = READY;
|
||||
break;
|
||||
}
|
||||
if (state == TRANSIENT_FAILURE) {
|
||||
failureCount++;
|
||||
} else if (state == CONNECTING) {
|
||||
hasConnecting = true;
|
||||
} else if (state == IDLE) {
|
||||
if (idleSubchannel == null) {
|
||||
idleSubchannel = subchannel;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (overallState == null) {
|
||||
if (failureCount >= 2) {
|
||||
// This load balancer may not get any pick requests from the upstream if it's reporting
|
||||
// TRANSIENT_FAILURE. It needs to recover by itself by attempting to connect to at least
|
||||
// one subchannel that has not failed at any given time.
|
||||
if (!hasConnecting && idleSubchannel != null) {
|
||||
idleSubchannel.requestConnection();
|
||||
}
|
||||
overallState = TRANSIENT_FAILURE;
|
||||
} else if (hasConnecting) {
|
||||
overallState = CONNECTING;
|
||||
} else if (idleSubchannel != null) {
|
||||
overallState = IDLE;
|
||||
} else {
|
||||
overallState = TRANSIENT_FAILURE;
|
||||
}
|
||||
}
|
||||
RingHashPicker picker = new RingHashPicker(syncContext, ring, subchannels);
|
||||
// TODO(chengyuanzhang): avoid unnecessary reprocess caused by duplicated server addr updates
|
||||
helper.updateBalancingState(overallState, picker);
|
||||
|
|
@ -221,46 +271,6 @@ final class RingHashLoadBalancer extends LoadBalancer {
|
|||
updateBalancingState();
|
||||
}
|
||||
|
||||
/**
|
||||
* Aggregates the connectivity states of a group of subchannels for overall connectivity state.
|
||||
*
|
||||
* <p>Aggregation rules (in order of dominance):
|
||||
* <ol>
|
||||
* <li>If there is at least one subchannel in READY state, overall state is READY</li>
|
||||
* <li>If there are <em>2 or more</em> subchannels in TRANSIENT_FAILURE, overall state is
|
||||
* TRANSIENT_FAILURE</li>
|
||||
* <li>If there is at least one subchannel in CONNECTING state, overall state is
|
||||
* CONNECTING</li>
|
||||
* <li>If there is at least one subchannel in IDLE state, overall state is IDLE</li>
|
||||
* <li>Otherwise, overall state is TRANSIENT_FAILURE</li>
|
||||
* </ol>
|
||||
*/
|
||||
private static ConnectivityState aggregateState(Iterable<Subchannel> subchannels) {
|
||||
int failureCount = 0;
|
||||
boolean hasIdle = false;
|
||||
boolean hasConnecting = false;
|
||||
for (Subchannel subchannel : subchannels) {
|
||||
ConnectivityState state = getSubchannelStateInfoRef(subchannel).value.getState();
|
||||
if (state == READY) {
|
||||
return state;
|
||||
}
|
||||
if (state == TRANSIENT_FAILURE) {
|
||||
failureCount++;
|
||||
} else if (state == CONNECTING) {
|
||||
hasConnecting = true;
|
||||
} else if (state == IDLE) {
|
||||
hasIdle = true;
|
||||
}
|
||||
}
|
||||
if (failureCount >= 2) {
|
||||
return TRANSIENT_FAILURE;
|
||||
}
|
||||
if (hasConnecting) {
|
||||
return CONNECTING;
|
||||
}
|
||||
return hasIdle ? IDLE : TRANSIENT_FAILURE;
|
||||
}
|
||||
|
||||
  /** Shuts down the given subchannel and records its terminal SHUTDOWN state locally. */
  private static void shutdownSubchannel(Subchannel subchannel) {
    subchannel.shutdown();
    // Overwrite the locally tracked state after shutting down, so this
    // subchannel is no longer treated as usable.
    getSubchannelStateInfoRef(subchannel).value = ConnectivityStateInfo.forNonError(SHUTDOWN);
  }
|
||||
|
|
|
|||
|
|
@ -525,13 +525,15 @@ public class RingHashLoadBalancerTest {
|
|||
ConnectivityStateInfo.forTransientFailure(
|
||||
Status.PERMISSION_DENIED.withDescription("permission denied")));
|
||||
verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
|
||||
verify(subchannels.get(Collections.singletonList(servers.get(3))))
|
||||
.requestConnection(); // LB attempts to recover by itself
|
||||
|
||||
PickResult result = pickerCaptor.getValue().pickSubchannel(args);
|
||||
assertThat(result.getStatus().isOk()).isFalse(); // fail the RPC
|
||||
assertThat(result.getStatus().getCode())
|
||||
.isEqualTo(Code.UNAVAILABLE); // with error status for the original server hit by hash
|
||||
assertThat(result.getStatus().getDescription()).isEqualTo("unreachable");
|
||||
verify(subchannels.get(Collections.singletonList(servers.get(3))))
|
||||
verify(subchannels.get(Collections.singletonList(servers.get(3))), times(2))
|
||||
.requestConnection(); // kickoff connection to server3 (next first non-failing)
|
||||
verify(subchannels.get(Collections.singletonList(servers.get(1))), never())
|
||||
.requestConnection(); // no excessive connection
|
||||
|
|
|
|||
Loading…
Reference in New Issue