diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 4dd275ab7..620d5167a 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -25,7 +25,7 @@ jobs:
DAPR_RUNTIME_VER: 0.10.0-rc.0
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/e7c9a643dfefbcfff0c2c26c12029259e6e81180/install/install.sh
DAPR_CLI_REF: e7c9a643dfefbcfff0c2c26c12029259e6e81180
- DAPR_REF:
+ DAPR_REF: 1e3b7aa3202e268759bce6cf9b30b6d95471ab8c
OSSRH_USER_TOKEN: ${{ secrets.OSSRH_USER_TOKEN }}
OSSRH_PWD_TOKEN: ${{ secrets.OSSRH_PWD_TOKEN }}
GPG_KEY: ${{ secrets.GPG_KEY }}
diff --git a/pom.xml b/pom.xml
index 94bbd65e1..42f3e14d8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -17,7 +17,7 @@
1.25.0
3.11.0
3.10.0
- https://raw.githubusercontent.com/dapr/dapr/2f32759fb9798d79b6d3fa92b1d972b07a62b6e8/dapr/proto
+ https://raw.githubusercontent.com/dapr/dapr/1660773c3da6740377f4440cae89dea93479c9f5/dapr/proto
1.6.2
3.1.1
1.8
diff --git a/sdk-tests/pom.xml b/sdk-tests/pom.xml
index 3b51c1ba1..addf82c99 100644
--- a/sdk-tests/pom.xml
+++ b/sdk-tests/pom.xml
@@ -78,6 +78,12 @@
5.1.0
test
+
+ io.projectreactor
+ reactor-core
+ 3.3.1.RELEASE
+ test
+
diff --git a/sdk-tests/src/test/java/io/dapr/it/DaprRun.java b/sdk-tests/src/test/java/io/dapr/it/DaprRun.java
index 600972554..dd11c26b7 100644
--- a/sdk-tests/src/test/java/io/dapr/it/DaprRun.java
+++ b/sdk-tests/src/test/java/io/dapr/it/DaprRun.java
@@ -42,7 +42,7 @@ public class DaprRun {
Class serviceClass,
int maxWaitMilliseconds) {
// The app name needs to be deterministic since we depend on it to kill previous runs.
- this.appName = String.format("%s_%s", testName, serviceClass == null ? "" : serviceClass.getSimpleName());
+ this.appName = serviceClass == null ? testName : String.format("%s_%s", testName, serviceClass.getSimpleName());
this.startCommand =
new Command(successMessage, buildDaprCommand(this.appName, serviceClass, ports));
this.listCommand = new Command(
diff --git a/sdk-tests/src/test/java/io/dapr/it/state/AbstractStateClientIT.java b/sdk-tests/src/test/java/io/dapr/it/state/AbstractStateClientIT.java
new file mode 100644
index 000000000..cddf85156
--- /dev/null
+++ b/sdk-tests/src/test/java/io/dapr/it/state/AbstractStateClientIT.java
@@ -0,0 +1,490 @@
+/*
+ * Copyright (c) Microsoft Corporation.
+ * Licensed under the MIT License.
+ */
+
+package io.dapr.it.state;
+
+import io.dapr.client.DaprClient;
+import io.dapr.client.DaprClientBuilder;
+import io.dapr.client.domain.State;
+import io.dapr.client.domain.StateOptions;
+import io.dapr.it.BaseIT;
+import io.dapr.it.DaprRun;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import reactor.core.publisher.Mono;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Common test cases for Dapr client (GRPC and HTTP).
+ */
+public abstract class AbstractStateClientIT extends BaseIT {
+
+ @Test
+ public void saveAndGetState() {
+
+ //The key use to store the state
+ final String stateKey = "myKey";
+
+ //create the http client
+ DaprClient daprClient = buildDaprClient();
+
+ //creation of a dummy data
+ MyData data = new MyData();
+ data.setPropertyA("data in property A");
+ data.setPropertyB("data in property B");
+
+ //create of the deferred call to DAPR to store the state
+ Mono saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
+ //execute the save action
+ saveResponse.block();
+
+ //create of the deferred call to DAPR to get the state
+ Mono> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey), MyData.class);
+
+ //retrieve the state
+ State myDataResponse = response.block();
+
+ //Assert that the response is the correct one
+ Assert.assertNotNull(myDataResponse.getEtag());
+ Assert.assertNotNull(myDataResponse.getKey());
+ Assert.assertNotNull(myDataResponse.getValue());
+ Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
+ Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
+ }
+
+ @Test
+ public void saveAndGetBulkStates() {
+ final String stateKeyOne = UUID.randomUUID().toString();
+ final String stateKeyTwo = UUID.randomUUID().toString();
+ final String stateKeyThree = "NotFound";
+
+ DaprClient daprClient = buildDaprClient();
+
+ MyData data = new MyData();
+ data.setPropertyA("data in property A");
+ data.setPropertyB("data in property B");
+
+ //saves the states.
+ daprClient.saveState(STATE_STORE_NAME, stateKeyOne, "1", data, null).block();
+ daprClient.saveState(STATE_STORE_NAME, stateKeyTwo, null, null, null).block();
+
+ //retrieves states in bulk.
+ Mono>> response =
+ daprClient.getStates(STATE_STORE_NAME, Arrays.asList(stateKeyOne, stateKeyTwo, stateKeyThree), MyData.class);
+ List> result = response.block();
+
+ //Assert that the response is the correct one
+ assertEquals(3, result.size());
+ assertEquals(stateKeyOne, result.stream().findFirst().get().getKey());
+ assertEquals(data, result.stream().findFirst().get().getValue());
+ assertEquals("1", result.stream().findFirst().get().getEtag());
+ assertNull(result.stream().findFirst().get().getError());
+
+ assertEquals(stateKeyTwo, result.stream().skip(1).findFirst().get().getKey());
+ assertNull(result.stream().skip(1).findFirst().get().getValue());
+ assertEquals("1", result.stream().skip(1).findFirst().get().getEtag());
+ assertNull(result.stream().skip(1).findFirst().get().getError());
+
+ assertEquals(stateKeyThree, result.stream().skip(2).findFirst().get().getKey());
+ assertNull(result.stream().skip(2).findFirst().get().getValue());
+ assertEquals("", result.stream().skip(2).findFirst().get().getEtag());
+ assertNull("not found", result.stream().skip(2).findFirst().get().getError());
+ }
+
+ @Test
+ public void saveUpdateAndGetState() {
+
+ //The key use to store the state and be updated
+ final String stateKey = "keyToBeUpdated";
+
+ //create http DAPR client
+ DaprClient daprClient = buildDaprClient();
+ //Create dummy data to be store
+ MyData data = new MyData();
+ data.setPropertyA("data in property A");
+ data.setPropertyB("data in property B");
+
+ //Create deferred action to save the sate
+ Mono saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
+ //execute save action to DAPR
+ saveResponse.block();
+
+ //change data properties
+ data.setPropertyA("data in property A");
+ data.setPropertyB("data in property B2");
+ //create deferred action to update the sate without any etag or options
+ saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
+ //execute the update action to DAPR
+ saveResponse.block();
+
+ //Create deferred action to retrieve the action
+ Mono> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null),
+ MyData.class);
+ //execute the retrieve of the state
+ State myDataResponse = response.block();
+
+ //review that the update was success action
+ Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
+ Assert.assertEquals("data in property B2", myDataResponse.getValue().getPropertyB());
+ }
+
+ @Test
+ public void saveAndDeleteState() {
+ //The key use to store the state and be deleted
+ final String stateKey = "myeKeyToBeDeleted";
+
+ //create DAPR client
+ DaprClient daprClient = buildDaprClient();
+
+ //Create dummy data to be store
+ MyData data = new MyData();
+ data.setPropertyA("data in property A");
+ data.setPropertyB("data in property B");
+ //Create deferred action to save the sate
+ Mono saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
+ //execute the save state action
+ saveResponse.block();
+
+ //Create deferred action to retrieve the state
+ Mono> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null),
+ MyData.class);
+ //execute the retrieve of the state
+ State myDataResponse = response.block();
+
+ //review that the state was saved correctly
+ Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
+ Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
+
+ //create deferred action to delete the state
+ Mono deleteResponse = daprClient.deleteState(STATE_STORE_NAME, stateKey, null, null);
+ //execute the delete action
+ deleteResponse.block();
+
+ //Create deferred action to retrieve the state
+ response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), MyData.class);
+ //execute the retrieve of the state
+ myDataResponse = response.block();
+
+ //review that the action does not return any value, because the state was deleted
+ Assert.assertNull(myDataResponse.getValue());
+ }
+
+
+ @Test
+ public void saveUpdateAndGetStateWithEtag() {
+ //The key use to store the state and be updated using etags
+ final String stateKey = "keyToBeUpdatedWithEtag";
+ //create DAPR client
+ DaprClient daprClient = buildDaprClient();
+ //Create dummy data to be store
+ MyData data = new MyData();
+ data.setPropertyA("data in property A");
+ data.setPropertyB("data in property B");
+
+ //Create deferred action to save the sate
+ Mono saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
+ //execute the save state action
+ saveResponse.block();
+
+ //Create deferred action to retrieve the state
+ Mono> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null),
+ MyData.class);
+ //execute the action for retrieve the state and the etag
+ State myDataResponse = response.block();
+
+ //review that the etag is not empty
+ Assert.assertNotNull(myDataResponse.getEtag());
+ Assert.assertNotNull(myDataResponse.getKey());
+ Assert.assertNotNull(myDataResponse.getValue());
+ Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
+ Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
+
+ String firstETag = myDataResponse.getEtag();
+
+ //change the data in order to update the state
+ data.setPropertyA("data in property A2");
+ data.setPropertyB("data in property B2");
+ //Create deferred action to update the data using the correct etag
+ saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), data, null);
+ saveResponse.block();
+
+
+ response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), MyData.class);
+ //retrive the data wihout any etag
+ myDataResponse = response.block();
+
+ //review that state value changes
+ Assert.assertNotNull(myDataResponse.getEtag());
+ //review that the etag changes after an update
+ Assert.assertNotEquals(firstETag, myDataResponse.getEtag());
+ Assert.assertNotNull(myDataResponse.getKey());
+ Assert.assertNotNull(myDataResponse.getValue());
+ Assert.assertEquals("data in property A2", myDataResponse.getValue().getPropertyA());
+ Assert.assertEquals("data in property B2", myDataResponse.getValue().getPropertyB());
+ }
+
+
+ @Test(expected = RuntimeException.class)
+ public void saveUpdateAndGetStateWithWrongEtag() {
+ final String stateKey = "keyToBeUpdatedWithWrongEtag";
+
+ //create DAPR client
+ DaprClient daprClient = buildDaprClient();
+ //Create dummy data to be store
+ MyData data = new MyData();
+ data.setPropertyA("data in property A");
+ data.setPropertyB("data in property B");
+
+ //Create deferred action to save the sate
+ Mono saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
+ //execute the save state action
+ saveResponse.block();
+
+ //Create deferred action to retrieve the state
+ Mono> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null),
+ MyData.class);
+ //execute the action for retrieve the state and the etag
+ State myDataResponse = response.block();
+
+ //review that the etag is not empty
+ Assert.assertNotNull(myDataResponse.getEtag());
+ Assert.assertNotNull(myDataResponse.getKey());
+ Assert.assertNotNull(myDataResponse.getValue());
+ Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
+ Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
+
+ String firstETag = myDataResponse.getEtag();
+
+ //change the data in order to update the state
+ data.setPropertyA("data in property A2");
+ data.setPropertyB("data in property B2");
+ //Create deferred action to update the data using the incorrect etag
+ saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, "99999999999999", data, null);
+ saveResponse.block();
+
+
+ response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), MyData.class);
+ //retrive the data wihout any etag
+ myDataResponse = response.block();
+
+ //review that state value changes
+ Assert.assertNotNull(myDataResponse.getEtag());
+ //review that the etag changes after an update
+ Assert.assertNotEquals(firstETag, myDataResponse.getEtag());
+ Assert.assertNotNull(myDataResponse.getKey());
+ Assert.assertNotNull(myDataResponse.getValue());
+ Assert.assertEquals("data in property A2", myDataResponse.getValue().getPropertyA());
+ Assert.assertEquals("data in property B2", myDataResponse.getValue().getPropertyB());
+ }
+
+ @Test
+ public void saveAndDeleteStateWithEtag() {
+ final String stateKey = "myeKeyToBeDeletedWithEtag";
+ //create DAPR client
+ DaprClient daprClient = buildDaprClient();
+ //Create dummy data to be store
+ MyData data = new MyData();
+ data.setPropertyA("data in property A");
+ data.setPropertyB("data in property B");
+ //Create deferred action to save the sate
+ Mono saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
+ //execute the save state action
+ saveResponse.block();
+
+ //Create deferred action to get the state with the etag
+ Mono> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null),
+ MyData.class);
+ //execute the get state
+ State myDataResponse = response.block();
+
+ Assert.assertNotNull(myDataResponse.getEtag());
+ Assert.assertNotNull(myDataResponse.getKey());
+ Assert.assertNotNull(myDataResponse.getValue());
+ Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
+ Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
+
+ //Create deferred action to delete an state sending the etag
+ Mono deleteResponse = daprClient.deleteState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), null);
+ //execute the delete of the state
+ deleteResponse.block();
+
+ //Create deferred action to get the sate without an etag
+ response = daprClient.getState(STATE_STORE_NAME, new State(stateKey), MyData.class);
+ myDataResponse = response.block();
+
+ //Review that the response is null, because the state was deleted
+ Assert.assertNull(myDataResponse.getValue());
+ }
+
+
+ @Test(expected = RuntimeException.class)
+ public void saveAndDeleteStateWithWrongEtag() {
+ final String stateKey = "myeKeyToBeDeletedWithWrongEtag";
+
+ //create DAPR client
+ DaprClient daprClient = buildDaprClient();
+ //Create dummy data to be store
+ MyData data = new MyData();
+ data.setPropertyA("data in property A");
+ data.setPropertyB("data in property B");
+ //Create deferred action to save the sate
+ Mono saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
+ //execute the save state action
+ saveResponse.block();
+
+ //Create deferred action to get the state with the etag
+ Mono> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null),
+ MyData.class);
+ //execute the get state
+ State myDataResponse = response.block();
+
+ Assert.assertNotNull(myDataResponse.getEtag());
+ Assert.assertNotNull(myDataResponse.getKey());
+ Assert.assertNotNull(myDataResponse.getValue());
+ Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
+ Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
+
+ //Create deferred action to delete an state sending the incorrect etag
+ Mono deleteResponse = daprClient.deleteState(STATE_STORE_NAME, stateKey, "99999999999", null);
+ //execute the delete of the state, this should trhow an exception
+ deleteResponse.block();
+
+ //Create deferred action to get the sate without an etag
+ response = daprClient.getState(STATE_STORE_NAME, new State(stateKey), MyData.class);
+ myDataResponse = response.block();
+
+ //Review that the response is null, because the state was deleted
+ Assert.assertNull(myDataResponse.getValue());
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void saveUpdateAndGetStateWithEtagAndStateOptionsFirstWrite() {
+ final String stateKey = "keyToBeUpdatedWithEtagAndOptions";
+
+ //create option with concurrency with first writte and consistency of strong
+ StateOptions stateOptions = new StateOptions(StateOptions.Consistency.STRONG,
+ StateOptions.Concurrency.FIRST_WRITE);
+
+ //create dapr client
+ DaprClient daprClient = buildDaprClient();
+ //create Dummy data
+ MyData data = new MyData();
+ data.setPropertyA("data in property A");
+ data.setPropertyB("data in property B");
+
+ //create state using stateOptions
+ Mono saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, stateOptions);
+ //execute the save state
+ saveResponse.block();
+
+
+ //crate deferred action to retrieve the state
+ Mono> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, stateOptions),
+ MyData.class);
+ //execute the retrieve of the state using options
+ State myDataResponse = response.block();
+
+ Assert.assertNotNull(myDataResponse.getEtag());
+ Assert.assertNotNull(myDataResponse.getKey());
+ Assert.assertNotNull(myDataResponse.getValue());
+ Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
+ Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
+
+ //change data to be udpated
+ data.setPropertyA("data in property A2");
+ data.setPropertyB("data in property B2");
+ //create deferred action to update the action with options
+ saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), data, stateOptions);
+ //update the state
+ saveResponse.block();
+
+
+ data.setPropertyA("last write");
+ data.setPropertyB("data in property B2");
+ //create deferred action to update the action with the same etag
+ saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), data, stateOptions);
+ //throws an exception, the state was already udpated
+ saveResponse.block();
+
+ response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, stateOptions), MyData.class);
+ State myLastDataResponse = response.block();
+
+ Assert.assertNotNull(myLastDataResponse.getEtag());
+ Assert.assertNotNull(myLastDataResponse.getKey());
+ Assert.assertNotNull(myLastDataResponse.getValue());
+ Assert.assertNotNull(myDataResponse.getEtag(), myLastDataResponse.getEtag());
+ Assert.assertEquals("data in property A2", myLastDataResponse.getValue().getPropertyA());
+ Assert.assertEquals("data in property B2", myLastDataResponse.getValue().getPropertyB());
+ }
+
+ @Test()
+ public void saveUpdateAndGetStateWithEtagAndStateOptionsLastWrite() {
+ final String stateKey = "keyToBeUpdatedWithEtagAndOptions";
+
+ //create option with concurrency with first writte and consistency of strong
+ StateOptions stateOptions = new StateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.LAST_WRITE);
+
+ //create dapr client
+ DaprClient daprClient = buildDaprClient();
+ //create Dummy data
+ MyData data = new MyData();
+ data.setPropertyA("data in property A");
+ data.setPropertyB("data in property B");
+
+ //create state using stateOptions
+ Mono saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, stateOptions);
+ //execute the save state
+ saveResponse.block();
+
+
+ //crate deferred action to retrieve the state
+ Mono> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, stateOptions),
+ MyData.class);
+ //execute the retrieve of the state using options
+ State myDataResponse = response.block();
+
+ Assert.assertNotNull(myDataResponse.getEtag());
+ Assert.assertNotNull(myDataResponse.getKey());
+ Assert.assertNotNull(myDataResponse.getValue());
+ Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
+ Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
+
+ //change data to be udpated
+ data.setPropertyA("data in property A2");
+ data.setPropertyB("data in property B2");
+ //create deferred action to update the action with options
+ saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), data, stateOptions);
+ //update the state
+ saveResponse.block();
+
+
+ data.setPropertyA("last write");
+ data.setPropertyB("data in property B2");
+ //create deferred action to update the action with the same etag
+ saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), data, stateOptions);
+ //update the state without an error
+ saveResponse.block();
+
+ response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, stateOptions), MyData.class);
+ State myLastDataResponse = response.block();
+
+ Assert.assertNotNull(myLastDataResponse.getEtag());
+ Assert.assertNotNull(myLastDataResponse.getKey());
+ Assert.assertNotNull(myLastDataResponse.getValue());
+ Assert.assertNotNull(myDataResponse.getEtag(), myLastDataResponse.getEtag());
+ Assert.assertEquals("last write", myLastDataResponse.getValue().getPropertyA());
+ Assert.assertEquals("data in property B2", myLastDataResponse.getValue().getPropertyB());
+ }
+
+ protected abstract DaprClient buildDaprClient();
+
+}
diff --git a/sdk-tests/src/test/java/io/dapr/it/state/GRPCStateClientIT.java b/sdk-tests/src/test/java/io/dapr/it/state/GRPCStateClientIT.java
index c85bfcfb0..ec2c59220 100644
--- a/sdk-tests/src/test/java/io/dapr/it/state/GRPCStateClientIT.java
+++ b/sdk-tests/src/test/java/io/dapr/it/state/GRPCStateClientIT.java
@@ -8,28 +8,18 @@ package io.dapr.it.state;
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DaprClientGrpc;
-import io.dapr.client.domain.State;
-import io.dapr.client.domain.StateOptions;
-import io.dapr.it.BaseIT;
import io.dapr.it.DaprRun;
import org.junit.AfterClass;
import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
-import reactor.core.publisher.Mono;
import java.io.IOException;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
/**
* Test State GRPC DAPR capabilities using a DAPR instance with an empty service running
*/
-public class GRPCStateClientIT extends BaseIT {
+public class GRPCStateClientIT extends AbstractStateClientIT {
private static DaprRun daprRun;
@@ -49,419 +39,9 @@ public class GRPCStateClientIT extends BaseIT {
daprClient.close();
}
-
- @Test
- public void saveAndGetState() {
-
- //The key use to store the state
- final String stateKey = "myKey";
-
- //create the http client
-
-
- //creation of a dummy data
- MyData data = new MyData();
- data.setPropertyA("data in property A");
- data.setPropertyB("data in property B");
-
- //create of the deferred call to DAPR to store the state
- Mono saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
- //execute the save action
- saveResponse.block();
-
- //create of the deferred call to DAPR to get the state
- Mono> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), MyData.class);
-
- //retrieve the state
- State myDataResponse = response.block();
-
- //Assert that the response is the correct one
- assertNotNull(myDataResponse.getEtag());
- assertNotNull(myDataResponse.getKey());
- assertNotNull(myDataResponse.getValue());
- assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
- assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
- }
-
- @Test
- public void saveUpdateAndGetState() {
-
- //The key use to store the state and be updated
- final String stateKey = "keyToBeUpdated";
-
- //create http DAPR client
-
- //Create dummy data to be store
- MyData data = new MyData();
- data.setPropertyA("data in property A");
- data.setPropertyB("data in property B");
-
- //Create deferred action to save the sate
- Mono saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
- //execute save action to DAPR
- saveResponse.block();
-
- //change data properties
- data.setPropertyA("data in property A");
- data.setPropertyB("data in property B2");
- //create deferred action to update the sate without any etag or options
- saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
- //execute the update action to DAPR
- saveResponse.block();
-
- //Create deferred action to retrieve the action
- Mono> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null),
- MyData.class);
- //execute the retrieve of the state
- State myDataResponse = response.block();
-
- //review that the update was success action
- assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
- assertEquals("data in property B2", myDataResponse.getValue().getPropertyB());
- }
-
- @Test
- public void saveAndDeleteState() {
- //The key use to store the state and be deleted
- final String stateKey = "myeKeyToBeDeleted";
-
- //Create dummy data to be store
- MyData data = new MyData();
- data.setPropertyA("data in property A");
- data.setPropertyB("data in property B");
- //Create deferred action to save the sate
- Mono saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
- //execute the save state action
- saveResponse.block();
-
- //Create deferred action to retrieve the state
- Mono> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null),
- MyData.class);
- //execute the retrieve of the state
- State myDataResponse = response.block();
-
- //review that the state was saved correctly
- assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
- assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
-
- //create deferred action to delete the state
- Mono deleteResponse = daprClient.deleteState(STATE_STORE_NAME, stateKey, null, null);
- //execute the delete action
- deleteResponse.block();
-
- //Create deferred action to retrieve the state
- response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), MyData.class);
- //execute the retrieve of the state
- myDataResponse = response.block();
-
- //review that the action does not return any value, because the state was deleted
- assertNull(myDataResponse.getValue());
- }
-
-
- @Test
- public void saveUpdateAndGetStateWithEtag() {
- //The key use to store the state and be updated using etags
- final String stateKey = "keyToBeUpdatedWithEtag";
-
-
- //Create dummy data to be store
- MyData data = new MyData();
- data.setPropertyA("data in property A");
- data.setPropertyB("data in property B");
-
- //Create deferred action to save the sate
- Mono saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
- //execute the save state action
- saveResponse.block();
-
- //Create deferred action to retrieve the state
- Mono> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null),
- MyData.class);
- //execute the action for retrieve the state and the etag
- State myDataResponse = response.block();
-
- //review that the etag is not empty
- assertNotNull(myDataResponse.getEtag());
- assertNotNull(myDataResponse.getKey());
- assertNotNull(myDataResponse.getValue());
- assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
- assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
-
- String firstETag = myDataResponse.getEtag();
-
- //change the data in order to update the state
- data.setPropertyA("data in property A2");
- data.setPropertyB("data in property B2");
- //Create deferred action to update the data using the correct etag
- saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), data, null);
- saveResponse.block();
-
-
- response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), MyData.class);
- //retrive the data wihout any etag
- myDataResponse = response.block();
-
- //review that state value changes
- assertNotNull(myDataResponse.getEtag());
- //review that the etag changes after an update
- assertNotEquals(firstETag, myDataResponse.getEtag());
- assertNotNull(myDataResponse.getKey());
- assertNotNull(myDataResponse.getValue());
- assertEquals("data in property A2", myDataResponse.getValue().getPropertyA());
- assertEquals("data in property B2", myDataResponse.getValue().getPropertyB());
- }
-
- @Ignore("This test case is ignored because DAPR ignore the ETag is wrong when is sent from GRPC protocol, the " +
- "execution continues and the state is updated.")
- @Test(expected = RuntimeException.class)
- public void saveUpdateAndGetStateWithWrongEtag() {
- final String stateKey = "keyToBeUpdatedWithWrongEtag";
-
- //Create dummy data to be store
- MyData data = new MyData();
- data.setPropertyA("data in property A");
- data.setPropertyB("data in property B");
-
- //Create deferred action to save the sate
- Mono saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
- //execute the save state action
- saveResponse.block();
-
- //Create deferred action to retrieve the state
- Mono> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null),
- MyData.class);
- //execute the action for retrieve the state and the etag
- State myDataResponse = response.block();
-
- //review that the etag is not empty
- assertNotNull(myDataResponse.getEtag());
- assertNotNull(myDataResponse.getKey());
- assertNotNull(myDataResponse.getValue());
- assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
- assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
-
- String firstETag = myDataResponse.getEtag();
-
- //change the data in order to update the state
- data.setPropertyA("data in property A2");
- data.setPropertyB("data in property B2");
- //Create deferred action to update the data using the incorrect etag
- saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, "99999999999999", data, null);
- saveResponse.block();
-
-
- response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), MyData.class);
- //retrive the data wihout any etag
- myDataResponse = response.block();
-
- //review that state value changes
- assertNotNull(myDataResponse.getEtag());
- //review that the etag changes after an update
- assertNotEquals(firstETag, myDataResponse.getEtag());
- assertNotNull(myDataResponse.getKey());
- assertNotNull(myDataResponse.getValue());
- assertEquals("data in property A2", myDataResponse.getValue().getPropertyA());
- assertEquals("data in property B2", myDataResponse.getValue().getPropertyB());
- }
-
- @Test
- public void saveAndDeleteStateWithEtag() {
- final String stateKey = "myeKeyToBeDeletedWithEtag";
-
-
- //Create dummy data to be store
- MyData data = new MyData();
- data.setPropertyA("data in property A");
- data.setPropertyB("data in property B");
- //Create deferred action to save the sate
- Mono saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
- //execute the save state action
- saveResponse.block();
-
- //Create deferred action to get the state with the etag
- Mono> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null),
- MyData.class);
- //execute the get state
- State myDataResponse = response.block();
-
- assertNotNull(myDataResponse.getEtag());
- assertNotNull(myDataResponse.getKey());
- assertNotNull(myDataResponse.getValue());
- assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
- assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
-
- //Create deferred action to delete an state sending the etag
- Mono deleteResponse = daprClient.deleteState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), null);
- //execute the delete of the state
- deleteResponse.block();
-
- //Create deferred action to get the sate without an etag
- response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), MyData.class);
- myDataResponse = response.block();
-
- //Review that the response is null, because the state was deleted
- assertNull(myDataResponse.getValue());
- }
-
- @Ignore("This test case is ignored because DAPR ignore if the ETag is wrong when is sent from GRPC protocol, the " +
- "execution continues and the state is deleted.")
- @Test(expected = RuntimeException.class)
- public void saveAndDeleteStateWithWrongEtag() {
- final String stateKey = "myeKeyToBeDeletedWithWrongEtag";
-
-
- //Create dummy data to be store
- MyData data = new MyData();
- data.setPropertyA("data in property A");
- data.setPropertyB("data in property B");
- //Create deferred action to save the sate
- Mono saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
- //execute the save state action
- saveResponse.block();
-
- //Create deferred action to get the state with the etag
- Mono> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null),
- MyData.class);
- //execute the get state
- State myDataResponse = response.block();
-
- assertNotNull(myDataResponse.getEtag());
- assertNotNull(myDataResponse.getKey());
- assertNotNull(myDataResponse.getValue());
- assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
- assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
-
- //Create deferred action to delete an state sending the incorrect etag
- Mono deleteResponse = daprClient.deleteState(STATE_STORE_NAME, stateKey, "99999999999", null);
- //execute the delete of the state, this should trhow an exception
- deleteResponse.block();
-
- //Create deferred action to get the sate without an etag
- response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), MyData.class);
- myDataResponse = response.block();
-
- //Review that the response is null, because the state was deleted
- assertNull(myDataResponse.getValue());
- }
-
- @Ignore("This test case is ignored because it seems that DAPR using GRPC is ignoring the state options for " +
- "consistency and concurrency.")
- @Test(expected = RuntimeException.class)
- public void saveUpdateAndGetStateWithEtagAndStateOptionsFirstWrite() {
- final String stateKey = "keyToBeUpdatedWithEtagAndOptions";
-
- //create option with concurrency with first writte and consistency of strong
- StateOptions stateOptions = new StateOptions(StateOptions.Consistency.STRONG,
- StateOptions.Concurrency.FIRST_WRITE);
-
-
- //create Dummy data
- MyData data = new MyData();
- data.setPropertyA("data in property A");
- data.setPropertyB("data in property B");
-
- //create state using stateOptions
- Mono saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, stateOptions);
- //execute the save state
- saveResponse.block();
-
-
- //crate deferred action to retrieve the state
- Mono> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, stateOptions),
- MyData.class);
- //execute the retrieve of the state using options
- State myDataResponse = response.block();
-
- assertNotNull(myDataResponse.getEtag());
- assertNotNull(myDataResponse.getKey());
- assertNotNull(myDataResponse.getValue());
- assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
- assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
-
- //change data to be udpated
- data.setPropertyA("data in property A2");
- data.setPropertyB("data in property B2");
- //create deferred action to update the action with options
- saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), data, stateOptions);
- //update the state
- saveResponse.block();
-
-
- data.setPropertyA("last write");
- data.setPropertyB("data in property B2");
- //create deferred action to update the action with the same etag
- saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), data, stateOptions);
- //throws an exception, the state was already udpated
- saveResponse.block();
-
- response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, stateOptions), MyData.class);
- State myLastDataResponse = response.block();
-
- assertNotNull(myLastDataResponse.getEtag());
- assertNotNull(myLastDataResponse.getKey());
- assertNotNull(myLastDataResponse.getValue());
- assertNotNull(myDataResponse.getEtag(), myLastDataResponse.getEtag());
- assertEquals("data in property A2", myLastDataResponse.getValue().getPropertyA());
- assertEquals("data in property B2", myLastDataResponse.getValue().getPropertyB());
- }
-
- @Test()
- public void saveUpdateAndGetStateWithEtagAndStateOptionsLastWrite() {
- final String stateKey = "keyToBeUpdatedWithEtagAndOptions";
-
- //create option with concurrency with first writte and consistency of strong
- StateOptions stateOptions = new StateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.LAST_WRITE);
-
-
- //create Dummy data
- MyData data = new MyData();
- data.setPropertyA("data in property A");
- data.setPropertyB("data in property B");
-
- //create state using stateOptions
- Mono saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, stateOptions);
- //execute the save state
- saveResponse.block();
-
-
- //crate deferred action to retrieve the state
- Mono> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, stateOptions),
- MyData.class);
- //execute the retrieve of the state using options
- State myDataResponse = response.block();
-
- assertNotNull(myDataResponse.getEtag());
- assertNotNull(myDataResponse.getKey());
- assertNotNull(myDataResponse.getValue());
- assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
- assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
-
- //change data to be udpated
- data.setPropertyA("data in property A2");
- data.setPropertyB("data in property B2");
- //create deferred action to update the action with options
- saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), data, stateOptions);
- //update the state
- saveResponse.block();
-
-
- data.setPropertyA("last write");
- data.setPropertyB("data in property B2");
- //create deferred action to update the action with the same etag
- saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), data, stateOptions);
- //update the state without an error
- saveResponse.block();
-
- response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, stateOptions), MyData.class);
- State myLastDataResponse = response.block();
-
- assertNotNull(myLastDataResponse.getEtag());
- assertNotNull(myLastDataResponse.getKey());
- assertNotNull(myLastDataResponse.getValue());
- assertNotNull(myDataResponse.getEtag(), myLastDataResponse.getEtag());
- assertEquals("last write", myLastDataResponse.getValue().getPropertyA());
- assertEquals("data in property B2", myLastDataResponse.getValue().getPropertyB());
+ @Override
+ protected DaprClient buildDaprClient() {
+ return daprClient;
}
}
diff --git a/sdk-tests/src/test/java/io/dapr/it/state/HttpStateClientIT.java b/sdk-tests/src/test/java/io/dapr/it/state/HttpStateClientIT.java
index 3b49bf256..971d1de27 100644
--- a/sdk-tests/src/test/java/io/dapr/it/state/HttpStateClientIT.java
+++ b/sdk-tests/src/test/java/io/dapr/it/state/HttpStateClientIT.java
@@ -7,448 +7,41 @@ package io.dapr.it.state;
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
-import io.dapr.client.domain.State;
-import io.dapr.client.domain.StateOptions;
-import io.dapr.it.BaseIT;
+import io.dapr.client.DaprClientHttp;
import io.dapr.it.DaprRun;
-import org.junit.Assert;
+import org.junit.AfterClass;
import org.junit.BeforeClass;
-import org.junit.Test;
-import reactor.core.publisher.Mono;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertTrue;
/**
* Test State HTTP DAPR capabilities using a DAPR instance with an empty service running
*/
-public class HttpStateClientIT extends BaseIT {
+public class HttpStateClientIT extends AbstractStateClientIT {
private static DaprRun daprRun;
+ private static DaprClient daprClient;
+
@BeforeClass
public static void init() throws Exception {
- daprRun = startDaprApp(HttpStateClientIT.class.getSimpleName(), 1000);
+ daprRun = startDaprApp(HttpStateClientIT.class.getSimpleName(), 5000);
daprRun.switchToHTTP();
+ daprClient = new DaprClientBuilder().build();
+
+ assertTrue(daprClient instanceof DaprClientHttp);
}
- @Test
- public void saveAndGetState() {
-
- //The key use to store the state
- final String stateKey = "myKey";
-
- //create the http client
- DaprClient daprClient = buildDaprClient();
-
- //creation of a dummy data
- MyData data = new MyData();
- data.setPropertyA("data in property A");
- data.setPropertyB("data in property B");
-
- //create of the deferred call to DAPR to store the state
- Mono saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
- //execute the save action
- saveResponse.block();
-
- //create of the deferred call to DAPR to get the state
- Mono> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), MyData.class);
-
- //retrieve the state
- State myDataResponse = response.block();
-
- //Assert that the response is the correct one
- Assert.assertNotNull(myDataResponse.getEtag());
- Assert.assertNotNull(myDataResponse.getKey());
- Assert.assertNotNull(myDataResponse.getValue());
- Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
- Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
+ @AfterClass
+ public static void tearDown() throws IOException {
+ daprClient.close();
}
- @Test
- public void saveUpdateAndGetState() {
-
- //The key use to store the state and be updated
- final String stateKey = "keyToBeUpdated";
-
- //create http DAPR client
- DaprClient daprClient = buildDaprClient();
- //Create dummy data to be store
- MyData data = new MyData();
- data.setPropertyA("data in property A");
- data.setPropertyB("data in property B");
-
- //Create deferred action to save the sate
- Mono saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
- //execute save action to DAPR
- saveResponse.block();
-
- //change data properties
- data.setPropertyA("data in property A");
- data.setPropertyB("data in property B2");
- //create deferred action to update the sate without any etag or options
- saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
- //execute the update action to DAPR
- saveResponse.block();
-
- //Create deferred action to retrieve the action
- Mono> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null),
- MyData.class);
- //execute the retrieve of the state
- State myDataResponse = response.block();
-
- //review that the update was success action
- Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
- Assert.assertEquals("data in property B2", myDataResponse.getValue().getPropertyB());
- }
-
- @Test
- public void saveAndDeleteState() {
- //The key use to store the state and be deleted
- final String stateKey = "myeKeyToBeDeleted";
-
- //create DAPR client
- DaprClient daprClient = buildDaprClient();
-
- //Create dummy data to be store
- MyData data = new MyData();
- data.setPropertyA("data in property A");
- data.setPropertyB("data in property B");
- //Create deferred action to save the sate
- Mono saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
- //execute the save state action
- saveResponse.block();
-
- //Create deferred action to retrieve the state
- Mono> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null),
- MyData.class);
- //execute the retrieve of the state
- State myDataResponse = response.block();
-
- //review that the state was saved correctly
- Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
- Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
-
- //create deferred action to delete the state
- Mono deleteResponse = daprClient.deleteState(STATE_STORE_NAME, stateKey, null, null);
- //execute the delete action
- deleteResponse.block();
-
- //Create deferred action to retrieve the state
- response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), MyData.class);
- //execute the retrieve of the state
- myDataResponse = response.block();
-
- //review that the action does not return any value, because the state was deleted
- Assert.assertNull(myDataResponse.getValue());
- }
-
-
- @Test
- public void saveUpdateAndGetStateWithEtag() {
- //The key use to store the state and be updated using etags
- final String stateKey = "keyToBeUpdatedWithEtag";
- //create DAPR client
- DaprClient daprClient = buildDaprClient();
- //Create dummy data to be store
- MyData data = new MyData();
- data.setPropertyA("data in property A");
- data.setPropertyB("data in property B");
-
- //Create deferred action to save the sate
- Mono saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
- //execute the save state action
- saveResponse.block();
-
- //Create deferred action to retrieve the state
- Mono> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null),
- MyData.class);
- //execute the action for retrieve the state and the etag
- State myDataResponse = response.block();
-
- //review that the etag is not empty
- Assert.assertNotNull(myDataResponse.getEtag());
- Assert.assertNotNull(myDataResponse.getKey());
- Assert.assertNotNull(myDataResponse.getValue());
- Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
- Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
-
- String firstETag = myDataResponse.getEtag();
-
- //change the data in order to update the state
- data.setPropertyA("data in property A2");
- data.setPropertyB("data in property B2");
- //Create deferred action to update the data using the correct etag
- saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), data, null);
- saveResponse.block();
-
-
- response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), MyData.class);
- //retrive the data wihout any etag
- myDataResponse = response.block();
-
- //review that state value changes
- Assert.assertNotNull(myDataResponse.getEtag());
- //review that the etag changes after an update
- Assert.assertNotEquals(firstETag, myDataResponse.getEtag());
- Assert.assertNotNull(myDataResponse.getKey());
- Assert.assertNotNull(myDataResponse.getValue());
- Assert.assertEquals("data in property A2", myDataResponse.getValue().getPropertyA());
- Assert.assertEquals("data in property B2", myDataResponse.getValue().getPropertyB());
- }
-
-
- @Test(expected = RuntimeException.class)
- public void saveUpdateAndGetStateWithWrongEtag() {
- final String stateKey = "keyToBeUpdatedWithWrongEtag";
-
- //create DAPR client
- DaprClient daprClient = buildDaprClient();
- //Create dummy data to be store
- MyData data = new MyData();
- data.setPropertyA("data in property A");
- data.setPropertyB("data in property B");
-
- //Create deferred action to save the sate
- Mono saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
- //execute the save state action
- saveResponse.block();
-
- //Create deferred action to retrieve the state
- Mono> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null),
- MyData.class);
- //execute the action for retrieve the state and the etag
- State myDataResponse = response.block();
-
- //review that the etag is not empty
- Assert.assertNotNull(myDataResponse.getEtag());
- Assert.assertNotNull(myDataResponse.getKey());
- Assert.assertNotNull(myDataResponse.getValue());
- Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
- Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
-
- String firstETag = myDataResponse.getEtag();
-
- //change the data in order to update the state
- data.setPropertyA("data in property A2");
- data.setPropertyB("data in property B2");
- //Create deferred action to update the data using the incorrect etag
- saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, "99999999999999", data, null);
- saveResponse.block();
-
-
- response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), MyData.class);
- //retrive the data wihout any etag
- myDataResponse = response.block();
-
- //review that state value changes
- Assert.assertNotNull(myDataResponse.getEtag());
- //review that the etag changes after an update
- Assert.assertNotEquals(firstETag, myDataResponse.getEtag());
- Assert.assertNotNull(myDataResponse.getKey());
- Assert.assertNotNull(myDataResponse.getValue());
- Assert.assertEquals("data in property A2", myDataResponse.getValue().getPropertyA());
- Assert.assertEquals("data in property B2", myDataResponse.getValue().getPropertyB());
- }
-
- @Test
- public void saveAndDeleteStateWithEtag() {
- final String stateKey = "myeKeyToBeDeletedWithEtag";
- //create DAPR client
- DaprClient daprClient = buildDaprClient();
- //Create dummy data to be store
- MyData data = new MyData();
- data.setPropertyA("data in property A");
- data.setPropertyB("data in property B");
- //Create deferred action to save the sate
- Mono saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
- //execute the save state action
- saveResponse.block();
-
- //Create deferred action to get the state with the etag
- Mono> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null),
- MyData.class);
- //execute the get state
- State myDataResponse = response.block();
-
- Assert.assertNotNull(myDataResponse.getEtag());
- Assert.assertNotNull(myDataResponse.getKey());
- Assert.assertNotNull(myDataResponse.getValue());
- Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
- Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
-
- //Create deferred action to delete an state sending the etag
- Mono deleteResponse = daprClient.deleteState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), null);
- //execute the delete of the state
- deleteResponse.block();
-
- //Create deferred action to get the sate without an etag
- response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), MyData.class);
- myDataResponse = response.block();
-
- //Review that the response is null, because the state was deleted
- Assert.assertNull(myDataResponse.getValue());
- }
-
-
- @Test(expected = RuntimeException.class)
- public void saveAndDeleteStateWithWrongEtag() {
- final String stateKey = "myeKeyToBeDeletedWithWrongEtag";
-
- //create DAPR client
- DaprClient daprClient = buildDaprClient();
- //Create dummy data to be store
- MyData data = new MyData();
- data.setPropertyA("data in property A");
- data.setPropertyB("data in property B");
- //Create deferred action to save the sate
- Mono saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
- //execute the save state action
- saveResponse.block();
-
- //Create deferred action to get the state with the etag
- Mono> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null),
- MyData.class);
- //execute the get state
- State myDataResponse = response.block();
-
- Assert.assertNotNull(myDataResponse.getEtag());
- Assert.assertNotNull(myDataResponse.getKey());
- Assert.assertNotNull(myDataResponse.getValue());
- Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
- Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
-
- //Create deferred action to delete an state sending the incorrect etag
- Mono deleteResponse = daprClient.deleteState(STATE_STORE_NAME, stateKey, "99999999999", null);
- //execute the delete of the state, this should trhow an exception
- deleteResponse.block();
-
- //Create deferred action to get the sate without an etag
- response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), MyData.class);
- myDataResponse = response.block();
-
- //Review that the response is null, because the state was deleted
- Assert.assertNull(myDataResponse.getValue());
- }
-
- @Test(expected = RuntimeException.class)
- public void saveUpdateAndGetStateWithEtagAndStateOptionsFirstWrite() {
- final String stateKey = "keyToBeUpdatedWithEtagAndOptions";
-
- //create option with concurrency with first writte and consistency of strong
- StateOptions stateOptions = new StateOptions(StateOptions.Consistency.STRONG,
- StateOptions.Concurrency.FIRST_WRITE);
-
- //create dapr client
- DaprClient daprClient = buildDaprClient();
- //create Dummy data
- MyData data = new MyData();
- data.setPropertyA("data in property A");
- data.setPropertyB("data in property B");
-
- //create state using stateOptions
- Mono saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, stateOptions);
- //execute the save state
- saveResponse.block();
-
-
- //crate deferred action to retrieve the state
- Mono> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, stateOptions),
- MyData.class);
- //execute the retrieve of the state using options
- State myDataResponse = response.block();
-
- Assert.assertNotNull(myDataResponse.getEtag());
- Assert.assertNotNull(myDataResponse.getKey());
- Assert.assertNotNull(myDataResponse.getValue());
- Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
- Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
-
- //change data to be udpated
- data.setPropertyA("data in property A2");
- data.setPropertyB("data in property B2");
- //create deferred action to update the action with options
- saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), data, stateOptions);
- //update the state
- saveResponse.block();
-
-
- data.setPropertyA("last write");
- data.setPropertyB("data in property B2");
- //create deferred action to update the action with the same etag
- saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), data, stateOptions);
- //throws an exception, the state was already udpated
- saveResponse.block();
-
- response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, stateOptions), MyData.class);
- State myLastDataResponse = response.block();
-
- Assert.assertNotNull(myLastDataResponse.getEtag());
- Assert.assertNotNull(myLastDataResponse.getKey());
- Assert.assertNotNull(myLastDataResponse.getValue());
- Assert.assertNotNull(myDataResponse.getEtag(), myLastDataResponse.getEtag());
- Assert.assertEquals("data in property A2", myLastDataResponse.getValue().getPropertyA());
- Assert.assertEquals("data in property B2", myLastDataResponse.getValue().getPropertyB());
- }
-
- @Test()
- public void saveUpdateAndGetStateWithEtagAndStateOptionsLastWrite() {
- final String stateKey = "keyToBeUpdatedWithEtagAndOptions";
-
- //create option with concurrency with first writte and consistency of strong
- StateOptions stateOptions = new StateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.LAST_WRITE);
-
- //create dapr client
- DaprClient daprClient = buildDaprClient();
- //create Dummy data
- MyData data = new MyData();
- data.setPropertyA("data in property A");
- data.setPropertyB("data in property B");
-
- //create state using stateOptions
- Mono saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, stateOptions);
- //execute the save state
- saveResponse.block();
-
-
- //crate deferred action to retrieve the state
- Mono> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, stateOptions),
- MyData.class);
- //execute the retrieve of the state using options
- State myDataResponse = response.block();
-
- Assert.assertNotNull(myDataResponse.getEtag());
- Assert.assertNotNull(myDataResponse.getKey());
- Assert.assertNotNull(myDataResponse.getValue());
- Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
- Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
-
- //change data to be udpated
- data.setPropertyA("data in property A2");
- data.setPropertyB("data in property B2");
- //create deferred action to update the action with options
- saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), data, stateOptions);
- //update the state
- saveResponse.block();
-
-
- data.setPropertyA("last write");
- data.setPropertyB("data in property B2");
- //create deferred action to update the action with the same etag
- saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), data, stateOptions);
- //update the state without an error
- saveResponse.block();
-
- response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, stateOptions), MyData.class);
- State myLastDataResponse = response.block();
-
- Assert.assertNotNull(myLastDataResponse.getEtag());
- Assert.assertNotNull(myLastDataResponse.getKey());
- Assert.assertNotNull(myLastDataResponse.getValue());
- Assert.assertNotNull(myDataResponse.getEtag(), myLastDataResponse.getEtag());
- Assert.assertEquals("last write", myLastDataResponse.getValue().getPropertyA());
- Assert.assertEquals("data in property B2", myLastDataResponse.getValue().getPropertyB());
- }
-
- private static DaprClient buildDaprClient() {
- return new DaprClientBuilder().build();
+ @Override
+ protected DaprClient buildDaprClient() {
+ return daprClient;
}
}
diff --git a/sdk-tests/src/test/java/io/dapr/it/state/MyData.java b/sdk-tests/src/test/java/io/dapr/it/state/MyData.java
index e99a2e2eb..a638f5a52 100644
--- a/sdk-tests/src/test/java/io/dapr/it/state/MyData.java
+++ b/sdk-tests/src/test/java/io/dapr/it/state/MyData.java
@@ -5,6 +5,8 @@
package io.dapr.it.state;
+import java.util.Objects;
+
public class MyData {
/// Gets or sets the value for PropertyA.
@@ -13,8 +15,6 @@ public class MyData {
/// Gets or sets the value for PropertyB.
private String propertyB;
- private MyData myData;
-
public String getPropertyB() {
return propertyB;
}
@@ -39,11 +39,17 @@ public class MyData {
'}';
}
- public MyData getMyData() {
- return myData;
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ MyData myData = (MyData) o;
+ return Objects.equals(propertyA, myData.propertyA) &&
+ Objects.equals(propertyB, myData.propertyB);
}
- public void setMyData(MyData myData) {
- this.myData = myData;
+ @Override
+ public int hashCode() {
+ return Objects.hash(propertyA, propertyB);
}
}
diff --git a/sdk/src/main/java/io/dapr/client/AbstractDaprClient.java b/sdk/src/main/java/io/dapr/client/AbstractDaprClient.java
index 39ed6f202..15878368b 100644
--- a/sdk/src/main/java/io/dapr/client/AbstractDaprClient.java
+++ b/sdk/src/main/java/io/dapr/client/AbstractDaprClient.java
@@ -13,6 +13,7 @@ import io.dapr.client.domain.GetSecretRequest;
import io.dapr.client.domain.GetSecretRequestBuilder;
import io.dapr.client.domain.GetStateRequest;
import io.dapr.client.domain.GetStateRequestBuilder;
+import io.dapr.client.domain.GetStatesRequestBuilder;
import io.dapr.client.domain.HttpExtension;
import io.dapr.client.domain.InvokeBindingRequest;
import io.dapr.client.domain.InvokeBindingRequestBuilder;
@@ -302,17 +303,20 @@ abstract class AbstractDaprClient implements DaprClient {
return this.getState(stateStoreName, key, etag, options, TypeRef.get(clazz));
}
- private State buildStateKeyValue(
- DaprProtos.GetStateResponse response,
- String requestedKey,
- StateOptions stateOptions,
- TypeRef type) throws IOException {
- ByteString payload = response.getData();
- byte[] data = payload == null ? null : payload.toByteArray();
- T value = stateSerializer.deserialize(data, type);
- String etag = response.getEtag();
- String key = requestedKey;
- return new State<>(value, key, etag, stateOptions);
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Mono>> getStates(String stateStoreName, List keys, TypeRef type) {
+ return this.getStates(new GetStatesRequestBuilder(stateStoreName, keys).build(), type).map(r -> r.getObject());
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Mono>> getStates(String stateStoreName, List keys, Class clazz) {
+ return this.getStates(stateStoreName, keys, TypeRef.get(clazz));
}
/**
diff --git a/sdk/src/main/java/io/dapr/client/DaprClient.java b/sdk/src/main/java/io/dapr/client/DaprClient.java
index 504e86db3..6cf9dcb78 100644
--- a/sdk/src/main/java/io/dapr/client/DaprClient.java
+++ b/sdk/src/main/java/io/dapr/client/DaprClient.java
@@ -8,6 +8,7 @@ package io.dapr.client;
import io.dapr.client.domain.DeleteStateRequest;
import io.dapr.client.domain.GetSecretRequest;
import io.dapr.client.domain.GetStateRequest;
+import io.dapr.client.domain.GetStatesRequest;
import io.dapr.client.domain.HttpExtension;
import io.dapr.client.domain.InvokeBindingRequest;
import io.dapr.client.domain.InvokeServiceRequest;
@@ -349,6 +350,19 @@ public interface DaprClient extends Closeable {
*/
Mono> getState(String stateStoreName, String key, String etag, StateOptions options, TypeRef type);
+ /**
+ * Retrieve a State based on their key.
+ *
+ * @param stateStoreName The name of the state store.
+ * @param key The key of the State to be retrieved.
+ * @param etag Optional etag for conditional get
+ * @param options Optional settings for retrieve operation.
+ * @param clazz The Type of State needed as return.
+ * @param The Type of the return.
+ * @return A Mono Plan for the requested State.
+ */
+ Mono> getState(String stateStoreName, String key, String etag, StateOptions options, Class clazz);
+
/**
* Retrieve a State based on their key.
*
@@ -360,17 +374,36 @@ public interface DaprClient extends Closeable {
Mono>> getState(GetStateRequest request, TypeRef type);
/**
- * Retrieve a State based on their key.
+ * Retrieve bulk States based on their keys.
*
* @param stateStoreName The name of the state store.
- * @param key The key of the State to be retrieved.
- * @param etag Optional etag for conditional get
- * @param options Optional settings for retrieve operation.
+ * @param keys The keys of the State to be retrieved.
+ * @param type The type of State needed as return.
+ * @param The type of the return.
+ * @return A Mono Plan for the requested State.
+ */
+ Mono>> getStates(String stateStoreName, List keys, TypeRef type);
+
+ /**
+ * Retrieve bulk States based on their keys.
+ *
+ * @param stateStoreName The name of the state store.
+ * @param keys The keys of the State to be retrieved.
* @param clazz The type of State needed as return.
* @param The type of the return.
* @return A Mono Plan for the requested State.
*/
- Mono> getState(String stateStoreName, String key, String etag, StateOptions options, Class clazz);
+ Mono>> getStates(String stateStoreName, List keys, Class clazz);
+
+ /**
+ * Retrieve bulk States based on their keys.
+ *
+ * @param request The request to get state.
+ * @param type The Type of State needed as return.
+ * @param The Type of the return.
+ * @return A Mono Plan for the requested State.
+ */
+ Mono>>> getStates(GetStatesRequest request, TypeRef type);
/**
* Save/Update a list of states.
diff --git a/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java b/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java
index 39ecd46ad..606d77b36 100644
--- a/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java
+++ b/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java
@@ -5,6 +5,7 @@
package io.dapr.client;
+import com.google.common.base.Strings;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
@@ -12,6 +13,7 @@ import com.google.protobuf.Empty;
import io.dapr.client.domain.DeleteStateRequest;
import io.dapr.client.domain.GetSecretRequest;
import io.dapr.client.domain.GetStateRequest;
+import io.dapr.client.domain.GetStatesRequest;
import io.dapr.client.domain.HttpExtension;
import io.dapr.client.domain.InvokeBindingRequest;
import io.dapr.client.domain.InvokeServiceRequest;
@@ -52,6 +54,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
/**
* An adapter for the GRPC Client.
@@ -253,6 +256,74 @@ public class DaprClientGrpc extends AbstractDaprClient {
}
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Mono>>> getStates(GetStatesRequest request, TypeRef type) {
+ try {
+ final String stateStoreName = request.getStateStoreName();
+ final List keys = request.getKeys();
+ final int parallelism = request.getParallelism();
+ final Context context = request.getContext();
+ if ((stateStoreName == null) || (stateStoreName.trim().isEmpty())) {
+ throw new IllegalArgumentException("State store name cannot be null or empty.");
+ }
+ if (keys == null || keys.isEmpty()) {
+ throw new IllegalArgumentException("Key cannot be null or empty.");
+ }
+
+ if (parallelism < 0) {
+ throw new IllegalArgumentException("Parallelism cannot be negative.");
+ }
+ DaprProtos.GetBulkStateRequest.Builder builder = DaprProtos.GetBulkStateRequest.newBuilder()
+ .setStoreName(stateStoreName)
+ .addAllKeys(keys)
+ .setParallelism(parallelism);
+
+ DaprProtos.GetBulkStateRequest envelope = builder.build();
+ return Mono.fromCallable(wrap(context, () -> {
+ ListenableFuture futureResponse = client.getBulkState(envelope);
+ DaprProtos.GetBulkStateResponse response = null;
+ try {
+ response = futureResponse.get();
+ } catch (NullPointerException npe) {
+ return null;
+ }
+
+ return response
+ .getItemsList()
+ .stream()
+ .map(b -> {
+ try {
+ return buildStateKeyValue(b, type);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .collect(Collectors.toList());
+ })).map(s -> new Response(context, s));
+ } catch (Exception ex) {
+ return Mono.error(ex);
+ }
+ }
+
+ private State buildStateKeyValue(
+ DaprProtos.BulkStateItem item,
+ TypeRef type) throws IOException {
+ String key = item.getKey();
+ String error = item.getError();
+ if (!Strings.isNullOrEmpty(error)) {
+ return new State<>(key, error);
+ }
+
+ ByteString payload = item.getData();
+ byte[] data = payload == null ? null : payload.toByteArray();
+ T value = stateSerializer.deserialize(data, type);
+ String etag = item.getEtag();
+ return new State<>(value, key, etag);
+ }
+
private State buildStateKeyValue(
DaprProtos.GetStateResponse response,
String requestedKey,
@@ -300,13 +371,13 @@ public class DaprClientGrpc extends AbstractDaprClient {
private CommonProtos.StateItem.Builder buildStateRequest(State state) throws IOException {
byte[] bytes = stateSerializer.serialize(state.getValue());
- ByteString data = ByteString.copyFrom(bytes);
+
CommonProtos.StateItem.Builder stateBuilder = CommonProtos.StateItem.newBuilder();
if (state.getEtag() != null) {
stateBuilder.setEtag(state.getEtag());
}
- if (data != null) {
- stateBuilder.setValue(data);
+ if (bytes != null) {
+ stateBuilder.setValue(ByteString.copyFrom(bytes));
}
stateBuilder.setKey(state.getKey());
CommonProtos.StateOptions.Builder optionBuilder = null;
@@ -347,8 +418,6 @@ public class DaprClientGrpc extends AbstractDaprClient {
CommonProtos.StateOptions.Builder optionBuilder = null;
if (options != null) {
- optionBuilder = CommonProtos.StateOptions.newBuilder();
-
optionBuilder = CommonProtos.StateOptions.newBuilder();
if (options.getConcurrency() != null) {
optionBuilder.setConcurrency(getGrpcStateConcurrency(options));
diff --git a/sdk/src/main/java/io/dapr/client/DaprClientHttp.java b/sdk/src/main/java/io/dapr/client/DaprClientHttp.java
index 2c0cbc5a0..23f42d1b7 100644
--- a/sdk/src/main/java/io/dapr/client/DaprClientHttp.java
+++ b/sdk/src/main/java/io/dapr/client/DaprClientHttp.java
@@ -5,9 +5,12 @@
package io.dapr.client;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.base.Strings;
import io.dapr.client.domain.DeleteStateRequest;
import io.dapr.client.domain.GetSecretRequest;
import io.dapr.client.domain.GetStateRequest;
+import io.dapr.client.domain.GetStatesRequest;
import io.dapr.client.domain.HttpExtension;
import io.dapr.client.domain.InvokeBindingRequest;
import io.dapr.client.domain.InvokeServiceRequest;
@@ -16,6 +19,7 @@ import io.dapr.client.domain.Response;
import io.dapr.client.domain.SaveStateRequest;
import io.dapr.client.domain.State;
import io.dapr.client.domain.StateOptions;
+import io.dapr.config.Properties;
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import io.dapr.utils.TypeRef;
@@ -26,6 +30,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -74,6 +79,11 @@ public class DaprClientHttp extends AbstractDaprClient {
*/
public static final String SECRETS_PATH = DaprHttp.API_VERSION + "/secrets";
+ /**
+ * State Path format for bulk state API.
+ */
+ public static final String STATE_BULK_PATH_FORMAT = STATE_PATH + "/%s/bulk";
+
/**
* The HTTP client to be used.
*
@@ -256,6 +266,50 @@ public class DaprClientHttp extends AbstractDaprClient {
return Mono.error(ex);
}
}
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Mono>>> getStates(GetStatesRequest request, TypeRef type) {
+ try {
+ final String stateStoreName = request.getStateStoreName();
+ final List keys = request.getKeys();
+ final int parallelism = request.getParallelism();
+ final Context context = request.getContext();
+ if ((stateStoreName == null) || (stateStoreName.trim().isEmpty())) {
+ throw new IllegalArgumentException("State store name cannot be null or empty.");
+ }
+ if (keys == null || keys.isEmpty()) {
+ throw new IllegalArgumentException("Key cannot be null or empty.");
+ }
+
+ if (parallelism < 0) {
+ throw new IllegalArgumentException("Parallelism cannot be negative.");
+ }
+
+ String url = String.format(STATE_BULK_PATH_FORMAT, stateStoreName);
+ Map jsonMap = new HashMap<>();
+ jsonMap.put("keys", keys);
+ jsonMap.put("parallelism", parallelism);
+
+ byte[] requestBody = INTERNAL_SERIALIZER.serialize(jsonMap);
+ return this.client
+ .invokeApi(DaprHttp.HttpMethods.POST.name(), url, null, requestBody, null, context)
+ .flatMap(s -> {
+ try {
+ return Mono.just(buildStates(s, type));
+ } catch (Exception ex) {
+ return Mono.error(ex);
+ }
+ })
+ .map(r -> new Response<>(context, r));
+
+ } catch (Exception ex) {
+ return Mono.error(ex);
+ }
+ }
+
/**
* {@inheritDoc}
@@ -295,7 +349,7 @@ public class DaprClientHttp extends AbstractDaprClient {
.invokeApi(DaprHttp.HttpMethods.GET.name(), url.toString(), urlParameters, headers, context)
.flatMap(s -> {
try {
- return Mono.just(buildStateKeyValue(s, key, options, type));
+ return Mono.just(buildState(s, key, options, type));
} catch (Exception ex) {
return Mono.error(ex);
}
@@ -399,10 +453,10 @@ public class DaprClientHttp extends AbstractDaprClient {
* @param requestedKey The Key Requested.
* @param type The Class of the Value of the state
* @param The Type of the Value of the state
- * @return A StateKeyValue instance
- * @throws IOException If there's a issue deserialzing the response.
+ * @return A State instance
+ * @throws IOException If there's a issue deserializing the response.
*/
- private State buildStateKeyValue(
+ private State buildState(
DaprHttp.Response response, String requestedKey, StateOptions stateOptions, TypeRef type) throws IOException {
// The state is in the body directly, so we use the state serializer here.
T value = stateSerializer.deserialize(response.getBody(), type);
@@ -414,6 +468,39 @@ public class DaprClientHttp extends AbstractDaprClient {
return new State<>(value, key, etag, stateOptions);
}
+ /**
+ * Builds a State object based on the Response.
+ *
+ * @param response The response of the HTTP Call
+ * @param type The Class of the Value of the state
+ * @param The Type of the Value of the state
+ * @return A list of states.
+ * @throws IOException If there's a issue deserializing the response.
+ */
+ private List> buildStates(
+ DaprHttp.Response response, TypeRef type) throws IOException {
+ JsonNode root = INTERNAL_SERIALIZER.parseNode(response.getBody());
+ List> result = new ArrayList<>();
+ for (Iterator it = root.elements(); it.hasNext(); ) {
+ JsonNode node = it.next();
+ String key = node.path("key").asText();
+ String error = node.path("error").asText();
+ if (!Strings.isNullOrEmpty(error)) {
+ result.add(new State<>(key, error));
+ continue;
+ }
+
+ String etag = node.path("etag").asText();
+ // TODO(artursouza): JSON cannot differentiate if data returned is String or byte[], it is ambiguous.
+ // This is not a high priority since GRPC is the default (and recommended) client implementation.
+ byte[] data = node.path("data").toString().getBytes(Properties.STRING_CHARSET.get());
+ T value = stateSerializer.deserialize(data, type);
+ result.add(new State<>(value, key, etag));
+ }
+
+ return result;
+ }
+
/**
* {@inheritDoc}
*/
diff --git a/sdk/src/main/java/io/dapr/client/ObjectSerializer.java b/sdk/src/main/java/io/dapr/client/ObjectSerializer.java
index b51838f04..7a8df82fc 100644
--- a/sdk/src/main/java/io/dapr/client/ObjectSerializer.java
+++ b/sdk/src/main/java/io/dapr/client/ObjectSerializer.java
@@ -8,6 +8,7 @@ package io.dapr.client;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.client.domain.CloudEvent;
import io.dapr.utils.TypeRef;
@@ -112,6 +113,17 @@ public class ObjectSerializer {
return OBJECT_MAPPER.readValue(content, javaType);
}
+ /**
+ * Parses the JSON content into a node for fine-grained processing.
+ *
+ * @param content JSON content.
+ * @return JsonNode.
+ * @throws IOException In case content cannot be parsed.
+ */
+ public JsonNode parseNode(byte[] content) throws IOException {
+ return OBJECT_MAPPER.readTree(content);
+ }
+
/**
* Parses a given String to the corresponding object defined by class.
*
diff --git a/sdk/src/main/java/io/dapr/client/domain/GetStateRequestBuilder.java b/sdk/src/main/java/io/dapr/client/domain/GetStateRequestBuilder.java
index efe5e4bb3..5a3eb34b2 100644
--- a/sdk/src/main/java/io/dapr/client/domain/GetStateRequestBuilder.java
+++ b/sdk/src/main/java/io/dapr/client/domain/GetStateRequestBuilder.java
@@ -10,7 +10,7 @@ import java.util.Collections;
import java.util.Map;
/**
- * Builds a request to publish an event.
+ * Builds a request to request state.
*/
public class GetStateRequestBuilder {
diff --git a/sdk/src/main/java/io/dapr/client/domain/GetStatesRequest.java b/sdk/src/main/java/io/dapr/client/domain/GetStatesRequest.java
new file mode 100644
index 000000000..ca90eed2a
--- /dev/null
+++ b/sdk/src/main/java/io/dapr/client/domain/GetStatesRequest.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright (c) Microsoft Corporation.
+ * Licensed under the MIT License.
+ */
+
+package io.dapr.client.domain;
+
+import io.grpc.Context;
+
+import java.util.List;
+
+/**
+ * A request to get bulk state by keys.
+ */
+public class GetStatesRequest {
+
+ private String stateStoreName;
+
+ private List keys;
+
+ private int parallelism;
+
+ private Context context;
+
+ public String getStateStoreName() {
+ return stateStoreName;
+ }
+
+ void setStateStoreName(String stateStoreName) {
+ this.stateStoreName = stateStoreName;
+ }
+
+ public List getKeys() {
+ return keys;
+ }
+
+ void setKeys(List keys) {
+ this.keys = keys;
+ }
+
+ public int getParallelism() {
+ return parallelism;
+ }
+
+ void setParallelism(int parallelism) {
+ this.parallelism = parallelism;
+ }
+
+ public Context getContext() {
+ return context;
+ }
+
+ void setContext(Context context) {
+ this.context = context;
+ }
+}
diff --git a/sdk/src/main/java/io/dapr/client/domain/GetStatesRequestBuilder.java b/sdk/src/main/java/io/dapr/client/domain/GetStatesRequestBuilder.java
new file mode 100644
index 000000000..9152416f1
--- /dev/null
+++ b/sdk/src/main/java/io/dapr/client/domain/GetStatesRequestBuilder.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright (c) Microsoft Corporation.
+ * Licensed under the MIT License.
+ */
+
+package io.dapr.client.domain;
+
+import io.grpc.Context;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Builds a request to request states.
+ */
+public class GetStatesRequestBuilder {
+
+ private final String stateStoreName;
+
+ private final List keys;
+
+ private int parallelism = 1;
+
+ private Context context;
+
+ public GetStatesRequestBuilder(String stateStoreName, List keys) {
+ this.stateStoreName = stateStoreName;
+ this.keys = Collections.unmodifiableList(keys);
+ }
+
+ public GetStatesRequestBuilder(String stateStoreName, String... keys) {
+ this.stateStoreName = stateStoreName;
+ this.keys = Collections.unmodifiableList(Arrays.asList(keys));
+ }
+
+ public GetStatesRequestBuilder withParallelism(int parallelism) {
+ this.parallelism = parallelism;
+ return this;
+ }
+
+ public GetStatesRequestBuilder withContext(Context context) {
+ this.context = context;
+ return this;
+ }
+
+ /**
+ * Builds a request object.
+ * @return Request object.
+ */
+ public GetStatesRequest build() {
+ GetStatesRequest request = new GetStatesRequest();
+ request.setStateStoreName(this.stateStoreName);
+ request.setKeys(this.keys);
+ request.setParallelism(this.parallelism);
+ request.setContext(this.context);
+ return request;
+ }
+
+}
diff --git a/sdk/src/main/java/io/dapr/client/domain/State.java b/sdk/src/main/java/io/dapr/client/domain/State.java
index a71c7fa20..08c53e73e 100644
--- a/sdk/src/main/java/io/dapr/client/domain/State.java
+++ b/sdk/src/main/java/io/dapr/client/domain/State.java
@@ -11,28 +11,50 @@ package io.dapr.client.domain;
* @param The type of the value of the sate
*/
public class State {
+
/**
* The value of the state.
*/
private final T value;
+
/**
* The key of the state.
*/
private final String key;
+
/**
* The ETag to be used
* Keep in mind that for some state stores (like reids) only numbers are supported.
*/
private final String etag;
+ /**
+ * The error in case the key could not be retrieved.
+ */
+ private final String error;
+
/**
* The options used for saving the state.
*/
private final StateOptions options;
/**
- * Create an inmutable state
- * This Constructor MUST be used anytime you need to retrieve or delete a State.
+ * Create an immutable state reference to be retrieved or deleted.
+ * This Constructor CAN be used anytime you need to retrieve or delete a state.
+ *
+ * @param key - The key of the state
+ */
+ public State(String key) {
+ this.key = key;
+ this.value = null;
+ this.etag = null;
+ this.options = null;
+ this.error = null;
+ }
+
+ /**
+ * Create an immutable state reference to be retrieved or deleted.
+ * This Constructor CAN be used anytime you need to retrieve or delete a state.
*
* @param key - The key of the state
* @param etag - The etag of the state - Keep in mind that for some state stores (like redis) only numbers
@@ -44,16 +66,16 @@ public class State {
this.key = key;
this.etag = etag;
this.options = options;
+ this.error = null;
}
/**
- * Create an inmutable state.
- * This Constructor MUST be used anytime you want the state to be send for a Save operation.
+ * Create an immutable state.
+ * This Constructor CAN be used anytime you want the state to be saved.
*
* @param value - The value of the state.
* @param key - The key of the state.
- * @param etag - The etag of the state - Keep in mind that for some state stores (like redis)
- * only numbers are supported.
+ * @param etag - The etag of the state - for some state stores (like redis) only numbers are supported.
* @param options - REQUIRED when saving a state.
*/
public State(T value, String key, String etag, StateOptions options) {
@@ -61,6 +83,38 @@ public class State {
this.key = key;
this.etag = etag;
this.options = options;
+ this.error = null;
+ }
+
+ /**
+ * Create an immutable state.
+ * This Constructor CAN be used anytime you want the state to be saved.
+ *
+ * @param value - The value of the state.
+ * @param key - The key of the state.
+ * @param etag - The etag of the state - some state stores (like redis) only numbers are supported.
+ */
+ public State(T value, String key, String etag) {
+ this.value = value;
+ this.key = key;
+ this.etag = etag;
+ this.options = null;
+ this.error = null;
+ }
+
+ /**
+ * Create an immutable state.
+ * This Constructor MUST be used anytime the key could not be retrieved and contains an error.
+ *
+ * @param key - The key of the state.
+ * @param error - Error when fetching the state.
+ */
+ public State(String key, String error) {
+ this.value = null;
+ this.key = key;
+ this.etag = null;
+ this.options = null;
+ this.error = error;
}
/**
@@ -90,6 +144,16 @@ public class State {
return etag;
}
+ /**
+ * Retrieve the error for this state.
+ *
+ * @return The error for this state.
+ */
+
+ public String getError() {
+ return error;
+ }
+
/**
* Retrieve the Options used for saving the state.
*
@@ -123,6 +187,10 @@ public class State {
return false;
}
+ if (getError() != null ? !getEtag().equals(that.getError()) : that.getError() != null) {
+ return false;
+ }
+
if (getOptions() != null ? !getOptions().equals(that.getOptions()) : that.getOptions() != null) {
return false;
}
@@ -135,6 +203,7 @@ public class State {
int result = getValue() != null ? getValue().hashCode() : 0;
result = 31 * result + (getKey() != null ? getKey().hashCode() : 0);
result = 31 * result + (getEtag() != null ? getEtag().hashCode() : 0);
+ result = 31 * result + (getError() != null ? getError().hashCode() : 0);
result = 31 * result + (getOptions() != null ? options.hashCode() : 0);
return result;
}
@@ -145,6 +214,7 @@ public class State {
+ "value=" + value
+ ", key='" + key + "'"
+ ", etag='" + etag + "'"
+ + ", etag='" + error + "'"
+ ", options={'" + options.toString() + "}"
+ "}";
}
diff --git a/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java b/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java
index b23b27fe1..f31e92bd8 100644
--- a/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java
+++ b/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java
@@ -34,12 +34,15 @@ import reactor.core.publisher.Mono;
import java.io.Closeable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import static com.google.common.util.concurrent.Futures.addCallback;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
@@ -753,6 +756,183 @@ public class DaprClientGrpcTest {
assertEquals(expectedState, result.block());
}
+
+ @Test
+ public void getStatesString() throws IOException {
+ DaprProtos.GetBulkStateResponse responseEnvelope = DaprProtos.GetBulkStateResponse.newBuilder()
+ .addItems(DaprProtos.BulkStateItem.newBuilder()
+ .setData(serialize("hello world"))
+ .setKey("100")
+ .setEtag("1")
+ .build())
+ .addItems(DaprProtos.BulkStateItem.newBuilder()
+ .setKey("200")
+ .setError("not found")
+ .build())
+ .build();
+ SettableFuture settableFuture = SettableFuture.create();
+ MockCallback callback = new MockCallback<>(responseEnvelope);
+ addCallback(settableFuture, callback, directExecutor());
+ when(client.getBulkState(any(DaprProtos.GetBulkStateRequest.class)))
+ .thenAnswer(c -> {
+ settableFuture.set(responseEnvelope);
+ return settableFuture;
+ });
+ List> result = adapter.getStates(STATE_STORE_NAME, Arrays.asList("100", "200"), String.class).block();
+ assertTrue(callback.wasCalled);
+
+ assertEquals(2, result.size());
+ assertEquals("100", result.stream().findFirst().get().getKey());
+ assertEquals("hello world", result.stream().findFirst().get().getValue());
+ assertEquals("1", result.stream().findFirst().get().getEtag());
+ assertNull(result.stream().findFirst().get().getError());
+ assertEquals("200", result.stream().skip(1).findFirst().get().getKey());
+ assertNull(result.stream().skip(1).findFirst().get().getValue());
+ assertNull(result.stream().skip(1).findFirst().get().getEtag());
+ assertEquals("not found", result.stream().skip(1).findFirst().get().getError());
+ }
+
+ @Test
+ public void getStatesInteger() throws IOException {
+ DaprProtos.GetBulkStateResponse responseEnvelope = DaprProtos.GetBulkStateResponse.newBuilder()
+ .addItems(DaprProtos.BulkStateItem.newBuilder()
+ .setData(serialize(1234))
+ .setKey("100")
+ .setEtag("1")
+ .build())
+ .addItems(DaprProtos.BulkStateItem.newBuilder()
+ .setKey("200")
+ .setError("not found")
+ .build())
+ .build();
+ SettableFuture settableFuture = SettableFuture.create();
+ MockCallback callback = new MockCallback<>(responseEnvelope);
+ addCallback(settableFuture, callback, directExecutor());
+ when(client.getBulkState(any(DaprProtos.GetBulkStateRequest.class)))
+ .thenAnswer(c -> {
+ settableFuture.set(responseEnvelope);
+ return settableFuture;
+ });
+ List> result = adapter.getStates(STATE_STORE_NAME, Arrays.asList("100", "200"), int.class).block();
+ assertTrue(callback.wasCalled);
+
+ assertEquals(2, result.size());
+ assertEquals("100", result.stream().findFirst().get().getKey());
+ assertEquals(1234, (int)result.stream().findFirst().get().getValue());
+ assertEquals("1", result.stream().findFirst().get().getEtag());
+ assertNull(result.stream().findFirst().get().getError());
+ assertEquals("200", result.stream().skip(1).findFirst().get().getKey());
+ assertNull(result.stream().skip(1).findFirst().get().getValue());
+ assertNull(result.stream().skip(1).findFirst().get().getEtag());
+ assertEquals("not found", result.stream().skip(1).findFirst().get().getError());
+ }
+
+ @Test
+ public void getStatesBoolean() throws IOException {
+ DaprProtos.GetBulkStateResponse responseEnvelope = DaprProtos.GetBulkStateResponse.newBuilder()
+ .addItems(DaprProtos.BulkStateItem.newBuilder()
+ .setData(serialize(true))
+ .setKey("100")
+ .setEtag("1")
+ .build())
+ .addItems(DaprProtos.BulkStateItem.newBuilder()
+ .setKey("200")
+ .setError("not found")
+ .build())
+ .build();
+ SettableFuture settableFuture = SettableFuture.create();
+ MockCallback callback = new MockCallback<>(responseEnvelope);
+ addCallback(settableFuture, callback, directExecutor());
+ when(client.getBulkState(any(DaprProtos.GetBulkStateRequest.class)))
+ .thenAnswer(c -> {
+ settableFuture.set(responseEnvelope);
+ return settableFuture;
+ });
+ List> result = adapter.getStates(STATE_STORE_NAME, Arrays.asList("100", "200"), boolean.class).block();
+ assertTrue(callback.wasCalled);
+
+ assertEquals(2, result.size());
+ assertEquals("100", result.stream().findFirst().get().getKey());
+ assertEquals(true, result.stream().findFirst().get().getValue());
+ assertEquals("1", result.stream().findFirst().get().getEtag());
+ assertNull(result.stream().findFirst().get().getError());
+ assertEquals("200", result.stream().skip(1).findFirst().get().getKey());
+ assertNull(result.stream().skip(1).findFirst().get().getValue());
+ assertNull(result.stream().skip(1).findFirst().get().getEtag());
+ assertEquals("not found", result.stream().skip(1).findFirst().get().getError());
+ }
+
+ @Test
+ public void getStatesByteArray() throws IOException {
+ DaprProtos.GetBulkStateResponse responseEnvelope = DaprProtos.GetBulkStateResponse.newBuilder()
+ .addItems(DaprProtos.BulkStateItem.newBuilder()
+ .setData(serialize(new byte[]{1, 2, 3}))
+ .setKey("100")
+ .setEtag("1")
+ .build())
+ .addItems(DaprProtos.BulkStateItem.newBuilder()
+ .setKey("200")
+ .setError("not found")
+ .build())
+ .build();
+ SettableFuture settableFuture = SettableFuture.create();
+ MockCallback callback = new MockCallback<>(responseEnvelope);
+ addCallback(settableFuture, callback, directExecutor());
+ when(client.getBulkState(any(DaprProtos.GetBulkStateRequest.class)))
+ .thenAnswer(c -> {
+ settableFuture.set(responseEnvelope);
+ return settableFuture;
+ });
+ List> result = adapter.getStates(STATE_STORE_NAME, Arrays.asList("100", "200"), byte[].class).block();
+ assertTrue(callback.wasCalled);
+
+ assertEquals(2, result.size());
+ assertEquals("100", result.stream().findFirst().get().getKey());
+ assertArrayEquals(new byte[]{1, 2, 3}, result.stream().findFirst().get().getValue());
+ assertEquals("1", result.stream().findFirst().get().getEtag());
+ assertNull(result.stream().findFirst().get().getError());
+ assertEquals("200", result.stream().skip(1).findFirst().get().getKey());
+ assertNull(result.stream().skip(1).findFirst().get().getValue());
+ assertNull(result.stream().skip(1).findFirst().get().getEtag());
+ assertEquals("not found", result.stream().skip(1).findFirst().get().getError());
+ }
+
+ @Test
+ public void getStatesObject() throws IOException {
+ MyObject object = new MyObject(1, "Event");
+ DaprProtos.GetBulkStateResponse responseEnvelope = DaprProtos.GetBulkStateResponse.newBuilder()
+ .addItems(DaprProtos.BulkStateItem.newBuilder()
+ .setData(serialize(object))
+ .setKey("100")
+ .setEtag("1")
+ .build())
+ .addItems(DaprProtos.BulkStateItem.newBuilder()
+ .setKey("200")
+ .setError("not found")
+ .build())
+ .build();
+ SettableFuture settableFuture = SettableFuture.create();
+ MockCallback callback = new MockCallback<>(responseEnvelope);
+ addCallback(settableFuture, callback, directExecutor());
+ when(client.getBulkState(any(DaprProtos.GetBulkStateRequest.class)))
+ .thenAnswer(c -> {
+ settableFuture.set(responseEnvelope);
+ return settableFuture;
+ });
+ List> result = adapter.getStates(STATE_STORE_NAME, Arrays.asList("100", "200"), MyObject.class).block();
+ assertTrue(callback.wasCalled);
+
+ assertEquals(2, result.size());
+ assertEquals("100", result.stream().findFirst().get().getKey());
+ assertEquals(object, result.stream().findFirst().get().getValue());
+ assertEquals("1", result.stream().findFirst().get().getEtag());
+ assertNull(result.stream().findFirst().get().getError());
+ assertEquals("200", result.stream().skip(1).findFirst().get().getKey());
+ assertNull(result.stream().skip(1).findFirst().get().getValue());
+ assertNull(result.stream().skip(1).findFirst().get().getEtag());
+ assertEquals("not found", result.stream().skip(1).findFirst().get().getError());
+ }
+
@Test(expected = RuntimeException.class)
public void deleteStateExceptionThrowTest() {
when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateRequest.class))).thenThrow(RuntimeException.class);
diff --git a/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java b/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java
index 019fb7ef9..5c37eb63f 100644
--- a/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java
+++ b/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java
@@ -21,11 +21,13 @@ import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -38,6 +40,9 @@ public class DaprClientHttpTest {
private static final String SECRET_STORE_NAME = "MySecretStore";
+ private final String EXPECTED_RESULT =
+ "{\"data\":\"ewoJCSJwcm9wZXJ0eUEiOiAidmFsdWVBIiwKCQkicHJvcGVydHlCIjogInZhbHVlQiIKCX0=\"}";
+
private DaprClientHttp daprClientHttp;
private DaprHttp daprHttp;
@@ -46,9 +51,6 @@ public class DaprClientHttpTest {
private MockInterceptor mockInterceptor;
- private final String EXPECTED_RESULT =
- "{\"data\":\"ewoJCSJwcm9wZXJ0eUEiOiAidmFsdWVBIiwKCQkicHJvcGVydHlCIjogInZhbHVlQiIKCX0=\"}";
-
@Before
public void setUp() throws Exception {
mockInterceptor = new MockInterceptor(Behavior.UNORDERED);
@@ -382,7 +384,120 @@ public class DaprClientHttpTest {
}
@Test
- public void getStates() {
+ public void getStatesString() {
+ mockInterceptor.addRule()
+ .post("http://127.0.0.1:3000/v1.0/state/MyStateStore/bulk")
+ .respond("[{\"key\": \"100\", \"data\": \"hello world\", \"etag\": \"1\"}," +
+ "{\"key\": \"200\", \"error\": \"not found\"}]");
+ daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient);
+ daprClientHttp = new DaprClientHttp(daprHttp);
+ List> result =
+ daprClientHttp.getStates(STATE_STORE_NAME, Arrays.asList("100", "200"), String.class).block();
+ assertEquals(2, result.size());
+ assertEquals("100", result.stream().findFirst().get().getKey());
+ assertEquals("hello world", result.stream().findFirst().get().getValue());
+ assertEquals("1", result.stream().findFirst().get().getEtag());
+ assertNull(result.stream().findFirst().get().getError());
+ assertEquals("200", result.stream().skip(1).findFirst().get().getKey());
+ assertNull(result.stream().skip(1).findFirst().get().getValue());
+ assertNull(result.stream().skip(1).findFirst().get().getEtag());
+ assertEquals("not found", result.stream().skip(1).findFirst().get().getError());
+ }
+
+ @Test
+ public void getStatesInteger() {
+ mockInterceptor.addRule()
+ .post("http://127.0.0.1:3000/v1.0/state/MyStateStore/bulk")
+ .respond("[{\"key\": \"100\", \"data\": 1234, \"etag\": \"1\"}," +
+ "{\"key\": \"200\", \"error\": \"not found\"}]");
+ daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient);
+ daprClientHttp = new DaprClientHttp(daprHttp);
+ List> result =
+ daprClientHttp.getStates(STATE_STORE_NAME, Arrays.asList("100", "200"), int.class).block();
+ assertEquals(2, result.size());
+ assertEquals("100", result.stream().findFirst().get().getKey());
+ assertEquals(1234, (int)result.stream().findFirst().get().getValue());
+ assertEquals("1", result.stream().findFirst().get().getEtag());
+ assertNull(result.stream().findFirst().get().getError());
+ assertEquals("200", result.stream().skip(1).findFirst().get().getKey());
+ assertNull(result.stream().skip(1).findFirst().get().getValue());
+ assertNull(result.stream().skip(1).findFirst().get().getEtag());
+ assertEquals("not found", result.stream().skip(1).findFirst().get().getError());
+ }
+
+ @Test
+ public void getStatesBoolean() {
+ mockInterceptor.addRule()
+ .post("http://127.0.0.1:3000/v1.0/state/MyStateStore/bulk")
+ .respond("[{\"key\": \"100\", \"data\": true, \"etag\": \"1\"}," +
+ "{\"key\": \"200\", \"error\": \"not found\"}]");
+ daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient);
+ daprClientHttp = new DaprClientHttp(daprHttp);
+ List> result =
+ daprClientHttp.getStates(STATE_STORE_NAME, Arrays.asList("100", "200"), boolean.class).block();
+ assertEquals(2, result.size());
+ assertEquals("100", result.stream().findFirst().get().getKey());
+ assertEquals(true, (boolean)result.stream().findFirst().get().getValue());
+ assertEquals("1", result.stream().findFirst().get().getEtag());
+ assertNull(result.stream().findFirst().get().getError());
+ assertEquals("200", result.stream().skip(1).findFirst().get().getKey());
+ assertNull(result.stream().skip(1).findFirst().get().getValue());
+ assertNull(result.stream().skip(1).findFirst().get().getEtag());
+ assertEquals("not found", result.stream().skip(1).findFirst().get().getError());
+ }
+
+ @Test
+ public void getStatesByteArray() {
+ byte[] value = new byte[]{1, 2, 3};
+ String base64Value = Base64.getEncoder().encodeToString(value);
+ mockInterceptor.addRule()
+ .post("http://127.0.0.1:3000/v1.0/state/MyStateStore/bulk")
+ .respond("[{\"key\": \"100\", \"data\": \"" + base64Value + "\", \"etag\": \"1\"}," +
+ "{\"key\": \"200\", \"error\": \"not found\"}]");
+ daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient);
+ daprClientHttp = new DaprClientHttp(daprHttp);
+ // JSON cannot differentiate if data returned is String or byte[], it is ambiguous. So we get base64 encoded back.
+ // So, users should use String instead of byte[].
+ List> result =
+ daprClientHttp.getStates(STATE_STORE_NAME, Arrays.asList("100", "200"), String.class).block();
+ assertEquals(2, result.size());
+ assertEquals("100", result.stream().findFirst().get().getKey());
+ assertEquals(base64Value, result.stream().findFirst().get().getValue());
+ assertEquals("1", result.stream().findFirst().get().getEtag());
+ assertNull(result.stream().findFirst().get().getError());
+ assertEquals("200", result.stream().skip(1).findFirst().get().getKey());
+ assertNull(result.stream().skip(1).findFirst().get().getValue());
+ assertNull(result.stream().skip(1).findFirst().get().getEtag());
+ assertEquals("not found", result.stream().skip(1).findFirst().get().getError());
+ }
+
+ @Test
+ public void getStatesObject() {
+ MyObject object = new MyObject(1, "Event");
+ mockInterceptor.addRule()
+ .post("http://127.0.0.1:3000/v1.0/state/MyStateStore/bulk")
+ .respond("[{\"key\": \"100\", \"data\": " +
+ "{ \"id\": \"" + object.id + "\", \"value\": \"" + object.value + "\"}, \"etag\": \"1\"}," +
+ "{\"key\": \"200\", \"error\": \"not found\"}]");
+ daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient);
+ daprClientHttp = new DaprClientHttp(daprHttp);
+ // JSON cannot differentiate if data returned is String or byte[], it is ambiguous. So we get base64 encoded back.
+ // So, users should use String instead of byte[].
+ List> result =
+ daprClientHttp.getStates(STATE_STORE_NAME, Arrays.asList("100", "200"), MyObject.class).block();
+ assertEquals(2, result.size());
+ assertEquals("100", result.stream().findFirst().get().getKey());
+ assertEquals(object, result.stream().findFirst().get().getValue());
+ assertEquals("1", result.stream().findFirst().get().getEtag());
+ assertNull(result.stream().findFirst().get().getError());
+ assertEquals("200", result.stream().skip(1).findFirst().get().getKey());
+ assertNull(result.stream().skip(1).findFirst().get().getValue());
+ assertNull(result.stream().skip(1).findFirst().get().getEtag());
+ assertEquals("not found", result.stream().skip(1).findFirst().get().getError());
+ }
+
+ @Test
+ public void getState() {
StateOptions stateOptions = mock(StateOptions.class);
State stateKeyValue = new State("value", "key", "etag", stateOptions);
State stateKeyNull = new State("value", null, "etag", stateOptions);
@@ -715,4 +830,53 @@ public class DaprClientHttpTest {
assertEquals("mysecretvalue2", secret.get("mysecretkey2"));
}
}
+
+ public static class MyObject {
+ private Integer id;
+ private String value;
+
+ public MyObject() {
+ }
+
+ public MyObject(Integer id, String value) {
+ this.id = id;
+ this.value = value;
+ }
+
+ public Integer getId() {
+ return id;
+ }
+
+ public void setId(Integer id) {
+ this.id = id;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ public void setValue(String value) {
+ this.value = value;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof MyObject)) return false;
+
+ MyObject myObject = (MyObject) o;
+
+ if (!getId().equals(myObject.getId())) return false;
+ if (getValue() != null ? !getValue().equals(myObject.getValue()) : myObject.getValue() != null) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = getId().hashCode();
+ result = 31 * result + (getValue() != null ? getValue().hashCode() : 0);
+ return result;
+ }
+ }
}
\ No newline at end of file