diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java index 5fe7248b2b..8d0c4f421a 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java @@ -228,7 +228,7 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase { private class ResponseDispatcher { private final Chunk completionChunk = new Chunk(0, 0, 0); private final Queue chunks; - private final StreamObserver responseStream; + private final ServerCallStreamObserver responseStream; private boolean scheduled; @GuardedBy("this") private boolean cancelled; private Throwable failure; @@ -268,7 +268,12 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase { */ public ResponseDispatcher(StreamObserver responseStream) { this.chunks = Queues.newLinkedBlockingQueue(); - this.responseStream = responseStream; + this.responseStream = (ServerCallStreamObserver) responseStream; + this.responseStream.setOnReadyHandler(new Runnable() { + @Override public void run() { + scheduleNextChunk(); + } + }); } /** @@ -349,6 +354,11 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase { return; } + if (chunks.peek() != completionChunk && !responseStream.isReady()) { + // Wait for the onReady handler to be called. + return; + } + // Schedule the next response chunk if there is one. Chunk nextChunk = chunks.peek(); if (nextChunk != null) {