interop-testing: Observe flow control in TestServiceImpl

This commit is contained in:
Eric Anderson 2019-06-13 16:46:00 -07:00
parent 0b27e2862d
commit e795f14bed
1 changed files with 12 additions and 2 deletions

View File

@ -228,7 +228,7 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase {
private class ResponseDispatcher {
private final Chunk completionChunk = new Chunk(0, 0, 0);
private final Queue<Chunk> chunks;
private final StreamObserver<StreamingOutputCallResponse> responseStream;
private final ServerCallStreamObserver<StreamingOutputCallResponse> responseStream;
private boolean scheduled;
@GuardedBy("this") private boolean cancelled;
private Throwable failure;
@ -268,7 +268,12 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase {
*/
public ResponseDispatcher(StreamObserver<StreamingOutputCallResponse> responseStream) {
this.chunks = Queues.newLinkedBlockingQueue();
this.responseStream = responseStream;
this.responseStream = (ServerCallStreamObserver<StreamingOutputCallResponse>) responseStream;
this.responseStream.setOnReadyHandler(new Runnable() {
@Override public void run() {
scheduleNextChunk();
}
});
}
/**
@ -349,6 +354,11 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase {
return;
}
if (chunks.peek() != completionChunk && !responseStream.isReady()) {
// Wait for the onReady handler to be called.
return;
}
// Schedule the next response chunk if there is one.
Chunk nextChunk = chunks.peek();
if (nextChunk != null) {