mirror of https://github.com/grpc/grpc-java.git
stub: ignore unary response msg if status is not OK (#6288)
* stub: ignore unary response msg if status is not OK * stub: throw if no msg with OK status * address the comment, improve tests. * fix error message * fix error message * improve naming of the tests * call onCompleted on success unary flow * fix test * handle errors for delayed unary message sending * clean up the onCompleted/onError logic * use hasMessageThat to produce better error message when fail
This commit is contained in:
parent
45d49a56cc
commit
fe46edacea
|
|
@ -401,6 +401,7 @@ public final class ClientCalls {
|
||||||
private final CallToStreamObserverAdapter<ReqT> adapter;
|
private final CallToStreamObserverAdapter<ReqT> adapter;
|
||||||
private final boolean streamingResponse;
|
private final boolean streamingResponse;
|
||||||
private boolean firstResponseReceived;
|
private boolean firstResponseReceived;
|
||||||
|
private RespT unaryMessage;
|
||||||
|
|
||||||
// Non private to avoid synthetic class
|
// Non private to avoid synthetic class
|
||||||
StreamObserverToCallListenerAdapter(
|
StreamObserverToCallListenerAdapter(
|
||||||
|
|
@ -431,7 +432,13 @@ public final class ClientCalls {
|
||||||
.asRuntimeException();
|
.asRuntimeException();
|
||||||
}
|
}
|
||||||
firstResponseReceived = true;
|
firstResponseReceived = true;
|
||||||
observer.onNext(message);
|
|
||||||
|
if (streamingResponse) {
|
||||||
|
observer.onNext(message);
|
||||||
|
} else {
|
||||||
|
// will send message in onClose() for unary calls.
|
||||||
|
unaryMessage = message;
|
||||||
|
}
|
||||||
|
|
||||||
if (streamingResponse && adapter.autoFlowControlEnabled) {
|
if (streamingResponse && adapter.autoFlowControlEnabled) {
|
||||||
// Request delivery of the next inbound message.
|
// Request delivery of the next inbound message.
|
||||||
|
|
@ -441,10 +448,28 @@ public final class ClientCalls {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onClose(Status status, Metadata trailers) {
|
public void onClose(Status status, Metadata trailers) {
|
||||||
|
Throwable error = null;
|
||||||
if (status.isOk()) {
|
if (status.isOk()) {
|
||||||
|
if (!streamingResponse) {
|
||||||
|
if (unaryMessage != null) {
|
||||||
|
try {
|
||||||
|
observer.onNext(unaryMessage);
|
||||||
|
} catch (Throwable t) {
|
||||||
|
error = t;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
error = Status.INTERNAL.withDescription("Response message is null for unary call")
|
||||||
|
.asRuntimeException();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
error = status.asRuntimeException(trailers);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (error == null) {
|
||||||
observer.onCompleted();
|
observer.onCompleted();
|
||||||
} else {
|
} else {
|
||||||
observer.onError(status.asRuntimeException(trailers));
|
observer.onError(error);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -99,6 +99,123 @@ public class ClientCallsTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void unaryAsyncCallStatusIsOkWithMessageSuccess() throws Exception {
|
||||||
|
Integer req = 2;
|
||||||
|
final String resp = "bar";
|
||||||
|
final Status status = Status.OK;
|
||||||
|
final Metadata trailers = new Metadata();
|
||||||
|
final List<String> actualResponse = new ArrayList<>();
|
||||||
|
final List<Boolean> completed = new ArrayList<>();
|
||||||
|
|
||||||
|
NoopClientCall<Integer, String> call = new NoopClientCall<Integer, String>() {
|
||||||
|
@Override
|
||||||
|
public void start(ClientCall.Listener<String> listener, Metadata headers) {
|
||||||
|
listener.onMessage(resp);
|
||||||
|
listener.onClose(status, trailers);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
StreamObserver<String> responseObserver = new StreamObserver<String>() {
|
||||||
|
@Override
|
||||||
|
public void onNext(String value) {
|
||||||
|
actualResponse.add(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onError(Throwable t) {
|
||||||
|
fail("Should not reach here");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onCompleted() {
|
||||||
|
completed.add(true);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
ClientCalls.asyncUnaryCall(call, req, responseObserver);
|
||||||
|
assertThat(actualResponse.size()).isEqualTo(1);
|
||||||
|
assertEquals(resp, actualResponse.get(0));
|
||||||
|
assertThat(completed.size()).isEqualTo(1);
|
||||||
|
assertThat(completed.get(0)).isTrue();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void unaryAsyncCallStatusIsOkWithNullMessageGetError() throws Exception {
|
||||||
|
Integer req = 2;
|
||||||
|
final Status status = Status.OK;
|
||||||
|
final Metadata trailers = new Metadata();
|
||||||
|
final List<Throwable> expected = new ArrayList<>();
|
||||||
|
|
||||||
|
NoopClientCall<Integer, String> call = new NoopClientCall<Integer, String>() {
|
||||||
|
@Override
|
||||||
|
public void start(ClientCall.Listener<String> listener, Metadata headers) {
|
||||||
|
listener.onMessage(null);
|
||||||
|
listener.onClose(status, trailers);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
StreamObserver<String> responseObserver = new StreamObserver<String>() {
|
||||||
|
@Override
|
||||||
|
public void onNext(String value) {
|
||||||
|
fail("Should not reach here");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onError(Throwable t) {
|
||||||
|
expected.add(t);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onCompleted() {
|
||||||
|
fail("Should not reach here");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
ClientCalls.asyncUnaryCall(call, req, responseObserver);
|
||||||
|
assertThat(expected.size()).isEqualTo(1);
|
||||||
|
assertThat(expected.get(0)).hasMessageThat()
|
||||||
|
.isEqualTo("INTERNAL: Response message is null for unary call");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void unaryAsyncCallStatusIsNotOkWithMessageDoNotSendMessage() throws Exception {
|
||||||
|
Integer req = 2;
|
||||||
|
final Status status = Status.INTERNAL.withDescription("Unique status");
|
||||||
|
final String resp = "bar";
|
||||||
|
final Metadata trailers = new Metadata();
|
||||||
|
final List<Throwable> expected = new ArrayList<>();
|
||||||
|
|
||||||
|
NoopClientCall<Integer, String> call = new NoopClientCall<Integer, String>() {
|
||||||
|
@Override
|
||||||
|
public void start(io.grpc.ClientCall.Listener<String> listener, Metadata headers) {
|
||||||
|
listener.onMessage(resp);
|
||||||
|
listener.onClose(status, trailers);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
StreamObserver<String> responseObserver = new StreamObserver<String>() {
|
||||||
|
@Override
|
||||||
|
public void onNext(String value) {
|
||||||
|
fail("Should not reach here");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onError(Throwable t) {
|
||||||
|
expected.add(t);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onCompleted() {
|
||||||
|
fail("Should not reach here");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
ClientCalls.asyncUnaryCall(call, req, responseObserver);
|
||||||
|
assertThat(expected.size()).isEqualTo(1);
|
||||||
|
assertThat(expected.get(0)).hasMessageThat().isEqualTo("INTERNAL: Unique status");
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void unaryBlockingCallSuccess() throws Exception {
|
public void unaryBlockingCallSuccess() throws Exception {
|
||||||
Integer req = 2;
|
Integer req = 2;
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue