Adding CompositeBuffer and a few other utilties related to Buffers.

-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=72268003
This commit is contained in:
nathanmittler 2014-07-30 14:37:38 -07:00 committed by Eric Anderson
parent bd56449f47
commit 88efcaf15c
4 changed files with 446 additions and 2 deletions

View File

@ -34,6 +34,13 @@ public final class Buffers {
return new ByteStringWrapper(bytes);
}
/**
* Shortcut for {@code wrap(bytes, 0, bytes.length}.
*/
public static Buffer wrap(byte[] bytes) {
return new ByteArrayWrapper(bytes, 0, bytes.length);
}
/**
* Creates a new {@link Buffer} that is backed by the given byte array.
*
@ -109,9 +116,27 @@ public final class Buffers {
* Creates a new {@link InputStream} backed by the given buffer. Any read taken on the stream will
* automatically increment the read position of this buffer. Closing the stream, however, does not
* affect the original buffer.
*
* @param buffer the buffer backing the new {@link InputStream}.
* @param owner if {@code true}, the returned stream will close the buffer when closed.
*/
public static InputStream openStream(Buffer buffer) {
return new BufferInputStream(buffer);
public static InputStream openStream(Buffer buffer, boolean owner) {
return new BufferInputStream(owner ? buffer : ignoreClose(buffer));
}
/**
* Decorates the given {@link Buffer} to ignore calls to {@link Buffer#close}.
*
* @param buffer the buffer to be decorated.
* @return a wrapper around {@code buffer} that ignores calls to {@link Buffer#close}.
*/
public static Buffer ignoreClose(Buffer buffer) {
return new ForwardingBuffer(buffer) {
@Override
public void close() {
// Ignore.
}
};
}
/**

View File

@ -0,0 +1,181 @@
package com.google.net.stubby.newtransport;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue;
/**
* A {@link Buffer} that is composed of 0 or more {@link Buffer}s. This provides a facade that
* allows multiple buffers to be treated as one.
*
* <p>When a buffer is added to a composite, it's life cycle is controlled by the composite. Once
* the composite has read past the end of a given buffer, that buffer is automatically closed and
* removed from the composite.
*/
public class CompositeBuffer extends AbstractBuffer {
private int readableBytes;
private final Queue<Buffer> buffers = new ArrayDeque<Buffer>();
/**
* Adds a new {@link Buffer} at the end of the buffer list. After a buffer is added, it is
* expected that this {@link CompositeBuffer} has complete ownership. Any attempt to modify the
* buffer (i.e. modifying the readable bytes) may result in corruption of the internal state of
* this {@link CompositeBuffer}.
*/
public void addBuffer(Buffer buffer) {
buffers.add(buffer);
readableBytes += buffer.readableBytes();
}
@Override
public int readableBytes() {
return readableBytes;
}
@Override
public int readUnsignedByte() {
ReadOperation op = new ReadOperation() {
@Override
int readInternal(Buffer buffer, int length) {
return buffer.readUnsignedByte();
}
};
execute(op, 1);
return op.value;
}
@Override
public void skipBytes(int length) {
execute(new ReadOperation() {
@Override
public int readInternal(Buffer buffer, int length) {
buffer.skipBytes(length);
return 0;
}
}, length);
}
@Override
public void readBytes(final byte[] dest, final int destOffset, int length) {
execute(new ReadOperation() {
int currentOffset = destOffset;
@Override
public int readInternal(Buffer buffer, int length) {
buffer.readBytes(dest, currentOffset, length);
currentOffset += length;
return 0;
}
}, length);
}
@Override
public void readBytes(final ByteBuffer dest) {
execute(new ReadOperation() {
@Override
public int readInternal(Buffer buffer, int length) {
// Change the limit so that only lengthToCopy bytes are available.
int prevLimit = dest.limit();
dest.limit(dest.position() + length);
// Write the bytes and restore the original limit.
buffer.readBytes(dest);
dest.limit(prevLimit);
return 0;
}
}, dest.remaining());
}
@Override
public void readBytes(final OutputStream dest, int length) throws IOException {
ReadOperation op = new ReadOperation() {
@Override
public int readInternal(Buffer buffer, int length) throws IOException {
buffer.readBytes(dest, length);
return 0;
}
};
execute(op, length);
// If an exception occurred, throw it.
if (op.isError()) {
throw op.ex;
}
}
@Override
public void close() {
while (!buffers.isEmpty()) {
buffers.remove().close();
}
}
/**
* Executes the given {@link ReadOperation} against the {@link Buffer}s required to satisfy the
* requested {@code length}.
*/
private void execute(ReadOperation op, int length) {
checkReadable(length);
for (; length > 0 && !buffers.isEmpty(); advanceBufferIfNecessary()) {
Buffer buffer = buffers.peek();
int lengthToCopy = Math.min(length, buffer.readableBytes());
// Perform the read operation for this buffer.
op.read(buffer, lengthToCopy);
if (op.isError()) {
return;
}
length -= lengthToCopy;
readableBytes -= lengthToCopy;
}
if (length > 0) {
// Should never get here.
throw new AssertionError("Failed executing read operation");
}
}
/**
* If the current buffer is exhausted, removes and closes it.
*/
private void advanceBufferIfNecessary() {
Buffer buffer = buffers.peek();
if (buffer.readableBytes() == 0) {
buffers.remove().close();
}
}
/**
* A simple read operation to perform on a single {@link Buffer}. All state management for the
* buffers is done by {@link CompositeBuffer#execute(ReadOperation, int)}.
*/
private abstract class ReadOperation {
/**
* Only used by {@link CompositeBuffer#readUnsignedByte()}.
*/
int value;
/**
* Only used by {@link CompositeBuffer#readBytes(OutputStream, int)};
*/
IOException ex;
final void read(Buffer buffer, int length) {
try {
value = readInternal(buffer, length);
} catch (IOException e) {
ex = e;
}
}
final boolean isError() {
return ex != null;
}
abstract int readInternal(Buffer buffer, int length) throws IOException;
}
}

View File

@ -0,0 +1,84 @@
package com.google.net.stubby.newtransport;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
/**
* Base class for a wrapper around another {@link Buffer}.
*/
public abstract class ForwardingBuffer implements Buffer {
private final Buffer buf;
public ForwardingBuffer(Buffer buf) {
this.buf = Preconditions.checkNotNull(buf, "buf");
}
@Override
public int readableBytes() {
return buf.readableBytes();
}
@Override
public int readUnsignedByte() {
return buf.readUnsignedByte();
}
@Override
public int readUnsignedMedium() {
return buf.readUnsignedMedium();
}
@Override
public int readUnsignedShort() {
return buf.readUnsignedShort();
}
@Override
public int readInt() {
return buf.readInt();
}
@Override
public void skipBytes(int length) {
buf.skipBytes(length);
}
@Override
public void readBytes(byte[] dest, int destOffset, int length) {
buf.readBytes(dest, destOffset, length);
}
@Override
public void readBytes(ByteBuffer dest) {
buf.readBytes(dest);
}
@Override
public void readBytes(OutputStream dest, int length) throws IOException {
buf.readBytes(dest, length);
}
@Override
public boolean hasArray() {
return buf.hasArray();
}
@Override
public byte[] array() {
return buf.array();
}
@Override
public int arrayOffset() {
return buf.arrayOffset();
}
@Override
public void close() {
buf.close();
}
}

View File

@ -0,0 +1,154 @@
package com.google.net.stubby.newtransport;
import static io.netty.util.CharsetUtil.UTF_8;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
/**
* Tests for {@link CompositeBuffer}.
*/
@RunWith(JUnit4.class)
public class CompositeBufferTest {
private static final String EXPECTED_VALUE = "hello world";
private CompositeBuffer composite;
@Before
public void setup() {
composite = new CompositeBuffer();
splitAndAdd(EXPECTED_VALUE);
}
@After
public void teardown() {
composite.close();
}
@Test
public void singleBufferShouldSucceed() {
composite = new CompositeBuffer();
composite.addBuffer(Buffers.wrap(EXPECTED_VALUE.getBytes(UTF_8)));
assertEquals(EXPECTED_VALUE.length(), composite.readableBytes());
assertEquals(EXPECTED_VALUE, Buffers.readAsStringUtf8(composite));
assertEquals(0, composite.readableBytes());
}
@Test
public void readUnsignedByteShouldSucceed() {
for (int ix = 0; ix < EXPECTED_VALUE.length(); ++ix) {
int c = composite.readUnsignedByte();
assertEquals(EXPECTED_VALUE.charAt(ix), (char) c);
}
assertEquals(0, composite.readableBytes());
}
@Test
public void skipBytesShouldSucceed() {
int remaining = EXPECTED_VALUE.length();
composite.skipBytes(1);
remaining--;
assertEquals(remaining, composite.readableBytes());
composite.skipBytes(5);
remaining -= 5;
assertEquals(remaining, composite.readableBytes());
composite.skipBytes(remaining);
assertEquals(0, composite.readableBytes());
}
@Test
public void readByteArrayShouldSucceed() {
byte[] bytes = new byte[composite.readableBytes()];
int writeIndex = 0;
composite.readBytes(bytes, writeIndex, 1);
writeIndex++;
assertEquals(EXPECTED_VALUE.length() - writeIndex, composite.readableBytes());
composite.readBytes(bytes, writeIndex, 5);
writeIndex += 5;
assertEquals(EXPECTED_VALUE.length() - writeIndex, composite.readableBytes());
int remaining = composite.readableBytes();
composite.readBytes(bytes, writeIndex, remaining);
writeIndex += remaining;
assertEquals(0, composite.readableBytes());
assertEquals(bytes.length, writeIndex);
assertEquals(EXPECTED_VALUE, new String(bytes, UTF_8));
}
@Test
public void readByteBufferShouldSucceed() {
ByteBuffer byteBuffer = ByteBuffer.allocate(EXPECTED_VALUE.length());
int remaining = EXPECTED_VALUE.length();
byteBuffer.limit(1);
composite.readBytes(byteBuffer);
remaining--;
assertEquals(remaining, composite.readableBytes());
byteBuffer.limit(byteBuffer.limit() + 5);
composite.readBytes(byteBuffer);
remaining -= 5;
assertEquals(remaining, composite.readableBytes());
byteBuffer.limit(byteBuffer.limit() + remaining);
composite.readBytes(byteBuffer);
assertEquals(0, composite.readableBytes());
assertEquals(EXPECTED_VALUE, new String(byteBuffer.array(), UTF_8));
}
@Test
public void readStreamShouldSucceed() throws IOException {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
int remaining = EXPECTED_VALUE.length();
composite.readBytes(bos, 1);
remaining--;
assertEquals(remaining, composite.readableBytes());
composite.readBytes(bos, 5);
remaining -= 5;
assertEquals(remaining, composite.readableBytes());
composite.readBytes(bos, remaining);
assertEquals(0, composite.readableBytes());
assertEquals(EXPECTED_VALUE, new String(bos.toByteArray(), UTF_8));
}
@Test
public void closeShouldCloseBuffers() {
composite = new CompositeBuffer();
Buffer mock1 = mock(Buffer.class);
Buffer mock2 = mock(Buffer.class);
composite.addBuffer(mock1);
composite.addBuffer(mock2);
composite.close();
verify(mock1).close();
verify(mock2).close();
}
private void splitAndAdd(String value) {
int partLength = Math.max(1, value.length() / 4);
for (int startIndex = 0, endIndex = 0; startIndex < value.length(); startIndex = endIndex) {
endIndex = Math.min(value.length(), startIndex + partLength);
String part = value.substring(startIndex, endIndex);
composite.addBuffer(Buffers.wrap(part.getBytes(UTF_8)));
}
assertEquals(value.length(), composite.readableBytes());
}
}