rls: fix connectivity state aggregation (#8625)

Fix connectivity state aggregation as per http://go/grpc-rls-lb-policy-design#heading=h.6e8tt7xcwcdn

> Note that, for the purposes of aggregation, when a child policy reports TRANSIENT_FAILURE, we consider it to continue to be in that state until it reports READY (i.e., we ignore CONNECTING in between the two, no matter how many times it bounces back and forth between TRANSIENT_FAILURE and CONNECTING).
This commit is contained in:
ZHANG Dapeng 2021-10-21 21:24:51 -07:00 committed by GitHub
parent 00bb283090
commit 203515dd3d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 22 additions and 7 deletions

View File

@ -16,6 +16,7 @@
package io.grpc.rls;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.ConnectivityState;
import javax.annotation.Nullable;
@ -35,6 +36,7 @@ interface SubchannelStateManager {
* {@code null}.
*/
@Nullable
@VisibleForTesting
ConnectivityState getState(String name);
/** Returns representative subchannel status from all registered subchannels. */

View File

@ -30,6 +30,7 @@ final class SubchannelStateManagerImpl implements SubchannelStateManager {
private final HashMap<String, ConnectivityState> stateMap = new HashMap<>();
private final Multiset<ConnectivityState> stateMultiset = HashMultiset.create();
private ConnectivityState currentState;
@Override
public void updateState(String name, ConnectivityState newState) {
@ -56,16 +57,20 @@ final class SubchannelStateManagerImpl implements SubchannelStateManager {
@Override
public ConnectivityState getAggregatedState() {
if (stateMultiset.contains(ConnectivityState.READY)) {
return ConnectivityState.READY;
currentState = ConnectivityState.READY;
} else if (stateMultiset.contains(ConnectivityState.CONNECTING)) {
return ConnectivityState.CONNECTING;
if (currentState != ConnectivityState.TRANSIENT_FAILURE) {
currentState = ConnectivityState.CONNECTING;
}
} else if (stateMultiset.contains(ConnectivityState.IDLE)) {
return ConnectivityState.IDLE;
currentState = ConnectivityState.IDLE;
} else if (stateMultiset.contains(ConnectivityState.TRANSIENT_FAILURE)) {
return ConnectivityState.TRANSIENT_FAILURE;
currentState = ConnectivityState.TRANSIENT_FAILURE;
} else {
// empty or shutdown
currentState = ConnectivityState.IDLE;
}
// empty or shutdown
return ConnectivityState.IDLE;
return currentState;
}
@Override

View File

@ -57,7 +57,7 @@ public class ChildLoadBalancerHelperTest {
.isEqualTo(ConnectivityState.TRANSIENT_FAILURE);
childLbHelper2.updateBalancingState(ConnectivityState.CONNECTING, picker2);
inOrder.verify(helper).updateBalancingState(ConnectivityState.CONNECTING, picker);
inOrder.verify(helper).updateBalancingState(ConnectivityState.TRANSIENT_FAILURE, picker);
assertThat(subchannelStateManager.getState(target2)).isEqualTo(ConnectivityState.CONNECTING);
childLbHelper1.updateBalancingState(ConnectivityState.READY, picker1);

View File

@ -90,5 +90,13 @@ public class SubchannelStateManagerImplTest {
assertThat(subchannelStateManager.getAggregatedState())
.isEqualTo(ConnectivityState.TRANSIENT_FAILURE);
subchannelStateManager.updateState("channel1", ConnectivityState.CONNECTING);
assertThat(subchannelStateManager.getAggregatedState())
.isEqualTo(ConnectivityState.TRANSIENT_FAILURE);
subchannelStateManager.updateState("channel1", ConnectivityState.READY);
assertThat(subchannelStateManager.getAggregatedState())
.isEqualTo(ConnectivityState.READY);
}
}