diff --git a/rls/build.gradle b/rls/build.gradle index c5b7d382ab..32004a7d43 100644 --- a/rls/build.gradle +++ b/rls/build.gradle @@ -17,6 +17,7 @@ dependencies { guavaDependency 'implementation' compileOnly libraries.javax_annotation testImplementation libraries.truth, + project(':grpc-grpclb'), project(':grpc-testing'), project(':grpc-testing-proto'), project(':grpc-core').sourceSets.test.output // for FakeClock diff --git a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java index 69e3bb28d7..87d01360f4 100644 --- a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java +++ b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java @@ -162,11 +162,13 @@ final class CachingRlsLbClient { rlsConfig.getLookupService(), helper.getUnsafeChannelCredentials()); rlsChannelBuilder.overrideAuthority(helper.getAuthority()); if (enableOobChannelDirectPath) { + Map directPathServiceConfig = + getDirectPathServiceConfig(rlsConfig.getLookupService()); logger.log( ChannelLogLevel.DEBUG, "RLS channel direct path enabled. RLS channel service config: {0}", - getDirectpathServiceConfig()); - rlsChannelBuilder.defaultServiceConfig(getDirectpathServiceConfig()); + directPathServiceConfig); + rlsChannelBuilder.defaultServiceConfig(directPathServiceConfig); rlsChannelBuilder.disableServiceConfigLookUp(); } rlsChannel = rlsChannelBuilder.build(); @@ -183,12 +185,14 @@ final class CachingRlsLbClient { logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient created"); } - private static ImmutableMap getDirectpathServiceConfig() { + private static ImmutableMap getDirectPathServiceConfig(String serviceName) { ImmutableMap pickFirstStrategy = ImmutableMap.of("pick_first", ImmutableMap.of()); ImmutableMap childPolicy = - ImmutableMap.of("childPolicy", ImmutableList.of(pickFirstStrategy)); + ImmutableMap.of( + "childPolicy", ImmutableList.of(pickFirstStrategy), + "serviceName", serviceName); ImmutableMap grpcLbPolicy = ImmutableMap.of("grpclb", childPolicy); diff --git a/rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java b/rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java index 7a418876a2..c2c221800a 100644 --- a/rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java +++ b/rls/src/test/java/io/grpc/rls/CachingRlsLbClientTest.java @@ -145,12 +145,16 @@ public class CachingRlsLbClientTest { private CachingRlsLbClient rlsLbClient; private boolean existingEnableOobChannelDirectPath; + private Map rlsChannelServiceConfig; + private String rlsChannelOverriddenAuthority; @Before public void setUp() throws Exception { existingEnableOobChannelDirectPath = CachingRlsLbClient.enableOobChannelDirectPath; CachingRlsLbClient.enableOobChannelDirectPath = false; + } + private void setUpRlsLbClient() { rlsLbClient = CachingRlsLbClient.newBuilder() .setBackoffProvider(fakeBackoffProvider) @@ -185,6 +189,7 @@ public class CachingRlsLbClientTest { @Test public void get_noError_lifeCycle() throws Exception { + setUpRlsLbClient(); InOrder inOrder = inOrder(evictionListener); RouteLookupRequest routeLookupRequest = new RouteLookupRequest( @@ -233,8 +238,44 @@ public class CachingRlsLbClientTest { inOrder.verifyNoMoreInteractions(); } + @Test + public void rls_overDirectPath() throws Exception { + CachingRlsLbClient.enableOobChannelDirectPath = true; + setUpRlsLbClient(); + RouteLookupRequest routeLookupRequest = + new RouteLookupRequest( + "bigtable.googleapis.com", "/foo/bar", "grpc", ImmutableMap.of()); + rlsServerImpl.setLookupTable( + ImmutableMap.of( + routeLookupRequest, + new RouteLookupResponse(ImmutableList.of("target"), "header"))); + + // initial request + CachedRouteLookupResponse resp = getInSyncContext(routeLookupRequest); + assertThat(resp.isPending()).isTrue(); + + // server response + fakeTimeProvider.forwardTime(SERVER_LATENCY_MILLIS, TimeUnit.MILLISECONDS); + + resp = getInSyncContext(routeLookupRequest); + assertThat(resp.hasData()).isTrue(); + + assertThat(rlsChannelOverriddenAuthority).isEqualTo("bigtable.googleapis.com:443"); + assertThat(rlsChannelServiceConfig).isEqualTo( + ImmutableMap.of( + "loadBalancingConfig", + ImmutableList.of(ImmutableMap.of( + "grpclb", + ImmutableMap.of( + "childPolicy", + ImmutableList.of(ImmutableMap.of("pick_first", ImmutableMap.of())), + "serviceName", + "service1"))))); + } + @Test public void get_throttledAndRecover() throws Exception { + setUpRlsLbClient(); RouteLookupRequest routeLookupRequest = new RouteLookupRequest("server", "/foo/bar", "grpc", ImmutableMap.of()); rlsServerImpl.setLookupTable( @@ -276,6 +317,7 @@ public class CachingRlsLbClientTest { @Test public void get_updatesLbState() throws Exception { + setUpRlsLbClient(); InOrder inOrder = inOrder(helper); RouteLookupRequest routeLookupRequest = new RouteLookupRequest( @@ -344,6 +386,7 @@ public class CachingRlsLbClientTest { @Test public void get_childPolicyWrapper_reusedForSameTarget() throws Exception { + setUpRlsLbClient(); RouteLookupRequest routeLookupRequest = new RouteLookupRequest("server", "/foo/bar", "grpc", ImmutableMap.of()); RouteLookupRequest routeLookupRequest2 = @@ -565,6 +608,20 @@ public class CachingRlsLbClientTest { public ManagedChannel build() { return grpcCleanupRule.register(super.build()); } + + @Override + public CleaningChannelBuilder defaultServiceConfig(Map serviceConfig) { + rlsChannelServiceConfig = serviceConfig; + delegate().defaultServiceConfig(serviceConfig); + return this; + } + + @Override + public CleaningChannelBuilder overrideAuthority(String authority) { + rlsChannelOverriddenAuthority = authority; + delegate().overrideAuthority(authority); + return this; + } } return new CleaningChannelBuilder();