Handle exceptions thrown by the application

-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=79895899
This commit is contained in:
ejona 2014-11-13 15:16:51 -08:00 committed by Eric Anderson
parent 1f2a1869e5
commit 0f56c47ac8
4 changed files with 200 additions and 50 deletions

View File

@ -5,6 +5,8 @@ import static com.google.net.stubby.transport.StreamState.OPEN;
import static com.google.net.stubby.transport.StreamState.READ_ONLY;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status;
@ -37,6 +39,16 @@ public abstract class AbstractClientStream<IdT> extends AbstractStream<IdT>
// Stored status & trailers to report when deframer completes.
private Status stashedStatus;
private Metadata.Trailers stashedTrailers;
private final FutureCallback<Object> failureCallback = new FutureCallback<Object>() {
@Override
public void onFailure(Throwable t) {
log.log(Level.WARNING, "Exception processing message", t);
cancel();
}
@Override
public void onSuccess(Object result) {}
};
protected AbstractClientStream(ClientStreamListener listener,
@Nullable Decompressor decompressor,
@ -88,7 +100,10 @@ public abstract class AbstractClientStream<IdT> extends AbstractStream<IdT>
}
inboundPhase(Phase.MESSAGE);
if (GRPC_V2_PROTOCOL) {
deframer2.delayProcessing(listener.headersRead(headers));
ListenableFuture<?> future = deframer2.delayProcessing(listener.headersRead(headers));
if (future != null) {
Futures.addCallback(future, failureCallback);
}
} else {
// This is a little broken as it doesn't strictly wait for the last payload handled
// by the deframer to be processed by the application layer. Not worth fixing as will
@ -116,7 +131,10 @@ public abstract class AbstractClientStream<IdT> extends AbstractStream<IdT>
if (!GRPC_V2_PROTOCOL) {
deframer.deframe(frame, false);
} else {
deframer2.deframe(frame, false);
ListenableFuture<?> future = deframer2.deframe(frame, false);
if (future != null) {
Futures.addCallback(future, failureCallback);
}
}
}
@ -138,7 +156,10 @@ public abstract class AbstractClientStream<IdT> extends AbstractStream<IdT>
if (!GRPC_V2_PROTOCOL) {
deframer.deframe(Buffers.empty(), true);
} else {
deframer2.deframe(Buffers.empty(), true);
ListenableFuture<?> future = deframer2.deframe(Buffers.empty(), true);
if (future != null) {
Futures.addCallback(future, failureCallback);
}
}
}

View File

@ -5,6 +5,8 @@ import static com.google.net.stubby.transport.StreamState.OPEN;
import static com.google.net.stubby.transport.StreamState.WRITE_ONLY;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status;
@ -12,6 +14,8 @@ import com.google.net.stubby.Status;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
@ -21,6 +25,7 @@ import javax.annotation.concurrent.GuardedBy;
*/
public abstract class AbstractServerStream<IdT> extends AbstractStream<IdT>
implements ServerStream {
private static final Logger log = Logger.getLogger(AbstractServerStream.class.getName());
private ServerStreamListener listener;
@ -36,6 +41,16 @@ public abstract class AbstractServerStream<IdT> extends AbstractStream<IdT>
private boolean gracefulClose;
/** Saved trailers from close() that need to be sent once the framer has sent all messages. */
private Metadata.Trailers stashedTrailers;
private final FutureCallback<Object> failureCallback = new FutureCallback<Object>() {
@Override
public void onFailure(Throwable t) {
log.log(Level.WARNING, "Exception processing message", t);
abortStream(Status.fromThrowable(t), true);
}
@Override
public void onSuccess(Object result) {}
};
protected AbstractServerStream(IdT id, @Nullable Decompressor decompressor,
Executor deframerExecutor) {
@ -117,7 +132,10 @@ public abstract class AbstractServerStream<IdT> extends AbstractStream<IdT>
if (!GRPC_V2_PROTOCOL) {
deframer.deframe(frame, endOfStream);
} else {
deframer2.deframe(frame, endOfStream);
ListenableFuture<?> future = deframer2.deframe(frame, endOfStream);
if (future != null) {
Futures.addCallback(future, failureCallback);
}
}
}

View File

@ -1,8 +1,12 @@
package com.google.net.stubby.transport;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.io.ByteStreams;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.net.stubby.Status;
import java.io.ByteArrayInputStream;
@ -42,18 +46,11 @@ public class MessageDeframer2 implements Closeable {
private final Sink sink;
private final Executor executor;
private final Compression compression;
private final Runnable deliveryTask = new Runnable() {
@Override
public void run() {
deliveryOutstanding = false;
deliver();
}
};
private State state = State.HEADER;
private int requiredLength = HEADER_LENGTH;
private boolean compressedFlag;
private boolean endOfStream;
private boolean deliveryOutstanding;
private SettableFuture<?> deliveryOutstanding;
private CompositeBuffer nextFrame;
private CompositeBuffer unprocessed = new CompositeBuffer();
@ -86,8 +83,14 @@ public class MessageDeframer2 implements Closeable {
/**
* Adds the given data to this deframer and attempts delivery to the sink.
*
* <p>If returned future is not {@code null}, then it completes when no more deliveries are
* occuring. Delivering completes if all available deframing input is consumed or if delivery
* resulted in an exception, in which case this method may throw the exception or the returned
* future will fail with the throwable. The future is guaranteed to complete within the executor
* provided during construction.
*/
public void deframe(Buffer data, boolean endOfStream) {
public ListenableFuture<?> deframe(Buffer data, boolean endOfStream) {
Preconditions.checkNotNull(data, "data");
Preconditions.checkState(this.endOfStream == false, "Past end of stream");
unprocessed.addBuffer(data);
@ -95,8 +98,11 @@ public class MessageDeframer2 implements Closeable {
// Indicate that all of the data for this stream has been received.
this.endOfStream = endOfStream;
// Deliver the next message if not already delivering.
deliver();
if (deliveryOutstanding != null) {
// Only allow one outstanding delivery at a time.
return null;
}
return deliver();
}
@Override
@ -107,25 +113,62 @@ public class MessageDeframer2 implements Closeable {
}
}
public void delayProcessing(ListenableFuture<Void> future) {
Preconditions.checkState(!deliveryOutstanding, "Only one delay allowed concurrently");
if (future != null) {
deliveryOutstanding = true;
// Once future completes, try to deliver the next message.
future.addListener(deliveryTask, executor);
/**
* Consider {@code future} to be a message currently being processed. Messages will not be
* delivered until the future completes. The returned future behaves as if it was returned by
* {@link #deframe(Buffer, boolean)}.
*
* @throws IllegalStateException if a message is already being processed
*/
public ListenableFuture<?> delayProcessing(ListenableFuture<?> future) {
Preconditions.checkState(deliveryOutstanding == null, "Only one delay allowed concurrently");
if (future == null) {
return null;
}
return delayProcessingInternal(future);
}
/**
* If there is no outstanding delivery, attempts to read and deliver as many messages to the
* sink as possible. Only one outstanding delivery is allowed at a time.
* May only be called when a delivery is known not to be outstanding. If deliveryOutstanding is
* non-null, then it will be re-used and this method will return {@code null}.
*/
private void deliver() {
if (deliveryOutstanding) {
// Only allow one outstanding delivery at a time.
return;
private ListenableFuture<?> delayProcessingInternal(ListenableFuture<?> future) {
Preconditions.checkNotNull(future, "future");
// Return a separate future so that our callback is guaranteed to complete before any
// listeners on the returned future.
ListenableFuture<?> returnFuture = null;
if (deliveryOutstanding == null) {
returnFuture = deliveryOutstanding = SettableFuture.create();
}
Futures.addCallback(future, new FutureCallback<Object>() {
@Override
public void onFailure(Throwable t) {
SettableFuture<?> previousOutstanding = deliveryOutstanding;
deliveryOutstanding = null;
previousOutstanding.setException(t);
}
@Override
public void onSuccess(Object result) {
try {
deliver();
} catch (Throwable t) {
if (deliveryOutstanding == null) {
throw Throwables.propagate(t);
} else {
onFailure(t);
}
}
}
}, executor);
return returnFuture;
}
/**
* Reads and delivers as many messages to the sink as possible. May only be called when a delivery
* is known not to be outstanding.
*/
private ListenableFuture<?> deliver() {
// Process the uncompressed bytes.
while (readRequiredBytes()) {
switch (state) {
@ -134,17 +177,13 @@ public class MessageDeframer2 implements Closeable {
break;
case BODY:
// Read the body and deliver the message to the sink.
deliveryOutstanding = true;
ListenableFuture<Void> processingFuture = processBody();
ListenableFuture<?> processingFuture = processBody();
if (processingFuture != null) {
// A future was returned for the completion of processing the delivered
// message. Once it's done, try to deliver the next message.
processingFuture.addListener(deliveryTask, executor);
return;
return delayProcessingInternal(processingFuture);
}
// No future was returned, so assume processing is complete for the delivery.
deliveryOutstanding = false;
break;
default:
throw new AssertionError("Invalid state: " + state);
@ -153,14 +192,19 @@ public class MessageDeframer2 implements Closeable {
if (endOfStream) {
if (nextFrame.readableBytes() != 0) {
// TODO(user): Investigate how this should be propagated, so that stream is aborted and
// application is properly notified of abortion.
throw Status.INTERNAL
.withDescription("Encountered end-of-stream mid-frame")
.asRuntimeException();
}
sink.endOfStream();
}
// All available messagesed processed.
if (deliveryOutstanding != null) {
SettableFuture<?> previousOutstanding = deliveryOutstanding;
deliveryOutstanding = null;
previousOutstanding.set(null);
}
return null;
}
/**
@ -210,8 +254,8 @@ public class MessageDeframer2 implements Closeable {
* Processes the body of the GRPC compression frame. A single compression frame may contain
* several GRPC messages within it.
*/
private ListenableFuture<Void> processBody() {
ListenableFuture<Void> future;
private ListenableFuture<?> processBody() {
ListenableFuture<?> future;
if (compressedFlag) {
if (compression == Compression.NONE) {
throw Status.INTERNAL

View File

@ -1,7 +1,11 @@
package com.google.net.stubby.transport;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
@ -11,6 +15,7 @@ import static org.mockito.Mockito.when;
import com.google.common.io.ByteStreams;
import com.google.common.primitives.Bytes;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.net.stubby.transport.MessageDeframer2.Sink;
@ -24,6 +29,7 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.zip.GZIPOutputStream;
/**
@ -37,7 +43,7 @@ public class MessageDeframer2Test {
@Test
public void simplePayload() {
deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 2, 3, 14}), false);
assertNull(deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 2, 3, 14}), false));
verify(sink).messageRead(messages.capture(), eq(2));
assertEquals(Bytes.asList(new byte[] {3, 14}), bytes(messages));
verifyNoMoreInteractions(sink);
@ -45,7 +51,8 @@ public class MessageDeframer2Test {
@Test
public void smallCombinedPayloads() {
deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3, 0, 0, 0, 0, 2, 14, 15}), false);
assertNull(
deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3, 0, 0, 0, 0, 2, 14, 15}), false));
verify(sink).messageRead(messages.capture(), eq(1));
assertEquals(Bytes.asList(new byte[] {3}), bytes(messages));
verify(sink).messageRead(messages.capture(), eq(2));
@ -55,7 +62,7 @@ public class MessageDeframer2Test {
@Test
public void endOfStreamWithPayloadShouldNotifyEndOfStream() {
deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3}), true);
assertNull(deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3}), true));
verify(sink).messageRead(messages.capture(), eq(1));
assertEquals(Bytes.asList(new byte[] {3}), bytes(messages));
verify(sink).endOfStream();
@ -64,16 +71,16 @@ public class MessageDeframer2Test {
@Test
public void endOfStreamShouldNotifyEndOfStream() {
deframer.deframe(buffer(new byte[0]), true);
assertNull(deframer.deframe(buffer(new byte[0]), true));
verify(sink).endOfStream();
verifyNoMoreInteractions(sink);
}
@Test
public void payloadSplitBetweenBuffers() {
deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 7, 3, 14, 1, 5, 9}), false);
assertNull(deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 7, 3, 14, 1, 5, 9}), false));
verifyNoMoreInteractions(sink);
deframer.deframe(buffer(new byte[] {2, 6}), false);
assertNull(deframer.deframe(buffer(new byte[] {2, 6}), false));
verify(sink).messageRead(messages.capture(), eq(7));
assertEquals(Bytes.asList(new byte[] {3, 14, 1, 5, 9, 2, 6}), bytes(messages));
verifyNoMoreInteractions(sink);
@ -81,9 +88,9 @@ public class MessageDeframer2Test {
@Test
public void frameHeaderSplitBetweenBuffers() {
deframer.deframe(buffer(new byte[] {0, 0}), false);
assertNull(deframer.deframe(buffer(new byte[] {0, 0}), false));
verifyNoMoreInteractions(sink);
deframer.deframe(buffer(new byte[] {0, 0, 1, 3}), false);
assertNull(deframer.deframe(buffer(new byte[] {0, 0, 1, 3}), false));
verify(sink).messageRead(messages.capture(), eq(1));
assertEquals(Bytes.asList(new byte[] {3}), bytes(messages));
verifyNoMoreInteractions(sink);
@ -91,7 +98,7 @@ public class MessageDeframer2Test {
@Test
public void emptyPayload() {
deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 0}), false);
assertNull(deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 0}), false));
verify(sink).messageRead(messages.capture(), eq(0));
assertEquals(Bytes.asList(), bytes(messages));
verifyNoMoreInteractions(sink);
@ -99,8 +106,8 @@ public class MessageDeframer2Test {
@Test
public void largerFrameSize() {
deframer.deframe(
Buffers.wrap(Bytes.concat(new byte[] {0, 0, 0, 3, (byte) 232}, new byte[1000])), false);
assertNull(deframer.deframe(
Buffers.wrap(Bytes.concat(new byte[] {0, 0, 0, 3, (byte) 232}, new byte[1000])), false));
verify(sink).messageRead(messages.capture(), eq(1000));
assertEquals(Bytes.asList(new byte[1000]), bytes(messages));
verifyNoMoreInteractions(sink);
@ -110,31 +117,91 @@ public class MessageDeframer2Test {
public void payloadCallbackShouldWaitForFutureCompletion() {
SettableFuture<Void> messageFuture = SettableFuture.create();
when(sink.messageRead(any(InputStream.class), eq(1))).thenReturn(messageFuture);
deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3, 0, 0, 0, 0, 2, 14, 15}), false);
ListenableFuture<?> deframeFuture
= deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3, 0, 0, 0, 0, 2, 14, 15}), false);
assertNotNull(deframeFuture);
verify(sink).messageRead(messages.capture(), eq(1));
assertEquals(Bytes.asList(new byte[] {3}), bytes(messages));
verifyNoMoreInteractions(sink);
SettableFuture<Void> messageFuture2 = SettableFuture.create();
when(sink.messageRead(any(InputStream.class), eq(2))).thenReturn(messageFuture2);
messageFuture.set(null);
assertFalse(deframeFuture.isDone());
verify(sink).messageRead(messages.capture(), eq(2));
assertEquals(Bytes.asList(new byte[] {14, 15}), bytes(messages));
verifyNoMoreInteractions(sink);
messageFuture2.set(null);
assertTrue(deframeFuture.isDone());
verifyNoMoreInteractions(sink);
}
@Test
public void endOfStreamCallbackShouldWaitForFutureCompletion() {
SettableFuture<Void> messageFuture = SettableFuture.create();
when(sink.messageRead(any(InputStream.class), eq(1))).thenReturn(messageFuture);
deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3}), true);
ListenableFuture<?> deframeFuture
= deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3}), true);
assertNotNull(deframeFuture);
verify(sink).messageRead(messages.capture(), eq(1));
assertEquals(Bytes.asList(new byte[] {3}), bytes(messages));
verifyNoMoreInteractions(sink);
messageFuture.set(null);
assertTrue(deframeFuture.isDone());
verify(sink).endOfStream();
verifyNoMoreInteractions(sink);
}
@Test
public void futureShouldPropagateThrownException() throws InterruptedException {
SettableFuture<Void> messageFuture = SettableFuture.create();
when(sink.messageRead(any(InputStream.class), eq(1))).thenReturn(messageFuture);
ListenableFuture<?> deframeFuture
= deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3, 0, 0, 0, 0, 2, 14, 15}), false);
assertNotNull(deframeFuture);
verify(sink).messageRead(messages.capture(), eq(1));
assertEquals(Bytes.asList(new byte[] {3}), bytes(messages));
verifyNoMoreInteractions(sink);
RuntimeException thrownEx = new RuntimeException();
when(sink.messageRead(any(InputStream.class), eq(2))).thenThrow(thrownEx);
messageFuture.set(null);
verify(sink).messageRead(messages.capture(), eq(2));
assertTrue(deframeFuture.isDone());
try {
deframeFuture.get();
fail("Should have throws ExecutionException");
} catch (ExecutionException ex) {
assertEquals(thrownEx, ex.getCause());
}
verifyNoMoreInteractions(sink);
}
@Test
public void futureFailureShouldStopAndPropagateFailure() throws InterruptedException {
SettableFuture<Void> messageFuture = SettableFuture.create();
when(sink.messageRead(any(InputStream.class), eq(1))).thenReturn(messageFuture);
ListenableFuture<?> deframeFuture
= deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3, 0, 0, 0, 0, 2, 14, 15}), false);
assertNotNull(deframeFuture);
verify(sink).messageRead(messages.capture(), eq(1));
assertEquals(Bytes.asList(new byte[] {3}), bytes(messages));
verifyNoMoreInteractions(sink);
RuntimeException thrownEx = new RuntimeException();
messageFuture.setException(thrownEx);
assertTrue(deframeFuture.isDone());
try {
deframeFuture.get();
fail("Should have throws ExecutionException");
} catch (ExecutionException ex) {
assertEquals(thrownEx, ex.getCause());
}
verifyNoMoreInteractions(sink);
}
@Test
public void compressed() {
deframer = new MessageDeframer2(