diff --git a/examples/src/main/java/io/dapr/examples/actors/http/DemoActorClient.java b/examples/src/main/java/io/dapr/examples/actors/http/DemoActorClient.java index 067162ef2..7b4523b30 100644 --- a/examples/src/main/java/io/dapr/examples/actors/http/DemoActorClient.java +++ b/examples/src/main/java/io/dapr/examples/actors/http/DemoActorClient.java @@ -11,70 +11,72 @@ import io.dapr.actors.client.ActorProxyBuilder; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; /** - * Client for Actor runtime. + * Client for Actor runtime to invoke actor methods. * 1. Build and install jars: * mvn clean install * 2. Run the client: * dapr run --app-id demoactorclient --port 3006 -- mvn exec:java \ - * -pl=examples -Dexec.mainClass=io.dapr.examples.actors.http.DemoActorClient + * -pl=examples -Dexec.mainClass=io.dapr.examples.actors.http.DemoActorClient */ public class DemoActorClient { private static final int NUM_ACTORS = 3; - private static final int NUM_MESSAGES_PER_ACTOR = 10; - - private static final String METHOD_NAME = "say"; - - private static final ExecutorService POOL = Executors.newFixedThreadPool(NUM_ACTORS); - /** * The main method. - * @param args Unused. - * @throws Exception An Exception. + * @param args Input arguments (unused). + * @throws InterruptedException If program has been interrupted. */ - public static void main(String[] args) throws Exception { + public static void main(String[] args) throws InterruptedException { ActorProxyBuilder builder = new ActorProxyBuilder("DemoActor"); - List> futures = new ArrayList<>(NUM_ACTORS); + List threads = new ArrayList<>(NUM_ACTORS); + // Creates multiple actors. for (int i = 0; i < NUM_ACTORS; i++) { ActorProxy actor = builder.build(ActorId.createRandom()); - futures.add(callActorNTimes(actor)); + + // Start a thread per actor. + Thread thread = new Thread(() -> callActorForever(actor)); + thread.start(); + threads.add(thread); } - futures.forEach(CompletableFuture::join); - POOL.shutdown(); - POOL.awaitTermination(1, TimeUnit.MINUTES); + // Waits for threads to finish. + for (Thread thread : threads) { + thread.join(); + } System.out.println("Done."); } - private static final CompletableFuture callActorNTimes(ActorProxy actor) { - return CompletableFuture.runAsync(() -> { - actor.invokeActorMethod("registerReminder").block(); - for (int i = 0; i < NUM_MESSAGES_PER_ACTOR; i++) { - actor.invokeActorMethod("incrementAndGet", 1).block(); - String result = actor.invokeActorMethod(METHOD_NAME, - String.format("Actor %s said message #%d", actor.getActorId().toString(), i), String.class).block(); - System.out.println(String.format("Actor %s got a reply: %s", actor.getActorId().toString(), result)); - try { - Thread.sleep((long) (1000 * Math.random())); - } catch (InterruptedException e) { - e.printStackTrace(); - Thread.currentThread().interrupt(); - return; - } - } + /** + * Makes multiple method calls into actor until interrupted. + * @param actor Actor to be invoked. + */ + private static final void callActorForever(ActorProxy actor) { + // First, register reminder. + actor.invokeActorMethod("registerReminder").block(); - System.out.println( - "Messages sent: " + actor.invokeActorMethod("incrementAndGet", 0, int.class).block()); - }, POOL); + // Now, we run until thread is interrupted. + while (!Thread.currentThread().isInterrupted()) { + // Invoke actor method to increment counter by 1, then build message. + int messageNumber = actor.invokeActorMethod("incrementAndGet", 1, int.class).block(); + String message = String.format("Actor %s said message #%d", actor.getActorId().toString(), messageNumber); + + // Invoke the 'say' method in actor. + String result = actor.invokeActorMethod("say", message, String.class).block(); + System.out.println(String.format("Actor %s got a reply: %s", actor.getActorId().toString(), result)); + + try { + // Waits for up to 1 second. + Thread.sleep((long) Math.rint(1000)); + } catch (InterruptedException e) { + // We have been interrupted, so we set the interrupted flag to exit gracefully. + Thread.currentThread().interrupt(); + } + } } } diff --git a/examples/src/main/java/io/dapr/examples/actors/http/DemoActorImpl.java b/examples/src/main/java/io/dapr/examples/actors/http/DemoActorImpl.java index 723247fee..e367c1ef5 100644 --- a/examples/src/main/java/io/dapr/examples/actors/http/DemoActorImpl.java +++ b/examples/src/main/java/io/dapr/examples/actors/http/DemoActorImpl.java @@ -31,9 +31,9 @@ public class DemoActorImpl extends AbstractActor implements DemoActor, Remindabl private static final DateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); /** - * This is the constructor of an actor implementation. + * This is the constructor of an actor implementation, while also registering a timer. * @param runtimeContext The runtime context object which contains objects such as the state provider. - * @param id The id of this actor. + * @param id The id of this actor. */ public DemoActorImpl(ActorRuntimeContext runtimeContext, ActorId id) { super(runtimeContext, id); @@ -46,6 +46,9 @@ public class DemoActorImpl extends AbstractActor implements DemoActor, Remindabl Duration.ofSeconds(1)).block(); } + /** + * Registers a reminder. + */ @Override public void registerReminder() { super.registerReminder( @@ -55,6 +58,11 @@ public class DemoActorImpl extends AbstractActor implements DemoActor, Remindabl Duration.ofSeconds(2)).block(); } + /** + * Prints a message and appends the timestamp. + * @param something Something to be said. + * @return What was said appended with timestamp. + */ @Override public String say(String something) { Calendar utcNow = Calendar.getInstance(TimeZone.getTimeZone("GMT")); @@ -71,6 +79,23 @@ public class DemoActorImpl extends AbstractActor implements DemoActor, Remindabl return utcNowAsString; } + /** + * Increments a persistent counter, saves and returns its updated value. + * Example of method implemented with Reactor's Mono class. + * This method could be rewritten with blocking calls in Mono, using block() method: + * + *

public int incrementAndGet(int delta) { + * int counter = 0; + * if (super.getActorStateManager().contains("counter").block()) { + * counter = super.getActorStateManager().get("counter", int.class).block(); + * } + * counter = counter + 1; + * super.getActorStateManager().set("counter", counter).block(); + * return counter; + * }

+ * @param delta Amount to be added to counter. + * @return Mono response for the incremented value. + */ @Override public Mono incrementAndGet(int delta) { return super.getActorStateManager().contains("counter") @@ -79,6 +104,10 @@ public class DemoActorImpl extends AbstractActor implements DemoActor, Remindabl .flatMap(c -> super.getActorStateManager().set("counter", c).thenReturn(c)); } + /** + * Method invoked by timer. + * @param message Message to be printed. + */ @Override public void clock(String message) { Calendar utcNow = Calendar.getInstance(TimeZone.getTimeZone("GMT")); @@ -90,20 +119,34 @@ public class DemoActorImpl extends AbstractActor implements DemoActor, Remindabl + (message == null ? "" : message + " @ " + utcNowAsString)); } + /** + * Method used to determine reminder's state type. + * @return Class for reminder's state. + */ @Override public Class getStateType() { return Integer.class; } + /** + * Method used be invoked for a reminder. + * @param reminderName The name of reminder provided during registration. + * @param state The user state provided during registration. + * @param dueTime The invocation due time provided during registration. + * @param period The invocation period provided during registration. + * @return Mono result. + */ @Override public Mono receiveReminder(String reminderName, Integer state, Duration dueTime, Duration period) { - Calendar utcNow = Calendar.getInstance(TimeZone.getTimeZone("GMT")); - String utcNowAsString = DATE_FORMAT.format(utcNow.getTime()); + return Mono.fromRunnable(() -> { + Calendar utcNow = Calendar.getInstance(TimeZone.getTimeZone("GMT")); + String utcNowAsString = DATE_FORMAT.format(utcNow.getTime()); - // Handles the request by printing message. - System.out.println(String.format( - "Server reminded actor %s of: %s for %d @ %s", - this.getId(), reminderName, state, utcNowAsString)); - return Mono.empty(); + String message = String.format("Server reminded actor %s of: %s for %d @ %s", + this.getId(), reminderName, state, utcNowAsString); + + // Handles the request by printing message. + System.out.println(message); + }); } } diff --git a/examples/src/main/java/io/dapr/examples/actors/http/DemoActorService.java b/examples/src/main/java/io/dapr/examples/actors/http/DemoActorService.java index a90c14a25..7b4f89ffb 100644 --- a/examples/src/main/java/io/dapr/examples/actors/http/DemoActorService.java +++ b/examples/src/main/java/io/dapr/examples/actors/http/DemoActorService.java @@ -23,7 +23,6 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; * dapr run --app-id demoactorservice --app-port 3000 --port 3005 \ * -- mvn exec:java -pl=examples -Dexec.mainClass=io.dapr.examples.actors.http.DemoActorService -Dexec.args="-p 3000" */ -@SpringBootApplication public class DemoActorService { /** @@ -42,13 +41,9 @@ public class DemoActorService { int port = Integer.parseInt(cmd.getOptionValue("port")); // Register the Actor class. - ActorRuntime.getInstance().registerActor( - DemoActorImpl.class, new DefaultObjectSerializer(), new DefaultObjectSerializer()); + ActorRuntime.getInstance().registerActor(DemoActorImpl.class); // Start Dapr's callback endpoint. DaprApplication.start(port); - - // Start application's endpoint. - SpringApplication.run(DemoActorService.class); } } diff --git a/examples/src/main/java/io/dapr/examples/actors/http/HelloController.java b/examples/src/main/java/io/dapr/examples/actors/http/HelloController.java deleted file mode 100644 index 95a32f0e2..000000000 --- a/examples/src/main/java/io/dapr/examples/actors/http/HelloController.java +++ /dev/null @@ -1,17 +0,0 @@ -/* - * Copyright (c) Microsoft Corporation. - * Licensed under the MIT License. - */ - -package io.dapr.examples.actors.http; - -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; - -@RestController -public class HelloController { - @RequestMapping("/") - public String index() { - return "Greetings from your Spring Boot Application!"; - } -} diff --git a/examples/src/main/java/io/dapr/examples/actors/http/README.md b/examples/src/main/java/io/dapr/examples/actors/http/README.md index 7981fe8a9..f1dbef946 100644 --- a/examples/src/main/java/io/dapr/examples/actors/http/README.md +++ b/examples/src/main/java/io/dapr/examples/actors/http/README.md @@ -44,24 +44,65 @@ public class DemoActorService { public static void main(String[] args) throws Exception { ///... // Register the Actor class. - ActorRuntime.getInstance().registerActor( - DemoActorImpl.class, new DefaultObjectSerializer(), new DefaultObjectSerializer()); + ActorRuntime.getInstance().registerActor(DemoActorImpl.class); // Start Dapr's callback endpoint. DaprApplication.start(port); - - // Start application's endpoint. - SpringApplication.run(DemoActorService.class); } } ``` -This application uses `ActorRuntime.getInstance().registerActor()` in order to register `DemoActorImpl` as an actor in the Dapr Actor runtime. Notice that this call passes in two serializer implementations: one is for Dapr's sent and received object and the other is for objects to be persisted. +This application uses `ActorRuntime.getInstance().registerActor()` in order to register `DemoActorImpl` as an actor in the Dapr Actor runtime. Internally, it is using `DefaultObjectSerializer` for two properties: `objectSerializer` is for Dapr's sent and received objects, and `stateSerializer` is for objects to be persisted. -`DaprApplication.start()` method will run the Spring Boot [DaprApplication](../../../springboot/DaprApplication.java), which registers the Dapr Spring Boot controller [DaprController](../../springboot/DaprController.java). This controller contains all Actor methods implemented as endpoints. The Dapr's sidecar will call into the controller. At the end of the main method, this class uses `SpringApplication.run()` to boostrap itself a an Spring application. +`DaprApplication.start()` method will run the Spring Boot [DaprApplication](../../../springboot/DaprApplication.java), which registers the Dapr Spring Boot controller [DaprController](../../springboot/DaprController.java). This controller contains all Actor methods implemented as endpoints. The Dapr's sidecar will call into the controller. -Execute the follow script in order to run the DemoActorService: +See [DemoActorImpl](DemoActorImpl.java) for details on the implementation of an actor: +```java +@ActorType(name = "DemoActor") +public class DemoActorImpl extends AbstractActor implements DemoActor, Remindable { + //... + + public DemoActorImpl(ActorRuntimeContext runtimeContext, ActorId id) { + super(runtimeContext, id); + //... + } + + @Override + public void registerReminder() { + //... + } + + @Override + public String say(String something) { + //... + } + + @Override + public Mono incrementAndGet(int delta) { + //... + } + + @Override + public void clock(String message) { + //... + } + + @Override + public Class getStateType() { + return Integer.class; + } + + @Override + public Mono receiveReminder(String reminderName, Integer state, Duration dueTime, Duration period) { + //... + } +} +``` +An actor inherits from `AbstractActor` and implements the constructor to pass through `ActorRuntimeContext` and `ActorId`. By default, the actor's name will be the same as the class' name. Optionally, it can be annotated with `ActorType` and override the actor's name. The actor's methods can be synchronously or use [Project Reactor's Mono](https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html) return type. Finally, state management is done via methods in `super.getActorStateManager()`. + + +Now, execute the following script in order to run DemoActorService: ```sh cd to [repo-root] dapr run --app-id demoactorservice --app-port 3000 --port 3005 -- mvn exec:java -pl=examples -D exec.mainClass=io.dapr.examples.actors.http.DemoActorService -D exec.args="-p 3000" @@ -77,39 +118,47 @@ The `DemoActorClient.java` file contains the `DemoActorClient` class. See the co public class DemoActorClient { private static final int NUM_ACTORS = 3; - private static final int NUM_MESSAGES_PER_ACTOR = 10; - private static final ExecutorService POOL = Executors.newFixedThreadPool(NUM_ACTORS); - - public static void main(String[] args) throws Exception { + public static void main(String[] args) throws InterruptedException { ///... for (int i = 0; i < NUM_ACTORS; i++) { ActorProxy actor = builder.build(ActorId.createRandom()); - futures.add(callActorNTimes(actor)); + + // Start a thread per actor. + Thread thread = new Thread(() -> callActorForever(actor)); + thread.start(); + threads.add(thread); } ///... - - private static final CompletableFuture callActorNTimes(ActorProxy actor) { - return CompletableFuture.runAsync(() -> { - actor.invokeActorMethod("registerReminder").block(); - for (int i = 0; i < NUM_MESSAGES_PER_ACTOR; i++) { - //Invoking the "incrementAndGet" method: - actor.invokeActorMethod("incrementAndGet", 1).block(); - //Invoking "say" method - String result = actor.invokeActorMethod("say", - String.format("Actor %s said message #%d", actor.getActorId().toString(), i), String.class).block(); - System.out.println(String.format("Actor %s got a reply: %s", actor.getActorId().toString(), result)); - ///... - } - System.out.println( - "Messages sent: " + actor.invokeActorMethod("incrementAndGet", 0, int.class).block()); - }, POOL); } + + private static final void callActorForever(ActorProxy actor) { + // First, register reminder. + actor.invokeActorMethod("registerReminder").block(); + + // Now, we run until thread is interrupted. + while (!Thread.currentThread().isInterrupted()) { + // Invoke actor method to increment counter by 1, then build message. + int messageNumber = actor.invokeActorMethod("incrementAndGet", 1, int.class).block(); + String message = String.format("Actor %s said message #%d", actor.getActorId().toString(), messageNumber); + + // Invoke the 'say' method in actor. + String result = actor.invokeActorMethod("say", message, String.class).block(); + System.out.println(String.format("Actor %s got a reply: %s", actor.getActorId().toString(), result)); + + try { + // Waits for up to 1 second. + Thread.sleep((long) Math.rint(1000)); + } catch (InterruptedException e) { + // We have been interrupted, so we set the interrupted flag to exit gracefully. + Thread.currentThread().interrupt(); + } + } } } ``` -First, The client defines how many actors it is going to create, as well as how many invocation calls it will perform per actor. Then the main method declares a `ActorProxyBuilder` for the `DemoActor` class for creating `ActorProxy` instances, which are the actor representation provided by the SDK. The code executes the `callActorNTimes` private method once per actor. This method executes functionality for the DemoActor implementation using `actor.invokeActorMethod()` in the follow order: `registerReminder()` which sets the due time and period for the reminder, `incrementAndGet()` which increments a counter, persists it and sends it back as response, and finally `say` method wich will print a message containing the received string along with the formatted server time. See [DemoActorImpl](DemoActorImpl.java) for details on the implementation of these methods. +First, the client defines how many actors it is going to create. Then the main method declares a `ActorProxyBuilder` for the `DemoActor` class to create `ActorProxy` instances, which are the actor representation provided by the SDK. The code executes the `callActorForever` private method once per actor. This method triggers the DemoActor's implementation by using `actor.invokeActorMethod()`. Initially, it will invoke `registerReminder()`, which sets the due time and period for the reminder. Then, `incrementAndGet()` increments a counter, persists it and sends it back as response. Finally `say` method which will print a message containing the received string along with the formatted server time. Use the follow command to execute the DemoActorClient: