diff --git a/core/src/main/java/com/google/net/stubby/newtransport/AbstractClientTransport.java b/core/src/main/java/com/google/net/stubby/newtransport/AbstractClientTransport.java new file mode 100644 index 0000000000..71d48f327a --- /dev/null +++ b/core/src/main/java/com/google/net/stubby/newtransport/AbstractClientTransport.java @@ -0,0 +1,42 @@ +package com.google.net.stubby.newtransport; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.AbstractService; +import com.google.net.stubby.MethodDescriptor; + +/** + * Abstract base class for all {@link ClientTransport} implementations. Implements the + * {@link #newStream} method to perform a state check on the service before allowing stream + * creation. + */ +public abstract class AbstractClientTransport extends AbstractService implements ClientTransport { + + @Override + public final ClientStream newStream(MethodDescriptor method, StreamListener listener) { + Preconditions.checkNotNull(method, "method"); + Preconditions.checkNotNull(listener, "listener"); + if (state() == State.STARTING) { + // Wait until the transport is running before creating the new stream. + awaitRunning(); + } + + if (state() != State.RUNNING) { + throw new IllegalStateException("Invalid state for creating new stream: " + state()); + } + + // Create the stream. + return newStreamInternal(method, listener); + } + + /** + * Called by {@link #newStream} to perform the actual creation of the new {@link ClientStream}. + * This is only called after the transport has successfully transitioned to the {@code RUNNING} + * state. + * + * @param method the RPC method to be invoked on the server by the new stream. + * @param listener the listener for events on the new stream. + * @return the new stream. + */ + protected abstract ClientStream newStreamInternal(MethodDescriptor method, + StreamListener listener); +} diff --git a/core/src/main/java/com/google/net/stubby/newtransport/AbstractStream.java b/core/src/main/java/com/google/net/stubby/newtransport/AbstractStream.java new file mode 100644 index 0000000000..121e063183 --- /dev/null +++ b/core/src/main/java/com/google/net/stubby/newtransport/AbstractStream.java @@ -0,0 +1,263 @@ +package com.google.net.stubby.newtransport; + +import static com.google.net.stubby.newtransport.StreamState.CLOSED; +import static com.google.net.stubby.newtransport.StreamState.OPEN; +import static com.google.net.stubby.newtransport.StreamState.READ_ONLY; + +import com.google.common.base.Preconditions; +import com.google.common.io.Closeables; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.net.stubby.Status; + +import java.io.InputStream; +import java.nio.ByteBuffer; + +import javax.annotation.Nullable; + +/** + * Abstract base class for {@link Stream} implementations. + */ +public abstract class AbstractStream implements Stream { + + /** + * Indicates the phase of the GRPC stream in one direction. + */ + protected enum Phase { + CONTEXT, MESSAGE, STATUS + } + + private volatile StreamState state = StreamState.OPEN; + private Status status; + private final Object stateLock = new Object(); + private final Object writeLock = new Object(); + private final MessageFramer framer; + private final StreamListener listener; + protected Phase inboundPhase = Phase.CONTEXT; + protected Phase outboundPhase = Phase.CONTEXT; + + /** + * Handler for Framer output. + */ + private final Framer.Sink outboundFrameHandler = new Framer.Sink() { + @Override + public void deliverFrame(ByteBuffer frame, boolean endOfStream) { + sendFrame(frame, endOfStream); + } + }; + + /** + * Handler for Deframer output. + */ + private final Framer inboundMessageHandler = new Framer() { + @Override + public void writeContext(String name, InputStream value, int length) { + ListenableFuture future = null; + try { + inboundPhase(Phase.CONTEXT); + future = listener.contextRead(name, value, length); + } finally { + closeWhenDone(future, value); + } + } + + @Override + public void writePayload(InputStream input, int length) { + ListenableFuture future = null; + try { + inboundPhase(Phase.MESSAGE); + future = listener.messageRead(input, length); + } finally { + closeWhenDone(future, input); + } + } + + @Override + public void writeStatus(Status status) { + inboundPhase(Phase.STATUS); + setStatus(status); + } + + @Override + public void flush() {} + + @Override + public boolean isClosed() { + return false; + } + + @Override + public void close() {} + + @Override + public void dispose() {} + }; + + protected AbstractStream(StreamListener listener) { + this.listener = Preconditions.checkNotNull(listener, "listener"); + + framer = new MessageFramer(outboundFrameHandler, 4096); + // No compression at the moment. + framer.setAllowCompression(false); + } + + @Override + public StreamState state() { + return state; + } + + @Override + public final void close() { + outboundPhase(Phase.STATUS); + synchronized (stateLock) { + state = state == OPEN ? READ_ONLY : CLOSED; + } + synchronized (writeLock) { + framer.close(); + } + } + + /** + * Free any resources associated with this stream. Subclass implementations must call this + * version. + */ + public void dispose() { + synchronized (writeLock) { + framer.dispose(); + } + } + + @Override + public final void writeContext(String name, InputStream value, int length, + @Nullable Runnable accepted) { + Preconditions.checkNotNull(name, "name"); + Preconditions.checkNotNull(value, "value"); + Preconditions.checkArgument(length >= 0, "length must be >= 0"); + outboundPhase(Phase.CONTEXT); + synchronized (writeLock) { + if (!framer.isClosed()) { + framer.writeContext(name, value, length); + } + } + + // TODO(user): add flow control. + if (accepted != null) { + accepted.run(); + } + } + + @Override + public final void writeMessage(InputStream message, int length, @Nullable Runnable accepted) { + Preconditions.checkNotNull(message, "message"); + Preconditions.checkArgument(length >= 0, "length must be >= 0"); + outboundPhase(Phase.MESSAGE); + synchronized (writeLock) { + if (!framer.isClosed()) { + framer.writePayload(message, length); + } + } + + // TODO(user): add flow control. + if (accepted != null) { + accepted.run(); + } + } + + @Override + public final void flush() { + synchronized (writeLock) { + if (!framer.isClosed()) { + framer.flush(); + } + } + } + + /** + * Sets the status if not already set and notifies the stream listener that the stream was closed. + * This method must be called from the transport thread. + * + * @param newStatus the new status to set + * @return {@code} true if the status was not already set. + */ + public boolean setStatus(final Status newStatus) { + Preconditions.checkNotNull(newStatus, "newStatus"); + synchronized (stateLock) { + if (status != null) { + // Disallow override of current status. + return false; + } + + status = newStatus; + state = CLOSED; + } + + // Invoke the observer callback. + listener.closed(newStatus); + + // Free any resources. + dispose(); + + return true; + } + + /** + * Sends an outbound frame to the server. + * + * @param frame a buffer containing the chunk of data to be sent. + * @param endOfStream if {@code true} indicates that no more data will be sent on the stream by + * this endpoint. + */ + protected abstract void sendFrame(ByteBuffer frame, boolean endOfStream); + + /** + * Gets the handler for inbound messages. Subclasses must use this as the target for a + * {@link com.google.net.stubby.newtransport.Deframer}. + */ + protected final Framer inboundMessageHandler() { + return inboundMessageHandler; + } + + /** + * Transitions the inbound phase. If the transition is disallowed, throws a + * {@link IllegalStateException}. + */ + protected final void inboundPhase(Phase nextPhase) { + inboundPhase = verifyNextPhase(inboundPhase, nextPhase); + } + + /** + * Transitions the outbound phase. If the transition is disallowed, throws a + * {@link IllegalStateException}. + */ + protected final void outboundPhase(Phase nextPhase) { + outboundPhase = verifyNextPhase(outboundPhase, nextPhase); + } + + private Phase verifyNextPhase(Phase currentPhase, Phase nextPhase) { + if (nextPhase.ordinal() < currentPhase.ordinal() || currentPhase == Phase.STATUS) { + throw new IllegalStateException( + String.format("Cannot transition phase from %s to %s", currentPhase, nextPhase)); + } + return nextPhase; + } + + /** + * If the given future is provided, closes the {@link InputStream} when it completes. Otherwise + * the {@link InputStream} is closed immediately. + */ + private static void closeWhenDone(@Nullable ListenableFuture future, + final InputStream input) { + if (future == null) { + Closeables.closeQuietly(input); + return; + } + + // Close the buffer when the future completes. + future.addListener(new Runnable() { + @Override + public void run() { + Closeables.closeQuietly(input); + } + }, MoreExecutors.sameThreadExecutor()); + } +} diff --git a/core/src/main/java/com/google/net/stubby/newtransport/http/HttpClientTransport.java b/core/src/main/java/com/google/net/stubby/newtransport/http/HttpClientTransport.java new file mode 100644 index 0000000000..86fad43890 --- /dev/null +++ b/core/src/main/java/com/google/net/stubby/newtransport/http/HttpClientTransport.java @@ -0,0 +1,164 @@ +package com.google.net.stubby.newtransport.http; + +import static com.google.net.stubby.Status.CANCELLED; +import static com.google.net.stubby.newtransport.HttpUtil.CONTENT_TYPE_HEADER; +import static com.google.net.stubby.newtransport.HttpUtil.CONTENT_TYPE_PROTORPC; +import static com.google.net.stubby.newtransport.HttpUtil.HTTP_METHOD; + +import com.google.common.base.Preconditions; +import com.google.common.io.ByteBuffers; +import com.google.net.stubby.MethodDescriptor; +import com.google.net.stubby.Status; +import com.google.net.stubby.newtransport.AbstractClientTransport; +import com.google.net.stubby.newtransport.AbstractStream; +import com.google.net.stubby.newtransport.ClientStream; +import com.google.net.stubby.newtransport.InputStreamDeframer; +import com.google.net.stubby.newtransport.StreamListener; +import com.google.net.stubby.newtransport.StreamState; +import com.google.net.stubby.transport.Transport; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * A simple client-side transport for RPC-over-HTTP/1.1. All execution (including listener + * callbacks) are executed in the application thread context. + */ +public class HttpClientTransport extends AbstractClientTransport { + + private final URI baseUri; + private final Set streams = + Collections.synchronizedSet(new HashSet()); + + public HttpClientTransport(URI baseUri) { + this.baseUri = Preconditions.checkNotNull(baseUri, "baseUri"); + } + + @Override + protected ClientStream newStreamInternal(MethodDescriptor method, StreamListener listener) { + URI uri = baseUri.resolve(method.getName()); + HttpClientStream stream = new HttpClientStream(uri, listener); + synchronized (streams) { + // Check for RUNNING to deal with race condition of this being executed right after doStop + // cancels all the streams. + if (state() != State.RUNNING) { + throw new IllegalStateException("Invalid state for creating new stream: " + state()); + } + streams.add(stream); + return stream; + } + } + + @Override + protected void doStart() { + notifyStarted(); + } + + @Override + protected void doStop() { + // Cancel all of the streams for this transport. + synchronized (streams) { + // Guaranteed to be in the STOPPING state here. + for (HttpClientStream stream : streams.toArray(new HttpClientStream[0])) { + stream.cancel(); + } + } + notifyStopped(); + } + + /** + * Client stream implementation for an HTTP transport. + */ + private class HttpClientStream extends AbstractStream implements ClientStream { + final HttpURLConnection connection; + final DataOutputStream outputStream; + boolean connected; + + HttpClientStream(URI uri, StreamListener listener) { + super(listener); + + try { + connection = (HttpURLConnection) uri.toURL().openConnection(); + connection.setDoOutput(true); + connection.setDoInput(true); + connection.setRequestMethod(HTTP_METHOD); + connection.setRequestProperty(CONTENT_TYPE_HEADER, CONTENT_TYPE_PROTORPC); + outputStream = new DataOutputStream(connection.getOutputStream()); + connected = true; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void cancel() { + outboundPhase = Phase.STATUS; + if (setStatus(CANCELLED)) { + disconnect(); + } + } + + @Override + protected void sendFrame(ByteBuffer frame, boolean endOfStream) { + if (state() == StreamState.CLOSED) { + // Ignore outbound frames after the stream has closed. + return; + } + + try { + // Synchronizing here to protect against cancellation due to the transport shutting down. + synchronized (connection) { + // Write the data to the connection output stream. + ByteBuffers.asByteSource(frame).copyTo(outputStream); + + if (endOfStream) { + // Close the output stream on this connection. + connection.getOutputStream().close(); + + // The request has completed so now process the response. This results in the listener's + // closed() callback being invoked since we're indicating that this is the end of the + // response stream. + // + // NOTE: Must read the response in the sending thread, since URLConnection has threading + // issues. + new InputStreamDeframer(inboundMessageHandler()).deliverFrame( + connection.getInputStream(), true); + + // Close the input stream and disconnect. + connection.getInputStream().close(); + disconnect(); + } + } + } catch (IOException ioe) { + setStatus(new Status(Transport.Code.INTERNAL, ioe)); + } + } + + @Override + public void dispose() { + super.dispose(); + disconnect(); + } + + /** + * Disconnects the HTTP connection if currently connected. + */ + private void disconnect() { + // Synchronizing since this may be called for the stream (i.e. cancel or read complete) or + // due to shutting down the transport (i.e. cancel). + synchronized (connection) { + if (connected) { + connected = false; + streams.remove(this); + connection.disconnect(); + } + } + } + } +} diff --git a/core/src/main/java/com/google/net/stubby/newtransport/http/HttpClientTransportFactory.java b/core/src/main/java/com/google/net/stubby/newtransport/http/HttpClientTransportFactory.java new file mode 100644 index 0000000000..794d02dfc9 --- /dev/null +++ b/core/src/main/java/com/google/net/stubby/newtransport/http/HttpClientTransportFactory.java @@ -0,0 +1,27 @@ +package com.google.net.stubby.newtransport.http; + +import com.google.net.stubby.newtransport.ClientTransportFactory; + +import java.net.URI; +import java.net.URISyntaxException; + +/** + * Factory that manufactures instances of {@link HttpClientTransport}. + */ +public class HttpClientTransportFactory implements ClientTransportFactory { + private final URI baseUri; + + public HttpClientTransportFactory(String host, int port, boolean ssl) { + try { + String scheme = ssl ? "https" : "http"; + baseUri = new URI(scheme, null, host, port, "/", null, null); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } + + @Override + public HttpClientTransport newClientTransport() { + return new HttpClientTransport(baseUri); + } +} diff --git a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientStream.java b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientStream.java index 8881ad0851..2143a26659 100644 --- a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientStream.java +++ b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientStream.java @@ -1,120 +1,33 @@ package com.google.net.stubby.newtransport.netty; import static com.google.net.stubby.newtransport.StreamState.CLOSED; -import static com.google.net.stubby.newtransport.StreamState.OPEN; -import static com.google.net.stubby.newtransport.StreamState.READ_ONLY; import com.google.common.base.Preconditions; -import com.google.common.io.Closeables; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.net.stubby.Status; +import com.google.net.stubby.newtransport.AbstractStream; import com.google.net.stubby.newtransport.ClientStream; import com.google.net.stubby.newtransport.Deframer; -import com.google.net.stubby.newtransport.Framer; -import com.google.net.stubby.newtransport.MessageFramer; import com.google.net.stubby.newtransport.StreamListener; -import com.google.net.stubby.newtransport.StreamState; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelPromise; -import java.io.InputStream; import java.nio.ByteBuffer; -import javax.annotation.Nullable; - /** * Client stream for a Netty transport. */ -class NettyClientStream implements ClientStream { +class NettyClientStream extends AbstractStream implements ClientStream { public static final int PENDING_STREAM_ID = -1; - /** - * Indicates the phase of the GRPC stream in one direction. - */ - private enum Phase { - CONTEXT, MESSAGE, STATUS - } - - /** - * Guards transition of stream state. - */ - private final Object stateLock = new Object(); - - /** - * Guards access to the frame writer. - */ - private final Object writeLock = new Object(); - - private volatile StreamState state = OPEN; private volatile int id = PENDING_STREAM_ID; - private Status status; - private Phase inboundPhase = Phase.CONTEXT; - private Phase outboundPhase = Phase.CONTEXT; - private final StreamListener listener; private final Channel channel; - private final Framer framer; private final Deframer deframer; - private final Framer.Sink outboundFrameHandler = new Framer.Sink() { - @Override - public void deliverFrame(ByteBuffer buffer, boolean endStream) { - ByteBuf buf = toByteBuf(buffer); - send(buf, endStream, endStream); - } - }; - - private final Framer inboundMessageHandler = new Framer() { - @Override - public void writeContext(String name, InputStream value, int length) { - ListenableFuture future = null; - try { - inboundPhase(Phase.CONTEXT); - future = listener.contextRead(name, value, length); - } finally { - closeWhenDone(future, value); - } - } - - @Override - public void writePayload(InputStream input, int length) { - ListenableFuture future = null; - try { - inboundPhase(Phase.MESSAGE); - future = listener.messageRead(input, length); - } finally { - closeWhenDone(future, input); - } - } - - @Override - public void writeStatus(Status status) { - inboundPhase(Phase.STATUS); - setStatus(status); - } - - @Override - public void flush() {} - - @Override - public boolean isClosed() { - return false; - } - - @Override - public void close() {} - - @Override - public void dispose() {} - }; - NettyClientStream(StreamListener listener, Channel channel) { - this.listener = Preconditions.checkNotNull(listener, "listener"); + super(listener); this.channel = Preconditions.checkNotNull(channel, "channel"); - this.deframer = new ByteBufDeframer(channel.alloc(), inboundMessageHandler); - this.framer = new MessageFramer(outboundFrameHandler, 4096); + this.deframer = new ByteBufDeframer(channel.alloc(), inboundMessageHandler()); } /** @@ -128,25 +41,6 @@ class NettyClientStream implements ClientStream { this.id = id; } - @Override - public StreamState state() { - return state; - } - - @Override - public void close() { - outboundPhase(Phase.STATUS); - // Transition the state to mark the close the local side of the stream. - synchronized (stateLock) { - state = state == OPEN ? READ_ONLY : CLOSED; - } - - // Close the frame writer and send any buffered frames. - synchronized (writeLock) { - framer.close(); - } - } - @Override public void cancel() { outboundPhase = Phase.STATUS; @@ -155,60 +49,6 @@ class NettyClientStream implements ClientStream { channel.writeAndFlush(new CancelStreamCommand(this)); } - /** - * Free any resources associated with this stream. - */ - public void dispose() { - synchronized (writeLock) { - framer.dispose(); - } - } - - @Override - public void writeContext(String name, InputStream value, int length, - @Nullable final Runnable accepted) { - Preconditions.checkNotNull(name, "name"); - Preconditions.checkNotNull(value, "value"); - Preconditions.checkArgument(length >= 0, "length must be >= 0"); - outboundPhase(Phase.CONTEXT); - synchronized (writeLock) { - if (!framer.isClosed()) { - framer.writeContext(name, value, length); - } - } - - // TODO(user): add flow control. - if (accepted != null) { - accepted.run(); - } - } - - @Override - public void writeMessage(InputStream message, int length, @Nullable final Runnable accepted) { - Preconditions.checkNotNull(message, "message"); - Preconditions.checkArgument(length >= 0, "length must be >= 0"); - outboundPhase(Phase.MESSAGE); - synchronized (writeLock) { - if (!framer.isClosed()) { - framer.writePayload(message, length); - } - } - - // TODO(user): add flow control. - if (accepted != null) { - accepted.run(); - } - } - - @Override - public void flush() { - synchronized (writeLock) { - if (!framer.isClosed()) { - framer.flush(); - } - } - } - /** * Called in the channel thread to process the content of an inbound DATA frame. * @@ -219,7 +59,7 @@ class NettyClientStream implements ClientStream { public void inboundDataReceived(ByteBuf frame, boolean endOfStream, ChannelPromise promise) { Preconditions.checkNotNull(frame, "frame"); Preconditions.checkNotNull(promise, "promise"); - if (state == CLOSED) { + if (state() == CLOSED) { promise.setSuccess(); return; } @@ -231,44 +71,11 @@ class NettyClientStream implements ClientStream { promise.setSuccess(); } - /** - * Sets the status if not already set and notifies the stream listener that the stream was closed. - * This method must be called from the Netty channel thread. - * - * @param newStatus the new status to set - * @return {@code} true if the status was not already set. - */ - public boolean setStatus(final Status newStatus) { - Preconditions.checkNotNull(newStatus, "newStatus"); - synchronized (stateLock) { - if (status != null) { - // Disallow override of current status. - return false; - } - - status = newStatus; - state = CLOSED; - } - - // Invoke the observer callback. - listener.closed(newStatus); - - // Free any resources. - dispose(); - - return true; - } - - /** - * Writes the given frame to the channel. - * - * @param data the grpc frame to be written. - * @param endStream indicates whether this is the last frame to be sent for this stream. - * @param endMessage indicates whether the data ends at a message boundary. - */ - private void send(ByteBuf data, boolean endStream, boolean endMessage) { - SendGrpcFrameCommand frame = new SendGrpcFrameCommand(this, data, endStream, endMessage); - channel.writeAndFlush(frame); + @Override + protected void sendFrame(ByteBuffer frame, boolean endOfStream) { + SendGrpcFrameCommand cmd = + new SendGrpcFrameCommand(this, toByteBuf(frame), endOfStream, endOfStream); + channel.writeAndFlush(cmd); } /** @@ -279,49 +86,4 @@ class NettyClientStream implements ClientStream { buf.writeBytes(source); return buf; } - - /** - * Transitions the inbound phase. If the transition is disallowed, throws a - * {@link IllegalStateException}. - */ - private void inboundPhase(Phase nextPhase) { - inboundPhase = verifyNextPhase(inboundPhase, nextPhase); - } - - /** - * Transitions the outbound phase. If the transition is disallowed, throws a - * {@link IllegalStateException}. - */ - private void outboundPhase(Phase nextPhase) { - outboundPhase = verifyNextPhase(outboundPhase, nextPhase); - } - - private Phase verifyNextPhase(Phase currentPhase, Phase nextPhase) { - // Only allow forward progression. - if (nextPhase.ordinal() < currentPhase.ordinal() || currentPhase == Phase.STATUS) { - throw new IllegalStateException( - String.format("Cannot transition phase from %s to %s", currentPhase, nextPhase)); - } - return nextPhase; - } - - /** - * If the given future is provided, closes the {@link InputStream} when it completes. Otherwise - * the {@link InputStream} is closed immediately. - */ - private static void closeWhenDone(@Nullable ListenableFuture future, - final InputStream input) { - if (future == null) { - Closeables.closeQuietly(input); - return; - } - - // Close the buffer when the future completes. - future.addListener(new Runnable() { - @Override - public void run() { - Closeables.closeQuietly(input); - } - }, MoreExecutors.sameThreadExecutor()); - } } diff --git a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientTransport.java b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientTransport.java index 28a652d9e0..1ef9416b4d 100644 --- a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientTransport.java +++ b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientTransport.java @@ -3,8 +3,8 @@ package com.google.net.stubby.newtransport.netty; import static io.netty.channel.ChannelOption.SO_KEEPALIVE; import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.AbstractService; import com.google.net.stubby.MethodDescriptor; +import com.google.net.stubby.newtransport.AbstractClientTransport; import com.google.net.stubby.newtransport.ClientStream; import com.google.net.stubby.newtransport.ClientTransport; import com.google.net.stubby.newtransport.StreamListener; @@ -25,7 +25,7 @@ import java.util.concurrent.ExecutionException; /** * A Netty-based {@link ClientTransport} implementation. */ -class NettyClientTransport extends AbstractService implements ClientTransport { +class NettyClientTransport extends AbstractClientTransport { private final String host; private final int port; @@ -58,22 +58,7 @@ class NettyClientTransport extends AbstractService implements ClientTransport { } @Override - public ClientStream newStream(MethodDescriptor method, StreamListener listener) { - Preconditions.checkNotNull(method, "method"); - Preconditions.checkNotNull(listener, "listener"); - switch (state()) { - case STARTING: - // Wait until the transport is running before creating the new stream. - awaitRunning(); - break; - case NEW: - case TERMINATED: - case FAILED: - throw new IllegalStateException("Unable to create new stream in state: " + state()); - default: - break; - } - + protected ClientStream newStreamInternal(MethodDescriptor method, StreamListener listener) { // Create the stream. NettyClientStream stream = new NettyClientStream(listener, channel);