From 7902017b07d09ccff6d118965b9de804d37960b0 Mon Sep 17 00:00:00 2001 From: Louis Ryan Date: Fri, 11 Mar 2016 13:05:25 -0800 Subject: [PATCH] Add CallStreamObserver and ServerCallStreamObserver which allow for application level interaction with flow control while still allowing stubs to be used. Currently these APIs are only useful on the server side, a separate proposal will be made for adding client support An example of usage is in TestServiceImpl.streamingOutputCallManualFlowControl which listens for flow-control 'onReady' events and produces messages to the client. Added some unit testing for ServerCalls --- .../java/io/grpc/stub/CallStreamObserver.java | 102 +++++ .../grpc/stub/ServerCallStreamObserver.java | 59 +++ .../main/java/io/grpc/stub/ServerCalls.java | 97 ++++- .../java/io/grpc/stub/ServerCallsTest.java | 410 ++++++++++++++++++ 4 files changed, 661 insertions(+), 7 deletions(-) create mode 100644 stub/src/main/java/io/grpc/stub/CallStreamObserver.java create mode 100644 stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java create mode 100644 stub/src/test/java/io/grpc/stub/ServerCallsTest.java diff --git a/stub/src/main/java/io/grpc/stub/CallStreamObserver.java b/stub/src/main/java/io/grpc/stub/CallStreamObserver.java new file mode 100644 index 0000000000..512d0903fa --- /dev/null +++ b/stub/src/main/java/io/grpc/stub/CallStreamObserver.java @@ -0,0 +1,102 @@ +/* + * Copyright 2016, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.stub; + +import io.grpc.ExperimentalApi; + +/** + * A refinement of StreamObserver provided by the GRPC runtime to the application that allows for + * more complex interactions with call behavior. + * + *

In any call there are logically two {@link StreamObserver} implementations: + *

+ * + *

Implementations of this class represent the 'outbound' message stream. + * + */ +@ExperimentalApi +public abstract class CallStreamObserver implements StreamObserver { + + /** + * If {@code true} indicates that a call to {@link #onNext(Object)} will not require the entire + * message to be buffered before it is sent to the peer. + */ + public abstract boolean isReady(); + + /** + * Set a {@link Runnable} that will be executed every time the stream {@link #isReady()} state + * changes from {@code false} to {@code true}. While it is not guaranteed that the same + * thread will always be used to execute the {@link Runnable}, it is guaranteed that executions + * are serialized with calls to the 'inbound' {@link StreamObserver}. + * + *

Note that the handler may be called some time after {@link #isReady} has transitioned to + * true as other callbacks may still be executing in the 'inbound' observer. + * + * @param onReadyHandler to call when peer is ready to receive more messages. + */ + public abstract void setOnReadyHandler(Runnable onReadyHandler); + + /** + * Disables automatic flow control where a token is returned to the peer after a call + * to the 'inbound' {@link io.grpc.stub.StreamObserver#onNext(Object)} has completed. If disabled + * an application must make explicit calls to {@link #request} to receive messages. + * + *

Note that for cases where the runtime knows that only one inbound message is allowed + * calling this method will have no effect and the runtime will always permit one and only + * one message. This is true for: + *

+ *

+ */ + public abstract void disableAutoInboundFlowControl(); + + /** + * Requests the peer to produce {@code count} more messages to be delivered to the 'inbound' + * {@link StreamObserver}. + * @param count more messages + */ + public abstract void request(int count); +} diff --git a/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java b/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java new file mode 100644 index 0000000000..2e87a5b36d --- /dev/null +++ b/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java @@ -0,0 +1,59 @@ +/* + * Copyright 2016, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.stub; + +import io.grpc.ExperimentalApi; + +/** + * A refinement of {@link CallStreamObserver} to allows for interaction with call + * cancellation events on the server side. + */ +@ExperimentalApi +public abstract class ServerCallStreamObserver extends CallStreamObserver { + + /** + * If {@code true} indicates that the call has been cancelled by the remote peer. + */ + public abstract boolean isCancelled(); + + /** + * Set a {@link Runnable} that will be called if the calls {@link #isCancelled()} state + * changes from {@code false} to {@code true}. It is guaranteed that execution of the + * {@link Runnable} are serialized with calls to the 'inbound' {@link StreamObserver}. + * + *

Note that the handler may be called some time after {@link #isCancelled} has transitioned to + * {@code true} as other callbacks may still be executing in the 'inbound' observer. + * + * @param onCancelHandler to call when client has cancelled the call. + */ + public abstract void setOnCancelHandler(Runnable onCancelHandler); +} diff --git a/stub/src/main/java/io/grpc/stub/ServerCalls.java b/stub/src/main/java/io/grpc/stub/ServerCalls.java index e01de42fa1..94c99970e0 100644 --- a/stub/src/main/java/io/grpc/stub/ServerCalls.java +++ b/stub/src/main/java/io/grpc/stub/ServerCalls.java @@ -128,9 +128,11 @@ public class ServerCalls { MethodDescriptor methodDescriptor, final ServerCall call, Metadata headers) { - final ResponseObserver responseObserver = new ResponseObserver(call); + final ServerCallStreamObserverImpl responseObserver = + new ServerCallStreamObserverImpl(call); // We expect only 1 request, but we ask for 2 requests here so that if a misbehaving client - // sends more than 1 requests, ServerCall will catch it. + // sends more than 1 requests, ServerCall will catch it. Note that disabling auto + // inbound flow control has no effect on unary calls. call.request(2); return new EmptyServerCallListener() { ReqT request; @@ -145,6 +147,12 @@ public class ServerCalls { public void onHalfClose() { if (request != null) { method.invoke(request, responseObserver); + responseObserver.freeze(); + if (call.isReady()) { + // Since we are calling invoke in halfClose we have missed the onReady + // event from the transport so recover it here. + onReady(); + } } else { call.close(Status.INVALID_ARGUMENT.withDescription("Half-closed without a request"), new Metadata()); @@ -154,6 +162,16 @@ public class ServerCalls { @Override public void onCancel() { responseObserver.cancelled = true; + if (responseObserver.onCancelHandler != null) { + responseObserver.onCancelHandler.run(); + } + } + + @Override + public void onReady() { + if (responseObserver.onReadyHandler != null) { + responseObserver.onReadyHandler.run(); + } } }; } @@ -173,9 +191,13 @@ public class ServerCalls { MethodDescriptor methodDescriptor, final ServerCall call, Metadata headers) { - call.request(1); - final ResponseObserver responseObserver = new ResponseObserver(call); + final ServerCallStreamObserverImpl responseObserver = + new ServerCallStreamObserverImpl(call); final StreamObserver requestObserver = method.invoke(responseObserver); + responseObserver.freeze(); + if (responseObserver.autoFlowControlEnabled) { + call.request(1); + } return new EmptyServerCallListener() { boolean halfClosed = false; @@ -184,7 +206,9 @@ public class ServerCalls { requestObserver.onNext(request); // Request delivery of the next inbound message. - call.request(1); + if (responseObserver.autoFlowControlEnabled) { + call.request(1); + } } @Override @@ -196,10 +220,20 @@ public class ServerCalls { @Override public void onCancel() { responseObserver.cancelled = true; + if (responseObserver.onCancelHandler != null) { + responseObserver.onCancelHandler.run(); + } if (!halfClosed) { requestObserver.onError(Status.CANCELLED.asException()); } } + + @Override + public void onReady() { + if (responseObserver.onReadyHandler != null) { + responseObserver.onReadyHandler.run(); + } + } }; } }; @@ -213,15 +247,24 @@ public class ServerCalls { StreamObserver invoke(StreamObserver responseObserver); } - private static class ResponseObserver implements StreamObserver { + private static class ServerCallStreamObserverImpl + extends ServerCallStreamObserver { final ServerCall call; volatile boolean cancelled; + private boolean frozen; + private boolean autoFlowControlEnabled = true; private boolean sentHeaders; + private Runnable onReadyHandler; + private Runnable onCancelHandler; - ResponseObserver(ServerCall call) { + ServerCallStreamObserverImpl(ServerCall call) { this.call = call; } + private final void freeze() { + this.frozen = true; + } + @Override public void onNext(RespT response) { if (cancelled) { @@ -247,6 +290,46 @@ public class ServerCalls { call.close(Status.OK, new Metadata()); } } + + @Override + public boolean isReady() { + return call.isReady(); + } + + @Override + public void setOnReadyHandler(Runnable r) { + if (frozen) { + throw new IllegalStateException("Cannot alter onReadyHandler after initialization"); + } + this.onReadyHandler = r; + } + + @Override + public boolean isCancelled() { + return call.isCancelled(); + } + + @Override + public void setOnCancelHandler(Runnable onCancelHandler) { + if (frozen) { + throw new IllegalStateException("Cannot alter onCancelHandler after initialization"); + } + this.onCancelHandler = onCancelHandler; + } + + @Override + public void disableAutoInboundFlowControl() { + if (frozen) { + throw new IllegalStateException("Cannot disable auto flow control after initialization"); + } else { + autoFlowControlEnabled = false; + } + } + + @Override + public void request(int count) { + call.request(count); + } } private static class EmptyServerCallListener extends ServerCall.Listener { diff --git a/stub/src/test/java/io/grpc/stub/ServerCallsTest.java b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java new file mode 100644 index 0000000000..f95623077e --- /dev/null +++ b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java @@ -0,0 +1,410 @@ +/* + * Copyright 2016, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.stub; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.times; + +import io.grpc.CallOptions; +import io.grpc.ClientCall; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerServiceDefinition; +import io.grpc.Status; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.internal.ManagedChannelImpl; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + + +/** + * Tests for {@link ServerCalls}. + */ +@RunWith(JUnit4.class) +public class ServerCallsTest { + static final MethodDescriptor STREAMING_METHOD = MethodDescriptor.create( + MethodDescriptor.MethodType.BIDI_STREAMING, + "some/method", + new IntegerMarshaller(), new IntegerMarshaller()); + + static final MethodDescriptor UNARY_METHOD = MethodDescriptor.create( + MethodDescriptor.MethodType.UNARY, + "some/unarymethod", + new IntegerMarshaller(), new IntegerMarshaller()); + + @Mock + ServerCall serverCall; + + @Before + public void setUp() throws Exception { + MockitoAnnotations.initMocks(this); + } + + @Test + public void runtimeStreamObserverIsServerCallStreamObserver() throws Exception { + final AtomicBoolean invokeCalled = new AtomicBoolean(); + final AtomicBoolean onCancelCalled = new AtomicBoolean(); + final AtomicBoolean onReadyCalled = new AtomicBoolean(); + final AtomicReference> callObserver = + new AtomicReference>(); + ServerCallHandler callHandler = + ServerCalls.asyncBidiStreamingCall( + new ServerCalls.BidiStreamingMethod() { + @Override + public StreamObserver invoke(StreamObserver responseObserver) { + assertTrue(responseObserver instanceof ServerCallStreamObserver); + ServerCallStreamObserver serverCallObserver = + (ServerCallStreamObserver) responseObserver; + callObserver.set(serverCallObserver); + serverCallObserver.setOnCancelHandler(new Runnable() { + @Override + public void run() { + onCancelCalled.set(true); + } + }); + serverCallObserver.setOnReadyHandler(new Runnable() { + @Override + public void run() { + onReadyCalled.set(true); + } + }); + invokeCalled.set(true); + return new NoOpStreamObserver(); + } + }); + ServerCall.Listener callListener = + callHandler.startCall(STREAMING_METHOD, serverCall, new Metadata()); + Mockito.when(serverCall.isReady()).thenReturn(true).thenReturn(false); + Mockito.when(serverCall.isCancelled()).thenReturn(false).thenReturn(true); + assertTrue(callObserver.get().isReady()); + assertFalse(callObserver.get().isCancelled()); + callListener.onReady(); + callListener.onMessage(1); + callListener.onCancel(); + assertTrue(invokeCalled.get()); + assertTrue(onReadyCalled.get()); + assertTrue(onCancelCalled.get()); + assertFalse(callObserver.get().isReady()); + assertTrue(callObserver.get().isCancelled()); + // Is called twice, once to permit the first message and once again after the first message + // has been processed (auto flow control) + Mockito.verify(serverCall, times(2)).request(1); + } + + @Test + public void cannotSetOnCancelHandlerAfterServiceInvocation() throws Exception { + final AtomicReference> callObserver = + new AtomicReference>(); + ServerCallHandler callHandler = + ServerCalls.asyncBidiStreamingCall( + new ServerCalls.BidiStreamingMethod() { + @Override + public StreamObserver invoke(StreamObserver responseObserver) { + callObserver.set((ServerCallStreamObserver) responseObserver); + return new NoOpStreamObserver(); + } + }); + ServerCall.Listener callListener = + callHandler.startCall(STREAMING_METHOD, serverCall, new Metadata()); + callListener.onMessage(1); + try { + callObserver.get().setOnCancelHandler(new Runnable() { + @Override + public void run() { + } + }); + fail("Cannot set onCancel handler after service invocation"); + } catch (IllegalStateException expected) { + // Expected + } + } + + @Test + public void cannotSetOnReadyHandlerAfterServiceInvocation() throws Exception { + final AtomicReference> callObserver = + new AtomicReference>(); + ServerCallHandler callHandler = + ServerCalls.asyncBidiStreamingCall( + new ServerCalls.BidiStreamingMethod() { + @Override + public StreamObserver invoke(StreamObserver responseObserver) { + callObserver.set((ServerCallStreamObserver) responseObserver); + return new NoOpStreamObserver(); + } + }); + ServerCall.Listener callListener = + callHandler.startCall(STREAMING_METHOD, serverCall, new Metadata()); + callListener.onMessage(1); + try { + callObserver.get().setOnReadyHandler(new Runnable() { + @Override + public void run() { + } + }); + fail("Cannot set onReady after service invocation"); + } catch (IllegalStateException expected) { + // Expected + } + } + + @Test + public void cannotDisableAutoFlowControlAfterServiceInvocation() throws Exception { + final AtomicReference> callObserver = + new AtomicReference>(); + ServerCallHandler callHandler = + ServerCalls.asyncBidiStreamingCall( + new ServerCalls.BidiStreamingMethod() { + @Override + public StreamObserver invoke(StreamObserver responseObserver) { + callObserver.set((ServerCallStreamObserver) responseObserver); + return new NoOpStreamObserver(); + } + }); + ServerCall.Listener callListener = + callHandler.startCall(STREAMING_METHOD, serverCall, new Metadata()); + callListener.onMessage(1); + try { + callObserver.get().disableAutoInboundFlowControl(); + fail("Cannot set onCancel handler after service invocation"); + } catch (IllegalStateException expected) { + // Expected + } + } + + @Test + public void disablingInboundAutoFlowControlSuppressesRequestsForMoreMessages() throws Exception { + ServerCallHandler callHandler = + ServerCalls.asyncBidiStreamingCall( + new ServerCalls.BidiStreamingMethod() { + @Override + public StreamObserver invoke(StreamObserver responseObserver) { + ServerCallStreamObserver serverCallObserver = + (ServerCallStreamObserver) responseObserver; + serverCallObserver.disableAutoInboundFlowControl(); + return new NoOpStreamObserver(); + } + }); + ServerCall.Listener callListener = + callHandler.startCall(STREAMING_METHOD, serverCall, new Metadata()); + callListener.onReady(); + // Transport should not call this if nothing has been requested but forcing it here + // to verify that message delivery does not trigger a call to request(1). + callListener.onMessage(1); + // Should never be called + Mockito.verify(serverCall, times(0)).request(1); + } + + @Test + public void disablingInboundAutoFlowControlForUnaryHasNoEffect() throws Exception { + ServerCallHandler callHandler = + ServerCalls.asyncUnaryCall( + new ServerCalls.UnaryMethod() { + @Override + public void invoke(Integer req, StreamObserver responseObserver) { + ServerCallStreamObserver serverCallObserver = + (ServerCallStreamObserver) responseObserver; + serverCallObserver.disableAutoInboundFlowControl(); + } + }); + ServerCall.Listener callListener = + callHandler.startCall(UNARY_METHOD, serverCall, new Metadata()); + // Auto inbound flow-control always requests 2 messages for unary to detect a violation + // of the unary semantic. + Mockito.verify(serverCall, times(1)).request(2); + } + + @Test + public void onReadyHandlerCalledForUnaryRequest() throws Exception { + final AtomicInteger onReadyCalled = new AtomicInteger(); + final AtomicReference> callObserver = + new AtomicReference>(); + ServerCallHandler callHandler = + ServerCalls.asyncServerStreamingCall( + new ServerCalls.ServerStreamingMethod() { + @Override + public void invoke(Integer req, StreamObserver responseObserver) { + ServerCallStreamObserver serverCallObserver = + (ServerCallStreamObserver) responseObserver; + serverCallObserver.setOnReadyHandler(new Runnable() { + @Override + public void run() { + onReadyCalled.incrementAndGet(); + } + }); + } + }); + ServerCall.Listener callListener = + callHandler.startCall(STREAMING_METHOD, serverCall, new Metadata()); + Mockito.when(serverCall.isReady()).thenReturn(true).thenReturn(false); + Mockito.when(serverCall.isCancelled()).thenReturn(false).thenReturn(true); + callListener.onReady(); + // On ready is not called until the unary request message is delivered + assertEquals(0, onReadyCalled.get()); + // delivering the message doesn't trigger onReady listener either + callListener.onMessage(1); + assertEquals(0, onReadyCalled.get()); + // half-closing triggers the unary request delivery and onReady + callListener.onHalfClose(); + assertEquals(1, onReadyCalled.get()); + // Next on ready event from the transport triggers listener + callListener.onReady(); + assertEquals(2, onReadyCalled.get()); + } + + @Test + public void inprocessTransportManualFlow() throws Exception { + final Semaphore semaphore = new Semaphore(1); + ServerServiceDefinition service = ServerServiceDefinition.builder("some") + .addMethod(STREAMING_METHOD, ServerCalls.asyncBidiStreamingCall( + new ServerCalls.BidiStreamingMethod() { + int iteration; + + @Override + public StreamObserver invoke(StreamObserver responseObserver) { + final ServerCallStreamObserver serverCallObserver = + (ServerCallStreamObserver) responseObserver; + serverCallObserver.setOnReadyHandler(new Runnable() { + @Override + public void run() { + while (serverCallObserver.isReady()) { + serverCallObserver.onNext(iteration); + } + iteration++; + semaphore.release(); + } + }); + return new NoOpStreamObserver() { + @Override + public void onCompleted() { + serverCallObserver.onCompleted(); + } + }; + } + })) + .build(); + long tag = System.nanoTime(); + InProcessServerBuilder.forName("go-with-the-flow" + tag).addService(service).build().start(); + ManagedChannelImpl channel = InProcessChannelBuilder.forName("go-with-the-flow" + tag).build(); + final ClientCall clientCall = channel.newCall(STREAMING_METHOD, + CallOptions.DEFAULT); + final CountDownLatch latch = new CountDownLatch(1); + final int[] receivedMessages = new int[6]; + clientCall.start(new ClientCall.Listener() { + int index; + + @Override + public void onMessage(Integer message) { + receivedMessages[index++] = message; + } + + @Override + public void onClose(Status status, Metadata trailers) { + latch.countDown(); + } + }, new Metadata()); + semaphore.acquire(); + clientCall.request(1); + semaphore.acquire(); + clientCall.request(2); + semaphore.acquire(); + clientCall.request(3); + clientCall.halfClose(); + latch.await(5, TimeUnit.SECONDS); + // Very that number of messages produced in each onReady handler call matches the number + // requested by the client. + assertArrayEquals(new int[]{0, 1, 1, 2, 2, 2}, receivedMessages); + } + + private static class NoOpStreamObserver implements StreamObserver { + @Override + public void onNext(Integer value) { + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onCompleted() { + } + } + + public static class IntegerMarshaller implements MethodDescriptor.Marshaller { + @Override + public InputStream stream(Integer value) { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(4); + DataOutputStream dos = new DataOutputStream(baos); + dos.writeInt(value); + return new ByteArrayInputStream(baos.toByteArray()); + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } + } + + @Override + public Integer parse(InputStream stream) { + try { + DataInputStream dis = new DataInputStream(stream); + return dis.readInt(); + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } + } + } +}