From 88efcaf15c21725c137e2d7bf8ffba0b3897a25a Mon Sep 17 00:00:00 2001 From: nathanmittler Date: Wed, 30 Jul 2014 14:37:38 -0700 Subject: [PATCH] Adding CompositeBuffer and a few other utilties related to Buffers. ------------- Created by MOE: http://code.google.com/p/moe-java MOE_MIGRATED_REVID=72268003 --- .../net/stubby/newtransport/Buffers.java | 29 ++- .../stubby/newtransport/CompositeBuffer.java | 181 ++++++++++++++++++ .../stubby/newtransport/ForwardingBuffer.java | 84 ++++++++ .../newtransport/CompositeBufferTest.java | 154 +++++++++++++++ 4 files changed, 446 insertions(+), 2 deletions(-) create mode 100644 core/src/main/java/com/google/net/stubby/newtransport/CompositeBuffer.java create mode 100644 core/src/main/java/com/google/net/stubby/newtransport/ForwardingBuffer.java create mode 100644 core/src/test/java/com/google/net/stubby/newtransport/CompositeBufferTest.java diff --git a/core/src/main/java/com/google/net/stubby/newtransport/Buffers.java b/core/src/main/java/com/google/net/stubby/newtransport/Buffers.java index 8dec0e34c7..3d744f00ee 100644 --- a/core/src/main/java/com/google/net/stubby/newtransport/Buffers.java +++ b/core/src/main/java/com/google/net/stubby/newtransport/Buffers.java @@ -34,6 +34,13 @@ public final class Buffers { return new ByteStringWrapper(bytes); } + /** + * Shortcut for {@code wrap(bytes, 0, bytes.length}. + */ + public static Buffer wrap(byte[] bytes) { + return new ByteArrayWrapper(bytes, 0, bytes.length); + } + /** * Creates a new {@link Buffer} that is backed by the given byte array. * @@ -109,9 +116,27 @@ public final class Buffers { * Creates a new {@link InputStream} backed by the given buffer. Any read taken on the stream will * automatically increment the read position of this buffer. Closing the stream, however, does not * affect the original buffer. + * + * @param buffer the buffer backing the new {@link InputStream}. + * @param owner if {@code true}, the returned stream will close the buffer when closed. */ - public static InputStream openStream(Buffer buffer) { - return new BufferInputStream(buffer); + public static InputStream openStream(Buffer buffer, boolean owner) { + return new BufferInputStream(owner ? buffer : ignoreClose(buffer)); + } + + /** + * Decorates the given {@link Buffer} to ignore calls to {@link Buffer#close}. + * + * @param buffer the buffer to be decorated. + * @return a wrapper around {@code buffer} that ignores calls to {@link Buffer#close}. + */ + public static Buffer ignoreClose(Buffer buffer) { + return new ForwardingBuffer(buffer) { + @Override + public void close() { + // Ignore. + } + }; } /** diff --git a/core/src/main/java/com/google/net/stubby/newtransport/CompositeBuffer.java b/core/src/main/java/com/google/net/stubby/newtransport/CompositeBuffer.java new file mode 100644 index 0000000000..91398ac7bb --- /dev/null +++ b/core/src/main/java/com/google/net/stubby/newtransport/CompositeBuffer.java @@ -0,0 +1,181 @@ +package com.google.net.stubby.newtransport; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.ArrayDeque; +import java.util.Queue; + +/** + * A {@link Buffer} that is composed of 0 or more {@link Buffer}s. This provides a facade that + * allows multiple buffers to be treated as one. + * + *

