binder: Respect requested message limits within a single MessageProducer

This commit is contained in:
Andrei Kandratovich 2022-05-16 18:23:16 +02:00 committed by GitHub
parent 8a84611d9d
commit 2c33e39f5d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 31 additions and 1 deletions

View File

@ -468,7 +468,7 @@ abstract class Inbound<L extends StreamListener> implements StreamListener.Messa
if (firstMessage != null) {
stream = firstMessage;
firstMessage = null;
} else if (messageAvailable()) {
} else if (numRequestedMessages > 0 && messageAvailable()) {
stream = assembleNextMessage();
}
if (stream != null) {

View File

@ -1503,6 +1503,36 @@ public abstract class AbstractTransportTest {
return count;
}
@Test
public void messageProducerOnlyProducesRequestedMessages() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
startTransport(client, mockClientTransportListener);
MockServerTransportListener serverTransportListener =
serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
// Start an RPC.
ClientStream clientStream = client.newStream(
methodDescriptor, new Metadata(), callOptions, tracers);
ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
clientStream.start(clientStreamListener);
StreamCreation serverStreamCreation =
serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
assertEquals(methodDescriptor.getFullMethodName(), serverStreamCreation.method);
// Have the client send two messages.
clientStream.writeMessage(methodDescriptor.streamRequest("MESSAGE"));
clientStream.writeMessage(methodDescriptor.streamRequest("MESSAGE"));
clientStream.flush();
doPingPong(serverListener);
// Verify server only receives one message if that's all it requests.
serverStreamCreation.stream.request(1);
verifyMessageCountAndClose(serverStreamCreation.listener.messageQueue, 1);
}
@Test
public void interactionsAfterServerStreamCloseAreNoops() throws Exception {
server.start(serverListener);