From d50c8d4ec1fb05a55c8a166c804dc8c37559c23e Mon Sep 17 00:00:00 2001 From: Jihun Cho Date: Wed, 1 May 2019 17:13:07 -0700 Subject: [PATCH] stub: improve error message for Client/ServerCalls (#5656) --- stub/src/main/java/io/grpc/stub/ClientCalls.java | 7 +++++++ stub/src/main/java/io/grpc/stub/ServerCalls.java | 6 ++++++ 2 files changed, 13 insertions(+) diff --git a/stub/src/main/java/io/grpc/stub/ClientCalls.java b/stub/src/main/java/io/grpc/stub/ClientCalls.java index d8ee7a9eb1..12adecbd84 100644 --- a/stub/src/main/java/io/grpc/stub/ClientCalls.java +++ b/stub/src/main/java/io/grpc/stub/ClientCalls.java @@ -17,6 +17,7 @@ package io.grpc.stub; import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; @@ -321,6 +322,8 @@ public final class ClientCalls { private final ClientCall call; private Runnable onReadyHandler; private boolean autoFlowControlEnabled = true; + private boolean aborted = false; + private boolean completed = false; // Non private to avoid synthetic class CallToStreamObserverAdapter(ClientCall call) { @@ -333,17 +336,21 @@ public final class ClientCalls { @Override public void onNext(T value) { + checkState(!aborted, "Stream was terminated by error, no further calls are allowed"); + checkState(!completed, "Stream is already completed, no further calls are allowed"); call.sendMessage(value); } @Override public void onError(Throwable t) { call.cancel("Cancelled by client with StreamObserver.onError()", t); + aborted = true; } @Override public void onCompleted() { call.halfClose(); + completed = true; } @Override diff --git a/stub/src/main/java/io/grpc/stub/ServerCalls.java b/stub/src/main/java/io/grpc/stub/ServerCalls.java index 14a3c3c22e..00e9f9cad1 100644 --- a/stub/src/main/java/io/grpc/stub/ServerCalls.java +++ b/stub/src/main/java/io/grpc/stub/ServerCalls.java @@ -310,6 +310,8 @@ public final class ServerCalls { private boolean sentHeaders; private Runnable onReadyHandler; private Runnable onCancelHandler; + private boolean aborted = false; + private boolean completed = false; // Non private to avoid synthetic class ServerCallStreamObserverImpl(ServerCall call) { @@ -338,6 +340,8 @@ public final class ServerCalls { } return; } + checkState(!aborted, "Stream was terminated by error, no further calls are allowed"); + checkState(!completed, "Stream is already completed, no further calls are allowed"); if (!sentHeaders) { call.sendHeaders(new Metadata()); sentHeaders = true; @@ -352,6 +356,7 @@ public final class ServerCalls { metadata = new Metadata(); } call.close(Status.fromThrowable(t), metadata); + aborted = true; } @Override @@ -362,6 +367,7 @@ public final class ServerCalls { } } else { call.close(Status.OK, new Metadata()); + completed = true; } }