Bulk get state (#335)

* unfinished work

* Working version of Bulk get state API.

Co-authored-by: Haishi2016 <hbai@microsoft.com>
This commit is contained in:
Artur Souza 2020-09-16 11:29:01 -07:00 committed by GitHub
parent e2ddc9f96d
commit baab109202
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 1305 additions and 895 deletions

View File

@ -25,7 +25,7 @@ jobs:
DAPR_RUNTIME_VER: 0.10.0-rc.0
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/e7c9a643dfefbcfff0c2c26c12029259e6e81180/install/install.sh
DAPR_CLI_REF: e7c9a643dfefbcfff0c2c26c12029259e6e81180
DAPR_REF:
DAPR_REF: 1e3b7aa3202e268759bce6cf9b30b6d95471ab8c
OSSRH_USER_TOKEN: ${{ secrets.OSSRH_USER_TOKEN }}
OSSRH_PWD_TOKEN: ${{ secrets.OSSRH_PWD_TOKEN }}
GPG_KEY: ${{ secrets.GPG_KEY }}

View File

@ -17,7 +17,7 @@
<grpc.version>1.25.0</grpc.version>
<protobuf.version>3.11.0</protobuf.version>
<protoc.version>3.10.0</protoc.version>
<dapr.proto.baseurl>https://raw.githubusercontent.com/dapr/dapr/2f32759fb9798d79b6d3fa92b1d972b07a62b6e8/dapr/proto</dapr.proto.baseurl>
<dapr.proto.baseurl>https://raw.githubusercontent.com/dapr/dapr/1660773c3da6740377f4440cae89dea93479c9f5/dapr/proto</dapr.proto.baseurl>
<os-maven-plugin.version>1.6.2</os-maven-plugin.version>
<maven-dependency-plugin.version>3.1.1</maven-dependency-plugin.version>
<maven-antrun-plugin.version>1.8</maven-antrun-plugin.version>

View File

@ -78,6 +78,12 @@
<version>5.1.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.3.1.RELEASE</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View File

@ -42,7 +42,7 @@ public class DaprRun {
Class serviceClass,
int maxWaitMilliseconds) {
// The app name needs to be deterministic since we depend on it to kill previous runs.
this.appName = String.format("%s_%s", testName, serviceClass == null ? "" : serviceClass.getSimpleName());
this.appName = serviceClass == null ? testName : String.format("%s_%s", testName, serviceClass.getSimpleName());
this.startCommand =
new Command(successMessage, buildDaprCommand(this.appName, serviceClass, ports));
this.listCommand = new Command(

View File

@ -0,0 +1,490 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.it.state;
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.domain.State;
import io.dapr.client.domain.StateOptions;
import io.dapr.it.BaseIT;
import io.dapr.it.DaprRun;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import reactor.core.publisher.Mono;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
/**
* Common test cases for Dapr client (GRPC and HTTP).
*/
public abstract class AbstractStateClientIT extends BaseIT {
@Test
public void saveAndGetState() {
//The key use to store the state
final String stateKey = "myKey";
//create the http client
DaprClient daprClient = buildDaprClient();
//creation of a dummy data
MyData data = new MyData();
data.setPropertyA("data in property A");
data.setPropertyB("data in property B");
//create of the deferred call to DAPR to store the state
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
//execute the save action
saveResponse.block();
//create of the deferred call to DAPR to get the state
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey), MyData.class);
//retrieve the state
State<MyData> myDataResponse = response.block();
//Assert that the response is the correct one
Assert.assertNotNull(myDataResponse.getEtag());
Assert.assertNotNull(myDataResponse.getKey());
Assert.assertNotNull(myDataResponse.getValue());
Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
}
@Test
public void saveAndGetBulkStates() {
final String stateKeyOne = UUID.randomUUID().toString();
final String stateKeyTwo = UUID.randomUUID().toString();
final String stateKeyThree = "NotFound";
DaprClient daprClient = buildDaprClient();
MyData data = new MyData();
data.setPropertyA("data in property A");
data.setPropertyB("data in property B");
//saves the states.
daprClient.saveState(STATE_STORE_NAME, stateKeyOne, "1", data, null).block();
daprClient.saveState(STATE_STORE_NAME, stateKeyTwo, null, null, null).block();
//retrieves states in bulk.
Mono<List<State<MyData>>> response =
daprClient.getStates(STATE_STORE_NAME, Arrays.asList(stateKeyOne, stateKeyTwo, stateKeyThree), MyData.class);
List<State<MyData>> result = response.block();
//Assert that the response is the correct one
assertEquals(3, result.size());
assertEquals(stateKeyOne, result.stream().findFirst().get().getKey());
assertEquals(data, result.stream().findFirst().get().getValue());
assertEquals("1", result.stream().findFirst().get().getEtag());
assertNull(result.stream().findFirst().get().getError());
assertEquals(stateKeyTwo, result.stream().skip(1).findFirst().get().getKey());
assertNull(result.stream().skip(1).findFirst().get().getValue());
assertEquals("1", result.stream().skip(1).findFirst().get().getEtag());
assertNull(result.stream().skip(1).findFirst().get().getError());
assertEquals(stateKeyThree, result.stream().skip(2).findFirst().get().getKey());
assertNull(result.stream().skip(2).findFirst().get().getValue());
assertEquals("", result.stream().skip(2).findFirst().get().getEtag());
assertNull("not found", result.stream().skip(2).findFirst().get().getError());
}
@Test
public void saveUpdateAndGetState() {
//The key use to store the state and be updated
final String stateKey = "keyToBeUpdated";
//create http DAPR client
DaprClient daprClient = buildDaprClient();
//Create dummy data to be store
MyData data = new MyData();
data.setPropertyA("data in property A");
data.setPropertyB("data in property B");
//Create deferred action to save the sate
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
//execute save action to DAPR
saveResponse.block();
//change data properties
data.setPropertyA("data in property A");
data.setPropertyB("data in property B2");
//create deferred action to update the sate without any etag or options
saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
//execute the update action to DAPR
saveResponse.block();
//Create deferred action to retrieve the action
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State<MyData>(stateKey, null, null),
MyData.class);
//execute the retrieve of the state
State<MyData> myDataResponse = response.block();
//review that the update was success action
Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
Assert.assertEquals("data in property B2", myDataResponse.getValue().getPropertyB());
}
@Test
public void saveAndDeleteState() {
//The key use to store the state and be deleted
final String stateKey = "myeKeyToBeDeleted";
//create DAPR client
DaprClient daprClient = buildDaprClient();
//Create dummy data to be store
MyData data = new MyData();
data.setPropertyA("data in property A");
data.setPropertyB("data in property B");
//Create deferred action to save the sate
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
//execute the save state action
saveResponse.block();
//Create deferred action to retrieve the state
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State<MyData>(stateKey, null, null),
MyData.class);
//execute the retrieve of the state
State<MyData> myDataResponse = response.block();
//review that the state was saved correctly
Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
//create deferred action to delete the state
Mono<Void> deleteResponse = daprClient.deleteState(STATE_STORE_NAME, stateKey, null, null);
//execute the delete action
deleteResponse.block();
//Create deferred action to retrieve the state
response = daprClient.getState(STATE_STORE_NAME, new State<MyData>(stateKey, null, null), MyData.class);
//execute the retrieve of the state
myDataResponse = response.block();
//review that the action does not return any value, because the state was deleted
Assert.assertNull(myDataResponse.getValue());
}
@Test
public void saveUpdateAndGetStateWithEtag() {
//The key use to store the state and be updated using etags
final String stateKey = "keyToBeUpdatedWithEtag";
//create DAPR client
DaprClient daprClient = buildDaprClient();
//Create dummy data to be store
MyData data = new MyData();
data.setPropertyA("data in property A");
data.setPropertyB("data in property B");
//Create deferred action to save the sate
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
//execute the save state action
saveResponse.block();
//Create deferred action to retrieve the state
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State<MyData>(stateKey, null, null),
MyData.class);
//execute the action for retrieve the state and the etag
State<MyData> myDataResponse = response.block();
//review that the etag is not empty
Assert.assertNotNull(myDataResponse.getEtag());
Assert.assertNotNull(myDataResponse.getKey());
Assert.assertNotNull(myDataResponse.getValue());
Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
String firstETag = myDataResponse.getEtag();
//change the data in order to update the state
data.setPropertyA("data in property A2");
data.setPropertyB("data in property B2");
//Create deferred action to update the data using the correct etag
saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), data, null);
saveResponse.block();
response = daprClient.getState(STATE_STORE_NAME, new State<MyData>(stateKey, null, null), MyData.class);
//retrive the data wihout any etag
myDataResponse = response.block();
//review that state value changes
Assert.assertNotNull(myDataResponse.getEtag());
//review that the etag changes after an update
Assert.assertNotEquals(firstETag, myDataResponse.getEtag());
Assert.assertNotNull(myDataResponse.getKey());
Assert.assertNotNull(myDataResponse.getValue());
Assert.assertEquals("data in property A2", myDataResponse.getValue().getPropertyA());
Assert.assertEquals("data in property B2", myDataResponse.getValue().getPropertyB());
}
@Test(expected = RuntimeException.class)
public void saveUpdateAndGetStateWithWrongEtag() {
final String stateKey = "keyToBeUpdatedWithWrongEtag";
//create DAPR client
DaprClient daprClient = buildDaprClient();
//Create dummy data to be store
MyData data = new MyData();
data.setPropertyA("data in property A");
data.setPropertyB("data in property B");
//Create deferred action to save the sate
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
//execute the save state action
saveResponse.block();
//Create deferred action to retrieve the state
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State<MyData>(stateKey, null, null),
MyData.class);
//execute the action for retrieve the state and the etag
State<MyData> myDataResponse = response.block();
//review that the etag is not empty
Assert.assertNotNull(myDataResponse.getEtag());
Assert.assertNotNull(myDataResponse.getKey());
Assert.assertNotNull(myDataResponse.getValue());
Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
String firstETag = myDataResponse.getEtag();
//change the data in order to update the state
data.setPropertyA("data in property A2");
data.setPropertyB("data in property B2");
//Create deferred action to update the data using the incorrect etag
saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, "99999999999999", data, null);
saveResponse.block();
response = daprClient.getState(STATE_STORE_NAME, new State<MyData>(stateKey, null, null), MyData.class);
//retrive the data wihout any etag
myDataResponse = response.block();
//review that state value changes
Assert.assertNotNull(myDataResponse.getEtag());
//review that the etag changes after an update
Assert.assertNotEquals(firstETag, myDataResponse.getEtag());
Assert.assertNotNull(myDataResponse.getKey());
Assert.assertNotNull(myDataResponse.getValue());
Assert.assertEquals("data in property A2", myDataResponse.getValue().getPropertyA());
Assert.assertEquals("data in property B2", myDataResponse.getValue().getPropertyB());
}
@Test
public void saveAndDeleteStateWithEtag() {
final String stateKey = "myeKeyToBeDeletedWithEtag";
//create DAPR client
DaprClient daprClient = buildDaprClient();
//Create dummy data to be store
MyData data = new MyData();
data.setPropertyA("data in property A");
data.setPropertyB("data in property B");
//Create deferred action to save the sate
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
//execute the save state action
saveResponse.block();
//Create deferred action to get the state with the etag
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State<MyData>(stateKey, null, null),
MyData.class);
//execute the get state
State<MyData> myDataResponse = response.block();
Assert.assertNotNull(myDataResponse.getEtag());
Assert.assertNotNull(myDataResponse.getKey());
Assert.assertNotNull(myDataResponse.getValue());
Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
//Create deferred action to delete an state sending the etag
Mono<Void> deleteResponse = daprClient.deleteState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), null);
//execute the delete of the state
deleteResponse.block();
//Create deferred action to get the sate without an etag
response = daprClient.getState(STATE_STORE_NAME, new State(stateKey), MyData.class);
myDataResponse = response.block();
//Review that the response is null, because the state was deleted
Assert.assertNull(myDataResponse.getValue());
}
@Test(expected = RuntimeException.class)
public void saveAndDeleteStateWithWrongEtag() {
final String stateKey = "myeKeyToBeDeletedWithWrongEtag";
//create DAPR client
DaprClient daprClient = buildDaprClient();
//Create dummy data to be store
MyData data = new MyData();
data.setPropertyA("data in property A");
data.setPropertyB("data in property B");
//Create deferred action to save the sate
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
//execute the save state action
saveResponse.block();
//Create deferred action to get the state with the etag
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State<MyData>(stateKey, null, null),
MyData.class);
//execute the get state
State<MyData> myDataResponse = response.block();
Assert.assertNotNull(myDataResponse.getEtag());
Assert.assertNotNull(myDataResponse.getKey());
Assert.assertNotNull(myDataResponse.getValue());
Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
//Create deferred action to delete an state sending the incorrect etag
Mono<Void> deleteResponse = daprClient.deleteState(STATE_STORE_NAME, stateKey, "99999999999", null);
//execute the delete of the state, this should trhow an exception
deleteResponse.block();
//Create deferred action to get the sate without an etag
response = daprClient.getState(STATE_STORE_NAME, new State(stateKey), MyData.class);
myDataResponse = response.block();
//Review that the response is null, because the state was deleted
Assert.assertNull(myDataResponse.getValue());
}
@Test(expected = RuntimeException.class)
public void saveUpdateAndGetStateWithEtagAndStateOptionsFirstWrite() {
final String stateKey = "keyToBeUpdatedWithEtagAndOptions";
//create option with concurrency with first writte and consistency of strong
StateOptions stateOptions = new StateOptions(StateOptions.Consistency.STRONG,
StateOptions.Concurrency.FIRST_WRITE);
//create dapr client
DaprClient daprClient = buildDaprClient();
//create Dummy data
MyData data = new MyData();
data.setPropertyA("data in property A");
data.setPropertyB("data in property B");
//create state using stateOptions
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, stateOptions);
//execute the save state
saveResponse.block();
//crate deferred action to retrieve the state
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, stateOptions),
MyData.class);
//execute the retrieve of the state using options
State<MyData> myDataResponse = response.block();
Assert.assertNotNull(myDataResponse.getEtag());
Assert.assertNotNull(myDataResponse.getKey());
Assert.assertNotNull(myDataResponse.getValue());
Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
//change data to be udpated
data.setPropertyA("data in property A2");
data.setPropertyB("data in property B2");
//create deferred action to update the action with options
saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), data, stateOptions);
//update the state
saveResponse.block();
data.setPropertyA("last write");
data.setPropertyB("data in property B2");
//create deferred action to update the action with the same etag
saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), data, stateOptions);
//throws an exception, the state was already udpated
saveResponse.block();
response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, stateOptions), MyData.class);
State<MyData> myLastDataResponse = response.block();
Assert.assertNotNull(myLastDataResponse.getEtag());
Assert.assertNotNull(myLastDataResponse.getKey());
Assert.assertNotNull(myLastDataResponse.getValue());
Assert.assertNotNull(myDataResponse.getEtag(), myLastDataResponse.getEtag());
Assert.assertEquals("data in property A2", myLastDataResponse.getValue().getPropertyA());
Assert.assertEquals("data in property B2", myLastDataResponse.getValue().getPropertyB());
}
@Test()
public void saveUpdateAndGetStateWithEtagAndStateOptionsLastWrite() {
final String stateKey = "keyToBeUpdatedWithEtagAndOptions";
//create option with concurrency with first writte and consistency of strong
StateOptions stateOptions = new StateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.LAST_WRITE);
//create dapr client
DaprClient daprClient = buildDaprClient();
//create Dummy data
MyData data = new MyData();
data.setPropertyA("data in property A");
data.setPropertyB("data in property B");
//create state using stateOptions
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, stateOptions);
//execute the save state
saveResponse.block();
//crate deferred action to retrieve the state
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, stateOptions),
MyData.class);
//execute the retrieve of the state using options
State<MyData> myDataResponse = response.block();
Assert.assertNotNull(myDataResponse.getEtag());
Assert.assertNotNull(myDataResponse.getKey());
Assert.assertNotNull(myDataResponse.getValue());
Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
//change data to be udpated
data.setPropertyA("data in property A2");
data.setPropertyB("data in property B2");
//create deferred action to update the action with options
saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), data, stateOptions);
//update the state
saveResponse.block();
data.setPropertyA("last write");
data.setPropertyB("data in property B2");
//create deferred action to update the action with the same etag
saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), data, stateOptions);
//update the state without an error
saveResponse.block();
response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, stateOptions), MyData.class);
State<MyData> myLastDataResponse = response.block();
Assert.assertNotNull(myLastDataResponse.getEtag());
Assert.assertNotNull(myLastDataResponse.getKey());
Assert.assertNotNull(myLastDataResponse.getValue());
Assert.assertNotNull(myDataResponse.getEtag(), myLastDataResponse.getEtag());
Assert.assertEquals("last write", myLastDataResponse.getValue().getPropertyA());
Assert.assertEquals("data in property B2", myLastDataResponse.getValue().getPropertyB());
}
protected abstract DaprClient buildDaprClient();
}

View File

@ -8,28 +8,18 @@ package io.dapr.it.state;
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DaprClientGrpc;
import io.dapr.client.domain.State;
import io.dapr.client.domain.StateOptions;
import io.dapr.it.BaseIT;
import io.dapr.it.DaprRun;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import reactor.core.publisher.Mono;
import java.io.IOException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
/**
* Test State GRPC DAPR capabilities using a DAPR instance with an empty service running
*/
public class GRPCStateClientIT extends BaseIT {
public class GRPCStateClientIT extends AbstractStateClientIT {
private static DaprRun daprRun;
@ -49,419 +39,9 @@ public class GRPCStateClientIT extends BaseIT {
daprClient.close();
}
@Test
public void saveAndGetState() {
//The key use to store the state
final String stateKey = "myKey";
//create the http client
//creation of a dummy data
MyData data = new MyData();
data.setPropertyA("data in property A");
data.setPropertyB("data in property B");
//create of the deferred call to DAPR to store the state
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
//execute the save action
saveResponse.block();
//create of the deferred call to DAPR to get the state
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), MyData.class);
//retrieve the state
State<MyData> myDataResponse = response.block();
//Assert that the response is the correct one
assertNotNull(myDataResponse.getEtag());
assertNotNull(myDataResponse.getKey());
assertNotNull(myDataResponse.getValue());
assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
}
@Test
public void saveUpdateAndGetState() {
//The key use to store the state and be updated
final String stateKey = "keyToBeUpdated";
//create http DAPR client
//Create dummy data to be store
MyData data = new MyData();
data.setPropertyA("data in property A");
data.setPropertyB("data in property B");
//Create deferred action to save the sate
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
//execute save action to DAPR
saveResponse.block();
//change data properties
data.setPropertyA("data in property A");
data.setPropertyB("data in property B2");
//create deferred action to update the sate without any etag or options
saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
//execute the update action to DAPR
saveResponse.block();
//Create deferred action to retrieve the action
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State<MyData>(stateKey, null, null),
MyData.class);
//execute the retrieve of the state
State<MyData> myDataResponse = response.block();
//review that the update was success action
assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
assertEquals("data in property B2", myDataResponse.getValue().getPropertyB());
}
@Test
public void saveAndDeleteState() {
//The key use to store the state and be deleted
final String stateKey = "myeKeyToBeDeleted";
//Create dummy data to be store
MyData data = new MyData();
data.setPropertyA("data in property A");
data.setPropertyB("data in property B");
//Create deferred action to save the sate
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
//execute the save state action
saveResponse.block();
//Create deferred action to retrieve the state
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State<MyData>(stateKey, null, null),
MyData.class);
//execute the retrieve of the state
State<MyData> myDataResponse = response.block();
//review that the state was saved correctly
assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
//create deferred action to delete the state
Mono<Void> deleteResponse = daprClient.deleteState(STATE_STORE_NAME, stateKey, null, null);
//execute the delete action
deleteResponse.block();
//Create deferred action to retrieve the state
response = daprClient.getState(STATE_STORE_NAME, new State<MyData>(stateKey, null, null), MyData.class);
//execute the retrieve of the state
myDataResponse = response.block();
//review that the action does not return any value, because the state was deleted
assertNull(myDataResponse.getValue());
}
@Test
public void saveUpdateAndGetStateWithEtag() {
//The key use to store the state and be updated using etags
final String stateKey = "keyToBeUpdatedWithEtag";
//Create dummy data to be store
MyData data = new MyData();
data.setPropertyA("data in property A");
data.setPropertyB("data in property B");
//Create deferred action to save the sate
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
//execute the save state action
saveResponse.block();
//Create deferred action to retrieve the state
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State<MyData>(stateKey, null, null),
MyData.class);
//execute the action for retrieve the state and the etag
State<MyData> myDataResponse = response.block();
//review that the etag is not empty
assertNotNull(myDataResponse.getEtag());
assertNotNull(myDataResponse.getKey());
assertNotNull(myDataResponse.getValue());
assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
String firstETag = myDataResponse.getEtag();
//change the data in order to update the state
data.setPropertyA("data in property A2");
data.setPropertyB("data in property B2");
//Create deferred action to update the data using the correct etag
saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), data, null);
saveResponse.block();
response = daprClient.getState(STATE_STORE_NAME, new State<MyData>(stateKey, null, null), MyData.class);
//retrive the data wihout any etag
myDataResponse = response.block();
//review that state value changes
assertNotNull(myDataResponse.getEtag());
//review that the etag changes after an update
assertNotEquals(firstETag, myDataResponse.getEtag());
assertNotNull(myDataResponse.getKey());
assertNotNull(myDataResponse.getValue());
assertEquals("data in property A2", myDataResponse.getValue().getPropertyA());
assertEquals("data in property B2", myDataResponse.getValue().getPropertyB());
}
@Ignore("This test case is ignored because DAPR ignore the ETag is wrong when is sent from GRPC protocol, the " +
"execution continues and the state is updated.")
@Test(expected = RuntimeException.class)
public void saveUpdateAndGetStateWithWrongEtag() {
final String stateKey = "keyToBeUpdatedWithWrongEtag";
//Create dummy data to be store
MyData data = new MyData();
data.setPropertyA("data in property A");
data.setPropertyB("data in property B");
//Create deferred action to save the sate
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
//execute the save state action
saveResponse.block();
//Create deferred action to retrieve the state
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State<MyData>(stateKey, null, null),
MyData.class);
//execute the action for retrieve the state and the etag
State<MyData> myDataResponse = response.block();
//review that the etag is not empty
assertNotNull(myDataResponse.getEtag());
assertNotNull(myDataResponse.getKey());
assertNotNull(myDataResponse.getValue());
assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
String firstETag = myDataResponse.getEtag();
//change the data in order to update the state
data.setPropertyA("data in property A2");
data.setPropertyB("data in property B2");
//Create deferred action to update the data using the incorrect etag
saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, "99999999999999", data, null);
saveResponse.block();
response = daprClient.getState(STATE_STORE_NAME, new State<MyData>(stateKey, null, null), MyData.class);
//retrive the data wihout any etag
myDataResponse = response.block();
//review that state value changes
assertNotNull(myDataResponse.getEtag());
//review that the etag changes after an update
assertNotEquals(firstETag, myDataResponse.getEtag());
assertNotNull(myDataResponse.getKey());
assertNotNull(myDataResponse.getValue());
assertEquals("data in property A2", myDataResponse.getValue().getPropertyA());
assertEquals("data in property B2", myDataResponse.getValue().getPropertyB());
}
@Test
public void saveAndDeleteStateWithEtag() {
final String stateKey = "myeKeyToBeDeletedWithEtag";
//Create dummy data to be store
MyData data = new MyData();
data.setPropertyA("data in property A");
data.setPropertyB("data in property B");
//Create deferred action to save the sate
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
//execute the save state action
saveResponse.block();
//Create deferred action to get the state with the etag
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State<MyData>(stateKey, null, null),
MyData.class);
//execute the get state
State<MyData> myDataResponse = response.block();
assertNotNull(myDataResponse.getEtag());
assertNotNull(myDataResponse.getKey());
assertNotNull(myDataResponse.getValue());
assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
//Create deferred action to delete an state sending the etag
Mono<Void> deleteResponse = daprClient.deleteState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), null);
//execute the delete of the state
deleteResponse.block();
//Create deferred action to get the sate without an etag
response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), MyData.class);
myDataResponse = response.block();
//Review that the response is null, because the state was deleted
assertNull(myDataResponse.getValue());
}
@Ignore("This test case is ignored because DAPR ignore if the ETag is wrong when is sent from GRPC protocol, the " +
"execution continues and the state is deleted.")
@Test(expected = RuntimeException.class)
public void saveAndDeleteStateWithWrongEtag() {
final String stateKey = "myeKeyToBeDeletedWithWrongEtag";
//Create dummy data to be store
MyData data = new MyData();
data.setPropertyA("data in property A");
data.setPropertyB("data in property B");
//Create deferred action to save the sate
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
//execute the save state action
saveResponse.block();
//Create deferred action to get the state with the etag
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State<MyData>(stateKey, null, null),
MyData.class);
//execute the get state
State<MyData> myDataResponse = response.block();
assertNotNull(myDataResponse.getEtag());
assertNotNull(myDataResponse.getKey());
assertNotNull(myDataResponse.getValue());
assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
//Create deferred action to delete an state sending the incorrect etag
Mono<Void> deleteResponse = daprClient.deleteState(STATE_STORE_NAME, stateKey, "99999999999", null);
//execute the delete of the state, this should trhow an exception
deleteResponse.block();
//Create deferred action to get the sate without an etag
response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), MyData.class);
myDataResponse = response.block();
//Review that the response is null, because the state was deleted
assertNull(myDataResponse.getValue());
}
@Ignore("This test case is ignored because it seems that DAPR using GRPC is ignoring the state options for " +
"consistency and concurrency.")
@Test(expected = RuntimeException.class)
public void saveUpdateAndGetStateWithEtagAndStateOptionsFirstWrite() {
final String stateKey = "keyToBeUpdatedWithEtagAndOptions";
//create option with concurrency with first writte and consistency of strong
StateOptions stateOptions = new StateOptions(StateOptions.Consistency.STRONG,
StateOptions.Concurrency.FIRST_WRITE);
//create Dummy data
MyData data = new MyData();
data.setPropertyA("data in property A");
data.setPropertyB("data in property B");
//create state using stateOptions
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, stateOptions);
//execute the save state
saveResponse.block();
//crate deferred action to retrieve the state
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, stateOptions),
MyData.class);
//execute the retrieve of the state using options
State<MyData> myDataResponse = response.block();
assertNotNull(myDataResponse.getEtag());
assertNotNull(myDataResponse.getKey());
assertNotNull(myDataResponse.getValue());
assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
//change data to be udpated
data.setPropertyA("data in property A2");
data.setPropertyB("data in property B2");
//create deferred action to update the action with options
saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), data, stateOptions);
//update the state
saveResponse.block();
data.setPropertyA("last write");
data.setPropertyB("data in property B2");
//create deferred action to update the action with the same etag
saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), data, stateOptions);
//throws an exception, the state was already udpated
saveResponse.block();
response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, stateOptions), MyData.class);
State<MyData> myLastDataResponse = response.block();
assertNotNull(myLastDataResponse.getEtag());
assertNotNull(myLastDataResponse.getKey());
assertNotNull(myLastDataResponse.getValue());
assertNotNull(myDataResponse.getEtag(), myLastDataResponse.getEtag());
assertEquals("data in property A2", myLastDataResponse.getValue().getPropertyA());
assertEquals("data in property B2", myLastDataResponse.getValue().getPropertyB());
}
@Test()
public void saveUpdateAndGetStateWithEtagAndStateOptionsLastWrite() {
final String stateKey = "keyToBeUpdatedWithEtagAndOptions";
//create option with concurrency with first writte and consistency of strong
StateOptions stateOptions = new StateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.LAST_WRITE);
//create Dummy data
MyData data = new MyData();
data.setPropertyA("data in property A");
data.setPropertyB("data in property B");
//create state using stateOptions
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, stateOptions);
//execute the save state
saveResponse.block();
//crate deferred action to retrieve the state
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, stateOptions),
MyData.class);
//execute the retrieve of the state using options
State<MyData> myDataResponse = response.block();
assertNotNull(myDataResponse.getEtag());
assertNotNull(myDataResponse.getKey());
assertNotNull(myDataResponse.getValue());
assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
//change data to be udpated
data.setPropertyA("data in property A2");
data.setPropertyB("data in property B2");
//create deferred action to update the action with options
saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), data, stateOptions);
//update the state
saveResponse.block();
data.setPropertyA("last write");
data.setPropertyB("data in property B2");
//create deferred action to update the action with the same etag
saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), data, stateOptions);
//update the state without an error
saveResponse.block();
response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, stateOptions), MyData.class);
State<MyData> myLastDataResponse = response.block();
assertNotNull(myLastDataResponse.getEtag());
assertNotNull(myLastDataResponse.getKey());
assertNotNull(myLastDataResponse.getValue());
assertNotNull(myDataResponse.getEtag(), myLastDataResponse.getEtag());
assertEquals("last write", myLastDataResponse.getValue().getPropertyA());
assertEquals("data in property B2", myLastDataResponse.getValue().getPropertyB());
@Override
protected DaprClient buildDaprClient() {
return daprClient;
}
}

