stub: improve error message for Client/ServerCalls (#5656)

This commit is contained in:
Jihun Cho 2019-05-01 17:13:07 -07:00 committed by GitHub
parent 97cb0554e4
commit d50c8d4ec1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 13 additions and 0 deletions

View File

@ -17,6 +17,7 @@
package io.grpc.stub; package io.grpc.stub;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.base.MoreObjects; import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
@ -321,6 +322,8 @@ public final class ClientCalls {
private final ClientCall<T, ?> call; private final ClientCall<T, ?> call;
private Runnable onReadyHandler; private Runnable onReadyHandler;
private boolean autoFlowControlEnabled = true; private boolean autoFlowControlEnabled = true;
private boolean aborted = false;
private boolean completed = false;
// Non private to avoid synthetic class // Non private to avoid synthetic class
CallToStreamObserverAdapter(ClientCall<T, ?> call) { CallToStreamObserverAdapter(ClientCall<T, ?> call) {
@ -333,17 +336,21 @@ public final class ClientCalls {
@Override @Override
public void onNext(T value) { public void onNext(T value) {
checkState(!aborted, "Stream was terminated by error, no further calls are allowed");
checkState(!completed, "Stream is already completed, no further calls are allowed");
call.sendMessage(value); call.sendMessage(value);
} }
@Override @Override
public void onError(Throwable t) { public void onError(Throwable t) {
call.cancel("Cancelled by client with StreamObserver.onError()", t); call.cancel("Cancelled by client with StreamObserver.onError()", t);
aborted = true;
} }
@Override @Override
public void onCompleted() { public void onCompleted() {
call.halfClose(); call.halfClose();
completed = true;
} }
@Override @Override

View File

@ -310,6 +310,8 @@ public final class ServerCalls {
private boolean sentHeaders; private boolean sentHeaders;
private Runnable onReadyHandler; private Runnable onReadyHandler;
private Runnable onCancelHandler; private Runnable onCancelHandler;
private boolean aborted = false;
private boolean completed = false;
// Non private to avoid synthetic class // Non private to avoid synthetic class
ServerCallStreamObserverImpl(ServerCall<ReqT, RespT> call) { ServerCallStreamObserverImpl(ServerCall<ReqT, RespT> call) {
@ -338,6 +340,8 @@ public final class ServerCalls {
} }
return; return;
} }
checkState(!aborted, "Stream was terminated by error, no further calls are allowed");
checkState(!completed, "Stream is already completed, no further calls are allowed");
if (!sentHeaders) { if (!sentHeaders) {
call.sendHeaders(new Metadata()); call.sendHeaders(new Metadata());
sentHeaders = true; sentHeaders = true;
@ -352,6 +356,7 @@ public final class ServerCalls {
metadata = new Metadata(); metadata = new Metadata();
} }
call.close(Status.fromThrowable(t), metadata); call.close(Status.fromThrowable(t), metadata);
aborted = true;
} }
@Override @Override
@ -362,6 +367,7 @@ public final class ServerCalls {
} }
} else { } else {
call.close(Status.OK, new Metadata()); call.close(Status.OK, new Metadata());
completed = true;
} }
} }