From f44fc503106c36dad3a32b5f823d60777588bb4d Mon Sep 17 00:00:00 2001 From: Kun Zhang Date: Wed, 7 Feb 2018 14:42:00 -0800 Subject: [PATCH] =?UTF-8?q?grpclb:=20enter=20fallback=20mode=20immediately?= =?UTF-8?q?=20when=20balancer=20and=20all=20backend=E2=80=A6=20(#4007)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit grpclb: enter fallback mode immediately when balancer and all backend connections are lost Changed according to updated spec. --- .../main/java/io/grpc/grpclb/GrpclbState.java | 50 +++---- .../grpc/grpclb/GrpclbLoadBalancerTest.java | 131 ++++++++---------- 2 files changed, 82 insertions(+), 99 deletions(-) diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java index b35988f7f7..f295f916d2 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java @@ -101,7 +101,7 @@ final class GrpclbState { private static final Attributes.Key> STATE_INFO = Attributes.Key.of("io.grpc.grpclb.GrpclbLoadBalancer.stateInfo"); - // Reset to null when timer expires or cancelled. + // Scheduled only once. Never reset. @Nullable private FallbackModeTask fallbackTimer; private List fallbackBackendList = Collections.emptyList(); @@ -146,7 +146,7 @@ final class GrpclbState { subchannel.requestConnection(); } subchannel.getAttributes().get(STATE_INFO).set(newState); - maybeStartFallbackTimer(); + maybeUseFallbackBackends(); maybeUpdatePicker(); } @@ -171,7 +171,12 @@ final class GrpclbState { startLbRpc(); } fallbackBackendList = newBackendServers; - maybeStartFallbackTimer(); + // Start the fallback timer if it's never started + if (fallbackTimer == null) { + logger.log(Level.FINE, "[{0}] Starting fallback timer.", new Object[] {logId}); + fallbackTimer = new FallbackModeTask(); + fallbackTimer.schedule(); + } if (usingFallbackBackends) { // Populate the new fallback backends to round-robin list. 
useFallbackBackends(); @@ -179,22 +184,16 @@ final class GrpclbState { maybeUpdatePicker(); } - /** - * Start the fallback timer if it's not already started and all connections are lost. - */ - private void maybeStartFallbackTimer() { - if (fallbackTimer != null) { - return; - } - if (fallbackBackendList.isEmpty()) { - return; - } + private void maybeUseFallbackBackends() { if (balancerWorking) { return; } if (usingFallbackBackends) { return; } + if (fallbackTimer != null && !fallbackTimer.discarded) { + return; + } int numReadySubchannels = 0; for (Subchannel subchannel : subchannels.values()) { if (subchannel.getAttributes().get(STATE_INFO).get().getState() == READY) { @@ -204,9 +203,8 @@ final class GrpclbState { if (numReadySubchannels > 0) { return; } - logger.log(Level.FINE, "[{0}] Starting fallback timer.", new Object[] {logId}); - fallbackTimer = new FallbackModeTask(); - fallbackTimer.schedule(); + // Fallback conditions met + useFallbackBackends(); } /** @@ -275,7 +273,6 @@ final class GrpclbState { private void cancelFallbackTimer() { if (fallbackTimer != null) { fallbackTimer.cancel(); - fallbackTimer = null; } } @@ -358,28 +355,24 @@ final class GrpclbState { @VisibleForTesting class FallbackModeTask implements Runnable { private ScheduledFuture scheduledFuture; - // If the scheduledFuture is cancelled after the task has made it into the ChannelExecutor, the - // task will be started anyway. Use this boolean to signal that the task should not be run. 
- private boolean cancelled; + private boolean discarded; @Override public void run() { helper.runSerialized(new Runnable() { @Override public void run() { - if (!cancelled) { - checkState(fallbackTimer == FallbackModeTask.this, "fallback timer mismatch"); - fallbackTimer = null; - useFallbackBackends(); - maybeUpdatePicker(); - } + checkState(fallbackTimer == FallbackModeTask.this, "fallback timer mismatch"); + discarded = true; + maybeUseFallbackBackends(); + maybeUpdatePicker(); } }); } void cancel() { + discarded = true; scheduledFuture.cancel(false); - cancelled = true; } void schedule() { @@ -556,7 +549,8 @@ final class GrpclbState { cleanUp(); propagateError(error); balancerWorking = false; - maybeStartFallbackTimer(); + maybeUseFallbackBackends(); + maybeUpdatePicker(); startLbRpc(); } diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index 9d8bc4f154..dc18c1b31b 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -373,8 +373,8 @@ public class GrpclbLoadBalancerTest { .set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build(); deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); - // No backend address from resolver. Fallback timer is not started. - assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); + // Fallback timer is started as soon as address is resolved. + assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); assertEquals(1, fakeOobChannels.size()); verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); @@ -1022,6 +1022,9 @@ public class GrpclbLoadBalancerTest { .set(GrpclbConstants.ATTR_LB_POLICY, LbPolicy.GRPCLB).build(); deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); + // Fallback timer is started as soon as the addresses are resolved. 
+ assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); + assertSame(LbPolicy.GRPCLB, balancer.getLbPolicy()); assertNull(balancer.getDelegate()); verify(helper).createOobChannel(addrsEq(grpclbResolutionList.get(0)), eq(lbAuthority(0))); @@ -1036,9 +1039,6 @@ public class GrpclbLoadBalancerTest { InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) .build())); - // No backend address from resolver. Fallback timer is not started. - assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); - // Simulate receiving LB response List backends1 = Arrays.asList( new ServerEntry("127.0.0.1", 2000, "token0001"), @@ -1209,8 +1209,7 @@ public class GrpclbLoadBalancerTest { subtestGrpclbFallbackInitialTimeout(true); } - // Fallback within the period of the initial timeout, where the server list is not received from - // the balancer. + // Fallback or not within the period of the initial timeout. private void subtestGrpclbFallbackInitialTimeout(boolean timerExpires) { long loadReportIntervalMillis = 1983; InOrder helperInOrder = inOrder(helper); @@ -1247,6 +1246,29 @@ public class GrpclbLoadBalancerTest { fakeClock.forwardTime(GrpclbState.FALLBACK_TIMEOUT_MS - 1, TimeUnit.MILLISECONDS); assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); + ///////////////////////////////////////////// + // Break the LB stream before timer expires + ///////////////////////////////////////////// + Status streamError = Status.UNAVAILABLE.withDescription("OOB stream broken"); + lbResponseObserver.onError(streamError.asException()); + // Not in fallback mode. The error will be propagated. 
+ verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue(); + assertThat(picker.dropList).isEmpty(); + ErrorEntry errorEntry = (ErrorEntry) Iterables.getOnlyElement(picker.pickList); + Status status = errorEntry.result.getStatus(); + assertThat(status.getCode()).isEqualTo(streamError.getCode()); + assertThat(status.getDescription()).contains(streamError.getDescription()); + // A new stream is created + verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture()); + lbResponseObserver = lbResponseObserverCaptor.getValue(); + assertEquals(1, lbRequestObservers.size()); + lbRequestObserver = lbRequestObservers.poll(); + verify(lbRequestObserver).onNext( + eq(LoadBalanceRequest.newBuilder().setInitialRequest( + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + .build())); + ////////////////////////////////// // Fallback timer expires (or not) ////////////////////////////////// @@ -1301,37 +1323,26 @@ public class GrpclbLoadBalancerTest { helperInOrder, helper, Arrays.asList(resolutionList.get(1), resolutionList.get(2))); } - /////////////////////// - // Break the LB stream - /////////////////////// - Status streamError = Status.UNAVAILABLE.withDescription("OOB stream broken"); - lbResponseObserver.onError(streamError.asException()); - + //////////////////////////////////////////////// + // Break the LB stream after the timer expires + //////////////////////////////////////////////// if (timerExpires) { + lbResponseObserver.onError(streamError.asException()); + // The error will NOT propagate to picker because fallback list is in use. helperInOrder.verify(helper, never()) .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); - } else { - // Not in fallback mode. The error will be propagated. 
- verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); - RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker.dropList).isEmpty(); - ErrorEntry errorEntry = (ErrorEntry) Iterables.getOnlyElement(picker.pickList); - Status status = errorEntry.result.getStatus(); - assertThat(status.getCode()).isEqualTo(streamError.getCode()); - assertThat(status.getDescription()).contains(streamError.getDescription()); + // A new stream is created + verify(mockLbService, times(3)).balanceLoad(lbResponseObserverCaptor.capture()); + lbResponseObserver = lbResponseObserverCaptor.getValue(); + assertEquals(1, lbRequestObservers.size()); + lbRequestObserver = lbRequestObservers.poll(); + verify(lbRequestObserver).onNext( + eq(LoadBalanceRequest.newBuilder().setInitialRequest( + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + .build())); } - // A new stream is created - verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture()); - lbResponseObserver = lbResponseObserverCaptor.getValue(); - assertEquals(1, lbRequestObservers.size()); - lbRequestObserver = lbRequestObservers.poll(); - verify(lbRequestObserver).onNext( - eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) - .build())); - ///////////////////////////////// // Balancer returns a server list ///////////////////////////////// @@ -1359,28 +1370,23 @@ public class GrpclbLoadBalancerTest { } @Test - public void grpclbFallback_balancerLost_timerExpires() { - subtestGrpclbFallbackConnectionLost(true, false, true); + public void grpclbFallback_balancerLost() { + subtestGrpclbFallbackConnectionLost(true, false); } @Test - public void grpclbFallback_subchannelsLost_timerExpires() { - subtestGrpclbFallbackConnectionLost(false, true, true); + public void grpclbFallback_subchannelsLost() { + subtestGrpclbFallbackConnectionLost(false, true); } 
@Test - public void grpclbFallback_allLost_timerExpires() { - subtestGrpclbFallbackConnectionLost(true, true, true); - } - - @Test - public void grpclbFallback_allLost_ResumeBeforeTimerExpires() { - subtestGrpclbFallbackConnectionLost(true, true, false); + public void grpclbFallback_allLost() { + subtestGrpclbFallbackConnectionLost(true, true); } // Fallback outside of the initial timeout, where all connections are lost. private void subtestGrpclbFallbackConnectionLost( - boolean balancerBroken, boolean allSubchannelsBroken, boolean timerExpires) { + boolean balancerBroken, boolean allSubchannelsBroken) { long loadReportIntervalMillis = 1983; InOrder inOrder = inOrder(helper, mockLbService); @@ -1419,9 +1425,6 @@ public class GrpclbLoadBalancerTest { lbResponseObserver.onNext(buildInitialResponse()); lbResponseObserver.onNext(buildLbResponse(serverList)); - // No fallback timer scheduled - assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); - List subchannels = fallbackTestVerifyUseOfBalancerBackendLists(inOrder, helper, serverList); @@ -1442,24 +1445,14 @@ public class GrpclbLoadBalancerTest { } if (balancerBroken && allSubchannelsBroken) { - // Fallback timer is scheduled if all connections are lost. 
- assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); - if (timerExpires) { - fakeClock.forwardTime(GrpclbState.FALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS); - assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); + // Going into fallback + subchannels = fallbackTestVerifyUseOfFallbackBackendLists( + inOrder, helper, Arrays.asList(resolutionList.get(0), resolutionList.get(2))); - // Going into fallback - subchannels = fallbackTestVerifyUseOfFallbackBackendLists( - inOrder, helper, Arrays.asList(resolutionList.get(0), resolutionList.get(2))); - - // When in fallback mode, fallback timer should not be scheduled when all backend - // connections are lost - for (Subchannel subchannel : subchannels) { - deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(IDLE)); - } - assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); - } else { - fakeClock.forwardTime(GrpclbState.FALLBACK_TIMEOUT_MS - 1, TimeUnit.MILLISECONDS); + // When in fallback mode, fallback timer should not be scheduled when all backend + // connections are lost + for (Subchannel subchannel : subchannels) { + deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(IDLE)); } // Exit fallback mode or cancel fallback timer when receiving a new server list from balancer @@ -1469,15 +1462,11 @@ public class GrpclbLoadBalancerTest { lbResponseObserver.onNext(buildInitialResponse()); lbResponseObserver.onNext(buildLbResponse(serverList2)); - assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); - if (timerExpires) { - fallbackTestVerifyUseOfBalancerBackendLists(inOrder, helper, serverList2); - } - } else { - assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); + fallbackTestVerifyUseOfBalancerBackendLists(inOrder, helper, serverList2); } + assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); - if (!(balancerBroken && allSubchannelsBroken && timerExpires)) { + if (!(balancerBroken && 
allSubchannelsBroken)) { verify(helper, never()).createSubchannel(eq(resolutionList.get(0)), any(Attributes.class)); verify(helper, never()).createSubchannel(eq(resolutionList.get(2)), any(Attributes.class)); }