Add WritableBuffer interface for zero copy data writes. Fixes #8

WritableBuffer is a generic interface that allows to transfer data
from gRPC directly to the native transport's buffer implementation.
This commit is contained in:
Jakob Buchgraber 2015-03-11 15:07:44 -07:00
parent ee19f066c4
commit 0076243063
43 changed files with 1167 additions and 272 deletions

View File

@ -33,9 +33,6 @@ package io.grpc;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Service.Listener;
import com.google.common.util.concurrent.Service.State;
import io.grpc.transport.ClientStream; import io.grpc.transport.ClientStream;
import io.grpc.transport.ClientStreamListener; import io.grpc.transport.ClientStreamListener;
@ -48,7 +45,6 @@ import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.GuardedBy;

View File

@ -38,7 +38,6 @@ import io.grpc.Metadata;
import io.grpc.Status; import io.grpc.Status;
import java.io.InputStream; import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
@ -67,7 +66,9 @@ public abstract class AbstractClientStream<IdT> extends AbstractStream<IdT>
* *
* @param listener the listener to receive notifications * @param listener the listener to receive notifications
*/ */
protected AbstractClientStream(ClientStreamListener listener) { protected AbstractClientStream(WritableBufferAllocator bufferAllocator,
ClientStreamListener listener) {
super(bufferAllocator);
this.listener = Preconditions.checkNotNull(listener); this.listener = Preconditions.checkNotNull(listener);
} }
@ -122,7 +123,7 @@ public abstract class AbstractClientStream<IdT> extends AbstractStream<IdT>
* *
* @param frame the received data frame. Its ownership is transferred to this method. * @param frame the received data frame. Its ownership is transferred to this method.
*/ */
protected void inboundDataReceived(Buffer frame) { protected void inboundDataReceived(ReadableBuffer frame) {
Preconditions.checkNotNull(frame, "frame"); Preconditions.checkNotNull(frame, "frame");
boolean needToCloseFrame = true; boolean needToCloseFrame = true;
try { try {
@ -173,7 +174,7 @@ public abstract class AbstractClientStream<IdT> extends AbstractStream<IdT>
// remoteEndClosed // remoteEndClosed
this.status = status; this.status = status;
this.trailers = trailers; this.trailers = trailers;
deframe(Buffers.empty(), true); deframe(ReadableBuffers.empty(), true);
} }
@Override @Override
@ -182,7 +183,7 @@ public abstract class AbstractClientStream<IdT> extends AbstractStream<IdT>
} }
@Override @Override
protected final void internalSendFrame(ByteBuffer frame, boolean endOfStream) { protected final void internalSendFrame(WritableBuffer frame, boolean endOfStream) {
sendFrame(frame, endOfStream); sendFrame(frame, endOfStream);
} }
@ -193,7 +194,7 @@ public abstract class AbstractClientStream<IdT> extends AbstractStream<IdT>
* @param endOfStream if {@code true} indicates that no more data will be sent on the stream by * @param endOfStream if {@code true} indicates that no more data will be sent on the stream by
* this endpoint. * this endpoint.
*/ */
protected abstract void sendFrame(ByteBuffer frame, boolean endOfStream); protected abstract void sendFrame(WritableBuffer frame, boolean endOfStream);
/** /**
* Report stream closure with status to the application layer if not already reported. This method * Report stream closure with status to the application layer if not already reported. This method

View File

@ -32,9 +32,9 @@
package io.grpc.transport; package io.grpc.transport;
/** /**
* Abstract base class for {@link Buffer} implementations. * Abstract base class for {@link ReadableBuffer} implementations.
*/ */
public abstract class AbstractBuffer implements Buffer { public abstract class AbstractReadableBuffer implements ReadableBuffer {
@Override @Override
public final int readUnsignedMedium() { public final int readUnsignedMedium() {

View File

@ -37,7 +37,6 @@ import io.grpc.Metadata;
import io.grpc.Status; import io.grpc.Status;
import java.io.InputStream; import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
@ -62,7 +61,8 @@ public abstract class AbstractServerStream<IdT> extends AbstractStream<IdT>
/** Saved trailers from close() that need to be sent once the framer has sent all messages. */ /** Saved trailers from close() that need to be sent once the framer has sent all messages. */
private Metadata.Trailers stashedTrailers; private Metadata.Trailers stashedTrailers;
protected AbstractServerStream(IdT id) { protected AbstractServerStream(WritableBufferAllocator bufferAllocator, IdT id) {
super(bufferAllocator);
id(id); id(id);
} }
@ -125,7 +125,7 @@ public abstract class AbstractServerStream<IdT> extends AbstractStream<IdT>
* be retained. * be retained.
* @param endOfStream {@code true} if no more data will be received on the stream. * @param endOfStream {@code true} if no more data will be received on the stream.
*/ */
public void inboundDataReceived(Buffer frame, boolean endOfStream) { public void inboundDataReceived(ReadableBuffer frame, boolean endOfStream) {
if (inboundPhase() == Phase.STATUS) { if (inboundPhase() == Phase.STATUS) {
frame.close(); frame.close();
return; return;
@ -142,8 +142,8 @@ public abstract class AbstractServerStream<IdT> extends AbstractStream<IdT>
} }
@Override @Override
protected final void internalSendFrame(ByteBuffer frame, boolean endOfStream) { protected final void internalSendFrame(WritableBuffer frame, boolean endOfStream) {
if (frame.hasRemaining()) { if (frame.readableBytes() > 0) {
sendFrame(frame, false); sendFrame(frame, false);
} }
if (endOfStream) { if (endOfStream) {
@ -167,7 +167,7 @@ public abstract class AbstractServerStream<IdT> extends AbstractStream<IdT>
* @param endOfStream if {@code true} indicates that no more data will be sent on the stream by * @param endOfStream if {@code true} indicates that no more data will be sent on the stream by
* this endpoint. * this endpoint.
*/ */
protected abstract void sendFrame(ByteBuffer frame, boolean endOfStream); protected abstract void sendFrame(WritableBuffer frame, boolean endOfStream);
/** /**
* Sends trailers to the remote end point. This call implies end of stream. * Sends trailers to the remote end point. This call implies end of stream.

View File

@ -36,7 +36,6 @@ import com.google.common.base.Objects;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import java.io.InputStream; import java.io.InputStream;
import java.nio.ByteBuffer;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -65,7 +64,7 @@ public abstract class AbstractStream<IdT> implements Stream {
*/ */
private Phase outboundPhase = Phase.HEADERS; private Phase outboundPhase = Phase.HEADERS;
AbstractStream() { AbstractStream(WritableBufferAllocator bufferAllocator) {
MessageDeframer.Listener inboundMessageHandler = new MessageDeframer.Listener() { MessageDeframer.Listener inboundMessageHandler = new MessageDeframer.Listener() {
@Override @Override
public void bytesRead(int numBytes) { public void bytesRead(int numBytes) {
@ -87,14 +86,14 @@ public abstract class AbstractStream<IdT> implements Stream {
remoteEndClosed(); remoteEndClosed();
} }
}; };
MessageFramer.Sink<ByteBuffer> outboundFrameHandler = new MessageFramer.Sink<ByteBuffer>() { MessageFramer.Sink outboundFrameHandler = new MessageFramer.Sink() {
@Override @Override
public void deliverFrame(ByteBuffer frame, boolean endOfStream) { public void deliverFrame(WritableBuffer frame, boolean endOfStream) {
internalSendFrame(frame, endOfStream); internalSendFrame(frame, endOfStream);
} }
}; };
framer = new MessageFramer(outboundFrameHandler, 4096); framer = new MessageFramer(outboundFrameHandler, bufferAllocator, 4096);
this.deframer = new MessageDeframer(inboundMessageHandler); this.deframer = new MessageDeframer(inboundMessageHandler);
} }
@ -168,7 +167,7 @@ public abstract class AbstractStream<IdT> implements Stream {
* @param endOfStream if {@code true} indicates that no more data will be sent on the stream by * @param endOfStream if {@code true} indicates that no more data will be sent on the stream by
* this endpoint. * this endpoint.
*/ */
protected abstract void internalSendFrame(ByteBuffer frame, boolean endOfStream); protected abstract void internalSendFrame(WritableBuffer frame, boolean endOfStream);
/** /**
* Handles a message that was just deframed. * Handles a message that was just deframed.
@ -194,7 +193,7 @@ public abstract class AbstractStream<IdT> implements Stream {
protected abstract void returnProcessedBytes(int processedBytes); protected abstract void returnProcessedBytes(int processedBytes);
/** /**
* Called when a {@link #deframe(Buffer, boolean)} operation failed. * Called when a {@link #deframe(ReadableBuffer, boolean)} operation failed.
* *
* @param cause the actual failure * @param cause the actual failure
*/ */
@ -212,7 +211,7 @@ public abstract class AbstractStream<IdT> implements Stream {
* Called to parse a received frame and attempt delivery of any completed * Called to parse a received frame and attempt delivery of any completed
* messages. Must be called from the transport thread. * messages. Must be called from the transport thread.
*/ */
protected final void deframe(Buffer frame, boolean endOfStream) { protected final void deframe(ReadableBuffer frame, boolean endOfStream) {
try { try {
deframer.deframe(frame, endOfStream); deframer.deframe(frame, endOfStream);
} catch (Throwable t) { } catch (Throwable t) {

View File

@ -38,34 +38,34 @@ import java.util.ArrayDeque;
import java.util.Queue; import java.util.Queue;
/** /**
* A {@link Buffer} that is composed of 0 or more {@link Buffer}s. This provides a facade that * A {@link ReadableBuffer} that is composed of 0 or more {@link ReadableBuffer}s. This provides a facade that
* allows multiple buffers to be treated as one. * allows multiple buffers to be treated as one.
* *
* <p>When a buffer is added to a composite, its life cycle is controlled by the composite. Once * <p>When a buffer is added to a composite, its life cycle is controlled by the composite. Once
* the composite has read past the end of a given buffer, that buffer is automatically closed and * the composite has read past the end of a given buffer, that buffer is automatically closed and
* removed from the composite. * removed from the composite.
*/ */
public class CompositeBuffer extends AbstractBuffer { public class CompositeReadableBuffer extends AbstractReadableBuffer {
private int readableBytes; private int readableBytes;
private final Queue<Buffer> buffers = new ArrayDeque<Buffer>(); private final Queue<ReadableBuffer> buffers = new ArrayDeque<ReadableBuffer>();
/** /**
* Adds a new {@link Buffer} at the end of the buffer list. After a buffer is added, it is * Adds a new {@link ReadableBuffer} at the end of the buffer list. After a buffer is added, it is
* expected that this {@code CompositeBuffer} has complete ownership. Any attempt to modify the * expected that this {@code CompositeBuffer} has complete ownership. Any attempt to modify the
* buffer (i.e. modifying the readable bytes) may result in corruption of the internal state of * buffer (i.e. modifying the readable bytes) may result in corruption of the internal state of
* this {@code CompositeBuffer}. * this {@code CompositeBuffer}.
*/ */
public void addBuffer(Buffer buffer) { public void addBuffer(ReadableBuffer buffer) {
if (!(buffer instanceof CompositeBuffer)) { if (!(buffer instanceof CompositeReadableBuffer)) {
buffers.add(buffer); buffers.add(buffer);
readableBytes += buffer.readableBytes(); readableBytes += buffer.readableBytes();
return; return;
} }
CompositeBuffer compositeBuffer = (CompositeBuffer) buffer; CompositeReadableBuffer compositeBuffer = (CompositeReadableBuffer) buffer;
while (!compositeBuffer.buffers.isEmpty()) { while (!compositeBuffer.buffers.isEmpty()) {
Buffer subBuffer = compositeBuffer.buffers.remove(); ReadableBuffer subBuffer = compositeBuffer.buffers.remove();
buffers.add(subBuffer); buffers.add(subBuffer);
} }
readableBytes += compositeBuffer.readableBytes; readableBytes += compositeBuffer.readableBytes;
@ -82,7 +82,7 @@ public class CompositeBuffer extends AbstractBuffer {
public int readUnsignedByte() { public int readUnsignedByte() {
ReadOperation op = new ReadOperation() { ReadOperation op = new ReadOperation() {
@Override @Override
int readInternal(Buffer buffer, int length) { int readInternal(ReadableBuffer buffer, int length) {
return buffer.readUnsignedByte(); return buffer.readUnsignedByte();
} }
}; };
@ -94,7 +94,7 @@ public class CompositeBuffer extends AbstractBuffer {
public void skipBytes(int length) { public void skipBytes(int length) {
execute(new ReadOperation() { execute(new ReadOperation() {
@Override @Override
public int readInternal(Buffer buffer, int length) { public int readInternal(ReadableBuffer buffer, int length) {
buffer.skipBytes(length); buffer.skipBytes(length);
return 0; return 0;
} }
@ -106,7 +106,7 @@ public class CompositeBuffer extends AbstractBuffer {
execute(new ReadOperation() { execute(new ReadOperation() {
int currentOffset = destOffset; int currentOffset = destOffset;
@Override @Override
public int readInternal(Buffer buffer, int length) { public int readInternal(ReadableBuffer buffer, int length) {
buffer.readBytes(dest, currentOffset, length); buffer.readBytes(dest, currentOffset, length);
currentOffset += length; currentOffset += length;
return 0; return 0;
@ -118,7 +118,7 @@ public class CompositeBuffer extends AbstractBuffer {
public void readBytes(final ByteBuffer dest) { public void readBytes(final ByteBuffer dest) {
execute(new ReadOperation() { execute(new ReadOperation() {
@Override @Override
public int readInternal(Buffer buffer, int length) { public int readInternal(ReadableBuffer buffer, int length) {
// Change the limit so that only lengthToCopy bytes are available. // Change the limit so that only lengthToCopy bytes are available.
int prevLimit = dest.limit(); int prevLimit = dest.limit();
dest.limit(dest.position() + length); dest.limit(dest.position() + length);
@ -135,7 +135,7 @@ public class CompositeBuffer extends AbstractBuffer {
public void readBytes(final OutputStream dest, int length) throws IOException { public void readBytes(final OutputStream dest, int length) throws IOException {
ReadOperation op = new ReadOperation() { ReadOperation op = new ReadOperation() {
@Override @Override
public int readInternal(Buffer buffer, int length) throws IOException { public int readInternal(ReadableBuffer buffer, int length) throws IOException {
buffer.readBytes(dest, length); buffer.readBytes(dest, length);
return 0; return 0;
} }
@ -149,13 +149,13 @@ public class CompositeBuffer extends AbstractBuffer {
} }
@Override @Override
public CompositeBuffer readBytes(int length) { public CompositeReadableBuffer readBytes(int length) {
checkReadable(length); checkReadable(length);
readableBytes -= length; readableBytes -= length;
CompositeBuffer newBuffer = new CompositeBuffer(); CompositeReadableBuffer newBuffer = new CompositeReadableBuffer();
while (length > 0) { while (length > 0) {
Buffer buffer = buffers.peek(); ReadableBuffer buffer = buffers.peek();
if (buffer.readableBytes() > length) { if (buffer.readableBytes() > length) {
newBuffer.addBuffer(buffer.readBytes(length)); newBuffer.addBuffer(buffer.readBytes(length));
length = 0; length = 0;
@ -175,14 +175,14 @@ public class CompositeBuffer extends AbstractBuffer {
} }
/** /**
* Executes the given {@link ReadOperation} against the {@link Buffer}s required to satisfy the * Executes the given {@link ReadOperation} against the {@link ReadableBuffer}s required to satisfy the
* requested {@code length}. * requested {@code length}.
*/ */
private void execute(ReadOperation op, int length) { private void execute(ReadOperation op, int length) {
checkReadable(length); checkReadable(length);
for (; length > 0 && !buffers.isEmpty(); advanceBufferIfNecessary()) { for (; length > 0 && !buffers.isEmpty(); advanceBufferIfNecessary()) {
Buffer buffer = buffers.peek(); ReadableBuffer buffer = buffers.peek();
int lengthToCopy = Math.min(length, buffer.readableBytes()); int lengthToCopy = Math.min(length, buffer.readableBytes());
// Perform the read operation for this buffer. // Perform the read operation for this buffer.
@ -205,28 +205,28 @@ public class CompositeBuffer extends AbstractBuffer {
* If the current buffer is exhausted, removes and closes it. * If the current buffer is exhausted, removes and closes it.
*/ */
private void advanceBufferIfNecessary() { private void advanceBufferIfNecessary() {
Buffer buffer = buffers.peek(); ReadableBuffer buffer = buffers.peek();
if (buffer.readableBytes() == 0) { if (buffer.readableBytes() == 0) {
buffers.remove().close(); buffers.remove().close();
} }
} }
/** /**
* A simple read operation to perform on a single {@link Buffer}. All state management for the * A simple read operation to perform on a single {@link ReadableBuffer}. All state management for the
* buffers is done by {@link CompositeBuffer#execute(ReadOperation, int)}. * buffers is done by {@link CompositeReadableBuffer#execute(ReadOperation, int)}.
*/ */
private abstract class ReadOperation { private abstract class ReadOperation {
/** /**
* Only used by {@link CompositeBuffer#readUnsignedByte()}. * Only used by {@link CompositeReadableBuffer#readUnsignedByte()}.
*/ */
int value; int value;
/** /**
* Only used by {@link CompositeBuffer#readBytes(OutputStream, int)}; * Only used by {@link CompositeReadableBuffer#readBytes(OutputStream, int)};
*/ */
IOException ex; IOException ex;
final void read(Buffer buffer, int length) { final void read(ReadableBuffer buffer, int length) {
try { try {
value = readInternal(buffer, length); value = readInternal(buffer, length);
} catch (IOException e) { } catch (IOException e) {
@ -238,6 +238,6 @@ public class CompositeBuffer extends AbstractBuffer {
return ex != null; return ex != null;
} }
abstract int readInternal(Buffer buffer, int length) throws IOException; abstract int readInternal(ReadableBuffer buffer, int length) throws IOException;
} }
} }

View File

@ -38,21 +38,21 @@ import java.io.OutputStream;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
/** /**
* Base class for a wrapper around another {@link Buffer}. * Base class for a wrapper around another {@link ReadableBuffer}.
* *
* <p>This class just passes every operation through to the underlying buffer. Subclasses may * <p>This class just passes every operation through to the underlying buffer. Subclasses may
* override methods to intercept concertain operations. * override methods to intercept concertain operations.
*/ */
public abstract class ForwardingBuffer implements Buffer { public abstract class ForwardingReadableBuffer implements ReadableBuffer {
private final Buffer buf; private final ReadableBuffer buf;
/** /**
* Constructor. * Constructor.
* *
* @param buf the underlying buffer * @param buf the underlying buffer
*/ */
public ForwardingBuffer(Buffer buf) { public ForwardingReadableBuffer(ReadableBuffer buf) {
this.buf = Preconditions.checkNotNull(buf, "buf"); this.buf = Preconditions.checkNotNull(buf, "buf");
} }
@ -102,7 +102,7 @@ public abstract class ForwardingBuffer implements Buffer {
} }
@Override @Override
public Buffer readBytes(int length) { public ReadableBuffer readBytes(int length) {
return buf.readBytes(length); return buf.readBytes(length);
} }

View File

@ -70,8 +70,9 @@ public abstract class Http2ClientStream extends AbstractClientStream<Integer> {
private Charset errorCharset = Charsets.UTF_8; private Charset errorCharset = Charsets.UTF_8;
private boolean contentTypeChecked; private boolean contentTypeChecked;
protected Http2ClientStream(ClientStreamListener listener) { protected Http2ClientStream(WritableBufferAllocator bufferAllocator,
super(listener); ClientStreamListener listener) {
super(bufferAllocator, listener);
} }
/** /**
@ -112,7 +113,7 @@ public abstract class Http2ClientStream extends AbstractClientStream<Integer> {
* @param frame the received data frame * @param frame the received data frame
* @param endOfStream {@code true} if there will be no more data received for this stream * @param endOfStream {@code true} if there will be no more data received for this stream
*/ */
protected void transportDataReceived(Buffer frame, boolean endOfStream) { protected void transportDataReceived(ReadableBuffer frame, boolean endOfStream) {
if (transportError == null && inboundPhase() == Phase.HEADERS) { if (transportError == null && inboundPhase() == Phase.HEADERS) {
// Must receive headers prior to receiving any payload as we use headers to check for // Must receive headers prior to receiving any payload as we use headers to check for
// protocol correctness. // protocol correctness.
@ -122,7 +123,7 @@ public abstract class Http2ClientStream extends AbstractClientStream<Integer> {
// We've already detected a transport error and now we're just accumulating more detail // We've already detected a transport error and now we're just accumulating more detail
// for it. // for it.
transportError = transportError.augmentDescription("DATA-----------------------------\n" + transportError = transportError.augmentDescription("DATA-----------------------------\n" +
Buffers.readAsString(frame, errorCharset)); ReadableBuffers.readAsString(frame, errorCharset));
frame.close(); frame.close();
if (transportError.getDescription().length() > 1000 || endOfStream) { if (transportError.getDescription().length() > 1000 || endOfStream) {
inboundTransportError(transportError); inboundTransportError(transportError);

View File

@ -99,8 +99,8 @@ public class MessageDeframer implements Closeable {
private int requiredLength = HEADER_LENGTH; private int requiredLength = HEADER_LENGTH;
private boolean compressedFlag; private boolean compressedFlag;
private boolean endOfStream; private boolean endOfStream;
private CompositeBuffer nextFrame; private CompositeReadableBuffer nextFrame;
private CompositeBuffer unprocessed = new CompositeBuffer(); private CompositeReadableBuffer unprocessed = new CompositeReadableBuffer();
private long pendingDeliveries; private long pendingDeliveries;
private boolean deliveryStalled = true; private boolean deliveryStalled = true;
@ -149,10 +149,10 @@ public class MessageDeframer implements Closeable {
* @param endOfStream if {@code true}, indicates that {@code data} is the end of the stream from * @param endOfStream if {@code true}, indicates that {@code data} is the end of the stream from
* the remote endpoint. * the remote endpoint.
* @throws IllegalStateException if {@link #close()} has been called previously or if * @throws IllegalStateException if {@link #close()} has been called previously or if
* {@link #deframe(Buffer, boolean)} has previously been called with * {@link #deframe(ReadableBuffer, boolean)} has previously been called with
* {@code endOfStream=true}. * {@code endOfStream=true}.
*/ */
public void deframe(Buffer data, boolean endOfStream) { public void deframe(ReadableBuffer data, boolean endOfStream) {
Preconditions.checkNotNull(data, "data"); Preconditions.checkNotNull(data, "data");
boolean needToCloseData = true; boolean needToCloseData = true;
try { try {
@ -275,7 +275,7 @@ public class MessageDeframer implements Closeable {
int totalBytesRead = 0; int totalBytesRead = 0;
try { try {
if (nextFrame == null) { if (nextFrame == null) {
nextFrame = new CompositeBuffer(); nextFrame = new CompositeReadableBuffer();
} }
// Read until the buffer contains all the required bytes. // Read until the buffer contains all the required bytes.
@ -331,7 +331,7 @@ public class MessageDeframer implements Closeable {
} }
private InputStream getUncompressedBody() { private InputStream getUncompressedBody() {
return Buffers.openStream(nextFrame, true); return ReadableBuffers.openStream(nextFrame, true);
} }
private InputStream getCompressedBody() { private InputStream getCompressedBody() {
@ -345,7 +345,7 @@ public class MessageDeframer implements Closeable {
} }
try { try {
return new GZIPInputStream(Buffers.openStream(nextFrame, true)); return new GZIPInputStream(ReadableBuffers.openStream(nextFrame, true));
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }

View File

@ -31,6 +31,8 @@
package io.grpc.transport; package io.grpc.transport;
import static java.lang.Math.min;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.io.ByteStreams; import com.google.common.io.ByteStreams;
@ -52,14 +54,14 @@ public class MessageFramer {
/** /**
* Sink implemented by the transport layer to receive frames and forward them to their destination * Sink implemented by the transport layer to receive frames and forward them to their destination
*/ */
public interface Sink<T> { public interface Sink {
/** /**
* Delivers a frame via the transport. * Delivers a frame via the transport.
* *
* @param frame the contents of the frame to deliver * @param frame the contents of the frame to deliver
* @param endOfStream whether the frame is the last one for the GRPC stream * @param endOfStream whether the frame is the last one for the GRPC stream
*/ */
public void deliverFrame(T frame, boolean endOfStream); void deliverFrame(WritableBuffer frame, boolean endOfStream);
} }
private static final int HEADER_LENGTH = 5; private static final int HEADER_LENGTH = 5;
@ -70,11 +72,14 @@ public class MessageFramer {
NONE, GZIP; NONE, GZIP;
} }
private final Sink<ByteBuffer> sink; private final Sink sink;
private ByteBuffer bytebuf; private WritableBuffer buffer;
private final Compression compression; private final Compression compression;
private final OutputStreamAdapter outputStreamAdapter = new OutputStreamAdapter(); private final OutputStreamAdapter outputStreamAdapter = new OutputStreamAdapter();
private final byte[] headerScratch = new byte[HEADER_LENGTH]; private final byte[] headerScratch = new byte[HEADER_LENGTH];
private final WritableBufferAllocator bufferAllocator;
private final int maxFrameSize;
private boolean closed;
/** /**
* Creates a {@code MessageFramer} without compression. * Creates a {@code MessageFramer} without compression.
@ -82,8 +87,8 @@ public class MessageFramer {
* @param sink the sink used to deliver frames to the transport * @param sink the sink used to deliver frames to the transport
* @param maxFrameSize the maximum frame size that this framer will deliver * @param maxFrameSize the maximum frame size that this framer will deliver
*/ */
public MessageFramer(Sink<ByteBuffer> sink, int maxFrameSize) { public MessageFramer(Sink sink, WritableBufferAllocator bufferAllocator, int maxFrameSize) {
this(sink, maxFrameSize, Compression.NONE); this(sink, bufferAllocator, maxFrameSize, Compression.NONE);
} }
/** /**
@ -93,9 +98,10 @@ public class MessageFramer {
* @param maxFrameSize the maximum frame size that this framer will deliver * @param maxFrameSize the maximum frame size that this framer will deliver
* @param compression the compression type * @param compression the compression type
*/ */
public MessageFramer(Sink<ByteBuffer> sink, int maxFrameSize, Compression compression) { public MessageFramer(Sink sink, WritableBufferAllocator bufferAllocator, int maxFrameSize, Compression compression) {
this.sink = Preconditions.checkNotNull(sink, "sink"); this.sink = Preconditions.checkNotNull(sink, "sink");
this.bytebuf = ByteBuffer.allocate(maxFrameSize); this.bufferAllocator = bufferAllocator;
this.maxFrameSize = maxFrameSize;
this.compression = Preconditions.checkNotNull(compression, "compression"); this.compression = Preconditions.checkNotNull(compression, "compression");
} }
@ -108,17 +114,19 @@ public class MessageFramer {
*/ */
public void writePayload(InputStream message, int messageLength) { public void writePayload(InputStream message, int messageLength) {
try { try {
if (compression == Compression.NONE) { switch(compression) {
case NONE:
writeFrame(message, messageLength, false); writeFrame(message, messageLength, false);
} else if (compression != Compression.GZIP) { break;
throw new AssertionError("Unknown compression type"); case GZIP:
} else {
// compression == GZIP
DirectAccessByteArrayOutputStream out = new DirectAccessByteArrayOutputStream(); DirectAccessByteArrayOutputStream out = new DirectAccessByteArrayOutputStream();
gzipCompressTo(message, messageLength, out); gzipCompressTo(message, messageLength, out);
InputStream compressedMessage = InputStream compressedMessage =
new DeferredByteArrayInputStream(out.getBuf(), 0, out.getCount()); new DeferredByteArrayInputStream(out.getBuf(), 0, out.getCount());
writeFrame(compressedMessage, out.getCount(), true); writeFrame(compressedMessage, out.getCount(), true);
break;
default:
throw new AssertionError("Unknown compression type");
} }
} catch (IOException ex) { } catch (IOException ex) {
throw new RuntimeException(ex); throw new RuntimeException(ex);
@ -167,11 +175,14 @@ public class MessageFramer {
private void writeRaw(byte[] b, int off, int len) { private void writeRaw(byte[] b, int off, int len) {
while (len > 0) { while (len > 0) {
if (bytebuf.remaining() == 0) { if (buffer != null && buffer.writableBytes() == 0) {
commitToSink(false); commitToSink(false);
} }
int toWrite = Math.min(len, bytebuf.remaining()); if (buffer == null) {
bytebuf.put(b, off, toWrite); buffer = bufferAllocator.allocate(maxFrameSize);
}
int toWrite = min(len, buffer.writableBytes());
buffer.write(b, off, toWrite);
off += toWrite; off += toWrite;
len -= toWrite; len -= toWrite;
} }
@ -181,18 +192,17 @@ public class MessageFramer {
* Flushes any buffered data in the framer to the sink. * Flushes any buffered data in the framer to the sink.
*/ */
public void flush() { public void flush() {
if (bytebuf.position() == 0) { if (buffer != null && buffer.readableBytes() > 0) {
return;
}
commitToSink(false); commitToSink(false);
} }
}
/** /**
* Indicates whether or not this framer has been closed via a call to either * Indicates whether or not this framer has been closed via a call to either
* {@link #close()} or {@link #dispose()}. * {@link #close()} or {@link #dispose()}.
*/ */
public boolean isClosed() { public boolean isClosed() {
return bytebuf == null; return closed;
} }
/** /**
@ -202,7 +212,7 @@ public class MessageFramer {
public void close() { public void close() {
if (!isClosed()) { if (!isClosed()) {
commitToSink(true); commitToSink(true);
dispose(); closed = true;
} }
} }
@ -211,14 +221,19 @@ public class MessageFramer {
* closed or disposed, additional calls to this method will have no affect. * closed or disposed, additional calls to this method will have no affect.
*/ */
public void dispose() { public void dispose() {
// TODO(louiscryan): Returning buffer to a pool would go here closed = true;
bytebuf = null; if (buffer != null) {
buffer.release();
buffer = null;
}
} }
private void commitToSink(boolean endOfStream) { private void commitToSink(boolean endOfStream) {
bytebuf.flip(); if (buffer == null) {
sink.deliverFrame(bytebuf, endOfStream); buffer = bufferAllocator.allocate(0);
bytebuf.clear(); }
sink.deliverFrame(buffer, endOfStream);
buffer = null;
} }
private void verifyNotClosed() { private void verifyNotClosed() {

View File

@ -44,7 +44,7 @@ import java.nio.ByteBuffer;
* done in {@link ByteBuffer}. It is not expected that callers will attempt to modify the backing * done in {@link ByteBuffer}. It is not expected that callers will attempt to modify the backing
* array. * array.
*/ */
public interface Buffer extends Closeable { public interface ReadableBuffer extends Closeable {
/** /**
* Gets the current number of readable bytes remaining in this buffer. * Gets the current number of readable bytes remaining in this buffer.
@ -131,7 +131,7 @@ public interface Buffer extends Closeable {
* @param length the number of bytes to contain in returned Buffer. * @param length the number of bytes to contain in returned Buffer.
* @throws IndexOutOfBoundsException if required bytes are not readable * @throws IndexOutOfBoundsException if required bytes are not readable
*/ */
Buffer readBytes(int length); ReadableBuffer readBytes(int length);
/** /**
* Indicates whether or not this buffer exposes a backing array. * Indicates whether or not this buffer exposes a backing array.

View File

@ -42,49 +42,49 @@ import java.nio.ByteBuffer;
import java.nio.charset.Charset; import java.nio.charset.Charset;
/** /**
* Utility methods for creating {@link Buffer} instances. * Utility methods for creating {@link ReadableBuffer} instances.
*/ */
public final class Buffers { public final class ReadableBuffers {
private static final Buffer EMPTY_BUFFER = new ByteArrayWrapper(new byte[0]); private static final ReadableBuffer EMPTY_BUFFER = new ByteArrayWrapper(new byte[0]);
/** /**
* Returns an empty {@link Buffer} instance. * Returns an empty {@link ReadableBuffer} instance.
*/ */
public static Buffer empty() { public static ReadableBuffer empty() {
return EMPTY_BUFFER; return EMPTY_BUFFER;
} }
/** /**
* Shortcut for {@code wrap(bytes, 0, bytes.length}. * Shortcut for {@code wrap(bytes, 0, bytes.length}.
*/ */
public static Buffer wrap(byte[] bytes) { public static ReadableBuffer wrap(byte[] bytes) {
return new ByteArrayWrapper(bytes, 0, bytes.length); return new ByteArrayWrapper(bytes, 0, bytes.length);
} }
/** /**
* Creates a new {@link Buffer} that is backed by the given byte array. * Creates a new {@link ReadableBuffer} that is backed by the given byte array.
* *
* @param bytes the byte array being wrapped. * @param bytes the byte array being wrapped.
* @param offset the starting offset for the buffer within the byte array. * @param offset the starting offset for the buffer within the byte array.
* @param length the length of the buffer from the {@code offset} index. * @param length the length of the buffer from the {@code offset} index.
*/ */
public static Buffer wrap(byte[] bytes, int offset, int length) { public static ReadableBuffer wrap(byte[] bytes, int offset, int length) {
return new ByteArrayWrapper(bytes, offset, length); return new ByteArrayWrapper(bytes, offset, length);
} }
/** /**
* Creates a new {@link Buffer} that is backed by the given {@link ByteBuffer}. Calls to read from * Creates a new {@link ReadableBuffer} that is backed by the given {@link ByteBuffer}. Calls to read from
* the buffer will increment the position of the {@link ByteBuffer}. * the buffer will increment the position of the {@link ByteBuffer}.
*/ */
public static Buffer wrap(ByteBuffer bytes) { public static ReadableBuffer wrap(ByteBuffer bytes) {
return new ByteBufferWrapper(bytes); return new ByteReadableBufferWrapper(bytes);
} }
/** /**
* Reads an entire {@link Buffer} to a new array. After calling this method, the buffer will * Reads an entire {@link ReadableBuffer} to a new array. After calling this method, the buffer will
* contain no readable bytes. * contain no readable bytes.
*/ */
public static byte[] readArray(Buffer buffer) { public static byte[] readArray(ReadableBuffer buffer) {
Preconditions.checkNotNull(buffer, "buffer"); Preconditions.checkNotNull(buffer, "buffer");
int length = buffer.readableBytes(); int length = buffer.readableBytes();
byte[] bytes = new byte[length]; byte[] bytes = new byte[length];
@ -93,18 +93,18 @@ public final class Buffers {
} }
/** /**
* Reads the entire {@link Buffer} to a new {@link String} with the given charset. * Reads the entire {@link ReadableBuffer} to a new {@link String} with the given charset.
*/ */
public static String readAsString(Buffer buffer, Charset charset) { public static String readAsString(ReadableBuffer buffer, Charset charset) {
Preconditions.checkNotNull(charset, "charset"); Preconditions.checkNotNull(charset, "charset");
byte[] bytes = readArray(buffer); byte[] bytes = readArray(buffer);
return new String(bytes, charset); return new String(bytes, charset);
} }
/** /**
* Reads the entire {@link Buffer} to a new {@link String} using UTF-8 decoding. * Reads the entire {@link ReadableBuffer} to a new {@link String} using UTF-8 decoding.
*/ */
public static String readAsStringUtf8(Buffer buffer) { public static String readAsStringUtf8(ReadableBuffer buffer) {
return readAsString(buffer, UTF_8); return readAsString(buffer, UTF_8);
} }
@ -116,18 +116,18 @@ public final class Buffers {
* @param buffer the buffer backing the new {@link InputStream}. * @param buffer the buffer backing the new {@link InputStream}.
* @param owner if {@code true}, the returned stream will close the buffer when closed. * @param owner if {@code true}, the returned stream will close the buffer when closed.
*/ */
public static InputStream openStream(Buffer buffer, boolean owner) { public static InputStream openStream(ReadableBuffer buffer, boolean owner) {
return new BufferInputStream(owner ? buffer : ignoreClose(buffer)); return new BufferInputStream(owner ? buffer : ignoreClose(buffer));
} }
/** /**
* Decorates the given {@link Buffer} to ignore calls to {@link Buffer#close}. * Decorates the given {@link ReadableBuffer} to ignore calls to {@link ReadableBuffer#close}.
* *
* @param buffer the buffer to be decorated. * @param buffer the buffer to be decorated.
* @return a wrapper around {@code buffer} that ignores calls to {@link Buffer#close}. * @return a wrapper around {@code buffer} that ignores calls to {@link ReadableBuffer#close}.
*/ */
public static Buffer ignoreClose(Buffer buffer) { public static ReadableBuffer ignoreClose(ReadableBuffer buffer) {
return new ForwardingBuffer(buffer) { return new ForwardingReadableBuffer(buffer) {
@Override @Override
public void close() { public void close() {
// Ignore. // Ignore.
@ -136,9 +136,9 @@ public final class Buffers {
} }
/** /**
* A {@link Buffer} that is backed by a byte array. * A {@link ReadableBuffer} that is backed by a byte array.
*/ */
private static class ByteArrayWrapper extends AbstractBuffer { private static class ByteArrayWrapper extends AbstractReadableBuffer {
int offset; int offset;
final int end; final int end;
final byte[] bytes; final byte[] bytes;
@ -221,12 +221,12 @@ public final class Buffers {
} }
/** /**
* A {@link Buffer} that is backed by a {@link ByteBuffer}. * A {@link ReadableBuffer} that is backed by a {@link ByteBuffer}.
*/ */
private static class ByteBufferWrapper extends AbstractBuffer { private static class ByteReadableBufferWrapper extends AbstractReadableBuffer {
final ByteBuffer bytes; final ByteBuffer bytes;
ByteBufferWrapper(ByteBuffer bytes) { ByteReadableBufferWrapper(ByteBuffer bytes) {
this.bytes = Preconditions.checkNotNull(bytes, "bytes"); this.bytes = Preconditions.checkNotNull(bytes, "bytes");
} }
@ -283,12 +283,12 @@ public final class Buffers {
} }
@Override @Override
public ByteBufferWrapper readBytes(int length) { public ByteReadableBufferWrapper readBytes(int length) {
checkReadable(length); checkReadable(length);
ByteBuffer buffer = bytes.duplicate(); ByteBuffer buffer = bytes.duplicate();
bytes.position(bytes.position() + length); bytes.position(bytes.position() + length);
buffer.limit(bytes.position() + length); buffer.limit(bytes.position() + length);
return new ByteBufferWrapper(buffer); return new ByteReadableBufferWrapper(buffer);
} }
@Override @Override
@ -308,12 +308,12 @@ public final class Buffers {
} }
/** /**
* An {@link InputStream} that is backed by a {@link Buffer}. * An {@link InputStream} that is backed by a {@link ReadableBuffer}.
*/ */
private static class BufferInputStream extends InputStream { private static class BufferInputStream extends InputStream {
final Buffer buffer; final ReadableBuffer buffer;
public BufferInputStream(Buffer buffer) { public BufferInputStream(ReadableBuffer buffer) {
this.buffer = Preconditions.checkNotNull(buffer, "buffer"); this.buffer = Preconditions.checkNotNull(buffer, "buffer");
} }
@ -344,5 +344,5 @@ public final class Buffers {
} }
} }
private Buffers() {} private ReadableBuffers() {}
} }

View File

@ -0,0 +1,68 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.transport;
/**
* An interface for a byte buffer that can only be written to.
* {@link WritableBuffer}s are a generic way to transfer bytes to
* the concrete network transports, like Netty and OkHttp.
*/
public interface WritableBuffer {
/**
* Appends {@code length} bytes to the buffer from the source
* array starting at {@code srcIndex}.
*
* @throws IndexOutOfBoundsException
* if the specified {@code srcIndex} is less than {@code 0},
* if {@code srcIndex + length} is greater than
* {@code src.length}, or
* if {@code length} is greater than {@link #writableBytes()}
*/
void write(byte[] src, int srcIndex, int length);
/**
* Returns the number of bytes one can write to the buffer.
*/
int writableBytes();
/**
* Returns the number of bytes one can read from the buffer.
*/
int readableBytes();
/**
* Releases the buffer, indicating to the {@link WritableBufferAllocator} that
* this buffer is no longer used and its resources can be reused.
*/
void release();
}

View File

@ -0,0 +1,44 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.transport;
/**
* The preferred way of creating a {@link WritableBuffer}.
*/
public interface WritableBufferAllocator {
/**
* Allocate a new {@link WritableBuffer} that can hold
* {@code capacity} bytes.
*/
WritableBuffer allocate(int capacity);
}

View File

@ -43,13 +43,13 @@ import org.mockito.MockitoAnnotations;
import org.mockito.stubbing.OngoingStubbing; import org.mockito.stubbing.OngoingStubbing;
/** /**
* Tests for {@link AbstractBuffer}. * Tests for {@link AbstractReadableBuffer}.
*/ */
@RunWith(JUnit4.class) @RunWith(JUnit4.class)
public class AbstractBufferTest { public class AbstractReadableBufferTest {
@Mock @Mock
private AbstractBuffer buffer; private AbstractReadableBuffer buffer;
@Before @Before
public void setup() { public void setup() {

View File

@ -0,0 +1,56 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.transport;
import org.junit.Before;
import java.util.Arrays;
public class ByteWritableBufferTest extends WritableBufferTestBase {
private MessageFramerTest.ByteWritableBuffer buffer;
@Before
public void setup() {
buffer = new MessageFramerTest.ByteWritableBuffer(100);
}
@Override
protected WritableBuffer buffer() {
return buffer;
}
@Override
protected byte[] writtenBytes() {
return Arrays.copyOfRange(buffer.data, 0, buffer.readableBytes());
}
}

View File

@ -47,17 +47,17 @@ import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
/** /**
* Tests for {@link CompositeBuffer}. * Tests for {@link CompositeReadableBuffer}.
*/ */
@RunWith(JUnit4.class) @RunWith(JUnit4.class)
public class CompositeBufferTest { public class CompositeReadableBufferTest {
private static final String EXPECTED_VALUE = "hello world"; private static final String EXPECTED_VALUE = "hello world";
private CompositeBuffer composite; private CompositeReadableBuffer composite;
@Before @Before
public void setup() { public void setup() {
composite = new CompositeBuffer(); composite = new CompositeReadableBuffer();
splitAndAdd(EXPECTED_VALUE); splitAndAdd(EXPECTED_VALUE);
} }
@ -68,10 +68,10 @@ public class CompositeBufferTest {
@Test @Test
public void singleBufferShouldSucceed() { public void singleBufferShouldSucceed() {
composite = new CompositeBuffer(); composite = new CompositeReadableBuffer();
composite.addBuffer(Buffers.wrap(EXPECTED_VALUE.getBytes(UTF_8))); composite.addBuffer(ReadableBuffers.wrap(EXPECTED_VALUE.getBytes(UTF_8)));
assertEquals(EXPECTED_VALUE.length(), composite.readableBytes()); assertEquals(EXPECTED_VALUE.length(), composite.readableBytes());
assertEquals(EXPECTED_VALUE, Buffers.readAsStringUtf8(composite)); assertEquals(EXPECTED_VALUE, ReadableBuffers.readAsStringUtf8(composite));
assertEquals(0, composite.readableBytes()); assertEquals(0, composite.readableBytes());
} }
@ -161,9 +161,9 @@ public class CompositeBufferTest {
@Test @Test
public void closeShouldCloseBuffers() { public void closeShouldCloseBuffers() {
composite = new CompositeBuffer(); composite = new CompositeReadableBuffer();
Buffer mock1 = mock(Buffer.class); ReadableBuffer mock1 = mock(ReadableBuffer.class);
Buffer mock2 = mock(Buffer.class); ReadableBuffer mock2 = mock(ReadableBuffer.class);
composite.addBuffer(mock1); composite.addBuffer(mock1);
composite.addBuffer(mock2); composite.addBuffer(mock2);
@ -177,7 +177,7 @@ public class CompositeBufferTest {
for (int startIndex = 0, endIndex = 0; startIndex < value.length(); startIndex = endIndex) { for (int startIndex = 0, endIndex = 0; startIndex < value.length(); startIndex = endIndex) {
endIndex = Math.min(value.length(), startIndex + partLength); endIndex = Math.min(value.length(), startIndex + partLength);
String part = value.substring(startIndex, endIndex); String part = value.substring(startIndex, endIndex);
composite.addBuffer(Buffers.wrap(part.getBytes(UTF_8))); composite.addBuffer(ReadableBuffers.wrap(part.getBytes(UTF_8)));
} }
assertEquals(value.length(), composite.readableBytes()); assertEquals(value.length(), composite.readableBytes());

View File

@ -149,7 +149,7 @@ public class MessageDeframerTest {
public void largerFrameSize() { public void largerFrameSize() {
deframer.request(1); deframer.request(1);
deframer.deframe( deframer.deframe(
Buffers.wrap(Bytes.concat(new byte[] {0, 0, 0, 3, (byte) 232}, new byte[1000])), false); ReadableBuffers.wrap(Bytes.concat(new byte[] {0, 0, 0, 3, (byte) 232}, new byte[1000])), false);
verify(listener).messageRead(messages.capture()); verify(listener).messageRead(messages.capture());
assertEquals(Bytes.asList(new byte[1000]), bytes(messages)); assertEquals(Bytes.asList(new byte[1000]), bytes(messages));
verify(listener, atLeastOnce()).bytesRead(anyInt()); verify(listener, atLeastOnce()).bytesRead(anyInt());
@ -196,8 +196,8 @@ public class MessageDeframerTest {
} }
} }
private static Buffer buffer(byte[] bytes) { private static ReadableBuffer buffer(byte[] bytes) {
return Buffers.wrap(bytes); return ReadableBuffers.wrap(bytes);
} }
private static byte[] compress(byte[] bytes) { private static byte[] compress(byte[] bytes) {

View File

@ -36,8 +36,9 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions;
import static io.grpc.transport.MessageFramer.Compression;
import com.google.common.primitives.Bytes; import com.google.common.base.Preconditions;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -49,8 +50,7 @@ import org.mockito.Mock;
import org.mockito.MockitoAnnotations; import org.mockito.MockitoAnnotations;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.nio.ByteBuffer; import java.util.Arrays;
import java.util.List;
/** /**
* Tests for {@link MessageFramer} * Tests for {@link MessageFramer}
@ -60,19 +60,18 @@ public class MessageFramerTest {
private static final int TRANSPORT_FRAME_SIZE = 12; private static final int TRANSPORT_FRAME_SIZE = 12;
@Mock @Mock
private MessageFramer.Sink<List<Byte>> sink; private MessageFramer.Sink sink;
private MessageFramer.Sink<ByteBuffer> copyingSink;
private MessageFramer framer; private MessageFramer framer;
@Captor @Captor
private ArgumentCaptor<List<Byte>> frameCaptor; private ArgumentCaptor<ByteWritableBuffer> frameCaptor;
private WritableBufferAllocator allocator = new BytesWritableBufferAllocator();
@Before @Before
public void setup() { public void setup() {
MockitoAnnotations.initMocks(this); MockitoAnnotations.initMocks(this);
copyingSink = new ByteArrayConverterSink(sink); framer = new MessageFramer(sink, allocator, TRANSPORT_FRAME_SIZE);
framer = new MessageFramer(copyingSink, TRANSPORT_FRAME_SIZE);
} }
@Test @Test
@ -80,7 +79,7 @@ public class MessageFramerTest {
writePayload(framer, new byte[] {3, 14}); writePayload(framer, new byte[] {3, 14});
verifyNoMoreInteractions(sink); verifyNoMoreInteractions(sink);
framer.flush(); framer.flush();
verify(sink).deliverFrame(Bytes.asList(new byte[] {0, 0, 0, 0, 2, 3, 14}), false); verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 2, 3, 14}), false);
verifyNoMoreInteractions(sink); verifyNoMoreInteractions(sink);
} }
@ -91,8 +90,7 @@ public class MessageFramerTest {
writePayload(framer, new byte[] {14}); writePayload(framer, new byte[] {14});
verifyNoMoreInteractions(sink); verifyNoMoreInteractions(sink);
framer.flush(); framer.flush();
verify(sink).deliverFrame( verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 1, 3, 0, 0, 0, 0, 1, 14}), false);
Bytes.asList(new byte[] {0, 0, 0, 0, 1, 3, 0, 0, 0, 0, 1, 14}), false);
verifyNoMoreInteractions(sink); verifyNoMoreInteractions(sink);
} }
@ -101,26 +99,25 @@ public class MessageFramerTest {
writePayload(framer, new byte[] {3, 14, 1, 5, 9, 2, 6}); writePayload(framer, new byte[] {3, 14, 1, 5, 9, 2, 6});
verifyNoMoreInteractions(sink); verifyNoMoreInteractions(sink);
framer.close(); framer.close();
verify(sink).deliverFrame(Bytes.asList(new byte[] {0, 0, 0, 0, 7, 3, 14, 1, 5, 9, 2, 6}), true); verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 7, 3, 14, 1, 5, 9, 2, 6}), true);
verifyNoMoreInteractions(sink); verifyNoMoreInteractions(sink);
} }
@Test @Test
public void closeWithoutBufferedFrameGivesEmptySink() { public void closeWithoutBufferedFrameGivesEmptySink() {
framer.close(); framer.close();
verify(sink).deliverFrame(Bytes.asList(), true); verify(sink).deliverFrame(new ByteWritableBuffer(0), true);
verifyNoMoreInteractions(sink); verifyNoMoreInteractions(sink);
} }
@Test @Test
public void payloadSplitBetweenSinks() { public void payloadSplitBetweenSinks() {
writePayload(framer, new byte[] {3, 14, 1, 5, 9, 2, 6, 5}); writePayload(framer, new byte[] {3, 14, 1, 5, 9, 2, 6, 5});
verify(sink).deliverFrame( verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 8, 3, 14, 1, 5, 9, 2, 6}), false);
Bytes.asList(new byte[] {0, 0, 0, 0, 8, 3, 14, 1, 5, 9, 2, 6}), false);
verifyNoMoreInteractions(sink); verifyNoMoreInteractions(sink);
framer.flush(); framer.flush();
verify(sink).deliverFrame(Bytes.asList(new byte[] {5}), false); verify(sink).deliverFrame(toWriteBuffer(new byte[] {5}), false);
verifyNoMoreInteractions(sink); verifyNoMoreInteractions(sink);
} }
@ -129,11 +126,11 @@ public class MessageFramerTest {
writePayload(framer, new byte[] {3, 14, 1}); writePayload(framer, new byte[] {3, 14, 1});
writePayload(framer, new byte[] {3}); writePayload(framer, new byte[] {3});
verify(sink).deliverFrame( verify(sink).deliverFrame(
Bytes.asList(new byte[] {0, 0, 0, 0, 3, 3, 14, 1, 0, 0, 0, 0}), false); toWriteBuffer(new byte[] {0, 0, 0, 0, 3, 3, 14, 1, 0, 0, 0, 0}), false);
verifyNoMoreInteractions(sink); verifyNoMoreInteractions(sink);
framer.flush(); framer.flush();
verify(sink).deliverFrame(Bytes.asList(new byte[] {1, 3}), false); verify(sink).deliverFrame(toWriteBuffer(new byte[] {1, 3}), false);
verifyNoMoreInteractions(sink); verifyNoMoreInteractions(sink);
} }
@ -141,7 +138,7 @@ public class MessageFramerTest {
public void emptyPayloadYieldsFrame() throws Exception { public void emptyPayloadYieldsFrame() throws Exception {
writePayload(framer, new byte[0]); writePayload(framer, new byte[0]);
framer.flush(); framer.flush();
verify(sink).deliverFrame(Bytes.asList(new byte[] {0, 0, 0, 0, 0}), false); verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 0}), false);
} }
@Test @Test
@ -149,60 +146,118 @@ public class MessageFramerTest {
writePayload(framer, new byte[] {3, 14}); writePayload(framer, new byte[] {3, 14});
framer.flush(); framer.flush();
framer.flush(); framer.flush();
verify(sink).deliverFrame(Bytes.asList(new byte[] {0, 0, 0, 0, 2, 3, 14}), false); verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 2, 3, 14}), false);
verifyNoMoreInteractions(sink); verifyNoMoreInteractions(sink);
} }
@Test @Test
public void largerFrameSize() throws Exception { public void largerFrameSize() throws Exception {
final int transportFrameSize = 10000; final int transportFrameSize = 10000;
MessageFramer framer = new MessageFramer(copyingSink, transportFrameSize); MessageFramer framer = new MessageFramer(sink, allocator, transportFrameSize);
writePayload(framer, new byte[1000]); writePayload(framer, new byte[1000]);
framer.flush(); framer.flush();
verify(sink).deliverFrame(frameCaptor.capture(), eq(false)); verify(sink).deliverFrame(frameCaptor.capture(), eq(false));
List<Byte> buffer = frameCaptor.getValue(); ByteWritableBuffer buffer = frameCaptor.getValue();
assertEquals(1005, buffer.size()); assertEquals(1005, buffer.size());
assertEquals(Bytes.asList(new byte[] {0, 0, 0, 3, (byte) 232}), buffer.subList(0, 5));
byte data[] = new byte[1005];
data[3] = 3;
data[4] = (byte) 232;
assertEquals(toWriteBuffer(data, transportFrameSize), buffer);
verifyNoMoreInteractions(sink); verifyNoMoreInteractions(sink);
} }
@Test @Test
public void compressed() throws Exception { public void compressed() throws Exception {
final int transportFrameSize = 100; final int transportFrameSize = 100;
MessageFramer framer = new MessageFramer(copyingSink, transportFrameSize, MessageFramer framer =
MessageFramer.Compression.GZIP); new MessageFramer(sink, allocator, transportFrameSize, Compression.GZIP);
writePayload(framer, new byte[1000]); writePayload(framer, new byte[1000]);
framer.flush(); framer.flush();
verify(sink).deliverFrame(frameCaptor.capture(), eq(false)); verify(sink).deliverFrame(frameCaptor.capture(), eq(false));
List<Byte> buffer = frameCaptor.getValue(); ByteWritableBuffer buffer = frameCaptor.getValue();
// It should have compressed very well. // It should have compressed very well.
assertTrue(buffer.size() < 100); assertTrue(buffer.size() < 100);
// We purposefully don't check the last byte of length, since that depends on how exactly it // We purposefully don't check the last byte of length, since that depends on how exactly it
// compressed. // compressed.
assertEquals(Bytes.asList(new byte[] {1, 0, 0, 0}), buffer.subList(0, 4)); assertEquals(1, buffer.data[0]);
verifyNoMoreInteractions(sink); assertEquals(0, buffer.data[1]);
assertEquals(0, buffer.data[2]);
assertEquals(0, buffer.data[3]);
}
private static WritableBuffer toWriteBuffer(byte[] data) {
return toWriteBuffer(data, TRANSPORT_FRAME_SIZE);
}
private static WritableBuffer toWriteBuffer(byte[] data, int maxFrameSize) {
ByteWritableBuffer buffer = new ByteWritableBuffer(maxFrameSize);
buffer.write(data, 0, data.length);
return buffer;
} }
private static void writePayload(MessageFramer framer, byte[] bytes) { private static void writePayload(MessageFramer framer, byte[] bytes) {
framer.writePayload(new ByteArrayInputStream(bytes), bytes.length); framer.writePayload(new ByteArrayInputStream(bytes), bytes.length);
} }
/** static class ByteWritableBuffer implements WritableBuffer {
* Since ByteBuffers are reused, this sink copies their value at the time of the call. Converting byte[] data;
* to List<Byte> is convenience. private int writeIdx;
*/
private static class ByteArrayConverterSink implements MessageFramer.Sink<ByteBuffer> {
private final MessageFramer.Sink<List<Byte>> delegate;
public ByteArrayConverterSink(MessageFramer.Sink<List<Byte>> delegate) { ByteWritableBuffer(int maxFrameSize) {
this.delegate = delegate; data = new byte[maxFrameSize];
} }
@Override @Override
public void deliverFrame(ByteBuffer frame, boolean endOfStream) { public void write(byte[] bytes, int srcIndex, int length) {
byte[] frameBytes = new byte[frame.remaining()]; System.arraycopy(bytes, srcIndex, data, writeIdx, length);
frame.get(frameBytes); writeIdx += length;
delegate.deliverFrame(Bytes.asList(frameBytes), endOfStream); }
@Override
public int writableBytes() {
return data.length - writeIdx;
}
@Override
public int readableBytes() {
return writeIdx;
}
@Override
public void release() {
data = null;
}
int size() {
return writeIdx;
}
@Override
public boolean equals(Object buffer) {
if (!(buffer instanceof ByteWritableBuffer)) {
return false;
}
ByteWritableBuffer other = (ByteWritableBuffer) buffer;
return writableBytes() == other.writableBytes() &&
readableBytes() == other.readableBytes() &&
Arrays.equals(data, other.data);
}
@Override
public int hashCode() {
return Arrays.hashCode(data) + writableBytes() + readableBytes();
}
}
static class BytesWritableBufferAllocator implements WritableBufferAllocator {
@Override
public WritableBuffer allocate(int maxCapacity) {
return new ByteWritableBuffer(maxCapacity);
} }
} }
} }

View File

@ -43,15 +43,15 @@ import java.nio.ByteBuffer;
import java.util.Arrays; import java.util.Arrays;
/** /**
* Abstract base class for tests of {@link Buffer} subclasses. * Abstract base class for tests of {@link ReadableBuffer} subclasses.
*/ */
@RunWith(JUnit4.class) @RunWith(JUnit4.class)
public abstract class BufferTestBase { public abstract class ReadableBufferTestBase {
protected final String msg = "hello"; protected final String msg = "hello";
@Test @Test
public void bufferShouldReadAllBytes() { public void bufferShouldReadAllBytes() {
Buffer buffer = buffer(); ReadableBuffer buffer = buffer();
for (int ix = 0; ix < msg.length(); ++ix) { for (int ix = 0; ix < msg.length(); ++ix) {
assertEquals(msg.length() - ix, buffer.readableBytes()); assertEquals(msg.length() - ix, buffer.readableBytes());
assertEquals(msg.charAt(ix), buffer.readUnsignedByte()); assertEquals(msg.charAt(ix), buffer.readUnsignedByte());
@ -61,7 +61,7 @@ public abstract class BufferTestBase {
@Test @Test
public void readToArrayShouldSucceed() { public void readToArrayShouldSucceed() {
Buffer buffer = buffer(); ReadableBuffer buffer = buffer();
byte[] array = new byte[msg.length()]; byte[] array = new byte[msg.length()];
buffer.readBytes(array, 0, array.length); buffer.readBytes(array, 0, array.length);
Arrays.equals(msg.getBytes(UTF_8), array); Arrays.equals(msg.getBytes(UTF_8), array);
@ -70,7 +70,7 @@ public abstract class BufferTestBase {
@Test @Test
public void partialReadToArrayShouldSucceed() { public void partialReadToArrayShouldSucceed() {
Buffer buffer = buffer(); ReadableBuffer buffer = buffer();
byte[] array = new byte[msg.length()]; byte[] array = new byte[msg.length()];
buffer.readBytes(array, 1, 2); buffer.readBytes(array, 1, 2);
Arrays.equals(new byte[] {'h', 'e'}, Arrays.copyOfRange(array, 1, 3)); Arrays.equals(new byte[] {'h', 'e'}, Arrays.copyOfRange(array, 1, 3));
@ -79,7 +79,7 @@ public abstract class BufferTestBase {
@Test @Test
public void readToStreamShouldSucceed() throws Exception { public void readToStreamShouldSucceed() throws Exception {
Buffer buffer = buffer(); ReadableBuffer buffer = buffer();
ByteArrayOutputStream stream = new ByteArrayOutputStream(); ByteArrayOutputStream stream = new ByteArrayOutputStream();
buffer.readBytes(stream, msg.length()); buffer.readBytes(stream, msg.length());
Arrays.equals(msg.getBytes(UTF_8), stream.toByteArray()); Arrays.equals(msg.getBytes(UTF_8), stream.toByteArray());
@ -88,7 +88,7 @@ public abstract class BufferTestBase {
@Test @Test
public void partialReadToStreamShouldSucceed() throws Exception { public void partialReadToStreamShouldSucceed() throws Exception {
Buffer buffer = buffer(); ReadableBuffer buffer = buffer();
ByteArrayOutputStream stream = new ByteArrayOutputStream(); ByteArrayOutputStream stream = new ByteArrayOutputStream();
buffer.readBytes(stream, 2); buffer.readBytes(stream, 2);
Arrays.equals(new byte[]{'h', 'e'}, Arrays.copyOfRange(stream.toByteArray(), 0, 2)); Arrays.equals(new byte[]{'h', 'e'}, Arrays.copyOfRange(stream.toByteArray(), 0, 2));
@ -97,7 +97,7 @@ public abstract class BufferTestBase {
@Test @Test
public void readToByteBufferShouldSucceed() { public void readToByteBufferShouldSucceed() {
Buffer buffer = buffer(); ReadableBuffer buffer = buffer();
ByteBuffer byteBuffer = ByteBuffer.allocate(msg.length()); ByteBuffer byteBuffer = ByteBuffer.allocate(msg.length());
buffer.readBytes(byteBuffer); buffer.readBytes(byteBuffer);
byteBuffer.flip(); byteBuffer.flip();
@ -109,7 +109,7 @@ public abstract class BufferTestBase {
@Test @Test
public void partialReadToByteBufferShouldSucceed() { public void partialReadToByteBufferShouldSucceed() {
Buffer buffer = buffer(); ReadableBuffer buffer = buffer();
ByteBuffer byteBuffer = ByteBuffer.allocate(2); ByteBuffer byteBuffer = ByteBuffer.allocate(2);
buffer.readBytes(byteBuffer); buffer.readBytes(byteBuffer);
byteBuffer.flip(); byteBuffer.flip();
@ -119,5 +119,5 @@ public abstract class BufferTestBase {
assertEquals(3, buffer.readableBytes()); assertEquals(3, buffer.readableBytes());
} }
protected abstract Buffer buffer(); protected abstract ReadableBuffer buffer();
} }

View File

@ -32,7 +32,7 @@
package io.grpc.transport; package io.grpc.transport;
import static com.google.common.base.Charsets.UTF_8; import static com.google.common.base.Charsets.UTF_8;
import static io.grpc.transport.Buffers.wrap; import static io.grpc.transport.ReadableBuffers.wrap;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame; import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@ -40,14 +40,14 @@ import static org.junit.Assert.assertTrue;
import org.junit.Test; import org.junit.Test;
/** /**
* Tests for the array-backed {@link Buffer} returned by {@link Buffers#wrap(byte[], int, int)}; * Tests for the array-backed {@link ReadableBuffer} returned by {@link ReadableBuffers#wrap(byte[], int, int)};
*/ */
public class BuffersArrayTest extends BufferTestBase { public class ReadableBuffersArrayTest extends ReadableBufferTestBase {
@Test @Test
public void bufferShouldExposeArray() { public void bufferShouldExposeArray() {
byte[] array = msg.getBytes(UTF_8); byte[] array = msg.getBytes(UTF_8);
Buffer buffer = wrap(array, 1, msg.length() - 1); ReadableBuffer buffer = wrap(array, 1, msg.length() - 1);
assertTrue(buffer.hasArray()); assertTrue(buffer.hasArray());
assertSame(array, buffer.array()); assertSame(array, buffer.array());
assertEquals(1, buffer.arrayOffset()); assertEquals(1, buffer.arrayOffset());
@ -58,7 +58,7 @@ public class BuffersArrayTest extends BufferTestBase {
} }
@Override @Override
protected Buffer buffer() { protected ReadableBuffer buffer() {
return Buffers.wrap(msg.getBytes(UTF_8), 0, msg.length()); return ReadableBuffers.wrap(msg.getBytes(UTF_8), 0, msg.length());
} }
} }

View File

@ -36,12 +36,12 @@ import static com.google.common.base.Charsets.UTF_8;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
/** /**
* Tests for the array-backed {@link Buffer} returned by {@link Buffers#wrap(ByteBuffer)}. * Tests for the array-backed {@link ReadableBuffer} returned by {@link ReadableBuffers#wrap(ByteBuffer)}.
*/ */
public class BuffersByteBufferTest extends BufferTestBase { public class ReadableBuffersByteBufferTest extends ReadableBufferTestBase {
@Override @Override
protected Buffer buffer() { protected ReadableBuffer buffer() {
return Buffers.wrap(ByteBuffer.wrap(msg.getBytes(UTF_8))); return ReadableBuffers.wrap(ByteBuffer.wrap(msg.getBytes(UTF_8)));
} }
} }

View File

@ -0,0 +1,66 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.transport;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
* Abstract base class for tests of {@link WritableBufferAllocator} subclasses.
*/
@RunWith(JUnit4.class)
public abstract class WritableBufferAllocatorTestBase {
protected abstract WritableBufferAllocator allocator();
@Test
public void testBuffersAreDifferent() {
WritableBuffer buffer1 = allocator().allocate(100);
WritableBuffer buffer2 = allocator().allocate(100);
assertNotSame(buffer1, buffer2);
buffer1.release();
buffer2.release();
}
@Test
public void testCapacity() {
WritableBuffer buffer = allocator().allocate(4096);
assertEquals(0, buffer.readableBytes());
assertEquals(4096, buffer.writableBytes());
}
}

View File

@ -0,0 +1,119 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.transport;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
* Abstract base class for tests of {@link WritableBuffer} subclasses.
*/
@RunWith(JUnit4.class)
public abstract class WritableBufferTestBase {
/**
* Returns a new buffer for every test case with
* at least 100 byte of capacity.
*/
protected abstract WritableBuffer buffer();
/**
* Bytes written to {@link #buffer()}.
*/
protected abstract byte[] writtenBytes();
@Test(expected = RuntimeException.class)
public void testWriteNegativeLength() {
buffer().write(new byte[1], 0, -1);
}
@Test(expected = IndexOutOfBoundsException.class)
public void testWriteNegativeSrcIndex() {
buffer().write(new byte[1], -1, 0);
}
@Test(expected = IndexOutOfBoundsException.class)
public void testWriteSrcIndexAndLengthExceedSrcLength() {
buffer().write(new byte[10], 1, 10);
}
@Test(expected = IndexOutOfBoundsException.class)
public void testWriteSrcIndexAndLengthExceedWritableBytes() {
buffer().write(new byte[buffer().writableBytes()], 1, buffer().writableBytes());
}
@Test
public void testWritableAndReadableBytes() {
int before = buffer().writableBytes();
buffer().write(new byte[10], 0, 5);
assertEquals(5, before - buffer().writableBytes());
assertEquals(5, buffer().readableBytes());
}
@Test
public void testWriteSrcIndex() {
byte b[] = new byte[10];
for (byte i = 5; i < 10; i++) {
b[i] = i;
}
buffer().write(b, 5, 5);
assertEquals(5, buffer().readableBytes());
byte writtenBytes[] = writtenBytes();
assertEquals(5, writtenBytes.length);
for (int i = 0; i < writtenBytes.length; i++) {
assertEquals(5+i, writtenBytes[i]);
}
}
@Test
public void testMultipleWrites() {
byte[] b = new byte[100];
for (byte i = 0; i < b.length; i++) {
b[i] = i;
}
// Write in chunks of 10 bytes
for (int i = 0; i < 10; i++) {
buffer().write(b, 10 * i, 10);
assertEquals(10 * (i + 1), buffer().readableBytes());
}
assertArrayEquals(b, writtenBytes());
}
}

View File

@ -35,12 +35,11 @@ import static com.google.common.base.Preconditions.checkNotNull;
import io.grpc.transport.ClientStreamListener; import io.grpc.transport.ClientStreamListener;
import io.grpc.transport.Http2ClientStream; import io.grpc.transport.Http2ClientStream;
import io.grpc.transport.WritableBuffer;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.handler.codec.http2.Http2Headers; import io.netty.handler.codec.http2.Http2Headers;
import java.nio.ByteBuffer;
/** /**
* Client stream for a Netty transport. * Client stream for a Netty transport.
*/ */
@ -50,7 +49,7 @@ class NettyClientStream extends Http2ClientStream {
private final NettyClientHandler handler; private final NettyClientHandler handler;
NettyClientStream(ClientStreamListener listener, Channel channel, NettyClientHandler handler) { NettyClientStream(ClientStreamListener listener, Channel channel, NettyClientHandler handler) {
super(listener); super(new NettyWritableBufferAllocator(channel.alloc()), listener);
this.channel = checkNotNull(channel, "channel"); this.channel = checkNotNull(channel, "channel");
this.handler = checkNotNull(handler, "handler"); this.handler = checkNotNull(handler, "handler");
} }
@ -74,7 +73,7 @@ class NettyClientStream extends Http2ClientStream {
} }
void transportDataReceived(ByteBuf frame, boolean endOfStream) { void transportDataReceived(ByteBuf frame, boolean endOfStream) {
transportDataReceived(new NettyBuffer(frame.retain()), endOfStream); transportDataReceived(new NettyReadableBuffer(frame.retain()), endOfStream);
} }
@Override @Override
@ -84,11 +83,9 @@ class NettyClientStream extends Http2ClientStream {
} }
@Override @Override
protected void sendFrame(ByteBuffer frame, boolean endOfStream) { protected void sendFrame(WritableBuffer frame, boolean endOfStream) {
SendGrpcFrameCommand cmd = ByteBuf bytebuf = ((NettyWritableBuffer) frame).bytebuf();
new SendGrpcFrameCommand(this, Utils.toByteBuf(channel.alloc(), frame), endOfStream); channel.writeAndFlush(new SendGrpcFrameCommand(this, bytebuf, endOfStream));
channel.writeAndFlush(cmd);
} }
@Override @Override

View File

@ -34,7 +34,7 @@ package io.grpc.transport.netty;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import io.grpc.transport.AbstractBuffer; import io.grpc.transport.AbstractReadableBuffer;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import java.io.IOException; import java.io.IOException;
@ -46,11 +46,11 @@ import java.nio.ByteBuffer;
* call {@link ByteBuf#retain}, so if that is needed it should be called prior to creating this * call {@link ByteBuf#retain}, so if that is needed it should be called prior to creating this
* buffer. * buffer.
*/ */
class NettyBuffer extends AbstractBuffer { class NettyReadableBuffer extends AbstractReadableBuffer {
private final ByteBuf buffer; private final ByteBuf buffer;
private boolean closed; private boolean closed;
NettyBuffer(ByteBuf buffer) { NettyReadableBuffer(ByteBuf buffer) {
this.buffer = Preconditions.checkNotNull(buffer, "buffer"); this.buffer = Preconditions.checkNotNull(buffer, "buffer");
} }
@ -93,9 +93,9 @@ class NettyBuffer extends AbstractBuffer {
} }
@Override @Override
public NettyBuffer readBytes(int length) { public NettyReadableBuffer readBytes(int length) {
// The ByteBuf returned by readSlice() stores a reference to buffer but does not call retain(). // The ByteBuf returned by readSlice() stores a reference to buffer but does not call retain().
return new NettyBuffer(buffer.readSlice(length).retain()); return new NettyReadableBuffer(buffer.readSlice(length).retain());
} }
@Override @Override

View File

@ -35,12 +35,11 @@ import static com.google.common.base.Preconditions.checkNotNull;
import io.grpc.Metadata; import io.grpc.Metadata;
import io.grpc.transport.AbstractServerStream; import io.grpc.transport.AbstractServerStream;
import io.grpc.transport.WritableBuffer;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.handler.codec.http2.Http2Headers; import io.netty.handler.codec.http2.Http2Headers;
import java.nio.ByteBuffer;
/** /**
* Server stream for a Netty HTTP2 transport * Server stream for a Netty HTTP2 transport
*/ */
@ -50,13 +49,13 @@ class NettyServerStream extends AbstractServerStream<Integer> {
private final NettyServerHandler handler; private final NettyServerHandler handler;
NettyServerStream(Channel channel, int id, NettyServerHandler handler) { NettyServerStream(Channel channel, int id, NettyServerHandler handler) {
super(id); super(new NettyWritableBufferAllocator(channel.alloc()), id);
this.channel = checkNotNull(channel, "channel"); this.channel = checkNotNull(channel, "channel");
this.handler = checkNotNull(handler, "handler"); this.handler = checkNotNull(handler, "handler");
} }
void inboundDataReceived(ByteBuf frame, boolean endOfStream) { void inboundDataReceived(ByteBuf frame, boolean endOfStream) {
super.inboundDataReceived(new NettyBuffer(frame.retain()), endOfStream); super.inboundDataReceived(new NettyReadableBuffer(frame.retain()), endOfStream);
} }
@Override @Override
@ -81,10 +80,9 @@ class NettyServerStream extends AbstractServerStream<Integer> {
} }
@Override @Override
protected void sendFrame(ByteBuffer frame, boolean endOfStream) { protected void sendFrame(WritableBuffer frame, boolean endOfStream) {
SendGrpcFrameCommand cmd = ByteBuf bytebuf = ((NettyWritableBuffer) frame).bytebuf();
new SendGrpcFrameCommand(this, Utils.toByteBuf(channel.alloc(), frame), endOfStream); channel.writeAndFlush(new SendGrpcFrameCommand(this, bytebuf, endOfStream));
channel.writeAndFlush(cmd);
} }
@Override @Override

View File

@ -0,0 +1,71 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.transport.netty;
import io.grpc.transport.WritableBuffer;
import io.netty.buffer.ByteBuf;
/**
* The {@link WritableBuffer} used by the Netty transport.
*/
class NettyWritableBuffer implements WritableBuffer {
private final ByteBuf bytebuf;
NettyWritableBuffer(ByteBuf bytebuf) {
this.bytebuf = bytebuf;
}
@Override
public void write(byte[] src, int srcIndex, int length) {
bytebuf.writeBytes(src, srcIndex, length);
}
@Override
public int writableBytes() {
return bytebuf.writableBytes();
}
@Override
public int readableBytes() {
return bytebuf.readableBytes();
}
@Override
public void release() {
bytebuf.release();
}
ByteBuf bytebuf() {
return bytebuf;
}
}

View File

@ -0,0 +1,52 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.transport.netty;
import io.grpc.transport.WritableBufferAllocator;
import io.netty.buffer.ByteBufAllocator;
/**
* The default allocator for {@link NettyWritableBuffer}s used by the Netty transport.
*/
class NettyWritableBufferAllocator implements WritableBufferAllocator {
private final ByteBufAllocator allocator;
NettyWritableBufferAllocator(ByteBufAllocator allocator) {
this.allocator = allocator;
}
@Override
public NettyWritableBuffer allocate(int capacity) {
return new NettyWritableBuffer(allocator.buffer(capacity, capacity));
}
}

View File

@ -34,8 +34,8 @@ package io.grpc.transport.netty;
import static com.google.common.base.Charsets.UTF_8; import static com.google.common.base.Charsets.UTF_8;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import io.grpc.transport.Buffer; import io.grpc.transport.ReadableBuffer;
import io.grpc.transport.BufferTestBase; import io.grpc.transport.ReadableBufferTestBase;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import org.junit.Before; import org.junit.Before;
@ -44,15 +44,15 @@ import org.junit.runner.RunWith;
import org.junit.runners.JUnit4; import org.junit.runners.JUnit4;
/** /**
* Tests for {@link NettyBuffer}. * Tests for {@link NettyReadableBuffer}.
*/ */
@RunWith(JUnit4.class) @RunWith(JUnit4.class)
public class NettyBufferTest extends BufferTestBase { public class NettyReadableBufferTest extends ReadableBufferTestBase {
private NettyBuffer buffer; private NettyReadableBuffer buffer;
@Before @Before
public void setup() { public void setup() {
buffer = new NettyBuffer(Unpooled.copiedBuffer(msg, UTF_8)); buffer = new NettyReadableBuffer(Unpooled.copiedBuffer(msg, UTF_8));
} }
@Test @Test
@ -69,7 +69,7 @@ public class NettyBufferTest extends BufferTestBase {
} }
@Override @Override
protected Buffer buffer() { protected ReadableBuffer buffer() {
return buffer; return buffer;
} }
} }

View File

@ -58,7 +58,9 @@ import io.grpc.transport.MessageFramer;
import io.grpc.transport.ServerStream; import io.grpc.transport.ServerStream;
import io.grpc.transport.ServerStreamListener; import io.grpc.transport.ServerStreamListener;
import io.grpc.transport.ServerTransportListener; import io.grpc.transport.ServerTransportListener;
import io.grpc.transport.WritableBuffer;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
@ -86,7 +88,6 @@ import org.mockito.MockitoAnnotations;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.InputStream; import java.io.InputStream;
import java.nio.ByteBuffer;
/** Unit tests for {@link NettyServerHandler}. */ /** Unit tests for {@link NettyServerHandler}. */
@RunWith(JUnit4.class) @RunWith(JUnit4.class)
@ -278,12 +279,13 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase {
private ByteBuf dataFrame(int streamId, boolean endStream) { private ByteBuf dataFrame(int streamId, boolean endStream) {
final ByteBuf compressionFrame = Unpooled.buffer(CONTENT.length); final ByteBuf compressionFrame = Unpooled.buffer(CONTENT.length);
MessageFramer framer = new MessageFramer(new MessageFramer.Sink<ByteBuffer>() { MessageFramer framer = new MessageFramer(new MessageFramer.Sink() {
@Override @Override
public void deliverFrame(ByteBuffer frame, boolean endOfStream) { public void deliverFrame(WritableBuffer frame, boolean endOfStream) {
compressionFrame.writeBytes(frame); ByteBuf bytebuf = ((NettyWritableBuffer)frame).bytebuf();
compressionFrame.writeBytes(bytebuf);
} }
}, 1000); }, new NettyWritableBufferAllocator(ByteBufAllocator.DEFAULT), 1000);
framer.writePayload(new ByteArrayInputStream(CONTENT), CONTENT.length); framer.writePayload(new ByteArrayInputStream(CONTENT), CONTENT.length);
framer.flush(); framer.flush();
if (endStream) { if (endStream) {

View File

@ -35,7 +35,6 @@ import static io.netty.util.CharsetUtil.UTF_8;
import com.google.common.io.ByteStreams; import com.google.common.io.ByteStreams;
import io.grpc.Status;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;

View File

@ -0,0 +1,53 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.transport.netty;
import io.grpc.transport.WritableBufferAllocator;
import io.grpc.transport.WritableBufferAllocatorTestBase;
import io.netty.buffer.ByteBufAllocator;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
* Tests for {@link NettyWritableBufferAllocator}.
*/
@RunWith(JUnit4.class)
public class NettyWritableBufferAllocatorTest extends WritableBufferAllocatorTestBase {
private final NettyWritableBufferAllocator allocator =
new NettyWritableBufferAllocator(ByteBufAllocator.DEFAULT);
@Override
protected WritableBufferAllocator allocator() {
return allocator;
}
}

View File

@ -0,0 +1,73 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.transport.netty;
import io.grpc.transport.WritableBuffer;
import io.grpc.transport.WritableBufferTestBase;
import io.netty.buffer.Unpooled;
import org.junit.After;
import org.junit.Before;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import java.util.Arrays;
/**
* Tests for {@link NettyWritableBuffer}.
*/
@RunWith(JUnit4.class)
public class NettyWritableBufferTest extends WritableBufferTestBase {
private NettyWritableBuffer buffer;
@Before
public void setup() {
buffer = new NettyWritableBuffer(Unpooled.buffer(100));
}
@After
public void teardown() {
buffer.release();
}
@Override
protected WritableBuffer buffer() {
return buffer;
}
@Override
protected byte[] writtenBytes() {
byte b[] = buffer.bytebuf().array();
int fromIdx = buffer.bytebuf().arrayOffset();
return Arrays.copyOfRange(b, fromIdx, buffer.readableBytes());
}
}

View File

@ -6,6 +6,9 @@ description = "gRPC: OkHttp"
dependencies { dependencies {
compile project(':grpc-core'), compile project(':grpc-core'),
libraries.okhttp libraries.okhttp
// Tests depend on base class defined by core module.
testCompile project(':grpc-core').sourceSets.test.output
} }
// Configure the animal sniffer plugin // Configure the animal sniffer plugin

View File

@ -40,8 +40,9 @@ import io.grpc.Metadata;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.transport.ClientStreamListener; import io.grpc.transport.ClientStreamListener;
import io.grpc.transport.Http2ClientStream; import io.grpc.transport.Http2ClientStream;
import io.grpc.transport.WritableBuffer;
import okio.Buffer;
import java.nio.ByteBuffer;
import java.util.List; import java.util.List;
import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.GuardedBy;
@ -78,7 +79,7 @@ class OkHttpClientStream extends Http2ClientStream {
AsyncFrameWriter frameWriter, AsyncFrameWriter frameWriter,
OkHttpClientTransport transport, OkHttpClientTransport transport,
OutboundFlowController outboundFlow) { OutboundFlowController outboundFlow) {
super(listener); super(new OkHttpWritableBufferAllocator(), listener);
this.frameWriter = frameWriter; this.frameWriter = frameWriter;
this.transport = transport; this.transport = transport;
this.outboundFlow = outboundFlow; this.outboundFlow = outboundFlow;
@ -109,17 +110,14 @@ class OkHttpClientStream extends Http2ClientStream {
synchronized (lock) { synchronized (lock) {
long length = frame.size(); long length = frame.size();
window -= length; window -= length;
super.transportDataReceived(new OkHttpBuffer(frame), endOfStream); super.transportDataReceived(new OkHttpReadableBuffer(frame), endOfStream);
} }
} }
@Override @Override
protected void sendFrame(ByteBuffer frame, boolean endOfStream) { protected void sendFrame(WritableBuffer frame, boolean endOfStream) {
Preconditions.checkState(id() != 0, "streamId should be set"); Preconditions.checkState(id() != 0, "streamId should be set");
okio.Buffer buffer = new okio.Buffer(); Buffer buffer = ((OkHttpWritableBuffer) frame).buffer();
// Read the data into a buffer.
// TODO(madongfly): swap to NIO buffers or zero-copy if/when okhttp/okio supports it
buffer.write(frame.array(), frame.arrayOffset(), frame.remaining());
// Write the data to the remote endpoint. // Write the data to the remote endpoint.
// Per http2 SPEC, the max data length should be larger than 64K, while our frame size is // Per http2 SPEC, the max data length should be larger than 64K, while our frame size is
// only 4K. // only 4K.

View File

@ -46,7 +46,6 @@ import io.grpc.Metadata;
import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.Status.Code; import io.grpc.Status.Code;
import io.grpc.transport.ClientStream;
import io.grpc.transport.ClientStreamListener; import io.grpc.transport.ClientStreamListener;
import io.grpc.transport.ClientTransport; import io.grpc.transport.ClientTransport;

View File

@ -31,8 +31,8 @@
package io.grpc.transport.okhttp; package io.grpc.transport.okhttp;
import io.grpc.transport.AbstractBuffer; import io.grpc.transport.AbstractReadableBuffer;
import io.grpc.transport.Buffer; import io.grpc.transport.ReadableBuffer;
import java.io.EOFException; import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
@ -40,12 +40,12 @@ import java.io.OutputStream;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
/** /**
* A {@link Buffer} implementation that is backed by an {@link okio.Buffer}. * A {@link io.grpc.transport.ReadableBuffer} implementation that is backed by an {@link okio.Buffer}.
*/ */
class OkHttpBuffer extends AbstractBuffer { class OkHttpReadableBuffer extends AbstractReadableBuffer {
private final okio.Buffer buffer; private final okio.Buffer buffer;
OkHttpBuffer(okio.Buffer buffer) { OkHttpReadableBuffer(okio.Buffer buffer) {
this.buffer = buffer; this.buffer = buffer;
} }
@ -85,10 +85,10 @@ class OkHttpBuffer extends AbstractBuffer {
} }
@Override @Override
public Buffer readBytes(int length) { public ReadableBuffer readBytes(int length) {
okio.Buffer buf = new okio.Buffer(); okio.Buffer buf = new okio.Buffer();
buf.write(buffer, length); buf.write(buffer, length);
return new OkHttpBuffer(buf); return new OkHttpReadableBuffer(buf);
} }
@Override @Override

View File

@ -0,0 +1,72 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.transport.okhttp;
import io.grpc.transport.WritableBuffer;
import okio.Buffer;
class OkHttpWritableBuffer implements WritableBuffer {
private final Buffer buffer;
private int writableBytes;
private int readableBytes;
OkHttpWritableBuffer(Buffer buffer, int capacity) {
this.buffer = buffer;
writableBytes = capacity;
}
@Override
public void write(byte[] src, int srcIndex, int length) {
buffer.write(src, srcIndex, length);
writableBytes -= length;
readableBytes += length;
}
@Override
public int writableBytes() {
return writableBytes;
}
@Override
public int readableBytes() {
return readableBytes;
}
@Override
public void release() {
}
Buffer buffer() {
return buffer;
}
}

View File

@ -0,0 +1,45 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.transport.okhttp;
import io.grpc.transport.WritableBufferAllocator;
import okio.Buffer;
/**
* The default allocator for {@link OkHttpWritableBuffer}s used by the OkHttp transport.
*/
class OkHttpWritableBufferAllocator implements WritableBufferAllocator {
@Override
public OkHttpWritableBuffer allocate(int capacity) {
return new OkHttpWritableBuffer(new Buffer(), capacity);
}
}

View File

@ -47,8 +47,6 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Service;
import com.google.common.util.concurrent.Service.State;
import com.squareup.okhttp.internal.spdy.ErrorCode; import com.squareup.okhttp.internal.spdy.ErrorCode;
import com.squareup.okhttp.internal.spdy.FrameReader; import com.squareup.okhttp.internal.spdy.FrameReader;

View File

@ -0,0 +1,52 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.transport.okhttp;
import io.grpc.transport.WritableBufferAllocator;
import io.grpc.transport.WritableBufferAllocatorTestBase;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
* Tests for {@link OkHttpWritableBufferAllocator}.
*/
@RunWith(JUnit4.class)
public class OkHttpWritableBufferAllocatorTest extends WritableBufferAllocatorTestBase {
private final OkHttpWritableBufferAllocator allocator = new OkHttpWritableBufferAllocator();
@Override
protected WritableBufferAllocator allocator() {
return allocator;
}
}

View File

@ -0,0 +1,63 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.transport.okhttp;
import io.grpc.transport.WritableBuffer;
import io.grpc.transport.WritableBufferTestBase;
import okio.Buffer;
import org.junit.Before;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
* Tests for {@link OkHttpWritableBuffer}.
*/
@RunWith(JUnit4.class)
public class OkHttpWritableBufferTest extends WritableBufferTestBase {
private OkHttpWritableBuffer buffer;
@Before
public void setup() {
buffer = new OkHttpWritableBuffer(new Buffer(), 100);
}
@Override
protected WritableBuffer buffer() {
return buffer;
}
@Override
protected byte[] writtenBytes() {
return buffer.buffer().readByteArray();
}
}