diff --git a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java index 6d837d6ff2..9957535a5d 100644 --- a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java +++ b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java @@ -885,6 +885,8 @@ final class CachingRlsLbClient { headers.discardAll(RLS_DATA_KEY); headers.put(RLS_DATA_KEY, response.getHeaderData()); } + String defaultTarget = lbPolicyConfig.getRouteLookupConfig().getDefaultTarget(); + boolean hasFallback = defaultTarget != null && !defaultTarget.isEmpty(); if (response.hasData()) { ChildPolicyWrapper childPolicyWrapper = response.getChildPolicyWrapper(); SubchannelPicker picker = childPolicyWrapper.getPicker(); @@ -895,9 +897,15 @@ final class CachingRlsLbClient { if (result.getStatus().isOk()) { return result; } - return useFallback(args); + if (hasFallback) { + return useFallback(args); + } + return PickResult.withError(result.getStatus()); } else if (response.hasError()) { - return useFallback(args); + if (hasFallback) { + return useFallback(args); + } + return PickResult.withError(response.getStatus()); } else { return PickResult.withNoResult(); } @@ -907,12 +915,8 @@ final class CachingRlsLbClient { /** Uses Subchannel connected to default target. */ private PickResult useFallback(PickSubchannelArgs args) { - String defaultTarget = lbPolicyConfig.getRouteLookupConfig().getDefaultTarget(); - if (fallbackChildPolicyWrapper == null - || !fallbackChildPolicyWrapper.getTarget().equals(defaultTarget)) { - // TODO(creamsoup) wait until lb is ready - startFallbackChildPolicy(); - } + // TODO(creamsoup) wait until lb is ready + startFallbackChildPolicy(); SubchannelPicker picker = fallbackChildPolicyWrapper.getPicker(); if (picker == null) { return PickResult.withNoResult(); @@ -922,8 +926,12 @@ final class CachingRlsLbClient { private void startFallbackChildPolicy() { String defaultTarget = lbPolicyConfig.getRouteLookupConfig().getDefaultTarget(); - fallbackChildPolicyWrapper = refCountedChildPolicyWrapperFactory.createOrGet(defaultTarget); - + synchronized (lock) { + if (fallbackChildPolicyWrapper != null) { + return; + } + fallbackChildPolicyWrapper = refCountedChildPolicyWrapperFactory.createOrGet(defaultTarget); + } LoadBalancerProvider lbProvider = lbPolicyConfig.getLoadBalancingPolicy().getEffectiveLbProvider(); final LoadBalancer lb = diff --git a/rls/src/main/java/io/grpc/rls/RlsProtoData.java b/rls/src/main/java/io/grpc/rls/RlsProtoData.java index 8a40f965e0..fbcb6feb21 100644 --- a/rls/src/main/java/io/grpc/rls/RlsProtoData.java +++ b/rls/src/main/java/io/grpc/rls/RlsProtoData.java @@ -195,6 +195,7 @@ final class RlsProtoData { private final ImmutableList validTargets; + @Nullable private final String defaultTarget; RouteLookupConfig( @@ -205,6 +206,7 @@ final class RlsProtoData { @Nullable Long staleAgeInMillis, long cacheSizeBytes, List validTargets, + @Nullable String defaultTarget) { checkState( !checkNotNull(grpcKeyBuilders, "grpcKeyBuilders").isEmpty(), @@ -233,7 +235,7 @@ final class RlsProtoData { checkArgument(cacheSizeBytes > 0, "cacheSize must be positive"); this.cacheSizeBytes = cacheSizeBytes; this.validTargets = ImmutableList.copyOf(checkNotNull(validTargets, "validTargets")); - this.defaultTarget = checkNotNull(defaultTarget, "defaultTarget"); + this.defaultTarget = defaultTarget; } /** diff --git a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java index 45cac901b9..69163a024c 100644 --- a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java +++ b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java @@ -97,7 +97,7 @@ public class RlsLoadBalancerTest { public final GrpcCleanupRule grpcCleanupRule = new GrpcCleanupRule(); @Rule public final MockitoRule mocks = MockitoJUnit.rule(); - + private final RlsLoadBalancerProvider provider = new RlsLoadBalancerProvider(); private final DoNotUseDirectScheduledExecutorService fakeScheduledExecutorService = mock(DoNotUseDirectScheduledExecutorService.class, CALLS_REAL_METHODS); private final SynchronizationContext syncContext = @@ -119,6 +119,7 @@ public class RlsLoadBalancerTest { private MethodDescriptor fakeRescueMethod; private RlsLoadBalancer rlsLb; private boolean existingEnableOobChannelDirectPath; + private String defaultTarget = "defaultTarget"; @Before public void setUp() throws Exception { @@ -150,11 +151,6 @@ public class RlsLoadBalancerTest { "localhost:8972", "/com.google/Rescue", "grpc", ImmutableMap.of()), new RouteLookupResponse(ImmutableList.of("civilization"), "you are safe"))); - RlsLoadBalancerProvider provider = new RlsLoadBalancerProvider(); - ConfigOrError parsedConfigOrError = - provider.parseLoadBalancingPolicyConfig(getServiceConfig()); - - assertThat(parsedConfigOrError.getConfig()).isNotNull(); rlsLb = (RlsLoadBalancer) provider.newLoadBalancer(helper); rlsLb.cachingRlsLbClientBuilderProvider = new CachingRlsLbClientBuilderProvider() { @Override @@ -163,11 +159,6 @@ public class RlsLoadBalancerTest { return CachingRlsLbClient.newBuilder(); } }; - rlsLb.handleResolvedAddresses(ResolvedAddresses.newBuilder() - .setAddresses(ImmutableList.of(new EquivalentAddressGroup(mock(SocketAddress.class)))) - .setLoadBalancingPolicyConfig(parsedConfigOrError.getConfig()) - .build()); - verify(helper).createResolvingOobChannelBuilder(anyString()); } @After @@ -177,7 +168,8 @@ public class RlsLoadBalancerTest { } @Test - public void lb_working() { + public void lb_working_withDefaultTarget() throws Exception { + deliverResolvedAddresses(); InOrder inOrder = inOrder(helper); inOrder.verify(helper) .updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture()); @@ -261,9 +253,79 @@ public class RlsLoadBalancerTest { } @Test - public void lb_nameResolutionFailed() { + public void lb_working_withoutDefaultTarget() throws Exception { + defaultTarget = ""; + deliverResolvedAddresses(); InOrder inOrder = inOrder(helper); + inOrder.verify(helper) + .updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture()); + SubchannelPicker picker = pickerCaptor.getValue(); + Metadata headers = new Metadata(); + PickResult res = picker.pickSubchannel( + new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT)); + inOrder.verify(helper).createSubchannel(any(CreateSubchannelArgs.class)); + inOrder.verify(helper) + .updateBalancingState(eq(ConnectivityState.CONNECTING), any(SubchannelPicker.class)); + inOrder.verifyNoMoreInteractions(); + assertThat(res.getStatus().isOk()).isTrue(); + assertThat(subchannels).hasSize(1); + FakeSubchannel searchSubchannel = subchannels.getLast(); + searchSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY)); + inOrder.verify(helper) + .updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture()); + inOrder.verifyNoMoreInteractions(); + assertThat(subchannelIsReady(res.getSubchannel())).isTrue(); + assertThat(res.getSubchannel().getAddresses()).isEqualTo(searchSubchannel.getAddresses()); + assertThat(res.getSubchannel().getAttributes()).isEqualTo(searchSubchannel.getAttributes()); + + // rescue should be pending status although the overall channel state is READY + picker = pickerCaptor.getValue(); + res = picker.pickSubchannel( + new PickSubchannelArgsImpl(fakeRescueMethod, headers, CallOptions.DEFAULT)); + inOrder.verify(helper).createSubchannel(any(CreateSubchannelArgs.class)); + // other rls picker itself is ready due to first channel. + inOrder.verify(helper) + .updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture()); + inOrder.verifyNoMoreInteractions(); + assertThat(res.getStatus().isOk()).isTrue(); + assertThat(subchannelIsReady(res.getSubchannel())).isFalse(); + assertThat(subchannels).hasSize(2); + FakeSubchannel rescueSubchannel = subchannels.getLast(); + + // search subchannel is down, rescue subchannel is still connecting + searchSubchannel.updateState(ConnectivityStateInfo.forTransientFailure(Status.NOT_FOUND)); + inOrder.verify(helper) + .updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture()); + + rescueSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY)); + inOrder.verify(helper) + .updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture()); + + // search method will fail because there is no fallback target. + picker = pickerCaptor.getValue(); + res = picker.pickSubchannel( + new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT)); + assertThat(res.getStatus().isOk()).isFalse(); + assertThat(subchannelIsReady(res.getSubchannel())).isFalse(); + + res = picker.pickSubchannel( + new PickSubchannelArgsImpl(fakeRescueMethod, headers, CallOptions.DEFAULT)); + assertThat(subchannelIsReady(res.getSubchannel())).isTrue(); + assertThat(res.getSubchannel().getAddresses()).isEqualTo(rescueSubchannel.getAddresses()); + assertThat(res.getSubchannel().getAttributes()).isEqualTo(rescueSubchannel.getAttributes()); + + // all channels are failed + rescueSubchannel.updateState(ConnectivityStateInfo.forTransientFailure(Status.NOT_FOUND)); + inOrder.verify(helper) + .updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture()); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void lb_nameResolutionFailed() throws Exception { + deliverResolvedAddresses(); + InOrder inOrder = inOrder(helper); inOrder.verify(helper) .updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture()); SubchannelPicker picker = pickerCaptor.getValue(); @@ -307,8 +369,19 @@ public class RlsLoadBalancerTest { assertThat(subchannelIsReady(res.getSubchannel())).isFalse(); } + private void deliverResolvedAddresses() throws Exception { + ConfigOrError parsedConfigOrError = + provider.parseLoadBalancingPolicyConfig(getServiceConfig()); + assertThat(parsedConfigOrError.getConfig()).isNotNull(); + rlsLb.handleResolvedAddresses(ResolvedAddresses.newBuilder() + .setAddresses(ImmutableList.of(new EquivalentAddressGroup(mock(SocketAddress.class)))) + .setLoadBalancingPolicyConfig(parsedConfigOrError.getConfig()) + .build()); + verify(helper).createResolvingOobChannelBuilder(anyString()); + } + @SuppressWarnings("unchecked") - private static Map getServiceConfig() throws IOException { + private Map getServiceConfig() throws IOException { String serviceConfig = "{" + " \"routeLookupConfig\": " + getRlsConfigJsonStr() + ", " + " \"childPolicy\": [{\"pick_first\": {}}]," @@ -317,7 +390,7 @@ public class RlsLoadBalancerTest { return (Map) JsonParser.parse(serviceConfig); } - private static String getRlsConfigJsonStr() { + private String getRlsConfigJsonStr() { return "{\n" + " \"grpcKeyBuilders\": [\n" + " {\n" @@ -342,7 +415,7 @@ public class RlsLoadBalancerTest { + " \"staleAge\": 240,\n" + " \"validTargets\": [\"localhost:9001\", \"localhost:9002\"]," + " \"cacheSizeBytes\": 1000,\n" - + " \"defaultTarget\": \"defaultTarget\",\n" + + " \"defaultTarget\": \"" + defaultTarget + "\",\n" + " \"requestProcessingStrategy\": \"SYNC_LOOKUP_DEFAULT_TARGET_ON_ERROR\"\n" + "}"; }