From 264dc09cfd55d82e51e8723fbeb2741e45066864 Mon Sep 17 00:00:00 2001 From: Xiao Hang Date: Thu, 3 Sep 2015 17:08:24 -0700 Subject: [PATCH] Port test cases in AbstractTransportTest to android interop test The test scenarios all remain the same. The changes are just in proto to nanoproto conversion and removing mocktio --- .../integrationtest/InteropTester.java | 537 ++++++++++++++++-- .../grpc/android/integrationtest/test.proto | 7 + 2 files changed, 485 insertions(+), 59 deletions(-) diff --git a/android-interop-testing/app/src/main/java/io/grpc/android/integrationtest/InteropTester.java b/android-interop-testing/app/src/main/java/io/grpc/android/integrationtest/InteropTester.java index 098a26dc01..3824617d5f 100644 --- a/android-interop-testing/app/src/main/java/io/grpc/android/integrationtest/InteropTester.java +++ b/android-interop-testing/app/src/main/java/io/grpc/android/integrationtest/InteropTester.java @@ -41,13 +41,28 @@ import android.os.Build; import android.support.annotation.Nullable; import android.util.Log; -import io.grpc.android.integrationtest.nano.Messages; -import io.grpc.ChannelImpl; -import io.grpc.stub.StreamObserver; -import io.grpc.okhttp.OkHttpChannelBuilder; -import io.grpc.testing.StreamRecorder; +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertNull; +import static junit.framework.Assert.assertTrue; +import static junit.framework.Assert.fail; -import junit.framework.Assert; +import io.grpc.CallOptions; +import io.grpc.ClientCall; +import io.grpc.ManagedChannel; +import io.grpc.Metadata; +import io.grpc.StatusRuntimeException; +import io.grpc.android.integrationtest.nano.Messages; +import io.grpc.android.integrationtest.nano.Messages.Payload; +import io.grpc.android.integrationtest.nano.Messages.ResponseParameters; +import io.grpc.android.integrationtest.nano.Messages.SimpleRequest; +import io.grpc.android.integrationtest.nano.Messages.SimpleResponse; +import io.grpc.android.integrationtest.nano.Messages.StreamingInputCallRequest; +import io.grpc.android.integrationtest.nano.Messages.StreamingInputCallResponse; +import io.grpc.android.integrationtest.nano.Messages.StreamingOutputCallRequest; +import io.grpc.android.integrationtest.nano.Messages.StreamingOutputCallResponse; +import io.grpc.okhttp.OkHttpChannelBuilder; +import io.grpc.stub.StreamObserver; +import io.grpc.testing.StreamRecorder; import java.io.InputStream; import java.io.PrintWriter; @@ -59,6 +74,7 @@ import java.security.cert.CertificateFactory; import java.security.cert.X509Certificate; import java.util.Arrays; import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -74,20 +90,43 @@ import javax.security.auth.x500.X500Principal; public final class InteropTester extends AsyncTask { final static String SUCCESS_MESSAGE = "Succeed!!!"; - private ChannelImpl channel; + private ManagedChannel channel; private TestServiceGrpc.TestServiceBlockingStub blockingStub; private TestServiceGrpc.TestService asyncStub; private String testCase; private TestListener listener; + private static int OPERATION_TIMEOUT = 5000; + + class ResponseObserver implements StreamObserver { + public LinkedBlockingQueue responses = new LinkedBlockingQueue(); + final Object magicTailResponse = new Object(); + + @Override + public void onNext(Messages.StreamingOutputCallResponse value) { + responses.add(value); + } + + @Override + public void onError(Throwable t) { + Log.e(TesterActivity.LOG_TAG, "Encounter an error", t); + responses.add(t); + } + + @Override + public void onCompleted() { + responses.add(magicTailResponse); + } + } + public InteropTester(String testCase, - String host, - int port, - @Nullable String serverHostOverride, - boolean useTls, - @Nullable InputStream testCa, - @Nullable String androidSocketFactoryTls, - TestListener listener) { + String host, + int port, + @Nullable String serverHostOverride, + boolean useTls, + @Nullable InputStream testCa, + @Nullable String androidSocketFactoryTls, + TestListener listener) { this.testCase = testCase; this.listener = listener; @@ -154,6 +193,21 @@ public final class InteropTester extends AsyncTask { clientStreaming(); serverStreaming(); pingPong(); + emptyStream(); + cancelAfterBegin(); + cancelAfterFirstResponse(); + fullDuplexCallShouldSucceed(); + halfDuplexCallShouldSucceed(); + serverStreamingShouldBeFlowControlled(); + veryLargeRequest(); + veryLargeResponse(); + deadlineNotExceeded(); + deadlineExceeded(); + deadlineExceededServerStreaming(); + unimplementedMethod(); + timeoutOnSleepingServer(); + // This has to be the last one, because it will shut down the channel. + gracefulShutdown(); } else if ("empty_unary".equals(testCase)) { emptyUnary(); } else if ("large_unary".equals(testCase)) { @@ -164,27 +218,55 @@ public final class InteropTester extends AsyncTask { serverStreaming(); } else if ("ping_pong".equals(testCase)) { pingPong(); + } else if ("empty_stream".equals(testCase)) { + emptyStream(); + } else if ("cancel_after_begin".equals(testCase)) { + cancelAfterBegin(); + } else if ("cancel_after_first_response".equals(testCase)) { + cancelAfterFirstResponse(); + } else if ("full_duplex_call_should_succeed".equals(testCase)) { + fullDuplexCallShouldSucceed(); + } else if ("half_duplex_call_should_succeed".equals(testCase)) { + halfDuplexCallShouldSucceed(); + } else if ("server_streaming_should_be_flow_controlled".equals(testCase)) { + serverStreamingShouldBeFlowControlled(); + } else if ("very_large_request".equals(testCase)) { + veryLargeRequest(); + } else if ("very_large_response".equals(testCase)) { + veryLargeResponse(); + } else if ("deadline_not_exceeded".equals(testCase)) { + deadlineNotExceeded(); + } else if ("deadline_exceeded".equals(testCase)) { + deadlineExceeded(); + } else if ("deadline_exceeded_server_streaming".equals(testCase)) { + deadlineExceededServerStreaming(); + } else if ("unimplemented_method".equals(testCase)) { + unimplementedMethod(); + } else if ("timeout_on_sleeping_server".equals(testCase)) { + timeoutOnSleepingServer(); + } else if ("graceful_shutdown".equals(testCase)) { + gracefulShutdown(); } else { throw new IllegalArgumentException("Unimplemented/Unknown test case: " + testCase); } } public void emptyUnary() { - assertEquals(new EmptyProtos.Empty(), blockingStub.emptyCall(new EmptyProtos.Empty())); + assertMessageEquals(new EmptyProtos.Empty(), blockingStub.emptyCall(new EmptyProtos.Empty())); } public void largeUnary() { final Messages.SimpleRequest request = new Messages.SimpleRequest(); request.responseSize = 314159; request.responseType = Messages.COMPRESSABLE; - request.payload = new Messages.Payload(); + request.payload = new Payload(); request.payload.body = new byte[271828]; final Messages.SimpleResponse goldenResponse = new Messages.SimpleResponse(); - goldenResponse.payload = new Messages.Payload(); + goldenResponse.payload = new Payload(); goldenResponse.payload.body = new byte[314159]; Messages.SimpleResponse response = blockingStub.unaryCall(request); - assertEquals(goldenResponse, response); + assertMessageEquals(goldenResponse, response); } public void serverStreaming() throws Exception { @@ -203,7 +285,7 @@ public final class InteropTester extends AsyncTask { new Messages.StreamingOutputCallResponse[4]; for (int i = 0; i < 4; i++) { goldenResponses[i] = new Messages.StreamingOutputCallResponse(); - goldenResponses[i].payload = new Messages.Payload(); + goldenResponses[i].payload = new Payload(); goldenResponses[i].payload.type = Messages.COMPRESSABLE; } goldenResponses[0].payload.body = new byte[31415]; @@ -215,14 +297,14 @@ public final class InteropTester extends AsyncTask { asyncStub.streamingOutputCall(request, recorder); recorder.awaitCompletion(); assertSuccess(recorder); - assertEquals(Arrays.asList(goldenResponses), recorder.getValues()); + assertMessageEquals(Arrays.asList(goldenResponses), recorder.getValues()); } public void clientStreaming() throws Exception { final Messages.StreamingInputCallRequest[] requests = new Messages.StreamingInputCallRequest[4]; for (int i = 0; i < 4; i++) { requests[i] = new Messages.StreamingInputCallRequest(); - requests[i].payload = new Messages.Payload(); + requests[i].payload = new Payload(); } requests[0].payload.body = new byte[27182]; requests[1].payload.body = new byte[8]; @@ -240,7 +322,7 @@ public final class InteropTester extends AsyncTask { requestObserver.onNext(request); } requestObserver.onCompleted(); - assertEquals(goldenResponse, responseObserver.firstValue().get()); + assertMessageEquals(goldenResponse, responseObserver.firstValue().get()); } public void pingPong() throws Exception { @@ -250,7 +332,7 @@ public final class InteropTester extends AsyncTask { requests[i] = new Messages.StreamingOutputCallRequest(); requests[i].responseParameters = new Messages.ResponseParameters[1]; requests[i].responseParameters[0] = new Messages.ResponseParameters(); - requests[i].payload = new Messages.Payload(); + requests[i].payload = new Payload(); } requests[0].responseParameters[0].size = 31415; requests[0].payload.body = new byte[27182]; @@ -266,7 +348,7 @@ public final class InteropTester extends AsyncTask { new Messages.StreamingOutputCallResponse[4]; for (int i = 0; i < 4; i++) { goldenResponses[i] = new Messages.StreamingOutputCallResponse(); - goldenResponses[i].payload = new Messages.Payload(); + goldenResponses[i].payload = new Payload(); goldenResponses[i].payload.type = Messages.COMPRESSABLE; } goldenResponses[0].payload.body = new byte[31415]; @@ -274,47 +356,384 @@ public final class InteropTester extends AsyncTask { goldenResponses[2].payload.body = new byte[2653]; goldenResponses[3].payload.body = new byte[58979]; - final LinkedBlockingQueue responses = new LinkedBlockingQueue<>(); - final Object magicTailResponse = new Object(); @SuppressWarnings("unchecked") - StreamObserver responseObserver = - new StreamObserver() { - - @Override - public void onNext(Messages.StreamingOutputCallResponse value) { - responses.add(value); - } - - @Override - public void onError(Throwable t) { - Log.e(TesterActivity.LOG_TAG, "Encounter an error", t); - responses.add(t); - } - - @Override - public void onCompleted() { - responses.add(magicTailResponse); - } - }; + ResponseObserver responseObserver = new ResponseObserver(); StreamObserver requestObserver = asyncStub.fullDuplexCall(responseObserver); for (int i = 0; i < requests.length; i++) { requestObserver.onNext(requests[i]); - Object response = responses.poll(5, TimeUnit.SECONDS); + Object response = responseObserver.responses.poll(OPERATION_TIMEOUT, TimeUnit.SECONDS); if (!(response instanceof Messages.StreamingOutputCallResponse)) { - Assert.fail("Unexpected: " + response); + fail("Unexpected: " + response); } - assertEquals(goldenResponses[i], (Messages.StreamingOutputCallResponse) response); - Assert.assertTrue("More than 1 responses received for ping pong test.", responses.isEmpty()); + assertMessageEquals(goldenResponses[i], (Messages.StreamingOutputCallResponse) response); + assertTrue("More than 1 responses received for ping pong test.", + responseObserver.responses.isEmpty()); } requestObserver.onCompleted(); - Assert.assertEquals(magicTailResponse, responses.poll(5, TimeUnit.SECONDS)); + assertEquals(responseObserver.magicTailResponse, + responseObserver.responses.poll(OPERATION_TIMEOUT, TimeUnit.SECONDS)); } - public static void assertEquals(MessageNano expected, MessageNano actual) { + public void emptyStream() throws Exception { + @SuppressWarnings("unchecked") + ResponseObserver responseObserver = new ResponseObserver(); + StreamObserver requestObserver + = asyncStub.fullDuplexCall(responseObserver); + requestObserver.onCompleted(); + assertEquals(responseObserver.magicTailResponse, + responseObserver.responses.poll(OPERATION_TIMEOUT, TimeUnit.SECONDS)); + } + + public void cancelAfterBegin() throws Exception { + StreamRecorder responseObserver = StreamRecorder.create(); + StreamObserver requestObserver = + asyncStub.streamingInputCall(responseObserver); + requestObserver.onError(new RuntimeException()); + responseObserver.awaitCompletion(); + assertEquals(Arrays.asList(), responseObserver.getValues()); + assertEquals(io.grpc.Status.CANCELLED.getCode(), + io.grpc.Status.fromThrowable(responseObserver.getError()).getCode()); + } + + public void cancelAfterFirstResponse() throws Exception { + final StreamingOutputCallRequest request = new StreamingOutputCallRequest(); + request.responseParameters = new Messages.ResponseParameters[1]; + request.responseParameters[0] = new ResponseParameters(); + request.responseParameters[0].size = 31415; + request.payload = new Payload(); + request.payload.body = new byte[27182]; + final StreamingOutputCallResponse goldenResponse = new StreamingOutputCallResponse(); + goldenResponse.payload = new Payload(); + goldenResponse.payload.type = Messages.COMPRESSABLE; + goldenResponse.payload.body = new byte[31415]; + + ResponseObserver responseObserver = new ResponseObserver(); + StreamObserver requestObserver + = asyncStub.fullDuplexCall(responseObserver); + requestObserver.onNext(request); + Object response = responseObserver.responses.poll(OPERATION_TIMEOUT, TimeUnit.SECONDS); + if (!(response instanceof Messages.StreamingOutputCallResponse)) { + fail("Unexpected: " + response); + } + assertMessageEquals(goldenResponse, (Messages.StreamingOutputCallResponse) response); + + requestObserver.onError(new RuntimeException()); + response = responseObserver.responses.poll(OPERATION_TIMEOUT, TimeUnit.SECONDS); + if (!(response instanceof Throwable)) { + fail("Unexpected: " + response); + } + assertEquals(io.grpc.Status.CANCELLED.getCode(), + io.grpc.Status.fromThrowable((Throwable) response).getCode()); + } + + public void fullDuplexCallShouldSucceed() throws Exception { + // Build the request. + Integer[] responseSizes = {50, 100, 150, 200}; + final StreamingOutputCallRequest request = new StreamingOutputCallRequest(); + request.responseParameters = new ResponseParameters[responseSizes.length]; + request.responseType = Messages.COMPRESSABLE; + for (int i = 0; i < responseSizes.length; ++i) { + request.responseParameters[i] = new ResponseParameters(); + request.responseParameters[i].size = responseSizes[i]; + request.responseParameters[i].intervalUs = 0; + } + + StreamRecorder recorder = StreamRecorder.create(); + StreamObserver requestStream = + asyncStub.fullDuplexCall(recorder); + + final int numRequests = 10; + for (int ix = numRequests; ix > 0; --ix) { + requestStream.onNext(request); + } + requestStream.onCompleted(); + recorder.awaitCompletion(); + assertSuccess(recorder); + assertEquals(responseSizes.length * numRequests, recorder.getValues().size()); + for (int ix = 0; ix < recorder.getValues().size(); ++ix) { + StreamingOutputCallResponse response = recorder.getValues().get(ix); + assertEquals(Messages.COMPRESSABLE, response.payload.type); + int length = response.payload.body.length; + int expectedSize = responseSizes[ix % responseSizes.length]; + assertEquals("comparison failed at index " + ix, expectedSize, length); + } + } + + public void halfDuplexCallShouldSucceed() throws Exception { + // Build the request. + Integer[] responseSizes = {50, 100, 150, 200}; + final StreamingOutputCallRequest request = new StreamingOutputCallRequest(); + request.responseParameters = new ResponseParameters[responseSizes.length]; + request.responseType = Messages.COMPRESSABLE; + for (int i = 0; i < responseSizes.length; ++i) { + request.responseParameters[i] = new ResponseParameters(); + request.responseParameters[i].size = responseSizes[i]; + request.responseParameters[i].intervalUs = 0; + } + + StreamRecorder recorder = StreamRecorder.create(); + StreamObserver requestStream = asyncStub.halfDuplexCall(recorder); + + final int numRequests = 10; + for (int ix = numRequests; ix > 0; --ix) { + requestStream.onNext(request); + } + requestStream.onCompleted(); + recorder.awaitCompletion(); + assertSuccess(recorder); + assertEquals(responseSizes.length * numRequests, recorder.getValues().size()); + for (int ix = 0; ix < recorder.getValues().size(); ++ix) { + StreamingOutputCallResponse response = recorder.getValues().get(ix); + assertEquals(Messages.COMPRESSABLE, response.payload.type); + int length = response.payload.body.length; + int expectedSize = responseSizes[ix % responseSizes.length]; + assertEquals("comparison failed at index " + ix, expectedSize, length); + } + } + + public void serverStreamingShouldBeFlowControlled() throws Exception { + final StreamingOutputCallRequest request = new StreamingOutputCallRequest(); + request.responseType = Messages.COMPRESSABLE; + request.responseParameters = new ResponseParameters[2]; + request.responseParameters[0] = new ResponseParameters(); + request.responseParameters[0].size = 100000; + request.responseParameters[1] = new ResponseParameters(); + request.responseParameters[1].size = 100001; + final StreamingOutputCallResponse[] goldenResponses = new StreamingOutputCallResponse[2]; + goldenResponses[0] = new StreamingOutputCallResponse(); + goldenResponses[0].payload = new Payload(); + goldenResponses[0].payload.type = Messages.COMPRESSABLE; + goldenResponses[0].payload.body = new byte[100000]; + goldenResponses[1] = new StreamingOutputCallResponse(); + goldenResponses[1].payload = new Payload(); + goldenResponses[1].payload.type = Messages.COMPRESSABLE; + goldenResponses[1].payload.body = new byte[100001]; + + long start = System.nanoTime(); + + final ArrayBlockingQueue queue = new ArrayBlockingQueue(10); + ClientCall call = + channel.newCall(TestServiceGrpc.METHOD_STREAMING_OUTPUT_CALL, CallOptions.DEFAULT); + call.start(new ClientCall.Listener() { + @Override + public void onHeaders(Metadata headers) {} + + @Override + public void onMessage(final StreamingOutputCallResponse message) { + queue.add(message); + } + + @Override + public void onClose(io.grpc.Status status, Metadata trailers) { + queue.add(status); + } + }, new Metadata()); + call.sendMessage(request); + call.halfClose(); + + // Time how long it takes to get the first response. + call.request(1); + assertMessageEquals(goldenResponses[0], + (StreamingOutputCallResponse) queue.poll(OPERATION_TIMEOUT, TimeUnit.MILLISECONDS)); + long firstCallDuration = System.nanoTime() - start; + + // Without giving additional flow control, make sure that we don't get another response. We wait + // until we are comfortable the next message isn't coming. We may have very low nanoTime + // resolution (like on Windows) or be using a testing, in-process transport where message + // handling is instantaneous. In both cases, firstCallDuration may be 0, so round up sleep time + // to at least 1ms. + assertNull(queue.poll(Math.max(firstCallDuration * 4, 1 * 1000 * 1000), TimeUnit.NANOSECONDS)); + + // Make sure that everything still completes. + call.request(1); + assertMessageEquals(goldenResponses[1], + (StreamingOutputCallResponse) queue.poll(OPERATION_TIMEOUT, TimeUnit.MILLISECONDS)); + assertEquals(io.grpc.Status.OK, queue.poll(OPERATION_TIMEOUT, TimeUnit.MILLISECONDS)); + } + + public void veryLargeRequest() throws Exception { + final SimpleRequest request = new SimpleRequest(); + request.payload = new Payload(); + request.payload.type = Messages.COMPRESSABLE; + request.payload.body = new byte[unaryPayloadLength()]; + request.responseSize = 10; + request.responseType = Messages.COMPRESSABLE; + final SimpleResponse goldenResponse = new SimpleResponse(); + goldenResponse.payload = new Payload(); + goldenResponse.payload.type = Messages.COMPRESSABLE; + goldenResponse.payload.body = new byte[10]; + + assertMessageEquals(goldenResponse, blockingStub.unaryCall(request)); + } + + public void veryLargeResponse() throws Exception { + final SimpleRequest request = new SimpleRequest(); + request.responseSize = unaryPayloadLength(); + request.responseType = Messages.COMPRESSABLE; + + final SimpleResponse goldenResponse = new SimpleResponse(); + goldenResponse.payload = new Payload(); + goldenResponse.payload.type = Messages.COMPRESSABLE; + goldenResponse.payload.body = new byte[unaryPayloadLength()]; + + assertMessageEquals(goldenResponse, blockingStub.unaryCall(request)); + } + + public void deadlineNotExceeded() { + // warm up the channel and JVM + blockingStub.emptyCall(new EmptyProtos.Empty()); + StreamingOutputCallRequest request = new StreamingOutputCallRequest(); + request.responseParameters = new ResponseParameters[1]; + request.responseParameters[0] = new ResponseParameters(); + request.responseParameters[0].intervalUs = 0; + TestServiceGrpc.newBlockingStub(channel) + .withDeadlineAfter(10, TimeUnit.SECONDS) + .streamingOutputCall(request); + } + + public void deadlineExceeded() { + // warm up the channel and JVM + blockingStub.emptyCall(new EmptyProtos.Empty()); + TestServiceGrpc.TestServiceBlockingStub stub = TestServiceGrpc.newBlockingStub(channel) + .withDeadlineAfter(10, TimeUnit.MILLISECONDS); + try { + StreamingOutputCallRequest request = new StreamingOutputCallRequest(); + request.responseParameters = new ResponseParameters[1]; + request.responseParameters[0] = new ResponseParameters(); + request.responseParameters[0].intervalUs = 20000; + stub.streamingOutputCall(request).next(); + fail("Expected deadline to be exceeded"); + } catch (Throwable t) { + assertEquals(io.grpc.Status.DEADLINE_EXCEEDED.getCode(), + io.grpc.Status.fromThrowable(t).getCode()); + } + } + + public void deadlineExceededServerStreaming() throws Exception { + // warm up the channel and JVM + blockingStub.emptyCall(new EmptyProtos.Empty()); + ResponseParameters responseParameters = new ResponseParameters(); + responseParameters.size = 1; + responseParameters.intervalUs = 10000; + StreamingOutputCallRequest request = new StreamingOutputCallRequest(); + request.responseType = Messages.COMPRESSABLE; + request.responseParameters = new ResponseParameters[4]; + request.responseParameters[0] = responseParameters; + request.responseParameters[1] = responseParameters; + request.responseParameters[2] = responseParameters; + request.responseParameters[3] = responseParameters; + StreamRecorder recorder = StreamRecorder.create(); + TestServiceGrpc.newStub(channel) + .withDeadlineAfter(30, TimeUnit.MILLISECONDS) + .streamingOutputCall(request, recorder); + recorder.awaitCompletion(); + assertEquals(io.grpc.Status.DEADLINE_EXCEEDED.getCode(), + io.grpc.Status.fromThrowable(recorder.getError()).getCode()); + } + + protected int unaryPayloadLength() { + // 10MiB. + return 10485760; + } + + public void gracefulShutdown() throws Exception { + StreamingOutputCallRequest[] requests = new StreamingOutputCallRequest[3]; + requests[0] = new StreamingOutputCallRequest(); + requests[0].responseParameters = new ResponseParameters[1]; + requests[0].responseParameters[0] = new ResponseParameters(); + requests[0].responseParameters[0].size = 3; + requests[0].payload = new Payload(); + requests[0].payload.body = new byte[2]; + requests[1] = new StreamingOutputCallRequest(); + requests[1].responseParameters = new ResponseParameters[1]; + requests[1].responseParameters[0] = new ResponseParameters(); + requests[1].responseParameters[0].size = 1; + requests[1].payload = new Payload(); + requests[1].payload.body = new byte[7]; + requests[2] = new StreamingOutputCallRequest(); + requests[2].responseParameters = new ResponseParameters[1]; + requests[2].responseParameters[0] = new ResponseParameters(); + requests[2].responseParameters[0].size = 4; + requests[2].payload = new Payload(); + requests[2].payload.body = new byte[1]; + + StreamingOutputCallResponse[] goldenResponses = new StreamingOutputCallResponse[3]; + goldenResponses[0] = new StreamingOutputCallResponse(); + goldenResponses[0].payload = new Payload(); + goldenResponses[0].payload.type = Messages.COMPRESSABLE; + goldenResponses[0].payload.body = new byte[3]; + goldenResponses[1] = new StreamingOutputCallResponse(); + goldenResponses[1].payload = new Payload(); + goldenResponses[1].payload.type = Messages.COMPRESSABLE; + goldenResponses[1].payload.body = new byte[1]; + goldenResponses[2] = new StreamingOutputCallResponse(); + goldenResponses[2].payload = new Payload(); + goldenResponses[2].payload.type = Messages.COMPRESSABLE; + goldenResponses[2].payload.body = new byte[4]; + + + ResponseObserver responseObserver = new ResponseObserver(); + StreamObserver requestObserver + = asyncStub.fullDuplexCall(responseObserver); + requestObserver.onNext(requests[0]); + Object response = responseObserver.responses.poll(OPERATION_TIMEOUT, TimeUnit.SECONDS); + assertTrue(response instanceof Messages.StreamingOutputCallResponse); + assertMessageEquals(goldenResponses[0], (Messages.StreamingOutputCallResponse) response); + // Initiate graceful shutdown. + channel.shutdown(); + // The previous ping-pong could have raced with the shutdown, but this one certainly shouldn't. + requestObserver.onNext(requests[1]); + response = responseObserver.responses.poll(OPERATION_TIMEOUT, TimeUnit.SECONDS); + assertTrue(response instanceof Messages.StreamingOutputCallResponse); + assertMessageEquals(goldenResponses[1], (Messages.StreamingOutputCallResponse) response); + requestObserver.onNext(requests[2]); + response = responseObserver.responses.poll(OPERATION_TIMEOUT, TimeUnit.SECONDS); + assertTrue(response instanceof Messages.StreamingOutputCallResponse); + assertMessageEquals(goldenResponses[2], (Messages.StreamingOutputCallResponse) response); + requestObserver.onCompleted(); + assertEquals(responseObserver.magicTailResponse, + responseObserver.responses.poll(OPERATION_TIMEOUT, TimeUnit.SECONDS)); + } + + /** Sends an rpc to an unimplemented method on the server. */ + public void unimplementedMethod() { + UnimplementedServiceGrpc.UnimplementedServiceBlockingStub stub = + UnimplementedServiceGrpc.newBlockingStub(channel); + try { + stub.unimplementedCall(new EmptyProtos.Empty()); + fail(); + } catch (StatusRuntimeException e) { + assertEquals(io.grpc.Status.UNIMPLEMENTED.getCode(), e.getStatus().getCode()); + } + } + + /** Start a fullDuplexCall which the server will not respond, and verify the deadline expires. */ + public void timeoutOnSleepingServer() throws Exception { + TestServiceGrpc.TestService stub = TestServiceGrpc.newStub(channel) + .withDeadlineAfter(1, TimeUnit.MILLISECONDS); + StreamRecorder recorder = StreamRecorder.create(); + StreamObserver requestObserver + = stub.fullDuplexCall(recorder); + + try { + StreamingOutputCallRequest request = new StreamingOutputCallRequest(); + request.payload = new Messages.Payload(); + request.payload.body = new byte[27182]; + requestObserver.onNext(request); + } catch (IllegalStateException expected) { + // This can happen if the stream has already been terminated due to deadline exceeded. + } + + recorder.awaitCompletion(); + assertEquals(io.grpc.Status.DEADLINE_EXCEEDED.getCode(), + io.grpc.Status.fromThrowable(recorder.getError()).getCode()); + } + + public static void assertMessageEquals(MessageNano expected, MessageNano actual) { if (!MessageNano.messageNanoEquals(expected, actual)) { - Assert.assertEquals(expected.toString(), actual.toString()); - Assert.fail("Messages not equal, but assertEquals didn't throw"); + assertEquals(expected.toString(), actual.toString()); + fail("Messages not equal, but assertEquals didn't throw"); } } @@ -324,15 +743,15 @@ public final class InteropTester extends AsyncTask { } } - public static void assertEquals(List expected, - List actual) { + public static void assertMessageEquals(List expected, + List actual) { if (expected == null || actual == null) { - Assert.assertEquals(expected, actual); + assertEquals(expected, actual); } else if (expected.size() != actual.size()) { - Assert.assertEquals(expected, actual); + assertEquals(expected, actual); } else { for (int i = 0; i < expected.size(); i++) { - assertEquals(expected.get(i), actual.get(i)); + assertMessageEquals(expected.get(i), actual.get(i)); } } } @@ -397,4 +816,4 @@ public final class InteropTester extends AsyncTask { void onPostTest(String result); } -} +} \ No newline at end of file diff --git a/android-interop-testing/app/src/main/proto/io/grpc/android/integrationtest/test.proto b/android-interop-testing/app/src/main/proto/io/grpc/android/integrationtest/test.proto index b5f0a1653f..c6df4ee8f1 100644 --- a/android-interop-testing/app/src/main/proto/io/grpc/android/integrationtest/test.proto +++ b/android-interop-testing/app/src/main/proto/io/grpc/android/integrationtest/test.proto @@ -71,3 +71,10 @@ service TestService { rpc HalfDuplexCall(stream StreamingOutputCallRequest) returns (stream StreamingOutputCallResponse); } + +// A simple service NOT implemented at servers so clients can test for +// that case. +service UnimplementedService { + // A call that no server should implement + rpc UnimplementedCall(grpc.testing.Empty) returns(grpc.testing.Empty); +}