diff --git a/examples/pom.xml b/examples/pom.xml
index a918ddb28..08f79158a 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -7,12 +7,12 @@
io.dapr
dapr-sdk-parent
- 0.3.0-preview01
+ 0.2.0-preview01
dapr-sdk-examples
jar
- 0.3.0-preview01
+ 0.2.0-preview01
dapr-sdk-examples
@@ -62,7 +62,7 @@
io.dapr
dapr-sdk
- 0.3.0-preview01
+ 0.2.0-preview01
diff --git a/pom.xml b/pom.xml
index 88a1a2209..aae62bcc6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -7,7 +7,7 @@
io.dapr
dapr-sdk-parent
pom
- 0.3.0-preview01
+ 0.2.0-preview01
dapr-sdk-parent
SDK for Dapr.
https://dapr.io
@@ -77,6 +77,7 @@
+ sdk-autogen
sdk
examples
diff --git a/sdk-autogen/pom.xml b/sdk-autogen/pom.xml
new file mode 100644
index 000000000..399a36cb4
--- /dev/null
+++ b/sdk-autogen/pom.xml
@@ -0,0 +1,113 @@
+
+ 4.0.0
+
+
+ io.dapr
+ dapr-sdk-parent
+ 0.2.0-preview01
+
+
+ dapr-sdk-autogen
+ jar
+ 0.2.0-preview01
+ dapr-sdk-autogen
+ Auto-generated SDK for Dapr
+
+
+ ${project.build.directory}/generated-sources
+ ${project.parent.basedir}/proto
+
+
+
+
+ javax.annotation
+ javax.annotation-api
+ provided
+
+
+ io.grpc
+ grpc-netty-shaded
+ runtime
+
+
+ io.grpc
+ grpc-protobuf
+
+
+ io.grpc
+ grpc-stub
+
+
+ io.grpc
+ grpc-testing
+ test
+
+
+ com.google.protobuf
+ protobuf-java-util
+ ${protobuf.version}
+
+
+ com.github.os72
+ protoc-jar-maven-plugin
+ 3.10.1
+
+
+
+
+
+
+ com.github.os72
+ protoc-jar-maven-plugin
+ 3.10.1
+
+
+ generate-sources
+
+ run
+
+
+ ${protobuf.version}
+ inputs
+ direct
+ true
+
+ ${protobuf.input.directory}/dapr
+ ${protobuf.input.directory}/daprclient
+
+
+
+ java
+ ${protobuf.output.directory}
+
+
+ grpc-java
+ ${protobuf.output.directory}
+ io.grpc:protoc-gen-grpc-java:${grpc.version}
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-source-plugin
+ 3.2.0
+
+
+ attach-sources
+
+ jar-no-fork
+
+
+
+
+
+
+
+
diff --git a/sdk-autogen/src/main/java/.keepme b/sdk-autogen/src/main/java/.keepme
new file mode 100644
index 000000000..e69de29bb
diff --git a/sdk-autogen/src/test/java/.keepme b/sdk-autogen/src/test/java/.keepme
new file mode 100644
index 000000000..e69de29bb
diff --git a/sdk/pom.xml b/sdk/pom.xml
index 6ef230cdd..a55bc782b 100644
--- a/sdk/pom.xml
+++ b/sdk/pom.xml
@@ -7,21 +7,21 @@
io.dapr
dapr-sdk-parent
- 0.3.0-preview01
+ 0.2.0-preview01
dapr-sdk
jar
- 0.3.0-preview01
+ 0.2.0-preview01
dapr-sdk
SDK for Dapr
-
- ${project.build.directory}/generated-sources
- ${project.parent.basedir}/proto
-
-
+
+ io.dapr
+ dapr-sdk-autogen
+ 0.2.0-preview01
+
com.fasterxml.jackson.core
jackson-databind
@@ -37,34 +37,11 @@
okhttp
4.2.1
-
- io.grpc
- grpc-netty-shaded
- runtime
-
-
- io.grpc
- grpc-protobuf
-
-
- io.grpc
- grpc-stub
-
javax.annotation
javax.annotation-api
provided
-
- io.grpc
- grpc-testing
- test
-
-
- com.google.protobuf
- protobuf-java-util
- ${protobuf.version}
-
junit
junit
@@ -75,50 +52,10 @@
mockito-core
test
-
- com.github.os72
- protoc-jar-maven-plugin
- 3.10.1
-
-
- com.github.os72
- protoc-jar-maven-plugin
- 3.10.1
-
-
- generate-sources
-
- run
-
-
- ${protobuf.version}
- inputs
- direct
- true
-
- ${protobuf.input.directory}/dapr
- ${protobuf.input.directory}/daprclient
-
-
-
- java
- ${protobuf.output.directory}
-
-
- grpc-java
- ${protobuf.output.directory}
- io.grpc:protoc-gen-grpc-java:${grpc.version}
-
-
-
-
-
-
-
org.apache.maven.plugins
maven-source-plugin
diff --git a/sdk/src/main/java/io/dapr/actors/AbstractDaprClient.java b/sdk/src/main/java/io/dapr/actors/AbstractDaprClient.java
index 10ffb5f26..e94b8dc0c 100644
--- a/sdk/src/main/java/io/dapr/actors/AbstractDaprClient.java
+++ b/sdk/src/main/java/io/dapr/actors/AbstractDaprClient.java
@@ -152,20 +152,20 @@ public abstract class AbstractDaprClient {
public interface DaprHttpCallback {
/**
- * called when the server response was not 2xx or when an exception was
+ * Called when the server response was not 2xx or when an exception was
* thrown in the process
*
- * @param response - in case of server error (4xx, 5xx) this contains the
+ * @param call - in case of server error (4xx, 5xx) this contains the
* server response in case of IO exception this is null
- * @param throwable - contains the exception. in case of server error (4xx,
+ * @param e - contains the exception. in case of server error (4xx,
* 5xx) this is null
*/
public void onFailure(Call call, Exception e);
/**
- * contains the server response
+ * Contains the server response
*
- * @param response
+ * @param response Success response.
*/
public void onSuccess(String response);
}
diff --git a/sdk/src/main/java/io/dapr/actors/runtime/ActorRuntime.java b/sdk/src/main/java/io/dapr/actors/runtime/ActorRuntime.java
index acc56190d..180b8acd1 100644
--- a/sdk/src/main/java/io/dapr/actors/runtime/ActorRuntime.java
+++ b/sdk/src/main/java/io/dapr/actors/runtime/ActorRuntime.java
@@ -78,6 +78,7 @@ public class ActorRuntime {
* Registers an actor with the runtime.
*
* @param clazz The type of actor.
+ * @param Actor class type.
*/
public void RegisterActor(Class clazz) {
RegisterActor(clazz, null);
@@ -88,6 +89,7 @@ public class ActorRuntime {
*
* @param clazz The type of actor.
* @param actorFactory An optional factory to create actors.
+ * @param Actor class type.
* This can be used for dependency injection into actors.
*/
public void RegisterActor(Class clazz, ActorFactory actorFactory) {
@@ -95,7 +97,7 @@ public class ActorRuntime {
ActorFactory actualActorFactory = actorFactory != null ? actorFactory : new DefaultActorFactory(actorTypeInfo);
// TODO: Refactor into a Builder class.
- DaprStateAsyncProvider stateProvider = new DaprStateAsyncProvider(this.appToDaprAsyncClient, new ActorStateProviderSerializer());
+ DaprStateAsyncProvider stateProvider = new DaprStateAsyncProvider(this.appToDaprAsyncClient, new ActorStateSerializer());
ActorService actorService = new ActorServiceImpl(actorTypeInfo, stateProvider, actualActorFactory);
// Create ActorManagers, override existing entry if registered again.
diff --git a/sdk/src/main/java/io/dapr/actors/runtime/ActorStateChange.java b/sdk/src/main/java/io/dapr/actors/runtime/ActorStateChange.java
index 799a9cdb4..5430ebbef 100644
--- a/sdk/src/main/java/io/dapr/actors/runtime/ActorStateChange.java
+++ b/sdk/src/main/java/io/dapr/actors/runtime/ActorStateChange.java
@@ -5,8 +5,6 @@
package io.dapr.actors.runtime;
-import java.io.IOException;
-
/**
* Represents a state change for an actor.
* @param Type of the value being changed.
diff --git a/sdk/src/main/java/io/dapr/actors/runtime/ActorStateProviderSerializer.java b/sdk/src/main/java/io/dapr/actors/runtime/ActorStateProviderSerializer.java
deleted file mode 100644
index 75d940d3d..000000000
--- a/sdk/src/main/java/io/dapr/actors/runtime/ActorStateProviderSerializer.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Copyright (c) Microsoft Corporation.
- * Licensed under the MIT License.
- */
-package io.dapr.actors.runtime;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import java.io.IOException;
-
-/**
- * Serializes and deserializes an object.
- */
-class ActorStateProviderSerializer {
-
- /**
- * Shared Json serializer/deserializer as per Jackson's documentation.
- */
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
- /**
- * Serializes a given state object into byte array.
- *
- * @param state State object to be serialized.
- * @return Array of bytes[] with the serialized content.
- * @throws IOException
- */
- String serialize(Object state) throws IOException {
- return OBJECT_MAPPER.writeValueAsString(state);
- }
-
- /**
- * Deserializes the byte array into the original object.
- *
- * @param json String to be parsed.
- * @param clazz Type of the object being deserialized.
- * @param Generic type of the object being deserialized.
- * @return Object of type T.
- * @throws IOException
- */
- T deserialize(String json, Class clazz) throws IOException {
- return OBJECT_MAPPER.readValue(json, clazz);
- }
-
-}
diff --git a/sdk/src/main/java/io/dapr/actors/runtime/ActorStateSerializer.java b/sdk/src/main/java/io/dapr/actors/runtime/ActorStateSerializer.java
new file mode 100644
index 000000000..993d1e7e5
--- /dev/null
+++ b/sdk/src/main/java/io/dapr/actors/runtime/ActorStateSerializer.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright (c) Microsoft Corporation.
+ * Licensed under the MIT License.
+ */
+package io.dapr.actors.runtime;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+
+/**
+ * Serializes and deserializes an object.
+ */
+class ActorStateSerializer {
+
+ /**
+ * Shared Json serializer/deserializer as per Jackson's documentation.
+ */
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ /**
+ * Serializes a given state object into byte array.
+ *
+ * @param state State object to be serialized.
+ * @return Array of bytes[] with the serialized content.
+ * @throws IOException
+ */
+ String serialize(T state) throws IOException {
+ if (state == null) {
+ return null;
+ }
+
+ if (state.getClass() == String.class) {
+ return state.toString();
+ }
+
+ if (isPrimitive(state.getClass())) {
+ return state.toString();
+ }
+
+ // Not string, not primitive, so it is a complex type: we use JSON for that.
+ return OBJECT_MAPPER.writeValueAsString(state);
+ }
+
+ /**
+ * Deserializes the byte array into the original object.
+ *
+ * @param value String to be parsed.
+ * @param clazz Type of the object being deserialized.
+ * @param Generic type of the object being deserialized.
+ * @return Object of type T.
+ * @throws IOException
+ */
+ T deserialize(String value, Class clazz) throws IOException {
+ if (clazz == String.class) {
+ return (T) value;
+ }
+
+ if (isPrimitive(clazz)) {
+ return parse(value, clazz);
+ }
+
+ // Not string, not primitive, so it is a complex type: we use JSON for that.
+ return OBJECT_MAPPER.readValue(value, clazz);
+ }
+
+ private static boolean isPrimitive(Class> clazz) {
+ if (clazz == null) {
+ return false;
+ }
+
+ return (clazz.isPrimitive() ||
+ (clazz == Boolean.class) ||
+ (clazz == Character.class) ||
+ (clazz == Byte.class) ||
+ (clazz == Short.class) ||
+ (clazz == Integer.class) ||
+ (clazz == Long.class) ||
+ (clazz == Float.class) ||
+ (clazz == Double.class) ||
+ (clazz == Void.class));
+ }
+
+ private static T parse(String value, Class clazz) {
+ if (value == null) {
+ return null;
+ }
+
+ if ((Boolean.class == clazz) || (boolean.class == clazz)) return (T) Boolean.valueOf(value);
+ if ((Byte.class == clazz) || (byte.class == clazz)) return (T) Byte.valueOf(value);
+ if ((Short.class == clazz) || (short.class == clazz)) return (T) Short.valueOf(value);
+ if ((Integer.class == clazz) || (int.class == clazz)) return (T) Integer.valueOf(value);
+ if ((Long.class == clazz) || (long.class == clazz)) return (T) Long.valueOf(value);
+ if ((Float.class == clazz) || (float.class == clazz)) return (T) Float.valueOf(value);
+ if ((Double.class == clazz) || (double.class == clazz)) return (T) Double.valueOf(value);
+
+ return null;
+ }
+}
diff --git a/sdk/src/main/java/io/dapr/actors/runtime/ActorTimerImpl.java b/sdk/src/main/java/io/dapr/actors/runtime/ActorTimerImpl.java
index 33d3a23fe..ee53c2951 100644
--- a/sdk/src/main/java/io/dapr/actors/runtime/ActorTimerImpl.java
+++ b/sdk/src/main/java/io/dapr/actors/runtime/ActorTimerImpl.java
@@ -1,10 +1,14 @@
package io.dapr.actors.runtime;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.IOException;
+import java.io.StringWriter;
+import java.io.Writer;
import java.time.Duration;
import java.util.function.Function;
@@ -14,9 +18,9 @@ import java.util.function.Function;
class ActorTimerImpl implements ActorTimer {
/**
- * Shared Json serializer/deserializer as per Jackson's documentation, used only for this class.
+ * Shared Json Factory as per Jackson's documentation, used only for this class.
*/
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final JsonFactory JSON_FACTORY = new JsonFactory();
/**
* Actor that owns this timer.
@@ -118,9 +122,15 @@ class ActorTimerImpl implements ActorTimer {
* @return JSON.
*/
String serialize() throws IOException {
- ObjectNode objectNode = OBJECT_MAPPER.createObjectNode();
- objectNode.put("dueTime", ConverterUtils.ConvertDurationToDaprFormat(this.dueTime));
- objectNode.put("period", ConverterUtils.ConvertDurationToDaprFormat(this.period));
- return OBJECT_MAPPER.writeValueAsString(objectNode);
+ try (Writer writer = new StringWriter()) {
+ JsonGenerator generator = JSON_FACTORY.createGenerator(writer);
+ generator.writeStartObject();
+ generator.writeStringField("dueTime", ConverterUtils.ConvertDurationToDaprFormat(this.dueTime));
+ generator.writeStringField("period", ConverterUtils.ConvertDurationToDaprFormat(this.period));
+ generator.writeEndObject();
+ generator.close();
+ writer.flush();
+ return writer.toString();
+ }
}
-}
+}
\ No newline at end of file
diff --git a/sdk/src/main/java/io/dapr/actors/runtime/DaprStateAsyncProvider.java b/sdk/src/main/java/io/dapr/actors/runtime/DaprStateAsyncProvider.java
index 15b4aab8b..8d9212e75 100644
--- a/sdk/src/main/java/io/dapr/actors/runtime/DaprStateAsyncProvider.java
+++ b/sdk/src/main/java/io/dapr/actors/runtime/DaprStateAsyncProvider.java
@@ -5,14 +5,13 @@
package io.dapr.actors.runtime;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
import reactor.core.publisher.Mono;
import java.io.IOException;
-import java.util.Collection;
+import java.io.StringWriter;
+import java.io.Writer;
/**
* State Provider to interact with Dapr runtime to handle state.
@@ -20,15 +19,15 @@ import java.util.Collection;
class DaprStateAsyncProvider {
/**
- * Shared Json serializer/deserializer as per Jackson's documentation, used only for this class.
+ * Shared Json Factory as per Jackson's documentation, used only for this class.
*/
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final JsonFactory JSON_FACTORY = new JsonFactory();
private final AppToDaprAsyncClient daprAsyncClient;
- private final ActorStateProviderSerializer serializer;
+ private final ActorStateSerializer serializer;
- DaprStateAsyncProvider(AppToDaprAsyncClient daprAsyncClient, ActorStateProviderSerializer serializer) {
+ DaprStateAsyncProvider(AppToDaprAsyncClient daprAsyncClient, ActorStateSerializer serializer) {
this.daprAsyncClient = daprAsyncClient;
this.serializer = serializer;
}
@@ -36,17 +35,15 @@ class DaprStateAsyncProvider {
Mono load(String actorType, String actorId, String stateName, Class clazz) {
Mono result = this.daprAsyncClient.getState(actorType, actorId, stateName);
- return result.map(s -> {
- if (s == null) {
- return (T)null;
- }
-
- try {
- return this.serializer.deserialize(s, clazz);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- });
+ return result
+ .filter(s -> (s != null) && (!s.isEmpty()))
+ .map(s -> {
+ try {
+ return this.serializer.deserialize(s, clazz);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
}
Mono contains(String actorType, String actorId, String stateName) {
@@ -79,50 +76,65 @@ class DaprStateAsyncProvider {
* @param stateChanges Collection of changes to be performed transactionally.
* @return Void.
*/
- Mono apply(String actorType, String actorId, Collection stateChanges)
+ Mono apply(String actorType, String actorId, ActorStateChange... stateChanges)
{
- if ((stateChanges == null) || stateChanges.isEmpty()) {
- return Mono.just(null);
+ if ((stateChanges == null) || stateChanges.length == 0) {
+ return Mono.empty();
}
- // Constructing the JSON "manually" to avoid creating transient classes to be parsed.
- ArrayNode operations = OBJECT_MAPPER.createArrayNode();
- for (ActorStateChange stateChange : stateChanges) {
- if ((stateChange == null) || (stateChange.getChangeKind() == null)) {
- continue;
- }
+ int count = 0;
+ // Constructing the JSON via a stream API to avoid creating transient objects to be instantiated.
+ String payload = null;
+ try (Writer writer = new StringWriter()) {
+ JsonGenerator generator = JSON_FACTORY.createGenerator(writer);
+ // Start array
+ generator.writeStartArray();
- String operationName = stateChange.getChangeKind().getDaprStateChangeOperation();
- if ((operationName == null) || (operationName.length() == 0)) {
- continue;
- }
-
- try {
- ObjectNode operation = OBJECT_MAPPER.createObjectNode();
- operation.set("operation", operation.textNode(operationName));
- ObjectNode request = OBJECT_MAPPER.createObjectNode();
- request.put("key", stateChange.getStateName());
- if ((stateChange.getChangeKind() == ActorStateChangeKind.UPDATE) || (stateChange.getChangeKind() == ActorStateChangeKind.ADD)) {
- request.put("value", this.serializer.serialize(stateChange.getValue()));
+ for (ActorStateChange stateChange : stateChanges) {
+ if ((stateChange == null) || (stateChange.getChangeKind() == null)) {
+ continue;
}
- operations.add(operation);
- } catch (IOException e) {
- e.printStackTrace();
- return Mono.error(e);
+ String operationName = stateChange.getChangeKind().getDaprStateChangeOperation();
+ if ((operationName == null) || (operationName.length() == 0)) {
+ continue;
+ }
+
+ count++;
+
+ // Start operation object.
+ generator.writeStartObject();
+ generator.writeStringField("operation", operationName);
+
+ // Start request object.
+ generator.writeObjectFieldStart("request");
+ generator.writeStringField("key", stateChange.getStateName());
+ if ((stateChange.getChangeKind() == ActorStateChangeKind.UPDATE) || (stateChange.getChangeKind() == ActorStateChangeKind.ADD)) {
+ generator.writeStringField("value", this.serializer.serialize(stateChange.getValue()));
+ }
+ // End request object.
+ generator.writeEndObject();
+
+ // End operation object.
+ generator.writeEndObject();
}
- }
- if (operations.size() == 0) {
- // No-op since there is no operation to be performed.
- Mono.just(null);
- }
+ // End array
+ generator.writeEndArray();
- try {
- return this.daprAsyncClient.saveStateTransactionally(actorType, actorId, OBJECT_MAPPER.writeValueAsString(operations));
- } catch (JsonProcessingException e) {
+ generator.close();
+ writer.flush();
+ payload = writer.toString();
+ } catch (IOException e) {
e.printStackTrace();
return Mono.error(e);
}
+
+ if (count == 0) {
+ // No-op since there is no operation to be performed.
+ Mono.empty();
+ }
+
+ return this.daprAsyncClient.saveStateTransactionally(actorType, actorId, payload);
}
}
diff --git a/sdk/src/test/java/io/dapr/actors/runtime/DaprStateAsyncProviderTest.java b/sdk/src/test/java/io/dapr/actors/runtime/DaprStateAsyncProviderTest.java
new file mode 100644
index 000000000..81d16a6f4
--- /dev/null
+++ b/sdk/src/test/java/io/dapr/actors/runtime/DaprStateAsyncProviderTest.java
@@ -0,0 +1,248 @@
+/*
+ * Copyright (c) Microsoft Corporation.
+ * Licensed under the MIT License.
+ */
+
+package io.dapr.actors.runtime;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.Assert;
+import org.junit.Test;
+import reactor.core.publisher.Mono;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for the state store facade.
+ */
+public class DaprStateAsyncProviderTest {
+
+ private static final ActorStateSerializer SERIALIZER = new ActorStateSerializer();
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ private static final double EPSILON = 1e-10;
+
+ /**
+ * Class used to test JSON serialization.
+ */
+ public static final class Customer {
+
+ private int id;
+
+ private String name;
+
+ public int getId() {
+ return id;
+ }
+
+ public Customer setId(int id) {
+ this.id = id;
+ return this;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public Customer setName(String name) {
+ this.name = name;
+ return this;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Customer customer = (Customer) o;
+ return id == customer.id &&
+ Objects.equals(name, customer.name);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(id, name);
+ }
+
+ }
+
+ @Test
+ public void happyCaseApply() {
+ AppToDaprAsyncClient daprAsyncClient = mock(AppToDaprAsyncClient.class);
+ when(daprAsyncClient
+ .saveStateTransactionally(
+ eq("MyActor"),
+ eq("123"),
+ argThat(s -> {
+ try {
+ JsonNode node = OBJECT_MAPPER.readTree(s);
+ if (node == null) {
+ return false;
+ }
+
+ if (node.size() != 3) {
+ return false;
+ }
+
+ boolean foundInsertName = false;
+ boolean foundUpdateZipcode = false;
+ boolean foundDeleteFlag = false;
+ for (JsonNode operation : node) {
+ if (operation.get("operation") == null) {
+ return false;
+ }
+ if (operation.get("request") == null) {
+ return false;
+ }
+
+ String opName = operation.get("operation").asText();
+ String key = operation.get("request").get("key").asText();
+ JsonNode valueNode = operation.get("request").get("value");
+
+ foundInsertName |= "upsert".equals(opName) &&
+ "name".equals(key) &&
+ "Jon Doe".equals(valueNode.asText());
+ foundUpdateZipcode |= "upsert".equals(opName) &&
+ "zipcode".equals(key) &&
+ "98011".equals(valueNode.asText());
+ foundDeleteFlag |= "delete".equals(opName) &&
+ "flag".equals(key) &&
+ (valueNode == null);
+ }
+
+ return foundInsertName && foundUpdateZipcode && foundDeleteFlag;
+ } catch (IOException e) {
+ e.printStackTrace();
+ return false;
+ }
+ })))
+ .thenReturn(Mono.empty());
+
+ DaprStateAsyncProvider provider = new DaprStateAsyncProvider(daprAsyncClient, SERIALIZER);
+ provider.apply("MyActor",
+ "123",
+ createInsertChange("name", "Jon Doe"),
+ createUpdateChange("zipcode", "98011"),
+ createDeleteChange("flag"))
+ .block();
+
+ verify(daprAsyncClient).saveStateTransactionally(eq("MyActor"), eq("123"), any());
+ }
+
+ @Test
+ public void happyCaseLoad() {
+ AppToDaprAsyncClient daprAsyncClient = mock(AppToDaprAsyncClient.class);
+ when(daprAsyncClient
+ .getState(any(), any(), eq("name")))
+ .thenReturn(Mono.just("Jon Doe"));
+ when(daprAsyncClient
+ .getState(any(), any(), eq("zipcode")))
+ .thenReturn(Mono.just("98021"));
+ when(daprAsyncClient
+ .getState(any(), any(), eq("goals")))
+ .thenReturn(Mono.just("98"));
+ when(daprAsyncClient
+ .getState(any(), any(), eq("balance")))
+ .thenReturn(Mono.just("46.55"));
+ when(daprAsyncClient
+ .getState(any(), any(), eq("active")))
+ .thenReturn(Mono.just("true"));
+ when(daprAsyncClient
+ .getState(any(), any(), eq("customer")))
+ .thenReturn(Mono.just("{ \"id\": 1000, \"name\": \"Roxane\"}"));
+ when(daprAsyncClient
+ .getState(any(), any(), eq("anotherCustomer")))
+ .thenReturn(Mono.just("{ \"id\": 2000, \"name\": \"Max\"}"));
+ when(daprAsyncClient
+ .getState(any(), any(), eq("nullCustomer")))
+ .thenReturn(Mono.just(""));
+
+ DaprStateAsyncProvider provider = new DaprStateAsyncProvider(daprAsyncClient, SERIALIZER);
+
+ Assert.assertEquals("Jon Doe",
+ provider.load("MyActor", "123", "name", String.class).block());
+ Assert.assertEquals("98021",
+ provider.load("MyActor", "123", "zipcode", String.class).block());
+ Assert.assertEquals(98,
+ (int) provider.load("MyActor", "123", "goals", int.class).block());
+ Assert.assertEquals(98,
+ (int) provider.load("MyActor", "123", "goals", int.class).block());
+ Assert.assertEquals(46.55,
+ (double) provider.load("MyActor", "123", "balance", double.class).block(),
+ EPSILON);
+ Assert.assertEquals(true,
+ (boolean) provider.load("MyActor", "123", "active", boolean.class).block());
+ Assert.assertEquals(new Customer().setId(1000).setName("Roxane"),
+ provider.load("MyActor", "123", "customer", Customer.class).block());
+ Assert.assertNotEquals(new Customer().setId(1000).setName("Roxane"),
+ provider.load("MyActor", "123", "anotherCustomer", Customer.class).block());
+ Assert.assertNull(provider.load("MyActor", "123", "nullCustomer", Customer.class).block());
+ }
+
+ @Test
+ public void happyCaseContains() {
+ AppToDaprAsyncClient daprAsyncClient = mock(AppToDaprAsyncClient.class);
+
+ // Keys that exists.
+ when(daprAsyncClient
+ .getState(any(), any(), eq("name")))
+ .thenReturn(Mono.just("Jon Doe"));
+ when(daprAsyncClient
+ .getState(any(), any(), eq("zipcode")))
+ .thenReturn(Mono.just("98021"));
+ when(daprAsyncClient
+ .getState(any(), any(), eq("goals")))
+ .thenReturn(Mono.just("98"));
+ when(daprAsyncClient
+ .getState(any(), any(), eq("balance")))
+ .thenReturn(Mono.just("46.55"));
+ when(daprAsyncClient
+ .getState(any(), any(), eq("active")))
+ .thenReturn(Mono.just("true"));
+ when(daprAsyncClient
+ .getState(any(), any(), eq("customer")))
+ .thenReturn(Mono.just("{ \"id\": \"3000\", \"name\": \"Ely\" }"));
+
+ // Keys that do not exist.
+ when(daprAsyncClient
+ .getState(any(), any(), eq("Does not exist")))
+ .thenReturn(Mono.just(""));
+ when(daprAsyncClient
+ .getState(any(), any(), eq("NAME")))
+ .thenReturn(Mono.just(""));
+ when(daprAsyncClient
+ .getState(any(), any(), eq(null)))
+ .thenReturn(Mono.just(""));
+
+ DaprStateAsyncProvider provider = new DaprStateAsyncProvider(daprAsyncClient, SERIALIZER);
+
+ Assert.assertTrue(provider.contains("MyActor", "123", "name").block());
+ Assert.assertFalse(provider.contains("MyActor", "123", "NAME").block());
+ Assert.assertTrue(provider.contains("MyActor", "123", "zipcode").block());
+ Assert.assertTrue(provider.contains("MyActor", "123", "goals").block());
+ Assert.assertTrue(provider.contains("MyActor", "123", "balance").block());
+ Assert.assertTrue(provider.contains("MyActor", "123", "active").block());
+ Assert.assertTrue(provider.contains("MyActor", "123", "customer").block());
+ Assert.assertFalse(provider.contains("MyActor", "123", "Does not exist").block());
+ Assert.assertFalse(provider.contains("MyActor", "123", null).block());
+ }
+
+ private final ActorStateChange createInsertChange(String name, T value) {
+ return new ActorStateChange(name, value, ActorStateChangeKind.ADD);
+ }
+
+ private final ActorStateChange createUpdateChange(String name, T value) {
+ return new ActorStateChange(name, value, ActorStateChangeKind.UPDATE);
+ }
+
+ private final ActorStateChange createDeleteChange(String name) {
+ return new ActorStateChange(name, null, ActorStateChangeKind.REMOVE);
+ }
+}