More Actors Stuff (#57)

* ActorManager

* More work done.

* Adding example for actor runtime service.

* Implements ActorStateManager + fixes + javadocs.
This commit is contained in:
Artur Souza 2019-12-26 18:08:01 -08:00 committed by GitHub
parent b229d462e9
commit 64bb4bfb3e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
42 changed files with 2217 additions and 917 deletions

View File

@ -18,10 +18,17 @@
<properties>
<protobuf.output.directory>${project.build.directory}/generated-sources</protobuf.output.directory>
<protobuf.input.directory>${project.parent.basedir}/proto</protobuf.input.directory>
<java.version>1.11</java.version>
<java.version>11</java.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>io.undertow</groupId>
<artifactId>undertow-servlet</artifactId>
<version>2.0.26.Final</version>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
@ -101,6 +108,14 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<release>11</release>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -9,5 +9,6 @@ package io.dapr.examples.actors.http;
* Example of implementation of an Actor.
*/
public interface DemoActor {
// TODO.
String say(String something);
}

View File

@ -5,9 +5,39 @@
package io.dapr.examples.actors.http;
import io.dapr.actors.ActorId;
import io.dapr.actors.runtime.AbstractActor;
import io.dapr.actors.runtime.Actor;
import io.dapr.actors.runtime.ActorRuntimeContext;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.TimeZone;
/**
* Implementation of the DemoActor for the server side.
*/
public class DemoActorImpl {
// TODO.
public class DemoActorImpl extends AbstractActor implements DemoActor, Actor {
/**
* Format to output date and time.
*/
private static final DateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
public DemoActorImpl(ActorRuntimeContext runtimeContext, ActorId id) {
super(runtimeContext, id);
}
@Override
public String say(String something) {
Calendar utcNow = Calendar.getInstance(TimeZone.getTimeZone("GMT"));
String utcNowAsString = DATE_FORMAT.format(utcNow.getTime());
// Handles the request by printing message.
System.out.println("Server: " + something == null ? "" : something + " @ " + utcNowAsString);
// Now respond with current timestamp.
return utcNowAsString;
}
}

View File

@ -5,12 +5,193 @@
package io.dapr.examples.actors.http;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.actors.runtime.ActorRuntime;
import io.undertow.Undertow;
import io.undertow.server.HttpHandler;
import io.undertow.server.HttpServerExchange;
import io.undertow.server.RoutingHandler;
import io.undertow.util.Headers;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.Deque;
import java.util.Map;
/**
* Service for Actor runtime.
* 1. Build and install jars:
* mvn clean install
* 2. Run in server mode:
* dapr run --app-id hellogrpc --app-port 3000 --port 3005 -- mvn exec:java -pl=examples -Dexec.mainClass=io.dapr.examples.actors.http.DemoActorService -Dexec.args="-p 3000"
*/
public class DemoActorService {
private static final JsonFactory JSON_FACTORY = new JsonFactory();
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final HttpHandler ROUTES = new RoutingHandler()
.get("/", DemoActorService::handleDaprConfig)
.get("/dapr/config", DemoActorService::handleDaprConfig)
.post("/actors/{actorType}/{id}", DemoActorService::handleActorActivate)
.delete("/actors/{actorType}/{id}", DemoActorService::handleActorDeactivate)
.put("/actors/{actorType}/{id}/method/{methodName}", DemoActorService::handleActorInvoke);
private final int port;
private final Undertow server;
private DemoActorService(int port) {
this.port = port;
this.server = Undertow
.builder()
.addHttpListener(port, "localhost")
.setHandler(ROUTES)
.build();
ActorRuntime.getInstance().registerActor(DemoActorImpl.class);
}
private void start() {
// Now we handle ctrl+c (or any other JVM shutdown)
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
System.out.println("Server: shutting down gracefully ...");
DemoActorService.this.server.stop();
System.out.println("Server: Bye.");
}
});
System.out.println(String.format("Server: listening on port %d ...", this.port));
this.server.start();
}
private static void handleDaprConfig(HttpServerExchange exchange) throws IOException {
exchange.getResponseHeaders().put(Headers.CONTENT_TYPE, "application/json");
String result = "";
try (Writer writer = new StringWriter()) {
JsonGenerator generator = JSON_FACTORY.createGenerator(writer);
generator.writeStartObject();
generator.writeArrayFieldStart("entities");
for(String actorClass : ActorRuntime.getInstance().getRegisteredActorTypes()) {
generator.writeString(actorClass);
}
generator.writeEndArray();
generator.writeStringField("actorIdleTimeout", "10s");
generator.writeStringField("actorScanInterval", "1s");
generator.writeStringField("drainOngoingCallTimeout", "1s");
generator.writeBooleanField("drainBalancedActors", true);
generator.writeEndObject();
generator.close();
writer.flush();
result = writer.toString();
}
exchange.getResponseSender().send(result);
}
private static void handleActorActivate(HttpServerExchange exchange) {
if (exchange.isInIoThread()) {
exchange.dispatch(DemoActorService::handleActorActivate);
return;
}
String actorType = findParamValueOrNull(exchange, "actorType");
String actorId = findParamValueOrNull(exchange, "id");
ActorRuntime.getInstance().activate(actorType, actorId).block();
exchange.getResponseSender().send("");
}
private static void handleActorDeactivate(HttpServerExchange exchange) {
if (exchange.isInIoThread()) {
exchange.dispatch(DemoActorService::handleActorDeactivate);
return;
}
String actorType = findParamValueOrNull(exchange, "actorType");
String actorId = findParamValueOrNull(exchange, "id");
ActorRuntime.getInstance().deactivate(actorType, actorId).block();
}
private static void handleActorInvoke(HttpServerExchange exchange) throws IOException {
if (exchange.isInIoThread()) {
exchange.dispatch(DemoActorService::handleActorInvoke);
return;
}
String actorType = findParamValueOrNull(exchange, "actorType");
String actorId = findParamValueOrNull(exchange, "id");
String methodName = findParamValueOrNull(exchange, "methodName");
exchange.startBlocking();
String data = findData(exchange.getInputStream());
String result = ActorRuntime.getInstance().invoke(actorType, actorId, methodName, data).block();
exchange.getResponseSender().send(buildResponse(result));
}
private static String findParamValueOrNull(HttpServerExchange exchange, String name) {
Map<String, Deque<String>> params = exchange.getQueryParameters();
if (params == null) {
return null;
}
Deque<String> values = params.get(name);
if ((values == null) || (values.isEmpty())) {
return null;
}
return values.getFirst();
}
private static String findData(InputStream stream) throws IOException {
JsonNode root = OBJECT_MAPPER.readTree(stream);
if (root == null) {
return null;
}
JsonNode dataNode = root.get("data");
if (dataNode == null) {
return null;
}
return new String(dataNode.binaryValue(), StandardCharsets.UTF_8);
}
private static String buildResponse(String data) throws IOException {
try (Writer writer = new StringWriter()) {
JsonGenerator generator = JSON_FACTORY.createGenerator(writer);
generator.writeStartObject();
if (data != null) {
generator.writeBinaryField("data", data.getBytes());
}
generator.writeEndObject();
generator.close();
writer.flush();
return writer.toString();
}
}
public static void main(String[] args) throws Exception {
// TODO
Options options = new Options();
options.addRequiredOption("p", "port", true, "Port to listen to.");
CommandLineParser parser = new DefaultParser();
CommandLine cmd = parser.parse(options, args);
// If port string is not valid, it will throw an exception.
int port = Integer.parseInt(cmd.getOptionValue("port"));
final DemoActorService service = new DemoActorService(port);
service.start();
}
}

View File

