From c5e70c23107a6873713a7f940e7535dd44b9db99 Mon Sep 17 00:00:00 2001 From: lryan Date: Mon, 24 Nov 2014 16:41:02 -0800 Subject: [PATCH] Remove StreamState and use inboundPhase/outboundPhase instead Remove synchronization on stateLock as we are not required to be thread safe Add better toString for stream impls Internal cleanup of various 'status' fields in AbstractClientStream Remove 'stashTrailers' as we've already extracted status in layer above correctly ------------- Created by MOE: http://code.google.com/p/moe-java MOE_MIGRATED_REVID=80678356 --- .../java/com/google/net/stubby/Status.java | 16 +- .../transport/AbstractClientStream.java | 134 +++++++--------- .../transport/AbstractServerStream.java | 82 ++++------ .../net/stubby/transport/AbstractStream.java | 146 ++++++++++++------ .../stubby/transport/Http2ClientStream.java | 4 +- .../google/net/stubby/transport/Stream.java | 6 - .../net/stubby/transport/StreamState.java | 36 ----- .../transport/netty/NettyClientHandler.java | 11 +- .../netty/NettyClientHandlerTest.java | 6 +- .../netty/NettyClientStreamTest.java | 33 ++-- .../netty/NettyServerStreamTest.java | 24 +-- .../transport/okhttp/OkHttpClientStream.java | 4 +- .../okhttp/OkHttpClientTransport.java | 6 +- .../net/stubby/testing/InProcessUtils.java | 15 -- 14 files changed, 237 insertions(+), 286 deletions(-) delete mode 100644 core/src/main/java/com/google/net/stubby/transport/StreamState.java diff --git a/core/src/main/java/com/google/net/stubby/Status.java b/core/src/main/java/com/google/net/stubby/Status.java index 34c379e870..fe5318ecb3 100644 --- a/core/src/main/java/com/google/net/stubby/Status.java +++ b/core/src/main/java/com/google/net/stubby/Status.java @@ -2,6 +2,7 @@ package com.google.net.stubby; import static com.google.common.base.Charsets.US_ASCII; +import com.google.common.base.MoreObjects; import com.google.common.base.Objects; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; @@ -346,16 +347,11 @@ public final class Status { @Override public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("[").append(code); - if (description != null) { - builder.append(";").append(description); - } - if (cause != null) { - builder.append(";").append(cause); - } - builder.append("]"); - return builder.toString(); + return MoreObjects.toStringHelper(this) + .add("code", code.name()) + .add("description", description) + .add("cause", cause) + .toString(); } private static class StatusCodeMarshaller implements Metadata.Marshaller { diff --git a/core/src/main/java/com/google/net/stubby/transport/AbstractClientStream.java b/core/src/main/java/com/google/net/stubby/transport/AbstractClientStream.java index 44bfa65173..0baa37997e 100644 --- a/core/src/main/java/com/google/net/stubby/transport/AbstractClientStream.java +++ b/core/src/main/java/com/google/net/stubby/transport/AbstractClientStream.java @@ -1,9 +1,6 @@ package com.google.net.stubby.transport; -import static com.google.net.stubby.transport.StreamState.CLOSED; -import static com.google.net.stubby.transport.StreamState.OPEN; -import static com.google.net.stubby.transport.StreamState.READ_ONLY; - +import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ListenableFuture; import com.google.net.stubby.Metadata; @@ -16,7 +13,6 @@ import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; -import javax.annotation.concurrent.GuardedBy; /** * The abstract base class for {@link ClientStream} implementations. @@ -27,16 +23,13 @@ public abstract class AbstractClientStream extends AbstractStream private static final Logger log = Logger.getLogger(AbstractClientStream.class.getName()); private final ClientStreamListener listener; + private boolean listenerClosed; - @GuardedBy("stateLock") + // Stored status & trailers to report when deframer completes or + // transportReportStatus is directly called. private Status status; + private Metadata.Trailers trailers; - private final Object stateLock = new Object(); - private volatile StreamState state = StreamState.OPEN; - - // Stored status & trailers to report when deframer completes. - private Status stashedStatus; - private Metadata.Trailers stashedTrailers; protected AbstractClientStream(ClientStreamListener listener, @Nullable Decompressor decompressor, @@ -62,15 +55,14 @@ public abstract class AbstractClientStream extends AbstractStream * @param errorStatus the error to report */ protected void inboundTransportError(Status errorStatus) { - if (state() == CLOSED) { + if (inboundPhase() == Phase.STATUS) { log.log(Level.INFO, "Received transport error on closed stream {0} {1}", new Object[]{id(), errorStatus}); return; } - inboundPhase(Phase.STATUS); // For transport errors we immediately report status to the application layer // and do not wait for additional payloads. - setStatus(errorStatus, new Metadata.Trailers()); + transportReportStatus(errorStatus, new Metadata.Trailers()); } /** @@ -82,7 +74,7 @@ public abstract class AbstractClientStream extends AbstractStream * @param headers the parsed headers */ protected void inboundHeadersReceived(Metadata.Headers headers) { - if (state() == CLOSED) { + if (inboundPhase() == Phase.STATUS) { log.log(Level.INFO, "Received headers on closed stream {0} {1}", new Object[]{id(), headers}); } @@ -95,11 +87,11 @@ public abstract class AbstractClientStream extends AbstractStream */ protected void inboundDataReceived(Buffer frame) { Preconditions.checkNotNull(frame, "frame"); - if (state() == CLOSED) { + if (inboundPhase() == Phase.STATUS) { frame.close(); return; } - if (inboundPhase == Phase.HEADERS) { + if (inboundPhase() == Phase.HEADERS) { // Have not received headers yet so error inboundTransportError(Status.INTERNAL.withDescription("headers not received before payload")); frame.close(); @@ -120,16 +112,16 @@ public abstract class AbstractClientStream extends AbstractStream * Called by transport implementations when they receive trailers. */ protected void inboundTrailersReceived(Metadata.Trailers trailers, Status status) { - if (state() == CLOSED) { + Preconditions.checkNotNull(trailers, "trailers"); + if (inboundPhase() == Phase.STATUS) { log.log(Level.INFO, "Received trailers on closed stream {0}\n {1}\n {3}", new Object[]{id(), status, trailers}); } - inboundPhase(Phase.STATUS); // Stash the status & trailers so they can be delivered by the deframer calls // remoteEndClosed - stashedStatus = status; + this.status = status; if (GRPC_V2_PROTOCOL) { - stashTrailers(trailers); + this.trailers = trailers; } deframe(Buffers.empty(), true); } @@ -138,27 +130,13 @@ public abstract class AbstractClientStream extends AbstractStream @Override protected void receiveStatus(Status status) { Preconditions.checkNotNull(status, "status"); - stashedStatus = status; - stashedTrailers = new Metadata.Trailers(); - } - - /** - * If using gRPC v2 protocol, this method must be called with received trailers before notifying - * deframer of end of stream. - */ - protected void stashTrailers(Metadata.Trailers trailers) { - Preconditions.checkNotNull(trailers, "trailers"); - stashedStatus = trailers.get(Status.CODE_KEY) - .withDescription(trailers.get(Status.MESSAGE_KEY)); - trailers.removeAll(Status.CODE_KEY); - trailers.removeAll(Status.MESSAGE_KEY); - stashedTrailers = trailers; + this.status = status; + trailers = new Metadata.Trailers(); } @Override protected void remoteEndClosed() { - Preconditions.checkState(stashedStatus != null, "Status and trailers should have been set"); - setStatus(stashedStatus, stashedTrailers); + transportReportStatus(status, trailers); } @Override @@ -176,59 +154,63 @@ public abstract class AbstractClientStream extends AbstractStream protected abstract void sendFrame(ByteBuffer frame, boolean endOfStream); /** - * Sets the status if not already set and notifies the stream listener that the stream was closed. + * Report stream closure with status to the application layer if not already reported. * This method must be called from the transport thread. * * @param newStatus the new status to set * @return {@code} true if the status was not already set. */ - public boolean setStatus(final Status newStatus, Metadata.Trailers trailers) { + public boolean transportReportStatus(final Status newStatus, Metadata.Trailers trailers) { Preconditions.checkNotNull(newStatus, "newStatus"); - synchronized (stateLock) { - if (status != null) { - // Disallow override of current status. - return false; - } - status = newStatus; - state = CLOSED; + inboundPhase(Phase.STATUS); + status = newStatus; + // Invoke the observer callback which will schedule work onto an application thread + if (!listenerClosed) { + // Status has not been reported to the application layer + listenerClosed = true; + listener.closed(newStatus, trailers); } - - // Invoke the observer callback. - listener.closed(newStatus, trailers); - - // Free any resources. - dispose(); - return true; } @Override public final void halfClose() { - outboundPhase(Phase.STATUS); - synchronized (stateLock) { - state = state == OPEN ? READ_ONLY : CLOSED; - } - closeFramer(null); - } - - @Override - public StreamState state() { - return state; - } - - @Override - public void cancel() { - // Allow phase to go to cancelled regardless of prior phase. - outboundPhase = Phase.STATUS; - if (id() != null) { - // Only send a cancellation to remote side if we have actually been allocated - // a stream id. i.e. the server side is aware of the stream. - sendCancel(); + if (outboundPhase(Phase.STATUS) != Phase.STATUS) { + closeFramer(null); } } /** - * Send a stream cancellation message to the remote server. + * Cancel the stream. Called by the application layer, never called by the transport. + */ + @Override + public void cancel() { + outboundPhase(Phase.STATUS); + if (id() != null) { + // Only send a cancellation to remote side if we have actually been allocated + // a stream id and we are not already closed. i.e. the server side is aware of the stream. + sendCancel(); + } + dispose(); + } + + /** + * Send a stream cancellation message to the remote server. Can be called by either the + * application or transport layers. */ protected abstract void sendCancel(); + + @Override + protected MoreObjects.ToStringHelper toStringHelper() { + MoreObjects.ToStringHelper toStringHelper = super.toStringHelper(); + if (status != null) { + toStringHelper.add("status", status); + } + return toStringHelper; + } + + @Override + public boolean isClosed() { + return super.isClosed() || listenerClosed; + } } diff --git a/core/src/main/java/com/google/net/stubby/transport/AbstractServerStream.java b/core/src/main/java/com/google/net/stubby/transport/AbstractServerStream.java index 4f8fb5163b..564e16ecb3 100644 --- a/core/src/main/java/com/google/net/stubby/transport/AbstractServerStream.java +++ b/core/src/main/java/com/google/net/stubby/transport/AbstractServerStream.java @@ -1,9 +1,5 @@ package com.google.net.stubby.transport; -import static com.google.net.stubby.transport.StreamState.CLOSED; -import static com.google.net.stubby.transport.StreamState.OPEN; -import static com.google.net.stubby.transport.StreamState.WRITE_ONLY; - import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ListenableFuture; import com.google.net.stubby.Metadata; @@ -16,7 +12,6 @@ import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; -import javax.annotation.concurrent.GuardedBy; /** * Abstract base class for {@link ServerStream} implementations. @@ -25,14 +20,11 @@ public abstract class AbstractServerStream extends AbstractStream implements ServerStream { private static final Logger log = Logger.getLogger(AbstractServerStream.class.getName()); + /** Whether listener.closed() has been called. */ + private boolean listenerClosed; private ServerStreamListener listener; - private final Object stateLock = new Object(); - private volatile StreamState state = StreamState.OPEN; private boolean headersSent = false; - /** Whether listener.closed() has been called. */ - @GuardedBy("stateLock") - private boolean listenerClosed; /** * Whether the stream was closed gracefully by the application (vs. a transport-level failure). */ @@ -84,15 +76,12 @@ public abstract class AbstractServerStream extends AbstractStream public final void close(Status status, Metadata.Trailers trailers) { Preconditions.checkNotNull(status, "status"); Preconditions.checkNotNull(trailers, "trailers"); - outboundPhase(Phase.STATUS); - synchronized (stateLock) { - state = CLOSED; + if (outboundPhase(Phase.STATUS) != Phase.STATUS) { + gracefulClose = true; + this.stashedTrailers = trailers; + writeStatusToTrailers(status); + closeFramer(status); } - gracefulClose = true; - this.stashedTrailers = trailers; - writeStatusToTrailers(status); - closeFramer(status); - dispose(); } private void writeStatusToTrailers(Status status) { @@ -111,7 +100,7 @@ public abstract class AbstractServerStream extends AbstractStream * be retained. */ public void inboundDataReceived(Buffer frame, boolean endOfStream) { - if (state() == StreamState.CLOSED) { + if (inboundPhase() == Phase.STATUS) { frame.close(); return; } @@ -173,12 +162,7 @@ public abstract class AbstractServerStream extends AbstractStream * abortStream()} for abnormal. */ public void complete() { - synchronized (stateLock) { - if (listenerClosed) { - return; - } - listenerClosed = true; - } + listenerClosed = true; if (!gracefulClose) { listener.closed(Status.INTERNAL.withDescription("successful complete() without close()")); throw new IllegalStateException("successful complete() without close()"); @@ -186,22 +170,14 @@ public abstract class AbstractServerStream extends AbstractStream listener.closed(Status.OK); } - @Override - public StreamState state() { - return state; - } - /** * Called when the remote end half-closes the stream. */ @Override protected final void remoteEndClosed() { - synchronized (stateLock) { - Preconditions.checkState(state == OPEN, "Stream not OPEN"); - state = WRITE_ONLY; + if (inboundPhase(Phase.STATUS) != Phase.STATUS) { + listener.halfClosed(); } - inboundPhase(Phase.STATUS); - listener.halfClosed(); } /** @@ -217,31 +193,27 @@ public abstract class AbstractServerStream extends AbstractStream * about stream closure and send the status */ public final void abortStream(Status status, boolean notifyClient) { + // TODO(user): Investigate whether we can remove the notification to the client + // and rely on a transport layer stream reset instead. Preconditions.checkArgument(!status.isOk(), "status must not be OK"); - boolean closeListener; - synchronized (stateLock) { - if (state == CLOSED) { - // Can't actually notify client. - notifyClient = false; - } - state = CLOSED; - closeListener = !listenerClosed; + if (!listenerClosed) { listenerClosed = true; + listener.closed(status); } - - try { - if (notifyClient) { - if (stashedTrailers == null) { - stashedTrailers = new Metadata.Trailers(); - } - writeStatusToTrailers(status); - closeFramer(status); + if (notifyClient) { + // TODO(user): Remove + if (stashedTrailers == null) { + stashedTrailers = new Metadata.Trailers(); } + writeStatusToTrailers(status); + closeFramer(status); + } else { dispose(); - } finally { - if (closeListener) { - listener.closed(status); - } } } + + @Override + public boolean isClosed() { + return super.isClosed() || listenerClosed; + } } diff --git a/core/src/main/java/com/google/net/stubby/transport/AbstractStream.java b/core/src/main/java/com/google/net/stubby/transport/AbstractStream.java index 90dee8a519..6e3bf7773d 100644 --- a/core/src/main/java/com/google/net/stubby/transport/AbstractStream.java +++ b/core/src/main/java/com/google/net/stubby/transport/AbstractStream.java @@ -1,5 +1,7 @@ package com.google.net.stubby.transport; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import com.google.common.io.Closeables; import com.google.common.util.concurrent.FutureCallback; @@ -33,7 +35,6 @@ public abstract class AbstractStream implements Stream { } private volatile IdT id; - private final Object writeLock = new Object(); private final Framer framer; private final FutureCallback deframerErrorCallback = new FutureCallback() { @Override @@ -47,8 +48,16 @@ public abstract class AbstractStream implements Stream { final GrpcDeframer deframer; final MessageDeframer2 deframer2; - Phase inboundPhase = Phase.HEADERS; - Phase outboundPhase = Phase.HEADERS; + + /** + * Inbound phase is exclusively written to by the transport thread. + */ + private Phase inboundPhase = Phase.HEADERS; + + /** + * Outbound phase is exclusively written to by the application thread. + */ + private Phase outboundPhase = Phase.HEADERS; AbstractStream(@Nullable Decompressor decompressor, Executor deframerExecutor) { @@ -119,25 +128,13 @@ public abstract class AbstractStream implements Stream { this.id = id; } - /** - * Free any resources associated with this stream. Subclass implementations must call this - * version. - */ - public void dispose() { - synchronized (writeLock) { - framer.dispose(); - } - } - @Override public void writeMessage(InputStream message, int length, @Nullable Runnable accepted) { Preconditions.checkNotNull(message, "message"); Preconditions.checkArgument(length >= 0, "length must be >= 0"); outboundPhase(Phase.MESSAGE); - synchronized (writeLock) { - if (!framer.isClosed()) { - framer.writePayload(message, length); - } + if (!framer.isClosed()) { + framer.writePayload(message, length); } // TODO(user): add flow control. @@ -148,13 +145,40 @@ public abstract class AbstractStream implements Stream { @Override public final void flush() { - synchronized (writeLock) { - if (!framer.isClosed()) { - framer.flush(); - } + if (!framer.isClosed()) { + framer.flush(); } } + /** + * Closes the underlying framer. + * + *

No-op if the framer has already been closed. + * + * @param status if not null, will write the status to the framer before closing it + */ + final void closeFramer(@Nullable Status status) { + if (!framer.isClosed()) { + if (status != null) { + framer.writeStatus(status); + } + framer.close(); + } + } + + /** + * Free any resources associated with this stream. Subclass implementations must call this + * version. + *

+ * NOTE. Can be called by both the transport thread and the application thread. Transport + * threads need to dispose when the remote side has terminated the stream. Application threads + * will dispose when the application decides to close the stream as part of normal processing. + *

+ */ + public void dispose() { + framer.dispose(); + } + /** * Sends an outbound frame to the remote end point. * @@ -216,42 +240,36 @@ public abstract class AbstractStream implements Stream { } } + final Phase inboundPhase() { + return inboundPhase; + } + /** - * Transitions the inbound phase. If the transition is disallowed, throws a - * {@link IllegalStateException}. + * Transitions the inbound phase to the given phase and returns the previous phase. + * If the transition is disallowed, throws an {@link IllegalStateException}. */ - final void inboundPhase(Phase nextPhase) { + final Phase inboundPhase(Phase nextPhase) { + Phase tmp = inboundPhase; inboundPhase = verifyNextPhase(inboundPhase, nextPhase); + return tmp; + } + + final Phase outboundPhase() { + return outboundPhase; } /** - * Transitions the outbound phase. If the transition is disallowed, throws a - * {@link IllegalStateException}. + * Transitions the outbound phase to the given phase and returns the previous phase. + * If the transition is disallowed, throws an {@link IllegalStateException}. */ - final void outboundPhase(Phase nextPhase) { + final Phase outboundPhase(Phase nextPhase) { + Phase tmp = outboundPhase; outboundPhase = verifyNextPhase(outboundPhase, nextPhase); - } - - /** - * Closes the underlying framer. - * - *

No-op if the framer has already been closed. - * - * @param status if not null, will write the status to the framer before closing it - */ - final void closeFramer(@Nullable Status status) { - synchronized (writeLock) { - if (!framer.isClosed()) { - if (status != null) { - framer.writeStatus(status); - } - framer.close(); - } - } + return tmp; } private Phase verifyNextPhase(Phase currentPhase, Phase nextPhase) { - if (nextPhase.ordinal() < currentPhase.ordinal() || currentPhase == Phase.STATUS) { + if (nextPhase.ordinal() < currentPhase.ordinal()) { throw new IllegalStateException( String.format("Cannot transition phase from %s to %s", currentPhase, nextPhase)); } @@ -277,4 +295,40 @@ public abstract class AbstractStream implements Stream { } }, MoreExecutors.directExecutor()); } + + /** + * Can the stream receive data from its remote peer. + */ + public boolean canReceive() { + return inboundPhase() != Phase.STATUS; + } + + /** + * Can the stream send data to its remote peer. + */ + public boolean canSend() { + return outboundPhase() != Phase.STATUS; + } + + /** + * Is the stream fully closed. Note that this method is not thread-safe as inboundPhase and + * outboundPhase are mutated in different threads. Tests must account for thread coordination + * when calling. + */ + @VisibleForTesting + public boolean isClosed() { + return inboundPhase() == Phase.STATUS && outboundPhase() == Phase.STATUS; + } + + public String toString() { + return toStringHelper().toString(); + } + + protected MoreObjects.ToStringHelper toStringHelper() { + return MoreObjects.toStringHelper(this) + .add("id", id()) + .add("inboundPhase", inboundPhase().name()) + .add("outboundPhase", outboundPhase().name()); + + } } diff --git a/core/src/main/java/com/google/net/stubby/transport/Http2ClientStream.java b/core/src/main/java/com/google/net/stubby/transport/Http2ClientStream.java index 59f7df57e7..cc32fedbd4 100644 --- a/core/src/main/java/com/google/net/stubby/transport/Http2ClientStream.java +++ b/core/src/main/java/com/google/net/stubby/transport/Http2ClientStream.java @@ -85,7 +85,7 @@ public abstract class Http2ClientStream extends AbstractClientStream { } protected void transportDataReceived(Buffer frame, boolean endOfStream) { - if (inboundPhase == Phase.HEADERS) { + if (inboundPhase() == Phase.HEADERS) { // Must receive headers prior to receiving any payload as we use headers to check for // protocol correctness. transportError = Status.INTERNAL.withDescription("no headers received prior to data"); @@ -199,5 +199,7 @@ public abstract class Http2ClientStream extends AbstractClientStream { */ private static void stripTransportDetails(Metadata metadata) { metadata.removeAll(HTTP2_STATUS); + metadata.removeAll(Status.CODE_KEY); + metadata.removeAll(Status.MESSAGE_KEY); } } diff --git a/core/src/main/java/com/google/net/stubby/transport/Stream.java b/core/src/main/java/com/google/net/stubby/transport/Stream.java index 6de7a5b3cd..68a09c5a8d 100644 --- a/core/src/main/java/com/google/net/stubby/transport/Stream.java +++ b/core/src/main/java/com/google/net/stubby/transport/Stream.java @@ -10,12 +10,6 @@ import javax.annotation.Nullable; *

An implementation doesn't need to be thread-safe. */ public interface Stream { - - /** - * Gets the current state of this stream. - */ - StreamState state(); - /** * Writes a message payload to the remote end-point. The bytes from the stream are immediate read * by the Transport. This method will always return immediately and will not wait for the write to diff --git a/core/src/main/java/com/google/net/stubby/transport/StreamState.java b/core/src/main/java/com/google/net/stubby/transport/StreamState.java deleted file mode 100644 index d102e63dfa..0000000000 --- a/core/src/main/java/com/google/net/stubby/transport/StreamState.java +++ /dev/null @@ -1,36 +0,0 @@ -package com.google.net.stubby.transport; - -/** - * The state of a single {@link Stream} within a transport. - * - *

Client state transitions:
- * OPEN->READ_ONLY->CLOSED (no-error case)
- * OPEN->CLOSED (error)
- * STARTING->CLOSED (Failed creation)
- * - *

Server state transitions:
- * OPEN->WRITE_ONLY->CLOSED (no-error case)
- * OPEN->CLOSED (error case)
- */ -public enum StreamState { - - /** - * The stream is open for write by both endpoints. - */ - OPEN, - - /** - * Only the remote endpoint may send data. The local endpoint may only read. - */ - READ_ONLY, - - /** - * Only the local endpoint may send data. The remote endpoint may only read. - */ - WRITE_ONLY, - - /** - * Neither endpoint may send data. - */ - CLOSED -} diff --git a/netty/src/main/java/com/google/net/stubby/transport/netty/NettyClientHandler.java b/netty/src/main/java/com/google/net/stubby/transport/netty/NettyClientHandler.java index 1e2aa609c1..4b4767050e 100644 --- a/netty/src/main/java/com/google/net/stubby/transport/netty/NettyClientHandler.java +++ b/netty/src/main/java/com/google/net/stubby/transport/netty/NettyClientHandler.java @@ -150,7 +150,7 @@ class NettyClientHandler extends Http2ConnectionHandler { // TODO(user): do something with errorCode? Http2Stream http2Stream = connection().requireStream(streamId); NettyClientStream stream = clientStream(http2Stream); - stream.setStatus(Status.UNKNOWN, new Metadata.Trailers()); + stream.transportReportStatus(Status.UNKNOWN, new Metadata.Trailers()); } /** @@ -166,7 +166,7 @@ class NettyClientHandler extends Http2ConnectionHandler { // Any streams that are still active must be closed. for (Http2Stream stream : http2Streams()) { - clientStream(stream).setStatus(goAwayStatus, new Metadata.Trailers()); + clientStream(stream).transportReportStatus(goAwayStatus, new Metadata.Trailers()); } } @@ -185,7 +185,8 @@ class NettyClientHandler extends Http2ConnectionHandler { // Close the stream with a status that contains the cause. Http2Stream stream = connection().stream(http2Ex.streamId()); if (stream != null) { - clientStream(stream).setStatus(Status.fromThrowable(cause), new Metadata.Trailers()); + clientStream(stream).transportReportStatus(Status.fromThrowable(cause), + new Metadata.Trailers()); } // Delegate to the base class to send a RST_STREAM. @@ -210,7 +211,7 @@ class NettyClientHandler extends Http2ConnectionHandler { private void cancelStream(ChannelHandlerContext ctx, CancelStreamCommand cmd, ChannelPromise promise) throws Http2Exception { NettyClientStream stream = cmd.stream(); - stream.setStatus(Status.CANCELLED, new Metadata.Trailers()); + stream.transportReportStatus(Status.CANCELLED, new Metadata.Trailers()); // No need to set the stream status for a cancellation. It should already have been // set prior to sending the command. @@ -254,7 +255,7 @@ class NettyClientHandler extends Http2ConnectionHandler { int lastKnownStream = connection().local().lastKnownStream(); for (Http2Stream stream : http2Streams()) { if (lastKnownStream < stream.id()) { - clientStream(stream).setStatus(goAwayStatus, new Metadata.Trailers()); + clientStream(stream).transportReportStatus(goAwayStatus, new Metadata.Trailers()); stream.close(); } } diff --git a/netty/src/test/java/com/google/net/stubby/transport/netty/NettyClientHandlerTest.java b/netty/src/test/java/com/google/net/stubby/transport/netty/NettyClientHandlerTest.java index cedca1635f..c6195b899f 100644 --- a/netty/src/test/java/com/google/net/stubby/transport/netty/NettyClientHandlerTest.java +++ b/netty/src/test/java/com/google/net/stubby/transport/netty/NettyClientHandlerTest.java @@ -18,7 +18,6 @@ import static org.mockito.Mockito.when; import com.google.net.stubby.Metadata; import com.google.net.stubby.Status; -import com.google.net.stubby.transport.StreamState; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; @@ -85,8 +84,6 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase { .add(as("auth"), as("sometoken")) .add(CONTENT_TYPE_HEADER, CONTENT_TYPE_GRPC); - when(stream.state()).thenReturn(StreamState.OPEN); - // Simulate activation of the handler to force writing of the initial settings handler.handlerAdded(ctx); @@ -226,7 +223,8 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase { handler.channelRead(ctx, goAwayFrame(0)); ArgumentCaptor captor = ArgumentCaptor.forClass(Status.class); InOrder inOrder = inOrder(stream); - inOrder.verify(stream, calls(1)).setStatus(captor.capture(), notNull(Metadata.Trailers.class)); + inOrder.verify(stream, calls(1)).transportReportStatus(captor.capture(), + notNull(Metadata.Trailers.class)); assertEquals(Status.UNAVAILABLE.getCode(), captor.getValue().getCode()); } diff --git a/netty/src/test/java/com/google/net/stubby/transport/netty/NettyClientStreamTest.java b/netty/src/test/java/com/google/net/stubby/transport/netty/NettyClientStreamTest.java index bf2f38871f..8a01ddbbab 100644 --- a/netty/src/test/java/com/google/net/stubby/transport/netty/NettyClientStreamTest.java +++ b/netty/src/test/java/com/google/net/stubby/transport/netty/NettyClientStreamTest.java @@ -7,6 +7,8 @@ import static com.google.net.stubby.transport.netty.Utils.CONTENT_TYPE_HEADER; import static com.google.net.stubby.transport.netty.Utils.STATUS_OK; import static io.netty.util.CharsetUtil.UTF_8; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.same; @@ -17,7 +19,6 @@ import com.google.net.stubby.Metadata; import com.google.net.stubby.Status; import com.google.net.stubby.transport.AbstractStream; import com.google.net.stubby.transport.ClientStreamListener; -import com.google.net.stubby.transport.StreamState; import io.netty.buffer.Unpooled; import io.netty.handler.codec.AsciiString; @@ -67,7 +68,8 @@ public class NettyClientStreamTest extends NettyStreamTestBase { // Force stream creation. stream().id(STREAM_ID); stream().halfClose(); - assertEquals(StreamState.READ_ONLY, stream.state()); + assertTrue(stream().canReceive()); + assertFalse(stream().canSend()); } @Test @@ -97,36 +99,36 @@ public class NettyClientStreamTest extends NettyStreamTestBase { @Test public void setStatusWithOkShouldCloseStream() { stream().id(1); - stream().setStatus(Status.OK, new Metadata.Trailers()); + stream().transportReportStatus(Status.OK, new Metadata.Trailers()); verify(listener).closed(same(Status.OK), any(Metadata.Trailers.class)); - assertEquals(StreamState.CLOSED, stream.state()); + assertTrue(stream.isClosed()); } @Test public void setStatusWithErrorShouldCloseStream() { Status errorStatus = Status.INTERNAL; - stream().setStatus(errorStatus, new Metadata.Trailers()); + stream().transportReportStatus(errorStatus, new Metadata.Trailers()); verify(listener).closed(eq(errorStatus), any(Metadata.Trailers.class)); - assertEquals(StreamState.CLOSED, stream.state()); + assertTrue(stream.isClosed()); } @Test public void setStatusWithOkShouldNotOverrideError() { Status errorStatus = Status.INTERNAL; - stream().setStatus(errorStatus, new Metadata.Trailers()); - stream().setStatus(Status.OK, new Metadata.Trailers()); + stream().transportReportStatus(errorStatus, new Metadata.Trailers()); + stream().transportReportStatus(Status.OK, new Metadata.Trailers()); verify(listener).closed(any(Status.class), any(Metadata.Trailers.class)); - assertEquals(StreamState.CLOSED, stream.state()); + assertTrue(stream.isClosed()); } @Test public void setStatusWithErrorShouldNotOverridePreviousError() { Status errorStatus = Status.INTERNAL; - stream().setStatus(errorStatus, new Metadata.Trailers()); - stream().setStatus(Status.fromThrowable(new RuntimeException("fake")), + stream().transportReportStatus(errorStatus, new Metadata.Trailers()); + stream().transportReportStatus(Status.fromThrowable(new RuntimeException("fake")), new Metadata.Trailers()); verify(listener).closed(any(Status.class), any(Metadata.Trailers.class)); - assertEquals(StreamState.CLOSED, stream.state()); + assertTrue(stream.isClosed()); } @Override @@ -170,7 +172,7 @@ public class NettyClientStreamTest extends NettyStreamTestBase { ArgumentCaptor captor = ArgumentCaptor.forClass(Status.class); verify(listener).closed(captor.capture(), any(Metadata.Trailers.class)); assertEquals(Status.INTERNAL.getCode(), captor.getValue().getCode()); - assertEquals(StreamState.CLOSED, stream.state()); + assertTrue(stream.isClosed()); } @Test @@ -194,7 +196,7 @@ public class NettyClientStreamTest extends NettyStreamTestBase { ArgumentCaptor captor = ArgumentCaptor.forClass(Status.class); verify(listener).closed(captor.capture(), any(Metadata.Trailers.class)); assertEquals(Status.INTERNAL.getCode(), captor.getValue().getCode()); - assertEquals(StreamState.CLOSED, stream.state()); + assertTrue(stream.isClosed()); } @@ -209,7 +211,8 @@ public class NettyClientStreamTest extends NettyStreamTestBase { @Override protected AbstractStream createStream() { AbstractStream stream = new NettyClientStream(listener, channel, handler); - assertEquals(StreamState.OPEN, stream.state()); + assertTrue(stream.canSend()); + assertTrue(stream.canReceive()); return stream; } diff --git a/netty/src/test/java/com/google/net/stubby/transport/netty/NettyServerStreamTest.java b/netty/src/test/java/com/google/net/stubby/transport/netty/NettyServerStreamTest.java index c45312ce74..3dd1b19edb 100644 --- a/netty/src/test/java/com/google/net/stubby/transport/netty/NettyServerStreamTest.java +++ b/netty/src/test/java/com/google/net/stubby/transport/netty/NettyServerStreamTest.java @@ -2,7 +2,7 @@ package com.google.net.stubby.transport.netty; import static com.google.net.stubby.transport.netty.NettyTestUtil.messageFrame; import static com.google.net.stubby.transport.netty.NettyTestUtil.statusFrame; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.same; import static org.mockito.Mockito.never; @@ -14,7 +14,6 @@ import com.google.net.stubby.Metadata; import com.google.net.stubby.Status; import com.google.net.stubby.transport.AbstractStream; import com.google.net.stubby.transport.ServerStreamListener; -import com.google.net.stubby.transport.StreamState; import io.netty.buffer.EmptyByteBuf; import io.netty.buffer.UnpooledByteBufAllocator; @@ -78,7 +77,7 @@ public class NettyServerStreamTest extends NettyStreamTestBase { // Sending complete. Listener gets closed() stream().complete(); verify(serverListener).closed(Status.OK); - assertEquals(StreamState.CLOSED, stream.state()); + assertTrue(stream().isClosed()); verifyZeroInteractions(serverListener); } @@ -92,7 +91,7 @@ public class NettyServerStreamTest extends NettyStreamTestBase { // Sending complete. Listener gets closed() stream().complete(); verify(serverListener).closed(Status.OK); - assertEquals(StreamState.CLOSED, stream.state()); + assertTrue(stream().isClosed()); verifyZeroInteractions(serverListener); } @@ -100,12 +99,12 @@ public class NettyServerStreamTest extends NettyStreamTestBase { public void closeAfterClientHalfCloseShouldSucceed() throws Exception { // Client half-closes. Listener gets halfClosed() stream().inboundDataReceived(new EmptyByteBuf(UnpooledByteBufAllocator.DEFAULT), true); - assertEquals(StreamState.WRITE_ONLY, stream.state()); + assertTrue(stream().canSend()); verify(serverListener).halfClosed(); // Server closes. Status sent stream().close(Status.OK, trailers); + assertTrue(stream().isClosed()); verifyNoMoreInteractions(serverListener); - assertEquals(StreamState.CLOSED, stream.state()); verify(channel).writeAndFlush( new SendGrpcFrameCommand(STREAM_ID, statusFrame(Status.OK), true)); // Sending and receiving complete. Listener gets closed() @@ -118,7 +117,7 @@ public class NettyServerStreamTest extends NettyStreamTestBase { public void abortStreamAndSendStatus() throws Exception { Status status = Status.INTERNAL.withCause(new Throwable()); stream().abortStream(status, true); - assertEquals(StreamState.CLOSED, stream.state()); + assertTrue(stream().isClosed()); verify(serverListener).closed(same(status)); verify(channel).writeAndFlush(new SendGrpcFrameCommand(STREAM_ID, statusFrame(status), true)); verifyNoMoreInteractions(serverListener); @@ -128,7 +127,7 @@ public class NettyServerStreamTest extends NettyStreamTestBase { public void abortStreamAndNotSendStatus() throws Exception { Status status = Status.INTERNAL.withCause(new Throwable()); stream().abortStream(status, false); - assertEquals(StreamState.CLOSED, stream.state()); + assertTrue(stream().isClosed()); verify(serverListener).closed(same(status)); verify(channel, never()).writeAndFlush( new SendGrpcFrameCommand(STREAM_ID, statusFrame(status), true)); @@ -140,20 +139,21 @@ public class NettyServerStreamTest extends NettyStreamTestBase { Status status = Status.INTERNAL.withCause(new Throwable()); // Client half-closes. Listener gets halfClosed() stream().inboundDataReceived(new EmptyByteBuf(UnpooledByteBufAllocator.DEFAULT), true); - assertEquals(StreamState.WRITE_ONLY, stream.state()); + assertTrue(stream().canSend()); verify(serverListener).halfClosed(); - // Abort + // Abort from the transport layer stream().abortStream(status, true); verify(serverListener).closed(same(status)); - assertEquals(StreamState.CLOSED, stream.state()); verifyNoMoreInteractions(serverListener); + assertTrue(stream().isClosed()); } @Override protected AbstractStream createStream() { NettyServerStream stream = new NettyServerStream(channel, STREAM_ID, handler); stream.setListener(serverListener); - assertEquals(StreamState.OPEN, stream.state()); + assertTrue(stream.canReceive()); + assertTrue(stream.canSend()); verifyZeroInteractions(serverListener); return stream; } diff --git a/okhttp/src/main/java/com/google/net/stubby/transport/okhttp/OkHttpClientStream.java b/okhttp/src/main/java/com/google/net/stubby/transport/okhttp/OkHttpClientStream.java index 3ca82ff3bc..35be9f4b4c 100644 --- a/okhttp/src/main/java/com/google/net/stubby/transport/okhttp/OkHttpClientStream.java +++ b/okhttp/src/main/java/com/google/net/stubby/transport/okhttp/OkHttpClientStream.java @@ -127,9 +127,9 @@ class OkHttpClientStream extends Http2ClientStream { } @Override - public boolean setStatus(Status newStatus, Metadata.Trailers trailers) { + public boolean transportReportStatus(Status newStatus, Metadata.Trailers trailers) { synchronized (executorLock) { - return super.setStatus(newStatus, trailers); + return super.transportReportStatus(newStatus, trailers); } } diff --git a/okhttp/src/main/java/com/google/net/stubby/transport/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/com/google/net/stubby/transport/okhttp/OkHttpClientTransport.java index d0a90500c3..e194400f69 100644 --- a/okhttp/src/main/java/com/google/net/stubby/transport/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/com/google/net/stubby/transport/okhttp/OkHttpClientTransport.java @@ -130,7 +130,7 @@ public class OkHttpClientTransport extends AbstractClientTransport { OkHttpClientStream clientStream = OkHttpClientStream.newStream(executor, listener, frameWriter, this, outboundFlow); if (goAway) { - clientStream.setStatus(goAwayStatus, new Metadata.Trailers()); + clientStream.transportReportStatus(goAwayStatus, new Metadata.Trailers()); } else { assignStreamId(clientStream); } @@ -229,7 +229,7 @@ public class OkHttpClientTransport extends AbstractClientTransport { } for (OkHttpClientStream stream : goAwayStreams) { - stream.setStatus(status, new Metadata.Trailers()); + stream.transportReportStatus(status, new Metadata.Trailers()); } } @@ -243,7 +243,7 @@ public class OkHttpClientTransport extends AbstractClientTransport { stream = streams.remove(streamId); if (stream != null) { if (status != null) { - stream.setStatus(status, new Metadata.Trailers()); + stream.transportReportStatus(status, new Metadata.Trailers()); } return true; } diff --git a/testing/src/main/java/com/google/net/stubby/testing/InProcessUtils.java b/testing/src/main/java/com/google/net/stubby/testing/InProcessUtils.java index 962cce8f06..e7b4df81ce 100644 --- a/testing/src/main/java/com/google/net/stubby/testing/InProcessUtils.java +++ b/testing/src/main/java/com/google/net/stubby/testing/InProcessUtils.java @@ -12,7 +12,6 @@ import com.google.net.stubby.transport.ClientStream; import com.google.net.stubby.transport.ClientStreamListener; import com.google.net.stubby.transport.ClientTransport; import com.google.net.stubby.transport.ClientTransportFactory; -import com.google.net.stubby.transport.StreamState; import java.io.IOException; import java.io.InputStream; @@ -151,12 +150,9 @@ public class InProcessUtils { // Return implementation of ClientStream which delegates to the server listener. return new ClientStream() { - StreamState state = StreamState.OPEN; - @Override public void cancel() { cancelled.set(true); - state = StreamState.CLOSED; serverWorkQueue.execute(new Runnable() { @Override public void run() { @@ -167,7 +163,6 @@ public class InProcessUtils { @Override public void halfClose() { - state = StreamState.WRITE_ONLY; serverWorkQueue.execute(new Runnable() { @Override public void run() { @@ -176,11 +171,6 @@ public class InProcessUtils { }); } - @Override - public StreamState state() { - return state; - } - @Override public void writeMessage(final InputStream message, int length, @Nullable final Runnable accepted) { @@ -220,11 +210,6 @@ public class InProcessUtils { // No-op } - @Override - public StreamState state() { - return StreamState.CLOSED; - } - @Override public void writeMessage(InputStream message, int length, @Nullable Runnable accepted) { }