From c47d948a472936c54693dd49cac17d66a87420c6 Mon Sep 17 00:00:00 2001 From: Carl Mastrangelo Date: Fri, 12 Aug 2016 15:19:22 -0700 Subject: [PATCH] protobuf: copy input data before decoding CodedInputStream is risk averse in ways that hurt performance when parsing large messages. gRPC knows how large the input size is as it is being read from the wire, and only tries to parse it once the entire message has been read in. The message is represented as chunks of memory strung together in a CompositeReadableBuffer, and then wrapped in a custom BufferInputStream. When passed to Protobuf, CodedInputStream attempts to read data out of this InputStream into CIS's internal 4K buffer. For messages that are much larger, CIS copies from the input in chunks of 4K and saved in an ArrayList. Once the entire message size is read in, it is re-copied into one large byte array and passed back up. This only happens for ByteStrings and ByteBuffers that are read out of CIS. (See CIS.readRawBytesSlowPath for implementation). gRPC doesn't need this overhead, since we already have the entire message in memory, albeit in chunks. This change copies the composite buffer into a single heap byte buffer, and passes this (via UnsafeByteOperations) into CodedInputStream. This pays one copy to build the heap buffer, but avoids the two copes in CIS. This also ensures that the buffer is considered "immutable" from CIS's point of view. Because CIS does not have ByteString aliasing turned on, this large buffer will not accidentally be kept in memory even if only tiny fields from the proto are still referenced. Instead, reading ByteStrings out of CIS will always copy. (This copy, and the problems it avoids, can be turned off by calling CIS.enableAliasing.) Benchmark results will come shortly, but initial testing shows significant speedup in throughput tests. Profiling has shown that copying memory was a large time consumer for messages of size 1MB. --- .../io/grpc/internal/ReadableBuffers.java | 2 +- .../io/grpc/protobuf/lite/ProtoLiteUtils.java | 43 +++++++++++++++---- 2 files changed, 35 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/ReadableBuffers.java b/core/src/main/java/io/grpc/internal/ReadableBuffers.java index 6832940289..8eba4fdf75 100644 --- a/core/src/main/java/io/grpc/internal/ReadableBuffers.java +++ b/core/src/main/java/io/grpc/internal/ReadableBuffers.java @@ -312,7 +312,7 @@ public final class ReadableBuffers { /** * An {@link InputStream} that is backed by a {@link ReadableBuffer}. */ - private static class BufferInputStream extends InputStream implements KnownLength { + private static final class BufferInputStream extends InputStream implements KnownLength { final ReadableBuffer buffer; public BufferInputStream(ReadableBuffer buffer) { diff --git a/protobuf-lite/src/main/java/io/grpc/protobuf/lite/ProtoLiteUtils.java b/protobuf-lite/src/main/java/io/grpc/protobuf/lite/ProtoLiteUtils.java index 3f51550389..a66f57b5a3 100644 --- a/protobuf-lite/src/main/java/io/grpc/protobuf/lite/ProtoLiteUtils.java +++ b/protobuf-lite/src/main/java/io/grpc/protobuf/lite/ProtoLiteUtils.java @@ -40,10 +40,13 @@ import com.google.protobuf.MessageLite; import com.google.protobuf.Parser; import io.grpc.ExperimentalApi; +import io.grpc.KnownLength; import io.grpc.Metadata; import io.grpc.MethodDescriptor.Marshaller; import io.grpc.Status; +import io.grpc.internal.GrpcUtil; +import java.io.IOException; import java.io.InputStream; /** @@ -107,23 +110,45 @@ public class ProtoLiteUtils { } } } + CodedInputStream cis = null; try { - return parseFrom(stream); + if (stream instanceof KnownLength) { + int size = stream.available(); + if (size > 0 && size <= GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE) { + byte[] buf = new byte[size]; + int chunkSize; + int position = 0; + while ((chunkSize = stream.read(buf, position, buf.length - position)) != -1) { + position += chunkSize; + } + if (buf.length != position) { + throw new RuntimeException("size inaccurate: " + buf.length + " != " + position); + } + cis = CodedInputStream.newInstance(buf); + } + } + } catch (IOException e) { + throw new RuntimeException(e); + } + if (cis == null) { + cis = CodedInputStream.newInstance(stream); + } + // Pre-create the CodedInputStream so that we can remove the size limit restriction + // when parsing. + cis.setSizeLimit(Integer.MAX_VALUE); + + try { + return parseFrom(cis); } catch (InvalidProtocolBufferException ipbe) { throw Status.INTERNAL.withDescription("Invalid protobuf byte sequence") .withCause(ipbe).asRuntimeException(); } } - private T parseFrom(InputStream stream) throws InvalidProtocolBufferException { - // Pre-create the CodedInputStream so that we can remove the size limit restriction - // when parsing. - CodedInputStream codedInput = CodedInputStream.newInstance(stream); - codedInput.setSizeLimit(Integer.MAX_VALUE); - - T message = parser.parseFrom(codedInput, globalRegistry); + private T parseFrom(CodedInputStream stream) throws InvalidProtocolBufferException { + T message = parser.parseFrom(stream, globalRegistry); try { - codedInput.checkLastTagWas(0); + stream.checkLastTagWas(0); return message; } catch (InvalidProtocolBufferException e) { e.setUnfinishedMessage(message);