Remove StreamState and use inboundPhase/outboundPhase instead

Remove synchronization on stateLock as we are not required to be thread safe
Add better toString for stream impls
Internal cleanup of various 'status' fields in AbstractClientStream
Remove 'stashTrailers' as we've already extracted status in layer above correctly

-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=80678356
This commit is contained in:
lryan 2014-11-24 16:41:02 -08:00 committed by Eric Anderson
parent abdbf4940a
commit c5e70c2310
14 changed files with 237 additions and 286 deletions

View File

@ -2,6 +2,7 @@ package com.google.net.stubby;
import static com.google.common.base.Charsets.US_ASCII;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
@ -346,16 +347,11 @@ public final class Status {
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("[").append(code);
if (description != null) {
builder.append(";").append(description);
}
if (cause != null) {
builder.append(";").append(cause);
}
builder.append("]");
return builder.toString();
return MoreObjects.toStringHelper(this)
.add("code", code.name())
.add("description", description)
.add("cause", cause)
.toString();
}
private static class StatusCodeMarshaller implements Metadata.Marshaller<Status> {

View File

@ -1,9 +1,6 @@
package com.google.net.stubby.transport;
import static com.google.net.stubby.transport.StreamState.CLOSED;
import static com.google.net.stubby.transport.StreamState.OPEN;
import static com.google.net.stubby.transport.StreamState.READ_ONLY;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.net.stubby.Metadata;
@ -16,7 +13,6 @@ import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
/**
* The abstract base class for {@link ClientStream} implementations.
@ -27,16 +23,13 @@ public abstract class AbstractClientStream<IdT> extends AbstractStream<IdT>
private static final Logger log = Logger.getLogger(AbstractClientStream.class.getName());
private final ClientStreamListener listener;
private boolean listenerClosed;
@GuardedBy("stateLock")
// Stored status & trailers to report when deframer completes or
// transportReportStatus is directly called.
private Status status;
private Metadata.Trailers trailers;
private final Object stateLock = new Object();
private volatile StreamState state = StreamState.OPEN;
// Stored status & trailers to report when deframer completes.
private Status stashedStatus;
private Metadata.Trailers stashedTrailers;
protected AbstractClientStream(ClientStreamListener listener,
@Nullable Decompressor decompressor,
@ -62,15 +55,14 @@ public abstract class AbstractClientStream<IdT> extends AbstractStream<IdT>
* @param errorStatus the error to report
*/
protected void inboundTransportError(Status errorStatus) {
if (state() == CLOSED) {
if (inboundPhase() == Phase.STATUS) {
log.log(Level.INFO, "Received transport error on closed stream {0} {1}",
new Object[]{id(), errorStatus});
return;
}
inboundPhase(Phase.STATUS);
// For transport errors we immediately report status to the application layer
// and do not wait for additional payloads.
setStatus(errorStatus, new Metadata.Trailers());
transportReportStatus(errorStatus, new Metadata.Trailers());
}
/**
@ -82,7 +74,7 @@ public abstract class AbstractClientStream<IdT> extends AbstractStream<IdT>
* @param headers the parsed headers
*/
protected void inboundHeadersReceived(Metadata.Headers headers) {
if (state() == CLOSED) {
if (inboundPhase() == Phase.STATUS) {
log.log(Level.INFO, "Received headers on closed stream {0} {1}",
new Object[]{id(), headers});
}
@ -95,11 +87,11 @@ public abstract class AbstractClientStream<IdT> extends AbstractStream<IdT>
*/
protected void inboundDataReceived(Buffer frame) {
Preconditions.checkNotNull(frame, "frame");
if (state() == CLOSED) {
if (inboundPhase() == Phase.STATUS) {
frame.close();
return;
}
if (inboundPhase == Phase.HEADERS) {
if (inboundPhase() == Phase.HEADERS) {
// Have not received headers yet so error
inboundTransportError(Status.INTERNAL.withDescription("headers not received before payload"));
frame.close();
@ -120,16 +112,16 @@ public abstract class AbstractClientStream<IdT> extends AbstractStream<IdT>
* Called by transport implementations when they receive trailers.
*/
protected void inboundTrailersReceived(Metadata.Trailers trailers, Status status) {
if (state() == CLOSED) {
Preconditions.checkNotNull(trailers, "trailers");
if (inboundPhase() == Phase.STATUS) {
log.log(Level.INFO, "Received trailers on closed stream {0}\n {1}\n {3}",
new Object[]{id(), status, trailers});
}
inboundPhase(Phase.STATUS);
// Stash the status & trailers so they can be delivered by the deframer calls
// remoteEndClosed
stashedStatus = status;
this.status = status;
if (GRPC_V2_PROTOCOL) {
stashTrailers(trailers);
this.trailers = trailers;
}
deframe(Buffers.empty(), true);
}
@ -138,27 +130,13 @@ public abstract class AbstractClientStream<IdT> extends AbstractStream<IdT>
@Override
protected void receiveStatus(Status status) {
Preconditions.checkNotNull(status, "status");
stashedStatus = status;
stashedTrailers = new Metadata.Trailers();
}
/**
* If using gRPC v2 protocol, this method must be called with received trailers before notifying
* deframer of end of stream.
*/
protected void stashTrailers(Metadata.Trailers trailers) {
Preconditions.checkNotNull(trailers, "trailers");
stashedStatus = trailers.get(Status.CODE_KEY)
.withDescription(trailers.get(Status.MESSAGE_KEY));
trailers.removeAll(Status.CODE_KEY);
trailers.removeAll(Status.MESSAGE_KEY);
stashedTrailers = trailers;
this.status = status;
trailers = new Metadata.Trailers();
}
@Override
protected void remoteEndClosed() {
Preconditions.checkState(stashedStatus != null, "Status and trailers should have been set");
setStatus(stashedStatus, stashedTrailers);
transportReportStatus(status, trailers);
}
@Override
@ -176,59 +154,63 @@ public abstract class AbstractClientStream<IdT> extends AbstractStream<IdT>
protected abstract void sendFrame(ByteBuffer frame, boolean endOfStream);
/**
* Sets the status if not already set and notifies the stream listener that the stream was closed.
* Report stream closure with status to the application layer if not already reported.
* This method must be called from the transport thread.
*
* @param newStatus the new status to set
* @return {@code} true if the status was not already set.
*/
public boolean setStatus(final Status newStatus, Metadata.Trailers trailers) {
public boolean transportReportStatus(final Status newStatus, Metadata.Trailers trailers) {
Preconditions.checkNotNull(newStatus, "newStatus");
synchronized (stateLock) {
if (status != null) {
// Disallow override of current status.
return false;
}
status = newStatus;
state = CLOSED;
inboundPhase(Phase.STATUS);
status = newStatus;
// Invoke the observer callback which will schedule work onto an application thread
if (!listenerClosed) {
// Status has not been reported to the application layer
listenerClosed = true;
listener.closed(newStatus, trailers);
}
// Invoke the observer callback.
listener.closed(newStatus, trailers);
// Free any resources.
dispose();
return true;
}
@Override
public final void halfClose() {
outboundPhase(Phase.STATUS);
synchronized (stateLock) {
state = state == OPEN ? READ_ONLY : CLOSED;
}
closeFramer(null);
}
@Override
public StreamState state() {
return state;
}
@Override
public void cancel() {
// Allow phase to go to cancelled regardless of prior phase.
outboundPhase = Phase.STATUS;
if (id() != null) {
// Only send a cancellation to remote side if we have actually been allocated
// a stream id. i.e. the server side is aware of the stream.
sendCancel();
if (outboundPhase(Phase.STATUS) != Phase.STATUS) {
closeFramer(null);
}
}
/**
* Send a stream cancellation message to the remote server.
* Cancel the stream. Called by the application layer, never called by the transport.
*/
@Override
public void cancel() {
outboundPhase(Phase.STATUS);
if (id() != null) {
// Only send a cancellation to remote side if we have actually been allocated
// a stream id and we are not already closed. i.e. the server side is aware of the stream.
sendCancel();
}
dispose();
}
/**
* Send a stream cancellation message to the remote server. Can be called by either the
* application or transport layers.
*/
protected abstract void sendCancel();
@Override
protected MoreObjects.ToStringHelper toStringHelper() {
MoreObjects.ToStringHelper toStringHelper = super.toStringHelper();
if (status != null) {
toStringHelper.add("status", status);
}
return toStringHelper;
}
@Override
public boolean isClosed() {
return super.isClosed() || listenerClosed;
}
}

View File

@ -1,9 +1,5 @@
package com.google.net.stubby.transport;
import static com.google.net.stubby.transport.StreamState.CLOSED;
import static com.google.net.stubby.transport.StreamState.OPEN;
import static com.google.net.stubby.transport.StreamState.WRITE_ONLY;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.net.stubby.Metadata;
@ -16,7 +12,6 @@ import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
/**
* Abstract base class for {@link ServerStream} implementations.
@ -25,14 +20,11 @@ public abstract class AbstractServerStream<IdT> extends AbstractStream<IdT>
implements ServerStream {
private static final Logger log = Logger.getLogger(AbstractServerStream.class.getName());
/** Whether listener.closed() has been called. */
private boolean listenerClosed;
private ServerStreamListener listener;
private final Object stateLock = new Object();
private volatile StreamState state = StreamState.OPEN;
private boolean headersSent = false;
/** Whether listener.closed() has been called. */
@GuardedBy("stateLock")
private boolean listenerClosed;
/**
* Whether the stream was closed gracefully by the application (vs. a transport-level failure).
*/
@ -84,15 +76,12 @@ public abstract class AbstractServerStream<IdT> extends AbstractStream<IdT>
public final void close(Status status, Metadata.Trailers trailers) {
Preconditions.checkNotNull(status, "status");
Preconditions.checkNotNull(trailers, "trailers");
outboundPhase(Phase.STATUS);
synchronized (stateLock) {
state = CLOSED;
if (outboundPhase(Phase.STATUS) != Phase.STATUS) {
gracefulClose = true;
this.stashedTrailers = trailers;
writeStatusToTrailers(status);
closeFramer(status);
}
gracefulClose = true;
this.stashedTrailers = trailers;
writeStatusToTrailers(status);
closeFramer(status);
dispose();
}
private void writeStatusToTrailers(Status status) {
@ -111,7 +100,7 @@ public abstract class AbstractServerStream<IdT> extends AbstractStream<IdT>
* be retained.
*/
public void inboundDataReceived(Buffer frame, boolean endOfStream) {
if (state() == StreamState.CLOSED) {
if (inboundPhase() == Phase.STATUS) {
frame.close();
return;
}
@ -173,12 +162,7 @@ public abstract class AbstractServerStream<IdT> extends AbstractStream<IdT>
* abortStream()} for abnormal.
*/
public void complete() {
synchronized (stateLock) {
if (listenerClosed) {
return;
}
listenerClosed = true;
}
listenerClosed = true;
if (!gracefulClose) {
listener.closed(Status.INTERNAL.withDescription("successful complete() without close()"));
throw new IllegalStateException("successful complete() without close()");
@ -186,22 +170,14 @@ public abstract class AbstractServerStream<IdT> extends AbstractStream<IdT>
listener.closed(Status.OK);
}
@Override
public StreamState state() {
return state;
}
/**
* Called when the remote end half-closes the stream.
*/
@Override
protected final void remoteEndClosed() {
synchronized (stateLock) {
Preconditions.checkState(state == OPEN, "Stream not OPEN");
state = WRITE_ONLY;
if (inboundPhase(Phase.STATUS) != Phase.STATUS) {
listener.halfClosed();
}
inboundPhase(Phase.STATUS);
listener.halfClosed();
}
/**
@ -217,31 +193,27 @@ public abstract class AbstractServerStream<IdT> extends AbstractStream<IdT>
* about stream closure and send the status
*/
public final void abortStream(Status status, boolean notifyClient) {
// TODO(user): Investigate whether we can remove the notification to the client
// and rely on a transport layer stream reset instead.
Preconditions.checkArgument(!status.isOk(), "status must not be OK");
boolean closeListener;
synchronized (stateLock) {
if (state == CLOSED) {
// Can't actually notify client.
notifyClient = false;
}
state = CLOSED;
closeListener = !listenerClosed;
if (!listenerClosed) {
listenerClosed = true;
listener.closed(status);
}
try {
if (notifyClient) {
if (stashedTrailers == null) {
stashedTrailers = new Metadata.Trailers();
}
writeStatusToTrailers(status);
closeFramer(status);
if (notifyClient) {
// TODO(user): Remove
if (stashedTrailers == null) {
stashedTrailers = new Metadata.Trailers();
}
writeStatusToTrailers(status);
closeFramer(status);
} else {
dispose();
} finally {
if (closeListener) {
listener.closed(status);
}
}
}
@Override
public boolean isClosed() {
return super.isClosed() || listenerClosed;
}
}

View File

@ -1,5 +1,7 @@
package com.google.net.stubby.transport;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.io.Closeables;
import com.google.common.util.concurrent.FutureCallback;
@ -33,7 +35,6 @@ public abstract class AbstractStream<IdT> implements Stream {
}
private volatile IdT id;
private final Object writeLock = new Object();
private final Framer framer;
private final FutureCallback<Object> deframerErrorCallback = new FutureCallback<Object>() {
@Override
@ -47,8 +48,16 @@ public abstract class AbstractStream<IdT> implements Stream {
final GrpcDeframer deframer;
final MessageDeframer2 deframer2;
Phase inboundPhase = Phase.HEADERS;
Phase outboundPhase = Phase.HEADERS;
/**
* Inbound phase is exclusively written to by the transport thread.
*/
private Phase inboundPhase = Phase.HEADERS;
/**
* Outbound phase is exclusively written to by the application thread.
*/
private Phase outboundPhase = Phase.HEADERS;
AbstractStream(@Nullable Decompressor decompressor,
Executor deframerExecutor) {
@ -119,25 +128,13 @@ public abstract class AbstractStream<IdT> implements Stream {
this.id = id;
}
/**
* Free any resources associated with this stream. Subclass implementations must call this
* version.
*/
public void dispose() {
synchronized (writeLock) {
framer.dispose();
}
}
@Override
public void writeMessage(InputStream message, int length, @Nullable Runnable accepted) {
Preconditions.checkNotNull(message, "message");
Preconditions.checkArgument(length >= 0, "length must be >= 0");
outboundPhase(Phase.MESSAGE);
synchronized (writeLock) {
if (!framer.isClosed()) {
framer.writePayload(message, length);
}
if (!framer.isClosed()) {
framer.writePayload(message, length);
}
// TODO(user): add flow control.
@ -148,13 +145,40 @@ public abstract class AbstractStream<IdT> implements Stream {
@Override
public final void flush() {
synchronized (writeLock) {
if (!framer.isClosed()) {
framer.flush();
}
if (!framer.isClosed()) {
framer.flush();
}
}
/**
* Closes the underlying framer.
*
* <p>No-op if the framer has already been closed.
*
* @param status if not null, will write the status to the framer before closing it
*/
final void closeFramer(@Nullable Status status) {
if (!framer.isClosed()) {
if (status != null) {
framer.writeStatus(status);
}
framer.close();
}
}
/**
* Free any resources associated with this stream. Subclass implementations must call this
* version.
* <p>
* NOTE. Can be called by both the transport thread and the application thread. Transport
* threads need to dispose when the remote side has terminated the stream. Application threads
* will dispose when the application decides to close the stream as part of normal processing.
* </p>
*/
public void dispose() {
framer.dispose();
}
/**
* Sends an outbound frame to the remote end point.
*
@ -216,42 +240,36 @@ public abstract class AbstractStream<IdT> implements Stream {
}
}
final Phase inboundPhase() {
return inboundPhase;
}
/**
* Transitions the inbound phase. If the transition is disallowed, throws a
* {@link IllegalStateException}.
* Transitions the inbound phase to the given phase and returns the previous phase.
* If the transition is disallowed, throws an {@link IllegalStateException}.
*/
final void inboundPhase(Phase nextPhase) {
final Phase inboundPhase(Phase nextPhase) {
Phase tmp = inboundPhase;
inboundPhase = verifyNextPhase(inboundPhase, nextPhase);
return tmp;
}
final Phase outboundPhase() {
return outboundPhase;
}
/**
* Transitions the outbound phase. If the transition is disallowed, throws a
* {@link IllegalStateException}.
* Transitions the outbound phase to the given phase and returns the previous phase.
* If the transition is disallowed, throws an {@link IllegalStateException}.
*/
final void outboundPhase(Phase nextPhase) {
final Phase outboundPhase(Phase nextPhase) {
Phase tmp = outboundPhase;
outboundPhase = verifyNextPhase(outboundPhase, nextPhase);
}
/**
* Closes the underlying framer.
*
* <p>No-op if the framer has already been closed.
*
* @param status if not null, will write the status to the framer before closing it
*/
final void closeFramer(@Nullable Status status) {
synchronized (writeLock) {
if (!framer.isClosed()) {
if (status != null) {
framer.writeStatus(status);
}
framer.close();
}
}
return tmp;
}
private Phase verifyNextPhase(Phase currentPhase, Phase nextPhase) {
if (nextPhase.ordinal() < currentPhase.ordinal() || currentPhase == Phase.STATUS) {
if (nextPhase.ordinal() < currentPhase.ordinal()) {
throw new IllegalStateException(
String.format("Cannot transition phase from %s to %s", currentPhase, nextPhase));
}
@ -277,4 +295,40 @@ public abstract class AbstractStream<IdT> implements Stream {
}
}, MoreExecutors.directExecutor());
}
/**
* Can the stream receive data from its remote peer.
*/
public boolean canReceive() {
return inboundPhase() != Phase.STATUS;
}
/**
* Can the stream send data to its remote peer.
*/
public boolean canSend() {
return outboundPhase() != Phase.STATUS;
}
/**
* Is the stream fully closed. Note that this method is not thread-safe as inboundPhase and
* outboundPhase are mutated in different threads. Tests must account for thread coordination
* when calling.
*/
@VisibleForTesting
public boolean isClosed() {
return inboundPhase() == Phase.STATUS && outboundPhase() == Phase.STATUS;
}
public String toString() {
return toStringHelper().toString();
}
protected MoreObjects.ToStringHelper toStringHelper() {
return MoreObjects.toStringHelper(this)
.add("id", id())
.add("inboundPhase", inboundPhase().name())
.add("outboundPhase", outboundPhase().name());
}
}

View File

@ -85,7 +85,7 @@ public abstract class Http2ClientStream extends AbstractClientStream<Integer> {
}
protected void transportDataReceived(Buffer frame, boolean endOfStream) {
if (inboundPhase == Phase.HEADERS) {
if (inboundPhase() == Phase.HEADERS) {
// Must receive headers prior to receiving any payload as we use headers to check for
// protocol correctness.
transportError = Status.INTERNAL.withDescription("no headers received prior to data");
@ -199,5 +199,7 @@ public abstract class Http2ClientStream extends AbstractClientStream<Integer> {
*/
private static void stripTransportDetails(Metadata metadata) {
metadata.removeAll(HTTP2_STATUS);
metadata.removeAll(Status.CODE_KEY);
metadata.removeAll(Status.MESSAGE_KEY);
}
}

View File

@ -10,12 +10,6 @@ import javax.annotation.Nullable;
* <p>An implementation doesn't need to be thread-safe.
*/
public interface Stream {
/**
* Gets the current state of this stream.
*/
StreamState state();
/**
* Writes a message payload to the remote end-point. The bytes from the stream are immediate read
* by the Transport. This method will always return immediately and will not wait for the write to

View File

@ -1,36 +0,0 @@
package com.google.net.stubby.transport;
/**
* The state of a single {@link Stream} within a transport.
*
* <p>Client state transitions:<br>
* OPEN->READ_ONLY->CLOSED (no-error case)<br>
* OPEN->CLOSED (error) <br>
* STARTING->CLOSED (Failed creation) <br>
*
* <p>Server state transitions:<br>
* OPEN->WRITE_ONLY->CLOSED (no-error case) <br>
* OPEN->CLOSED (error case) <br>
*/
public enum StreamState {
/**
* The stream is open for write by both endpoints.
*/
OPEN,
/**
* Only the remote endpoint may send data. The local endpoint may only read.
*/
READ_ONLY,
/**
* Only the local endpoint may send data. The remote endpoint may only read.
*/
WRITE_ONLY,
/**
* Neither endpoint may send data.
*/
CLOSED
}

View File

@ -150,7 +150,7 @@ class NettyClientHandler extends Http2ConnectionHandler {
// TODO(user): do something with errorCode?
Http2Stream http2Stream = connection().requireStream(streamId);
NettyClientStream stream = clientStream(http2Stream);
stream.setStatus(Status.UNKNOWN, new Metadata.Trailers());
stream.transportReportStatus(Status.UNKNOWN, new Metadata.Trailers());
}
/**
@ -166,7 +166,7 @@ class NettyClientHandler extends Http2ConnectionHandler {
// Any streams that are still active must be closed.
for (Http2Stream stream : http2Streams()) {
clientStream(stream).setStatus(goAwayStatus, new Metadata.Trailers());
clientStream(stream).transportReportStatus(goAwayStatus, new Metadata.Trailers());
}
}
@ -185,7 +185,8 @@ class NettyClientHandler extends Http2ConnectionHandler {
// Close the stream with a status that contains the cause.
Http2Stream stream = connection().stream(http2Ex.streamId());
if (stream != null) {
clientStream(stream).setStatus(Status.fromThrowable(cause), new Metadata.Trailers());
clientStream(stream).transportReportStatus(Status.fromThrowable(cause),
new Metadata.Trailers());
}
// Delegate to the base class to send a RST_STREAM.
@ -210,7 +211,7 @@ class NettyClientHandler extends Http2ConnectionHandler {
private void cancelStream(ChannelHandlerContext ctx, CancelStreamCommand cmd,
ChannelPromise promise) throws Http2Exception {
NettyClientStream stream = cmd.stream();
stream.setStatus(Status.CANCELLED, new Metadata.Trailers());
stream.transportReportStatus(Status.CANCELLED, new Metadata.Trailers());
// No need to set the stream status for a cancellation. It should already have been
// set prior to sending the command.
@ -254,7 +255,7 @@ class NettyClientHandler extends Http2ConnectionHandler {
int lastKnownStream = connection().local().lastKnownStream();
for (Http2Stream stream : http2Streams()) {
if (lastKnownStream < stream.id()) {
clientStream(stream).setStatus(goAwayStatus, new Metadata.Trailers());
clientStream(stream).transportReportStatus(goAwayStatus, new Metadata.Trailers());
stream.close();
}
}

View File

@ -18,7 +18,6 @@ import static org.mockito.Mockito.when;
import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status;
import com.google.net.stubby.transport.StreamState;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
@ -85,8 +84,6 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase {
.add(as("auth"), as("sometoken"))
.add(CONTENT_TYPE_HEADER, CONTENT_TYPE_GRPC);
when(stream.state()).thenReturn(StreamState.OPEN);
// Simulate activation of the handler to force writing of the initial settings
handler.handlerAdded(ctx);
@ -226,7 +223,8 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase {
handler.channelRead(ctx, goAwayFrame(0));
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
InOrder inOrder = inOrder(stream);
inOrder.verify(stream, calls(1)).setStatus(captor.capture(), notNull(Metadata.Trailers.class));
inOrder.verify(stream, calls(1)).transportReportStatus(captor.capture(),
notNull(Metadata.Trailers.class));
assertEquals(Status.UNAVAILABLE.getCode(), captor.getValue().getCode());
}

View File

@ -7,6 +7,8 @@ import static com.google.net.stubby.transport.netty.Utils.CONTENT_TYPE_HEADER;
import static com.google.net.stubby.transport.netty.Utils.STATUS_OK;
import static io.netty.util.CharsetUtil.UTF_8;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
@ -17,7 +19,6 @@ import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status;
import com.google.net.stubby.transport.AbstractStream;
import com.google.net.stubby.transport.ClientStreamListener;
import com.google.net.stubby.transport.StreamState;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.AsciiString;
@ -67,7 +68,8 @@ public class NettyClientStreamTest extends NettyStreamTestBase {
// Force stream creation.
stream().id(STREAM_ID);
stream().halfClose();
assertEquals(StreamState.READ_ONLY, stream.state());
assertTrue(stream().canReceive());
assertFalse(stream().canSend());
}
@Test
@ -97,36 +99,36 @@ public class NettyClientStreamTest extends NettyStreamTestBase {
@Test
public void setStatusWithOkShouldCloseStream() {
stream().id(1);
stream().setStatus(Status.OK, new Metadata.Trailers());
stream().transportReportStatus(Status.OK, new Metadata.Trailers());
verify(listener).closed(same(Status.OK), any(Metadata.Trailers.class));
assertEquals(StreamState.CLOSED, stream.state());
assertTrue(stream.isClosed());
}
@Test
public void setStatusWithErrorShouldCloseStream() {
Status errorStatus = Status.INTERNAL;
stream().setStatus(errorStatus, new Metadata.Trailers());
stream().transportReportStatus(errorStatus, new Metadata.Trailers());
verify(listener).closed(eq(errorStatus), any(Metadata.Trailers.class));
assertEquals(StreamState.CLOSED, stream.state());
assertTrue(stream.isClosed());
}
@Test
public void setStatusWithOkShouldNotOverrideError() {
Status errorStatus = Status.INTERNAL;
stream().setStatus(errorStatus, new Metadata.Trailers());
stream().setStatus(Status.OK, new Metadata.Trailers());
stream().transportReportStatus(errorStatus, new Metadata.Trailers());
stream().transportReportStatus(Status.OK, new Metadata.Trailers());
verify(listener).closed(any(Status.class), any(Metadata.Trailers.class));
assertEquals(StreamState.CLOSED, stream.state());
assertTrue(stream.isClosed());
}
@Test
public void setStatusWithErrorShouldNotOverridePreviousError() {
Status errorStatus = Status.INTERNAL;
stream().setStatus(errorStatus, new Metadata.Trailers());
stream().setStatus(Status.fromThrowable(new RuntimeException("fake")),
stream().transportReportStatus(errorStatus, new Metadata.Trailers());
stream().transportReportStatus(Status.fromThrowable(new RuntimeException("fake")),
new Metadata.Trailers());
verify(listener).closed(any(Status.class), any(Metadata.Trailers.class));
assertEquals(StreamState.CLOSED, stream.state());
assertTrue(stream.isClosed());
}
@Override
@ -170,7 +172,7 @@ public class NettyClientStreamTest extends NettyStreamTestBase {
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
verify(listener).closed(captor.capture(), any(Metadata.Trailers.class));
assertEquals(Status.INTERNAL.getCode(), captor.getValue().getCode());
assertEquals(StreamState.CLOSED, stream.state());
assertTrue(stream.isClosed());
}
@Test
@ -194,7 +196,7 @@ public class NettyClientStreamTest extends NettyStreamTestBase {
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
verify(listener).closed(captor.capture(), any(Metadata.Trailers.class));
assertEquals(Status.INTERNAL.getCode(), captor.getValue().getCode());
assertEquals(StreamState.CLOSED, stream.state());
assertTrue(stream.isClosed());
}
@ -209,7 +211,8 @@ public class NettyClientStreamTest extends NettyStreamTestBase {
@Override
protected AbstractStream<Integer> createStream() {
AbstractStream<Integer> stream = new NettyClientStream(listener, channel, handler);
assertEquals(StreamState.OPEN, stream.state());
assertTrue(stream.canSend());
assertTrue(stream.canReceive());
return stream;
}

View File

@ -2,7 +2,7 @@ package com.google.net.stubby.transport.netty;
import static com.google.net.stubby.transport.netty.NettyTestUtil.messageFrame;
import static com.google.net.stubby.transport.netty.NettyTestUtil.statusFrame;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.never;
@ -14,7 +14,6 @@ import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status;
import com.google.net.stubby.transport.AbstractStream;
import com.google.net.stubby.transport.ServerStreamListener;
import com.google.net.stubby.transport.StreamState;
import io.netty.buffer.EmptyByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
@ -78,7 +77,7 @@ public class NettyServerStreamTest extends NettyStreamTestBase {
// Sending complete. Listener gets closed()
stream().complete();
verify(serverListener).closed(Status.OK);
assertEquals(StreamState.CLOSED, stream.state());
assertTrue(stream().isClosed());
verifyZeroInteractions(serverListener);
}
@ -92,7 +91,7 @@ public class NettyServerStreamTest extends NettyStreamTestBase {
// Sending complete. Listener gets closed()
stream().complete();
verify(serverListener).closed(Status.OK);
assertEquals(StreamState.CLOSED, stream.state());
assertTrue(stream().isClosed());
verifyZeroInteractions(serverListener);
}
@ -100,12 +99,12 @@ public class NettyServerStreamTest extends NettyStreamTestBase {
public void closeAfterClientHalfCloseShouldSucceed() throws Exception {
// Client half-closes. Listener gets halfClosed()
stream().inboundDataReceived(new EmptyByteBuf(UnpooledByteBufAllocator.DEFAULT), true);
assertEquals(StreamState.WRITE_ONLY, stream.state());
assertTrue(stream().canSend());
verify(serverListener).halfClosed();
// Server closes. Status sent
stream().close(Status.OK, trailers);
assertTrue(stream().isClosed());
verifyNoMoreInteractions(serverListener);
assertEquals(StreamState.CLOSED, stream.state());
verify(channel).writeAndFlush(
new SendGrpcFrameCommand(STREAM_ID, statusFrame(Status.OK), true));
// Sending and receiving complete. Listener gets closed()
@ -118,7 +117,7 @@ public class NettyServerStreamTest extends NettyStreamTestBase {
public void abortStreamAndSendStatus() throws Exception {
Status status = Status.INTERNAL.withCause(new Throwable());
stream().abortStream(status, true);
assertEquals(StreamState.CLOSED, stream.state());
assertTrue(stream().isClosed());
verify(serverListener).closed(same(status));
verify(channel).writeAndFlush(new SendGrpcFrameCommand(STREAM_ID, statusFrame(status), true));
verifyNoMoreInteractions(serverListener);
@ -128,7 +127,7 @@ public class NettyServerStreamTest extends NettyStreamTestBase {
public void abortStreamAndNotSendStatus() throws Exception {
Status status = Status.INTERNAL.withCause(new Throwable());
stream().abortStream(status, false);
assertEquals(StreamState.CLOSED, stream.state());
assertTrue(stream().isClosed());
verify(serverListener).closed(same(status));
verify(channel, never()).writeAndFlush(
new SendGrpcFrameCommand(STREAM_ID, statusFrame(status), true));
@ -140,20 +139,21 @@ public class NettyServerStreamTest extends NettyStreamTestBase {
Status status = Status.INTERNAL.withCause(new Throwable());
// Client half-closes. Listener gets halfClosed()
stream().inboundDataReceived(new EmptyByteBuf(UnpooledByteBufAllocator.DEFAULT), true);
assertEquals(StreamState.WRITE_ONLY, stream.state());
assertTrue(stream().canSend());
verify(serverListener).halfClosed();
// Abort
// Abort from the transport layer
stream().abortStream(status, true);
verify(serverListener).closed(same(status));
assertEquals(StreamState.CLOSED, stream.state());
verifyNoMoreInteractions(serverListener);
assertTrue(stream().isClosed());
}
@Override
protected AbstractStream<Integer> createStream() {
NettyServerStream stream = new NettyServerStream(channel, STREAM_ID, handler);
stream.setListener(serverListener);
assertEquals(StreamState.OPEN, stream.state());
assertTrue(stream.canReceive());
assertTrue(stream.canSend());
verifyZeroInteractions(serverListener);
return stream;
}

View File

@ -127,9 +127,9 @@ class OkHttpClientStream extends Http2ClientStream {
}
@Override
public boolean setStatus(Status newStatus, Metadata.Trailers trailers) {
public boolean transportReportStatus(Status newStatus, Metadata.Trailers trailers) {
synchronized (executorLock) {
return super.setStatus(newStatus, trailers);
return super.transportReportStatus(newStatus, trailers);
}
}

View File

@ -130,7 +130,7 @@ public class OkHttpClientTransport extends AbstractClientTransport {
OkHttpClientStream clientStream = OkHttpClientStream.newStream(executor, listener,
frameWriter, this, outboundFlow);
if (goAway) {
clientStream.setStatus(goAwayStatus, new Metadata.Trailers());
clientStream.transportReportStatus(goAwayStatus, new Metadata.Trailers());
} else {
assignStreamId(clientStream);
}
@ -229,7 +229,7 @@ public class OkHttpClientTransport extends AbstractClientTransport {
}
for (OkHttpClientStream stream : goAwayStreams) {
stream.setStatus(status, new Metadata.Trailers());
stream.transportReportStatus(status, new Metadata.Trailers());
}
}
@ -243,7 +243,7 @@ public class OkHttpClientTransport extends AbstractClientTransport {
stream = streams.remove(streamId);
if (stream != null) {
if (status != null) {
stream.setStatus(status, new Metadata.Trailers());
stream.transportReportStatus(status, new Metadata.Trailers());
}
return true;
}

View File

@ -12,7 +12,6 @@ import com.google.net.stubby.transport.ClientStream;
import com.google.net.stubby.transport.ClientStreamListener;
import com.google.net.stubby.transport.ClientTransport;
import com.google.net.stubby.transport.ClientTransportFactory;
import com.google.net.stubby.transport.StreamState;
import java.io.IOException;
import java.io.InputStream;
@ -151,12 +150,9 @@ public class InProcessUtils {
// Return implementation of ClientStream which delegates to the server listener.
return new ClientStream() {
StreamState state = StreamState.OPEN;
@Override
public void cancel() {
cancelled.set(true);
state = StreamState.CLOSED;
serverWorkQueue.execute(new Runnable() {
@Override
public void run() {
@ -167,7 +163,6 @@ public class InProcessUtils {
@Override
public void halfClose() {
state = StreamState.WRITE_ONLY;
serverWorkQueue.execute(new Runnable() {
@Override
public void run() {
@ -176,11 +171,6 @@ public class InProcessUtils {
});
}
@Override
public StreamState state() {
return state;
}
@Override
public void writeMessage(final InputStream message, int length,
@Nullable final Runnable accepted) {
@ -220,11 +210,6 @@ public class InProcessUtils {
// No-op
}
@Override
public StreamState state() {
return StreamState.CLOSED;
}
@Override
public void writeMessage(InputStream message, int length, @Nullable Runnable accepted) {
}