@ -125,7 +125,7 @@ public class HelloWorldService {
public static void main(String[] args) throws Exception {
Options options = new Options();
options.addRequiredOption("p", "port", true, "Port to listen or send event to.");
options.addRequiredOption("p", "port", true, "Port to listen to.");
CommandLineParser parser = new DefaultParser();
CommandLine cmd = parser.parse(options, args);

View File

@ -5,13 +5,16 @@
package io.dapr.actors;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.net.URL;
import java.util.UUID;
import okhttp3.*;
import reactor.core.publisher.Mono;
// base class of hierarchy
import java.io.IOException;
import java.net.URL;
import java.util.UUID;
/**
* Base for Dapr HTTP Client.
*/
public abstract class AbstractDaprClient {
/**

View File

@ -35,41 +35,14 @@ public class ActorId extends Object implements Comparable<ActorId> {
}
/**
* Returns the id of the actor as {link #java.lang.String}
*
* @return ActorID as {link #java.lang.String}
* @return The String representation of this ActorId
*/
public String getStringId() {
@Override
public String toString() {
return this.stringId;
}
/**
* Creates a new ActorId with a random id.
*
* @return A new ActorId with a random id.
*/
public static ActorId createRandom() {
UUID id = UUID.randomUUID();
return new ActorId(id.toString());
}
/**
* Determines whether two specified actorIds have the same id.
*
* @param id1 The first actorId to compare, or null
* @param id2 The second actorId to compare, or null.
* @return true if the id is same for both objects; otherwise, false.
*/
static public boolean equals(ActorId id1, ActorId id2) {
if (id1 == null && id2 == null) {
return true;
} else if (id2 == null || id1 == null) {
return false;
} else {
return hasEqualContent(id1, id2);
}
}
/**
* Compares this instance with a specified {link #ActorId} object and
* indicates whether this instance precedes, follows, or appears in the same
@ -88,35 +61,6 @@ public class ActorId extends Object implements Comparable<ActorId> {
: compareContent(this, other);
}
/**
*
* @param id1
* @param id2
* @return true if the two ActorId's are equal
*/
static private boolean hasEqualContent(ActorId id1, ActorId id2) {
return id1.getStringId().equalsIgnoreCase(id2.getStringId());
}
/**
*
* @param id1
* @param id2
* @return -1, 0, or 1 depending on the compare result of the stringId member.
*/
private int compareContent(ActorId id1, ActorId id2) {
return id1.getStringId().compareToIgnoreCase(id2.getStringId());
}
/**
*
* @return The String representation of this ActorId
*/
@Override
public String toString() {
return this.stringId;
}
/**
*
* @return The hash code of this ActorId
@ -127,7 +71,17 @@ public class ActorId extends Object implements Comparable<ActorId> {
}
/**
*
* Compare if the content of two ids are the same.
* @param id1 One identifier.
* @param id2 Another identifier.
* @return -1, 0, or 1 depending on the compare result of the stringId member.
*/
private int compareContent(ActorId id1, ActorId id2) {
return id1.stringId.compareTo(id2.stringId);
}
/**
* Checks if this instance is equals to the other instance.
* @return true if the 2 ActorId's are equal.
*/
@Override
@ -146,4 +100,42 @@ public class ActorId extends Object implements Comparable<ActorId> {
return hasEqualContent(this, (ActorId) obj);
}
/**
* Creates a new ActorId with a random id.
*
* @return A new ActorId with a random id.
*/
public static ActorId createRandom() {
UUID id = UUID.randomUUID();
return new ActorId(id.toString());
}
/**
* Determines whether two specified actorIds have the same id.
*
* @param id1 The first actorId to compare, or null
* @param id2 The second actorId to compare, or null.
* @return true if the id is same for both objects; otherwise, false.
*/
private static boolean equals(ActorId id1, ActorId id2) {
if (id1 == null && id2 == null) {
return true;
} else if (id2 == null || id1 == null) {
return false;
} else {
return hasEqualContent(id1, id2);
}
}
/**
* Compares if two actors have the same content.
*
* @param id1 One identifier.
* @param id2 Another identifier.
* @return true if the two ActorId's are equal
*/
private static boolean hasEqualContent(ActorId id1, ActorId id2) {
return id1.stringId.equals(id2.stringId);
}
}

View File

@ -2,22 +2,85 @@
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.actors;
import java.util.logging.Level;
import java.util.logging.Logger;
// TODO: Implement distributed tracing.
// TODO: Make this generic to the SDK and not only for Actors.
/**
* Stub
* Class to emit trace log messages.
*/
public class ActorTrace {
public final class ActorTrace {
public static void WriteInfo(String text) {
System.out.println(text);
/**
* Gets the default Logger.
*/
private static final Logger LOGGER = Logger.getLogger(ActorTrace.class.getName());
/**
* Writes an information trace log.
* @param type Type of log.
* @param id Instance identifier.
* @param msgFormat Message or message format (with type and id input as well).
* @param params Params for the message.
*/
public void writeInfo(String type, String id, String msgFormat, Object... params) {
this.write(Level.INFO, type, id, msgFormat, params);
}
public static void WriteWarning(String text) {
System.out.println("Warning: " + text);
/**
* Writes an warning trace log.
* @param type Type of log.
* @param id Instance identifier.
* @param msgFormat Message or message format (with type and id input as well).
* @param params Params for the message.
*/
public void writeWarning(String type, String id, String msgFormat, Object... params) {
this.write(Level.WARNING, type, id, msgFormat, params);
}
public static void WriteError(String text) {
System.err.println(text);
/**
* Writes an error trace log.
* @param type Type of log.
* @param id Instance identifier.
* @param msgFormat Message or message format (with type and id input as well).
* @param params Params for the message.
*/
public void writeError(String type, String id, String msgFormat, Object... params) {
this.write(Level.SEVERE, type, id, msgFormat, params);
}
/**
* Writes a trace log.
* @param level Severity level of the log.
* @param type Type of log.
* @param id Instance identifier.
* @param msgFormat Message or message format (with type and id input as well).
* @param params Params for the message.
*/
private void write(Level level, String type, String id, String msgFormat, Object... params) {
String formatString = String.format("%s:%s %s", emptyIfNul(type), emptyIfNul(id), emptyIfNul(msgFormat));
if ((params == null) || (params.length == 0)) {
LOGGER.log(level, formatString);
} else {
LOGGER.log(level, String.format(formatString, params));
}
}
/**
* Utility method that returns empty if String is null.
* @param s String to be checked.
* @return String (if not null) or empty (if null).
*/
private static String emptyIfNul(String s) {
if (s == null) {
return "";
}
return s;
}
}

View File

@ -4,8 +4,269 @@
*/
package io.dapr.actors.runtime;
import io.dapr.actors.ActorId;
import io.dapr.actors.ActorTrace;
import reactor.core.publisher.Mono;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
/**
* TODO - this is the base class Actor implementations (user code) will extend.
* Represents the base class for actors.
*
* The base type for actors, that provides the common functionality
* for actors that derive from {@link Actor}.
* The state is preserved across actor garbage collections and fail-overs.
*/
public abstract class AbstractActor {
/**
* Type of tracing messages.
*/
private static final String TRACE_TYPE = "Actor";
/**
* Context for the Actor runtime.
*/
private final ActorRuntimeContext<?> actorRuntimeContext;
/**
* Actor identifier.
*/
private final ActorId id;
/**
* Manager for the states in Actors.
*/
private final ActorStateManager actorStateManager;
/**
* Emits trace messages for Actors.
*/
private final ActorTrace actorTrace;
/**
* Registered timers for this Actor.
*/
private final Map<String, ActorTimer<?>> timers;
/**
* Instantiates a new Actor.
* @param runtimeContext Context for the runtime.
* @param id Actor identifier.
*/
protected AbstractActor(ActorRuntimeContext runtimeContext, ActorId id) {
this.actorRuntimeContext = runtimeContext;
this.id = id;
this.actorStateManager = new ActorStateManager(
runtimeContext.getStateProvider(),
runtimeContext.getActorTypeInformation().getName(),
id);
this.actorTrace = runtimeContext.getActorTrace();
this.timers = Collections.synchronizedMap(new HashMap<>());
}
/**
* Registers a reminder for this Actor.
* @param reminderName Name of the reminder.
* @param data Data to be send along with reminder triggers.
* @param dueTime Due time for the first trigger.
* @param period Frequency for the triggers.
* @return Asynchronous void response.
*/
protected Mono<Void> registerReminder(
String reminderName,
String data,
Duration dueTime,
Duration period) {
try {
ActorReminderParams params = new ActorReminderParams(data, dueTime, period);
String serialized = this.actorRuntimeContext.getActorSerializer().serialize(params);
return this.actorRuntimeContext.getDaprClient().registerReminder(
this.actorRuntimeContext.getActorTypeInformation().getName(),
this.id.toString(),
reminderName,
serialized);
} catch (IOException e) {
return Mono.error(e);
}
}
/**
* Registers a Timer for the actor. A timer name is autogenerated by the runtime to keep track of it.
*
* @param timerName Name of the timer, unique per Actor (auto-generated if null).
* @param methodName Name of the method to be called.
* @param state State object to be passed it to the method when timer triggers.
* @param dueTime The amount of time to delay before the async callback is first invoked.
* Specify negative one (-1) milliseconds to prevent the timer from starting.
* Specify zero (0) to start the timer immediately.
* @param period The time interval between invocations of the async callback.
* Specify negative one (-1) milliseconds to disable periodic signaling.
* @param <S> Type for the state object.
* @return Asynchronous result.
*/
protected <S> Mono<Void> registerActorTimer(
String timerName,
String methodName,
S state,
Duration dueTime,
Duration period) {
String name = timerName;
if ((timerName == null) || (timerName.isEmpty())) {
name = String.format("%s_Timer_%d", this.id.toString(), this.timers.size() + 1);
}
ActorTimer<S> actorTimer = new ActorTimer(this, name, methodName, state, dueTime, period);
String serializedTimer = null;
try {
serializedTimer = this.actorRuntimeContext.getActorSerializer().serialize(actorTimer);
} catch (IOException e) {
return Mono.error(e);
}
this.timers.put(name, actorTimer);
return this.actorRuntimeContext.getDaprClient().registerTimer(
this.actorRuntimeContext.getActorTypeInformation().getName(),
this.id.toString(),
name,
serializedTimer);
}
/**
* Unregisters an Actor timer.
* @param actorTimer Timer to be unregistered.
* @return Asynchronous void response.
*/
protected Mono<Void> unregister(ActorTimer<?> actorTimer) {
return this.actorRuntimeContext.getDaprClient().unregisterTimer(
this.actorRuntimeContext.getActorTypeInformation().getName(),
this.id.toString(),
actorTimer.getName())
.then(this.onUnregisteredTimer(actorTimer));
}
/**
* Callback function invoked after an Actor has been activated.
* @return Asynchronous void response.
*/
protected Mono<Void> onActivate() { return Mono.empty(); }
/**
* Callback function invoked after an Actor has been deactivated.
* @return Asynchronous void response.
*/
protected Mono<Void> onDeactivate() { return Mono.empty(); }
/**
* Callback function invoked before method is invoked.
* @param actorMethodContext Method context.
* @return Asynchronous void response.
*/
protected Mono<Void> onPreActorMethod(ActorMethodContext actorMethodContext) {
return Mono.empty();
}
/**
* Callback function invoked after method is invoked.
* @param actorMethodContext Method context.
* @return Asynchronous void response.
*/
protected Mono<Void> onPostActorMethod(ActorMethodContext actorMethodContext) {
return Mono.empty();
}
/**
* Saves the state of this Actor.
* @return Asynchronous void response.
*/
protected Mono<Void> saveState() {
return this.actorStateManager.save();
}
/**
* Resets the state of this Actor.
* @return Asynchronous void response.
*/
Mono<Void> resetState() { return this.actorStateManager.clear(); }
/**
* Gets a given timer by name.
* @param timerName Timer name.
* @return Asynchronous void response.
*/
ActorTimer getActorTimer(String timerName)
{
return timers.getOrDefault(timerName, null);
}
/**
* Internal callback when an Actor is activated.
* @return Asynchronous void response.
*/
Mono<Void> onActivateInternal() {
this.actorTrace.writeInfo(TRACE_TYPE, this.id.toString(), "Activating ...");
return this.resetState()
.then(this.onActivate())
.then(this.doWriteInfo(TRACE_TYPE, this.id.toString(), "Activated"))
.then(this.saveState());
}
/**
* Internal callback when an Actor is deactivated.
* @return Asynchronous void response.
*/
Mono<Void> onDeactivateInternal() {
this.actorTrace.writeInfo(TRACE_TYPE, this.id.toString(), "Deactivating ...");
return this.resetState()
.then(this.onDeactivate())
.then(this.doWriteInfo(TRACE_TYPE, this.id.toString(), "Deactivated"))
.then(this.saveState());
}
/**
* Internal callback prior to method be invoked.
* @param actorMethodContext Method context.
* @return Asynchronous void response.
*/
Mono<Void> onPreActorMethodInternal(ActorMethodContext actorMethodContext) {
return this.onPreActorMethod(actorMethodContext);
}
/**
* Internal callback after method is invoked.
* @param actorMethodContext Method context.
* @return Asynchronous void response.
*/
Mono<Void> onPostActorMethodInternal(ActorMethodContext actorMethodContext) {
return this.onPostActorMethod(actorMethodContext)
.then(this.saveState());
}
/**
* Internal callback for when Actor timer is unregistered.
* @param timer Timer being unregistered.
* @return Asynchronous void response.
*/
Mono<Void> onUnregisteredTimer(ActorTimer<?> timer) {
this.timers.remove(timer.getName());
return Mono.empty();
}
/**
* Internal method to emit a trace message.
* @param type Type of trace message.
* @param id Identifier of entity relevant for the trace message.
* @param message Message to be logged.
* @return Asynchronous void response.
*/
private Mono<Void> doWriteInfo(String type, String id, String message) {
this.actorTrace.writeInfo(type, id, message);
return Mono.empty();
}
}

View File

@ -16,9 +16,9 @@ public interface ActorFactory<T extends AbstractActor> {
/**
* Creates an Actor.
* @param actorService Actor Service.
* @param actorRuntimeContext Actor type's context in the runtime.
* @param actorId Actor Id.
* @return Actor or null it failed.
*/
T createActor(ActorService actorService, ActorId actorId);
T createActor(ActorRuntimeContext<T> actorRuntimeContext, ActorId actorId);
}

View File

@ -1,9 +1,260 @@
package io.dapr.actors.runtime;
// stub
public class ActorManager {
import io.dapr.actors.ActorId;
import reactor.core.publisher.Mono;
public ActorManager(ActorService actorService) {
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
/**
* Manages actors of a specific type.
*/
class ActorManager<T extends AbstractActor> {
/**
* Context for the Actor runtime.
*/
private final ActorRuntimeContext<T> runtimeContext;
/**
* Methods found in Actors.
*/
private final ActorMethodInfoMap actorMethods;
/**
* Active Actor instances.
*/
private final Map<ActorId, T> activeActors;
/**
* Instantiates a new manager for a given actor referenced in the runtimeContext.
* @param runtimeContext Runtime context for the Actor.
*/
ActorManager(ActorRuntimeContext runtimeContext) {
this.runtimeContext = runtimeContext;
this.actorMethods = new ActorMethodInfoMap(runtimeContext.getActorTypeInformation().getInterfaces());
this.activeActors = Collections.synchronizedMap(new HashMap<>());
}
/**
* Activates an Actor.
* @param actorId Actor identifier.
* @return Asynchronous void response.
*/
Mono<Void> activateActor(ActorId actorId) {
T actor = this.runtimeContext.getActorFactory().createActor(runtimeContext, actorId);
return actor.onActivateInternal().then(this.onActivatedActor(actorId, actor));
}
/**
* Deactivates an Actor.
* @param actorId Actor identifier.
* @return Asynchronous void response.
*/
Mono<Void> deactivateActor(ActorId actorId) {
T actor = this.activeActors.remove(actorId);
if (actor != null) {
return actor.onDeactivateInternal();
}
return Mono.empty();
}
/**
* Invokes a given method in the Actor.
* @param actorId Identifier for Actor being invoked.
* @param methodName Name of method being invoked.
* @param request Input object for the method being invoked.
* @return Asynchronous void response.
*/
Mono<String> invokeMethod(ActorId actorId, String methodName, String request) {
return invokeMethod(actorId, null, methodName, request);
}
/**
* Invokes reminder for Actor.
* @param actorId Identifier for Actor being invoked.
* @param reminderName Name of reminder being invoked.
* @param request Input object for the reminder being invoked.
* @return Asynchronous void response.
*/
Mono<Void> invokeReminder(ActorId actorId, String reminderName, String request) {
if (!this.runtimeContext.getActorTypeInformation().isRemindable()) {
return Mono.empty();
}
try {
ActorReminderParams reminder = this.runtimeContext.getActorSerializer().deserialize(request, ActorReminderParams.class);
return invoke(
actorId,
ActorMethodContext.CreateForReminder(reminderName),
actor -> doReminderInvokation((Remindable)actor, reminderName, reminder))
.then();
} catch (Exception e) {
return Mono.error(e);
}
}
/**
* Invokes a timer for a given Actor.
* @param actorId Identifier for Actor.
* @param timerName Name of timer being invoked.
* @return Asynchronous void response.
*/
Mono<Void> invokeTimer(ActorId actorId, String timerName) {
try {
AbstractActor actor = this.activeActors.getOrDefault(actorId, null);
if (actor == null) {
throw new IllegalArgumentException(
String.format("Could not find actor %s of type %s.",
actorId.toString(),
this.runtimeContext.getActorTypeInformation().getName()));
}
ActorTimer<?> actorTimer = actor.getActorTimer(timerName);
if (actorTimer == null) {
throw new IllegalStateException(
String.format("Could not find timer %s for actor %s.",
timerName,
this.runtimeContext.getActorTypeInformation().getName()));
}
return invokeMethod(
actorId,
ActorMethodContext.CreateForTimer(timerName),
actorTimer.getMethodName(),
actorTimer.getState())
.then();
} catch (Exception e) {
return Mono.error(e);
}
}
/**
* Internal callback for when Actor is activated.
* @param actorId Actor identifier.
* @param actor Actor's instance.
* @return Asynchronous void response.
*/
private Mono<Void> onActivatedActor(ActorId actorId, T actor) {
this.activeActors.put(actorId, actor);
return Mono.empty();
}
/**
* Internal method to actually invoke a reminder.
* @param actor Actor that owns the reminder.
* @param reminderName Name of the reminder.
* @param reminderParams Params for the reminder.
* @return Asynchronous void response.
*/
private Mono<Void> doReminderInvokation(
Remindable actor,
String reminderName,
ActorReminderParams reminderParams) {
try {
Object data = this.runtimeContext.getActorSerializer().deserialize(
reminderParams.getData(),
actor.getReminderStateType());
return actor.receiveReminder(
reminderName,
data,
reminderParams.getDueTime(),
reminderParams.getPeriod());
} catch (IOException e) {
return Mono.error(e);
}
}
/**
* Internal method to actually invoke Actor's method.
* @param actorId Identifier for the Actor.
* @param context Method context to be invoked.
* @param methodName Method name to be invoked.
* @param request Input object to be passed in to the invoked method.
* @return Asynchronous void response.
*/
private Mono<String> invokeMethod(ActorId actorId, ActorMethodContext context, String methodName, Object request) {
ActorMethodContext actorMethodContext = context;
if (actorMethodContext == null) {
actorMethodContext = ActorMethodContext.CreateForActor(methodName);
}
return this.invoke(actorId, actorMethodContext, actor -> {
try {
// Finds the actor method with the given name and 1 or no parameter.
Method method = this.actorMethods.get(methodName);
Object response;
if (method.getParameterCount() == 0) {
response = method.invoke(actor);
} else {
// Actor methods must have a one or no parameter, which is guaranteed at this point.
Class<?> inputClass = method.getParameterTypes()[0];
if ((request != null) && !inputClass.isInstance(request)) {
// If request object is String, we deserialize it.
response = method.invoke(
actor,
this.runtimeContext.getActorSerializer().deserialize((String) request, inputClass));
} else {
// If input already of the right type, so we just cast it.
response = method.invoke(actor, inputClass.cast(request));
}
}
if (response == null) {
return Mono.empty();
}
if (response instanceof Mono) {
return ((Mono<Object>) response).map(r -> {
try {
return this.runtimeContext.getActorSerializer().serialize(r);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}
// Method was not Mono, so we serialize response.
return Mono.just(this.runtimeContext.getActorSerializer().serialize(response));
} catch (Exception e) {
return Mono.error(e);
}
}).map(r -> r.toString());
}
/**
* Internal call to invoke a method, timer or reminder for an Actor.
* @param actorId Actor identifier.
* @param context Context for the method/timer/reminder call.
* @param func Function to perform the method call.
* @param <T> Expected return type for the function call.
* @return Asynchronous response for the returned object.
*/
private <T> Mono<T> invoke(ActorId actorId, ActorMethodContext context, Function<AbstractActor, Mono<T>> func) {
try {
AbstractActor actor = this.activeActors.getOrDefault(actorId, null);
if (actor == null) {
throw new IllegalArgumentException(
String.format("Could not find actor %s of type %s.",
actorId.toString(),
this.runtimeContext.getActorTypeInformation().getName()));
}
return actor.onPreActorMethodInternal(context).then(
func.apply(actor).flatMap(result -> actor.onPostActorMethodInternal(context).thenReturn(result))
);
} catch (Exception e) {
return Mono.error(e);
}
}
}

View File

@ -7,7 +7,7 @@ package io.dapr.actors.runtime;
/**
* Contains information about the method that is invoked by actor runtime.
*/
class ActorMethodContext {
public class ActorMethodContext {
/**
* Method name to be invoked.
@ -20,7 +20,7 @@ class ActorMethodContext {
private final ActorCallType callType;
/**
* Constructs a new instance of {@link ActorMethodContext}
* Constructs a new instance of {@link ActorMethodContext}, representing a call for an Actor.
*
* @param methodName Method name to be invoked.
* @param callType Call type to be used.

View File

@ -1,35 +1,54 @@
package io.dapr.actors.runtime;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
/**
* Actor method dispatcher map. Holds method_name -> Method for methods defined in Actor interfaces.
* Actor method dispatcher map. Holds method_name -> Method for methods defined in Actor interfaces.
*/
class ActorMethodInfoMap {
private final HashMap<String, Method> methods;
/**
* Map for methods based on name.
*/
private final Map<String, Method> methods;
public <T> ActorMethodInfoMap(Iterable<Class<T>> interfaceTypes)
{
this.methods = new HashMap<String, Method>();
/**
* Instantiates a given Actor map based on the interfaces found in the class.
* @param interfaceTypes Interfaces found in the Actor class.
*/
ActorMethodInfoMap(Collection<Class<?>> interfaceTypes) {
Map<String, Method> methods = new HashMap<>();
// Find methods which are defined in Actor interface.
for (Class<T> actorInterface : interfaceTypes)
{
for (Method methodInfo : actorInterface.getMethods())
{
this.methods.put(methodInfo.getName(), methodInfo);
}
// Find methods which are defined in Actor interface.
for (Class<?> actorInterface : interfaceTypes) {
for (Method methodInfo : actorInterface.getMethods()) {
// Only support methods with 1 or 0 argument.
if (methodInfo.getParameterCount() <= 1) {
// If Actor class uses overloading, then one will win.
// Document this behavior, so users know how to write their code.
methods.put(methodInfo.getName(), methodInfo);
}
}
}
public Method LookupActorMethodInfo(String methodName) throws NoSuchMethodException
{
Method methodInfo = this.methods.get(methodName);
if (methodInfo == null) {
throw new NoSuchMethodException("Actor type doesn't contain method " + methodName);
}
this.methods = Collections.unmodifiableMap(methods);
}
return methodInfo;
/**
* Gets the Actor's method by name.
* @param methodName Name of the method.
* @return Method.
* @throws NoSuchMethodException If method is not found.
*/
Method get(String methodName) throws NoSuchMethodException {
Method method = this.methods.get(methodName);
if (method == null) {
throw new NoSuchMethodException(String.format("Could not find method %s.", methodName));
}
return method;
}
}

View File

@ -0,0 +1,98 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
package io.dapr.actors.runtime;
import java.time.Duration;
/**
* Parameters for Actor Reminder.
*/
final class ActorReminderParams {
/**
* Minimum duration for period.
*/
private static final Duration MIN_TIME_PERIOD = Duration.ofMillis(-1);
/**
* Data to be passed in as part of the reminder trigger.
*/
private final String data;
/**
* Time the reminder is due for the 1st time.
*/
private final Duration dueTime;
/**
* Interval between triggers.
*/
private final Duration period;
/**
* Instantiates a new instance for the params of a reminder.
* @param data Data to be passed in as part of the reminder trigger.
* @param dueTime Time the reminder is due for the 1st time.
* @param period Interval between triggers.
*/
ActorReminderParams(String data, Duration dueTime, Duration period) {
ValidateDueTime("DueTime", dueTime);
ValidatePeriod("Period", period);
this.data = data;
this.dueTime = dueTime;
this.period = period;
}
/**
* Gets the time the reminder is due for the 1st time.
* @return Time the reminder is due for the 1st time.
*/
Duration getDueTime() {
return dueTime;
}
/**
* Gets the interval between triggers.
* @return Interval between triggers.
*/
Duration getPeriod() {
return period;
}
/**
* Gets the data to be passed in as part of the reminder trigger.
* @return Data to be passed in as part of the reminder trigger.
*/
String getData() {
return data;
}
/**
* Validates due time is valid, throws {@link IllegalArgumentException}.
* @param argName Name of the argument passed in.
* @param value Vale being checked.
*/
private static void ValidateDueTime(String argName, Duration value) {
if (value.compareTo(Duration.ZERO) < 0) {
String message = String.format(
"argName: %s - Duration toMillis() - specified value must be greater than %s", argName, Duration.ZERO);
throw new IllegalArgumentException(message);
}
}
/**
* Validates reminder period is valid, throws {@link IllegalArgumentException}.
* @param argName Name of the argument passed in.
* @param value Vale being checked.
*/
private static void ValidatePeriod(String argName, Duration value) throws IllegalArgumentException {
if (value.compareTo(MIN_TIME_PERIOD) < 0) {
String message = String.format(
"argName: %s - Duration toMillis() - specified value must be greater than %s", argName, MIN_TIME_PERIOD);
throw new IllegalArgumentException(message);
}
}
}

View File

@ -4,10 +4,14 @@
*/
package io.dapr.actors.runtime;
import io.dapr.actors.*;
import io.dapr.actors.ActorId;
import io.dapr.actors.ActorTrace;
import reactor.core.publisher.Mono;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
/**
* Contains methods to register actor types. Registering the types allows the
@ -15,6 +19,16 @@ import java.util.HashMap;
*/
public class ActorRuntime {
/**
* A trace type used when logging.
*/
private static final String TRACE_TYPE = "ActorRuntime";
/**
* Tracing errors, warnings and info logs.
*/
private static final ActorTrace ACTOR_TRACE = new ActorTrace();
/**
* Gets an instance to the ActorRuntime. There is only 1.
*/
@ -23,17 +37,22 @@ public class ActorRuntime {
/**
* A client used to communicate from the actor to the Dapr runtime.
*/
private static AppToDaprAsyncClient appToDaprAsyncClient;
private final AppToDaprAsyncClient appToDaprAsyncClient;
/**
* A trace type used when logging.
* State provider for Dapr.
*/
private static final String TraceType = "ActorRuntime";
private final DaprStateAsyncProvider daprStateProvider;
/**
* Serializes/deserializes objects for Actors.
*/
private final ActorStateSerializer actorSerializer;
/**
* Map of ActorType --> ActorManager.
*/
private final HashMap<String, ActorManager> actorManagers;
private final Map<String, ActorManager> actorManagers;
/**
* The default constructor. This should not be called directly.
@ -45,8 +64,10 @@ public class ActorRuntime {
throw new IllegalStateException("ActorRuntime should only be constructed once");
}
this.actorManagers = new HashMap<String, ActorManager>();
appToDaprAsyncClient = new AppToDaprClientBuilder().buildAsyncClient();
this.actorManagers = Collections.synchronizedMap(new HashMap<>());
this.appToDaprAsyncClient = new AppToDaprClientBuilder().buildAsyncClient();
this.actorSerializer = new ActorStateSerializer();
this.daprStateProvider = new DaprStateAsyncProvider(this.appToDaprAsyncClient, this.actorSerializer);
}
/**
@ -67,8 +88,8 @@ public class ActorRuntime {
}
/**
*
* @return Actor type names registered with the runtime.
* Gets the Actor type names registered with the runtime.
* @return Actor type names.
*/
public Collection<String> getRegisteredActorTypes() {
return Collections.unmodifiableCollection(this.actorManagers.keySet());
@ -79,9 +100,10 @@ public class ActorRuntime {
*
* @param clazz The type of actor.
* @param <T> Actor class type.
* @return Async void task.
*/
public <T extends AbstractActor> void RegisterActor(Class<T> clazz) {
RegisterActor(clazz, null);
public <T extends AbstractActor> Mono<Void> registerActor(Class<T> clazz) {
return registerActor(clazz, null);
}
/**
@ -90,20 +112,25 @@ public class ActorRuntime {
* @param clazz The type of actor.
* @param actorFactory An optional factory to create actors.
* @param <T> Actor class type.
* @return Async void task.
* This can be used for dependency injection into actors.
*/
public <T extends AbstractActor> void RegisterActor(Class<T> clazz, ActorFactory actorFactory) {
ActorTypeInformation actorTypeInfo = ActorTypeInformation.create(clazz);
public <T extends AbstractActor> Mono<Void> registerActor(Class<T> clazz, ActorFactory<T> actorFactory) {
ActorTypeInformation<T> actorTypeInfo = ActorTypeInformation.create(clazz);
ActorFactory actualActorFactory = actorFactory != null ? actorFactory : new DefaultActorFactory<T>(actorTypeInfo);
// TODO: Refactor into a Builder class.
DaprStateAsyncProvider stateProvider = new DaprStateAsyncProvider(this.appToDaprAsyncClient, new ActorStateSerializer());
ActorService actorService = new ActorServiceImpl(actorTypeInfo, stateProvider, actualActorFactory);
ActorFactory<T> actualActorFactory = actorFactory != null ? actorFactory : new DefaultActorFactory<T>();
ActorRuntimeContext<T> context = new ActorRuntimeContext<T>(
this,
this.actorSerializer,
actualActorFactory,
actorTypeInfo,
this.appToDaprAsyncClient,
new DaprStateAsyncProvider(this.appToDaprAsyncClient, this.actorSerializer));
// Create ActorManagers, override existing entry if registered again.
synchronized (this.actorManagers) {
this.actorManagers.put(actorTypeInfo.getName(), new ActorManager(actorService));
}
this.actorManagers.put(actorTypeInfo.getName(), new ActorManager<T>(context));
return Mono.empty();
}
/**
@ -111,10 +138,10 @@ public class ActorRuntime {
*
* @param actorTypeName Actor type name to activate the actor for.
* @param actorId Actor id for the actor to be activated.
* @return Async void task.
*/
static void Activate(String actorTypeName, String actorId) {
// uncomment when ActorManager implemented
// return instance.GetActorManager(actorTypeName).ActivateActor(new ActorId(actorId));
public Mono<Void> activate(String actorTypeName, String actorId) {
return this.getActorManager(actorTypeName).flatMap(m -> m.activateActor(new ActorId(actorId)));
}
/**
@ -122,10 +149,10 @@ public class ActorRuntime {
*
* @param actorTypeName Actor type name to deactivate the actor for.
* @param actorId Actor id for the actor to be deactivated.
* @return Async void task.
*/
static void Deactivate(String actorTypeName, String actorId) {
// uncomment when ActorManager implemented
// return instance.GetActorManager(actorTypeName).DeactivateActor(new ActorId(actorId));
public Mono<Void> deactivate(String actorTypeName, String actorId) {
return this.getActorManager(actorTypeName).flatMap(m -> m.deactivateActor(new ActorId(actorId)));
}
/**
@ -135,13 +162,11 @@ public class ActorRuntime {
* @param actorTypeName Actor type name to invoke the method for.
* @param actorId Actor id for the actor for which method will be invoked.
* @param actorMethodName Method name on actor type which will be invoked.
* @param requestBodyStream Payload for the actor method.
* @param responseBodyStream Response for the actor method.
* @return
* @param request Payload for the actor method.
* @return Response for the actor method.
*/
static void Dispatch(String actorTypeName, String actorId, String actorMethodName, byte[] requestBodyStream, byte[] responseBodyStream) {
// uncomment when ActorManager implemented
// return instance.GetActorManager(actorTypeName).Dispatch(new ActorId(actorId), actorMethodName, requestBodyStream, responseBodyStream);
public Mono<String> invoke(String actorTypeName, String actorId, String actorMethodName, String request) {
return this.getActorManager(actorTypeName).flatMap(m -> m.invokeMethod(new ActorId(actorId), actorMethodName, request));
}
/**
@ -150,11 +175,11 @@ public class ActorRuntime {
* @param actorTypeName Actor type name to invoke the method for.
* @param actorId Actor id for the actor for which method will be invoked.
* @param reminderName The name of reminder provided during registration.
* @param requestBodyStream Payload for the actor method
* @param request Payload for the actor method
* @return Async void task.
*/
static void FireReminder(String actorTypeName, String actorId, String reminderName, byte[] requestBodyStream) {
// uncomment when ActorManager implemented
// return instance.GetActorManager(actorTypeName).FireReminder(new ActorId(actorId), reminderName, requestBodyStream);
public Mono<Void> invokeReminder(String actorTypeName, String actorId, String reminderName, String request) {
return this.getActorManager(actorTypeName).flatMap(m -> m.invokeReminder(new ActorId(actorId), reminderName, request));
}
/**
@ -163,22 +188,32 @@ public class ActorRuntime {
* @param actorTypeName Actor type name to invoke the method for.
* @param actorId Actor id for the actor for which method will be invoked.
* @param timerName The name of timer provided during registration.
* @return Async void task.
*/
static void FireTimer(String actorTypeName, String actorId, String timerName) {
// uncomment when ActorManager implemented
// return instance.GetActorManager(actorTypeName).FireTimerAsync(new ActorId(actorId), timerName);
public Mono<Void> invokeTimer(String actorTypeName, String actorId, String timerName) {
return this.getActorManager(actorTypeName).flatMap(m -> m.invokeTimer(new ActorId(actorId), timerName));
}
private ActorManager GetActorManager(String actorTypeName) throws IllegalStateException {
/**
* Finds the actor manager or errors out.
* @param actorTypeName Actor type for the actor manager to be found.
* @return Actor manager or error if not found.
*/
private Mono<ActorManager> getActorManager(String actorTypeName) {
ActorManager actorManager = this.actorManagers.get(actorTypeName);
if (actorManager == null) {
String errorMsg = String.format("Actor type %s is not registered with Actor runtime.", actorTypeName);
try {
if (actorManager == null) {
String errorMsg = String.format("Actor type %s is not registered with Actor runtime.", actorTypeName);
ActorTrace.WriteError(errorMsg);
throw new IllegalStateException(errorMsg);
ACTOR_TRACE.writeError(TRACE_TYPE, actorTypeName, "Actor type is not registered with runtime.");
throw new IllegalStateException(errorMsg);
}
} catch (IllegalStateException e) {
return Mono.error(e);
}
return actorManager;
return Mono.just(actorManager);
}
}

View File

@ -0,0 +1,123 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.actors.runtime;
import io.dapr.actors.ActorTrace;
/**
* Provides the context for the Actor's runtime.
* @param <T> Actor's type for the context.
*/
public class ActorRuntimeContext<T extends AbstractActor> {
/**
* Runtime.
*/
private final ActorRuntime actorRuntime;
/**
* Serializer.
*/
private final ActorStateSerializer actorSerializer;
/**
* Actor factory.
*/
private final ActorFactory<T> actorFactory;
/**
* Information of the Actor's type.
*/
private final ActorTypeInformation<T> actorTypeInformation;
/**
* Trace for Actor logs.
*/
private final ActorTrace actorTrace;
/**
* Client to communicate to Dapr's API.
*/
private final AppToDaprAsyncClient daprClient;
/**
* State provider for given Actor Type.
*/
private final DaprStateAsyncProvider stateProvider;
/**
* Instantiates a new runtime context for the Actor type.
* @param actorRuntime Runtime.
* @param actorSerializer Serializer.
* @param actorFactory Factory for Actors.
* @param actorTypeInformation Information for Actor's type.
* @param daprClient Client to communicate to Dapr.
* @param stateProvider State provider for given Actor's type.
*/
ActorRuntimeContext(ActorRuntime actorRuntime,
ActorStateSerializer actorSerializer,
ActorFactory<T> actorFactory,
ActorTypeInformation<T> actorTypeInformation,
AppToDaprAsyncClient daprClient, DaprStateAsyncProvider stateProvider) {
this.actorRuntime = actorRuntime;
this.actorSerializer = actorSerializer;
this.actorFactory = actorFactory;
this.actorTypeInformation = actorTypeInformation;
this.actorTrace = new ActorTrace();
this.daprClient = daprClient;
this.stateProvider = stateProvider;
}
/**
* Gets the Actor's runtime.
* @return Actor's runtime.
*/
ActorRuntime getActorRuntime() {
return this.actorRuntime;
}
/**
* Gets the Actor's serializer.
* @return Actor's serializer.
*/
ActorStateSerializer getActorSerializer() {
return this.actorSerializer;
}
/**
* Gets the Actor's serializer.
* @return Actor's serializer.
*/
ActorFactory<T> getActorFactory() {
return this.actorFactory;
}
/**
* Gets the information about the Actor's type.
* @return Information about the Actor's type.
*/
ActorTypeInformation<T> getActorTypeInformation() {
return this.actorTypeInformation;
}
/**
* Gets the trace for Actor logs.
* @return Trace for Actor logs.
*/
ActorTrace getActorTrace() { return this.actorTrace; }
/**
* Gets the client to communicate to Dapr's API.
* @return Client to communicate to Dapr's API.
*/
AppToDaprAsyncClient getDaprClient() { return this.daprClient; }
/**
* Gets the state provider for given Actor's type.
* @return State provider for given Actor's type.
*/
DaprStateAsyncProvider getStateProvider() { return stateProvider; }
}

View File

@ -1,16 +0,0 @@
package io.dapr.actors.runtime;
import io.dapr.actors.ActorId;
/**
* Interface exposed to Actor's implementations (application layer).
*/
public interface ActorService {
/**
* Creates an actor.
* @param actorId Identifier for the Actor to be created.
* @return New Actor instance.
*/
AbstractActor createActor(ActorId actorId);
}

View File

@ -1,67 +0,0 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.actors.runtime;
import io.dapr.actors.ActorId;
/**
* Implementation of the Actor Service that contains a state provider.
*/
class ActorServiceImpl implements ActorService {
/**
* Customizable factory for Actors.
*/
private final ActorFactory actorFactory;
/**
* State provider for Actors.
*/
private final DaprStateAsyncProvider stateProvider;
/**
* Information on the {@link Actor} type being serviced.
*/
private final ActorTypeInformation actorTypeInformation;
/**
* Instantiates a stateful service for a given {@link Actor} type.
* @param actorTypeInformation Information on the {@link Actor} type being serviced.
* @param stateProvider State provider for Actors.
* @param actorFactory Customizable factor for Actors.
*/
public ActorServiceImpl(ActorTypeInformation actorTypeInformation, DaprStateAsyncProvider stateProvider, ActorFactory actorFactory) {
this.actorTypeInformation = actorTypeInformation;
this.actorFactory = actorFactory;
this.stateProvider = stateProvider;
}
/**
* Gets the state provider for {@link Actor}.
* @return State provider.
*/
DaprStateAsyncProvider getStateProvider() {
return stateProvider;
}
/**
* Gets the information on the {@link Actor} Type.
* @return Information on the {@link Actor} Type.
*/
ActorTypeInformation getActorTypeInformation() {
return actorTypeInformation;
}
/**
* Creates an {@link Actor} for this service.
* @param actorId Identifier for the Actor to be created.
* @return New {@link Actor} instance.
*/
@Override
public AbstractActor createActor(ActorId actorId) {
return this.actorFactory.createActor(this, actorId);
}
}

View File

@ -7,9 +7,8 @@ package io.dapr.actors.runtime;
/**
* Represents a state change for an actor.
* @param <T> Type of the value being changed.
*/
public final class ActorStateChange<T> {
public final class ActorStateChange {
/**
* Name of the state being changed.
@ -19,7 +18,7 @@ public final class ActorStateChange<T> {
/**
* New value for the state being changed.
*/
private final T value;
private final Object value;
/**
* Type of change {@link ActorStateChangeKind}.
@ -32,7 +31,7 @@ public final class ActorStateChange<T> {
* @param value New value for the state being changed.
* @param changeKind Kind of change.
*/
ActorStateChange(String stateName, T value, ActorStateChangeKind changeKind) {
ActorStateChange(String stateName, Object value, ActorStateChangeKind changeKind) {
this.stateName = stateName;
this.value = value;
this.changeKind = changeKind;
@ -50,7 +49,7 @@ public final class ActorStateChange<T> {
* Gets the new value of the state being changed.
* @return New value.
*/
T getValue() {
Object getValue() {
return value;
}

View File

@ -0,0 +1,307 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.actors.runtime;
import io.dapr.actors.ActorId;
import reactor.core.publisher.Mono;
import java.util.*;
/**
* Manages state changes of a given Actor instance.
*
* All changes are cached in-memory until save() is called.
*/
class ActorStateManager {
/**
* Provides states using a state store.
*/
private final DaprStateAsyncProvider stateProvider;
/**
* Name of the Actor's type.
*/
private final String actorTypeName;
/**
* Actor's identifier.
*/
private final ActorId actorId;
/**
* Cache of state changes in this Actor's instance.
*/
private final Map<String, StateChangeMetadata> stateChangeTracker;
/**
* Instantiates a new state manager for the given Actor's instance.
* @param stateProvider State store provider.
* @param actorTypeName Name of Actor's type.
* @param actorId Actor's identifier.
*/
ActorStateManager(DaprStateAsyncProvider stateProvider, String actorTypeName, ActorId actorId) {
this.stateProvider = stateProvider;
this.actorTypeName = actorTypeName;
this.actorId = actorId;
this.stateChangeTracker = new HashMap<>();
}
/**
* Adds a given key/value to the Actor's state store's cache.
* @param stateName Name of the state being added.
* @param value Value to be added.
* @param <T> Type of the object being added.
* @return Asynchronous void operation.
*/
<T> Mono<Void> add(String stateName, T value) {
try {
if (stateName == null) {
throw new IllegalArgumentException("State's name cannot be null.");
}
if (this.stateChangeTracker.containsKey(stateName)) {
StateChangeMetadata metadata = this.stateChangeTracker.get(stateName);
if (metadata.kind == ActorStateChangeKind.REMOVE) {
this.stateChangeTracker.put(stateName, new StateChangeMetadata(ActorStateChangeKind.UPDATE, value));
return Mono.empty();
}
throw new IllegalStateException("Duplicate cached state: " + stateName);
}
return this.stateProvider.contains(this.actorTypeName, this.actorId, stateName)
.flatMap(exists -> {
if (exists) {
throw new IllegalStateException("Duplicate state: " + stateName);
}
this.stateChangeTracker.put(stateName, new StateChangeMetadata(ActorStateChangeKind.ADD, value));
return Mono.empty();
});
} catch (Exception e) {
return Mono.error(e);
}
}
/**
* Fetches the most recent value for the given state, including cached value.
* @param stateName Name of the state.
* @param clazz Class type for the value being fetched.
* @param <T> Type being fetched.
* @return Asynchronous response with fetched object.
*/
<T> Mono<T> get(String stateName, Class<T> clazz) {
try {
if (stateName == null) {
throw new IllegalArgumentException("State's name cannot be null.");
}
if (this.stateChangeTracker.containsKey(stateName)) {
StateChangeMetadata metadata = this.stateChangeTracker.get(stateName);
if (metadata.kind == ActorStateChangeKind.REMOVE) {
throw new NoSuchElementException("State is marked for removal: " + stateName);
}
return Mono.just((T) metadata.value);
}
return this.stateProvider.load(this.actorTypeName, this.actorId, stateName, clazz)
.switchIfEmpty(Mono.error(new NoSuchElementException("State not found: " + stateName)))
.map(v -> {
this.stateChangeTracker.put(stateName, new StateChangeMetadata(ActorStateChangeKind.REMOVE, v));
return (T)v;
});
} catch (Exception e) {
return Mono.error(e);
}
}
/**
* Updates a given key/value pair in the state store's cache.
* @param stateName Name of the state being updated.
* @param value Value to be set for given state.
* @param <T> Type of the value being set.
* @return Asynchronous void result.
*/
<T> Mono<Void> set(String stateName, T value) {
try {
if (stateName == null) {
throw new IllegalArgumentException("State's name cannot be null.");
}
if (this.stateChangeTracker.containsKey(stateName)) {
StateChangeMetadata metadata = this.stateChangeTracker.get(stateName);
ActorStateChangeKind kind = metadata.kind;
if ((kind == ActorStateChangeKind.NONE) || (kind == ActorStateChangeKind.REMOVE)) {
kind = ActorStateChangeKind.UPDATE;
}
this.stateChangeTracker.put(stateName, new StateChangeMetadata(kind, value));
return Mono.empty();
}
return this.stateProvider.contains(this.actorTypeName, this.actorId, stateName)
.map(exists -> {
this.stateChangeTracker.put(stateName,
new StateChangeMetadata(exists ? ActorStateChangeKind.UPDATE : ActorStateChangeKind.ADD, value));
return exists;
})
.then();
} catch (Exception e) {
return Mono.error(e);
}
}
/**
* Removes a given state from state store's cache.
* @param stateName State being stored.
* @return Asynchronous void result.
*/
Mono<Void> remove(String stateName) {
try {
if (stateName == null) {
throw new IllegalArgumentException("State's name cannot be null.");
}
if (this.stateChangeTracker.containsKey(stateName)) {
StateChangeMetadata metadata = this.stateChangeTracker.get(stateName);
if (metadata.kind == ActorStateChangeKind.REMOVE) {
return Mono.empty();
}
if (metadata.kind == ActorStateChangeKind.ADD) {
this.stateChangeTracker.remove(stateName);
return Mono.empty();
}
this.stateChangeTracker.put(stateName, new StateChangeMetadata(ActorStateChangeKind.REMOVE, null));
return Mono.empty();
}
return this.stateProvider.contains(this.actorTypeName, this.actorId, stateName)
.filter(exists -> exists)
.map(exists -> {
this.stateChangeTracker.put(stateName, new StateChangeMetadata(ActorStateChangeKind.REMOVE, null));
return exists;
})
.then();
} catch (Exception e) {
return Mono.error(e);
}
}
/**
* Checks if a given state exists in state store or cache.
* @param stateName State being checked.
* @return Asynchronous boolean result indicating whether state is present.
*/
Mono<Boolean> contains(String stateName) {
try {
if (stateName == null) {
throw new IllegalArgumentException("State's name cannot be null.");
}
if (this.stateChangeTracker.containsKey(stateName)) {
StateChangeMetadata metadata = this.stateChangeTracker.get(stateName);
if (metadata.kind == ActorStateChangeKind.REMOVE) {
return Mono.just(false);
}
return Mono.just(true);
}
return this.stateProvider.contains(this.actorTypeName, this.actorId, stateName);
} catch (Exception e) {
return Mono.error(e);
}
}
/**
* Saves all changes to state store.
* @return Asynchronous void result.
*/
Mono<Void> save() {
if (this.stateChangeTracker.isEmpty()) {
return Mono.empty();
}
List<ActorStateChange> changes = new ArrayList<>();
List<String> removed = new ArrayList<>();
for (Map.Entry<String, StateChangeMetadata> tuple : this.stateChangeTracker.entrySet()) {
if (tuple.getValue().kind == ActorStateChangeKind.NONE) {
continue;
}
if (tuple.getValue().kind == ActorStateChangeKind.REMOVE) {
removed.add(tuple.getKey());
}
changes.add(new ActorStateChange(tuple.getKey(), tuple.getValue().value, tuple.getValue().kind));
}
return this.stateProvider.apply(this.actorTypeName, this.actorId, changes.toArray(new ActorStateChange[0]))
.then(this.flush());
}
/**
* Clears all changes not yet saved to state store.
* @return
*/
Mono<Void> clear() {
this.stateChangeTracker.clear();
return Mono.empty();
}
/**
* Commits the current cached values after successful save.
* @return
*/
private Mono<Void> flush() {
for (Map.Entry<String, StateChangeMetadata> tuple : this.stateChangeTracker.entrySet()) {
String stateName = tuple.getKey();
if (tuple.getValue().kind == ActorStateChangeKind.REMOVE) {
this.stateChangeTracker.remove(stateName);
} else {
StateChangeMetadata metadata = new StateChangeMetadata(ActorStateChangeKind.NONE, tuple.getValue().value);
this.stateChangeTracker.put(stateName, metadata);
}
}
return Mono.empty();
}
/**
* Internal class to represent value and change kind.
*/
private static final class StateChangeMetadata {
/**
* Kind of change cached.
*/
private final ActorStateChangeKind kind;
/**
* Value cached.
*/
private final Object value;
/**
* Creates a new instance of the metadata on state change.
* @param kind Kind of change.
* @param value Value to be set.
*/
private StateChangeMetadata(ActorStateChangeKind kind, Object value) {
this.kind = kind;
this.value = value;
}
}
}

View File

@ -4,97 +4,206 @@
*/
package io.dapr.actors.runtime;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;
import java.time.Duration;
/**
* Serializes and deserializes an object.
*/
class ActorStateSerializer {
/**
* Shared Json serializer/deserializer as per Jackson's documentation.
*/
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
/**
* Shared Json Factory as per Jackson's documentation, used only for this class.
*/
private static final JsonFactory JSON_FACTORY = new JsonFactory();
/**
* Serializes a given state object into byte array.
*
* @param state State object to be serialized.
* @return Array of bytes[] with the serialized content.
* @throws IOException
*/
<T> String serialize(T state) throws IOException {
if (state == null) {
return null;
}
/**
* Shared Json serializer/deserializer as per Jackson's documentation.
*/
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
if (state.getClass() == String.class) {
return state.toString();
}
if (isPrimitive(state.getClass())) {
return state.toString();
}
// Not string, not primitive, so it is a complex type: we use JSON for that.
return OBJECT_MAPPER.writeValueAsString(state);
/**
* Serializes a given state object into byte array.
*
* @param state State object to be serialized.
* @return Array of bytes[] with the serialized content.
* @throws IOException
*/
<T> String serialize(T state) throws IOException {
if (state == null) {
return null;
}
/**
* Deserializes the byte array into the original object.
*
* @param value String to be parsed.
* @param clazz Type of the object being deserialized.
* @param <T> Generic type of the object being deserialized.
* @return Object of type T.
* @throws IOException
*/
<T> T deserialize(String value, Class<T> clazz) throws IOException {
if (clazz == String.class) {
return (T) value;
}
if (isPrimitive(clazz)) {
return parse(value, clazz);
}
// Not string, not primitive, so it is a complex type: we use JSON for that.
return OBJECT_MAPPER.readValue(value, clazz);
if (state.getClass() == String.class) {
return state.toString();
}
private static boolean isPrimitive(Class<?> clazz) {
if (clazz == null) {
return false;
}
return (clazz.isPrimitive() ||
(clazz == Boolean.class) ||
(clazz == Character.class) ||
(clazz == Byte.class) ||
(clazz == Short.class) ||
(clazz == Integer.class) ||
(clazz == Long.class) ||
(clazz == Float.class) ||
(clazz == Double.class) ||
(clazz == Void.class));
if (state.getClass() == ActorTimer.class) {
// Special serializer for this internal classes.
return serialize((ActorTimer<?>) state);
}
private static <T> T parse(String value, Class<T> clazz) {
if (value == null) {
return null;
}
if ((Boolean.class == clazz) || (boolean.class == clazz)) return (T) Boolean.valueOf(value);
if ((Byte.class == clazz) || (byte.class == clazz)) return (T) Byte.valueOf(value);
if ((Short.class == clazz) || (short.class == clazz)) return (T) Short.valueOf(value);
if ((Integer.class == clazz) || (int.class == clazz)) return (T) Integer.valueOf(value);
if ((Long.class == clazz) || (long.class == clazz)) return (T) Long.valueOf(value);
if ((Float.class == clazz) || (float.class == clazz)) return (T) Float.valueOf(value);
if ((Double.class == clazz) || (double.class == clazz)) return (T) Double.valueOf(value);
return null;
if (state.getClass() == ActorReminderParams.class) {
// Special serializer for this internal classes.
return serialize((ActorReminderParams) state);
}
if (isPrimitiveOrEquivalent(state.getClass())) {
return state.toString();
}
// Not string, not primitive, so it is a complex type: we use JSON for that.
return OBJECT_MAPPER.writeValueAsString(state);
}
/**
* Deserializes the byte array into the original object.
*
* @param value String to be parsed.
* @param clazz Type of the object being deserialized.
* @param <T> Generic type of the object being deserialized.
* @return Object of type T.
* @throws IOException
*/
<T> T deserialize(String value, Class<T> clazz) throws IOException {
if (clazz == String.class) {
return (T) value;
}
if (clazz == ActorReminderParams.class) {
// Special serializer for this internal classes.
return (T) deserializeActorReminder(value);
}
if (isPrimitiveOrEquivalent(clazz)) {
return parse(value, clazz);
}
if (value == null) {
return (T) null;
}
// Not string, not primitive, so it is a complex type: we use JSON for that.
return OBJECT_MAPPER.readValue(value, clazz);
}
/**
* Checks if the class is a primitive or equivalent.
* @param clazz Class to be checked.
* @return True if primitive or equivalent.
*/
private static boolean isPrimitiveOrEquivalent(Class<?> clazz) {
if (clazz == null) {
return false;
}
return (clazz.isPrimitive() ||
(clazz == Boolean.class) ||
(clazz == Character.class) ||
(clazz == Byte.class) ||
(clazz == Short.class) ||
(clazz == Integer.class) ||
(clazz == Long.class) ||
(clazz == Float.class) ||
(clazz == Double.class) ||
(clazz == Void.class));
}
/**
* Parses a given String to the corresponding object defined by class.
* @param value String to be parsed.
* @param clazz Class of the expected result type.
* @param <T> Result type.
* @return Result as corresponding type.
*/
private static <T> T parse(String value, Class<T> clazz) {
if (value == null) {
if (boolean.class == clazz) return (T) Boolean.FALSE;
if (byte.class == clazz) return (T) Byte.valueOf((byte) 0);
if (short.class == clazz) return (T) Short.valueOf((short) 0);
if (int.class == clazz) return (T) Integer.valueOf(0);
if (long.class == clazz) return (T) Long.valueOf(0L);
if (float.class == clazz) return (T) Float.valueOf(0);
if (double.class == clazz) return (T) Double.valueOf(0);
return null;
}
if ((Boolean.class == clazz) || (boolean.class == clazz)) return (T) Boolean.valueOf(value);
if ((Byte.class == clazz) || (byte.class == clazz)) return (T) Byte.valueOf(value);
if ((Short.class == clazz) || (short.class == clazz)) return (T) Short.valueOf(value);
if ((Integer.class == clazz) || (int.class == clazz)) return (T) Integer.valueOf(value);
if ((Long.class == clazz) || (long.class == clazz)) return (T) Long.valueOf(value);
if ((Float.class == clazz) || (float.class == clazz)) return (T) Float.valueOf(value);
if ((Double.class == clazz) || (double.class == clazz)) return (T) Double.valueOf(value);
return null;
}
/**
* Faster serialization for Actor's timer.
* @param timer Timer to be serialized.
* @return JSON String.
* @throws IOException If cannot generate JSON.
*/
private static String serialize(ActorTimer<?> timer) throws IOException {
try (Writer writer = new StringWriter()) {
JsonGenerator generator = JSON_FACTORY.createGenerator(writer);
generator.writeStartObject();
generator.writeStringField("dueTime", DurationUtils.ConvertDurationToDaprFormat(timer.getDueTime()));
generator.writeStringField("period", DurationUtils.ConvertDurationToDaprFormat(timer.getPeriod()));
generator.writeEndObject();
generator.close();
writer.flush();
return writer.toString();
}
}
/**
* Faster serialization for Actor's reminder.
* @param reminder Reminder to be serialized.
* @return JSON String.
* @throws IOException If cannot generate JSON.
*/
private static String serialize(ActorReminderParams reminder) throws IOException {
try (Writer writer = new StringWriter()) {
JsonGenerator generator = JSON_FACTORY.createGenerator(writer);
generator.writeStartObject();
generator.writeStringField("dueTime", DurationUtils.ConvertDurationToDaprFormat(reminder.getDueTime()));
generator.writeStringField("period", DurationUtils.ConvertDurationToDaprFormat(reminder.getPeriod()));
if (reminder.getData() != null) {
generator.writeStringField("data", reminder.getData());
}
generator.writeEndObject();
generator.close();
writer.flush();
return writer.toString();
}
}
/**
* Deserializes an Actor Reminder.
* @param value String to be deserialized.
* @return Actor Reminder.
* @throws IOException If cannot parse JSON.
*/
private static ActorReminderParams deserializeActorReminder(String value) throws IOException {
if (value == null) {
return null;
}
JsonNode node = OBJECT_MAPPER.readTree(value);
Duration dueTime = DurationUtils.ConvertDurationFromDaprFormat(node.get("dueTime").asText());
Duration period = DurationUtils.ConvertDurationFromDaprFormat(node.get("period").asText());
String data = node.get("data") != null ? node.get("data").asText() : null;
return new ActorReminderParams(data, dueTime, period);
}
}

View File

@ -1,41 +1,115 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.actors.runtime;
import java.time.Duration;
import java.util.function.Function;
/**
* Represents the timer set on an Actor.
* Represents the timer set on an Actor, to be called once after due time and then every period.
* @param <T> State type.
*/
public interface ActorTimer {
final class ActorTimer<T> {
/**
* Gets the time when timer is first due.
* @return Time as Duration when timer is first due.
* Actor that owns this timer.
*/
Duration getDueTime();
private final AbstractActor owner;
/**
* Gets the periodic time when timer will be invoked.
* @return Periodic time as Duration when timer will be invoked.
* Name of this timer.
*/
Duration getPeriod();
private String name;
/**
* Name of the method to be called for this timer.
*/
private String methodName;
/**
* State to be sent in the timer.
*/
private T state;
/**
* Due time for the timer's first trigger.
*/
private Duration dueTime;
/**
* Period at which the timer will be triggered.
*/
private Duration period;
/**
* Instantiates a new Actor Timer.
*
* @param owner The Actor that owns this timer. The timer callback will be fired for this Actor.
* @param timerName The name of the timer.
* @param methodName The name of the method to be called for this timer.
* @param state information to be used by the callback method
* @param dueTime the time when timer is first due.
* @param period the periodic time when timer will be invoked.
*/
ActorTimer(AbstractActor owner,
String timerName,
String methodName,
T state,
Duration dueTime,
Duration period) {
this.owner = owner;
this.name = timerName;
this.methodName = methodName;
this.state = state;
this.dueTime = dueTime;
this.period = period;
}
/**
* Gets the name of the Timer. The name is unique per actor.
*
* @return The name of the timer.
*/
String getName();
public String getName() {
return this.name;
}
/**
* Gets the name of the method for this Timer.
*
* @return Gets a delegate that specifies a method to be called when the timer fires.
* It has one parameter: the state object passed to RegisterTimer.
* @return The name of the method for this timer.
*/
Function<Object, Void> getAsyncCallback();
public String getMethodName() {
return this.methodName;
}
/**
* Gets the time when timer is first due.
*
* @return Gets state containing information to be used by the callback method, or null.
* @return Time as Duration when timer is first due.
*/
Object getState();
}
public Duration getDueTime() {
return this.dueTime;
}
/**
* Gets the periodic time when timer will be invoked.
*
* @return Periodic time as Duration when timer will be invoked.
*/
public Duration getPeriod() {
return this.period;
}
/**
* Gets state containing information to be used by the callback method, or null.
*
* @return State containing information to be used by the callback method, or null.
*/
public T getState() {
return this.state;
}
}

View File

@ -1,136 +0,0 @@
package io.dapr.actors.runtime;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;
import java.time.Duration;
import java.util.function.Function;
/**
* Represents the timer set on an Actor.
*/
class ActorTimerImpl implements ActorTimer {
/**
* Shared Json Factory as per Jackson's documentation, used only for this class.
*/
private static final JsonFactory JSON_FACTORY = new JsonFactory();
/**
* Actor that owns this timer.
*/
private final AbstractActor owner;
/**
* Name of this timer.
*/
private String name;
/**
* Async callbacks for the timer.
*/
private Function<Object, Void> asyncCallback;
/**
* State to be sent in the timer.
*/
private Object state;
/**
* Due time for the timer's first trigger.
*/
private Duration dueTime;
/**
* Period at which the timer will be triggered.
*/
private Duration period;
/**
* @param owner The Actor that owns this timer. The timer callback will be fired for this Actor.
* @param timerName The name of the timer.
* @param asyncCallback The callback to invoke when the timer fires.
* @param state information to be used by the callback method
* @param dueTime the time when timer is first due.
* @param period the periodic time when timer will be invoked.
*/
public ActorTimerImpl(AbstractActor owner,
String timerName,
Function<Object, Void> asyncCallback,
Object state,
Duration dueTime,
Duration period) {
this.owner = owner;
this.name = timerName;
this.asyncCallback = asyncCallback;
this.state = state;
this.dueTime = dueTime;
this.period = period;
}
/**
* Gets the name of the Timer. The name is unique per actor.
*
* @return The name of the timer.
*/
public String getName() {
return this.name;
}
/**
* Gets the time when timer is first due.
*
* @return Time as Duration when timer is first due.
*/
public Duration getDueTime() {
return this.dueTime;
}
/**
* @return Gets a delegate that specifies a method to be called when the timer fires.
* It has one parameter: the state object passed to RegisterTimer.
*/
public Function<Object, Void> getAsyncCallback() {
return this.asyncCallback;
}
/**
* Gets the periodic time when timer will be invoked.
*
* @return Periodic time as Duration when timer will be invoked.
*/
public Duration getPeriod() {
return this.period;
}
/**
* @return Gets state containing information to be used by the callback method, or null.
*/
public Object getState() {
return this.state;
}
/**
* Generates JSON representation of this timer.
*
* @return JSON.
*/
String serialize() throws IOException {
try (Writer writer = new StringWriter()) {
JsonGenerator generator = JSON_FACTORY.createGenerator(writer);
generator.writeStartObject();
generator.writeStringField("dueTime", ConverterUtils.ConvertDurationToDaprFormat(this.dueTime));
generator.writeStringField("period", ConverterUtils.ConvertDurationToDaprFormat(this.period));
generator.writeEndObject();
generator.close();
writer.flush();
return writer.toString();
}
}
}

View File

@ -14,6 +14,10 @@ import java.lang.annotation.*;
@Retention(RetentionPolicy.RUNTIME)
public @interface ActorType {
/**
* Overrides Actor's name.
* @return Actor's name.
*/
String Name();
}

View File

@ -27,7 +27,7 @@ final class ActorTypeInformation<T> {
/**
* Actor's immediate interfaces.
*/
private final Collection<Class> interfaces;
private final Collection<Class<?>> interfaces;
/**
* Whether Actor type is abstract.
@ -50,7 +50,7 @@ final class ActorTypeInformation<T> {
*/
private ActorTypeInformation(String name,
Class<T> implementationClass,
Collection<Class> interfaces,
Collection<Class<?>> interfaces,
boolean abstractClass,
boolean remindable) {
this.name = name;
@ -84,7 +84,7 @@ final class ActorTypeInformation<T> {
*
* @return Collection of actor interfaces.
*/
public Collection<Class> getInterfaces() {
public Collection<Class<?>> getInterfaces() {
return Collections.unmodifiableCollection(this.interfaces);
}

View File

@ -71,5 +71,5 @@ interface AppToDaprAsyncClient {
* @param timerName Name of timer to be unregistered.
* @return Asynchronous void result.
*/
Mono<Void> unregisterTimerAsync(String actorType, String actorId, String timerName);
Mono<Void> unregisterTimer(String actorType, String actorId, String timerName);
}

View File

@ -4,7 +4,7 @@
*/
package io.dapr.actors.runtime;
import io.dapr.actors.*;
import io.dapr.actors.AbstractClientBuilder;
import okhttp3.OkHttpClient;
/**

View File

@ -6,13 +6,12 @@ package io.dapr.actors.runtime;
import io.dapr.actors.AbstractDaprClient;
import io.dapr.actors.Constants;
import okhttp3.*;
import okhttp3.OkHttpClient;
import reactor.core.publisher.Mono;
/**
* Http client to call Dapr's API for actors.
*/
//public class DaprHttpAsyncClient implements DaprAsyncClient {
class AppToDaprHttpAsyncClient extends AbstractDaprClient implements AppToDaprAsyncClient {
/**
@ -74,7 +73,7 @@ class AppToDaprHttpAsyncClient extends AbstractDaprClient implements AppToDaprAs
* {@inheritDoc}
*/
@Override
public Mono<Void> unregisterTimerAsync(String actorType, String actorId, String timerName) {
public Mono<Void> unregisterTimer(String actorType, String actorId, String timerName) {
String url = String.format(Constants.ACTOR_TIMER_RELATIVE_URL_FORMAT, actorType, actorId, timerName);
return super.invokeAPIVoid("DELETE", url, null);
}

View File

@ -7,6 +7,7 @@ package io.dapr.actors.runtime;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import io.dapr.actors.ActorId;
import reactor.core.publisher.Mono;
import java.io.IOException;
@ -32,8 +33,8 @@ class DaprStateAsyncProvider {
this.serializer = serializer;
}
<T> Mono<T> load(String actorType, String actorId, String stateName, Class<T> clazz) {
Mono<String> result = this.daprAsyncClient.getState(actorType, actorId, stateName);
<T> Mono<T> load(String actorType, ActorId actorId, String stateName, Class<T> clazz) {
Mono<String> result = this.daprAsyncClient.getState(actorType, actorId.toString(), stateName);
return result
.filter(s -> (s != null) && (!s.isEmpty()))
@ -46,8 +47,8 @@ class DaprStateAsyncProvider {
});
}
Mono<Boolean> contains(String actorType, String actorId, String stateName) {
Mono<String> result = this.daprAsyncClient.getState(actorType, actorId, stateName);
Mono<Boolean> contains(String actorType, ActorId actorId, String stateName) {
Mono<String> result = this.daprAsyncClient.getState(actorType, actorId.toString(), stateName);
return result.map(s -> {
return (s != null) && (s.length() > 0);
@ -76,7 +77,7 @@ class DaprStateAsyncProvider {
* @param stateChanges Collection of changes to be performed transactionally.
* @return Void.
*/
Mono<Void> apply(String actorType, String actorId, ActorStateChange... stateChanges)
Mono<Void> apply(String actorType, ActorId actorId, ActorStateChange... stateChanges)
{
if ((stateChanges == null) || stateChanges.length == 0) {
return Mono.empty();
@ -135,6 +136,6 @@ class DaprStateAsyncProvider {
Mono.empty();
}
return this.daprAsyncClient.saveStateTransactionally(actorType, actorId, payload);
return this.daprAsyncClient.saveStateTransactionally(actorType, actorId.toString(), payload);
}
}

View File

@ -15,35 +15,23 @@ import java.lang.reflect.Constructor;
*/
class DefaultActorFactory<T extends AbstractActor> implements ActorFactory<T> {
/**
* Information on the {@link Actor} type being serviced.
*/
private final ActorTypeInformation<T> actorTypeInformation;
/**
* Instantiates the default factory for Actors of a given type.
* @param actorTypeInformation Information of the actor type for this instance.
*/
DefaultActorFactory(ActorTypeInformation<T> actorTypeInformation) {
this.actorTypeInformation = actorTypeInformation;
}
/**
* {@inheritDoc}
*/
@Override
public T createActor(ActorService actorService, ActorId actorId) {
public T createActor(ActorRuntimeContext<T> actorRuntimeContext, ActorId actorId) {
try {
if (this.actorTypeInformation == null) {
if (actorRuntimeContext == null) {
return null;
}
Constructor<T> constructor = this
.actorTypeInformation
.getImplementationClass()
.getConstructor(ActorService.class, ActorId.class);
return constructor.newInstance(actorService, actorId);
} catch (ReflectiveOperationException e) {
Constructor<T> constructor = actorRuntimeContext
.getActorTypeInformation()
.getImplementationClass()
.getConstructor(ActorRuntimeContext.class, ActorId.class);
return constructor.newInstance(actorRuntimeContext, actorId);
} catch (Exception e) {
//TODO: Use ActorTrace.
e.printStackTrace();
}
return null;

View File

@ -5,9 +5,9 @@
package io.dapr.actors.runtime;
import java.time.*;
import java.time.Duration;
public class ConverterUtils {
public class DurationUtils {
/**
* Converts time from the String format used by Dapr into a Duration.
@ -84,8 +84,8 @@ public class ConverterUtils {
/**
* Helper to get the "days" part of the Duration. For example if the duration is 26 hours, this returns 1.
*
* @param d
* @return
* @param d Duration
* @return Number of days.
*/
static long getDaysPart(Duration d) {
long t = d.getSeconds() / 60 / 60 / 24;
@ -95,7 +95,7 @@ public class ConverterUtils {
/**
* Helper to get the "hours" part of the Duration. For example if the duration is 26 hours, this is 1 day, 2 hours, so this returns 2.
*
* @param The duration to parse
* @param d The duration to parse
* @return the hour part of the duration
*/
static long getHoursPart(Duration d) {
@ -107,7 +107,7 @@ public class ConverterUtils {
/**
* Helper to get the "minutes" part of the Duration.
*
* @param The duration to parse
* @param d The duration to parse
* @return the minutes part of the duration
*/
static long getMinutesPart(Duration d) {
@ -119,7 +119,7 @@ public class ConverterUtils {
/**
* Helper to get the "seconds" part of the Duration.
*
* @param The duration to parse
* @param d The duration to parse
* @return the seconds part of the duration
*/
static long getSecondsPart(Duration d) {
@ -131,7 +131,7 @@ public class ConverterUtils {
/**
* Helper to get the "millis" part of the Duration.
*
* @param The duration to parse
* @param d The duration to parse
* @return the milliseconds part of the duration
*/
static long getMilliSecondsPart(Duration d) {

View File

@ -4,8 +4,33 @@
*/
package io.dapr.actors.runtime;
import reactor.core.publisher.Mono;
import java.time.Duration;
/**
* TODO
* Interface that actors must implement to consume reminders registered using RegisterReminderAsync.
*/
public interface Remindable {
public interface Remindable<T> {
/**
* Gets the class for state object.
* @return Class for state object.
*/
Class<T> getReminderStateType();
/**
* The reminder call back invoked when an actor reminder is triggered.
*
* The state of this actor is saved by the actor runtime upon completion of the task returned by this method.
* If an error occurs while saving the state, then all state cached by this actor's {@link ActorStateManager} will
* be discarded and reloaded from previously saved state when the next actor method or reminder invocation occurs.
*
* @param reminderName The name of reminder provided during registration.
* @param state The user state provided during registration.
* @param dueTime The invocation due time provided during registration.
* @param period The invocation period provided during registration.
* @return A task that represents the asynchronous operation performed by this callback.
*/
Mono<Void> receiveReminder(String reminderName, T state, Duration dueTime, Duration period);
}

View File

@ -1,101 +0,0 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
package io.dapr.actors.runtime;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.type.MapType;
import java.io.IOException;
import java.time.*;
import java.util.Base64;
import java.util.Map;
class ReminderInfo
{
private final Duration minTimePeriod = Duration.ofMillis(-1);
public Duration dueTime;
public Duration period;
public byte[] data;
public ReminderInfo() {
}
public ReminderInfo(byte[] state, Duration dueTime, Duration period) {
this.ValidateDueTime("DueTime", dueTime);
this.ValidatePeriod("Period", period);
this.data = state;
this.dueTime = dueTime;
this.period = period;
}
Duration getDueTime() {
return this.dueTime;
}
Duration getPeriod() {
return this.period;
}
byte[] getData() {
return this.data;
}
String serialize() throws IOException {
try {
ObjectMapper om = new ObjectMapper();
ObjectNode objectNode = om.createObjectNode();
objectNode.put("dueTime", ConverterUtils.ConvertDurationToDaprFormat(this.dueTime));
objectNode.put("period", ConverterUtils.ConvertDurationToDaprFormat(this.period));
if (this.data != null) {
objectNode.put("data", Base64.getEncoder().encodeToString(this.data));
}
return om.writeValueAsString(objectNode);
} catch (IOException e) {
throw e;
}
}
static ReminderInfo deserialize(byte[] stream) throws IOException {
try {
ObjectMapper om = new ObjectMapper();
MapType type = om.getTypeFactory().constructMapType(Map.class, String.class, Object.class);
Map<String, Object> data = om.readValue(stream, type);
String d = (String)data.getOrDefault("dueTime", "");
Duration dueTime = ConverterUtils.ConvertDurationFromDaprFormat(d);
String p = (String)data.getOrDefault("period", "");
Duration period = ConverterUtils.ConvertDurationFromDaprFormat(p);
String s = (String)data.getOrDefault("data", null);
byte[] state = (s == null) ? null : Base64.getDecoder().decode(s);
return new ReminderInfo(state, dueTime, period);
} catch (IOException e) {
throw e;
}
}
private void ValidateDueTime(String argName, Duration value)
{
if (value.compareTo(Duration.ZERO) < 0 )
{
String message = String.format("argName: %s - Duration toMillis() - specified value must be greater than %s", argName, Duration.ZERO);
throw new IllegalArgumentException(message);
}
}
private void ValidatePeriod(String argName, Duration value) throws IllegalArgumentException
{
if (value.compareTo(this.minTimePeriod) < 0)
{
String message = String.format("argName: %s - Duration toMillis() - specified value must be greater than %s", argName, Duration.ZERO);
throw new IllegalArgumentException(message);
}
}
}

View File

@ -23,7 +23,7 @@ public class ActorIdTest {
public void getId() {
String id = "123";
ActorId actorId = new ActorId(id);
Assert.assertEquals(id, actorId.getStringId());
Assert.assertEquals(id, actorId.toString());
}
@Test

View File

@ -19,12 +19,12 @@ public class ActorMethodInfoMapTest {
@Test
public void normalUsage() {
ArrayList<Class<TestActor>> interfaceTypes = new ArrayList<Class<TestActor>>();
ArrayList<Class<?>> interfaceTypes = new ArrayList<>();
interfaceTypes.add(TestActor.class);
ActorMethodInfoMap m = new ActorMethodInfoMap(interfaceTypes);
try {
Method m1 = m.LookupActorMethodInfo("getData");
Method m1 = m.get("getData");
Assert.assertEquals("getData", m1.getName());
Class c = m1.getReturnType();
Assert.assertEquals(c.getClass(), String.class.getClass());
@ -37,11 +37,11 @@ public class ActorMethodInfoMapTest {
@Test(expected = NoSuchMethodException.class)
public void lookUpNonExistingMethod() throws NoSuchMethodException {
ArrayList<Class<TestActor>> interfaceTypes = new ArrayList<Class<TestActor>>();
ArrayList<Class<?>> interfaceTypes = new ArrayList<>();
interfaceTypes.add(TestActor.class);
ActorMethodInfoMap m = new ActorMethodInfoMap(interfaceTypes);
m.LookupActorMethodInfo("thisMethodDoesNotExist");
m.get("thisMethodDoesNotExist");
}
/**

View File

@ -0,0 +1,62 @@
package io.dapr.actors.runtime;
import org.junit.Assert;
import org.junit.Test;
import java.time.Duration;
public class ActorReminderParamsTest {
private static final ActorStateSerializer SERIALIZER = new ActorStateSerializer();
@Test(expected = IllegalArgumentException.class)
public void outOfRangeDueTime() {
ActorReminderParams info = new ActorReminderParams(null, Duration.ZERO.plusSeconds(-10), Duration.ZERO.plusMinutes(1));
}
@Test
public void negativePeriod() {
// this is ok
ActorReminderParams info = new ActorReminderParams(null, Duration.ZERO.plusMinutes(1), Duration.ZERO.plusMillis(-1));
}
@Test(expected = IllegalArgumentException.class)
public void outOfRangePeriod() {
ActorReminderParams info = new ActorReminderParams(null, Duration.ZERO.plusMinutes(1), Duration.ZERO.plusMinutes(-10));
}
@Test
public void noState() {
ActorReminderParams original = new ActorReminderParams(null, Duration.ZERO.plusMinutes(2), Duration.ZERO.plusMinutes((5)));
ActorReminderParams recreated = null;
try {
String serialized = SERIALIZER.serialize(original);
recreated = SERIALIZER.deserialize(serialized, ActorReminderParams.class);
}
catch(Exception e) {
System.out.println("The error is: " + e);
Assert.fail();
}
Assert.assertEquals(original.getData(), recreated.getData());
Assert.assertEquals(original.getDueTime(), recreated.getDueTime());
Assert.assertEquals(original.getPeriod(), recreated.getPeriod());
}
@Test
public void withState() {
ActorReminderParams original = new ActorReminderParams("maru", Duration.ZERO.plusMinutes(2), Duration.ZERO.plusMinutes((5)));
ActorReminderParams recreated = null;
try {
String serialized = SERIALIZER.serialize(original);
recreated = SERIALIZER.deserialize(serialized, ActorReminderParams.class);
}
catch(Exception e) {
System.out.println("The error is: " + e);
Assert.fail();
}
Assert.assertEquals(original.getData(), recreated.getData());
Assert.assertEquals(original.getDueTime(), recreated.getDueTime());
Assert.assertEquals(original.getPeriod(), recreated.getPeriod());
}
}

View File

@ -7,7 +7,7 @@ import org.junit.Test;
import java.io.IOException;
import java.time.Duration;
public class ActorTimerImplTest {
public class ActorTimerTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@ -21,14 +21,14 @@ public class ActorTimerImplTest {
.plusHours(1)
.plusSeconds(3);
ActorTimerImpl timer = new ActorTimerImpl(
ActorTimer timer = new ActorTimer(
null,
"testTimer",
null,
null,
dueTime,
period);
String s = timer.serialize();
String s = new ActorStateSerializer().serialize(timer);
String expected = "{\"period\":\"1h0m3s0ms\",\"dueTime\":\"0h7m17s0ms\"}";
// Deep comparison via JsonNode.equals method.
@ -46,14 +46,14 @@ public class ActorTimerImplTest {
.minusHours(1)
.minusMinutes(3);
ActorTimerImpl timer = new ActorTimerImpl(
ActorTimer timer = new ActorTimer(
null,
"testTimer",
null,
null,
dueTime,
period);
String s = timer.serialize();
String s = new ActorStateSerializer().serialize(timer);
// A negative period will be serialized to an empty string which is interpreted by Dapr to mean fire once only.
String expected = "{\"period\":\"\",\"dueTime\":\"0h7m17s0ms\"}";

View File

@ -7,6 +7,9 @@ package io.dapr.actors.runtime;
import org.junit.Assert;
import org.junit.Test;
import reactor.core.publisher.Mono;
import java.time.Duration;
/**
* Unit tests for ActorTypeInformation.
@ -26,6 +29,9 @@ public class ActorTypeInformationTest {
public void notRemindable() {
class A extends AbstractActor implements MyActor {
A() {
super(null, null);
}
}
ActorTypeInformation info = ActorTypeInformation.create(A.class);
@ -45,6 +51,19 @@ public class ActorTypeInformationTest {
public void remindable() {
class A extends AbstractActor implements MyActor, Remindable {
A() {
super(null, null);
}
@Override
public Class getReminderStateType() {
return null;
}
@Override
public Mono<Void> receiveReminder(String reminderName, Object state, Duration dueTime, Duration period) {
return null;
}
}
ActorTypeInformation info = ActorTypeInformation.create(A.class);
@ -65,6 +84,9 @@ public class ActorTypeInformationTest {
public void renamedWithAnnotation() {
@ActorType(Name = "B")
class A extends AbstractActor implements MyActor {
A() {
super(null, null);
}
}
ActorTypeInformation info = ActorTypeInformation.create(A.class);
@ -83,6 +105,9 @@ public class ActorTypeInformationTest {
@Test
public void nonActorParentClass() {
abstract class MyAbstractClass extends AbstractActor implements MyActor {
MyAbstractClass() {
super(null, null);
}
}
class A extends MyAbstractClass {

View File

@ -7,6 +7,7 @@ package io.dapr.actors.runtime;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.actors.ActorId;
import org.junit.Assert;
import org.junit.Test;
import reactor.core.publisher.Mono;
@ -24,225 +25,225 @@ import static org.mockito.Mockito.*;
*/
public class DaprStateAsyncProviderTest {
private static final ActorStateSerializer SERIALIZER = new ActorStateSerializer();
private static final ActorStateSerializer SERIALIZER = new ActorStateSerializer();
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final double EPSILON = 1e-10;
private static final double EPSILON = 1e-10;
/**
* Class used to test JSON serialization.
*/
public static final class Customer {
/**
* Class used to test JSON serialization.
*/
public static final class Customer {
private int id;
private int id;
private String name;
public int getId() {
return id;
}
public Customer setId(int id) {
this.id = id;
return this;
}
public String getName() {
return name;
}
public Customer setName(String name) {
this.name = name;
return this;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Customer customer = (Customer) o;
return id == customer.id &&
Objects.equals(name, customer.name);
}
@Override
public int hashCode() {
return Objects.hash(id, name);
}
private String name;
public int getId() {
return id;
}
@Test
public void happyCaseApply() {
AppToDaprAsyncClient daprAsyncClient = mock(AppToDaprAsyncClient.class);
when(daprAsyncClient
.saveStateTransactionally(
eq("MyActor"),
eq("123"),
argThat(s -> {
try {
JsonNode node = OBJECT_MAPPER.readTree(s);
if (node == null) {
return false;
}
if (node.size() != 3) {
return false;
}
boolean foundInsertName = false;
boolean foundUpdateZipcode = false;
boolean foundDeleteFlag = false;
for (JsonNode operation : node) {
if (operation.get("operation") == null) {
return false;
}
if (operation.get("request") == null) {
return false;
}
String opName = operation.get("operation").asText();
String key = operation.get("request").get("key").asText();
JsonNode valueNode = operation.get("request").get("value");
foundInsertName |= "upsert".equals(opName) &&
"name".equals(key) &&
"Jon Doe".equals(valueNode.asText());
foundUpdateZipcode |= "upsert".equals(opName) &&
"zipcode".equals(key) &&
"98011".equals(valueNode.asText());
foundDeleteFlag |= "delete".equals(opName) &&
"flag".equals(key) &&
(valueNode == null);
}
return foundInsertName && foundUpdateZipcode && foundDeleteFlag;
} catch (IOException e) {
e.printStackTrace();
return false;
}
})))
.thenReturn(Mono.empty());
DaprStateAsyncProvider provider = new DaprStateAsyncProvider(daprAsyncClient, SERIALIZER);
provider.apply("MyActor",
"123",
createInsertChange("name", "Jon Doe"),
createUpdateChange("zipcode", "98011"),
createDeleteChange("flag"))
.block();
verify(daprAsyncClient).saveStateTransactionally(eq("MyActor"), eq("123"), any());
public Customer setId(int id) {
this.id = id;
return this;
}
@Test
public void happyCaseLoad() {
AppToDaprAsyncClient daprAsyncClient = mock(AppToDaprAsyncClient.class);
when(daprAsyncClient
.getState(any(), any(), eq("name")))
.thenReturn(Mono.just("Jon Doe"));
when(daprAsyncClient
.getState(any(), any(), eq("zipcode")))
.thenReturn(Mono.just("98021"));
when(daprAsyncClient
.getState(any(), any(), eq("goals")))
.thenReturn(Mono.just("98"));
when(daprAsyncClient
.getState(any(), any(), eq("balance")))
.thenReturn(Mono.just("46.55"));
when(daprAsyncClient
.getState(any(), any(), eq("active")))
.thenReturn(Mono.just("true"));
when(daprAsyncClient
.getState(any(), any(), eq("customer")))
.thenReturn(Mono.just("{ \"id\": 1000, \"name\": \"Roxane\"}"));
when(daprAsyncClient
.getState(any(), any(), eq("anotherCustomer")))
.thenReturn(Mono.just("{ \"id\": 2000, \"name\": \"Max\"}"));
when(daprAsyncClient
.getState(any(), any(), eq("nullCustomer")))
.thenReturn(Mono.just(""));
DaprStateAsyncProvider provider = new DaprStateAsyncProvider(daprAsyncClient, SERIALIZER);
Assert.assertEquals("Jon Doe",
provider.load("MyActor", "123", "name", String.class).block());
Assert.assertEquals("98021",
provider.load("MyActor", "123", "zipcode", String.class).block());
Assert.assertEquals(98,
(int) provider.load("MyActor", "123", "goals", int.class).block());
Assert.assertEquals(98,
(int) provider.load("MyActor", "123", "goals", int.class).block());
Assert.assertEquals(46.55,
(double) provider.load("MyActor", "123", "balance", double.class).block(),
EPSILON);
Assert.assertEquals(true,
(boolean) provider.load("MyActor", "123", "active", boolean.class).block());
Assert.assertEquals(new Customer().setId(1000).setName("Roxane"),
provider.load("MyActor", "123", "customer", Customer.class).block());
Assert.assertNotEquals(new Customer().setId(1000).setName("Roxane"),
provider.load("MyActor", "123", "anotherCustomer", Customer.class).block());
Assert.assertNull(provider.load("MyActor", "123", "nullCustomer", Customer.class).block());
public String getName() {
return name;
}
@Test
public void happyCaseContains() {
AppToDaprAsyncClient daprAsyncClient = mock(AppToDaprAsyncClient.class);
// Keys that exists.
when(daprAsyncClient
.getState(any(), any(), eq("name")))
.thenReturn(Mono.just("Jon Doe"));
when(daprAsyncClient
.getState(any(), any(), eq("zipcode")))
.thenReturn(Mono.just("98021"));
when(daprAsyncClient
.getState(any(), any(), eq("goals")))
.thenReturn(Mono.just("98"));
when(daprAsyncClient
.getState(any(), any(), eq("balance")))
.thenReturn(Mono.just("46.55"));
when(daprAsyncClient
.getState(any(), any(), eq("active")))
.thenReturn(Mono.just("true"));
when(daprAsyncClient
.getState(any(), any(), eq("customer")))
.thenReturn(Mono.just("{ \"id\": \"3000\", \"name\": \"Ely\" }"));
// Keys that do not exist.
when(daprAsyncClient
.getState(any(), any(), eq("Does not exist")))
.thenReturn(Mono.just(""));
when(daprAsyncClient
.getState(any(), any(), eq("NAME")))
.thenReturn(Mono.just(""));
when(daprAsyncClient
.getState(any(), any(), eq(null)))
.thenReturn(Mono.just(""));
DaprStateAsyncProvider provider = new DaprStateAsyncProvider(daprAsyncClient, SERIALIZER);
Assert.assertTrue(provider.contains("MyActor", "123", "name").block());
Assert.assertFalse(provider.contains("MyActor", "123", "NAME").block());
Assert.assertTrue(provider.contains("MyActor", "123", "zipcode").block());
Assert.assertTrue(provider.contains("MyActor", "123", "goals").block());
Assert.assertTrue(provider.contains("MyActor", "123", "balance").block());
Assert.assertTrue(provider.contains("MyActor", "123", "active").block());
Assert.assertTrue(provider.contains("MyActor", "123", "customer").block());
Assert.assertFalse(provider.contains("MyActor", "123", "Does not exist").block());
Assert.assertFalse(provider.contains("MyActor", "123", null).block());
public Customer setName(String name) {
this.name = name;
return this;
}
private final <T> ActorStateChange<T> createInsertChange(String name, T value) {
return new ActorStateChange(name, value, ActorStateChangeKind.ADD);
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Customer customer = (Customer) o;
return id == customer.id &&
Objects.equals(name, customer.name);
}
private final <T> ActorStateChange<T> createUpdateChange(String name, T value) {
return new ActorStateChange(name, value, ActorStateChangeKind.UPDATE);
@Override
public int hashCode() {
return Objects.hash(id, name);
}
private final <T> ActorStateChange<T> createDeleteChange(String name) {
return new ActorStateChange(name, null, ActorStateChangeKind.REMOVE);
}
}
@Test
public void happyCaseApply() {
AppToDaprAsyncClient daprAsyncClient = mock(AppToDaprAsyncClient.class);
when(daprAsyncClient
.saveStateTransactionally(
eq("MyActor"),
eq("123"),
argThat(s -> {
try {
JsonNode node = OBJECT_MAPPER.readTree(s);
if (node == null) {
return false;
}
if (node.size() != 3) {
return false;
}
boolean foundInsertName = false;
boolean foundUpdateZipcode = false;
boolean foundDeleteFlag = false;
for (JsonNode operation : node) {
if (operation.get("operation") == null) {
return false;
}
if (operation.get("request") == null) {
return false;
}
String opName = operation.get("operation").asText();
String key = operation.get("request").get("key").asText();
JsonNode valueNode = operation.get("request").get("value");
foundInsertName |= "upsert".equals(opName) &&
"name".equals(key) &&
"Jon Doe".equals(valueNode.asText());
foundUpdateZipcode |= "upsert".equals(opName) &&
"zipcode".equals(key) &&
"98011".equals(valueNode.asText());
foundDeleteFlag |= "delete".equals(opName) &&
"flag".equals(key) &&
(valueNode == null);
}
return foundInsertName && foundUpdateZipcode && foundDeleteFlag;
} catch (IOException e) {
e.printStackTrace();
return false;
}
})))
.thenReturn(Mono.empty());
DaprStateAsyncProvider provider = new DaprStateAsyncProvider(daprAsyncClient, SERIALIZER);
provider.apply("MyActor",
new ActorId("123"),
createInsertChange("name", "Jon Doe"),
createUpdateChange("zipcode", "98011"),
createDeleteChange("flag"))
.block();
verify(daprAsyncClient).saveStateTransactionally(eq("MyActor"), eq("123"), any());
}
@Test
public void happyCaseLoad() {
AppToDaprAsyncClient daprAsyncClient = mock(AppToDaprAsyncClient.class);
when(daprAsyncClient
.getState(any(), any(), eq("name")))
.thenReturn(Mono.just("Jon Doe"));
when(daprAsyncClient
.getState(any(), any(), eq("zipcode")))
.thenReturn(Mono.just("98021"));
when(daprAsyncClient
.getState(any(), any(), eq("goals")))
.thenReturn(Mono.just("98"));
when(daprAsyncClient
.getState(any(), any(), eq("balance")))
.thenReturn(Mono.just("46.55"));
when(daprAsyncClient
.getState(any(), any(), eq("active")))
.thenReturn(Mono.just("true"));
when(daprAsyncClient
.getState(any(), any(), eq("customer")))
.thenReturn(Mono.just("{ \"id\": 1000, \"name\": \"Roxane\"}"));
when(daprAsyncClient
.getState(any(), any(), eq("anotherCustomer")))
.thenReturn(Mono.just("{ \"id\": 2000, \"name\": \"Max\"}"));
when(daprAsyncClient
.getState(any(), any(), eq("nullCustomer")))
.thenReturn(Mono.just(""));
DaprStateAsyncProvider provider = new DaprStateAsyncProvider(daprAsyncClient, SERIALIZER);
Assert.assertEquals("Jon Doe",
provider.load("MyActor", new ActorId("123"), "name", String.class).block());
Assert.assertEquals("98021",
provider.load("MyActor", new ActorId("123"), "zipcode", String.class).block());
Assert.assertEquals(98,
(int) provider.load("MyActor", new ActorId("123"), "goals", int.class).block());
Assert.assertEquals(98,
(int) provider.load("MyActor", new ActorId("123"), "goals", int.class).block());
Assert.assertEquals(46.55,
(double) provider.load("MyActor", new ActorId("123"), "balance", double.class).block(),
EPSILON);
Assert.assertEquals(true,
(boolean) provider.load("MyActor", new ActorId("123"), "active", boolean.class).block());
Assert.assertEquals(new Customer().setId(1000).setName("Roxane"),
provider.load("MyActor", new ActorId("123"), "customer", Customer.class).block());
Assert.assertNotEquals(new Customer().setId(1000).setName("Roxane"),
provider.load("MyActor", new ActorId("123"), "anotherCustomer", Customer.class).block());
Assert.assertNull(provider.load("MyActor", new ActorId("123"), "nullCustomer", Customer.class).block());
}
@Test
public void happyCaseContains() {
AppToDaprAsyncClient daprAsyncClient = mock(AppToDaprAsyncClient.class);
// Keys that exists.
when(daprAsyncClient
.getState(any(), any(), eq("name")))
.thenReturn(Mono.just("Jon Doe"));
when(daprAsyncClient
.getState(any(), any(), eq("zipcode")))
.thenReturn(Mono.just("98021"));
when(daprAsyncClient
.getState(any(), any(), eq("goals")))
.thenReturn(Mono.just("98"));
when(daprAsyncClient
.getState(any(), any(), eq("balance")))
.thenReturn(Mono.just("46.55"));
when(daprAsyncClient
.getState(any(), any(), eq("active")))
.thenReturn(Mono.just("true"));
when(daprAsyncClient
.getState(any(), any(), eq("customer")))
.thenReturn(Mono.just("{ \"id\": \"3000\", \"name\": \"Ely\" }"));
// Keys that do not exist.
when(daprAsyncClient
.getState(any(), any(), eq("Does not exist")))
.thenReturn(Mono.just(""));
when(daprAsyncClient
.getState(any(), any(), eq("NAME")))
.thenReturn(Mono.just(""));
when(daprAsyncClient
.getState(any(), any(), eq(null)))
.thenReturn(Mono.just(""));
DaprStateAsyncProvider provider = new DaprStateAsyncProvider(daprAsyncClient, SERIALIZER);
Assert.assertTrue(provider.contains("MyActor", new ActorId("123"), "name").block());
Assert.assertFalse(provider.contains("MyActor", new ActorId("123"), "NAME").block());
Assert.assertTrue(provider.contains("MyActor", new ActorId("123"), "zipcode").block());
Assert.assertTrue(provider.contains("MyActor", new ActorId("123"), "goals").block());
Assert.assertTrue(provider.contains("MyActor", new ActorId("123"), "balance").block());
Assert.assertTrue(provider.contains("MyActor", new ActorId("123"), "active").block());
Assert.assertTrue(provider.contains("MyActor", new ActorId("123"), "customer").block());
Assert.assertFalse(provider.contains("MyActor", new ActorId("123"), "Does not exist").block());
Assert.assertFalse(provider.contains("MyActor", new ActorId("123"), null).block());
}
private final <T> ActorStateChange createInsertChange(String name, T value) {
return new ActorStateChange(name, value, ActorStateChangeKind.ADD);
}
private final <T> ActorStateChange createUpdateChange(String name, T value) {
return new ActorStateChange(name, value, ActorStateChangeKind.UPDATE);
}
private final ActorStateChange createDeleteChange(String name) {
return new ActorStateChange(name, null, ActorStateChangeKind.REMOVE);
}
}

View File

@ -21,12 +21,13 @@ public class DefaultActorFactoryTest {
*/
static class MyActor extends AbstractActor implements Actor {
ActorService actorService;
ActorRuntimeContext<MyActor> context;
ActorId actorId;
public MyActor(ActorService actorService, ActorId actorId) {
this.actorService = actorService;
public MyActor(ActorRuntimeContext<MyActor> context, ActorId actorId) {
super(context, actorId);
this.context = context;
this.actorId = actorId;
}
}
@ -34,7 +35,10 @@ public class DefaultActorFactoryTest {
/**
* A non-compliant implementation of Actor to be used in the tests below.
*/
static class InvalidActor extends AbstractActor {
static class InvalidActor extends AbstractActor implements Actor {
InvalidActor() {
super(null, null);
}
}
/**
@ -42,13 +46,13 @@ public class DefaultActorFactoryTest {
*/
@Test
public void happyActor() {
DefaultActorFactory<MyActor> factory = new DefaultActorFactory(ActorTypeInformation.tryCreate(MyActor.class));
DefaultActorFactory<MyActor> factory = new DefaultActorFactory<>();
ActorId actorId = ActorId.createRandom();
MyActor actor = factory.createActor(mock(ActorService.class), actorId);
MyActor actor = factory.createActor(createActorRuntimeContext(MyActor.class), actorId);
Assert.assertEquals(actorId, actor.actorId);
Assert.assertNotNull(actor.actorService);
Assert.assertNotNull(actor.context);
}
/**
@ -56,12 +60,22 @@ public class DefaultActorFactoryTest {
*/
@Test
public void noValidConstructor() {
DefaultActorFactory<InvalidActor> factory = new DefaultActorFactory(ActorTypeInformation.tryCreate(InvalidActor.class));
DefaultActorFactory<InvalidActor> factory = new DefaultActorFactory<>();
ActorId actorId = ActorId.createRandom();
InvalidActor actor = factory.createActor(mock(ActorService.class), actorId);
InvalidActor actor = factory.createActor(createActorRuntimeContext(InvalidActor.class), actorId);
Assert.assertNull(actor);
}
private static <T extends AbstractActor> ActorRuntimeContext<T> createActorRuntimeContext(Class<T> clazz) {
return new ActorRuntimeContext(
mock(ActorRuntime.class),
mock(ActorStateSerializer.class),
mock(ActorFactory.class),
ActorTypeInformation.create(clazz),
mock(AppToDaprAsyncClient.class),
mock(DaprStateAsyncProvider.class));
}
}

View File

@ -5,14 +5,14 @@ import org.junit.Test;
import java.time.Duration;
public class ConverterUtilsTest {
public class DurationUtilsTest {
@Test
public void convertTimeBothWays() {
String s = "4h15m50s60ms";
Duration d1 = ConverterUtils.ConvertDurationFromDaprFormat(s);
Duration d1 = DurationUtils.ConvertDurationFromDaprFormat(s);
String t = ConverterUtils.ConvertDurationToDaprFormat(d1);
String t = DurationUtils.ConvertDurationToDaprFormat(d1);
Assert.assertEquals(s, t);
}
@ -20,82 +20,82 @@ public class ConverterUtilsTest {
public void largeHours() {
// hours part is larger than 24
String s = "31h15m50s60ms";
Duration d1 = ConverterUtils.ConvertDurationFromDaprFormat(s);
Duration d1 = DurationUtils.ConvertDurationFromDaprFormat(s);
String t = ConverterUtils.ConvertDurationToDaprFormat(d1);
String t = DurationUtils.ConvertDurationToDaprFormat(d1);
Assert.assertEquals(s, t);
}
@Test
public void negativeDuration() {
Duration d = Duration.ofSeconds(-99);
String t = ConverterUtils.ConvertDurationToDaprFormat(d);
String t = DurationUtils.ConvertDurationToDaprFormat(d);
Assert.assertEquals("", t);
}
@Test
public void testGetHoursPart() {
Duration d1 = Duration.ZERO.plusHours(26);
Assert.assertEquals(2, ConverterUtils.getHoursPart(d1));
Assert.assertEquals(2, DurationUtils.getHoursPart(d1));
Duration d2 = Duration.ZERO.plusHours(23);
Assert.assertEquals(23, ConverterUtils.getHoursPart(d2));
Assert.assertEquals(23, DurationUtils.getHoursPart(d2));
Duration d3 = Duration.ZERO.plusHours(24);
Assert.assertEquals(0, ConverterUtils.getHoursPart(d3));
Assert.assertEquals(0, DurationUtils.getHoursPart(d3));
}
@Test
public void testGetMinutesPart() {
Duration d1 = Duration.ZERO.plusMinutes(61);
Assert.assertEquals(1, ConverterUtils.getMinutesPart(d1));
Assert.assertEquals(1, DurationUtils.getMinutesPart(d1));
Duration d2 = Duration.ZERO.plusMinutes(60);
Assert.assertEquals(0, ConverterUtils.getMinutesPart(d2));
Assert.assertEquals(0, DurationUtils.getMinutesPart(d2));
Duration d3 = Duration.ZERO.plusMinutes(59);
Assert.assertEquals(59, ConverterUtils.getMinutesPart(d3));
Assert.assertEquals(59, DurationUtils.getMinutesPart(d3));
Duration d4 = Duration.ZERO.plusMinutes(3600);
Assert.assertEquals(0, ConverterUtils.getMinutesPart(d4));
Assert.assertEquals(0, DurationUtils.getMinutesPart(d4));
}
@Test
public void testGetSecondsPart() {
Duration d1 = Duration.ZERO.plusSeconds(61);
Assert.assertEquals(1, ConverterUtils.getSecondsPart(d1));
Assert.assertEquals(1, DurationUtils.getSecondsPart(d1));
Duration d2 = Duration.ZERO.plusSeconds(60);
Assert.assertEquals(0, ConverterUtils.getSecondsPart(d2));
Assert.assertEquals(0, DurationUtils.getSecondsPart(d2));
Duration d3 = Duration.ZERO.plusSeconds(59);
Assert.assertEquals(59, ConverterUtils.getSecondsPart(d3));
Assert.assertEquals(59, DurationUtils.getSecondsPart(d3));
Duration d4 = Duration.ZERO.plusSeconds(3600);
Assert.assertEquals(0, ConverterUtils.getSecondsPart(d4));
Assert.assertEquals(0, DurationUtils.getSecondsPart(d4));
}
@Test
public void testGetMillisecondsPart() {
Duration d1 = Duration.ZERO.plusMillis(61);
Assert.assertEquals(61, ConverterUtils.getMilliSecondsPart(d1));
Assert.assertEquals(61, DurationUtils.getMilliSecondsPart(d1));
Duration d2 = Duration.ZERO.plusMillis(60);
Assert.assertEquals(60, ConverterUtils.getMilliSecondsPart(d2));
Assert.assertEquals(60, DurationUtils.getMilliSecondsPart(d2));
Duration d3 = Duration.ZERO.plusMillis(59);
Assert.assertEquals(59, ConverterUtils.getMilliSecondsPart(d3));
Assert.assertEquals(59, DurationUtils.getMilliSecondsPart(d3));
Duration d4 = Duration.ZERO.plusMillis(999);
Assert.assertEquals(999, ConverterUtils.getMilliSecondsPart(d4));
Assert.assertEquals(999, DurationUtils.getMilliSecondsPart(d4));
Duration d5 = Duration.ZERO.plusMillis(1001);
Assert.assertEquals(1, ConverterUtils.getMilliSecondsPart(d5));
Assert.assertEquals(1, DurationUtils.getMilliSecondsPart(d5));
Duration d6 = Duration.ZERO.plusMillis(1000);
Assert.assertEquals(0, ConverterUtils.getMilliSecondsPart(d6));
Assert.assertEquals(0, DurationUtils.getMilliSecondsPart(d6));
Duration d7 = Duration.ZERO.plusMillis(10000);
Assert.assertEquals(0, ConverterUtils.getMilliSecondsPart(d7));
Assert.assertEquals(0, DurationUtils.getMilliSecondsPart(d7));
}
}

View File

@ -1,60 +0,0 @@
package io.dapr.actors.runtime;
import org.junit.Assert;
import org.junit.Test;
import java.time.Duration;
import java.util.Arrays;
public class ReminderInfoTest {
@Test(expected = IllegalArgumentException.class)
public void outOfRangeDueTime() {
ReminderInfo info = new ReminderInfo(null, Duration.ZERO.plusSeconds(-10), Duration.ZERO.plusMinutes(1));
}
@Test
public void negativePeriod() {
// this is ok
ReminderInfo info = new ReminderInfo(null, Duration.ZERO.plusMinutes(1), Duration.ZERO.plusMillis(-1));
}
@Test(expected = IllegalArgumentException.class)
public void outOfRangePeriod() {
ReminderInfo info = new ReminderInfo(null, Duration.ZERO.plusMinutes(1), Duration.ZERO.plusMinutes(-10));
}
@Test
public void noState() {
ReminderInfo original = new ReminderInfo(null, Duration.ZERO.plusMinutes(2), Duration.ZERO.plusMinutes((5)));
ReminderInfo recreated = null;
try {
String serialized = original.serialize();
recreated = ReminderInfo.deserialize(serialized.getBytes());
}
catch(Exception e) {
System.out.println("The error is: " + e);
Assert.fail();
}
Assert.assertEquals(original.data, recreated.data);
Assert.assertEquals(original.dueTime, recreated.dueTime);
Assert.assertEquals(original.period, recreated.period);
}
@Test
public void withState() {
ReminderInfo original = new ReminderInfo("maru".getBytes(), Duration.ZERO.plusMinutes(2), Duration.ZERO.plusMinutes((5)));
ReminderInfo recreated = null;
try {
String serialized = original.serialize();
recreated = ReminderInfo.deserialize(serialized.getBytes());
}
catch(Exception e) {
System.out.println("The error is: " + e);
Assert.fail();
}
Assert.assertTrue(Arrays.equals(original.data, recreated.data));
Assert.assertEquals(original.dueTime, recreated.dueTime);
Assert.assertEquals(original.period, recreated.period);
}
}