From 42b4c61d5e869b1534808204bc71df84720f5087 Mon Sep 17 00:00:00 2001
From: Larry Safran <107004254+larry-safran@users.noreply.github.com>
Date: Thu, 30 Mar 2023 13:32:04 -0700
Subject: [PATCH] examples: Health example (#9991)
Provides a server with both a greet service and the health service.
Client has an example of using the health service directly through the unary call
check
to get the current health. It also utilizes the health of the server's greet service
indirectly through the round robin load balancer, which uses the streaming rpc
watch (you can see how it is done in
{@link io.grpc.protobuf.services.HealthCheckingLoadBalancerFactory}).
---
examples/BUILD.bazel | 31 +++
examples/README.md | 14 ++
examples/build.gradle | 17 ++
examples/pom.xml | 4 +
.../healthservice/HealthServiceClient.java | 194 ++++++++++++++++++
.../healthservice/HealthServiceServer.java | 140 +++++++++++++
6 files changed, 400 insertions(+)
create mode 100644 examples/src/main/java/io/grpc/examples/healthservice/HealthServiceClient.java
create mode 100644 examples/src/main/java/io/grpc/examples/healthservice/HealthServiceServer.java
diff --git a/examples/BUILD.bazel b/examples/BUILD.bazel
index 82d6b9fb2c..563fa07ce8 100644
--- a/examples/BUILD.bazel
+++ b/examples/BUILD.bazel
@@ -80,6 +80,7 @@ java_library(
"@io_grpc_grpc_java//netty",
],
deps = [
+ ":_health_java_grpc",
":echo_java_grpc",
":echo_java_proto",
":hello_streaming_java_grpc",
@@ -93,7 +94,11 @@ java_library(
"@io_grpc_grpc_java//api",
"@io_grpc_grpc_java//context",
"@io_grpc_grpc_java//protobuf",
+ "@io_grpc_grpc_java//services:health",
+ "@io_grpc_grpc_java//services:healthlb",
"@io_grpc_grpc_java//stub",
+ "@io_grpc_grpc_proto//:health_proto",
+ "@io_grpc_grpc_proto//:health_java_proto",
"@maven//:com_google_api_grpc_proto_google_common_protos",
"@maven//:com_google_code_findbugs_jsr305",
"@maven//:com_google_code_gson_gson",
@@ -217,3 +222,29 @@ java_binary(
":examples",
],
)
+
+java_binary(
+ name = "healthservice-server",
+ testonly = 1,
+ main_class = "io.grpc.examples.healthservice.HealthServiceServer",
+ runtime_deps = [
+ ":examples",
+ ],
+)
+
+java_binary(
+ name = "healthservice-client",
+ testonly = 1,
+ main_class = "io.grpc.examples.healthservice.HealthServiceClient",
+ runtime_deps = [
+ ":examples",
+ ],
+)
+
+java_grpc_library(
+ name = "_health_java_grpc",
+ srcs = ["@io_grpc_grpc_proto//:health_proto"],
+ visibility = ["//visibility:private"],
+ deps = ["@io_grpc_grpc_proto//:health_java_proto"],
+)
+
diff --git a/examples/README.md b/examples/README.md
index f83d04f08d..ae849fa7d6 100644
--- a/examples/README.md
+++ b/examples/README.md
@@ -119,6 +119,20 @@ before trying out the examples.
+-
+ Health Service
+
+ The [health service example](src/main/java/io/grpc/examples/healthservice)
+ provides a HelloWorld gRPC server that doesn't like short names along with a
+ health service. It also provides a client application which makes HelloWorld
+ calls and checks the health status.
+
+ The client application also shows how the round robin load balancer can
+ utilize the health status to avoid making calls to a service that is
+ not actively serving.
+
+
+
- [Keep Alive](src/main/java/io/grpc/examples/keepalive)
### To build the examples
diff --git a/examples/build.gradle b/examples/build.gradle
index c330fb1cc0..a2980b28af 100644
--- a/examples/build.gradle
+++ b/examples/build.gradle
@@ -27,6 +27,7 @@ def protocVersion = protobufVersion
dependencies {
implementation "io.grpc:grpc-protobuf:${grpcVersion}"
+ implementation "io.grpc:grpc-services:${grpcVersion}"
implementation "io.grpc:grpc-stub:${grpcVersion}"
compileOnly "org.apache.tomcat:annotations-api:6.0.53"
@@ -223,6 +224,20 @@ task cancellationServer(type: CreateStartScripts) {
classpath = startScripts.classpath
}
+task healthServiceServer(type: CreateStartScripts) {
+ mainClass = 'io.grpc.examples.healthservice.HealthServiceServer'
+ applicationName = 'health-service-server'
+ outputDir = new File(project.buildDir, 'tmp/scripts/' + name)
+ classpath = startScripts.classpath
+}
+
+task healthServiceClient(type: CreateStartScripts) {
+ mainClass = 'io.grpc.examples.healthservice.HealthServiceClient'
+ applicationName = 'health-service-client'
+ outputDir = new File(project.buildDir, 'tmp/scripts/' + name)
+ classpath = startScripts.classpath
+}
+
task multiplexingServer(type: CreateStartScripts) {
mainClass = 'io.grpc.examples.multiplex.MultiplexingServer'
applicationName = 'multiplexing-server'
@@ -259,6 +274,8 @@ applicationDistribution.into('bin') {
from(deadlineClient)
from(keepAliveServer)
from(keepAliveClient)
+ from(healthServiceServer)
+ from(healthServiceClient)
from(cancellationClient)
from(cancellationServer)
from(multiplexingServer)
diff --git a/examples/pom.xml b/examples/pom.xml
index 4482a2b8c8..13f8164fb4 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -42,6 +42,10 @@
io.grpc
grpc-protobuf
+
+ io.grpc
+ grpc-services
+
io.grpc
grpc-stub
diff --git a/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceClient.java b/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceClient.java
new file mode 100644
index 0000000000..471084feab
--- /dev/null
+++ b/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceClient.java
@@ -0,0 +1,194 @@
+/*
+ * Copyright 2023 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.examples.healthservice;
+
+import io.grpc.Channel;
+import io.grpc.Grpc;
+import io.grpc.InsecureChannelCredentials;
+import io.grpc.LoadBalancerProvider;
+import io.grpc.LoadBalancerRegistry;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.StatusRuntimeException;
+import io.grpc.examples.helloworld.GreeterGrpc;
+import io.grpc.examples.helloworld.HelloReply;
+import io.grpc.examples.helloworld.HelloRequest;
+import io.grpc.health.v1.HealthCheckRequest;
+import io.grpc.health.v1.HealthCheckResponse;
+import io.grpc.health.v1.HealthCheckResponse.ServingStatus;
+import io.grpc.health.v1.HealthGrpc;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A client that requests a greeting from the {@link HelloWorldServer}.
+ */
+public class HealthServiceClient {
+ private static final Logger logger = Logger.getLogger(HealthServiceClient.class.getName());
+
+ private final GreeterGrpc.GreeterBlockingStub greeterBlockingStub;
+ private final HealthGrpc.HealthStub healthStub;
+ private final HealthGrpc.HealthBlockingStub healthBlockingStub;
+
+ private final HealthCheckRequest healthRequest;
+
+ /** Construct client for accessing HelloWorld server using the existing channel. */
+ public HealthServiceClient(Channel channel) {
+ greeterBlockingStub = GreeterGrpc.newBlockingStub(channel);
+ healthStub = HealthGrpc.newStub(channel);
+ healthBlockingStub = HealthGrpc.newBlockingStub(channel);
+ healthRequest = HealthCheckRequest.getDefaultInstance();
+ LoadBalancerProvider roundRobin = LoadBalancerRegistry.getDefaultRegistry()
+ .getProvider("round_robin");
+
+ }
+
+ private ServingStatus checkHealth(String prefix) {
+ HealthCheckResponse response =
+ healthBlockingStub.check(healthRequest);
+ logger.info(prefix + ", current health is: " + response.getStatus());
+ return response.getStatus();
+ }
+
+ /** Say hello to server. */
+ public void greet(String name) {
+ logger.info("Will try to greet " + name + " ...");
+ HelloRequest request = HelloRequest.newBuilder().setName(name).build();
+ HelloReply response;
+ try {
+ response = greeterBlockingStub.sayHello(request);
+ } catch (StatusRuntimeException e) {
+ logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
+ return;
+ } catch (Exception e) {
+ e.printStackTrace();
+ return;
+ }
+ logger.info("Greeting: " + response.getMessage());
+ }
+
+
+ private static void runTest(String target, String[] users, boolean useRoundRobin)
+ throws InterruptedException {
+ ManagedChannelBuilder> builder =
+ Grpc.newChannelBuilder(target, InsecureChannelCredentials.create());
+
+ // Round Robin, when a healthCheckConfig is present in the default service configuration, runs
+ // a watch on the health service and when picking an endpoint will
+ // consider a transport to a server whose service is not in SERVING state to be unavailable.
+ // Since we only have a single server we are connecting to, then the load balancer will
+ // return an error without sending the RPC.
+ if (useRoundRobin) {
+ builder = builder
+ .defaultLoadBalancingPolicy("round_robin")
+ .defaultServiceConfig(generateHealthConfig(""));
+ }
+
+ ManagedChannel channel = builder.build();
+
+ System.out.println("\nDoing test with" + (useRoundRobin ? "" : "out")
+ + " the Round Robin load balancer\n");
+
+ try {
+ HealthServiceClient client = new HealthServiceClient(channel);
+ if (!useRoundRobin) {
+ client.checkHealth("Before call");
+ }
+ client.greet(users[0]);
+ if (!useRoundRobin) {
+ client.checkHealth("After user " + users[0]);
+ }
+
+ for (String user : users) {
+ client.greet(user);
+ Thread.sleep(100); // Since the health update is asynchronous give it time to propagate
+ }
+
+ if (!useRoundRobin) {
+ client.checkHealth("After all users");
+ Thread.sleep(10000);
+ client.checkHealth("After 10 second wait");
+ } else {
+ Thread.sleep(10000);
+ }
+ client.greet("Larry");
+ } finally {
+ // ManagedChannels use resources like threads and TCP connections. To prevent leaking these
+ // resources the channel should be shut down when it will no longer be used. If it may be used
+ // again leave it running.
+ channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
+ }
+ }
+ private static Map generateHealthConfig(String serviceName) {
+ Map config = new HashMap<>();
+ Map serviceMap = new HashMap<>();
+
+ config.put("healthCheckConfig", serviceMap);
+ serviceMap.put("serviceName", serviceName);
+ return config;
+ }
+
+ /**
+ * Uses a server with both a greet service and the health service.
+ * If provided, the first element of {@code args} is the name to use in the
+ * greeting. The second argument is the target server.
+ * This has an example of using the health service directly through the unary call
+ * check
+ * to get the current health. It also utilizes the health of the server's greet service
+ * indirectly through the round robin load balancer, which uses the streaming rpc
+ * watch (you can see how it is done in
+ * {@link io.grpc.protobuf.services.HealthCheckingLoadBalancerFactory}).
+ */
+ public static void main(String[] args) throws Exception {
+ System.setProperty("java.util.logging.SimpleFormatter.format",
+ "%1$tH:%1$tM:%1$tS %4$s %2$s: %5$s%6$s%n");
+
+ String[] users = {"world", "foo", "I am Grut"};
+ // Access a service running on the local machine on port 50051
+ String target = "localhost:50051";
+ // Allow passing in the user and target strings as command line arguments
+ if (args.length > 0) {
+ if ("--help".equals(args[0])) {
+ System.err.println("Usage: [target [name] [name] ...]");
+ System.err.println("");
+ System.err.println(" target The server to connect to. Defaults to " + target);
+ System.err.println(" name The names you wish to be greeted by. Defaults to " + Arrays.toString(users));
+ System.exit(1);
+ }
+ target = args[0];
+ }
+ if (args.length > 1) {
+ users = new String[args.length-1];
+ for (int i=0; i < users.length; i++) {
+ users[i] = args[i+1];
+ }
+ }
+
+ // Will see failures of rpc's sent while server service is not serving, where the failures come
+ // from the server
+ runTest(target, users, false);
+
+ // The client will throw an error when sending the rpc to a non-serving service because the
+ // round robin load balancer uses the health service's watch rpc.
+ runTest(target, users, true);
+
+ }
+}
diff --git a/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceServer.java b/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceServer.java
new file mode 100644
index 0000000000..f6547c1110
--- /dev/null
+++ b/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceServer.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2023 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.examples.healthservice;
+
+import io.grpc.Grpc;
+import io.grpc.InsecureServerCredentials;
+import io.grpc.Server;
+import io.grpc.Status;
+import io.grpc.examples.helloworld.GreeterGrpc;
+import io.grpc.examples.helloworld.HelloReply;
+import io.grpc.examples.helloworld.HelloRequest;
+import io.grpc.health.v1.HealthCheckResponse.ServingStatus;
+import io.grpc.protobuf.services.HealthStatusManager;
+import io.grpc.stub.StreamObserver;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+/**
+ * Server that manages startup/shutdown of a {@code Greeter} server.
+ */
+public class HealthServiceServer {
+ private static final Logger logger = Logger.getLogger(HealthServiceServer.class.getName());
+
+ private Server server;
+ private HealthStatusManager health;
+
+ private void start() throws IOException {
+ /* The port on which the server should run */
+ int port = 50051;
+ health = new HealthStatusManager();
+ server = Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create())
+ .addService(new GreeterImpl())
+ .addService(health.getHealthService())
+ .build()
+ .start();
+ logger.info("Server started, listening on " + port);
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ // Use stderr here since the logger may have been reset by its JVM shutdown hook.
+ System.err.println("*** shutting down gRPC server since JVM is shutting down");
+ try {
+ HealthServiceServer.this.stop();
+ } catch (InterruptedException e) {
+ e.printStackTrace(System.err);
+ }
+ System.err.println("*** server shut down");
+ }
+ });
+
+ health.setStatus("", ServingStatus.SERVING);
+ }
+
+ private void stop() throws InterruptedException {
+ if (server != null) {
+ server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
+ }
+ }
+
+ /**
+ * Await termination on the main thread since the grpc library uses daemon threads.
+ */
+ private void blockUntilShutdown() throws InterruptedException {
+ if (server != null) {
+ server.awaitTermination();
+ }
+ }
+
+ /**
+ * Main launches the server from the command line.
+ */
+ public static void main(String[] args) throws IOException, InterruptedException {
+ System.setProperty("java.util.logging.SimpleFormatter.format",
+ "%1$tH:%1$tM:%1$tS %4$s %2$s: %5$s%6$s%n");
+
+ final HealthServiceServer server = new HealthServiceServer();
+ server.start();
+ server.blockUntilShutdown();
+ }
+
+ private class GreeterImpl extends GreeterGrpc.GreeterImplBase {
+ boolean isServing = true;
+
+ @Override
+ public void sayHello(HelloRequest req, StreamObserver responseObserver) {
+ if (!isServing) {
+ responseObserver.onError(
+ Status.INTERNAL.withDescription("Not Serving right now").asRuntimeException());
+ return;
+ }
+
+ if (isNameLongEnough(req)) {
+ HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build();
+ responseObserver.onNext(reply);
+ responseObserver.onCompleted();
+ } else {
+ logger.warning("Tiny message received, throwing a temper tantrum");
+ health.setStatus("", ServingStatus.NOT_SERVING);
+ isServing = false;
+
+ // In 10 seconds set it back to serving
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return;
+ }
+ isServing = true;
+ health.setStatus("", ServingStatus.SERVING);
+ logger.info("tantrum complete");
+ }
+ }).start();
+ responseObserver.onError(
+ Status.INVALID_ARGUMENT.withDescription("Offended by short name").asRuntimeException());
+ }
+ }
+
+ private boolean isNameLongEnough(HelloRequest req) {
+ return isServing && req.getName().length() >= 5;
+ }
+ }
+}