From cf787bddf2fa110b2673cd671468eb7643c8ee03 Mon Sep 17 00:00:00 2001 From: Kun Zhang Date: Fri, 29 Jan 2016 17:25:42 -0800 Subject: [PATCH] DelayedClientTransport and fix TransportSet.shutdown() semantics. Always return a completed future from `TransportSet`. If a (real) transport has not been created (e.g., in reconnect back-off), a `DelayedClientTransport` will be returned. Eventually we will get rid of the transport futures everywhere, and have streams always __owned__ by some transports. DelayedClientTransport ---------------------- After we get rid of the transport future, this is what `ClientCallImpl` and `LoadBalancer` get when a real transport has not been created yet. It buffers new streams and pings until `setTransport()` is called, after which point all buffered and future streams/pings are transferred to the real transport. If a buffered stream is cancelled, `DelayedClientTransport` will remove it from the buffer list, thus #1342 will be resolved after the larger refactoring is complete. This PR only makes `TransportSet` use `DelayedClientTransport`. Follow-up changes will be made to allow `LoadBalancer.pickTransport()` to return null, in which case `ManagedChannelImpl` will give `ClientCallImpl` a `DelayedClientTransport`. Changes to ClientTransport shutdown semantics --------------------------------------------- Previously when shutdown() is called, `ClientTransport` should not accept newStream(), and when all existing streams have been closed, `ClientTransport` is terminated. Only when a transport is terminated would a transport owner (e.g., `TransportSet`) remove the reference to it. `DelayedClientTransport` brings about a new case: when `setTransport()` is called, we switch to the real transport and no longer need the delayed transport. This is achieved by calling `shutdown()` on the delayed transport and letting it terminate. However, as the delayed transport has already been handed out to users, we would like `newStream()` to keep working for them, even though the delayed transport is already shut down and terminated. In order to make it easy to manage the life-cycle of `DelayedClientTransport`, we redefine the shutdown semantics of transport: - A transport can own a stream. Typically the transport owns the streams it creates, but there may be exceptions. `DelayedClientTransport` DOES NOT OWN the streams it returns from `newStream()` after `setTransport()` has been called. Instead, the ownership would be transferred to the real transport. - After `shutdown()` has been called, the transport stops owning new streams, and `newStream()` may still succeed. With this idea, `DelayedClientTransport`, even when terminated, will continue passing `newStream()` to the real transport. - When a transport is in shutdown state, and it doesn't own any stream, it then can enter terminated state. ManagedClientTransport / ClientTransport ---------------------------------------- Remove life-cycle interfaces from `ClientTransport`, and put them in its subclass - `ManagedClientTransport`, with the same idea that we have `Channel` and `ManagedChannel`. Only the one who creates the transport will get `ManagedClientTransport` thus is able to start and shutdown the transport. The users of transport, e.g., `LoadBalancer`, can only get `ClientTransport` thus are not alter its state. This change clarifies the responsibility of transport life-cycle management. Fix TransportSet shutdown semantics ----------------------------------- Currently, if `TransportSet.shutdown()` has been called, it no longer create new transports, which is wrong. The correct semantics of `TransportSet.shutdown()` should be: - Shutdown all transports, thus stop new streams being created on them - Stop `obtainActiveTransport()` from returning transports - Streams that already created, including those buffered in delayed transport, should continue. That means if delayed transport has buffered streams, we should let the existing reconnect task continue. --- .../inprocess/InProcessChannelBuilder.java | 4 +- .../io/grpc/inprocess/InProcessTransport.java | 10 +- .../AbstractManagedChannelImplBuilder.java | 3 +- .../io/grpc/internal/ClientTransport.java | 72 +---- .../grpc/internal/ClientTransportFactory.java | 4 +- .../grpc/internal/DelayedClientTransport.java | 268 ++++++++++++++++++ .../java/io/grpc/internal/DelayedStream.java | 2 + .../grpc/internal/ManagedClientTransport.java | 105 +++++++ .../java/io/grpc/internal/TransportSet.java | 225 ++++++++------- .../internal/DelayedClientTransportTest.java | 222 +++++++++++++++ .../io/grpc/internal/DelayedStreamTest.java | 1 - .../grpc/internal/ManagedChannelImplTest.java | 19 +- ...anagedChannelImplTransportManagerTest.java | 31 +- .../test/java/io/grpc/internal/TestUtils.java | 54 +++- .../io/grpc/internal/TransportSetTest.java | 136 +++++++-- .../io/grpc/netty/NettyChannelBuilder.java | 5 +- .../io/grpc/netty/NettyClientHandler.java | 6 +- .../io/grpc/netty/NettyClientTransport.java | 6 +- .../grpc/netty/NettyClientTransportTest.java | 4 +- .../io/grpc/okhttp/OkHttpChannelBuilder.java | 4 +- .../io/grpc/okhttp/OkHttpClientTransport.java | 6 +- .../okhttp/OkHttpClientTransportTest.java | 5 +- 22 files changed, 932 insertions(+), 260 deletions(-) create mode 100644 core/src/main/java/io/grpc/internal/DelayedClientTransport.java create mode 100644 core/src/main/java/io/grpc/internal/ManagedClientTransport.java create mode 100644 core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java diff --git a/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java b/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java index 4eeb1c96ac..20650451a9 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java @@ -36,8 +36,8 @@ import com.google.common.base.Preconditions; import io.grpc.ExperimentalApi; import io.grpc.internal.AbstractManagedChannelImplBuilder; import io.grpc.internal.AbstractReferenceCounted; -import io.grpc.internal.ClientTransport; import io.grpc.internal.ClientTransportFactory; +import io.grpc.internal.ManagedClientTransport; import java.net.SocketAddress; @@ -89,7 +89,7 @@ public class InProcessChannelBuilder extends } @Override - public ClientTransport newClientTransport(SocketAddress addr, String authority) { + public ManagedClientTransport newClientTransport(SocketAddress addr, String authority) { return new InProcessTransport(name); } diff --git a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java index a620b1e397..f541ba476c 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java @@ -40,7 +40,7 @@ import io.grpc.MethodDescriptor; import io.grpc.Status; import io.grpc.internal.ClientStream; import io.grpc.internal.ClientStreamListener; -import io.grpc.internal.ClientTransport; +import io.grpc.internal.ManagedClientTransport; import io.grpc.internal.NoopClientStream; import io.grpc.internal.ServerStream; import io.grpc.internal.ServerStreamListener; @@ -59,12 +59,12 @@ import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; @ThreadSafe -class InProcessTransport implements ServerTransport, ClientTransport { +class InProcessTransport implements ServerTransport, ManagedClientTransport { private static final Logger log = Logger.getLogger(InProcessTransport.class.getName()); private final String name; private ServerTransportListener serverTransportListener; - private ClientTransport.Listener clientTransportListener; + private ManagedClientTransport.Listener clientTransportListener; @GuardedBy("this") private boolean shutdown; @GuardedBy("this") @@ -79,7 +79,7 @@ class InProcessTransport implements ServerTransport, ClientTransport { } @Override - public synchronized void start(ClientTransport.Listener listener) { + public synchronized void start(ManagedClientTransport.Listener listener) { this.clientTransportListener = listener; InProcessServer server = InProcessServer.findServer(name); if (server != null) { @@ -152,7 +152,7 @@ class InProcessTransport implements ServerTransport, ClientTransport { @Override public synchronized void shutdown() { - // Can be called multiple times: once for ClientTransport, once for ServerTransport. + // Can be called multiple times: once for ManagedClientTransport, once for ServerTransport. if (shutdown) { return; } diff --git a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java index 990e368b39..441e18f376 100644 --- a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java +++ b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java @@ -230,7 +230,8 @@ public abstract class AbstractManagedChannelImplBuilder } @Override - public ClientTransport newClientTransport(SocketAddress serverAddress, String authority) { + public ManagedClientTransport newClientTransport(SocketAddress serverAddress, + String authority) { return factory.newClientTransport( serverAddress, authorityOverride != null ? authorityOverride : authority); } diff --git a/core/src/main/java/io/grpc/internal/ClientTransport.java b/core/src/main/java/io/grpc/internal/ClientTransport.java index 298c98787f..c4f1ee6215 100644 --- a/core/src/main/java/io/grpc/internal/ClientTransport.java +++ b/core/src/main/java/io/grpc/internal/ClientTransport.java @@ -1,5 +1,5 @@ /* - * Copyright 2014, Google Inc. All rights reserved. + * Copyright 2016, Google Inc. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are @@ -33,25 +33,21 @@ package io.grpc.internal; import io.grpc.Metadata; import io.grpc.MethodDescriptor; -import io.grpc.Status; import java.util.concurrent.Executor; - import javax.annotation.concurrent.ThreadSafe; /** - * The client-side transport encapsulating a single connection to a remote server. Allows creation - * of new {@link Stream} instances for communication with the server. All methods on the transport - * and its listener are expected to execute quickly. - * - *

