diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index fcb0d28eb..593458a4c 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -25,11 +25,11 @@ jobs: GOARCH: amd64 GOPROXY: https://proxy.golang.org JDK_VER: ${{ matrix.java }} - DAPR_CLI_VER: 1.8.0-rc.1 - DAPR_RUNTIME_VER: 1.8.0-rc.3 + DAPR_CLI_VER: 1.8.1 + DAPR_RUNTIME_VER: 1.8.4 DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/v1.8.0-rc.1/install/install.sh DAPR_CLI_REF: - DAPR_REF: 683a436ff1367e4ff2d27da2d79069c04ec2c46d + DAPR_REF: 0e34de1086c697d7d74ac2af14ddb11f9346b57e steps: - uses: actions/checkout@v3 - name: Set up OpenJDK ${{ env.JDK_VER }} diff --git a/.github/workflows/validate.yml b/.github/workflows/validate.yml index c650af082..62fc05895 100644 --- a/.github/workflows/validate.yml +++ b/.github/workflows/validate.yml @@ -36,11 +36,11 @@ jobs: GOARCH: amd64 GOPROXY: https://proxy.golang.org JDK_VER: ${{ matrix.java }} - DAPR_CLI_VER: 1.8.0-rc.1 - DAPR_RUNTIME_VER: 1.8.0-rc.3 + DAPR_CLI_VER: 1.8.1 + DAPR_RUNTIME_VER: 1.8.4 DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/v1.8.0-rc.1/install/install.sh DAPR_CLI_REF: - DAPR_REF: 683a436ff1367e4ff2d27da2d79069c04ec2c46d + DAPR_REF: 0e34de1086c697d7d74ac2af14ddb11f9346b57e steps: - uses: actions/checkout@v3 - name: Set up OpenJDK ${{ env.JDK_VER }} @@ -145,10 +145,14 @@ jobs: working-directory: ./examples run: | mm.py ./src/main/java/io/dapr/examples/unittesting/README.md - - name: Validate Configuration API example + - name: Validate Configuration gRPC API example working-directory: ./examples run: | mm.py ./src/main/java/io/dapr/examples/configuration/grpc/README.md + - name: Validate Configuration HTTP API example + working-directory: ./examples + run: | + mm.py ./src/main/java/io/dapr/examples/configuration/http/README.md - name: Validate query state HTTP example working-directory: ./examples run: | diff --git a/examples/src/main/java/io/dapr/examples/configuration/grpc/ConfigurationClient.java b/examples/src/main/java/io/dapr/examples/configuration/grpc/ConfigurationClient.java index ff49c243d..69e36b5f2 100644 --- a/examples/src/main/java/io/dapr/examples/configuration/grpc/ConfigurationClient.java +++ b/examples/src/main/java/io/dapr/examples/configuration/grpc/ConfigurationClient.java @@ -18,6 +18,8 @@ import io.dapr.client.DaprPreviewClient; import io.dapr.client.domain.ConfigurationItem; import io.dapr.client.domain.GetConfigurationRequest; import io.dapr.client.domain.SubscribeConfigurationRequest; +import io.dapr.client.domain.SubscribeConfigurationResponse; +import io.dapr.client.domain.UnsubscribeConfigurationResponse; import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -25,6 +27,7 @@ import reactor.core.publisher.Mono; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; @@ -43,41 +46,8 @@ public class ConfigurationClient { public static void main(String[] args) throws Exception { try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) { System.out.println("Using preview client..."); - getConfigurationForaSingleKey(client); - getConfigurationsUsingVarargs(client); getConfigurations(client); - subscribeConfigurationRequestWithSubscribe(client); - } - } - - /** - * Gets configuration for a single key. - * - * @param client DaprPreviewClient object - */ - public static void getConfigurationForaSingleKey(DaprPreviewClient client) { - System.out.println("*******trying to retrieve configuration given a single key********"); - try { - Mono item = client.getConfiguration(CONFIG_STORE_NAME, keys.get(0)); - System.out.println("Value ->" + item.block().getValue() + " key ->" + item.block().getKey()); - } catch (Exception ex) { - System.out.println(ex.getMessage()); - } - } - - /** - * Gets configurations for varibale no. of arguments. - * - * @param client DaprPreviewClient object - */ - public static void getConfigurationsUsingVarargs(DaprPreviewClient client) { - System.out.println("*******trying to retrieve configurations for a variable no. of keys********"); - try { - Mono> items = - client.getConfiguration(CONFIG_STORE_NAME, "myconfig1", "myconfig3"); - items.block().forEach((k,v) -> print(v, k)); - } catch (Exception ex) { - System.out.println(ex.getMessage()); + subscribeConfigurationRequest(client); } } @@ -106,36 +76,25 @@ public class ConfigurationClient { * * @param client DaprPreviewClient object */ - public static void subscribeConfigurationRequestWithSubscribe(DaprPreviewClient client) { - System.out.println("*****Subscribing to keys using subscribe method: " + keys.toString() + " *****"); - AtomicReference disposableAtomicReference = new AtomicReference<>(); - SubscribeConfigurationRequest req = new SubscribeConfigurationRequest(CONFIG_STORE_NAME, keys); + public static void subscribeConfigurationRequest(DaprPreviewClient client) { + System.out.println("Subscribing to key: myconfig1"); + SubscribeConfigurationRequest req = new SubscribeConfigurationRequest( + CONFIG_STORE_NAME, Collections.singletonList("myconfig1")); + Flux outFlux = client.subscribeConfiguration(req); Runnable subscribeTask = () -> { - Flux> outFlux = client.subscribeToConfiguration(req); - disposableAtomicReference.set(outFlux - .subscribe( - cis -> cis.forEach((k,v) -> print(v, k)) - )); + outFlux.subscribe(cis -> { + System.out.println("subscription ID : " + cis.getSubscriptionId()); + System.out.println("subscribing to key myconfig1 is successful"); + }); }; new Thread(subscribeTask).start(); + // To ensure main thread does not die before outFlux subscribe gets called + inducingSleepTime(5000); + } + + private static void inducingSleepTime(int timeInMillis) { try { - // To ensure that subscribeThread gets scheduled - Thread.sleep(0); - } catch (InterruptedException e) { - e.printStackTrace(); - } - Runnable updateKeys = () -> { - int i = 1; - while (i <= 3) { - executeDockerCommand(i); - i++; - } - }; - new Thread(updateKeys).start(); - try { - // To ensure main thread does not die before outFlux subscribe gets called - Thread.sleep(10000); - disposableAtomicReference.get().dispose(); + Thread.sleep(timeInMillis); } catch (InterruptedException e) { e.printStackTrace(); } @@ -144,22 +103,4 @@ public class ConfigurationClient { private static void print(ConfigurationItem item, String key) { System.out.println(item.getValue() + " : key ->" + key); } - - private static void executeDockerCommand(int postfix) { - String[] command = new String[] { - "docker", "exec", "dapr_redis", "redis-cli", - "SET", - "myconfig" + postfix, "update_myconfigvalue" + postfix + "||2" - }; - ProcessBuilder processBuilder = new ProcessBuilder(command); - Process process = null; - try { - process = processBuilder.start(); - process.waitFor(); - } catch (IOException e) { - e.printStackTrace(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } } diff --git a/examples/src/main/java/io/dapr/examples/configuration/grpc/README.md b/examples/src/main/java/io/dapr/examples/configuration/grpc/README.md index 3785e9f5f..832f65871 100644 --- a/examples/src/main/java/io/dapr/examples/configuration/grpc/README.md +++ b/examples/src/main/java/io/dapr/examples/configuration/grpc/README.md @@ -7,7 +7,8 @@ This example provides the different capabilities provided by Dapr Java SDK for C The java SDK exposes several methods for this - * `client.getConfiguration(...)` for getting a configuration for a single/multiple keys. -* `client.subscribeToConfigurations(...)` for subscribing to a list of keys for any change. +* `client.subscribeConfiguration(...)` for subscribing to a list of keys for any change. +* `client.unsubscribeConfiguration(...)` for unsubscribing to changes from subscribed items. ## Pre-requisites @@ -48,6 +49,81 @@ docker exec dapr_redis redis-cli MSET myconfig1 "val1||1" myconfig2 "val2||1" my ### Running the example +This example uses the Java SDK Dapr client in order to **Get, Subscribe and Unsubscribe** from configuration items and utilizes `Redis` as configuration store. +`ConfigurationClient.java` is the example class demonstrating all 3 features. +Kindly check [DaprPreviewClient.java](https://github.com/dapr/java-sdk/blob/master/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java) for detailed description of the supported APIs. + +```java +public class ConfigurationClient { + // ... + /** + * Executes various methods to check the different apis. + * @param args arguments + * @throws Exception throws Exception + */ + public static void main(String[] args) throws Exception { + try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) { + System.out.println("Using preview client..."); + getConfigurations(client); + subscribeConfigurationRequestWithSubscribe(client); + unsubscribeConfigurationItems(client); + } + } + + /** + * Gets configurations for a list of keys. + * + * @param client DaprPreviewClient object + */ + public static void getConfigurations(DaprPreviewClient client) { + System.out.println("*******trying to retrieve configurations for a list of keys********"); + List keys = new ArrayList<>(); + // ... + GetConfigurationRequest req = new GetConfigurationRequest(CONFIG_STORE_NAME, keys); + try { + Mono> items = client.getConfiguration(req); + // .. + } catch (Exception ex) {} + } + + /** + * Subscribe to a list of keys.Optional to above iterator way of retrieving the changes + * + * @param client DaprPreviewClient object + */ + public static void subscribeConfigurationRequestWithSubscribe(DaprPreviewClient client) { + System.out.println("Subscribing to key: myconfig1"); + // ... + Runnable subscribeTask = () -> { + Flux outFlux = client.subscribeConfiguration(req); + // ... + }; + // .. + } + + /** + * Unsubscribe using subscription id. + * + * @param client DaprPreviewClient object + */ + public static void unsubscribeConfigurationItems(DaprPreviewClient client) { + System.out.println("Subscribing to key: myconfig2"); + // .. + Runnable subscribeTask = () -> { + Flux outFlux = client.subscribeConfiguration(CONFIG_STORE_NAME, "myconfig2"); + // ... + }; + // ... + + UnsubscribeConfigurationResponse res = client.unsubscribeConfiguration( + subscriptionId.get(), + CONFIG_STORE_NAME + ).block(); + // .. + } +} +``` + Get into the examples' directory: ```sh cd examples @@ -59,21 +135,16 @@ Use the following command to run this example- name: Run ConfigurationClient example expected_stdout_lines: - "== APP == Using preview client..." - - "== APP == *******trying to retrieve configuration given a single key********" - - "== APP == Value ->val1 key ->myconfig1" - - "== APP == *******trying to retrieve configurations for a variable no. of keys********" - - "== APP == val1 : key ->myconfig1" - - "== APP == val3 : key ->myconfig3" - "== APP == *******trying to retrieve configurations for a list of keys********" - "== APP == val1 : key ->myconfig1" - "== APP == val2 : key ->myconfig2" - "== APP == val3 : key ->myconfig3" - - "== APP == *****Subscribing to keys using subscribe method: [myconfig1, myconfig3, myconfig2] *****" - - "== APP == update_myconfigvalue1 : key ->myconfig1" - - "== APP == update_myconfigvalue2 : key ->myconfig2" - - "== APP == update_myconfigvalue3 : key ->myconfig3" + - "== APP == Subscribing to key: myconfig1" + - "== APP == subscription ID :" + - "== APP == subscribing to key myconfig1 is successful" background: true -sleep: 5 +output_match_mode: substring +sleep: 10 --> ```bash @@ -85,19 +156,14 @@ dapr run --components-path ./components/configuration --app-id configgrpc --log- ### Sample output ``` == APP == Using preview client... -== APP == *******trying to retrieve configuration given a single key******** -== APP == Value ->val1 key ->myconfig1 -== APP == *******trying to retrieve configurations for a variable no. of keys******** -== APP == val1 : key ->myconfig1 -== APP == val3 : key ->myconfig3 == APP == *******trying to retrieve configurations for a list of keys******** == APP == val1 : key ->myconfig1 == APP == val2 : key ->myconfig2 == APP == val3 : key ->myconfig3 -== APP == *****Subscribing to keys using subscribe method: [myconfig1, myconfig3, myconfig2] ***** -== APP == update_myconfigvalue1 : key ->myconfig1 -== APP == update_myconfigvalue2 : key ->myconfig2 -== APP == update_myconfigvalue3 : key ->myconfig3 +== APP == Subscribing to key: myconfig1 +== APP == subscription ID : 82bb8e24-f69d-477a-9126-5ffaf995f498 +== APP == subscribing to key myconfig1 is successful + ``` ### Cleanup diff --git a/examples/src/main/java/io/dapr/examples/configuration/http/ConfigurationClient.java b/examples/src/main/java/io/dapr/examples/configuration/http/ConfigurationClient.java new file mode 100644 index 000000000..9ad93d625 --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/configuration/http/ConfigurationClient.java @@ -0,0 +1,102 @@ +/* + * Copyright 2022 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.configuration.http; + +import io.dapr.client.DaprApiProtocol; +import io.dapr.client.DaprClientBuilder; +import io.dapr.client.DaprPreviewClient; +import io.dapr.client.domain.ConfigurationItem; +import io.dapr.client.domain.GetConfigurationRequest; +import io.dapr.client.domain.SubscribeConfigurationRequest; +import io.dapr.client.domain.SubscribeConfigurationResponse; +import io.dapr.config.Properties; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class ConfigurationClient { + + private static final String CONFIG_STORE_NAME = "configstore"; + private static String SUBSCRIPTION_ID; + + /** + * Executes various methods to check the different apis. + * @param args arguments + * @throws Exception throws Exception + */ + public static void main(String[] args) throws Exception { + System.getProperties().setProperty(Properties.API_PROTOCOL.getName(), DaprApiProtocol.HTTP.name()); + try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) { + System.out.println("Using preview client..."); + getConfigurations(client); + subscribeConfigurationRequest(client); + } + } + + /** + * Gets configurations for a list of keys. + * + * @param client DaprPreviewClient object + */ + public static void getConfigurations(DaprPreviewClient client) { + System.out.println("*******trying to retrieve configurations for a list of keys********"); + List keys = new ArrayList<>(); + keys.add("myconfig1"); + keys.add("myconfig2"); + keys.add("myconfig3"); + + Map hmap = new HashMap<>(); + hmap.put("meta_key","meta_value"); + GetConfigurationRequest req = new GetConfigurationRequest(CONFIG_STORE_NAME, keys); + req.setMetadata(hmap); + + try { + Mono> items = client.getConfiguration(req); + items.block().forEach((k,v) -> print(v, k)); + } catch (Exception ex) { + System.out.println(ex.getMessage()); + } + } + + /** + * Subscribe to a list of keys. + * + * @param client DaprPreviewClient object + */ + public static void subscribeConfigurationRequest(DaprPreviewClient client) throws InterruptedException { + System.out.println("Subscribing to key: myconfig2"); + SubscribeConfigurationRequest req = new SubscribeConfigurationRequest( + CONFIG_STORE_NAME, Collections.singletonList("myconfig2")); + Flux outFlux = client.subscribeConfiguration(req); + outFlux.subscribe( + cis -> { + SUBSCRIPTION_ID = cis.getSubscriptionId(); + }); + if (!SUBSCRIPTION_ID.isEmpty()) { + System.out.println("subscribing to myconfig2 is successful"); + } else { + System.out.println("error in subscribing to myconfig2"); + } + Thread.sleep(5000); + } + + private static void print(ConfigurationItem item, String key) { + System.out.println(item.getValue() + " : key ->" + key); + } +} diff --git a/examples/src/main/java/io/dapr/examples/configuration/http/ConfigurationHandler.java b/examples/src/main/java/io/dapr/examples/configuration/http/ConfigurationHandler.java new file mode 100644 index 000000000..370318528 --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/configuration/http/ConfigurationHandler.java @@ -0,0 +1,64 @@ +/* + * Copyright 2022 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.configuration.http; + +import io.dapr.client.domain.SubscribeConfigurationResponse; +import io.dapr.examples.DaprApplication; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.Options; +import org.springframework.http.MediaType; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RestController; + +/** + * A sample springboot application to register and receive for updates on configuration items sent by Dapr. + * Users are free to write their own controllers to handle any specific route suited to the need. + * java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.configuration.http.ConfigurationSubscriber -p 3009 + */ +@RestController +public class ConfigurationHandler { + + /** + * Receives a subscription change notification. + * @param configStore Path variables for post call + * @param key Key whose value has changed + * @param response Configuration response + */ + @PostMapping(path = "/configuration/{configStore}/{key}", produces = MediaType.ALL_VALUE) + public void handleConfigUpdate(@PathVariable("configStore") String configStore, + @PathVariable("key") String key, + @RequestBody SubscribeConfigurationResponse response) { + System.out.println("Configuration update received for store: " + configStore); + response.getItems().forEach((k,v) -> System.out.println("Key: " + k + " Value :" + v.getValue())); + } + + /** + * This is entry point for Configuration Subscriber service. + * @param args Arguments for main + * @throws Exception Throws Exception + */ + public static void main(String[] args) throws Exception { + Options options = new Options(); + options.addRequiredOption("p", "port", true, "The port this app will listen on"); + CommandLineParser parser = new DefaultParser(); + CommandLine cmd = parser.parse(options, args); + // If port string is not valid, it will throw an exception. + int port = Integer.parseInt(cmd.getOptionValue("port")); + DaprApplication.start(port); + } +} diff --git a/examples/src/main/java/io/dapr/examples/configuration/http/README.md b/examples/src/main/java/io/dapr/examples/configuration/http/README.md new file mode 100644 index 000000000..edb0107a1 --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/configuration/http/README.md @@ -0,0 +1,208 @@ +## Retrieve Configurations via Configuration API + +This example provides the different capabilities provided by Dapr Java SDK for Configuration. For further information about Configuration APIs please refer to [this link](https://docs.dapr.io/developing-applications/building-blocks/configuration/) +**This API is available in Preview Mode**. + +### Using the ConfigurationAPI + +The java SDK exposes several methods for this - +* `client.getConfiguration(...)` for getting a configuration for a single/multiple keys. +* `client.subscribeConfiguration(...)` for subscribing to a list of keys for any change. +* `client.unsubscribeConfiguration(...)` for unsubscribing to changes from subscribed items. + +## Pre-requisites + +* [Dapr and Dapr Cli](https://docs.dapr.io/getting-started/install-dapr/). +* Java JDK 11 (or greater): + * [Microsoft JDK 11](https://docs.microsoft.com/en-us/java/openjdk/download#openjdk-11) + * [Oracle JDK 11](https://www.oracle.com/technetwork/java/javase/downloads/index.html#JDK11) + * [OpenJDK 11](https://jdk.java.net/11/) +* [Apache Maven](https://maven.apache.org/install.html) version 3.x. + +### Checking out the code + +Clone this repository: + +```sh +git clone https://github.com/dapr/java-sdk.git +cd java-sdk +``` + +Then build the Maven project: + +```sh +# make sure you are in the `java-sdk` directory. +mvn install +``` +## Store few dummy configurations in configurationstore + + +```bash +docker exec dapr_redis redis-cli MSET myconfig1 "val1||1" myconfig2 "val2||1" myconfig3 "val3||1" +``` + + +### Running the example + +This example uses the Java SDK Dapr client in order to **Get, Subscribe and Unsubscribe** from configuration items and utilizes `Redis` as configuration store. +`ConfigurationClient.java` is the example class demonstrating all 3 features. +Kindly check [DaprPreviewClient.java](https://github.com/dapr/java-sdk/blob/master/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java) for detailed description of the supported APIs. + +```java +public class ConfigurationClient { + // ... + /** + * Executes various methods to check the different apis. + * @param args arguments + * @throws Exception throws Exception + */ + public static void main(String[] args) throws Exception { + System.getProperties().setProperty(Properties.API_PROTOCOL.getName(), DaprApiProtocol.HTTP.name()); + try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) { + System.out.println("Using preview client..."); + getConfigurations(client); + subscribeConfigurationRequest(client); + } + } + + /** + * Gets configurations for a list of keys. + * + * @param client DaprPreviewClient object + */ + public static void getConfigurations(DaprPreviewClient client) { + System.out.println("*******trying to retrieve configurations for a list of keys********"); + GetConfigurationRequest req = new GetConfigurationRequest(CONFIG_STORE_NAME, keys); + try { + Mono> items = client.getConfiguration(req); + // ... + } catch (Exception ex) { + // ... + } + } + + /** + * Subscribe to a list of keys. + * + * @param client DaprPreviewClient object + */ + public static void subscribeConfigurationRequest(DaprPreviewClient client) { + // ... + SubscribeConfigurationRequest req = new SubscribeConfigurationRequest( + CONFIG_STORE_NAME, Collections.singletonList("myconfig2")); + Runnable subscribeTask = () -> { + Flux outFlux = client.subscribeConfiguration(req); + // ... + }; + // .. + } +} +``` + +Get into the examples' directory: +```sh +cd examples +``` + +#### Running the configuration subscriber app: + +`DaprApplication.start()` Method will run a Spring Boot application containing a `ConfigurationHandler`, a contoller +to receive configuration change notifications. + +```java +@RestController +public class ConfigurationHandler { + //... + @PostMapping(path = "/configuration/{configStore}/{key}", produces = MediaType.ALL_VALUE) + public void handleConfigUpdate(@PathVariable("configStore") String configStore, + @PathVariable("key") String key, + @RequestBody SubscribeConfigurationResponse response) { + System.out.println("Configuration update received for store: " + configStore); + response.getItems().forEach((k,v) -> System.out.println("Key: " + k + " Value :" + v.getValue())); + } + //.... +} +``` +Execute the following script to run the ConfigSubscriber app: + + + +```bash +dapr run --app-id confighandler -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.configuration.http.ConfigurationHandler -p 3009 +``` + + +#### Running the ConfigurationClient app: + +Use the following command to run this example- + + + +```bash +dapr run --components-path ./components/configuration --app-id confighttp --log-level debug --app-port 3009 --dapr-http-port 3500 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.configuration.http.ConfigurationClient +``` + +#### Update myconfig2 key in configurationstore + + + + +```bash +docker exec dapr_redis redis-cli MSET myconfig2 "updated_val2||1" +``` + + +### Sample output +``` +== APP == Using preview client... +== APP == *******trying to retrieve configurations for a list of keys******** +== APP == val1 : key ->myconfig1 +== APP == val2 : key ->myconfig2 +== APP == val3 : key ->myconfig3 +== APP == Subscribing to key: myconfig2 +== APP == subscribing to myconfig2 is successful +``` +### Cleanup + +To stop the app, run (or press CTRL+C): + + + +```bash +dapr stop --app-id confighttp +dapr stop --app-id confighandler +``` + + diff --git a/sdk-springboot/src/main/java/io/dapr/springboot/DaprRuntime.java b/sdk-springboot/src/main/java/io/dapr/springboot/DaprRuntime.java index 298b8bca3..bfee1ac80 100644 --- a/sdk-springboot/src/main/java/io/dapr/springboot/DaprRuntime.java +++ b/sdk-springboot/src/main/java/io/dapr/springboot/DaprRuntime.java @@ -13,8 +13,6 @@ limitations under the License. package io.dapr.springboot; -import io.dapr.Rule; - import java.util.HashMap; import java.util.List; import java.util.Map; @@ -24,7 +22,6 @@ import java.util.stream.Collectors; * Internal Singleton to handle Dapr configuration. */ class DaprRuntime { - /** * The singleton instance. */ @@ -63,7 +60,8 @@ class DaprRuntime { * * @param pubsubName Pubsub name to subcribe to. * @param topicName Name of the topic being subscribed to. - * @param rule The optional rule for this route. + * @param match Match expression for this route. + * @param priority Priority for this match relative to others. * @param route Destination route for requests. * @param metadata Metadata for extended subscription functionality. */ diff --git a/sdk-tests/src/test/java/io/dapr/it/configuration/grpc/ConfigurationClientIT.java b/sdk-tests/src/test/java/io/dapr/it/configuration/grpc/ConfigurationClientIT.java index d2143143f..e64be4103 100644 --- a/sdk-tests/src/test/java/io/dapr/it/configuration/grpc/ConfigurationClientIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/configuration/grpc/ConfigurationClientIT.java @@ -16,6 +16,8 @@ package io.dapr.it.configuration.grpc; import io.dapr.client.DaprClientBuilder; import io.dapr.client.DaprPreviewClient; import io.dapr.client.domain.ConfigurationItem; +import io.dapr.client.domain.SubscribeConfigurationResponse; +import io.dapr.client.domain.UnsubscribeConfigurationResponse; import io.dapr.it.BaseIT; import io.dapr.it.DaprRun; @@ -25,7 +27,6 @@ import reactor.core.publisher.Flux; import java.io.IOException; import java.util.*; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.*; @@ -81,13 +82,6 @@ public class ConfigurationClientIT extends BaseIT { assertEquals(ci.getValue(), "myconfigvalue1"); } - @Test - public void getConfigurationWithEmptyKey() { - assertThrows(IllegalArgumentException.class, () -> { - daprPreviewClient.getConfiguration(CONFIG_STORE_NAME, "").block(); - }); - } - @Test public void getConfigurations() { Map cis = daprPreviewClient.getConfiguration(CONFIG_STORE_NAME, "myconfigkey1", "myconfigkey2").block(); @@ -98,27 +92,19 @@ public class ConfigurationClientIT extends BaseIT { } @Test - public void getConfigurationsWithEmptyList() { - List listOfKeys = new ArrayList<>(); - Map metadata = new HashMap<>(); - assertThrows(IllegalArgumentException.class, () -> { - daprPreviewClient.getConfiguration(CONFIG_STORE_NAME, listOfKeys, metadata).block(); - }); - } - - @Test - public void subscribeToConfiguration() { - List updatedValues = new ArrayList<>(); - AtomicReference disposable = new AtomicReference<>(); + public void subscribeConfiguration() { Runnable subscribeTask = () -> { - Flux> outFlux = daprPreviewClient - .subscribeToConfiguration(CONFIG_STORE_NAME, "myconfigkey1", "myconfigkey2"); - disposable.set(outFlux.subscribe(update -> { - updatedValues.add(update.entrySet() - .stream() - .findFirst() - .get().getValue().getValue()); - })); + Flux outFlux = daprPreviewClient + .subscribeConfiguration(CONFIG_STORE_NAME, "myconfigkey1", "myconfigkey2"); + outFlux.subscribe(update -> { + if (update.getItems().size() == 0 ) { + assertTrue(update.getSubscriptionId().length() > 0); + } else { + String value = update.getItems().entrySet().stream().findFirst().get().getValue().getValue(); + assertEquals(update.getItems().size(), 1); + assertTrue(value.contains("update_")); + } + }); }; Thread subscribeThread = new Thread(subscribeTask); subscribeThread.start(); @@ -134,15 +120,73 @@ public class ConfigurationClientIT extends BaseIT { new Thread(updateKeys).start(); try { // To ensure main thread does not die before outFlux subscribe gets called - Thread.sleep(3000); - disposable.get().dispose(); + Thread.sleep(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + @Test + public void unsubscribeConfigurationItems() { + List updatedValues = new ArrayList<>(); + AtomicReference disposableAtomicReference = new AtomicReference<>(); + AtomicReference subscriptionId = new AtomicReference<>(); + Runnable subscribeTask = () -> { + Flux outFlux = daprPreviewClient + .subscribeConfiguration(CONFIG_STORE_NAME, "myconfigkey1"); + disposableAtomicReference.set(outFlux + .subscribe(update -> { + subscriptionId.set(update.getSubscriptionId()); + updatedValues.add(update.getItems().entrySet().stream().findFirst().get().getValue().getValue()); + } + )); + }; + new Thread(subscribeTask).start(); + + // To ensure that subscribeThread gets scheduled + inducingSleepTime(0); + + Runnable updateKeys = () -> { + int i = 1; + while (i <= 5) { + String[] command = new String[] { + "docker", "exec", "dapr_redis", "redis-cli", + "SET", + "myconfigkey1", "update_myconfigvalue" + i + "||2" + }; + executeDockerCommand(command); + i++; + } + }; + new Thread(updateKeys).start(); + + // To ensure key starts getting updated + inducingSleepTime(1000); + + UnsubscribeConfigurationResponse res = daprPreviewClient.unsubscribeConfiguration( + subscriptionId.get(), + CONFIG_STORE_NAME + ).block(); + + assertTrue(res != null); + assertTrue(res.getIsUnsubscribed()); + int listSize = updatedValues.size(); + // To ensure main thread does not die + inducingSleepTime(1000); + + new Thread(updateKeys).start(); + + // To ensure main thread does not die + inducingSleepTime(2000); + assertTrue(updatedValues.size() == listSize); + } + + private static void inducingSleepTime(int timeInMillis) { + try { + Thread.sleep(timeInMillis); } catch (InterruptedException e) { e.printStackTrace(); } - assertEquals(updatedValues.size(), 2); - assertTrue(updatedValues.contains("update_myconfigvalue1")); - assertTrue(updatedValues.contains("update_myconfigvalue2")); - assertFalse(updatedValues.contains("update_myconfigvalue3")); } private static void executeDockerCommand(String[] command) { diff --git a/sdk-tests/src/test/java/io/dapr/it/configuration/http/ConfigSubscriberController.java b/sdk-tests/src/test/java/io/dapr/it/configuration/http/ConfigSubscriberController.java new file mode 100644 index 000000000..335c057c2 --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/configuration/http/ConfigSubscriberController.java @@ -0,0 +1,60 @@ +/* + * Copyright 2022 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.it.configuration.http; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.dapr.client.domain.ConfigurationItem; +import io.dapr.client.domain.SubscribeConfigurationResponse; +import org.springframework.http.MediaType; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RestController; +import reactor.core.publisher.Mono; + +import java.util.Iterator; +import java.util.Map; + +/** + * Spring boot Controller class for api endpoints. + */ +@RestController +public class ConfigSubscriberController { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + /** + * Api mapping for subscribe configuration. + * @param pathVarsMap Path variables for post call + * @param obj request Body + * @return Returns void + */ + @PostMapping(path = "/configuration/{configStore}/{key}", produces = MediaType.APPLICATION_JSON_VALUE) + public Mono handleMessage( + @PathVariable Map pathVarsMap, + @RequestBody SubscribeConfigurationResponse obj) { + return Mono.fromRunnable( + () -> { + try { + Map items = obj.getItems(); + for (Map.Entry entry : items.entrySet()) { + System.out.println(entry.getValue().getValue() + " : key ->" + entry.getKey()); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + ); + } +} diff --git a/sdk-tests/src/test/java/io/dapr/it/configuration/http/ConfigurationIT.java b/sdk-tests/src/test/java/io/dapr/it/configuration/http/ConfigurationIT.java new file mode 100644 index 000000000..dfd85787a --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/configuration/http/ConfigurationIT.java @@ -0,0 +1,82 @@ +package io.dapr.it.configuration.http; + +import io.dapr.client.DaprClientBuilder; +import io.dapr.client.DaprPreviewClient; +import io.dapr.client.domain.ConfigurationItem; +import io.dapr.it.BaseIT; +import io.dapr.it.DaprRun; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.util.*; + +import static org.junit.Assert.*; +import static org.junit.Assert.assertTrue; + +public class ConfigurationIT extends BaseIT { + private static final String CONFIG_STORE_NAME = "redisconfigstore"; + + private static DaprRun daprRun; + + private static DaprPreviewClient daprPreviewClient; + + private static String key = "myconfig1"; + + private static List keys = new ArrayList<>(Arrays.asList("myconfig1", "myconfig2", "myconfig3")); + + private static String[] insertCmd = new String[] { + "docker", "exec", "dapr_redis", "redis-cli", + "MSET", + "myconfigkey1", "myconfigvalue1||1", + "myconfigkey2", "myconfigvalue2||1", + "myconfigkey3", "myconfigvalue3||1" + }; + + @BeforeClass + public static void init() throws Exception { + daprRun = startDaprApp(ConfigurationIT.class.getSimpleName(), 5000); + daprRun.switchToHTTP(); + daprPreviewClient = new DaprClientBuilder().buildPreviewClient(); + } + + @AfterClass + public static void tearDown() throws Exception { + daprPreviewClient.close(); + } + + @Before + public void setupConfigStore() { + executeDockerCommand(insertCmd); + } + + @Test + public void getConfiguration() { + ConfigurationItem ci = daprPreviewClient.getConfiguration(CONFIG_STORE_NAME, "myconfigkey1").block(); + assertEquals(ci.getKey(), "myconfigkey1"); + assertEquals(ci.getValue(), "myconfigvalue1"); + } + + @Test + public void getConfigurations() { + Map cis = daprPreviewClient.getConfiguration(CONFIG_STORE_NAME, "myconfigkey1", "myconfigkey2").block(); + assertTrue(cis.size() == 2); + assertTrue(cis.containsKey("myconfigkey1")); + assertTrue(cis.containsKey("myconfigkey2")); + } + + private static void executeDockerCommand(String[] command) { + ProcessBuilder processBuilder = new ProcessBuilder(command); + Process process = null; + try { + process = processBuilder.start(); + process.waitFor(); + } catch (IOException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } +} diff --git a/sdk-tests/src/test/java/io/dapr/it/configuration/http/ConfigurationSubscribeIT.java b/sdk-tests/src/test/java/io/dapr/it/configuration/http/ConfigurationSubscribeIT.java new file mode 100644 index 000000000..d86dd0660 --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/configuration/http/ConfigurationSubscribeIT.java @@ -0,0 +1,91 @@ +package io.dapr.it.configuration.http; + +import io.dapr.client.DaprClientBuilder; +import io.dapr.client.DaprPreviewClient; +import io.dapr.client.domain.SubscribeConfigurationResponse; +import io.dapr.client.domain.UnsubscribeConfigurationResponse; +import io.dapr.it.BaseIT; +import io.dapr.it.DaprRun; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import reactor.core.publisher.Flux; + +import java.io.IOException; +import java.util.*; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertTrue; + +public class ConfigurationSubscribeIT extends BaseIT { + private static final String CONFIG_STORE_NAME = "redisconfigstore"; + + private static DaprRun daprRun; + + private static DaprPreviewClient daprPreviewClient; + + private static String key = "myconfig1"; + + private static List keys = new ArrayList<>(Arrays.asList("myconfig1", "myconfig2", "myconfig3")); + + private static String[] insertCmd = new String[] { + "docker", "exec", "dapr_redis", "redis-cli", + "MSET", + "myconfigkey1", "myconfigvalue1||1", + "myconfigkey2", "myconfigvalue2||1", + "myconfigkey3", "myconfigvalue3||1" + }; + + @BeforeClass + public static void init() throws Exception { + daprRun = startDaprApp( + ConfigurationIT.class.getSimpleName(), + ConfigurationSubscriberService.SUCCESS_MESSAGE, + ConfigurationSubscriberService.class, + true, + 60000); + daprRun.switchToHTTP(); + daprPreviewClient = new DaprClientBuilder().buildPreviewClient(); + } + + @AfterClass + public static void tearDown() throws Exception { + daprPreviewClient.close(); + } + + @Before + public void setupConfigStore() { + executeDockerCommand(insertCmd); + } + + @Test + public void subscribeAndUnsubscribeConfiguration() { + AtomicReference subId= new AtomicReference<>(""); + Flux outFlux = daprPreviewClient + .subscribeConfiguration(CONFIG_STORE_NAME, "myconfigkey1", "myconfigkey2"); + outFlux.subscribe(items -> { + subId.set(items.getSubscriptionId()); + }); + assertTrue(subId.get().length() > 0); + + UnsubscribeConfigurationResponse res = daprPreviewClient.unsubscribeConfiguration( + subId.get(), + CONFIG_STORE_NAME + ).block(); + assertTrue(res.getIsUnsubscribed()); + } + + private static void executeDockerCommand(String[] command) { + ProcessBuilder processBuilder = new ProcessBuilder(command); + Process process = null; + try { + process = processBuilder.start(); + process.waitFor(); + } catch (IOException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } +} diff --git a/sdk-tests/src/test/java/io/dapr/it/configuration/http/ConfigurationSubscriberService.java b/sdk-tests/src/test/java/io/dapr/it/configuration/http/ConfigurationSubscriberService.java new file mode 100644 index 000000000..1589c562e --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/configuration/http/ConfigurationSubscriberService.java @@ -0,0 +1,52 @@ +/* + * Copyright 2022 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.it.configuration.http; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** + * Service for ConfigurationSubscriber. + * dapr run --components-path ./components/configuration --app-id configsubscriber --app-port 3000 -- \ + * java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.configuration.http.ConfigurationSubscriber -p 3000 + */ +@SpringBootApplication +public class ConfigurationSubscriberService { + + public static final String SUCCESS_MESSAGE = "dapr initialized. Status: Running"; + + /** + * This is entry point for Configuration Subscriber service. + * @param args Arguments for main + * @throws Exception Throws Exception + */ + public static void main(String[] args) throws Exception { + int port = Integer.parseInt(args[0]); + + System.out.printf("Service starting on port %d ...\n", port); + + // Start Dapr's callback endpoint. + start(port); + } + + /** + * Starts Dapr's callback in a given port. + * + * @param port Port to listen to. + */ + private static void start(int port) { + SpringApplication app = new SpringApplication(ConfigurationSubscriberService.class); + app.run(String.format("--server.port=%d", port)); + } +} diff --git a/sdk/src/main/java/io/dapr/client/AbstractDaprClient.java b/sdk/src/main/java/io/dapr/client/AbstractDaprClient.java index 417746705..2cfc5e4b8 100644 --- a/sdk/src/main/java/io/dapr/client/AbstractDaprClient.java +++ b/sdk/src/main/java/io/dapr/client/AbstractDaprClient.java @@ -32,7 +32,10 @@ import io.dapr.client.domain.SaveStateRequest; import io.dapr.client.domain.State; import io.dapr.client.domain.StateOptions; import io.dapr.client.domain.SubscribeConfigurationRequest; +import io.dapr.client.domain.SubscribeConfigurationResponse; import io.dapr.client.domain.TransactionalStateOperation; +import io.dapr.client.domain.UnsubscribeConfigurationRequest; +import io.dapr.client.domain.UnsubscribeConfigurationResponse; import io.dapr.client.domain.query.Query; import io.dapr.serializer.DaprObjectSerializer; import io.dapr.utils.TypeRef; @@ -543,22 +546,32 @@ abstract class AbstractDaprClient implements DaprClient, DaprPreviewClient { /** * {@inheritDoc} */ - public Flux> subscribeToConfiguration(String storeName, String... keys) { + public Flux subscribeConfiguration(String storeName, String... keys) { List listOfKeys = filterEmptyKeys(keys); SubscribeConfigurationRequest request = new SubscribeConfigurationRequest(storeName, listOfKeys); - return this.subscribeToConfiguration(request); + return this.subscribeConfiguration(request); } /** * {@inheritDoc} */ - public Flux> subscribeToConfiguration( + public Flux subscribeConfiguration( String storeName, List keys, Map metadata) { SubscribeConfigurationRequest request = new SubscribeConfigurationRequest(storeName, keys); request.setMetadata(metadata); - return this.subscribeToConfiguration(request); + return this.subscribeConfiguration(request); + } + + /** + * {@inheritDoc} + */ + public Mono unsubscribeConfiguration( + String id, + String storeName) { + UnsubscribeConfigurationRequest request = new UnsubscribeConfigurationRequest(id, storeName); + return this.unsubscribeConfiguration(request); } private List filterEmptyKeys(String... keys) { diff --git a/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java b/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java index 593b37af4..a2113f0ab 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java @@ -36,7 +36,10 @@ import io.dapr.client.domain.SaveStateRequest; import io.dapr.client.domain.State; import io.dapr.client.domain.StateOptions; import io.dapr.client.domain.SubscribeConfigurationRequest; +import io.dapr.client.domain.SubscribeConfigurationResponse; import io.dapr.client.domain.TransactionalStateOperation; +import io.dapr.client.domain.UnsubscribeConfigurationRequest; +import io.dapr.client.domain.UnsubscribeConfigurationResponse; import io.dapr.config.Properties; import io.dapr.exceptions.DaprException; import io.dapr.internal.opencensus.GrpcWrapper; @@ -776,9 +779,7 @@ public class DaprClientGrpc extends AbstractDaprClient { if ((configurationStoreName == null) || (configurationStoreName.trim().isEmpty())) { throw new IllegalArgumentException("Configuration Store Name cannot be null or empty."); } - if (keys.isEmpty()) { - throw new IllegalArgumentException("Keys can not be empty or null"); - } + DaprProtos.GetConfigurationRequest.Builder builder = DaprProtos.GetConfigurationRequest.newBuilder() .setStoreName(configurationStoreName).addAllKeys(keys); if (metadata != null) { @@ -816,7 +817,7 @@ public class DaprClientGrpc extends AbstractDaprClient { * {@inheritDoc} */ @Override - public Flux> subscribeToConfiguration(SubscribeConfigurationRequest request) { + public Flux subscribeConfiguration(SubscribeConfigurationRequest request) { try { final String configurationStoreName = request.getStoreName(); final List keys = request.getKeys(); @@ -825,12 +826,12 @@ public class DaprClientGrpc extends AbstractDaprClient { if (configurationStoreName == null || (configurationStoreName.trim().isEmpty())) { throw new IllegalArgumentException("Configuration Store Name can not be null or empty."); } - if (keys.isEmpty()) { - throw new IllegalArgumentException("Keys can not be null or empty."); - } + + // keys can and empty list for subscribe all scenario, so we do not need check for empty keys. DaprProtos.SubscribeConfigurationRequest.Builder builder = DaprProtos.SubscribeConfigurationRequest.newBuilder() .setStoreName(configurationStoreName) .addAllKeys(keys); + if (metadata != null) { builder.putAllMetadata(metadata); } @@ -841,12 +842,12 @@ public class DaprClientGrpc extends AbstractDaprClient { ).map( it -> { Map configMap = new HashMap<>(); - Iterator> itr = it.getItems().entrySet().iterator(); + Iterator> itr = it.getItemsMap().entrySet().iterator(); while (itr.hasNext()) { Map.Entry entry = itr.next(); configMap.put(entry.getKey(), buildConfigurationItem(entry.getValue(), entry.getKey())); } - return Collections.unmodifiableMap(configMap); + return new SubscribeConfigurationResponse(it.getId(), Collections.unmodifiableMap(configMap)); } ); } catch (Exception ex) { @@ -854,6 +855,38 @@ public class DaprClientGrpc extends AbstractDaprClient { } } + /** + * {@inheritDoc} + */ + @Override + public Mono unsubscribeConfiguration(UnsubscribeConfigurationRequest request) { + try { + final String configurationStoreName = request.getStoreName(); + final String id = request.getSubscriptionId(); + + if (configurationStoreName == null || (configurationStoreName.trim().isEmpty())) { + throw new IllegalArgumentException("Configuration Store Name can not be null or empty."); + } + if (id.isEmpty()) { + throw new IllegalArgumentException("Subscription id can not be null or empty."); + } + DaprProtos.UnsubscribeConfigurationRequest.Builder builder = + DaprProtos.UnsubscribeConfigurationRequest.newBuilder() + .setId(id) + .setStoreName(configurationStoreName); + + DaprProtos.UnsubscribeConfigurationRequest envelope = builder.build(); + + return this.createMono( + it -> intercept(asyncStub).unsubscribeConfigurationAlpha1(envelope, it) + ).map( + it -> new UnsubscribeConfigurationResponse(it.getOk(), it.getMessage()) + ); + } catch (Exception ex) { + return DaprException.wrapMono(ex); + } + } + /** * Build a new Configuration Item from provided parameter. * diff --git a/sdk/src/main/java/io/dapr/client/DaprClientHttp.java b/sdk/src/main/java/io/dapr/client/DaprClientHttp.java index 0d42af879..8de0d1718 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientHttp.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientHttp.java @@ -34,8 +34,11 @@ import io.dapr.client.domain.SaveStateRequest; import io.dapr.client.domain.State; import io.dapr.client.domain.StateOptions; import io.dapr.client.domain.SubscribeConfigurationRequest; +import io.dapr.client.domain.SubscribeConfigurationResponse; import io.dapr.client.domain.TransactionalStateOperation; import io.dapr.client.domain.TransactionalStateRequest; +import io.dapr.client.domain.UnsubscribeConfigurationRequest; +import io.dapr.client.domain.UnsubscribeConfigurationResponse; import io.dapr.config.Properties; import io.dapr.exceptions.DaprException; import io.dapr.serializer.DaprObjectSerializer; @@ -54,6 +57,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.stream.Collectors; @@ -768,15 +772,150 @@ public class DaprClientHttp extends AbstractDaprClient { */ @Override public Mono> getConfiguration(GetConfigurationRequest request) { - return DaprException.wrapMono(new UnsupportedOperationException()); + try { + final String configurationStoreName = request.getStoreName(); + final List keys = request.getKeys(); + final Map metadata = request.getMetadata(); + + if ((configurationStoreName == null) || (configurationStoreName.trim().isEmpty())) { + throw new IllegalArgumentException("Configuration Store Name cannot be null or empty."); + } + + Map> queryParams = new HashMap<>(); + if (!keys.isEmpty()) { + queryParams.put("key", Collections.unmodifiableList(keys)); + } + + // Appending passed metadata too into queryparams + Map> queryArgs = metadataToQueryArgs(metadata); + queryParams.putAll(queryArgs); + + String[] pathSegments = new String[] {DaprHttp.ALPHA_1_API_VERSION, "configuration", configurationStoreName }; + return Mono.subscriberContext().flatMap( + context -> this.client + .invokeApi( + DaprHttp.HttpMethods.GET.name(), + pathSegments, queryParams, + (String) null, null, context) + ).map( + response -> { + try { + Map m = INTERNAL_SERIALIZER.deserialize(response.getBody(), Map.class); + Set set = m.keySet(); + JsonNode root = INTERNAL_SERIALIZER.parseNode(response.getBody()); + Iterator itr = set.iterator(); + Map result = new HashMap<>(); + while (itr.hasNext()) { + String key = itr.next(); + String value = root.get(key).path("value").asText(); + String version = root.get(key).path("version").asText(); + result.put(key, new ConfigurationItem( + key, + value, + version, + new HashMap<>() + )); + } + return Collections.unmodifiableMap(result); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + ); + } catch (Exception ex) { + return DaprException.wrapMono(ex); + } } /** * {@inheritDoc} */ @Override - public Flux> subscribeToConfiguration(SubscribeConfigurationRequest request) { - return DaprException.wrapFlux(new UnsupportedOperationException()); + public Flux subscribeConfiguration(SubscribeConfigurationRequest request) { + try { + final String configurationStoreName = request.getStoreName(); + final List keys = request.getKeys(); + final Map metadata = request.getMetadata(); + + if (configurationStoreName == null || (configurationStoreName.trim().isEmpty())) { + throw new IllegalArgumentException("Configuration Store Name can not be null or empty."); + } + + Map> queryParams = new HashMap<>(); + if (!keys.isEmpty()) { + queryParams.put("key", Collections.unmodifiableList(keys)); + } + + // Appending passed metadata too into queryparams + Map> queryArgs = metadataToQueryArgs(metadata); + queryParams.putAll(queryArgs); + + String[] pathSegments = + new String[] { DaprHttp.ALPHA_1_API_VERSION, "configuration", configurationStoreName, "subscribe" }; + SubscribeConfigurationResponse res = Mono.subscriberContext().flatMap( + context -> this.client.invokeApi( + DaprHttp.HttpMethods.GET.name(), + pathSegments, queryParams, + (String) null, null, context + ) + ).map(response -> { + try { + JsonNode root = INTERNAL_SERIALIZER.parseNode(response.getBody()); + String subscriptionId = root.path("id").asText(); + return new SubscribeConfigurationResponse(subscriptionId, new HashMap<>()); + } catch (IOException e) { + throw new RuntimeException(e); + } + }).block(); + if (res != null) { + return Flux.just(res); + } + return Flux.empty(); + } catch (Exception ex) { + return DaprException.wrapFlux(ex); + } + } + + /** + * {@inheritDoc} + */ + @Override + public Mono unsubscribeConfiguration(UnsubscribeConfigurationRequest request) { + try { + final String id = request.getSubscriptionId(); + final String configStoreName = request.getStoreName(); + if (configStoreName == null || (configStoreName.trim().isEmpty())) { + throw new IllegalArgumentException("Configuration Store Name can not be null or empty."); + } + if (id.isEmpty()) { + throw new IllegalArgumentException("Subscription id can not be null or empty."); + } + + String[] pathSegments = new String[] + { DaprHttp.ALPHA_1_API_VERSION, "configuration", configStoreName, id, "unsubscribe" }; + + return Mono.subscriberContext().flatMap( + context -> this.client + .invokeApi( + DaprHttp.HttpMethods.GET.name(), + pathSegments, null, + (String) null, null, context) + ).map( + response -> { + JsonNode root = null; + try { + root = INTERNAL_SERIALIZER.parseNode(response.getBody()); + } catch (IOException e) { + throw new RuntimeException(e); + } + boolean ok = root.path("ok").asBoolean(); + String message = root.path("message").asText(); + return new UnsubscribeConfigurationResponse(ok, message); + } + ); + } catch (Exception ex) { + return DaprException.wrapMono(ex); + } } /** diff --git a/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java b/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java index 040f3d74f..9c431f9c5 100644 --- a/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java +++ b/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java @@ -18,6 +18,9 @@ import io.dapr.client.domain.GetConfigurationRequest; import io.dapr.client.domain.QueryStateRequest; import io.dapr.client.domain.QueryStateResponse; import io.dapr.client.domain.SubscribeConfigurationRequest; +import io.dapr.client.domain.SubscribeConfigurationResponse; +import io.dapr.client.domain.UnsubscribeConfigurationRequest; +import io.dapr.client.domain.UnsubscribeConfigurationResponse; import io.dapr.client.domain.query.Query; import io.dapr.utils.TypeRef; import reactor.core.publisher.Flux; @@ -86,9 +89,9 @@ public interface DaprPreviewClient extends AutoCloseable { * * @param storeName Name of the configuration store * @param keys keys of the configurations which are to be subscribed - * @return Flux of Map of configuration items + * @return Flux of {@link SubscribeConfigurationResponse} instance */ - Flux> subscribeToConfiguration(String storeName, String... keys); + Flux subscribeConfiguration(String storeName, String... keys); /** * Subscribe to the keys for any change. @@ -96,18 +99,35 @@ public interface DaprPreviewClient extends AutoCloseable { * @param storeName Name of the configuration store * @param keys keys of the configurations which are to be subscribed * @param metadata optional metadata - * @return Flux of Map of configuration items + * @return Flux of {@link SubscribeConfigurationResponse} instance */ - Flux> subscribeToConfiguration(String storeName, List keys, - Map metadata); + Flux subscribeConfiguration(String storeName, List keys, + Map metadata); /** * Subscribe to the keys for any change. * * @param request request for subscribing to any change for the given keys in request - * @return Flux of Map of configuration items + * @return Flux of {@link SubscribeConfigurationResponse} instance */ - Flux> subscribeToConfiguration(SubscribeConfigurationRequest request); + Flux subscribeConfiguration(SubscribeConfigurationRequest request); + + /** + * Subscribe to the keys for any change. + * + * @param id subscription id returned by subscribeConfiguration API. + * @param storeName Name of the configuration store. + * @return Mono of {@link UnsubscribeConfigurationResponse} instance. + */ + Mono unsubscribeConfiguration(String id, String storeName); + + /** + * Subscribe to the keys for any change. + * + * @param request request for unsubscribing to any change for the given subscription id in request + * @return Mono of {@link UnsubscribeConfigurationResponse} instance. + */ + Mono unsubscribeConfiguration(UnsubscribeConfigurationRequest request); /** * Query for states using a query string. diff --git a/sdk/src/main/java/io/dapr/client/domain/ConfigurationItem.java b/sdk/src/main/java/io/dapr/client/domain/ConfigurationItem.java index df6ab818c..727c314dc 100644 --- a/sdk/src/main/java/io/dapr/client/domain/ConfigurationItem.java +++ b/sdk/src/main/java/io/dapr/client/domain/ConfigurationItem.java @@ -13,6 +13,9 @@ limitations under the License. package io.dapr.client.domain; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + import java.util.Collections; import java.util.Map; @@ -47,7 +50,11 @@ public class ConfigurationItem { * @param value value for the provided key * @param version version of the configuration item */ - public ConfigurationItem(String key, String value, String version) { + @JsonCreator + public ConfigurationItem( + @JsonProperty("key") String key, + @JsonProperty("value") String value, + @JsonProperty("version") String version) { this.key = key; this.value = value; this.version = version; diff --git a/sdk/src/main/java/io/dapr/client/domain/SubscribeConfigurationResponse.java b/sdk/src/main/java/io/dapr/client/domain/SubscribeConfigurationResponse.java new file mode 100644 index 000000000..aac9723a8 --- /dev/null +++ b/sdk/src/main/java/io/dapr/client/domain/SubscribeConfigurationResponse.java @@ -0,0 +1,58 @@ +/* + * Copyright 2022 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.client.domain; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Collections; +import java.util.Map; + +/** + * Domain object for response from subscribeConfiguration API. + */ +public class SubscribeConfigurationResponse { + + /** + * Subscription id for the items subscribed to. + */ + private final String subscriptionId; + + /** + * Map of Configuration key to {@link ConfigurationItem}. + */ + private final Map items; + + /** + * Constructor for SubscribeConfigurationResponse. + * + * @param id Subscription id for the items subscribed to.This id is returned by subscribeToConfiguration API. + * @param items Map of configuration items user subscribed to. + */ + @JsonCreator + public SubscribeConfigurationResponse( + @JsonProperty("id") String id, + @JsonProperty("items") Map items) { + this.subscriptionId = id; + this.items = Collections.unmodifiableMap(items); + } + + public Map getItems() { + return items; + } + + public String getSubscriptionId() { + return subscriptionId; + } +} diff --git a/sdk/src/main/java/io/dapr/client/domain/UnsubscribeConfigurationRequest.java b/sdk/src/main/java/io/dapr/client/domain/UnsubscribeConfigurationRequest.java new file mode 100644 index 000000000..6df65ccce --- /dev/null +++ b/sdk/src/main/java/io/dapr/client/domain/UnsubscribeConfigurationRequest.java @@ -0,0 +1,47 @@ +/* + * Copyright 2022 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.client.domain; + +/** + * Request to unsubscribe to one or more configuration items using subscription id. + */ +public class UnsubscribeConfigurationRequest { + /** + * Name of the configuration store. + */ + private final String storeName; + /** + * Subscription id for the items to unsubscribe to. + */ + private final String subscriptionId; + + /** + * Constructor for UnsubscribeConfigurationRequest. + * + * @param id Subscription id for the items subscribed to.This id is returned by subscribeConfiguration API. + * @param storeName Name of the configuration store. + */ + public UnsubscribeConfigurationRequest(String id, String storeName) { + this.storeName = storeName; + this.subscriptionId = id; + } + + public String getStoreName() { + return storeName; + } + + public String getSubscriptionId() { + return subscriptionId; + } +} diff --git a/sdk/src/main/java/io/dapr/client/domain/UnsubscribeConfigurationResponse.java b/sdk/src/main/java/io/dapr/client/domain/UnsubscribeConfigurationResponse.java new file mode 100644 index 000000000..59d02eae5 --- /dev/null +++ b/sdk/src/main/java/io/dapr/client/domain/UnsubscribeConfigurationResponse.java @@ -0,0 +1,47 @@ +/* + * Copyright 2022 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.client.domain; + +/** + * Domain object for unsubscribe response. + */ +public class UnsubscribeConfigurationResponse { + /** + * Boolean denoting whether unsubscribe API call is success/failure. + */ + private final boolean isUnsubscribed; + /** + * Unsubscribe API response message. + */ + private final String message; + + /** + * Constructor for UnsubscribeConfigurationResponse. + * + * @param isUnsubscribed boolean denoting unsubscribe API response. + * @param message unsubscribe API response message. + */ + public UnsubscribeConfigurationResponse(boolean isUnsubscribed, String message) { + this.isUnsubscribed = isUnsubscribed; + this.message = message; + } + + public boolean getIsUnsubscribed() { + return isUnsubscribed; + } + + public String getMessage() { + return message; + } +} diff --git a/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java b/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java index 2b3981dec..88f46c8b0 100644 --- a/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java @@ -23,6 +23,9 @@ import io.dapr.client.domain.QueryStateItem; import io.dapr.client.domain.QueryStateRequest; import io.dapr.client.domain.QueryStateResponse; import io.dapr.client.domain.SubscribeConfigurationRequest; +import io.dapr.client.domain.SubscribeConfigurationResponse; +import io.dapr.client.domain.UnsubscribeConfigurationRequest; +import io.dapr.client.domain.UnsubscribeConfigurationResponse; import io.dapr.client.domain.query.Query; import io.dapr.serializer.DefaultObjectSerializer; import io.dapr.v1.CommonProtos; @@ -87,16 +90,9 @@ public class DaprPreviewClientGrpcTest { @Test public void getConfigurationTestErrorScenario() { - assertThrows(IllegalArgumentException.class, () -> { - previewClient.getConfiguration(CONFIG_STORE_NAME, "").block(); - }); assertThrows(IllegalArgumentException.class, () -> { previewClient.getConfiguration("", "key").block(); }); - GetConfigurationRequest req = new GetConfigurationRequest(CONFIG_STORE_NAME, null); - assertThrows(IllegalArgumentException.class, () -> { - previewClient.getConfiguration(req).block(); - }); } @Test @@ -182,6 +178,7 @@ public class DaprPreviewClientGrpcTest { .build()); DaprProtos.SubscribeConfigurationResponse responseEnvelope = DaprProtos.SubscribeConfigurationResponse.newBuilder() .putAllItems(configs) + .setId("subscription_id") .build(); doAnswer((Answer) invocation -> { @@ -192,10 +189,12 @@ public class DaprPreviewClientGrpcTest { return null; }).when(daprStub).subscribeConfigurationAlpha1(any(DaprProtos.SubscribeConfigurationRequest.class), any()); - Iterator> itr = previewClient.subscribeToConfiguration(CONFIG_STORE_NAME, "configkey1").toIterable().iterator(); + Iterator itr = previewClient.subscribeConfiguration(CONFIG_STORE_NAME, "configkey1").toIterable().iterator(); assertTrue(itr.hasNext()); - assertTrue(itr.next().containsKey("configkey1")); - assertFalse(itr.hasNext()); + SubscribeConfigurationResponse res = itr.next(); + assertTrue(res.getItems().containsKey("configkey1")); + assertEquals("subscription_id", res.getSubscriptionId()); + assertFalse(itr.hasNext()); } @Test @@ -210,6 +209,7 @@ public class DaprPreviewClientGrpcTest { .build()); DaprProtos.SubscribeConfigurationResponse responseEnvelope = DaprProtos.SubscribeConfigurationResponse.newBuilder() .putAllItems(configs) + .setId("subscription_id") .build(); doAnswer((Answer) invocation -> { @@ -223,10 +223,12 @@ public class DaprPreviewClientGrpcTest { Map reqMetadata = new HashMap<>(); List keys = Arrays.asList("configkey1"); - Iterator> itr = previewClient.subscribeToConfiguration(CONFIG_STORE_NAME, "configkey1").toIterable().iterator(); + Iterator itr = previewClient.subscribeConfiguration(CONFIG_STORE_NAME, keys, reqMetadata).toIterable().iterator(); assertTrue(itr.hasNext()); - assertTrue(itr.next().containsKey("configkey1")); - assertFalse(itr.hasNext()); + SubscribeConfigurationResponse res = itr.next(); + assertTrue(res.getItems().containsKey("configkey1")); + assertEquals("subscription_id", res.getSubscriptionId()); + assertFalse(itr.hasNext()); } @Test @@ -240,16 +242,56 @@ public class DaprPreviewClientGrpcTest { }).when(daprStub).subscribeConfigurationAlpha1(any(DaprProtos.SubscribeConfigurationRequest.class), any()); assertThrowsDaprException(ExecutionException.class, () -> { - previewClient.subscribeToConfiguration(CONFIG_STORE_NAME, "key").blockFirst(); + previewClient.subscribeConfiguration(CONFIG_STORE_NAME, "key").blockFirst(); }); assertThrows(IllegalArgumentException.class, () -> { - previewClient.subscribeToConfiguration("", "key").blockFirst(); + previewClient.subscribeConfiguration("", "key").blockFirst(); + }); + } + + @Test + public void unsubscribeConfigurationTest() { + DaprProtos.UnsubscribeConfigurationResponse responseEnvelope = DaprProtos.UnsubscribeConfigurationResponse.newBuilder() + .setOk(true) + .setMessage("unsubscribed_message") + .build(); + + doAnswer((Answer) invocation -> { + StreamObserver observer = + (StreamObserver) invocation.getArguments()[1]; + observer.onNext(responseEnvelope); + observer.onCompleted(); + return null; + }).when(daprStub).unsubscribeConfigurationAlpha1(any(DaprProtos.UnsubscribeConfigurationRequest.class), any()); + + UnsubscribeConfigurationResponse + response = previewClient.unsubscribeConfiguration("subscription_id", CONFIG_STORE_NAME).block(); + assertTrue(response.getIsUnsubscribed()); + assertEquals("unsubscribed_message", response.getMessage()); + } + + @Test + public void unsubscribeConfigurationTestWithError() { + doAnswer((Answer) invocation -> { + StreamObserver observer = + (StreamObserver) invocation.getArguments()[1]; + observer.onError(new RuntimeException()); + observer.onCompleted(); + return null; + }).when(daprStub).unsubscribeConfigurationAlpha1(any(DaprProtos.UnsubscribeConfigurationRequest.class), any()); + + assertThrowsDaprException(ExecutionException.class, () -> { + previewClient.unsubscribeConfiguration("subscription_id", CONFIG_STORE_NAME).block(); }); - SubscribeConfigurationRequest req = new SubscribeConfigurationRequest(CONFIG_STORE_NAME, null); assertThrows(IllegalArgumentException.class, () -> { - previewClient.subscribeToConfiguration(req).blockFirst(); + previewClient.unsubscribeConfiguration("", CONFIG_STORE_NAME).block(); + }); + + UnsubscribeConfigurationRequest req = new UnsubscribeConfigurationRequest("subscription_id", ""); + assertThrows(IllegalArgumentException.class, () -> { + previewClient.unsubscribeConfiguration(req).block(); }); } diff --git a/sdk/src/test/java/io/dapr/client/DaprPreviewClientHttpTest.java b/sdk/src/test/java/io/dapr/client/DaprPreviewClientHttpTest.java index 121db19b4..c14b1b5c0 100644 --- a/sdk/src/test/java/io/dapr/client/DaprPreviewClientHttpTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprPreviewClientHttpTest.java @@ -13,24 +13,36 @@ limitations under the License. package io.dapr.client; -import io.dapr.client.domain.QueryStateRequest; -import io.dapr.client.domain.QueryStateResponse; +import io.dapr.client.domain.*; import io.dapr.client.domain.query.Query; import io.dapr.config.Properties; import io.dapr.exceptions.DaprException; import io.dapr.utils.TypeRef; +import io.dapr.v1.DaprProtos; +import io.grpc.stub.StreamObserver; import okhttp3.OkHttpClient; import okhttp3.mock.Behavior; import okhttp3.mock.MockInterceptor; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; +import org.mockito.stubbing.Answer; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; + +import static io.dapr.utils.TestUtils.assertThrowsDaprException; +import static org.junit.Assert.*; +import static org.junit.Assert.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; public class DaprPreviewClientHttpTest { - private static final String CONFIG_STORE_NAME = "MyConfigurationStore"; + private static final String CONFIG_STORE_NAME = "MyConfigStore"; private DaprPreviewClient daprPreviewClientHttp; @@ -48,27 +60,6 @@ public class DaprPreviewClientHttpTest { daprPreviewClientHttp = new DaprClientHttp(daprHttp); } - @Test - public void getConfigurationWithSingleKey() { - assertThrows(DaprException.class, () -> { - daprPreviewClientHttp.getConfiguration(CONFIG_STORE_NAME, "key").block(); - }); - } - - @Test - public void getConfiguration() { - assertThrows(DaprException.class, () -> { - daprPreviewClientHttp.getConfiguration(CONFIG_STORE_NAME, "key1", "key2").block(); - }); - } - - @Test - public void subscribeConfigurations() { - assertThrows(DaprException.class, () -> { - daprPreviewClientHttp.subscribeToConfiguration(CONFIG_STORE_NAME, "key1", "key2").blockFirst(); - }); - } - @Test public void queryStateExceptionsTest() { assertThrows(IllegalArgumentException.class, () -> { @@ -120,4 +111,118 @@ public class DaprPreviewClientHttpTest { assertEquals("result must be same", "testData", response.getResults().get(0).getValue()); assertEquals("result must be same", "6f54ad94-dfb9-46f0-a371-e42d550adb7d", response.getResults().get(0).getEtag()); } + + @Test + public void getConfigurationTestErrorScenario() { + assertThrows(IllegalArgumentException.class, () -> { + daprPreviewClientHttp.getConfiguration("", "key").block(); + }); + assertThrows(IllegalArgumentException.class, () -> { + daprPreviewClientHttp.getConfiguration(" ", "key").block(); + }); + } + + @Test + public void getConfigurationTest() { + mockInterceptor.addRule() + .get() + .path("/v1.0-alpha1/configuration/MyConfigStore") + .param("key","configkey1") + .respond("{\"configkey1\" : {\"value\" : \"configvalue1\",\"version\" : \"1\"}}"); + + ConfigurationItem ci = daprPreviewClientHttp.getConfiguration(CONFIG_STORE_NAME, "configkey1").block(); + assertNotNull(ci); + assertEquals("configkey1", ci.getKey()); + assertEquals("configvalue1", ci.getValue()); + assertEquals("1", ci.getVersion()); + } + + @Test + public void getAllConfigurationTest() { + mockInterceptor.addRule() + .get() + .path("/v1.0-alpha1/configuration/MyConfigStore") + .respond("{\"configkey1\" : {\"value\" : \"configvalue1\",\"version\" : \"1\"}}"); + + ConfigurationItem ci = daprPreviewClientHttp.getConfiguration(CONFIG_STORE_NAME, "configkey1").block(); + assertNotNull(ci); + assertEquals("configkey1", ci.getKey()); + assertEquals("configvalue1", ci.getValue()); + assertEquals("1", ci.getVersion()); + } + + @Test + public void subscribeConfigurationTest() { + mockInterceptor.addRule() + .get() + .path("/v1.0-alpha1/configuration/MyConfigStore/subscribe") + .param("key", "configkey1") + .respond("{\"id\":\"1234\"}"); + + Iterator itr = daprPreviewClientHttp.subscribeConfiguration(CONFIG_STORE_NAME, "configkey1").toIterable().iterator(); + assertTrue(itr.hasNext()); + SubscribeConfigurationResponse res = itr.next(); + assertEquals("1234", res.getSubscriptionId()); + assertFalse(itr.hasNext()); + } + + @Test + public void subscribeAllConfigurationTest() { + mockInterceptor.addRule() + .get() + .path("/v1.0-alpha1/configuration/MyConfigStore/subscribe") + .respond("{\"id\":\"1234\"}"); + + Iterator itr = daprPreviewClientHttp.subscribeConfiguration(CONFIG_STORE_NAME, "configkey1").toIterable().iterator(); + assertTrue(itr.hasNext()); + SubscribeConfigurationResponse res = itr.next(); + assertEquals("1234", res.getSubscriptionId()); + assertFalse(itr.hasNext()); + } + + @Test + public void unsubscribeConfigurationTest() { + mockInterceptor.addRule() + .get() + .path("/v1.0-alpha1/configuration/MyConfigStore/1234/unsubscribe") + .respond("{\"ok\": true}"); + + UnsubscribeConfigurationResponse res = daprPreviewClientHttp.unsubscribeConfiguration("1234", CONFIG_STORE_NAME).block(); + assertTrue(res.getIsUnsubscribed()); + } + + @Test + public void unsubscribeConfigurationTestWithError() { + assertThrows(IllegalArgumentException.class, () -> { + daprPreviewClientHttp.unsubscribeConfiguration("", CONFIG_STORE_NAME).block(); + }); + + UnsubscribeConfigurationRequest req = new UnsubscribeConfigurationRequest("subscription_id", ""); + assertThrows(IllegalArgumentException.class, () -> { + daprPreviewClientHttp.unsubscribeConfiguration(req).block(); + }); + + mockInterceptor.addRule() + .get() + .path("/v1.0-alpha1/configuration/MyConfigStore/1234/unsubscribe") + .respond("{\"ok\": false, \"message\": \"some error while unsubscribing\"}"); + UnsubscribeConfigurationResponse res = daprPreviewClientHttp.unsubscribeConfiguration("1234", CONFIG_STORE_NAME).block(); + assertFalse(res.getIsUnsubscribed()); + } + + @Test + public void subscribeConfigurationTestWithError() { + assertThrows(IllegalArgumentException.class, () -> { + daprPreviewClientHttp.subscribeConfiguration("", "key1").blockFirst(); + }); + + mockInterceptor.addRule() + .get() + .path("/v1.0-alpha1/configuration/MyConfigStore/subscribe") + .param("key", "configkey1") + .respond(500); + assertThrows(DaprException.class, () -> { + daprPreviewClientHttp.subscribeConfiguration(CONFIG_STORE_NAME, "configkey1").blockFirst(); + }); + } }