diff --git a/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java b/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java
index 4eeb1c96ac..20650451a9 100644
--- a/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java
+++ b/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java
@@ -36,8 +36,8 @@ import com.google.common.base.Preconditions;
import io.grpc.ExperimentalApi;
import io.grpc.internal.AbstractManagedChannelImplBuilder;
import io.grpc.internal.AbstractReferenceCounted;
-import io.grpc.internal.ClientTransport;
import io.grpc.internal.ClientTransportFactory;
+import io.grpc.internal.ManagedClientTransport;
import java.net.SocketAddress;
@@ -89,7 +89,7 @@ public class InProcessChannelBuilder extends
}
@Override
- public ClientTransport newClientTransport(SocketAddress addr, String authority) {
+ public ManagedClientTransport newClientTransport(SocketAddress addr, String authority) {
return new InProcessTransport(name);
}
diff --git a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java
index a620b1e397..f541ba476c 100644
--- a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java
+++ b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java
@@ -40,7 +40,7 @@ import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.internal.ClientStream;
import io.grpc.internal.ClientStreamListener;
-import io.grpc.internal.ClientTransport;
+import io.grpc.internal.ManagedClientTransport;
import io.grpc.internal.NoopClientStream;
import io.grpc.internal.ServerStream;
import io.grpc.internal.ServerStreamListener;
@@ -59,12 +59,12 @@ import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
@ThreadSafe
-class InProcessTransport implements ServerTransport, ClientTransport {
+class InProcessTransport implements ServerTransport, ManagedClientTransport {
private static final Logger log = Logger.getLogger(InProcessTransport.class.getName());
private final String name;
private ServerTransportListener serverTransportListener;
- private ClientTransport.Listener clientTransportListener;
+ private ManagedClientTransport.Listener clientTransportListener;
@GuardedBy("this")
private boolean shutdown;
@GuardedBy("this")
@@ -79,7 +79,7 @@ class InProcessTransport implements ServerTransport, ClientTransport {
}
@Override
- public synchronized void start(ClientTransport.Listener listener) {
+ public synchronized void start(ManagedClientTransport.Listener listener) {
this.clientTransportListener = listener;
InProcessServer server = InProcessServer.findServer(name);
if (server != null) {
@@ -152,7 +152,7 @@ class InProcessTransport implements ServerTransport, ClientTransport {
@Override
public synchronized void shutdown() {
- // Can be called multiple times: once for ClientTransport, once for ServerTransport.
+ // Can be called multiple times: once for ManagedClientTransport, once for ServerTransport.
if (shutdown) {
return;
}
diff --git a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
index 990e368b39..441e18f376 100644
--- a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
+++ b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java
@@ -230,7 +230,8 @@ public abstract class AbstractManagedChannelImplBuilder
}
@Override
- public ClientTransport newClientTransport(SocketAddress serverAddress, String authority) {
+ public ManagedClientTransport newClientTransport(SocketAddress serverAddress,
+ String authority) {
return factory.newClientTransport(
serverAddress, authorityOverride != null ? authorityOverride : authority);
}
diff --git a/core/src/main/java/io/grpc/internal/ClientTransport.java b/core/src/main/java/io/grpc/internal/ClientTransport.java
index 298c98787f..c4f1ee6215 100644
--- a/core/src/main/java/io/grpc/internal/ClientTransport.java
+++ b/core/src/main/java/io/grpc/internal/ClientTransport.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2014, Google Inc. All rights reserved.
+ * Copyright 2016, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
@@ -33,25 +33,21 @@ package io.grpc.internal;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
-import io.grpc.Status;
import java.util.concurrent.Executor;
-
import javax.annotation.concurrent.ThreadSafe;
/**
- * The client-side transport encapsulating a single connection to a remote server. Allows creation
- * of new {@link Stream} instances for communication with the server. All methods on the transport
- * and its listener are expected to execute quickly.
- *
- *
{@link #start} must be the first method call to this interface and return before calling other
- * methods.
+ * The client-side transport typically encapsulating a single connection to a remote
+ * server. However, streams created before the client has discovered any server address may
+ * eventually be issued on different connections. All methods on the transport and its callbacks
+ * are expected to execute quickly.
*/
@ThreadSafe
public interface ClientTransport {
/**
- * Creates a new stream for sending messages to the remote end-point.
+ * Creates a new stream for sending messages to a remote end-point.
*
*
* This method returns immediately and does not wait for any validation of the request. If
@@ -67,64 +63,18 @@ public interface ClientTransport {
ClientStream newStream(MethodDescriptor, ?> method, Metadata headers);
/**
- * Starts transport. This method may only be called once.
+ * Pings a remote endpoint. When an acknowledgement is received, the given callback will be
+ * invoked using the given executor.
*
- *
Implementations must not call {@code listener} from within {@link #start}; implementations
- * are expected to notify listener on a separate thread. This method should not throw any
- * exceptions.
- *
- * @param listener non-{@code null} listener of transport events
- */
- void start(Listener listener);
-
- /**
- * Pings the remote endpoint to verify that the transport is still active. When an acknowledgement
- * is received, the given callback will be invoked using the given executor.
+ *
Pings are not necessarily sent to the same endpont, thus a successful ping only means at
+ * least one endpoint responded, but doesn't imply the availability of other endpoints (if there
+ * is any).
*
*
This is an optional method. Transports that do not have any mechanism by which to ping the
* remote endpoint may throw {@link UnsupportedOperationException}.
*/
void ping(PingCallback callback, Executor executor);
- /**
- * Initiates an orderly shutdown of the transport. Existing streams continue, but new streams will
- * fail (once {@link Listener#transportShutdown} callback called). This method may only be called
- * once.
- */
- void shutdown();
-
- /**
- * Receives notifications for the transport life-cycle events. Implementation does not need to be
- * thread-safe, so notifications must be properly sychronized externally.
- */
- interface Listener {
- /**
- * The transport is shutting down. No new streams will be processed, but existing streams may
- * continue. Shutdown could have been caused by an error or normal operation. It is possible
- * that this method is called without {@link #shutdown} being called. If the argument to this
- * function is {@link Status#isOk}, it is safe to immediately reconnect.
- *
- *
This is called exactly once, and must be called prior to {@link #transportTerminated}.
- *
- * @param s the reason for the shutdown.
- */
- void transportShutdown(Status s);
-
- /**
- * The transport completed shutting down. All resources have been released.
- *
- *
This is called exactly once, and must be called after {@link #transportShutdown} has been
- * called.
- */
- void transportTerminated();
-
- /**
- * The transport is ready to accept traffic, because the connection is established. This is
- * called at most once.
- */
- void transportReady();
- }
-
/**
* A callback that is invoked when the acknowledgement to a {@link #ping} is received. Exactly one
* of the two methods should be called per {@link #ping}.
diff --git a/core/src/main/java/io/grpc/internal/ClientTransportFactory.java b/core/src/main/java/io/grpc/internal/ClientTransportFactory.java
index bc06a10966..7172a80533 100644
--- a/core/src/main/java/io/grpc/internal/ClientTransportFactory.java
+++ b/core/src/main/java/io/grpc/internal/ClientTransportFactory.java
@@ -33,7 +33,7 @@ package io.grpc.internal;
import java.net.SocketAddress;
-/** Pre-configured factory for creating {@link ClientTransport} instances. */
+/** Pre-configured factory for creating {@link ManagedClientTransport} instances. */
public interface ClientTransportFactory extends ReferenceCounted {
/**
* Creates an unstarted transport for exclusive use.
@@ -41,5 +41,5 @@ public interface ClientTransportFactory extends ReferenceCounted {
* @param serverAddress the address that the transport is connected to
* @param authority the HTTP/2 authority of the server
*/
- ClientTransport newClientTransport(SocketAddress serverAddress, String authority);
+ ManagedClientTransport newClientTransport(SocketAddress serverAddress, String authority);
}
diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java
new file mode 100644
index 0000000000..7869c68e97
--- /dev/null
+++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java
@@ -0,0 +1,268 @@
+/*
+ * Copyright 2015, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.grpc.internal;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import io.grpc.Metadata;
+import io.grpc.MethodDescriptor;
+import io.grpc.Status;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.concurrent.Executor;
+
+import javax.annotation.concurrent.GuardedBy;
+
+/**
+ * A client transport that queues requests before a real transport is available. When a backing
+ * transport is later provided, this class delegates to it.
+ *
+ *
This transport owns the streams that it has created before {@link #setTransport} is
+ * called. When {@link #setTransport} is called, the ownership of pending streams and subsequent new
+ * streams are transferred to the given transport, thus this transport won't own any stream.
+ */
+class DelayedClientTransport implements ManagedClientTransport {
+ private final Object lock = new Object();
+
+ private Listener listener;
+ /** 'lock' must be held when assigning to delegate. */
+ private volatile ClientTransport delegate;
+
+ @GuardedBy("lock")
+ private Collection pendingStreams = new HashSet();
+ @GuardedBy("lock")
+ private Collection pendingPings = new ArrayList();
+ /**
+ * When shutdown == true and pendingStreams == null, then the transport is considered terminated.
+ */
+ @GuardedBy("lock")
+ private boolean shutdown;
+
+ @Override
+ public void start(Listener listener) {
+ this.listener = Preconditions.checkNotNull(listener, "listener");
+ }
+
+ @Override
+ public ClientStream newStream(MethodDescriptor, ?> method, Metadata headers) {
+ ClientTransport transport = delegate;
+ if (transport != null) {
+ return transport.newStream(method, headers);
+ }
+ synchronized (lock) {
+ // Check again, since it may have changed while waiting for lock
+ transport = delegate;
+ if (transport != null) {
+ return transport.newStream(method, headers);
+ }
+ if (!shutdown) {
+ PendingStream pendingStream = new PendingStream(method, headers);
+ pendingStreams.add(pendingStream);
+ return pendingStream;
+ }
+ }
+ DelayedStream stream = new DelayedStream();
+ stream.setError(Status.UNAVAILABLE.withDescription("transport shutdown"));
+ return stream;
+ }
+
+ public void ping(final PingCallback callback, Executor executor) {
+ ClientTransport transport = delegate;
+ if (transport != null) {
+ transport.ping(callback, executor);
+ return;
+ }
+ synchronized (lock) {
+ // Check again, since it may have changed while waiting for lock
+ transport = delegate;
+ if (transport != null) {
+ transport.ping(callback, executor);
+ return;
+ }
+ if (!shutdown) {
+ PendingPing pendingPing = new PendingPing(callback, executor);
+ pendingPings.add(pendingPing);
+ return;
+ }
+ }
+ executor.execute(new Runnable() {
+ @Override public void run() {
+ callback.onFailure(
+ Status.UNAVAILABLE.withDescription("transport shutdown").asException());
+ }
+ });
+ }
+
+ /**
+ * Prevents creating any new streams until {@link #setTransport} is called. Buffered streams are
+ * not failed, so if {@link #shutdown} is called when {@link #setTransport} has not been called,
+ * you still need to call {@link setTransport} to make this transport terminated.
+ */
+ @Override
+ public void shutdown() {
+ synchronized (lock) {
+ if (shutdown) {
+ return;
+ }
+ shutdown = true;
+ listener.transportShutdown(
+ Status.OK.withDescription("Channel requested transport to shut down"));
+ if (pendingStreams == null || pendingStreams.isEmpty()) {
+ pendingStreams = null;
+ listener.transportTerminated();
+ }
+ }
+ }
+
+ /**
+ * Shuts down this transport and cancels all streams that it owns, hence immediately terminates
+ * this transport.
+ */
+ public void shutdownNow(Status status) {
+ shutdown();
+ Collection savedPendingStreams = null;
+ synchronized (lock) {
+ if (pendingStreams != null) {
+ savedPendingStreams = pendingStreams;
+ pendingStreams = null;
+ }
+ }
+ if (savedPendingStreams != null) {
+ for (PendingStream stream : savedPendingStreams) {
+ stream.setError(status);
+ }
+ listener.transportTerminated();
+ }
+ // If savedPendingStreams == null, transportTerminated() has already been called in shutdown().
+ }
+
+ /**
+ * Transfers all the pending and future requests and pings to the given transport.
+ *
+ * May only be called after {@link #start(Listener)}.
+ *
+ *
{@code transport} will be used for all future calls to {@link #newStream}, even if this
+ * transport is {@link #shutdown}.
+ */
+ public void setTransport(ClientTransport transport) {
+ Collection savedPendingStreams;
+ synchronized (lock) {
+ Preconditions.checkState(delegate == null, "setTransport already called");
+ Preconditions.checkState(listener != null, "start() not called");
+ delegate = Preconditions.checkNotNull(transport, "transport");
+ for (PendingPing ping : pendingPings) {
+ ping.createRealPing(transport);
+ }
+ pendingPings = null;
+ if (shutdown && pendingStreams != null) {
+ listener.transportTerminated();
+ }
+ savedPendingStreams = pendingStreams;
+ pendingStreams = null;
+ if (!shutdown) {
+ listener.transportReady();
+ }
+ }
+ // createRealStream may be expensive, so we run it outside of the lock. It will start real
+ // streams on the transport. If there are pending requests, they will be serialized too, which
+ // may be expensive.
+ // TODO(zhangkun83): may consider doing it in a different thread.
+ if (savedPendingStreams != null) {
+ for (PendingStream stream : savedPendingStreams) {
+ stream.createRealStream(transport);
+ }
+ }
+ }
+
+ @VisibleForTesting
+ int getPendingStreamsCount() {
+ synchronized (lock) {
+ return pendingStreams == null ? 0 : pendingStreams.size();
+ }
+ }
+
+ private class PendingStream extends DelayedStream {
+ private final MethodDescriptor, ?> method;
+ private final Metadata headers;
+
+ private PendingStream(MethodDescriptor, ?> method, Metadata headers) {
+ this.method = method;
+ this.headers = headers;
+ }
+
+ private void createRealStream(ClientTransport transport) {
+ setStream(transport.newStream(method, headers));
+ }
+
+ // TODO(zhangkun83): DelayedStream.setError() doesn't have a clearly-defined semantic to be
+ // overriden. Make it clear or find another method to override.
+ @Override
+ void setError(Status reason) {
+ synchronized (lock) {
+ if (pendingStreams != null) {
+ pendingStreams.remove(this);
+ if (shutdown && pendingStreams.isEmpty()) {
+ pendingStreams = null;
+ listener.transportTerminated();
+ }
+ }
+ }
+ super.setError(reason);
+ }
+ }
+
+ private static class PendingPing {
+ private final PingCallback callback;
+ private final Executor executor;
+
+ public PendingPing(PingCallback callback, Executor executor) {
+ this.callback = callback;
+ this.executor = executor;
+ }
+
+ public void createRealPing(ClientTransport transport) {
+ try {
+ transport.ping(callback, executor);
+ } catch (final UnsupportedOperationException ex) {
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ callback.onFailure(ex);
+ }
+ });
+ }
+ }
+ }
+}
diff --git a/core/src/main/java/io/grpc/internal/DelayedStream.java b/core/src/main/java/io/grpc/internal/DelayedStream.java
index 89c7fcab2a..42c16ad219 100644
--- a/core/src/main/java/io/grpc/internal/DelayedStream.java
+++ b/core/src/main/java/io/grpc/internal/DelayedStream.java
@@ -53,6 +53,8 @@ import javax.annotation.concurrent.GuardedBy;
* DelayedStream} may be internally altered by different threads, thus internal synchronization is
* necessary.
*/
+// TODO(zhangkun83): merge it with DelayedClientTransport.PendingStream as it will be no longer
+// needed by ClientCallImpl as we move away from ListenableFuture
class DelayedStream implements ClientStream {
// set to non null once both listener and realStream are valid. After this point it is safe
diff --git a/core/src/main/java/io/grpc/internal/ManagedClientTransport.java b/core/src/main/java/io/grpc/internal/ManagedClientTransport.java
new file mode 100644
index 0000000000..02b53c5c19
--- /dev/null
+++ b/core/src/main/java/io/grpc/internal/ManagedClientTransport.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2016, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.grpc.internal;
+
+import io.grpc.Status;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+/**
+ * A {@link ClientTransport} that has life-cycle management.
+ *
+ * {@link #start} must be the first method call to this interface and return before calling other
+ * methods.
+ *
+ *
Typically the transport owns the streams it creates through {@link #newStream}, while some
+ * implementations may transfer the streams to somewhere else. Either way they must conform to the
+ * contract defined by {@link #shutdown}, {@link Listener#transportShutdown} and
+ * {@link Listener#transportTerminated}.
+ */
+@ThreadSafe
+public interface ManagedClientTransport extends ClientTransport {
+
+ /**
+ * Starts transport. This method may only be called once.
+ *
+ *
Implementations must not call {@code listener} from within {@link #start}; implementations
+ * are expected to notify listener on a separate thread. This method should not throw any
+ * exceptions.
+ *
+ * @param listener non-{@code null} listener of transport events
+ */
+ void start(Listener listener);
+
+ /**
+ * Initiates an orderly shutdown of the transport. Existing streams continue, but the transport
+ * will not own any new streams. New streams will either fail (once
+ * {@link Listener#transportShutdown} callback called), or be transferred off this transport (in
+ * which case they may succeed). This method may only be called once.
+ */
+ void shutdown();
+
+ /**
+ * Receives notifications for the transport life-cycle events. Implementation does not need to be
+ * thread-safe, so notifications must be properly sychronized externally.
+ */
+ interface Listener {
+ /**
+ * The transport is shutting down. This transport will stop owning new streams, but existing
+ * streams may continue, and the transport may still be able to process {@link #newStream} as
+ * long as it doesn't own the new streams. Shutdown could have been caused by an error or normal
+ * operation. It is possible that this method is called without {@link #shutdown} being called.
+ * If the argument to this function is {@link Status#isOk}, it is safe to immediately reconnect.
+ *
+ *
This is called exactly once, and must be called prior to {@link #transportTerminated}.
+ *
+ * @param s the reason for the shutdown.
+ */
+ void transportShutdown(Status s);
+
+ /**
+ * The transport completed shutting down. All resources have been released. All streams have
+ * either been closed or transferred off this transport. This transport may still be able to
+ * process {@link #newStream} as long as it doesn't own the new streams.
+ *
+ *
This is called exactly once, and must be called after {@link #transportShutdown} has been
+ * called.
+ */
+ void transportTerminated();
+
+ /**
+ * The transport is ready to accept traffic, because the connection is established. This is
+ * called at most once.
+ */
+ void transportReady();
+ }
+}
diff --git a/core/src/main/java/io/grpc/internal/TransportSet.java b/core/src/main/java/io/grpc/internal/TransportSet.java
index 4e8b523dcc..8b0216fb86 100644
--- a/core/src/main/java/io/grpc/internal/TransportSet.java
+++ b/core/src/main/java/io/grpc/internal/TransportSet.java
@@ -34,8 +34,7 @@ package io.grpc.internal;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
-import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.AbstractFuture;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.EquivalentAddressGroup;
@@ -62,12 +61,6 @@ import javax.annotation.concurrent.ThreadSafe;
@ThreadSafe
final class TransportSet {
private static final Logger log = Logger.getLogger(TransportSet.class.getName());
- private static final UncancellableTransportFuture NULL_VALUE_FUTURE;
-
- static {
- NULL_VALUE_FUTURE = new UncancellableTransportFuture();
- NULL_VALUE_FUTURE.set(null);
- }
private final Object lock = new Object();
private final EquivalentAddressGroup addressGroup;
@@ -98,24 +91,35 @@ final class TransportSet {
private ScheduledFuture> reconnectTask;
/**
- * All transports that are not stopped. At the very least the value of {@link
- * activeTransportFuture} will be present, but previously used transports that still have streams
- * or are stopping may also be present.
+ * All transports that are not terminated. At the very least the value of {@link activeTransport}
+ * will be present, but previously used transports that still have streams or are stopping may
+ * also be present.
*/
@GuardedBy("lock")
- private final Collection transports = new ArrayList();
+ private final Collection transports =
+ new ArrayList();
private final LoadBalancer loadBalancer;
@GuardedBy("lock")
private boolean shutdown;
- /**
- * The future for the transport for new outgoing requests. 'lock' must be held when assigning
- * to it.
+ /*
+ * The transport for new outgoing requests.
+ * - If shutdown == true, activeTransport is null (shutdown)
+ * - Otherwise, if delayedTransport != null,
+ * activeTransport is delayedTransport (waiting to connect)
+ * - Otherwise, activeTransport is either null (initially or when idle)
+ * or points to a real transport (when connecting or connected).
+ *
+ * 'lock' must be held when assigning to it.
*/
@Nullable
- private volatile UncancellableTransportFuture activeTransportFuture;
+ private volatile ManagedClientTransport activeTransport;
+
+ @GuardedBy("lock")
+ @Nullable
+ private DelayedClientTransport delayedTransport;
TransportSet(EquivalentAddressGroup addressGroup, String authority,
LoadBalancer loadBalancer, BackoffPolicy.Provider backoffPolicyProvider,
@@ -146,30 +150,27 @@ final class TransportSet {
* Cancelling the return future has no effect. The future will never fail. If this {@code
* TransportSet} has been shut down, the returned future will have {@code null} value.
*/
+ // TODO(zhangkun83): change it to return a ClientTransport directly
final ListenableFuture obtainActiveTransport() {
- UncancellableTransportFuture savedTransportFuture = activeTransportFuture;
- if (savedTransportFuture != null) {
- return savedTransportFuture;
+ ClientTransport savedTransport = activeTransport;
+ if (savedTransport != null) {
+ return Futures.immediateFuture(savedTransport);
}
synchronized (lock) {
// Check again, since it could have changed before acquiring the lock
- if (activeTransportFuture == null) {
- // In shutdown(), activeTransportFuture is set to NULL_VALUE_FUTURE, thus if
- // activeTransportFuture is null, shutdown must be false.
- Preconditions.checkState(!shutdown, "already shutdown");
- Preconditions.checkState(activeTransportFuture == null || activeTransportFuture.isDone(),
- "activeTransportFuture is neither null nor done");
- activeTransportFuture = new UncancellableTransportFuture();
+ if (activeTransport == null && !shutdown) {
+ delayedTransport = new DelayedClientTransport();
+ transports.add(delayedTransport);
+ delayedTransport.start(new BaseTransportListener(delayedTransport));
+ activeTransport = delayedTransport;
scheduleConnection();
}
- return activeTransportFuture;
+ return Futures.immediateFuture(activeTransport);
}
}
- // Can only be called when shutdown == false
@GuardedBy("lock")
private void scheduleConnection() {
- Preconditions.checkState(!shutdown, "Already shut down");
Preconditions.checkState(reconnectTask == null || reconnectTask.isDone(),
"previous reconnectTask is not done");
@@ -184,22 +185,39 @@ final class TransportSet {
Runnable createTransportRunnable = new Runnable() {
@Override
public void run() {
+ DelayedClientTransport savedDelayedTransport;
+ ManagedClientTransport newActiveTransport;
+ boolean savedShutdown;
synchronized (lock) {
- if (shutdown) {
- return;
- }
+ savedShutdown = shutdown;
if (currentAddressIndex == headIndex) {
backoffWatch.reset().start();
}
- ClientTransport newActiveTransport =
- transportFactory.newClientTransport(address, authority);
+ newActiveTransport = transportFactory.newClientTransport(address, authority);
log.log(Level.INFO, "Created transport {0} for {1}",
new Object[] {newActiveTransport, address});
transports.add(newActiveTransport);
newActiveTransport.start(
- new TransportListener(newActiveTransport, activeTransportFuture, address));
- Preconditions.checkState(activeTransportFuture.set(newActiveTransport),
- "failed to set the new transport to the future");
+ new TransportListener(newActiveTransport, address));
+ if (shutdown) {
+ // If TransportSet already shutdown, newActiveTransport is only to take care of pending
+ // streams in delayedTransport, but will not serve new streams, and it will be shutdown
+ // as soon as it's set to the delayedTransport.
+ // activeTransport should have already been set to null by shutdown(). We keep it null.
+ Preconditions.checkState(activeTransport == null,
+ "Unexpected non-null activeTransport");
+ } else {
+ activeTransport = newActiveTransport;
+ }
+ savedDelayedTransport = delayedTransport;
+ delayedTransport = null;
+ }
+ savedDelayedTransport.setTransport(newActiveTransport);
+ // This delayed transport will terminate and be removed from transports.
+ savedDelayedTransport.shutdown();
+ if (savedShutdown) {
+ // See comments in the synchronized block above on why we shutdown here.
+ newActiveTransport.shutdown();
}
}
};
@@ -228,65 +246,90 @@ final class TransportSet {
}
/**
- * Shut down all transports, may run callback inline.
+ * Shut down all transports, stop creating new streams, but existing streams will continue.
+ *
+ * May run callback inline.
*/
final void shutdown() {
- UncancellableTransportFuture savedActiveTransportFuture;
+ ManagedClientTransport savedActiveTransport;
boolean runCallback = false;
synchronized (lock) {
if (shutdown) {
return;
}
shutdown = true;
- savedActiveTransportFuture = activeTransportFuture;
- activeTransportFuture = NULL_VALUE_FUTURE;
+ savedActiveTransport = activeTransport;
+ activeTransport = null;
if (transports.isEmpty()) {
runCallback = true;
- }
- if (reconnectTask != null) {
- reconnectTask.cancel(false);
- }
- // else: the callback will be run once all transports have been terminated
+ Preconditions.checkState(reconnectTask == null, "Should have no reconnectTask scheduled");
+ Preconditions.checkState(delayedTransport == null, "Should have no delayedTransport");
+ } // else: the callback will be run once all transports have been terminated
}
- if (savedActiveTransportFuture != null) {
- if (savedActiveTransportFuture.isDone()) {
- try {
- // Should not throw any exception here
- savedActiveTransportFuture.get().shutdown();
- } catch (Exception e) {
- throw Throwables.propagate(e);
- }
- } else {
- savedActiveTransportFuture.set(null);
- }
+ if (savedActiveTransport != null) {
+ savedActiveTransport.shutdown();
}
if (runCallback) {
callback.onTerminated();
}
}
- private class TransportListener implements ClientTransport.Listener {
- private final SocketAddress address;
- private final ClientTransport transport;
- private final UncancellableTransportFuture transportFuture;
+ @GuardedBy("lock")
+ private void cancelReconnectTask() {
+ if (reconnectTask != null) {
+ reconnectTask.cancel(false);
+ reconnectTask = null;
+ }
+ }
- public TransportListener(ClientTransport transport,
- UncancellableTransportFuture transportFuture, SocketAddress address) {
+ /** Shared base for both delayed and real transports. */
+ private class BaseTransportListener implements ManagedClientTransport.Listener {
+ protected final ManagedClientTransport transport;
+
+ public BaseTransportListener(ManagedClientTransport transport) {
this.transport = transport;
- this.transportFuture = transportFuture;
+ }
+
+ @Override
+ public void transportReady() {}
+
+ @Override
+ public void transportShutdown(Status status) {}
+
+ @Override
+ public void transportTerminated() {
+ boolean runCallback = false;
+ synchronized (lock) {
+ transports.remove(transport);
+ if (shutdown && transports.isEmpty()) {
+ runCallback = true;
+ cancelReconnectTask();
+ }
+ }
+ if (runCallback) {
+ callback.onTerminated();
+ }
+ }
+ }
+
+ /** Listener for real transports. */
+ private class TransportListener extends BaseTransportListener {
+ private final SocketAddress address;
+
+ public TransportListener(ManagedClientTransport transport, SocketAddress address) {
+ super(transport);
this.address = address;
}
- @GuardedBy("lock")
private boolean isAttachedToActiveTransport() {
- return activeTransportFuture == transportFuture;
+ return activeTransport == transport;
}
@Override
public void transportReady() {
+ log.log(Level.INFO, "Transport {0} for {1} is ready", new Object[] {transport, address});
+ super.transportReady();
synchronized (lock) {
- log.log(Level.INFO, "Transport {0} for {1} is ready", new Object[] {transport, address});
- Preconditions.checkState(transportFuture.isDone(), "the transport future is not done");
if (isAttachedToActiveTransport()) {
headIndex = -1;
}
@@ -296,52 +339,32 @@ final class TransportSet {
@Override
public void transportShutdown(Status s) {
+ log.log(Level.INFO, "Transport {0} for {1} is being shutdown",
+ new Object[] {transport, address});
+ super.transportShutdown(s);
synchronized (lock) {
- log.log(Level.INFO, "Transport {0} for {1} is being shutdown",
- new Object[] {transport, address});
- Preconditions.checkState(transportFuture.isDone(), "the transport future is not done");
if (isAttachedToActiveTransport()) {
- activeTransportFuture = null;
+ activeTransport = null;
}
}
+ // TODO(zhangkun83): if loadBalancer was given delayedTransport earlier, it will get the real
+ // transport's shutdown event, and loadBalancer won't be able to match the two. This beats the
+ // purpose of passing the transport. We may just remove the second argument.
loadBalancer.transportShutdown(addressGroup, transport, s);
}
@Override
public void transportTerminated() {
- boolean runCallback = false;
- synchronized (lock) {
- log.log(Level.INFO, "Transport {0} for {1} is terminated",
- new Object[] {transport, address});
- Preconditions.checkState(!isAttachedToActiveTransport(),
- "Listener is still attached to activeTransportFuture. "
- + "Seems transportTerminated was not called.");
- transports.remove(transport);
- if (shutdown && transports.isEmpty()) {
- runCallback = true;
- }
- }
- if (runCallback) {
- callback.onTerminated();
- }
+ log.log(Level.INFO, "Transport {0} for {1} is terminated",
+ new Object[] {transport, address});
+ super.transportTerminated();
+ Preconditions.checkState(!isAttachedToActiveTransport(),
+ "Listener is still attached to activeTransport. "
+ + "Seems transportTerminated was not called.");
}
}
interface Callback {
void onTerminated();
}
-
- private static class UncancellableTransportFuture extends AbstractFuture {
- @Override public boolean cancel(boolean mayInterruptIfRunning) {
- // Do not cancel.
- // A future instance is shared among multiple obtainActiveTransport() calls.
- // Since the user of the future may cancel it when it's no longer needed, cancelling for real
- // will affect other users of the same future.
- return false;
- }
-
- @Override protected boolean set(ClientTransport v) {
- return super.set(v);
- }
- }
}
diff --git a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java
new file mode 100644
index 0000000000..85bf735025
--- /dev/null
+++ b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java
@@ -0,0 +1,222 @@
+/*
+ * Copyright 2015, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.grpc.internal;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.same;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import io.grpc.IntegerMarshaller;
+import io.grpc.Metadata;
+import io.grpc.MethodDescriptor;
+import io.grpc.Status;
+import io.grpc.StringMarshaller;
+import io.grpc.internal.ClientTransport;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.util.concurrent.Executor;
+
+/**
+ * Unit tests for {@link DelayedClientTransport}.
+ */
+@RunWith(JUnit4.class)
+public class DelayedClientTransportTest {
+ @Mock private ManagedClientTransport.Listener transportListener;
+ @Mock private ClientTransport mockRealTransport;
+ @Mock private ClientStream mockRealStream;
+ @Mock private ClientStreamListener streamListener;
+ @Mock private ClientTransport.PingCallback pingCallback;
+ @Mock private Executor mockExecutor;
+ @Captor private ArgumentCaptor statusCaptor;
+
+ private final MethodDescriptor method = MethodDescriptor.create(
+ MethodDescriptor.MethodType.UNKNOWN, "/service/method",
+ new StringMarshaller(), new IntegerMarshaller());
+ private final Metadata headers = new Metadata();
+
+ private final DelayedClientTransport delayedTransport = new DelayedClientTransport();
+
+ @Before public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ when(mockRealTransport.newStream(same(method), same(headers))).thenReturn(mockRealStream);
+ delayedTransport.start(transportListener);
+ }
+
+ @Test public void streamStartThenSetTransport() {
+ ClientStream stream = delayedTransport.newStream(method, headers);
+ stream.start(streamListener);
+ assertEquals(1, delayedTransport.getPendingStreamsCount());
+ assertTrue(stream instanceof DelayedStream);
+ delayedTransport.setTransport(mockRealTransport);
+ assertEquals(0, delayedTransport.getPendingStreamsCount());
+ verify(mockRealTransport).newStream(same(method), same(headers));
+ verify(mockRealStream).start(same(streamListener));
+ }
+
+ @Test public void newStreamThenSetTransportThenShutdown() {
+ ClientStream stream = delayedTransport.newStream(method, headers);
+ assertEquals(1, delayedTransport.getPendingStreamsCount());
+ assertTrue(stream instanceof DelayedStream);
+ delayedTransport.setTransport(mockRealTransport);
+ assertEquals(0, delayedTransport.getPendingStreamsCount());
+ delayedTransport.shutdown();
+ verify(transportListener).transportShutdown(any(Status.class));
+ verify(transportListener).transportTerminated();
+ verify(mockRealTransport).newStream(same(method), same(headers));
+ stream.start(streamListener);
+ verify(mockRealStream).start(same(streamListener));
+ }
+
+ @Test public void transportTerminatedThenSetTransport() {
+ delayedTransport.shutdown();
+ verify(transportListener).transportShutdown(any(Status.class));
+ verify(transportListener).transportTerminated();
+ delayedTransport.setTransport(mockRealTransport);
+ verifyNoMoreInteractions(transportListener);
+ }
+
+ @Test public void setTransportThenShutdownThenNewStream() {
+ delayedTransport.setTransport(mockRealTransport);
+ delayedTransport.shutdown();
+ verify(transportListener).transportShutdown(any(Status.class));
+ verify(transportListener).transportTerminated();
+ ClientStream stream = delayedTransport.newStream(method, headers);
+ assertEquals(0, delayedTransport.getPendingStreamsCount());
+ stream.start(streamListener);
+ assertFalse(stream instanceof DelayedStream);
+ verify(mockRealTransport).newStream(same(method), same(headers));
+ verify(mockRealStream).start(same(streamListener));
+ }
+
+ @Test public void setTransportThenShutdownNowThenNewStream() {
+ delayedTransport.setTransport(mockRealTransport);
+ delayedTransport.shutdownNow(Status.UNAVAILABLE);
+ verify(transportListener).transportShutdown(any(Status.class));
+ verify(transportListener).transportTerminated();
+ ClientStream stream = delayedTransport.newStream(method, headers);
+ assertEquals(0, delayedTransport.getPendingStreamsCount());
+ stream.start(streamListener);
+ assertFalse(stream instanceof DelayedStream);
+ verify(mockRealTransport).newStream(same(method), same(headers));
+ verify(mockRealStream).start(same(streamListener));
+ }
+
+ @Test public void cancelStreamWithoutSetTransport() {
+ ClientStream stream = delayedTransport.newStream(method, new Metadata());
+ assertEquals(1, delayedTransport.getPendingStreamsCount());
+ stream.cancel(Status.CANCELLED);
+ assertEquals(0, delayedTransport.getPendingStreamsCount());
+ verifyNoMoreInteractions(mockRealTransport);
+ verifyNoMoreInteractions(mockRealStream);
+ }
+
+ @Test public void startThenCancelStreamWithoutSetTransport() {
+ ClientStream stream = delayedTransport.newStream(method, new Metadata());
+ stream.start(streamListener);
+ assertEquals(1, delayedTransport.getPendingStreamsCount());
+ stream.cancel(Status.CANCELLED);
+ assertEquals(0, delayedTransport.getPendingStreamsCount());
+ verify(streamListener).closed(same(Status.CANCELLED), any(Metadata.class));
+ verifyNoMoreInteractions(mockRealTransport);
+ verifyNoMoreInteractions(mockRealStream);
+ }
+
+ @Test public void newStreamThenShutdownTransportThenCancelStream() {
+ ClientStream stream = delayedTransport.newStream(method, new Metadata());
+ delayedTransport.shutdown();
+ verify(transportListener).transportShutdown(any(Status.class));
+ verify(transportListener, times(0)).transportTerminated();
+ assertEquals(1, delayedTransport.getPendingStreamsCount());
+ stream.cancel(Status.CANCELLED);
+ verify(transportListener).transportTerminated();
+ assertEquals(0, delayedTransport.getPendingStreamsCount());
+ verifyNoMoreInteractions(mockRealTransport);
+ verifyNoMoreInteractions(mockRealStream);
+ }
+
+ @Test public void setTransportThenShutdownThenPing() {
+ delayedTransport.setTransport(mockRealTransport);
+ delayedTransport.shutdown();
+ delayedTransport.ping(pingCallback, mockExecutor);
+ verify(mockRealTransport).ping(same(pingCallback), same(mockExecutor));
+ }
+
+ @Test public void pingThenSetTransport() {
+ delayedTransport.ping(pingCallback, mockExecutor);
+ delayedTransport.setTransport(mockRealTransport);
+ verify(mockRealTransport).ping(same(pingCallback), same(mockExecutor));
+ }
+
+ @Test public void shutdownThenNewStream() {
+ delayedTransport.shutdown();
+ verify(transportListener).transportShutdown(any(Status.class));
+ verify(transportListener).transportTerminated();
+ ClientStream stream = delayedTransport.newStream(method, new Metadata());
+ stream.start(streamListener);
+ verify(streamListener).closed(statusCaptor.capture(), any(Metadata.class));
+ assertEquals(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode());
+ }
+
+ @Test public void startStreamThenShutdownNow() {
+ ClientStream stream = delayedTransport.newStream(method, new Metadata());
+ stream.start(streamListener);
+ delayedTransport.shutdownNow(Status.UNAVAILABLE);
+ verify(transportListener).transportShutdown(any(Status.class));
+ verify(transportListener).transportTerminated();
+ verify(streamListener).closed(statusCaptor.capture(), any(Metadata.class));
+ assertEquals(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode());
+ }
+
+ @Test public void shutdownNowThenNewStream() {
+ delayedTransport.shutdownNow(Status.UNAVAILABLE);
+ verify(transportListener).transportShutdown(any(Status.class));
+ verify(transportListener).transportTerminated();
+ ClientStream stream = delayedTransport.newStream(method, new Metadata());
+ stream.start(streamListener);
+ verify(streamListener).closed(statusCaptor.capture(), any(Metadata.class));
+ assertEquals(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode());
+ }
+}
diff --git a/core/src/test/java/io/grpc/internal/DelayedStreamTest.java b/core/src/test/java/io/grpc/internal/DelayedStreamTest.java
index 3b08447900..267fb81d50 100644
--- a/core/src/test/java/io/grpc/internal/DelayedStreamTest.java
+++ b/core/src/test/java/io/grpc/internal/DelayedStreamTest.java
@@ -63,7 +63,6 @@ public class DelayedStreamTest {
@Rule public final ExpectedException thrown = ExpectedException.none();
@Mock private ClientStreamListener listener;
- @Mock private ClientTransport transport;
@Mock private ClientStream realStream;
@Captor private ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(Status.class);
private DelayedStream stream = new DelayedStream();
diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
index cfb4b545f9..bc13b4b175 100644
--- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
+++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
@@ -117,7 +117,7 @@ public class ManagedChannelImplTest {
@Rule public final ExpectedException thrown = ExpectedException.none();
@Mock
- private ClientTransport mockTransport;
+ private ManagedClientTransport mockTransport;
@Mock
private ClientTransportFactory mockTransportFactory;
@Mock
@@ -127,8 +127,8 @@ public class ManagedChannelImplTest {
@Mock
private ClientCall.Listener mockCallListener3;
- private ArgumentCaptor transportListenerCaptor =
- ArgumentCaptor.forClass(ClientTransport.Listener.class);
+ private ArgumentCaptor transportListenerCaptor =
+ ArgumentCaptor.forClass(ManagedClientTransport.Listener.class);
private ArgumentCaptor streamListenerCaptor =
ArgumentCaptor.forClass(ClientStreamListener.class);
@@ -183,7 +183,6 @@ public class ManagedChannelImplTest {
verifyNoMoreInteractions(mockTransportFactory);
// Create transport and call
- ClientTransport mockTransport = mock(ClientTransport.class);
ClientStream mockStream = mock(ClientStream.class);
Metadata headers = new Metadata();
when(mockTransportFactory.newClientTransport(any(SocketAddress.class), any(String.class)))
@@ -193,7 +192,7 @@ public class ManagedChannelImplTest {
verify(mockTransportFactory, timeout(1000))
.newClientTransport(same(socketAddress), eq(authority));
verify(mockTransport, timeout(1000)).start(transportListenerCaptor.capture());
- ClientTransport.Listener transportListener = transportListenerCaptor.getValue();
+ ManagedClientTransport.Listener transportListener = transportListenerCaptor.getValue();
verify(mockTransport, timeout(1000)).newStream(same(method), same(headers));
verify(mockStream).start(streamListenerCaptor.capture());
verify(mockStream).setCompressor(isA(Compressor.class));
@@ -277,7 +276,7 @@ public class ManagedChannelImplTest {
call.cancel();
verify(mockTransport, timeout(1000)).start(transportListenerCaptor.capture());
- final ClientTransport.Listener transportListener = transportListenerCaptor.getValue();
+ final ManagedClientTransport.Listener transportListener = transportListenerCaptor.getValue();
final Object lock = new Object();
final CyclicBarrier barrier = new CyclicBarrier(2);
new Thread() {
@@ -386,8 +385,8 @@ public class ManagedChannelImplTest {
final SocketAddress badAddress = new SocketAddress() {};
final ResolvedServerInfo goodServer = new ResolvedServerInfo(goodAddress, Attributes.EMPTY);
final ResolvedServerInfo badServer = new ResolvedServerInfo(badAddress, Attributes.EMPTY);
- final ClientTransport goodTransport = mock(ClientTransport.class);
- final ClientTransport badTransport = mock(ClientTransport.class);
+ final ManagedClientTransport goodTransport = mock(ManagedClientTransport.class);
+ final ManagedClientTransport badTransport = mock(ManagedClientTransport.class);
when(mockTransportFactory.newClientTransport(same(goodAddress), any(String.class)))
.thenReturn(goodTransport);
when(mockTransportFactory.newClientTransport(same(badAddress), any(String.class)))
@@ -413,8 +412,8 @@ public class ManagedChannelImplTest {
// First try should fail with the bad address.
call.start(mockCallListener, headers);
- ArgumentCaptor badTransportListenerCaptor =
- ArgumentCaptor.forClass(ClientTransport.Listener.class);
+ ArgumentCaptor badTransportListenerCaptor =
+ ArgumentCaptor.forClass(ManagedClientTransport.Listener.class);
verify(mockCallListener, timeout(1000)).onClose(same(Status.UNAVAILABLE), any(Metadata.class));
verify(badTransport, timeout(1000)).start(badTransportListenerCaptor.capture());
badTransportListenerCaptor.getValue().transportShutdown(Status.UNAVAILABLE);
diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java
index 443e569d87..d5b623591c 100644
--- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java
+++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java
@@ -55,6 +55,7 @@ import io.grpc.LoadBalancer;
import io.grpc.NameResolver;
import io.grpc.Status;
import io.grpc.TransportManager;
+import io.grpc.internal.TestUtils.MockClientTransportInfo;
import org.junit.After;
import org.junit.Before;
@@ -148,10 +149,10 @@ public class ManagedChannelImplTransportManagerTest {
@Test
public void createAndReuseTransport() throws Exception {
- doAnswer(new Answer() {
+ doAnswer(new Answer() {
@Override
- public ClientTransport answer(InvocationOnMock invocation) throws Throwable {
- return mock(ClientTransport.class);
+ public ManagedClientTransport answer(InvocationOnMock invocation) throws Throwable {
+ return mock(ManagedClientTransport.class);
}
}).when(mockTransportFactory).newClientTransport(any(SocketAddress.class), any(String.class));
@@ -173,8 +174,8 @@ public class ManagedChannelImplTransportManagerTest {
SocketAddress addr2 = mock(SocketAddress.class);
EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(Arrays.asList(addr1, addr2));
- LinkedList listeners =
- TestUtils.captureListeners(mockTransportFactory);
+ LinkedList transports =
+ TestUtils.captureTransports(mockTransportFactory);
// Invocation counters
int backoffReset = 0;
@@ -185,7 +186,7 @@ public class ManagedChannelImplTransportManagerTest {
verify(mockTransportFactory).newClientTransport(addr1, authority);
verify(mockBackoffPolicyProvider, times(++backoffReset)).get();
// Fail the first transport, without setting it to ready
- listeners.poll().transportShutdown(Status.UNAVAILABLE);
+ transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// Subsequent getTransport() will use the next address
ListenableFuture future2a = tm.getTransport(addressGroup);
@@ -197,9 +198,9 @@ public class ManagedChannelImplTransportManagerTest {
assertSame(future2a.get(), future2b.get());
assertNotSame(future1.get(), future2a.get());
// Make the second transport ready
- listeners.peek().transportReady();
+ transports.peek().listener.transportReady();
// Disconnect the second transport
- listeners.poll().transportShutdown(Status.UNAVAILABLE);
+ transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// Subsequent getTransport() will use the next address, which is the first one since we have run
// out of addresses.
@@ -212,7 +213,7 @@ public class ManagedChannelImplTransportManagerTest {
verify(mockBackoffPolicy, times(0)).nextBackoffMillis();
verify(mockTransportFactory, times(2)).newClientTransport(addr1, authority);
verifyNoMoreInteractions(mockTransportFactory);
- assertEquals(1, listeners.size());
+ assertEquals(1, transports.size());
}
@Test
@@ -221,8 +222,8 @@ public class ManagedChannelImplTransportManagerTest {
SocketAddress addr2 = mock(SocketAddress.class);
EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(Arrays.asList(addr1, addr2));
- LinkedList listeners =
- TestUtils.captureListeners(mockTransportFactory);
+ LinkedList transports =
+ TestUtils.captureTransports(mockTransportFactory);
// Invocation counters
int transportsAddr1 = 0;
@@ -236,9 +237,9 @@ public class ManagedChannelImplTransportManagerTest {
verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority);
// Back-off policy was set initially.
verify(mockBackoffPolicyProvider, times(++backoffReset)).get();
- listeners.peek().transportReady();
+ transports.peek().listener.transportReady();
// Then close it
- listeners.poll().transportShutdown(Status.UNAVAILABLE);
+ transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// Second pick fails. This is the beginning of a series of failures.
ListenableFuture future2 = tm.getTransport(addressGroup);
@@ -246,7 +247,7 @@ public class ManagedChannelImplTransportManagerTest {
verify(mockTransportFactory, times(++transportsAddr2)).newClientTransport(addr2, authority);
// Back-off policy was reset.
verify(mockBackoffPolicyProvider, times(++backoffReset)).get();
- listeners.poll().transportShutdown(Status.UNAVAILABLE);
+ transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// Third pick fails too
ListenableFuture future3 = tm.getTransport(addressGroup);
@@ -254,7 +255,7 @@ public class ManagedChannelImplTransportManagerTest {
verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority);
// Back-off policy was not reset.
verify(mockBackoffPolicyProvider, times(backoffReset)).get();
- listeners.poll().transportShutdown(Status.UNAVAILABLE);
+ transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// Forth pick is on addr2, back-off policy kicks in.
ListenableFuture future4 = tm.getTransport(addressGroup);
diff --git a/core/src/test/java/io/grpc/internal/TestUtils.java b/core/src/test/java/io/grpc/internal/TestUtils.java
index 3d74d510cf..fcc1979c0a 100644
--- a/core/src/test/java/io/grpc/internal/TestUtils.java
+++ b/core/src/test/java/io/grpc/internal/TestUtils.java
@@ -34,6 +34,10 @@ package io.grpc.internal;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import io.grpc.Metadata;
+import io.grpc.MethodDescriptor;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -46,31 +50,53 @@ import java.util.LinkedList;
*/
final class TestUtils {
- /**
- * Stub the given mock {@link ClientTransportFactory} by returning mock {@link ClientTransport}s
- * which saves their listeners to a list which is returned by this method.
- */
- static LinkedList captureListeners(
- ClientTransportFactory mockTransportFactory) {
- final LinkedList listeners =
- new LinkedList();
+ static class MockClientTransportInfo {
+ /**
+ * A mock transport created by the mock transport factory.
+ */
+ final ManagedClientTransport transport;
- doAnswer(new Answer() {
+ /**
+ * The listener passed to the start() of the mock transport.
+ */
+ final ManagedClientTransport.Listener listener;
+
+ MockClientTransportInfo(ManagedClientTransport transport,
+ ManagedClientTransport.Listener listener) {
+ this.transport = transport;
+ this.listener = listener;
+ }
+ }
+
+ /**
+ * Stub the given mock {@link ClientTransportFactory} by returning mock
+ * {@link ManagedClientTransport}s which saves their listeners along with them. This method
+ * returns a list of {@link MockClientTransportInfo}, each of which is a started mock transport
+ * and its listener.
+ */
+ static LinkedList captureTransports(
+ ClientTransportFactory mockTransportFactory) {
+ final LinkedList captor = new LinkedList();
+
+ doAnswer(new Answer() {
@Override
- public ClientTransport answer(InvocationOnMock invocation) throws Throwable {
- ClientTransport mockTransport = mock(ClientTransport.class);
+ public ManagedClientTransport answer(InvocationOnMock invocation) throws Throwable {
+ final ManagedClientTransport mockTransport = mock(ManagedClientTransport.class);
+ when(mockTransport.newStream(any(MethodDescriptor.class), any(Metadata.class)))
+ .thenReturn(mock(ClientStream.class));
// Save the listener
doAnswer(new Answer() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
- listeners.add((ClientTransport.Listener) invocation.getArguments()[0]);
+ captor.add(new MockClientTransportInfo(
+ mockTransport, (ManagedClientTransport.Listener) invocation.getArguments()[0]));
return null;
}
- }).when(mockTransport).start(any(ClientTransport.Listener.class));
+ }).when(mockTransport).start(any(ManagedClientTransport.Listener.class));
return mockTransport;
}
}).when(mockTransportFactory).newClientTransport(any(SocketAddress.class), any(String.class));
- return listeners;
+ return captor;
}
}
diff --git a/core/src/test/java/io/grpc/internal/TransportSetTest.java b/core/src/test/java/io/grpc/internal/TransportSetTest.java
index 6ebc58caaf..0d1be9c2ed 100644
--- a/core/src/test/java/io/grpc/internal/TransportSetTest.java
+++ b/core/src/test/java/io/grpc/internal/TransportSetTest.java
@@ -31,21 +31,29 @@
package io.grpc.internal;
-import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.same;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.EquivalentAddressGroup;
+import io.grpc.IntegerMarshaller;
import io.grpc.LoadBalancer;
+import io.grpc.Metadata;
+import io.grpc.MethodDescriptor;
import io.grpc.Status;
+import io.grpc.StringMarshaller;
+import io.grpc.internal.TestUtils.MockClientTransportInfo;
import org.junit.Before;
import org.junit.Test;
@@ -77,10 +85,16 @@ public class TransportSetTest {
@Mock private BackoffPolicy.Provider mockBackoffPolicyProvider;
@Mock private ClientTransportFactory mockTransportFactory;
@Mock private TransportSet.Callback mockTransportSetCallback;
+ @Mock private ClientStreamListener mockStreamListener;
+
+ private final MethodDescriptor method = MethodDescriptor.create(
+ MethodDescriptor.MethodType.UNKNOWN, "/service/method",
+ new StringMarshaller(), new IntegerMarshaller());
+ private final Metadata headers = new Metadata();
private TransportSet transportSet;
private EquivalentAddressGroup addressGroup;
- private LinkedList listeners;
+ private LinkedList transports;
@Before public void setUp() {
MockitoAnnotations.initMocks(this);
@@ -91,7 +105,7 @@ public class TransportSetTest {
when(mockBackoffPolicy1.nextBackoffMillis()).thenReturn(10L, 100L);
when(mockBackoffPolicy2.nextBackoffMillis()).thenReturn(10L, 100L);
when(mockBackoffPolicy3.nextBackoffMillis()).thenReturn(10L, 100L);
- listeners = TestUtils.captureListeners(mockTransportFactory);
+ transports = TestUtils.captureTransports(mockTransportFactory);
}
@Test public void singleAddressBackoff() {
@@ -109,7 +123,7 @@ public class TransportSetTest {
verify(mockBackoffPolicyProvider, times(++backoffReset)).get();
verify(mockTransportFactory, times(++transportsCreated)).newClientTransport(addr, authority);
// Fail this one
- listeners.poll().transportShutdown(Status.UNAVAILABLE);
+ transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// Second attempt uses the first back-off value interval.
transportSet.obtainActiveTransport();
@@ -121,7 +135,7 @@ public class TransportSetTest {
fakeClock.forwardMillis(1);
verify(mockTransportFactory, times(++transportsCreated)).newClientTransport(addr, authority);
// Fail this one too
- listeners.poll().transportShutdown(Status.UNAVAILABLE);
+ transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// Third attempt uses the second back-off interval.
transportSet.obtainActiveTransport();
@@ -133,9 +147,9 @@ public class TransportSetTest {
fakeClock.forwardMillis(1);
verify(mockTransportFactory, times(++transportsCreated)).newClientTransport(addr, authority);
// Let this one succeed
- listeners.peek().transportReady();
+ transports.peek().listener.transportReady();
// And close it
- listeners.poll().transportShutdown(Status.UNAVAILABLE);
+ transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// Back-off is reset, and the next attempt will happen immediately
transportSet.obtainActiveTransport();
@@ -165,9 +179,9 @@ public class TransportSetTest {
verify(mockBackoffPolicyProvider, times(++backoffReset)).get();
verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority);
// Let this one through
- listeners.peek().transportReady();
+ transports.peek().listener.transportReady();
// Then shut it down
- listeners.poll().transportShutdown(Status.UNAVAILABLE);
+ transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
////// Now start a series of failing attempts, where addr2 is the head.
@@ -176,14 +190,14 @@ public class TransportSetTest {
verify(mockBackoffPolicyProvider, times(++backoffReset)).get();
verify(mockTransportFactory, times(++transportsAddr2)).newClientTransport(addr2, authority);
// Fail this one
- listeners.poll().transportShutdown(Status.UNAVAILABLE);
+ transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// Second attempt will start immediately. Keep back-off policy.
transportSet.obtainActiveTransport();
verify(mockBackoffPolicyProvider, times(backoffReset)).get();
verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority);
// Fail this one too
- listeners.poll().transportShutdown(Status.UNAVAILABLE);
+ transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// Third attempt is on head, thus controlled by the first back-off interval.
transportSet.obtainActiveTransport();
@@ -194,14 +208,14 @@ public class TransportSetTest {
fakeClock.forwardMillis(1);
verify(mockTransportFactory, times(++transportsAddr2)).newClientTransport(addr2, authority);
// Fail this one too
- listeners.poll().transportShutdown(Status.UNAVAILABLE);
+ transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// Forth attempt will start immediately. Keep back-off policy.
transportSet.obtainActiveTransport();
verify(mockBackoffPolicyProvider, times(backoffReset)).get();
verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority);
// Fail this one too
- listeners.poll().transportShutdown(Status.UNAVAILABLE);
+ transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// Fifth attempt is on head, thus controlled by the second back-off interval.
transportSet.obtainActiveTransport();
@@ -212,9 +226,9 @@ public class TransportSetTest {
fakeClock.forwardMillis(1);
verify(mockTransportFactory, times(++transportsAddr2)).newClientTransport(addr2, authority);
// Let it through
- listeners.peek().transportReady();
+ transports.peek().listener.transportReady();
// Then close it.
- listeners.poll().transportShutdown(Status.UNAVAILABLE);
+ transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
////// Now start a series of failing attempts, where addr1 is the head.
@@ -223,14 +237,14 @@ public class TransportSetTest {
verify(mockBackoffPolicyProvider, times(++backoffReset)).get();
verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority);
// Fail this one
- listeners.poll().transportShutdown(Status.UNAVAILABLE);
+ transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// Second attempt will start immediately. Keep back-off policy.
transportSet.obtainActiveTransport();
verify(mockBackoffPolicyProvider, times(backoffReset)).get();
verify(mockTransportFactory, times(++transportsAddr2)).newClientTransport(addr2, authority);
// Fail this one too
- listeners.poll().transportShutdown(Status.UNAVAILABLE);
+ transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// Third attempt is on head, thus controlled by the first back-off interval.
transportSet.obtainActiveTransport();
@@ -263,7 +277,7 @@ public class TransportSetTest {
verify(mockTransportFactory, times(++transportsCreated)).newClientTransport(addr, authority);
// Fail this one
- listeners.poll().transportShutdown(Status.UNAVAILABLE);
+ transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// Won't reconnect until requested, even if back-off time has expired
fakeClock.forwardMillis(10);
@@ -274,7 +288,7 @@ public class TransportSetTest {
verify(mockTransportFactory, times(++transportsCreated)).newClientTransport(addr, authority);
// Fail this one, too
- listeners.poll().transportShutdown(Status.UNAVAILABLE);
+ transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// Request immediately, but will wait for back-off before reconnecting
assertNotNull(transportSet.obtainActiveTransport());
@@ -284,7 +298,7 @@ public class TransportSetTest {
}
@Test
- public void shutdownBeforeTransportCreated() throws Exception {
+ public void shutdownBeforeTransportCreatedWithPendingStream() throws Exception {
SocketAddress addr = mock(SocketAddress.class);
createTransortSet(addr);
@@ -294,17 +308,79 @@ public class TransportSetTest {
assertTrue(pick.isDone());
assertNotNull(pick.get());
// Fail this one
- listeners.poll().transportShutdown(Status.UNAVAILABLE);
+ MockClientTransportInfo transportInfo = transports.poll();
+ transportInfo.listener.transportShutdown(Status.UNAVAILABLE);
+ transportInfo.listener.transportTerminated();
// Second transport will wait for back-off
pick = transportSet.obtainActiveTransport();
- assertFalse(pick.isDone());
+ assertTrue(pick.isDone());
+ assertTrue(pick.get() instanceof DelayedClientTransport);
+ // Start a stream, which will be pending in the delayed transport
+ ClientStream pendingStream = pick.get().newStream(method, headers);
+ pendingStream.start(mockStreamListener);
- // Shut down TransportSet before the transort is created.
+ // Shut down TransportSet before the transport is created. Further call to
+ // obtainActiveTransport() gets null results.
transportSet.shutdown();
+ pick = transportSet.obtainActiveTransport();
assertTrue(pick.isDone());
assertNull(pick.get());
verify(mockTransportFactory).newClientTransport(addr, authority);
+
+ // Reconnect will eventually happen, even though TransportSet has been shut down
+ fakeClock.forwardMillis(10);
+ verify(mockTransportFactory, times(2)).newClientTransport(addr, authority);
+ // The pending stream will be started on this newly started transport, which is promptly shut
+ // down by TransportSet right after the stream is created.
+ transportInfo = transports.poll();
+ verify(transportInfo.transport).newStream(same(method), same(headers));
+ verify(transportInfo.transport).shutdown();
+ verify(mockTransportSetCallback, never()).onTerminated();
+ // Terminating the transport will let TransportSet to be terminated.
+ transportInfo.listener.transportTerminated();
+ verify(mockTransportSetCallback).onTerminated();
+
+ // No more transports will be created.
+ fakeClock.forwardMillis(10000);
+ verifyNoMoreInteractions(mockTransportFactory);
+ assertEquals(0, transports.size());
+ }
+
+ @Test
+ public void shutdownBeforeTransportCreatedWithoutPendingStream() throws Exception {
+ SocketAddress addr = mock(SocketAddress.class);
+ createTransortSet(addr);
+
+ // First transport is created immediately
+ ListenableFuture pick = transportSet.obtainActiveTransport();
+ verify(mockTransportFactory).newClientTransport(addr, authority);
+ assertTrue(pick.isDone());
+ assertNotNull(pick.get());
+ // Fail this one
+ MockClientTransportInfo transportInfo = transports.poll();
+ transportInfo.listener.transportShutdown(Status.UNAVAILABLE);
+ transportInfo.listener.transportTerminated();
+
+ // Second transport will wait for back-off
+ pick = transportSet.obtainActiveTransport();
+ assertTrue(pick.isDone());
+ assertTrue(pick.get() instanceof DelayedClientTransport);
+
+ // Shut down TransportSet before the transport is created. Futher call to
+ // obtainActiveTransport() gets null results.
+ transportSet.shutdown();
+ pick = transportSet.obtainActiveTransport();
+ assertTrue(pick.isDone());
+ assertNull(pick.get());
+
+ // TransportSet terminated promptly.
+ verify(mockTransportSetCallback).onTerminated();
+
+ // No more transports will be created.
+ fakeClock.forwardMillis(10000);
+ verifyNoMoreInteractions(mockTransportFactory);
+ assertEquals(0, transports.size());
}
@Test
@@ -321,26 +397,24 @@ public class TransportSetTest {
}
@Test
- public void cancellingTransportFutureWontAffectOtherCallers() {
+ public void futuresAreAlwaysComplete() throws Exception {
SocketAddress addr = mock(SocketAddress.class);
createTransortSet(addr);
// Fail the first pick so that the next pick will be pending on back-off
ListenableFuture future0 = transportSet.obtainActiveTransport();
assertTrue(future0.isDone());
- listeners.poll().transportShutdown(Status.UNAVAILABLE);
+ transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
ListenableFuture future1 = transportSet.obtainActiveTransport();
ListenableFuture future2 = transportSet.obtainActiveTransport();
// These futures are pending on back-off
- assertFalse(future1.isDone());
- assertFalse(future2.isDone());
+ assertTrue(future1.isDone());
+ assertTrue(future2.isDone());
- // Cancel future1
- future1.cancel(false);
- // future2 is not affected. future1 can either be really cancelled or not, which we don't care.
- assertFalse(future2.isDone());
+ assertTrue(future1.get() instanceof DelayedClientTransport);
+ assertTrue(future2.get() instanceof DelayedClientTransport);
}
private void createTransortSet(SocketAddress ... addrs) {
diff --git a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java
index 1d5ca47d73..9420c096d3 100644
--- a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java
+++ b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java
@@ -43,9 +43,9 @@ import io.grpc.Internal;
import io.grpc.NameResolver;
import io.grpc.internal.AbstractManagedChannelImplBuilder;
import io.grpc.internal.AbstractReferenceCounted;
-import io.grpc.internal.ClientTransport;
import io.grpc.internal.ClientTransportFactory;
import io.grpc.internal.GrpcUtil;
+import io.grpc.internal.ManagedClientTransport;
import io.grpc.internal.SharedResourceHolder;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
@@ -304,7 +304,8 @@ public class NettyChannelBuilder extends AbstractManagedChannelImplBuilder 0, "maxHeaderListSize must be positive");
@@ -131,7 +131,7 @@ class NettyClientHandler extends AbstractNettyHandler {
static NettyClientHandler newHandler(Http2Connection connection,
Http2FrameReader frameReader,
Http2FrameWriter frameWriter,
- final ClientTransport.Listener listener,
+ final ManagedClientTransport.Listener listener,
int flowControlWindow,
Ticker ticker) {
Preconditions.checkNotNull(connection, "connection");
diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java
index 35ca3088e3..8b0bf00662 100644
--- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java
+++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java
@@ -40,7 +40,7 @@ import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.internal.ClientStream;
-import io.grpc.internal.ClientTransport;
+import io.grpc.internal.ManagedClientTransport;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
@@ -56,9 +56,9 @@ import java.util.concurrent.Executor;
import javax.annotation.concurrent.GuardedBy;
/**
- * A Netty-based {@link ClientTransport} implementation.
+ * A Netty-based {@link ManagedClientTransport} implementation.
*/
-class NettyClientTransport implements ClientTransport {
+class NettyClientTransport implements ManagedClientTransport {
private final SocketAddress address;
private final Class extends Channel> channelType;
private final EventLoopGroup group;
diff --git a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
index 6898eaaf71..06c93e9171 100644
--- a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
@@ -50,8 +50,8 @@ import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.internal.ClientStream;
import io.grpc.internal.ClientStreamListener;
-import io.grpc.internal.ClientTransport;
import io.grpc.internal.GrpcUtil;
+import io.grpc.internal.ManagedClientTransport;
import io.grpc.internal.ServerListener;
import io.grpc.internal.ServerStream;
import io.grpc.internal.ServerStreamListener;
@@ -92,7 +92,7 @@ import java.util.concurrent.TimeoutException;
public class NettyClientTransportTest {
@Mock
- private ClientTransport.Listener clientTransportListener;
+ private ManagedClientTransport.Listener clientTransportListener;
private final List transports = new ArrayList();
private NioEventLoopGroup group;
diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java
index 25426529aa..c737a94952 100644
--- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java
+++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpChannelBuilder.java
@@ -46,9 +46,9 @@ import io.grpc.ExperimentalApi;
import io.grpc.NameResolver;
import io.grpc.internal.AbstractManagedChannelImplBuilder;
import io.grpc.internal.AbstractReferenceCounted;
-import io.grpc.internal.ClientTransport;
import io.grpc.internal.ClientTransportFactory;
import io.grpc.internal.GrpcUtil;
+import io.grpc.internal.ManagedClientTransport;
import io.grpc.internal.SharedResourceHolder;
import io.grpc.internal.SharedResourceHolder.Resource;
@@ -255,7 +255,7 @@ public class OkHttpChannelBuilder extends
}
@Override
- public ClientTransport newClientTransport(SocketAddress addr, String authority) {
+ public ManagedClientTransport newClientTransport(SocketAddress addr, String authority) {
InetSocketAddress inetSocketAddr = (InetSocketAddress) addr;
return new OkHttpClientTransport(inetSocketAddr, authority, executor, socketFactory,
Utils.convertSpec(connectionSpec), maxMessageSize);
diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java
index 33c4725bd7..61583c2c1c 100644
--- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java
+++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java
@@ -44,9 +44,9 @@ import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.MethodType;
import io.grpc.Status;
import io.grpc.Status.Code;
-import io.grpc.internal.ClientTransport;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.Http2Ping;
+import io.grpc.internal.ManagedClientTransport;
import io.grpc.internal.SerializingExecutor;
import io.grpc.okhttp.internal.ConnectionSpec;
import io.grpc.okhttp.internal.framed.ErrorCode;
@@ -84,9 +84,9 @@ import javax.annotation.concurrent.GuardedBy;
import javax.net.ssl.SSLSocketFactory;
/**
- * A okhttp-based {@link ClientTransport} implementation.
+ * A okhttp-based {@link ManagedClientTransport} implementation.
*/
-class OkHttpClientTransport implements ClientTransport {
+class OkHttpClientTransport implements ManagedClientTransport {
private static final Map ERROR_CODE_TO_STATUS;
private static final Logger log = Logger.getLogger(OkHttpClientTransport.class.getName());
private static final OkHttpClientStream[] EMPTY_STREAM_ARRAY = new OkHttpClientStream[0];
diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java
index fa6174436f..66be3357ad 100644
--- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java
+++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java
@@ -73,6 +73,7 @@ import io.grpc.internal.AbstractStream;
import io.grpc.internal.ClientStreamListener;
import io.grpc.internal.ClientTransport;
import io.grpc.internal.GrpcUtil;
+import io.grpc.internal.ManagedClientTransport;
import io.grpc.okhttp.OkHttpClientTransport.ClientFrameHandler;
import io.grpc.okhttp.internal.ConnectionSpec;
import io.grpc.okhttp.internal.framed.ErrorCode;
@@ -131,7 +132,7 @@ public class OkHttpClientTransportTest {
@Mock
MethodDescriptor, ?> method;
@Mock
- private ClientTransport.Listener transportListener;
+ private ManagedClientTransport.Listener transportListener;
private OkHttpClientTransport clientTransport;
private MockFrameReader frameReader;
private ExecutorService executor;
@@ -1323,7 +1324,7 @@ public class OkHttpClientTransportTest {
ConnectionSpec.CLEARTEXT,
DEFAULT_MAX_MESSAGE_SIZE);
- ClientTransport.Listener listener = mock(ClientTransport.Listener.class);
+ ManagedClientTransport.Listener listener = mock(ManagedClientTransport.Listener.class);
clientTransport.start(listener);
ArgumentCaptor captor = ArgumentCaptor.forClass(Status.class);
verify(listener, timeout(TIME_OUT_MS)).transportShutdown(captor.capture());