From 2ca28a2903fd98080fc5c7895fc35a77f5de5732 Mon Sep 17 00:00:00 2001 From: ZHANG Dapeng Date: Wed, 10 Mar 2021 13:41:31 -0800 Subject: [PATCH] stub: stabilize StreamObserver APIs Resolves #1788 --- .../grpc/stub/ClientCallStreamObserver.java | 57 ++++++++++++++++-- .../io/grpc/stub/ClientResponseObserver.java | 3 - .../grpc/stub/ServerCallStreamObserver.java | 58 +++++++++++++++++-- .../java/io/grpc/stub/ClientCallsTest.java | 2 +- .../java/io/grpc/stub/ServerCallsTest.java | 2 +- 5 files changed, 107 insertions(+), 15 deletions(-) diff --git a/stub/src/main/java/io/grpc/stub/ClientCallStreamObserver.java b/stub/src/main/java/io/grpc/stub/ClientCallStreamObserver.java index e18d19fc37..ea09bb99d5 100644 --- a/stub/src/main/java/io/grpc/stub/ClientCallStreamObserver.java +++ b/stub/src/main/java/io/grpc/stub/ClientCallStreamObserver.java @@ -16,8 +16,6 @@ package io.grpc.stub; -import io.grpc.ExperimentalApi; - import javax.annotation.Nullable; /** @@ -30,7 +28,6 @@ import javax.annotation.Nullable; *

DO NOT MOCK: The API is too complex to reliably mock. Use InProcessChannelBuilder to create * "real" RPCs suitable for testing and make a fake for the server-side. */ -@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1788") public abstract class ClientCallStreamObserver extends CallStreamObserver { /** * Prevent any further processing for this {@code ClientCallStreamObserver}. No further messages @@ -57,10 +54,60 @@ public abstract class ClientCallStreamObserver extends CallStreamObserver * *

This method may only be called during {@link ClientResponseObserver#beforeStart * ClientResponseObserver.beforeStart()}. - * - *

This API is still a work in-progress and may change in the future. */ public void disableAutoRequestWithInitial(int request) { throw new UnsupportedOperationException(); } + + /** + * If {@code true}, indicates that the observer is capable of sending additional messages + * without requiring excessive buffering internally. This value is just a suggestion and the + * application is free to ignore it, however doing so may result in excessive buffering within the + * observer. + * + *

If {@code false}, the runnable passed to {@link #setOnReadyHandler} will be called after + * {@code isReady()} transitions to {@code true}. + */ + @Override + public abstract boolean isReady(); + + /** + * Set a {@link Runnable} that will be executed every time the stream {@link #isReady()} state + * changes from {@code false} to {@code true}. While it is not guaranteed that the same + * thread will always be used to execute the {@link Runnable}, it is guaranteed that executions + * are serialized with calls to the 'inbound' {@link StreamObserver}. + * + *

On client-side this method may only be called during {@link + * ClientResponseObserver#beforeStart}. On server-side it may only be called during the initial + * call to the application, before the service returns its {@code StreamObserver}. + * + *

Because there is a processing delay to deliver this notification, it is possible for + * concurrent writes to cause {@code isReady() == false} within this callback. Handle "spurious" + * notifications by checking {@code isReady()}'s current value instead of assuming it is now + * {@code true}. If {@code isReady() == false} the normal expectations apply, so there would be + * another {@code onReadyHandler} callback. + * + * @param onReadyHandler to call when peer is ready to receive more messages. + */ + @Override + public abstract void setOnReadyHandler(Runnable onReadyHandler); + + /** + * Requests the peer to produce {@code count} more messages to be delivered to the 'inbound' + * {@link StreamObserver}. + * + *

This method is safe to call from multiple threads without external synchronization. + * + * @param count more messages + */ + @Override + public abstract void request(int count); + + /** + * Sets message compression for subsequent calls to {@link #onNext}. + * + * @param enable whether to enable compression. + */ + @Override + public abstract void setMessageCompression(boolean enable); } diff --git a/stub/src/main/java/io/grpc/stub/ClientResponseObserver.java b/stub/src/main/java/io/grpc/stub/ClientResponseObserver.java index 5f5510f6d3..91403b02fc 100644 --- a/stub/src/main/java/io/grpc/stub/ClientResponseObserver.java +++ b/stub/src/main/java/io/grpc/stub/ClientResponseObserver.java @@ -16,13 +16,10 @@ package io.grpc.stub; -import io.grpc.ExperimentalApi; - /** * Specialization of {@link StreamObserver} implemented by clients in order to interact with the * advanced features of a call such as flow-control. */ -@ExperimentalApi("https://github.com/grpc/grpc-java/issues/4693") public interface ClientResponseObserver extends StreamObserver { /** * Called by the runtime priot to the start of a call to provide a reference to the diff --git a/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java b/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java index 7a538b0e5d..3ba1bf563e 100644 --- a/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java +++ b/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java @@ -16,8 +16,6 @@ package io.grpc.stub; -import io.grpc.ExperimentalApi; - /** * A refinement of {@link CallStreamObserver} to allows for interaction with call * cancellation events on the server side. @@ -28,7 +26,6 @@ import io.grpc.ExperimentalApi; *

DO NOT MOCK: The API is too complex to reliably mock. Use InProcessChannelBuilder to create * "real" RPCs suitable for testing and interact with the server using a normal client stub. */ -@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1788") public abstract class ServerCallStreamObserver extends CallStreamObserver { /** @@ -89,10 +86,61 @@ public abstract class ServerCallStreamObserver extends CallStreamObserver *

  • {@link io.grpc.MethodDescriptor.MethodType#SERVER_STREAMING} operations.
  • * *

    - * - *

    This API is still a work in-progress and may change in the future. */ public void disableAutoRequest() { throw new UnsupportedOperationException(); } + + + /** + * If {@code true}, indicates that the observer is capable of sending additional messages + * without requiring excessive buffering internally. This value is just a suggestion and the + * application is free to ignore it, however doing so may result in excessive buffering within the + * observer. + * + *

    If {@code false}, the runnable passed to {@link #setOnReadyHandler} will be called after + * {@code isReady()} transitions to {@code true}. + */ + @Override + public abstract boolean isReady(); + + /** + * Set a {@link Runnable} that will be executed every time the stream {@link #isReady()} state + * changes from {@code false} to {@code true}. While it is not guaranteed that the same + * thread will always be used to execute the {@link Runnable}, it is guaranteed that executions + * are serialized with calls to the 'inbound' {@link StreamObserver}. + * + *

    On client-side this method may only be called during {@link + * ClientResponseObserver#beforeStart}. On server-side it may only be called during the initial + * call to the application, before the service returns its {@code StreamObserver}. + * + *

    Because there is a processing delay to deliver this notification, it is possible for + * concurrent writes to cause {@code isReady() == false} within this callback. Handle "spurious" + * notifications by checking {@code isReady()}'s current value instead of assuming it is now + * {@code true}. If {@code isReady() == false} the normal expectations apply, so there would be + * another {@code onReadyHandler} callback. + * + * @param onReadyHandler to call when peer is ready to receive more messages. + */ + @Override + public abstract void setOnReadyHandler(Runnable onReadyHandler); + + /** + * Requests the peer to produce {@code count} more messages to be delivered to the 'inbound' + * {@link StreamObserver}. + * + *

    This method is safe to call from multiple threads without external synchronization. + * + * @param count more messages + */ + @Override + public abstract void request(int count); + + /** + * Sets message compression for subsequent calls to {@link #onNext}. + * + * @param enable whether to enable compression. + */ + @Override + public abstract void setMessageCompression(boolean enable); } diff --git a/stub/src/test/java/io/grpc/stub/ClientCallsTest.java b/stub/src/test/java/io/grpc/stub/ClientCallsTest.java index 6b54ba1f62..c394fc09de 100644 --- a/stub/src/test/java/io/grpc/stub/ClientCallsTest.java +++ b/stub/src/test/java/io/grpc/stub/ClientCallsTest.java @@ -390,7 +390,7 @@ public class ClientCallsTest { ClientCalls.asyncBidiStreamingCall(call, new ClientResponseObserver() { @Override public void beforeStart(ClientCallStreamObserver requestStream) { - requestStream.disableAutoInboundFlowControl(); + requestStream.disableAutoRequestWithInitial(1); } @Override diff --git a/stub/src/test/java/io/grpc/stub/ServerCallsTest.java b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java index f9ceb82166..a2a1ef9396 100644 --- a/stub/src/test/java/io/grpc/stub/ServerCallsTest.java +++ b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java @@ -288,7 +288,7 @@ public class ServerCallsTest { public StreamObserver invoke(StreamObserver responseObserver) { ServerCallStreamObserver serverCallObserver = (ServerCallStreamObserver) responseObserver; - serverCallObserver.disableAutoInboundFlowControl(); + serverCallObserver.disableAutoRequest(); return new ServerCalls.NoopStreamObserver<>(); } });