mirror of https://github.com/grpc/grpc-java.git
grpclb: enter fallback when LB stream broken even before fallback timer expires (#4990)
Previously the client waits ~10 seconds until the fallback timer has expired. While the timer is useful to address the long tail, it shouldn't delay using the fallback in case of obvious errors, like the channel failing to connect or an UNIMPLEMENTED response.
This commit is contained in:
parent
7582049a95
commit
b701e8920d
|
|
@ -217,9 +217,6 @@ final class GrpclbState {
|
|||
if (usingFallbackBackends) {
|
||||
return;
|
||||
}
|
||||
if (fallbackTimer != null && fallbackTimer.isPending()) {
|
||||
return;
|
||||
}
|
||||
int numReadySubchannels = 0;
|
||||
for (Subchannel subchannel : subchannels.values()) {
|
||||
if (subchannel.getAttributes().get(STATE_INFO).get().getState() == READY) {
|
||||
|
|
|
|||
|
|
@ -1055,29 +1055,6 @@ public class GrpclbLoadBalancerTest {
|
|||
fakeClock.forwardTime(GrpclbState.FALLBACK_TIMEOUT_MS - 1, TimeUnit.MILLISECONDS);
|
||||
assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
|
||||
|
||||
/////////////////////////////////////////////
|
||||
// Break the LB stream before timer expires
|
||||
/////////////////////////////////////////////
|
||||
Status streamError = Status.UNAVAILABLE.withDescription("OOB stream broken");
|
||||
lbResponseObserver.onError(streamError.asException());
|
||||
// Not in fallback mode. The error will be propagated.
|
||||
verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
|
||||
RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue();
|
||||
assertThat(picker.dropList).isEmpty();
|
||||
ErrorEntry errorEntry = (ErrorEntry) Iterables.getOnlyElement(picker.pickList);
|
||||
Status status = errorEntry.result.getStatus();
|
||||
assertThat(status.getCode()).isEqualTo(streamError.getCode());
|
||||
assertThat(status.getDescription()).contains(streamError.getDescription());
|
||||
// A new stream is created
|
||||
verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture());
|
||||
lbResponseObserver = lbResponseObserverCaptor.getValue();
|
||||
assertEquals(1, lbRequestObservers.size());
|
||||
lbRequestObserver = lbRequestObservers.poll();
|
||||
verify(lbRequestObserver).onNext(
|
||||
eq(LoadBalanceRequest.newBuilder().setInitialRequest(
|
||||
InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
|
||||
.build()));
|
||||
|
||||
//////////////////////////////////
|
||||
// Fallback timer expires (or not)
|
||||
//////////////////////////////////
|
||||
|
|
@ -1134,13 +1111,14 @@ public class GrpclbLoadBalancerTest {
|
|||
// Break the LB stream after the timer expires
|
||||
////////////////////////////////////////////////
|
||||
if (timerExpires) {
|
||||
Status streamError = Status.UNAVAILABLE.withDescription("OOB stream broken");
|
||||
lbResponseObserver.onError(streamError.asException());
|
||||
|
||||
// The error will NOT propagate to picker because fallback list is in use.
|
||||
inOrder.verify(helper, never())
|
||||
.updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
|
||||
// A new stream is created
|
||||
verify(mockLbService, times(3)).balanceLoad(lbResponseObserverCaptor.capture());
|
||||
verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture());
|
||||
lbResponseObserver = lbResponseObserverCaptor.getValue();
|
||||
assertEquals(1, lbRequestObservers.size());
|
||||
lbRequestObserver = lbRequestObservers.poll();
|
||||
|
|
@ -1175,6 +1153,61 @@ public class GrpclbLoadBalancerTest {
|
|||
assertEquals(0, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void grpclbFallback_breakLbStreamBeforeFallbackTimerExpires() {
|
||||
long loadReportIntervalMillis = 1983;
|
||||
InOrder inOrder = inOrder(helper, subchannelPool);
|
||||
|
||||
// Create a resolution list with a mixture of balancer and backend addresses
|
||||
List<EquivalentAddressGroup> resolutionList =
|
||||
createResolvedServerAddresses(false, true, false);
|
||||
Attributes resolutionAttrs = Attributes.EMPTY;
|
||||
deliverResolvedAddresses(resolutionList, resolutionAttrs);
|
||||
|
||||
inOrder.verify(helper).createOobChannel(eq(resolutionList.get(1)), eq(lbAuthority(0)));
|
||||
|
||||
// Attempted to connect to balancer
|
||||
assertEquals(1, fakeOobChannels.size());
|
||||
ManagedChannel oobChannel = fakeOobChannels.poll();
|
||||
verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
|
||||
StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
|
||||
assertEquals(1, lbRequestObservers.size());
|
||||
StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
|
||||
|
||||
verify(lbRequestObserver).onNext(
|
||||
eq(LoadBalanceRequest.newBuilder().setInitialRequest(
|
||||
InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
|
||||
.build()));
|
||||
lbResponseObserver.onNext(buildInitialResponse(loadReportIntervalMillis));
|
||||
// We don't care if these methods have been run.
|
||||
inOrder.verify(helper, atLeast(0)).getSynchronizationContext();
|
||||
inOrder.verify(helper, atLeast(0)).getScheduledExecutorService();
|
||||
|
||||
inOrder.verifyNoMoreInteractions();
|
||||
|
||||
assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER));
|
||||
|
||||
/////////////////////////////////////////////
|
||||
// Break the LB stream before timer expires
|
||||
/////////////////////////////////////////////
|
||||
Status streamError = Status.UNAVAILABLE.withDescription("OOB stream broken");
|
||||
lbResponseObserver.onError(streamError.asException());
|
||||
|
||||
// Fall back to the backends from resolver
|
||||
fallbackTestVerifyUseOfFallbackBackendLists(
|
||||
inOrder, Arrays.asList(resolutionList.get(0), resolutionList.get(2)));
|
||||
|
||||
// A new stream is created
|
||||
verify(mockLbService, times(2)).balanceLoad(lbResponseObserverCaptor.capture());
|
||||
lbResponseObserver = lbResponseObserverCaptor.getValue();
|
||||
assertEquals(1, lbRequestObservers.size());
|
||||
lbRequestObserver = lbRequestObservers.poll();
|
||||
verify(lbRequestObserver).onNext(
|
||||
eq(LoadBalanceRequest.newBuilder().setInitialRequest(
|
||||
InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build())
|
||||
.build()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void grpclbFallback_balancerLost() {
|
||||
subtestGrpclbFallbackConnectionLost(true, false);
|
||||
|
|
|
|||
Loading…
Reference in New Issue