mirror of https://github.com/grpc/grpc-java.git
xds: Unconditionally apply backoff on LRS stream recreation
This limits LRS stream creation to at most one per second, even if the
old stream was considered good because it had received a response. This
is the same change as made to ADS in 957079194a.
b/224833499
This commit is contained in:
parent
054cb49b49
commit
569b7b0b95
|
|
@ -259,20 +259,16 @@ final class LoadReportClient {
|
||||||
closed = true;
|
closed = true;
|
||||||
cleanUp();
|
cleanUp();
|
||||||
|
|
||||||
long delayNanos = 0;
|
|
||||||
if (initialResponseReceived || lrsRpcRetryPolicy == null) {
|
if (initialResponseReceived || lrsRpcRetryPolicy == null) {
|
||||||
// Reset the backoff sequence if balancer has sent the initial response, or backoff sequence
|
// Reset the backoff sequence if balancer has sent the initial response, or backoff sequence
|
||||||
// has never been initialized.
|
// has never been initialized.
|
||||||
lrsRpcRetryPolicy = backoffPolicyProvider.get();
|
lrsRpcRetryPolicy = backoffPolicyProvider.get();
|
||||||
}
|
}
|
||||||
// Backoff only when balancer wasn't working previously.
|
|
||||||
if (!initialResponseReceived) {
|
|
||||||
// The back-off policy determines the interval between consecutive RPC upstarts, thus the
|
// The back-off policy determines the interval between consecutive RPC upstarts, thus the
|
||||||
// actual delay may be smaller than the value from the back-off policy, or even negative,
|
// actual delay may be smaller than the value from the back-off policy, or even negative,
|
||||||
// depending how much time was spent in the previous RPC.
|
// depending how much time was spent in the previous RPC.
|
||||||
delayNanos =
|
long delayNanos =
|
||||||
lrsRpcRetryPolicy.nextBackoffNanos() - retryStopwatch.elapsed(TimeUnit.NANOSECONDS);
|
lrsRpcRetryPolicy.nextBackoffNanos() - retryStopwatch.elapsed(TimeUnit.NANOSECONDS);
|
||||||
}
|
|
||||||
logger.log(XdsLogLevel.INFO, "Retry LRS stream in {0} ns", delayNanos);
|
logger.log(XdsLogLevel.INFO, "Retry LRS stream in {0} ns", delayNanos);
|
||||||
if (delayNanos <= 0) {
|
if (delayNanos <= 0) {
|
||||||
startLrsRpc();
|
startLrsRpc();
|
||||||
|
|
|
||||||
|
|
@ -434,8 +434,15 @@ public class LoadReportClientTest {
|
||||||
// Then breaks the RPC
|
// Then breaks the RPC
|
||||||
responseObserver.onError(Status.UNAVAILABLE.asException());
|
responseObserver.onError(Status.UNAVAILABLE.asException());
|
||||||
|
|
||||||
// Will reset the retry sequence and retry immediately, because balancer has responded.
|
// Will reset the retry sequence and retry after a delay. We want to always delay, to restrict any
|
||||||
|
// accidental closed loop of retries to 1 QPS.
|
||||||
inOrder.verify(backoffPolicyProvider).get();
|
inOrder.verify(backoffPolicyProvider).get();
|
||||||
|
inOrder.verify(backoffPolicy2).nextBackoffNanos();
|
||||||
|
// Fast-forward to a moment before the retry of backoff sequence 2 (2s)
|
||||||
|
fakeClock.forwardNanos(TimeUnit.SECONDS.toNanos(2) - 1);
|
||||||
|
verifyNoMoreInteractions(mockLoadReportingService);
|
||||||
|
// Then time for retry
|
||||||
|
fakeClock.forwardNanos(1);
|
||||||
inOrder.verify(mockLoadReportingService).streamLoadStats(lrsResponseObserverCaptor.capture());
|
inOrder.verify(mockLoadReportingService).streamLoadStats(lrsResponseObserverCaptor.capture());
|
||||||
responseObserver = lrsResponseObserverCaptor.getValue();
|
responseObserver = lrsResponseObserverCaptor.getValue();
|
||||||
assertThat(lrsRequestObservers).hasSize(1);
|
assertThat(lrsRequestObservers).hasSize(1);
|
||||||
|
|
@ -446,12 +453,12 @@ public class LoadReportClientTest {
|
||||||
fakeClock.forwardNanos(4);
|
fakeClock.forwardNanos(4);
|
||||||
responseObserver.onError(Status.UNAVAILABLE.asException());
|
responseObserver.onError(Status.UNAVAILABLE.asException());
|
||||||
|
|
||||||
// Will be on the first retry (2s) of backoff sequence 2.
|
// Will be on the second retry (20s) of backoff sequence 2.
|
||||||
inOrder.verify(backoffPolicy2).nextBackoffNanos();
|
inOrder.verify(backoffPolicy2).nextBackoffNanos();
|
||||||
assertEquals(1, fakeClock.numPendingTasks(LRS_RPC_RETRY_TASK_FILTER));
|
assertEquals(1, fakeClock.numPendingTasks(LRS_RPC_RETRY_TASK_FILTER));
|
||||||
|
|
||||||
// Fast-forward to a moment before the retry, the time spent in the last try is deducted.
|
// Fast-forward to a moment before the retry, the time spent in the last try is deducted.
|
||||||
fakeClock.forwardNanos(TimeUnit.SECONDS.toNanos(2) - 4 - 1);
|
fakeClock.forwardNanos(TimeUnit.SECONDS.toNanos(20) - 4 - 1);
|
||||||
verifyNoMoreInteractions(mockLoadReportingService);
|
verifyNoMoreInteractions(mockLoadReportingService);
|
||||||
// Then time for retry
|
// Then time for retry
|
||||||
fakeClock.forwardNanos(1);
|
fakeClock.forwardNanos(1);
|
||||||
|
|
@ -471,7 +478,8 @@ public class LoadReportClientTest {
|
||||||
ClusterStats clusterStats = Iterables.getOnlyElement(request.getClusterStatsList());
|
ClusterStats clusterStats = Iterables.getOnlyElement(request.getClusterStatsList());
|
||||||
assertThat(clusterStats.getClusterName()).isEqualTo(CLUSTER1);
|
assertThat(clusterStats.getClusterName()).isEqualTo(CLUSTER1);
|
||||||
assertThat(clusterStats.getClusterServiceName()).isEqualTo(EDS_SERVICE_NAME1);
|
assertThat(clusterStats.getClusterServiceName()).isEqualTo(EDS_SERVICE_NAME1);
|
||||||
assertThat(Durations.toSeconds(clusterStats.getLoadReportInterval())).isEqualTo(1L + 10L + 2L);
|
assertThat(Durations.toSeconds(clusterStats.getLoadReportInterval()))
|
||||||
|
.isEqualTo(1L + 10L + 2L + 20L);
|
||||||
assertThat(Iterables.getOnlyElement(clusterStats.getDroppedRequestsList()).getCategory())
|
assertThat(Iterables.getOnlyElement(clusterStats.getDroppedRequestsList()).getCategory())
|
||||||
.isEqualTo("lb");
|
.isEqualTo("lb");
|
||||||
assertThat(Iterables.getOnlyElement(clusterStats.getDroppedRequestsList()).getDroppedCount())
|
assertThat(Iterables.getOnlyElement(clusterStats.getDroppedRequestsList()).getDroppedCount())
|
||||||
|
|
@ -490,7 +498,7 @@ public class LoadReportClientTest {
|
||||||
// Wrapping up
|
// Wrapping up
|
||||||
verify(backoffPolicyProvider, times(2)).get();
|
verify(backoffPolicyProvider, times(2)).get();
|
||||||
verify(backoffPolicy1, times(2)).nextBackoffNanos();
|
verify(backoffPolicy1, times(2)).nextBackoffNanos();
|
||||||
verify(backoffPolicy2, times(1)).nextBackoffNanos();
|
verify(backoffPolicy2, times(2)).nextBackoffNanos();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue