Implement Transport reconnect

This commit is contained in:
Carl Mastrangelo 2015-07-27 14:36:21 -07:00
parent 512134b095
commit 2c2c48a592
5 changed files with 315 additions and 9 deletions

View File

@ -0,0 +1,47 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc;
/**
* Determines how long to wait before doing some action (typically a retry, or a reconnect).
*/
interface BackoffPolicy {
interface Provider {
BackoffPolicy get();
}
/**
* @return The number of milliseconds to wait.
*/
long nextBackoffMillis();
}

View File

@ -35,6 +35,9 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import io.grpc.ClientCallImpl.ClientTransportProvider; import io.grpc.ClientCallImpl.ClientTransportProvider;
import io.grpc.Metadata.Headers;
import io.grpc.transport.ClientStream;
import io.grpc.transport.ClientStreamListener;
import io.grpc.transport.ClientTransport; import io.grpc.transport.ClientTransport;
import io.grpc.transport.ClientTransport.PingCallback; import io.grpc.transport.ClientTransport.PingCallback;
import io.grpc.transport.ClientTransportFactory; import io.grpc.transport.ClientTransportFactory;
@ -67,7 +70,11 @@ public final class ChannelImpl extends Channel {
/** /**
* Executor that runs deadline timers for requests. * Executor that runs deadline timers for requests.
*/ */
private ScheduledExecutorService deadlineCancellationExecutor; private ScheduledExecutorService scheduledExecutor;
// TODO(carl-mastrangelo): Allow clients to pass this in
private final BackoffPolicy.Provider backoffPolicyProvider =
new ExponentialBackoffPolicy.Provider();
/** /**
* We delegate to this channel, so that we can have interceptors as necessary. If there aren't * We delegate to this channel, so that we can have interceptors as necessary. If there aren't
* any interceptors this will just be {@link RealChannel}. * any interceptors this will just be {@link RealChannel}.
@ -91,6 +98,9 @@ public final class ChannelImpl extends Channel {
private boolean terminated; private boolean terminated;
private Runnable terminationRunnable; private Runnable terminationRunnable;
private long reconnectTimeMillis;
private BackoffPolicy reconnectPolicy;
private final ClientTransportProvider transportProvider = new ClientTransportProvider() { private final ClientTransportProvider transportProvider = new ClientTransportProvider() {
@Override @Override
public ClientTransport get() { public ClientTransport get() {
@ -104,7 +114,7 @@ public final class ChannelImpl extends Channel {
this.executor = executor; this.executor = executor;
this.userAgent = userAgent; this.userAgent = userAgent;
this.interceptorChannel = ClientInterceptors.intercept(new RealChannel(), interceptors); this.interceptorChannel = ClientInterceptors.intercept(new RealChannel(), interceptors);
deadlineCancellationExecutor = SharedResourceHolder.get(TIMER_SERVICE); scheduledExecutor = SharedResourceHolder.get(TIMER_SERVICE);
} }
/** Hack to allow executors to auto-shutdown. Not for general use. */ /** Hack to allow executors to auto-shutdown. Not for general use. */
@ -124,8 +134,7 @@ public final class ChannelImpl extends Channel {
} }
shutdown = true; shutdown = true;
// After shutdown there are no new calls, so no new cancellation tasks are needed // After shutdown there are no new calls, so no new cancellation tasks are needed
deadlineCancellationExecutor = scheduledExecutor = SharedResourceHolder.release(TIMER_SERVICE, scheduledExecutor);
SharedResourceHolder.release(TIMER_SERVICE, deadlineCancellationExecutor);
if (activeTransport != null) { if (activeTransport != null) {
activeTransport.shutdown(); activeTransport.shutdown();
activeTransport = null; activeTransport = null;
@ -231,7 +240,8 @@ public final class ChannelImpl extends Channel {
private ClientTransport obtainActiveTransport() { private ClientTransport obtainActiveTransport() {
ClientTransport savedActiveTransport = activeTransport; ClientTransport savedActiveTransport = activeTransport;
if (savedActiveTransport != null) { // If we know there is an active transport and we are not in backoff mode, return quickly.
if (savedActiveTransport != null && !(savedActiveTransport instanceof InactiveTransport)) {
return savedActiveTransport; return savedActiveTransport;
} }
synchronized (lock) { synchronized (lock) {
@ -239,9 +249,22 @@ public final class ChannelImpl extends Channel {
return null; return null;
} }
savedActiveTransport = activeTransport; savedActiveTransport = activeTransport;
if (savedActiveTransport instanceof InactiveTransport) {
if (System.nanoTime() > TimeUnit.MILLISECONDS.toNanos(reconnectTimeMillis)) {
// The timeout expired, clear the inactive transport and update the shutdown status to
// something that is retryable.
activeTransport = null;
savedActiveTransport = activeTransport;
} else {
// We are still in backoff mode, just return the inactive transport.
return savedActiveTransport;
}
}
if (savedActiveTransport != null) { if (savedActiveTransport != null) {
return savedActiveTransport; return savedActiveTransport;
} }
// There is no active transport, or we just finished backoff. Create a new transport.
ClientTransport newActiveTransport = transportFactory.newClientTransport(); ClientTransport newActiveTransport = transportFactory.newClientTransport();
transports.add(newActiveTransport); transports.add(newActiveTransport);
boolean failed = true; boolean failed = true;
@ -273,7 +296,7 @@ public final class ChannelImpl extends Channel {
new SerializingExecutor(executor), new SerializingExecutor(executor),
callOptions, callOptions,
transportProvider, transportProvider,
deadlineCancellationExecutor) scheduledExecutor)
.setUserAgent(userAgent); .setUserAgent(userAgent);
} }
} }
@ -287,15 +310,30 @@ public final class ChannelImpl extends Channel {
@Override @Override
public void transportReady() { public void transportReady() {
// TODO(carl-mastrangelo): Implement this synchronized (lock) {
if (activeTransport == transport) {
reconnectPolicy = null;
}
}
} }
@Override @Override
public void transportShutdown(Status s) { public void transportShutdown(Status s) {
// TODO(carl-mastrangelo): use this status to determine if and how to retry the connection.
synchronized (lock) { synchronized (lock) {
if (activeTransport == transport) { if (activeTransport == transport) {
activeTransport = null; activeTransport = null;
// This transport listener was attached to the active transport.
if (s.isOk()) {
return;
}
// Alright, something bad has happened.
if (reconnectPolicy == null) {
// This happens the first time something bad has happened.
reconnectPolicy = backoffPolicyProvider.get();
reconnectTimeMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
}
activeTransport = new InactiveTransport(s);
reconnectTimeMillis += reconnectPolicy.nextBackoffMillis();
} }
} }
} }
@ -420,4 +458,39 @@ public final class ChannelImpl extends Channel {
instance.shutdown(); instance.shutdown();
} }
}; };
private static final class InactiveTransport implements ClientTransport {
private final Status shutdownStatus;
private InactiveTransport(Status s) {
shutdownStatus = s;
}
@Override
public ClientStream newStream(
MethodDescriptor<?, ?> method, Headers headers, ClientStreamListener listener) {
listener.closed(shutdownStatus, new Metadata.Trailers());
return new ClientCallImpl.NoopClientStream();
}
@Override
public void start(Listener listener) {
throw new IllegalStateException();
}
@Override
public void ping(final PingCallback callback, Executor executor) {
executor.execute(new Runnable() {
@Override
public void run() {
callback.pingFailed(shutdownStatus.asException());
}
});
}
@Override
public void shutdown() {
// no-op
}
}
} }

View File

@ -305,7 +305,7 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
} }
} }
private static class NoopClientStream implements ClientStream { static class NoopClientStream implements ClientStream {
@Override public void writeMessage(InputStream message) {} @Override public void writeMessage(InputStream message) {}
@Override public void flush() {} @Override public void flush() {}

View File

@ -0,0 +1,112 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc;
import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.annotations.VisibleForTesting;
import java.util.Random;
import java.util.concurrent.TimeUnit;
/**
* Retry Policy for Transport reconnection. Initial parameters from
* https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md
*
* <p>TODO(carl-mastrangelo): add unit tests for this class
*/
final class ExponentialBackoffPolicy implements BackoffPolicy {
static final class Provider implements BackoffPolicy.Provider {
@Override
public BackoffPolicy get() {
return new ExponentialBackoffPolicy();
}
}
private Random random = new Random();
private long initialBackoffMillis = TimeUnit.SECONDS.toMillis(1);
private long maxBackoffMillis = TimeUnit.MINUTES.toMillis(2);
private double multiplier = 1.6;
private double jitter = .2;
private long nextBackoffMillis = initialBackoffMillis;
@Override
public long nextBackoffMillis() {
long currentBackoffMillis = nextBackoffMillis;
nextBackoffMillis = Math.min((long) (currentBackoffMillis * multiplier), maxBackoffMillis);
return currentBackoffMillis
+ uniformRandom(-jitter * currentBackoffMillis, jitter * currentBackoffMillis);
}
private long uniformRandom(double low, double high) {
checkArgument(high >= low);
double mag = high - low;
return (long) (random.nextDouble() * mag + low);
}
/*
* No guice and no flags means we get to implement these setters for testing ourselves. Do not
* call these from non-test code.
*/
@VisibleForTesting
ExponentialBackoffPolicy setRandom(Random random) {
this.random = random;
return this;
}
@VisibleForTesting
ExponentialBackoffPolicy setInitialBackoffMillis(long initialBackoffMillis) {
this.initialBackoffMillis = initialBackoffMillis;
return this;
}
@VisibleForTesting
ExponentialBackoffPolicy setMaxBackoffMillis(long maxBackoffMillis) {
this.maxBackoffMillis = maxBackoffMillis;
return this;
}
@VisibleForTesting
ExponentialBackoffPolicy setMultiplier(double multiplier) {
this.multiplier = multiplier;
return this;
}
@VisibleForTesting
ExponentialBackoffPolicy setJitter(double jitter) {
this.jitter = jitter;
return this;
}
}

View File

@ -0,0 +1,74 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import java.util.Random;
/**
* Test for {@link ExponentialBackoffPolicy}.
*/
@RunWith(JUnit4.class)
public class ExponentialBackoffPolicyTest {
private ExponentialBackoffPolicy policy = new ExponentialBackoffPolicy();
private Random notRandom = new Random() {
@Override
public double nextDouble() {
return .5;
}
};
@Test
public void maxDelayReached() {
long maxBackoffMillis = 120 * 1000;
policy.setMaxBackoffMillis(maxBackoffMillis)
.setJitter(0)
.setRandom(notRandom);
for (int i = 0; i < 50; i++) {
if (maxBackoffMillis == policy.nextBackoffMillis()) {
return; // Success
}
}
assertEquals("max delay not reached", maxBackoffMillis, policy.nextBackoffMillis());
}
@Test public void canProvide() {
assertNotNull(new ExponentialBackoffPolicy.Provider().get());
}
}