When a buffer is added to a composite, it's life cycle is controlled by the composite. Once + * the composite has read past the end of a given buffer, that buffer is automatically closed and + * removed from the composite. + */ +public class CompositeBuffer extends AbstractBuffer { + + private int readableBytes; + private final Queue buffers = new ArrayDeque(); + + /** + * Adds a new {@link Buffer} at the end of the buffer list. After a buffer is added, it is + * expected that this {@link CompositeBuffer} has complete ownership. Any attempt to modify the + * buffer (i.e. modifying the readable bytes) may result in corruption of the internal state of + * this {@link CompositeBuffer}. + */ + public void addBuffer(Buffer buffer) { + buffers.add(buffer); + readableBytes += buffer.readableBytes(); + } + + @Override + public int readableBytes() { + return readableBytes; + } + + @Override + public int readUnsignedByte() { + ReadOperation op = new ReadOperation() { + @Override + int readInternal(Buffer buffer, int length) { + return buffer.readUnsignedByte(); + } + }; + execute(op, 1); + return op.value; + } + + @Override + public void skipBytes(int length) { + execute(new ReadOperation() { + @Override + public int readInternal(Buffer buffer, int length) { + buffer.skipBytes(length); + return 0; + } + }, length); + } + + @Override + public void readBytes(final byte[] dest, final int destOffset, int length) { + execute(new ReadOperation() { + int currentOffset = destOffset; + @Override + public int readInternal(Buffer buffer, int length) { + buffer.readBytes(dest, currentOffset, length); + currentOffset += length; + return 0; + } + }, length); + } + + @Override + public void readBytes(final ByteBuffer dest) { + execute(new ReadOperation() { + @Override + public int readInternal(Buffer buffer, int length) { + // Change the limit so that only lengthToCopy bytes are available. + int prevLimit = dest.limit(); + dest.limit(dest.position() + length); + + // Write the bytes and restore the original limit. + buffer.readBytes(dest); + dest.limit(prevLimit); + return 0; + } + }, dest.remaining()); + } + + @Override + public void readBytes(final OutputStream dest, int length) throws IOException { + ReadOperation op = new ReadOperation() { + @Override + public int readInternal(Buffer buffer, int length) throws IOException { + buffer.readBytes(dest, length); + return 0; + } + }; + execute(op, length); + + // If an exception occurred, throw it. + if (op.isError()) { + throw op.ex; + } + } + + @Override + public void close() { + while (!buffers.isEmpty()) { + buffers.remove().close(); + } + } + + /** + * Executes the given {@link ReadOperation} against the {@link Buffer}s required to satisfy the + * requested {@code length}. + */ + private void execute(ReadOperation op, int length) { + checkReadable(length); + + for (; length > 0 && !buffers.isEmpty(); advanceBufferIfNecessary()) { + Buffer buffer = buffers.peek(); + int lengthToCopy = Math.min(length, buffer.readableBytes()); + + // Perform the read operation for this buffer. + op.read(buffer, lengthToCopy); + if (op.isError()) { + return; + } + + length -= lengthToCopy; + readableBytes -= lengthToCopy; + } + + if (length > 0) { + // Should never get here. + throw new AssertionError("Failed executing read operation"); + } + } + + /** + * If the current buffer is exhausted, removes and closes it. + */ + private void advanceBufferIfNecessary() { + Buffer buffer = buffers.peek(); + if (buffer.readableBytes() == 0) { + buffers.remove().close(); + } + } + + /** + * A simple read operation to perform on a single {@link Buffer}. All state management for the + * buffers is done by {@link CompositeBuffer#execute(ReadOperation, int)}. + */ + private abstract class ReadOperation { + /** + * Only used by {@link CompositeBuffer#readUnsignedByte()}. + */ + int value; + + /** + * Only used by {@link CompositeBuffer#readBytes(OutputStream, int)}; + */ + IOException ex; + + final void read(Buffer buffer, int length) { + try { + value = readInternal(buffer, length); + } catch (IOException e) { + ex = e; + } + } + + final boolean isError() { + return ex != null; + } + + abstract int readInternal(Buffer buffer, int length) throws IOException; + } +} diff --git a/core/src/main/java/com/google/net/stubby/newtransport/ForwardingBuffer.java b/core/src/main/java/com/google/net/stubby/newtransport/ForwardingBuffer.java new file mode 100644 index 0000000000..9dbcf0d681 --- /dev/null +++ b/core/src/main/java/com/google/net/stubby/newtransport/ForwardingBuffer.java @@ -0,0 +1,84 @@ +package com.google.net.stubby.newtransport; + +import com.google.common.base.Preconditions; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +/** + * Base class for a wrapper around another {@link Buffer}. + */ +public abstract class ForwardingBuffer implements Buffer { + + private final Buffer buf; + + public ForwardingBuffer(Buffer buf) { + this.buf = Preconditions.checkNotNull(buf, "buf"); + } + + @Override + public int readableBytes() { + return buf.readableBytes(); + } + + @Override + public int readUnsignedByte() { + return buf.readUnsignedByte(); + } + + @Override + public int readUnsignedMedium() { + return buf.readUnsignedMedium(); + } + + @Override + public int readUnsignedShort() { + return buf.readUnsignedShort(); + } + + @Override + public int readInt() { + return buf.readInt(); + } + + @Override + public void skipBytes(int length) { + buf.skipBytes(length); + } + + @Override + public void readBytes(byte[] dest, int destOffset, int length) { + buf.readBytes(dest, destOffset, length); + } + + @Override + public void readBytes(ByteBuffer dest) { + buf.readBytes(dest); + } + + @Override + public void readBytes(OutputStream dest, int length) throws IOException { + buf.readBytes(dest, length); + } + + @Override + public boolean hasArray() { + return buf.hasArray(); + } + + @Override + public byte[] array() { + return buf.array(); + } + + @Override + public int arrayOffset() { + return buf.arrayOffset(); + } + + @Override + public void close() { + buf.close(); + } +} \ No newline at end of file diff --git a/core/src/test/java/com/google/net/stubby/newtransport/CompositeBufferTest.java b/core/src/test/java/com/google/net/stubby/newtransport/CompositeBufferTest.java new file mode 100644 index 0000000000..14bda501cd --- /dev/null +++ b/core/src/test/java/com/google/net/stubby/newtransport/CompositeBufferTest.java @@ -0,0 +1,154 @@ +package com.google.net.stubby.newtransport; + +import static io.netty.util.CharsetUtil.UTF_8; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * Tests for {@link CompositeBuffer}. + */ +@RunWith(JUnit4.class) +public class CompositeBufferTest { + private static final String EXPECTED_VALUE = "hello world"; + + private CompositeBuffer composite; + + @Before + public void setup() { + composite = new CompositeBuffer(); + splitAndAdd(EXPECTED_VALUE); + } + + @After + public void teardown() { + composite.close(); + } + + @Test + public void singleBufferShouldSucceed() { + composite = new CompositeBuffer(); + composite.addBuffer(Buffers.wrap(EXPECTED_VALUE.getBytes(UTF_8))); + assertEquals(EXPECTED_VALUE.length(), composite.readableBytes()); + assertEquals(EXPECTED_VALUE, Buffers.readAsStringUtf8(composite)); + assertEquals(0, composite.readableBytes()); + } + + @Test + public void readUnsignedByteShouldSucceed() { + for (int ix = 0; ix < EXPECTED_VALUE.length(); ++ix) { + int c = composite.readUnsignedByte(); + assertEquals(EXPECTED_VALUE.charAt(ix), (char) c); + } + assertEquals(0, composite.readableBytes()); + } + + @Test + public void skipBytesShouldSucceed() { + int remaining = EXPECTED_VALUE.length(); + composite.skipBytes(1); + remaining--; + assertEquals(remaining, composite.readableBytes()); + + composite.skipBytes(5); + remaining -= 5; + assertEquals(remaining, composite.readableBytes()); + + composite.skipBytes(remaining); + assertEquals(0, composite.readableBytes()); + } + + @Test + public void readByteArrayShouldSucceed() { + byte[] bytes = new byte[composite.readableBytes()]; + int writeIndex = 0; + + composite.readBytes(bytes, writeIndex, 1); + writeIndex++; + assertEquals(EXPECTED_VALUE.length() - writeIndex, composite.readableBytes()); + + composite.readBytes(bytes, writeIndex, 5); + writeIndex += 5; + assertEquals(EXPECTED_VALUE.length() - writeIndex, composite.readableBytes()); + + int remaining = composite.readableBytes(); + composite.readBytes(bytes, writeIndex, remaining); + writeIndex += remaining; + assertEquals(0, composite.readableBytes()); + assertEquals(bytes.length, writeIndex); + assertEquals(EXPECTED_VALUE, new String(bytes, UTF_8)); + } + + @Test + public void readByteBufferShouldSucceed() { + ByteBuffer byteBuffer = ByteBuffer.allocate(EXPECTED_VALUE.length()); + int remaining = EXPECTED_VALUE.length(); + + byteBuffer.limit(1); + composite.readBytes(byteBuffer); + remaining--; + assertEquals(remaining, composite.readableBytes()); + + byteBuffer.limit(byteBuffer.limit() + 5); + composite.readBytes(byteBuffer); + remaining -= 5; + assertEquals(remaining, composite.readableBytes()); + + byteBuffer.limit(byteBuffer.limit() + remaining); + composite.readBytes(byteBuffer); + assertEquals(0, composite.readableBytes()); + assertEquals(EXPECTED_VALUE, new String(byteBuffer.array(), UTF_8)); + } + + @Test + public void readStreamShouldSucceed() throws IOException { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + int remaining = EXPECTED_VALUE.length(); + + composite.readBytes(bos, 1); + remaining--; + assertEquals(remaining, composite.readableBytes()); + + composite.readBytes(bos, 5); + remaining -= 5; + assertEquals(remaining, composite.readableBytes()); + + composite.readBytes(bos, remaining); + assertEquals(0, composite.readableBytes()); + assertEquals(EXPECTED_VALUE, new String(bos.toByteArray(), UTF_8)); + } + + @Test + public void closeShouldCloseBuffers() { + composite = new CompositeBuffer(); + Buffer mock1 = mock(Buffer.class); + Buffer mock2 = mock(Buffer.class); + composite.addBuffer(mock1); + composite.addBuffer(mock2); + + composite.close(); + verify(mock1).close(); + verify(mock2).close(); + } + + private void splitAndAdd(String value) { + int partLength = Math.max(1, value.length() / 4); + for (int startIndex = 0, endIndex = 0; startIndex < value.length(); startIndex = endIndex) { + endIndex = Math.min(value.length(), startIndex + partLength); + String part = value.substring(startIndex, endIndex); + composite.addBuffer(Buffers.wrap(part.getBytes(UTF_8))); + } + + assertEquals(value.length(), composite.readableBytes()); + } +}