diff --git a/stub/src/main/java/io/grpc/stub/ClientCalls.java b/stub/src/main/java/io/grpc/stub/ClientCalls.java index d8ee7a9eb1..12adecbd84 100644 --- a/stub/src/main/java/io/grpc/stub/ClientCalls.java +++ b/stub/src/main/java/io/grpc/stub/ClientCalls.java @@ -17,6 +17,7 @@ package io.grpc.stub; import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; @@ -321,6 +322,8 @@ public final class ClientCalls { private final ClientCall call; private Runnable onReadyHandler; private boolean autoFlowControlEnabled = true; + private boolean aborted = false; + private boolean completed = false; // Non private to avoid synthetic class CallToStreamObserverAdapter(ClientCall call) { @@ -333,17 +336,21 @@ public final class ClientCalls { @Override public void onNext(T value) { + checkState(!aborted, "Stream was terminated by error, no further calls are allowed"); + checkState(!completed, "Stream is already completed, no further calls are allowed"); call.sendMessage(value); } @Override public void onError(Throwable t) { call.cancel("Cancelled by client with StreamObserver.onError()", t); + aborted = true; } @Override public void onCompleted() { call.halfClose(); + completed = true; } @Override diff --git a/stub/src/main/java/io/grpc/stub/ServerCalls.java b/stub/src/main/java/io/grpc/stub/ServerCalls.java index 14a3c3c22e..00e9f9cad1 100644 --- a/stub/src/main/java/io/grpc/stub/ServerCalls.java +++ b/stub/src/main/java/io/grpc/stub/ServerCalls.java @@ -310,6 +310,8 @@ public final class ServerCalls { private boolean sentHeaders; private Runnable onReadyHandler; private Runnable onCancelHandler; + private boolean aborted = false; + private boolean completed = false; // Non private to avoid synthetic class ServerCallStreamObserverImpl(ServerCall call) { @@ -338,6 +340,8 @@ public final class ServerCalls { } return; } + checkState(!aborted, "Stream was terminated by error, no further calls are allowed"); + checkState(!completed, "Stream is already completed, no further calls are allowed"); if (!sentHeaders) { call.sendHeaders(new Metadata()); sentHeaders = true; @@ -352,6 +356,7 @@ public final class ServerCalls { metadata = new Metadata(); } call.close(Status.fromThrowable(t), metadata); + aborted = true; } @Override @@ -362,6 +367,7 @@ public final class ServerCalls { } } else { call.close(Status.OK, new Metadata()); + completed = true; } }