Receiving the StateOptions on save as part of the State, to comply with the DAPR API (#105)

* Receiving the StateOptions on save as part of the State, to comply with the DAPR API

* #26 Add integration test for Concurrency funtionality in the states module

Co-authored-by: Juan Jose Herrera <35985447+JuanJose-Herrera@users.noreply.github.com>
Co-authored-by: Artur Souza <artursouza.ms@outlook.com>
This commit is contained in:
Andres Robles 2020-01-16 12:57:12 -06:00 committed by Artur Souza
parent 4c68286aa5
commit acd28bb189
7 changed files with 504 additions and 137 deletions

View File

@ -129,11 +129,10 @@ public interface DaprClient {
* Save/Update a list of states.
*
* @param states the States to be saved.
* @param options the Options to use for each state.
* @param <T> the Type of the State.
* @return a Mono plan of type Void.
*/
<T> Mono<Void> saveStates(List<StateKeyValue<T>> states, StateOptions options);
<T> Mono<Void> saveStates(List<StateKeyValue<T>> states);
/**
* Save/Update a state.

View File

@ -183,7 +183,7 @@ class DaprClientGrpcAdapter implements DaprClient {
ListenableFuture<DaprProtos.GetStateResponseEnvelope> futureResponse = client.getState(envelope);
return Mono.just(futureResponse).flatMap(f -> {
try {
return Mono.just(buildStateKeyValue(f.get(), state.getKey(), clazz));
return Mono.just(buildStateKeyValue(f.get(), state.getKey(), stateOptions, clazz));
} catch (Exception ex) {
return Mono.error(ex);
}
@ -193,61 +193,22 @@ class DaprClientGrpcAdapter implements DaprClient {
}
}
private <T> StateKeyValue<T> buildStateKeyValue(DaprProtos.GetStateResponseEnvelope resonse, String requestedKey, Class<T> clazz) throws IOException {
private <T> StateKeyValue<T> buildStateKeyValue(DaprProtos.GetStateResponseEnvelope resonse, String requestedKey, StateOptions stateOptions, Class<T> clazz) throws IOException {
T value = objectSerializer.deserialize(resonse.getData().getValue().toByteArray(), clazz);
String etag = resonse.getEtag();
String key = requestedKey;
return new StateKeyValue<>(value, key, etag);
return new StateKeyValue<>(value, key, etag, stateOptions);
}
/**
* {@inheritDoc}
*/
@Override
public <T> Mono<Void> saveStates(List<StateKeyValue<T>> states, StateOptions options) {
public <T> Mono<Void> saveStates(List<StateKeyValue<T>> states) {
try {
DaprProtos.StateRequestOptions.Builder optionBuilder = null;
if (options != null) {
DaprProtos.StateRetryPolicy.Builder retryPolicyBuilder = null;
if (options.getRetryPolicy() != null) {
retryPolicyBuilder = DaprProtos.StateRetryPolicy.newBuilder();
StateOptions.RetryPolicy retryPolicy = options.getRetryPolicy();
if (options.getRetryPolicy().getInterval() != null) {
Duration.Builder durationBuilder = Duration.newBuilder()
.setNanos(retryPolicy.getInterval().getNano())
.setSeconds(retryPolicy.getInterval().getSeconds());
retryPolicyBuilder.setInterval(durationBuilder.build());
}
retryPolicyBuilder.setThreshold(objectSerializer.deserialize(retryPolicy.getThreshold(), int.class));
if (retryPolicy.getPattern() != null) {
retryPolicyBuilder.setPattern(retryPolicy.getPattern().getValue());
}
}
optionBuilder = DaprProtos.StateRequestOptions.newBuilder();
if (options.getConcurrency() != null) {
optionBuilder.setConcurrency(options.getConcurrency().getValue());
}
if (options.getConsistency() != null) {
optionBuilder.setConsistency(options.getConsistency().getValue());
}
if (retryPolicyBuilder != null) {
optionBuilder.setRetryPolicy(retryPolicyBuilder.build());
}
}
DaprProtos.SaveStateEnvelope.Builder builder = DaprProtos.SaveStateEnvelope.newBuilder();
for (StateKeyValue state : states) {
byte[] byteState = objectSerializer.serialize(state.getValue());
Any data = Any.newBuilder().setValue(ByteString.copyFrom(byteState)).build();
DaprProtos.StateRequest.Builder stateBuilder = DaprProtos.StateRequest.newBuilder()
.setEtag(state.getEtag())
.setKey(state.getKey())
.setValue(data);
if(optionBuilder != null) {
stateBuilder.setOptions(optionBuilder.build());
}
builder.addRequests(stateBuilder.build());
builder.addRequests(buildStateRequest(state).build());
}
DaprProtos.SaveStateEnvelope envelope = builder.build();
@ -265,10 +226,53 @@ class DaprClientGrpcAdapter implements DaprClient {
}
}
private <T> DaprProtos.StateRequest.Builder buildStateRequest(StateKeyValue<T> state) throws IOException {
byte[] byteState = objectSerializer.serialize(state.getValue());
Any data = Any.newBuilder().setValue(ByteString.copyFrom(byteState)).build();
DaprProtos.StateRequest.Builder stateBuilder = DaprProtos.StateRequest.newBuilder()
.setEtag(state.getEtag())
.setKey(state.getKey())
.setValue(data);
DaprProtos.StateRequestOptions.Builder optionBuilder = null;
if (state.getOptions() != null) {
StateOptions options = state.getOptions();
DaprProtos.StateRetryPolicy.Builder retryPolicyBuilder = null;
if (options.getRetryPolicy() != null) {
retryPolicyBuilder = DaprProtos.StateRetryPolicy.newBuilder();
StateOptions.RetryPolicy retryPolicy = options.getRetryPolicy();
if (options.getRetryPolicy().getInterval() != null) {
Duration.Builder durationBuilder = Duration.newBuilder()
.setNanos(retryPolicy.getInterval().getNano())
.setSeconds(retryPolicy.getInterval().getSeconds());
retryPolicyBuilder.setInterval(durationBuilder.build());
}
retryPolicyBuilder.setThreshold(objectSerializer.deserialize(retryPolicy.getThreshold(), int.class));
if (retryPolicy.getPattern() != null) {
retryPolicyBuilder.setPattern(retryPolicy.getPattern().getValue());
}
}
optionBuilder = DaprProtos.StateRequestOptions.newBuilder();
if (options.getConcurrency() != null) {
optionBuilder.setConcurrency(options.getConcurrency().getValue());
}
if (options.getConsistency() != null) {
optionBuilder.setConsistency(options.getConsistency().getValue());
}
if (retryPolicyBuilder != null) {
optionBuilder.setRetryPolicy(retryPolicyBuilder.build());
}
}
if(optionBuilder != null) {
stateBuilder.setOptions(optionBuilder.build());
}
return stateBuilder;
}
@Override
public <T> Mono<Void> saveState(String key, String etag, T value, StateOptions options) {
StateKeyValue<T> state = new StateKeyValue<>(value, key, etag);
return saveStates(Arrays.asList(state), options);
StateKeyValue<T> state = new StateKeyValue<>(value, key, etag, options);
return saveStates(Arrays.asList(state));
}
/**

View File

@ -185,7 +185,7 @@ public class DaprClientHttpAdapter implements DaprClient {
.invokeAPI(DaprHttp.HttpMethods.GET.name(), url.toString(), urlParameters, headers)
.flatMap(s -> {
try {
return Mono.just(buildStateKeyValue(s, state.getKey(), clazz));
return Mono.just(buildStateKeyValue(s, state.getKey(), stateOptions, clazz));
}catch (Exception ex){
return Mono.error(ex);
}
@ -199,22 +199,21 @@ public class DaprClientHttpAdapter implements DaprClient {
* {@inheritDoc}
*/
@Override
public <T> Mono<Void> saveStates(List<StateKeyValue<T>> states, StateOptions options) {
public <T> Mono<Void> saveStates(List<StateKeyValue<T>> states) {
try {
if (states == null || states.isEmpty()) {
return Mono.empty();
}
final Map<String, String> headers = new HashMap<>();
final String etag = states.stream().filter(state -> null != state.getEtag() && !state.getEtag().trim().isEmpty())
.findFirst().orElse(new StateKeyValue<>(null, null, null)).getEtag();
.findFirst().orElse(new StateKeyValue<>(null, null, null, null)).getEtag();
if (etag != null && !etag.trim().isEmpty()) {
headers.put(Constants.HEADER_HTTP_ETAG_ID, etag);
}
final String url = Constants.STATE_PATH;
Map<String, String> urlParameter = Optional.ofNullable(options).map(stateOptions -> stateOptions.getStateOptionsAsMap() ).orElse( new HashMap<>());
byte[] serializedStateBody = objectSerializer.serialize(states);
return this.client.invokeAPI(
DaprHttp.HttpMethods.POST.name(), url, urlParameter, serializedStateBody, headers).then();
DaprHttp.HttpMethods.POST.name(), url, null, serializedStateBody, headers).then();
} catch (Exception ex) {
return Mono.error(ex);
}
@ -225,8 +224,8 @@ public class DaprClientHttpAdapter implements DaprClient {
*/
@Override
public <T> Mono<Void> saveState(String key, String etag, T value, StateOptions options) {
StateKeyValue<T> state = new StateKeyValue<>(value, key, etag);
return saveStates(Arrays.asList(state), options);
StateKeyValue<T> state = new StateKeyValue<>(value, key, etag, options);
return saveStates(Arrays.asList(state));
}
/**
@ -339,14 +338,14 @@ public class DaprClientHttpAdapter implements DaprClient {
* @return A StateKeyValue instance
* @throws IOException If there's a issue deserialzing the response.
*/
private <T> StateKeyValue<T> buildStateKeyValue(DaprHttp.Response resonse, String requestedKey, Class<T> clazz) throws IOException {
private <T> StateKeyValue<T> buildStateKeyValue(DaprHttp.Response resonse, String requestedKey, StateOptions stateOptions, Class<T> clazz) throws IOException {
T value = objectSerializer.deserialize(resonse.getBody(), clazz);
String key = requestedKey;
String etag = null;
if (resonse.getHeaders() != null && resonse.getHeaders().containsKey("ETag")) {
etag = objectSerializer.deserialize(resonse.getHeaders().get("ETag"), String.class);
if (resonse.getHeaders() != null && resonse.getHeaders().containsKey("Etag")) {
etag = objectSerializer.deserialize(resonse.getHeaders().get("Etag"), String.class);
}
return new StateKeyValue<>(value, key, etag);
return new StateKeyValue<>(value, key, etag, stateOptions);
}
}

View File

@ -6,7 +6,7 @@ package io.dapr.client.domain;
/**
* This class reprent what a State is
* @param <T>
* @param <T> The type of the value of the sate
*/
public class StateKeyValue<T> {
/**
@ -24,20 +24,27 @@ public class StateKeyValue<T> {
private final String etag;
/**
* Create an inmutable state
* @param value
* @param key
* @param etag
* The options used for saving the state
*/
public StateKeyValue(T value, String key, String etag) {
private final StateOptions options;
/**
* Create an inmutable state
* @param value - The value of the state
* @param key - The key of the state
* @param etag - The etag of the state - Keep in mind that for some state stores (like reids) only numbers are supported.
* @param options - REQUIRED when saving a state.
*/
public StateKeyValue(T value, String key, String etag, StateOptions options) {
this.value = value;
this.key = key;
this.etag = etag;
this.options = options;
}
/**
* Retrieves the Value of the state
* @return
* @return The value of the state
*/
public T getValue() {
return value;
@ -45,7 +52,7 @@ public class StateKeyValue<T> {
/**
* Retrieves the Key of the state
* @return
* @return The key of the state
*/
public String getKey() {
return key;
@ -53,12 +60,20 @@ public class StateKeyValue<T> {
/**
* Retrieve the ETag of this state
* @return
* @return The etag of the state
*/
public String getEtag() {
return etag;
}
/**
* Retrieve the Options used for saving the state
* @return The options to save the state
*/
public StateOptions getOptions() {
return options;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
@ -69,6 +84,7 @@ public class StateKeyValue<T> {
if (getValue() != null ? !getValue().equals(that.getValue()) : that.getValue() != null) return false;
if (getKey() != null ? !getKey().equals(that.getKey()) : that.getKey() != null) return false;
if (getEtag() != null ? !getEtag().equals(that.getEtag()) : that.getEtag() != null) return false;
if (getOptions() != null ? !getOptions().equals(that.getOptions()) : that.getOptions() != null) return false;
return true;
}
@ -78,6 +94,7 @@ public class StateKeyValue<T> {
int result = getValue() != null ? getValue().hashCode() : 0;
result = 31 * result + (getKey() != null ? getKey().hashCode() : 0);
result = 31 * result + (getEtag() != null ? getEtag().hashCode() : 0);
result = 31 * result + (getOptions() != null ? options.hashCode() : 0);
return result;
}
@ -85,8 +102,9 @@ public class StateKeyValue<T> {
public String toString() {
return "StateKeyValue{" +
"value=" + value +
", key='" + key + '\'' +
", etag='" + etag + '\'' +
'}';
", key='" + key + "'" +
", etag='" + etag + "'" +
", options={'" + options.toString() + "}" +
"}";
}
}

View File

@ -4,6 +4,7 @@
*/
package io.dapr.client.domain;
import com.fasterxml.jackson.annotation.JsonValue;
import io.dapr.utils.DurationUtils;
import java.time.Duration;
@ -70,9 +71,11 @@ public class StateOptions {
this.value = value;
}
@JsonValue
public String getValue() {
return this.value;
}
}
public static enum Concurrency {
@ -85,9 +88,11 @@ public class StateOptions {
this.value = value;
}
@JsonValue
public String getValue() {
return this.value;
}
}
public static class RetryPolicy {

View File

@ -465,7 +465,7 @@ public class DaprClientGrpcAdapterTest {
@Test(expected = RuntimeException.class)
public void getStateExceptionThrownTest() {
when(client.getState(any(io.dapr.DaprProtos.GetStateEnvelope.class))).thenThrow(RuntimeException.class);
StateKeyValue<String> key = buildStateKey(null, "Key1", "ETag1");
StateKeyValue<String> key = buildStateKey(null, "Key1", "ETag1", null);
Mono<StateKeyValue<String>> result = adater.getState(key, null, String.class);
result.block();
}
@ -479,7 +479,7 @@ public class DaprClientGrpcAdapterTest {
addCallback(settableFuture, callback, directExecutor());
when(client.getState(any(io.dapr.DaprProtos.GetStateEnvelope.class)))
.thenReturn(settableFuture);
StateKeyValue<String> key = buildStateKey(null, "Key1", "ETag1");
StateKeyValue<String> key = buildStateKey(null, "Key1", "ETag1", null);
Mono<StateKeyValue<String>> result = adater.getState(key, null, String.class);
settableFuture.setException(ex);
result.block();
@ -490,7 +490,7 @@ public class DaprClientGrpcAdapterTest {
String etag = "ETag1";
String key = "key1";
String expectedValue = "Expected state";
StateKeyValue<String> expectedState = buildStateKey(expectedValue, key, etag);
StateKeyValue<String> expectedState = buildStateKey(expectedValue, key, etag, null);
DaprProtos.GetStateResponseEnvelope responseEnvelope = DaprProtos.GetStateResponseEnvelope.newBuilder()
.setData(getAny(expectedValue))
.setEtag(etag)
@ -500,7 +500,7 @@ public class DaprClientGrpcAdapterTest {
addCallback(settableFuture, callback, directExecutor());
when(client.getState(any(io.dapr.DaprProtos.GetStateEnvelope.class)))
.thenReturn(settableFuture);
StateKeyValue<String> keyRequest = buildStateKey(null, key, etag);
StateKeyValue<String> keyRequest = buildStateKey(null, key, etag, null);
Mono<StateKeyValue<String>> result = adater.getState(keyRequest, null, String.class);
settableFuture.set(responseEnvelope);
assertEquals(expectedState, result.block());
@ -511,14 +511,14 @@ public class DaprClientGrpcAdapterTest {
String etag = "ETag1";
String key = "key1";
MyObject expectedValue = new MyObject(1, "The Value");
StateKeyValue<MyObject> expectedState = buildStateKey(expectedValue, key, etag);
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE,
Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR);
StateKeyValue<MyObject> expectedState = buildStateKey(expectedValue, key, etag, options);
DaprProtos.GetStateResponseEnvelope responseEnvelope = DaprProtos.GetStateResponseEnvelope.newBuilder()
.setData(getAny(expectedValue))
.setEtag(etag)
.build();
StateKeyValue<MyObject> keyRequest = buildStateKey(null, key, etag);
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE,
Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR);
StateKeyValue<MyObject> keyRequest = buildStateKey(null, key, etag, options);
SettableFuture<DaprProtos.GetStateResponseEnvelope> settableFuture = SettableFuture.create();
MockCallback<DaprProtos.GetStateResponseEnvelope> callback = new MockCallback<>(responseEnvelope);
addCallback(settableFuture, callback, directExecutor());
@ -534,14 +534,14 @@ public class DaprClientGrpcAdapterTest {
String etag = "ETag1";
String key = "key1";
MyObject expectedValue = new MyObject(1, "The Value");
StateKeyValue<MyObject> expectedState = buildStateKey(expectedValue, key, etag);
StateOptions options = new StateOptions(null, StateOptions.Concurrency.FIRST_WRITE,
new StateOptions.RetryPolicy(Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR));
StateKeyValue<MyObject> expectedState = buildStateKey(expectedValue, key, etag, options);
DaprProtos.GetStateResponseEnvelope responseEnvelope = DaprProtos.GetStateResponseEnvelope.newBuilder()
.setData(getAny(expectedValue))
.setEtag(etag)
.build();
StateKeyValue<MyObject> keyRequest = buildStateKey(null, key, etag);
StateOptions options = new StateOptions(null, StateOptions.Concurrency.FIRST_WRITE,
new StateOptions.RetryPolicy(Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR));
StateKeyValue<MyObject> keyRequest = buildStateKey(null, key, etag, options);
SettableFuture<DaprProtos.GetStateResponseEnvelope> settableFuture = SettableFuture.create();
MockCallback<DaprProtos.GetStateResponseEnvelope> callback = new MockCallback<>(responseEnvelope);
addCallback(settableFuture, callback, directExecutor());
@ -555,7 +555,7 @@ public class DaprClientGrpcAdapterTest {
@Test(expected = RuntimeException.class)
public void deleteStateExceptionThrowTest() {
when(client.deleteState(any(io.dapr.DaprProtos.DeleteStateEnvelope.class))).thenThrow(RuntimeException.class);
StateKeyValue<String> key = buildStateKey(null, "Key1", "ETag1");
StateKeyValue<String> key = buildStateKey(null, "Key1", "ETag1", null);
Mono<Void> result = adater.deleteState(key, null);
result.block();
}
@ -569,7 +569,7 @@ public class DaprClientGrpcAdapterTest {
addCallback(settableFuture, callback, directExecutor());
when(client.deleteState(any(io.dapr.DaprProtos.DeleteStateEnvelope.class)))
.thenReturn(settableFuture);
StateKeyValue<String> key = buildStateKey(null, "Key1", "ETag1");
StateKeyValue<String> key = buildStateKey(null, "Key1", "ETag1", null);
Mono<Void> result = adater.deleteState(key, null);
settableFuture.setException(ex);
result.block();
@ -584,7 +584,7 @@ public class DaprClientGrpcAdapterTest {
addCallback(settableFuture, callback, directExecutor());
when(client.deleteState(any(io.dapr.DaprProtos.DeleteStateEnvelope.class)))
.thenReturn(settableFuture);
StateKeyValue<String> stateKey = buildStateKey(null, key, etag);
StateKeyValue<String> stateKey = buildStateKey(null, key, etag, null);
Mono<Void> result = adater.deleteState(stateKey, null);
settableFuture.set(Empty.newBuilder().build());
result.block();
@ -595,14 +595,14 @@ public class DaprClientGrpcAdapterTest {
public void deleteStateTest() {
String etag = "ETag1";
String key = "key1";
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE,
Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR);
SettableFuture<Empty> settableFuture = SettableFuture.create();
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
addCallback(settableFuture, callback, directExecutor());
when(client.deleteState(any(io.dapr.DaprProtos.DeleteStateEnvelope.class)))
.thenReturn(settableFuture);
StateKeyValue<String> stateKey = buildStateKey(null, key, etag);
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE,
Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR);
StateKeyValue<String> stateKey = buildStateKey(null, key, etag, options);
Mono<Void> result = adater.deleteState(stateKey, options);
settableFuture.set(Empty.newBuilder().build());
result.block();
@ -613,14 +613,14 @@ public class DaprClientGrpcAdapterTest {
public void deleteStateNoConsistencyTest() {
String etag = "ETag1";
String key = "key1";
StateOptions options = buildStateOptions(null, StateOptions.Concurrency.FIRST_WRITE,
Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR);
SettableFuture<Empty> settableFuture = SettableFuture.create();
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
addCallback(settableFuture, callback, directExecutor());
when(client.deleteState(any(io.dapr.DaprProtos.DeleteStateEnvelope.class)))
.thenReturn(settableFuture);
StateKeyValue<String> stateKey = buildStateKey(null, key, etag);
StateOptions options = buildStateOptions(null, StateOptions.Concurrency.FIRST_WRITE,
Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR);
StateKeyValue<String> stateKey = buildStateKey(null, key, etag, options);
Mono<Void> result = adater.deleteState(stateKey, options);
settableFuture.set(Empty.newBuilder().build());
result.block();
@ -631,14 +631,14 @@ public class DaprClientGrpcAdapterTest {
public void deleteStateNoConcurrencyTest() {
String etag = "ETag1";
String key = "key1";
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, null,
Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR);
SettableFuture<Empty> settableFuture = SettableFuture.create();
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
addCallback(settableFuture, callback, directExecutor());
when(client.deleteState(any(io.dapr.DaprProtos.DeleteStateEnvelope.class)))
.thenReturn(settableFuture);
StateKeyValue<String> stateKey = buildStateKey(null, key, etag);
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, null,
Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR);
StateKeyValue<String> stateKey = buildStateKey(null, key, etag, options);
Mono<Void> result = adater.deleteState(stateKey, options);
settableFuture.set(Empty.newBuilder().build());
result.block();
@ -649,14 +649,14 @@ public class DaprClientGrpcAdapterTest {
public void deleteStateNoRetryPolicyTest() {
String etag = "ETag1";
String key = "key1";
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE,
null, null, null);
SettableFuture<Empty> settableFuture = SettableFuture.create();
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
addCallback(settableFuture, callback, directExecutor());
when(client.deleteState(any(io.dapr.DaprProtos.DeleteStateEnvelope.class)))
.thenReturn(settableFuture);
StateKeyValue<String> stateKey = buildStateKey(null, key, etag);
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE,
null, null, null);
StateKeyValue<String> stateKey = buildStateKey(null, key, etag, options);
Mono<Void> result = adater.deleteState(stateKey, options);
settableFuture.set(Empty.newBuilder().build());
result.block();
@ -667,14 +667,14 @@ public class DaprClientGrpcAdapterTest {
public void deleteStateRetryPolicyNoDurationTest() {
String etag = "ETag1";
String key = "key1";
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE,
null, 1, StateOptions.RetryPolicy.Pattern.LINEAR);
SettableFuture<Empty> settableFuture = SettableFuture.create();
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
addCallback(settableFuture, callback, directExecutor());
when(client.deleteState(any(io.dapr.DaprProtos.DeleteStateEnvelope.class)))
.thenReturn(settableFuture);
StateKeyValue<String> stateKey = buildStateKey(null, key, etag);
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE,
null, 1, StateOptions.RetryPolicy.Pattern.LINEAR);
StateKeyValue<String> stateKey = buildStateKey(null, key, etag, options);
Mono<Void> result = adater.deleteState(stateKey, options);
settableFuture.set(Empty.newBuilder().build());
result.block();
@ -685,14 +685,14 @@ public class DaprClientGrpcAdapterTest {
public void deleteStateRetryPolicyNoThresholdTest() {
String etag = "ETag1";
String key = "key1";
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE,
Duration.ofDays(100), null, StateOptions.RetryPolicy.Pattern.LINEAR);
SettableFuture<Empty> settableFuture = SettableFuture.create();
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
addCallback(settableFuture, callback, directExecutor());
when(client.deleteState(any(io.dapr.DaprProtos.DeleteStateEnvelope.class)))
.thenReturn(settableFuture);
StateKeyValue<String> stateKey = buildStateKey(null, key, etag);
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE,
Duration.ofDays(100), null, StateOptions.RetryPolicy.Pattern.LINEAR);
StateKeyValue<String> stateKey = buildStateKey(null, key, etag, options);
Mono<Void> result = adater.deleteState(stateKey, options);
settableFuture.set(Empty.newBuilder().build());
result.block();
@ -703,14 +703,14 @@ public class DaprClientGrpcAdapterTest {
public void deleteStateRetryPolicyNoPatternTest() {
String etag = "ETag1";
String key = "key1";
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE,
Duration.ofDays(100), 1, null);
SettableFuture<Empty> settableFuture = SettableFuture.create();
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
addCallback(settableFuture, callback, directExecutor());
when(client.deleteState(any(io.dapr.DaprProtos.DeleteStateEnvelope.class)))
.thenReturn(settableFuture);
StateKeyValue<String> stateKey = buildStateKey(null, key, etag);
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE,
Duration.ofDays(100), 1, null);
StateKeyValue<String> stateKey = buildStateKey(null, key, etag, options);
Mono<Void> result = adater.deleteState(stateKey, options);
settableFuture.set(Empty.newBuilder().build());
result.block();
@ -876,8 +876,8 @@ public class DaprClientGrpcAdapterTest {
assertTrue(callback.wasCalled);
}
private <T> StateKeyValue<T> buildStateKey(T value, String key, String etag) {
return new StateKeyValue(value, key, etag);
private <T> StateKeyValue<T> buildStateKey(T value, String key, String etag, StateOptions options) {
return new StateKeyValue(value, key, etag, options);
}
private StateOptions buildStateOptions(StateOptions.Consistency consistency, StateOptions.Concurrency concurrency,

View File

@ -8,6 +8,7 @@ package io.dapr.it.state;
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.domain.StateKeyValue;
import io.dapr.client.domain.StateOptions;
import io.dapr.it.BaseIT;
import io.dapr.it.services.EmptyService;
import org.junit.Assert;
@ -35,69 +36,410 @@ public class HttpStateClientIT extends BaseIT {
@Test
public void saveAndGetState() {
final String stateKey= "myKey";
//The key use to store the state
final String stateKey = "myKey";
DaprClient daprClient= new DaprClientBuilder().build();
MyData data= new MyData();
//create the http client
DaprClient daprClient = new DaprClientBuilder().build();
//creation of a dummy data
MyData data = new MyData();
data.setPropertyA("data in property A");
data.setPropertyB("data in property B");
Mono<Void> saveResponse= daprClient.saveState(stateKey,null,data, null);
//create of the deferred call to DAPR to store the state
Mono<Void> saveResponse = daprClient.saveState(stateKey, null, data, null);
//execute the save action
saveResponse.block();
Mono<StateKeyValue<MyData>> response= daprClient.getState( new StateKeyValue(null,stateKey,null),null,MyData.class);
StateKeyValue<MyData> myDataResponse=response.block();
//create of the deferred call to DAPR to get the state
Mono<StateKeyValue<MyData>> response = daprClient.getState(new StateKeyValue(null, stateKey, null, null), null, MyData.class);
Assert.assertEquals("data in property A",myDataResponse.getValue().getPropertyA());
Assert.assertEquals("data in property B",myDataResponse.getValue().getPropertyB());
//retrieve the state
StateKeyValue<MyData> myDataResponse = response.block();
//Assert that the response is the correct one
Assert.assertNotNull(myDataResponse.getEtag());
Assert.assertNotNull(myDataResponse.getKey());
Assert.assertNotNull(myDataResponse.getValue());
Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
}
@Test
public void saveUpdateAndGetState() {
final String stateKey= "keyToBeUpdated";
DaprClient daprClient= new DaprClientBuilder().build();
MyData data= new MyData();
//The key use to store the state and be updated
final String stateKey = "keyToBeUpdated";
//create http DAPR client
DaprClient daprClient = new DaprClientBuilder().build();
//Create dummy data to be store
MyData data = new MyData();
data.setPropertyA("data in property A");
data.setPropertyB("data in property B");
Mono<Void> saveResponse= daprClient.saveState(stateKey,null,data, null);
//Create deferred action to save the sate
Mono<Void> saveResponse = daprClient.saveState(stateKey, null, data, null);
//execute save action to DAPR
saveResponse.block();
//change data properties
data.setPropertyA("data in property A");
data.setPropertyB("data in property B2");
saveResponse= daprClient.saveState(stateKey,null,data, null);
//create deferred action to update the sate without any etag or options
saveResponse = daprClient.saveState(stateKey, null, data, null);
//execute the update action to DAPR
saveResponse.block();
Mono<StateKeyValue<MyData>> response= daprClient.getState( new StateKeyValue<MyData>(null,stateKey,null),null,MyData.class);
StateKeyValue<MyData> myDataResponse=response.block();
//Create deferred action to retrieve the action
Mono<StateKeyValue<MyData>> response = daprClient.getState(new StateKeyValue<MyData>(null, stateKey, null, null), null, MyData.class);
//execute the retrieve of the state
StateKeyValue<MyData> myDataResponse = response.block();
Assert.assertEquals("data in property A",myDataResponse.getValue().getPropertyA());
Assert.assertEquals("data in property B2",myDataResponse.getValue().getPropertyB());
//review that the update was success action
Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
Assert.assertEquals("data in property B2", myDataResponse.getValue().getPropertyB());
}
@Test
public void saveAndDeleteState() {
final String stateKey= "myeKeyToBeDeleted";
//The key use to store the state and be deleted
final String stateKey = "myeKeyToBeDeleted";
DaprClient daprClient= new DaprClientBuilder().build();
MyData data= new MyData();
//create DAPR client
DaprClient daprClient = new DaprClientBuilder().build();
//Create dummy data to be store
MyData data = new MyData();
data.setPropertyA("data in property A");
data.setPropertyB("data in property B");
Mono<Void> saveResponse= daprClient.saveState(stateKey,null,data, null);
//Create deferred action to save the sate
Mono<Void> saveResponse = daprClient.saveState(stateKey, null, data, null);
//execute the save state action
saveResponse.block();
Mono<StateKeyValue<MyData>> response= daprClient.getState( new StateKeyValue<MyData>(null,stateKey,null),null,MyData.class);
StateKeyValue<MyData> myDataResponse=response.block();
//Create deferred action to retrieve the state
Mono<StateKeyValue<MyData>> response = daprClient.getState(new StateKeyValue<MyData>(null, stateKey, null, null), null, MyData.class);
//execute the retrieve of the state
StateKeyValue<MyData> myDataResponse = response.block();
Assert.assertEquals("data in property A",myDataResponse.getValue().getPropertyA());
Assert.assertEquals("data in property B",myDataResponse.getValue().getPropertyB());
//review that the state was saved correctly
Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
Mono<Void> deleteResponse= daprClient.deleteState( new StateKeyValue<MyData>(null,stateKey,null),null);
//create deferred action to delete the state
Mono<Void> deleteResponse = daprClient.deleteState(new StateKeyValue<MyData>(null, stateKey, null, null), null);
//execute the delete action
deleteResponse.block();
response= daprClient.getState( new StateKeyValue<MyData>(null,stateKey,null),null,MyData.class);
myDataResponse=response.block();
//Create deferred action to retrieve the state
response = daprClient.getState(new StateKeyValue<MyData>(null, stateKey, null, null), null, MyData.class);
//execute the retrieve of the state
myDataResponse = response.block();
//review that the action does not return any value, because the state was deleted
Assert.assertNull(myDataResponse.getValue());
}
@Test
public void saveUpdateAndGetStateWithEtag() {
//The key use to store the state and be updated using etags
final String stateKey = "keyToBeUpdatedWithEtag";
//create DAPR client
DaprClient daprClient = new DaprClientBuilder().build();
//Create dummy data to be store
MyData data = new MyData();
data.setPropertyA("data in property A");
data.setPropertyB("data in property B");
//Create deferred action to save the sate
Mono<Void> saveResponse = daprClient.saveState(stateKey, null, data, null);
//execute the save state action
saveResponse.block();
//Create deferred action to retrieve the state
Mono<StateKeyValue<MyData>> response = daprClient.getState(new StateKeyValue<MyData>(null, stateKey, null, null), null, MyData.class);
//execute the action for retrieve the state and the etag
StateKeyValue<MyData> myDataResponse = response.block();
//review that the etag is not empty
Assert.assertNotNull(myDataResponse.getEtag());
Assert.assertNotNull(myDataResponse.getKey());
Assert.assertNotNull(myDataResponse.getValue());
Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
String firstETag = myDataResponse.getEtag();
//change the data in order to update the state
data.setPropertyA("data in property A2");
data.setPropertyB("data in property B2");
//Create deferred action to update the data using the correct etag
saveResponse = daprClient.saveState(stateKey, myDataResponse.getEtag(), data, null);
saveResponse.block();
response = daprClient.getState(new StateKeyValue<MyData>(null, stateKey, null, null), null, MyData.class);
//retrive the data wihout any etag
myDataResponse = response.block();
//review that state value changes
Assert.assertNotNull(myDataResponse.getEtag());
//review that the etag changes after an update
Assert.assertNotEquals(firstETag,myDataResponse.getEtag());
Assert.assertNotNull(myDataResponse.getKey());
Assert.assertNotNull(myDataResponse.getValue());
Assert.assertEquals("data in property A2", myDataResponse.getValue().getPropertyA());
Assert.assertEquals("data in property B2", myDataResponse.getValue().getPropertyB());
}
@Test(expected = RuntimeException.class)
public void saveUpdateAndGetStateWithWrongEtag() {
final String stateKey = "keyToBeUpdatedWithWrongEtag";
//create DAPR client
DaprClient daprClient = new DaprClientBuilder().build();
//Create dummy data to be store
MyData data = new MyData();
data.setPropertyA("data in property A");
data.setPropertyB("data in property B");
//Create deferred action to save the sate
Mono<Void> saveResponse = daprClient.saveState(stateKey, null, data, null);
//execute the save state action
saveResponse.block();
//Create deferred action to retrieve the state
Mono<StateKeyValue<MyData>> response = daprClient.getState(new StateKeyValue<MyData>(null, stateKey, null, null), null, MyData.class);
//execute the action for retrieve the state and the etag
StateKeyValue<MyData> myDataResponse = response.block();
//review that the etag is not empty
Assert.assertNotNull(myDataResponse.getEtag());
Assert.assertNotNull(myDataResponse.getKey());
Assert.assertNotNull(myDataResponse.getValue());
Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
String firstETag = myDataResponse.getEtag();
//change the data in order to update the state
data.setPropertyA("data in property A2");
data.setPropertyB("data in property B2");
//Create deferred action to update the data using the incorrect etag
saveResponse = daprClient.saveState(stateKey, "99999999999999", data, null);
saveResponse.block();
response = daprClient.getState(new StateKeyValue<MyData>(null, stateKey, null, null), null, MyData.class);
//retrive the data wihout any etag
myDataResponse = response.block();
//review that state value changes
Assert.assertNotNull(myDataResponse.getEtag());
//review that the etag changes after an update
Assert.assertNotEquals(firstETag,myDataResponse.getEtag());
Assert.assertNotNull(myDataResponse.getKey());
Assert.assertNotNull(myDataResponse.getValue());
Assert.assertEquals("data in property A2", myDataResponse.getValue().getPropertyA());
Assert.assertEquals("data in property B2", myDataResponse.getValue().getPropertyB());
}
@Test
public void saveAndDeleteStateWithEtag() {
final String stateKey = "myeKeyToBeDeletedWithEtag";
//create DAPR client
DaprClient daprClient = new DaprClientBuilder().build();
//Create dummy data to be store
MyData data = new MyData();
data.setPropertyA("data in property A");
data.setPropertyB("data in property B");
//Create deferred action to save the sate
Mono<Void> saveResponse = daprClient.saveState(stateKey, null, data, null);
//execute the save state action
saveResponse.block();
//Create deferred action to get the state with the etag
Mono<StateKeyValue<MyData>> response = daprClient.getState(new StateKeyValue<MyData>(null, stateKey, null, null), null, MyData.class);
//execute the get state
StateKeyValue<MyData> myDataResponse = response.block();
Assert.assertNotNull(myDataResponse.getEtag());
Assert.assertNotNull(myDataResponse.getKey());
Assert.assertNotNull(myDataResponse.getValue());
Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
//Create deferred action to delete an state sending the etag
Mono<Void> deleteResponse = daprClient.deleteState(new StateKeyValue<MyData>(null, stateKey, myDataResponse.getEtag(), null), null);
//execute the delete of the state
deleteResponse.block();
//Create deferred action to get the sate without an etag
response = daprClient.getState(new StateKeyValue(null, stateKey, null, null), null, MyData.class);
myDataResponse = response.block();
//Review that the response is null, because the state was deleted
Assert.assertNull(myDataResponse.getValue());
}
@Test(expected = RuntimeException.class)
public void saveAndDeleteStateWithWrongEtag() {
final String stateKey = "myeKeyToBeDeletedWithWrongEtag";
//create DAPR client
DaprClient daprClient = new DaprClientBuilder().build();
//Create dummy data to be store
MyData data = new MyData();
data.setPropertyA("data in property A");
data.setPropertyB("data in property B");
//Create deferred action to save the sate
Mono<Void> saveResponse = daprClient.saveState(stateKey, null, data, null);
//execute the save state action
saveResponse.block();
//Create deferred action to get the state with the etag
Mono<StateKeyValue<MyData>> response = daprClient.getState(new StateKeyValue<MyData>(null, stateKey, null, null), null, MyData.class);
//execute the get state
StateKeyValue<MyData> myDataResponse = response.block();
Assert.assertNotNull(myDataResponse.getEtag());
Assert.assertNotNull(myDataResponse.getKey());
Assert.assertNotNull(myDataResponse.getValue());
Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
//Create deferred action to delete an state sending the incorrect etag
Mono<Void> deleteResponse = daprClient.deleteState(new StateKeyValue<MyData>(null, stateKey, "99999999999", null), null);
//execute the delete of the state, this should trhow an exception
deleteResponse.block();
//Create deferred action to get the sate without an etag
response = daprClient.getState(new StateKeyValue(null, stateKey, null, null), null, MyData.class);
myDataResponse = response.block();
//Review that the response is null, because the state was deleted
Assert.assertNull(myDataResponse.getValue());
}
@Test(expected = RuntimeException.class)
public void saveUpdateAndGetStateWithEtagAndStateOptionsFirstWrite() {
final String stateKey = "keyToBeUpdatedWithEtagAndOptions";
//create option with concurrency with first writte and consistency of strong
StateOptions stateOptions = new StateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE, null);
//create dapr client
DaprClient daprClient = new DaprClientBuilder().build();
//create Dummy data
MyData data = new MyData();
data.setPropertyA("data in property A");
data.setPropertyB("data in property B");
//create state using stateOptions
Mono<Void> saveResponse = daprClient.saveState(stateKey, null, data, stateOptions);
//execute the save state
saveResponse.block();
//crate deferred action to retrieve the state
Mono<StateKeyValue<MyData>> response = daprClient.getState(new StateKeyValue(null, stateKey, null, stateOptions), stateOptions, MyData.class);
//execute the retrieve of the state using options
StateKeyValue<MyData> myDataResponse = response.block();
Assert.assertNotNull(myDataResponse.getEtag());
Assert.assertNotNull(myDataResponse.getKey());
Assert.assertNotNull(myDataResponse.getValue());
Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
//change data to be udpated
data.setPropertyA("data in property A2");
data.setPropertyB("data in property B2");
//create deferred action to update the action with options
saveResponse = daprClient.saveState(stateKey, myDataResponse.getEtag(), data, stateOptions);
//update the state
saveResponse.block();
data.setPropertyA("last write");
data.setPropertyB("data in property B2");
//create deferred action to update the action with the same etag
saveResponse = daprClient.saveState(stateKey, myDataResponse.getEtag(), data, stateOptions);
//throws an exception, the state was already udpated
saveResponse.block();
response = daprClient.getState(new StateKeyValue(null, stateKey, null, null), stateOptions, MyData.class);
StateKeyValue<MyData> myLastDataResponse = response.block();
Assert.assertNotNull(myLastDataResponse.getEtag());
Assert.assertNotNull(myLastDataResponse.getKey());
Assert.assertNotNull(myLastDataResponse.getValue());
Assert.assertNotNull(myDataResponse.getEtag(), myLastDataResponse.getEtag());
Assert.assertEquals("data in property A2", myLastDataResponse.getValue().getPropertyA());
Assert.assertEquals("data in property B2", myLastDataResponse.getValue().getPropertyB());
}
@Test()
public void saveUpdateAndGetStateWithEtagAndStateOptionsLastWrite() {
final String stateKey = "keyToBeUpdatedWithEtagAndOptions";
//create option with concurrency with first writte and consistency of strong
StateOptions stateOptions = new StateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.LAST_WRITE, null);
//create dapr client
DaprClient daprClient = new DaprClientBuilder().build();
//create Dummy data
MyData data = new MyData();
data.setPropertyA("data in property A");
data.setPropertyB("data in property B");
//create state using stateOptions
Mono<Void> saveResponse = daprClient.saveState(stateKey, null, data, stateOptions);
//execute the save state
saveResponse.block();
//crate deferred action to retrieve the state
Mono<StateKeyValue<MyData>> response = daprClient.getState(new StateKeyValue(null, stateKey, null, stateOptions), stateOptions, MyData.class);
//execute the retrieve of the state using options
StateKeyValue<MyData> myDataResponse = response.block();
Assert.assertNotNull(myDataResponse.getEtag());
Assert.assertNotNull(myDataResponse.getKey());
Assert.assertNotNull(myDataResponse.getValue());
Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
//change data to be udpated
data.setPropertyA("data in property A2");
data.setPropertyB("data in property B2");
//create deferred action to update the action with options
saveResponse = daprClient.saveState(stateKey, myDataResponse.getEtag(), data, stateOptions);
//update the state
saveResponse.block();
data.setPropertyA("last write");
data.setPropertyB("data in property B2");
//create deferred action to update the action with the same etag
saveResponse = daprClient.saveState(stateKey, myDataResponse.getEtag(), data, stateOptions);
//update the state without an error
saveResponse.block();
response = daprClient.getState(new StateKeyValue(null, stateKey, null, null), stateOptions, MyData.class);
StateKeyValue<MyData> myLastDataResponse = response.block();
Assert.assertNotNull(myLastDataResponse.getEtag());
Assert.assertNotNull(myLastDataResponse.getKey());
Assert.assertNotNull(myLastDataResponse.getValue());
Assert.assertNotNull(myDataResponse.getEtag(), myLastDataResponse.getEtag());
Assert.assertEquals("last write", myLastDataResponse.getValue().getPropertyA());
Assert.assertEquals("data in property B2", myLastDataResponse.getValue().getPropertyB());
}
}