Make DaprClient Closeable and properly close GRPC Managed Channel (#311)

This commit is contained in:
Mukundan Sundararajan 2020-07-30 13:34:04 -07:00 committed by GitHub
parent a388a6a90f
commit 14fded8d18
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 511 additions and 342 deletions

View File

@ -8,6 +8,8 @@ package io.dapr.examples.bindings.http;
import io.dapr.client.DaprClient; import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder; import io.dapr.client.DaprClientBuilder;
import java.io.IOException;
/** /**
* Service for output binding example. * Service for output binding example.
* 1. From your repo root, build and install jars: * 1. From your repo root, build and install jars:
@ -36,8 +38,8 @@ public class OutputBindingExample {
* @param args Not used. * @param args Not used.
*/ */
@SuppressWarnings("checkstyle:AbbreviationAsWordInName") @SuppressWarnings("checkstyle:AbbreviationAsWordInName")
public static void main(String[] args) { public static void main(String[] args) throws IOException {
DaprClient client = new DaprClientBuilder().build(); try (DaprClient client = new DaprClientBuilder().build()) {
int count = 0; int count = 0;
while (!Thread.currentThread().isInterrupted()) { while (!Thread.currentThread().isInterrupted()) {
@ -68,3 +70,4 @@ public class OutputBindingExample {
System.out.println("Done."); System.out.println("Done.");
} }
} }
}

View File

@ -101,7 +101,7 @@ dapr run --components-path ./components --app-id inputbinding --app-port 3000 --
The output binding application is a simple java class with a main method that uses the Dapr Client to invoke binding. The output binding application is a simple java class with a main method that uses the Dapr Client to invoke binding.
In the `OutputBindingExample.java` file, you will find the `OutputBindingExample` class, containing the main method. The main method declares a Dapr Client using the `DaprClientBuilder` class. Notice that this builder gets two serializer implementations in the constructor: One is for Dapr's sent and recieved objects, and second is for objects to be persisted. The client publishes events using `invokeBinding` method. See the code snippet below: In the `OutputBindingExample.java` file, you will find the `OutputBindingExample` class, containing the main method. The main method declares a Dapr Client using the `DaprClientBuilder` class. Notice that this builder gets two serializer implementations in the constructor: One is for Dapr's sent and recieved objects, and second is for objects to be persisted. The client publishes events using `invokeBinding` method. The Dapr client is also within a try-with-resource block to properly close the client at the end. See the code snippet below:
```java ```java
public class OutputBindingExample{ public class OutputBindingExample{
///... ///...
@ -109,17 +109,37 @@ public class OutputBindingExample {
static final String BINDING_OPERATION = "create"; static final String BINDING_OPERATION = "create";
///... ///...
public static void main(String[] args) { public static void main(String[] args) throws IOException {
DaprClient client = new DaprClientBuilder().build(); try (DaprClient client = new DaprClientBuilder().build()) {
///...
int count = 0;
while (!Thread.currentThread().isInterrupted()) {
String message = "Message #" + (count++);
// Randomly decides between a class type or string type to be sent.
if (Math.random() >= 0.5) {
// This is an example of sending data in a user-defined object. The input binding will receive:
// {"message":"hello"}
MyClass myClass = new MyClass(); MyClass myClass = new MyClass();
myClass.message = message; myClass.message = message;
System.out.println("sending an object instance with message: " + myClass.message); System.out.println("sending a class with message: " + myClass.message);
client.invokeBinding(BINDING_NAME, BINDING_OPERATION, myClass); //Binding a data object client.invokeBinding(BINDING_NAME, BINDING_OPERATION, myClass).block();
///... } else {
System.out.println("sending a plain string: " + m); System.out.println("sending a plain string: " + message);
client.invokeBinding(BINDING_NAME, BINDING_OPERATION, message); //Binding a plain string text client.invokeBinding(BINDING_NAME, BINDING_OPERATION, message).block();
}
try {
Thread.sleep((long) (10000 * Math.random()));
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
System.out.println("Done.");
}
} }
///... ///...
} }
@ -145,4 +165,9 @@ Once running, the InputBindingExample should print the output as follows:
Events have been retrieved from the binding. Events have been retrieved from the binding.
For bringing down the kafka cluster that was started in the beginning, run
```sh
docker-compose -f ./src/main/java/io/dapr/examples/bindings/http/docker-compose-single-kafka.yml down
```
For more details on Dapr Spring Boot integration, please refer to [Dapr Spring Boot](../../../springboot/DaprApplication.java) Application implementation. For more details on Dapr Spring Boot integration, please refer to [Dapr Spring Boot](../../../springboot/DaprApplication.java) Application implementation.

View File

@ -9,6 +9,8 @@ import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder; import io.dapr.client.DaprClientBuilder;
import io.dapr.client.HttpExtension; import io.dapr.client.HttpExtension;
import java.io.IOException;
/** /**
* 1. Build and install jars: * 1. Build and install jars:
* mvn clean install * mvn clean install
@ -23,8 +25,8 @@ public class HelloWorldClient {
* *
* @param args Array of messages to be sent. * @param args Array of messages to be sent.
*/ */
public static void main(String[] args) throws InterruptedException { public static void main(String[] args) throws InterruptedException, IOException {
DaprClient client = new DaprClientBuilder().build(); try (DaprClient client = new DaprClientBuilder().build()) {
String serviceAppId = "hellogrpc"; String serviceAppId = "hellogrpc";
String method = "say"; String method = "say";
@ -44,3 +46,4 @@ public class HelloWorldClient {
} }
} }
} }
}

View File

@ -87,7 +87,7 @@ The other component is the client. It will send one message per second to the se
private static class HelloWorldClient { private static class HelloWorldClient {
///... ///...
public static void main(String[] args) { public static void main(String[] args) {
DaprClient client = new DaprClientBuilder().build(); try (DaprClient client = new DaprClientBuilder().build()) {
String serviceAppId = "hellogrpc"; String serviceAppId = "hellogrpc";
String method = "say"; String method = "say";
@ -100,13 +100,18 @@ private static class HelloWorldClient {
System.out.println("Message sent: " + message); System.out.println("Message sent: " + message);
Thread.sleep(1000); Thread.sleep(1000);
// This is an example, so for simplicity we are just exiting here.
// Normally a dapr app would be a web service and not exit main.
System.out.println("Done");
}
} }
} }
///... ///...
} }
``` ```
First, it creates an instance of `DaprClient` via `DaprClientBuilder`. The protocol used by DaprClient is transparent to the application. The HTTP and GRPC ports used by Dapr's sidecar are automatically chosen and exported as environment variables: `DAPR_HTTP_PORT` and `DAPR_GRPC_PORT`. Dapr's Java SDK references these environment variables when communicating to Dapr's sidecar. First, it creates an instance of `DaprClient` via `DaprClientBuilder`. The protocol used by DaprClient is transparent to the application. The HTTP and GRPC ports used by Dapr's sidecar are automatically chosen and exported as environment variables: `DAPR_HTTP_PORT` and `DAPR_GRPC_PORT`. Dapr's Java SDK references these environment variables when communicating to Dapr's sidecar. The Dapr client is also within a try-with-resource block to properly close the client at the end.
Finally, it will go through in an infinite loop and invoke the `say` method every second. Notice the use of `block()` on the return from `invokeService` - it is required to actually make the service invocation via a [Mono](https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html) object. Finally, it will go through in an infinite loop and invoke the `say` method every second. Notice the use of `block()` on the return from `invokeService` - it is required to actually make the service invocation via a [Mono](https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html) object.

View File

@ -9,6 +9,8 @@ import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder; import io.dapr.client.DaprClientBuilder;
import io.dapr.client.HttpExtension; import io.dapr.client.HttpExtension;
import java.io.IOException;
/** /**
* 1. Build and install jars: * 1. Build and install jars:
* mvn clean install * mvn clean install
@ -29,8 +31,8 @@ public class InvokeClient {
* *
* @param args Messages to be sent as request for the invoke API. * @param args Messages to be sent as request for the invoke API.
*/ */
public static void main(String[] args) { public static void main(String[] args) throws IOException {
DaprClient client = (new DaprClientBuilder()).build(); try (DaprClient client = (new DaprClientBuilder()).build()) {
for (String message : args) { for (String message : args) {
byte[] response = client.invokeService(SERVICE_APP_ID, "say", message, HttpExtension.POST, null, byte[] response = client.invokeService(SERVICE_APP_ID, "say", message, HttpExtension.POST, null,
byte[].class).block(); byte[].class).block();
@ -42,3 +44,4 @@ public class InvokeClient {
System.out.println("Done"); System.out.println("Done");
} }
} }
}

View File

@ -110,13 +110,18 @@ public class InvokeClient {
private static final String SERVICE_APP_ID = "invokedemo"; private static final String SERVICE_APP_ID = "invokedemo";
///... ///...
public static void main(String[] args) { public static void main(String[] args) throws IOException {
DaprClient client = (new DaprClientBuilder()).build(); try (DaprClient client = (new DaprClientBuilder()).build()) {
for (String message : args) { for (String message : args) {
byte[] response = client.invokeService(SERVICE_APP_ID, "say", byte[] response = client.invokeService(SERVICE_APP_ID, "say", message, HttpExtension.POST, null,
message, HttpExtension.POST, null, byte[].class).block(); byte[].class).block();
System.out.println(new String(response)); System.out.println(new String(response));
} }
// This is an example, so for simplicity we are just exiting here.
// Normally a dapr app would be a web service and not exit main.
System.out.println("Done");
}
} }
///... ///...
} }

View File

@ -8,6 +8,7 @@ package io.dapr.examples.pubsub.http;
import io.dapr.client.DaprClient; import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder; import io.dapr.client.DaprClientBuilder;
import java.io.IOException;
import java.util.Collections; import java.util.Collections;
/** /**
@ -32,7 +33,7 @@ public class Publisher {
*/ */
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
//Creating the DaprClient: Using the default builder client produces an HTTP Dapr Client //Creating the DaprClient: Using the default builder client produces an HTTP Dapr Client
DaprClient client = new DaprClientBuilder().build(); try (DaprClient client = new DaprClientBuilder().build()) {
for (int i = 0; i < NUM_MESSAGES; i++) { for (int i = 0; i < NUM_MESSAGES; i++) {
String message = String.format("This is message #%d", i); String message = String.format("This is message #%d", i);
//Publishing messages //Publishing messages
@ -60,3 +61,4 @@ public class Publisher {
System.out.println("Done."); System.out.println("Done.");
} }
} }
}

