DelayedClientTransport and fix TransportSet.shutdown() semantics.

Always return a completed future from `TransportSet`. If a (real) transport has not been created (e.g., in reconnect back-off), a `DelayedClientTransport` will be returned.

Eventually we will get rid of the transport futures everywhere, and have streams always __owned__ by some transports.

DelayedClientTransport
----------------------

After we get rid of the transport future, this is what `ClientCallImpl` and `LoadBalancer` get when a real transport has not been created yet. It buffers new streams and pings until `setTransport()` is called, after which point all buffered and future streams/pings are transferred to the real transport.

If a buffered stream is cancelled, `DelayedClientTransport` will remove it from the buffer list, thus #1342 will be resolved after the larger refactoring is complete.

This PR only makes `TransportSet` use `DelayedClientTransport`. Follow-up changes will be made to allow `LoadBalancer.pickTransport()` to return null, in which case `ManagedChannelImpl` will give `ClientCallImpl` a `DelayedClientTransport`.

Changes to ClientTransport shutdown semantics
---------------------------------------------

Previously when shutdown() is called, `ClientTransport` should not accept newStream(), and when all existing streams have been closed, `ClientTransport` is terminated. Only when a transport is terminated would a transport owner (e.g., `TransportSet`) remove the reference to it.

`DelayedClientTransport` brings about a new case: when `setTransport()` is called, we switch to the real transport and no longer need the delayed transport. This is achieved by calling `shutdown()` on the delayed transport and letting it terminate. However, as the delayed transport has already been handed out to users, we would like `newStream()` to keep working for them, even though the delayed transport is already shut down and terminated.

In order to make it easy to manage the life-cycle of `DelayedClientTransport`, we redefine the shutdown semantics of transport:
- A transport can own a stream. Typically the transport owns the streams
  it creates, but there may be exceptions. `DelayedClientTransport` DOES
  NOT OWN the streams it returns from `newStream()` after `setTransport()`
  has been called. Instead, the ownership would be transferred to the
  real transport.
- After `shutdown()` has been called, the transport stops owning new
  streams, and `newStream()` may still succeed. With this idea,
  `DelayedClientTransport`, even when terminated, will continue
  passing `newStream()` to the real transport.
- When a transport is in shutdown state, and it doesn't own any stream,
  it then can enter terminated state.

ManagedClientTransport / ClientTransport
----------------------------------------

Remove life-cycle interfaces from `ClientTransport`, and put them in its subclass - `ManagedClientTransport`, with the same idea that we have `Channel` and `ManagedChannel`. Only the one who creates the transport will get `ManagedClientTransport` thus is able to start and shutdown the transport. The users of transport, e.g., `LoadBalancer`, can only get `ClientTransport` thus are not alter its state. This change clarifies the responsibility of transport life-cycle management.

Fix TransportSet shutdown semantics
-----------------------------------

Currently, if `TransportSet.shutdown()` has been called, it no longer create new transports, which is wrong.

The correct semantics of `TransportSet.shutdown()` should be:
- Shutdown all transports, thus stop new streams being created on them
- Stop `obtainActiveTransport()` from returning transports
- Streams that already created, including those buffered in delayed transport, should continue. That means if delayed transport has buffered streams, we should let the existing reconnect task continue.
This commit is contained in:
Kun Zhang 2016-01-29 17:25:42 -08:00
parent 544cd3a33b
commit cf787bddf2
22 changed files with 932 additions and 260 deletions

View File

