mirror of https://github.com/grpc/grpc-java.git
stub,examples: Clarify CallStreamObserver's Javadoc (#6561)
* Clarify CallStreamObserver's Javadoc * Remove unnecessary AtomicBoolean and clarify a comment in ManualFlowControlServer
This commit is contained in:
parent
4ad3acc1d4
commit
589a645a38
|
|
@ -24,7 +24,6 @@ import io.grpc.stub.StreamObserver;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
|
||||||
import java.util.logging.Logger;
|
import java.util.logging.Logger;
|
||||||
|
|
||||||
public class ManualFlowControlServer {
|
public class ManualFlowControlServer {
|
||||||
|
|
@ -42,12 +41,6 @@ public class ManualFlowControlServer {
|
||||||
(ServerCallStreamObserver<HelloReply>) responseObserver;
|
(ServerCallStreamObserver<HelloReply>) responseObserver;
|
||||||
serverCallStreamObserver.disableAutoInboundFlowControl();
|
serverCallStreamObserver.disableAutoInboundFlowControl();
|
||||||
|
|
||||||
// Guard against spurious onReady() calls caused by a race between onNext() and onReady(). If the transport
|
|
||||||
// toggles isReady() from false to true while onNext() is executing, but before onNext() checks isReady(),
|
|
||||||
// request(1) would be called twice - once by onNext() and once by the onReady() scheduled during onNext()'s
|
|
||||||
// execution.
|
|
||||||
final AtomicBoolean wasReady = new AtomicBoolean(false);
|
|
||||||
|
|
||||||
// Set up a back-pressure-aware consumer for the request stream. The onReadyHandler will be invoked
|
// Set up a back-pressure-aware consumer for the request stream. The onReadyHandler will be invoked
|
||||||
// when the consuming side has enough buffer space to receive more messages.
|
// when the consuming side has enough buffer space to receive more messages.
|
||||||
//
|
//
|
||||||
|
|
@ -55,10 +48,17 @@ public class ManualFlowControlServer {
|
||||||
// onNext(), onError(), and onComplete() handlers. Blocking the onReadyHandler will prevent additional messages
|
// onNext(), onError(), and onComplete() handlers. Blocking the onReadyHandler will prevent additional messages
|
||||||
// from being processed by the incoming StreamObserver. The onReadyHandler must return in a timely manor or else
|
// from being processed by the incoming StreamObserver. The onReadyHandler must return in a timely manor or else
|
||||||
// message processing throughput will suffer.
|
// message processing throughput will suffer.
|
||||||
serverCallStreamObserver.setOnReadyHandler(new Runnable() {
|
class OnReadyHandler implements Runnable {
|
||||||
|
// Guard against spurious onReady() calls caused by a race between onNext() and onReady(). If the transport
|
||||||
|
// toggles isReady() from false to true while onNext() is executing, but before onNext() checks isReady(),
|
||||||
|
// request(1) would be called twice - once by onNext() and once by the onReady() scheduled during onNext()'s
|
||||||
|
// execution.
|
||||||
|
private boolean wasReady = false;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
if (serverCallStreamObserver.isReady() && wasReady.compareAndSet(false, true)) {
|
if (serverCallStreamObserver.isReady() && !wasReady) {
|
||||||
|
wasReady = true;
|
||||||
logger.info("READY");
|
logger.info("READY");
|
||||||
// Signal the request sender to send one message. This happens when isReady() turns true, signaling that
|
// Signal the request sender to send one message. This happens when isReady() turns true, signaling that
|
||||||
// the receive buffer has enough free space to receive more messages. Calling request() serves to prime
|
// the receive buffer has enough free space to receive more messages. Calling request() serves to prime
|
||||||
|
|
@ -66,7 +66,9 @@ public class ManualFlowControlServer {
|
||||||
serverCallStreamObserver.request(1);
|
serverCallStreamObserver.request(1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
}
|
||||||
|
final OnReadyHandler onReadyHandler = new OnReadyHandler();
|
||||||
|
serverCallStreamObserver.setOnReadyHandler(onReadyHandler);
|
||||||
|
|
||||||
// Give gRPC a StreamObserver that can observe and process incoming requests.
|
// Give gRPC a StreamObserver that can observe and process incoming requests.
|
||||||
return new StreamObserver<HelloRequest>() {
|
return new StreamObserver<HelloRequest>() {
|
||||||
|
|
@ -90,16 +92,17 @@ public class ManualFlowControlServer {
|
||||||
// Check the provided ServerCallStreamObserver to see if it is still ready to accept more messages.
|
// Check the provided ServerCallStreamObserver to see if it is still ready to accept more messages.
|
||||||
if (serverCallStreamObserver.isReady()) {
|
if (serverCallStreamObserver.isReady()) {
|
||||||
// Signal the sender to send another request. As long as isReady() stays true, the server will keep
|
// Signal the sender to send another request. As long as isReady() stays true, the server will keep
|
||||||
// cycling through the loop of onNext() -> request()...onNext() -> request()... until either the client
|
// cycling through the loop of onNext() -> request(1)...onNext() -> request(1)... until the client runs
|
||||||
// runs out of messages and ends the loop or the server runs out of receive buffer space.
|
// out of messages and ends the loop (via onCompleted()).
|
||||||
//
|
//
|
||||||
// If the server runs out of buffer space, isReady() will turn false. When the receive buffer has
|
// If request() was called here with the argument of more than 1, the server might runs out of receive
|
||||||
// sufficiently drained, isReady() will turn true, and the serverCallStreamObserver's onReadyHandler
|
// buffer space, and isReady() will turn false. When the receive buffer has sufficiently drained,
|
||||||
// will be called to restart the message pump.
|
// isReady() will turn true, and the serverCallStreamObserver's onReadyHandler will be called to restart
|
||||||
|
// the message pump.
|
||||||
serverCallStreamObserver.request(1);
|
serverCallStreamObserver.request(1);
|
||||||
} else {
|
} else {
|
||||||
// If not, note that back-pressure has begun.
|
// If not, note that back-pressure has begun.
|
||||||
wasReady.set(false);
|
onReadyHandler.wasReady = false;
|
||||||
}
|
}
|
||||||
} catch (Throwable throwable) {
|
} catch (Throwable throwable) {
|
||||||
throwable.printStackTrace();
|
throwable.printStackTrace();
|
||||||
|
|
|
||||||
|
|
@ -19,20 +19,30 @@ package io.grpc.stub;
|
||||||
import io.grpc.ExperimentalApi;
|
import io.grpc.ExperimentalApi;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A refinement of StreamObserver provided by the GRPC runtime to the application that allows for
|
* A refinement of StreamObserver provided by the GRPC runtime to the application (the client or
|
||||||
* more complex interactions with call behavior.
|
* the server) that allows for more complex interactions with call behavior.
|
||||||
*
|
*
|
||||||
* <p>In any call there are logically two {@link StreamObserver} implementations:
|
* <p>In any call there are logically four {@link StreamObserver} implementations:
|
||||||
* <ul>
|
* <ul>
|
||||||
* <li>'inbound' - which the GRPC runtime calls when it receives messages from the
|
* <li>'inbound', client-side - which the GRPC runtime calls when it receives messages from
|
||||||
* remote peer. This is implemented by the application.
|
* the server. This is implemented by the client application and passed into a service method
|
||||||
|
* on a stub object.
|
||||||
* </li>
|
* </li>
|
||||||
* <li>'outbound' - which the GRPC runtime provides to the application which it uses to
|
* <li>'outbound', client-side - which the GRPC runtime provides to the client application and the
|
||||||
* send messages to the remote peer.
|
* client uses this {@code StreamObserver} to send messages to the server.
|
||||||
|
* </li>
|
||||||
|
* <li>'inbound', server-side - which the GRPC runtime calls when it receives messages from
|
||||||
|
* the client. This is implemented by the server application and returned from service
|
||||||
|
* implementations of client-side streaming and bidirectional streaming methods.
|
||||||
|
* </li>
|
||||||
|
* <li>'outbound', server-side - which the GRPC runtime provides to the server application and
|
||||||
|
* the server uses this {@code StreamObserver} to send messages (responses) to the client.
|
||||||
* </li>
|
* </li>
|
||||||
* </ul>
|
* </ul>
|
||||||
*
|
*
|
||||||
* <p>Implementations of this class represent the 'outbound' message stream.
|
* <p>Implementations of this class represent the 'outbound' message streams. The client-side
|
||||||
|
* one is {@link ClientCallStreamObserver} and the service-side one is
|
||||||
|
* {@link ServerCallStreamObserver}.
|
||||||
*
|
*
|
||||||
* <p>Like {@code StreamObserver}, implementations are not required to be thread-safe; if multiple
|
* <p>Like {@code StreamObserver}, implementations are not required to be thread-safe; if multiple
|
||||||
* threads will be writing to an instance concurrently, the application must synchronize its calls.
|
* threads will be writing to an instance concurrently, the application must synchronize its calls.
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue