mirror of https://github.com/grpc/grpc-java.git
core: refactor load-balancing config handling (#5397)
The LoadBalancingConfig message, which looks like
```json
{
"policy_name" : {
"config_key1" : "config_value1",
"config_key2" : "config_value2"
}
}
```
appears multiple times. It gets super tedious and confusing to handle, because both the whole config and the value (in the above example is `{ "config_key1" : "config_value1" }`) are just `Map<String, Object>`, and each user needs to do the following validation:
1. The whole config must have exactly one key
2. The value must be a map
Here I define `LbConfig` that holds the policy name and the config value, and a method in `ServiceConfigUtil` that converts the parsed JSON format into `LbConfig`.
There is also multiple cases where you need to handle a list of configs (top-level balancing policy, child and fallback policies in xds, grpclb child policies). I also made another helper method in `ServiceConfigUtil` to convert them into `List<LbConfig>`.
Found and fixed a bug in the xds code, where the top-level balancer should pass the config value (excluding the policy name), not the whole config to the child balancers. Search for "supported_1_option" in the diff to see it in the tests.
This commit is contained in:
parent
8806403b3a
commit
02f55189aa
|
|
@ -32,12 +32,12 @@ import io.grpc.LoadBalancer.SubchannelPicker;
|
||||||
import io.grpc.LoadBalancerProvider;
|
import io.grpc.LoadBalancerProvider;
|
||||||
import io.grpc.LoadBalancerRegistry;
|
import io.grpc.LoadBalancerRegistry;
|
||||||
import io.grpc.Status;
|
import io.grpc.Status;
|
||||||
|
import io.grpc.internal.ServiceConfigUtil.LbConfig;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.LinkedHashSet;
|
import java.util.LinkedHashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
|
||||||
import java.util.logging.Logger;
|
import java.util.logging.Logger;
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
|
|
||||||
|
|
@ -215,6 +215,8 @@ public final class AutoConfiguredLoadBalancerFactory extends LoadBalancer.Factor
|
||||||
}
|
}
|
||||||
|
|
||||||
if (haveBalancerAddress) {
|
if (haveBalancerAddress) {
|
||||||
|
// This is a special case where the existence of balancer address in the resolved address
|
||||||
|
// selects "grpclb" policy regardless of the service config.
|
||||||
LoadBalancerProvider grpclbProvider = registry.getProvider("grpclb");
|
LoadBalancerProvider grpclbProvider = registry.getProvider("grpclb");
|
||||||
if (grpclbProvider == null) {
|
if (grpclbProvider == null) {
|
||||||
if (backendAddrs.isEmpty()) {
|
if (backendAddrs.isEmpty()) {
|
||||||
|
|
@ -238,21 +240,16 @@ public final class AutoConfiguredLoadBalancerFactory extends LoadBalancer.Factor
|
||||||
}
|
}
|
||||||
roundRobinDueToGrpclbDepMissing = false;
|
roundRobinDueToGrpclbDepMissing = false;
|
||||||
|
|
||||||
List<Map<String, Object>> lbConfigs = null;
|
List<LbConfig> lbConfigs = null;
|
||||||
if (config != null) {
|
if (config != null) {
|
||||||
lbConfigs = ServiceConfigUtil.getLoadBalancingConfigsFromServiceConfig(config);
|
List<Map<String, Object>> rawLbConfigs =
|
||||||
|
ServiceConfigUtil.getLoadBalancingConfigsFromServiceConfig(config);
|
||||||
|
lbConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList(rawLbConfigs);
|
||||||
}
|
}
|
||||||
if (lbConfigs != null && !lbConfigs.isEmpty()) {
|
if (lbConfigs != null && !lbConfigs.isEmpty()) {
|
||||||
LinkedHashSet<String> policiesTried = new LinkedHashSet<>();
|
LinkedHashSet<String> policiesTried = new LinkedHashSet<>();
|
||||||
for (Map<String, Object> lbConfig : lbConfigs) {
|
for (LbConfig lbConfig : lbConfigs) {
|
||||||
if (lbConfig.size() != 1) {
|
String policy = lbConfig.getPolicyName();
|
||||||
throw new PolicyException(
|
|
||||||
"There are " + lbConfig.size()
|
|
||||||
+ " load-balancing configs in a list item. Exactly one is expected. Config="
|
|
||||||
+ lbConfig);
|
|
||||||
}
|
|
||||||
Entry<String, Object> entry = lbConfig.entrySet().iterator().next();
|
|
||||||
String policy = entry.getKey();
|
|
||||||
LoadBalancerProvider provider = registry.getProvider(policy);
|
LoadBalancerProvider provider = registry.getProvider(policy);
|
||||||
if (provider != null) {
|
if (provider != null) {
|
||||||
if (!policiesTried.isEmpty()) {
|
if (!policiesTried.isEmpty()) {
|
||||||
|
|
@ -260,7 +257,7 @@ public final class AutoConfiguredLoadBalancerFactory extends LoadBalancer.Factor
|
||||||
ChannelLogLevel.DEBUG,
|
ChannelLogLevel.DEBUG,
|
||||||
"{0} specified by Service Config are not available", policiesTried);
|
"{0} specified by Service Config are not available", policiesTried);
|
||||||
}
|
}
|
||||||
return new PolicySelection(provider, servers, (Map) entry.getValue());
|
return new PolicySelection(provider, servers, lbConfig.getRawConfigValue());
|
||||||
}
|
}
|
||||||
policiesTried.add(policy);
|
policiesTried.add(policy);
|
||||||
}
|
}
|
||||||
|
|
@ -297,13 +294,12 @@ public final class AutoConfiguredLoadBalancerFactory extends LoadBalancer.Factor
|
||||||
final List<EquivalentAddressGroup> serverList;
|
final List<EquivalentAddressGroup> serverList;
|
||||||
@Nullable final Map<String, Object> config;
|
@Nullable final Map<String, Object> config;
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
PolicySelection(
|
PolicySelection(
|
||||||
LoadBalancerProvider provider, List<EquivalentAddressGroup> serverList,
|
LoadBalancerProvider provider, List<EquivalentAddressGroup> serverList,
|
||||||
@Nullable Map<?, ?> config) {
|
@Nullable Map<String, Object> config) {
|
||||||
this.provider = checkNotNull(provider, "provider");
|
this.provider = checkNotNull(provider, "provider");
|
||||||
this.serverList = Collections.unmodifiableList(checkNotNull(serverList, "serverList"));
|
this.serverList = Collections.unmodifiableList(checkNotNull(serverList, "serverList"));
|
||||||
this.config = (Map<String, Object>) config;
|
this.config = config;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -21,6 +21,8 @@ import static com.google.common.base.Preconditions.checkState;
|
||||||
import static com.google.common.math.LongMath.checkedAdd;
|
import static com.google.common.math.LongMath.checkedAdd;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.google.common.base.MoreObjects;
|
||||||
|
import com.google.common.base.Objects;
|
||||||
import io.grpc.internal.RetriableStream.Throttle;
|
import io.grpc.internal.RetriableStream.Throttle;
|
||||||
import java.text.ParseException;
|
import java.text.ParseException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
|
@ -345,35 +347,75 @@ public final class ServiceConfigUtil {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Extracts the loadbalancing policy name from loadbalancer config.
|
* Unwrap a LoadBalancingConfig JSON object into a {@link LbConfig}. The input is a JSON object
|
||||||
|
* (map) with exactly one entry, where the key is the policy name and the value is a config object
|
||||||
|
* for that policy.
|
||||||
*/
|
*/
|
||||||
public static String getBalancerPolicyNameFromLoadBalancingConfig(Map<String, Object> lbConfig) {
|
@SuppressWarnings("unchecked")
|
||||||
return lbConfig.entrySet().iterator().next().getKey();
|
public static LbConfig unwrapLoadBalancingConfig(Object lbConfig) {
|
||||||
|
Map<String, Object> map;
|
||||||
|
try {
|
||||||
|
map = (Map<String, Object>) lbConfig;
|
||||||
|
} catch (ClassCastException e) {
|
||||||
|
ClassCastException ex = new ClassCastException("Invalid type. Config=" + lbConfig);
|
||||||
|
ex.initCause(e);
|
||||||
|
throw ex;
|
||||||
|
}
|
||||||
|
if (map.size() != 1) {
|
||||||
|
throw new RuntimeException(
|
||||||
|
"There are " + map.size() + " fields in a LoadBalancingConfig object. Exactly one"
|
||||||
|
+ " is expected. Config=" + lbConfig);
|
||||||
|
}
|
||||||
|
Map.Entry<String, Object> entry = map.entrySet().iterator().next();
|
||||||
|
Map<String, Object> configValue;
|
||||||
|
try {
|
||||||
|
configValue = (Map<String, Object>) entry.getValue();
|
||||||
|
} catch (ClassCastException e) {
|
||||||
|
ClassCastException ex =
|
||||||
|
new ClassCastException("Invalid value type. value=" + entry.getValue());
|
||||||
|
ex.initCause(e);
|
||||||
|
throw ex;
|
||||||
|
}
|
||||||
|
return new LbConfig(entry.getKey(), configValue);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Given a JSON list of LoadBalancingConfigs, and convert it into a list of LbConfig.
|
||||||
|
*/
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public static List<LbConfig> unwrapLoadBalancingConfigList(Object listObject) {
|
||||||
|
List<?> list;
|
||||||
|
try {
|
||||||
|
list = (List<?>) listObject;
|
||||||
|
} catch (ClassCastException e) {
|
||||||
|
ClassCastException ex = new ClassCastException("List expected, but is " + listObject);
|
||||||
|
ex.initCause(e);
|
||||||
|
throw ex;
|
||||||
|
}
|
||||||
|
ArrayList<LbConfig> result = new ArrayList<>();
|
||||||
|
for (Object rawChildPolicy : list) {
|
||||||
|
result.add(unwrapLoadBalancingConfig(rawChildPolicy));
|
||||||
|
}
|
||||||
|
return Collections.unmodifiableList(result);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Extracts the loadbalancer name from xds loadbalancer config.
|
* Extracts the loadbalancer name from xds loadbalancer config.
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings("unchecked")
|
public static String getBalancerNameFromXdsConfig(LbConfig xdsConfig) {
|
||||||
public static String getBalancerNameFromXdsConfig(
|
Map<String, Object> map = xdsConfig.getRawConfigValue();
|
||||||
Map<String, Object> xdsConfig) {
|
return getString(map, XDS_CONFIG_BALANCER_NAME_KEY);
|
||||||
Object entry = xdsConfig.entrySet().iterator().next().getValue();
|
|
||||||
return getString((Map<String, Object>) entry, XDS_CONFIG_BALANCER_NAME_KEY);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Extracts list of child policies from xds loadbalancer config.
|
* Extracts list of child policies from xds loadbalancer config.
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
@Nullable
|
@Nullable
|
||||||
public static List<Map<String, Object>> getChildPolicyFromXdsConfig(
|
public static List<LbConfig> getChildPolicyFromXdsConfig(LbConfig xdsConfig) {
|
||||||
Map<String, Object> xdsConfig) {
|
Map<String, Object> map = xdsConfig.getRawConfigValue();
|
||||||
Object rawEntry = xdsConfig.entrySet().iterator().next().getValue();
|
Object rawChildPolicies = map.get(XDS_CONFIG_CHILD_POLICY_KEY);
|
||||||
if (rawEntry instanceof Map) {
|
if (rawChildPolicies != null) {
|
||||||
Map<String, Object> entry = (Map<String, Object>) rawEntry;
|
return unwrapLoadBalancingConfigList(rawChildPolicies);
|
||||||
if (entry.containsKey(XDS_CONFIG_CHILD_POLICY_KEY)) {
|
|
||||||
return (List<Map<String, Object>>) (List<?>) getList(entry, XDS_CONFIG_CHILD_POLICY_KEY);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
@ -381,16 +423,12 @@ public final class ServiceConfigUtil {
|
||||||
/**
|
/**
|
||||||
* Extracts list of fallback policies from xds loadbalancer config.
|
* Extracts list of fallback policies from xds loadbalancer config.
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
@Nullable
|
@Nullable
|
||||||
public static List<Map<String, Object>> getFallbackPolicyFromXdsConfig(
|
public static List<LbConfig> getFallbackPolicyFromXdsConfig(LbConfig xdsConfig) {
|
||||||
Map<String, Object> lbConfig) {
|
Map<String, Object> map = xdsConfig.getRawConfigValue();
|
||||||
Object rawEntry = lbConfig.entrySet().iterator().next().getValue();
|
Object rawFallbackPolicies = map.get(XDS_CONFIG_FALLBACK_POLICY_KEY);
|
||||||
if (rawEntry instanceof Map) {
|
if (rawFallbackPolicies != null) {
|
||||||
Map<String, Object> entry = (Map<String, Object>) rawEntry;
|
return unwrapLoadBalancingConfigList(rawFallbackPolicies);
|
||||||
if (entry.containsKey(XDS_CONFIG_FALLBACK_POLICY_KEY)) {
|
|
||||||
return (List<Map<String, Object>>) (List<?>) getList(entry, XDS_CONFIG_FALLBACK_POLICY_KEY);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
@ -642,4 +680,49 @@ public final class ServiceConfigUtil {
|
||||||
// we did over/under flow, if the sign is negative we should return MAX otherwise MIN
|
// we did over/under flow, if the sign is negative we should return MAX otherwise MIN
|
||||||
return Long.MAX_VALUE + ((naiveSum >>> (Long.SIZE - 1)) ^ 1);
|
return Long.MAX_VALUE + ((naiveSum >>> (Long.SIZE - 1)) ^ 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A LoadBalancingConfig that includes the policy name (the key) and its raw config value (parsed
|
||||||
|
* JSON).
|
||||||
|
*/
|
||||||
|
public static final class LbConfig {
|
||||||
|
private final String policyName;
|
||||||
|
private final Map<String, Object> rawConfigValue;
|
||||||
|
|
||||||
|
public LbConfig(String policyName, Map<String, Object> rawConfigValue) {
|
||||||
|
this.policyName = checkNotNull(policyName, "policyName");
|
||||||
|
this.rawConfigValue = checkNotNull(rawConfigValue, "rawConfigValue");
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getPolicyName() {
|
||||||
|
return policyName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Map<String, Object> getRawConfigValue() {
|
||||||
|
return rawConfigValue;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o) {
|
||||||
|
if (o instanceof LbConfig) {
|
||||||
|
LbConfig other = (LbConfig) o;
|
||||||
|
return policyName.equals(other.policyName)
|
||||||
|
&& rawConfigValue.equals(other.rawConfigValue);
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return Objects.hashCode(policyName, rawConfigValue);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return MoreObjects.toStringHelper(this)
|
||||||
|
.add("policyName", policyName)
|
||||||
|
.add("rawConfigValue", rawConfigValue)
|
||||||
|
.toString();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -18,9 +18,10 @@ package io.grpc.internal;
|
||||||
|
|
||||||
import static com.google.common.truth.Truth.assertThat;
|
import static com.google.common.truth.Truth.assertThat;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
import io.grpc.internal.ServiceConfigUtil.LbConfig;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
import org.junit.runners.JUnit4;
|
import org.junit.runners.JUnit4;
|
||||||
|
|
@ -30,17 +31,6 @@ import org.junit.runners.JUnit4;
|
||||||
*/
|
*/
|
||||||
@RunWith(JUnit4.class)
|
@RunWith(JUnit4.class)
|
||||||
public class ServiceConfigUtilTest {
|
public class ServiceConfigUtilTest {
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
@Test
|
|
||||||
public void getBalancerPolicyNameFromLoadBalancingConfig() throws Exception {
|
|
||||||
String lbConfig = "{\"lbPolicy1\" : { \"key\" : \"val\" }}";
|
|
||||||
assertEquals(
|
|
||||||
"lbPolicy1",
|
|
||||||
ServiceConfigUtil.getBalancerPolicyNameFromLoadBalancingConfig(
|
|
||||||
(Map<String, Object>) JsonParser.parse(lbConfig)));
|
|
||||||
}
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
@Test
|
@Test
|
||||||
public void getBalancerNameFromXdsConfig() throws Exception {
|
public void getBalancerNameFromXdsConfig() throws Exception {
|
||||||
String lbConfig = "{\"xds_experimental\" : { "
|
String lbConfig = "{\"xds_experimental\" : { "
|
||||||
|
|
@ -51,10 +41,9 @@ public class ServiceConfigUtilTest {
|
||||||
assertEquals(
|
assertEquals(
|
||||||
"dns:///balancer.example.com:8080",
|
"dns:///balancer.example.com:8080",
|
||||||
ServiceConfigUtil.getBalancerNameFromXdsConfig(
|
ServiceConfigUtil.getBalancerNameFromXdsConfig(
|
||||||
(Map<String, Object>) JsonParser.parse(lbConfig)));
|
ServiceConfigUtil.unwrapLoadBalancingConfig(JsonParser.parse(lbConfig))));
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
@Test
|
@Test
|
||||||
public void getChildPolicyFromXdsConfig() throws Exception {
|
public void getChildPolicyFromXdsConfig() throws Exception {
|
||||||
String lbConfig = "{\"xds_experimental\" : { "
|
String lbConfig = "{\"xds_experimental\" : { "
|
||||||
|
|
@ -62,18 +51,17 @@ public class ServiceConfigUtilTest {
|
||||||
+ "\"childPolicy\" : [{\"round_robin\" : {}}, {\"lbPolicy2\" : {\"key\" : \"val\"}}],"
|
+ "\"childPolicy\" : [{\"round_robin\" : {}}, {\"lbPolicy2\" : {\"key\" : \"val\"}}],"
|
||||||
+ "\"fallbackPolicy\" : [{\"lbPolicy3\" : {\"key\" : \"val\"}}, {\"lbPolicy4\" : {}}]"
|
+ "\"fallbackPolicy\" : [{\"lbPolicy3\" : {\"key\" : \"val\"}}, {\"lbPolicy4\" : {}}]"
|
||||||
+ "}}";
|
+ "}}";
|
||||||
Map<String, Object> expectedChildPolicy1 = (Map<String, Object>) JsonParser.parse(
|
LbConfig expectedChildPolicy1 = ServiceConfigUtil.unwrapLoadBalancingConfig(
|
||||||
"{\"round_robin\" : {}}");
|
JsonParser.parse("{\"round_robin\" : {}}"));
|
||||||
Map<String, Object> expectedChildPolicy2 = (Map<String, Object>) JsonParser.parse(
|
LbConfig expectedChildPolicy2 = ServiceConfigUtil.unwrapLoadBalancingConfig(
|
||||||
"{\"lbPolicy2\" : {\"key\" : \"val\"}}");
|
JsonParser.parse("{\"lbPolicy2\" : {\"key\" : \"val\"}}"));
|
||||||
|
|
||||||
List<Map<String, Object>> childPolicies = ServiceConfigUtil.getChildPolicyFromXdsConfig(
|
List<LbConfig> childPolicies = ServiceConfigUtil.getChildPolicyFromXdsConfig(
|
||||||
(Map<String, Object>) JsonParser.parse(lbConfig));
|
ServiceConfigUtil.unwrapLoadBalancingConfig(JsonParser.parse(lbConfig)));
|
||||||
|
|
||||||
assertThat(childPolicies).containsExactly(expectedChildPolicy1, expectedChildPolicy2);
|
assertThat(childPolicies).containsExactly(expectedChildPolicy1, expectedChildPolicy2);
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
@Test
|
@Test
|
||||||
public void getChildPolicyFromXdsConfig_null() throws Exception {
|
public void getChildPolicyFromXdsConfig_null() throws Exception {
|
||||||
String lbConfig = "{\"xds_experimental\" : { "
|
String lbConfig = "{\"xds_experimental\" : { "
|
||||||
|
|
@ -81,13 +69,12 @@ public class ServiceConfigUtilTest {
|
||||||
+ "\"fallbackPolicy\" : [{\"lbPolicy3\" : {\"key\" : \"val\"}}, {\"lbPolicy4\" : {}}]"
|
+ "\"fallbackPolicy\" : [{\"lbPolicy3\" : {\"key\" : \"val\"}}, {\"lbPolicy4\" : {}}]"
|
||||||
+ "}}";
|
+ "}}";
|
||||||
|
|
||||||
List<Map<String, Object>> childPolicies = ServiceConfigUtil.getChildPolicyFromXdsConfig(
|
List<LbConfig> childPolicies = ServiceConfigUtil.getChildPolicyFromXdsConfig(
|
||||||
(Map<String, Object>) JsonParser.parse(lbConfig));
|
ServiceConfigUtil.unwrapLoadBalancingConfig(JsonParser.parse(lbConfig)));
|
||||||
|
|
||||||
assertThat(childPolicies).isNull();
|
assertThat(childPolicies).isNull();
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
@Test
|
@Test
|
||||||
public void getFallbackPolicyFromXdsConfig() throws Exception {
|
public void getFallbackPolicyFromXdsConfig() throws Exception {
|
||||||
String lbConfig = "{\"xds_experimental\" : { "
|
String lbConfig = "{\"xds_experimental\" : { "
|
||||||
|
|
@ -95,18 +82,17 @@ public class ServiceConfigUtilTest {
|
||||||
+ "\"childPolicy\" : [{\"round_robin\" : {}}, {\"lbPolicy2\" : {\"key\" : \"val\"}}],"
|
+ "\"childPolicy\" : [{\"round_robin\" : {}}, {\"lbPolicy2\" : {\"key\" : \"val\"}}],"
|
||||||
+ "\"fallbackPolicy\" : [{\"lbPolicy3\" : {\"key\" : \"val\"}}, {\"lbPolicy4\" : {}}]"
|
+ "\"fallbackPolicy\" : [{\"lbPolicy3\" : {\"key\" : \"val\"}}, {\"lbPolicy4\" : {}}]"
|
||||||
+ "}}";
|
+ "}}";
|
||||||
Map<String, Object> expectedFallbackPolicy1 = (Map<String, Object>) JsonParser.parse(
|
LbConfig expectedFallbackPolicy1 = ServiceConfigUtil.unwrapLoadBalancingConfig(
|
||||||
"{\"lbPolicy3\" : {\"key\" : \"val\"}}");
|
JsonParser.parse("{\"lbPolicy3\" : {\"key\" : \"val\"}}"));
|
||||||
Map<String, Object> expectedFallbackPolicy2 = (Map<String, Object>) JsonParser.parse(
|
LbConfig expectedFallbackPolicy2 = ServiceConfigUtil.unwrapLoadBalancingConfig(
|
||||||
"{\"lbPolicy4\" : {}}");
|
JsonParser.parse("{\"lbPolicy4\" : {}}"));
|
||||||
|
|
||||||
List<Map<String, Object>> childPolicies = ServiceConfigUtil.getFallbackPolicyFromXdsConfig(
|
List<LbConfig> childPolicies = ServiceConfigUtil.getFallbackPolicyFromXdsConfig(
|
||||||
(Map<String, Object>) JsonParser.parse(lbConfig));
|
ServiceConfigUtil.unwrapLoadBalancingConfig(JsonParser.parse(lbConfig)));
|
||||||
|
|
||||||
assertThat(childPolicies).containsExactly(expectedFallbackPolicy1, expectedFallbackPolicy2);
|
assertThat(childPolicies).containsExactly(expectedFallbackPolicy1, expectedFallbackPolicy2);
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
@Test
|
@Test
|
||||||
public void getFallbackPolicyFromXdsConfig_null() throws Exception {
|
public void getFallbackPolicyFromXdsConfig_null() throws Exception {
|
||||||
String lbConfig = "{\"xds_experimental\" : { "
|
String lbConfig = "{\"xds_experimental\" : { "
|
||||||
|
|
@ -114,9 +100,127 @@ public class ServiceConfigUtilTest {
|
||||||
+ "\"childPolicy\" : [{\"round_robin\" : {}}, {\"lbPolicy2\" : {\"key\" : \"val\"}}]"
|
+ "\"childPolicy\" : [{\"round_robin\" : {}}, {\"lbPolicy2\" : {\"key\" : \"val\"}}]"
|
||||||
+ "}}";
|
+ "}}";
|
||||||
|
|
||||||
List<Map<String, Object>> fallbackPolicies = ServiceConfigUtil.getFallbackPolicyFromXdsConfig(
|
List<LbConfig> fallbackPolicies = ServiceConfigUtil.getFallbackPolicyFromXdsConfig(
|
||||||
(Map<String, Object>) JsonParser.parse(lbConfig));
|
ServiceConfigUtil.unwrapLoadBalancingConfig(JsonParser.parse(lbConfig)));
|
||||||
|
|
||||||
assertThat(fallbackPolicies).isNull();
|
assertThat(fallbackPolicies).isNull();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void unwrapLoadBalancingConfig() throws Exception {
|
||||||
|
String lbConfig = "{\"xds_experimental\" : { "
|
||||||
|
+ "\"balancerName\" : \"dns:///balancer.example.com:8080\","
|
||||||
|
+ "\"childPolicy\" : [{\"round_robin\" : {}}, {\"lbPolicy2\" : {\"key\" : \"val\"}}]"
|
||||||
|
+ "}}";
|
||||||
|
|
||||||
|
LbConfig config = ServiceConfigUtil.unwrapLoadBalancingConfig(JsonParser.parse(lbConfig));
|
||||||
|
assertThat(config.getPolicyName()).isEqualTo("xds_experimental");
|
||||||
|
assertThat(config.getRawConfigValue()).isEqualTo(JsonParser.parse(
|
||||||
|
"{\"balancerName\" : \"dns:///balancer.example.com:8080\","
|
||||||
|
+ "\"childPolicy\" : [{\"round_robin\" : {}}, {\"lbPolicy2\" : {\"key\" : \"val\"}}]"
|
||||||
|
+ "}"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void unwrapLoadBalancingConfig_failOnTooManyFields() throws Exception {
|
||||||
|
// A LoadBalancingConfig should not have more than one field.
|
||||||
|
String lbConfig = "{\"xds_experimental\" : { "
|
||||||
|
+ "\"balancerName\" : \"dns:///balancer.example.com:8080\","
|
||||||
|
+ "\"childPolicy\" : [{\"round_robin\" : {}}, {\"lbPolicy2\" : {\"key\" : \"val\"}}]"
|
||||||
|
+ "},"
|
||||||
|
+ "\"grpclb\" : {} }";
|
||||||
|
try {
|
||||||
|
ServiceConfigUtil.unwrapLoadBalancingConfig(JsonParser.parse(lbConfig));
|
||||||
|
fail("Should throw");
|
||||||
|
} catch (Exception e) {
|
||||||
|
assertThat(e).hasMessageThat().contains("There are 2 fields");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void unwrapLoadBalancingConfig_failOnEmptyObject() throws Exception {
|
||||||
|
// A LoadBalancingConfig should not exactly one field.
|
||||||
|
String lbConfig = "{}";
|
||||||
|
try {
|
||||||
|
ServiceConfigUtil.unwrapLoadBalancingConfig(JsonParser.parse(lbConfig));
|
||||||
|
fail("Should throw");
|
||||||
|
} catch (Exception e) {
|
||||||
|
assertThat(e).hasMessageThat().contains("There are 0 fields");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void unwrapLoadBalancingConfig_failOnList() throws Exception {
|
||||||
|
// A LoadBalancingConfig must be a JSON dictionary (map)
|
||||||
|
String lbConfig = "[ { \"xds\" : {} } ]";
|
||||||
|
try {
|
||||||
|
ServiceConfigUtil.unwrapLoadBalancingConfig(JsonParser.parse(lbConfig));
|
||||||
|
fail("Should throw");
|
||||||
|
} catch (Exception e) {
|
||||||
|
assertThat(e).hasMessageThat().contains("Invalid type");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void unwrapLoadBalancingConfig_failOnString() throws Exception {
|
||||||
|
// A LoadBalancingConfig must be a JSON dictionary (map)
|
||||||
|
String lbConfig = "\"xds\"";
|
||||||
|
try {
|
||||||
|
ServiceConfigUtil.unwrapLoadBalancingConfig(JsonParser.parse(lbConfig));
|
||||||
|
fail("Should throw");
|
||||||
|
} catch (Exception e) {
|
||||||
|
assertThat(e).hasMessageThat().contains("Invalid type");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void unwrapLoadBalancingConfig_failWhenConfigIsString() throws Exception {
|
||||||
|
// The value of the config should be a JSON dictionary (map)
|
||||||
|
String lbConfig = "{ \"xds\" : \"I thought I was a config.\" }";
|
||||||
|
try {
|
||||||
|
ServiceConfigUtil.unwrapLoadBalancingConfig(JsonParser.parse(lbConfig));
|
||||||
|
fail("Should throw");
|
||||||
|
} catch (Exception e) {
|
||||||
|
assertThat(e).hasMessageThat().contains("Invalid value type");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void unwrapLoadBalancingConfigList() throws Exception {
|
||||||
|
String lbConfig = "[ "
|
||||||
|
+ "{\"xds_experimental\" : {\"balancerName\" : \"dns:///balancer.example.com:8080\"} },"
|
||||||
|
+ "{\"grpclb\" : {} } ]";
|
||||||
|
List<LbConfig> configs =
|
||||||
|
ServiceConfigUtil.unwrapLoadBalancingConfigList(JsonParser.parse(lbConfig));
|
||||||
|
assertThat(configs).containsExactly(
|
||||||
|
ServiceConfigUtil.unwrapLoadBalancingConfig(JsonParser.parse(
|
||||||
|
"{\"xds_experimental\" : "
|
||||||
|
+ "{\"balancerName\" : \"dns:///balancer.example.com:8080\"} }")),
|
||||||
|
ServiceConfigUtil.unwrapLoadBalancingConfig(JsonParser.parse(
|
||||||
|
"{\"grpclb\" : {} }"))).inOrder();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void unwrapLoadBalancingConfigList_failOnObject() throws Exception {
|
||||||
|
String notAList = "{}";
|
||||||
|
try {
|
||||||
|
ServiceConfigUtil.unwrapLoadBalancingConfigList(JsonParser.parse(notAList));
|
||||||
|
fail("Should throw");
|
||||||
|
} catch (Exception e) {
|
||||||
|
assertThat(e).hasMessageThat().contains("List expected");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void unwrapLoadBalancingConfigList_failOnMalformedConfig() throws Exception {
|
||||||
|
String lbConfig = "[ "
|
||||||
|
+ "{\"xds_experimental\" : \"I thought I was a config\" },"
|
||||||
|
+ "{\"grpclb\" : {} } ]";
|
||||||
|
try {
|
||||||
|
ServiceConfigUtil.unwrapLoadBalancingConfigList(JsonParser.parse(lbConfig));
|
||||||
|
fail("Should throw");
|
||||||
|
} catch (Exception e) {
|
||||||
|
assertThat(e).hasMessageThat().contains("Invalid value type");
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -26,10 +26,10 @@ import io.grpc.LoadBalancer.Helper;
|
||||||
import io.grpc.LoadBalancer.Subchannel;
|
import io.grpc.LoadBalancer.Subchannel;
|
||||||
import io.grpc.ManagedChannel;
|
import io.grpc.ManagedChannel;
|
||||||
import io.grpc.Status;
|
import io.grpc.Status;
|
||||||
|
import io.grpc.internal.ServiceConfigUtil.LbConfig;
|
||||||
import io.grpc.xds.XdsComms.AdsStreamCallback;
|
import io.grpc.xds.XdsComms.AdsStreamCallback;
|
||||||
import java.net.SocketAddress;
|
import java.net.SocketAddress;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
|
|
||||||
|
|
@ -54,7 +54,7 @@ class XdsLbState {
|
||||||
final String balancerName;
|
final String balancerName;
|
||||||
|
|
||||||
@Nullable
|
@Nullable
|
||||||
final Map<String, Object> childPolicy;
|
final LbConfig childPolicy;
|
||||||
|
|
||||||
private final SubchannelStore subchannelStore;
|
private final SubchannelStore subchannelStore;
|
||||||
private final Helper helper;
|
private final Helper helper;
|
||||||
|
|
@ -66,7 +66,7 @@ class XdsLbState {
|
||||||
|
|
||||||
XdsLbState(
|
XdsLbState(
|
||||||
String balancerName,
|
String balancerName,
|
||||||
@Nullable Map<String, Object> childPolicy,
|
@Nullable LbConfig childPolicy,
|
||||||
@Nullable XdsComms xdsComms,
|
@Nullable XdsComms xdsComms,
|
||||||
Helper helper,
|
Helper helper,
|
||||||
SubchannelStore subchannelStore,
|
SubchannelStore subchannelStore,
|
||||||
|
|
|
||||||
|
|
@ -19,7 +19,6 @@ package io.grpc.xds;
|
||||||
import static com.google.common.base.Preconditions.checkNotNull;
|
import static com.google.common.base.Preconditions.checkNotNull;
|
||||||
import static io.grpc.ConnectivityState.IDLE;
|
import static io.grpc.ConnectivityState.IDLE;
|
||||||
import static io.grpc.ConnectivityState.SHUTDOWN;
|
import static io.grpc.ConnectivityState.SHUTDOWN;
|
||||||
import static io.grpc.internal.ServiceConfigUtil.getBalancerPolicyNameFromLoadBalancingConfig;
|
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
|
|
@ -33,6 +32,7 @@ import io.grpc.LoadBalancerRegistry;
|
||||||
import io.grpc.Status;
|
import io.grpc.Status;
|
||||||
import io.grpc.SynchronizationContext.ScheduledHandle;
|
import io.grpc.SynchronizationContext.ScheduledHandle;
|
||||||
import io.grpc.internal.ServiceConfigUtil;
|
import io.grpc.internal.ServiceConfigUtil;
|
||||||
|
import io.grpc.internal.ServiceConfigUtil.LbConfig;
|
||||||
import io.grpc.xds.XdsComms.AdsStreamCallback;
|
import io.grpc.xds.XdsComms.AdsStreamCallback;
|
||||||
import io.grpc.xds.XdsLbState.SubchannelStore;
|
import io.grpc.xds.XdsLbState.SubchannelStore;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
@ -51,8 +51,8 @@ final class XdsLoadBalancer extends LoadBalancer {
|
||||||
static final Attributes.Key<AtomicReference<ConnectivityStateInfo>> STATE_INFO =
|
static final Attributes.Key<AtomicReference<ConnectivityStateInfo>> STATE_INFO =
|
||||||
Attributes.Key.create("io.grpc.xds.XdsLoadBalancer.stateInfo");
|
Attributes.Key.create("io.grpc.xds.XdsLoadBalancer.stateInfo");
|
||||||
|
|
||||||
private static final ImmutableMap<String, Object> DEFAULT_FALLBACK_POLICY =
|
private static final LbConfig DEFAULT_FALLBACK_POLICY =
|
||||||
ImmutableMap.of("round_robin", (Object) ImmutableMap.<String, Object>of());
|
new LbConfig("round_robin", ImmutableMap.<String, Object>of());
|
||||||
|
|
||||||
private final SubchannelStore subchannelStore;
|
private final SubchannelStore subchannelStore;
|
||||||
private final Helper helper;
|
private final Helper helper;
|
||||||
|
|
@ -77,7 +77,7 @@ final class XdsLoadBalancer extends LoadBalancer {
|
||||||
@Nullable
|
@Nullable
|
||||||
private XdsLbState xdsLbState;
|
private XdsLbState xdsLbState;
|
||||||
|
|
||||||
private Map<String, Object> fallbackPolicy;
|
private LbConfig fallbackPolicy;
|
||||||
|
|
||||||
XdsLoadBalancer(Helper helper, LoadBalancerRegistry lbRegistry, SubchannelStore subchannelStore) {
|
XdsLoadBalancer(Helper helper, LoadBalancerRegistry lbRegistry, SubchannelStore subchannelStore) {
|
||||||
this.helper = checkNotNull(helper, "helper");
|
this.helper = checkNotNull(helper, "helper");
|
||||||
|
|
@ -89,8 +89,9 @@ final class XdsLoadBalancer extends LoadBalancer {
|
||||||
@Override
|
@Override
|
||||||
public void handleResolvedAddressGroups(
|
public void handleResolvedAddressGroups(
|
||||||
List<EquivalentAddressGroup> servers, Attributes attributes) {
|
List<EquivalentAddressGroup> servers, Attributes attributes) {
|
||||||
Map<String, Object> newLbConfig = checkNotNull(
|
Map<String, Object> newRawLbConfig = checkNotNull(
|
||||||
attributes.get(ATTR_LOAD_BALANCING_CONFIG), "ATTR_LOAD_BALANCING_CONFIG not available");
|
attributes.get(ATTR_LOAD_BALANCING_CONFIG), "ATTR_LOAD_BALANCING_CONFIG not available");
|
||||||
|
LbConfig newLbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(newRawLbConfig);
|
||||||
fallbackPolicy = selectFallbackPolicy(newLbConfig, lbRegistry);
|
fallbackPolicy = selectFallbackPolicy(newLbConfig, lbRegistry);
|
||||||
fallbackManager.updateFallbackServers(servers, attributes, fallbackPolicy);
|
fallbackManager.updateFallbackServers(servers, attributes, fallbackPolicy);
|
||||||
fallbackManager.maybeStartFallbackTimer();
|
fallbackManager.maybeStartFallbackTimer();
|
||||||
|
|
@ -98,9 +99,9 @@ final class XdsLoadBalancer extends LoadBalancer {
|
||||||
xdsLbState.handleResolvedAddressGroups(servers, attributes);
|
xdsLbState.handleResolvedAddressGroups(servers, attributes);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void handleNewConfig(Map<String, Object> newLbConfig) {
|
private void handleNewConfig(LbConfig newLbConfig) {
|
||||||
String newBalancerName = ServiceConfigUtil.getBalancerNameFromXdsConfig(newLbConfig);
|
String newBalancerName = ServiceConfigUtil.getBalancerNameFromXdsConfig(newLbConfig);
|
||||||
Map<String, Object> childPolicy = selectChildPolicy(newLbConfig, lbRegistry);
|
LbConfig childPolicy = selectChildPolicy(newLbConfig, lbRegistry);
|
||||||
XdsComms xdsComms = null;
|
XdsComms xdsComms = null;
|
||||||
if (xdsLbState != null) { // may release and re-use/shutdown xdsComms from current xdsLbState
|
if (xdsLbState != null) { // may release and re-use/shutdown xdsComms from current xdsLbState
|
||||||
if (!newBalancerName.equals(xdsLbState.balancerName)) {
|
if (!newBalancerName.equals(xdsLbState.balancerName)) {
|
||||||
|
|
@ -130,43 +131,37 @@ final class XdsLoadBalancer extends LoadBalancer {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Nullable
|
@Nullable
|
||||||
private static String getPolicyNameOrNull(@Nullable Map<String, Object> config) {
|
private static String getPolicyNameOrNull(@Nullable LbConfig config) {
|
||||||
if (config == null) {
|
if (config == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
return getBalancerPolicyNameFromLoadBalancingConfig(config);
|
return config.getPolicyName();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Nullable
|
@Nullable
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
static Map<String, Object> selectChildPolicy(
|
static LbConfig selectChildPolicy(LbConfig lbConfig, LoadBalancerRegistry lbRegistry) {
|
||||||
Map<String, Object> lbConfig, LoadBalancerRegistry lbRegistry) {
|
List<LbConfig> childConfigs = ServiceConfigUtil.getChildPolicyFromXdsConfig(lbConfig);
|
||||||
List<Map<String, Object>> childConfigs =
|
|
||||||
ServiceConfigUtil.getChildPolicyFromXdsConfig(lbConfig);
|
|
||||||
return selectSupportedLbPolicy(childConfigs, lbRegistry);
|
return selectSupportedLbPolicy(childConfigs, lbRegistry);
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
static Map<String, Object> selectFallbackPolicy(
|
static LbConfig selectFallbackPolicy(LbConfig lbConfig, LoadBalancerRegistry lbRegistry) {
|
||||||
Map<String, Object> lbConfig, LoadBalancerRegistry lbRegistry) {
|
List<LbConfig> fallbackConfigs = ServiceConfigUtil.getFallbackPolicyFromXdsConfig(lbConfig);
|
||||||
List<Map<String, Object>> fallbackConfigs =
|
LbConfig fallbackPolicy = selectSupportedLbPolicy(fallbackConfigs, lbRegistry);
|
||||||
ServiceConfigUtil.getFallbackPolicyFromXdsConfig(lbConfig);
|
|
||||||
Map<String, Object> fallbackPolicy = selectSupportedLbPolicy(fallbackConfigs, lbRegistry);
|
|
||||||
return fallbackPolicy == null ? DEFAULT_FALLBACK_POLICY : fallbackPolicy;
|
return fallbackPolicy == null ? DEFAULT_FALLBACK_POLICY : fallbackPolicy;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Nullable
|
@Nullable
|
||||||
private static Map<String, Object> selectSupportedLbPolicy(
|
private static LbConfig selectSupportedLbPolicy(
|
||||||
List<Map<String, Object>> lbConfigs, LoadBalancerRegistry lbRegistry) {
|
@Nullable List<LbConfig> lbConfigs, LoadBalancerRegistry lbRegistry) {
|
||||||
if (lbConfigs == null) {
|
if (lbConfigs == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
for (Object lbConfig : lbConfigs) {
|
for (LbConfig lbConfig : lbConfigs) {
|
||||||
@SuppressWarnings("unchecked")
|
String lbPolicy = lbConfig.getPolicyName();
|
||||||
Map<String, Object> candidate = (Map<String, Object>) lbConfig;
|
|
||||||
String lbPolicy = ServiceConfigUtil.getBalancerPolicyNameFromLoadBalancingConfig(candidate);
|
|
||||||
if (lbRegistry.getProvider(lbPolicy) != null) {
|
if (lbRegistry.getProvider(lbPolicy) != null) {
|
||||||
return candidate;
|
return lbConfig;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
|
|
@ -239,7 +234,7 @@ final class XdsLoadBalancer extends LoadBalancer {
|
||||||
private final SubchannelStore subchannelStore;
|
private final SubchannelStore subchannelStore;
|
||||||
private final LoadBalancerRegistry lbRegistry;
|
private final LoadBalancerRegistry lbRegistry;
|
||||||
|
|
||||||
private Map<String, Object> fallbackPolicy;
|
private LbConfig fallbackPolicy;
|
||||||
|
|
||||||
// read-only for outer class
|
// read-only for outer class
|
||||||
private LoadBalancer fallbackBalancer;
|
private LoadBalancer fallbackBalancer;
|
||||||
|
|
@ -281,9 +276,7 @@ final class XdsLoadBalancer extends LoadBalancer {
|
||||||
|
|
||||||
helper.getChannelLogger().log(
|
helper.getChannelLogger().log(
|
||||||
ChannelLogLevel.INFO, "Using fallback policy");
|
ChannelLogLevel.INFO, "Using fallback policy");
|
||||||
String fallbackPolicyName = ServiceConfigUtil.getBalancerPolicyNameFromLoadBalancingConfig(
|
fallbackBalancer = lbRegistry.getProvider(fallbackPolicy.getPolicyName())
|
||||||
fallbackPolicy);
|
|
||||||
fallbackBalancer = lbRegistry.getProvider(fallbackPolicyName)
|
|
||||||
.newLoadBalancer(helper);
|
.newLoadBalancer(helper);
|
||||||
fallbackBalancer.handleResolvedAddressGroups(fallbackServers, fallbackAttributes);
|
fallbackBalancer.handleResolvedAddressGroups(fallbackServers, fallbackAttributes);
|
||||||
// TODO: maybe update picker
|
// TODO: maybe update picker
|
||||||
|
|
@ -291,20 +284,16 @@ final class XdsLoadBalancer extends LoadBalancer {
|
||||||
|
|
||||||
void updateFallbackServers(
|
void updateFallbackServers(
|
||||||
List<EquivalentAddressGroup> servers, Attributes attributes,
|
List<EquivalentAddressGroup> servers, Attributes attributes,
|
||||||
Map<String, Object> fallbackPolicy) {
|
LbConfig fallbackPolicy) {
|
||||||
this.fallbackServers = servers;
|
this.fallbackServers = servers;
|
||||||
this.fallbackAttributes = Attributes.newBuilder()
|
this.fallbackAttributes = Attributes.newBuilder()
|
||||||
.setAll(attributes)
|
.setAll(attributes)
|
||||||
.set(ATTR_LOAD_BALANCING_CONFIG, fallbackPolicy)
|
.set(ATTR_LOAD_BALANCING_CONFIG, fallbackPolicy.getRawConfigValue())
|
||||||
.build();
|
.build();
|
||||||
Map<String, Object> currentFallbackPolicy = this.fallbackPolicy;
|
LbConfig currentFallbackPolicy = this.fallbackPolicy;
|
||||||
this.fallbackPolicy = fallbackPolicy;
|
this.fallbackPolicy = fallbackPolicy;
|
||||||
if (fallbackBalancer != null) {
|
if (fallbackBalancer != null) {
|
||||||
String currentPolicyName =
|
if (fallbackPolicy.getPolicyName().equals(currentFallbackPolicy.getPolicyName())) {
|
||||||
ServiceConfigUtil.getBalancerPolicyNameFromLoadBalancingConfig(currentFallbackPolicy);
|
|
||||||
String newPolicyName =
|
|
||||||
ServiceConfigUtil.getBalancerPolicyNameFromLoadBalancingConfig(fallbackPolicy);
|
|
||||||
if (newPolicyName.equals(currentPolicyName)) {
|
|
||||||
fallbackBalancer.handleResolvedAddressGroups(fallbackServers, fallbackAttributes);
|
fallbackBalancer.handleResolvedAddressGroups(fallbackServers, fallbackAttributes);
|
||||||
} else {
|
} else {
|
||||||
fallbackBalancer.shutdown();
|
fallbackBalancer.shutdown();
|
||||||
|
|
|
||||||
|
|
@ -32,12 +32,12 @@ import io.grpc.LoadBalancerProvider;
|
||||||
import io.grpc.LoadBalancerRegistry;
|
import io.grpc.LoadBalancerRegistry;
|
||||||
import io.grpc.SynchronizationContext;
|
import io.grpc.SynchronizationContext;
|
||||||
import io.grpc.internal.FakeClock;
|
import io.grpc.internal.FakeClock;
|
||||||
|
import io.grpc.internal.ServiceConfigUtil.LbConfig;
|
||||||
import io.grpc.xds.XdsLbState.SubchannelStoreImpl;
|
import io.grpc.xds.XdsLbState.SubchannelStoreImpl;
|
||||||
import io.grpc.xds.XdsLoadBalancer.FallbackManager;
|
import io.grpc.xds.XdsLoadBalancer.FallbackManager;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
|
@ -97,7 +97,7 @@ public class FallbackManagerTest {
|
||||||
private ChannelLogger channelLogger;
|
private ChannelLogger channelLogger;
|
||||||
|
|
||||||
private FallbackManager fallbackManager;
|
private FallbackManager fallbackManager;
|
||||||
private Map<String, Object> fallbackPolicy;
|
private LbConfig fallbackPolicy;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() {
|
public void setUp() {
|
||||||
|
|
@ -106,8 +106,7 @@ public class FallbackManagerTest {
|
||||||
doReturn(fakeClock.getScheduledExecutorService()).when(helper).getScheduledExecutorService();
|
doReturn(fakeClock.getScheduledExecutorService()).when(helper).getScheduledExecutorService();
|
||||||
doReturn(channelLogger).when(helper).getChannelLogger();
|
doReturn(channelLogger).when(helper).getChannelLogger();
|
||||||
fallbackManager = new FallbackManager(helper, new SubchannelStoreImpl(), lbRegistry);
|
fallbackManager = new FallbackManager(helper, new SubchannelStoreImpl(), lbRegistry);
|
||||||
fallbackPolicy = new HashMap<>();
|
fallbackPolicy = new LbConfig("test_policy", new HashMap<String, Object>());
|
||||||
fallbackPolicy.put("test_policy", new HashMap<>());
|
|
||||||
lbRegistry.register(fakeLbProvider);
|
lbRegistry.register(fakeLbProvider);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -131,7 +130,7 @@ public class FallbackManagerTest {
|
||||||
verify(fakeLb).handleResolvedAddressGroups(
|
verify(fakeLb).handleResolvedAddressGroups(
|
||||||
same(eags),
|
same(eags),
|
||||||
eq(Attributes.newBuilder()
|
eq(Attributes.newBuilder()
|
||||||
.set(LoadBalancer.ATTR_LOAD_BALANCING_CONFIG, fallbackPolicy)
|
.set(LoadBalancer.ATTR_LOAD_BALANCING_CONFIG, fallbackPolicy.getRawConfigValue())
|
||||||
.build()));
|
.build()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -52,13 +52,14 @@ import io.grpc.inprocess.InProcessChannelBuilder;
|
||||||
import io.grpc.inprocess.InProcessServerBuilder;
|
import io.grpc.inprocess.InProcessServerBuilder;
|
||||||
import io.grpc.internal.FakeClock;
|
import io.grpc.internal.FakeClock;
|
||||||
import io.grpc.internal.JsonParser;
|
import io.grpc.internal.JsonParser;
|
||||||
|
import io.grpc.internal.ServiceConfigUtil;
|
||||||
|
import io.grpc.internal.ServiceConfigUtil.LbConfig;
|
||||||
import io.grpc.internal.testing.StreamRecorder;
|
import io.grpc.internal.testing.StreamRecorder;
|
||||||
import io.grpc.stub.StreamObserver;
|
import io.grpc.stub.StreamObserver;
|
||||||
import io.grpc.testing.GrpcCleanupRule;
|
import io.grpc.testing.GrpcCleanupRule;
|
||||||
import io.grpc.xds.XdsLbState.SubchannelStore;
|
import io.grpc.xds.XdsLbState.SubchannelStore;
|
||||||
import io.grpc.xds.XdsLbState.SubchannelStoreImpl;
|
import io.grpc.xds.XdsLbState.SubchannelStoreImpl;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
@ -254,13 +255,13 @@ public class XdsLoadBalancerTest {
|
||||||
+ "{\"supported_2\" : {\"key\" : \"val\"}}],"
|
+ "{\"supported_2\" : {\"key\" : \"val\"}}],"
|
||||||
+ "\"fallbackPolicy\" : [{\"lbPolicy3\" : {\"key\" : \"val\"}}, {\"lbPolicy4\" : {}}]"
|
+ "\"fallbackPolicy\" : [{\"lbPolicy3\" : {\"key\" : \"val\"}}, {\"lbPolicy4\" : {}}]"
|
||||||
+ "}}";
|
+ "}}";
|
||||||
@SuppressWarnings("unchecked")
|
LbConfig expectedChildPolicy =
|
||||||
Map<String, Object> expectedChildPolicy = (Map<String, Object>) JsonParser.parse(
|
ServiceConfigUtil.unwrapLoadBalancingConfig(
|
||||||
"{\"supported_1\" : {\"key\" : \"val\"}}");
|
JsonParser.parse("{\"supported_1\" : {\"key\" : \"val\"}}"));
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
LbConfig childPolicy = XdsLoadBalancer
|
||||||
Map<String, Object> childPolicy = XdsLoadBalancer
|
.selectChildPolicy(
|
||||||
.selectChildPolicy((Map<String, Object>) JsonParser.parse(lbConfigRaw), lbRegistry);
|
ServiceConfigUtil.unwrapLoadBalancingConfig(JsonParser.parse(lbConfigRaw)), lbRegistry);
|
||||||
|
|
||||||
assertEquals(expectedChildPolicy, childPolicy);
|
assertEquals(expectedChildPolicy, childPolicy);
|
||||||
}
|
}
|
||||||
|
|
@ -273,13 +274,11 @@ public class XdsLoadBalancerTest {
|
||||||
+ "\"fallbackPolicy\" : [{\"unsupported\" : {}}, {\"supported_1\" : {\"key\" : \"val\"}},"
|
+ "\"fallbackPolicy\" : [{\"unsupported\" : {}}, {\"supported_1\" : {\"key\" : \"val\"}},"
|
||||||
+ "{\"supported_2\" : {\"key\" : \"val\"}}]"
|
+ "{\"supported_2\" : {\"key\" : \"val\"}}]"
|
||||||
+ "}}";
|
+ "}}";
|
||||||
@SuppressWarnings("unchecked")
|
LbConfig expectedFallbackPolicy = ServiceConfigUtil.unwrapLoadBalancingConfig(
|
||||||
Map<String, Object> expectedFallbackPolicy = (Map<String, Object>) JsonParser.parse(
|
JsonParser.parse("{\"supported_1\" : {\"key\" : \"val\"}}"));
|
||||||
"{\"supported_1\" : {\"key\" : \"val\"}}");
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
LbConfig fallbackPolicy = XdsLoadBalancer.selectFallbackPolicy(
|
||||||
Map<String, Object> fallbackPolicy = XdsLoadBalancer
|
ServiceConfigUtil.unwrapLoadBalancingConfig(JsonParser.parse(lbConfigRaw)), lbRegistry);
|
||||||
.selectFallbackPolicy((Map<String, Object>) JsonParser.parse(lbConfigRaw), lbRegistry);
|
|
||||||
|
|
||||||
assertEquals(expectedFallbackPolicy, fallbackPolicy);
|
assertEquals(expectedFallbackPolicy, fallbackPolicy);
|
||||||
}
|
}
|
||||||
|
|
@ -290,13 +289,11 @@ public class XdsLoadBalancerTest {
|
||||||
+ "\"balancerName\" : \"dns:///balancer.example.com:8080\","
|
+ "\"balancerName\" : \"dns:///balancer.example.com:8080\","
|
||||||
+ "\"childPolicy\" : [{\"lbPolicy3\" : {\"key\" : \"val\"}}, {\"lbPolicy4\" : {}}]"
|
+ "\"childPolicy\" : [{\"lbPolicy3\" : {\"key\" : \"val\"}}, {\"lbPolicy4\" : {}}]"
|
||||||
+ "}}";
|
+ "}}";
|
||||||
@SuppressWarnings("unchecked")
|
LbConfig expectedFallbackPolicy = ServiceConfigUtil.unwrapLoadBalancingConfig(
|
||||||
Map<String, Object> expectedFallbackPolicy = (Map<String, Object>) JsonParser.parse(
|
JsonParser.parse("{\"round_robin\" : {}}"));
|
||||||
"{\"round_robin\" : {}}");
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
LbConfig fallbackPolicy = XdsLoadBalancer.selectFallbackPolicy(
|
||||||
Map<String, Object> fallbackPolicy = XdsLoadBalancer
|
ServiceConfigUtil.unwrapLoadBalancingConfig(JsonParser.parse(lbConfigRaw)), lbRegistry);
|
||||||
.selectFallbackPolicy((Map<String, Object>) JsonParser.parse(lbConfigRaw), lbRegistry);
|
|
||||||
|
|
||||||
assertEquals(expectedFallbackPolicy, fallbackPolicy);
|
assertEquals(expectedFallbackPolicy, fallbackPolicy);
|
||||||
}
|
}
|
||||||
|
|
@ -508,7 +505,7 @@ public class XdsLoadBalancerTest {
|
||||||
verify(fakeBalancer1).handleResolvedAddressGroups(
|
verify(fakeBalancer1).handleResolvedAddressGroups(
|
||||||
Matchers.<List<EquivalentAddressGroup>>any(), captor.capture());
|
Matchers.<List<EquivalentAddressGroup>>any(), captor.capture());
|
||||||
assertThat(captor.getValue().get(ATTR_LOAD_BALANCING_CONFIG))
|
assertThat(captor.getValue().get(ATTR_LOAD_BALANCING_CONFIG))
|
||||||
.containsExactly("supported_1", new HashMap<String, Object>());
|
.containsExactly("supported_1_option", "yes");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
@ -534,7 +531,7 @@ public class XdsLoadBalancerTest {
|
||||||
verify(fakeBalancer1).handleResolvedAddressGroups(
|
verify(fakeBalancer1).handleResolvedAddressGroups(
|
||||||
Matchers.<List<EquivalentAddressGroup>>any(), captor.capture());
|
Matchers.<List<EquivalentAddressGroup>>any(), captor.capture());
|
||||||
assertThat(captor.getValue().get(ATTR_LOAD_BALANCING_CONFIG))
|
assertThat(captor.getValue().get(ATTR_LOAD_BALANCING_CONFIG))
|
||||||
.containsExactly("supported_1", new HashMap<String, Object>());
|
.containsExactly("supported_1_option", "yes");
|
||||||
|
|
||||||
assertThat(fakeClock.forwardTime(10, TimeUnit.SECONDS)).isEqualTo(1);
|
assertThat(fakeClock.forwardTime(10, TimeUnit.SECONDS)).isEqualTo(1);
|
||||||
assertThat(fakeClock.getPendingTasks()).isEmpty();
|
assertThat(fakeClock.getPendingTasks()).isEmpty();
|
||||||
|
|
@ -582,13 +579,13 @@ public class XdsLoadBalancerTest {
|
||||||
verify(fakeBalancer1).handleResolvedAddressGroups(
|
verify(fakeBalancer1).handleResolvedAddressGroups(
|
||||||
Matchers.<List<EquivalentAddressGroup>>any(), captor.capture());
|
Matchers.<List<EquivalentAddressGroup>>any(), captor.capture());
|
||||||
assertThat(captor.getValue().get(ATTR_LOAD_BALANCING_CONFIG))
|
assertThat(captor.getValue().get(ATTR_LOAD_BALANCING_CONFIG))
|
||||||
.containsExactly("supported_1", new HashMap<String, Object>());
|
.containsExactly("supported_1_option", "yes");
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Attributes standardModeWithFallback1Attributes() throws Exception {
|
private static Attributes standardModeWithFallback1Attributes() throws Exception {
|
||||||
String lbConfigRaw = "{\"xds_experimental\" : { "
|
String lbConfigRaw = "{\"xds_experimental\" : { "
|
||||||
+ "\"balancerName\" : \"dns:///balancer.example.com:8080\","
|
+ "\"balancerName\" : \"dns:///balancer.example.com:8080\","
|
||||||
+ "\"fallbackPolicy\" : [{\"supported_1\" : {}}]"
|
+ "\"fallbackPolicy\" : [{\"supported_1\" : { \"supported_1_option\" : \"yes\"}}]"
|
||||||
+ "}}";
|
+ "}}";
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
Map<String, Object> lbConfig = (Map<String, Object>) JsonParser.parse(lbConfigRaw);
|
Map<String, Object> lbConfig = (Map<String, Object>) JsonParser.parse(lbConfigRaw);
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue