mirror of https://github.com/dapr/java-sdk.git
Initial transaction API changes (#337)
* Initial transaction API changes * Add more unit tests * Refactor IT
This commit is contained in:
parent
a9ee8e00ea
commit
f6b56bb376
|
|
@ -25,7 +25,7 @@ jobs:
|
|||
DAPR_RUNTIME_VER: 0.10.0-rc.0
|
||||
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/e7c9a643dfefbcfff0c2c26c12029259e6e81180/install/install.sh
|
||||
DAPR_CLI_REF: e7c9a643dfefbcfff0c2c26c12029259e6e81180
|
||||
DAPR_REF: 1e3b7aa3202e268759bce6cf9b30b6d95471ab8c
|
||||
DAPR_REF: afbe726a56a3f165d8a86681ae9e1f608047a7a9
|
||||
OSSRH_USER_TOKEN: ${{ secrets.OSSRH_USER_TOKEN }}
|
||||
OSSRH_PWD_TOKEN: ${{ secrets.OSSRH_PWD_TOKEN }}
|
||||
GPG_KEY: ${{ secrets.GPG_KEY }}
|
||||
|
|
|
|||
|
|
@ -6,17 +6,16 @@
|
|||
package io.dapr.it.state;
|
||||
|
||||
import io.dapr.client.DaprClient;
|
||||
import io.dapr.client.DaprClientBuilder;
|
||||
import io.dapr.client.domain.State;
|
||||
import io.dapr.client.domain.StateOptions;
|
||||
import io.dapr.client.domain.TransactionalStateOperation;
|
||||
import io.dapr.it.BaseIT;
|
||||
import io.dapr.it.DaprRun;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
|
|
@ -485,6 +484,113 @@ public abstract class AbstractStateClientIT extends BaseIT {
|
|||
Assert.assertEquals("data in property B2", myLastDataResponse.getValue().getPropertyB());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void saveVerifyAndDeleteTransactionalStateString() {
|
||||
|
||||
//create dapr client
|
||||
DaprClient daprClient = buildDaprClient();
|
||||
//The key use to store the state
|
||||
final String stateKey = "myTKey";
|
||||
|
||||
//creation of a dummy data
|
||||
String data = "my state 3";
|
||||
|
||||
TransactionalStateOperation<String> operation = createTransactionalStateOperation(
|
||||
TransactionalStateOperation.OperationType.UPSERT,
|
||||
createState(stateKey, null, null, data));
|
||||
|
||||
//create of the deferred call to DAPR to execute the transaction
|
||||
Mono<Void> saveResponse = daprClient.executeTransaction(STATE_STORE_NAME, Collections.singletonList(operation));
|
||||
//execute the save action
|
||||
saveResponse.block();
|
||||
|
||||
//create of the deferred call to DAPR to get the state
|
||||
Mono<State<String>> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey), String.class);
|
||||
|
||||
//retrieve the state
|
||||
State<String> myDataResponse = response.block();
|
||||
|
||||
//Assert that the response is the correct one
|
||||
Assert.assertNotNull(myDataResponse.getEtag());
|
||||
Assert.assertNotNull(myDataResponse.getKey());
|
||||
Assert.assertNotNull(myDataResponse.getValue());
|
||||
Assert.assertEquals("my state 3", myDataResponse.getValue());
|
||||
operation = createTransactionalStateOperation(
|
||||
TransactionalStateOperation.OperationType.DELETE,
|
||||
createState(stateKey, null, null, data));
|
||||
//create of the deferred call to DAPR to execute the transaction
|
||||
Mono<Void> deleteResponse = daprClient.executeTransaction(STATE_STORE_NAME, Collections.singletonList(operation));
|
||||
//execute the delete action
|
||||
deleteResponse.block();
|
||||
|
||||
response = daprClient.getState(STATE_STORE_NAME, new State(stateKey), String.class);
|
||||
State<String> deletedData = response.block();
|
||||
|
||||
//Review that the response is null, because the state was deleted
|
||||
Assert.assertNull(deletedData.getValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void saveVerifyAndDeleteTransactionalState() {
|
||||
|
||||
//create dapr client
|
||||
DaprClient daprClient = buildDaprClient();
|
||||
//The key use to store the state
|
||||
final String stateKey = "myTKey";
|
||||
|
||||
//creation of a dummy data
|
||||
MyData data = new MyData();
|
||||
data.setPropertyA("data in property AA");
|
||||
data.setPropertyB("data in property BA");
|
||||
|
||||
TransactionalStateOperation<MyData> operation = createTransactionalStateOperation(
|
||||
TransactionalStateOperation.OperationType.UPSERT,
|
||||
createState(stateKey, null, null, data));
|
||||
|
||||
Assert.assertNotNull(daprClient);
|
||||
//create of the deferred call to DAPR to execute the transaction
|
||||
Mono<Void> saveResponse = daprClient.executeTransaction(STATE_STORE_NAME, Collections.singletonList(operation));
|
||||
//execute the save action
|
||||
saveResponse.block();
|
||||
|
||||
//create of the deferred call to DAPR to get the state
|
||||
Mono<State<MyData>> response = daprClient.getState(STATE_STORE_NAME, new State(stateKey), MyData.class);
|
||||
|
||||
//retrieve the state
|
||||
State<MyData> myDataResponse = response.block();
|
||||
|
||||
//Assert that the response is the correct one
|
||||
Assert.assertNotNull(myDataResponse.getEtag());
|
||||
Assert.assertNotNull(myDataResponse.getKey());
|
||||
Assert.assertNotNull(myDataResponse.getValue());
|
||||
Assert.assertEquals("data in property AA", myDataResponse.getValue().getPropertyA());
|
||||
Assert.assertEquals("data in property BA", myDataResponse.getValue().getPropertyB());
|
||||
|
||||
operation = createTransactionalStateOperation(
|
||||
TransactionalStateOperation.OperationType.DELETE,
|
||||
createState(stateKey, null, null, data));
|
||||
//create of the deferred call to DAPR to execute the transaction
|
||||
Mono<Void> deleteResponse = daprClient.executeTransaction(STATE_STORE_NAME, Collections.singletonList(operation));
|
||||
//execute the delete action
|
||||
deleteResponse.block();
|
||||
|
||||
response = daprClient.getState(STATE_STORE_NAME, new State(stateKey), MyData.class);
|
||||
State<MyData> deletedData = response.block();
|
||||
|
||||
//Review that the response is null, because the state was deleted
|
||||
Assert.assertNull(deletedData.getValue());
|
||||
}
|
||||
|
||||
private <T> TransactionalStateOperation<T> createTransactionalStateOperation(
|
||||
TransactionalStateOperation.OperationType type,
|
||||
State<T> state) {
|
||||
return new TransactionalStateOperation<>(type, state);
|
||||
}
|
||||
|
||||
private <T> State<T> createState(String stateKey, String etag, StateOptions options, T data) {
|
||||
return new State<>(data, stateKey, etag, options);
|
||||
}
|
||||
|
||||
protected abstract DaprClient buildDaprClient();
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -38,10 +38,13 @@ public class GRPCStateClientIT extends AbstractStateClientIT {
|
|||
public static void tearDown() throws IOException {
|
||||
daprClient.close();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected DaprClient buildDaprClient() {
|
||||
return daprClient;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -30,7 +30,6 @@ public class HttpStateClientIT extends AbstractStateClientIT {
|
|||
daprRun = startDaprApp(HttpStateClientIT.class.getSimpleName(), 5000);
|
||||
daprRun.switchToHTTP();
|
||||
daprClient = new DaprClientBuilder().build();
|
||||
|
||||
assertTrue(daprClient instanceof DaprClientHttp);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -9,6 +9,8 @@ import com.google.protobuf.Any;
|
|||
import com.google.protobuf.ByteString;
|
||||
import io.dapr.client.domain.DeleteStateRequest;
|
||||
import io.dapr.client.domain.DeleteStateRequestBuilder;
|
||||
import io.dapr.client.domain.ExecuteStateTransactionRequest;
|
||||
import io.dapr.client.domain.ExecuteStateTransactionRequestBuilder;
|
||||
import io.dapr.client.domain.GetSecretRequest;
|
||||
import io.dapr.client.domain.GetSecretRequestBuilder;
|
||||
import io.dapr.client.domain.GetStateRequest;
|
||||
|
|
@ -25,6 +27,7 @@ import io.dapr.client.domain.SaveStateRequest;
|
|||
import io.dapr.client.domain.SaveStateRequestBuilder;
|
||||
import io.dapr.client.domain.State;
|
||||
import io.dapr.client.domain.StateOptions;
|
||||
import io.dapr.client.domain.TransactionalStateOperation;
|
||||
import io.dapr.serializer.DaprObjectSerializer;
|
||||
import io.dapr.utils.TypeRef;
|
||||
import io.dapr.v1.CommonProtos;
|
||||
|
|
@ -36,6 +39,7 @@ import java.util.Arrays;
|
|||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Abstract class with convenient methods common between client implementations.
|
||||
|
|
@ -319,6 +323,18 @@ abstract class AbstractDaprClient implements DaprClient {
|
|||
return this.getStates(stateStoreName, keys, TypeRef.get(clazz));
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public Mono<Void> executeTransaction(String stateStoreName,
|
||||
List<TransactionalStateOperation<?>> operations) {
|
||||
ExecuteStateTransactionRequest request = new ExecuteStateTransactionRequestBuilder(stateStoreName)
|
||||
.withTransactionalStates(operations)
|
||||
.build();
|
||||
return executeTransaction(request).then();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@
|
|||
package io.dapr.client;
|
||||
|
||||
import io.dapr.client.domain.DeleteStateRequest;
|
||||
import io.dapr.client.domain.ExecuteStateTransactionRequest;
|
||||
import io.dapr.client.domain.GetSecretRequest;
|
||||
import io.dapr.client.domain.GetStateRequest;
|
||||
import io.dapr.client.domain.GetStatesRequest;
|
||||
|
|
@ -17,6 +18,7 @@ import io.dapr.client.domain.Response;
|
|||
import io.dapr.client.domain.SaveStateRequest;
|
||||
import io.dapr.client.domain.State;
|
||||
import io.dapr.client.domain.StateOptions;
|
||||
import io.dapr.client.domain.TransactionalStateOperation;
|
||||
import io.dapr.utils.TypeRef;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
|
|
@ -405,6 +407,23 @@ public interface DaprClient extends Closeable {
|
|||
*/
|
||||
<T> Mono<Response<List<State<T>>>> getStates(GetStatesRequest request, TypeRef<T> type);
|
||||
|
||||
/** Execute a transaction.
|
||||
*
|
||||
* @param stateStoreName The name of the state store.
|
||||
* @param operations The operations to be performed.
|
||||
* @return a Mono plan of type Void
|
||||
*/
|
||||
Mono<Void> executeTransaction(String stateStoreName,
|
||||
List<TransactionalStateOperation<?>> operations);
|
||||
|
||||
|
||||
/** Execute a transaction.
|
||||
*
|
||||
* @param request Request to execute transaction.
|
||||
* @return a Mono plan of type Response Void
|
||||
*/
|
||||
Mono<Response<Void>> executeTransaction(ExecuteStateTransactionRequest request);
|
||||
|
||||
/**
|
||||
* Save/Update a list of states.
|
||||
*
|
||||
|
|
|
|||
|
|
@ -61,8 +61,8 @@ public class DaprClientBuilder {
|
|||
throw new IllegalArgumentException("Object serializer is required");
|
||||
}
|
||||
|
||||
if (objectSerializer.getContentType() == null || objectSerializer.getContentType().isBlank()) {
|
||||
throw new IllegalArgumentException("Content Type should not be null or blank");
|
||||
if (objectSerializer.getContentType() == null || objectSerializer.getContentType().isEmpty()) {
|
||||
throw new IllegalArgumentException("Content Type should not be null or empty");
|
||||
}
|
||||
|
||||
this.objectSerializer = objectSerializer;
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ import com.google.protobuf.Any;
|
|||
import com.google.protobuf.ByteString;
|
||||
import com.google.protobuf.Empty;
|
||||
import io.dapr.client.domain.DeleteStateRequest;
|
||||
import io.dapr.client.domain.ExecuteStateTransactionRequest;
|
||||
import io.dapr.client.domain.GetSecretRequest;
|
||||
import io.dapr.client.domain.GetStateRequest;
|
||||
import io.dapr.client.domain.GetStatesRequest;
|
||||
|
|
@ -22,6 +23,7 @@ import io.dapr.client.domain.Response;
|
|||
import io.dapr.client.domain.SaveStateRequest;
|
||||
import io.dapr.client.domain.State;
|
||||
import io.dapr.client.domain.StateOptions;
|
||||
import io.dapr.client.domain.TransactionalStateOperation;
|
||||
import io.dapr.config.Properties;
|
||||
import io.dapr.serializer.DaprObjectSerializer;
|
||||
import io.dapr.utils.TypeRef;
|
||||
|
|
@ -337,6 +339,47 @@ public class DaprClientGrpc extends AbstractDaprClient {
|
|||
return new State<>(value, key, etag, stateOptions);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public Mono<Response<Void>> executeTransaction(ExecuteStateTransactionRequest request) {
|
||||
try {
|
||||
final String stateStoreName = request.getStateStoreName();
|
||||
final List<TransactionalStateOperation<?>> operations = request.getOperations();
|
||||
final Map<String, String> metadata = request.getMetadata();
|
||||
final Context context = request.getContext();
|
||||
if ((stateStoreName == null) || (stateStoreName.trim().isEmpty())) {
|
||||
throw new IllegalArgumentException("State store name cannot be null or empty.");
|
||||
}
|
||||
DaprProtos.ExecuteStateTransactionRequest.Builder builder = DaprProtos.ExecuteStateTransactionRequest
|
||||
.newBuilder();
|
||||
builder.setStoreName(stateStoreName);
|
||||
if (metadata != null) {
|
||||
builder.putAllMetadata(metadata);
|
||||
}
|
||||
for (TransactionalStateOperation operation: operations) {
|
||||
DaprProtos.TransactionalStateOperation.Builder operationBuilder = DaprProtos.TransactionalStateOperation
|
||||
.newBuilder();
|
||||
operationBuilder.setOperationType(operation.getOperation().toString().toLowerCase());
|
||||
operationBuilder.setRequest(buildStateRequest(operation.getRequest()).build());
|
||||
builder.addOperations(operationBuilder.build());
|
||||
}
|
||||
DaprProtos.ExecuteStateTransactionRequest req = builder.build();
|
||||
|
||||
return Mono.fromCallable(wrap(context, () -> client.executeStateTransaction(req))).flatMap(f -> {
|
||||
try {
|
||||
f.get();
|
||||
} catch (Exception e) {
|
||||
return Mono.error(e);
|
||||
}
|
||||
return Mono.empty();
|
||||
}).thenReturn(new Response<Void>(context, null));
|
||||
} catch (IOException e) {
|
||||
return Mono.error(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ package io.dapr.client;
|
|||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.google.common.base.Strings;
|
||||
import io.dapr.client.domain.DeleteStateRequest;
|
||||
import io.dapr.client.domain.ExecuteStateTransactionRequest;
|
||||
import io.dapr.client.domain.GetSecretRequest;
|
||||
import io.dapr.client.domain.GetStateRequest;
|
||||
import io.dapr.client.domain.GetStatesRequest;
|
||||
|
|
@ -19,6 +20,8 @@ import io.dapr.client.domain.Response;
|
|||
import io.dapr.client.domain.SaveStateRequest;
|
||||
import io.dapr.client.domain.State;
|
||||
import io.dapr.client.domain.StateOptions;
|
||||
import io.dapr.client.domain.TransactionalStateOperation;
|
||||
import io.dapr.client.domain.TransactionalStateRequest;
|
||||
import io.dapr.config.Properties;
|
||||
import io.dapr.serializer.DaprObjectSerializer;
|
||||
import io.dapr.serializer.DefaultObjectSerializer;
|
||||
|
|
@ -43,7 +46,6 @@ import java.util.Optional;
|
|||
* @see io.dapr.client.DaprClient
|
||||
*/
|
||||
public class DaprClientHttp extends AbstractDaprClient {
|
||||
|
||||
/**
|
||||
* Header for the conditional operation.
|
||||
*/
|
||||
|
|
@ -74,6 +76,11 @@ public class DaprClientHttp extends AbstractDaprClient {
|
|||
*/
|
||||
public static final String STATE_PATH = DaprHttp.API_VERSION + "/state";
|
||||
|
||||
/**
|
||||
* String format for transaction API.
|
||||
*/
|
||||
private static final String TRANSACTION_URL_FORMAT = STATE_PATH + "/%s/transaction";
|
||||
|
||||
/**
|
||||
* Secrets Path.
|
||||
*/
|
||||
|
|
@ -360,6 +367,62 @@ public class DaprClientHttp extends AbstractDaprClient {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public Mono<Response<Void>> executeTransaction(ExecuteStateTransactionRequest request) {
|
||||
try {
|
||||
final String stateStoreName = request.getStateStoreName();
|
||||
final List<TransactionalStateOperation<?>> operations = request.getOperations();
|
||||
final Map<String, String> metadata = request.getMetadata();
|
||||
final Context context = request.getContext();
|
||||
if ((stateStoreName == null) || (stateStoreName.trim().isEmpty())) {
|
||||
throw new IllegalArgumentException("State store name cannot be null or empty.");
|
||||
}
|
||||
if (operations == null || operations.isEmpty()) {
|
||||
return Mono.empty();
|
||||
}
|
||||
final Map<String, String> headers = new HashMap<>();
|
||||
final String etag = operations.stream()
|
||||
.filter(op -> null != op.getRequest().getEtag() && !op.getRequest().getEtag().trim().isEmpty())
|
||||
.findFirst()
|
||||
.orElse(new TransactionalStateOperation<>(null, new State<>(null,null, null, null)))
|
||||
.getRequest()
|
||||
.getEtag();
|
||||
if (etag != null && !etag.trim().isEmpty()) {
|
||||
headers.put(HEADER_HTTP_ETAG_ID, etag);
|
||||
}
|
||||
final String url = String.format(TRANSACTION_URL_FORMAT, stateStoreName);
|
||||
List<TransactionalStateOperation<Object>> internalOperationObjects = new ArrayList<>(operations.size());
|
||||
for (TransactionalStateOperation operation : operations) {
|
||||
State<?> state = operation.getRequest();
|
||||
if (state == null) {
|
||||
continue;
|
||||
}
|
||||
if (this.isStateSerializerDefault) {
|
||||
// If default serializer is being used, we just pass the object through to be serialized directly.
|
||||
// This avoids a JSON object from being quoted inside a string.
|
||||
// We WANT this: { "value" : { "myField" : 123 } }
|
||||
// We DON't WANT this: { "value" : "{ \"myField\" : 123 }" }
|
||||
internalOperationObjects.add(operation);
|
||||
continue;
|
||||
}
|
||||
byte[] data = this.stateSerializer.serialize(state.getValue());
|
||||
// Custom serializer, so everything is byte[].
|
||||
operations.add(new TransactionalStateOperation<>(operation.getOperation(),
|
||||
new State<>(data, state.getKey(), state.getEtag(), state.getOptions())));
|
||||
}
|
||||
TransactionalStateRequest<Object> req = new TransactionalStateRequest<>(internalOperationObjects, metadata);
|
||||
byte[] serializedOperationBody = INTERNAL_SERIALIZER.serialize(req);
|
||||
return this.client.invokeApi(
|
||||
DaprHttp.HttpMethods.POST.name(), url, null, serializedOperationBody, headers, context)
|
||||
.thenReturn(new Response<>(context, null));
|
||||
} catch (IOException e) {
|
||||
return Mono.error(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -0,0 +1,60 @@
|
|||
/*
|
||||
* Copyright (c) Microsoft Corporation.
|
||||
* Licensed under the MIT License.
|
||||
*/
|
||||
|
||||
package io.dapr.client.domain;
|
||||
|
||||
import io.grpc.Context;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class ExecuteStateTransactionRequest {
|
||||
|
||||
/**
|
||||
* Name of the state store.
|
||||
*/
|
||||
private final String stateStoreName;
|
||||
|
||||
/**
|
||||
* Transactional operations list.
|
||||
*/
|
||||
private final List<TransactionalStateOperation<?>> operations;
|
||||
|
||||
/**
|
||||
* Metadata used for transactional operations.
|
||||
*/
|
||||
private final Map<String, String> metadata;
|
||||
|
||||
/**
|
||||
* Context to be passed on in the call.
|
||||
*/
|
||||
private final Context context;
|
||||
|
||||
ExecuteStateTransactionRequest(String stateStoreName,
|
||||
List<TransactionalStateOperation<?>> operations,
|
||||
Map<String, String> metadata,
|
||||
Context context) {
|
||||
this.stateStoreName = stateStoreName;
|
||||
this.operations = operations;
|
||||
this.metadata = metadata;
|
||||
this.context = context;
|
||||
}
|
||||
|
||||
public String getStateStoreName() {
|
||||
return stateStoreName;
|
||||
}
|
||||
|
||||
public List<TransactionalStateOperation<?>> getOperations() {
|
||||
return operations;
|
||||
}
|
||||
|
||||
public Map<String, String> getMetadata() {
|
||||
return metadata;
|
||||
}
|
||||
|
||||
public Context getContext() {
|
||||
return context;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,50 @@
|
|||
/*
|
||||
* Copyright (c) Microsoft Corporation.
|
||||
* Licensed under the MIT License.
|
||||
*/
|
||||
|
||||
package io.dapr.client.domain;
|
||||
|
||||
import io.grpc.Context;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public final class ExecuteStateTransactionRequestBuilder {
|
||||
private final String storeName;
|
||||
private List<TransactionalStateOperation<?>> transactionalStates;
|
||||
private Map<String, String> metadata;
|
||||
private Context context;
|
||||
|
||||
public ExecuteStateTransactionRequestBuilder(String storeName) {
|
||||
this.storeName = storeName;
|
||||
}
|
||||
|
||||
public ExecuteStateTransactionRequestBuilder withTransactionalStates(
|
||||
TransactionalStateOperation<?>... transactionalStates) {
|
||||
this.transactionalStates = Collections.unmodifiableList(Arrays.asList(transactionalStates));
|
||||
return this;
|
||||
}
|
||||
|
||||
public ExecuteStateTransactionRequestBuilder withTransactionalStates(
|
||||
List<TransactionalStateOperation<?>> transactionalStates) {
|
||||
this.transactionalStates = transactionalStates == null ? null : Collections.unmodifiableList(transactionalStates);
|
||||
return this;
|
||||
}
|
||||
|
||||
public ExecuteStateTransactionRequestBuilder withMetadata(Map<String, String> metadata) {
|
||||
this.metadata = Collections.unmodifiableMap(metadata);
|
||||
return this;
|
||||
}
|
||||
|
||||
public ExecuteStateTransactionRequestBuilder withContext(Context context) {
|
||||
this.context = context;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ExecuteStateTransactionRequest build() {
|
||||
return new ExecuteStateTransactionRequest(storeName, transactionalStates, metadata, context);
|
||||
}
|
||||
}
|
||||
|
|
@ -215,7 +215,7 @@ public class State<T> {
|
|||
+ ", key='" + key + "'"
|
||||
+ ", etag='" + etag + "'"
|
||||
+ ", etag='" + error + "'"
|
||||
+ ", options={'" + options.toString() + "}"
|
||||
+ ", options={'" + (options != null ? options.toString() : null) + "}"
|
||||
+ "}";
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,72 @@
|
|||
/*
|
||||
* Copyright (c) Microsoft Corporation.
|
||||
* Licensed under the MIT License.
|
||||
*/
|
||||
|
||||
package io.dapr.client.domain;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
public class TransactionalStateOperation<T> {
|
||||
|
||||
/**
|
||||
* The type of operation to be executed.
|
||||
*/
|
||||
private final OperationType operation;
|
||||
|
||||
/**
|
||||
* State values to be operated on.
|
||||
*/
|
||||
private final State<T> request;
|
||||
|
||||
/**
|
||||
* Construct an immutable transactional state operation object.
|
||||
* @param operationType The type of operation done.
|
||||
* @param state The required state.
|
||||
*/
|
||||
public TransactionalStateOperation(OperationType operationType, State<T> state) {
|
||||
this.operation = operationType;
|
||||
this.request = state;
|
||||
}
|
||||
|
||||
public OperationType getOperation() {
|
||||
return operation;
|
||||
}
|
||||
|
||||
public State<T> getRequest() {
|
||||
return request;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
TransactionalStateOperation<?> that = (TransactionalStateOperation<?>) o;
|
||||
return operation.equals(that.operation)
|
||||
&& request.equals(that.request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(operation, request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "TransactionalStateOperation{"
|
||||
+ "operationType='" + operation + '\''
|
||||
+ ", state=" + request
|
||||
+ '}';
|
||||
}
|
||||
|
||||
public static enum OperationType {
|
||||
@JsonProperty("upsert") UPSERT,
|
||||
@JsonProperty("delete") DELETE;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,41 @@
|
|||
/*
|
||||
* Copyright (c) Microsoft Corporation.
|
||||
* Licensed under the MIT License.
|
||||
*/
|
||||
|
||||
package io.dapr.client.domain;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class TransactionalStateRequest<T> {
|
||||
|
||||
/**
|
||||
* Transactional operations list.
|
||||
*/
|
||||
private final List<TransactionalStateOperation<T>> operations;
|
||||
|
||||
/**
|
||||
* Metadata used for transactional operations.
|
||||
*/
|
||||
private final Map<String, String> metadata;
|
||||
|
||||
/**
|
||||
* Constructor to create immutable transactional state request object.
|
||||
* @param operations List of operations to be performed.
|
||||
* @param metadata Metadata used for transactional operations.
|
||||
*/
|
||||
public TransactionalStateRequest(List<TransactionalStateOperation<T>> operations, Map<String, String> metadata) {
|
||||
this.operations = operations;
|
||||
this.metadata = metadata;
|
||||
}
|
||||
|
||||
public List<TransactionalStateOperation<T>> getOperations() {
|
||||
return Collections.unmodifiableList(operations);
|
||||
}
|
||||
|
||||
public Map<String, String> getMetadata() {
|
||||
return metadata;
|
||||
}
|
||||
}
|
||||
|
|
@ -10,7 +10,6 @@ import com.google.common.util.concurrent.SettableFuture;
|
|||
import com.google.protobuf.Any;
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.google.protobuf.Empty;
|
||||
|
||||
import io.dapr.client.domain.DeleteStateRequest;
|
||||
import io.dapr.client.domain.DeleteStateRequestBuilder;
|
||||
import io.dapr.client.domain.GetStateRequest;
|
||||
|
|
@ -19,6 +18,7 @@ import io.dapr.client.domain.HttpExtension;
|
|||
import io.dapr.client.domain.Response;
|
||||
import io.dapr.client.domain.State;
|
||||
import io.dapr.client.domain.StateOptions;
|
||||
import io.dapr.client.domain.TransactionalStateOperation;
|
||||
import io.dapr.serializer.DefaultObjectSerializer;
|
||||
import io.dapr.utils.TypeRef;
|
||||
import io.dapr.v1.CommonProtos;
|
||||
|
|
@ -1068,6 +1068,68 @@ public class DaprClientGrpcTest {
|
|||
assertTrue(callback.wasCalled);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void executeTransactionTest() {
|
||||
String etag = "ETag1";
|
||||
String key = "key1";
|
||||
String data = "my data";
|
||||
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, null);
|
||||
SettableFuture<Empty> settableFuture = SettableFuture.create();
|
||||
MockCallback<Empty> callback = new MockCallback<>(Empty.newBuilder().build());
|
||||
addCallback(settableFuture, callback, directExecutor());
|
||||
when(client.executeStateTransaction(any(DaprProtos.ExecuteStateTransactionRequest.class)))
|
||||
.thenReturn(settableFuture);
|
||||
State<String> stateKey = buildStateKey(data, key, etag, options);
|
||||
TransactionalStateOperation<String> upsertOperation = new TransactionalStateOperation<>(
|
||||
TransactionalStateOperation.OperationType.UPSERT,
|
||||
stateKey);
|
||||
TransactionalStateOperation<String> deleteOperation = new TransactionalStateOperation<>(
|
||||
TransactionalStateOperation.OperationType.DELETE,
|
||||
new State<>("testKey")
|
||||
);
|
||||
Mono<Void> result = adapter.executeTransaction(STATE_STORE_NAME, Arrays.asList(upsertOperation, deleteOperation));
|
||||
settableFuture.set(Empty.newBuilder().build());
|
||||
result.block();
|
||||
assertTrue(callback.wasCalled);
|
||||
}
|
||||
|
||||
@Test(expected = RuntimeException.class)
|
||||
public void executeTransactionExceptionThrownTest() {
|
||||
String etag = "ETag1";
|
||||
String key = "key1";
|
||||
String data = "my data";
|
||||
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, null);
|
||||
when(client.executeStateTransaction(any(DaprProtos.ExecuteStateTransactionRequest.class)))
|
||||
.thenThrow(RuntimeException.class);
|
||||
State<String> stateKey = buildStateKey(data, key, etag, options);
|
||||
TransactionalStateOperation<String> operation = new TransactionalStateOperation<>(
|
||||
TransactionalStateOperation.OperationType.UPSERT,
|
||||
stateKey);
|
||||
Mono<Void> result = adapter.executeTransaction(STATE_STORE_NAME, Collections.singletonList(operation));
|
||||
result.block();
|
||||
}
|
||||
|
||||
@Test(expected = RuntimeException.class)
|
||||
public void executeTransactionCallbackExceptionTest() {
|
||||
String etag = "ETag1";
|
||||
String key = "key1";
|
||||
String data = "my data";
|
||||
StateOptions options = buildStateOptions(StateOptions.Consistency.STRONG, null);
|
||||
SettableFuture<Empty> settableFuture = SettableFuture.create();
|
||||
RuntimeException ex = new RuntimeException("ex");
|
||||
MockCallback<Empty> callback = new MockCallback<>(ex);
|
||||
addCallback(settableFuture, callback, directExecutor());
|
||||
when(client.executeStateTransaction(any(DaprProtos.ExecuteStateTransactionRequest.class)))
|
||||
.thenReturn(settableFuture);
|
||||
State<String> stateKey = buildStateKey(data, key, etag, options);
|
||||
TransactionalStateOperation<String> operation = new TransactionalStateOperation<>(
|
||||
TransactionalStateOperation.OperationType.UPSERT,
|
||||
stateKey);
|
||||
Mono<Void> result = adapter.executeTransaction(STATE_STORE_NAME, Collections.singletonList(operation));
|
||||
settableFuture.setException(ex);
|
||||
result.block();
|
||||
}
|
||||
|
||||
@Test(expected = RuntimeException.class)
|
||||
public void saveStateExceptionThrownTest() {
|
||||
String key = "key1";
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ import io.dapr.client.domain.HttpExtension;
|
|||
import io.dapr.client.domain.Response;
|
||||
import io.dapr.client.domain.State;
|
||||
import io.dapr.client.domain.StateOptions;
|
||||
import io.dapr.client.domain.TransactionalStateOperation;
|
||||
import io.dapr.config.Properties;
|
||||
import io.dapr.utils.TypeRef;
|
||||
import okhttp3.OkHttpClient;
|
||||
|
|
@ -27,7 +28,6 @@ import java.util.HashMap;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
|
@ -35,7 +35,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
|
|||
import static org.mockito.Mockito.mock;
|
||||
|
||||
public class DaprClientHttpTest {
|
||||
|
||||
|
||||
private static final String STATE_STORE_NAME = "MyStateStore";
|
||||
|
||||
private static final String SECRET_STORE_NAME = "MySecretStore";
|
||||
|
|
@ -587,7 +587,6 @@ public class DaprClientHttpTest {
|
|||
|
||||
@Test
|
||||
public void saveStatesNull() {
|
||||
State<String> stateKeyValue = new State("value", "key", "", null);
|
||||
List<State<?>> stateKeyValueList = new ArrayList();
|
||||
mockInterceptor.addRule()
|
||||
.post("http://127.0.0.1:3000/v1.0/state/MyStateStore")
|
||||
|
|
@ -650,6 +649,55 @@ public class DaprClientHttpTest {
|
|||
// No exception should be thrown because we did not call block() on the mono above.
|
||||
}
|
||||
|
||||
@Test
|
||||
public void simpleExecuteTransaction() {
|
||||
mockInterceptor.addRule()
|
||||
.post("http://127.0.0.1:3000/v1.0/state/MyStateStore/transaction")
|
||||
.respond(EXPECTED_RESULT);
|
||||
String etag = "ETag1";
|
||||
String key = "key1";
|
||||
String data = "my data";
|
||||
StateOptions stateOptions = mock(StateOptions.class);
|
||||
daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient);
|
||||
daprClientHttp = new DaprClientHttp(daprHttp);
|
||||
|
||||
State<String> stateKey = new State(data, key, etag, stateOptions);
|
||||
TransactionalStateOperation<String> upsertOperation = new TransactionalStateOperation<>(
|
||||
TransactionalStateOperation.OperationType.UPSERT,
|
||||
stateKey);
|
||||
TransactionalStateOperation<String> deleteOperation = new TransactionalStateOperation<>(
|
||||
TransactionalStateOperation.OperationType.DELETE,
|
||||
new State<>("deleteKey"));
|
||||
Mono<Void> mono = daprClientHttp.executeTransaction(STATE_STORE_NAME, Arrays.asList(upsertOperation,
|
||||
deleteOperation));
|
||||
assertNull(mono.block());
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void executeTransactionNullStateStoreName() {
|
||||
daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient);
|
||||
daprClientHttp = new DaprClientHttp(daprHttp);
|
||||
Mono<Void> mono = daprClientHttp.executeTransaction(null, null);
|
||||
assertNull(mono.block());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void simpleExecuteTransactionNull() {
|
||||
mockInterceptor.addRule()
|
||||
.post("http://127.0.0.1:3000/v1.0/state/MyStateStore/transaction")
|
||||
.respond(EXPECTED_RESULT);
|
||||
daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, okHttpClient);
|
||||
daprClientHttp = new DaprClientHttp(daprHttp);
|
||||
Mono<Void> mono = daprClientHttp.executeTransaction(STATE_STORE_NAME, null);
|
||||
assertNull(mono.block());
|
||||
mono = daprClientHttp.executeTransaction(STATE_STORE_NAME, Collections.emptyList());
|
||||
assertNull(mono.block());
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
*/
|
||||
|
||||
@Test
|
||||
public void deleteState() {
|
||||
StateOptions stateOptions = mock(StateOptions.class);
|
||||
|
|
|
|||
Loading…
Reference in New Issue