Plumb trailer passing through transport streams.

We purposefully avoid going through the (de)framer, since close()
behavior is specific to whether on client or server.
AbstractClientStream and AbstractServerStream handle mapping the events
to appropriate semantics, but require stashing status/trailer for later
use.

It was very interesting getting to a point where we could support the old
and new protocol; that is probably the most detailed-oriented portion of
the CL. There are some interface hacks going on, but those will
naturally be removed when we trash the gRPC v1 framer.
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=76092186
This commit is contained in:
ejona 2014-09-22 12:23:19 -07:00 committed by Eric Anderson
parent fc7a052c07
commit 9d50299a04
19 changed files with 380 additions and 237 deletions

View File

@ -14,7 +14,7 @@ import javax.inject.Provider;
/** Channel wrapper that authenticates all calls with OAuth2. */
public class OAuth2ChannelInterceptor extends ForwardingChannel {
private static final Metadata.Key<String> AUTHORIZATION =
new Metadata.Key<String>("Authorization", Metadata.STRING_MARSHALLER);
Metadata.Key.of("Authorization", Metadata.STRING_MARSHALLER);
private final OAuth2AccessTokenProvider accessTokenProvider;
private final Provider<String> authorizationHeaderProvider

View File

@ -87,6 +87,42 @@ public abstract class Metadata<S extends Metadata> {
}
};
/**
* Simple metadata marshaller that encodes an integer as a signed decimal string or as big endian
* binary with four bytes.
*/
public static final Marshaller<Integer> INTEGER_MARSHALLER = new Marshaller<Integer>() {
@Override
public byte[] toBytes(Integer value) {
return new byte[] {
(byte) (value >>> 24),
(byte) (value >>> 16),
(byte) (value >>> 8),
(byte) (value >>> 0)};
}
@Override
public String toAscii(Integer value) {
return value.toString();
}
@Override
public Integer parseBytes(byte[] serialized) {
if (serialized.length != 4) {
throw new IllegalArgumentException("Can only deserialize 4 bytes into an integer");
}
return (serialized[0] << 24)
| (serialized[1] << 16)
| (serialized[2] << 8)
| serialized[3];
}
@Override
public Integer parseAscii(String ascii) {
return Integer.valueOf(ascii);
}
};
private final ListMultimap<String, MetadataEntry> store;
private final boolean serializable;
@ -395,6 +431,9 @@ public abstract class Metadata<S extends Metadata> {
* Key for metadata entries. Allows for parsing and serialization of metadata.
*/
public static class Key<T> {
public static <T> Key<T> of(String name, Marshaller<T> marshaller) {
return new Key<T>(name, marshaller);
}
private final String name;
private final byte[] asciiName;
@ -403,7 +442,7 @@ public abstract class Metadata<S extends Metadata> {
/**
* Keys have a name and a marshaller used for serialization.
*/
public Key(String name, Marshaller<T> marshaller) {
private Key(String name, Marshaller<T> marshaller) {
this.name = Preconditions.checkNotNull(name, "name").intern();
this.asciiName = name.getBytes(StandardCharsets.US_ASCII);
this.marshaller = Preconditions.checkNotNull(marshaller);

View File

@ -4,6 +4,8 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.net.stubby.transport.Transport;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;
@ -12,9 +14,14 @@ import javax.annotation.concurrent.Immutable;
*/
@Immutable
public class Status {
public static final Status OK = new Status(Transport.Code.OK);
public static final Status CANCELLED = new Status(Transport.Code.CANCELLED);
public static final Metadata.Key<Transport.Code> CODE_KEY
= Metadata.Key.of("grpc-status", new CodeMarshaller());
public static final Metadata.Key<String> MESSAGE_KEY
= Metadata.Key.of("grpc-message", Metadata.STRING_MARSHALLER);
private static final Logger log = Logger.getLogger(Status.class.getName());
public static Status fromThrowable(Throwable t) {
for (Throwable cause : Throwables.getCausalChain(t)) {
@ -134,4 +141,35 @@ public class Status {
builder.append("]");
return builder.toString();
}
private static class CodeMarshaller implements Metadata.Marshaller<Transport.Code> {
@Override
public byte[] toBytes(Transport.Code value) {
return Metadata.INTEGER_MARSHALLER.toBytes(value.getNumber());
}
@Override
public String toAscii(Transport.Code value) {
return Metadata.INTEGER_MARSHALLER.toAscii(value.getNumber());
}
@Override
public Transport.Code parseBytes(byte[] serialized) {
return intToCode(Metadata.INTEGER_MARSHALLER.parseBytes(serialized));
}
@Override
public Transport.Code parseAscii(String ascii) {
return intToCode(Metadata.INTEGER_MARSHALLER.parseAscii(ascii));
}
private Transport.Code intToCode(Integer i) {
Transport.Code code = Transport.Code.valueOf(i);
if (code == null) {
log.warning("Unknown Code: " + i);
code = Transport.Code.UNKNOWN;
}
return code;
}
}
}

View File

@ -30,7 +30,7 @@ public abstract class AbstractBuffer implements Buffer {
int b2 = readUnsignedByte();
int b3 = readUnsignedByte();
int b4 = readUnsignedByte();
return (b1 << 24) + (b2 << 16) + (b3 << 8) + b4;
return (b1 << 24) | (b2 << 16) | (b3 << 8) | b4;
}
@Override

View File

@ -5,9 +5,15 @@ import static com.google.net.stubby.newtransport.StreamState.OPEN;
import static com.google.net.stubby.newtransport.StreamState.READ_ONLY;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status;
import java.io.InputStream;
import java.nio.ByteBuffer;
import javax.annotation.concurrent.GuardedBy;
/**
* The abstract base class for {@link ClientStream} implementations.
*/
@ -15,37 +21,64 @@ public abstract class AbstractClientStream extends AbstractStream implements Cli
private final StreamListener listener;
@GuardedBy("stateLock")
private Status status;
private final Object stateLock = new Object();
private volatile StreamState state = StreamState.OPEN;
private Status stashedStatus;
private Metadata.Trailers stashedTrailers;
protected AbstractClientStream(StreamListener listener) {
this.listener = Preconditions.checkNotNull(listener);
}
@Override
protected final StreamListener listener() {
return listener;
protected ListenableFuture<Void> receiveMessage(InputStream is, int length) {
return listener.messageRead(is, length);
}
/** gRPC protocol v1 support */
@Override
protected void receiveStatus(Status status) {
Preconditions.checkNotNull(status, "status");
stashedStatus = status;
stashedTrailers = new Metadata.Trailers();
}
/**
* Overrides the behavior of the {@link StreamListener#closed(Status)} method to call
* {@link #setStatus(Status)}, rather than notifying the {@link #listener()} directly.
* If using gRPC v2 protocol, this method must be called with received trailers before notifying
* deframer of end of stream.
*/
@Override
protected final StreamListener inboundMessageHandler() {
// Wraps the base handler to get status update.
return new ForwardingStreamListener(super.inboundMessageHandler()) {
@Override
public void closed(Status status, Metadata.Trailers trailers) {
inboundPhase(Phase.STATUS);
// TODO(user): Fix once we switch the wire format to express status in trailers
setStatus(status, new Metadata.Trailers());
public void stashTrailers(Metadata.Trailers trailers) {
Preconditions.checkNotNull(status, "trailers");
stashedStatus = new Status(trailers.get(Status.CODE_KEY), trailers.get(Status.MESSAGE_KEY));
trailers.removeAll(Status.CODE_KEY);
trailers.removeAll(Status.MESSAGE_KEY);
stashedTrailers = trailers;
}
};
@Override
protected void remoteEndClosed() {
Preconditions.checkState(stashedStatus != null, "Status and trailers should have been set");
setStatus(stashedStatus, stashedTrailers);
}
@Override
protected final void internalSendFrame(ByteBuffer frame, boolean endOfStream) {
sendFrame(frame, endOfStream);
}
/**
* Sends an outbound frame to the remote end point.
*
* @param frame a buffer containing the chunk of data to be sent.
* @param endOfStream if {@code true} indicates that no more data will be sent on the stream by
* this endpoint.
*/
protected abstract void sendFrame(ByteBuffer frame, boolean endOfStream);
/**
* Sets the status if not already set and notifies the stream listener that the stream was closed.
* This method must be called from the transport thread.

View File

@ -5,10 +5,14 @@ import static com.google.net.stubby.newtransport.StreamState.OPEN;
import static com.google.net.stubby.newtransport.StreamState.WRITE_ONLY;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status;
import com.google.net.stubby.transport.Transport;
import java.io.InputStream;
import java.nio.ByteBuffer;
import javax.annotation.concurrent.GuardedBy;
/**
@ -26,18 +30,25 @@ public abstract class AbstractServerStream extends AbstractStream implements Ser
/** Saved application status for notifying when graceful stream termination completes. */
@GuardedBy("stateLock")
private Status gracefulStatus;
@GuardedBy("stateLock")
private Metadata.Trailers gracefulTrailers;
@Override
protected final StreamListener listener() {
return listener;
}
/** Saved trailers from close() that need to be sent once the framer has sent all messages. */
private Metadata.Trailers stashedTrailers;
public final void setListener(ServerStreamListener listener) {
this.listener = Preconditions.checkNotNull(listener, "listener");
}
@Override
protected ListenableFuture<Void> receiveMessage(InputStream is, int length) {
inboundPhase(Phase.MESSAGE);
return listener.messageRead(is, length);
}
/** gRPC protocol v1 support */
@Override
protected void receiveStatus(Status status) {
Preconditions.checkState(status == Status.OK, "Received status can only be OK on server");
}
@Override
public final void close(Status status, Metadata.Trailers trailers) {
Preconditions.checkNotNull(status, "status");
@ -52,13 +63,48 @@ public abstract class AbstractServerStream extends AbstractStream implements Ser
// is notified via complete()). Since there may be large buffers involved, the actual
// completion of the RPC could be much later than this call.
gracefulStatus = status;
gracefulTrailers = trailers;
}
}
trailers.removeAll(Status.CODE_KEY);
trailers.removeAll(Status.MESSAGE_KEY);
trailers.put(Status.CODE_KEY, status.getCode());
if (status.getDescription() != null) {
trailers.put(Status.MESSAGE_KEY, status.getDescription());
}
this.stashedTrailers = trailers;
closeFramer(status);
dispose();
}
@Override
protected final void internalSendFrame(ByteBuffer frame, boolean endOfStream) {
if (!GRPC_V2_PROTOCOL) {
sendFrame(frame, endOfStream);
} else {
sendFrame(frame, false);
if (endOfStream) {
sendTrailers(stashedTrailers);
stashedTrailers = null;
}
}
}
/**
* Sends an outbound frame to the remote end point.
*
* @param frame a buffer containing the chunk of data to be sent.
* @param endOfStream if {@code true} indicates that no more data will be sent on the stream by
* this endpoint.
*/
protected abstract void sendFrame(ByteBuffer frame, boolean endOfStream);
/**
* Sends trailers to the remote end point. This call implies end of stream.
*
* @param trailers metadata to be sent to end point
*/
protected abstract void sendTrailers(Metadata.Trailers trailers);
/**
* The Stream is considered completely closed and there is no further opportunity for error. It
* calls the listener's {@code closed()} if it was not already done by {@link #abortStream}. Note
@ -80,7 +126,7 @@ public abstract class AbstractServerStream extends AbstractStream implements Ser
new Metadata.Trailers());
throw new IllegalStateException("successful complete() without close()");
}
listener.closed(status, gracefulTrailers);
listener.closed(status, new Metadata.Trailers());
}
@Override
@ -91,7 +137,8 @@ public abstract class AbstractServerStream extends AbstractStream implements Ser
/**
* Called when the remote end half-closes the stream.
*/
public final void remoteEndClosed() {
@Override
protected final void remoteEndClosed() {
synchronized (stateLock) {
Preconditions.checkState(state == OPEN, "Stream not OPEN");
state = WRITE_ONLY;

View File

@ -4,7 +4,6 @@ import com.google.common.base.Preconditions;
import com.google.common.io.Closeables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status;
import java.io.InputStream;
@ -41,29 +40,19 @@ public abstract class AbstractStream implements Stream {
private final Framer.Sink<ByteBuffer> outboundFrameHandler = new Framer.Sink<ByteBuffer>() {
@Override
public void deliverFrame(ByteBuffer frame, boolean endOfStream) {
sendFrame(frame, endOfStream);
internalSendFrame(frame, endOfStream);
}
};
/**
* Internal handler for Deframer output. Informs the {@link #listener()} of inbound messages.
* Internal handler for deframer output. Informs stream of inbound messages.
*/
private final StreamListener inboundMessageHandler = new StreamListener() {
@Override
public ListenableFuture<Void> headersRead(Metadata.Headers headers) {
inboundPhase(Phase.HEADERS);
ListenableFuture<Void> future = listener().headersRead(headers);
disableWindowUpdate(future);
return future;
}
private final GrpcDeframer.Sink inboundMessageHandler = new GrpcDeframer.Sink() {
@Override
public ListenableFuture<Void> messageRead(InputStream input, int length) {
ListenableFuture<Void> future = null;
try {
inboundPhase(Phase.MESSAGE);
future = listener().messageRead(input, length);
future = receiveMessage(input, length);
disableWindowUpdate(future);
return future;
} finally {
@ -72,9 +61,13 @@ public abstract class AbstractStream implements Stream {
}
@Override
public void closed(Status status, Metadata.Trailers trailers) {
inboundPhase(Phase.STATUS);
listener().closed(status, trailers);
public void statusRead(Status status) {
receiveStatus(status);
}
@Override
public void endOfStream() {
remoteEndClosed();
}
};
@ -129,12 +122,16 @@ public abstract class AbstractStream implements Stream {
* @param endOfStream if {@code true} indicates that no more data will be sent on the stream by
* this endpoint.
*/
protected abstract void sendFrame(ByteBuffer frame, boolean endOfStream);
protected abstract void internalSendFrame(ByteBuffer frame, boolean endOfStream);
/**
* Returns the listener associated to this stream.
*/
protected abstract StreamListener listener();
/** A message was deframed. */
protected abstract ListenableFuture<Void> receiveMessage(InputStream is, int length);
/** A status was deframed. */
protected abstract void receiveStatus(Status status);
/** Deframer reached end of stream. */
protected abstract void remoteEndClosed();
/**
* If the given future is non-{@code null}, temporarily disables window updates for inbound flow
@ -147,7 +144,7 @@ public abstract class AbstractStream implements Stream {
* Gets the internal handler for inbound messages. Subclasses must use this as the target for a
* {@link com.google.net.stubby.newtransport.Deframer}.
*/
protected StreamListener inboundMessageHandler() {
protected GrpcDeframer.Sink inboundMessageHandler() {
return inboundMessageHandler;
}

View File

@ -2,7 +2,6 @@ package com.google.net.stubby.newtransport;
import com.google.common.io.ByteStreams;
import com.google.net.stubby.GrpcFramingUtil;
import com.google.net.stubby.Metadata;
import com.google.net.stubby.Operation;
import com.google.net.stubby.Status;
import com.google.net.stubby.transport.Transport;
@ -23,13 +22,13 @@ public abstract class Deframer<F> implements Framer.Sink<F> {
*/
private static final int LENGTH_NOT_SET = -1;
private final StreamListener target;
private final GrpcDeframer.Sink target;
private boolean inFrame;
private byte currentFlags;
private int currentLength = LENGTH_NOT_SET;
private boolean statusDelivered;
public Deframer(StreamListener target) {
public Deframer(GrpcDeframer.Sink target) {
this.target = target;
}
@ -141,7 +140,8 @@ public abstract class Deframer<F> implements Framer.Sink<F> {
}
private void writeStatus(Status status) {
target.closed(status, new Metadata.Trailers());
target.statusRead(status);
target.endOfStream();
statusDelivered = true;
}

View File

@ -20,6 +20,9 @@ import java.util.concurrent.Executor;
* {@link Decompressor}.
*/
public class GrpcDeframer implements Closeable {
public interface Sink extends MessageDeframer2.Sink {
void statusRead(Status status);
}
private enum State {
HEADER, BODY
@ -35,22 +38,22 @@ public class GrpcDeframer implements Closeable {
private boolean statusNotified;
private boolean endOfStream;
private boolean deliveryOutstanding;
private StreamListener listener;
private Sink sink;
private CompositeBuffer nextFrame;
/**
* Constructs the deframer.
*
* @param decompressor the object used for de-framing GRPC compression frames.
* @param listener the listener for fully read GRPC messages.
* @param sink the sink for fully read GRPC messages.
* @param executor the executor to be used for delivery. All calls to
* {@link #deframe(Buffer, boolean)} must be made in the context of this executor. This
* executor must not allow concurrent access to this class, so it must be either a single
* thread or have sequential processing of events.
*/
public GrpcDeframer(Decompressor decompressor, StreamListener listener, Executor executor) {
public GrpcDeframer(Decompressor decompressor, Sink sink, Executor executor) {
this.decompressor = Preconditions.checkNotNull(decompressor, "decompressor");
this.listener = Preconditions.checkNotNull(listener, "listener");
this.sink = Preconditions.checkNotNull(sink, "sink");
this.executor = Preconditions.checkNotNull(executor, "executor");
deliveryTask = new Runnable() {
@Override
@ -62,7 +65,7 @@ public class GrpcDeframer implements Closeable {
}
/**
* Adds the given data to this deframer and attempts delivery to the listener.
* Adds the given data to this deframer and attempts delivery to the sink.
*/
public void deframe(Buffer data, boolean endOfStream) {
Preconditions.checkNotNull(data, "data");
@ -87,7 +90,7 @@ public class GrpcDeframer implements Closeable {
/**
* If there is no outstanding delivery, attempts to read and deliver as many messages to the
* listener as possible. Only one outstanding delivery is allowed at a time.
* sink as possible. Only one outstanding delivery is allowed at a time.
*/
private void deliver() {
if (deliveryOutstanding) {
@ -106,11 +109,11 @@ public class GrpcDeframer implements Closeable {
processHeader();
break;
case BODY:
// Read the body and deliver the message to the listener.
// Read the body and deliver the message to the sink.
deliveryOutstanding = true;
ListenableFuture<Void> processingFuture = processBody();
if (processingFuture != null) {
// A listener was returned for the completion of processing the delivered
// A sink was returned for the completion of processing the delivered
// message. Once it's done, try to deliver the next message.
processingFuture.addListener(deliveryTask, executor);
return;
@ -200,9 +203,9 @@ public class GrpcDeframer implements Closeable {
*/
private ListenableFuture<Void> processMessage() {
try {
return listener.messageRead(Buffers.openStream(nextFrame, true), nextFrame.readableBytes());
return sink.messageRead(Buffers.openStream(nextFrame, true), nextFrame.readableBytes());
} finally {
// Don't close the frame, since the listener is now responsible for the life-cycle.
// Don't close the frame, since the sink is now responsible for the life-cycle.
nextFrame = null;
}
}
@ -223,10 +226,11 @@ public class GrpcDeframer implements Closeable {
}
/**
* Delivers the status notification to the listener.
* Delivers the status notification to the sink.
*/
private void notifyStatus(Status status) {
statusNotified = true;
listener.closed(status, new Metadata.Trailers());
sink.statusRead(status);
sink.endOfStream();
}
}

View File

@ -15,7 +15,7 @@ public class InputStreamDeframer extends Deframer<InputStream> {
private final PrefixingInputStream prefixingInputStream;
public InputStreamDeframer(StreamListener target) {
public InputStreamDeframer(GrpcDeframer.Sink target) {
super(target);
prefixingInputStream = new PrefixingInputStream(4096);
}

View File

@ -3,11 +3,10 @@ package com.google.net.stubby.newtransport;
import com.google.common.base.Preconditions;
import com.google.common.io.ByteStreams;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.InputStream;
import java.io.IOException;
import java.util.concurrent.Executor;
import java.util.zip.GZIPInputStream;
@ -30,63 +29,17 @@ public class MessageDeframer2 implements Closeable {
NONE, GZIP;
}
public interface Sink {
public ListenableFuture<Void> messageRead(InputStream is, int length);
public void endOfStream();
}
private enum State {
HEADER, BODY
}
/**
* Create a deframer for use on the server-side. All calls to this class must be made in the
* context of the provided executor, which also must not allow concurrent processing of Runnables.
*
* @param listener callback for fully read GRPC messages
* @param executor used for internal event processing
*/
public static MessageDeframer2 createOnServer(StreamListener listener, Executor executor) {
return createOnServer(listener, executor, Compression.NONE);
}
/**
* Create a deframer for use on the server-side. All calls to this class must be made in the
* context of the provided executor, which also must not allow concurrent processing of Runnables.
*
* @param listener callback for fully read GRPC messages
* @param executor used for internal event processing
* @param compression the compression used if a compressed frame is encountered, with NONE meaning
* unsupported
*/
public static MessageDeframer2 createOnServer(StreamListener listener, Executor executor,
Compression compression) {
return new MessageDeframer2(listener, executor, false, compression);
}
/**
* Create a deframer for use on the client-side. All calls to this class must be made in the
* context of the provided executor, which also must not allow concurrent processing of Runnables.
*
* @param listener callback for fully read GRPC messages
* @param executor used for internal event processing
*/
public static MessageDeframer2 createOnClient(StreamListener listener, Executor executor) {
return createOnClient(listener, executor, Compression.NONE);
}
/**
* Create a deframer for use on the client-side. All calls to this class must be made in the
* context of the provided executor, which also must not allow concurrent processing of Runnables.
*
* @param listener callback for fully read GRPC messages
* @param executor used for internal event processing
* @param compression the compression used if a compressed frame is encountered, with NONE meaning
* unsupported
*/
public static MessageDeframer2 createOnClient(StreamListener listener, Executor executor,
Compression compression) {
return new MessageDeframer2(listener, executor, true, compression);
}
private final StreamListener listener;
private final Sink sink;
private final Executor executor;
private final boolean client;
private final Compression compression;
private final Runnable deliveryTask = new Runnable() {
@Override
@ -103,16 +56,35 @@ public class MessageDeframer2 implements Closeable {
private CompositeBuffer nextFrame;
private CompositeBuffer unprocessed = new CompositeBuffer();
private MessageDeframer2(StreamListener listener, Executor executor, boolean client,
Compression compression) {
this.listener = Preconditions.checkNotNull(listener, "listener");
/**
* Create a deframer. All calls to this class must be made in the context of the provided
* executor, which also must not allow concurrent processing of Runnables. Compression will not
* be supported.
*
* @param sink callback for fully read GRPC messages
* @param executor used for internal event processing
*/
public MessageDeframer2(Sink sink, Executor executor) {
this(sink, executor, Compression.NONE);
}
/**
* Create a deframer. All calls to this class must be made in the context of the provided
* executor, which also must not allow concurrent processing of Runnables.
*
* @param sink callback for fully read GRPC messages
* @param executor used for internal event processing
* @param compression the compression used if a compressed frame is encountered, with NONE meaning
* unsupported
*/
public MessageDeframer2(Sink sink, Executor executor, Compression compression) {
this.sink = Preconditions.checkNotNull(sink, "sink");
this.executor = Preconditions.checkNotNull(executor, "executor");
this.client = client;
this.compression = Preconditions.checkNotNull(compression, "compression");
}
/**
* Adds the given data to this deframer and attempts delivery to the listener.
* Adds the given data to this deframer and attempts delivery to the sink.
*/
public void deframe(Buffer data, boolean endOfStream) {
Preconditions.checkNotNull(data, "data");
@ -134,9 +106,18 @@ public class MessageDeframer2 implements Closeable {
}
}
public void delayProcessing(ListenableFuture<Void> future) {
Preconditions.checkState(!deliveryOutstanding, "Only one delay allowed concurrently");
if (future != null) {
deliveryOutstanding = true;
// Once future completes, try to deliver the next message.
future.addListener(deliveryTask, executor);
}
}
/**
* If there is no outstanding delivery, attempts to read and deliver as many messages to the
* listener as possible. Only one outstanding delivery is allowed at a time.
* sink as possible. Only one outstanding delivery is allowed at a time.
*/
private void deliver() {
if (deliveryOutstanding) {
@ -151,11 +132,11 @@ public class MessageDeframer2 implements Closeable {
processHeader();
break;
case BODY:
// Read the body and deliver the message to the listener.
// Read the body and deliver the message to the sink.
deliveryOutstanding = true;
ListenableFuture<Void> processingFuture = processBody();
if (processingFuture != null) {
// A listener was returned for the completion of processing the delivered
// A future was returned for the completion of processing the delivered
// message. Once it's done, try to deliver the next message.
processingFuture.addListener(deliveryTask, executor);
return;
@ -175,10 +156,7 @@ public class MessageDeframer2 implements Closeable {
// application is properly notified of abortion.
throw new RuntimeException("Encountered end-of-stream mid-frame");
}
if (!client) {
// If on the server-side, we need to notify application of half-close.
listener.closed(Status.OK, new Metadata.Trailers());
}
sink.endOfStream();
}
}
@ -241,13 +219,13 @@ public class MessageDeframer2 implements Closeable {
} catch (IOException ex) {
throw new RuntimeException(ex);
}
future = listener.messageRead(new ByteArrayInputStream(bytes), bytes.length);
future = sink.messageRead(new ByteArrayInputStream(bytes), bytes.length);
} else {
throw new AssertionError("Unknown compression type");
}
} else {
// Don't close the frame, since the listener is now responsible for the life-cycle.
future = listener.messageRead(Buffers.openStream(nextFrame, true), nextFrame.readableBytes());
// Don't close the frame, since the sink is now responsible for the life-cycle.
future = sink.messageRead(Buffers.openStream(nextFrame, true), nextFrame.readableBytes());
nextFrame = null;
}

View File

@ -50,8 +50,7 @@ class NettyClientStream extends AbstractClientStream implements NettyStream {
this.deframer2 = null;
} else {
this.deframer = null;
this.deframer2 = MessageDeframer2.createOnClient(
inboundMessageHandler(), channel.eventLoop());
this.deframer2 = new MessageDeframer2(inboundMessageHandler(), channel.eventLoop());
}
windowUpdateManager = new WindowUpdateManager(channel, inboundFlow);
}
@ -83,8 +82,14 @@ class NettyClientStream extends AbstractClientStream implements NettyStream {
public void inboundHeadersRecieved(Http2Headers headers, boolean endOfStream) {
responseCode = responseCode(headers);
isGrpcResponse = isGrpcResponse(headers, responseCode);
if (!isGrpcResponse && endOfStream) {
if (endOfStream) {
if (isGrpcResponse) {
// TODO(user): call stashTrailers() as appropriate, then provide endOfStream to
// deframer.
setStatus(new Status(responseCode), new Metadata.Trailers());
} else {
setStatus(new Status(responseCode), new Metadata.Trailers());
}
}
}

View File

@ -57,16 +57,6 @@ class NettyServerHandler extends AbstractHttp2ConnectionHandler {
super(connection, frameReader, frameWriter, inboundFlow, outboundFlow);
this.transportListener = Preconditions.checkNotNull(transportListener, "transportListener");
this.inboundFlow = Preconditions.checkNotNull(inboundFlow, "inboundFlow");
// Observe the HTTP/2 connection for events.
connection.addListener(new Http2ConnectionAdapter() {
@Override
public void streamHalfClosed(Http2Stream stream) {
if (stream.state() == Http2Stream.State.HALF_CLOSED_REMOTE) {
serverStream(stream).remoteEndClosed();
}
}
});
}
@Override

View File

@ -2,6 +2,7 @@ package com.google.net.stubby.newtransport.netty;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.net.stubby.Metadata;
import com.google.net.stubby.newtransport.AbstractServerStream;
import com.google.net.stubby.newtransport.GrpcDeframer;
import com.google.net.stubby.newtransport.MessageDeframer2;
@ -35,7 +36,7 @@ class NettyServerStream extends AbstractServerStream implements NettyStream {
deframer2 = null;
} else {
deframer = null;
deframer2 = MessageDeframer2.createOnServer(inboundMessageHandler(), channel.eventLoop());
deframer2 = new MessageDeframer2(inboundMessageHandler(), channel.eventLoop());
}
windowUpdateManager =
new WindowUpdateManager(channel, Preconditions.checkNotNull(inboundFlow, "inboundFlow"));
@ -68,6 +69,11 @@ class NettyServerStream extends AbstractServerStream implements NettyStream {
channel.writeAndFlush(cmd);
}
@Override
protected void sendTrailers(Metadata.Trailers trailers) {
// TODO(user): send trailers
}
@Override
public int id() {
return id;

View File

@ -6,6 +6,8 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import com.google.common.primitives.Bytes;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@ -43,7 +45,7 @@ public class MetadataTest {
private static final String LANCE = "lance";
private static final byte[] LANCE_BYTES = LANCE.getBytes(StandardCharsets.US_ASCII);
private static final Metadata.Key<Fish> KEY = new Metadata.Key<Fish>("test", FISH_MARSHALLER);
private static final Metadata.Key<Fish> KEY = Metadata.Key.of("test", FISH_MARSHALLER);
@Test
public void testWriteParsed() {
@ -108,6 +110,31 @@ public class MetadataTest {
assertEquals("authority", h1.getAuthority());
}
@Test
public void integerMarshallerBytesIsBigEndian() {
assertEquals(Bytes.asList(new byte[] {0x12, 0x34, 0x56, 0x78}),
Bytes.asList(Metadata.INTEGER_MARSHALLER.toBytes(0x12345678)));
}
@Test
public void integerMarshallerAsciiIsDecimal() {
assertEquals("12345678", Metadata.INTEGER_MARSHALLER.toAscii(12345678));
}
@Test
public void roundTripIntegerMarshaller() {
roundTripInteger(0);
roundTripInteger(1);
roundTripInteger(-1);
roundTripInteger(0x12345678);
roundTripInteger(0x87654321);
}
private void roundTripInteger(Integer i) {
assertEquals(i, Metadata.INTEGER_MARSHALLER.parseBytes(Metadata.INTEGER_MARSHALLER.toBytes(i)));
assertEquals(i, Metadata.INTEGER_MARSHALLER.parseAscii(Metadata.INTEGER_MARSHALLER.toAscii(i)));
}
private static class Fish {
private String name;

View File

@ -51,7 +51,7 @@ public class GrpcDeframerTest {
private StubDecompressor decompressor;
@Mock
private StreamListener listener;
private GrpcDeframer.Sink sink;
private SettableFuture<Void> messageFuture;
@ -59,10 +59,10 @@ public class GrpcDeframerTest {
public void setup() {
MockitoAnnotations.initMocks(this);
messageFuture = SettableFuture.create();
when(listener.messageRead(any(InputStream.class), anyInt())).thenReturn(messageFuture);
when(sink.messageRead(any(InputStream.class), anyInt())).thenReturn(messageFuture);
decompressor = new StubDecompressor();
reader = new GrpcDeframer(decompressor, listener, MoreExecutors.directExecutor());
reader = new GrpcDeframer(decompressor, sink, MoreExecutors.directExecutor());
}
@Test
@ -135,7 +135,7 @@ public class GrpcDeframerTest {
byte[] fullBuffer = Arrays.copyOf(frame, frame.length * 2);
System.arraycopy(frame, 0, fullBuffer, frame.length, frame.length);
// Use only a portion of the frame. Should not call the listener.
// Use only a portion of the frame. Should not call the sink.
int startIx = 0;
int endIx = 10;
byte[] chunk = Arrays.copyOfRange(fullBuffer, startIx, endIx);
@ -144,7 +144,7 @@ public class GrpcDeframerTest {
verifyNoPayload();
verifyNoStatus();
// Supply the rest of the frame and a portion of a second frame. Should call the listener.
// Supply the rest of the frame and a portion of a second frame. Should call the sink.
startIx = endIx;
endIx = startIx + frame.length;
chunk = Arrays.copyOfRange(fullBuffer, startIx, endIx);
@ -156,7 +156,7 @@ public class GrpcDeframerTest {
private void verifyPayload() {
ArgumentCaptor<InputStream> captor = ArgumentCaptor.forClass(InputStream.class);
verify(listener).messageRead(captor.capture(), eq(MESSAGE.length()));
verify(sink).messageRead(captor.capture(), eq(MESSAGE.length()));
assertEquals(MESSAGE, readString(captor.getValue(), MESSAGE.length()));
}
@ -176,16 +176,18 @@ public class GrpcDeframerTest {
private void verifyStatus(Transport.Code code) {
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
verify(listener).closed(captor.capture(), notNull(Metadata.Trailers.class));
verify(sink).statusRead(captor.capture());
verify(sink).endOfStream();
assertEquals(code, captor.getValue().getCode());
}
private void verifyNoPayload() {
verify(listener, never()).messageRead(any(InputStream.class), anyInt());
verify(sink, never()).messageRead(any(InputStream.class), anyInt());
}
private void verifyNoStatus() {
verify(listener, never()).closed(any(Status.class), notNull(Metadata.Trailers.class));
verify(sink, never()).statusRead(any(Status.class));
verify(sink, never()).endOfStream();
}
private static byte[] payloadFrame() throws IOException {

View File

@ -16,6 +16,7 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status;
import com.google.net.stubby.newtransport.MessageDeframer2.Sink;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -34,133 +35,121 @@ import java.io.InputStream;
*/
@RunWith(JUnit4.class)
public class MessageDeframer2Test {
private StreamListener listener = mock(StreamListener.class);
private MessageDeframer2 deframer
= MessageDeframer2.createOnClient(listener, MoreExecutors.directExecutor());
private Sink sink = mock(Sink.class);
private MessageDeframer2 deframer = new MessageDeframer2(sink, MoreExecutors.directExecutor());
private ArgumentCaptor<InputStream> messages = ArgumentCaptor.forClass(InputStream.class);
@Test
public void simplePayload() {
deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 2, 3, 14}), false);
verify(listener).messageRead(messages.capture(), eq(2));
verify(sink).messageRead(messages.capture(), eq(2));
assertEquals(Bytes.asList(new byte[] {3, 14}), bytes(messages));
verifyNoMoreInteractions(listener);
verifyNoMoreInteractions(sink);
}
@Test
public void smallCombinedPayloads() {
deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3, 0, 0, 0, 0, 2, 14, 15}), false);
verify(listener).messageRead(messages.capture(), eq(1));
verify(sink).messageRead(messages.capture(), eq(1));
assertEquals(Bytes.asList(new byte[] {3}), bytes(messages));
verify(listener).messageRead(messages.capture(), eq(2));
verify(sink).messageRead(messages.capture(), eq(2));
assertEquals(Bytes.asList(new byte[] {14, 15}), bytes(messages));
verifyNoMoreInteractions(listener);
verifyNoMoreInteractions(sink);
}
@Test
public void clientEndOfStreamShouldNotNotifyClose() {
public void endOfStreamWithPayloadShouldNotifyEndOfStream() {
deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3}), true);
verify(listener).messageRead(messages.capture(), eq(1));
verify(sink).messageRead(messages.capture(), eq(1));
assertEquals(Bytes.asList(new byte[] {3}), bytes(messages));
verifyNoMoreInteractions(listener);
verify(sink).endOfStream();
verifyNoMoreInteractions(sink);
}
@Test
public void serverEndOfStreamWithPayloadShouldNotifyClose() {
deframer = MessageDeframer2.createOnServer(listener, MoreExecutors.directExecutor());
deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3}), true);
verify(listener).messageRead(messages.capture(), eq(1));
assertEquals(Bytes.asList(new byte[] {3}), bytes(messages));
verify(listener).closed(eq(Status.OK), notNull(Metadata.Trailers.class));
verifyNoMoreInteractions(listener);
}
@Test
public void serverEndOfStreamShouldNotifyClose() {
deframer = MessageDeframer2.createOnServer(listener, MoreExecutors.directExecutor());
public void endOfStreamShouldNotifyEndOfStream() {
deframer.deframe(buffer(new byte[0]), true);
verify(listener).closed(eq(Status.OK), notNull(Metadata.Trailers.class));
verifyNoMoreInteractions(listener);
verify(sink).endOfStream();
verifyNoMoreInteractions(sink);
}
@Test
public void payloadSplitBetweenBuffers() {
deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 7, 3, 14, 1, 5, 9}), false);
verifyNoMoreInteractions(listener);
verifyNoMoreInteractions(sink);
deframer.deframe(buffer(new byte[] {2, 6}), false);
verify(listener).messageRead(messages.capture(), eq(7));
verify(sink).messageRead(messages.capture(), eq(7));
assertEquals(Bytes.asList(new byte[] {3, 14, 1, 5, 9, 2, 6}), bytes(messages));
verifyNoMoreInteractions(listener);
verifyNoMoreInteractions(sink);
}
@Test
public void frameHeaderSplitBetweenBuffers() {
deframer.deframe(buffer(new byte[] {0, 0}), false);
verifyNoMoreInteractions(listener);
verifyNoMoreInteractions(sink);
deframer.deframe(buffer(new byte[] {0, 0, 1, 3}), false);
verify(listener).messageRead(messages.capture(), eq(1));
verify(sink).messageRead(messages.capture(), eq(1));
assertEquals(Bytes.asList(new byte[] {3}), bytes(messages));
verifyNoMoreInteractions(listener);
verifyNoMoreInteractions(sink);
}
@Test
public void emptyPayload() {
deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 0}), false);
verify(listener).messageRead(messages.capture(), eq(0));
verify(sink).messageRead(messages.capture(), eq(0));
assertEquals(Bytes.asList(), bytes(messages));
verifyNoMoreInteractions(listener);
verifyNoMoreInteractions(sink);
}
@Test
public void largerFrameSize() {
deframer.deframe(
Buffers.wrap(Bytes.concat(new byte[] {0, 0, 0, 3, (byte) 232}, new byte[1000])), false);
verify(listener).messageRead(messages.capture(), eq(1000));
verify(sink).messageRead(messages.capture(), eq(1000));
assertEquals(Bytes.asList(new byte[1000]), bytes(messages));
verifyNoMoreInteractions(listener);
verifyNoMoreInteractions(sink);
}
@Test
public void payloadCallbackShouldWaitForFutureCompletion() {
SettableFuture<Void> messageFuture = SettableFuture.create();
when(listener.messageRead(any(InputStream.class), eq(1))).thenReturn(messageFuture);
when(sink.messageRead(any(InputStream.class), eq(1))).thenReturn(messageFuture);
deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3, 0, 0, 0, 0, 2, 14, 15}), false);
verify(listener).messageRead(messages.capture(), eq(1));
verify(sink).messageRead(messages.capture(), eq(1));
assertEquals(Bytes.asList(new byte[] {3}), bytes(messages));
verifyNoMoreInteractions(listener);
verifyNoMoreInteractions(sink);
messageFuture.set(null);
verify(listener).messageRead(messages.capture(), eq(2));
verify(sink).messageRead(messages.capture(), eq(2));
assertEquals(Bytes.asList(new byte[] {14, 15}), bytes(messages));
verifyNoMoreInteractions(listener);
verifyNoMoreInteractions(sink);
}
@Test
public void serverClosedCallbackShouldWaitForFutureCompletion() {
deframer = MessageDeframer2.createOnServer(listener, MoreExecutors.directExecutor());
public void endOfStreamCallbackShouldWaitForFutureCompletion() {
SettableFuture<Void> messageFuture = SettableFuture.create();
when(listener.messageRead(any(InputStream.class), eq(1))).thenReturn(messageFuture);
when(sink.messageRead(any(InputStream.class), eq(1))).thenReturn(messageFuture);
deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3}), true);
verify(listener).messageRead(messages.capture(), eq(1));
verify(sink).messageRead(messages.capture(), eq(1));
assertEquals(Bytes.asList(new byte[] {3}), bytes(messages));
verifyNoMoreInteractions(listener);
verifyNoMoreInteractions(sink);
messageFuture.set(null);
verify(listener).closed(eq(Status.OK), notNull(Metadata.Trailers.class));
verifyNoMoreInteractions(listener);
verify(sink).endOfStream();
verifyNoMoreInteractions(sink);
}
@Test
public void compressed() {
deframer = MessageDeframer2.createOnClient(
listener, MoreExecutors.directExecutor(), MessageDeframer2.Compression.GZIP);
deframer = new MessageDeframer2(
sink, MoreExecutors.directExecutor(), MessageDeframer2.Compression.GZIP);
byte[] payload = compress(new byte[1000]);
assertTrue(payload.length < 100);
byte[] header = new byte[] {1, 0, 0, 0, (byte) payload.length};
deframer.deframe(buffer(Bytes.concat(header, payload)), false);
verify(listener).messageRead(messages.capture(), eq(1000));
verify(sink).messageRead(messages.capture(), eq(1000));
assertEquals(Bytes.asList(new byte[1000]), bytes(messages));
verifyNoMoreInteractions(listener);
verifyNoMoreInteractions(sink);
}
private static List<Byte> bytes(ArgumentCaptor<InputStream> captor) {

View File

@ -138,7 +138,7 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase {
assertArrayEquals(CONTENT, ByteStreams.toByteArray(captor.getValue()));
if (endStream) {
verify(streamListener).closed(eq(Status.OK), notNull(Metadata.Trailers.class));
verify(streamListener).halfClosed();
}
verifyNoMoreInteractions(streamListener);
}
@ -149,7 +149,7 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase {
handler.channelRead(ctx, emptyDataFrame(STREAM_ID, true));
verify(streamListener, never()).messageRead(any(InputStream.class), anyInt());
verify(streamListener).closed(eq(Status.OK), notNull(Metadata.Trailers.class));
verify(streamListener).halfClosed();
verifyNoMoreInteractions(streamListener);
}

View File

@ -2,6 +2,7 @@ package com.google.net.stubby.newtransport.netty;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.notNull;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.never;
@ -15,6 +16,9 @@ import com.google.net.stubby.newtransport.ServerStreamListener;
import com.google.net.stubby.newtransport.StreamState;
import com.google.net.stubby.transport.Transport;
import io.netty.buffer.EmptyByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@ -57,7 +61,7 @@ public class NettyServerStreamTest extends NettyStreamTestBase {
verifyZeroInteractions(serverListener);
// Sending complete. Listener gets closed()
stream().complete();
verify(serverListener).closed(Status.CANCELLED, trailers);
verify(serverListener).closed(eq(Status.CANCELLED), notNull(Metadata.Trailers.class));
assertEquals(StreamState.CLOSED, stream.state());
verifyZeroInteractions(serverListener);
}
@ -65,7 +69,7 @@ public class NettyServerStreamTest extends NettyStreamTestBase {
@Test
public void closeAfterClientHalfCloseShouldSucceed() throws Exception {
// Client half-closes. Listener gets halfClosed()
stream().remoteEndClosed();
stream().inboundDataReceived(new EmptyByteBuf(UnpooledByteBufAllocator.DEFAULT), true);
assertEquals(StreamState.WRITE_ONLY, stream.state());
verify(serverListener).halfClosed();
// Server closes. Status sent
@ -76,23 +80,7 @@ public class NettyServerStreamTest extends NettyStreamTestBase {
new SendGrpcFrameCommand(STREAM_ID, statusFrame(Status.OK), true));
// Sending and receiving complete. Listener gets closed()
stream().complete();
verify(serverListener).closed(Status.OK, trailers);
verifyNoMoreInteractions(serverListener);
}
@Test
public void clientHalfCloseForTheSecondTimeShouldFail() throws Exception {
// Client half-closes. Listener gets halfClosed()
stream().remoteEndClosed();
assertEquals(StreamState.WRITE_ONLY, stream.state());
verify(serverListener).halfClosed();
// Client half-closes again.
try {
stream().remoteEndClosed();
fail();
} catch (IllegalStateException expected) {
}
assertEquals(StreamState.WRITE_ONLY, stream.state());
verify(serverListener).closed(eq(Status.OK), notNull(Metadata.Trailers.class));
verifyNoMoreInteractions(serverListener);
}
@ -121,7 +109,7 @@ public class NettyServerStreamTest extends NettyStreamTestBase {
public void abortStreamAfterClientHalfCloseShouldCallClose() {
Status status = new Status(Transport.Code.INTERNAL, new Throwable());
// Client half-closes. Listener gets halfClosed()
stream().remoteEndClosed();
stream().inboundDataReceived(new EmptyByteBuf(UnpooledByteBufAllocator.DEFAULT), true);
assertEquals(StreamState.WRITE_ONLY, stream.state());
verify(serverListener).halfClosed();
// Abort