From 09acf2f6b7ee052debfd57d473b8f87d4c20d34e Mon Sep 17 00:00:00 2001 From: John Cormie Date: Mon, 22 Jan 2024 10:30:32 -0800 Subject: [PATCH] transports can send a msg larger than their flow control window (#10842) --- .../grpc/internal/AbstractTransportTest.java | 80 ++++++++++++++++--- .../io/grpc/servlet/JettyTransportTest.java | 6 ++ .../io/grpc/servlet/TomcatTransportTest.java | 7 ++ .../grpc/servlet/UndertowTransportTest.java | 7 ++ 4 files changed, 89 insertions(+), 11 deletions(-) diff --git a/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java b/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java index 2e99085fe6..57d870575d 100644 --- a/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java +++ b/core/src/testFixtures/java/io/grpc/internal/AbstractTransportTest.java @@ -1354,8 +1354,8 @@ public abstract class AbstractTransportTest { serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); serverTransport = serverTransportListener.transport; - ClientStream clientStream = client.newStream( - methodDescriptor, new Metadata(), callOptions, tracers); + ClientStream clientStream = + client.newStream(methodDescriptor, new Metadata(), callOptions, tracers); ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); clientStream.start(clientStreamListener); StreamCreation serverStreamCreation = @@ -1366,15 +1366,7 @@ public abstract class AbstractTransportTest { serverStream.writeHeaders(new Metadata(), true); - String largeMessage; - { - int size = 1 * 1024; - StringBuilder sb = new StringBuilder(size); - for (int i = 0; i < size; i++) { - sb.append('a'); - } - largeMessage = sb.toString(); - } + String largeMessage = newString(1024); serverStream.request(1); assertTrue(clientStreamListener.awaitOnReadyAndDrain(TIMEOUT_MS, TimeUnit.MILLISECONDS)); @@ -1494,6 +1486,64 @@ public abstract class AbstractTransportTest { assertEquals(status.getDescription(), clientStreamStatus.getDescription()); } + @Test + public void flowControlDoesNotDeadlockLargeMessage() throws Exception { + server.start(serverListener); + client = newClientTransport(server); + startTransport(client, mockClientTransportListener); + MockServerTransportListener serverTransportListener = + serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); + serverTransport = serverTransportListener.transport; + + ClientStream clientStream = + client.newStream(methodDescriptor, new Metadata(), callOptions, tracers); + ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase(); + clientStream.start(clientStreamListener); + StreamCreation serverStreamCreation = + serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS); + assertEquals(methodDescriptor.getFullMethodName(), serverStreamCreation.method); + ServerStream serverStream = serverStreamCreation.stream; + ServerStreamListenerBase serverStreamListener = serverStreamCreation.listener; + + serverStream.writeHeaders(new Metadata(), true); + + String largeMessage = newString(TEST_FLOW_CONTROL_WINDOW + 1); + + serverStream.request(1); + assertTrue(clientStreamListener.awaitOnReadyAndDrain(TIMEOUT_MS, TimeUnit.MILLISECONDS)); + assertTrue(clientStream.isReady()); + clientStream.writeMessage(methodDescriptor.streamRequest(largeMessage)); + clientStream.flush(); + doPingPong(serverListener); + + verifyMessageCountAndClose(serverStreamListener.messageQueue, 1); + + clientStream.request(1); + assertTrue(serverStreamListener.awaitOnReadyAndDrain(TIMEOUT_MS, TimeUnit.MILLISECONDS)); + assertTrue(serverStream.isReady()); + serverStream.writeMessage(methodDescriptor.streamResponse(largeMessage)); + serverStream.flush(); + doPingPong(serverListener); + + verifyMessageCountAndClose(clientStreamListener.messageQueue, 1); + + // And now check that the streams can still complete normally. + clientStream.halfClose(); + doPingPong(serverListener); + serverStream.request(1); + assertTrue(serverStreamListener.awaitHalfClosed(TIMEOUT_MS, TimeUnit.MILLISECONDS)); + + Status status = Status.OK.withDescription("... quite a lengthy discussion"); + serverStream.close(status, new Metadata()); + doPingPong(serverListener); + clientStream.request(1); + assertCodeEquals(Status.OK, serverStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); + Status clientStreamStatus = clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS); + assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS)); + assertEquals(status.getCode(), clientStreamStatus.getCode()); + assertEquals(status.getDescription(), clientStreamStatus.getDescription()); + } + private int verifyMessageCountAndClose(BlockingQueue messageQueue, int count) throws Exception { InputStream message; @@ -2399,4 +2449,12 @@ public abstract class AbstractTransportTest { throws ExecutionException, InterruptedException { return socket.getStats().get().data; } + + private static String newString(int size) { + StringBuilder sb = new StringBuilder(size); + for (int i = 0; i < size; i++) { + sb.append('a'); + } + return sb.toString(); + } } diff --git a/servlet/src/jettyTest/java/io/grpc/servlet/JettyTransportTest.java b/servlet/src/jettyTest/java/io/grpc/servlet/JettyTransportTest.java index 7941afc9b4..f21754fb68 100644 --- a/servlet/src/jettyTest/java/io/grpc/servlet/JettyTransportTest.java +++ b/servlet/src/jettyTest/java/io/grpc/servlet/JettyTransportTest.java @@ -199,6 +199,12 @@ public class JettyTransportTest extends AbstractTransportTest { public void flowControlPushBack() { } + @Override + @Ignore("Servlet flow control not implemented yet") + @Test + public void flowControlDoesNotDeadlockLargeMessage() { + } + // FIXME @Override @Ignore("Jetty is broken on client RST_STREAM") diff --git a/servlet/src/tomcatTest/java/io/grpc/servlet/TomcatTransportTest.java b/servlet/src/tomcatTest/java/io/grpc/servlet/TomcatTransportTest.java index 43c69e13fd..262036883a 100644 --- a/servlet/src/tomcatTest/java/io/grpc/servlet/TomcatTransportTest.java +++ b/servlet/src/tomcatTest/java/io/grpc/servlet/TomcatTransportTest.java @@ -218,6 +218,13 @@ public class TomcatTransportTest extends AbstractTransportTest { @Test public void flowControlPushBack() {} + // FIXME + @Override + @Ignore("Servlet flow control not implemented yet") + @Test + public void flowControlDoesNotDeadlockLargeMessage() { + } + @Override @Ignore("Server side sockets are managed by the servlet container") @Test diff --git a/servlet/src/undertowTest/java/io/grpc/servlet/UndertowTransportTest.java b/servlet/src/undertowTest/java/io/grpc/servlet/UndertowTransportTest.java index 9d894b5e3f..e14c11985d 100644 --- a/servlet/src/undertowTest/java/io/grpc/servlet/UndertowTransportTest.java +++ b/servlet/src/undertowTest/java/io/grpc/servlet/UndertowTransportTest.java @@ -266,6 +266,13 @@ public class UndertowTransportTest extends AbstractTransportTest { @Test public void flowControlPushBack() {} + // FIXME + @Override + @Ignore("Servlet flow control not implemented yet") + @Test + public void flowControlDoesNotDeadlockLargeMessage() { + } + @Override @Ignore("Server side sockets are managed by the servlet container") @Test