diff --git a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java index 3498173d61..f9ecf01ac4 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java @@ -231,14 +231,27 @@ class InProcessTransport implements ServerTransport, ClientTransport { @Override public void request(int numMessages) { - clientStream.serverRequested(numMessages); + boolean onReady = clientStream.serverRequested(numMessages); + if (onReady) { + synchronized (this) { + if (!closed) { + clientStreamListener.onReady(); + } + } + } } // This method is the only reason we have to synchronize field accesses. - private synchronized void clientRequested(int numMessages) { + /** + * Client requested more messages. + * + * @return whether onReady should be called on the server + */ + private synchronized boolean clientRequested(int numMessages) { if (closed) { - return; + return false; } + boolean previouslyReady = clientRequested > 0; clientRequested += numMessages; while (clientRequested > 0 && !clientReceiveQueue.isEmpty()) { clientRequested--; @@ -246,12 +259,14 @@ class InProcessTransport implements ServerTransport, ClientTransport { } // Attempt being reentrant-safe if (closed) { - return; + return false; } if (clientReceiveQueue.isEmpty() && clientNotifyStatus != null) { closed = true; clientStreamListener.closed(clientNotifyStatus, clientNotifyTrailers); } + boolean nowReady = clientRequested > 0; + return !previouslyReady && nowReady; } private void clientCancelled(Status status) { @@ -366,14 +381,27 @@ class InProcessTransport implements ServerTransport, ClientTransport { @Override public void request(int numMessages) { - serverStream.clientRequested(numMessages); + boolean onReady = serverStream.clientRequested(numMessages); + if (onReady) { + synchronized (this) { + if (!closed) { + serverStreamListener.onReady(); + } + } + } } // This method is the only reason we have to synchronize field accesses. - private synchronized void serverRequested(int numMessages) { + /** + * Client requested more messages. + * + * @return whether onReady should be called on the server + */ + private synchronized boolean serverRequested(int numMessages) { if (closed) { - return; + return false; } + boolean previouslyReady = serverRequested > 0; serverRequested += numMessages; while (serverRequested > 0 && !serverReceiveQueue.isEmpty()) { serverRequested--; @@ -383,6 +411,8 @@ class InProcessTransport implements ServerTransport, ClientTransport { serverNotifyHalfClose = false; serverStreamListener.halfClosed(); } + boolean nowReady = serverRequested > 0; + return !previouslyReady && nowReady; } private void serverClosed(Status status) {