From b681a07f6cb7d6e19239b10d87220f0049d9efe2 Mon Sep 17 00:00:00 2001 From: Louis Ryan Date: Thu, 19 May 2016 13:27:52 -0700 Subject: [PATCH] Introduce ClientResponseObsever to capture call initiation and to allow for capture of the CallStreamObserver --- .../grpc/stub/ClientCallStreamObserver.java | 42 +++ .../main/java/io/grpc/stub/ClientCalls.java | 80 ++++- .../io/grpc/stub/ClientResponseObserver.java | 60 ++++ .../java/io/grpc/stub/StreamObservers.java | 90 +++++ .../java/io/grpc/stub/ClientCallsTest.java | 320 +++++++++++++++++- 5 files changed, 576 insertions(+), 16 deletions(-) create mode 100644 stub/src/main/java/io/grpc/stub/ClientCallStreamObserver.java create mode 100644 stub/src/main/java/io/grpc/stub/ClientResponseObserver.java create mode 100644 stub/src/main/java/io/grpc/stub/StreamObservers.java diff --git a/stub/src/main/java/io/grpc/stub/ClientCallStreamObserver.java b/stub/src/main/java/io/grpc/stub/ClientCallStreamObserver.java new file mode 100644 index 0000000000..4dfb41e7b6 --- /dev/null +++ b/stub/src/main/java/io/grpc/stub/ClientCallStreamObserver.java @@ -0,0 +1,42 @@ +/* + * Copyright 2016, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.stub; + +import io.grpc.ExperimentalApi; + +/** + * A refinement of {@link CallStreamObserver} that allows for lower-level interaction with + * client calls. + */ +@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1788") +public abstract class ClientCallStreamObserver extends CallStreamObserver { +} diff --git a/stub/src/main/java/io/grpc/stub/ClientCalls.java b/stub/src/main/java/io/grpc/stub/ClientCalls.java index ba2f1fff0c..5296497635 100644 --- a/stub/src/main/java/io/grpc/stub/ClientCalls.java +++ b/stub/src/main/java/io/grpc/stub/ClientCalls.java @@ -213,7 +213,9 @@ public class ClientCalls { ClientCall call, ReqT param, StreamObserver responseObserver, boolean streamingResponse) { asyncUnaryRequestCall(call, param, - new StreamObserverToCallListenerAdapter(call, responseObserver, streamingResponse), + new StreamObserverToCallListenerAdapter(call, responseObserver, + new CallToStreamObserverAdapter(call), + streamingResponse), streamingResponse); } @@ -235,9 +237,10 @@ public class ClientCalls { private static StreamObserver asyncStreamingRequestCall( ClientCall call, StreamObserver responseObserver, boolean streamingResponse) { - startCall(call, new StreamObserverToCallListenerAdapter( - call, responseObserver, streamingResponse), streamingResponse); - return new CallToStreamObserverAdapter(call); + CallToStreamObserverAdapter adapter = new CallToStreamObserverAdapter(call); + startCall(call, new StreamObserverToCallListenerAdapter( + call, responseObserver, adapter, streamingResponse), streamingResponse); + return adapter; } private static void startCall(ClientCall call, @@ -252,13 +255,20 @@ public class ClientCalls { } } - private static class CallToStreamObserverAdapter implements StreamObserver { + private static class CallToStreamObserverAdapter extends ClientCallStreamObserver { + private boolean frozen; private final ClientCall call; + private Runnable onReadyHandler; + private boolean autoFlowControlEnabled = true; public CallToStreamObserverAdapter(ClientCall call) { this.call = call; } + private void freeze() { + this.frozen = true; + } + @Override public void onNext(T value) { call.sendMessage(value); @@ -273,20 +283,63 @@ public class ClientCalls { public void onCompleted() { call.halfClose(); } + + @Override + public boolean isReady() { + return call.isReady(); + } + + @Override + public void setOnReadyHandler(Runnable onReadyHandler) { + if (frozen) { + throw new IllegalStateException("Cannot alter onReadyHandler after call started"); + } + this.onReadyHandler = onReadyHandler; + } + + @Override + public void disableAutoInboundFlowControl() { + if (frozen) { + throw new IllegalStateException("Cannot disable auto flow control call started"); + } + autoFlowControlEnabled = false; + } + + @Override + public void request(int count) { + call.request(count); + } + + @Override + public void setMessageCompression(boolean enable) { + call.setMessageCompression(enable); + } } - private static class StreamObserverToCallListenerAdapter + private static class StreamObserverToCallListenerAdapter extends ClientCall.Listener { - private final ClientCall call; + private final ClientCall call; private final StreamObserver observer; + private final CallToStreamObserverAdapter adapter; private final boolean streamingResponse; private boolean firstResponseReceived; - public StreamObserverToCallListenerAdapter( - ClientCall call, StreamObserver observer, boolean streamingResponse) { + StreamObserverToCallListenerAdapter( + ClientCall call, + StreamObserver observer, + CallToStreamObserverAdapter adapter, + boolean streamingResponse) { this.call = call; this.observer = observer; this.streamingResponse = streamingResponse; + this.adapter = adapter; + if (observer instanceof ClientResponseObserver) { + @SuppressWarnings("unchecked") + ClientResponseObserver clientResponseObserver = + (ClientResponseObserver) observer; + clientResponseObserver.beforeStart(adapter); + } + adapter.freeze(); } @Override @@ -303,7 +356,7 @@ public class ClientCalls { firstResponseReceived = true; observer.onNext(message); - if (streamingResponse) { + if (streamingResponse && adapter.autoFlowControlEnabled) { // Request delivery of the next inbound message. call.request(1); } @@ -317,6 +370,13 @@ public class ClientCalls { observer.onError(status.asRuntimeException(trailers)); } } + + @Override + public void onReady() { + if (adapter.onReadyHandler != null) { + adapter.onReadyHandler.run(); + } + } } /** diff --git a/stub/src/main/java/io/grpc/stub/ClientResponseObserver.java b/stub/src/main/java/io/grpc/stub/ClientResponseObserver.java new file mode 100644 index 0000000000..0238bb2f2f --- /dev/null +++ b/stub/src/main/java/io/grpc/stub/ClientResponseObserver.java @@ -0,0 +1,60 @@ +/* + * Copyright 2016, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.stub; + +import io.grpc.ExperimentalApi; + +/** + * Specialization of {@link StreamObserver} implemented by clients in order to interact with the + * advanced features of a call such as flow-control. + */ +@ExperimentalApi +public interface ClientResponseObserver extends StreamObserver { + /** + * Called by the runtime priot to the start of a call to provide a reference to the + * {@link ClientCallStreamObserver} for the outbound stream. This can be used to listen to + * onReady events, disable auto inbound flow and perform other advanced functions. + * + *

