Add support for dapr 0.4 (multi-state store) (#199)

* Updating proto file from dapr.

* Adding support for multi-state store.

* Allow PubSubIT to get accept events out of order.

Co-authored-by: Shalabh Mohan Shrivastava <shalabhms@gmail.com>
This commit is contained in:
Artur Souza 2020-02-04 13:31:57 -08:00 committed by GitHub
parent af44053198
commit bbbd4be648
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 493 additions and 345 deletions

View File

@ -5,7 +5,6 @@ on:
branches:
- master
- release-*
- java_sdk_wip
tags:
- v*
@ -13,31 +12,70 @@ on:
branches:
- master
- release-*
- java_sdk_wip
jobs:
build:
runs-on: ubuntu-latest
env:
GOVER: 1.13.7
GOOS: linux
GOARCH: amd64
GOPROXY: https://proxy.golang.org
JDK_VER: 13.0.x
DAPR_RUNTIME_VER: 0.3.0
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/f84566fb2bf5a599252ab9d6bd82fc78faf94dba/install/install.sh
DAPR_CLI_REF: f84566fb2bf5a599252ab9d6bd82fc78faf94dba
DAPR_REF: e540a7166aeaf115773ccc4c7a1056ae7eed073b
OSSRH_USER_TOKEN: ${{ secrets.OSSRH_USER_TOKEN }}
OSSRH_PWD_TOKEN: ${{ secrets.OSSRH_PWD_TOKEN }}
GPG_KEY: ${{ secrets.GPG_KEY }}
GPG_PWD: ${{ secrets.GPG_PWD }}
steps:
- uses: actions/checkout@v1
- uses: actions/checkout@v2
- name: Set up OpenJDK ${{ env.JDK_VER }}
uses: actions/setup-java@v1
with:
java-version: ${{ env.JDK_VER }}
- name: Set up Dapr CLI
run: wget -q https://raw.githubusercontent.com/dapr/cli/master/install/install.sh -O - | /bin/bash
run: wget -q ${{ env.DAPR_INSTALL_URL }} -O - | /bin/bash
- name: Set up Go ${{ env.GOVER }}
if: env.DAPR_REF != '' || env.DAPR_CLI_REF != ''
uses: actions/setup-go@v1
with:
go-version: ${{ env.GOVER }}
- name: Checkout Dapr CLI repo to override dapr command.
uses: actions/checkout@v2
if: env.DAPR_CLI_REF != ''
with:
repository: dapr/cli
ref: ${{ env.DAPR_CLI_REF }}
path: cli
- name: Checkout Dapr repo to override daprd.
uses: actions/checkout@v2
if: env.DAPR_REF != ''
with:
repository: dapr/dapr
ref: ${{ env.DAPR_REF }}
path: dapr
- name: Build and override dapr cli with referenced commit.
if: env.DAPR_CLI_REF != ''
run: |
cd cli
make
sudo cp dist/linux_amd64/release/dapr /usr/local/bin/dapr
cd ..
- name: Initialize Dapr runtime ${{ env.DAPR_RUNTIME_VER }}
run: |
sudo dapr init --runtime-version ${{ env.DAPR_RUNTIME_VER }}
echo "Showing dapr version..."
dapr --version
- name: Build and override daprd with referenced commit.
if: env.DAPR_REF != ''
run: |
cd dapr
make
sudo cp dist/linux_amd64/release/daprd /usr/local/bin/daprd
cd ..
- name: Install Local kafka using docker-compose
run: |
docker-compose -f ./sdk-tests/deploy/local-test-kafka.yml up -d

2
.gitignore vendored
View File

@ -41,6 +41,8 @@ hs_err_pid*
# Some other generated folders/files
**/components/redis.yaml
**/components/redis_messagebus.yaml
**/components/statestore.yaml
**/components/messagebus.yaml
/docs/dapr-sdk-actors
/docs/dapr-sdk-autogen
/docs/dapr-sdk

View File

@ -9,3 +9,5 @@ spec:
value: localhost:6379
- name: redisPassword
value: ""
- name: actorStateStore
value: "true"

View File

@ -23,6 +23,8 @@ public class StateClient {
public String message;
}
private static final String STATE_STORE_NAME = "statestore";
private static final String KEY_NAME = "myKey";
/**
@ -36,17 +38,17 @@ public class StateClient {
MyClass myClass = new MyClass();
myClass.message = message;
client.saveState(KEY_NAME, myClass).block();
client.saveState(STATE_STORE_NAME, KEY_NAME, myClass).block();
System.out.println("Saving class with message: " + message);
Mono<State<MyClass>> retrievedMessageMono = client.getState(KEY_NAME, MyClass.class);
Mono<State<MyClass>> retrievedMessageMono = client.getState(STATE_STORE_NAME, KEY_NAME, MyClass.class);
System.out.println("Retrieved class message from state: " + (retrievedMessageMono.block().getValue()).message);
System.out.println("Deleting state...");
Mono<Void> mono = client.deleteState(KEY_NAME);
Mono<Void> mono = client.deleteState(STATE_STORE_NAME, KEY_NAME);
mono.block();
Mono<State<MyClass>> retrievedDeletedMessageMono = client.getState(KEY_NAME, MyClass.class);
Mono<State<MyClass>> retrievedDeletedMessageMono = client.getState(STATE_STORE_NAME, KEY_NAME, MyClass.class);
System.out.println("Trying to retrieve deleted state: " + retrievedDeletedMessageMono.block().getValue());
}

View File

@ -1,106 +1,109 @@
syntax = "proto3";
package dapr;
import "google/protobuf/any.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/duration.proto";
option java_outer_classname = "DaprProtos";
option java_package = "io.dapr";
option csharp_namespace = "Dapr.Client.Grpc";
// Dapr definitions
service Dapr {
rpc PublishEvent(PublishEventEnvelope) returns (google.protobuf.Empty) {}
rpc InvokeService(InvokeServiceEnvelope) returns (InvokeServiceResponseEnvelope) {}
rpc InvokeBinding(InvokeBindingEnvelope) returns (google.protobuf.Empty) {}
rpc GetState(GetStateEnvelope) returns (GetStateResponseEnvelope) {}
rpc SaveState(SaveStateEnvelope) returns (google.protobuf.Empty) {}
rpc DeleteState(DeleteStateEnvelope) returns (google.protobuf.Empty) {}
}
message InvokeServiceResponseEnvelope {
google.protobuf.Any data = 1;
map<string,string> metadata = 2;
}
message DeleteStateEnvelope {
string key = 1;
string etag = 2;
StateOptions options = 3;
}
message SaveStateEnvelope {
repeated StateRequest requests = 1;
}
message GetStateEnvelope {
string key = 1;
string consistency = 2;
}
message GetStateResponseEnvelope {
google.protobuf.Any data = 1;
string etag = 2;
}
message InvokeBindingEnvelope {
string name = 1;
google.protobuf.Any data = 2;
map<string,string> metadata = 3;
}
message InvokeServiceEnvelope {
string id = 1;
string method = 2;
google.protobuf.Any data = 3;
map<string,string> metadata = 4;
}
message PublishEventEnvelope {
string topic = 1;
google.protobuf.Any data = 2;
}
message State {
string key = 1;
google.protobuf.Any value = 2;
string etag = 3;
map<string,string> metadata = 4;
StateOptions options = 5;
}
message StateOptions {
string concurrency = 1;
string consistency = 2;
RetryPolicy retryPolicy = 3;
}
message RetryPolicy {
int32 threshold = 1;
string pattern = 2;
google.protobuf.Duration interval = 3;
}
message StateRequest {
string key = 1;
google.protobuf.Any value = 2;
string etag = 3;
map<string,string> metadata = 4;
StateRequestOptions options = 5;
}
message StateRequestOptions {
string concurrency = 1;
string consistency = 2;
StateRetryPolicy retryPolicy = 3;
}
message StateRetryPolicy {
int32 threshold = 1;
string pattern = 2;
google.protobuf.Duration interval = 3;
}
syntax = "proto3";
package dapr;
import "google/protobuf/any.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/duration.proto";
option java_outer_classname = "DaprProtos";
option java_package = "io.dapr";
option csharp_namespace = "Dapr.Client.Grpc";
// Dapr definitions
service Dapr {
rpc PublishEvent(PublishEventEnvelope) returns (google.protobuf.Empty) {}
rpc InvokeService(InvokeServiceEnvelope) returns (InvokeServiceResponseEnvelope) {}
rpc InvokeBinding(InvokeBindingEnvelope) returns (google.protobuf.Empty) {}
rpc GetState(GetStateEnvelope) returns (GetStateResponseEnvelope) {}
rpc SaveState(SaveStateEnvelope) returns (google.protobuf.Empty) {}
rpc DeleteState(DeleteStateEnvelope) returns (google.protobuf.Empty) {}
}
message InvokeServiceResponseEnvelope {
google.protobuf.Any data = 1;
map<string,string> metadata = 2;
}
message DeleteStateEnvelope {
string storeName = 1;
string key = 2;
string etag = 3;
StateOptions options = 4;
}
message SaveStateEnvelope {
string storeName = 1;
repeated StateRequest requests = 2;
}
message GetStateEnvelope {
string storeName = 1;
string key = 2;
string consistency = 3;
}
message GetStateResponseEnvelope {
google.protobuf.Any data = 1;
string etag = 2;
}
message InvokeBindingEnvelope {
string name = 1;
google.protobuf.Any data = 2;
map<string,string> metadata = 3;
}
message InvokeServiceEnvelope {
string id = 1;
string method = 2;
google.protobuf.Any data = 3;
map<string,string> metadata = 4;
}
message PublishEventEnvelope {
string topic = 1;
google.protobuf.Any data = 2;
}
message State {
string key = 1;
google.protobuf.Any value = 2;
string etag = 3;
map<string,string> metadata = 4;
StateOptions options = 5;
}
message StateOptions {
string concurrency = 1;
string consistency = 2;
RetryPolicy retryPolicy = 3;
}
message RetryPolicy {
int32 threshold = 1;
string pattern = 2;
google.protobuf.Duration interval = 3;
}
message StateRequest {
string key = 1;
google.protobuf.Any value = 2;
string etag = 3;
map<string,string> metadata = 4;
StateRequestOptions options = 5;
}
message StateRequestOptions {
string concurrency = 1;
string consistency = 2;
StateRetryPolicy retryPolicy = 3;
}
message StateRetryPolicy {
int32 threshold = 1;
string pattern = 2;
google.protobuf.Duration interval = 3;
}

View File

@ -11,6 +11,8 @@ import org.junit.AfterClass;
public abstract class BaseIT {
protected static final String STATE_STORE_NAME = "statestore";
private static final Collection<DaprRun> DAPR_RUNS = new ArrayList<>();
protected static DaprRun startDaprApp(

View File

@ -62,13 +62,16 @@ public class PubSubIT extends BaseIT {
assertEquals(11, messages.size());
for (int i = 0; i < NUM_MESSAGES; i++) {
assertTrue(messages.get(i).startsWith("This is message "));
assertTrue(messages.contains(String.format("This is message #%d", i)));
}
byte[] result=new byte[] { 1 };
assertEquals(result.length, messages.get(10).getBytes().length);
assertEquals(result[0], messages.get(10).getBytes()[0]);
boolean foundByte = false;
for (String message : messages) {
if ((message.getBytes().length == 1) && (message.getBytes()[0] == 1)) {
foundByte = true;
}
}
assertTrue(foundByte);
}, 2000);
}

View File

@ -62,12 +62,12 @@ public class GRPCStateClientIT extends BaseIT {
data.setPropertyB("data in property B");
//create of the deferred call to DAPR to store the state
Mono<Void> saveResponse = daprClient.saveState(stateKey, null, data, null);
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
//execute the save action
saveResponse.block();
//create of the deferred call to DAPR to get the state
Mono<State<MyData>> response = daprClient.getState(new State(stateKey, null, null), MyData.class);
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), MyData.class);
//retrieve the state
State<MyData> myDataResponse = response.block();
@ -94,7 +94,7 @@ public class GRPCStateClientIT extends BaseIT {
data.setPropertyB("data in property B");
//Create deferred action to save the sate
Mono<Void> saveResponse = daprClient.saveState(stateKey, null, data, null);
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
//execute save action to DAPR
saveResponse.block();
@ -102,12 +102,13 @@ public class GRPCStateClientIT extends BaseIT {
data.setPropertyA("data in property A");
data.setPropertyB("data in property B2");
//create deferred action to update the sate without any etag or options
saveResponse = daprClient.saveState(stateKey, null, data, null);
saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
//execute the update action to DAPR
saveResponse.block();
//Create deferred action to retrieve the action
Mono<State<MyData>> response = daprClient.getState(new State<MyData>(stateKey, null, null), MyData.class);
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State<MyData>(stateKey, null, null),
MyData.class);
//execute the retrieve of the state
State<MyData> myDataResponse = response.block();
@ -126,12 +127,13 @@ public class GRPCStateClientIT extends BaseIT {
data.setPropertyA("data in property A");
data.setPropertyB("data in property B");
//Create deferred action to save the sate
Mono<Void> saveResponse = daprClient.saveState(stateKey, null, data, null);
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
//execute the save state action
saveResponse.block();
//Create deferred action to retrieve the state
Mono<State<MyData>> response = daprClient.getState(new State<MyData>(stateKey, null, null), MyData.class);
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State<MyData>(stateKey, null, null),
MyData.class);
//execute the retrieve of the state
State<MyData> myDataResponse = response.block();
@ -140,12 +142,12 @@ public class GRPCStateClientIT extends BaseIT {
assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
//create deferred action to delete the state
Mono<Void> deleteResponse = daprClient.deleteState(stateKey, null, null);
Mono<Void> deleteResponse = daprClient.deleteState(STATE_STORE_NAME, stateKey, null, null);
//execute the delete action
deleteResponse.block();
//Create deferred action to retrieve the state
response = daprClient.getState(new State<MyData>(stateKey, null, null), MyData.class);
response = daprClient.getState(STATE_STORE_NAME, new State<MyData>(stateKey, null, null), MyData.class);
//execute the retrieve of the state
myDataResponse = response.block();
@ -166,12 +168,13 @@ public class GRPCStateClientIT extends BaseIT {
data.setPropertyB("data in property B");
//Create deferred action to save the sate
Mono<Void> saveResponse = daprClient.saveState(stateKey, null, data, null);
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
//execute the save state action
saveResponse.block();
//Create deferred action to retrieve the state
Mono<State<MyData>> response = daprClient.getState(new State<MyData>(stateKey, null, null), MyData.class);
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State<MyData>(stateKey, null, null),
MyData.class);
//execute the action for retrieve the state and the etag
State<MyData> myDataResponse = response.block();
@ -188,11 +191,11 @@ public class GRPCStateClientIT extends BaseIT {
data.setPropertyA("data in property A2");
data.setPropertyB("data in property B2");
//Create deferred action to update the data using the correct etag
saveResponse = daprClient.saveState(stateKey, myDataResponse.getEtag(), data, null);
saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), data, null);
saveResponse.block();
response = daprClient.getState(new State<MyData>(stateKey, null, null), MyData.class);
response = daprClient.getState(STATE_STORE_NAME, new State<MyData>(stateKey, null, null), MyData.class);
//retrive the data wihout any etag
myDataResponse = response.block();
@ -206,7 +209,8 @@ public class GRPCStateClientIT extends BaseIT {
assertEquals("data in property B2", myDataResponse.getValue().getPropertyB());
}
@Ignore("This test case is ignored because DAPR ignore the ETag is wrong when is sent from GRPC protocol, the execution continues and the state is updated.")
@Ignore("This test case is ignored because DAPR ignore the ETag is wrong when is sent from GRPC protocol, the " +
"execution continues and the state is updated.")
@Test(expected = RuntimeException.class)
public void saveUpdateAndGetStateWithWrongEtag() {
final String stateKey = "keyToBeUpdatedWithWrongEtag";
@ -217,12 +221,13 @@ public class GRPCStateClientIT extends BaseIT {
data.setPropertyB("data in property B");
//Create deferred action to save the sate
Mono<Void> saveResponse = daprClient.saveState(stateKey, null, data, null);
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
//execute the save state action
saveResponse.block();
//Create deferred action to retrieve the state
Mono<State<MyData>> response = daprClient.getState(new State<MyData>(stateKey, null, null), MyData.class);
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State<MyData>(stateKey, null, null),
MyData.class);
//execute the action for retrieve the state and the etag
State<MyData> myDataResponse = response.block();
@ -239,11 +244,11 @@ public class GRPCStateClientIT extends BaseIT {
data.setPropertyA("data in property A2");
data.setPropertyB("data in property B2");
//Create deferred action to update the data using the incorrect etag
saveResponse = daprClient.saveState(stateKey, "99999999999999", data, null);
saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, "99999999999999", data, null);
saveResponse.block();
response = daprClient.getState(new State<MyData>(stateKey, null, null), MyData.class);
response = daprClient.getState(STATE_STORE_NAME, new State<MyData>(stateKey, null, null), MyData.class);
//retrive the data wihout any etag
myDataResponse = response.block();
@ -267,12 +272,13 @@ public class GRPCStateClientIT extends BaseIT {
data.setPropertyA("data in property A");
data.setPropertyB("data in property B");
//Create deferred action to save the sate
Mono<Void> saveResponse = daprClient.saveState(stateKey, null, data, null);
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
//execute the save state action
saveResponse.block();
//Create deferred action to get the state with the etag
Mono<State<MyData>> response = daprClient.getState(new State<MyData>(stateKey, null, null), MyData.class);
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State<MyData>(stateKey, null, null),
MyData.class);
//execute the get state
State<MyData> myDataResponse = response.block();
@ -283,19 +289,20 @@ public class GRPCStateClientIT extends BaseIT {
assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
//Create deferred action to delete an state sending the etag
Mono<Void> deleteResponse = daprClient.deleteState(stateKey, myDataResponse.getEtag(), null);
Mono<Void> deleteResponse = daprClient.deleteState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), null);
//execute the delete of the state
deleteResponse.block();
//Create deferred action to get the sate without an etag
response = daprClient.getState(new State(stateKey, null, null), MyData.class);
response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), MyData.class);
myDataResponse = response.block();
//Review that the response is null, because the state was deleted
assertNull(myDataResponse.getValue());
}
@Ignore("This test case is ignored because DAPR ignore if the ETag is wrong when is sent from GRPC protocol, the execution continues and the state is deleted.")
@Ignore("This test case is ignored because DAPR ignore if the ETag is wrong when is sent from GRPC protocol, the " +
"execution continues and the state is deleted.")
@Test(expected = RuntimeException.class)
public void saveAndDeleteStateWithWrongEtag() {
final String stateKey = "myeKeyToBeDeletedWithWrongEtag";
@ -306,12 +313,13 @@ public class GRPCStateClientIT extends BaseIT {
data.setPropertyA("data in property A");
data.setPropertyB("data in property B");
//Create deferred action to save the sate
Mono<Void> saveResponse = daprClient.saveState(stateKey, null, data, null);
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
//execute the save state action
saveResponse.block();
//Create deferred action to get the state with the etag
Mono<State<MyData>> response = daprClient.getState(new State<MyData>(stateKey, null, null), MyData.class);
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State<MyData>(stateKey, null, null),
MyData.class);
//execute the get state
State<MyData> myDataResponse = response.block();
@ -322,25 +330,27 @@ public class GRPCStateClientIT extends BaseIT {
assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
//Create deferred action to delete an state sending the incorrect etag
Mono<Void> deleteResponse = daprClient.deleteState(stateKey, "99999999999", null);
Mono<Void> deleteResponse = daprClient.deleteState(STATE_STORE_NAME, stateKey, "99999999999", null);
//execute the delete of the state, this should trhow an exception
deleteResponse.block();
//Create deferred action to get the sate without an etag
response = daprClient.getState(new State(stateKey, null, null), MyData.class);
response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), MyData.class);
myDataResponse = response.block();
//Review that the response is null, because the state was deleted
assertNull(myDataResponse.getValue());
}
@Ignore("This test case is ignored because it seems that DAPR using GRPC is ignoring the state options for consistency and concurrency.")
@Ignore("This test case is ignored because it seems that DAPR using GRPC is ignoring the state options for " +
"consistency and concurrency.")
@Test(expected = RuntimeException.class)
public void saveUpdateAndGetStateWithEtagAndStateOptionsFirstWrite() {
final String stateKey = "keyToBeUpdatedWithEtagAndOptions";
//create option with concurrency with first writte and consistency of strong
StateOptions stateOptions = new StateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE, null);
StateOptions stateOptions = new StateOptions(StateOptions.Consistency.STRONG,
StateOptions.Concurrency.FIRST_WRITE, null);
//create Dummy data
@ -349,13 +359,14 @@ public class GRPCStateClientIT extends BaseIT {
data.setPropertyB("data in property B");
//create state using stateOptions
Mono<Void> saveResponse = daprClient.saveState(stateKey, null, data, stateOptions);
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, stateOptions);
//execute the save state
saveResponse.block();
//crate deferred action to retrieve the state
Mono<State<MyData>> response = daprClient.getState(new State(stateKey, null, stateOptions), MyData.class);
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, stateOptions),
MyData.class);
//execute the retrieve of the state using options
State<MyData> myDataResponse = response.block();
@ -369,7 +380,7 @@ public class GRPCStateClientIT extends BaseIT {
data.setPropertyA("data in property A2");
data.setPropertyB("data in property B2");
//create deferred action to update the action with options
saveResponse = daprClient.saveState(stateKey, myDataResponse.getEtag(), data, stateOptions);
saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), data, stateOptions);
//update the state
saveResponse.block();
@ -377,11 +388,11 @@ public class GRPCStateClientIT extends BaseIT {
data.setPropertyA("last write");
data.setPropertyB("data in property B2");
//create deferred action to update the action with the same etag
saveResponse = daprClient.saveState(stateKey, myDataResponse.getEtag(), data, stateOptions);
saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), data, stateOptions);
//throws an exception, the state was already udpated
saveResponse.block();
response = daprClient.getState(new State(stateKey, null, stateOptions), MyData.class);
response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, stateOptions), MyData.class);
State<MyData> myLastDataResponse = response.block();
assertNotNull(myLastDataResponse.getEtag());
@ -397,7 +408,8 @@ public class GRPCStateClientIT extends BaseIT {
final String stateKey = "keyToBeUpdatedWithEtagAndOptions";
//create option with concurrency with first writte and consistency of strong
StateOptions stateOptions = new StateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.LAST_WRITE, null);
StateOptions stateOptions = new StateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.LAST_WRITE
, null);
//create Dummy data
@ -406,13 +418,14 @@ public class GRPCStateClientIT extends BaseIT {
data.setPropertyB("data in property B");
//create state using stateOptions
Mono<Void> saveResponse = daprClient.saveState(stateKey, null, data, stateOptions);
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, stateOptions);
//execute the save state
saveResponse.block();
//crate deferred action to retrieve the state
Mono<State<MyData>> response = daprClient.getState(new State(stateKey, null, stateOptions), MyData.class);
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, stateOptions),
MyData.class);
//execute the retrieve of the state using options
State<MyData> myDataResponse = response.block();
@ -426,7 +439,7 @@ public class GRPCStateClientIT extends BaseIT {
data.setPropertyA("data in property A2");
data.setPropertyB("data in property B2");
//create deferred action to update the action with options
saveResponse = daprClient.saveState(stateKey, myDataResponse.getEtag(), data, stateOptions);
saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), data, stateOptions);
//update the state
saveResponse.block();
@ -434,11 +447,11 @@ public class GRPCStateClientIT extends BaseIT {
data.setPropertyA("last write");
data.setPropertyB("data in property B2");
//create deferred action to update the action with the same etag
saveResponse = daprClient.saveState(stateKey, myDataResponse.getEtag(), data, stateOptions);
saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), data, stateOptions);
//update the state without an error
saveResponse.block();
response = daprClient.getState(new State(stateKey, null, stateOptions), MyData.class);
response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, stateOptions), MyData.class);
State<MyData> myLastDataResponse = response.block();
assertNotNull(myLastDataResponse.getEtag());
@ -452,7 +465,8 @@ public class GRPCStateClientIT extends BaseIT {
@Test(timeout = 13000)
public void saveDeleteWithRetry() {
final String stateKey = "keyToBeDeleteWithWrongEtagAndRetry";
StateOptions.RetryPolicy retryPolicy = new StateOptions.RetryPolicy(Duration.ofSeconds(3), 3, StateOptions.RetryPolicy.Pattern.LINEAR);
StateOptions.RetryPolicy retryPolicy = new StateOptions.RetryPolicy(Duration.ofSeconds(3), 3,
StateOptions.RetryPolicy.Pattern.LINEAR);
StateOptions stateOptions = new StateOptions(null, null, retryPolicy);
@ -462,12 +476,12 @@ public class GRPCStateClientIT extends BaseIT {
data.setPropertyB("data in property B");
//Create deferred action to save the sate
Mono<Void> saveResponse = daprClient.saveState(stateKey, null, data, null);
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
//execute the save state action
saveResponse.block();
//Create deferred action to retrieve the state
Mono<State<MyData>> response = daprClient.getState(new State(stateKey, null, null), MyData.class);
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), MyData.class);
//execute the action for retrieve the state and the etag
State<MyData> myDataResponse = response.block();
@ -479,7 +493,7 @@ public class GRPCStateClientIT extends BaseIT {
assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
Mono<Void> deleteResponse = daprClient.deleteState(stateKey, "99999999", stateOptions);
Mono<Void> deleteResponse = daprClient.deleteState(STATE_STORE_NAME, stateKey, "99999999", stateOptions);
long start = System.currentTimeMillis();
try {
@ -499,7 +513,8 @@ public class GRPCStateClientIT extends BaseIT {
@Test(timeout = 13000)
public void saveUpdateWithRetry() {
final String stateKey = "keyToBeDeleteWithWrongEtagAndRetry";
StateOptions.RetryPolicy retryPolicy = new StateOptions.RetryPolicy(Duration.ofSeconds(4), 3, StateOptions.RetryPolicy.Pattern.LINEAR);
StateOptions.RetryPolicy retryPolicy = new StateOptions.RetryPolicy(Duration.ofSeconds(4), 3,
StateOptions.RetryPolicy.Pattern.LINEAR);
StateOptions stateOptions = new StateOptions(null, null, retryPolicy);
@ -509,12 +524,12 @@ public class GRPCStateClientIT extends BaseIT {
data.setPropertyB("data in property B");
//Create deferred action to save the sate
Mono<Void> saveResponse = daprClient.saveState(stateKey, null, data, null);
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
//execute the save state action
saveResponse.block();
//Create deferred action to retrieve the state
Mono<State<MyData>> response = daprClient.getState(new State(stateKey, null, null), MyData.class);
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), MyData.class);
//execute the action for retrieve the state and the etag
State<MyData> myDataResponse = response.block();
@ -526,7 +541,7 @@ public class GRPCStateClientIT extends BaseIT {
assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
//Create deferred action to save the sate
saveResponse = daprClient.saveState(stateKey, "9999999", data, stateOptions);
saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, "9999999", data, stateOptions);
//execute the save state action
long start = System.currentTimeMillis();

View File

@ -33,6 +33,7 @@ public class HelloWorldClientIT extends BaseIT {
{
DaprProtos.GetStateEnvelope req = DaprProtos.GetStateEnvelope
.newBuilder()
.setStoreName(STATE_STORE_NAME)
.setKey(key)
.build();
DaprProtos.GetStateResponseEnvelope response = client.getState(req);
@ -45,6 +46,7 @@ public class HelloWorldClientIT extends BaseIT {
{
DaprProtos.DeleteStateEnvelope req = DaprProtos.DeleteStateEnvelope
.newBuilder()
.setStoreName(STATE_STORE_NAME)
.setKey(key)
.build();
client.deleteState(req);
@ -54,6 +56,7 @@ public class HelloWorldClientIT extends BaseIT {
{
DaprProtos.GetStateEnvelope req = DaprProtos.GetStateEnvelope
.newBuilder()
.setStoreName(STATE_STORE_NAME)
.setKey(key)
.build();
DaprProtos.GetStateResponseEnvelope response = client.getState(req);

View File

@ -38,13 +38,14 @@ public class HelloWorldGrpcStateService {
String value = "Hello World";
StateRequest req = StateRequest
.newBuilder()
.setKey(key)
.setValue(Any.newBuilder().setValue(ByteString.copyFromUtf8(value)).build())
.build();
.newBuilder()
.setKey(key)
.setValue(Any.newBuilder().setValue(ByteString.copyFromUtf8(value)).build())
.build();
SaveStateEnvelope state = SaveStateEnvelope.newBuilder()
.addRequests(req)
.build();
.setStoreName("statestore")
.addRequests(req)
.build();
client.saveState(state);
System.out.println("Saved!");
channel.shutdown();

View File

@ -54,12 +54,12 @@ public class HttpStateClientIT extends BaseIT {
data.setPropertyB("data in property B");
//create of the deferred call to DAPR to store the state
Mono<Void> saveResponse = daprClient.saveState(stateKey, null, data, null);
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
//execute the save action
saveResponse.block();
//create of the deferred call to DAPR to get the state
Mono<State<MyData>> response = daprClient.getState(new State(stateKey, null, null), MyData.class);
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), MyData.class);
//retrieve the state
State<MyData> myDataResponse = response.block();
@ -86,7 +86,7 @@ public class HttpStateClientIT extends BaseIT {
data.setPropertyB("data in property B");
//Create deferred action to save the sate
Mono<Void> saveResponse = daprClient.saveState(stateKey, null, data, null);
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
//execute save action to DAPR
saveResponse.block();
@ -94,12 +94,13 @@ public class HttpStateClientIT extends BaseIT {
data.setPropertyA("data in property A");
data.setPropertyB("data in property B2");
//create deferred action to update the sate without any etag or options
saveResponse = daprClient.saveState(stateKey, null, data, null);
saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
//execute the update action to DAPR
saveResponse.block();
//Create deferred action to retrieve the action
Mono<State<MyData>> response = daprClient.getState(new State<MyData>(stateKey, null, null), MyData.class);
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State<MyData>(stateKey, null, null),
MyData.class);
//execute the retrieve of the state
State<MyData> myDataResponse = response.block();
@ -121,12 +122,13 @@ public class HttpStateClientIT extends BaseIT {
data.setPropertyA("data in property A");
data.setPropertyB("data in property B");
//Create deferred action to save the sate
Mono<Void> saveResponse = daprClient.saveState(stateKey, null, data, null);
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
//execute the save state action
saveResponse.block();
//Create deferred action to retrieve the state
Mono<State<MyData>> response = daprClient.getState(new State<MyData>(stateKey, null, null), MyData.class);
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State<MyData>(stateKey, null, null),
MyData.class);
//execute the retrieve of the state
State<MyData> myDataResponse = response.block();
@ -135,12 +137,12 @@ public class HttpStateClientIT extends BaseIT {
Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
//create deferred action to delete the state
Mono<Void> deleteResponse = daprClient.deleteState(stateKey, null, null);
Mono<Void> deleteResponse = daprClient.deleteState(STATE_STORE_NAME, stateKey, null, null);
//execute the delete action
deleteResponse.block();
//Create deferred action to retrieve the state
response = daprClient.getState(new State<MyData>(stateKey, null, null), MyData.class);
response = daprClient.getState(STATE_STORE_NAME, new State<MyData>(stateKey, null, null), MyData.class);
//execute the retrieve of the state
myDataResponse = response.block();
@ -161,12 +163,13 @@ public class HttpStateClientIT extends BaseIT {
data.setPropertyB("data in property B");
//Create deferred action to save the sate
Mono<Void> saveResponse = daprClient.saveState(stateKey, null, data, null);
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
//execute the save state action
saveResponse.block();
//Create deferred action to retrieve the state
Mono<State<MyData>> response = daprClient.getState(new State<MyData>(stateKey, null, null), MyData.class);
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State<MyData>(stateKey, null, null),
MyData.class);
//execute the action for retrieve the state and the etag
State<MyData> myDataResponse = response.block();
@ -183,11 +186,11 @@ public class HttpStateClientIT extends BaseIT {
data.setPropertyA("data in property A2");
data.setPropertyB("data in property B2");
//Create deferred action to update the data using the correct etag
saveResponse = daprClient.saveState(stateKey, myDataResponse.getEtag(), data, null);
saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), data, null);
saveResponse.block();
response = daprClient.getState(new State<MyData>(stateKey, null, null), MyData.class);
response = daprClient.getState(STATE_STORE_NAME, new State<MyData>(stateKey, null, null), MyData.class);
//retrive the data wihout any etag
myDataResponse = response.block();
@ -214,12 +217,13 @@ public class HttpStateClientIT extends BaseIT {
data.setPropertyB("data in property B");
//Create deferred action to save the sate
Mono<Void> saveResponse = daprClient.saveState(stateKey, null, data, null);
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
//execute the save state action
saveResponse.block();
//Create deferred action to retrieve the state
Mono<State<MyData>> response = daprClient.getState(new State<MyData>(stateKey, null, null), MyData.class);
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State<MyData>(stateKey, null, null),
MyData.class);
//execute the action for retrieve the state and the etag
State<MyData> myDataResponse = response.block();
@ -236,11 +240,11 @@ public class HttpStateClientIT extends BaseIT {
data.setPropertyA("data in property A2");
data.setPropertyB("data in property B2");
//Create deferred action to update the data using the incorrect etag
saveResponse = daprClient.saveState(stateKey, "99999999999999", data, null);
saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, "99999999999999", data, null);
saveResponse.block();
response = daprClient.getState(new State<MyData>(stateKey, null, null), MyData.class);
response = daprClient.getState(STATE_STORE_NAME, new State<MyData>(stateKey, null, null), MyData.class);
//retrive the data wihout any etag
myDataResponse = response.block();
@ -264,12 +268,13 @@ public class HttpStateClientIT extends BaseIT {
data.setPropertyA("data in property A");
data.setPropertyB("data in property B");
//Create deferred action to save the sate
Mono<Void> saveResponse = daprClient.saveState(stateKey, null, data, null);
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
//execute the save state action
saveResponse.block();
//Create deferred action to get the state with the etag
Mono<State<MyData>> response = daprClient.getState(new State<MyData>(stateKey, null, null), MyData.class);
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State<MyData>(stateKey, null, null),
MyData.class);
//execute the get state
State<MyData> myDataResponse = response.block();
@ -280,12 +285,12 @@ public class HttpStateClientIT extends BaseIT {
Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
//Create deferred action to delete an state sending the etag
Mono<Void> deleteResponse = daprClient.deleteState(stateKey, myDataResponse.getEtag(), null);
Mono<Void> deleteResponse = daprClient.deleteState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), null);
//execute the delete of the state
deleteResponse.block();
//Create deferred action to get the sate without an etag
response = daprClient.getState(new State(stateKey, null, null), MyData.class);
response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), MyData.class);
myDataResponse = response.block();
//Review that the response is null, because the state was deleted
@ -304,12 +309,13 @@ public class HttpStateClientIT extends BaseIT {
data.setPropertyA("data in property A");
data.setPropertyB("data in property B");
//Create deferred action to save the sate
Mono<Void> saveResponse = daprClient.saveState(stateKey, null, data, null);
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
//execute the save state action
saveResponse.block();
//Create deferred action to get the state with the etag
Mono<State<MyData>> response = daprClient.getState(new State<MyData>(stateKey, null, null), MyData.class);
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State<MyData>(stateKey, null, null),
MyData.class);
//execute the get state
State<MyData> myDataResponse = response.block();
@ -320,12 +326,12 @@ public class HttpStateClientIT extends BaseIT {
Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
//Create deferred action to delete an state sending the incorrect etag
Mono<Void> deleteResponse = daprClient.deleteState(stateKey, "99999999999", null);
Mono<Void> deleteResponse = daprClient.deleteState(STATE_STORE_NAME, stateKey, "99999999999", null);
//execute the delete of the state, this should trhow an exception
deleteResponse.block();
//Create deferred action to get the sate without an etag
response = daprClient.getState(new State(stateKey, null, null), MyData.class);
response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), MyData.class);
myDataResponse = response.block();
//Review that the response is null, because the state was deleted
@ -337,7 +343,8 @@ public class HttpStateClientIT extends BaseIT {
final String stateKey = "keyToBeUpdatedWithEtagAndOptions";
//create option with concurrency with first writte and consistency of strong
StateOptions stateOptions = new StateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE, null);
StateOptions stateOptions = new StateOptions(StateOptions.Consistency.STRONG,
StateOptions.Concurrency.FIRST_WRITE, null);
//create dapr client
DaprClient daprClient = buildDaprClient();
@ -347,13 +354,14 @@ public class HttpStateClientIT extends BaseIT {
data.setPropertyB("data in property B");
//create state using stateOptions
Mono<Void> saveResponse = daprClient.saveState(stateKey, null, data, stateOptions);
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, stateOptions);
//execute the save state
saveResponse.block();
//crate deferred action to retrieve the state
Mono<State<MyData>> response = daprClient.getState(new State(stateKey, null, stateOptions), MyData.class);
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, stateOptions),
MyData.class);
//execute the retrieve of the state using options
State<MyData> myDataResponse = response.block();
@ -367,7 +375,7 @@ public class HttpStateClientIT extends BaseIT {
data.setPropertyA("data in property A2");
data.setPropertyB("data in property B2");
//create deferred action to update the action with options
saveResponse = daprClient.saveState(stateKey, myDataResponse.getEtag(), data, stateOptions);
saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), data, stateOptions);
//update the state
saveResponse.block();
@ -375,11 +383,11 @@ public class HttpStateClientIT extends BaseIT {
data.setPropertyA("last write");
data.setPropertyB("data in property B2");
//create deferred action to update the action with the same etag
saveResponse = daprClient.saveState(stateKey, myDataResponse.getEtag(), data, stateOptions);
saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), data, stateOptions);
//throws an exception, the state was already udpated
saveResponse.block();
response = daprClient.getState(new State(stateKey, null, stateOptions), MyData.class);
response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, stateOptions), MyData.class);
State<MyData> myLastDataResponse = response.block();
Assert.assertNotNull(myLastDataResponse.getEtag());
@ -395,7 +403,8 @@ public class HttpStateClientIT extends BaseIT {
final String stateKey = "keyToBeUpdatedWithEtagAndOptions";
//create option with concurrency with first writte and consistency of strong
StateOptions stateOptions = new StateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.LAST_WRITE, null);
StateOptions stateOptions = new StateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.LAST_WRITE
, null);
//create dapr client
DaprClient daprClient = buildDaprClient();
@ -405,13 +414,14 @@ public class HttpStateClientIT extends BaseIT {
data.setPropertyB("data in property B");
//create state using stateOptions
Mono<Void> saveResponse = daprClient.saveState(stateKey, null, data, stateOptions);
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, stateOptions);
//execute the save state
saveResponse.block();
//crate deferred action to retrieve the state
Mono<State<MyData>> response = daprClient.getState(new State(stateKey, null, stateOptions), MyData.class);
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, stateOptions),
MyData.class);
//execute the retrieve of the state using options
State<MyData> myDataResponse = response.block();
@ -425,7 +435,7 @@ public class HttpStateClientIT extends BaseIT {
data.setPropertyA("data in property A2");
data.setPropertyB("data in property B2");
//create deferred action to update the action with options
saveResponse = daprClient.saveState(stateKey, myDataResponse.getEtag(), data, stateOptions);
saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), data, stateOptions);
//update the state
saveResponse.block();
@ -433,11 +443,11 @@ public class HttpStateClientIT extends BaseIT {
data.setPropertyA("last write");
data.setPropertyB("data in property B2");
//create deferred action to update the action with the same etag
saveResponse = daprClient.saveState(stateKey, myDataResponse.getEtag(), data, stateOptions);
saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), data, stateOptions);
//update the state without an error
saveResponse.block();
response = daprClient.getState(new State(stateKey, null, stateOptions), MyData.class);
response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, stateOptions), MyData.class);
State<MyData> myLastDataResponse = response.block();
Assert.assertNotNull(myLastDataResponse.getEtag());
@ -451,7 +461,8 @@ public class HttpStateClientIT extends BaseIT {
@Test(timeout = 13000)
public void saveDeleteWithRetry() {
final String stateKey = "keyToBeDeleteWithWrongEtagAndRetry";
StateOptions.RetryPolicy retryPolicy = new StateOptions.RetryPolicy(Duration.ofSeconds(3), 3, StateOptions.RetryPolicy.Pattern.LINEAR);
StateOptions.RetryPolicy retryPolicy = new StateOptions.RetryPolicy(Duration.ofSeconds(3), 3,
StateOptions.RetryPolicy.Pattern.LINEAR);
StateOptions stateOptions = new StateOptions(null, null, retryPolicy);
//create DAPR client
@ -462,12 +473,12 @@ public class HttpStateClientIT extends BaseIT {
data.setPropertyB("data in property B");
//Create deferred action to save the sate
Mono<Void> saveResponse = daprClient.saveState(stateKey, null, data, null);
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
//execute the save state action
saveResponse.block();
//Create deferred action to retrieve the state
Mono<State<MyData>> response = daprClient.getState(new State(stateKey, null, null), MyData.class);
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), MyData.class);
//execute the action for retrieve the state and the etag
State<MyData> myDataResponse = response.block();
@ -479,7 +490,7 @@ public class HttpStateClientIT extends BaseIT {
Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
Mono<Void> deleteResponse = daprClient.deleteState(stateKey, "99999999", stateOptions);
Mono<Void> deleteResponse = daprClient.deleteState(STATE_STORE_NAME, stateKey, "99999999", stateOptions);
long start = System.currentTimeMillis();
try {
@ -499,7 +510,8 @@ public class HttpStateClientIT extends BaseIT {
@Test(timeout = 13000)
public void saveUpdateWithRetry() {
final String stateKey = "keyToBeDeleteWithWrongEtagAndRetry";
StateOptions.RetryPolicy retryPolicy = new StateOptions.RetryPolicy(Duration.ofSeconds(4), 3, StateOptions.RetryPolicy.Pattern.EXPONENTIAL);
StateOptions.RetryPolicy retryPolicy = new StateOptions.RetryPolicy(Duration.ofSeconds(4), 3,
StateOptions.RetryPolicy.Pattern.EXPONENTIAL);
StateOptions stateOptions = new StateOptions(null, null, retryPolicy);
//create DAPR client
@ -510,12 +522,12 @@ public class HttpStateClientIT extends BaseIT {
data.setPropertyB("data in property B");
//Create deferred action to save the sate
Mono<Void> saveResponse = daprClient.saveState(stateKey, null, data, null);
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
//execute the save state action
saveResponse.block();
//Create deferred action to retrieve the state
Mono<State<MyData>> response = daprClient.getState(new State(stateKey, null, null), MyData.class);
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), MyData.class);
//execute the action for retrieve the state and the etag
State<MyData> myDataResponse = response.block();
@ -527,7 +539,7 @@ public class HttpStateClientIT extends BaseIT {
Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
//Create deferred action to save the sate
saveResponse = daprClient.saveState(stateKey, "9999999", data, stateOptions);
saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, "9999999", data, stateOptions);
//execute the save state action
long start = System.currentTimeMillis();

View File

@ -148,78 +148,86 @@ public interface DaprClient {
/**
* Retrieve a State based on their key.
*
* @param state State to be re-retrieved.
* @param clazz The Type of State needed as return.
* @param <T> The Type of the return.
* @param stateStoreName The name of the state store.
* @param state State to be re-retrieved.
* @param clazz The Type of State needed as return.
* @param <T> The Type of the return.
* @return A Mono Plan for the requested State.
*/
<T> Mono<State<T>> getState(State<T> state, Class<T> clazz);
<T> Mono<State<T>> getState(String stateStoreName, State<T> state, Class<T> clazz);
/**
* Retrieve a State based on their key.
*
* @param key The key of the State to be retrieved.
* @param clazz The Type of State needed as return.
* @param <T> The Type of the return.
* @param stateStoreName The name of the state store.
* @param key The key of the State to be retrieved.
* @param clazz The Type of State needed as return.
* @param <T> The Type of the return.
* @return A Mono Plan for the requested State.
*/
<T> Mono<State<T>> getState(String key, Class<T> clazz);
<T> Mono<State<T>> getState(String stateStoreName, String key, Class<T> clazz);
/**
* Retrieve a State based on their key.
*
* @param key The key of the State to be retrieved.
* @param etag Optional etag for conditional get
* @param options Optional settings for retrieve operation.
* @param clazz The Type of State needed as return.
* @param <T> The Type of the return.
* @param stateStoreName The name of the state store.
* @param key The key of the State to be retrieved.
* @param etag Optional etag for conditional get
* @param options Optional settings for retrieve operation.
* @param clazz The Type of State needed as return.
* @param <T> The Type of the return.
* @return A Mono Plan for the requested State.
*/
<T> Mono<State<T>> getState(String key, String etag, StateOptions options, Class<T> clazz);
<T> Mono<State<T>> getState(String stateStoreName, String key, String etag, StateOptions options, Class<T> clazz);
/**
* Save/Update a list of states.
*
* @param states the States to be saved.
* @param stateStoreName The name of the state store.
* @param states The States to be saved.
* @return a Mono plan of type Void.
*/
Mono<Void> saveStates(List<State<?>> states);
Mono<Void> saveStates(String stateStoreName, List<State<?>> states);
/**
* Save/Update a state.
*
* @param key the key of the state.
* @param value the value of the state.
* @param stateStoreName The name of the state store.
* @param key The key of the state.
* @param value The value of the state.
* @return a Mono plan of type Void.
*/
Mono<Void> saveState(String key, Object value);
Mono<Void> saveState(String stateStoreName, String key, Object value);
/**
* Save/Update a state.
*
* @param key the key of the state.
* @param etag the etag to be used.
* @param value the value of the state.
* @param options the Options to use for each state.
* @param stateStoreName The name of the state store.
* @param key The key of the state.
* @param etag The etag to be used.
* @param value The value of the state.
* @param options The Options to use for each state.
* @return a Mono plan of type Void.
*/
Mono<Void> saveState(String key, String etag, Object value, StateOptions options);
Mono<Void> saveState(String stateStoreName, String key, String etag, Object value, StateOptions options);
/**
* Delete a state.
*
* @param key The key of the State to be removed.
* @param stateStoreName The name of the state store.
* @param key The key of the State to be removed.
* @return a Mono plan of type Void.
*/
Mono<Void> deleteState(String key);
Mono<Void> deleteState(String stateStoreName, String key);
/**
* Delete a state.
*
* @param key The key of the State to be removed.
* @param etag Optional etag for conditional delete.
* @param options Optional settings for state operation.
* @param stateStoreName The name of the state store.
* @param key The key of the State to be removed.
* @param etag Optional etag for conditional delete.
* @param options Optional settings for state operation.
* @return a Mono plan of type Void.
*/
Mono<Void> deleteState(String key, String etag, StateOptions options);
Mono<Void> deleteState(String stateStoreName, String key, String etag, StateOptions options);
}

