Random acts of garbage reduction

I noticed some opportunities to reduce allocations on hot paths
This commit is contained in:
Nick Hill 2021-01-21 10:45:31 -08:00 committed by GitHub
parent ae82c41032
commit 2072df9be6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 151 additions and 117 deletions

View File

@ -61,7 +61,7 @@ public final class CallOptions {
@Nullable
private String compressorName;
private Object[][] customOptions = new Object[0][2];
private Object[][] customOptions;
// Unmodifiable list
private List<ClientStreamTracer.Factory> streamTracerFactories = Collections.emptyList();
@ -364,6 +364,7 @@ public final class CallOptions {
}
private CallOptions() {
customOptions = new Object[0][2];
}
/**

View File

@ -59,7 +59,8 @@ import io.grpc.internal.StreamListener;
import java.io.InputStream;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;
@ -96,7 +97,8 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
@GuardedBy("this")
private Status shutdownStatus;
@GuardedBy("this")
private Set<InProcessStream> streams = new HashSet<>();
private final Set<InProcessStream> streams = Collections.newSetFromMap(
new IdentityHashMap<InProcessStream, Boolean>());
@GuardedBy("this")
private List<ServerStreamTracer.Factory> serverStreamTracerFactories;
private final Attributes attributes;

View File

@ -34,7 +34,15 @@ import java.util.Queue;
public class CompositeReadableBuffer extends AbstractReadableBuffer {
private int readableBytes;
private final Queue<ReadableBuffer> buffers = new ArrayDeque<>();
private final Queue<ReadableBuffer> buffers;
public CompositeReadableBuffer(int initialCapacity) {
buffers = new ArrayDeque<>(initialCapacity);
}
public CompositeReadableBuffer() {
buffers = new ArrayDeque<>();
}
/**
* Adds a new {@link ReadableBuffer} at the end of the buffer list. After a buffer is added, it is
@ -64,92 +72,114 @@ public class CompositeReadableBuffer extends AbstractReadableBuffer {
return readableBytes;
}
private static final NoThrowReadOperation<Void> UBYTE_OP =
new NoThrowReadOperation<Void>() {
@Override
public int read(ReadableBuffer buffer, int length, Void unused, int value) {
return buffer.readUnsignedByte();
}
};
@Override
public int readUnsignedByte() {
ReadOperation op = new ReadOperation() {
@Override
int readInternal(ReadableBuffer buffer, int length) {
return buffer.readUnsignedByte();
}
};
execute(op, 1);
return op.value;
return executeNoThrow(UBYTE_OP, 1, null, 0);
}
private static final NoThrowReadOperation<Void> SKIP_OP =
new NoThrowReadOperation<Void>() {
@Override
public int read(ReadableBuffer buffer, int length, Void unused, int unused2) {
buffer.skipBytes(length);
return 0;
}
};
@Override
public void skipBytes(int length) {
execute(new ReadOperation() {
@Override
public int readInternal(ReadableBuffer buffer, int length) {
buffer.skipBytes(length);
return 0;
}
}, length);
executeNoThrow(SKIP_OP, length, null, 0);
}
private static final NoThrowReadOperation<byte[]> BYTE_ARRAY_OP =
new NoThrowReadOperation<byte[]>() {
@Override
public int read(ReadableBuffer buffer, int length, byte[] dest, int offset) {
buffer.readBytes(dest, offset, length);
return offset + length;
}
};
@Override
public void readBytes(byte[] dest, int destOffset, int length) {
executeNoThrow(BYTE_ARRAY_OP, length, dest, destOffset);
}
private static final NoThrowReadOperation<ByteBuffer> BYTE_BUF_OP =
new NoThrowReadOperation<ByteBuffer>() {
@Override
public int read(ReadableBuffer buffer, int length, ByteBuffer dest, int unused) {
// Change the limit so that only lengthToCopy bytes are available.
int prevLimit = dest.limit();
((Buffer) dest).limit(dest.position() + length);
// Write the bytes and restore the original limit.
buffer.readBytes(dest);
((Buffer) dest).limit(prevLimit);
return 0;
}
};
@Override
public void readBytes(ByteBuffer dest) {
executeNoThrow(BYTE_BUF_OP, dest.remaining(), dest, 0);
}
private static final ReadOperation<OutputStream> STREAM_OP =
new ReadOperation<OutputStream>() {
@Override
public int read(ReadableBuffer buffer, int length, OutputStream dest, int unused)
throws IOException {
buffer.readBytes(dest, length);
return 0;
}
};
@Override
public void readBytes(OutputStream dest, int length) throws IOException {
execute(STREAM_OP, length, dest, 0);
}
@Override
public void readBytes(final byte[] dest, final int destOffset, int length) {
execute(new ReadOperation() {
int currentOffset = destOffset;
@Override
public int readInternal(ReadableBuffer buffer, int length) {
buffer.readBytes(dest, currentOffset, length);
currentOffset += length;
return 0;
}
}, length);
}
@Override
public void readBytes(final ByteBuffer dest) {
execute(new ReadOperation() {
@Override
public int readInternal(ReadableBuffer buffer, int length) {
// Change the limit so that only lengthToCopy bytes are available.
int prevLimit = dest.limit();
((Buffer) dest).limit(dest.position() + length);
// Write the bytes and restore the original limit.
buffer.readBytes(dest);
((Buffer) dest).limit(prevLimit);
return 0;
}
}, dest.remaining());
}
@Override
public void readBytes(final OutputStream dest, int length) throws IOException {
ReadOperation op = new ReadOperation() {
@Override
public int readInternal(ReadableBuffer buffer, int length) throws IOException {
buffer.readBytes(dest, length);
return 0;
}
};
execute(op, length);
// If an exception occurred, throw it.
if (op.isError()) {
throw op.ex;
public ReadableBuffer readBytes(int length) {
if (length <= 0) {
return ReadableBuffers.empty();
}
}
@Override
public CompositeReadableBuffer readBytes(int length) {
checkReadable(length);
readableBytes -= length;
CompositeReadableBuffer newBuffer = new CompositeReadableBuffer();
while (length > 0) {
ReadableBuffer newBuffer = null;
CompositeReadableBuffer newComposite = null;
do {
ReadableBuffer buffer = buffers.peek();
if (buffer.readableBytes() > length) {
newBuffer.addBuffer(buffer.readBytes(length));
int readable = buffer.readableBytes();
ReadableBuffer readBuffer;
if (readable > length) {
readBuffer = buffer.readBytes(length);
length = 0;
} else {
newBuffer.addBuffer(buffers.poll());
length -= buffer.readableBytes();
readBuffer = buffers.poll();
length -= readable;
}
}
if (newBuffer == null) {
newBuffer = readBuffer;
} else {
if (newComposite == null) {
newComposite =
new CompositeReadableBuffer(length == 0 ? 2 : Math.min(buffers.size() + 2, 16));
newComposite.addBuffer(newBuffer);
newBuffer = newComposite;
}
newComposite.addBuffer(readBuffer);
}
} while (length > 0);
return newBuffer;
}
@ -164,7 +194,7 @@ public class CompositeReadableBuffer extends AbstractReadableBuffer {
* Executes the given {@link ReadOperation} against the {@link ReadableBuffer}s required to
* satisfy the requested {@code length}.
*/
private void execute(ReadOperation op, int length) {
private <T> int execute(ReadOperation<T> op, int length, T dest, int value) throws IOException {
checkReadable(length);
if (!buffers.isEmpty()) {
@ -176,10 +206,7 @@ public class CompositeReadableBuffer extends AbstractReadableBuffer {
int lengthToCopy = Math.min(length, buffer.readableBytes());
// Perform the read operation for this buffer.
op.read(buffer, lengthToCopy);
if (op.isError()) {
return;
}
value = op.read(buffer, lengthToCopy, dest, value);
length -= lengthToCopy;
readableBytes -= lengthToCopy;
@ -189,6 +216,16 @@ public class CompositeReadableBuffer extends AbstractReadableBuffer {
// Should never get here.
throw new AssertionError("Failed executing read operation");
}
return value;
}
private <T> int executeNoThrow(NoThrowReadOperation<T> op, int length, T dest, int value) {
try {
return execute(op, length, dest, value);
} catch (IOException e) {
throw new AssertionError(e); // shouldn't happen
}
}
/**
@ -202,32 +239,23 @@ public class CompositeReadableBuffer extends AbstractReadableBuffer {
}
/**
* A simple read operation to perform on a single {@link ReadableBuffer}. All state management for
* the buffers is done by {@link CompositeReadableBuffer#execute(ReadOperation, int)}.
* A simple read operation to perform on a single {@link ReadableBuffer}.
* All state management for the buffers is done by
* {@link CompositeReadableBuffer#execute(ReadOperation, int, Object, int)}.
*/
private abstract static class ReadOperation {
private interface ReadOperation<T> {
/**
* Only used by {@link CompositeReadableBuffer#readUnsignedByte()}.
* This method can also be used to simultaneously perform operation-specific int-valued
* aggregation over the sequence of buffers in a {@link CompositeReadableBuffer}.
* {@code value} is the return value from the prior buffer, or the "initial" value passed
* to {@code execute()} in the case of the first buffer. {@code execute()} returns the value
* returned by the operation called on the last buffer.
*/
int value;
int read(ReadableBuffer buffer, int length, T dest, int value) throws IOException;
}
/**
* Only used by {@link CompositeReadableBuffer#readBytes(OutputStream, int)}.
*/
IOException ex;
final void read(ReadableBuffer buffer, int length) {
try {
value = readInternal(buffer, length);
} catch (IOException e) {
ex = e;
}
}
final boolean isError() {
return ex != null;
}
abstract int readInternal(ReadableBuffer buffer, int length) throws IOException;
private interface NoThrowReadOperation<T> extends ReadOperation<T> {
@Override
int read(ReadableBuffer buffer, int length, T dest, int value);
}
}

View File

@ -16,7 +16,9 @@
package io.grpc.internal;
import java.util.HashSet;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;
import javax.annotation.concurrent.NotThreadSafe;
/**
@ -25,7 +27,7 @@ import javax.annotation.concurrent.NotThreadSafe;
@NotThreadSafe
public abstract class InUseStateAggregator<T> {
private final HashSet<T> inUseObjects = new HashSet<>();
private final Set<T> inUseObjects = Collections.newSetFromMap(new IdentityHashMap<T,Boolean>());
/**
* Update the in-use state of an object. Initially no object is in use.

View File

@ -76,7 +76,7 @@ public class MessageFramer implements Framer {
private Compressor compressor = Codec.Identity.NONE;
private boolean messageCompression = true;
private final OutputStreamAdapter outputStreamAdapter = new OutputStreamAdapter();
private final byte[] headerScratch = new byte[HEADER_LENGTH];
private final ByteBuffer headerScratch = ByteBuffer.allocate(HEADER_LENGTH);
private final WritableBufferAllocator bufferAllocator;
private final StatsTraceContext statsTraceCtx;
// transportTracer is nullable until it is integrated with client transports
@ -218,15 +218,14 @@ public class MessageFramer implements Framer {
String.format("message too large %d > %d", messageLength , maxOutboundMessageSize))
.asRuntimeException();
}
ByteBuffer header = ByteBuffer.wrap(headerScratch);
header.put(UNCOMPRESSED);
header.putInt(messageLength);
headerScratch.clear();
headerScratch.put(UNCOMPRESSED).putInt(messageLength);
// Allocate the initial buffer chunk based on frame header + payload length.
// Note that the allocator may allocate a buffer larger or smaller than this length
if (buffer == null) {
buffer = bufferAllocator.allocate(header.position() + messageLength);
buffer = bufferAllocator.allocate(headerScratch.position() + messageLength);
}
writeRaw(headerScratch, 0, header.position());
writeRaw(headerScratch.array(), 0, headerScratch.position());
return writeToOutputStream(message, outputStreamAdapter);
}
@ -234,12 +233,11 @@ public class MessageFramer implements Framer {
* Write a message that has been serialized to a sequence of buffers.
*/
private void writeBufferChain(BufferChainOutputStream bufferChain, boolean compressed) {
ByteBuffer header = ByteBuffer.wrap(headerScratch);
header.put(compressed ? COMPRESSED : UNCOMPRESSED);
int messageLength = bufferChain.readableBytes();
header.putInt(messageLength);
headerScratch.clear();
headerScratch.put(compressed ? COMPRESSED : UNCOMPRESSED).putInt(messageLength);
WritableBuffer writeableHeader = bufferAllocator.allocate(HEADER_LENGTH);
writeableHeader.write(headerScratch, 0, header.position());
writeableHeader.write(headerScratch.array(), 0, headerScratch.position());
if (messageLength == 0) {
// the payload had 0 length so make the header the current buffer.
buffer = writeableHeader;

View File

@ -97,10 +97,12 @@ public final class ReflectionLongAdderCounter implements LongCounter {
return initializationException == null;
}
private static final Object[] one = new Object[] { 1L };
@Override
public void add(long delta) {
try {
addMethod.invoke(instance, delta);
addMethod.invoke(instance, delta == 1L ? one : new Object[] { delta });
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
} catch (InvocationTargetException e) {

View File

@ -36,7 +36,8 @@ import io.grpc.internal.StatsTraceContext;
import io.grpc.internal.TransportTracer;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
@ -53,8 +54,8 @@ class CronetClientTransport implements ConnectionClientTransport {
private Listener listener;
private final Object lock = new Object();
@GuardedBy("lock")
private final Set<CronetClientStream> streams =
new HashSet<CronetClientStream>();
private final Set<CronetClientStream> streams = Collections.newSetFromMap(
new IdentityHashMap<CronetClientStream, Boolean>());
private final Executor executor;
private final int maxMessageSize;
private final boolean alwaysUsePut;