From 42fcc501b9542d81b41ad8a32813c2007e278afb Mon Sep 17 00:00:00 2001 From: ejona Date: Tue, 23 Dec 2014 11:17:10 -0800 Subject: [PATCH] Implement empty_stream integration test The test was going to use a queue like ping_pong, but using a mock proved much simpler. Thus, I also updated ping_pong to use the simpler model, because it is useful for the two tests to be similar. InProcessTransportTest failed for empty_stream due to gRPC v2 issues, and so instead of ignoring emptyStream() I found the broken tests that were preventing swapping to gRPC v2 and ignored them instead. ------------- Created by MOE: http://code.google.com/p/moe-java MOE_MIGRATED_REVID=82733943 --- .../integration/AbstractTransportTest.java | 68 +++++++------------ .../integration/TestServiceClient.java | 2 + 2 files changed, 25 insertions(+), 45 deletions(-) diff --git a/integration-testing/src/main/java/com/google/net/stubby/testing/integration/AbstractTransportTest.java b/integration-testing/src/main/java/com/google/net/stubby/testing/integration/AbstractTransportTest.java index e8efd6e9bd..d4400b208f 100644 --- a/integration-testing/src/main/java/com/google/net/stubby/testing/integration/AbstractTransportTest.java +++ b/integration-testing/src/main/java/com/google/net/stubby/testing/integration/AbstractTransportTest.java @@ -34,9 +34,11 @@ package com.google.net.stubby.testing.integration; import static com.google.net.stubby.testing.integration.Messages.PayloadType.COMPRESSABLE; import static com.google.net.stubby.testing.integration.Util.assertEquals; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; -import com.google.common.base.Throwables; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.Uninterruptibles; @@ -79,7 +81,6 @@ import java.util.List; import java.util.Random; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -279,52 +280,29 @@ public abstract class AbstractTransportTest { .setBody(ByteString.copyFrom(new byte[58979]))) .build()); - final SynchronousQueue queue = new SynchronousQueue(); - final Object sentinel = new Object(); - StreamObserver requestObserver = asyncStub.fullDuplexCall( - new StreamObserver() { - @Override - public void onValue(StreamingOutputCallResponse response) { - put(response); - } - - @Override - public void onError(Throwable t) { - put(t); - } - - @Override - public void onCompleted() { - put(sentinel); - } - - public void put(Object o) { - try { - queue.put(o); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - throw new AssertionError(ex); - } - } - }); + @SuppressWarnings("unchecked") + StreamObserver responseObserver = mock(StreamObserver.class); + StreamObserver requestObserver + = asyncStub.fullDuplexCall(responseObserver); for (int i = 0; i < requests.size(); i++) { requestObserver.onValue(requests.get(i)); - Object o = queue.take(); - if (o == sentinel) { - fail("Premature onCompleted"); - } else if (o instanceof Throwable) { - throw Throwables.propagate((Throwable) o); - } - try { - assertEquals(goldenResponses.get(i), (StreamingOutputCallResponse) o); - } catch (Exception e) { - requestObserver.onError(e); - while (queue.take() instanceof StreamingOutputCallResponse) {} - throw e; - } + verify(responseObserver, timeout(1000)).onValue(goldenResponses.get(i)); + verifyNoMoreInteractions(responseObserver); } requestObserver.onCompleted(); - assertEquals(sentinel, queue.take()); + verify(responseObserver, timeout(1000)).onCompleted(); + verifyNoMoreInteractions(responseObserver); + } + + @Test + public void emptyStream() throws Exception { + @SuppressWarnings("unchecked") + StreamObserver responseObserver = mock(StreamObserver.class); + StreamObserver requestObserver + = asyncStub.fullDuplexCall(responseObserver); + requestObserver.onCompleted(); + verify(responseObserver, timeout(1000)).onCompleted(); + verifyNoMoreInteractions(responseObserver); } @Test diff --git a/integration-testing/src/main/java/com/google/net/stubby/testing/integration/TestServiceClient.java b/integration-testing/src/main/java/com/google/net/stubby/testing/integration/TestServiceClient.java index 5794eaa550..cd04800b3a 100644 --- a/integration-testing/src/main/java/com/google/net/stubby/testing/integration/TestServiceClient.java +++ b/integration-testing/src/main/java/com/google/net/stubby/testing/integration/TestServiceClient.java @@ -221,6 +221,8 @@ public class TestServiceClient { tester.serverStreaming(); } else if ("ping_pong".equals(testCase)) { tester.pingPong(); + } else if ("empty_stream".equals(testCase)) { + tester.emptyStream(); } else { throw new IllegalArgumentException("Unknown test case: " + testCase); }