From 641d74f34f9bb80758b11d60c650714f192083ab Mon Sep 17 00:00:00 2001 From: Jihun Cho Date: Fri, 10 Jan 2020 13:54:43 -0800 Subject: [PATCH] grpclb: support explicit fallback from LB (#6549) --- .../main/java/io/grpc/grpclb/GrpclbState.java | 7 +- .../main/proto/grpc/lb/v1/load_balancer.proto | 7 + .../grpc/grpclb/GrpclbLoadBalancerTest.java | 214 +++++++++++++++++- 3 files changed, 225 insertions(+), 3 deletions(-) diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java index 6e44a7eed6..925eb2f04e 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java @@ -581,7 +581,12 @@ final class GrpclbState { return; } - if (typeCase != LoadBalanceResponseTypeCase.SERVER_LIST) { + if (typeCase == LoadBalanceResponseTypeCase.FALLBACK_RESPONSE) { + cancelFallbackTimer(); + useFallbackBackends(); + maybeUpdatePicker(); + return; + } else if (typeCase != LoadBalanceResponseTypeCase.SERVER_LIST) { logger.log(ChannelLogLevel.WARNING, "Ignoring unexpected response type: {0}", typeCase); return; } diff --git a/grpclb/src/main/proto/grpc/lb/v1/load_balancer.proto b/grpclb/src/main/proto/grpc/lb/v1/load_balancer.proto index 7f65e0e871..6f65d323af 100644 --- a/grpclb/src/main/proto/grpc/lb/v1/load_balancer.proto +++ b/grpclb/src/main/proto/grpc/lb/v1/load_balancer.proto @@ -94,6 +94,11 @@ message LoadBalanceResponse { // Contains the list of servers selected by the load balancer. The client // should send requests to these servers in the specified order. ServerList server_list = 2; + + // If this field is set, then the client should eagerly enter fallback + // mode (even if there are existing, healthy connections to backends). + // See go/grpclb-explicit-fallback for more details. + FallbackResponse fallback_response = 3; } } @@ -111,6 +116,8 @@ message InitialLoadBalanceResponse { google.protobuf.Duration client_stats_report_interval = 2; } +message FallbackResponse {} + message ServerList { // Contains a list of servers selected by the load balancer. The list will // be updated when server resolutions change or as needed to balance load diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index 1ccc530b72..a88b840d4d 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -81,6 +81,7 @@ import io.grpc.internal.GrpcAttributes; import io.grpc.internal.JsonParser; import io.grpc.lb.v1.ClientStats; import io.grpc.lb.v1.ClientStatsPerToken; +import io.grpc.lb.v1.FallbackResponse; import io.grpc.lb.v1.InitialLoadBalanceRequest; import io.grpc.lb.v1.InitialLoadBalanceResponse; import io.grpc.lb.v1.LoadBalanceRequest; @@ -288,7 +289,6 @@ public class GrpclbLoadBalancerTest { } @After - @SuppressWarnings("unchecked") public void tearDown() { try { if (balancer != null) { @@ -2085,6 +2085,210 @@ public class GrpclbLoadBalancerTest { assertThat(mode).isEqualTo(Mode.ROUND_ROBIN); } + @Test + public void grpclbWorking_lbSendsFallbackMessage() { + InOrder inOrder = inOrder(helper, subchannelPool); + List grpclbResolutionList = + createResolvedServerAddresses(true, true, false, false); + List fallbackEags = grpclbResolutionList.subList(2, 4); + Attributes grpclbResolutionAttrs = Attributes.EMPTY; + deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs); + + // Fallback timer is started as soon as the addresses are resolved. + assertEquals(1, fakeClock.numPendingTasks(FALLBACK_MODE_TASK_FILTER)); + + List addrs = new ArrayList<>(); + addrs.addAll(grpclbResolutionList.get(0).getAddresses()); + addrs.addAll(grpclbResolutionList.get(1).getAddresses()); + Attributes attr = grpclbResolutionList.get(0).getAttributes(); + EquivalentAddressGroup oobChannelEag = new EquivalentAddressGroup(addrs, attr); + verify(helper).createOobChannel(eq(oobChannelEag), eq(lbAuthority(0))); + assertEquals(1, fakeOobChannels.size()); + ManagedChannel oobChannel = fakeOobChannels.poll(); + verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture()); + StreamObserver lbResponseObserver = lbResponseObserverCaptor.getValue(); + assertEquals(1, lbRequestObservers.size()); + StreamObserver lbRequestObserver = lbRequestObservers.poll(); + verify(lbRequestObserver).onNext( + eq(LoadBalanceRequest.newBuilder() + .setInitialRequest( + InitialLoadBalanceRequest.newBuilder().setName(SERVICE_AUTHORITY).build()) + .build())); + + // Simulate receiving LB response + ServerEntry backend1a = new ServerEntry("127.0.0.1", 2000, "token0001"); + ServerEntry backend1b = new ServerEntry("127.0.0.1", 2010, "token0002"); + List backends1 = Arrays.asList(backend1a, backend1b); + inOrder.verify(helper, never()) + .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); + logs.clear(); + lbResponseObserver.onNext(buildInitialResponse()); + assertThat(logs).containsExactly("DEBUG: Got an LB response: " + buildInitialResponse()); + logs.clear(); + lbResponseObserver.onNext(buildLbResponse(backends1)); + + inOrder.verify(subchannelPool).takeOrCreateSubchannel( + eq(new EquivalentAddressGroup(backend1a.addr, LB_BACKEND_ATTRS)), + any(Attributes.class)); + inOrder.verify(subchannelPool).takeOrCreateSubchannel( + eq(new EquivalentAddressGroup(backend1b.addr, LB_BACKEND_ATTRS)), + any(Attributes.class)); + + assertEquals(2, mockSubchannels.size()); + Subchannel subchannel1 = mockSubchannels.poll(); + Subchannel subchannel2 = mockSubchannels.poll(); + + verify(subchannel1).requestConnection(); + verify(subchannel2).requestConnection(); + assertEquals( + new EquivalentAddressGroup(backend1a.addr, LB_BACKEND_ATTRS), + subchannel1.getAddresses()); + assertEquals( + new EquivalentAddressGroup(backend1b.addr, LB_BACKEND_ATTRS), + subchannel2.getAddresses()); + + deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(CONNECTING)); + deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(CONNECTING)); + + inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + RoundRobinPicker picker0 = (RoundRobinPicker) pickerCaptor.getValue(); + assertThat(picker0.dropList).containsExactly(null, null); + assertThat(picker0.pickList).containsExactly(BUFFER_ENTRY); + inOrder.verifyNoMoreInteractions(); + + assertThat(logs).containsExactly( + "DEBUG: Got an LB response: " + buildLbResponse(backends1), + "INFO: Using RR list=" + + "[[[/127.0.0.1:2000]/{io.grpc.grpclb.lbProvidedBackend=true}](token0001)," + + " [[/127.0.0.1:2010]/{io.grpc.grpclb.lbProvidedBackend=true}](token0002)]," + + " drop=[null, null]", + "INFO: CONNECTING: picks=[BUFFER_ENTRY], drops=[null, null]").inOrder(); + logs.clear(); + + // Let subchannels be connected + deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY)); + inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); + assertThat(logs).containsExactly( + "INFO: READY: picks=" + + "[[[[[/127.0.0.1:2010]/{io.grpc.grpclb.lbProvidedBackend=true}]](token0002)]]," + + " drops=[null, null]"); + logs.clear(); + + RoundRobinPicker picker1 = (RoundRobinPicker) pickerCaptor.getValue(); + + assertThat(picker1.dropList).containsExactly(null, null); + assertThat(picker1.pickList).containsExactly( + new BackendEntry(subchannel2, getLoadRecorder(), "token0002")); + + deliverSubchannelState(subchannel1, ConnectivityStateInfo.forNonError(READY)); + inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); + assertThat(logs).containsExactly( + "INFO: READY: picks=" + + "[[[[[/127.0.0.1:2000]/{io.grpc.grpclb.lbProvidedBackend=true}]](token0001)]," + + " [[[[/127.0.0.1:2010]/{io.grpc.grpclb.lbProvidedBackend=true}]](token0002)]]," + + " drops=[null, null]"); + logs.clear(); + + RoundRobinPicker picker2 = (RoundRobinPicker) pickerCaptor.getValue(); + assertThat(picker2.dropList).containsExactly(null, null); + assertThat(picker2.pickList).containsExactly( + new BackendEntry(subchannel1, getLoadRecorder(), "token0001"), + new BackendEntry(subchannel2, getLoadRecorder(), "token0002")) + .inOrder(); + + // enter fallback mode + lbResponseObserver.onNext(buildLbFallbackResponse()); + + // existing subchannels must be returned immediately to gracefully shutdown. + verify(subchannelPool) + .returnSubchannel(eq(subchannel1), eq(ConnectivityStateInfo.forNonError(READY))); + verify(subchannelPool) + .returnSubchannel(eq(subchannel2), eq(ConnectivityStateInfo.forNonError(READY))); + + // verify fallback + fallbackTestVerifyUseOfFallbackBackendLists(inOrder, fallbackEags); + + assertFalse(oobChannel.isShutdown()); + verify(lbRequestObserver, never()).onCompleted(); + + // exit fall back by providing two new backends + ServerEntry backend2a = new ServerEntry("127.0.0.1", 8000, "token1001"); + ServerEntry backend2b = new ServerEntry("127.0.0.1", 8010, "token1002"); + List backends2 = Arrays.asList(backend2a, backend2b); + + inOrder.verify(helper, never()) + .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); + logs.clear(); + lbResponseObserver.onNext(buildLbResponse(backends2)); + + inOrder.verify(subchannelPool).takeOrCreateSubchannel( + eq(new EquivalentAddressGroup(backend2a.addr, LB_BACKEND_ATTRS)), + any(Attributes.class)); + inOrder.verify(subchannelPool).takeOrCreateSubchannel( + eq(new EquivalentAddressGroup(backend2b.addr, LB_BACKEND_ATTRS)), + any(Attributes.class)); + + assertEquals(2, mockSubchannels.size()); + Subchannel subchannel3 = mockSubchannels.poll(); + Subchannel subchannel4 = mockSubchannels.poll(); + verify(subchannel3).requestConnection(); + verify(subchannel4).requestConnection(); + assertEquals( + new EquivalentAddressGroup(backend2a.addr, LB_BACKEND_ATTRS), + subchannel3.getAddresses()); + assertEquals( + new EquivalentAddressGroup(backend2b.addr, LB_BACKEND_ATTRS), + subchannel4.getAddresses()); + + deliverSubchannelState(subchannel3, ConnectivityStateInfo.forNonError(CONNECTING)); + deliverSubchannelState(subchannel4, ConnectivityStateInfo.forNonError(CONNECTING)); + + inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + RoundRobinPicker picker6 = (RoundRobinPicker) pickerCaptor.getValue(); + assertThat(picker6.dropList).containsExactly(null, null); + assertThat(picker6.pickList).containsExactly(BUFFER_ENTRY); + inOrder.verifyNoMoreInteractions(); + + assertThat(logs).containsExactly( + "DEBUG: Got an LB response: " + buildLbResponse(backends2), + "INFO: Using RR list=" + + "[[[/127.0.0.1:8000]/{io.grpc.grpclb.lbProvidedBackend=true}](token1001)," + + " [[/127.0.0.1:8010]/{io.grpc.grpclb.lbProvidedBackend=true}](token1002)]," + + " drop=[null, null]", + "INFO: CONNECTING: picks=[BUFFER_ENTRY], drops=[null, null]").inOrder(); + logs.clear(); + + // Let new subchannels be connected + deliverSubchannelState(subchannel3, ConnectivityStateInfo.forNonError(READY)); + inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); + assertThat(logs).containsExactly( + "INFO: READY: picks=" + + "[[[[[/127.0.0.1:8000]/{io.grpc.grpclb.lbProvidedBackend=true}]](token1001)]]," + + " drops=[null, null]"); + logs.clear(); + + RoundRobinPicker picker3 = (RoundRobinPicker) pickerCaptor.getValue(); + assertThat(picker3.dropList).containsExactly(null, null); + assertThat(picker3.pickList).containsExactly( + new BackendEntry(subchannel3, getLoadRecorder(), "token1001")); + + deliverSubchannelState(subchannel4, ConnectivityStateInfo.forNonError(READY)); + inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); + assertThat(logs).containsExactly( + "INFO: READY: picks=" + + "[[[[[/127.0.0.1:8000]/{io.grpc.grpclb.lbProvidedBackend=true}]](token1001)]," + + " [[[[/127.0.0.1:8010]/{io.grpc.grpclb.lbProvidedBackend=true}]](token1002)]]," + + " drops=[null, null]"); + logs.clear(); + + RoundRobinPicker picker4 = (RoundRobinPicker) pickerCaptor.getValue(); + assertThat(picker4.dropList).containsExactly(null, null); + assertThat(picker4.pickList).containsExactly( + new BackendEntry(subchannel3, getLoadRecorder(), "token1001"), + new BackendEntry(subchannel4, getLoadRecorder(), "token1002")) + .inOrder(); + } + @SuppressWarnings("deprecation") private void deliverSubchannelState( final Subchannel subchannel, final ConnectivityStateInfo newState) { @@ -2154,7 +2358,13 @@ public class GrpclbLoadBalancerTest { return LoadBalanceResponse.newBuilder() .setInitialResponse( InitialLoadBalanceResponse.newBuilder() - .setClientStatsReportInterval(Durations.fromMillis(loadReportIntervalMillis))) + .setClientStatsReportInterval(Durations.fromMillis(loadReportIntervalMillis))) + .build(); + } + + private static LoadBalanceResponse buildLbFallbackResponse() { + return LoadBalanceResponse.newBuilder() + .setFallbackResponse(FallbackResponse.newBuilder().build()) .build(); }