Rewrite gRPC service invocation example (#883)

* original version of service invocation grpc proxying

Signed-off-by: MregXN <mregxn@gmail.com>

* modify formatter and README

Signed-off-by: MregXN <mregxn@gmail.com>

* inject grpc port automatically

Signed-off-by: MregXN <mregxn@gmail.com>

* re-trigger validation

Signed-off-by: MregXN <mregxn@gmail.com>

* use withInterceptors() as MetadataUtils.attachHeaders is deprecated

Signed-off-by: MregXN <mregxn@gmail.com>

---------

Signed-off-by: MregXN <mregxn@gmail.com>
Signed-off-by: Artur Souza <artursouza.ms@outlook.com>
Co-authored-by: Artur Souza <artursouza.ms@outlook.com>
Co-authored-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>
Co-authored-by: Cassie Coyle <cassie@diagrid.io>
This commit is contained in:
MregXN 2023-12-21 23:23:19 +08:00 committed by GitHub
parent 49ccb31dcc
commit 14d836310c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 186 additions and 178 deletions

View File

@ -7,13 +7,16 @@ option java_package = "io.dapr.examples";
// User Code definitions
service HelloWorld {
rpc Say (SayRequest) returns (SayResponse) {}
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
message SayRequest {
// The request message containing the user's name.
message HelloRequest {
string name = 1;
}
// The response message containing the greetings
message HelloReply {
string message = 1;
}
message SayResponse {
string timestamp = 1;
}

View File

@ -13,43 +13,67 @@ limitations under the License.
package io.dapr.examples.invoke.grpc;
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.domain.HttpExtension;
import io.dapr.examples.DaprExamplesProtos.HelloReply;
import io.dapr.examples.DaprExamplesProtos.HelloRequest;
import io.dapr.examples.HelloWorldGrpc;
import io.grpc.Grpc;
import io.grpc.InsecureChannelCredentials;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.MetadataUtils;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* 1. Build and install jars:
* mvn clean install
* 2. cd [repo root]/examples
* 2. Send messages to the server:
* dapr run -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.invoke.grpc.HelloWorldClient
* dapr run -- java -jar target/dapr-java-sdk-examples-exec.jar
* io.dapr.examples.invoke.grpc.HelloWorldClient
*/
public class HelloWorldClient {
private static final Logger logger = Logger.getLogger(HelloWorldClient.class.getName());
/**
* The main method of the client app.
*
* @param args Array of messages to be sent.
*/
public static void main(String[] args) throws Exception {
try (DaprClient client = new DaprClientBuilder().build()) {
String serviceAppId = "hellogrpc";
String method = "say";
String user = "World";
String target = "localhost:" + System.getenv("DAPR_GRPC_PORT");
int count = 0;
while (true) {
String message = "Message #" + (count++);
System.out.println("Sending message: " + message);
client.invokeMethod(serviceAppId, method, message, HttpExtension.NONE).block();
System.out.println("Message sent: " + message);
ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create())
.build();
Thread.sleep(1000);
try {
HelloWorldGrpc.HelloWorldBlockingStub blockingStub = HelloWorldGrpc.newBlockingStub(channel);
// This is an example, so for simplicity we are just exiting here.
// Normally a dapr app would be a web service and not exit main.
System.out.println("Done");
Metadata headers = new Metadata();
headers.put(Metadata.Key.of("dapr-app-id", Metadata.ASCII_STRING_MARSHALLER),
"hellogrpc");
// MetadataUtils.attachHeaders is deprecated.
blockingStub = blockingStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(headers));
logger.info("Will try to greet " + user + " ...");
try {
HelloRequest request = HelloRequest.newBuilder().setName(user).build();
HelloReply response = blockingStub.sayHello(request);
logger.info("Greeting: " + response.getMessage());
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
}
} finally {
// To prevent leaking resources like threads and TCP connections
// the channel should be shut down when it will no longer be used.
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
}

View File

@ -13,11 +13,12 @@ limitations under the License.
package io.dapr.examples.invoke.grpc;
import com.google.protobuf.Any;
import io.dapr.v1.AppCallbackGrpc;
import io.dapr.v1.CommonProtos;
import io.dapr.examples.DaprExamplesProtos.HelloReply;
import io.dapr.examples.DaprExamplesProtos.HelloRequest;
import io.dapr.examples.HelloWorldGrpc;
import io.grpc.Grpc;
import io.grpc.InsecureServerCredentials;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
@ -25,13 +26,9 @@ import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import static io.dapr.examples.DaprExamplesProtos.SayRequest;
import static io.dapr.examples.DaprExamplesProtos.SayResponse;
/**
* 1. Build and install jars:
@ -39,107 +36,89 @@ import static io.dapr.examples.DaprExamplesProtos.SayResponse;
* 2. cd [repo root]/examples
* 3. Run in server mode:
* dapr run --app-id hellogrpc --app-port 5000 --app-protocol grpc \
* -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.invoke.grpc.HelloWorldService -p 5000
* -- java -jar target/dapr-java-sdk-examples-exec.jar
* io.dapr.examples.invoke.grpc.HelloWorldService -p 5000
*/
public class HelloWorldService {
private static final Logger logger = Logger.getLogger(HelloWorldService.class.getName());
/**
* Server mode: class that encapsulates all server-side logic for Grpc.
* Server mode: Grpc server.
*/
private static class GrpcHelloWorldDaprService extends AppCallbackGrpc.AppCallbackImplBase {
private Server server;
/**
* Server mode: class that encapsulates server-side handling logic for Grpc.
*/
static class HelloWorldImpl extends HelloWorldGrpc.HelloWorldImplBase {
/**
* Format to output date and time.
*/
private static final DateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
/**
* Server mode: Grpc server.
*/
private Server server;
/**
* Server mode: starts listening on given port.
*
* @param port Port to listen on.
* @throws IOException Errors while trying to start service.
*/
private void start(int port) throws IOException {
this.server = ServerBuilder
.forPort(port)
.addService(this)
.build()
.start();
System.out.printf("Server: started listening on port %d\n", port);
// Now we handle ctrl+c (or any other JVM shutdown)
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
System.out.println("Server: shutting down gracefully ...");
GrpcHelloWorldDaprService.this.server.shutdown();
System.out.println("Server: Bye.");
}
});
}
/**
* Server mode: waits for shutdown trigger.
*
* @throws InterruptedException Propagated interrupted exception.
*/
private void awaitTermination() throws InterruptedException {
if (this.server != null) {
this.server.awaitTermination();
}
}
/**
* Server mode: this is the Dapr method to receive Invoke operations via Grpc.
*
* @param request Dapr envelope request,
* @param responseObserver Dapr envelope response.
*/
@Override
public void onInvoke(CommonProtos.InvokeRequest request,
StreamObserver<CommonProtos.InvokeResponse> responseObserver) {
try {
if ("say".equals(request.getMethod())) {
SayRequest sayRequest =
SayRequest.newBuilder().setMessage(request.getData().getValue().toStringUtf8()).build();
SayResponse sayResponse = this.say(sayRequest);
CommonProtos.InvokeResponse.Builder responseBuilder = CommonProtos.InvokeResponse.newBuilder();
responseBuilder.setData(Any.pack(sayResponse));
responseObserver.onNext(responseBuilder.build());
}
} finally {
responseObserver.onCompleted();
}
}
/**
* Handling of the 'say' method.
* Handling of the 'sayHello' method.
*
* @param request Request to say something.
* @return Response with when it was said.
*/
public SayResponse say(SayRequest request) {
Calendar utcNow = Calendar.getInstance(TimeZone.getTimeZone("GMT"));
String utcNowAsString = DATE_FORMAT.format(utcNow.getTime());
@Override
public void sayHello(HelloRequest req, StreamObserver<HelloReply> responseObserver) {
logger.info("greet to " + req.getName());
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
}
/**
* Server mode: starts listening on given port.
*
* @param port Port to listen on.
* @throws IOException Errors while trying to start service.
*/
private void start(int port) throws IOException {
server = Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create())
.addService(new HelloWorldImpl())
.build()
.start();
logger.info("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
// Use stderr here since the logger may have been reset by its JVM shutdown
// hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
try {
HelloWorldService.this.stop();
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
System.err.println("*** server shut down");
}
});
}
// Handles the request by printing message.
System.out.println("Server: " + request.getMessage());
System.out.println("@ " + utcNowAsString);
/**
* Server mode: waits for shutdown trigger.
*
* @throws InterruptedException Propagated interrupted exception.
*/
private void stop() throws InterruptedException {
if (server != null) {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
}
}
// Now respond with current timestamp.
SayResponse.Builder responseBuilder = SayResponse.newBuilder();
return responseBuilder.setTimestamp(utcNowAsString).build();
/**
* Await termination on the main thread since the grpc library uses daemon
* threads.
*/
private void blockUntilShutdown() throws InterruptedException {
if (server != null) {
server.awaitTermination();
}
}
/**
* This is the main method of this app.
*
* @param args The port to listen on.
* @throws Exception An Exception.
*/
@ -153,8 +132,9 @@ public class HelloWorldService {
// If port string is not valid, it will throw an exception.
int port = Integer.parseInt(cmd.getOptionValue("port"));
final GrpcHelloWorldDaprService service = new GrpcHelloWorldDaprService();
final HelloWorldService service = new HelloWorldService();
service.start(port);
service.awaitTermination();
service.blockUntilShutdown();
}
}

View File

@ -34,53 +34,51 @@ cd examples
### Running the example's service
The first component is the service. It has a simple API with the `Say` method. This method will print out each message received from the client. The proto file below contains the description of the HelloWorld service found in the `./proto/examples/helloworld.proto` file:
The first component is the service. It has a simple API with the `SayHello` method. This method will print out each message received from the client. The proto file below contains the description of the HelloWorld service found in the `./proto/examples/helloworld.proto` file:
```text
service HelloWorld {
rpc Say (SayRequest) returns (SayResponse) {}
}
message SayRequest {
string message = 1;
}
message SayResponse {
string timestamp = 1;
}
service HelloWorld {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
// The request message containing the user's name.
message HelloRequest {
string name = 1;
}
// The response message containing the greetings
message HelloReply {
string message = 1;
}
```
In the `HelloWorldService.java` file, you will find the `HelloWorldService` class, containing the main method. The service implementation happens in the `GrpcHelloWorldDaprService` class. You can see that it extends `DaprClientImplBase` instead of `HelloWorldImplBase`. This is because this service will be called by Dapr, so it implements the service API expected by Dapr. The `DaprClientImplBase` class is part of this SDK. In a real-world application, the service would still implement it's main API as well. The Dapr's API would be exposed as an additional service. In this example, we are implementing Dapr's API only. Modifying this example to expose `HelloWorldService` is offered as an exercise to the reader.
In the `HelloWorldService.java` file, you will find the `HelloWorldService` class, containing the main method and handling logic. You can see that it extends `HelloWorldImplBase` automatically generated by proto to implement `sayhello` method.
```java
private static class GrpcHelloWorldDaprService extends DaprClientGrpc.DaprClientImplBase {
///...
@Override
public void onInvoke(DaprClientProtos.InvokeEnvelope request, StreamObserver<Any> responseObserver) {
try {
if ("say".equals(request.getMethod())) {
SayRequest sayRequest =
SayRequest.newBuilder().setMessage(request.getData().getValue().toStringUtf8()).build();
SayResponse sayResponse = this.say(sayRequest);
CommonProtos.InvokeResponse.Builder responseBuilder = CommonProtos.InvokeResponse.newBuilder();
responseBuilder.setData(Any.pack(sayResponse));
responseObserver.onNext(responseBuilder.build());
}
} finally {
responseObserver.onCompleted();
}
}
///...
}
static class HelloWorldImpl extends HelloWorldGrpc.HelloWorldImplBase {
/**
* Handling of the 'sayHello' method.
*
* @param request Request to say something.
* @return Response with when it was said.
*/
@Override
public void sayHello(HelloRequest req, StreamObserver<HelloReply> responseObserver) {
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
}
```
In the `GrpcHelloWorldDaprService` class, the `onInvoke` method is the most important. It is called by Dapr's runtime containing information that this code needs to redirect the request to the correct underlying method. In this case, the only method supported is the `say` method. So, it checks for the method requested and builds the `SayRequest` object from Dapr's envelope request. Once a `SayResponse` instance is ready, it serializes it into Dapr's envelope response object and returns.
Now run the service code:
<!-- STEP
name: Run demo service
expected_stdout_lines:
- '== APP == Server: "Message #0"'
- '== APP == Server: "Message #1"'
- '== APP == INFO: greet to World'
background: true
sleep: 1
-->
@ -95,55 +93,59 @@ The `app-id` argument is used to identify this service in Dapr's runtime. The `a
### Running the example's client
The other component is the client. It will send one message per second to the service via Dapr's invoke API using Dapr's SDK. Open the `HelloWorldClient.java` file, it uses the Dapr's Java SDK to invoke the `say` method on the service above:
The other component is the client. It will add user name to the grpc request and send it to the server. Open the `HelloWorldClient.java` file, it creates a new grpc channel and sends request directly to the dapr side car through this channel.
```java
private static class HelloWorldClient {
///...
public static void main(String[] args) throws Exception {
try (DaprClient client = new DaprClientBuilder().build()) {
String serviceAppId = "hellogrpc";
String method = "say";
int count = 0;
while (true) {
String message = "Message #" + (count++);
System.out.println("Sending message: " + message);
client.invokeMethod(serviceAppId, method, message, HttpExtension.NONE).block();
System.out.println("Message sent: " + message);
String user = "World";
// Access a service running on the local machine on port 50051
String target = "localhost:50051";
Thread.sleep(1000);
ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create())
.build();
// This is an example, so for simplicity we are just exiting here.
// Normally a dapr app would be a web service and not exit main.
System.out.println("Done");
try {
HelloWorldGrpc.HelloWorldBlockingStub blockingStub = HelloWorldGrpc.newBlockingStub(channel);
Metadata headers = new Metadata();
headers.put(Metadata.Key.of("dapr-app-id", Metadata.ASCII_STRING_MARSHALLER),
"hellogrpc");
blockingStub = MetadataUtils.attachHeaders(blockingStub, headers);
logger.info("Will try to greet " + user + " ...");
try {
HelloRequest request = HelloRequest.newBuilder().setName(user).build();
HelloReply response = blockingStub.sayHello(request);
logger.info("Greeting: " + response.getMessage());
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
}
} finally {
// To prevent leaking resources like threads and TCP connections
// the channel should be shut down when it will no longer be used.
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
///...
}
```
First, it creates an instance of `DaprClient` via `DaprClientBuilder`. The protocol used by DaprClient is transparent to the application. The HTTP and GRPC ports used by Dapr's sidecar are automatically chosen and exported as environment variables: `DAPR_HTTP_PORT` and `DAPR_GRPC_PORT`. Dapr's Java SDK references these environment variables when communicating to Dapr's sidecar. The Dapr client is also within a try-with-resource block to properly close the client at the end.
Finally, it will go through in an infinite loop and invoke the `say` method every second. Notice the use of `block()` on the return from `invokeMethod` - it is required to actually make the service invocation via a [Mono](https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html) object.
Finally, open a new command line terminal and run the client code to send some messages.
<!-- STEP
name: Run demo client
expected_stdout_lines:
- '== APP == Sending message: Message #0'
- '== APP == Message sent: Message #0'
- '== APP == Sending message: Message #1'
- '== APP == Message sent: Message #1'
- '== APP == INFO: Will try to greet World ...'
- '== APP == INFO: Greeting: Hello World'
background: true
sleep: 10
-->
```bash
dapr run --app-id invokegrpc -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.invoke.grpc.HelloWorldClient
dapr run --app-id invokegrpc --app-protocol grpc -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.invoke.grpc.HelloWorldClient
```
<!-- END_STEP -->
@ -164,4 +166,3 @@ dapr stop --app-id invokegrpc
<!-- END_STEP -->
Thanks for playing.