View File

@ -7,448 +7,41 @@ package io.dapr.it.state;
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.domain.State;
import io.dapr.client.domain.StateOptions;
import io.dapr.it.BaseIT;
import io.dapr.client.DaprClientHttp;
import io.dapr.it.DaprRun;
import org.junit.Assert;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import reactor.core.publisher.Mono;
import java.io.IOException;
import static org.junit.Assert.assertTrue;
/**
* Test State HTTP DAPR capabilities using a DAPR instance with an empty service running
*/
public class HttpStateClientIT extends BaseIT {
public class HttpStateClientIT extends AbstractStateClientIT {
private static DaprRun daprRun;
private static DaprClient daprClient;
@BeforeClass
public static void init() throws Exception {
daprRun = startDaprApp(HttpStateClientIT.class.getSimpleName(), 1000);
daprRun = startDaprApp(HttpStateClientIT.class.getSimpleName(), 5000);
daprRun.switchToHTTP();
daprClient = new DaprClientBuilder().build();
assertTrue(daprClient instanceof DaprClientHttp);
}
@Test
public void saveAndGetState() {
//The key use to store the state
final String stateKey = "myKey";
//create the http client
DaprClient daprClient = buildDaprClient();
//creation of a dummy data
MyData data = new MyData();
data.setPropertyA("data in property A");
data.setPropertyB("data in property B");
//create of the deferred call to DAPR to store the state
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
//execute the save action
saveResponse.block();
//create of the deferred call to DAPR to get the state
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), MyData.class);
//retrieve the state
State<MyData> myDataResponse = response.block();
//Assert that the response is the correct one
Assert.assertNotNull(myDataResponse.getEtag());
Assert.assertNotNull(myDataResponse.getKey());
Assert.assertNotNull(myDataResponse.getValue());
Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
@AfterClass
public static void tearDown() throws IOException {
daprClient.close();
}
@Test
public void saveUpdateAndGetState() {
//The key use to store the state and be updated
final String stateKey = "keyToBeUpdated";
//create http DAPR client
DaprClient daprClient = buildDaprClient();
//Create dummy data to be store
MyData data = new MyData();
data.setPropertyA("data in property A");
data.setPropertyB("data in property B");
//Create deferred action to save the sate
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
//execute save action to DAPR
saveResponse.block();
//change data properties
data.setPropertyA("data in property A");
data.setPropertyB("data in property B2");
//create deferred action to update the sate without any etag or options
saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
//execute the update action to DAPR
saveResponse.block();
//Create deferred action to retrieve the action
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State<MyData>(stateKey, null, null),
MyData.class);
//execute the retrieve of the state
State<MyData> myDataResponse = response.block();
//review that the update was success action
Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
Assert.assertEquals("data in property B2", myDataResponse.getValue().getPropertyB());
}
@Test
public void saveAndDeleteState() {
//The key use to store the state and be deleted
final String stateKey = "myeKeyToBeDeleted";
//create DAPR client
DaprClient daprClient = buildDaprClient();
//Create dummy data to be store
MyData data = new MyData();
data.setPropertyA("data in property A");
data.setPropertyB("data in property B");
//Create deferred action to save the sate
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
//execute the save state action
saveResponse.block();
//Create deferred action to retrieve the state
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State<MyData>(stateKey, null, null),
MyData.class);
//execute the retrieve of the state
State<MyData> myDataResponse = response.block();
//review that the state was saved correctly
Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
//create deferred action to delete the state
Mono<Void> deleteResponse = daprClient.deleteState(STATE_STORE_NAME, stateKey, null, null);
//execute the delete action
deleteResponse.block();
//Create deferred action to retrieve the state
response = daprClient.getState(STATE_STORE_NAME, new State<MyData>(stateKey, null, null), MyData.class);
//execute the retrieve of the state
myDataResponse = response.block();
//review that the action does not return any value, because the state was deleted
Assert.assertNull(myDataResponse.getValue());
}
@Test
public void saveUpdateAndGetStateWithEtag() {
//The key use to store the state and be updated using etags
final String stateKey = "keyToBeUpdatedWithEtag";
//create DAPR client
DaprClient daprClient = buildDaprClient();
//Create dummy data to be store
MyData data = new MyData();
data.setPropertyA("data in property A");
data.setPropertyB("data in property B");
//Create deferred action to save the sate
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
//execute the save state action
saveResponse.block();
//Create deferred action to retrieve the state
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State<MyData>(stateKey, null, null),
MyData.class);
//execute the action for retrieve the state and the etag
State<MyData> myDataResponse = response.block();
//review that the etag is not empty
Assert.assertNotNull(myDataResponse.getEtag());
Assert.assertNotNull(myDataResponse.getKey());
Assert.assertNotNull(myDataResponse.getValue());
Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
String firstETag = myDataResponse.getEtag();
//change the data in order to update the state
data.setPropertyA("data in property A2");
data.setPropertyB("data in property B2");
//Create deferred action to update the data using the correct etag
saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), data, null);
saveResponse.block();
response = daprClient.getState(STATE_STORE_NAME, new State<MyData>(stateKey, null, null), MyData.class);
//retrive the data wihout any etag
myDataResponse = response.block();
//review that state value changes
Assert.assertNotNull(myDataResponse.getEtag());
//review that the etag changes after an update
Assert.assertNotEquals(firstETag, myDataResponse.getEtag());
Assert.assertNotNull(myDataResponse.getKey());
Assert.assertNotNull(myDataResponse.getValue());
Assert.assertEquals("data in property A2", myDataResponse.getValue().getPropertyA());
Assert.assertEquals("data in property B2", myDataResponse.getValue().getPropertyB());
}
@Test(expected = RuntimeException.class)
public void saveUpdateAndGetStateWithWrongEtag() {
final String stateKey = "keyToBeUpdatedWithWrongEtag";
//create DAPR client
DaprClient daprClient = buildDaprClient();
//Create dummy data to be store
MyData data = new MyData();
data.setPropertyA("data in property A");
data.setPropertyB("data in property B");
//Create deferred action to save the sate
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
//execute the save state action
saveResponse.block();
//Create deferred action to retrieve the state
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State<MyData>(stateKey, null, null),
MyData.class);
//execute the action for retrieve the state and the etag
State<MyData> myDataResponse = response.block();
//review that the etag is not empty
Assert.assertNotNull(myDataResponse.getEtag());
Assert.assertNotNull(myDataResponse.getKey());
Assert.assertNotNull(myDataResponse.getValue());
Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
String firstETag = myDataResponse.getEtag();
//change the data in order to update the state
data.setPropertyA("data in property A2");
data.setPropertyB("data in property B2");
//Create deferred action to update the data using the incorrect etag
saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, "99999999999999", data, null);
saveResponse.block();
response = daprClient.getState(STATE_STORE_NAME, new State<MyData>(stateKey, null, null), MyData.class);
//retrive the data wihout any etag
myDataResponse = response.block();
//review that state value changes
Assert.assertNotNull(myDataResponse.getEtag());
//review that the etag changes after an update
Assert.assertNotEquals(firstETag, myDataResponse.getEtag());
Assert.assertNotNull(myDataResponse.getKey());
Assert.assertNotNull(myDataResponse.getValue());
Assert.assertEquals("data in property A2", myDataResponse.getValue().getPropertyA());
Assert.assertEquals("data in property B2", myDataResponse.getValue().getPropertyB());
}
@Test
public void saveAndDeleteStateWithEtag() {
final String stateKey = "myeKeyToBeDeletedWithEtag";
//create DAPR client
DaprClient daprClient = buildDaprClient();
//Create dummy data to be store
MyData data = new MyData();
data.setPropertyA("data in property A");
data.setPropertyB("data in property B");
//Create deferred action to save the sate
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
//execute the save state action
saveResponse.block();
//Create deferred action to get the state with the etag
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State<MyData>(stateKey, null, null),
MyData.class);
//execute the get state
State<MyData> myDataResponse = response.block();
Assert.assertNotNull(myDataResponse.getEtag());
Assert.assertNotNull(myDataResponse.getKey());
Assert.assertNotNull(myDataResponse.getValue());
Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
//Create deferred action to delete an state sending the etag
Mono<Void> deleteResponse = daprClient.deleteState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), null);
//execute the delete of the state
deleteResponse.block();
//Create deferred action to get the sate without an etag
response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), MyData.class);
myDataResponse = response.block();
//Review that the response is null, because the state was deleted
Assert.assertNull(myDataResponse.getValue());
}
@Test(expected = RuntimeException.class)
public void saveAndDeleteStateWithWrongEtag() {
final String stateKey = "myeKeyToBeDeletedWithWrongEtag";
//create DAPR client
DaprClient daprClient = buildDaprClient();
//Create dummy data to be store
MyData data = new MyData();
data.setPropertyA("data in property A");
data.setPropertyB("data in property B");
//Create deferred action to save the sate
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
//execute the save state action
saveResponse.block();
//Create deferred action to get the state with the etag
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State<MyData>(stateKey, null, null),
MyData.class);
//execute the get state
State<MyData> myDataResponse = response.block();
Assert.assertNotNull(myDataResponse.getEtag());
Assert.assertNotNull(myDataResponse.getKey());
Assert.assertNotNull(myDataResponse.getValue());
Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
//Create deferred action to delete an state sending the incorrect etag
Mono<Void> deleteResponse = daprClient.deleteState(STATE_STORE_NAME, stateKey, "99999999999", null);
//execute the delete of the state, this should trhow an exception
deleteResponse.block();
//Create deferred action to get the sate without an etag
response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), MyData.class);
myDataResponse = response.block();
//Review that the response is null, because the state was deleted
Assert.assertNull(myDataResponse.getValue());
}
@Test(expected = RuntimeException.class)
public void saveUpdateAndGetStateWithEtagAndStateOptionsFirstWrite() {
final String stateKey = "keyToBeUpdatedWithEtagAndOptions";
//create option with concurrency with first writte and consistency of strong
StateOptions stateOptions = new StateOptions(StateOptions.Consistency.STRONG,
StateOptions.Concurrency.FIRST_WRITE);
//create dapr client
DaprClient daprClient = buildDaprClient();
//create Dummy data
MyData data = new MyData();
data.setPropertyA("data in property A");
data.setPropertyB("data in property B");
//create state using stateOptions
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, stateOptions);
//execute the save state
saveResponse.block();
//crate deferred action to retrieve the state
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, stateOptions),
MyData.class);
//execute the retrieve of the state using options
State<MyData> myDataResponse = response.block();
Assert.assertNotNull(myDataResponse.getEtag());
Assert.assertNotNull(myDataResponse.getKey());
Assert.assertNotNull(myDataResponse.getValue());
Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
//change data to be udpated
data.setPropertyA("data in property A2");
data.setPropertyB("data in property B2");
//create deferred action to update the action with options
saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), data, stateOptions);
//update the state
saveResponse.block();
data.setPropertyA("last write");
data.setPropertyB("data in property B2");
//create deferred action to update the action with the same etag
saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), data, stateOptions);
//throws an exception, the state was already udpated
saveResponse.block();
response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, stateOptions), MyData.class);
State<MyData> myLastDataResponse = response.block();
Assert.assertNotNull(myLastDataResponse.getEtag());
Assert.assertNotNull(myLastDataResponse.getKey());
Assert.assertNotNull(myLastDataResponse.getValue());
Assert.assertNotNull(myDataResponse.getEtag(), myLastDataResponse.getEtag());
Assert.assertEquals("data in property A2", myLastDataResponse.getValue().getPropertyA());
Assert.assertEquals("data in property B2", myLastDataResponse.getValue().getPropertyB());
}
@Test()
public void saveUpdateAndGetStateWithEtagAndStateOptionsLastWrite() {
final String stateKey = "keyToBeUpdatedWithEtagAndOptions";
//create option with concurrency with first writte and consistency of strong
StateOptions stateOptions = new StateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.LAST_WRITE);
//create dapr client
DaprClient daprClient = buildDaprClient();
//create Dummy data
MyData data = new MyData();
data.setPropertyA("data in property A");
data.setPropertyB("data in property B");
//create state using stateOptions
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, stateOptions);
//execute the save state
saveResponse.block();
//crate deferred action to retrieve the state
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, stateOptions),
MyData.class);
//execute the retrieve of the state using options
State<MyData> myDataResponse = response.block();
Assert.assertNotNull(myDataResponse.getEtag());
Assert.assertNotNull(myDataResponse.getKey());
Assert.assertNotNull(myDataResponse.getValue());
Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
//change data to be udpated
data.setPropertyA("data in property A2");
data.setPropertyB("data in property B2");
//create deferred action to update the action with options
saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), data, stateOptions);
//update the state
saveResponse.block();
data.setPropertyA("last write");
data.setPropertyB("data in property B2");
//create deferred action to update the action with the same etag
saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), data, stateOptions);
//update the state without an error
saveResponse.block();
response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, stateOptions), MyData.class);
State<MyData> myLastDataResponse = response.block();
Assert.assertNotNull(myLastDataResponse.getEtag());
Assert.assertNotNull(myLastDataResponse.getKey());
Assert.assertNotNull(myLastDataResponse.getValue());
Assert.assertNotNull(myDataResponse.getEtag(), myLastDataResponse.getEtag());
Assert.assertEquals("last write", myLastDataResponse.getValue().getPropertyA());
Assert.assertEquals("data in property B2", myLastDataResponse.getValue().getPropertyB());
}
private static DaprClient buildDaprClient() {
return new DaprClientBuilder().build();
@Override
protected DaprClient buildDaprClient() {
return daprClient;
}
}

View File

@ -5,6 +5,8 @@
package io.dapr.it.state;
import java.util.Objects;
public class MyData {
/// Gets or sets the value for PropertyA.
@ -13,8 +15,6 @@ public class MyData {
/// Gets or sets the value for PropertyB.
private String propertyB;
private MyData myData;
public String getPropertyB() {
return propertyB;
}
@ -39,11 +39,17 @@ public class MyData {
'}';
}
public MyData getMyData() {
return myData;
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
MyData myData = (MyData) o;
return Objects.equals(propertyA, myData.propertyA) &&
Objects.equals(propertyB, myData.propertyB);
}
public void setMyData(MyData myData) {
this.myData = myData;
@Override
public int hashCode() {
return Objects.hash(propertyA, propertyB);
}
}

View File

@ -13,6 +13,7 @@ import io.dapr.client.domain.GetSecretRequest;
import io.dapr.client.domain.GetSecretRequestBuilder;
import io.dapr.client.domain.GetStateRequest;
import io.dapr.client.domain.GetStateRequestBuilder;
import io.dapr.client.domain.GetStatesRequestBuilder;
import io.dapr.client.domain.HttpExtension;
import io.dapr.client.domain.InvokeBindingRequest;
import io.dapr.client.domain.InvokeBindingRequestBuilder;
@ -302,17 +303,20 @@ abstract class AbstractDaprClient implements DaprClient {
return this.getState(stateStoreName, key, etag, options, TypeRef.get(clazz));
}
private <T> State<T> buildStateKeyValue(
DaprProtos.GetStateResponse response,
String requestedKey,
StateOptions stateOptions,
TypeRef<T> type) throws IOException {
ByteString payload = response.getData();
byte[] data = payload == null ? null : payload.toByteArray();
T value = stateSerializer.deserialize(data, type);
String etag = response.getEtag();
String key = requestedKey;
return new State<>(value, key, etag, stateOptions);
/**
* {@inheritDoc}
*/
@Override
public <T> Mono<List<State<T>>> getStates(String stateStoreName, List<String> keys, TypeRef<T> type) {
return this.getStates(new GetStatesRequestBuilder(stateStoreName, keys).build(), type).map(r -> r.getObject());
}
/**
* {@inheritDoc}
*/
@Override
public <T> Mono<List<State<T>>> getStates(String stateStoreName, List<String> keys, Class<T> clazz) {
return this.getStates(stateStoreName, keys, TypeRef.get(clazz));
}
/**

View File

@ -8,6 +8,7 @@ package io.dapr.client;
import io.dapr.client.domain.DeleteStateRequest;
import io.dapr.client.domain.GetSecretRequest;
import io.dapr.client.domain.GetStateRequest;
import io.dapr.client.domain.GetStatesRequest;
import io.dapr.client.domain.HttpExtension;
import io.dapr.client.domain.InvokeBindingRequest;
import io.dapr.client.domain.InvokeServiceRequest;
@ -349,6 +350,19 @@ public interface DaprClient extends Closeable {
*/
<T> Mono<State<T>> getState(String stateStoreName, String key, String etag, StateOptions options, TypeRef<T> type);
/**
* Retrieve a State based on their key.
*
* @param stateStoreName The name of the state store.
* @param key The key of the State to be retrieved.
* @param etag Optional etag for conditional get
* @param options Optional settings for retrieve operation.
* @param clazz The Type of State needed as return.
* @param <T> The Type of the return.
* @return A Mono Plan for the requested State.
*/
<T> Mono<State<T>> getState(String stateStoreName, String key, String etag, StateOptions options, Class<T> clazz);
/**
* Retrieve a State based on their key.
*
@ -360,17 +374,36 @@ public interface DaprClient extends Closeable {
<T> Mono<Response<State<T>>> getState(GetStateRequest request, TypeRef<T> type);
/**
* Retrieve a State based on their key.
* Retrieve bulk States based on their keys.
*
* @param stateStoreName The name of the state store.
* @param key The key of the State to be retrieved.
* @param etag Optional etag for conditional get
* @param options Optional settings for retrieve operation.
* @param keys The keys of the State to be retrieved.
* @param type The type of State needed as return.
* @param <T> The type of the return.
* @return A Mono Plan for the requested State.
*/
<T> Mono<List<State<T>>> getStates(String stateStoreName, List<String> keys, TypeRef<T> type);
/**
* Retrieve bulk States based on their keys.
*
* @param stateStoreName The name of the state store.
* @param keys The keys of the State to be retrieved.
* @param clazz The type of State needed as return.
* @param <T> The type of the return.
* @return A Mono Plan for the requested State.
*/
<T> Mono<State<T>> getState(String stateStoreName, String key, String etag, StateOptions options, Class<T> clazz);
<T> Mono<List<State<T>>> getStates(String stateStoreName, List<String> keys, Class<T> clazz);
/**
* Retrieve bulk States based on their keys.
*
* @param request The request to get state.
* @param type The Type of State needed as return.
* @param <T> The Type of the return.
* @return A Mono Plan for the requested State.
*/
<T> Mono<Response<List<State<T>>>> getStates(GetStatesRequest request, TypeRef<T> type);
/**
* Save/Update a list of states.

View File

@ -5,6 +5,7 @@
package io.dapr.client;
import com.google.common.base.Strings;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
@ -12,6 +13,7 @@ import com.google.protobuf.Empty;
import io.dapr.client.domain.DeleteStateRequest;
import io.dapr.client.domain.GetSecretRequest;
import io.dapr.client.domain.GetStateRequest;
import io.dapr.client.domain.GetStatesRequest;
import io.dapr.client.domain.HttpExtension;
import io.dapr.client.domain.InvokeBindingRequest;
import io.dapr.client.domain.InvokeServiceRequest;
@ -52,6 +54,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
/**
* An adapter for the GRPC Client.
@ -253,6 +256,74 @@ public class DaprClientGrpc extends AbstractDaprClient {
}
}
/**
* {@inheritDoc}
*/
@Override
public <T> Mono<Response<List<State<T>>>> getStates(GetStatesRequest request, TypeRef<T> type) {
try {
final String stateStoreName = request.getStateStoreName();
final List<String> keys = request.getKeys();
final int parallelism = request.getParallelism();
final Context context = request.getContext();
if ((stateStoreName == null) || (stateStoreName.trim().isEmpty())) {
throw new IllegalArgumentException("State store name cannot be null or empty.");
}
if (keys == null || keys.isEmpty()) {
throw new IllegalArgumentException("Key cannot be null or empty.");
}
if (parallelism < 0) {
throw new IllegalArgumentException("Parallelism cannot be negative.");
}
DaprProtos.GetBulkStateRequest.Builder builder = DaprProtos.GetBulkStateRequest.newBuilder()
.setStoreName(stateStoreName)
.addAllKeys(keys)
.setParallelism(parallelism);
DaprProtos.GetBulkStateRequest envelope = builder.build();
return Mono.fromCallable(wrap(context, () -> {
ListenableFuture<DaprProtos.GetBulkStateResponse> futureResponse = client.getBulkState(envelope);
DaprProtos.GetBulkStateResponse response = null;
try {
response = futureResponse.get();
} catch (NullPointerException npe) {
return null;
}
return response
.getItemsList()
.stream()
.map(b -> {
try {
return buildStateKeyValue(b, type);
} catch (IOException e) {
throw new RuntimeException(e);
}
})
.collect(Collectors.toList());
})).map(s -> new Response(context, s));
} catch (Exception ex) {
return Mono.error(ex);
}
}
private <T> State<T> buildStateKeyValue(
DaprProtos.BulkStateItem item,
TypeRef<T> type) throws IOException {
String key = item.getKey();
String error = item.getError();
if (!Strings.isNullOrEmpty(error)) {
return new State<>(key, error);
}
ByteString payload = item.getData();
byte[] data = payload == null ? null : payload.toByteArray();
T value = stateSerializer.deserialize(data, type);
String etag = item.getEtag();
return new State<>(value, key, etag);
}
private <T> State<T> buildStateKeyValue(
DaprProtos.GetStateResponse response,
String requestedKey,
@ -300,13 +371,13 @@ public class DaprClientGrpc extends AbstractDaprClient {
private <T> CommonProtos.StateItem.Builder buildStateRequest(State<T> state) throws IOException {
byte[] bytes = stateSerializer.serialize(state.getValue());
ByteString data = ByteString.copyFrom(bytes);
CommonProtos.StateItem.Builder stateBuilder = CommonProtos.StateItem.newBuilder();
if (state.getEtag() != null) {
stateBuilder.setEtag(state.getEtag());
}
if (data != null) {
stateBuilder.setValue(data);
if (bytes != null) {
stateBuilder.setValue(ByteString.copyFrom(bytes));
}
stateBuilder.setKey(state.getKey());
CommonProtos.StateOptions.Builder optionBuilder = null;
@ -347,8 +418,6 @@ public class DaprClientGrpc extends AbstractDaprClient {
CommonProtos.StateOptions.Builder optionBuilder = null;
if (options != null) {
optionBuilder = CommonProtos.StateOptions.newBuilder();
optionBuilder = CommonProtos.StateOptions.newBuilder();
if (options.getConcurrency() != null) {
optionBuilder.setConcurrency(getGrpcStateConcurrency(options));

View File

@ -5,9 +5,12 @@
package io.dapr.client;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Strings;
import io.dapr.client.domain.DeleteStateRequest;
import io.dapr.client.domain.GetSecretRequest;
import io.dapr.client.domain.GetStateRequest;
import io.dapr.client.domain.GetStatesRequest;
import io.dapr.client.domain.HttpExtension;
import io.dapr.client.domain.InvokeBindingRequest;
import io.dapr.client.domain.InvokeServiceRequest;
@ -16,6 +19,7 @@ import io.dapr.client.domain.Response;
import io.dapr.client.domain.SaveStateRequest;
import io.dapr.client.domain.State;
import io.dapr.client.domain.StateOptions;
import io.dapr.config.Properties;
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
import io.dapr.utils.TypeRef;
@ -26,6 +30,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -74,6 +79,11 @@ public class DaprClientHttp extends AbstractDaprClient {
*/
public static final String SECRETS_PATH = DaprHttp.API_VERSION + "/secrets";
/**
* State Path format for bulk state API.
*/
public static final String STATE_BULK_PATH_FORMAT = STATE_PATH + "/%s/bulk";
/**
* The HTTP client to be used.
*
@ -256,6 +266,50 @@ public class DaprClientHttp extends AbstractDaprClient {
return Mono.error(ex);
}
}
/**
* {@inheritDoc}
*/
@Override
public <T> Mono<Response<List<State<T>>>> getStates(GetStatesRequest request, TypeRef<T> type) {
try {
final String stateStoreName = request.getStateStoreName();
final List<String> keys = request.getKeys();
final int parallelism = request.getParallelism();
final Context context = request.getContext();
if ((stateStoreName == null) || (stateStoreName.trim().isEmpty())) {
throw new IllegalArgumentException("State store name cannot be null or empty.");
}
if (keys == null || keys.isEmpty()) {
throw new IllegalArgumentException("Key cannot be null or empty.");
}
if (parallelism < 0) {
throw new IllegalArgumentException("Parallelism cannot be negative.");
}
String url = String.format(STATE_BULK_PATH_FORMAT, stateStoreName);
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("keys", keys);
jsonMap.put("parallelism", parallelism);
byte[] requestBody = INTERNAL_SERIALIZER.serialize(jsonMap);
return this.client
.invokeApi(DaprHttp.HttpMethods.POST.name(), url, null, requestBody, null, context)
.flatMap(s -> {
try {
return Mono.just(buildStates(s, type));
} catch (Exception ex) {
return Mono.error(ex);
}
})
.map(r -> new Response<>(context, r));
} catch (Exception ex) {
return Mono.error(ex);
}
}
/**
* {@inheritDoc}
@ -295,7 +349,7 @@ public class DaprClientHttp extends AbstractDaprClient {
.invokeApi(DaprHttp.HttpMethods.GET.name(), url.toString(), urlParameters, headers, context)
.flatMap(s -> {
try {
return Mono.just(buildStateKeyValue(s, key, options, type));
return Mono.just(buildState(s, key, options, type));
} catch (Exception ex) {
return Mono.error(ex);
}
@ -399,10 +453,10 @@ public class DaprClientHttp extends AbstractDaprClient {
* @param requestedKey The Key Requested.
* @param type The Class of the Value of the state
* @param <T> The Type of the Value of the state
* @return A StateKeyValue instance
* @throws IOException If there's a issue deserialzing the response.
* @return A State instance
* @throws IOException If there's a issue deserializing the response.
*/
private <T> State<T> buildStateKeyValue(
private <T> State<T> buildState(
DaprHttp.Response response, String requestedKey, StateOptions stateOptions, TypeRef<T> type) throws IOException {
// The state is in the body directly, so we use the state serializer here.
T value = stateSerializer.deserialize(response.getBody(), type);
@ -414,6 +468,39 @@ public class DaprClientHttp extends AbstractDaprClient {
return new State<>(value, key, etag, stateOptions);
}
/**
* Builds a State object based on the Response.
*
* @param response The response of the HTTP Call
* @param type The Class of the Value of the state
* @param <T> The Type of the Value of the state
* @return A list of states.
* @throws IOException If there's a issue deserializing the response.
*/
private <T> List<State<T>> buildStates(
DaprHttp.Response response, TypeRef<T> type) throws IOException {
JsonNode root = INTERNAL_SERIALIZER.parseNode(response.getBody());
List<State<T>> result = new ArrayList<>();
for (Iterator<JsonNode> it = root.elements(); it.hasNext(); ) {
JsonNode node = it.next();
String key = node.path("key").asText();
String error = node.path("error").asText();
if (!Strings.isNullOrEmpty(error)) {
result.add(new State<>(key, error));
continue;
}
String etag = node.path("etag").asText();
// TODO(artursouza): JSON cannot differentiate if data returned is String or byte[], it is ambiguous.
// This is not a high priority since GRPC is the default (and recommended) client implementation.
byte[] data = node.path("data").toString().getBytes(Properties.STRING_CHARSET.get());
T value = stateSerializer.deserialize(data, type);
result.add(new State<>(value, key, etag));
}
return result;
}
/**
* {@inheritDoc}
*/

View File

@ -8,6 +8,7 @@ package io.dapr.client;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.client.domain.CloudEvent;
import io.dapr.utils.TypeRef;
@ -112,6 +113,17 @@ public class ObjectSerializer {
return OBJECT_MAPPER.readValue(content, javaType);
}
/**
* Parses the JSON content into a node for fine-grained processing.
*
* @param content JSON content.
* @return JsonNode.
* @throws IOException In case content cannot be parsed.
*/
public JsonNode parseNode(byte[] content) throws IOException {
return OBJECT_MAPPER.readTree(content);
}
/**
* Parses a given String to the corresponding object defined by class.
*

View File

@ -10,7 +10,7 @@ import java.util.Collections;
import java.util.Map;
/**
* Builds a request to publish an event.
* Builds a request to request state.
*/
public class GetStateRequestBuilder {

View File

@ -0,0 +1,56 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.client.domain;
import io.grpc.Context;
import java.util.List;
/**
* A request to get bulk state by keys.
*/
public class GetStatesRequest {
private String stateStoreName;
private List<String> keys;
private int parallelism;
private Context context;
public String getStateStoreName() {
return stateStoreName;
}
void setStateStoreName(String stateStoreName) {
this.stateStoreName = stateStoreName;
}
public List<String> getKeys() {
return keys;
}
void setKeys(List<String> keys) {
this.keys = keys;
}
public int getParallelism() {
return parallelism;
}
void setParallelism(int parallelism) {
this.parallelism = parallelism;
}
public Context getContext() {
return context;
}
void setContext(Context context) {
this.context = context;
}
}

View File

@ -0,0 +1,60 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.client.domain;
import io.grpc.Context;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
/**
* Builds a request to request states.
*/
public class GetStatesRequestBuilder {
private final String stateStoreName;
private final List<String> keys;
private int parallelism = 1;
private Context context;
public GetStatesRequestBuilder(String stateStoreName, List<String> keys) {
this.stateStoreName = stateStoreName;
this.keys = Collections.unmodifiableList(keys);
}
public GetStatesRequestBuilder(String stateStoreName, String... keys) {
this.stateStoreName = stateStoreName;
this.keys = Collections.unmodifiableList(Arrays.asList(keys));
}
public GetStatesRequestBuilder withParallelism(int parallelism) {
this.parallelism = parallelism;
return this;
}
public GetStatesRequestBuilder withContext(Context context) {
this.context = context;
return this;
}
/**
* Builds a request object.
* @return Request object.
*/
public GetStatesRequest build() {
GetStatesRequest request = new GetStatesRequest();
request.setStateStoreName(this.stateStoreName);
request.setKeys(this.keys);
request.setParallelism(this.parallelism);
request.setContext(this.context);
return request;
}
}

View File

@ -11,28 +11,50 @@ package io.dapr.client.domain;
* @param <T> The type of the value of the sate
*/
public class State<T> {
/**
* The value of the state.
*/
private final T value;
/**
* The key of the state.
*/
private final String key;
/**
* The ETag to be used
* Keep in mind that for some state stores (like reids) only numbers are supported.
*/
private final String etag;
/**
* The error in case the key could not be retrieved.
*/
private final String error;
/**
* The options used for saving the state.
*/
private final StateOptions options;
/**
* Create an inmutable state
* This Constructor MUST be used anytime you need to retrieve or delete a State.
* Create an immutable state reference to be retrieved or deleted.
* This Constructor CAN be used anytime you need to retrieve or delete a state.
*
* @param key - The key of the state
*/
public State(String key) {
this.key = key;
this.value = null;
this.etag = null;
this.options = null;
this.error = null;
}
/**
* Create an immutable state reference to be retrieved or deleted.
* This Constructor CAN be used anytime you need to retrieve or delete a state.
*
* @param key - The key of the state
* @param etag - The etag of the state - Keep in mind that for some state stores (like redis) only numbers
@ -44,16 +66,16 @@ public class State<T> {
this.key = key;
this.etag = etag;
this.options = options;
this.error = null;
}
/**
* Create an inmutable state.
* This Constructor MUST be used anytime you want the state to be send for a Save operation.
* Create an immutable state.
* This Constructor CAN be used anytime you want the state to be saved.
*
* @param value - The value of the state.
* @param key - The key of the state.
* @param etag - The etag of the state - Keep in mind that for some state stores (like redis)
* only numbers are supported.
* @param etag - The etag of the state - for some state stores (like redis) only numbers are supported.
* @param options - REQUIRED when saving a state.
*/
public State(T value, String key, String etag, StateOptions options) {
@ -61,6 +83,38 @@ public class State<T> {
this.key = key;
this.etag = etag;
this.options = options;
this.error = null;
}
/**
* Create an immutable state.
* This Constructor CAN be used anytime you want the state to be saved.
*
* @param value - The value of the state.
* @param key - The key of the state.
* @param etag - The etag of the state - some state stores (like redis) only numbers are supported.
*/
public State(T value, String key, String etag) {
this.value = value;
this.key = key;
this.etag = etag;
this.options = null;
this.error = null;
}
/**
* Create an immutable state.
* This Constructor MUST be used anytime the key could not be retrieved and contains an error.
*
* @param key - The key of the state.
* @param error - Error when fetching the state.
*/
public State(String key, String error) {
this.value = null;
this.key = key;
this.etag = null;
this.options = null;
this.error = error;
}
/**
@ -90,6 +144,16 @@ public class State<T> {
return etag;
}
/**
* Retrieve the error for this state.
*
* @return The error for this state.
*/
public String getError() {
return error;
}
/**
* Retrieve the Options used for saving the state.
*
@ -123,6 +187,10 @@ public class State<T> {
return false;
}
if (getError() != null ? !getEtag().equals(that.getError()) : that.getError() != null) {
return false;
}
if (getOptions() != null ? !getOptions().equals(that.getOptions()) : that.getOptions() != null) {
return false;
}
@ -135,6 +203,7 @@ public class State<T> {
int result = getValue() != null ? getValue().hashCode() : 0;
result = 31 * result + (getKey() != null ? getKey().hashCode() : 0);
result = 31 * result + (getEtag() != null ? getEtag().hashCode() : 0);
result = 31 * result + (getError() != null ? getError().hashCode() : 0);
result = 31 * result + (getOptions() != null ? options.hashCode() : 0);
return result;
}
@ -145,6 +214,7 @@ public class State<T> {
+ "value=" + value
+ ", key='" + key + "'"
+ ", etag='" + etag + "'"
+ ", etag='" + error + "'"
+ ", options={'" + options.toString() + "}"
+ "}";
}

View File

@ -34,12 +34,15 @@ import reactor.core.publisher.Mono;
import java.io.Closeable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static com.google.common.util.concurrent.Futures.addCallback;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
@ -753,6 +756,183 @@ public class DaprClientGrpcTest {
assertEquals(expectedState, result.block());
}
@Test
public void getStatesString() throws IOException {
DaprProtos.GetBulkStateResponse responseEnvelope = DaprProtos.GetBulkStateResponse.newBuilder()
.addItems(DaprProtos.BulkStateItem.newBuilder()
.setData(serialize("hello world"))
.setKey("100")
.setEtag("1")
.build())
.addItems(DaprProtos.BulkStateItem.newBuilder()
.setKey("200")
.setError("not found")
.build())
.build();
SettableFuture<DaprProtos.GetBulkStateResponse> settableFuture = SettableFuture.create();
MockCallback<DaprProtos.GetBulkStateResponse> callback = new MockCallback<>(responseEnvelope);
addCallback(settableFuture, callback, directExecutor());
when(client.getBulkState(any(DaprProtos.GetBulkStateRequest.class)))
.thenAnswer(c -> {
settableFuture.set(responseEnvelope);
return settableFuture;
});
List<State<String>> result = adapter.getStates(STATE_STORE_NAME, Arrays.asList("100", "200"), String.class).block();
assertTrue(callback.wasCalled);
assertEquals(2, result.size());
assertEquals("100", result.stream().findFirst().get().getKey());
assertEquals("hello world", result.stream().findFirst().get().getValue());
assertEquals("1", result.stream().findFirst().get().getEtag());
assertNull(result.stream().findFirst().get().getError());
assertEquals("200", result.stream().skip(1).findFirst().get().getKey());
assertNull(result.stream().skip(1).findFirst().get().getValue());
assertNull(result.stream().skip(1).findFirst().get().getEtag());
assertEquals("not found", result.stream().skip(1).findFirst().get().getError());
}
@Test
public void getStatesInteger() throws IOException {
DaprProtos.GetBulkStateResponse responseEnvelope = DaprProtos.GetBulkStateResponse.newBuilder()
.addItems(DaprProtos.BulkStateItem.newBuilder()
.setData(serialize(1234))
.setKey("100")
.setEtag("1")
.build())
.addItems(DaprProtos.BulkStateItem.newBuilder()
.setKey("200")
.setError("not found")
.build())
.build();
SettableFuture<DaprProtos.GetBulkStateResponse> settableFuture = SettableFuture.create();
MockCallback<DaprProtos.GetBulkStateResponse> callback = new MockCallback<>(responseEnvelope);
addCallback(settableFuture, callback, directExecutor());
when(client.getBulkState(any(DaprProtos.GetBulkStateRequest.class)))
.thenAnswer(c -> {
settableFuture.set(responseEnvelope);
return settableFuture;
});
List<State<Integer>> result = adapter.getStates(STATE_STORE_NAME, Arrays.asList("100", "200"), int.class).block();
assertTrue(callback.wasCalled);
assertEquals(2, result.size());
assertEquals("100", result.stream().findFirst().get().getKey());
assertEquals(1234, (int)result.stream().findFirst().get().getValue());
assertEquals("1", result.stream().findFirst().get().getEtag());
assertNull(result.stream().findFirst().get().getError());
assertEquals("200", result.stream().skip(1).findFirst().get().getKey());
assertNull(result.stream().skip(1).findFirst().get().getValue());
assertNull(result.stream().skip(1).findFirst().get().getEtag());
assertEquals("not found", result.stream().skip(1).findFirst().get().getError());
}
@Test
public void getStatesBoolean() throws IOException {
DaprProtos.GetBulkStateResponse responseEnvelope = DaprProtos.GetBulkStateResponse.newBuilder()
.addItems(DaprProtos.BulkStateItem.newBuilder()
.setData(serialize(true))
.setKey("100")
.setEtag("1")
.build())
.addItems(DaprProtos.BulkStateItem.newBuilder()
.setKey("200")
.setError("not found")
.build())
.build();
SettableFuture<DaprProtos.GetBulkStateResponse> settableFuture = SettableFuture.create();
MockCallback<DaprProtos.GetBulkStateResponse> callback = new MockCallback<>(responseEnvelope);
addCallback(settableFuture, callback, directExecutor());
when(client.getBulkState(any(DaprProtos.GetBulkStateRequest.class)))
.thenAnswer(c -> {
settableFuture.set(responseEnvelope);
return settableFuture;
});
List<State<Boolean>> result = adapter.getStates(STATE_STORE_NAME, Arrays.asList("100", "200"), boolean.class).block();
assertTrue(callback.wasCalled);
assertEquals(2, result.size());
assertEquals("100", result.stream().findFirst().get().getKey());
assertEquals(true, result.stream().findFirst().get().getValue());
assertEquals("1", result.stream().findFirst().get().getEtag());
assertNull(result.stream().findFirst().get().getError());
assertEquals("200", result.stream().skip(1).findFirst().get().getKey());
assertNull(result.stream().skip(1).findFirst().get().getValue());
assertNull(result.stream().skip(1).findFirst().get().getEtag());
assertEquals("not found", result.stream().skip(1).findFirst().get().getError());
}
@Test
public void getStatesByteArray() throws IOException {
DaprProtos.GetBulkStateResponse responseEnvelope = DaprProtos.GetBulkStateResponse.newBuilder()
.addItems(DaprProtos.BulkStateItem.newBuilder()
.setData(serialize(new byte[]{1, 2, 3}))
.setKey("100")
.setEtag("1")
.build())
.addItems(DaprProtos.BulkStateItem.newBuilder()
.setKey("200")
.setError("not found")
.build())
.build();
SettableFuture<DaprProtos.GetBulkStateResponse> settableFuture = SettableFuture.create();
MockCallback<DaprProtos.GetBulkStateResponse> callback = new MockCallback<>(responseEnvelope);
addCallback(settableFuture, callback, directExecutor());
when(client.getBulkState(any(DaprProtos.GetBulkStateRequest.class)))
.thenAnswer(c -> {
settableFuture.set(responseEnvelope);
return settableFuture;
});
List<State<byte[]>> result = adapter.getStates(STATE_STORE_NAME, Arrays.asList("100", "200"), byte[].class).block();
assertTrue(callback.wasCalled);
assertEquals(2, result.size());
assertEquals("100", result.stream().findFirst().get().getKey());
assertArrayEquals(new byte[]{1, 2, 3}, result.stream().findFirst().get().getValue());
assertEquals("1", result.stream().findFirst().get().getEtag());
assertNull(result.stream().findFirst().get().getError());
assertEquals("200", result.stream().skip(1).findFirst().get().getKey());
assertNull(result.stream().skip(1).findFirst().get().getValue());
assertNull(result.stream().skip(1).findFirst().get().getEtag());
assertEquals("not found", result.stream().skip(1).findFirst().get().getError());
}
@Test
public void getStatesObject() throws IOException {
MyObject object = new MyObject(1, "Event");
DaprProtos.GetBulkStateResponse responseEnvelope = DaprProtos.GetBulkStateResponse.newBuilder()
.addItems(DaprProtos.BulkStateItem.newBuilder()
.setData(serialize(object))
.setKey("100")
.setEtag("1")
.build())
.addItems(DaprProtos.BulkStateItem.newBuilder()
.setKey("200")
.setError("not found")
.build())
.build();
SettableFuture<DaprProtos.GetBulkStateResponse> settableFuture = SettableFuture.create();
MockCallback<DaprProtos.GetBulkStateResponse> callback = new MockCallback<>(responseEnvelope);
addCallback(settableFuture, callback, directExecutor());
when(client.getBulkState(any(DaprProtos.GetBulkStateRequest.class)))
.thenAnswer(c -> {
settableFuture.set(responseEnvelope);
return settableFuture;
});
List<State<MyObject>> result = adapter.getStates(STATE_STORE_NAME, Arrays.asList("100", "200"), MyObject.class).block();
assertTrue(callback.wasCalled);
assertEquals(2, result.size());
assertEquals("100", result.stream().findFirst().get().getKey());
assertEquals(object, result.stream().findFirst().get().getValue());
assertEquals("1", result.stream().findFirst().get().getEtag());
assertNull(result.stream().findFirst().get().getError());
assertEquals("200", result.stream().skip(1).findFirst().get().getKey());
assertNull(result.stream().skip(1).findFirst().get().getValue());
assertNull(result.stream().skip(1).findFirst().get().getEtag());
assertEquals("not found", result.stream().skip(1).findFirst().get().getError());
}
@Test(expected = RuntimeException.class)
public void deleteStateExceptionThrowTest() {
when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateRequest.class))).thenThrow(RuntimeException.class);

View File

@ -21,11 +21,13 @@ import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@ -38,6 +40,9 @@ public class DaprClientHttpTest {
private static final String SECRET_STORE_NAME = "MySecretStore";
private final String EXPECTED_RESULT =
"{\"data\":\"ewoJCSJwcm9wZXJ0eUEiOiAidmFsdWVBIiwKCQkicHJvcGVydHlCIjogInZhbHVlQiIKCX0=\"}";
private DaprClientHttp daprClientHttp;
private DaprHttp daprHttp;
@ -46,9 +51,6 @@ public class DaprClientHttpTest {
private MockInterceptor mockInterceptor;
private final String EXPECTED_RESULT =
"{\"data\":\"ewoJCSJwcm9wZXJ0eUEiOiAidmFsdWVBIiwKCQkicHJvcGVydHlCIjogInZhbHVlQiIKCX0=\"}";
@Before
public void setUp() throws Exception {
mockInterceptor = new MockInterceptor(Behavior.UNORDERED);
@ -382,7 +384,120 @@ public class DaprClientHttpTest {
}
@Test
public void getStates() {
public void getStatesString() {
mockInterceptor.addRule()
.post("http://127.0.0.1:3000/v1.0/state/MyStateStore/bulk")
.respond("[{\"key\": \"100\", \"data\": \"hello world\", \"etag\": \"1\"}," +
"{\"key\": \"200\", \"error\": \"not found\"}]");
daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient);
daprClientHttp = new DaprClientHttp(daprHttp);
List<State<String>> result =
daprClientHttp.getStates(STATE_STORE_NAME, Arrays.asList("100", "200"), String.class).block();
assertEquals(2, result.size());
assertEquals("100", result.stream().findFirst().get().getKey());
assertEquals("hello world", result.stream().findFirst().get().getValue());
assertEquals("1", result.stream().findFirst().get().getEtag());
assertNull(result.stream().findFirst().get().getError());
assertEquals("200", result.stream().skip(1).findFirst().get().getKey());
assertNull(result.stream().skip(1).findFirst().get().getValue());
assertNull(result.stream().skip(1).findFirst().get().getEtag());
assertEquals("not found", result.stream().skip(1).findFirst().get().getError());
}
@Test
public void getStatesInteger() {
mockInterceptor.addRule()
.post("http://127.0.0.1:3000/v1.0/state/MyStateStore/bulk")
.respond("[{\"key\": \"100\", \"data\": 1234, \"etag\": \"1\"}," +
"{\"key\": \"200\", \"error\": \"not found\"}]");
daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient);
daprClientHttp = new DaprClientHttp(daprHttp);
List<State<Integer>> result =
daprClientHttp.getStates(STATE_STORE_NAME, Arrays.asList("100", "200"), int.class).block();
assertEquals(2, result.size());
assertEquals("100", result.stream().findFirst().get().getKey());
assertEquals(1234, (int)result.stream().findFirst().get().getValue());
assertEquals("1", result.stream().findFirst().get().getEtag());
assertNull(result.stream().findFirst().get().getError());
assertEquals("200", result.stream().skip(1).findFirst().get().getKey());
assertNull(result.stream().skip(1).findFirst().get().getValue());
assertNull(result.stream().skip(1).findFirst().get().getEtag());
assertEquals("not found", result.stream().skip(1).findFirst().get().getError());
}
@Test
public void getStatesBoolean() {
mockInterceptor.addRule()
.post("http://127.0.0.1:3000/v1.0/state/MyStateStore/bulk")
.respond("[{\"key\": \"100\", \"data\": true, \"etag\": \"1\"}," +
"{\"key\": \"200\", \"error\": \"not found\"}]");
daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient);
daprClientHttp = new DaprClientHttp(daprHttp);
List<State<Boolean>> result =
daprClientHttp.getStates(STATE_STORE_NAME, Arrays.asList("100", "200"), boolean.class).block();
assertEquals(2, result.size());
assertEquals("100", result.stream().findFirst().get().getKey());
assertEquals(true, (boolean)result.stream().findFirst().get().getValue());
assertEquals("1", result.stream().findFirst().get().getEtag());
assertNull(result.stream().findFirst().get().getError());
assertEquals("200", result.stream().skip(1).findFirst().get().getKey());
assertNull(result.stream().skip(1).findFirst().get().getValue());
assertNull(result.stream().skip(1).findFirst().get().getEtag());
assertEquals("not found", result.stream().skip(1).findFirst().get().getError());
}
@Test
public void getStatesByteArray() {
byte[] value = new byte[]{1, 2, 3};
String base64Value = Base64.getEncoder().encodeToString(value);
mockInterceptor.addRule()
.post("http://127.0.0.1:3000/v1.0/state/MyStateStore/bulk")
.respond("[{\"key\": \"100\", \"data\": \"" + base64Value + "\", \"etag\": \"1\"}," +
"{\"key\": \"200\", \"error\": \"not found\"}]");
daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient);
daprClientHttp = new DaprClientHttp(daprHttp);
// JSON cannot differentiate if data returned is String or byte[], it is ambiguous. So we get base64 encoded back.
// So, users should use String instead of byte[].
List<State<String>> result =
daprClientHttp.getStates(STATE_STORE_NAME, Arrays.asList("100", "200"), String.class).block();
assertEquals(2, result.size());
assertEquals("100", result.stream().findFirst().get().getKey());
assertEquals(base64Value, result.stream().findFirst().get().getValue());
assertEquals("1", result.stream().findFirst().get().getEtag());
assertNull(result.stream().findFirst().get().getError());
assertEquals("200", result.stream().skip(1).findFirst().get().getKey());
assertNull(result.stream().skip(1).findFirst().get().getValue());
assertNull(result.stream().skip(1).findFirst().get().getEtag());
assertEquals("not found", result.stream().skip(1).findFirst().get().getError());
}
@Test
public void getStatesObject() {
MyObject object = new MyObject(1, "Event");
mockInterceptor.addRule()
.post("http://127.0.0.1:3000/v1.0/state/MyStateStore/bulk")
.respond("[{\"key\": \"100\", \"data\": " +
"{ \"id\": \"" + object.id + "\", \"value\": \"" + object.value + "\"}, \"etag\": \"1\"}," +
"{\"key\": \"200\", \"error\": \"not found\"}]");
daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient);
daprClientHttp = new DaprClientHttp(daprHttp);
// JSON cannot differentiate if data returned is String or byte[], it is ambiguous. So we get base64 encoded back.
// So, users should use String instead of byte[].
List<State<MyObject>> result =
daprClientHttp.getStates(STATE_STORE_NAME, Arrays.asList("100", "200"), MyObject.class).block();
assertEquals(2, result.size());
assertEquals("100", result.stream().findFirst().get().getKey());
assertEquals(object, result.stream().findFirst().get().getValue());
assertEquals("1", result.stream().findFirst().get().getEtag());
assertNull(result.stream().findFirst().get().getError());
assertEquals("200", result.stream().skip(1).findFirst().get().getKey());
assertNull(result.stream().skip(1).findFirst().get().getValue());
assertNull(result.stream().skip(1).findFirst().get().getEtag());
assertEquals("not found", result.stream().skip(1).findFirst().get().getError());
}
@Test
public void getState() {
StateOptions stateOptions = mock(StateOptions.class);
State<String> stateKeyValue = new State("value", "key", "etag", stateOptions);
State<String> stateKeyNull = new State("value", null, "etag", stateOptions);
@ -715,4 +830,53 @@ public class DaprClientHttpTest {
assertEquals("mysecretvalue2", secret.get("mysecretkey2"));
}
}
public static class MyObject {
private Integer id;
private String value;
public MyObject() {
}
public MyObject(Integer id, String value) {
this.id = id;
this.value = value;
}
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof MyObject)) return false;
MyObject myObject = (MyObject) o;
if (!getId().equals(myObject.getId())) return false;
if (getValue() != null ? !getValue().equals(myObject.getValue()) : myObject.getValue() != null) return false;
return true;
}
@Override
public int hashCode() {
int result = getId().hashCode();
result = 31 * result + (getValue() != null ? getValue().hashCode() : 0);
return result;
}
}
}