diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index c094f92fa..06c911702 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -54,7 +54,48 @@ This section describes the guidelines for contributing code / docs to Dapr. ### Things to consider when adding new API to SDK 1. All the new API's go under [dapr-sdk maven package](https://github.com/dapr/java-sdk/tree/master/sdk) -2. Make sure there is an example talking about how to use the API along with a README. [Example](https://github.com/dapr/java-sdk/pull/1235/files#diff-69ed756c4c01fd5fa884aac030dccb8f3f4d4fefa0dc330862d55a6f87b34a14) +2. Make sure there is an example talking about how to use the API along with a README with mechanical markdown. [Example](https://github.com/dapr/java-sdk/pull/1235/files#diff-69ed756c4c01fd5fa884aac030dccb8f3f4d4fefa0dc330862d55a6f87b34a14) + +#### Mechanical Markdown + +Mechanical markdown is used to validate example outputs in our CI pipeline. It ensures that the expected output in README files matches the actual output when running the examples. This helps maintain example output, catches any unintended changes in example behavior, and regressions. + +To test mechanical markdown locally: + +1. Install the package: +```bash +pip3 install mechanical-markdown +``` + +2. Run the test from the respective examples README directory, for example: +```bash +cd examples +mm.py ./src/main/java/io/dapr/examples/workflows/README.md +``` + +The test will: +- Parse the STEP markers in the README +- Execute the commands specified in the markers +- Compare the actual output with the expected output +- Report any mismatches + +When writing STEP markers: +- Use `output_match_mode: substring` for flexible matching +- Quote strings containing special YAML characters (like `:`, `*`, `'`) +- Set appropriate timeouts for long-running examples + +Example STEP marker: +```yaml + +``` ### Pull Requests diff --git a/examples/src/main/java/io/dapr/examples/workflows/README.md b/examples/src/main/java/io/dapr/examples/workflows/README.md index 047f1d744..b90726080 100644 --- a/examples/src/main/java/io/dapr/examples/workflows/README.md +++ b/examples/src/main/java/io/dapr/examples/workflows/README.md @@ -51,7 +51,8 @@ Those examples contain the following workflow patterns: 2. [Fan-out/Fan-in Pattern](#fan-outfan-in-pattern) 3. [Continue As New Pattern](#continue-as-new-pattern) 4. [External Event Pattern](#external-event-pattern) -5. [child-workflow Pattern](#child-workflow-pattern) +5. [Child-workflow Pattern](#child-workflow-pattern) +6. [Compensation Pattern](#compensation-pattern) ### Chaining Pattern In the chaining pattern, a sequence of activities executes in a specific order. @@ -353,7 +354,7 @@ dapr run --app-id demoworkflowworker --resources-path ./components/workflows -- ``` ```sh java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.continueasnew.DemoContinueAsNewClient -```` +``` You will see the logs from worker showing the `CleanUpActivity` is invoked every 10 seconds after previous one is finished: ```text @@ -419,7 +420,6 @@ client.raiseEvent(instanceId, "Approval", true); Start the workflow and client using the following commands: -ex ```sh dapr run --app-id demoworkflowworker --resources-path ./components/workflows -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.externalevent.DemoExternalEventWorker ``` @@ -444,7 +444,7 @@ Started a new external-event model workflow with instance ID: 23410d96-1afe-4698 workflow instance with ID: 23410d96-1afe-4698-9fcd-c01c1e0db255 completed. ``` -### child-workflow Pattern +### Child-workflow Pattern The child-workflow pattern allows you to call a workflow from another workflow. The `DemoWorkflow` class defines the workflow. It calls a child-workflow `DemoChildWorkflow` to do the work. See the code snippet below: @@ -540,3 +540,171 @@ The log from client: Started a new child-workflow model workflow with instance ID: c2fb9c83-435b-4b55-bdf1-833b39366cfb workflow instance with ID: c2fb9c83-435b-4b55-bdf1-833b39366cfb completed with result: !wolfkroW rpaD olleH ``` + +### Compensation Pattern +The compensation pattern is used to "undo" or "roll back" previously completed steps if a later step fails. This pattern is particularly useful in scenarios where you need to ensure that all resources are properly cleaned up even if the process fails. + +The example simulates a trip booking workflow that books a flight, hotel, and car. If any step fails, the workflow will automatically compensate (cancel) the previously completed bookings in reverse order. + +The `BookTripWorkflow` class defines the workflow. It orchestrates the booking process and handles compensation if any step fails. See the code snippet below: +```java +public class BookTripWorkflow extends Workflow { + @Override + public WorkflowStub create() { + return ctx -> { + List compensations = new ArrayList<>(); + + try { + // Book flight + String flightResult = ctx.callActivity(BookFlightActivity.class.getName(), String.class).await(); + ctx.getLogger().info("Flight booking completed: " + flightResult); + compensations.add(CancelFlightActivity.class.getName()); + + // Book hotel + String hotelResult = ctx.callActivity(BookHotelActivity.class.getName(), String.class).await(); + ctx.getLogger().info("Hotel booking completed: " + hotelResult); + compensations.add(CancelHotelActivity.class.getName()); + + // Book car + String carResult = ctx.callActivity(BookCarActivity.class.getName(), String.class).await(); + ctx.getLogger().info("Car booking completed: " + carResult); + compensations.add(CancelCarActivity.class.getName()); + + } catch (Exception e) { + ctx.getLogger().info("******** executing compensation logic ********"); + // Execute compensations in reverse order + Collections.reverse(compensations); + for (String compensation : compensations) { + try { + ctx.callActivity(compensation, String.class).await(); + } catch (Exception ex) { + ctx.getLogger().error("Error during compensation: " + ex.getMessage()); + } + } + ctx.complete("Workflow failed, compensation applied"); + return; + } + ctx.complete("All bookings completed successfully"); + }; + } +} +``` + +Each activity class (`BookFlightActivity`, `BookHotelActivity`, `BookCarActivity`) implements the booking logic, while their corresponding compensation activities (`CancelFlightActivity`, `CancelHotelActivity`, `CancelCarActivity`) implement the cancellation logic. + + + +Execute the following script in order to run the BookTripWorker: +```sh +dapr run --app-id book-trip-worker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.compensation.BookTripWorker +``` + +Once running, execute the following script to run the BookTripClient: +```sh +java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.compensation.BookTripClient +``` + + +The output demonstrates: +1. The workflow starts and successfully books a flight +2. Then successfully books a hotel +3. When attempting to book a car, it fails (intentionally) +4. The compensation logic triggers, canceling the hotel and flight in reverse order +5. The workflow completes with a status indicating the compensation was applied + +Key Points: +1. Each successful booking step adds its compensation action to an ArrayList +2. If an error occurs, the list of compensations is reversed and executed in reverse order +3. The workflow ensures that all resources are properly cleaned up even if the process fails +4. Each activity simulates work with a short delay for demonstration purposes + + +### Suspend/Resume Pattern + +Workflow instances can be suspended and resumed. This example shows how to use the suspend and resume commands. + +For testing the suspend and resume operations we will use the same workflow definition used by the DemoExternalEventWorkflow. + +Start the workflow and client using the following commands: + + + + +```sh +dapr run --app-id demoworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.suspendresume.DemoSuspendResumeWorker +``` + +```sh +java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.suspendresume.DemoSuspendResumeClient +``` + + + +The worker logs: +```text +== APP == 2023-11-07 16:01:23,279 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.suspendresume.DemoExternalEventWorkflow +== APP == 2023-11-07 16:01:23,279 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Waiting for approval... +== APP == 2023-11-07 16:01:23,324 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - approval granted - do the approved action +== APP == 2023-11-07 16:01:23,348 {HH:mm:ss.SSS} [main] INFO i.d.e.w.e.ApproveActivity - Starting Activity: io.dapr.examples.workflows.externalevent.ApproveActivity +== APP == 2023-11-07 16:01:23,348 {HH:mm:ss.SSS} [main] INFO i.d.e.w.e.ApproveActivity - Running approval activity... +== APP == 2023-11-07 16:01:28,410 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - approval-activity finished +``` + +The client log: +```text +Started a new external-event model workflow with instance ID: 23410d96-1afe-4698-9fcd-c01c1e0db255 +workflow instance with ID: 23410d96-1afe-4698-9fcd-c01c1e0db255 completed. +``` \ No newline at end of file diff --git a/examples/src/main/java/io/dapr/examples/workflows/compensation/BookCarActivity.java b/examples/src/main/java/io/dapr/examples/workflows/compensation/BookCarActivity.java new file mode 100644 index 000000000..9ad0285d9 --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/compensation/BookCarActivity.java @@ -0,0 +1,42 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.compensation; + +import io.dapr.workflows.WorkflowActivity; +import io.dapr.workflows.WorkflowActivityContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + +public class BookCarActivity implements WorkflowActivity { + private static final Logger logger = LoggerFactory.getLogger(BookCarActivity.class); + + @Override + public String run(WorkflowActivityContext ctx) { + logger.info("Starting Activity: " + ctx.getName()); + + // Simulate work + try { + TimeUnit.SECONDS.sleep(2); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + logger.info("Forcing Failure to trigger compensation for activity: " + ctx.getName()); + + // force the compensation + throw new RuntimeException("Failed to book car"); + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/compensation/BookFlightActivity.java b/examples/src/main/java/io/dapr/examples/workflows/compensation/BookFlightActivity.java new file mode 100644 index 000000000..075c4d275 --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/compensation/BookFlightActivity.java @@ -0,0 +1,41 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.compensation; + +import io.dapr.workflows.WorkflowActivity; +import io.dapr.workflows.WorkflowActivityContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + +public class BookFlightActivity implements WorkflowActivity { + private static final Logger logger = LoggerFactory.getLogger(BookFlightActivity.class); + + @Override + public String run(WorkflowActivityContext ctx) { + logger.info("Starting Activity: " + ctx.getName()); + + // Simulate work + try { + TimeUnit.SECONDS.sleep(2); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + String result = "Flight booked successfully"; + logger.info("Activity completed with result: " + result); + return result; + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/compensation/BookHotelActivity.java b/examples/src/main/java/io/dapr/examples/workflows/compensation/BookHotelActivity.java new file mode 100644 index 000000000..a2eca04c4 --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/compensation/BookHotelActivity.java @@ -0,0 +1,40 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.compensation; + +import io.dapr.workflows.WorkflowActivity; +import io.dapr.workflows.WorkflowActivityContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BookHotelActivity implements WorkflowActivity { + private static final Logger logger = LoggerFactory.getLogger(BookHotelActivity.class); + + @Override + public String run(WorkflowActivityContext ctx) { + logger.info("Starting Activity: " + ctx.getName()); + logger.info("Simulating hotel booking process..."); + + // Simulate some work + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + String result = "Hotel booked successfully"; + logger.info("Activity completed with result: " + result); + return result; + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/compensation/BookTripClient.java b/examples/src/main/java/io/dapr/examples/workflows/compensation/BookTripClient.java new file mode 100644 index 000000000..212c1f0a1 --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/compensation/BookTripClient.java @@ -0,0 +1,35 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.compensation; + +import io.dapr.workflows.client.DaprWorkflowClient; +import io.dapr.workflows.client.WorkflowInstanceStatus; + +import java.time.Duration; +import java.util.concurrent.TimeoutException; + +public class BookTripClient { + public static void main(String[] args) { + try (DaprWorkflowClient client = new DaprWorkflowClient()) { + String instanceId = client.scheduleNewWorkflow(BookTripWorkflow.class); + System.out.printf("Started a new trip booking workflow with instance ID: %s%n", instanceId); + + WorkflowInstanceStatus status = client.waitForInstanceCompletion(instanceId, Duration.ofMinutes(30), true); + System.out.printf("Workflow instance with ID: %s completed with status: %s%n", instanceId, status); + System.out.printf("Workflow output: %s%n", status.getSerializedOutput()); + } catch (TimeoutException | InterruptedException e) { + throw new RuntimeException(e); + } + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/compensation/BookTripWorker.java b/examples/src/main/java/io/dapr/examples/workflows/compensation/BookTripWorker.java new file mode 100644 index 000000000..d32ade26a --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/compensation/BookTripWorker.java @@ -0,0 +1,38 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.compensation; + +import io.dapr.workflows.runtime.WorkflowRuntime; +import io.dapr.workflows.runtime.WorkflowRuntimeBuilder; + +public class BookTripWorker { + + public static void main(String[] args) throws Exception { + // Register the Workflow with the builder + WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder() + .registerWorkflow(BookTripWorkflow.class) + .registerActivity(BookFlightActivity.class) + .registerActivity(CancelFlightActivity.class) + .registerActivity(BookHotelActivity.class) + .registerActivity(CancelHotelActivity.class) + .registerActivity(BookCarActivity.class) + .registerActivity(CancelCarActivity.class); + + // Build and start the workflow runtime + try (WorkflowRuntime runtime = builder.build()) { + System.out.println("Start workflow runtime"); + runtime.start(); + } + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/compensation/BookTripWorkflow.java b/examples/src/main/java/io/dapr/examples/workflows/compensation/BookTripWorkflow.java new file mode 100644 index 000000000..f375363ed --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/compensation/BookTripWorkflow.java @@ -0,0 +1,107 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.compensation; + +import io.dapr.durabletask.TaskFailedException; +import io.dapr.workflows.Workflow; +import io.dapr.workflows.WorkflowStub; +import io.dapr.workflows.WorkflowTaskOptions; +import io.dapr.workflows.WorkflowTaskRetryPolicy; + +import java.util.List; +import java.util.ArrayList; +import java.util.Collections; +import java.time.Duration; + +public class BookTripWorkflow implements Workflow { + @Override + public WorkflowStub create() { + return ctx -> { + ctx.getLogger().info("Starting Workflow: " + ctx.getName()); + List compensations = new ArrayList<>(); + + // Define retry policy for compensation activities + WorkflowTaskRetryPolicy compensationRetryPolicy = WorkflowTaskRetryPolicy.newBuilder() + .setFirstRetryInterval(Duration.ofSeconds(1)) + .setMaxNumberOfAttempts(3) + .build(); + + WorkflowTaskOptions compensationOptions = new WorkflowTaskOptions(compensationRetryPolicy); + + try { + // Book flight + String flightResult = ctx.callActivity(BookFlightActivity.class.getName(), null, String.class).await(); + ctx.getLogger().info("Flight booking completed: {}", flightResult); + compensations.add("CancelFlight"); + + // Book hotel + String hotelResult = ctx.callActivity(BookHotelActivity.class.getName(), null, String.class).await(); + ctx.getLogger().info("Hotel booking completed: {}", hotelResult); + compensations.add("CancelHotel"); + + // Book car + String carResult = ctx.callActivity(BookCarActivity.class.getName(), null, String.class).await(); + ctx.getLogger().info("Car booking completed: {}", carResult); + compensations.add("CancelCar"); + + String result = String.format("%s, %s, %s", flightResult, hotelResult, carResult); + ctx.getLogger().info("Trip booked successfully: {}", result); + ctx.complete(result); + + } catch (TaskFailedException e) { + ctx.getLogger().info("******** executing compensation logic ********"); + ctx.getLogger().error("Activity failed: {}", e.getMessage()); + + // Execute compensations in reverse order + Collections.reverse(compensations); + for (String compensation : compensations) { + try { + switch (compensation) { + case "CancelCar": + String carCancelResult = ctx.callActivity( + CancelCarActivity.class.getName(), + null, + compensationOptions, + String.class).await(); + ctx.getLogger().info("Car cancellation completed: {}", carCancelResult); + break; + + case "CancelHotel": + String hotelCancelResult = ctx.callActivity( + CancelHotelActivity.class.getName(), + null, + compensationOptions, + String.class).await(); + ctx.getLogger().info("Hotel cancellation completed: {}", hotelCancelResult); + break; + + case "CancelFlight": + String flightCancelResult = ctx.callActivity( + CancelFlightActivity.class.getName(), + null, + compensationOptions, + String.class).await(); + ctx.getLogger().info("Flight cancellation completed: {}", flightCancelResult); + break; + } + } catch (TaskFailedException ex) { + // Only catch TaskFailedException for actual activity failures + ctx.getLogger().error("Activity failed during compensation: {}", ex.getMessage()); + } + } + ctx.complete("Workflow failed, compensation applied"); + } + }; + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/compensation/CancelCarActivity.java b/examples/src/main/java/io/dapr/examples/workflows/compensation/CancelCarActivity.java new file mode 100644 index 000000000..bca6af0da --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/compensation/CancelCarActivity.java @@ -0,0 +1,41 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.compensation; + +import io.dapr.workflows.WorkflowActivity; +import io.dapr.workflows.WorkflowActivityContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + +public class CancelCarActivity implements WorkflowActivity { + private static final Logger logger = LoggerFactory.getLogger(CancelCarActivity.class); + + @Override + public String run(WorkflowActivityContext ctx) { + logger.info("Starting Activity: " + ctx.getName()); + + // Simulate work + try { + TimeUnit.SECONDS.sleep(2); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + String result = "Car canceled successfully"; + logger.info("Activity completed with result: " + result); + return result; + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/compensation/CancelFlightActivity.java b/examples/src/main/java/io/dapr/examples/workflows/compensation/CancelFlightActivity.java new file mode 100644 index 000000000..0c2034dee --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/compensation/CancelFlightActivity.java @@ -0,0 +1,41 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.compensation; + +import io.dapr.workflows.WorkflowActivity; +import io.dapr.workflows.WorkflowActivityContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + +public class CancelFlightActivity implements WorkflowActivity { + private static final Logger logger = LoggerFactory.getLogger(CancelFlightActivity.class); + + @Override + public String run(WorkflowActivityContext ctx) { + logger.info("Starting Activity: " + ctx.getName()); + + // Simulate work + try { + TimeUnit.SECONDS.sleep(2); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + String result = "Flight canceled successfully"; + logger.info("Activity completed with result: " + result); + return result; + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/compensation/CancelHotelActivity.java b/examples/src/main/java/io/dapr/examples/workflows/compensation/CancelHotelActivity.java new file mode 100644 index 000000000..03f5f9b64 --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/compensation/CancelHotelActivity.java @@ -0,0 +1,41 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.compensation; + +import io.dapr.workflows.WorkflowActivity; +import io.dapr.workflows.WorkflowActivityContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + +public class CancelHotelActivity implements WorkflowActivity { + private static final Logger logger = LoggerFactory.getLogger(CancelHotelActivity.class); + + @Override + public String run(WorkflowActivityContext ctx) { + logger.info("Starting Activity: " + ctx.getName()); + + // Simulate work + try { + TimeUnit.SECONDS.sleep(2); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + String result = "Hotel canceled successfully"; + logger.info("Activity completed with result: " + result); + return result; + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/suspendresume/DemoSuspendResumeClient.java b/examples/src/main/java/io/dapr/examples/workflows/suspendresume/DemoSuspendResumeClient.java new file mode 100644 index 000000000..7e8289798 --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/suspendresume/DemoSuspendResumeClient.java @@ -0,0 +1,59 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.suspendresume; + +import io.dapr.examples.workflows.externalevent.DemoExternalEventWorkflow; +import io.dapr.workflows.client.DaprWorkflowClient; +import io.dapr.workflows.client.WorkflowInstanceStatus; + +import java.util.concurrent.TimeoutException; + +public class DemoSuspendResumeClient { + /** + * The main method to start the client. + * + * @param args Input arguments (unused). + * @throws InterruptedException If program has been interrupted. + */ + public static void main(String[] args) { + try (DaprWorkflowClient client = new DaprWorkflowClient()) { + String instanceId = client.scheduleNewWorkflow(DemoExternalEventWorkflow.class); + System.out.printf("Started a new external-event workflow with instance ID: %s%n", instanceId); + + + System.out.printf("Suspending Workflow Instance: %s%n", instanceId ); + client.suspendWorkflow(instanceId, "suspending workflow instance."); + + WorkflowInstanceStatus instanceState = client.getInstanceState(instanceId, false); + assert instanceState != null; + System.out.printf("Workflow Instance Status: %s%n", instanceState.getRuntimeStatus().name() ); + + System.out.printf("Let's resume the Workflow Instance before sending the external event: %s%n", instanceId ); + client.resumeWorkflow(instanceId, "resuming workflow instance."); + + instanceState = client.getInstanceState(instanceId, false); + assert instanceState != null; + System.out.printf("Workflow Instance Status: %s%n", instanceState.getRuntimeStatus().name() ); + + System.out.printf("Now that the instance is RUNNING again, lets send the external event. %n"); + client.raiseEvent(instanceId, "Approval", true); + + client.waitForInstanceCompletion(instanceId, null, true); + System.out.printf("workflow instance with ID: %s completed.", instanceId); + + } catch (TimeoutException | InterruptedException e) { + throw new RuntimeException(e); + } + } +} diff --git a/examples/src/main/java/io/dapr/examples/workflows/suspendresume/DemoSuspendResumeWorker.java b/examples/src/main/java/io/dapr/examples/workflows/suspendresume/DemoSuspendResumeWorker.java new file mode 100644 index 000000000..5ca4bc34b --- /dev/null +++ b/examples/src/main/java/io/dapr/examples/workflows/suspendresume/DemoSuspendResumeWorker.java @@ -0,0 +1,40 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.examples.workflows.suspendresume; + +import io.dapr.examples.workflows.externalevent.ApproveActivity; +import io.dapr.examples.workflows.externalevent.DemoExternalEventWorkflow; +import io.dapr.examples.workflows.externalevent.DenyActivity; +import io.dapr.workflows.runtime.WorkflowRuntime; +import io.dapr.workflows.runtime.WorkflowRuntimeBuilder; + +public class DemoSuspendResumeWorker { + /** + * The main method of this app. + * + * @param args The port the app will listen on. + * @throws Exception An Exception. + */ + public static void main(String[] args) throws Exception { + // Register the Workflow with the builder. + WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(DemoExternalEventWorkflow.class); + builder.registerActivity(ApproveActivity.class); + builder.registerActivity(DenyActivity.class); + + // Build and then start the workflow runtime pulling and executing tasks + WorkflowRuntime runtime = builder.build(); + System.out.println("Start workflow runtime"); + runtime.start(); + } +} diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprWorkflowsIT.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprWorkflowsIT.java index 5c6a360c8..ee70222d5 100644 --- a/sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprWorkflowsIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/DaprWorkflowsIT.java @@ -20,6 +20,7 @@ import io.dapr.testcontainers.DaprContainer; import io.dapr.testcontainers.DaprLogLevel; import io.dapr.workflows.client.DaprWorkflowClient; import io.dapr.workflows.client.WorkflowInstanceStatus; +import io.dapr.workflows.client.WorkflowRuntimeStatus; import io.dapr.workflows.runtime.WorkflowRuntime; import io.dapr.workflows.runtime.WorkflowRuntimeBuilder; import org.junit.jupiter.api.BeforeEach; @@ -117,6 +118,36 @@ public class DaprWorkflowsIT { assertEquals(instanceId, workflowOutput.getWorkflowId()); } + @Test + public void testSuspendAndResumeWorkflows() throws Exception { + TestWorkflowPayload payload = new TestWorkflowPayload(new ArrayList<>()); + String instanceId = workflowClient.scheduleNewWorkflow(TestWorkflow.class, payload); + workflowClient.waitForInstanceStart(instanceId, Duration.ofSeconds(10), false); + + workflowClient.suspendWorkflow(instanceId, "testing suspend."); + + + WorkflowInstanceStatus instanceState = workflowClient.getInstanceState(instanceId, false); + assertNotNull(instanceState); + assertEquals(WorkflowRuntimeStatus.SUSPENDED, instanceState.getRuntimeStatus()); + + workflowClient.resumeWorkflow(instanceId, "testing resume"); + + instanceState = workflowClient.getInstanceState(instanceId, false); + assertNotNull(instanceState); + assertEquals(WorkflowRuntimeStatus.RUNNING, instanceState.getRuntimeStatus()); + + workflowClient.raiseEvent(instanceId, "MoveForward", payload); + + Duration timeout = Duration.ofSeconds(10); + instanceState = workflowClient.waitForInstanceCompletion(instanceId, timeout, true); + + assertNotNull(instanceState); + assertEquals(WorkflowRuntimeStatus.COMPLETED, instanceState.getRuntimeStatus()); + + } + + private TestWorkflowPayload deserialize(String value) throws JsonProcessingException { return OBJECT_MAPPER.readValue(value, TestWorkflowPayload.class); } diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/client/DaprWorkflowClient.java b/sdk-workflows/src/main/java/io/dapr/workflows/client/DaprWorkflowClient.java index ab46dff79..b24c8bcc9 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/client/DaprWorkflowClient.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/client/DaprWorkflowClient.java @@ -129,6 +129,26 @@ public class DaprWorkflowClient implements AutoCloseable { orchestrationInstanceOptions); } + /** + * Suspend the workflow associated with the provided instance id. + * + * @param workflowInstanceId Workflow instance id to suspend. + * @param reason reason for suspending the workflow instance. + */ + public void suspendWorkflow(String workflowInstanceId, @Nullable String reason) { + this.innerClient.suspendInstance(workflowInstanceId, reason); + } + + /** + * Resume the workflow associated with the provided instance id. + * + * @param workflowInstanceId Workflow instance id to resume. + * @param reason reason for resuming the workflow instance. + */ + public void resumeWorkflow(String workflowInstanceId, @Nullable String reason) { + this.innerClient.resumeInstance(workflowInstanceId, reason); + } + /** * Terminates the workflow associated with the provided instance id. * diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/client/DaprWorkflowClientTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/client/DaprWorkflowClientTest.java index 3ad66877c..55f7c9fdd 100644 --- a/sdk-workflows/src/test/java/io/dapr/workflows/client/DaprWorkflowClientTest.java +++ b/sdk-workflows/src/test/java/io/dapr/workflows/client/DaprWorkflowClientTest.java @@ -217,6 +217,17 @@ public class DaprWorkflowClientTest { expectedEventName, expectedEventPayload); } + @Test + public void suspendResumeInstance() { + String expectedArgument = "TestWorkflowInstanceId"; + client.suspendWorkflow(expectedArgument, "suspending workflow instance"); + client.resumeWorkflow(expectedArgument, "resuming workflow instance"); + verify(mockInnerClient, times(1)).suspendInstance(expectedArgument, + "suspending workflow instance"); + verify(mockInnerClient, times(1)).resumeInstance(expectedArgument, + "resuming workflow instance"); + } + @Test public void purgeInstance() { String expectedArgument = "TestWorkflowInstanceId"; diff --git a/spring-boot-examples/workflows/README.md b/spring-boot-examples/workflows/README.md new file mode 100644 index 000000000..ae02d2ee6 --- /dev/null +++ b/spring-boot-examples/workflows/README.md @@ -0,0 +1,533 @@ +# Dapr Spring Boot Workflow Examples + +This application allows you to run different [workflow patterns](https://docs.dapr.io/developing-applications/building-blocks/workflow/workflow-patterns) including: +- Chained Activities +- Parent/Child Workflows +- Continue workflow by sending External Events +- Fan Out/In activities for parallel execution +- Suspend/Resume workflows + +## Running these examples from source code + +To run these examples you will need: +- Java SDK +- Maven +- Docker or a container runtime such as Podman + +From the `spring-boot-examples/workflows` directory you can start the service by running the following command: + + + + +```sh +../../mvnw spring-boot:test-run +``` + + + +By running the `spring-boot:test-run` goal, the application is loaded using the [test configurations](src/test/java/io/dapr/springboot/examples/wfp/DaprTestContainersConfig.java) +configured using [Testcontainers](https://testcontainers.com) to boostrap the [Dapr](https://dapr.io) sidecar and control plane. + +Once the application is running you can trigger the different patterns by sending the following requests: + +### Chaining Activities Workflow example + +The `io.dapr.springboot.examples.wfp.chain.ChainWorkflow` executes three chained activities. For this example the +`ToUpperCaseActivity.java` is used to transform to upper case three strings from an array. + +```mermaid +graph LR + SW((Start + Workflow)) + A1[Activity1] + A2[Activity2] + A3[Activity3] + EW((End + Workflow)) + SW --> A1 + A1 --> A2 + A2 --> A3 + A3 --> EW +``` + + + + +To start the workflow with the three chained activities you can run: + +```sh +curl -X POST localhost:8080/wfp/chain -H 'Content-Type: application/json' +``` + + + + +As result from executing the request you should see: + +```bash +TOKYO, LONDON, SEATTLE +``` + +In the application output you should see the workflow activities being executed. + +```bash +io.dapr.workflows.WorkflowContext : Starting Workflow: io.dapr.springboot.examples.wfp.chain.ChainWorkflow +i.d.s.e.w.WorkflowPatternsRestController : Workflow instance 7625b4af-8c04-408a-93dc-bad753466e43 started +i.d.s.e.wfp.chain.ToUpperCaseActivity : Starting Activity: io.dapr.springboot.examples.wfp.chain.ToUpperCaseActivity +i.d.s.e.wfp.chain.ToUpperCaseActivity : Message Received from input: Tokyo +i.d.s.e.wfp.chain.ToUpperCaseActivity : Sending message to output: TOKYO +i.d.s.e.wfp.chain.ToUpperCaseActivity : Starting Activity: io.dapr.springboot.examples.wfp.chain.ToUpperCaseActivity +i.d.s.e.wfp.chain.ToUpperCaseActivity : Message Received from input: London +i.d.s.e.wfp.chain.ToUpperCaseActivity : Sending message to output: LONDON +i.d.s.e.wfp.chain.ToUpperCaseActivity : Starting Activity: io.dapr.springboot.examples.wfp.chain.ToUpperCaseActivity +i.d.s.e.wfp.chain.ToUpperCaseActivity : Message Received from input: Seattle +i.d.s.e.wfp.chain.ToUpperCaseActivity : Sending message to output: SEATTLE +io.dapr.workflows.WorkflowContext : Workflow finished with result: TOKYO, LONDON, SEATTLE +``` + +### Parent / Child Workflows example + +In this example we start a Parent workflow that calls a child workflow that execute one activity that reverses an input string. + +The Parent workflow looks like this: + +```mermaid +graph LR + SW((Start + Workflow)) + subgraph for each word in the input + GWL[Call child workflow] + end + EW((End + Workflow)) + SW --> GWL + GWL --> EW +``` + +The Child workflow looks like this: + +```mermaid +graph LR + SW((Start + Workflow)) + A1[Activity1] + EW((End + Workflow)) + SW --> A1 + A1 --> EW +``` + +To start the parent workflow you can run: + + + + +To start the workflow with the three chained activities you can run: + +```sh +curl -X POST localhost:8080/wfp/child -H 'Content-Type: application/json' +``` + + + + +As result from executing the request you should see: + +```bash +!wolfkroW rpaD olleH +``` + +In the application output you should see the workflow activities being executed. + +```bash +io.dapr.workflows.WorkflowContext : Starting Workflow: io.dapr.springboot.examples.wfp.child.ParentWorkflow +io.dapr.workflows.WorkflowContext : calling childworkflow with input: Hello Dapr Workflow! +i.d.s.e.w.WorkflowPatternsRestController : Workflow instance f3ec9566-a0fc-4d28-8912-3f3ded3cd8a9 started +io.dapr.workflows.WorkflowContext : Starting ChildWorkflow: io.dapr.springboot.examples.wfp.child.ChildWorkflow +io.dapr.workflows.WorkflowContext : ChildWorkflow received input: Hello Dapr Workflow! +io.dapr.workflows.WorkflowContext : ChildWorkflow is calling Activity: io.dapr.springboot.examples.wfp.child.ReverseActivity +i.d.s.e.wfp.child.ReverseActivity : Starting Activity: io.dapr.springboot.examples.wfp.child.ReverseActivity +i.d.s.e.wfp.child.ReverseActivity : Message Received from input: Hello Dapr Workflow! +i.d.s.e.wfp.child.ReverseActivity : Sending message to output: !wolfkroW rpaD olleH +io.dapr.workflows.WorkflowContext : ChildWorkflow finished with: !wolfkroW rpaD olleH +io.dapr.workflows.WorkflowContext : childworkflow finished with: !wolfkroW rpaD olleH +``` + +### ContinueAsNew Workflows example + +In this example we start a workflow that every 3 seconds schedule a new workflow consistently. This workflow executes +one activity called CleanUpActivity that takes 2 seconds to complete. This loops repeat consistently for 5 times. + +To start the workflow you can run: + + + + +```sh +curl -X POST localhost:8080/wfp/continueasnew -H 'Content-Type: application/json' +``` + + + +As result from executing the request you should see: + +```bash +{"cleanUpTimes":5} +``` + +In the application output you should see the workflow activities being executed. + +```bash +io.dapr.workflows.WorkflowContext : Starting Workflow: io.dapr.springboot.examples.wfp.continueasnew.ContinueAsNewWorkflow +io.dapr.workflows.WorkflowContext : call CleanUpActivity to do the clean up +i.d.s.e.w.WorkflowPatternsRestController : Workflow instance b808e7d6-ab47-4eba-8188-dc9ff8780764 started +i.d.s.e.w.continueasnew.CleanUpActivity : Starting Activity: io.dapr.springboot.examples.wfp.continueasnew.CleanUpActivity +i.d.s.e.w.continueasnew.CleanUpActivity : start clean up work, it may take few seconds to finish... Time:10:48:45 +io.dapr.workflows.WorkflowContext : CleanUpActivity finished +io.dapr.workflows.WorkflowContext : wait 5 seconds for next clean up +io.dapr.workflows.WorkflowContext : Let's do more cleaning. +io.dapr.workflows.WorkflowContext : Starting Workflow: io.dapr.springboot.examples.wfp.continueasnew.ContinueAsNewWorkflow +io.dapr.workflows.WorkflowContext : call CleanUpActivity to do the clean up +i.d.s.e.w.continueasnew.CleanUpActivity : Starting Activity: io.dapr.springboot.examples.wfp.continueasnew.CleanUpActivity +i.d.s.e.w.continueasnew.CleanUpActivity : start clean up work, it may take few seconds to finish... Time:10:48:50 +io.dapr.workflows.WorkflowContext : CleanUpActivity finished +io.dapr.workflows.WorkflowContext : wait 5 seconds for next clean up +io.dapr.workflows.WorkflowContext : Let's do more cleaning. +io.dapr.workflows.WorkflowContext : Starting Workflow: io.dapr.springboot.examples.wfp.continueasnew.ContinueAsNewWorkflow +io.dapr.workflows.WorkflowContext : call CleanUpActivity to do the clean up +i.d.s.e.w.continueasnew.CleanUpActivity : Starting Activity: io.dapr.springboot.examples.wfp.continueasnew.CleanUpActivity +i.d.s.e.w.continueasnew.CleanUpActivity : start clean up work, it may take few seconds to finish... Time:10:48:55 +io.dapr.workflows.WorkflowContext : CleanUpActivity finished +io.dapr.workflows.WorkflowContext : wait 5 seconds for next clean up +io.dapr.workflows.WorkflowContext : Let's do more cleaning. +io.dapr.workflows.WorkflowContext : Starting Workflow: io.dapr.springboot.examples.wfp.continueasnew.ContinueAsNewWorkflow +io.dapr.workflows.WorkflowContext : call CleanUpActivity to do the clean up +i.d.s.e.w.continueasnew.CleanUpActivity : Starting Activity: io.dapr.springboot.examples.wfp.continueasnew.CleanUpActivity +i.d.s.e.w.continueasnew.CleanUpActivity : start clean up work, it may take few seconds to finish... Time:10:49:0 +io.dapr.workflows.WorkflowContext : CleanUpActivity finished +io.dapr.workflows.WorkflowContext : wait 5 seconds for next clean up +io.dapr.workflows.WorkflowContext : Let's do more cleaning. +io.dapr.workflows.WorkflowContext : Starting Workflow: io.dapr.springboot.examples.wfp.continueasnew.ContinueAsNewWorkflow +io.dapr.workflows.WorkflowContext : call CleanUpActivity to do the clean up +i.d.s.e.w.continueasnew.CleanUpActivity : Starting Activity: io.dapr.springboot.examples.wfp.continueasnew.CleanUpActivity +i.d.s.e.w.continueasnew.CleanUpActivity : start clean up work, it may take few seconds to finish... Time:10:49:5 +io.dapr.workflows.WorkflowContext : CleanUpActivity finished +io.dapr.workflows.WorkflowContext : wait 5 seconds for next clean up +io.dapr.workflows.WorkflowContext : We did enough cleaning +``` + +### External Event Workflow example + +In this example we start a workflow that as part of its execution waits for an external event to continue. To correlate +workflows and events we use the parameter `orderId` + +To start the workflow you can run: + + + + +```sh +curl -X POST "localhost:8080/wfp/externalevent?orderId=123" -H 'Content-Type: application/json' +``` + + + + +In the application output you should see the workflow activities being executed. + +```bash +io.dapr.workflows.WorkflowContext : Starting Workflow: io.dapr.springboot.examples.wfp.externalevent.ExternalEventWorkflow +io.dapr.workflows.WorkflowContext : Waiting for approval... +i.d.s.e.w.WorkflowPatternsRestController : Workflow instance 8a55cf6d-9059-49b1-8c83-fbe17567a02e started +``` + +You should see the Workflow ID that was created, in this example you don't need to remember this id, +as you can use the orderId to find the right instance. +When you are ready to approve the order you can send the following request: + + + + +To send the event you can run: + +```sh +curl -X POST "localhost:8080/wfp/externalevent-continue?orderId=123&decision=true" -H 'Content-Type: application/json' +``` + + + +```bash +{"approved":true} +``` + +In the application output you should see the workflow activities being executed. + +```bash +i.d.s.e.w.WorkflowPatternsRestController : Workflow instance e86bc464-6166-434d-8c91-d99040d6f54e continue +io.dapr.workflows.WorkflowContext : approval granted - do the approved action +i.d.s.e.w.externalevent.ApproveActivity : Starting Activity: io.dapr.springboot.examples.wfp.externalevent.ApproveActivity +i.d.s.e.w.externalevent.ApproveActivity : Running approval activity... +io.dapr.workflows.WorkflowContext : approval-activity finished +``` + +### Fan Out/In Workflow example + +In this example we start a workflow that takes an ArrayList of strings and calls one activity per item in the ArrayList. The activities +are executed and the workflow waits for all of them to complete to aggregate the results. + +```mermaid +graph LR + SW((Start + Workflow)) + subgraph for each word in the input + GWL[GetWordLength] + end + ALL[Wait until all tasks + are completed] + EW((End + Workflow)) + SW --> GWL + GWL --> ALL + ALL --> EW +``` + +To start the workflow you can run: + + + + +```sh +curl -X POST localhost:8080/wfp/fanoutin -H 'Content-Type: application/json' -d @body.json +``` + + + +As result from executing the request you should see: + +```bash +{"wordCount":60} +``` + +In the application output you should see the workflow activities being executed. + +```bash +io.dapr.workflows.WorkflowContext : Starting Workflow: io.dapr.springboot.examples.wfp.fanoutin.FanOutInWorkflow +i.d.s.e.w.WorkflowPatternsRestController : Workflow instance a771a7ba-f9fb-4399-aaee-a2fb0b102e5d started +i.d.s.e.wfp.fanoutin.CountWordsActivity : Starting Activity: io.dapr.springboot.examples.wfp.fanoutin.CountWordsActivity +i.d.s.e.wfp.fanoutin.CountWordsActivity : Starting Activity: io.dapr.springboot.examples.wfp.fanoutin.CountWordsActivity +i.d.s.e.wfp.fanoutin.CountWordsActivity : Starting Activity: io.dapr.springboot.examples.wfp.fanoutin.CountWordsActivity +i.d.s.e.wfp.fanoutin.CountWordsActivity : Activity returned: 2. +i.d.s.e.wfp.fanoutin.CountWordsActivity : Activity finished +i.d.s.e.wfp.fanoutin.CountWordsActivity : Activity returned: 11. +i.d.s.e.wfp.fanoutin.CountWordsActivity : Activity returned: 17. +i.d.s.e.wfp.fanoutin.CountWordsActivity : Activity finished +i.d.s.e.wfp.fanoutin.CountWordsActivity : Activity finished +i.d.s.e.wfp.fanoutin.CountWordsActivity : Starting Activity: io.dapr.springboot.examples.wfp.fanoutin.CountWordsActivity +i.d.s.e.wfp.fanoutin.CountWordsActivity : Activity returned: 21. +i.d.s.e.wfp.fanoutin.CountWordsActivity : Activity finished +i.d.s.e.wfp.fanoutin.CountWordsActivity : Starting Activity: io.dapr.springboot.examples.wfp.fanoutin.CountWordsActivity +i.d.s.e.wfp.fanoutin.CountWordsActivity : Activity returned: 9. +i.d.s.e.wfp.fanoutin.CountWordsActivity : Activity finished +io.dapr.workflows.WorkflowContext : Workflow finished with result: 60 +``` + +### Suspend/Resume Workflow example + +In this example, we start a workflow that executes an activity and then waits for an event. While the workflow instance +is waiting for the event, we execute a suspend workflow operation. Once we check the state of the instance, a resume +operation is executed. + +To start the workflow, you can run: + + + + +```sh +curl -X POST "localhost:8080/wfp/suspendresume?orderId=123" -H 'Content-Type: application/json' +``` + + + +In the application output you should see the workflow activities being executed. + +```bash +io.dapr.workflows.WorkflowContext : Starting Workflow: io.dapr.springboot.examples.wfp.suspendresume.SuspendResumeWorkflow +i.d.s.e.w.WorkflowPatternsRestController : Workflow instance 2de2b968-900a-4f5b-9092-b26aefbfc6b3 started +i.d.s.e.w.s.PerformTaskActivity : Starting Activity: io.dapr.springboot.examples.wfp.suspendresume.PerformTaskActivity +i.d.s.e.w.s.PerformTaskActivity : Running activity... +i.d.s.e.w.s.PerformTaskActivity : Completing activity... +io.dapr.workflows.WorkflowContext : Waiting for approval... + +``` + +You should see the Workflow ID that was created, in this example you don't need to remember this id, +as you can use the orderId to find the right instance. + + + + + +Let's suspend the workflow instance by sending the following request: + +```sh +curl -X POST "localhost:8080/wfp/suspendresume/suspend?orderId=123" -H 'Content-Type: application/json' +``` + + + + +You should see the output of the requests: + +```sh +SUSPENDED +``` + +Now, let's resume the workflow instance: + + + + +To send the event you can run: + +```sh +curl -X POST "localhost:8080/wfp/suspendresume/resume?orderId=123" -H 'Content-Type: application/json' +``` + + + +You should see the output of the requests: + +```sh +RUNNING +``` + +Now, let's send the event that the instance is waiting to validate that the workflow complete after +being suspended and resumed. + + + + +To send the event you can run: + +```sh +curl -X POST "localhost:8080/wfp/suspendresume/continue?orderId=123&decision=true" -H 'Content-Type: application/json' +``` + + + +The output of the request contains the output of the workflow based on the `decision` parameter that we sent. + +```bash +{"approved":true} +``` + +In the application output you should see, that the workflow instance completed correctly: + +```sh +i.d.s.e.w.WorkflowPatternsRestController : Workflow instance 2de2b968-900a-4f5b-9092-b26aefbfc6b3 continue +io.dapr.workflows.WorkflowContext : approval-event arrived +i.d.s.e.w.s.PerformTaskActivity : Starting Activity: io.dapr.springboot.examples.wfp.suspendresume.PerformTaskActivity +i.d.s.e.w.s.PerformTaskActivity : Running activity... +i.d.s.e.w.s.PerformTaskActivity : Completing activity... +``` + +## Testing workflow executions + +Workflow execution can be tested using Testcontainers and you can find all the tests for the patterns covered in this +application [here](test/java/io/dapr/springboot/examples/wfp/TestWorkflowPatternsApplication.java). diff --git a/spring-boot-examples/workflows/src/main/java/io/dapr/springboot/examples/wfp/WorkflowPatternsRestController.java b/spring-boot-examples/workflows/src/main/java/io/dapr/springboot/examples/wfp/WorkflowPatternsRestController.java new file mode 100644 index 000000000..ad64993e1 --- /dev/null +++ b/spring-boot-examples/workflows/src/main/java/io/dapr/springboot/examples/wfp/WorkflowPatternsRestController.java @@ -0,0 +1,192 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.springboot.examples.wfp; + +import io.dapr.spring.workflows.config.EnableDaprWorkflows; +import io.dapr.springboot.examples.wfp.chain.ChainWorkflow; +import io.dapr.springboot.examples.wfp.child.ParentWorkflow; +import io.dapr.springboot.examples.wfp.continueasnew.CleanUpLog; +import io.dapr.springboot.examples.wfp.continueasnew.ContinueAsNewWorkflow; +import io.dapr.springboot.examples.wfp.externalevent.Decision; +import io.dapr.springboot.examples.wfp.externalevent.ExternalEventWorkflow; +import io.dapr.springboot.examples.wfp.fanoutin.FanOutInWorkflow; +import io.dapr.springboot.examples.wfp.fanoutin.Result; +import io.dapr.springboot.examples.wfp.remoteendpoint.Payload; +import io.dapr.springboot.examples.wfp.remoteendpoint.RemoteEndpointWorkflow; +import io.dapr.springboot.examples.wfp.suspendresume.SuspendResumeWorkflow; +import io.dapr.workflows.client.DaprWorkflowClient; +import io.dapr.workflows.client.WorkflowInstanceStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeoutException; + +@RestController +@EnableDaprWorkflows +public class WorkflowPatternsRestController { + + private final Logger logger = LoggerFactory.getLogger(WorkflowPatternsRestController.class); + + @Autowired + private DaprWorkflowClient daprWorkflowClient; + + @Autowired + private CleanUpLog cleanUpLog; + + private Map ordersToApprove = new HashMap<>(); + + + + /** + * Run Chain Demo Workflow + * @return the output of the ChainWorkflow execution + */ + @PostMapping("wfp/chain") + public String chain() throws TimeoutException { + String instanceId = daprWorkflowClient.scheduleNewWorkflow(ChainWorkflow.class); + logger.info("Workflow instance " + instanceId + " started"); + return daprWorkflowClient + .waitForInstanceCompletion(instanceId, Duration.ofSeconds(2), true) + .readOutputAs(String.class); + } + + + /** + * Run Child Demo Workflow + * @return confirmation that the workflow instance was created for the workflow pattern child + */ + @PostMapping("wfp/child") + public String child() throws TimeoutException { + String instanceId = daprWorkflowClient.scheduleNewWorkflow(ParentWorkflow.class); + logger.info("Workflow instance " + instanceId + " started"); + return daprWorkflowClient + .waitForInstanceCompletion(instanceId, Duration.ofSeconds(2), true) + .readOutputAs(String.class); + } + + + /** + * Run Fan Out/in Demo Workflow + * @return confirmation that the workflow instance was created for the workflow pattern faninout + */ + @PostMapping("wfp/fanoutin") + public Result fanOutIn(@RequestBody List listOfStrings) throws TimeoutException { + + String instanceId = daprWorkflowClient.scheduleNewWorkflow(FanOutInWorkflow.class, listOfStrings); + logger.info("Workflow instance " + instanceId + " started"); + + // Block until the orchestration completes. Then print the final status, which includes the output. + WorkflowInstanceStatus workflowInstanceStatus = daprWorkflowClient.waitForInstanceCompletion( + instanceId, + Duration.ofSeconds(30), + true); + logger.info("workflow instance with ID: %s completed with result: %s%n", instanceId, + workflowInstanceStatus.readOutputAs(Result.class)); + return workflowInstanceStatus.readOutputAs(Result.class); + } + + /** + * Run External Event Workflow Pattern + * @return confirmation that the workflow instance was created for the workflow pattern externalevent + */ + @PostMapping("wfp/externalevent") + public String externalEvent(@RequestParam("orderId") String orderId) { + String instanceId = daprWorkflowClient.scheduleNewWorkflow(ExternalEventWorkflow.class); + ordersToApprove.put(orderId, instanceId); + logger.info("Workflow instance " + instanceId + " started"); + return instanceId; + } + + @PostMapping("wfp/externalevent-continue") + public Decision externalEventContinue(@RequestParam("orderId") String orderId, @RequestParam("decision") Boolean decision) + throws TimeoutException { + String instanceId = ordersToApprove.get(orderId); + logger.info("Workflow instance " + instanceId + " continue"); + daprWorkflowClient.raiseEvent(instanceId, "Approval", decision); + WorkflowInstanceStatus workflowInstanceStatus = daprWorkflowClient + .waitForInstanceCompletion(instanceId, null, true); + return workflowInstanceStatus.readOutputAs(Decision.class); + } + + @PostMapping("wfp/continueasnew") + public CleanUpLog continueAsNew() + throws TimeoutException { + + cleanUpLog.clearLog(); + String instanceId = daprWorkflowClient.scheduleNewWorkflow(ContinueAsNewWorkflow.class); + logger.info("Workflow instance " + instanceId + " started"); + + WorkflowInstanceStatus workflowInstanceStatus = daprWorkflowClient.waitForInstanceCompletion(instanceId, null, true); + System.out.printf("workflow instance with ID: %s completed.", instanceId); + return workflowInstanceStatus.readOutputAs(CleanUpLog.class); + } + + @PostMapping("wfp/remote-endpoint") + public Payload remoteEndpoint(@RequestBody Payload payload) + throws TimeoutException { + + String instanceId = daprWorkflowClient.scheduleNewWorkflow(RemoteEndpointWorkflow.class, payload); + logger.info("Workflow instance " + instanceId + " started"); + + WorkflowInstanceStatus workflowInstanceStatus = daprWorkflowClient + .waitForInstanceCompletion(instanceId, null, true); + System.out.printf("workflow instance with ID: %s completed.", instanceId); + return workflowInstanceStatus.readOutputAs(Payload.class); + } + + @PostMapping("wfp/suspendresume") + public String suspendResume(@RequestParam("orderId") String orderId) { + String instanceId = daprWorkflowClient.scheduleNewWorkflow(SuspendResumeWorkflow.class); + logger.info("Workflow instance " + instanceId + " started"); + ordersToApprove.put(orderId, instanceId); + return instanceId; + } + + @PostMapping("wfp/suspendresume/suspend") + public String suspendResumeExecuteSuspend(@RequestParam("orderId") String orderId) { + String instanceId = ordersToApprove.get(orderId); + daprWorkflowClient.suspendWorkflow(instanceId, "testing suspend"); + WorkflowInstanceStatus instanceState = daprWorkflowClient.getInstanceState(instanceId, false); + return instanceState.getRuntimeStatus().name(); + } + + @PostMapping("wfp/suspendresume/resume") + public String suspendResumeExecuteResume(@RequestParam("orderId") String orderId) { + String instanceId = ordersToApprove.get(orderId); + daprWorkflowClient.resumeWorkflow(instanceId, "testing resume"); + WorkflowInstanceStatus instanceState = daprWorkflowClient.getInstanceState(instanceId, false); + return instanceState.getRuntimeStatus().name(); + } + + + @PostMapping("wfp/suspendresume/continue") + public Decision suspendResumeContinue(@RequestParam("orderId") String orderId, @RequestParam("decision") Boolean decision) + throws TimeoutException { + String instanceId = ordersToApprove.get(orderId); + logger.info("Workflow instance " + instanceId + " continue"); + daprWorkflowClient.raiseEvent(instanceId, "Approval", decision); + WorkflowInstanceStatus workflowInstanceStatus = daprWorkflowClient + .waitForInstanceCompletion(instanceId, null, true); + return workflowInstanceStatus.readOutputAs(Decision.class); + } +} diff --git a/spring-boot-examples/workflows/src/main/java/io/dapr/springboot/examples/wfp/suspendresume/PerformTaskActivity.java b/spring-boot-examples/workflows/src/main/java/io/dapr/springboot/examples/wfp/suspendresume/PerformTaskActivity.java new file mode 100644 index 000000000..6a2ac1d9a --- /dev/null +++ b/spring-boot-examples/workflows/src/main/java/io/dapr/springboot/examples/wfp/suspendresume/PerformTaskActivity.java @@ -0,0 +1,43 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.springboot.examples.wfp.suspendresume; + +import io.dapr.workflows.WorkflowActivity; +import io.dapr.workflows.WorkflowActivityContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import java.util.concurrent.TimeUnit; + +@Component +public class PerformTaskActivity implements WorkflowActivity { + @Override + public Object run(WorkflowActivityContext ctx) { + Logger logger = LoggerFactory.getLogger(PerformTaskActivity.class); + logger.info("Starting Activity: " + ctx.getName()); + + logger.info("Running activity..."); + //Sleeping for 5 seconds to simulate long running operation + try { + TimeUnit.SECONDS.sleep(5); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + logger.info("Completing activity..."); + + return "OK"; + } +} diff --git a/spring-boot-examples/workflows/src/main/java/io/dapr/springboot/examples/wfp/suspendresume/SuspendResumeWorkflow.java b/spring-boot-examples/workflows/src/main/java/io/dapr/springboot/examples/wfp/suspendresume/SuspendResumeWorkflow.java new file mode 100644 index 000000000..b9578a687 --- /dev/null +++ b/spring-boot-examples/workflows/src/main/java/io/dapr/springboot/examples/wfp/suspendresume/SuspendResumeWorkflow.java @@ -0,0 +1,40 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.springboot.examples.wfp.suspendresume; + +import io.dapr.springboot.examples.wfp.externalevent.Decision; +import io.dapr.workflows.Workflow; +import io.dapr.workflows.WorkflowStub; +import org.springframework.stereotype.Component; + +@Component +public class SuspendResumeWorkflow implements Workflow { + @Override + public WorkflowStub create() { + return ctx -> { + ctx.getLogger().info("Starting Workflow: " + ctx.getName()); + + ctx.callActivity(PerformTaskActivity.class.getName(), String.class).await(); + + ctx.getLogger().info("Waiting for approval..."); + Boolean approved = ctx.waitForExternalEvent("Approval", boolean.class).await(); + + ctx.getLogger().info("approval-event arrived"); + + ctx.callActivity(PerformTaskActivity.class.getName(), String.class).await(); + + ctx.complete(new Decision(approved)); + }; + } +} diff --git a/spring-boot-examples/workflows/src/test/java/io/dapr/springboot/examples/wfp/WorkflowPatternsAppTests.java b/spring-boot-examples/workflows/src/test/java/io/dapr/springboot/examples/wfp/WorkflowPatternsAppTests.java new file mode 100644 index 000000000..6b2ed0501 --- /dev/null +++ b/spring-boot-examples/workflows/src/test/java/io/dapr/springboot/examples/wfp/WorkflowPatternsAppTests.java @@ -0,0 +1,204 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.springboot.examples.wfp; + +import io.dapr.springboot.DaprAutoConfiguration; +import io.dapr.springboot.examples.wfp.continueasnew.CleanUpLog; +import io.dapr.springboot.examples.wfp.remoteendpoint.Payload; +import io.dapr.workflows.client.WorkflowRuntimeStatus; +import io.github.microcks.testcontainers.MicrocksContainersEnsemble; +import io.restassured.RestAssured; +import io.restassured.http.ContentType; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; + +import java.util.Arrays; +import java.util.List; + +import static io.restassured.RestAssured.given; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +@SpringBootTest(classes = {TestWorkflowPatternsApplication.class, DaprTestContainersConfig.class, + DaprAutoConfiguration.class, }, + webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT) +class WorkflowPatternsAppTests { + + @Autowired + private MicrocksContainersEnsemble ensemble; + + @BeforeEach + void setUp() { + RestAssured.baseURI = "http://localhost:" + 8080; + org.testcontainers.Testcontainers.exposeHostPorts(8080); + } + + + @Test + void testChainWorkflow() { + given().contentType(ContentType.JSON) + .body("") + .when() + .post("/wfp/chain") + .then() + .statusCode(200).body(containsString("TOKYO, LONDON, SEATTLE")); + } + + @Test + void testChildWorkflow() { + given().contentType(ContentType.JSON) + .body("") + .when() + .post("/wfp/child") + .then() + .statusCode(200).body(containsString("!wolfkroW rpaD olleH")); + } + + @Test + void testFanOutIn() { + List listOfStrings = Arrays.asList( + "Hello, world!", + "The quick brown fox jumps over the lazy dog.", + "If a tree falls in the forest and there is no one there to hear it, does it make a sound?", + "The greatest glory in living lies not in never falling, but in rising every time we fall.", + "Always remember that you are absolutely unique. Just like everyone else."); + + given().contentType(ContentType.JSON) + .body(listOfStrings) + .when() + .post("/wfp/fanoutin") + .then() + .statusCode(200).body("wordCount",equalTo(60)); + } + + @Test + void testExternalEventApprove() { + + given() + .queryParam("orderId", "123") + .when() + .post("/wfp/externalevent") + .then() + .statusCode(200).extract().asString(); + + + + given() + .queryParam("orderId", "123") + .queryParam("decision", true) + .when() + .post("/wfp/externalevent-continue") + .then() + .statusCode(200).body("approved", equalTo(true)); + } + + @Test + void testExternalEventDeny() { + + given() + .queryParam("orderId", "123") + .when() + .post("/wfp/externalevent") + .then() + .statusCode(200).extract().asString(); + + + + given() + .queryParam("orderId", "123") + .queryParam("decision", false) + .when() + .post("/wfp/externalevent-continue") + .then() + .statusCode(200).body("approved", equalTo(false)); + } + + + @Test + void testContinueAsNew() { + //This call blocks until all the clean up activities are executed + CleanUpLog cleanUpLog = given().contentType(ContentType.JSON) + .body("") + .when() + .post("/wfp/continueasnew") + .then() + .statusCode(200).extract().as(CleanUpLog.class); + + assertEquals(5, cleanUpLog.getCleanUpTimes()); + } + + @Test + void testRemoteEndpoint() { + + Payload payload = given().contentType(ContentType.JSON) + .body(new Payload("123", "content goes here")) + .when() + .post("/wfp/remote-endpoint") + .then() + .statusCode(200).extract().as(Payload.class); + + assertEquals(true, payload.getProcessed()); + + assertEquals(2, ensemble.getMicrocksContainer() + .getServiceInvocationsCount("API Payload Processor", "1.0.0")); + } + + @Test + void testSuspendResume() { + + String instanceId = given() + .queryParam("orderId", "123") + .when() + .post("/wfp/suspendresume") + .then() + .statusCode(200).extract().asString(); + + assertNotNull(instanceId); + + // The workflow is waiting on an event, let's suspend the workflow + String state = given() + .queryParam("orderId", "123") + .when() + .post("/wfp/suspendresume/suspend") + .then() + .statusCode(200).extract().asString(); + + assertEquals(WorkflowRuntimeStatus.SUSPENDED.name(), state); + + // The let's resume the suspended workflow and check the state + state = given() + .queryParam("orderId", "123") + .when() + .post("/wfp/suspendresume/resume") + .then() + .statusCode(200).extract().asString(); + + assertEquals(WorkflowRuntimeStatus.RUNNING.name(), state); + + // Now complete the workflow by sending an event + given() + .queryParam("orderId", "123") + .queryParam("decision", false) + .when() + .post("/wfp/suspendresume/continue") + .then() + .statusCode(200).body("approved", equalTo(false)); + + } + +}