Add CallStreamObserver and ServerCallStreamObserver which allow for application level interaction with flow control

while still allowing stubs to be used.
Currently these APIs are only useful on the server side, a separate proposal will be made for adding client support
An example of usage is in TestServiceImpl.streamingOutputCallManualFlowControl which listens for flow-control 'onReady' events and produces messages to the client.
Added some unit testing for ServerCalls
This commit is contained in:
Louis Ryan 2016-03-11 13:05:25 -08:00
parent de7ec3c682
commit 7902017b07
4 changed files with 661 additions and 7 deletions

View File

@ -0,0 +1,102 @@
/*
* Copyright 2016, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.stub;
import io.grpc.ExperimentalApi;
/**
* A refinement of StreamObserver provided by the GRPC runtime to the application that allows for
* more complex interactions with call behavior.
*
* <p>In any call there are logically two {@link StreamObserver} implementations:
* <ul>
* <li>'inbound' - which the GRPC runtime calls when it receives messages from the
* remote peer. This is implemented by the application.
* </li>
* <li>'outbound' - which the GRPC runtime provides to the application which it uses to
* send messages to the remote peer.
* </li>
* </ul>
*
* <p>Implementations of this class represent the 'outbound' message stream.
*
*/
@ExperimentalApi
public abstract class CallStreamObserver<V> implements StreamObserver<V> {
/**
* If {@code true} indicates that a call to {@link #onNext(Object)} will not require the entire
* message to be buffered before it is sent to the peer.
*/
public abstract boolean isReady();
/**
* Set a {@link Runnable} that will be executed every time the stream {@link #isReady()} state
* changes from {@code false} to {@code true}. While it is not guaranteed that the same
* thread will always be used to execute the {@link Runnable}, it is guaranteed that executions
* are serialized with calls to the 'inbound' {@link StreamObserver}.
*
* <p>Note that the handler may be called some time after {@link #isReady} has transitioned to
* true as other callbacks may still be executing in the 'inbound' observer.
*
* @param onReadyHandler to call when peer is ready to receive more messages.
*/
public abstract void setOnReadyHandler(Runnable onReadyHandler);
/**
* Disables automatic flow control where a token is returned to the peer after a call
* to the 'inbound' {@link io.grpc.stub.StreamObserver#onNext(Object)} has completed. If disabled
* an application must make explicit calls to {@link #request} to receive messages.
*
* <p>Note that for cases where the runtime knows that only one inbound message is allowed
* calling this method will have no effect and the runtime will always permit one and only
* one message. This is true for:
* <ul>
* <li>{@link io.grpc.MethodDescriptor.MethodType#UNARY} operations on both the
* client and server.
* </li>
* <li>{@link io.grpc.MethodDescriptor.MethodType#CLIENT_STREAMING} operations on the server.
* </li>
* <li>{@link io.grpc.MethodDescriptor.MethodType#SERVER_STREAMING} operations on the client.
* </li>
* </ul>
* </p>
*/
public abstract void disableAutoInboundFlowControl();
/**
* Requests the peer to produce {@code count} more messages to be delivered to the 'inbound'
* {@link StreamObserver}.
* @param count more messages
*/
public abstract void request(int count);
}

View File

@ -0,0 +1,59 @@
/*
* Copyright 2016, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.stub;
import io.grpc.ExperimentalApi;
/**
* A refinement of {@link CallStreamObserver} to allows for interaction with call
* cancellation events on the server side.
*/
@ExperimentalApi
public abstract class ServerCallStreamObserver<V> extends CallStreamObserver<V> {
/**
* If {@code true} indicates that the call has been cancelled by the remote peer.
*/
public abstract boolean isCancelled();
/**
* Set a {@link Runnable} that will be called if the calls {@link #isCancelled()} state
* changes from {@code false} to {@code true}. It is guaranteed that execution of the
* {@link Runnable} are serialized with calls to the 'inbound' {@link StreamObserver}.
*
* <p>Note that the handler may be called some time after {@link #isCancelled} has transitioned to
* {@code true} as other callbacks may still be executing in the 'inbound' observer.
*
* @param onCancelHandler to call when client has cancelled the call.
*/
public abstract void setOnCancelHandler(Runnable onCancelHandler);
}

View File

