From 3c12e3af1ac433a5ceac8fa68ab8f39b8f5954eb Mon Sep 17 00:00:00 2001 From: Yuriy Holinko Date: Tue, 25 Mar 2025 18:20:12 +0200 Subject: [PATCH] Refine delay jitter for exponential backoff (#7206) --- .../sender/jdk/internal/JdkHttpSender.java | 23 +++---- .../okhttp/internal/RetryInterceptor.java | 64 +++++++++---------- .../okhttp/internal/RetryInterceptorTest.java | 64 +++++++++++++------ 3 files changed, 84 insertions(+), 67 deletions(-) diff --git a/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java b/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java index 20d219edaf..312149728a 100644 --- a/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java +++ b/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java @@ -213,9 +213,11 @@ public final class JdkHttpSender implements HttpSender { do { if (attempt > 0) { // Compute and sleep for backoff - long upperBoundNanos = Math.min(nextBackoffNanos, retryPolicy.getMaxBackoff().toNanos()); - long backoffNanos = ThreadLocalRandom.current().nextLong(upperBoundNanos); - nextBackoffNanos = (long) (nextBackoffNanos * retryPolicy.getBackoffMultiplier()); + long currentBackoffNanos = + Math.min(nextBackoffNanos, retryPolicy.getMaxBackoff().toNanos()); + long backoffNanos = + (long) (ThreadLocalRandom.current().nextDouble(0.8d, 1.2d) * currentBackoffNanos); + nextBackoffNanos = (long) (currentBackoffNanos * retryPolicy.getBackoffMultiplier()); try { TimeUnit.NANOSECONDS.sleep(backoffNanos); } catch (InterruptedException e) { @@ -227,16 +229,11 @@ public final class JdkHttpSender implements HttpSender { break; } } - - attempt++; + httpResponse = null; + exception = null; requestBuilder.timeout(Duration.ofNanos(timeoutNanos - (System.nanoTime() - startTimeNanos))); try { httpResponse = sendRequest(requestBuilder, byteBufferPool); - } catch (IOException e) { - exception = e; - } - - if (httpResponse != null) { boolean retryable = retryableStatusCodes.contains(httpResponse.statusCode()); if (logger.isLoggable(Level.FINER)) { logger.log( @@ -251,8 +248,8 @@ public final class JdkHttpSender implements HttpSender { if (!retryable) { return httpResponse; } - } - if (exception != null) { + } catch (IOException e) { + exception = e; boolean retryable = retryExceptionPredicate.test(exception); if (logger.isLoggable(Level.FINER)) { logger.log( @@ -268,7 +265,7 @@ public final class JdkHttpSender implements HttpSender { throw exception; } } - } while (attempt < retryPolicy.getMaxAttempts()); + } while (++attempt < retryPolicy.getMaxAttempts()); if (httpResponse != null) { return httpResponse; diff --git a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/RetryInterceptor.java b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/RetryInterceptor.java index 7c3a3bfab8..988c8277c2 100644 --- a/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/RetryInterceptor.java +++ b/exporters/sender/okhttp/src/main/java/io/opentelemetry/exporter/sender/okhttp/internal/RetryInterceptor.java @@ -18,6 +18,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.Predicate; +import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.Logger; import okhttp3.Interceptor; @@ -37,7 +38,7 @@ public final class RetryInterceptor implements Interceptor { private final Function isRetryable; private final Predicate retryExceptionPredicate; private final Sleeper sleeper; - private final BoundedLongGenerator randomLong; + private final Supplier randomJitter; /** Constructs a new retrier. */ public RetryInterceptor(RetryPolicy retryPolicy, Function isRetryable) { @@ -48,7 +49,7 @@ public final class RetryInterceptor implements Interceptor { ? RetryInterceptor::isRetryableException : retryPolicy.getRetryExceptionPredicate(), TimeUnit.NANOSECONDS::sleep, - bound -> ThreadLocalRandom.current().nextLong(bound)); + () -> ThreadLocalRandom.current().nextDouble(0.8d, 1.2d)); } // Visible for testing @@ -57,12 +58,12 @@ public final class RetryInterceptor implements Interceptor { Function isRetryable, Predicate retryExceptionPredicate, Sleeper sleeper, - BoundedLongGenerator randomLong) { + Supplier randomJitter) { this.retryPolicy = retryPolicy; this.isRetryable = isRetryable; this.retryExceptionPredicate = retryExceptionPredicate; this.sleeper = sleeper; - this.randomLong = randomLong; + this.randomJitter = randomJitter; } @Override @@ -75,9 +76,10 @@ public final class RetryInterceptor implements Interceptor { if (attempt > 0) { // Compute and sleep for backoff // https://github.com/grpc/proposal/blob/master/A6-client-retries.md#exponential-backoff - long upperBoundNanos = Math.min(nextBackoffNanos, retryPolicy.getMaxBackoff().toNanos()); - long backoffNanos = randomLong.get(upperBoundNanos); - nextBackoffNanos = (long) (nextBackoffNanos * retryPolicy.getBackoffMultiplier()); + long currentBackoffNanos = + Math.min(nextBackoffNanos, retryPolicy.getMaxBackoff().toNanos()); + long backoffNanos = (long) (randomJitter.get() * currentBackoffNanos); + nextBackoffNanos = (long) (currentBackoffNanos * retryPolicy.getBackoffMultiplier()); try { sleeper.sleep(backoffNanos); } catch (InterruptedException e) { @@ -88,31 +90,31 @@ public final class RetryInterceptor implements Interceptor { if (response != null) { response.close(); } + exception = null; } - - attempt++; try { response = chain.proceed(chain.request()); + if (response != null) { + boolean retryable = Boolean.TRUE.equals(isRetryable.apply(response)); + if (logger.isLoggable(Level.FINER)) { + logger.log( + Level.FINER, + "Attempt " + + attempt + + " returned " + + (retryable ? "retryable" : "non-retryable") + + " response: " + + responseStringRepresentation(response)); + } + if (!retryable) { + return response; + } + } else { + throw new NullPointerException("response cannot be null."); + } } catch (IOException e) { exception = e; - } - if (response != null) { - boolean retryable = Boolean.TRUE.equals(isRetryable.apply(response)); - if (logger.isLoggable(Level.FINER)) { - logger.log( - Level.FINER, - "Attempt " - + attempt - + " returned " - + (retryable ? "retryable" : "non-retryable") - + " response: " - + responseStringRepresentation(response)); - } - if (!retryable) { - return response; - } - } - if (exception != null) { + response = null; boolean retryable = retryExceptionPredicate.test(exception); if (logger.isLoggable(Level.FINER)) { logger.log( @@ -128,8 +130,7 @@ public final class RetryInterceptor implements Interceptor { throw exception; } } - - } while (attempt < retryPolicy.getMaxAttempts()); + } while (++attempt < retryPolicy.getMaxAttempts()); if (response != null) { return response; @@ -172,11 +173,6 @@ public final class RetryInterceptor implements Interceptor { return false; } - // Visible for testing - interface BoundedLongGenerator { - long get(long bound); - } - // Visible for testing interface Sleeper { void sleep(long delayNanos) throws InterruptedException; diff --git a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/RetryInterceptorTest.java b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/RetryInterceptorTest.java index 3ba8368335..ca2ad7d596 100644 --- a/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/RetryInterceptorTest.java +++ b/exporters/sender/okhttp/src/test/java/io/opentelemetry/exporter/sender/okhttp/internal/RetryInterceptorTest.java @@ -9,8 +9,9 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; @@ -32,9 +33,11 @@ import java.net.UnknownHostException; import java.time.Duration; import java.util.concurrent.TimeUnit; import java.util.function.Predicate; +import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Stream; +import okhttp3.Interceptor; import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.Response; @@ -47,7 +50,9 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mock; +import org.mockito.invocation.InvocationOnMock; import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.stubbing.Answer; @ExtendWith(MockitoExtension.class) class RetryInterceptorTest { @@ -55,7 +60,7 @@ class RetryInterceptorTest { @RegisterExtension static final MockWebServerExtension server = new MockWebServerExtension(); @Mock private RetryInterceptor.Sleeper sleeper; - @Mock private RetryInterceptor.BoundedLongGenerator random; + @Mock private Supplier random; private Predicate retryExceptionPredicate; private RetryInterceptor retrier; @@ -91,6 +96,24 @@ class RetryInterceptorTest { client = new OkHttpClient.Builder().addInterceptor(retrier).build(); } + @Test + void noRetryOnNullResponse() throws IOException { + Interceptor.Chain chain = mock(Interceptor.Chain.class); + when(chain.proceed(any())).thenReturn(null); + when(chain.request()) + .thenReturn(new Request.Builder().url(server.httpUri().toString()).build()); + assertThatThrownBy( + () -> { + retrier.intercept(chain); + }) + .isInstanceOf(NullPointerException.class) + .hasMessage("response cannot be null."); + + verifyNoInteractions(retryExceptionPredicate); + verifyNoInteractions(random); + verifyNoInteractions(sleeper); + } + @Test void noRetry() throws Exception { server.enqueue(HttpResponse.of(HttpStatus.OK)); @@ -109,17 +132,8 @@ class RetryInterceptorTest { @ValueSource(ints = {5, 6}) void backsOff(int attempts) throws Exception { succeedOnAttempt(attempts); - - // Will backoff 4 times - when(random.get((long) (TimeUnit.SECONDS.toNanos(1) * Math.pow(1.6, 0)))).thenReturn(100L); - when(random.get((long) (TimeUnit.SECONDS.toNanos(1) * Math.pow(1.6, 1)))).thenReturn(50L); - // Capped - when(random.get(TimeUnit.SECONDS.toNanos(2))).thenReturn(500L).thenReturn(510L); - - doNothing().when(sleeper).sleep(100); - doNothing().when(sleeper).sleep(50); - doNothing().when(sleeper).sleep(500); - doNothing().when(sleeper).sleep(510); + when(random.get()).thenReturn(1.0d); + doNothing().when(sleeper).sleep(anyLong()); try (Response response = sendRequest()) { if (attempts <= 5) { @@ -139,16 +153,26 @@ class RetryInterceptorTest { succeedOnAttempt(5); // Backs off twice, second is interrupted - when(random.get((long) (TimeUnit.SECONDS.toNanos(1) * Math.pow(1.6, 0)))).thenReturn(100L); - when(random.get((long) (TimeUnit.SECONDS.toNanos(1) * Math.pow(1.6, 1)))).thenReturn(50L); + when(random.get()).thenReturn(1.0d).thenReturn(1.0d); + doAnswer( + new Answer() { + int counter = 0; - doNothing().when(sleeper).sleep(100); - doThrow(new InterruptedException()).when(sleeper).sleep(50); + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + if (counter++ == 1) { + throw new InterruptedException(); + } + return null; + } + }) + .when(sleeper) + .sleep(anyLong()); try (Response response = sendRequest()) { assertThat(response.isSuccessful()).isFalse(); } - + verify(sleeper, times(2)).sleep(anyLong()); for (int i = 0; i < 2; i++) { server.takeRequest(0, TimeUnit.NANOSECONDS); } @@ -157,7 +181,7 @@ class RetryInterceptorTest { @Test void connectTimeout() throws Exception { client = connectTimeoutClient(); - when(random.get(anyLong())).thenReturn(1L); + when(random.get()).thenReturn(1.0d); doNothing().when(sleeper).sleep(anyLong()); // Connecting to a non-routable IP address to trigger connection error @@ -174,7 +198,7 @@ class RetryInterceptorTest { @Test void connectException() throws Exception { client = connectTimeoutClient(); - when(random.get(anyLong())).thenReturn(1L); + when(random.get()).thenReturn(1.0d); doNothing().when(sleeper).sleep(anyLong()); // Connecting to localhost on an unused port address to trigger java.net.ConnectException