From fe46edacea7436e99fef15e5bb4b7850535999a9 Mon Sep 17 00:00:00 2001 From: Ran Date: Thu, 17 Oct 2019 14:49:53 -0700 Subject: [PATCH] stub: ignore unary response msg if status is not OK (#6288) * stub: ignore unary response msg if status is not OK * stub: throw if no msg with OK status * address the comment, improve tests. * fix error message * fix error message * improve naming of the tests * call onCompleted on success unary flow * fix test * handle errors for delayed unary message sending * clean up the onCompleted/onError logic * use hasMessageThat to produce better error message when fail --- .../main/java/io/grpc/stub/ClientCalls.java | 29 ++++- .../java/io/grpc/stub/ClientCallsTest.java | 117 ++++++++++++++++++ 2 files changed, 144 insertions(+), 2 deletions(-) diff --git a/stub/src/main/java/io/grpc/stub/ClientCalls.java b/stub/src/main/java/io/grpc/stub/ClientCalls.java index 4c78b1b652..82793cd3d8 100644 --- a/stub/src/main/java/io/grpc/stub/ClientCalls.java +++ b/stub/src/main/java/io/grpc/stub/ClientCalls.java @@ -401,6 +401,7 @@ public final class ClientCalls { private final CallToStreamObserverAdapter adapter; private final boolean streamingResponse; private boolean firstResponseReceived; + private RespT unaryMessage; // Non private to avoid synthetic class StreamObserverToCallListenerAdapter( @@ -431,7 +432,13 @@ public final class ClientCalls { .asRuntimeException(); } firstResponseReceived = true; - observer.onNext(message); + + if (streamingResponse) { + observer.onNext(message); + } else { + // will send message in onClose() for unary calls. + unaryMessage = message; + } if (streamingResponse && adapter.autoFlowControlEnabled) { // Request delivery of the next inbound message. @@ -441,10 +448,28 @@ public final class ClientCalls { @Override public void onClose(Status status, Metadata trailers) { + Throwable error = null; if (status.isOk()) { + if (!streamingResponse) { + if (unaryMessage != null) { + try { + observer.onNext(unaryMessage); + } catch (Throwable t) { + error = t; + } + } else { + error = Status.INTERNAL.withDescription("Response message is null for unary call") + .asRuntimeException(); + } + } + } else { + error = status.asRuntimeException(trailers); + } + + if (error == null) { observer.onCompleted(); } else { - observer.onError(status.asRuntimeException(trailers)); + observer.onError(error); } } diff --git a/stub/src/test/java/io/grpc/stub/ClientCallsTest.java b/stub/src/test/java/io/grpc/stub/ClientCallsTest.java index a636487706..9e70fb1765 100644 --- a/stub/src/test/java/io/grpc/stub/ClientCallsTest.java +++ b/stub/src/test/java/io/grpc/stub/ClientCallsTest.java @@ -99,6 +99,123 @@ public class ClientCallsTest { } } + @Test + public void unaryAsyncCallStatusIsOkWithMessageSuccess() throws Exception { + Integer req = 2; + final String resp = "bar"; + final Status status = Status.OK; + final Metadata trailers = new Metadata(); + final List actualResponse = new ArrayList<>(); + final List completed = new ArrayList<>(); + + NoopClientCall call = new NoopClientCall() { + @Override + public void start(ClientCall.Listener listener, Metadata headers) { + listener.onMessage(resp); + listener.onClose(status, trailers); + } + }; + + StreamObserver responseObserver = new StreamObserver() { + @Override + public void onNext(String value) { + actualResponse.add(value); + } + + @Override + public void onError(Throwable t) { + fail("Should not reach here"); + } + + @Override + public void onCompleted() { + completed.add(true); + } + }; + + ClientCalls.asyncUnaryCall(call, req, responseObserver); + assertThat(actualResponse.size()).isEqualTo(1); + assertEquals(resp, actualResponse.get(0)); + assertThat(completed.size()).isEqualTo(1); + assertThat(completed.get(0)).isTrue(); + } + + @Test + public void unaryAsyncCallStatusIsOkWithNullMessageGetError() throws Exception { + Integer req = 2; + final Status status = Status.OK; + final Metadata trailers = new Metadata(); + final List expected = new ArrayList<>(); + + NoopClientCall call = new NoopClientCall() { + @Override + public void start(ClientCall.Listener listener, Metadata headers) { + listener.onMessage(null); + listener.onClose(status, trailers); + } + }; + + StreamObserver responseObserver = new StreamObserver() { + @Override + public void onNext(String value) { + fail("Should not reach here"); + } + + @Override + public void onError(Throwable t) { + expected.add(t); + } + + @Override + public void onCompleted() { + fail("Should not reach here"); + } + }; + + ClientCalls.asyncUnaryCall(call, req, responseObserver); + assertThat(expected.size()).isEqualTo(1); + assertThat(expected.get(0)).hasMessageThat() + .isEqualTo("INTERNAL: Response message is null for unary call"); + } + + @Test + public void unaryAsyncCallStatusIsNotOkWithMessageDoNotSendMessage() throws Exception { + Integer req = 2; + final Status status = Status.INTERNAL.withDescription("Unique status"); + final String resp = "bar"; + final Metadata trailers = new Metadata(); + final List expected = new ArrayList<>(); + + NoopClientCall call = new NoopClientCall() { + @Override + public void start(io.grpc.ClientCall.Listener listener, Metadata headers) { + listener.onMessage(resp); + listener.onClose(status, trailers); + } + }; + + StreamObserver responseObserver = new StreamObserver() { + @Override + public void onNext(String value) { + fail("Should not reach here"); + } + + @Override + public void onError(Throwable t) { + expected.add(t); + } + + @Override + public void onCompleted() { + fail("Should not reach here"); + } + }; + + ClientCalls.asyncUnaryCall(call, req, responseObserver); + assertThat(expected.size()).isEqualTo(1); + assertThat(expected.get(0)).hasMessageThat().isEqualTo("INTERNAL: Unique status"); + } + @Test public void unaryBlockingCallSuccess() throws Exception { Integer req = 2;