mirror of https://github.com/dapr/java-sdk.git
Removing RetryPolicy for state API. (#315)
This commit is contained in:
parent
14fded8d18
commit
56927daedd
|
@ -357,7 +357,7 @@ public class GRPCStateClientIT extends BaseIT {
|
||||||
|
|
||||||
//create option with concurrency with first writte and consistency of strong
|
//create option with concurrency with first writte and consistency of strong
|
||||||
StateOptions stateOptions = new StateOptions(StateOptions.Consistency.STRONG,
|
StateOptions stateOptions = new StateOptions(StateOptions.Consistency.STRONG,
|
||||||
StateOptions.Concurrency.FIRST_WRITE, null);
|
StateOptions.Concurrency.FIRST_WRITE);
|
||||||
|
|
||||||
|
|
||||||
//create Dummy data
|
//create Dummy data
|
||||||
|
@ -415,8 +415,7 @@ public class GRPCStateClientIT extends BaseIT {
|
||||||
final String stateKey = "keyToBeUpdatedWithEtagAndOptions";
|
final String stateKey = "keyToBeUpdatedWithEtagAndOptions";
|
||||||
|
|
||||||
//create option with concurrency with first writte and consistency of strong
|
//create option with concurrency with first writte and consistency of strong
|
||||||
StateOptions stateOptions = new StateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.LAST_WRITE
|
StateOptions stateOptions = new StateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.LAST_WRITE);
|
||||||
, null);
|
|
||||||
|
|
||||||
|
|
||||||
//create Dummy data
|
//create Dummy data
|
||||||
|
@ -469,100 +468,4 @@ public class GRPCStateClientIT extends BaseIT {
|
||||||
assertEquals("data in property B2", myLastDataResponse.getValue().getPropertyB());
|
assertEquals("data in property B2", myLastDataResponse.getValue().getPropertyB());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 13000)
|
|
||||||
public void saveDeleteWithRetry() {
|
|
||||||
final String stateKey = "keyToBeDeleteWithWrongEtagAndRetry";
|
|
||||||
StateOptions.RetryPolicy retryPolicy = new StateOptions.RetryPolicy(Duration.ofSeconds(3), 3,
|
|
||||||
StateOptions.RetryPolicy.Pattern.LINEAR);
|
|
||||||
StateOptions stateOptions = new StateOptions(null, null, retryPolicy);
|
|
||||||
|
|
||||||
|
|
||||||
//Create dummy data to be store
|
|
||||||
MyData data = new MyData();
|
|
||||||
data.setPropertyA("data in property A");
|
|
||||||
data.setPropertyB("data in property B");
|
|
||||||
|
|
||||||
//Create deferred action to save the sate
|
|
||||||
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
|
|
||||||
//execute the save state action
|
|
||||||
saveResponse.block();
|
|
||||||
|
|
||||||
//Create deferred action to retrieve the state
|
|
||||||
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), MyData.class);
|
|
||||||
//execute the action for retrieve the state and the etag
|
|
||||||
State<MyData> myDataResponse = response.block();
|
|
||||||
|
|
||||||
//review that the etag is not empty
|
|
||||||
assertNotNull(myDataResponse.getEtag());
|
|
||||||
assertNotNull(myDataResponse.getKey());
|
|
||||||
assertNotNull(myDataResponse.getValue());
|
|
||||||
assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
|
|
||||||
assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
|
|
||||||
|
|
||||||
|
|
||||||
Mono<Void> deleteResponse = daprClient.deleteState(STATE_STORE_NAME, stateKey, "99999999", stateOptions);
|
|
||||||
|
|
||||||
long start = System.currentTimeMillis();
|
|
||||||
try {
|
|
||||||
//delete action
|
|
||||||
deleteResponse.block();
|
|
||||||
} catch (RuntimeException ex) {
|
|
||||||
assertTrue(ex.getMessage().contains("failed to set value after 3 retries"));
|
|
||||||
}
|
|
||||||
long end = System.currentTimeMillis();
|
|
||||||
System.out.println("DEBUG: Logic A took " + (end - start) + " MilliSeconds");
|
|
||||||
long elapsedTime = end - start;
|
|
||||||
assertTrue(elapsedTime > 9000 && elapsedTime < 9200);
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Ignore("Ignored as an issue on DAPR")
|
|
||||||
@Test(timeout = 13000)
|
|
||||||
public void saveUpdateWithRetry() {
|
|
||||||
final String stateKey = "keyToBeDeleteWithWrongEtagAndRetry";
|
|
||||||
StateOptions.RetryPolicy retryPolicy = new StateOptions.RetryPolicy(Duration.ofSeconds(4), 3,
|
|
||||||
StateOptions.RetryPolicy.Pattern.LINEAR);
|
|
||||||
StateOptions stateOptions = new StateOptions(null, null, retryPolicy);
|
|
||||||
|
|
||||||
|
|
||||||
//Create dummy data to be store
|
|
||||||
MyData data = new MyData();
|
|
||||||
data.setPropertyA("data in property A");
|
|
||||||
data.setPropertyB("data in property B");
|
|
||||||
|
|
||||||
//Create deferred action to save the sate
|
|
||||||
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
|
|
||||||
//execute the save state action
|
|
||||||
saveResponse.block();
|
|
||||||
|
|
||||||
//Create deferred action to retrieve the state
|
|
||||||
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), MyData.class);
|
|
||||||
//execute the action for retrieve the state and the etag
|
|
||||||
State<MyData> myDataResponse = response.block();
|
|
||||||
|
|
||||||
//review that the etag is not empty
|
|
||||||
assertNotNull(myDataResponse.getEtag());
|
|
||||||
assertNotNull(myDataResponse.getKey());
|
|
||||||
assertNotNull(myDataResponse.getValue());
|
|
||||||
assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
|
|
||||||
assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
|
|
||||||
|
|
||||||
//Create deferred action to save the sate
|
|
||||||
saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, "9999999", data, stateOptions);
|
|
||||||
//execute the save state action
|
|
||||||
long start = System.currentTimeMillis();
|
|
||||||
|
|
||||||
|
|
||||||
try {
|
|
||||||
saveResponse.block();
|
|
||||||
} catch (RuntimeException ex) {
|
|
||||||
assertTrue(ex.getMessage().contains("failed to set value after 3 retries"));
|
|
||||||
}
|
|
||||||
long end = System.currentTimeMillis();
|
|
||||||
System.out.println("DEBUG: Logic A took " + (end - start) + " MilliSeconds");
|
|
||||||
long elapsedTime = end - start;
|
|
||||||
assertTrue(elapsedTime > 9000 && elapsedTime < 9200);
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -345,7 +345,7 @@ public class HttpStateClientIT extends BaseIT {
|
||||||
|
|
||||||
//create option with concurrency with first writte and consistency of strong
|
//create option with concurrency with first writte and consistency of strong
|
||||||
StateOptions stateOptions = new StateOptions(StateOptions.Consistency.STRONG,
|
StateOptions stateOptions = new StateOptions(StateOptions.Consistency.STRONG,
|
||||||
StateOptions.Concurrency.FIRST_WRITE, null);
|
StateOptions.Concurrency.FIRST_WRITE);
|
||||||
|
|
||||||
//create dapr client
|
//create dapr client
|
||||||
DaprClient daprClient = buildDaprClient();
|
DaprClient daprClient = buildDaprClient();
|
||||||
|
@ -404,8 +404,7 @@ public class HttpStateClientIT extends BaseIT {
|
||||||
final String stateKey = "keyToBeUpdatedWithEtagAndOptions";
|
final String stateKey = "keyToBeUpdatedWithEtagAndOptions";
|
||||||
|
|
||||||
//create option with concurrency with first writte and consistency of strong
|
//create option with concurrency with first writte and consistency of strong
|
||||||
StateOptions stateOptions = new StateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.LAST_WRITE
|
StateOptions stateOptions = new StateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.LAST_WRITE);
|
||||||
, null);
|
|
||||||
|
|
||||||
//create dapr client
|
//create dapr client
|
||||||
DaprClient daprClient = buildDaprClient();
|
DaprClient daprClient = buildDaprClient();
|
||||||
|
@ -459,104 +458,6 @@ public class HttpStateClientIT extends BaseIT {
|
||||||
Assert.assertEquals("data in property B2", myLastDataResponse.getValue().getPropertyB());
|
Assert.assertEquals("data in property B2", myLastDataResponse.getValue().getPropertyB());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 13000)
|
|
||||||
public void saveDeleteWithRetry() {
|
|
||||||
final String stateKey = "keyToBeDeleteWithWrongEtagAndRetry";
|
|
||||||
StateOptions.RetryPolicy retryPolicy = new StateOptions.RetryPolicy(Duration.ofSeconds(3), 3,
|
|
||||||
StateOptions.RetryPolicy.Pattern.LINEAR);
|
|
||||||
StateOptions stateOptions = new StateOptions(null, null, retryPolicy);
|
|
||||||
|
|
||||||
//create DAPR client
|
|
||||||
DaprClient daprClient = buildDaprClient();
|
|
||||||
//Create dummy data to be store
|
|
||||||
MyData data = new MyData();
|
|
||||||
data.setPropertyA("data in property A");
|
|
||||||
data.setPropertyB("data in property B");
|
|
||||||
|
|
||||||
//Create deferred action to save the sate
|
|
||||||
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
|
|
||||||
//execute the save state action
|
|
||||||
saveResponse.block();
|
|
||||||
|
|
||||||
//Create deferred action to retrieve the state
|
|
||||||
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), MyData.class);
|
|
||||||
//execute the action for retrieve the state and the etag
|
|
||||||
State<MyData> myDataResponse = response.block();
|
|
||||||
|
|
||||||
//review that the etag is not empty
|
|
||||||
Assert.assertNotNull(myDataResponse.getEtag());
|
|
||||||
Assert.assertNotNull(myDataResponse.getKey());
|
|
||||||
Assert.assertNotNull(myDataResponse.getValue());
|
|
||||||
Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
|
|
||||||
Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
|
|
||||||
|
|
||||||
|
|
||||||
Mono<Void> deleteResponse = daprClient.deleteState(STATE_STORE_NAME, stateKey, "99999999", stateOptions);
|
|
||||||
|
|
||||||
long start = System.currentTimeMillis();
|
|
||||||
try {
|
|
||||||
//delete action
|
|
||||||
deleteResponse.block();
|
|
||||||
} catch (RuntimeException ex) {
|
|
||||||
Assert.assertTrue(ex.getMessage().contains("failed to set value after 3 retries"));
|
|
||||||
}
|
|
||||||
long end = System.currentTimeMillis();
|
|
||||||
System.out.println("DEBUG: Logic A took " + (end - start) + " MilliSeconds");
|
|
||||||
long elapsedTime = end - start;
|
|
||||||
Assert.assertTrue(elapsedTime > 9000 && elapsedTime < 9200);
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Ignore("Ignored as an issue on DAPR")
|
|
||||||
@Test(timeout = 13000)
|
|
||||||
public void saveUpdateWithRetry() {
|
|
||||||
final String stateKey = "keyToBeDeleteWithWrongEtagAndRetry";
|
|
||||||
StateOptions.RetryPolicy retryPolicy = new StateOptions.RetryPolicy(Duration.ofSeconds(4), 3,
|
|
||||||
StateOptions.RetryPolicy.Pattern.EXPONENTIAL);
|
|
||||||
StateOptions stateOptions = new StateOptions(null, null, retryPolicy);
|
|
||||||
|
|
||||||
//create DAPR client
|
|
||||||
DaprClient daprClient = buildDaprClient();
|
|
||||||
//Create dummy data to be store
|
|
||||||
MyData data = new MyData();
|
|
||||||
data.setPropertyA("data in property A");
|
|
||||||
data.setPropertyB("data in property B");
|
|
||||||
|
|
||||||
//Create deferred action to save the sate
|
|
||||||
Mono<Void> saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, null, data, null);
|
|
||||||
//execute the save state action
|
|
||||||
saveResponse.block();
|
|
||||||
|
|
||||||
//Create deferred action to retrieve the state
|
|
||||||
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey, null, null), MyData.class);
|
|
||||||
//execute the action for retrieve the state and the etag
|
|
||||||
State<MyData> myDataResponse = response.block();
|
|
||||||
|
|
||||||
//review that the etag is not empty
|
|
||||||
Assert.assertNotNull(myDataResponse.getEtag());
|
|
||||||
Assert.assertNotNull(myDataResponse.getKey());
|
|
||||||
Assert.assertNotNull(myDataResponse.getValue());
|
|
||||||
Assert.assertEquals("data in property A", myDataResponse.getValue().getPropertyA());
|
|
||||||
Assert.assertEquals("data in property B", myDataResponse.getValue().getPropertyB());
|
|
||||||
|
|
||||||
//Create deferred action to save the sate
|
|
||||||
saveResponse = daprClient.saveState(STATE_STORE_NAME, stateKey, "9999999", data, stateOptions);
|
|
||||||
//execute the save state action
|
|
||||||
long start = System.currentTimeMillis();
|
|
||||||
|
|
||||||
|
|
||||||
try {
|
|
||||||
saveResponse.block();
|
|
||||||
} catch (RuntimeException ex) {
|
|
||||||
Assert.assertTrue(ex.getMessage().contains("failed to set value after 3 retries"));
|
|
||||||
}
|
|
||||||
long end = System.currentTimeMillis();
|
|
||||||
System.out.println("DEBUG: Logic A took " + (end - start) + " MilliSeconds");
|
|
||||||
long elapsedTime = end - start;
|
|
||||||
Assert.assertTrue(elapsedTime > 9000 && elapsedTime < 9200);
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
private static DaprClient buildDaprClient() {
|
private static DaprClient buildDaprClient() {
|
||||||
return new DaprClientBuilder().build();
|
return new DaprClientBuilder().build();
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,7 +7,6 @@ package io.dapr.client;
|
||||||
|
|
||||||
import io.dapr.client.domain.State;
|
import io.dapr.client.domain.State;
|
||||||
import io.dapr.client.domain.StateOptions;
|
import io.dapr.client.domain.StateOptions;
|
||||||
import io.dapr.client.domain.Verb;
|
|
||||||
import io.dapr.utils.TypeRef;
|
import io.dapr.utils.TypeRef;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
|
|
||||||
|
|
|
@ -8,7 +8,6 @@ package io.dapr.client;
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
import com.google.protobuf.Any;
|
import com.google.protobuf.Any;
|
||||||
import com.google.protobuf.ByteString;
|
import com.google.protobuf.ByteString;
|
||||||
import com.google.protobuf.Duration;
|
|
||||||
import com.google.protobuf.Empty;
|
import com.google.protobuf.Empty;
|
||||||
import io.dapr.client.domain.State;
|
import io.dapr.client.domain.State;
|
||||||
import io.dapr.client.domain.StateOptions;
|
import io.dapr.client.domain.StateOptions;
|
||||||
|
@ -25,8 +24,6 @@ import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import static io.dapr.client.domain.StateOptions.RetryPolicy;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An adapter for the GRPC Client.
|
* An adapter for the GRPC Client.
|
||||||
*
|
*
|
||||||
|
@ -99,17 +96,6 @@ public class DaprClientGrpc implements DaprClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private CommonProtos.StateRetryPolicy.RetryPattern getGrpcStateRetryPolicy(RetryPolicy policy) {
|
|
||||||
switch (policy.getPattern()) {
|
|
||||||
case LINEAR:
|
|
||||||
return CommonProtos.StateRetryPolicy.RetryPattern.RETRY_LINEAR;
|
|
||||||
case EXPONENTIAL:
|
|
||||||
return CommonProtos.StateRetryPolicy.RetryPattern.RETRY_EXPONENTIAL;
|
|
||||||
default:
|
|
||||||
throw new IllegalArgumentException("Missing RetryPattern mapping to gRPC retry pattern enum");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* {@inheritDoc}
|
* {@inheritDoc}
|
||||||
*/
|
*/
|
||||||
|
@ -447,24 +433,6 @@ public class DaprClientGrpc implements DaprClient {
|
||||||
CommonProtos.StateOptions.Builder optionBuilder = null;
|
CommonProtos.StateOptions.Builder optionBuilder = null;
|
||||||
if (state.getOptions() != null) {
|
if (state.getOptions() != null) {
|
||||||
StateOptions options = state.getOptions();
|
StateOptions options = state.getOptions();
|
||||||
CommonProtos.StateRetryPolicy.Builder retryPolicyBuilder = null;
|
|
||||||
if (options.getRetryPolicy() != null) {
|
|
||||||
retryPolicyBuilder = CommonProtos.StateRetryPolicy.newBuilder();
|
|
||||||
RetryPolicy retryPolicy = options.getRetryPolicy();
|
|
||||||
if (options.getRetryPolicy().getInterval() != null) {
|
|
||||||
Duration.Builder durationBuilder = Duration.newBuilder()
|
|
||||||
.setNanos(retryPolicy.getInterval().getNano())
|
|
||||||
.setSeconds(retryPolicy.getInterval().getSeconds());
|
|
||||||
retryPolicyBuilder.setInterval(durationBuilder.build());
|
|
||||||
}
|
|
||||||
if (retryPolicy.getThreshold() != null) {
|
|
||||||
retryPolicyBuilder.setThreshold(retryPolicy.getThreshold());
|
|
||||||
}
|
|
||||||
if (retryPolicy.getPattern() != null) {
|
|
||||||
retryPolicyBuilder.setPattern(getGrpcStateRetryPolicy(retryPolicy));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
optionBuilder = CommonProtos.StateOptions.newBuilder();
|
optionBuilder = CommonProtos.StateOptions.newBuilder();
|
||||||
if (options.getConcurrency() != null) {
|
if (options.getConcurrency() != null) {
|
||||||
optionBuilder.setConcurrency(getGrpcStateConcurrency(options));
|
optionBuilder.setConcurrency(getGrpcStateConcurrency(options));
|
||||||
|
@ -472,9 +440,6 @@ public class DaprClientGrpc implements DaprClient {
|
||||||
if (options.getConsistency() != null) {
|
if (options.getConsistency() != null) {
|
||||||
optionBuilder.setConsistency(getGrpcStateConsistency(options));
|
optionBuilder.setConsistency(getGrpcStateConsistency(options));
|
||||||
}
|
}
|
||||||
if (retryPolicyBuilder != null) {
|
|
||||||
optionBuilder.setRetryPolicy(retryPolicyBuilder.build());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if (optionBuilder != null) {
|
if (optionBuilder != null) {
|
||||||
stateBuilder.setOptions(optionBuilder.build());
|
stateBuilder.setOptions(optionBuilder.build());
|
||||||
|
@ -523,23 +488,6 @@ public class DaprClientGrpc implements DaprClient {
|
||||||
CommonProtos.StateOptions.Builder optionBuilder = null;
|
CommonProtos.StateOptions.Builder optionBuilder = null;
|
||||||
if (options != null) {
|
if (options != null) {
|
||||||
optionBuilder = CommonProtos.StateOptions.newBuilder();
|
optionBuilder = CommonProtos.StateOptions.newBuilder();
|
||||||
CommonProtos.StateRetryPolicy.Builder retryPolicyBuilder = null;
|
|
||||||
if (options.getRetryPolicy() != null) {
|
|
||||||
retryPolicyBuilder = CommonProtos.StateRetryPolicy.newBuilder();
|
|
||||||
RetryPolicy retryPolicy = options.getRetryPolicy();
|
|
||||||
if (options.getRetryPolicy().getInterval() != null) {
|
|
||||||
Duration.Builder durationBuilder = Duration.newBuilder()
|
|
||||||
.setNanos(retryPolicy.getInterval().getNano())
|
|
||||||
.setSeconds(retryPolicy.getInterval().getSeconds());
|
|
||||||
retryPolicyBuilder.setInterval(durationBuilder.build());
|
|
||||||
}
|
|
||||||
if (retryPolicy.getThreshold() != null) {
|
|
||||||
retryPolicyBuilder.setThreshold(retryPolicy.getThreshold());
|
|
||||||
}
|
|
||||||
if (retryPolicy.getPattern() != null) {
|
|
||||||
retryPolicyBuilder.setPattern(getGrpcStateRetryPolicy(retryPolicy));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
optionBuilder = CommonProtos.StateOptions.newBuilder();
|
optionBuilder = CommonProtos.StateOptions.newBuilder();
|
||||||
if (options.getConcurrency() != null) {
|
if (options.getConcurrency() != null) {
|
||||||
|
@ -548,9 +496,6 @@ public class DaprClientGrpc implements DaprClient {
|
||||||
if (options.getConsistency() != null) {
|
if (options.getConsistency() != null) {
|
||||||
optionBuilder.setConsistency(getGrpcStateConsistency(options));
|
optionBuilder.setConsistency(getGrpcStateConsistency(options));
|
||||||
}
|
}
|
||||||
if (retryPolicyBuilder != null) {
|
|
||||||
optionBuilder.setRetryPolicy(retryPolicyBuilder.build());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
DaprProtos.DeleteStateRequest.Builder builder = DaprProtos.DeleteStateRequest.newBuilder()
|
DaprProtos.DeleteStateRequest.Builder builder = DaprProtos.DeleteStateRequest.newBuilder()
|
||||||
.setStoreName(stateStoreName)
|
.setStoreName(stateStoreName)
|
||||||
|
|
|
@ -7,7 +7,6 @@ package io.dapr.client;
|
||||||
|
|
||||||
import io.dapr.client.domain.State;
|
import io.dapr.client.domain.State;
|
||||||
import io.dapr.client.domain.StateOptions;
|
import io.dapr.client.domain.StateOptions;
|
||||||
import io.dapr.client.domain.Verb;
|
|
||||||
import io.dapr.serializer.DaprObjectSerializer;
|
import io.dapr.serializer.DaprObjectSerializer;
|
||||||
import io.dapr.serializer.DefaultObjectSerializer;
|
import io.dapr.serializer.DefaultObjectSerializer;
|
||||||
import io.dapr.utils.Constants;
|
import io.dapr.utils.Constants;
|
||||||
|
|
|
@ -29,18 +29,15 @@ import java.util.Optional;
|
||||||
public class StateOptions {
|
public class StateOptions {
|
||||||
private final Consistency consistency;
|
private final Consistency consistency;
|
||||||
private final Concurrency concurrency;
|
private final Concurrency concurrency;
|
||||||
private final RetryPolicy retryPolicy;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Represents options for a Dapr state API call.
|
* Represents options for a Dapr state API call.
|
||||||
* @param consistency The consistency mode.
|
* @param consistency The consistency mode.
|
||||||
* @param concurrency The concurrency mode.
|
* @param concurrency The concurrency mode.
|
||||||
* @param retryPolicy The retry policy.
|
|
||||||
*/
|
*/
|
||||||
public StateOptions(Consistency consistency, Concurrency concurrency, RetryPolicy retryPolicy) {
|
public StateOptions(Consistency consistency, Concurrency concurrency) {
|
||||||
this.consistency = consistency;
|
this.consistency = consistency;
|
||||||
this.concurrency = concurrency;
|
this.concurrency = concurrency;
|
||||||
this.retryPolicy = retryPolicy;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public Concurrency getConcurrency() {
|
public Concurrency getConcurrency() {
|
||||||
|
@ -51,10 +48,6 @@ public class StateOptions {
|
||||||
return consistency;
|
return consistency;
|
||||||
}
|
}
|
||||||
|
|
||||||
public RetryPolicy getRetryPolicy() {
|
|
||||||
return retryPolicy;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns state options as a Map of option name to value.
|
* Returns state options as a Map of option name to value.
|
||||||
* @return A map of state options.
|
* @return A map of state options.
|
||||||
|
@ -70,17 +63,6 @@ public class StateOptions {
|
||||||
if (this.getConcurrency() != null) {
|
if (this.getConcurrency() != null) {
|
||||||
mapOptions.put("concurrency", this.getConcurrency().getValue());
|
mapOptions.put("concurrency", this.getConcurrency().getValue());
|
||||||
}
|
}
|
||||||
if (this.getRetryPolicy() != null) {
|
|
||||||
if (this.getRetryPolicy().getInterval() != null) {
|
|
||||||
mapOptions.put("retryInterval", String.valueOf(this.getRetryPolicy().getInterval().toMillis()));
|
|
||||||
}
|
|
||||||
if (this.getRetryPolicy().getThreshold() != null) {
|
|
||||||
mapOptions.put("retryThreshold", this.getRetryPolicy().getThreshold().toString());
|
|
||||||
}
|
|
||||||
if (this.getRetryPolicy().getPattern() != null) {
|
|
||||||
mapOptions.put("retryPattern", this.getRetryPolicy().getPattern().getValue());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return Collections.unmodifiableMap(Optional.ofNullable(mapOptions).orElse(Collections.EMPTY_MAP));
|
return Collections.unmodifiableMap(Optional.ofNullable(mapOptions).orElse(Collections.EMPTY_MAP));
|
||||||
}
|
}
|
||||||
|
@ -127,60 +109,6 @@ public class StateOptions {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class RetryPolicy {
|
|
||||||
public enum Pattern {
|
|
||||||
LINEAR("linear"),
|
|
||||||
EXPONENTIAL("exponential");
|
|
||||||
|
|
||||||
private String value;
|
|
||||||
|
|
||||||
Pattern(String value) {
|
|
||||||
this.value = value;
|
|
||||||
}
|
|
||||||
|
|
||||||
@JsonValue
|
|
||||||
public String getValue() {
|
|
||||||
return this.value;
|
|
||||||
}
|
|
||||||
|
|
||||||
@JsonCreator
|
|
||||||
public static Pattern fromValue(String value) {
|
|
||||||
return Pattern.valueOf(value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@JsonSerialize(using = StateOptionDurationSerializer.class)
|
|
||||||
@JsonDeserialize(using = StateOptionDurationDeserializer.class)
|
|
||||||
private final Duration interval;
|
|
||||||
private final Integer threshold;
|
|
||||||
private final Pattern pattern;
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Represents retry policies on a state operation.
|
|
||||||
* @param interval The delay between retries.
|
|
||||||
* @param threshold The total number of retries.
|
|
||||||
* @param pattern The way to retry: linear or exponential.
|
|
||||||
*/
|
|
||||||
public RetryPolicy(Duration interval, Integer threshold, Pattern pattern) {
|
|
||||||
this.interval = interval;
|
|
||||||
this.threshold = threshold;
|
|
||||||
this.pattern = pattern;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Duration getInterval() {
|
|
||||||
return interval;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Integer getThreshold() {
|
|
||||||
return threshold;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Pattern getPattern() {
|
|
||||||
return pattern;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class StateOptionDurationSerializer extends StdSerializer<Duration> {
|
public static class StateOptionDurationSerializer extends StdSerializer<Duration> {
|
||||||
|
|
||||||
public StateOptionDurationSerializer() {
|
public StateOptionDurationSerializer() {
|
||||||
|
|
|
@ -1,16 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) Microsoft Corporation.
|
|
||||||
* Licensed under the MIT License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package io.dapr.client.domain;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Verbs used to invoke methods in Dapr.
|
|
||||||
*/
|
|
||||||
public enum Verb {
|
|
||||||
GET,
|
|
||||||
PUT,
|
|
||||||
POST,
|
|
||||||
DELETE
|
|
||||||
}
|
|
|
@ -639,8 +639,7 @@ public class DaprClientGrpcTest {
|
||||||
String etag = "ETag1";
|
String etag = "ETag1";
|
||||||
String key = "key1";
|
String key = "key1";
|
||||||
MyObject expectedValue = new MyObject(1, "The Value");
|
MyObject expectedValue = new MyObject(1, "The Value");
|
||||||
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE,
|
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE);
|
||||||
Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR);
|
|
||||||
State<MyObject> expectedState = buildStateKey(expectedValue, key, etag, options);
|
State<MyObject> expectedState = buildStateKey(expectedValue, key, etag, options);
|
||||||
DaprProtos.GetStateResponse responseEnvelope = DaprProtos.GetStateResponse.newBuilder()
|
DaprProtos.GetStateResponse responseEnvelope = DaprProtos.GetStateResponse.newBuilder()
|
||||||
.setData(getBytes(expectedValue))
|
.setData(getBytes(expectedValue))
|
||||||
|
@ -662,8 +661,7 @@ public class DaprClientGrpcTest {
|
||||||
String etag = "ETag1";
|
String etag = "ETag1";
|
||||||
String key = "key1";
|
String key = "key1";
|
||||||
MyObject expectedValue = new MyObject(1, "The Value");
|
MyObject expectedValue = new MyObject(1, "The Value");
|
||||||
StateOptions options = new StateOptions(null, StateOptions.Concurrency.FIRST_WRITE,
|
StateOptions options = new StateOptions(null, StateOptions.Concurrency.FIRST_WRITE);
|
||||||
new StateOptions.RetryPolicy(Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR));
|
|
||||||
State<MyObject> expectedState = buildStateKey(expectedValue, key, etag, options);
|
State<MyObject> expectedState = buildStateKey(expectedValue, key, etag, options);
|
||||||
DaprProtos.GetStateResponse responseEnvelope = DaprProtos.GetStateResponse.newBuilder()
|
DaprProtos.GetStateResponse responseEnvelope = DaprProtos.GetStateResponse.newBuilder()
|
||||||
.setData(getBytes(expectedValue))
|
.setData(getBytes(expectedValue))
|
||||||
|
@ -724,8 +722,7 @@ public class DaprClientGrpcTest {
|
||||||
public void deleteStateTest() {
|
public void deleteStateTest() {
|
||||||
String etag = "ETag1";
|
String etag = "ETag1";
|
||||||
String key = "key1";
|
String key = "key1";
|
||||||
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE,
|
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE);
|
||||||
Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR);
|
|
||||||
SettableFuture<Empty> settableFuture = SettableFuture.create();
|
SettableFuture<Empty> settableFuture = SettableFuture.create();
|
||||||
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
|
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
|
||||||
addCallback(settableFuture, callback, directExecutor());
|
addCallback(settableFuture, callback, directExecutor());
|
||||||
|
@ -743,8 +740,7 @@ public class DaprClientGrpcTest {
|
||||||
public void deleteStateTestNoHotMono() {
|
public void deleteStateTestNoHotMono() {
|
||||||
String etag = "ETag1";
|
String etag = "ETag1";
|
||||||
String key = "key1";
|
String key = "key1";
|
||||||
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE,
|
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE);
|
||||||
Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR);
|
|
||||||
SettableFuture<Empty> settableFuture = SettableFuture.create();
|
SettableFuture<Empty> settableFuture = SettableFuture.create();
|
||||||
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
|
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
|
||||||
addCallback(settableFuture, callback, directExecutor());
|
addCallback(settableFuture, callback, directExecutor());
|
||||||
|
@ -764,8 +760,7 @@ public class DaprClientGrpcTest {
|
||||||
public void deleteStateNoConsistencyTest() {
|
public void deleteStateNoConsistencyTest() {
|
||||||
String etag = "ETag1";
|
String etag = "ETag1";
|
||||||
String key = "key1";
|
String key = "key1";
|
||||||
StateOptions options = buildStateOptions(null, StateOptions.Concurrency.FIRST_WRITE,
|
StateOptions options = buildStateOptions(null, StateOptions.Concurrency.FIRST_WRITE);
|
||||||
Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR);
|
|
||||||
SettableFuture<Empty> settableFuture = SettableFuture.create();
|
SettableFuture<Empty> settableFuture = SettableFuture.create();
|
||||||
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
|
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
|
||||||
addCallback(settableFuture, callback, directExecutor());
|
addCallback(settableFuture, callback, directExecutor());
|
||||||
|
@ -783,84 +778,7 @@ public class DaprClientGrpcTest {
|
||||||
public void deleteStateNoConcurrencyTest() {
|
public void deleteStateNoConcurrencyTest() {
|
||||||
String etag = "ETag1";
|
String etag = "ETag1";
|
||||||
String key = "key1";
|
String key = "key1";
|
||||||
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, null,
|
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, null);
|
||||||
Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR);
|
|
||||||
SettableFuture<Empty> settableFuture = SettableFuture.create();
|
|
||||||
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
|
|
||||||
addCallback(settableFuture, callback, directExecutor());
|
|
||||||
when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateRequest.class)))
|
|
||||||
.thenReturn(settableFuture);
|
|
||||||
State<String> stateKey = buildStateKey(null, key, etag, options);
|
|
||||||
Mono<Void> result = adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(),
|
|
||||||
stateKey.getOptions());
|
|
||||||
settableFuture.set(Empty.newBuilder().build());
|
|
||||||
result.block();
|
|
||||||
assertTrue(callback.wasCalled);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void deleteStateNoRetryPolicyTest() {
|
|
||||||
String etag = "ETag1";
|
|
||||||
String key = "key1";
|
|
||||||
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE,
|
|
||||||
null, null, null);
|
|
||||||
SettableFuture<Empty> settableFuture = SettableFuture.create();
|
|
||||||
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
|
|
||||||
addCallback(settableFuture, callback, directExecutor());
|
|
||||||
when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateRequest.class)))
|
|
||||||
.thenReturn(settableFuture);
|
|
||||||
State<String> stateKey = buildStateKey(null, key, etag, options);
|
|
||||||
Mono<Void> result = adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(),
|
|
||||||
stateKey.getOptions());
|
|
||||||
settableFuture.set(Empty.newBuilder().build());
|
|
||||||
result.block();
|
|
||||||
assertTrue(callback.wasCalled);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void deleteStateRetryPolicyNoDurationTest() {
|
|
||||||
String etag = "ETag1";
|
|
||||||
String key = "key1";
|
|
||||||
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE,
|
|
||||||
null, 1, StateOptions.RetryPolicy.Pattern.LINEAR);
|
|
||||||
SettableFuture<Empty> settableFuture = SettableFuture.create();
|
|
||||||
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
|
|
||||||
addCallback(settableFuture, callback, directExecutor());
|
|
||||||
when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateRequest.class)))
|
|
||||||
.thenReturn(settableFuture);
|
|
||||||
State<String> stateKey = buildStateKey(null, key, etag, options);
|
|
||||||
Mono<Void> result = adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(),
|
|
||||||
stateKey.getOptions());
|
|
||||||
settableFuture.set(Empty.newBuilder().build());
|
|
||||||
result.block();
|
|
||||||
assertTrue(callback.wasCalled);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void deleteStateRetryPolicyNoThresholdTest() {
|
|
||||||
String etag = "ETag1";
|
|
||||||
String key = "key1";
|
|
||||||
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE,
|
|
||||||
Duration.ofDays(100), null, StateOptions.RetryPolicy.Pattern.LINEAR);
|
|
||||||
SettableFuture<Empty> settableFuture = SettableFuture.create();
|
|
||||||
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
|
|
||||||
addCallback(settableFuture, callback, directExecutor());
|
|
||||||
when(client.deleteState(any(io.dapr.v1.DaprProtos.DeleteStateRequest.class)))
|
|
||||||
.thenReturn(settableFuture);
|
|
||||||
State<String> stateKey = buildStateKey(null, key, etag, options);
|
|
||||||
Mono<Void> result = adapter.deleteState(STATE_STORE_NAME, stateKey.getKey(), stateKey.getEtag(),
|
|
||||||
stateKey.getOptions());
|
|
||||||
settableFuture.set(Empty.newBuilder().build());
|
|
||||||
result.block();
|
|
||||||
assertTrue(callback.wasCalled);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void deleteStateRetryPolicyNoPatternTest() {
|
|
||||||
String etag = "ETag1";
|
|
||||||
String key = "key1";
|
|
||||||
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE,
|
|
||||||
Duration.ofDays(100), 1, null);
|
|
||||||
SettableFuture<Empty> settableFuture = SettableFuture.create();
|
SettableFuture<Empty> settableFuture = SettableFuture.create();
|
||||||
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
|
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
|
||||||
addCallback(settableFuture, callback, directExecutor());
|
addCallback(settableFuture, callback, directExecutor());
|
||||||
|
@ -923,8 +841,7 @@ public class DaprClientGrpcTest {
|
||||||
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
|
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
|
||||||
addCallback(settableFuture, callback, directExecutor());
|
addCallback(settableFuture, callback, directExecutor());
|
||||||
when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenReturn(settableFuture);
|
when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenReturn(settableFuture);
|
||||||
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE,
|
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE);
|
||||||
Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR);
|
|
||||||
Mono<Void> result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options);
|
Mono<Void> result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options);
|
||||||
settableFuture.set(Empty.newBuilder().build());
|
settableFuture.set(Empty.newBuilder().build());
|
||||||
result.block();
|
result.block();
|
||||||
|
@ -943,8 +860,7 @@ public class DaprClientGrpcTest {
|
||||||
settableFuture.set(Empty.newBuilder().build());
|
settableFuture.set(Empty.newBuilder().build());
|
||||||
return settableFuture;
|
return settableFuture;
|
||||||
});
|
});
|
||||||
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE,
|
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE);
|
||||||
Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR);
|
|
||||||
Mono<Void> result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options);
|
Mono<Void> result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options);
|
||||||
// No call to result.block(), so nothing should happen.
|
// No call to result.block(), so nothing should happen.
|
||||||
assertFalse(callback.wasCalled);
|
assertFalse(callback.wasCalled);
|
||||||
|
@ -959,8 +875,7 @@ public class DaprClientGrpcTest {
|
||||||
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
|
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
|
||||||
addCallback(settableFuture, callback, directExecutor());
|
addCallback(settableFuture, callback, directExecutor());
|
||||||
when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenReturn(settableFuture);
|
when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenReturn(settableFuture);
|
||||||
StateOptions options = buildStateOptions(null, StateOptions.Concurrency.FIRST_WRITE,
|
StateOptions options = buildStateOptions(null, StateOptions.Concurrency.FIRST_WRITE);
|
||||||
Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR);
|
|
||||||
Mono<Void> result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options);
|
Mono<Void> result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options);
|
||||||
settableFuture.set(Empty.newBuilder().build());
|
settableFuture.set(Empty.newBuilder().build());
|
||||||
result.block();
|
result.block();
|
||||||
|
@ -976,8 +891,7 @@ public class DaprClientGrpcTest {
|
||||||
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
|
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
|
||||||
addCallback(settableFuture, callback, directExecutor());
|
addCallback(settableFuture, callback, directExecutor());
|
||||||
when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenReturn(settableFuture);
|
when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenReturn(settableFuture);
|
||||||
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, null,
|
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, null);
|
||||||
Duration.ofDays(100), 1, StateOptions.RetryPolicy.Pattern.LINEAR);
|
|
||||||
Mono<Void> result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options);
|
Mono<Void> result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options);
|
||||||
settableFuture.set(Empty.newBuilder().build());
|
settableFuture.set(Empty.newBuilder().build());
|
||||||
result.block();
|
result.block();
|
||||||
|
@ -993,59 +907,7 @@ public class DaprClientGrpcTest {
|
||||||
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
|
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
|
||||||
addCallback(settableFuture, callback, directExecutor());
|
addCallback(settableFuture, callback, directExecutor());
|
||||||
when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenReturn(settableFuture);
|
when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenReturn(settableFuture);
|
||||||
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE,
|
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE);
|
||||||
null, null, null);
|
|
||||||
Mono<Void> result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options);
|
|
||||||
settableFuture.set(Empty.newBuilder().build());
|
|
||||||
result.block();
|
|
||||||
assertTrue(callback.wasCalled);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void saveStateRetryPolicyNoDurationTest() {
|
|
||||||
String key = "key1";
|
|
||||||
String etag = "ETag1";
|
|
||||||
String value = "State value";
|
|
||||||
SettableFuture<Empty> settableFuture = SettableFuture.create();
|
|
||||||
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
|
|
||||||
addCallback(settableFuture, callback, directExecutor());
|
|
||||||
when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenReturn(settableFuture);
|
|
||||||
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE,
|
|
||||||
null, 1, StateOptions.RetryPolicy.Pattern.LINEAR);
|
|
||||||
Mono<Void> result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options);
|
|
||||||
settableFuture.set(Empty.newBuilder().build());
|
|
||||||
result.block();
|
|
||||||
assertTrue(callback.wasCalled);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void saveStateRetryPolicyNoThresholdTest() {
|
|
||||||
String key = "key1";
|
|
||||||
String etag = "ETag1";
|
|
||||||
String value = "State value";
|
|
||||||
SettableFuture<Empty> settableFuture = SettableFuture.create();
|
|
||||||
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
|
|
||||||
addCallback(settableFuture, callback, directExecutor());
|
|
||||||
when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenReturn(settableFuture);
|
|
||||||
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE,
|
|
||||||
Duration.ofDays(100), null, StateOptions.RetryPolicy.Pattern.LINEAR);
|
|
||||||
Mono<Void> result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options);
|
|
||||||
settableFuture.set(Empty.newBuilder().build());
|
|
||||||
result.block();
|
|
||||||
assertTrue(callback.wasCalled);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void saveStateRetryPolicyNoPatternTest() {
|
|
||||||
String key = "key1";
|
|
||||||
String etag = "ETag1";
|
|
||||||
String value = "State value";
|
|
||||||
SettableFuture<Empty> settableFuture = SettableFuture.create();
|
|
||||||
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
|
|
||||||
addCallback(settableFuture, callback, directExecutor());
|
|
||||||
when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenReturn(settableFuture);
|
|
||||||
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, StateOptions.Concurrency.FIRST_WRITE,
|
|
||||||
Duration.ofDays(100), 1, null);
|
|
||||||
Mono<Void> result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options);
|
Mono<Void> result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options);
|
||||||
settableFuture.set(Empty.newBuilder().build());
|
settableFuture.set(Empty.newBuilder().build());
|
||||||
result.block();
|
result.block();
|
||||||
|
@ -1221,8 +1083,7 @@ public class DaprClientGrpcTest {
|
||||||
when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenReturn(settableFuture);
|
when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenReturn(settableFuture);
|
||||||
settableFuture.set(Empty.newBuilder().build());
|
settableFuture.set(Empty.newBuilder().build());
|
||||||
for (StateOptions.Consistency consistency : StateOptions.Consistency.values()) {
|
for (StateOptions.Consistency consistency : StateOptions.Consistency.values()) {
|
||||||
StateOptions options = buildStateOptions(consistency, StateOptions.Concurrency.FIRST_WRITE,
|
StateOptions options = buildStateOptions(consistency, StateOptions.Concurrency.FIRST_WRITE);
|
||||||
Duration.ofDays(100), null, StateOptions.RetryPolicy.Pattern.LINEAR);
|
|
||||||
Mono<Void> result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options);
|
Mono<Void> result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options);
|
||||||
result.block();
|
result.block();
|
||||||
}
|
}
|
||||||
|
@ -1244,31 +1105,7 @@ public class DaprClientGrpcTest {
|
||||||
when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenReturn(settableFuture);
|
when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenReturn(settableFuture);
|
||||||
settableFuture.set(Empty.newBuilder().build());
|
settableFuture.set(Empty.newBuilder().build());
|
||||||
for (StateOptions.Concurrency concurrency : StateOptions.Concurrency.values()) {
|
for (StateOptions.Concurrency concurrency : StateOptions.Concurrency.values()) {
|
||||||
StateOptions options = buildStateOptions(StateOptions.Consistency.EVENTUAL, concurrency,
|
StateOptions options = buildStateOptions(StateOptions.Consistency.EVENTUAL, concurrency);
|
||||||
Duration.ofDays(100), null, StateOptions.RetryPolicy.Pattern.LINEAR);
|
|
||||||
Mono<Void> result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options);
|
|
||||||
result.block();
|
|
||||||
}
|
|
||||||
|
|
||||||
assertTrue(callback.wasCalled);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* If this test is failing, it means that a new value was added to StateOptions.RetryPolicy.Pattern
|
|
||||||
* enum, without creating a mapping to one of the proto defined gRPC enums
|
|
||||||
*/
|
|
||||||
@Test
|
|
||||||
public void stateOptionsRetryPatternValuesHaveValidGrpcEnumMappings() {
|
|
||||||
String key = "key1";
|
|
||||||
String etag = "ETag1";
|
|
||||||
String value = "State value";
|
|
||||||
SettableFuture<Empty> settableFuture = SettableFuture.create();
|
|
||||||
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
|
|
||||||
addCallback(settableFuture, callback, directExecutor());
|
|
||||||
when(client.saveState(any(io.dapr.v1.DaprProtos.SaveStateRequest.class))).thenReturn(settableFuture);
|
|
||||||
settableFuture.set(Empty.newBuilder().build());
|
|
||||||
for (StateOptions.RetryPolicy.Pattern retryPattern : StateOptions.RetryPolicy.Pattern.values()) {
|
|
||||||
StateOptions options = buildStateOptions(StateOptions.Consistency.EVENTUAL,
|
|
||||||
StateOptions.Concurrency.FIRST_WRITE, Duration.ofDays(100), null, retryPattern);
|
|
||||||
Mono<Void> result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options);
|
Mono<Void> result = adapter.saveState(STATE_STORE_NAME, key, etag, value, options);
|
||||||
result.block();
|
result.block();
|
||||||
}
|
}
|
||||||
|
@ -1303,17 +1140,10 @@ public class DaprClientGrpcTest {
|
||||||
return DaprProtos.GetSecretResponse.newBuilder().build();
|
return DaprProtos.GetSecretResponse.newBuilder().build();
|
||||||
}
|
}
|
||||||
|
|
||||||
private StateOptions buildStateOptions(StateOptions.Consistency consistency, StateOptions.Concurrency concurrency,
|
private StateOptions buildStateOptions(StateOptions.Consistency consistency, StateOptions.Concurrency concurrency) {
|
||||||
Duration interval, Integer threshold,
|
|
||||||
StateOptions.RetryPolicy.Pattern pattern) {
|
|
||||||
|
|
||||||
StateOptions.RetryPolicy retryPolicy = null;
|
|
||||||
if (interval != null || threshold != null || pattern != null) {
|
|
||||||
retryPolicy = new StateOptions.RetryPolicy(interval, threshold, pattern);
|
|
||||||
}
|
|
||||||
StateOptions options = null;
|
StateOptions options = null;
|
||||||
if (consistency != null || concurrency != null || retryPolicy != null) {
|
if (consistency != null || concurrency != null) {
|
||||||
options = new StateOptions(consistency, concurrency, retryPolicy);
|
options = new StateOptions(consistency, concurrency);
|
||||||
}
|
}
|
||||||
return options;
|
return options;
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,7 +9,6 @@ import com.fasterxml.jackson.core.JsonFactory;
|
||||||
import com.fasterxml.jackson.core.JsonGenerator;
|
import com.fasterxml.jackson.core.JsonGenerator;
|
||||||
import io.dapr.client.*;
|
import io.dapr.client.*;
|
||||||
import io.dapr.client.domain.CloudEvent;
|
import io.dapr.client.domain.CloudEvent;
|
||||||
import io.dapr.client.domain.Verb;
|
|
||||||
import io.dapr.serializer.DaprObjectSerializer;
|
import io.dapr.serializer.DaprObjectSerializer;
|
||||||
import io.dapr.serializer.DefaultObjectSerializer;
|
import io.dapr.serializer.DefaultObjectSerializer;
|
||||||
import io.dapr.utils.Constants;
|
import io.dapr.utils.Constants;
|
||||||
|
|
Loading…
Reference in New Issue