mirror of https://github.com/grpc/grpc-java.git
testing: Fix flake in AbstractTransportTest.flowControlPushBack
This attempts to fix a flake seen exactly once with the
currently-disabled OkHttpTransportTest.flowControlPushBack:
```
java.lang.AssertionError
at org.junit.Assert.fail(Assert.java:86)
at org.junit.Assert.assertTrue(Assert.java:41)
at org.junit.Assert.assertTrue(Assert.java:52)
at io.grpc.internal.testing.AbstractTransportTest.flowControlPushBack(AbstractTransportTest.java:1300)
```
That was a failure for assertTrue(serverStream.isReady()), because the
awaitOnReady was finding the previous invocation of onReady. We now
track how many times it has been called. This was a bug introduced in
a8db154702 but wouldn't have been noticed since the in-process transport
is deterministic.
This commit is contained in:
parent
95fd47d747
commit
1da3133fdf
|
|
@ -709,7 +709,7 @@ public abstract class AbstractTransportTest {
|
||||||
assertNotNull(serverStream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR));
|
assertNotNull(serverStream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR));
|
||||||
|
|
||||||
serverStream.request(1);
|
serverStream.request(1);
|
||||||
assertTrue(clientStreamListener.awaitOnReady(TIMEOUT_MS, TimeUnit.MILLISECONDS));
|
assertTrue(clientStreamListener.awaitOnReadyAndDrain(TIMEOUT_MS, TimeUnit.MILLISECONDS));
|
||||||
assertTrue(clientStream.isReady());
|
assertTrue(clientStream.isReady());
|
||||||
clientStream.writeMessage(methodDescriptor.streamRequest("Hello!"));
|
clientStream.writeMessage(methodDescriptor.streamRequest("Hello!"));
|
||||||
assertThat(clientStreamTracer1.nextOutboundEvent()).isEqualTo("outboundMessage(0)");
|
assertThat(clientStreamTracer1.nextOutboundEvent()).isEqualTo("outboundMessage(0)");
|
||||||
|
|
@ -761,7 +761,7 @@ public abstract class AbstractTransportTest {
|
||||||
Lists.newArrayList(headers.getAll(binaryKey)));
|
Lists.newArrayList(headers.getAll(binaryKey)));
|
||||||
|
|
||||||
clientStream.request(1);
|
clientStream.request(1);
|
||||||
assertTrue(serverStreamListener.awaitOnReady(TIMEOUT_MS, TimeUnit.MILLISECONDS));
|
assertTrue(serverStreamListener.awaitOnReadyAndDrain(TIMEOUT_MS, TimeUnit.MILLISECONDS));
|
||||||
assertTrue(serverStream.isReady());
|
assertTrue(serverStream.isReady());
|
||||||
serverStream.writeMessage(methodDescriptor.streamResponse("Hi. Who are you?"));
|
serverStream.writeMessage(methodDescriptor.streamResponse("Hi. Who are you?"));
|
||||||
assertThat(serverStreamTracer1.nextOutboundEvent()).isEqualTo("outboundMessage(0)");
|
assertThat(serverStreamTracer1.nextOutboundEvent()).isEqualTo("outboundMessage(0)");
|
||||||
|
|
@ -1121,7 +1121,7 @@ public abstract class AbstractTransportTest {
|
||||||
assertEquals(methodDescriptor.getFullMethodName(), serverStreamCreation.method);
|
assertEquals(methodDescriptor.getFullMethodName(), serverStreamCreation.method);
|
||||||
ServerStream serverStream = serverStreamCreation.stream;
|
ServerStream serverStream = serverStreamCreation.stream;
|
||||||
ServerStreamListenerBase serverStreamListener = serverStreamCreation.listener;
|
ServerStreamListenerBase serverStreamListener = serverStreamCreation.listener;
|
||||||
assertTrue(serverStreamListener.awaitOnReady(TIMEOUT_MS, TimeUnit.MILLISECONDS));
|
assertTrue(serverStreamListener.awaitOnReadyAndDrain(TIMEOUT_MS, TimeUnit.MILLISECONDS));
|
||||||
|
|
||||||
assertTrue(serverStream.isReady());
|
assertTrue(serverStream.isReady());
|
||||||
serverStream.writeHeaders(new Metadata());
|
serverStream.writeHeaders(new Metadata());
|
||||||
|
|
@ -1231,7 +1231,7 @@ public abstract class AbstractTransportTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
serverStream.request(1);
|
serverStream.request(1);
|
||||||
assertTrue(clientStreamListener.awaitOnReady(TIMEOUT_MS, TimeUnit.MILLISECONDS));
|
assertTrue(clientStreamListener.awaitOnReadyAndDrain(TIMEOUT_MS, TimeUnit.MILLISECONDS));
|
||||||
assertTrue(clientStream.isReady());
|
assertTrue(clientStream.isReady());
|
||||||
final int maxToSend = 10 * 1024;
|
final int maxToSend = 10 * 1024;
|
||||||
int clientSent;
|
int clientSent;
|
||||||
|
|
@ -1256,7 +1256,7 @@ public abstract class AbstractTransportTest {
|
||||||
int serverReceived = verifyMessageCountAndClose(serverStreamListener.messageQueue, 1);
|
int serverReceived = verifyMessageCountAndClose(serverStreamListener.messageQueue, 1);
|
||||||
|
|
||||||
clientStream.request(1);
|
clientStream.request(1);
|
||||||
assertTrue(serverStreamListener.awaitOnReady(TIMEOUT_MS, TimeUnit.MILLISECONDS));
|
assertTrue(serverStreamListener.awaitOnReadyAndDrain(TIMEOUT_MS, TimeUnit.MILLISECONDS));
|
||||||
assertTrue(serverStream.isReady());
|
assertTrue(serverStream.isReady());
|
||||||
int serverSent;
|
int serverSent;
|
||||||
// Verify that flow control will push back on server.
|
// Verify that flow control will push back on server.
|
||||||
|
|
@ -1293,10 +1293,9 @@ public abstract class AbstractTransportTest {
|
||||||
serverReceived +=
|
serverReceived +=
|
||||||
verifyMessageCountAndClose(serverStreamListener.messageQueue, clientSent - serverReceived);
|
verifyMessageCountAndClose(serverStreamListener.messageQueue, clientSent - serverReceived);
|
||||||
|
|
||||||
assertTrue(clientStreamListener.awaitOnReady(TIMEOUT_MS, TimeUnit.MILLISECONDS));
|
assertTrue(clientStreamListener.awaitOnReadyAndDrain(TIMEOUT_MS, TimeUnit.MILLISECONDS));
|
||||||
assertTrue(clientStreamListener.awaitOnReady(TIMEOUT_MS, TimeUnit.MILLISECONDS));
|
|
||||||
assertTrue(clientStream.isReady());
|
assertTrue(clientStream.isReady());
|
||||||
assertTrue(serverStreamListener.awaitOnReady(TIMEOUT_MS, TimeUnit.MILLISECONDS)); // ???
|
assertTrue(serverStreamListener.awaitOnReadyAndDrain(TIMEOUT_MS, TimeUnit.MILLISECONDS));
|
||||||
assertTrue(serverStream.isReady());
|
assertTrue(serverStream.isReady());
|
||||||
|
|
||||||
// Request four more
|
// Request four more
|
||||||
|
|
@ -1895,12 +1894,22 @@ public abstract class AbstractTransportTest {
|
||||||
|
|
||||||
private static class ServerStreamListenerBase implements ServerStreamListener {
|
private static class ServerStreamListenerBase implements ServerStreamListener {
|
||||||
private final BlockingQueue<InputStream> messageQueue = new LinkedBlockingQueue<InputStream>();
|
private final BlockingQueue<InputStream> messageQueue = new LinkedBlockingQueue<InputStream>();
|
||||||
private final CountDownLatch onReadyLatch = new CountDownLatch(1);
|
// Would have used Void instead of Object, but null elements are not allowed
|
||||||
|
private final BlockingQueue<Object> readyQueue = new LinkedBlockingQueue<Object>();
|
||||||
private final CountDownLatch halfClosedLatch = new CountDownLatch(1);
|
private final CountDownLatch halfClosedLatch = new CountDownLatch(1);
|
||||||
private final SettableFuture<Status> status = SettableFuture.create();
|
private final SettableFuture<Status> status = SettableFuture.create();
|
||||||
|
|
||||||
private boolean awaitOnReady(int timeout, TimeUnit unit) throws Exception {
|
private boolean awaitOnReady(int timeout, TimeUnit unit) throws Exception {
|
||||||
return onReadyLatch.await(timeout, unit);
|
return readyQueue.poll(timeout, unit) != null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean awaitOnReadyAndDrain(int timeout, TimeUnit unit) throws Exception {
|
||||||
|
if (!awaitOnReady(timeout, unit)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
// Throw the rest away
|
||||||
|
readyQueue.drainTo(Lists.newArrayList());
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean awaitHalfClosed(int timeout, TimeUnit unit) throws Exception {
|
private boolean awaitHalfClosed(int timeout, TimeUnit unit) throws Exception {
|
||||||
|
|
@ -1923,7 +1932,7 @@ public abstract class AbstractTransportTest {
|
||||||
if (status.isDone()) {
|
if (status.isDone()) {
|
||||||
fail("onReady invoked after closed");
|
fail("onReady invoked after closed");
|
||||||
}
|
}
|
||||||
onReadyLatch.countDown();
|
readyQueue.add(new Object());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
@ -1945,13 +1954,23 @@ public abstract class AbstractTransportTest {
|
||||||
|
|
||||||
private static class ClientStreamListenerBase implements ClientStreamListener {
|
private static class ClientStreamListenerBase implements ClientStreamListener {
|
||||||
private final BlockingQueue<InputStream> messageQueue = new LinkedBlockingQueue<InputStream>();
|
private final BlockingQueue<InputStream> messageQueue = new LinkedBlockingQueue<InputStream>();
|
||||||
private final CountDownLatch onReadyLatch = new CountDownLatch(1);
|
// Would have used Void instead of Object, but null elements are not allowed
|
||||||
|
private final BlockingQueue<Object> readyQueue = new LinkedBlockingQueue<Object>();
|
||||||
private final SettableFuture<Metadata> headers = SettableFuture.create();
|
private final SettableFuture<Metadata> headers = SettableFuture.create();
|
||||||
private final SettableFuture<Metadata> trailers = SettableFuture.create();
|
private final SettableFuture<Metadata> trailers = SettableFuture.create();
|
||||||
private final SettableFuture<Status> status = SettableFuture.create();
|
private final SettableFuture<Status> status = SettableFuture.create();
|
||||||
|
|
||||||
private boolean awaitOnReady(int timeout, TimeUnit unit) throws Exception {
|
private boolean awaitOnReady(int timeout, TimeUnit unit) throws Exception {
|
||||||
return onReadyLatch.await(timeout, unit);
|
return readyQueue.poll(timeout, unit) != null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean awaitOnReadyAndDrain(int timeout, TimeUnit unit) throws Exception {
|
||||||
|
if (!awaitOnReady(timeout, unit)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
// Throw the rest away
|
||||||
|
readyQueue.drainTo(Lists.newArrayList());
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
@ -1970,7 +1989,7 @@ public abstract class AbstractTransportTest {
|
||||||
if (status.isDone()) {
|
if (status.isDone()) {
|
||||||
fail("onReady invoked after closed");
|
fail("onReady invoked after closed");
|
||||||
}
|
}
|
||||||
onReadyLatch.countDown();
|
readyQueue.add(new Object());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue