From 9d50299a043ebde0e3cbea4e252cf51540756353 Mon Sep 17 00:00:00 2001 From: ejona Date: Mon, 22 Sep 2014 12:23:19 -0700 Subject: [PATCH] Plumb trailer passing through transport streams. We purposefully avoid going through the (de)framer, since close() behavior is specific to whether on client or server. AbstractClientStream and AbstractServerStream handle mapping the events to appropriate semantics, but require stashing status/trailer for later use. It was very interesting getting to a point where we could support the old and new protocol; that is probably the most detailed-oriented portion of the CL. There are some interface hacks going on, but those will naturally be removed when we trash the gRPC v1 framer. ------------- Created by MOE: http://code.google.com/p/moe-java MOE_MIGRATED_REVID=76092186 --- .../stubby/auth/OAuth2ChannelInterceptor.java | 2 +- .../java/com/google/net/stubby/Metadata.java | 41 ++++++- .../java/com/google/net/stubby/Status.java | 40 +++++- .../stubby/newtransport/AbstractBuffer.java | 2 +- .../newtransport/AbstractClientStream.java | 63 +++++++--- .../newtransport/AbstractServerStream.java | 67 ++++++++-- .../stubby/newtransport/AbstractStream.java | 45 ++++--- .../net/stubby/newtransport/Deframer.java | 8 +- .../net/stubby/newtransport/GrpcDeframer.java | 28 +++-- .../newtransport/InputStreamDeframer.java | 2 +- .../stubby/newtransport/MessageDeframer2.java | 116 +++++++----------- .../newtransport/netty/NettyClientStream.java | 13 +- .../netty/NettyServerHandler.java | 10 -- .../newtransport/netty/NettyServerStream.java | 8 +- .../com/google/net/stubby/MetadataTest.java | 29 ++++- .../stubby/newtransport/GrpcDeframerTest.java | 20 +-- .../newtransport/MessageDeframer2Test.java | 91 ++++++-------- .../netty/NettyServerHandlerTest.java | 4 +- .../netty/NettyServerStreamTest.java | 28 ++--- 19 files changed, 380 insertions(+), 237 deletions(-) diff --git a/auth/src/main/java/com/google/net/stubby/auth/OAuth2ChannelInterceptor.java b/auth/src/main/java/com/google/net/stubby/auth/OAuth2ChannelInterceptor.java index 1459fd70dc..ea7b06f0d0 100644 --- a/auth/src/main/java/com/google/net/stubby/auth/OAuth2ChannelInterceptor.java +++ b/auth/src/main/java/com/google/net/stubby/auth/OAuth2ChannelInterceptor.java @@ -14,7 +14,7 @@ import javax.inject.Provider; /** Channel wrapper that authenticates all calls with OAuth2. */ public class OAuth2ChannelInterceptor extends ForwardingChannel { private static final Metadata.Key AUTHORIZATION = - new Metadata.Key("Authorization", Metadata.STRING_MARSHALLER); + Metadata.Key.of("Authorization", Metadata.STRING_MARSHALLER); private final OAuth2AccessTokenProvider accessTokenProvider; private final Provider authorizationHeaderProvider diff --git a/core/src/main/java/com/google/net/stubby/Metadata.java b/core/src/main/java/com/google/net/stubby/Metadata.java index c757294653..951a69abe6 100644 --- a/core/src/main/java/com/google/net/stubby/Metadata.java +++ b/core/src/main/java/com/google/net/stubby/Metadata.java @@ -87,6 +87,42 @@ public abstract class Metadata { } }; + /** + * Simple metadata marshaller that encodes an integer as a signed decimal string or as big endian + * binary with four bytes. + */ + public static final Marshaller INTEGER_MARSHALLER = new Marshaller() { + @Override + public byte[] toBytes(Integer value) { + return new byte[] { + (byte) (value >>> 24), + (byte) (value >>> 16), + (byte) (value >>> 8), + (byte) (value >>> 0)}; + } + + @Override + public String toAscii(Integer value) { + return value.toString(); + } + + @Override + public Integer parseBytes(byte[] serialized) { + if (serialized.length != 4) { + throw new IllegalArgumentException("Can only deserialize 4 bytes into an integer"); + } + return (serialized[0] << 24) + | (serialized[1] << 16) + | (serialized[2] << 8) + | serialized[3]; + } + + @Override + public Integer parseAscii(String ascii) { + return Integer.valueOf(ascii); + } + }; + private final ListMultimap store; private final boolean serializable; @@ -395,6 +431,9 @@ public abstract class Metadata { * Key for metadata entries. Allows for parsing and serialization of metadata. */ public static class Key { + public static Key of(String name, Marshaller marshaller) { + return new Key(name, marshaller); + } private final String name; private final byte[] asciiName; @@ -403,7 +442,7 @@ public abstract class Metadata { /** * Keys have a name and a marshaller used for serialization. */ - public Key(String name, Marshaller marshaller) { + private Key(String name, Marshaller marshaller) { this.name = Preconditions.checkNotNull(name, "name").intern(); this.asciiName = name.getBytes(StandardCharsets.US_ASCII); this.marshaller = Preconditions.checkNotNull(marshaller); diff --git a/core/src/main/java/com/google/net/stubby/Status.java b/core/src/main/java/com/google/net/stubby/Status.java index 6b6de600f0..61f6ed786f 100644 --- a/core/src/main/java/com/google/net/stubby/Status.java +++ b/core/src/main/java/com/google/net/stubby/Status.java @@ -4,6 +4,8 @@ import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.net.stubby.transport.Transport; +import java.util.logging.Logger; + import javax.annotation.Nullable; import javax.annotation.concurrent.Immutable; @@ -12,9 +14,14 @@ import javax.annotation.concurrent.Immutable; */ @Immutable public class Status { - public static final Status OK = new Status(Transport.Code.OK); public static final Status CANCELLED = new Status(Transport.Code.CANCELLED); + public static final Metadata.Key CODE_KEY + = Metadata.Key.of("grpc-status", new CodeMarshaller()); + public static final Metadata.Key MESSAGE_KEY + = Metadata.Key.of("grpc-message", Metadata.STRING_MARSHALLER); + + private static final Logger log = Logger.getLogger(Status.class.getName()); public static Status fromThrowable(Throwable t) { for (Throwable cause : Throwables.getCausalChain(t)) { @@ -134,4 +141,35 @@ public class Status { builder.append("]"); return builder.toString(); } + + private static class CodeMarshaller implements Metadata.Marshaller { + @Override + public byte[] toBytes(Transport.Code value) { + return Metadata.INTEGER_MARSHALLER.toBytes(value.getNumber()); + } + + @Override + public String toAscii(Transport.Code value) { + return Metadata.INTEGER_MARSHALLER.toAscii(value.getNumber()); + } + + @Override + public Transport.Code parseBytes(byte[] serialized) { + return intToCode(Metadata.INTEGER_MARSHALLER.parseBytes(serialized)); + } + + @Override + public Transport.Code parseAscii(String ascii) { + return intToCode(Metadata.INTEGER_MARSHALLER.parseAscii(ascii)); + } + + private Transport.Code intToCode(Integer i) { + Transport.Code code = Transport.Code.valueOf(i); + if (code == null) { + log.warning("Unknown Code: " + i); + code = Transport.Code.UNKNOWN; + } + return code; + } + } } diff --git a/core/src/main/java/com/google/net/stubby/newtransport/AbstractBuffer.java b/core/src/main/java/com/google/net/stubby/newtransport/AbstractBuffer.java index 639d9db454..f85860d29b 100644 --- a/core/src/main/java/com/google/net/stubby/newtransport/AbstractBuffer.java +++ b/core/src/main/java/com/google/net/stubby/newtransport/AbstractBuffer.java @@ -30,7 +30,7 @@ public abstract class AbstractBuffer implements Buffer { int b2 = readUnsignedByte(); int b3 = readUnsignedByte(); int b4 = readUnsignedByte(); - return (b1 << 24) + (b2 << 16) + (b3 << 8) + b4; + return (b1 << 24) | (b2 << 16) | (b3 << 8) | b4; } @Override diff --git a/core/src/main/java/com/google/net/stubby/newtransport/AbstractClientStream.java b/core/src/main/java/com/google/net/stubby/newtransport/AbstractClientStream.java index 99508f8d8e..0b81babd5a 100644 --- a/core/src/main/java/com/google/net/stubby/newtransport/AbstractClientStream.java +++ b/core/src/main/java/com/google/net/stubby/newtransport/AbstractClientStream.java @@ -5,9 +5,15 @@ import static com.google.net.stubby.newtransport.StreamState.OPEN; import static com.google.net.stubby.newtransport.StreamState.READ_ONLY; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ListenableFuture; import com.google.net.stubby.Metadata; import com.google.net.stubby.Status; +import java.io.InputStream; +import java.nio.ByteBuffer; + +import javax.annotation.concurrent.GuardedBy; + /** * The abstract base class for {@link ClientStream} implementations. */ @@ -15,37 +21,64 @@ public abstract class AbstractClientStream extends AbstractStream implements Cli private final StreamListener listener; + @GuardedBy("stateLock") private Status status; private final Object stateLock = new Object(); private volatile StreamState state = StreamState.OPEN; + private Status stashedStatus; + private Metadata.Trailers stashedTrailers; + protected AbstractClientStream(StreamListener listener) { this.listener = Preconditions.checkNotNull(listener); } @Override - protected final StreamListener listener() { - return listener; + protected ListenableFuture receiveMessage(InputStream is, int length) { + return listener.messageRead(is, length); + } + + /** gRPC protocol v1 support */ + @Override + protected void receiveStatus(Status status) { + Preconditions.checkNotNull(status, "status"); + stashedStatus = status; + stashedTrailers = new Metadata.Trailers(); } /** - * Overrides the behavior of the {@link StreamListener#closed(Status)} method to call - * {@link #setStatus(Status)}, rather than notifying the {@link #listener()} directly. + * If using gRPC v2 protocol, this method must be called with received trailers before notifying + * deframer of end of stream. */ - @Override - protected final StreamListener inboundMessageHandler() { - // Wraps the base handler to get status update. - return new ForwardingStreamListener(super.inboundMessageHandler()) { - @Override - public void closed(Status status, Metadata.Trailers trailers) { - inboundPhase(Phase.STATUS); - // TODO(user): Fix once we switch the wire format to express status in trailers - setStatus(status, new Metadata.Trailers()); - } - }; + public void stashTrailers(Metadata.Trailers trailers) { + Preconditions.checkNotNull(status, "trailers"); + stashedStatus = new Status(trailers.get(Status.CODE_KEY), trailers.get(Status.MESSAGE_KEY)); + trailers.removeAll(Status.CODE_KEY); + trailers.removeAll(Status.MESSAGE_KEY); + stashedTrailers = trailers; } + @Override + protected void remoteEndClosed() { + Preconditions.checkState(stashedStatus != null, "Status and trailers should have been set"); + setStatus(stashedStatus, stashedTrailers); + } + + @Override + protected final void internalSendFrame(ByteBuffer frame, boolean endOfStream) { + sendFrame(frame, endOfStream); + } + + /** + * Sends an outbound frame to the remote end point. + * + * @param frame a buffer containing the chunk of data to be sent. + * @param endOfStream if {@code true} indicates that no more data will be sent on the stream by + * this endpoint. + */ + protected abstract void sendFrame(ByteBuffer frame, boolean endOfStream); + /** * Sets the status if not already set and notifies the stream listener that the stream was closed. * This method must be called from the transport thread. diff --git a/core/src/main/java/com/google/net/stubby/newtransport/AbstractServerStream.java b/core/src/main/java/com/google/net/stubby/newtransport/AbstractServerStream.java index 9fd5a9d5b9..12048448aa 100644 --- a/core/src/main/java/com/google/net/stubby/newtransport/AbstractServerStream.java +++ b/core/src/main/java/com/google/net/stubby/newtransport/AbstractServerStream.java @@ -5,10 +5,14 @@ import static com.google.net.stubby.newtransport.StreamState.OPEN; import static com.google.net.stubby.newtransport.StreamState.WRITE_ONLY; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ListenableFuture; import com.google.net.stubby.Metadata; import com.google.net.stubby.Status; import com.google.net.stubby.transport.Transport; +import java.io.InputStream; +import java.nio.ByteBuffer; + import javax.annotation.concurrent.GuardedBy; /** @@ -26,18 +30,25 @@ public abstract class AbstractServerStream extends AbstractStream implements Ser /** Saved application status for notifying when graceful stream termination completes. */ @GuardedBy("stateLock") private Status gracefulStatus; - @GuardedBy("stateLock") - private Metadata.Trailers gracefulTrailers; - - @Override - protected final StreamListener listener() { - return listener; - } + /** Saved trailers from close() that need to be sent once the framer has sent all messages. */ + private Metadata.Trailers stashedTrailers; public final void setListener(ServerStreamListener listener) { this.listener = Preconditions.checkNotNull(listener, "listener"); } + @Override + protected ListenableFuture receiveMessage(InputStream is, int length) { + inboundPhase(Phase.MESSAGE); + return listener.messageRead(is, length); + } + + /** gRPC protocol v1 support */ + @Override + protected void receiveStatus(Status status) { + Preconditions.checkState(status == Status.OK, "Received status can only be OK on server"); + } + @Override public final void close(Status status, Metadata.Trailers trailers) { Preconditions.checkNotNull(status, "status"); @@ -52,13 +63,48 @@ public abstract class AbstractServerStream extends AbstractStream implements Ser // is notified via complete()). Since there may be large buffers involved, the actual // completion of the RPC could be much later than this call. gracefulStatus = status; - gracefulTrailers = trailers; } } + trailers.removeAll(Status.CODE_KEY); + trailers.removeAll(Status.MESSAGE_KEY); + trailers.put(Status.CODE_KEY, status.getCode()); + if (status.getDescription() != null) { + trailers.put(Status.MESSAGE_KEY, status.getDescription()); + } + this.stashedTrailers = trailers; closeFramer(status); dispose(); } + @Override + protected final void internalSendFrame(ByteBuffer frame, boolean endOfStream) { + if (!GRPC_V2_PROTOCOL) { + sendFrame(frame, endOfStream); + } else { + sendFrame(frame, false); + if (endOfStream) { + sendTrailers(stashedTrailers); + stashedTrailers = null; + } + } + } + + /** + * Sends an outbound frame to the remote end point. + * + * @param frame a buffer containing the chunk of data to be sent. + * @param endOfStream if {@code true} indicates that no more data will be sent on the stream by + * this endpoint. + */ + protected abstract void sendFrame(ByteBuffer frame, boolean endOfStream); + + /** + * Sends trailers to the remote end point. This call implies end of stream. + * + * @param trailers metadata to be sent to end point + */ + protected abstract void sendTrailers(Metadata.Trailers trailers); + /** * The Stream is considered completely closed and there is no further opportunity for error. It * calls the listener's {@code closed()} if it was not already done by {@link #abortStream}. Note @@ -80,7 +126,7 @@ public abstract class AbstractServerStream extends AbstractStream implements Ser new Metadata.Trailers()); throw new IllegalStateException("successful complete() without close()"); } - listener.closed(status, gracefulTrailers); + listener.closed(status, new Metadata.Trailers()); } @Override @@ -91,7 +137,8 @@ public abstract class AbstractServerStream extends AbstractStream implements Ser /** * Called when the remote end half-closes the stream. */ - public final void remoteEndClosed() { + @Override + protected final void remoteEndClosed() { synchronized (stateLock) { Preconditions.checkState(state == OPEN, "Stream not OPEN"); state = WRITE_ONLY; diff --git a/core/src/main/java/com/google/net/stubby/newtransport/AbstractStream.java b/core/src/main/java/com/google/net/stubby/newtransport/AbstractStream.java index f0cfa2a3a4..d4c3bc4253 100644 --- a/core/src/main/java/com/google/net/stubby/newtransport/AbstractStream.java +++ b/core/src/main/java/com/google/net/stubby/newtransport/AbstractStream.java @@ -4,7 +4,6 @@ import com.google.common.base.Preconditions; import com.google.common.io.Closeables; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; -import com.google.net.stubby.Metadata; import com.google.net.stubby.Status; import java.io.InputStream; @@ -41,29 +40,19 @@ public abstract class AbstractStream implements Stream { private final Framer.Sink outboundFrameHandler = new Framer.Sink() { @Override public void deliverFrame(ByteBuffer frame, boolean endOfStream) { - sendFrame(frame, endOfStream); + internalSendFrame(frame, endOfStream); } }; /** - * Internal handler for Deframer output. Informs the {@link #listener()} of inbound messages. + * Internal handler for deframer output. Informs stream of inbound messages. */ - private final StreamListener inboundMessageHandler = new StreamListener() { - - @Override - public ListenableFuture headersRead(Metadata.Headers headers) { - inboundPhase(Phase.HEADERS); - ListenableFuture future = listener().headersRead(headers); - disableWindowUpdate(future); - return future; - } - + private final GrpcDeframer.Sink inboundMessageHandler = new GrpcDeframer.Sink() { @Override public ListenableFuture messageRead(InputStream input, int length) { ListenableFuture future = null; try { - inboundPhase(Phase.MESSAGE); - future = listener().messageRead(input, length); + future = receiveMessage(input, length); disableWindowUpdate(future); return future; } finally { @@ -72,9 +61,13 @@ public abstract class AbstractStream implements Stream { } @Override - public void closed(Status status, Metadata.Trailers trailers) { - inboundPhase(Phase.STATUS); - listener().closed(status, trailers); + public void statusRead(Status status) { + receiveStatus(status); + } + + @Override + public void endOfStream() { + remoteEndClosed(); } }; @@ -129,12 +122,16 @@ public abstract class AbstractStream implements Stream { * @param endOfStream if {@code true} indicates that no more data will be sent on the stream by * this endpoint. */ - protected abstract void sendFrame(ByteBuffer frame, boolean endOfStream); + protected abstract void internalSendFrame(ByteBuffer frame, boolean endOfStream); - /** - * Returns the listener associated to this stream. - */ - protected abstract StreamListener listener(); + /** A message was deframed. */ + protected abstract ListenableFuture receiveMessage(InputStream is, int length); + + /** A status was deframed. */ + protected abstract void receiveStatus(Status status); + + /** Deframer reached end of stream. */ + protected abstract void remoteEndClosed(); /** * If the given future is non-{@code null}, temporarily disables window updates for inbound flow @@ -147,7 +144,7 @@ public abstract class AbstractStream implements Stream { * Gets the internal handler for inbound messages. Subclasses must use this as the target for a * {@link com.google.net.stubby.newtransport.Deframer}. */ - protected StreamListener inboundMessageHandler() { + protected GrpcDeframer.Sink inboundMessageHandler() { return inboundMessageHandler; } diff --git a/core/src/main/java/com/google/net/stubby/newtransport/Deframer.java b/core/src/main/java/com/google/net/stubby/newtransport/Deframer.java index 817ff9422f..dd7d80fa1b 100644 --- a/core/src/main/java/com/google/net/stubby/newtransport/Deframer.java +++ b/core/src/main/java/com/google/net/stubby/newtransport/Deframer.java @@ -2,7 +2,6 @@ package com.google.net.stubby.newtransport; import com.google.common.io.ByteStreams; import com.google.net.stubby.GrpcFramingUtil; -import com.google.net.stubby.Metadata; import com.google.net.stubby.Operation; import com.google.net.stubby.Status; import com.google.net.stubby.transport.Transport; @@ -23,13 +22,13 @@ public abstract class Deframer implements Framer.Sink { */ private static final int LENGTH_NOT_SET = -1; - private final StreamListener target; + private final GrpcDeframer.Sink target; private boolean inFrame; private byte currentFlags; private int currentLength = LENGTH_NOT_SET; private boolean statusDelivered; - public Deframer(StreamListener target) { + public Deframer(GrpcDeframer.Sink target) { this.target = target; } @@ -141,7 +140,8 @@ public abstract class Deframer implements Framer.Sink { } private void writeStatus(Status status) { - target.closed(status, new Metadata.Trailers()); + target.statusRead(status); + target.endOfStream(); statusDelivered = true; } diff --git a/core/src/main/java/com/google/net/stubby/newtransport/GrpcDeframer.java b/core/src/main/java/com/google/net/stubby/newtransport/GrpcDeframer.java index cc1a3062ef..9d55009355 100644 --- a/core/src/main/java/com/google/net/stubby/newtransport/GrpcDeframer.java +++ b/core/src/main/java/com/google/net/stubby/newtransport/GrpcDeframer.java @@ -20,6 +20,9 @@ import java.util.concurrent.Executor; * {@link Decompressor}. */ public class GrpcDeframer implements Closeable { + public interface Sink extends MessageDeframer2.Sink { + void statusRead(Status status); + } private enum State { HEADER, BODY @@ -35,22 +38,22 @@ public class GrpcDeframer implements Closeable { private boolean statusNotified; private boolean endOfStream; private boolean deliveryOutstanding; - private StreamListener listener; + private Sink sink; private CompositeBuffer nextFrame; /** * Constructs the deframer. * * @param decompressor the object used for de-framing GRPC compression frames. - * @param listener the listener for fully read GRPC messages. + * @param sink the sink for fully read GRPC messages. * @param executor the executor to be used for delivery. All calls to * {@link #deframe(Buffer, boolean)} must be made in the context of this executor. This * executor must not allow concurrent access to this class, so it must be either a single * thread or have sequential processing of events. */ - public GrpcDeframer(Decompressor decompressor, StreamListener listener, Executor executor) { + public GrpcDeframer(Decompressor decompressor, Sink sink, Executor executor) { this.decompressor = Preconditions.checkNotNull(decompressor, "decompressor"); - this.listener = Preconditions.checkNotNull(listener, "listener"); + this.sink = Preconditions.checkNotNull(sink, "sink"); this.executor = Preconditions.checkNotNull(executor, "executor"); deliveryTask = new Runnable() { @Override @@ -62,7 +65,7 @@ public class GrpcDeframer implements Closeable { } /** - * Adds the given data to this deframer and attempts delivery to the listener. + * Adds the given data to this deframer and attempts delivery to the sink. */ public void deframe(Buffer data, boolean endOfStream) { Preconditions.checkNotNull(data, "data"); @@ -87,7 +90,7 @@ public class GrpcDeframer implements Closeable { /** * If there is no outstanding delivery, attempts to read and deliver as many messages to the - * listener as possible. Only one outstanding delivery is allowed at a time. + * sink as possible. Only one outstanding delivery is allowed at a time. */ private void deliver() { if (deliveryOutstanding) { @@ -106,11 +109,11 @@ public class GrpcDeframer implements Closeable { processHeader(); break; case BODY: - // Read the body and deliver the message to the listener. + // Read the body and deliver the message to the sink. deliveryOutstanding = true; ListenableFuture processingFuture = processBody(); if (processingFuture != null) { - // A listener was returned for the completion of processing the delivered + // A sink was returned for the completion of processing the delivered // message. Once it's done, try to deliver the next message. processingFuture.addListener(deliveryTask, executor); return; @@ -200,9 +203,9 @@ public class GrpcDeframer implements Closeable { */ private ListenableFuture processMessage() { try { - return listener.messageRead(Buffers.openStream(nextFrame, true), nextFrame.readableBytes()); + return sink.messageRead(Buffers.openStream(nextFrame, true), nextFrame.readableBytes()); } finally { - // Don't close the frame, since the listener is now responsible for the life-cycle. + // Don't close the frame, since the sink is now responsible for the life-cycle. nextFrame = null; } } @@ -223,10 +226,11 @@ public class GrpcDeframer implements Closeable { } /** - * Delivers the status notification to the listener. + * Delivers the status notification to the sink. */ private void notifyStatus(Status status) { statusNotified = true; - listener.closed(status, new Metadata.Trailers()); + sink.statusRead(status); + sink.endOfStream(); } } diff --git a/core/src/main/java/com/google/net/stubby/newtransport/InputStreamDeframer.java b/core/src/main/java/com/google/net/stubby/newtransport/InputStreamDeframer.java index 75895d3de7..fe5aa06364 100644 --- a/core/src/main/java/com/google/net/stubby/newtransport/InputStreamDeframer.java +++ b/core/src/main/java/com/google/net/stubby/newtransport/InputStreamDeframer.java @@ -15,7 +15,7 @@ public class InputStreamDeframer extends Deframer { private final PrefixingInputStream prefixingInputStream; - public InputStreamDeframer(StreamListener target) { + public InputStreamDeframer(GrpcDeframer.Sink target) { super(target); prefixingInputStream = new PrefixingInputStream(4096); } diff --git a/core/src/main/java/com/google/net/stubby/newtransport/MessageDeframer2.java b/core/src/main/java/com/google/net/stubby/newtransport/MessageDeframer2.java index 57a25d36a9..8c37e02895 100644 --- a/core/src/main/java/com/google/net/stubby/newtransport/MessageDeframer2.java +++ b/core/src/main/java/com/google/net/stubby/newtransport/MessageDeframer2.java @@ -3,11 +3,10 @@ package com.google.net.stubby.newtransport; import com.google.common.base.Preconditions; import com.google.common.io.ByteStreams; import com.google.common.util.concurrent.ListenableFuture; -import com.google.net.stubby.Metadata; -import com.google.net.stubby.Status; import java.io.ByteArrayInputStream; import java.io.Closeable; +import java.io.InputStream; import java.io.IOException; import java.util.concurrent.Executor; import java.util.zip.GZIPInputStream; @@ -30,63 +29,17 @@ public class MessageDeframer2 implements Closeable { NONE, GZIP; } + public interface Sink { + public ListenableFuture messageRead(InputStream is, int length); + public void endOfStream(); + } + private enum State { HEADER, BODY } - /** - * Create a deframer for use on the server-side. All calls to this class must be made in the - * context of the provided executor, which also must not allow concurrent processing of Runnables. - * - * @param listener callback for fully read GRPC messages - * @param executor used for internal event processing - */ - public static MessageDeframer2 createOnServer(StreamListener listener, Executor executor) { - return createOnServer(listener, executor, Compression.NONE); - } - - /** - * Create a deframer for use on the server-side. All calls to this class must be made in the - * context of the provided executor, which also must not allow concurrent processing of Runnables. - * - * @param listener callback for fully read GRPC messages - * @param executor used for internal event processing - * @param compression the compression used if a compressed frame is encountered, with NONE meaning - * unsupported - */ - public static MessageDeframer2 createOnServer(StreamListener listener, Executor executor, - Compression compression) { - return new MessageDeframer2(listener, executor, false, compression); - } - - /** - * Create a deframer for use on the client-side. All calls to this class must be made in the - * context of the provided executor, which also must not allow concurrent processing of Runnables. - * - * @param listener callback for fully read GRPC messages - * @param executor used for internal event processing - */ - public static MessageDeframer2 createOnClient(StreamListener listener, Executor executor) { - return createOnClient(listener, executor, Compression.NONE); - } - - /** - * Create a deframer for use on the client-side. All calls to this class must be made in the - * context of the provided executor, which also must not allow concurrent processing of Runnables. - * - * @param listener callback for fully read GRPC messages - * @param executor used for internal event processing - * @param compression the compression used if a compressed frame is encountered, with NONE meaning - * unsupported - */ - public static MessageDeframer2 createOnClient(StreamListener listener, Executor executor, - Compression compression) { - return new MessageDeframer2(listener, executor, true, compression); - } - - private final StreamListener listener; + private final Sink sink; private final Executor executor; - private final boolean client; private final Compression compression; private final Runnable deliveryTask = new Runnable() { @Override @@ -103,16 +56,35 @@ public class MessageDeframer2 implements Closeable { private CompositeBuffer nextFrame; private CompositeBuffer unprocessed = new CompositeBuffer(); - private MessageDeframer2(StreamListener listener, Executor executor, boolean client, - Compression compression) { - this.listener = Preconditions.checkNotNull(listener, "listener"); + /** + * Create a deframer. All calls to this class must be made in the context of the provided + * executor, which also must not allow concurrent processing of Runnables. Compression will not + * be supported. + * + * @param sink callback for fully read GRPC messages + * @param executor used for internal event processing + */ + public MessageDeframer2(Sink sink, Executor executor) { + this(sink, executor, Compression.NONE); + } + + /** + * Create a deframer. All calls to this class must be made in the context of the provided + * executor, which also must not allow concurrent processing of Runnables. + * + * @param sink callback for fully read GRPC messages + * @param executor used for internal event processing + * @param compression the compression used if a compressed frame is encountered, with NONE meaning + * unsupported + */ + public MessageDeframer2(Sink sink, Executor executor, Compression compression) { + this.sink = Preconditions.checkNotNull(sink, "sink"); this.executor = Preconditions.checkNotNull(executor, "executor"); - this.client = client; this.compression = Preconditions.checkNotNull(compression, "compression"); } /** - * Adds the given data to this deframer and attempts delivery to the listener. + * Adds the given data to this deframer and attempts delivery to the sink. */ public void deframe(Buffer data, boolean endOfStream) { Preconditions.checkNotNull(data, "data"); @@ -134,9 +106,18 @@ public class MessageDeframer2 implements Closeable { } } + public void delayProcessing(ListenableFuture future) { + Preconditions.checkState(!deliveryOutstanding, "Only one delay allowed concurrently"); + if (future != null) { + deliveryOutstanding = true; + // Once future completes, try to deliver the next message. + future.addListener(deliveryTask, executor); + } + } + /** * If there is no outstanding delivery, attempts to read and deliver as many messages to the - * listener as possible. Only one outstanding delivery is allowed at a time. + * sink as possible. Only one outstanding delivery is allowed at a time. */ private void deliver() { if (deliveryOutstanding) { @@ -151,11 +132,11 @@ public class MessageDeframer2 implements Closeable { processHeader(); break; case BODY: - // Read the body and deliver the message to the listener. + // Read the body and deliver the message to the sink. deliveryOutstanding = true; ListenableFuture processingFuture = processBody(); if (processingFuture != null) { - // A listener was returned for the completion of processing the delivered + // A future was returned for the completion of processing the delivered // message. Once it's done, try to deliver the next message. processingFuture.addListener(deliveryTask, executor); return; @@ -175,10 +156,7 @@ public class MessageDeframer2 implements Closeable { // application is properly notified of abortion. throw new RuntimeException("Encountered end-of-stream mid-frame"); } - if (!client) { - // If on the server-side, we need to notify application of half-close. - listener.closed(Status.OK, new Metadata.Trailers()); - } + sink.endOfStream(); } } @@ -241,13 +219,13 @@ public class MessageDeframer2 implements Closeable { } catch (IOException ex) { throw new RuntimeException(ex); } - future = listener.messageRead(new ByteArrayInputStream(bytes), bytes.length); + future = sink.messageRead(new ByteArrayInputStream(bytes), bytes.length); } else { throw new AssertionError("Unknown compression type"); } } else { - // Don't close the frame, since the listener is now responsible for the life-cycle. - future = listener.messageRead(Buffers.openStream(nextFrame, true), nextFrame.readableBytes()); + // Don't close the frame, since the sink is now responsible for the life-cycle. + future = sink.messageRead(Buffers.openStream(nextFrame, true), nextFrame.readableBytes()); nextFrame = null; } diff --git a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientStream.java b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientStream.java index 00ee76ce98..1d990c3612 100644 --- a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientStream.java +++ b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientStream.java @@ -50,8 +50,7 @@ class NettyClientStream extends AbstractClientStream implements NettyStream { this.deframer2 = null; } else { this.deframer = null; - this.deframer2 = MessageDeframer2.createOnClient( - inboundMessageHandler(), channel.eventLoop()); + this.deframer2 = new MessageDeframer2(inboundMessageHandler(), channel.eventLoop()); } windowUpdateManager = new WindowUpdateManager(channel, inboundFlow); } @@ -83,8 +82,14 @@ class NettyClientStream extends AbstractClientStream implements NettyStream { public void inboundHeadersRecieved(Http2Headers headers, boolean endOfStream) { responseCode = responseCode(headers); isGrpcResponse = isGrpcResponse(headers, responseCode); - if (!isGrpcResponse && endOfStream) { - setStatus(new Status(responseCode), new Metadata.Trailers()); + if (endOfStream) { + if (isGrpcResponse) { + // TODO(user): call stashTrailers() as appropriate, then provide endOfStream to + // deframer. + setStatus(new Status(responseCode), new Metadata.Trailers()); + } else { + setStatus(new Status(responseCode), new Metadata.Trailers()); + } } } diff --git a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyServerHandler.java b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyServerHandler.java index 3193e47d96..e023596956 100644 --- a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyServerHandler.java +++ b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyServerHandler.java @@ -57,16 +57,6 @@ class NettyServerHandler extends AbstractHttp2ConnectionHandler { super(connection, frameReader, frameWriter, inboundFlow, outboundFlow); this.transportListener = Preconditions.checkNotNull(transportListener, "transportListener"); this.inboundFlow = Preconditions.checkNotNull(inboundFlow, "inboundFlow"); - - // Observe the HTTP/2 connection for events. - connection.addListener(new Http2ConnectionAdapter() { - @Override - public void streamHalfClosed(Http2Stream stream) { - if (stream.state() == Http2Stream.State.HALF_CLOSED_REMOTE) { - serverStream(stream).remoteEndClosed(); - } - } - }); } @Override diff --git a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyServerStream.java b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyServerStream.java index e33bce90b4..cc5c1be028 100644 --- a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyServerStream.java +++ b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyServerStream.java @@ -2,6 +2,7 @@ package com.google.net.stubby.newtransport.netty; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ListenableFuture; +import com.google.net.stubby.Metadata; import com.google.net.stubby.newtransport.AbstractServerStream; import com.google.net.stubby.newtransport.GrpcDeframer; import com.google.net.stubby.newtransport.MessageDeframer2; @@ -35,7 +36,7 @@ class NettyServerStream extends AbstractServerStream implements NettyStream { deframer2 = null; } else { deframer = null; - deframer2 = MessageDeframer2.createOnServer(inboundMessageHandler(), channel.eventLoop()); + deframer2 = new MessageDeframer2(inboundMessageHandler(), channel.eventLoop()); } windowUpdateManager = new WindowUpdateManager(channel, Preconditions.checkNotNull(inboundFlow, "inboundFlow")); @@ -68,6 +69,11 @@ class NettyServerStream extends AbstractServerStream implements NettyStream { channel.writeAndFlush(cmd); } + @Override + protected void sendTrailers(Metadata.Trailers trailers) { + // TODO(user): send trailers + } + @Override public int id() { return id; diff --git a/core/src/test/java/com/google/net/stubby/MetadataTest.java b/core/src/test/java/com/google/net/stubby/MetadataTest.java index a0f79a62fa..12fbc900c5 100644 --- a/core/src/test/java/com/google/net/stubby/MetadataTest.java +++ b/core/src/test/java/com/google/net/stubby/MetadataTest.java @@ -6,6 +6,8 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import com.google.common.primitives.Bytes; + import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -43,7 +45,7 @@ public class MetadataTest { private static final String LANCE = "lance"; private static final byte[] LANCE_BYTES = LANCE.getBytes(StandardCharsets.US_ASCII); - private static final Metadata.Key KEY = new Metadata.Key("test", FISH_MARSHALLER); + private static final Metadata.Key KEY = Metadata.Key.of("test", FISH_MARSHALLER); @Test public void testWriteParsed() { @@ -108,6 +110,31 @@ public class MetadataTest { assertEquals("authority", h1.getAuthority()); } + @Test + public void integerMarshallerBytesIsBigEndian() { + assertEquals(Bytes.asList(new byte[] {0x12, 0x34, 0x56, 0x78}), + Bytes.asList(Metadata.INTEGER_MARSHALLER.toBytes(0x12345678))); + } + + @Test + public void integerMarshallerAsciiIsDecimal() { + assertEquals("12345678", Metadata.INTEGER_MARSHALLER.toAscii(12345678)); + } + + @Test + public void roundTripIntegerMarshaller() { + roundTripInteger(0); + roundTripInteger(1); + roundTripInteger(-1); + roundTripInteger(0x12345678); + roundTripInteger(0x87654321); + } + + private void roundTripInteger(Integer i) { + assertEquals(i, Metadata.INTEGER_MARSHALLER.parseBytes(Metadata.INTEGER_MARSHALLER.toBytes(i))); + assertEquals(i, Metadata.INTEGER_MARSHALLER.parseAscii(Metadata.INTEGER_MARSHALLER.toAscii(i))); + } + private static class Fish { private String name; diff --git a/core/src/test/java/com/google/net/stubby/newtransport/GrpcDeframerTest.java b/core/src/test/java/com/google/net/stubby/newtransport/GrpcDeframerTest.java index f360f47cde..d21005dff1 100644 --- a/core/src/test/java/com/google/net/stubby/newtransport/GrpcDeframerTest.java +++ b/core/src/test/java/com/google/net/stubby/newtransport/GrpcDeframerTest.java @@ -51,7 +51,7 @@ public class GrpcDeframerTest { private StubDecompressor decompressor; @Mock - private StreamListener listener; + private GrpcDeframer.Sink sink; private SettableFuture messageFuture; @@ -59,10 +59,10 @@ public class GrpcDeframerTest { public void setup() { MockitoAnnotations.initMocks(this); messageFuture = SettableFuture.create(); - when(listener.messageRead(any(InputStream.class), anyInt())).thenReturn(messageFuture); + when(sink.messageRead(any(InputStream.class), anyInt())).thenReturn(messageFuture); decompressor = new StubDecompressor(); - reader = new GrpcDeframer(decompressor, listener, MoreExecutors.directExecutor()); + reader = new GrpcDeframer(decompressor, sink, MoreExecutors.directExecutor()); } @Test @@ -135,7 +135,7 @@ public class GrpcDeframerTest { byte[] fullBuffer = Arrays.copyOf(frame, frame.length * 2); System.arraycopy(frame, 0, fullBuffer, frame.length, frame.length); - // Use only a portion of the frame. Should not call the listener. + // Use only a portion of the frame. Should not call the sink. int startIx = 0; int endIx = 10; byte[] chunk = Arrays.copyOfRange(fullBuffer, startIx, endIx); @@ -144,7 +144,7 @@ public class GrpcDeframerTest { verifyNoPayload(); verifyNoStatus(); - // Supply the rest of the frame and a portion of a second frame. Should call the listener. + // Supply the rest of the frame and a portion of a second frame. Should call the sink. startIx = endIx; endIx = startIx + frame.length; chunk = Arrays.copyOfRange(fullBuffer, startIx, endIx); @@ -156,7 +156,7 @@ public class GrpcDeframerTest { private void verifyPayload() { ArgumentCaptor captor = ArgumentCaptor.forClass(InputStream.class); - verify(listener).messageRead(captor.capture(), eq(MESSAGE.length())); + verify(sink).messageRead(captor.capture(), eq(MESSAGE.length())); assertEquals(MESSAGE, readString(captor.getValue(), MESSAGE.length())); } @@ -176,16 +176,18 @@ public class GrpcDeframerTest { private void verifyStatus(Transport.Code code) { ArgumentCaptor captor = ArgumentCaptor.forClass(Status.class); - verify(listener).closed(captor.capture(), notNull(Metadata.Trailers.class)); + verify(sink).statusRead(captor.capture()); + verify(sink).endOfStream(); assertEquals(code, captor.getValue().getCode()); } private void verifyNoPayload() { - verify(listener, never()).messageRead(any(InputStream.class), anyInt()); + verify(sink, never()).messageRead(any(InputStream.class), anyInt()); } private void verifyNoStatus() { - verify(listener, never()).closed(any(Status.class), notNull(Metadata.Trailers.class)); + verify(sink, never()).statusRead(any(Status.class)); + verify(sink, never()).endOfStream(); } private static byte[] payloadFrame() throws IOException { diff --git a/core/src/test/java/com/google/net/stubby/newtransport/MessageDeframer2Test.java b/core/src/test/java/com/google/net/stubby/newtransport/MessageDeframer2Test.java index 4f5d11696c..719548ee5e 100644 --- a/core/src/test/java/com/google/net/stubby/newtransport/MessageDeframer2Test.java +++ b/core/src/test/java/com/google/net/stubby/newtransport/MessageDeframer2Test.java @@ -16,6 +16,7 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import com.google.net.stubby.Metadata; import com.google.net.stubby.Status; +import com.google.net.stubby.newtransport.MessageDeframer2.Sink; import org.junit.Test; import org.junit.runner.RunWith; @@ -34,133 +35,121 @@ import java.io.InputStream; */ @RunWith(JUnit4.class) public class MessageDeframer2Test { - private StreamListener listener = mock(StreamListener.class); - private MessageDeframer2 deframer - = MessageDeframer2.createOnClient(listener, MoreExecutors.directExecutor()); + private Sink sink = mock(Sink.class); + private MessageDeframer2 deframer = new MessageDeframer2(sink, MoreExecutors.directExecutor()); private ArgumentCaptor messages = ArgumentCaptor.forClass(InputStream.class); @Test public void simplePayload() { deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 2, 3, 14}), false); - verify(listener).messageRead(messages.capture(), eq(2)); + verify(sink).messageRead(messages.capture(), eq(2)); assertEquals(Bytes.asList(new byte[] {3, 14}), bytes(messages)); - verifyNoMoreInteractions(listener); + verifyNoMoreInteractions(sink); } @Test public void smallCombinedPayloads() { deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3, 0, 0, 0, 0, 2, 14, 15}), false); - verify(listener).messageRead(messages.capture(), eq(1)); + verify(sink).messageRead(messages.capture(), eq(1)); assertEquals(Bytes.asList(new byte[] {3}), bytes(messages)); - verify(listener).messageRead(messages.capture(), eq(2)); + verify(sink).messageRead(messages.capture(), eq(2)); assertEquals(Bytes.asList(new byte[] {14, 15}), bytes(messages)); - verifyNoMoreInteractions(listener); + verifyNoMoreInteractions(sink); } @Test - public void clientEndOfStreamShouldNotNotifyClose() { + public void endOfStreamWithPayloadShouldNotifyEndOfStream() { deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3}), true); - verify(listener).messageRead(messages.capture(), eq(1)); + verify(sink).messageRead(messages.capture(), eq(1)); assertEquals(Bytes.asList(new byte[] {3}), bytes(messages)); - verifyNoMoreInteractions(listener); + verify(sink).endOfStream(); + verifyNoMoreInteractions(sink); } @Test - public void serverEndOfStreamWithPayloadShouldNotifyClose() { - deframer = MessageDeframer2.createOnServer(listener, MoreExecutors.directExecutor()); - deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3}), true); - verify(listener).messageRead(messages.capture(), eq(1)); - assertEquals(Bytes.asList(new byte[] {3}), bytes(messages)); - verify(listener).closed(eq(Status.OK), notNull(Metadata.Trailers.class)); - verifyNoMoreInteractions(listener); - } - - @Test - public void serverEndOfStreamShouldNotifyClose() { - deframer = MessageDeframer2.createOnServer(listener, MoreExecutors.directExecutor()); + public void endOfStreamShouldNotifyEndOfStream() { deframer.deframe(buffer(new byte[0]), true); - verify(listener).closed(eq(Status.OK), notNull(Metadata.Trailers.class)); - verifyNoMoreInteractions(listener); + verify(sink).endOfStream(); + verifyNoMoreInteractions(sink); } @Test public void payloadSplitBetweenBuffers() { deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 7, 3, 14, 1, 5, 9}), false); - verifyNoMoreInteractions(listener); + verifyNoMoreInteractions(sink); deframer.deframe(buffer(new byte[] {2, 6}), false); - verify(listener).messageRead(messages.capture(), eq(7)); + verify(sink).messageRead(messages.capture(), eq(7)); assertEquals(Bytes.asList(new byte[] {3, 14, 1, 5, 9, 2, 6}), bytes(messages)); - verifyNoMoreInteractions(listener); + verifyNoMoreInteractions(sink); } @Test public void frameHeaderSplitBetweenBuffers() { deframer.deframe(buffer(new byte[] {0, 0}), false); - verifyNoMoreInteractions(listener); + verifyNoMoreInteractions(sink); deframer.deframe(buffer(new byte[] {0, 0, 1, 3}), false); - verify(listener).messageRead(messages.capture(), eq(1)); + verify(sink).messageRead(messages.capture(), eq(1)); assertEquals(Bytes.asList(new byte[] {3}), bytes(messages)); - verifyNoMoreInteractions(listener); + verifyNoMoreInteractions(sink); } @Test public void emptyPayload() { deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 0}), false); - verify(listener).messageRead(messages.capture(), eq(0)); + verify(sink).messageRead(messages.capture(), eq(0)); assertEquals(Bytes.asList(), bytes(messages)); - verifyNoMoreInteractions(listener); + verifyNoMoreInteractions(sink); } @Test public void largerFrameSize() { deframer.deframe( Buffers.wrap(Bytes.concat(new byte[] {0, 0, 0, 3, (byte) 232}, new byte[1000])), false); - verify(listener).messageRead(messages.capture(), eq(1000)); + verify(sink).messageRead(messages.capture(), eq(1000)); assertEquals(Bytes.asList(new byte[1000]), bytes(messages)); - verifyNoMoreInteractions(listener); + verifyNoMoreInteractions(sink); } @Test public void payloadCallbackShouldWaitForFutureCompletion() { SettableFuture messageFuture = SettableFuture.create(); - when(listener.messageRead(any(InputStream.class), eq(1))).thenReturn(messageFuture); + when(sink.messageRead(any(InputStream.class), eq(1))).thenReturn(messageFuture); deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3, 0, 0, 0, 0, 2, 14, 15}), false); - verify(listener).messageRead(messages.capture(), eq(1)); + verify(sink).messageRead(messages.capture(), eq(1)); assertEquals(Bytes.asList(new byte[] {3}), bytes(messages)); - verifyNoMoreInteractions(listener); + verifyNoMoreInteractions(sink); messageFuture.set(null); - verify(listener).messageRead(messages.capture(), eq(2)); + verify(sink).messageRead(messages.capture(), eq(2)); assertEquals(Bytes.asList(new byte[] {14, 15}), bytes(messages)); - verifyNoMoreInteractions(listener); + verifyNoMoreInteractions(sink); } @Test - public void serverClosedCallbackShouldWaitForFutureCompletion() { - deframer = MessageDeframer2.createOnServer(listener, MoreExecutors.directExecutor()); + public void endOfStreamCallbackShouldWaitForFutureCompletion() { SettableFuture messageFuture = SettableFuture.create(); - when(listener.messageRead(any(InputStream.class), eq(1))).thenReturn(messageFuture); + when(sink.messageRead(any(InputStream.class), eq(1))).thenReturn(messageFuture); deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3}), true); - verify(listener).messageRead(messages.capture(), eq(1)); + verify(sink).messageRead(messages.capture(), eq(1)); assertEquals(Bytes.asList(new byte[] {3}), bytes(messages)); - verifyNoMoreInteractions(listener); + verifyNoMoreInteractions(sink); messageFuture.set(null); - verify(listener).closed(eq(Status.OK), notNull(Metadata.Trailers.class)); - verifyNoMoreInteractions(listener); + verify(sink).endOfStream(); + verifyNoMoreInteractions(sink); } @Test public void compressed() { - deframer = MessageDeframer2.createOnClient( - listener, MoreExecutors.directExecutor(), MessageDeframer2.Compression.GZIP); + deframer = new MessageDeframer2( + sink, MoreExecutors.directExecutor(), MessageDeframer2.Compression.GZIP); byte[] payload = compress(new byte[1000]); assertTrue(payload.length < 100); byte[] header = new byte[] {1, 0, 0, 0, (byte) payload.length}; deframer.deframe(buffer(Bytes.concat(header, payload)), false); - verify(listener).messageRead(messages.capture(), eq(1000)); + verify(sink).messageRead(messages.capture(), eq(1000)); assertEquals(Bytes.asList(new byte[1000]), bytes(messages)); - verifyNoMoreInteractions(listener); + verifyNoMoreInteractions(sink); } private static List bytes(ArgumentCaptor captor) { diff --git a/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyServerHandlerTest.java b/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyServerHandlerTest.java index 45f7e2f630..bd5b68b6ea 100644 --- a/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyServerHandlerTest.java +++ b/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyServerHandlerTest.java @@ -138,7 +138,7 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase { assertArrayEquals(CONTENT, ByteStreams.toByteArray(captor.getValue())); if (endStream) { - verify(streamListener).closed(eq(Status.OK), notNull(Metadata.Trailers.class)); + verify(streamListener).halfClosed(); } verifyNoMoreInteractions(streamListener); } @@ -149,7 +149,7 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase { handler.channelRead(ctx, emptyDataFrame(STREAM_ID, true)); verify(streamListener, never()).messageRead(any(InputStream.class), anyInt()); - verify(streamListener).closed(eq(Status.OK), notNull(Metadata.Trailers.class)); + verify(streamListener).halfClosed(); verifyNoMoreInteractions(streamListener); } diff --git a/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyServerStreamTest.java b/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyServerStreamTest.java index 5fc6eb71da..40ce287405 100644 --- a/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyServerStreamTest.java +++ b/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyServerStreamTest.java @@ -2,6 +2,7 @@ package com.google.net.stubby.newtransport.netty; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; +import static org.mockito.Matchers.eq; import static org.mockito.Matchers.notNull; import static org.mockito.Matchers.same; import static org.mockito.Mockito.never; @@ -15,6 +16,9 @@ import com.google.net.stubby.newtransport.ServerStreamListener; import com.google.net.stubby.newtransport.StreamState; import com.google.net.stubby.transport.Transport; +import io.netty.buffer.EmptyByteBuf; +import io.netty.buffer.UnpooledByteBufAllocator; + import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -57,7 +61,7 @@ public class NettyServerStreamTest extends NettyStreamTestBase { verifyZeroInteractions(serverListener); // Sending complete. Listener gets closed() stream().complete(); - verify(serverListener).closed(Status.CANCELLED, trailers); + verify(serverListener).closed(eq(Status.CANCELLED), notNull(Metadata.Trailers.class)); assertEquals(StreamState.CLOSED, stream.state()); verifyZeroInteractions(serverListener); } @@ -65,7 +69,7 @@ public class NettyServerStreamTest extends NettyStreamTestBase { @Test public void closeAfterClientHalfCloseShouldSucceed() throws Exception { // Client half-closes. Listener gets halfClosed() - stream().remoteEndClosed(); + stream().inboundDataReceived(new EmptyByteBuf(UnpooledByteBufAllocator.DEFAULT), true); assertEquals(StreamState.WRITE_ONLY, stream.state()); verify(serverListener).halfClosed(); // Server closes. Status sent @@ -76,23 +80,7 @@ public class NettyServerStreamTest extends NettyStreamTestBase { new SendGrpcFrameCommand(STREAM_ID, statusFrame(Status.OK), true)); // Sending and receiving complete. Listener gets closed() stream().complete(); - verify(serverListener).closed(Status.OK, trailers); - verifyNoMoreInteractions(serverListener); - } - - @Test - public void clientHalfCloseForTheSecondTimeShouldFail() throws Exception { - // Client half-closes. Listener gets halfClosed() - stream().remoteEndClosed(); - assertEquals(StreamState.WRITE_ONLY, stream.state()); - verify(serverListener).halfClosed(); - // Client half-closes again. - try { - stream().remoteEndClosed(); - fail(); - } catch (IllegalStateException expected) { - } - assertEquals(StreamState.WRITE_ONLY, stream.state()); + verify(serverListener).closed(eq(Status.OK), notNull(Metadata.Trailers.class)); verifyNoMoreInteractions(serverListener); } @@ -121,7 +109,7 @@ public class NettyServerStreamTest extends NettyStreamTestBase { public void abortStreamAfterClientHalfCloseShouldCallClose() { Status status = new Status(Transport.Code.INTERNAL, new Throwable()); // Client half-closes. Listener gets halfClosed() - stream().remoteEndClosed(); + stream().inboundDataReceived(new EmptyByteBuf(UnpooledByteBufAllocator.DEFAULT), true); assertEquals(StreamState.WRITE_ONLY, stream.state()); verify(serverListener).halfClosed(); // Abort