mirror of https://github.com/grpc/grpc-java.git
rls:Fix throttling in route lookup (b/262779100) (#9874)
* Correct value being passed to throttler which had been backwards. * Fix flaky test. * Add a test using AdaptiveThrottler with a CachingRlsLBClient. * Address test flakiness.
This commit is contained in:
parent
56a08c3506
commit
5983be1369
|
|
@ -414,7 +414,7 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase {
|
|||
|
||||
// Schedule the next response chunk if there is one.
|
||||
Chunk nextChunk = chunks.peek();
|
||||
if (nextChunk != null) {
|
||||
if (nextChunk != null && !executor.isShutdown()) {
|
||||
scheduled = true;
|
||||
// TODO(ejona): cancel future if RPC is cancelled
|
||||
Future<?> unused = executor.schedule(new LogExceptionRunnable(dispatchTask),
|
||||
|
|
|
|||
|
|
@ -147,10 +147,11 @@ public class NettyFlowControlTest {
|
|||
// deal with cases that either don't cause a window update or hit max window
|
||||
expectedWindow = Math.min(MAX_WINDOW, Math.max(expectedWindow, REGULAR_WINDOW));
|
||||
|
||||
// Range looks large, but this allows for only one extra/missed window update
|
||||
// Range looks large, but this allows for only one extra/missed window update plus
|
||||
// bdpPing variations.
|
||||
// (one extra update causes a 2x difference and one missed update causes a .5x difference)
|
||||
assertTrue("Window was " + lastWindow + " expecting " + expectedWindow,
|
||||
lastWindow < 2 * expectedWindow);
|
||||
lastWindow < 2.2 * expectedWindow);
|
||||
assertTrue("Window was " + lastWindow + " expecting " + expectedWindow,
|
||||
expectedWindow < 2 * lastWindow);
|
||||
}
|
||||
|
|
@ -194,6 +195,7 @@ public class NettyFlowControlTest {
|
|||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
final long expectedWindow;
|
||||
int lastWindow;
|
||||
boolean wasCompleted;
|
||||
|
||||
public TestStreamObserver(
|
||||
AtomicReference<GrpcHttp2ConnectionHandler> grpcHandlerRef, long window) {
|
||||
|
|
@ -206,9 +208,18 @@ public class NettyFlowControlTest {
|
|||
public void onNext(StreamingOutputCallResponse value) {
|
||||
GrpcHttp2ConnectionHandler grpcHandler = grpcHandlerRef.get();
|
||||
Http2Stream connectionStream = grpcHandler.connection().connectionStream();
|
||||
lastWindow = grpcHandler.decoder().flowController().initialWindowSize(connectionStream);
|
||||
if (lastWindow >= expectedWindow) {
|
||||
onCompleted();
|
||||
int curWindow = grpcHandler.decoder().flowController().initialWindowSize(connectionStream);
|
||||
synchronized (this) {
|
||||
if (curWindow >= expectedWindow) {
|
||||
if (wasCompleted) {
|
||||
return;
|
||||
}
|
||||
wasCompleted = true;
|
||||
lastWindow = curWindow;
|
||||
onCompleted();
|
||||
} else if (!wasCompleted) {
|
||||
lastWindow = curWindow;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -218,13 +218,13 @@ final class CachingRlsLbClient {
|
|||
public void onError(Throwable t) {
|
||||
logger.log(ChannelLogLevel.DEBUG, "Error looking up route:", t);
|
||||
response.setException(t);
|
||||
throttler.registerBackendResponse(false);
|
||||
throttler.registerBackendResponse(true);
|
||||
helper.propagateRlsError();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCompleted() {
|
||||
throttler.registerBackendResponse(true);
|
||||
throttler.registerBackendResponse(false);
|
||||
}
|
||||
});
|
||||
return response;
|
||||
|
|
|
|||
|
|
@ -151,6 +151,7 @@ public class CachingRlsLbClientTest {
|
|||
private String rlsChannelOverriddenAuthority;
|
||||
|
||||
private void setUpRlsLbClient() {
|
||||
fakeThrottler.resetCounts();
|
||||
rlsLbClient =
|
||||
CachingRlsLbClient.newBuilder()
|
||||
.setBackoffProvider(fakeBackoffProvider)
|
||||
|
|
@ -362,6 +363,8 @@ public class CachingRlsLbClientTest {
|
|||
assertThat(pickResult.getStatus().isOk()).isTrue();
|
||||
assertThat(pickResult.getSubchannel()).isNotNull();
|
||||
assertThat(headers.get(RLS_DATA_KEY)).isEqualTo("header-rls-data-value");
|
||||
assertThat(fakeThrottler.getNumThrottled()).isEqualTo(0);
|
||||
assertThat(fakeThrottler.getNumUnthrottled()).isEqualTo(1);
|
||||
|
||||
// move backoff further back to only test error behavior
|
||||
fakeBackoffProvider.nextPolicy = createBackoffPolicy(100, TimeUnit.MILLISECONDS);
|
||||
|
|
@ -388,6 +391,97 @@ public class CachingRlsLbClientTest {
|
|||
CallOptions.DEFAULT));
|
||||
assertThat(pickResult.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE);
|
||||
assertThat(pickResult.getStatus().getDescription()).contains("fallback not available");
|
||||
assertThat(fakeThrottler.getNumThrottled()).isEqualTo(1);
|
||||
assertThat(fakeThrottler.getNumUnthrottled()).isEqualTo(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void get_withAdaptiveThrottler() throws Exception {
|
||||
AdaptiveThrottler adaptiveThrottler =
|
||||
new AdaptiveThrottler.Builder()
|
||||
.setHistorySeconds(1)
|
||||
.setRatioForAccepts(1.0f)
|
||||
.setRequestsPadding(1)
|
||||
.setTicker(fakeClock.getTicker())
|
||||
.build();
|
||||
|
||||
this.rlsLbClient =
|
||||
CachingRlsLbClient.newBuilder()
|
||||
.setBackoffProvider(fakeBackoffProvider)
|
||||
.setResolvedAddressesFactory(resolvedAddressFactory)
|
||||
.setEvictionListener(evictionListener)
|
||||
.setHelper(helper)
|
||||
.setLbPolicyConfig(lbPolicyConfiguration)
|
||||
.setThrottler(adaptiveThrottler)
|
||||
.setTicker(fakeClock.getTicker())
|
||||
.build();
|
||||
InOrder inOrder = inOrder(helper);
|
||||
RouteLookupRequest routeLookupRequest = RouteLookupRequest.create(ImmutableMap.of(
|
||||
"server", "bigtable.googleapis.com", "service-key", "service1", "method-key", "create"));
|
||||
rlsServerImpl.setLookupTable(
|
||||
ImmutableMap.of(
|
||||
routeLookupRequest,
|
||||
RouteLookupResponse.create(
|
||||
ImmutableList.of("primary.cloudbigtable.googleapis.com"),
|
||||
"header-rls-data-value")));
|
||||
|
||||
// valid channel
|
||||
CachedRouteLookupResponse resp = getInSyncContext(routeLookupRequest);
|
||||
assertThat(resp.isPending()).isTrue();
|
||||
fakeClock.forwardTime(SERVER_LATENCY_MILLIS, TimeUnit.MILLISECONDS);
|
||||
|
||||
resp = getInSyncContext(routeLookupRequest);
|
||||
assertThat(resp.hasData()).isTrue();
|
||||
|
||||
ArgumentCaptor<SubchannelPicker> pickerCaptor = ArgumentCaptor.forClass(SubchannelPicker.class);
|
||||
ArgumentCaptor<ConnectivityState> stateCaptor =
|
||||
ArgumentCaptor.forClass(ConnectivityState.class);
|
||||
inOrder.verify(helper, times(2))
|
||||
.updateBalancingState(stateCaptor.capture(), pickerCaptor.capture());
|
||||
|
||||
Metadata headers = new Metadata();
|
||||
PickResult pickResult = pickerCaptor.getValue().pickSubchannel(
|
||||
new PickSubchannelArgsImpl(
|
||||
TestMethodDescriptors.voidMethod().toBuilder().setFullMethodName("service1/create")
|
||||
.build(),
|
||||
headers,
|
||||
CallOptions.DEFAULT));
|
||||
assertThat(pickResult.getSubchannel()).isNotNull();
|
||||
assertThat(headers.get(RLS_DATA_KEY)).isEqualTo("header-rls-data-value");
|
||||
|
||||
// move backoff further back to only test error behavior
|
||||
fakeBackoffProvider.nextPolicy = createBackoffPolicy(100, TimeUnit.MILLISECONDS);
|
||||
// try to get invalid
|
||||
RouteLookupRequest invalidRouteLookupRequest =
|
||||
RouteLookupRequest.create(ImmutableMap.<String, String>of());
|
||||
CachedRouteLookupResponse errorResp = getInSyncContext(invalidRouteLookupRequest);
|
||||
assertThat(errorResp.isPending()).isTrue();
|
||||
fakeClock.forwardTime(SERVER_LATENCY_MILLIS, TimeUnit.MILLISECONDS);
|
||||
|
||||
errorResp = getInSyncContext(invalidRouteLookupRequest);
|
||||
assertThat(errorResp.hasError()).isTrue();
|
||||
|
||||
// Channel is still READY because the subchannel for method /service1/create is still READY.
|
||||
// Method /doesn/exists will use fallback child balancer and fail immediately.
|
||||
inOrder.verify(helper)
|
||||
.updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture());
|
||||
PickSubchannelArgsImpl invalidArgs = getInvalidArgs(headers);
|
||||
pickResult = pickerCaptor.getValue().pickSubchannel(invalidArgs);
|
||||
assertThat(pickResult.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE);
|
||||
assertThat(pickResult.getStatus().getDescription()).contains("fallback not available");
|
||||
long time = fakeClock.getTicker().read();
|
||||
assertThat(adaptiveThrottler.requestStat.get(time)).isEqualTo(2L);
|
||||
assertThat(adaptiveThrottler.throttledStat.get(time)).isEqualTo(1L);
|
||||
}
|
||||
|
||||
private PickSubchannelArgsImpl getInvalidArgs(Metadata headers) {
|
||||
PickSubchannelArgsImpl invalidArgs = new PickSubchannelArgsImpl(
|
||||
TestMethodDescriptors.voidMethod().toBuilder()
|
||||
.setFullMethodName("doesn/exists")
|
||||
.build(),
|
||||
headers,
|
||||
CallOptions.DEFAULT);
|
||||
return invalidArgs;
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -755,6 +849,8 @@ public class CachingRlsLbClientTest {
|
|||
}
|
||||
|
||||
private static final class FakeThrottler implements Throttler {
|
||||
int numUnthrottled;
|
||||
int numThrottled;
|
||||
|
||||
private boolean nextResult = false;
|
||||
|
||||
|
|
@ -765,7 +861,24 @@ public class CachingRlsLbClientTest {
|
|||
|
||||
@Override
|
||||
public void registerBackendResponse(boolean throttled) {
|
||||
// no-op
|
||||
if (throttled) {
|
||||
numThrottled++;
|
||||
} else {
|
||||
numUnthrottled++;
|
||||
}
|
||||
}
|
||||
|
||||
public int getNumUnthrottled() {
|
||||
return numUnthrottled;
|
||||
}
|
||||
|
||||
public int getNumThrottled() {
|
||||
return numThrottled;
|
||||
}
|
||||
|
||||
public void resetCounts() {
|
||||
numThrottled = 0;
|
||||
numUnthrottled = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue