examples: Health example (#9991)

Provides a server with both a greet service and the health service.

   Client has an example of using the health service directly through the unary call
    <a href="https://github.com/grpc/grpc-java/blob/master/services/src/main/proto/grpc/health/v1/health.proto">check</a>
    to get the current health.  It also utilizes the health of the server's greet service
    indirectly through the round robin load balancer, which uses the streaming rpc
    <strong>watch</strong> (you can see how it is done in
    {@link  io.grpc.protobuf.services.HealthCheckingLoadBalancerFactory}).
This commit is contained in:
Larry Safran 2023-03-30 13:32:04 -07:00 committed by GitHub
parent 8ceac65e7a
commit 42b4c61d5e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 400 additions and 0 deletions

View File

@ -80,6 +80,7 @@ java_library(
"@io_grpc_grpc_java//netty",
],
deps = [
":_health_java_grpc",
":echo_java_grpc",
":echo_java_proto",
":hello_streaming_java_grpc",
@ -93,7 +94,11 @@ java_library(
"@io_grpc_grpc_java//api",
"@io_grpc_grpc_java//context",
"@io_grpc_grpc_java//protobuf",
"@io_grpc_grpc_java//services:health",
"@io_grpc_grpc_java//services:healthlb",
"@io_grpc_grpc_java//stub",
"@io_grpc_grpc_proto//:health_proto",
"@io_grpc_grpc_proto//:health_java_proto",
"@maven//:com_google_api_grpc_proto_google_common_protos",
"@maven//:com_google_code_findbugs_jsr305",
"@maven//:com_google_code_gson_gson",
@ -217,3 +222,29 @@ java_binary(
":examples",
],
)
java_binary(
name = "healthservice-server",
testonly = 1,
main_class = "io.grpc.examples.healthservice.HealthServiceServer",
runtime_deps = [
":examples",
],
)
java_binary(
name = "healthservice-client",
testonly = 1,
main_class = "io.grpc.examples.healthservice.HealthServiceClient",
runtime_deps = [
":examples",
],
)
java_grpc_library(
name = "_health_java_grpc",
srcs = ["@io_grpc_grpc_proto//:health_proto"],
visibility = ["//visibility:private"],
deps = ["@io_grpc_grpc_proto//:health_java_proto"],
)

View File

@ -119,6 +119,20 @@ before trying out the examples.
</details>
- <details>
<summary>Health Service</summary>
The [health service example](src/main/java/io/grpc/examples/healthservice)
provides a HelloWorld gRPC server that doesn't like short names along with a
health service. It also provides a client application which makes HelloWorld
calls and checks the health status.
The client application also shows how the round robin load balancer can
utilize the health status to avoid making calls to a service that is
not actively serving.
</details>
- [Keep Alive](src/main/java/io/grpc/examples/keepalive)
### <a name="to-build-the-examples"></a> To build the examples

View File

@ -27,6 +27,7 @@ def protocVersion = protobufVersion
dependencies {
implementation "io.grpc:grpc-protobuf:${grpcVersion}"
implementation "io.grpc:grpc-services:${grpcVersion}"
implementation "io.grpc:grpc-stub:${grpcVersion}"
compileOnly "org.apache.tomcat:annotations-api:6.0.53"
@ -223,6 +224,20 @@ task cancellationServer(type: CreateStartScripts) {
classpath = startScripts.classpath
}
task healthServiceServer(type: CreateStartScripts) {
mainClass = 'io.grpc.examples.healthservice.HealthServiceServer'
applicationName = 'health-service-server'
outputDir = new File(project.buildDir, 'tmp/scripts/' + name)
classpath = startScripts.classpath
}
task healthServiceClient(type: CreateStartScripts) {
mainClass = 'io.grpc.examples.healthservice.HealthServiceClient'
applicationName = 'health-service-client'
outputDir = new File(project.buildDir, 'tmp/scripts/' + name)
classpath = startScripts.classpath
}
task multiplexingServer(type: CreateStartScripts) {
mainClass = 'io.grpc.examples.multiplex.MultiplexingServer'
applicationName = 'multiplexing-server'
@ -259,6 +274,8 @@ applicationDistribution.into('bin') {
from(deadlineClient)
from(keepAliveServer)
from(keepAliveClient)
from(healthServiceServer)
from(healthServiceClient)
from(cancellationClient)
from(cancellationServer)
from(multiplexingServer)

View File

@ -42,6 +42,10 @@
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-services</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>

View File

@ -0,0 +1,194 @@
/*
* Copyright 2023 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.examples.healthservice;
import io.grpc.Channel;
import io.grpc.Grpc;
import io.grpc.InsecureChannelCredentials;
import io.grpc.LoadBalancerProvider;
import io.grpc.LoadBalancerRegistry;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.examples.helloworld.GreeterGrpc;
import io.grpc.examples.helloworld.HelloReply;
import io.grpc.examples.helloworld.HelloRequest;
import io.grpc.health.v1.HealthCheckRequest;
import io.grpc.health.v1.HealthCheckResponse;
import io.grpc.health.v1.HealthCheckResponse.ServingStatus;
import io.grpc.health.v1.HealthGrpc;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* A client that requests a greeting from the {@link HelloWorldServer}.
*/
public class HealthServiceClient {
private static final Logger logger = Logger.getLogger(HealthServiceClient.class.getName());
private final GreeterGrpc.GreeterBlockingStub greeterBlockingStub;
private final HealthGrpc.HealthStub healthStub;
private final HealthGrpc.HealthBlockingStub healthBlockingStub;
private final HealthCheckRequest healthRequest;
/** Construct client for accessing HelloWorld server using the existing channel. */
public HealthServiceClient(Channel channel) {
greeterBlockingStub = GreeterGrpc.newBlockingStub(channel);
healthStub = HealthGrpc.newStub(channel);
healthBlockingStub = HealthGrpc.newBlockingStub(channel);
healthRequest = HealthCheckRequest.getDefaultInstance();
LoadBalancerProvider roundRobin = LoadBalancerRegistry.getDefaultRegistry()
.getProvider("round_robin");
}
private ServingStatus checkHealth(String prefix) {
HealthCheckResponse response =
healthBlockingStub.check(healthRequest);
logger.info(prefix + ", current health is: " + response.getStatus());
return response.getStatus();
}
/** Say hello to server. */
public void greet(String name) {
logger.info("Will try to greet " + name + " ...");
HelloRequest request = HelloRequest.newBuilder().setName(name).build();
HelloReply response;
try {
response = greeterBlockingStub.sayHello(request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
return;
} catch (Exception e) {
e.printStackTrace();
return;
}
logger.info("Greeting: " + response.getMessage());
}
private static void runTest(String target, String[] users, boolean useRoundRobin)
throws InterruptedException {
ManagedChannelBuilder<?> builder =
Grpc.newChannelBuilder(target, InsecureChannelCredentials.create());
// Round Robin, when a healthCheckConfig is present in the default service configuration, runs
// a watch on the health service and when picking an endpoint will
// consider a transport to a server whose service is not in SERVING state to be unavailable.
// Since we only have a single server we are connecting to, then the load balancer will
// return an error without sending the RPC.
if (useRoundRobin) {
builder = builder
.defaultLoadBalancingPolicy("round_robin")
.defaultServiceConfig(generateHealthConfig(""));
}
ManagedChannel channel = builder.build();
System.out.println("\nDoing test with" + (useRoundRobin ? "" : "out")
+ " the Round Robin load balancer\n");
try {
HealthServiceClient client = new HealthServiceClient(channel);
if (!useRoundRobin) {
client.checkHealth("Before call");
}
client.greet(users[0]);
if (!useRoundRobin) {
client.checkHealth("After user " + users[0]);
}
for (String user : users) {
client.greet(user);
Thread.sleep(100); // Since the health update is asynchronous give it time to propagate
}
if (!useRoundRobin) {
client.checkHealth("After all users");
Thread.sleep(10000);
client.checkHealth("After 10 second wait");
} else {
Thread.sleep(10000);
}
client.greet("Larry");
} finally {
// ManagedChannels use resources like threads and TCP connections. To prevent leaking these
// resources the channel should be shut down when it will no longer be used. If it may be used
// again leave it running.
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
private static Map<String, Object> generateHealthConfig(String serviceName) {
Map<String, Object> config = new HashMap<>();
Map<String, Object> serviceMap = new HashMap<>();
config.put("healthCheckConfig", serviceMap);
serviceMap.put("serviceName", serviceName);
return config;
}
/**
* Uses a server with both a greet service and the health service.
* If provided, the first element of {@code args} is the name to use in the
* greeting. The second argument is the target server.
* This has an example of using the health service directly through the unary call
* <a href="https://github.com/grpc/grpc-java/blob/master/services/src/main/proto/grpc/health/v1/health.proto">check</a>
* to get the current health. It also utilizes the health of the server's greet service
* indirectly through the round robin load balancer, which uses the streaming rpc
* <strong>watch</strong> (you can see how it is done in
* {@link io.grpc.protobuf.services.HealthCheckingLoadBalancerFactory}).
*/
public static void main(String[] args) throws Exception {
System.setProperty("java.util.logging.SimpleFormatter.format",
"%1$tH:%1$tM:%1$tS %4$s %2$s: %5$s%6$s%n");
String[] users = {"world", "foo", "I am Grut"};
// Access a service running on the local machine on port 50051
String target = "localhost:50051";
// Allow passing in the user and target strings as command line arguments
if (args.length > 0) {
if ("--help".equals(args[0])) {
System.err.println("Usage: [target [name] [name] ...]");
System.err.println("");
System.err.println(" target The server to connect to. Defaults to " + target);
System.err.println(" name The names you wish to be greeted by. Defaults to " + Arrays.toString(users));
System.exit(1);
}
target = args[0];
}
if (args.length > 1) {
users = new String[args.length-1];
for (int i=0; i < users.length; i++) {
users[i] = args[i+1];
}
}
// Will see failures of rpc's sent while server service is not serving, where the failures come
// from the server
runTest(target, users, false);
// The client will throw an error when sending the rpc to a non-serving service because the
// round robin load balancer uses the health service's watch rpc.
runTest(target, users, true);
}
}

View File

@ -0,0 +1,140 @@
/*
* Copyright 2023 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.examples.healthservice;
import io.grpc.Grpc;
import io.grpc.InsecureServerCredentials;
import io.grpc.Server;
import io.grpc.Status;
import io.grpc.examples.helloworld.GreeterGrpc;
import io.grpc.examples.helloworld.HelloReply;
import io.grpc.examples.helloworld.HelloRequest;
import io.grpc.health.v1.HealthCheckResponse.ServingStatus;
import io.grpc.protobuf.services.HealthStatusManager;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
/**
* Server that manages startup/shutdown of a {@code Greeter} server.
*/
public class HealthServiceServer {
private static final Logger logger = Logger.getLogger(HealthServiceServer.class.getName());
private Server server;
private HealthStatusManager health;
private void start() throws IOException {
/* The port on which the server should run */
int port = 50051;
health = new HealthStatusManager();
server = Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create())
.addService(new GreeterImpl())
.addService(health.getHealthService())
.build()
.start();
logger.info("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
try {
HealthServiceServer.this.stop();
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
System.err.println("*** server shut down");
}
});
health.setStatus("", ServingStatus.SERVING);
}
private void stop() throws InterruptedException {
if (server != null) {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
}
}
/**
* Await termination on the main thread since the grpc library uses daemon threads.
*/
private void blockUntilShutdown() throws InterruptedException {
if (server != null) {
server.awaitTermination();
}
}
/**
* Main launches the server from the command line.
*/
public static void main(String[] args) throws IOException, InterruptedException {
System.setProperty("java.util.logging.SimpleFormatter.format",
"%1$tH:%1$tM:%1$tS %4$s %2$s: %5$s%6$s%n");
final HealthServiceServer server = new HealthServiceServer();
server.start();
server.blockUntilShutdown();
}
private class GreeterImpl extends GreeterGrpc.GreeterImplBase {
boolean isServing = true;
@Override
public void sayHello(HelloRequest req, StreamObserver<HelloReply> responseObserver) {
if (!isServing) {
responseObserver.onError(
Status.INTERNAL.withDescription("Not Serving right now").asRuntimeException());
return;
}
if (isNameLongEnough(req)) {
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
} else {
logger.warning("Tiny message received, throwing a temper tantrum");
health.setStatus("", ServingStatus.NOT_SERVING);
isServing = false;
// In 10 seconds set it back to serving
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
isServing = true;
health.setStatus("", ServingStatus.SERVING);
logger.info("tantrum complete");
}
}).start();
responseObserver.onError(
Status.INVALID_ARGUMENT.withDescription("Offended by short name").asRuntimeException());
}
}
private boolean isNameLongEnough(HelloRequest req) {
return isServing && req.getName().length() >= 5;
}
}
}