From 0076243063e7cff699c12185fa52baaaa194a035 Mon Sep 17 00:00:00 2001 From: Jakob Buchgraber Date: Wed, 11 Mar 2015 15:07:44 -0700 Subject: [PATCH] Add WritableBuffer interface for zero copy data writes. Fixes #8 WritableBuffer is a generic interface that allows to transfer data from gRPC directly to the native transport's buffer implementation. --- core/src/main/java/io/grpc/ChannelImpl.java | 4 - .../grpc/transport/AbstractClientStream.java | 13 +- ...uffer.java => AbstractReadableBuffer.java} | 4 +- .../grpc/transport/AbstractServerStream.java | 12 +- .../io/grpc/transport/AbstractStream.java | 15 +- ...ffer.java => CompositeReadableBuffer.java} | 50 +++---- ...fer.java => ForwardingReadableBuffer.java} | 10 +- .../io/grpc/transport/Http2ClientStream.java | 9 +- .../io/grpc/transport/MessageDeframer.java | 14 +- .../java/io/grpc/transport/MessageFramer.java | 79 +++++----- .../{Buffer.java => ReadableBuffer.java} | 4 +- .../{Buffers.java => ReadableBuffers.java} | 66 ++++----- .../io/grpc/transport/WritableBuffer.java | 68 +++++++++ .../transport/WritableBufferAllocator.java | 44 ++++++ ...t.java => AbstractReadableBufferTest.java} | 6 +- .../transport/ByteWritableBufferTest.java | 56 ++++++++ ....java => CompositeReadableBufferTest.java} | 22 +-- .../grpc/transport/MessageDeframerTest.java | 10 +- .../io/grpc/transport/MessageFramerTest.java | 135 ++++++++++++------ ...tBase.java => ReadableBufferTestBase.java} | 20 +-- ...est.java => ReadableBuffersArrayTest.java} | 12 +- ...ava => ReadableBuffersByteBufferTest.java} | 8 +- .../WritableBufferAllocatorTestBase.java | 66 +++++++++ .../transport/WritableBufferTestBase.java | 119 +++++++++++++++ .../transport/netty/NettyClientStream.java | 15 +- ...tyBuffer.java => NettyReadableBuffer.java} | 10 +- .../transport/netty/NettyServerStream.java | 14 +- .../transport/netty/NettyWritableBuffer.java | 71 +++++++++ .../netty/NettyWritableBufferAllocator.java | 52 +++++++ ...Test.java => NettyReadableBufferTest.java} | 14 +- .../netty/NettyServerHandlerTest.java | 12 +- .../grpc/transport/netty/NettyTestUtil.java | 1 - .../NettyWritableBufferAllocatorTest.java | 53 +++++++ .../netty/NettyWritableBufferTest.java | 73 ++++++++++ okhttp/build.gradle | 3 + .../transport/okhttp/OkHttpClientStream.java | 26 ++-- .../okhttp/OkHttpClientTransport.java | 1 - ...pBuffer.java => OkHttpReadableBuffer.java} | 14 +- .../okhttp/OkHttpWritableBuffer.java | 72 ++++++++++ .../okhttp/OkHttpWritableBufferAllocator.java | 45 ++++++ .../okhttp/OkHttpClientTransportTest.java | 2 - .../OkHttpWritableBufferAllocatorTest.java | 52 +++++++ .../okhttp/OkHttpWritableBufferTest.java | 63 ++++++++ 43 files changed, 1167 insertions(+), 272 deletions(-) rename core/src/main/java/io/grpc/transport/{AbstractBuffer.java => AbstractReadableBuffer.java} (95%) rename core/src/main/java/io/grpc/transport/{CompositeBuffer.java => CompositeReadableBuffer.java} (76%) rename core/src/main/java/io/grpc/transport/{ForwardingBuffer.java => ForwardingReadableBuffer.java} (91%) rename core/src/main/java/io/grpc/transport/{Buffer.java => ReadableBuffer.java} (98%) rename core/src/main/java/io/grpc/transport/{Buffers.java => ReadableBuffers.java} (79%) create mode 100644 core/src/main/java/io/grpc/transport/WritableBuffer.java create mode 100644 core/src/main/java/io/grpc/transport/WritableBufferAllocator.java rename core/src/test/java/io/grpc/transport/{AbstractBufferTest.java => AbstractReadableBufferTest.java} (95%) create mode 100644 core/src/test/java/io/grpc/transport/ByteWritableBufferTest.java rename core/src/test/java/io/grpc/transport/{CompositeBufferTest.java => CompositeReadableBufferTest.java} (90%) rename core/src/test/java/io/grpc/transport/{BufferTestBase.java => ReadableBufferTestBase.java} (90%) rename core/src/test/java/io/grpc/transport/{BuffersArrayTest.java => ReadableBuffersArrayTest.java} (84%) rename core/src/test/java/io/grpc/transport/{BuffersByteBufferTest.java => ReadableBuffersByteBufferTest.java} (85%) create mode 100644 core/src/test/java/io/grpc/transport/WritableBufferAllocatorTestBase.java create mode 100644 core/src/test/java/io/grpc/transport/WritableBufferTestBase.java rename netty/src/main/java/io/grpc/transport/netty/{NettyBuffer.java => NettyReadableBuffer.java} (92%) create mode 100644 netty/src/main/java/io/grpc/transport/netty/NettyWritableBuffer.java create mode 100644 netty/src/main/java/io/grpc/transport/netty/NettyWritableBufferAllocator.java rename netty/src/test/java/io/grpc/transport/netty/{NettyBufferTest.java => NettyReadableBufferTest.java} (86%) create mode 100644 netty/src/test/java/io/grpc/transport/netty/NettyWritableBufferAllocatorTest.java create mode 100644 netty/src/test/java/io/grpc/transport/netty/NettyWritableBufferTest.java rename okhttp/src/main/java/io/grpc/transport/okhttp/{OkHttpBuffer.java => OkHttpReadableBuffer.java} (87%) create mode 100644 okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpWritableBuffer.java create mode 100644 okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpWritableBufferAllocator.java create mode 100644 okhttp/src/test/java/io/grpc/transport/okhttp/OkHttpWritableBufferAllocatorTest.java create mode 100644 okhttp/src/test/java/io/grpc/transport/okhttp/OkHttpWritableBufferTest.java diff --git a/core/src/main/java/io/grpc/ChannelImpl.java b/core/src/main/java/io/grpc/ChannelImpl.java index ad5431d375..2f9d3deef8 100644 --- a/core/src/main/java/io/grpc/ChannelImpl.java +++ b/core/src/main/java/io/grpc/ChannelImpl.java @@ -33,9 +33,6 @@ package io.grpc; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.Service.Listener; -import com.google.common.util.concurrent.Service.State; import io.grpc.transport.ClientStream; import io.grpc.transport.ClientStreamListener; @@ -48,7 +45,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; -import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.concurrent.GuardedBy; diff --git a/core/src/main/java/io/grpc/transport/AbstractClientStream.java b/core/src/main/java/io/grpc/transport/AbstractClientStream.java index de40a15a92..5d7d3a8f1f 100644 --- a/core/src/main/java/io/grpc/transport/AbstractClientStream.java +++ b/core/src/main/java/io/grpc/transport/AbstractClientStream.java @@ -38,7 +38,6 @@ import io.grpc.Metadata; import io.grpc.Status; import java.io.InputStream; -import java.nio.ByteBuffer; import java.util.logging.Level; import java.util.logging.Logger; @@ -67,7 +66,9 @@ public abstract class AbstractClientStream extends AbstractStream * * @param listener the listener to receive notifications */ - protected AbstractClientStream(ClientStreamListener listener) { + protected AbstractClientStream(WritableBufferAllocator bufferAllocator, + ClientStreamListener listener) { + super(bufferAllocator); this.listener = Preconditions.checkNotNull(listener); } @@ -122,7 +123,7 @@ public abstract class AbstractClientStream extends AbstractStream * * @param frame the received data frame. Its ownership is transferred to this method. */ - protected void inboundDataReceived(Buffer frame) { + protected void inboundDataReceived(ReadableBuffer frame) { Preconditions.checkNotNull(frame, "frame"); boolean needToCloseFrame = true; try { @@ -173,7 +174,7 @@ public abstract class AbstractClientStream extends AbstractStream // remoteEndClosed this.status = status; this.trailers = trailers; - deframe(Buffers.empty(), true); + deframe(ReadableBuffers.empty(), true); } @Override @@ -182,7 +183,7 @@ public abstract class AbstractClientStream extends AbstractStream } @Override - protected final void internalSendFrame(ByteBuffer frame, boolean endOfStream) { + protected final void internalSendFrame(WritableBuffer frame, boolean endOfStream) { sendFrame(frame, endOfStream); } @@ -193,7 +194,7 @@ public abstract class AbstractClientStream extends AbstractStream * @param endOfStream if {@code true} indicates that no more data will be sent on the stream by * this endpoint. */ - protected abstract void sendFrame(ByteBuffer frame, boolean endOfStream); + protected abstract void sendFrame(WritableBuffer frame, boolean endOfStream); /** * Report stream closure with status to the application layer if not already reported. This method diff --git a/core/src/main/java/io/grpc/transport/AbstractBuffer.java b/core/src/main/java/io/grpc/transport/AbstractReadableBuffer.java similarity index 95% rename from core/src/main/java/io/grpc/transport/AbstractBuffer.java rename to core/src/main/java/io/grpc/transport/AbstractReadableBuffer.java index c7c9039592..ae53b3ff87 100644 --- a/core/src/main/java/io/grpc/transport/AbstractBuffer.java +++ b/core/src/main/java/io/grpc/transport/AbstractReadableBuffer.java @@ -32,9 +32,9 @@ package io.grpc.transport; /** - * Abstract base class for {@link Buffer} implementations. + * Abstract base class for {@link ReadableBuffer} implementations. */ -public abstract class AbstractBuffer implements Buffer { +public abstract class AbstractReadableBuffer implements ReadableBuffer { @Override public final int readUnsignedMedium() { diff --git a/core/src/main/java/io/grpc/transport/AbstractServerStream.java b/core/src/main/java/io/grpc/transport/AbstractServerStream.java index 8c05b7eb44..3c7b994ba5 100644 --- a/core/src/main/java/io/grpc/transport/AbstractServerStream.java +++ b/core/src/main/java/io/grpc/transport/AbstractServerStream.java @@ -37,7 +37,6 @@ import io.grpc.Metadata; import io.grpc.Status; import java.io.InputStream; -import java.nio.ByteBuffer; import java.util.logging.Level; import java.util.logging.Logger; @@ -62,7 +61,8 @@ public abstract class AbstractServerStream extends AbstractStream /** Saved trailers from close() that need to be sent once the framer has sent all messages. */ private Metadata.Trailers stashedTrailers; - protected AbstractServerStream(IdT id) { + protected AbstractServerStream(WritableBufferAllocator bufferAllocator, IdT id) { + super(bufferAllocator); id(id); } @@ -125,7 +125,7 @@ public abstract class AbstractServerStream extends AbstractStream * be retained. * @param endOfStream {@code true} if no more data will be received on the stream. */ - public void inboundDataReceived(Buffer frame, boolean endOfStream) { + public void inboundDataReceived(ReadableBuffer frame, boolean endOfStream) { if (inboundPhase() == Phase.STATUS) { frame.close(); return; @@ -142,8 +142,8 @@ public abstract class AbstractServerStream extends AbstractStream } @Override - protected final void internalSendFrame(ByteBuffer frame, boolean endOfStream) { - if (frame.hasRemaining()) { + protected final void internalSendFrame(WritableBuffer frame, boolean endOfStream) { + if (frame.readableBytes() > 0) { sendFrame(frame, false); } if (endOfStream) { @@ -167,7 +167,7 @@ public abstract class AbstractServerStream extends AbstractStream * @param endOfStream if {@code true} indicates that no more data will be sent on the stream by * this endpoint. */ - protected abstract void sendFrame(ByteBuffer frame, boolean endOfStream); + protected abstract void sendFrame(WritableBuffer frame, boolean endOfStream); /** * Sends trailers to the remote end point. This call implies end of stream. diff --git a/core/src/main/java/io/grpc/transport/AbstractStream.java b/core/src/main/java/io/grpc/transport/AbstractStream.java index f9bcd75e7d..0e4312f494 100644 --- a/core/src/main/java/io/grpc/transport/AbstractStream.java +++ b/core/src/main/java/io/grpc/transport/AbstractStream.java @@ -36,7 +36,6 @@ import com.google.common.base.Objects; import com.google.common.base.Preconditions; import java.io.InputStream; -import java.nio.ByteBuffer; import javax.annotation.Nullable; @@ -65,7 +64,7 @@ public abstract class AbstractStream implements Stream { */ private Phase outboundPhase = Phase.HEADERS; - AbstractStream() { + AbstractStream(WritableBufferAllocator bufferAllocator) { MessageDeframer.Listener inboundMessageHandler = new MessageDeframer.Listener() { @Override public void bytesRead(int numBytes) { @@ -87,14 +86,14 @@ public abstract class AbstractStream implements Stream { remoteEndClosed(); } }; - MessageFramer.Sink outboundFrameHandler = new MessageFramer.Sink() { + MessageFramer.Sink outboundFrameHandler = new MessageFramer.Sink() { @Override - public void deliverFrame(ByteBuffer frame, boolean endOfStream) { + public void deliverFrame(WritableBuffer frame, boolean endOfStream) { internalSendFrame(frame, endOfStream); } }; - framer = new MessageFramer(outboundFrameHandler, 4096); + framer = new MessageFramer(outboundFrameHandler, bufferAllocator, 4096); this.deframer = new MessageDeframer(inboundMessageHandler); } @@ -168,7 +167,7 @@ public abstract class AbstractStream implements Stream { * @param endOfStream if {@code true} indicates that no more data will be sent on the stream by * this endpoint. */ - protected abstract void internalSendFrame(ByteBuffer frame, boolean endOfStream); + protected abstract void internalSendFrame(WritableBuffer frame, boolean endOfStream); /** * Handles a message that was just deframed. @@ -194,7 +193,7 @@ public abstract class AbstractStream implements Stream { protected abstract void returnProcessedBytes(int processedBytes); /** - * Called when a {@link #deframe(Buffer, boolean)} operation failed. + * Called when a {@link #deframe(ReadableBuffer, boolean)} operation failed. * * @param cause the actual failure */ @@ -212,7 +211,7 @@ public abstract class AbstractStream implements Stream { * Called to parse a received frame and attempt delivery of any completed * messages. Must be called from the transport thread. */ - protected final void deframe(Buffer frame, boolean endOfStream) { + protected final void deframe(ReadableBuffer frame, boolean endOfStream) { try { deframer.deframe(frame, endOfStream); } catch (Throwable t) { diff --git a/core/src/main/java/io/grpc/transport/CompositeBuffer.java b/core/src/main/java/io/grpc/transport/CompositeReadableBuffer.java similarity index 76% rename from core/src/main/java/io/grpc/transport/CompositeBuffer.java rename to core/src/main/java/io/grpc/transport/CompositeReadableBuffer.java index 986b39c212..0da0ca83c7 100644 --- a/core/src/main/java/io/grpc/transport/CompositeBuffer.java +++ b/core/src/main/java/io/grpc/transport/CompositeReadableBuffer.java @@ -38,34 +38,34 @@ import java.util.ArrayDeque; import java.util.Queue; /** - * A {@link Buffer} that is composed of 0 or more {@link Buffer}s. This provides a facade that + * A {@link ReadableBuffer} that is composed of 0 or more {@link ReadableBuffer}s. This provides a facade that * allows multiple buffers to be treated as one. * *

When a buffer is added to a composite, its life cycle is controlled by the composite. Once * the composite has read past the end of a given buffer, that buffer is automatically closed and * removed from the composite. */ -public class CompositeBuffer extends AbstractBuffer { +public class CompositeReadableBuffer extends AbstractReadableBuffer { private int readableBytes; - private final Queue buffers = new ArrayDeque(); + private final Queue buffers = new ArrayDeque(); /** - * Adds a new {@link Buffer} at the end of the buffer list. After a buffer is added, it is + * Adds a new {@link ReadableBuffer} at the end of the buffer list. After a buffer is added, it is * expected that this {@code CompositeBuffer} has complete ownership. Any attempt to modify the * buffer (i.e. modifying the readable bytes) may result in corruption of the internal state of * this {@code CompositeBuffer}. */ - public void addBuffer(Buffer buffer) { - if (!(buffer instanceof CompositeBuffer)) { + public void addBuffer(ReadableBuffer buffer) { + if (!(buffer instanceof CompositeReadableBuffer)) { buffers.add(buffer); readableBytes += buffer.readableBytes(); return; } - CompositeBuffer compositeBuffer = (CompositeBuffer) buffer; + CompositeReadableBuffer compositeBuffer = (CompositeReadableBuffer) buffer; while (!compositeBuffer.buffers.isEmpty()) { - Buffer subBuffer = compositeBuffer.buffers.remove(); + ReadableBuffer subBuffer = compositeBuffer.buffers.remove(); buffers.add(subBuffer); } readableBytes += compositeBuffer.readableBytes; @@ -82,7 +82,7 @@ public class CompositeBuffer extends AbstractBuffer { public int readUnsignedByte() { ReadOperation op = new ReadOperation() { @Override - int readInternal(Buffer buffer, int length) { + int readInternal(ReadableBuffer buffer, int length) { return buffer.readUnsignedByte(); } }; @@ -94,7 +94,7 @@ public class CompositeBuffer extends AbstractBuffer { public void skipBytes(int length) { execute(new ReadOperation() { @Override - public int readInternal(Buffer buffer, int length) { + public int readInternal(ReadableBuffer buffer, int length) { buffer.skipBytes(length); return 0; } @@ -106,7 +106,7 @@ public class CompositeBuffer extends AbstractBuffer { execute(new ReadOperation() { int currentOffset = destOffset; @Override - public int readInternal(Buffer buffer, int length) { + public int readInternal(ReadableBuffer buffer, int length) { buffer.readBytes(dest, currentOffset, length); currentOffset += length; return 0; @@ -118,7 +118,7 @@ public class CompositeBuffer extends AbstractBuffer { public void readBytes(final ByteBuffer dest) { execute(new ReadOperation() { @Override - public int readInternal(Buffer buffer, int length) { + public int readInternal(ReadableBuffer buffer, int length) { // Change the limit so that only lengthToCopy bytes are available. int prevLimit = dest.limit(); dest.limit(dest.position() + length); @@ -135,7 +135,7 @@ public class CompositeBuffer extends AbstractBuffer { public void readBytes(final OutputStream dest, int length) throws IOException { ReadOperation op = new ReadOperation() { @Override - public int readInternal(Buffer buffer, int length) throws IOException { + public int readInternal(ReadableBuffer buffer, int length) throws IOException { buffer.readBytes(dest, length); return 0; } @@ -149,13 +149,13 @@ public class CompositeBuffer extends AbstractBuffer { } @Override - public CompositeBuffer readBytes(int length) { + public CompositeReadableBuffer readBytes(int length) { checkReadable(length); readableBytes -= length; - CompositeBuffer newBuffer = new CompositeBuffer(); + CompositeReadableBuffer newBuffer = new CompositeReadableBuffer(); while (length > 0) { - Buffer buffer = buffers.peek(); + ReadableBuffer buffer = buffers.peek(); if (buffer.readableBytes() > length) { newBuffer.addBuffer(buffer.readBytes(length)); length = 0; @@ -175,14 +175,14 @@ public class CompositeBuffer extends AbstractBuffer { } /** - * Executes the given {@link ReadOperation} against the {@link Buffer}s required to satisfy the + * Executes the given {@link ReadOperation} against the {@link ReadableBuffer}s required to satisfy the * requested {@code length}. */ private void execute(ReadOperation op, int length) { checkReadable(length); for (; length > 0 && !buffers.isEmpty(); advanceBufferIfNecessary()) { - Buffer buffer = buffers.peek(); + ReadableBuffer buffer = buffers.peek(); int lengthToCopy = Math.min(length, buffer.readableBytes()); // Perform the read operation for this buffer. @@ -205,28 +205,28 @@ public class CompositeBuffer extends AbstractBuffer { * If the current buffer is exhausted, removes and closes it. */ private void advanceBufferIfNecessary() { - Buffer buffer = buffers.peek(); + ReadableBuffer buffer = buffers.peek(); if (buffer.readableBytes() == 0) { buffers.remove().close(); } } /** - * A simple read operation to perform on a single {@link Buffer}. All state management for the - * buffers is done by {@link CompositeBuffer#execute(ReadOperation, int)}. + * A simple read operation to perform on a single {@link ReadableBuffer}. All state management for the + * buffers is done by {@link CompositeReadableBuffer#execute(ReadOperation, int)}. */ private abstract class ReadOperation { /** - * Only used by {@link CompositeBuffer#readUnsignedByte()}. + * Only used by {@link CompositeReadableBuffer#readUnsignedByte()}. */ int value; /** - * Only used by {@link CompositeBuffer#readBytes(OutputStream, int)}; + * Only used by {@link CompositeReadableBuffer#readBytes(OutputStream, int)}; */ IOException ex; - final void read(Buffer buffer, int length) { + final void read(ReadableBuffer buffer, int length) { try { value = readInternal(buffer, length); } catch (IOException e) { @@ -238,6 +238,6 @@ public class CompositeBuffer extends AbstractBuffer { return ex != null; } - abstract int readInternal(Buffer buffer, int length) throws IOException; + abstract int readInternal(ReadableBuffer buffer, int length) throws IOException; } } diff --git a/core/src/main/java/io/grpc/transport/ForwardingBuffer.java b/core/src/main/java/io/grpc/transport/ForwardingReadableBuffer.java similarity index 91% rename from core/src/main/java/io/grpc/transport/ForwardingBuffer.java rename to core/src/main/java/io/grpc/transport/ForwardingReadableBuffer.java index cbb41b182b..7a42a1a823 100644 --- a/core/src/main/java/io/grpc/transport/ForwardingBuffer.java +++ b/core/src/main/java/io/grpc/transport/ForwardingReadableBuffer.java @@ -38,21 +38,21 @@ import java.io.OutputStream; import java.nio.ByteBuffer; /** - * Base class for a wrapper around another {@link Buffer}. + * Base class for a wrapper around another {@link ReadableBuffer}. * *

This class just passes every operation through to the underlying buffer. Subclasses may * override methods to intercept concertain operations. */ -public abstract class ForwardingBuffer implements Buffer { +public abstract class ForwardingReadableBuffer implements ReadableBuffer { - private final Buffer buf; + private final ReadableBuffer buf; /** * Constructor. * * @param buf the underlying buffer */ - public ForwardingBuffer(Buffer buf) { + public ForwardingReadableBuffer(ReadableBuffer buf) { this.buf = Preconditions.checkNotNull(buf, "buf"); } @@ -102,7 +102,7 @@ public abstract class ForwardingBuffer implements Buffer { } @Override - public Buffer readBytes(int length) { + public ReadableBuffer readBytes(int length) { return buf.readBytes(length); } diff --git a/core/src/main/java/io/grpc/transport/Http2ClientStream.java b/core/src/main/java/io/grpc/transport/Http2ClientStream.java index 5484e78efe..32f869ebb8 100644 --- a/core/src/main/java/io/grpc/transport/Http2ClientStream.java +++ b/core/src/main/java/io/grpc/transport/Http2ClientStream.java @@ -70,8 +70,9 @@ public abstract class Http2ClientStream extends AbstractClientStream { private Charset errorCharset = Charsets.UTF_8; private boolean contentTypeChecked; - protected Http2ClientStream(ClientStreamListener listener) { - super(listener); + protected Http2ClientStream(WritableBufferAllocator bufferAllocator, + ClientStreamListener listener) { + super(bufferAllocator, listener); } /** @@ -112,7 +113,7 @@ public abstract class Http2ClientStream extends AbstractClientStream { * @param frame the received data frame * @param endOfStream {@code true} if there will be no more data received for this stream */ - protected void transportDataReceived(Buffer frame, boolean endOfStream) { + protected void transportDataReceived(ReadableBuffer frame, boolean endOfStream) { if (transportError == null && inboundPhase() == Phase.HEADERS) { // Must receive headers prior to receiving any payload as we use headers to check for // protocol correctness. @@ -122,7 +123,7 @@ public abstract class Http2ClientStream extends AbstractClientStream { // We've already detected a transport error and now we're just accumulating more detail // for it. transportError = transportError.augmentDescription("DATA-----------------------------\n" + - Buffers.readAsString(frame, errorCharset)); + ReadableBuffers.readAsString(frame, errorCharset)); frame.close(); if (transportError.getDescription().length() > 1000 || endOfStream) { inboundTransportError(transportError); diff --git a/core/src/main/java/io/grpc/transport/MessageDeframer.java b/core/src/main/java/io/grpc/transport/MessageDeframer.java index 4a7d59fd84..638dd51dfd 100644 --- a/core/src/main/java/io/grpc/transport/MessageDeframer.java +++ b/core/src/main/java/io/grpc/transport/MessageDeframer.java @@ -99,8 +99,8 @@ public class MessageDeframer implements Closeable { private int requiredLength = HEADER_LENGTH; private boolean compressedFlag; private boolean endOfStream; - private CompositeBuffer nextFrame; - private CompositeBuffer unprocessed = new CompositeBuffer(); + private CompositeReadableBuffer nextFrame; + private CompositeReadableBuffer unprocessed = new CompositeReadableBuffer(); private long pendingDeliveries; private boolean deliveryStalled = true; @@ -149,10 +149,10 @@ public class MessageDeframer implements Closeable { * @param endOfStream if {@code true}, indicates that {@code data} is the end of the stream from * the remote endpoint. * @throws IllegalStateException if {@link #close()} has been called previously or if - * {@link #deframe(Buffer, boolean)} has previously been called with + * {@link #deframe(ReadableBuffer, boolean)} has previously been called with * {@code endOfStream=true}. */ - public void deframe(Buffer data, boolean endOfStream) { + public void deframe(ReadableBuffer data, boolean endOfStream) { Preconditions.checkNotNull(data, "data"); boolean needToCloseData = true; try { @@ -275,7 +275,7 @@ public class MessageDeframer implements Closeable { int totalBytesRead = 0; try { if (nextFrame == null) { - nextFrame = new CompositeBuffer(); + nextFrame = new CompositeReadableBuffer(); } // Read until the buffer contains all the required bytes. @@ -331,7 +331,7 @@ public class MessageDeframer implements Closeable { } private InputStream getUncompressedBody() { - return Buffers.openStream(nextFrame, true); + return ReadableBuffers.openStream(nextFrame, true); } private InputStream getCompressedBody() { @@ -345,7 +345,7 @@ public class MessageDeframer implements Closeable { } try { - return new GZIPInputStream(Buffers.openStream(nextFrame, true)); + return new GZIPInputStream(ReadableBuffers.openStream(nextFrame, true)); } catch (IOException e) { throw new RuntimeException(e); } diff --git a/core/src/main/java/io/grpc/transport/MessageFramer.java b/core/src/main/java/io/grpc/transport/MessageFramer.java index e8f3507ea0..d35e7cf994 100644 --- a/core/src/main/java/io/grpc/transport/MessageFramer.java +++ b/core/src/main/java/io/grpc/transport/MessageFramer.java @@ -31,6 +31,8 @@ package io.grpc.transport; +import static java.lang.Math.min; + import com.google.common.base.Preconditions; import com.google.common.io.ByteStreams; @@ -52,14 +54,14 @@ public class MessageFramer { /** * Sink implemented by the transport layer to receive frames and forward them to their destination */ - public interface Sink { + public interface Sink { /** * Delivers a frame via the transport. * * @param frame the contents of the frame to deliver * @param endOfStream whether the frame is the last one for the GRPC stream */ - public void deliverFrame(T frame, boolean endOfStream); + void deliverFrame(WritableBuffer frame, boolean endOfStream); } private static final int HEADER_LENGTH = 5; @@ -70,11 +72,14 @@ public class MessageFramer { NONE, GZIP; } - private final Sink sink; - private ByteBuffer bytebuf; + private final Sink sink; + private WritableBuffer buffer; private final Compression compression; private final OutputStreamAdapter outputStreamAdapter = new OutputStreamAdapter(); private final byte[] headerScratch = new byte[HEADER_LENGTH]; + private final WritableBufferAllocator bufferAllocator; + private final int maxFrameSize; + private boolean closed; /** * Creates a {@code MessageFramer} without compression. @@ -82,8 +87,8 @@ public class MessageFramer { * @param sink the sink used to deliver frames to the transport * @param maxFrameSize the maximum frame size that this framer will deliver */ - public MessageFramer(Sink sink, int maxFrameSize) { - this(sink, maxFrameSize, Compression.NONE); + public MessageFramer(Sink sink, WritableBufferAllocator bufferAllocator, int maxFrameSize) { + this(sink, bufferAllocator, maxFrameSize, Compression.NONE); } /** @@ -93,9 +98,10 @@ public class MessageFramer { * @param maxFrameSize the maximum frame size that this framer will deliver * @param compression the compression type */ - public MessageFramer(Sink sink, int maxFrameSize, Compression compression) { + public MessageFramer(Sink sink, WritableBufferAllocator bufferAllocator, int maxFrameSize, Compression compression) { this.sink = Preconditions.checkNotNull(sink, "sink"); - this.bytebuf = ByteBuffer.allocate(maxFrameSize); + this.bufferAllocator = bufferAllocator; + this.maxFrameSize = maxFrameSize; this.compression = Preconditions.checkNotNull(compression, "compression"); } @@ -108,17 +114,19 @@ public class MessageFramer { */ public void writePayload(InputStream message, int messageLength) { try { - if (compression == Compression.NONE) { - writeFrame(message, messageLength, false); - } else if (compression != Compression.GZIP) { - throw new AssertionError("Unknown compression type"); - } else { - // compression == GZIP - DirectAccessByteArrayOutputStream out = new DirectAccessByteArrayOutputStream(); - gzipCompressTo(message, messageLength, out); - InputStream compressedMessage = - new DeferredByteArrayInputStream(out.getBuf(), 0, out.getCount()); - writeFrame(compressedMessage, out.getCount(), true); + switch(compression) { + case NONE: + writeFrame(message, messageLength, false); + break; + case GZIP: + DirectAccessByteArrayOutputStream out = new DirectAccessByteArrayOutputStream(); + gzipCompressTo(message, messageLength, out); + InputStream compressedMessage = + new DeferredByteArrayInputStream(out.getBuf(), 0, out.getCount()); + writeFrame(compressedMessage, out.getCount(), true); + break; + default: + throw new AssertionError("Unknown compression type"); } } catch (IOException ex) { throw new RuntimeException(ex); @@ -167,11 +175,14 @@ public class MessageFramer { private void writeRaw(byte[] b, int off, int len) { while (len > 0) { - if (bytebuf.remaining() == 0) { + if (buffer != null && buffer.writableBytes() == 0) { commitToSink(false); } - int toWrite = Math.min(len, bytebuf.remaining()); - bytebuf.put(b, off, toWrite); + if (buffer == null) { + buffer = bufferAllocator.allocate(maxFrameSize); + } + int toWrite = min(len, buffer.writableBytes()); + buffer.write(b, off, toWrite); off += toWrite; len -= toWrite; } @@ -181,10 +192,9 @@ public class MessageFramer { * Flushes any buffered data in the framer to the sink. */ public void flush() { - if (bytebuf.position() == 0) { - return; + if (buffer != null && buffer.readableBytes() > 0) { + commitToSink(false); } - commitToSink(false); } /** @@ -192,7 +202,7 @@ public class MessageFramer { * {@link #close()} or {@link #dispose()}. */ public boolean isClosed() { - return bytebuf == null; + return closed; } /** @@ -202,7 +212,7 @@ public class MessageFramer { public void close() { if (!isClosed()) { commitToSink(true); - dispose(); + closed = true; } } @@ -211,14 +221,19 @@ public class MessageFramer { * closed or disposed, additional calls to this method will have no affect. */ public void dispose() { - // TODO(louiscryan): Returning buffer to a pool would go here - bytebuf = null; + closed = true; + if (buffer != null) { + buffer.release(); + buffer = null; + } } private void commitToSink(boolean endOfStream) { - bytebuf.flip(); - sink.deliverFrame(bytebuf, endOfStream); - bytebuf.clear(); + if (buffer == null) { + buffer = bufferAllocator.allocate(0); + } + sink.deliverFrame(buffer, endOfStream); + buffer = null; } private void verifyNotClosed() { diff --git a/core/src/main/java/io/grpc/transport/Buffer.java b/core/src/main/java/io/grpc/transport/ReadableBuffer.java similarity index 98% rename from core/src/main/java/io/grpc/transport/Buffer.java rename to core/src/main/java/io/grpc/transport/ReadableBuffer.java index 482202549b..6b72c0f4ae 100644 --- a/core/src/main/java/io/grpc/transport/Buffer.java +++ b/core/src/main/java/io/grpc/transport/ReadableBuffer.java @@ -44,7 +44,7 @@ import java.nio.ByteBuffer; * done in {@link ByteBuffer}. It is not expected that callers will attempt to modify the backing * array. */ -public interface Buffer extends Closeable { +public interface ReadableBuffer extends Closeable { /** * Gets the current number of readable bytes remaining in this buffer. @@ -131,7 +131,7 @@ public interface Buffer extends Closeable { * @param length the number of bytes to contain in returned Buffer. * @throws IndexOutOfBoundsException if required bytes are not readable */ - Buffer readBytes(int length); + ReadableBuffer readBytes(int length); /** * Indicates whether or not this buffer exposes a backing array. diff --git a/core/src/main/java/io/grpc/transport/Buffers.java b/core/src/main/java/io/grpc/transport/ReadableBuffers.java similarity index 79% rename from core/src/main/java/io/grpc/transport/Buffers.java rename to core/src/main/java/io/grpc/transport/ReadableBuffers.java index 7655d00bda..1a7a4504c0 100644 --- a/core/src/main/java/io/grpc/transport/Buffers.java +++ b/core/src/main/java/io/grpc/transport/ReadableBuffers.java @@ -42,49 +42,49 @@ import java.nio.ByteBuffer; import java.nio.charset.Charset; /** - * Utility methods for creating {@link Buffer} instances. + * Utility methods for creating {@link ReadableBuffer} instances. */ -public final class Buffers { - private static final Buffer EMPTY_BUFFER = new ByteArrayWrapper(new byte[0]); +public final class ReadableBuffers { + private static final ReadableBuffer EMPTY_BUFFER = new ByteArrayWrapper(new byte[0]); /** - * Returns an empty {@link Buffer} instance. + * Returns an empty {@link ReadableBuffer} instance. */ - public static Buffer empty() { + public static ReadableBuffer empty() { return EMPTY_BUFFER; } /** * Shortcut for {@code wrap(bytes, 0, bytes.length}. */ - public static Buffer wrap(byte[] bytes) { + public static ReadableBuffer wrap(byte[] bytes) { return new ByteArrayWrapper(bytes, 0, bytes.length); } /** - * Creates a new {@link Buffer} that is backed by the given byte array. + * Creates a new {@link ReadableBuffer} that is backed by the given byte array. * * @param bytes the byte array being wrapped. * @param offset the starting offset for the buffer within the byte array. * @param length the length of the buffer from the {@code offset} index. */ - public static Buffer wrap(byte[] bytes, int offset, int length) { + public static ReadableBuffer wrap(byte[] bytes, int offset, int length) { return new ByteArrayWrapper(bytes, offset, length); } /** - * Creates a new {@link Buffer} that is backed by the given {@link ByteBuffer}. Calls to read from + * Creates a new {@link ReadableBuffer} that is backed by the given {@link ByteBuffer}. Calls to read from * the buffer will increment the position of the {@link ByteBuffer}. */ - public static Buffer wrap(ByteBuffer bytes) { - return new ByteBufferWrapper(bytes); + public static ReadableBuffer wrap(ByteBuffer bytes) { + return new ByteReadableBufferWrapper(bytes); } /** - * Reads an entire {@link Buffer} to a new array. After calling this method, the buffer will + * Reads an entire {@link ReadableBuffer} to a new array. After calling this method, the buffer will * contain no readable bytes. */ - public static byte[] readArray(Buffer buffer) { + public static byte[] readArray(ReadableBuffer buffer) { Preconditions.checkNotNull(buffer, "buffer"); int length = buffer.readableBytes(); byte[] bytes = new byte[length]; @@ -93,18 +93,18 @@ public final class Buffers { } /** - * Reads the entire {@link Buffer} to a new {@link String} with the given charset. + * Reads the entire {@link ReadableBuffer} to a new {@link String} with the given charset. */ - public static String readAsString(Buffer buffer, Charset charset) { + public static String readAsString(ReadableBuffer buffer, Charset charset) { Preconditions.checkNotNull(charset, "charset"); byte[] bytes = readArray(buffer); return new String(bytes, charset); } /** - * Reads the entire {@link Buffer} to a new {@link String} using UTF-8 decoding. + * Reads the entire {@link ReadableBuffer} to a new {@link String} using UTF-8 decoding. */ - public static String readAsStringUtf8(Buffer buffer) { + public static String readAsStringUtf8(ReadableBuffer buffer) { return readAsString(buffer, UTF_8); } @@ -116,18 +116,18 @@ public final class Buffers { * @param buffer the buffer backing the new {@link InputStream}. * @param owner if {@code true}, the returned stream will close the buffer when closed. */ - public static InputStream openStream(Buffer buffer, boolean owner) { + public static InputStream openStream(ReadableBuffer buffer, boolean owner) { return new BufferInputStream(owner ? buffer : ignoreClose(buffer)); } /** - * Decorates the given {@link Buffer} to ignore calls to {@link Buffer#close}. + * Decorates the given {@link ReadableBuffer} to ignore calls to {@link ReadableBuffer#close}. * * @param buffer the buffer to be decorated. - * @return a wrapper around {@code buffer} that ignores calls to {@link Buffer#close}. + * @return a wrapper around {@code buffer} that ignores calls to {@link ReadableBuffer#close}. */ - public static Buffer ignoreClose(Buffer buffer) { - return new ForwardingBuffer(buffer) { + public static ReadableBuffer ignoreClose(ReadableBuffer buffer) { + return new ForwardingReadableBuffer(buffer) { @Override public void close() { // Ignore. @@ -136,9 +136,9 @@ public final class Buffers { } /** - * A {@link Buffer} that is backed by a byte array. + * A {@link ReadableBuffer} that is backed by a byte array. */ - private static class ByteArrayWrapper extends AbstractBuffer { + private static class ByteArrayWrapper extends AbstractReadableBuffer { int offset; final int end; final byte[] bytes; @@ -221,12 +221,12 @@ public final class Buffers { } /** - * A {@link Buffer} that is backed by a {@link ByteBuffer}. + * A {@link ReadableBuffer} that is backed by a {@link ByteBuffer}. */ - private static class ByteBufferWrapper extends AbstractBuffer { + private static class ByteReadableBufferWrapper extends AbstractReadableBuffer { final ByteBuffer bytes; - ByteBufferWrapper(ByteBuffer bytes) { + ByteReadableBufferWrapper(ByteBuffer bytes) { this.bytes = Preconditions.checkNotNull(bytes, "bytes"); } @@ -283,12 +283,12 @@ public final class Buffers { } @Override - public ByteBufferWrapper readBytes(int length) { + public ByteReadableBufferWrapper readBytes(int length) { checkReadable(length); ByteBuffer buffer = bytes.duplicate(); bytes.position(bytes.position() + length); buffer.limit(bytes.position() + length); - return new ByteBufferWrapper(buffer); + return new ByteReadableBufferWrapper(buffer); } @Override @@ -308,12 +308,12 @@ public final class Buffers { } /** - * An {@link InputStream} that is backed by a {@link Buffer}. + * An {@link InputStream} that is backed by a {@link ReadableBuffer}. */ private static class BufferInputStream extends InputStream { - final Buffer buffer; + final ReadableBuffer buffer; - public BufferInputStream(Buffer buffer) { + public BufferInputStream(ReadableBuffer buffer) { this.buffer = Preconditions.checkNotNull(buffer, "buffer"); } @@ -344,5 +344,5 @@ public final class Buffers { } } - private Buffers() {} + private ReadableBuffers() {} } diff --git a/core/src/main/java/io/grpc/transport/WritableBuffer.java b/core/src/main/java/io/grpc/transport/WritableBuffer.java new file mode 100644 index 0000000000..225feb7da3 --- /dev/null +++ b/core/src/main/java/io/grpc/transport/WritableBuffer.java @@ -0,0 +1,68 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.transport; + +/** + * An interface for a byte buffer that can only be written to. + * {@link WritableBuffer}s are a generic way to transfer bytes to + * the concrete network transports, like Netty and OkHttp. + */ +public interface WritableBuffer { + + /** + * Appends {@code length} bytes to the buffer from the source + * array starting at {@code srcIndex}. + * + * @throws IndexOutOfBoundsException + * if the specified {@code srcIndex} is less than {@code 0}, + * if {@code srcIndex + length} is greater than + * {@code src.length}, or + * if {@code length} is greater than {@link #writableBytes()} + */ + void write(byte[] src, int srcIndex, int length); + + /** + * Returns the number of bytes one can write to the buffer. + */ + int writableBytes(); + + /** + * Returns the number of bytes one can read from the buffer. + */ + int readableBytes(); + + /** + * Releases the buffer, indicating to the {@link WritableBufferAllocator} that + * this buffer is no longer used and its resources can be reused. + */ + void release(); +} diff --git a/core/src/main/java/io/grpc/transport/WritableBufferAllocator.java b/core/src/main/java/io/grpc/transport/WritableBufferAllocator.java new file mode 100644 index 0000000000..f04eb1b182 --- /dev/null +++ b/core/src/main/java/io/grpc/transport/WritableBufferAllocator.java @@ -0,0 +1,44 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.transport; + +/** + * The preferred way of creating a {@link WritableBuffer}. + */ +public interface WritableBufferAllocator { + + /** + * Allocate a new {@link WritableBuffer} that can hold + * {@code capacity} bytes. + */ + WritableBuffer allocate(int capacity); +} diff --git a/core/src/test/java/io/grpc/transport/AbstractBufferTest.java b/core/src/test/java/io/grpc/transport/AbstractReadableBufferTest.java similarity index 95% rename from core/src/test/java/io/grpc/transport/AbstractBufferTest.java rename to core/src/test/java/io/grpc/transport/AbstractReadableBufferTest.java index c3c74a2d49..71be905a05 100644 --- a/core/src/test/java/io/grpc/transport/AbstractBufferTest.java +++ b/core/src/test/java/io/grpc/transport/AbstractReadableBufferTest.java @@ -43,13 +43,13 @@ import org.mockito.MockitoAnnotations; import org.mockito.stubbing.OngoingStubbing; /** - * Tests for {@link AbstractBuffer}. + * Tests for {@link AbstractReadableBuffer}. */ @RunWith(JUnit4.class) -public class AbstractBufferTest { +public class AbstractReadableBufferTest { @Mock - private AbstractBuffer buffer; + private AbstractReadableBuffer buffer; @Before public void setup() { diff --git a/core/src/test/java/io/grpc/transport/ByteWritableBufferTest.java b/core/src/test/java/io/grpc/transport/ByteWritableBufferTest.java new file mode 100644 index 0000000000..b57cf4c985 --- /dev/null +++ b/core/src/test/java/io/grpc/transport/ByteWritableBufferTest.java @@ -0,0 +1,56 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.transport; + +import org.junit.Before; + +import java.util.Arrays; + +public class ByteWritableBufferTest extends WritableBufferTestBase { + + private MessageFramerTest.ByteWritableBuffer buffer; + + @Before + public void setup() { + buffer = new MessageFramerTest.ByteWritableBuffer(100); + } + + @Override + protected WritableBuffer buffer() { + return buffer; + } + + @Override + protected byte[] writtenBytes() { + return Arrays.copyOfRange(buffer.data, 0, buffer.readableBytes()); + } +} diff --git a/core/src/test/java/io/grpc/transport/CompositeBufferTest.java b/core/src/test/java/io/grpc/transport/CompositeReadableBufferTest.java similarity index 90% rename from core/src/test/java/io/grpc/transport/CompositeBufferTest.java rename to core/src/test/java/io/grpc/transport/CompositeReadableBufferTest.java index ba5d18543d..89f77180cc 100644 --- a/core/src/test/java/io/grpc/transport/CompositeBufferTest.java +++ b/core/src/test/java/io/grpc/transport/CompositeReadableBufferTest.java @@ -47,17 +47,17 @@ import java.io.IOException; import java.nio.ByteBuffer; /** - * Tests for {@link CompositeBuffer}. + * Tests for {@link CompositeReadableBuffer}. */ @RunWith(JUnit4.class) -public class CompositeBufferTest { +public class CompositeReadableBufferTest { private static final String EXPECTED_VALUE = "hello world"; - private CompositeBuffer composite; + private CompositeReadableBuffer composite; @Before public void setup() { - composite = new CompositeBuffer(); + composite = new CompositeReadableBuffer(); splitAndAdd(EXPECTED_VALUE); } @@ -68,10 +68,10 @@ public class CompositeBufferTest { @Test public void singleBufferShouldSucceed() { - composite = new CompositeBuffer(); - composite.addBuffer(Buffers.wrap(EXPECTED_VALUE.getBytes(UTF_8))); + composite = new CompositeReadableBuffer(); + composite.addBuffer(ReadableBuffers.wrap(EXPECTED_VALUE.getBytes(UTF_8))); assertEquals(EXPECTED_VALUE.length(), composite.readableBytes()); - assertEquals(EXPECTED_VALUE, Buffers.readAsStringUtf8(composite)); + assertEquals(EXPECTED_VALUE, ReadableBuffers.readAsStringUtf8(composite)); assertEquals(0, composite.readableBytes()); } @@ -161,9 +161,9 @@ public class CompositeBufferTest { @Test public void closeShouldCloseBuffers() { - composite = new CompositeBuffer(); - Buffer mock1 = mock(Buffer.class); - Buffer mock2 = mock(Buffer.class); + composite = new CompositeReadableBuffer(); + ReadableBuffer mock1 = mock(ReadableBuffer.class); + ReadableBuffer mock2 = mock(ReadableBuffer.class); composite.addBuffer(mock1); composite.addBuffer(mock2); @@ -177,7 +177,7 @@ public class CompositeBufferTest { for (int startIndex = 0, endIndex = 0; startIndex < value.length(); startIndex = endIndex) { endIndex = Math.min(value.length(), startIndex + partLength); String part = value.substring(startIndex, endIndex); - composite.addBuffer(Buffers.wrap(part.getBytes(UTF_8))); + composite.addBuffer(ReadableBuffers.wrap(part.getBytes(UTF_8))); } assertEquals(value.length(), composite.readableBytes()); diff --git a/core/src/test/java/io/grpc/transport/MessageDeframerTest.java b/core/src/test/java/io/grpc/transport/MessageDeframerTest.java index 82bcdc2469..255245ee0c 100644 --- a/core/src/test/java/io/grpc/transport/MessageDeframerTest.java +++ b/core/src/test/java/io/grpc/transport/MessageDeframerTest.java @@ -68,7 +68,7 @@ public class MessageDeframerTest { @Test public void simplePayload() { deframer.request(1); - deframer.deframe(buffer(new byte[]{0, 0, 0, 0, 2, 3, 14}), false); + deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 2, 3, 14}), false); verify(listener).messageRead(messages.capture()); assertEquals(Bytes.asList(new byte[]{3, 14}), bytes(messages)); verify(listener, atLeastOnce()).bytesRead(anyInt()); @@ -78,7 +78,7 @@ public class MessageDeframerTest { @Test public void smallCombinedPayloads() { deframer.request(2); - deframer.deframe(buffer(new byte[]{0, 0, 0, 0, 1, 3, 0, 0, 0, 0, 2, 14, 15}), false); + deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3, 0, 0, 0, 0, 2, 14, 15}), false); verify(listener, times(2)).messageRead(messages.capture()); List streams = messages.getAllValues(); assertEquals(2, streams.size()); @@ -149,7 +149,7 @@ public class MessageDeframerTest { public void largerFrameSize() { deframer.request(1); deframer.deframe( - Buffers.wrap(Bytes.concat(new byte[] {0, 0, 0, 3, (byte) 232}, new byte[1000])), false); + ReadableBuffers.wrap(Bytes.concat(new byte[] {0, 0, 0, 3, (byte) 232}, new byte[1000])), false); verify(listener).messageRead(messages.capture()); assertEquals(Bytes.asList(new byte[1000]), bytes(messages)); verify(listener, atLeastOnce()).bytesRead(anyInt()); @@ -196,8 +196,8 @@ public class MessageDeframerTest { } } - private static Buffer buffer(byte[] bytes) { - return Buffers.wrap(bytes); + private static ReadableBuffer buffer(byte[] bytes) { + return ReadableBuffers.wrap(bytes); } private static byte[] compress(byte[] bytes) { diff --git a/core/src/test/java/io/grpc/transport/MessageFramerTest.java b/core/src/test/java/io/grpc/transport/MessageFramerTest.java index 801165ed29..ece7a57d4a 100644 --- a/core/src/test/java/io/grpc/transport/MessageFramerTest.java +++ b/core/src/test/java/io/grpc/transport/MessageFramerTest.java @@ -36,8 +36,9 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; +import static io.grpc.transport.MessageFramer.Compression; -import com.google.common.primitives.Bytes; +import com.google.common.base.Preconditions; import org.junit.Before; import org.junit.Test; @@ -49,8 +50,7 @@ import org.mockito.Mock; import org.mockito.MockitoAnnotations; import java.io.ByteArrayInputStream; -import java.nio.ByteBuffer; -import java.util.List; +import java.util.Arrays; /** * Tests for {@link MessageFramer} @@ -60,19 +60,18 @@ public class MessageFramerTest { private static final int TRANSPORT_FRAME_SIZE = 12; @Mock - private MessageFramer.Sink> sink; - private MessageFramer.Sink copyingSink; + private MessageFramer.Sink sink; private MessageFramer framer; @Captor - private ArgumentCaptor> frameCaptor; + private ArgumentCaptor frameCaptor; + private WritableBufferAllocator allocator = new BytesWritableBufferAllocator(); @Before public void setup() { MockitoAnnotations.initMocks(this); - copyingSink = new ByteArrayConverterSink(sink); - framer = new MessageFramer(copyingSink, TRANSPORT_FRAME_SIZE); + framer = new MessageFramer(sink, allocator, TRANSPORT_FRAME_SIZE); } @Test @@ -80,7 +79,7 @@ public class MessageFramerTest { writePayload(framer, new byte[] {3, 14}); verifyNoMoreInteractions(sink); framer.flush(); - verify(sink).deliverFrame(Bytes.asList(new byte[] {0, 0, 0, 0, 2, 3, 14}), false); + verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 2, 3, 14}), false); verifyNoMoreInteractions(sink); } @@ -91,8 +90,7 @@ public class MessageFramerTest { writePayload(framer, new byte[] {14}); verifyNoMoreInteractions(sink); framer.flush(); - verify(sink).deliverFrame( - Bytes.asList(new byte[] {0, 0, 0, 0, 1, 3, 0, 0, 0, 0, 1, 14}), false); + verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 1, 3, 0, 0, 0, 0, 1, 14}), false); verifyNoMoreInteractions(sink); } @@ -101,26 +99,25 @@ public class MessageFramerTest { writePayload(framer, new byte[] {3, 14, 1, 5, 9, 2, 6}); verifyNoMoreInteractions(sink); framer.close(); - verify(sink).deliverFrame(Bytes.asList(new byte[] {0, 0, 0, 0, 7, 3, 14, 1, 5, 9, 2, 6}), true); + verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 7, 3, 14, 1, 5, 9, 2, 6}), true); verifyNoMoreInteractions(sink); } @Test public void closeWithoutBufferedFrameGivesEmptySink() { framer.close(); - verify(sink).deliverFrame(Bytes.asList(), true); + verify(sink).deliverFrame(new ByteWritableBuffer(0), true); verifyNoMoreInteractions(sink); } @Test public void payloadSplitBetweenSinks() { writePayload(framer, new byte[] {3, 14, 1, 5, 9, 2, 6, 5}); - verify(sink).deliverFrame( - Bytes.asList(new byte[] {0, 0, 0, 0, 8, 3, 14, 1, 5, 9, 2, 6}), false); + verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 8, 3, 14, 1, 5, 9, 2, 6}), false); verifyNoMoreInteractions(sink); framer.flush(); - verify(sink).deliverFrame(Bytes.asList(new byte[] {5}), false); + verify(sink).deliverFrame(toWriteBuffer(new byte[] {5}), false); verifyNoMoreInteractions(sink); } @@ -129,11 +126,11 @@ public class MessageFramerTest { writePayload(framer, new byte[] {3, 14, 1}); writePayload(framer, new byte[] {3}); verify(sink).deliverFrame( - Bytes.asList(new byte[] {0, 0, 0, 0, 3, 3, 14, 1, 0, 0, 0, 0}), false); + toWriteBuffer(new byte[] {0, 0, 0, 0, 3, 3, 14, 1, 0, 0, 0, 0}), false); verifyNoMoreInteractions(sink); framer.flush(); - verify(sink).deliverFrame(Bytes.asList(new byte[] {1, 3}), false); + verify(sink).deliverFrame(toWriteBuffer(new byte[] {1, 3}), false); verifyNoMoreInteractions(sink); } @@ -141,7 +138,7 @@ public class MessageFramerTest { public void emptyPayloadYieldsFrame() throws Exception { writePayload(framer, new byte[0]); framer.flush(); - verify(sink).deliverFrame(Bytes.asList(new byte[] {0, 0, 0, 0, 0}), false); + verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 0}), false); } @Test @@ -149,60 +146,118 @@ public class MessageFramerTest { writePayload(framer, new byte[] {3, 14}); framer.flush(); framer.flush(); - verify(sink).deliverFrame(Bytes.asList(new byte[] {0, 0, 0, 0, 2, 3, 14}), false); + verify(sink).deliverFrame(toWriteBuffer(new byte[] {0, 0, 0, 0, 2, 3, 14}), false); verifyNoMoreInteractions(sink); } @Test public void largerFrameSize() throws Exception { final int transportFrameSize = 10000; - MessageFramer framer = new MessageFramer(copyingSink, transportFrameSize); + MessageFramer framer = new MessageFramer(sink, allocator, transportFrameSize); writePayload(framer, new byte[1000]); framer.flush(); verify(sink).deliverFrame(frameCaptor.capture(), eq(false)); - List buffer = frameCaptor.getValue(); + ByteWritableBuffer buffer = frameCaptor.getValue(); assertEquals(1005, buffer.size()); - assertEquals(Bytes.asList(new byte[] {0, 0, 0, 3, (byte) 232}), buffer.subList(0, 5)); + + byte data[] = new byte[1005]; + data[3] = 3; + data[4] = (byte) 232; + + assertEquals(toWriteBuffer(data, transportFrameSize), buffer); verifyNoMoreInteractions(sink); } @Test public void compressed() throws Exception { final int transportFrameSize = 100; - MessageFramer framer = new MessageFramer(copyingSink, transportFrameSize, - MessageFramer.Compression.GZIP); + MessageFramer framer = + new MessageFramer(sink, allocator, transportFrameSize, Compression.GZIP); writePayload(framer, new byte[1000]); framer.flush(); verify(sink).deliverFrame(frameCaptor.capture(), eq(false)); - List buffer = frameCaptor.getValue(); + ByteWritableBuffer buffer = frameCaptor.getValue(); // It should have compressed very well. assertTrue(buffer.size() < 100); // We purposefully don't check the last byte of length, since that depends on how exactly it // compressed. - assertEquals(Bytes.asList(new byte[] {1, 0, 0, 0}), buffer.subList(0, 4)); - verifyNoMoreInteractions(sink); + assertEquals(1, buffer.data[0]); + assertEquals(0, buffer.data[1]); + assertEquals(0, buffer.data[2]); + assertEquals(0, buffer.data[3]); + } + + private static WritableBuffer toWriteBuffer(byte[] data) { + return toWriteBuffer(data, TRANSPORT_FRAME_SIZE); + } + + private static WritableBuffer toWriteBuffer(byte[] data, int maxFrameSize) { + ByteWritableBuffer buffer = new ByteWritableBuffer(maxFrameSize); + buffer.write(data, 0, data.length); + return buffer; } private static void writePayload(MessageFramer framer, byte[] bytes) { framer.writePayload(new ByteArrayInputStream(bytes), bytes.length); } - /** - * Since ByteBuffers are reused, this sink copies their value at the time of the call. Converting - * to List is convenience. - */ - private static class ByteArrayConverterSink implements MessageFramer.Sink { - private final MessageFramer.Sink> delegate; + static class ByteWritableBuffer implements WritableBuffer { + byte[] data; + private int writeIdx; - public ByteArrayConverterSink(MessageFramer.Sink> delegate) { - this.delegate = delegate; + ByteWritableBuffer(int maxFrameSize) { + data = new byte[maxFrameSize]; } @Override - public void deliverFrame(ByteBuffer frame, boolean endOfStream) { - byte[] frameBytes = new byte[frame.remaining()]; - frame.get(frameBytes); - delegate.deliverFrame(Bytes.asList(frameBytes), endOfStream); + public void write(byte[] bytes, int srcIndex, int length) { + System.arraycopy(bytes, srcIndex, data, writeIdx, length); + writeIdx += length; + } + + @Override + public int writableBytes() { + return data.length - writeIdx; + } + + @Override + public int readableBytes() { + return writeIdx; + } + + @Override + public void release() { + data = null; + } + + int size() { + return writeIdx; + } + + @Override + public boolean equals(Object buffer) { + if (!(buffer instanceof ByteWritableBuffer)) { + return false; + } + + ByteWritableBuffer other = (ByteWritableBuffer) buffer; + + return writableBytes() == other.writableBytes() && + readableBytes() == other.readableBytes() && + Arrays.equals(data, other.data); + } + + @Override + public int hashCode() { + return Arrays.hashCode(data) + writableBytes() + readableBytes(); + } + } + + static class BytesWritableBufferAllocator implements WritableBufferAllocator { + + @Override + public WritableBuffer allocate(int maxCapacity) { + return new ByteWritableBuffer(maxCapacity); } } } diff --git a/core/src/test/java/io/grpc/transport/BufferTestBase.java b/core/src/test/java/io/grpc/transport/ReadableBufferTestBase.java similarity index 90% rename from core/src/test/java/io/grpc/transport/BufferTestBase.java rename to core/src/test/java/io/grpc/transport/ReadableBufferTestBase.java index 269583963c..5e72d5f243 100644 --- a/core/src/test/java/io/grpc/transport/BufferTestBase.java +++ b/core/src/test/java/io/grpc/transport/ReadableBufferTestBase.java @@ -43,15 +43,15 @@ import java.nio.ByteBuffer; import java.util.Arrays; /** - * Abstract base class for tests of {@link Buffer} subclasses. + * Abstract base class for tests of {@link ReadableBuffer} subclasses. */ @RunWith(JUnit4.class) -public abstract class BufferTestBase { +public abstract class ReadableBufferTestBase { protected final String msg = "hello"; @Test public void bufferShouldReadAllBytes() { - Buffer buffer = buffer(); + ReadableBuffer buffer = buffer(); for (int ix = 0; ix < msg.length(); ++ix) { assertEquals(msg.length() - ix, buffer.readableBytes()); assertEquals(msg.charAt(ix), buffer.readUnsignedByte()); @@ -61,7 +61,7 @@ public abstract class BufferTestBase { @Test public void readToArrayShouldSucceed() { - Buffer buffer = buffer(); + ReadableBuffer buffer = buffer(); byte[] array = new byte[msg.length()]; buffer.readBytes(array, 0, array.length); Arrays.equals(msg.getBytes(UTF_8), array); @@ -70,7 +70,7 @@ public abstract class BufferTestBase { @Test public void partialReadToArrayShouldSucceed() { - Buffer buffer = buffer(); + ReadableBuffer buffer = buffer(); byte[] array = new byte[msg.length()]; buffer.readBytes(array, 1, 2); Arrays.equals(new byte[] {'h', 'e'}, Arrays.copyOfRange(array, 1, 3)); @@ -79,7 +79,7 @@ public abstract class BufferTestBase { @Test public void readToStreamShouldSucceed() throws Exception { - Buffer buffer = buffer(); + ReadableBuffer buffer = buffer(); ByteArrayOutputStream stream = new ByteArrayOutputStream(); buffer.readBytes(stream, msg.length()); Arrays.equals(msg.getBytes(UTF_8), stream.toByteArray()); @@ -88,7 +88,7 @@ public abstract class BufferTestBase { @Test public void partialReadToStreamShouldSucceed() throws Exception { - Buffer buffer = buffer(); + ReadableBuffer buffer = buffer(); ByteArrayOutputStream stream = new ByteArrayOutputStream(); buffer.readBytes(stream, 2); Arrays.equals(new byte[]{'h', 'e'}, Arrays.copyOfRange(stream.toByteArray(), 0, 2)); @@ -97,7 +97,7 @@ public abstract class BufferTestBase { @Test public void readToByteBufferShouldSucceed() { - Buffer buffer = buffer(); + ReadableBuffer buffer = buffer(); ByteBuffer byteBuffer = ByteBuffer.allocate(msg.length()); buffer.readBytes(byteBuffer); byteBuffer.flip(); @@ -109,7 +109,7 @@ public abstract class BufferTestBase { @Test public void partialReadToByteBufferShouldSucceed() { - Buffer buffer = buffer(); + ReadableBuffer buffer = buffer(); ByteBuffer byteBuffer = ByteBuffer.allocate(2); buffer.readBytes(byteBuffer); byteBuffer.flip(); @@ -119,5 +119,5 @@ public abstract class BufferTestBase { assertEquals(3, buffer.readableBytes()); } - protected abstract Buffer buffer(); + protected abstract ReadableBuffer buffer(); } diff --git a/core/src/test/java/io/grpc/transport/BuffersArrayTest.java b/core/src/test/java/io/grpc/transport/ReadableBuffersArrayTest.java similarity index 84% rename from core/src/test/java/io/grpc/transport/BuffersArrayTest.java rename to core/src/test/java/io/grpc/transport/ReadableBuffersArrayTest.java index 12fd8597e2..1008796e7b 100644 --- a/core/src/test/java/io/grpc/transport/BuffersArrayTest.java +++ b/core/src/test/java/io/grpc/transport/ReadableBuffersArrayTest.java @@ -32,7 +32,7 @@ package io.grpc.transport; import static com.google.common.base.Charsets.UTF_8; -import static io.grpc.transport.Buffers.wrap; +import static io.grpc.transport.ReadableBuffers.wrap; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; @@ -40,14 +40,14 @@ import static org.junit.Assert.assertTrue; import org.junit.Test; /** - * Tests for the array-backed {@link Buffer} returned by {@link Buffers#wrap(byte[], int, int)}; + * Tests for the array-backed {@link ReadableBuffer} returned by {@link ReadableBuffers#wrap(byte[], int, int)}; */ -public class BuffersArrayTest extends BufferTestBase { +public class ReadableBuffersArrayTest extends ReadableBufferTestBase { @Test public void bufferShouldExposeArray() { byte[] array = msg.getBytes(UTF_8); - Buffer buffer = wrap(array, 1, msg.length() - 1); + ReadableBuffer buffer = wrap(array, 1, msg.length() - 1); assertTrue(buffer.hasArray()); assertSame(array, buffer.array()); assertEquals(1, buffer.arrayOffset()); @@ -58,7 +58,7 @@ public class BuffersArrayTest extends BufferTestBase { } @Override - protected Buffer buffer() { - return Buffers.wrap(msg.getBytes(UTF_8), 0, msg.length()); + protected ReadableBuffer buffer() { + return ReadableBuffers.wrap(msg.getBytes(UTF_8), 0, msg.length()); } } diff --git a/core/src/test/java/io/grpc/transport/BuffersByteBufferTest.java b/core/src/test/java/io/grpc/transport/ReadableBuffersByteBufferTest.java similarity index 85% rename from core/src/test/java/io/grpc/transport/BuffersByteBufferTest.java rename to core/src/test/java/io/grpc/transport/ReadableBuffersByteBufferTest.java index 9486c0f574..c753147508 100644 --- a/core/src/test/java/io/grpc/transport/BuffersByteBufferTest.java +++ b/core/src/test/java/io/grpc/transport/ReadableBuffersByteBufferTest.java @@ -36,12 +36,12 @@ import static com.google.common.base.Charsets.UTF_8; import java.nio.ByteBuffer; /** - * Tests for the array-backed {@link Buffer} returned by {@link Buffers#wrap(ByteBuffer)}. + * Tests for the array-backed {@link ReadableBuffer} returned by {@link ReadableBuffers#wrap(ByteBuffer)}. */ -public class BuffersByteBufferTest extends BufferTestBase { +public class ReadableBuffersByteBufferTest extends ReadableBufferTestBase { @Override - protected Buffer buffer() { - return Buffers.wrap(ByteBuffer.wrap(msg.getBytes(UTF_8))); + protected ReadableBuffer buffer() { + return ReadableBuffers.wrap(ByteBuffer.wrap(msg.getBytes(UTF_8))); } } diff --git a/core/src/test/java/io/grpc/transport/WritableBufferAllocatorTestBase.java b/core/src/test/java/io/grpc/transport/WritableBufferAllocatorTestBase.java new file mode 100644 index 0000000000..59f27cecad --- /dev/null +++ b/core/src/test/java/io/grpc/transport/WritableBufferAllocatorTestBase.java @@ -0,0 +1,66 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.transport; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Abstract base class for tests of {@link WritableBufferAllocator} subclasses. + */ +@RunWith(JUnit4.class) +public abstract class WritableBufferAllocatorTestBase { + + protected abstract WritableBufferAllocator allocator(); + + @Test + public void testBuffersAreDifferent() { + WritableBuffer buffer1 = allocator().allocate(100); + WritableBuffer buffer2 = allocator().allocate(100); + + assertNotSame(buffer1, buffer2); + + buffer1.release(); + buffer2.release(); + } + + @Test + public void testCapacity() { + WritableBuffer buffer = allocator().allocate(4096); + assertEquals(0, buffer.readableBytes()); + assertEquals(4096, buffer.writableBytes()); + } +} diff --git a/core/src/test/java/io/grpc/transport/WritableBufferTestBase.java b/core/src/test/java/io/grpc/transport/WritableBufferTestBase.java new file mode 100644 index 0000000000..4e433dbf7e --- /dev/null +++ b/core/src/test/java/io/grpc/transport/WritableBufferTestBase.java @@ -0,0 +1,119 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.transport; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Abstract base class for tests of {@link WritableBuffer} subclasses. + */ +@RunWith(JUnit4.class) +public abstract class WritableBufferTestBase { + + /** + * Returns a new buffer for every test case with + * at least 100 byte of capacity. + */ + protected abstract WritableBuffer buffer(); + + /** + * Bytes written to {@link #buffer()}. + */ + protected abstract byte[] writtenBytes(); + + @Test(expected = RuntimeException.class) + public void testWriteNegativeLength() { + buffer().write(new byte[1], 0, -1); + } + + @Test(expected = IndexOutOfBoundsException.class) + public void testWriteNegativeSrcIndex() { + buffer().write(new byte[1], -1, 0); + } + + @Test(expected = IndexOutOfBoundsException.class) + public void testWriteSrcIndexAndLengthExceedSrcLength() { + buffer().write(new byte[10], 1, 10); + } + + @Test(expected = IndexOutOfBoundsException.class) + public void testWriteSrcIndexAndLengthExceedWritableBytes() { + buffer().write(new byte[buffer().writableBytes()], 1, buffer().writableBytes()); + } + + @Test + public void testWritableAndReadableBytes() { + int before = buffer().writableBytes(); + buffer().write(new byte[10], 0, 5); + + assertEquals(5, before - buffer().writableBytes()); + assertEquals(5, buffer().readableBytes()); + } + + @Test + public void testWriteSrcIndex() { + byte b[] = new byte[10]; + for (byte i = 5; i < 10; i++) { + b[i] = i; + } + + buffer().write(b, 5, 5); + + assertEquals(5, buffer().readableBytes()); + byte writtenBytes[] = writtenBytes(); + assertEquals(5, writtenBytes.length); + for (int i = 0; i < writtenBytes.length; i++) { + assertEquals(5+i, writtenBytes[i]); + } + } + + @Test + public void testMultipleWrites() { + byte[] b = new byte[100]; + for (byte i = 0; i < b.length; i++) { + b[i] = i; + } + + // Write in chunks of 10 bytes + for (int i = 0; i < 10; i++) { + buffer().write(b, 10 * i, 10); + assertEquals(10 * (i + 1), buffer().readableBytes()); + } + + assertArrayEquals(b, writtenBytes()); + } +} diff --git a/netty/src/main/java/io/grpc/transport/netty/NettyClientStream.java b/netty/src/main/java/io/grpc/transport/netty/NettyClientStream.java index 522a8e0a48..f63439b633 100644 --- a/netty/src/main/java/io/grpc/transport/netty/NettyClientStream.java +++ b/netty/src/main/java/io/grpc/transport/netty/NettyClientStream.java @@ -35,12 +35,11 @@ import static com.google.common.base.Preconditions.checkNotNull; import io.grpc.transport.ClientStreamListener; import io.grpc.transport.Http2ClientStream; +import io.grpc.transport.WritableBuffer; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.handler.codec.http2.Http2Headers; -import java.nio.ByteBuffer; - /** * Client stream for a Netty transport. */ @@ -50,7 +49,7 @@ class NettyClientStream extends Http2ClientStream { private final NettyClientHandler handler; NettyClientStream(ClientStreamListener listener, Channel channel, NettyClientHandler handler) { - super(listener); + super(new NettyWritableBufferAllocator(channel.alloc()), listener); this.channel = checkNotNull(channel, "channel"); this.handler = checkNotNull(handler, "handler"); } @@ -74,7 +73,7 @@ class NettyClientStream extends Http2ClientStream { } void transportDataReceived(ByteBuf frame, boolean endOfStream) { - transportDataReceived(new NettyBuffer(frame.retain()), endOfStream); + transportDataReceived(new NettyReadableBuffer(frame.retain()), endOfStream); } @Override @@ -84,11 +83,9 @@ class NettyClientStream extends Http2ClientStream { } @Override - protected void sendFrame(ByteBuffer frame, boolean endOfStream) { - SendGrpcFrameCommand cmd = - new SendGrpcFrameCommand(this, Utils.toByteBuf(channel.alloc(), frame), endOfStream); - - channel.writeAndFlush(cmd); + protected void sendFrame(WritableBuffer frame, boolean endOfStream) { + ByteBuf bytebuf = ((NettyWritableBuffer) frame).bytebuf(); + channel.writeAndFlush(new SendGrpcFrameCommand(this, bytebuf, endOfStream)); } @Override diff --git a/netty/src/main/java/io/grpc/transport/netty/NettyBuffer.java b/netty/src/main/java/io/grpc/transport/netty/NettyReadableBuffer.java similarity index 92% rename from netty/src/main/java/io/grpc/transport/netty/NettyBuffer.java rename to netty/src/main/java/io/grpc/transport/netty/NettyReadableBuffer.java index c6a165dfc7..9e4275922e 100644 --- a/netty/src/main/java/io/grpc/transport/netty/NettyBuffer.java +++ b/netty/src/main/java/io/grpc/transport/netty/NettyReadableBuffer.java @@ -34,7 +34,7 @@ package io.grpc.transport.netty; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; -import io.grpc.transport.AbstractBuffer; +import io.grpc.transport.AbstractReadableBuffer; import io.netty.buffer.ByteBuf; import java.io.IOException; @@ -46,11 +46,11 @@ import java.nio.ByteBuffer; * call {@link ByteBuf#retain}, so if that is needed it should be called prior to creating this * buffer. */ -class NettyBuffer extends AbstractBuffer { +class NettyReadableBuffer extends AbstractReadableBuffer { private final ByteBuf buffer; private boolean closed; - NettyBuffer(ByteBuf buffer) { + NettyReadableBuffer(ByteBuf buffer) { this.buffer = Preconditions.checkNotNull(buffer, "buffer"); } @@ -93,9 +93,9 @@ class NettyBuffer extends AbstractBuffer { } @Override - public NettyBuffer readBytes(int length) { + public NettyReadableBuffer readBytes(int length) { // The ByteBuf returned by readSlice() stores a reference to buffer but does not call retain(). - return new NettyBuffer(buffer.readSlice(length).retain()); + return new NettyReadableBuffer(buffer.readSlice(length).retain()); } @Override diff --git a/netty/src/main/java/io/grpc/transport/netty/NettyServerStream.java b/netty/src/main/java/io/grpc/transport/netty/NettyServerStream.java index f632023f1b..c2e823718e 100644 --- a/netty/src/main/java/io/grpc/transport/netty/NettyServerStream.java +++ b/netty/src/main/java/io/grpc/transport/netty/NettyServerStream.java @@ -35,12 +35,11 @@ import static com.google.common.base.Preconditions.checkNotNull; import io.grpc.Metadata; import io.grpc.transport.AbstractServerStream; +import io.grpc.transport.WritableBuffer; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.handler.codec.http2.Http2Headers; -import java.nio.ByteBuffer; - /** * Server stream for a Netty HTTP2 transport */ @@ -50,13 +49,13 @@ class NettyServerStream extends AbstractServerStream { private final NettyServerHandler handler; NettyServerStream(Channel channel, int id, NettyServerHandler handler) { - super(id); + super(new NettyWritableBufferAllocator(channel.alloc()), id); this.channel = checkNotNull(channel, "channel"); this.handler = checkNotNull(handler, "handler"); } void inboundDataReceived(ByteBuf frame, boolean endOfStream) { - super.inboundDataReceived(new NettyBuffer(frame.retain()), endOfStream); + super.inboundDataReceived(new NettyReadableBuffer(frame.retain()), endOfStream); } @Override @@ -81,10 +80,9 @@ class NettyServerStream extends AbstractServerStream { } @Override - protected void sendFrame(ByteBuffer frame, boolean endOfStream) { - SendGrpcFrameCommand cmd = - new SendGrpcFrameCommand(this, Utils.toByteBuf(channel.alloc(), frame), endOfStream); - channel.writeAndFlush(cmd); + protected void sendFrame(WritableBuffer frame, boolean endOfStream) { + ByteBuf bytebuf = ((NettyWritableBuffer) frame).bytebuf(); + channel.writeAndFlush(new SendGrpcFrameCommand(this, bytebuf, endOfStream)); } @Override diff --git a/netty/src/main/java/io/grpc/transport/netty/NettyWritableBuffer.java b/netty/src/main/java/io/grpc/transport/netty/NettyWritableBuffer.java new file mode 100644 index 0000000000..e01c4b8557 --- /dev/null +++ b/netty/src/main/java/io/grpc/transport/netty/NettyWritableBuffer.java @@ -0,0 +1,71 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.transport.netty; + +import io.grpc.transport.WritableBuffer; +import io.netty.buffer.ByteBuf; + +/** + * The {@link WritableBuffer} used by the Netty transport. + */ +class NettyWritableBuffer implements WritableBuffer { + + private final ByteBuf bytebuf; + + NettyWritableBuffer(ByteBuf bytebuf) { + this.bytebuf = bytebuf; + } + + @Override + public void write(byte[] src, int srcIndex, int length) { + bytebuf.writeBytes(src, srcIndex, length); + } + + @Override + public int writableBytes() { + return bytebuf.writableBytes(); + } + + @Override + public int readableBytes() { + return bytebuf.readableBytes(); + } + + @Override + public void release() { + bytebuf.release(); + } + + ByteBuf bytebuf() { + return bytebuf; + } +} diff --git a/netty/src/main/java/io/grpc/transport/netty/NettyWritableBufferAllocator.java b/netty/src/main/java/io/grpc/transport/netty/NettyWritableBufferAllocator.java new file mode 100644 index 0000000000..8ffcfb0eb7 --- /dev/null +++ b/netty/src/main/java/io/grpc/transport/netty/NettyWritableBufferAllocator.java @@ -0,0 +1,52 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.transport.netty; + +import io.grpc.transport.WritableBufferAllocator; +import io.netty.buffer.ByteBufAllocator; + +/** + * The default allocator for {@link NettyWritableBuffer}s used by the Netty transport. + */ +class NettyWritableBufferAllocator implements WritableBufferAllocator { + + private final ByteBufAllocator allocator; + + NettyWritableBufferAllocator(ByteBufAllocator allocator) { + this.allocator = allocator; + } + + @Override + public NettyWritableBuffer allocate(int capacity) { + return new NettyWritableBuffer(allocator.buffer(capacity, capacity)); + } +} diff --git a/netty/src/test/java/io/grpc/transport/netty/NettyBufferTest.java b/netty/src/test/java/io/grpc/transport/netty/NettyReadableBufferTest.java similarity index 86% rename from netty/src/test/java/io/grpc/transport/netty/NettyBufferTest.java rename to netty/src/test/java/io/grpc/transport/netty/NettyReadableBufferTest.java index 1d0f04e5cf..fbdf2cdd77 100644 --- a/netty/src/test/java/io/grpc/transport/netty/NettyBufferTest.java +++ b/netty/src/test/java/io/grpc/transport/netty/NettyReadableBufferTest.java @@ -34,8 +34,8 @@ package io.grpc.transport.netty; import static com.google.common.base.Charsets.UTF_8; import static org.junit.Assert.assertEquals; -import io.grpc.transport.Buffer; -import io.grpc.transport.BufferTestBase; +import io.grpc.transport.ReadableBuffer; +import io.grpc.transport.ReadableBufferTestBase; import io.netty.buffer.Unpooled; import org.junit.Before; @@ -44,15 +44,15 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; /** - * Tests for {@link NettyBuffer}. + * Tests for {@link NettyReadableBuffer}. */ @RunWith(JUnit4.class) -public class NettyBufferTest extends BufferTestBase { - private NettyBuffer buffer; +public class NettyReadableBufferTest extends ReadableBufferTestBase { + private NettyReadableBuffer buffer; @Before public void setup() { - buffer = new NettyBuffer(Unpooled.copiedBuffer(msg, UTF_8)); + buffer = new NettyReadableBuffer(Unpooled.copiedBuffer(msg, UTF_8)); } @Test @@ -69,7 +69,7 @@ public class NettyBufferTest extends BufferTestBase { } @Override - protected Buffer buffer() { + protected ReadableBuffer buffer() { return buffer; } } diff --git a/netty/src/test/java/io/grpc/transport/netty/NettyServerHandlerTest.java b/netty/src/test/java/io/grpc/transport/netty/NettyServerHandlerTest.java index e6ca745122..d8d8ca98ac 100644 --- a/netty/src/test/java/io/grpc/transport/netty/NettyServerHandlerTest.java +++ b/netty/src/test/java/io/grpc/transport/netty/NettyServerHandlerTest.java @@ -58,7 +58,9 @@ import io.grpc.transport.MessageFramer; import io.grpc.transport.ServerStream; import io.grpc.transport.ServerStreamListener; import io.grpc.transport.ServerTransportListener; +import io.grpc.transport.WritableBuffer; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled; import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.channel.ChannelHandlerContext; @@ -86,7 +88,6 @@ import org.mockito.MockitoAnnotations; import java.io.ByteArrayInputStream; import java.io.InputStream; -import java.nio.ByteBuffer; /** Unit tests for {@link NettyServerHandler}. */ @RunWith(JUnit4.class) @@ -278,12 +279,13 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase { private ByteBuf dataFrame(int streamId, boolean endStream) { final ByteBuf compressionFrame = Unpooled.buffer(CONTENT.length); - MessageFramer framer = new MessageFramer(new MessageFramer.Sink() { + MessageFramer framer = new MessageFramer(new MessageFramer.Sink() { @Override - public void deliverFrame(ByteBuffer frame, boolean endOfStream) { - compressionFrame.writeBytes(frame); + public void deliverFrame(WritableBuffer frame, boolean endOfStream) { + ByteBuf bytebuf = ((NettyWritableBuffer)frame).bytebuf(); + compressionFrame.writeBytes(bytebuf); } - }, 1000); + }, new NettyWritableBufferAllocator(ByteBufAllocator.DEFAULT), 1000); framer.writePayload(new ByteArrayInputStream(CONTENT), CONTENT.length); framer.flush(); if (endStream) { diff --git a/netty/src/test/java/io/grpc/transport/netty/NettyTestUtil.java b/netty/src/test/java/io/grpc/transport/netty/NettyTestUtil.java index 46d5cb1de4..7c6c64bfff 100644 --- a/netty/src/test/java/io/grpc/transport/netty/NettyTestUtil.java +++ b/netty/src/test/java/io/grpc/transport/netty/NettyTestUtil.java @@ -35,7 +35,6 @@ import static io.netty.util.CharsetUtil.UTF_8; import com.google.common.io.ByteStreams; -import io.grpc.Status; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; diff --git a/netty/src/test/java/io/grpc/transport/netty/NettyWritableBufferAllocatorTest.java b/netty/src/test/java/io/grpc/transport/netty/NettyWritableBufferAllocatorTest.java new file mode 100644 index 0000000000..f1a1c5392c --- /dev/null +++ b/netty/src/test/java/io/grpc/transport/netty/NettyWritableBufferAllocatorTest.java @@ -0,0 +1,53 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.transport.netty; + +import io.grpc.transport.WritableBufferAllocator; +import io.grpc.transport.WritableBufferAllocatorTestBase; +import io.netty.buffer.ByteBufAllocator; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link NettyWritableBufferAllocator}. + */ +@RunWith(JUnit4.class) +public class NettyWritableBufferAllocatorTest extends WritableBufferAllocatorTestBase { + + private final NettyWritableBufferAllocator allocator = + new NettyWritableBufferAllocator(ByteBufAllocator.DEFAULT); + + @Override + protected WritableBufferAllocator allocator() { + return allocator; + } +} diff --git a/netty/src/test/java/io/grpc/transport/netty/NettyWritableBufferTest.java b/netty/src/test/java/io/grpc/transport/netty/NettyWritableBufferTest.java new file mode 100644 index 0000000000..41cdd18f18 --- /dev/null +++ b/netty/src/test/java/io/grpc/transport/netty/NettyWritableBufferTest.java @@ -0,0 +1,73 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.transport.netty; + +import io.grpc.transport.WritableBuffer; +import io.grpc.transport.WritableBufferTestBase; +import io.netty.buffer.Unpooled; +import org.junit.After; +import org.junit.Before; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.Arrays; + +/** + * Tests for {@link NettyWritableBuffer}. + */ +@RunWith(JUnit4.class) +public class NettyWritableBufferTest extends WritableBufferTestBase { + + private NettyWritableBuffer buffer; + + @Before + public void setup() { + buffer = new NettyWritableBuffer(Unpooled.buffer(100)); + } + + @After + public void teardown() { + buffer.release(); + } + + @Override + protected WritableBuffer buffer() { + return buffer; + } + + @Override + protected byte[] writtenBytes() { + byte b[] = buffer.bytebuf().array(); + int fromIdx = buffer.bytebuf().arrayOffset(); + return Arrays.copyOfRange(b, fromIdx, buffer.readableBytes()); + } +} diff --git a/okhttp/build.gradle b/okhttp/build.gradle index b5cb844fa9..28a0f2b4e2 100644 --- a/okhttp/build.gradle +++ b/okhttp/build.gradle @@ -6,6 +6,9 @@ description = "gRPC: OkHttp" dependencies { compile project(':grpc-core'), libraries.okhttp + + // Tests depend on base class defined by core module. + testCompile project(':grpc-core').sourceSets.test.output } // Configure the animal sniffer plugin diff --git a/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpClientStream.java b/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpClientStream.java index 46fe122bc9..f22cc32b0f 100644 --- a/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpClientStream.java +++ b/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpClientStream.java @@ -40,8 +40,9 @@ import io.grpc.Metadata; import io.grpc.Status; import io.grpc.transport.ClientStreamListener; import io.grpc.transport.Http2ClientStream; +import io.grpc.transport.WritableBuffer; +import okio.Buffer; -import java.nio.ByteBuffer; import java.util.List; import javax.annotation.concurrent.GuardedBy; @@ -58,9 +59,9 @@ class OkHttpClientStream extends Http2ClientStream { * Construct a new client stream. */ static OkHttpClientStream newStream(ClientStreamListener listener, - AsyncFrameWriter frameWriter, - OkHttpClientTransport transport, - OutboundFlowController outboundFlow) { + AsyncFrameWriter frameWriter, + OkHttpClientTransport transport, + OutboundFlowController outboundFlow) { return new OkHttpClientStream(listener, frameWriter, transport, outboundFlow); } @@ -75,10 +76,10 @@ class OkHttpClientStream extends Http2ClientStream { private Object outboundFlowState; private OkHttpClientStream(ClientStreamListener listener, - AsyncFrameWriter frameWriter, - OkHttpClientTransport transport, - OutboundFlowController outboundFlow) { - super(listener); + AsyncFrameWriter frameWriter, + OkHttpClientTransport transport, + OutboundFlowController outboundFlow) { + super(new OkHttpWritableBufferAllocator(), listener); this.frameWriter = frameWriter; this.transport = transport; this.outboundFlow = outboundFlow; @@ -109,17 +110,14 @@ class OkHttpClientStream extends Http2ClientStream { synchronized (lock) { long length = frame.size(); window -= length; - super.transportDataReceived(new OkHttpBuffer(frame), endOfStream); + super.transportDataReceived(new OkHttpReadableBuffer(frame), endOfStream); } } @Override - protected void sendFrame(ByteBuffer frame, boolean endOfStream) { + protected void sendFrame(WritableBuffer frame, boolean endOfStream) { Preconditions.checkState(id() != 0, "streamId should be set"); - okio.Buffer buffer = new okio.Buffer(); - // Read the data into a buffer. - // TODO(madongfly): swap to NIO buffers or zero-copy if/when okhttp/okio supports it - buffer.write(frame.array(), frame.arrayOffset(), frame.remaining()); + Buffer buffer = ((OkHttpWritableBuffer) frame).buffer(); // Write the data to the remote endpoint. // Per http2 SPEC, the max data length should be larger than 64K, while our frame size is // only 4K. diff --git a/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpClientTransport.java index 0adc5e0392..20cbffc351 100644 --- a/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpClientTransport.java @@ -46,7 +46,6 @@ import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.Status; import io.grpc.Status.Code; -import io.grpc.transport.ClientStream; import io.grpc.transport.ClientStreamListener; import io.grpc.transport.ClientTransport; diff --git a/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpBuffer.java b/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpReadableBuffer.java similarity index 87% rename from okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpBuffer.java rename to okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpReadableBuffer.java index 40bfe92d6f..fb372e878b 100644 --- a/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpBuffer.java +++ b/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpReadableBuffer.java @@ -31,8 +31,8 @@ package io.grpc.transport.okhttp; -import io.grpc.transport.AbstractBuffer; -import io.grpc.transport.Buffer; +import io.grpc.transport.AbstractReadableBuffer; +import io.grpc.transport.ReadableBuffer; import java.io.EOFException; import java.io.IOException; @@ -40,12 +40,12 @@ import java.io.OutputStream; import java.nio.ByteBuffer; /** - * A {@link Buffer} implementation that is backed by an {@link okio.Buffer}. + * A {@link io.grpc.transport.ReadableBuffer} implementation that is backed by an {@link okio.Buffer}. */ -class OkHttpBuffer extends AbstractBuffer { +class OkHttpReadableBuffer extends AbstractReadableBuffer { private final okio.Buffer buffer; - OkHttpBuffer(okio.Buffer buffer) { + OkHttpReadableBuffer(okio.Buffer buffer) { this.buffer = buffer; } @@ -85,10 +85,10 @@ class OkHttpBuffer extends AbstractBuffer { } @Override - public Buffer readBytes(int length) { + public ReadableBuffer readBytes(int length) { okio.Buffer buf = new okio.Buffer(); buf.write(buffer, length); - return new OkHttpBuffer(buf); + return new OkHttpReadableBuffer(buf); } @Override diff --git a/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpWritableBuffer.java b/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpWritableBuffer.java new file mode 100644 index 0000000000..0afb89eb64 --- /dev/null +++ b/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpWritableBuffer.java @@ -0,0 +1,72 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.transport.okhttp; + +import io.grpc.transport.WritableBuffer; +import okio.Buffer; + +class OkHttpWritableBuffer implements WritableBuffer { + + private final Buffer buffer; + private int writableBytes; + private int readableBytes; + + OkHttpWritableBuffer(Buffer buffer, int capacity) { + this.buffer = buffer; + writableBytes = capacity; + } + + @Override + public void write(byte[] src, int srcIndex, int length) { + buffer.write(src, srcIndex, length); + writableBytes -= length; + readableBytes += length; + } + + @Override + public int writableBytes() { + return writableBytes; + } + + @Override + public int readableBytes() { + return readableBytes; + } + + @Override + public void release() { + } + + Buffer buffer() { + return buffer; + } +} diff --git a/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpWritableBufferAllocator.java b/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpWritableBufferAllocator.java new file mode 100644 index 0000000000..a31293ba9e --- /dev/null +++ b/okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpWritableBufferAllocator.java @@ -0,0 +1,45 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.transport.okhttp; + +import io.grpc.transport.WritableBufferAllocator; +import okio.Buffer; + +/** + * The default allocator for {@link OkHttpWritableBuffer}s used by the OkHttp transport. + */ +class OkHttpWritableBufferAllocator implements WritableBufferAllocator { + @Override + public OkHttpWritableBuffer allocate(int capacity) { + return new OkHttpWritableBuffer(new Buffer(), capacity); + } +} diff --git a/okhttp/src/test/java/io/grpc/transport/okhttp/OkHttpClientTransportTest.java b/okhttp/src/test/java/io/grpc/transport/okhttp/OkHttpClientTransportTest.java index 79eb873f6e..33ad46fec3 100644 --- a/okhttp/src/test/java/io/grpc/transport/okhttp/OkHttpClientTransportTest.java +++ b/okhttp/src/test/java/io/grpc/transport/okhttp/OkHttpClientTransportTest.java @@ -47,8 +47,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableList; -import com.google.common.util.concurrent.Service; -import com.google.common.util.concurrent.Service.State; import com.squareup.okhttp.internal.spdy.ErrorCode; import com.squareup.okhttp.internal.spdy.FrameReader; diff --git a/okhttp/src/test/java/io/grpc/transport/okhttp/OkHttpWritableBufferAllocatorTest.java b/okhttp/src/test/java/io/grpc/transport/okhttp/OkHttpWritableBufferAllocatorTest.java new file mode 100644 index 0000000000..c94275fd52 --- /dev/null +++ b/okhttp/src/test/java/io/grpc/transport/okhttp/OkHttpWritableBufferAllocatorTest.java @@ -0,0 +1,52 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.transport.okhttp; + +import io.grpc.transport.WritableBufferAllocator; +import io.grpc.transport.WritableBufferAllocatorTestBase; + +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link OkHttpWritableBufferAllocator}. + */ +@RunWith(JUnit4.class) +public class OkHttpWritableBufferAllocatorTest extends WritableBufferAllocatorTestBase { + + private final OkHttpWritableBufferAllocator allocator = new OkHttpWritableBufferAllocator(); + + @Override + protected WritableBufferAllocator allocator() { + return allocator; + } +} diff --git a/okhttp/src/test/java/io/grpc/transport/okhttp/OkHttpWritableBufferTest.java b/okhttp/src/test/java/io/grpc/transport/okhttp/OkHttpWritableBufferTest.java new file mode 100644 index 0000000000..bb1c431793 --- /dev/null +++ b/okhttp/src/test/java/io/grpc/transport/okhttp/OkHttpWritableBufferTest.java @@ -0,0 +1,63 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.transport.okhttp; + +import io.grpc.transport.WritableBuffer; +import io.grpc.transport.WritableBufferTestBase; +import okio.Buffer; +import org.junit.Before; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link OkHttpWritableBuffer}. + */ +@RunWith(JUnit4.class) +public class OkHttpWritableBufferTest extends WritableBufferTestBase { + + private OkHttpWritableBuffer buffer; + + @Before + public void setup() { + buffer = new OkHttpWritableBuffer(new Buffer(), 100); + } + + @Override + protected WritableBuffer buffer() { + return buffer; + } + + @Override + protected byte[] writtenBytes() { + return buffer.buffer().readByteArray(); + } +}