From 2c2c48a5921ffba4cad4695c43a01543b5ccd4c6 Mon Sep 17 00:00:00 2001 From: Carl Mastrangelo Date: Mon, 27 Jul 2015 14:36:21 -0700 Subject: [PATCH] Implement Transport reconnect --- core/src/main/java/io/grpc/BackoffPolicy.java | 47 ++++++++ core/src/main/java/io/grpc/ChannelImpl.java | 89 ++++++++++++-- .../src/main/java/io/grpc/ClientCallImpl.java | 2 +- .../io/grpc/ExponentialBackoffPolicy.java | 112 ++++++++++++++++++ .../io/grpc/ExponentialBackoffPolicyTest.java | 74 ++++++++++++ 5 files changed, 315 insertions(+), 9 deletions(-) create mode 100644 core/src/main/java/io/grpc/BackoffPolicy.java create mode 100644 core/src/main/java/io/grpc/ExponentialBackoffPolicy.java create mode 100644 core/src/test/java/io/grpc/ExponentialBackoffPolicyTest.java diff --git a/core/src/main/java/io/grpc/BackoffPolicy.java b/core/src/main/java/io/grpc/BackoffPolicy.java new file mode 100644 index 0000000000..2f24f87b5f --- /dev/null +++ b/core/src/main/java/io/grpc/BackoffPolicy.java @@ -0,0 +1,47 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc; + +/** + * Determines how long to wait before doing some action (typically a retry, or a reconnect). + */ +interface BackoffPolicy { + interface Provider { + BackoffPolicy get(); + } + + /** + * @return The number of milliseconds to wait. + */ + long nextBackoffMillis(); +} + diff --git a/core/src/main/java/io/grpc/ChannelImpl.java b/core/src/main/java/io/grpc/ChannelImpl.java index c50950d1a3..986a1d461a 100644 --- a/core/src/main/java/io/grpc/ChannelImpl.java +++ b/core/src/main/java/io/grpc/ChannelImpl.java @@ -35,6 +35,9 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import io.grpc.ClientCallImpl.ClientTransportProvider; +import io.grpc.Metadata.Headers; +import io.grpc.transport.ClientStream; +import io.grpc.transport.ClientStreamListener; import io.grpc.transport.ClientTransport; import io.grpc.transport.ClientTransport.PingCallback; import io.grpc.transport.ClientTransportFactory; @@ -67,7 +70,11 @@ public final class ChannelImpl extends Channel { /** * Executor that runs deadline timers for requests. */ - private ScheduledExecutorService deadlineCancellationExecutor; + private ScheduledExecutorService scheduledExecutor; + + // TODO(carl-mastrangelo): Allow clients to pass this in + private final BackoffPolicy.Provider backoffPolicyProvider = + new ExponentialBackoffPolicy.Provider(); /** * We delegate to this channel, so that we can have interceptors as necessary. If there aren't * any interceptors this will just be {@link RealChannel}. @@ -91,6 +98,9 @@ public final class ChannelImpl extends Channel { private boolean terminated; private Runnable terminationRunnable; + private long reconnectTimeMillis; + private BackoffPolicy reconnectPolicy; + private final ClientTransportProvider transportProvider = new ClientTransportProvider() { @Override public ClientTransport get() { @@ -104,7 +114,7 @@ public final class ChannelImpl extends Channel { this.executor = executor; this.userAgent = userAgent; this.interceptorChannel = ClientInterceptors.intercept(new RealChannel(), interceptors); - deadlineCancellationExecutor = SharedResourceHolder.get(TIMER_SERVICE); + scheduledExecutor = SharedResourceHolder.get(TIMER_SERVICE); } /** Hack to allow executors to auto-shutdown. Not for general use. */ @@ -124,8 +134,7 @@ public final class ChannelImpl extends Channel { } shutdown = true; // After shutdown there are no new calls, so no new cancellation tasks are needed - deadlineCancellationExecutor = - SharedResourceHolder.release(TIMER_SERVICE, deadlineCancellationExecutor); + scheduledExecutor = SharedResourceHolder.release(TIMER_SERVICE, scheduledExecutor); if (activeTransport != null) { activeTransport.shutdown(); activeTransport = null; @@ -231,7 +240,8 @@ public final class ChannelImpl extends Channel { private ClientTransport obtainActiveTransport() { ClientTransport savedActiveTransport = activeTransport; - if (savedActiveTransport != null) { + // If we know there is an active transport and we are not in backoff mode, return quickly. + if (savedActiveTransport != null && !(savedActiveTransport instanceof InactiveTransport)) { return savedActiveTransport; } synchronized (lock) { @@ -239,9 +249,22 @@ public final class ChannelImpl extends Channel { return null; } savedActiveTransport = activeTransport; + if (savedActiveTransport instanceof InactiveTransport) { + if (System.nanoTime() > TimeUnit.MILLISECONDS.toNanos(reconnectTimeMillis)) { + // The timeout expired, clear the inactive transport and update the shutdown status to + // something that is retryable. + activeTransport = null; + savedActiveTransport = activeTransport; + } else { + // We are still in backoff mode, just return the inactive transport. + return savedActiveTransport; + } + } + if (savedActiveTransport != null) { return savedActiveTransport; } + // There is no active transport, or we just finished backoff. Create a new transport. ClientTransport newActiveTransport = transportFactory.newClientTransport(); transports.add(newActiveTransport); boolean failed = true; @@ -273,7 +296,7 @@ public final class ChannelImpl extends Channel { new SerializingExecutor(executor), callOptions, transportProvider, - deadlineCancellationExecutor) + scheduledExecutor) .setUserAgent(userAgent); } } @@ -287,15 +310,30 @@ public final class ChannelImpl extends Channel { @Override public void transportReady() { - // TODO(carl-mastrangelo): Implement this + synchronized (lock) { + if (activeTransport == transport) { + reconnectPolicy = null; + } + } } @Override public void transportShutdown(Status s) { - // TODO(carl-mastrangelo): use this status to determine if and how to retry the connection. synchronized (lock) { if (activeTransport == transport) { activeTransport = null; + // This transport listener was attached to the active transport. + if (s.isOk()) { + return; + } + // Alright, something bad has happened. + if (reconnectPolicy == null) { + // This happens the first time something bad has happened. + reconnectPolicy = backoffPolicyProvider.get(); + reconnectTimeMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); + } + activeTransport = new InactiveTransport(s); + reconnectTimeMillis += reconnectPolicy.nextBackoffMillis(); } } } @@ -420,4 +458,39 @@ public final class ChannelImpl extends Channel { instance.shutdown(); } }; + + private static final class InactiveTransport implements ClientTransport { + private final Status shutdownStatus; + + private InactiveTransport(Status s) { + shutdownStatus = s; + } + + @Override + public ClientStream newStream( + MethodDescriptor method, Headers headers, ClientStreamListener listener) { + listener.closed(shutdownStatus, new Metadata.Trailers()); + return new ClientCallImpl.NoopClientStream(); + } + + @Override + public void start(Listener listener) { + throw new IllegalStateException(); + } + + @Override + public void ping(final PingCallback callback, Executor executor) { + executor.execute(new Runnable() { + @Override + public void run() { + callback.pingFailed(shutdownStatus.asException()); + } + }); + } + + @Override + public void shutdown() { + // no-op + } + } } diff --git a/core/src/main/java/io/grpc/ClientCallImpl.java b/core/src/main/java/io/grpc/ClientCallImpl.java index e0ecbb42bc..835fee49d0 100644 --- a/core/src/main/java/io/grpc/ClientCallImpl.java +++ b/core/src/main/java/io/grpc/ClientCallImpl.java @@ -305,7 +305,7 @@ final class ClientCallImpl extends ClientCall { } } - private static class NoopClientStream implements ClientStream { + static class NoopClientStream implements ClientStream { @Override public void writeMessage(InputStream message) {} @Override public void flush() {} diff --git a/core/src/main/java/io/grpc/ExponentialBackoffPolicy.java b/core/src/main/java/io/grpc/ExponentialBackoffPolicy.java new file mode 100644 index 0000000000..e22082d131 --- /dev/null +++ b/core/src/main/java/io/grpc/ExponentialBackoffPolicy.java @@ -0,0 +1,112 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.annotations.VisibleForTesting; + +import java.util.Random; +import java.util.concurrent.TimeUnit; + +/** + * Retry Policy for Transport reconnection. Initial parameters from + * https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md + * + *

TODO(carl-mastrangelo): add unit tests for this class + */ +final class ExponentialBackoffPolicy implements BackoffPolicy { + static final class Provider implements BackoffPolicy.Provider { + @Override + public BackoffPolicy get() { + return new ExponentialBackoffPolicy(); + } + } + + private Random random = new Random(); + private long initialBackoffMillis = TimeUnit.SECONDS.toMillis(1); + private long maxBackoffMillis = TimeUnit.MINUTES.toMillis(2); + private double multiplier = 1.6; + private double jitter = .2; + + private long nextBackoffMillis = initialBackoffMillis; + + @Override + public long nextBackoffMillis() { + long currentBackoffMillis = nextBackoffMillis; + nextBackoffMillis = Math.min((long) (currentBackoffMillis * multiplier), maxBackoffMillis); + return currentBackoffMillis + + uniformRandom(-jitter * currentBackoffMillis, jitter * currentBackoffMillis); + } + + private long uniformRandom(double low, double high) { + checkArgument(high >= low); + double mag = high - low; + return (long) (random.nextDouble() * mag + low); + } + + /* + * No guice and no flags means we get to implement these setters for testing ourselves. Do not + * call these from non-test code. + */ + + @VisibleForTesting + ExponentialBackoffPolicy setRandom(Random random) { + this.random = random; + return this; + } + + @VisibleForTesting + ExponentialBackoffPolicy setInitialBackoffMillis(long initialBackoffMillis) { + this.initialBackoffMillis = initialBackoffMillis; + return this; + } + + @VisibleForTesting + ExponentialBackoffPolicy setMaxBackoffMillis(long maxBackoffMillis) { + this.maxBackoffMillis = maxBackoffMillis; + return this; + } + + @VisibleForTesting + ExponentialBackoffPolicy setMultiplier(double multiplier) { + this.multiplier = multiplier; + return this; + } + + @VisibleForTesting + ExponentialBackoffPolicy setJitter(double jitter) { + this.jitter = jitter; + return this; + } +} + diff --git a/core/src/test/java/io/grpc/ExponentialBackoffPolicyTest.java b/core/src/test/java/io/grpc/ExponentialBackoffPolicyTest.java new file mode 100644 index 0000000000..f26f0238fe --- /dev/null +++ b/core/src/test/java/io/grpc/ExponentialBackoffPolicyTest.java @@ -0,0 +1,74 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.Random; + +/** + * Test for {@link ExponentialBackoffPolicy}. + */ +@RunWith(JUnit4.class) +public class ExponentialBackoffPolicyTest { + private ExponentialBackoffPolicy policy = new ExponentialBackoffPolicy(); + private Random notRandom = new Random() { + @Override + public double nextDouble() { + return .5; + } + }; + + @Test + public void maxDelayReached() { + long maxBackoffMillis = 120 * 1000; + policy.setMaxBackoffMillis(maxBackoffMillis) + .setJitter(0) + .setRandom(notRandom); + for (int i = 0; i < 50; i++) { + if (maxBackoffMillis == policy.nextBackoffMillis()) { + return; // Success + } + } + assertEquals("max delay not reached", maxBackoffMillis, policy.nextBackoffMillis()); + } + + @Test public void canProvide() { + assertNotNull(new ExponentialBackoffPolicy.Provider().get()); + } +} +