Introduce ClientResponseObsever to capture call initiation and to allow

for capture of the CallStreamObserver
This commit is contained in:
Louis Ryan 2016-05-19 13:27:52 -07:00
parent 26bace63e5
commit b681a07f6c
5 changed files with 576 additions and 16 deletions

View File

@ -0,0 +1,42 @@
/*
* Copyright 2016, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.stub;
import io.grpc.ExperimentalApi;
/**
* A refinement of {@link CallStreamObserver} that allows for lower-level interaction with
* client calls.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1788")
public abstract class ClientCallStreamObserver<V> extends CallStreamObserver<V> {
}

View File

@ -213,7 +213,9 @@ public class ClientCalls {
ClientCall<ReqT, RespT> call, ReqT param, StreamObserver<RespT> responseObserver,
boolean streamingResponse) {
asyncUnaryRequestCall(call, param,
new StreamObserverToCallListenerAdapter<RespT>(call, responseObserver, streamingResponse),
new StreamObserverToCallListenerAdapter<ReqT, RespT>(call, responseObserver,
new CallToStreamObserverAdapter<ReqT>(call),
streamingResponse),
streamingResponse);
}
@ -235,9 +237,10 @@ public class ClientCalls {
private static <ReqT, RespT> StreamObserver<ReqT> asyncStreamingRequestCall(
ClientCall<ReqT, RespT> call, StreamObserver<RespT> responseObserver,
boolean streamingResponse) {
startCall(call, new StreamObserverToCallListenerAdapter<RespT>(
call, responseObserver, streamingResponse), streamingResponse);
return new CallToStreamObserverAdapter<ReqT>(call);
CallToStreamObserverAdapter<ReqT> adapter = new CallToStreamObserverAdapter<ReqT>(call);
startCall(call, new StreamObserverToCallListenerAdapter<ReqT, RespT>(
call, responseObserver, adapter, streamingResponse), streamingResponse);
return adapter;
}
private static <ReqT, RespT> void startCall(ClientCall<ReqT, RespT> call,
@ -252,13 +255,20 @@ public class ClientCalls {
}
}
private static class CallToStreamObserverAdapter<T> implements StreamObserver<T> {
private static class CallToStreamObserverAdapter<T> extends ClientCallStreamObserver<T> {
private boolean frozen;
private final ClientCall<T, ?> call;
private Runnable onReadyHandler;
private boolean autoFlowControlEnabled = true;
public CallToStreamObserverAdapter(ClientCall<T, ?> call) {
this.call = call;
}
private void freeze() {
this.frozen = true;
}
@Override
public void onNext(T value) {
call.sendMessage(value);
@ -273,20 +283,63 @@ public class ClientCalls {
public void onCompleted() {
call.halfClose();
}
@Override
public boolean isReady() {
return call.isReady();
}
@Override
public void setOnReadyHandler(Runnable onReadyHandler) {
if (frozen) {
throw new IllegalStateException("Cannot alter onReadyHandler after call started");
}
this.onReadyHandler = onReadyHandler;
}
@Override
public void disableAutoInboundFlowControl() {
if (frozen) {
throw new IllegalStateException("Cannot disable auto flow control call started");
}
autoFlowControlEnabled = false;
}
@Override
public void request(int count) {
call.request(count);
}
@Override
public void setMessageCompression(boolean enable) {
call.setMessageCompression(enable);
}
}
private static class StreamObserverToCallListenerAdapter<RespT>
private static class StreamObserverToCallListenerAdapter<ReqT, RespT>
extends ClientCall.Listener<RespT> {
private final ClientCall<?, RespT> call;
private final ClientCall<ReqT, RespT> call;
private final StreamObserver<RespT> observer;
private final CallToStreamObserverAdapter<ReqT> adapter;
private final boolean streamingResponse;
private boolean firstResponseReceived;
public StreamObserverToCallListenerAdapter(
ClientCall<?, RespT> call, StreamObserver<RespT> observer, boolean streamingResponse) {
StreamObserverToCallListenerAdapter(
ClientCall<ReqT, RespT> call,
StreamObserver<RespT> observer,
CallToStreamObserverAdapter<ReqT> adapter,
boolean streamingResponse) {
this.call = call;
this.observer = observer;
this.streamingResponse = streamingResponse;
this.adapter = adapter;
if (observer instanceof ClientResponseObserver) {
@SuppressWarnings("unchecked")
ClientResponseObserver<ReqT, RespT> clientResponseObserver =
(ClientResponseObserver<ReqT, RespT>) observer;
clientResponseObserver.beforeStart(adapter);
}
adapter.freeze();
}
@Override
@ -303,7 +356,7 @@ public class ClientCalls {
firstResponseReceived = true;
observer.onNext(message);
if (streamingResponse) {
if (streamingResponse && adapter.autoFlowControlEnabled) {
// Request delivery of the next inbound message.
call.request(1);
}
@ -317,6 +370,13 @@ public class ClientCalls {
observer.onError(status.asRuntimeException(trailers));
}
}
@Override
public void onReady() {
if (adapter.onReadyHandler != null) {
adapter.onReadyHandler.run();
}
}
}
/**

View File

@ -0,0 +1,60 @@
/*
* Copyright 2016, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.stub;
import io.grpc.ExperimentalApi;
/**
* Specialization of {@link StreamObserver} implemented by clients in order to interact with the
* advanced features of a call such as flow-control.
*/
@ExperimentalApi
public interface ClientResponseObserver<ReqT, RespT> extends StreamObserver<RespT> {
/**
* Called by the runtime priot to the start of a call to provide a reference to the
* {@link ClientCallStreamObserver} for the outbound stream. This can be used to listen to
* onReady events, disable auto inbound flow and perform other advanced functions.
*
* <p>Only the methods {@link ClientCallStreamObserver#setOnReadyHandler(Runnable)} and
* {@link ClientCallStreamObserver#disableAutoInboundFlowControl()} may be called within this
* callback
*
* <pre>
* // Copy an iterator to the request stream under flow-control
* someStub.fullDuplexCall(new ClientResponseObserver&lt;ReqT, RespT&gt;() {
* public void beforeStart(final ClientCallStreamObserver&lt;Req&gt; requestStream) {
* StreamObservers.copyWithFlowControl(someIterator, requestStream);
* });
* </pre>
*/
void beforeStart(final ClientCallStreamObserver<ReqT> requestStream);
}

View File

@ -0,0 +1,90 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.stub;
import com.google.common.base.Preconditions;
import io.grpc.ExperimentalApi;
import java.util.Iterator;
/**
* Utility functions for working with {@link StreamObserver} and it's common subclasses like
* {@link CallStreamObserver}.
*/
@ExperimentalApi
public class StreamObservers {
/**
* Copy the values of an {@link Iterator} to the target {@link CallStreamObserver} while properly
* accounting for outbound flow-control.
*
* <p>For clients this method is safe to call inside {@link ClientResponseObserver#beforeStart},
* on servers it is safe to call inside the service method implementation.
* </p>
*
* @param source of values expressed as an {@link Iterator}.
* @param target {@link CallStreamObserver} which accepts values from the source.
*/
public static <V> void copyWithFlowControl(final Iterator<V> source,
final CallStreamObserver<V> target) {
Preconditions.checkNotNull(source);
Preconditions.checkNotNull(target);
target.setOnReadyHandler(new Runnable() {
@Override
public void run() {
while (target.isReady() && source.hasNext()) {
target.onNext(source.next());
}
if (!source.hasNext()) {
target.onCompleted();
}
}
});
}
/**
* Copy the values of an {@link Iterable} to the target {@link CallStreamObserver} while properly
* accounting for outbound flow-control.
*
* <p>For clients this method is safe to call inside {@link ClientResponseObserver#beforeStart},
* on servers it is safe to call inside the service method implementation.
* </p>
*
* @param source of values expressed as an {@link Iterable}.
* @param target {@link CallStreamObserver} which accepts values from the source.
*/
public static <V> void copyWithFlowControl(final Iterable<V> source,
CallStreamObserver<V> target) {
Preconditions.checkNotNull(source);
copyWithFlowControl(source.iterator(), target);
}
}

