mirror of https://github.com/dapr/java-sdk.git
Improve workflow examples (#949)
* add determinstic UUID generation Signed-off-by: kaibocai <89094811+kaibocai@users.noreply.github.com> * add unit test to improve coverage Signed-off-by: kaibocai <89094811+kaibocai@users.noreply.github.com> * add clearn examples - update grpcversion Signed-off-by: kaibocai <89094811+kaibocai@users.noreply.github.com> * clean up Signed-off-by: kaibocai <89094811+kaibocai@users.noreply.github.com> * update readme to using dapr/mechanical-markdown Signed-off-by: kaibocai <89094811+kaibocai@users.noreply.github.com> * update validate.yml Signed-off-by: kaibocai <89094811+kaibocai@users.noreply.github.com> * resove readme port issue Signed-off-by: kaibocai <kaibocai@microsoft.com> * fix readme.md Signed-off-by: kaibocai <kaibocai@microsoft.com> * fix port issue Signed-off-by: kaibocai <kaibocai@microsoft.com> * test steps Signed-off-by: kaibocai <kaibocai@microsoft.com> * try fix readme validation Signed-off-by: kaibocai <kaibocai@microsoft.com> * update readme Signed-off-by: Kaibo Cai (from Dev Box) <kaibocai@microsoft.com> * MM readme Signed-off-by: kaibocai <89094811+kaibocai@users.noreply.github.com> * fix mm error Signed-off-by: kaibocai <89094811+kaibocai@users.noreply.github.com> * add port to readme mm Signed-off-by: kaibocai <89094811+kaibocai@users.noreply.github.com> * add sleep to readme mm Signed-off-by: kaibocai <89094811+kaibocai@users.noreply.github.com> * update mm for readme Signed-off-by: kaibocai <89094811+kaibocai@users.noreply.github.com> * fix mm for readme Signed-off-by: kaibocai <89094811+kaibocai@users.noreply.github.com> * fix readme mm Signed-off-by: kaibocai <89094811+kaibocai@users.noreply.github.com> * remove mm for continueasnew Signed-off-by: kaibocai <89094811+kaibocai@users.noreply.github.com> --------- Signed-off-by: kaibocai <89094811+kaibocai@users.noreply.github.com> Signed-off-by: kaibocai <kaibocai@microsoft.com> Signed-off-by: Kaibo Cai (from Dev Box) <kaibocai@microsoft.com> Co-authored-by: Artur Souza <artursouza.ms@outlook.com> Co-authored-by: Mukundan Sundararajan <65565396+mukundansundar@users.noreply.github.com>
This commit is contained in:
parent
3b0747c1fe
commit
da395f8dac
|
@ -162,3 +162,7 @@ jobs:
|
|||
working-directory: ./examples
|
||||
run: |
|
||||
mm.py ./src/main/java/io/dapr/examples/querystate/README.md
|
||||
- name: Validate workflows example
|
||||
working-directory: ./examples
|
||||
run: |
|
||||
mm.py ./src/main/java/io/dapr/examples/workflows/README.md
|
||||
|
|
|
@ -0,0 +1,541 @@
|
|||
# Dapr Workflow Sample
|
||||
|
||||
In this example, we'll use Dapr to test workflow features.
|
||||
|
||||
Visit [the Workflow documentation landing page](https://docs.dapr.io/developing-applications/building-blocks/workflow) for more information.
|
||||
|
||||
This example contains the follow classes:
|
||||
|
||||
* DemoWorkflow: An example of a Dapr Workflow.
|
||||
* DemoWorkflowClient: This application will start workflows using Dapr.
|
||||
* DemoWorkflowWorker: An application that registers a workflow to the Dapr workflow runtime engine. It also executes the workflow instance.
|
||||
|
||||
## Pre-requisites
|
||||
|
||||
* [Dapr CLI](https://docs.dapr.io/getting-started/install-dapr-cli/).
|
||||
* Java JDK 11 (or greater):
|
||||
* [Microsoft JDK 11](https://docs.microsoft.com/en-us/java/openjdk/download#openjdk-11)
|
||||
* [Oracle JDK 11](https://www.oracle.com/technetwork/java/javase/downloads/index.html#JDK11)
|
||||
* [OpenJDK 11](https://jdk.java.net/11/)
|
||||
* [Apache Maven](https://maven.apache.org/install.html) version 3.x.
|
||||
|
||||
### Checking out the code
|
||||
|
||||
Clone this repository:
|
||||
|
||||
```sh
|
||||
git clone https://github.com/dapr/java-sdk.git
|
||||
cd java-sdk
|
||||
```
|
||||
|
||||
Then build the Maven project:
|
||||
|
||||
```sh
|
||||
# make sure you are in the `java-sdk` directory.
|
||||
mvn install
|
||||
```
|
||||
|
||||
Get into the `examples` directory.
|
||||
```sh
|
||||
cd examples
|
||||
```
|
||||
|
||||
### Initialize Dapr
|
||||
|
||||
Run `dapr init` to initialize Dapr in Self-Hosted Mode if it's not already initialized.
|
||||
|
||||
## Patterns
|
||||
|
||||
Those examples contain the following workflow patterns:
|
||||
1. [Chaining Pattern](#chaining-pattern)
|
||||
2. [Fan-out/Fan-in Pattern](#fan-outfan-in-pattern)
|
||||
3. [Continue As New Pattern](#continue-as-new-pattern)
|
||||
4. [External Event Pattern](#external-event-pattern)
|
||||
5. [Sub-workflow Pattern](#sub-workflow-pattern)
|
||||
|
||||
### Chaining Pattern
|
||||
In the chaining pattern, a sequence of activities executes in a specific order.
|
||||
In this pattern, the output of one activity is applied to the input of another activity.
|
||||
The chaining pattern is useful when you need to execute a sequence of activities in a specific order.
|
||||
|
||||
The first Java class is `DemoChainWorker`. Its job is to register an implementation of `DemoChainWorkflow` in Dapr's workflow runtime engine. In the `DemoChainWorker.java` file, you will find the `DemoChainWorker` class and the `main` method. See the code snippet below:
|
||||
```java
|
||||
public class DemoChainWorker {
|
||||
/**
|
||||
* The main method of this app.
|
||||
*
|
||||
* @param args The port the app will listen on.
|
||||
* @throws Exception An Exception.
|
||||
*/
|
||||
public static void main(String[] args) throws Exception {
|
||||
// Register the Workflow with the builder.
|
||||
WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(DemoChainWorkflow.class);
|
||||
builder.registerActivity(ToUpperCaseActivity.class);
|
||||
|
||||
// Build and then start the workflow runtime pulling and executing tasks
|
||||
try (WorkflowRuntime runtime = builder.build()) {
|
||||
System.out.println("Start workflow runtime");
|
||||
runtime.start();
|
||||
}
|
||||
|
||||
System.exit(0);
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
The second Java class you want to look at is `DemoChainWorkflow`, it defines the workflow. In this example it chains the activites in order. See the code snippet below:
|
||||
```java
|
||||
public class DemoChainWorkflow extends Workflow {
|
||||
@Override
|
||||
public WorkflowStub create() {
|
||||
return ctx -> {
|
||||
ctx.getLogger().info("Starting Workflow: " + ctx.getName());
|
||||
|
||||
String result = "";
|
||||
result += ctx.callActivity(ToUpperCaseActivity.class.getName(), "Tokyo", String.class).await() + ", ";
|
||||
result += ctx.callActivity(ToUpperCaseActivity.class.getName(), "London", String.class).await() + ", ";
|
||||
result += ctx.callActivity(ToUpperCaseActivity.class.getName(), "Seattle", String.class).await();
|
||||
|
||||
ctx.getLogger().info("Workflow finished with result: " + result);
|
||||
ctx.complete(result);
|
||||
};
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
The next Java class you want to look at is `ToUpperCaseActivity`, it defines the logics for a single acitvity, in this case, it converts a string to upper case. See the code snippet below:
|
||||
```java
|
||||
public class ToUpperCaseActivity implements WorkflowActivity {
|
||||
|
||||
@Override
|
||||
public String run(WorkflowActivityContext ctx) {
|
||||
Logger logger = LoggerFactory.getLogger(ToUpperCaseActivity.class);
|
||||
logger.info("Starting Chaining Activity: " + ctx.getName());
|
||||
|
||||
var message = ctx.getInput(String.class);
|
||||
var newMessage = message.toUpperCase();
|
||||
|
||||
logger.info("Message Received from input: " + message);
|
||||
logger.info("Sending message to output: " + newMessage);
|
||||
|
||||
logger.info("Activity returned: " + newMessage);
|
||||
logger.info("Activity finished");
|
||||
|
||||
return newMessage;
|
||||
}
|
||||
}
|
||||
```
|
||||
<!-- STEP
|
||||
name: Run Chaining Pattern workflow
|
||||
match_order: none
|
||||
output_match_mode: substring
|
||||
expected_stdout_lines:
|
||||
- 'Starting Workflow: io.dapr.examples.workflows.chain.DemoChainWorkflow'
|
||||
- 'Starting Activity: io.dapr.examples.workflows.chain.ToUpperCaseActivity'
|
||||
- 'Message Received from input: Tokyo'
|
||||
- 'Sending message to output: TOKYO'
|
||||
- 'Starting Activity: io.dapr.examples.workflows.chain.ToUpperCaseActivity'
|
||||
- 'Message Received from input: London'
|
||||
- 'Sending message to output: LONDON'
|
||||
- 'Starting Activity: io.dapr.examples.workflows.chain.ToUpperCaseActivity'
|
||||
- 'Message Received from input: Seattle'
|
||||
- 'Sending message to output: SEATTLE'
|
||||
- 'Workflow finished with result: TOKYO, LONDON, SEATTLE'
|
||||
background: true
|
||||
sleep: 60
|
||||
timeout_seconds: 60
|
||||
-->
|
||||
Execute the following script in order to run DemoChainWorker:
|
||||
```sh
|
||||
dapr run --app-id demoworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.chain.DemoChainWorker
|
||||
```
|
||||
|
||||
Once running, the logs will start displaying the different steps: First, you can see workflow is starting:
|
||||
```text
|
||||
== APP == Start workflow runtime
|
||||
== APP == Nov 07, 2023 11:03:07 AM com.microsoft.durabletask.DurableTaskGrpcWorker startAndBlock
|
||||
== APP == INFO: Durable Task worker is connecting to sidecar at 127.0.0.1:50001.
|
||||
```
|
||||
|
||||
Then, execute the following script in order to run DemoChainClient:
|
||||
```sh
|
||||
java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.chain.DemoChainClient
|
||||
```
|
||||
<!-- END_STEP -->
|
||||
|
||||
|
||||
|
||||
Now you can see the worker logs showing the acitvity is invoked in sequnce and the status of each activity:
|
||||
```text
|
||||
== APP == 2023-11-07 11:03:14,178 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.chain.DemoChainWorkflow
|
||||
== APP == 2023-11-07 11:03:14,229 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Starting Activity: io.dapr.examples.workflows.chain.ToUpperCaseActivity
|
||||
== APP == 2023-11-07 11:03:14,235 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Message Received from input: Tokyo
|
||||
== APP == 2023-11-07 11:03:14,235 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Sending message to output: TOKYO
|
||||
== APP == 2023-11-07 11:03:14,266 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Starting Activity: io.dapr.examples.workflows.chain.ToUpperCaseActivity
|
||||
== APP == 2023-11-07 11:03:14,267 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Message Received from input: London
|
||||
== APP == 2023-11-07 11:03:14,267 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Sending message to output: LONDON
|
||||
== APP == 2023-11-07 11:03:14,282 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Starting Activity: io.dapr.examples.workflows.chain.ToUpperCaseActivity
|
||||
== APP == 2023-11-07 11:03:14,282 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Message Received from input: Seattle
|
||||
== APP == 2023-11-07 11:03:14,283 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Sending message to output: SEATTLE
|
||||
== APP == 2023-11-07 11:03:14,298 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Workflow finished with result: TOKYO, LONDON, SEATTLE
|
||||
```
|
||||
and the client logs showing the workflow is started and finished with expected result:
|
||||
```text
|
||||
Started a new chaining model workflow with instance ID: 6e4fe69b-689b-4998-b095-d6b52c7d6328
|
||||
workflow instance with ID: 6e4fe69b-689b-4998-b095-d6b52c7d6328 completed with result: TOKYO, LONDON, SEATTLE
|
||||
```
|
||||
|
||||
### Fan-out/Fan-in Pattern
|
||||
|
||||
In the fan out/fan in pattern, you execute multiple activities in parallel and then wait for all activities to finish. Often, some aggregation work is done on the results that are returned from the activities.
|
||||
|
||||
The `DemoFanInOutWorkflow` class defines the workflow. In this example it executes the activities in parallel and then sums the results. See the code snippet below:
|
||||
```java
|
||||
public class DemoFanInOutWorkflow extends Workflow {
|
||||
@Override
|
||||
public WorkflowStub create() {
|
||||
return ctx -> {
|
||||
|
||||
ctx.getLogger().info("Starting Workflow: " + ctx.getName());
|
||||
|
||||
|
||||
// The input is a list of objects that need to be operated on.
|
||||
// In this example, inputs are expected to be strings.
|
||||
List<?> inputs = ctx.getInput(List.class);
|
||||
|
||||
// Fan-out to multiple concurrent activity invocations, each of which does a word count.
|
||||
List<Task<Integer>> tasks = inputs.stream()
|
||||
.map(input -> ctx.callActivity(CountWordsActivity.class.getName(), input.toString(), Integer.class))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
// Fan-in to get the total word count from all the individual activity results.
|
||||
List<Integer> allWordCountResults = ctx.allOf(tasks).await();
|
||||
int totalWordCount = allWordCountResults.stream().mapToInt(Integer::intValue).sum();
|
||||
|
||||
ctx.getLogger().info("Workflow finished with result: " + totalWordCount);
|
||||
// Save the final result as the orchestration output.
|
||||
ctx.complete(totalWordCount);
|
||||
};
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
The `CountWordsActivity` class defines the logics for a single acitvity, in this case, it counts the words in a string. See the code snippet below:
|
||||
```java
|
||||
public class CountWordsActivity implements WorkflowActivity {
|
||||
@Override
|
||||
public Object run(WorkflowActivityContext ctx) {
|
||||
Logger logger = LoggerFactory.getLogger(ToUpperCaseActivity.class);
|
||||
logger.info("Starting Activity: " + ctx.getName());
|
||||
|
||||
String input = ctx.getInput(String.class);
|
||||
StringTokenizer tokenizer = new StringTokenizer(input);
|
||||
int result = tokenizer.countTokens();
|
||||
|
||||
logger.info("Activity returned: " + result);
|
||||
logger.info("Activity finished");
|
||||
|
||||
return result;
|
||||
}
|
||||
}
|
||||
```
|
||||
<!-- STEP
|
||||
name: Run Chaining Pattern workflow
|
||||
match_order: none
|
||||
output_match_mode: substring
|
||||
expected_stdout_lines:
|
||||
- 'Activity returned: 2'
|
||||
- 'Activity returned: 9'
|
||||
- 'Activity returned: 21'
|
||||
- 'Activity returned: 17'
|
||||
- 'Workflow finished with result: 60'
|
||||
background: true
|
||||
sleep: 60
|
||||
timeout_seconds: 60
|
||||
-->
|
||||
|
||||
Execute the following script in order to run DemoFanInOutWorker:
|
||||
```sh
|
||||
dapr run --app-id demoworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.faninout.DemoFanInOutWorker
|
||||
```
|
||||
|
||||
Execute the following script in order to run DemoFanInOutClient:
|
||||
|
||||
```sh
|
||||
java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.faninout.DemoFanInOutClient
|
||||
```
|
||||
<!-- END_STEP -->
|
||||
|
||||
Now you can see the logs from worker:
|
||||
```text
|
||||
== APP == 2023-11-07 14:52:03,075 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.faninout.DemoFanInOutWorkflow
|
||||
== APP == 2023-11-07 14:52:03,144 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Starting Activity: io.dapr.examples.workflows.faninout.CountWordsActivity
|
||||
== APP == 2023-11-07 14:52:03,147 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity returned: 2
|
||||
== APP == 2023-11-07 14:52:03,148 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity finished
|
||||
== APP == 2023-11-07 14:52:03,152 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Starting Activity: io.dapr.examples.workflows.faninout.CountWordsActivity
|
||||
== APP == 2023-11-07 14:52:03,152 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity returned: 9
|
||||
== APP == 2023-11-07 14:52:03,152 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity finished
|
||||
== APP == 2023-11-07 14:52:03,167 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Starting Activity: io.dapr.examples.workflows.faninout.CountWordsActivity
|
||||
== APP == 2023-11-07 14:52:03,167 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity returned: 21
|
||||
== APP == 2023-11-07 14:52:03,167 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity finished
|
||||
== APP == 2023-11-07 14:52:03,170 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Starting Activity: io.dapr.examples.workflows.faninout.CountWordsActivity
|
||||
== APP == 2023-11-07 14:52:03,170 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity returned: 17
|
||||
== APP == 2023-11-07 14:52:03,170 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity finished
|
||||
== APP == 2023-11-07 14:52:03,173 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Starting Activity: io.dapr.examples.workflows.faninout.CountWordsActivity
|
||||
== APP == 2023-11-07 14:52:03,173 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity returned: 11
|
||||
== APP == 2023-11-07 14:52:03,174 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity finished
|
||||
== APP == 2023-11-07 14:52:03,212 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Workflow finished with result: 60
|
||||
```
|
||||
|
||||
and the client:
|
||||
```text
|
||||
Started a new fan out/fan in model model workflow with instance ID: 092c1928-b5dd-4576-9468-300bf6aed986
|
||||
workflow instance with ID: 092c1928-b5dd-4576-9468-300bf6aed986 completed with result: 60
|
||||
```
|
||||
|
||||
### Continue As New Pattern
|
||||
`ContinueAsNew` API allows you to restart the workflow with a new input.
|
||||
|
||||
The `DemoContinueAsNewWorkflow` class defines the workflow. It simulates periodic cleanup work that happen every 10 seconds, after previous cleanup has finished. See the code snippet below:
|
||||
```java
|
||||
public class DemoContinueAsNewWorkflow extends Workflow {
|
||||
/*
|
||||
Compared with a CRON schedule, this periodic workflow example will never overlap.
|
||||
For example, a CRON schedule that executes a cleanup every hour will execute it at 1:00, 2:00, 3:00 etc.
|
||||
and could potentially run into overlap issues if the cleanup takes longer than an hour.
|
||||
In this example, however, if the cleanup takes 30 minutes, and we create a timer for 1 hour between cleanups,
|
||||
then it will be scheduled at 1:00, 2:30, 4:00, etc. and there is no chance of overlap.
|
||||
*/
|
||||
@Override
|
||||
public WorkflowStub create() {
|
||||
return ctx -> {
|
||||
ctx.getLogger().info("Starting Workflow: " + ctx.getName());
|
||||
|
||||
ctx.getLogger().info("call CleanUpActivity to do the clean up");
|
||||
ctx.callActivity(CleanUpActivity.class.getName()).await();
|
||||
ctx.getLogger().info("CleanUpActivity finished");
|
||||
|
||||
ctx.getLogger().info("wait 10 seconds for next clean up");
|
||||
ctx.createTimer(Duration.ofSeconds(10)).await();
|
||||
|
||||
// continue the workflow.
|
||||
ctx.continueAsNew(null);
|
||||
};
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
The `CleanUpActivity` class defines the logics for a single acitvity, in this case, it simulates a clean up work. See the code snippet below:
|
||||
```java
|
||||
public class CleanUpActivity implements WorkflowActivity {
|
||||
@Override
|
||||
public Object run(WorkflowActivityContext ctx) {
|
||||
Logger logger = LoggerFactory.getLogger(CleanUpActivity.class);
|
||||
logger.info("Starting Activity: " + ctx.getName());
|
||||
|
||||
logger.info("start clean up work, it may take few seconds to finish...");
|
||||
|
||||
//Sleeping for 5 seconds to simulate long running operation
|
||||
try {
|
||||
TimeUnit.SECONDS.sleep(5);
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
return "clean up finish.";
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
Once you start the workflow and client using the following commands:
|
||||
```sh
|
||||
dapr run --app-id demoworkflowworker --resources-path ./components/workflows -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.continueasnew.DemoContinueAsNewWorker
|
||||
```
|
||||
```sh
|
||||
java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.continueasnew.DemoContinueAsNewClient
|
||||
````
|
||||
|
||||
You will see the logs from worker showing the `CleanUpActivity` is invoked every 10 seconds after previous one is finished:
|
||||
```text
|
||||
== APP == 2023-11-07 14:44:42,004 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.continueasnew.DemoContinueAsNewWorkflow
|
||||
== APP == 2023-11-07 14:44:42,004 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - call CleanUpActivity to do the clean up
|
||||
== APP == 2023-11-07 14:44:42,009 {HH:mm:ss.SSS} [main] INFO i.d.e.w.c.CleanUpActivity - Starting Activity: io.dapr.examples.workflows.continueasnew.CleanUpActivity
|
||||
== APP == 2023-11-07 14:44:42,009 {HH:mm:ss.SSS} [main] INFO i.d.e.w.c.CleanUpActivity - start clean up work, it may take few seconds to finish...
|
||||
== APP == 2023-11-07 14:44:47,026 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.continueasnew.DemoContinueAsNewWorkflow
|
||||
== APP == 2023-11-07 14:44:47,026 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - call CleanUpActivity to do the clean up
|
||||
== APP == 2023-11-07 14:44:47,030 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - CleanUpActivity finished
|
||||
== APP == 2023-11-07 14:44:47,030 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - wait 10 seconds for next clean up
|
||||
== APP == 2023-11-07 14:44:47,033 {HH:mm:ss.SSS} [main] INFO i.d.e.w.c.CleanUpActivity - Starting Activity: io.dapr.examples.workflows.continueasnew.CleanUpActivity
|
||||
== APP == 2023-11-07 14:44:47,033 {HH:mm:ss.SSS} [main] INFO i.d.e.w.c.CleanUpActivity - start clean up work, it may take few seconds to finish...
|
||||
== APP == 2023-11-07 14:44:52,053 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - CleanUpActivity finished
|
||||
== APP == 2023-11-07 14:44:52,053 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - wait 10 seconds for next clean up
|
||||
== APP == 2023-11-07 14:44:57,006 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.continueasnew.DemoContinueAsNewWorkflow
|
||||
== APP == 2023-11-07 14:44:57,006 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - call CleanUpActivity to do the clean up
|
||||
== APP == 2023-11-07 14:44:57,012 {HH:mm:ss.SSS} [main] INFO i.d.e.w.c.CleanUpActivity - Starting Activity: io.dapr.examples.workflows.continueasnew.CleanUpActivity
|
||||
== APP == 2023-11-07 14:44:57,012 {HH:mm:ss.SSS} [main] INFO i.d.e.w.c.CleanUpActivity - start clean up work, it may take few seconds to finish...
|
||||
== APP == 2023-11-07 14:45:02,017 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.continueasnew.DemoContinueAsNewWorkflow
|
||||
== APP == 2023-11-07 14:45:02,020 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - call CleanUpActivity to do the clean up
|
||||
== APP == 2023-11-07 14:45:02,021 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - CleanUpActivity finished
|
||||
== APP == 2023-11-07 14:45:02,021 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - wait 10 seconds for next clean up
|
||||
...
|
||||
```
|
||||
|
||||
and the client:
|
||||
```text
|
||||
Started a new continue-as-new model workflow with instance ID: c853fb93-f0e7-4ad7-ad41-385732386f94
|
||||
```
|
||||
It will continue to run until you stop the worker.
|
||||
|
||||
### External Event Pattern
|
||||
In the external event pattern, a workflow is started by an external event. The workflow can then wait for other external events to occur before completing.
|
||||
|
||||
The `DemoExternalEventWorkflow` class defines the workflow. It waits for an external event `Approval` to run the corresponding activity. See the code snippet below:
|
||||
```java
|
||||
public class DemoExternalEventWorkflow extends Workflow {
|
||||
@Override
|
||||
public WorkflowStub create() {
|
||||
return ctx -> {
|
||||
ctx.getLogger().info("Starting Workflow: " + ctx.getName());
|
||||
|
||||
Boolean approved = ctx.waitForExternalEvent("Approval", boolean.class).await();
|
||||
if (approved) {
|
||||
ctx.getLogger().info("approval granted - do the approved action");
|
||||
ctx.callActivity(ApproveActivity.class.getName()).await();
|
||||
ctx.getLogger().info("approval-activity finished");
|
||||
} else {
|
||||
ctx.getLogger().info("approval denied - send a notification");
|
||||
ctx.callActivity(DenyActivity.class.getName()).await();
|
||||
ctx.getLogger().info("denied-activity finished");
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
In the `DemoExternalEventClient` class we send out Approval event to tell our workflow to run the approved activity.
|
||||
```java
|
||||
client.raiseEvent(instanceId, "Approval", true);
|
||||
```
|
||||
|
||||
Start the workflow and client using the following commands:
|
||||
|
||||
ex
|
||||
```sh
|
||||
dapr run --app-id demoworkflowworker --resources-path ./components/workflows -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.externalevent.DemoExternalEventWorker
|
||||
```
|
||||
|
||||
```sh
|
||||
java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.externalevent.DemoExternalEventClient
|
||||
```
|
||||
|
||||
The worker logs:
|
||||
```text
|
||||
== APP == 2023-11-07 16:01:23,279 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.externalevent.DemoExternalEventWorkflow
|
||||
== APP == 2023-11-07 16:01:23,279 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Waiting for approval...
|
||||
== APP == 2023-11-07 16:01:23,324 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - approval granted - do the approved action
|
||||
== APP == 2023-11-07 16:01:23,348 {HH:mm:ss.SSS} [main] INFO i.d.e.w.e.ApproveActivity - Starting Activity: io.dapr.examples.workflows.externalevent.ApproveActivity
|
||||
== APP == 2023-11-07 16:01:23,348 {HH:mm:ss.SSS} [main] INFO i.d.e.w.e.ApproveActivity - Running approval activity...
|
||||
== APP == 2023-11-07 16:01:28,410 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - approval-activity finished
|
||||
```
|
||||
|
||||
The client log:
|
||||
```text
|
||||
Started a new external-event model workflow with instance ID: 23410d96-1afe-4698-9fcd-c01c1e0db255
|
||||
workflow instance with ID: 23410d96-1afe-4698-9fcd-c01c1e0db255 completed.
|
||||
```
|
||||
|
||||
### Sub-workflow Pattern
|
||||
The sub-workflow pattern allows you to call a workflow from another workflow.
|
||||
|
||||
The `DemoWorkflow` class defines the workflow. It calls a sub-workflow `DemoSubWorkflow` to do the work. See the code snippet below:
|
||||
```java
|
||||
public class DemoWorkflow extends Workflow {
|
||||
@Override
|
||||
public WorkflowStub create() {
|
||||
return ctx -> {
|
||||
ctx.getLogger().info("Starting Workflow: " + ctx.getName());
|
||||
|
||||
var subWorkflowInput = "Hello Dapr Workflow!";
|
||||
ctx.getLogger().info("calling subworkflow with input: " + subWorkflowInput);
|
||||
|
||||
var subWorkflowOutput =
|
||||
ctx.callSubWorkflow(DemoSubWorkflow.class.getName(), subWorkflowInput, String.class).await();
|
||||
|
||||
ctx.getLogger().info("subworkflow finished with: " + subWorkflowOutput);
|
||||
};
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
The `DemoSubWorkflow` class defines the sub-workflow. It call the activity to do the work and returns the result. See the code snippet below:
|
||||
```java
|
||||
public class DemoSubWorkflow extends Workflow {
|
||||
@Override
|
||||
public WorkflowStub create() {
|
||||
return ctx -> {
|
||||
ctx.getLogger().info("Starting SubWorkflow: " + ctx.getName());
|
||||
|
||||
var subWorkflowInput = ctx.getInput(String.class);
|
||||
ctx.getLogger().info("SubWorkflow received input: " + subWorkflowInput);
|
||||
|
||||
ctx.getLogger().info("SubWorkflow is calling Activity: " + ReverseActivity.class.getName());
|
||||
String result = ctx.callActivity(ReverseActivity.class.getName(), subWorkflowInput, String.class).await();
|
||||
|
||||
ctx.getLogger().info("SubWorkflow finished with: " + result);
|
||||
ctx.complete(result);
|
||||
};
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
The `ReverseActivity` class defines the logics for a single acitvity, in this case, it reverses a string. See the code snippet below:
|
||||
```java
|
||||
public class ReverseActivity implements WorkflowActivity {
|
||||
@Override
|
||||
public Object run(WorkflowActivityContext ctx) {
|
||||
Logger logger = LoggerFactory.getLogger(ReverseActivity.class);
|
||||
logger.info("Starting Activity: " + ctx.getName());
|
||||
|
||||
var message = ctx.getInput(String.class);
|
||||
var newMessage = new StringBuilder(message).reverse().toString();
|
||||
|
||||
logger.info("Message Received from input: " + message);
|
||||
logger.info("Sending message to output: " + newMessage);
|
||||
|
||||
logger.info("Activity returned: " + newMessage);
|
||||
logger.info("Activity finished");
|
||||
|
||||
return newMessage;
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
Start the workflow and client using the following commands:
|
||||
|
||||
ex
|
||||
```sh
|
||||
dapr run --app-id demoworkflowworker --resources-path ./components/workflows -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.subworkflow.DemoSubWorkflowWorker
|
||||
```
|
||||
|
||||
```sh
|
||||
java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.subworkflow.DemoSubWorkerflowClient
|
||||
```
|
||||
|
||||
The log from worker:
|
||||
```text
|
||||
== APP == 2023-11-07 20:08:52,521 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.subworkflow.DemoWorkflow
|
||||
== APP == 2023-11-07 20:08:52,523 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - calling subworkflow with input: Hello Dapr Workflow!
|
||||
== APP == 2023-11-07 20:08:52,561 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting SubWorkflow: io.dapr.examples.workflows.subworkflow.DemoSubWorkflow
|
||||
== APP == 2023-11-07 20:08:52,566 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - SubWorkflow received input: Hello Dapr Workflow!
|
||||
== APP == 2023-11-07 20:08:52,566 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - SubWorkflow is calling Activity: io.dapr.examples.workflows.subworkflow.ReverseActivity
|
||||
== APP == 2023-11-07 20:08:52,576 {HH:mm:ss.SSS} [main] INFO i.d.e.w.subworkflow.ReverseActivity - Starting Activity: io.dapr.examples.workflows.subworkflow.ReverseActivity
|
||||
== APP == 2023-11-07 20:08:52,577 {HH:mm:ss.SSS} [main] INFO i.d.e.w.subworkflow.ReverseActivity - Message Received from input: Hello Dapr Workflow!
|
||||
== APP == 2023-11-07 20:08:52,577 {HH:mm:ss.SSS} [main] INFO i.d.e.w.subworkflow.ReverseActivity - Sending message to output: !wolfkroW rpaD olleH
|
||||
== APP == 2023-11-07 20:08:52,596 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - SubWorkflow finished with: !wolfkroW rpaD olleH
|
||||
== APP == 2023-11-07 20:08:52,611 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - subworkflow finished with: !wolfkroW rpaD olleH
|
||||
```
|
||||
|
||||
The log from client:
|
||||
```text
|
||||
Started a new sub-workflow model workflow with instance ID: c2fb9c83-435b-4b55-bdf1-833b39366cfb
|
||||
workflow instance with ID: c2fb9c83-435b-4b55-bdf1-833b39366cfb completed with result: !wolfkroW rpaD olleH
|
||||
```
|
|
@ -1,34 +0,0 @@
|
|||
/*
|
||||
* Copyright 2023 The Dapr Authors
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package io.dapr.examples.workflows;
|
||||
|
||||
public class DemoActivityInput {
|
||||
|
||||
private String message;
|
||||
|
||||
public DemoActivityInput() {
|
||||
}
|
||||
|
||||
public DemoActivityInput(String message) {
|
||||
this.message = message;
|
||||
}
|
||||
|
||||
public String getMessage() {
|
||||
return message;
|
||||
}
|
||||
|
||||
public void setMessage(String message) {
|
||||
this.message = message;
|
||||
}
|
||||
}
|
|
@ -1,44 +0,0 @@
|
|||
/*
|
||||
* Copyright 2023 The Dapr Authors
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package io.dapr.examples.workflows;
|
||||
|
||||
public class DemoActivityOutput {
|
||||
|
||||
private String originalMessage;
|
||||
private String newMessage;
|
||||
|
||||
public DemoActivityOutput() {
|
||||
}
|
||||
|
||||
public DemoActivityOutput(String originalMessage, String newMessage) {
|
||||
this.originalMessage = originalMessage;
|
||||
this.newMessage = newMessage;
|
||||
}
|
||||
|
||||
public String getOriginalMessage() {
|
||||
return originalMessage;
|
||||
}
|
||||
|
||||
public void setOriginalMessage(String originalMessage) {
|
||||
this.originalMessage = originalMessage;
|
||||
}
|
||||
|
||||
public String getNewMessage() {
|
||||
return newMessage;
|
||||
}
|
||||
|
||||
public void setNewMessage(String newMessage) {
|
||||
this.newMessage = newMessage;
|
||||
}
|
||||
}
|
|
@ -1,126 +0,0 @@
|
|||
/*
|
||||
* Copyright 2023 The Dapr Authors
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package io.dapr.examples.workflows;
|
||||
|
||||
import com.microsoft.durabletask.CompositeTaskFailedException;
|
||||
import com.microsoft.durabletask.Task;
|
||||
import com.microsoft.durabletask.TaskCanceledException;
|
||||
import io.dapr.workflows.Workflow;
|
||||
import io.dapr.workflows.WorkflowStub;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Implementation of the DemoWorkflow for the server side.
|
||||
*/
|
||||
public class DemoWorkflow extends Workflow {
|
||||
@Override
|
||||
public WorkflowStub create() {
|
||||
return ctx -> {
|
||||
ctx.getLogger().info("Starting Workflow: " + ctx.getName());
|
||||
ctx.getLogger().info("Instance ID: " + ctx.getInstanceId());
|
||||
ctx.getLogger().info("Current Orchestration Time: " + ctx.getCurrentInstant());
|
||||
ctx.getLogger().info("Waiting for event: 'TimedOutEvent'...");
|
||||
try {
|
||||
ctx.waitForExternalEvent("TimedOutEvent", Duration.ofSeconds(10)).await();
|
||||
} catch (TaskCanceledException e) {
|
||||
ctx.getLogger().warn("Timed out");
|
||||
ctx.getLogger().warn(e.getMessage());
|
||||
}
|
||||
|
||||
ctx.getLogger().info("Waiting for event: 'TestEvent'...");
|
||||
try {
|
||||
ctx.waitForExternalEvent("TestEvent", Duration.ofSeconds(10)).await();
|
||||
ctx.getLogger().info("Received TestEvent");
|
||||
} catch (TaskCanceledException e) {
|
||||
ctx.getLogger().warn("Timed out");
|
||||
ctx.getLogger().warn(e.getMessage());
|
||||
}
|
||||
|
||||
ctx.getLogger().info("Parallel Execution - Waiting for all tasks to finish...");
|
||||
try {
|
||||
Task<String> t1 = ctx.waitForExternalEvent("event1", Duration.ofSeconds(5), String.class);
|
||||
Task<String> t2 = ctx.waitForExternalEvent("event2", Duration.ofSeconds(5), String.class);
|
||||
Task<String> t3 = ctx.waitForExternalEvent("event3", Duration.ofSeconds(5), String.class);
|
||||
|
||||
List<String> results = ctx.allOf(Arrays.asList(t1, t2, t3)).await();
|
||||
results.forEach(t -> ctx.getLogger().info("finished task: " + t));
|
||||
ctx.getLogger().info("All tasks finished!");
|
||||
|
||||
} catch (CompositeTaskFailedException e) {
|
||||
ctx.getLogger().warn(e.getMessage());
|
||||
List<Exception> exceptions = e.getExceptions();
|
||||
exceptions.forEach(ex -> ctx.getLogger().warn(ex.getMessage()));
|
||||
}
|
||||
|
||||
ctx.getLogger().info("Parallel Execution - Waiting for any task to finish...");
|
||||
try {
|
||||
Task<String> e1 = ctx.waitForExternalEvent("e1", Duration.ofSeconds(5), String.class);
|
||||
Task<String> e2 = ctx.waitForExternalEvent("e2", Duration.ofSeconds(5), String.class);
|
||||
Task<String> e3 = ctx.waitForExternalEvent("e3", Duration.ofSeconds(5), String.class);
|
||||
Task<Void> timeoutTask = ctx.createTimer(Duration.ofSeconds(1));
|
||||
|
||||
Task<?> winner = ctx.anyOf(Arrays.asList(e1, e2, e3, timeoutTask)).await();
|
||||
if (winner == timeoutTask) {
|
||||
ctx.getLogger().info("All tasks timed out!");
|
||||
} else {
|
||||
ctx.getLogger().info("One of the tasks finished!");
|
||||
}
|
||||
} catch (TaskCanceledException e) {
|
||||
ctx.getLogger().warn("Timed out");
|
||||
ctx.getLogger().warn(e.getMessage());
|
||||
}
|
||||
|
||||
ctx.getLogger().info("Calling Activity...");
|
||||
var input = new DemoActivityInput("Hello Activity!");
|
||||
var output = ctx.callActivity(DemoWorkflowActivity.class.getName(), input, DemoActivityOutput.class).await();
|
||||
|
||||
ctx.getLogger().info("Activity returned: " + output);
|
||||
ctx.getLogger().info("Activity returned: " + output.getNewMessage());
|
||||
ctx.getLogger().info("Activity returned: " + output.getOriginalMessage());
|
||||
|
||||
|
||||
boolean shouldComplete = true;
|
||||
ctx.getLogger().info("Waiting for event: 'RestartEvent'...");
|
||||
try {
|
||||
ctx.waitForExternalEvent("RestartEvent", Duration.ofSeconds(10)).await();
|
||||
ctx.getLogger().info("Received RestartEvent");
|
||||
ctx.getLogger().info("Restarting Workflow by calling continueAsNew...");
|
||||
ctx.continueAsNew("TestInputRestart", false);
|
||||
shouldComplete = false;
|
||||
} catch (TaskCanceledException e) {
|
||||
ctx.getLogger().warn("Restart Timed out");
|
||||
ctx.getLogger().warn(e.getMessage());
|
||||
}
|
||||
|
||||
if (shouldComplete) {
|
||||
ctx.getLogger().info("Child-Workflow> Calling ChildWorkflow...");
|
||||
var childWorkflowInput = "Hello ChildWorkflow!";
|
||||
var childWorkflowOutput =
|
||||
ctx.callSubWorkflow(DemoSubWorkflow.class.getName(), childWorkflowInput, String.class).await();
|
||||
|
||||
ctx.getLogger().info("Child-Workflow> returned: " + childWorkflowOutput);
|
||||
|
||||
ctx.getLogger().info("Workflow finished");
|
||||
ctx.complete("finished");
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
ctx.getLogger().info("Workflow restarted");
|
||||
};
|
||||
}
|
||||
}
|
|
@ -1,125 +0,0 @@
|
|||
/*
|
||||
* Copyright 2023 The Dapr Authors
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package io.dapr.examples.workflows;
|
||||
|
||||
import io.dapr.workflows.client.DaprWorkflowClient;
|
||||
import io.dapr.workflows.client.WorkflowInstanceStatus;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
/**
|
||||
* For setup instructions, see the README.
|
||||
*/
|
||||
public class DemoWorkflowClient {
|
||||
|
||||
/**
|
||||
* The main method.
|
||||
*
|
||||
* @param args Input arguments (unused).
|
||||
* @throws InterruptedException If program has been interrupted.
|
||||
*/
|
||||
public static void main(String[] args) throws InterruptedException {
|
||||
DaprWorkflowClient client = new DaprWorkflowClient();
|
||||
|
||||
try (client) {
|
||||
String separatorStr = "*******";
|
||||
System.out.println(separatorStr);
|
||||
String instanceId = client.scheduleNewWorkflow(DemoWorkflow.class, "input data");
|
||||
System.out.printf("Started new workflow instance with random ID: %s%n", instanceId);
|
||||
|
||||
System.out.println(separatorStr);
|
||||
System.out.println("**GetInstanceMetadata:Running Workflow**");
|
||||
WorkflowInstanceStatus workflowMetadata = client.getInstanceState(instanceId, true);
|
||||
System.out.printf("Result: %s%n", workflowMetadata);
|
||||
|
||||
System.out.println(separatorStr);
|
||||
System.out.println("**WaitForInstanceStart**");
|
||||
try {
|
||||
WorkflowInstanceStatus waitForInstanceStartResult =
|
||||
client.waitForInstanceStart(instanceId, Duration.ofSeconds(60), true);
|
||||
System.out.printf("Result: %s%n", waitForInstanceStartResult);
|
||||
} catch (TimeoutException ex) {
|
||||
System.out.printf("waitForInstanceStart has an exception:%s%n", ex);
|
||||
}
|
||||
|
||||
System.out.println(separatorStr);
|
||||
System.out.println("**SendExternalMessage**");
|
||||
client.raiseEvent(instanceId, "TestEvent", "TestEventPayload");
|
||||
|
||||
System.out.println(separatorStr);
|
||||
System.out.println("** Registering parallel Events to be captured by allOf(t1,t2,t3) **");
|
||||
client.raiseEvent(instanceId, "event1", "TestEvent 1 Payload");
|
||||
client.raiseEvent(instanceId, "event2", "TestEvent 2 Payload");
|
||||
client.raiseEvent(instanceId, "event3", "TestEvent 3 Payload");
|
||||
System.out.printf("Events raised for workflow with instanceId: %s\n", instanceId);
|
||||
|
||||
System.out.println(separatorStr);
|
||||
System.out.println("** Registering Event to be captured by anyOf(t1,t2,t3) **");
|
||||
client.raiseEvent(instanceId, "e2", "event 2 Payload");
|
||||
System.out.printf("Event raised for workflow with instanceId: %s\n", instanceId);
|
||||
|
||||
|
||||
System.out.println(separatorStr);
|
||||
System.out.println("**WaitForInstanceCompletion**");
|
||||
try {
|
||||
WorkflowInstanceStatus waitForInstanceCompletionResult =
|
||||
client.waitForInstanceCompletion(instanceId, Duration.ofSeconds(60), true);
|
||||
System.out.printf("Result: %s%n", waitForInstanceCompletionResult);
|
||||
} catch (TimeoutException ex) {
|
||||
System.out.printf("waitForInstanceCompletion has an exception:%s%n", ex);
|
||||
}
|
||||
|
||||
System.out.println(separatorStr);
|
||||
System.out.println("**purgeInstance**");
|
||||
boolean purgeResult = client.purgeInstance(instanceId);
|
||||
System.out.printf("purgeResult: %s%n", purgeResult);
|
||||
|
||||
System.out.println(separatorStr);
|
||||
System.out.println("**raiseEvent**");
|
||||
|
||||
String eventInstanceId = client.scheduleNewWorkflow(DemoWorkflow.class);
|
||||
System.out.printf("Started new workflow instance with random ID: %s%n", eventInstanceId);
|
||||
client.raiseEvent(eventInstanceId, "TestException", null);
|
||||
System.out.printf("Event raised for workflow with instanceId: %s\n", eventInstanceId);
|
||||
|
||||
System.out.println(separatorStr);
|
||||
String instanceToTerminateId = "terminateMe";
|
||||
client.scheduleNewWorkflow(DemoWorkflow.class, null, instanceToTerminateId);
|
||||
System.out.printf("Started new workflow instance with specified ID: %s%n", instanceToTerminateId);
|
||||
|
||||
TimeUnit.SECONDS.sleep(5);
|
||||
System.out.println("Terminate this workflow instance manually before the timeout is reached");
|
||||
client.terminateWorkflow(instanceToTerminateId, null);
|
||||
System.out.println(separatorStr);
|
||||
|
||||
String restartingInstanceId = "restarting";
|
||||
client.scheduleNewWorkflow(DemoWorkflow.class, null, restartingInstanceId);
|
||||
System.out.printf("Started new workflow instance with ID: %s%n", restartingInstanceId);
|
||||
System.out.println("Sleeping 30 seconds to restart the workflow");
|
||||
TimeUnit.SECONDS.sleep(30);
|
||||
|
||||
System.out.println("**SendExternalMessage: RestartEvent**");
|
||||
client.raiseEvent(restartingInstanceId, "RestartEvent", "RestartEventPayload");
|
||||
|
||||
System.out.println("Sleeping 30 seconds to terminate the eternal workflow");
|
||||
TimeUnit.SECONDS.sleep(30);
|
||||
client.terminateWorkflow(restartingInstanceId, null);
|
||||
}
|
||||
|
||||
System.out.println("Exiting DemoWorkflowClient.");
|
||||
System.exit(0);
|
||||
}
|
||||
}
|
|
@ -44,38 +44,519 @@ cd examples
|
|||
|
||||
Run `dapr init` to initialize Dapr in Self-Hosted Mode if it's not already initialized.
|
||||
|
||||
### Running the demo Workflow worker
|
||||
## Patterns
|
||||
|
||||
The first Java class to consider is `DemoWorkflowWorker`. It's job is to register an implementation of `DemoWorkflow` in Dapr's workflow runtime engine. In the `DemoWorkflowWorker.java` file, you will find the `DemoWorkflowWorker` class and the `main` method. See the code snippet below:
|
||||
Those examples contain the following workflow patterns:
|
||||
1. [Chaining Pattern](#chaining-pattern)
|
||||
2. [Fan-out/Fan-in Pattern](#fan-outfan-in-pattern)
|
||||
3. [Continue As New Pattern](#continue-as-new-pattern)
|
||||
4. [External Event Pattern](#external-event-pattern)
|
||||
5. [Sub-workflow Pattern](#sub-workflow-pattern)
|
||||
|
||||
### Chaining Pattern
|
||||
In the chaining pattern, a sequence of activities executes in a specific order.
|
||||
In this pattern, the output of one activity is applied to the input of another activity.
|
||||
The chaining pattern is useful when you need to execute a sequence of activities in a specific order.
|
||||
|
||||
The first Java class is `DemoChainWorker`. Its job is to register an implementation of `DemoChainWorkflow` in Dapr's workflow runtime engine. In the `DemoChainWorker.java` file, you will find the `DemoChainWorker` class and the `main` method. See the code snippet below:
|
||||
```java
|
||||
public class DemoWorkflowWorker {
|
||||
|
||||
public class DemoChainWorker {
|
||||
/**
|
||||
* The main method of this app.
|
||||
*
|
||||
* @param args The port the app will listen on.
|
||||
* @throws Exception An Exception.
|
||||
*/
|
||||
public static void main(String[] args) throws Exception {
|
||||
// Register the Workflow with the runtime.
|
||||
WorkflowRuntime.getInstance().registerWorkflow(DemoWorkflow.class);
|
||||
System.out.println("Start workflow runtime");
|
||||
WorkflowRuntime.getInstance().startAndBlock();
|
||||
// Register the Workflow with the builder.
|
||||
WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(DemoChainWorkflow.class);
|
||||
builder.registerActivity(ToUpperCaseActivity.class);
|
||||
|
||||
// Build and then start the workflow runtime pulling and executing tasks
|
||||
try (WorkflowRuntime runtime = builder.build()) {
|
||||
System.out.println("Start workflow runtime");
|
||||
runtime.start();
|
||||
}
|
||||
|
||||
System.exit(0);
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
This application uses `WorkflowRuntime.getInstance().registerWorkflow()` in order to register `DemoWorkflow` as a Workflow in the Dapr Workflow runtime.
|
||||
The second Java class you want to look at is `DemoChainWorkflow`, it defines the workflow. In this example it chains the activites in order. See the code snippet below:
|
||||
```java
|
||||
public class DemoChainWorkflow extends Workflow {
|
||||
@Override
|
||||
public WorkflowStub create() {
|
||||
return ctx -> {
|
||||
ctx.getLogger().info("Starting Workflow: " + ctx.getName());
|
||||
|
||||
The `WorkflowRuntime.getInstance().start()` method will build and start the engine within the Dapr workflow runtime.
|
||||
String result = "";
|
||||
result += ctx.callActivity(ToUpperCaseActivity.class.getName(), "Tokyo", String.class).await() + ", ";
|
||||
result += ctx.callActivity(ToUpperCaseActivity.class.getName(), "London", String.class).await() + ", ";
|
||||
result += ctx.callActivity(ToUpperCaseActivity.class.getName(), "Seattle", String.class).await();
|
||||
|
||||
Now, execute the following script in order to run DemoWorkflowWorker:
|
||||
```sh
|
||||
dapr run --app-id demoworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.DemoWorkflowWorker
|
||||
ctx.getLogger().info("Workflow finished with result: " + result);
|
||||
ctx.complete(result);
|
||||
};
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### Running the Workflow client
|
||||
The next Java class you want to look at is `ToUpperCaseActivity`, it defines the logics for a single acitvity, in this case, it converts a string to upper case. See the code snippet below:
|
||||
```java
|
||||
public class ToUpperCaseActivity implements WorkflowActivity {
|
||||
|
||||
The `DemoWorkflowClient` starts instances of workflows that have been registered with Dapr.
|
||||
@Override
|
||||
public String run(WorkflowActivityContext ctx) {
|
||||
Logger logger = LoggerFactory.getLogger(ToUpperCaseActivity.class);
|
||||
logger.info("Starting Chaining Activity: " + ctx.getName());
|
||||
|
||||
With the DemoWorkflowWorker running, use the follow command to start the workflow with the DemoWorkflowClient:
|
||||
var message = ctx.getInput(String.class);
|
||||
var newMessage = message.toUpperCase();
|
||||
|
||||
logger.info("Message Received from input: " + message);
|
||||
logger.info("Sending message to output: " + newMessage);
|
||||
|
||||
logger.info("Activity returned: " + newMessage);
|
||||
logger.info("Activity finished");
|
||||
|
||||
return newMessage;
|
||||
}
|
||||
}
|
||||
```
|
||||
<!-- STEP
|
||||
name: Run Chaining Pattern workflow
|
||||
match_order: none
|
||||
output_match_mode: substring
|
||||
expected_stdout_lines:
|
||||
- 'Starting Workflow: io.dapr.examples.workflows.chain.DemoChainWorkflow'
|
||||
- 'Starting Activity: io.dapr.examples.workflows.chain.ToUpperCaseActivity'
|
||||
- 'Message Received from input: Tokyo'
|
||||
- 'Sending message to output: TOKYO'
|
||||
- 'Starting Activity: io.dapr.examples.workflows.chain.ToUpperCaseActivity'
|
||||
- 'Message Received from input: London'
|
||||
- 'Sending message to output: LONDON'
|
||||
- 'Starting Activity: io.dapr.examples.workflows.chain.ToUpperCaseActivity'
|
||||
- 'Message Received from input: Seattle'
|
||||
- 'Sending message to output: SEATTLE'
|
||||
- 'Workflow finished with result: TOKYO, LONDON, SEATTLE'
|
||||
background: true
|
||||
sleep: 60
|
||||
timeout_seconds: 60
|
||||
-->
|
||||
Execute the following script in order to run DemoChainWorker:
|
||||
```sh
|
||||
dapr run --app-id demoworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.chain.DemoChainWorker
|
||||
```
|
||||
Once running, the logs will start displaying the different steps: First, you can see workflow is starting:
|
||||
```text
|
||||
== APP == Start workflow runtime
|
||||
== APP == Nov 07, 2023 11:03:07 AM com.microsoft.durabletask.DurableTaskGrpcWorker startAndBlock
|
||||
== APP == INFO: Durable Task worker is connecting to sidecar at 127.0.0.1:50001.
|
||||
```
|
||||
|
||||
Then, execute the following script in order to run DemoChainClient:
|
||||
```sh
|
||||
java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.chain.DemoChainClient
|
||||
```
|
||||
<!-- END_STEP -->
|
||||
|
||||
Now you can see the worker logs showing the acitvity is invoked in sequnce and the status of each activity:
|
||||
```text
|
||||
== APP == 2023-11-07 11:03:14,178 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.chain.DemoChainWorkflow
|
||||
== APP == 2023-11-07 11:03:14,229 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Starting Activity: io.dapr.examples.workflows.chain.ToUpperCaseActivity
|
||||
== APP == 2023-11-07 11:03:14,235 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Message Received from input: Tokyo
|
||||
== APP == 2023-11-07 11:03:14,235 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Sending message to output: TOKYO
|
||||
== APP == 2023-11-07 11:03:14,266 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Starting Activity: io.dapr.examples.workflows.chain.ToUpperCaseActivity
|
||||
== APP == 2023-11-07 11:03:14,267 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Message Received from input: London
|
||||
== APP == 2023-11-07 11:03:14,267 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Sending message to output: LONDON
|
||||
== APP == 2023-11-07 11:03:14,282 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Starting Activity: io.dapr.examples.workflows.chain.ToUpperCaseActivity
|
||||
== APP == 2023-11-07 11:03:14,282 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Message Received from input: Seattle
|
||||
== APP == 2023-11-07 11:03:14,283 {HH:mm:ss.SSS} [main] INFO i.d.e.w.chain.ToUpperCaseActivity - Sending message to output: SEATTLE
|
||||
== APP == 2023-11-07 11:03:14,298 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Workflow finished with result: TOKYO, LONDON, SEATTLE
|
||||
```
|
||||
and the client logs showing the workflow is started and finished with expected result:
|
||||
```text
|
||||
Started a new chaining model workflow with instance ID: 6e4fe69b-689b-4998-b095-d6b52c7d6328
|
||||
workflow instance with ID: 6e4fe69b-689b-4998-b095-d6b52c7d6328 completed with result: TOKYO, LONDON, SEATTLE
|
||||
```
|
||||
|
||||
### Fan-out/Fan-in Pattern
|
||||
|
||||
In the fan out/fan in pattern, you execute multiple activities in parallel and then wait for all activities to finish. Often, some aggregation work is done on the results that are returned from the activities.
|
||||
|
||||
The `DemoFanInOutWorkflow` class defines the workflow. In this example it executes the activities in parallel and then sums the results. See the code snippet below:
|
||||
```java
|
||||
public class DemoFanInOutWorkflow extends Workflow {
|
||||
@Override
|
||||
public WorkflowStub create() {
|
||||
return ctx -> {
|
||||
|
||||
ctx.getLogger().info("Starting Workflow: " + ctx.getName());
|
||||
|
||||
|
||||
// The input is a list of objects that need to be operated on.
|
||||
// In this example, inputs are expected to be strings.
|
||||
List<?> inputs = ctx.getInput(List.class);
|
||||
|
||||
// Fan-out to multiple concurrent activity invocations, each of which does a word count.
|
||||
List<Task<Integer>> tasks = inputs.stream()
|
||||
.map(input -> ctx.callActivity(CountWordsActivity.class.getName(), input.toString(), Integer.class))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
// Fan-in to get the total word count from all the individual activity results.
|
||||
List<Integer> allWordCountResults = ctx.allOf(tasks).await();
|
||||
int totalWordCount = allWordCountResults.stream().mapToInt(Integer::intValue).sum();
|
||||
|
||||
ctx.getLogger().info("Workflow finished with result: " + totalWordCount);
|
||||
// Save the final result as the orchestration output.
|
||||
ctx.complete(totalWordCount);
|
||||
};
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
The `CountWordsActivity` class defines the logics for a single acitvity, in this case, it counts the words in a string. See the code snippet below:
|
||||
```java
|
||||
public class CountWordsActivity implements WorkflowActivity {
|
||||
@Override
|
||||
public Object run(WorkflowActivityContext ctx) {
|
||||
Logger logger = LoggerFactory.getLogger(ToUpperCaseActivity.class);
|
||||
logger.info("Starting Activity: " + ctx.getName());
|
||||
|
||||
String input = ctx.getInput(String.class);
|
||||
StringTokenizer tokenizer = new StringTokenizer(input);
|
||||
int result = tokenizer.countTokens();
|
||||
|
||||
logger.info("Activity returned: " + result);
|
||||
logger.info("Activity finished");
|
||||
|
||||
return result;
|
||||
}
|
||||
}
|
||||
```
|
||||
<!-- STEP
|
||||
name: Run Fan-in/Fan-out Pattern workflow
|
||||
match_order: none
|
||||
output_match_mode: substring
|
||||
expected_stdout_lines:
|
||||
- 'Activity returned: 2'
|
||||
- 'Activity returned: 9'
|
||||
- 'Activity returned: 21'
|
||||
- 'Activity returned: 17'
|
||||
- 'Workflow finished with result: 60'
|
||||
background: true
|
||||
sleep: 60
|
||||
timeout_seconds: 60
|
||||
-->
|
||||
|
||||
Execute the following script in order to run DemoFanInOutWorker:
|
||||
```sh
|
||||
dapr run --app-id demoworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.faninout.DemoFanInOutWorker
|
||||
```
|
||||
Execute the following script in order to run DemoFanInOutClient:
|
||||
```sh
|
||||
java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.faninout.DemoFanInOutClient
|
||||
```
|
||||
<!-- END_STEP -->
|
||||
|
||||
Now you can see the logs from worker:
|
||||
```text
|
||||
== APP == 2023-11-07 14:52:03,075 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.faninout.DemoFanInOutWorkflow
|
||||
== APP == 2023-11-07 14:52:03,144 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Starting Activity: io.dapr.examples.workflows.faninout.CountWordsActivity
|
||||
== APP == 2023-11-07 14:52:03,147 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity returned: 2
|
||||
== APP == 2023-11-07 14:52:03,148 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity finished
|
||||
== APP == 2023-11-07 14:52:03,152 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Starting Activity: io.dapr.examples.workflows.faninout.CountWordsActivity
|
||||
== APP == 2023-11-07 14:52:03,152 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity returned: 9
|
||||
== APP == 2023-11-07 14:52:03,152 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity finished
|
||||
== APP == 2023-11-07 14:52:03,167 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Starting Activity: io.dapr.examples.workflows.faninout.CountWordsActivity
|
||||
== APP == 2023-11-07 14:52:03,167 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity returned: 21
|
||||
== APP == 2023-11-07 14:52:03,167 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity finished
|
||||
== APP == 2023-11-07 14:52:03,170 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Starting Activity: io.dapr.examples.workflows.faninout.CountWordsActivity
|
||||
== APP == 2023-11-07 14:52:03,170 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity returned: 17
|
||||
== APP == 2023-11-07 14:52:03,170 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity finished
|
||||
== APP == 2023-11-07 14:52:03,173 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Starting Activity: io.dapr.examples.workflows.faninout.CountWordsActivity
|
||||
== APP == 2023-11-07 14:52:03,173 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity returned: 11
|
||||
== APP == 2023-11-07 14:52:03,174 {HH:mm:ss.SSS} [main] INFO i.d.e.w.faninout.CountWordsActivity - Activity finished
|
||||
== APP == 2023-11-07 14:52:03,212 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Workflow finished with result: 60
|
||||
```
|
||||
|
||||
and the client:
|
||||
```text
|
||||
Started a new fan out/fan in model model workflow with instance ID: 092c1928-b5dd-4576-9468-300bf6aed986
|
||||
workflow instance with ID: 092c1928-b5dd-4576-9468-300bf6aed986 completed with result: 60
|
||||
```
|
||||
|
||||
### Continue As New Pattern
|
||||
`ContinueAsNew` API allows you to restart the workflow with a new input.
|
||||
|
||||
The `DemoContinueAsNewWorkflow` class defines the workflow. It simulates periodic cleanup work that happen every 10 seconds, after previous cleanup has finished. See the code snippet below:
|
||||
```java
|
||||
public class DemoContinueAsNewWorkflow extends Workflow {
|
||||
/*
|
||||
Compared with a CRON schedule, this periodic workflow example will never overlap.
|
||||
For example, a CRON schedule that executes a cleanup every hour will execute it at 1:00, 2:00, 3:00 etc.
|
||||
and could potentially run into overlap issues if the cleanup takes longer than an hour.
|
||||
In this example, however, if the cleanup takes 30 minutes, and we create a timer for 1 hour between cleanups,
|
||||
then it will be scheduled at 1:00, 2:30, 4:00, etc. and there is no chance of overlap.
|
||||
*/
|
||||
@Override
|
||||
public WorkflowStub create() {
|
||||
return ctx -> {
|
||||
ctx.getLogger().info("Starting Workflow: " + ctx.getName());
|
||||
|
||||
ctx.getLogger().info("call CleanUpActivity to do the clean up");
|
||||
ctx.callActivity(CleanUpActivity.class.getName()).await();
|
||||
ctx.getLogger().info("CleanUpActivity finished");
|
||||
|
||||
ctx.getLogger().info("wait 10 seconds for next clean up");
|
||||
ctx.createTimer(Duration.ofSeconds(10)).await();
|
||||
|
||||
// continue the workflow.
|
||||
ctx.continueAsNew(null);
|
||||
};
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
The `CleanUpActivity` class defines the logics for a single acitvity, in this case, it simulates a clean up work. See the code snippet below:
|
||||
```java
|
||||
public class CleanUpActivity implements WorkflowActivity {
|
||||
@Override
|
||||
public Object run(WorkflowActivityContext ctx) {
|
||||
Logger logger = LoggerFactory.getLogger(CleanUpActivity.class);
|
||||
logger.info("Starting Activity: " + ctx.getName());
|
||||
|
||||
logger.info("start clean up work, it may take few seconds to finish...");
|
||||
|
||||
//Sleeping for 5 seconds to simulate long running operation
|
||||
try {
|
||||
TimeUnit.SECONDS.sleep(5);
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
return "clean up finish.";
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
Once you start the workflow and client using the following commands:
|
||||
```sh
|
||||
dapr run --app-id demoworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.continueasnew.DemoContinueAsNewWorker
|
||||
```
|
||||
```sh
|
||||
java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.continueasnew.DemoContinueAsNewClient
|
||||
````
|
||||
|
||||
You will see the logs from worker showing the `CleanUpActivity` is invoked every 10 seconds after previous one is finished:
|
||||
```text
|
||||
== APP == 2023-11-07 14:44:42,004 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.continueasnew.DemoContinueAsNewWorkflow
|
||||
== APP == 2023-11-07 14:44:42,004 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - call CleanUpActivity to do the clean up
|
||||
== APP == 2023-11-07 14:44:42,009 {HH:mm:ss.SSS} [main] INFO i.d.e.w.c.CleanUpActivity - Starting Activity: io.dapr.examples.workflows.continueasnew.CleanUpActivity
|
||||
== APP == 2023-11-07 14:44:42,009 {HH:mm:ss.SSS} [main] INFO i.d.e.w.c.CleanUpActivity - start clean up work, it may take few seconds to finish...
|
||||
== APP == 2023-11-07 14:44:47,026 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.continueasnew.DemoContinueAsNewWorkflow
|
||||
== APP == 2023-11-07 14:44:47,026 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - call CleanUpActivity to do the clean up
|
||||
== APP == 2023-11-07 14:44:47,030 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - CleanUpActivity finished
|
||||
== APP == 2023-11-07 14:44:47,030 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - wait 10 seconds for next clean up
|
||||
== APP == 2023-11-07 14:44:47,033 {HH:mm:ss.SSS} [main] INFO i.d.e.w.c.CleanUpActivity - Starting Activity: io.dapr.examples.workflows.continueasnew.CleanUpActivity
|
||||
== APP == 2023-11-07 14:44:47,033 {HH:mm:ss.SSS} [main] INFO i.d.e.w.c.CleanUpActivity - start clean up work, it may take few seconds to finish...
|
||||
== APP == 2023-11-07 14:44:52,053 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - CleanUpActivity finished
|
||||
== APP == 2023-11-07 14:44:52,053 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - wait 10 seconds for next clean up
|
||||
== APP == 2023-11-07 14:44:57,006 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.continueasnew.DemoContinueAsNewWorkflow
|
||||
== APP == 2023-11-07 14:44:57,006 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - call CleanUpActivity to do the clean up
|
||||
== APP == 2023-11-07 14:44:57,012 {HH:mm:ss.SSS} [main] INFO i.d.e.w.c.CleanUpActivity - Starting Activity: io.dapr.examples.workflows.continueasnew.CleanUpActivity
|
||||
== APP == 2023-11-07 14:44:57,012 {HH:mm:ss.SSS} [main] INFO i.d.e.w.c.CleanUpActivity - start clean up work, it may take few seconds to finish...
|
||||
== APP == 2023-11-07 14:45:02,017 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.continueasnew.DemoContinueAsNewWorkflow
|
||||
== APP == 2023-11-07 14:45:02,020 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - call CleanUpActivity to do the clean up
|
||||
== APP == 2023-11-07 14:45:02,021 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - CleanUpActivity finished
|
||||
== APP == 2023-11-07 14:45:02,021 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - wait 10 seconds for next clean up
|
||||
...
|
||||
```
|
||||
|
||||
and the client:
|
||||
```text
|
||||
Started a new continue-as-new model workflow with instance ID: c853fb93-f0e7-4ad7-ad41-385732386f94
|
||||
```
|
||||
It will continue to run until you stop the worker.
|
||||
|
||||
### External Event Pattern
|
||||
In the external event pattern, a workflow is started by an external event. The workflow can then wait for other external events to occur before completing.
|
||||
|
||||
The `DemoExternalEventWorkflow` class defines the workflow. It waits for an external event `Approval` to run the corresponding activity. See the code snippet below:
|
||||
```java
|
||||
public class DemoExternalEventWorkflow extends Workflow {
|
||||
@Override
|
||||
public WorkflowStub create() {
|
||||
return ctx -> {
|
||||
ctx.getLogger().info("Starting Workflow: " + ctx.getName());
|
||||
|
||||
Boolean approved = ctx.waitForExternalEvent("Approval", boolean.class).await();
|
||||
if (approved) {
|
||||
ctx.getLogger().info("approval granted - do the approved action");
|
||||
ctx.callActivity(ApproveActivity.class.getName()).await();
|
||||
ctx.getLogger().info("approval-activity finished");
|
||||
} else {
|
||||
ctx.getLogger().info("approval denied - send a notification");
|
||||
ctx.callActivity(DenyActivity.class.getName()).await();
|
||||
ctx.getLogger().info("denied-activity finished");
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
In the `DemoExternalEventClient` class we send out Approval event to tell our workflow to run the approved activity.
|
||||
```java
|
||||
client.raiseEvent(instanceId, "Approval", true);
|
||||
```
|
||||
|
||||
Start the workflow and client using the following commands:
|
||||
|
||||
<!-- STEP
|
||||
name: Run Wait External Event Pattern workflow
|
||||
match_order: none
|
||||
output_match_mode: substring
|
||||
expected_stdout_lines:
|
||||
- 'Starting Workflow: io.dapr.examples.workflows.externalevent.DemoExternalEventWorkflow'
|
||||
- 'Waiting for approval...'
|
||||
- 'approval granted - do the approved action'
|
||||
- 'Starting Activity: io.dapr.examples.workflows.externalevent.ApproveActivity'
|
||||
- 'Running approval activity...'
|
||||
- 'approval-activity finished'
|
||||
background: true
|
||||
sleep: 60
|
||||
timeout_seconds: 60
|
||||
-->
|
||||
```sh
|
||||
dapr run --app-id demoworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.externalevent.DemoExternalEventWorker
|
||||
```
|
||||
|
||||
```sh
|
||||
java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.DemoWorkflowClient
|
||||
java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.externalevent.DemoExternalEventClient
|
||||
```
|
||||
<!-- END_STEP -->
|
||||
|
||||
The worker logs:
|
||||
```text
|
||||
== APP == 2023-11-07 16:01:23,279 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.externalevent.DemoExternalEventWorkflow
|
||||
== APP == 2023-11-07 16:01:23,279 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Waiting for approval...
|
||||
== APP == 2023-11-07 16:01:23,324 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - approval granted - do the approved action
|
||||
== APP == 2023-11-07 16:01:23,348 {HH:mm:ss.SSS} [main] INFO i.d.e.w.e.ApproveActivity - Starting Activity: io.dapr.examples.workflows.externalevent.ApproveActivity
|
||||
== APP == 2023-11-07 16:01:23,348 {HH:mm:ss.SSS} [main] INFO i.d.e.w.e.ApproveActivity - Running approval activity...
|
||||
== APP == 2023-11-07 16:01:28,410 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - approval-activity finished
|
||||
```
|
||||
|
||||
The client log:
|
||||
```text
|
||||
Started a new external-event model workflow with instance ID: 23410d96-1afe-4698-9fcd-c01c1e0db255
|
||||
workflow instance with ID: 23410d96-1afe-4698-9fcd-c01c1e0db255 completed.
|
||||
```
|
||||
|
||||
### Sub-workflow Pattern
|
||||
The sub-workflow pattern allows you to call a workflow from another workflow.
|
||||
|
||||
The `DemoWorkflow` class defines the workflow. It calls a sub-workflow `DemoSubWorkflow` to do the work. See the code snippet below:
|
||||
```java
|
||||
public class DemoWorkflow extends Workflow {
|
||||
@Override
|
||||
public WorkflowStub create() {
|
||||
return ctx -> {
|
||||
ctx.getLogger().info("Starting Workflow: " + ctx.getName());
|
||||
|
||||
var subWorkflowInput = "Hello Dapr Workflow!";
|
||||
ctx.getLogger().info("calling subworkflow with input: " + subWorkflowInput);
|
||||
|
||||
var subWorkflowOutput =
|
||||
ctx.callSubWorkflow(DemoSubWorkflow.class.getName(), subWorkflowInput, String.class).await();
|
||||
|
||||
ctx.getLogger().info("subworkflow finished with: " + subWorkflowOutput);
|
||||
};
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
The `DemoSubWorkflow` class defines the sub-workflow. It call the activity to do the work and returns the result. See the code snippet below:
|
||||
```java
|
||||
public class DemoSubWorkflow extends Workflow {
|
||||
@Override
|
||||
public WorkflowStub create() {
|
||||
return ctx -> {
|
||||
ctx.getLogger().info("Starting SubWorkflow: " + ctx.getName());
|
||||
|
||||
var subWorkflowInput = ctx.getInput(String.class);
|
||||
ctx.getLogger().info("SubWorkflow received input: " + subWorkflowInput);
|
||||
|
||||
ctx.getLogger().info("SubWorkflow is calling Activity: " + ReverseActivity.class.getName());
|
||||
String result = ctx.callActivity(ReverseActivity.class.getName(), subWorkflowInput, String.class).await();
|
||||
|
||||
ctx.getLogger().info("SubWorkflow finished with: " + result);
|
||||
ctx.complete(result);
|
||||
};
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
The `ReverseActivity` class defines the logics for a single acitvity, in this case, it reverses a string. See the code snippet below:
|
||||
```java
|
||||
public class ReverseActivity implements WorkflowActivity {
|
||||
@Override
|
||||
public Object run(WorkflowActivityContext ctx) {
|
||||
Logger logger = LoggerFactory.getLogger(ReverseActivity.class);
|
||||
logger.info("Starting Activity: " + ctx.getName());
|
||||
|
||||
var message = ctx.getInput(String.class);
|
||||
var newMessage = new StringBuilder(message).reverse().toString();
|
||||
|
||||
logger.info("Message Received from input: " + message);
|
||||
logger.info("Sending message to output: " + newMessage);
|
||||
|
||||
logger.info("Activity returned: " + newMessage);
|
||||
logger.info("Activity finished");
|
||||
|
||||
return newMessage;
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
Start the workflow and client using the following commands:
|
||||
|
||||
<!-- STEP
|
||||
name: Run Sub-workflow Pattern workflow
|
||||
match_order: none
|
||||
output_match_mode: substring
|
||||
expected_stdout_lines:
|
||||
- 'calling subworkflow with input: Hello Dapr Workflow!'
|
||||
- 'SubWorkflow finished with: !wolfkroW rpaD olleH'
|
||||
background: true
|
||||
sleep: 60
|
||||
timeout_seconds: 60
|
||||
-->
|
||||
```sh
|
||||
dapr run --app-id demoworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.subworkflow.DemoSubWorkflowWorker
|
||||
```
|
||||
|
||||
```sh
|
||||
java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.subworkflow.DemoSubWorkerflowClient
|
||||
```
|
||||
<!-- END_STEP -->
|
||||
|
||||
The log from worker:
|
||||
```text
|
||||
== APP == 2023-11-07 20:08:52,521 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.subworkflow.DemoWorkflow
|
||||
== APP == 2023-11-07 20:08:52,523 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - calling subworkflow with input: Hello Dapr Workflow!
|
||||
== APP == 2023-11-07 20:08:52,561 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting SubWorkflow: io.dapr.examples.workflows.subworkflow.DemoSubWorkflow
|
||||
== APP == 2023-11-07 20:08:52,566 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - SubWorkflow received input: Hello Dapr Workflow!
|
||||
== APP == 2023-11-07 20:08:52,566 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - SubWorkflow is calling Activity: io.dapr.examples.workflows.subworkflow.ReverseActivity
|
||||
== APP == 2023-11-07 20:08:52,576 {HH:mm:ss.SSS} [main] INFO i.d.e.w.subworkflow.ReverseActivity - Starting Activity: io.dapr.examples.workflows.subworkflow.ReverseActivity
|
||||
== APP == 2023-11-07 20:08:52,577 {HH:mm:ss.SSS} [main] INFO i.d.e.w.subworkflow.ReverseActivity - Message Received from input: Hello Dapr Workflow!
|
||||
== APP == 2023-11-07 20:08:52,577 {HH:mm:ss.SSS} [main] INFO i.d.e.w.subworkflow.ReverseActivity - Sending message to output: !wolfkroW rpaD olleH
|
||||
== APP == 2023-11-07 20:08:52,596 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - SubWorkflow finished with: !wolfkroW rpaD olleH
|
||||
== APP == 2023-11-07 20:08:52,611 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - subworkflow finished with: !wolfkroW rpaD olleH
|
||||
```
|
||||
|
||||
The log from client:
|
||||
```text
|
||||
Started a new sub-workflow model workflow with instance ID: c2fb9c83-435b-4b55-bdf1-833b39366cfb
|
||||
workflow instance with ID: c2fb9c83-435b-4b55-bdf1-833b39366cfb completed with result: !wolfkroW rpaD olleH
|
||||
```
|
|
@ -0,0 +1,44 @@
|
|||
/*
|
||||
* Copyright 2023 The Dapr Authors
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package io.dapr.examples.workflows.chain;
|
||||
|
||||
import io.dapr.workflows.client.DaprWorkflowClient;
|
||||
import io.dapr.workflows.client.WorkflowInstanceStatus;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
public class DemoChainClient {
|
||||
/**
|
||||
* The main method to start the client.
|
||||
*
|
||||
* @param args Input arguments (unused).
|
||||
* @throws InterruptedException If program has been interrupted.
|
||||
*/
|
||||
public static void main(String[] args) {
|
||||
try (DaprWorkflowClient client = new DaprWorkflowClient()) {
|
||||
String instanceId = client.scheduleNewWorkflow(DemoChainWorkflow.class);
|
||||
System.out.printf("Started a new chaining model workflow with instance ID: %s%n", instanceId);
|
||||
WorkflowInstanceStatus workflowInstanceStatus =
|
||||
client.waitForInstanceCompletion(instanceId, null, true);
|
||||
|
||||
String result = workflowInstanceStatus.readOutputAs(String.class);
|
||||
System.out.printf("workflow instance with ID: %s completed with result: %s%n", instanceId, result);
|
||||
|
||||
|
||||
} catch (TimeoutException | InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,43 +1,37 @@
|
|||
/*
|
||||
* Copyright 2023 The Dapr Authors
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package io.dapr.examples.workflows;
|
||||
|
||||
import io.dapr.workflows.runtime.WorkflowRuntime;
|
||||
import io.dapr.workflows.runtime.WorkflowRuntimeBuilder;
|
||||
|
||||
/**
|
||||
* For setup instructions, see the README.
|
||||
*/
|
||||
public class DemoWorkflowWorker {
|
||||
|
||||
/**
|
||||
* The main method of this app.
|
||||
*
|
||||
* @param args The port the app will listen on.
|
||||
* @throws Exception An Exception.
|
||||
*/
|
||||
public static void main(String[] args) throws Exception {
|
||||
// Register the Workflow with the builder.
|
||||
WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(DemoWorkflow.class);
|
||||
builder.registerActivity(DemoWorkflowActivity.class);
|
||||
|
||||
// Build and then start the workflow runtime pulling and executing tasks
|
||||
try (WorkflowRuntime runtime = builder.build()) {
|
||||
System.out.println("Start workflow runtime");
|
||||
runtime.start();
|
||||
}
|
||||
|
||||
System.exit(0);
|
||||
}
|
||||
}
|
||||
/*
|
||||
* Copyright 2023 The Dapr Authors
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package io.dapr.examples.workflows.chain;
|
||||
|
||||
import io.dapr.workflows.runtime.WorkflowRuntime;
|
||||
import io.dapr.workflows.runtime.WorkflowRuntimeBuilder;
|
||||
|
||||
public class DemoChainWorker {
|
||||
/**
|
||||
* The main method of this app.
|
||||
*
|
||||
* @param args The port the app will listen on.
|
||||
* @throws Exception An Exception.
|
||||
*/
|
||||
public static void main(String[] args) throws Exception {
|
||||
// Register the Workflow with the builder.
|
||||
WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(DemoChainWorkflow.class);
|
||||
builder.registerActivity(ToUpperCaseActivity.class);
|
||||
|
||||
// Build and then start the workflow runtime pulling and executing tasks
|
||||
try (WorkflowRuntime runtime = builder.build()) {
|
||||
System.out.println("Start workflow runtime");
|
||||
runtime.start();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,34 @@
|
|||
/*
|
||||
* Copyright 2023 The Dapr Authors
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package io.dapr.examples.workflows.chain;
|
||||
|
||||
import io.dapr.workflows.Workflow;
|
||||
import io.dapr.workflows.WorkflowStub;
|
||||
|
||||
public class DemoChainWorkflow extends Workflow {
|
||||
@Override
|
||||
public WorkflowStub create() {
|
||||
return ctx -> {
|
||||
ctx.getLogger().info("Starting Workflow: " + ctx.getName());
|
||||
|
||||
String result = "";
|
||||
result += ctx.callActivity(ToUpperCaseActivity.class.getName(), "Tokyo", String.class).await() + ", ";
|
||||
result += ctx.callActivity(ToUpperCaseActivity.class.getName(), "London", String.class).await() + ", ";
|
||||
result += ctx.callActivity(ToUpperCaseActivity.class.getName(), "Seattle", String.class).await();
|
||||
|
||||
ctx.getLogger().info("Workflow finished with result: " + result);
|
||||
ctx.complete(result);
|
||||
};
|
||||
}
|
||||
}
|
|
@ -0,0 +1,37 @@
|
|||
/*
|
||||
* Copyright 2023 The Dapr Authors
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package io.dapr.examples.workflows.chain;
|
||||
|
||||
import io.dapr.workflows.runtime.WorkflowActivity;
|
||||
import io.dapr.workflows.runtime.WorkflowActivityContext;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
||||
public class ToUpperCaseActivity implements WorkflowActivity {
|
||||
|
||||
@Override
|
||||
public Object run(WorkflowActivityContext ctx) {
|
||||
Logger logger = LoggerFactory.getLogger(ToUpperCaseActivity.class);
|
||||
logger.info("Starting Activity: " + ctx.getName());
|
||||
|
||||
var message = ctx.getInput(String.class);
|
||||
var newMessage = message.toUpperCase();
|
||||
|
||||
logger.info("Message Received from input: " + message);
|
||||
logger.info("Sending message to output: " + newMessage);
|
||||
|
||||
return newMessage;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,39 @@
|
|||
/*
|
||||
* Copyright 2023 The Dapr Authors
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package io.dapr.examples.workflows.continueasnew;
|
||||
|
||||
import io.dapr.workflows.runtime.WorkflowActivity;
|
||||
import io.dapr.workflows.runtime.WorkflowActivityContext;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class CleanUpActivity implements WorkflowActivity {
|
||||
@Override
|
||||
public Object run(WorkflowActivityContext ctx) {
|
||||
Logger logger = LoggerFactory.getLogger(CleanUpActivity.class);
|
||||
logger.info("Starting Activity: " + ctx.getName());
|
||||
|
||||
logger.info("start clean up work, it may take few seconds to finish...");
|
||||
|
||||
//Sleeping for 5 seconds to simulate long running operation
|
||||
try {
|
||||
TimeUnit.SECONDS.sleep(5);
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
return "clean up finish.";
|
||||
}
|
||||
}
|
|
@ -0,0 +1,38 @@
|
|||
/*
|
||||
* Copyright 2023 The Dapr Authors
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package io.dapr.examples.workflows.continueasnew;
|
||||
|
||||
import io.dapr.workflows.client.DaprWorkflowClient;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
public class DemoContinueAsNewClient {
|
||||
/**
|
||||
* The main method to start the client.
|
||||
*
|
||||
* @param args Input arguments (unused).
|
||||
* @throws InterruptedException If program has been interrupted.
|
||||
*/
|
||||
public static void main(String[] args) {
|
||||
try (DaprWorkflowClient client = new DaprWorkflowClient()) {
|
||||
String instanceId = client.scheduleNewWorkflow(DemoContinueAsNewWorkflow.class);
|
||||
System.out.printf("Started a new continue-as-new model workflow with instance ID: %s%n", instanceId);
|
||||
|
||||
client.waitForInstanceCompletion(instanceId, null, true);
|
||||
System.out.printf("workflow instance with ID: %s completed.", instanceId);
|
||||
|
||||
} catch (TimeoutException | InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,37 @@
|
|||
/*
|
||||
* Copyright 2023 The Dapr Authors
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package io.dapr.examples.workflows.continueasnew;
|
||||
|
||||
import io.dapr.workflows.runtime.WorkflowRuntime;
|
||||
import io.dapr.workflows.runtime.WorkflowRuntimeBuilder;
|
||||
|
||||
public class DemoContinueAsNewWorker {
|
||||
/**
|
||||
* The main method of this app.
|
||||
*
|
||||
* @param args The port the app will listen on.
|
||||
* @throws Exception An Exception.
|
||||
*/
|
||||
public static void main(String[] args) throws Exception {
|
||||
// Register the Workflow with the builder.
|
||||
WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(DemoContinueAsNewWorkflow.class);
|
||||
builder.registerActivity(CleanUpActivity.class);
|
||||
|
||||
// Build and then start the workflow runtime pulling and executing tasks
|
||||
try (WorkflowRuntime runtime = builder.build()) {
|
||||
System.out.println("Start workflow runtime");
|
||||
runtime.start();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,45 @@
|
|||
/*
|
||||
* Copyright 2023 The Dapr Authors
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package io.dapr.examples.workflows.continueasnew;
|
||||
|
||||
import io.dapr.workflows.Workflow;
|
||||
import io.dapr.workflows.WorkflowStub;
|
||||
|
||||
import java.time.Duration;
|
||||
|
||||
public class DemoContinueAsNewWorkflow extends Workflow {
|
||||
/*
|
||||
Compared with a CRON schedule, this periodic workflow example will never overlap.
|
||||
For example, a CRON schedule that executes a cleanup every hour will execute it at 1:00, 2:00, 3:00 etc.
|
||||
and could potentially run into overlap issues if the cleanup takes longer than an hour.
|
||||
In this example, however, if the cleanup takes 30 minutes, and we create a timer for 1 hour between cleanups,
|
||||
then it will be scheduled at 1:00, 2:30, 4:00, etc. and there is no chance of overlap.
|
||||
*/
|
||||
@Override
|
||||
public WorkflowStub create() {
|
||||
return ctx -> {
|
||||
ctx.getLogger().info("Starting Workflow: " + ctx.getName());
|
||||
|
||||
ctx.getLogger().info("call CleanUpActivity to do the clean up");
|
||||
ctx.callActivity(CleanUpActivity.class.getName()).await();
|
||||
ctx.getLogger().info("CleanUpActivity finished");
|
||||
|
||||
ctx.getLogger().info("wait 10 seconds for next clean up");
|
||||
ctx.createTimer(Duration.ofSeconds(10)).await();
|
||||
|
||||
// continue the workflow.
|
||||
ctx.continueAsNew(null);
|
||||
};
|
||||
}
|
||||
}
|
|
@ -0,0 +1,39 @@
|
|||
/*
|
||||
* Copyright 2023 The Dapr Authors
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package io.dapr.examples.workflows.externalevent;
|
||||
|
||||
import io.dapr.workflows.runtime.WorkflowActivity;
|
||||
import io.dapr.workflows.runtime.WorkflowActivityContext;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class ApproveActivity implements WorkflowActivity {
|
||||
@Override
|
||||
public Object run(WorkflowActivityContext ctx) {
|
||||
Logger logger = LoggerFactory.getLogger(ApproveActivity.class);
|
||||
logger.info("Starting Activity: " + ctx.getName());
|
||||
|
||||
logger.info("Running approval activity...");
|
||||
//Sleeping for 5 seconds to simulate long running operation
|
||||
try {
|
||||
TimeUnit.SECONDS.sleep(5);
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,41 @@
|
|||
/*
|
||||
* Copyright 2023 The Dapr Authors
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package io.dapr.examples.workflows.externalevent;
|
||||
|
||||
import io.dapr.workflows.client.DaprWorkflowClient;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
public class DemoExternalEventClient {
|
||||
/**
|
||||
* The main method to start the client.
|
||||
*
|
||||
* @param args Input arguments (unused).
|
||||
* @throws InterruptedException If program has been interrupted.
|
||||
*/
|
||||
public static void main(String[] args) {
|
||||
try (DaprWorkflowClient client = new DaprWorkflowClient()) {
|
||||
String instanceId = client.scheduleNewWorkflow(DemoExternalEventWorkflow.class);
|
||||
System.out.printf("Started a new external-event workflow with instance ID: %s%n", instanceId);
|
||||
|
||||
client.raiseEvent(instanceId, "Approval", true);
|
||||
//client.raiseEvent(instanceId, "Approval", false);
|
||||
|
||||
client.waitForInstanceCompletion(instanceId, null, true);
|
||||
System.out.printf("workflow instance with ID: %s completed.", instanceId);
|
||||
|
||||
} catch (TimeoutException | InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,38 @@
|
|||
/*
|
||||
* Copyright 2023 The Dapr Authors
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package io.dapr.examples.workflows.externalevent;
|
||||
|
||||
import io.dapr.workflows.runtime.WorkflowRuntime;
|
||||
import io.dapr.workflows.runtime.WorkflowRuntimeBuilder;
|
||||
|
||||
public class DemoExternalEventWorker {
|
||||
/**
|
||||
* The main method of this app.
|
||||
*
|
||||
* @param args The port the app will listen on.
|
||||
* @throws Exception An Exception.
|
||||
*/
|
||||
public static void main(String[] args) throws Exception {
|
||||
// Register the Workflow with the builder.
|
||||
WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(DemoExternalEventWorkflow.class);
|
||||
builder.registerActivity(ApproveActivity.class);
|
||||
builder.registerActivity(DenyActivity.class);
|
||||
|
||||
// Build and then start the workflow runtime pulling and executing tasks
|
||||
try (WorkflowRuntime runtime = builder.build()) {
|
||||
System.out.println("Start workflow runtime");
|
||||
runtime.start();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,38 @@
|
|||
/*
|
||||
* Copyright 2023 The Dapr Authors
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package io.dapr.examples.workflows.externalevent;
|
||||
|
||||
import io.dapr.workflows.Workflow;
|
||||
import io.dapr.workflows.WorkflowStub;
|
||||
|
||||
public class DemoExternalEventWorkflow extends Workflow {
|
||||
@Override
|
||||
public WorkflowStub create() {
|
||||
return ctx -> {
|
||||
ctx.getLogger().info("Starting Workflow: " + ctx.getName());
|
||||
|
||||
ctx.getLogger().info("Waiting for approval...");
|
||||
Boolean approved = ctx.waitForExternalEvent("Approval", boolean.class).await();
|
||||
if (approved) {
|
||||
ctx.getLogger().info("approval granted - do the approved action");
|
||||
ctx.callActivity(ApproveActivity.class.getName()).await();
|
||||
ctx.getLogger().info("approval-activity finished");
|
||||
} else {
|
||||
ctx.getLogger().info("approval denied - send a notification");
|
||||
ctx.callActivity(DenyActivity.class.getName()).await();
|
||||
ctx.getLogger().info("denied-activity finished");
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
|
@ -11,9 +11,8 @@
|
|||
limitations under the License.
|
||||
*/
|
||||
|
||||
package io.dapr.examples.workflows;
|
||||
package io.dapr.examples.workflows.externalevent;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonAutoDetect;
|
||||
import io.dapr.workflows.runtime.WorkflowActivity;
|
||||
import io.dapr.workflows.runtime.WorkflowActivityContext;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -21,33 +20,20 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY)
|
||||
public class DemoWorkflowActivity implements WorkflowActivity {
|
||||
|
||||
public class DenyActivity implements WorkflowActivity {
|
||||
@Override
|
||||
public DemoActivityOutput run(WorkflowActivityContext ctx) {
|
||||
Logger logger = LoggerFactory.getLogger(DemoWorkflowActivity.class);
|
||||
public Object run(WorkflowActivityContext ctx) {
|
||||
Logger logger = LoggerFactory.getLogger(DenyActivity.class);
|
||||
logger.info("Starting Activity: " + ctx.getName());
|
||||
|
||||
var message = ctx.getInput(DemoActivityInput.class).getMessage();
|
||||
var newMessage = message + " World!, from Activity";
|
||||
logger.info("Message Received from input: " + message);
|
||||
logger.info("Sending message to output: " + newMessage);
|
||||
|
||||
logger.info("Sleeping for 5 seconds to simulate long running operation...");
|
||||
|
||||
logger.info("Running denied activity...");
|
||||
//Sleeping for 5 seconds to simulate long running operation
|
||||
try {
|
||||
TimeUnit.SECONDS.sleep(5);
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
|
||||
logger.info("Activity finished");
|
||||
|
||||
var output = new DemoActivityOutput(message, newMessage);
|
||||
logger.info("Activity returned: " + output);
|
||||
|
||||
return output;
|
||||
return null;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,38 @@
|
|||
/*
|
||||
* Copyright 2023 The Dapr Authors
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package io.dapr.examples.workflows.faninout;
|
||||
|
||||
import io.dapr.workflows.runtime.WorkflowActivity;
|
||||
import io.dapr.workflows.runtime.WorkflowActivityContext;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.StringTokenizer;
|
||||
|
||||
public class CountWordsActivity implements WorkflowActivity {
|
||||
@Override
|
||||
public Object run(WorkflowActivityContext ctx) {
|
||||
Logger logger = LoggerFactory.getLogger(CountWordsActivity.class);
|
||||
logger.info("Starting Activity: " + ctx.getName());
|
||||
|
||||
String input = ctx.getInput(String.class);
|
||||
StringTokenizer tokenizer = new StringTokenizer(input);
|
||||
int result = tokenizer.countTokens();
|
||||
|
||||
logger.info("Activity returned: " + result);
|
||||
logger.info("Activity finished");
|
||||
|
||||
return result;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,58 @@
|
|||
/*
|
||||
* Copyright 2023 The Dapr Authors
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package io.dapr.examples.workflows.faninout;
|
||||
|
||||
import io.dapr.workflows.client.DaprWorkflowClient;
|
||||
import io.dapr.workflows.client.WorkflowInstanceStatus;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
public class DemoFanInOutClient {
|
||||
/**
|
||||
* The main method to start the client.
|
||||
*
|
||||
* @param args Input arguments (unused).
|
||||
* @throws InterruptedException If program has been interrupted.
|
||||
*/
|
||||
public static void main(String[] args) throws InterruptedException {
|
||||
try (DaprWorkflowClient client = new DaprWorkflowClient()) {
|
||||
// The input is an arbitrary list of strings.
|
||||
List<String> listOfStrings = Arrays.asList(
|
||||
"Hello, world!",
|
||||
"The quick brown fox jumps over the lazy dog.",
|
||||
"If a tree falls in the forest and there is no one there to hear it, does it make a sound?",
|
||||
"The greatest glory in living lies not in never falling, but in rising every time we fall.",
|
||||
"Always remember that you are absolutely unique. Just like everyone else.");
|
||||
|
||||
// Schedule an orchestration which will reliably count the number of words in all the given sentences.
|
||||
String instanceId = client.scheduleNewWorkflow(
|
||||
DemoFanInOutWorkflow.class,
|
||||
listOfStrings);
|
||||
System.out.printf("Started a new fan out/fan in model workflow with instance ID: %s%n", instanceId);
|
||||
|
||||
// Block until the orchestration completes. Then print the final status, which includes the output.
|
||||
WorkflowInstanceStatus workflowInstanceStatus = client.waitForInstanceCompletion(
|
||||
instanceId,
|
||||
Duration.ofSeconds(30),
|
||||
true);
|
||||
System.out.printf("workflow instance with ID: %s completed with result: %s%n", instanceId,
|
||||
workflowInstanceStatus.readOutputAs(int.class));
|
||||
} catch (TimeoutException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,37 @@
|
|||
/*
|
||||
* Copyright 2023 The Dapr Authors
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package io.dapr.examples.workflows.faninout;
|
||||
|
||||
import io.dapr.workflows.runtime.WorkflowRuntime;
|
||||
import io.dapr.workflows.runtime.WorkflowRuntimeBuilder;
|
||||
|
||||
public class DemoFanInOutWorker {
|
||||
/**
|
||||
* The main method of this app.
|
||||
*
|
||||
* @param args The port the app will listen on.
|
||||
* @throws Exception An Exception.
|
||||
*/
|
||||
public static void main(String[] args) throws Exception {
|
||||
// Register the Workflow with the builder.
|
||||
WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(DemoFanInOutWorkflow.class);
|
||||
builder.registerActivity(CountWordsActivity.class);
|
||||
|
||||
// Build and then start the workflow runtime pulling and executing tasks
|
||||
try (WorkflowRuntime runtime = builder.build()) {
|
||||
System.out.println("Start workflow runtime");
|
||||
runtime.start();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,49 @@
|
|||
/*
|
||||
* Copyright 2023 The Dapr Authors
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package io.dapr.examples.workflows.faninout;
|
||||
|
||||
import com.microsoft.durabletask.Task;
|
||||
import io.dapr.workflows.Workflow;
|
||||
import io.dapr.workflows.WorkflowStub;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class DemoFanInOutWorkflow extends Workflow {
|
||||
@Override
|
||||
public WorkflowStub create() {
|
||||
return ctx -> {
|
||||
|
||||
ctx.getLogger().info("Starting Workflow: " + ctx.getName());
|
||||
|
||||
|
||||
// The input is a list of objects that need to be operated on.
|
||||
// In this example, inputs are expected to be strings.
|
||||
List<?> inputs = ctx.getInput(List.class);
|
||||
|
||||
// Fan-out to multiple concurrent activity invocations, each of which does a word count.
|
||||
List<Task<Integer>> tasks = inputs.stream()
|
||||
.map(input -> ctx.callActivity(CountWordsActivity.class.getName(), input.toString(), Integer.class))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
// Fan-in to get the total word count from all the individual activity results.
|
||||
List<Integer> allWordCountResults = ctx.allOf(tasks).await();
|
||||
int totalWordCount = allWordCountResults.stream().mapToInt(Integer::intValue).sum();
|
||||
|
||||
ctx.getLogger().info("Workflow finished with result: " + totalWordCount);
|
||||
// Save the final result as the orchestration output.
|
||||
ctx.complete(totalWordCount);
|
||||
};
|
||||
}
|
||||
}
|
|
@ -0,0 +1,42 @@
|
|||
/*
|
||||
* Copyright 2023 The Dapr Authors
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package io.dapr.examples.workflows.subworkflow;
|
||||
|
||||
import io.dapr.workflows.client.DaprWorkflowClient;
|
||||
import io.dapr.workflows.client.WorkflowInstanceStatus;
|
||||
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
public class DemoSubWorkerflowClient {
|
||||
/**
|
||||
* The main method to start the client.
|
||||
*
|
||||
* @param args Input arguments (unused).
|
||||
* @throws InterruptedException If program has been interrupted.
|
||||
*/
|
||||
public static void main(String[] args) {
|
||||
try (DaprWorkflowClient client = new DaprWorkflowClient()) {
|
||||
String instanceId = client.scheduleNewWorkflow(DemoWorkflow.class);
|
||||
System.out.printf("Started a new sub-workflow model workflow with instance ID: %s%n", instanceId);
|
||||
WorkflowInstanceStatus workflowInstanceStatus =
|
||||
client.waitForInstanceCompletion(instanceId, null, true);
|
||||
|
||||
String result = workflowInstanceStatus.readOutputAs(String.class);
|
||||
System.out.printf("workflow instance with ID: %s completed with result: %s%n", instanceId, result);
|
||||
|
||||
} catch (TimeoutException | InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -11,29 +11,25 @@
|
|||
limitations under the License.
|
||||
*/
|
||||
|
||||
package io.dapr.examples.workflows;
|
||||
package io.dapr.examples.workflows.subworkflow;
|
||||
|
||||
import io.dapr.workflows.Workflow;
|
||||
import io.dapr.workflows.WorkflowStub;
|
||||
|
||||
/**
|
||||
* Implementation of the DemoWorkflow for the server side.
|
||||
*/
|
||||
public class DemoSubWorkflow extends Workflow {
|
||||
@Override
|
||||
public WorkflowStub create() {
|
||||
return ctx -> {
|
||||
ctx.getLogger().info("Starting SubWorkflow: " + ctx.getName());
|
||||
|
||||
var logger = ctx.getLogger();
|
||||
logger.info("Child-Workflow> Started: " + ctx.getName());
|
||||
logger.info("Child-Workflow> Instance ID: " + ctx.getInstanceId());
|
||||
logger.info("Child-Workflow> Current Time: " + ctx.getCurrentInstant());
|
||||
var subWorkflowInput = ctx.getInput(String.class);
|
||||
ctx.getLogger().info("SubWorkflow received input: " + subWorkflowInput);
|
||||
|
||||
var input = ctx.getInput(String.class);
|
||||
logger.info("Child-Workflow> Input: " + input);
|
||||
ctx.getLogger().info("SubWorkflow is calling Activity: " + ReverseActivity.class.getName());
|
||||
String result = ctx.callActivity(ReverseActivity.class.getName(), subWorkflowInput, String.class).await();
|
||||
|
||||
logger.info("Child-Workflow> Completed");
|
||||
ctx.complete("result: " + input);
|
||||
ctx.getLogger().info("SubWorkflow finished with: " + result);
|
||||
ctx.complete(result);
|
||||
};
|
||||
}
|
||||
}
|
|
@ -0,0 +1,39 @@
|
|||
/*
|
||||
* Copyright 2023 The Dapr Authors
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package io.dapr.examples.workflows.subworkflow;
|
||||
|
||||
import io.dapr.workflows.runtime.WorkflowRuntime;
|
||||
import io.dapr.workflows.runtime.WorkflowRuntimeBuilder;
|
||||
|
||||
public class DemoSubWorkflowWorker {
|
||||
/**
|
||||
* The main method of this app.
|
||||
*
|
||||
* @param args The port the app will listen on.
|
||||
* @throws Exception An Exception.
|
||||
*/
|
||||
public static void main(String[] args) throws Exception {
|
||||
// Register the Workflow with the builder.
|
||||
WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder()
|
||||
.registerWorkflow(DemoWorkflow.class)
|
||||
.registerWorkflow(DemoSubWorkflow.class);
|
||||
builder.registerActivity(ReverseActivity.class);
|
||||
|
||||
// Build and then start the workflow runtime pulling and executing tasks
|
||||
try (WorkflowRuntime runtime = builder.build()) {
|
||||
System.out.println("Start workflow runtime");
|
||||
runtime.start();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,35 @@
|
|||
/*
|
||||
* Copyright 2023 The Dapr Authors
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package io.dapr.examples.workflows.subworkflow;
|
||||
|
||||
import io.dapr.workflows.Workflow;
|
||||
import io.dapr.workflows.WorkflowStub;
|
||||
|
||||
public class DemoWorkflow extends Workflow {
|
||||
@Override
|
||||
public WorkflowStub create() {
|
||||
return ctx -> {
|
||||
ctx.getLogger().info("Starting Workflow: " + ctx.getName());
|
||||
|
||||
var subWorkflowInput = "Hello Dapr Workflow!";
|
||||
ctx.getLogger().info("calling subworkflow with input: " + subWorkflowInput);
|
||||
|
||||
var subWorkflowOutput =
|
||||
ctx.callSubWorkflow(DemoSubWorkflow.class.getName(), subWorkflowInput, String.class).await();
|
||||
|
||||
ctx.getLogger().info("subworkflow finished with: " + subWorkflowOutput);
|
||||
ctx.complete(subWorkflowOutput);
|
||||
};
|
||||
}
|
||||
}
|
|
@ -0,0 +1,36 @@
|
|||
/*
|
||||
* Copyright 2023 The Dapr Authors
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package io.dapr.examples.workflows.subworkflow;
|
||||
|
||||
import io.dapr.examples.workflows.chain.ToUpperCaseActivity;
|
||||
import io.dapr.workflows.runtime.WorkflowActivity;
|
||||
import io.dapr.workflows.runtime.WorkflowActivityContext;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class ReverseActivity implements WorkflowActivity {
|
||||
@Override
|
||||
public Object run(WorkflowActivityContext ctx) {
|
||||
Logger logger = LoggerFactory.getLogger(ReverseActivity.class);
|
||||
logger.info("Starting Activity: " + ctx.getName());
|
||||
|
||||
var message = ctx.getInput(String.class);
|
||||
var newMessage = new StringBuilder(message).reverse().toString();
|
||||
|
||||
logger.info("Message Received from input: " + message);
|
||||
logger.info("Sending message to output: " + newMessage);
|
||||
|
||||
return newMessage;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue