grpclb: enter fallback mode immediately when balancer and all backend… (#4007)

grpclb: enter fallback mode immediately when balancer and all backend connections are lost

Changed according to updated spec.
This commit is contained in:
Kun Zhang 2018-02-07 14:42:00 -08:00 committed by GitHub
parent de54a4cb49
commit f44fc50310
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 82 additions and 99 deletions

View File

@ -101,7 +101,7 @@ final class GrpclbState {
private static final Attributes.Key<AtomicReference<ConnectivityStateInfo>> STATE_INFO =
Attributes.Key.of("io.grpc.grpclb.GrpclbLoadBalancer.stateInfo");
// Reset to null when timer expires or cancelled.
// Scheduled only once. Never reset.
@Nullable
private FallbackModeTask fallbackTimer;
private List<EquivalentAddressGroup> fallbackBackendList = Collections.emptyList();
@ -146,7 +146,7 @@ final class GrpclbState {
subchannel.requestConnection();
}
subchannel.getAttributes().get(STATE_INFO).set(newState);
maybeStartFallbackTimer();
maybeUseFallbackBackends();
maybeUpdatePicker();
}
@ -171,7 +171,12 @@ final class GrpclbState {
startLbRpc();
}
fallbackBackendList = newBackendServers;
maybeStartFallbackTimer();
// Start the fallback timer if it's never started
if (fallbackTimer == null) {
logger.log(Level.FINE, "[{0}] Starting fallback timer.", new Object[] {logId});
fallbackTimer = new FallbackModeTask();
fallbackTimer.schedule();
}
if (usingFallbackBackends) {
// Populate the new fallback backends to round-robin list.
useFallbackBackends();
@ -179,22 +184,16 @@ final class GrpclbState {
maybeUpdatePicker();
}
/**
* Start the fallback timer if it's not already started and all connections are lost.
*/
private void maybeStartFallbackTimer() {
if (fallbackTimer != null) {
return;
}
if (fallbackBackendList.isEmpty()) {
return;
}
private void maybeUseFallbackBackends() {
if (balancerWorking) {
return;
}
if (usingFallbackBackends) {
return;
}
if (fallbackTimer != null && !fallbackTimer.discarded) {
return;
}
int numReadySubchannels = 0;
for (Subchannel subchannel : subchannels.values()) {
if (subchannel.getAttributes().get(STATE_INFO).get().getState() == READY) {
@ -204,9 +203,8 @@ final class GrpclbState {
if (numReadySubchannels > 0) {
return;
}
logger.log(Level.FINE, "[{0}] Starting fallback timer.", new Object[] {logId});
fallbackTimer = new FallbackModeTask();
fallbackTimer.schedule();
// Fallback contiditions met
useFallbackBackends();
}
/**
@ -275,7 +273,6 @@ final class GrpclbState {
private void cancelFallbackTimer() {
if (fallbackTimer != null) {
fallbackTimer.cancel();
fallbackTimer = null;
}
}
@ -358,28 +355,24 @@ final class GrpclbState {
@VisibleForTesting
class FallbackModeTask implements Runnable {
private ScheduledFuture<?> scheduledFuture;
// If the scheduledFuture is cancelled after the task has made it into the ChannelExecutor, the
// task will be started anyway. Use this boolean to signal that the task should not be run.
private boolean cancelled;
private boolean discarded;
@Override
public void run() {
helper.runSerialized(new Runnable() {
@Override
public void run() {
if (!cancelled) {
checkState(fallbackTimer == FallbackModeTask.this, "fallback timer mismatch");
fallbackTimer = null;
useFallbackBackends();
maybeUpdatePicker();
}
checkState(fallbackTimer == FallbackModeTask.this, "fallback timer mismatch");
discarded = true;
maybeUseFallbackBackends();
maybeUpdatePicker();
}
});
}
void cancel() {
discarded = true;
scheduledFuture.cancel(false);
cancelled = true;
}
void schedule() {
@ -556,7 +549,8 @@ final class GrpclbState {
cleanUp();
propagateError(error);
balancerWorking = false;
maybeStartFallbackTimer();
maybeUseFallbackBackends();
maybeUpdatePicker();
startLbRpc();
}

View File

@ -373,8 +373,8 @@ public class GrpclbLoadBalancerTest {
.set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build();
deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
// No backend address from resolver. Fallback timer is not started.
assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
// Fallback timer is started as soon as address is resolved.
assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
assertEquals(1, fakeOobChannels.size());
verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
@ -1022,6 +1022,9 @@ public class GrpclbLoadBalancerTest {
.set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build();
deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
// Fallback timer is started as soon as the addresses are resolved.
assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy());
assertNull(balancer.getDelegate());
verify(helper).createOobChannel(addrsEq(grpclbResolutionList.get(0)), eq(lbAuthority(0)));
@ -1036,9 +1039,6 @@ public class GrpclbLoadBalancerTest {
InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
.build()));
// No backend address from resolver. Fallback timer is not started.
assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
// Simulate receiving LB response
List<ServerEntry> backends1 = Arrays.asList(
new ServerEntry("127.0.0.1", 2000, "token0001"),
@ -1209,8 +1209,7 @@ public class GrpclbLoadBalancerTest {
subtestGrpclbFallbackInitialTimeout(true);
}
// Fallback within the period of the initial timeout, where the server list is not received from
// the balancer.
// Fallback or not within the period of the initial timeout.
private void subtestGrpclbFallbackInitialTimeout(boolean timerExpires) {
long loadReportIntervalMillis = 1983;
InOrder helperInOrder = inOrder(helper);
@ -1247,6 +1246,29 @@ public class GrpclbLoadBalancerTest {
fakeClock.forwardTime(GrpclbState.FALLBACK_TIMEOUT_MS - 1, TimeUnit.MILLISECONDS);
assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
/////////////////////////////////////////////
// Break the LB stream before timer expires
/////////////////////////////////////////////
Status streamError = Status.UNAVAILABLE.withDescription("OOB stream broken");
lbResponseObserver.onError(streamError.asException());
// Not in fallback mode. The error will be propagated.
verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(picker.dropList).isEmpty();
ErrorEntry errorEntry = (ErrorEntry) Iterables.getOnlyElement(picker.pickList);
Status status = errorEntry.result.getStatus();
assertThat(status.getCode()).isEqualTo(streamError.getCode());
assertThat(status.getDescription()).contains(streamError.getDescription());
// A new stream is created
verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture());
lbResponseObserver = lbResponseObserverCaptor.getValue();
assertEquals(1, lbRequestObservers.size());
lbRequestObserver = lbRequestObservers.poll();
verify(lbRequestObserver).onNext(
eq(LoadBalanceRequest.newBuilder().setInitialRequest(
InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
.build()));
//////////////////////////////////
// Fallback timer expires (or not)
//////////////////////////////////
@ -1301,37 +1323,26 @@ public class GrpclbLoadBalancerTest {
helperInOrder, helper, Arrays.asList(resolutionList.get(1), resolutionList.get(2)));
}
///////////////////////
// Break the LB stream
///////////////////////
Status streamError = Status.UNAVAILABLE.withDescription("OOB stream broken");
lbResponseObserver.onError(streamError.asException());
////////////////////////////////////////////////
// Break the LB stream after the timer expires
////////////////////////////////////////////////
if (timerExpires) {
lbResponseObserver.onError(streamError.asException());
// The error will NOT propagate to picker because fallback list is in use.
helperInOrder.verify(helper, never())
.updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
} else {
// Not in fallback mode. The error will be propagated.
verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(picker.dropList).isEmpty();
ErrorEntry errorEntry = (ErrorEntry) Iterables.getOnlyElement(picker.pickList);
Status status = errorEntry.result.getStatus();
assertThat(status.getCode()).isEqualTo(streamError.getCode());
assertThat(status.getDescription()).contains(streamError.getDescription());
// A new stream is created
verify(mockLbService, times(3)).balanceLoad(lbResponseObserverCaptor.capture());
lbResponseObserver = lbResponseObserverCaptor.getValue();
assertEquals(1, lbRequestObservers.size());
lbRequestObserver = lbRequestObservers.poll();
verify(lbRequestObserver).onNext(
eq(LoadBalanceRequest.newBuilder().setInitialRequest(
InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
.build()));
}
// A new stream is created
verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture());
lbResponseObserver = lbResponseObserverCaptor.getValue();
assertEquals(1, lbRequestObservers.size());
lbRequestObserver = lbRequestObservers.poll();
verify(lbRequestObserver).onNext(
eq(LoadBalanceRequest.newBuilder().setInitialRequest(
InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
.build()));
/////////////////////////////////
// Balancer returns a server list
/////////////////////////////////
@ -1359,28 +1370,23 @@ public class GrpclbLoadBalancerTest {
}
@Test
public void grpclbFallback_balancerLost_timerExpires() {
subtestGrpclbFallbackConnectionLost(true, false, true);
public void grpclbFallback_balancerLost() {
subtestGrpclbFallbackConnectionLost(true, false);
}
@Test
public void grpclbFallback_subchannelsLost_timerExpires() {
subtestGrpclbFallbackConnectionLost(false, true, true);
public void grpclbFallback_subchannelsLost() {
subtestGrpclbFallbackConnectionLost(false, true);
}
@Test
public void grpclbFallback_allLost_timerExpires() {
subtestGrpclbFallbackConnectionLost(true, true, true);
}
@Test
public void grpclbFallback_allLost_ResumeBeforeTimerExpires() {
subtestGrpclbFallbackConnectionLost(true, true, false);
public void grpclbFallback_allLost() {
subtestGrpclbFallbackConnectionLost(true, true);
}
// Fallback outside of the initial timeout, where all connections are lost.
private void subtestGrpclbFallbackConnectionLost(
boolean balancerBroken, boolean allSubchannelsBroken, boolean timerExpires) {
boolean balancerBroken, boolean allSubchannelsBroken) {
long loadReportIntervalMillis = 1983;
InOrder inOrder = inOrder(helper, mockLbService);
@ -1419,9 +1425,6 @@ public class GrpclbLoadBalancerTest {
lbResponseObserver.onNext(buildInitialResponse());
lbResponseObserver.onNext(buildLbResponse(serverList));
// No fallback timer scheduled
assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
List<Subchannel> subchannels =
fallbackTestVerifyUseOfBalancerBackendLists(inOrder, helper, serverList);
@ -1442,24 +1445,14 @@ public class GrpclbLoadBalancerTest {
}
if (balancerBroken && allSubchannelsBroken) {
// Fallback timer is scheduled if all connections are lost.
assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
if (timerExpires) {
fakeClock.forwardTime(GrpclbState.FALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS);
assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
// Going into fallback
subchannels = fallbackTestVerifyUseOfFallbackBackendLists(
inOrder, helper, Arrays.asList(resolutionList.get(0), resolutionList.get(2)));
// Going into fallback
subchannels = fallbackTestVerifyUseOfFallbackBackendLists(
inOrder, helper, Arrays.asList(resolutionList.get(0), resolutionList.get(2)));
// When in fallback mode, fallback timer should not be scheduled when all backend
// connections are lost
for (Subchannel subchannel : subchannels) {
deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(IDLE));
}
assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
} else {
fakeClock.forwardTime(GrpclbState.FALLBACK_TIMEOUT_MS - 1, TimeUnit.MILLISECONDS);
// When in fallback mode, fallback timer should not be scheduled when all backend
// connections are lost
for (Subchannel subchannel : subchannels) {
deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(IDLE));
}
// Exit fallback mode or cancel fallback timer when receiving a new server list from balancer
@ -1469,15 +1462,11 @@ public class GrpclbLoadBalancerTest {
lbResponseObserver.onNext(buildInitialResponse());
lbResponseObserver.onNext(buildLbResponse(serverList2));
assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
if (timerExpires) {
fallbackTestVerifyUseOfBalancerBackendLists(inOrder, helper, serverList2);
}
} else {
assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
fallbackTestVerifyUseOfBalancerBackendLists(inOrder, helper, serverList2);
}
assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
if (!(balancerBroken && allSubchannelsBroken && timerExpires)) {
if (!(balancerBroken && allSubchannelsBroken)) {
verify(helper, never()).createSubchannel(eq(resolutionList.get(0)), any(Attributes.class));
verify(helper, never()).createSubchannel(eq(resolutionList.get(2)), any(Attributes.class));
}