From c0f41920bd3d92b50683b23b2d1c6cb81ec583f2 Mon Sep 17 00:00:00 2001 From: ejona Date: Tue, 23 Dec 2014 12:16:17 -0800 Subject: [PATCH] Remove gRPC v1 support. No major refactorings/simplifications were done. Only gRPC v1 support infrastructure was removed. ------------- Created by MOE: http://code.google.com/p/moe-java MOE_MIGRATED_REVID=82737436 --- .../transport/AbstractClientStream.java | 20 +- .../transport/AbstractServerStream.java | 33 +- .../net/stubby/transport/AbstractStream.java | 66 +--- .../stubby/transport/CompressionFramer.java | 367 ------------------ .../net/stubby/transport/Decompressor.java | 78 ---- .../google/net/stubby/transport/Framer.java | 91 ----- .../net/stubby/transport/GrpcDeframer.java | 320 --------------- .../stubby/transport/Http2ClientStream.java | 35 +- .../net/stubby/transport/MessageFramer.java | 124 ------ .../net/stubby/transport/MessageFramer2.java | 48 ++- .../stubby/transport/TransportFrameUtil.java | 34 -- .../transport/CompressionFramerTest.java | 130 ------- .../stubby/transport/GrpcDeframerTest.java | 280 ------------- .../stubby/transport/MessageFramer2Test.java | 10 +- .../stubby/transport/MessageFramerTest.java | 126 ------ integration-testing/build.gradle | 3 +- integration-testing/pom.xml | 5 + .../integration/AbstractTransportTest.java | 12 +- .../integration/TestServiceClient.java | 8 +- .../integration/TestServiceServer.java | 8 +- .../transport/netty/NettyClientStream.java | 2 +- .../transport/netty/NettyDecompressor.java | 313 --------------- .../transport/netty/NettyServerStream.java | 2 +- .../netty/NettyClientStreamTest.java | 11 +- .../netty/NettyDecompressorTest.java | 201 ---------- .../netty/NettyServerHandlerTest.java | 3 +- .../stubby/transport/netty/NettyTestUtil.java | 10 +- .../transport/okhttp/OkHttpClientStream.java | 5 +- 28 files changed, 105 insertions(+), 2240 deletions(-) delete mode 100644 core/src/main/java/com/google/net/stubby/transport/CompressionFramer.java delete mode 100644 core/src/main/java/com/google/net/stubby/transport/Decompressor.java delete mode 100644 core/src/main/java/com/google/net/stubby/transport/Framer.java delete mode 100644 core/src/main/java/com/google/net/stubby/transport/GrpcDeframer.java delete mode 100644 core/src/main/java/com/google/net/stubby/transport/MessageFramer.java delete mode 100644 core/src/test/java/com/google/net/stubby/transport/CompressionFramerTest.java delete mode 100644 core/src/test/java/com/google/net/stubby/transport/GrpcDeframerTest.java delete mode 100644 core/src/test/java/com/google/net/stubby/transport/MessageFramerTest.java delete mode 100644 netty/src/main/java/com/google/net/stubby/transport/netty/NettyDecompressor.java delete mode 100644 netty/src/test/java/com/google/net/stubby/transport/netty/NettyDecompressorTest.java diff --git a/core/src/main/java/com/google/net/stubby/transport/AbstractClientStream.java b/core/src/main/java/com/google/net/stubby/transport/AbstractClientStream.java index 6ead716c43..0384d4f291 100644 --- a/core/src/main/java/com/google/net/stubby/transport/AbstractClientStream.java +++ b/core/src/main/java/com/google/net/stubby/transport/AbstractClientStream.java @@ -62,10 +62,8 @@ public abstract class AbstractClientStream extends AbstractStream private Metadata.Trailers trailers; - protected AbstractClientStream(ClientStreamListener listener, - @Nullable Decompressor decompressor, - Executor deframerExecutor) { - super(decompressor, deframerExecutor); + protected AbstractClientStream(ClientStreamListener listener, Executor deframerExecutor) { + super(deframerExecutor); this.listener = Preconditions.checkNotNull(listener); } @@ -151,20 +149,10 @@ public abstract class AbstractClientStream extends AbstractStream // Stash the status & trailers so they can be delivered by the deframer calls // remoteEndClosed this.status = status; - if (GRPC_V2_PROTOCOL) { - this.trailers = trailers; - } + this.trailers = trailers; deframe(Buffers.empty(), true); } - /** gRPC protocol v1 support */ - @Override - protected void receiveStatus(Status status) { - Preconditions.checkNotNull(status, "status"); - this.status = status; - trailers = new Metadata.Trailers(); - } - @Override protected void remoteEndClosed() { transportReportStatus(status, trailers); @@ -207,7 +195,7 @@ public abstract class AbstractClientStream extends AbstractStream @Override public final void halfClose() { if (outboundPhase(Phase.STATUS) != Phase.STATUS) { - closeFramer(null); + closeFramer(); } } diff --git a/core/src/main/java/com/google/net/stubby/transport/AbstractServerStream.java b/core/src/main/java/com/google/net/stubby/transport/AbstractServerStream.java index d10349a603..838d70a238 100644 --- a/core/src/main/java/com/google/net/stubby/transport/AbstractServerStream.java +++ b/core/src/main/java/com/google/net/stubby/transport/AbstractServerStream.java @@ -63,9 +63,8 @@ public abstract class AbstractServerStream extends AbstractStream /** Saved trailers from close() that need to be sent once the framer has sent all messages. */ private Metadata.Trailers stashedTrailers; - protected AbstractServerStream(IdT id, @Nullable Decompressor decompressor, - Executor deframerExecutor) { - super(decompressor, deframerExecutor); + protected AbstractServerStream(IdT id, Executor deframerExecutor) { + super(deframerExecutor); id(id); } @@ -79,12 +78,6 @@ public abstract class AbstractServerStream extends AbstractStream return listener.messageRead(is, length); } - /** gRPC protocol v1 support */ - @Override - protected void receiveStatus(Status status) { - Preconditions.checkState(status == Status.OK, "Received status can only be OK on server"); - } - @Override public void writeHeaders(Metadata.Headers headers) { Preconditions.checkNotNull(headers, "headers"); @@ -111,7 +104,7 @@ public abstract class AbstractServerStream extends AbstractStream gracefulClose = true; this.stashedTrailers = trailers; writeStatusToTrailers(status); - closeFramer(status); + closeFramer(); } } @@ -148,17 +141,13 @@ public abstract class AbstractServerStream extends AbstractStream @Override protected final void internalSendFrame(ByteBuffer frame, boolean endOfStream) { - if (!GRPC_V2_PROTOCOL) { - sendFrame(frame, endOfStream); - } else { - if (frame.hasRemaining()) { - sendFrame(frame, false); - } - if (endOfStream) { - sendTrailers(stashedTrailers, headersSent); - headersSent = true; - stashedTrailers = null; - } + if (frame.hasRemaining()) { + sendFrame(frame, false); + } + if (endOfStream) { + sendTrailers(stashedTrailers, headersSent); + headersSent = true; + stashedTrailers = null; } } @@ -237,7 +226,7 @@ public abstract class AbstractServerStream extends AbstractStream stashedTrailers = new Metadata.Trailers(); } writeStatusToTrailers(status); - closeFramer(status); + closeFramer(); } else { dispose(); } diff --git a/core/src/main/java/com/google/net/stubby/transport/AbstractStream.java b/core/src/main/java/com/google/net/stubby/transport/AbstractStream.java index 17787b944c..1c34879690 100644 --- a/core/src/main/java/com/google/net/stubby/transport/AbstractStream.java +++ b/core/src/main/java/com/google/net/stubby/transport/AbstractStream.java @@ -39,7 +39,6 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; -import com.google.net.stubby.Status; import java.io.InputStream; import java.nio.ByteBuffer; @@ -51,13 +50,6 @@ import javax.annotation.Nullable; * Abstract base class for {@link Stream} implementations. */ public abstract class AbstractStream implements Stream { - /** - * Global to enable gRPC v2 protocol support, which may be incomplete. This is a complete hack - * and should please, please, please be temporary to ease migration. - */ - // TODO(user): remove this once v1 support is dropped. - public static boolean GRPC_V2_PROTOCOL = true; - /** * Indicates the phase of the GRPC stream in one direction. */ @@ -66,7 +58,7 @@ public abstract class AbstractStream implements Stream { } private volatile IdT id; - private final Framer framer; + private final MessageFramer2 framer; private final FutureCallback deframerErrorCallback = new FutureCallback() { @Override public void onSuccess(Object result) {} @@ -77,7 +69,6 @@ public abstract class AbstractStream implements Stream { } }; - final GrpcDeframer deframer; final MessageDeframer2 deframer2; /** @@ -90,9 +81,8 @@ public abstract class AbstractStream implements Stream { */ private Phase outboundPhase = Phase.HEADERS; - AbstractStream(@Nullable Decompressor decompressor, - Executor deframerExecutor) { - GrpcDeframer.Sink inboundMessageHandler = new GrpcDeframer.Sink() { + AbstractStream(Executor deframerExecutor) { + MessageDeframer2.Sink inboundMessageHandler = new MessageDeframer2.Sink() { @Override public ListenableFuture messageRead(InputStream input, final int length) { ListenableFuture future = null; @@ -104,17 +94,12 @@ public abstract class AbstractStream implements Stream { } } - @Override - public void statusRead(Status status) { - receiveStatus(status); - } - @Override public void endOfStream() { remoteEndClosed(); } }; - Framer.Sink outboundFrameHandler = new Framer.Sink() { + MessageFramer2.Sink outboundFrameHandler = new MessageFramer2.Sink() { @Override public void deliverFrame(ByteBuffer frame, boolean endOfStream) { internalSendFrame(frame, endOfStream); @@ -129,19 +114,8 @@ public abstract class AbstractStream implements Stream { returnProcessedBytes(numBytes); } }; - if (!GRPC_V2_PROTOCOL) { - framer = new MessageFramer(outboundFrameHandler, 4096); - this.deframer = - new GrpcDeframer(decompressor, inboundMessageHandler, deframerExecutor, listener); - this.deframer2 = null; - } else { - if (decompressor != null) { - decompressor.close(); - } - framer = new MessageFramer2(outboundFrameHandler, 4096); - this.deframer = null; - this.deframer2 = new MessageDeframer2(inboundMessageHandler, deframerExecutor, listener); - } + framer = new MessageFramer2(outboundFrameHandler, 4096); + this.deframer2 = new MessageDeframer2(inboundMessageHandler, deframerExecutor, listener); } /** @@ -188,14 +162,9 @@ public abstract class AbstractStream implements Stream { * Closes the underlying framer. * *

