mirror of https://github.com/grpc/grpc-java.git
examples: Adds client/server retrying example via service config
This commit is contained in:
parent
1a42c8aea1
commit
a543174830
|
|
@ -87,6 +87,36 @@ before trying out the examples.
|
||||||
|
|
||||||
</details>
|
</details>
|
||||||
|
|
||||||
|
- <details>
|
||||||
|
<summary>Retrying</summary>
|
||||||
|
|
||||||
|
The [retrying example](src/main/java/io/grpc/examples/retrying) provides a HelloWorld GRPC client &
|
||||||
|
server which demos the effect of client retry policy configured on the [ManagedChannel](
|
||||||
|
../api/src/main/java/io/grpc/ManagedChannel.java) via [GRPC ServiceConfig](
|
||||||
|
https://github.com/grpc/grpc/blob/master/doc/service_config.md). Retry policy implementation &
|
||||||
|
configuration details are outlined in the [proposal](https://github.com/grpc/proposal/blob/master/A6-client-retries.md).
|
||||||
|
|
||||||
|
This retrying example is very similar to the [hedging example](src/main/java/io/grpc/examples/hedging) in its setup.
|
||||||
|
The [RetryingHelloWorldServer](src/main/java/io/grpc/examples/retrying/RetryingHelloWorldServer.java) responds with
|
||||||
|
a status UNAVAILABLE error response to a specified percentage of requests to simulate server resource exhaustion and
|
||||||
|
general flakiness. The [RetryingHelloWorldClient](src/main/java/io/grpc/examples/retrying/RetryingHelloWorldClient.java) makes
|
||||||
|
a number of sequential requests to the server, several of which will be retried depending on the configured policy in
|
||||||
|
[retrying_service_config.json](src/main/resources/io/grpc/examples/retrying/retrying_service_config.json). Although
|
||||||
|
the requests are blocking unary calls for simplicity, these could easily be changed to future unary calls in order to
|
||||||
|
test the result of request concurrency with retry policy enabled.
|
||||||
|
|
||||||
|
One can experiment with the [RetryingHelloWorldServer](src/main/java/io/grpc/examples/retrying/RetryingHelloWorldServer.java)
|
||||||
|
failure conditions to simulate server throttling, as well as alter policy values in the [retrying_service_config.json](
|
||||||
|
src/main/resources/io/grpc/examples/retrying/retrying_service_config.json) to see their effects. To disable retrying
|
||||||
|
entirely, set environment variable `DISABLE_RETRYING_IN_RETRYING_EXAMPLE=true` before running the client.
|
||||||
|
Disabling the retry policy should produce many more failed GRPC calls as seen in the output log.
|
||||||
|
|
||||||
|
See [the section below](#to-build-the-examples) for how to build and run the example. The
|
||||||
|
executables for the server and the client are `retrying-hello-world-server` and
|
||||||
|
`retrying-hello-world-client`.
|
||||||
|
|
||||||
|
</details>
|
||||||
|
|
||||||
### <a name="to-build-the-examples"></a> To build the examples
|
### <a name="to-build-the-examples"></a> To build the examples
|
||||||
|
|
||||||
1. **[Install gRPC Java library SNAPSHOT locally, including code generation plugin](../COMPILING.md) (Only need this step for non-released versions, e.g. master HEAD).**
|
1. **[Install gRPC Java library SNAPSHOT locally, including code generation plugin](../COMPILING.md) (Only need this step for non-released versions, e.g. master HEAD).**
|
||||||
|
|
|
||||||
|
|
@ -91,6 +91,20 @@ task helloWorldClient(type: CreateStartScripts) {
|
||||||
classpath = startScripts.classpath
|
classpath = startScripts.classpath
|
||||||
}
|
}
|
||||||
|
|
||||||
|
task retryingHelloWorldServer(type: CreateStartScripts) {
|
||||||
|
mainClassName = 'io.grpc.examples.retrying.RetryingHelloWorldServer'
|
||||||
|
applicationName = 'retrying-hello-world-server'
|
||||||
|
outputDir = new File(project.buildDir, 'tmp')
|
||||||
|
classpath = startScripts.classpath
|
||||||
|
}
|
||||||
|
|
||||||
|
task retryingHelloWorldClient(type: CreateStartScripts) {
|
||||||
|
mainClassName = 'io.grpc.examples.retrying.RetryingHelloWorldClient'
|
||||||
|
applicationName = 'retrying-hello-world-client'
|
||||||
|
outputDir = new File(project.buildDir, 'tmp')
|
||||||
|
classpath = startScripts.classpath
|
||||||
|
}
|
||||||
|
|
||||||
task hedgingHelloWorldServer(type: CreateStartScripts) {
|
task hedgingHelloWorldServer(type: CreateStartScripts) {
|
||||||
mainClassName = 'io.grpc.examples.hedging.HedgingHelloWorldServer'
|
mainClassName = 'io.grpc.examples.hedging.HedgingHelloWorldServer'
|
||||||
applicationName = 'hedging-hello-world-server'
|
applicationName = 'hedging-hello-world-server'
|
||||||
|
|
@ -119,6 +133,8 @@ applicationDistribution.into('bin') {
|
||||||
from(helloWorldClient)
|
from(helloWorldClient)
|
||||||
from(hedgingHelloWorldClient)
|
from(hedgingHelloWorldClient)
|
||||||
from(hedgingHelloWorldServer)
|
from(hedgingHelloWorldServer)
|
||||||
|
from(retryingHelloWorldClient)
|
||||||
|
from(retryingHelloWorldServer)
|
||||||
from(compressingHelloWorldClient)
|
from(compressingHelloWorldClient)
|
||||||
fileMode = 0755
|
fileMode = 0755
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,148 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2020 The gRPC Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.grpc.examples.retrying;
|
||||||
|
|
||||||
|
import static java.nio.charset.StandardCharsets.UTF_8;
|
||||||
|
|
||||||
|
import com.google.gson.Gson;
|
||||||
|
import com.google.gson.stream.JsonReader;
|
||||||
|
import io.grpc.ManagedChannel;
|
||||||
|
import io.grpc.ManagedChannelBuilder;
|
||||||
|
import io.grpc.StatusRuntimeException;
|
||||||
|
import io.grpc.examples.helloworld.GreeterGrpc;
|
||||||
|
import io.grpc.examples.helloworld.HelloReply;
|
||||||
|
import io.grpc.examples.helloworld.HelloRequest;
|
||||||
|
import java.io.InputStreamReader;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ForkJoinPool;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.logging.Level;
|
||||||
|
import java.util.logging.Logger;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A client that requests a greeting from the {@link RetryingHelloWorldServer} with a retrying policy.
|
||||||
|
*/
|
||||||
|
public class RetryingHelloWorldClient {
|
||||||
|
static final String ENV_DISABLE_RETRYING = "DISABLE_RETRYING_IN_RETRYING_EXAMPLE";
|
||||||
|
|
||||||
|
private static final Logger logger = Logger.getLogger(RetryingHelloWorldClient.class.getName());
|
||||||
|
|
||||||
|
private final boolean enableRetries;
|
||||||
|
private final ManagedChannel channel;
|
||||||
|
private final GreeterGrpc.GreeterBlockingStub blockingStub;
|
||||||
|
private final AtomicInteger totalRpcs = new AtomicInteger();
|
||||||
|
private final AtomicInteger failedRpcs = new AtomicInteger();
|
||||||
|
|
||||||
|
protected Map<String, ?> getRetryingServiceConfig() {
|
||||||
|
return new Gson()
|
||||||
|
.fromJson(
|
||||||
|
new JsonReader(
|
||||||
|
new InputStreamReader(
|
||||||
|
RetryingHelloWorldClient.class.getResourceAsStream(
|
||||||
|
"retrying_service_config.json"),
|
||||||
|
UTF_8)),
|
||||||
|
Map.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Construct client connecting to HelloWorld server at {@code host:port}.
|
||||||
|
*/
|
||||||
|
public RetryingHelloWorldClient(String host, int port, boolean enableRetries) {
|
||||||
|
|
||||||
|
ManagedChannelBuilder<?> channelBuilder = ManagedChannelBuilder.forAddress(host, port)
|
||||||
|
// Channels are secure by default (via SSL/TLS). For the example we disable TLS to avoid
|
||||||
|
// needing certificates.
|
||||||
|
.usePlaintext();
|
||||||
|
if (enableRetries) {
|
||||||
|
Map<String, ?> serviceConfig = getRetryingServiceConfig();
|
||||||
|
logger.info("Client started with retrying configuration: " + serviceConfig.toString());
|
||||||
|
channelBuilder.defaultServiceConfig(serviceConfig).enableRetry();
|
||||||
|
}
|
||||||
|
channel = channelBuilder.build();
|
||||||
|
blockingStub = GreeterGrpc.newBlockingStub(channel);
|
||||||
|
this.enableRetries = enableRetries;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void shutdown() throws InterruptedException {
|
||||||
|
channel.shutdown().awaitTermination(60, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Say hello to server in a blocking unary call.
|
||||||
|
*/
|
||||||
|
public void greet(String name) {
|
||||||
|
HelloRequest request = HelloRequest.newBuilder().setName(name).build();
|
||||||
|
HelloReply response = null;
|
||||||
|
StatusRuntimeException statusRuntimeException = null;
|
||||||
|
try {
|
||||||
|
response = blockingStub.sayHello(request);
|
||||||
|
} catch (StatusRuntimeException e) {
|
||||||
|
failedRpcs.incrementAndGet();
|
||||||
|
statusRuntimeException = e;
|
||||||
|
}
|
||||||
|
|
||||||
|
totalRpcs.incrementAndGet();
|
||||||
|
|
||||||
|
if (statusRuntimeException == null) {
|
||||||
|
logger.log(Level.INFO,"Greeting: {0}", new Object[]{response.getMessage()});
|
||||||
|
} else {
|
||||||
|
logger.log(Level.INFO,"RPC failed: {0}", new Object[]{statusRuntimeException.getStatus()});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void printSummary() {
|
||||||
|
logger.log(
|
||||||
|
Level.INFO,
|
||||||
|
"\n\nTotal RPCs sent: {0}. Total RPCs failed: {1}\n",
|
||||||
|
new Object[]{
|
||||||
|
totalRpcs.get(), failedRpcs.get()});
|
||||||
|
|
||||||
|
if (enableRetries) {
|
||||||
|
logger.log(
|
||||||
|
Level.INFO,
|
||||||
|
"Retrying enabled. To disable retries, run the client with environment variable {0}=true.",
|
||||||
|
ENV_DISABLE_RETRYING);
|
||||||
|
} else {
|
||||||
|
logger.log(
|
||||||
|
Level.INFO,
|
||||||
|
"Retrying disabled. To enable retries, unset environment variable {0} and then run the client.",
|
||||||
|
ENV_DISABLE_RETRYING);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void main(String[] args) throws Exception {
|
||||||
|
boolean enableRetries = !Boolean.parseBoolean(System.getenv(ENV_DISABLE_RETRYING));
|
||||||
|
final RetryingHelloWorldClient client = new RetryingHelloWorldClient("localhost", 50051, enableRetries);
|
||||||
|
ForkJoinPool executor = new ForkJoinPool();
|
||||||
|
|
||||||
|
for (int i = 0; i < 50; i++) {
|
||||||
|
final String userId = "user" + i;
|
||||||
|
executor.execute(
|
||||||
|
new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
client.greet(userId);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
executor.awaitQuiescence(100, TimeUnit.SECONDS);
|
||||||
|
executor.shutdown();
|
||||||
|
client.printSummary();
|
||||||
|
client.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,111 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2020 The gRPC Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.grpc.examples.retrying;
|
||||||
|
|
||||||
|
import java.text.DecimalFormat;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import io.grpc.Server;
|
||||||
|
import io.grpc.ServerBuilder;
|
||||||
|
import io.grpc.Status;
|
||||||
|
import io.grpc.examples.helloworld.GreeterGrpc;
|
||||||
|
import io.grpc.examples.helloworld.HelloReply;
|
||||||
|
import io.grpc.examples.helloworld.HelloRequest;
|
||||||
|
import io.grpc.stub.StreamObserver;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.logging.Logger;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A HelloWorld server that responds to requests with UNAVAILABLE with a given percentage.
|
||||||
|
*/
|
||||||
|
public class RetryingHelloWorldServer {
|
||||||
|
private static final Logger logger = Logger.getLogger(RetryingHelloWorldServer.class.getName());
|
||||||
|
private static final float unavailablePercentage = 0.5F;
|
||||||
|
private static Random random = new Random();
|
||||||
|
|
||||||
|
private Server server;
|
||||||
|
|
||||||
|
private void start() throws IOException {
|
||||||
|
/* The port on which the server should run */
|
||||||
|
int port = 50051;
|
||||||
|
server = ServerBuilder.forPort(port)
|
||||||
|
.addService(new GreeterImpl())
|
||||||
|
.build()
|
||||||
|
.start();
|
||||||
|
logger.info("Server started, listening on " + port);
|
||||||
|
|
||||||
|
DecimalFormat df = new DecimalFormat("#%");
|
||||||
|
logger.info("Responding as UNAVAILABLE to " + df.format(unavailablePercentage) + " requests");
|
||||||
|
Runtime.getRuntime().addShutdownHook(new Thread() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
|
||||||
|
System.err.println("*** shutting down gRPC server since JVM is shutting down");
|
||||||
|
try {
|
||||||
|
RetryingHelloWorldServer.this.stop();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
e.printStackTrace(System.err);
|
||||||
|
}
|
||||||
|
System.err.println("*** server shut down");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private void stop() throws InterruptedException {
|
||||||
|
if (server != null) {
|
||||||
|
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Await termination on the main thread since the grpc library uses daemon threads.
|
||||||
|
*/
|
||||||
|
private void blockUntilShutdown() throws InterruptedException {
|
||||||
|
if (server != null) {
|
||||||
|
server.awaitTermination();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Main launches the server from the command line.
|
||||||
|
*/
|
||||||
|
public static void main(String[] args) throws IOException, InterruptedException {
|
||||||
|
final RetryingHelloWorldServer server = new RetryingHelloWorldServer();
|
||||||
|
server.start();
|
||||||
|
server.blockUntilShutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
static class GreeterImpl extends GreeterGrpc.GreeterImplBase {
|
||||||
|
AtomicInteger retryCounter = new AtomicInteger(0);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
|
||||||
|
int count = retryCounter.incrementAndGet();
|
||||||
|
if (random.nextFloat() < unavailablePercentage) {
|
||||||
|
logger.info("Returning stubbed UNAVAILABLE error. count: " + count);
|
||||||
|
responseObserver.onError(Status.UNAVAILABLE
|
||||||
|
.withDescription("Greeter temporarily unavailable...").asRuntimeException());
|
||||||
|
} else {
|
||||||
|
logger.info("Returning successful Hello response, count: " + count);
|
||||||
|
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + request.getName()).build();
|
||||||
|
responseObserver.onNext(reply);
|
||||||
|
responseObserver.onCompleted();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,22 @@
|
||||||
|
{
|
||||||
|
"methodConfig": [
|
||||||
|
{
|
||||||
|
"name": [
|
||||||
|
{
|
||||||
|
"service": "helloworld.Greeter",
|
||||||
|
"method": "SayHello"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
|
||||||
|
"retryPolicy": {
|
||||||
|
"maxAttempts": 5,
|
||||||
|
"initialBackoff": "0.5s",
|
||||||
|
"maxBackoff": "30s",
|
||||||
|
"backoffMultiplier": 2,
|
||||||
|
"retryableStatusCodes": [
|
||||||
|
"UNAVAILABLE"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue