diff --git a/core/src/main/java/io/grpc/transport/AbstractClientTransport.java b/core/src/main/java/io/grpc/transport/AbstractClientTransport.java deleted file mode 100644 index d26f75e877..0000000000 --- a/core/src/main/java/io/grpc/transport/AbstractClientTransport.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Copyright 2014, Google Inc. All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - -package io.grpc.transport; - -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.AbstractService; - -import io.grpc.Metadata; -import io.grpc.MethodDescriptor; - -/** - * Abstract base class for all {@link ClientTransport} implementations. Implements the - * {@link #newStream} method to perform a state check on the service before allowing stream - * creation. - */ -public abstract class AbstractClientTransport extends AbstractService implements ClientTransport { - - @Override - public final ClientStream newStream(MethodDescriptor method, - Metadata.Headers headers, - ClientStreamListener listener) { - Preconditions.checkNotNull(method, "method"); - Preconditions.checkNotNull(listener, "listener"); - if (state() == State.STARTING) { - // Wait until the transport is running before creating the new stream. - awaitRunning(); - } - - if (state() != State.RUNNING) { - throw new IllegalStateException("Invalid state for creating new stream: " + state(), - failureCause()); - } - - // Create the stream. - return newStreamInternal(method, headers, listener); - } - - /** - * Called by {@link #newStream} to perform the actual creation of the new {@link ClientStream}. - * This is only called after the transport has successfully transitioned to the {@code RUNNING} - * state. - * - * @param method the RPC method to be invoked on the server by the new stream. - * @param listener the listener for events on the new stream. - * @return the new stream. - */ - protected abstract ClientStream newStreamInternal(MethodDescriptor method, - Metadata.Headers headers, - ClientStreamListener listener); -} diff --git a/netty/src/main/java/io/grpc/transport/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/transport/netty/NettyClientTransport.java index eb62205a57..9f25ad9aa6 100644 --- a/netty/src/main/java/io/grpc/transport/netty/NettyClientTransport.java +++ b/netty/src/main/java/io/grpc/transport/netty/NettyClientTransport.java @@ -34,13 +34,13 @@ package io.grpc.transport.netty; import static io.netty.channel.ChannelOption.SO_KEEPALIVE; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.AbstractService; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import io.grpc.Metadata; import io.grpc.MethodDescriptor; -import io.grpc.transport.AbstractClientTransport; import io.grpc.transport.ClientStream; import io.grpc.transport.ClientStreamListener; import io.grpc.transport.ClientTransport; @@ -79,7 +79,7 @@ import javax.net.ssl.SSLParameters; /** * A Netty-based {@link ClientTransport} implementation. */ -class NettyClientTransport extends AbstractClientTransport { +class NettyClientTransport extends AbstractService implements ClientTransport { private final SocketAddress address; private final EventLoopGroup eventGroup; @@ -87,7 +87,7 @@ class NettyClientTransport extends AbstractClientTransport { private final NettyClientHandler handler; private final boolean ssl; private final AsciiString authority; - private Channel channel; + private volatile Channel channel; NettyClientTransport(SocketAddress address, NegotiationType negotiationType, EventLoopGroup eventGroup, SslContext sslContext) { @@ -140,8 +140,27 @@ class NettyClientTransport extends AbstractClientTransport { } @Override - protected ClientStream newStreamInternal(MethodDescriptor method, Metadata.Headers headers, + public ClientStream newStream(MethodDescriptor method, Metadata.Headers headers, ClientStreamListener listener) { + Preconditions.checkNotNull(method, "method"); + Preconditions.checkNotNull(headers, "headers"); + Preconditions.checkNotNull(listener, "listener"); + + // We can't write to the channel until negotiation is complete. Use state() instead of blindly + // calling awaitRunning() in order to avoid obtaining a lock in the common case. + if (state() != State.RUNNING) { + try { + awaitRunning(); + } catch (IllegalStateException ex) { + if (channel == null) { + // Negotiation did not complete, so still can't write to channel. Ex should already + // contain failureCause() information. + throw ex; + } + } + // channel is now guaranteed to be non-null + } + // Create the stream. NettyClientStream stream = new NettyClientStream(listener, channel, handler); diff --git a/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpClientTransport.java index a2d5e30b54..48a21d9003 100644 --- a/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpClientTransport.java @@ -33,6 +33,7 @@ package io.grpc.transport.okhttp; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.AbstractService; import com.squareup.okhttp.internal.spdy.ErrorCode; import com.squareup.okhttp.internal.spdy.FrameReader; @@ -46,7 +47,6 @@ import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.Status; import io.grpc.Status.Code; -import io.grpc.transport.AbstractClientTransport; import io.grpc.transport.ClientStream; import io.grpc.transport.ClientStreamListener; import io.grpc.transport.ClientTransport; @@ -75,7 +75,7 @@ import javax.net.ssl.SSLSocketFactory; /** * A okhttp-based {@link ClientTransport} implementation. */ -public class OkHttpClientTransport extends AbstractClientTransport { +public class OkHttpClientTransport extends AbstractService implements ClientTransport { /** The default initial window size in HTTP/2 is 64 KiB for the stream and connection. */ @VisibleForTesting static final int DEFAULT_INITIAL_WINDOW_SIZE = 64 * 1024; @@ -164,15 +164,19 @@ public class OkHttpClientTransport extends AbstractClientTransport { } @Override - protected ClientStream newStreamInternal(MethodDescriptor method, - Metadata.Headers headers, - ClientStreamListener listener) { + public ClientStream newStream(MethodDescriptor method, Metadata.Headers headers, + ClientStreamListener listener) { + Preconditions.checkNotNull(method, "method"); + Preconditions.checkNotNull(headers, "headers"); + Preconditions.checkNotNull(listener, "listener"); OkHttpClientStream clientStream = OkHttpClientStream.newStream(listener, frameWriter, this, outboundFlow); - if (goAway) { - clientStream.transportReportStatus(goAwayStatus, false, new Metadata.Trailers()); - } else { - assignStreamId(clientStream); + synchronized (lock) { + if (goAway) { + throw new IllegalStateException("Transport not running", goAwayStatus.asRuntimeException()); + } else { + assignStreamId(clientStream); + } } String defaultPath = "/" + method.getName(); frameWriter.synStream(false, false, clientStream.id(), 0,