mirror of https://github.com/grpc/grpc-java.git
rls: support routeLookupChannelServiceConfig in RLS lb config
Implementing the latest change for RLS lb config.
```
The configuration for the LB policy will be of the following form:
{
"routeLookupConfig": <JSON form of RouteLookupConfig proto>,
"routeLookupChannelServiceConfig": {...service config JSON...},
"childPolicy": [
{"<policy name>": {...child policy config...}}
],
"childPolicyConfigTargetFieldName": "<name of field>"
}
```
>If the routeLookupChannelServiceConfig field is present, we will pass the specified service config to the RLS control plane channel, and we will disable fetching service config via that channel's resolver.
This commit is contained in:
parent
bd156f98d6
commit
7308d92034
|
|
@ -23,8 +23,6 @@ import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Converter;
|
import com.google.common.base.Converter;
|
||||||
import com.google.common.base.MoreObjects;
|
import com.google.common.base.MoreObjects;
|
||||||
import com.google.common.base.MoreObjects.ToStringHelper;
|
import com.google.common.base.MoreObjects.ToStringHelper;
|
||||||
import com.google.common.collect.ImmutableList;
|
|
||||||
import com.google.common.collect.ImmutableMap;
|
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
import com.google.common.util.concurrent.SettableFuture;
|
import com.google.common.util.concurrent.SettableFuture;
|
||||||
import io.grpc.ChannelLogger;
|
import io.grpc.ChannelLogger;
|
||||||
|
|
@ -83,13 +81,6 @@ final class CachingRlsLbClient {
|
||||||
private static final Converter<RouteLookupResponse, io.grpc.lookup.v1.RouteLookupResponse>
|
private static final Converter<RouteLookupResponse, io.grpc.lookup.v1.RouteLookupResponse>
|
||||||
RESPONSE_CONVERTER = new RouteLookupResponseConverter().reverse();
|
RESPONSE_CONVERTER = new RouteLookupResponseConverter().reverse();
|
||||||
|
|
||||||
// System property to use direct path enabled OobChannel, by default direct path is enabled.
|
|
||||||
private static final String RLS_ENABLE_OOB_CHANNEL_DIRECTPATH_PROPERTY =
|
|
||||||
"io.grpc.rls.CachingRlsLbClient.enable_oobchannel_directpath";
|
|
||||||
@VisibleForTesting
|
|
||||||
static boolean enableOobChannelDirectPath =
|
|
||||||
Boolean.parseBoolean(System.getProperty(RLS_ENABLE_OOB_CHANNEL_DIRECTPATH_PROPERTY, "false"));
|
|
||||||
|
|
||||||
// All cache status changes (pending, backoff, success) must be under this lock
|
// All cache status changes (pending, backoff, success) must be under this lock
|
||||||
private final Object lock = new Object();
|
private final Object lock = new Object();
|
||||||
// LRU cache based on access order (BACKOFF and actual data will be here)
|
// LRU cache based on access order (BACKOFF and actual data will be here)
|
||||||
|
|
@ -158,14 +149,14 @@ final class CachingRlsLbClient {
|
||||||
ManagedChannelBuilder<?> rlsChannelBuilder = helper.createResolvingOobChannelBuilder(
|
ManagedChannelBuilder<?> rlsChannelBuilder = helper.createResolvingOobChannelBuilder(
|
||||||
rlsConfig.getLookupService(), helper.getUnsafeChannelCredentials());
|
rlsConfig.getLookupService(), helper.getUnsafeChannelCredentials());
|
||||||
rlsChannelBuilder.overrideAuthority(helper.getAuthority());
|
rlsChannelBuilder.overrideAuthority(helper.getAuthority());
|
||||||
if (enableOobChannelDirectPath) {
|
Map<String, ?> routeLookupChannelServiceConfig =
|
||||||
Map<String, ?> directPathServiceConfig =
|
lbPolicyConfig.getRouteLookupChannelServiceConfig();
|
||||||
getDirectPathServiceConfig(rlsConfig.getLookupService());
|
if (routeLookupChannelServiceConfig != null) {
|
||||||
logger.log(
|
logger.log(
|
||||||
ChannelLogLevel.DEBUG,
|
ChannelLogLevel.DEBUG,
|
||||||
"RLS channel direct path enabled. RLS channel service config: {0}",
|
"RLS channel service config: {0}",
|
||||||
directPathServiceConfig);
|
routeLookupChannelServiceConfig);
|
||||||
rlsChannelBuilder.defaultServiceConfig(directPathServiceConfig);
|
rlsChannelBuilder.defaultServiceConfig(routeLookupChannelServiceConfig);
|
||||||
rlsChannelBuilder.disableServiceConfigLookUp();
|
rlsChannelBuilder.disableServiceConfigLookUp();
|
||||||
}
|
}
|
||||||
rlsChannel = rlsChannelBuilder.build();
|
rlsChannel = rlsChannelBuilder.build();
|
||||||
|
|
@ -184,21 +175,6 @@ final class CachingRlsLbClient {
|
||||||
logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient created");
|
logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient created");
|
||||||
}
|
}
|
||||||
|
|
||||||
private static ImmutableMap<String, Object> getDirectPathServiceConfig(String serviceName) {
|
|
||||||
ImmutableMap<String, Object> pickFirstStrategy =
|
|
||||||
ImmutableMap.<String, Object>of("pick_first", ImmutableMap.of());
|
|
||||||
|
|
||||||
ImmutableMap<String, Object> childPolicy =
|
|
||||||
ImmutableMap.<String, Object>of(
|
|
||||||
"childPolicy", ImmutableList.of(pickFirstStrategy),
|
|
||||||
"serviceName", serviceName);
|
|
||||||
|
|
||||||
ImmutableMap<String, Object> grpcLbPolicy =
|
|
||||||
ImmutableMap.<String, Object>of("grpclb", childPolicy);
|
|
||||||
|
|
||||||
return ImmutableMap.<String, Object>of("loadBalancingConfig", ImmutableList.of(grpcLbPolicy));
|
|
||||||
}
|
|
||||||
|
|
||||||
@CheckReturnValue
|
@CheckReturnValue
|
||||||
private ListenableFuture<RouteLookupResponse> asyncRlsCall(RouteLookupRequest request) {
|
private ListenableFuture<RouteLookupResponse> asyncRlsCall(RouteLookupRequest request) {
|
||||||
final SettableFuture<RouteLookupResponse> response = SettableFuture.create();
|
final SettableFuture<RouteLookupResponse> response = SettableFuture.create();
|
||||||
|
|
|
||||||
|
|
@ -48,11 +48,15 @@ import javax.annotation.Nullable;
|
||||||
final class LbPolicyConfiguration {
|
final class LbPolicyConfiguration {
|
||||||
|
|
||||||
private final RouteLookupConfig routeLookupConfig;
|
private final RouteLookupConfig routeLookupConfig;
|
||||||
|
@Nullable
|
||||||
|
private final Map<String, ?> routeLookupChannelServiceConfig;
|
||||||
private final ChildLoadBalancingPolicy policy;
|
private final ChildLoadBalancingPolicy policy;
|
||||||
|
|
||||||
LbPolicyConfiguration(
|
LbPolicyConfiguration(
|
||||||
RouteLookupConfig routeLookupConfig, ChildLoadBalancingPolicy policy) {
|
RouteLookupConfig routeLookupConfig, @Nullable Map<String, ?> routeLookupChannelServiceConfig,
|
||||||
|
ChildLoadBalancingPolicy policy) {
|
||||||
this.routeLookupConfig = checkNotNull(routeLookupConfig, "routeLookupConfig");
|
this.routeLookupConfig = checkNotNull(routeLookupConfig, "routeLookupConfig");
|
||||||
|
this.routeLookupChannelServiceConfig = routeLookupChannelServiceConfig;
|
||||||
this.policy = checkNotNull(policy, "policy");
|
this.policy = checkNotNull(policy, "policy");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -60,6 +64,11 @@ final class LbPolicyConfiguration {
|
||||||
return routeLookupConfig;
|
return routeLookupConfig;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Nullable
|
||||||
|
Map<String, ?> getRouteLookupChannelServiceConfig() {
|
||||||
|
return routeLookupChannelServiceConfig;
|
||||||
|
}
|
||||||
|
|
||||||
ChildLoadBalancingPolicy getLoadBalancingPolicy() {
|
ChildLoadBalancingPolicy getLoadBalancingPolicy() {
|
||||||
return policy;
|
return policy;
|
||||||
}
|
}
|
||||||
|
|
@ -74,18 +83,20 @@ final class LbPolicyConfiguration {
|
||||||
}
|
}
|
||||||
LbPolicyConfiguration that = (LbPolicyConfiguration) o;
|
LbPolicyConfiguration that = (LbPolicyConfiguration) o;
|
||||||
return Objects.equals(routeLookupConfig, that.routeLookupConfig)
|
return Objects.equals(routeLookupConfig, that.routeLookupConfig)
|
||||||
|
&& Objects.equals(routeLookupChannelServiceConfig, that.routeLookupChannelServiceConfig)
|
||||||
&& Objects.equals(policy, that.policy);
|
&& Objects.equals(policy, that.policy);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int hashCode() {
|
public int hashCode() {
|
||||||
return Objects.hash(routeLookupConfig, policy);
|
return Objects.hash(routeLookupConfig, routeLookupChannelServiceConfig, policy);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return MoreObjects.toStringHelper(this)
|
return MoreObjects.toStringHelper(this)
|
||||||
.add("routeLookupConfig", routeLookupConfig)
|
.add("routeLookupConfig", routeLookupConfig)
|
||||||
|
.add("routeLookupChannelServiceConfig", routeLookupChannelServiceConfig)
|
||||||
.add("policy", policy)
|
.add("policy", policy)
|
||||||
.toString();
|
.toString();
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -62,12 +62,15 @@ public final class RlsLoadBalancerProvider extends LoadBalancerProvider {
|
||||||
try {
|
try {
|
||||||
RouteLookupConfig routeLookupConfig = new RouteLookupConfigConverter()
|
RouteLookupConfig routeLookupConfig = new RouteLookupConfigConverter()
|
||||||
.convert(JsonUtil.getObject(rawLoadBalancingConfigPolicy, "routeLookupConfig"));
|
.convert(JsonUtil.getObject(rawLoadBalancingConfigPolicy, "routeLookupConfig"));
|
||||||
|
Map<String, ?> routeLookupChannelServiceConfig =
|
||||||
|
JsonUtil.getObject(rawLoadBalancingConfigPolicy, "routeLookupChannelServiceConfig");
|
||||||
ChildLoadBalancingPolicy lbPolicy = ChildLoadBalancingPolicy
|
ChildLoadBalancingPolicy lbPolicy = ChildLoadBalancingPolicy
|
||||||
.create(
|
.create(
|
||||||
JsonUtil.getString(rawLoadBalancingConfigPolicy, "childPolicyConfigTargetFieldName"),
|
JsonUtil.getString(rawLoadBalancingConfigPolicy, "childPolicyConfigTargetFieldName"),
|
||||||
JsonUtil.checkObjectList(
|
JsonUtil.checkObjectList(
|
||||||
checkNotNull(JsonUtil.getList(rawLoadBalancingConfigPolicy, "childPolicy"))));
|
checkNotNull(JsonUtil.getList(rawLoadBalancingConfigPolicy, "childPolicy"))));
|
||||||
return ConfigOrError.fromConfig(new LbPolicyConfiguration(routeLookupConfig, lbPolicy));
|
return ConfigOrError.fromConfig(
|
||||||
|
new LbPolicyConfiguration(routeLookupConfig, routeLookupChannelServiceConfig, lbPolicy));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
return ConfigOrError.fromError(
|
return ConfigOrError.fromError(
|
||||||
Status.INVALID_ARGUMENT
|
Status.INVALID_ARGUMENT
|
||||||
|
|
|
||||||
|
|
@ -90,7 +90,6 @@ import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
import javax.annotation.Nonnull;
|
import javax.annotation.Nonnull;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
|
|
@ -144,19 +143,12 @@ public class CachingRlsLbClientTest {
|
||||||
mock(Helper.class, AdditionalAnswers.delegatesTo(new FakeHelper()));
|
mock(Helper.class, AdditionalAnswers.delegatesTo(new FakeHelper()));
|
||||||
private final FakeThrottler fakeThrottler = new FakeThrottler();
|
private final FakeThrottler fakeThrottler = new FakeThrottler();
|
||||||
private final LbPolicyConfiguration lbPolicyConfiguration =
|
private final LbPolicyConfiguration lbPolicyConfiguration =
|
||||||
new LbPolicyConfiguration(ROUTE_LOOKUP_CONFIG, childLbPolicy);
|
new LbPolicyConfiguration(ROUTE_LOOKUP_CONFIG, null, childLbPolicy);
|
||||||
|
|
||||||
private CachingRlsLbClient rlsLbClient;
|
private CachingRlsLbClient rlsLbClient;
|
||||||
private boolean existingEnableOobChannelDirectPath;
|
|
||||||
private Map<String, ?> rlsChannelServiceConfig;
|
private Map<String, ?> rlsChannelServiceConfig;
|
||||||
private String rlsChannelOverriddenAuthority;
|
private String rlsChannelOverriddenAuthority;
|
||||||
|
|
||||||
@Before
|
|
||||||
public void setUp() throws Exception {
|
|
||||||
existingEnableOobChannelDirectPath = CachingRlsLbClient.enableOobChannelDirectPath;
|
|
||||||
CachingRlsLbClient.enableOobChannelDirectPath = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void setUpRlsLbClient() {
|
private void setUpRlsLbClient() {
|
||||||
rlsLbClient =
|
rlsLbClient =
|
||||||
CachingRlsLbClient.newBuilder()
|
CachingRlsLbClient.newBuilder()
|
||||||
|
|
@ -173,7 +165,6 @@ public class CachingRlsLbClientTest {
|
||||||
@After
|
@After
|
||||||
public void tearDown() throws Exception {
|
public void tearDown() throws Exception {
|
||||||
rlsLbClient.close();
|
rlsLbClient.close();
|
||||||
CachingRlsLbClient.enableOobChannelDirectPath = existingEnableOobChannelDirectPath;
|
|
||||||
assertWithMessage(
|
assertWithMessage(
|
||||||
"On client shut down, RlsLoadBalancer must shut down with all its child loadbalancers.")
|
"On client shut down, RlsLoadBalancer must shut down with all its child loadbalancers.")
|
||||||
.that(lbProvider.loadBalancers).isEmpty();
|
.that(lbProvider.loadBalancers).isEmpty();
|
||||||
|
|
@ -244,9 +235,29 @@ public class CachingRlsLbClientTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void rls_overDirectPath() throws Exception {
|
public void rls_withCustomRlsChannelServiceConfig() throws Exception {
|
||||||
CachingRlsLbClient.enableOobChannelDirectPath = true;
|
Map<String, ?> routeLookupChannelServiceConfig =
|
||||||
setUpRlsLbClient();
|
ImmutableMap.of(
|
||||||
|
"loadBalancingConfig",
|
||||||
|
ImmutableList.of(ImmutableMap.of(
|
||||||
|
"grpclb",
|
||||||
|
ImmutableMap.of(
|
||||||
|
"childPolicy",
|
||||||
|
ImmutableList.of(ImmutableMap.of("pick_first", ImmutableMap.of())),
|
||||||
|
"serviceName",
|
||||||
|
"service1"))));
|
||||||
|
LbPolicyConfiguration lbPolicyConfiguration = new LbPolicyConfiguration(
|
||||||
|
ROUTE_LOOKUP_CONFIG, routeLookupChannelServiceConfig, childLbPolicy);
|
||||||
|
rlsLbClient =
|
||||||
|
CachingRlsLbClient.newBuilder()
|
||||||
|
.setBackoffProvider(fakeBackoffProvider)
|
||||||
|
.setResolvedAddressesFactory(resolvedAddressFactory)
|
||||||
|
.setEvictionListener(evictionListener)
|
||||||
|
.setHelper(helper)
|
||||||
|
.setLbPolicyConfig(lbPolicyConfiguration)
|
||||||
|
.setThrottler(fakeThrottler)
|
||||||
|
.setTimeProvider(fakeTimeProvider)
|
||||||
|
.build();
|
||||||
RouteLookupRequest routeLookupRequest = new RouteLookupRequest(ImmutableMap.of(
|
RouteLookupRequest routeLookupRequest = new RouteLookupRequest(ImmutableMap.of(
|
||||||
"server", "bigtable.googleapis.com", "service-key", "foo", "method-key", "bar"));
|
"server", "bigtable.googleapis.com", "service-key", "foo", "method-key", "bar"));
|
||||||
rlsServerImpl.setLookupTable(
|
rlsServerImpl.setLookupTable(
|
||||||
|
|
@ -265,16 +276,7 @@ public class CachingRlsLbClientTest {
|
||||||
assertThat(resp.hasData()).isTrue();
|
assertThat(resp.hasData()).isTrue();
|
||||||
|
|
||||||
assertThat(rlsChannelOverriddenAuthority).isEqualTo("bigtable.googleapis.com:443");
|
assertThat(rlsChannelOverriddenAuthority).isEqualTo("bigtable.googleapis.com:443");
|
||||||
assertThat(rlsChannelServiceConfig).isEqualTo(
|
assertThat(rlsChannelServiceConfig).isEqualTo(routeLookupChannelServiceConfig);
|
||||||
ImmutableMap.of(
|
|
||||||
"loadBalancingConfig",
|
|
||||||
ImmutableList.of(ImmutableMap.of(
|
|
||||||
"grpclb",
|
|
||||||
ImmutableMap.of(
|
|
||||||
"childPolicy",
|
|
||||||
ImmutableList.of(ImmutableMap.of("pick_first", ImmutableMap.of())),
|
|
||||||
"serviceName",
|
|
||||||
"service1")))));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
||||||
|
|
@ -118,16 +118,12 @@ public class RlsLoadBalancerTest {
|
||||||
private MethodDescriptor<Object, Object> fakeSearchMethod;
|
private MethodDescriptor<Object, Object> fakeSearchMethod;
|
||||||
private MethodDescriptor<Object, Object> fakeRescueMethod;
|
private MethodDescriptor<Object, Object> fakeRescueMethod;
|
||||||
private RlsLoadBalancer rlsLb;
|
private RlsLoadBalancer rlsLb;
|
||||||
private boolean existingEnableOobChannelDirectPath;
|
|
||||||
private String defaultTarget = "defaultTarget";
|
private String defaultTarget = "defaultTarget";
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
MockitoAnnotations.initMocks(this);
|
MockitoAnnotations.initMocks(this);
|
||||||
|
|
||||||
existingEnableOobChannelDirectPath = CachingRlsLbClient.enableOobChannelDirectPath;
|
|
||||||
CachingRlsLbClient.enableOobChannelDirectPath = false;
|
|
||||||
|
|
||||||
fakeSearchMethod =
|
fakeSearchMethod =
|
||||||
MethodDescriptor.newBuilder()
|
MethodDescriptor.newBuilder()
|
||||||
.setFullMethodName("com.google/Search")
|
.setFullMethodName("com.google/Search")
|
||||||
|
|
@ -168,7 +164,6 @@ public class RlsLoadBalancerTest {
|
||||||
@After
|
@After
|
||||||
public void tearDown() throws Exception {
|
public void tearDown() throws Exception {
|
||||||
rlsLb.shutdown();
|
rlsLb.shutdown();
|
||||||
CachingRlsLbClient.enableOobChannelDirectPath = existingEnableOobChannelDirectPath;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue