diff --git a/api/src/main/java/io/grpc/CallOptions.java b/api/src/main/java/io/grpc/CallOptions.java index 2c9b1c5208..c7504a6506 100644 --- a/api/src/main/java/io/grpc/CallOptions.java +++ b/api/src/main/java/io/grpc/CallOptions.java @@ -61,7 +61,7 @@ public final class CallOptions { @Nullable private String compressorName; - private Object[][] customOptions = new Object[0][2]; + private Object[][] customOptions; // Unmodifiable list private List streamTracerFactories = Collections.emptyList(); @@ -364,6 +364,7 @@ public final class CallOptions { } private CallOptions() { + customOptions = new Object[0][2]; } /** diff --git a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java index dee19274b5..5967bf581b 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java @@ -59,7 +59,8 @@ import io.grpc.internal.StreamListener; import java.io.InputStream; import java.util.ArrayDeque; import java.util.ArrayList; -import java.util.HashSet; +import java.util.Collections; +import java.util.IdentityHashMap; import java.util.List; import java.util.Set; import java.util.concurrent.Executor; @@ -96,7 +97,8 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans @GuardedBy("this") private Status shutdownStatus; @GuardedBy("this") - private Set streams = new HashSet<>(); + private final Set streams = Collections.newSetFromMap( + new IdentityHashMap()); @GuardedBy("this") private List serverStreamTracerFactories; private final Attributes attributes; diff --git a/core/src/main/java/io/grpc/internal/CompositeReadableBuffer.java b/core/src/main/java/io/grpc/internal/CompositeReadableBuffer.java index 93dda7cdbc..34021d8a82 100644 --- a/core/src/main/java/io/grpc/internal/CompositeReadableBuffer.java +++ b/core/src/main/java/io/grpc/internal/CompositeReadableBuffer.java @@ -34,7 +34,15 @@ import java.util.Queue; public class CompositeReadableBuffer extends AbstractReadableBuffer { private int readableBytes; - private final Queue buffers = new ArrayDeque<>(); + private final Queue buffers; + + public CompositeReadableBuffer(int initialCapacity) { + buffers = new ArrayDeque<>(initialCapacity); + } + + public CompositeReadableBuffer() { + buffers = new ArrayDeque<>(); + } /** * Adds a new {@link ReadableBuffer} at the end of the buffer list. After a buffer is added, it is @@ -64,92 +72,114 @@ public class CompositeReadableBuffer extends AbstractReadableBuffer { return readableBytes; } + private static final NoThrowReadOperation UBYTE_OP = + new NoThrowReadOperation() { + @Override + public int read(ReadableBuffer buffer, int length, Void unused, int value) { + return buffer.readUnsignedByte(); + } + }; + @Override public int readUnsignedByte() { - ReadOperation op = new ReadOperation() { - @Override - int readInternal(ReadableBuffer buffer, int length) { - return buffer.readUnsignedByte(); - } - }; - execute(op, 1); - return op.value; + return executeNoThrow(UBYTE_OP, 1, null, 0); } + private static final NoThrowReadOperation SKIP_OP = + new NoThrowReadOperation() { + @Override + public int read(ReadableBuffer buffer, int length, Void unused, int unused2) { + buffer.skipBytes(length); + return 0; + } + }; + @Override public void skipBytes(int length) { - execute(new ReadOperation() { - @Override - public int readInternal(ReadableBuffer buffer, int length) { - buffer.skipBytes(length); - return 0; - } - }, length); + executeNoThrow(SKIP_OP, length, null, 0); + } + + private static final NoThrowReadOperation BYTE_ARRAY_OP = + new NoThrowReadOperation() { + @Override + public int read(ReadableBuffer buffer, int length, byte[] dest, int offset) { + buffer.readBytes(dest, offset, length); + return offset + length; + } + }; + + @Override + public void readBytes(byte[] dest, int destOffset, int length) { + executeNoThrow(BYTE_ARRAY_OP, length, dest, destOffset); + } + + private static final NoThrowReadOperation BYTE_BUF_OP = + new NoThrowReadOperation() { + @Override + public int read(ReadableBuffer buffer, int length, ByteBuffer dest, int unused) { + // Change the limit so that only lengthToCopy bytes are available. + int prevLimit = dest.limit(); + ((Buffer) dest).limit(dest.position() + length); + // Write the bytes and restore the original limit. + buffer.readBytes(dest); + ((Buffer) dest).limit(prevLimit); + return 0; + } + }; + + @Override + public void readBytes(ByteBuffer dest) { + executeNoThrow(BYTE_BUF_OP, dest.remaining(), dest, 0); + } + + private static final ReadOperation STREAM_OP = + new ReadOperation() { + @Override + public int read(ReadableBuffer buffer, int length, OutputStream dest, int unused) + throws IOException { + buffer.readBytes(dest, length); + return 0; + } + }; + + @Override + public void readBytes(OutputStream dest, int length) throws IOException { + execute(STREAM_OP, length, dest, 0); } @Override - public void readBytes(final byte[] dest, final int destOffset, int length) { - execute(new ReadOperation() { - int currentOffset = destOffset; - @Override - public int readInternal(ReadableBuffer buffer, int length) { - buffer.readBytes(dest, currentOffset, length); - currentOffset += length; - return 0; - } - }, length); - } - - @Override - public void readBytes(final ByteBuffer dest) { - execute(new ReadOperation() { - @Override - public int readInternal(ReadableBuffer buffer, int length) { - // Change the limit so that only lengthToCopy bytes are available. - int prevLimit = dest.limit(); - ((Buffer) dest).limit(dest.position() + length); - - // Write the bytes and restore the original limit. - buffer.readBytes(dest); - ((Buffer) dest).limit(prevLimit); - return 0; - } - }, dest.remaining()); - } - - @Override - public void readBytes(final OutputStream dest, int length) throws IOException { - ReadOperation op = new ReadOperation() { - @Override - public int readInternal(ReadableBuffer buffer, int length) throws IOException { - buffer.readBytes(dest, length); - return 0; - } - }; - execute(op, length); - - // If an exception occurred, throw it. - if (op.isError()) { - throw op.ex; + public ReadableBuffer readBytes(int length) { + if (length <= 0) { + return ReadableBuffers.empty(); } - } - - @Override - public CompositeReadableBuffer readBytes(int length) { checkReadable(length); readableBytes -= length; - CompositeReadableBuffer newBuffer = new CompositeReadableBuffer(); - while (length > 0) { + ReadableBuffer newBuffer = null; + CompositeReadableBuffer newComposite = null; + do { ReadableBuffer buffer = buffers.peek(); - if (buffer.readableBytes() > length) { - newBuffer.addBuffer(buffer.readBytes(length)); + int readable = buffer.readableBytes(); + ReadableBuffer readBuffer; + if (readable > length) { + readBuffer = buffer.readBytes(length); length = 0; } else { - newBuffer.addBuffer(buffers.poll()); - length -= buffer.readableBytes(); + readBuffer = buffers.poll(); + length -= readable; } - } + if (newBuffer == null) { + newBuffer = readBuffer; + } else { + if (newComposite == null) { + newComposite = + new CompositeReadableBuffer(length == 0 ? 2 : Math.min(buffers.size() + 2, 16)); + newComposite.addBuffer(newBuffer); + newBuffer = newComposite; + } + newComposite.addBuffer(readBuffer); + } + } while (length > 0); return newBuffer; } @@ -164,7 +194,7 @@ public class CompositeReadableBuffer extends AbstractReadableBuffer { * Executes the given {@link ReadOperation} against the {@link ReadableBuffer}s required to * satisfy the requested {@code length}. */ - private void execute(ReadOperation op, int length) { + private int execute(ReadOperation op, int length, T dest, int value) throws IOException { checkReadable(length); if (!buffers.isEmpty()) { @@ -176,10 +206,7 @@ public class CompositeReadableBuffer extends AbstractReadableBuffer { int lengthToCopy = Math.min(length, buffer.readableBytes()); // Perform the read operation for this buffer. - op.read(buffer, lengthToCopy); - if (op.isError()) { - return; - } + value = op.read(buffer, lengthToCopy, dest, value); length -= lengthToCopy; readableBytes -= lengthToCopy; @@ -189,6 +216,16 @@ public class CompositeReadableBuffer extends AbstractReadableBuffer { // Should never get here. throw new AssertionError("Failed executing read operation"); } + + return value; + } + + private int executeNoThrow(NoThrowReadOperation op, int length, T dest, int value) { + try { + return execute(op, length, dest, value); + } catch (IOException e) { + throw new AssertionError(e); // shouldn't happen + } } /** @@ -202,32 +239,23 @@ public class CompositeReadableBuffer extends AbstractReadableBuffer { } /** - * A simple read operation to perform on a single {@link ReadableBuffer}. All state management for - * the buffers is done by {@link CompositeReadableBuffer#execute(ReadOperation, int)}. + * A simple read operation to perform on a single {@link ReadableBuffer}. + * All state management for the buffers is done by + * {@link CompositeReadableBuffer#execute(ReadOperation, int, Object, int)}. */ - private abstract static class ReadOperation { + private interface ReadOperation { /** - * Only used by {@link CompositeReadableBuffer#readUnsignedByte()}. + * This method can also be used to simultaneously perform operation-specific int-valued + * aggregation over the sequence of buffers in a {@link CompositeReadableBuffer}. + * {@code value} is the return value from the prior buffer, or the "initial" value passed + * to {@code execute()} in the case of the first buffer. {@code execute()} returns the value + * returned by the operation called on the last buffer. */ - int value; + int read(ReadableBuffer buffer, int length, T dest, int value) throws IOException; + } - /** - * Only used by {@link CompositeReadableBuffer#readBytes(OutputStream, int)}. - */ - IOException ex; - - final void read(ReadableBuffer buffer, int length) { - try { - value = readInternal(buffer, length); - } catch (IOException e) { - ex = e; - } - } - - final boolean isError() { - return ex != null; - } - - abstract int readInternal(ReadableBuffer buffer, int length) throws IOException; + private interface NoThrowReadOperation extends ReadOperation { + @Override + int read(ReadableBuffer buffer, int length, T dest, int value); } } diff --git a/core/src/main/java/io/grpc/internal/InUseStateAggregator.java b/core/src/main/java/io/grpc/internal/InUseStateAggregator.java index 88181fd512..f4f3a186d8 100644 --- a/core/src/main/java/io/grpc/internal/InUseStateAggregator.java +++ b/core/src/main/java/io/grpc/internal/InUseStateAggregator.java @@ -16,7 +16,9 @@ package io.grpc.internal; -import java.util.HashSet; +import java.util.Collections; +import java.util.IdentityHashMap; +import java.util.Set; import javax.annotation.concurrent.NotThreadSafe; /** @@ -25,7 +27,7 @@ import javax.annotation.concurrent.NotThreadSafe; @NotThreadSafe public abstract class InUseStateAggregator { - private final HashSet inUseObjects = new HashSet<>(); + private final Set inUseObjects = Collections.newSetFromMap(new IdentityHashMap()); /** * Update the in-use state of an object. Initially no object is in use. diff --git a/core/src/main/java/io/grpc/internal/MessageFramer.java b/core/src/main/java/io/grpc/internal/MessageFramer.java index ad86d450fa..83592e691a 100644 --- a/core/src/main/java/io/grpc/internal/MessageFramer.java +++ b/core/src/main/java/io/grpc/internal/MessageFramer.java @@ -76,7 +76,7 @@ public class MessageFramer implements Framer { private Compressor compressor = Codec.Identity.NONE; private boolean messageCompression = true; private final OutputStreamAdapter outputStreamAdapter = new OutputStreamAdapter(); - private final byte[] headerScratch = new byte[HEADER_LENGTH]; + private final ByteBuffer headerScratch = ByteBuffer.allocate(HEADER_LENGTH); private final WritableBufferAllocator bufferAllocator; private final StatsTraceContext statsTraceCtx; // transportTracer is nullable until it is integrated with client transports @@ -218,15 +218,14 @@ public class MessageFramer implements Framer { String.format("message too large %d > %d", messageLength , maxOutboundMessageSize)) .asRuntimeException(); } - ByteBuffer header = ByteBuffer.wrap(headerScratch); - header.put(UNCOMPRESSED); - header.putInt(messageLength); + headerScratch.clear(); + headerScratch.put(UNCOMPRESSED).putInt(messageLength); // Allocate the initial buffer chunk based on frame header + payload length. // Note that the allocator may allocate a buffer larger or smaller than this length if (buffer == null) { - buffer = bufferAllocator.allocate(header.position() + messageLength); + buffer = bufferAllocator.allocate(headerScratch.position() + messageLength); } - writeRaw(headerScratch, 0, header.position()); + writeRaw(headerScratch.array(), 0, headerScratch.position()); return writeToOutputStream(message, outputStreamAdapter); } @@ -234,12 +233,11 @@ public class MessageFramer implements Framer { * Write a message that has been serialized to a sequence of buffers. */ private void writeBufferChain(BufferChainOutputStream bufferChain, boolean compressed) { - ByteBuffer header = ByteBuffer.wrap(headerScratch); - header.put(compressed ? COMPRESSED : UNCOMPRESSED); int messageLength = bufferChain.readableBytes(); - header.putInt(messageLength); + headerScratch.clear(); + headerScratch.put(compressed ? COMPRESSED : UNCOMPRESSED).putInt(messageLength); WritableBuffer writeableHeader = bufferAllocator.allocate(HEADER_LENGTH); - writeableHeader.write(headerScratch, 0, header.position()); + writeableHeader.write(headerScratch.array(), 0, headerScratch.position()); if (messageLength == 0) { // the payload had 0 length so make the header the current buffer. buffer = writeableHeader; diff --git a/core/src/main/java/io/grpc/internal/ReflectionLongAdderCounter.java b/core/src/main/java/io/grpc/internal/ReflectionLongAdderCounter.java index 6126706875..fddeee82c1 100644 --- a/core/src/main/java/io/grpc/internal/ReflectionLongAdderCounter.java +++ b/core/src/main/java/io/grpc/internal/ReflectionLongAdderCounter.java @@ -97,10 +97,12 @@ public final class ReflectionLongAdderCounter implements LongCounter { return initializationException == null; } + private static final Object[] one = new Object[] { 1L }; + @Override public void add(long delta) { try { - addMethod.invoke(instance, delta); + addMethod.invoke(instance, delta == 1L ? one : new Object[] { delta }); } catch (IllegalAccessException e) { throw new RuntimeException(e); } catch (InvocationTargetException e) { diff --git a/cronet/src/main/java/io/grpc/cronet/CronetClientTransport.java b/cronet/src/main/java/io/grpc/cronet/CronetClientTransport.java index fab249e656..dc4fc45ae4 100644 --- a/cronet/src/main/java/io/grpc/cronet/CronetClientTransport.java +++ b/cronet/src/main/java/io/grpc/cronet/CronetClientTransport.java @@ -36,7 +36,8 @@ import io.grpc.internal.StatsTraceContext; import io.grpc.internal.TransportTracer; import java.net.InetSocketAddress; import java.util.ArrayList; -import java.util.HashSet; +import java.util.Collections; +import java.util.IdentityHashMap; import java.util.Set; import java.util.concurrent.Executor; import javax.annotation.Nullable; @@ -53,8 +54,8 @@ class CronetClientTransport implements ConnectionClientTransport { private Listener listener; private final Object lock = new Object(); @GuardedBy("lock") - private final Set streams = - new HashSet(); + private final Set streams = Collections.newSetFromMap( + new IdentityHashMap()); private final Executor executor; private final int maxMessageSize; private final boolean alwaysUsePut;