From dc95465f6a7e44b6f8dba9684542ed74d89faca2 Mon Sep 17 00:00:00 2001 From: ZHANG Dapeng Date: Wed, 31 Jan 2018 11:25:50 -0800 Subject: [PATCH] core: retry part 5, add RetryPolicy data object --- .../io/grpc/internal/ManagedChannelImpl.java | 39 +++- .../io/grpc/internal/RetriableStream.java | 62 +++++- .../io/grpc/internal/RetriableStreamTest.java | 6 +- .../io/grpc/internal/RetryPolicyTest.java | 188 ++++++++++++++++++ .../internal/test_retry_service_config.json | 53 +++++ 5 files changed, 345 insertions(+), 3 deletions(-) create mode 100644 core/src/test/java/io/grpc/internal/RetryPolicyTest.java create mode 100644 core/src/test/resources/io/grpc/internal/test_retry_service_config.json diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index 87875b3bae..e5b7c5f40d 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkState; import static io.grpc.ConnectivityState.IDLE; import static io.grpc.ConnectivityState.SHUTDOWN; import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; +import static io.grpc.internal.RetriableStream.RetryPolicy.DEFAULT; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; @@ -53,6 +54,7 @@ import io.grpc.Status; import io.grpc.internal.Channelz.ChannelStats; import io.grpc.internal.ClientCallImpl.ClientTransportProvider; import io.grpc.internal.RetriableStream.ChannelBufferMeter; +import io.grpc.internal.RetriableStream.RetryPolicy; import java.lang.ref.Reference; import java.lang.ref.ReferenceQueue; import java.lang.ref.SoftReference; @@ -75,6 +77,7 @@ import java.util.logging.Level; import java.util.logging.LogRecord; import java.util.logging.Logger; import java.util.regex.Pattern; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; @@ -205,6 +208,10 @@ public final class ManagedChannelImpl extends ManagedChannel implements Instrume private final long perRpcBufferLimit; private final long channelBufferLimit; + private RetryPolicies retryPolicies; + // Temporary false flag that can skip the retry code path. + private boolean retryEnabled; + // Called from channelExecutor private final ManagedClientTransport.Listener delayedTransportListener = new ManagedClientTransport.Listener() { @@ -429,9 +436,11 @@ public final class ManagedChannelImpl extends ManagedChannel implements Instrume final CallOptions callOptions, final Metadata headers, final Context context) { + RetryPolicy retryPolicy = retryPolicies == null ? DEFAULT : retryPolicies.get(method); return new RetriableStream( method, headers, channelBufferUsed, perRpcBufferLimit, channelBufferLimit, - getCallExecutor(callOptions), transportFactory.getScheduledExecutorService()) { + getCallExecutor(callOptions), transportFactory.getScheduledExecutorService(), + retryPolicy) { @Override Status prestart() { return uncommittedRetriableStreamsRegistry.add(this); @@ -1033,6 +1042,18 @@ public final class ManagedChannelImpl extends ManagedChannel implements Instrume if (NameResolverListenerImpl.this.helper != ManagedChannelImpl.this.lbHelper) { return; } + + try { + if (retryEnabled) { + retryPolicies = getRetryPolicies(config); + } + } catch (RuntimeException re) { + logger.log( + Level.WARNING, + "[" + getLogId() + "] Unexpected exception from parsing service config", + re); + } + try { balancer.handleResolvedAddressGroups(servers, config); } catch (Throwable e) { @@ -1067,6 +1088,16 @@ public final class ManagedChannelImpl extends ManagedChannel implements Instrume } } + // TODO(zdapeng): implement it once the Gson dependency issue is resolved. + private static RetryPolicies getRetryPolicies(Attributes config) { + return new RetryPolicies() { + @Override + public RetryPolicy get(MethodDescriptor method) { + return RetryPolicy.DEFAULT; + } + }; + } + private final class SubchannelImpl extends AbstractSubchannel { // Set right after SubchannelImpl is created. InternalSubchannel subchannel; @@ -1253,4 +1284,10 @@ public final class ManagedChannelImpl extends ManagedChannel implements Instrume return e; } } + + @VisibleForTesting + interface RetryPolicies { + @Nonnull + RetryPolicy get(MethodDescriptor method); + } } diff --git a/core/src/main/java/io/grpc/internal/RetriableStream.java b/core/src/main/java/io/grpc/internal/RetriableStream.java index e25845a88a..05aac4ef4e 100644 --- a/core/src/main/java/io/grpc/internal/RetriableStream.java +++ b/core/src/main/java/io/grpc/internal/RetriableStream.java @@ -16,10 +16,12 @@ package io.grpc.internal; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Objects; import io.grpc.Attributes; import io.grpc.CallOptions; import io.grpc.ClientStreamTracer; @@ -43,6 +45,7 @@ import java.util.concurrent.atomic.AtomicLong; import javax.annotation.CheckReturnValue; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.Immutable; /** A logical {@link ClientStream} that is retriable. */ abstract class RetriableStream implements ClientStream { @@ -58,6 +61,8 @@ abstract class RetriableStream implements ClientStream { private final ScheduledExecutorService scheduledExecutorService; // Must not modify it. private final Metadata headers; + // TODO(zdapeng): add and use its business logic + private final RetryPolicy retryPolicy; /** Must be held when updating state, accessing state.buffer, or certain substream attributes. */ private final Object lock = new Object(); @@ -80,7 +85,8 @@ abstract class RetriableStream implements ClientStream { RetriableStream( MethodDescriptor method, Metadata headers, ChannelBufferMeter channelBufferUsed, long perRpcBufferLimit, long channelBufferLimit, - Executor callExecutor, ScheduledExecutorService scheduledExecutorService) { + Executor callExecutor, ScheduledExecutorService scheduledExecutorService, + RetryPolicy retryPolicy) { this.method = method; this.channelBufferUsed = channelBufferUsed; this.perRpcBufferLimit = perRpcBufferLimit; @@ -88,6 +94,7 @@ abstract class RetriableStream implements ClientStream { this.callExecutor = callExecutor; this.scheduledExecutorService = scheduledExecutorService; this.headers = headers; + this.retryPolicy = checkNotNull(retryPolicy, "retryPolicy"); } @Nullable // null if already committed @@ -777,4 +784,57 @@ abstract class RetriableStream implements ClientStream { return bufferUsed.addAndGet(newBytesUsed); } } + + @Immutable + static final class RetryPolicy { + private final int maxAttempts; + private final double initialBackoffInSeconds; + private final double maxBackoffInSeconds; + private final double backoffMultiplier; + private final Collection retryableStatusCodes; + + RetryPolicy( + int maxAttempts, double initialBackoffInSeconds, double maxBackoffInSeconds, + double backoffMultiplier, Collection retryableStatusCodes) { + checkArgument(maxAttempts >= 1, "maxAttempts"); + this.maxAttempts = maxAttempts; + checkArgument(initialBackoffInSeconds >= 0D, "initialBackoffInSeconds"); + this.initialBackoffInSeconds = initialBackoffInSeconds; + checkArgument( + maxBackoffInSeconds >= initialBackoffInSeconds, + "maxBackoffInSeconds should be at least initialBackoffInSeconds"); + this.maxBackoffInSeconds = maxBackoffInSeconds; + checkArgument(backoffMultiplier > 0D, "backoffMultiplier"); + this.backoffMultiplier = backoffMultiplier; + this.retryableStatusCodes = Collections.unmodifiableSet( + new HashSet(checkNotNull(retryableStatusCodes, "retryableStatusCodes"))); + } + + /** No retry. */ + static final RetryPolicy DEFAULT = + new RetryPolicy(1, 0, 0, 1, Collections.emptyList()); + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof RetryPolicy)) { + return false; + } + RetryPolicy that = (RetryPolicy) o; + return maxAttempts == that.maxAttempts + && Double.compare(backoffMultiplier, that.backoffMultiplier) == 0 + && Double.compare(initialBackoffInSeconds, that.initialBackoffInSeconds) == 0 + && Double.compare(maxBackoffInSeconds, that.maxBackoffInSeconds) == 0 + && Objects.equal(retryableStatusCodes, that.retryableStatusCodes); + } + + @Override + public int hashCode() { + return Objects.hashCode( + maxAttempts, initialBackoffInSeconds, maxBackoffInSeconds, backoffMultiplier, + retryableStatusCodes); + } + } } diff --git a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java index 5345b09369..d816c3ec97 100644 --- a/core/src/test/java/io/grpc/internal/RetriableStreamTest.java +++ b/core/src/test/java/io/grpc/internal/RetriableStreamTest.java @@ -45,11 +45,14 @@ import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor.MethodType; import io.grpc.Status; +import io.grpc.Status.Code; import io.grpc.StringMarshaller; import io.grpc.internal.RetriableStream.ChannelBufferMeter; +import io.grpc.internal.RetriableStream.RetryPolicy; import io.grpc.internal.StreamListener.MessageProducer; import java.io.InputStream; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.concurrent.atomic.AtomicReference; import org.junit.Test; @@ -86,7 +89,8 @@ public class RetriableStreamTest { private final RetriableStream retriableStream = new RetriableStream( method, new Metadata(),channelBufferUsed, PER_RPC_BUFFER_LIMIT, CHANNEL_BUFFER_LIMIT, - MoreExecutors.directExecutor(), fakeClock.getScheduledExecutorService()) { + MoreExecutors.directExecutor(), fakeClock.getScheduledExecutorService(), + new RetryPolicy(4, 100D, 300D, 2D, Arrays.asList(Code.UNAVAILABLE, Code.DATA_LOSS))) { @Override void postCommit() { retriableStreamRecorder.postCommit(); diff --git a/core/src/test/java/io/grpc/internal/RetryPolicyTest.java b/core/src/test/java/io/grpc/internal/RetryPolicyTest.java new file mode 100644 index 0000000000..90e45dcc17 --- /dev/null +++ b/core/src/test/java/io/grpc/internal/RetryPolicyTest.java @@ -0,0 +1,188 @@ +/* + * Copyright 2018, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.internal; + +import static com.google.common.base.Preconditions.checkState; +import static io.grpc.internal.RetriableStream.RetryPolicy.DEFAULT; +import static java.lang.Double.parseDouble; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.google.gson.JsonPrimitive; +import io.grpc.MethodDescriptor; +import io.grpc.Status.Code; +import io.grpc.internal.ManagedChannelImpl.RetryPolicies; +import io.grpc.internal.RetriableStream.RetryPolicy; +import io.grpc.testing.TestMethodDescriptors; +import java.io.InputStreamReader; +import java.io.Reader; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Unit tests for RetryPolicy. */ +@RunWith(JUnit4.class) +public class RetryPolicyTest { + // TODO(zdapeng): move and refactor it to an appropriate place in src when implementing + // ManagedChannelImpl.getRetryPolicies() + static RetryPolicies getRetryPolicies(JsonObject serviceConfig) { + final Map fullMethodNameMap = new HashMap(); + final Map serviceNameMap = new HashMap(); + + if (serviceConfig != null) { + JsonArray methodConfigs = serviceConfig.getAsJsonArray("methodConfig"); + + /* schema as follows + { + "methodConfig": [ + { + "name": [ + { + "service": string, + "method": string, // Optional + } + ], + "retryPolicy": { + "maxAttempts": number, + "initialBackoff": string, // Long decimal with "s" appended + "maxBackoff": string, // Long decimal with "s" appended + "backoffMultiplier": number + "retryableStatusCodes": [] + } + } + ] + } + */ + + if (methodConfigs != null) { + for (JsonElement methodConfig : methodConfigs) { + JsonArray names = methodConfig.getAsJsonObject().getAsJsonArray("name"); + JsonObject retryPolicy = methodConfig.getAsJsonObject().getAsJsonObject("retryPolicy"); + if (retryPolicy != null) { + int maxAttempts = retryPolicy.getAsJsonPrimitive("maxAttempts").getAsInt(); + String initialBackoffStr = + retryPolicy.getAsJsonPrimitive("initialBackoff").getAsString(); + checkState( + initialBackoffStr.charAt(initialBackoffStr.length() - 1) == 's', + "invalid value of initialBackoff"); + double initialBackoff = + Double.parseDouble(initialBackoffStr.substring(0, initialBackoffStr.length() - 1)); + String maxBackoffStr = + retryPolicy.getAsJsonPrimitive("maxBackoff").getAsString(); + checkState( + maxBackoffStr.charAt(maxBackoffStr.length() - 1) == 's', + "invalid value of maxBackoff"); + double maxBackoff = + Double.parseDouble(maxBackoffStr.substring(0, maxBackoffStr.length() - 1)); + double backoffMultiplier = + retryPolicy.getAsJsonPrimitive("backoffMultiplier").getAsDouble(); + JsonArray retryableStatusCodes = retryPolicy.getAsJsonArray("retryableStatusCodes"); + Set codeSet = new HashSet(retryableStatusCodes.size()); + for (JsonElement retryableStatusCode : retryableStatusCodes) { + codeSet.add(Code.valueOf(retryableStatusCode.getAsString())); + } + RetryPolicy pojoPolicy = new RetryPolicy( + maxAttempts, initialBackoff, maxBackoff, backoffMultiplier, codeSet); + + for (JsonElement name : names) { + String service = name.getAsJsonObject().getAsJsonPrimitive("service").getAsString(); + JsonPrimitive method = name.getAsJsonObject().getAsJsonPrimitive("method"); + if (method != null && !method.getAsString().isEmpty()) { + fullMethodNameMap.put( + MethodDescriptor.generateFullMethodName(service, method.getAsString()), + pojoPolicy); + } else { + serviceNameMap.put(service, pojoPolicy); + } + } + } + } + } + } + + return new RetryPolicies() { + @Override + public RetryPolicy get(MethodDescriptor method) { + RetryPolicy retryPolicy = fullMethodNameMap.get(method.getFullMethodName()); + if (retryPolicy == null) { + retryPolicy = serviceNameMap + .get(MethodDescriptor.extractFullServiceName(method.getFullMethodName())); + } + if (retryPolicy == null) { + retryPolicy = DEFAULT; + } + return retryPolicy; + } + }; + } + + @Test + public void getRetryPolicies() throws Exception { + Reader reader = null; + try { + reader = new InputStreamReader( + RetryPolicyTest.class.getResourceAsStream( + "/io/grpc/internal/test_retry_service_config.json"), + "UTF-8"); + JsonObject serviceConfig = new JsonParser().parse(reader).getAsJsonObject(); + RetryPolicies retryPolicies = getRetryPolicies(serviceConfig); + assertNotNull(retryPolicies); + + MethodDescriptor.Builder builder = TestMethodDescriptors.voidMethod().toBuilder(); + + assertEquals( + RetryPolicy.DEFAULT, + retryPolicies.get(builder.setFullMethodName("not/exist").build())); + assertEquals( + RetryPolicy.DEFAULT, + retryPolicies.get(builder.setFullMethodName("not_exist/Foo1").build())); + + assertEquals( + new RetryPolicy( + 3, parseDouble("2.1"), parseDouble("2.2"), parseDouble("3"), + Arrays.asList(Code.UNAVAILABLE, Code.RESOURCE_EXHAUSTED)), + retryPolicies.get(builder.setFullMethodName("SimpleService1/not_exist").build())); + assertEquals( + new RetryPolicy( + 4, parseDouble(".1"), parseDouble("1"), parseDouble("2"), + Arrays.asList(Code.UNAVAILABLE)), + retryPolicies.get(builder.setFullMethodName("SimpleService1/Foo1").build())); + + assertEquals( + RetryPolicy.DEFAULT, + retryPolicies.get(builder.setFullMethodName("SimpleService2/not_exist").build())); + assertEquals( + new RetryPolicy( + 4, parseDouble(".1"), parseDouble("1"), parseDouble("2"), + Arrays.asList(Code.UNAVAILABLE)), + retryPolicies.get(builder.setFullMethodName("SimpleService2/Foo2").build())); + } finally { + if (reader != null) { + reader.close(); + } + } + } +} diff --git a/core/src/test/resources/io/grpc/internal/test_retry_service_config.json b/core/src/test/resources/io/grpc/internal/test_retry_service_config.json new file mode 100644 index 0000000000..406c7f5d26 --- /dev/null +++ b/core/src/test/resources/io/grpc/internal/test_retry_service_config.json @@ -0,0 +1,53 @@ +{ + "loadBalancingPolicy":"round_robin", + "methodConfig":[ + { + "name":[ + { + "service":"SimpleService1" + } + ], + "waitForReady":false, + "retryPolicy":{ + "maxAttempts":3, + "initialBackoff":"2.1s", + "maxBackoff":"2.2s", + "backoffMultiplier":3, + "retryableStatusCodes":[ + "UNAVAILABLE", + "RESOURCE_EXHAUSTED" + ] + } + }, + { + "name":[ + { + "service":"SimpleService2" + } + ], + "waitForReady":false + }, + { + "name":[ + { + "service":"SimpleService1", + "method":"Foo1" + }, + { + "service":"SimpleService2", + "method":"Foo2" + } + ], + "waitForReady":true, + "retryPolicy":{ + "maxAttempts":4, + "initialBackoff":".1s", + "maxBackoff":"1s", + "backoffMultiplier":2, + "retryableStatusCodes":[ + "UNAVAILABLE" + ] + } + } + ] +}