core: add perAttemptRecvTimeout to retry policy (#8301)

This commit is contained in:
ZHANG Dapeng 2021-07-12 14:53:48 -07:00 committed by GitHub
parent 9b55aed12e
commit 4429ec2b78
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 122 additions and 9 deletions

View File

@ -27,11 +27,13 @@ import io.grpc.CallOptions;
import io.grpc.InternalConfigSelector;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.MethodDescriptor;
import io.grpc.Status.Code;
import io.grpc.internal.RetriableStream.Throttle;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
/**
@ -354,9 +356,22 @@ final class ManagedChannelServiceConfig {
"backoffMultiplier must be greater than 0: %s",
backoffMultiplier);
Long perAttemptRecvTimeout =
ServiceConfigUtil.getPerAttemptRecvTimeoutNanosFromRetryPolicy(retryPolicy);
checkArgument(
perAttemptRecvTimeout == null || perAttemptRecvTimeout >= 0,
"perAttemptRecvTimeout cannot be negative: %s",
perAttemptRecvTimeout);
Set<Code> retryableCodes =
ServiceConfigUtil.getRetryableStatusCodesFromRetryPolicy(retryPolicy);
checkArgument(
perAttemptRecvTimeout != null || !retryableCodes.isEmpty(),
"retryableStatusCodes cannot be empty without perAttemptRecvTimeout");
return new RetryPolicy(
maxAttempts, initialBackoffNanos, maxBackoffNanos, backoffMultiplier,
ServiceConfigUtil.getRetryableStatusCodesFromRetryPolicy(retryPolicy));
perAttemptRecvTimeout, retryableCodes);
}
private static HedgingPolicy hedgingPolicy(

View File

@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableSet;
import io.grpc.Status.Code;
import java.util.Set;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;
/**
@ -33,6 +34,8 @@ final class RetryPolicy {
final long initialBackoffNanos;
final long maxBackoffNanos;
final double backoffMultiplier;
@Nullable
final Long perAttemptRecvTimeoutNanos;
final Set<Code> retryableStatusCodes;
/**
@ -44,11 +47,13 @@ final class RetryPolicy {
long initialBackoffNanos,
long maxBackoffNanos,
double backoffMultiplier,
@Nullable Long perAttemptRecvTimeoutNanos,
@Nonnull Set<Code> retryableStatusCodes) {
this.maxAttempts = maxAttempts;
this.initialBackoffNanos = initialBackoffNanos;
this.maxBackoffNanos = maxBackoffNanos;
this.backoffMultiplier = backoffMultiplier;
this.perAttemptRecvTimeoutNanos = perAttemptRecvTimeoutNanos;
this.retryableStatusCodes = ImmutableSet.copyOf(retryableStatusCodes);
}
@ -59,6 +64,7 @@ final class RetryPolicy {
initialBackoffNanos,
maxBackoffNanos,
backoffMultiplier,
perAttemptRecvTimeoutNanos,
retryableStatusCodes);
}
@ -72,6 +78,7 @@ final class RetryPolicy {
&& this.initialBackoffNanos == that.initialBackoffNanos
&& this.maxBackoffNanos == that.maxBackoffNanos
&& Double.compare(this.backoffMultiplier, that.backoffMultiplier) == 0
&& Objects.equal(this.perAttemptRecvTimeoutNanos, that.perAttemptRecvTimeoutNanos)
&& Objects.equal(this.retryableStatusCodes, that.retryableStatusCodes);
}
@ -82,6 +89,7 @@ final class RetryPolicy {
.add("initialBackoffNanos", initialBackoffNanos)
.add("maxBackoffNanos", maxBackoffNanos)
.add("backoffMultiplier", backoffMultiplier)
.add("perAttemptRecvTimeoutNanos", perAttemptRecvTimeoutNanos)
.add("retryableStatusCodes", retryableStatusCodes)
.toString();
}

View File

@ -140,6 +140,11 @@ public final class ServiceConfigUtil {
return JsonUtil.getNumber(retryPolicy, "backoffMultiplier");
}
/**
 * Looks up the {@code perAttemptRecvTimeout} entry of a retry policy and parses it with
 * {@link JsonUtil#getStringAsDuration}.
 *
 * @param retryPolicy the raw retry policy map taken from the service config
 * @return the parsed duration; presumably {@code null} when the key is absent (callers in this
 *     change treat {@code null} as "not configured") -- confirm against {@code JsonUtil}
 */
@Nullable
static Long getPerAttemptRecvTimeoutNanosFromRetryPolicy(Map<String, ?> retryPolicy) {
  String perAttemptRecvTimeoutKey = "perAttemptRecvTimeout";
  Long perAttemptRecvTimeoutNanos =
      JsonUtil.getStringAsDuration(retryPolicy, perAttemptRecvTimeoutKey);
  return perAttemptRecvTimeoutNanos;
}
private static Set<Status.Code> getListOfStatusCodesAsSet(Map<String, ?> obj, String key) {
List<?> statuses = JsonUtil.getList(obj, key);
if (statuses == null) {
@ -178,7 +183,6 @@ public final class ServiceConfigUtil {
String retryableStatusCodesKey = "retryableStatusCodes";
Set<Status.Code> codes = getListOfStatusCodesAsSet(retryPolicy, retryableStatusCodesKey);
verify(codes != null, "%s is required in retry policy", retryableStatusCodesKey);
verify(!codes.isEmpty(), "%s must not be empty", retryableStatusCodesKey);
verify(!codes.contains(Status.Code.OK), "%s must not contain OK", retryableStatusCodesKey);
return codes;
}

View File

@ -20,6 +20,7 @@ import static com.google.common.truth.Truth.assertThat;
import static io.grpc.MethodDescriptor.MethodType.UNARY;
import static io.grpc.Status.Code.UNAVAILABLE;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.junit.Assert.fail;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@ -155,13 +156,14 @@ public class ManagedChannelServiceConfigTest {
"name", ImmutableList.of(name1, name2),
"timeout", "1.234s",
"retryPolicy",
ImmutableMap.of(
"maxAttempts", 3.0D,
"initialBackoff", "1s",
"maxBackoff", "10s",
"backoffMultiplier", 1.5D,
"retryableStatusCodes", ImmutableList.of("UNAVAILABLE")
));
ImmutableMap.builder()
.put("maxAttempts", 3.0D)
.put("initialBackoff", "1s")
.put("maxBackoff", "10s")
.put("backoffMultiplier", 1.5D)
.put("perAttemptRecvTimeout", "2.5s")
.put("retryableStatusCodes", ImmutableList.of("UNAVAILABLE"))
.build());
Map<String, ?> defaultMethodConfig = ImmutableMap.of(
"name", ImmutableList.of(ImmutableMap.of()),
"timeout", "4.321s");
@ -187,6 +189,8 @@ public class ManagedChannelServiceConfigTest {
methodInfo = serviceConfig.getMethodConfig(methodForName("service1", "method1"));
assertThat(methodInfo.timeoutNanos).isEqualTo(MILLISECONDS.toNanos(1234));
assertThat(methodInfo.retryPolicy.maxAttempts).isEqualTo(2);
assertThat(methodInfo.retryPolicy.perAttemptRecvTimeoutNanos)
.isEqualTo(MILLISECONDS.toNanos(2500));
assertThat(methodInfo.retryPolicy.retryableStatusCodes).containsExactly(UNAVAILABLE);
methodInfo = serviceConfig.getMethodConfig(methodForName("service1", "methodX"));
assertThat(methodInfo.timeoutNanos).isEqualTo(MILLISECONDS.toNanos(4321));
@ -212,6 +216,84 @@ public class ManagedChannelServiceConfigTest {
.isEqualTo(serviceConfig.getMethodConfig(method));
}
/**
 * A retry policy whose {@code retryableStatusCodes} list is empty must still parse when
 * {@code perAttemptRecvTimeout} is supplied.
 */
@Test
public void retryConfig_emptyRetriableStatusCodesAllowedWithPerAttemptRecvTimeoutGiven() {
  // Build the policy step by step: timeout present, status code list empty.
  ImmutableMap.Builder<String, Object> policyBuilder = ImmutableMap.builder();
  policyBuilder.put("maxAttempts", 3.0D);
  policyBuilder.put("initialBackoff", "1s");
  policyBuilder.put("maxBackoff", "10s");
  policyBuilder.put("backoffMultiplier", 1.5D);
  policyBuilder.put("perAttemptRecvTimeout", "2.5s");
  policyBuilder.put("retryableStatusCodes", ImmutableList.of());
  Map<String, ?> policy = policyBuilder.build();

  // Wrap the policy in a single method config inside the raw service config.
  Map<String, ?> methodCfg = ImmutableMap.of(
      "name", ImmutableList.of(ImmutableMap.of()), "retryPolicy", policy);
  Map<String, ?> rawCfg = ImmutableMap.of("methodConfig", ImmutableList.of(methodCfg));

  Object parsed = ManagedChannelServiceConfig.fromServiceConfig(rawCfg, true, 5, 5, null);
  assertThat(parsed).isNotNull();
}
/**
 * A retry policy that omits {@code perAttemptRecvTimeout} must still parse as long as
 * {@code retryableStatusCodes} lists at least one code.
 */
@Test
public void retryConfig_PerAttemptRecvTimeoutUnsetAllowedIfRetryableStatusCodesNonempty() {
  // Build the policy step by step: no timeout entry, one retryable status code.
  ImmutableMap.Builder<String, Object> policyBuilder = ImmutableMap.builder();
  policyBuilder.put("maxAttempts", 3.0D);
  policyBuilder.put("initialBackoff", "1s");
  policyBuilder.put("maxBackoff", "10s");
  policyBuilder.put("backoffMultiplier", 1.5D);
  policyBuilder.put("retryableStatusCodes", ImmutableList.of("UNAVAILABLE"));
  Map<String, ?> policy = policyBuilder.build();

  // Wrap the policy in a single method config inside the raw service config.
  Map<String, ?> methodCfg = ImmutableMap.of(
      "name", ImmutableList.of(ImmutableMap.of()), "retryPolicy", policy);
  Map<String, ?> rawCfg = ImmutableMap.of("methodConfig", ImmutableList.of(methodCfg));

  Object parsed = ManagedChannelServiceConfig.fromServiceConfig(rawCfg, true, 5, 5, null);
  assertThat(parsed).isNotNull();
}
/**
 * A retry policy with neither a {@code perAttemptRecvTimeout} nor any retryable status code
 * must be rejected with an {@link IllegalArgumentException} carrying the expected message.
 */
@Test
public void retryConfig_emptyRetriableStatusCodesNotAllowedWithPerAttemptRecvTimeoutUnset() {
  // Build the policy step by step: no timeout entry and an empty status code list.
  ImmutableMap.Builder<String, Object> policyBuilder = ImmutableMap.builder();
  policyBuilder.put("maxAttempts", 3.0D);
  policyBuilder.put("initialBackoff", "1s");
  policyBuilder.put("maxBackoff", "10s");
  policyBuilder.put("backoffMultiplier", 1.5D);
  policyBuilder.put("retryableStatusCodes", ImmutableList.of());
  Map<String, ?> policy = policyBuilder.build();

  // Wrap the policy in a single method config inside the raw service config.
  Map<String, ?> methodCfg = ImmutableMap.of(
      "name", ImmutableList.of(ImmutableMap.of()), "retryPolicy", policy);
  Map<String, ?> rawCfg = ImmutableMap.of("methodConfig", ImmutableList.of(methodCfg));

  // Capture the rejection first, then verify it outside the try block.
  IllegalArgumentException rejection = null;
  try {
    ManagedChannelServiceConfig.fromServiceConfig(rawCfg, true, 5, 5, null);
  } catch (IllegalArgumentException e) {
    rejection = e;
  }
  if (rejection == null) {
    fail("The expected IllegalArgumentException is not thrown");
  }
  assertThat(rejection).hasMessageThat().contains(
      "retryableStatusCodes cannot be empty without perAttemptRecvTimeout");
}
// A perAttemptRecvTimeout of zero is not meaningful, but it is accepted for the time being.
// TODO(zdapeng): disallow zero perAttemptRecvTimeout if hedging is not enabled once we support
//     hedge_on_per_try_timeout.
@Test
public void retryConfig_AllowPerAttemptRecvTimeoutZero() {
  // Build the policy step by step: zero timeout, empty status code list.
  ImmutableMap.Builder<String, Object> policyBuilder = ImmutableMap.builder();
  policyBuilder.put("maxAttempts", 3.0D);
  policyBuilder.put("initialBackoff", "1s");
  policyBuilder.put("maxBackoff", "10s");
  policyBuilder.put("backoffMultiplier", 1.5D);
  policyBuilder.put("perAttemptRecvTimeout", "0s");
  policyBuilder.put("retryableStatusCodes", ImmutableList.of());
  Map<String, ?> policy = policyBuilder.build();

  // Wrap the policy in a single method config inside the raw service config.
  Map<String, ?> methodCfg = ImmutableMap.of(
      "name", ImmutableList.of(ImmutableMap.of()), "retryPolicy", policy);
  Map<String, ?> rawCfg = ImmutableMap.of("methodConfig", ImmutableList.of(methodCfg));

  Object parsed = ManagedChannelServiceConfig.fromServiceConfig(rawCfg, true, 5, 5, null);
  assertThat(parsed).isNotNull();
}
private static MethodDescriptor<?, ?> methodForName(String service, String method) {
return MethodDescriptor.<Void, Void>newBuilder()
.setFullMethodName(service + "/" + method)

View File

@ -120,6 +120,7 @@ public class RetriableStreamTest {
TimeUnit.SECONDS.toNanos(INITIAL_BACKOFF_IN_SECONDS),
TimeUnit.SECONDS.toNanos(MAX_BACKOFF_IN_SECONDS),
BACKOFF_MULTIPLIER,
null,
ImmutableSet.of(RETRIABLE_STATUS_CODE_1, RETRIABLE_STATUS_CODE_2));
private static final HedgingPolicy HEDGING_POLICY =
new HedgingPolicy(

View File

@ -78,6 +78,7 @@ public class RetryPolicyTest {
TimeUnit.MILLISECONDS.toNanos(2100),
TimeUnit.MILLISECONDS.toNanos(2200),
parseDouble("3"),
null,
ImmutableSet.of(Code.UNAVAILABLE, Code.RESOURCE_EXHAUSTED)));
method = builder.setFullMethodName("SimpleService1/Foo1").build();
@ -87,6 +88,7 @@ public class RetryPolicyTest {
TimeUnit.MILLISECONDS.toNanos(100),
TimeUnit.MILLISECONDS.toNanos(1000),
parseDouble("2"),
null,
ImmutableSet.of(Code.UNAVAILABLE)));
method = builder.setFullMethodName("SimpleService2/not_exist").build();
@ -99,6 +101,7 @@ public class RetryPolicyTest {
TimeUnit.MILLISECONDS.toNanos(100),
TimeUnit.MILLISECONDS.toNanos(1000),
parseDouble("2"),
null,
ImmutableSet.of(Code.UNAVAILABLE)));
} finally {
if (reader != null) {