protobuf: copy input data before decoding

CodedInputStream is risk averse in ways that hurt performance when
parsing large messages.  gRPC knows how large the input size is as it
is being read from the wire, and only tries to parse it once the entire
message has been read in.  The message is represented as chunks of
memory strung together in a CompositeReadableBuffer, and then wrapped
in a custom BufferInputStream.

When passed to Protobuf, CodedInputStream attempts to read data out
of this InputStream into CIS's internal 4K buffer.  For messages that
are much larger, CIS copies from the input in chunks of 4K and saved in
an ArrayList.  Once the entire message size is read in, it is re-copied
into one large byte array and passed back up.  This only happens for
ByteStrings and ByteBuffers that are read out of CIS.  (See
CIS.readRawBytesSlowPath for implementation).

gRPC doesn't need this overhead, since we already have the entire
message in memory, albeit in chunks.  This change copies the composite
buffer into a single heap byte buffer, and passes this (via
UnsafeByteOperations) into CodedInputStream.  This pays one copy to
build the heap buffer, but avoids the two copes in CIS.  This also
ensures that the buffer is considered "immutable" from CIS's point of
view.

Because CIS does not have ByteString aliasing turned on, this large
buffer will not accidentally be kept in memory even if only tiny fields
from the proto are still referenced.  Instead, reading ByteStrings out
of CIS will always copy.  (This copy, and the problems it avoids, can
be turned off by calling CIS.enableAliasing.)

Benchmark results will come shortly, but initial testing shows
significant speedup in throughput tests.  Profiling has shown that
copying memory was a large time consumer for messages of size 1MB.
This commit is contained in:
Carl Mastrangelo 2016-08-12 15:19:22 -07:00
parent 09d663faf1
commit c47d948a47
2 changed files with 35 additions and 10 deletions

View File

@ -312,7 +312,7 @@ public final class ReadableBuffers {
/**
* An {@link InputStream} that is backed by a {@link ReadableBuffer}.
*/
private static class BufferInputStream extends InputStream implements KnownLength {
private static final class BufferInputStream extends InputStream implements KnownLength {
final ReadableBuffer buffer;
public BufferInputStream(ReadableBuffer buffer) {

View File

@ -40,10 +40,13 @@ import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;
import io.grpc.ExperimentalApi;
import io.grpc.KnownLength;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor.Marshaller;
import io.grpc.Status;
import io.grpc.internal.GrpcUtil;
import java.io.IOException;
import java.io.InputStream;
/**
@ -107,23 +110,45 @@ public class ProtoLiteUtils {
}
}
}
CodedInputStream cis = null;
try {
return parseFrom(stream);
if (stream instanceof KnownLength) {
int size = stream.available();
if (size > 0 && size <= GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE) {
byte[] buf = new byte[size];
int chunkSize;
int position = 0;
while ((chunkSize = stream.read(buf, position, buf.length - position)) != -1) {
position += chunkSize;
}
if (buf.length != position) {
throw new RuntimeException("size inaccurate: " + buf.length + " != " + position);
}
cis = CodedInputStream.newInstance(buf);
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
if (cis == null) {
cis = CodedInputStream.newInstance(stream);
}
// Pre-create the CodedInputStream so that we can remove the size limit restriction
// when parsing.
cis.setSizeLimit(Integer.MAX_VALUE);
try {
return parseFrom(cis);
} catch (InvalidProtocolBufferException ipbe) {
throw Status.INTERNAL.withDescription("Invalid protobuf byte sequence")
.withCause(ipbe).asRuntimeException();
}
}
private T parseFrom(InputStream stream) throws InvalidProtocolBufferException {
// Pre-create the CodedInputStream so that we can remove the size limit restriction
// when parsing.
CodedInputStream codedInput = CodedInputStream.newInstance(stream);
codedInput.setSizeLimit(Integer.MAX_VALUE);
T message = parser.parseFrom(codedInput, globalRegistry);
private T parseFrom(CodedInputStream stream) throws InvalidProtocolBufferException {
T message = parser.parseFrom(stream, globalRegistry);
try {
codedInput.checkLastTagWas(0);
stream.checkLastTagWas(0);
return message;
} catch (InvalidProtocolBufferException e) {
e.setUnfinishedMessage(message);