From 0f56c47ac8276bcd03cfea82814dae43f9a16d65 Mon Sep 17 00:00:00 2001 From: ejona Date: Thu, 13 Nov 2014 15:16:51 -0800 Subject: [PATCH] Handle exceptions thrown by the application ------------- Created by MOE: http://code.google.com/p/moe-java MOE_MIGRATED_REVID=79895899 --- .../transport/AbstractClientStream.java | 27 ++++- .../transport/AbstractServerStream.java | 20 +++- .../stubby/transport/MessageDeframer2.java | 110 ++++++++++++------ .../transport/MessageDeframer2Test.java | 93 ++++++++++++--- 4 files changed, 200 insertions(+), 50 deletions(-) diff --git a/core/src/main/java/com/google/net/stubby/transport/AbstractClientStream.java b/core/src/main/java/com/google/net/stubby/transport/AbstractClientStream.java index 8fb46cc5df..f0cfffa719 100644 --- a/core/src/main/java/com/google/net/stubby/transport/AbstractClientStream.java +++ b/core/src/main/java/com/google/net/stubby/transport/AbstractClientStream.java @@ -5,6 +5,8 @@ import static com.google.net.stubby.transport.StreamState.OPEN; import static com.google.net.stubby.transport.StreamState.READ_ONLY; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.net.stubby.Metadata; import com.google.net.stubby.Status; @@ -37,6 +39,16 @@ public abstract class AbstractClientStream extends AbstractStream // Stored status & trailers to report when deframer completes. private Status stashedStatus; private Metadata.Trailers stashedTrailers; + private final FutureCallback failureCallback = new FutureCallback() { + @Override + public void onFailure(Throwable t) { + log.log(Level.WARNING, "Exception processing message", t); + cancel(); + } + + @Override + public void onSuccess(Object result) {} + }; protected AbstractClientStream(ClientStreamListener listener, @Nullable Decompressor decompressor, @@ -88,7 +100,10 @@ public abstract class AbstractClientStream extends AbstractStream } inboundPhase(Phase.MESSAGE); if (GRPC_V2_PROTOCOL) { - deframer2.delayProcessing(listener.headersRead(headers)); + ListenableFuture future = deframer2.delayProcessing(listener.headersRead(headers)); + if (future != null) { + Futures.addCallback(future, failureCallback); + } } else { // This is a little broken as it doesn't strictly wait for the last payload handled // by the deframer to be processed by the application layer. Not worth fixing as will @@ -116,7 +131,10 @@ public abstract class AbstractClientStream extends AbstractStream if (!GRPC_V2_PROTOCOL) { deframer.deframe(frame, false); } else { - deframer2.deframe(frame, false); + ListenableFuture future = deframer2.deframe(frame, false); + if (future != null) { + Futures.addCallback(future, failureCallback); + } } } @@ -138,7 +156,10 @@ public abstract class AbstractClientStream extends AbstractStream if (!GRPC_V2_PROTOCOL) { deframer.deframe(Buffers.empty(), true); } else { - deframer2.deframe(Buffers.empty(), true); + ListenableFuture future = deframer2.deframe(Buffers.empty(), true); + if (future != null) { + Futures.addCallback(future, failureCallback); + } } } diff --git a/core/src/main/java/com/google/net/stubby/transport/AbstractServerStream.java b/core/src/main/java/com/google/net/stubby/transport/AbstractServerStream.java index 61a4526c7a..c6056e097b 100644 --- a/core/src/main/java/com/google/net/stubby/transport/AbstractServerStream.java +++ b/core/src/main/java/com/google/net/stubby/transport/AbstractServerStream.java @@ -5,6 +5,8 @@ import static com.google.net.stubby.transport.StreamState.OPEN; import static com.google.net.stubby.transport.StreamState.WRITE_ONLY; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.net.stubby.Metadata; import com.google.net.stubby.Status; @@ -12,6 +14,8 @@ import com.google.net.stubby.Status; import java.io.InputStream; import java.nio.ByteBuffer; import java.util.concurrent.Executor; +import java.util.logging.Level; +import java.util.logging.Logger; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; @@ -21,6 +25,7 @@ import javax.annotation.concurrent.GuardedBy; */ public abstract class AbstractServerStream extends AbstractStream implements ServerStream { + private static final Logger log = Logger.getLogger(AbstractServerStream.class.getName()); private ServerStreamListener listener; @@ -36,6 +41,16 @@ public abstract class AbstractServerStream extends AbstractStream private boolean gracefulClose; /** Saved trailers from close() that need to be sent once the framer has sent all messages. */ private Metadata.Trailers stashedTrailers; + private final FutureCallback failureCallback = new FutureCallback() { + @Override + public void onFailure(Throwable t) { + log.log(Level.WARNING, "Exception processing message", t); + abortStream(Status.fromThrowable(t), true); + } + + @Override + public void onSuccess(Object result) {} + }; protected AbstractServerStream(IdT id, @Nullable Decompressor decompressor, Executor deframerExecutor) { @@ -117,7 +132,10 @@ public abstract class AbstractServerStream extends AbstractStream if (!GRPC_V2_PROTOCOL) { deframer.deframe(frame, endOfStream); } else { - deframer2.deframe(frame, endOfStream); + ListenableFuture future = deframer2.deframe(frame, endOfStream); + if (future != null) { + Futures.addCallback(future, failureCallback); + } } } diff --git a/core/src/main/java/com/google/net/stubby/transport/MessageDeframer2.java b/core/src/main/java/com/google/net/stubby/transport/MessageDeframer2.java index feda090e58..0a63b84ede 100644 --- a/core/src/main/java/com/google/net/stubby/transport/MessageDeframer2.java +++ b/core/src/main/java/com/google/net/stubby/transport/MessageDeframer2.java @@ -1,8 +1,12 @@ package com.google.net.stubby.transport; import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; import com.google.common.io.ByteStreams; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import com.google.net.stubby.Status; import java.io.ByteArrayInputStream; @@ -42,18 +46,11 @@ public class MessageDeframer2 implements Closeable { private final Sink sink; private final Executor executor; private final Compression compression; - private final Runnable deliveryTask = new Runnable() { - @Override - public void run() { - deliveryOutstanding = false; - deliver(); - } - }; private State state = State.HEADER; private int requiredLength = HEADER_LENGTH; private boolean compressedFlag; private boolean endOfStream; - private boolean deliveryOutstanding; + private SettableFuture deliveryOutstanding; private CompositeBuffer nextFrame; private CompositeBuffer unprocessed = new CompositeBuffer(); @@ -86,8 +83,14 @@ public class MessageDeframer2 implements Closeable { /** * Adds the given data to this deframer and attempts delivery to the sink. + * + *

If returned future is not {@code null}, then it completes when no more deliveries are + * occuring. Delivering completes if all available deframing input is consumed or if delivery + * resulted in an exception, in which case this method may throw the exception or the returned + * future will fail with the throwable. The future is guaranteed to complete within the executor + * provided during construction. */ - public void deframe(Buffer data, boolean endOfStream) { + public ListenableFuture deframe(Buffer data, boolean endOfStream) { Preconditions.checkNotNull(data, "data"); Preconditions.checkState(this.endOfStream == false, "Past end of stream"); unprocessed.addBuffer(data); @@ -95,8 +98,11 @@ public class MessageDeframer2 implements Closeable { // Indicate that all of the data for this stream has been received. this.endOfStream = endOfStream; - // Deliver the next message if not already delivering. - deliver(); + if (deliveryOutstanding != null) { + // Only allow one outstanding delivery at a time. + return null; + } + return deliver(); } @Override @@ -107,25 +113,62 @@ public class MessageDeframer2 implements Closeable { } } - public void delayProcessing(ListenableFuture future) { - Preconditions.checkState(!deliveryOutstanding, "Only one delay allowed concurrently"); - if (future != null) { - deliveryOutstanding = true; - // Once future completes, try to deliver the next message. - future.addListener(deliveryTask, executor); + /** + * Consider {@code future} to be a message currently being processed. Messages will not be + * delivered until the future completes. The returned future behaves as if it was returned by + * {@link #deframe(Buffer, boolean)}. + * + * @throws IllegalStateException if a message is already being processed + */ + public ListenableFuture delayProcessing(ListenableFuture future) { + Preconditions.checkState(deliveryOutstanding == null, "Only one delay allowed concurrently"); + if (future == null) { + return null; } + return delayProcessingInternal(future); } /** - * If there is no outstanding delivery, attempts to read and deliver as many messages to the - * sink as possible. Only one outstanding delivery is allowed at a time. + * May only be called when a delivery is known not to be outstanding. If deliveryOutstanding is + * non-null, then it will be re-used and this method will return {@code null}. */ - private void deliver() { - if (deliveryOutstanding) { - // Only allow one outstanding delivery at a time. - return; + private ListenableFuture delayProcessingInternal(ListenableFuture future) { + Preconditions.checkNotNull(future, "future"); + // Return a separate future so that our callback is guaranteed to complete before any + // listeners on the returned future. + ListenableFuture returnFuture = null; + if (deliveryOutstanding == null) { + returnFuture = deliveryOutstanding = SettableFuture.create(); } + Futures.addCallback(future, new FutureCallback() { + @Override + public void onFailure(Throwable t) { + SettableFuture previousOutstanding = deliveryOutstanding; + deliveryOutstanding = null; + previousOutstanding.setException(t); + } + @Override + public void onSuccess(Object result) { + try { + deliver(); + } catch (Throwable t) { + if (deliveryOutstanding == null) { + throw Throwables.propagate(t); + } else { + onFailure(t); + } + } + } + }, executor); + return returnFuture; + } + + /** + * Reads and delivers as many messages to the sink as possible. May only be called when a delivery + * is known not to be outstanding. + */ + private ListenableFuture deliver() { // Process the uncompressed bytes. while (readRequiredBytes()) { switch (state) { @@ -134,17 +177,13 @@ public class MessageDeframer2 implements Closeable { break; case BODY: // Read the body and deliver the message to the sink. - deliveryOutstanding = true; - ListenableFuture processingFuture = processBody(); + ListenableFuture processingFuture = processBody(); if (processingFuture != null) { // A future was returned for the completion of processing the delivered // message. Once it's done, try to deliver the next message. - processingFuture.addListener(deliveryTask, executor); - return; + return delayProcessingInternal(processingFuture); } - // No future was returned, so assume processing is complete for the delivery. - deliveryOutstanding = false; break; default: throw new AssertionError("Invalid state: " + state); @@ -153,14 +192,19 @@ public class MessageDeframer2 implements Closeable { if (endOfStream) { if (nextFrame.readableBytes() != 0) { - // TODO(user): Investigate how this should be propagated, so that stream is aborted and - // application is properly notified of abortion. throw Status.INTERNAL .withDescription("Encountered end-of-stream mid-frame") .asRuntimeException(); } sink.endOfStream(); } + // All available messagesed processed. + if (deliveryOutstanding != null) { + SettableFuture previousOutstanding = deliveryOutstanding; + deliveryOutstanding = null; + previousOutstanding.set(null); + } + return null; } /** @@ -210,8 +254,8 @@ public class MessageDeframer2 implements Closeable { * Processes the body of the GRPC compression frame. A single compression frame may contain * several GRPC messages within it. */ - private ListenableFuture processBody() { - ListenableFuture future; + private ListenableFuture processBody() { + ListenableFuture future; if (compressedFlag) { if (compression == Compression.NONE) { throw Status.INTERNAL diff --git a/core/src/test/java/com/google/net/stubby/transport/MessageDeframer2Test.java b/core/src/test/java/com/google/net/stubby/transport/MessageDeframer2Test.java index 768c0976b4..d00806ad1d 100644 --- a/core/src/test/java/com/google/net/stubby/transport/MessageDeframer2Test.java +++ b/core/src/test/java/com/google/net/stubby/transport/MessageDeframer2Test.java @@ -1,7 +1,11 @@ package com.google.net.stubby.transport; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; @@ -11,6 +15,7 @@ import static org.mockito.Mockito.when; import com.google.common.io.ByteStreams; import com.google.common.primitives.Bytes; +import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import com.google.net.stubby.transport.MessageDeframer2.Sink; @@ -24,6 +29,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.zip.GZIPOutputStream; /** @@ -37,7 +43,7 @@ public class MessageDeframer2Test { @Test public void simplePayload() { - deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 2, 3, 14}), false); + assertNull(deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 2, 3, 14}), false)); verify(sink).messageRead(messages.capture(), eq(2)); assertEquals(Bytes.asList(new byte[] {3, 14}), bytes(messages)); verifyNoMoreInteractions(sink); @@ -45,7 +51,8 @@ public class MessageDeframer2Test { @Test public void smallCombinedPayloads() { - deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3, 0, 0, 0, 0, 2, 14, 15}), false); + assertNull( + deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3, 0, 0, 0, 0, 2, 14, 15}), false)); verify(sink).messageRead(messages.capture(), eq(1)); assertEquals(Bytes.asList(new byte[] {3}), bytes(messages)); verify(sink).messageRead(messages.capture(), eq(2)); @@ -55,7 +62,7 @@ public class MessageDeframer2Test { @Test public void endOfStreamWithPayloadShouldNotifyEndOfStream() { - deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3}), true); + assertNull(deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3}), true)); verify(sink).messageRead(messages.capture(), eq(1)); assertEquals(Bytes.asList(new byte[] {3}), bytes(messages)); verify(sink).endOfStream(); @@ -64,16 +71,16 @@ public class MessageDeframer2Test { @Test public void endOfStreamShouldNotifyEndOfStream() { - deframer.deframe(buffer(new byte[0]), true); + assertNull(deframer.deframe(buffer(new byte[0]), true)); verify(sink).endOfStream(); verifyNoMoreInteractions(sink); } @Test public void payloadSplitBetweenBuffers() { - deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 7, 3, 14, 1, 5, 9}), false); + assertNull(deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 7, 3, 14, 1, 5, 9}), false)); verifyNoMoreInteractions(sink); - deframer.deframe(buffer(new byte[] {2, 6}), false); + assertNull(deframer.deframe(buffer(new byte[] {2, 6}), false)); verify(sink).messageRead(messages.capture(), eq(7)); assertEquals(Bytes.asList(new byte[] {3, 14, 1, 5, 9, 2, 6}), bytes(messages)); verifyNoMoreInteractions(sink); @@ -81,9 +88,9 @@ public class MessageDeframer2Test { @Test public void frameHeaderSplitBetweenBuffers() { - deframer.deframe(buffer(new byte[] {0, 0}), false); + assertNull(deframer.deframe(buffer(new byte[] {0, 0}), false)); verifyNoMoreInteractions(sink); - deframer.deframe(buffer(new byte[] {0, 0, 1, 3}), false); + assertNull(deframer.deframe(buffer(new byte[] {0, 0, 1, 3}), false)); verify(sink).messageRead(messages.capture(), eq(1)); assertEquals(Bytes.asList(new byte[] {3}), bytes(messages)); verifyNoMoreInteractions(sink); @@ -91,7 +98,7 @@ public class MessageDeframer2Test { @Test public void emptyPayload() { - deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 0}), false); + assertNull(deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 0}), false)); verify(sink).messageRead(messages.capture(), eq(0)); assertEquals(Bytes.asList(), bytes(messages)); verifyNoMoreInteractions(sink); @@ -99,8 +106,8 @@ public class MessageDeframer2Test { @Test public void largerFrameSize() { - deframer.deframe( - Buffers.wrap(Bytes.concat(new byte[] {0, 0, 0, 3, (byte) 232}, new byte[1000])), false); + assertNull(deframer.deframe( + Buffers.wrap(Bytes.concat(new byte[] {0, 0, 0, 3, (byte) 232}, new byte[1000])), false)); verify(sink).messageRead(messages.capture(), eq(1000)); assertEquals(Bytes.asList(new byte[1000]), bytes(messages)); verifyNoMoreInteractions(sink); @@ -110,31 +117,91 @@ public class MessageDeframer2Test { public void payloadCallbackShouldWaitForFutureCompletion() { SettableFuture messageFuture = SettableFuture.create(); when(sink.messageRead(any(InputStream.class), eq(1))).thenReturn(messageFuture); - deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3, 0, 0, 0, 0, 2, 14, 15}), false); + ListenableFuture deframeFuture + = deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3, 0, 0, 0, 0, 2, 14, 15}), false); + assertNotNull(deframeFuture); verify(sink).messageRead(messages.capture(), eq(1)); assertEquals(Bytes.asList(new byte[] {3}), bytes(messages)); verifyNoMoreInteractions(sink); + SettableFuture messageFuture2 = SettableFuture.create(); + when(sink.messageRead(any(InputStream.class), eq(2))).thenReturn(messageFuture2); messageFuture.set(null); + assertFalse(deframeFuture.isDone()); verify(sink).messageRead(messages.capture(), eq(2)); assertEquals(Bytes.asList(new byte[] {14, 15}), bytes(messages)); verifyNoMoreInteractions(sink); + + messageFuture2.set(null); + assertTrue(deframeFuture.isDone()); + verifyNoMoreInteractions(sink); } @Test public void endOfStreamCallbackShouldWaitForFutureCompletion() { SettableFuture messageFuture = SettableFuture.create(); when(sink.messageRead(any(InputStream.class), eq(1))).thenReturn(messageFuture); - deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3}), true); + ListenableFuture deframeFuture + = deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3}), true); + assertNotNull(deframeFuture); verify(sink).messageRead(messages.capture(), eq(1)); assertEquals(Bytes.asList(new byte[] {3}), bytes(messages)); verifyNoMoreInteractions(sink); messageFuture.set(null); + assertTrue(deframeFuture.isDone()); verify(sink).endOfStream(); verifyNoMoreInteractions(sink); } + @Test + public void futureShouldPropagateThrownException() throws InterruptedException { + SettableFuture messageFuture = SettableFuture.create(); + when(sink.messageRead(any(InputStream.class), eq(1))).thenReturn(messageFuture); + ListenableFuture deframeFuture + = deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3, 0, 0, 0, 0, 2, 14, 15}), false); + assertNotNull(deframeFuture); + verify(sink).messageRead(messages.capture(), eq(1)); + assertEquals(Bytes.asList(new byte[] {3}), bytes(messages)); + verifyNoMoreInteractions(sink); + + RuntimeException thrownEx = new RuntimeException(); + when(sink.messageRead(any(InputStream.class), eq(2))).thenThrow(thrownEx); + messageFuture.set(null); + verify(sink).messageRead(messages.capture(), eq(2)); + assertTrue(deframeFuture.isDone()); + try { + deframeFuture.get(); + fail("Should have throws ExecutionException"); + } catch (ExecutionException ex) { + assertEquals(thrownEx, ex.getCause()); + } + verifyNoMoreInteractions(sink); + } + + @Test + public void futureFailureShouldStopAndPropagateFailure() throws InterruptedException { + SettableFuture messageFuture = SettableFuture.create(); + when(sink.messageRead(any(InputStream.class), eq(1))).thenReturn(messageFuture); + ListenableFuture deframeFuture + = deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3, 0, 0, 0, 0, 2, 14, 15}), false); + assertNotNull(deframeFuture); + verify(sink).messageRead(messages.capture(), eq(1)); + assertEquals(Bytes.asList(new byte[] {3}), bytes(messages)); + verifyNoMoreInteractions(sink); + + RuntimeException thrownEx = new RuntimeException(); + messageFuture.setException(thrownEx); + assertTrue(deframeFuture.isDone()); + try { + deframeFuture.get(); + fail("Should have throws ExecutionException"); + } catch (ExecutionException ex) { + assertEquals(thrownEx, ex.getCause()); + } + verifyNoMoreInteractions(sink); + } + @Test public void compressed() { deframer = new MessageDeframer2(