@ -36,8 +36,8 @@ import com.google.common.base.Preconditions;
import io.grpc.ExperimentalApi;
import io.grpc.internal.AbstractManagedChannelImplBuilder;
import io.grpc.internal.AbstractReferenceCounted;
import io.grpc.internal.ClientTransport;
import io.grpc.internal.ClientTransportFactory;
import io.grpc.internal.ManagedClientTransport;
import java.net.SocketAddress;
@ -89,7 +89,7 @@ public class InProcessChannelBuilder extends
}
@Override
public ClientTransport newClientTransport(SocketAddress addr, String authority) {
public ManagedClientTransport newClientTransport(SocketAddress addr, String authority) {
return new InProcessTransport(name);
}

View File

@ -40,7 +40,7 @@ import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.internal.ClientStream;
import io.grpc.internal.ClientStreamListener;
import io.grpc.internal.ClientTransport;
import io.grpc.internal.ManagedClientTransport;
import io.grpc.internal.NoopClientStream;
import io.grpc.internal.ServerStream;
import io.grpc.internal.ServerStreamListener;
@ -59,12 +59,12 @@ import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
@ThreadSafe
class InProcessTransport implements ServerTransport, ClientTransport {
class InProcessTransport implements ServerTransport, ManagedClientTransport {
private static final Logger log = Logger.getLogger(InProcessTransport.class.getName());
private final String name;
private ServerTransportListener serverTransportListener;
private ClientTransport.Listener clientTransportListener;
private ManagedClientTransport.Listener clientTransportListener;
@GuardedBy("this")
private boolean shutdown;
@GuardedBy("this")
@ -79,7 +79,7 @@ class InProcessTransport implements ServerTransport, ClientTransport {
}
@Override
public synchronized void start(ClientTransport.Listener listener) {
public synchronized void start(ManagedClientTransport.Listener listener) {
this.clientTransportListener = listener;
InProcessServer server = InProcessServer.findServer(name);
if (server != null) {
@ -152,7 +152,7 @@ class InProcessTransport implements ServerTransport, ClientTransport {
@Override
public synchronized void shutdown() {
// Can be called multiple times: once for ClientTransport, once for ServerTransport.
// Can be called multiple times: once for ManagedClientTransport, once for ServerTransport.
if (shutdown) {
return;
}

View File

@ -230,7 +230,8 @@ public abstract class AbstractManagedChannelImplBuilder
}
@Override
public ClientTransport newClientTransport(SocketAddress serverAddress, String authority) {
public ManagedClientTransport newClientTransport(SocketAddress serverAddress,
String authority) {
return factory.newClientTransport(
serverAddress, authorityOverride != null ? authorityOverride : authority);
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2014, Google Inc. All rights reserved.
* Copyright 2016, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
@ -33,25 +33,21 @@ package io.grpc.internal;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import java.util.concurrent.Executor;
import javax.annotation.concurrent.ThreadSafe;
/**
* The client-side transport encapsulating a single connection to a remote server. Allows creation
* of new {@link Stream} instances for communication with the server. All methods on the transport
* and its listener are expected to execute quickly.
*
* <p>{@link #start} must be the first method call to this interface and return before calling other
* methods.
* The client-side transport typically encapsulating a single connection to a remote
* server. However, streams created before the client has discovered any server address may
* eventually be issued on different connections. All methods on the transport and its callbacks
* are expected to execute quickly.
*/
@ThreadSafe
public interface ClientTransport {
/**
* Creates a new stream for sending messages to the remote end-point.
* Creates a new stream for sending messages to a remote end-point.
*
* <p>
* This method returns immediately and does not wait for any validation of the request. If
@ -67,64 +63,18 @@ public interface ClientTransport {
ClientStream newStream(MethodDescriptor<?, ?> method, Metadata headers);
/**
* Starts transport. This method may only be called once.
* Pings a remote endpoint. When an acknowledgement is received, the given callback will be
* invoked using the given executor.
*
* <p>Implementations must not call {@code listener} from within {@link #start}; implementations
* are expected to notify listener on a separate thread. This method should not throw any
* exceptions.
*
* @param listener non-{@code null} listener of transport events
*/
void start(Listener listener);
/**
* Pings the remote endpoint to verify that the transport is still active. When an acknowledgement
* is received, the given callback will be invoked using the given executor.
* <p>Pings are not necessarily sent to the same endpont, thus a successful ping only means at
* least one endpoint responded, but doesn't imply the availability of other endpoints (if there
* is any).
*
* <p>This is an optional method. Transports that do not have any mechanism by which to ping the
* remote endpoint may throw {@link UnsupportedOperationException}.
*/
void ping(PingCallback callback, Executor executor);
/**
* Initiates an orderly shutdown of the transport. Existing streams continue, but new streams will
* fail (once {@link Listener#transportShutdown} callback called). This method may only be called
* once.
*/
void shutdown();
/**
* Receives notifications for the transport life-cycle events. Implementation does not need to be
* thread-safe, so notifications must be properly sychronized externally.
*/
interface Listener {
/**
* The transport is shutting down. No new streams will be processed, but existing streams may
* continue. Shutdown could have been caused by an error or normal operation. It is possible
* that this method is called without {@link #shutdown} being called. If the argument to this
* function is {@link Status#isOk}, it is safe to immediately reconnect.
*
* <p>This is called exactly once, and must be called prior to {@link #transportTerminated}.
*
* @param s the reason for the shutdown.
*/
void transportShutdown(Status s);
/**
* The transport completed shutting down. All resources have been released.
*
* <p>This is called exactly once, and must be called after {@link #transportShutdown} has been
* called.
*/
void transportTerminated();
/**
* The transport is ready to accept traffic, because the connection is established. This is
* called at most once.
*/
void transportReady();
}
/**
* A callback that is invoked when the acknowledgement to a {@link #ping} is received. Exactly one
* of the two methods should be called per {@link #ping}.

View File

@ -33,7 +33,7 @@ package io.grpc.internal;
import java.net.SocketAddress;
/** Pre-configured factory for creating {@link ClientTransport} instances. */
/** Pre-configured factory for creating {@link ManagedClientTransport} instances. */
public interface ClientTransportFactory extends ReferenceCounted {
/**
* Creates an unstarted transport for exclusive use.
@ -41,5 +41,5 @@ public interface ClientTransportFactory extends ReferenceCounted {
* @param serverAddress the address that the transport is connected to
* @param authority the HTTP/2 authority of the server
*/
ClientTransport newClientTransport(SocketAddress serverAddress, String authority);
ManagedClientTransport newClientTransport(SocketAddress serverAddress, String authority);
}

View File

@ -0,0 +1,268 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.internal;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.concurrent.Executor;
import javax.annotation.concurrent.GuardedBy;
/**
* A client transport that queues requests before a real transport is available. When a backing
* transport is later provided, this class delegates to it.
*
* <p>This transport owns the streams that it has created before {@link #setTransport} is
* called. When {@link #setTransport} is called, the ownership of pending streams and subsequent new
* streams are transferred to the given transport, thus this transport won't own any stream.
*/
class DelayedClientTransport implements ManagedClientTransport {
private final Object lock = new Object();
private Listener listener;
/** 'lock' must be held when assigning to delegate. */
private volatile ClientTransport delegate;
@GuardedBy("lock")
private Collection<PendingStream> pendingStreams = new HashSet<PendingStream>();
@GuardedBy("lock")
private Collection<PendingPing> pendingPings = new ArrayList<PendingPing>();
/**
* When shutdown == true and pendingStreams == null, then the transport is considered terminated.
*/
@GuardedBy("lock")
private boolean shutdown;
@Override
public void start(Listener listener) {
this.listener = Preconditions.checkNotNull(listener, "listener");
}
@Override
public ClientStream newStream(MethodDescriptor<?, ?> method, Metadata headers) {
ClientTransport transport = delegate;
if (transport != null) {
return transport.newStream(method, headers);
}
synchronized (lock) {
// Check again, since it may have changed while waiting for lock
transport = delegate;
if (transport != null) {
return transport.newStream(method, headers);
}
if (!shutdown) {
PendingStream pendingStream = new PendingStream(method, headers);
pendingStreams.add(pendingStream);
return pendingStream;
}
}
DelayedStream stream = new DelayedStream();
stream.setError(Status.UNAVAILABLE.withDescription("transport shutdown"));
return stream;
}
public void ping(final PingCallback callback, Executor executor) {
ClientTransport transport = delegate;
if (transport != null) {
transport.ping(callback, executor);
return;
}
synchronized (lock) {
// Check again, since it may have changed while waiting for lock
transport = delegate;
if (transport != null) {
transport.ping(callback, executor);
return;
}
if (!shutdown) {
PendingPing pendingPing = new PendingPing(callback, executor);
pendingPings.add(pendingPing);
return;
}
}
executor.execute(new Runnable() {
@Override public void run() {
callback.onFailure(
Status.UNAVAILABLE.withDescription("transport shutdown").asException());
}
});
}
/**
* Prevents creating any new streams until {@link #setTransport} is called. Buffered streams are
* not failed, so if {@link #shutdown} is called when {@link #setTransport} has not been called,
* you still need to call {@link setTransport} to make this transport terminated.
*/
@Override
public void shutdown() {
synchronized (lock) {
if (shutdown) {
return;
}
shutdown = true;
listener.transportShutdown(
Status.OK.withDescription("Channel requested transport to shut down"));
if (pendingStreams == null || pendingStreams.isEmpty()) {
pendingStreams = null;
listener.transportTerminated();
}
}
}
/**
* Shuts down this transport and cancels all streams that it owns, hence immediately terminates
* this transport.
*/
public void shutdownNow(Status status) {
shutdown();
Collection<PendingStream> savedPendingStreams = null;
synchronized (lock) {
if (pendingStreams != null) {
savedPendingStreams = pendingStreams;
pendingStreams = null;
}
}
if (savedPendingStreams != null) {
for (PendingStream stream : savedPendingStreams) {
stream.setError(status);
}
listener.transportTerminated();
}
// If savedPendingStreams == null, transportTerminated() has already been called in shutdown().
}
/**
* Transfers all the pending and future requests and pings to the given transport.
*
* <p>May only be called after {@link #start(Listener)}.
*
* <p>{@code transport} will be used for all future calls to {@link #newStream}, even if this
* transport is {@link #shutdown}.
*/
public void setTransport(ClientTransport transport) {
Collection<PendingStream> savedPendingStreams;
synchronized (lock) {
Preconditions.checkState(delegate == null, "setTransport already called");
Preconditions.checkState(listener != null, "start() not called");
delegate = Preconditions.checkNotNull(transport, "transport");
for (PendingPing ping : pendingPings) {
ping.createRealPing(transport);
}
pendingPings = null;
if (shutdown && pendingStreams != null) {
listener.transportTerminated();
}
savedPendingStreams = pendingStreams;
pendingStreams = null;
if (!shutdown) {
listener.transportReady();
}
}
// createRealStream may be expensive, so we run it outside of the lock. It will start real
// streams on the transport. If there are pending requests, they will be serialized too, which
// may be expensive.
// TODO(zhangkun83): may consider doing it in a different thread.
if (savedPendingStreams != null) {
for (PendingStream stream : savedPendingStreams) {
stream.createRealStream(transport);
}
}
}
@VisibleForTesting
int getPendingStreamsCount() {
synchronized (lock) {
return pendingStreams == null ? 0 : pendingStreams.size();
}
}
private class PendingStream extends DelayedStream {
private final MethodDescriptor<?, ?> method;
private final Metadata headers;
private PendingStream(MethodDescriptor<?, ?> method, Metadata headers) {
this.method = method;
this.headers = headers;
}
private void createRealStream(ClientTransport transport) {
setStream(transport.newStream(method, headers));
}
// TODO(zhangkun83): DelayedStream.setError() doesn't have a clearly-defined semantic to be
// overriden. Make it clear or find another method to override.
@Override
void setError(Status reason) {
synchronized (lock) {
if (pendingStreams != null) {
pendingStreams.remove(this);
if (shutdown && pendingStreams.isEmpty()) {
pendingStreams = null;
listener.transportTerminated();
}
}
}
super.setError(reason);
}
}
private static class PendingPing {
private final PingCallback callback;
private final Executor executor;
public PendingPing(PingCallback callback, Executor executor) {
this.callback = callback;
this.executor = executor;
}
public void createRealPing(ClientTransport transport) {
try {
transport.ping(callback, executor);
} catch (final UnsupportedOperationException ex) {
executor.execute(new Runnable() {
@Override
public void run() {
callback.onFailure(ex);
}
});
}
}
}
}

View File

@ -53,6 +53,8 @@ import javax.annotation.concurrent.GuardedBy;
* DelayedStream} may be internally altered by different threads, thus internal synchronization is
* necessary.
*/
// TODO(zhangkun83): merge it with DelayedClientTransport.PendingStream as it will be no longer
// needed by ClientCallImpl as we move away from ListenableFuture<ClientTransport>
class DelayedStream implements ClientStream {
// set to non null once both listener and realStream are valid. After this point it is safe

View File

@ -0,0 +1,105 @@
/*
* Copyright 2016, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.internal;
import io.grpc.Status;
import javax.annotation.concurrent.ThreadSafe;
/**
* A {@link ClientTransport} that has life-cycle management.
*
* <p>{@link #start} must be the first method call to this interface and return before calling other
* methods.
*
* <p>Typically the transport owns the streams it creates through {@link #newStream}, while some
* implementations may transfer the streams to somewhere else. Either way they must conform to the
* contract defined by {@link #shutdown}, {@link Listener#transportShutdown} and
* {@link Listener#transportTerminated}.
*/
@ThreadSafe
public interface ManagedClientTransport extends ClientTransport {
/**
* Starts transport. This method may only be called once.
*
* <p>Implementations must not call {@code listener} from within {@link #start}; implementations
* are expected to notify listener on a separate thread. This method should not throw any
* exceptions.
*
* @param listener non-{@code null} listener of transport events
*/
void start(Listener listener);
/**
* Initiates an orderly shutdown of the transport. Existing streams continue, but the transport
* will not own any new streams. New streams will either fail (once
* {@link Listener#transportShutdown} callback called), or be transferred off this transport (in
* which case they may succeed). This method may only be called once.
*/
void shutdown();
/**
* Receives notifications for the transport life-cycle events. Implementation does not need to be
* thread-safe, so notifications must be properly sychronized externally.
*/
interface Listener {
/**
* The transport is shutting down. This transport will stop owning new streams, but existing
* streams may continue, and the transport may still be able to process {@link #newStream} as
* long as it doesn't own the new streams. Shutdown could have been caused by an error or normal
* operation. It is possible that this method is called without {@link #shutdown} being called.
* If the argument to this function is {@link Status#isOk}, it is safe to immediately reconnect.
*
* <p>This is called exactly once, and must be called prior to {@link #transportTerminated}.
*
* @param s the reason for the shutdown.
*/
void transportShutdown(Status s);
/**
* The transport completed shutting down. All resources have been released. All streams have
* either been closed or transferred off this transport. This transport may still be able to
* process {@link #newStream} as long as it doesn't own the new streams.
*
* <p>This is called exactly once, and must be called after {@link #transportShutdown} has been
* called.
*/
void transportTerminated();
/**
* The transport is ready to accept traffic, because the connection is established. This is
* called at most once.
*/
void transportReady();
}
}

View File

@ -34,8 +34,7 @@ package io.grpc.internal;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.EquivalentAddressGroup;
@ -62,12 +61,6 @@ import javax.annotation.concurrent.ThreadSafe;
@ThreadSafe
final class TransportSet {
private static final Logger log = Logger.getLogger(TransportSet.class.getName());
private static final UncancellableTransportFuture NULL_VALUE_FUTURE;
static {
NULL_VALUE_FUTURE = new UncancellableTransportFuture();
NULL_VALUE_FUTURE.set(null);
}
private final Object lock = new Object();
private final EquivalentAddressGroup addressGroup;
@ -98,24 +91,35 @@ final class TransportSet {
private ScheduledFuture<?> reconnectTask;
/**
* All transports that are not stopped. At the very least the value of {@link
* activeTransportFuture} will be present, but previously used transports that still have streams
* or are stopping may also be present.
* All transports that are not terminated. At the very least the value of {@link activeTransport}
* will be present, but previously used transports that still have streams or are stopping may
* also be present.
*/
@GuardedBy("lock")
private final Collection<ClientTransport> transports = new ArrayList<ClientTransport>();
private final Collection<ManagedClientTransport> transports =
new ArrayList<ManagedClientTransport>();
private final LoadBalancer<ClientTransport> loadBalancer;
@GuardedBy("lock")
private boolean shutdown;
/**
* The future for the transport for new outgoing requests. 'lock' must be held when assigning
* to it.
/*
* The transport for new outgoing requests.
* - If shutdown == true, activeTransport is null (shutdown)
* - Otherwise, if delayedTransport != null,
* activeTransport is delayedTransport (waiting to connect)
* - Otherwise, activeTransport is either null (initially or when idle)
* or points to a real transport (when connecting or connected).
*
* 'lock' must be held when assigning to it.
*/
@Nullable
private volatile UncancellableTransportFuture activeTransportFuture;
private volatile ManagedClientTransport activeTransport;
@GuardedBy("lock")
@Nullable
private DelayedClientTransport delayedTransport;
TransportSet(EquivalentAddressGroup addressGroup, String authority,
LoadBalancer<ClientTransport> loadBalancer, BackoffPolicy.Provider backoffPolicyProvider,
@ -146,30 +150,27 @@ final class TransportSet {
* <p>Cancelling the return future has no effect. The future will never fail. If this {@code
* TransportSet} has been shut down, the returned future will have {@code null} value.
*/
// TODO(zhangkun83): change it to return a ClientTransport directly
final ListenableFuture<ClientTransport> obtainActiveTransport() {
UncancellableTransportFuture savedTransportFuture = activeTransportFuture;
if (savedTransportFuture != null) {
return savedTransportFuture;
ClientTransport savedTransport = activeTransport;
if (savedTransport != null) {
return Futures.<ClientTransport>immediateFuture(savedTransport);
}
synchronized (lock) {
// Check again, since it could have changed before acquiring the lock
if (activeTransportFuture == null) {
// In shutdown(), activeTransportFuture is set to NULL_VALUE_FUTURE, thus if
// activeTransportFuture is null, shutdown must be false.
Preconditions.checkState(!shutdown, "already shutdown");
Preconditions.checkState(activeTransportFuture == null || activeTransportFuture.isDone(),
"activeTransportFuture is neither null nor done");
activeTransportFuture = new UncancellableTransportFuture();
if (activeTransport == null && !shutdown) {
delayedTransport = new DelayedClientTransport();
transports.add(delayedTransport);
delayedTransport.start(new BaseTransportListener(delayedTransport));
activeTransport = delayedTransport;
scheduleConnection();
}
return activeTransportFuture;
return Futures.<ClientTransport>immediateFuture(activeTransport);
}
}
// Can only be called when shutdown == false
@GuardedBy("lock")
private void scheduleConnection() {
Preconditions.checkState(!shutdown, "Already shut down");
Preconditions.checkState(reconnectTask == null || reconnectTask.isDone(),
"previous reconnectTask is not done");
@ -184,22 +185,39 @@ final class TransportSet {
Runnable createTransportRunnable = new Runnable() {
@Override
public void run() {
DelayedClientTransport savedDelayedTransport;
ManagedClientTransport newActiveTransport;
boolean savedShutdown;
synchronized (lock) {
if (shutdown) {
return;
}
savedShutdown = shutdown;
if (currentAddressIndex == headIndex) {
backoffWatch.reset().start();
}
ClientTransport newActiveTransport =
transportFactory.newClientTransport(address, authority);
newActiveTransport = transportFactory.newClientTransport(address, authority);
log.log(Level.INFO, "Created transport {0} for {1}",
new Object[] {newActiveTransport, address});
transports.add(newActiveTransport);
newActiveTransport.start(
new TransportListener(newActiveTransport, activeTransportFuture, address));
Preconditions.checkState(activeTransportFuture.set(newActiveTransport),
"failed to set the new transport to the future");
new TransportListener(newActiveTransport, address));
if (shutdown) {
// If TransportSet already shutdown, newActiveTransport is only to take care of pending
// streams in delayedTransport, but will not serve new streams, and it will be shutdown
// as soon as it's set to the delayedTransport.
// activeTransport should have already been set to null by shutdown(). We keep it null.
Preconditions.checkState(activeTransport == null,
"Unexpected non-null activeTransport");
} else {
activeTransport = newActiveTransport;
}
savedDelayedTransport = delayedTransport;
delayedTransport = null;
}
savedDelayedTransport.setTransport(newActiveTransport);
// This delayed transport will terminate and be removed from transports.
savedDelayedTransport.shutdown();
if (savedShutdown) {
// See comments in the synchronized block above on why we shutdown here.
newActiveTransport.shutdown();
}
}
};
@ -228,65 +246,90 @@ final class TransportSet {
}
/**
* Shut down all transports, may run callback inline.
* Shut down all transports, stop creating new streams, but existing streams will continue.
*
* <p>May run callback inline.
*/
final void shutdown() {
UncancellableTransportFuture savedActiveTransportFuture;
ManagedClientTransport savedActiveTransport;
boolean runCallback = false;
synchronized (lock) {
if (shutdown) {
return;
}
shutdown = true;
savedActiveTransportFuture = activeTransportFuture;
activeTransportFuture = NULL_VALUE_FUTURE;
savedActiveTransport = activeTransport;
activeTransport = null;
if (transports.isEmpty()) {
runCallback = true;
Preconditions.checkState(reconnectTask == null, "Should have no reconnectTask scheduled");
Preconditions.checkState(delayedTransport == null, "Should have no delayedTransport");
} // else: the callback will be run once all transports have been terminated
}
if (reconnectTask != null) {
reconnectTask.cancel(false);
}
// else: the callback will be run once all transports have been terminated
}
if (savedActiveTransportFuture != null) {
if (savedActiveTransportFuture.isDone()) {
try {
// Should not throw any exception here
savedActiveTransportFuture.get().shutdown();
} catch (Exception e) {
throw Throwables.propagate(e);
}
} else {
savedActiveTransportFuture.set(null);
}
if (savedActiveTransport != null) {
savedActiveTransport.shutdown();
}
if (runCallback) {
callback.onTerminated();
}
}
private class TransportListener implements ClientTransport.Listener {
private final SocketAddress address;
private final ClientTransport transport;
private final UncancellableTransportFuture transportFuture;
@GuardedBy("lock")
private void cancelReconnectTask() {
if (reconnectTask != null) {
reconnectTask.cancel(false);
reconnectTask = null;
}
}
public TransportListener(ClientTransport transport,
UncancellableTransportFuture transportFuture, SocketAddress address) {
/** Shared base for both delayed and real transports. */
private class BaseTransportListener implements ManagedClientTransport.Listener {
protected final ManagedClientTransport transport;
public BaseTransportListener(ManagedClientTransport transport) {
this.transport = transport;
this.transportFuture = transportFuture;
}
@Override
public void transportReady() {}
@Override
public void transportShutdown(Status status) {}
@Override
public void transportTerminated() {
boolean runCallback = false;
synchronized (lock) {
transports.remove(transport);
if (shutdown && transports.isEmpty()) {
runCallback = true;
cancelReconnectTask();
}
}
if (runCallback) {
callback.onTerminated();
}
}
}
/** Listener for real transports. */
private class TransportListener extends BaseTransportListener {
private final SocketAddress address;
public TransportListener(ManagedClientTransport transport, SocketAddress address) {
super(transport);
this.address = address;
}
@GuardedBy("lock")
private boolean isAttachedToActiveTransport() {
return activeTransportFuture == transportFuture;
return activeTransport == transport;
}
@Override
public void transportReady() {
synchronized (lock) {
log.log(Level.INFO, "Transport {0} for {1} is ready", new Object[] {transport, address});
Preconditions.checkState(transportFuture.isDone(), "the transport future is not done");
super.transportReady();
synchronized (lock) {
if (isAttachedToActiveTransport()) {
headIndex = -1;
}
@ -296,52 +339,32 @@ final class TransportSet {
@Override
public void transportShutdown(Status s) {
synchronized (lock) {
log.log(Level.INFO, "Transport {0} for {1} is being shutdown",
new Object[] {transport, address});
Preconditions.checkState(transportFuture.isDone(), "the transport future is not done");
super.transportShutdown(s);
synchronized (lock) {
if (isAttachedToActiveTransport()) {
activeTransportFuture = null;
activeTransport = null;
}
}
// TODO(zhangkun83): if loadBalancer was given delayedTransport earlier, it will get the real
// transport's shutdown event, and loadBalancer won't be able to match the two. This beats the
// purpose of passing the transport. We may just remove the second argument.
loadBalancer.transportShutdown(addressGroup, transport, s);
}
@Override
public void transportTerminated() {
boolean runCallback = false;
synchronized (lock) {
log.log(Level.INFO, "Transport {0} for {1} is terminated",
new Object[] {transport, address});
super.transportTerminated();
Preconditions.checkState(!isAttachedToActiveTransport(),
"Listener is still attached to activeTransportFuture. "
"Listener is still attached to activeTransport. "
+ "Seems transportTerminated was not called.");
transports.remove(transport);
if (shutdown && transports.isEmpty()) {
runCallback = true;
}
}
if (runCallback) {
callback.onTerminated();
}
}
}
interface Callback {
void onTerminated();
}
private static class UncancellableTransportFuture extends AbstractFuture<ClientTransport> {
@Override public boolean cancel(boolean mayInterruptIfRunning) {
// Do not cancel.
// A future instance is shared among multiple obtainActiveTransport() calls.
// Since the user of the future may cancel it when it's no longer needed, cancelling for real
// will affect other users of the same future.
return false;
}
@Override protected boolean set(ClientTransport v) {
return super.set(v);
}
}
}

View File

@ -0,0 +1,222 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.internal;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.same;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import io.grpc.IntegerMarshaller;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.StringMarshaller;
import io.grpc.internal.ClientTransport;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import java.util.concurrent.Executor;
/**
* Unit tests for {@link DelayedClientTransport}.
*/
@RunWith(JUnit4.class)
public class DelayedClientTransportTest {
@Mock private ManagedClientTransport.Listener transportListener;
@Mock private ClientTransport mockRealTransport;
@Mock private ClientStream mockRealStream;
@Mock private ClientStreamListener streamListener;
@Mock private ClientTransport.PingCallback pingCallback;
@Mock private Executor mockExecutor;
@Captor private ArgumentCaptor<Status> statusCaptor;
private final MethodDescriptor<String, Integer> method = MethodDescriptor.create(
MethodDescriptor.MethodType.UNKNOWN, "/service/method",
new StringMarshaller(), new IntegerMarshaller());
private final Metadata headers = new Metadata();
private final DelayedClientTransport delayedTransport = new DelayedClientTransport();
@Before public void setUp() {
MockitoAnnotations.initMocks(this);
when(mockRealTransport.newStream(same(method), same(headers))).thenReturn(mockRealStream);
delayedTransport.start(transportListener);
}
@Test public void streamStartThenSetTransport() {
ClientStream stream = delayedTransport.newStream(method, headers);
stream.start(streamListener);
assertEquals(1, delayedTransport.getPendingStreamsCount());
assertTrue(stream instanceof DelayedStream);
delayedTransport.setTransport(mockRealTransport);
assertEquals(0, delayedTransport.getPendingStreamsCount());
verify(mockRealTransport).newStream(same(method), same(headers));
verify(mockRealStream).start(same(streamListener));
}
@Test public void newStreamThenSetTransportThenShutdown() {
ClientStream stream = delayedTransport.newStream(method, headers);
assertEquals(1, delayedTransport.getPendingStreamsCount());
assertTrue(stream instanceof DelayedStream);
delayedTransport.setTransport(mockRealTransport);
assertEquals(0, delayedTransport.getPendingStreamsCount());
delayedTransport.shutdown();
verify(transportListener).transportShutdown(any(Status.class));
verify(transportListener).transportTerminated();
verify(mockRealTransport).newStream(same(method), same(headers));
stream.start(streamListener);
verify(mockRealStream).start(same(streamListener));
}
@Test public void transportTerminatedThenSetTransport() {
delayedTransport.shutdown();
verify(transportListener).transportShutdown(any(Status.class));
verify(transportListener).transportTerminated();
delayedTransport.setTransport(mockRealTransport);
verifyNoMoreInteractions(transportListener);
}
@Test public void setTransportThenShutdownThenNewStream() {
delayedTransport.setTransport(mockRealTransport);
delayedTransport.shutdown();
verify(transportListener).transportShutdown(any(Status.class));
verify(transportListener).transportTerminated();
ClientStream stream = delayedTransport.newStream(method, headers);
assertEquals(0, delayedTransport.getPendingStreamsCount());
stream.start(streamListener);
assertFalse(stream instanceof DelayedStream);
verify(mockRealTransport).newStream(same(method), same(headers));
verify(mockRealStream).start(same(streamListener));
}
@Test public void setTransportThenShutdownNowThenNewStream() {
delayedTransport.setTransport(mockRealTransport);
delayedTransport.shutdownNow(Status.UNAVAILABLE);
verify(transportListener).transportShutdown(any(Status.class));
verify(transportListener).transportTerminated();
ClientStream stream = delayedTransport.newStream(method, headers);
assertEquals(0, delayedTransport.getPendingStreamsCount());
stream.start(streamListener);
assertFalse(stream instanceof DelayedStream);
verify(mockRealTransport).newStream(same(method), same(headers));
verify(mockRealStream).start(same(streamListener));
}
@Test public void cancelStreamWithoutSetTransport() {
ClientStream stream = delayedTransport.newStream(method, new Metadata());
assertEquals(1, delayedTransport.getPendingStreamsCount());
stream.cancel(Status.CANCELLED);
assertEquals(0, delayedTransport.getPendingStreamsCount());
verifyNoMoreInteractions(mockRealTransport);
verifyNoMoreInteractions(mockRealStream);
}
@Test public void startThenCancelStreamWithoutSetTransport() {
ClientStream stream = delayedTransport.newStream(method, new Metadata());
stream.start(streamListener);
assertEquals(1, delayedTransport.getPendingStreamsCount());
stream.cancel(Status.CANCELLED);
assertEquals(0, delayedTransport.getPendingStreamsCount());
verify(streamListener).closed(same(Status.CANCELLED), any(Metadata.class));
verifyNoMoreInteractions(mockRealTransport);
verifyNoMoreInteractions(mockRealStream);
}
@Test public void newStreamThenShutdownTransportThenCancelStream() {
ClientStream stream = delayedTransport.newStream(method, new Metadata());
delayedTransport.shutdown();
verify(transportListener).transportShutdown(any(Status.class));
verify(transportListener, times(0)).transportTerminated();
assertEquals(1, delayedTransport.getPendingStreamsCount());
stream.cancel(Status.CANCELLED);
verify(transportListener).transportTerminated();
assertEquals(0, delayedTransport.getPendingStreamsCount());
verifyNoMoreInteractions(mockRealTransport);
verifyNoMoreInteractions(mockRealStream);
}
@Test public void setTransportThenShutdownThenPing() {
delayedTransport.setTransport(mockRealTransport);
delayedTransport.shutdown();
delayedTransport.ping(pingCallback, mockExecutor);
verify(mockRealTransport).ping(same(pingCallback), same(mockExecutor));
}
@Test public void pingThenSetTransport() {
delayedTransport.ping(pingCallback, mockExecutor);
delayedTransport.setTransport(mockRealTransport);
verify(mockRealTransport).ping(same(pingCallback), same(mockExecutor));
}
@Test public void shutdownThenNewStream() {
delayedTransport.shutdown();
verify(transportListener).transportShutdown(any(Status.class));
verify(transportListener).transportTerminated();
ClientStream stream = delayedTransport.newStream(method, new Metadata());
stream.start(streamListener);
verify(streamListener).closed(statusCaptor.capture(), any(Metadata.class));
assertEquals(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode());
}
@Test public void startStreamThenShutdownNow() {
ClientStream stream = delayedTransport.newStream(method, new Metadata());
stream.start(streamListener);
delayedTransport.shutdownNow(Status.UNAVAILABLE);
verify(transportListener).transportShutdown(any(Status.class));
verify(transportListener).transportTerminated();
verify(streamListener).closed(statusCaptor.capture(), any(Metadata.class));
assertEquals(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode());
}
@Test public void shutdownNowThenNewStream() {
delayedTransport.shutdownNow(Status.UNAVAILABLE);
verify(transportListener).transportShutdown(any(Status.class));
verify(transportListener).transportTerminated();
ClientStream stream = delayedTransport.newStream(method, new Metadata());
stream.start(streamListener);
verify(streamListener).closed(statusCaptor.capture(), any(Metadata.class));
assertEquals(Status.Code.UNAVAILABLE, statusCaptor.getValue().getCode());
}
}

View File

@ -63,7 +63,6 @@ public class DelayedStreamTest {
@Rule public final ExpectedException thrown = ExpectedException.none();
@Mock private ClientStreamListener listener;
@Mock private ClientTransport transport;
@Mock private ClientStream realStream;
@Captor private ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
private DelayedStream stream = new DelayedStream();

View File

@ -117,7 +117,7 @@ public class ManagedChannelImplTest {
@Rule public final ExpectedException thrown = ExpectedException.none();
@Mock
private ClientTransport mockTransport;
private ManagedClientTransport mockTransport;
@Mock
private ClientTransportFactory mockTransportFactory;
@Mock
@ -127,8 +127,8 @@ public class ManagedChannelImplTest {
@Mock
private ClientCall.Listener<Integer> mockCallListener3;
private ArgumentCaptor<ClientTransport.Listener> transportListenerCaptor =
ArgumentCaptor.forClass(ClientTransport.Listener.class);
private ArgumentCaptor<ManagedClientTransport.Listener> transportListenerCaptor =
ArgumentCaptor.forClass(ManagedClientTransport.Listener.class);
private ArgumentCaptor<ClientStreamListener> streamListenerCaptor =
ArgumentCaptor.forClass(ClientStreamListener.class);
@ -183,7 +183,6 @@ public class ManagedChannelImplTest {
verifyNoMoreInteractions(mockTransportFactory);
// Create transport and call
ClientTransport mockTransport = mock(ClientTransport.class);
ClientStream mockStream = mock(ClientStream.class);
Metadata headers = new Metadata();
when(mockTransportFactory.newClientTransport(any(SocketAddress.class), any(String.class)))
@ -193,7 +192,7 @@ public class ManagedChannelImplTest {
verify(mockTransportFactory, timeout(1000))
.newClientTransport(same(socketAddress), eq(authority));
verify(mockTransport, timeout(1000)).start(transportListenerCaptor.capture());
ClientTransport.Listener transportListener = transportListenerCaptor.getValue();
ManagedClientTransport.Listener transportListener = transportListenerCaptor.getValue();
verify(mockTransport, timeout(1000)).newStream(same(method), same(headers));
verify(mockStream).start(streamListenerCaptor.capture());
verify(mockStream).setCompressor(isA(Compressor.class));
@ -277,7 +276,7 @@ public class ManagedChannelImplTest {
call.cancel();
verify(mockTransport, timeout(1000)).start(transportListenerCaptor.capture());
final ClientTransport.Listener transportListener = transportListenerCaptor.getValue();
final ManagedClientTransport.Listener transportListener = transportListenerCaptor.getValue();
final Object lock = new Object();
final CyclicBarrier barrier = new CyclicBarrier(2);
new Thread() {
@ -386,8 +385,8 @@ public class ManagedChannelImplTest {
final SocketAddress badAddress = new SocketAddress() {};
final ResolvedServerInfo goodServer = new ResolvedServerInfo(goodAddress, Attributes.EMPTY);
final ResolvedServerInfo badServer = new ResolvedServerInfo(badAddress, Attributes.EMPTY);
final ClientTransport goodTransport = mock(ClientTransport.class);
final ClientTransport badTransport = mock(ClientTransport.class);
final ManagedClientTransport goodTransport = mock(ManagedClientTransport.class);
final ManagedClientTransport badTransport = mock(ManagedClientTransport.class);
when(mockTransportFactory.newClientTransport(same(goodAddress), any(String.class)))
.thenReturn(goodTransport);
when(mockTransportFactory.newClientTransport(same(badAddress), any(String.class)))
@ -413,8 +412,8 @@ public class ManagedChannelImplTest {
// First try should fail with the bad address.
call.start(mockCallListener, headers);
ArgumentCaptor<ClientTransport.Listener> badTransportListenerCaptor =
ArgumentCaptor.forClass(ClientTransport.Listener.class);
ArgumentCaptor<ManagedClientTransport.Listener> badTransportListenerCaptor =
ArgumentCaptor.forClass(ManagedClientTransport.Listener.class);
verify(mockCallListener, timeout(1000)).onClose(same(Status.UNAVAILABLE), any(Metadata.class));
verify(badTransport, timeout(1000)).start(badTransportListenerCaptor.capture());
badTransportListenerCaptor.getValue().transportShutdown(Status.UNAVAILABLE);

View File

@ -55,6 +55,7 @@ import io.grpc.LoadBalancer;
import io.grpc.NameResolver;
import io.grpc.Status;
import io.grpc.TransportManager;
import io.grpc.internal.TestUtils.MockClientTransportInfo;
import org.junit.After;
import org.junit.Before;
@ -148,10 +149,10 @@ public class ManagedChannelImplTransportManagerTest {
@Test
public void createAndReuseTransport() throws Exception {
doAnswer(new Answer<ClientTransport>() {
doAnswer(new Answer<ManagedClientTransport>() {
@Override
public ClientTransport answer(InvocationOnMock invocation) throws Throwable {
return mock(ClientTransport.class);
public ManagedClientTransport answer(InvocationOnMock invocation) throws Throwable {
return mock(ManagedClientTransport.class);
}
}).when(mockTransportFactory).newClientTransport(any(SocketAddress.class), any(String.class));
@ -173,8 +174,8 @@ public class ManagedChannelImplTransportManagerTest {
SocketAddress addr2 = mock(SocketAddress.class);
EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(Arrays.asList(addr1, addr2));
LinkedList<ClientTransport.Listener> listeners =
TestUtils.captureListeners(mockTransportFactory);
LinkedList<MockClientTransportInfo> transports =
TestUtils.captureTransports(mockTransportFactory);
// Invocation counters
int backoffReset = 0;
@ -185,7 +186,7 @@ public class ManagedChannelImplTransportManagerTest {
verify(mockTransportFactory).newClientTransport(addr1, authority);
verify(mockBackoffPolicyProvider, times(++backoffReset)).get();
// Fail the first transport, without setting it to ready
listeners.poll().transportShutdown(Status.UNAVAILABLE);
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// Subsequent getTransport() will use the next address
ListenableFuture<ClientTransport> future2a = tm.getTransport(addressGroup);
@ -197,9 +198,9 @@ public class ManagedChannelImplTransportManagerTest {
assertSame(future2a.get(), future2b.get());
assertNotSame(future1.get(), future2a.get());
// Make the second transport ready
listeners.peek().transportReady();
transports.peek().listener.transportReady();
// Disconnect the second transport
listeners.poll().transportShutdown(Status.UNAVAILABLE);
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// Subsequent getTransport() will use the next address, which is the first one since we have run
// out of addresses.
@ -212,7 +213,7 @@ public class ManagedChannelImplTransportManagerTest {
verify(mockBackoffPolicy, times(0)).nextBackoffMillis();
verify(mockTransportFactory, times(2)).newClientTransport(addr1, authority);
verifyNoMoreInteractions(mockTransportFactory);
assertEquals(1, listeners.size());
assertEquals(1, transports.size());
}
@Test
@ -221,8 +222,8 @@ public class ManagedChannelImplTransportManagerTest {
SocketAddress addr2 = mock(SocketAddress.class);
EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(Arrays.asList(addr1, addr2));
LinkedList<ClientTransport.Listener> listeners =
TestUtils.captureListeners(mockTransportFactory);
LinkedList<MockClientTransportInfo> transports =
TestUtils.captureTransports(mockTransportFactory);
// Invocation counters
int transportsAddr1 = 0;
@ -236,9 +237,9 @@ public class ManagedChannelImplTransportManagerTest {
verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority);
// Back-off policy was set initially.
verify(mockBackoffPolicyProvider, times(++backoffReset)).get();
listeners.peek().transportReady();
transports.peek().listener.transportReady();
// Then close it
listeners.poll().transportShutdown(Status.UNAVAILABLE);
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// Second pick fails. This is the beginning of a series of failures.
ListenableFuture<ClientTransport> future2 = tm.getTransport(addressGroup);
@ -246,7 +247,7 @@ public class ManagedChannelImplTransportManagerTest {
verify(mockTransportFactory, times(++transportsAddr2)).newClientTransport(addr2, authority);
// Back-off policy was reset.
verify(mockBackoffPolicyProvider, times(++backoffReset)).get();
listeners.poll().transportShutdown(Status.UNAVAILABLE);
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// Third pick fails too
ListenableFuture<ClientTransport> future3 = tm.getTransport(addressGroup);
@ -254,7 +255,7 @@ public class ManagedChannelImplTransportManagerTest {
verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority);
// Back-off policy was not reset.
verify(mockBackoffPolicyProvider, times(backoffReset)).get();
listeners.poll().transportShutdown(Status.UNAVAILABLE);
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// Forth pick is on addr2, back-off policy kicks in.
ListenableFuture<ClientTransport> future4 = tm.getTransport(addressGroup);

View File

@ -34,6 +34,10 @@ package io.grpc.internal;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@ -46,31 +50,53 @@ import java.util.LinkedList;
*/
final class TestUtils {
static class MockClientTransportInfo {
/**
* Stub the given mock {@link ClientTransportFactory} by returning mock {@link ClientTransport}s
* which saves their listeners to a list which is returned by this method.
* A mock transport created by the mock transport factory.
*/
static LinkedList<ClientTransport.Listener> captureListeners(
ClientTransportFactory mockTransportFactory) {
final LinkedList<ClientTransport.Listener> listeners =
new LinkedList<ClientTransport.Listener>();
final ManagedClientTransport transport;
doAnswer(new Answer<ClientTransport>() {
/**
* The listener passed to the start() of the mock transport.
*/
final ManagedClientTransport.Listener listener;
MockClientTransportInfo(ManagedClientTransport transport,
ManagedClientTransport.Listener listener) {
this.transport = transport;
this.listener = listener;
}
}
/**
* Stub the given mock {@link ClientTransportFactory} by returning mock
* {@link ManagedClientTransport}s which saves their listeners along with them. This method
* returns a list of {@link MockClientTransportInfo}, each of which is a started mock transport
* and its listener.
*/
static LinkedList<MockClientTransportInfo> captureTransports(
ClientTransportFactory mockTransportFactory) {
final LinkedList<MockClientTransportInfo> captor = new LinkedList<MockClientTransportInfo>();
doAnswer(new Answer<ManagedClientTransport>() {
@Override
public ClientTransport answer(InvocationOnMock invocation) throws Throwable {
ClientTransport mockTransport = mock(ClientTransport.class);
public ManagedClientTransport answer(InvocationOnMock invocation) throws Throwable {
final ManagedClientTransport mockTransport = mock(ManagedClientTransport.class);
when(mockTransport.newStream(any(MethodDescriptor.class), any(Metadata.class)))
.thenReturn(mock(ClientStream.class));
// Save the listener
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
listeners.add((ClientTransport.Listener) invocation.getArguments()[0]);
captor.add(new MockClientTransportInfo(
mockTransport, (ManagedClientTransport.Listener) invocation.getArguments()[0]));
return null;
}
}).when(mockTransport).start(any(ClientTransport.Listener.class));
}).when(mockTransport).start(any(ManagedClientTransport.Listener.class));
return mockTransport;
}
}).when(mockTransportFactory).newClientTransport(any(SocketAddress.class), any(String.class));
return listeners;
return captor;
}
}

View File

@ -31,21 +31,29 @@
package io.grpc.internal;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.same;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.EquivalentAddressGroup;
import io.grpc.IntegerMarshaller;
import io.grpc.LoadBalancer;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.StringMarshaller;
import io.grpc.internal.TestUtils.MockClientTransportInfo;
import org.junit.Before;
import org.junit.Test;
@ -77,10 +85,16 @@ public class TransportSetTest {
@Mock private BackoffPolicy.Provider mockBackoffPolicyProvider;
@Mock private ClientTransportFactory mockTransportFactory;
@Mock private TransportSet.Callback mockTransportSetCallback;
@Mock private ClientStreamListener mockStreamListener;
private final MethodDescriptor<String, Integer> method = MethodDescriptor.create(
MethodDescriptor.MethodType.UNKNOWN, "/service/method",
new StringMarshaller(), new IntegerMarshaller());
private final Metadata headers = new Metadata();
private TransportSet transportSet;
private EquivalentAddressGroup addressGroup;
private LinkedList<ClientTransport.Listener> listeners;
private LinkedList<MockClientTransportInfo> transports;
@Before public void setUp() {
MockitoAnnotations.initMocks(this);
@ -91,7 +105,7 @@ public class TransportSetTest {
when(mockBackoffPolicy1.nextBackoffMillis()).thenReturn(10L, 100L);
when(mockBackoffPolicy2.nextBackoffMillis()).thenReturn(10L, 100L);
when(mockBackoffPolicy3.nextBackoffMillis()).thenReturn(10L, 100L);
listeners = TestUtils.captureListeners(mockTransportFactory);
transports = TestUtils.captureTransports(mockTransportFactory);
}
@Test public void singleAddressBackoff() {
@ -109,7 +123,7 @@ public class TransportSetTest {
verify(mockBackoffPolicyProvider, times(++backoffReset)).get();
verify(mockTransportFactory, times(++transportsCreated)).newClientTransport(addr, authority);
// Fail this one
listeners.poll().transportShutdown(Status.UNAVAILABLE);
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// Second attempt uses the first back-off value interval.
transportSet.obtainActiveTransport();
@ -121,7 +135,7 @@ public class TransportSetTest {
fakeClock.forwardMillis(1);
verify(mockTransportFactory, times(++transportsCreated)).newClientTransport(addr, authority);
// Fail this one too
listeners.poll().transportShutdown(Status.UNAVAILABLE);
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// Third attempt uses the second back-off interval.
transportSet.obtainActiveTransport();
@ -133,9 +147,9 @@ public class TransportSetTest {
fakeClock.forwardMillis(1);
verify(mockTransportFactory, times(++transportsCreated)).newClientTransport(addr, authority);
// Let this one succeed
listeners.peek().transportReady();
transports.peek().listener.transportReady();
// And close it
listeners.poll().transportShutdown(Status.UNAVAILABLE);
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// Back-off is reset, and the next attempt will happen immediately
transportSet.obtainActiveTransport();
@ -165,9 +179,9 @@ public class TransportSetTest {
verify(mockBackoffPolicyProvider, times(++backoffReset)).get();
verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority);
// Let this one through
listeners.peek().transportReady();
transports.peek().listener.transportReady();
// Then shut it down
listeners.poll().transportShutdown(Status.UNAVAILABLE);
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
////// Now start a series of failing attempts, where addr2 is the head.
@ -176,14 +190,14 @@ public class TransportSetTest {
verify(mockBackoffPolicyProvider, times(++backoffReset)).get();
verify(mockTransportFactory, times(++transportsAddr2)).newClientTransport(addr2, authority);
// Fail this one
listeners.poll().transportShutdown(Status.UNAVAILABLE);
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// Second attempt will start immediately. Keep back-off policy.
transportSet.obtainActiveTransport();
verify(mockBackoffPolicyProvider, times(backoffReset)).get();
verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority);
// Fail this one too
listeners.poll().transportShutdown(Status.UNAVAILABLE);
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// Third attempt is on head, thus controlled by the first back-off interval.
transportSet.obtainActiveTransport();
@ -194,14 +208,14 @@ public class TransportSetTest {
fakeClock.forwardMillis(1);
verify(mockTransportFactory, times(++transportsAddr2)).newClientTransport(addr2, authority);
// Fail this one too
listeners.poll().transportShutdown(Status.UNAVAILABLE);
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// Forth attempt will start immediately. Keep back-off policy.
transportSet.obtainActiveTransport();
verify(mockBackoffPolicyProvider, times(backoffReset)).get();
verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority);
// Fail this one too
listeners.poll().transportShutdown(Status.UNAVAILABLE);
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// Fifth attempt is on head, thus controlled by the second back-off interval.
transportSet.obtainActiveTransport();
@ -212,9 +226,9 @@ public class TransportSetTest {
fakeClock.forwardMillis(1);
verify(mockTransportFactory, times(++transportsAddr2)).newClientTransport(addr2, authority);
// Let it through
listeners.peek().transportReady();
transports.peek().listener.transportReady();
// Then close it.
listeners.poll().transportShutdown(Status.UNAVAILABLE);
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
////// Now start a series of failing attempts, where addr1 is the head.
@ -223,14 +237,14 @@ public class TransportSetTest {
verify(mockBackoffPolicyProvider, times(++backoffReset)).get();
verify(mockTransportFactory, times(++transportsAddr1)).newClientTransport(addr1, authority);
// Fail this one
listeners.poll().transportShutdown(Status.UNAVAILABLE);
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// Second attempt will start immediately. Keep back-off policy.
transportSet.obtainActiveTransport();
verify(mockBackoffPolicyProvider, times(backoffReset)).get();
verify(mockTransportFactory, times(++transportsAddr2)).newClientTransport(addr2, authority);
// Fail this one too
listeners.poll().transportShutdown(Status.UNAVAILABLE);
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// Third attempt is on head, thus controlled by the first back-off interval.
transportSet.obtainActiveTransport();
@ -263,7 +277,7 @@ public class TransportSetTest {
verify(mockTransportFactory, times(++transportsCreated)).newClientTransport(addr, authority);
// Fail this one
listeners.poll().transportShutdown(Status.UNAVAILABLE);
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// Won't reconnect until requested, even if back-off time has expired
fakeClock.forwardMillis(10);
@ -274,7 +288,7 @@ public class TransportSetTest {
verify(mockTransportFactory, times(++transportsCreated)).newClientTransport(addr, authority);
// Fail this one, too
listeners.poll().transportShutdown(Status.UNAVAILABLE);
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// Request immediately, but will wait for back-off before reconnecting
assertNotNull(transportSet.obtainActiveTransport());
@ -284,7 +298,7 @@ public class TransportSetTest {
}
@Test
public void shutdownBeforeTransportCreated() throws Exception {
public void shutdownBeforeTransportCreatedWithPendingStream() throws Exception {
SocketAddress addr = mock(SocketAddress.class);
createTransortSet(addr);
@ -294,17 +308,79 @@ public class TransportSetTest {
assertTrue(pick.isDone());
assertNotNull(pick.get());
// Fail this one
listeners.poll().transportShutdown(Status.UNAVAILABLE);
MockClientTransportInfo transportInfo = transports.poll();
transportInfo.listener.transportShutdown(Status.UNAVAILABLE);
transportInfo.listener.transportTerminated();
// Second transport will wait for back-off
pick = transportSet.obtainActiveTransport();
assertFalse(pick.isDone());
assertTrue(pick.isDone());
assertTrue(pick.get() instanceof DelayedClientTransport);
// Start a stream, which will be pending in the delayed transport
ClientStream pendingStream = pick.get().newStream(method, headers);
pendingStream.start(mockStreamListener);
// Shut down TransportSet before the transort is created.
// Shut down TransportSet before the transport is created. Further call to
// obtainActiveTransport() gets null results.
transportSet.shutdown();
pick = transportSet.obtainActiveTransport();
assertTrue(pick.isDone());
assertNull(pick.get());
verify(mockTransportFactory).newClientTransport(addr, authority);
// Reconnect will eventually happen, even though TransportSet has been shut down
fakeClock.forwardMillis(10);
verify(mockTransportFactory, times(2)).newClientTransport(addr, authority);
// The pending stream will be started on this newly started transport, which is promptly shut
// down by TransportSet right after the stream is created.
transportInfo = transports.poll();
verify(transportInfo.transport).newStream(same(method), same(headers));
verify(transportInfo.transport).shutdown();
verify(mockTransportSetCallback, never()).onTerminated();
// Terminating the transport will let TransportSet to be terminated.
transportInfo.listener.transportTerminated();
verify(mockTransportSetCallback).onTerminated();
// No more transports will be created.
fakeClock.forwardMillis(10000);
verifyNoMoreInteractions(mockTransportFactory);
assertEquals(0, transports.size());
}
@Test
public void shutdownBeforeTransportCreatedWithoutPendingStream() throws Exception {
SocketAddress addr = mock(SocketAddress.class);
createTransortSet(addr);
// First transport is created immediately
ListenableFuture<ClientTransport> pick = transportSet.obtainActiveTransport();
verify(mockTransportFactory).newClientTransport(addr, authority);
assertTrue(pick.isDone());
assertNotNull(pick.get());
// Fail this one
MockClientTransportInfo transportInfo = transports.poll();
transportInfo.listener.transportShutdown(Status.UNAVAILABLE);
transportInfo.listener.transportTerminated();
// Second transport will wait for back-off
pick = transportSet.obtainActiveTransport();
assertTrue(pick.isDone());
assertTrue(pick.get() instanceof DelayedClientTransport);
// Shut down TransportSet before the transport is created. Futher call to
// obtainActiveTransport() gets null results.
transportSet.shutdown();
pick = transportSet.obtainActiveTransport();
assertTrue(pick.isDone());
assertNull(pick.get());
// TransportSet terminated promptly.
verify(mockTransportSetCallback).onTerminated();
// No more transports will be created.
fakeClock.forwardMillis(10000);
verifyNoMoreInteractions(mockTransportFactory);
assertEquals(0, transports.size());
}
@Test
@ -321,26 +397,24 @@ public class TransportSetTest {
}
@Test
public void cancellingTransportFutureWontAffectOtherCallers() {
public void futuresAreAlwaysComplete() throws Exception {
SocketAddress addr = mock(SocketAddress.class);
createTransortSet(addr);
// Fail the first pick so that the next pick will be pending on back-off
ListenableFuture<ClientTransport> future0 = transportSet.obtainActiveTransport();
assertTrue(future0.isDone());
listeners.poll().transportShutdown(Status.UNAVAILABLE);
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
ListenableFuture<ClientTransport> future1 = transportSet.obtainActiveTransport();
ListenableFuture<ClientTransport> future2 = transportSet.obtainActiveTransport();
// These futures are pending on back-off
assertFalse(future1.isDone());
assertFalse(future2.isDone());
assertTrue(future1.isDone());
assertTrue(future2.isDone());
// Cancel future1
future1.cancel(false);
// future2 is not affected. future1 can either be really cancelled or not, which we don't care.
assertFalse(future2.isDone());
assertTrue(future1.get() instanceof DelayedClientTransport);
assertTrue(future2.get() instanceof DelayedClientTransport);
}
private void createTransortSet(SocketAddress ... addrs) {

View File

@ -43,9 +43,9 @@ import io.grpc.Internal;
import io.grpc.NameResolver;
import io.grpc.internal.AbstractManagedChannelImplBuilder;
import io.grpc.internal.AbstractReferenceCounted;
import io.grpc.internal.ClientTransport;
import io.grpc.internal.ClientTransportFactory;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.ManagedClientTransport;
import io.grpc.internal.SharedResourceHolder;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
@ -304,7 +304,8 @@ public class NettyChannelBuilder extends AbstractManagedChannelImplBuilder<Netty
}
@Override
public ClientTransport newClientTransport(SocketAddress serverAddress, String authority) {
public ManagedClientTransport newClientTransport(
SocketAddress serverAddress, String authority) {
ProtocolNegotiator negotiator = protocolNegotiator != null ? protocolNegotiator :
createProtocolNegotiator(authority, negotiationType, sslContext);
return new NettyClientTransport(serverAddress, channelType, group, negotiator,

View File

@ -42,10 +42,10 @@ import com.google.common.base.Ticker;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.internal.ClientTransport;
import io.grpc.internal.ClientTransport.PingCallback;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.Http2Ping;
import io.grpc.internal.ManagedClientTransport;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
@ -115,7 +115,7 @@ class NettyClientHandler extends AbstractNettyHandler {
private Throwable goAwayStatusThrowable;
private int nextStreamId;
static NettyClientHandler newHandler(ClientTransport.Listener listener,
static NettyClientHandler newHandler(ManagedClientTransport.Listener listener,
int flowControlWindow, int maxHeaderListSize,
Ticker ticker) {
Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive");
@ -131,7 +131,7 @@ class NettyClientHandler extends AbstractNettyHandler {
static NettyClientHandler newHandler(Http2Connection connection,
Http2FrameReader frameReader,
Http2FrameWriter frameWriter,
final ClientTransport.Listener listener,
final ManagedClientTransport.Listener listener,
int flowControlWindow,
Ticker ticker) {
Preconditions.checkNotNull(connection, "connection");

View File

@ -40,7 +40,7 @@ import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.internal.ClientStream;
import io.grpc.internal.ClientTransport;
import io.grpc.internal.ManagedClientTransport;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
@ -56,9 +56,9 @@ import java.util.concurrent.Executor;
import javax.annotation.concurrent.GuardedBy;
/**
* A Netty-based {@link ClientTransport} implementation.
* A Netty-based {@link ManagedClientTransport} implementation.
*/
class NettyClientTransport implements ClientTransport {
class NettyClientTransport implements ManagedClientTransport {
private final SocketAddress address;
private final Class<? extends Channel> channelType;
private final EventLoopGroup group;

View File

@ -50,8 +50,8 @@ import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.internal.ClientStream;
import io.grpc.internal.ClientStreamListener;
import io.grpc.internal.ClientTransport;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.ManagedClientTransport;
import io.grpc.internal.ServerListener;
import io.grpc.internal.ServerStream;
import io.grpc.internal.ServerStreamListener;
@ -92,7 +92,7 @@ import java.util.concurrent.TimeoutException;
public class NettyClientTransportTest {
@Mock
private ClientTransport.Listener clientTransportListener;
private ManagedClientTransport.Listener clientTransportListener;
private final List<NettyClientTransport> transports = new ArrayList<NettyClientTransport>();
private NioEventLoopGroup group;

View File

@ -46,9 +46,9 @@ import io.grpc.ExperimentalApi;
import io.grpc.NameResolver;
import io.grpc.internal.AbstractManagedChannelImplBuilder;
import io.grpc.internal.AbstractReferenceCounted;
import io.grpc.internal.ClientTransport;
import io.grpc.internal.ClientTransportFactory;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.ManagedClientTransport;
import io.grpc.internal.SharedResourceHolder;
import io.grpc.internal.SharedResourceHolder.Resource;
@ -255,7 +255,7 @@ public class OkHttpChannelBuilder extends
}
@Override
public ClientTransport newClientTransport(SocketAddress addr, String authority) {
public ManagedClientTransport newClientTransport(SocketAddress addr, String authority) {
InetSocketAddress inetSocketAddr = (InetSocketAddress) addr;
return new OkHttpClientTransport(inetSocketAddr, authority, executor, socketFactory,
Utils.convertSpec(connectionSpec), maxMessageSize);

View File

@ -44,9 +44,9 @@ import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.MethodType;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.internal.ClientTransport;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.Http2Ping;
import io.grpc.internal.ManagedClientTransport;
import io.grpc.internal.SerializingExecutor;
import io.grpc.okhttp.internal.ConnectionSpec;
import io.grpc.okhttp.internal.framed.ErrorCode;
@ -84,9 +84,9 @@ import javax.annotation.concurrent.GuardedBy;
import javax.net.ssl.SSLSocketFactory;
/**
* A okhttp-based {@link ClientTransport} implementation.
* A okhttp-based {@link ManagedClientTransport} implementation.
*/
class OkHttpClientTransport implements ClientTransport {
class OkHttpClientTransport implements ManagedClientTransport {
private static final Map<ErrorCode, Status> ERROR_CODE_TO_STATUS;
private static final Logger log = Logger.getLogger(OkHttpClientTransport.class.getName());
private static final OkHttpClientStream[] EMPTY_STREAM_ARRAY = new OkHttpClientStream[0];

View File

@ -73,6 +73,7 @@ import io.grpc.internal.AbstractStream;
import io.grpc.internal.ClientStreamListener;
import io.grpc.internal.ClientTransport;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.ManagedClientTransport;
import io.grpc.okhttp.OkHttpClientTransport.ClientFrameHandler;
import io.grpc.okhttp.internal.ConnectionSpec;
import io.grpc.okhttp.internal.framed.ErrorCode;
@ -131,7 +132,7 @@ public class OkHttpClientTransportTest {
@Mock
MethodDescriptor<?, ?> method;
@Mock
private ClientTransport.Listener transportListener;
private ManagedClientTransport.Listener transportListener;
private OkHttpClientTransport clientTransport;
private MockFrameReader frameReader;
private ExecutorService executor;
@ -1323,7 +1324,7 @@ public class OkHttpClientTransportTest {
ConnectionSpec.CLEARTEXT,
DEFAULT_MAX_MESSAGE_SIZE);
ClientTransport.Listener listener = mock(ClientTransport.Listener.class);
ManagedClientTransport.Listener listener = mock(ManagedClientTransport.Listener.class);
clientTransport.start(listener);
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
verify(listener, timeout(TIME_OUT_MS)).transportShutdown(captor.capture());