Config API : HTTP APIs and Unsubscribe gRPC API changes (#740)

* Config API : Unsubscribe config items changes

Signed-off-by: pravinpushkar <ppushkar@microsoft.com>

* Unit and integration tests changes

Signed-off-by: pravinpushkar <ppushkar@microsoft.com>

* Addressing review comments

Signed-off-by: Pravin Pushkar <ppushkar@microsoft.com>

* GetConfiguration HTTP API impl

Signed-off-by: Pravin Pushkar <ppushkar@microsoft.com>

* Subscribe and Unsubscribe HTTP changes

Signed-off-by: Pravin Pushkar <ppushkar@microsoft.com>

* Subscribe config http changes

Signed-off-by: Pravin Pushkar <ppushkar@microsoft.com>

* Http unsubscribe and unit tests

Signed-off-by: Pravin Pushkar <ppushkar@microsoft.com>

* Integration tests for http config apis

Signed-off-by: Pravin Pushkar <ppushkar@microsoft.com>

* Fixing lint

Signed-off-by: Pravin Pushkar <ppushkar@microsoft.com>

* Chnaging rersponse of HTTP Unsubscribe API

Signed-off-by: Pravin Pushkar <ppushkar@microsoft.com>

* Fix integration test failure

Signed-off-by: Pravin Pushkar <ppushkar@microsoft.com>

* refactor IT tests

Signed-off-by: Pravin Pushkar <ppushkar@microsoft.com>

* code clean up

Signed-off-by: Pravin Pushkar <ppushkar@microsoft.com>

* setting dapr ref

Signed-off-by: Pravin Pushkar <ppushkar@microsoft.com>

* Update readme

Signed-off-by: Pravin Pushkar <ppushkar@microsoft.com>

* Modify Unit tests

Signed-off-by: Pravin Pushkar <ppushkar@microsoft.com>

* Remove failing buggy tests

Signed-off-by: Pravin Pushkar <ppushkar@microsoft.com>

* fix review commnets and integration tests

Signed-off-by: Pravin Pushkar <ppushkar@microsoft.com>

* trigger pr check

Signed-off-by: Pravin Pushkar <ppushkar@microsoft.com>

* Fix for getAll keys

Signed-off-by: Pravin Pushkar <ppushkar@microsoft.com>

* Fix subscribe All

Signed-off-by: Pravin Pushkar <ppushkar@microsoft.com>

* Move http subscribe handle to dapr controller

Signed-off-by: Pravin Pushkar <ppushkar@microsoft.com>

* Simplified examples

Signed-off-by: Pravin Pushkar <ppushkar@microsoft.com>

* change dapr ref, and add unit test

Signed-off-by: Pravin Pushkar <ppushkar@microsoft.com>

* trigger pr checks

Signed-off-by: Pravin Pushkar <ppushkar@microsoft.com>

* trigger pr checks

Signed-off-by: Pravin Pushkar <ppushkar@microsoft.com>

* Use biConsumer to register handlers

Signed-off-by: Pravin Pushkar <ppushkar@microsoft.com>

* Re use args contructors and re add final modifier

Signed-off-by: Pravin Pushkar <ppushkar@microsoft.com>

* put dapr run back

Signed-off-by: Pravin Pushkar <ppushkar@microsoft.com>

* fix nitpick

Signed-off-by: Pravin Pushkar <ppushkar@microsoft.com>

* Remove the springboot HTTP "convenience".

Signed-off-by: Artur Souza <artursouza.ms@outlook.com>

Signed-off-by: pravinpushkar <ppushkar@microsoft.com>
Signed-off-by: Pravin Pushkar <ppushkar@microsoft.com>
Signed-off-by: Artur Souza <artursouza.ms@outlook.com>
Co-authored-by: Artur Souza <artursouza.ms@outlook.com>
This commit is contained in:
Pravin Pushkar 2022-09-30 09:07:46 +05:30 committed by GitHub
parent 44f80c956d
commit 04e7298702
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 1433 additions and 210 deletions

View File

@ -25,11 +25,11 @@ jobs:
GOARCH: amd64
GOPROXY: https://proxy.golang.org
JDK_VER: ${{ matrix.java }}
DAPR_CLI_VER: 1.8.0-rc.1
DAPR_RUNTIME_VER: 1.8.0-rc.3
DAPR_CLI_VER: 1.8.1
DAPR_RUNTIME_VER: 1.8.4
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/v1.8.0-rc.1/install/install.sh
DAPR_CLI_REF:
DAPR_REF: 683a436ff1367e4ff2d27da2d79069c04ec2c46d
DAPR_REF: 0e34de1086c697d7d74ac2af14ddb11f9346b57e
steps:
- uses: actions/checkout@v3
- name: Set up OpenJDK ${{ env.JDK_VER }}

View File

@ -36,11 +36,11 @@ jobs:
GOARCH: amd64
GOPROXY: https://proxy.golang.org
JDK_VER: ${{ matrix.java }}
DAPR_CLI_VER: 1.8.0-rc.1
DAPR_RUNTIME_VER: 1.8.0-rc.3
DAPR_CLI_VER: 1.8.1
DAPR_RUNTIME_VER: 1.8.4
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/v1.8.0-rc.1/install/install.sh
DAPR_CLI_REF:
DAPR_REF: 683a436ff1367e4ff2d27da2d79069c04ec2c46d
DAPR_REF: 0e34de1086c697d7d74ac2af14ddb11f9346b57e
steps:
- uses: actions/checkout@v3
- name: Set up OpenJDK ${{ env.JDK_VER }}
@ -145,10 +145,14 @@ jobs:
working-directory: ./examples
run: |
mm.py ./src/main/java/io/dapr/examples/unittesting/README.md
- name: Validate Configuration API example
- name: Validate Configuration gRPC API example
working-directory: ./examples
run: |
mm.py ./src/main/java/io/dapr/examples/configuration/grpc/README.md
- name: Validate Configuration HTTP API example
working-directory: ./examples
run: |
mm.py ./src/main/java/io/dapr/examples/configuration/http/README.md
- name: Validate query state HTTP example
working-directory: ./examples
run: |

View File

@ -18,6 +18,8 @@ import io.dapr.client.DaprPreviewClient;
import io.dapr.client.domain.ConfigurationItem;
import io.dapr.client.domain.GetConfigurationRequest;
import io.dapr.client.domain.SubscribeConfigurationRequest;
import io.dapr.client.domain.SubscribeConfigurationResponse;
import io.dapr.client.domain.UnsubscribeConfigurationResponse;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -25,6 +27,7 @@ import reactor.core.publisher.Mono;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
@ -43,41 +46,8 @@ public class ConfigurationClient {
public static void main(String[] args) throws Exception {
try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) {
System.out.println("Using preview client...");
getConfigurationForaSingleKey(client);
getConfigurationsUsingVarargs(client);
getConfigurations(client);
subscribeConfigurationRequestWithSubscribe(client);
}
}
/**
* Gets configuration for a single key.
*
* @param client DaprPreviewClient object
*/
public static void getConfigurationForaSingleKey(DaprPreviewClient client) {
System.out.println("*******trying to retrieve configuration given a single key********");
try {
Mono<ConfigurationItem> item = client.getConfiguration(CONFIG_STORE_NAME, keys.get(0));
System.out.println("Value ->" + item.block().getValue() + " key ->" + item.block().getKey());
} catch (Exception ex) {
System.out.println(ex.getMessage());
}
}
/**
* Gets configurations for varibale no. of arguments.
*
* @param client DaprPreviewClient object
*/
public static void getConfigurationsUsingVarargs(DaprPreviewClient client) {
System.out.println("*******trying to retrieve configurations for a variable no. of keys********");
try {
Mono<Map<String, ConfigurationItem>> items =
client.getConfiguration(CONFIG_STORE_NAME, "myconfig1", "myconfig3");
items.block().forEach((k,v) -> print(v, k));
} catch (Exception ex) {
System.out.println(ex.getMessage());
subscribeConfigurationRequest(client);
}
}
@ -106,36 +76,25 @@ public class ConfigurationClient {
*
* @param client DaprPreviewClient object
*/
public static void subscribeConfigurationRequestWithSubscribe(DaprPreviewClient client) {
System.out.println("*****Subscribing to keys using subscribe method: " + keys.toString() + " *****");
AtomicReference<Disposable> disposableAtomicReference = new AtomicReference<>();
SubscribeConfigurationRequest req = new SubscribeConfigurationRequest(CONFIG_STORE_NAME, keys);
public static void subscribeConfigurationRequest(DaprPreviewClient client) {
System.out.println("Subscribing to key: myconfig1");
SubscribeConfigurationRequest req = new SubscribeConfigurationRequest(
CONFIG_STORE_NAME, Collections.singletonList("myconfig1"));
Flux<SubscribeConfigurationResponse> outFlux = client.subscribeConfiguration(req);
Runnable subscribeTask = () -> {
Flux<Map<String, ConfigurationItem>> outFlux = client.subscribeToConfiguration(req);
disposableAtomicReference.set(outFlux
.subscribe(
cis -> cis.forEach((k,v) -> print(v, k))
));
outFlux.subscribe(cis -> {
System.out.println("subscription ID : " + cis.getSubscriptionId());
System.out.println("subscribing to key myconfig1 is successful");
});
};
new Thread(subscribeTask).start();
// To ensure main thread does not die before outFlux subscribe gets called
inducingSleepTime(5000);
}
private static void inducingSleepTime(int timeInMillis) {
try {
// To ensure that subscribeThread gets scheduled
Thread.sleep(0);
} catch (InterruptedException e) {
e.printStackTrace();
}
Runnable updateKeys = () -> {
int i = 1;
while (i <= 3) {
executeDockerCommand(i);
i++;
}
};
new Thread(updateKeys).start();
try {
// To ensure main thread does not die before outFlux subscribe gets called
Thread.sleep(10000);
disposableAtomicReference.get().dispose();
Thread.sleep(timeInMillis);
} catch (InterruptedException e) {
e.printStackTrace();
}
@ -144,22 +103,4 @@ public class ConfigurationClient {
private static void print(ConfigurationItem item, String key) {
System.out.println(item.getValue() + " : key ->" + key);
}
private static void executeDockerCommand(int postfix) {
String[] command = new String[] {
"docker", "exec", "dapr_redis", "redis-cli",
"SET",
"myconfig" + postfix, "update_myconfigvalue" + postfix + "||2"
};
ProcessBuilder processBuilder = new ProcessBuilder(command);
Process process = null;
try {
process = processBuilder.start();
process.waitFor();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

View File

@ -7,7 +7,8 @@ This example provides the different capabilities provided by Dapr Java SDK for C
The java SDK exposes several methods for this -
* `client.getConfiguration(...)` for getting a configuration for a single/multiple keys.
* `client.subscribeToConfigurations(...)` for subscribing to a list of keys for any change.
* `client.subscribeConfiguration(...)` for subscribing to a list of keys for any change.
* `client.unsubscribeConfiguration(...)` for unsubscribing to changes from subscribed items.
## Pre-requisites
@ -48,6 +49,81 @@ docker exec dapr_redis redis-cli MSET myconfig1 "val1||1" myconfig2 "val2||1" my
### Running the example
This example uses the Java SDK Dapr client in order to **Get, Subscribe and Unsubscribe** from configuration items and utilizes `Redis` as configuration store.
`ConfigurationClient.java` is the example class demonstrating all 3 features.
Kindly check [DaprPreviewClient.java](https://github.com/dapr/java-sdk/blob/master/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java) for detailed description of the supported APIs.
```java
public class ConfigurationClient {
// ...
/**
* Executes various methods to check the different apis.
* @param args arguments
* @throws Exception throws Exception
*/
public static void main(String[] args) throws Exception {
try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) {
System.out.println("Using preview client...");
getConfigurations(client);
subscribeConfigurationRequestWithSubscribe(client);
unsubscribeConfigurationItems(client);
}
}
/**
* Gets configurations for a list of keys.
*
* @param client DaprPreviewClient object
*/
public static void getConfigurations(DaprPreviewClient client) {
System.out.println("*******trying to retrieve configurations for a list of keys********");
List<String> keys = new ArrayList<>();
// ...
GetConfigurationRequest req = new GetConfigurationRequest(CONFIG_STORE_NAME, keys);
try {
Mono<List<ConfigurationItem>> items = client.getConfiguration(req);
// ..
} catch (Exception ex) {}
}
/**
* Subscribe to a list of keys.Optional to above iterator way of retrieving the changes
*
* @param client DaprPreviewClient object
*/
public static void subscribeConfigurationRequestWithSubscribe(DaprPreviewClient client) {
System.out.println("Subscribing to key: myconfig1");
// ...
Runnable subscribeTask = () -> {
Flux<SubscribeConfigurationResponse> outFlux = client.subscribeConfiguration(req);
// ...
};
// ..
}
/**
* Unsubscribe using subscription id.
*
* @param client DaprPreviewClient object
*/
public static void unsubscribeConfigurationItems(DaprPreviewClient client) {
System.out.println("Subscribing to key: myconfig2");
// ..
Runnable subscribeTask = () -> {
Flux<SubscribeConfigurationResponse> outFlux = client.subscribeConfiguration(CONFIG_STORE_NAME, "myconfig2");
// ...
};
// ...
UnsubscribeConfigurationResponse res = client.unsubscribeConfiguration(
subscriptionId.get(),
CONFIG_STORE_NAME
).block();
// ..
}
}
```
Get into the examples' directory:
```sh
cd examples
@ -59,21 +135,16 @@ Use the following command to run this example-
name: Run ConfigurationClient example
expected_stdout_lines:
- "== APP == Using preview client..."
- "== APP == *******trying to retrieve configuration given a single key********"
- "== APP == Value ->val1 key ->myconfig1"
- "== APP == *******trying to retrieve configurations for a variable no. of keys********"
- "== APP == val1 : key ->myconfig1"
- "== APP == val3 : key ->myconfig3"
- "== APP == *******trying to retrieve configurations for a list of keys********"
- "== APP == val1 : key ->myconfig1"
- "== APP == val2 : key ->myconfig2"
- "== APP == val3 : key ->myconfig3"
- "== APP == *****Subscribing to keys using subscribe method: [myconfig1, myconfig3, myconfig2] *****"
- "== APP == update_myconfigvalue1 : key ->myconfig1"
- "== APP == update_myconfigvalue2 : key ->myconfig2"
- "== APP == update_myconfigvalue3 : key ->myconfig3"
- "== APP == Subscribing to key: myconfig1"
- "== APP == subscription ID :"
- "== APP == subscribing to key myconfig1 is successful"
background: true
sleep: 5
output_match_mode: substring
sleep: 10
-->
```bash
@ -85,19 +156,14 @@ dapr run --components-path ./components/configuration --app-id configgrpc --log-
### Sample output
```
== APP == Using preview client...
== APP == *******trying to retrieve configuration given a single key********
== APP == Value ->val1 key ->myconfig1
== APP == *******trying to retrieve configurations for a variable no. of keys********
== APP == val1 : key ->myconfig1
== APP == val3 : key ->myconfig3
== APP == *******trying to retrieve configurations for a list of keys********
== APP == val1 : key ->myconfig1
== APP == val2 : key ->myconfig2
== APP == val3 : key ->myconfig3
== APP == *****Subscribing to keys using subscribe method: [myconfig1, myconfig3, myconfig2] *****
== APP == update_myconfigvalue1 : key ->myconfig1
== APP == update_myconfigvalue2 : key ->myconfig2
== APP == update_myconfigvalue3 : key ->myconfig3
== APP == Subscribing to key: myconfig1
== APP == subscription ID : 82bb8e24-f69d-477a-9126-5ffaf995f498
== APP == subscribing to key myconfig1 is successful
```
### Cleanup

View File

@ -0,0 +1,102 @@
/*
* Copyright 2022 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.examples.configuration.http;
import io.dapr.client.DaprApiProtocol;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DaprPreviewClient;
import io.dapr.client.domain.ConfigurationItem;
import io.dapr.client.domain.GetConfigurationRequest;
import io.dapr.client.domain.SubscribeConfigurationRequest;
import io.dapr.client.domain.SubscribeConfigurationResponse;
import io.dapr.config.Properties;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class ConfigurationClient {
private static final String CONFIG_STORE_NAME = "configstore";
private static String SUBSCRIPTION_ID;
/**
* Executes various methods to check the different apis.
* @param args arguments
* @throws Exception throws Exception
*/
public static void main(String[] args) throws Exception {
System.getProperties().setProperty(Properties.API_PROTOCOL.getName(), DaprApiProtocol.HTTP.name());
try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) {
System.out.println("Using preview client...");
getConfigurations(client);
subscribeConfigurationRequest(client);
}
}
/**
* Gets configurations for a list of keys.
*
* @param client DaprPreviewClient object
*/
public static void getConfigurations(DaprPreviewClient client) {
System.out.println("*******trying to retrieve configurations for a list of keys********");
List<String> keys = new ArrayList<>();
keys.add("myconfig1");
keys.add("myconfig2");
keys.add("myconfig3");
Map<String, String> hmap = new HashMap<>();
hmap.put("meta_key","meta_value");
GetConfigurationRequest req = new GetConfigurationRequest(CONFIG_STORE_NAME, keys);
req.setMetadata(hmap);
try {
Mono<Map<String, ConfigurationItem>> items = client.getConfiguration(req);
items.block().forEach((k,v) -> print(v, k));
} catch (Exception ex) {
System.out.println(ex.getMessage());
}
}
/**
* Subscribe to a list of keys.
*
* @param client DaprPreviewClient object
*/
public static void subscribeConfigurationRequest(DaprPreviewClient client) throws InterruptedException {
System.out.println("Subscribing to key: myconfig2");
SubscribeConfigurationRequest req = new SubscribeConfigurationRequest(
CONFIG_STORE_NAME, Collections.singletonList("myconfig2"));
Flux<SubscribeConfigurationResponse> outFlux = client.subscribeConfiguration(req);
outFlux.subscribe(
cis -> {
SUBSCRIPTION_ID = cis.getSubscriptionId();
});
if (!SUBSCRIPTION_ID.isEmpty()) {
System.out.println("subscribing to myconfig2 is successful");
} else {
System.out.println("error in subscribing to myconfig2");
}
Thread.sleep(5000);
}
private static void print(ConfigurationItem item, String key) {
System.out.println(item.getValue() + " : key ->" + key);
}
}

View File

@ -0,0 +1,64 @@
/*
* Copyright 2022 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.examples.configuration.http;
import io.dapr.client.domain.SubscribeConfigurationResponse;
import io.dapr.examples.DaprApplication;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
/**
* A sample springboot application to register and receive for updates on configuration items sent by Dapr.
* Users are free to write their own controllers to handle any specific route suited to the need.
* java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.configuration.http.ConfigurationSubscriber -p 3009
*/
@RestController
public class ConfigurationHandler {
/**
* Receives a subscription change notification.
* @param configStore Path variables for post call
* @param key Key whose value has changed
* @param response Configuration response
*/
@PostMapping(path = "/configuration/{configStore}/{key}", produces = MediaType.ALL_VALUE)
public void handleConfigUpdate(@PathVariable("configStore") String configStore,
@PathVariable("key") String key,
@RequestBody SubscribeConfigurationResponse response) {
System.out.println("Configuration update received for store: " + configStore);
response.getItems().forEach((k,v) -> System.out.println("Key: " + k + " Value :" + v.getValue()));
}
/**
* This is entry point for Configuration Subscriber service.
* @param args Arguments for main
* @throws Exception Throws Exception
*/
public static void main(String[] args) throws Exception {
Options options = new Options();
options.addRequiredOption("p", "port", true, "The port this app will listen on");
CommandLineParser parser = new DefaultParser();
CommandLine cmd = parser.parse(options, args);
// If port string is not valid, it will throw an exception.
int port = Integer.parseInt(cmd.getOptionValue("port"));
DaprApplication.start(port);
}
}

View File

@ -0,0 +1,208 @@
## Retrieve Configurations via Configuration API
This example provides the different capabilities provided by Dapr Java SDK for Configuration. For further information about Configuration APIs please refer to [this link](https://docs.dapr.io/developing-applications/building-blocks/configuration/)
**This API is available in Preview Mode**.
### Using the ConfigurationAPI
The java SDK exposes several methods for this -
* `client.getConfiguration(...)` for getting a configuration for a single/multiple keys.
* `client.subscribeConfiguration(...)` for subscribing to a list of keys for any change.
* `client.unsubscribeConfiguration(...)` for unsubscribing to changes from subscribed items.
## Pre-requisites
* [Dapr and Dapr Cli](https://docs.dapr.io/getting-started/install-dapr/).
* Java JDK 11 (or greater):
* [Microsoft JDK 11](https://docs.microsoft.com/en-us/java/openjdk/download#openjdk-11)
* [Oracle JDK 11](https://www.oracle.com/technetwork/java/javase/downloads/index.html#JDK11)
* [OpenJDK 11](https://jdk.java.net/11/)
* [Apache Maven](https://maven.apache.org/install.html) version 3.x.
### Checking out the code
Clone this repository:
```sh
git clone https://github.com/dapr/java-sdk.git
cd java-sdk
```
Then build the Maven project:
```sh
# make sure you are in the `java-sdk` directory.
mvn install
```
## Store few dummy configurations in configurationstore
<!-- STEP
name: Set configuration value
expected_stdout_lines:
- "OK"
timeout_seconds: 20
-->
```bash
docker exec dapr_redis redis-cli MSET myconfig1 "val1||1" myconfig2 "val2||1" myconfig3 "val3||1"
```
<!-- END_STEP -->
### Running the example
This example uses the Java SDK Dapr client in order to **Get, Subscribe and Unsubscribe** from configuration items and utilizes `Redis` as configuration store.
`ConfigurationClient.java` is the example class demonstrating all 3 features.
Kindly check [DaprPreviewClient.java](https://github.com/dapr/java-sdk/blob/master/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java) for detailed description of the supported APIs.
```java
public class ConfigurationClient {
// ...
/**
* Executes various methods to check the different apis.
* @param args arguments
* @throws Exception throws Exception
*/
public static void main(String[] args) throws Exception {
System.getProperties().setProperty(Properties.API_PROTOCOL.getName(), DaprApiProtocol.HTTP.name());
try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) {
System.out.println("Using preview client...");
getConfigurations(client);
subscribeConfigurationRequest(client);
}
}
/**
* Gets configurations for a list of keys.
*
* @param client DaprPreviewClient object
*/
public static void getConfigurations(DaprPreviewClient client) {
System.out.println("*******trying to retrieve configurations for a list of keys********");
GetConfigurationRequest req = new GetConfigurationRequest(CONFIG_STORE_NAME, keys);
try {
Mono<List<ConfigurationItem>> items = client.getConfiguration(req);
// ...
} catch (Exception ex) {
// ...
}
}
/**
* Subscribe to a list of keys.
*
* @param client DaprPreviewClient object
*/
public static void subscribeConfigurationRequest(DaprPreviewClient client) {
// ...
SubscribeConfigurationRequest req = new SubscribeConfigurationRequest(
CONFIG_STORE_NAME, Collections.singletonList("myconfig2"));
Runnable subscribeTask = () -> {
Flux<SubscribeConfigurationResponse> outFlux = client.subscribeConfiguration(req);
// ...
};
// ..
}
}
```
Get into the examples' directory:
```sh
cd examples
```
#### Running the configuration subscriber app:
`DaprApplication.start()` Method will run a Spring Boot application containing a `ConfigurationHandler`, a contoller
to receive configuration change notifications.
```java
@RestController
public class ConfigurationHandler {
//...
@PostMapping(path = "/configuration/{configStore}/{key}", produces = MediaType.ALL_VALUE)
public void handleConfigUpdate(@PathVariable("configStore") String configStore,
@PathVariable("key") String key,
@RequestBody SubscribeConfigurationResponse response) {
System.out.println("Configuration update received for store: " + configStore);
response.getItems().forEach((k,v) -> System.out.println("Key: " + k + " Value :" + v.getValue()));
}
//....
}
```
Execute the following script to run the ConfigSubscriber app:
<!-- STEP
name: Run ConfigurationHandler
expected_stdout_lines:
- '== APP == Configuration update received for store: configstore'
- '== APP == Key: myconfig2 Value :updated_val2'
background: true
output_match_mode: substring
background: true
sleep: 5
-->
```bash
dapr run --app-id confighandler -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.configuration.http.ConfigurationHandler -p 3009
```
<!-- END_STEP -->
#### Running the ConfigurationClient app:
Use the following command to run this example-
<!-- STEP
name: Run ConfigurationClient example
expected_stdout_lines:
- "== APP == Using preview client..."
- "== APP == *******trying to retrieve configurations for a list of keys********"
- "== APP == val1 : key ->myconfig1"
- "== APP == val2 : key ->myconfig2"
- "== APP == val3 : key ->myconfig3"
- "== APP == Subscribing to key: myconfig2"
- "== APP == subscribing to myconfig2 is successful"
background: true
output_match_mode: substring
sleep: 10
-->
```bash
dapr run --components-path ./components/configuration --app-id confighttp --log-level debug --app-port 3009 --dapr-http-port 3500 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.configuration.http.ConfigurationClient
```
#### Update myconfig2 key in configurationstore
<!-- END_STEP -->
<!-- STEP
name: Update configuration value
timeout_seconds: 20
-->
```bash
docker exec dapr_redis redis-cli MSET myconfig2 "updated_val2||1"
```
<!-- END_STEP -->
### Sample output
```
== APP == Using preview client...
== APP == *******trying to retrieve configurations for a list of keys********
== APP == val1 : key ->myconfig1
== APP == val2 : key ->myconfig2
== APP == val3 : key ->myconfig3
== APP == Subscribing to key: myconfig2
== APP == subscribing to myconfig2 is successful
```
### Cleanup
To stop the app, run (or press CTRL+C):
<!-- STEP
name: Cleanup
-->
```bash
dapr stop --app-id confighttp
dapr stop --app-id confighandler
```
<!-- END_STEP -->

View File

@ -13,8 +13,6 @@ limitations under the License.
package io.dapr.springboot;
import io.dapr.Rule;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -24,7 +22,6 @@ import java.util.stream.Collectors;
* Internal Singleton to handle Dapr configuration.
*/
class DaprRuntime {
/**
* The singleton instance.
*/
@ -63,7 +60,8 @@ class DaprRuntime {
*
* @param pubsubName Pubsub name to subcribe to.
* @param topicName Name of the topic being subscribed to.
* @param rule The optional rule for this route.
* @param match Match expression for this route.
* @param priority Priority for this match relative to others.
* @param route Destination route for requests.
* @param metadata Metadata for extended subscription functionality.
*/

View File

@ -16,6 +16,8 @@ package io.dapr.it.configuration.grpc;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DaprPreviewClient;
import io.dapr.client.domain.ConfigurationItem;
import io.dapr.client.domain.SubscribeConfigurationResponse;
import io.dapr.client.domain.UnsubscribeConfigurationResponse;
import io.dapr.it.BaseIT;
import io.dapr.it.DaprRun;
@ -25,7 +27,6 @@ import reactor.core.publisher.Flux;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.*;
@ -81,13 +82,6 @@ public class ConfigurationClientIT extends BaseIT {
assertEquals(ci.getValue(), "myconfigvalue1");
}
@Test
public void getConfigurationWithEmptyKey() {
assertThrows(IllegalArgumentException.class, () -> {
daprPreviewClient.getConfiguration(CONFIG_STORE_NAME, "").block();
});
}
@Test
public void getConfigurations() {
Map<String, ConfigurationItem> cis = daprPreviewClient.getConfiguration(CONFIG_STORE_NAME, "myconfigkey1", "myconfigkey2").block();
@ -98,27 +92,19 @@ public class ConfigurationClientIT extends BaseIT {
}
@Test
public void getConfigurationsWithEmptyList() {
List<String> listOfKeys = new ArrayList<>();
Map<String, String> metadata = new HashMap<>();
assertThrows(IllegalArgumentException.class, () -> {
daprPreviewClient.getConfiguration(CONFIG_STORE_NAME, listOfKeys, metadata).block();
});
}
@Test
public void subscribeToConfiguration() {
List<String> updatedValues = new ArrayList<>();
AtomicReference<Disposable> disposable = new AtomicReference<>();
public void subscribeConfiguration() {
Runnable subscribeTask = () -> {
Flux<Map<String, ConfigurationItem>> outFlux = daprPreviewClient
.subscribeToConfiguration(CONFIG_STORE_NAME, "myconfigkey1", "myconfigkey2");
disposable.set(outFlux.subscribe(update -> {
updatedValues.add(update.entrySet()
.stream()
.findFirst()
.get().getValue().getValue());
}));
Flux<SubscribeConfigurationResponse> outFlux = daprPreviewClient
.subscribeConfiguration(CONFIG_STORE_NAME, "myconfigkey1", "myconfigkey2");
outFlux.subscribe(update -> {
if (update.getItems().size() == 0 ) {
assertTrue(update.getSubscriptionId().length() > 0);
} else {
String value = update.getItems().entrySet().stream().findFirst().get().getValue().getValue();
assertEquals(update.getItems().size(), 1);
assertTrue(value.contains("update_"));
}
});
};
Thread subscribeThread = new Thread(subscribeTask);
subscribeThread.start();
@ -134,15 +120,73 @@ public class ConfigurationClientIT extends BaseIT {
new Thread(updateKeys).start();
try {
// To ensure main thread does not die before outFlux subscribe gets called
Thread.sleep(3000);
disposable.get().dispose();
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Test
public void unsubscribeConfigurationItems() {
List<String> updatedValues = new ArrayList<>();
AtomicReference<Disposable> disposableAtomicReference = new AtomicReference<>();
AtomicReference<String> subscriptionId = new AtomicReference<>();
Runnable subscribeTask = () -> {
Flux<SubscribeConfigurationResponse> outFlux = daprPreviewClient
.subscribeConfiguration(CONFIG_STORE_NAME, "myconfigkey1");
disposableAtomicReference.set(outFlux
.subscribe(update -> {
subscriptionId.set(update.getSubscriptionId());
updatedValues.add(update.getItems().entrySet().stream().findFirst().get().getValue().getValue());
}
));
};
new Thread(subscribeTask).start();
// To ensure that subscribeThread gets scheduled
inducingSleepTime(0);
Runnable updateKeys = () -> {
int i = 1;
while (i <= 5) {
String[] command = new String[] {
"docker", "exec", "dapr_redis", "redis-cli",
"SET",
"myconfigkey1", "update_myconfigvalue" + i + "||2"
};
executeDockerCommand(command);
i++;
}
};
new Thread(updateKeys).start();
// To ensure key starts getting updated
inducingSleepTime(1000);
UnsubscribeConfigurationResponse res = daprPreviewClient.unsubscribeConfiguration(
subscriptionId.get(),
CONFIG_STORE_NAME
).block();
assertTrue(res != null);
assertTrue(res.getIsUnsubscribed());
int listSize = updatedValues.size();
// To ensure main thread does not die
inducingSleepTime(1000);
new Thread(updateKeys).start();
// To ensure main thread does not die
inducingSleepTime(2000);
assertTrue(updatedValues.size() == listSize);
}
private static void inducingSleepTime(int timeInMillis) {
try {
Thread.sleep(timeInMillis);
} catch (InterruptedException e) {
e.printStackTrace();
}
assertEquals(updatedValues.size(), 2);
assertTrue(updatedValues.contains("update_myconfigvalue1"));
assertTrue(updatedValues.contains("update_myconfigvalue2"));
assertFalse(updatedValues.contains("update_myconfigvalue3"));
}
private static void executeDockerCommand(String[] command) {

View File

@ -0,0 +1,60 @@
/*
* Copyright 2022 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.it.configuration.http;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.client.domain.ConfigurationItem;
import io.dapr.client.domain.SubscribeConfigurationResponse;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
import java.util.Iterator;
import java.util.Map;
/**
* Spring boot Controller class for api endpoints.
*/
@RestController
public class ConfigSubscriberController {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
/**
* Api mapping for subscribe configuration.
* @param pathVarsMap Path variables for post call
* @param obj request Body
* @return Returns void
*/
@PostMapping(path = "/configuration/{configStore}/{key}", produces = MediaType.APPLICATION_JSON_VALUE)
public Mono<Void> handleMessage(
@PathVariable Map<String, String> pathVarsMap,
@RequestBody SubscribeConfigurationResponse obj) {
return Mono.fromRunnable(
() -> {
try {
Map<String, ConfigurationItem> items = obj.getItems();
for (Map.Entry<String, ConfigurationItem> entry : items.entrySet()) {
System.out.println(entry.getValue().getValue() + " : key ->" + entry.getKey());
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
);
}
}

View File

@ -0,0 +1,82 @@
package io.dapr.it.configuration.http;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DaprPreviewClient;
import io.dapr.client.domain.ConfigurationItem;
import io.dapr.it.BaseIT;
import io.dapr.it.DaprRun;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.util.*;
import static org.junit.Assert.*;
import static org.junit.Assert.assertTrue;
public class ConfigurationIT extends BaseIT {
private static final String CONFIG_STORE_NAME = "redisconfigstore";
private static DaprRun daprRun;
private static DaprPreviewClient daprPreviewClient;
private static String key = "myconfig1";
private static List<String> keys = new ArrayList<>(Arrays.asList("myconfig1", "myconfig2", "myconfig3"));
private static String[] insertCmd = new String[] {
"docker", "exec", "dapr_redis", "redis-cli",
"MSET",
"myconfigkey1", "myconfigvalue1||1",
"myconfigkey2", "myconfigvalue2||1",
"myconfigkey3", "myconfigvalue3||1"
};
@BeforeClass
public static void init() throws Exception {
daprRun = startDaprApp(ConfigurationIT.class.getSimpleName(), 5000);
daprRun.switchToHTTP();
daprPreviewClient = new DaprClientBuilder().buildPreviewClient();
}
@AfterClass
public static void tearDown() throws Exception {
daprPreviewClient.close();
}
@Before
public void setupConfigStore() {
executeDockerCommand(insertCmd);
}
@Test
public void getConfiguration() {
ConfigurationItem ci = daprPreviewClient.getConfiguration(CONFIG_STORE_NAME, "myconfigkey1").block();
assertEquals(ci.getKey(), "myconfigkey1");
assertEquals(ci.getValue(), "myconfigvalue1");
}
@Test
public void getConfigurations() {
Map<String, ConfigurationItem> cis = daprPreviewClient.getConfiguration(CONFIG_STORE_NAME, "myconfigkey1", "myconfigkey2").block();
assertTrue(cis.size() == 2);
assertTrue(cis.containsKey("myconfigkey1"));
assertTrue(cis.containsKey("myconfigkey2"));
}
private static void executeDockerCommand(String[] command) {
ProcessBuilder processBuilder = new ProcessBuilder(command);
Process process = null;
try {
process = processBuilder.start();
process.waitFor();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

View File

@ -0,0 +1,91 @@
package io.dapr.it.configuration.http;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DaprPreviewClient;
import io.dapr.client.domain.SubscribeConfigurationResponse;
import io.dapr.client.domain.UnsubscribeConfigurationResponse;
import io.dapr.it.BaseIT;
import io.dapr.it.DaprRun;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import reactor.core.publisher.Flux;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertTrue;
public class ConfigurationSubscribeIT extends BaseIT {
private static final String CONFIG_STORE_NAME = "redisconfigstore";
private static DaprRun daprRun;
private static DaprPreviewClient daprPreviewClient;
private static String key = "myconfig1";
private static List<String> keys = new ArrayList<>(Arrays.asList("myconfig1", "myconfig2", "myconfig3"));
private static String[] insertCmd = new String[] {
"docker", "exec", "dapr_redis", "redis-cli",
"MSET",
"myconfigkey1", "myconfigvalue1||1",
"myconfigkey2", "myconfigvalue2||1",
"myconfigkey3", "myconfigvalue3||1"
};
@BeforeClass
public static void init() throws Exception {
daprRun = startDaprApp(
ConfigurationIT.class.getSimpleName(),
ConfigurationSubscriberService.SUCCESS_MESSAGE,
ConfigurationSubscriberService.class,
true,
60000);
daprRun.switchToHTTP();
daprPreviewClient = new DaprClientBuilder().buildPreviewClient();
}
@AfterClass
public static void tearDown() throws Exception {
daprPreviewClient.close();
}
@Before
public void setupConfigStore() {
executeDockerCommand(insertCmd);
}
@Test
public void subscribeAndUnsubscribeConfiguration() {
AtomicReference<String> subId= new AtomicReference<>("");
Flux<SubscribeConfigurationResponse> outFlux = daprPreviewClient
.subscribeConfiguration(CONFIG_STORE_NAME, "myconfigkey1", "myconfigkey2");
outFlux.subscribe(items -> {
subId.set(items.getSubscriptionId());
});
assertTrue(subId.get().length() > 0);
UnsubscribeConfigurationResponse res = daprPreviewClient.unsubscribeConfiguration(
subId.get(),
CONFIG_STORE_NAME
).block();
assertTrue(res.getIsUnsubscribed());
}
private static void executeDockerCommand(String[] command) {
ProcessBuilder processBuilder = new ProcessBuilder(command);
Process process = null;
try {
process = processBuilder.start();
process.waitFor();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

View File

@ -0,0 +1,52 @@
/*
* Copyright 2022 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.it.configuration.http;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Service for ConfigurationSubscriber.
* dapr run --components-path ./components/configuration --app-id configsubscriber --app-port 3000 -- \
* java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.configuration.http.ConfigurationSubscriber -p 3000
*/
@SpringBootApplication
public class ConfigurationSubscriberService {
public static final String SUCCESS_MESSAGE = "dapr initialized. Status: Running";
/**
* This is entry point for Configuration Subscriber service.
* @param args Arguments for main
* @throws Exception Throws Exception
*/
public static void main(String[] args) throws Exception {
int port = Integer.parseInt(args[0]);
System.out.printf("Service starting on port %d ...\n", port);
// Start Dapr's callback endpoint.
start(port);
}
/**
* Starts Dapr's callback in a given port.
*
* @param port Port to listen to.
*/
private static void start(int port) {
SpringApplication app = new SpringApplication(ConfigurationSubscriberService.class);
app.run(String.format("--server.port=%d", port));
}
}

View File

@ -32,7 +32,10 @@ import io.dapr.client.domain.SaveStateRequest;
import io.dapr.client.domain.State;
import io.dapr.client.domain.StateOptions;
import io.dapr.client.domain.SubscribeConfigurationRequest;
import io.dapr.client.domain.SubscribeConfigurationResponse;
import io.dapr.client.domain.TransactionalStateOperation;
import io.dapr.client.domain.UnsubscribeConfigurationRequest;
import io.dapr.client.domain.UnsubscribeConfigurationResponse;
import io.dapr.client.domain.query.Query;
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.utils.TypeRef;
@ -543,22 +546,32 @@ abstract class AbstractDaprClient implements DaprClient, DaprPreviewClient {
/**
* {@inheritDoc}
*/
public Flux<Map<String, ConfigurationItem>> subscribeToConfiguration(String storeName, String... keys) {
public Flux<SubscribeConfigurationResponse> subscribeConfiguration(String storeName, String... keys) {
List<String> listOfKeys = filterEmptyKeys(keys);
SubscribeConfigurationRequest request = new SubscribeConfigurationRequest(storeName, listOfKeys);
return this.subscribeToConfiguration(request);
return this.subscribeConfiguration(request);
}
/**
* {@inheritDoc}
*/
public Flux<Map<String, ConfigurationItem>> subscribeToConfiguration(
public Flux<SubscribeConfigurationResponse> subscribeConfiguration(
String storeName,
List<String> keys,
Map<String, String> metadata) {
SubscribeConfigurationRequest request = new SubscribeConfigurationRequest(storeName, keys);
request.setMetadata(metadata);
return this.subscribeToConfiguration(request);
return this.subscribeConfiguration(request);
}
/**
* {@inheritDoc}
*/
public Mono<UnsubscribeConfigurationResponse> unsubscribeConfiguration(
String id,
String storeName) {
UnsubscribeConfigurationRequest request = new UnsubscribeConfigurationRequest(id, storeName);
return this.unsubscribeConfiguration(request);
}
private List<String> filterEmptyKeys(String... keys) {

View File

@ -36,7 +36,10 @@ import io.dapr.client.domain.SaveStateRequest;
import io.dapr.client.domain.State;
import io.dapr.client.domain.StateOptions;
import io.dapr.client.domain.SubscribeConfigurationRequest;
import io.dapr.client.domain.SubscribeConfigurationResponse;
import io.dapr.client.domain.TransactionalStateOperation;
import io.dapr.client.domain.UnsubscribeConfigurationRequest;
import io.dapr.client.domain.UnsubscribeConfigurationResponse;
import io.dapr.config.Properties;
import io.dapr.exceptions.DaprException;
import io.dapr.internal.opencensus.GrpcWrapper;
@ -776,9 +779,7 @@ public class DaprClientGrpc extends AbstractDaprClient {
if ((configurationStoreName == null) || (configurationStoreName.trim().isEmpty())) {
throw new IllegalArgumentException("Configuration Store Name cannot be null or empty.");
}
if (keys.isEmpty()) {
throw new IllegalArgumentException("Keys can not be empty or null");
}
DaprProtos.GetConfigurationRequest.Builder builder = DaprProtos.GetConfigurationRequest.newBuilder()
.setStoreName(configurationStoreName).addAllKeys(keys);
if (metadata != null) {
@ -816,7 +817,7 @@ public class DaprClientGrpc extends AbstractDaprClient {
* {@inheritDoc}
*/
@Override
public Flux<Map<String, ConfigurationItem>> subscribeToConfiguration(SubscribeConfigurationRequest request) {
public Flux<SubscribeConfigurationResponse> subscribeConfiguration(SubscribeConfigurationRequest request) {
try {
final String configurationStoreName = request.getStoreName();
final List<String> keys = request.getKeys();
@ -825,12 +826,12 @@ public class DaprClientGrpc extends AbstractDaprClient {
if (configurationStoreName == null || (configurationStoreName.trim().isEmpty())) {
throw new IllegalArgumentException("Configuration Store Name can not be null or empty.");
}
if (keys.isEmpty()) {
throw new IllegalArgumentException("Keys can not be null or empty.");
}
// keys can and empty list for subscribe all scenario, so we do not need check for empty keys.
DaprProtos.SubscribeConfigurationRequest.Builder builder = DaprProtos.SubscribeConfigurationRequest.newBuilder()
.setStoreName(configurationStoreName)
.addAllKeys(keys);
if (metadata != null) {
builder.putAllMetadata(metadata);
}
@ -841,12 +842,12 @@ public class DaprClientGrpc extends AbstractDaprClient {
).map(
it -> {
Map<String, ConfigurationItem> configMap = new HashMap<>();
Iterator<Map.Entry<String, CommonProtos.ConfigurationItem>> itr = it.getItems().entrySet().iterator();
Iterator<Map.Entry<String, CommonProtos.ConfigurationItem>> itr = it.getItemsMap().entrySet().iterator();
while (itr.hasNext()) {
Map.Entry<String, CommonProtos.ConfigurationItem> entry = itr.next();
configMap.put(entry.getKey(), buildConfigurationItem(entry.getValue(), entry.getKey()));
}
return Collections.unmodifiableMap(configMap);
return new SubscribeConfigurationResponse(it.getId(), Collections.unmodifiableMap(configMap));
}
);
} catch (Exception ex) {
@ -854,6 +855,38 @@ public class DaprClientGrpc extends AbstractDaprClient {
}
}
/**
* {@inheritDoc}
*/
@Override
public Mono<UnsubscribeConfigurationResponse> unsubscribeConfiguration(UnsubscribeConfigurationRequest request) {
try {
final String configurationStoreName = request.getStoreName();
final String id = request.getSubscriptionId();
if (configurationStoreName == null || (configurationStoreName.trim().isEmpty())) {
throw new IllegalArgumentException("Configuration Store Name can not be null or empty.");
}
if (id.isEmpty()) {
throw new IllegalArgumentException("Subscription id can not be null or empty.");
}
DaprProtos.UnsubscribeConfigurationRequest.Builder builder =
DaprProtos.UnsubscribeConfigurationRequest.newBuilder()
.setId(id)
.setStoreName(configurationStoreName);
DaprProtos.UnsubscribeConfigurationRequest envelope = builder.build();
return this.<DaprProtos.UnsubscribeConfigurationResponse>createMono(
it -> intercept(asyncStub).unsubscribeConfigurationAlpha1(envelope, it)
).map(
it -> new UnsubscribeConfigurationResponse(it.getOk(), it.getMessage())
);
} catch (Exception ex) {
return DaprException.wrapMono(ex);
}
}
/**
* Build a new Configuration Item from provided parameter.
*

View File

@ -34,8 +34,11 @@ import io.dapr.client.domain.SaveStateRequest;
import io.dapr.client.domain.State;
import io.dapr.client.domain.StateOptions;
import io.dapr.client.domain.SubscribeConfigurationRequest;
import io.dapr.client.domain.SubscribeConfigurationResponse;
import io.dapr.client.domain.TransactionalStateOperation;
import io.dapr.client.domain.TransactionalStateRequest;
import io.dapr.client.domain.UnsubscribeConfigurationRequest;
import io.dapr.client.domain.UnsubscribeConfigurationResponse;
import io.dapr.config.Properties;
import io.dapr.exceptions.DaprException;
import io.dapr.serializer.DaprObjectSerializer;
@ -54,6 +57,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@ -768,15 +772,150 @@ public class DaprClientHttp extends AbstractDaprClient {
*/
@Override
public Mono<Map<String, ConfigurationItem>> getConfiguration(GetConfigurationRequest request) {
return DaprException.wrapMono(new UnsupportedOperationException());
try {
final String configurationStoreName = request.getStoreName();
final List<String> keys = request.getKeys();
final Map<String, String> metadata = request.getMetadata();
if ((configurationStoreName == null) || (configurationStoreName.trim().isEmpty())) {
throw new IllegalArgumentException("Configuration Store Name cannot be null or empty.");
}
Map<String, List<String>> queryParams = new HashMap<>();
if (!keys.isEmpty()) {
queryParams.put("key", Collections.unmodifiableList(keys));
}
// Appending passed metadata too into queryparams
Map<String, List<String>> queryArgs = metadataToQueryArgs(metadata);
queryParams.putAll(queryArgs);
String[] pathSegments = new String[] {DaprHttp.ALPHA_1_API_VERSION, "configuration", configurationStoreName };
return Mono.subscriberContext().flatMap(
context -> this.client
.invokeApi(
DaprHttp.HttpMethods.GET.name(),
pathSegments, queryParams,
(String) null, null, context)
).map(
response -> {
try {
Map m = INTERNAL_SERIALIZER.deserialize(response.getBody(), Map.class);
Set<String> set = m.keySet();
JsonNode root = INTERNAL_SERIALIZER.parseNode(response.getBody());
Iterator<String> itr = set.iterator();
Map<String, ConfigurationItem> result = new HashMap<>();
while (itr.hasNext()) {
String key = itr.next();
String value = root.get(key).path("value").asText();
String version = root.get(key).path("version").asText();
result.put(key, new ConfigurationItem(
key,
value,
version,
new HashMap<>()
));
}
return Collections.unmodifiableMap(result);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
);
} catch (Exception ex) {
return DaprException.wrapMono(ex);
}
}
/**
* {@inheritDoc}
*/
@Override
public Flux<Map<String, ConfigurationItem>> subscribeToConfiguration(SubscribeConfigurationRequest request) {
return DaprException.wrapFlux(new UnsupportedOperationException());
public Flux<SubscribeConfigurationResponse> subscribeConfiguration(SubscribeConfigurationRequest request) {
try {
final String configurationStoreName = request.getStoreName();
final List<String> keys = request.getKeys();
final Map<String, String> metadata = request.getMetadata();
if (configurationStoreName == null || (configurationStoreName.trim().isEmpty())) {
throw new IllegalArgumentException("Configuration Store Name can not be null or empty.");
}
Map<String, List<String>> queryParams = new HashMap<>();
if (!keys.isEmpty()) {
queryParams.put("key", Collections.unmodifiableList(keys));
}
// Appending passed metadata too into queryparams
Map<String, List<String>> queryArgs = metadataToQueryArgs(metadata);
queryParams.putAll(queryArgs);
String[] pathSegments =
new String[] { DaprHttp.ALPHA_1_API_VERSION, "configuration", configurationStoreName, "subscribe" };
SubscribeConfigurationResponse res = Mono.subscriberContext().flatMap(
context -> this.client.invokeApi(
DaprHttp.HttpMethods.GET.name(),
pathSegments, queryParams,
(String) null, null, context
)
).map(response -> {
try {
JsonNode root = INTERNAL_SERIALIZER.parseNode(response.getBody());
String subscriptionId = root.path("id").asText();
return new SubscribeConfigurationResponse(subscriptionId, new HashMap<>());
} catch (IOException e) {
throw new RuntimeException(e);
}
}).block();
if (res != null) {
return Flux.just(res);
}
return Flux.empty();
} catch (Exception ex) {
return DaprException.wrapFlux(ex);
}
}
/**
* {@inheritDoc}
*/
@Override
public Mono<UnsubscribeConfigurationResponse> unsubscribeConfiguration(UnsubscribeConfigurationRequest request) {
try {
final String id = request.getSubscriptionId();
final String configStoreName = request.getStoreName();
if (configStoreName == null || (configStoreName.trim().isEmpty())) {
throw new IllegalArgumentException("Configuration Store Name can not be null or empty.");
}
if (id.isEmpty()) {
throw new IllegalArgumentException("Subscription id can not be null or empty.");
}
String[] pathSegments = new String[]
{ DaprHttp.ALPHA_1_API_VERSION, "configuration", configStoreName, id, "unsubscribe" };
return Mono.subscriberContext().flatMap(
context -> this.client
.invokeApi(
DaprHttp.HttpMethods.GET.name(),
pathSegments, null,
(String) null, null, context)
).map(
response -> {
JsonNode root = null;
try {
root = INTERNAL_SERIALIZER.parseNode(response.getBody());
} catch (IOException e) {
throw new RuntimeException(e);
}
boolean ok = root.path("ok").asBoolean();
String message = root.path("message").asText();
return new UnsubscribeConfigurationResponse(ok, message);
}
);
} catch (Exception ex) {
return DaprException.wrapMono(ex);
}
}
/**

View File

@ -18,6 +18,9 @@ import io.dapr.client.domain.GetConfigurationRequest;
import io.dapr.client.domain.QueryStateRequest;
import io.dapr.client.domain.QueryStateResponse;
import io.dapr.client.domain.SubscribeConfigurationRequest;
import io.dapr.client.domain.SubscribeConfigurationResponse;
import io.dapr.client.domain.UnsubscribeConfigurationRequest;
import io.dapr.client.domain.UnsubscribeConfigurationResponse;
import io.dapr.client.domain.query.Query;
import io.dapr.utils.TypeRef;
import reactor.core.publisher.Flux;
@ -86,9 +89,9 @@ public interface DaprPreviewClient extends AutoCloseable {
*
* @param storeName Name of the configuration store
* @param keys keys of the configurations which are to be subscribed
* @return Flux of Map of configuration items
* @return Flux of {@link SubscribeConfigurationResponse} instance
*/
Flux<Map<String, ConfigurationItem>> subscribeToConfiguration(String storeName, String... keys);
Flux<SubscribeConfigurationResponse> subscribeConfiguration(String storeName, String... keys);
/**
* Subscribe to the keys for any change.
@ -96,18 +99,35 @@ public interface DaprPreviewClient extends AutoCloseable {
* @param storeName Name of the configuration store
* @param keys keys of the configurations which are to be subscribed
* @param metadata optional metadata
* @return Flux of Map of configuration items
* @return Flux of {@link SubscribeConfigurationResponse} instance
*/
Flux<Map<String, ConfigurationItem>> subscribeToConfiguration(String storeName, List<String> keys,
Map<String, String> metadata);
Flux<SubscribeConfigurationResponse> subscribeConfiguration(String storeName, List<String> keys,
Map<String, String> metadata);
/**
* Subscribe to the keys for any change.
*
* @param request request for subscribing to any change for the given keys in request
* @return Flux of Map of configuration items
* @return Flux of {@link SubscribeConfigurationResponse} instance
*/
Flux<Map<String, ConfigurationItem>> subscribeToConfiguration(SubscribeConfigurationRequest request);
Flux<SubscribeConfigurationResponse> subscribeConfiguration(SubscribeConfigurationRequest request);
/**
* Subscribe to the keys for any change.
*
* @param id subscription id returned by subscribeConfiguration API.
* @param storeName Name of the configuration store.
* @return Mono of {@link UnsubscribeConfigurationResponse} instance.
*/
Mono<UnsubscribeConfigurationResponse> unsubscribeConfiguration(String id, String storeName);
/**
* Subscribe to the keys for any change.
*
* @param request request for unsubscribing to any change for the given subscription id in request
* @return Mono of {@link UnsubscribeConfigurationResponse} instance.
*/
Mono<UnsubscribeConfigurationResponse> unsubscribeConfiguration(UnsubscribeConfigurationRequest request);
/**
* Query for states using a query string.

View File

@ -13,6 +13,9 @@ limitations under the License.
package io.dapr.client.domain;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Collections;
import java.util.Map;
@ -47,7 +50,11 @@ public class ConfigurationItem {
* @param value value for the provided key
* @param version version of the configuration item
*/
public ConfigurationItem(String key, String value, String version) {
@JsonCreator
public ConfigurationItem(
@JsonProperty("key") String key,
@JsonProperty("value") String value,
@JsonProperty("version") String version) {
this.key = key;
this.value = value;
this.version = version;

View File

@ -0,0 +1,58 @@
/*
* Copyright 2022 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.client.domain;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Collections;
import java.util.Map;
/**
* Domain object for response from subscribeConfiguration API.
*/
public class SubscribeConfigurationResponse {
/**
* Subscription id for the items subscribed to.
*/
private final String subscriptionId;
/**
* Map of Configuration key to {@link ConfigurationItem}.
*/
private final Map<String, ConfigurationItem> items;
/**
* Constructor for SubscribeConfigurationResponse.
*
* @param id Subscription id for the items subscribed to.This id is returned by subscribeToConfiguration API.
* @param items Map of configuration items user subscribed to.
*/
@JsonCreator
public SubscribeConfigurationResponse(
@JsonProperty("id") String id,
@JsonProperty("items") Map<String, ConfigurationItem> items) {
this.subscriptionId = id;
this.items = Collections.unmodifiableMap(items);
}
public Map<String, ConfigurationItem> getItems() {
return items;
}
public String getSubscriptionId() {
return subscriptionId;
}
}

View File

@ -0,0 +1,47 @@
/*
* Copyright 2022 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.client.domain;
/**
* Request to unsubscribe to one or more configuration items using subscription id.
*/
public class UnsubscribeConfigurationRequest {
/**
* Name of the configuration store.
*/
private final String storeName;
/**
* Subscription id for the items to unsubscribe to.
*/
private final String subscriptionId;
/**
* Constructor for UnsubscribeConfigurationRequest.
*
* @param id Subscription id for the items subscribed to.This id is returned by subscribeConfiguration API.
* @param storeName Name of the configuration store.
*/
public UnsubscribeConfigurationRequest(String id, String storeName) {
this.storeName = storeName;
this.subscriptionId = id;
}
public String getStoreName() {
return storeName;
}
public String getSubscriptionId() {
return subscriptionId;
}
}

View File

@ -0,0 +1,47 @@
/*
* Copyright 2022 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.client.domain;
/**
* Domain object for unsubscribe response.
*/
public class UnsubscribeConfigurationResponse {
/**
* Boolean denoting whether unsubscribe API call is success/failure.
*/
private final boolean isUnsubscribed;
/**
* Unsubscribe API response message.
*/
private final String message;
/**
* Constructor for UnsubscribeConfigurationResponse.
*
* @param isUnsubscribed boolean denoting unsubscribe API response.
* @param message unsubscribe API response message.
*/
public UnsubscribeConfigurationResponse(boolean isUnsubscribed, String message) {
this.isUnsubscribed = isUnsubscribed;
this.message = message;
}
public boolean getIsUnsubscribed() {
return isUnsubscribed;
}
public String getMessage() {
return message;
}
}

View File

@ -23,6 +23,9 @@ import io.dapr.client.domain.QueryStateItem;
import io.dapr.client.domain.QueryStateRequest;
import io.dapr.client.domain.QueryStateResponse;
import io.dapr.client.domain.SubscribeConfigurationRequest;
import io.dapr.client.domain.SubscribeConfigurationResponse;
import io.dapr.client.domain.UnsubscribeConfigurationRequest;
import io.dapr.client.domain.UnsubscribeConfigurationResponse;
import io.dapr.client.domain.query.Query;
import io.dapr.serializer.DefaultObjectSerializer;
import io.dapr.v1.CommonProtos;
@ -87,16 +90,9 @@ public class DaprPreviewClientGrpcTest {
@Test
public void getConfigurationTestErrorScenario() {
assertThrows(IllegalArgumentException.class, () -> {
previewClient.getConfiguration(CONFIG_STORE_NAME, "").block();
});
assertThrows(IllegalArgumentException.class, () -> {
previewClient.getConfiguration("", "key").block();
});
GetConfigurationRequest req = new GetConfigurationRequest(CONFIG_STORE_NAME, null);
assertThrows(IllegalArgumentException.class, () -> {
previewClient.getConfiguration(req).block();
});
}
@Test
@ -182,6 +178,7 @@ public class DaprPreviewClientGrpcTest {
.build());
DaprProtos.SubscribeConfigurationResponse responseEnvelope = DaprProtos.SubscribeConfigurationResponse.newBuilder()
.putAllItems(configs)
.setId("subscription_id")
.build();
doAnswer((Answer<Void>) invocation -> {
@ -192,10 +189,12 @@ public class DaprPreviewClientGrpcTest {
return null;
}).when(daprStub).subscribeConfigurationAlpha1(any(DaprProtos.SubscribeConfigurationRequest.class), any());
Iterator<Map<String, ConfigurationItem>> itr = previewClient.subscribeToConfiguration(CONFIG_STORE_NAME, "configkey1").toIterable().iterator();
Iterator<SubscribeConfigurationResponse> itr = previewClient.subscribeConfiguration(CONFIG_STORE_NAME, "configkey1").toIterable().iterator();
assertTrue(itr.hasNext());
assertTrue(itr.next().containsKey("configkey1"));
assertFalse(itr.hasNext());
SubscribeConfigurationResponse res = itr.next();
assertTrue(res.getItems().containsKey("configkey1"));
assertEquals("subscription_id", res.getSubscriptionId());
assertFalse(itr.hasNext());
}
@Test
@ -210,6 +209,7 @@ public class DaprPreviewClientGrpcTest {
.build());
DaprProtos.SubscribeConfigurationResponse responseEnvelope = DaprProtos.SubscribeConfigurationResponse.newBuilder()
.putAllItems(configs)
.setId("subscription_id")
.build();
doAnswer((Answer<Void>) invocation -> {
@ -223,10 +223,12 @@ public class DaprPreviewClientGrpcTest {
Map<String, String> reqMetadata = new HashMap<>();
List<String> keys = Arrays.asList("configkey1");
Iterator<Map<String, ConfigurationItem>> itr = previewClient.subscribeToConfiguration(CONFIG_STORE_NAME, "configkey1").toIterable().iterator();
Iterator<SubscribeConfigurationResponse> itr = previewClient.subscribeConfiguration(CONFIG_STORE_NAME, keys, reqMetadata).toIterable().iterator();
assertTrue(itr.hasNext());
assertTrue(itr.next().containsKey("configkey1"));
assertFalse(itr.hasNext());
SubscribeConfigurationResponse res = itr.next();
assertTrue(res.getItems().containsKey("configkey1"));
assertEquals("subscription_id", res.getSubscriptionId());
assertFalse(itr.hasNext());
}
@Test
@ -240,16 +242,56 @@ public class DaprPreviewClientGrpcTest {
}).when(daprStub).subscribeConfigurationAlpha1(any(DaprProtos.SubscribeConfigurationRequest.class), any());
assertThrowsDaprException(ExecutionException.class, () -> {
previewClient.subscribeToConfiguration(CONFIG_STORE_NAME, "key").blockFirst();
previewClient.subscribeConfiguration(CONFIG_STORE_NAME, "key").blockFirst();
});
assertThrows(IllegalArgumentException.class, () -> {
previewClient.subscribeToConfiguration("", "key").blockFirst();
previewClient.subscribeConfiguration("", "key").blockFirst();
});
}
@Test
public void unsubscribeConfigurationTest() {
DaprProtos.UnsubscribeConfigurationResponse responseEnvelope = DaprProtos.UnsubscribeConfigurationResponse.newBuilder()
.setOk(true)
.setMessage("unsubscribed_message")
.build();
doAnswer((Answer<Void>) invocation -> {
StreamObserver<DaprProtos.UnsubscribeConfigurationResponse> observer =
(StreamObserver<DaprProtos.UnsubscribeConfigurationResponse>) invocation.getArguments()[1];
observer.onNext(responseEnvelope);
observer.onCompleted();
return null;
}).when(daprStub).unsubscribeConfigurationAlpha1(any(DaprProtos.UnsubscribeConfigurationRequest.class), any());
UnsubscribeConfigurationResponse
response = previewClient.unsubscribeConfiguration("subscription_id", CONFIG_STORE_NAME).block();
assertTrue(response.getIsUnsubscribed());
assertEquals("unsubscribed_message", response.getMessage());
}
@Test
public void unsubscribeConfigurationTestWithError() {
doAnswer((Answer<Void>) invocation -> {
StreamObserver<DaprProtos.UnsubscribeConfigurationResponse> observer =
(StreamObserver<DaprProtos.UnsubscribeConfigurationResponse>) invocation.getArguments()[1];
observer.onError(new RuntimeException());
observer.onCompleted();
return null;
}).when(daprStub).unsubscribeConfigurationAlpha1(any(DaprProtos.UnsubscribeConfigurationRequest.class), any());
assertThrowsDaprException(ExecutionException.class, () -> {
previewClient.unsubscribeConfiguration("subscription_id", CONFIG_STORE_NAME).block();
});
SubscribeConfigurationRequest req = new SubscribeConfigurationRequest(CONFIG_STORE_NAME, null);
assertThrows(IllegalArgumentException.class, () -> {
previewClient.subscribeToConfiguration(req).blockFirst();
previewClient.unsubscribeConfiguration("", CONFIG_STORE_NAME).block();
});
UnsubscribeConfigurationRequest req = new UnsubscribeConfigurationRequest("subscription_id", "");
assertThrows(IllegalArgumentException.class, () -> {
previewClient.unsubscribeConfiguration(req).block();
});
}

View File

@ -13,24 +13,36 @@ limitations under the License.
package io.dapr.client;
import io.dapr.client.domain.QueryStateRequest;
import io.dapr.client.domain.QueryStateResponse;
import io.dapr.client.domain.*;
import io.dapr.client.domain.query.Query;
import io.dapr.config.Properties;
import io.dapr.exceptions.DaprException;
import io.dapr.utils.TypeRef;
import io.dapr.v1.DaprProtos;
import io.grpc.stub.StreamObserver;
import okhttp3.OkHttpClient;
import okhttp3.mock.Behavior;
import okhttp3.mock.MockInterceptor;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.stubbing.Answer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import static io.dapr.utils.TestUtils.assertThrowsDaprException;
import static org.junit.Assert.*;
import static org.junit.Assert.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
public class DaprPreviewClientHttpTest {
private static final String CONFIG_STORE_NAME = "MyConfigurationStore";
private static final String CONFIG_STORE_NAME = "MyConfigStore";
private DaprPreviewClient daprPreviewClientHttp;
@ -48,27 +60,6 @@ public class DaprPreviewClientHttpTest {
daprPreviewClientHttp = new DaprClientHttp(daprHttp);
}
@Test
public void getConfigurationWithSingleKey() {
assertThrows(DaprException.class, () -> {
daprPreviewClientHttp.getConfiguration(CONFIG_STORE_NAME, "key").block();
});
}
@Test
public void getConfiguration() {
assertThrows(DaprException.class, () -> {
daprPreviewClientHttp.getConfiguration(CONFIG_STORE_NAME, "key1", "key2").block();
});
}
@Test
public void subscribeConfigurations() {
assertThrows(DaprException.class, () -> {
daprPreviewClientHttp.subscribeToConfiguration(CONFIG_STORE_NAME, "key1", "key2").blockFirst();
});
}
@Test
public void queryStateExceptionsTest() {
assertThrows(IllegalArgumentException.class, () -> {
@ -120,4 +111,118 @@ public class DaprPreviewClientHttpTest {
assertEquals("result must be same", "testData", response.getResults().get(0).getValue());
assertEquals("result must be same", "6f54ad94-dfb9-46f0-a371-e42d550adb7d", response.getResults().get(0).getEtag());
}
@Test
public void getConfigurationTestErrorScenario() {
assertThrows(IllegalArgumentException.class, () -> {
daprPreviewClientHttp.getConfiguration("", "key").block();
});
assertThrows(IllegalArgumentException.class, () -> {
daprPreviewClientHttp.getConfiguration(" ", "key").block();
});
}
@Test
public void getConfigurationTest() {
mockInterceptor.addRule()
.get()
.path("/v1.0-alpha1/configuration/MyConfigStore")
.param("key","configkey1")
.respond("{\"configkey1\" : {\"value\" : \"configvalue1\",\"version\" : \"1\"}}");
ConfigurationItem ci = daprPreviewClientHttp.getConfiguration(CONFIG_STORE_NAME, "configkey1").block();
assertNotNull(ci);
assertEquals("configkey1", ci.getKey());
assertEquals("configvalue1", ci.getValue());
assertEquals("1", ci.getVersion());
}
@Test
public void getAllConfigurationTest() {
mockInterceptor.addRule()
.get()
.path("/v1.0-alpha1/configuration/MyConfigStore")
.respond("{\"configkey1\" : {\"value\" : \"configvalue1\",\"version\" : \"1\"}}");
ConfigurationItem ci = daprPreviewClientHttp.getConfiguration(CONFIG_STORE_NAME, "configkey1").block();
assertNotNull(ci);
assertEquals("configkey1", ci.getKey());
assertEquals("configvalue1", ci.getValue());
assertEquals("1", ci.getVersion());
}
@Test
public void subscribeConfigurationTest() {
mockInterceptor.addRule()
.get()
.path("/v1.0-alpha1/configuration/MyConfigStore/subscribe")
.param("key", "configkey1")
.respond("{\"id\":\"1234\"}");
Iterator<SubscribeConfigurationResponse> itr = daprPreviewClientHttp.subscribeConfiguration(CONFIG_STORE_NAME, "configkey1").toIterable().iterator();
assertTrue(itr.hasNext());
SubscribeConfigurationResponse res = itr.next();
assertEquals("1234", res.getSubscriptionId());
assertFalse(itr.hasNext());
}
@Test
public void subscribeAllConfigurationTest() {
mockInterceptor.addRule()
.get()
.path("/v1.0-alpha1/configuration/MyConfigStore/subscribe")
.respond("{\"id\":\"1234\"}");
Iterator<SubscribeConfigurationResponse> itr = daprPreviewClientHttp.subscribeConfiguration(CONFIG_STORE_NAME, "configkey1").toIterable().iterator();
assertTrue(itr.hasNext());
SubscribeConfigurationResponse res = itr.next();
assertEquals("1234", res.getSubscriptionId());
assertFalse(itr.hasNext());
}
@Test
public void unsubscribeConfigurationTest() {
mockInterceptor.addRule()
.get()
.path("/v1.0-alpha1/configuration/MyConfigStore/1234/unsubscribe")
.respond("{\"ok\": true}");
UnsubscribeConfigurationResponse res = daprPreviewClientHttp.unsubscribeConfiguration("1234", CONFIG_STORE_NAME).block();
assertTrue(res.getIsUnsubscribed());
}
@Test
public void unsubscribeConfigurationTestWithError() {
assertThrows(IllegalArgumentException.class, () -> {
daprPreviewClientHttp.unsubscribeConfiguration("", CONFIG_STORE_NAME).block();
});
UnsubscribeConfigurationRequest req = new UnsubscribeConfigurationRequest("subscription_id", "");
assertThrows(IllegalArgumentException.class, () -> {
daprPreviewClientHttp.unsubscribeConfiguration(req).block();
});
mockInterceptor.addRule()
.get()
.path("/v1.0-alpha1/configuration/MyConfigStore/1234/unsubscribe")
.respond("{\"ok\": false, \"message\": \"some error while unsubscribing\"}");
UnsubscribeConfigurationResponse res = daprPreviewClientHttp.unsubscribeConfiguration("1234", CONFIG_STORE_NAME).block();
assertFalse(res.getIsUnsubscribed());
}
@Test
public void subscribeConfigurationTestWithError() {
assertThrows(IllegalArgumentException.class, () -> {
daprPreviewClientHttp.subscribeConfiguration("", "key1").blockFirst();
});
mockInterceptor.addRule()
.get()
.path("/v1.0-alpha1/configuration/MyConfigStore/subscribe")
.param("key", "configkey1")
.respond(500);
assertThrows(DaprException.class, () -> {
daprPreviewClientHttp.subscribeConfiguration(CONFIG_STORE_NAME, "configkey1").blockFirst();
});
}
}