Remove gRPC v1 support.

No major refactorings/simplifications were done. Only gRPC v1 support
infrastructure was removed.
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=82737436
This commit is contained in:
ejona 2014-12-23 12:16:17 -08:00 committed by Eric Anderson
parent 1c20eb6cef
commit c0f41920bd
28 changed files with 105 additions and 2240 deletions

View File

@ -62,10 +62,8 @@ public abstract class AbstractClientStream<IdT> extends AbstractStream<IdT>
private Metadata.Trailers trailers;
protected AbstractClientStream(ClientStreamListener listener,
@Nullable Decompressor decompressor,
Executor deframerExecutor) {
super(decompressor, deframerExecutor);
protected AbstractClientStream(ClientStreamListener listener, Executor deframerExecutor) {
super(deframerExecutor);
this.listener = Preconditions.checkNotNull(listener);
}
@ -151,20 +149,10 @@ public abstract class AbstractClientStream<IdT> extends AbstractStream<IdT>
// Stash the status & trailers so they can be delivered by the deframer calls
// remoteEndClosed
this.status = status;
if (GRPC_V2_PROTOCOL) {
this.trailers = trailers;
}
this.trailers = trailers;
deframe(Buffers.empty(), true);
}
/** gRPC protocol v1 support */
@Override
protected void receiveStatus(Status status) {
Preconditions.checkNotNull(status, "status");
this.status = status;
trailers = new Metadata.Trailers();
}
@Override
protected void remoteEndClosed() {
transportReportStatus(status, trailers);
@ -207,7 +195,7 @@ public abstract class AbstractClientStream<IdT> extends AbstractStream<IdT>
@Override
public final void halfClose() {
if (outboundPhase(Phase.STATUS) != Phase.STATUS) {
closeFramer(null);
closeFramer();
}
}

View File

@ -63,9 +63,8 @@ public abstract class AbstractServerStream<IdT> extends AbstractStream<IdT>
/** Saved trailers from close() that need to be sent once the framer has sent all messages. */
private Metadata.Trailers stashedTrailers;
protected AbstractServerStream(IdT id, @Nullable Decompressor decompressor,
Executor deframerExecutor) {
super(decompressor, deframerExecutor);
protected AbstractServerStream(IdT id, Executor deframerExecutor) {
super(deframerExecutor);
id(id);
}
@ -79,12 +78,6 @@ public abstract class AbstractServerStream<IdT> extends AbstractStream<IdT>
return listener.messageRead(is, length);
}
/** gRPC protocol v1 support */
@Override
protected void receiveStatus(Status status) {
Preconditions.checkState(status == Status.OK, "Received status can only be OK on server");
}
@Override
public void writeHeaders(Metadata.Headers headers) {
Preconditions.checkNotNull(headers, "headers");
@ -111,7 +104,7 @@ public abstract class AbstractServerStream<IdT> extends AbstractStream<IdT>
gracefulClose = true;
this.stashedTrailers = trailers;
writeStatusToTrailers(status);
closeFramer(status);
closeFramer();
}
}
@ -148,17 +141,13 @@ public abstract class AbstractServerStream<IdT> extends AbstractStream<IdT>
@Override
protected final void internalSendFrame(ByteBuffer frame, boolean endOfStream) {
if (!GRPC_V2_PROTOCOL) {
sendFrame(frame, endOfStream);
} else {
if (frame.hasRemaining()) {
sendFrame(frame, false);
}
if (endOfStream) {
sendTrailers(stashedTrailers, headersSent);
headersSent = true;
stashedTrailers = null;
}
if (frame.hasRemaining()) {
sendFrame(frame, false);
}
if (endOfStream) {
sendTrailers(stashedTrailers, headersSent);
headersSent = true;
stashedTrailers = null;
}
}
@ -237,7 +226,7 @@ public abstract class AbstractServerStream<IdT> extends AbstractStream<IdT>
stashedTrailers = new Metadata.Trailers();
}
writeStatusToTrailers(status);
closeFramer(status);
closeFramer();
} else {
dispose();
}

View File

@ -39,7 +39,6 @@ import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.net.stubby.Status;
import java.io.InputStream;
import java.nio.ByteBuffer;
@ -51,13 +50,6 @@ import javax.annotation.Nullable;
* Abstract base class for {@link Stream} implementations.
*/
public abstract class AbstractStream<IdT> implements Stream {
/**
* Global to enable gRPC v2 protocol support, which may be incomplete. This is a complete hack
* and should please, please, please be temporary to ease migration.
*/
// TODO(user): remove this once v1 support is dropped.
public static boolean GRPC_V2_PROTOCOL = true;
/**
* Indicates the phase of the GRPC stream in one direction.
*/
@ -66,7 +58,7 @@ public abstract class AbstractStream<IdT> implements Stream {
}
private volatile IdT id;
private final Framer framer;
private final MessageFramer2 framer;
private final FutureCallback<Object> deframerErrorCallback = new FutureCallback<Object>() {
@Override
public void onSuccess(Object result) {}
@ -77,7 +69,6 @@ public abstract class AbstractStream<IdT> implements Stream {
}
};
final GrpcDeframer deframer;
final MessageDeframer2 deframer2;
/**
@ -90,9 +81,8 @@ public abstract class AbstractStream<IdT> implements Stream {
*/
private Phase outboundPhase = Phase.HEADERS;
AbstractStream(@Nullable Decompressor decompressor,
Executor deframerExecutor) {
GrpcDeframer.Sink inboundMessageHandler = new GrpcDeframer.Sink() {
AbstractStream(Executor deframerExecutor) {
MessageDeframer2.Sink inboundMessageHandler = new MessageDeframer2.Sink() {
@Override
public ListenableFuture<Void> messageRead(InputStream input, final int length) {
ListenableFuture<Void> future = null;
@ -104,17 +94,12 @@ public abstract class AbstractStream<IdT> implements Stream {
}
}
@Override
public void statusRead(Status status) {
receiveStatus(status);
}
@Override
public void endOfStream() {
remoteEndClosed();
}
};
Framer.Sink<ByteBuffer> outboundFrameHandler = new Framer.Sink<ByteBuffer>() {
MessageFramer2.Sink<ByteBuffer> outboundFrameHandler = new MessageFramer2.Sink<ByteBuffer>() {
@Override
public void deliverFrame(ByteBuffer frame, boolean endOfStream) {
internalSendFrame(frame, endOfStream);
@ -129,19 +114,8 @@ public abstract class AbstractStream<IdT> implements Stream {
returnProcessedBytes(numBytes);
}
};
if (!GRPC_V2_PROTOCOL) {
framer = new MessageFramer(outboundFrameHandler, 4096);
this.deframer =
new GrpcDeframer(decompressor, inboundMessageHandler, deframerExecutor, listener);
this.deframer2 = null;
} else {
if (decompressor != null) {
decompressor.close();
}
framer = new MessageFramer2(outboundFrameHandler, 4096);
this.deframer = null;
this.deframer2 = new MessageDeframer2(inboundMessageHandler, deframerExecutor, listener);
}
framer = new MessageFramer2(outboundFrameHandler, 4096);
this.deframer2 = new MessageDeframer2(inboundMessageHandler, deframerExecutor, listener);
}
/**
@ -188,14 +162,9 @@ public abstract class AbstractStream<IdT> implements Stream {
* Closes the underlying framer.
*
* <p>No-op if the framer has already been closed.
*
* @param status if not null, will write the status to the framer before closing it
*/
final void closeFramer(@Nullable Status status) {
final void closeFramer() {
if (!framer.isClosed()) {
if (status != null) {
framer.writeStatus(status);
}
framer.close();
}
}
@ -225,9 +194,6 @@ public abstract class AbstractStream<IdT> implements Stream {
/** A message was deframed. */
protected abstract ListenableFuture<Void> receiveMessage(InputStream is, int length);
/** A status was deframed. */
protected abstract void receiveStatus(Status status);
/** Deframer reached end of stream. */
protected abstract void remoteEndClosed();
@ -248,11 +214,7 @@ public abstract class AbstractStream<IdT> implements Stream {
*/
protected final void deframe(Buffer frame, boolean endOfStream) {
ListenableFuture<?> future;
if (GRPC_V2_PROTOCOL) {
future = deframer2.deframe(frame, endOfStream);
} else {
future = deframer.deframe(frame, endOfStream);
}
future = deframer2.deframe(frame, endOfStream);
if (future != null) {
Futures.addCallback(future, deframerErrorCallback);
}
@ -262,15 +224,9 @@ public abstract class AbstractStream<IdT> implements Stream {
* Delays delivery from the deframer until the given future completes.
*/
protected final void delayDeframer(ListenableFuture<?> future) {
if (GRPC_V2_PROTOCOL) {
ListenableFuture<?> deliveryFuture = deframer2.delayProcessing(future);
if (deliveryFuture != null) {
Futures.addCallback(deliveryFuture, deframerErrorCallback);
}
} else {
// V1 is a little broken as it doesn't strictly wait for the last payload handled
// by the deframer to be processed by the application layer. Not worth fixing as will
// be removed when the v1 deframer is removed.
ListenableFuture<?> deliveryFuture = deframer2.delayProcessing(future);
if (deliveryFuture != null) {
Futures.addCallback(deliveryFuture, deframerErrorCallback);
}
}

View File

@ -1,367 +0,0 @@
/*
* Copyright 2014, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.net.stubby.transport;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.io.ByteStreams;
import com.google.net.stubby.DeferredInputStream;
import com.google.net.stubby.transport.Framer.Sink;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.zip.Deflater;
/**
* Compression framer for HTTP/2 transport frames, for use in both compression and
* non-compression scenarios. Receives message-stream as input. It is able to change compression
* configuration on-the-fly, but will not actually begin using the new configuration until the next
* full frame.
*/
class CompressionFramer {
/**
* Compression level to indicate using this class's default level. Note that this value is
* allowed to conflict with Deflate.DEFAULT_COMPRESSION, in which case this class's default
* prevails.
*/
public static final int DEFAULT_COMPRESSION_LEVEL = -1;
private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
/**
* Size of the GRPC compression frame header which consists of:
* 1 byte for the compression type,
* 3 bytes for the length of the compression frame.
*/
@VisibleForTesting
static final int HEADER_LENGTH = 4;
/**
* Number of frame bytes to reserve to allow for zlib overhead. This does not include data-length
* dependent overheads and compression latency (delay between providing data to zlib and output of
* the compressed data).
*
* <p>References:
* deflate framing: http://www.gzip.org/zlib/rfc-deflate.html
* (note that bit-packing is little-endian (section 3.1.1) whereas description of sequences
* is big-endian, so bits appear reversed),
* zlib framing: http://tools.ietf.org/html/rfc1950,
* details on flush behavior: http://www.zlib.net/manual.html
*/
@VisibleForTesting
static final int MARGIN
= 5 /* deflate current block overhead, assuming no compression:
block type (1) + len (2) + nlen (2) */
+ 5 /* deflate flush; adds an empty block after current:
00 (not end; no compression) 00 00 (len) FF FF (nlen) */
+ 5 /* deflate flush; some versions of zlib output two empty blocks on some flushes */
+ 5 /* deflate finish; adds empty block to mark end, since we commonly flush before finish:
03 (end; fixed Huffman + 5 bits of end of block) 00 (last 3 bits + padding),
or if compression level is 0: 01 (end; no compression) 00 00 (len) FF FF (nlen) */
+ 2 /* zlib header; CMF (1) + FLG (1) */ + 4 /* zlib ADLER32 (4) */
+ 5 /* additional safety for good measure */;
private static final Logger log = Logger.getLogger(CompressionFramer.class.getName());
private final Sink<ByteBuffer> sink;
/**
* Bytes of frame being constructed. {@code position() == 0} when no frame in progress.
*/
private final ByteBuffer bytebuf;
/** Number of frame bytes it is acceptable to leave unused when compressing. */
private final int sufficient;
private Deflater deflater;
/** Number of bytes written to deflater since last deflate sync. */
private int writtenSinceSync;
/** Number of bytes read from deflater since last deflate sync. */
private int readSinceSync;
/**
* Whether the current frame is actually being compressed. If {@code bytebuf.position() == 0},
* then this value has no meaning.
*/
private boolean usingCompression;
/**
* Whether compression is requested. This does not imply we are compressing the current frame
* (see {@link #usingCompression}), or that we will even compress the next frame (see {@link
* #compressionUnsupported}).
*/
private boolean allowCompression;
/** Whether compression is possible with current configuration and platform. */
private final boolean compressionUnsupported;
/**
* Compression level to set on the Deflater, where {@code DEFAULT_COMPRESSION_LEVEL} implies this
* class's default.
*/
private int compressionLevel = DEFAULT_COMPRESSION_LEVEL;
private final OutputStreamAdapter outputStreamAdapter = new OutputStreamAdapter();
/**
* Since compression tries to form full frames, if compression is working well then it will
* consecutively compress smaller amounts of input data in order to not exceed the frame size. For
* example, if the data is getting 50% compression and a maximum frame size of 128, then it will
* encode roughly 128 bytes which leaves 64, so we encode 64, 32, 16, 8, 4, 2, 1, 1.
* {@code sufficient} cuts off the long tail and says that at some point the frame is "good
* enough" to stop. Choosing a value of {@code 0} is not outrageous.
*
* @param maxFrameSize maximum number of bytes allowed for output frames
* @param allowCompression whether frames should be compressed
* @param sufficient number of frame bytes it is acceptable to leave unused when compressing
*/
public CompressionFramer(Sink<ByteBuffer> sink, int maxFrameSize, boolean allowCompression,
int sufficient) {
this.sink = sink;
this.allowCompression = allowCompression;
int maxSufficient = maxFrameSize - HEADER_LENGTH - MARGIN
- 1 /* to force at least one byte of data */;
boolean compressionUnsupported = false;
if (maxSufficient < 0) {
compressionUnsupported = true;
log.log(Level.INFO, "Frame not large enough for compression");
} else if (maxSufficient < sufficient) {
log.log(Level.INFO, "Compression sufficient reduced to {0} from {1} to fit in frame size {2}",
new Object[] {maxSufficient, sufficient, maxFrameSize});
sufficient = maxSufficient;
}
this.sufficient = sufficient;
// TODO(user): Benchmark before switching to direct buffers
bytebuf = ByteBuffer.allocate(maxFrameSize);
if (!bytebuf.hasArray()) {
compressionUnsupported = true;
log.log(Level.INFO, "Byte buffer doesn't support array(), which is required for compression");
}
this.compressionUnsupported = compressionUnsupported;
}
/**
* Sets whether compression is encouraged.
*/
public void setAllowCompression(boolean allow) {
this.allowCompression = allow;
}
/**
* Set the preferred compression level for when compression is enabled.
*
* @param level the preferred compression level (0-9), or {@code DEFAULT_COMPRESSION_LEVEL} to use
* this class's default
* @see java.util.zip.Deflater#setLevel
*/
public void setCompressionLevel(int level) {
Preconditions.checkArgument(level == DEFAULT_COMPRESSION_LEVEL
|| (level >= Deflater.NO_COMPRESSION && level <= Deflater.BEST_COMPRESSION),
"invalid compression level");
this.compressionLevel = level;
}
/**
* Ensures state and buffers are initialized for writing data to a frame. Callers should be very
* aware this method may modify {@code usingCompression}.
*/
private void checkInitFrame() {
if (bytebuf.position() != 0) {
return;
}
bytebuf.position(HEADER_LENGTH);
usingCompression = compressionUnsupported ? false : allowCompression;
if (usingCompression) {
if (deflater == null) {
deflater = new Deflater();
} else {
deflater.reset();
}
deflater.setLevel(compressionLevel == DEFAULT_COMPRESSION_LEVEL
? Deflater.DEFAULT_COMPRESSION : compressionLevel);
writtenSinceSync = 0;
readSinceSync = 0;
}
}
/** Frame contents of {@code message}, flushing to {@code sink} as necessary. */
public int write(InputStream message) throws IOException {
checkInitFrame();
if (!usingCompression && bytebuf.hasArray()) {
if (bytebuf.remaining() == 0) {
commitToSink(false, false);
}
int available = message.available();
if (available <= bytebuf.remaining()) {
// When InputStream is DeferredProtoInputStream, this is zero-copy because bytebuf is large
// enough for the proto to be serialized directly into it.
int read = ByteStreams.read(message,
bytebuf.array(), bytebuf.arrayOffset() + bytebuf.position(), bytebuf.remaining());
bytebuf.position(bytebuf.position() + read);
if (read != available) {
throw new RuntimeException("message.available() did not follow our semantics of always "
+ "returning the number of remaining bytes");
}
return read;
}
}
if (message instanceof DeferredInputStream) {
return ((DeferredInputStream<?>) message).flushTo(outputStreamAdapter);
} else {
// This could be optimized when compression is off, but we expect performance-critical code
// to provide a DeferredInputStream.
return (int) ByteStreams.copy(message, outputStreamAdapter);
}
}
/**
* Frame contents of {@code b} between {@code off} (inclusive) and {@code off + len} (exclusive),
* flushing to {@code sink} as necessary.
*/
public void write(byte[] b, int off, int len) {
while (len > 0) {
checkInitFrame();
if (!usingCompression) {
if (bytebuf.remaining() == 0) {
commitToSink(false, false);
continue;
}
int toWrite = Math.min(len, bytebuf.remaining());
bytebuf.put(b, off, toWrite);
off += toWrite;
len -= toWrite;
} else {
if (bytebuf.remaining() <= MARGIN + sufficient) {
commitToSink(false, false);
continue;
}
// Amount of memory that is guaranteed not to be consumed, including in-flight data in zlib.
int safeCapacity = bytebuf.remaining() - MARGIN
- (writtenSinceSync - readSinceSync) - dataLengthDependentOverhead(writtenSinceSync);
if (safeCapacity <= 0) {
while (deflatePut(deflater, bytebuf, Deflater.SYNC_FLUSH) != 0) {}
writtenSinceSync = 0;
readSinceSync = 0;
continue;
}
int toWrite = Math.min(len, safeCapacity - dataLengthDependentOverhead(safeCapacity));
deflater.setInput(b, off, toWrite);
writtenSinceSync += toWrite;
while (!deflater.needsInput()) {
readSinceSync += deflatePut(deflater, bytebuf, Deflater.NO_FLUSH);
}
// Clear internal references of byte[] b.
deflater.setInput(EMPTY_BYTE_ARRAY);
off += toWrite;
len -= toWrite;
}
}
}
/**
* When data is uncompressable, there are 5B of overhead per deflate block, which is generally
* 16 KiB for zlib, but the format supports up to 32 KiB. One block's overhead is already
* accounted for in MARGIN. We use 1B/2KiB to circumvent dealing with rounding errors. Note that
* 1B/2KiB is not enough to support 8 KiB blocks due to rounding errors.
*/
private static int dataLengthDependentOverhead(int length) {
return length / 2048;
}
private static int deflatePut(Deflater deflater, ByteBuffer bytebuf, int flush) {
if (bytebuf.remaining() == 0) {
throw new AssertionError("Compressed data exceeded frame size");
}
int deflateBytes = deflater.deflate(bytebuf.array(), bytebuf.arrayOffset() + bytebuf.position(),
bytebuf.remaining(), flush);
bytebuf.position(bytebuf.position() + deflateBytes);
return deflateBytes;
}
public void endOfMessage() {
if ((!usingCompression && bytebuf.remaining() == 0)
|| (usingCompression && bytebuf.remaining() <= MARGIN + sufficient)) {
commitToSink(true, false);
}
}
public void flush() {
if (bytebuf.position() == 0) {
return;
}
commitToSink(true, false);
}
public void close() {
if (bytebuf.position() == 0) {
// No pending frame, so send an empty one.
bytebuf.flip();
sink.deliverFrame(bytebuf, true);
bytebuf.clear();
} else {
commitToSink(true, true);
}
}
/**
* Writes compression frame to sink. It does not initialize the next frame, so {@link
* #checkInitFrame()} is necessary if other frames are to follow.
*/
private void commitToSink(boolean endOfMessage, boolean endOfStream) {
if (usingCompression) {
deflater.finish();
while (!deflater.finished()) {
deflatePut(deflater, bytebuf, Deflater.NO_FLUSH);
}
if (endOfMessage) {
deflater.end();
deflater = null;
}
}
int frameFlag = usingCompression
? TransportFrameUtil.FLATE_FLAG : TransportFrameUtil.NO_COMPRESS_FLAG;
// Header = 1b flag | 3b length of GRPC frame
int header = (frameFlag << 24) | (bytebuf.position() - 4);
bytebuf.putInt(0, header);
bytebuf.flip();
sink.deliverFrame(bytebuf, endOfStream);
bytebuf.clear();
}
private class OutputStreamAdapter extends OutputStream {
private final byte[] singleByte = new byte[1];
@Override
public void write(int b) {
singleByte[0] = (byte) b;
write(singleByte, 0, 1);
}
@Override
public void write(byte[] b, int off, int len) {
CompressionFramer.this.write(b, off, len);
}
}
}

View File

@ -1,78 +0,0 @@
/*
* Copyright 2014, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.net.stubby.transport;
import java.io.Closeable;
import javax.annotation.Nullable;
/**
* An object responsible for reading GRPC compression frames for a single stream.
*/
public interface Decompressor extends Closeable {
/**
* Adds the given chunk of a GRPC compression frame to the internal buffers. If the data is
* compressed, it is uncompressed whenever possible (which may only be after the entire
* compression frame has been received).
*
* <p>Some or all of the given {@code data} chunk may not be made immediately available via
* {@link #readBytes} due to internal buffering.
*
* @param data a received chunk of a GRPC compression frame. Control over the life cycle for this
* buffer is given to this {@link Decompressor}. Only this {@link Decompressor} should call
* {@link Buffer#close} after this point.
*/
void decompress(Buffer data);
/**
* Reads up to the given number of bytes. Ownership of the returned {@link Buffer} is transferred
* to the caller who is responsible for calling {@link Buffer#close}.
*
* <p>The length of the returned {@link Buffer} may be less than {@code maxLength}, but will never
* be 0. If no data is available, {@code null} is returned. To ensure that all available data is
* read, the caller should repeatedly call {@link #readBytes} until it returns {@code null}.
*
* @param maxLength the maximum number of bytes to read. This value must be > 0, otherwise throws
* an {@link IllegalArgumentException}.
* @return a {@link Buffer} containing the number of bytes read or {@code null} if no data is
* currently available.
*/
@Nullable
Buffer readBytes(int maxLength);
/**
* Closes this decompressor and frees any resources.
*/
@Override
void close();
}

View File

@ -1,91 +0,0 @@
/*
* Copyright 2014, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.net.stubby.transport;
import com.google.net.stubby.Status;
import java.io.InputStream;
/**
* Implementations produce the GRPC byte sequence and then split it over multiple frames to be
* delivered via the transport layer which implements {@link Framer.Sink}
*/
public interface Framer {
/**
* Sink implemented by the transport layer to receive frames and forward them to their destination
*/
public interface Sink<T> {
/**
* Deliver a frame via the transport.
*
* @param frame the contents of the frame to deliver
* @param endOfStream whether the frame is the last one for the GRPC stream
*/
public void deliverFrame(T frame, boolean endOfStream);
}
/**
* Write out a Payload message. {@code payload} will be completely consumed.
* {@code payload.available()} must return the number of remaining bytes to be read.
*/
public void writePayload(InputStream payload, int length);
/**
* Write out a Status message.
*/
// TODO(user): change this signature when we actually start writing out the complete Status.
public void writeStatus(Status status);
/**
* Flush any buffered data in the framer to the sink.
*/
public void flush();
/**
* Indicates whether or not this {@link Framer} has been closed via a call to either
* {@link #close()} or {@link #dispose()}.
*/
public boolean isClosed();
/**
* Flushes and closes the framer and releases any buffers. After the {@link Framer} is closed or
* disposed, additional calls to this method will have no affect.
*/
public void close();
/**
* Closes the framer and releases any buffers, but does not flush. After the {@link Framer} is
* closed or disposed, additional calls to this method will have no affect.
*/
public void dispose();
}

View File

@ -1,320 +0,0 @@
/*
* Copyright 2014, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.net.stubby.transport;
import static com.google.net.stubby.GrpcFramingUtil.FRAME_LENGTH;
import static com.google.net.stubby.GrpcFramingUtil.FRAME_TYPE_LENGTH;
import static com.google.net.stubby.GrpcFramingUtil.FRAME_TYPE_MASK;
import static com.google.net.stubby.GrpcFramingUtil.PAYLOAD_FRAME;
import static com.google.net.stubby.GrpcFramingUtil.STATUS_FRAME;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.net.stubby.Status;
import java.io.Closeable;
import java.util.concurrent.Executor;
/**
* Deframer for GRPC frames. Delegates deframing/decompression of the GRPC compression frame to a
* {@link Decompressor}.
*/
public class GrpcDeframer implements Closeable {
public interface Sink extends MessageDeframer2.Sink {
void statusRead(Status status);
}
private enum State {
HEADER, BODY
}
private static final int HEADER_LENGTH = FRAME_TYPE_LENGTH + FRAME_LENGTH;
private final Decompressor decompressor;
private final Executor executor;
private final DeframerListener listener;
private State state = State.HEADER;
private int requiredLength = HEADER_LENGTH;
private int frameType;
private boolean statusNotified;
private boolean endOfStream;
private SettableFuture<?> deliveryOutstanding;
private Sink sink;
private CompositeBuffer nextFrame;
/**
* Constructs the deframer.
*
* @param decompressor the object used for de-framing GRPC compression frames.
* @param sink the sink for fully read GRPC messages.
* @param executor the executor to be used for delivery. All calls to
* {@link #deframe(Buffer, boolean)} must be made in the context of this executor. This
* executor must not allow concurrent access to this class, so it must be either a single
* thread or have sequential processing of events.
* @param listener a listener to deframing events
*/
public GrpcDeframer(Decompressor decompressor, Sink sink, Executor executor,
DeframerListener listener) {
this.decompressor = Preconditions.checkNotNull(decompressor, "decompressor");
this.sink = Preconditions.checkNotNull(sink, "sink");
this.executor = Preconditions.checkNotNull(executor, "executor");
this.listener = Preconditions.checkNotNull(listener, "listener");
}
/**
* Adds the given data to this deframer and attempts delivery to the sink.
*
* <p>If returned future is not {@code null}, then it completes when no more deliveries are
* occuring. Delivering completes if all available deframing input is consumed or if delivery
* resulted in an exception, in which case this method may throw the exception or the returned
* future will fail with the throwable. The future is guaranteed to complete within the executor
* provided during construction.
*/
public ListenableFuture<?> deframe(Buffer data, boolean endOfStream) {
Preconditions.checkNotNull(data, "data");
// Add the data to the decompression buffer.
decompressor.decompress(data);
// Indicate that all of the data for this stream has been received.
this.endOfStream = endOfStream;
if (deliveryOutstanding != null) {
// Only allow one outstanding delivery at a time.
return null;
}
return deliver();
}
@Override
public void close() {
decompressor.close();
if (nextFrame != null) {
nextFrame.close();
}
}
/**
* Reads and delivers as many messages to the sink as possible. May only be called when a delivery
* is known not to be outstanding.
*/
private ListenableFuture<?> deliver() {
// Process the uncompressed bytes.
while (readRequiredBytes()) {
if (statusNotified) {
throw new IllegalStateException("Inbound data after receiving status frame");
}
switch (state) {
case HEADER:
processHeader();
break;
case BODY:
// Read the body and deliver the message to the sink.
ListenableFuture<?> processingFuture = processBody();
if (processingFuture != null) {
// A future was returned for the completion of processing the delivered
// message. Once it's done, try to deliver the next message.
return delayProcessingInternal(processingFuture);
}
break;
default:
throw new AssertionError("Invalid state: " + state);
}
}
if (endOfStream) {
if (nextFrame.readableBytes() != 0) {
throw Status.INTERNAL
.withDescription("Encountered end-of-stream mid-frame")
.asRuntimeException();
}
// If reached the end of stream without reading a status frame, fabricate one
// and deliver to the target.
if (!statusNotified) {
notifyStatus(Status.OK);
}
}
// All available messages processed.
if (deliveryOutstanding != null) {
SettableFuture<?> previousOutstanding = deliveryOutstanding;
deliveryOutstanding = null;
previousOutstanding.set(null);
}
return null;
}
/**
* May only be called when a delivery is known not to be outstanding. If deliveryOutstanding is
* non-null, then it will be re-used and this method will return {@code null}.
*/
private ListenableFuture<?> delayProcessingInternal(ListenableFuture<?> future) {
Preconditions.checkNotNull(future, "future");
// Return a separate future so that our callback is guaranteed to complete before any
// listeners on the returned future.
ListenableFuture<?> returnFuture = null;
if (deliveryOutstanding == null) {
returnFuture = deliveryOutstanding = SettableFuture.create();
}
Futures.addCallback(future, new FutureCallback<Object>() {
@Override
public void onFailure(Throwable t) {
SettableFuture<?> previousOutstanding = deliveryOutstanding;
deliveryOutstanding = null;
previousOutstanding.setException(t);
}
@Override
public void onSuccess(Object result) {
try {
deliver();
} catch (Throwable t) {
if (deliveryOutstanding == null) {
throw Throwables.propagate(t);
} else {
onFailure(t);
}
}
}
}, executor);
return returnFuture;
}
/**
* Attempts to read the required bytes into nextFrame.
*
* @returns {@code true} if all of the required bytes have been read.
*/
private boolean readRequiredBytes() {
int totalBytesRead = 0;
try {
if (nextFrame == null) {
nextFrame = new CompositeBuffer();
}
// Read until the buffer contains all the required bytes.
int missingBytes;
while ((missingBytes = requiredLength - nextFrame.readableBytes()) > 0) {
Buffer buffer = decompressor.readBytes(missingBytes);
if (buffer == null) {
// No more data is available.
break;
}
totalBytesRead += buffer.readableBytes();
// Add it to the composite buffer for the next frame.
nextFrame.addBuffer(buffer);
}
// Return whether or not all of the required bytes are now in the frame.
return nextFrame.readableBytes() == requiredLength;
} finally {
if (totalBytesRead > 0) {
listener.bytesRead(totalBytesRead);
}
}
}
/**
* Processes the GRPC compression header which is composed of the compression flag and the outer
* frame length.
*/
private void processHeader() {
// Peek, but do not read the header.
frameType = nextFrame.readUnsignedByte() & FRAME_TYPE_MASK;
// Update the required length to include the length of the frame.
requiredLength = nextFrame.readInt();
// Continue reading the frame body.
state = State.BODY;
}
/**
* Processes the body of the GRPC compression frame. A single compression frame may contain
* several GRPC messages within it.
*/
private ListenableFuture<Void> processBody() {
ListenableFuture<Void> future = null;
switch (frameType) {
case PAYLOAD_FRAME:
future = processMessage();
break;
case STATUS_FRAME:
processStatus();
break;
default:
throw new AssertionError("Invalid frameType: " + frameType);
}
// Done with this frame, begin processing the next header.
state = State.HEADER;
requiredLength = HEADER_LENGTH;
return future;
}
/**
* Processes the payload of a message frame.
*/
private ListenableFuture<Void> processMessage() {
try {
return sink.messageRead(Buffers.openStream(nextFrame, true), nextFrame.readableBytes());
} finally {
// Don't close the frame, since the sink is now responsible for the life-cycle.
nextFrame = null;
}
}
/**
* Processes the payload of a status frame.
*/
private void processStatus() {
try {
notifyStatus(Status.fromCodeValue(nextFrame.readUnsignedShort()));
} finally {
nextFrame.close();
nextFrame = null;
}
}
/**
* Delivers the status notification to the sink.
*/
private void notifyStatus(Status status) {
statusNotified = true;
sink.statusRead(status);
sink.endOfStream();
}
}

View File

@ -70,10 +70,8 @@ public abstract class Http2ClientStream extends AbstractClientStream<Integer> {
private Charset errorCharset = Charsets.UTF_8;
private boolean contentTypeChecked;
protected Http2ClientStream(ClientStreamListener listener,
@Nullable Decompressor decompressor,
Executor deframerExecutor) {
super(listener, decompressor, deframerExecutor);
protected Http2ClientStream(ClientStreamListener listener, Executor deframerExecutor) {
super(listener, deframerExecutor);
}
protected void transportHeadersReceived(Metadata.Headers headers) {
@ -125,24 +123,19 @@ public abstract class Http2ClientStream extends AbstractClientStream<Integer> {
} else {
inboundDataReceived(frame);
if (endOfStream) {
if (GRPC_V2_PROTOCOL) {
if (false) {
// This is a protocol violation as we expect to receive trailers.
transportError = Status.INTERNAL.withDescription("Recevied EOS on DATA frame");
frame.close();
inboundTransportError(transportError);
} else {
// TODO(user): Delete this hack when trailers are supported by GFE with v2. Currently
// GFE doesn't support trailers, so when using gRPC v2 protocol GFE will not send any
// status. We paper over this for now by just assuming OK. For all properly functioning
// servers (both v1 and v2), stashedStatus should not be null here.
Metadata.Trailers trailers = new Metadata.Trailers();
trailers.put(Status.CODE_KEY, Status.OK);
inboundTrailersReceived(trailers, Status.OK);
}
if (false) {
// This is a protocol violation as we expect to receive trailers.
transportError = Status.INTERNAL.withDescription("Recevied EOS on DATA frame");
frame.close();
inboundTransportError(transportError);
} else {
// Synthesize trailers until we get rid of v1.
inboundTrailersReceived(new Metadata.Trailers(), Status.OK);
// TODO(user): Delete this hack when trailers are supported by GFE with v2. Currently
// GFE doesn't support trailers, so when using gRPC v2 protocol GFE will not send any
// status. We paper over this for now by just assuming OK. For all properly functioning
// servers (both v1 and v2), stashedStatus should not be null here.
Metadata.Trailers trailers = new Metadata.Trailers();
trailers.put(Status.CODE_KEY, Status.OK);
inboundTrailersReceived(trailers, Status.OK);
}
}
}

View File

@ -1,124 +0,0 @@
/*
* Copyright 2014, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.net.stubby.transport;
import com.google.net.stubby.GrpcFramingUtil;
import com.google.net.stubby.Status;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
/**
* Default {@link Framer} implementation.
*/
public class MessageFramer implements Framer {
private CompressionFramer framer;
private final ByteBuffer scratch = ByteBuffer.allocate(16);
public MessageFramer(Sink<ByteBuffer> sink, int maxFrameSize) {
// TODO(user): maxFrameSize should probably come from a 'Platform' class
framer = new CompressionFramer(sink, maxFrameSize, false, maxFrameSize / 16);
}
/**
* Sets whether compression is encouraged.
*/
public void setAllowCompression(boolean enable) {
verifyNotClosed();
framer.setAllowCompression(enable);
}
@Override
public void writePayload(InputStream message, int messageLength) {
verifyNotClosed();
try {
scratch.clear();
scratch.put(GrpcFramingUtil.PAYLOAD_FRAME);
scratch.putInt(messageLength);
framer.write(scratch.array(), 0, scratch.position());
if (messageLength != framer.write(message)) {
throw new RuntimeException("Message length was inaccurate");
}
framer.endOfMessage();
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}
@Override
public void writeStatus(Status status) {
verifyNotClosed();
short code = (short) status.getCode().value();
scratch.clear();
scratch.put(GrpcFramingUtil.STATUS_FRAME);
int length = 2;
scratch.putInt(length);
scratch.putShort(code);
framer.write(scratch.array(), 0, scratch.position());
framer.endOfMessage();
}
@Override
public void flush() {
verifyNotClosed();
framer.flush();
}
@Override
public boolean isClosed() {
return framer == null;
}
@Override
public void close() {
if (!isClosed()) {
// TODO(user): Returning buffer to a pool would go here
framer.close();
framer = null;
}
}
@Override
public void dispose() {
// TODO(user): Returning buffer to a pool would go here
framer = null;
}
private void verifyNotClosed() {
if (isClosed()) {
throw new IllegalStateException("Framer already closed");
}
}
}

View File

@ -34,7 +34,6 @@ package com.google.net.stubby.transport;
import com.google.common.base.Preconditions;
import com.google.common.io.ByteStreams;
import com.google.net.stubby.DeferredInputStream;
import com.google.net.stubby.Status;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@ -45,9 +44,23 @@ import java.nio.ByteBuffer;
import java.util.zip.GZIPOutputStream;
/**
* Default {@link Framer} implementation.
* Encodes gRPC messages to be delivered via the transport layer which implements {@link
* MessageFramer2.Sink}.
*/
public class MessageFramer2 implements Framer {
public class MessageFramer2 {
/**
* Sink implemented by the transport layer to receive frames and forward them to their destination
*/
public interface Sink<T> {
/**
* Deliver a frame via the transport.
*
* @param frame the contents of the frame to deliver
* @param endOfStream whether the frame is the last one for the GRPC stream
*/
public void deliverFrame(T frame, boolean endOfStream);
}
private static final int HEADER_LENGTH = 5;
private static final byte UNCOMPRESSED = 0;
private static final byte COMPRESSED = 1;
@ -72,7 +85,10 @@ public class MessageFramer2 implements Framer {
this.compression = Preconditions.checkNotNull(compression, "compression");
}
@Override
/**
* Write out a Payload message. {@code message} will be completely consumed.
* {@code message.available()} must return the number of remaining bytes to be read.
*/
public void writePayload(InputStream message, int messageLength) {
try {
if (compression == Compression.NONE) {
@ -144,12 +160,9 @@ public class MessageFramer2 implements Framer {
}
}
@Override
public void writeStatus(Status status) {
// NOOP
}
@Override
/**
* Flush any buffered data in the framer to the sink.
*/
public void flush() {
if (bytebuf.position() == 0) {
return;
@ -157,12 +170,18 @@ public class MessageFramer2 implements Framer {
commitToSink(false);
}
@Override
/**
* Indicates whether or not this {@link Framer} has been closed via a call to either
* {@link #close()} or {@link #dispose()}.
*/
public boolean isClosed() {
return bytebuf == null;
}
@Override
/**
* Flushes and closes the framer and releases any buffers. After the {@link Framer} is closed or
* disposed, additional calls to this method will have no affect.
*/
public void close() {
if (!isClosed()) {
commitToSink(true);
@ -170,7 +189,10 @@ public class MessageFramer2 implements Framer {
}
}
@Override
/**
* Closes the framer and releases any buffers, but does not flush. After the {@link Framer} is
* closed or disposed, additional calls to this method will have no affect.
*/
public void dispose() {
// TODO(user): Returning buffer to a pool would go here
bytebuf = null;

View File

@ -55,40 +55,6 @@ public final class TransportFrameUtil {
private static final byte[] binaryHeaderSuffixBytes =
Metadata.BINARY_HEADER_SUFFIX.getBytes(US_ASCII);
// Compression modes (lowest order 3 bits of frame flags)
public static final byte NO_COMPRESS_FLAG = 0x0;
public static final byte FLATE_FLAG = 0x1;
public static final byte COMPRESSION_FLAG_MASK = 0x7;
public static boolean isNotCompressed(int b) {
return ((b & COMPRESSION_FLAG_MASK) == NO_COMPRESS_FLAG);
}
public static boolean isFlateCompressed(int b) {
return ((b & COMPRESSION_FLAG_MASK) == FLATE_FLAG);
}
/**
* Length of the compression type field.
*/
public static final int COMPRESSION_TYPE_LENGTH = 1;
/**
* Length of the compression frame length field.
*/
public static final int COMPRESSION_FRAME_LENGTH = 3;
/**
* Full length of the compression header.
*/
public static final int COMPRESSION_HEADER_LENGTH =
COMPRESSION_TYPE_LENGTH + COMPRESSION_FRAME_LENGTH;
// Flags
public static final byte PAYLOAD_FRAME = 0x0;
public static final byte STATUS_FRAME = 0x3;
// TODO(user): This needs proper namespacing support, this is currently just a hack
/**
* Converts the path from the HTTP request to the full qualified method name.

View File

@ -1,130 +0,0 @@
/*
* Copyright 2014, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.net.stubby.transport;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import com.google.common.collect.Lists;
import com.google.common.io.ByteStreams;
import com.google.common.primitives.Bytes;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Random;
import java.util.zip.Deflater;
import java.util.zip.InflaterInputStream;
/** Unit tests for {@link CompressionFramer}. */
@RunWith(JUnit4.class)
public class CompressionFramerTest {
private int maxFrameSize = 1024;
private int sufficient = 8;
private CapturingSink sink = new CapturingSink();
private CompressionFramer framer = new CompressionFramer(sink, maxFrameSize, true, sufficient);
@Test
public void testGoodCompression() {
byte[] payload = new byte[1000];
framer.setCompressionLevel(Deflater.BEST_COMPRESSION);
framer.write(payload, 0, payload.length);
framer.endOfMessage();
framer.flush();
assertEquals(1, sink.frames.size());
byte[] frame = sink.frames.get(0);
assertEquals(TransportFrameUtil.FLATE_FLAG, frame[0]);
assertTrue(decodeFrameLength(frame) < 30);
assertArrayEquals(payload, decompress(frame));
}
@Test
public void testPoorCompression() {
byte[] payload = new byte[3 * maxFrameSize / 2];
new Random(1).nextBytes(payload);
framer.setCompressionLevel(Deflater.DEFAULT_COMPRESSION);
framer.write(payload, 0, payload.length);
framer.endOfMessage();
framer.flush();
assertEquals(2, sink.frames.size());
assertEquals(TransportFrameUtil.FLATE_FLAG, sink.frames.get(0)[0]);
assertEquals(TransportFrameUtil.FLATE_FLAG, sink.frames.get(1)[0]);
assertTrue(decodeFrameLength(sink.frames.get(0)) <= maxFrameSize);
assertTrue(decodeFrameLength(sink.frames.get(0))
>= maxFrameSize - CompressionFramer.HEADER_LENGTH - CompressionFramer.MARGIN - sufficient);
assertArrayEquals(payload, decompress(sink.frames));
}
private static int decodeFrameLength(byte[] frame) {
return ((frame[1] & 0xFF) << 16)
| ((frame[2] & 0xFF) << 8)
| (frame[3] & 0xFF);
}
private static byte[] decompress(byte[] frame) {
try {
return ByteStreams.toByteArray(new InflaterInputStream(new ByteArrayInputStream(frame,
CompressionFramer.HEADER_LENGTH, frame.length - CompressionFramer.HEADER_LENGTH)));
} catch (IOException ex) {
throw new AssertionError();
}
}
private static byte[] decompress(List<byte[]> frames) {
byte[][] bytes = new byte[frames.size()][];
for (int i = 0; i < frames.size(); i++) {
bytes[i] = decompress(frames.get(i));
}
return Bytes.concat(bytes);
}
private static class CapturingSink implements Framer.Sink<ByteBuffer> {
public final List<byte[]> frames = Lists.newArrayList();
@Override
public void deliverFrame(ByteBuffer frame, boolean endOfMessage) {
byte[] frameBytes = new byte[frame.remaining()];
frame.get(frameBytes);
assertEquals(frameBytes.length - CompressionFramer.HEADER_LENGTH,
decodeFrameLength(frameBytes));
frames.add(frameBytes);
}
}
}

View File

@ -1,280 +0,0 @@
/*
* Copyright 2014, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.net.stubby.transport;
import static com.google.net.stubby.transport.TransportFrameUtil.PAYLOAD_FRAME;
import static com.google.net.stubby.transport.TransportFrameUtil.STATUS_FRAME;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.google.common.io.ByteStreams;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.net.stubby.Status;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import javax.annotation.Nullable;
/**
* Tests for {@link GrpcDeframer}.
*/
@RunWith(JUnit4.class)
public class GrpcDeframerTest {
private static final String MESSAGE = "hello world";
private static final byte[] MESSAGE_BYTES = MESSAGE.getBytes(StandardCharsets.UTF_8);
private static final Status STATUS_CODE = Status.CANCELLED;
private GrpcDeframer reader;
private StubDecompressor decompressor;
@Mock
private GrpcDeframer.Sink sink;
private SettableFuture<Void> messageFuture;
@Mock
private DeframerListener listener;
@Before
public void setup() {
MockitoAnnotations.initMocks(this);
messageFuture = SettableFuture.create();
when(sink.messageRead(any(InputStream.class), anyInt())).thenReturn(messageFuture);
decompressor = new StubDecompressor();
reader = new GrpcDeframer(decompressor, sink, MoreExecutors.directExecutor(), listener);
}
@Test
public void payloadShouldCallTarget() throws Exception {
decompressor.init(payloadFrame());
reader.deframe(Buffers.empty(), false);
verifyPayload();
verifyNoStatus();
}
@Test
public void payloadWithEndOfStreamShouldNotifyStatus() throws Exception {
decompressor.init(payloadFrame());
reader.deframe(Buffers.empty(), true);
verifyPayload();
verifyNoStatus();
messageFuture.set(null);
verifyStatus(Status.Code.OK);
}
@Test
public void statusShouldCallTarget() throws Exception {
decompressor.init(statusFrame());
reader.deframe(Buffers.empty(), false);
verifyNoPayload();
verifyStatus();
}
@Test
public void statusWithEndOfStreamShouldNotifyStatusOnce() throws Exception {
decompressor.init(statusFrame());
reader.deframe(Buffers.empty(), true);
verifyNoPayload();
verifyStatus();
}
@Test
public void multipleFramesShouldCallTarget() throws Exception {
ByteArrayOutputStream os = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(os);
// Write a payload frame.
writeFrame(PAYLOAD_FRAME, MESSAGE_BYTES, dos);
// Write a status frame.
byte[] statusBytes = new byte[] {0, (byte) STATUS_CODE.getCode().value()};
writeFrame(STATUS_FRAME, statusBytes, dos);
// Now write the complete frame: compression header followed by the 3 message frames.
dos.close();
byte[] bodyBytes = os.toByteArray();
decompressor.init(bodyBytes);
reader.deframe(Buffers.empty(), false);
// Verify that all callbacks were called.
verifyPayload();
verifyNoStatus();
messageFuture.set(null);
verifyStatus();
}
@Test
public void partialFrameShouldSucceed() throws Exception {
byte[] frame = payloadFrame();
// Create a buffer that contains 2 payload frames.
byte[] fullBuffer = Arrays.copyOf(frame, frame.length * 2);
System.arraycopy(frame, 0, fullBuffer, frame.length, frame.length);
// Use only a portion of the frame. Should not call the sink.
int startIx = 0;
int endIx = 10;
byte[] chunk = Arrays.copyOfRange(fullBuffer, startIx, endIx);
decompressor.init(chunk);
reader.deframe(Buffers.empty(), false);
verifyNoPayload();
verifyNoStatus();
// Supply the rest of the frame and a portion of a second frame. Should call the sink.
startIx = endIx;
endIx = startIx + frame.length;
chunk = Arrays.copyOfRange(fullBuffer, startIx, endIx);
decompressor.init(chunk);
reader.deframe(Buffers.empty(), false);
verifyPayload();
verifyNoStatus();
}
private void verifyPayload() {
ArgumentCaptor<InputStream> captor = ArgumentCaptor.forClass(InputStream.class);
verify(sink).messageRead(captor.capture(), eq(MESSAGE.length()));
assertEquals(MESSAGE, readString(captor.getValue(), MESSAGE.length()));
}
private String readString(InputStream in, int length) {
try {
byte[] bytes = new byte[length];
ByteStreams.readFully(in, bytes);
return new String(bytes, StandardCharsets.UTF_8);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private void verifyStatus() {
verifyStatus(Status.Code.CANCELLED);
}
private void verifyStatus(Status.Code code) {
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
verify(sink).statusRead(captor.capture());
verify(sink).endOfStream();
assertEquals(code, captor.getValue().getCode());
}
private void verifyNoPayload() {
verify(sink, never()).messageRead(any(InputStream.class), anyInt());
}
private void verifyNoStatus() {
verify(sink, never()).statusRead(any(Status.class));
verify(sink, never()).endOfStream();
}
private static byte[] payloadFrame() throws IOException {
return frame(PAYLOAD_FRAME, MESSAGE_BYTES);
}
private static byte[] statusFrame() throws IOException {
byte[] bytes = new byte[] {0, (byte) STATUS_CODE.getCode().value()};
return frame(STATUS_FRAME, bytes);
}
private static byte[] frame(int frameType, byte[] data) throws IOException {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
OutputStream os = bos;
DataOutputStream dos = new DataOutputStream(os);
writeFrame(frameType, data, dos);
dos.close();
return bos.toByteArray();
}
private static void writeFrame(int frameType, byte[] data, DataOutputStream out)
throws IOException {
out.write(frameType);
out.writeInt(data.length);
out.write(data);
}
private static final class StubDecompressor implements Decompressor {
byte[] bytes;
int offset;
void init(byte[] bytes) {
this.bytes = bytes;
this.offset = 0;
}
@Override
public void decompress(Buffer data) {
// Do nothing.
}
@Override
public void close() {
// Do nothing.
}
@Override
@Nullable
public Buffer readBytes(int length) {
length = Math.min(length, bytes.length - offset);
if (length == 0) {
return null;
}
Buffer buffer = Buffers.wrap(bytes, offset, length);
offset += length;
return buffer;
}
}
}

View File

@ -60,8 +60,8 @@ public class MessageFramer2Test {
private static final int TRANSPORT_FRAME_SIZE = 12;
@Mock
private Framer.Sink<List<Byte>> sink;
private Framer.Sink<ByteBuffer> copyingSink;
private MessageFramer2.Sink<List<Byte>> sink;
private MessageFramer2.Sink<ByteBuffer> copyingSink;
private MessageFramer2 framer;
@Captor
@ -191,10 +191,10 @@ public class MessageFramer2Test {
* Since ByteBuffers are reused, this sink copies their value at the time of the call. Converting
* to List<Byte> is convenience.
*/
private static class ByteArrayConverterSink implements Framer.Sink<ByteBuffer> {
private final Framer.Sink<List<Byte>> delegate;
private static class ByteArrayConverterSink implements MessageFramer2.Sink<ByteBuffer> {
private final MessageFramer2.Sink<List<Byte>> delegate;
public ByteArrayConverterSink(Framer.Sink<List<Byte>> delegate) {
public ByteArrayConverterSink(MessageFramer2.Sink<List<Byte>> delegate) {
this.delegate = delegate;
}

View File

@ -1,126 +0,0 @@
/*
* Copyright 2014, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.net.stubby.transport;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import com.google.common.primitives.Bytes;
import com.google.net.stubby.GrpcFramingUtil;
import com.google.net.stubby.Status;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import java.io.ByteArrayInputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
/**
* Tests for {@link MessageFramer}
*/
@RunWith(JUnit4.class)
public class MessageFramerTest {
public static final int TRANSPORT_FRAME_SIZE = 57;
@Test
public void testPayload() throws Exception {
CapturingSink sink = new CapturingSink();
MessageFramer framer = new MessageFramer(sink, TRANSPORT_FRAME_SIZE);
byte[] payload = new byte[]{11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23};
byte[] unframedStream =
Bytes.concat(
new byte[]{GrpcFramingUtil.PAYLOAD_FRAME},
new byte[]{0, 0, 0, (byte) payload.length},
payload);
for (int i = 0; i < 1000; i++) {
framer.writePayload(new ByteArrayInputStream(payload), payload.length);
if ((i + 1) % 13 == 0) {
// Test flushing periodically
framer.flush();
}
}
framer.flush();
assertEquals(sink.deframedStream.length, unframedStream.length * 1000);
for (int i = 0; i < 1000; i++) {
assertArrayEquals(unframedStream,
Arrays.copyOfRange(sink.deframedStream, i * unframedStream.length,
(i + 1) * unframedStream.length));
}
}
@Test
public void testStatus() throws Exception {
CapturingSink sink = new CapturingSink();
MessageFramer framer = new MessageFramer(sink, TRANSPORT_FRAME_SIZE);
byte[] unframedStream = Bytes.concat(
new byte[]{GrpcFramingUtil.STATUS_FRAME},
new byte[]{0, 0, 0, 2}, // Len is 2 bytes
new byte[]{0, 13}); // Internal==13
for (int i = 0; i < 1000; i++) {
framer.writeStatus(Status.INTERNAL);
if ((i + 1) % 13 == 0) {
framer.flush();
}
}
framer.flush();
assertEquals(sink.deframedStream.length, unframedStream.length * 1000);
for (int i = 0; i < 1000; i++) {
assertArrayEquals(unframedStream,
Arrays.copyOfRange(sink.deframedStream, i * unframedStream.length,
(i + 1) * unframedStream.length));
}
}
static class CapturingSink implements Framer.Sink<ByteBuffer> {
byte[] deframedStream = new byte[0];
@Override
public void deliverFrame(ByteBuffer frame, boolean endOfMessage) {
assertTrue(frame.remaining() <= TRANSPORT_FRAME_SIZE);
// Frame must contain compression flag & 24 bit length
int header = frame.getInt();
byte flag = (byte) (header >>> 24);
int length = header & 0xFFFFFF;
assertTrue(TransportFrameUtil.isNotCompressed(flag));
assertEquals(frame.remaining(), length);
// Frame must exceed dictated transport frame size
byte[] frameBytes = new byte[frame.remaining()];
frame.get(frameBytes);
deframedStream = Bytes.concat(deframedStream, frameBytes);
}
}
}

View File

@ -18,5 +18,6 @@ dependencies {
project(':stubby-okhttp'),
project(':stubby-stub'),
project(':stubby-testing'),
libraries.junit
libraries.junit,
libraries.mockito
}

View File

@ -44,6 +44,11 @@
<artifactId>junit</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>compile</scope>
</dependency>
</dependencies>
<build>

View File

@ -63,13 +63,11 @@ import com.google.net.stubby.testing.integration.Messages.StreamingInputCallRequ
import com.google.net.stubby.testing.integration.Messages.StreamingInputCallResponse;
import com.google.net.stubby.testing.integration.Messages.StreamingOutputCallRequest;
import com.google.net.stubby.testing.integration.Messages.StreamingOutputCallResponse;
import com.google.net.stubby.transport.AbstractStream;
import com.google.protobuf.ByteString;
import com.google.protobuf.EmptyProtos.Empty;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
@ -494,7 +492,6 @@ public abstract class AbstractTransportTest {
@org.junit.Test
public void exchangeContextUnaryCall() throws Exception {
Assume.assumeTrue(AbstractStream.GRPC_V2_PROTOCOL);
TestServiceGrpc.TestServiceBlockingStub stub =
TestServiceGrpc.newBlockingStub(channel);
@ -514,14 +511,11 @@ public abstract class AbstractTransportTest {
// Assert that our side channel object is echoed back in both headers and trailers
Assert.assertEquals(contextValue, headersCapture.get().get(METADATA_KEY));
if (AbstractStream.GRPC_V2_PROTOCOL) {
Assert.assertEquals(contextValue, trailersCapture.get().get(METADATA_KEY));
}
Assert.assertEquals(contextValue, trailersCapture.get().get(METADATA_KEY));
}
@org.junit.Test
public void exchangeContextStreamingCall() throws Exception {
Assume.assumeTrue(AbstractStream.GRPC_V2_PROTOCOL);
TestServiceGrpc.TestServiceStub stub = TestServiceGrpc.newStub(channel);
// Capture the context exchange
@ -560,9 +554,7 @@ public abstract class AbstractTransportTest {
// Assert that our side channel object is echoed back in both headers and trailers
Assert.assertEquals(contextValue, headersCapture.get().get(METADATA_KEY));
if (AbstractStream.GRPC_V2_PROTOCOL) {
Assert.assertEquals(contextValue, trailersCapture.get().get(METADATA_KEY));
}
Assert.assertEquals(contextValue, trailersCapture.get().get(METADATA_KEY));
}

View File

@ -112,7 +112,6 @@ public class TestServiceClient {
* --serverHost=The host of the remote server.<br>
* --serverPort=$port_number The port of the remote server.<br>
* --test_case=empty_unary|server_streaming The client test to run.<br>
* --grpc_version=1|2 Use gRPC v2 protocol. Default is 1.
*/
public static void main(String[] args) throws Exception {
Map<String, String> argMap = parseArgs(args);
@ -121,8 +120,11 @@ public class TestServiceClient {
int serverPort = getPort(argMap);
String testCase = getTestCase(argMap);
com.google.net.stubby.transport.AbstractStream.GRPC_V2_PROTOCOL =
getGrpcVersion(argMap) == 2;
// TODO(user): Remove. Ideally stop passing the arg in scripts first.
if (getGrpcVersion(argMap) != 2) {
System.err.println("Only grpc_version=2 is supported");
System.exit(1);
}
final Tester tester = new Tester(transport, serverHost, serverPort);
Runtime.getRuntime().addShutdownHook(new Thread() {

View File

@ -118,15 +118,17 @@ public class TestServiceServer {
* --transport=<HTTP2_NETTY|HTTP2_NETTY_TLS> Identifies the transport
* over which GRPC frames should be sent. <br>
* --port=<port number> The port number for RPC communications.
* --grpc_version=<1|2> Use gRPC v2 protocol. Default is v1.
*/
public static void main(String[] args) throws Exception {
Map<String, String> argMap = parseArgs(args);
Transport transport = getTransport(argMap);
int port = getPort(RPC_PORT_ARG, argMap);
com.google.net.stubby.transport.AbstractStream.GRPC_V2_PROTOCOL =
getGrpcVersion(argMap) == 2;
// TODO(user): Remove. Ideally stop passing the arg in scripts first.
if (getGrpcVersion(argMap) != 2) {
System.err.println("Only grpc_version=2 is supported");
System.exit(1);
}
final TestServiceServer server = new TestServiceServer(transport, port);

View File

@ -51,7 +51,7 @@ class NettyClientStream extends Http2ClientStream {
private final NettyClientHandler handler;
NettyClientStream(ClientStreamListener listener, Channel channel, NettyClientHandler handler) {
super(listener, new NettyDecompressor(channel.alloc()), channel.eventLoop());
super(listener, channel.eventLoop());
this.channel = checkNotNull(channel, "channel");
this.handler = checkNotNull(handler, "handler");
}

View File

@ -1,313 +0,0 @@
/*
* Copyright 2014, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.net.stubby.transport.netty;
import static com.google.net.stubby.transport.TransportFrameUtil.COMPRESSION_HEADER_LENGTH;
import static com.google.net.stubby.transport.TransportFrameUtil.isFlateCompressed;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.io.Closeables;
import com.google.net.stubby.transport.Buffer;
import com.google.net.stubby.transport.Decompressor;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.CompositeByteBuf;
import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.InflaterInputStream;
import javax.annotation.Nullable;
/**
* A {@link Decompressor} implementation based on Netty {@link CompositeByteBuf}s.
*/
public class NettyDecompressor implements Decompressor {
private final CompositeByteBuf buffer;
private final ByteBufAllocator alloc;
private Frame frame;
public NettyDecompressor(ByteBufAllocator alloc) {
this.alloc = Preconditions.checkNotNull(alloc, "alloc");
buffer = alloc.compositeBuffer();
}
@Override
public void decompress(Buffer data) {
ByteBuf buf = toByteBuf(data);
// Add it to the compression frame buffer.
buffer.addComponent(buf);
buffer.writerIndex(buffer.writerIndex() + buf.readableBytes());
}
@Override
public Buffer readBytes(final int maxLength) {
Preconditions.checkArgument(maxLength > 0, "maxLength must be > 0");
try {
// Read the next frame if we don't already have one.
if (frame == null) {
frame = nextFrame();
}
ByteBuf byteBuf = null;
if (frame != null) {
// Read as many bytes as we can from the frame.
byteBuf = frame.readBytes(maxLength);
// If we reached the end of the frame, close it.
if (frame.complete()) {
frame.close();
frame = null;
}
}
if (byteBuf == null) {
// No data was available.
return null;
}
return new NettyBuffer(byteBuf);
} finally {
// Discard any component buffers that have been fully read.
buffer.discardReadComponents();
}
}
@Override
public void close() {
// Release the CompositeByteBuf. This will automatically release any components as well.
buffer.release();
if (frame != null) {
frame.close();
}
}
/**
* Returns the next compression frame object, or {@code null} if the next frame header is
* incomplete.
*/
@SuppressWarnings("resource")
@Nullable
private Frame nextFrame() {
if (buffer.readableBytes() < COMPRESSION_HEADER_LENGTH) {
// Don't have all the required bytes for the frame header yet.
return null;
}
// Read the header and create the frame object.
boolean compressed = isFlateCompressed(buffer.readUnsignedByte());
int frameLength = buffer.readUnsignedMedium();
if (frameLength == 0) {
return nextFrame();
}
return compressed ? new CompressedFrame(frameLength) : new UncompressedFrame(frameLength);
}
/**
* Converts the given buffer into a {@link ByteBuf}.
*/
private ByteBuf toByteBuf(Buffer data) {
if (data instanceof NettyBuffer) {
// Just return the contained ByteBuf.
return ((NettyBuffer) data).buffer();
}
// Create a new ByteBuf and copy the content to it.
try {
int length = data.readableBytes();
ByteBuf buf = alloc.buffer(length);
data.readBytes(new ByteBufOutputStream(buf), length);
return buf;
} catch (IOException e) {
throw Throwables.propagate(e);
} finally {
data.close();
}
}
/**
* A wrapper around the body of a compression frame. Provides a generic method for reading bytes
* from any frame.
*/
private interface Frame extends Closeable {
@Nullable
ByteBuf readBytes(int maxLength);
boolean complete();
@Override
void close();
}
/**
* An uncompressed frame. Just writes bytes directly from the compression frame.
*/
private class UncompressedFrame implements Frame {
int bytesRemainingInFrame;
public UncompressedFrame(int frameLength) {
this.bytesRemainingInFrame = frameLength;
}
@Override
@Nullable
public ByteBuf readBytes(int maxLength) {
Preconditions.checkState(!complete(), "Must not call readBytes on a completed frame");
int available = buffer.readableBytes();
if (available == 0) {
return null;
}
int bytesToRead = Math.min(available, Math.min(maxLength, bytesRemainingInFrame));
bytesRemainingInFrame -= bytesToRead;
return buffer.readBytes(bytesToRead);
}
@Override
public boolean complete() {
return bytesRemainingInFrame == 0;
}
@Override
public void close() {
// Do nothing.
}
}
/**
* A compressed frame that inflates the data as it reads from the frame.
*/
private class CompressedFrame implements Frame {
private final InputStream in;
private ByteBuf nextBuf;
public CompressedFrame(int frameLength) {
// Limit the stream by the frameLength.
in = new InflaterInputStream(new GrowableByteBufInputStream(frameLength));
}
@Override
@Nullable
public ByteBuf readBytes(int maxLength) {
// If the pre-existing nextBuf is too small, release it.
if (nextBuf != null && nextBuf.capacity() < maxLength) {
nextBuf.release();
nextBuf = null;
}
if (nextBuf == null) {
nextBuf = alloc.buffer();
}
try {
int bytesToWrite = Math.min(maxLength, nextBuf.writableBytes());
nextBuf.writeBytes(in, bytesToWrite);
} catch (EOFException e) {
// The next compressed block is unavailable at the moment. Nothing to return.
return null;
} catch (IOException e) {
throw new RuntimeException(e);
}
if (!nextBuf.isReadable()) {
throw new AssertionError("Read zero bytes from the compression frame");
}
ByteBuf ret = nextBuf;
nextBuf = null;
return ret;
}
@Override
public boolean complete() {
try {
return in.available() <= 0;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void close() {
Closeables.closeQuietly(in);
}
}
/**
* A stream backed by the {@link #buffer}, which allows for additional reading as the buffer
* grows. Not using Netty's stream class since it doesn't handle growth of the underlying buffer.
*/
private class GrowableByteBufInputStream extends InputStream {
final int startIndex;
final int endIndex;
GrowableByteBufInputStream(int length) {
startIndex = buffer.readerIndex();
endIndex = startIndex + length;
}
@Override
public int read() throws IOException {
if (available() == 0) {
return -1;
}
return buffer.readByte() & 0xff;
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
int available = available();
if (available == 0) {
return -1;
}
len = Math.min(available, len);
buffer.readBytes(b, off, len);
return len;
}
@Override
public int available() throws IOException {
return Math.min(endIndex - buffer.readerIndex(), buffer.readableBytes());
}
}
}

View File

@ -51,7 +51,7 @@ class NettyServerStream extends AbstractServerStream<Integer> {
private final NettyServerHandler handler;
NettyServerStream(Channel channel, int id, NettyServerHandler handler) {
super(id, new NettyDecompressor(channel.alloc()), channel.eventLoop());
super(id, channel.eventLoop());
this.channel = checkNotNull(channel, "channel");
this.handler = checkNotNull(handler, "handler");
}

View File

@ -32,7 +32,6 @@
package com.google.net.stubby.transport.netty;
import static com.google.net.stubby.transport.netty.NettyTestUtil.messageFrame;
import static com.google.net.stubby.transport.netty.NettyTestUtil.statusFrame;
import static com.google.net.stubby.transport.netty.Utils.CONTENT_TYPE_GRPC;
import static com.google.net.stubby.transport.netty.Utils.CONTENT_TYPE_HEADER;
import static com.google.net.stubby.transport.netty.Utils.STATUS_OK;
@ -56,9 +55,6 @@ import io.netty.handler.codec.AsciiString;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.Http2Headers;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@ -169,7 +165,6 @@ public class NettyClientStreamTest extends NettyStreamTestBase {
@Test
public void inboundTrailersClosesCall() throws Exception {
Assume.assumeTrue(AbstractStream.GRPC_V2_PROTOCOL);
stream().id(1);
stream().transportHeadersReceived(grpcResponseHeaders(), false);
super.inboundMessageShouldCallListener();
@ -183,11 +178,7 @@ public class NettyClientStreamTest extends NettyStreamTestBase {
// Receive headers first so that it's a valid GRPC response.
stream().transportHeadersReceived(grpcResponseHeaders(), false);
if (AbstractStream.GRPC_V2_PROTOCOL) {
stream().transportHeadersReceived(grpcResponseTrailers(Status.INTERNAL), true);
} else {
stream().transportDataReceived(statusFrame(Status.INTERNAL), false);
}
stream().transportHeadersReceived(grpcResponseTrailers(Status.INTERNAL), true);
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
verify(listener).closed(captor.capture(), any(Metadata.Trailers.class));
assertEquals(Status.INTERNAL.getCode(), captor.getValue().getCode());

View File

@ -1,201 +0,0 @@
/*
* Copyright 2014, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.google.net.stubby.transport.netty;
import static com.google.common.base.Charsets.UTF_8;
import static com.google.net.stubby.transport.Buffers.readAsStringUtf8;
import static com.google.net.stubby.transport.TransportFrameUtil.COMPRESSION_HEADER_LENGTH;
import static com.google.net.stubby.transport.TransportFrameUtil.FLATE_FLAG;
import static com.google.net.stubby.transport.TransportFrameUtil.NO_COMPRESS_FLAG;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import com.google.net.stubby.transport.Buffer;
import com.google.net.stubby.transport.Buffers;
import com.google.net.stubby.transport.CompositeBuffer;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.util.Arrays;
import java.util.zip.DeflaterOutputStream;
/**
* Tests for {@link NettyDecompressor}.
*/
@RunWith(JUnit4.class)
public class NettyDecompressorTest {
private static final String MESSAGE = "hello world";
private NettyDecompressor decompressor;
@Before
public void setup() {
decompressor = new NettyDecompressor(UnpooledByteBufAllocator.DEFAULT);
}
@Test
public void uncompressedDataShouldSucceed() throws Exception {
fullMessageShouldSucceed(false);
}
@Test
public void compressedDataShouldSucceed() throws Exception {
fullMessageShouldSucceed(true);
}
@Test
public void uncompressedFrameShouldNotBeReadableUntilComplete() throws Exception {
byte[] frame = frame(false);
byte[] chunk1 = Arrays.copyOf(frame, 5);
byte[] chunk2 = Arrays.copyOfRange(frame, 5, frame.length);
// Decompress the first chunk and verify it's not readable yet.
decompressor.decompress(Buffers.wrap(chunk1));
CompositeBuffer composite = new CompositeBuffer();
Buffer buffer = decompressor.readBytes(2);
assertEquals(1, buffer.readableBytes());
composite.addBuffer(buffer);
// Decompress the rest of the frame and verify it's readable.
decompressor.decompress(Buffers.wrap(chunk2));
composite.addBuffer(decompressor.readBytes(MESSAGE.length() - 1));
assertEquals(MESSAGE, readAsStringUtf8(composite));
}
@Test
public void compressedFrameShouldNotBeReadableUntilComplete() throws Exception {
byte[] frame = frame(true);
byte[] chunk1 = Arrays.copyOf(frame, 5);
byte[] chunk2 = Arrays.copyOfRange(frame, 5, frame.length);
// Decompress the first chunk and verify it's not readable yet.
decompressor.decompress(Buffers.wrap(chunk1));
Buffer buffer = decompressor.readBytes(2);
assertNull(buffer);
// Decompress the rest of the frame and verify it's readable.
decompressor.decompress(Buffers.wrap(chunk2));
CompositeBuffer composite = new CompositeBuffer();
for(int remaining = MESSAGE.length(); remaining > 0; ) {
Buffer buf = decompressor.readBytes(remaining);
if (buf == null) {
break;
}
composite.addBuffer(buf);
remaining -= buf.readableBytes();
}
assertEquals(MESSAGE, readAsStringUtf8(composite));
}
@Test
public void nettyBufferShouldBeReleasedAfterRead() throws Exception {
byte[] frame = frame(false);
byte[] chunk1 = Arrays.copyOf(frame, 5);
byte[] chunk2 = Arrays.copyOfRange(frame, 5, frame.length);
NettyBuffer buffer1 = new NettyBuffer(Unpooled.wrappedBuffer(chunk1));
NettyBuffer buffer2 = new NettyBuffer(Unpooled.wrappedBuffer(chunk2));
// CompositeByteBuf always keeps at least one buffer internally, so we add a second so
// that it will release the first after it is read.
decompressor.decompress(buffer1);
decompressor.decompress(buffer2);
NettyBuffer readBuffer = (NettyBuffer) decompressor.readBytes(buffer1.readableBytes());
assertEquals(0, buffer1.buffer().refCnt());
assertEquals(1, readBuffer.buffer().refCnt());
}
@Test
public void closeShouldReleasedBuffers() throws Exception {
byte[] frame = frame(false);
byte[] chunk1 = Arrays.copyOf(frame, 5);
NettyBuffer buffer1 = new NettyBuffer(Unpooled.wrappedBuffer(chunk1));
decompressor.decompress(buffer1);
assertEquals(1, buffer1.buffer().refCnt());
decompressor.close();
assertEquals(0, buffer1.buffer().refCnt());
}
private void fullMessageShouldSucceed(boolean compress) throws Exception {
// Decompress the entire frame all at once.
byte[] frame = frame(compress);
decompressor.decompress(Buffers.wrap(frame));
// Read some bytes and verify.
int chunkSize = MESSAGE.length() / 2;
assertEquals(MESSAGE.substring(0, chunkSize),
readAsStringUtf8(decompressor.readBytes(chunkSize)));
// Read the rest and verify.
assertEquals(MESSAGE.substring(chunkSize),
readAsStringUtf8(decompressor.readBytes(MESSAGE.length() - chunkSize)));
}
/**
* Creates a compression frame from {@link #MESSAGE}, applying compression if requested.
*/
private byte[] frame(boolean compress) throws Exception {
byte[] msgBytes = bytes(MESSAGE);
if (compress) {
msgBytes = compress(msgBytes);
}
ByteArrayOutputStream os =
new ByteArrayOutputStream(msgBytes.length + COMPRESSION_HEADER_LENGTH);
DataOutputStream dos = new DataOutputStream(os);
int frameFlag = compress ? FLATE_FLAG : NO_COMPRESS_FLAG;
// Header = 1b flag | 3b length of GRPC frame
int header = (frameFlag << 24) | msgBytes.length;
dos.writeInt(header);
dos.write(msgBytes);
dos.close();
return os.toByteArray();
}
private byte[] bytes(String str) {
return str.getBytes(UTF_8);
}
private byte[] compress(byte[] data) throws Exception {
ByteArrayOutputStream out = new ByteArrayOutputStream();
DeflaterOutputStream dos = new DeflaterOutputStream(out);
dos.write(data);
dos.close();
return out.toByteArray();
}
}

View File

@ -53,7 +53,6 @@ import com.google.common.io.ByteStreams;
import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status;
import com.google.net.stubby.Status.Code;
import com.google.net.stubby.transport.Framer;
import com.google.net.stubby.transport.MessageFramer2;
import com.google.net.stubby.transport.ServerStream;
import com.google.net.stubby.transport.ServerStreamListener;
@ -272,7 +271,7 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase {
private ByteBuf dataFrame(int streamId, boolean endStream) {
final ByteBuf compressionFrame = Unpooled.buffer(CONTENT.length);
MessageFramer2 framer = new MessageFramer2(new Framer.Sink<ByteBuffer>() {
MessageFramer2 framer = new MessageFramer2(new MessageFramer2.Sink<ByteBuffer>() {
@Override
public void deliverFrame(ByteBuffer frame, boolean endOfStream) {
compressionFrame.writeBytes(frame);

View File

@ -31,13 +31,11 @@
package com.google.net.stubby.transport.netty;
import static com.google.net.stubby.GrpcFramingUtil.PAYLOAD_FRAME;
import static com.google.net.stubby.GrpcFramingUtil.STATUS_FRAME;
import static io.netty.util.CharsetUtil.UTF_8;
import com.google.common.io.ByteStreams;
import com.google.net.stubby.Status;
import com.google.net.stubby.transport.AbstractStream;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
@ -60,10 +58,6 @@ public class NettyTestUtil {
static ByteBuf messageFrame(String message) throws Exception {
ByteArrayOutputStream os = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(os);
if (!AbstractStream.GRPC_V2_PROTOCOL) {
dos.write(PAYLOAD_FRAME);
dos.writeInt(message.length());
}
dos.write(message.getBytes(UTF_8));
dos.close();
@ -86,9 +80,7 @@ public class NettyTestUtil {
static ByteBuf compressionFrame(byte[] data) {
ByteBuf buf = Unpooled.buffer();
if (AbstractStream.GRPC_V2_PROTOCOL) {
buf.writeByte(0);
}
buf.writeByte(0);
buf.writeInt(data.length);
buf.writeBytes(data);
return buf;

View File

@ -98,10 +98,7 @@ class OkHttpClientStream extends Http2ClientStream {
OkHttpClientTransport transport,
Object executorLock,
OutboundFlowController outboundFlow) {
super(listener, null, executor);
if (!GRPC_V2_PROTOCOL) {
throw new RuntimeException("okhttp transport can only work with V2 protocol!");
}
super(listener, executor);
this.frameWriter = frameWriter;
this.transport = transport;
this.executorLock = executorLock;