examples: Add cancellation example

It uses the echo service for both unary and bidi RPCs, to show the
various cancellation circumstances and APIs.
This commit is contained in:
Eric Anderson 2023-03-22 18:11:32 -07:00 committed by GitHub
parent 6b7cb9e4a4
commit 39c9ebf180
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 492 additions and 0 deletions

View File

@ -51,6 +51,22 @@ java_grpc_library(
deps = [":route_guide_java_proto"],
)
proto_library(
name = "echo_proto",
srcs = ["src/main/proto/grpc/examples/echo/echo.proto"],
)
java_proto_library(
name = "echo_java_proto",
deps = [":echo_proto"],
)
java_grpc_library(
name = "echo_java_grpc",
srcs = [":echo_proto"],
deps = [":echo_java_proto"],
)
java_library(
name = "examples",
testonly = 1,
@ -64,6 +80,8 @@ java_library(
"@io_grpc_grpc_java//netty",
],
deps = [
":echo_java_grpc",
":echo_java_proto",
":hello_streaming_java_grpc",
":hello_streaming_java_proto",
":helloworld_java_grpc",

View File

@ -202,6 +202,20 @@ task keepAliveClient(type: CreateStartScripts) {
classpath = startScripts.classpath
}
task cancellationClient(type: CreateStartScripts) {
mainClass = 'io.grpc.examples.cancellation.CancellationClient'
applicationName = 'cancellation-client'
outputDir = new File(project.buildDir, 'tmp/scripts/' + name)
classpath = startScripts.classpath
}
task cancellationServer(type: CreateStartScripts) {
mainClass = 'io.grpc.examples.cancellation.CancellationServer'
applicationName = 'cancellation-server'
outputDir = new File(project.buildDir, 'tmp/scripts/' + name)
classpath = startScripts.classpath
}
applicationDistribution.into('bin') {
from(routeGuideServer)
from(routeGuideClient)
@ -223,5 +237,7 @@ applicationDistribution.into('bin') {
from(deadlineClient)
from(keepAliveServer)
from(keepAliveClient)
from(cancellationClient)
from(cancellationServer)
fileMode = 0755
}

View File

@ -0,0 +1,204 @@
/*
* Copyright 2023 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.examples.cancellation;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.Channel;
import io.grpc.Context;
import io.grpc.Context.CancellableContext;
import io.grpc.Grpc;
import io.grpc.InsecureChannelCredentials;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.examples.echo.EchoGrpc;
import io.grpc.examples.echo.EchoRequest;
import io.grpc.examples.echo.EchoResponse;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.TimeUnit;
/**
* A client that cancels RPCs to an Echo server.
*/
public class CancellationClient {
private final Channel channel;
public CancellationClient(Channel channel) {
this.channel = channel;
}
private void demonstrateCancellation() throws Exception {
echoBlocking("I'M A BLOCKING CLIENT! HEAR ME ROAR!");
// io.grpc.Context can be used to cancel RPCs using any of the stubs. It is the only way to
// cancel blocking stub RPCs. io.grpc.Context is a general-purpose alternative to thread
// interruption and can be used outside of gRPC, like to coordinate within your application.
//
// CancellableContext must always be cancelled or closed at the end of its lifetime, otherwise
// it could "leak" memory.
try (CancellableContext context = Context.current().withCancellation()) {
new Thread(() -> {
try {
Thread.sleep(500); // Do some work
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
// Cancellation reasons are never sent to the server. But they are echoed back to the
// client as the RPC failure reason.
context.cancel(new RuntimeException("Oops. Messed that up, let me try again"));
}).start();
// context.run() attaches the context to this thread for gRPC to observe. It also restores
// the previous context before returning.
context.run(() -> echoBlocking("RAAWRR haha lol hehe AWWRR GRRR"));
}
// Futures cancelled with interruption cancel the RPC.
ListenableFuture<EchoResponse> future = echoFuture("Future clie*cough*nt was here!");
Thread.sleep(500); // Do some work
// We realize we really don't want to hear that echo.
future.cancel(true);
Thread.sleep(100); // Make logs more obvious. Cancel is async
ClientCallStreamObserver<EchoRequest> reqCallObserver = echoAsync("Testing, testing, 1, 2, 3");
reqCallObserver.onCompleted();
Thread.sleep(500); // Make logs more obvious. Wait for completion
// Async's onError() will cancel. But the method can't be called concurrently with other calls
// on the StreamObserver. If you need thread-safety, use CancellableContext as above.
StreamObserver<EchoRequest> reqObserver = echoAsync("... async client... is the... best...");
try {
Thread.sleep(500); // Do some work
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
// Since reqObserver.onCompleted() hasn't been called, we can use onError().
reqObserver.onError(new RuntimeException("That was weak..."));
Thread.sleep(100); // Make logs more obvious. Cancel is async
// Async's cancel() will cancel. Also may not be called concurrently with other calls on the
// StreamObserver.
reqCallObserver = echoAsync("Async client or bust!");
reqCallObserver.onCompleted();
try {
Thread.sleep(250); // Do some work
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
// Since onCompleted() has been called, we can't use onError(). It is safe to use cancel()
// regardless of onCompleted() being called.
reqCallObserver.cancel("That's enough. I'm bored", null);
Thread.sleep(100); // Make logs more obvious. Cancel is async
}
/** Say hello to server, just like in helloworld example. */
public void echoBlocking(String text) {
System.out.println("\nYelling: " + text);
EchoRequest request = EchoRequest.newBuilder().setMessage(text).build();
EchoResponse response;
try {
response = EchoGrpc.newBlockingStub(channel).unaryEcho(request);
} catch (StatusRuntimeException e) {
System.out.println("RPC failed: " + e.getStatus());
return;
}
System.out.println("Echo: " + response.getMessage());
}
/** Say hello to the server, but using future API. */
public ListenableFuture<EchoResponse> echoFuture(String text) {
System.out.println("\nYelling: " + text);
EchoRequest request = EchoRequest.newBuilder().setMessage(text).build();
ListenableFuture<EchoResponse> future = EchoGrpc.newFutureStub(channel).unaryEcho(request);
Futures.addCallback(future, new FutureCallback<EchoResponse>() {
@Override
public void onSuccess(EchoResponse response) {
System.out.println("Echo: " + response.getMessage());
}
@Override
public void onFailure(Throwable t) {
System.out.println("RPC failed: " + Status.fromThrowable(t));
}
}, MoreExecutors.directExecutor());
return future;
}
/** Say hello to the server, but using async API and cancelling. */
public ClientCallStreamObserver<EchoRequest> echoAsync(String text) {
System.out.println("\nYelling: " + text);
EchoRequest request = EchoRequest.newBuilder().setMessage(text).build();
// Client-streaming and bidirectional RPCs can cast the returned StreamObserver to
// ClientCallStreamObserver.
//
// Unary and server-streaming stub methods don't return a StreamObserver. For such RPCs, you can
// use ClientResponseObserver to get the ClientCallStreamObserver. For example:
// EchoGrpc.newStub(channel).unaryEcho(new ClientResponseObserver<EchoResponse>() {...});
// Since ClientCallStreamObserver.cancel() is not thread-safe, it isn't safe to call from
// another thread until the RPC stub method (e.g., unaryEcho()) returns.
ClientCallStreamObserver<EchoRequest> reqObserver = (ClientCallStreamObserver<EchoRequest>)
EchoGrpc.newStub(channel).bidirectionalStreamingEcho(new StreamObserver<EchoResponse>() {
@Override
public void onNext(EchoResponse response) {
System.out.println("Echo: " + response.getMessage());
}
@Override
public void onCompleted() {
System.out.println("RPC completed");
}
@Override
public void onError(Throwable t) {
System.out.println("RPC failed: " + Status.fromThrowable(t));
}
});
reqObserver.onNext(request);
return reqObserver;
}
/**
* Cancel RPCs to a server. If provided, the first element of {@code args} is the target server.
*/
public static void main(String[] args) throws Exception {
String target = "localhost:50051";
if (args.length > 0) {
if ("--help".equals(args[0])) {
System.err.println("Usage: [target]");
System.err.println("");
System.err.println(" target The server to connect to. Defaults to " + target);
System.exit(1);
}
target = args[0];
}
ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create())
.build();
try {
CancellationClient client = new CancellationClient(channel);
client.demonstrateCancellation();
} finally {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
}

View File

@ -0,0 +1,206 @@
/*
* Copyright 2023 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.examples.cancellation;
import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.Context;
import io.grpc.Grpc;
import io.grpc.InsecureServerCredentials;
import io.grpc.Server;
import io.grpc.Status;
import io.grpc.examples.echo.EchoGrpc;
import io.grpc.examples.echo.EchoRequest;
import io.grpc.examples.echo.EchoResponse;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
/**
* Server that manages startup/shutdown of a {@code Greeter} server.
*
* <p>Any abort of an ongoing RPC is considered "cancellation" of that RPC. The common causes of
* cancellation are the client explicitly cancelling, the deadline expires, and I/O failures. The
* service is not informed the reason for the cancellation.
*
* <p>There are two APIs for services to be notified of RPC cancellation: io.grpc.Context and
* ServerCallStreamObserver. Context listeners are called on a different thread, so need to be
* thread-safe. The ServerCallStreamObserver cancellation callback is called like other
* StreamObserver callbacks, so the application may not need thread-safe handling. Both APIs have
* thread-safe isCancelled() polling methods.
*/
public class CancellationServer {
public static void main(String[] args) throws IOException, InterruptedException {
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
int port = 50051;
Server server = Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create())
.addService(new SlowEcho(scheduler))
.build()
.start();
System.out.println("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
try {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
}
});
server.awaitTermination();
scheduler.shutdown();
if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
scheduler.shutdownNow();
}
}
static class SlowEcho extends EchoGrpc.EchoImplBase {
private final ScheduledExecutorService scheduler;
/** {@code scheduler} must be single-threaded. */
public SlowEcho(ScheduledExecutorService scheduler) {
this.scheduler = scheduler;
}
/**
* Repeatedly echos each request until the client has no more requests. It performs all work
* asynchronously on a single thread. It uses ServerCallStreamObserver to be notified of RPC
* cancellation.
*/
@Override
public StreamObserver<EchoRequest> bidirectionalStreamingEcho(
StreamObserver<EchoResponse> responseObserver) {
// If the service is truly asynchronous, using ServerCallStreamObserver to receive
// cancellation notifications tends to work well.
// It is safe to cast the provided observer to ServerCallStreamObserver.
ServerCallStreamObserver<EchoResponse> responseCallObserver =
(ServerCallStreamObserver<EchoResponse>) responseObserver;
System.out.println("\nBidi RPC started");
class EchoObserver implements StreamObserver<EchoRequest> {
private static final int delayMs = 200;
private final List<Future<?>> echos = new ArrayList<>();
@Override
public void onNext(EchoRequest request) {
System.out.println("Bidi RPC received request: " + request.getMessage());
EchoResponse response
= EchoResponse.newBuilder().setMessage(request.getMessage()).build();
Runnable echo = () -> responseObserver.onNext(response);
echos.add(scheduler.scheduleAtFixedRate(echo, delayMs, delayMs, TimeUnit.MILLISECONDS));
}
@Override
public void onCompleted() {
System.out.println("Bidi RPC client finished");
// Let each echo happen two more times, and then stop.
List<Future<?>> echosCopy = new ArrayList<>(echos);
Runnable complete = () -> {
stopEchos(echosCopy);
responseObserver.onCompleted();
System.out.println("Bidi RPC completed");
};
echos.add(scheduler.schedule(complete, 2*delayMs, TimeUnit.MILLISECONDS));
}
@Override
public void onError(Throwable t) {
System.out.println("Bidi RPC failed: " + Status.fromThrowable(t));
stopEchos(echos);
scheduler.execute(() -> responseObserver.onError(t));
}
public void onCancel() {
// If onCompleted() hasn't been called by this point, then this method and onError are
// both called. If onCompleted() has been called, then just this method is called.
System.out.println("Bidi RPC cancelled");
stopEchos(echos);
}
private void stopEchos(List<Future<?>> echos) {
for (Future<?> echo : echos) {
echo.cancel(false);
}
}
}
EchoObserver requestObserver = new EchoObserver();
// onCancel() can be called even after the service completes or fails the RPC, because
// callbacks are racy and the response still has to be sent to the client. Use
// setOnCloseHandler() to be notified when the RPC completed without cancellation (as best as
// the server is able to tell).
responseCallObserver.setOnCancelHandler(requestObserver::onCancel);
return requestObserver;
}
/**
* Echos the request after a delay. It processes the request in-line within the callback. It
* uses Context to be notified of RPC cancellation.
*/
@Override
public void unaryEcho(EchoRequest request, StreamObserver<EchoResponse> responseObserver) {
// ServerCallStreamObserver.setOnCancelHandler(Runnable) is not useful for this method, since
// this method only returns once it has a result. ServerCallStreamObserver guarantees the
// Runnable is not run at the same time as other RPC callback methods (including this method),
// so the cancellation notification would be guaranteed to occur too late.
System.out.println("\nUnary RPC started: " + request.getMessage());
Context currentContext = Context.current();
// Let's start a multi-part operation. We can check cancellation periodically.
for (int i = 0; i < 10; i++) {
// ServerCallStreamObserver.isCancelled() returns true only if the RPC is cancelled.
// Context.isCancelled() is similar, but also returns true when the RPC completes normally.
// It doesn't matter which API is used here.
if (currentContext.isCancelled()) {
System.out.println("Unary RPC cancelled");
responseObserver.onError(
Status.CANCELLED.withDescription("RPC cancelled").asRuntimeException());
return;
}
FutureTask<Void> task = new FutureTask<>(() -> {
Thread.sleep(100); // Do some work
return null;
});
// Some Java blocking APIs have a method to cancel an ongoing operation, like closing an
// InputStream or interrupting the thread. We can use a Context listener to call that API
// from another thread if the RPC is cancelled.
Context.CancellationListener listener = (Context context) -> task.cancel(true);
Context.current().addListener(listener, MoreExecutors.directExecutor());
task.run(); // A cancellable operation
Context.current().removeListener(listener);
// gRPC stubs observe io.grpc.Context cancellation, so cancellation is automatically
// propagated when performing an RPC. You can use a different Context or use Context.fork()
// to disable the automatic propagation. For example,
// Context.ROOT.call(() -> futureStub.unaryEcho(request));
// context.fork().call(() -> futureStub.unaryEcho(request));
}
responseObserver.onNext(
EchoResponse.newBuilder().setMessage(request.getMessage()).build());
responseObserver.onCompleted();
System.out.println("Unary RPC completed");
}
}
}

View File

@ -0,0 +1,48 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
syntax = "proto3";
option go_package = "google.golang.org/grpc/examples/features/proto/echo";
option java_multiple_files = true;
option java_package = "io.grpc.examples.echo";
option java_outer_classname = "EchoProto";
package grpc.examples.echo;
// EchoRequest is the request for echo.
message EchoRequest {
string message = 1;
}
// EchoResponse is the response for echo.
message EchoResponse {
string message = 1;
}
// Echo is the echo service.
service Echo {
// UnaryEcho is unary echo.
rpc UnaryEcho(EchoRequest) returns (EchoResponse) {}
// ServerStreamingEcho is server side streaming.
rpc ServerStreamingEcho(EchoRequest) returns (stream EchoResponse) {}
// ClientStreamingEcho is client side streaming.
rpc ClientStreamingEcho(stream EchoRequest) returns (EchoResponse) {}
// BidirectionalStreamingEcho is bidi streaming.
rpc BidirectionalStreamingEcho(stream EchoRequest) returns (stream EchoResponse) {}
}