diff --git a/core/src/main/java/com/google/net/stubby/transport/Deframer.java b/core/src/main/java/com/google/net/stubby/transport/Deframer.java index 5721c46afe..3e3fe9db13 100644 --- a/core/src/main/java/com/google/net/stubby/transport/Deframer.java +++ b/core/src/main/java/com/google/net/stubby/transport/Deframer.java @@ -5,6 +5,7 @@ import com.google.net.stubby.GrpcFramingUtil; import com.google.net.stubby.Operation; import com.google.net.stubby.Status; +import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.IOException; import java.io.InputStream; @@ -62,6 +63,9 @@ public abstract class Deframer { return consolidate(); } if (GrpcFramingUtil.isPayloadFrame(currentFlags)) { + // Advance stream now, because target.addPayload() may not or may process the frame on + // another thread. + framedChunk = new ByteArrayInputStream(ByteStreams.toByteArray(framedChunk)); try { // Report payload to the receiving operation target.addPayload(framedChunk, Operation.Phase.PAYLOAD); @@ -75,18 +79,28 @@ public abstract class Deframer { // Writing a custom parser would have to do varint handling and potentially // deal with out-of-order tags etc. Transport.ContextValue contextValue = Transport.ContextValue.parseFrom(framedChunk); - target.addContext(contextValue.getKey(), - contextValue.getValue().newInput(), - target.getPhase()); + try { + target.addContext(contextValue.getKey(), + contextValue.getValue().newInput(), + target.getPhase()); + } finally { + currentLength = LENGTH_NOT_SET; + inFrame = false; + } } else if (GrpcFramingUtil.isStatusFrame(currentFlags)) { int status = framedChunk.read() << 8 | framedChunk.read(); Transport.Code code = Transport.Code.valueOf(status); // TODO(user): Resolve what to do with remainder of framedChunk - if (code == null) { - // Log for unknown code - target.close(new Status(Transport.Code.UNKNOWN, "Unknown status code " + status)); - } else { - target.close(new Status(code)); + try { + if (code == null) { + // Log for unknown code + target.close(new Status(Transport.Code.UNKNOWN, "Unknown status code " + status)); + } else { + target.close(new Status(code)); + } + } finally { + currentLength = LENGTH_NOT_SET; + inFrame = false; } } if (grpcStream.available() == 0) { diff --git a/core/src/main/java/com/google/net/stubby/transport/InputStreamDeframer.java b/core/src/main/java/com/google/net/stubby/transport/InputStreamDeframer.java index cc8817357d..344d65f878 100644 --- a/core/src/main/java/com/google/net/stubby/transport/InputStreamDeframer.java +++ b/core/src/main/java/com/google/net/stubby/transport/InputStreamDeframer.java @@ -88,21 +88,28 @@ public class InputStreamDeframer extends Deframer { if (remainingSuffix == 0) { // No suffix so clear suffix = null; - } else if (buffer == null || remainingSuffix > buffer.length - bufferIndex) { - // Suffix exceeds current buffer size - buffer = ByteStreams.toByteArray(suffix); + return; + } + int bufferLength = buffer == null ? 0 : buffer.length; + int bytesInBuffer = bufferLength - bufferIndex; + // Shift existing bytes + if (bufferLength < bytesInBuffer + remainingSuffix) { + // Buffer too small, so create a new buffer before copying in the suffix + byte[] newBuffer = new byte[bytesInBuffer + remainingSuffix]; + if (bytesInBuffer > 0) { + System.arraycopy(buffer, bufferIndex, newBuffer, 0, bytesInBuffer); + } + buffer = newBuffer; bufferIndex = 0; - } else if (buffer.length == bufferIndex) { - // Buffer has been fully consumed, copy suffix into it - ByteStreams.readFully(suffix, buffer, buffer.length - remainingSuffix, remainingSuffix); - bufferIndex = buffer.length - remainingSuffix; } else { - // Buffer has been partially consumed so shift the buffer before copying in the suffix - System.arraycopy(buffer, bufferIndex, buffer, bufferIndex - remainingSuffix, - buffer.length - bufferIndex); - ByteStreams.readFully(suffix, buffer, buffer.length - remainingSuffix, remainingSuffix); + // Enough space is in buffer, so shift the existing bytes to open up exactly enough bytes + // for the suffix at the end. + System.arraycopy(buffer, bufferIndex, buffer, bufferIndex - remainingSuffix, bytesInBuffer); bufferIndex -= remainingSuffix; } + // Write suffix to buffer + ByteStreams.readFully(suffix, buffer, buffer.length - remainingSuffix, remainingSuffix); + suffix = null; } @Override