mirror of https://github.com/dapr/java-sdk.git
Config api support (#670)
* inital draft for config api Signed-off-by: Pravin Pushkar <ppushkar@microsoft.com> * Introducing new client for preview apis and code refactoring Signed-off-by: Pravin Pushkar <ppushkar@microsoft.com> * Unit tests and code refactoring Signed-off-by: Pravin Pushkar <ppushkar@microsoft.com> * Adding integration test Signed-off-by: pravinpushkar <ppushkar@microsoft.com> * Copyright changes Signed-off-by: pravinpushkar <ppushkar@microsoft.com> * Review comments fixes Signed-off-by: pravinpushkar <ppushkar@microsoft.com> * Removed DaprPreviewClientProxy and updated example README Signed-off-by: pravinpushkar <ppushkar@microsoft.com> * Adding validate workflow for cofiguration api example Signed-off-by: pravinpushkar <ppushkar@microsoft.com> * fixing example autovalidation and code coverage Signed-off-by: pravinpushkar <ppushkar@microsoft.com> * Fixing autovalidation and removing getAllConfiguration Signed-off-by: pravinpushkar <ppushkar@microsoft.com> * Fixing review comments Signed-off-by: pravinpushkar <ppushkar@microsoft.com> * Add regex header checkstyle. Signed-off-by: Artur Souza <artursouza.ms@outlook.com> * Fix headers and add javadocs to some. Signed-off-by: Artur Souza <artursouza.ms@outlook.com> Co-authored-by: Artur Souza <artursouza.ms@outlook.com>
This commit is contained in:
parent
d8b8a5b4a8
commit
185cdba293
|
@ -150,3 +150,7 @@ jobs:
|
|||
working-directory: ./examples
|
||||
run: |
|
||||
mm.py ./src/main/java/io/dapr/examples/unittesting/README.md
|
||||
- name: Validate Configuration API example
|
||||
working-directory: ./examples
|
||||
run: |
|
||||
mm.py ./src/main/java/io/dapr/examples/configuration/grpc/README.md
|
|
@ -0,0 +1,12 @@
|
|||
^/\*$
|
||||
^ \* Copyright \d\d\d\d The Dapr Authors$
|
||||
^ \* Licensed under the Apache License, Version 2.0 \(the "License"\)\;$
|
||||
^ \* you may not use this file except in compliance with the License\.$
|
||||
^ \* You may obtain a copy of the License at$
|
||||
^ \* http://www.apache.org/licenses/LICENSE-2\.0$
|
||||
^ \* Unless required by applicable law or agreed to in writing, software$
|
||||
^ \* distributed under the License is distributed on an "AS IS" BASIS,$
|
||||
^ \* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied\.$
|
||||
^ \* See the License for the specific language governing permissions and$
|
||||
^limitations under the License\.$
|
||||
^\*/$
|
|
@ -51,10 +51,9 @@
|
|||
<property name="ignorePattern" value="^package.*|^import.*|a href|href|http://|https://|ftp://"/>
|
||||
</module>
|
||||
|
||||
<module name="Header">
|
||||
<module name="RegexpHeader">
|
||||
<property name="fileExtensions" value="java"/>
|
||||
<!-- We just validate the top 2 lines so this attribute's value does not blow away. -->
|
||||
<property name="header" value='/*\n * Copyright 2021 The Dapr Authors\n * Licensed under the Apache License, Version 2.0 (the "License");\n'/>
|
||||
<property name="headerFile" value=".java_header"/>
|
||||
</module>
|
||||
|
||||
<module name="TreeWalker">
|
||||
|
|
|
@ -0,0 +1,12 @@
|
|||
apiVersion: dapr.io/v1alpha1
|
||||
kind: Component
|
||||
metadata:
|
||||
name: configstore
|
||||
spec:
|
||||
type: configuration.redis
|
||||
version: v1
|
||||
metadata:
|
||||
- name: redisHost
|
||||
value: localhost:6379
|
||||
- name: redisPassword
|
||||
value: ""
|
|
@ -0,0 +1,164 @@
|
|||
/*
|
||||
* Copyright 2022 The Dapr Authors
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package io.dapr.examples.configuration.grpc;
|
||||
|
||||
import io.dapr.client.DaprClientBuilder;
|
||||
import io.dapr.client.DaprPreviewClient;
|
||||
import io.dapr.client.domain.ConfigurationItem;
|
||||
import io.dapr.client.domain.GetConfigurationRequest;
|
||||
import io.dapr.client.domain.SubscribeConfigurationRequest;
|
||||
import reactor.core.Disposable;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
public class ConfigurationClient {
|
||||
|
||||
private static final String CONFIG_STORE_NAME = "configstore";
|
||||
|
||||
private static final List<String> keys = new ArrayList<>(Arrays.asList("myconfig1", "myconfig3", "myconfig2"));
|
||||
|
||||
/**
|
||||
* Executes various methods to check the different apis.
|
||||
* @param args arguments
|
||||
* @throws Exception throws Exception
|
||||
*/
|
||||
public static void main(String[] args) throws Exception {
|
||||
try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) {
|
||||
System.out.println("Using preview client...");
|
||||
getConfigurationForaSingleKey(client);
|
||||
getConfigurationsUsingVarargs(client);
|
||||
getConfigurations(client);
|
||||
subscribeConfigurationRequestWithSubscribe(client);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets configuration for a single key.
|
||||
*
|
||||
* @param client DaprPreviewClient object
|
||||
*/
|
||||
public static void getConfigurationForaSingleKey(DaprPreviewClient client) {
|
||||
System.out.println("*******trying to retrieve configuration given a single key********");
|
||||
try {
|
||||
Mono<ConfigurationItem> item = client.getConfiguration(CONFIG_STORE_NAME, keys.get(0));
|
||||
System.out.println("Value ->" + item.block().getValue() + " key ->" + item.block().getKey());
|
||||
} catch (Exception ex) {
|
||||
System.out.println(ex.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets configurations for varibale no. of arguments.
|
||||
*
|
||||
* @param client DaprPreviewClient object
|
||||
*/
|
||||
public static void getConfigurationsUsingVarargs(DaprPreviewClient client) {
|
||||
System.out.println("*******trying to retrieve configurations for a variable no. of keys********");
|
||||
try {
|
||||
Mono<List<ConfigurationItem>> items =
|
||||
client.getConfiguration(CONFIG_STORE_NAME, "myconfig1", "myconfig3");
|
||||
items.block().forEach(ConfigurationClient::print);
|
||||
} catch (Exception ex) {
|
||||
System.out.println(ex.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets configurations for a list of keys.
|
||||
*
|
||||
* @param client DaprPreviewClient object
|
||||
*/
|
||||
public static void getConfigurations(DaprPreviewClient client) {
|
||||
System.out.println("*******trying to retrieve configurations for a list of keys********");
|
||||
List<String> keys = new ArrayList<>();
|
||||
keys.add("myconfig1");
|
||||
keys.add("myconfig2");
|
||||
keys.add("myconfig3");
|
||||
GetConfigurationRequest req = new GetConfigurationRequest(CONFIG_STORE_NAME, keys);
|
||||
try {
|
||||
Mono<List<ConfigurationItem>> items = client.getConfiguration(req);
|
||||
items.block().forEach(ConfigurationClient::print);
|
||||
} catch (Exception ex) {
|
||||
System.out.println(ex.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Subscribe to a list of keys.Optional to above iterator way of retrieving the changes
|
||||
*
|
||||
* @param client DaprPreviewClient object
|
||||
*/
|
||||
public static void subscribeConfigurationRequestWithSubscribe(DaprPreviewClient client) {
|
||||
System.out.println("*****Subscribing to keys using subscribe method: " + keys.toString() + " *****");
|
||||
AtomicReference<Disposable> disposableAtomicReference = new AtomicReference<>();
|
||||
SubscribeConfigurationRequest req = new SubscribeConfigurationRequest(CONFIG_STORE_NAME, keys);
|
||||
Runnable subscribeTask = () -> {
|
||||
Flux<List<ConfigurationItem>> outFlux = client.subscribeToConfiguration(req);
|
||||
disposableAtomicReference.set(outFlux
|
||||
.subscribe(
|
||||
cis -> cis.forEach(ConfigurationClient::print)
|
||||
));
|
||||
};
|
||||
new Thread(subscribeTask).start();
|
||||
try {
|
||||
// To ensure that subscribeThread gets scheduled
|
||||
Thread.sleep(0);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
Runnable updateKeys = () -> {
|
||||
int i = 1;
|
||||
while (i <= 3) {
|
||||
executeDockerCommand(i);
|
||||
i++;
|
||||
}
|
||||
};
|
||||
new Thread(updateKeys).start();
|
||||
try {
|
||||
// To ensure main thread does not die before outFlux subscribe gets called
|
||||
Thread.sleep(10000);
|
||||
disposableAtomicReference.get().dispose();
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
private static void print(ConfigurationItem item) {
|
||||
System.out.println(item.getValue() + " : key ->" + item.getKey());
|
||||
}
|
||||
|
||||
private static void executeDockerCommand(int postfix) {
|
||||
String[] command = new String[] {
|
||||
"docker", "exec", "dapr_redis", "redis-cli",
|
||||
"SET",
|
||||
"myconfig" + postfix, "update_myconfigvalue" + postfix + "||2"
|
||||
};
|
||||
ProcessBuilder processBuilder = new ProcessBuilder(command);
|
||||
Process process = null;
|
||||
try {
|
||||
process = processBuilder.start();
|
||||
process.waitFor();
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,113 @@
|
|||
## Retrieve Configurations via Configuration API
|
||||
|
||||
This example provides the different capabilities provided by Dapr Java SDK for Configuration. For further information about Configuration APIs please refer to [this link](https://docs.dapr.io/developing-applications/building-blocks/configuration/)
|
||||
**This API is available in Preview Mode**.
|
||||
|
||||
### Using the ConfigurationAPI
|
||||
|
||||
The java SDK exposes several methods for this -
|
||||
* `client.getConfiguration(...)` for getting a configuration for a single/multiple keys.
|
||||
* `client.subscribeToConfigurations(...)` for subscribing to a list of keys for any change.
|
||||
|
||||
## Pre-requisites
|
||||
|
||||
* [Dapr and Dapr Cli](https://docs.dapr.io/getting-started/install-dapr/).
|
||||
* Java JDK 11 (or greater): [Oracle JDK](https://www.oracle.com/technetwork/java/javase/downloads/index.html#JDK11) or [OpenJDK](https://jdk.java.net/13/).
|
||||
* [Apache Maven](https://maven.apache.org/install.html) version 3.x.
|
||||
|
||||
### Checking out the code
|
||||
|
||||
Clone this repository:
|
||||
|
||||
```sh
|
||||
git clone https://github.com/dapr/java-sdk.git
|
||||
cd java-sdk
|
||||
```
|
||||
|
||||
Then build the Maven project:
|
||||
|
||||
```sh
|
||||
# make sure you are in the `java-sdk` directory.
|
||||
mvn install
|
||||
```
|
||||
## Store few dummy configurations in configurationstore
|
||||
<!-- STEP
|
||||
name: Set configuration value
|
||||
expected_stdout_lines:
|
||||
- "OK"
|
||||
timeout_seconds: 20
|
||||
-->
|
||||
|
||||
```bash
|
||||
docker exec dapr_redis redis-cli MSET myconfig1 "val1||1" myconfig2 "val2||1" myconfig3 "val3||1"
|
||||
```
|
||||
<!-- END_STEP -->
|
||||
|
||||
### Running the example
|
||||
|
||||
Get into the examples' directory:
|
||||
```sh
|
||||
cd examples
|
||||
```
|
||||
|
||||
Use the following command to run this example-
|
||||
|
||||
<!-- STEP
|
||||
name: Run ConfigurationClient example
|
||||
expected_stdout_lines:
|
||||
- "== APP == Using preview client..."
|
||||
- "== APP == *******trying to retrieve configuration given a single key********"
|
||||
- "== APP == Value ->val1 key ->myconfig1"
|
||||
- "== APP == *******trying to retrieve configurations for a variable no. of keys********"
|
||||
- "== APP == val1 : key ->myconfig1"
|
||||
- "== APP == val3 : key ->myconfig3"
|
||||
- "== APP == *******trying to retrieve configurations for a list of keys********"
|
||||
- "== APP == val1 : key ->myconfig1"
|
||||
- "== APP == val2 : key ->myconfig2"
|
||||
- "== APP == val3 : key ->myconfig3"
|
||||
- "== APP == *****Subscribing to keys using subscribe method: [myconfig1, myconfig3, myconfig2] *****"
|
||||
- "== APP == update_myconfigvalue1 : key ->myconfig1"
|
||||
- "== APP == update_myconfigvalue2 : key ->myconfig2"
|
||||
- "== APP == update_myconfigvalue3 : key ->myconfig3"
|
||||
background: true
|
||||
sleep: 5
|
||||
-->
|
||||
|
||||
```bash
|
||||
dapr run --components-path ./components/configuration --app-id configgrpc --log-level debug -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.configuration.grpc.ConfigurationClient
|
||||
```
|
||||
|
||||
<!-- END_STEP -->
|
||||
|
||||
### Sample output
|
||||
```
|
||||
== APP == Using preview client...
|
||||
== APP == *******trying to retrieve configuration given a single key********
|
||||
== APP == Value ->val1 key ->myconfig1
|
||||
== APP == *******trying to retrieve configurations for a variable no. of keys********
|
||||
== APP == val1 : key ->myconfig1
|
||||
== APP == val3 : key ->myconfig3
|
||||
== APP == *******trying to retrieve configurations for a list of keys********
|
||||
== APP == val1 : key ->myconfig1
|
||||
== APP == val2 : key ->myconfig2
|
||||
== APP == val3 : key ->myconfig3
|
||||
== APP == *****Subscribing to keys using subscribe method: [myconfig1, myconfig3, myconfig2] *****
|
||||
== APP == update_myconfigvalue1 : key ->myconfig1
|
||||
== APP == update_myconfigvalue2 : key ->myconfig2
|
||||
== APP == update_myconfigvalue3 : key ->myconfig3
|
||||
|
||||
```
|
||||
### Cleanup
|
||||
|
||||
To stop the app, run (or press CTRL+C):
|
||||
|
||||
<!-- STEP
|
||||
name: Cleanup
|
||||
-->
|
||||
|
||||
```bash
|
||||
dapr stop --app-id configgrpc
|
||||
```
|
||||
|
||||
<!-- END_STEP -->
|
||||
|
2
pom.xml
2
pom.xml
|
@ -16,7 +16,7 @@
|
|||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
<grpc.version>1.39.0</grpc.version>
|
||||
<protobuf.version>3.13.0</protobuf.version>
|
||||
<dapr.proto.baseurl>https://raw.githubusercontent.com/dapr/dapr/v1.4.0-rc.6/dapr/proto</dapr.proto.baseurl>
|
||||
<dapr.proto.baseurl>https://raw.githubusercontent.com/dapr/dapr/v1.5.1/dapr/proto</dapr.proto.baseurl>
|
||||
<os-maven-plugin.version>1.6.2</os-maven-plugin.version>
|
||||
<maven-dependency-plugin.version>3.1.1</maven-dependency-plugin.version>
|
||||
<maven-antrun-plugin.version>1.8</maven-antrun-plugin.version>
|
||||
|
|
|
@ -0,0 +1,12 @@
|
|||
apiVersion: dapr.io/v1alpha1
|
||||
kind: Component
|
||||
metadata:
|
||||
name: redisconfigstore
|
||||
spec:
|
||||
type: configuration.redis
|
||||
version: v1
|
||||
metadata:
|
||||
- name: redisHost
|
||||
value: localhost:6379
|
||||
- name: redisPassword
|
||||
value: ""
|
|
@ -0,0 +1,157 @@
|
|||
/*
|
||||
* Copyright 2021 The Dapr Authors
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package io.dapr.it.configuration.grpc;
|
||||
|
||||
import io.dapr.client.DaprClientBuilder;
|
||||
import io.dapr.client.DaprPreviewClient;
|
||||
import io.dapr.client.domain.ConfigurationItem;
|
||||
import io.dapr.it.BaseIT;
|
||||
import io.dapr.it.DaprRun;
|
||||
|
||||
import org.junit.*;
|
||||
import reactor.core.Disposable;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
public class ConfigurationClientIT extends BaseIT {
|
||||
|
||||
private static final String CONFIG_STORE_NAME = "redisconfigstore";
|
||||
|
||||
private static DaprRun daprRun;
|
||||
|
||||
private static DaprPreviewClient daprPreviewClient;
|
||||
|
||||
private static String key = "myconfig1";
|
||||
|
||||
private static List<String> keys = new ArrayList<>(Arrays.asList("myconfig1", "myconfig2", "myconfig3"));
|
||||
|
||||
private static String[] insertCmd = new String[] {
|
||||
"docker", "exec", "dapr_redis", "redis-cli",
|
||||
"MSET",
|
||||
"myconfigkey1", "myconfigvalue1||1",
|
||||
"myconfigkey2", "myconfigvalue2||1",
|
||||
"myconfigkey3", "myconfigvalue3||1"
|
||||
};
|
||||
|
||||
private static String[] updateCmd = new String[] {
|
||||
"docker", "exec", "dapr_redis", "redis-cli",
|
||||
"MSET",
|
||||
"myconfigkey1", "update_myconfigvalue1||2",
|
||||
"myconfigkey2", "update_myconfigvalue2||2",
|
||||
"myconfigkey3", "update_myconfigvalue3||2"
|
||||
};
|
||||
|
||||
@BeforeClass
|
||||
public static void init() throws Exception {
|
||||
daprRun = startDaprApp(ConfigurationClientIT.class.getSimpleName(), 5000);
|
||||
daprRun.switchToGRPC();
|
||||
daprPreviewClient = new DaprClientBuilder().buildPreviewClient();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws Exception {
|
||||
daprPreviewClient.close();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setupConfigStore() {
|
||||
executeDockerCommand(insertCmd);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getConfiguration() {
|
||||
ConfigurationItem ci = daprPreviewClient.getConfiguration(CONFIG_STORE_NAME, "myconfigkey1").block();
|
||||
assertEquals(ci.getKey(), "myconfigkey1");
|
||||
assertEquals(ci.getValue(), "myconfigvalue1");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getConfigurationWithEmptyKey() {
|
||||
assertThrows(IllegalArgumentException.class, () -> {
|
||||
daprPreviewClient.getConfiguration(CONFIG_STORE_NAME, "").block();
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getConfigurations() {
|
||||
List<ConfigurationItem> cis = daprPreviewClient.getConfiguration(CONFIG_STORE_NAME, "myconfigkey1", "myconfigkey2").block();
|
||||
assertTrue(cis.size() == 2);
|
||||
assertEquals(cis.get(0).getKey(), "myconfigkey1");
|
||||
assertEquals(cis.get(1).getValue(), "myconfigvalue2");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getConfigurationsWithEmptyList() {
|
||||
List<String> listOfKeys = new ArrayList<>();
|
||||
Map<String, String> metadata = new HashMap<>();
|
||||
assertThrows(IllegalArgumentException.class, () -> {
|
||||
daprPreviewClient.getConfiguration(CONFIG_STORE_NAME, listOfKeys, metadata).block();
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void subscribeToConfiguration() {
|
||||
List<String> updatedValues = new ArrayList<>();
|
||||
AtomicReference<Disposable> disposable = new AtomicReference<>();
|
||||
Runnable subscribeTask = () -> {
|
||||
Flux<List<ConfigurationItem>> outFlux = daprPreviewClient
|
||||
.subscribeToConfiguration(CONFIG_STORE_NAME, "myconfigkey1", "myconfigkey2");
|
||||
disposable.set(outFlux.subscribe(update -> {
|
||||
updatedValues.add(update.get(0).getValue());
|
||||
}));
|
||||
};
|
||||
Thread subscribeThread = new Thread(subscribeTask);
|
||||
subscribeThread.start();
|
||||
try {
|
||||
// To ensure that subscribeThread gets scheduled
|
||||
Thread.sleep(0);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
Runnable updateKeys = () -> {
|
||||
executeDockerCommand(updateCmd);
|
||||
};
|
||||
new Thread(updateKeys).start();
|
||||
try {
|
||||
// To ensure main thread does not die before outFlux subscribe gets called
|
||||
Thread.sleep(3000);
|
||||
disposable.get().dispose();
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
assertEquals(updatedValues.size(), 2);
|
||||
assertTrue(updatedValues.contains("update_myconfigvalue1"));
|
||||
assertTrue(updatedValues.contains("update_myconfigvalue2"));
|
||||
assertFalse(updatedValues.contains("update_myconfigvalue3"));
|
||||
}
|
||||
|
||||
private static void executeDockerCommand(String[] command) {
|
||||
ProcessBuilder processBuilder = new ProcessBuilder(command);
|
||||
Process process = null;
|
||||
try {
|
||||
process = processBuilder.start();
|
||||
process.waitFor();
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -13,37 +13,33 @@ limitations under the License.
|
|||
|
||||
package io.dapr.client;
|
||||
|
||||
import io.dapr.client.domain.ConfigurationItem;
|
||||
import io.dapr.client.domain.DeleteStateRequest;
|
||||
import io.dapr.client.domain.DeleteStateRequestBuilder;
|
||||
import io.dapr.client.domain.ExecuteStateTransactionRequest;
|
||||
import io.dapr.client.domain.ExecuteStateTransactionRequestBuilder;
|
||||
import io.dapr.client.domain.GetBulkSecretRequest;
|
||||
import io.dapr.client.domain.GetBulkSecretRequestBuilder;
|
||||
import io.dapr.client.domain.GetBulkStateRequest;
|
||||
import io.dapr.client.domain.GetBulkStateRequestBuilder;
|
||||
import io.dapr.client.domain.GetConfigurationRequest;
|
||||
import io.dapr.client.domain.GetSecretRequest;
|
||||
import io.dapr.client.domain.GetSecretRequestBuilder;
|
||||
import io.dapr.client.domain.GetStateRequest;
|
||||
import io.dapr.client.domain.GetStateRequestBuilder;
|
||||
import io.dapr.client.domain.HttpExtension;
|
||||
import io.dapr.client.domain.InvokeBindingRequest;
|
||||
import io.dapr.client.domain.InvokeBindingRequestBuilder;
|
||||
import io.dapr.client.domain.InvokeMethodRequest;
|
||||
import io.dapr.client.domain.InvokeMethodRequestBuilder;
|
||||
import io.dapr.client.domain.PublishEventRequest;
|
||||
import io.dapr.client.domain.PublishEventRequestBuilder;
|
||||
import io.dapr.client.domain.SaveStateRequest;
|
||||
import io.dapr.client.domain.SaveStateRequestBuilder;
|
||||
import io.dapr.client.domain.State;
|
||||
import io.dapr.client.domain.StateOptions;
|
||||
import io.dapr.client.domain.SubscribeConfigurationRequest;
|
||||
import io.dapr.client.domain.TransactionalStateOperation;
|
||||
import io.dapr.serializer.DaprObjectSerializer;
|
||||
import io.dapr.utils.TypeRef;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Abstract class with convenient methods common between client implementations.
|
||||
|
@ -52,7 +48,7 @@ import java.util.Map;
|
|||
* @see io.dapr.client.DaprClientGrpc
|
||||
* @see io.dapr.client.DaprClientHttp
|
||||
*/
|
||||
abstract class AbstractDaprClient implements DaprClient {
|
||||
abstract class AbstractDaprClient implements DaprClient, DaprPreviewClient {
|
||||
|
||||
/**
|
||||
* A utility class for serialize and deserialize the transient objects.
|
||||
|
@ -416,4 +412,72 @@ abstract class AbstractDaprClient implements DaprClient {
|
|||
return this.getBulkSecret(request).defaultIfEmpty(Collections.emptyMap());
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public Mono<ConfigurationItem> getConfiguration(String storeName, String key) {
|
||||
GetConfigurationRequest request = new GetConfigurationRequest(storeName, filterEmptyKeys(key));
|
||||
return this.getConfiguration(request).map(data -> data.get(0));
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public Mono<ConfigurationItem> getConfiguration(String storeName, String key, Map<String, String> metadata) {
|
||||
GetConfigurationRequest request = new GetConfigurationRequest(storeName, filterEmptyKeys(key));
|
||||
request.setMetadata(metadata);
|
||||
return this.getConfiguration(request).map(data -> data.get(0));
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public Mono<List<ConfigurationItem>> getConfiguration(String storeName, String... keys) {
|
||||
List<String> listOfKeys = filterEmptyKeys(keys);
|
||||
GetConfigurationRequest request = new GetConfigurationRequest(storeName, listOfKeys);
|
||||
return this.getConfiguration(request);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public Mono<List<ConfigurationItem>> getConfiguration(
|
||||
String storeName,
|
||||
List<String> keys,
|
||||
Map<String, String> metadata) {
|
||||
GetConfigurationRequest request = new GetConfigurationRequest(storeName, keys);
|
||||
request.setMetadata(metadata);
|
||||
return this.getConfiguration(request);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
public Flux<List<ConfigurationItem>> subscribeToConfiguration(String storeName, String... keys) {
|
||||
List<String> listOfKeys = filterEmptyKeys(keys);
|
||||
SubscribeConfigurationRequest request = new SubscribeConfigurationRequest(storeName, listOfKeys);
|
||||
return this.subscribeToConfiguration(request);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
public Flux<List<ConfigurationItem>> subscribeToConfiguration(
|
||||
String storeName,
|
||||
List<String> keys,
|
||||
Map<String, String> metadata) {
|
||||
SubscribeConfigurationRequest request = new SubscribeConfigurationRequest(storeName, keys);
|
||||
request.setMetadata(metadata);
|
||||
return this.subscribeToConfiguration(request);
|
||||
}
|
||||
|
||||
private List<String> filterEmptyKeys(String... keys) {
|
||||
return Arrays.stream(keys)
|
||||
.filter(key -> !key.trim().isEmpty())
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -117,6 +117,16 @@ public class DaprClientBuilder {
|
|||
return buildDaprClient(this.apiProtocol);
|
||||
}
|
||||
|
||||
/**
|
||||
* Build an instance of the Client based on the provided setup.
|
||||
*
|
||||
* @return an instance of the setup Client
|
||||
* @throws IllegalStateException if any required field is missing
|
||||
*/
|
||||
public DaprPreviewClient buildPreviewClient() {
|
||||
return (DaprPreviewClient) buildDaprClient(this.apiProtocol);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates an instance of a Dapr Client based on the chosen protocol.
|
||||
*
|
||||
|
|
|
@ -17,10 +17,12 @@ import com.google.common.base.Strings;
|
|||
import com.google.protobuf.Any;
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.google.protobuf.Empty;
|
||||
import io.dapr.client.domain.ConfigurationItem;
|
||||
import io.dapr.client.domain.DeleteStateRequest;
|
||||
import io.dapr.client.domain.ExecuteStateTransactionRequest;
|
||||
import io.dapr.client.domain.GetBulkSecretRequest;
|
||||
import io.dapr.client.domain.GetBulkStateRequest;
|
||||
import io.dapr.client.domain.GetConfigurationRequest;
|
||||
import io.dapr.client.domain.GetSecretRequest;
|
||||
import io.dapr.client.domain.GetStateRequest;
|
||||
import io.dapr.client.domain.HttpExtension;
|
||||
|
@ -30,6 +32,7 @@ import io.dapr.client.domain.PublishEventRequest;
|
|||
import io.dapr.client.domain.SaveStateRequest;
|
||||
import io.dapr.client.domain.State;
|
||||
import io.dapr.client.domain.StateOptions;
|
||||
import io.dapr.client.domain.SubscribeConfigurationRequest;
|
||||
import io.dapr.client.domain.TransactionalStateOperation;
|
||||
import io.dapr.config.Properties;
|
||||
import io.dapr.exceptions.DaprException;
|
||||
|
@ -48,6 +51,8 @@ import io.grpc.ForwardingClientCall;
|
|||
import io.grpc.Metadata;
|
||||
import io.grpc.MethodDescriptor;
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.FluxSink;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.publisher.MonoSink;
|
||||
import reactor.util.context.Context;
|
||||
|
@ -677,6 +682,100 @@ public class DaprClientGrpc extends AbstractDaprClient {
|
|||
).then();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public Mono<List<ConfigurationItem>> getConfiguration(GetConfigurationRequest request) {
|
||||
try {
|
||||
final String configurationStoreName = request.getStoreName();
|
||||
final Map<String, String> metadata = request.getMetadata();
|
||||
final List<String> keys = request.getKeys();
|
||||
if ((configurationStoreName == null) || (configurationStoreName.trim().isEmpty())) {
|
||||
throw new IllegalArgumentException("Configuration Store Name cannot be null or empty.");
|
||||
}
|
||||
if (keys.isEmpty()) {
|
||||
throw new IllegalArgumentException("Keys can not be empty or null");
|
||||
}
|
||||
DaprProtos.GetConfigurationRequest.Builder builder = DaprProtos.GetConfigurationRequest.newBuilder()
|
||||
.setStoreName(configurationStoreName).addAllKeys(keys);
|
||||
if (metadata != null) {
|
||||
builder.putAllMetadata(metadata);
|
||||
}
|
||||
|
||||
DaprProtos.GetConfigurationRequest envelope = builder.build();
|
||||
return this.getConfigurationAlpha1(envelope);
|
||||
|
||||
} catch (Exception ex) {
|
||||
return DaprException.wrapMono(ex);
|
||||
}
|
||||
}
|
||||
|
||||
private Mono<List<ConfigurationItem>> getConfigurationAlpha1(DaprProtos.GetConfigurationRequest envelope) {
|
||||
return Mono.subscriberContext().flatMap(
|
||||
context ->
|
||||
this.<DaprProtos.GetConfigurationResponse>createMono(
|
||||
it -> intercept(context, asyncStub).getConfigurationAlpha1(envelope, it)
|
||||
)
|
||||
).map(
|
||||
it ->
|
||||
it.getItemsList().stream()
|
||||
.map(this::buildConfigurationItem).collect(Collectors.toList())
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public Flux<List<ConfigurationItem>> subscribeToConfiguration(SubscribeConfigurationRequest request) {
|
||||
try {
|
||||
final String configurationStoreName = request.getStoreName();
|
||||
final List<String> keys = request.getKeys();
|
||||
final Map<String, String> metadata = request.getMetadata();
|
||||
|
||||
if (configurationStoreName == null || (configurationStoreName.trim().isEmpty())) {
|
||||
throw new IllegalArgumentException("Configuration Store Name can not be null or empty.");
|
||||
}
|
||||
if (keys.isEmpty()) {
|
||||
throw new IllegalArgumentException("Keys can not be null or empty.");
|
||||
}
|
||||
DaprProtos.SubscribeConfigurationRequest.Builder builder = DaprProtos.SubscribeConfigurationRequest.newBuilder()
|
||||
.setStoreName(configurationStoreName)
|
||||
.addAllKeys(keys);
|
||||
if (metadata != null) {
|
||||
builder.putAllMetadata(metadata);
|
||||
}
|
||||
|
||||
DaprProtos.SubscribeConfigurationRequest envelope = builder.build();
|
||||
return this.<DaprProtos.SubscribeConfigurationResponse>createFlux(
|
||||
it -> intercept(asyncStub).subscribeConfigurationAlpha1(envelope, it)
|
||||
).map(
|
||||
it ->
|
||||
it.getItemsList().stream()
|
||||
.map(this::buildConfigurationItem).collect(Collectors.toList())
|
||||
);
|
||||
} catch (Exception ex) {
|
||||
return DaprException.wrapFlux(ex);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Build a new Configuration Item from provided parameter.
|
||||
*
|
||||
* @param configurationItem CommonProtos.ConfigurationItem
|
||||
* @return io.dapr.client.domain.ConfigurationItem
|
||||
*/
|
||||
private ConfigurationItem buildConfigurationItem(
|
||||
CommonProtos.ConfigurationItem configurationItem) {
|
||||
return new ConfigurationItem(
|
||||
configurationItem.getKey(),
|
||||
configurationItem.getValue(),
|
||||
configurationItem.getVersion(),
|
||||
configurationItem.getMetadataMap()
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Populates GRPC client with interceptors.
|
||||
*
|
||||
|
@ -722,6 +821,10 @@ public class DaprClientGrpc extends AbstractDaprClient {
|
|||
return Mono.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run());
|
||||
}
|
||||
|
||||
private <T> Flux<T> createFlux(Consumer<StreamObserver<T>> consumer) {
|
||||
return Flux.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run());
|
||||
}
|
||||
|
||||
private <T> StreamObserver<T> createStreamObserver(MonoSink<T> sink) {
|
||||
return new StreamObserver<T>() {
|
||||
@Override
|
||||
|
@ -740,4 +843,23 @@ public class DaprClientGrpc extends AbstractDaprClient {
|
|||
}
|
||||
};
|
||||
}
|
||||
|
||||
private <T> StreamObserver<T> createStreamObserver(FluxSink<T> sink) {
|
||||
return new StreamObserver<T>() {
|
||||
@Override
|
||||
public void onNext(T value) {
|
||||
sink.next(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable t) {
|
||||
sink.error(DaprException.propagate(new ExecutionException(t)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCompleted() {
|
||||
sink.complete();
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,10 +15,12 @@ package io.dapr.client;
|
|||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.google.common.base.Strings;
|
||||
import io.dapr.client.domain.ConfigurationItem;
|
||||
import io.dapr.client.domain.DeleteStateRequest;
|
||||
import io.dapr.client.domain.ExecuteStateTransactionRequest;
|
||||
import io.dapr.client.domain.GetBulkSecretRequest;
|
||||
import io.dapr.client.domain.GetBulkStateRequest;
|
||||
import io.dapr.client.domain.GetConfigurationRequest;
|
||||
import io.dapr.client.domain.GetSecretRequest;
|
||||
import io.dapr.client.domain.GetStateRequest;
|
||||
import io.dapr.client.domain.HttpExtension;
|
||||
|
@ -28,6 +30,7 @@ import io.dapr.client.domain.PublishEventRequest;
|
|||
import io.dapr.client.domain.SaveStateRequest;
|
||||
import io.dapr.client.domain.State;
|
||||
import io.dapr.client.domain.StateOptions;
|
||||
import io.dapr.client.domain.SubscribeConfigurationRequest;
|
||||
import io.dapr.client.domain.TransactionalStateOperation;
|
||||
import io.dapr.client.domain.TransactionalStateRequest;
|
||||
import io.dapr.config.Properties;
|
||||
|
@ -36,6 +39,7 @@ import io.dapr.serializer.DaprObjectSerializer;
|
|||
import io.dapr.serializer.DefaultObjectSerializer;
|
||||
import io.dapr.utils.NetworkUtils;
|
||||
import io.dapr.utils.TypeRef;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -665,6 +669,22 @@ public class DaprClientHttp extends AbstractDaprClient {
|
|||
.then();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public Mono<List<ConfigurationItem>> getConfiguration(GetConfigurationRequest request) {
|
||||
return DaprException.wrapMono(new UnsupportedOperationException());
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public Flux<List<ConfigurationItem>> subscribeToConfiguration(SubscribeConfigurationRequest request) {
|
||||
return DaprException.wrapFlux(new UnsupportedOperationException());
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts metadata map into Query params.
|
||||
* @param metadata metadata map
|
||||
|
|
|
@ -0,0 +1,106 @@
|
|||
/*
|
||||
* Copyright 2022 The Dapr Authors
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package io.dapr.client;
|
||||
|
||||
import io.dapr.client.domain.ConfigurationItem;
|
||||
import io.dapr.client.domain.GetConfigurationRequest;
|
||||
import io.dapr.client.domain.SubscribeConfigurationRequest;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Generic client interface for preview or alpha APIs in Dapr, regardless of GRPC or HTTP.
|
||||
*
|
||||
* @see io.dapr.client.DaprClientBuilder for information on how to make instance for this interface.
|
||||
*/
|
||||
public interface DaprPreviewClient extends AutoCloseable {
|
||||
|
||||
/**
|
||||
* Retrieve a configuration based on a provided key.
|
||||
*
|
||||
* @param storeName Name of the configuration store
|
||||
* @param key key of the configuration item which is to be retrieved
|
||||
* @return Mono of the Configuration Item
|
||||
*/
|
||||
Mono<ConfigurationItem> getConfiguration(String storeName, String key);
|
||||
|
||||
/**
|
||||
* Retrieve a configuration based on a provided key.
|
||||
*
|
||||
* @param storeName Name of the configuration store
|
||||
* @param key key of the configuration item which is to be retrieved
|
||||
* @param metadata optional metadata
|
||||
* @return Mono of the Configuration Item
|
||||
*/
|
||||
Mono<ConfigurationItem> getConfiguration(String storeName, String key, Map<String, String> metadata);
|
||||
|
||||
/**
|
||||
* Retrieve List of configurations based on a provided variable number of keys.
|
||||
*
|
||||
* @param storeName Name of the configuration store
|
||||
* @param keys keys of the configurations which are to be retrieved
|
||||
* @return Mono of List of ConfigurationItems
|
||||
*/
|
||||
Mono<List<ConfigurationItem>> getConfiguration(String storeName, String... keys);
|
||||
|
||||
/**
|
||||
* Retrieve List of configurations based on a provided variable number of keys.
|
||||
*
|
||||
* @param storeName Name of the configuration store
|
||||
* @param keys keys of the configurations which are to be retrieved
|
||||
* @param metadata optional metadata
|
||||
* @return Mono of List of ConfigurationItems
|
||||
*/
|
||||
Mono<List<ConfigurationItem>> getConfiguration(String storeName, List<String> keys, Map<String, String> metadata);
|
||||
|
||||
/**
|
||||
* Retrieve List of configurations based on a provided configuration request object.
|
||||
*
|
||||
* @param request request for retrieving Configurations for a list keys
|
||||
* @return Mono of List of ConfigurationItems
|
||||
*/
|
||||
|
||||
Mono<List<ConfigurationItem>> getConfiguration(GetConfigurationRequest request);
|
||||
|
||||
/**
|
||||
* Subscribe to the keys for any change.
|
||||
*
|
||||
* @param storeName Name of the configuration store
|
||||
* @param keys keys of the configurations which are to be subscribed
|
||||
* @return Flux of List of configuration items
|
||||
*/
|
||||
Flux<List<ConfigurationItem>> subscribeToConfiguration(String storeName, String... keys);
|
||||
|
||||
/**
|
||||
* Subscribe to the keys for any change.
|
||||
*
|
||||
* @param storeName Name of the configuration store
|
||||
* @param keys keys of the configurations which are to be subscribed
|
||||
* @param metadata optional metadata
|
||||
* @return Flux of List of configuration items
|
||||
*/
|
||||
Flux<List<ConfigurationItem>> subscribeToConfiguration(String storeName, List<String> keys,
|
||||
Map<String, String> metadata);
|
||||
|
||||
/**
|
||||
* Subscribe to the keys for any change.
|
||||
*
|
||||
* @param request request for subscribing to any change for the given keys in request
|
||||
* @return Flux of List of configuration items
|
||||
*/
|
||||
Flux<List<ConfigurationItem>> subscribeToConfiguration(SubscribeConfigurationRequest request);
|
||||
}
|
|
@ -0,0 +1,72 @@
|
|||
/*
|
||||
* Copyright 2022 The Dapr Authors
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package io.dapr.client.domain;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* A configuration item from Dapr's configuration store.
|
||||
*/
|
||||
public class ConfigurationItem {
|
||||
private final String key;
|
||||
private final String value;
|
||||
private final String version;
|
||||
private final Map<String, String> metadata;
|
||||
|
||||
/**
|
||||
* Constructor for a configuration Item.
|
||||
*
|
||||
* @param key key of the configuration item
|
||||
* @param value value for the provided key
|
||||
* @param version version of the configuration item
|
||||
* @param metadata additional information
|
||||
*/
|
||||
public ConfigurationItem(String key, String value, String version, Map<String, String> metadata) {
|
||||
this.key = key;
|
||||
this.value = value;
|
||||
this.version = version;
|
||||
this.metadata = Collections.unmodifiableMap(metadata);
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor for a configuration Item.
|
||||
*
|
||||
* @param key key of the configuration item
|
||||
* @param value value for the provided key
|
||||
* @param version version of the configuration item
|
||||
*/
|
||||
public ConfigurationItem(String key, String value, String version) {
|
||||
this.key = key;
|
||||
this.value = value;
|
||||
this.version = version;
|
||||
this.metadata = Collections.emptyMap();
|
||||
}
|
||||
|
||||
public String getKey() {
|
||||
return key;
|
||||
}
|
||||
|
||||
public String getValue() {
|
||||
return value;
|
||||
}
|
||||
|
||||
public String getVersion() {
|
||||
return version;
|
||||
}
|
||||
|
||||
public Map<String, String> getMetadata() {
|
||||
return metadata;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,49 @@
|
|||
/*
|
||||
* Copyright 2022 The Dapr Authors
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package io.dapr.client.domain;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Request to get one or more configuration items from Dapr's configuration store.
|
||||
*/
|
||||
public class GetConfigurationRequest {
|
||||
private final String storeName;
|
||||
private final List<String> keys;
|
||||
private Map<String, String> metadata;
|
||||
|
||||
public GetConfigurationRequest(String storeName, List<String> keys) {
|
||||
this.storeName = storeName;
|
||||
this.keys = keys == null ? Collections.EMPTY_LIST : Collections.unmodifiableList(keys);
|
||||
}
|
||||
|
||||
public GetConfigurationRequest setMetadata(Map<String, String> metadata) {
|
||||
this.metadata = metadata == null ? null : Collections.unmodifiableMap(metadata);
|
||||
return this;
|
||||
}
|
||||
|
||||
public String getStoreName() {
|
||||
return storeName;
|
||||
}
|
||||
|
||||
public List<String> getKeys() {
|
||||
return keys;
|
||||
}
|
||||
|
||||
public Map<String, String> getMetadata() {
|
||||
return metadata;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,49 @@
|
|||
/*
|
||||
* Copyright 2022 The Dapr Authors
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package io.dapr.client.domain;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Request to subscribe to one or more configuration items.
|
||||
*/
|
||||
public class SubscribeConfigurationRequest {
|
||||
private final String storeName;
|
||||
private final List<String> keys;
|
||||
private Map<String, String> metadata;
|
||||
|
||||
public SubscribeConfigurationRequest(String storeName, List<String> keys) {
|
||||
this.storeName = storeName;
|
||||
this.keys = keys == null ? Collections.EMPTY_LIST : Collections.unmodifiableList(keys);
|
||||
}
|
||||
|
||||
public SubscribeConfigurationRequest setMetadata(Map<String, String> metadata) {
|
||||
this.metadata = metadata == null ? null : Collections.unmodifiableMap(metadata);
|
||||
return this;
|
||||
}
|
||||
|
||||
public String getStoreName() {
|
||||
return storeName;
|
||||
}
|
||||
|
||||
public List<String> getKeys() {
|
||||
return keys;
|
||||
}
|
||||
|
||||
public Map<String, String> getMetadata() {
|
||||
return metadata;
|
||||
}
|
||||
}
|
|
@ -15,6 +15,7 @@ package io.dapr.exceptions;
|
|||
|
||||
import io.grpc.StatusRuntimeException;
|
||||
import reactor.core.Exceptions;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.util.concurrent.Callable;
|
||||
|
@ -154,6 +155,23 @@ public class DaprException extends RuntimeException {
|
|||
return Mono.empty();
|
||||
}
|
||||
|
||||
/**
|
||||
* Wraps an exception into DaprException (if not already DaprException).
|
||||
*
|
||||
* @param exception Exception to be wrapped.
|
||||
* @param <T> Flux's response type.
|
||||
* @return Flux containing DaprException.
|
||||
*/
|
||||
public static <T> Flux<T> wrapFlux(Exception exception) {
|
||||
try {
|
||||
wrap(exception);
|
||||
} catch (Exception e) {
|
||||
return Flux.error(e);
|
||||
}
|
||||
|
||||
return Flux.empty();
|
||||
}
|
||||
|
||||
/**
|
||||
* Wraps an exception into DaprException (if not already DaprException).
|
||||
*
|
||||
|
|
|
@ -0,0 +1,261 @@
|
|||
/*
|
||||
* Copyright 2021 The Dapr Authors
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package io.dapr.client;
|
||||
|
||||
|
||||
import io.dapr.client.domain.ConfigurationItem;
|
||||
import io.dapr.serializer.DefaultObjectSerializer;
|
||||
import io.dapr.v1.CommonProtos;
|
||||
import io.dapr.v1.DaprGrpc;
|
||||
import io.dapr.v1.DaprProtos;
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
import static io.dapr.utils.TestUtils.assertThrowsDaprException;
|
||||
import static org.junit.Assert.*;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
public class DaprPreviewClientGrpcTest {
|
||||
|
||||
private static final String CONFIG_STORE_NAME = "MyConfigStore";
|
||||
|
||||
private Closeable closeable;
|
||||
private DaprGrpc.DaprStub daprStub;
|
||||
private DaprPreviewClient previewClient;
|
||||
|
||||
@Before
|
||||
public void setup() throws IOException {
|
||||
closeable = mock(Closeable.class);
|
||||
daprStub = mock(DaprGrpc.DaprStub.class);
|
||||
when(daprStub.withInterceptors(any())).thenReturn(daprStub);
|
||||
previewClient = new DaprClientGrpc(
|
||||
closeable, daprStub, new DefaultObjectSerializer(), new DefaultObjectSerializer());
|
||||
doNothing().when(closeable).close();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
previewClient.close();
|
||||
verify(closeable).close();
|
||||
verifyNoMoreInteractions(closeable);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getConfigurationTestErrorScenario() {
|
||||
assertThrows(IllegalArgumentException.class, () -> {
|
||||
previewClient.getConfiguration(CONFIG_STORE_NAME, "").block();
|
||||
});
|
||||
assertThrows(IllegalArgumentException.class, () -> {
|
||||
previewClient.getConfiguration("", "key").block();
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getSingleConfigurationTest() {
|
||||
doAnswer((Answer<Void>) invocation -> {
|
||||
StreamObserver<DaprProtos.GetConfigurationResponse> observer =
|
||||
(StreamObserver<DaprProtos.GetConfigurationResponse>) invocation.getArguments()[1];
|
||||
observer.onNext(getSingleMockResponse());
|
||||
observer.onCompleted();
|
||||
return null;
|
||||
}).when(daprStub).getConfigurationAlpha1(any(DaprProtos.GetConfigurationRequest.class), any());
|
||||
|
||||
ConfigurationItem ci = previewClient.getConfiguration(CONFIG_STORE_NAME, "configkey1").block();
|
||||
assertEquals("configkey1", ci.getKey());
|
||||
assertEquals("configvalue1", ci.getValue());
|
||||
assertEquals("1", ci.getVersion());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getSingleConfigurationWithMetadataTest() {
|
||||
doAnswer((Answer<Void>) invocation -> {
|
||||
StreamObserver<DaprProtos.GetConfigurationResponse> observer =
|
||||
(StreamObserver<DaprProtos.GetConfigurationResponse>) invocation.getArguments()[1];
|
||||
observer.onNext(getSingleMockResponse());
|
||||
observer.onCompleted();
|
||||
return null;
|
||||
}).when(daprStub).getConfigurationAlpha1(any(DaprProtos.GetConfigurationRequest.class), any());
|
||||
|
||||
Map<String, String> reqMetadata = new HashMap<>();
|
||||
reqMetadata.put("meta1", "value1");
|
||||
ConfigurationItem ci = previewClient.getConfiguration(CONFIG_STORE_NAME, "configkey1", reqMetadata).block();
|
||||
assertEquals("configkey1", ci.getKey());
|
||||
assertEquals("configvalue1", ci.getValue());
|
||||
assertEquals("1", ci.getVersion());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getMultipleConfigurationTest() {
|
||||
doAnswer((Answer<Void>) invocation -> {
|
||||
StreamObserver<DaprProtos.GetConfigurationResponse> observer =
|
||||
(StreamObserver<DaprProtos.GetConfigurationResponse>) invocation.getArguments()[1];
|
||||
observer.onNext(getMultipleMockResponse());
|
||||
observer.onCompleted();
|
||||
return null;
|
||||
}).when(daprStub).getConfigurationAlpha1(any(DaprProtos.GetConfigurationRequest.class), any());
|
||||
|
||||
List<ConfigurationItem> cis = previewClient.getConfiguration(CONFIG_STORE_NAME, "configkey1","configkey2").block();
|
||||
assertEquals(2, cis.size());
|
||||
assertEquals("configkey1", cis.stream().findFirst().get().getKey());
|
||||
assertEquals("configvalue1", cis.stream().findFirst().get().getValue());
|
||||
assertEquals("1", cis.stream().findFirst().get().getVersion());
|
||||
|
||||
assertEquals("configkey2", cis.stream().skip(1).findFirst().get().getKey());
|
||||
assertEquals("configvalue2", cis.stream().skip(1).findFirst().get().getValue());
|
||||
assertEquals("1", cis.stream().skip(1).findFirst().get().getVersion());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getMultipleConfigurationWithMetadataTest() {
|
||||
doAnswer((Answer<Void>) invocation -> {
|
||||
StreamObserver<DaprProtos.GetConfigurationResponse> observer =
|
||||
(StreamObserver<DaprProtos.GetConfigurationResponse>) invocation.getArguments()[1];
|
||||
observer.onNext(getMultipleMockResponse());
|
||||
observer.onCompleted();
|
||||
return null;
|
||||
}).when(daprStub).getConfigurationAlpha1(any(DaprProtos.GetConfigurationRequest.class), any());
|
||||
|
||||
Map<String, String> reqMetadata = new HashMap<>();
|
||||
reqMetadata.put("meta1", "value1");
|
||||
List<String> keys = Arrays.asList("configkey1","configkey2");
|
||||
List<ConfigurationItem> cis = previewClient.getConfiguration(CONFIG_STORE_NAME, keys, reqMetadata).block();
|
||||
assertEquals(2, cis.size());
|
||||
assertEquals("configkey1", cis.stream().findFirst().get().getKey());
|
||||
assertEquals("configvalue1", cis.stream().findFirst().get().getValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void subscribeConfigurationTest() {
|
||||
Map<String, String> metadata = new HashMap<>();
|
||||
metadata.put("meta1", "value1");
|
||||
DaprProtos.SubscribeConfigurationResponse responseEnvelope = DaprProtos.SubscribeConfigurationResponse.newBuilder()
|
||||
.addItems(CommonProtos.ConfigurationItem.newBuilder()
|
||||
.setKey("configkey1")
|
||||
.setValue("configvalue1")
|
||||
.setVersion("1")
|
||||
.putAllMetadata(metadata)
|
||||
.build())
|
||||
.build();
|
||||
|
||||
doAnswer((Answer<Void>) invocation -> {
|
||||
StreamObserver<DaprProtos.SubscribeConfigurationResponse> observer =
|
||||
(StreamObserver<DaprProtos.SubscribeConfigurationResponse>) invocation.getArguments()[1];
|
||||
observer.onNext(responseEnvelope);
|
||||
observer.onCompleted();
|
||||
return null;
|
||||
}).when(daprStub).subscribeConfigurationAlpha1(any(DaprProtos.SubscribeConfigurationRequest.class), any());
|
||||
|
||||
Iterator<List<ConfigurationItem>> itr = previewClient.subscribeToConfiguration(CONFIG_STORE_NAME, "configkey1").toIterable().iterator();
|
||||
assertTrue(itr.hasNext());
|
||||
assertEquals("configkey1", itr.next().get(0).getKey());
|
||||
assertFalse(itr.hasNext());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void subscribeConfigurationTestWithMetadata() {
|
||||
Map<String, String> metadata = new HashMap<>();
|
||||
metadata.put("meta1", "value1");
|
||||
DaprProtos.SubscribeConfigurationResponse responseEnvelope = DaprProtos.SubscribeConfigurationResponse.newBuilder()
|
||||
.addItems(CommonProtos.ConfigurationItem.newBuilder()
|
||||
.setKey("configkey1")
|
||||
.setValue("configvalue1")
|
||||
.setVersion("1")
|
||||
.putAllMetadata(metadata)
|
||||
.build())
|
||||
.build();
|
||||
|
||||
doAnswer((Answer<Void>) invocation -> {
|
||||
StreamObserver<DaprProtos.SubscribeConfigurationResponse> observer =
|
||||
(StreamObserver<DaprProtos.SubscribeConfigurationResponse>) invocation.getArguments()[1];
|
||||
observer.onNext(responseEnvelope);
|
||||
observer.onCompleted();
|
||||
return null;
|
||||
}).when(daprStub).subscribeConfigurationAlpha1(any(DaprProtos.SubscribeConfigurationRequest.class), any());
|
||||
|
||||
Map<String, String> reqMetadata = new HashMap<>();
|
||||
List<String> keys = Arrays.asList("configkey1");
|
||||
|
||||
Iterator<List<ConfigurationItem>> itr = previewClient.subscribeToConfiguration(CONFIG_STORE_NAME, keys, reqMetadata).toIterable().iterator();
|
||||
assertTrue(itr.hasNext());
|
||||
assertEquals("configkey1", itr.next().get(0).getKey());
|
||||
assertFalse(itr.hasNext());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void subscribeConfigurationWithErrorTest() {
|
||||
doAnswer((Answer<Void>) invocation -> {
|
||||
StreamObserver<DaprProtos.SubscribeConfigurationResponse> observer =
|
||||
(StreamObserver<DaprProtos.SubscribeConfigurationResponse>) invocation.getArguments()[1];
|
||||
observer.onError(new RuntimeException());
|
||||
observer.onCompleted();
|
||||
return null;
|
||||
}).when(daprStub).subscribeConfigurationAlpha1(any(DaprProtos.SubscribeConfigurationRequest.class), any());
|
||||
|
||||
assertThrowsDaprException(ExecutionException.class, () -> {
|
||||
previewClient.subscribeToConfiguration(CONFIG_STORE_NAME, "key").blockFirst();
|
||||
});
|
||||
|
||||
assertThrows(IllegalArgumentException.class, () -> {
|
||||
previewClient.subscribeToConfiguration("", "key").blockFirst();
|
||||
});
|
||||
}
|
||||
|
||||
private DaprProtos.GetConfigurationResponse getSingleMockResponse() {
|
||||
Map<String, String> metadata = new HashMap<>();
|
||||
metadata.put("meta1", "value1");
|
||||
DaprProtos.GetConfigurationResponse responseEnvelope = DaprProtos.GetConfigurationResponse.newBuilder()
|
||||
.addItems(CommonProtos.ConfigurationItem.newBuilder()
|
||||
.setKey("configkey1")
|
||||
.setValue("configvalue1")
|
||||
.setVersion("1")
|
||||
.putAllMetadata(metadata)
|
||||
.build()
|
||||
).build();
|
||||
return responseEnvelope;
|
||||
}
|
||||
|
||||
private DaprProtos.GetConfigurationResponse getMultipleMockResponse() {
|
||||
Map<String, String> metadata = new HashMap<>();
|
||||
metadata.put("meta1", "value1");
|
||||
DaprProtos.GetConfigurationResponse responseEnvelope = DaprProtos.GetConfigurationResponse.newBuilder()
|
||||
.addItems(CommonProtos.ConfigurationItem.newBuilder()
|
||||
.setKey("configkey1")
|
||||
.setValue("configvalue1")
|
||||
.setVersion("1")
|
||||
.putAllMetadata(metadata)
|
||||
.build())
|
||||
.addItems(CommonProtos.ConfigurationItem.newBuilder()
|
||||
.setKey("configkey2")
|
||||
.setValue("configvalue2")
|
||||
.setVersion("1")
|
||||
.putAllMetadata(metadata)
|
||||
.build())
|
||||
.build();
|
||||
return responseEnvelope;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,69 @@
|
|||
/*
|
||||
* Copyright (c) Microsoft Corporation and Dapr Contributors.
|
||||
* Licensed under the MIT License.
|
||||
*/
|
||||
|
||||
package io.dapr.client;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.dataformat.xml.XmlMapper;
|
||||
import io.dapr.config.Properties;
|
||||
import io.dapr.exceptions.DaprException;
|
||||
import io.dapr.serializer.DaprObjectSerializer;
|
||||
import io.dapr.utils.TypeRef;
|
||||
import okhttp3.OkHttpClient;
|
||||
import okhttp3.mock.Behavior;
|
||||
import okhttp3.mock.MockInterceptor;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.ServerSocket;
|
||||
import java.net.Socket;
|
||||
|
||||
import static io.dapr.utils.TestUtils.findFreePort;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
public class DaprPreviewClientHttpTest {
|
||||
private static final String CONFIG_STORE_NAME = "MyConfigurationStore";
|
||||
|
||||
private DaprPreviewClient daprPreviewClientHttp;
|
||||
|
||||
private DaprHttp daprHttp;
|
||||
|
||||
private OkHttpClient okHttpClient;
|
||||
|
||||
private MockInterceptor mockInterceptor;
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
mockInterceptor = new MockInterceptor(Behavior.UNORDERED);
|
||||
okHttpClient = new OkHttpClient.Builder().addInterceptor(mockInterceptor).build();
|
||||
daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient);
|
||||
daprPreviewClientHttp = new DaprClientHttp(daprHttp);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getConfigurationWithSingleKey() {
|
||||
assertThrows(DaprException.class, () -> {
|
||||
daprPreviewClientHttp.getConfiguration(CONFIG_STORE_NAME, "key").block();
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getConfiguration() {
|
||||
assertThrows(DaprException.class, () -> {
|
||||
daprPreviewClientHttp.getConfiguration(CONFIG_STORE_NAME, "key1", "key2").block();
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void subscribeConfigurations() {
|
||||
assertThrows(DaprException.class, () -> {
|
||||
daprPreviewClientHttp.subscribeToConfiguration(CONFIG_STORE_NAME, "key1", "key2").blockFirst();
|
||||
});
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue