diff --git a/core/src/main/java/com/google/net/stubby/Call.java b/core/src/main/java/com/google/net/stubby/Call.java index b8f6986897..61ba2a8ef2 100644 --- a/core/src/main/java/com/google/net/stubby/Call.java +++ b/core/src/main/java/com/google/net/stubby/Call.java @@ -31,9 +31,6 @@ package com.google.net.stubby; -import com.google.common.util.concurrent.ListenableFuture; - -import javax.annotation.Nullable; /** * Low-level methods for communicating with a remote server during a single RPC. Unlike normal RPCs, @@ -69,14 +66,13 @@ public abstract class Call { * This method is always called, if no headers were received then an empty {@link Metadata} * is passed. */ - public abstract ListenableFuture onHeaders(Metadata.Headers headers); + public abstract void onHeaders(Metadata.Headers headers); /** * A response payload has been received. For streaming calls, there may be zero payload * messages. */ - @Nullable - public abstract ListenableFuture onPayload(T payload); + public abstract void onPayload(T payload); /** * The Call has been closed. No further sending or receiving can occur. If {@code status} is @@ -97,6 +93,22 @@ public abstract class Call { // TODO(lryan): Might be better to put into Channel#newCall, might reduce decoration burden public abstract void start(Listener responseListener, Metadata.Headers headers); + /** + * Requests up to the given number of messages from the call to be delivered to + * {@link Listener#onPayload(Object)}. No additional messages will be delivered. + * + *

Message delivery is guaranteed to be sequential in the order received. In addition, the + * listener methods will not be accessed concurrently. While it is not guaranteed that the same + * thread will always be used, it is guaranteed that only a single thread will access the listener + * at a time. + * + *

If it is desired to bypass inbound flow control, a very large number of messages can be + * specified (e.g. {@link Integer#MAX_VALUE}). + * + * @param numMessages the requested number of messages to be delivered to the listener. + */ + public abstract void request(int numMessages); + /** * Prevent any further processing for this Call. No further messages may be sent or will be * received. The server is informed of cancellations, but may not stop processing the call. diff --git a/core/src/main/java/com/google/net/stubby/ChannelImpl.java b/core/src/main/java/com/google/net/stubby/ChannelImpl.java index 839d16da69..9b5722b63a 100644 --- a/core/src/main/java/com/google/net/stubby/ChannelImpl.java +++ b/core/src/main/java/com/google/net/stubby/ChannelImpl.java @@ -32,13 +32,10 @@ package com.google.net.stubby; import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.base.Throwables; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Service.Listener; import com.google.common.util.concurrent.Service.State; -import com.google.common.util.concurrent.SettableFuture; import com.google.net.stubby.transport.ClientStream; import com.google.net.stubby.transport.ClientStreamListener; import com.google.net.stubby.transport.ClientTransport; @@ -48,7 +45,6 @@ import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; import java.util.Collection; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.logging.Level; @@ -68,6 +64,7 @@ public final class ChannelImpl implements Channel { @Override public void flush() {} @Override public void cancel() {} @Override public void halfClose() {} + @Override public void request(int numMessages) {} } private final ClientTransportFactory transportFactory; @@ -270,6 +267,11 @@ public final class ChannelImpl implements Channel { } } + @Override + public void request(int numMessages) { + stream.request(numMessages); + } + @Override public void cancel() { // Cancel is called in exception handling cases, so it may be the case that the @@ -311,58 +313,51 @@ public final class ChannelImpl implements Channel { private class ClientStreamListenerImpl implements ClientStreamListener { private final Listener observer; + private boolean closed; public ClientStreamListenerImpl(Listener observer) { Preconditions.checkNotNull(observer); this.observer = observer; } - private ListenableFuture dispatchCallable( - final Callable> callable) { - final SettableFuture ours = SettableFuture.create(); + @Override + public void headersRead(final Metadata.Headers headers) { callExecutor.execute(new Runnable() { @Override public void run() { try { - ListenableFuture theirs = callable.call(); - if (theirs == null) { - ours.set(null); - } else { - Futures.addCallback(theirs, new FutureCallback() { - @Override - public void onSuccess(Void result) { - ours.set(null); - } - @Override - public void onFailure(Throwable t) { - ours.setException(t); - } - }, MoreExecutors.directExecutor()); + if (closed) { + return; } + + observer.onHeaders(headers); } catch (Throwable t) { - ours.setException(t); + cancel(); + throw Throwables.propagate(t); } } }); - return ours; } @Override - public ListenableFuture headersRead(final Metadata.Headers headers) { - return dispatchCallable(new Callable>() { + public void messageRead(final InputStream message, final int length) { + callExecutor.execute(new Runnable() { @Override - public ListenableFuture call() throws Exception { - return observer.onHeaders(headers); - } - }); - } + public void run() { + try { + if (closed) { + return; + } - @Override - public ListenableFuture messageRead(final InputStream message, final int length) { - return dispatchCallable(new Callable>() { - @Override - public ListenableFuture call() { - return observer.onPayload(method.parseResponse(message)); + try { + observer.onPayload(method.parseResponse(message)); + } finally { + message.close(); + } + } catch (Throwable t) { + cancel(); + throw Throwables.propagate(t); + } } }); } @@ -372,6 +367,7 @@ public final class ChannelImpl implements Channel { callExecutor.execute(new Runnable() { @Override public void run() { + closed = true; observer.onClose(status, trailers); } }); diff --git a/core/src/main/java/com/google/net/stubby/ClientInterceptors.java b/core/src/main/java/com/google/net/stubby/ClientInterceptors.java index 636f9031bf..1e0248b3f8 100644 --- a/core/src/main/java/com/google/net/stubby/ClientInterceptors.java +++ b/core/src/main/java/com/google/net/stubby/ClientInterceptors.java @@ -33,7 +33,6 @@ package com.google.net.stubby; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; -import com.google.common.util.concurrent.ListenableFuture; import java.util.Arrays; import java.util.Iterator; @@ -121,6 +120,11 @@ public class ClientInterceptors { this.delegate.start(responseListener, headers); } + @Override + public void request(int numMessages) { + this.delegate.request(numMessages); + } + @Override public void cancel() { this.delegate.cancel(); @@ -150,13 +154,13 @@ public class ClientInterceptors { } @Override - public ListenableFuture onHeaders(Metadata.Headers headers) { - return delegate.onHeaders(headers); + public void onHeaders(Metadata.Headers headers) { + delegate.onHeaders(headers); } @Override - public ListenableFuture onPayload(T payload) { - return delegate.onPayload(payload); + public void onPayload(T payload) { + delegate.onPayload(payload); } @Override diff --git a/core/src/main/java/com/google/net/stubby/ServerCall.java b/core/src/main/java/com/google/net/stubby/ServerCall.java index e9253493cd..8efdb2bd62 100644 --- a/core/src/main/java/com/google/net/stubby/ServerCall.java +++ b/core/src/main/java/com/google/net/stubby/ServerCall.java @@ -31,9 +31,6 @@ package com.google.net.stubby; -import com.google.common.util.concurrent.ListenableFuture; - -import javax.annotation.Nullable; /** * Low-level method for communicating with a remote client during a single RPC. Unlike normal RPCs, @@ -67,8 +64,7 @@ public abstract class ServerCall { * A request payload has been received. For streaming calls, there may be zero payload * messages. */ - @Nullable - public abstract ListenableFuture onPayload(RequestT payload); + public abstract void onPayload(RequestT payload); /** * The client completed all message sending. However, the call may still be cancelled. @@ -93,6 +89,14 @@ public abstract class ServerCall { public abstract void onComplete(); } + /** + * Requests up to the given number of messages from the call to be delivered to + * {@link Listener#onPayload(Object)}. No additional messages will be delivered. + * + * @param numMessages the requested number of messages to be delivered to the listener. + */ + public abstract void request(int numMessages); + /** * Send response header metadata prior to sending a response payload. This method may * only be called once and cannot be called after calls to {@code Stream#sendPayload} diff --git a/core/src/main/java/com/google/net/stubby/ServerImpl.java b/core/src/main/java/com/google/net/stubby/ServerImpl.java index f7cd2e9a9f..df933fe00e 100644 --- a/core/src/main/java/com/google/net/stubby/ServerImpl.java +++ b/core/src/main/java/com/google/net/stubby/ServerImpl.java @@ -34,26 +34,20 @@ package com.google.net.stubby; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.util.concurrent.AbstractService; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Service; -import com.google.common.util.concurrent.SettableFuture; import com.google.net.stubby.transport.ServerListener; import com.google.net.stubby.transport.ServerStream; import com.google.net.stubby.transport.ServerStreamListener; import com.google.net.stubby.transport.ServerTransportListener; +import java.io.IOException; import java.io.InputStream; import java.util.Collection; import java.util.Collections; import java.util.HashSet; -import java.util.concurrent.Callable; import java.util.concurrent.Executor; -import javax.annotation.Nullable; - /** * Default implementation of {@link Server}, for creation by transports. * @@ -299,9 +293,12 @@ public class ServerImpl extends AbstractService implements Server { private static class NoopListener implements ServerStreamListener { @Override - @Nullable - public ListenableFuture messageRead(InputStream value, int length) { - return null; + public void messageRead(InputStream value, int length) { + try { + value.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } } @Override @@ -349,12 +346,16 @@ public class ServerImpl extends AbstractService implements Server { } @Override - @Nullable - public ListenableFuture messageRead(final InputStream message, final int length) { - return dispatchCallable(new Callable>() { + public void messageRead(final InputStream message, final int length) { + callExecutor.execute(new Runnable() { @Override - public ListenableFuture call() { - return getListener().messageRead(message, length); + public void run() { + try { + getListener().messageRead(message, length); + } catch (Throwable t) { + internalClose(Status.fromThrowable(t), new Metadata.Trailers()); + throw Throwables.propagate(t); + } } }); } @@ -383,36 +384,6 @@ public class ServerImpl extends AbstractService implements Server { } }); } - - private ListenableFuture dispatchCallable( - final Callable> callable) { - final SettableFuture ours = SettableFuture.create(); - callExecutor.execute(new Runnable() { - @Override - public void run() { - try { - ListenableFuture theirs = callable.call(); - if (theirs == null) { - ours.set(null); - } else { - Futures.addCallback(theirs, new FutureCallback() { - @Override - public void onSuccess(Void result) { - ours.set(null); - } - @Override - public void onFailure(Throwable t) { - ours.setException(t); - } - }, MoreExecutors.directExecutor()); - } - } catch (Throwable t) { - ours.setException(t); - } - } - }); - return ours; - } } private class ServerCallImpl extends ServerCall { @@ -425,6 +396,11 @@ public class ServerImpl extends AbstractService implements Server { this.methodDef = methodDef; } + @Override + public void request(int numMessages) { + stream.request(numMessages); + } + @Override public void sendHeaders(Metadata.Headers headers) { stream.writeHeaders(headers); @@ -468,13 +444,28 @@ public class ServerImpl extends AbstractService implements Server { } @Override - @Nullable - public ListenableFuture messageRead(final InputStream message, int length) { - return listener.onPayload(methodDef.parseRequest(message)); + public void messageRead(final InputStream message, int length) { + if (cancelled) { + return; + } + + try { + listener.onPayload(methodDef.parseRequest(message)); + } finally { + try { + message.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } } @Override public void halfClosed() { + if (cancelled) { + return; + } + listener.onHalfClose(); } diff --git a/core/src/main/java/com/google/net/stubby/ServerInterceptors.java b/core/src/main/java/com/google/net/stubby/ServerInterceptors.java index a17c6ac811..8401c0903c 100644 --- a/core/src/main/java/com/google/net/stubby/ServerInterceptors.java +++ b/core/src/main/java/com/google/net/stubby/ServerInterceptors.java @@ -144,6 +144,11 @@ public class ServerInterceptors { this.delegate = delegate; } + @Override + public void request(int numMessages) { + delegate.request(numMessages); + } + @Override public void sendHeaders(Metadata.Headers headers) { delegate.sendHeaders(headers); diff --git a/core/src/main/java/com/google/net/stubby/transport/AbstractClientStream.java b/core/src/main/java/com/google/net/stubby/transport/AbstractClientStream.java index 053b057dad..3cf17ef71f 100644 --- a/core/src/main/java/com/google/net/stubby/transport/AbstractClientStream.java +++ b/core/src/main/java/com/google/net/stubby/transport/AbstractClientStream.java @@ -33,14 +33,11 @@ package com.google.net.stubby.transport; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; import com.google.net.stubby.Metadata; import com.google.net.stubby.Status; import java.io.InputStream; import java.nio.ByteBuffer; -import java.util.concurrent.Executor; import java.util.logging.Level; import java.util.logging.Logger; @@ -53,7 +50,6 @@ public abstract class AbstractClientStream extends AbstractStream implements ClientStream { private static final Logger log = Logger.getLogger(AbstractClientStream.class.getName()); - private static final ListenableFuture COMPLETED_FUTURE = Futures.immediateFuture(null); private final ClientStreamListener listener; private boolean listenerClosed; @@ -65,17 +61,15 @@ public abstract class AbstractClientStream extends AbstractStream private Runnable closeListenerTask; - protected AbstractClientStream(ClientStreamListener listener, Executor deframerExecutor) { - super(deframerExecutor); + protected AbstractClientStream(ClientStreamListener listener) { this.listener = Preconditions.checkNotNull(listener); } @Override - protected ListenableFuture receiveMessage(InputStream is, int length) { - if (listenerClosed) { - return COMPLETED_FUTURE; + protected void receiveMessage(InputStream is, int length) { + if (!listenerClosed) { + listener.messageRead(is, length); } - return listener.messageRead(is, length); } @Override @@ -114,7 +108,7 @@ public abstract class AbstractClientStream extends AbstractStream new Object[]{id(), headers}); } inboundPhase(Phase.MESSAGE); - delayDeframer(listener.headersRead(headers)); + listener.headersRead(headers); } /** @@ -208,7 +202,7 @@ public abstract class AbstractClientStream extends AbstractStream closeListenerTask = null; // Determine if the deframer is stalled (i.e. currently has no complete messages to deliver). - boolean deliveryStalled = !deframer.isDeliveryOutstanding(); + boolean deliveryStalled = deframer.isStalled(); if (stopDelivery || deliveryStalled) { // Close the listener immediately. diff --git a/core/src/main/java/com/google/net/stubby/transport/AbstractServerStream.java b/core/src/main/java/com/google/net/stubby/transport/AbstractServerStream.java index 7dd1b53dce..ab1c9a639d 100644 --- a/core/src/main/java/com/google/net/stubby/transport/AbstractServerStream.java +++ b/core/src/main/java/com/google/net/stubby/transport/AbstractServerStream.java @@ -32,13 +32,11 @@ package com.google.net.stubby.transport; import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.ListenableFuture; import com.google.net.stubby.Metadata; import com.google.net.stubby.Status; import java.io.InputStream; import java.nio.ByteBuffer; -import java.util.concurrent.Executor; import java.util.logging.Level; import java.util.logging.Logger; @@ -63,8 +61,7 @@ public abstract class AbstractServerStream extends AbstractStream /** Saved trailers from close() that need to be sent once the framer has sent all messages. */ private Metadata.Trailers stashedTrailers; - protected AbstractServerStream(IdT id, Executor deframerExecutor) { - super(deframerExecutor); + protected AbstractServerStream(IdT id) { id(id); } @@ -73,9 +70,9 @@ public abstract class AbstractServerStream extends AbstractStream } @Override - protected ListenableFuture receiveMessage(InputStream is, int length) { + protected void receiveMessage(InputStream is, int length) { inboundPhase(Phase.MESSAGE); - return listener.messageRead(is, length); + listener.messageRead(is, length); } @Override diff --git a/core/src/main/java/com/google/net/stubby/transport/AbstractStream.java b/core/src/main/java/com/google/net/stubby/transport/AbstractStream.java index bed0372cf6..8959408007 100644 --- a/core/src/main/java/com/google/net/stubby/transport/AbstractStream.java +++ b/core/src/main/java/com/google/net/stubby/transport/AbstractStream.java @@ -34,15 +34,9 @@ package com.google.net.stubby.transport; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; -import com.google.common.io.Closeables; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; import java.io.InputStream; import java.nio.ByteBuffer; -import java.util.concurrent.Executor; import javax.annotation.Nullable; @@ -59,15 +53,6 @@ public abstract class AbstractStream implements Stream { private volatile IdT id; private final MessageFramer framer; - private final FutureCallback deframerErrorCallback = new FutureCallback() { - @Override - public void onSuccess(Object result) {} - - @Override - public void onFailure(Throwable t) { - deframeFailed(t); - } - }; final MessageDeframer deframer; @@ -81,7 +66,7 @@ public abstract class AbstractStream implements Stream { */ private Phase outboundPhase = Phase.HEADERS; - AbstractStream(Executor deframerExecutor) { + AbstractStream() { MessageDeframer.Listener inboundMessageHandler = new MessageDeframer.Listener() { @Override public void bytesRead(int numBytes) { @@ -89,14 +74,8 @@ public abstract class AbstractStream implements Stream { } @Override - public ListenableFuture messageRead(InputStream input, final int length) { - ListenableFuture future = null; - try { - future = receiveMessage(input, length); - return future; - } finally { - closeWhenDone(future, input); - } + public void messageRead(InputStream input, final int length) { + receiveMessage(input, length); } @Override @@ -117,7 +96,7 @@ public abstract class AbstractStream implements Stream { }; framer = new MessageFramer(outboundFrameHandler, 4096); - this.deframer = new MessageDeframer(inboundMessageHandler, deframerExecutor); + this.deframer = new MessageDeframer(inboundMessageHandler); } /** @@ -194,7 +173,7 @@ public abstract class AbstractStream implements Stream { protected abstract void internalSendFrame(ByteBuffer frame, boolean endOfStream); /** A message was deframed. */ - protected abstract ListenableFuture receiveMessage(InputStream is, int length); + protected abstract void receiveMessage(InputStream is, int length); /** Deframer has no pending deliveries. */ protected abstract void inboundDeliveryPaused(); @@ -215,23 +194,25 @@ public abstract class AbstractStream implements Stream { /** * Called to parse a received frame and attempt delivery of any completed - * messages. + * messages. Must be called from the transport thread. */ protected final void deframe(Buffer frame, boolean endOfStream) { - ListenableFuture future; - future = deframer.deframe(frame, endOfStream); - if (future != null) { - Futures.addCallback(future, deframerErrorCallback); + try { + deframer.deframe(frame, endOfStream); + } catch (Throwable t) { + deframeFailed(t); } } /** - * Delays delivery from the deframer until the given future completes. + * Called to request the given number of messages from the deframer. Must be called + * from the transport thread. */ - protected final void delayDeframer(ListenableFuture future) { - ListenableFuture deliveryFuture = deframer.delayProcessing(future); - if (deliveryFuture != null) { - Futures.addCallback(deliveryFuture, deframerErrorCallback); + protected final void requestMessagesFromDeframer(int numMessages) { + try { + deframer.request(numMessages); + } catch (Throwable t) { + deframeFailed(t); } } @@ -271,26 +252,6 @@ public abstract class AbstractStream implements Stream { return nextPhase; } - /** - * If the given future is provided, closes the {@link InputStream} when it completes. Otherwise - * the {@link InputStream} is closed immediately. - */ - private static void closeWhenDone(@Nullable ListenableFuture future, - final InputStream input) { - if (future == null) { - Closeables.closeQuietly(input); - return; - } - - // Close the buffer when the future completes. - future.addListener(new Runnable() { - @Override - public void run() { - Closeables.closeQuietly(input); - } - }, MoreExecutors.directExecutor()); - } - /** * Can the stream receive data from its remote peer. */ diff --git a/core/src/main/java/com/google/net/stubby/transport/ClientStream.java b/core/src/main/java/com/google/net/stubby/transport/ClientStream.java index 3187f3ce81..188fc845a7 100644 --- a/core/src/main/java/com/google/net/stubby/transport/ClientStream.java +++ b/core/src/main/java/com/google/net/stubby/transport/ClientStream.java @@ -49,5 +49,4 @@ public interface ClientStream extends Stream { * the remote end-point is closed. */ void halfClose(); - } diff --git a/core/src/main/java/com/google/net/stubby/transport/ClientStreamListener.java b/core/src/main/java/com/google/net/stubby/transport/ClientStreamListener.java index a1f8d987df..0ec84ce364 100644 --- a/core/src/main/java/com/google/net/stubby/transport/ClientStreamListener.java +++ b/core/src/main/java/com/google/net/stubby/transport/ClientStreamListener.java @@ -31,12 +31,9 @@ package com.google.net.stubby.transport; -import com.google.common.util.concurrent.ListenableFuture; import com.google.net.stubby.Metadata; import com.google.net.stubby.Status; -import javax.annotation.Nullable; - /** An observer of client-side stream events. */ public interface ClientStreamListener extends StreamListener { /** @@ -48,11 +45,8 @@ public interface ClientStreamListener extends StreamListener { *

This method should return quickly, as the same thread may be used to process other streams. * * @param headers the fully buffered received headers. - * @return a processing completion future, or {@code null} to indicate that processing of the - * headers is immediately complete. */ - @Nullable - ListenableFuture headersRead(Metadata.Headers headers); + void headersRead(Metadata.Headers headers); /** * Called when the stream is fully closed. {@link diff --git a/core/src/main/java/com/google/net/stubby/transport/Http2ClientStream.java b/core/src/main/java/com/google/net/stubby/transport/Http2ClientStream.java index 682b184f06..9c8cdb1438 100644 --- a/core/src/main/java/com/google/net/stubby/transport/Http2ClientStream.java +++ b/core/src/main/java/com/google/net/stubby/transport/Http2ClientStream.java @@ -37,7 +37,6 @@ import com.google.net.stubby.Metadata; import com.google.net.stubby.Status; import java.nio.charset.Charset; -import java.util.concurrent.Executor; import javax.annotation.Nullable; @@ -70,8 +69,8 @@ public abstract class Http2ClientStream extends AbstractClientStream { private Charset errorCharset = Charsets.UTF_8; private boolean contentTypeChecked; - protected Http2ClientStream(ClientStreamListener listener, Executor deframerExecutor) { - super(listener, deframerExecutor); + protected Http2ClientStream(ClientStreamListener listener) { + super(listener); } protected void transportHeadersReceived(Metadata.Headers headers) { diff --git a/core/src/main/java/com/google/net/stubby/transport/MessageDeframer.java b/core/src/main/java/com/google/net/stubby/transport/MessageDeframer.java index 162c025ae6..42b586593e 100644 --- a/core/src/main/java/com/google/net/stubby/transport/MessageDeframer.java +++ b/core/src/main/java/com/google/net/stubby/transport/MessageDeframer.java @@ -32,19 +32,13 @@ package com.google.net.stubby.transport; import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; import com.google.common.io.ByteStreams; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; import com.google.net.stubby.Status; import java.io.ByteArrayInputStream; import java.io.Closeable; import java.io.IOException; import java.io.InputStream; -import java.util.concurrent.Executor; import java.util.zip.GZIPInputStream; import javax.annotation.concurrent.NotThreadSafe; @@ -52,8 +46,8 @@ import javax.annotation.concurrent.NotThreadSafe; /** * Deframer for GRPC frames. * - *

This class is not thread-safe. All calls to this class must be made in the context of the - * executor provided during creation. That executor must not allow concurrent execution of tasks. + *

This class is not thread-safe. All calls to public methods should be made in the transport + * thread. */ @NotThreadSafe public class MessageDeframer implements Closeable { @@ -82,11 +76,8 @@ public class MessageDeframer implements Closeable { * * @param is stream containing the message. * @param length the length in bytes of the message. - * @return a future indicating when the application has completed processing the message. The - * next delivery will not occur until this future completes. If {@code null}, it is assumed that - * the application has completed processing the message upon returning from the method call. */ - ListenableFuture messageRead(InputStream is, int length); + void messageRead(InputStream is, int length); /** * Called when end-of-stream has not yet been reached but there are no complete messages @@ -105,65 +96,67 @@ public class MessageDeframer implements Closeable { } private final Listener listener; - private final Executor executor; private final Compression compression; private State state = State.HEADER; private int requiredLength = HEADER_LENGTH; private boolean compressedFlag; private boolean endOfStream; - private SettableFuture deliveryOutstanding; private CompositeBuffer nextFrame; private CompositeBuffer unprocessed = new CompositeBuffer(); + private long pendingDeliveries; + private boolean deliveryStalled = true; /** - * Create a deframer. All calls to this class must be made in the context of the provided - * executor, which also must not allow concurrent processing of Runnables. Compression will not be - * supported. + * Create a deframer. Compression will not be supported. * * @param listener listener for deframer events. - * @param executor used for internal event processing */ - public MessageDeframer(Listener listener, Executor executor) { - this(listener, executor, Compression.NONE); + public MessageDeframer(Listener listener) { + this(listener, Compression.NONE); } /** - * Create a deframer. All calls to this class must be made in the context of the provided - * executor, which also must not allow concurrent processing of Runnables. + * Create a deframer. * * @param listener listener for deframer events. - * @param executor used for internal event processing * @param compression the compression used if a compressed frame is encountered, with NONE meaning * unsupported */ - public MessageDeframer(Listener listener, Executor executor, Compression compression) { + public MessageDeframer(Listener listener, Compression compression) { this.listener = Preconditions.checkNotNull(listener, "sink"); - this.executor = Preconditions.checkNotNull(executor, "executor"); this.compression = Preconditions.checkNotNull(compression, "compression"); } /** - * Adds the given data to this deframer and attempts delivery to the sink. + * Requests up to the given number of messages from the call to be delivered to + * {@link Listener#messageRead(InputStream, int)}. No additional messages will be delivered. * - *

If returned future is not {@code null}, then it completes when no more deliveries are - * occuring. Delivering completes if all available deframing input is consumed or if delivery - * resulted in an exception, in which case this method may throw the exception or the returned - * future will fail with the throwable. The future is guaranteed to complete within the executor - * provided during construction. + * @param numMessages the requested number of messages to be delivered to the listener. */ - public ListenableFuture deframe(Buffer data, boolean endOfStream) { + public void request(int numMessages) { + Preconditions.checkArgument(numMessages > 0, "numMessages must be > 0"); + pendingDeliveries += numMessages; + deliver(); + } + + /** + * Adds the given data to this deframer and attempts delivery to the sink. + */ + public void deframe(Buffer data, boolean endOfStream) { Preconditions.checkNotNull(data, "data"); Preconditions.checkState(!this.endOfStream, "Past end of stream"); unprocessed.addBuffer(data); // Indicate that all of the data for this stream has been received. this.endOfStream = endOfStream; + deliver(); + } - if (isDeliveryOutstanding()) { - // Only allow one outstanding delivery at a time. - return null; - } - return deliver(); + /** + * Indicates whether delivery is currently stalled, pending receipt of more data. + */ + public boolean isStalled() { + return deliveryStalled; } @Override @@ -175,83 +168,23 @@ public class MessageDeframer implements Closeable { } /** - * Indicates whether or not there is currently a delivery outstanding to the application. + * Reads and delivers as many messages to the sink as possible. */ - public final boolean isDeliveryOutstanding() { - return deliveryOutstanding != null; - } - - /** - * Consider {@code future} to be a message currently being processed. Messages will not be - * delivered until the future completes. The returned future behaves as if it was returned by - * {@link #deframe(Buffer, boolean)}. - * - * @throws IllegalStateException if a message is already being processed - */ - public ListenableFuture delayProcessing(ListenableFuture future) { - Preconditions.checkState(!isDeliveryOutstanding(), "Only one delay allowed concurrently"); - if (future == null) { - return null; - } - return delayProcessingInternal(future); - } - - /** - * May only be called when a delivery is known not to be outstanding. If deliveryOutstanding is - * non-null, then it will be re-used and this method will return {@code null}. - */ - private ListenableFuture delayProcessingInternal(ListenableFuture future) { - Preconditions.checkNotNull(future, "future"); - // Return a separate future so that our callback is guaranteed to complete before any - // listeners on the returned future. - ListenableFuture returnFuture = null; - if (!isDeliveryOutstanding()) { - returnFuture = deliveryOutstanding = SettableFuture.create(); - } - Futures.addCallback(future, new FutureCallback() { - @Override - public void onFailure(Throwable t) { - SettableFuture previousOutstanding = deliveryOutstanding; - deliveryOutstanding = null; - previousOutstanding.setException(t); - } - - @Override - public void onSuccess(Object result) { - try { - deliver(); - } catch (Throwable t) { - if (!isDeliveryOutstanding()) { - throw Throwables.propagate(t); - } else { - onFailure(t); - } - } - } - }, executor); - return returnFuture; - } - - /** - * Reads and delivers as many messages to the sink as possible. May only be called when a delivery - * is known not to be outstanding. - */ - private ListenableFuture deliver() { + private void deliver() { // Process the uncompressed bytes. - while (readRequiredBytes()) { + boolean stalled = false; + while (pendingDeliveries > 0 && !(stalled = !readRequiredBytes())) { switch (state) { case HEADER: processHeader(); break; case BODY: - // Read the body and deliver the message to the sink. - ListenableFuture processingFuture = processBody(); - if (processingFuture != null) { - // A future was returned for the completion of processing the delivered - // message. Once it's done, try to deliver the next message. - return delayProcessingInternal(processingFuture); - } + // Read the body and deliver the message. + processBody(); + // Since we've delivered a message, decrement the number of pending + // deliveries remaining. + pendingDeliveries--; break; default: throw new AssertionError("Invalid state: " + state); @@ -259,25 +192,29 @@ public class MessageDeframer implements Closeable { } if (endOfStream) { - if (nextFrame.readableBytes() != 0) { - throw Status.INTERNAL - .withDescription("Encountered end-of-stream mid-frame") + if (!isDataAvailable()) { + listener.endOfStream(); + } else if (stalled) { + // We've received the entire stream and have data available but we don't have + // enough to read the next frame ... this is bad. + throw Status.INTERNAL.withDescription("Encountered end-of-stream mid-frame") .asRuntimeException(); } - listener.endOfStream(); } - // All available messages have processed. - if (isDeliveryOutstanding()) { - SettableFuture previousOutstanding = deliveryOutstanding; - deliveryOutstanding = null; - previousOutstanding.set(null); - if (!endOfStream) { - // Notify that delivery is currently paused. - listener.deliveryStalled(); - } + // Never indicate that we're stalled if we've received all the data for the stream. + stalled &= !endOfStream; + + // If we're transitioning to the stalled state, notify the listener. + boolean previouslyStalled = deliveryStalled; + deliveryStalled = stalled; + if (stalled && !previouslyStalled) { + listener.deliveryStalled(); } - return null; + } + + private boolean isDataAvailable() { + return unprocessed.readableBytes() > 0 || (nextFrame != null && nextFrame.readableBytes() > 0); } /** @@ -335,35 +272,32 @@ public class MessageDeframer implements Closeable { * Processes the body of the GRPC compression frame. A single compression frame may contain * several GRPC messages within it. */ - private ListenableFuture processBody() { - ListenableFuture future; + private void processBody() { if (compressedFlag) { if (compression == Compression.NONE) { - throw Status.INTERNAL - .withDescription("Can't decode compressed frame as compression not configured.") - .asRuntimeException(); + throw Status.INTERNAL.withDescription( + "Can't decode compressed frame as compression not configured.").asRuntimeException(); } else if (compression == Compression.GZIP) { // Fully drain frame. byte[] bytes; try { - bytes = ByteStreams.toByteArray( - new GZIPInputStream(Buffers.openStream(nextFrame, false))); + bytes = + ByteStreams.toByteArray(new GZIPInputStream(Buffers.openStream(nextFrame, false))); } catch (IOException ex) { throw new RuntimeException(ex); } - future = listener.messageRead(new ByteArrayInputStream(bytes), bytes.length); + listener.messageRead(new ByteArrayInputStream(bytes), bytes.length); } else { throw new AssertionError("Unknown compression type"); } } else { // Don't close the frame, since the sink is now responsible for the life-cycle. - future = listener.messageRead(Buffers.openStream(nextFrame, true), nextFrame.readableBytes()); + listener.messageRead(Buffers.openStream(nextFrame, true), nextFrame.readableBytes()); nextFrame = null; } // Done with this frame, begin processing the next header. state = State.HEADER; requiredLength = HEADER_LENGTH; - return future; } } diff --git a/core/src/main/java/com/google/net/stubby/transport/Stream.java b/core/src/main/java/com/google/net/stubby/transport/Stream.java index fd795bc5da..85c32cde81 100644 --- a/core/src/main/java/com/google/net/stubby/transport/Stream.java +++ b/core/src/main/java/com/google/net/stubby/transport/Stream.java @@ -41,6 +41,15 @@ import javax.annotation.Nullable; *

An implementation doesn't need to be thread-safe. */ public interface Stream { + /** + * Requests up to the given number of messages from the call to be delivered to + * {@link StreamListener#messageRead(java.io.InputStream, int)}. No additional messages will be + * delivered. + * + * @param numMessages the requested number of messages to be delivered to the listener. + */ + void request(int numMessages); + /** * Writes a message payload to the remote end-point. The bytes from the stream are immediate read * by the Transport. This method will always return immediately and will not wait for the write to diff --git a/core/src/main/java/com/google/net/stubby/transport/StreamListener.java b/core/src/main/java/com/google/net/stubby/transport/StreamListener.java index 2250e34b01..a38b51d442 100644 --- a/core/src/main/java/com/google/net/stubby/transport/StreamListener.java +++ b/core/src/main/java/com/google/net/stubby/transport/StreamListener.java @@ -31,12 +31,8 @@ package com.google.net.stubby.transport; -import com.google.common.util.concurrent.ListenableFuture; - import java.io.InputStream; -import javax.annotation.Nullable; - /** * An observer of {@link Stream} events. It is guaranteed to only have one concurrent callback at a * time. @@ -46,21 +42,12 @@ public interface StreamListener { * Called upon receiving a message from the remote end-point. The {@link InputStream} is * non-blocking and contains the entire message. * - *

The method optionally returns a future that can be observed by flow control to determine - * when the message has been processed by the application. If {@code null} is returned, processing - * of this message is assumed to be complete upon returning from this method. - * - *

The {@code message} {@link InputStream} will be closed when the returned future completes. - * If no future is returned, the stream will be closed immediately after returning from this - * method. + *

The provided {@code message} {@link InputStream} must be closed by the listener. * *

This method should return quickly, as the same thread may be used to process other streams. * * @param message the bytes of the message. * @param length the length of the message {@link InputStream}. - * @return a processing completion future, or {@code null} to indicate that processing of the - * message is immediately complete. */ - @Nullable - ListenableFuture messageRead(InputStream message, int length); + void messageRead(InputStream message, int length); } diff --git a/core/src/test/java/com/google/net/stubby/ClientInterceptorsTest.java b/core/src/test/java/com/google/net/stubby/ClientInterceptorsTest.java index bbd3fd493c..22ce6d1799 100644 --- a/core/src/test/java/com/google/net/stubby/ClientInterceptorsTest.java +++ b/core/src/test/java/com/google/net/stubby/ClientInterceptorsTest.java @@ -130,6 +130,7 @@ public class ClientInterceptorsTest { public void ordered() { final List order = new ArrayList(); channel = new Channel() { + @SuppressWarnings("unchecked") @Override public Call newCall(MethodDescriptor method) { order.add("channel"); @@ -199,9 +200,9 @@ public class ClientInterceptorsTest { public void start(Call.Listener responseListener, Metadata.Headers headers) { super.start(new ForwardingListener(responseListener) { @Override - public ListenableFuture onHeaders(Metadata.Headers headers) { + public void onHeaders(Metadata.Headers headers) { examinedHeaders.add(headers); - return super.onHeaders(headers); + super.onHeaders(headers); } }, headers); } diff --git a/core/src/test/java/com/google/net/stubby/ServerImplTest.java b/core/src/test/java/com/google/net/stubby/ServerImplTest.java index 9f3b9538fe..9cb6cf4b88 100644 --- a/core/src/test/java/com/google/net/stubby/ServerImplTest.java +++ b/core/src/test/java/com/google/net/stubby/ServerImplTest.java @@ -34,21 +34,18 @@ package com.google.net.stubby; import static com.google.common.base.Charsets.UTF_8; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.fail; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isNull; import static org.mockito.Matchers.notNull; import static org.mockito.Matchers.same; +import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; import com.google.common.io.ByteStreams; import com.google.common.util.concurrent.AbstractService; -import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Service; -import com.google.common.util.concurrent.SettableFuture; import com.google.net.stubby.transport.ServerStream; import com.google.net.stubby.transport.ServerStreamListener; import com.google.net.stubby.transport.ServerTransportListener; @@ -71,8 +68,6 @@ import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; /** Unit tests for {@link ServerImpl}. */ @@ -86,7 +81,9 @@ public class ServerImplTest { private Service transportServer = new NoopService(); private ServerImpl server = new ServerImpl(executor, registry) .setTransportServer(transportServer); - private ServerStream stream = Mockito.mock(ServerStream.class); + + @Mock + private ServerStream stream; @Mock private ServerCall.Listener callListener; @@ -238,9 +235,8 @@ public class ServerImplTest { assertNotNull(call); String order = "Lots of pizza, please"; - ListenableFuture future = streamListener.messageRead(STRING_MARSHALLER.stream(order), 1); - future.get(); - verify(callListener).onPayload(order); + streamListener.messageRead(STRING_MARSHALLER.stream(order), 1); + verify(callListener, timeout(2000)).onPayload(order); call.sendPayload(314); ArgumentCaptor inputCaptor = ArgumentCaptor.forClass(InputStream.class); @@ -297,48 +293,6 @@ public class ServerImplTest { verifyNoMoreInteractions(stream); } - @Test - public void futureStatusIsPropagatedToTransport() throws Exception { - final AtomicReference> callReference - = new AtomicReference>(); - registry.addService(ServerServiceDefinition.builder("Waiter") - .addMethod("serve", STRING_MARSHALLER, INTEGER_MARSHALLER, - new ServerCallHandler() { - @Override - public ServerCall.Listener startCall(String fullMethodName, - ServerCall call, Metadata.Headers headers) { - callReference.set(call); - return callListener; - } - }).build()); - ServerTransportListener transportListener = newTransport(server); - - ServerStreamListener streamListener - = transportListener.streamCreated(stream, "/Waiter/serve", new Metadata.Headers()); - assertNotNull(streamListener); - - executeBarrier(executor).await(); - ServerCall call = callReference.get(); - assertNotNull(call); - - String delay = "No, I've not looked over the menu yet"; - SettableFuture appFuture = SettableFuture.create(); - when(callListener.onPayload(delay)).thenReturn(appFuture); - ListenableFuture future = streamListener.messageRead(STRING_MARSHALLER.stream(delay), 1); - executeBarrier(executor).await(); - verify(callListener).onPayload(delay); - try { - future.get(0, TimeUnit.SECONDS); - fail(); - } catch (TimeoutException ex) { - // Expected. - } - - appFuture.set(null); - // Shouldn't throw. - future.get(0, TimeUnit.SECONDS); - } - private static ServerTransportListener newTransport(ServerImpl server) { Service transport = new NoopService(); transport.startAsync(); diff --git a/core/src/test/java/com/google/net/stubby/transport/MessageDeframerTest.java b/core/src/test/java/com/google/net/stubby/transport/MessageDeframerTest.java index 3365b3cab9..bab8f598b5 100644 --- a/core/src/test/java/com/google/net/stubby/transport/MessageDeframerTest.java +++ b/core/src/test/java/com/google/net/stubby/transport/MessageDeframerTest.java @@ -32,25 +32,16 @@ package com.google.net.stubby.transport; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; import com.google.common.io.ByteStreams; import com.google.common.primitives.Bytes; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.SettableFuture; import com.google.net.stubby.transport.MessageDeframer.Listener; import org.junit.Test; @@ -62,7 +53,6 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.util.List; -import java.util.concurrent.ExecutionException; import java.util.zip.GZIPOutputStream; /** @@ -71,13 +61,13 @@ import java.util.zip.GZIPOutputStream; @RunWith(JUnit4.class) public class MessageDeframerTest { private Listener listener = mock(Listener.class); - private MessageDeframer deframer = - new MessageDeframer(listener, MoreExecutors.directExecutor()); + private MessageDeframer deframer = new MessageDeframer(listener); private ArgumentCaptor messages = ArgumentCaptor.forClass(InputStream.class); @Test public void simplePayload() { - assertNull(deframer.deframe(buffer(new byte[]{0, 0, 0, 0, 2, 3, 14}), false)); + deframer.request(1); + deframer.deframe(buffer(new byte[]{0, 0, 0, 0, 2, 3, 14}), false); verify(listener).messageRead(messages.capture(), eq(2)); assertEquals(Bytes.asList(new byte[]{3, 14}), bytes(messages)); verify(listener, atLeastOnce()).bytesRead(anyInt()); @@ -86,8 +76,8 @@ public class MessageDeframerTest { @Test public void smallCombinedPayloads() { - assertNull( - deframer.deframe(buffer(new byte[]{0, 0, 0, 0, 1, 3, 0, 0, 0, 0, 2, 14, 15}), false)); + deframer.request(2); + deframer.deframe(buffer(new byte[]{0, 0, 0, 0, 1, 3, 0, 0, 0, 0, 2, 14, 15}), false); verify(listener).messageRead(messages.capture(), eq(1)); assertEquals(Bytes.asList(new byte[] {3}), bytes(messages)); verify(listener).messageRead(messages.capture(), eq(2)); @@ -98,7 +88,8 @@ public class MessageDeframerTest { @Test public void endOfStreamWithPayloadShouldNotifyEndOfStream() { - assertNull(deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3}), true)); + deframer.request(1); + deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3}), true); verify(listener).messageRead(messages.capture(), eq(1)); assertEquals(Bytes.asList(new byte[] {3}), bytes(messages)); verify(listener).endOfStream(); @@ -108,17 +99,18 @@ public class MessageDeframerTest { @Test public void endOfStreamShouldNotifyEndOfStream() { - assertNull(deframer.deframe(buffer(new byte[0]), true)); + deframer.deframe(buffer(new byte[0]), true); verify(listener).endOfStream(); verifyNoMoreInteractions(listener); } @Test public void payloadSplitBetweenBuffers() { - assertNull(deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 7, 3, 14, 1, 5, 9}), false)); + deframer.request(1); + deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 7, 3, 14, 1, 5, 9}), false); verify(listener, atLeastOnce()).bytesRead(anyInt()); verifyNoMoreInteractions(listener); - assertNull(deframer.deframe(buffer(new byte[] {2, 6}), false)); + deframer.deframe(buffer(new byte[] {2, 6}), false); verify(listener).messageRead(messages.capture(), eq(7)); assertEquals(Bytes.asList(new byte[] {3, 14, 1, 5, 9, 2, 6}), bytes(messages)); verify(listener, atLeastOnce()).bytesRead(anyInt()); @@ -127,10 +119,12 @@ public class MessageDeframerTest { @Test public void frameHeaderSplitBetweenBuffers() { - assertNull(deframer.deframe(buffer(new byte[] {0, 0}), false)); + deframer.request(1); + + deframer.deframe(buffer(new byte[] {0, 0}), false); verify(listener, atLeastOnce()).bytesRead(anyInt()); verifyNoMoreInteractions(listener); - assertNull(deframer.deframe(buffer(new byte[] {0, 0, 1, 3}), false)); + deframer.deframe(buffer(new byte[] {0, 0, 1, 3}), false); verify(listener).messageRead(messages.capture(), eq(1)); assertEquals(Bytes.asList(new byte[] {3}), bytes(messages)); verify(listener, atLeastOnce()).bytesRead(anyInt()); @@ -139,7 +133,8 @@ public class MessageDeframerTest { @Test public void emptyPayload() { - assertNull(deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 0}), false)); + deframer.request(1); + deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 0}), false); verify(listener).messageRead(messages.capture(), eq(0)); assertEquals(Bytes.asList(), bytes(messages)); verify(listener, atLeastOnce()).bytesRead(anyInt()); @@ -148,8 +143,9 @@ public class MessageDeframerTest { @Test public void largerFrameSize() { - assertNull(deframer.deframe( - Buffers.wrap(Bytes.concat(new byte[] {0, 0, 0, 3, (byte) 232}, new byte[1000])), false)); + deframer.request(1); + deframer.deframe( + Buffers.wrap(Bytes.concat(new byte[] {0, 0, 0, 3, (byte) 232}, new byte[1000])), false); verify(listener).messageRead(messages.capture(), eq(1000)); assertEquals(Bytes.asList(new byte[1000]), bytes(messages)); verify(listener, atLeastOnce()).bytesRead(anyInt()); @@ -157,110 +153,23 @@ public class MessageDeframerTest { } @Test - public void payloadCallbackShouldWaitForFutureCompletion() { - SettableFuture messageFuture = SettableFuture.create(); - when(listener.messageRead(any(InputStream.class), eq(1))).thenReturn(messageFuture); - // Deframe a block with 2 messages. - ListenableFuture deframeFuture - = deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3, 0, 0, 0, 0, 2, 14, 15}), false); - assertNotNull(deframeFuture); - verify(listener).messageRead(messages.capture(), eq(1)); - assertEquals(Bytes.asList(new byte[]{3}), bytes(messages)); - verify(listener, atLeastOnce()).bytesRead(anyInt()); + public void endOfStreamCallbackShouldWaitForMessageDelivery() { + deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3}), true); verifyNoMoreInteractions(listener); - SettableFuture messageFuture2 = SettableFuture.create(); - when(listener.messageRead(any(InputStream.class), eq(2))).thenReturn(messageFuture2); - messageFuture.set(null); - assertFalse(deframeFuture.isDone()); - verify(listener).messageRead(messages.capture(), eq(2)); - assertEquals(Bytes.asList(new byte[] {14, 15}), bytes(messages)); - verify(listener, atLeastOnce()).bytesRead(anyInt()); - verifyNoMoreInteractions(listener); - - messageFuture2.set(null); - assertTrue(deframeFuture.isDone()); - - verify(listener, atLeastOnce()).bytesRead(anyInt()); - verify(listener).deliveryStalled(); - verifyNoMoreInteractions(listener); - } - - @Test - public void endOfStreamCallbackShouldWaitForFutureCompletion() { - SettableFuture messageFuture = SettableFuture.create(); - when(listener.messageRead(any(InputStream.class), eq(1))).thenReturn(messageFuture); - ListenableFuture deframeFuture - = deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3}), true); - assertNotNull(deframeFuture); + deframer.request(1); verify(listener).messageRead(messages.capture(), eq(1)); assertEquals(Bytes.asList(new byte[] {3}), bytes(messages)); - verify(listener, atLeastOnce()).bytesRead(anyInt()); - verifyNoMoreInteractions(listener); - - messageFuture.set(null); - assertTrue(deframeFuture.isDone()); verify(listener).endOfStream(); verify(listener, atLeastOnce()).bytesRead(anyInt()); verifyNoMoreInteractions(listener); } - @Test - public void futureShouldPropagateThrownException() throws InterruptedException { - SettableFuture messageFuture = SettableFuture.create(); - when(listener.messageRead(any(InputStream.class), eq(1))).thenReturn(messageFuture); - ListenableFuture deframeFuture - = deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3, 0, 0, 0, 0, 2, 14, 15}), false); - assertNotNull(deframeFuture); - verify(listener).messageRead(messages.capture(), eq(1)); - assertEquals(Bytes.asList(new byte[]{3}), bytes(messages)); - verify(listener, atLeastOnce()).bytesRead(anyInt()); - verifyNoMoreInteractions(listener); - - RuntimeException thrownEx = new RuntimeException(); - when(listener.messageRead(any(InputStream.class), eq(2))).thenThrow(thrownEx); - messageFuture.set(null); - verify(listener).messageRead(messages.capture(), eq(2)); - assertTrue(deframeFuture.isDone()); - try { - deframeFuture.get(); - fail("Should have throws ExecutionException"); - } catch (ExecutionException ex) { - assertEquals(thrownEx, ex.getCause()); - } - verify(listener, atLeastOnce()).bytesRead(anyInt()); - verifyNoMoreInteractions(listener); - } - - @Test - public void futureFailureShouldStopAndPropagateFailure() throws InterruptedException { - SettableFuture messageFuture = SettableFuture.create(); - when(listener.messageRead(any(InputStream.class), eq(1))).thenReturn(messageFuture); - ListenableFuture deframeFuture - = deframer.deframe(buffer(new byte[] {0, 0, 0, 0, 1, 3, 0, 0, 0, 0, 2, 14, 15}), false); - assertNotNull(deframeFuture); - verify(listener).messageRead(messages.capture(), eq(1)); - assertEquals(Bytes.asList(new byte[] {3}), bytes(messages)); - verify(listener, atLeastOnce()).bytesRead(anyInt()); - verifyNoMoreInteractions(listener); - - RuntimeException thrownEx = new RuntimeException(); - messageFuture.setException(thrownEx); - assertTrue(deframeFuture.isDone()); - try { - deframeFuture.get(); - fail("Should have throws ExecutionException"); - } catch (ExecutionException ex) { - assertEquals(thrownEx, ex.getCause()); - } - verify(listener, atLeastOnce()).bytesRead(anyInt()); - verifyNoMoreInteractions(listener); - } - @Test public void compressed() { - deframer = new MessageDeframer( - listener, MoreExecutors.directExecutor(), MessageDeframer.Compression.GZIP); + deframer = new MessageDeframer(listener, MessageDeframer.Compression.GZIP); + deframer.request(1); + byte[] payload = compress(new byte[1000]); assertTrue(payload.length < 100); byte[] header = new byte[] {1, 0, 0, 0, (byte) payload.length}; diff --git a/integration-testing/src/main/java/com/google/net/stubby/testing/integration/AbstractTransportTest.java b/integration-testing/src/main/java/com/google/net/stubby/testing/integration/AbstractTransportTest.java index f557985ed1..cadb10a67b 100644 --- a/integration-testing/src/main/java/com/google/net/stubby/testing/integration/AbstractTransportTest.java +++ b/integration-testing/src/main/java/com/google/net/stubby/testing/integration/AbstractTransportTest.java @@ -39,7 +39,6 @@ import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; -import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.Uninterruptibles; import com.google.net.stubby.AbstractServerBuilder; @@ -75,12 +74,12 @@ import org.mockito.ArgumentCaptor; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.LinkedList; import java.util.List; import java.util.Random; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; /** @@ -426,22 +425,18 @@ public abstract class AbstractTransportTest { // Start the call and prepare capture of results. final List results = Collections.synchronizedList(new ArrayList()); - final List> processedFutures = - Collections.synchronizedList(new LinkedList>()); final SettableFuture completionFuture = SettableFuture.create(); + final AtomicInteger count = new AtomicInteger(); call.start(new Call.Listener() { @Override - public ListenableFuture onHeaders(Metadata.Headers headers) { - return null; + public void onHeaders(Metadata.Headers headers) { } @Override - public ListenableFuture onPayload(final StreamingOutputCallResponse payload) { - SettableFuture processedFuture = SettableFuture.create(); + public void onPayload(final StreamingOutputCallResponse payload) { results.add(payload); - processedFutures.add(processedFuture); - return processedFuture; + count.incrementAndGet(); } @Override @@ -460,17 +455,9 @@ public abstract class AbstractTransportTest { // Slowly set completion on all of the futures. int expectedResults = responseSizes.size(); - int count = 0; - while (count < expectedResults) { - if (!processedFutures.isEmpty()) { - assertEquals(1, processedFutures.size()); - assertEquals(count + 1, results.size()); - count++; - - // Remove and set the first future to allow receipt of additional messages - // from flow control. - processedFutures.remove(0).set(null); - } + while (count.get() < expectedResults) { + // Allow one more inbound message to be delivered to the application. + call.request(1); // Sleep a bit. Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); diff --git a/netty/src/main/java/com/google/net/stubby/transport/netty/NettyClientStream.java b/netty/src/main/java/com/google/net/stubby/transport/netty/NettyClientStream.java index af9131240f..d9e3c3039e 100644 --- a/netty/src/main/java/com/google/net/stubby/transport/netty/NettyClientStream.java +++ b/netty/src/main/java/com/google/net/stubby/transport/netty/NettyClientStream.java @@ -51,11 +51,21 @@ class NettyClientStream extends Http2ClientStream { private final NettyClientHandler handler; NettyClientStream(ClientStreamListener listener, Channel channel, NettyClientHandler handler) { - super(listener, channel.eventLoop()); + super(listener); this.channel = checkNotNull(channel, "channel"); this.handler = checkNotNull(handler, "handler"); } + @Override + public void request(final int numMessages) { + channel.eventLoop().execute(new Runnable() { + @Override + public void run() { + requestMessagesFromDeframer(numMessages); + } + }); + } + void transportHeadersReceived(Http2Headers headers, boolean endOfStream) { if (endOfStream) { transportTrailersReceived(Utils.convertTrailers(headers)); diff --git a/netty/src/main/java/com/google/net/stubby/transport/netty/NettyServerStream.java b/netty/src/main/java/com/google/net/stubby/transport/netty/NettyServerStream.java index c4241a8766..a5ef06e020 100644 --- a/netty/src/main/java/com/google/net/stubby/transport/netty/NettyServerStream.java +++ b/netty/src/main/java/com/google/net/stubby/transport/netty/NettyServerStream.java @@ -51,7 +51,7 @@ class NettyServerStream extends AbstractServerStream { private final NettyServerHandler handler; NettyServerStream(Channel channel, int id, NettyServerHandler handler) { - super(id, channel.eventLoop()); + super(id); this.channel = checkNotNull(channel, "channel"); this.handler = checkNotNull(handler, "handler"); } @@ -60,6 +60,16 @@ class NettyServerStream extends AbstractServerStream { super.inboundDataReceived(new NettyBuffer(frame.retain()), endOfStream); } + @Override + public void request(final int numMessages) { + channel.eventLoop().execute(new Runnable() { + @Override + public void run() { + requestMessagesFromDeframer(numMessages); + } + }); + } + @Override protected void inboundDeliveryPaused() { // Do nothing. diff --git a/netty/src/test/java/com/google/net/stubby/transport/netty/NettyClientStreamTest.java b/netty/src/test/java/com/google/net/stubby/transport/netty/NettyClientStreamTest.java index 112780c2c3..1c4985498f 100644 --- a/netty/src/test/java/com/google/net/stubby/transport/netty/NettyClientStreamTest.java +++ b/netty/src/test/java/com/google/net/stubby/transport/netty/NettyClientStreamTest.java @@ -45,9 +45,7 @@ import static org.mockito.Matchers.eq; import static org.mockito.Matchers.same; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import com.google.common.util.concurrent.SettableFuture; import com.google.net.stubby.Metadata; import com.google.net.stubby.Status; import com.google.net.stubby.transport.AbstractStream; @@ -64,7 +62,6 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.mockito.ArgumentCaptor; import org.mockito.Mock; -import org.mockito.Mockito; import java.io.InputStream; @@ -202,9 +199,9 @@ public class NettyClientStreamTest extends NettyStreamTestBase { verify(listener, never()).closed(any(Status.class), any(Metadata.Trailers.class)); // We are now waiting for 100 bytes of error context on the stream, cancel has not yet been sent - Mockito.verify(channel, never()).writeAndFlush(any(CancelStreamCommand.class)); + verify(channel, never()).writeAndFlush(any(CancelStreamCommand.class)); stream().transportDataReceived(Unpooled.buffer(100).writeZero(100), false); - Mockito.verify(channel, never()).writeAndFlush(any(CancelStreamCommand.class)); + verify(channel, never()).writeAndFlush(any(CancelStreamCommand.class)); stream().transportDataReceived(Unpooled.buffer(1000).writeZero(1000), false); // Now verify that cancel is sent and an error is reported to the listener @@ -226,10 +223,6 @@ public class NettyClientStreamTest extends NettyStreamTestBase { @Test public void deframedDataAfterCancelShouldBeIgnored() throws Exception { - // Mock the listener to return this future when a message is read. - final SettableFuture future = SettableFuture.create(); - when(listener.messageRead(any(InputStream.class), anyInt())).thenReturn(future); - stream().id(1); // Receive headers first so that it's a valid GRPC response. stream().transportHeadersReceived(grpcResponseHeaders(), false); @@ -238,6 +231,9 @@ public class NettyClientStreamTest extends NettyStreamTestBase { stream().transportDataReceived(simpleGrpcFrame(), false); stream().transportDataReceived(simpleGrpcFrame(), false); + // Only allow the first to be delivered. + stream().request(1); + // Receive error trailers. The server status will not be processed until after all of the // data frames have been processed. Since cancellation will interrupt message delivery, // this status will never be processed and the listener will instead only see the @@ -251,9 +247,8 @@ public class NettyClientStreamTest extends NettyStreamTestBase { Metadata.Trailers trailers = Utils.convertTrailers(grpcResponseTrailers(Status.CANCELLED)); stream().transportReportStatus(Status.CANCELLED, true, trailers); - // Now complete the future to trigger the deframer to fire the next message to the - // stream. - future.set(null); + // Now allow the delivery of the second. + stream().request(1); // Verify that the listener was only notified of the first message, not the second. verify(listener).messageRead(any(InputStream.class), anyInt()); diff --git a/netty/src/test/java/com/google/net/stubby/transport/netty/NettyServerHandlerTest.java b/netty/src/test/java/com/google/net/stubby/transport/netty/NettyServerHandlerTest.java index d749ea07a7..0bfd8a65a3 100644 --- a/netty/src/test/java/com/google/net/stubby/transport/netty/NettyServerHandlerTest.java +++ b/netty/src/test/java/com/google/net/stubby/transport/netty/NettyServerHandlerTest.java @@ -44,6 +44,7 @@ import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -163,6 +164,7 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase { private void inboundDataShouldForwardToStreamListener(boolean endStream) throws Exception { createStream(); + stream.request(1); // Create a data frame and then trigger the handler to read it. ByteBuf frame = dataFrame(STREAM_ID, endStream); @@ -180,6 +182,7 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase { @Test public void clientHalfCloseShouldForwardToStreamListener() throws Exception { createStream(); + stream.request(1); handler.channelRead(ctx, emptyGrpcFrame(STREAM_ID, true)); ArgumentCaptor captor = ArgumentCaptor.forClass(InputStream.class); @@ -202,11 +205,12 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase { @Test public void streamErrorShouldNotCloseChannel() throws Exception { createStream(); + stream.request(1); // When a DATA frame is read, throw an exception. It will be converted into an // Http2StreamException. RuntimeException e = new RuntimeException("Fake Exception"); - when(streamListener.messageRead(any(InputStream.class), anyInt())).thenThrow(e); + doThrow(e).when(streamListener).messageRead(any(InputStream.class), anyInt()); // Read a DATA frame to trigger the exception. handler.channelRead(ctx, emptyGrpcFrame(STREAM_ID, true)); @@ -217,7 +221,7 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase { // Verify the stream was closed. ArgumentCaptor captor = ArgumentCaptor.forClass(Status.class); verify(streamListener).closed(captor.capture()); - assertEquals(e, captor.getValue().asException().getCause().getCause()); + assertEquals(e, captor.getValue().asException().getCause()); assertEquals(Code.INTERNAL, captor.getValue().getCode()); } @@ -225,7 +229,7 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase { public void connectionErrorShouldCloseChannel() throws Exception { createStream(); - // Read a DATA frame to trigger the exception. + // Read a bad frame to trigger the exception. handler.channelRead(ctx, badFrame()); // Verify the expected GO_AWAY frame was written. diff --git a/netty/src/test/java/com/google/net/stubby/transport/netty/NettyServerStreamTest.java b/netty/src/test/java/com/google/net/stubby/transport/netty/NettyServerStreamTest.java index a2605c480c..c109a69572 100644 --- a/netty/src/test/java/com/google/net/stubby/transport/netty/NettyServerStreamTest.java +++ b/netty/src/test/java/com/google/net/stubby/transport/netty/NettyServerStreamTest.java @@ -32,7 +32,6 @@ package com.google.net.stubby.transport.netty; import static com.google.net.stubby.transport.netty.NettyTestUtil.messageFrame; -import static com.google.net.stubby.transport.netty.NettyTestUtil.statusFrame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; diff --git a/netty/src/test/java/com/google/net/stubby/transport/netty/NettyStreamTestBase.java b/netty/src/test/java/com/google/net/stubby/transport/netty/NettyStreamTestBase.java index 249ec6379a..ad04847d67 100644 --- a/netty/src/test/java/com/google/net/stubby/transport/netty/NettyStreamTestBase.java +++ b/netty/src/test/java/com/google/net/stubby/transport/netty/NettyStreamTestBase.java @@ -35,14 +35,12 @@ import static com.google.net.stubby.transport.netty.NettyTestUtil.messageFrame; import static io.netty.util.CharsetUtil.UTF_8; import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import com.google.common.util.concurrent.SettableFuture; import com.google.net.stubby.transport.AbstractStream; import com.google.net.stubby.transport.StreamListener; @@ -95,8 +93,6 @@ public abstract class NettyStreamTestBase { @Mock protected ChannelPromise promise; - protected SettableFuture processingFuture; - protected InputStream input; protected AbstractStream stream; @@ -114,9 +110,6 @@ public abstract class NettyStreamTestBase { when(pipeline.firstContext()).thenReturn(ctx); when(eventLoop.inEventLoop()).thenReturn(true); - processingFuture = SettableFuture.create(); - when(listener().messageRead(any(InputStream.class), anyInt())).thenReturn(processingFuture); - doAnswer(new Answer() { @Override public Void answer(InvocationOnMock invocation) throws Throwable { @@ -132,6 +125,8 @@ public abstract class NettyStreamTestBase { @Test public void inboundMessageShouldCallListener() throws Exception { + stream.request(1); + if (stream instanceof NettyServerStream) { ((NettyServerStream) stream).inboundDataReceived(messageFrame(MESSAGE), false); } else { @@ -142,10 +137,6 @@ public abstract class NettyStreamTestBase { // Verify that inbound flow control window update has been disabled for the stream. assertEquals(MESSAGE, NettyTestUtil.toString(captor.getValue())); - - // Verify that inbound flow control window update has been re-enabled for the stream after - // the future completes. - processingFuture.set(null); } protected abstract AbstractStream createStream(); diff --git a/okhttp/src/main/java/com/google/net/stubby/transport/okhttp/OkHttpClientStream.java b/okhttp/src/main/java/com/google/net/stubby/transport/okhttp/OkHttpClientStream.java index 0c5baa199b..59a07c846a 100644 --- a/okhttp/src/main/java/com/google/net/stubby/transport/okhttp/OkHttpClientStream.java +++ b/okhttp/src/main/java/com/google/net/stubby/transport/okhttp/OkHttpClientStream.java @@ -42,7 +42,6 @@ import com.squareup.okhttp.internal.spdy.Header; import java.nio.ByteBuffer; import java.util.List; -import java.util.concurrent.Executor; import javax.annotation.concurrent.GuardedBy; @@ -57,28 +56,11 @@ class OkHttpClientStream extends Http2ClientStream { /** * Construct a new client stream. */ - static OkHttpClientStream newStream(final Executor executor, ClientStreamListener listener, + static OkHttpClientStream newStream(ClientStreamListener listener, AsyncFrameWriter frameWriter, OkHttpClientTransport transport, OutboundFlowController outboundFlow) { - // Create a lock object that can be used by both the executor and methods in the stream - // to ensure consistent locking behavior. - final Object executorLock = new Object(); - Executor synchronizingExecutor = new Executor() { - @Override - public void execute(final Runnable command) { - executor.execute(new Runnable() { - @Override - public void run() { - synchronized (executorLock) { - command.run(); - } - } - }); - } - }; - return new OkHttpClientStream(synchronizingExecutor, listener, frameWriter, transport, - executorLock, outboundFlow); + return new OkHttpClientStream(listener, frameWriter, transport, outboundFlow); } @GuardedBy("executorLock") @@ -88,25 +70,28 @@ class OkHttpClientStream extends Http2ClientStream { private final AsyncFrameWriter frameWriter; private final OutboundFlowController outboundFlow; private final OkHttpClientTransport transport; - // Lock used to synchronize with work done on the executor. - private final Object executorLock; + private final Object lock = new Object(); private Object outboundFlowState; - private OkHttpClientStream(final Executor executor, - final ClientStreamListener listener, + private OkHttpClientStream(ClientStreamListener listener, AsyncFrameWriter frameWriter, OkHttpClientTransport transport, - Object executorLock, OutboundFlowController outboundFlow) { - super(listener, executor); + super(listener); this.frameWriter = frameWriter; this.transport = transport; - this.executorLock = executorLock; this.outboundFlow = outboundFlow; } + @Override + public void request(final int numMessages) { + synchronized (lock) { + requestMessagesFromDeframer(numMessages); + } + } + public void transportHeadersReceived(List

headers, boolean endOfStream) { - synchronized (executorLock) { + synchronized (lock) { if (endOfStream) { transportTrailersReceived(Utils.convertTrailers(headers)); } else { @@ -120,7 +105,7 @@ class OkHttpClientStream extends Http2ClientStream { * the future listeners (executed by synchronizedExecutor) will not be executed in the same time. */ public void transportDataReceived(okio.Buffer frame, boolean endOfStream) { - synchronized (executorLock) { + synchronized (lock) { long length = frame.size(); window -= length; super.transportDataReceived(new OkHttpBuffer(frame), endOfStream); @@ -143,7 +128,7 @@ class OkHttpClientStream extends Http2ClientStream { @Override protected void returnProcessedBytes(int processedBytes) { - synchronized (executorLock) { + synchronized (lock) { processedWindow -= processedBytes; if (processedWindow <= WINDOW_UPDATE_THRESHOLD) { int delta = OkHttpClientTransport.DEFAULT_INITIAL_WINDOW_SIZE - processedWindow; @@ -157,7 +142,7 @@ class OkHttpClientStream extends Http2ClientStream { @Override public void transportReportStatus(Status newStatus, boolean stopDelivery, Metadata.Trailers trailers) { - synchronized (executorLock) { + synchronized (lock) { super.transportReportStatus(newStatus, stopDelivery, trailers); } } diff --git a/okhttp/src/main/java/com/google/net/stubby/transport/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/com/google/net/stubby/transport/okhttp/OkHttpClientTransport.java index 83ccc802bb..a48a1f3e7f 100644 --- a/okhttp/src/main/java/com/google/net/stubby/transport/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/com/google/net/stubby/transport/okhttp/OkHttpClientTransport.java @@ -166,7 +166,7 @@ public class OkHttpClientTransport extends AbstractClientTransport { protected ClientStream newStreamInternal(MethodDescriptor method, Metadata.Headers headers, ClientStreamListener listener) { - OkHttpClientStream clientStream = OkHttpClientStream.newStream(executor, listener, + OkHttpClientStream clientStream = OkHttpClientStream.newStream(listener, frameWriter, this, outboundFlow); if (goAway) { clientStream.transportReportStatus(goAwayStatus, false, new Metadata.Trailers()); diff --git a/okhttp/src/test/java/com/google/net/stubby/transport/okhttp/OkHttpClientTransportTest.java b/okhttp/src/test/java/com/google/net/stubby/transport/okhttp/OkHttpClientTransportTest.java index d7015373c9..87f7572b4a 100644 --- a/okhttp/src/test/java/com/google/net/stubby/transport/okhttp/OkHttpClientTransportTest.java +++ b/okhttp/src/test/java/com/google/net/stubby/transport/okhttp/OkHttpClientTransportTest.java @@ -45,7 +45,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableList; -import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Service; import com.google.common.util.concurrent.Service.State; import com.google.net.stubby.Metadata; @@ -137,8 +136,8 @@ public class OkHttpClientTransportTest { public void nextFrameThrowIOException() throws Exception { MockStreamListener listener1 = new MockStreamListener(); MockStreamListener listener2 = new MockStreamListener(); - clientTransport.newStream(method, new Metadata.Headers(), listener1); - clientTransport.newStream(method, new Metadata.Headers(), listener2); + clientTransport.newStream(method, new Metadata.Headers(), listener1).request(1); + clientTransport.newStream(method, new Metadata.Headers(), listener2).request(1); assertEquals(2, streams.size()); assertTrue(streams.containsKey(3)); assertTrue(streams.containsKey(5)); @@ -158,7 +157,7 @@ public class OkHttpClientTransportTest { final int numMessages = 10; final String message = "Hello Client"; MockStreamListener listener = new MockStreamListener(); - clientTransport.newStream(method, new Metadata.Headers(), listener); + clientTransport.newStream(method, new Metadata.Headers(), listener).request(numMessages); assertTrue(streams.containsKey(3)); frameHandler.headers(false, false, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS); assertNotNull(listener.headers); @@ -179,7 +178,7 @@ public class OkHttpClientTransportTest { @Test public void invalidInboundHeadersCancelStream() throws Exception { MockStreamListener listener = new MockStreamListener(); - clientTransport.newStream(method, new Metadata.Headers(), listener); + clientTransport.newStream(method, new Metadata.Headers(), listener).request(1); assertTrue(streams.containsKey(3)); // Empty headers block without correct content type or status frameHandler.headers(false, false, 3, 0, new ArrayList
(), @@ -246,8 +245,8 @@ public class OkHttpClientTransportTest { public void windowUpdate() throws Exception { MockStreamListener listener1 = new MockStreamListener(); MockStreamListener listener2 = new MockStreamListener(); - clientTransport.newStream(method,new Metadata.Headers(), listener1); - clientTransport.newStream(method,new Metadata.Headers(), listener2); + clientTransport.newStream(method,new Metadata.Headers(), listener1).request(2); + clientTransport.newStream(method,new Metadata.Headers(), listener2).request(2); assertEquals(2, streams.size()); OkHttpClientStream stream1 = streams.get(3); OkHttpClientStream stream2 = streams.get(5); @@ -299,7 +298,7 @@ public class OkHttpClientTransportTest { @Test public void windowUpdateWithInboundFlowControl() throws Exception { MockStreamListener listener = new MockStreamListener(); - clientTransport.newStream(method, new Metadata.Headers(), listener); + clientTransport.newStream(method, new Metadata.Headers(), listener).request(1); OkHttpClientStream stream = streams.get(3); int messageLength = OkHttpClientTransport.DEFAULT_INITIAL_WINDOW_SIZE / 2 + 1; @@ -342,8 +341,8 @@ public class OkHttpClientTransportTest { // start 2 streams. MockStreamListener listener1 = new MockStreamListener(); MockStreamListener listener2 = new MockStreamListener(); - clientTransport.newStream(method,new Metadata.Headers(), listener1); - clientTransport.newStream(method,new Metadata.Headers(), listener2); + clientTransport.newStream(method,new Metadata.Headers(), listener1).request(1); + clientTransport.newStream(method,new Metadata.Headers(), listener2).request(1); assertEquals(2, streams.size()); // Receive goAway, max good id is 3. @@ -494,18 +493,16 @@ public class OkHttpClientTransportTest { } @Override - public ListenableFuture headersRead(Metadata.Headers headers) { + public void headersRead(Metadata.Headers headers) { this.headers = headers; - return null; } @Override - public ListenableFuture messageRead(InputStream message, int length) { + public void messageRead(InputStream message, int length) { String msg = getContent(message); if (msg != null) { messages.add(msg); } - return null; } @Override @@ -522,13 +519,18 @@ public class OkHttpClientTransportTest { } static String getContent(InputStream message) { - BufferedReader br = - new BufferedReader(new InputStreamReader(message, UTF_8)); + BufferedReader br = new BufferedReader(new InputStreamReader(message, UTF_8)); try { // Only one line message is used in this test. return br.readLine(); } catch (IOException e) { return null; + } finally { + try { + message.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } } } } diff --git a/stub/src/main/java/com/google/net/stubby/stub/Calls.java b/stub/src/main/java/com/google/net/stubby/stub/Calls.java index 279e8408f2..f47a9eaae1 100644 --- a/stub/src/main/java/com/google/net/stubby/stub/Calls.java +++ b/stub/src/main/java/com/google/net/stubby/stub/Calls.java @@ -160,7 +160,7 @@ public class Calls { ReqT param, StreamObserver responseObserver) { asyncServerStreamingCall(call, param, - new StreamObserverToCallListenerAdapter(responseObserver)); + new StreamObserverToCallListenerAdapter(call, responseObserver)); } private static void asyncServerStreamingCall( @@ -168,6 +168,7 @@ public class Calls { ReqT param, Call.Listener responseListener) { call.start(responseListener, new Metadata.Headers()); + call.request(1); try { call.sendPayload(param); call.halfClose(); @@ -217,10 +218,11 @@ public class Calls { * Execute a duplex-streaming call. * @return request stream observer. */ - public static StreamObserver duplexStreamingCall( - Call call, StreamObserver responseObserver) { - call.start(new StreamObserverToCallListenerAdapter(responseObserver), + public static StreamObserver duplexStreamingCall(Call call, + StreamObserver responseObserver) { + call.start(new StreamObserverToCallListenerAdapter(call, responseObserver), new Metadata.Headers()); + call.request(1); return new CallToStreamObserverAdapter(call); } @@ -248,22 +250,25 @@ public class Calls { } } - private static class StreamObserverToCallListenerAdapter extends Call.Listener { - private final StreamObserver observer; + private static class StreamObserverToCallListenerAdapter extends Call.Listener { + private final Call call; + private final StreamObserver observer; - public StreamObserverToCallListenerAdapter(StreamObserver observer) { + public StreamObserverToCallListenerAdapter(Call call, StreamObserver observer) { + this.call = call; this.observer = observer; } @Override - public ListenableFuture onHeaders(Metadata.Headers headers) { - return null; + public void onHeaders(Metadata.Headers headers) { } @Override - public ListenableFuture onPayload(T payload) { + public void onPayload(RespT payload) { observer.onValue(payload); - return null; + + // Request delivery of the next inbound message. + call.request(1); } @Override @@ -288,18 +293,16 @@ public class Calls { } @Override - public ListenableFuture onHeaders(Metadata.Headers headers) { - return null; + public void onHeaders(Metadata.Headers headers) { } @Override - public ListenableFuture onPayload(RespT value) { + public void onPayload(RespT value) { if (this.value != null) { throw Status.INTERNAL.withDescription("More than one value received for unary call") .asRuntimeException(); } this.value = value; - return null; } @Override @@ -357,11 +360,13 @@ public class Calls { if (!hasNext()) { throw new NoSuchElementException(); } - @SuppressWarnings("unchecked") - Payload tmp = (Payload) last; - last = null; - tmp.processed.set(null); - return tmp.value; + try { + @SuppressWarnings("unchecked") + T tmp = (T) last; + return tmp; + } finally { + last = null; + } } @Override @@ -373,16 +378,13 @@ public class Calls { private boolean done = false; @Override - public ListenableFuture onHeaders(Metadata.Headers headers) { - return null; + public void onHeaders(Metadata.Headers headers) { } @Override - public ListenableFuture onPayload(T value) { + public void onPayload(T value) { Preconditions.checkState(!done, "Call already closed"); - SettableFuture future = SettableFuture.create(); - buffer.add(new Payload(value, future)); - return future; + buffer.add(value); } @Override @@ -397,14 +399,4 @@ public class Calls { } } } - - private static class Payload { - public final T value; - public final SettableFuture processed; - - public Payload(T value, SettableFuture processed) { - this.value = value; - this.processed = processed; - } - } } diff --git a/stub/src/main/java/com/google/net/stubby/stub/MetadataUtils.java b/stub/src/main/java/com/google/net/stubby/stub/MetadataUtils.java index 2ab7e76da2..c382aca576 100644 --- a/stub/src/main/java/com/google/net/stubby/stub/MetadataUtils.java +++ b/stub/src/main/java/com/google/net/stubby/stub/MetadataUtils.java @@ -31,7 +31,6 @@ package com.google.net.stubby.stub; -import com.google.common.util.concurrent.ListenableFuture; import com.google.net.stubby.Call; import com.google.net.stubby.Channel; import com.google.net.stubby.ClientInterceptor; @@ -122,9 +121,9 @@ public class MetadataUtils { trailersCapture.set(null); super.start(new ForwardingListener(responseListener) { @Override - public ListenableFuture onHeaders(Metadata.Headers headers) { + public void onHeaders(Metadata.Headers headers) { headersCapture.set(headers); - return super.onHeaders(headers); + super.onHeaders(headers); } @Override diff --git a/stub/src/main/java/com/google/net/stubby/stub/ServerCalls.java b/stub/src/main/java/com/google/net/stubby/stub/ServerCalls.java index 2f4198c8df..c68ef14832 100644 --- a/stub/src/main/java/com/google/net/stubby/stub/ServerCalls.java +++ b/stub/src/main/java/com/google/net/stubby/stub/ServerCalls.java @@ -31,7 +31,6 @@ package com.google.net.stubby.stub; -import com.google.common.util.concurrent.ListenableFuture; import com.google.net.stubby.Metadata; import com.google.net.stubby.ServerCall; import com.google.net.stubby.ServerCallHandler; @@ -59,21 +58,24 @@ public class ServerCalls { public ServerCall.Listener startCall( String fullMethodName, final ServerCall call, Metadata.Headers headers) { final ResponseObserver responseObserver = new ResponseObserver(call); + call.request(1); return new EmptyServerCallListener() { ReqT request; @Override - public ListenableFuture onPayload(ReqT request) { + public void onPayload(ReqT request) { if (this.request == null) { // We delay calling method.invoke() until onHalfClose(), because application may call // close(OK) inside invoke(), while close(OK) is not allowed before onHalfClose(). this.request = request; + + // Request delivery of the next inbound message. + call.request(1); } else { call.close( Status.INVALID_ARGUMENT.withDescription( "More than one request payloads for unary call or server streaming call"), new Metadata.Trailers()); } - return null; } @Override @@ -99,17 +101,20 @@ public class ServerCalls { final StreamingRequestMethod method) { return new ServerCallHandler() { @Override - public ServerCall.Listener startCall(String fullMethodName, ServerCall call, - Metadata.Headers headers) { + public ServerCall.Listener startCall(String fullMethodName, + final ServerCall call, Metadata.Headers headers) { + call.request(1); final ResponseObserver responseObserver = new ResponseObserver(call); final StreamObserver requestObserver = method.invoke(responseObserver); return new EmptyServerCallListener() { boolean halfClosed = false; @Override - public ListenableFuture onPayload(ReqT request) { + public void onPayload(ReqT request) { requestObserver.onValue(request); - return null; + + // Request delivery of the next inbound message. + call.request(1); } @Override @@ -158,6 +163,9 @@ public class ServerCalls { throw Status.CANCELLED.asRuntimeException(); } call.sendPayload(response); + + // Request delivery of the next inbound message. + call.request(1); } @Override @@ -177,8 +185,7 @@ public class ServerCalls { private static class EmptyServerCallListener extends ServerCall.Listener { @Override - public ListenableFuture onPayload(ReqT request) { - return null; + public void onPayload(ReqT request) { } @Override