No-op if the framer has already been closed. - * - * @param status if not null, will write the status to the framer before closing it */ - final void closeFramer(@Nullable Status status) { + final void closeFramer() { if (!framer.isClosed()) { - if (status != null) { - framer.writeStatus(status); - } framer.close(); } } @@ -225,9 +194,6 @@ public abstract class AbstractStream implements Stream { /** A message was deframed. */ protected abstract ListenableFuture receiveMessage(InputStream is, int length); - /** A status was deframed. */ - protected abstract void receiveStatus(Status status); - /** Deframer reached end of stream. */ protected abstract void remoteEndClosed(); @@ -248,11 +214,7 @@ public abstract class AbstractStream implements Stream { */ protected final void deframe(Buffer frame, boolean endOfStream) { ListenableFuture future; - if (GRPC_V2_PROTOCOL) { - future = deframer2.deframe(frame, endOfStream); - } else { - future = deframer.deframe(frame, endOfStream); - } + future = deframer2.deframe(frame, endOfStream); if (future != null) { Futures.addCallback(future, deframerErrorCallback); } @@ -262,15 +224,9 @@ public abstract class AbstractStream implements Stream { * Delays delivery from the deframer until the given future completes. */ protected final void delayDeframer(ListenableFuture future) { - if (GRPC_V2_PROTOCOL) { - ListenableFuture deliveryFuture = deframer2.delayProcessing(future); - if (deliveryFuture != null) { - Futures.addCallback(deliveryFuture, deframerErrorCallback); - } - } else { - // V1 is a little broken as it doesn't strictly wait for the last payload handled - // by the deframer to be processed by the application layer. Not worth fixing as will - // be removed when the v1 deframer is removed. + ListenableFuture deliveryFuture = deframer2.delayProcessing(future); + if (deliveryFuture != null) { + Futures.addCallback(deliveryFuture, deframerErrorCallback); } } diff --git a/core/src/main/java/com/google/net/stubby/transport/CompressionFramer.java b/core/src/main/java/com/google/net/stubby/transport/CompressionFramer.java deleted file mode 100644 index 9ec163303e..0000000000 --- a/core/src/main/java/com/google/net/stubby/transport/CompressionFramer.java +++ /dev/null @@ -1,367 +0,0 @@ -/* - * Copyright 2014, Google Inc. All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - -package com.google.net.stubby.transport; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.io.ByteStreams; -import com.google.net.stubby.DeferredInputStream; -import com.google.net.stubby.transport.Framer.Sink; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.util.logging.Level; -import java.util.logging.Logger; -import java.util.zip.Deflater; - -/** - * Compression framer for HTTP/2 transport frames, for use in both compression and - * non-compression scenarios. Receives message-stream as input. It is able to change compression - * configuration on-the-fly, but will not actually begin using the new configuration until the next - * full frame. - */ -class CompressionFramer { - /** - * Compression level to indicate using this class's default level. Note that this value is - * allowed to conflict with Deflate.DEFAULT_COMPRESSION, in which case this class's default - * prevails. - */ - public static final int DEFAULT_COMPRESSION_LEVEL = -1; - private static final byte[] EMPTY_BYTE_ARRAY = new byte[0]; - /** - * Size of the GRPC compression frame header which consists of: - * 1 byte for the compression type, - * 3 bytes for the length of the compression frame. - */ - @VisibleForTesting - static final int HEADER_LENGTH = 4; - /** - * Number of frame bytes to reserve to allow for zlib overhead. This does not include data-length - * dependent overheads and compression latency (delay between providing data to zlib and output of - * the compressed data). - * - *

References: - * deflate framing: http://www.gzip.org/zlib/rfc-deflate.html - * (note that bit-packing is little-endian (section 3.1.1) whereas description of sequences - * is big-endian, so bits appear reversed), - * zlib framing: http://tools.ietf.org/html/rfc1950, - * details on flush behavior: http://www.zlib.net/manual.html - */ - @VisibleForTesting - static final int MARGIN - = 5 /* deflate current block overhead, assuming no compression: - block type (1) + len (2) + nlen (2) */ - + 5 /* deflate flush; adds an empty block after current: - 00 (not end; no compression) 00 00 (len) FF FF (nlen) */ - + 5 /* deflate flush; some versions of zlib output two empty blocks on some flushes */ - + 5 /* deflate finish; adds empty block to mark end, since we commonly flush before finish: - 03 (end; fixed Huffman + 5 bits of end of block) 00 (last 3 bits + padding), - or if compression level is 0: 01 (end; no compression) 00 00 (len) FF FF (nlen) */ - + 2 /* zlib header; CMF (1) + FLG (1) */ + 4 /* zlib ADLER32 (4) */ - + 5 /* additional safety for good measure */; - - private static final Logger log = Logger.getLogger(CompressionFramer.class.getName()); - - private final Sink sink; - /** - * Bytes of frame being constructed. {@code position() == 0} when no frame in progress. - */ - private final ByteBuffer bytebuf; - /** Number of frame bytes it is acceptable to leave unused when compressing. */ - private final int sufficient; - private Deflater deflater; - /** Number of bytes written to deflater since last deflate sync. */ - private int writtenSinceSync; - /** Number of bytes read from deflater since last deflate sync. */ - private int readSinceSync; - /** - * Whether the current frame is actually being compressed. If {@code bytebuf.position() == 0}, - * then this value has no meaning. - */ - private boolean usingCompression; - /** - * Whether compression is requested. This does not imply we are compressing the current frame - * (see {@link #usingCompression}), or that we will even compress the next frame (see {@link - * #compressionUnsupported}). - */ - private boolean allowCompression; - /** Whether compression is possible with current configuration and platform. */ - private final boolean compressionUnsupported; - /** - * Compression level to set on the Deflater, where {@code DEFAULT_COMPRESSION_LEVEL} implies this - * class's default. - */ - private int compressionLevel = DEFAULT_COMPRESSION_LEVEL; - private final OutputStreamAdapter outputStreamAdapter = new OutputStreamAdapter(); - - /** - * Since compression tries to form full frames, if compression is working well then it will - * consecutively compress smaller amounts of input data in order to not exceed the frame size. For - * example, if the data is getting 50% compression and a maximum frame size of 128, then it will - * encode roughly 128 bytes which leaves 64, so we encode 64, 32, 16, 8, 4, 2, 1, 1. - * {@code sufficient} cuts off the long tail and says that at some point the frame is "good - * enough" to stop. Choosing a value of {@code 0} is not outrageous. - * - * @param maxFrameSize maximum number of bytes allowed for output frames - * @param allowCompression whether frames should be compressed - * @param sufficient number of frame bytes it is acceptable to leave unused when compressing - */ - public CompressionFramer(Sink sink, int maxFrameSize, boolean allowCompression, - int sufficient) { - this.sink = sink; - this.allowCompression = allowCompression; - int maxSufficient = maxFrameSize - HEADER_LENGTH - MARGIN - - 1 /* to force at least one byte of data */; - boolean compressionUnsupported = false; - if (maxSufficient < 0) { - compressionUnsupported = true; - log.log(Level.INFO, "Frame not large enough for compression"); - } else if (maxSufficient < sufficient) { - log.log(Level.INFO, "Compression sufficient reduced to {0} from {1} to fit in frame size {2}", - new Object[] {maxSufficient, sufficient, maxFrameSize}); - sufficient = maxSufficient; - } - this.sufficient = sufficient; - // TODO(user): Benchmark before switching to direct buffers - bytebuf = ByteBuffer.allocate(maxFrameSize); - if (!bytebuf.hasArray()) { - compressionUnsupported = true; - log.log(Level.INFO, "Byte buffer doesn't support array(), which is required for compression"); - } - this.compressionUnsupported = compressionUnsupported; - } - - /** - * Sets whether compression is encouraged. - */ - public void setAllowCompression(boolean allow) { - this.allowCompression = allow; - } - - /** - * Set the preferred compression level for when compression is enabled. - * - * @param level the preferred compression level (0-9), or {@code DEFAULT_COMPRESSION_LEVEL} to use - * this class's default - * @see java.util.zip.Deflater#setLevel - */ - public void setCompressionLevel(int level) { - Preconditions.checkArgument(level == DEFAULT_COMPRESSION_LEVEL - || (level >= Deflater.NO_COMPRESSION && level <= Deflater.BEST_COMPRESSION), - "invalid compression level"); - this.compressionLevel = level; - } - - /** - * Ensures state and buffers are initialized for writing data to a frame. Callers should be very - * aware this method may modify {@code usingCompression}. - */ - private void checkInitFrame() { - if (bytebuf.position() != 0) { - return; - } - bytebuf.position(HEADER_LENGTH); - usingCompression = compressionUnsupported ? false : allowCompression; - if (usingCompression) { - if (deflater == null) { - deflater = new Deflater(); - } else { - deflater.reset(); - } - deflater.setLevel(compressionLevel == DEFAULT_COMPRESSION_LEVEL - ? Deflater.DEFAULT_COMPRESSION : compressionLevel); - writtenSinceSync = 0; - readSinceSync = 0; - } - } - - /** Frame contents of {@code message}, flushing to {@code sink} as necessary. */ - public int write(InputStream message) throws IOException { - checkInitFrame(); - if (!usingCompression && bytebuf.hasArray()) { - if (bytebuf.remaining() == 0) { - commitToSink(false, false); - } - int available = message.available(); - if (available <= bytebuf.remaining()) { - // When InputStream is DeferredProtoInputStream, this is zero-copy because bytebuf is large - // enough for the proto to be serialized directly into it. - int read = ByteStreams.read(message, - bytebuf.array(), bytebuf.arrayOffset() + bytebuf.position(), bytebuf.remaining()); - bytebuf.position(bytebuf.position() + read); - if (read != available) { - throw new RuntimeException("message.available() did not follow our semantics of always " - + "returning the number of remaining bytes"); - } - return read; - } - } - if (message instanceof DeferredInputStream) { - return ((DeferredInputStream) message).flushTo(outputStreamAdapter); - } else { - // This could be optimized when compression is off, but we expect performance-critical code - // to provide a DeferredInputStream. - return (int) ByteStreams.copy(message, outputStreamAdapter); - } - } - - /** - * Frame contents of {@code b} between {@code off} (inclusive) and {@code off + len} (exclusive), - * flushing to {@code sink} as necessary. - */ - public void write(byte[] b, int off, int len) { - while (len > 0) { - checkInitFrame(); - if (!usingCompression) { - if (bytebuf.remaining() == 0) { - commitToSink(false, false); - continue; - } - int toWrite = Math.min(len, bytebuf.remaining()); - bytebuf.put(b, off, toWrite); - off += toWrite; - len -= toWrite; - } else { - if (bytebuf.remaining() <= MARGIN + sufficient) { - commitToSink(false, false); - continue; - } - // Amount of memory that is guaranteed not to be consumed, including in-flight data in zlib. - int safeCapacity = bytebuf.remaining() - MARGIN - - (writtenSinceSync - readSinceSync) - dataLengthDependentOverhead(writtenSinceSync); - if (safeCapacity <= 0) { - while (deflatePut(deflater, bytebuf, Deflater.SYNC_FLUSH) != 0) {} - writtenSinceSync = 0; - readSinceSync = 0; - continue; - } - int toWrite = Math.min(len, safeCapacity - dataLengthDependentOverhead(safeCapacity)); - deflater.setInput(b, off, toWrite); - writtenSinceSync += toWrite; - while (!deflater.needsInput()) { - readSinceSync += deflatePut(deflater, bytebuf, Deflater.NO_FLUSH); - } - // Clear internal references of byte[] b. - deflater.setInput(EMPTY_BYTE_ARRAY); - off += toWrite; - len -= toWrite; - } - } - } - - /** - * When data is uncompressable, there are 5B of overhead per deflate block, which is generally - * 16 KiB for zlib, but the format supports up to 32 KiB. One block's overhead is already - * accounted for in MARGIN. We use 1B/2KiB to circumvent dealing with rounding errors. Note that - * 1B/2KiB is not enough to support 8 KiB blocks due to rounding errors. - */ - private static int dataLengthDependentOverhead(int length) { - return length / 2048; - } - - private static int deflatePut(Deflater deflater, ByteBuffer bytebuf, int flush) { - if (bytebuf.remaining() == 0) { - throw new AssertionError("Compressed data exceeded frame size"); - } - int deflateBytes = deflater.deflate(bytebuf.array(), bytebuf.arrayOffset() + bytebuf.position(), - bytebuf.remaining(), flush); - bytebuf.position(bytebuf.position() + deflateBytes); - return deflateBytes; - } - - public void endOfMessage() { - if ((!usingCompression && bytebuf.remaining() == 0) - || (usingCompression && bytebuf.remaining() <= MARGIN + sufficient)) { - commitToSink(true, false); - } - } - - public void flush() { - if (bytebuf.position() == 0) { - return; - } - commitToSink(true, false); - } - - public void close() { - if (bytebuf.position() == 0) { - // No pending frame, so send an empty one. - bytebuf.flip(); - sink.deliverFrame(bytebuf, true); - bytebuf.clear(); - } else { - commitToSink(true, true); - } - } - - /** - * Writes compression frame to sink. It does not initialize the next frame, so {@link - * #checkInitFrame()} is necessary if other frames are to follow. - */ - private void commitToSink(boolean endOfMessage, boolean endOfStream) { - if (usingCompression) { - deflater.finish(); - while (!deflater.finished()) { - deflatePut(deflater, bytebuf, Deflater.NO_FLUSH); - } - if (endOfMessage) { - deflater.end(); - deflater = null; - } - } - int frameFlag = usingCompression - ? TransportFrameUtil.FLATE_FLAG : TransportFrameUtil.NO_COMPRESS_FLAG; - // Header = 1b flag | 3b length of GRPC frame - int header = (frameFlag << 24) | (bytebuf.position() - 4); - bytebuf.putInt(0, header); - bytebuf.flip(); - sink.deliverFrame(bytebuf, endOfStream); - bytebuf.clear(); - } - - private class OutputStreamAdapter extends OutputStream { - private final byte[] singleByte = new byte[1]; - - @Override - public void write(int b) { - singleByte[0] = (byte) b; - write(singleByte, 0, 1); - } - - @Override - public void write(byte[] b, int off, int len) { - CompressionFramer.this.write(b, off, len); - } - } -} diff --git a/core/src/main/java/com/google/net/stubby/transport/Decompressor.java b/core/src/main/java/com/google/net/stubby/transport/Decompressor.java deleted file mode 100644 index 5329d90b84..0000000000 --- a/core/src/main/java/com/google/net/stubby/transport/Decompressor.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Copyright 2014, Google Inc. All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - -package com.google.net.stubby.transport; - -import java.io.Closeable; - -import javax.annotation.Nullable; - -/** - * An object responsible for reading GRPC compression frames for a single stream. - */ -public interface Decompressor extends Closeable { - - /** - * Adds the given chunk of a GRPC compression frame to the internal buffers. If the data is - * compressed, it is uncompressed whenever possible (which may only be after the entire - * compression frame has been received). - * - *

Some or all of the given {@code data} chunk may not be made immediately available via - * {@link #readBytes} due to internal buffering. - * - * @param data a received chunk of a GRPC compression frame. Control over the life cycle for this - * buffer is given to this {@link Decompressor}. Only this {@link Decompressor} should call - * {@link Buffer#close} after this point. - */ - void decompress(Buffer data); - - /** - * Reads up to the given number of bytes. Ownership of the returned {@link Buffer} is transferred - * to the caller who is responsible for calling {@link Buffer#close}. - * - *

The length of the returned {@link Buffer} may be less than {@code maxLength}, but will never - * be 0. If no data is available, {@code null} is returned. To ensure that all available data is - * read, the caller should repeatedly call {@link #readBytes} until it returns {@code null}. - * - * @param maxLength the maximum number of bytes to read. This value must be > 0, otherwise throws - * an {@link IllegalArgumentException}. - * @return a {@link Buffer} containing the number of bytes read or {@code null} if no data is - * currently available. - */ - @Nullable - Buffer readBytes(int maxLength); - - /** - * Closes this decompressor and frees any resources. - */ - @Override - void close(); -} diff --git a/core/src/main/java/com/google/net/stubby/transport/Framer.java b/core/src/main/java/com/google/net/stubby/transport/Framer.java deleted file mode 100644 index d8035ce3df..0000000000 --- a/core/src/main/java/com/google/net/stubby/transport/Framer.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Copyright 2014, Google Inc. All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - -package com.google.net.stubby.transport; - -import com.google.net.stubby.Status; - -import java.io.InputStream; - -/** - * Implementations produce the GRPC byte sequence and then split it over multiple frames to be - * delivered via the transport layer which implements {@link Framer.Sink} - */ -public interface Framer { - - /** - * Sink implemented by the transport layer to receive frames and forward them to their destination - */ - public interface Sink { - /** - * Deliver a frame via the transport. - * - * @param frame the contents of the frame to deliver - * @param endOfStream whether the frame is the last one for the GRPC stream - */ - public void deliverFrame(T frame, boolean endOfStream); - } - - /** - * Write out a Payload message. {@code payload} will be completely consumed. - * {@code payload.available()} must return the number of remaining bytes to be read. - */ - public void writePayload(InputStream payload, int length); - - /** - * Write out a Status message. - */ - // TODO(user): change this signature when we actually start writing out the complete Status. - public void writeStatus(Status status); - - /** - * Flush any buffered data in the framer to the sink. - */ - public void flush(); - - /** - * Indicates whether or not this {@link Framer} has been closed via a call to either - * {@link #close()} or {@link #dispose()}. - */ - public boolean isClosed(); - - /** - * Flushes and closes the framer and releases any buffers. After the {@link Framer} is closed or - * disposed, additional calls to this method will have no affect. - */ - public void close(); - - /** - * Closes the framer and releases any buffers, but does not flush. After the {@link Framer} is - * closed or disposed, additional calls to this method will have no affect. - */ - public void dispose(); -} diff --git a/core/src/main/java/com/google/net/stubby/transport/GrpcDeframer.java b/core/src/main/java/com/google/net/stubby/transport/GrpcDeframer.java deleted file mode 100644 index f9a344f301..0000000000 --- a/core/src/main/java/com/google/net/stubby/transport/GrpcDeframer.java +++ /dev/null @@ -1,320 +0,0 @@ -/* - * Copyright 2014, Google Inc. All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - -package com.google.net.stubby.transport; - -import static com.google.net.stubby.GrpcFramingUtil.FRAME_LENGTH; -import static com.google.net.stubby.GrpcFramingUtil.FRAME_TYPE_LENGTH; -import static com.google.net.stubby.GrpcFramingUtil.FRAME_TYPE_MASK; -import static com.google.net.stubby.GrpcFramingUtil.PAYLOAD_FRAME; -import static com.google.net.stubby.GrpcFramingUtil.STATUS_FRAME; - -import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; -import com.google.net.stubby.Status; - -import java.io.Closeable; -import java.util.concurrent.Executor; - -/** - * Deframer for GRPC frames. Delegates deframing/decompression of the GRPC compression frame to a - * {@link Decompressor}. - */ -public class GrpcDeframer implements Closeable { - public interface Sink extends MessageDeframer2.Sink { - void statusRead(Status status); - } - - private enum State { - HEADER, BODY - } - - private static final int HEADER_LENGTH = FRAME_TYPE_LENGTH + FRAME_LENGTH; - private final Decompressor decompressor; - private final Executor executor; - private final DeframerListener listener; - private State state = State.HEADER; - private int requiredLength = HEADER_LENGTH; - private int frameType; - private boolean statusNotified; - private boolean endOfStream; - private SettableFuture deliveryOutstanding; - private Sink sink; - private CompositeBuffer nextFrame; - - /** - * Constructs the deframer. - * - * @param decompressor the object used for de-framing GRPC compression frames. - * @param sink the sink for fully read GRPC messages. - * @param executor the executor to be used for delivery. All calls to - * {@link #deframe(Buffer, boolean)} must be made in the context of this executor. This - * executor must not allow concurrent access to this class, so it must be either a single - * thread or have sequential processing of events. - * @param listener a listener to deframing events - */ - public GrpcDeframer(Decompressor decompressor, Sink sink, Executor executor, - DeframerListener listener) { - this.decompressor = Preconditions.checkNotNull(decompressor, "decompressor"); - this.sink = Preconditions.checkNotNull(sink, "sink"); - this.executor = Preconditions.checkNotNull(executor, "executor"); - this.listener = Preconditions.checkNotNull(listener, "listener"); - } - - /** - * Adds the given data to this deframer and attempts delivery to the sink. - * - *

If returned future is not {@code null}, then it completes when no more deliveries are - * occuring. Delivering completes if all available deframing input is consumed or if delivery - * resulted in an exception, in which case this method may throw the exception or the returned - * future will fail with the throwable. The future is guaranteed to complete within the executor - * provided during construction. - */ - public ListenableFuture deframe(Buffer data, boolean endOfStream) { - Preconditions.checkNotNull(data, "data"); - - // Add the data to the decompression buffer. - decompressor.decompress(data); - - // Indicate that all of the data for this stream has been received. - this.endOfStream = endOfStream; - - if (deliveryOutstanding != null) { - // Only allow one outstanding delivery at a time. - return null; - } - return deliver(); - } - - @Override - public void close() { - decompressor.close(); - if (nextFrame != null) { - nextFrame.close(); - } - } - - /** - * Reads and delivers as many messages to the sink as possible. May only be called when a delivery - * is known not to be outstanding. - */ - private ListenableFuture deliver() { - // Process the uncompressed bytes. - while (readRequiredBytes()) { - if (statusNotified) { - throw new IllegalStateException("Inbound data after receiving status frame"); - } - - switch (state) { - case HEADER: - processHeader(); - break; - case BODY: - // Read the body and deliver the message to the sink. - ListenableFuture processingFuture = processBody(); - if (processingFuture != null) { - // A future was returned for the completion of processing the delivered - // message. Once it's done, try to deliver the next message. - return delayProcessingInternal(processingFuture); - } - - break; - default: - throw new AssertionError("Invalid state: " + state); - } - } - - if (endOfStream) { - if (nextFrame.readableBytes() != 0) { - throw Status.INTERNAL - .withDescription("Encountered end-of-stream mid-frame") - .asRuntimeException(); - } - // If reached the end of stream without reading a status frame, fabricate one - // and deliver to the target. - if (!statusNotified) { - notifyStatus(Status.OK); - } - } - // All available messages processed. - if (deliveryOutstanding != null) { - SettableFuture previousOutstanding = deliveryOutstanding; - deliveryOutstanding = null; - previousOutstanding.set(null); - } - return null; - } - - /** - * May only be called when a delivery is known not to be outstanding. If deliveryOutstanding is - * non-null, then it will be re-used and this method will return {@code null}. - */ - private ListenableFuture delayProcessingInternal(ListenableFuture future) { - Preconditions.checkNotNull(future, "future"); - // Return a separate future so that our callback is guaranteed to complete before any - // listeners on the returned future. - ListenableFuture returnFuture = null; - if (deliveryOutstanding == null) { - returnFuture = deliveryOutstanding = SettableFuture.create(); - } - Futures.addCallback(future, new FutureCallback() { - @Override - public void onFailure(Throwable t) { - SettableFuture previousOutstanding = deliveryOutstanding; - deliveryOutstanding = null; - previousOutstanding.setException(t); - } - - @Override - public void onSuccess(Object result) { - try { - deliver(); - } catch (Throwable t) { - if (deliveryOutstanding == null) { - throw Throwables.propagate(t); - } else { - onFailure(t); - } - } - } - }, executor); - return returnFuture; - } - - /** - * Attempts to read the required bytes into nextFrame. - * - * @returns {@code true} if all of the required bytes have been read. - */ - private boolean readRequiredBytes() { - int totalBytesRead = 0; - try { - if (nextFrame == null) { - nextFrame = new CompositeBuffer(); - } - - // Read until the buffer contains all the required bytes. - int missingBytes; - while ((missingBytes = requiredLength - nextFrame.readableBytes()) > 0) { - Buffer buffer = decompressor.readBytes(missingBytes); - if (buffer == null) { - // No more data is available. - break; - } - totalBytesRead += buffer.readableBytes(); - // Add it to the composite buffer for the next frame. - nextFrame.addBuffer(buffer); - } - - // Return whether or not all of the required bytes are now in the frame. - return nextFrame.readableBytes() == requiredLength; - } finally { - if (totalBytesRead > 0) { - listener.bytesRead(totalBytesRead); - } - } - } - - /** - * Processes the GRPC compression header which is composed of the compression flag and the outer - * frame length. - */ - private void processHeader() { - // Peek, but do not read the header. - frameType = nextFrame.readUnsignedByte() & FRAME_TYPE_MASK; - - // Update the required length to include the length of the frame. - requiredLength = nextFrame.readInt(); - - // Continue reading the frame body. - state = State.BODY; - } - - /** - * Processes the body of the GRPC compression frame. A single compression frame may contain - * several GRPC messages within it. - */ - private ListenableFuture processBody() { - ListenableFuture future = null; - switch (frameType) { - case PAYLOAD_FRAME: - future = processMessage(); - break; - case STATUS_FRAME: - processStatus(); - break; - default: - throw new AssertionError("Invalid frameType: " + frameType); - } - - // Done with this frame, begin processing the next header. - state = State.HEADER; - requiredLength = HEADER_LENGTH; - return future; - } - - /** - * Processes the payload of a message frame. - */ - private ListenableFuture processMessage() { - try { - return sink.messageRead(Buffers.openStream(nextFrame, true), nextFrame.readableBytes()); - } finally { - // Don't close the frame, since the sink is now responsible for the life-cycle. - nextFrame = null; - } - } - - /** - * Processes the payload of a status frame. - */ - private void processStatus() { - try { - notifyStatus(Status.fromCodeValue(nextFrame.readUnsignedShort())); - } finally { - nextFrame.close(); - nextFrame = null; - } - } - - /** - * Delivers the status notification to the sink. - */ - private void notifyStatus(Status status) { - statusNotified = true; - sink.statusRead(status); - sink.endOfStream(); - } -} diff --git a/core/src/main/java/com/google/net/stubby/transport/Http2ClientStream.java b/core/src/main/java/com/google/net/stubby/transport/Http2ClientStream.java index f88a7e1e8a..50ca235fed 100644 --- a/core/src/main/java/com/google/net/stubby/transport/Http2ClientStream.java +++ b/core/src/main/java/com/google/net/stubby/transport/Http2ClientStream.java @@ -70,10 +70,8 @@ public abstract class Http2ClientStream extends AbstractClientStream { private Charset errorCharset = Charsets.UTF_8; private boolean contentTypeChecked; - protected Http2ClientStream(ClientStreamListener listener, - @Nullable Decompressor decompressor, - Executor deframerExecutor) { - super(listener, decompressor, deframerExecutor); + protected Http2ClientStream(ClientStreamListener listener, Executor deframerExecutor) { + super(listener, deframerExecutor); } protected void transportHeadersReceived(Metadata.Headers headers) { @@ -125,24 +123,19 @@ public abstract class Http2ClientStream extends AbstractClientStream { } else { inboundDataReceived(frame); if (endOfStream) { - if (GRPC_V2_PROTOCOL) { - if (false) { - // This is a protocol violation as we expect to receive trailers. - transportError = Status.INTERNAL.withDescription("Recevied EOS on DATA frame"); - frame.close(); - inboundTransportError(transportError); - } else { - // TODO(user): Delete this hack when trailers are supported by GFE with v2. Currently - // GFE doesn't support trailers, so when using gRPC v2 protocol GFE will not send any - // status. We paper over this for now by just assuming OK. For all properly functioning - // servers (both v1 and v2), stashedStatus should not be null here. - Metadata.Trailers trailers = new Metadata.Trailers(); - trailers.put(Status.CODE_KEY, Status.OK); - inboundTrailersReceived(trailers, Status.OK); - } + if (false) { + // This is a protocol violation as we expect to receive trailers. + transportError = Status.INTERNAL.withDescription("Recevied EOS on DATA frame"); + frame.close(); + inboundTransportError(transportError); } else { - // Synthesize trailers until we get rid of v1. - inboundTrailersReceived(new Metadata.Trailers(), Status.OK); + // TODO(user): Delete this hack when trailers are supported by GFE with v2. Currently + // GFE doesn't support trailers, so when using gRPC v2 protocol GFE will not send any + // status. We paper over this for now by just assuming OK. For all properly functioning + // servers (both v1 and v2), stashedStatus should not be null here. + Metadata.Trailers trailers = new Metadata.Trailers(); + trailers.put(Status.CODE_KEY, Status.OK); + inboundTrailersReceived(trailers, Status.OK); } } } diff --git a/core/src/main/java/com/google/net/stubby/transport/MessageFramer.java b/core/src/main/java/com/google/net/stubby/transport/MessageFramer.java deleted file mode 100644 index 3b076c3d93..0000000000 --- a/core/src/main/java/com/google/net/stubby/transport/MessageFramer.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Copyright 2014, Google Inc. All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - -package com.google.net.stubby.transport; - -import com.google.net.stubby.GrpcFramingUtil; -import com.google.net.stubby.Status; - -import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; - -/** - * Default {@link Framer} implementation. - */ -public class MessageFramer implements Framer { - - private CompressionFramer framer; - private final ByteBuffer scratch = ByteBuffer.allocate(16); - - public MessageFramer(Sink sink, int maxFrameSize) { - // TODO(user): maxFrameSize should probably come from a 'Platform' class - framer = new CompressionFramer(sink, maxFrameSize, false, maxFrameSize / 16); - } - - /** - * Sets whether compression is encouraged. - */ - public void setAllowCompression(boolean enable) { - verifyNotClosed(); - framer.setAllowCompression(enable); - } - - @Override - public void writePayload(InputStream message, int messageLength) { - verifyNotClosed(); - try { - scratch.clear(); - scratch.put(GrpcFramingUtil.PAYLOAD_FRAME); - scratch.putInt(messageLength); - framer.write(scratch.array(), 0, scratch.position()); - if (messageLength != framer.write(message)) { - throw new RuntimeException("Message length was inaccurate"); - } - framer.endOfMessage(); - } catch (IOException ioe) { - throw new RuntimeException(ioe); - } - } - - - @Override - public void writeStatus(Status status) { - verifyNotClosed(); - short code = (short) status.getCode().value(); - scratch.clear(); - scratch.put(GrpcFramingUtil.STATUS_FRAME); - int length = 2; - scratch.putInt(length); - scratch.putShort(code); - framer.write(scratch.array(), 0, scratch.position()); - framer.endOfMessage(); - } - - @Override - public void flush() { - verifyNotClosed(); - framer.flush(); - } - - @Override - public boolean isClosed() { - return framer == null; - } - - @Override - public void close() { - if (!isClosed()) { - // TODO(user): Returning buffer to a pool would go here - framer.close(); - framer = null; - } - } - - @Override - public void dispose() { - // TODO(user): Returning buffer to a pool would go here - framer = null; - } - - private void verifyNotClosed() { - if (isClosed()) { - throw new IllegalStateException("Framer already closed"); - } - } -} diff --git a/core/src/main/java/com/google/net/stubby/transport/MessageFramer2.java b/core/src/main/java/com/google/net/stubby/transport/MessageFramer2.java index 116785c3c7..c3a33de253 100644 --- a/core/src/main/java/com/google/net/stubby/transport/MessageFramer2.java +++ b/core/src/main/java/com/google/net/stubby/transport/MessageFramer2.java @@ -34,7 +34,6 @@ package com.google.net.stubby.transport; import com.google.common.base.Preconditions; import com.google.common.io.ByteStreams; import com.google.net.stubby.DeferredInputStream; -import com.google.net.stubby.Status; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -45,9 +44,23 @@ import java.nio.ByteBuffer; import java.util.zip.GZIPOutputStream; /** - * Default {@link Framer} implementation. + * Encodes gRPC messages to be delivered via the transport layer which implements {@link + * MessageFramer2.Sink}. */ -public class MessageFramer2 implements Framer { +public class MessageFramer2 { + /** + * Sink implemented by the transport layer to receive frames and forward them to their destination + */ + public interface Sink { + /** + * Deliver a frame via the transport. + * + * @param frame the contents of the frame to deliver + * @param endOfStream whether the frame is the last one for the GRPC stream + */ + public void deliverFrame(T frame, boolean endOfStream); + } + private static final int HEADER_LENGTH = 5; private static final byte UNCOMPRESSED = 0; private static final byte COMPRESSED = 1; @@ -72,7 +85,10 @@ public class MessageFramer2 implements Framer { this.compression = Preconditions.checkNotNull(compression, "compression"); } - @Override + /** + * Write out a Payload message. {@code message} will be completely consumed. + * {@code message.available()} must return the number of remaining bytes to be read. + */ public void writePayload(InputStream message, int messageLength) { try { if (compression == Compression.NONE) { @@ -144,12 +160,9 @@ public class MessageFramer2 implements Framer { } } - @Override - public void writeStatus(Status status) { - // NOOP - } - - @Override + /** + * Flush any buffered data in the framer to the sink. + */ public void flush() { if (bytebuf.position() == 0) { return; @@ -157,12 +170,18 @@ public class MessageFramer2 implements Framer { commitToSink(false); } - @Override + /** + * Indicates whether or not this {@link Framer} has been closed via a call to either + * {@link #close()} or {@link #dispose()}. + */ public boolean isClosed() { return bytebuf == null; } - @Override + /** + * Flushes and closes the framer and releases any buffers. After the {@link Framer} is closed or + * disposed, additional calls to this method will have no affect. + */ public void close() { if (!isClosed()) { commitToSink(true); @@ -170,7 +189,10 @@ public class MessageFramer2 implements Framer { } } - @Override + /** + * Closes the framer and releases any buffers, but does not flush. After the {@link Framer} is + * closed or disposed, additional calls to this method will have no affect. + */ public void dispose() { // TODO(user): Returning buffer to a pool would go here bytebuf = null; diff --git a/core/src/main/java/com/google/net/stubby/transport/TransportFrameUtil.java b/core/src/main/java/com/google/net/stubby/transport/TransportFrameUtil.java index a7107eb7ec..80d1c23f87 100644 --- a/core/src/main/java/com/google/net/stubby/transport/TransportFrameUtil.java +++ b/core/src/main/java/com/google/net/stubby/transport/TransportFrameUtil.java @@ -55,40 +55,6 @@ public final class TransportFrameUtil { private static final byte[] binaryHeaderSuffixBytes = Metadata.BINARY_HEADER_SUFFIX.getBytes(US_ASCII); - - // Compression modes (lowest order 3 bits of frame flags) - public static final byte NO_COMPRESS_FLAG = 0x0; - public static final byte FLATE_FLAG = 0x1; - public static final byte COMPRESSION_FLAG_MASK = 0x7; - - public static boolean isNotCompressed(int b) { - return ((b & COMPRESSION_FLAG_MASK) == NO_COMPRESS_FLAG); - } - - public static boolean isFlateCompressed(int b) { - return ((b & COMPRESSION_FLAG_MASK) == FLATE_FLAG); - } - - /** - * Length of the compression type field. - */ - public static final int COMPRESSION_TYPE_LENGTH = 1; - - /** - * Length of the compression frame length field. - */ - public static final int COMPRESSION_FRAME_LENGTH = 3; - - /** - * Full length of the compression header. - */ - public static final int COMPRESSION_HEADER_LENGTH = - COMPRESSION_TYPE_LENGTH + COMPRESSION_FRAME_LENGTH; - - // Flags - public static final byte PAYLOAD_FRAME = 0x0; - public static final byte STATUS_FRAME = 0x3; - // TODO(user): This needs proper namespacing support, this is currently just a hack /** * Converts the path from the HTTP request to the full qualified method name. diff --git a/core/src/test/java/com/google/net/stubby/transport/CompressionFramerTest.java b/core/src/test/java/com/google/net/stubby/transport/CompressionFramerTest.java deleted file mode 100644 index 7f4d796d1a..0000000000 --- a/core/src/test/java/com/google/net/stubby/transport/CompressionFramerTest.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Copyright 2014, Google Inc. All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - -package com.google.net.stubby.transport; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import com.google.common.collect.Lists; -import com.google.common.io.ByteStreams; -import com.google.common.primitives.Bytes; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.List; -import java.util.Random; -import java.util.zip.Deflater; -import java.util.zip.InflaterInputStream; - -/** Unit tests for {@link CompressionFramer}. */ -@RunWith(JUnit4.class) -public class CompressionFramerTest { - private int maxFrameSize = 1024; - private int sufficient = 8; - private CapturingSink sink = new CapturingSink(); - private CompressionFramer framer = new CompressionFramer(sink, maxFrameSize, true, sufficient); - - @Test - public void testGoodCompression() { - byte[] payload = new byte[1000]; - framer.setCompressionLevel(Deflater.BEST_COMPRESSION); - framer.write(payload, 0, payload.length); - framer.endOfMessage(); - framer.flush(); - - assertEquals(1, sink.frames.size()); - byte[] frame = sink.frames.get(0); - assertEquals(TransportFrameUtil.FLATE_FLAG, frame[0]); - assertTrue(decodeFrameLength(frame) < 30); - assertArrayEquals(payload, decompress(frame)); - } - - @Test - public void testPoorCompression() { - byte[] payload = new byte[3 * maxFrameSize / 2]; - new Random(1).nextBytes(payload); - framer.setCompressionLevel(Deflater.DEFAULT_COMPRESSION); - framer.write(payload, 0, payload.length); - framer.endOfMessage(); - framer.flush(); - - assertEquals(2, sink.frames.size()); - assertEquals(TransportFrameUtil.FLATE_FLAG, sink.frames.get(0)[0]); - assertEquals(TransportFrameUtil.FLATE_FLAG, sink.frames.get(1)[0]); - assertTrue(decodeFrameLength(sink.frames.get(0)) <= maxFrameSize); - assertTrue(decodeFrameLength(sink.frames.get(0)) - >= maxFrameSize - CompressionFramer.HEADER_LENGTH - CompressionFramer.MARGIN - sufficient); - assertArrayEquals(payload, decompress(sink.frames)); - } - - private static int decodeFrameLength(byte[] frame) { - return ((frame[1] & 0xFF) << 16) - | ((frame[2] & 0xFF) << 8) - | (frame[3] & 0xFF); - } - - private static byte[] decompress(byte[] frame) { - try { - return ByteStreams.toByteArray(new InflaterInputStream(new ByteArrayInputStream(frame, - CompressionFramer.HEADER_LENGTH, frame.length - CompressionFramer.HEADER_LENGTH))); - } catch (IOException ex) { - throw new AssertionError(); - } - } - - private static byte[] decompress(List frames) { - byte[][] bytes = new byte[frames.size()][]; - for (int i = 0; i < frames.size(); i++) { - bytes[i] = decompress(frames.get(i)); - } - return Bytes.concat(bytes); - } - - private static class CapturingSink implements Framer.Sink { - public final List frames = Lists.newArrayList(); - - @Override - public void deliverFrame(ByteBuffer frame, boolean endOfMessage) { - byte[] frameBytes = new byte[frame.remaining()]; - frame.get(frameBytes); - assertEquals(frameBytes.length - CompressionFramer.HEADER_LENGTH, - decodeFrameLength(frameBytes)); - frames.add(frameBytes); - } - } -} diff --git a/core/src/test/java/com/google/net/stubby/transport/GrpcDeframerTest.java b/core/src/test/java/com/google/net/stubby/transport/GrpcDeframerTest.java deleted file mode 100644 index f9e0f280da..0000000000 --- a/core/src/test/java/com/google/net/stubby/transport/GrpcDeframerTest.java +++ /dev/null @@ -1,280 +0,0 @@ -/* - * Copyright 2014, Google Inc. All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - -package com.google.net.stubby.transport; - -import static com.google.net.stubby.transport.TransportFrameUtil.PAYLOAD_FRAME; -import static com.google.net.stubby.transport.TransportFrameUtil.STATUS_FRAME; -import static org.junit.Assert.assertEquals; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyInt; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import com.google.common.io.ByteStreams; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.SettableFuture; -import com.google.net.stubby.Status; - -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.ArgumentCaptor; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; - -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.charset.StandardCharsets; -import java.util.Arrays; - -import javax.annotation.Nullable; - -/** - * Tests for {@link GrpcDeframer}. - */ -@RunWith(JUnit4.class) -public class GrpcDeframerTest { - private static final String MESSAGE = "hello world"; - private static final byte[] MESSAGE_BYTES = MESSAGE.getBytes(StandardCharsets.UTF_8); - private static final Status STATUS_CODE = Status.CANCELLED; - - private GrpcDeframer reader; - - private StubDecompressor decompressor; - - @Mock - private GrpcDeframer.Sink sink; - - private SettableFuture messageFuture; - - @Mock - private DeframerListener listener; - - @Before - public void setup() { - MockitoAnnotations.initMocks(this); - messageFuture = SettableFuture.create(); - when(sink.messageRead(any(InputStream.class), anyInt())).thenReturn(messageFuture); - - decompressor = new StubDecompressor(); - reader = new GrpcDeframer(decompressor, sink, MoreExecutors.directExecutor(), listener); - } - - @Test - public void payloadShouldCallTarget() throws Exception { - decompressor.init(payloadFrame()); - reader.deframe(Buffers.empty(), false); - verifyPayload(); - verifyNoStatus(); - } - - @Test - public void payloadWithEndOfStreamShouldNotifyStatus() throws Exception { - decompressor.init(payloadFrame()); - reader.deframe(Buffers.empty(), true); - verifyPayload(); - verifyNoStatus(); - - messageFuture.set(null); - verifyStatus(Status.Code.OK); - } - - @Test - public void statusShouldCallTarget() throws Exception { - decompressor.init(statusFrame()); - reader.deframe(Buffers.empty(), false); - verifyNoPayload(); - verifyStatus(); - } - - @Test - public void statusWithEndOfStreamShouldNotifyStatusOnce() throws Exception { - decompressor.init(statusFrame()); - reader.deframe(Buffers.empty(), true); - verifyNoPayload(); - verifyStatus(); - } - - @Test - public void multipleFramesShouldCallTarget() throws Exception { - ByteArrayOutputStream os = new ByteArrayOutputStream(); - DataOutputStream dos = new DataOutputStream(os); - - // Write a payload frame. - writeFrame(PAYLOAD_FRAME, MESSAGE_BYTES, dos); - - // Write a status frame. - byte[] statusBytes = new byte[] {0, (byte) STATUS_CODE.getCode().value()}; - writeFrame(STATUS_FRAME, statusBytes, dos); - - // Now write the complete frame: compression header followed by the 3 message frames. - dos.close(); - byte[] bodyBytes = os.toByteArray(); - - decompressor.init(bodyBytes); - reader.deframe(Buffers.empty(), false); - - // Verify that all callbacks were called. - verifyPayload(); - verifyNoStatus(); - - messageFuture.set(null); - verifyStatus(); - } - - @Test - public void partialFrameShouldSucceed() throws Exception { - byte[] frame = payloadFrame(); - - // Create a buffer that contains 2 payload frames. - byte[] fullBuffer = Arrays.copyOf(frame, frame.length * 2); - System.arraycopy(frame, 0, fullBuffer, frame.length, frame.length); - - // Use only a portion of the frame. Should not call the sink. - int startIx = 0; - int endIx = 10; - byte[] chunk = Arrays.copyOfRange(fullBuffer, startIx, endIx); - decompressor.init(chunk); - reader.deframe(Buffers.empty(), false); - verifyNoPayload(); - verifyNoStatus(); - - // Supply the rest of the frame and a portion of a second frame. Should call the sink. - startIx = endIx; - endIx = startIx + frame.length; - chunk = Arrays.copyOfRange(fullBuffer, startIx, endIx); - decompressor.init(chunk); - reader.deframe(Buffers.empty(), false); - verifyPayload(); - verifyNoStatus(); - } - - private void verifyPayload() { - ArgumentCaptor captor = ArgumentCaptor.forClass(InputStream.class); - verify(sink).messageRead(captor.capture(), eq(MESSAGE.length())); - assertEquals(MESSAGE, readString(captor.getValue(), MESSAGE.length())); - } - - private String readString(InputStream in, int length) { - try { - byte[] bytes = new byte[length]; - ByteStreams.readFully(in, bytes); - return new String(bytes, StandardCharsets.UTF_8); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - private void verifyStatus() { - verifyStatus(Status.Code.CANCELLED); - } - - private void verifyStatus(Status.Code code) { - ArgumentCaptor captor = ArgumentCaptor.forClass(Status.class); - verify(sink).statusRead(captor.capture()); - verify(sink).endOfStream(); - assertEquals(code, captor.getValue().getCode()); - } - - private void verifyNoPayload() { - verify(sink, never()).messageRead(any(InputStream.class), anyInt()); - } - - private void verifyNoStatus() { - verify(sink, never()).statusRead(any(Status.class)); - verify(sink, never()).endOfStream(); - } - - private static byte[] payloadFrame() throws IOException { - return frame(PAYLOAD_FRAME, MESSAGE_BYTES); - } - - private static byte[] statusFrame() throws IOException { - byte[] bytes = new byte[] {0, (byte) STATUS_CODE.getCode().value()}; - return frame(STATUS_FRAME, bytes); - } - - private static byte[] frame(int frameType, byte[] data) throws IOException { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - OutputStream os = bos; - DataOutputStream dos = new DataOutputStream(os); - writeFrame(frameType, data, dos); - dos.close(); - return bos.toByteArray(); - } - - private static void writeFrame(int frameType, byte[] data, DataOutputStream out) - throws IOException { - out.write(frameType); - out.writeInt(data.length); - out.write(data); - } - - private static final class StubDecompressor implements Decompressor { - byte[] bytes; - int offset; - - void init(byte[] bytes) { - this.bytes = bytes; - this.offset = 0; - } - - @Override - public void decompress(Buffer data) { - // Do nothing. - } - - @Override - public void close() { - // Do nothing. - } - - @Override - @Nullable - public Buffer readBytes(int length) { - length = Math.min(length, bytes.length - offset); - if (length == 0) { - return null; - } - - Buffer buffer = Buffers.wrap(bytes, offset, length); - offset += length; - return buffer; - } - } -} diff --git a/core/src/test/java/com/google/net/stubby/transport/MessageFramer2Test.java b/core/src/test/java/com/google/net/stubby/transport/MessageFramer2Test.java index 417385af7b..253bccf507 100644 --- a/core/src/test/java/com/google/net/stubby/transport/MessageFramer2Test.java +++ b/core/src/test/java/com/google/net/stubby/transport/MessageFramer2Test.java @@ -60,8 +60,8 @@ public class MessageFramer2Test { private static final int TRANSPORT_FRAME_SIZE = 12; @Mock - private Framer.Sink> sink; - private Framer.Sink copyingSink; + private MessageFramer2.Sink> sink; + private MessageFramer2.Sink copyingSink; private MessageFramer2 framer; @Captor @@ -191,10 +191,10 @@ public class MessageFramer2Test { * Since ByteBuffers are reused, this sink copies their value at the time of the call. Converting * to List is convenience. */ - private static class ByteArrayConverterSink implements Framer.Sink { - private final Framer.Sink> delegate; + private static class ByteArrayConverterSink implements MessageFramer2.Sink { + private final MessageFramer2.Sink> delegate; - public ByteArrayConverterSink(Framer.Sink> delegate) { + public ByteArrayConverterSink(MessageFramer2.Sink> delegate) { this.delegate = delegate; } diff --git a/core/src/test/java/com/google/net/stubby/transport/MessageFramerTest.java b/core/src/test/java/com/google/net/stubby/transport/MessageFramerTest.java deleted file mode 100644 index 7fc3f75327..0000000000 --- a/core/src/test/java/com/google/net/stubby/transport/MessageFramerTest.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Copyright 2014, Google Inc. All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - -package com.google.net.stubby.transport; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import com.google.common.primitives.Bytes; -import com.google.net.stubby.GrpcFramingUtil; -import com.google.net.stubby.Status; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.io.ByteArrayInputStream; -import java.nio.ByteBuffer; -import java.util.Arrays; - -/** - * Tests for {@link MessageFramer} - */ -@RunWith(JUnit4.class) -public class MessageFramerTest { - - public static final int TRANSPORT_FRAME_SIZE = 57; - - @Test - public void testPayload() throws Exception { - CapturingSink sink = new CapturingSink(); - MessageFramer framer = new MessageFramer(sink, TRANSPORT_FRAME_SIZE); - byte[] payload = new byte[]{11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23}; - byte[] unframedStream = - Bytes.concat( - new byte[]{GrpcFramingUtil.PAYLOAD_FRAME}, - new byte[]{0, 0, 0, (byte) payload.length}, - payload); - for (int i = 0; i < 1000; i++) { - framer.writePayload(new ByteArrayInputStream(payload), payload.length); - if ((i + 1) % 13 == 0) { - // Test flushing periodically - framer.flush(); - } - } - framer.flush(); - assertEquals(sink.deframedStream.length, unframedStream.length * 1000); - for (int i = 0; i < 1000; i++) { - assertArrayEquals(unframedStream, - Arrays.copyOfRange(sink.deframedStream, i * unframedStream.length, - (i + 1) * unframedStream.length)); - } - } - - @Test - public void testStatus() throws Exception { - CapturingSink sink = new CapturingSink(); - MessageFramer framer = new MessageFramer(sink, TRANSPORT_FRAME_SIZE); - byte[] unframedStream = Bytes.concat( - new byte[]{GrpcFramingUtil.STATUS_FRAME}, - new byte[]{0, 0, 0, 2}, // Len is 2 bytes - new byte[]{0, 13}); // Internal==13 - for (int i = 0; i < 1000; i++) { - framer.writeStatus(Status.INTERNAL); - if ((i + 1) % 13 == 0) { - framer.flush(); - } - } - framer.flush(); - assertEquals(sink.deframedStream.length, unframedStream.length * 1000); - for (int i = 0; i < 1000; i++) { - assertArrayEquals(unframedStream, - Arrays.copyOfRange(sink.deframedStream, i * unframedStream.length, - (i + 1) * unframedStream.length)); - } - } - - static class CapturingSink implements Framer.Sink { - - byte[] deframedStream = new byte[0]; - - @Override - public void deliverFrame(ByteBuffer frame, boolean endOfMessage) { - assertTrue(frame.remaining() <= TRANSPORT_FRAME_SIZE); - // Frame must contain compression flag & 24 bit length - int header = frame.getInt(); - byte flag = (byte) (header >>> 24); - int length = header & 0xFFFFFF; - assertTrue(TransportFrameUtil.isNotCompressed(flag)); - assertEquals(frame.remaining(), length); - // Frame must exceed dictated transport frame size - byte[] frameBytes = new byte[frame.remaining()]; - frame.get(frameBytes); - deframedStream = Bytes.concat(deframedStream, frameBytes); - } - } -} diff --git a/integration-testing/build.gradle b/integration-testing/build.gradle index cec674857b..2ad104bad7 100644 --- a/integration-testing/build.gradle +++ b/integration-testing/build.gradle @@ -18,5 +18,6 @@ dependencies { project(':stubby-okhttp'), project(':stubby-stub'), project(':stubby-testing'), - libraries.junit + libraries.junit, + libraries.mockito } diff --git a/integration-testing/pom.xml b/integration-testing/pom.xml index daa39ae328..88e5e72421 100644 --- a/integration-testing/pom.xml +++ b/integration-testing/pom.xml @@ -44,6 +44,11 @@ junit compile + + org.mockito + mockito-core + compile + diff --git a/integration-testing/src/main/java/com/google/net/stubby/testing/integration/AbstractTransportTest.java b/integration-testing/src/main/java/com/google/net/stubby/testing/integration/AbstractTransportTest.java index d7949db0ff..4ee74b4a80 100644 --- a/integration-testing/src/main/java/com/google/net/stubby/testing/integration/AbstractTransportTest.java +++ b/integration-testing/src/main/java/com/google/net/stubby/testing/integration/AbstractTransportTest.java @@ -63,13 +63,11 @@ import com.google.net.stubby.testing.integration.Messages.StreamingInputCallRequ import com.google.net.stubby.testing.integration.Messages.StreamingInputCallResponse; import com.google.net.stubby.testing.integration.Messages.StreamingOutputCallRequest; import com.google.net.stubby.testing.integration.Messages.StreamingOutputCallResponse; -import com.google.net.stubby.transport.AbstractStream; import com.google.protobuf.ByteString; import com.google.protobuf.EmptyProtos.Empty; import org.junit.After; import org.junit.Assert; -import org.junit.Assume; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -494,7 +492,6 @@ public abstract class AbstractTransportTest { @org.junit.Test public void exchangeContextUnaryCall() throws Exception { - Assume.assumeTrue(AbstractStream.GRPC_V2_PROTOCOL); TestServiceGrpc.TestServiceBlockingStub stub = TestServiceGrpc.newBlockingStub(channel); @@ -514,14 +511,11 @@ public abstract class AbstractTransportTest { // Assert that our side channel object is echoed back in both headers and trailers Assert.assertEquals(contextValue, headersCapture.get().get(METADATA_KEY)); - if (AbstractStream.GRPC_V2_PROTOCOL) { - Assert.assertEquals(contextValue, trailersCapture.get().get(METADATA_KEY)); - } + Assert.assertEquals(contextValue, trailersCapture.get().get(METADATA_KEY)); } @org.junit.Test public void exchangeContextStreamingCall() throws Exception { - Assume.assumeTrue(AbstractStream.GRPC_V2_PROTOCOL); TestServiceGrpc.TestServiceStub stub = TestServiceGrpc.newStub(channel); // Capture the context exchange @@ -560,9 +554,7 @@ public abstract class AbstractTransportTest { // Assert that our side channel object is echoed back in both headers and trailers Assert.assertEquals(contextValue, headersCapture.get().get(METADATA_KEY)); - if (AbstractStream.GRPC_V2_PROTOCOL) { - Assert.assertEquals(contextValue, trailersCapture.get().get(METADATA_KEY)); - } + Assert.assertEquals(contextValue, trailersCapture.get().get(METADATA_KEY)); } diff --git a/integration-testing/src/main/java/com/google/net/stubby/testing/integration/TestServiceClient.java b/integration-testing/src/main/java/com/google/net/stubby/testing/integration/TestServiceClient.java index 241997d365..85a4382c1a 100644 --- a/integration-testing/src/main/java/com/google/net/stubby/testing/integration/TestServiceClient.java +++ b/integration-testing/src/main/java/com/google/net/stubby/testing/integration/TestServiceClient.java @@ -112,7 +112,6 @@ public class TestServiceClient { * --serverHost=The host of the remote server.
* --serverPort=$port_number The port of the remote server.
* --test_case=empty_unary|server_streaming The client test to run.
- * --grpc_version=1|2 Use gRPC v2 protocol. Default is 1. */ public static void main(String[] args) throws Exception { Map argMap = parseArgs(args); @@ -121,8 +120,11 @@ public class TestServiceClient { int serverPort = getPort(argMap); String testCase = getTestCase(argMap); - com.google.net.stubby.transport.AbstractStream.GRPC_V2_PROTOCOL = - getGrpcVersion(argMap) == 2; + // TODO(user): Remove. Ideally stop passing the arg in scripts first. + if (getGrpcVersion(argMap) != 2) { + System.err.println("Only grpc_version=2 is supported"); + System.exit(1); + } final Tester tester = new Tester(transport, serverHost, serverPort); Runtime.getRuntime().addShutdownHook(new Thread() { diff --git a/integration-testing/src/main/java/com/google/net/stubby/testing/integration/TestServiceServer.java b/integration-testing/src/main/java/com/google/net/stubby/testing/integration/TestServiceServer.java index 7d16c27063..3d5e564f0a 100644 --- a/integration-testing/src/main/java/com/google/net/stubby/testing/integration/TestServiceServer.java +++ b/integration-testing/src/main/java/com/google/net/stubby/testing/integration/TestServiceServer.java @@ -118,15 +118,17 @@ public class TestServiceServer { * --transport= Identifies the transport * over which GRPC frames should be sent.
* --port= The port number for RPC communications. - * --grpc_version=<1|2> Use gRPC v2 protocol. Default is v1. */ public static void main(String[] args) throws Exception { Map argMap = parseArgs(args); Transport transport = getTransport(argMap); int port = getPort(RPC_PORT_ARG, argMap); - com.google.net.stubby.transport.AbstractStream.GRPC_V2_PROTOCOL = - getGrpcVersion(argMap) == 2; + // TODO(user): Remove. Ideally stop passing the arg in scripts first. + if (getGrpcVersion(argMap) != 2) { + System.err.println("Only grpc_version=2 is supported"); + System.exit(1); + } final TestServiceServer server = new TestServiceServer(transport, port); diff --git a/netty/src/main/java/com/google/net/stubby/transport/netty/NettyClientStream.java b/netty/src/main/java/com/google/net/stubby/transport/netty/NettyClientStream.java index 2568d64dc6..af9131240f 100644 --- a/netty/src/main/java/com/google/net/stubby/transport/netty/NettyClientStream.java +++ b/netty/src/main/java/com/google/net/stubby/transport/netty/NettyClientStream.java @@ -51,7 +51,7 @@ class NettyClientStream extends Http2ClientStream { private final NettyClientHandler handler; NettyClientStream(ClientStreamListener listener, Channel channel, NettyClientHandler handler) { - super(listener, new NettyDecompressor(channel.alloc()), channel.eventLoop()); + super(listener, channel.eventLoop()); this.channel = checkNotNull(channel, "channel"); this.handler = checkNotNull(handler, "handler"); } diff --git a/netty/src/main/java/com/google/net/stubby/transport/netty/NettyDecompressor.java b/netty/src/main/java/com/google/net/stubby/transport/netty/NettyDecompressor.java deleted file mode 100644 index 93dcd2dd76..0000000000 --- a/netty/src/main/java/com/google/net/stubby/transport/netty/NettyDecompressor.java +++ /dev/null @@ -1,313 +0,0 @@ -/* - * Copyright 2014, Google Inc. All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - -package com.google.net.stubby.transport.netty; - -import static com.google.net.stubby.transport.TransportFrameUtil.COMPRESSION_HEADER_LENGTH; -import static com.google.net.stubby.transport.TransportFrameUtil.isFlateCompressed; - -import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; -import com.google.common.io.Closeables; -import com.google.net.stubby.transport.Buffer; -import com.google.net.stubby.transport.Decompressor; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.ByteBufOutputStream; -import io.netty.buffer.CompositeByteBuf; - -import java.io.Closeable; -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.util.zip.InflaterInputStream; - -import javax.annotation.Nullable; - -/** - * A {@link Decompressor} implementation based on Netty {@link CompositeByteBuf}s. - */ -public class NettyDecompressor implements Decompressor { - - private final CompositeByteBuf buffer; - private final ByteBufAllocator alloc; - private Frame frame; - - public NettyDecompressor(ByteBufAllocator alloc) { - this.alloc = Preconditions.checkNotNull(alloc, "alloc"); - buffer = alloc.compositeBuffer(); - } - - @Override - public void decompress(Buffer data) { - ByteBuf buf = toByteBuf(data); - - // Add it to the compression frame buffer. - buffer.addComponent(buf); - buffer.writerIndex(buffer.writerIndex() + buf.readableBytes()); - } - - @Override - public Buffer readBytes(final int maxLength) { - Preconditions.checkArgument(maxLength > 0, "maxLength must be > 0"); - try { - // Read the next frame if we don't already have one. - if (frame == null) { - frame = nextFrame(); - } - - ByteBuf byteBuf = null; - if (frame != null) { - // Read as many bytes as we can from the frame. - byteBuf = frame.readBytes(maxLength); - - // If we reached the end of the frame, close it. - if (frame.complete()) { - frame.close(); - frame = null; - } - } - - if (byteBuf == null) { - // No data was available. - return null; - } - - return new NettyBuffer(byteBuf); - } finally { - // Discard any component buffers that have been fully read. - buffer.discardReadComponents(); - } - } - - @Override - public void close() { - // Release the CompositeByteBuf. This will automatically release any components as well. - buffer.release(); - if (frame != null) { - frame.close(); - } - } - - /** - * Returns the next compression frame object, or {@code null} if the next frame header is - * incomplete. - */ - @SuppressWarnings("resource") - @Nullable - private Frame nextFrame() { - if (buffer.readableBytes() < COMPRESSION_HEADER_LENGTH) { - // Don't have all the required bytes for the frame header yet. - return null; - } - - // Read the header and create the frame object. - boolean compressed = isFlateCompressed(buffer.readUnsignedByte()); - int frameLength = buffer.readUnsignedMedium(); - if (frameLength == 0) { - return nextFrame(); - } - - return compressed ? new CompressedFrame(frameLength) : new UncompressedFrame(frameLength); - } - - /** - * Converts the given buffer into a {@link ByteBuf}. - */ - private ByteBuf toByteBuf(Buffer data) { - if (data instanceof NettyBuffer) { - // Just return the contained ByteBuf. - return ((NettyBuffer) data).buffer(); - } - - // Create a new ByteBuf and copy the content to it. - try { - int length = data.readableBytes(); - ByteBuf buf = alloc.buffer(length); - data.readBytes(new ByteBufOutputStream(buf), length); - return buf; - } catch (IOException e) { - throw Throwables.propagate(e); - } finally { - data.close(); - } - } - - /** - * A wrapper around the body of a compression frame. Provides a generic method for reading bytes - * from any frame. - */ - private interface Frame extends Closeable { - @Nullable - ByteBuf readBytes(int maxLength); - - boolean complete(); - - @Override - void close(); - } - - /** - * An uncompressed frame. Just writes bytes directly from the compression frame. - */ - private class UncompressedFrame implements Frame { - int bytesRemainingInFrame; - - public UncompressedFrame(int frameLength) { - this.bytesRemainingInFrame = frameLength; - } - - @Override - @Nullable - public ByteBuf readBytes(int maxLength) { - Preconditions.checkState(!complete(), "Must not call readBytes on a completed frame"); - int available = buffer.readableBytes(); - if (available == 0) { - return null; - } - - int bytesToRead = Math.min(available, Math.min(maxLength, bytesRemainingInFrame)); - bytesRemainingInFrame -= bytesToRead; - - return buffer.readBytes(bytesToRead); - } - - @Override - public boolean complete() { - return bytesRemainingInFrame == 0; - } - - @Override - public void close() { - // Do nothing. - } - } - - /** - * A compressed frame that inflates the data as it reads from the frame. - */ - private class CompressedFrame implements Frame { - private final InputStream in; - private ByteBuf nextBuf; - - public CompressedFrame(int frameLength) { - // Limit the stream by the frameLength. - in = new InflaterInputStream(new GrowableByteBufInputStream(frameLength)); - } - - @Override - @Nullable - public ByteBuf readBytes(int maxLength) { - - // If the pre-existing nextBuf is too small, release it. - if (nextBuf != null && nextBuf.capacity() < maxLength) { - nextBuf.release(); - nextBuf = null; - } - - if (nextBuf == null) { - nextBuf = alloc.buffer(); - } - - try { - int bytesToWrite = Math.min(maxLength, nextBuf.writableBytes()); - nextBuf.writeBytes(in, bytesToWrite); - } catch (EOFException e) { - // The next compressed block is unavailable at the moment. Nothing to return. - return null; - } catch (IOException e) { - throw new RuntimeException(e); - } - - if (!nextBuf.isReadable()) { - throw new AssertionError("Read zero bytes from the compression frame"); - } - - ByteBuf ret = nextBuf; - nextBuf = null; - return ret; - } - - @Override - public boolean complete() { - try { - return in.available() <= 0; - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public void close() { - Closeables.closeQuietly(in); - } - } - - /** - * A stream backed by the {@link #buffer}, which allows for additional reading as the buffer - * grows. Not using Netty's stream class since it doesn't handle growth of the underlying buffer. - */ - private class GrowableByteBufInputStream extends InputStream { - final int startIndex; - final int endIndex; - - GrowableByteBufInputStream(int length) { - startIndex = buffer.readerIndex(); - endIndex = startIndex + length; - } - - @Override - public int read() throws IOException { - if (available() == 0) { - return -1; - } - return buffer.readByte() & 0xff; - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - int available = available(); - if (available == 0) { - return -1; - } - - len = Math.min(available, len); - buffer.readBytes(b, off, len); - return len; - } - - @Override - public int available() throws IOException { - return Math.min(endIndex - buffer.readerIndex(), buffer.readableBytes()); - } - } -} diff --git a/netty/src/main/java/com/google/net/stubby/transport/netty/NettyServerStream.java b/netty/src/main/java/com/google/net/stubby/transport/netty/NettyServerStream.java index c4b8405759..969dec138d 100644 --- a/netty/src/main/java/com/google/net/stubby/transport/netty/NettyServerStream.java +++ b/netty/src/main/java/com/google/net/stubby/transport/netty/NettyServerStream.java @@ -51,7 +51,7 @@ class NettyServerStream extends AbstractServerStream { private final NettyServerHandler handler; NettyServerStream(Channel channel, int id, NettyServerHandler handler) { - super(id, new NettyDecompressor(channel.alloc()), channel.eventLoop()); + super(id, channel.eventLoop()); this.channel = checkNotNull(channel, "channel"); this.handler = checkNotNull(handler, "handler"); } diff --git a/netty/src/test/java/com/google/net/stubby/transport/netty/NettyClientStreamTest.java b/netty/src/test/java/com/google/net/stubby/transport/netty/NettyClientStreamTest.java index db817ebe72..39517813c5 100644 --- a/netty/src/test/java/com/google/net/stubby/transport/netty/NettyClientStreamTest.java +++ b/netty/src/test/java/com/google/net/stubby/transport/netty/NettyClientStreamTest.java @@ -32,7 +32,6 @@ package com.google.net.stubby.transport.netty; import static com.google.net.stubby.transport.netty.NettyTestUtil.messageFrame; -import static com.google.net.stubby.transport.netty.NettyTestUtil.statusFrame; import static com.google.net.stubby.transport.netty.Utils.CONTENT_TYPE_GRPC; import static com.google.net.stubby.transport.netty.Utils.CONTENT_TYPE_HEADER; import static com.google.net.stubby.transport.netty.Utils.STATUS_OK; @@ -56,9 +55,6 @@ import io.netty.handler.codec.AsciiString; import io.netty.handler.codec.http2.DefaultHttp2Headers; import io.netty.handler.codec.http2.Http2Headers; -import org.junit.After; -import org.junit.Assume; -import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -169,7 +165,6 @@ public class NettyClientStreamTest extends NettyStreamTestBase { @Test public void inboundTrailersClosesCall() throws Exception { - Assume.assumeTrue(AbstractStream.GRPC_V2_PROTOCOL); stream().id(1); stream().transportHeadersReceived(grpcResponseHeaders(), false); super.inboundMessageShouldCallListener(); @@ -183,11 +178,7 @@ public class NettyClientStreamTest extends NettyStreamTestBase { // Receive headers first so that it's a valid GRPC response. stream().transportHeadersReceived(grpcResponseHeaders(), false); - if (AbstractStream.GRPC_V2_PROTOCOL) { - stream().transportHeadersReceived(grpcResponseTrailers(Status.INTERNAL), true); - } else { - stream().transportDataReceived(statusFrame(Status.INTERNAL), false); - } + stream().transportHeadersReceived(grpcResponseTrailers(Status.INTERNAL), true); ArgumentCaptor captor = ArgumentCaptor.forClass(Status.class); verify(listener).closed(captor.capture(), any(Metadata.Trailers.class)); assertEquals(Status.INTERNAL.getCode(), captor.getValue().getCode()); diff --git a/netty/src/test/java/com/google/net/stubby/transport/netty/NettyDecompressorTest.java b/netty/src/test/java/com/google/net/stubby/transport/netty/NettyDecompressorTest.java deleted file mode 100644 index 47ba5e6438..0000000000 --- a/netty/src/test/java/com/google/net/stubby/transport/netty/NettyDecompressorTest.java +++ /dev/null @@ -1,201 +0,0 @@ -/* - * Copyright 2014, Google Inc. All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - -package com.google.net.stubby.transport.netty; - -import static com.google.common.base.Charsets.UTF_8; -import static com.google.net.stubby.transport.Buffers.readAsStringUtf8; -import static com.google.net.stubby.transport.TransportFrameUtil.COMPRESSION_HEADER_LENGTH; -import static com.google.net.stubby.transport.TransportFrameUtil.FLATE_FLAG; -import static com.google.net.stubby.transport.TransportFrameUtil.NO_COMPRESS_FLAG; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; - -import com.google.net.stubby.transport.Buffer; -import com.google.net.stubby.transport.Buffers; -import com.google.net.stubby.transport.CompositeBuffer; - -import io.netty.buffer.Unpooled; -import io.netty.buffer.UnpooledByteBufAllocator; - -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; -import java.util.Arrays; -import java.util.zip.DeflaterOutputStream; - -/** - * Tests for {@link NettyDecompressor}. - */ -@RunWith(JUnit4.class) -public class NettyDecompressorTest { - private static final String MESSAGE = "hello world"; - - private NettyDecompressor decompressor; - - @Before - public void setup() { - decompressor = new NettyDecompressor(UnpooledByteBufAllocator.DEFAULT); - } - - @Test - public void uncompressedDataShouldSucceed() throws Exception { - fullMessageShouldSucceed(false); - } - - @Test - public void compressedDataShouldSucceed() throws Exception { - fullMessageShouldSucceed(true); - } - - @Test - public void uncompressedFrameShouldNotBeReadableUntilComplete() throws Exception { - byte[] frame = frame(false); - byte[] chunk1 = Arrays.copyOf(frame, 5); - byte[] chunk2 = Arrays.copyOfRange(frame, 5, frame.length); - - // Decompress the first chunk and verify it's not readable yet. - decompressor.decompress(Buffers.wrap(chunk1)); - - CompositeBuffer composite = new CompositeBuffer(); - Buffer buffer = decompressor.readBytes(2); - assertEquals(1, buffer.readableBytes()); - composite.addBuffer(buffer); - - // Decompress the rest of the frame and verify it's readable. - decompressor.decompress(Buffers.wrap(chunk2)); - composite.addBuffer(decompressor.readBytes(MESSAGE.length() - 1)); - assertEquals(MESSAGE, readAsStringUtf8(composite)); - } - - @Test - public void compressedFrameShouldNotBeReadableUntilComplete() throws Exception { - byte[] frame = frame(true); - byte[] chunk1 = Arrays.copyOf(frame, 5); - byte[] chunk2 = Arrays.copyOfRange(frame, 5, frame.length); - - // Decompress the first chunk and verify it's not readable yet. - decompressor.decompress(Buffers.wrap(chunk1)); - Buffer buffer = decompressor.readBytes(2); - assertNull(buffer); - - // Decompress the rest of the frame and verify it's readable. - decompressor.decompress(Buffers.wrap(chunk2)); - CompositeBuffer composite = new CompositeBuffer(); - for(int remaining = MESSAGE.length(); remaining > 0; ) { - Buffer buf = decompressor.readBytes(remaining); - if (buf == null) { - break; - } - composite.addBuffer(buf); - remaining -= buf.readableBytes(); - } - assertEquals(MESSAGE, readAsStringUtf8(composite)); - } - - @Test - public void nettyBufferShouldBeReleasedAfterRead() throws Exception { - byte[] frame = frame(false); - byte[] chunk1 = Arrays.copyOf(frame, 5); - byte[] chunk2 = Arrays.copyOfRange(frame, 5, frame.length); - NettyBuffer buffer1 = new NettyBuffer(Unpooled.wrappedBuffer(chunk1)); - NettyBuffer buffer2 = new NettyBuffer(Unpooled.wrappedBuffer(chunk2)); - // CompositeByteBuf always keeps at least one buffer internally, so we add a second so - // that it will release the first after it is read. - decompressor.decompress(buffer1); - decompressor.decompress(buffer2); - NettyBuffer readBuffer = (NettyBuffer) decompressor.readBytes(buffer1.readableBytes()); - assertEquals(0, buffer1.buffer().refCnt()); - assertEquals(1, readBuffer.buffer().refCnt()); - } - - @Test - public void closeShouldReleasedBuffers() throws Exception { - byte[] frame = frame(false); - byte[] chunk1 = Arrays.copyOf(frame, 5); - NettyBuffer buffer1 = new NettyBuffer(Unpooled.wrappedBuffer(chunk1)); - decompressor.decompress(buffer1); - assertEquals(1, buffer1.buffer().refCnt()); - decompressor.close(); - assertEquals(0, buffer1.buffer().refCnt()); - } - - private void fullMessageShouldSucceed(boolean compress) throws Exception { - // Decompress the entire frame all at once. - byte[] frame = frame(compress); - decompressor.decompress(Buffers.wrap(frame)); - - // Read some bytes and verify. - int chunkSize = MESSAGE.length() / 2; - assertEquals(MESSAGE.substring(0, chunkSize), - readAsStringUtf8(decompressor.readBytes(chunkSize))); - - // Read the rest and verify. - assertEquals(MESSAGE.substring(chunkSize), - readAsStringUtf8(decompressor.readBytes(MESSAGE.length() - chunkSize))); - } - - /** - * Creates a compression frame from {@link #MESSAGE}, applying compression if requested. - */ - private byte[] frame(boolean compress) throws Exception { - byte[] msgBytes = bytes(MESSAGE); - if (compress) { - msgBytes = compress(msgBytes); - } - ByteArrayOutputStream os = - new ByteArrayOutputStream(msgBytes.length + COMPRESSION_HEADER_LENGTH); - DataOutputStream dos = new DataOutputStream(os); - int frameFlag = compress ? FLATE_FLAG : NO_COMPRESS_FLAG; - // Header = 1b flag | 3b length of GRPC frame - int header = (frameFlag << 24) | msgBytes.length; - dos.writeInt(header); - dos.write(msgBytes); - dos.close(); - return os.toByteArray(); - } - - private byte[] bytes(String str) { - return str.getBytes(UTF_8); - } - - private byte[] compress(byte[] data) throws Exception { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - DeflaterOutputStream dos = new DeflaterOutputStream(out); - dos.write(data); - dos.close(); - return out.toByteArray(); - } -} diff --git a/netty/src/test/java/com/google/net/stubby/transport/netty/NettyServerHandlerTest.java b/netty/src/test/java/com/google/net/stubby/transport/netty/NettyServerHandlerTest.java index 4c2763f8ae..d81015b347 100644 --- a/netty/src/test/java/com/google/net/stubby/transport/netty/NettyServerHandlerTest.java +++ b/netty/src/test/java/com/google/net/stubby/transport/netty/NettyServerHandlerTest.java @@ -53,7 +53,6 @@ import com.google.common.io.ByteStreams; import com.google.net.stubby.Metadata; import com.google.net.stubby.Status; import com.google.net.stubby.Status.Code; -import com.google.net.stubby.transport.Framer; import com.google.net.stubby.transport.MessageFramer2; import com.google.net.stubby.transport.ServerStream; import com.google.net.stubby.transport.ServerStreamListener; @@ -272,7 +271,7 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase { private ByteBuf dataFrame(int streamId, boolean endStream) { final ByteBuf compressionFrame = Unpooled.buffer(CONTENT.length); - MessageFramer2 framer = new MessageFramer2(new Framer.Sink() { + MessageFramer2 framer = new MessageFramer2(new MessageFramer2.Sink() { @Override public void deliverFrame(ByteBuffer frame, boolean endOfStream) { compressionFrame.writeBytes(frame); diff --git a/netty/src/test/java/com/google/net/stubby/transport/netty/NettyTestUtil.java b/netty/src/test/java/com/google/net/stubby/transport/netty/NettyTestUtil.java index 696a44e61a..7782715939 100644 --- a/netty/src/test/java/com/google/net/stubby/transport/netty/NettyTestUtil.java +++ b/netty/src/test/java/com/google/net/stubby/transport/netty/NettyTestUtil.java @@ -31,13 +31,11 @@ package com.google.net.stubby.transport.netty; -import static com.google.net.stubby.GrpcFramingUtil.PAYLOAD_FRAME; import static com.google.net.stubby.GrpcFramingUtil.STATUS_FRAME; import static io.netty.util.CharsetUtil.UTF_8; import com.google.common.io.ByteStreams; import com.google.net.stubby.Status; -import com.google.net.stubby.transport.AbstractStream; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; @@ -60,10 +58,6 @@ public class NettyTestUtil { static ByteBuf messageFrame(String message) throws Exception { ByteArrayOutputStream os = new ByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(os); - if (!AbstractStream.GRPC_V2_PROTOCOL) { - dos.write(PAYLOAD_FRAME); - dos.writeInt(message.length()); - } dos.write(message.getBytes(UTF_8)); dos.close(); @@ -86,9 +80,7 @@ public class NettyTestUtil { static ByteBuf compressionFrame(byte[] data) { ByteBuf buf = Unpooled.buffer(); - if (AbstractStream.GRPC_V2_PROTOCOL) { - buf.writeByte(0); - } + buf.writeByte(0); buf.writeInt(data.length); buf.writeBytes(data); return buf; diff --git a/okhttp/src/main/java/com/google/net/stubby/transport/okhttp/OkHttpClientStream.java b/okhttp/src/main/java/com/google/net/stubby/transport/okhttp/OkHttpClientStream.java index 1564a20df7..71854ede3c 100644 --- a/okhttp/src/main/java/com/google/net/stubby/transport/okhttp/OkHttpClientStream.java +++ b/okhttp/src/main/java/com/google/net/stubby/transport/okhttp/OkHttpClientStream.java @@ -98,10 +98,7 @@ class OkHttpClientStream extends Http2ClientStream { OkHttpClientTransport transport, Object executorLock, OutboundFlowController outboundFlow) { - super(listener, null, executor); - if (!GRPC_V2_PROTOCOL) { - throw new RuntimeException("okhttp transport can only work with V2 protocol!"); - } + super(listener, executor); this.frameWriter = frameWriter; this.transport = transport; this.executorLock = executorLock;