From 84eaefed6bac646dd39d3d5563aaa2590cab808b Mon Sep 17 00:00:00 2001 From: Eric Gribkoff Date: Wed, 28 Dec 2016 17:22:46 -0800 Subject: [PATCH] interop-testing: Add status_code_and_message interop test --- .../integration/AbstractInteropTest.java | 47 +++++++++++++++++++ .../grpc/testing/integration/TestCases.java | 1 + .../integration/TestServiceClient.java | 5 ++ .../testing/integration/TestServiceImpl.java | 40 +++++++++++++++- .../testing/integration/TestCasesTest.java | 4 +- 5 files changed, 93 insertions(+), 4 deletions(-) diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java index b230856dc3..48c28e34bd 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java @@ -78,6 +78,7 @@ import io.grpc.stub.MetadataUtils; import io.grpc.stub.StreamObserver; import io.grpc.testing.StreamRecorder; import io.grpc.testing.TestUtils; +import io.grpc.testing.integration.Messages.EchoStatus; import io.grpc.testing.integration.Messages.Payload; import io.grpc.testing.integration.Messages.PayloadType; import io.grpc.testing.integration.Messages.ResponseParameters; @@ -924,6 +925,52 @@ public abstract class AbstractInteropTest { } } + @Test(timeout = 10000) + public void statusCodeAndMessage() throws Exception { + int errorCode = 2; + String errorMessage = "test status message"; + EchoStatus responseStatus = EchoStatus.newBuilder() + .setCode(errorCode) + .setMessage(errorMessage) + .build(); + SimpleRequest simpleRequest = SimpleRequest.newBuilder() + .setResponseStatus(responseStatus) + .build(); + StreamingOutputCallRequest streamingRequest = StreamingOutputCallRequest.newBuilder() + .setResponseStatus(responseStatus) + .build(); + + // Test UnaryCall + try { + blockingStub.unaryCall(simpleRequest); + fail(); + } catch (StatusRuntimeException e) { + assertEquals(Status.UNKNOWN.getCode(), e.getStatus().getCode()); + assertEquals(errorMessage, e.getStatus().getDescription()); + } + if (metricsExpected()) { + assertClientMetrics("grpc.testing.TestService/UnaryCall", Status.Code.UNKNOWN); + } + + // Test FullDuplexCall + @SuppressWarnings("unchecked") + StreamObserver responseObserver = + (StreamObserver) mock(StreamObserver.class); + StreamObserver requestObserver + = asyncStub.fullDuplexCall(responseObserver); + requestObserver.onNext(streamingRequest); + requestObserver.onCompleted(); + + ArgumentCaptor captor = ArgumentCaptor.forClass(Throwable.class); + verify(responseObserver, timeout(operationTimeoutMillis())).onError(captor.capture()); + assertEquals(Status.UNKNOWN.getCode(), Status.fromThrowable(captor.getValue()).getCode()); + assertEquals(errorMessage, Status.fromThrowable(captor.getValue()).getDescription()); + verifyNoMoreInteractions(responseObserver); + if (metricsExpected()) { + assertClientMetrics("grpc.testing.TestService/FullDuplexCall", Status.Code.UNKNOWN); + } + } + /** Sends an rpc to an unimplemented method within TestService. */ @Test(timeout = 10000) public void unimplementedMethod() { diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestCases.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestCases.java index b35f6c05bb..03faf935ee 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestCases.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestCases.java @@ -49,6 +49,7 @@ public enum TestCases { OAUTH2_AUTH_TOKEN("raw oauth2 access token auth"), PER_RPC_CREDS("per rpc raw oauth2 access token auth"), CUSTOM_METADATA("unary and full duplex calls with metadata"), + STATUS_CODE_AND_MESSAGE("request error code and message"), UNIMPLEMENTED_METHOD("call an unimplemented RPC method"), UNIMPLEMENTED_SERVICE("call an unimplemented RPC service"), CANCEL_AFTER_BEGIN("cancel stream after starting it"), diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java index f8c81c8dd4..678d2de150 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java @@ -268,6 +268,11 @@ public class TestServiceClient { break; } + case STATUS_CODE_AND_MESSAGE: { + tester.statusCodeAndMessage(); + break; + } + case UNIMPLEMENTED_METHOD: { tester.unimplementedMethod(); break; diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java index bfa9b08688..87708af274 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java @@ -31,6 +31,7 @@ package io.grpc.testing.integration; +import com.google.common.base.Preconditions; import com.google.common.collect.Queues; import com.google.protobuf.ByteString; import com.google.protobuf.EmptyProtos; @@ -55,6 +56,7 @@ import java.util.Queue; import java.util.Random; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import javax.annotation.concurrent.GuardedBy; /** * Implementation of the business logic for the TestService. Uses an executor to schedule chunks @@ -131,6 +133,14 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase { .setType(compressable ? PayloadType.COMPRESSABLE : PayloadType.UNCOMPRESSABLE) .setBody(payload); } + + if (req.hasResponseStatus()) { + obs.onError(Status.fromCodeValue(req.getResponseStatus().getCode()) + .withDescription(req.getResponseStatus().getMessage()) + .asRuntimeException()); + return; + } + responseObserver.onNext(responseBuilder.build()); responseObserver.onCompleted(); } @@ -186,13 +196,22 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase { return new StreamObserver() { @Override public void onNext(StreamingOutputCallRequest request) { + if (request.hasResponseStatus()) { + dispatcher.cancel(); + responseObserver.onError(Status.fromCodeValue(request.getResponseStatus().getCode()) + .withDescription(request.getResponseStatus().getMessage()) + .asRuntimeException()); + return; + } dispatcher.enqueue(toChunkQueue(request)); } @Override public void onCompleted() { - // Tell the dispatcher that all input has been received. - dispatcher.completeInput(); + if (!dispatcher.isCancelled()) { + // Tell the dispatcher that all input has been received. + dispatcher.completeInput(); + } } @Override @@ -239,6 +258,7 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase { private final Queue chunks; private final StreamObserver responseStream; private boolean scheduled; + @GuardedBy("this") private boolean cancelled; private Throwable failure; private Runnable dispatchTask = new Runnable() { @Override @@ -295,11 +315,27 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase { return this; } + /** + * Allows the service to cancel the remaining responses. + */ + public synchronized void cancel() { + Preconditions.checkState(!cancelled, "Dispatcher already cancelled"); + chunks.clear(); + cancelled = true; + } + + public synchronized boolean isCancelled() { + return cancelled; + } + /** * Dispatches the current response chunk to the client. This is only called by the executor. At * any time, a given dispatch task should only be registered with the executor once. */ private synchronized void dispatchChunk() { + if (cancelled) { + return; + } try { // Pop off the next chunk and send it to the client. Chunk chunk = chunks.remove(); diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/TestCasesTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/TestCasesTest.java index 8044a311d5..90e10fc949 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/TestCasesTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/TestCasesTest.java @@ -58,8 +58,8 @@ public class TestCasesTest { String[] testCases = {"empty_unary", "large_unary", "client_streaming", "server_streaming", "ping_pong", "empty_stream", "compute_engine_creds", "service_account_creds", "jwt_token_creds", "oauth2_auth_token", "per_rpc_creds", "custom_metadata", - "unimplemented_method", "unimplemented_service", "cancel_after_begin", - "cancel_after_first_response", "timeout_on_sleeping_server"}; + "status_code_and_message", "unimplemented_method", "unimplemented_service", + "cancel_after_begin", "cancel_after_first_response", "timeout_on_sleeping_server"}; assertEquals(testCases.length, TestCases.values().length);