From bbbd4be648092b832b73e3d08cc2b7e05317df24 Mon Sep 17 00:00:00 2001 From: Artur Souza Date: Tue, 4 Feb 2020 13:31:57 -0800 Subject: [PATCH] Add support for dapr 0.4 (multi-state store) (#199) * Updating proto file from dapr. * Adding support for multi-state store. * Allow PubSubIT to get accept events out of order. Co-authored-by: Shalabh Mohan Shrivastava --- .github/workflows/build.yml | 46 +++- .gitignore | 2 + examples/components/redis.yaml | 2 + .../io/dapr/examples/state/StateClient.java | 10 +- proto/dapr/dapr.proto | 215 +++++++++--------- .../src/test/java/io/dapr/it/BaseIT.java | 2 + .../java/io/dapr/it/pubsub/http/PubSubIT.java | 15 +- .../io/dapr/it/state/GRPCStateClientIT.java | 111 +++++---- .../io/dapr/it/state/HelloWorldClientIT.java | 3 + .../it/state/HelloWorldGrpcStateService.java | 13 +- .../io/dapr/it/state/HttpStateClientIT.java | 102 +++++---- .../main/java/io/dapr/client/DaprClient.java | 68 +++--- .../java/io/dapr/client/DaprClientGrpc.java | 46 ++-- .../java/io/dapr/client/DaprClientHttp.java | 50 ++-- .../io/dapr/client/DaprClientGrpcTest.java | 83 ++++--- .../io/dapr/client/DaprClientHttpTest.java | 70 +++--- 16 files changed, 493 insertions(+), 345 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 40ea3e2b9..3a19abcbc 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -5,7 +5,6 @@ on: branches: - master - release-* - - java_sdk_wip tags: - v* @@ -13,31 +12,70 @@ on: branches: - master - release-* - - java_sdk_wip jobs: build: runs-on: ubuntu-latest env: + GOVER: 1.13.7 + GOOS: linux + GOARCH: amd64 + GOPROXY: https://proxy.golang.org JDK_VER: 13.0.x DAPR_RUNTIME_VER: 0.3.0 + DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/f84566fb2bf5a599252ab9d6bd82fc78faf94dba/install/install.sh + DAPR_CLI_REF: f84566fb2bf5a599252ab9d6bd82fc78faf94dba + DAPR_REF: e540a7166aeaf115773ccc4c7a1056ae7eed073b OSSRH_USER_TOKEN: ${{ secrets.OSSRH_USER_TOKEN }} OSSRH_PWD_TOKEN: ${{ secrets.OSSRH_PWD_TOKEN }} GPG_KEY: ${{ secrets.GPG_KEY }} GPG_PWD: ${{ secrets.GPG_PWD }} steps: - - uses: actions/checkout@v1 + - uses: actions/checkout@v2 - name: Set up OpenJDK ${{ env.JDK_VER }} uses: actions/setup-java@v1 with: java-version: ${{ env.JDK_VER }} - name: Set up Dapr CLI - run: wget -q https://raw.githubusercontent.com/dapr/cli/master/install/install.sh -O - | /bin/bash + run: wget -q ${{ env.DAPR_INSTALL_URL }} -O - | /bin/bash + - name: Set up Go ${{ env.GOVER }} + if: env.DAPR_REF != '' || env.DAPR_CLI_REF != '' + uses: actions/setup-go@v1 + with: + go-version: ${{ env.GOVER }} + - name: Checkout Dapr CLI repo to override dapr command. + uses: actions/checkout@v2 + if: env.DAPR_CLI_REF != '' + with: + repository: dapr/cli + ref: ${{ env.DAPR_CLI_REF }} + path: cli + - name: Checkout Dapr repo to override daprd. + uses: actions/checkout@v2 + if: env.DAPR_REF != '' + with: + repository: dapr/dapr + ref: ${{ env.DAPR_REF }} + path: dapr + - name: Build and override dapr cli with referenced commit. + if: env.DAPR_CLI_REF != '' + run: | + cd cli + make + sudo cp dist/linux_amd64/release/dapr /usr/local/bin/dapr + cd .. - name: Initialize Dapr runtime ${{ env.DAPR_RUNTIME_VER }} run: | sudo dapr init --runtime-version ${{ env.DAPR_RUNTIME_VER }} echo "Showing dapr version..." dapr --version + - name: Build and override daprd with referenced commit. + if: env.DAPR_REF != '' + run: | + cd dapr + make + sudo cp dist/linux_amd64/release/daprd /usr/local/bin/daprd + cd .. - name: Install Local kafka using docker-compose run: | docker-compose -f ./sdk-tests/deploy/local-test-kafka.yml up -d diff --git a/.gitignore b/.gitignore index 3404a4c60..ae28bb52d 100644 --- a/.gitignore +++ b/.gitignore @@ -41,6 +41,8 @@ hs_err_pid* # Some other generated folders/files **/components/redis.yaml **/components/redis_messagebus.yaml +**/components/statestore.yaml +**/components/messagebus.yaml /docs/dapr-sdk-actors /docs/dapr-sdk-autogen /docs/dapr-sdk diff --git a/examples/components/redis.yaml b/examples/components/redis.yaml index 3cd472b1c..064922915 100644 --- a/examples/components/redis.yaml +++ b/examples/components/redis.yaml @@ -9,3 +9,5 @@ spec: value: localhost:6379 - name: redisPassword value: "" + - name: actorStateStore + value: "true" diff --git a/examples/src/main/java/io/dapr/examples/state/StateClient.java b/examples/src/main/java/io/dapr/examples/state/StateClient.java index f1604ef9f..ea99fe541 100644 --- a/examples/src/main/java/io/dapr/examples/state/StateClient.java +++ b/examples/src/main/java/io/dapr/examples/state/StateClient.java @@ -23,6 +23,8 @@ public class StateClient { public String message; } + private static final String STATE_STORE_NAME = "statestore"; + private static final String KEY_NAME = "myKey"; /** @@ -36,17 +38,17 @@ public class StateClient { MyClass myClass = new MyClass(); myClass.message = message; - client.saveState(KEY_NAME, myClass).block(); + client.saveState(STATE_STORE_NAME, KEY_NAME, myClass).block(); System.out.println("Saving class with message: " + message); - Mono> retrievedMessageMono = client.getState(KEY_NAME, MyClass.class); + Mono> retrievedMessageMono = client.getState(STATE_STORE_NAME, KEY_NAME, MyClass.class); System.out.println("Retrieved class message from state: " + (retrievedMessageMono.block().getValue()).message); System.out.println("Deleting state..."); - Mono mono = client.deleteState(KEY_NAME); + Mono mono = client.deleteState(STATE_STORE_NAME, KEY_NAME); mono.block(); - Mono> retrievedDeletedMessageMono = client.getState(KEY_NAME, MyClass.class); + Mono> retrievedDeletedMessageMono = client.getState(STATE_STORE_NAME, KEY_NAME, MyClass.class); System.out.println("Trying to retrieve deleted state: " + retrievedDeletedMessageMono.block().getValue()); } diff --git a/proto/dapr/dapr.proto b/proto/dapr/dapr.proto index b87d0f4fc..ac451710a 100644 --- a/proto/dapr/dapr.proto +++ b/proto/dapr/dapr.proto @@ -1,106 +1,109 @@ -syntax = "proto3"; - -package dapr; - -import "google/protobuf/any.proto"; -import "google/protobuf/empty.proto"; -import "google/protobuf/duration.proto"; - -option java_outer_classname = "DaprProtos"; -option java_package = "io.dapr"; - -option csharp_namespace = "Dapr.Client.Grpc"; - - -// Dapr definitions -service Dapr { - rpc PublishEvent(PublishEventEnvelope) returns (google.protobuf.Empty) {} - rpc InvokeService(InvokeServiceEnvelope) returns (InvokeServiceResponseEnvelope) {} - rpc InvokeBinding(InvokeBindingEnvelope) returns (google.protobuf.Empty) {} - rpc GetState(GetStateEnvelope) returns (GetStateResponseEnvelope) {} - rpc SaveState(SaveStateEnvelope) returns (google.protobuf.Empty) {} - rpc DeleteState(DeleteStateEnvelope) returns (google.protobuf.Empty) {} -} - -message InvokeServiceResponseEnvelope { - google.protobuf.Any data = 1; - map metadata = 2; -} - -message DeleteStateEnvelope { - string key = 1; - string etag = 2; - StateOptions options = 3; -} - -message SaveStateEnvelope { - repeated StateRequest requests = 1; -} - -message GetStateEnvelope { - string key = 1; - string consistency = 2; -} - -message GetStateResponseEnvelope { - google.protobuf.Any data = 1; - string etag = 2; -} - -message InvokeBindingEnvelope { - string name = 1; - google.protobuf.Any data = 2; - map metadata = 3; -} - -message InvokeServiceEnvelope { - string id = 1; - string method = 2; - google.protobuf.Any data = 3; - map metadata = 4; -} - -message PublishEventEnvelope { - string topic = 1; - google.protobuf.Any data = 2; -} - -message State { - string key = 1; - google.protobuf.Any value = 2; - string etag = 3; - map metadata = 4; - StateOptions options = 5; -} - -message StateOptions { - string concurrency = 1; - string consistency = 2; - RetryPolicy retryPolicy = 3; -} - -message RetryPolicy { - int32 threshold = 1; - string pattern = 2; - google.protobuf.Duration interval = 3; -} - -message StateRequest { - string key = 1; - google.protobuf.Any value = 2; - string etag = 3; - map metadata = 4; - StateRequestOptions options = 5; -} - -message StateRequestOptions { - string concurrency = 1; - string consistency = 2; - StateRetryPolicy retryPolicy = 3; -} - -message StateRetryPolicy { - int32 threshold = 1; - string pattern = 2; - google.protobuf.Duration interval = 3; -} +syntax = "proto3"; + +package dapr; + +import "google/protobuf/any.proto"; +import "google/protobuf/empty.proto"; +import "google/protobuf/duration.proto"; + +option java_outer_classname = "DaprProtos"; +option java_package = "io.dapr"; + +option csharp_namespace = "Dapr.Client.Grpc"; + + +// Dapr definitions +service Dapr { + rpc PublishEvent(PublishEventEnvelope) returns (google.protobuf.Empty) {} + rpc InvokeService(InvokeServiceEnvelope) returns (InvokeServiceResponseEnvelope) {} + rpc InvokeBinding(InvokeBindingEnvelope) returns (google.protobuf.Empty) {} + rpc GetState(GetStateEnvelope) returns (GetStateResponseEnvelope) {} + rpc SaveState(SaveStateEnvelope) returns (google.protobuf.Empty) {} + rpc DeleteState(DeleteStateEnvelope) returns (google.protobuf.Empty) {} +} + +message InvokeServiceResponseEnvelope { + google.protobuf.Any data = 1; + map metadata = 2; +} + +message DeleteStateEnvelope { + string storeName = 1; + string key = 2; + string etag = 3; + StateOptions options = 4; +} + +message SaveStateEnvelope { + string storeName = 1; + repeated StateRequest requests = 2; +} + +message GetStateEnvelope { + string storeName = 1; + string key = 2; + string consistency = 3; +} + +message GetStateResponseEnvelope { + google.protobuf.Any data = 1; + string etag = 2; +} + +message InvokeBindingEnvelope { + string name = 1; + google.protobuf.Any data = 2; + map metadata = 3; +} + +message InvokeServiceEnvelope { + string id = 1; + string method = 2; + google.protobuf.Any data = 3; + map metadata = 4; +} + +message PublishEventEnvelope { + string topic = 1; + google.protobuf.Any data = 2; +} + +message State { + string key = 1; + google.protobuf.Any value = 2; + string etag = 3; + map metadata = 4; + StateOptions options = 5; +} + +message StateOptions { + string concurrency = 1; + string consistency = 2; + RetryPolicy retryPolicy = 3; +} + +message RetryPolicy { + int32 threshold = 1; + string pattern = 2; + google.protobuf.Duration interval = 3; +} + +message StateRequest { + string key = 1; + google.protobuf.Any value = 2; + string etag = 3; + map metadata = 4; + StateRequestOptions options = 5; +} + +message StateRequestOptions { + string concurrency = 1; + string consistency = 2; + StateRetryPolicy retryPolicy = 3; +} + +message StateRetryPolicy { + int32 threshold = 1; + string pattern = 2; + google.protobuf.Duration interval = 3; +} diff --git a/sdk-tests/src/test/java/io/dapr/it/BaseIT.java b/sdk-tests/src/test/java/io/dapr/it/BaseIT.java index 7ae25b8a2..b9c05ae56 100644 --- a/sdk-tests/src/test/java/io/dapr/it/BaseIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/BaseIT.java @@ -11,6 +11,8 @@ import org.junit.AfterClass; public abstract class BaseIT { + protected static final String STATE_STORE_NAME = "statestore"; + private static final Collection DAPR_RUNS = new ArrayList<>(); protected static DaprRun startDaprApp( diff --git a/sdk-tests/src/test/java/io/dapr/it/pubsub/http/PubSubIT.java b/sdk-tests/src/test/java/io/dapr/it/pubsub/http/PubSubIT.java index eadf416d7..776850e21 100644 --- a/sdk-tests/src/test/java/io/dapr/it/pubsub/http/PubSubIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/pubsub/http/PubSubIT.java @@ -62,13 +62,16 @@ public class PubSubIT extends BaseIT { assertEquals(11, messages.size()); for (int i = 0; i < NUM_MESSAGES; i++) { - - assertTrue(messages.get(i).startsWith("This is message ")); - + assertTrue(messages.contains(String.format("This is message #%d", i))); } - byte[] result=new byte[] { 1 }; - assertEquals(result.length, messages.get(10).getBytes().length); - assertEquals(result[0], messages.get(10).getBytes()[0]); + + boolean foundByte = false; + for (String message : messages) { + if ((message.getBytes().length == 1) && (message.getBytes()[0] == 1)) { + foundByte = true; + } + } + assertTrue(foundByte); }, 2000); } diff --git a/sdk-tests/src/test/java/io/dapr/it/state/GRPCStateClientIT.java b/sdk-tests/src/test/java/io/dapr/it/state/GRPCStateClientIT.java index b892007a1..1e53fe961 100644 --- a/sdk-tests/src/test/java/io/dapr/it/state/GRPCStateClientIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/state/GRPCStateClientIT.java @@ -62,12 +62,12 @@ public class GRPCStateClientIT extends BaseIT { data.setPropertyB("data in property B"); //create of the deferred call to DAPR to store the state - Mono saveResponse = daprClient.saveState(stateKey, null, data, null); + Mono saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null); //execute the save action saveResponse.block(); //create of the deferred call to DAPR to get the state - Mono> response = daprClient.getState(new State(stateKey, null, null), MyData.class); + Mono> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), MyData.class); //retrieve the state State myDataResponse = response.block(); @@ -94,7 +94,7 @@ public class GRPCStateClientIT extends BaseIT { data.setPropertyB("data in property B"); //Create deferred action to save the sate - Mono saveResponse = daprClient.saveState(stateKey, null, data, null); + Mono saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null); //execute save action to DAPR saveResponse.block(); @@ -102,12 +102,13 @@ public class GRPCStateClientIT extends BaseIT { data.setPropertyA("data in property A"); data.setPropertyB("data in property B2"); //create deferred action to update the sate without any etag or options - saveResponse = daprClient.saveState(stateKey, null, data, null); + saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null); //execute the update action to DAPR saveResponse.block(); //Create deferred action to retrieve the action - Mono> response = daprClient.getState(new State(stateKey, null, null), MyData.class); + Mono> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), + MyData.class); //execute the retrieve of the state State myDataResponse = response.block(); @@ -126,12 +127,13 @@ public class GRPCStateClientIT extends BaseIT { data.setPropertyA("data in property A"); data.setPropertyB("data in property B"); //Create deferred action to save the sate - Mono saveResponse = daprClient.saveState(stateKey, null, data, null); + Mono saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null); //execute the save state action saveResponse.block(); //Create deferred action to retrieve the state - Mono> response = daprClient.getState(new State(stateKey, null, null), MyData.class); + Mono> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), + MyData.class); //execute the retrieve of the state State myDataResponse = response.block(); @@ -140,12 +142,12 @@ public class GRPCStateClientIT extends BaseIT { assertEquals("data in property B", myDataResponse.getValue().getPropertyB()); //create deferred action to delete the state - Mono deleteResponse = daprClient.deleteState(stateKey, null, null); + Mono deleteResponse = daprClient.deleteState(STATE_STORE_NAME, stateKey, null, null); //execute the delete action deleteResponse.block(); //Create deferred action to retrieve the state - response = daprClient.getState(new State(stateKey, null, null), MyData.class); + response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), MyData.class); //execute the retrieve of the state myDataResponse = response.block(); @@ -166,12 +168,13 @@ public class GRPCStateClientIT extends BaseIT { data.setPropertyB("data in property B"); //Create deferred action to save the sate - Mono saveResponse = daprClient.saveState(stateKey, null, data, null); + Mono saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null); //execute the save state action saveResponse.block(); //Create deferred action to retrieve the state - Mono> response = daprClient.getState(new State(stateKey, null, null), MyData.class); + Mono> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), + MyData.class); //execute the action for retrieve the state and the etag State myDataResponse = response.block(); @@ -188,11 +191,11 @@ public class GRPCStateClientIT extends BaseIT { data.setPropertyA("data in property A2"); data.setPropertyB("data in property B2"); //Create deferred action to update the data using the correct etag - saveResponse = daprClient.saveState(stateKey, myDataResponse.getEtag(), data, null); + saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), data, null); saveResponse.block(); - response = daprClient.getState(new State(stateKey, null, null), MyData.class); + response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), MyData.class); //retrive the data wihout any etag myDataResponse = response.block(); @@ -206,7 +209,8 @@ public class GRPCStateClientIT extends BaseIT { assertEquals("data in property B2", myDataResponse.getValue().getPropertyB()); } - @Ignore("This test case is ignored because DAPR ignore the ETag is wrong when is sent from GRPC protocol, the execution continues and the state is updated.") + @Ignore("This test case is ignored because DAPR ignore the ETag is wrong when is sent from GRPC protocol, the " + + "execution continues and the state is updated.") @Test(expected = RuntimeException.class) public void saveUpdateAndGetStateWithWrongEtag() { final String stateKey = "keyToBeUpdatedWithWrongEtag"; @@ -217,12 +221,13 @@ public class GRPCStateClientIT extends BaseIT { data.setPropertyB("data in property B"); //Create deferred action to save the sate - Mono saveResponse = daprClient.saveState(stateKey, null, data, null); + Mono saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null); //execute the save state action saveResponse.block(); //Create deferred action to retrieve the state - Mono> response = daprClient.getState(new State(stateKey, null, null), MyData.class); + Mono> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), + MyData.class); //execute the action for retrieve the state and the etag State myDataResponse = response.block(); @@ -239,11 +244,11 @@ public class GRPCStateClientIT extends BaseIT { data.setPropertyA("data in property A2"); data.setPropertyB("data in property B2"); //Create deferred action to update the data using the incorrect etag - saveResponse = daprClient.saveState(stateKey, "99999999999999", data, null); + saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, "99999999999999", data, null); saveResponse.block(); - response = daprClient.getState(new State(stateKey, null, null), MyData.class); + response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), MyData.class); //retrive the data wihout any etag myDataResponse = response.block(); @@ -267,12 +272,13 @@ public class GRPCStateClientIT extends BaseIT { data.setPropertyA("data in property A"); data.setPropertyB("data in property B"); //Create deferred action to save the sate - Mono saveResponse = daprClient.saveState(stateKey, null, data, null); + Mono saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null); //execute the save state action saveResponse.block(); //Create deferred action to get the state with the etag - Mono> response = daprClient.getState(new State(stateKey, null, null), MyData.class); + Mono> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), + MyData.class); //execute the get state State myDataResponse = response.block(); @@ -283,19 +289,20 @@ public class GRPCStateClientIT extends BaseIT { assertEquals("data in property B", myDataResponse.getValue().getPropertyB()); //Create deferred action to delete an state sending the etag - Mono deleteResponse = daprClient.deleteState(stateKey, myDataResponse.getEtag(), null); + Mono deleteResponse = daprClient.deleteState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), null); //execute the delete of the state deleteResponse.block(); //Create deferred action to get the sate without an etag - response = daprClient.getState(new State(stateKey, null, null), MyData.class); + response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), MyData.class); myDataResponse = response.block(); //Review that the response is null, because the state was deleted assertNull(myDataResponse.getValue()); } - @Ignore("This test case is ignored because DAPR ignore if the ETag is wrong when is sent from GRPC protocol, the execution continues and the state is deleted.") + @Ignore("This test case is ignored because DAPR ignore if the ETag is wrong when is sent from GRPC protocol, the " + + "execution continues and the state is deleted.") @Test(expected = RuntimeException.class) public void saveAndDeleteStateWithWrongEtag() { final String stateKey = "myeKeyToBeDeletedWithWrongEtag"; @@ -306,12 +313,13 @@ public class GRPCStateClientIT extends BaseIT { data.setPropertyA("data in property A"); data.setPropertyB("data in property B"); //Create deferred action to save the sate - Mono saveResponse = daprClient.saveState(stateKey, null, data, null); + Mono saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null); //execute the save state action saveResponse.block(); //Create deferred action to get the state with the etag - Mono> response = daprClient.getState(new State(stateKey, null, null), MyData.class); + Mono> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), + MyData.class); //execute the get state State myDataResponse = response.block(); @@ -322,25 +330,27 @@ public class GRPCStateClientIT extends BaseIT { assertEquals("data in property B", myDataResponse.getValue().getPropertyB()); //Create deferred action to delete an state sending the incorrect etag - Mono deleteResponse = daprClient.deleteState(stateKey, "99999999999", null); + Mono deleteResponse = daprClient.deleteState(STATE_STORE_NAME, stateKey, "99999999999", null); //execute the delete of the state, this should trhow an exception deleteResponse.block(); //Create deferred action to get the sate without an etag - response = daprClient.getState(new State(stateKey, null, null), MyData.class); + response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), MyData.class); myDataResponse = response.block(); //Review that the response is null, because the state was deleted assertNull(myDataResponse.getValue()); } - @Ignore("This test case is ignored because it seems that DAPR using GRPC is ignoring the state options for consistency and concurrency.") + @Ignore("This test case is ignored because it seems that DAPR using GRPC is ignoring the state options for " + + "consistency and concurrency.") @Test(expected = RuntimeException.class) public void saveUpdateAndGetStateWithEtagAndStateOptionsFirstWrite() { final String stateKey = "keyToBeUpdatedWithEtagAndOptions"; //create option with concurrency with first writte and consistency of strong - StateOptions stateOptions = new StateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE, null); + StateOptions stateOptions = new StateOptions(StateOptions.Consistency.STRONG, + StateOptions.Concurrency.FIRST_WRITE, null); //create Dummy data @@ -349,13 +359,14 @@ public class GRPCStateClientIT extends BaseIT { data.setPropertyB("data in property B"); //create state using stateOptions - Mono saveResponse = daprClient.saveState(stateKey, null, data, stateOptions); + Mono saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, stateOptions); //execute the save state saveResponse.block(); //crate deferred action to retrieve the state - Mono> response = daprClient.getState(new State(stateKey, null, stateOptions), MyData.class); + Mono> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, stateOptions), + MyData.class); //execute the retrieve of the state using options State myDataResponse = response.block(); @@ -369,7 +380,7 @@ public class GRPCStateClientIT extends BaseIT { data.setPropertyA("data in property A2"); data.setPropertyB("data in property B2"); //create deferred action to update the action with options - saveResponse = daprClient.saveState(stateKey, myDataResponse.getEtag(), data, stateOptions); + saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), data, stateOptions); //update the state saveResponse.block(); @@ -377,11 +388,11 @@ public class GRPCStateClientIT extends BaseIT { data.setPropertyA("last write"); data.setPropertyB("data in property B2"); //create deferred action to update the action with the same etag - saveResponse = daprClient.saveState(stateKey, myDataResponse.getEtag(), data, stateOptions); + saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), data, stateOptions); //throws an exception, the state was already udpated saveResponse.block(); - response = daprClient.getState(new State(stateKey, null, stateOptions), MyData.class); + response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, stateOptions), MyData.class); State myLastDataResponse = response.block(); assertNotNull(myLastDataResponse.getEtag()); @@ -397,7 +408,8 @@ public class GRPCStateClientIT extends BaseIT { final String stateKey = "keyToBeUpdatedWithEtagAndOptions"; //create option with concurrency with first writte and consistency of strong - StateOptions stateOptions = new StateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.LAST_WRITE, null); + StateOptions stateOptions = new StateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.LAST_WRITE + , null); //create Dummy data @@ -406,13 +418,14 @@ public class GRPCStateClientIT extends BaseIT { data.setPropertyB("data in property B"); //create state using stateOptions - Mono saveResponse = daprClient.saveState(stateKey, null, data, stateOptions); + Mono saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, stateOptions); //execute the save state saveResponse.block(); //crate deferred action to retrieve the state - Mono> response = daprClient.getState(new State(stateKey, null, stateOptions), MyData.class); + Mono> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, stateOptions), + MyData.class); //execute the retrieve of the state using options State myDataResponse = response.block(); @@ -426,7 +439,7 @@ public class GRPCStateClientIT extends BaseIT { data.setPropertyA("data in property A2"); data.setPropertyB("data in property B2"); //create deferred action to update the action with options - saveResponse = daprClient.saveState(stateKey, myDataResponse.getEtag(), data, stateOptions); + saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), data, stateOptions); //update the state saveResponse.block(); @@ -434,11 +447,11 @@ public class GRPCStateClientIT extends BaseIT { data.setPropertyA("last write"); data.setPropertyB("data in property B2"); //create deferred action to update the action with the same etag - saveResponse = daprClient.saveState(stateKey, myDataResponse.getEtag(), data, stateOptions); + saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), data, stateOptions); //update the state without an error saveResponse.block(); - response = daprClient.getState(new State(stateKey, null, stateOptions), MyData.class); + response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, stateOptions), MyData.class); State myLastDataResponse = response.block(); assertNotNull(myLastDataResponse.getEtag()); @@ -452,7 +465,8 @@ public class GRPCStateClientIT extends BaseIT { @Test(timeout = 13000) public void saveDeleteWithRetry() { final String stateKey = "keyToBeDeleteWithWrongEtagAndRetry"; - StateOptions.RetryPolicy retryPolicy = new StateOptions.RetryPolicy(Duration.ofSeconds(3), 3, StateOptions.RetryPolicy.Pattern.LINEAR); + StateOptions.RetryPolicy retryPolicy = new StateOptions.RetryPolicy(Duration.ofSeconds(3), 3, + StateOptions.RetryPolicy.Pattern.LINEAR); StateOptions stateOptions = new StateOptions(null, null, retryPolicy); @@ -462,12 +476,12 @@ public class GRPCStateClientIT extends BaseIT { data.setPropertyB("data in property B"); //Create deferred action to save the sate - Mono saveResponse = daprClient.saveState(stateKey, null, data, null); + Mono saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null); //execute the save state action saveResponse.block(); //Create deferred action to retrieve the state - Mono> response = daprClient.getState(new State(stateKey, null, null), MyData.class); + Mono> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), MyData.class); //execute the action for retrieve the state and the etag State myDataResponse = response.block(); @@ -479,7 +493,7 @@ public class GRPCStateClientIT extends BaseIT { assertEquals("data in property B", myDataResponse.getValue().getPropertyB()); - Mono deleteResponse = daprClient.deleteState(stateKey, "99999999", stateOptions); + Mono deleteResponse = daprClient.deleteState(STATE_STORE_NAME, stateKey, "99999999", stateOptions); long start = System.currentTimeMillis(); try { @@ -499,7 +513,8 @@ public class GRPCStateClientIT extends BaseIT { @Test(timeout = 13000) public void saveUpdateWithRetry() { final String stateKey = "keyToBeDeleteWithWrongEtagAndRetry"; - StateOptions.RetryPolicy retryPolicy = new StateOptions.RetryPolicy(Duration.ofSeconds(4), 3, StateOptions.RetryPolicy.Pattern.LINEAR); + StateOptions.RetryPolicy retryPolicy = new StateOptions.RetryPolicy(Duration.ofSeconds(4), 3, + StateOptions.RetryPolicy.Pattern.LINEAR); StateOptions stateOptions = new StateOptions(null, null, retryPolicy); @@ -509,12 +524,12 @@ public class GRPCStateClientIT extends BaseIT { data.setPropertyB("data in property B"); //Create deferred action to save the sate - Mono saveResponse = daprClient.saveState(stateKey, null, data, null); + Mono saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null); //execute the save state action saveResponse.block(); //Create deferred action to retrieve the state - Mono> response = daprClient.getState(new State(stateKey, null, null), MyData.class); + Mono> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), MyData.class); //execute the action for retrieve the state and the etag State myDataResponse = response.block(); @@ -526,7 +541,7 @@ public class GRPCStateClientIT extends BaseIT { assertEquals("data in property B", myDataResponse.getValue().getPropertyB()); //Create deferred action to save the sate - saveResponse = daprClient.saveState(stateKey, "9999999", data, stateOptions); + saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, "9999999", data, stateOptions); //execute the save state action long start = System.currentTimeMillis(); diff --git a/sdk-tests/src/test/java/io/dapr/it/state/HelloWorldClientIT.java b/sdk-tests/src/test/java/io/dapr/it/state/HelloWorldClientIT.java index 2bd38c03c..d88441d47 100644 --- a/sdk-tests/src/test/java/io/dapr/it/state/HelloWorldClientIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/state/HelloWorldClientIT.java @@ -33,6 +33,7 @@ public class HelloWorldClientIT extends BaseIT { { DaprProtos.GetStateEnvelope req = DaprProtos.GetStateEnvelope .newBuilder() + .setStoreName(STATE_STORE_NAME) .setKey(key) .build(); DaprProtos.GetStateResponseEnvelope response = client.getState(req); @@ -45,6 +46,7 @@ public class HelloWorldClientIT extends BaseIT { { DaprProtos.DeleteStateEnvelope req = DaprProtos.DeleteStateEnvelope .newBuilder() + .setStoreName(STATE_STORE_NAME) .setKey(key) .build(); client.deleteState(req); @@ -54,6 +56,7 @@ public class HelloWorldClientIT extends BaseIT { { DaprProtos.GetStateEnvelope req = DaprProtos.GetStateEnvelope .newBuilder() + .setStoreName(STATE_STORE_NAME) .setKey(key) .build(); DaprProtos.GetStateResponseEnvelope response = client.getState(req); diff --git a/sdk-tests/src/test/java/io/dapr/it/state/HelloWorldGrpcStateService.java b/sdk-tests/src/test/java/io/dapr/it/state/HelloWorldGrpcStateService.java index 7c79b9999..087b3382e 100644 --- a/sdk-tests/src/test/java/io/dapr/it/state/HelloWorldGrpcStateService.java +++ b/sdk-tests/src/test/java/io/dapr/it/state/HelloWorldGrpcStateService.java @@ -38,13 +38,14 @@ public class HelloWorldGrpcStateService { String value = "Hello World"; StateRequest req = StateRequest - .newBuilder() - .setKey(key) - .setValue(Any.newBuilder().setValue(ByteString.copyFromUtf8(value)).build()) - .build(); + .newBuilder() + .setKey(key) + .setValue(Any.newBuilder().setValue(ByteString.copyFromUtf8(value)).build()) + .build(); SaveStateEnvelope state = SaveStateEnvelope.newBuilder() - .addRequests(req) - .build(); + .setStoreName("statestore") + .addRequests(req) + .build(); client.saveState(state); System.out.println("Saved!"); channel.shutdown(); diff --git a/sdk-tests/src/test/java/io/dapr/it/state/HttpStateClientIT.java b/sdk-tests/src/test/java/io/dapr/it/state/HttpStateClientIT.java index b24481de9..74bfc7488 100644 --- a/sdk-tests/src/test/java/io/dapr/it/state/HttpStateClientIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/state/HttpStateClientIT.java @@ -54,12 +54,12 @@ public class HttpStateClientIT extends BaseIT { data.setPropertyB("data in property B"); //create of the deferred call to DAPR to store the state - Mono saveResponse = daprClient.saveState(stateKey, null, data, null); + Mono saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null); //execute the save action saveResponse.block(); //create of the deferred call to DAPR to get the state - Mono> response = daprClient.getState(new State(stateKey, null, null), MyData.class); + Mono> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), MyData.class); //retrieve the state State myDataResponse = response.block(); @@ -86,7 +86,7 @@ public class HttpStateClientIT extends BaseIT { data.setPropertyB("data in property B"); //Create deferred action to save the sate - Mono saveResponse = daprClient.saveState(stateKey, null, data, null); + Mono saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null); //execute save action to DAPR saveResponse.block(); @@ -94,12 +94,13 @@ public class HttpStateClientIT extends BaseIT { data.setPropertyA("data in property A"); data.setPropertyB("data in property B2"); //create deferred action to update the sate without any etag or options - saveResponse = daprClient.saveState(stateKey, null, data, null); + saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null); //execute the update action to DAPR saveResponse.block(); //Create deferred action to retrieve the action - Mono> response = daprClient.getState(new State(stateKey, null, null), MyData.class); + Mono> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), + MyData.class); //execute the retrieve of the state State myDataResponse = response.block(); @@ -121,12 +122,13 @@ public class HttpStateClientIT extends BaseIT { data.setPropertyA("data in property A"); data.setPropertyB("data in property B"); //Create deferred action to save the sate - Mono saveResponse = daprClient.saveState(stateKey, null, data, null); + Mono saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null); //execute the save state action saveResponse.block(); //Create deferred action to retrieve the state - Mono> response = daprClient.getState(new State(stateKey, null, null), MyData.class); + Mono> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), + MyData.class); //execute the retrieve of the state State myDataResponse = response.block(); @@ -135,12 +137,12 @@ public class HttpStateClientIT extends BaseIT { Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB()); //create deferred action to delete the state - Mono deleteResponse = daprClient.deleteState(stateKey, null, null); + Mono deleteResponse = daprClient.deleteState(STATE_STORE_NAME, stateKey, null, null); //execute the delete action deleteResponse.block(); //Create deferred action to retrieve the state - response = daprClient.getState(new State(stateKey, null, null), MyData.class); + response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), MyData.class); //execute the retrieve of the state myDataResponse = response.block(); @@ -161,12 +163,13 @@ public class HttpStateClientIT extends BaseIT { data.setPropertyB("data in property B"); //Create deferred action to save the sate - Mono saveResponse = daprClient.saveState(stateKey, null, data, null); + Mono saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null); //execute the save state action saveResponse.block(); //Create deferred action to retrieve the state - Mono> response = daprClient.getState(new State(stateKey, null, null), MyData.class); + Mono> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), + MyData.class); //execute the action for retrieve the state and the etag State myDataResponse = response.block(); @@ -183,11 +186,11 @@ public class HttpStateClientIT extends BaseIT { data.setPropertyA("data in property A2"); data.setPropertyB("data in property B2"); //Create deferred action to update the data using the correct etag - saveResponse = daprClient.saveState(stateKey, myDataResponse.getEtag(), data, null); + saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), data, null); saveResponse.block(); - response = daprClient.getState(new State(stateKey, null, null), MyData.class); + response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), MyData.class); //retrive the data wihout any etag myDataResponse = response.block(); @@ -214,12 +217,13 @@ public class HttpStateClientIT extends BaseIT { data.setPropertyB("data in property B"); //Create deferred action to save the sate - Mono saveResponse = daprClient.saveState(stateKey, null, data, null); + Mono saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null); //execute the save state action saveResponse.block(); //Create deferred action to retrieve the state - Mono> response = daprClient.getState(new State(stateKey, null, null), MyData.class); + Mono> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), + MyData.class); //execute the action for retrieve the state and the etag State myDataResponse = response.block(); @@ -236,11 +240,11 @@ public class HttpStateClientIT extends BaseIT { data.setPropertyA("data in property A2"); data.setPropertyB("data in property B2"); //Create deferred action to update the data using the incorrect etag - saveResponse = daprClient.saveState(stateKey, "99999999999999", data, null); + saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, "99999999999999", data, null); saveResponse.block(); - response = daprClient.getState(new State(stateKey, null, null), MyData.class); + response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), MyData.class); //retrive the data wihout any etag myDataResponse = response.block(); @@ -264,12 +268,13 @@ public class HttpStateClientIT extends BaseIT { data.setPropertyA("data in property A"); data.setPropertyB("data in property B"); //Create deferred action to save the sate - Mono saveResponse = daprClient.saveState(stateKey, null, data, null); + Mono saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null); //execute the save state action saveResponse.block(); //Create deferred action to get the state with the etag - Mono> response = daprClient.getState(new State(stateKey, null, null), MyData.class); + Mono> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), + MyData.class); //execute the get state State myDataResponse = response.block(); @@ -280,12 +285,12 @@ public class HttpStateClientIT extends BaseIT { Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB()); //Create deferred action to delete an state sending the etag - Mono deleteResponse = daprClient.deleteState(stateKey, myDataResponse.getEtag(), null); + Mono deleteResponse = daprClient.deleteState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), null); //execute the delete of the state deleteResponse.block(); //Create deferred action to get the sate without an etag - response = daprClient.getState(new State(stateKey, null, null), MyData.class); + response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), MyData.class); myDataResponse = response.block(); //Review that the response is null, because the state was deleted @@ -304,12 +309,13 @@ public class HttpStateClientIT extends BaseIT { data.setPropertyA("data in property A"); data.setPropertyB("data in property B"); //Create deferred action to save the sate - Mono saveResponse = daprClient.saveState(stateKey, null, data, null); + Mono saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null); //execute the save state action saveResponse.block(); //Create deferred action to get the state with the etag - Mono> response = daprClient.getState(new State(stateKey, null, null), MyData.class); + Mono> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), + MyData.class); //execute the get state State myDataResponse = response.block(); @@ -320,12 +326,12 @@ public class HttpStateClientIT extends BaseIT { Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB()); //Create deferred action to delete an state sending the incorrect etag - Mono deleteResponse = daprClient.deleteState(stateKey, "99999999999", null); + Mono deleteResponse = daprClient.deleteState(STATE_STORE_NAME, stateKey, "99999999999", null); //execute the delete of the state, this should trhow an exception deleteResponse.block(); //Create deferred action to get the sate without an etag - response = daprClient.getState(new State(stateKey, null, null), MyData.class); + response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), MyData.class); myDataResponse = response.block(); //Review that the response is null, because the state was deleted @@ -337,7 +343,8 @@ public class HttpStateClientIT extends BaseIT { final String stateKey = "keyToBeUpdatedWithEtagAndOptions"; //create option with concurrency with first writte and consistency of strong - StateOptions stateOptions = new StateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE, null); + StateOptions stateOptions = new StateOptions(StateOptions.Consistency.STRONG, + StateOptions.Concurrency.FIRST_WRITE, null); //create dapr client DaprClient daprClient = buildDaprClient(); @@ -347,13 +354,14 @@ public class HttpStateClientIT extends BaseIT { data.setPropertyB("data in property B"); //create state using stateOptions - Mono saveResponse = daprClient.saveState(stateKey, null, data, stateOptions); + Mono saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, stateOptions); //execute the save state saveResponse.block(); //crate deferred action to retrieve the state - Mono> response = daprClient.getState(new State(stateKey, null, stateOptions), MyData.class); + Mono> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, stateOptions), + MyData.class); //execute the retrieve of the state using options State myDataResponse = response.block(); @@ -367,7 +375,7 @@ public class HttpStateClientIT extends BaseIT { data.setPropertyA("data in property A2"); data.setPropertyB("data in property B2"); //create deferred action to update the action with options - saveResponse = daprClient.saveState(stateKey, myDataResponse.getEtag(), data, stateOptions); + saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), data, stateOptions); //update the state saveResponse.block(); @@ -375,11 +383,11 @@ public class HttpStateClientIT extends BaseIT { data.setPropertyA("last write"); data.setPropertyB("data in property B2"); //create deferred action to update the action with the same etag - saveResponse = daprClient.saveState(stateKey, myDataResponse.getEtag(), data, stateOptions); + saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), data, stateOptions); //throws an exception, the state was already udpated saveResponse.block(); - response = daprClient.getState(new State(stateKey, null, stateOptions), MyData.class); + response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, stateOptions), MyData.class); State myLastDataResponse = response.block(); Assert.assertNotNull(myLastDataResponse.getEtag()); @@ -395,7 +403,8 @@ public class HttpStateClientIT extends BaseIT { final String stateKey = "keyToBeUpdatedWithEtagAndOptions"; //create option with concurrency with first writte and consistency of strong - StateOptions stateOptions = new StateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.LAST_WRITE, null); + StateOptions stateOptions = new StateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.LAST_WRITE + , null); //create dapr client DaprClient daprClient = buildDaprClient(); @@ -405,13 +414,14 @@ public class HttpStateClientIT extends BaseIT { data.setPropertyB("data in property B"); //create state using stateOptions - Mono saveResponse = daprClient.saveState(stateKey, null, data, stateOptions); + Mono saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, stateOptions); //execute the save state saveResponse.block(); //crate deferred action to retrieve the state - Mono> response = daprClient.getState(new State(stateKey, null, stateOptions), MyData.class); + Mono> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, stateOptions), + MyData.class); //execute the retrieve of the state using options State myDataResponse = response.block(); @@ -425,7 +435,7 @@ public class HttpStateClientIT extends BaseIT { data.setPropertyA("data in property A2"); data.setPropertyB("data in property B2"); //create deferred action to update the action with options - saveResponse = daprClient.saveState(stateKey, myDataResponse.getEtag(), data, stateOptions); + saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), data, stateOptions); //update the state saveResponse.block(); @@ -433,11 +443,11 @@ public class HttpStateClientIT extends BaseIT { data.setPropertyA("last write"); data.setPropertyB("data in property B2"); //create deferred action to update the action with the same etag - saveResponse = daprClient.saveState(stateKey, myDataResponse.getEtag(), data, stateOptions); + saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, myDataResponse.getEtag(), data, stateOptions); //update the state without an error saveResponse.block(); - response = daprClient.getState(new State(stateKey, null, stateOptions), MyData.class); + response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, stateOptions), MyData.class); State myLastDataResponse = response.block(); Assert.assertNotNull(myLastDataResponse.getEtag()); @@ -451,7 +461,8 @@ public class HttpStateClientIT extends BaseIT { @Test(timeout = 13000) public void saveDeleteWithRetry() { final String stateKey = "keyToBeDeleteWithWrongEtagAndRetry"; - StateOptions.RetryPolicy retryPolicy = new StateOptions.RetryPolicy(Duration.ofSeconds(3), 3, StateOptions.RetryPolicy.Pattern.LINEAR); + StateOptions.RetryPolicy retryPolicy = new StateOptions.RetryPolicy(Duration.ofSeconds(3), 3, + StateOptions.RetryPolicy.Pattern.LINEAR); StateOptions stateOptions = new StateOptions(null, null, retryPolicy); //create DAPR client @@ -462,12 +473,12 @@ public class HttpStateClientIT extends BaseIT { data.setPropertyB("data in property B"); //Create deferred action to save the sate - Mono saveResponse = daprClient.saveState(stateKey, null, data, null); + Mono saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null); //execute the save state action saveResponse.block(); //Create deferred action to retrieve the state - Mono> response = daprClient.getState(new State(stateKey, null, null), MyData.class); + Mono> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), MyData.class); //execute the action for retrieve the state and the etag State myDataResponse = response.block(); @@ -479,7 +490,7 @@ public class HttpStateClientIT extends BaseIT { Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB()); - Mono deleteResponse = daprClient.deleteState(stateKey, "99999999", stateOptions); + Mono deleteResponse = daprClient.deleteState(STATE_STORE_NAME, stateKey, "99999999", stateOptions); long start = System.currentTimeMillis(); try { @@ -499,7 +510,8 @@ public class HttpStateClientIT extends BaseIT { @Test(timeout = 13000) public void saveUpdateWithRetry() { final String stateKey = "keyToBeDeleteWithWrongEtagAndRetry"; - StateOptions.RetryPolicy retryPolicy = new StateOptions.RetryPolicy(Duration.ofSeconds(4), 3, StateOptions.RetryPolicy.Pattern.EXPONENTIAL); + StateOptions.RetryPolicy retryPolicy = new StateOptions.RetryPolicy(Duration.ofSeconds(4), 3, + StateOptions.RetryPolicy.Pattern.EXPONENTIAL); StateOptions stateOptions = new StateOptions(null, null, retryPolicy); //create DAPR client @@ -510,12 +522,12 @@ public class HttpStateClientIT extends BaseIT { data.setPropertyB("data in property B"); //Create deferred action to save the sate - Mono saveResponse = daprClient.saveState(stateKey, null, data, null); + Mono saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null); //execute the save state action saveResponse.block(); //Create deferred action to retrieve the state - Mono> response = daprClient.getState(new State(stateKey, null, null), MyData.class); + Mono> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), MyData.class); //execute the action for retrieve the state and the etag State myDataResponse = response.block(); @@ -527,7 +539,7 @@ public class HttpStateClientIT extends BaseIT { Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB()); //Create deferred action to save the sate - saveResponse = daprClient.saveState(stateKey, "9999999", data, stateOptions); + saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, "9999999", data, stateOptions); //execute the save state action long start = System.currentTimeMillis(); diff --git a/sdk/src/main/java/io/dapr/client/DaprClient.java b/sdk/src/main/java/io/dapr/client/DaprClient.java index a46511c76..c1f969244 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClient.java +++ b/sdk/src/main/java/io/dapr/client/DaprClient.java @@ -148,78 +148,86 @@ public interface DaprClient { /** * Retrieve a State based on their key. * - * @param state State to be re-retrieved. - * @param clazz The Type of State needed as return. - * @param The Type of the return. + * @param stateStoreName The name of the state store. + * @param state State to be re-retrieved. + * @param clazz The Type of State needed as return. + * @param The Type of the return. * @return A Mono Plan for the requested State. */ - Mono> getState(State state, Class clazz); + Mono> getState(String stateStoreName, State state, Class clazz); /** * Retrieve a State based on their key. * - * @param key The key of the State to be retrieved. - * @param clazz The Type of State needed as return. - * @param The Type of the return. + * @param stateStoreName The name of the state store. + * @param key The key of the State to be retrieved. + * @param clazz The Type of State needed as return. + * @param The Type of the return. * @return A Mono Plan for the requested State. */ - Mono> getState(String key, Class clazz); + Mono> getState(String stateStoreName, String key, Class clazz); /** * Retrieve a State based on their key. * - * @param key The key of the State to be retrieved. - * @param etag Optional etag for conditional get - * @param options Optional settings for retrieve operation. - * @param clazz The Type of State needed as return. - * @param The Type of the return. + * @param stateStoreName The name of the state store. + * @param key The key of the State to be retrieved. + * @param etag Optional etag for conditional get + * @param options Optional settings for retrieve operation. + * @param clazz The Type of State needed as return. + * @param The Type of the return. * @return A Mono Plan for the requested State. */ - Mono> getState(String key, String etag, StateOptions options, Class clazz); + Mono> getState(String stateStoreName, String key, String etag, StateOptions options, Class clazz); /** * Save/Update a list of states. * - * @param states the States to be saved. + * @param stateStoreName The name of the state store. + * @param states The States to be saved. * @return a Mono plan of type Void. */ - Mono saveStates(List> states); + Mono saveStates(String stateStoreName, List> states); /** * Save/Update a state. * - * @param key the key of the state. - * @param value the value of the state. + * @param stateStoreName The name of the state store. + * @param key The key of the state. + * @param value The value of the state. * @return a Mono plan of type Void. */ - Mono saveState(String key, Object value); + Mono saveState(String stateStoreName, String key, Object value); /** * Save/Update a state. * - * @param key the key of the state. - * @param etag the etag to be used. - * @param value the value of the state. - * @param options the Options to use for each state. + * @param stateStoreName The name of the state store. + * @param key The key of the state. + * @param etag The etag to be used. + * @param value The value of the state. + * @param options The Options to use for each state. * @return a Mono plan of type Void. */ - Mono saveState(String key, String etag, Object value, StateOptions options); + Mono saveState(String stateStoreName, String key, String etag, Object value, StateOptions options); /** * Delete a state. * - * @param key The key of the State to be removed. + * @param stateStoreName The name of the state store. + * @param key The key of the State to be removed. * @return a Mono plan of type Void. */ - Mono deleteState(String key); + Mono deleteState(String stateStoreName, String key); /** * Delete a state. * - * @param key The key of the State to be removed. - * @param etag Optional etag for conditional delete. - * @param options Optional settings for state operation. + * @param stateStoreName The name of the state store. + * @param key The key of the State to be removed. + * @param etag Optional etag for conditional delete. + * @param options Optional settings for state operation. * @return a Mono plan of type Void. */ - Mono deleteState(String key, String etag, StateOptions options); + Mono deleteState(String stateStoreName, String key, String etag, StateOptions options); } diff --git a/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java b/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java index 9eaed0a09..e08799d9c 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java @@ -210,25 +210,33 @@ public class DaprClientGrpc implements DaprClient { * {@inheritDoc} */ @Override - public Mono> getState(State state, Class clazz) { - return this.getState(state.getKey(), state.getEtag(), state.getOptions(), clazz); + public Mono> getState(String stateStoreName, State state, Class clazz) { + return this.getState(stateStoreName, state.getKey(), state.getEtag(), state.getOptions(), clazz); } /** * {@inheritDoc} */ @Override - public Mono> getState(String key, Class clazz) { - return this.getState(key, null, null, clazz); + public Mono> getState(String stateStoreName, String key, Class clazz) { + return this.getState(stateStoreName, key, null, null, clazz); } /** * {@inheritDoc} */ @Override - public Mono> getState(String key, String etag, StateOptions options, Class clazz) { + public Mono> getState( + String stateStoreName, String key, String etag, StateOptions options, Class clazz) { try { + if ((stateStoreName == null) || (stateStoreName.trim().isEmpty())) { + throw new IllegalArgumentException("State store name cannot be null or empty."); + } + if ((key == null) || (key.trim().isEmpty())) { + throw new IllegalArgumentException("Key cannot be null or empty."); + } DaprProtos.GetStateEnvelope.Builder builder = DaprProtos.GetStateEnvelope.newBuilder() + .setStoreName(stateStoreName) .setKey(key); if (options != null && options.getConsistency() != null) { builder.setConsistency(options.getConsistency().getValue()); @@ -267,9 +275,13 @@ public class DaprClientGrpc implements DaprClient { * {@inheritDoc} */ @Override - public Mono saveStates(List> states) { + public Mono saveStates(String stateStoreName, List> states) { try { + if ((stateStoreName == null) || (stateStoreName.trim().isEmpty())) { + throw new IllegalArgumentException("State store name cannot be null or empty."); + } DaprProtos.SaveStateEnvelope.Builder builder = DaprProtos.SaveStateEnvelope.newBuilder(); + builder.setStoreName(stateStoreName); for (State state : states) { builder.addRequests(buildStateRequest(state).build()); } @@ -342,33 +354,40 @@ public class DaprClientGrpc implements DaprClient { * {@inheritDoc} */ @Override - public Mono saveState(String key, Object value) { - return this.saveState(key, null, value, null); + public Mono saveState(String stateStoreName, String key, Object value) { + return this.saveState(stateStoreName, key, null, value, null); } /** * {@inheritDoc} */ @Override - public Mono saveState(String key, String etag, Object value, StateOptions options) { + public Mono saveState(String stateStoreName, String key, String etag, Object value, StateOptions options) { State state = new State<>(value, key, etag, options); - return this.saveStates(Arrays.asList(state)); + return this.saveStates(stateStoreName, Arrays.asList(state)); } /** * {@inheritDoc} */ @Override - public Mono deleteState(String key) { - return this.deleteState(key, null, null); + public Mono deleteState(String stateStoreName, String key) { + return this.deleteState(stateStoreName, key, null, null); } /** * {@inheritDoc} */ @Override - public Mono deleteState(String key, String etag, StateOptions options) { + public Mono deleteState(String stateStoreName, String key, String etag, StateOptions options) { try { + if ((stateStoreName == null) || (stateStoreName.trim().isEmpty())) { + throw new IllegalArgumentException("State store name cannot be null or empty."); + } + if ((key == null) || (key.trim().isEmpty())) { + throw new IllegalArgumentException("Key cannot be null or empty."); + } + DaprProtos.StateOptions.Builder optionBuilder = null; if (options != null) { optionBuilder = DaprProtos.StateOptions.newBuilder(); @@ -402,6 +421,7 @@ public class DaprClientGrpc implements DaprClient { } } DaprProtos.DeleteStateEnvelope.Builder builder = DaprProtos.DeleteStateEnvelope.newBuilder() + .setStoreName(stateStoreName) .setKey(key); if (etag != null) { builder.setEtag(etag); diff --git a/sdk/src/main/java/io/dapr/client/DaprClientHttp.java b/sdk/src/main/java/io/dapr/client/DaprClientHttp.java index d1c154c26..33909a3a6 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientHttp.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientHttp.java @@ -268,26 +268,30 @@ public class DaprClientHttp implements DaprClient { * {@inheritDoc} */ @Override - public Mono> getState(State state, Class clazz) { - return this.getState(state.getKey(), state.getEtag(), state.getOptions(), clazz); + public Mono> getState(String stateStoreName, State state, Class clazz) { + return this.getState(stateStoreName, state.getKey(), state.getEtag(), state.getOptions(), clazz); } /** * {@inheritDoc} */ @Override - public Mono> getState(String key, Class clazz) { - return this.getState(key, null, null, clazz); + public Mono> getState(String stateStoreName, String key, Class clazz) { + return this.getState(stateStoreName, key, null, null, clazz); } /** * {@inheritDoc} */ @Override - public Mono> getState(String key, String etag, StateOptions options, Class clazz) { + public Mono> getState( + String stateStoreName, String key, String etag, StateOptions options, Class clazz) { try { - if (key == null) { - throw new IllegalArgumentException("Name cannot be null or empty."); + if ((stateStoreName == null) || (stateStoreName.trim().isEmpty())) { + throw new IllegalArgumentException("State store name cannot be null or empty."); + } + if ((key == null) || (key.trim().isEmpty())) { + throw new IllegalArgumentException("Key cannot be null or empty."); } Map headers = new HashMap<>(); if (etag != null && !etag.trim().isEmpty()) { @@ -295,6 +299,8 @@ public class DaprClientHttp implements DaprClient { } StringBuilder url = new StringBuilder(Constants.STATE_PATH) + .append("/") + .append(stateStoreName) .append("/") .append(key); Map urlParameters = Optional.ofNullable(options) @@ -319,8 +325,11 @@ public class DaprClientHttp implements DaprClient { * {@inheritDoc} */ @Override - public Mono saveStates(List> states) { + public Mono saveStates(String stateStoreName, List> states) { try { + if ((stateStoreName == null) || (stateStoreName.trim().isEmpty())) { + throw new IllegalArgumentException("State store name cannot be null or empty."); + } if (states == null || states.isEmpty()) { return Mono.empty(); } @@ -330,7 +339,7 @@ public class DaprClientHttp implements DaprClient { if (etag != null && !etag.trim().isEmpty()) { headers.put(Constants.HEADER_HTTP_ETAG_ID, etag); } - final String url = Constants.STATE_PATH; + final String url = Constants.STATE_PATH + "/" + stateStoreName; List> internalStateObjects = new ArrayList<>(states.size()); for (State state : states) { if (state == null) { @@ -356,41 +365,44 @@ public class DaprClientHttp implements DaprClient { * {@inheritDoc} */ @Override - public Mono saveState(String key, Object value) { - return this.saveState(key, null, value, null); + public Mono saveState(String stateStoreName, String key, Object value) { + return this.saveState(stateStoreName, key, null, value, null); } /** * {@inheritDoc} */ @Override - public Mono saveState(String key, String etag, Object value, StateOptions options) { + public Mono saveState(String stateStoreName, String key, String etag, Object value, StateOptions options) { return Mono.fromSupplier(() -> new State<>(value, key, etag, options)) - .flatMap(state -> saveStates(Arrays.asList(state))); + .flatMap(state -> saveStates(stateStoreName, Arrays.asList(state))); } /** * {@inheritDoc} */ @Override - public Mono deleteState(String key) { - return this.deleteState(key, null, null); + public Mono deleteState(String stateStoreName, String key) { + return this.deleteState(stateStoreName, key, null, null); } /** * {@inheritDoc} */ @Override - public Mono deleteState(String key, String etag, StateOptions options) { + public Mono deleteState(String stateStoreName, String key, String etag, StateOptions options) { try { - if (key == null || key.trim().isEmpty()) { - throw new IllegalArgumentException("Name cannot be null or empty."); + if ((stateStoreName == null) || (stateStoreName.trim().isEmpty())) { + throw new IllegalArgumentException("State store name cannot be null or empty."); + } + if ((key == null) || (key.trim().isEmpty())) { + throw new IllegalArgumentException("Key cannot be null or empty."); } Map headers = new HashMap<>(); if (etag != null && !etag.trim().isEmpty()) { headers.put(Constants.HEADER_HTTP_ETAG_ID, etag); } - String url = Constants.STATE_PATH + "/" + key; + String url = Constants.STATE_PATH + "/" + stateStoreName + "/" + key; Map urlParameters = Optional.ofNullable(options) .map(stateOptions -> stateOptions.getStateOptionsAsMap()) .orElse(new HashMap<>()); diff --git a/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java b/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java index 25a125aad..0c478f167 100644 --- a/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java @@ -35,6 +35,8 @@ import static org.mockito.Mockito.*; public class DaprClientGrpcTest { + private static final String STATE_STORE_NAME = "MyStateStore"; + private DaprGrpc.DaprFutureStub client; private DaprClientGrpc adapter; private ObjectSerializer serializer; @@ -332,7 +334,7 @@ public class DaprClientGrpcTest { String request = "Request"; byte[] byteRequest = serializer.serialize(request); Mono result = - adapter.invokeService(Verb.GET, "appId", "method", byteRequest, (HashMap)null); + adapter.invokeService(Verb.GET, "appId", "method", byteRequest, (HashMap) null); settableFuture.setException(ex); result.block(); } @@ -351,7 +353,7 @@ public class DaprClientGrpcTest { String request = "Request"; byte[] byteRequest = serializer.serialize(request); Mono result = adapter.invokeService( - Verb.GET, "appId", "method", byteRequest, (HashMap)null); + Verb.GET, "appId", "method", byteRequest, (HashMap) null); byte[] byteOutput = result.block(); String strOutput = serializer.deserialize(byteOutput, String.class); assertEquals(expected, strOutput); @@ -434,7 +436,7 @@ public class DaprClientGrpcTest { public void getStateExceptionThrownTest() { when(client.getState(any(io.dapr.DaprProtos.GetStateEnvelope.class))).thenThrow(RuntimeException.class); State key = buildStateKey(null, "Key1", "ETag1", null); - Mono> result = adapter.getState(key, String.class); + Mono> result = adapter.getState(STATE_STORE_NAME, key, String.class); result.block(); } @@ -448,7 +450,7 @@ public class DaprClientGrpcTest { when(client.getState(any(io.dapr.DaprProtos.GetStateEnvelope.class))) .thenReturn(settableFuture); State key = buildStateKey(null, "Key1", "ETag1", null); - Mono> result = adapter.getState(key, String.class); + Mono> result = adapter.getState(STATE_STORE_NAME, key, String.class); settableFuture.setException(ex); result.block(); } @@ -466,7 +468,7 @@ public class DaprClientGrpcTest { when(client.getState(any(io.dapr.DaprProtos.GetStateEnvelope.class))) .thenReturn(settableFuture); State keyRequest = buildStateKey(null, key, etag, null); - Mono> result = adapter.getState(keyRequest, String.class); + Mono> result = adapter.getState(STATE_STORE_NAME, keyRequest, String.class); settableFuture.set(responseEnvelope); assertEquals(expectedState, result.block()); } @@ -489,7 +491,7 @@ public class DaprClientGrpcTest { addCallback(settableFuture, callback, directExecutor()); when(client.getState(any(io.dapr.DaprProtos.GetStateEnvelope.class))) .thenReturn(settableFuture); - Mono> result = adapter.getState(keyRequest, MyObject.class); + Mono> result = adapter.getState(STATE_STORE_NAME, keyRequest, MyObject.class); settableFuture.set(responseEnvelope); assertEquals(expectedState, result.block()); } @@ -512,7 +514,7 @@ public class DaprClientGrpcTest { addCallback(settableFuture, callback, directExecutor()); when(client.getState(any(io.dapr.DaprProtos.GetStateEnvelope.class))) .thenReturn(settableFuture); - Mono> result = adapter.getState(keyRequest, MyObject.class); + Mono> result = adapter.getState(STATE_STORE_NAME, keyRequest, MyObject.class); settableFuture.set(responseEnvelope); assertEquals(expectedState, result.block()); } @@ -521,7 +523,7 @@ public class DaprClientGrpcTest { public void deleteStateExceptionThrowTest() { when(client.deleteState(any(io.dapr.DaprProtos.DeleteStateEnvelope.class))).thenThrow(RuntimeException.class); State key = buildStateKey(null, "Key1", "ETag1", null); - Mono result = adapter.deleteState(key.getKey(), key.getEtag(), key.getOptions()); + Mono result = adapter.deleteState(STATE_STORE_NAME, key.getKey(), key.getEtag(), key.getOptions()); result.block(); } @@ -535,7 +537,7 @@ public class DaprClientGrpcTest { when(client.deleteState(any(io.dapr.DaprProtos.DeleteStateEnvelope.class))) .thenReturn(settableFuture); State key = buildStateKey(null, "Key1", "ETag1", null); - Mono result = adapter.deleteState(key.getKey(), key.getEtag(), key.getOptions()); + Mono result = adapter.deleteState(STATE_STORE_NAME, key.getKey(), key.getEtag(), key.getOptions()); settableFuture.setException(ex); result.block(); } @@ -550,7 +552,8 @@ public class DaprClientGrpcTest { when(client.deleteState(any(io.dapr.DaprProtos.DeleteStateEnvelope.class))) .thenReturn(settableFuture); State stateKey = buildStateKey(null, key, etag, null); - Mono result = adapter.deleteState(stateKey.getKey(), stateKey.getEtag(), stateKey.getOptions()); + Mono result = adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(), + stateKey.getOptions()); settableFuture.set(Empty.newBuilder().build()); result.block(); assertTrue(callback.wasCalled); @@ -568,7 +571,8 @@ public class DaprClientGrpcTest { when(client.deleteState(any(io.dapr.DaprProtos.DeleteStateEnvelope.class))) .thenReturn(settableFuture); State stateKey = buildStateKey(null, key, etag, options); - Mono result = adapter.deleteState(stateKey.getKey(), stateKey.getEtag(), stateKey.getOptions()); + Mono result = adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(), + stateKey.getOptions()); settableFuture.set(Empty.newBuilder().build()); result.block(); assertTrue(callback.wasCalled); @@ -586,7 +590,8 @@ public class DaprClientGrpcTest { when(client.deleteState(any(io.dapr.DaprProtos.DeleteStateEnvelope.class))) .thenReturn(settableFuture); State stateKey = buildStateKey(null, key, etag, options); - Mono result = adapter.deleteState(stateKey.getKey(), stateKey.getEtag(), stateKey.getOptions()); + Mono result = adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(), + stateKey.getOptions()); settableFuture.set(Empty.newBuilder().build()); result.block(); assertTrue(callback.wasCalled); @@ -604,7 +609,8 @@ public class DaprClientGrpcTest { when(client.deleteState(any(io.dapr.DaprProtos.DeleteStateEnvelope.class))) .thenReturn(settableFuture); State stateKey = buildStateKey(null, key, etag, options); - Mono result = adapter.deleteState(stateKey.getKey(), stateKey.getEtag(), stateKey.getOptions()); + Mono result = adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(), + stateKey.getOptions()); settableFuture.set(Empty.newBuilder().build()); result.block(); assertTrue(callback.wasCalled); @@ -622,7 +628,8 @@ public class DaprClientGrpcTest { when(client.deleteState(any(io.dapr.DaprProtos.DeleteStateEnvelope.class))) .thenReturn(settableFuture); State stateKey = buildStateKey(null, key, etag, options); - Mono result = adapter.deleteState(stateKey.getKey(), stateKey.getEtag(), stateKey.getOptions()); + Mono result = adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(), + stateKey.getOptions()); settableFuture.set(Empty.newBuilder().build()); result.block(); assertTrue(callback.wasCalled); @@ -640,7 +647,8 @@ public class DaprClientGrpcTest { when(client.deleteState(any(io.dapr.DaprProtos.DeleteStateEnvelope.class))) .thenReturn(settableFuture); State stateKey = buildStateKey(null, key, etag, options); - Mono result = adapter.deleteState(stateKey.getKey(), stateKey.getEtag(), stateKey.getOptions()); + Mono result = adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(), + stateKey.getOptions()); settableFuture.set(Empty.newBuilder().build()); result.block(); assertTrue(callback.wasCalled); @@ -658,7 +666,8 @@ public class DaprClientGrpcTest { when(client.deleteState(any(io.dapr.DaprProtos.DeleteStateEnvelope.class))) .thenReturn(settableFuture); State stateKey = buildStateKey(null, key, etag, options); - Mono result = adapter.deleteState(stateKey.getKey(), stateKey.getEtag(), stateKey.getOptions()); + Mono result = adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(), + stateKey.getOptions()); settableFuture.set(Empty.newBuilder().build()); result.block(); assertTrue(callback.wasCalled); @@ -676,7 +685,8 @@ public class DaprClientGrpcTest { when(client.deleteState(any(io.dapr.DaprProtos.DeleteStateEnvelope.class))) .thenReturn(settableFuture); State stateKey = buildStateKey(null, key, etag, options); - Mono result = adapter.deleteState(stateKey.getKey(), stateKey.getEtag(), stateKey.getOptions()); + Mono result = adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(), + stateKey.getOptions()); settableFuture.set(Empty.newBuilder().build()); result.block(); assertTrue(callback.wasCalled); @@ -688,7 +698,7 @@ public class DaprClientGrpcTest { String etag = "ETag1"; String value = "State value"; when(client.saveState(any(io.dapr.DaprProtos.SaveStateEnvelope.class))).thenThrow(RuntimeException.class); - Mono result = adapter.saveState(key, etag, value, null); + Mono result = adapter.saveState(STATE_STORE_NAME, key, etag, value, null); result.block(); } @@ -702,7 +712,7 @@ public class DaprClientGrpcTest { MockCallback callback = new MockCallback<>(ex); addCallback(settableFuture, callback, directExecutor()); when(client.saveState(any(io.dapr.DaprProtos.SaveStateEnvelope.class))).thenReturn(settableFuture); - Mono result = adapter.saveState(key, etag, value, null); + Mono result = adapter.saveState(STATE_STORE_NAME, key, etag, value, null); settableFuture.setException(ex); result.block(); } @@ -716,7 +726,7 @@ public class DaprClientGrpcTest { MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); addCallback(settableFuture, callback, directExecutor()); when(client.saveState(any(io.dapr.DaprProtos.SaveStateEnvelope.class))).thenReturn(settableFuture); - Mono result = adapter.saveState(key, etag, value, null); + Mono result = adapter.saveState(STATE_STORE_NAME, key, etag, value, null); settableFuture.set(Empty.newBuilder().build()); result.block(); assertTrue(callback.wasCalled); @@ -733,7 +743,7 @@ public class DaprClientGrpcTest { when(client.saveState(any(io.dapr.DaprProtos.SaveStateEnvelope.class))).thenReturn(settableFuture); StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE, Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR); - Mono result = adapter.saveState(key, etag, value, options); + Mono result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options); settableFuture.set(Empty.newBuilder().build()); result.block(); assertTrue(callback.wasCalled); @@ -750,7 +760,7 @@ public class DaprClientGrpcTest { when(client.saveState(any(io.dapr.DaprProtos.SaveStateEnvelope.class))).thenReturn(settableFuture); StateOptions options = buildStateOptions(null, StateOptions.Concurrency.FIRST_WRITE, Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR); - Mono result = adapter.saveState(key, etag, value, options); + Mono result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options); settableFuture.set(Empty.newBuilder().build()); result.block(); assertTrue(callback.wasCalled); @@ -767,7 +777,7 @@ public class DaprClientGrpcTest { when(client.saveState(any(io.dapr.DaprProtos.SaveStateEnvelope.class))).thenReturn(settableFuture); StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, null, Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR); - Mono result = adapter.saveState(key, etag, value, options); + Mono result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options); settableFuture.set(Empty.newBuilder().build()); result.block(); assertTrue(callback.wasCalled); @@ -784,7 +794,7 @@ public class DaprClientGrpcTest { when(client.saveState(any(io.dapr.DaprProtos.SaveStateEnvelope.class))).thenReturn(settableFuture); StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE, null, null, null); - Mono result = adapter.saveState(key, etag, value, options); + Mono result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options); settableFuture.set(Empty.newBuilder().build()); result.block(); assertTrue(callback.wasCalled); @@ -801,7 +811,7 @@ public class DaprClientGrpcTest { when(client.saveState(any(io.dapr.DaprProtos.SaveStateEnvelope.class))).thenReturn(settableFuture); StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE, null, 1, StateOptions.RetryPolicy.Pattern.LINEAR); - Mono result = adapter.saveState(key, etag, value, options); + Mono result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options); settableFuture.set(Empty.newBuilder().build()); result.block(); assertTrue(callback.wasCalled); @@ -818,7 +828,7 @@ public class DaprClientGrpcTest { when(client.saveState(any(io.dapr.DaprProtos.SaveStateEnvelope.class))).thenReturn(settableFuture); StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE, Duration.ofDays(100), null, StateOptions.RetryPolicy.Pattern.LINEAR); - Mono result = adapter.saveState(key, etag, value, options); + Mono result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options); settableFuture.set(Empty.newBuilder().build()); result.block(); assertTrue(callback.wasCalled); @@ -835,7 +845,7 @@ public class DaprClientGrpcTest { when(client.saveState(any(io.dapr.DaprProtos.SaveStateEnvelope.class))).thenReturn(settableFuture); StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE, Duration.ofDays(100), 1, null); - Mono result = adapter.saveState(key, etag, value, options); + Mono result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options); settableFuture.set(Empty.newBuilder().build()); result.block(); assertTrue(callback.wasCalled); @@ -848,9 +858,9 @@ public class DaprClientGrpcTest { /** * The purpose of this test is to show that it doesn't matter when the client is called, the actual coll to DAPR * will be done when the output Mono response call the Mono.block method. - * Like for instanche if you call getState, withouth blocking for the response, and then call delete for the same state - * you just retrived but block for the delete response, when later you block for the response of the getState, you will - * not found the state. + * Like for instance if you call getState, without blocking for the response, and then call delete for the same + * state you just retrieved but block for the delete response, when later you block for the response of the getState, + * you will not find the state. *

This test will execute the following flow:

*
    *
  1. Exeucte client getState for Key=key1
  2. @@ -861,7 +871,8 @@ public class DaprClientGrpcTest { *
  3. Block for deleteState call.
  4. *
  5. Block for getState for Key=key2 and Assert they 2 was not found.
  6. *
- * @throws Exception - Test will fail if any unexpected exception is being thrown + * + * @throws Exception - Test will fail if any unexpected exception is being thrown. */ @Test @@ -877,17 +888,18 @@ public class DaprClientGrpcTest { futuresMap.put(key2, buildFutureGetStateEnvelop(expectedValue2, etag)); when(client.getState(argThat(new GetStateEnvelopeKeyMatcher(key1)))).thenReturn(futuresMap.get(key1)); State keyRequest1 = buildStateKey(null, key1, etag, null); - Mono> resultGet1 = adapter.getState(keyRequest1, String.class); + Mono> resultGet1 = adapter.getState(STATE_STORE_NAME, keyRequest1, String.class); assertEquals(expectedState1, resultGet1.block()); State keyRequest2 = buildStateKey(null, key2, etag, null); - Mono> resultGet2 = adapter.getState(keyRequest2, String.class); + Mono> resultGet2 = adapter.getState(STATE_STORE_NAME, keyRequest2, String.class); SettableFuture settableFutureDelete = SettableFuture.create(); MockCallback callbackDelete = new MockCallback<>(Empty.newBuilder().build()); addCallback(settableFutureDelete, callbackDelete, directExecutor()); when(client.deleteState(any(io.dapr.DaprProtos.DeleteStateEnvelope.class))) .thenReturn(settableFutureDelete); - Mono resultDelete = adapter.deleteState(keyRequest2.getKey(), keyRequest2.getEtag(), keyRequest2.getOptions()); + Mono resultDelete = adapter.deleteState(STATE_STORE_NAME, keyRequest2.getKey(), keyRequest2.getEtag(), + keyRequest2.getOptions()); settableFutureDelete.set(Empty.newBuilder().build()); resultDelete.block(); assertTrue(callbackDelete.wasCalled); @@ -916,7 +928,8 @@ public class DaprClientGrpcTest { } private StateOptions buildStateOptions(StateOptions.Consistency consistency, StateOptions.Concurrency concurrency, - Duration interval, Integer threshold, StateOptions.RetryPolicy.Pattern pattern) { + Duration interval, Integer threshold, + StateOptions.RetryPolicy.Pattern pattern) { StateOptions.RetryPolicy retryPolicy = null; if (interval != null || threshold != null || pattern != null) { diff --git a/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java b/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java index d6fc44ac5..659b32711 100644 --- a/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java @@ -22,6 +22,8 @@ import static org.mockito.Mockito.mock; public class DaprClientHttpTest { + private static final String STATE_STORE_NAME = "MyStateStore"; + private DaprClientHttp daprClientHttp; private DaprHttp daprHttp; @@ -30,7 +32,8 @@ public class DaprClientHttpTest { private MockInterceptor mockInterceptor; - private final String EXPECTED_RESULT = "{\"data\":\"ewoJCSJwcm9wZXJ0eUEiOiAidmFsdWVBIiwKCQkicHJvcGVydHlCIjogInZhbHVlQiIKCX0=\"}"; + private final String EXPECTED_RESULT = + "{\"data\":\"ewoJCSJwcm9wZXJ0eUEiOiAidmFsdWVBIiwKCQkicHJvcGVydHlCIjogInZhbHVlQiIKCX0=\"}"; @Before public void setUp() throws Exception { @@ -215,14 +218,14 @@ public class DaprClientHttpTest { State stateKeyValue = new State("value", "key", "etag", stateOptions); State stateKeyNull = new State("value", null, "etag", stateOptions); mockInterceptor.addRule() - .get("http://localhost:3000/v1.0/state/key") + .get("http://localhost:3000/v1.0/state/MyStateStore/key") .respond("\"" + EXPECTED_RESULT + "\""); daprHttp = new DaprHttp(3000, okHttpClient); daprClientHttp = new DaprClientHttp(daprHttp); assertThrows(IllegalArgumentException.class, () -> { - daprClientHttp.getState(stateKeyNull, String.class).block(); + daprClientHttp.getState(STATE_STORE_NAME, stateKeyNull, String.class).block(); }); - Mono> mono = daprClientHttp.getState(stateKeyValue, String.class); + Mono> mono = daprClientHttp.getState(STATE_STORE_NAME, stateKeyValue, String.class); assertEquals(mono.block().getKey(), "key"); } @@ -230,11 +233,11 @@ public class DaprClientHttpTest { public void getStatesEmptyEtag() { State stateEmptyEtag = new State("value", "key", "", null); mockInterceptor.addRule() - .get("http://localhost:3000/v1.0/state/key") + .get("http://localhost:3000/v1.0/state/MyStateStore/key") .respond("\"" + EXPECTED_RESULT + "\""); daprHttp = new DaprHttp(3000, okHttpClient); daprClientHttp = new DaprClientHttp(daprHttp); - Mono> monoEmptyEtag = daprClientHttp.getState(stateEmptyEtag, String.class); + Mono> monoEmptyEtag = daprClientHttp.getState(STATE_STORE_NAME, stateEmptyEtag, String.class); assertEquals(monoEmptyEtag.block().getKey(), "key"); } @@ -242,11 +245,11 @@ public class DaprClientHttpTest { public void getStatesNullEtag() { State stateNullEtag = new State("value", "key", null, null); mockInterceptor.addRule() - .get("http://localhost:3000/v1.0/state/key") + .get("http://localhost:3000/v1.0/state/MyStateStore/key") .respond("\"" + EXPECTED_RESULT + "\""); daprHttp = new DaprHttp(3000, okHttpClient); daprClientHttp = new DaprClientHttp(daprHttp); - Mono> monoNullEtag = daprClientHttp.getState(stateNullEtag, String.class); + Mono> monoNullEtag = daprClientHttp.getState(STATE_STORE_NAME, stateNullEtag, String.class); assertEquals(monoNullEtag.block().getKey(), "key"); } @@ -255,11 +258,11 @@ public class DaprClientHttpTest { State stateKeyValue = new State("value", "key", "etag", null); List> stateKeyValueList = Arrays.asList(stateKeyValue); mockInterceptor.addRule() - .post("http://localhost:3000/v1.0/state") + .post("http://localhost:3000/v1.0/state/MyStateStore") .respond(EXPECTED_RESULT); daprHttp = new DaprHttp(3000, okHttpClient); daprClientHttp = new DaprClientHttp(daprHttp); - Mono mono = daprClientHttp.saveStates(stateKeyValueList); + Mono mono = daprClientHttp.saveStates(STATE_STORE_NAME, stateKeyValueList); assertNull(mono.block()); } @@ -268,13 +271,13 @@ public class DaprClientHttpTest { State stateKeyValue = new State("value", "key", "", null); List> stateKeyValueList = new ArrayList(); mockInterceptor.addRule() - .post("http://localhost:3000/v1.0/state") + .post("http://localhost:3000/v1.0/state/MyStateStore") .respond(EXPECTED_RESULT); daprHttp = new DaprHttp(3000, okHttpClient); daprClientHttp = new DaprClientHttp(daprHttp); - Mono mono = daprClientHttp.saveStates(null); + Mono mono = daprClientHttp.saveStates(STATE_STORE_NAME, null); assertNull(mono.block()); - Mono mono1 = daprClientHttp.saveStates(stateKeyValueList); + Mono mono1 = daprClientHttp.saveStates(STATE_STORE_NAME, stateKeyValueList); assertNull(mono1.block()); } @@ -283,11 +286,11 @@ public class DaprClientHttpTest { State stateKeyValue = new State("value", "key", null, null); List> stateKeyValueList = Arrays.asList(stateKeyValue); mockInterceptor.addRule() - .post("http://localhost:3000/v1.0/state") + .post("http://localhost:3000/v1.0/state/MyStateStore") .respond(EXPECTED_RESULT); daprHttp = new DaprHttp(3000, okHttpClient); daprClientHttp = new DaprClientHttp(daprHttp); - Mono mono = daprClientHttp.saveStates(stateKeyValueList); + Mono mono = daprClientHttp.saveStates(STATE_STORE_NAME, stateKeyValueList); assertNull(mono.block()); } @@ -296,23 +299,23 @@ public class DaprClientHttpTest { State stateKeyValue = new State("value", "key", "", null); List> stateKeyValueList = Arrays.asList(stateKeyValue); mockInterceptor.addRule() - .post("http://localhost:3000/v1.0/state") + .post("http://localhost:3000/v1.0/state/MyStateStore") .respond(EXPECTED_RESULT); daprHttp = new DaprHttp(3000, okHttpClient); daprClientHttp = new DaprClientHttp(daprHttp); - Mono mono = daprClientHttp.saveStates(stateKeyValueList); + Mono mono = daprClientHttp.saveStates(STATE_STORE_NAME, stateKeyValueList); assertNull(mono.block()); } @Test public void simpleSaveStates() { mockInterceptor.addRule() - .post("http://localhost:3000/v1.0/state") + .post("http://localhost:3000/v1.0/state/MyStateStore") .respond(EXPECTED_RESULT); StateOptions stateOptions = mock(StateOptions.class); daprHttp = new DaprHttp(3000, okHttpClient); daprClientHttp = new DaprClientHttp(daprHttp); - Mono mono = daprClientHttp.saveState("key", "etag", "value", stateOptions); + Mono mono = daprClientHttp.saveState(STATE_STORE_NAME, "key", "etag", "value", stateOptions); assertNull(mono.block()); } @@ -322,11 +325,11 @@ public class DaprClientHttpTest { StateOptions stateOptions = mock(StateOptions.class); State stateKeyValue = new State("value", "key", "etag", stateOptions); mockInterceptor.addRule() - .delete("http://localhost:3000/v1.0/state/key") + .delete("http://localhost:3000/v1.0/state/MyStateStore/key") .respond(EXPECTED_RESULT); daprHttp = new DaprHttp(3000, okHttpClient); daprClientHttp = new DaprClientHttp(daprHttp); - Mono mono = daprClientHttp.deleteState(stateKeyValue.getKey(), stateKeyValue.getEtag(), stateOptions); + Mono mono = daprClientHttp.deleteState(STATE_STORE_NAME, stateKeyValue.getKey(), stateKeyValue.getEtag(), stateOptions); assertNull(mono.block()); } @@ -334,11 +337,11 @@ public class DaprClientHttpTest { public void deleteStateNullEtag() { State stateKeyValue = new State("value", "key", null, null); mockInterceptor.addRule() - .delete("http://localhost:3000/v1.0/state/key") + .delete("http://localhost:3000/v1.0/state/MyStateStore/key") .respond(EXPECTED_RESULT); daprHttp = new DaprHttp(3000, okHttpClient); daprClientHttp = new DaprClientHttp(daprHttp); - Mono mono = daprClientHttp.deleteState(stateKeyValue.getKey(), stateKeyValue.getEtag(), null); + Mono mono = daprClientHttp.deleteState(STATE_STORE_NAME, stateKeyValue.getKey(), stateKeyValue.getEtag(), null); assertNull(mono.block()); } @@ -346,11 +349,11 @@ public class DaprClientHttpTest { public void deleteStateEmptyEtag() { State stateKeyValue = new State("value", "key", "", null); mockInterceptor.addRule() - .delete("http://localhost:3000/v1.0/state/key") + .delete("http://localhost:3000/v1.0/state/MyStateStore/key") .respond(EXPECTED_RESULT); daprHttp = new DaprHttp(3000, okHttpClient); daprClientHttp = new DaprClientHttp(daprHttp); - Mono mono = daprClientHttp.deleteState(stateKeyValue.getKey(), stateKeyValue.getEtag(), null); + Mono mono = daprClientHttp.deleteState(STATE_STORE_NAME, stateKeyValue.getKey(), stateKeyValue.getEtag(), null); assertNull(mono.block()); } @@ -359,18 +362,27 @@ public class DaprClientHttpTest { State stateKeyValueNull = new State("value", null, "etag", null); State stateKeyValueEmpty = new State("value", "", "etag", null); mockInterceptor.addRule() - .delete("http://localhost:3000/v1.0/state/key") + .delete("http://localhost:3000/v1.0/state/MyStateStore/key") .respond(EXPECTED_RESULT); daprHttp = new DaprHttp(3000, okHttpClient); daprClientHttp = new DaprClientHttp(daprHttp); assertThrows(IllegalArgumentException.class, () -> { - daprClientHttp.deleteState(null, null, null).block(); + daprClientHttp.deleteState(STATE_STORE_NAME, null, null, null).block(); }); assertThrows(IllegalArgumentException.class, () -> { - daprClientHttp.deleteState("", null, null).block(); + daprClientHttp.deleteState(STATE_STORE_NAME, "", null, null).block(); }); assertThrows(IllegalArgumentException.class, () -> { - daprClientHttp.deleteState(" ", null, null).block(); + daprClientHttp.deleteState(STATE_STORE_NAME, " ", null, null).block(); + }); + assertThrows(IllegalArgumentException.class, () -> { + daprClientHttp.deleteState(null, "key", null, null).block(); + }); + assertThrows(IllegalArgumentException.class, () -> { + daprClientHttp.deleteState("", "key", null, null).block(); + }); + assertThrows(IllegalArgumentException.class, () -> { + daprClientHttp.deleteState(" ", "key", null, null).block(); }); } } \ No newline at end of file