diff --git a/core/src/main/java/com/google/net/stubby/newtransport/Buffers.java b/core/src/main/java/com/google/net/stubby/newtransport/Buffers.java
index 8dec0e34c7..3d744f00ee 100644
--- a/core/src/main/java/com/google/net/stubby/newtransport/Buffers.java
+++ b/core/src/main/java/com/google/net/stubby/newtransport/Buffers.java
@@ -34,6 +34,13 @@ public final class Buffers {
return new ByteStringWrapper(bytes);
}
+ /**
+ * Shortcut for {@code wrap(bytes, 0, bytes.length}.
+ */
+ public static Buffer wrap(byte[] bytes) {
+ return new ByteArrayWrapper(bytes, 0, bytes.length);
+ }
+
/**
* Creates a new {@link Buffer} that is backed by the given byte array.
*
@@ -109,9 +116,27 @@ public final class Buffers {
* Creates a new {@link InputStream} backed by the given buffer. Any read taken on the stream will
* automatically increment the read position of this buffer. Closing the stream, however, does not
* affect the original buffer.
+ *
+ * @param buffer the buffer backing the new {@link InputStream}.
+ * @param owner if {@code true}, the returned stream will close the buffer when closed.
*/
- public static InputStream openStream(Buffer buffer) {
- return new BufferInputStream(buffer);
+ public static InputStream openStream(Buffer buffer, boolean owner) {
+ return new BufferInputStream(owner ? buffer : ignoreClose(buffer));
+ }
+
+ /**
+ * Decorates the given {@link Buffer} to ignore calls to {@link Buffer#close}.
+ *
+ * @param buffer the buffer to be decorated.
+ * @return a wrapper around {@code buffer} that ignores calls to {@link Buffer#close}.
+ */
+ public static Buffer ignoreClose(Buffer buffer) {
+ return new ForwardingBuffer(buffer) {
+ @Override
+ public void close() {
+ // Ignore.
+ }
+ };
}
/**
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/CompositeBuffer.java b/core/src/main/java/com/google/net/stubby/newtransport/CompositeBuffer.java
new file mode 100644
index 0000000000..91398ac7bb
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/newtransport/CompositeBuffer.java
@@ -0,0 +1,181 @@
+package com.google.net.stubby.newtransport;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+/**
+ * A {@link Buffer} that is composed of 0 or more {@link Buffer}s. This provides a facade that
+ * allows multiple buffers to be treated as one.
+ *
+ *
When a buffer is added to a composite, it's life cycle is controlled by the composite. Once
+ * the composite has read past the end of a given buffer, that buffer is automatically closed and
+ * removed from the composite.
+ */
+public class CompositeBuffer extends AbstractBuffer {
+
+ private int readableBytes;
+ private final Queue buffers = new ArrayDeque();
+
+ /**
+ * Adds a new {@link Buffer} at the end of the buffer list. After a buffer is added, it is
+ * expected that this {@link CompositeBuffer} has complete ownership. Any attempt to modify the
+ * buffer (i.e. modifying the readable bytes) may result in corruption of the internal state of
+ * this {@link CompositeBuffer}.
+ */
+ public void addBuffer(Buffer buffer) {
+ buffers.add(buffer);
+ readableBytes += buffer.readableBytes();
+ }
+
+ @Override
+ public int readableBytes() {
+ return readableBytes;
+ }
+
+ @Override
+ public int readUnsignedByte() {
+ ReadOperation op = new ReadOperation() {
+ @Override
+ int readInternal(Buffer buffer, int length) {
+ return buffer.readUnsignedByte();
+ }
+ };
+ execute(op, 1);
+ return op.value;
+ }
+
+ @Override
+ public void skipBytes(int length) {
+ execute(new ReadOperation() {
+ @Override
+ public int readInternal(Buffer buffer, int length) {
+ buffer.skipBytes(length);
+ return 0;
+ }
+ }, length);
+ }
+
+ @Override
+ public void readBytes(final byte[] dest, final int destOffset, int length) {
+ execute(new ReadOperation() {
+ int currentOffset = destOffset;
+ @Override
+ public int readInternal(Buffer buffer, int length) {
+ buffer.readBytes(dest, currentOffset, length);
+ currentOffset += length;
+ return 0;
+ }
+ }, length);
+ }
+
+ @Override
+ public void readBytes(final ByteBuffer dest) {
+ execute(new ReadOperation() {
+ @Override
+ public int readInternal(Buffer buffer, int length) {
+ // Change the limit so that only lengthToCopy bytes are available.
+ int prevLimit = dest.limit();
+ dest.limit(dest.position() + length);
+
+ // Write the bytes and restore the original limit.
+ buffer.readBytes(dest);
+ dest.limit(prevLimit);
+ return 0;
+ }
+ }, dest.remaining());
+ }
+
+ @Override
+ public void readBytes(final OutputStream dest, int length) throws IOException {
+ ReadOperation op = new ReadOperation() {
+ @Override
+ public int readInternal(Buffer buffer, int length) throws IOException {
+ buffer.readBytes(dest, length);
+ return 0;
+ }
+ };
+ execute(op, length);
+
+ // If an exception occurred, throw it.
+ if (op.isError()) {
+ throw op.ex;
+ }
+ }
+
+ @Override
+ public void close() {
+ while (!buffers.isEmpty()) {
+ buffers.remove().close();
+ }
+ }
+
+ /**
+ * Executes the given {@link ReadOperation} against the {@link Buffer}s required to satisfy the
+ * requested {@code length}.
+ */
+ private void execute(ReadOperation op, int length) {
+ checkReadable(length);
+
+ for (; length > 0 && !buffers.isEmpty(); advanceBufferIfNecessary()) {
+ Buffer buffer = buffers.peek();
+ int lengthToCopy = Math.min(length, buffer.readableBytes());
+
+ // Perform the read operation for this buffer.
+ op.read(buffer, lengthToCopy);
+ if (op.isError()) {
+ return;
+ }
+
+ length -= lengthToCopy;
+ readableBytes -= lengthToCopy;
+ }
+
+ if (length > 0) {
+ // Should never get here.
+ throw new AssertionError("Failed executing read operation");
+ }
+ }
+
+ /**
+ * If the current buffer is exhausted, removes and closes it.
+ */
+ private void advanceBufferIfNecessary() {
+ Buffer buffer = buffers.peek();
+ if (buffer.readableBytes() == 0) {
+ buffers.remove().close();
+ }
+ }
+
+ /**
+ * A simple read operation to perform on a single {@link Buffer}. All state management for the
+ * buffers is done by {@link CompositeBuffer#execute(ReadOperation, int)}.
+ */
+ private abstract class ReadOperation {
+ /**
+ * Only used by {@link CompositeBuffer#readUnsignedByte()}.
+ */
+ int value;
+
+ /**
+ * Only used by {@link CompositeBuffer#readBytes(OutputStream, int)};
+ */
+ IOException ex;
+
+ final void read(Buffer buffer, int length) {
+ try {
+ value = readInternal(buffer, length);
+ } catch (IOException e) {
+ ex = e;
+ }
+ }
+
+ final boolean isError() {
+ return ex != null;
+ }
+
+ abstract int readInternal(Buffer buffer, int length) throws IOException;
+ }
+}
diff --git a/core/src/main/java/com/google/net/stubby/newtransport/ForwardingBuffer.java b/core/src/main/java/com/google/net/stubby/newtransport/ForwardingBuffer.java
new file mode 100644
index 0000000000..9dbcf0d681
--- /dev/null
+++ b/core/src/main/java/com/google/net/stubby/newtransport/ForwardingBuffer.java
@@ -0,0 +1,84 @@
+package com.google.net.stubby.newtransport;
+
+import com.google.common.base.Preconditions;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * Base class for a wrapper around another {@link Buffer}.
+ */
+public abstract class ForwardingBuffer implements Buffer {
+
+ private final Buffer buf;
+
+ public ForwardingBuffer(Buffer buf) {
+ this.buf = Preconditions.checkNotNull(buf, "buf");
+ }
+
+ @Override
+ public int readableBytes() {
+ return buf.readableBytes();
+ }
+
+ @Override
+ public int readUnsignedByte() {
+ return buf.readUnsignedByte();
+ }
+
+ @Override
+ public int readUnsignedMedium() {
+ return buf.readUnsignedMedium();
+ }
+
+ @Override
+ public int readUnsignedShort() {
+ return buf.readUnsignedShort();
+ }
+
+ @Override
+ public int readInt() {
+ return buf.readInt();
+ }
+
+ @Override
+ public void skipBytes(int length) {
+ buf.skipBytes(length);
+ }
+
+ @Override
+ public void readBytes(byte[] dest, int destOffset, int length) {
+ buf.readBytes(dest, destOffset, length);
+ }
+
+ @Override
+ public void readBytes(ByteBuffer dest) {
+ buf.readBytes(dest);
+ }
+
+ @Override
+ public void readBytes(OutputStream dest, int length) throws IOException {
+ buf.readBytes(dest, length);
+ }
+
+ @Override
+ public boolean hasArray() {
+ return buf.hasArray();
+ }
+
+ @Override
+ public byte[] array() {
+ return buf.array();
+ }
+
+ @Override
+ public int arrayOffset() {
+ return buf.arrayOffset();
+ }
+
+ @Override
+ public void close() {
+ buf.close();
+ }
+}
\ No newline at end of file
diff --git a/core/src/test/java/com/google/net/stubby/newtransport/CompositeBufferTest.java b/core/src/test/java/com/google/net/stubby/newtransport/CompositeBufferTest.java
new file mode 100644
index 0000000000..14bda501cd
--- /dev/null
+++ b/core/src/test/java/com/google/net/stubby/newtransport/CompositeBufferTest.java
@@ -0,0 +1,154 @@
+package com.google.net.stubby.newtransport;
+
+import static io.netty.util.CharsetUtil.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Tests for {@link CompositeBuffer}.
+ */
+@RunWith(JUnit4.class)
+public class CompositeBufferTest {
+ private static final String EXPECTED_VALUE = "hello world";
+
+ private CompositeBuffer composite;
+
+ @Before
+ public void setup() {
+ composite = new CompositeBuffer();
+ splitAndAdd(EXPECTED_VALUE);
+ }
+
+ @After
+ public void teardown() {
+ composite.close();
+ }
+
+ @Test
+ public void singleBufferShouldSucceed() {
+ composite = new CompositeBuffer();
+ composite.addBuffer(Buffers.wrap(EXPECTED_VALUE.getBytes(UTF_8)));
+ assertEquals(EXPECTED_VALUE.length(), composite.readableBytes());
+ assertEquals(EXPECTED_VALUE, Buffers.readAsStringUtf8(composite));
+ assertEquals(0, composite.readableBytes());
+ }
+
+ @Test
+ public void readUnsignedByteShouldSucceed() {
+ for (int ix = 0; ix < EXPECTED_VALUE.length(); ++ix) {
+ int c = composite.readUnsignedByte();
+ assertEquals(EXPECTED_VALUE.charAt(ix), (char) c);
+ }
+ assertEquals(0, composite.readableBytes());
+ }
+
+ @Test
+ public void skipBytesShouldSucceed() {
+ int remaining = EXPECTED_VALUE.length();
+ composite.skipBytes(1);
+ remaining--;
+ assertEquals(remaining, composite.readableBytes());
+
+ composite.skipBytes(5);
+ remaining -= 5;
+ assertEquals(remaining, composite.readableBytes());
+
+ composite.skipBytes(remaining);
+ assertEquals(0, composite.readableBytes());
+ }
+
+ @Test
+ public void readByteArrayShouldSucceed() {
+ byte[] bytes = new byte[composite.readableBytes()];
+ int writeIndex = 0;
+
+ composite.readBytes(bytes, writeIndex, 1);
+ writeIndex++;
+ assertEquals(EXPECTED_VALUE.length() - writeIndex, composite.readableBytes());
+
+ composite.readBytes(bytes, writeIndex, 5);
+ writeIndex += 5;
+ assertEquals(EXPECTED_VALUE.length() - writeIndex, composite.readableBytes());
+
+ int remaining = composite.readableBytes();
+ composite.readBytes(bytes, writeIndex, remaining);
+ writeIndex += remaining;
+ assertEquals(0, composite.readableBytes());
+ assertEquals(bytes.length, writeIndex);
+ assertEquals(EXPECTED_VALUE, new String(bytes, UTF_8));
+ }
+
+ @Test
+ public void readByteBufferShouldSucceed() {
+ ByteBuffer byteBuffer = ByteBuffer.allocate(EXPECTED_VALUE.length());
+ int remaining = EXPECTED_VALUE.length();
+
+ byteBuffer.limit(1);
+ composite.readBytes(byteBuffer);
+ remaining--;
+ assertEquals(remaining, composite.readableBytes());
+
+ byteBuffer.limit(byteBuffer.limit() + 5);
+ composite.readBytes(byteBuffer);
+ remaining -= 5;
+ assertEquals(remaining, composite.readableBytes());
+
+ byteBuffer.limit(byteBuffer.limit() + remaining);
+ composite.readBytes(byteBuffer);
+ assertEquals(0, composite.readableBytes());
+ assertEquals(EXPECTED_VALUE, new String(byteBuffer.array(), UTF_8));
+ }
+
+ @Test
+ public void readStreamShouldSucceed() throws IOException {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ int remaining = EXPECTED_VALUE.length();
+
+ composite.readBytes(bos, 1);
+ remaining--;
+ assertEquals(remaining, composite.readableBytes());
+
+ composite.readBytes(bos, 5);
+ remaining -= 5;
+ assertEquals(remaining, composite.readableBytes());
+
+ composite.readBytes(bos, remaining);
+ assertEquals(0, composite.readableBytes());
+ assertEquals(EXPECTED_VALUE, new String(bos.toByteArray(), UTF_8));
+ }
+
+ @Test
+ public void closeShouldCloseBuffers() {
+ composite = new CompositeBuffer();
+ Buffer mock1 = mock(Buffer.class);
+ Buffer mock2 = mock(Buffer.class);
+ composite.addBuffer(mock1);
+ composite.addBuffer(mock2);
+
+ composite.close();
+ verify(mock1).close();
+ verify(mock2).close();
+ }
+
+ private void splitAndAdd(String value) {
+ int partLength = Math.max(1, value.length() / 4);
+ for (int startIndex = 0, endIndex = 0; startIndex < value.length(); startIndex = endIndex) {
+ endIndex = Math.min(value.length(), startIndex + partLength);
+ String part = value.substring(startIndex, endIndex);
+ composite.addBuffer(Buffers.wrap(part.getBytes(UTF_8)));
+ }
+
+ assertEquals(value.length(), composite.readableBytes());
+ }
+}