From 50cdfa9f05131ef0066cca99140a5f99339a49e3 Mon Sep 17 00:00:00 2001 From: Larry Safran <107004254+larry-safran@users.noreply.github.com> Date: Wed, 20 Jul 2022 17:20:19 -0700 Subject: [PATCH] rls: Only use subchannel policy for default target when RLS is not available (#9383) * core: Only use subchannel policy for default target when RLS is not available Fixes #9237 --- .../java/io/grpc/rls/CachingRlsLbClient.java | 9 +- .../java/io/grpc/rls/RlsLoadBalancerTest.java | 96 +++++++++++++++---- 2 files changed, 77 insertions(+), 28 deletions(-) diff --git a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java index fbd2e46851..6c777f4514 100644 --- a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java +++ b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java @@ -931,14 +931,7 @@ final class CachingRlsLbClient { if (picker == null) { return PickResult.withNoResult(); } - PickResult result = picker.pickSubchannel(args); - if (result.getStatus().isOk()) { - return result; - } - if (hasFallback) { - return useFallback(args); - } - return PickResult.withError(result.getStatus()); + return picker.pickSubchannel(args); } else if (response.hasError()) { if (hasFallback) { return useFallback(args); diff --git a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java index 120b205bf4..def99eb6ae 100644 --- a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java +++ b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java @@ -74,6 +74,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; import org.junit.After; import org.junit.Before; @@ -110,6 +111,7 @@ public class RlsLoadBalancerTest { mock(Helper.class, AdditionalAnswers.delegatesTo(new FakeHelper())); private final FakeRlsServerImpl fakeRlsServerImpl = new FakeRlsServerImpl(); private final Deque subchannels = new LinkedList<>(); + private final FakeThrottler fakeThrottler = new FakeThrottler(); @Mock private Marshaller mockMarshaller; @Captor @@ -120,7 +122,7 @@ public class RlsLoadBalancerTest { private String defaultTarget = "defaultTarget"; @Before - public void setUp() throws Exception { + public void setUp() { MockitoAnnotations.initMocks(this); fakeSearchMethod = @@ -154,19 +156,21 @@ public class RlsLoadBalancerTest { rlsLb.cachingRlsLbClientBuilderProvider = new CachingRlsLbClientBuilderProvider() { @Override public CachingRlsLbClient.Builder get() { - // using default throttler which doesn't throttle - return CachingRlsLbClient.newBuilder(); + // using fake throttler to allow enablement of throttler + return CachingRlsLbClient.newBuilder() + .setThrottler(fakeThrottler) + .setTicker(fakeClock.getTicker()); } }; } @After - public void tearDown() throws Exception { + public void tearDown() { rlsLb.shutdown(); } @Test - public void lb_working_withDefaultTarget() throws Exception { + public void lb_working_withDefaultTarget_rlsResponding() throws Exception { deliverResolvedAddresses(); InOrder inOrder = inOrder(helper); inOrder.verify(helper) @@ -215,40 +219,76 @@ public class RlsLoadBalancerTest { inOrder.verify(helper) .updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture()); - // search again, use pending fallback because searchSubchannel is in failure mode + // search again, verify that it doesn't use fallback, since RLS server responded, even though + // subchannel is in failure mode res = picker.pickSubchannel( new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT)); - assertThat(res.getStatus().isOk()).isTrue(); + assertThat(res.getStatus().getCode()).isEqualTo(Status.Code.NOT_FOUND); assertThat(subchannelIsReady(res.getSubchannel())).isFalse(); + } + @Test + public void lb_working_withDefaultTarget_noRlsResponse() throws Exception { + fakeThrottler.nextResult = true; + + deliverResolvedAddresses(); + InOrder inOrder = inOrder(helper); + inOrder.verify(helper) + .updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture()); + SubchannelPicker picker = pickerCaptor.getValue(); + Metadata headers = new Metadata(); + PickResult res; + + // Search that when the RLS server doesn't respond, that fallback is used + res = picker.pickSubchannel( + new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT)); + FakeSubchannel fallbackSubchannel = (FakeSubchannel) res.getSubchannel(); + + assertThat(res.getStatus().getCode()).isEqualTo(Status.Code.OK); + assertThat(subchannelIsReady(res.getSubchannel())).isFalse(); inOrder.verify(helper).createSubchannel(any(CreateSubchannelArgs.class)); - assertThat(subchannels).hasSize(3); - FakeSubchannel fallbackSubchannel = subchannels.getLast(); fallbackSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY)); - inOrder.verify(helper, times(2)) + inOrder.verify(helper, times(1)) .updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture()); inOrder.verifyNoMoreInteractions(); res = picker.pickSubchannel( new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT)); assertThat(subchannelIsReady(res.getSubchannel())).isTrue(); - assertThat(res.getSubchannel().getAddresses()).isEqualTo(searchSubchannel.getAddresses()); - assertThat(res.getSubchannel().getAttributes()).isEqualTo(searchSubchannel.getAttributes()); + assertThat(res.getSubchannel()).isSameInstanceAs(fallbackSubchannel); res = picker.pickSubchannel( new PickSubchannelArgsImpl(fakeRescueMethod, headers, CallOptions.DEFAULT)); assertThat(subchannelIsReady(res.getSubchannel())).isTrue(); - assertThat(res.getSubchannel().getAddresses()).isEqualTo(rescueSubchannel.getAddresses()); - assertThat(res.getSubchannel().getAttributes()).isEqualTo(rescueSubchannel.getAttributes()); + assertThat(res.getSubchannel()).isSameInstanceAs(fallbackSubchannel); + + // Make sure that when RLS starts communicating that default stops being used + fakeThrottler.nextResult = false; + fakeClock.forwardTime(2, TimeUnit.SECONDS); // Expires backoff cache entries + // Create search subchannel + res = picker.pickSubchannel( + new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT)); + assertThat(res.getSubchannel()).isNotSameInstanceAs(fallbackSubchannel); + FakeSubchannel searchSubchannel = (FakeSubchannel) res.getSubchannel(); + searchSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY)); + + // create rescue subchannel + res = picker.pickSubchannel( + new PickSubchannelArgsImpl(fakeRescueMethod, headers, CallOptions.DEFAULT)); + assertThat(res.getSubchannel()).isNotSameInstanceAs(fallbackSubchannel); + assertThat(res.getSubchannel()).isNotSameInstanceAs(searchSubchannel); + FakeSubchannel rescueSubchannel = (FakeSubchannel) res.getSubchannel(); + rescueSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY)); // all channels are failed rescueSubchannel.updateState(ConnectivityStateInfo.forTransientFailure(Status.NOT_FOUND)); - inOrder.verify(helper) - .updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture()); + searchSubchannel.updateState(ConnectivityStateInfo.forTransientFailure(Status.NOT_FOUND)); fallbackSubchannel.updateState(ConnectivityStateInfo.forTransientFailure(Status.NOT_FOUND)); - inOrder.verify(helper) - .updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); - inOrder.verifyNoMoreInteractions(); + + res = picker.pickSubchannel( + new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT)); + assertThat(res.getStatus().getCode()).isEqualTo(Status.Code.NOT_FOUND); + assertThat(res.getSubchannel()).isNull(); } @Test @@ -546,7 +586,7 @@ public class RlsLoadBalancerTest { private final Attributes attributes; private List eags; private SubchannelStateListener listener; - private boolean isReady; + private volatile boolean isReady; public FakeSubchannel(List eags, Attributes attributes) { this.eags = Collections.unmodifiableList(eags); @@ -590,4 +630,20 @@ public class RlsLoadBalancerTest { private static boolean subchannelIsReady(Subchannel subchannel) { return subchannel instanceof FakeSubchannel && ((FakeSubchannel) subchannel).isReady; } + + private static final class FakeThrottler implements Throttler { + + private boolean nextResult = false; + + @Override + public boolean shouldThrottle() { + return nextResult; + } + + @Override + public void registerBackendResponse(boolean throttled) { + // no-op + } + } + }