Refactoring actor example. (#182)

Co-authored-by: Young Bu Park <youngp@microsoft.com>
This commit is contained in:
Artur Souza 2020-01-30 16:32:09 -08:00 committed by GitHub
parent 838224f18a
commit 082c4a46fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 173 additions and 101 deletions

View File

@ -11,70 +11,72 @@ import io.dapr.actors.client.ActorProxyBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* Client for Actor runtime.
* Client for Actor runtime to invoke actor methods.
* 1. Build and install jars:
* mvn clean install
* 2. Run the client:
* dapr run --app-id demoactorclient --port 3006 -- mvn exec:java \
* -pl=examples -Dexec.mainClass=io.dapr.examples.actors.http.DemoActorClient
* -pl=examples -Dexec.mainClass=io.dapr.examples.actors.http.DemoActorClient
*/
public class DemoActorClient {
private static final int NUM_ACTORS = 3;
private static final int NUM_MESSAGES_PER_ACTOR = 10;
private static final String METHOD_NAME = "say";
private static final ExecutorService POOL = Executors.newFixedThreadPool(NUM_ACTORS);
/**
* The main method.
* @param args Unused.
* @throws Exception An Exception.
* @param args Input arguments (unused).
* @throws InterruptedException If program has been interrupted.
*/
public static void main(String[] args) throws Exception {
public static void main(String[] args) throws InterruptedException {
ActorProxyBuilder builder = new ActorProxyBuilder("DemoActor");
List<CompletableFuture<Void>> futures = new ArrayList<>(NUM_ACTORS);
List<Thread> threads = new ArrayList<>(NUM_ACTORS);
// Creates multiple actors.
for (int i = 0; i < NUM_ACTORS; i++) {
ActorProxy actor = builder.build(ActorId.createRandom());
futures.add(callActorNTimes(actor));
// Start a thread per actor.
Thread thread = new Thread(() -> callActorForever(actor));
thread.start();
threads.add(thread);
}
futures.forEach(CompletableFuture::join);
POOL.shutdown();
POOL.awaitTermination(1, TimeUnit.MINUTES);
// Waits for threads to finish.
for (Thread thread : threads) {
thread.join();
}
System.out.println("Done.");
}
private static final CompletableFuture<Void> callActorNTimes(ActorProxy actor) {
return CompletableFuture.runAsync(() -> {
actor.invokeActorMethod("registerReminder").block();
for (int i = 0; i < NUM_MESSAGES_PER_ACTOR; i++) {
actor.invokeActorMethod("incrementAndGet", 1).block();
String result = actor.invokeActorMethod(METHOD_NAME,
String.format("Actor %s said message #%d", actor.getActorId().toString(), i), String.class).block();
System.out.println(String.format("Actor %s got a reply: %s", actor.getActorId().toString(), result));
try {
Thread.sleep((long) (1000 * Math.random()));
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
return;
}
}
/**
* Makes multiple method calls into actor until interrupted.
* @param actor Actor to be invoked.
*/
private static final void callActorForever(ActorProxy actor) {
// First, register reminder.
actor.invokeActorMethod("registerReminder").block();
System.out.println(
"Messages sent: " + actor.invokeActorMethod("incrementAndGet", 0, int.class).block());
}, POOL);
// Now, we run until thread is interrupted.
while (!Thread.currentThread().isInterrupted()) {
// Invoke actor method to increment counter by 1, then build message.
int messageNumber = actor.invokeActorMethod("incrementAndGet", 1, int.class).block();
String message = String.format("Actor %s said message #%d", actor.getActorId().toString(), messageNumber);
// Invoke the 'say' method in actor.
String result = actor.invokeActorMethod("say", message, String.class).block();
System.out.println(String.format("Actor %s got a reply: %s", actor.getActorId().toString(), result));
try {
// Waits for up to 1 second.
Thread.sleep((long) Math.rint(1000));
} catch (InterruptedException e) {
// We have been interrupted, so we set the interrupted flag to exit gracefully.
Thread.currentThread().interrupt();
}
}
}
}

View File

@ -31,9 +31,9 @@ public class DemoActorImpl extends AbstractActor implements DemoActor, Remindabl
private static final DateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
/**
* This is the constructor of an actor implementation.
* This is the constructor of an actor implementation, while also registering a timer.
* @param runtimeContext The runtime context object which contains objects such as the state provider.
* @param id The id of this actor.
* @param id The id of this actor.
*/
public DemoActorImpl(ActorRuntimeContext runtimeContext, ActorId id) {
super(runtimeContext, id);
@ -46,6 +46,9 @@ public class DemoActorImpl extends AbstractActor implements DemoActor, Remindabl
Duration.ofSeconds(1)).block();
}
/**
* Registers a reminder.
*/
@Override
public void registerReminder() {
super.registerReminder(
@ -55,6 +58,11 @@ public class DemoActorImpl extends AbstractActor implements DemoActor, Remindabl
Duration.ofSeconds(2)).block();
}
/**
* Prints a message and appends the timestamp.
* @param something Something to be said.
* @return What was said appended with timestamp.
*/
@Override
public String say(String something) {
Calendar utcNow = Calendar.getInstance(TimeZone.getTimeZone("GMT"));
@ -71,6 +79,23 @@ public class DemoActorImpl extends AbstractActor implements DemoActor, Remindabl
return utcNowAsString;
}
/**
* Increments a persistent counter, saves and returns its updated value.
* Example of method implemented with Reactor's Mono class.
* This method could be rewritten with blocking calls in Mono, using block() method:
*
* <p>public int incrementAndGet(int delta) {
* int counter = 0;
* if (super.getActorStateManager().contains("counter").block()) {
* counter = super.getActorStateManager().get("counter", int.class).block();
* }
* counter = counter + 1;
* super.getActorStateManager().set("counter", counter).block();
* return counter;
* }</p>
* @param delta Amount to be added to counter.
* @return Mono response for the incremented value.
*/
@Override
public Mono<Integer> incrementAndGet(int delta) {
return super.getActorStateManager().contains("counter")
@ -79,6 +104,10 @@ public class DemoActorImpl extends AbstractActor implements DemoActor, Remindabl
.flatMap(c -> super.getActorStateManager().set("counter", c).thenReturn(c));
}
/**
* Method invoked by timer.
* @param message Message to be printed.
*/
@Override
public void clock(String message) {
Calendar utcNow = Calendar.getInstance(TimeZone.getTimeZone("GMT"));
@ -90,20 +119,34 @@ public class DemoActorImpl extends AbstractActor implements DemoActor, Remindabl
+ (message == null ? "" : message + " @ " + utcNowAsString));
}
/**
* Method used to determine reminder's state type.
* @return Class for reminder's state.
*/
@Override
public Class<Integer> getStateType() {
return Integer.class;
}
/**
* Method used be invoked for a reminder.
* @param reminderName The name of reminder provided during registration.
* @param state The user state provided during registration.
* @param dueTime The invocation due time provided during registration.
* @param period The invocation period provided during registration.
* @return Mono result.
*/
@Override
public Mono<Void> receiveReminder(String reminderName, Integer state, Duration dueTime, Duration period) {
Calendar utcNow = Calendar.getInstance(TimeZone.getTimeZone("GMT"));
String utcNowAsString = DATE_FORMAT.format(utcNow.getTime());
return Mono.fromRunnable(() -> {
Calendar utcNow = Calendar.getInstance(TimeZone.getTimeZone("GMT"));
String utcNowAsString = DATE_FORMAT.format(utcNow.getTime());
// Handles the request by printing message.
System.out.println(String.format(
"Server reminded actor %s of: %s for %d @ %s",
this.getId(), reminderName, state, utcNowAsString));
return Mono.empty();
String message = String.format("Server reminded actor %s of: %s for %d @ %s",
this.getId(), reminderName, state, utcNowAsString);
// Handles the request by printing message.
System.out.println(message);
});
}
}

