mirror of https://github.com/grpc/grpc-java.git
Fix bugs exposed by test_TestServiceBenchmarks.
More bugs were lurking in this section of code. In some unknown way, code I'm working on triggered frequent (9/10) failures of test_TestServiceBenchmarks. These were all possible to be triggered on HEAD, but for some unknown reason are not. I am pretty certain we have at least one transport that handles exceptions poorly, such as by throwing away the exception and not informing the Operation of the error, which causes this test to time out. Timeouts in this test are thus likely an indicator of a real bug, even when it doesn't show any exception, because there was likely an exception and it was eaten without logging. Bugs fixed (in order of discovery): 1) In Deframer, currentLength and inFrame were not reset after consuming a context or status message. 2) In consolidate(), if suffix was greater than the buffer and the buffer contained data, the data would be lost. 3) In Deframer, some (unknown) Operation's addPayload() did not consume all of the provided InputStream immediately (maybe it does so on another thread), leaving those bytes in frame. ------------- Created by MOE: http://code.google.com/p/moe-java MOE_MIGRATED_REVID=69402916
This commit is contained in:
parent
88a758236a
commit
82c87abcbf
|
|
@ -5,6 +5,7 @@ import com.google.net.stubby.GrpcFramingUtil;
|
|||
import com.google.net.stubby.Operation;
|
||||
import com.google.net.stubby.Status;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
|
@ -62,6 +63,9 @@ public abstract class Deframer<F> {
|
|||
return consolidate();
|
||||
}
|
||||
if (GrpcFramingUtil.isPayloadFrame(currentFlags)) {
|
||||
// Advance stream now, because target.addPayload() may not or may process the frame on
|
||||
// another thread.
|
||||
framedChunk = new ByteArrayInputStream(ByteStreams.toByteArray(framedChunk));
|
||||
try {
|
||||
// Report payload to the receiving operation
|
||||
target.addPayload(framedChunk, Operation.Phase.PAYLOAD);
|
||||
|
|
@ -75,19 +79,29 @@ public abstract class Deframer<F> {
|
|||
// Writing a custom parser would have to do varint handling and potentially
|
||||
// deal with out-of-order tags etc.
|
||||
Transport.ContextValue contextValue = Transport.ContextValue.parseFrom(framedChunk);
|
||||
try {
|
||||
target.addContext(contextValue.getKey(),
|
||||
contextValue.getValue().newInput(),
|
||||
target.getPhase());
|
||||
} finally {
|
||||
currentLength = LENGTH_NOT_SET;
|
||||
inFrame = false;
|
||||
}
|
||||
} else if (GrpcFramingUtil.isStatusFrame(currentFlags)) {
|
||||
int status = framedChunk.read() << 8 | framedChunk.read();
|
||||
Transport.Code code = Transport.Code.valueOf(status);
|
||||
// TODO(user): Resolve what to do with remainder of framedChunk
|
||||
try {
|
||||
if (code == null) {
|
||||
// Log for unknown code
|
||||
target.close(new Status(Transport.Code.UNKNOWN, "Unknown status code " + status));
|
||||
} else {
|
||||
target.close(new Status(code));
|
||||
}
|
||||
} finally {
|
||||
currentLength = LENGTH_NOT_SET;
|
||||
inFrame = false;
|
||||
}
|
||||
}
|
||||
if (grpcStream.available() == 0) {
|
||||
// We've processed all the data so consolidate the underlying buffers
|
||||
|
|
|
|||
|
|
@ -88,21 +88,28 @@ public class InputStreamDeframer extends Deframer<InputStream> {
|
|||
if (remainingSuffix == 0) {
|
||||
// No suffix so clear
|
||||
suffix = null;
|
||||
} else if (buffer == null || remainingSuffix > buffer.length - bufferIndex) {
|
||||
// Suffix exceeds current buffer size
|
||||
buffer = ByteStreams.toByteArray(suffix);
|
||||
return;
|
||||
}
|
||||
int bufferLength = buffer == null ? 0 : buffer.length;
|
||||
int bytesInBuffer = bufferLength - bufferIndex;
|
||||
// Shift existing bytes
|
||||
if (bufferLength < bytesInBuffer + remainingSuffix) {
|
||||
// Buffer too small, so create a new buffer before copying in the suffix
|
||||
byte[] newBuffer = new byte[bytesInBuffer + remainingSuffix];
|
||||
if (bytesInBuffer > 0) {
|
||||
System.arraycopy(buffer, bufferIndex, newBuffer, 0, bytesInBuffer);
|
||||
}
|
||||
buffer = newBuffer;
|
||||
bufferIndex = 0;
|
||||
} else if (buffer.length == bufferIndex) {
|
||||
// Buffer has been fully consumed, copy suffix into it
|
||||
ByteStreams.readFully(suffix, buffer, buffer.length - remainingSuffix, remainingSuffix);
|
||||
bufferIndex = buffer.length - remainingSuffix;
|
||||
} else {
|
||||
// Buffer has been partially consumed so shift the buffer before copying in the suffix
|
||||
System.arraycopy(buffer, bufferIndex, buffer, bufferIndex - remainingSuffix,
|
||||
buffer.length - bufferIndex);
|
||||
ByteStreams.readFully(suffix, buffer, buffer.length - remainingSuffix, remainingSuffix);
|
||||
// Enough space is in buffer, so shift the existing bytes to open up exactly enough bytes
|
||||
// for the suffix at the end.
|
||||
System.arraycopy(buffer, bufferIndex, buffer, bufferIndex - remainingSuffix, bytesInBuffer);
|
||||
bufferIndex -= remainingSuffix;
|
||||
}
|
||||
// Write suffix to buffer
|
||||
ByteStreams.readFully(suffix, buffer, buffer.length - remainingSuffix, remainingSuffix);
|
||||
suffix = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
Loading…
Reference in New Issue