{@link #start} must be the first method call to this interface and return before calling other - * methods. + * The client-side transport typically encapsulating a single connection to a remote + * server. However, streams created before the client has discovered any server address may + * eventually be issued on different connections. All methods on the transport and its callbacks + * are expected to execute quickly. */ @ThreadSafe public interface ClientTransport { /** - * Creates a new stream for sending messages to the remote end-point. + * Creates a new stream for sending messages to a remote end-point. * *

* This method returns immediately and does not wait for any validation of the request. If @@ -67,64 +63,18 @@ public interface ClientTransport { ClientStream newStream(MethodDescriptor method, Metadata headers); /** - * Starts transport. This method may only be called once. + * Pings a remote endpoint. When an acknowledgement is received, the given callback will be + * invoked using the given executor. * - *

Implementations must not call {@code listener} from within {@link #start}; implementations - * are expected to notify listener on a separate thread. This method should not throw any - * exceptions. - * - * @param listener non-{@code null} listener of transport events - */ - void start(Listener listener); - - /** - * Pings the remote endpoint to verify that the transport is still active. When an acknowledgement - * is received, the given callback will be invoked using the given executor. + *

Pings are not necessarily sent to the same endpont, thus a successful ping only means at + * least one endpoint responded, but doesn't imply the availability of other endpoints (if there + * is any). * *

This is an optional method. Transports that do not have any mechanism by which to ping the * remote endpoint may throw {@link UnsupportedOperationException}. */ void ping(PingCallback callback, Executor executor); - /** - * Initiates an orderly shutdown of the transport. Existing streams continue, but new streams will - * fail (once {@link Listener#transportShutdown} callback called). This method may only be called - * once. - */ - void shutdown(); - - /** - * Receives notifications for the transport life-cycle events. Implementation does not need to be - * thread-safe, so notifications must be properly sychronized externally. - */ - interface Listener { - /** - * The transport is shutting down. No new streams will be processed, but existing streams may - * continue. Shutdown could have been caused by an error or normal operation. It is possible - * that this method is called without {@link #shutdown} being called. If the argument to this - * function is {@link Status#isOk}, it is safe to immediately reconnect. - * - *

This is called exactly once, and must be called prior to {@link #transportTerminated}. - * - * @param s the reason for the shutdown. - */ - void transportShutdown(Status s); - - /** - * The transport completed shutting down. All resources have been released. - * - *

This is called exactly once, and must be called after {@link #transportShutdown} has been - * called. - */ - void transportTerminated(); - - /** - * The transport is ready to accept traffic, because the connection is established. This is - * called at most once. - */ - void transportReady(); - } - /** * A callback that is invoked when the acknowledgement to a {@link #ping} is received. Exactly one * of the two methods should be called per {@link #ping}. diff --git a/core/src/main/java/io/grpc/internal/ClientTransportFactory.java b/core/src/main/java/io/grpc/internal/ClientTransportFactory.java index bc06a10966..7172a80533 100644 --- a/core/src/main/java/io/grpc/internal/ClientTransportFactory.java +++ b/core/src/main/java/io/grpc/internal/ClientTransportFactory.java @@ -33,7 +33,7 @@ package io.grpc.internal; import java.net.SocketAddress; -/** Pre-configured factory for creating {@link ClientTransport} instances. */ +/** Pre-configured factory for creating {@link ManagedClientTransport} instances. */ public interface ClientTransportFactory extends ReferenceCounted { /** * Creates an unstarted transport for exclusive use. @@ -41,5 +41,5 @@ public interface ClientTransportFactory extends ReferenceCounted { * @param serverAddress the address that the transport is connected to * @param authority the HTTP/2 authority of the server */ - ClientTransport newClientTransport(SocketAddress serverAddress, String authority); + ManagedClientTransport newClientTransport(SocketAddress serverAddress, String authority); } diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java new file mode 100644 index 0000000000..7869c68e97 --- /dev/null +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -0,0 +1,268 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.internal; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.Status; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.concurrent.Executor; + +import javax.annotation.concurrent.GuardedBy; + +/** + * A client transport that queues requests before a real transport is available. When a backing + * transport is later provided, this class delegates to it. + * + *

This transport owns the streams that it has created before {@link #setTransport} is + * called. When {@link #setTransport} is called, the ownership of pending streams and subsequent new + * streams are transferred to the given transport, thus this transport won't own any stream. + */ +class DelayedClientTransport implements ManagedClientTransport { + private final Object lock = new Object(); + + private Listener listener; + /** 'lock' must be held when assigning to delegate. */ + private volatile ClientTransport delegate; + + @GuardedBy("lock") + private Collection pendingStreams = new HashSet(); + @GuardedBy("lock") + private Collection pendingPings = new ArrayList(); + /** + * When shutdown == true and pendingStreams == null, then the transport is considered terminated. + */ + @GuardedBy("lock") + private boolean shutdown; + + @Override + public void start(Listener listener) { + this.listener = Preconditions.checkNotNull(listener, "listener"); + } + + @Override + public ClientStream newStream(MethodDescriptor method, Metadata headers) { + ClientTransport transport = delegate; + if (transport != null) { + return transport.newStream(method, headers); + } + synchronized (lock) { + // Check again, since it may have changed while waiting for lock + transport = delegate; + if (transport != null) { + return transport.newStream(method, headers); + } + if (!shutdown) { + PendingStream pendingStream = new PendingStream(method, headers); + pendingStreams.add(pendingStream); + return pendingStream; + } + } + DelayedStream stream = new DelayedStream(); + stream.setError(Status.UNAVAILABLE.withDescription("transport shutdown")); + return stream; + } + + public void ping(final PingCallback callback, Executor executor) { + ClientTransport transport = delegate; + if (transport != null) { + transport.ping(callback, executor); + return; + } + synchronized (lock) { + // Check again, since it may have changed while waiting for lock + transport = delegate; + if (transport != null) { + transport.ping(callback, executor); + return; + } + if (!shutdown) { + PendingPing pendingPing = new PendingPing(callback, executor); + pendingPings.add(pendingPing); + return; + } + } + executor.execute(new Runnable() { + @Override public void run() { + callback.onFailure( + Status.UNAVAILABLE.withDescription("transport shutdown").asException()); + } + }); + } + + /** + * Prevents creating any new streams until {@link #setTransport} is called. Buffered streams are + * not failed, so if {@link #shutdown} is called when {@link #setTransport} has not been called, + * you still need to call {@link setTransport} to make this transport terminated. + */ + @Override + public void shutdown() { + synchronized (lock) { + if (shutdown) { + return; + } + shutdown = true; + listener.transportShutdown( + Status.OK.withDescription("Channel requested transport to shut down")); + if (pendingStreams == null || pendingStreams.isEmpty()) { + pendingStreams = null; + listener.transportTerminated(); + } + } + } + + /** + * Shuts down this transport and cancels all streams that it owns, hence immediately terminates + * this transport. + */ + public void shutdownNow(Status status) { + shutdown(); + Collection savedPendingStreams = null; + synchronized (lock) { + if (pendingStreams != null) { + savedPendingStreams = pendingStreams; + pendingStreams = null; + } + } + if (savedPendingStreams != null) { + for (PendingStream stream : savedPendingStreams) { + stream.setError(status); + } + listener.transportTerminated(); + } + // If savedPendingStreams == null, transportTerminated() has already been called in shutdown(). + } + + /** + * Transfers all the pending and future requests and pings to the given transport. + * + *

May only be called after {@link #start(Listener)}. + * + *

{@code transport} will be used for all future calls to {@link #newStream}, even if this + * transport is {@link #shutdown}. + */ + public void setTransport(ClientTransport transport) { + Collection savedPendingStreams; + synchronized (lock) { + Preconditions.checkState(delegate == null, "setTransport already called"); + Preconditions.checkState(listener != null, "start() not called"); + delegate = Preconditions.checkNotNull(transport, "transport"); + for (PendingPing ping : pendingPings) { + ping.createRealPing(transport); + } + pendingPings = null; + if (shutdown && pendingStreams != null) { + listener.transportTerminated(); + } + savedPendingStreams = pendingStreams; + pendingStreams = null; + if (!shutdown) { + listener.transportReady(); + } + } + // createRealStream may be expensive, so we run it outside of the lock. It will start real + // streams on the transport. If there are pending requests, they will be serialized too, which + // may be expensive. + // TODO(zhangkun83): may consider doing it in a different thread. + if (savedPendingStreams != null) { + for (PendingStream stream : savedPendingStreams) { + stream.createRealStream(transport); + } + } + } + + @VisibleForTesting + int getPendingStreamsCount() { + synchronized (lock) { + return pendingStreams == null ? 0 : pendingStreams.size(); + } + } + + private class PendingStream extends DelayedStream { + private final MethodDescriptor method; + private final Metadata headers; + + private PendingStream(MethodDescriptor method, Metadata headers) { + this.method = method; + this.headers = headers; + } + + private void createRealStream(ClientTransport transport) { + setStream(transport.newStream(method, headers)); + } + + // TODO(zhangkun83): DelayedStream.setError() doesn't have a clearly-defined semantic to be + // overriden. Make it clear or find another method to override. + @Override + void setError(Status reason) { + synchronized (lock) { + if (pendingStreams != null) { + pendingStreams.remove(this); + if (shutdown && pendingStreams.isEmpty()) { + pendingStreams = null; + listener.transportTerminated(); + } + } + } + super.setError(reason); + } + } + + private static class PendingPing { + private final PingCallback callback; + private final Executor executor; + + public PendingPing(PingCallback callback, Executor executor) { + this.callback = callback; + this.executor = executor; + } + + public void createRealPing(ClientTransport transport) { + try { + transport.ping(callback, executor); + } catch (final UnsupportedOperationException ex) { + executor.execute(new Runnable() { + @Override + public void run() { + callback.onFailure(ex); + } + }); + } + } + } +} diff --git a/core/src/main/java/io/grpc/internal/DelayedStream.java b/core/src/main/java/io/grpc/internal/DelayedStream.java index 89c7fcab2a..42c16ad219 100644 --- a/core/src/main/java/io/grpc/internal/DelayedStream.java +++ b/core/src/main/java/io/grpc/internal/DelayedStream.java @@ -53,6 +53,8 @@ import javax.annotation.concurrent.GuardedBy; * DelayedStream} may be internally altered by different threads, thus internal synchronization is * necessary. */ +// TODO(zhangkun83): merge it with DelayedClientTransport.PendingStream as it will be no longer +// needed by ClientCallImpl as we move away from ListenableFuture class DelayedStream implements ClientStream { // set to non null once both listener and realStream are valid. After this point it is safe diff --git a/core/src/main/java/io/grpc/internal/ManagedClientTransport.java b/core/src/main/java/io/grpc/internal/ManagedClientTransport.java new file mode 100644 index 0000000000..02b53c5c19 --- /dev/null +++ b/core/src/main/java/io/grpc/internal/ManagedClientTransport.java @@ -0,0 +1,105 @@ +/* + * Copyright 2016, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.internal; + +import io.grpc.Status; + +import javax.annotation.concurrent.ThreadSafe; + +/** + * A {@link ClientTransport} that has life-cycle management. + * + *

{@link #start} must be the first method call to this interface and return before calling other + * methods. + * + *

Typically the transport owns the streams it creates through {@link #newStream}, while some + * implementations may transfer the streams to somewhere else. Either way they must conform to the + * contract defined by {@link #shutdown}, {@link Listener#transportShutdown} and + * {@link Listener#transportTerminated}. + */ +@ThreadSafe +public interface ManagedClientTransport extends ClientTransport { + + /** + * Starts transport. This method may only be called once. + * + *

Implementations must not call {@code listener} from within {@link #start}; implementations + * are expected to notify listener on a separate thread. This method should not throw any + * exceptions. + * + * @param listener non-{@code null} listener of transport events + */ + void start(Listener listener); + + /** + * Initiates an orderly shutdown of the transport. Existing streams continue, but the transport + * will not own any new streams. New streams will either fail (once + * {@link Listener#transportShutdown} callback called), or be transferred off this transport (in + * which case they may succeed). This method may only be called once. + */ + void shutdown(); + + /** + * Receives notifications for the transport life-cycle events. Implementation does not need to be + * thread-safe, so notifications must be properly sychronized externally. + */ + interface Listener { + /** + * The transport is shutting down. This transport will stop owning new streams, but existing + * streams may continue, and the transport may still be able to process {@link #newStream} as + * long as it doesn't own the new streams. Shutdown could have been caused by an error or normal + * operation. It is possible that this method is called without {@link #shutdown} being called. + * If the argument to this function is {@link Status#isOk}, it is safe to immediately reconnect. + * + *

This is called exactly once, and must be called prior to {@link #transportTerminated}. + * + * @param s the reason for the shutdown. + */ + void transportShutdown(Status s); + + /** + * The transport completed shutting down. All resources have been released. All streams have + * either been closed or transferred off this transport. This transport may still be able to + * process {@link #newStream} as long as it doesn't own the new streams. + * + *

This is called exactly once, and must be called after {@link #transportShutdown} has been + * called. + */ + void transportTerminated(); + + /** + * The transport is ready to accept traffic, because the connection is established. This is + * called at most once. + */ + void transportReady(); + } +} diff --git a/core/src/main/java/io/grpc/internal/TransportSet.java b/core/src/main/java/io/grpc/internal/TransportSet.java index 4e8b523dcc..8b0216fb86 100644 --- a/core/src/main/java/io/grpc/internal/TransportSet.java +++ b/core/src/main/java/io/grpc/internal/TransportSet.java @@ -34,8 +34,7 @@ package io.grpc.internal; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; -import com.google.common.base.Throwables; -import com.google.common.util.concurrent.AbstractFuture; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import io.grpc.EquivalentAddressGroup; @@ -62,12 +61,6 @@ import javax.annotation.concurrent.ThreadSafe; @ThreadSafe final class TransportSet { private static final Logger log = Logger.getLogger(TransportSet.class.getName()); - private static final UncancellableTransportFuture NULL_VALUE_FUTURE; - - static { - NULL_VALUE_FUTURE = new UncancellableTransportFuture(); - NULL_VALUE_FUTURE.set(null); - } private final Object lock = new Object(); private final EquivalentAddressGroup addressGroup; @@ -98,24 +91,35 @@ final class TransportSet { private ScheduledFuture reconnectTask; /** - * All transports that are not stopped. At the very least the value of {@link - * activeTransportFuture} will be present, but previously used transports that still have streams - * or are stopping may also be present. + * All transports that are not terminated. At the very least the value of {@link activeTransport} + * will be present, but previously used transports that still have streams or are stopping may + * also be present. */ @GuardedBy("lock") - private final Collection transports = new ArrayList(); + private final Collection transports = + new ArrayList(); private final LoadBalancer loadBalancer; @GuardedBy("lock") private boolean shutdown; - /** - * The future for the transport for new outgoing requests. 'lock' must be held when assigning - * to it. + /* + * The transport for new outgoing requests. + * - If shutdown == true, activeTransport is null (shutdown) + * - Otherwise, if delayedTransport != null, + * activeTransport is delayedTransport (waiting to connect) + * - Otherwise, activeTransport is either null (initially or when idle) + * or points to a real transport (when connecting or connected). + * + * 'lock' must be held when assigning to it. */ @Nullable - private volatile UncancellableTransportFuture activeTransportFuture; + private volatile ManagedClientTransport activeTransport; + + @GuardedBy("lock") + @Nullable + private DelayedClientTransport delayedTransport; TransportSet(EquivalentAddressGroup addressGroup, String authority, LoadBalancer loadBalancer, BackoffPolicy.Provider backoffPolicyProvider, @@ -146,30 +150,27 @@ final class TransportSet { *

Cancelling the return future has no effect. The future will never fail. If this {@code * TransportSet} has been shut down, the returned future will have {@code null} value. */ + // TODO(zhangkun83): change it to return a ClientTransport directly final ListenableFuture obtainActiveTransport() { - UncancellableTransportFuture savedTransportFuture = activeTransportFuture; - if (savedTransportFuture != null) { - return savedTransportFuture; + ClientTransport savedTransport = activeTransport; + if (savedTransport != null) { + return Futures.immediateFuture(savedTransport); } synchronized (lock) { // Check again, since it could have changed before acquiring the lock - if (activeTransportFuture == null) { - // In shutdown(), activeTransportFuture is set to NULL_VALUE_FUTURE, thus if - // activeTransportFuture is null, shutdown must be false. - Preconditions.checkState(!shutdown, "already shutdown"); - Preconditions.checkState(activeTransportFuture == null || activeTransportFuture.isDone(), - "activeTransportFuture is neither null nor done"); - activeTransportFuture = new UncancellableTransportFuture(); + if (activeTransport == null && !shutdown) { + delayedTransport = new DelayedClientTransport(); + transports.add(delayedTransport); + delayedTransport.start(new BaseTransportListener(delayedTransport)); + activeTransport = delayedTransport; scheduleConnection(); } - return activeTransportFuture; + return Futures.immediateFuture(activeTransport); } } - // Can only be called when shutdown == false @GuardedBy("lock") private void scheduleConnection() { - Preconditions.checkState(!shutdown, "Already shut down"); Preconditions.checkState(reconnectTask == null || reconnectTask.isDone(), "previous reconnectTask is not done"); @@ -184,22 +185,39 @@ final class TransportSet { Runnable createTransportRunnable = new Runnable() { @Override public void run() { + DelayedClientTransport savedDelayedTransport; + ManagedClientTransport newActiveTransport; + boolean savedShutdown; synchronized (lock) { - if (shutdown) { - return; - } + savedShutdown = shutdown; if (currentAddressIndex == headIndex) { backoffWatch.reset().start(); } - ClientTransport newActiveTransport = - transportFactory.newClientTransport(address, authority); + newActiveTransport = transportFactory.newClientTransport(address, authority); log.log(Level.INFO, "Created transport {0} for {1}", new Object[] {newActiveTransport, address}); transports.add(newActiveTransport); newActiveTransport.start( - new TransportListener(newActiveTransport, activeTransportFuture, address)); - Preconditions.checkState(activeTransportFuture.set(newActiveTransport), - "failed to set the new transport to the future"); + new TransportListener(newActiveTransport, address)); + if (shutdown) { + // If TransportSet already shutdown, newActiveTransport is only to take care of pending + // streams in delayedTransport, but will not serve new streams, and it will be shutdown + // as soon as it's set to the delayedTransport. + // activeTransport should have already been set to null by shutdown(). We keep it null. + Preconditions.checkState(activeTransport == null, + "Unexpected non-null activeTransport"); + } else { + activeTransport = newActiveTransport; + } + savedDelayedTransport = delayedTransport; + delayedTransport = null; + } + savedDelayedTransport.setTransport(newActiveTransport); + // This delayed transport will terminate and be removed from transports. + savedDelayedTransport.shutdown(); + if (savedShutdown) { + // See comments in the synchronized block above on why we shutdown here. + newActiveTransport.shutdown(); } } }; @@ -228,65 +246,90 @@ final class TransportSet { } /** - * Shut down all transports, may run callback inline. + * Shut down all transports, stop creating new streams, but existing streams will continue. + * + *

May run callback inline. */ final void shutdown() { - UncancellableTransportFuture savedActiveTransportFuture; + ManagedClientTransport savedActiveTransport; boolean runCallback = false; synchronized (lock) { if (shutdown) { return; } shutdown = true; - savedActiveTransportFuture = activeTransportFuture; - activeTransportFuture = NULL_VALUE_FUTURE; + savedActiveTransport = activeTransport; + activeTransport = null; if (transports.isEmpty()) { runCallback = true; - } - if (reconnectTask != null) { - reconnectTask.cancel(false); - } - // else: the callback will be run once all transports have been terminated + Preconditions.checkState(reconnectTask == null, "Should have no reconnectTask scheduled"); + Preconditions.checkState(delayedTransport == null, "Should have no delayedTransport"); + } // else: the callback will be run once all transports have been terminated } - if (savedActiveTransportFuture != null) { - if (savedActiveTransportFuture.isDone()) { - try { - // Should not throw any exception here - savedActiveTransportFuture.get().shutdown(); - } catch (Exception e) { - throw Throwables.propagate(e); - } - } else { - savedActiveTransportFuture.set(null); - } + if (savedActiveTransport != null) { + savedActiveTransport.shutdown(); } if (runCallback) { callback.onTerminated(); } } - private class TransportListener implements ClientTransport.Listener { - private final SocketAddress address; - private final ClientTransport transport; - private final UncancellableTransportFuture transportFuture; + @GuardedBy("lock") + private void cancelReconnectTask() { + if (reconnectTask != null) { + reconnectTask.cancel(false); + reconnectTask = null; + } + } - public TransportListener(ClientTransport transport, - UncancellableTransportFuture transportFuture, SocketAddress address) { + /** Shared base for both delayed and real transports. */ + private class BaseTransportListener implements ManagedClientTransport.Listener { + protected final ManagedClientTransport transport; + + public BaseTransportListener(ManagedClientTransport transport) { this.transport = transport; - this.transportFuture = transportFuture; + } + + @Override + public void transportReady() {} + + @Override + public void transportShutdown(Status status) {} + + @Override + public void transportTerminated() { + boolean runCallback = false; + synchronized (lock) { + transports.remove(transport); + if (shutdown && transports.isEmpty()) { + runCallback = true; + cancelReconnectTask(); + } + } + if (runCallback) { + callback.onTerminated(); + } + } + } + + /** Listener for real transports. */ + private class TransportListener extends BaseTransportListener { + private final SocketAddress address; + + public TransportListener(ManagedClientTransport transport, SocketAddress address) { + super(transport); this.address = address; } - @GuardedBy("lock") private boolean isAttachedToActiveTransport() { - return activeTransportFuture == transportFuture; + return activeTransport == transport; } @Override public void transportReady() { + log.log(Level.INFO, "Transport {0} for {1} is ready", new Object[] {transport, address}); + super.transportReady(); synchronized (lock) { - log.log(Level.INFO, "Transport {0} for {1} is ready", new Object[] {transport, address}); - Preconditions.checkState(transportFuture.isDone(), "the transport future is not done"); if (isAttachedToActiveTransport()) { headIndex = -1; } @@ -296,52 +339,32 @@ final class TransportSet { @Override public void transportShutdown(Status s) { + log.log(Level.INFO, "Transport {0} for {1} is being shutdown", + new Object[] {transport, address}); + super.transportShutdown(s); synchronized (lock) { - log.log(Level.INFO, "Transport {0} for {1} is being shutdown", - new Object[] {transport, address}); - Preconditions.checkState(transportFuture.isDone(), "the transport future is not done"); if (isAttachedToActiveTransport()) { - activeTransportFuture = null; + activeTransport = null; } } + // TODO(zhangkun83): if loadBalancer was given delayedTransport earlier, it will get the real + // transport's shutdown event, and loadBalancer won't be able to match the two. This beats the + // purpose of passing the transport. We may just remove the second argument. loadBalancer.transportShutdown(addressGroup, transport, s); } @Override public void transportTerminated() { - boolean runCallback = false; - synchronized (lock) { - log.log(Level.INFO, "Transport {0} for {1} is terminated", - new Object[] {transport, address}); - Preconditions.checkState(!isAttachedToActiveTransport(), - "Listener is still attached to activeTransportFuture. " - + "Seems transportTerminated was not called."); - transports.remove(transport); - if (shutdown && transports.isEmpty()) { - runCallback = true; - } - } - if (runCallback) { - callback.onTerminated(); - } + log.log(Level.INFO, "Transport {0} for {1} is terminated", + new Object[] {transport, address}); + super.transportTerminated(); + Preconditions.checkState(!isAttachedToActiveTransport(), + "Listener is still attached to activeTransport. " + + "Seems transportTerminated was not called."); } } interface Callback { void onTerminated(); } - - private static class UncancellableTransportFuture extends AbstractFuture { - @Override public boolean cancel(boolean mayInterruptIfRunning) { - // Do not cancel. - // A future instance is shared among multiple obtainActiveTransport() calls. - // Since the user of the future may cancel it when it's no longer needed, cancelling for real - // will affect other users of the same future. - return false; - } - - @Override protected boolean set(ClientTransport v) { - return super.set(v); - } - } } diff --git a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java new file mode 100644 index 0000000000..85bf735025 --- /dev/null +++ b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java @@ -0,0 +1,222 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.internal; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.same; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import io.grpc.IntegerMarshaller; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.Status; +import io.grpc.StringMarshaller; +import io.grpc.internal.ClientTransport; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.util.concurrent.Executor; + +/** + * Unit tests for {@link DelayedClientTransport}. + */ +@RunWith(JUnit4.class) +public class DelayedClientTransportTest { + @Mock private ManagedClientTransport.Listener transportListener; + @Mock private ClientTransport mockRealTransport; + @Mock private ClientStream mockRealStream; + @Mock private ClientStreamListener streamListener; + @Mock private ClientTransport.PingCallback pingCallback; + @Mock private Executor mockExecutor; + @Captor private ArgumentCaptor statusCaptor; + + private final MethodDescriptor method = MethodDescriptor.create( + MethodDescriptor.MethodType.UNKNOWN, "/service/method", + new StringMarshaller(), new IntegerMarshaller()); + private final Metadata headers = new Metadata(); + + private final DelayedClientTransport delayedTransport = new DelayedClientTransport(); + + @Before public void setUp() { + MockitoAnnotations.initMocks(this); + when(mockRealTransport.newStream(same(method), same(headers))).thenReturn(mockRealStream); + delayedTransport.start(transportListener); + } + + @Test public void streamStartThenSetTransport() { + ClientStream stream = delayedTransport.newStream(method, headers); + stream.start(streamListener); + assertEquals(1, delayedTransport.getPendingStreamsCount()); + assertTrue(stream instanceof DelayedStream); + delayedTransport.setTransport(mockRealTransport); + assertEquals(0, delayedTransport.getPendingStreamsCount()); + verify(mockRealTransport).newStream(same(method), same(headers)); + verify(mockRealStream).start(same(streamListener)); + } + + @Test public void newStreamThenSetTransportThenShutdown() { + ClientStream stream = delayedTransport.newStream(method, headers); + assertEquals(1, delayedTransport.getPendingStreamsCount()); + assertTrue(stream instanceof DelayedStream); + delayedTransport.setTransport(mockRealTransport); + assertEquals(0, delayedTransport.getPendingStreamsCount()); + delayedTransport.shutdown(); + verify(transportListener).transportShutdown(any(Status.class)); + verify(transportListener).transportTerminated(); + verify(mockRealTransport).newStream(same(method), same(headers)); + stream.start(streamListener); + verify(mockRealStream).start(same(streamListener)); + } + + @Test public void transportTerminatedThenSetTransport() { + delayedTransport.shutdown(); + verify(transportListener).transportShutdown(any(Status.class)); + verify(transportListener).transportTerminated(); + delayedTransport.setTransport(mockRealTransport); + verifyNoMoreInteractions(transportListener); + } + + @Test public void setTransportThenShutdownThenNewStream() { + delayedTransport.setTransport(mockRealTransport); + delayedTransport.shutdown(); + verify(transportListener).transportShutdown(any(Status.class)); + verify(transportListener).transportTerminated(); + ClientStream stream = delayedTransport.newStream(method, headers); + assertEquals(0, delayedTransport.getPendingStreamsCount()); + stream.start(streamListener); + assertFalse(stream instanceof DelayedStream); + verify(mockRealTransport).newStream(same(method), same(headers)); + verify(mockRealStream).start(same(streamListener)); + } + + @Test public void setTransportThenShutdownNowThenNewStream() { + delayedTransport.setTransport(mockRealTransport); + delayedTransport.shutdownNow(Status.UNAVAILABLE); + verify(transportListener).transportShutdown(any(Status.class)); + verify(transportListener).transportTerminated(); + ClientStream stream = delayedTransport.newStream(method, headers); + assertEquals(0, delayedTransport.getPendingStreamsCount()); + stream.start(streamListener); + assertFalse(stream instanceof DelayedStream); + verify(mockRealTransport).newStream(same(method), same(headers)); + verify(mockRealStream).start(same(streamListener)); + } + + @Test public void cancelStreamWithoutSetTransport() { + ClientStream stream = delayedTransport.newStream(method, new Metadata()); + assertEquals(1, delayedTransport.getPendingStreamsCount()); + stream.cancel(Status.CANCELLED); + assertEquals(0, delayedTransport.getPendingStreamsCount()); + verifyNoMoreInteractions(mockRealTransport); + verifyNoMoreInteractions(mockRealStream); + } + + @Test public void startThenCancelStreamWithoutSetTransport() { + ClientStream stream = delayedTransport.newStream(method, new Metadata()); + stream.start(streamListener); + assertEquals(1, delayedTransport.getPendingStreamsCount()); + stream.cancel(Status.CANCELLED); + assertEquals(0, delayedTransport.getPendingStreamsCount()); + verify(streamListener).closed(same(Status.CANCELLED), any(Metadata.class)); + verifyNoMoreInteractions(mockRealTransport); + verifyNoMoreInteractions(mockRealStream); + } + + @Test public void newStreamThenShutdownTransportThenCancelStream() { + ClientStream stream = delayedTransport.newStream(method, new Metadata()); + delayedTransport.shutdown(); + verify(transportListener).transportShutdown(any(Status.class)); + verify(transportListener, times(0)).transportTerminated(); + assertEquals(1, delayedTransport.getPendingStreamsCount()); + stream.cancel(Status.CANCELLED); + verify(transportListener).transportTerminated(); + assertEquals(0, delayedTransport.getPendingStreamsCount()); + verifyNoMoreInteractions(mockRealTransport); + verifyNoMoreInteractions(mockRealStream); + } + + @Test public void setTransportThenShutdownThenPing() { + delayedTransport.setTransport(mockRealTransport); + delayedTransport.shutdown(); + delayedTransport.ping(pingCallback, mockExecutor); + verify(mockRealTransport).ping(same(pingCallback), same(mockExecutor)); + } + + @Test public void pingThenSetTransport() { + delayedTransport.ping(pingCallback, mockExecutor); + delayedTransport.setTransport(mockRealTransport); + verify(mockRealTransport).ping(same(pingCallback), same(mockExecutor)); + } + + @Test public void shutdownThenNewStream() { + delayedTransport.shutdown(); + verify(transportListener).transportShutdown(any(Status.class)); + verify(transportListener).transportTerminated(); + ClientStream stream = delayedTransport.newStream(method, new Metadata()); + stream.start(streamListener); + verify(streamListener).closed(statusCaptor.capture(), any(Metadata.class)); + assertEquals(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode()); + } + + @Test public void startStreamThenShutdownNow() { + ClientStream stream = delayedTransport.newStream(method, new Metadata()); + stream.start(streamListener); + delayedTransport.shutdownNow(Status.UNAVAILABLE); + verify(transportListener).transportShutdown(any(Status.class)); + verify(transportListener).transportTerminated(); + verify(streamListener).closed(statusCaptor.capture(), any(Metadata.class)); + assertEquals(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode()); + } + + @Test public void shutdownNowThenNewStream() { + delayedTransport.shutdownNow(Status.UNAVAILABLE); + verify(transportListener).transportShutdown(any(Status.class)); + verify(transportListener).transportTerminated(); + ClientStream stream = delayedTransport.newStream(method, new Metadata()); + stream.start(streamListener); + verify(streamListener).closed(statusCaptor.capture(), any(Metadata.class)); + assertEquals(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode()); + } +} diff --git a/core/src/test/java/io/grpc/internal/DelayedStreamTest.java b/core/src/test/java/io/grpc/internal/DelayedStreamTest.java index 3b08447900..267fb81d50 100644 --- a/core/src/test/java/io/grpc/internal/DelayedStreamTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedStreamTest.java @@ -63,7 +63,6 @@ public class DelayedStreamTest { @Rule public final ExpectedException thrown = ExpectedException.none(); @Mock private ClientStreamListener listener; - @Mock private ClientTransport transport; @Mock private ClientStream realStream; @Captor private ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(Status.class); private DelayedStream stream = new DelayedStream(); diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index cfb4b545f9..bc13b4b175 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -117,7 +117,7 @@ public class ManagedChannelImplTest { @Rule public final ExpectedException thrown = ExpectedException.none(); @Mock - private ClientTransport mockTransport; + private ManagedClientTransport mockTransport; @Mock private ClientTransportFactory mockTransportFactory; @Mock @@ -127,8 +127,8 @@ public class ManagedChannelImplTest { @Mock private ClientCall.Listener mockCallListener3; - private ArgumentCaptor transportListenerCaptor = - ArgumentCaptor.forClass(ClientTransport.Listener.class); + private ArgumentCaptor transportListenerCaptor = + ArgumentCaptor.forClass(ManagedClientTransport.Listener.class); private ArgumentCaptor streamListenerCaptor = ArgumentCaptor.forClass(ClientStreamListener.class); @@ -183,7 +183,6 @@ public class ManagedChannelImplTest { verifyNoMoreInteractions(mockTransportFactory); // Create transport and call - ClientTransport mockTransport = mock(ClientTransport.class); ClientStream mockStream = mock(ClientStream.class); Metadata headers = new Metadata(); when(mockTransportFactory.newClientTransport(any(SocketAddress.class), any(String.class))) @@ -193,7 +192,7 @@ public class ManagedChannelImplTest { verify(mockTransportFactory, timeout(1000)) .newClientTransport(same(socketAddress), eq(authority)); verify(mockTransport, timeout(1000)).start(transportListenerCaptor.capture()); - ClientTransport.Listener transportListener = transportListenerCaptor.getValue(); + ManagedClientTransport.Listener transportListener = transportListenerCaptor.getValue(); verify(mockTransport, timeout(1000)).newStream(same(method), same(headers)); verify(mockStream).start(streamListenerCaptor.capture()); verify(mockStream).setCompressor(isA(Compressor.class)); @@ -277,7 +276,7 @@ public class ManagedChannelImplTest { call.cancel(); verify(mockTransport, timeout(1000)).start(transportListenerCaptor.capture()); - final ClientTransport.Listener transportListener = transportListenerCaptor.getValue(); + final ManagedClientTransport.Listener transportListener = transportListenerCaptor.getValue(); final Object lock = new Object(); final CyclicBarrier barrier = new CyclicBarrier(2); new Thread() { @@ -386,8 +385,8 @@ public class ManagedChannelImplTest { final SocketAddress badAddress = new SocketAddress() {}; final ResolvedServerInfo goodServer = new ResolvedServerInfo(goodAddress, Attributes.EMPTY); final ResolvedServerInfo badServer = new ResolvedServerInfo(badAddress, Attributes.EMPTY); - final ClientTransport goodTransport = mock(ClientTransport.class); - final ClientTransport badTransport = mock(ClientTransport.class); + final ManagedClientTransport goodTransport = mock(ManagedClientTransport.class); + final ManagedClientTransport badTransport = mock(ManagedClientTransport.class); when(mockTransportFactory.newClientTransport(same(goodAddress), any(String.class))) .thenReturn(goodTransport); when(mockTransportFactory.newClientTransport(same(badAddress), any(String.class))) @@ -413,8 +412,8 @@ public class ManagedChannelImplTest { // First try should fail with the bad address. call.start(mockCallListener, headers); - ArgumentCaptor badTransportListenerCaptor = - ArgumentCaptor.forClass(ClientTransport.Listener.class); + ArgumentCaptor badTransportListenerCaptor = + ArgumentCaptor.forClass(ManagedClientTransport.Listener.class); verify(mockCallListener, timeout(1000)).onClose(same(Status.UNAVAILABLE), any(Metadata.class)); verify(badTransport, timeout(1000)).start(badTransportListenerCaptor.capture()); badTransportListenerCaptor.getValue().transportShutdown(Status.UNAVAILABLE); diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java index 443e569d87..d5b623591c 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java @@ -55,6 +55,7 @@ import io.grpc.LoadBalancer; import io.grpc.NameResolver; import io.grpc.Status; import io.grpc.TransportManager; +import io.grpc.internal.TestUtils.MockClientTransportInfo; import org.junit.After; import org.junit.Before; @@ -148,10 +149,10 @@ public class ManagedChannelImplTransportManagerTest { @Test public void createAndReuseTransport() throws Exception { - doAnswer(new Answer() { + doAnswer(new Answer() { @Override - public ClientTransport answer(InvocationOnMock invocation) throws Throwable { - return mock(ClientTransport.class); + public ManagedClientTransport answer(InvocationOnMock invocation) throws Throwable { + return mock(ManagedClientTransport.class); } }).when(mockTransportFactory).newClientTransport(any(SocketAddress.class), any(String.class)); @@ -173,8 +174,8 @@ public class ManagedChannelImplTransportManagerTest { SocketAddress addr2 = mock(SocketAddress.class); EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(Arrays.asList(addr1, addr2)); - LinkedList listeners = - TestUtils.captureListeners(mockTransportFactory); + LinkedList transports = + TestUtils.captureTransports(mockTransportFactory); // Invocation counters int backoffReset = 0; @@ -185,7 +186,7 @@ public class ManagedChannelImplTransportManagerTest { verify(mockTransportFactory).newClientTransport(addr1, authority); verify(mockBackoffPolicyProvider, times(++backoffReset)).get(); // Fail the first transport, without setting it to ready - listeners.poll().transportShutdown(Status.UNAVAILABLE); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE); // Subsequent getTransport() will use the next address ListenableFuture future2a = tm.getTransport(addressGroup); @@ -197,9 +198,9 @@ public class ManagedChannelImplTransportManagerTest { assertSame(future2a.get(), future2b.get()); assertNotSame(future1.get(), future2a.get()); // Make the second transport ready - listeners.peek().transportReady(); + transports.peek().listener.transportReady(); // Disconnect the second transport - listeners.poll().transportShutdown(Status.UNAVAILABLE); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE); // Subsequent getTransport() will use the next address, which is the first one since we have run // out of addresses. @@ -212,7 +213,7 @@ public class ManagedChannelImplTransportManagerTest { verify(mockBackoffPolicy, times(0)).nextBackoffMillis(); verify(mockTransportFactory, times(2)).newClientTransport(addr1, authority); verifyNoMoreInteractions(mockTransportFactory); - assertEquals(1, listeners.size()); + assertEquals(1, transports.size()); } @Test @@ -221,8 +222,8 @@ public class ManagedChannelImplTransportManagerTest { SocketAddress addr2 = mock(SocketAddress.class); EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(Arrays.asList(addr1, addr2)); - LinkedList listeners = - TestUtils.captureListeners(mockTransportFactory); + LinkedList transports = + TestUtils.captureTransports(mockTransportFactory); // Invocation counters int transportsAddr1 = 0; @@ -236,9 +237,9 @@ public class ManagedChannelImplTransportManagerTest { verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority); // Back-off policy was set initially. verify(mockBackoffPolicyProvider, times(++backoffReset)).get(); - listeners.peek().transportReady(); + transports.peek().listener.transportReady(); // Then close it - listeners.poll().transportShutdown(Status.UNAVAILABLE); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE); // Second pick fails. This is the beginning of a series of failures. ListenableFuture future2 = tm.getTransport(addressGroup); @@ -246,7 +247,7 @@ public class ManagedChannelImplTransportManagerTest { verify(mockTransportFactory, times(++transportsAddr2)).newClientTransport(addr2, authority); // Back-off policy was reset. verify(mockBackoffPolicyProvider, times(++backoffReset)).get(); - listeners.poll().transportShutdown(Status.UNAVAILABLE); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE); // Third pick fails too ListenableFuture future3 = tm.getTransport(addressGroup); @@ -254,7 +255,7 @@ public class ManagedChannelImplTransportManagerTest { verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority); // Back-off policy was not reset. verify(mockBackoffPolicyProvider, times(backoffReset)).get(); - listeners.poll().transportShutdown(Status.UNAVAILABLE); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE); // Forth pick is on addr2, back-off policy kicks in. ListenableFuture future4 = tm.getTransport(addressGroup); diff --git a/core/src/test/java/io/grpc/internal/TestUtils.java b/core/src/test/java/io/grpc/internal/TestUtils.java index 3d74d510cf..fcc1979c0a 100644 --- a/core/src/test/java/io/grpc/internal/TestUtils.java +++ b/core/src/test/java/io/grpc/internal/TestUtils.java @@ -34,6 +34,10 @@ package io.grpc.internal; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -46,31 +50,53 @@ import java.util.LinkedList; */ final class TestUtils { - /** - * Stub the given mock {@link ClientTransportFactory} by returning mock {@link ClientTransport}s - * which saves their listeners to a list which is returned by this method. - */ - static LinkedList captureListeners( - ClientTransportFactory mockTransportFactory) { - final LinkedList listeners = - new LinkedList(); + static class MockClientTransportInfo { + /** + * A mock transport created by the mock transport factory. + */ + final ManagedClientTransport transport; - doAnswer(new Answer() { + /** + * The listener passed to the start() of the mock transport. + */ + final ManagedClientTransport.Listener listener; + + MockClientTransportInfo(ManagedClientTransport transport, + ManagedClientTransport.Listener listener) { + this.transport = transport; + this.listener = listener; + } + } + + /** + * Stub the given mock {@link ClientTransportFactory} by returning mock + * {@link ManagedClientTransport}s which saves their listeners along with them. This method + * returns a list of {@link MockClientTransportInfo}, each of which is a started mock transport + * and its listener. + */ + static LinkedList captureTransports( + ClientTransportFactory mockTransportFactory) { + final LinkedList captor = new LinkedList(); + + doAnswer(new Answer() { @Override - public ClientTransport answer(InvocationOnMock invocation) throws Throwable { - ClientTransport mockTransport = mock(ClientTransport.class); + public ManagedClientTransport answer(InvocationOnMock invocation) throws Throwable { + final ManagedClientTransport mockTransport = mock(ManagedClientTransport.class); + when(mockTransport.newStream(any(MethodDescriptor.class), any(Metadata.class))) + .thenReturn(mock(ClientStream.class)); // Save the listener doAnswer(new Answer() { @Override public Void answer(InvocationOnMock invocation) throws Throwable { - listeners.add((ClientTransport.Listener) invocation.getArguments()[0]); + captor.add(new MockClientTransportInfo( + mockTransport, (ManagedClientTransport.Listener) invocation.getArguments()[0])); return null; } - }).when(mockTransport).start(any(ClientTransport.Listener.class)); + }).when(mockTransport).start(any(ManagedClientTransport.Listener.class)); return mockTransport; } }).when(mockTransportFactory).newClientTransport(any(SocketAddress.class), any(String.class)); - return listeners; + return captor; } } diff --git a/core/src/test/java/io/grpc/internal/TransportSetTest.java b/core/src/test/java/io/grpc/internal/TransportSetTest.java index 6ebc58caaf..0d1be9c2ed 100644 --- a/core/src/test/java/io/grpc/internal/TransportSetTest.java +++ b/core/src/test/java/io/grpc/internal/TransportSetTest.java @@ -31,21 +31,29 @@ package io.grpc.internal; -import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.same; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import com.google.common.base.Stopwatch; import com.google.common.util.concurrent.ListenableFuture; import io.grpc.EquivalentAddressGroup; +import io.grpc.IntegerMarshaller; import io.grpc.LoadBalancer; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; import io.grpc.Status; +import io.grpc.StringMarshaller; +import io.grpc.internal.TestUtils.MockClientTransportInfo; import org.junit.Before; import org.junit.Test; @@ -77,10 +85,16 @@ public class TransportSetTest { @Mock private BackoffPolicy.Provider mockBackoffPolicyProvider; @Mock private ClientTransportFactory mockTransportFactory; @Mock private TransportSet.Callback mockTransportSetCallback; + @Mock private ClientStreamListener mockStreamListener; + + private final MethodDescriptor method = MethodDescriptor.create( + MethodDescriptor.MethodType.UNKNOWN, "/service/method", + new StringMarshaller(), new IntegerMarshaller()); + private final Metadata headers = new Metadata(); private TransportSet transportSet; private EquivalentAddressGroup addressGroup; - private LinkedList listeners; + private LinkedList transports; @Before public void setUp() { MockitoAnnotations.initMocks(this); @@ -91,7 +105,7 @@ public class TransportSetTest { when(mockBackoffPolicy1.nextBackoffMillis()).thenReturn(10L, 100L); when(mockBackoffPolicy2.nextBackoffMillis()).thenReturn(10L, 100L); when(mockBackoffPolicy3.nextBackoffMillis()).thenReturn(10L, 100L); - listeners = TestUtils.captureListeners(mockTransportFactory); + transports = TestUtils.captureTransports(mockTransportFactory); } @Test public void singleAddressBackoff() { @@ -109,7 +123,7 @@ public class TransportSetTest { verify(mockBackoffPolicyProvider, times(++backoffReset)).get(); verify(mockTransportFactory, times(++transportsCreated)).newClientTransport(addr, authority); // Fail this one - listeners.poll().transportShutdown(Status.UNAVAILABLE); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE); // Second attempt uses the first back-off value interval. transportSet.obtainActiveTransport(); @@ -121,7 +135,7 @@ public class TransportSetTest { fakeClock.forwardMillis(1); verify(mockTransportFactory, times(++transportsCreated)).newClientTransport(addr, authority); // Fail this one too - listeners.poll().transportShutdown(Status.UNAVAILABLE); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE); // Third attempt uses the second back-off interval. transportSet.obtainActiveTransport(); @@ -133,9 +147,9 @@ public class TransportSetTest { fakeClock.forwardMillis(1); verify(mockTransportFactory, times(++transportsCreated)).newClientTransport(addr, authority); // Let this one succeed - listeners.peek().transportReady(); + transports.peek().listener.transportReady(); // And close it - listeners.poll().transportShutdown(Status.UNAVAILABLE); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE); // Back-off is reset, and the next attempt will happen immediately transportSet.obtainActiveTransport(); @@ -165,9 +179,9 @@ public class TransportSetTest { verify(mockBackoffPolicyProvider, times(++backoffReset)).get(); verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority); // Let this one through - listeners.peek().transportReady(); + transports.peek().listener.transportReady(); // Then shut it down - listeners.poll().transportShutdown(Status.UNAVAILABLE); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE); ////// Now start a series of failing attempts, where addr2 is the head. @@ -176,14 +190,14 @@ public class TransportSetTest { verify(mockBackoffPolicyProvider, times(++backoffReset)).get(); verify(mockTransportFactory, times(++transportsAddr2)).newClientTransport(addr2, authority); // Fail this one - listeners.poll().transportShutdown(Status.UNAVAILABLE); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE); // Second attempt will start immediately. Keep back-off policy. transportSet.obtainActiveTransport(); verify(mockBackoffPolicyProvider, times(backoffReset)).get(); verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority); // Fail this one too - listeners.poll().transportShutdown(Status.UNAVAILABLE); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE); // Third attempt is on head, thus controlled by the first back-off interval. transportSet.obtainActiveTransport(); @@ -194,14 +208,14 @@ public class TransportSetTest { fakeClock.forwardMillis(1); verify(mockTransportFactory, times(++transportsAddr2)).newClientTransport(addr2, authority); // Fail this one too - listeners.poll().transportShutdown(Status.UNAVAILABLE); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE); // Forth attempt will start immediately. Keep back-off policy. transportSet.obtainActiveTransport(); verify(mockBackoffPolicyProvider, times(backoffReset)).get(); verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority); // Fail this one too - listeners.poll().transportShutdown(Status.UNAVAILABLE); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE); // Fifth attempt is on head, thus controlled by the second back-off interval. transportSet.obtainActiveTransport(); @@ -212,9 +226,9 @@ public class TransportSetTest { fakeClock.forwardMillis(1); verify(mockTransportFactory, times(++transportsAddr2)).newClientTransport(addr2, authority); // Let it through - listeners.peek().transportReady(); + transports.peek().listener.transportReady(); // Then close it. - listeners.poll().transportShutdown(Status.UNAVAILABLE); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE); ////// Now start a series of failing attempts, where addr1 is the head. @@ -223,14 +237,14 @@ public class TransportSetTest { verify(mockBackoffPolicyProvider, times(++backoffReset)).get(); verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority); // Fail this one - listeners.poll().transportShutdown(Status.UNAVAILABLE); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE); // Second attempt will start immediately. Keep back-off policy. transportSet.obtainActiveTransport(); verify(mockBackoffPolicyProvider, times(backoffReset)).get(); verify(mockTransportFactory, times(++transportsAddr2)).newClientTransport(addr2, authority); // Fail this one too - listeners.poll().transportShutdown(Status.UNAVAILABLE); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE); // Third attempt is on head, thus controlled by the first back-off interval. transportSet.obtainActiveTransport(); @@ -263,7 +277,7 @@ public class TransportSetTest { verify(mockTransportFactory, times(++transportsCreated)).newClientTransport(addr, authority); // Fail this one - listeners.poll().transportShutdown(Status.UNAVAILABLE); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE); // Won't reconnect until requested, even if back-off time has expired fakeClock.forwardMillis(10); @@ -274,7 +288,7 @@ public class TransportSetTest { verify(mockTransportFactory, times(++transportsCreated)).newClientTransport(addr, authority); // Fail this one, too - listeners.poll().transportShutdown(Status.UNAVAILABLE); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE); // Request immediately, but will wait for back-off before reconnecting assertNotNull(transportSet.obtainActiveTransport()); @@ -284,7 +298,7 @@ public class TransportSetTest { } @Test - public void shutdownBeforeTransportCreated() throws Exception { + public void shutdownBeforeTransportCreatedWithPendingStream() throws Exception { SocketAddress addr = mock(SocketAddress.class); createTransortSet(addr); @@ -294,17 +308,79 @@ public class TransportSetTest { assertTrue(pick.isDone()); assertNotNull(pick.get()); // Fail this one - listeners.poll().transportShutdown(Status.UNAVAILABLE); + MockClientTransportInfo transportInfo = transports.poll(); + transportInfo.listener.transportShutdown(Status.UNAVAILABLE); + transportInfo.listener.transportTerminated(); // Second transport will wait for back-off pick = transportSet.obtainActiveTransport(); - assertFalse(pick.isDone()); + assertTrue(pick.isDone()); + assertTrue(pick.get() instanceof DelayedClientTransport); + // Start a stream, which will be pending in the delayed transport + ClientStream pendingStream = pick.get().newStream(method, headers); + pendingStream.start(mockStreamListener); - // Shut down TransportSet before the transort is created. + // Shut down TransportSet before the transport is created. Further call to + // obtainActiveTransport() gets null results. transportSet.shutdown(); + pick = transportSet.obtainActiveTransport(); assertTrue(pick.isDone()); assertNull(pick.get()); verify(mockTransportFactory).newClientTransport(addr, authority); + + // Reconnect will eventually happen, even though TransportSet has been shut down + fakeClock.forwardMillis(10); + verify(mockTransportFactory, times(2)).newClientTransport(addr, authority); + // The pending stream will be started on this newly started transport, which is promptly shut + // down by TransportSet right after the stream is created. + transportInfo = transports.poll(); + verify(transportInfo.transport).newStream(same(method), same(headers)); + verify(transportInfo.transport).shutdown(); + verify(mockTransportSetCallback, never()).onTerminated(); + // Terminating the transport will let TransportSet to be terminated. + transportInfo.listener.transportTerminated(); + verify(mockTransportSetCallback).onTerminated(); + + // No more transports will be created. + fakeClock.forwardMillis(10000); + verifyNoMoreInteractions(mockTransportFactory); + assertEquals(0, transports.size()); + } + + @Test + public void shutdownBeforeTransportCreatedWithoutPendingStream() throws Exception { + SocketAddress addr = mock(SocketAddress.class); + createTransortSet(addr); + + // First transport is created immediately + ListenableFuture pick = transportSet.obtainActiveTransport(); + verify(mockTransportFactory).newClientTransport(addr, authority); + assertTrue(pick.isDone()); + assertNotNull(pick.get()); + // Fail this one + MockClientTransportInfo transportInfo = transports.poll(); + transportInfo.listener.transportShutdown(Status.UNAVAILABLE); + transportInfo.listener.transportTerminated(); + + // Second transport will wait for back-off + pick = transportSet.obtainActiveTransport(); + assertTrue(pick.isDone()); + assertTrue(pick.get() instanceof DelayedClientTransport); + + // Shut down TransportSet before the transport is created. Futher call to + // obtainActiveTransport() gets null results. + transportSet.shutdown(); + pick = transportSet.obtainActiveTransport(); + assertTrue(pick.isDone()); + assertNull(pick.get()); + + // TransportSet terminated promptly. + verify(mockTransportSetCallback).onTerminated(); + + // No more transports will be created. + fakeClock.forwardMillis(10000); + verifyNoMoreInteractions(mockTransportFactory); + assertEquals(0, transports.size()); } @Test @@ -321,26 +397,24 @@ public class TransportSetTest { } @Test - public void cancellingTransportFutureWontAffectOtherCallers() { + public void futuresAreAlwaysComplete() throws Exception { SocketAddress addr = mock(SocketAddress.class); createTransortSet(addr); // Fail the first pick so that the next pick will be pending on back-off ListenableFuture future0 = transportSet.obtainActiveTransport(); assertTrue(future0.isDone()); - listeners.poll().transportShutdown(Status.UNAVAILABLE); + transports.poll().listener.transportShutdown(Status.UNAVAILABLE); ListenableFuture future1 = transportSet.obtainActiveTransport(); ListenableFuture future2 = transportSet.obtainActiveTransport(); // These futures are pending on back-off - assertFalse(future1.isDone()); - assertFalse(future2.isDone()); + assertTrue(future1.isDone()); + assertTrue(future2.isDone()); - // Cancel future1 - future1.cancel(false); - // future2 is not affected. future1 can either be really cancelled or not, which we don't care. - assertFalse(future2.isDone()); + assertTrue(future1.get() instanceof DelayedClientTransport); + assertTrue(future2.get() instanceof DelayedClientTransport); } private void createTransortSet(SocketAddress ... addrs) { diff --git a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java index 1d5ca47d73..9420c096d3 100644 --- a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java +++ b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java @@ -43,9 +43,9 @@ import io.grpc.Internal; import io.grpc.NameResolver; import io.grpc.internal.AbstractManagedChannelImplBuilder; import io.grpc.internal.AbstractReferenceCounted; -import io.grpc.internal.ClientTransport; import io.grpc.internal.ClientTransportFactory; import io.grpc.internal.GrpcUtil; +import io.grpc.internal.ManagedClientTransport; import io.grpc.internal.SharedResourceHolder; import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; @@ -304,7 +304,8 @@ public class NettyChannelBuilder extends AbstractManagedChannelImplBuilder 0, "maxHeaderListSize must be positive"); @@ -131,7 +131,7 @@ class NettyClientHandler extends AbstractNettyHandler { static NettyClientHandler newHandler(Http2Connection connection, Http2FrameReader frameReader, Http2FrameWriter frameWriter, - final ClientTransport.Listener listener, + final ManagedClientTransport.Listener listener, int flowControlWindow, Ticker ticker) { Preconditions.checkNotNull(connection, "connection"); diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java index 35ca3088e3..8b0bf00662 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java @@ -40,7 +40,7 @@ import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.Status; import io.grpc.internal.ClientStream; -import io.grpc.internal.ClientTransport; +import io.grpc.internal.ManagedClientTransport; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; @@ -56,9 +56,9 @@ import java.util.concurrent.Executor; import javax.annotation.concurrent.GuardedBy; /** - * A Netty-based {@link ClientTransport} implementation. + * A Netty-based {@link ManagedClientTransport} implementation. */ -class NettyClientTransport implements ClientTransport { +class NettyClientTransport implements ManagedClientTransport { private final SocketAddress address; private final Class channelType; private final EventLoopGroup group; diff --git a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java index 6898eaaf71..06c93e9171 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java @@ -50,8 +50,8 @@ import io.grpc.Status; import io.grpc.StatusException; import io.grpc.internal.ClientStream; import io.grpc.internal.ClientStreamListener; -import io.grpc.internal.ClientTransport; import io.grpc.internal.GrpcUtil; +import io.grpc.internal.ManagedClientTransport; import io.grpc.internal.ServerListener; import io.grpc.internal.ServerStream; import io.grpc.internal.ServerStreamListener; @@ -92,7 +92,7 @@ import java.util.concurrent.TimeoutException; public class NettyClientTransportTest { @Mock - private ClientTransport.Listener clientTransportListener; + private ManagedClientTransport.Listener clientTransportListener; private final List transports = new ArrayList(); private NioEventLoopGroup group; diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java index 25426529aa..c737a94952 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java @@ -46,9 +46,9 @@ import io.grpc.ExperimentalApi; import io.grpc.NameResolver; import io.grpc.internal.AbstractManagedChannelImplBuilder; import io.grpc.internal.AbstractReferenceCounted; -import io.grpc.internal.ClientTransport; import io.grpc.internal.ClientTransportFactory; import io.grpc.internal.GrpcUtil; +import io.grpc.internal.ManagedClientTransport; import io.grpc.internal.SharedResourceHolder; import io.grpc.internal.SharedResourceHolder.Resource; @@ -255,7 +255,7 @@ public class OkHttpChannelBuilder extends } @Override - public ClientTransport newClientTransport(SocketAddress addr, String authority) { + public ManagedClientTransport newClientTransport(SocketAddress addr, String authority) { InetSocketAddress inetSocketAddr = (InetSocketAddress) addr; return new OkHttpClientTransport(inetSocketAddr, authority, executor, socketFactory, Utils.convertSpec(connectionSpec), maxMessageSize); diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java index 33c4725bd7..61583c2c1c 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java @@ -44,9 +44,9 @@ import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor.MethodType; import io.grpc.Status; import io.grpc.Status.Code; -import io.grpc.internal.ClientTransport; import io.grpc.internal.GrpcUtil; import io.grpc.internal.Http2Ping; +import io.grpc.internal.ManagedClientTransport; import io.grpc.internal.SerializingExecutor; import io.grpc.okhttp.internal.ConnectionSpec; import io.grpc.okhttp.internal.framed.ErrorCode; @@ -84,9 +84,9 @@ import javax.annotation.concurrent.GuardedBy; import javax.net.ssl.SSLSocketFactory; /** - * A okhttp-based {@link ClientTransport} implementation. + * A okhttp-based {@link ManagedClientTransport} implementation. */ -class OkHttpClientTransport implements ClientTransport { +class OkHttpClientTransport implements ManagedClientTransport { private static final Map ERROR_CODE_TO_STATUS; private static final Logger log = Logger.getLogger(OkHttpClientTransport.class.getName()); private static final OkHttpClientStream[] EMPTY_STREAM_ARRAY = new OkHttpClientStream[0]; diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java index fa6174436f..66be3357ad 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java @@ -73,6 +73,7 @@ import io.grpc.internal.AbstractStream; import io.grpc.internal.ClientStreamListener; import io.grpc.internal.ClientTransport; import io.grpc.internal.GrpcUtil; +import io.grpc.internal.ManagedClientTransport; import io.grpc.okhttp.OkHttpClientTransport.ClientFrameHandler; import io.grpc.okhttp.internal.ConnectionSpec; import io.grpc.okhttp.internal.framed.ErrorCode; @@ -131,7 +132,7 @@ public class OkHttpClientTransportTest { @Mock MethodDescriptor method; @Mock - private ClientTransport.Listener transportListener; + private ManagedClientTransport.Listener transportListener; private OkHttpClientTransport clientTransport; private MockFrameReader frameReader; private ExecutorService executor; @@ -1323,7 +1324,7 @@ public class OkHttpClientTransportTest { ConnectionSpec.CLEARTEXT, DEFAULT_MAX_MESSAGE_SIZE); - ClientTransport.Listener listener = mock(ClientTransport.Listener.class); + ManagedClientTransport.Listener listener = mock(ManagedClientTransport.Listener.class); clientTransport.start(listener); ArgumentCaptor captor = ArgumentCaptor.forClass(Status.class); verify(listener, timeout(TIME_OUT_MS)).transportShutdown(captor.capture());