diff --git a/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlServer.java b/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlServer.java index abd04fa9ad..694330dfdb 100644 --- a/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlServer.java +++ b/examples/src/main/java/io/grpc/examples/manualflowcontrol/ManualFlowControlServer.java @@ -24,7 +24,6 @@ import io.grpc.stub.StreamObserver; import java.io.IOException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; public class ManualFlowControlServer { @@ -42,12 +41,6 @@ public class ManualFlowControlServer { (ServerCallStreamObserver) responseObserver; serverCallStreamObserver.disableAutoInboundFlowControl(); - // Guard against spurious onReady() calls caused by a race between onNext() and onReady(). If the transport - // toggles isReady() from false to true while onNext() is executing, but before onNext() checks isReady(), - // request(1) would be called twice - once by onNext() and once by the onReady() scheduled during onNext()'s - // execution. - final AtomicBoolean wasReady = new AtomicBoolean(false); - // Set up a back-pressure-aware consumer for the request stream. The onReadyHandler will be invoked // when the consuming side has enough buffer space to receive more messages. // @@ -55,10 +48,17 @@ public class ManualFlowControlServer { // onNext(), onError(), and onComplete() handlers. Blocking the onReadyHandler will prevent additional messages // from being processed by the incoming StreamObserver. The onReadyHandler must return in a timely manor or else // message processing throughput will suffer. - serverCallStreamObserver.setOnReadyHandler(new Runnable() { + class OnReadyHandler implements Runnable { + // Guard against spurious onReady() calls caused by a race between onNext() and onReady(). If the transport + // toggles isReady() from false to true while onNext() is executing, but before onNext() checks isReady(), + // request(1) would be called twice - once by onNext() and once by the onReady() scheduled during onNext()'s + // execution. + private boolean wasReady = false; + @Override public void run() { - if (serverCallStreamObserver.isReady() && wasReady.compareAndSet(false, true)) { + if (serverCallStreamObserver.isReady() && !wasReady) { + wasReady = true; logger.info("READY"); // Signal the request sender to send one message. This happens when isReady() turns true, signaling that // the receive buffer has enough free space to receive more messages. Calling request() serves to prime @@ -66,7 +66,9 @@ public class ManualFlowControlServer { serverCallStreamObserver.request(1); } } - }); + } + final OnReadyHandler onReadyHandler = new OnReadyHandler(); + serverCallStreamObserver.setOnReadyHandler(onReadyHandler); // Give gRPC a StreamObserver that can observe and process incoming requests. return new StreamObserver() { @@ -90,16 +92,17 @@ public class ManualFlowControlServer { // Check the provided ServerCallStreamObserver to see if it is still ready to accept more messages. if (serverCallStreamObserver.isReady()) { // Signal the sender to send another request. As long as isReady() stays true, the server will keep - // cycling through the loop of onNext() -> request()...onNext() -> request()... until either the client - // runs out of messages and ends the loop or the server runs out of receive buffer space. + // cycling through the loop of onNext() -> request(1)...onNext() -> request(1)... until the client runs + // out of messages and ends the loop (via onCompleted()). // - // If the server runs out of buffer space, isReady() will turn false. When the receive buffer has - // sufficiently drained, isReady() will turn true, and the serverCallStreamObserver's onReadyHandler - // will be called to restart the message pump. + // If request() was called here with the argument of more than 1, the server might runs out of receive + // buffer space, and isReady() will turn false. When the receive buffer has sufficiently drained, + // isReady() will turn true, and the serverCallStreamObserver's onReadyHandler will be called to restart + // the message pump. serverCallStreamObserver.request(1); } else { // If not, note that back-pressure has begun. - wasReady.set(false); + onReadyHandler.wasReady = false; } } catch (Throwable throwable) { throwable.printStackTrace(); diff --git a/stub/src/main/java/io/grpc/stub/CallStreamObserver.java b/stub/src/main/java/io/grpc/stub/CallStreamObserver.java index 8ed16bb96e..98fa6fba57 100644 --- a/stub/src/main/java/io/grpc/stub/CallStreamObserver.java +++ b/stub/src/main/java/io/grpc/stub/CallStreamObserver.java @@ -19,20 +19,30 @@ package io.grpc.stub; import io.grpc.ExperimentalApi; /** - * A refinement of StreamObserver provided by the GRPC runtime to the application that allows for - * more complex interactions with call behavior. + * A refinement of StreamObserver provided by the GRPC runtime to the application (the client or + * the server) that allows for more complex interactions with call behavior. * - *

In any call there are logically two {@link StreamObserver} implementations: + *

In any call there are logically four {@link StreamObserver} implementations: *

* - *

Implementations of this class represent the 'outbound' message stream. + *

Implementations of this class represent the 'outbound' message streams. The client-side + * one is {@link ClientCallStreamObserver} and the service-side one is + * {@link ServerCallStreamObserver}. * *

Like {@code StreamObserver}, implementations are not required to be thread-safe; if multiple * threads will be writing to an instance concurrently, the application must synchronize its calls.