View File

@ -210,25 +210,33 @@ public class DaprClientGrpc implements DaprClient {
* {@inheritDoc}
*/
@Override
public <T> Mono<State<T>> getState(State<T> state, Class<T> clazz) {
return this.getState(state.getKey(), state.getEtag(), state.getOptions(), clazz);
public <T> Mono<State<T>> getState(String stateStoreName, State<T> state, Class<T> clazz) {
return this.getState(stateStoreName, state.getKey(), state.getEtag(), state.getOptions(), clazz);
}
/**
* {@inheritDoc}
*/
@Override
public <T> Mono<State<T>> getState(String key, Class<T> clazz) {
return this.getState(key, null, null, clazz);
public <T> Mono<State<T>> getState(String stateStoreName, String key, Class<T> clazz) {
return this.getState(stateStoreName, key, null, null, clazz);
}
/**
* {@inheritDoc}
*/
@Override
public <T> Mono<State<T>> getState(String key, String etag, StateOptions options, Class<T> clazz) {
public <T> Mono<State<T>> getState(
String stateStoreName, String key, String etag, StateOptions options, Class<T> clazz) {
try {
if ((stateStoreName == null) || (stateStoreName.trim().isEmpty())) {
throw new IllegalArgumentException("State store name cannot be null or empty.");
}
if ((key == null) || (key.trim().isEmpty())) {
throw new IllegalArgumentException("Key cannot be null or empty.");
}
DaprProtos.GetStateEnvelope.Builder builder = DaprProtos.GetStateEnvelope.newBuilder()
.setStoreName(stateStoreName)
.setKey(key);
if (options != null && options.getConsistency() != null) {
builder.setConsistency(options.getConsistency().getValue());
@ -267,9 +275,13 @@ public class DaprClientGrpc implements DaprClient {
* {@inheritDoc}
*/
@Override
public Mono<Void> saveStates(List<State<?>> states) {
public Mono<Void> saveStates(String stateStoreName, List<State<?>> states) {
try {
if ((stateStoreName == null) || (stateStoreName.trim().isEmpty())) {
throw new IllegalArgumentException("State store name cannot be null or empty.");
}
DaprProtos.SaveStateEnvelope.Builder builder = DaprProtos.SaveStateEnvelope.newBuilder();
builder.setStoreName(stateStoreName);
for (State state : states) {
builder.addRequests(buildStateRequest(state).build());
}
@ -342,33 +354,40 @@ public class DaprClientGrpc implements DaprClient {
* {@inheritDoc}
*/
@Override
public Mono<Void> saveState(String key, Object value) {
return this.saveState(key, null, value, null);
public Mono<Void> saveState(String stateStoreName, String key, Object value) {
return this.saveState(stateStoreName, key, null, value, null);
}
/**
* {@inheritDoc}
*/
@Override
public Mono<Void> saveState(String key, String etag, Object value, StateOptions options) {
public Mono<Void> saveState(String stateStoreName, String key, String etag, Object value, StateOptions options) {
State<?> state = new State<>(value, key, etag, options);
return this.saveStates(Arrays.asList(state));
return this.saveStates(stateStoreName, Arrays.asList(state));
}
/**
* {@inheritDoc}
*/
@Override
public Mono<Void> deleteState(String key) {
return this.deleteState(key, null, null);
public Mono<Void> deleteState(String stateStoreName, String key) {
return this.deleteState(stateStoreName, key, null, null);
}
/**
* {@inheritDoc}
*/
@Override
public Mono<Void> deleteState(String key, String etag, StateOptions options) {
public Mono<Void> deleteState(String stateStoreName, String key, String etag, StateOptions options) {
try {
if ((stateStoreName == null) || (stateStoreName.trim().isEmpty())) {
throw new IllegalArgumentException("State store name cannot be null or empty.");
}
if ((key == null) || (key.trim().isEmpty())) {
throw new IllegalArgumentException("Key cannot be null or empty.");
}
DaprProtos.StateOptions.Builder optionBuilder = null;
if (options != null) {
optionBuilder = DaprProtos.StateOptions.newBuilder();
@ -402,6 +421,7 @@ public class DaprClientGrpc implements DaprClient {
}
}
DaprProtos.DeleteStateEnvelope.Builder builder = DaprProtos.DeleteStateEnvelope.newBuilder()
.setStoreName(stateStoreName)
.setKey(key);
if (etag != null) {
builder.setEtag(etag);

View File

@ -268,26 +268,30 @@ public class DaprClientHttp implements DaprClient {
* {@inheritDoc}
*/
@Override
public <T> Mono<State<T>> getState(State<T> state, Class<T> clazz) {
return this.getState(state.getKey(), state.getEtag(), state.getOptions(), clazz);
public <T> Mono<State<T>> getState(String stateStoreName, State<T> state, Class<T> clazz) {
return this.getState(stateStoreName, state.getKey(), state.getEtag(), state.getOptions(), clazz);
}
/**
* {@inheritDoc}
*/
@Override
public <T> Mono<State<T>> getState(String key, Class<T> clazz) {
return this.getState(key, null, null, clazz);
public <T> Mono<State<T>> getState(String stateStoreName, String key, Class<T> clazz) {
return this.getState(stateStoreName, key, null, null, clazz);
}
/**
* {@inheritDoc}
*/
@Override
public <T> Mono<State<T>> getState(String key, String etag, StateOptions options, Class<T> clazz) {
public <T> Mono<State<T>> getState(
String stateStoreName, String key, String etag, StateOptions options, Class<T> clazz) {
try {
if (key == null) {
throw new IllegalArgumentException("Name cannot be null or empty.");
if ((stateStoreName == null) || (stateStoreName.trim().isEmpty())) {
throw new IllegalArgumentException("State store name cannot be null or empty.");
}
if ((key == null) || (key.trim().isEmpty())) {
throw new IllegalArgumentException("Key cannot be null or empty.");
}
Map<String, String> headers = new HashMap<>();
if (etag != null && !etag.trim().isEmpty()) {
@ -295,6 +299,8 @@ public class DaprClientHttp implements DaprClient {
}
StringBuilder url = new StringBuilder(Constants.STATE_PATH)
.append("/")
.append(stateStoreName)
.append("/")
.append(key);
Map<String, String> urlParameters = Optional.ofNullable(options)
@ -319,8 +325,11 @@ public class DaprClientHttp implements DaprClient {
* {@inheritDoc}
*/
@Override
public Mono<Void> saveStates(List<State<?>> states) {
public Mono<Void> saveStates(String stateStoreName, List<State<?>> states) {
try {
if ((stateStoreName == null) || (stateStoreName.trim().isEmpty())) {
throw new IllegalArgumentException("State store name cannot be null or empty.");
}
if (states == null || states.isEmpty()) {
return Mono.empty();
}
@ -330,7 +339,7 @@ public class DaprClientHttp implements DaprClient {
if (etag != null && !etag.trim().isEmpty()) {
headers.put(Constants.HEADER_HTTP_ETAG_ID, etag);
}
final String url = Constants.STATE_PATH;
final String url = Constants.STATE_PATH + "/" + stateStoreName;
List<State<Object>> internalStateObjects = new ArrayList<>(states.size());
for (State state : states) {
if (state == null) {
@ -356,41 +365,44 @@ public class DaprClientHttp implements DaprClient {
* {@inheritDoc}
*/
@Override
public Mono<Void> saveState(String key, Object value) {
return this.saveState(key, null, value, null);
public Mono<Void> saveState(String stateStoreName, String key, Object value) {
return this.saveState(stateStoreName, key, null, value, null);
}
/**
* {@inheritDoc}
*/
@Override
public Mono<Void> saveState(String key, String etag, Object value, StateOptions options) {
public Mono<Void> saveState(String stateStoreName, String key, String etag, Object value, StateOptions options) {
return Mono.fromSupplier(() -> new State<>(value, key, etag, options))
.flatMap(state -> saveStates(Arrays.asList(state)));
.flatMap(state -> saveStates(stateStoreName, Arrays.asList(state)));
}
/**
* {@inheritDoc}
*/
@Override
public Mono<Void> deleteState(String key) {
return this.deleteState(key, null, null);
public Mono<Void> deleteState(String stateStoreName, String key) {
return this.deleteState(stateStoreName, key, null, null);
}
/**
* {@inheritDoc}
*/
@Override
public Mono<Void> deleteState(String key, String etag, StateOptions options) {
public Mono<Void> deleteState(String stateStoreName, String key, String etag, StateOptions options) {
try {
if (key == null || key.trim().isEmpty()) {
throw new IllegalArgumentException("Name cannot be null or empty.");
if ((stateStoreName == null) || (stateStoreName.trim().isEmpty())) {
throw new IllegalArgumentException("State store name cannot be null or empty.");
}
if ((key == null) || (key.trim().isEmpty())) {
throw new IllegalArgumentException("Key cannot be null or empty.");
}
Map<String, String> headers = new HashMap<>();
if (etag != null && !etag.trim().isEmpty()) {
headers.put(Constants.HEADER_HTTP_ETAG_ID, etag);
}
String url = Constants.STATE_PATH + "/" + key;
String url = Constants.STATE_PATH + "/" + stateStoreName + "/" + key;
Map<String, String> urlParameters = Optional.ofNullable(options)
.map(stateOptions -> stateOptions.getStateOptionsAsMap())
.orElse(new HashMap<>());

View File

@ -35,6 +35,8 @@ import static org.mockito.Mockito.*;
public class DaprClientGrpcTest {
private static final String STATE_STORE_NAME = "MyStateStore";
private DaprGrpc.DaprFutureStub client;
private DaprClientGrpc adapter;
private ObjectSerializer serializer;
@ -332,7 +334,7 @@ public class DaprClientGrpcTest {
String request = "Request";
byte[] byteRequest = serializer.serialize(request);
Mono<byte[]> result =
adapter.invokeService(Verb.GET, "appId", "method", byteRequest, (HashMap<String, String>)null);
adapter.invokeService(Verb.GET, "appId", "method", byteRequest, (HashMap<String, String>) null);
settableFuture.setException(ex);
result.block();
}
@ -351,7 +353,7 @@ public class DaprClientGrpcTest {
String request = "Request";
byte[] byteRequest = serializer.serialize(request);
Mono<byte[]> result = adapter.invokeService(
Verb.GET, "appId", "method", byteRequest, (HashMap<String, String>)null);
Verb.GET, "appId", "method", byteRequest, (HashMap<String, String>) null);
byte[] byteOutput = result.block();
String strOutput = serializer.deserialize(byteOutput, String.class);
assertEquals(expected, strOutput);
@ -434,7 +436,7 @@ public class DaprClientGrpcTest {
public void getStateExceptionThrownTest() {
when(client.getState(any(io.dapr.DaprProtos.GetStateEnvelope.class))).thenThrow(RuntimeException.class);
State<String> key = buildStateKey(null, "Key1", "ETag1", null);
Mono<State<String>> result = adapter.getState(key, String.class);
Mono<State<String>> result = adapter.getState(STATE_STORE_NAME, key, String.class);
result.block();
}
@ -448,7 +450,7 @@ public class DaprClientGrpcTest {
when(client.getState(any(io.dapr.DaprProtos.GetStateEnvelope.class)))
.thenReturn(settableFuture);
State<String> key = buildStateKey(null, "Key1", "ETag1", null);
Mono<State<String>> result = adapter.getState(key, String.class);
Mono<State<String>> result = adapter.getState(STATE_STORE_NAME, key, String.class);
settableFuture.setException(ex);
result.block();
}
@ -466,7 +468,7 @@ public class DaprClientGrpcTest {
when(client.getState(any(io.dapr.DaprProtos.GetStateEnvelope.class)))
.thenReturn(settableFuture);
State<String> keyRequest = buildStateKey(null, key, etag, null);
Mono<State<String>> result = adapter.getState(keyRequest, String.class);
Mono<State<String>> result = adapter.getState(STATE_STORE_NAME, keyRequest, String.class);
settableFuture.set(responseEnvelope);
assertEquals(expectedState, result.block());
}
@ -489,7 +491,7 @@ public class DaprClientGrpcTest {
addCallback(settableFuture, callback, directExecutor());
when(client.getState(any(io.dapr.DaprProtos.GetStateEnvelope.class)))
.thenReturn(settableFuture);
Mono<State<MyObject>> result = adapter.getState(keyRequest, MyObject.class);
Mono<State<MyObject>> result = adapter.getState(STATE_STORE_NAME, keyRequest, MyObject.class);
settableFuture.set(responseEnvelope);
assertEquals(expectedState, result.block());
}
@ -512,7 +514,7 @@ public class DaprClientGrpcTest {
addCallback(settableFuture, callback, directExecutor());
when(client.getState(any(io.dapr.DaprProtos.GetStateEnvelope.class)))
.thenReturn(settableFuture);
Mono<State<MyObject>> result = adapter.getState(keyRequest, MyObject.class);
Mono<State<MyObject>> result = adapter.getState(STATE_STORE_NAME, keyRequest, MyObject.class);
settableFuture.set(responseEnvelope);
assertEquals(expectedState, result.block());
}
@ -521,7 +523,7 @@ public class DaprClientGrpcTest {
public void deleteStateExceptionThrowTest() {
when(client.deleteState(any(io.dapr.DaprProtos.DeleteStateEnvelope.class))).thenThrow(RuntimeException.class);
State<String> key = buildStateKey(null, "Key1", "ETag1", null);
Mono<Void> result = adapter.deleteState(key.getKey(), key.getEtag(), key.getOptions());
Mono<Void> result = adapter.deleteState(STATE_STORE_NAME, key.getKey(), key.getEtag(), key.getOptions());
result.block();
}
@ -535,7 +537,7 @@ public class DaprClientGrpcTest {
when(client.deleteState(any(io.dapr.DaprProtos.DeleteStateEnvelope.class)))
.thenReturn(settableFuture);
State<String> key = buildStateKey(null, "Key1", "ETag1", null);
Mono<Void> result = adapter.deleteState(key.getKey(), key.getEtag(), key.getOptions());
Mono<Void> result = adapter.deleteState(STATE_STORE_NAME, key.getKey(), key.getEtag(), key.getOptions());
settableFuture.setException(ex);
result.block();
}
@ -550,7 +552,8 @@ public class DaprClientGrpcTest {
when(client.deleteState(any(io.dapr.DaprProtos.DeleteStateEnvelope.class)))
.thenReturn(settableFuture);
State<String> stateKey = buildStateKey(null, key, etag, null);
Mono<Void> result = adapter.deleteState(stateKey.getKey(), stateKey.getEtag(), stateKey.getOptions());
Mono<Void> result = adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(),
stateKey.getOptions());
settableFuture.set(Empty.newBuilder().build());
result.block();
assertTrue(callback.wasCalled);
@ -568,7 +571,8 @@ public class DaprClientGrpcTest {
when(client.deleteState(any(io.dapr.DaprProtos.DeleteStateEnvelope.class)))
.thenReturn(settableFuture);
State<String> stateKey = buildStateKey(null, key, etag, options);
Mono<Void> result = adapter.deleteState(stateKey.getKey(), stateKey.getEtag(), stateKey.getOptions());
Mono<Void> result = adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(),
stateKey.getOptions());
settableFuture.set(Empty.newBuilder().build());
result.block();
assertTrue(callback.wasCalled);
@ -586,7 +590,8 @@ public class DaprClientGrpcTest {
when(client.deleteState(any(io.dapr.DaprProtos.DeleteStateEnvelope.class)))
.thenReturn(settableFuture);
State<String> stateKey = buildStateKey(null, key, etag, options);
Mono<Void> result = adapter.deleteState(stateKey.getKey(), stateKey.getEtag(), stateKey.getOptions());
Mono<Void> result = adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(),
stateKey.getOptions());
settableFuture.set(Empty.newBuilder().build());
result.block();
assertTrue(callback.wasCalled);
@ -604,7 +609,8 @@ public class DaprClientGrpcTest {
when(client.deleteState(any(io.dapr.DaprProtos.DeleteStateEnvelope.class)))
.thenReturn(settableFuture);
State<String> stateKey = buildStateKey(null, key, etag, options);
Mono<Void> result = adapter.deleteState(stateKey.getKey(), stateKey.getEtag(), stateKey.getOptions());
Mono<Void> result = adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(),
stateKey.getOptions());
settableFuture.set(Empty.newBuilder().build());
result.block();
assertTrue(callback.wasCalled);
@ -622,7 +628,8 @@ public class DaprClientGrpcTest {
when(client.deleteState(any(io.dapr.DaprProtos.DeleteStateEnvelope.class)))
.thenReturn(settableFuture);
State<String> stateKey = buildStateKey(null, key, etag, options);
Mono<Void> result = adapter.deleteState(stateKey.getKey(), stateKey.getEtag(), stateKey.getOptions());
Mono<Void> result = adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(),
stateKey.getOptions());
settableFuture.set(Empty.newBuilder().build());
result.block();
assertTrue(callback.wasCalled);
@ -640,7 +647,8 @@ public class DaprClientGrpcTest {
when(client.deleteState(any(io.dapr.DaprProtos.DeleteStateEnvelope.class)))
.thenReturn(settableFuture);
State<String> stateKey = buildStateKey(null, key, etag, options);
Mono<Void> result = adapter.deleteState(stateKey.getKey(), stateKey.getEtag(), stateKey.getOptions());
Mono<Void> result = adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(),
stateKey.getOptions());
settableFuture.set(Empty.newBuilder().build());
result.block();
assertTrue(callback.wasCalled);
@ -658,7 +666,8 @@ public class DaprClientGrpcTest {
when(client.deleteState(any(io.dapr.DaprProtos.DeleteStateEnvelope.class)))
.thenReturn(settableFuture);
State<String> stateKey = buildStateKey(null, key, etag, options);
Mono<Void> result = adapter.deleteState(stateKey.getKey(), stateKey.getEtag(), stateKey.getOptions());
Mono<Void> result = adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(),
stateKey.getOptions());
settableFuture.set(Empty.newBuilder().build());
result.block();
assertTrue(callback.wasCalled);
@ -676,7 +685,8 @@ public class DaprClientGrpcTest {
when(client.deleteState(any(io.dapr.DaprProtos.DeleteStateEnvelope.class)))
.thenReturn(settableFuture);
State<String> stateKey = buildStateKey(null, key, etag, options);
Mono<Void> result = adapter.deleteState(stateKey.getKey(), stateKey.getEtag(), stateKey.getOptions());
Mono<Void> result = adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(),
stateKey.getOptions());
settableFuture.set(Empty.newBuilder().build());
result.block();
assertTrue(callback.wasCalled);
@ -688,7 +698,7 @@ public class DaprClientGrpcTest {
String etag = "ETag1";
String value = "State value";
when(client.saveState(any(io.dapr.DaprProtos.SaveStateEnvelope.class))).thenThrow(RuntimeException.class);
Mono<Void> result = adapter.saveState(key, etag, value, null);
Mono<Void> result = adapter.saveState(STATE_STORE_NAME, key, etag, value, null);
result.block();
}
@ -702,7 +712,7 @@ public class DaprClientGrpcTest {
MockCallback<Empty> callback = new MockCallback<>(ex);
addCallback(settableFuture, callback, directExecutor());
when(client.saveState(any(io.dapr.DaprProtos.SaveStateEnvelope.class))).thenReturn(settableFuture);
Mono<Void> result = adapter.saveState(key, etag, value, null);
Mono<Void> result = adapter.saveState(STATE_STORE_NAME, key, etag, value, null);
settableFuture.setException(ex);
result.block();
}
@ -716,7 +726,7 @@ public class DaprClientGrpcTest {
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
addCallback(settableFuture, callback, directExecutor());
when(client.saveState(any(io.dapr.DaprProtos.SaveStateEnvelope.class))).thenReturn(settableFuture);
Mono<Void> result = adapter.saveState(key, etag, value, null);
Mono<Void> result = adapter.saveState(STATE_STORE_NAME, key, etag, value, null);
settableFuture.set(Empty.newBuilder().build());
result.block();
assertTrue(callback.wasCalled);
@ -733,7 +743,7 @@ public class DaprClientGrpcTest {
when(client.saveState(any(io.dapr.DaprProtos.SaveStateEnvelope.class))).thenReturn(settableFuture);
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE,
Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR);
Mono<Void> result = adapter.saveState(key, etag, value, options);
Mono<Void> result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options);
settableFuture.set(Empty.newBuilder().build());
result.block();
assertTrue(callback.wasCalled);
@ -750,7 +760,7 @@ public class DaprClientGrpcTest {
when(client.saveState(any(io.dapr.DaprProtos.SaveStateEnvelope.class))).thenReturn(settableFuture);
StateOptions options = buildStateOptions(null, StateOptions.Concurrency.FIRST_WRITE,
Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR);
Mono<Void> result = adapter.saveState(key, etag, value, options);
Mono<Void> result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options);
settableFuture.set(Empty.newBuilder().build());
result.block();
assertTrue(callback.wasCalled);
@ -767,7 +777,7 @@ public class DaprClientGrpcTest {
when(client.saveState(any(io.dapr.DaprProtos.SaveStateEnvelope.class))).thenReturn(settableFuture);
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, null,
Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR);
Mono<Void> result = adapter.saveState(key, etag, value, options);
Mono<Void> result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options);
settableFuture.set(Empty.newBuilder().build());
result.block();
assertTrue(callback.wasCalled);
@ -784,7 +794,7 @@ public class DaprClientGrpcTest {
when(client.saveState(any(io.dapr.DaprProtos.SaveStateEnvelope.class))).thenReturn(settableFuture);
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE,
null, null, null);
Mono<Void> result = adapter.saveState(key, etag, value, options);
Mono<Void> result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options);
settableFuture.set(Empty.newBuilder().build());
result.block();
assertTrue(callback.wasCalled);
@ -801,7 +811,7 @@ public class DaprClientGrpcTest {
when(client.saveState(any(io.dapr.DaprProtos.SaveStateEnvelope.class))).thenReturn(settableFuture);
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE,
null, 1, StateOptions.RetryPolicy.Pattern.LINEAR);
Mono<Void> result = adapter.saveState(key, etag, value, options);
Mono<Void> result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options);
settableFuture.set(Empty.newBuilder().build());
result.block();
assertTrue(callback.wasCalled);
@ -818,7 +828,7 @@ public class DaprClientGrpcTest {
when(client.saveState(any(io.dapr.DaprProtos.SaveStateEnvelope.class))).thenReturn(settableFuture);
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE,
Duration.ofDays(100), null, StateOptions.RetryPolicy.Pattern.LINEAR);
Mono<Void> result = adapter.saveState(key, etag, value, options);
Mono<Void> result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options);
settableFuture.set(Empty.newBuilder().build());
result.block();
assertTrue(callback.wasCalled);
@ -835,7 +845,7 @@ public class DaprClientGrpcTest {
when(client.saveState(any(io.dapr.DaprProtos.SaveStateEnvelope.class))).thenReturn(settableFuture);
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE,
Duration.ofDays(100), 1, null);
Mono<Void> result = adapter.saveState(key, etag, value, options);
Mono<Void> result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options);
settableFuture.set(Empty.newBuilder().build());
result.block();
assertTrue(callback.wasCalled);
@ -848,9 +858,9 @@ public class DaprClientGrpcTest {
/**
* The purpose of this test is to show that it doesn't matter when the client is called, the actual coll to DAPR
* will be done when the output Mono response call the Mono.block method.
* Like for instanche if you call getState, withouth blocking for the response, and then call delete for the same state
* you just retrived but block for the delete response, when later you block for the response of the getState, you will
* not found the state.
* Like for instance if you call getState, without blocking for the response, and then call delete for the same
* state you just retrieved but block for the delete response, when later you block for the response of the getState,
* you will not find the state.
* <p>This test will execute the following flow:</p>
* <ol>
* <li>Exeucte client getState for Key=key1</li>
@ -861,7 +871,8 @@ public class DaprClientGrpcTest {
* <li>Block for deleteState call.</li>
* <li>Block for getState for Key=key2 and Assert they 2 was not found.</li>
* </ol>
* @throws Exception - Test will fail if any unexpected exception is being thrown
*
* @throws Exception - Test will fail if any unexpected exception is being thrown.
*/
@Test
@ -877,17 +888,18 @@ public class DaprClientGrpcTest {
futuresMap.put(key2, buildFutureGetStateEnvelop(expectedValue2, etag));
when(client.getState(argThat(new GetStateEnvelopeKeyMatcher(key1)))).thenReturn(futuresMap.get(key1));
State<String> keyRequest1 = buildStateKey(null, key1, etag, null);
Mono<State<String>> resultGet1 = adapter.getState(keyRequest1, String.class);
Mono<State<String>> resultGet1 = adapter.getState(STATE_STORE_NAME, keyRequest1, String.class);
assertEquals(expectedState1, resultGet1.block());
State<String> keyRequest2 = buildStateKey(null, key2, etag, null);
Mono<State<String>> resultGet2 = adapter.getState(keyRequest2, String.class);
Mono<State<String>> resultGet2 = adapter.getState(STATE_STORE_NAME, keyRequest2, String.class);
SettableFuture<Empty> settableFutureDelete = SettableFuture.create();
MockCallback<Empty> callbackDelete = new MockCallback<>(Empty.newBuilder().build());
addCallback(settableFutureDelete, callbackDelete, directExecutor());
when(client.deleteState(any(io.dapr.DaprProtos.DeleteStateEnvelope.class)))
.thenReturn(settableFutureDelete);
Mono<Void> resultDelete = adapter.deleteState(keyRequest2.getKey(), keyRequest2.getEtag(), keyRequest2.getOptions());
Mono<Void> resultDelete = adapter.deleteState(STATE_STORE_NAME, keyRequest2.getKey(), keyRequest2.getEtag(),
keyRequest2.getOptions());
settableFutureDelete.set(Empty.newBuilder().build());
resultDelete.block();
assertTrue(callbackDelete.wasCalled);
@ -916,7 +928,8 @@ public class DaprClientGrpcTest {
}
private StateOptions buildStateOptions(StateOptions.Consistency consistency, StateOptions.Concurrency concurrency,
Duration interval, Integer threshold, StateOptions.RetryPolicy.Pattern pattern) {
Duration interval, Integer threshold,
StateOptions.RetryPolicy.Pattern pattern) {
StateOptions.RetryPolicy retryPolicy = null;
if (interval != null || threshold != null || pattern != null) {

View File

@ -22,6 +22,8 @@ import static org.mockito.Mockito.mock;
public class DaprClientHttpTest {
private static final String STATE_STORE_NAME = "MyStateStore";
private DaprClientHttp daprClientHttp;
private DaprHttp daprHttp;
@ -30,7 +32,8 @@ public class DaprClientHttpTest {
private MockInterceptor mockInterceptor;
private final String EXPECTED_RESULT = "{\"data\":\"ewoJCSJwcm9wZXJ0eUEiOiAidmFsdWVBIiwKCQkicHJvcGVydHlCIjogInZhbHVlQiIKCX0=\"}";
private final String EXPECTED_RESULT =
"{\"data\":\"ewoJCSJwcm9wZXJ0eUEiOiAidmFsdWVBIiwKCQkicHJvcGVydHlCIjogInZhbHVlQiIKCX0=\"}";
@Before
public void setUp() throws Exception {
@ -215,14 +218,14 @@ public class DaprClientHttpTest {
State<String> stateKeyValue = new State("value", "key", "etag", stateOptions);
State<String> stateKeyNull = new State("value", null, "etag", stateOptions);
mockInterceptor.addRule()
.get("http://localhost:3000/v1.0/state/key")
.get("http://localhost:3000/v1.0/state/MyStateStore/key")
.respond("\"" + EXPECTED_RESULT + "\"");
daprHttp = new DaprHttp(3000, okHttpClient);
daprClientHttp = new DaprClientHttp(daprHttp);
assertThrows(IllegalArgumentException.class, () -> {
daprClientHttp.getState(stateKeyNull, String.class).block();
daprClientHttp.getState(STATE_STORE_NAME, stateKeyNull, String.class).block();
});
Mono<State<String>> mono = daprClientHttp.getState(stateKeyValue, String.class);
Mono<State<String>> mono = daprClientHttp.getState(STATE_STORE_NAME, stateKeyValue, String.class);
assertEquals(mono.block().getKey(), "key");
}
@ -230,11 +233,11 @@ public class DaprClientHttpTest {
public void getStatesEmptyEtag() {
State<String> stateEmptyEtag = new State("value", "key", "", null);
mockInterceptor.addRule()
.get("http://localhost:3000/v1.0/state/key")
.get("http://localhost:3000/v1.0/state/MyStateStore/key")
.respond("\"" + EXPECTED_RESULT + "\"");
daprHttp = new DaprHttp(3000, okHttpClient);
daprClientHttp = new DaprClientHttp(daprHttp);
Mono<State<String>> monoEmptyEtag = daprClientHttp.getState(stateEmptyEtag, String.class);
Mono<State<String>> monoEmptyEtag = daprClientHttp.getState(STATE_STORE_NAME, stateEmptyEtag, String.class);
assertEquals(monoEmptyEtag.block().getKey(), "key");
}
@ -242,11 +245,11 @@ public class DaprClientHttpTest {
public void getStatesNullEtag() {
State<String> stateNullEtag = new State("value", "key", null, null);
mockInterceptor.addRule()
.get("http://localhost:3000/v1.0/state/key")
.get("http://localhost:3000/v1.0/state/MyStateStore/key")
.respond("\"" + EXPECTED_RESULT + "\"");
daprHttp = new DaprHttp(3000, okHttpClient);
daprClientHttp = new DaprClientHttp(daprHttp);
Mono<State<String>> monoNullEtag = daprClientHttp.getState(stateNullEtag, String.class);
Mono<State<String>> monoNullEtag = daprClientHttp.getState(STATE_STORE_NAME, stateNullEtag, String.class);
assertEquals(monoNullEtag.block().getKey(), "key");
}
@ -255,11 +258,11 @@ public class DaprClientHttpTest {
State<String> stateKeyValue = new State("value", "key", "etag", null);
List<State<?>> stateKeyValueList = Arrays.asList(stateKeyValue);
mockInterceptor.addRule()
.post("http://localhost:3000/v1.0/state")
.post("http://localhost:3000/v1.0/state/MyStateStore")
.respond(EXPECTED_RESULT);
daprHttp = new DaprHttp(3000, okHttpClient);
daprClientHttp = new DaprClientHttp(daprHttp);
Mono<Void> mono = daprClientHttp.saveStates(stateKeyValueList);
Mono<Void> mono = daprClientHttp.saveStates(STATE_STORE_NAME, stateKeyValueList);
assertNull(mono.block());
}
@ -268,13 +271,13 @@ public class DaprClientHttpTest {
State<String> stateKeyValue = new State("value", "key", "", null);
List<State<?>> stateKeyValueList = new ArrayList();
mockInterceptor.addRule()
.post("http://localhost:3000/v1.0/state")
.post("http://localhost:3000/v1.0/state/MyStateStore")
.respond(EXPECTED_RESULT);
daprHttp = new DaprHttp(3000, okHttpClient);
daprClientHttp = new DaprClientHttp(daprHttp);
Mono<Void> mono = daprClientHttp.saveStates(null);
Mono<Void> mono = daprClientHttp.saveStates(STATE_STORE_NAME, null);
assertNull(mono.block());
Mono<Void> mono1 = daprClientHttp.saveStates(stateKeyValueList);
Mono<Void> mono1 = daprClientHttp.saveStates(STATE_STORE_NAME, stateKeyValueList);
assertNull(mono1.block());
}
@ -283,11 +286,11 @@ public class DaprClientHttpTest {
State<String> stateKeyValue = new State("value", "key", null, null);
List<State<?>> stateKeyValueList = Arrays.asList(stateKeyValue);
mockInterceptor.addRule()
.post("http://localhost:3000/v1.0/state")
.post("http://localhost:3000/v1.0/state/MyStateStore")
.respond(EXPECTED_RESULT);
daprHttp = new DaprHttp(3000, okHttpClient);
daprClientHttp = new DaprClientHttp(daprHttp);
Mono<Void> mono = daprClientHttp.saveStates(stateKeyValueList);
Mono<Void> mono = daprClientHttp.saveStates(STATE_STORE_NAME, stateKeyValueList);
assertNull(mono.block());
}
@ -296,23 +299,23 @@ public class DaprClientHttpTest {
State<String> stateKeyValue = new State("value", "key", "", null);
List<State<?>> stateKeyValueList = Arrays.asList(stateKeyValue);
mockInterceptor.addRule()
.post("http://localhost:3000/v1.0/state")
.post("http://localhost:3000/v1.0/state/MyStateStore")
.respond(EXPECTED_RESULT);
daprHttp = new DaprHttp(3000, okHttpClient);
daprClientHttp = new DaprClientHttp(daprHttp);
Mono<Void> mono = daprClientHttp.saveStates(stateKeyValueList);
Mono<Void> mono = daprClientHttp.saveStates(STATE_STORE_NAME, stateKeyValueList);
assertNull(mono.block());
}
@Test
public void simpleSaveStates() {
mockInterceptor.addRule()
.post("http://localhost:3000/v1.0/state")
.post("http://localhost:3000/v1.0/state/MyStateStore")
.respond(EXPECTED_RESULT);
StateOptions stateOptions = mock(StateOptions.class);
daprHttp = new DaprHttp(3000, okHttpClient);
daprClientHttp = new DaprClientHttp(daprHttp);
Mono<Void> mono = daprClientHttp.saveState("key", "etag", "value", stateOptions);
Mono<Void> mono = daprClientHttp.saveState(STATE_STORE_NAME, "key", "etag", "value", stateOptions);
assertNull(mono.block());
}
@ -322,11 +325,11 @@ public class DaprClientHttpTest {
StateOptions stateOptions = mock(StateOptions.class);
State<String> stateKeyValue = new State("value", "key", "etag", stateOptions);
mockInterceptor.addRule()
.delete("http://localhost:3000/v1.0/state/key")
.delete("http://localhost:3000/v1.0/state/MyStateStore/key")
.respond(EXPECTED_RESULT);
daprHttp = new DaprHttp(3000, okHttpClient);
daprClientHttp = new DaprClientHttp(daprHttp);
Mono<Void> mono = daprClientHttp.deleteState(stateKeyValue.getKey(), stateKeyValue.getEtag(), stateOptions);
Mono<Void> mono = daprClientHttp.deleteState(STATE_STORE_NAME, stateKeyValue.getKey(), stateKeyValue.getEtag(), stateOptions);
assertNull(mono.block());
}
@ -334,11 +337,11 @@ public class DaprClientHttpTest {
public void deleteStateNullEtag() {
State<String> stateKeyValue = new State("value", "key", null, null);
mockInterceptor.addRule()
.delete("http://localhost:3000/v1.0/state/key")
.delete("http://localhost:3000/v1.0/state/MyStateStore/key")
.respond(EXPECTED_RESULT);
daprHttp = new DaprHttp(3000, okHttpClient);
daprClientHttp = new DaprClientHttp(daprHttp);
Mono<Void> mono = daprClientHttp.deleteState(stateKeyValue.getKey(), stateKeyValue.getEtag(), null);
Mono<Void> mono = daprClientHttp.deleteState(STATE_STORE_NAME, stateKeyValue.getKey(), stateKeyValue.getEtag(), null);
assertNull(mono.block());
}
@ -346,11 +349,11 @@ public class DaprClientHttpTest {
public void deleteStateEmptyEtag() {
State<String> stateKeyValue = new State("value", "key", "", null);
mockInterceptor.addRule()
.delete("http://localhost:3000/v1.0/state/key")
.delete("http://localhost:3000/v1.0/state/MyStateStore/key")
.respond(EXPECTED_RESULT);
daprHttp = new DaprHttp(3000, okHttpClient);
daprClientHttp = new DaprClientHttp(daprHttp);
Mono<Void> mono = daprClientHttp.deleteState(stateKeyValue.getKey(), stateKeyValue.getEtag(), null);
Mono<Void> mono = daprClientHttp.deleteState(STATE_STORE_NAME, stateKeyValue.getKey(), stateKeyValue.getEtag(), null);
assertNull(mono.block());
}
@ -359,18 +362,27 @@ public class DaprClientHttpTest {
State<String> stateKeyValueNull = new State("value", null, "etag", null);
State<String> stateKeyValueEmpty = new State("value", "", "etag", null);
mockInterceptor.addRule()
.delete("http://localhost:3000/v1.0/state/key")
.delete("http://localhost:3000/v1.0/state/MyStateStore/key")
.respond(EXPECTED_RESULT);
daprHttp = new DaprHttp(3000, okHttpClient);
daprClientHttp = new DaprClientHttp(daprHttp);
assertThrows(IllegalArgumentException.class, () -> {
daprClientHttp.deleteState(null, null, null).block();
daprClientHttp.deleteState(STATE_STORE_NAME, null, null, null).block();
});
assertThrows(IllegalArgumentException.class, () -> {
daprClientHttp.deleteState("", null, null).block();
daprClientHttp.deleteState(STATE_STORE_NAME, "", null, null).block();
});
assertThrows(IllegalArgumentException.class, () -> {
daprClientHttp.deleteState(" ", null, null).block();
daprClientHttp.deleteState(STATE_STORE_NAME, " ", null, null).block();
});
assertThrows(IllegalArgumentException.class, () -> {
daprClientHttp.deleteState(null, "key", null, null).block();
});
assertThrows(IllegalArgumentException.class, () -> {
daprClientHttp.deleteState("", "key", null, null).block();
});
assertThrows(IllegalArgumentException.class, () -> {
daprClientHttp.deleteState(" ", "key", null, null).block();
});
}
}