Fix InProcessTransport to call onReady

Fixes #875
This commit is contained in:
Eric Anderson 2016-02-06 22:47:09 -08:00
parent 5b9726ea7d
commit 86f2c9f224
1 changed files with 37 additions and 7 deletions

View File

@ -231,14 +231,27 @@ class InProcessTransport implements ServerTransport, ClientTransport {
@Override
public void request(int numMessages) {
clientStream.serverRequested(numMessages);
boolean onReady = clientStream.serverRequested(numMessages);
if (onReady) {
synchronized (this) {
if (!closed) {
clientStreamListener.onReady();
}
}
}
}
// This method is the only reason we have to synchronize field accesses.
private synchronized void clientRequested(int numMessages) {
/**
* Client requested more messages.
*
* @return whether onReady should be called on the server
*/
private synchronized boolean clientRequested(int numMessages) {
if (closed) {
return;
return false;
}
boolean previouslyReady = clientRequested > 0;
clientRequested += numMessages;
while (clientRequested > 0 && !clientReceiveQueue.isEmpty()) {
clientRequested--;
@ -246,12 +259,14 @@ class InProcessTransport implements ServerTransport, ClientTransport {
}
// Attempt being reentrant-safe
if (closed) {
return;
return false;
}
if (clientReceiveQueue.isEmpty() && clientNotifyStatus != null) {
closed = true;
clientStreamListener.closed(clientNotifyStatus, clientNotifyTrailers);
}
boolean nowReady = clientRequested > 0;
return !previouslyReady && nowReady;
}
private void clientCancelled(Status status) {
@ -366,14 +381,27 @@ class InProcessTransport implements ServerTransport, ClientTransport {
@Override
public void request(int numMessages) {
serverStream.clientRequested(numMessages);
boolean onReady = serverStream.clientRequested(numMessages);
if (onReady) {
synchronized (this) {
if (!closed) {
serverStreamListener.onReady();
}
}
}
}
// This method is the only reason we have to synchronize field accesses.
private synchronized void serverRequested(int numMessages) {
/**
* Client requested more messages.
*
* @return whether onReady should be called on the server
*/
private synchronized boolean serverRequested(int numMessages) {
if (closed) {
return;
return false;
}
boolean previouslyReady = serverRequested > 0;
serverRequested += numMessages;
while (serverRequested > 0 && !serverReceiveQueue.isEmpty()) {
serverRequested--;
@ -383,6 +411,8 @@ class InProcessTransport implements ServerTransport, ClientTransport {
serverNotifyHalfClose = false;
serverStreamListener.halfClosed();
}
boolean nowReady = serverRequested > 0;
return !previouslyReady && nowReady;
}
private void serverClosed(Status status) {