View File

@ -23,7 +23,6 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
* dapr run --app-id demoactorservice --app-port 3000 --port 3005 \
* -- mvn exec:java -pl=examples -Dexec.mainClass=io.dapr.examples.actors.http.DemoActorService -Dexec.args="-p 3000"
*/
@SpringBootApplication
public class DemoActorService {
/**
@ -42,13 +41,9 @@ public class DemoActorService {
int port = Integer.parseInt(cmd.getOptionValue("port"));
// Register the Actor class.
ActorRuntime.getInstance().registerActor(
DemoActorImpl.class, new DefaultObjectSerializer(), new DefaultObjectSerializer());
ActorRuntime.getInstance().registerActor(DemoActorImpl.class);
// Start Dapr's callback endpoint.
DaprApplication.start(port);
// Start application's endpoint.
SpringApplication.run(DemoActorService.class);
}
}

View File

@ -1,17 +0,0 @@
/*
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
*/
package io.dapr.examples.actors.http;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class HelloController {
@RequestMapping("/")
public String index() {
return "Greetings from your Spring Boot Application!";
}
}

View File

@ -44,24 +44,65 @@ public class DemoActorService {
public static void main(String[] args) throws Exception {
///...
// Register the Actor class.
ActorRuntime.getInstance().registerActor(
DemoActorImpl.class, new DefaultObjectSerializer(), new DefaultObjectSerializer());
ActorRuntime.getInstance().registerActor(DemoActorImpl.class);
// Start Dapr's callback endpoint.
DaprApplication.start(port);
// Start application's endpoint.
SpringApplication.run(DemoActorService.class);
}
}
```
This application uses `ActorRuntime.getInstance().registerActor()` in order to register `DemoActorImpl` as an actor in the Dapr Actor runtime. Notice that this call passes in two serializer implementations: one is for Dapr's sent and received object and the other is for objects to be persisted.
This application uses `ActorRuntime.getInstance().registerActor()` in order to register `DemoActorImpl` as an actor in the Dapr Actor runtime. Internally, it is using `DefaultObjectSerializer` for two properties: `objectSerializer` is for Dapr's sent and received objects, and `stateSerializer` is for objects to be persisted.
`DaprApplication.start()` method will run the Spring Boot [DaprApplication](../../../springboot/DaprApplication.java), which registers the Dapr Spring Boot controller [DaprController](../../springboot/DaprController.java). This controller contains all Actor methods implemented as endpoints. The Dapr's sidecar will call into the controller. At the end of the main method, this class uses `SpringApplication.run()` to boostrap itself a an Spring application.
`DaprApplication.start()` method will run the Spring Boot [DaprApplication](../../../springboot/DaprApplication.java), which registers the Dapr Spring Boot controller [DaprController](../../springboot/DaprController.java). This controller contains all Actor methods implemented as endpoints. The Dapr's sidecar will call into the controller.
Execute the follow script in order to run the DemoActorService:
See [DemoActorImpl](DemoActorImpl.java) for details on the implementation of an actor:
```java
@ActorType(name = "DemoActor")
public class DemoActorImpl extends AbstractActor implements DemoActor, Remindable<Integer> {
//...
public DemoActorImpl(ActorRuntimeContext runtimeContext, ActorId id) {
super(runtimeContext, id);
//...
}
@Override
public void registerReminder() {
//...
}
@Override
public String say(String something) {
//...
}
@Override
public Mono<Integer> incrementAndGet(int delta) {
//...
}
@Override
public void clock(String message) {
//...
}
@Override
public Class<Integer> getStateType() {
return Integer.class;
}
@Override
public Mono<Void> receiveReminder(String reminderName, Integer state, Duration dueTime, Duration period) {
//...
}
}
```
An actor inherits from `AbstractActor` and implements the constructor to pass through `ActorRuntimeContext` and `ActorId`. By default, the actor's name will be the same as the class' name. Optionally, it can be annotated with `ActorType` and override the actor's name. The actor's methods can be synchronously or use [Project Reactor's Mono](https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html) return type. Finally, state management is done via methods in `super.getActorStateManager()`.
Now, execute the following script in order to run DemoActorService:
```sh
cd to [repo-root]
dapr run --app-id demoactorservice --app-port 3000 --port 3005 -- mvn exec:java -pl=examples -D exec.mainClass=io.dapr.examples.actors.http.DemoActorService -D exec.args="-p 3000"
@ -77,39 +118,47 @@ The `DemoActorClient.java` file contains the `DemoActorClient` class. See the co
public class DemoActorClient {
private static final int NUM_ACTORS = 3;
private static final int NUM_MESSAGES_PER_ACTOR = 10;
private static final ExecutorService POOL = Executors.newFixedThreadPool(NUM_ACTORS);
public static void main(String[] args) throws Exception {
public static void main(String[] args) throws InterruptedException {
///...
for (int i = 0; i < NUM_ACTORS; i++) {
ActorProxy actor = builder.build(ActorId.createRandom());
futures.add(callActorNTimes(actor));
// Start a thread per actor.
Thread thread = new Thread(() -> callActorForever(actor));
thread.start();
threads.add(thread);
}
///...
private static final CompletableFuture<Void> callActorNTimes(ActorProxy actor) {
return CompletableFuture.runAsync(() -> {
actor.invokeActorMethod("registerReminder").block();
for (int i = 0; i < NUM_MESSAGES_PER_ACTOR; i++) {
//Invoking the "incrementAndGet" method:
actor.invokeActorMethod("incrementAndGet", 1).block();
//Invoking "say" method
String result = actor.invokeActorMethod("say",
String.format("Actor %s said message #%d", actor.getActorId().toString(), i), String.class).block();
System.out.println(String.format("Actor %s got a reply: %s", actor.getActorId().toString(), result));
///...
}
System.out.println(
"Messages sent: " + actor.invokeActorMethod("incrementAndGet", 0, int.class).block());
}, POOL);
}
private static final void callActorForever(ActorProxy actor) {
// First, register reminder.
actor.invokeActorMethod("registerReminder").block();
// Now, we run until thread is interrupted.
while (!Thread.currentThread().isInterrupted()) {
// Invoke actor method to increment counter by 1, then build message.
int messageNumber = actor.invokeActorMethod("incrementAndGet", 1, int.class).block();
String message = String.format("Actor %s said message #%d", actor.getActorId().toString(), messageNumber);
// Invoke the 'say' method in actor.
String result = actor.invokeActorMethod("say", message, String.class).block();
System.out.println(String.format("Actor %s got a reply: %s", actor.getActorId().toString(), result));
try {
// Waits for up to 1 second.
Thread.sleep((long) Math.rint(1000));
} catch (InterruptedException e) {
// We have been interrupted, so we set the interrupted flag to exit gracefully.
Thread.currentThread().interrupt();
}
}
}
}
```
First, The client defines how many actors it is going to create, as well as how many invocation calls it will perform per actor. Then the main method declares a `ActorProxyBuilder` for the `DemoActor` class for creating `ActorProxy` instances, which are the actor representation provided by the SDK. The code executes the `callActorNTimes` private method once per actor. This method executes functionality for the DemoActor implementation using `actor.invokeActorMethod()` in the follow order: `registerReminder()` which sets the due time and period for the reminder, `incrementAndGet()` which increments a counter, persists it and sends it back as response, and finally `say` method wich will print a message containing the received string along with the formatted server time. See [DemoActorImpl](DemoActorImpl.java) for details on the implementation of these methods.
First, the client defines how many actors it is going to create. Then the main method declares a `ActorProxyBuilder` for the `DemoActor` class to create `ActorProxy` instances, which are the actor representation provided by the SDK. The code executes the `callActorForever` private method once per actor. This method triggers the DemoActor's implementation by using `actor.invokeActorMethod()`. Initially, it will invoke `registerReminder()`, which sets the due time and period for the reminder. Then, `incrementAndGet()` increments a counter, persists it and sends it back as response. Finally `say` method which will print a message containing the received string along with the formatted server time.
Use the follow command to execute the DemoActorClient: