mirror of https://github.com/grpc/grpc-java.git
Implement empty_stream integration test
The test was going to use a queue like ping_pong, but using a mock proved much simpler. Thus, I also updated ping_pong to use the simpler model, because it is useful for the two tests to be similar. InProcessTransportTest failed for empty_stream due to gRPC v2 issues, and so instead of ignoring emptyStream() I found the broken tests that were preventing swapping to gRPC v2 and ignored them instead. ------------- Created by MOE: http://code.google.com/p/moe-java MOE_MIGRATED_REVID=82733943
This commit is contained in:
parent
b09c26b4dd
commit
42fcc501b9
|
|
@ -34,9 +34,11 @@ package com.google.net.stubby.testing.integration;
|
|||
import static com.google.net.stubby.testing.integration.Messages.PayloadType.COMPRESSABLE;
|
||||
import static com.google.net.stubby.testing.integration.Util.assertEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.timeout;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.SettableFuture;
|
||||
import com.google.common.util.concurrent.Uninterruptibles;
|
||||
|
|
@ -79,7 +81,6 @@ import java.util.List;
|
|||
import java.util.Random;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.SynchronousQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
|
|
@ -279,52 +280,29 @@ public abstract class AbstractTransportTest {
|
|||
.setBody(ByteString.copyFrom(new byte[58979])))
|
||||
.build());
|
||||
|
||||
final SynchronousQueue<Object> queue = new SynchronousQueue<Object>();
|
||||
final Object sentinel = new Object();
|
||||
StreamObserver<StreamingOutputCallRequest> requestObserver = asyncStub.fullDuplexCall(
|
||||
new StreamObserver<StreamingOutputCallResponse>() {
|
||||
@Override
|
||||
public void onValue(StreamingOutputCallResponse response) {
|
||||
put(response);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable t) {
|
||||
put(t);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCompleted() {
|
||||
put(sentinel);
|
||||
}
|
||||
|
||||
public void put(Object o) {
|
||||
try {
|
||||
queue.put(o);
|
||||
} catch (InterruptedException ex) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new AssertionError(ex);
|
||||
}
|
||||
}
|
||||
});
|
||||
@SuppressWarnings("unchecked")
|
||||
StreamObserver<StreamingOutputCallResponse> responseObserver = mock(StreamObserver.class);
|
||||
StreamObserver<StreamingOutputCallRequest> requestObserver
|
||||
= asyncStub.fullDuplexCall(responseObserver);
|
||||
for (int i = 0; i < requests.size(); i++) {
|
||||
requestObserver.onValue(requests.get(i));
|
||||
Object o = queue.take();
|
||||
if (o == sentinel) {
|
||||
fail("Premature onCompleted");
|
||||
} else if (o instanceof Throwable) {
|
||||
throw Throwables.propagate((Throwable) o);
|
||||
}
|
||||
try {
|
||||
assertEquals(goldenResponses.get(i), (StreamingOutputCallResponse) o);
|
||||
} catch (Exception e) {
|
||||
requestObserver.onError(e);
|
||||
while (queue.take() instanceof StreamingOutputCallResponse) {}
|
||||
throw e;
|
||||
}
|
||||
verify(responseObserver, timeout(1000)).onValue(goldenResponses.get(i));
|
||||
verifyNoMoreInteractions(responseObserver);
|
||||
}
|
||||
requestObserver.onCompleted();
|
||||
assertEquals(sentinel, queue.take());
|
||||
verify(responseObserver, timeout(1000)).onCompleted();
|
||||
verifyNoMoreInteractions(responseObserver);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void emptyStream() throws Exception {
|
||||
@SuppressWarnings("unchecked")
|
||||
StreamObserver<StreamingOutputCallResponse> responseObserver = mock(StreamObserver.class);
|
||||
StreamObserver<StreamingOutputCallRequest> requestObserver
|
||||
= asyncStub.fullDuplexCall(responseObserver);
|
||||
requestObserver.onCompleted();
|
||||
verify(responseObserver, timeout(1000)).onCompleted();
|
||||
verifyNoMoreInteractions(responseObserver);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
|||
|
|
@ -221,6 +221,8 @@ public class TestServiceClient {
|
|||
tester.serverStreaming();
|
||||
} else if ("ping_pong".equals(testCase)) {
|
||||
tester.pingPong();
|
||||
} else if ("empty_stream".equals(testCase)) {
|
||||
tester.emptyStream();
|
||||
} else {
|
||||
throw new IllegalArgumentException("Unknown test case: " + testCase);
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue