From 4429ec2b7843d59c39d9ec4027857e89ad4c32f8 Mon Sep 17 00:00:00 2001 From: ZHANG Dapeng Date: Mon, 12 Jul 2021 14:53:48 -0700 Subject: [PATCH] core: add perAttemptRecvTimeout to retry policy (#8301) --- .../internal/ManagedChannelServiceConfig.java | 17 +++- .../java/io/grpc/internal/RetryPolicy.java | 8 ++ .../io/grpc/internal/ServiceConfigUtil.java | 6 +- .../ManagedChannelServiceConfigTest.java | 96 +++++++++++++++++-- .../io/grpc/internal/RetriableStreamTest.java | 1 + .../io/grpc/internal/RetryPolicyTest.java | 3 + 6 files changed, 122 insertions(+), 9 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelServiceConfig.java b/core/src/main/java/io/grpc/internal/ManagedChannelServiceConfig.java index 7e6768f36e..4fdf3bf6cf 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelServiceConfig.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelServiceConfig.java @@ -27,11 +27,13 @@ import io.grpc.CallOptions; import io.grpc.InternalConfigSelector; import io.grpc.LoadBalancer.PickSubchannelArgs; import io.grpc.MethodDescriptor; +import io.grpc.Status.Code; import io.grpc.internal.RetriableStream.Throttle; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import javax.annotation.Nullable; /** @@ -354,9 +356,22 @@ final class ManagedChannelServiceConfig { "backoffMultiplier must be greater than 0: %s", backoffMultiplier); + Long perAttemptRecvTimeout = + ServiceConfigUtil.getPerAttemptRecvTimeoutNanosFromRetryPolicy(retryPolicy); + checkArgument( + perAttemptRecvTimeout == null || perAttemptRecvTimeout >= 0, + "perAttemptRecvTimeout cannot be negative: %s", + perAttemptRecvTimeout); + + Set retryableCodes = + ServiceConfigUtil.getRetryableStatusCodesFromRetryPolicy(retryPolicy); + checkArgument( + perAttemptRecvTimeout != null || !retryableCodes.isEmpty(), + "retryableStatusCodes cannot be empty without perAttemptRecvTimeout"); + return new RetryPolicy( maxAttempts, initialBackoffNanos, maxBackoffNanos, backoffMultiplier, - ServiceConfigUtil.getRetryableStatusCodesFromRetryPolicy(retryPolicy)); + perAttemptRecvTimeout, retryableCodes); } private static HedgingPolicy hedgingPolicy( diff --git a/core/src/main/java/io/grpc/internal/RetryPolicy.java b/core/src/main/java/io/grpc/internal/RetryPolicy.java index 36ab3a0ca2..84267bee43 100644 --- a/core/src/main/java/io/grpc/internal/RetryPolicy.java +++ b/core/src/main/java/io/grpc/internal/RetryPolicy.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableSet; import io.grpc.Status.Code; import java.util.Set; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import javax.annotation.concurrent.Immutable; /** @@ -33,6 +34,8 @@ final class RetryPolicy { final long initialBackoffNanos; final long maxBackoffNanos; final double backoffMultiplier; + @Nullable + final Long perAttemptRecvTimeoutNanos; final Set retryableStatusCodes; /** @@ -44,11 +47,13 @@ final class RetryPolicy { long initialBackoffNanos, long maxBackoffNanos, double backoffMultiplier, + @Nullable Long perAttemptRecvTimeoutNanos, @Nonnull Set retryableStatusCodes) { this.maxAttempts = maxAttempts; this.initialBackoffNanos = initialBackoffNanos; this.maxBackoffNanos = maxBackoffNanos; this.backoffMultiplier = backoffMultiplier; + this.perAttemptRecvTimeoutNanos = perAttemptRecvTimeoutNanos; this.retryableStatusCodes = ImmutableSet.copyOf(retryableStatusCodes); } @@ -59,6 +64,7 @@ final class RetryPolicy { initialBackoffNanos, maxBackoffNanos, backoffMultiplier, + perAttemptRecvTimeoutNanos, retryableStatusCodes); } @@ -72,6 +78,7 @@ final class RetryPolicy { && this.initialBackoffNanos == that.initialBackoffNanos && this.maxBackoffNanos == that.maxBackoffNanos && Double.compare(this.backoffMultiplier, that.backoffMultiplier) == 0 + && Objects.equal(this.perAttemptRecvTimeoutNanos, that.perAttemptRecvTimeoutNanos) && Objects.equal(this.retryableStatusCodes, that.retryableStatusCodes); } @@ -82,6 +89,7 @@ final class RetryPolicy { .add("initialBackoffNanos", initialBackoffNanos) .add("maxBackoffNanos", maxBackoffNanos) .add("backoffMultiplier", backoffMultiplier) + .add("perAttemptRecvTimeoutNanos", perAttemptRecvTimeoutNanos) .add("retryableStatusCodes", retryableStatusCodes) .toString(); } diff --git a/core/src/main/java/io/grpc/internal/ServiceConfigUtil.java b/core/src/main/java/io/grpc/internal/ServiceConfigUtil.java index e6df6240d1..294e9dbcf7 100644 --- a/core/src/main/java/io/grpc/internal/ServiceConfigUtil.java +++ b/core/src/main/java/io/grpc/internal/ServiceConfigUtil.java @@ -140,6 +140,11 @@ public final class ServiceConfigUtil { return JsonUtil.getNumber(retryPolicy, "backoffMultiplier"); } + @Nullable + static Long getPerAttemptRecvTimeoutNanosFromRetryPolicy(Map retryPolicy) { + return JsonUtil.getStringAsDuration(retryPolicy, "perAttemptRecvTimeout"); + } + private static Set getListOfStatusCodesAsSet(Map obj, String key) { List statuses = JsonUtil.getList(obj, key); if (statuses == null) { @@ -178,7 +183,6 @@ public final class ServiceConfigUtil { String retryableStatusCodesKey = "retryableStatusCodes"; Set codes = getListOfStatusCodesAsSet(retryPolicy, retryableStatusCodesKey); verify(codes != null, "%s is required in retry policy", retryableStatusCodesKey); - verify(!codes.isEmpty(), "%s must not be empty", retryableStatusCodesKey); verify(!codes.contains(Status.Code.OK), "%s must not contain OK", retryableStatusCodesKey); return codes; } diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelServiceConfigTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelServiceConfigTest.java index 4507150433..709f6274de 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelServiceConfigTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelServiceConfigTest.java @@ -20,6 +20,7 @@ import static com.google.common.truth.Truth.assertThat; import static io.grpc.MethodDescriptor.MethodType.UNARY; import static io.grpc.Status.Code.UNAVAILABLE; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.junit.Assert.fail; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -155,13 +156,14 @@ public class ManagedChannelServiceConfigTest { "name", ImmutableList.of(name1, name2), "timeout", "1.234s", "retryPolicy", - ImmutableMap.of( - "maxAttempts", 3.0D, - "initialBackoff", "1s", - "maxBackoff", "10s", - "backoffMultiplier", 1.5D, - "retryableStatusCodes", ImmutableList.of("UNAVAILABLE") - )); + ImmutableMap.builder() + .put("maxAttempts", 3.0D) + .put("initialBackoff", "1s") + .put("maxBackoff", "10s") + .put("backoffMultiplier", 1.5D) + .put("perAttemptRecvTimeout", "2.5s") + .put("retryableStatusCodes", ImmutableList.of("UNAVAILABLE")) + .build()); Map defaultMethodConfig = ImmutableMap.of( "name", ImmutableList.of(ImmutableMap.of()), "timeout", "4.321s"); @@ -187,6 +189,8 @@ public class ManagedChannelServiceConfigTest { methodInfo = serviceConfig.getMethodConfig(methodForName("service1", "method1")); assertThat(methodInfo.timeoutNanos).isEqualTo(MILLISECONDS.toNanos(1234)); assertThat(methodInfo.retryPolicy.maxAttempts).isEqualTo(2); + assertThat(methodInfo.retryPolicy.perAttemptRecvTimeoutNanos) + .isEqualTo(MILLISECONDS.toNanos(2500)); assertThat(methodInfo.retryPolicy.retryableStatusCodes).containsExactly(UNAVAILABLE); methodInfo = serviceConfig.getMethodConfig(methodForName("service1", "methodX")); assertThat(methodInfo.timeoutNanos).isEqualTo(MILLISECONDS.toNanos(4321)); @@ -212,6 +216,84 @@ public class ManagedChannelServiceConfigTest { .isEqualTo(serviceConfig.getMethodConfig(method)); } + @Test + public void retryConfig_emptyRetriableStatusCodesAllowedWithPerAttemptRecvTimeoutGiven() { + Map retryPolicy = ImmutableMap.builder() + .put("maxAttempts", 3.0D) + .put("initialBackoff", "1s") + .put("maxBackoff", "10s") + .put("backoffMultiplier", 1.5D) + .put("perAttemptRecvTimeout", "2.5s") + .put("retryableStatusCodes", ImmutableList.of()) + .build(); + Map methodConfig = ImmutableMap.of( + "name", ImmutableList.of(ImmutableMap.of()), "retryPolicy", retryPolicy); + Map rawServiceConfig = + ImmutableMap.of("methodConfig", ImmutableList.of(methodConfig)); + assertThat(ManagedChannelServiceConfig.fromServiceConfig(rawServiceConfig, true, 5, 5, null)) + .isNotNull(); + } + + @Test + public void retryConfig_PerAttemptRecvTimeoutUnsetAllowedIfRetryableStatusCodesNonempty() { + Map retryPolicy = ImmutableMap.builder() + .put("maxAttempts", 3.0D) + .put("initialBackoff", "1s") + .put("maxBackoff", "10s") + .put("backoffMultiplier", 1.5D) + .put("retryableStatusCodes", ImmutableList.of("UNAVAILABLE")) + .build(); + Map methodConfig = ImmutableMap.of( + "name", ImmutableList.of(ImmutableMap.of()), "retryPolicy", retryPolicy); + Map rawServiceConfig = + ImmutableMap.of("methodConfig", ImmutableList.of(methodConfig)); + assertThat(ManagedChannelServiceConfig.fromServiceConfig(rawServiceConfig, true, 5, 5, null)) + .isNotNull(); + } + + @Test + public void retryConfig_emptyRetriableStatusCodesNotAllowedWithPerAttemptRecvTimeoutUnset() { + Map retryPolicy = ImmutableMap.builder() + .put("maxAttempts", 3.0D) + .put("initialBackoff", "1s") + .put("maxBackoff", "10s") + .put("backoffMultiplier", 1.5D) + .put("retryableStatusCodes", ImmutableList.of()) + .build(); + Map methodConfig = ImmutableMap.of( + "name", ImmutableList.of(ImmutableMap.of()), "retryPolicy", retryPolicy); + Map rawServiceConfig = + ImmutableMap.of("methodConfig", ImmutableList.of(methodConfig)); + try { + ManagedChannelServiceConfig.fromServiceConfig(rawServiceConfig, true, 5, 5, null); + fail("The expected IllegalArgumentException is not thrown"); + } catch (IllegalArgumentException e) { + assertThat(e).hasMessageThat().contains( + "retryableStatusCodes cannot be empty without perAttemptRecvTimeout"); + } + } + + // For now we allow perAttemptRecvTimeout being zero although it does not make sense. + // TODO(zdapeng): disallow zero perAttemptRecvTimeout if hedging is not enabled once we support + // hedge_on_per_try_timeout. + @Test + public void retryConfig_AllowPerAttemptRecvTimeoutZero() { + Map retryPolicy = ImmutableMap.builder() + .put("maxAttempts", 3.0D) + .put("initialBackoff", "1s") + .put("maxBackoff", "10s") + .put("backoffMultiplier", 1.5D) + .put("perAttemptRecvTimeout", "0s") + .put("retryableStatusCodes", ImmutableList.of()) + .build(); + Map methodConfig = ImmutableMap.of( + "name", ImmutableList.of(ImmutableMap.of()), "retryPolicy", retryPolicy); + Map rawServiceConfig = + ImmutableMap.of("methodConfig", ImmutableList.of(methodConfig)); + assertThat(ManagedChannelServiceConfig.fromServiceConfig(rawServiceConfig, true, 5, 5, null)) + .isNotNull(); + } + private static MethodDescriptor methodForName(String service, String method) { return MethodDescriptor.newBuilder() .setFullMethodName(service + "/" + method) diff --git a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java index af78392787..a83964b5e9 100644 --- a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java +++ b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java @@ -120,6 +120,7 @@ public class RetriableStreamTest { TimeUnit.SECONDS.toNanos(INITIAL_BACKOFF_IN_SECONDS), TimeUnit.SECONDS.toNanos(MAX_BACKOFF_IN_SECONDS), BACKOFF_MULTIPLIER, + null, ImmutableSet.of(RETRIABLE_STATUS_CODE_1, RETRIABLE_STATUS_CODE_2)); private static final HedgingPolicy HEDGING_POLICY = new HedgingPolicy( diff --git a/core/src/test/java/io/grpc/internal/RetryPolicyTest.java b/core/src/test/java/io/grpc/internal/RetryPolicyTest.java index 5ff96df539..51d196c20b 100644 --- a/core/src/test/java/io/grpc/internal/RetryPolicyTest.java +++ b/core/src/test/java/io/grpc/internal/RetryPolicyTest.java @@ -78,6 +78,7 @@ public class RetryPolicyTest { TimeUnit.MILLISECONDS.toNanos(2100), TimeUnit.MILLISECONDS.toNanos(2200), parseDouble("3"), + null, ImmutableSet.of(Code.UNAVAILABLE, Code.RESOURCE_EXHAUSTED))); method = builder.setFullMethodName("SimpleService1/Foo1").build(); @@ -87,6 +88,7 @@ public class RetryPolicyTest { TimeUnit.MILLISECONDS.toNanos(100), TimeUnit.MILLISECONDS.toNanos(1000), parseDouble("2"), + null, ImmutableSet.of(Code.UNAVAILABLE))); method = builder.setFullMethodName("SimpleService2/not_exist").build(); @@ -99,6 +101,7 @@ public class RetryPolicyTest { TimeUnit.MILLISECONDS.toNanos(100), TimeUnit.MILLISECONDS.toNanos(1000), parseDouble("2"), + null, ImmutableSet.of(Code.UNAVAILABLE))); } finally { if (reader != null) {