From b701e8920daaccf9e6eb2916d2ea94da8df74be5 Mon Sep 17 00:00:00 2001 From: Kun Zhang Date: Wed, 24 Oct 2018 09:35:55 -0700 Subject: [PATCH] grpclb: enter fallback when LB stream broken even before fallback timer expires (#4990) Previously the client waits ~10 seconds until the fallback timer has expired. While the timer is useful to address the long tail, it shouldn't delay using the fallback in case of obvious errors, like the channel failing to connect or an UNIMPLEMENTED response. --- .../main/java/io/grpc/grpclb/GrpclbState.java | 3 - .../grpc/grpclb/GrpclbLoadBalancerTest.java | 81 +++++++++++++------ 2 files changed, 57 insertions(+), 27 deletions(-) diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java index e509d8465b..736b6039c5 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java @@ -217,9 +217,6 @@ final class GrpclbState { if (usingFallbackBackends) { return; } - if (fallbackTimer != null && fallbackTimer.isPending()) { - return; - } int numReadySubchannels = 0; for (Subchannel subchannel : subchannels.values()) { if (subchannel.getAttributes().get(STATE_INFO).get().getState() == READY) { diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index 633320396f..64ade74903 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -1055,29 +1055,6 @@ public class GrpclbLoadBalancerTest { fakeClock.forwardTime(GrpclbState.FALLBACK_TIMEOUT_MS - 1, TimeUnit.MILLISECONDS); assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); - ///////////////////////////////////////////// - // Break the LB stream before timer expires - ///////////////////////////////////////////// - Status streamError = Status.UNAVAILABLE.withDescription("OOB stream broken"); - lbResponseObserver.onError(streamError.asException()); - // Not in fallback mode. The error will be propagated. - verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); - RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker.dropList).isEmpty(); - ErrorEntry errorEntry = (ErrorEntry) Iterables.getOnlyElement(picker.pickList); - Status status = errorEntry.result.getStatus(); - assertThat(status.getCode()).isEqualTo(streamError.getCode()); - assertThat(status.getDescription()).contains(streamError.getDescription()); - // A new stream is created - verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture()); - lbResponseObserver = lbResponseObserverCaptor.getValue(); - assertEquals(1, lbRequestObservers.size()); - lbRequestObserver = lbRequestObservers.poll(); - verify(lbRequestObserver).onNext( - eq(LoadBalanceRequest.newBuilder().setInitialRequest( - InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) - .build())); - ////////////////////////////////// // Fallback timer expires (or not) ////////////////////////////////// @@ -1134,13 +1111,14 @@ public class GrpclbLoadBalancerTest { // Break the LB stream after the timer expires //////////////////////////////////////////////// if (timerExpires) { + Status streamError = Status.UNAVAILABLE.withDescription("OOB stream broken"); lbResponseObserver.onError(streamError.asException()); // The error will NOT propagate to picker because fallback list is in use. inOrder.verify(helper, never()) .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); // A new stream is created - verify(mockLbService, times(3)).balanceLoad(lbResponseObserverCaptor.capture()); + verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture()); lbResponseObserver = lbResponseObserverCaptor.getValue(); assertEquals(1, lbRequestObservers.size()); lbRequestObserver = lbRequestObservers.poll(); @@ -1175,6 +1153,61 @@ public class GrpclbLoadBalancerTest { assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); } + @Test + public void grpclbFallback_breakLbStreamBeforeFallbackTimerExpires() { + long loadReportIntervalMillis = 1983; + InOrder inOrder = inOrder(helper, subchannelPool); + + // Create a resolution list with a mixture of balancer and backend addresses + List resolutionList = + createResolvedServerAddresses(false, true, false); + Attributes resolutionAttrs = Attributes.EMPTY; + deliverResolvedAddresses(resolutionList, resolutionAttrs); + + inOrder.verify(helper).createOobChannel(eq(resolutionList.get(1)), eq(lbAuthority(0))); + + // Attempted to connect to balancer + assertEquals(1, fakeOobChannels.size()); + ManagedChannel oobChannel = fakeOobChannels.poll(); + verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); + StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); + assertEquals(1, lbRequestObservers.size()); + StreamObserver lbRequestObserver = lbRequestObservers.poll(); + + verify(lbRequestObserver).onNext( + eq(LoadBalanceRequest.newBuilder().setInitialRequest( + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + .build())); + lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis)); + // We don't care if these methods have been run. + inOrder.verify(helper, atLeast(0)).getSynchronizationContext(); + inOrder.verify(helper, atLeast(0)).getScheduledExecutorService(); + + inOrder.verifyNoMoreInteractions(); + + assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); + + ///////////////////////////////////////////// + // Break the LB stream before timer expires + ///////////////////////////////////////////// + Status streamError = Status.UNAVAILABLE.withDescription("OOB stream broken"); + lbResponseObserver.onError(streamError.asException()); + + // Fall back to the backends from resolver + fallbackTestVerifyUseOfFallbackBackendLists( + inOrder, Arrays.asList(resolutionList.get(0), resolutionList.get(2))); + + // A new stream is created + verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture()); + lbResponseObserver = lbResponseObserverCaptor.getValue(); + assertEquals(1, lbRequestObservers.size()); + lbRequestObserver = lbRequestObservers.poll(); + verify(lbRequestObserver).onNext( + eq(LoadBalanceRequest.newBuilder().setInitialRequest( + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + .build())); + } + @Test public void grpclbFallback_balancerLost() { subtestGrpclbFallbackConnectionLost(true, false);