Only the methods {@link ClientCallStreamObserver#setOnReadyHandler(Runnable)} and + * {@link ClientCallStreamObserver#disableAutoInboundFlowControl()} may be called within this + * callback + * + *

+   *   // Copy an iterator to the request stream under flow-control
+   *   someStub.fullDuplexCall(new ClientResponseObserver<ReqT, RespT>() {
+   *     public void beforeStart(final ClientCallStreamObserver<Req> requestStream) {
+   *       StreamObservers.copyWithFlowControl(someIterator, requestStream);
+   *   });
+   * 
+ */ + void beforeStart(final ClientCallStreamObserver requestStream); +} diff --git a/stub/src/main/java/io/grpc/stub/StreamObservers.java b/stub/src/main/java/io/grpc/stub/StreamObservers.java new file mode 100644 index 0000000000..37cb0a0d10 --- /dev/null +++ b/stub/src/main/java/io/grpc/stub/StreamObservers.java @@ -0,0 +1,90 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.stub; + +import com.google.common.base.Preconditions; + +import io.grpc.ExperimentalApi; + +import java.util.Iterator; + +/** + * Utility functions for working with {@link StreamObserver} and it's common subclasses like + * {@link CallStreamObserver}. + */ +@ExperimentalApi +public class StreamObservers { + /** + * Copy the values of an {@link Iterator} to the target {@link CallStreamObserver} while properly + * accounting for outbound flow-control. + * + *

For clients this method is safe to call inside {@link ClientResponseObserver#beforeStart}, + * on servers it is safe to call inside the service method implementation. + *

+ * + * @param source of values expressed as an {@link Iterator}. + * @param target {@link CallStreamObserver} which accepts values from the source. + */ + public static void copyWithFlowControl(final Iterator source, + final CallStreamObserver target) { + Preconditions.checkNotNull(source); + Preconditions.checkNotNull(target); + target.setOnReadyHandler(new Runnable() { + @Override + public void run() { + while (target.isReady() && source.hasNext()) { + target.onNext(source.next()); + } + if (!source.hasNext()) { + target.onCompleted(); + } + } + }); + } + + /** + * Copy the values of an {@link Iterable} to the target {@link CallStreamObserver} while properly + * accounting for outbound flow-control. + * + *

For clients this method is safe to call inside {@link ClientResponseObserver#beforeStart}, + * on servers it is safe to call inside the service method implementation. + *

+ * + * @param source of values expressed as an {@link Iterable}. + * @param target {@link CallStreamObserver} which accepts values from the source. + */ + public static void copyWithFlowControl(final Iterable source, + CallStreamObserver target) { + Preconditions.checkNotNull(source); + copyWithFlowControl(source.iterator(), target); + } +} diff --git a/stub/src/test/java/io/grpc/stub/ClientCallsTest.java b/stub/src/test/java/io/grpc/stub/ClientCallsTest.java index a0b6716320..dde993ea46 100644 --- a/stub/src/test/java/io/grpc/stub/ClientCallsTest.java +++ b/stub/src/test/java/io/grpc/stub/ClientCallsTest.java @@ -31,17 +31,28 @@ package io.grpc.stub; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertSame; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import com.google.common.util.concurrent.ListenableFuture; +import io.grpc.CallOptions; import io.grpc.ClientCall; import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.ServerServiceDefinition; +import io.grpc.ServiceDescriptor; import io.grpc.Status; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.internal.ManagedChannelImpl; +import io.grpc.stub.ServerCalls.NoopStreamObserver; +import io.grpc.stub.ServerCallsTest.IntegerMarshaller; import org.junit.Before; import org.junit.Test; @@ -53,7 +64,10 @@ import org.mockito.MockitoAnnotations; import java.util.Iterator; import java.util.concurrent.CancellationException; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; /** * Unit tests for {@link ClientCalls}. @@ -61,13 +75,21 @@ import java.util.concurrent.ExecutionException; @RunWith(JUnit4.class) public class ClientCallsTest { - @Mock private ClientCall call; + static final MethodDescriptor STREAMING_METHOD = MethodDescriptor.create( + MethodDescriptor.MethodType.BIDI_STREAMING, + "some/method", + new IntegerMarshaller(), new IntegerMarshaller()); - @Before public void setUp() { + @Mock + private ClientCall call; + + @Before + public void setUp() { MockitoAnnotations.initMocks(this); } - @Test public void unaryFutureCallSuccess() throws Exception { + @Test + public void unaryFutureCallSuccess() throws Exception { Integer req = 2; ListenableFuture future = ClientCalls.futureUnaryCall(call, req); ArgumentCaptor> listenerCaptor = ArgumentCaptor.forClass(null); @@ -80,7 +102,8 @@ public class ClientCallsTest { assertEquals("bar", future.get()); } - @Test public void unaryFutureCallFailed() throws Exception { + @Test + public void unaryFutureCallFailed() throws Exception { Integer req = 2; ListenableFuture future = ClientCalls.futureUnaryCall(call, req); ArgumentCaptor> listenerCaptor = ArgumentCaptor.forClass(null); @@ -99,7 +122,8 @@ public class ClientCallsTest { } } - @Test public void unaryFutureCallCancelled() throws Exception { + @Test + public void unaryFutureCallCancelled() throws Exception { Integer req = 2; ListenableFuture future = ClientCalls.futureUnaryCall(call, req); ArgumentCaptor> listenerCaptor = ArgumentCaptor.forClass(null); @@ -117,7 +141,291 @@ public class ClientCallsTest { } } - @Test public void blockingResponseStreamFailed() throws Exception { + @Test + public void cannotSetOnReadyAfterCallStarted() throws Exception { + CallStreamObserver callStreamObserver = + (CallStreamObserver) ClientCalls.asyncClientStreamingCall(call, + new NoopStreamObserver()); + Runnable noOpRunnable = new Runnable() { + @Override + public void run() { + } + }; + try { + callStreamObserver.setOnReadyHandler(noOpRunnable); + fail("Should not be able to set handler after call started"); + } catch (IllegalStateException ise) { + // expected + } + } + + @Test + public void disablingInboundAutoFlowControlSuppressesRequestsForMoreMessages() + throws Exception { + ArgumentCaptor> listenerCaptor = ArgumentCaptor.forClass(null); + CallStreamObserver requestObserver = + (CallStreamObserver) + ClientCalls.asyncBidiStreamingCall(call, new ClientResponseObserver() { + @Override + public void beforeStart(ClientCallStreamObserver requestStream) { + requestStream.disableAutoInboundFlowControl(); + } + + @Override + public void onNext(String value) { + + } + + @Override + public void onError(Throwable t) { + + } + + @Override + public void onCompleted() { + + } + }); + verify(call).start(listenerCaptor.capture(), any(Metadata.class)); + listenerCaptor.getValue().onMessage("message"); + verify(call, times(1)).request(1); + } + + @Test + public void callStreamObserverPropagatesFlowControlRequestsToCall() + throws Exception { + ArgumentCaptor> listenerCaptor = ArgumentCaptor.forClass(null); + ClientResponseObserver responseObserver = + new ClientResponseObserver() { + @Override + public void beforeStart(ClientCallStreamObserver requestStream) { + requestStream.disableAutoInboundFlowControl(); + } + + @Override + public void onNext(String value) { + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onCompleted() { + } + }; + CallStreamObserver requestObserver = + (CallStreamObserver) + ClientCalls.asyncBidiStreamingCall(call, responseObserver); + verify(call).start(listenerCaptor.capture(), any(Metadata.class)); + listenerCaptor.getValue().onMessage("message"); + requestObserver.request(5); + verify(call, times(1)).request(5); + } + + @Test + public void canCaptureInboundFlowControlForServerStreamingObserver() + throws Exception { + ArgumentCaptor> listenerCaptor = ArgumentCaptor.forClass(null); + ClientResponseObserver responseObserver = + new ClientResponseObserver() { + @Override + public void beforeStart(ClientCallStreamObserver requestStream) { + requestStream.disableAutoInboundFlowControl(); + requestStream.request(5); + } + + @Override + public void onNext(String value) { + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onCompleted() { + } + }; + ClientCalls.asyncServerStreamingCall(call, 1, responseObserver); + verify(call).start(listenerCaptor.capture(), any(Metadata.class)); + listenerCaptor.getValue().onMessage("message"); + verify(call, times(1)).request(1); + verify(call, times(1)).request(5); + } + + @Test + public void inprocessTransportInboundFlowControl() throws Exception { + final Semaphore semaphore = new Semaphore(1); + ServerServiceDefinition service = ServerServiceDefinition.builder( + new ServiceDescriptor("some", STREAMING_METHOD)) + .addMethod(STREAMING_METHOD, ServerCalls.asyncBidiStreamingCall( + new ServerCalls.BidiStreamingMethod() { + int iteration; + + @Override + public StreamObserver invoke(StreamObserver responseObserver) { + final ServerCallStreamObserver serverCallObserver = + (ServerCallStreamObserver) responseObserver; + serverCallObserver.setOnReadyHandler(new Runnable() { + @Override + public void run() { + while (serverCallObserver.isReady()) { + serverCallObserver.onNext(iteration); + } + iteration++; + semaphore.release(); + } + }); + return new ServerCalls.NoopStreamObserver() { + @Override + public void onCompleted() { + serverCallObserver.onCompleted(); + } + }; + } + })) + .build(); + long tag = System.nanoTime(); + InProcessServerBuilder.forName("go-with-the-flow" + tag).addService(service).build().start(); + ManagedChannelImpl channel = InProcessChannelBuilder.forName("go-with-the-flow" + tag).build(); + final ClientCall clientCall = channel.newCall(STREAMING_METHOD, + CallOptions.DEFAULT); + final CountDownLatch latch = new CountDownLatch(1); + final int[] receivedMessages = new int[6]; + semaphore.acquire(); + + ClientResponseObserver responseObserver = + new ClientResponseObserver() { + int index; + @Override + public void beforeStart(final ClientCallStreamObserver requestStream) { + requestStream.disableAutoInboundFlowControl(); + } + + @Override + public void onNext(Integer value) { + receivedMessages[index++] = value; + } + + @Override + public void onError(Throwable t) { + latch.countDown(); + } + + @Override + public void onCompleted() { + latch.countDown(); + } + }; + + CallStreamObserver integerStreamObserver = (CallStreamObserver) + ClientCalls.asyncBidiStreamingCall(clientCall, responseObserver); + semaphore.acquire(); + integerStreamObserver.request(2); + semaphore.acquire(); + integerStreamObserver.request(3); + integerStreamObserver.onCompleted(); + latch.await(5, TimeUnit.SECONDS); + // Very that number of messages produced in each onReady handler call matches the number + // requested by the client. Note that ClientCalls.asyncBidiStreamingCall will request(1) + + assertArrayEquals(new int[]{0, 1, 1, 2, 2, 2}, receivedMessages); + } + + @Test + public void inprocessTransportOutboundFlowControl() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + final Semaphore semaphore = new Semaphore(1); + final int[] receivedMessages = new int[6]; + ServerServiceDefinition service = ServerServiceDefinition.builder( + new ServiceDescriptor("some", STREAMING_METHOD)) + .addMethod(STREAMING_METHOD, ServerCalls.asyncBidiStreamingCall( + new ServerCalls.BidiStreamingMethod() { + @Override + public StreamObserver invoke(StreamObserver responseObserver) { + final ServerCallStreamObserver serverCallObserver = + (ServerCallStreamObserver) responseObserver; + serverCallObserver.disableAutoInboundFlowControl(); + new Thread(new Runnable() { + @Override + public void run() { + try { + serverCallObserver.request(1); + semaphore.acquire(); + serverCallObserver.request(2); + semaphore.acquire(); + serverCallObserver.request(3); + } catch (Throwable t) { + throw new RuntimeException(t); + } + } + }).start(); + return new ServerCalls.NoopStreamObserver() { + int index; + @Override + public void onNext(Integer value) { + receivedMessages[index++] = value; + } + + @Override + public void onCompleted() { + serverCallObserver.onCompleted(); + latch.countDown(); + } + }; + } + })) + .build(); + long tag = System.nanoTime(); + InProcessServerBuilder.forName("go-with-the-flow" + tag).addService(service).build().start(); + ManagedChannelImpl channel = InProcessChannelBuilder.forName("go-with-the-flow" + tag).build(); + final ClientCall clientCall = channel.newCall(STREAMING_METHOD, + CallOptions.DEFAULT); + semaphore.acquire(); + + ClientResponseObserver responseObserver = + new ClientResponseObserver() { + @Override + public void beforeStart(final ClientCallStreamObserver requestStream) { + requestStream.setOnReadyHandler(new Runnable() { + int iteration; + @Override + public void run() { + while (requestStream.isReady()) { + requestStream.onNext(iteration); + } + iteration++; + if (iteration == 3) { + requestStream.onCompleted(); + } + semaphore.release(); + } + }); + } + + @Override + public void onNext(Integer value) { + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onCompleted() { + } + }; + + ClientCalls.asyncBidiStreamingCall(clientCall, responseObserver); + latch.await(5, TimeUnit.SECONDS); + // Very that number of messages produced in each onReady handler call matches the number + // requested by the client. + assertArrayEquals(new int[]{0, 1, 1, 2, 2, 2}, receivedMessages); + } + + @Test + public void blockingResponseStreamFailed() throws Exception { Integer req = 2; Iterator iter = ClientCalls.blockingServerStreamingCall(call, req); ArgumentCaptor> listenerCaptor = ArgumentCaptor.forClass(null);