mirror of https://github.com/grpc/grpc-java.git
interop-testing: Add status_code_and_message interop test
This commit is contained in:
parent
257062fa88
commit
84eaefed6b
|
|
@ -78,6 +78,7 @@ import io.grpc.stub.MetadataUtils;
|
|||
import io.grpc.stub.StreamObserver;
|
||||
import io.grpc.testing.StreamRecorder;
|
||||
import io.grpc.testing.TestUtils;
|
||||
import io.grpc.testing.integration.Messages.EchoStatus;
|
||||
import io.grpc.testing.integration.Messages.Payload;
|
||||
import io.grpc.testing.integration.Messages.PayloadType;
|
||||
import io.grpc.testing.integration.Messages.ResponseParameters;
|
||||
|
|
@ -924,6 +925,52 @@ public abstract class AbstractInteropTest {
|
|||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 10000)
|
||||
public void statusCodeAndMessage() throws Exception {
|
||||
int errorCode = 2;
|
||||
String errorMessage = "test status message";
|
||||
EchoStatus responseStatus = EchoStatus.newBuilder()
|
||||
.setCode(errorCode)
|
||||
.setMessage(errorMessage)
|
||||
.build();
|
||||
SimpleRequest simpleRequest = SimpleRequest.newBuilder()
|
||||
.setResponseStatus(responseStatus)
|
||||
.build();
|
||||
StreamingOutputCallRequest streamingRequest = StreamingOutputCallRequest.newBuilder()
|
||||
.setResponseStatus(responseStatus)
|
||||
.build();
|
||||
|
||||
// Test UnaryCall
|
||||
try {
|
||||
blockingStub.unaryCall(simpleRequest);
|
||||
fail();
|
||||
} catch (StatusRuntimeException e) {
|
||||
assertEquals(Status.UNKNOWN.getCode(), e.getStatus().getCode());
|
||||
assertEquals(errorMessage, e.getStatus().getDescription());
|
||||
}
|
||||
if (metricsExpected()) {
|
||||
assertClientMetrics("grpc.testing.TestService/UnaryCall", Status.Code.UNKNOWN);
|
||||
}
|
||||
|
||||
// Test FullDuplexCall
|
||||
@SuppressWarnings("unchecked")
|
||||
StreamObserver<StreamingOutputCallResponse> responseObserver =
|
||||
(StreamObserver<StreamingOutputCallResponse>) mock(StreamObserver.class);
|
||||
StreamObserver<StreamingOutputCallRequest> requestObserver
|
||||
= asyncStub.fullDuplexCall(responseObserver);
|
||||
requestObserver.onNext(streamingRequest);
|
||||
requestObserver.onCompleted();
|
||||
|
||||
ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class);
|
||||
verify(responseObserver, timeout(operationTimeoutMillis())).onError(captor.capture());
|
||||
assertEquals(Status.UNKNOWN.getCode(), Status.fromThrowable(captor.getValue()).getCode());
|
||||
assertEquals(errorMessage, Status.fromThrowable(captor.getValue()).getDescription());
|
||||
verifyNoMoreInteractions(responseObserver);
|
||||
if (metricsExpected()) {
|
||||
assertClientMetrics("grpc.testing.TestService/FullDuplexCall", Status.Code.UNKNOWN);
|
||||
}
|
||||
}
|
||||
|
||||
/** Sends an rpc to an unimplemented method within TestService. */
|
||||
@Test(timeout = 10000)
|
||||
public void unimplementedMethod() {
|
||||
|
|
|
|||
|
|
@ -49,6 +49,7 @@ public enum TestCases {
|
|||
OAUTH2_AUTH_TOKEN("raw oauth2 access token auth"),
|
||||
PER_RPC_CREDS("per rpc raw oauth2 access token auth"),
|
||||
CUSTOM_METADATA("unary and full duplex calls with metadata"),
|
||||
STATUS_CODE_AND_MESSAGE("request error code and message"),
|
||||
UNIMPLEMENTED_METHOD("call an unimplemented RPC method"),
|
||||
UNIMPLEMENTED_SERVICE("call an unimplemented RPC service"),
|
||||
CANCEL_AFTER_BEGIN("cancel stream after starting it"),
|
||||
|
|
|
|||
|
|
@ -268,6 +268,11 @@ public class TestServiceClient {
|
|||
break;
|
||||
}
|
||||
|
||||
case STATUS_CODE_AND_MESSAGE: {
|
||||
tester.statusCodeAndMessage();
|
||||
break;
|
||||
}
|
||||
|
||||
case UNIMPLEMENTED_METHOD: {
|
||||
tester.unimplementedMethod();
|
||||
break;
|
||||
|
|
|
|||
|
|
@ -31,6 +31,7 @@
|
|||
|
||||
package io.grpc.testing.integration;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Queues;
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.google.protobuf.EmptyProtos;
|
||||
|
|
@ -55,6 +56,7 @@ import java.util.Queue;
|
|||
import java.util.Random;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import javax.annotation.concurrent.GuardedBy;
|
||||
|
||||
/**
|
||||
* Implementation of the business logic for the TestService. Uses an executor to schedule chunks
|
||||
|
|
@ -131,6 +133,14 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase {
|
|||
.setType(compressable ? PayloadType.COMPRESSABLE : PayloadType.UNCOMPRESSABLE)
|
||||
.setBody(payload);
|
||||
}
|
||||
|
||||
if (req.hasResponseStatus()) {
|
||||
obs.onError(Status.fromCodeValue(req.getResponseStatus().getCode())
|
||||
.withDescription(req.getResponseStatus().getMessage())
|
||||
.asRuntimeException());
|
||||
return;
|
||||
}
|
||||
|
||||
responseObserver.onNext(responseBuilder.build());
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
|
|
@ -186,14 +196,23 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase {
|
|||
return new StreamObserver<StreamingOutputCallRequest>() {
|
||||
@Override
|
||||
public void onNext(StreamingOutputCallRequest request) {
|
||||
if (request.hasResponseStatus()) {
|
||||
dispatcher.cancel();
|
||||
responseObserver.onError(Status.fromCodeValue(request.getResponseStatus().getCode())
|
||||
.withDescription(request.getResponseStatus().getMessage())
|
||||
.asRuntimeException());
|
||||
return;
|
||||
}
|
||||
dispatcher.enqueue(toChunkQueue(request));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCompleted() {
|
||||
if (!dispatcher.isCancelled()) {
|
||||
// Tell the dispatcher that all input has been received.
|
||||
dispatcher.completeInput();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable cause) {
|
||||
|
|
@ -239,6 +258,7 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase {
|
|||
private final Queue<Chunk> chunks;
|
||||
private final StreamObserver<StreamingOutputCallResponse> responseStream;
|
||||
private boolean scheduled;
|
||||
@GuardedBy("this") private boolean cancelled;
|
||||
private Throwable failure;
|
||||
private Runnable dispatchTask = new Runnable() {
|
||||
@Override
|
||||
|
|
@ -295,11 +315,27 @@ public class TestServiceImpl extends TestServiceGrpc.TestServiceImplBase {
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Allows the service to cancel the remaining responses.
|
||||
*/
|
||||
public synchronized void cancel() {
|
||||
Preconditions.checkState(!cancelled, "Dispatcher already cancelled");
|
||||
chunks.clear();
|
||||
cancelled = true;
|
||||
}
|
||||
|
||||
public synchronized boolean isCancelled() {
|
||||
return cancelled;
|
||||
}
|
||||
|
||||
/**
|
||||
* Dispatches the current response chunk to the client. This is only called by the executor. At
|
||||
* any time, a given dispatch task should only be registered with the executor once.
|
||||
*/
|
||||
private synchronized void dispatchChunk() {
|
||||
if (cancelled) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
// Pop off the next chunk and send it to the client.
|
||||
Chunk chunk = chunks.remove();
|
||||
|
|
|
|||
|
|
@ -58,8 +58,8 @@ public class TestCasesTest {
|
|||
String[] testCases = {"empty_unary", "large_unary", "client_streaming", "server_streaming",
|
||||
"ping_pong", "empty_stream", "compute_engine_creds", "service_account_creds",
|
||||
"jwt_token_creds", "oauth2_auth_token", "per_rpc_creds", "custom_metadata",
|
||||
"unimplemented_method", "unimplemented_service", "cancel_after_begin",
|
||||
"cancel_after_first_response", "timeout_on_sleeping_server"};
|
||||
"status_code_and_message", "unimplemented_method", "unimplemented_service",
|
||||
"cancel_after_begin", "cancel_after_first_response", "timeout_on_sleeping_server"};
|
||||
|
||||
assertEquals(testCases.length, TestCases.values().length);
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue