mirror of https://github.com/grpc/grpc-java.git
rls: Only use subchannel policy for default target when RLS is not available (#9383)
* core: Only use subchannel policy for default target when RLS is not available Fixes #9237
This commit is contained in:
parent
03abe8a088
commit
50cdfa9f05
|
|
@ -931,14 +931,7 @@ final class CachingRlsLbClient {
|
||||||
if (picker == null) {
|
if (picker == null) {
|
||||||
return PickResult.withNoResult();
|
return PickResult.withNoResult();
|
||||||
}
|
}
|
||||||
PickResult result = picker.pickSubchannel(args);
|
return picker.pickSubchannel(args);
|
||||||
if (result.getStatus().isOk()) {
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
if (hasFallback) {
|
|
||||||
return useFallback(args);
|
|
||||||
}
|
|
||||||
return PickResult.withError(result.getStatus());
|
|
||||||
} else if (response.hasError()) {
|
} else if (response.hasError()) {
|
||||||
if (hasFallback) {
|
if (hasFallback) {
|
||||||
return useFallback(args);
|
return useFallback(args);
|
||||||
|
|
|
||||||
|
|
@ -74,6 +74,7 @@ import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import javax.annotation.Nonnull;
|
import javax.annotation.Nonnull;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
|
@ -110,6 +111,7 @@ public class RlsLoadBalancerTest {
|
||||||
mock(Helper.class, AdditionalAnswers.delegatesTo(new FakeHelper()));
|
mock(Helper.class, AdditionalAnswers.delegatesTo(new FakeHelper()));
|
||||||
private final FakeRlsServerImpl fakeRlsServerImpl = new FakeRlsServerImpl();
|
private final FakeRlsServerImpl fakeRlsServerImpl = new FakeRlsServerImpl();
|
||||||
private final Deque<FakeSubchannel> subchannels = new LinkedList<>();
|
private final Deque<FakeSubchannel> subchannels = new LinkedList<>();
|
||||||
|
private final FakeThrottler fakeThrottler = new FakeThrottler();
|
||||||
@Mock
|
@Mock
|
||||||
private Marshaller<Object> mockMarshaller;
|
private Marshaller<Object> mockMarshaller;
|
||||||
@Captor
|
@Captor
|
||||||
|
|
@ -120,7 +122,7 @@ public class RlsLoadBalancerTest {
|
||||||
private String defaultTarget = "defaultTarget";
|
private String defaultTarget = "defaultTarget";
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() {
|
||||||
MockitoAnnotations.initMocks(this);
|
MockitoAnnotations.initMocks(this);
|
||||||
|
|
||||||
fakeSearchMethod =
|
fakeSearchMethod =
|
||||||
|
|
@ -154,19 +156,21 @@ public class RlsLoadBalancerTest {
|
||||||
rlsLb.cachingRlsLbClientBuilderProvider = new CachingRlsLbClientBuilderProvider() {
|
rlsLb.cachingRlsLbClientBuilderProvider = new CachingRlsLbClientBuilderProvider() {
|
||||||
@Override
|
@Override
|
||||||
public CachingRlsLbClient.Builder get() {
|
public CachingRlsLbClient.Builder get() {
|
||||||
// using default throttler which doesn't throttle
|
// using fake throttler to allow enablement of throttler
|
||||||
return CachingRlsLbClient.newBuilder();
|
return CachingRlsLbClient.newBuilder()
|
||||||
|
.setThrottler(fakeThrottler)
|
||||||
|
.setTicker(fakeClock.getTicker());
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public void tearDown() throws Exception {
|
public void tearDown() {
|
||||||
rlsLb.shutdown();
|
rlsLb.shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void lb_working_withDefaultTarget() throws Exception {
|
public void lb_working_withDefaultTarget_rlsResponding() throws Exception {
|
||||||
deliverResolvedAddresses();
|
deliverResolvedAddresses();
|
||||||
InOrder inOrder = inOrder(helper);
|
InOrder inOrder = inOrder(helper);
|
||||||
inOrder.verify(helper)
|
inOrder.verify(helper)
|
||||||
|
|
@ -215,40 +219,76 @@ public class RlsLoadBalancerTest {
|
||||||
inOrder.verify(helper)
|
inOrder.verify(helper)
|
||||||
.updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture());
|
.updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture());
|
||||||
|
|
||||||
// search again, use pending fallback because searchSubchannel is in failure mode
|
// search again, verify that it doesn't use fallback, since RLS server responded, even though
|
||||||
|
// subchannel is in failure mode
|
||||||
res = picker.pickSubchannel(
|
res = picker.pickSubchannel(
|
||||||
new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT));
|
new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT));
|
||||||
assertThat(res.getStatus().isOk()).isTrue();
|
assertThat(res.getStatus().getCode()).isEqualTo(Status.Code.NOT_FOUND);
|
||||||
assertThat(subchannelIsReady(res.getSubchannel())).isFalse();
|
assertThat(subchannelIsReady(res.getSubchannel())).isFalse();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void lb_working_withDefaultTarget_noRlsResponse() throws Exception {
|
||||||
|
fakeThrottler.nextResult = true;
|
||||||
|
|
||||||
|
deliverResolvedAddresses();
|
||||||
|
InOrder inOrder = inOrder(helper);
|
||||||
|
inOrder.verify(helper)
|
||||||
|
.updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture());
|
||||||
|
SubchannelPicker picker = pickerCaptor.getValue();
|
||||||
|
Metadata headers = new Metadata();
|
||||||
|
PickResult res;
|
||||||
|
|
||||||
|
// Search that when the RLS server doesn't respond, that fallback is used
|
||||||
|
res = picker.pickSubchannel(
|
||||||
|
new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT));
|
||||||
|
FakeSubchannel fallbackSubchannel = (FakeSubchannel) res.getSubchannel();
|
||||||
|
|
||||||
|
assertThat(res.getStatus().getCode()).isEqualTo(Status.Code.OK);
|
||||||
|
assertThat(subchannelIsReady(res.getSubchannel())).isFalse();
|
||||||
inOrder.verify(helper).createSubchannel(any(CreateSubchannelArgs.class));
|
inOrder.verify(helper).createSubchannel(any(CreateSubchannelArgs.class));
|
||||||
assertThat(subchannels).hasSize(3);
|
|
||||||
FakeSubchannel fallbackSubchannel = subchannels.getLast();
|
|
||||||
fallbackSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY));
|
fallbackSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY));
|
||||||
inOrder.verify(helper, times(2))
|
inOrder.verify(helper, times(1))
|
||||||
.updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture());
|
.updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture());
|
||||||
inOrder.verifyNoMoreInteractions();
|
inOrder.verifyNoMoreInteractions();
|
||||||
|
|
||||||
res = picker.pickSubchannel(
|
res = picker.pickSubchannel(
|
||||||
new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT));
|
new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT));
|
||||||
assertThat(subchannelIsReady(res.getSubchannel())).isTrue();
|
assertThat(subchannelIsReady(res.getSubchannel())).isTrue();
|
||||||
assertThat(res.getSubchannel().getAddresses()).isEqualTo(searchSubchannel.getAddresses());
|
assertThat(res.getSubchannel()).isSameInstanceAs(fallbackSubchannel);
|
||||||
assertThat(res.getSubchannel().getAttributes()).isEqualTo(searchSubchannel.getAttributes());
|
|
||||||
|
|
||||||
res = picker.pickSubchannel(
|
res = picker.pickSubchannel(
|
||||||
new PickSubchannelArgsImpl(fakeRescueMethod, headers, CallOptions.DEFAULT));
|
new PickSubchannelArgsImpl(fakeRescueMethod, headers, CallOptions.DEFAULT));
|
||||||
assertThat(subchannelIsReady(res.getSubchannel())).isTrue();
|
assertThat(subchannelIsReady(res.getSubchannel())).isTrue();
|
||||||
assertThat(res.getSubchannel().getAddresses()).isEqualTo(rescueSubchannel.getAddresses());
|
assertThat(res.getSubchannel()).isSameInstanceAs(fallbackSubchannel);
|
||||||
assertThat(res.getSubchannel().getAttributes()).isEqualTo(rescueSubchannel.getAttributes());
|
|
||||||
|
// Make sure that when RLS starts communicating that default stops being used
|
||||||
|
fakeThrottler.nextResult = false;
|
||||||
|
fakeClock.forwardTime(2, TimeUnit.SECONDS); // Expires backoff cache entries
|
||||||
|
// Create search subchannel
|
||||||
|
res = picker.pickSubchannel(
|
||||||
|
new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT));
|
||||||
|
assertThat(res.getSubchannel()).isNotSameInstanceAs(fallbackSubchannel);
|
||||||
|
FakeSubchannel searchSubchannel = (FakeSubchannel) res.getSubchannel();
|
||||||
|
searchSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY));
|
||||||
|
|
||||||
|
// create rescue subchannel
|
||||||
|
res = picker.pickSubchannel(
|
||||||
|
new PickSubchannelArgsImpl(fakeRescueMethod, headers, CallOptions.DEFAULT));
|
||||||
|
assertThat(res.getSubchannel()).isNotSameInstanceAs(fallbackSubchannel);
|
||||||
|
assertThat(res.getSubchannel()).isNotSameInstanceAs(searchSubchannel);
|
||||||
|
FakeSubchannel rescueSubchannel = (FakeSubchannel) res.getSubchannel();
|
||||||
|
rescueSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY));
|
||||||
|
|
||||||
// all channels are failed
|
// all channels are failed
|
||||||
rescueSubchannel.updateState(ConnectivityStateInfo.forTransientFailure(Status.NOT_FOUND));
|
rescueSubchannel.updateState(ConnectivityStateInfo.forTransientFailure(Status.NOT_FOUND));
|
||||||
inOrder.verify(helper)
|
searchSubchannel.updateState(ConnectivityStateInfo.forTransientFailure(Status.NOT_FOUND));
|
||||||
.updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture());
|
|
||||||
fallbackSubchannel.updateState(ConnectivityStateInfo.forTransientFailure(Status.NOT_FOUND));
|
fallbackSubchannel.updateState(ConnectivityStateInfo.forTransientFailure(Status.NOT_FOUND));
|
||||||
inOrder.verify(helper)
|
|
||||||
.updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
|
res = picker.pickSubchannel(
|
||||||
inOrder.verifyNoMoreInteractions();
|
new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT));
|
||||||
|
assertThat(res.getStatus().getCode()).isEqualTo(Status.Code.NOT_FOUND);
|
||||||
|
assertThat(res.getSubchannel()).isNull();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
@ -546,7 +586,7 @@ public class RlsLoadBalancerTest {
|
||||||
private final Attributes attributes;
|
private final Attributes attributes;
|
||||||
private List<EquivalentAddressGroup> eags;
|
private List<EquivalentAddressGroup> eags;
|
||||||
private SubchannelStateListener listener;
|
private SubchannelStateListener listener;
|
||||||
private boolean isReady;
|
private volatile boolean isReady;
|
||||||
|
|
||||||
public FakeSubchannel(List<EquivalentAddressGroup> eags, Attributes attributes) {
|
public FakeSubchannel(List<EquivalentAddressGroup> eags, Attributes attributes) {
|
||||||
this.eags = Collections.unmodifiableList(eags);
|
this.eags = Collections.unmodifiableList(eags);
|
||||||
|
|
@ -590,4 +630,20 @@ public class RlsLoadBalancerTest {
|
||||||
private static boolean subchannelIsReady(Subchannel subchannel) {
|
private static boolean subchannelIsReady(Subchannel subchannel) {
|
||||||
return subchannel instanceof FakeSubchannel && ((FakeSubchannel) subchannel).isReady;
|
return subchannel instanceof FakeSubchannel && ((FakeSubchannel) subchannel).isReady;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static final class FakeThrottler implements Throttler {
|
||||||
|
|
||||||
|
private boolean nextResult = false;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean shouldThrottle() {
|
||||||
|
return nextResult;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void registerBackendResponse(boolean throttled) {
|
||||||
|
// no-op
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue