core: switch to scheduled closing of MessageDeframer

This commit is contained in:
Eric Gribkoff 2017-06-27 09:48:18 -07:00
parent 16575d01a3
commit a446b5388f
7 changed files with 228 additions and 117 deletions

View File

@ -184,7 +184,8 @@ public abstract class AbstractClientStream extends AbstractStream
private ClientStreamListener listener; private ClientStreamListener listener;
private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance(); private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance();
private Runnable deliveryStalledTask; private boolean deframerClosed = false;
private Runnable deframerClosedTask;
/** /**
* Whether the stream is closed from the transport's perspective. This can differ from {@link * Whether the stream is closed from the transport's perspective. This can differ from {@link
@ -210,18 +211,14 @@ public abstract class AbstractClientStream extends AbstractStream
} }
@Override @Override
public final void deliveryStalled() { public void deframerClosed(boolean hasPartialMessageIgnored) {
if (deliveryStalledTask != null) { deframerClosed = true;
deliveryStalledTask.run(); if (deframerClosedTask != null) {
deliveryStalledTask = null; deframerClosedTask.run();
deframerClosedTask = null;
} }
} }
@Override
public final void endOfStream() {
deliveryStalled();
}
@Override @Override
protected final ClientStreamListener listener() { protected final ClientStreamListener listener() {
return listener; return listener;
@ -266,7 +263,7 @@ public abstract class AbstractClientStream extends AbstractStream
} }
needToCloseFrame = false; needToCloseFrame = false;
deframe(frame, false); deframe(frame);
} finally { } finally {
if (needToCloseFrame) { if (needToCloseFrame) {
frame.close(); frame.close();
@ -314,18 +311,18 @@ public abstract class AbstractClientStream extends AbstractStream
statusReported = true; statusReported = true;
onStreamDeallocated(); onStreamDeallocated();
// If not stopping delivery, then we must wait until the deframer is stalled (i.e., it has no if (deframerClosed) {
// complete messages to deliver). deframerClosedTask = null;
if (stopDelivery || isDeframerStalled()) {
deliveryStalledTask = null;
closeListener(status, trailers); closeListener(status, trailers);
} else { } else {
deliveryStalledTask = new Runnable() { deframerClosedTask =
@Override new Runnable() {
public void run() { @Override
closeListener(status, trailers); public void run() {
} closeListener(status, trailers);
}; }
};
closeDeframer(stopDelivery);
} }
} }
@ -337,7 +334,6 @@ public abstract class AbstractClientStream extends AbstractStream
private void closeListener(Status status, Metadata trailers) { private void closeListener(Status status, Metadata trailers) {
if (!listenerClosed) { if (!listenerClosed) {
listenerClosed = true; listenerClosed = true;
closeDeframer();
statsTraceCtx.streamClosed(status); statsTraceCtx.streamClosed(status);
listener().closed(status, trailers); listener().closed(status, trailers);
} }

View File

@ -182,6 +182,11 @@ public abstract class AbstractServerStream extends AbstractStream
private ServerStreamListener listener; private ServerStreamListener listener;
private final StatsTraceContext statsTraceCtx; private final StatsTraceContext statsTraceCtx;
private boolean endOfStream = false;
private boolean deframerClosed = false;
private boolean immediateCloseRequested = false;
private Runnable deframerClosedTask;
protected TransportState(int maxMessageSize, StatsTraceContext statsTraceCtx) { protected TransportState(int maxMessageSize, StatsTraceContext statsTraceCtx) {
super(maxMessageSize, statsTraceCtx); super(maxMessageSize, statsTraceCtx);
this.statsTraceCtx = Preconditions.checkNotNull(statsTraceCtx, "statsTraceCtx"); this.statsTraceCtx = Preconditions.checkNotNull(statsTraceCtx, "statsTraceCtx");
@ -202,12 +207,24 @@ public abstract class AbstractServerStream extends AbstractStream
} }
@Override @Override
public void deliveryStalled() {} public void deframerClosed(boolean hasPartialMessage) {
deframerClosed = true;
@Override if (endOfStream) {
public void endOfStream() { if (!immediateCloseRequested && hasPartialMessage) {
closeDeframer(); // We've received the entire stream and have data available but we don't have
listener().halfClosed(); // enough to read the next frame ... this is bad.
deframeFailed(Status.INTERNAL
.withDescription("Encountered end-of-stream mid-frame")
.asRuntimeException());
deframerClosedTask = null;
return;
}
listener.halfClosed();
}
if (deframerClosedTask != null) {
deframerClosedTask.run();
deframerClosedTask = null;
}
} }
@Override @Override
@ -224,8 +241,13 @@ public abstract class AbstractServerStream extends AbstractStream
* @param endOfStream {@code true} if no more data will be received on the stream. * @param endOfStream {@code true} if no more data will be received on the stream.
*/ */
public void inboundDataReceived(ReadableBuffer frame, boolean endOfStream) { public void inboundDataReceived(ReadableBuffer frame, boolean endOfStream) {
Preconditions.checkState(!this.endOfStream, "Past end of stream");
// Deframe the message. If a failure occurs, deframeFailed will be called. // Deframe the message. If a failure occurs, deframeFailed will be called.
deframe(frame, endOfStream); deframe(frame);
if (endOfStream) {
this.endOfStream = true;
closeDeframer(false);
}
} }
/** /**
@ -238,9 +260,22 @@ public abstract class AbstractServerStream extends AbstractStream
* *
* @param status the error status. Must not be {@link Status#OK}. * @param status the error status. Must not be {@link Status#OK}.
*/ */
public final void transportReportStatus(Status status) { public final void transportReportStatus(final Status status) {
Preconditions.checkArgument(!status.isOk(), "status must not be OK"); Preconditions.checkArgument(!status.isOk(), "status must not be OK");
closeListener(status); if (deframerClosed) {
deframerClosedTask = null;
closeListener(status);
} else {
deframerClosedTask =
new Runnable() {
@Override
public void run() {
closeListener(status);
}
};
immediateCloseRequested = true;
closeDeframer(true);
}
} }
/** /**
@ -249,7 +284,20 @@ public abstract class AbstractServerStream extends AbstractStream
* #transportReportStatus}. * #transportReportStatus}.
*/ */
public void complete() { public void complete() {
closeListener(Status.OK); if (deframerClosed) {
deframerClosedTask = null;
closeListener(Status.OK);
} else {
deframerClosedTask =
new Runnable() {
@Override
public void run() {
closeListener(Status.OK);
}
};
immediateCloseRequested = true;
closeDeframer(true);
}
} }
/** /**
@ -263,7 +311,6 @@ public abstract class AbstractServerStream extends AbstractStream
} }
listenerClosed = true; listenerClosed = true;
onStreamDeallocated(); onStreamDeallocated();
closeDeframer();
listener().closed(newStatus); listener().closed(newStatus);
} }
} }

View File

@ -147,7 +147,7 @@ public abstract class AbstractStream implements Stream {
} }
/** /**
* Called when a {@link #deframe(ReadableBuffer, boolean)} operation failed. * Called when a {@link #deframe(ReadableBuffer)} operation failed.
* *
* @param cause the actual failure * @param cause the actual failure
*/ */
@ -156,31 +156,36 @@ public abstract class AbstractStream implements Stream {
/** /**
* Closes this deframer and frees any resources. After this method is called, additional calls * Closes this deframer and frees any resources. After this method is called, additional calls
* will have no effect. * will have no effect.
*
* <p>When {@code stopDelivery} is false, the deframer will wait to close until any already
* queued messages have been delivered.
*
* <p>The deframer will invoke {@link #deframerClosed(boolean)} upon closing.
*
* @param stopDelivery interrupt pending deliveries and close immediately
*/ */
protected final void closeDeframer() { protected final void closeDeframer(boolean stopDelivery) {
deframer.close(); if (stopDelivery) {
} deframer.close();
} else {
/** deframer.closeWhenComplete();
* Indicates whether delivery is currently stalled, pending receipt of more data. }
*/
protected final boolean isDeframerStalled() {
return deframer.isStalled();
} }
/** /**
* Called to parse a received frame and attempt delivery of any completed * Called to parse a received frame and attempt delivery of any completed
* messages. Must be called from the transport thread. * messages. Must be called from the transport thread.
*/ */
protected final void deframe(ReadableBuffer frame, boolean endOfStream) { protected final void deframe(ReadableBuffer frame) {
if (deframer.isClosed()) { if (deframer.isClosed()) {
frame.close(); frame.close();
return; return;
} }
try { try {
deframer.deframe(frame, endOfStream); deframer.deframe(frame);
} catch (Throwable t) { } catch (Throwable t) {
deframeFailed(t); deframeFailed(t);
deframer.close(); // unrecoverable state
} }
} }
@ -196,6 +201,7 @@ public abstract class AbstractStream implements Stream {
deframer.request(numMessages); deframer.request(numMessages);
} catch (Throwable t) { } catch (Throwable t) {
deframeFailed(t); deframeFailed(t);
deframer.close(); // unrecoverable state
} }
} }

View File

@ -58,20 +58,14 @@ public class MessageDeframer implements Closeable {
/** /**
* Called to deliver the next complete message. * Called to deliver the next complete message.
* *
* @param is stream containing the message. * @param message stream containing the message.
*/ */
void messageRead(InputStream is); void messageRead(InputStream message);
/** /**
* Called when end-of-stream has not yet been reached but there are no complete messages * Called when the deframer closes.
* remaining to be delivered.
*/ */
void deliveryStalled(); void deframerClosed(boolean hasPartialMessage);
/**
* Called when the stream is complete and all messages have been successfully delivered.
*/
void endOfStream();
} }
private enum State { private enum State {
@ -86,13 +80,13 @@ public class MessageDeframer implements Closeable {
private State state = State.HEADER; private State state = State.HEADER;
private int requiredLength = HEADER_LENGTH; private int requiredLength = HEADER_LENGTH;
private boolean compressedFlag; private boolean compressedFlag;
private boolean endOfStream;
private CompositeReadableBuffer nextFrame; private CompositeReadableBuffer nextFrame;
private CompositeReadableBuffer unprocessed = new CompositeReadableBuffer(); private CompositeReadableBuffer unprocessed = new CompositeReadableBuffer();
private long pendingDeliveries; private long pendingDeliveries;
private boolean deliveryStalled = true;
private boolean inDelivery = false; private boolean inDelivery = false;
private boolean closeWhenComplete = false;
/** /**
* Create a deframer. * Create a deframer.
* *
@ -147,24 +141,18 @@ public class MessageDeframer implements Closeable {
* Adds the given data to this deframer and attempts delivery to the listener. * Adds the given data to this deframer and attempts delivery to the listener.
* *
* @param data the raw data read from the remote endpoint. Must be non-null. * @param data the raw data read from the remote endpoint. Must be non-null.
* @param endOfStream if {@code true}, indicates that {@code data} is the end of the stream from * @throws IllegalStateException if {@link #close()} or {@link #closeWhenComplete()} has been
* the remote endpoint. End of stream should not be used in the event of a transport * called previously.
* error, such as a stream reset.
* @throws IllegalStateException if {@link #close()} has been called previously or if
* this method has previously been called with {@code endOfStream=true}.
*/ */
public void deframe(ReadableBuffer data, boolean endOfStream) { public void deframe(ReadableBuffer data) {
Preconditions.checkNotNull(data, "data"); Preconditions.checkNotNull(data, "data");
boolean needToCloseData = true; boolean needToCloseData = true;
try { try {
checkNotClosed(); checkNotClosedOrScheduledToClose();
Preconditions.checkState(!this.endOfStream, "Past end of stream");
unprocessed.addBuffer(data); unprocessed.addBuffer(data);
needToCloseData = false; needToCloseData = false;
// Indicate that all of the data for this stream has been received.
this.endOfStream = endOfStream;
deliver(); deliver();
} finally { } finally {
if (needToCloseData) { if (needToCloseData) {
@ -173,12 +161,17 @@ public class MessageDeframer implements Closeable {
} }
} }
/** /** Close when any messages currently in unprocessed have been requested and delivered. */
* Indicates whether delivery is currently stalled, pending receipt of more data. This means public void closeWhenComplete() {
* that no additional data can be delivered to the application. if (unprocessed == null) {
*/ return;
public boolean isStalled() { }
return deliveryStalled; boolean stalled = unprocessed.readableBytes() == 0;
if (stalled) {
close();
} else {
closeWhenComplete = true;
}
} }
/** /**
@ -187,6 +180,10 @@ public class MessageDeframer implements Closeable {
*/ */
@Override @Override
public void close() { public void close() {
if (isClosed()) {
return;
}
boolean hasPartialMessage = nextFrame != null && nextFrame.readableBytes() > 0;
try { try {
if (unprocessed != null) { if (unprocessed != null) {
unprocessed.close(); unprocessed.close();
@ -198,6 +195,7 @@ public class MessageDeframer implements Closeable {
unprocessed = null; unprocessed = null;
nextFrame = null; nextFrame = null;
} }
listener.deframerClosed(hasPartialMessage);
} }
/** /**
@ -210,8 +208,9 @@ public class MessageDeframer implements Closeable {
/** /**
* Throws if this deframer has already been closed. * Throws if this deframer has already been closed.
*/ */
private void checkNotClosed() { private void checkNotClosedOrScheduledToClose() {
Preconditions.checkState(!isClosed(), "MessageDeframer is already closed"); Preconditions.checkState(!isClosed(), "MessageDeframer is already closed");
Preconditions.checkState(!closeWhenComplete, "MessageDeframer is scheduled to close");
} }
/** /**
@ -253,26 +252,8 @@ public class MessageDeframer implements Closeable {
* be in unprocessed. * be in unprocessed.
*/ */
boolean stalled = unprocessed.readableBytes() == 0; boolean stalled = unprocessed.readableBytes() == 0;
if (closeWhenComplete && stalled) {
if (endOfStream && stalled) { close();
boolean havePartialMessage = nextFrame != null && nextFrame.readableBytes() > 0;
if (!havePartialMessage) {
listener.endOfStream();
deliveryStalled = false;
return;
} else {
// We've received the entire stream and have data available but we don't have
// enough to read the next frame ... this is bad.
throw Status.INTERNAL.withDescription(
debugString + ": Encountered end-of-stream mid-frame").asRuntimeException();
}
}
// If we're transitioning to the stalled state, notify the listener.
boolean previouslyStalled = deliveryStalled;
deliveryStalled = stalled;
if (stalled && !previouslyStalled) {
listener.deliveryStalled();
} }
} finally { } finally {
inDelivery = false; inDelivery = false;

View File

@ -200,7 +200,7 @@ public class AbstractClientStreamTest {
// on the transport thread. // on the transport thread.
stream.transportState().requestMessagesFromDeframer(1); stream.transportState().requestMessagesFromDeframer(1);
// Send first byte of 2 byte message // Send first byte of 2 byte message
stream.transportState().deframe(ReadableBuffers.wrap(new byte[] {0, 0, 0, 0, 2, 1}), false); stream.transportState().deframe(ReadableBuffers.wrap(new byte[] {0, 0, 0, 0, 2, 1}));
Status status = Status.INTERNAL; Status status = Status.INTERNAL;
// Simulate getting a reset // Simulate getting a reset
stream.transportState().transportReportStatus(status, false /*stop delivery*/, new Metadata()); stream.transportState().transportReportStatus(status, false /*stop delivery*/, new Metadata());

View File

@ -27,15 +27,18 @@ import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset; import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.InternalStatus; import io.grpc.InternalStatus;
import io.grpc.Metadata; import io.grpc.Metadata;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.internal.AbstractServerStream.TransportState; import io.grpc.internal.AbstractServerStream.TransportState;
import io.grpc.internal.MessageFramerTest.ByteWritableBuffer; import io.grpc.internal.MessageFramerTest.ByteWritableBuffer;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.TimeUnit;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.ExpectedException; import org.junit.rules.ExpectedException;
@ -48,6 +51,7 @@ import org.mockito.ArgumentCaptor;
*/ */
@RunWith(JUnit4.class) @RunWith(JUnit4.class)
public class AbstractServerStreamTest { public class AbstractServerStreamTest {
private static final int TIMEOUT_MS = 1000;
private static final int MAX_MESSAGE_SIZE = 100; private static final int MAX_MESSAGE_SIZE = 100;
@Rule public final ExpectedException thrown = ExpectedException.none(); @Rule public final ExpectedException thrown = ExpectedException.none();
@ -90,6 +94,59 @@ public class AbstractServerStreamTest {
assertNull("no message expected", streamListenerMessageQueue.poll()); assertNull("no message expected", streamListenerMessageQueue.poll());
} }
@Test
public void queuedBytesInDeframerShouldNotBlockComplete() throws Exception {
final SettableFuture<Status> closedFuture = SettableFuture.create();
stream.transportState().setListener(new ServerStreamListenerBase() {
@Override
public void closed(Status status) {
closedFuture.set(status);
}
});
// Queue bytes in deframer
stream.transportState().inboundDataReceived(ReadableBuffers.wrap(new byte[]{1}), false);
stream.transportState().complete();
assertEquals(Status.OK, closedFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
}
@Test
public void queuedBytesInDeframerShouldNotBlockTransportReportStatus() throws Exception {
final SettableFuture<Status> closedFuture = SettableFuture.create();
stream.transportState().setListener(new ServerStreamListenerBase() {
@Override
public void closed(Status status) {
closedFuture.set(status);
}
});
// Queue bytes in deframer
stream.transportState().inboundDataReceived(ReadableBuffers.wrap(new byte[]{1}), false);
stream.transportState().transportReportStatus(Status.CANCELLED);
assertEquals(Status.CANCELLED, closedFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
}
@Test
public void partialMessageAtEndOfStreamShouldFail() throws Exception {
final SettableFuture<Status> closedFuture = SettableFuture.create();
stream.transportState().setListener(new ServerStreamListenerBase() {
@Override
public void closed(Status status) {
closedFuture.set(status);
}
});
// Queue a partial message in the deframer
stream.transportState().inboundDataReceived(ReadableBuffers.wrap(new byte[]{1}), true);
stream.transportState().requestMessagesFromDeframer(1);
Status status = closedFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
assertEquals(Status.INTERNAL.getCode(), status.getCode());
assertEquals("Encountered end-of-stream mid-frame", status.getDescription());
}
/** /**
* Test for issue https://github.com/grpc/grpc-java/issues/615 * Test for issue https://github.com/grpc/grpc-java/issues/615
*/ */
@ -120,7 +177,6 @@ public class AbstractServerStreamTest {
state.onStreamAllocated(); state.onStreamAllocated();
} }
@Test @Test
public void listenerReady_readyCalled() { public void listenerReady_readyCalled() {
ServerStreamListener streamListener = mock(ServerStreamListener.class); ServerStreamListener streamListener = mock(ServerStreamListener.class);
@ -240,7 +296,16 @@ public class AbstractServerStreamTest {
private static class ServerStreamListenerBase implements ServerStreamListener { private static class ServerStreamListenerBase implements ServerStreamListener {
@Override @Override
public void messagesAvailable(MessageProducer producer) {} public void messagesAvailable(MessageProducer producer) {
InputStream message;
while ((message = producer.next()) != null) {
try {
message.close();
} catch (IOException e) {
// Continue to close other messages
}
}
}
@Override @Override
public void onReady() {} public void onReady() {}
@ -279,7 +344,10 @@ public class AbstractServerStreamTest {
} }
@Override @Override
protected void deframeFailed(Throwable cause) {} protected void deframeFailed(Throwable cause) {
Status status = Status.fromThrowable(cause);
transportReportStatus(status);
}
@Override @Override
public void bytesRead(int processedBytes) {} public void bytesRead(int processedBytes) {}

View File

@ -73,7 +73,7 @@ public class MessageDeframerTest {
@Test @Test
public void simplePayload() { public void simplePayload() {
deframer.request(1); deframer.request(1);
deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 2, 3, 14}), false); deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 2, 3, 14}));
verify(listener).messageRead(messages.capture()); verify(listener).messageRead(messages.capture());
assertEquals(Bytes.asList(new byte[]{3, 14}), bytes(messages)); assertEquals(Bytes.asList(new byte[]{3, 14}), bytes(messages));
verify(listener, atLeastOnce()).bytesRead(anyInt()); verify(listener, atLeastOnce()).bytesRead(anyInt());
@ -84,7 +84,7 @@ public class MessageDeframerTest {
@Test @Test
public void smallCombinedPayloads() { public void smallCombinedPayloads() {
deframer.request(2); deframer.request(2);
deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3, 0, 0, 0, 0, 2, 14, 15}), false); deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3, 0, 0, 0, 0, 2, 14, 15}));
verify(listener, times(2)).messageRead(messages.capture()); verify(listener, times(2)).messageRead(messages.capture());
List<InputStream> streams = messages.getAllValues(); List<InputStream> streams = messages.getAllValues();
assertEquals(2, streams.size()); assertEquals(2, streams.size());
@ -98,10 +98,11 @@ public class MessageDeframerTest {
@Test @Test
public void endOfStreamWithPayloadShouldNotifyEndOfStream() { public void endOfStreamWithPayloadShouldNotifyEndOfStream() {
deframer.request(1); deframer.request(1);
deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3}), true); deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3}));
deframer.closeWhenComplete();
verify(listener).messageRead(messages.capture()); verify(listener).messageRead(messages.capture());
assertEquals(Bytes.asList(new byte[] {3}), bytes(messages)); assertEquals(Bytes.asList(new byte[] {3}), bytes(messages));
verify(listener).endOfStream(); verify(listener).deframerClosed(false);
verify(listener, atLeastOnce()).bytesRead(anyInt()); verify(listener, atLeastOnce()).bytesRead(anyInt());
verifyNoMoreInteractions(listener); verifyNoMoreInteractions(listener);
checkStats(1, 1, 1); checkStats(1, 1, 1);
@ -109,8 +110,20 @@ public class MessageDeframerTest {
@Test @Test
public void endOfStreamShouldNotifyEndOfStream() { public void endOfStreamShouldNotifyEndOfStream() {
deframer.deframe(buffer(new byte[0]), true); deframer.deframe(buffer(new byte[0]));
verify(listener).endOfStream(); deframer.closeWhenComplete();
verify(listener).deframerClosed(false);
verifyNoMoreInteractions(listener);
checkStats(0, 0, 0);
}
@Test
public void endOfStreamWithPartialMessageShouldNotifyDeframerClosedWithPartialMessage() {
deframer.request(1);
deframer.deframe(buffer(new byte[1]));
deframer.closeWhenComplete();
verify(listener, atLeastOnce()).bytesRead(anyInt());
verify(listener).deframerClosed(true);
verifyNoMoreInteractions(listener); verifyNoMoreInteractions(listener);
checkStats(0, 0, 0); checkStats(0, 0, 0);
} }
@ -118,14 +131,13 @@ public class MessageDeframerTest {
@Test @Test
public void payloadSplitBetweenBuffers() { public void payloadSplitBetweenBuffers() {
deframer.request(1); deframer.request(1);
deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 7, 3, 14, 1, 5, 9}), false); deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 7, 3, 14, 1, 5, 9}));
verify(listener, atLeastOnce()).bytesRead(anyInt()); verify(listener, atLeastOnce()).bytesRead(anyInt());
verifyNoMoreInteractions(listener); verifyNoMoreInteractions(listener);
deframer.deframe(buffer(new byte[] {2, 6}), false); deframer.deframe(buffer(new byte[] {2, 6}));
verify(listener).messageRead(messages.capture()); verify(listener).messageRead(messages.capture());
assertEquals(Bytes.asList(new byte[] {3, 14, 1, 5, 9, 2, 6}), bytes(messages)); assertEquals(Bytes.asList(new byte[] {3, 14, 1, 5, 9, 2, 6}), bytes(messages));
verify(listener, atLeastOnce()).bytesRead(anyInt()); verify(listener, atLeastOnce()).bytesRead(anyInt());
assertTrue(deframer.isStalled());
verifyNoMoreInteractions(listener); verifyNoMoreInteractions(listener);
checkStats(1, 7, 7); checkStats(1, 7, 7);
} }
@ -134,14 +146,13 @@ public class MessageDeframerTest {
public void frameHeaderSplitBetweenBuffers() { public void frameHeaderSplitBetweenBuffers() {
deframer.request(1); deframer.request(1);
deframer.deframe(buffer(new byte[] {0, 0}), false); deframer.deframe(buffer(new byte[] {0, 0}));
verify(listener, atLeastOnce()).bytesRead(anyInt()); verify(listener, atLeastOnce()).bytesRead(anyInt());
verifyNoMoreInteractions(listener); verifyNoMoreInteractions(listener);
deframer.deframe(buffer(new byte[] {0, 0, 1, 3}), false); deframer.deframe(buffer(new byte[] {0, 0, 1, 3}));
verify(listener).messageRead(messages.capture()); verify(listener).messageRead(messages.capture());
assertEquals(Bytes.asList(new byte[] {3}), bytes(messages)); assertEquals(Bytes.asList(new byte[] {3}), bytes(messages));
verify(listener, atLeastOnce()).bytesRead(anyInt()); verify(listener, atLeastOnce()).bytesRead(anyInt());
assertTrue(deframer.isStalled());
verifyNoMoreInteractions(listener); verifyNoMoreInteractions(listener);
checkStats(1, 1, 1); checkStats(1, 1, 1);
} }
@ -149,7 +160,7 @@ public class MessageDeframerTest {
@Test @Test
public void emptyPayload() { public void emptyPayload() {
deframer.request(1); deframer.request(1);
deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 0}), false); deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 0}));
verify(listener).messageRead(messages.capture()); verify(listener).messageRead(messages.capture());
assertEquals(Bytes.asList(), bytes(messages)); assertEquals(Bytes.asList(), bytes(messages));
verify(listener, atLeastOnce()).bytesRead(anyInt()); verify(listener, atLeastOnce()).bytesRead(anyInt());
@ -161,7 +172,7 @@ public class MessageDeframerTest {
public void largerFrameSize() { public void largerFrameSize() {
deframer.request(1); deframer.request(1);
deframer.deframe(ReadableBuffers.wrap( deframer.deframe(ReadableBuffers.wrap(
Bytes.concat(new byte[] {0, 0, 0, 3, (byte) 232}, new byte[1000])), false); Bytes.concat(new byte[] {0, 0, 0, 3, (byte) 232}, new byte[1000])));
verify(listener).messageRead(messages.capture()); verify(listener).messageRead(messages.capture());
assertEquals(Bytes.asList(new byte[1000]), bytes(messages)); assertEquals(Bytes.asList(new byte[1000]), bytes(messages));
verify(listener, atLeastOnce()).bytesRead(anyInt()); verify(listener, atLeastOnce()).bytesRead(anyInt());
@ -171,13 +182,14 @@ public class MessageDeframerTest {
@Test @Test
public void endOfStreamCallbackShouldWaitForMessageDelivery() { public void endOfStreamCallbackShouldWaitForMessageDelivery() {
deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3}), true); deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3}));
deframer.closeWhenComplete();
verifyNoMoreInteractions(listener); verifyNoMoreInteractions(listener);
deframer.request(1); deframer.request(1);
verify(listener).messageRead(messages.capture()); verify(listener).messageRead(messages.capture());
assertEquals(Bytes.asList(new byte[] {3}), bytes(messages)); assertEquals(Bytes.asList(new byte[] {3}), bytes(messages));
verify(listener).endOfStream(); verify(listener).deframerClosed(false);
verify(listener, atLeastOnce()).bytesRead(anyInt()); verify(listener, atLeastOnce()).bytesRead(anyInt());
verifyNoMoreInteractions(listener); verifyNoMoreInteractions(listener);
checkStats(1, 1, 1); checkStats(1, 1, 1);
@ -192,7 +204,7 @@ public class MessageDeframerTest {
byte[] payload = compress(new byte[1000]); byte[] payload = compress(new byte[1000]);
assertTrue(payload.length < 100); assertTrue(payload.length < 100);
byte[] header = new byte[] {1, 0, 0, 0, (byte) payload.length}; byte[] header = new byte[] {1, 0, 0, 0, (byte) payload.length};
deframer.deframe(buffer(Bytes.concat(header, payload)), false); deframer.deframe(buffer(Bytes.concat(header, payload)));
verify(listener).messageRead(messages.capture()); verify(listener).messageRead(messages.capture());
assertEquals(Bytes.asList(new byte[1000]), bytes(messages)); assertEquals(Bytes.asList(new byte[1000]), bytes(messages));
verify(listener, atLeastOnce()).bytesRead(anyInt()); verify(listener, atLeastOnce()).bytesRead(anyInt());
@ -209,13 +221,14 @@ public class MessageDeframerTest {
return null; return null;
} }
}).when(listener).messageRead(Matchers.<InputStream>any()); }).when(listener).messageRead(Matchers.<InputStream>any());
deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3}), true); deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3}));
deframer.closeWhenComplete();
verifyNoMoreInteractions(listener); verifyNoMoreInteractions(listener);
deframer.request(1); deframer.request(1);
verify(listener).messageRead(messages.capture()); verify(listener).messageRead(messages.capture());
assertEquals(Bytes.asList(new byte[] {3}), bytes(messages)); assertEquals(Bytes.asList(new byte[] {3}), bytes(messages));
verify(listener).endOfStream(); verify(listener).deframerClosed(false);
verify(listener, atLeastOnce()).bytesRead(anyInt()); verify(listener, atLeastOnce()).bytesRead(anyInt());
verifyNoMoreInteractions(listener); verifyNoMoreInteractions(listener);
} }