From e795f14bedc56af2f2ba60eef1b23add6106ab99 Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Thu, 13 Jun 2019 16:46:00 -0700 Subject: [PATCH] interop-testing: Observe flow control in TestServiceImpl --- .../grpc/testing/integration/TestServiceImpl.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java index 5fe7248b2b..8d0c4f421a 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java @@ -228,7 +228,7 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase { private class ResponseDispatcher { private final Chunk completionChunk = new Chunk(0, 0, 0); private final Queue chunks; - private final StreamObserver responseStream; + private final ServerCallStreamObserver responseStream; private boolean scheduled; @GuardedBy("this") private boolean cancelled; private Throwable failure; @@ -268,7 +268,12 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase { */ public ResponseDispatcher(StreamObserver responseStream) { this.chunks = Queues.newLinkedBlockingQueue(); - this.responseStream = responseStream; + this.responseStream = (ServerCallStreamObserver) responseStream; + this.responseStream.setOnReadyHandler(new Runnable() { + @Override public void run() { + scheduleNextChunk(); + } + }); } /** @@ -349,6 +354,11 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase { return; } + if (chunks.peek() != completionChunk && !responseStream.isReady()) { + // Wait for the onReady handler to be called. + return; + } + // Schedule the next response chunk if there is one. Chunk nextChunk = chunks.peek(); if (nextChunk != null) {