headers) {
+ return Mono.fromRunnable(() -> {
+ try {
+ // Dapr's event is compliant to CloudEvent.
+ CloudEvent envelope = CloudEvent.deserialize(body);
+
+ String message = envelope.getData() == null ? "" : new String(envelope.getData());
+ System.out.println("Subscriber got message: " + message);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+}
diff --git a/examples/src/main/java/io/dapr/examples/state/grpc/Example.java b/examples/src/main/java/io/dapr/examples/state/grpc/Example.java
index 7b9b20d19..9b01b2dfc 100644
--- a/examples/src/main/java/io/dapr/examples/state/grpc/Example.java
+++ b/examples/src/main/java/io/dapr/examples/state/grpc/Example.java
@@ -16,46 +16,46 @@ import java.util.UUID;
* dapr run --grpc-port 50001 -- mvn exec:java -pl=examples -Dexec.mainClass=io.dapr.examples.Example
*/
public class Example {
- public static void main(String[] args) {
- ManagedChannel channel =
- ManagedChannelBuilder.forAddress("localhost", 50001).usePlaintext().build();
- DaprBlockingStub client = DaprGrpc.newBlockingStub(channel);
+ public static void main(String[] args) {
+ ManagedChannel channel =
+ ManagedChannelBuilder.forAddress("localhost", 50001).usePlaintext().build();
+ DaprBlockingStub client = DaprGrpc.newBlockingStub(channel);
- String key = "mykey";
- // First, write key-value pair.
- {
- String value = UUID.randomUUID().toString();
- StateRequest req = StateRequest
- .newBuilder()
- .setKey(key)
- .setValue(Any.newBuilder().setValue(ByteString.copyFromUtf8(value)).build())
- .build();
- SaveStateEnvelope state = SaveStateEnvelope.newBuilder()
- .addRequests(req)
- .build();
- client.saveState(state);
- System.out.println("Saved!");
- }
-
- // Now, read it back.
- {
- GetStateEnvelope req = GetStateEnvelope
- .newBuilder()
- .setKey(key)
- .build();
- GetStateResponseEnvelope response = client.getState(req);
- String value = response.getData().getValue().toStringUtf8();
- System.out.println("Got: " + value);
- }
-
- // Then, delete it.
- {
- DeleteStateEnvelope req = DeleteStateEnvelope
- .newBuilder()
- .setKey(key)
- .build();
- client.deleteState(req);
- System.out.println("Deleted!");
- }
+ String key = "mykey";
+ // First, write key-value pair.
+ {
+ String value = UUID.randomUUID().toString();
+ StateRequest req = StateRequest
+ .newBuilder()
+ .setKey(key)
+ .setValue(Any.newBuilder().setValue(ByteString.copyFromUtf8(value)).build())
+ .build();
+ SaveStateEnvelope state = SaveStateEnvelope.newBuilder()
+ .addRequests(req)
+ .build();
+ client.saveState(state);
+ System.out.println("Saved!");
}
+
+ // Now, read it back.
+ {
+ GetStateEnvelope req = GetStateEnvelope
+ .newBuilder()
+ .setKey(key)
+ .build();
+ GetStateResponseEnvelope response = client.getState(req);
+ String value = response.getData().getValue().toStringUtf8();
+ System.out.println("Got: " + value);
+ }
+
+ // Then, delete it.
+ {
+ DeleteStateEnvelope req = DeleteStateEnvelope
+ .newBuilder()
+ .setKey(key)
+ .build();
+ client.deleteState(req);
+ System.out.println("Deleted!");
+ }
+ }
}
diff --git a/examples/src/main/java/io/dapr/examples/state/http/OrderManager.java b/examples/src/main/java/io/dapr/examples/state/http/OrderManager.java
index 05fa5d108..df2650f7e 100644
--- a/examples/src/main/java/io/dapr/examples/state/http/OrderManager.java
+++ b/examples/src/main/java/io/dapr/examples/state/http/OrderManager.java
@@ -2,6 +2,9 @@ package io.dapr.examples.state.http;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpServer;
+import io.dapr.client.DaprClient;
+import io.dapr.client.DaprClientBuilder;
+import io.dapr.serializer.DefaultObjectSerializer;
import org.json.JSONArray;
import org.json.JSONObject;
@@ -20,6 +23,8 @@ import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.nio.charset.Charset;
import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -27,117 +32,84 @@ import static java.lang.System.out;
/**
* OrderManager web app.
- *
+ *
* Based on the helloworld Node.js example in https://github.com/dapr/samples/blob/master/1.hello-world/app.js
- *
+ *
* To install jars into your local maven repo:
- * mvn clean install
- *
+ * mvn clean install
+ *
* To run (after step above):
- * dapr run --app-id orderapp --app-port 3000 --port 3500 -- mvn exec:java -pl=examples -Dexec.mainClass=io.dapr.examples.state.http.OrderManager
- *
+ * dapr run --app-id orderapp --app-port 3000 --port 3500 -- mvn exec:java -pl=examples -Dexec.mainClass=io.dapr.examples.state.http.OrderManager
+ *
* If this class changes, run this before running it again:
- * mvn compile
+ * mvn compile
*/
public class OrderManager {
- static HttpClient httpClient;
+ public static void main(String[] args) throws IOException {
+ int httpPort = 3001;
+ HttpServer httpServer = HttpServer.create(new InetSocketAddress(httpPort), 0);
- public static void main(String[] args) throws IOException {
- int httpPort = 3000;
- String daprPort = Optional.ofNullable(System.getenv("DAPR_HTTP_PORT")).orElse("3500");
- String stateUrl = String.format("http://localhost:%s/v1.0/state", daprPort);
- HttpServer httpServer = HttpServer.create(new InetSocketAddress(httpPort), 0);
+ DaprClient daprClient =
+ (new DaprClientBuilder(new DefaultObjectSerializer(), new DefaultObjectSerializer())).build();
- httpClient = HttpClient.newBuilder().version(Version.HTTP_1_1).followRedirects(Redirect.NORMAL)
- .connectTimeout(Duration.ofSeconds(2)).build();
-
- httpServer.createContext("/order").setHandler(e -> {
- out.println("Fetching order!");
- fetch(stateUrl + "/order").thenAccept(response -> {
- int resCode = response.statusCode() == 200 ? 200 : 500;
- String body = response.statusCode() == 200 ? response.body() : "Could not get state.";
-
- try {
- e.sendResponseHeaders(resCode, body.getBytes().length);
- OutputStream os = e.getResponseBody();
- try {
- os.write(body.getBytes());
- } finally {
- os.close();
- }
- } catch (IOException ioerror) {
- out.println(ioerror);
- }
- });
- });
-
- httpServer.createContext("/neworder").setHandler(e -> {
- try {
- out.println("Received new order ...");
- String json = readBody(e);
- JSONObject jsonObject = new JSONObject(json);
- JSONObject data = jsonObject.getJSONObject("data");
- String orderId = data.getString("orderId");
- out.printf("Got a new order! Order ID: %s\n", orderId);
-
- JSONObject item = new JSONObject();
- item.put("key", "order");
- item.put("value", data);
- JSONArray state = new JSONArray();
- state.put(item);
- out.printf("Writing to state: %s\n", state.toString());
-
- post(stateUrl, state.toString()).thenAccept(response -> {
- int resCode = response.statusCode() == 200 ? 200 : 500;
- String body = response.body();
- try {
- e.sendResponseHeaders(resCode, body.getBytes().length);
- OutputStream os = e.getResponseBody();
- try {
- os.write(body.getBytes());
- } finally {
- os.close();
- }
- } catch (IOException ioerror) {
- out.println(ioerror);
- }
- });
- } catch (IOException ioerror) {
- out.println(ioerror);
- }
- });
-
- httpServer.start();
- out.printf("Java App listening on port %s.", httpPort);
- }
-
- private static CompletableFuture> fetch(String url) {
- HttpRequest request = HttpRequest.newBuilder().uri(URI.create(url)).build();
- return httpClient.sendAsync(request, BodyHandlers.ofString());
- }
-
- private static CompletableFuture> post(String url, String body) {
- HttpRequest request = HttpRequest.newBuilder().uri(URI.create(url))
- .header("Content-Type", "application/json; charset=UTF-8").POST(BodyPublishers.ofString(body)).build();
-
- return httpClient.sendAsync(request, BodyHandlers.ofString());
- }
-
- private static String readBody(HttpExchange t) throws IOException {
- // retrieve the request json data
- InputStream is = t.getRequestBody();
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- byte[] buffer = new byte[1024];
- int len;
+ httpServer.createContext("/order").setHandler(e -> {
+ out.println("Fetching order!");
try {
- while ((len = is.read(buffer)) > 0)
- bos.write(buffer, 0, len);
- } catch (IOException e) {
- e.printStackTrace();
- } finally {
- bos.close();
+ byte[] data = daprClient.getState("order", String.class).block().getValue().getBytes();
+ e.getResponseHeaders().set("content-type", "application/json");
+ e.sendResponseHeaders(200, data.length);
+ e.getResponseBody().write(data);
+ e.getResponseBody().close();
+ } catch (IOException ioerror) {
+ out.println(ioerror);
+ e.sendResponseHeaders(500, ioerror.getMessage().getBytes().length);
+ e.getResponseBody().write(ioerror.getMessage().getBytes());
+ e.getResponseBody().close();
}
- return new String(bos.toByteArray(), Charset.forName("UTF-8"));
+ });
+
+ httpServer.createContext("/neworder").setHandler(e -> {
+ try {
+ out.println("Received new order ...");
+ String json = readBody(e);
+ JSONObject jsonObject = new JSONObject(json);
+ JSONObject data = jsonObject.getJSONObject("data");
+ String orderId = data.getString("orderId");
+ out.printf("Got a new order! Order ID: %s\n", orderId);
+
+ daprClient.saveState("order", data.toString()).block();
+
+ out.printf("Saved state: %s\n", data.toString());
+ e.sendResponseHeaders(200, 0);
+ e.getResponseBody().write(new byte[0]);
+ e.getResponseBody().close();
+ } catch (IOException ioerror) {
+ out.println(ioerror);
+ e.sendResponseHeaders(500, ioerror.getMessage().getBytes().length);
+ e.getResponseBody().write(ioerror.getMessage().getBytes());
+ e.getResponseBody().close();
+ }
+ });
+
+ httpServer.start();
+ out.printf("Java App listening on port %s.", httpPort);
+ }
+
+ private static String readBody(HttpExchange t) throws IOException {
+ // retrieve the request json data
+ InputStream is = t.getRequestBody();
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ byte[] buffer = new byte[1024];
+ int len;
+ try {
+ while ((len = is.read(buffer)) > 0)
+ bos.write(buffer, 0, len);
+ } catch (IOException e) {
+ e.printStackTrace();
+ } finally {
+ bos.close();
}
+ return new String(bos.toByteArray(), Charset.forName("UTF-8"));
+ }
}
diff --git a/examples/src/main/java/io/dapr/springboot/DaprApplication.java b/examples/src/main/java/io/dapr/springboot/DaprApplication.java
new file mode 100644
index 000000000..22d22ea6b
--- /dev/null
+++ b/examples/src/main/java/io/dapr/springboot/DaprApplication.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright (c) Microsoft Corporation.
+ * Licensed under the MIT License.
+ */
+
+package io.dapr.springboot;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+/**
+ * Dapr's HTTP callback implementation via SpringBoot.
+ */
+@SpringBootApplication(scanBasePackages = {"io.dapr.springboot", "io.dapr.examples"})
+public class DaprApplication {
+
+ /**
+ * Starts Dapr's callback in a given port.
+ * @param port Port to listen to.
+ */
+ public static void start(int port) {
+ SpringApplication app = new SpringApplication(DaprApplication.class);
+ app.run(String.format("--server.port=%d", port));
+ }
+
+}
diff --git a/examples/src/main/java/io/dapr/springboot/DaprController.java b/examples/src/main/java/io/dapr/springboot/DaprController.java
new file mode 100644
index 000000000..1f1ffa018
--- /dev/null
+++ b/examples/src/main/java/io/dapr/springboot/DaprController.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright (c) Microsoft Corporation.
+ * Licensed under the MIT License.
+ */
+
+package io.dapr.springboot;
+
+import io.dapr.actors.runtime.ActorRuntime;
+import org.springframework.web.bind.annotation.*;
+import reactor.core.publisher.Mono;
+
+/**
+ * SpringBoot Controller to handle callback APIs for Dapr.
+ */
+@RestController
+public class DaprController {
+
+ @GetMapping("/")
+ public String index() {
+ return "Greetings from Dapr!";
+ }
+
+ @GetMapping("/dapr/config")
+ public byte[] daprConfig() throws Exception {
+ return ActorRuntime.getInstance().serializeConfig();
+ }
+
+ @PostMapping(path = "/actors/{type}/{id}")
+ public Mono activateActor(@PathVariable("type") String type,
+ @PathVariable("id") String id) throws Exception {
+ return ActorRuntime.getInstance().activate(type, id);
+ }
+
+ @DeleteMapping(path = "/actors/{type}/{id}")
+ public Mono deactivateActor(@PathVariable("type") String type,
+ @PathVariable("id") String id) throws Exception {
+ return ActorRuntime.getInstance().deactivate(type, id);
+ }
+
+ @PutMapping(path = "/actors/{type}/{id}/method/{method}")
+ public Mono invokeActorMethod(@PathVariable("type") String type,
+ @PathVariable("id") String id,
+ @PathVariable("method") String method,
+ @RequestBody(required = false) byte[] body) {
+ return ActorRuntime.getInstance().invoke(type, id, method, body);
+ }
+
+ @PutMapping(path = "/actors/{type}/{id}/method/timer/{timer}")
+ public Mono invokeActorTimer(@PathVariable("type") String type,
+ @PathVariable("id") String id,
+ @PathVariable("timer") String timer) {
+ return ActorRuntime.getInstance().invokeTimer(type, id, timer);
+ }
+
+ @PutMapping(path = "/actors/{type}/{id}/method/remind/{reminder}")
+ public Mono invokeActorReminder(@PathVariable("type") String type,
+ @PathVariable("id") String id,
+ @PathVariable("reminder") String reminder,
+ @RequestBody(required = false) byte[] body) {
+ return ActorRuntime.getInstance().invokeReminder(type, id, reminder, body);
+ }
+
+}
diff --git a/examples/src/main/resources/img/exposer-service.png b/examples/src/main/resources/img/exposer-service.png
new file mode 100644
index 000000000..8034d839a
Binary files /dev/null and b/examples/src/main/resources/img/exposer-service.png differ
diff --git a/examples/src/main/resources/img/inputbinding.png b/examples/src/main/resources/img/inputbinding.png
new file mode 100644
index 000000000..662b798bf
Binary files /dev/null and b/examples/src/main/resources/img/inputbinding.png differ
diff --git a/examples/src/main/resources/img/outputbinding.png b/examples/src/main/resources/img/outputbinding.png
new file mode 100644
index 000000000..a9ed04b1f
Binary files /dev/null and b/examples/src/main/resources/img/outputbinding.png differ
diff --git a/examples/src/main/resources/img/publisher.png b/examples/src/main/resources/img/publisher.png
new file mode 100644
index 000000000..cea3d8e60
Binary files /dev/null and b/examples/src/main/resources/img/publisher.png differ
diff --git a/examples/src/main/resources/img/subscriber.png b/examples/src/main/resources/img/subscriber.png
new file mode 100644
index 000000000..506456d98
Binary files /dev/null and b/examples/src/main/resources/img/subscriber.png differ
diff --git a/pom.xml b/pom.xml
index fe5af2040..24a4907f8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -7,7 +7,7 @@
io.dapr
dapr-sdk-parent
pom
- 0.3.0-alpha
+ 0.2.0-SNAPSHOT
dapr-sdk-parent
SDK for Dapr.
https://dapr.io
@@ -22,8 +22,23 @@
1.8
8
8
+ true
+ true
+
+
+
+ ossrh
+ https://oss.sonatype.org/content/repositories/snapshots
+
+
+
+
+
@@ -33,6 +48,11 @@
pom
import
+
+ io.grpc
+ grpc-api
+ ${grpc.version}
+
javax.annotation
javax.annotation-api
@@ -54,6 +74,87 @@
+
+
+
+ org.apache.maven.plugins
+ maven-gpg-plugin
+ 1.6
+
+
+ sign-artifacts
+ verify
+
+ sign
+
+
+
+ --batch
+ --pinentry-mode
+ loopback
+
+
+
+
+
+
+ org.sonatype.plugins
+ nexus-staging-maven-plugin
+ 1.6.8
+ true
+
+ ossrh
+ https://oss.sonatype.org/
+ true
+
+
+
+ org.codehaus.mojo
+ failsafe-maven-plugin
+ 2.4.3-alpha-1
+
+
+
+ integration-test
+ verify
+
+
+ ${skipITs}
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-checkstyle-plugin
+ 3.1.0
+
+ checkstyle.xml
+ UTF-8
+ true
+ false
+ false
+
+
+
+ validate
+ validate
+
+ check
+
+
+
+
+
+ com.puppycrawl.tools
+ checkstyle
+ 8.27
+
+
+
+
+
+
MIT License
@@ -71,13 +172,15 @@
- scm:git:git://github.com/dapr/java-sdk.git
- scm:git:ssh://github.com:dapr/java-sdk.git
- http://github.com/dapr/java-sdk
+ https://github.com/dapr/java-sdk
+ scm:git:https://github.com/dapr/java-sdk.git
+ HEAD
+ sdk-autogen
sdk
+ sdk-actors
examples
diff --git a/sdk-actors/pom.xml b/sdk-actors/pom.xml
new file mode 100644
index 000000000..7f6868eca
--- /dev/null
+++ b/sdk-actors/pom.xml
@@ -0,0 +1,158 @@
+
+ 4.0.0
+
+
+ io.dapr
+ dapr-sdk-parent
+ 0.2.0-SNAPSHOT
+
+
+ dapr-sdk-actors
+ jar
+ 0.2.0-SNAPSHOT
+ dapr-sdk-actors
+ SDK for Actors on Dapr
+
+
+
+
+ false
+
+ central
+ libs-release
+ https://repo.spring.io/libs-release
+
+
+
+
+ true
+ false
+
+
+
+
+ io.dapr
+ dapr-sdk
+ ${project.version}
+
+
+ junit
+ junit
+ test
+
+
+ org.mockito
+ mockito-core
+ test
+
+
+ com.github.gmazzo
+ okhttp-mock
+ 1.3.2
+ test
+
+
+ org.junit.jupiter
+ junit-jupiter-api
+ 5.5.2
+ test
+
+
+ com.github.stefanbirkner
+ system-rules
+ 1.19.0
+ test
+
+
+ commons-cli
+ commons-cli
+ 1.4
+ test
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+ 2.2.2.RELEASE
+ test
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-source-plugin
+ 3.2.0
+
+
+ attach-sources
+
+ jar-no-fork
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-javadoc-plugin
+ 3.1.1
+
+
+ attach-javadocs
+
+ jar
+
+
+
+
+
+ org.jacoco
+ jacoco-maven-plugin
+ 0.8.4
+
+
+ default-prepare-agent
+
+ prepare-agent
+
+
+
+ report
+ test
+
+ report
+
+
+ target/jacoco-report/
+
+
+
+ check
+
+ check
+
+
+
+
+ BUNDLE
+
+
+ LINE
+ COVEREDRATIO
+ 80%
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/sdk-actors/src/main/java/io/dapr/actors/ActorId.java b/sdk-actors/src/main/java/io/dapr/actors/ActorId.java
new file mode 100644
index 000000000..27c1198fb
--- /dev/null
+++ b/sdk-actors/src/main/java/io/dapr/actors/ActorId.java
@@ -0,0 +1,129 @@
+/*
+ * Copyright (c) Microsoft Corporation.
+ * Licensed under the MIT License.
+ */
+
+package io.dapr.actors;
+
+import java.util.UUID;
+
+/**
+ * The ActorId represents the identity of an actor within an actor service.
+ */
+public class ActorId extends Object implements Comparable {
+
+ /**
+ * The ID of the actor as a String.
+ */
+ private final String stringId;
+
+ /**
+ * An error message for an invalid constructor arg.
+ */
+ private final String errorMsg = "actor needs to be initialized with an id!";
+
+ /**
+ * Initializes a new instance of the ActorId class with the id passed in.
+ *
+ * @param id Value for actor id
+ */
+ public ActorId(String id) {
+ if (id != null) {
+ this.stringId = id;
+ } else {
+ throw new IllegalArgumentException(errorMsg);
+ }
+ }
+
+ /**
+ * Returns the String representation of this Actor's identifier.
+ *
+ * @return The String representation of this ActorId
+ */
+ @Override
+ public String toString() {
+ return this.stringId;
+ }
+
+ /**
+ * Compares this instance with a specified {link #ActorId} object and
+ * indicates whether this instance precedes, follows, or appears in the same
+ * position in the sort order as the specified actorId.
+ * The comparison is done based on the id if both the instances.
+ *
+ * @param other The actorId to compare with this instance.
+ * @return A 32-bit signed integer that indicates whether this instance
+ * precedes, follows, or appears in the same position in the sort order as the
+ * other parameter.
+ */
+ @Override
+ public int compareTo(ActorId other) {
+ return (other == null) ? 1
+ : compareContent(this, other);
+ }
+
+ /**
+ * Calculates the hash code for this ActorId.
+ *
+ * @return The hash code of this ActorId.
+ */
+ @Override
+ public int hashCode() {
+ return this.stringId.hashCode();
+ }
+
+ /**
+ * Compare if the content of two ids are the same.
+ *
+ * @param id1 One identifier.
+ * @param id2 Another identifier.
+ * @return -1, 0, or 1 depending on the compare result of the stringId member.
+ */
+ private int compareContent(ActorId id1, ActorId id2) {
+ return id1.stringId.compareTo(id2.stringId);
+ }
+
+ /**
+ * Checks if this instance is equals to the other instance.
+ *
+ * @return true if the 2 ActorId's are equal.
+ */
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+
+ if (obj == null) {
+ return false;
+ }
+
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+
+ return hasEqualContent(this, (ActorId) obj);
+ }
+
+ /**
+ * Creates a new ActorId with a random id.
+ *
+ * @return A new ActorId with a random id.
+ */
+ public static ActorId createRandom() {
+ UUID id = UUID.randomUUID();
+ return new ActorId(id.toString());
+ }
+
+
+ /**
+ * Compares if two actors have the same content.
+ *
+ * @param id1 One identifier.
+ * @param id2 Another identifier.
+ * @return true if the two ActorId's are equal
+ */
+ private static boolean hasEqualContent(ActorId id1, ActorId id2) {
+ return id1.stringId.equals(id2.stringId);
+ }
+}
diff --git a/sdk-actors/src/main/java/io/dapr/actors/ActorTrace.java b/sdk-actors/src/main/java/io/dapr/actors/ActorTrace.java
new file mode 100644
index 000000000..8a65a9548
--- /dev/null
+++ b/sdk-actors/src/main/java/io/dapr/actors/ActorTrace.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright (c) Microsoft Corporation.
+ * Licensed under the MIT License.
+ */
+
+package io.dapr.actors;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+// TODO: Implement distributed tracing.
+// TODO: Make this generic to the SDK and not only for Actors.
+
+/**
+ * Class to emit trace log messages.
+ */
+public final class ActorTrace {
+
+ /**
+ * Gets the default Logger.
+ */
+ private static final Logger LOGGER = Logger.getLogger(ActorTrace.class.getName());
+
+ /**
+ * Writes an information trace log.
+ *
+ * @param type Type of log.
+ * @param id Instance identifier.
+ * @param msgFormat Message or message format (with type and id input as well).
+ * @param params Params for the message.
+ */
+ public void writeInfo(String type, String id, String msgFormat, Object... params) {
+ this.write(Level.INFO, type, id, msgFormat, params);
+ }
+
+ /**
+ * Writes an warning trace log.
+ *
+ * @param type Type of log.
+ * @param id Instance identifier.
+ * @param msgFormat Message or message format (with type and id input as well).
+ * @param params Params for the message.
+ */
+ public void writeWarning(String type, String id, String msgFormat, Object... params) {
+ this.write(Level.WARNING, type, id, msgFormat, params);
+ }
+
+ /**
+ * Writes an error trace log.
+ *
+ * @param type Type of log.
+ * @param id Instance identifier.
+ * @param msgFormat Message or message format (with type and id input as well).
+ * @param params Params for the message.
+ */
+ public void writeError(String type, String id, String msgFormat, Object... params) {
+ this.write(Level.SEVERE, type, id, msgFormat, params);
+ }
+
+ /**
+ * Writes a trace log.
+ *
+ * @param level Severity level of the log.
+ * @param type Type of log.
+ * @param id Instance identifier.
+ * @param msgFormat Message or message format (with type and id input as well).
+ * @param params Params for the message.
+ */
+ private void write(Level level, String type, String id, String msgFormat, Object... params) {
+ String formatString = String.format("%s:%s %s", emptyIfNul(type), emptyIfNul(id), emptyIfNul(msgFormat));
+ if ((params == null) || (params.length == 0)) {
+ LOGGER.log(level, formatString);
+ } else {
+ LOGGER.log(level, String.format(formatString, params));
+ }
+ }
+
+ /**
+ * Utility method that returns empty if String is null.
+ *
+ * @param s String to be checked.
+ * @return String (if not null) or empty (if null).
+ */
+ private static String emptyIfNul(String s) {
+ if (s == null) {
+ return "";
+ }
+
+ return s;
+ }
+}
diff --git a/sdk-actors/src/main/java/io/dapr/actors/client/ActorProxy.java b/sdk-actors/src/main/java/io/dapr/actors/client/ActorProxy.java
new file mode 100644
index 000000000..6955c3796
--- /dev/null
+++ b/sdk-actors/src/main/java/io/dapr/actors/client/ActorProxy.java
@@ -0,0 +1,63 @@
+package io.dapr.actors.client;
+
+import io.dapr.actors.ActorId;
+import reactor.core.publisher.Mono;
+
+/**
+ * Proxy to communicate to a given Actor instance in Dapr.
+ */
+public interface ActorProxy {
+
+ /**
+ * Returns the ActorId associated with the proxy object.
+ *
+ * @return An ActorId object.
+ */
+ ActorId getActorId();
+
+ /**
+ * Returns actor implementation type of the actor associated with the proxy object.
+ *
+ * @return Actor's type name.
+ */
+ String getActorType();
+
+ /**
+ * Invokes an Actor method on Dapr.
+ *
+ * @param methodName Method name to invoke.
+ * @param clazz The type of the return class.
+ * @param The type to be returned.
+ * @return Asynchronous result with the Actor's response.
+ */
+ Mono invokeActorMethod(String methodName, Class clazz);
+
+ /**
+ * Invokes an Actor method on Dapr.
+ *
+ * @param methodName Method name to invoke.
+ * @param data Object with the data.
+ * @param clazz The type of the return class.
+ * @param The type to be returned.
+ * @return Asynchronous result with the Actor's response.
+ */
+ Mono invokeActorMethod(String methodName, Object data, Class clazz);
+
+ /**
+ * Invokes an Actor method on Dapr.
+ *
+ * @param methodName Method name to invoke.
+ * @return Asynchronous result with the Actor's response.
+ */
+ Mono invokeActorMethod(String methodName);
+
+ /**
+ * Invokes an Actor method on Dapr.
+ *
+ * @param methodName Method name to invoke.
+ * @param data Object with the data.
+ * @return Asynchronous result with the Actor's response.
+ */
+ Mono invokeActorMethod(String methodName, Object data);
+
+}
diff --git a/sdk-actors/src/main/java/io/dapr/actors/client/ActorProxyBuilder.java b/sdk-actors/src/main/java/io/dapr/actors/client/ActorProxyBuilder.java
new file mode 100644
index 000000000..37ed0a909
--- /dev/null
+++ b/sdk-actors/src/main/java/io/dapr/actors/client/ActorProxyBuilder.java
@@ -0,0 +1,63 @@
+package io.dapr.actors.client;
+
+import io.dapr.actors.ActorId;
+import io.dapr.client.DaprHttpBuilder;
+import io.dapr.serializer.DaprObjectSerializer;
+
+/**
+ * Builder to generate an ActorProxy instance. Builder can be reused for multiple instances.
+ */
+public class ActorProxyBuilder {
+
+ /**
+ * Builder for Dapr's raw http client.
+ */
+ private final DaprHttpBuilder daprHttpBuilder = new DaprHttpBuilder();
+
+ /**
+ * Actor's type.
+ */
+ private final String actorType;
+
+ /**
+ * Dapr's object serializer.
+ */
+ private final DaprObjectSerializer objectSerializer;
+
+ /**
+ * Instantiates a new builder for a given Actor type.
+ *
+ * @param actorType Actor's type.
+ * @param objectSerializer Serializer for objects sent/received.
+ */
+ public ActorProxyBuilder(String actorType, DaprObjectSerializer objectSerializer) {
+ if ((actorType == null) || actorType.isEmpty()) {
+ throw new IllegalArgumentException("ActorType is required.");
+ }
+ if (objectSerializer == null) {
+ throw new IllegalArgumentException("Serializer is required.");
+ }
+
+ this.actorType = actorType;
+ this.objectSerializer = objectSerializer;
+ }
+
+ /**
+ * Instantiates a new ActorProxy.
+ *
+ * @param actorId Actor's identifier.
+ * @return New instance of ActorProxy.
+ */
+ public ActorProxy build(ActorId actorId) {
+ if (actorId == null) {
+ throw new IllegalArgumentException("Cannot instantiate an Actor without Id.");
+ }
+
+ return new ActorProxyImpl(
+ this.actorType,
+ actorId,
+ this.objectSerializer,
+ new DaprHttpClient(this.daprHttpBuilder.build()));
+ }
+
+}
diff --git a/sdk-actors/src/main/java/io/dapr/actors/client/ActorProxyImpl.java b/sdk-actors/src/main/java/io/dapr/actors/client/ActorProxyImpl.java
new file mode 100644
index 000000000..de354ab6c
--- /dev/null
+++ b/sdk-actors/src/main/java/io/dapr/actors/client/ActorProxyImpl.java
@@ -0,0 +1,136 @@
+package io.dapr.actors.client;
+
+import io.dapr.actors.ActorId;
+import io.dapr.actors.runtime.ObjectSerializer;
+import io.dapr.serializer.DaprObjectSerializer;
+import java.io.IOException;
+import reactor.core.publisher.Mono;
+
+/**
+ * Implements a proxy client for an Actor's instance.
+ */
+class ActorProxyImpl implements ActorProxy {
+
+ /**
+ * Serializer used for internal objects.
+ */
+ private static final ObjectSerializer INTERNAL_SERIALIZER = new ObjectSerializer();
+
+ /**
+ * Actor's identifier for this Actor instance.
+ */
+ private final ActorId actorId;
+
+ /**
+ * Actor's type for this Actor instance.
+ */
+ private final String actorType;
+
+ /**
+ * Serializer/deserialzier to exchange message for Actors.
+ */
+ private final DaprObjectSerializer serializer;
+
+ /**
+ * Client to talk to the Dapr's API.
+ */
+ private final DaprClient daprClient;
+
+ /**
+ * Creates a new instance of {@link ActorProxyImpl}.
+ *
+ * @param actorType actor implementation type of the actor associated with the proxy object.
+ * @param actorId The actorId associated with the proxy
+ * @param serializer Serializer and deserializer for method calls.
+ * @param daprClient Dapr client.
+ */
+ ActorProxyImpl(String actorType, ActorId actorId, DaprObjectSerializer serializer, DaprClient daprClient) {
+ this.actorType = actorType;
+ this.actorId = actorId;
+ this.daprClient = daprClient;
+ this.serializer = serializer;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public ActorId getActorId() {
+ return actorId;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public String getActorType() {
+ return actorType;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Mono invokeActorMethod(String methodName, Object data, Class clazz) {
+ return this.daprClient.invokeActorMethod(actorType, actorId.toString(), methodName, this.wrap(data))
+ .filter(s -> s.length > 0)
+ .map(s -> unwrap(s, clazz));
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Mono invokeActorMethod(String methodName, Class clazz) {
+ return this.daprClient.invokeActorMethod(actorType, actorId.toString(), methodName, null)
+ .filter(s -> s.length > 0)
+ .map(s -> unwrap(s, clazz));
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Mono invokeActorMethod(String methodName) {
+ return this.daprClient.invokeActorMethod(actorType, actorId.toString(), methodName, null).then();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Mono invokeActorMethod(String methodName, Object data) {
+ return this.daprClient.invokeActorMethod(actorType, actorId.toString(), methodName, this.wrap(data)).then();
+ }
+
+ /**
+ * Extracts the response object from the Actor's method result.
+ *
+ * @param response response returned by API.
+ * @param clazz Expected response class.
+ * @param Expected response type.
+ * @return Response object or null.
+ * @throws RuntimeException In case it cannot generate Object.
+ */
+ private T unwrap(final byte[] response, Class clazz) {
+ try {
+ return this.serializer.deserialize(INTERNAL_SERIALIZER.unwrapData(response), clazz);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Builds the request to invoke an API for Actors.
+ *
+ * @param request Request object for the original Actor's method.
+ * @return Payload to be sent to Dapr's API.
+ * @throws RuntimeException In case it cannot generate payload.
+ */
+ private byte[] wrap(final Object request) {
+ try {
+ return INTERNAL_SERIALIZER.wrapData(this.serializer.serialize(request));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+}
diff --git a/sdk-actors/src/main/java/io/dapr/actors/client/DaprClient.java b/sdk-actors/src/main/java/io/dapr/actors/client/DaprClient.java
new file mode 100644
index 000000000..5871f4cdc
--- /dev/null
+++ b/sdk-actors/src/main/java/io/dapr/actors/client/DaprClient.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright (c) Microsoft Corporation.
+ * Licensed under the MIT License.
+ */
+
+package io.dapr.actors.client;
+
+import reactor.core.publisher.Mono;
+
+/**
+ * Generic Client Adapter to be used regardless of the GRPC or the HTTP Client implementation required.
+ */
+interface DaprClient {
+
+ /**
+ * Invokes an Actor method on Dapr.
+ *
+ * @param actorType Type of actor.
+ * @param actorId Actor Identifier.
+ * @param methodName Method name to invoke.
+ * @param jsonPayload Serialized body.
+ * @return Asynchronous result with the Actor's response.
+ */
+ Mono invokeActorMethod(String actorType, String actorId, String methodName, byte[] jsonPayload);
+
+}
diff --git a/sdk-actors/src/main/java/io/dapr/actors/client/DaprHttpClient.java b/sdk-actors/src/main/java/io/dapr/actors/client/DaprHttpClient.java
new file mode 100644
index 000000000..45d109349
--- /dev/null
+++ b/sdk-actors/src/main/java/io/dapr/actors/client/DaprHttpClient.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) Microsoft Corporation.
+ * Licensed under the MIT License.
+ */
+
+package io.dapr.actors.client;
+
+import io.dapr.client.DaprHttp;
+import io.dapr.utils.Constants;
+import reactor.core.publisher.Mono;
+
+/**
+ * DaprClient over HTTP for actor client.
+ *
+ * @see DaprHttp
+ */
+class DaprHttpClient implements DaprClient {
+
+ /**
+ * The HTTP client to be used.
+ *
+ * @see DaprHttp
+ */
+ private final DaprHttp client;
+
+ /**
+ * Instantiates a new Dapr Http Client to invoke Actors.
+ *
+ * @param client Dapr's http client.
+ */
+ DaprHttpClient(DaprHttp client) {
+ this.client = client;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Mono invokeActorMethod(String actorType, String actorId, String methodName, byte[] jsonPayload) {
+ String url = String.format(Constants.ACTOR_METHOD_RELATIVE_URL_FORMAT, actorType, actorId, methodName);
+ Mono responseMono =
+ this.client.invokeAPI(DaprHttp.HttpMethods.POST.name(), url, null, jsonPayload, null);
+ return responseMono.map(r -> r.getBody());
+ }
+
+}
diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/AbstractActor.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/AbstractActor.java
new file mode 100644
index 000000000..a9662ac86
--- /dev/null
+++ b/sdk-actors/src/main/java/io/dapr/actors/runtime/AbstractActor.java
@@ -0,0 +1,349 @@
+/*
+ * Copyright (c) Microsoft Corporation.
+ * Licensed under the MIT License.
+ */
+
+package io.dapr.actors.runtime;
+
+import io.dapr.actors.ActorId;
+import io.dapr.actors.ActorTrace;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import reactor.core.publisher.Mono;
+
+/**
+ * Represents the base class for actors.
+ * The base type for actors, that provides the common functionality for actors.
+ * The state is preserved across actor garbage collections and fail-overs.
+ */
+public abstract class AbstractActor {
+
+ private static final ObjectSerializer INTERNAL_SERIALIZER = new ObjectSerializer();
+
+ /**
+ * Type of tracing messages.
+ */
+ private static final String TRACE_TYPE = "Actor";
+
+ /**
+ * Context for the Actor runtime.
+ */
+ private final ActorRuntimeContext> actorRuntimeContext;
+
+ /**
+ * Actor identifier.
+ */
+ private final ActorId id;
+
+ /**
+ * Emits trace messages for Actors.
+ */
+ private final ActorTrace actorTrace;
+
+ /**
+ * Registered timers for this Actor.
+ */
+ private final Map timers;
+
+ /**
+ * Manager for the states in Actors.
+ */
+ private final ActorStateManager actorStateManager;
+
+ /**
+ * Internal control to assert method invocation on start and finish in this SDK.
+ */
+ private boolean started;
+
+ /**
+ * Instantiates a new Actor.
+ *
+ * @param runtimeContext Context for the runtime.
+ * @param id Actor identifier.
+ */
+ protected AbstractActor(ActorRuntimeContext runtimeContext, ActorId id) {
+ this.actorRuntimeContext = runtimeContext;
+ this.id = id;
+ this.actorStateManager = new ActorStateManager(
+ runtimeContext.getStateProvider(),
+ runtimeContext.getActorTypeInformation().getName(),
+ id);
+ this.actorTrace = runtimeContext.getActorTrace();
+ this.timers = Collections.synchronizedMap(new HashMap<>());
+ this.started = false;
+ }
+
+ /**
+ * Returns the id of the actor.
+ *
+ * @return Actor id.
+ */
+ protected ActorId getId() {
+ return this.id;
+ }
+
+ /**
+ * Returns the state store manager for this Actor.
+ *
+ * @return State store manager for this Actor
+ */
+ protected ActorStateManager getActorStateManager() {
+ return this.actorStateManager;
+ }
+
+ /**
+ * Registers a reminder for this Actor.
+ *
+ * @param reminderName Name of the reminder.
+ * @param state State to be send along with reminder triggers.
+ * @param dueTime Due time for the first trigger.
+ * @param period Frequency for the triggers.
+ * @param Type of the state object.
+ * @return Asynchronous void response.
+ */
+ protected Mono registerReminder(
+ String reminderName,
+ T state,
+ Duration dueTime,
+ Duration period) {
+ try {
+ byte[] data = this.actorRuntimeContext.getObjectSerializer().serialize(state);
+ ActorReminderParams params = new ActorReminderParams(data, dueTime, period);
+ byte[] serialized = INTERNAL_SERIALIZER.serialize(params);
+ return this.actorRuntimeContext.getDaprClient().registerActorReminder(
+ this.actorRuntimeContext.getActorTypeInformation().getName(),
+ this.id.toString(),
+ reminderName,
+ serialized);
+ } catch (IOException e) {
+ return Mono.error(e);
+ }
+ }
+
+ /**
+ * Registers a Timer for the actor. A timer name is autogenerated by the runtime to keep track of it.
+ *
+ * @param timerName Name of the timer, unique per Actor (auto-generated if null).
+ * @param callback Name of the method to be called.
+ * @param state State to be passed it to the method when timer triggers.
+ * @param dueTime The amount of time to delay before the async callback is first invoked.
+ * Specify negative one (-1) milliseconds to prevent the timer from starting.
+ * Specify zero (0) to start the timer immediately.
+ * @param period The time interval between invocations of the async callback.
+ * Specify negative one (-1) milliseconds to disable periodic signaling.
+ * @param Type for the state to be passed in to timer.
+ * @return Asynchronous result.
+ */
+ protected Mono registerActorTimer(
+ String timerName,
+ String callback,
+ T state,
+ Duration dueTime,
+ Duration period) {
+ return Mono.fromSupplier(() -> {
+ if ((callback == null) || callback.isEmpty()) {
+ throw new IllegalArgumentException("Timer requires a callback function.");
+ }
+
+ String name = timerName;
+ if ((timerName == null) || (timerName.isEmpty())) {
+ name = String.format("%s_Timer_%d", this.id.toString(), this.timers.size() + 1);
+ }
+
+ ActorTimer actorTimer = new ActorTimer(this, name, callback, state, dueTime, period);
+ this.timers.put(name, actorTimer);
+ return actorTimer;
+ }).flatMap(actorTimer -> {
+ try {
+ return this.actorRuntimeContext.getDaprClient().registerActorTimer(
+ this.actorRuntimeContext.getActorTypeInformation().getName(),
+ this.id.toString(),
+ actorTimer.getName(),
+ INTERNAL_SERIALIZER.serialize(actorTimer));
+ } catch (Exception e) {
+ return Mono.error(e);
+ }
+ });
+ }
+
+ /**
+ * Unregisters an Actor timer.
+ *
+ * @param timerName Name of Timer to be unregistered.
+ * @return Asynchronous void response.
+ */
+ protected Mono unregisterTimer(String timerName) {
+ return Mono.fromSupplier(() -> getActorTimer(timerName))
+ .flatMap(actorTimer -> this.actorRuntimeContext.getDaprClient().unregisterActorTimer(
+ this.actorRuntimeContext.getActorTypeInformation().getName(),
+ this.id.toString(),
+ timerName))
+ .then(Mono.fromRunnable(() -> this.timers.remove(timerName)));
+ }
+
+ /**
+ * Unregisters a Reminder.
+ *
+ * @param reminderName Name of Reminder to be unregistered.
+ * @return Asynchronous void response.
+ */
+ protected Mono unregisterReminder(String reminderName) {
+ return this.actorRuntimeContext.getDaprClient().unregisterActorReminder(
+ this.actorRuntimeContext.getActorTypeInformation().getName(),
+ this.id.toString(),
+ reminderName);
+ }
+
+ /**
+ * Callback function invoked after an Actor has been activated.
+ *
+ * @return Asynchronous void response.
+ */
+ protected Mono onActivate() {
+ return Mono.empty();
+ }
+
+ /**
+ * Callback function invoked after an Actor has been deactivated.
+ *
+ * @return Asynchronous void response.
+ */
+ protected Mono onDeactivate() {
+ return Mono.empty();
+ }
+
+ /**
+ * Callback function invoked before method is invoked.
+ *
+ * @param actorMethodContext Method context.
+ * @return Asynchronous void response.
+ */
+ protected Mono onPreActorMethod(ActorMethodContext actorMethodContext) {
+ return Mono.empty();
+ }
+
+ /**
+ * Callback function invoked after method is invoked.
+ *
+ * @param actorMethodContext Method context.
+ * @return Asynchronous void response.
+ */
+ protected Mono onPostActorMethod(ActorMethodContext actorMethodContext) {
+ return Mono.empty();
+ }
+
+ /**
+ * Saves the state of this Actor.
+ *
+ * @return Asynchronous void response.
+ */
+ protected Mono saveState() {
+ return this.actorStateManager.save();
+ }
+
+ /**
+ * Resets the cached state of this Actor.
+ */
+ void rollback() {
+ if (!this.started) {
+ throw new IllegalStateException("Cannot reset state before starting call.");
+ }
+
+ this.resetState();
+ this.started = false;
+ }
+
+ /**
+ * Resets the cached state of this Actor.
+ */
+ void resetState() {
+ this.actorStateManager.clear();
+ }
+
+ /**
+ * Gets a given timer by name.
+ *
+ * @param timerName Timer name.
+ * @return Asynchronous void response.
+ */
+ ActorTimer getActorTimer(String timerName) {
+ return timers.getOrDefault(timerName, null);
+ }
+
+ /**
+ * Internal callback when an Actor is activated.
+ *
+ * @return Asynchronous void response.
+ */
+ Mono onActivateInternal() {
+ return Mono.fromRunnable(() -> {
+ this.actorTrace.writeInfo(TRACE_TYPE, this.id.toString(), "Activating ...");
+ this.resetState();
+ }).then(this.onActivate())
+ .then(this.doWriteInfo(TRACE_TYPE, this.id.toString(), "Activated"))
+ .then(this.saveState());
+ }
+
+ /**
+ * Internal callback when an Actor is deactivated.
+ *
+ * @return Asynchronous void response.
+ */
+ Mono onDeactivateInternal() {
+ this.actorTrace.writeInfo(TRACE_TYPE, this.id.toString(), "Deactivating ...");
+
+ return Mono.fromRunnable(() -> this.resetState())
+ .then(this.onDeactivate())
+ .then(this.doWriteInfo(TRACE_TYPE, this.id.toString(), "Deactivated"));
+ }
+
+ /**
+ * Internal callback prior to method be invoked.
+ *
+ * @param actorMethodContext Method context.
+ * @return Asynchronous void response.
+ */
+ Mono onPreActorMethodInternal(ActorMethodContext actorMethodContext) {
+ return Mono.fromRunnable(() -> {
+ if (this.started) {
+ throw new IllegalStateException("Cannot invoke a method before completing previous call.");
+ }
+
+ this.started = true;
+ }).then(this.onPreActorMethod(actorMethodContext));
+ }
+
+ /**
+ * Internal callback after method is invoked.
+ *
+ * @param actorMethodContext Method context.
+ * @return Asynchronous void response.
+ */
+ Mono onPostActorMethodInternal(ActorMethodContext actorMethodContext) {
+ return Mono.fromRunnable(() -> {
+ if (!this.started) {
+ throw new IllegalStateException("Cannot complete a method before starting a call.");
+ }
+ }).then(this.onPostActorMethod(actorMethodContext))
+ .then(this.saveState())
+ .then(Mono.fromRunnable(() -> {
+ this.started = false;
+ }));
+ }
+
+ /**
+ * Internal method to emit a trace message.
+ *
+ * @param type Type of trace message.
+ * @param id Identifier of entity relevant for the trace message.
+ * @param message Message to be logged.
+ * @return Asynchronous void response.
+ */
+ private Mono doWriteInfo(String type, String id, String message) {
+ return Mono.fromRunnable(() -> this.actorTrace.writeInfo(type, id, message));
+ }
+
+}
diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorCallType.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorCallType.java
new file mode 100644
index 000000000..b0293b58e
--- /dev/null
+++ b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorCallType.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright (c) Microsoft Corporation.
+ * Licensed under the MIT License.
+ */
+
+package io.dapr.actors.runtime;
+
+/**
+ * Represents the call-type associated with the method invoked by actor runtime.
+ */
+enum ActorCallType {
+
+ /**
+ * Specifies that the method invoked is an actor interface method for a given
+ * client request.
+ */
+ ACTOR_INTERFACE_METHOD,
+ /**
+ * Specifies that the method invoked is a timer callback method.
+ */
+ TIMER_METHOD,
+ /**
+ * Specifies that the method is when a reminder fires.
+ */
+ REMINDER_METHOD
+}
diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorFactory.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorFactory.java
new file mode 100644
index 000000000..8b34432e2
--- /dev/null
+++ b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorFactory.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright (c) Microsoft Corporation.
+ * Licensed under the MIT License.
+ */
+
+package io.dapr.actors.runtime;
+
+import io.dapr.actors.ActorId;
+
+/**
+ * Creates an actor of a given type.
+ *
+ * @param Actor Type to be created.
+ */
+@FunctionalInterface
+public interface ActorFactory {
+
+ /**
+ * Creates an Actor.
+ *
+ * @param actorRuntimeContext Actor type's context in the runtime.
+ * @param actorId Actor Id.
+ * @return Actor or null it failed.
+ */
+ T createActor(ActorRuntimeContext actorRuntimeContext, ActorId actorId);
+}
diff --git a/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorManager.java b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorManager.java
new file mode 100644
index 000000000..8acc020a8
--- /dev/null
+++ b/sdk-actors/src/main/java/io/dapr/actors/runtime/ActorManager.java
@@ -0,0 +1,342 @@
+package io.dapr.actors.runtime;
+
+import io.dapr.actors.ActorId;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+import reactor.core.publisher.Mono;
+
+/**
+ * Manages actors of a specific type.
+ */
+class ActorManager {
+
+ /**
+ * Serializer for internal Dapr objects.
+ */
+ private static final ObjectSerializer OBJECT_SERIALIZER = new ObjectSerializer();
+
+ /**
+ * Context for the Actor runtime.
+ */
+ private final ActorRuntimeContext runtimeContext;
+
+ /**
+ * Methods found in Actors.
+ */
+ private final ActorMethodInfoMap actorMethods;
+
+ /**
+ * Active Actor instances.
+ */
+ private final Map activeActors;
+
+ /**
+ * Instantiates a new manager for a given actor referenced in the runtimeContext.
+ *
+ * @param runtimeContext Runtime context for the Actor.
+ */
+ ActorManager(ActorRuntimeContext runtimeContext) {
+ this.runtimeContext = runtimeContext;
+ this.actorMethods = new ActorMethodInfoMap(runtimeContext.getActorTypeInformation().getInterfaces());
+ this.activeActors = Collections.synchronizedMap(new HashMap<>());
+ }
+
+ /**
+ * Activates an Actor.
+ *
+ * @param actorId Actor identifier.
+ * @return Asynchronous void response.
+ */
+ Mono activateActor(ActorId actorId) {
+ return Mono.fromSupplier(() -> this.runtimeContext.getActorFactory().createActor(runtimeContext, actorId))
+ .flatMap(actor -> actor.onActivateInternal().then(this.onActivatedActor(actorId, actor)));
+ }
+
+ /**
+ * Deactivates an Actor.
+ *
+ * @param actorId Actor identifier.
+ * @return Asynchronous void response.
+ */
+ Mono deactivateActor(ActorId actorId) {
+ return Mono.fromSupplier(() -> this.activeActors.remove(actorId)).flatMap(actor -> actor.onDeactivateInternal());
+ }
+
+ /**
+ * Invokes reminder for Actor.
+ *
+ * @param actorId Identifier for Actor being invoked.
+ * @param reminderName Name of reminder being invoked.
+ * @param params Parameters for the reminder.
+ * @return Asynchronous void response.
+ */
+ Mono invokeReminder(ActorId actorId, String reminderName, byte[] params) {
+ return Mono.fromSupplier(() -> {
+ if (!this.runtimeContext.getActorTypeInformation().isRemindable()) {
+ return null;
+ }
+
+ try {
+ return OBJECT_SERIALIZER.deserialize(params, ActorReminderParams.class);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }).flatMap(p ->
+ invoke(actorId,
+ ActorMethodContext.createForReminder(reminderName),
+ actor -> doReminderInvokation((Remindable) actor, reminderName, p))).then();
+ }
+
+ /**
+ * Invokes a timer for a given Actor.
+ *
+ * @param actorId Identifier for Actor.
+ * @param timerName Name of timer being invoked.
+ * @return Asynchronous void response.
+ */
+ Mono invokeTimer(ActorId actorId, String timerName) {
+ return Mono.fromSupplier(() -> {
+ AbstractActor actor = this.activeActors.getOrDefault(actorId, null);
+ if (actor == null) {
+ throw new IllegalArgumentException(
+ String.format("Could not find actor %s of type %s.",
+ actorId.toString(),
+ this.runtimeContext.getActorTypeInformation().getName()));
+ }
+
+ ActorTimer actorTimer = actor.getActorTimer(timerName);
+ if (actorTimer == null) {
+ throw new IllegalStateException(
+ String.format("Could not find timer %s for actor %s.",
+ timerName,
+ this.runtimeContext.getActorTypeInformation().getName()));
+ }
+
+ return actorTimer;
+ }).flatMap(actorTimer -> invokeMethod(
+ actorId,
+ ActorMethodContext.createForTimer(actorTimer.getName()),
+ actorTimer.getCallback(),
+ actorTimer.getState()))
+ .then();
+ }
+
+ /**
+ * Internal callback for when Actor is activated.
+ *
+ * @param actorId Actor identifier.
+ * @param actor Actor's instance.
+ * @return Asynchronous void response.
+ */
+ private Mono onActivatedActor(ActorId actorId, T actor) {
+ return Mono.fromRunnable(() -> this.activeActors.put(actorId, actor));
+ }
+
+ /**
+ * Internal method to actually invoke a reminder.
+ *
+ * @param actor Actor that owns the reminder.
+ * @param reminderName Name of the reminder.
+ * @param reminderParams Params for the reminder.
+ * @return Asynchronous void response.
+ */
+ private Mono doReminderInvokation(
+ Remindable actor,
+ String reminderName,
+ ActorReminderParams reminderParams) {
+ return Mono.fromSupplier(() -> {
+ if (actor == null) {
+ throw new IllegalArgumentException("actor is mandatory.");
+ }
+ if (reminderName == null) {
+ throw new IllegalArgumentException("reminderName is mandatory.");
+ }
+ if (reminderParams == null) {
+ throw new IllegalArgumentException("reminderParams is mandatory.");
+ }
+
+ return true;
+ }).flatMap(x -> {
+ try {
+ Object data = this.runtimeContext.getObjectSerializer().deserialize(
+ reminderParams.getData(),
+ actor.getStateType());
+ return actor.receiveReminder(
+ reminderName,
+ data,
+ reminderParams.getDueTime(),
+ reminderParams.getPeriod());
+ } catch (Exception e) {
+ return Mono.error(e);
+ }
+ }).thenReturn(true);
+ }
+
+ /**
+ * Invokes a given method in the Actor.
+ *
+ * @param actorId Identifier for Actor being invoked.
+ * @param methodName Name of method being invoked.
+ * @param request Input object for the method being invoked.
+ * @return Asynchronous void response.
+ */
+ Mono invokeMethod(ActorId actorId, String methodName, byte[] request) {
+ return invokeMethod(actorId, null, methodName, request);
+ }
+
+ /**
+ * Internal method to actually invoke Actor's timer method.
+ *
+ * @param actorId Identifier for the Actor.
+ * @param context Method context to be invoked.
+ * @param methodName Method name to be invoked.
+ * @param input Input object to be passed in to the invoked method.
+ * @return Asynchronous void response.
+ */
+ private Mono