View File

@ -31,17 +31,28 @@
package io.grpc.stub;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.CallOptions;
import io.grpc.ClientCall;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerServiceDefinition;
import io.grpc.ServiceDescriptor;
import io.grpc.Status;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.internal.ManagedChannelImpl;
import io.grpc.stub.ServerCalls.NoopStreamObserver;
import io.grpc.stub.ServerCallsTest.IntegerMarshaller;
import org.junit.Before;
import org.junit.Test;
@ -53,7 +64,10 @@ import org.mockito.MockitoAnnotations;
import java.util.Iterator;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
* Unit tests for {@link ClientCalls}.
@ -61,13 +75,21 @@ import java.util.concurrent.ExecutionException;
@RunWith(JUnit4.class)
public class ClientCallsTest {
@Mock private ClientCall<Integer, String> call;
static final MethodDescriptor<Integer, Integer> STREAMING_METHOD = MethodDescriptor.create(
MethodDescriptor.MethodType.BIDI_STREAMING,
"some/method",
new IntegerMarshaller(), new IntegerMarshaller());
@Before public void setUp() {
@Mock
private ClientCall<Integer, String> call;
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
}
@Test public void unaryFutureCallSuccess() throws Exception {
@Test
public void unaryFutureCallSuccess() throws Exception {
Integer req = 2;
ListenableFuture<String> future = ClientCalls.futureUnaryCall(call, req);
ArgumentCaptor<ClientCall.Listener<String>> listenerCaptor = ArgumentCaptor.forClass(null);
@ -80,7 +102,8 @@ public class ClientCallsTest {
assertEquals("bar", future.get());
}
@Test public void unaryFutureCallFailed() throws Exception {
@Test
public void unaryFutureCallFailed() throws Exception {
Integer req = 2;
ListenableFuture<String> future = ClientCalls.futureUnaryCall(call, req);
ArgumentCaptor<ClientCall.Listener<String>> listenerCaptor = ArgumentCaptor.forClass(null);
@ -99,7 +122,8 @@ public class ClientCallsTest {
}
}
@Test public void unaryFutureCallCancelled() throws Exception {
@Test
public void unaryFutureCallCancelled() throws Exception {
Integer req = 2;
ListenableFuture<String> future = ClientCalls.futureUnaryCall(call, req);
ArgumentCaptor<ClientCall.Listener<String>> listenerCaptor = ArgumentCaptor.forClass(null);
@ -117,7 +141,291 @@ public class ClientCallsTest {
}
}
@Test public void blockingResponseStreamFailed() throws Exception {
@Test
public void cannotSetOnReadyAfterCallStarted() throws Exception {
CallStreamObserver<Integer> callStreamObserver =
(CallStreamObserver<Integer>) ClientCalls.asyncClientStreamingCall(call,
new NoopStreamObserver<String>());
Runnable noOpRunnable = new Runnable() {
@Override
public void run() {
}
};
try {
callStreamObserver.setOnReadyHandler(noOpRunnable);
fail("Should not be able to set handler after call started");
} catch (IllegalStateException ise) {
// expected
}
}
@Test
public void disablingInboundAutoFlowControlSuppressesRequestsForMoreMessages()
throws Exception {
ArgumentCaptor<ClientCall.Listener<String>> listenerCaptor = ArgumentCaptor.forClass(null);
CallStreamObserver<Integer> requestObserver =
(CallStreamObserver<Integer>)
ClientCalls.asyncBidiStreamingCall(call, new ClientResponseObserver<Integer, String>() {
@Override
public void beforeStart(ClientCallStreamObserver<Integer> requestStream) {
requestStream.disableAutoInboundFlowControl();
}
@Override
public void onNext(String value) {
}
@Override
public void onError(Throwable t) {
}
@Override
public void onCompleted() {
}
});
verify(call).start(listenerCaptor.capture(), any(Metadata.class));
listenerCaptor.getValue().onMessage("message");
verify(call, times(1)).request(1);
}
@Test
public void callStreamObserverPropagatesFlowControlRequestsToCall()
throws Exception {
ArgumentCaptor<ClientCall.Listener<String>> listenerCaptor = ArgumentCaptor.forClass(null);
ClientResponseObserver<Integer, String> responseObserver =
new ClientResponseObserver<Integer, String>() {
@Override
public void beforeStart(ClientCallStreamObserver<Integer> requestStream) {
requestStream.disableAutoInboundFlowControl();
}
@Override
public void onNext(String value) {
}
@Override
public void onError(Throwable t) {
}
@Override
public void onCompleted() {
}
};
CallStreamObserver<Integer> requestObserver =
(CallStreamObserver<Integer>)
ClientCalls.asyncBidiStreamingCall(call, responseObserver);
verify(call).start(listenerCaptor.capture(), any(Metadata.class));
listenerCaptor.getValue().onMessage("message");
requestObserver.request(5);
verify(call, times(1)).request(5);
}
@Test
public void canCaptureInboundFlowControlForServerStreamingObserver()
throws Exception {
ArgumentCaptor<ClientCall.Listener<String>> listenerCaptor = ArgumentCaptor.forClass(null);
ClientResponseObserver<Integer, String> responseObserver =
new ClientResponseObserver<Integer, String>() {
@Override
public void beforeStart(ClientCallStreamObserver<Integer> requestStream) {
requestStream.disableAutoInboundFlowControl();
requestStream.request(5);
}
@Override
public void onNext(String value) {
}
@Override
public void onError(Throwable t) {
}
@Override
public void onCompleted() {
}
};
ClientCalls.asyncServerStreamingCall(call, 1, responseObserver);
verify(call).start(listenerCaptor.capture(), any(Metadata.class));
listenerCaptor.getValue().onMessage("message");
verify(call, times(1)).request(1);
verify(call, times(1)).request(5);
}
@Test
public void inprocessTransportInboundFlowControl() throws Exception {
final Semaphore semaphore = new Semaphore(1);
ServerServiceDefinition service = ServerServiceDefinition.builder(
new ServiceDescriptor("some", STREAMING_METHOD))
.addMethod(STREAMING_METHOD, ServerCalls.asyncBidiStreamingCall(
new ServerCalls.BidiStreamingMethod<Integer, Integer>() {
int iteration;
@Override
public StreamObserver<Integer> invoke(StreamObserver<Integer> responseObserver) {
final ServerCallStreamObserver<Integer> serverCallObserver =
(ServerCallStreamObserver<Integer>) responseObserver;
serverCallObserver.setOnReadyHandler(new Runnable() {
@Override
public void run() {
while (serverCallObserver.isReady()) {
serverCallObserver.onNext(iteration);
}
iteration++;
semaphore.release();
}
});
return new ServerCalls.NoopStreamObserver<Integer>() {
@Override
public void onCompleted() {
serverCallObserver.onCompleted();
}
};
}
}))
.build();
long tag = System.nanoTime();
InProcessServerBuilder.forName("go-with-the-flow" + tag).addService(service).build().start();
ManagedChannelImpl channel = InProcessChannelBuilder.forName("go-with-the-flow" + tag).build();
final ClientCall<Integer, Integer> clientCall = channel.newCall(STREAMING_METHOD,
CallOptions.DEFAULT);
final CountDownLatch latch = new CountDownLatch(1);
final int[] receivedMessages = new int[6];
semaphore.acquire();
ClientResponseObserver<Integer, Integer> responseObserver =
new ClientResponseObserver<Integer, Integer>() {
int index;
@Override
public void beforeStart(final ClientCallStreamObserver<Integer> requestStream) {
requestStream.disableAutoInboundFlowControl();
}
@Override
public void onNext(Integer value) {
receivedMessages[index++] = value;
}
@Override
public void onError(Throwable t) {
latch.countDown();
}
@Override
public void onCompleted() {
latch.countDown();
}
};
CallStreamObserver<Integer> integerStreamObserver = (CallStreamObserver<Integer>)
ClientCalls.asyncBidiStreamingCall(clientCall, responseObserver);
semaphore.acquire();
integerStreamObserver.request(2);
semaphore.acquire();
integerStreamObserver.request(3);
integerStreamObserver.onCompleted();
latch.await(5, TimeUnit.SECONDS);
// Very that number of messages produced in each onReady handler call matches the number
// requested by the client. Note that ClientCalls.asyncBidiStreamingCall will request(1)
assertArrayEquals(new int[]{0, 1, 1, 2, 2, 2}, receivedMessages);
}
@Test
public void inprocessTransportOutboundFlowControl() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
final Semaphore semaphore = new Semaphore(1);
final int[] receivedMessages = new int[6];
ServerServiceDefinition service = ServerServiceDefinition.builder(
new ServiceDescriptor("some", STREAMING_METHOD))
.addMethod(STREAMING_METHOD, ServerCalls.asyncBidiStreamingCall(
new ServerCalls.BidiStreamingMethod<Integer, Integer>() {
@Override
public StreamObserver<Integer> invoke(StreamObserver<Integer> responseObserver) {
final ServerCallStreamObserver<Integer> serverCallObserver =
(ServerCallStreamObserver<Integer>) responseObserver;
serverCallObserver.disableAutoInboundFlowControl();
new Thread(new Runnable() {
@Override
public void run() {
try {
serverCallObserver.request(1);
semaphore.acquire();
serverCallObserver.request(2);
semaphore.acquire();
serverCallObserver.request(3);
} catch (Throwable t) {
throw new RuntimeException(t);
}
}
}).start();
return new ServerCalls.NoopStreamObserver<Integer>() {
int index;
@Override
public void onNext(Integer value) {
receivedMessages[index++] = value;
}
@Override
public void onCompleted() {
serverCallObserver.onCompleted();
latch.countDown();
}
};
}
}))
.build();
long tag = System.nanoTime();
InProcessServerBuilder.forName("go-with-the-flow" + tag).addService(service).build().start();
ManagedChannelImpl channel = InProcessChannelBuilder.forName("go-with-the-flow" + tag).build();
final ClientCall<Integer, Integer> clientCall = channel.newCall(STREAMING_METHOD,
CallOptions.DEFAULT);
semaphore.acquire();
ClientResponseObserver<Integer, Integer> responseObserver =
new ClientResponseObserver<Integer, Integer>() {
@Override
public void beforeStart(final ClientCallStreamObserver<Integer> requestStream) {
requestStream.setOnReadyHandler(new Runnable() {
int iteration;
@Override
public void run() {
while (requestStream.isReady()) {
requestStream.onNext(iteration);
}
iteration++;
if (iteration == 3) {
requestStream.onCompleted();
}
semaphore.release();
}
});
}
@Override
public void onNext(Integer value) {
}
@Override
public void onError(Throwable t) {
}
@Override
public void onCompleted() {
}
};
ClientCalls.asyncBidiStreamingCall(clientCall, responseObserver);
latch.await(5, TimeUnit.SECONDS);
// Very that number of messages produced in each onReady handler call matches the number
// requested by the client.
assertArrayEquals(new int[]{0, 1, 1, 2, 2, 2}, receivedMessages);
}
@Test
public void blockingResponseStreamFailed() throws Exception {
Integer req = 2;
Iterator<String> iter = ClientCalls.blockingServerStreamingCall(call, req);
ArgumentCaptor<ClientCall.Listener<String>> listenerCaptor = ArgumentCaptor.forClass(null);