mirror of https://github.com/grpc/grpc-java.git
core: retry part 5, add RetryPolicy data object
This commit is contained in:
parent
722d6f0bea
commit
dc95465f6a
|
|
@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkState;
|
||||||
import static io.grpc.ConnectivityState.IDLE;
|
import static io.grpc.ConnectivityState.IDLE;
|
||||||
import static io.grpc.ConnectivityState.SHUTDOWN;
|
import static io.grpc.ConnectivityState.SHUTDOWN;
|
||||||
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
|
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
|
||||||
|
import static io.grpc.internal.RetriableStream.RetryPolicy.DEFAULT;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Stopwatch;
|
import com.google.common.base.Stopwatch;
|
||||||
|
|
@ -53,6 +54,7 @@ import io.grpc.Status;
|
||||||
import io.grpc.internal.Channelz.ChannelStats;
|
import io.grpc.internal.Channelz.ChannelStats;
|
||||||
import io.grpc.internal.ClientCallImpl.ClientTransportProvider;
|
import io.grpc.internal.ClientCallImpl.ClientTransportProvider;
|
||||||
import io.grpc.internal.RetriableStream.ChannelBufferMeter;
|
import io.grpc.internal.RetriableStream.ChannelBufferMeter;
|
||||||
|
import io.grpc.internal.RetriableStream.RetryPolicy;
|
||||||
import java.lang.ref.Reference;
|
import java.lang.ref.Reference;
|
||||||
import java.lang.ref.ReferenceQueue;
|
import java.lang.ref.ReferenceQueue;
|
||||||
import java.lang.ref.SoftReference;
|
import java.lang.ref.SoftReference;
|
||||||
|
|
@ -75,6 +77,7 @@ import java.util.logging.Level;
|
||||||
import java.util.logging.LogRecord;
|
import java.util.logging.LogRecord;
|
||||||
import java.util.logging.Logger;
|
import java.util.logging.Logger;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
import javax.annotation.Nonnull;
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
import javax.annotation.concurrent.GuardedBy;
|
import javax.annotation.concurrent.GuardedBy;
|
||||||
import javax.annotation.concurrent.ThreadSafe;
|
import javax.annotation.concurrent.ThreadSafe;
|
||||||
|
|
@ -205,6 +208,10 @@ public final class ManagedChannelImpl extends ManagedChannel implements Instrume
|
||||||
private final long perRpcBufferLimit;
|
private final long perRpcBufferLimit;
|
||||||
private final long channelBufferLimit;
|
private final long channelBufferLimit;
|
||||||
|
|
||||||
|
private RetryPolicies retryPolicies;
|
||||||
|
// Temporary false flag that can skip the retry code path.
|
||||||
|
private boolean retryEnabled;
|
||||||
|
|
||||||
// Called from channelExecutor
|
// Called from channelExecutor
|
||||||
private final ManagedClientTransport.Listener delayedTransportListener =
|
private final ManagedClientTransport.Listener delayedTransportListener =
|
||||||
new ManagedClientTransport.Listener() {
|
new ManagedClientTransport.Listener() {
|
||||||
|
|
@ -429,9 +436,11 @@ public final class ManagedChannelImpl extends ManagedChannel implements Instrume
|
||||||
final CallOptions callOptions,
|
final CallOptions callOptions,
|
||||||
final Metadata headers,
|
final Metadata headers,
|
||||||
final Context context) {
|
final Context context) {
|
||||||
|
RetryPolicy retryPolicy = retryPolicies == null ? DEFAULT : retryPolicies.get(method);
|
||||||
return new RetriableStream<ReqT>(
|
return new RetriableStream<ReqT>(
|
||||||
method, headers, channelBufferUsed, perRpcBufferLimit, channelBufferLimit,
|
method, headers, channelBufferUsed, perRpcBufferLimit, channelBufferLimit,
|
||||||
getCallExecutor(callOptions), transportFactory.getScheduledExecutorService()) {
|
getCallExecutor(callOptions), transportFactory.getScheduledExecutorService(),
|
||||||
|
retryPolicy) {
|
||||||
@Override
|
@Override
|
||||||
Status prestart() {
|
Status prestart() {
|
||||||
return uncommittedRetriableStreamsRegistry.add(this);
|
return uncommittedRetriableStreamsRegistry.add(this);
|
||||||
|
|
@ -1033,6 +1042,18 @@ public final class ManagedChannelImpl extends ManagedChannel implements Instrume
|
||||||
if (NameResolverListenerImpl.this.helper != ManagedChannelImpl.this.lbHelper) {
|
if (NameResolverListenerImpl.this.helper != ManagedChannelImpl.this.lbHelper) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
if (retryEnabled) {
|
||||||
|
retryPolicies = getRetryPolicies(config);
|
||||||
|
}
|
||||||
|
} catch (RuntimeException re) {
|
||||||
|
logger.log(
|
||||||
|
Level.WARNING,
|
||||||
|
"[" + getLogId() + "] Unexpected exception from parsing service config",
|
||||||
|
re);
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
balancer.handleResolvedAddressGroups(servers, config);
|
balancer.handleResolvedAddressGroups(servers, config);
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
|
|
@ -1067,6 +1088,16 @@ public final class ManagedChannelImpl extends ManagedChannel implements Instrume
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO(zdapeng): implement it once the Gson dependency issue is resolved.
|
||||||
|
private static RetryPolicies getRetryPolicies(Attributes config) {
|
||||||
|
return new RetryPolicies() {
|
||||||
|
@Override
|
||||||
|
public RetryPolicy get(MethodDescriptor<?, ?> method) {
|
||||||
|
return RetryPolicy.DEFAULT;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
private final class SubchannelImpl extends AbstractSubchannel {
|
private final class SubchannelImpl extends AbstractSubchannel {
|
||||||
// Set right after SubchannelImpl is created.
|
// Set right after SubchannelImpl is created.
|
||||||
InternalSubchannel subchannel;
|
InternalSubchannel subchannel;
|
||||||
|
|
@ -1253,4 +1284,10 @@ public final class ManagedChannelImpl extends ManagedChannel implements Instrume
|
||||||
return e;
|
return e;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
interface RetryPolicies {
|
||||||
|
@Nonnull
|
||||||
|
RetryPolicy get(MethodDescriptor<?, ?> method);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -16,10 +16,12 @@
|
||||||
|
|
||||||
package io.grpc.internal;
|
package io.grpc.internal;
|
||||||
|
|
||||||
|
import static com.google.common.base.Preconditions.checkArgument;
|
||||||
import static com.google.common.base.Preconditions.checkNotNull;
|
import static com.google.common.base.Preconditions.checkNotNull;
|
||||||
import static com.google.common.base.Preconditions.checkState;
|
import static com.google.common.base.Preconditions.checkState;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.google.common.base.Objects;
|
||||||
import io.grpc.Attributes;
|
import io.grpc.Attributes;
|
||||||
import io.grpc.CallOptions;
|
import io.grpc.CallOptions;
|
||||||
import io.grpc.ClientStreamTracer;
|
import io.grpc.ClientStreamTracer;
|
||||||
|
|
@ -43,6 +45,7 @@ import java.util.concurrent.atomic.AtomicLong;
|
||||||
import javax.annotation.CheckReturnValue;
|
import javax.annotation.CheckReturnValue;
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
import javax.annotation.concurrent.GuardedBy;
|
import javax.annotation.concurrent.GuardedBy;
|
||||||
|
import javax.annotation.concurrent.Immutable;
|
||||||
|
|
||||||
/** A logical {@link ClientStream} that is retriable. */
|
/** A logical {@link ClientStream} that is retriable. */
|
||||||
abstract class RetriableStream<ReqT> implements ClientStream {
|
abstract class RetriableStream<ReqT> implements ClientStream {
|
||||||
|
|
@ -58,6 +61,8 @@ abstract class RetriableStream<ReqT> implements ClientStream {
|
||||||
private final ScheduledExecutorService scheduledExecutorService;
|
private final ScheduledExecutorService scheduledExecutorService;
|
||||||
// Must not modify it.
|
// Must not modify it.
|
||||||
private final Metadata headers;
|
private final Metadata headers;
|
||||||
|
// TODO(zdapeng): add and use its business logic
|
||||||
|
private final RetryPolicy retryPolicy;
|
||||||
|
|
||||||
/** Must be held when updating state, accessing state.buffer, or certain substream attributes. */
|
/** Must be held when updating state, accessing state.buffer, or certain substream attributes. */
|
||||||
private final Object lock = new Object();
|
private final Object lock = new Object();
|
||||||
|
|
@ -80,7 +85,8 @@ abstract class RetriableStream<ReqT> implements ClientStream {
|
||||||
RetriableStream(
|
RetriableStream(
|
||||||
MethodDescriptor<ReqT, ?> method, Metadata headers,
|
MethodDescriptor<ReqT, ?> method, Metadata headers,
|
||||||
ChannelBufferMeter channelBufferUsed, long perRpcBufferLimit, long channelBufferLimit,
|
ChannelBufferMeter channelBufferUsed, long perRpcBufferLimit, long channelBufferLimit,
|
||||||
Executor callExecutor, ScheduledExecutorService scheduledExecutorService) {
|
Executor callExecutor, ScheduledExecutorService scheduledExecutorService,
|
||||||
|
RetryPolicy retryPolicy) {
|
||||||
this.method = method;
|
this.method = method;
|
||||||
this.channelBufferUsed = channelBufferUsed;
|
this.channelBufferUsed = channelBufferUsed;
|
||||||
this.perRpcBufferLimit = perRpcBufferLimit;
|
this.perRpcBufferLimit = perRpcBufferLimit;
|
||||||
|
|
@ -88,6 +94,7 @@ abstract class RetriableStream<ReqT> implements ClientStream {
|
||||||
this.callExecutor = callExecutor;
|
this.callExecutor = callExecutor;
|
||||||
this.scheduledExecutorService = scheduledExecutorService;
|
this.scheduledExecutorService = scheduledExecutorService;
|
||||||
this.headers = headers;
|
this.headers = headers;
|
||||||
|
this.retryPolicy = checkNotNull(retryPolicy, "retryPolicy");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Nullable // null if already committed
|
@Nullable // null if already committed
|
||||||
|
|
@ -777,4 +784,57 @@ abstract class RetriableStream<ReqT> implements ClientStream {
|
||||||
return bufferUsed.addAndGet(newBytesUsed);
|
return bufferUsed.addAndGet(newBytesUsed);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Immutable
|
||||||
|
static final class RetryPolicy {
|
||||||
|
private final int maxAttempts;
|
||||||
|
private final double initialBackoffInSeconds;
|
||||||
|
private final double maxBackoffInSeconds;
|
||||||
|
private final double backoffMultiplier;
|
||||||
|
private final Collection<Status.Code> retryableStatusCodes;
|
||||||
|
|
||||||
|
RetryPolicy(
|
||||||
|
int maxAttempts, double initialBackoffInSeconds, double maxBackoffInSeconds,
|
||||||
|
double backoffMultiplier, Collection<Status.Code> retryableStatusCodes) {
|
||||||
|
checkArgument(maxAttempts >= 1, "maxAttempts");
|
||||||
|
this.maxAttempts = maxAttempts;
|
||||||
|
checkArgument(initialBackoffInSeconds >= 0D, "initialBackoffInSeconds");
|
||||||
|
this.initialBackoffInSeconds = initialBackoffInSeconds;
|
||||||
|
checkArgument(
|
||||||
|
maxBackoffInSeconds >= initialBackoffInSeconds,
|
||||||
|
"maxBackoffInSeconds should be at least initialBackoffInSeconds");
|
||||||
|
this.maxBackoffInSeconds = maxBackoffInSeconds;
|
||||||
|
checkArgument(backoffMultiplier > 0D, "backoffMultiplier");
|
||||||
|
this.backoffMultiplier = backoffMultiplier;
|
||||||
|
this.retryableStatusCodes = Collections.unmodifiableSet(
|
||||||
|
new HashSet<Status.Code>(checkNotNull(retryableStatusCodes, "retryableStatusCodes")));
|
||||||
|
}
|
||||||
|
|
||||||
|
/** No retry. */
|
||||||
|
static final RetryPolicy DEFAULT =
|
||||||
|
new RetryPolicy(1, 0, 0, 1, Collections.<Status.Code>emptyList());
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o) {
|
||||||
|
if (this == o) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (!(o instanceof RetryPolicy)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
RetryPolicy that = (RetryPolicy) o;
|
||||||
|
return maxAttempts == that.maxAttempts
|
||||||
|
&& Double.compare(backoffMultiplier, that.backoffMultiplier) == 0
|
||||||
|
&& Double.compare(initialBackoffInSeconds, that.initialBackoffInSeconds) == 0
|
||||||
|
&& Double.compare(maxBackoffInSeconds, that.maxBackoffInSeconds) == 0
|
||||||
|
&& Objects.equal(retryableStatusCodes, that.retryableStatusCodes);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return Objects.hashCode(
|
||||||
|
maxAttempts, initialBackoffInSeconds, maxBackoffInSeconds, backoffMultiplier,
|
||||||
|
retryableStatusCodes);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -45,11 +45,14 @@ import io.grpc.Metadata;
|
||||||
import io.grpc.MethodDescriptor;
|
import io.grpc.MethodDescriptor;
|
||||||
import io.grpc.MethodDescriptor.MethodType;
|
import io.grpc.MethodDescriptor.MethodType;
|
||||||
import io.grpc.Status;
|
import io.grpc.Status;
|
||||||
|
import io.grpc.Status.Code;
|
||||||
import io.grpc.StringMarshaller;
|
import io.grpc.StringMarshaller;
|
||||||
import io.grpc.internal.RetriableStream.ChannelBufferMeter;
|
import io.grpc.internal.RetriableStream.ChannelBufferMeter;
|
||||||
|
import io.grpc.internal.RetriableStream.RetryPolicy;
|
||||||
import io.grpc.internal.StreamListener.MessageProducer;
|
import io.grpc.internal.StreamListener.MessageProducer;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
@ -86,7 +89,8 @@ public class RetriableStreamTest {
|
||||||
private final RetriableStream<String> retriableStream =
|
private final RetriableStream<String> retriableStream =
|
||||||
new RetriableStream<String>(
|
new RetriableStream<String>(
|
||||||
method, new Metadata(),channelBufferUsed, PER_RPC_BUFFER_LIMIT, CHANNEL_BUFFER_LIMIT,
|
method, new Metadata(),channelBufferUsed, PER_RPC_BUFFER_LIMIT, CHANNEL_BUFFER_LIMIT,
|
||||||
MoreExecutors.directExecutor(), fakeClock.getScheduledExecutorService()) {
|
MoreExecutors.directExecutor(), fakeClock.getScheduledExecutorService(),
|
||||||
|
new RetryPolicy(4, 100D, 300D, 2D, Arrays.asList(Code.UNAVAILABLE, Code.DATA_LOSS))) {
|
||||||
@Override
|
@Override
|
||||||
void postCommit() {
|
void postCommit() {
|
||||||
retriableStreamRecorder.postCommit();
|
retriableStreamRecorder.postCommit();
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,188 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2018, gRPC Authors All rights reserved.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.grpc.internal;
|
||||||
|
|
||||||
|
import static com.google.common.base.Preconditions.checkState;
|
||||||
|
import static io.grpc.internal.RetriableStream.RetryPolicy.DEFAULT;
|
||||||
|
import static java.lang.Double.parseDouble;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
|
||||||
|
import com.google.gson.JsonArray;
|
||||||
|
import com.google.gson.JsonElement;
|
||||||
|
import com.google.gson.JsonObject;
|
||||||
|
import com.google.gson.JsonParser;
|
||||||
|
import com.google.gson.JsonPrimitive;
|
||||||
|
import io.grpc.MethodDescriptor;
|
||||||
|
import io.grpc.Status.Code;
|
||||||
|
import io.grpc.internal.ManagedChannelImpl.RetryPolicies;
|
||||||
|
import io.grpc.internal.RetriableStream.RetryPolicy;
|
||||||
|
import io.grpc.testing.TestMethodDescriptors;
|
||||||
|
import java.io.InputStreamReader;
|
||||||
|
import java.io.Reader;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.JUnit4;
|
||||||
|
|
||||||
|
/** Unit tests for RetryPolicy. */
|
||||||
|
@RunWith(JUnit4.class)
|
||||||
|
public class RetryPolicyTest {
|
||||||
|
// TODO(zdapeng): move and refactor it to an appropriate place in src when implementing
|
||||||
|
// ManagedChannelImpl.getRetryPolicies()
|
||||||
|
static RetryPolicies getRetryPolicies(JsonObject serviceConfig) {
|
||||||
|
final Map<String, RetryPolicy> fullMethodNameMap = new HashMap<String, RetryPolicy>();
|
||||||
|
final Map<String, RetryPolicy> serviceNameMap = new HashMap<String, RetryPolicy>();
|
||||||
|
|
||||||
|
if (serviceConfig != null) {
|
||||||
|
JsonArray methodConfigs = serviceConfig.getAsJsonArray("methodConfig");
|
||||||
|
|
||||||
|
/* schema as follows
|
||||||
|
{
|
||||||
|
"methodConfig": [
|
||||||
|
{
|
||||||
|
"name": [
|
||||||
|
{
|
||||||
|
"service": string,
|
||||||
|
"method": string, // Optional
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"retryPolicy": {
|
||||||
|
"maxAttempts": number,
|
||||||
|
"initialBackoff": string, // Long decimal with "s" appended
|
||||||
|
"maxBackoff": string, // Long decimal with "s" appended
|
||||||
|
"backoffMultiplier": number
|
||||||
|
"retryableStatusCodes": []
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
|
if (methodConfigs != null) {
|
||||||
|
for (JsonElement methodConfig : methodConfigs) {
|
||||||
|
JsonArray names = methodConfig.getAsJsonObject().getAsJsonArray("name");
|
||||||
|
JsonObject retryPolicy = methodConfig.getAsJsonObject().getAsJsonObject("retryPolicy");
|
||||||
|
if (retryPolicy != null) {
|
||||||
|
int maxAttempts = retryPolicy.getAsJsonPrimitive("maxAttempts").getAsInt();
|
||||||
|
String initialBackoffStr =
|
||||||
|
retryPolicy.getAsJsonPrimitive("initialBackoff").getAsString();
|
||||||
|
checkState(
|
||||||
|
initialBackoffStr.charAt(initialBackoffStr.length() - 1) == 's',
|
||||||
|
"invalid value of initialBackoff");
|
||||||
|
double initialBackoff =
|
||||||
|
Double.parseDouble(initialBackoffStr.substring(0, initialBackoffStr.length() - 1));
|
||||||
|
String maxBackoffStr =
|
||||||
|
retryPolicy.getAsJsonPrimitive("maxBackoff").getAsString();
|
||||||
|
checkState(
|
||||||
|
maxBackoffStr.charAt(maxBackoffStr.length() - 1) == 's',
|
||||||
|
"invalid value of maxBackoff");
|
||||||
|
double maxBackoff =
|
||||||
|
Double.parseDouble(maxBackoffStr.substring(0, maxBackoffStr.length() - 1));
|
||||||
|
double backoffMultiplier =
|
||||||
|
retryPolicy.getAsJsonPrimitive("backoffMultiplier").getAsDouble();
|
||||||
|
JsonArray retryableStatusCodes = retryPolicy.getAsJsonArray("retryableStatusCodes");
|
||||||
|
Set<Code> codeSet = new HashSet<Code>(retryableStatusCodes.size());
|
||||||
|
for (JsonElement retryableStatusCode : retryableStatusCodes) {
|
||||||
|
codeSet.add(Code.valueOf(retryableStatusCode.getAsString()));
|
||||||
|
}
|
||||||
|
RetryPolicy pojoPolicy = new RetryPolicy(
|
||||||
|
maxAttempts, initialBackoff, maxBackoff, backoffMultiplier, codeSet);
|
||||||
|
|
||||||
|
for (JsonElement name : names) {
|
||||||
|
String service = name.getAsJsonObject().getAsJsonPrimitive("service").getAsString();
|
||||||
|
JsonPrimitive method = name.getAsJsonObject().getAsJsonPrimitive("method");
|
||||||
|
if (method != null && !method.getAsString().isEmpty()) {
|
||||||
|
fullMethodNameMap.put(
|
||||||
|
MethodDescriptor.generateFullMethodName(service, method.getAsString()),
|
||||||
|
pojoPolicy);
|
||||||
|
} else {
|
||||||
|
serviceNameMap.put(service, pojoPolicy);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return new RetryPolicies() {
|
||||||
|
@Override
|
||||||
|
public RetryPolicy get(MethodDescriptor<?, ?> method) {
|
||||||
|
RetryPolicy retryPolicy = fullMethodNameMap.get(method.getFullMethodName());
|
||||||
|
if (retryPolicy == null) {
|
||||||
|
retryPolicy = serviceNameMap
|
||||||
|
.get(MethodDescriptor.extractFullServiceName(method.getFullMethodName()));
|
||||||
|
}
|
||||||
|
if (retryPolicy == null) {
|
||||||
|
retryPolicy = DEFAULT;
|
||||||
|
}
|
||||||
|
return retryPolicy;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void getRetryPolicies() throws Exception {
|
||||||
|
Reader reader = null;
|
||||||
|
try {
|
||||||
|
reader = new InputStreamReader(
|
||||||
|
RetryPolicyTest.class.getResourceAsStream(
|
||||||
|
"/io/grpc/internal/test_retry_service_config.json"),
|
||||||
|
"UTF-8");
|
||||||
|
JsonObject serviceConfig = new JsonParser().parse(reader).getAsJsonObject();
|
||||||
|
RetryPolicies retryPolicies = getRetryPolicies(serviceConfig);
|
||||||
|
assertNotNull(retryPolicies);
|
||||||
|
|
||||||
|
MethodDescriptor.Builder<Void, Void> builder = TestMethodDescriptors.voidMethod().toBuilder();
|
||||||
|
|
||||||
|
assertEquals(
|
||||||
|
RetryPolicy.DEFAULT,
|
||||||
|
retryPolicies.get(builder.setFullMethodName("not/exist").build()));
|
||||||
|
assertEquals(
|
||||||
|
RetryPolicy.DEFAULT,
|
||||||
|
retryPolicies.get(builder.setFullMethodName("not_exist/Foo1").build()));
|
||||||
|
|
||||||
|
assertEquals(
|
||||||
|
new RetryPolicy(
|
||||||
|
3, parseDouble("2.1"), parseDouble("2.2"), parseDouble("3"),
|
||||||
|
Arrays.asList(Code.UNAVAILABLE, Code.RESOURCE_EXHAUSTED)),
|
||||||
|
retryPolicies.get(builder.setFullMethodName("SimpleService1/not_exist").build()));
|
||||||
|
assertEquals(
|
||||||
|
new RetryPolicy(
|
||||||
|
4, parseDouble(".1"), parseDouble("1"), parseDouble("2"),
|
||||||
|
Arrays.asList(Code.UNAVAILABLE)),
|
||||||
|
retryPolicies.get(builder.setFullMethodName("SimpleService1/Foo1").build()));
|
||||||
|
|
||||||
|
assertEquals(
|
||||||
|
RetryPolicy.DEFAULT,
|
||||||
|
retryPolicies.get(builder.setFullMethodName("SimpleService2/not_exist").build()));
|
||||||
|
assertEquals(
|
||||||
|
new RetryPolicy(
|
||||||
|
4, parseDouble(".1"), parseDouble("1"), parseDouble("2"),
|
||||||
|
Arrays.asList(Code.UNAVAILABLE)),
|
||||||
|
retryPolicies.get(builder.setFullMethodName("SimpleService2/Foo2").build()));
|
||||||
|
} finally {
|
||||||
|
if (reader != null) {
|
||||||
|
reader.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,53 @@
|
||||||
|
{
|
||||||
|
"loadBalancingPolicy":"round_robin",
|
||||||
|
"methodConfig":[
|
||||||
|
{
|
||||||
|
"name":[
|
||||||
|
{
|
||||||
|
"service":"SimpleService1"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"waitForReady":false,
|
||||||
|
"retryPolicy":{
|
||||||
|
"maxAttempts":3,
|
||||||
|
"initialBackoff":"2.1s",
|
||||||
|
"maxBackoff":"2.2s",
|
||||||
|
"backoffMultiplier":3,
|
||||||
|
"retryableStatusCodes":[
|
||||||
|
"UNAVAILABLE",
|
||||||
|
"RESOURCE_EXHAUSTED"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name":[
|
||||||
|
{
|
||||||
|
"service":"SimpleService2"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"waitForReady":false
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name":[
|
||||||
|
{
|
||||||
|
"service":"SimpleService1",
|
||||||
|
"method":"Foo1"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"service":"SimpleService2",
|
||||||
|
"method":"Foo2"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"waitForReady":true,
|
||||||
|
"retryPolicy":{
|
||||||
|
"maxAttempts":4,
|
||||||
|
"initialBackoff":".1s",
|
||||||
|
"maxBackoff":"1s",
|
||||||
|
"backoffMultiplier":2,
|
||||||
|
"retryableStatusCodes":[
|
||||||
|
"UNAVAILABLE"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue