mirror of https://github.com/grpc/grpc-java.git
rls: allow defaultTarget in RouteLookupConfig unset
The `default_target` field can be unset per the [spec](http://go/grpc-rls-lb-policy-design) Also fixed a synchronization bug (related to #7460) that `createOrGet()` should be guarded by lock.
This commit is contained in:
parent
f59cd0a599
commit
cc5403c4c9
|
|
@ -885,6 +885,8 @@ final class CachingRlsLbClient {
|
|||
headers.discardAll(RLS_DATA_KEY);
|
||||
headers.put(RLS_DATA_KEY, response.getHeaderData());
|
||||
}
|
||||
String defaultTarget = lbPolicyConfig.getRouteLookupConfig().getDefaultTarget();
|
||||
boolean hasFallback = defaultTarget != null && !defaultTarget.isEmpty();
|
||||
if (response.hasData()) {
|
||||
ChildPolicyWrapper childPolicyWrapper = response.getChildPolicyWrapper();
|
||||
SubchannelPicker picker = childPolicyWrapper.getPicker();
|
||||
|
|
@ -895,9 +897,15 @@ final class CachingRlsLbClient {
|
|||
if (result.getStatus().isOk()) {
|
||||
return result;
|
||||
}
|
||||
return useFallback(args);
|
||||
if (hasFallback) {
|
||||
return useFallback(args);
|
||||
}
|
||||
return PickResult.withError(result.getStatus());
|
||||
} else if (response.hasError()) {
|
||||
return useFallback(args);
|
||||
if (hasFallback) {
|
||||
return useFallback(args);
|
||||
}
|
||||
return PickResult.withError(response.getStatus());
|
||||
} else {
|
||||
return PickResult.withNoResult();
|
||||
}
|
||||
|
|
@ -907,12 +915,8 @@ final class CachingRlsLbClient {
|
|||
|
||||
/** Uses Subchannel connected to default target. */
|
||||
private PickResult useFallback(PickSubchannelArgs args) {
|
||||
String defaultTarget = lbPolicyConfig.getRouteLookupConfig().getDefaultTarget();
|
||||
if (fallbackChildPolicyWrapper == null
|
||||
|| !fallbackChildPolicyWrapper.getTarget().equals(defaultTarget)) {
|
||||
// TODO(creamsoup) wait until lb is ready
|
||||
startFallbackChildPolicy();
|
||||
}
|
||||
// TODO(creamsoup) wait until lb is ready
|
||||
startFallbackChildPolicy();
|
||||
SubchannelPicker picker = fallbackChildPolicyWrapper.getPicker();
|
||||
if (picker == null) {
|
||||
return PickResult.withNoResult();
|
||||
|
|
@ -922,8 +926,12 @@ final class CachingRlsLbClient {
|
|||
|
||||
private void startFallbackChildPolicy() {
|
||||
String defaultTarget = lbPolicyConfig.getRouteLookupConfig().getDefaultTarget();
|
||||
fallbackChildPolicyWrapper = refCountedChildPolicyWrapperFactory.createOrGet(defaultTarget);
|
||||
|
||||
synchronized (lock) {
|
||||
if (fallbackChildPolicyWrapper != null) {
|
||||
return;
|
||||
}
|
||||
fallbackChildPolicyWrapper = refCountedChildPolicyWrapperFactory.createOrGet(defaultTarget);
|
||||
}
|
||||
LoadBalancerProvider lbProvider =
|
||||
lbPolicyConfig.getLoadBalancingPolicy().getEffectiveLbProvider();
|
||||
final LoadBalancer lb =
|
||||
|
|
|
|||
|
|
@ -195,6 +195,7 @@ final class RlsProtoData {
|
|||
|
||||
private final ImmutableList<String> validTargets;
|
||||
|
||||
@Nullable
|
||||
private final String defaultTarget;
|
||||
|
||||
RouteLookupConfig(
|
||||
|
|
@ -205,6 +206,7 @@ final class RlsProtoData {
|
|||
@Nullable Long staleAgeInMillis,
|
||||
long cacheSizeBytes,
|
||||
List<String> validTargets,
|
||||
@Nullable
|
||||
String defaultTarget) {
|
||||
checkState(
|
||||
!checkNotNull(grpcKeyBuilders, "grpcKeyBuilders").isEmpty(),
|
||||
|
|
@ -233,7 +235,7 @@ final class RlsProtoData {
|
|||
checkArgument(cacheSizeBytes > 0, "cacheSize must be positive");
|
||||
this.cacheSizeBytes = cacheSizeBytes;
|
||||
this.validTargets = ImmutableList.copyOf(checkNotNull(validTargets, "validTargets"));
|
||||
this.defaultTarget = checkNotNull(defaultTarget, "defaultTarget");
|
||||
this.defaultTarget = defaultTarget;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -97,7 +97,7 @@ public class RlsLoadBalancerTest {
|
|||
public final GrpcCleanupRule grpcCleanupRule = new GrpcCleanupRule();
|
||||
@Rule
|
||||
public final MockitoRule mocks = MockitoJUnit.rule();
|
||||
|
||||
private final RlsLoadBalancerProvider provider = new RlsLoadBalancerProvider();
|
||||
private final DoNotUseDirectScheduledExecutorService fakeScheduledExecutorService =
|
||||
mock(DoNotUseDirectScheduledExecutorService.class, CALLS_REAL_METHODS);
|
||||
private final SynchronizationContext syncContext =
|
||||
|
|
@ -119,6 +119,7 @@ public class RlsLoadBalancerTest {
|
|||
private MethodDescriptor<Object, Object> fakeRescueMethod;
|
||||
private RlsLoadBalancer rlsLb;
|
||||
private boolean existingEnableOobChannelDirectPath;
|
||||
private String defaultTarget = "defaultTarget";
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
|
|
@ -150,11 +151,6 @@ public class RlsLoadBalancerTest {
|
|||
"localhost:8972", "/com.google/Rescue", "grpc", ImmutableMap.<String, String>of()),
|
||||
new RouteLookupResponse(ImmutableList.of("civilization"), "you are safe")));
|
||||
|
||||
RlsLoadBalancerProvider provider = new RlsLoadBalancerProvider();
|
||||
ConfigOrError parsedConfigOrError =
|
||||
provider.parseLoadBalancingPolicyConfig(getServiceConfig());
|
||||
|
||||
assertThat(parsedConfigOrError.getConfig()).isNotNull();
|
||||
rlsLb = (RlsLoadBalancer) provider.newLoadBalancer(helper);
|
||||
rlsLb.cachingRlsLbClientBuilderProvider = new CachingRlsLbClientBuilderProvider() {
|
||||
@Override
|
||||
|
|
@ -163,11 +159,6 @@ public class RlsLoadBalancerTest {
|
|||
return CachingRlsLbClient.newBuilder();
|
||||
}
|
||||
};
|
||||
rlsLb.handleResolvedAddresses(ResolvedAddresses.newBuilder()
|
||||
.setAddresses(ImmutableList.of(new EquivalentAddressGroup(mock(SocketAddress.class))))
|
||||
.setLoadBalancingPolicyConfig(parsedConfigOrError.getConfig())
|
||||
.build());
|
||||
verify(helper).createResolvingOobChannelBuilder(anyString());
|
||||
}
|
||||
|
||||
@After
|
||||
|
|
@ -177,7 +168,8 @@ public class RlsLoadBalancerTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void lb_working() {
|
||||
public void lb_working_withDefaultTarget() throws Exception {
|
||||
deliverResolvedAddresses();
|
||||
InOrder inOrder = inOrder(helper);
|
||||
inOrder.verify(helper)
|
||||
.updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture());
|
||||
|
|
@ -261,9 +253,79 @@ public class RlsLoadBalancerTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void lb_nameResolutionFailed() {
|
||||
public void lb_working_withoutDefaultTarget() throws Exception {
|
||||
defaultTarget = "";
|
||||
deliverResolvedAddresses();
|
||||
InOrder inOrder = inOrder(helper);
|
||||
inOrder.verify(helper)
|
||||
.updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture());
|
||||
SubchannelPicker picker = pickerCaptor.getValue();
|
||||
Metadata headers = new Metadata();
|
||||
PickResult res = picker.pickSubchannel(
|
||||
new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT));
|
||||
inOrder.verify(helper).createSubchannel(any(CreateSubchannelArgs.class));
|
||||
inOrder.verify(helper)
|
||||
.updateBalancingState(eq(ConnectivityState.CONNECTING), any(SubchannelPicker.class));
|
||||
inOrder.verifyNoMoreInteractions();
|
||||
assertThat(res.getStatus().isOk()).isTrue();
|
||||
|
||||
assertThat(subchannels).hasSize(1);
|
||||
FakeSubchannel searchSubchannel = subchannels.getLast();
|
||||
searchSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY));
|
||||
inOrder.verify(helper)
|
||||
.updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture());
|
||||
inOrder.verifyNoMoreInteractions();
|
||||
assertThat(subchannelIsReady(res.getSubchannel())).isTrue();
|
||||
assertThat(res.getSubchannel().getAddresses()).isEqualTo(searchSubchannel.getAddresses());
|
||||
assertThat(res.getSubchannel().getAttributes()).isEqualTo(searchSubchannel.getAttributes());
|
||||
|
||||
// rescue should be pending status although the overall channel state is READY
|
||||
picker = pickerCaptor.getValue();
|
||||
res = picker.pickSubchannel(
|
||||
new PickSubchannelArgsImpl(fakeRescueMethod, headers, CallOptions.DEFAULT));
|
||||
inOrder.verify(helper).createSubchannel(any(CreateSubchannelArgs.class));
|
||||
// other rls picker itself is ready due to first channel.
|
||||
inOrder.verify(helper)
|
||||
.updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture());
|
||||
inOrder.verifyNoMoreInteractions();
|
||||
assertThat(res.getStatus().isOk()).isTrue();
|
||||
assertThat(subchannelIsReady(res.getSubchannel())).isFalse();
|
||||
assertThat(subchannels).hasSize(2);
|
||||
FakeSubchannel rescueSubchannel = subchannels.getLast();
|
||||
|
||||
// search subchannel is down, rescue subchannel is still connecting
|
||||
searchSubchannel.updateState(ConnectivityStateInfo.forTransientFailure(Status.NOT_FOUND));
|
||||
inOrder.verify(helper)
|
||||
.updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture());
|
||||
|
||||
rescueSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY));
|
||||
inOrder.verify(helper)
|
||||
.updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture());
|
||||
|
||||
// search method will fail because there is no fallback target.
|
||||
picker = pickerCaptor.getValue();
|
||||
res = picker.pickSubchannel(
|
||||
new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT));
|
||||
assertThat(res.getStatus().isOk()).isFalse();
|
||||
assertThat(subchannelIsReady(res.getSubchannel())).isFalse();
|
||||
|
||||
res = picker.pickSubchannel(
|
||||
new PickSubchannelArgsImpl(fakeRescueMethod, headers, CallOptions.DEFAULT));
|
||||
assertThat(subchannelIsReady(res.getSubchannel())).isTrue();
|
||||
assertThat(res.getSubchannel().getAddresses()).isEqualTo(rescueSubchannel.getAddresses());
|
||||
assertThat(res.getSubchannel().getAttributes()).isEqualTo(rescueSubchannel.getAttributes());
|
||||
|
||||
// all channels are failed
|
||||
rescueSubchannel.updateState(ConnectivityStateInfo.forTransientFailure(Status.NOT_FOUND));
|
||||
inOrder.verify(helper)
|
||||
.updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
|
||||
inOrder.verifyNoMoreInteractions();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void lb_nameResolutionFailed() throws Exception {
|
||||
deliverResolvedAddresses();
|
||||
InOrder inOrder = inOrder(helper);
|
||||
inOrder.verify(helper)
|
||||
.updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture());
|
||||
SubchannelPicker picker = pickerCaptor.getValue();
|
||||
|
|
@ -307,8 +369,19 @@ public class RlsLoadBalancerTest {
|
|||
assertThat(subchannelIsReady(res.getSubchannel())).isFalse();
|
||||
}
|
||||
|
||||
private void deliverResolvedAddresses() throws Exception {
|
||||
ConfigOrError parsedConfigOrError =
|
||||
provider.parseLoadBalancingPolicyConfig(getServiceConfig());
|
||||
assertThat(parsedConfigOrError.getConfig()).isNotNull();
|
||||
rlsLb.handleResolvedAddresses(ResolvedAddresses.newBuilder()
|
||||
.setAddresses(ImmutableList.of(new EquivalentAddressGroup(mock(SocketAddress.class))))
|
||||
.setLoadBalancingPolicyConfig(parsedConfigOrError.getConfig())
|
||||
.build());
|
||||
verify(helper).createResolvingOobChannelBuilder(anyString());
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private static Map<String, Object> getServiceConfig() throws IOException {
|
||||
private Map<String, Object> getServiceConfig() throws IOException {
|
||||
String serviceConfig = "{"
|
||||
+ " \"routeLookupConfig\": " + getRlsConfigJsonStr() + ", "
|
||||
+ " \"childPolicy\": [{\"pick_first\": {}}],"
|
||||
|
|
@ -317,7 +390,7 @@ public class RlsLoadBalancerTest {
|
|||
return (Map<String, Object>) JsonParser.parse(serviceConfig);
|
||||
}
|
||||
|
||||
private static String getRlsConfigJsonStr() {
|
||||
private String getRlsConfigJsonStr() {
|
||||
return "{\n"
|
||||
+ " \"grpcKeyBuilders\": [\n"
|
||||
+ " {\n"
|
||||
|
|
@ -342,7 +415,7 @@ public class RlsLoadBalancerTest {
|
|||
+ " \"staleAge\": 240,\n"
|
||||
+ " \"validTargets\": [\"localhost:9001\", \"localhost:9002\"],"
|
||||
+ " \"cacheSizeBytes\": 1000,\n"
|
||||
+ " \"defaultTarget\": \"defaultTarget\",\n"
|
||||
+ " \"defaultTarget\": \"" + defaultTarget + "\",\n"
|
||||
+ " \"requestProcessingStrategy\": \"SYNC_LOOKUP_DEFAULT_TARGET_ON_ERROR\"\n"
|
||||
+ "}";
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue