diff --git a/.github/workflows/validate.yml b/.github/workflows/validate.yml
index 7c2f53ca0..f33585c60 100644
--- a/.github/workflows/validate.yml
+++ b/.github/workflows/validate.yml
@@ -150,3 +150,7 @@ jobs:
working-directory: ./examples
run: |
mm.py ./src/main/java/io/dapr/examples/unittesting/README.md
+ - name: Validate Configuration API example
+ working-directory: ./examples
+ run: |
+ mm.py ./src/main/java/io/dapr/examples/configuration/grpc/README.md
\ No newline at end of file
diff --git a/.java_header b/.java_header
new file mode 100644
index 000000000..255da1d1a
--- /dev/null
+++ b/.java_header
@@ -0,0 +1,12 @@
+^/\*$
+^ \* Copyright \d\d\d\d The Dapr Authors$
+^ \* Licensed under the Apache License, Version 2.0 \(the "License"\)\;$
+^ \* you may not use this file except in compliance with the License\.$
+^ \* You may obtain a copy of the License at$
+^ \* http://www.apache.org/licenses/LICENSE-2\.0$
+^ \* Unless required by applicable law or agreed to in writing, software$
+^ \* distributed under the License is distributed on an "AS IS" BASIS,$
+^ \* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied\.$
+^ \* See the License for the specific language governing permissions and$
+^limitations under the License\.$
+^\*/$
\ No newline at end of file
diff --git a/checkstyle.xml b/checkstyle.xml
index 2b2eab09f..6482adaca 100644
--- a/checkstyle.xml
+++ b/checkstyle.xml
@@ -51,10 +51,9 @@
-
+
-
-
+
diff --git a/examples/components/configuration/redis_configstore.yaml b/examples/components/configuration/redis_configstore.yaml
new file mode 100644
index 000000000..5b0e4090d
--- /dev/null
+++ b/examples/components/configuration/redis_configstore.yaml
@@ -0,0 +1,12 @@
+apiVersion: dapr.io/v1alpha1
+kind: Component
+metadata:
+ name: configstore
+spec:
+ type: configuration.redis
+ version: v1
+ metadata:
+ - name: redisHost
+ value: localhost:6379
+ - name: redisPassword
+ value: ""
diff --git a/examples/src/main/java/io/dapr/examples/configuration/grpc/ConfigurationClient.java b/examples/src/main/java/io/dapr/examples/configuration/grpc/ConfigurationClient.java
new file mode 100644
index 000000000..f586999bf
--- /dev/null
+++ b/examples/src/main/java/io/dapr/examples/configuration/grpc/ConfigurationClient.java
@@ -0,0 +1,164 @@
+/*
+ * Copyright 2022 The Dapr Authors
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package io.dapr.examples.configuration.grpc;
+
+import io.dapr.client.DaprClientBuilder;
+import io.dapr.client.DaprPreviewClient;
+import io.dapr.client.domain.ConfigurationItem;
+import io.dapr.client.domain.GetConfigurationRequest;
+import io.dapr.client.domain.SubscribeConfigurationRequest;
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class ConfigurationClient {
+
+ private static final String CONFIG_STORE_NAME = "configstore";
+
+ private static final List keys = new ArrayList<>(Arrays.asList("myconfig1", "myconfig3", "myconfig2"));
+
+ /**
+ * Executes various methods to check the different apis.
+ * @param args arguments
+ * @throws Exception throws Exception
+ */
+ public static void main(String[] args) throws Exception {
+ try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) {
+ System.out.println("Using preview client...");
+ getConfigurationForaSingleKey(client);
+ getConfigurationsUsingVarargs(client);
+ getConfigurations(client);
+ subscribeConfigurationRequestWithSubscribe(client);
+ }
+ }
+
+ /**
+ * Gets configuration for a single key.
+ *
+ * @param client DaprPreviewClient object
+ */
+ public static void getConfigurationForaSingleKey(DaprPreviewClient client) {
+ System.out.println("*******trying to retrieve configuration given a single key********");
+ try {
+ Mono item = client.getConfiguration(CONFIG_STORE_NAME, keys.get(0));
+ System.out.println("Value ->" + item.block().getValue() + " key ->" + item.block().getKey());
+ } catch (Exception ex) {
+ System.out.println(ex.getMessage());
+ }
+ }
+
+ /**
+ * Gets configurations for varibale no. of arguments.
+ *
+ * @param client DaprPreviewClient object
+ */
+ public static void getConfigurationsUsingVarargs(DaprPreviewClient client) {
+ System.out.println("*******trying to retrieve configurations for a variable no. of keys********");
+ try {
+ Mono> items =
+ client.getConfiguration(CONFIG_STORE_NAME, "myconfig1", "myconfig3");
+ items.block().forEach(ConfigurationClient::print);
+ } catch (Exception ex) {
+ System.out.println(ex.getMessage());
+ }
+ }
+
+ /**
+ * Gets configurations for a list of keys.
+ *
+ * @param client DaprPreviewClient object
+ */
+ public static void getConfigurations(DaprPreviewClient client) {
+ System.out.println("*******trying to retrieve configurations for a list of keys********");
+ List keys = new ArrayList<>();
+ keys.add("myconfig1");
+ keys.add("myconfig2");
+ keys.add("myconfig3");
+ GetConfigurationRequest req = new GetConfigurationRequest(CONFIG_STORE_NAME, keys);
+ try {
+ Mono> items = client.getConfiguration(req);
+ items.block().forEach(ConfigurationClient::print);
+ } catch (Exception ex) {
+ System.out.println(ex.getMessage());
+ }
+ }
+
+ /**
+ * Subscribe to a list of keys.Optional to above iterator way of retrieving the changes
+ *
+ * @param client DaprPreviewClient object
+ */
+ public static void subscribeConfigurationRequestWithSubscribe(DaprPreviewClient client) {
+ System.out.println("*****Subscribing to keys using subscribe method: " + keys.toString() + " *****");
+ AtomicReference disposableAtomicReference = new AtomicReference<>();
+ SubscribeConfigurationRequest req = new SubscribeConfigurationRequest(CONFIG_STORE_NAME, keys);
+ Runnable subscribeTask = () -> {
+ Flux> outFlux = client.subscribeToConfiguration(req);
+ disposableAtomicReference.set(outFlux
+ .subscribe(
+ cis -> cis.forEach(ConfigurationClient::print)
+ ));
+ };
+ new Thread(subscribeTask).start();
+ try {
+ // To ensure that subscribeThread gets scheduled
+ Thread.sleep(0);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ Runnable updateKeys = () -> {
+ int i = 1;
+ while (i <= 3) {
+ executeDockerCommand(i);
+ i++;
+ }
+ };
+ new Thread(updateKeys).start();
+ try {
+ // To ensure main thread does not die before outFlux subscribe gets called
+ Thread.sleep(10000);
+ disposableAtomicReference.get().dispose();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private static void print(ConfigurationItem item) {
+ System.out.println(item.getValue() + " : key ->" + item.getKey());
+ }
+
+ private static void executeDockerCommand(int postfix) {
+ String[] command = new String[] {
+ "docker", "exec", "dapr_redis", "redis-cli",
+ "SET",
+ "myconfig" + postfix, "update_myconfigvalue" + postfix + "||2"
+ };
+ ProcessBuilder processBuilder = new ProcessBuilder(command);
+ Process process = null;
+ try {
+ process = processBuilder.start();
+ process.waitFor();
+ } catch (IOException e) {
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/examples/src/main/java/io/dapr/examples/configuration/grpc/README.md b/examples/src/main/java/io/dapr/examples/configuration/grpc/README.md
new file mode 100644
index 000000000..b78491204
--- /dev/null
+++ b/examples/src/main/java/io/dapr/examples/configuration/grpc/README.md
@@ -0,0 +1,113 @@
+## Retrieve Configurations via Configuration API
+
+This example provides the different capabilities provided by Dapr Java SDK for Configuration. For further information about Configuration APIs please refer to [this link](https://docs.dapr.io/developing-applications/building-blocks/configuration/)
+**This API is available in Preview Mode**.
+
+### Using the ConfigurationAPI
+
+The java SDK exposes several methods for this -
+* `client.getConfiguration(...)` for getting a configuration for a single/multiple keys.
+* `client.subscribeToConfigurations(...)` for subscribing to a list of keys for any change.
+
+## Pre-requisites
+
+* [Dapr and Dapr Cli](https://docs.dapr.io/getting-started/install-dapr/).
+* Java JDK 11 (or greater): [Oracle JDK](https://www.oracle.com/technetwork/java/javase/downloads/index.html#JDK11) or [OpenJDK](https://jdk.java.net/13/).
+* [Apache Maven](https://maven.apache.org/install.html) version 3.x.
+
+### Checking out the code
+
+Clone this repository:
+
+```sh
+git clone https://github.com/dapr/java-sdk.git
+cd java-sdk
+```
+
+Then build the Maven project:
+
+```sh
+# make sure you are in the `java-sdk` directory.
+mvn install
+```
+## Store few dummy configurations in configurationstore
+
+
+```bash
+docker exec dapr_redis redis-cli MSET myconfig1 "val1||1" myconfig2 "val2||1" myconfig3 "val3||1"
+```
+
+
+### Running the example
+
+Get into the examples' directory:
+```sh
+cd examples
+```
+
+Use the following command to run this example-
+
+
+
+```bash
+dapr run --components-path ./components/configuration --app-id configgrpc --log-level debug -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.configuration.grpc.ConfigurationClient
+```
+
+
+
+### Sample output
+```
+== APP == Using preview client...
+== APP == *******trying to retrieve configuration given a single key********
+== APP == Value ->val1 key ->myconfig1
+== APP == *******trying to retrieve configurations for a variable no. of keys********
+== APP == val1 : key ->myconfig1
+== APP == val3 : key ->myconfig3
+== APP == *******trying to retrieve configurations for a list of keys********
+== APP == val1 : key ->myconfig1
+== APP == val2 : key ->myconfig2
+== APP == val3 : key ->myconfig3
+== APP == *****Subscribing to keys using subscribe method: [myconfig1, myconfig3, myconfig2] *****
+== APP == update_myconfigvalue1 : key ->myconfig1
+== APP == update_myconfigvalue2 : key ->myconfig2
+== APP == update_myconfigvalue3 : key ->myconfig3
+
+```
+### Cleanup
+
+To stop the app, run (or press CTRL+C):
+
+
+
+```bash
+dapr stop --app-id configgrpc
+```
+
+
+
diff --git a/pom.xml b/pom.xml
index 7911a520e..f48c59634 100644
--- a/pom.xml
+++ b/pom.xml
@@ -16,7 +16,7 @@
UTF-8
1.39.0
3.13.0
- https://raw.githubusercontent.com/dapr/dapr/v1.4.0-rc.6/dapr/proto
+ https://raw.githubusercontent.com/dapr/dapr/v1.5.1/dapr/proto
1.6.2
3.1.1
1.8
diff --git a/sdk-tests/components/redisconfigstore.yaml b/sdk-tests/components/redisconfigstore.yaml
new file mode 100644
index 000000000..cd4a025c1
--- /dev/null
+++ b/sdk-tests/components/redisconfigstore.yaml
@@ -0,0 +1,12 @@
+apiVersion: dapr.io/v1alpha1
+kind: Component
+metadata:
+ name: redisconfigstore
+spec:
+ type: configuration.redis
+ version: v1
+ metadata:
+ - name: redisHost
+ value: localhost:6379
+ - name: redisPassword
+ value: ""
diff --git a/sdk-tests/src/test/java/io/dapr/it/configuration/grpc/ConfigurationClientIT.java b/sdk-tests/src/test/java/io/dapr/it/configuration/grpc/ConfigurationClientIT.java
new file mode 100644
index 000000000..7a6b39565
--- /dev/null
+++ b/sdk-tests/src/test/java/io/dapr/it/configuration/grpc/ConfigurationClientIT.java
@@ -0,0 +1,157 @@
+/*
+ * Copyright 2021 The Dapr Authors
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package io.dapr.it.configuration.grpc;
+
+import io.dapr.client.DaprClientBuilder;
+import io.dapr.client.DaprPreviewClient;
+import io.dapr.client.domain.ConfigurationItem;
+import io.dapr.it.BaseIT;
+import io.dapr.it.DaprRun;
+
+import org.junit.*;
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.*;
+
+public class ConfigurationClientIT extends BaseIT {
+
+ private static final String CONFIG_STORE_NAME = "redisconfigstore";
+
+ private static DaprRun daprRun;
+
+ private static DaprPreviewClient daprPreviewClient;
+
+ private static String key = "myconfig1";
+
+ private static List keys = new ArrayList<>(Arrays.asList("myconfig1", "myconfig2", "myconfig3"));
+
+ private static String[] insertCmd = new String[] {
+ "docker", "exec", "dapr_redis", "redis-cli",
+ "MSET",
+ "myconfigkey1", "myconfigvalue1||1",
+ "myconfigkey2", "myconfigvalue2||1",
+ "myconfigkey3", "myconfigvalue3||1"
+ };
+
+ private static String[] updateCmd = new String[] {
+ "docker", "exec", "dapr_redis", "redis-cli",
+ "MSET",
+ "myconfigkey1", "update_myconfigvalue1||2",
+ "myconfigkey2", "update_myconfigvalue2||2",
+ "myconfigkey3", "update_myconfigvalue3||2"
+ };
+
+ @BeforeClass
+ public static void init() throws Exception {
+ daprRun = startDaprApp(ConfigurationClientIT.class.getSimpleName(), 5000);
+ daprRun.switchToGRPC();
+ daprPreviewClient = new DaprClientBuilder().buildPreviewClient();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ daprPreviewClient.close();
+ }
+
+ @Before
+ public void setupConfigStore() {
+ executeDockerCommand(insertCmd);
+ }
+
+ @Test
+ public void getConfiguration() {
+ ConfigurationItem ci = daprPreviewClient.getConfiguration(CONFIG_STORE_NAME, "myconfigkey1").block();
+ assertEquals(ci.getKey(), "myconfigkey1");
+ assertEquals(ci.getValue(), "myconfigvalue1");
+ }
+
+ @Test
+ public void getConfigurationWithEmptyKey() {
+ assertThrows(IllegalArgumentException.class, () -> {
+ daprPreviewClient.getConfiguration(CONFIG_STORE_NAME, "").block();
+ });
+ }
+
+ @Test
+ public void getConfigurations() {
+ List cis = daprPreviewClient.getConfiguration(CONFIG_STORE_NAME, "myconfigkey1", "myconfigkey2").block();
+ assertTrue(cis.size() == 2);
+ assertEquals(cis.get(0).getKey(), "myconfigkey1");
+ assertEquals(cis.get(1).getValue(), "myconfigvalue2");
+ }
+
+ @Test
+ public void getConfigurationsWithEmptyList() {
+ List listOfKeys = new ArrayList<>();
+ Map metadata = new HashMap<>();
+ assertThrows(IllegalArgumentException.class, () -> {
+ daprPreviewClient.getConfiguration(CONFIG_STORE_NAME, listOfKeys, metadata).block();
+ });
+ }
+
+ @Test
+ public void subscribeToConfiguration() {
+ List updatedValues = new ArrayList<>();
+ AtomicReference disposable = new AtomicReference<>();
+ Runnable subscribeTask = () -> {
+ Flux> outFlux = daprPreviewClient
+ .subscribeToConfiguration(CONFIG_STORE_NAME, "myconfigkey1", "myconfigkey2");
+ disposable.set(outFlux.subscribe(update -> {
+ updatedValues.add(update.get(0).getValue());
+ }));
+ };
+ Thread subscribeThread = new Thread(subscribeTask);
+ subscribeThread.start();
+ try {
+ // To ensure that subscribeThread gets scheduled
+ Thread.sleep(0);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ Runnable updateKeys = () -> {
+ executeDockerCommand(updateCmd);
+ };
+ new Thread(updateKeys).start();
+ try {
+ // To ensure main thread does not die before outFlux subscribe gets called
+ Thread.sleep(3000);
+ disposable.get().dispose();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ assertEquals(updatedValues.size(), 2);
+ assertTrue(updatedValues.contains("update_myconfigvalue1"));
+ assertTrue(updatedValues.contains("update_myconfigvalue2"));
+ assertFalse(updatedValues.contains("update_myconfigvalue3"));
+ }
+
+ private static void executeDockerCommand(String[] command) {
+ ProcessBuilder processBuilder = new ProcessBuilder(command);
+ Process process = null;
+ try {
+ process = processBuilder.start();
+ process.waitFor();
+ } catch (IOException e) {
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/sdk/src/main/java/io/dapr/client/AbstractDaprClient.java b/sdk/src/main/java/io/dapr/client/AbstractDaprClient.java
index b8cf2af42..cfc4b4733 100644
--- a/sdk/src/main/java/io/dapr/client/AbstractDaprClient.java
+++ b/sdk/src/main/java/io/dapr/client/AbstractDaprClient.java
@@ -13,37 +13,33 @@ limitations under the License.
package io.dapr.client;
+import io.dapr.client.domain.ConfigurationItem;
import io.dapr.client.domain.DeleteStateRequest;
-import io.dapr.client.domain.DeleteStateRequestBuilder;
import io.dapr.client.domain.ExecuteStateTransactionRequest;
-import io.dapr.client.domain.ExecuteStateTransactionRequestBuilder;
import io.dapr.client.domain.GetBulkSecretRequest;
-import io.dapr.client.domain.GetBulkSecretRequestBuilder;
import io.dapr.client.domain.GetBulkStateRequest;
-import io.dapr.client.domain.GetBulkStateRequestBuilder;
+import io.dapr.client.domain.GetConfigurationRequest;
import io.dapr.client.domain.GetSecretRequest;
-import io.dapr.client.domain.GetSecretRequestBuilder;
import io.dapr.client.domain.GetStateRequest;
-import io.dapr.client.domain.GetStateRequestBuilder;
import io.dapr.client.domain.HttpExtension;
import io.dapr.client.domain.InvokeBindingRequest;
-import io.dapr.client.domain.InvokeBindingRequestBuilder;
import io.dapr.client.domain.InvokeMethodRequest;
-import io.dapr.client.domain.InvokeMethodRequestBuilder;
import io.dapr.client.domain.PublishEventRequest;
-import io.dapr.client.domain.PublishEventRequestBuilder;
import io.dapr.client.domain.SaveStateRequest;
-import io.dapr.client.domain.SaveStateRequestBuilder;
import io.dapr.client.domain.State;
import io.dapr.client.domain.StateOptions;
+import io.dapr.client.domain.SubscribeConfigurationRequest;
import io.dapr.client.domain.TransactionalStateOperation;
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.utils.TypeRef;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
/**
* Abstract class with convenient methods common between client implementations.
@@ -52,7 +48,7 @@ import java.util.Map;
* @see io.dapr.client.DaprClientGrpc
* @see io.dapr.client.DaprClientHttp
*/
-abstract class AbstractDaprClient implements DaprClient {
+abstract class AbstractDaprClient implements DaprClient, DaprPreviewClient {
/**
* A utility class for serialize and deserialize the transient objects.
@@ -416,4 +412,72 @@ abstract class AbstractDaprClient implements DaprClient {
return this.getBulkSecret(request).defaultIfEmpty(Collections.emptyMap());
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Mono getConfiguration(String storeName, String key) {
+ GetConfigurationRequest request = new GetConfigurationRequest(storeName, filterEmptyKeys(key));
+ return this.getConfiguration(request).map(data -> data.get(0));
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Mono getConfiguration(String storeName, String key, Map metadata) {
+ GetConfigurationRequest request = new GetConfigurationRequest(storeName, filterEmptyKeys(key));
+ request.setMetadata(metadata);
+ return this.getConfiguration(request).map(data -> data.get(0));
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Mono> getConfiguration(String storeName, String... keys) {
+ List listOfKeys = filterEmptyKeys(keys);
+ GetConfigurationRequest request = new GetConfigurationRequest(storeName, listOfKeys);
+ return this.getConfiguration(request);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Mono> getConfiguration(
+ String storeName,
+ List keys,
+ Map metadata) {
+ GetConfigurationRequest request = new GetConfigurationRequest(storeName, keys);
+ request.setMetadata(metadata);
+ return this.getConfiguration(request);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public Flux> subscribeToConfiguration(String storeName, String... keys) {
+ List listOfKeys = filterEmptyKeys(keys);
+ SubscribeConfigurationRequest request = new SubscribeConfigurationRequest(storeName, listOfKeys);
+ return this.subscribeToConfiguration(request);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public Flux> subscribeToConfiguration(
+ String storeName,
+ List keys,
+ Map metadata) {
+ SubscribeConfigurationRequest request = new SubscribeConfigurationRequest(storeName, keys);
+ request.setMetadata(metadata);
+ return this.subscribeToConfiguration(request);
+ }
+
+ private List filterEmptyKeys(String... keys) {
+ return Arrays.stream(keys)
+ .filter(key -> !key.trim().isEmpty())
+ .collect(Collectors.toList());
+ }
}
diff --git a/sdk/src/main/java/io/dapr/client/DaprClientBuilder.java b/sdk/src/main/java/io/dapr/client/DaprClientBuilder.java
index ba4a2229f..855c6013e 100644
--- a/sdk/src/main/java/io/dapr/client/DaprClientBuilder.java
+++ b/sdk/src/main/java/io/dapr/client/DaprClientBuilder.java
@@ -117,6 +117,16 @@ public class DaprClientBuilder {
return buildDaprClient(this.apiProtocol);
}
+ /**
+ * Build an instance of the Client based on the provided setup.
+ *
+ * @return an instance of the setup Client
+ * @throws IllegalStateException if any required field is missing
+ */
+ public DaprPreviewClient buildPreviewClient() {
+ return (DaprPreviewClient) buildDaprClient(this.apiProtocol);
+ }
+
/**
* Creates an instance of a Dapr Client based on the chosen protocol.
*
diff --git a/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java b/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java
index d829cce4c..401b7690d 100644
--- a/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java
+++ b/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java
@@ -17,10 +17,12 @@ import com.google.common.base.Strings;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
+import io.dapr.client.domain.ConfigurationItem;
import io.dapr.client.domain.DeleteStateRequest;
import io.dapr.client.domain.ExecuteStateTransactionRequest;
import io.dapr.client.domain.GetBulkSecretRequest;
import io.dapr.client.domain.GetBulkStateRequest;
+import io.dapr.client.domain.GetConfigurationRequest;
import io.dapr.client.domain.GetSecretRequest;
import io.dapr.client.domain.GetStateRequest;
import io.dapr.client.domain.HttpExtension;
@@ -30,6 +32,7 @@ import io.dapr.client.domain.PublishEventRequest;
import io.dapr.client.domain.SaveStateRequest;
import io.dapr.client.domain.State;
import io.dapr.client.domain.StateOptions;
+import io.dapr.client.domain.SubscribeConfigurationRequest;
import io.dapr.client.domain.TransactionalStateOperation;
import io.dapr.config.Properties;
import io.dapr.exceptions.DaprException;
@@ -48,6 +51,8 @@ import io.grpc.ForwardingClientCall;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.stub.StreamObserver;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.util.context.Context;
@@ -677,6 +682,100 @@ public class DaprClientGrpc extends AbstractDaprClient {
).then();
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Mono> getConfiguration(GetConfigurationRequest request) {
+ try {
+ final String configurationStoreName = request.getStoreName();
+ final Map metadata = request.getMetadata();
+ final List keys = request.getKeys();
+ if ((configurationStoreName == null) || (configurationStoreName.trim().isEmpty())) {
+ throw new IllegalArgumentException("Configuration Store Name cannot be null or empty.");
+ }
+ if (keys.isEmpty()) {
+ throw new IllegalArgumentException("Keys can not be empty or null");
+ }
+ DaprProtos.GetConfigurationRequest.Builder builder = DaprProtos.GetConfigurationRequest.newBuilder()
+ .setStoreName(configurationStoreName).addAllKeys(keys);
+ if (metadata != null) {
+ builder.putAllMetadata(metadata);
+ }
+
+ DaprProtos.GetConfigurationRequest envelope = builder.build();
+ return this.getConfigurationAlpha1(envelope);
+
+ } catch (Exception ex) {
+ return DaprException.wrapMono(ex);
+ }
+ }
+
+ private Mono> getConfigurationAlpha1(DaprProtos.GetConfigurationRequest envelope) {
+ return Mono.subscriberContext().flatMap(
+ context ->
+ this.createMono(
+ it -> intercept(context, asyncStub).getConfigurationAlpha1(envelope, it)
+ )
+ ).map(
+ it ->
+ it.getItemsList().stream()
+ .map(this::buildConfigurationItem).collect(Collectors.toList())
+ );
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Flux> subscribeToConfiguration(SubscribeConfigurationRequest request) {
+ try {
+ final String configurationStoreName = request.getStoreName();
+ final List keys = request.getKeys();
+ final Map metadata = request.getMetadata();
+
+ if (configurationStoreName == null || (configurationStoreName.trim().isEmpty())) {
+ throw new IllegalArgumentException("Configuration Store Name can not be null or empty.");
+ }
+ if (keys.isEmpty()) {
+ throw new IllegalArgumentException("Keys can not be null or empty.");
+ }
+ DaprProtos.SubscribeConfigurationRequest.Builder builder = DaprProtos.SubscribeConfigurationRequest.newBuilder()
+ .setStoreName(configurationStoreName)
+ .addAllKeys(keys);
+ if (metadata != null) {
+ builder.putAllMetadata(metadata);
+ }
+
+ DaprProtos.SubscribeConfigurationRequest envelope = builder.build();
+ return this.createFlux(
+ it -> intercept(asyncStub).subscribeConfigurationAlpha1(envelope, it)
+ ).map(
+ it ->
+ it.getItemsList().stream()
+ .map(this::buildConfigurationItem).collect(Collectors.toList())
+ );
+ } catch (Exception ex) {
+ return DaprException.wrapFlux(ex);
+ }
+ }
+
+ /**
+ * Build a new Configuration Item from provided parameter.
+ *
+ * @param configurationItem CommonProtos.ConfigurationItem
+ * @return io.dapr.client.domain.ConfigurationItem
+ */
+ private ConfigurationItem buildConfigurationItem(
+ CommonProtos.ConfigurationItem configurationItem) {
+ return new ConfigurationItem(
+ configurationItem.getKey(),
+ configurationItem.getValue(),
+ configurationItem.getVersion(),
+ configurationItem.getMetadataMap()
+ );
+ }
+
/**
* Populates GRPC client with interceptors.
*
@@ -722,6 +821,10 @@ public class DaprClientGrpc extends AbstractDaprClient {
return Mono.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run());
}
+ private Flux createFlux(Consumer> consumer) {
+ return Flux.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run());
+ }
+
private StreamObserver createStreamObserver(MonoSink sink) {
return new StreamObserver() {
@Override
@@ -740,4 +843,23 @@ public class DaprClientGrpc extends AbstractDaprClient {
}
};
}
+
+ private StreamObserver createStreamObserver(FluxSink sink) {
+ return new StreamObserver() {
+ @Override
+ public void onNext(T value) {
+ sink.next(value);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ sink.error(DaprException.propagate(new ExecutionException(t)));
+ }
+
+ @Override
+ public void onCompleted() {
+ sink.complete();
+ }
+ };
+ }
}
diff --git a/sdk/src/main/java/io/dapr/client/DaprClientHttp.java b/sdk/src/main/java/io/dapr/client/DaprClientHttp.java
index df672e201..ed4522c99 100644
--- a/sdk/src/main/java/io/dapr/client/DaprClientHttp.java
+++ b/sdk/src/main/java/io/dapr/client/DaprClientHttp.java
@@ -15,10 +15,12 @@ package io.dapr.client;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Strings;
+import io.dapr.client.domain.ConfigurationItem;
import io.dapr.client.domain.DeleteStateRequest;
import io.dapr.client.domain.ExecuteStateTransactionRequest;
import io.dapr.client.domain.GetBulkSecretRequest;
import io.dapr.client.domain.GetBulkStateRequest;
+import io.dapr.client.domain.GetConfigurationRequest;
import io.dapr.client.domain.GetSecretRequest;
import io.dapr.client.domain.GetStateRequest;
import io.dapr.client.domain.HttpExtension;
@@ -28,6 +30,7 @@ import io.dapr.client.domain.PublishEventRequest;
import io.dapr.client.domain.SaveStateRequest;
import io.dapr.client.domain.State;
import io.dapr.client.domain.StateOptions;
+import io.dapr.client.domain.SubscribeConfigurationRequest;
import io.dapr.client.domain.TransactionalStateOperation;
import io.dapr.client.domain.TransactionalStateRequest;
import io.dapr.config.Properties;
@@ -36,6 +39,7 @@ import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import io.dapr.utils.NetworkUtils;
import io.dapr.utils.TypeRef;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.io.IOException;
@@ -665,6 +669,22 @@ public class DaprClientHttp extends AbstractDaprClient {
.then();
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Mono> getConfiguration(GetConfigurationRequest request) {
+ return DaprException.wrapMono(new UnsupportedOperationException());
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Flux> subscribeToConfiguration(SubscribeConfigurationRequest request) {
+ return DaprException.wrapFlux(new UnsupportedOperationException());
+ }
+
/**
* Converts metadata map into Query params.
* @param metadata metadata map
diff --git a/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java b/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java
new file mode 100644
index 000000000..5e41f5e35
--- /dev/null
+++ b/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2022 The Dapr Authors
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package io.dapr.client;
+
+import io.dapr.client.domain.ConfigurationItem;
+import io.dapr.client.domain.GetConfigurationRequest;
+import io.dapr.client.domain.SubscribeConfigurationRequest;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Generic client interface for preview or alpha APIs in Dapr, regardless of GRPC or HTTP.
+ *
+ * @see io.dapr.client.DaprClientBuilder for information on how to make instance for this interface.
+ */
+public interface DaprPreviewClient extends AutoCloseable {
+
+ /**
+ * Retrieve a configuration based on a provided key.
+ *
+ * @param storeName Name of the configuration store
+ * @param key key of the configuration item which is to be retrieved
+ * @return Mono of the Configuration Item
+ */
+ Mono getConfiguration(String storeName, String key);
+
+ /**
+ * Retrieve a configuration based on a provided key.
+ *
+ * @param storeName Name of the configuration store
+ * @param key key of the configuration item which is to be retrieved
+ * @param metadata optional metadata
+ * @return Mono of the Configuration Item
+ */
+ Mono getConfiguration(String storeName, String key, Map metadata);
+
+ /**
+ * Retrieve List of configurations based on a provided variable number of keys.
+ *
+ * @param storeName Name of the configuration store
+ * @param keys keys of the configurations which are to be retrieved
+ * @return Mono of List of ConfigurationItems
+ */
+ Mono> getConfiguration(String storeName, String... keys);
+
+ /**
+ * Retrieve List of configurations based on a provided variable number of keys.
+ *
+ * @param storeName Name of the configuration store
+ * @param keys keys of the configurations which are to be retrieved
+ * @param metadata optional metadata
+ * @return Mono of List of ConfigurationItems
+ */
+ Mono> getConfiguration(String storeName, List keys, Map metadata);
+
+ /**
+ * Retrieve List of configurations based on a provided configuration request object.
+ *
+ * @param request request for retrieving Configurations for a list keys
+ * @return Mono of List of ConfigurationItems
+ */
+
+ Mono> getConfiguration(GetConfigurationRequest request);
+
+ /**
+ * Subscribe to the keys for any change.
+ *
+ * @param storeName Name of the configuration store
+ * @param keys keys of the configurations which are to be subscribed
+ * @return Flux of List of configuration items
+ */
+ Flux> subscribeToConfiguration(String storeName, String... keys);
+
+ /**
+ * Subscribe to the keys for any change.
+ *
+ * @param storeName Name of the configuration store
+ * @param keys keys of the configurations which are to be subscribed
+ * @param metadata optional metadata
+ * @return Flux of List of configuration items
+ */
+ Flux> subscribeToConfiguration(String storeName, List keys,
+ Map metadata);
+
+ /**
+ * Subscribe to the keys for any change.
+ *
+ * @param request request for subscribing to any change for the given keys in request
+ * @return Flux of List of configuration items
+ */
+ Flux> subscribeToConfiguration(SubscribeConfigurationRequest request);
+}
diff --git a/sdk/src/main/java/io/dapr/client/domain/ConfigurationItem.java b/sdk/src/main/java/io/dapr/client/domain/ConfigurationItem.java
new file mode 100644
index 000000000..df6ab818c
--- /dev/null
+++ b/sdk/src/main/java/io/dapr/client/domain/ConfigurationItem.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2022 The Dapr Authors
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package io.dapr.client.domain;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * A configuration item from Dapr's configuration store.
+ */
+public class ConfigurationItem {
+ private final String key;
+ private final String value;
+ private final String version;
+ private final Map metadata;
+
+ /**
+ * Constructor for a configuration Item.
+ *
+ * @param key key of the configuration item
+ * @param value value for the provided key
+ * @param version version of the configuration item
+ * @param metadata additional information
+ */
+ public ConfigurationItem(String key, String value, String version, Map metadata) {
+ this.key = key;
+ this.value = value;
+ this.version = version;
+ this.metadata = Collections.unmodifiableMap(metadata);
+ }
+
+ /**
+ * Constructor for a configuration Item.
+ *
+ * @param key key of the configuration item
+ * @param value value for the provided key
+ * @param version version of the configuration item
+ */
+ public ConfigurationItem(String key, String value, String version) {
+ this.key = key;
+ this.value = value;
+ this.version = version;
+ this.metadata = Collections.emptyMap();
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ public String getVersion() {
+ return version;
+ }
+
+ public Map getMetadata() {
+ return metadata;
+ }
+}
diff --git a/sdk/src/main/java/io/dapr/client/domain/GetConfigurationRequest.java b/sdk/src/main/java/io/dapr/client/domain/GetConfigurationRequest.java
new file mode 100644
index 000000000..7e0d13c54
--- /dev/null
+++ b/sdk/src/main/java/io/dapr/client/domain/GetConfigurationRequest.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2022 The Dapr Authors
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package io.dapr.client.domain;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Request to get one or more configuration items from Dapr's configuration store.
+ */
+public class GetConfigurationRequest {
+ private final String storeName;
+ private final List keys;
+ private Map metadata;
+
+ public GetConfigurationRequest(String storeName, List keys) {
+ this.storeName = storeName;
+ this.keys = keys == null ? Collections.EMPTY_LIST : Collections.unmodifiableList(keys);
+ }
+
+ public GetConfigurationRequest setMetadata(Map metadata) {
+ this.metadata = metadata == null ? null : Collections.unmodifiableMap(metadata);
+ return this;
+ }
+
+ public String getStoreName() {
+ return storeName;
+ }
+
+ public List getKeys() {
+ return keys;
+ }
+
+ public Map getMetadata() {
+ return metadata;
+ }
+}
diff --git a/sdk/src/main/java/io/dapr/client/domain/SubscribeConfigurationRequest.java b/sdk/src/main/java/io/dapr/client/domain/SubscribeConfigurationRequest.java
new file mode 100644
index 000000000..0684dce5e
--- /dev/null
+++ b/sdk/src/main/java/io/dapr/client/domain/SubscribeConfigurationRequest.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2022 The Dapr Authors
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package io.dapr.client.domain;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Request to subscribe to one or more configuration items.
+ */
+public class SubscribeConfigurationRequest {
+ private final String storeName;
+ private final List keys;
+ private Map metadata;
+
+ public SubscribeConfigurationRequest(String storeName, List keys) {
+ this.storeName = storeName;
+ this.keys = keys == null ? Collections.EMPTY_LIST : Collections.unmodifiableList(keys);
+ }
+
+ public SubscribeConfigurationRequest setMetadata(Map metadata) {
+ this.metadata = metadata == null ? null : Collections.unmodifiableMap(metadata);
+ return this;
+ }
+
+ public String getStoreName() {
+ return storeName;
+ }
+
+ public List getKeys() {
+ return keys;
+ }
+
+ public Map getMetadata() {
+ return metadata;
+ }
+}
diff --git a/sdk/src/main/java/io/dapr/exceptions/DaprException.java b/sdk/src/main/java/io/dapr/exceptions/DaprException.java
index f8fdae376..0771e1fac 100644
--- a/sdk/src/main/java/io/dapr/exceptions/DaprException.java
+++ b/sdk/src/main/java/io/dapr/exceptions/DaprException.java
@@ -15,6 +15,7 @@ package io.dapr.exceptions;
import io.grpc.StatusRuntimeException;
import reactor.core.Exceptions;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.concurrent.Callable;
@@ -154,6 +155,23 @@ public class DaprException extends RuntimeException {
return Mono.empty();
}
+ /**
+ * Wraps an exception into DaprException (if not already DaprException).
+ *
+ * @param exception Exception to be wrapped.
+ * @param Flux's response type.
+ * @return Flux containing DaprException.
+ */
+ public static Flux wrapFlux(Exception exception) {
+ try {
+ wrap(exception);
+ } catch (Exception e) {
+ return Flux.error(e);
+ }
+
+ return Flux.empty();
+ }
+
/**
* Wraps an exception into DaprException (if not already DaprException).
*
diff --git a/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java b/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java
new file mode 100644
index 000000000..a5ce0a592
--- /dev/null
+++ b/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java
@@ -0,0 +1,261 @@
+/*
+ * Copyright 2021 The Dapr Authors
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package io.dapr.client;
+
+
+import io.dapr.client.domain.ConfigurationItem;
+import io.dapr.serializer.DefaultObjectSerializer;
+import io.dapr.v1.CommonProtos;
+import io.dapr.v1.DaprGrpc;
+import io.dapr.v1.DaprProtos;
+import io.grpc.stub.StreamObserver;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.stubbing.Answer;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import static io.dapr.utils.TestUtils.assertThrowsDaprException;
+import static org.junit.Assert.*;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
+
+public class DaprPreviewClientGrpcTest {
+
+ private static final String CONFIG_STORE_NAME = "MyConfigStore";
+
+ private Closeable closeable;
+ private DaprGrpc.DaprStub daprStub;
+ private DaprPreviewClient previewClient;
+
+ @Before
+ public void setup() throws IOException {
+ closeable = mock(Closeable.class);
+ daprStub = mock(DaprGrpc.DaprStub.class);
+ when(daprStub.withInterceptors(any())).thenReturn(daprStub);
+ previewClient = new DaprClientGrpc(
+ closeable, daprStub, new DefaultObjectSerializer(), new DefaultObjectSerializer());
+ doNothing().when(closeable).close();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ previewClient.close();
+ verify(closeable).close();
+ verifyNoMoreInteractions(closeable);
+ }
+
+ @Test
+ public void getConfigurationTestErrorScenario() {
+ assertThrows(IllegalArgumentException.class, () -> {
+ previewClient.getConfiguration(CONFIG_STORE_NAME, "").block();
+ });
+ assertThrows(IllegalArgumentException.class, () -> {
+ previewClient.getConfiguration("", "key").block();
+ });
+ }
+
+ @Test
+ public void getSingleConfigurationTest() {
+ doAnswer((Answer) invocation -> {
+ StreamObserver observer =
+ (StreamObserver) invocation.getArguments()[1];
+ observer.onNext(getSingleMockResponse());
+ observer.onCompleted();
+ return null;
+ }).when(daprStub).getConfigurationAlpha1(any(DaprProtos.GetConfigurationRequest.class), any());
+
+ ConfigurationItem ci = previewClient.getConfiguration(CONFIG_STORE_NAME, "configkey1").block();
+ assertEquals("configkey1", ci.getKey());
+ assertEquals("configvalue1", ci.getValue());
+ assertEquals("1", ci.getVersion());
+ }
+
+ @Test
+ public void getSingleConfigurationWithMetadataTest() {
+ doAnswer((Answer) invocation -> {
+ StreamObserver observer =
+ (StreamObserver) invocation.getArguments()[1];
+ observer.onNext(getSingleMockResponse());
+ observer.onCompleted();
+ return null;
+ }).when(daprStub).getConfigurationAlpha1(any(DaprProtos.GetConfigurationRequest.class), any());
+
+ Map reqMetadata = new HashMap<>();
+ reqMetadata.put("meta1", "value1");
+ ConfigurationItem ci = previewClient.getConfiguration(CONFIG_STORE_NAME, "configkey1", reqMetadata).block();
+ assertEquals("configkey1", ci.getKey());
+ assertEquals("configvalue1", ci.getValue());
+ assertEquals("1", ci.getVersion());
+ }
+
+ @Test
+ public void getMultipleConfigurationTest() {
+ doAnswer((Answer) invocation -> {
+ StreamObserver observer =
+ (StreamObserver) invocation.getArguments()[1];
+ observer.onNext(getMultipleMockResponse());
+ observer.onCompleted();
+ return null;
+ }).when(daprStub).getConfigurationAlpha1(any(DaprProtos.GetConfigurationRequest.class), any());
+
+ List cis = previewClient.getConfiguration(CONFIG_STORE_NAME, "configkey1","configkey2").block();
+ assertEquals(2, cis.size());
+ assertEquals("configkey1", cis.stream().findFirst().get().getKey());
+ assertEquals("configvalue1", cis.stream().findFirst().get().getValue());
+ assertEquals("1", cis.stream().findFirst().get().getVersion());
+
+ assertEquals("configkey2", cis.stream().skip(1).findFirst().get().getKey());
+ assertEquals("configvalue2", cis.stream().skip(1).findFirst().get().getValue());
+ assertEquals("1", cis.stream().skip(1).findFirst().get().getVersion());
+ }
+
+ @Test
+ public void getMultipleConfigurationWithMetadataTest() {
+ doAnswer((Answer) invocation -> {
+ StreamObserver observer =
+ (StreamObserver) invocation.getArguments()[1];
+ observer.onNext(getMultipleMockResponse());
+ observer.onCompleted();
+ return null;
+ }).when(daprStub).getConfigurationAlpha1(any(DaprProtos.GetConfigurationRequest.class), any());
+
+ Map reqMetadata = new HashMap<>();
+ reqMetadata.put("meta1", "value1");
+ List keys = Arrays.asList("configkey1","configkey2");
+ List cis = previewClient.getConfiguration(CONFIG_STORE_NAME, keys, reqMetadata).block();
+ assertEquals(2, cis.size());
+ assertEquals("configkey1", cis.stream().findFirst().get().getKey());
+ assertEquals("configvalue1", cis.stream().findFirst().get().getValue());
+ }
+
+ @Test
+ public void subscribeConfigurationTest() {
+ Map metadata = new HashMap<>();
+ metadata.put("meta1", "value1");
+ DaprProtos.SubscribeConfigurationResponse responseEnvelope = DaprProtos.SubscribeConfigurationResponse.newBuilder()
+ .addItems(CommonProtos.ConfigurationItem.newBuilder()
+ .setKey("configkey1")
+ .setValue("configvalue1")
+ .setVersion("1")
+ .putAllMetadata(metadata)
+ .build())
+ .build();
+
+ doAnswer((Answer) invocation -> {
+ StreamObserver observer =
+ (StreamObserver) invocation.getArguments()[1];
+ observer.onNext(responseEnvelope);
+ observer.onCompleted();
+ return null;
+ }).when(daprStub).subscribeConfigurationAlpha1(any(DaprProtos.SubscribeConfigurationRequest.class), any());
+
+ Iterator> itr = previewClient.subscribeToConfiguration(CONFIG_STORE_NAME, "configkey1").toIterable().iterator();
+ assertTrue(itr.hasNext());
+ assertEquals("configkey1", itr.next().get(0).getKey());
+ assertFalse(itr.hasNext());
+ }
+
+ @Test
+ public void subscribeConfigurationTestWithMetadata() {
+ Map metadata = new HashMap<>();
+ metadata.put("meta1", "value1");
+ DaprProtos.SubscribeConfigurationResponse responseEnvelope = DaprProtos.SubscribeConfigurationResponse.newBuilder()
+ .addItems(CommonProtos.ConfigurationItem.newBuilder()
+ .setKey("configkey1")
+ .setValue("configvalue1")
+ .setVersion("1")
+ .putAllMetadata(metadata)
+ .build())
+ .build();
+
+ doAnswer((Answer) invocation -> {
+ StreamObserver observer =
+ (StreamObserver) invocation.getArguments()[1];
+ observer.onNext(responseEnvelope);
+ observer.onCompleted();
+ return null;
+ }).when(daprStub).subscribeConfigurationAlpha1(any(DaprProtos.SubscribeConfigurationRequest.class), any());
+
+ Map reqMetadata = new HashMap<>();
+ List keys = Arrays.asList("configkey1");
+
+ Iterator> itr = previewClient.subscribeToConfiguration(CONFIG_STORE_NAME, keys, reqMetadata).toIterable().iterator();
+ assertTrue(itr.hasNext());
+ assertEquals("configkey1", itr.next().get(0).getKey());
+ assertFalse(itr.hasNext());
+ }
+
+ @Test
+ public void subscribeConfigurationWithErrorTest() {
+ doAnswer((Answer) invocation -> {
+ StreamObserver observer =
+ (StreamObserver) invocation.getArguments()[1];
+ observer.onError(new RuntimeException());
+ observer.onCompleted();
+ return null;
+ }).when(daprStub).subscribeConfigurationAlpha1(any(DaprProtos.SubscribeConfigurationRequest.class), any());
+
+ assertThrowsDaprException(ExecutionException.class, () -> {
+ previewClient.subscribeToConfiguration(CONFIG_STORE_NAME, "key").blockFirst();
+ });
+
+ assertThrows(IllegalArgumentException.class, () -> {
+ previewClient.subscribeToConfiguration("", "key").blockFirst();
+ });
+ }
+
+ private DaprProtos.GetConfigurationResponse getSingleMockResponse() {
+ Map metadata = new HashMap<>();
+ metadata.put("meta1", "value1");
+ DaprProtos.GetConfigurationResponse responseEnvelope = DaprProtos.GetConfigurationResponse.newBuilder()
+ .addItems(CommonProtos.ConfigurationItem.newBuilder()
+ .setKey("configkey1")
+ .setValue("configvalue1")
+ .setVersion("1")
+ .putAllMetadata(metadata)
+ .build()
+ ).build();
+ return responseEnvelope;
+ }
+
+ private DaprProtos.GetConfigurationResponse getMultipleMockResponse() {
+ Map metadata = new HashMap<>();
+ metadata.put("meta1", "value1");
+ DaprProtos.GetConfigurationResponse responseEnvelope = DaprProtos.GetConfigurationResponse.newBuilder()
+ .addItems(CommonProtos.ConfigurationItem.newBuilder()
+ .setKey("configkey1")
+ .setValue("configvalue1")
+ .setVersion("1")
+ .putAllMetadata(metadata)
+ .build())
+ .addItems(CommonProtos.ConfigurationItem.newBuilder()
+ .setKey("configkey2")
+ .setValue("configvalue2")
+ .setVersion("1")
+ .putAllMetadata(metadata)
+ .build())
+ .build();
+ return responseEnvelope;
+ }
+}
diff --git a/sdk/src/test/java/io/dapr/client/DaprPreviewClientHttpTest.java b/sdk/src/test/java/io/dapr/client/DaprPreviewClientHttpTest.java
new file mode 100644
index 000000000..b70e223f4
--- /dev/null
+++ b/sdk/src/test/java/io/dapr/client/DaprPreviewClientHttpTest.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright (c) Microsoft Corporation and Dapr Contributors.
+ * Licensed under the MIT License.
+ */
+
+package io.dapr.client;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.dataformat.xml.XmlMapper;
+import io.dapr.config.Properties;
+import io.dapr.exceptions.DaprException;
+import io.dapr.serializer.DaprObjectSerializer;
+import io.dapr.utils.TypeRef;
+import okhttp3.OkHttpClient;
+import okhttp3.mock.Behavior;
+import okhttp3.mock.MockInterceptor;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import reactor.core.publisher.Mono;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.Socket;
+
+import static io.dapr.utils.TestUtils.findFreePort;
+import static org.junit.Assert.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class DaprPreviewClientHttpTest {
+ private static final String CONFIG_STORE_NAME = "MyConfigurationStore";
+
+ private DaprPreviewClient daprPreviewClientHttp;
+
+ private DaprHttp daprHttp;
+
+ private OkHttpClient okHttpClient;
+
+ private MockInterceptor mockInterceptor;
+
+ @Before
+ public void setUp() {
+ mockInterceptor = new MockInterceptor(Behavior.UNORDERED);
+ okHttpClient = new OkHttpClient.Builder().addInterceptor(mockInterceptor).build();
+ daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient);
+ daprPreviewClientHttp = new DaprClientHttp(daprHttp);
+ }
+
+ @Test
+ public void getConfigurationWithSingleKey() {
+ assertThrows(DaprException.class, () -> {
+ daprPreviewClientHttp.getConfiguration(CONFIG_STORE_NAME, "key").block();
+ });
+ }
+
+ @Test
+ public void getConfiguration() {
+ assertThrows(DaprException.class, () -> {
+ daprPreviewClientHttp.getConfiguration(CONFIG_STORE_NAME, "key1", "key2").block();
+ });
+ }
+
+ @Test
+ public void subscribeConfigurations() {
+ assertThrows(DaprException.class, () -> {
+ daprPreviewClientHttp.subscribeToConfiguration(CONFIG_STORE_NAME, "key1", "key2").blockFirst();
+ });
+ }
+}