View File

@ -84,19 +84,22 @@ dapr run --components-path ./components --app-id subscriber --app-port 3000 --po
The other component is the publisher. It is a simple java application with a main method that uses the Dapr HTTP Client to publish 10 messages to an specific topic. The other component is the publisher. It is a simple java application with a main method that uses the Dapr HTTP Client to publish 10 messages to an specific topic.
In the `Publisher.java` file, you will find the `Publisher` class, containing the main method. The main method declares a Dapr Client using the `DaprClientBuilder` class. Notice that this builder gets two serializer implementations in the constructor: One is for Dapr's sent and recieved objects, and second is for objects to be persisted. The client publishes messages using `publishEvent` method. See the code snippet below: In the `Publisher.java` file, you will find the `Publisher` class, containing the main method. The main method declares a Dapr Client using the `DaprClientBuilder` class. Notice that this builder gets two serializer implementations in the constructor: One is for Dapr's sent and recieved objects, and second is for objects to be persisted. The client publishes messages using `publishEvent` method. The Dapr client is also within a try-with-resource block to properly close the client at the end. See the code snippet below:
```java ```java
public class Publisher { public class Publisher {
private static final int NUM_MESSAGES = 10; private static final int NUM_MESSAGES = 10;
private static final String TOPIC_NAME = "testingtopic"; private static final String TOPIC_NAME = "testingtopic";
///... ///...
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
DaprClient client = new DaprClientBuilder().build(); //Creating the DaprClient: Using the default builder client produces an HTTP Dapr Client
try (DaprClient client = new DaprClientBuilder().build()) {
for (int i = 0; i < NUM_MESSAGES; i++) { for (int i = 0; i < NUM_MESSAGES; i++) {
String message = String.format("This is message #%d", i); String message = String.format("This is message #%d", i);
//Publishing messages
client.publishEvent(TOPIC_NAME, message).block(); client.publishEvent(TOPIC_NAME, message).block();
System.out.println("Published message: " + message); System.out.println("Published message: " + message);
//.. //...
}
} }
} }
///... ///...

View File

@ -43,8 +43,8 @@ mvn install
Before getting into the application code, follow these steps in order to setup a local instance of Vault. This is needed for the local instances. Steps are: Before getting into the application code, follow these steps in order to setup a local instance of Vault. This is needed for the local instances. Steps are:
1. navigate to the [repo-root] with `cd java-sdk` 1. navigate to the [examples] with `cd examples`
2. Run `docker-compose -f ./examples/src/main/java/io/dapr/examples/secrets/docker-compose-vault.yml up -d` to run the container locally 2. Run `docker-compose -f ./src/main/java/io/dapr/examples/secrets/docker-compose-vault.yml up -d` to run the container locally
3. Run `docker ps` to see the container running locally: 3. Run `docker ps` to see the container running locally:
```bash ```bash
@ -61,6 +61,8 @@ Dapr's API for secret store only support read operations. For this sample to run
vault login myroot vault login myroot
``` ```
> Note: If you get `http: server gave HTTP response to HTTPS client` make sure the local vault address is set `export VAULT_ADDR=http://127.0.0.1:8200/`
2. Create secret (replace `[my favorite movie]` with a title of our choice): 2. Create secret (replace `[my favorite movie]` with a title of our choice):
```bash ```bash
vault kv put secret/dapr/movie title="[my favorite movie]" vault kv put secret/dapr/movie title="[my favorite movie]"
@ -78,24 +80,36 @@ The example's main function is in `SecretClient.java`.
```java ```java
public class SecretClient { public class SecretClient {
/**
* Identifier in Dapr for the secret store.
*/
private static final String SECRET_STORE_NAME = "vault"; private static final String SECRET_STORE_NAME = "vault";
/**
* JSON Serializer to print output.
*/
private static final ObjectMapper JSON_SERIALIZER = new ObjectMapper();
///... ///...
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
///... if (args.length != 1) {
throw new IllegalArgumentException("Use one argument: secret's key to be retrieved.");
}
String secretKey = args[0]; String secretKey = args[0];
DaprClient client = (new DaprClientBuilder()).build(); try (DaprClient client = (new DaprClientBuilder()).build()) {
Map<String, String> secret = client.getSecret(SECRET_STORE_NAME, secretKey).block(); Map<String, String> secret = client.getSecret(SECRET_STORE_NAME, secretKey).block();
System.out.println(JSON_SERIALIZER.writeValueAsString(secret)); System.out.println(JSON_SERIALIZER.writeValueAsString(secret));
} }
}
///... ///...
} }
``` ```
The program receives one and only one argument: the secret's key to be fetched. The program receives one and only one argument: the secret's key to be fetched.
After identifying the key to be fetched, it will retrieve it from the pre-defined secret store: `vault`. After identifying the key to be fetched, it will retrieve it from the pre-defined secret store: `vault`.
The secret store's name **must** match the component's name defined in `< repo dir >/examples/components/hashicorp_vault.yaml`. The secret store's name **must** match the component's name defined in `< repo dir >/examples/components/hashicorp_vault.yaml`.
The Dapr client is also within a try-with-resource block to properly close the client at the end.
Execute the follow script in order to run the example: Execute the follow script in order to run the example:
```sh ```sh
@ -109,4 +123,11 @@ Once running, the program should print the output as follows:
== APP == {"title":"[my favorite movie]"} == APP == {"title":"[my favorite movie]"}
``` ```
To close the app, press CTRL+c.
To cleanup and bring the vault container down, run
```sh
docker-compose -f ./src/main/java/io/dapr/examples/secrets/docker-compose-vault.yml down
```
Thanks for playing. Thanks for playing.

View File

@ -9,6 +9,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.client.DaprClient; import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder; import io.dapr.client.DaprClientBuilder;
import java.io.IOException;
import java.util.Map; import java.util.Map;
/** /**
@ -44,8 +45,9 @@ public class SecretClient {
} }
String secretKey = args[0]; String secretKey = args[0];
DaprClient client = (new DaprClientBuilder()).build(); try (DaprClient client = (new DaprClientBuilder()).build()) {
Map<String, String> secret = client.getSecret(SECRET_STORE_NAME, secretKey).block(); Map<String, String> secret = client.getSecret(SECRET_STORE_NAME, secretKey).block();
System.out.println(JSON_SERIALIZER.writeValueAsString(secret)); System.out.println(JSON_SERIALIZER.writeValueAsString(secret));
} }
} }
}

View File

@ -32,21 +32,21 @@ cd examples
### Running the StateClient ### Running the StateClient
This example uses the Java SDK Dapr client in order to save, retrieve and delete a state, in this case, an instance of a class. Multiple state stores are supported since Dapr 0.4. See the code snippet bellow: This example uses the Java SDK Dapr client in order to save, retrieve and delete a state, in this case, an instance of a class. Multiple state stores are supported since Dapr 0.4. See the code snippet bellow:
``` ```java
public class StateClient { public class StateClient {
///... ///...
private static final String STATE_STORE_NAME = "statestore"; private static final String STATE_STORE_NAME = "statestore";
private static final String KEY_NAME = "mykey"; private static final String KEY_NAME = "mykey";
///... ///...
public static void main(String[] args) { public static void main(String[] args) throws IOException {
DaprClient client = new DaprClientBuilder().build(); try (DaprClient client = new DaprClientBuilder().build()) {
String message = args.length == 0 ? " " : args[0]; String message = args.length == 0 ? " " : args[0];
MyClass myClass = new MyClass(); MyClass myClass = new MyClass();
myClass.message = message; myClass.message = message;
client.saveState(KEY_NAME, myClass).block(); client.saveState(STATE_STORE_NAME, KEY_NAME, myClass).block();
System.out.println("Saving class with message: " + message); System.out.println("Saving class with message: " + message);
Mono<State<MyClass>> retrievedMessageMono = client.getState(STATE_STORE_NAME, KEY_NAME, MyClass.class); Mono<State<MyClass>> retrievedMessageMono = client.getState(STATE_STORE_NAME, KEY_NAME, MyClass.class);
@ -58,10 +58,15 @@ public class StateClient {
Mono<State<MyClass>> retrievedDeletedMessageMono = client.getState(STATE_STORE_NAME, KEY_NAME, MyClass.class); Mono<State<MyClass>> retrievedDeletedMessageMono = client.getState(STATE_STORE_NAME, KEY_NAME, MyClass.class);
System.out.println("Trying to retrieve deleted state: " + retrievedDeletedMessageMono.block().getValue()); System.out.println("Trying to retrieve deleted state: " + retrievedDeletedMessageMono.block().getValue());
// This is an example, so for simplicity we are just exiting here.
// Normally a dapr app would be a web service and not exit main.
System.out.println("Done");
}
} }
} }
``` ```
The code uses the `DaprClient` created by the `DaprClientBuilder`. Notice that this builder uses default settings. Internally, it is using `DefaultObjectSerializer` for two properties: `objectSerializer` is for Dapr's sent and recieved objects, and `stateSerializer` is for objects to be persisted. This client performs three operations: `client.saveState(...)` for persisting an instance of `MyClass`, then uses the `client.getState(...)` operation in order to retrieve back the persisted state using the same key. `client.deleteState(...)` operation is used to remove the persisted state. Finally, the code tries to retrieve the deleted state, which should not be found. The code uses the `DaprClient` created by the `DaprClientBuilder`. Notice that this builder uses default settings. Internally, it is using `DefaultObjectSerializer` for two properties: `objectSerializer` is for Dapr's sent and received objects, and `stateSerializer` is for objects to be persisted. This client performs three operations: `client.saveState(...)` for persisting an instance of `MyClass`, then uses the `client.getState(...)` operation in order to retrieve back the persisted state using the same key. `client.deleteState(...)` operation is used to remove the persisted state. Finally, the code tries to retrieve the deleted state, which should not be found. The Dapr client is also within a try-with-resource block to properly close the client at the end.
### Running the example ### Running the example

View File

@ -10,6 +10,8 @@ import io.dapr.client.DaprClientBuilder;
import io.dapr.client.domain.State; import io.dapr.client.domain.State;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import java.io.IOException;
/** /**
* 1. Build and install jars: * 1. Build and install jars:
* mvn clean install * mvn clean install
@ -32,8 +34,8 @@ public class StateClient {
* Executes the sate actions. * Executes the sate actions.
* @param args messages to be sent as state value. * @param args messages to be sent as state value.
*/ */
public static void main(String[] args) { public static void main(String[] args) throws IOException {
DaprClient client = new DaprClientBuilder().build(); try (DaprClient client = new DaprClientBuilder().build()) {
String message = args.length == 0 ? " " : args[0]; String message = args.length == 0 ? " " : args[0];
MyClass myClass = new MyClass(); MyClass myClass = new MyClass();
@ -57,3 +59,4 @@ public class StateClient {
System.out.println("Done"); System.out.println("Done");
} }
} }
}

View File

@ -73,7 +73,7 @@ public class BindingIT extends BaseIT {
daprRun.switchToHTTP(); daprRun.switchToHTTP();
} }
DaprClient client = new DaprClientBuilder().build(); try(DaprClient client = new DaprClientBuilder().build()) {
// This is an example of sending data in a user-defined object. The input binding will receive: // This is an example of sending data in a user-defined object. The input binding will receive:
// {"message":"hello"} // {"message":"hello"}
@ -121,3 +121,4 @@ public class BindingIT extends BaseIT {
}, 8000); }, 8000);
} }
} }
}

View File

@ -6,11 +6,13 @@ import io.dapr.client.DaprHttp;
import io.dapr.client.HttpExtension; import io.dapr.client.HttpExtension;
import io.dapr.it.BaseIT; import io.dapr.it.BaseIT;
import io.dapr.it.DaprRun; import io.dapr.it.DaprRun;
import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
import java.io.IOException;
import java.util.*; import java.util.*;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@ -58,11 +60,11 @@ public class MethodInvokeIT extends BaseIT {
} }
@Test @Test
public void testInvoke() { public void testInvoke() throws IOException {
// At this point, it is guaranteed that the service above is running and all ports being listened to. // At this point, it is guaranteed that the service above is running and all ports being listened to.
DaprClient client = new DaprClientBuilder().build(); try (DaprClient client = new DaprClientBuilder().build()) {
for (int i = 0; i < NUM_MESSAGES; i++) { for (int i = 0; i < NUM_MESSAGES; i++) {
String message = String.format("This is message #%d", i); String message = String.format("This is message #%d", i);
//Publishing messages //Publishing messages
@ -82,13 +84,12 @@ public class MethodInvokeIT extends BaseIT {
client.invokeService(daprRun.getAppName(), "messages/2", "updated message".getBytes(), HttpExtension.PUT).block(); client.invokeService(daprRun.getAppName(), "messages/2", "updated message".getBytes(), HttpExtension.PUT).block();
messages = client.invokeService(daprRun.getAppName(), "messages", null, HttpExtension.GET, Map.class).block(); messages = client.invokeService(daprRun.getAppName(), "messages", null, HttpExtension.GET, Map.class).block();
assertEquals("updated message", messages.get("2")); assertEquals("updated message", messages.get("2"));
}
} }
@Test @Test
public void testInvokeWithObjects() { public void testInvokeWithObjects() throws IOException {
DaprClient client = new DaprClientBuilder().build(); try (DaprClient client = new DaprClientBuilder().build()) {
for (int i = 0; i < NUM_MESSAGES; i++) { for (int i = 0; i < NUM_MESSAGES; i++) {
Person person = new Person(); Person person = new Person();
person.setName(String.format("Name %d", i)); person.setName(String.format("Name %d", i));
@ -120,3 +121,4 @@ public class MethodInvokeIT extends BaseIT {
assertEquals("Smith", resultPerson.getLastName()); assertEquals("Smith", resultPerson.getLastName());
} }
} }
}

View File

@ -37,6 +37,7 @@ public class PubSubIT extends BaseIT {
/** /**
* Parameters for this test. * Parameters for this test.
* Param #1: useGrpc. * Param #1: useGrpc.
*
* @return Collection of parameter tuples. * @return Collection of parameter tuples.
*/ */
@Parameterized.Parameters @Parameterized.Parameters
@ -65,7 +66,7 @@ public class PubSubIT extends BaseIT {
} }
// Send a batch of messages on one topic // Send a batch of messages on one topic
DaprClient client = new DaprClientBuilder().build(); try (DaprClient client = new DaprClientBuilder().build()) {
for (int i = 0; i < NUM_MESSAGES; i++) { for (int i = 0; i < NUM_MESSAGES; i++) {
String message = String.format("This is message #%d on topic %s", i, TOPIC_NAME); String message = String.format("This is message #%d on topic %s", i, TOPIC_NAME);
//Publishing messages //Publishing messages
@ -94,7 +95,6 @@ public class PubSubIT extends BaseIT {
System.out.println("Checking results for topic " + TOPIC_NAME); System.out.println("Checking results for topic " + TOPIC_NAME);
final List<String> messages = client.invokeService(daprRun.getAppName(), "messages/testingtopic", null, HttpExtension.GET, List.class).block(); final List<String> messages = client.invokeService(daprRun.getAppName(), "messages/testingtopic", null, HttpExtension.GET, List.class).block();
assertEquals(11, messages.size()); assertEquals(11, messages.size());
for (int i = 0; i < NUM_MESSAGES; i++) { for (int i = 0; i < NUM_MESSAGES; i++) {
assertTrue(messages.contains(String.format("This is message #%d on topic %s", i, TOPIC_NAME))); assertTrue(messages.contains(String.format("This is message #%d on topic %s", i, TOPIC_NAME)));
} }
@ -119,5 +119,6 @@ public class PubSubIT extends BaseIT {
} }
}, 2000); }, 2000);
} }
}
} }

View File

@ -14,12 +14,14 @@ import io.dapr.client.DaprClientHttp;
import io.dapr.it.BaseIT; import io.dapr.it.BaseIT;
import io.dapr.it.DaprRun; import io.dapr.it.DaprRun;
import io.dapr.it.services.EmptyService; import io.dapr.it.services.EmptyService;
import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
import java.io.IOException;
import java.util.*; import java.util.*;
import static org.junit.Assert.*; import static org.junit.Assert.*;
@ -96,6 +98,11 @@ public class SecretsClientIT extends BaseIT {
} }
} }
@After
public void tearDown() throws IOException {
daprClient.close();
}
@Test @Test
public void getSecret() throws Exception { public void getSecret() throws Exception {
String key = UUID.randomUUID().toString(); String key = UUID.randomUUID().toString();

View File

@ -13,11 +13,13 @@ import io.dapr.client.domain.StateOptions;
import io.dapr.it.BaseIT; import io.dapr.it.BaseIT;
import io.dapr.it.DaprRun; import io.dapr.it.DaprRun;
import io.dapr.it.services.EmptyService; import io.dapr.it.services.EmptyService;
import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Ignore; import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import java.io.IOException;
import java.time.Duration; import java.time.Duration;
import static org.junit.Assert.*; import static org.junit.Assert.*;
@ -46,6 +48,11 @@ public class GRPCStateClientIT extends BaseIT {
assertTrue(daprClient instanceof DaprClientGrpc); assertTrue(daprClient instanceof DaprClientGrpc);
} }
@AfterClass
public static void tearDown() throws IOException {
daprClient.close();
}
@Test @Test
public void saveAndGetState() { public void saveAndGetState() {

View File

@ -64,5 +64,6 @@ public class HelloWorldClientIT extends BaseIT {
System.out.println("Got: " + value); System.out.println("Got: " + value);
Assert.assertEquals("", value); Assert.assertEquals("", value);
} }
channel.shutdown();
} }
} }

View File

@ -11,6 +11,7 @@ import io.dapr.client.domain.Verb;
import io.dapr.utils.TypeRef; import io.dapr.utils.TypeRef;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import java.io.Closeable;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -19,7 +20,7 @@ import java.util.Map;
* *
* @see io.dapr.client.DaprClientBuilder for information on how to make instance for this interface. * @see io.dapr.client.DaprClientBuilder for information on how to make instance for this interface.
*/ */
public interface DaprClient { public interface DaprClient extends Closeable {
/** /**
* Publish an event. * Publish an event.

View File

@ -14,10 +14,13 @@ import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder; import io.grpc.ManagedChannelBuilder;
import okhttp3.OkHttpClient; import okhttp3.OkHttpClient;
import java.io.Closeable;
/** /**
* A builder for the DaprClient, * A builder for the DaprClient,
* Currently only and HTTP Client will be supported. * Currently only and HTTP Client will be supported.
*/ */
public class DaprClientBuilder { public class DaprClientBuilder {
/** /**
@ -105,7 +108,13 @@ public class DaprClientBuilder {
throw new IllegalStateException("Invalid port."); throw new IllegalStateException("Invalid port.");
} }
ManagedChannel channel = ManagedChannelBuilder.forAddress(Constants.DEFAULT_HOSTNAME, port).usePlaintext().build(); ManagedChannel channel = ManagedChannelBuilder.forAddress(Constants.DEFAULT_HOSTNAME, port).usePlaintext().build();
return new DaprClientGrpc(DaprGrpc.newFutureStub(channel), this.objectSerializer, this.stateSerializer); Closeable closeableChannel = () -> {
if (channel != null && !channel.isShutdown()) {
channel.shutdown();
}
};
DaprGrpc.DaprFutureStub stub = DaprGrpc.newFutureStub(channel);
return new DaprClientGrpc(closeableChannel, stub, this.objectSerializer, this.stateSerializer);
} }
/** /**

View File

@ -19,6 +19,7 @@ import io.dapr.v1.DaprGrpc;
import io.dapr.v1.DaprProtos; import io.dapr.v1.DaprProtos;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
@ -34,6 +35,11 @@ import static io.dapr.client.domain.StateOptions.RetryPolicy;
*/ */
public class DaprClientGrpc implements DaprClient { public class DaprClientGrpc implements DaprClient {
/**
* The GRPC managed channel to be used.
*/
private Closeable channel;
/** /**
* The GRPC client to be used. * The GRPC client to be used.
* *
@ -54,15 +60,18 @@ public class DaprClientGrpc implements DaprClient {
/** /**
* Default access level constructor, in order to create an instance of this class use io.dapr.client.DaprClientBuilder * Default access level constructor, in order to create an instance of this class use io.dapr.client.DaprClientBuilder
* *
* @param closeableChannel A closeable for a Managed GRPC channel
* @param futureClient GRPC client * @param futureClient GRPC client
* @param objectSerializer Serializer for transient request/response objects. * @param objectSerializer Serializer for transient request/response objects.
* @param stateSerializer Serializer for state objects. * @param stateSerializer Serializer for state objects.
* @see DaprClientBuilder * @see DaprClientBuilder
*/ */
DaprClientGrpc( DaprClientGrpc(
Closeable closeableChannel,
DaprGrpc.DaprFutureStub futureClient, DaprGrpc.DaprFutureStub futureClient,
DaprObjectSerializer objectSerializer, DaprObjectSerializer objectSerializer,
DaprObjectSerializer stateSerializer) { DaprObjectSerializer stateSerializer) {
this.channel = closeableChannel;
this.client = futureClient; this.client = futureClient;
this.objectSerializer = objectSerializer; this.objectSerializer = objectSerializer;
this.stateSerializer = stateSerializer; this.stateSerializer = stateSerializer;
@ -642,4 +651,15 @@ public class DaprClientGrpc implements DaprClient {
return this.getSecret(secretStoreName, secretName, null); return this.getSecret(secretStoreName, secretName, null);
} }
/**
* Closes the ManagedChannel for GRPC.
* @see io.grpc.ManagedChannel#shutdown()
* @throws IOException on exception.
*/
@Override
public void close() throws IOException {
if (channel != null) {
channel.close();
}
}
} }

View File

@ -592,4 +592,8 @@ public class DaprClientHttp implements DaprClient {
return this.getSecret(secretStoreName, secretName, null); return this.getSecret(secretStoreName, secretName, null);
} }
@Override
public void close() throws IOException {
client.close();
}
} }

View File

@ -16,6 +16,7 @@ import okhttp3.Request;
import okhttp3.RequestBody; import okhttp3.RequestBody;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Arrays; import java.util.Arrays;
@ -25,7 +26,7 @@ import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.UUID; import java.util.UUID;
public class DaprHttp { public class DaprHttp implements Closeable {
/** /**
* Dapr's http default scheme. * Dapr's http default scheme.
*/ */
@ -239,4 +240,13 @@ public class DaprHttp {
return OBJECT_MAPPER.readValue(json, DaprError.class); return OBJECT_MAPPER.readValue(json, DaprError.class);
} }
/**
* Shutdown call is not necessary for OkHttpClient.
* @see OkHttpClient
*/
@Override
public void close() throws IOException {
// No code needed
}
} }

View File

@ -12,18 +12,19 @@ import com.google.protobuf.ByteString;
import com.google.protobuf.Empty; import com.google.protobuf.Empty;
import io.dapr.client.domain.State; import io.dapr.client.domain.State;
import io.dapr.client.domain.StateOptions; import io.dapr.client.domain.StateOptions;
import io.dapr.client.domain.Verb;
import io.dapr.serializer.DefaultObjectSerializer; import io.dapr.serializer.DefaultObjectSerializer;
import io.dapr.utils.TypeRef; import io.dapr.utils.TypeRef;
import io.dapr.v1.CommonProtos; import io.dapr.v1.CommonProtos;
import io.dapr.v1.DaprGrpc; import io.dapr.v1.DaprGrpc;
import io.dapr.v1.DaprProtos; import io.dapr.v1.DaprProtos;
import org.checkerframework.checker.nullness.compatqual.NullableDecl; import org.checkerframework.checker.nullness.compatqual.NullableDecl;
import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.ArgumentMatcher; import org.mockito.ArgumentMatcher;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.time.Duration; import java.time.Duration;
import java.util.Collections; import java.util.Collections;
@ -43,15 +44,25 @@ public class DaprClientGrpcTest {
private static final String SECRET_STORE_NAME = "MySecretStore"; private static final String SECRET_STORE_NAME = "MySecretStore";
private Closeable closeable;
private DaprGrpc.DaprFutureStub client; private DaprGrpc.DaprFutureStub client;
private DaprClientGrpc adapter; private DaprClientGrpc adapter;
private ObjectSerializer serializer; private ObjectSerializer serializer;
@Before @Before
public void setup() { public void setup() throws IOException {
closeable = mock(Closeable.class);
client = mock(DaprGrpc.DaprFutureStub.class); client = mock(DaprGrpc.DaprFutureStub.class);
adapter = new DaprClientGrpc(client, new DefaultObjectSerializer(), new DefaultObjectSerializer()); adapter = new DaprClientGrpc(closeable, client, new DefaultObjectSerializer(), new DefaultObjectSerializer());
serializer = new ObjectSerializer(); serializer = new ObjectSerializer();
doNothing().when(closeable).close();
}
@After
public void tearDown() throws IOException {
adapter.close();
verify(closeable).close();
verifyNoMoreInteractions(closeable);
} }
@Test(expected = RuntimeException.class) @Test(expected = RuntimeException.class)

View File

@ -434,6 +434,14 @@ public class DaprClientHttpTest {
assertNull(mono.block()); assertNull(mono.block());
} }
@Test(expected = IllegalArgumentException.class)
public void saveStateNullStateStoreName() {
daprHttp = new DaprHttp(3000, okHttpClient);
daprClientHttp = new DaprClientHttp(daprHttp);
Mono<Void> mono = daprClientHttp.saveStates(null, null);
assertNull(mono.block());
}
@Test @Test
public void saveStatesNull() { public void saveStatesNull() {
State<String> stateKeyValue = new State("value", "key", "", null); State<String> stateKeyValue = new State("value", "key", "", null);
@ -621,6 +629,13 @@ public class DaprClientHttpTest {
}); });
} }
@Test(expected = IllegalArgumentException.class)
public void getSecretsNullStoreName() {
daprHttp = new DaprHttp(3000, okHttpClient);
daprClientHttp = new DaprClientHttp(daprHttp);
daprClientHttp.getSecret(null, "key").block();
}
@Test @Test
public void getSecretsWithMetadata() { public void getSecretsWithMetadata() {
mockInterceptor.addRule() mockInterceptor.addRule()