@ -128,9 +128,11 @@ public class ServerCalls {
MethodDescriptor<ReqT, RespT> methodDescriptor,
final ServerCall<RespT> call,
Metadata headers) {
final ResponseObserver<RespT> responseObserver = new ResponseObserver<RespT>(call);
final ServerCallStreamObserverImpl<RespT> responseObserver =
new ServerCallStreamObserverImpl<RespT>(call);
// We expect only 1 request, but we ask for 2 requests here so that if a misbehaving client
// sends more than 1 requests, ServerCall will catch it.
// sends more than 1 requests, ServerCall will catch it. Note that disabling auto
// inbound flow control has no effect on unary calls.
call.request(2);
return new EmptyServerCallListener<ReqT>() {
ReqT request;
@ -145,6 +147,12 @@ public class ServerCalls {
public void onHalfClose() {
if (request != null) {
method.invoke(request, responseObserver);
responseObserver.freeze();
if (call.isReady()) {
// Since we are calling invoke in halfClose we have missed the onReady
// event from the transport so recover it here.
onReady();
}
} else {
call.close(Status.INVALID_ARGUMENT.withDescription("Half-closed without a request"),
new Metadata());
@ -154,6 +162,16 @@ public class ServerCalls {
@Override
public void onCancel() {
responseObserver.cancelled = true;
if (responseObserver.onCancelHandler != null) {
responseObserver.onCancelHandler.run();
}
}
@Override
public void onReady() {
if (responseObserver.onReadyHandler != null) {
responseObserver.onReadyHandler.run();
}
}
};
}
@ -173,9 +191,13 @@ public class ServerCalls {
MethodDescriptor<ReqT, RespT> methodDescriptor,
final ServerCall<RespT> call,
Metadata headers) {
call.request(1);
final ResponseObserver<RespT> responseObserver = new ResponseObserver<RespT>(call);
final ServerCallStreamObserverImpl<RespT> responseObserver =
new ServerCallStreamObserverImpl<RespT>(call);
final StreamObserver<ReqT> requestObserver = method.invoke(responseObserver);
responseObserver.freeze();
if (responseObserver.autoFlowControlEnabled) {
call.request(1);
}
return new EmptyServerCallListener<ReqT>() {
boolean halfClosed = false;
@ -184,7 +206,9 @@ public class ServerCalls {
requestObserver.onNext(request);
// Request delivery of the next inbound message.
call.request(1);
if (responseObserver.autoFlowControlEnabled) {
call.request(1);
}
}
@Override
@ -196,10 +220,20 @@ public class ServerCalls {
@Override
public void onCancel() {
responseObserver.cancelled = true;
if (responseObserver.onCancelHandler != null) {
responseObserver.onCancelHandler.run();
}
if (!halfClosed) {
requestObserver.onError(Status.CANCELLED.asException());
}
}
@Override
public void onReady() {
if (responseObserver.onReadyHandler != null) {
responseObserver.onReadyHandler.run();
}
}
};
}
};
@ -213,15 +247,24 @@ public class ServerCalls {
StreamObserver<ReqT> invoke(StreamObserver<RespT> responseObserver);
}
private static class ResponseObserver<RespT> implements StreamObserver<RespT> {
private static class ServerCallStreamObserverImpl<RespT>
extends ServerCallStreamObserver<RespT> {
final ServerCall<RespT> call;
volatile boolean cancelled;
private boolean frozen;
private boolean autoFlowControlEnabled = true;
private boolean sentHeaders;
private Runnable onReadyHandler;
private Runnable onCancelHandler;
ResponseObserver(ServerCall<RespT> call) {
ServerCallStreamObserverImpl(ServerCall<RespT> call) {
this.call = call;
}
private final void freeze() {
this.frozen = true;
}
@Override
public void onNext(RespT response) {
if (cancelled) {
@ -247,6 +290,46 @@ public class ServerCalls {
call.close(Status.OK, new Metadata());
}
}
@Override
public boolean isReady() {
return call.isReady();
}
@Override
public void setOnReadyHandler(Runnable r) {
if (frozen) {
throw new IllegalStateException("Cannot alter onReadyHandler after initialization");
}
this.onReadyHandler = r;
}
@Override
public boolean isCancelled() {
return call.isCancelled();
}
@Override
public void setOnCancelHandler(Runnable onCancelHandler) {
if (frozen) {
throw new IllegalStateException("Cannot alter onCancelHandler after initialization");
}
this.onCancelHandler = onCancelHandler;
}
@Override
public void disableAutoInboundFlowControl() {
if (frozen) {
throw new IllegalStateException("Cannot disable auto flow control after initialization");
} else {
autoFlowControlEnabled = false;
}
}
@Override
public void request(int count) {
call.request(count);
}
}
private static class EmptyServerCallListener<ReqT> extends ServerCall.Listener<ReqT> {

View File

@ -0,0 +1,410 @@
/*
* Copyright 2016, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.stub;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.times;
import io.grpc.CallOptions;
import io.grpc.ClientCall;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerServiceDefinition;
import io.grpc.Status;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.internal.ManagedChannelImpl;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
/**
* Tests for {@link ServerCalls}.
*/
@RunWith(JUnit4.class)
public class ServerCallsTest {
static final MethodDescriptor<Integer, Integer> STREAMING_METHOD = MethodDescriptor.create(
MethodDescriptor.MethodType.BIDI_STREAMING,
"some/method",
new IntegerMarshaller(), new IntegerMarshaller());
static final MethodDescriptor<Integer, Integer> UNARY_METHOD = MethodDescriptor.create(
MethodDescriptor.MethodType.UNARY,
"some/unarymethod",
new IntegerMarshaller(), new IntegerMarshaller());
@Mock
ServerCall<Integer> serverCall;
@Before
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
}
@Test
public void runtimeStreamObserverIsServerCallStreamObserver() throws Exception {
final AtomicBoolean invokeCalled = new AtomicBoolean();
final AtomicBoolean onCancelCalled = new AtomicBoolean();
final AtomicBoolean onReadyCalled = new AtomicBoolean();
final AtomicReference<ServerCallStreamObserver<Integer>> callObserver =
new AtomicReference<ServerCallStreamObserver<Integer>>();
ServerCallHandler<Integer, Integer> callHandler =
ServerCalls.asyncBidiStreamingCall(
new ServerCalls.BidiStreamingMethod<Integer, Integer>() {
@Override
public StreamObserver<Integer> invoke(StreamObserver<Integer> responseObserver) {
assertTrue(responseObserver instanceof ServerCallStreamObserver);
ServerCallStreamObserver<Integer> serverCallObserver =
(ServerCallStreamObserver<Integer>) responseObserver;
callObserver.set(serverCallObserver);
serverCallObserver.setOnCancelHandler(new Runnable() {
@Override
public void run() {
onCancelCalled.set(true);
}
});
serverCallObserver.setOnReadyHandler(new Runnable() {
@Override
public void run() {
onReadyCalled.set(true);
}
});
invokeCalled.set(true);
return new NoOpStreamObserver();
}
});
ServerCall.Listener<Integer> callListener =
callHandler.startCall(STREAMING_METHOD, serverCall, new Metadata());
Mockito.when(serverCall.isReady()).thenReturn(true).thenReturn(false);
Mockito.when(serverCall.isCancelled()).thenReturn(false).thenReturn(true);
assertTrue(callObserver.get().isReady());
assertFalse(callObserver.get().isCancelled());
callListener.onReady();
callListener.onMessage(1);
callListener.onCancel();
assertTrue(invokeCalled.get());
assertTrue(onReadyCalled.get());
assertTrue(onCancelCalled.get());
assertFalse(callObserver.get().isReady());
assertTrue(callObserver.get().isCancelled());
// Is called twice, once to permit the first message and once again after the first message
// has been processed (auto flow control)
Mockito.verify(serverCall, times(2)).request(1);
}
@Test
public void cannotSetOnCancelHandlerAfterServiceInvocation() throws Exception {
final AtomicReference<ServerCallStreamObserver<Integer>> callObserver =
new AtomicReference<ServerCallStreamObserver<Integer>>();
ServerCallHandler<Integer, Integer> callHandler =
ServerCalls.asyncBidiStreamingCall(
new ServerCalls.BidiStreamingMethod<Integer, Integer>() {
@Override
public StreamObserver<Integer> invoke(StreamObserver<Integer> responseObserver) {
callObserver.set((ServerCallStreamObserver<Integer>) responseObserver);
return new NoOpStreamObserver();
}
});
ServerCall.Listener<Integer> callListener =
callHandler.startCall(STREAMING_METHOD, serverCall, new Metadata());
callListener.onMessage(1);
try {
callObserver.get().setOnCancelHandler(new Runnable() {
@Override
public void run() {
}
});
fail("Cannot set onCancel handler after service invocation");
} catch (IllegalStateException expected) {
// Expected
}
}
@Test
public void cannotSetOnReadyHandlerAfterServiceInvocation() throws Exception {
final AtomicReference<ServerCallStreamObserver<Integer>> callObserver =
new AtomicReference<ServerCallStreamObserver<Integer>>();
ServerCallHandler<Integer, Integer> callHandler =
ServerCalls.asyncBidiStreamingCall(
new ServerCalls.BidiStreamingMethod<Integer, Integer>() {
@Override
public StreamObserver<Integer> invoke(StreamObserver<Integer> responseObserver) {
callObserver.set((ServerCallStreamObserver<Integer>) responseObserver);
return new NoOpStreamObserver();
}
});
ServerCall.Listener<Integer> callListener =
callHandler.startCall(STREAMING_METHOD, serverCall, new Metadata());
callListener.onMessage(1);
try {
callObserver.get().setOnReadyHandler(new Runnable() {
@Override
public void run() {
}
});
fail("Cannot set onReady after service invocation");
} catch (IllegalStateException expected) {
// Expected
}
}
@Test
public void cannotDisableAutoFlowControlAfterServiceInvocation() throws Exception {
final AtomicReference<ServerCallStreamObserver<Integer>> callObserver =
new AtomicReference<ServerCallStreamObserver<Integer>>();
ServerCallHandler<Integer, Integer> callHandler =
ServerCalls.asyncBidiStreamingCall(
new ServerCalls.BidiStreamingMethod<Integer, Integer>() {
@Override
public StreamObserver<Integer> invoke(StreamObserver<Integer> responseObserver) {
callObserver.set((ServerCallStreamObserver<Integer>) responseObserver);
return new NoOpStreamObserver();
}
});
ServerCall.Listener<Integer> callListener =
callHandler.startCall(STREAMING_METHOD, serverCall, new Metadata());
callListener.onMessage(1);
try {
callObserver.get().disableAutoInboundFlowControl();
fail("Cannot set onCancel handler after service invocation");
} catch (IllegalStateException expected) {
// Expected
}
}
@Test
public void disablingInboundAutoFlowControlSuppressesRequestsForMoreMessages() throws Exception {
ServerCallHandler<Integer, Integer> callHandler =
ServerCalls.asyncBidiStreamingCall(
new ServerCalls.BidiStreamingMethod<Integer, Integer>() {
@Override
public StreamObserver<Integer> invoke(StreamObserver<Integer> responseObserver) {
ServerCallStreamObserver<Integer> serverCallObserver =
(ServerCallStreamObserver<Integer>) responseObserver;
serverCallObserver.disableAutoInboundFlowControl();
return new NoOpStreamObserver();
}
});
ServerCall.Listener<Integer> callListener =
callHandler.startCall(STREAMING_METHOD, serverCall, new Metadata());
callListener.onReady();
// Transport should not call this if nothing has been requested but forcing it here
// to verify that message delivery does not trigger a call to request(1).
callListener.onMessage(1);
// Should never be called
Mockito.verify(serverCall, times(0)).request(1);
}
@Test
public void disablingInboundAutoFlowControlForUnaryHasNoEffect() throws Exception {
ServerCallHandler<Integer, Integer> callHandler =
ServerCalls.asyncUnaryCall(
new ServerCalls.UnaryMethod<Integer, Integer>() {
@Override
public void invoke(Integer req, StreamObserver<Integer> responseObserver) {
ServerCallStreamObserver<Integer> serverCallObserver =
(ServerCallStreamObserver<Integer>) responseObserver;
serverCallObserver.disableAutoInboundFlowControl();
}
});
ServerCall.Listener<Integer> callListener =
callHandler.startCall(UNARY_METHOD, serverCall, new Metadata());
// Auto inbound flow-control always requests 2 messages for unary to detect a violation
// of the unary semantic.
Mockito.verify(serverCall, times(1)).request(2);
}
@Test
public void onReadyHandlerCalledForUnaryRequest() throws Exception {
final AtomicInteger onReadyCalled = new AtomicInteger();
final AtomicReference<ServerCallStreamObserver<Integer>> callObserver =
new AtomicReference<ServerCallStreamObserver<Integer>>();
ServerCallHandler<Integer, Integer> callHandler =
ServerCalls.asyncServerStreamingCall(
new ServerCalls.ServerStreamingMethod<Integer, Integer>() {
@Override
public void invoke(Integer req, StreamObserver<Integer> responseObserver) {
ServerCallStreamObserver<Integer> serverCallObserver =
(ServerCallStreamObserver<Integer>) responseObserver;
serverCallObserver.setOnReadyHandler(new Runnable() {
@Override
public void run() {
onReadyCalled.incrementAndGet();
}
});
}
});
ServerCall.Listener<Integer> callListener =
callHandler.startCall(STREAMING_METHOD, serverCall, new Metadata());
Mockito.when(serverCall.isReady()).thenReturn(true).thenReturn(false);
Mockito.when(serverCall.isCancelled()).thenReturn(false).thenReturn(true);
callListener.onReady();
// On ready is not called until the unary request message is delivered
assertEquals(0, onReadyCalled.get());
// delivering the message doesn't trigger onReady listener either
callListener.onMessage(1);
assertEquals(0, onReadyCalled.get());
// half-closing triggers the unary request delivery and onReady
callListener.onHalfClose();
assertEquals(1, onReadyCalled.get());
// Next on ready event from the transport triggers listener
callListener.onReady();
assertEquals(2, onReadyCalled.get());
}
@Test
public void inprocessTransportManualFlow() throws Exception {
final Semaphore semaphore = new Semaphore(1);
ServerServiceDefinition service = ServerServiceDefinition.builder("some")
.addMethod(STREAMING_METHOD, ServerCalls.asyncBidiStreamingCall(
new ServerCalls.BidiStreamingMethod<Integer, Integer>() {
int iteration;
@Override
public StreamObserver<Integer> invoke(StreamObserver<Integer> responseObserver) {
final ServerCallStreamObserver<Integer> serverCallObserver =
(ServerCallStreamObserver<Integer>) responseObserver;
serverCallObserver.setOnReadyHandler(new Runnable() {
@Override
public void run() {
while (serverCallObserver.isReady()) {
serverCallObserver.onNext(iteration);
}
iteration++;
semaphore.release();
}
});
return new NoOpStreamObserver() {
@Override
public void onCompleted() {
serverCallObserver.onCompleted();
}
};
}
}))
.build();
long tag = System.nanoTime();
InProcessServerBuilder.forName("go-with-the-flow" + tag).addService(service).build().start();
ManagedChannelImpl channel = InProcessChannelBuilder.forName("go-with-the-flow" + tag).build();
final ClientCall<Integer, Integer> clientCall = channel.newCall(STREAMING_METHOD,
CallOptions.DEFAULT);
final CountDownLatch latch = new CountDownLatch(1);
final int[] receivedMessages = new int[6];
clientCall.start(new ClientCall.Listener<Integer>() {
int index;
@Override
public void onMessage(Integer message) {
receivedMessages[index++] = message;
}
@Override
public void onClose(Status status, Metadata trailers) {
latch.countDown();
}
}, new Metadata());
semaphore.acquire();
clientCall.request(1);
semaphore.acquire();
clientCall.request(2);
semaphore.acquire();
clientCall.request(3);
clientCall.halfClose();
latch.await(5, TimeUnit.SECONDS);
// Very that number of messages produced in each onReady handler call matches the number
// requested by the client.
assertArrayEquals(new int[]{0, 1, 1, 2, 2, 2}, receivedMessages);
}
private static class NoOpStreamObserver implements StreamObserver<Integer> {
@Override
public void onNext(Integer value) {
}
@Override
public void onError(Throwable t) {
}
@Override
public void onCompleted() {
}
}
public static class IntegerMarshaller implements MethodDescriptor.Marshaller<Integer> {
@Override
public InputStream stream(Integer value) {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream(4);
DataOutputStream dos = new DataOutputStream(baos);
dos.writeInt(value);
return new ByteArrayInputStream(baos.toByteArray());
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}
@Override
public Integer parse(InputStream stream) {
try {
DataInputStream dis = new DataInputStream(stream);
return dis.readInt();
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}
}
}