From acd28bb1896c9bb8ad8da8d4956981c9178560e7 Mon Sep 17 00:00:00 2001 From: Andres Robles <15348598+AndresRoblesMX@users.noreply.github.com> Date: Thu, 16 Jan 2020 12:57:12 -0600 Subject: [PATCH] Receiving the StateOptions on save as part of the State, to comply with the DAPR API (#105) * Receiving the StateOptions on save as part of the State, to comply with the DAPR API * #26 Add integration test for Concurrency funtionality in the states module Co-authored-by: Juan Jose Herrera <35985447+JuanJose-Herrera@users.noreply.github.com> Co-authored-by: Artur Souza --- .../main/java/io/dapr/client/DaprClient.java | 3 +- .../io/dapr/client/DaprClientGrpcAdapter.java | 96 +++-- .../io/dapr/client/DaprClientHttpAdapter.java | 21 +- .../io/dapr/client/domain/StateKeyValue.java | 42 +- .../io/dapr/client/domain/StateOptions.java | 5 + .../client/DaprClientGrpcAdapterTest.java | 76 ++-- .../io/dapr/it/state/HttpStateClientIT.java | 398 ++++++++++++++++-- 7 files changed, 504 insertions(+), 137 deletions(-) diff --git a/sdk/src/main/java/io/dapr/client/DaprClient.java b/sdk/src/main/java/io/dapr/client/DaprClient.java index 2ab002af8..b53a869d1 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClient.java +++ b/sdk/src/main/java/io/dapr/client/DaprClient.java @@ -129,11 +129,10 @@ public interface DaprClient { * Save/Update a list of states. * * @param states the States to be saved. - * @param options the Options to use for each state. * @param the Type of the State. * @return a Mono plan of type Void. */ - Mono saveStates(List> states, StateOptions options); + Mono saveStates(List> states); /** * Save/Update a state. diff --git a/sdk/src/main/java/io/dapr/client/DaprClientGrpcAdapter.java b/sdk/src/main/java/io/dapr/client/DaprClientGrpcAdapter.java index aa6885413..8e2d27a99 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientGrpcAdapter.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientGrpcAdapter.java @@ -183,7 +183,7 @@ class DaprClientGrpcAdapter implements DaprClient { ListenableFuture futureResponse = client.getState(envelope); return Mono.just(futureResponse).flatMap(f -> { try { - return Mono.just(buildStateKeyValue(f.get(), state.getKey(), clazz)); + return Mono.just(buildStateKeyValue(f.get(), state.getKey(), stateOptions, clazz)); } catch (Exception ex) { return Mono.error(ex); } @@ -193,61 +193,22 @@ class DaprClientGrpcAdapter implements DaprClient { } } - private StateKeyValue buildStateKeyValue(DaprProtos.GetStateResponseEnvelope resonse, String requestedKey, Class clazz) throws IOException { + private StateKeyValue buildStateKeyValue(DaprProtos.GetStateResponseEnvelope resonse, String requestedKey, StateOptions stateOptions, Class clazz) throws IOException { T value = objectSerializer.deserialize(resonse.getData().getValue().toByteArray(), clazz); String etag = resonse.getEtag(); String key = requestedKey; - - return new StateKeyValue<>(value, key, etag); + return new StateKeyValue<>(value, key, etag, stateOptions); } /** * {@inheritDoc} */ @Override - public Mono saveStates(List> states, StateOptions options) { + public Mono saveStates(List> states) { try { - DaprProtos.StateRequestOptions.Builder optionBuilder = null; - if (options != null) { - DaprProtos.StateRetryPolicy.Builder retryPolicyBuilder = null; - if (options.getRetryPolicy() != null) { - retryPolicyBuilder = DaprProtos.StateRetryPolicy.newBuilder(); - StateOptions.RetryPolicy retryPolicy = options.getRetryPolicy(); - if (options.getRetryPolicy().getInterval() != null) { - Duration.Builder durationBuilder = Duration.newBuilder() - .setNanos(retryPolicy.getInterval().getNano()) - .setSeconds(retryPolicy.getInterval().getSeconds()); - retryPolicyBuilder.setInterval(durationBuilder.build()); - } - retryPolicyBuilder.setThreshold(objectSerializer.deserialize(retryPolicy.getThreshold(), int.class)); - if (retryPolicy.getPattern() != null) { - retryPolicyBuilder.setPattern(retryPolicy.getPattern().getValue()); - } - } - - optionBuilder = DaprProtos.StateRequestOptions.newBuilder(); - if (options.getConcurrency() != null) { - optionBuilder.setConcurrency(options.getConcurrency().getValue()); - } - if (options.getConsistency() != null) { - optionBuilder.setConsistency(options.getConsistency().getValue()); - } - if (retryPolicyBuilder != null) { - optionBuilder.setRetryPolicy(retryPolicyBuilder.build()); - } - } DaprProtos.SaveStateEnvelope.Builder builder = DaprProtos.SaveStateEnvelope.newBuilder(); for (StateKeyValue state : states) { - byte[] byteState = objectSerializer.serialize(state.getValue()); - Any data = Any.newBuilder().setValue(ByteString.copyFrom(byteState)).build(); - DaprProtos.StateRequest.Builder stateBuilder = DaprProtos.StateRequest.newBuilder() - .setEtag(state.getEtag()) - .setKey(state.getKey()) - .setValue(data); - if(optionBuilder != null) { - stateBuilder.setOptions(optionBuilder.build()); - } - builder.addRequests(stateBuilder.build()); + builder.addRequests(buildStateRequest(state).build()); } DaprProtos.SaveStateEnvelope envelope = builder.build(); @@ -265,10 +226,53 @@ class DaprClientGrpcAdapter implements DaprClient { } } + private DaprProtos.StateRequest.Builder buildStateRequest(StateKeyValue state) throws IOException { + byte[] byteState = objectSerializer.serialize(state.getValue()); + Any data = Any.newBuilder().setValue(ByteString.copyFrom(byteState)).build(); + DaprProtos.StateRequest.Builder stateBuilder = DaprProtos.StateRequest.newBuilder() + .setEtag(state.getEtag()) + .setKey(state.getKey()) + .setValue(data); + DaprProtos.StateRequestOptions.Builder optionBuilder = null; + if (state.getOptions() != null) { + StateOptions options = state.getOptions(); + DaprProtos.StateRetryPolicy.Builder retryPolicyBuilder = null; + if (options.getRetryPolicy() != null) { + retryPolicyBuilder = DaprProtos.StateRetryPolicy.newBuilder(); + StateOptions.RetryPolicy retryPolicy = options.getRetryPolicy(); + if (options.getRetryPolicy().getInterval() != null) { + Duration.Builder durationBuilder = Duration.newBuilder() + .setNanos(retryPolicy.getInterval().getNano()) + .setSeconds(retryPolicy.getInterval().getSeconds()); + retryPolicyBuilder.setInterval(durationBuilder.build()); + } + retryPolicyBuilder.setThreshold(objectSerializer.deserialize(retryPolicy.getThreshold(), int.class)); + if (retryPolicy.getPattern() != null) { + retryPolicyBuilder.setPattern(retryPolicy.getPattern().getValue()); + } + } + + optionBuilder = DaprProtos.StateRequestOptions.newBuilder(); + if (options.getConcurrency() != null) { + optionBuilder.setConcurrency(options.getConcurrency().getValue()); + } + if (options.getConsistency() != null) { + optionBuilder.setConsistency(options.getConsistency().getValue()); + } + if (retryPolicyBuilder != null) { + optionBuilder.setRetryPolicy(retryPolicyBuilder.build()); + } + } + if(optionBuilder != null) { + stateBuilder.setOptions(optionBuilder.build()); + } + return stateBuilder; + } + @Override public Mono saveState(String key, String etag, T value, StateOptions options) { - StateKeyValue state = new StateKeyValue<>(value, key, etag); - return saveStates(Arrays.asList(state), options); + StateKeyValue state = new StateKeyValue<>(value, key, etag, options); + return saveStates(Arrays.asList(state)); } /** diff --git a/sdk/src/main/java/io/dapr/client/DaprClientHttpAdapter.java b/sdk/src/main/java/io/dapr/client/DaprClientHttpAdapter.java index 504a5ce4c..6a5b2840f 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientHttpAdapter.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientHttpAdapter.java @@ -185,7 +185,7 @@ public class DaprClientHttpAdapter implements DaprClient { .invokeAPI(DaprHttp.HttpMethods.GET.name(), url.toString(), urlParameters, headers) .flatMap(s -> { try { - return Mono.just(buildStateKeyValue(s, state.getKey(), clazz)); + return Mono.just(buildStateKeyValue(s, state.getKey(), stateOptions, clazz)); }catch (Exception ex){ return Mono.error(ex); } @@ -199,22 +199,21 @@ public class DaprClientHttpAdapter implements DaprClient { * {@inheritDoc} */ @Override - public Mono saveStates(List> states, StateOptions options) { + public Mono saveStates(List> states) { try { if (states == null || states.isEmpty()) { return Mono.empty(); } final Map headers = new HashMap<>(); final String etag = states.stream().filter(state -> null != state.getEtag() && !state.getEtag().trim().isEmpty()) - .findFirst().orElse(new StateKeyValue<>(null, null, null)).getEtag(); + .findFirst().orElse(new StateKeyValue<>(null, null, null, null)).getEtag(); if (etag != null && !etag.trim().isEmpty()) { headers.put(Constants.HEADER_HTTP_ETAG_ID, etag); } final String url = Constants.STATE_PATH; - Map urlParameter = Optional.ofNullable(options).map(stateOptions -> stateOptions.getStateOptionsAsMap() ).orElse( new HashMap<>()); byte[] serializedStateBody = objectSerializer.serialize(states); return this.client.invokeAPI( - DaprHttp.HttpMethods.POST.name(), url, urlParameter, serializedStateBody, headers).then(); + DaprHttp.HttpMethods.POST.name(), url, null, serializedStateBody, headers).then(); } catch (Exception ex) { return Mono.error(ex); } @@ -225,8 +224,8 @@ public class DaprClientHttpAdapter implements DaprClient { */ @Override public Mono saveState(String key, String etag, T value, StateOptions options) { - StateKeyValue state = new StateKeyValue<>(value, key, etag); - return saveStates(Arrays.asList(state), options); + StateKeyValue state = new StateKeyValue<>(value, key, etag, options); + return saveStates(Arrays.asList(state)); } /** @@ -339,14 +338,14 @@ public class DaprClientHttpAdapter implements DaprClient { * @return A StateKeyValue instance * @throws IOException If there's a issue deserialzing the response. */ - private StateKeyValue buildStateKeyValue(DaprHttp.Response resonse, String requestedKey, Class clazz) throws IOException { + private StateKeyValue buildStateKeyValue(DaprHttp.Response resonse, String requestedKey, StateOptions stateOptions, Class clazz) throws IOException { T value = objectSerializer.deserialize(resonse.getBody(), clazz); String key = requestedKey; String etag = null; - if (resonse.getHeaders() != null && resonse.getHeaders().containsKey("ETag")) { - etag = objectSerializer.deserialize(resonse.getHeaders().get("ETag"), String.class); + if (resonse.getHeaders() != null && resonse.getHeaders().containsKey("Etag")) { + etag = objectSerializer.deserialize(resonse.getHeaders().get("Etag"), String.class); } - return new StateKeyValue<>(value, key, etag); + return new StateKeyValue<>(value, key, etag, stateOptions); } } diff --git a/sdk/src/main/java/io/dapr/client/domain/StateKeyValue.java b/sdk/src/main/java/io/dapr/client/domain/StateKeyValue.java index 1f1e59c6a..d42eb1e5c 100644 --- a/sdk/src/main/java/io/dapr/client/domain/StateKeyValue.java +++ b/sdk/src/main/java/io/dapr/client/domain/StateKeyValue.java @@ -6,7 +6,7 @@ package io.dapr.client.domain; /** * This class reprent what a State is - * @param + * @param The type of the value of the sate */ public class StateKeyValue { /** @@ -24,20 +24,27 @@ public class StateKeyValue { private final String etag; /** - * Create an inmutable state - * @param value - * @param key - * @param etag + * The options used for saving the state */ - public StateKeyValue(T value, String key, String etag) { + private final StateOptions options; + + /** + * Create an inmutable state + * @param value - The value of the state + * @param key - The key of the state + * @param etag - The etag of the state - Keep in mind that for some state stores (like reids) only numbers are supported. + * @param options - REQUIRED when saving a state. + */ + public StateKeyValue(T value, String key, String etag, StateOptions options) { this.value = value; this.key = key; this.etag = etag; + this.options = options; } /** * Retrieves the Value of the state - * @return + * @return The value of the state */ public T getValue() { return value; @@ -45,7 +52,7 @@ public class StateKeyValue { /** * Retrieves the Key of the state - * @return + * @return The key of the state */ public String getKey() { return key; @@ -53,12 +60,20 @@ public class StateKeyValue { /** * Retrieve the ETag of this state - * @return + * @return The etag of the state */ public String getEtag() { return etag; } + /** + * Retrieve the Options used for saving the state + * @return The options to save the state + */ + public StateOptions getOptions() { + return options; + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -69,6 +84,7 @@ public class StateKeyValue { if (getValue() != null ? !getValue().equals(that.getValue()) : that.getValue() != null) return false; if (getKey() != null ? !getKey().equals(that.getKey()) : that.getKey() != null) return false; if (getEtag() != null ? !getEtag().equals(that.getEtag()) : that.getEtag() != null) return false; + if (getOptions() != null ? !getOptions().equals(that.getOptions()) : that.getOptions() != null) return false; return true; } @@ -78,6 +94,7 @@ public class StateKeyValue { int result = getValue() != null ? getValue().hashCode() : 0; result = 31 * result + (getKey() != null ? getKey().hashCode() : 0); result = 31 * result + (getEtag() != null ? getEtag().hashCode() : 0); + result = 31 * result + (getOptions() != null ? options.hashCode() : 0); return result; } @@ -85,8 +102,9 @@ public class StateKeyValue { public String toString() { return "StateKeyValue{" + "value=" + value + - ", key='" + key + '\'' + - ", etag='" + etag + '\'' + - '}'; + ", key='" + key + "'" + + ", etag='" + etag + "'" + + ", options={'" + options.toString() + "}" + + "}"; } } diff --git a/sdk/src/main/java/io/dapr/client/domain/StateOptions.java b/sdk/src/main/java/io/dapr/client/domain/StateOptions.java index fa264febd..67938c3d1 100644 --- a/sdk/src/main/java/io/dapr/client/domain/StateOptions.java +++ b/sdk/src/main/java/io/dapr/client/domain/StateOptions.java @@ -4,6 +4,7 @@ */ package io.dapr.client.domain; +import com.fasterxml.jackson.annotation.JsonValue; import io.dapr.utils.DurationUtils; import java.time.Duration; @@ -70,9 +71,11 @@ public class StateOptions { this.value = value; } + @JsonValue public String getValue() { return this.value; } + } public static enum Concurrency { @@ -85,9 +88,11 @@ public class StateOptions { this.value = value; } + @JsonValue public String getValue() { return this.value; } + } public static class RetryPolicy { diff --git a/sdk/src/test/java/io/dapr/client/DaprClientGrpcAdapterTest.java b/sdk/src/test/java/io/dapr/client/DaprClientGrpcAdapterTest.java index e860eefc2..dd83cb318 100644 --- a/sdk/src/test/java/io/dapr/client/DaprClientGrpcAdapterTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprClientGrpcAdapterTest.java @@ -465,7 +465,7 @@ public class DaprClientGrpcAdapterTest { @Test(expected = RuntimeException.class) public void getStateExceptionThrownTest() { when(client.getState(any(io.dapr.DaprProtos.GetStateEnvelope.class))).thenThrow(RuntimeException.class); - StateKeyValue key = buildStateKey(null, "Key1", "ETag1"); + StateKeyValue key = buildStateKey(null, "Key1", "ETag1", null); Mono> result = adater.getState(key, null, String.class); result.block(); } @@ -479,7 +479,7 @@ public class DaprClientGrpcAdapterTest { addCallback(settableFuture, callback, directExecutor()); when(client.getState(any(io.dapr.DaprProtos.GetStateEnvelope.class))) .thenReturn(settableFuture); - StateKeyValue key = buildStateKey(null, "Key1", "ETag1"); + StateKeyValue key = buildStateKey(null, "Key1", "ETag1", null); Mono> result = adater.getState(key, null, String.class); settableFuture.setException(ex); result.block(); @@ -490,7 +490,7 @@ public class DaprClientGrpcAdapterTest { String etag = "ETag1"; String key = "key1"; String expectedValue = "Expected state"; - StateKeyValue expectedState = buildStateKey(expectedValue, key, etag); + StateKeyValue expectedState = buildStateKey(expectedValue, key, etag, null); DaprProtos.GetStateResponseEnvelope responseEnvelope = DaprProtos.GetStateResponseEnvelope.newBuilder() .setData(getAny(expectedValue)) .setEtag(etag) @@ -500,7 +500,7 @@ public class DaprClientGrpcAdapterTest { addCallback(settableFuture, callback, directExecutor()); when(client.getState(any(io.dapr.DaprProtos.GetStateEnvelope.class))) .thenReturn(settableFuture); - StateKeyValue keyRequest = buildStateKey(null, key, etag); + StateKeyValue keyRequest = buildStateKey(null, key, etag, null); Mono> result = adater.getState(keyRequest, null, String.class); settableFuture.set(responseEnvelope); assertEquals(expectedState, result.block()); @@ -511,14 +511,14 @@ public class DaprClientGrpcAdapterTest { String etag = "ETag1"; String key = "key1"; MyObject expectedValue = new MyObject(1, "The Value"); - StateKeyValue expectedState = buildStateKey(expectedValue, key, etag); + StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE, + Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR); + StateKeyValue expectedState = buildStateKey(expectedValue, key, etag, options); DaprProtos.GetStateResponseEnvelope responseEnvelope = DaprProtos.GetStateResponseEnvelope.newBuilder() .setData(getAny(expectedValue)) .setEtag(etag) .build(); - StateKeyValue keyRequest = buildStateKey(null, key, etag); - StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE, - Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR); + StateKeyValue keyRequest = buildStateKey(null, key, etag, options); SettableFuture settableFuture = SettableFuture.create(); MockCallback callback = new MockCallback<>(responseEnvelope); addCallback(settableFuture, callback, directExecutor()); @@ -534,14 +534,14 @@ public class DaprClientGrpcAdapterTest { String etag = "ETag1"; String key = "key1"; MyObject expectedValue = new MyObject(1, "The Value"); - StateKeyValue expectedState = buildStateKey(expectedValue, key, etag); + StateOptions options = new StateOptions(null, StateOptions.Concurrency.FIRST_WRITE, + new StateOptions.RetryPolicy(Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR)); + StateKeyValue expectedState = buildStateKey(expectedValue, key, etag, options); DaprProtos.GetStateResponseEnvelope responseEnvelope = DaprProtos.GetStateResponseEnvelope.newBuilder() .setData(getAny(expectedValue)) .setEtag(etag) .build(); - StateKeyValue keyRequest = buildStateKey(null, key, etag); - StateOptions options = new StateOptions(null, StateOptions.Concurrency.FIRST_WRITE, - new StateOptions.RetryPolicy(Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR)); + StateKeyValue keyRequest = buildStateKey(null, key, etag, options); SettableFuture settableFuture = SettableFuture.create(); MockCallback callback = new MockCallback<>(responseEnvelope); addCallback(settableFuture, callback, directExecutor()); @@ -555,7 +555,7 @@ public class DaprClientGrpcAdapterTest { @Test(expected = RuntimeException.class) public void deleteStateExceptionThrowTest() { when(client.deleteState(any(io.dapr.DaprProtos.DeleteStateEnvelope.class))).thenThrow(RuntimeException.class); - StateKeyValue key = buildStateKey(null, "Key1", "ETag1"); + StateKeyValue key = buildStateKey(null, "Key1", "ETag1", null); Mono result = adater.deleteState(key, null); result.block(); } @@ -569,7 +569,7 @@ public class DaprClientGrpcAdapterTest { addCallback(settableFuture, callback, directExecutor()); when(client.deleteState(any(io.dapr.DaprProtos.DeleteStateEnvelope.class))) .thenReturn(settableFuture); - StateKeyValue key = buildStateKey(null, "Key1", "ETag1"); + StateKeyValue key = buildStateKey(null, "Key1", "ETag1", null); Mono result = adater.deleteState(key, null); settableFuture.setException(ex); result.block(); @@ -584,7 +584,7 @@ public class DaprClientGrpcAdapterTest { addCallback(settableFuture, callback, directExecutor()); when(client.deleteState(any(io.dapr.DaprProtos.DeleteStateEnvelope.class))) .thenReturn(settableFuture); - StateKeyValue stateKey = buildStateKey(null, key, etag); + StateKeyValue stateKey = buildStateKey(null, key, etag, null); Mono result = adater.deleteState(stateKey, null); settableFuture.set(Empty.newBuilder().build()); result.block(); @@ -595,14 +595,14 @@ public class DaprClientGrpcAdapterTest { public void deleteStateTest() { String etag = "ETag1"; String key = "key1"; + StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE, + Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR); SettableFuture settableFuture = SettableFuture.create(); MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); addCallback(settableFuture, callback, directExecutor()); when(client.deleteState(any(io.dapr.DaprProtos.DeleteStateEnvelope.class))) .thenReturn(settableFuture); - StateKeyValue stateKey = buildStateKey(null, key, etag); - StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE, - Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR); + StateKeyValue stateKey = buildStateKey(null, key, etag, options); Mono result = adater.deleteState(stateKey, options); settableFuture.set(Empty.newBuilder().build()); result.block(); @@ -613,14 +613,14 @@ public class DaprClientGrpcAdapterTest { public void deleteStateNoConsistencyTest() { String etag = "ETag1"; String key = "key1"; + StateOptions options = buildStateOptions(null, StateOptions.Concurrency.FIRST_WRITE, + Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR); SettableFuture settableFuture = SettableFuture.create(); MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); addCallback(settableFuture, callback, directExecutor()); when(client.deleteState(any(io.dapr.DaprProtos.DeleteStateEnvelope.class))) .thenReturn(settableFuture); - StateKeyValue stateKey = buildStateKey(null, key, etag); - StateOptions options = buildStateOptions(null, StateOptions.Concurrency.FIRST_WRITE, - Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR); + StateKeyValue stateKey = buildStateKey(null, key, etag, options); Mono result = adater.deleteState(stateKey, options); settableFuture.set(Empty.newBuilder().build()); result.block(); @@ -631,14 +631,14 @@ public class DaprClientGrpcAdapterTest { public void deleteStateNoConcurrencyTest() { String etag = "ETag1"; String key = "key1"; + StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, null, + Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR); SettableFuture settableFuture = SettableFuture.create(); MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); addCallback(settableFuture, callback, directExecutor()); when(client.deleteState(any(io.dapr.DaprProtos.DeleteStateEnvelope.class))) .thenReturn(settableFuture); - StateKeyValue stateKey = buildStateKey(null, key, etag); - StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, null, - Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR); + StateKeyValue stateKey = buildStateKey(null, key, etag, options); Mono result = adater.deleteState(stateKey, options); settableFuture.set(Empty.newBuilder().build()); result.block(); @@ -649,14 +649,14 @@ public class DaprClientGrpcAdapterTest { public void deleteStateNoRetryPolicyTest() { String etag = "ETag1"; String key = "key1"; + StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE, + null, null, null); SettableFuture settableFuture = SettableFuture.create(); MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); addCallback(settableFuture, callback, directExecutor()); when(client.deleteState(any(io.dapr.DaprProtos.DeleteStateEnvelope.class))) .thenReturn(settableFuture); - StateKeyValue stateKey = buildStateKey(null, key, etag); - StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE, - null, null, null); + StateKeyValue stateKey = buildStateKey(null, key, etag, options); Mono result = adater.deleteState(stateKey, options); settableFuture.set(Empty.newBuilder().build()); result.block(); @@ -667,14 +667,14 @@ public class DaprClientGrpcAdapterTest { public void deleteStateRetryPolicyNoDurationTest() { String etag = "ETag1"; String key = "key1"; + StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE, + null, 1, StateOptions.RetryPolicy.Pattern.LINEAR); SettableFuture settableFuture = SettableFuture.create(); MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); addCallback(settableFuture, callback, directExecutor()); when(client.deleteState(any(io.dapr.DaprProtos.DeleteStateEnvelope.class))) .thenReturn(settableFuture); - StateKeyValue stateKey = buildStateKey(null, key, etag); - StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE, - null, 1, StateOptions.RetryPolicy.Pattern.LINEAR); + StateKeyValue stateKey = buildStateKey(null, key, etag, options); Mono result = adater.deleteState(stateKey, options); settableFuture.set(Empty.newBuilder().build()); result.block(); @@ -685,14 +685,14 @@ public class DaprClientGrpcAdapterTest { public void deleteStateRetryPolicyNoThresholdTest() { String etag = "ETag1"; String key = "key1"; + StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE, + Duration.ofDays(100), null, StateOptions.RetryPolicy.Pattern.LINEAR); SettableFuture settableFuture = SettableFuture.create(); MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); addCallback(settableFuture, callback, directExecutor()); when(client.deleteState(any(io.dapr.DaprProtos.DeleteStateEnvelope.class))) .thenReturn(settableFuture); - StateKeyValue stateKey = buildStateKey(null, key, etag); - StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE, - Duration.ofDays(100), null, StateOptions.RetryPolicy.Pattern.LINEAR); + StateKeyValue stateKey = buildStateKey(null, key, etag, options); Mono result = adater.deleteState(stateKey, options); settableFuture.set(Empty.newBuilder().build()); result.block(); @@ -703,14 +703,14 @@ public class DaprClientGrpcAdapterTest { public void deleteStateRetryPolicyNoPatternTest() { String etag = "ETag1"; String key = "key1"; + StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE, + Duration.ofDays(100), 1, null); SettableFuture settableFuture = SettableFuture.create(); MockCallback callback = new MockCallback<>(Empty.newBuilder().build()); addCallback(settableFuture, callback, directExecutor()); when(client.deleteState(any(io.dapr.DaprProtos.DeleteStateEnvelope.class))) .thenReturn(settableFuture); - StateKeyValue stateKey = buildStateKey(null, key, etag); - StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE, - Duration.ofDays(100), 1, null); + StateKeyValue stateKey = buildStateKey(null, key, etag, options); Mono result = adater.deleteState(stateKey, options); settableFuture.set(Empty.newBuilder().build()); result.block(); @@ -876,8 +876,8 @@ public class DaprClientGrpcAdapterTest { assertTrue(callback.wasCalled); } - private StateKeyValue buildStateKey(T value, String key, String etag) { - return new StateKeyValue(value, key, etag); + private StateKeyValue buildStateKey(T value, String key, String etag, StateOptions options) { + return new StateKeyValue(value, key, etag, options); } private StateOptions buildStateOptions(StateOptions.Consistency consistency, StateOptions.Concurrency concurrency, diff --git a/sdk/src/test/java/io/dapr/it/state/HttpStateClientIT.java b/sdk/src/test/java/io/dapr/it/state/HttpStateClientIT.java index 454c01164..2c6e3325b 100644 --- a/sdk/src/test/java/io/dapr/it/state/HttpStateClientIT.java +++ b/sdk/src/test/java/io/dapr/it/state/HttpStateClientIT.java @@ -8,6 +8,7 @@ package io.dapr.it.state; import io.dapr.client.DaprClient; import io.dapr.client.DaprClientBuilder; import io.dapr.client.domain.StateKeyValue; +import io.dapr.client.domain.StateOptions; import io.dapr.it.BaseIT; import io.dapr.it.services.EmptyService; import org.junit.Assert; @@ -35,69 +36,410 @@ public class HttpStateClientIT extends BaseIT { @Test public void saveAndGetState() { - final String stateKey= "myKey"; + //The key use to store the state + final String stateKey = "myKey"; - DaprClient daprClient= new DaprClientBuilder().build(); - MyData data= new MyData(); + //create the http client + DaprClient daprClient = new DaprClientBuilder().build(); + + //creation of a dummy data + MyData data = new MyData(); data.setPropertyA("data in property A"); data.setPropertyB("data in property B"); - Mono saveResponse= daprClient.saveState(stateKey,null,data, null); + + //create of the deferred call to DAPR to store the state + Mono saveResponse = daprClient.saveState(stateKey, null, data, null); + //execute the save action saveResponse.block(); - Mono> response= daprClient.getState( new StateKeyValue(null,stateKey,null),null,MyData.class); - StateKeyValue myDataResponse=response.block(); + //create of the deferred call to DAPR to get the state + Mono> response = daprClient.getState(new StateKeyValue(null, stateKey, null, null), null, MyData.class); - Assert.assertEquals("data in property A",myDataResponse.getValue().getPropertyA()); - Assert.assertEquals("data in property B",myDataResponse.getValue().getPropertyB()); + //retrieve the state + StateKeyValue myDataResponse = response.block(); + + //Assert that the response is the correct one + Assert.assertNotNull(myDataResponse.getEtag()); + Assert.assertNotNull(myDataResponse.getKey()); + Assert.assertNotNull(myDataResponse.getValue()); + Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA()); + Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB()); } @Test public void saveUpdateAndGetState() { - final String stateKey= "keyToBeUpdated"; - DaprClient daprClient= new DaprClientBuilder().build(); - MyData data= new MyData(); + //The key use to store the state and be updated + final String stateKey = "keyToBeUpdated"; + + //create http DAPR client + DaprClient daprClient = new DaprClientBuilder().build(); + //Create dummy data to be store + MyData data = new MyData(); data.setPropertyA("data in property A"); data.setPropertyB("data in property B"); - Mono saveResponse= daprClient.saveState(stateKey,null,data, null); + + //Create deferred action to save the sate + Mono saveResponse = daprClient.saveState(stateKey, null, data, null); + //execute save action to DAPR saveResponse.block(); + //change data properties data.setPropertyA("data in property A"); data.setPropertyB("data in property B2"); - saveResponse= daprClient.saveState(stateKey,null,data, null); + //create deferred action to update the sate without any etag or options + saveResponse = daprClient.saveState(stateKey, null, data, null); + //execute the update action to DAPR saveResponse.block(); - Mono> response= daprClient.getState( new StateKeyValue(null,stateKey,null),null,MyData.class); - StateKeyValue myDataResponse=response.block(); + //Create deferred action to retrieve the action + Mono> response = daprClient.getState(new StateKeyValue(null, stateKey, null, null), null, MyData.class); + //execute the retrieve of the state + StateKeyValue myDataResponse = response.block(); - Assert.assertEquals("data in property A",myDataResponse.getValue().getPropertyA()); - Assert.assertEquals("data in property B2",myDataResponse.getValue().getPropertyB()); + //review that the update was success action + Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA()); + Assert.assertEquals("data in property B2", myDataResponse.getValue().getPropertyB()); } @Test public void saveAndDeleteState() { - final String stateKey= "myeKeyToBeDeleted"; + //The key use to store the state and be deleted + final String stateKey = "myeKeyToBeDeleted"; - DaprClient daprClient= new DaprClientBuilder().build(); - MyData data= new MyData(); + //create DAPR client + DaprClient daprClient = new DaprClientBuilder().build(); + + //Create dummy data to be store + MyData data = new MyData(); data.setPropertyA("data in property A"); data.setPropertyB("data in property B"); - Mono saveResponse= daprClient.saveState(stateKey,null,data, null); + //Create deferred action to save the sate + Mono saveResponse = daprClient.saveState(stateKey, null, data, null); + //execute the save state action saveResponse.block(); - Mono> response= daprClient.getState( new StateKeyValue(null,stateKey,null),null,MyData.class); - StateKeyValue myDataResponse=response.block(); + //Create deferred action to retrieve the state + Mono> response = daprClient.getState(new StateKeyValue(null, stateKey, null, null), null, MyData.class); + //execute the retrieve of the state + StateKeyValue myDataResponse = response.block(); - Assert.assertEquals("data in property A",myDataResponse.getValue().getPropertyA()); - Assert.assertEquals("data in property B",myDataResponse.getValue().getPropertyB()); + //review that the state was saved correctly + Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA()); + Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB()); - Mono deleteResponse= daprClient.deleteState( new StateKeyValue(null,stateKey,null),null); + //create deferred action to delete the state + Mono deleteResponse = daprClient.deleteState(new StateKeyValue(null, stateKey, null, null), null); + //execute the delete action deleteResponse.block(); - response= daprClient.getState( new StateKeyValue(null,stateKey,null),null,MyData.class); - myDataResponse=response.block(); + //Create deferred action to retrieve the state + response = daprClient.getState(new StateKeyValue(null, stateKey, null, null), null, MyData.class); + //execute the retrieve of the state + myDataResponse = response.block(); + //review that the action does not return any value, because the state was deleted Assert.assertNull(myDataResponse.getValue()); } + + @Test + public void saveUpdateAndGetStateWithEtag() { + //The key use to store the state and be updated using etags + final String stateKey = "keyToBeUpdatedWithEtag"; + //create DAPR client + DaprClient daprClient = new DaprClientBuilder().build(); + //Create dummy data to be store + MyData data = new MyData(); + data.setPropertyA("data in property A"); + data.setPropertyB("data in property B"); + + //Create deferred action to save the sate + Mono saveResponse = daprClient.saveState(stateKey, null, data, null); + //execute the save state action + saveResponse.block(); + + //Create deferred action to retrieve the state + Mono> response = daprClient.getState(new StateKeyValue(null, stateKey, null, null), null, MyData.class); + //execute the action for retrieve the state and the etag + StateKeyValue myDataResponse = response.block(); + + //review that the etag is not empty + Assert.assertNotNull(myDataResponse.getEtag()); + Assert.assertNotNull(myDataResponse.getKey()); + Assert.assertNotNull(myDataResponse.getValue()); + Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA()); + Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB()); + + String firstETag = myDataResponse.getEtag(); + + //change the data in order to update the state + data.setPropertyA("data in property A2"); + data.setPropertyB("data in property B2"); + //Create deferred action to update the data using the correct etag + saveResponse = daprClient.saveState(stateKey, myDataResponse.getEtag(), data, null); + saveResponse.block(); + + + response = daprClient.getState(new StateKeyValue(null, stateKey, null, null), null, MyData.class); + //retrive the data wihout any etag + myDataResponse = response.block(); + + //review that state value changes + Assert.assertNotNull(myDataResponse.getEtag()); + //review that the etag changes after an update + Assert.assertNotEquals(firstETag,myDataResponse.getEtag()); + Assert.assertNotNull(myDataResponse.getKey()); + Assert.assertNotNull(myDataResponse.getValue()); + Assert.assertEquals("data in property A2", myDataResponse.getValue().getPropertyA()); + Assert.assertEquals("data in property B2", myDataResponse.getValue().getPropertyB()); + } + + + @Test(expected = RuntimeException.class) + public void saveUpdateAndGetStateWithWrongEtag() { + final String stateKey = "keyToBeUpdatedWithWrongEtag"; + + //create DAPR client + DaprClient daprClient = new DaprClientBuilder().build(); + //Create dummy data to be store + MyData data = new MyData(); + data.setPropertyA("data in property A"); + data.setPropertyB("data in property B"); + + //Create deferred action to save the sate + Mono saveResponse = daprClient.saveState(stateKey, null, data, null); + //execute the save state action + saveResponse.block(); + + //Create deferred action to retrieve the state + Mono> response = daprClient.getState(new StateKeyValue(null, stateKey, null, null), null, MyData.class); + //execute the action for retrieve the state and the etag + StateKeyValue myDataResponse = response.block(); + + //review that the etag is not empty + Assert.assertNotNull(myDataResponse.getEtag()); + Assert.assertNotNull(myDataResponse.getKey()); + Assert.assertNotNull(myDataResponse.getValue()); + Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA()); + Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB()); + + String firstETag = myDataResponse.getEtag(); + + //change the data in order to update the state + data.setPropertyA("data in property A2"); + data.setPropertyB("data in property B2"); + //Create deferred action to update the data using the incorrect etag + saveResponse = daprClient.saveState(stateKey, "99999999999999", data, null); + saveResponse.block(); + + + response = daprClient.getState(new StateKeyValue(null, stateKey, null, null), null, MyData.class); + //retrive the data wihout any etag + myDataResponse = response.block(); + + //review that state value changes + Assert.assertNotNull(myDataResponse.getEtag()); + //review that the etag changes after an update + Assert.assertNotEquals(firstETag,myDataResponse.getEtag()); + Assert.assertNotNull(myDataResponse.getKey()); + Assert.assertNotNull(myDataResponse.getValue()); + Assert.assertEquals("data in property A2", myDataResponse.getValue().getPropertyA()); + Assert.assertEquals("data in property B2", myDataResponse.getValue().getPropertyB()); + } + + @Test + public void saveAndDeleteStateWithEtag() { + final String stateKey = "myeKeyToBeDeletedWithEtag"; + //create DAPR client + DaprClient daprClient = new DaprClientBuilder().build(); + //Create dummy data to be store + MyData data = new MyData(); + data.setPropertyA("data in property A"); + data.setPropertyB("data in property B"); + //Create deferred action to save the sate + Mono saveResponse = daprClient.saveState(stateKey, null, data, null); + //execute the save state action + saveResponse.block(); + + //Create deferred action to get the state with the etag + Mono> response = daprClient.getState(new StateKeyValue(null, stateKey, null, null), null, MyData.class); + //execute the get state + StateKeyValue myDataResponse = response.block(); + + Assert.assertNotNull(myDataResponse.getEtag()); + Assert.assertNotNull(myDataResponse.getKey()); + Assert.assertNotNull(myDataResponse.getValue()); + Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA()); + Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB()); + + //Create deferred action to delete an state sending the etag + Mono deleteResponse = daprClient.deleteState(new StateKeyValue(null, stateKey, myDataResponse.getEtag(), null), null); + //execute the delete of the state + deleteResponse.block(); + + //Create deferred action to get the sate without an etag + response = daprClient.getState(new StateKeyValue(null, stateKey, null, null), null, MyData.class); + myDataResponse = response.block(); + + //Review that the response is null, because the state was deleted + Assert.assertNull(myDataResponse.getValue()); + } + + + @Test(expected = RuntimeException.class) + public void saveAndDeleteStateWithWrongEtag() { + final String stateKey = "myeKeyToBeDeletedWithWrongEtag"; + + //create DAPR client + DaprClient daprClient = new DaprClientBuilder().build(); + //Create dummy data to be store + MyData data = new MyData(); + data.setPropertyA("data in property A"); + data.setPropertyB("data in property B"); + //Create deferred action to save the sate + Mono saveResponse = daprClient.saveState(stateKey, null, data, null); + //execute the save state action + saveResponse.block(); + + //Create deferred action to get the state with the etag + Mono> response = daprClient.getState(new StateKeyValue(null, stateKey, null, null), null, MyData.class); + //execute the get state + StateKeyValue myDataResponse = response.block(); + + Assert.assertNotNull(myDataResponse.getEtag()); + Assert.assertNotNull(myDataResponse.getKey()); + Assert.assertNotNull(myDataResponse.getValue()); + Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA()); + Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB()); + + //Create deferred action to delete an state sending the incorrect etag + Mono deleteResponse = daprClient.deleteState(new StateKeyValue(null, stateKey, "99999999999", null), null); + //execute the delete of the state, this should trhow an exception + deleteResponse.block(); + + //Create deferred action to get the sate without an etag + response = daprClient.getState(new StateKeyValue(null, stateKey, null, null), null, MyData.class); + myDataResponse = response.block(); + + //Review that the response is null, because the state was deleted + Assert.assertNull(myDataResponse.getValue()); + } + + @Test(expected = RuntimeException.class) + public void saveUpdateAndGetStateWithEtagAndStateOptionsFirstWrite() { + final String stateKey = "keyToBeUpdatedWithEtagAndOptions"; + + //create option with concurrency with first writte and consistency of strong + StateOptions stateOptions = new StateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE, null); + + //create dapr client + DaprClient daprClient = new DaprClientBuilder().build(); + //create Dummy data + MyData data = new MyData(); + data.setPropertyA("data in property A"); + data.setPropertyB("data in property B"); + + //create state using stateOptions + Mono saveResponse = daprClient.saveState(stateKey, null, data, stateOptions); + //execute the save state + saveResponse.block(); + + + //crate deferred action to retrieve the state + Mono> response = daprClient.getState(new StateKeyValue(null, stateKey, null, stateOptions), stateOptions, MyData.class); + //execute the retrieve of the state using options + StateKeyValue myDataResponse = response.block(); + + Assert.assertNotNull(myDataResponse.getEtag()); + Assert.assertNotNull(myDataResponse.getKey()); + Assert.assertNotNull(myDataResponse.getValue()); + Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA()); + Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB()); + + //change data to be udpated + data.setPropertyA("data in property A2"); + data.setPropertyB("data in property B2"); + //create deferred action to update the action with options + saveResponse = daprClient.saveState(stateKey, myDataResponse.getEtag(), data, stateOptions); + //update the state + saveResponse.block(); + + + data.setPropertyA("last write"); + data.setPropertyB("data in property B2"); + //create deferred action to update the action with the same etag + saveResponse = daprClient.saveState(stateKey, myDataResponse.getEtag(), data, stateOptions); + //throws an exception, the state was already udpated + saveResponse.block(); + + response = daprClient.getState(new StateKeyValue(null, stateKey, null, null), stateOptions, MyData.class); + StateKeyValue myLastDataResponse = response.block(); + + Assert.assertNotNull(myLastDataResponse.getEtag()); + Assert.assertNotNull(myLastDataResponse.getKey()); + Assert.assertNotNull(myLastDataResponse.getValue()); + Assert.assertNotNull(myDataResponse.getEtag(), myLastDataResponse.getEtag()); + Assert.assertEquals("data in property A2", myLastDataResponse.getValue().getPropertyA()); + Assert.assertEquals("data in property B2", myLastDataResponse.getValue().getPropertyB()); + } + + @Test() + public void saveUpdateAndGetStateWithEtagAndStateOptionsLastWrite() { + final String stateKey = "keyToBeUpdatedWithEtagAndOptions"; + + //create option with concurrency with first writte and consistency of strong + StateOptions stateOptions = new StateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.LAST_WRITE, null); + + //create dapr client + DaprClient daprClient = new DaprClientBuilder().build(); + //create Dummy data + MyData data = new MyData(); + data.setPropertyA("data in property A"); + data.setPropertyB("data in property B"); + + //create state using stateOptions + Mono saveResponse = daprClient.saveState(stateKey, null, data, stateOptions); + //execute the save state + saveResponse.block(); + + + //crate deferred action to retrieve the state + Mono> response = daprClient.getState(new StateKeyValue(null, stateKey, null, stateOptions), stateOptions, MyData.class); + //execute the retrieve of the state using options + StateKeyValue myDataResponse = response.block(); + + Assert.assertNotNull(myDataResponse.getEtag()); + Assert.assertNotNull(myDataResponse.getKey()); + Assert.assertNotNull(myDataResponse.getValue()); + Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA()); + Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB()); + + //change data to be udpated + data.setPropertyA("data in property A2"); + data.setPropertyB("data in property B2"); + //create deferred action to update the action with options + saveResponse = daprClient.saveState(stateKey, myDataResponse.getEtag(), data, stateOptions); + //update the state + saveResponse.block(); + + + data.setPropertyA("last write"); + data.setPropertyB("data in property B2"); + //create deferred action to update the action with the same etag + saveResponse = daprClient.saveState(stateKey, myDataResponse.getEtag(), data, stateOptions); + //update the state without an error + saveResponse.block(); + + response = daprClient.getState(new StateKeyValue(null, stateKey, null, null), stateOptions, MyData.class); + StateKeyValue myLastDataResponse = response.block(); + + Assert.assertNotNull(myLastDataResponse.getEtag()); + Assert.assertNotNull(myLastDataResponse.getKey()); + Assert.assertNotNull(myLastDataResponse.getValue()); + Assert.assertNotNull(myDataResponse.getEtag(), myLastDataResponse.getEtag()); + Assert.assertEquals("last write", myLastDataResponse.getValue().getPropertyA()); + Assert.assertEquals("data in property B2", myLastDataResponse.getValue().getPropertyB()); + } + }