Use SDK in GRPC invoke + add more invoke overloads. (#176)

This commit is contained in:
Artur Souza 2020-01-30 13:44:03 -08:00 committed by GitHub
parent 1af0bee418
commit 838224f18a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 121 additions and 190 deletions

View File

@ -5,140 +5,37 @@
package io.dapr.examples.invoke.grpc;
import static io.dapr.examples.DaprExamplesProtos.SayRequest;
import static io.dapr.examples.DaprExamplesProtos.SayResponse;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
import io.dapr.DaprGrpc;
import io.dapr.DaprProtos.InvokeServiceEnvelope;
import io.dapr.DaprProtos.InvokeServiceResponseEnvelope;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.domain.Verb;
/**
* 1. Build and install jars:
* mvn clean install
* 2. Send messages to the server:
* dapr run --protocol grpc --grpc-port 50001 -- mvn exec:java -pl=examples \
* -Dexec.mainClass=io.dapr.examples.invoke.grpc.HelloWorldClient \
* -Dexec.args="-p 50001 'message one' 'message two'"
* dapr run -- mvn exec:java -pl=examples -D exec.mainClass=io.dapr.examples.invoke.grpc.HelloWorldClient
*/
public class HelloWorldClient {
/**
* Client mode: class representing a client-side logic for calling HelloWorld over Dapr.
*/
private static class GrpcHelloWorldDaprClient {
/**
* Client communication channel: host, port and tls(on/off).
*/
private final ManagedChannel channel;
/**
* Calls will be done asynchronously.
*/
private final DaprGrpc.DaprFutureStub client;
/**
* Creates a Grpc client for the DaprGrpc service.
*
* @param host host for the remote service endpoint
* @param port port for the remote service endpoint
*/
public GrpcHelloWorldDaprClient(String host, int port) {
this(ManagedChannelBuilder
.forAddress("localhost", port)
.usePlaintext() // SSL/TLS is default, we turn it off just because this is a sample and not prod.
.build());
}
/**
* Helper constructor to build client from channel.
*
* @param channel The ManagedChannel.
*/
private GrpcHelloWorldDaprClient(ManagedChannel channel) {
this.channel = channel;
this.client = DaprGrpc.newFutureStub(channel);
}
/**
* Client mode: sends messages, one per second.
*
* @param messages The messages to send.
*/
private void sendMessages(String... messages)
throws ExecutionException, InterruptedException, InvalidProtocolBufferException {
List<ListenableFuture<InvokeServiceResponseEnvelope>> futureResponses = new ArrayList<>();
for (String message : messages) {
SayRequest request = SayRequest
.newBuilder()
.setMessage(message)
.build();
// Now, wrap the request with Dapr's envelope.
InvokeServiceEnvelope requestEnvelope = InvokeServiceEnvelope
.newBuilder()
.setId("hellogrpc") // Service's identifier.
.setData(Any.pack(request))
.setMethod("say") // The service's method to be invoked by Dapr.
.build();
futureResponses.add(client.invokeService(requestEnvelope));
System.out.println("Client: sent => " + message);
Thread.sleep(TimeUnit.SECONDS.toMillis(10));
}
for (ListenableFuture<InvokeServiceResponseEnvelope> future : futureResponses) {
Any data = future.get().getData(); // Blocks waiting for response.
// IMPORTANT: do not use Any.unpack(), use Type.ParseFrom() instead.
SayResponse response = SayResponse.parseFrom(data.getValue());
System.out.println("Client: got response => " + response.getTimestamp());
}
}
/**
* Client mode: gracefully shutdown client within 1 min, otherwise force it.
*
* @throws InterruptedException Propagated interrupted exception.
*/
private void shutdown() throws InterruptedException {
this.channel.shutdown().awaitTermination(1, TimeUnit.MINUTES);
System.out.println("Client: Bye.");
}
}
/**
* The main method of this app.
* The main method of the client app.
*
* @param args Args representing the port the app will listen on.
* @throws Exception An Exception.
* @param args Array of messages to be sent.
*/
public static void main(String[] args) throws Exception {
Options options = new Options();
options.addRequiredOption("p", "port", true, "Port to listen or send event to.");
public static void main(String[] args) throws InterruptedException {
DaprClient client = new DaprClientBuilder().build();
CommandLineParser parser = new DefaultParser();
CommandLine cmd = parser.parse(options, args);
String serviceAppId = "hellogrpc";
String method = "say";
// If port string is not valid, it will throw an exception.
int port = Integer.parseInt(cmd.getOptionValue("port"));
int count = 0;
while (true) {
String message = "Message #" + (count++);
System.out.println("Sending message: " + message);
client.invokeService(Verb.POST, serviceAppId, method, message).block();
System.out.println("Message sent: " + message);
GrpcHelloWorldDaprClient helloWorldClient = new GrpcHelloWorldDaprClient("localhost", port);
helloWorldClient.sendMessages(cmd.getArgs());
helloWorldClient.shutdown();
Thread.sleep(1000);
}
}
}

View File

@ -30,8 +30,8 @@ import org.apache.commons.cli.Options;
* mvn clean install
* 2. Run in server mode:
* dapr run --app-id hellogrpc --app-port 5000 --protocol grpc \
* -- mvn exec:java -pl=examples -Dexec.mainClass=io.dapr.examples.invoke.grpc.HelloWorldService \
* -Dexec.args="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5009"
* -- mvn exec:java -pl=examples -D exec.mainClass=io.dapr.examples.invoke.grpc.HelloWorldService \
* -D exec.args="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5009"
*/
public class HelloWorldService {
@ -97,14 +97,11 @@ public class HelloWorldService {
public void onInvoke(DaprClientProtos.InvokeEnvelope request, StreamObserver<Any> responseObserver) {
try {
if ("say".equals(request.getMethod())) {
// IMPORTANT: do not use Any.unpack(), use Type.ParseFrom() instead.
SayRequest sayRequest = SayRequest.parseFrom(request.getData().getValue());
SayRequest sayRequest =
SayRequest.newBuilder().setMessage(request.getData().getValue().toStringUtf8()).build();
SayResponse sayResponse = this.say(sayRequest);
responseObserver.onNext(Any.pack(sayResponse));
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
responseObserver.onError(e);
} finally {
responseObserver.onCompleted();
}

View File

@ -70,61 +70,49 @@ In the `GrpcHelloWorldDaprService` class, the `onInvoke` method is the most impo
Now run the service code:
```sh
dapr run --app-id hellogrpc --app-port 5000 --protocol grpc -- mvn exec:java -pl=examples -Dexec.mainClass=io.dapr.examples.invoke.grpc.HelloWorldService -Dexec.args="-p 5000"
dapr run --app-id hellogrpc --app-port 5000 --protocol grpc -- mvn exec:java -pl=examples -D exec.mainClass=io.dapr.examples.invoke.grpc.HelloWorldService -D exec.args="-p 5000"
```
The `app-id` argument is used to identify this service in Dapr's runtime. The `app-port` determines which port Dapr's runtime should call into this service. The `protocol` argument informs Dapr which protocol it should use: `grpc` or `http`(default).
The `app-id` argument is used to identify this service in Dapr's runtime. The `app-port` determines which port Dapr's runtime should call into this service. The `protocol` argument informs Dapr which protocol it should use to invoke the application: `grpc` or `http`(default).
### Running the example's client
The other component is the client. It will take in the messages via command line arguments and send each one to the service via Dapr's invoke API over Grpc. Open the `HelloWorldClient.java` file, it contains the `HelloWorldClient` class with the main method and also the `GrpcHelloWorldDaprClient` class. The `GrpcHelloWorldDaprClient` encapsulates an instance of the `DaprFutureStub` class because it is calling Dapr's API. Creating a client to call `HelloWorldService` directly can be an exercise for the reader. In the `GrpcHelloWorldDaprClient` class, the most important method is `sendMessages`. See the code snippet below:
The other component is the client. It will send one message per second to the service via Dapr's invoke API using Dapr's SDK. Open the `HelloWorldClient.java` file, it uses the Dapr's Java SDK to invoke the `say` method on the service above:
```java
private static class GrpcHelloWorldDaprClient {
private static class HelloWorldClient {
///...
private void sendMessages(String... messages) throws ExecutionException, InterruptedException, InvalidProtocolBufferException {
List<ListenableFuture<InvokeServiceResponseEnvelope>> futureResponses = new ArrayList<>();
for (String message : messages)
{
SayRequest request = SayRequest
.newBuilder()
.setMessage(message)
.build();
public static void main(String[] args) {
DaprClient client = new DaprClientBuilder().build();
// Now, wrap the request with Dapr's envelope.
InvokeServiceEnvelope requestEnvelope = InvokeServiceEnvelope
.newBuilder()
.setId("hellogrpc") // Service's identifier.
.setData(Any.pack(request))
.setMethod("say") // The service's method to be invoked by Dapr.
.build();
String serviceAppId = "hellogrpc";
String method = "say";
futureResponses.add(client.invokeService(requestEnvelope));
System.out.println("Client: sent => " + message);
Thread.sleep(TimeUnit.SECONDS.toMillis(10));
}
int count = 0;
while (true) {
String message = "Message #" + (count++);
System.out.println("Sending message: " + message);
client.invokeService(Verb.POST, serviceAppId, method, message).block();
System.out.println("Message sent: " + message);
for (ListenableFuture<InvokeServiceResponseEnvelope> future : futureResponses) {
Any data = future.get().getData(); // Blocks waiting for response.
// IMPORTANT: do not use Any.unpack(), use Type.ParseFrom() instead.
SayResponse response = SayResponse.parseFrom(data.getValue());
System.out.println("Client: got response => " + response.getTimestamp());
}
}
Thread.sleep(1000);
}
}
}
///...
}
```
First, it goes through each message and creates a corresponding `SayRequest` object as if it would call the `HelloWorld` service directly. Then, the request object is wrapped into an instance of `InvokeServiceEnvelope`. As expected, the enveloped request is sent to Dapr's `invokeService` method. Once all responses are completed, they are unwrapped into `SayResponse` objects.
First, it creates an instance of `DaprClient` via `DaprClientBuilder`. The protocol used by DaprClient is transparent to the application. The HTTP and GRPC ports used by Dapr's sidecar are automatically chosen and exported as environment variables: `DAPR_HTTP_PORT` and `DAPR_GRPC_PORT`. Dapr's Java SDK references these environment variables when communicating to Dapr's sidecar.
Finally, open a new command line terminal and run the client code to send some messages. Feel free to play with the command line to send different messages:
Finally, it will go through in an infinite loop and invoke the `say` method every second. Notice the use of `block()` on the return from `invokeService` - it is required to actually make the service invocation via a [Mono](https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html) object.
Finally, open a new command line terminal and run the client code to send some messages.
```sh
dapr run --protocol grpc --grpc-port 50001 -- mvn exec:java -pl=examples -Dexec.mainClass=io.dapr.examples.invoke.grpc.HelloWorldClient -Dexec.args="-p 50001 'message one' 'message two'"
dapr run -- mvn exec:java -pl=examples -D exec.mainClass=io.dapr.examples.invoke.grpc.HelloWorldClient
```
Once the messages are sent, use `CTRL+C` to exit Dapr.
The `protocol` argument tells Dapr which protocol to use. In this command, `grpc-port` is specified so Dapr does not pick a random port and uses the requested port instead. The same port is passed in the client executable via the `p` argument. The last arguments into the Java's main method are the messages to be sent.
Thanks for playing.

View File

@ -56,6 +56,20 @@ public interface DaprClient {
<T, R> Mono<T> invokeService(
Verb verb, String appId, String method, R request, Map<String, String> metadata, Class<T> clazz);
/**
* Invoke a service without metadata, using serialization.
*
* @param verb The Verb to be used for HTTP will be the HTTP Verb, for GRPC is just a metadata value.
* @param appId The Application ID where the service is.
* @param method The actual Method to be call in the application.
* @param request The request to be sent to invoke the service.
* @param clazz the Type needed as return for the call.
* @param <T> the Type of the return, use byte[] to skip serialization.
* @param <R> The Type of the request, use byte[] to skip serialization.
* @return A Mono Plan of type clazz.
*/
<T, R> Mono<T> invokeService(Verb verb, String appId, String method, R request, Class<T> clazz);
/**
* Invoke a service without input, using serialization for response.
*
@ -82,6 +96,18 @@ public interface DaprClient {
*/
<R> Mono<Void> invokeService(Verb verb, String appId, String method, R request, Map<String, String> metadata);
/**
* Invoke a service with void response, no metadata and using serialization.
*
* @param verb The Verb to be used for HTTP will be the HTTP Verb, for GRPC is just a metadata value.
* @param appId The Application ID where the service is.
* @param method The actual Method to be call in the application.
* @param request The request to be sent to invoke the service.
* @param <R> The Type of the request, use byte[] to skip serialization.
* @return A Mono plan for Void.
*/
<R> Mono<Void> invokeService(Verb verb, String appId, String method, R request);
/**
* Invoke a service without input and void response.
*

View File

@ -125,12 +125,24 @@ public class DaprClientGrpc implements DaprClient {
*/
@Override
public <T> Mono<T> invokeService(
Verb verb,
String appId,
String method,
Map<String, String> metadata,
Class<T> clazz) {
return this.invokeService(verb, appId, method, null, null, clazz);
Verb verb, String appId, String method, Map<String, String> metadata, Class<T> clazz) {
return this.invokeService(verb, appId, method, null, metadata, clazz);
}
/**
* {@inheritDoc}
*/
@Override
public <T, R> Mono<T> invokeService(Verb verb, String appId, String method, R request, Class<T> clazz) {
return this.invokeService(verb, appId, method, request, null, clazz);
}
/**
* {@inheritDoc}
*/
@Override
public <R> Mono<Void> invokeService(Verb verb, String appId, String method, R request) {
return this.invokeService(verb, appId, method, request, null, byte[].class).then();
}
/**
@ -138,11 +150,7 @@ public class DaprClientGrpc implements DaprClient {
*/
@Override
public <R> Mono<Void> invokeService(
Verb verb,
String appId,
String method,
R request,
Map<String, String> metadata) {
Verb verb, String appId, String method, R request, Map<String, String> metadata) {
return this.invokeService(verb, appId, method, request, metadata, byte[].class).then();
}
@ -150,8 +158,9 @@ public class DaprClientGrpc implements DaprClient {
* {@inheritDoc}
*/
@Override
public Mono<Void> invokeService(Verb verb, String appId, String method, Map<String, String> metadata) {
return this.invokeService(verb, appId, method, null, metadata, Void.class).then();
public Mono<Void> invokeService(
Verb verb, String appId, String method, Map<String, String> metadata) {
return this.invokeService(verb, appId, method, null, metadata, byte[].class).then();
}
/**
@ -159,11 +168,7 @@ public class DaprClientGrpc implements DaprClient {
*/
@Override
public Mono<byte[]> invokeService(
Verb verb,
String appId,
String method,
byte[] request,
Map<String, String> metadata) {
Verb verb, String appId, String method, byte[] request, Map<String, String> metadata) {
return this.invokeService(verb, appId, method, request, metadata, byte[].class);
}

View File

@ -155,7 +155,23 @@ public class DaprClientHttp implements DaprClient {
@Override
public <T> Mono<T> invokeService(
Verb verb, String appId, String method, Map<String, String> metadata, Class<T> clazz) {
return this.invokeService(verb, appId, method, null, null, clazz);
return this.invokeService(verb, appId, method, null, metadata, clazz);
}
/**
* {@inheritDoc}
*/
@Override
public <T, R> Mono<T> invokeService(Verb verb, String appId, String method, R request, Class<T> clazz) {
return this.invokeService(verb, appId, method, request, null, clazz);
}
/**
* {@inheritDoc}
*/
@Override
public <R> Mono<Void> invokeService(Verb verb, String appId, String method, R request) {
return this.invokeService(verb, appId, method, request, null, byte[].class).then();
}
/**

View File

@ -142,7 +142,7 @@ public class DaprClientGrpcTest {
public void invokeServiceVoidExceptionThrownTest() {
when(client.invokeService(any(DaprProtos.InvokeServiceEnvelope.class)))
.thenThrow(RuntimeException.class);
Mono<Void> result = adapter.invokeService(Verb.GET, "appId", "method", "request", null);
Mono<Void> result = adapter.invokeService(Verb.GET, "appId", "method", "request");
result.block();
}
@ -156,7 +156,7 @@ public class DaprClientGrpcTest {
settableFuture.setException(ex);
when(client.invokeService(any(DaprProtos.InvokeServiceEnvelope.class)))
.thenReturn(settableFuture);
Mono<Void> result = adapter.invokeService(Verb.GET, "appId", "method", "request", null);
Mono<Void> result = adapter.invokeService(Verb.GET, "appId", "method", "request");
result.block();
}
@ -170,7 +170,7 @@ public class DaprClientGrpcTest {
addCallback(settableFuture, callback, directExecutor());
when(client.invokeService(any(DaprProtos.InvokeServiceEnvelope.class)))
.thenReturn(settableFuture);
Mono<Void> result = adapter.invokeService(Verb.GET, "appId", "method", "request", null);
Mono<Void> result = adapter.invokeService(Verb.GET, "appId", "method", "request");
settableFuture.set(DaprProtos.InvokeServiceResponseEnvelope.newBuilder().setData(getAny("Value")).build());
result.block();
assertTrue(callback.wasCalled);
@ -187,7 +187,7 @@ public class DaprClientGrpcTest {
when(client.invokeService(any(DaprProtos.InvokeServiceEnvelope.class)))
.thenReturn(settableFuture);
MyObject request = new MyObject(1, "Event");
Mono<Void> result = adapter.invokeService(Verb.GET, "appId", "method", request, null);
Mono<Void> result = adapter.invokeService(Verb.GET, "appId", "method", request);
settableFuture.set(DaprProtos.InvokeServiceResponseEnvelope.newBuilder().setData(getAny("Value")).build());
result.block();
assertTrue(callback.wasCalled);
@ -311,7 +311,7 @@ public class DaprClientGrpcTest {
.thenThrow(RuntimeException.class);
String request = "Request";
byte[] byteRequest = serializer.serialize(request);
Mono<byte[]> result = adapter.invokeService(Verb.GET, "appId", "method", byteRequest, null);
Mono<byte[]> result = adapter.invokeService(Verb.GET, "appId", "method", byteRequest, byte[].class);
result.block();
}
@ -326,7 +326,8 @@ public class DaprClientGrpcTest {
.thenReturn(settableFuture);
String request = "Request";
byte[] byteRequest = serializer.serialize(request);
Mono<byte[]> result = adapter.invokeService(Verb.GET, "appId", "method", byteRequest, null);
Mono<byte[]> result =
adapter.invokeService(Verb.GET, "appId", "method", byteRequest, (HashMap<String, String>)null);
settableFuture.setException(ex);
result.block();
}
@ -344,7 +345,8 @@ public class DaprClientGrpcTest {
.thenReturn(settableFuture);
String request = "Request";
byte[] byteRequest = serializer.serialize(request);
Mono<byte[]> result = adapter.invokeService(Verb.GET, "appId", "method", byteRequest, null);
Mono<byte[]> result = adapter.invokeService(
Verb.GET, "appId", "method", byteRequest, (HashMap<String, String>)null);
byte[] byteOutput = result.block();
String strOutput = serializer.deserialize(byteOutput, String.class);
assertEquals(expected, strOutput);
@ -363,7 +365,7 @@ public class DaprClientGrpcTest {
.thenReturn(settableFuture);
String request = "Request";
byte[] byteRequest = serializer.serialize(request);
Mono<byte[]> result = adapter.invokeService(Verb.GET, "appId", "method", byteRequest, null);
Mono<byte[]> result = adapter.invokeService(Verb.GET, "appId", "method", byteRequest, byte[].class);
byte[] byteOutput = result.block();
assertEquals(resultObj, serializer.deserialize(byteOutput, MyObject.class));
}