Adding Support for Suspend / Resume Workflows (#1405)

* adding IT test

Signed-off-by: salaboy <Salaboy@gmail.com>

* adding initial version of suspend/resume example

Signed-off-by: salaboy <Salaboy@gmail.com>

* updating README

Signed-off-by: salaboy <Salaboy@gmail.com>

* Update README.md

Signed-off-by: salaboy <Salaboy@gmail.com>

* following Javi's suggestion

Signed-off-by: salaboy <Salaboy@gmail.com>

* fixing wrong year in headers

Signed-off-by: salaboy <Salaboy@gmail.com>

* fixing paths in one more README.md file

Signed-off-by: salaboy <Salaboy@gmail.com>

* adding output validation

Signed-off-by: salaboy <Salaboy@gmail.com>

* adding missing port

Signed-off-by: salaboy <Salaboy@gmail.com>

* fixing check conditions

Signed-off-by: salaboy <Salaboy@gmail.com>

---------

Signed-off-by: salaboy <Salaboy@gmail.com>
Co-authored-by: Cassie Coyle <cassie.i.coyle@gmail.com>
This commit is contained in:
salaboy 2025-06-11 22:52:29 +08:00 committed by GitHub
parent e13f934efe
commit dcaca773b3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 517 additions and 7 deletions

View File

@ -420,7 +420,6 @@ client.raiseEvent(instanceId, "Approval", true);
Start the workflow and client using the following commands:
ex
```sh
dapr run --app-id demoworkflowworker --resources-path ./components/workflows -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.externalevent.DemoExternalEventWorker
```
@ -652,4 +651,60 @@ Key Points:
1. Each successful booking step adds its compensation action to an ArrayList
2. If an error occurs, the list of compensations is reversed and executed in reverse order
3. The workflow ensures that all resources are properly cleaned up even if the process fails
4. Each activity simulates work with a short delay for demonstration purposes
4. Each activity simulates work with a short delay for demonstration purposes
### Suspend/Resume Pattern
Workflow instances can be suspended and resumed. This example shows how to use the suspend and resume commands.
For testing the suspend and resume operations we will use the same workflow definition used by the DemoExternalEventWorkflow.
Start the workflow and client using the following commands:
<!-- STEP
name: Run Suspend/Resume workflow
match_order: none
output_match_mode: substring
expected_stdout_lines:
- "Waiting for approval..."
- "Suspending Workflow Instance"
- "Workflow Instance Status: SUSPENDED"
- "Let's resume the Workflow Instance before sending the external event"
- "Workflow Instance Status: RUNNING"
- "Now that the instance is RUNNING again, lets send the external event."
- "approval granted - do the approved action"
- "Starting Activity: io.dapr.examples.workflows.externalevent.ApproveActivity"
- "Running approval activity..."
- "approval-activity finished"
background: true
sleep: 60
timeout_seconds: 60
-->
```sh
dapr run --app-id demoworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.suspendresume.DemoSuspendResumeWorker
```
```sh
java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.suspendresume.DemoSuspendResumeClient
```
<!-- END_STEP -->
The worker logs:
```text
== APP == 2023-11-07 16:01:23,279 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.examples.workflows.suspendresume.DemoExternalEventWorkflow
== APP == 2023-11-07 16:01:23,279 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - Waiting for approval...
== APP == 2023-11-07 16:01:23,324 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - approval granted - do the approved action
== APP == 2023-11-07 16:01:23,348 {HH:mm:ss.SSS} [main] INFO i.d.e.w.e.ApproveActivity - Starting Activity: io.dapr.examples.workflows.externalevent.ApproveActivity
== APP == 2023-11-07 16:01:23,348 {HH:mm:ss.SSS} [main] INFO i.d.e.w.e.ApproveActivity - Running approval activity...
== APP == 2023-11-07 16:01:28,410 {HH:mm:ss.SSS} [main] INFO io.dapr.workflows.WorkflowContext - approval-activity finished
```
The client log:
```text
Started a new external-event model workflow with instance ID: 23410d96-1afe-4698-9fcd-c01c1e0db255
workflow instance with ID: 23410d96-1afe-4698-9fcd-c01c1e0db255 completed.
```

View File

@ -0,0 +1,59 @@
/*
* Copyright 2025 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.examples.workflows.suspendresume;
import io.dapr.examples.workflows.externalevent.DemoExternalEventWorkflow;
import io.dapr.workflows.client.DaprWorkflowClient;
import io.dapr.workflows.client.WorkflowInstanceStatus;
import java.util.concurrent.TimeoutException;
public class DemoSuspendResumeClient {
/**
* The main method to start the client.
*
* @param args Input arguments (unused).
* @throws InterruptedException If program has been interrupted.
*/
public static void main(String[] args) {
try (DaprWorkflowClient client = new DaprWorkflowClient()) {
String instanceId = client.scheduleNewWorkflow(DemoExternalEventWorkflow.class);
System.out.printf("Started a new external-event workflow with instance ID: %s%n", instanceId);
System.out.printf("Suspending Workflow Instance: %s%n", instanceId );
client.suspendWorkflow(instanceId, "suspending workflow instance.");
WorkflowInstanceStatus instanceState = client.getInstanceState(instanceId, false);
assert instanceState != null;
System.out.printf("Workflow Instance Status: %s%n", instanceState.getRuntimeStatus().name() );
System.out.printf("Let's resume the Workflow Instance before sending the external event: %s%n", instanceId );
client.resumeWorkflow(instanceId, "resuming workflow instance.");
instanceState = client.getInstanceState(instanceId, false);
assert instanceState != null;
System.out.printf("Workflow Instance Status: %s%n", instanceState.getRuntimeStatus().name() );
System.out.printf("Now that the instance is RUNNING again, lets send the external event. %n");
client.raiseEvent(instanceId, "Approval", true);
client.waitForInstanceCompletion(instanceId, null, true);
System.out.printf("workflow instance with ID: %s completed.", instanceId);
} catch (TimeoutException | InterruptedException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -0,0 +1,40 @@
/*
* Copyright 2025 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.examples.workflows.suspendresume;
import io.dapr.examples.workflows.externalevent.ApproveActivity;
import io.dapr.examples.workflows.externalevent.DemoExternalEventWorkflow;
import io.dapr.examples.workflows.externalevent.DenyActivity;
import io.dapr.workflows.runtime.WorkflowRuntime;
import io.dapr.workflows.runtime.WorkflowRuntimeBuilder;
public class DemoSuspendResumeWorker {
/**
* The main method of this app.
*
* @param args The port the app will listen on.
* @throws Exception An Exception.
*/
public static void main(String[] args) throws Exception {
// Register the Workflow with the builder.
WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(DemoExternalEventWorkflow.class);
builder.registerActivity(ApproveActivity.class);
builder.registerActivity(DenyActivity.class);
// Build and then start the workflow runtime pulling and executing tasks
WorkflowRuntime runtime = builder.build();
System.out.println("Start workflow runtime");
runtime.start();
}
}

View File

@ -20,6 +20,7 @@ import io.dapr.testcontainers.DaprContainer;
import io.dapr.testcontainers.DaprLogLevel;
import io.dapr.workflows.client.DaprWorkflowClient;
import io.dapr.workflows.client.WorkflowInstanceStatus;
import io.dapr.workflows.client.WorkflowRuntimeStatus;
import io.dapr.workflows.runtime.WorkflowRuntime;
import io.dapr.workflows.runtime.WorkflowRuntimeBuilder;
import org.junit.jupiter.api.BeforeEach;
@ -117,6 +118,36 @@ public class DaprWorkflowsIT {
assertEquals(instanceId, workflowOutput.getWorkflowId());
}
@Test
public void testSuspendAndResumeWorkflows() throws Exception {
TestWorkflowPayload payload = new TestWorkflowPayload(new ArrayList<>());
String instanceId = workflowClient.scheduleNewWorkflow(TestWorkflow.class, payload);
workflowClient.waitForInstanceStart(instanceId, Duration.ofSeconds(10), false);
workflowClient.suspendWorkflow(instanceId, "testing suspend.");
WorkflowInstanceStatus instanceState = workflowClient.getInstanceState(instanceId, false);
assertNotNull(instanceState);
assertEquals(WorkflowRuntimeStatus.SUSPENDED, instanceState.getRuntimeStatus());
workflowClient.resumeWorkflow(instanceId, "testing resume");
instanceState = workflowClient.getInstanceState(instanceId, false);
assertNotNull(instanceState);
assertEquals(WorkflowRuntimeStatus.RUNNING, instanceState.getRuntimeStatus());
workflowClient.raiseEvent(instanceId, "MoveForward", payload);
Duration timeout = Duration.ofSeconds(10);
instanceState = workflowClient.waitForInstanceCompletion(instanceId, timeout, true);
assertNotNull(instanceState);
assertEquals(WorkflowRuntimeStatus.COMPLETED, instanceState.getRuntimeStatus());
}
private TestWorkflowPayload deserialize(String value) throws JsonProcessingException {
return OBJECT_MAPPER.readValue(value, TestWorkflowPayload.class);
}

View File

@ -129,6 +129,26 @@ public class DaprWorkflowClient implements AutoCloseable {
orchestrationInstanceOptions);
}
/**
* Suspend the workflow associated with the provided instance id.
*
* @param workflowInstanceId Workflow instance id to suspend.
* @param reason reason for suspending the workflow instance.
*/
public void suspendWorkflow(String workflowInstanceId, @Nullable String reason) {
this.innerClient.suspendInstance(workflowInstanceId, reason);
}
/**
* Resume the workflow associated with the provided instance id.
*
* @param workflowInstanceId Workflow instance id to resume.
* @param reason reason for resuming the workflow instance.
*/
public void resumeWorkflow(String workflowInstanceId, @Nullable String reason) {
this.innerClient.resumeInstance(workflowInstanceId, reason);
}
/**
* Terminates the workflow associated with the provided instance id.
*

View File

@ -217,6 +217,17 @@ public class DaprWorkflowClientTest {
expectedEventName, expectedEventPayload);
}
@Test
public void suspendResumeInstance() {
String expectedArgument = "TestWorkflowInstanceId";
client.suspendWorkflow(expectedArgument, "suspending workflow instance");
client.resumeWorkflow(expectedArgument, "resuming workflow instance");
verify(mockInnerClient, times(1)).suspendInstance(expectedArgument,
"suspending workflow instance");
verify(mockInnerClient, times(1)).resumeInstance(expectedArgument,
"resuming workflow instance");
}
@Test
public void purgeInstance() {
String expectedArgument = "TestWorkflowInstanceId";

View File

@ -5,6 +5,7 @@ This application allows you to run different [workflow patterns](https://docs.da
- Parent/Child Workflows
- Continue workflow by sending External Events
- Fan Out/In activities for parallel execution
- Suspend/Resume workflows
## Running these examples from source code
@ -392,8 +393,141 @@ i.d.s.e.wfp.fanoutin.CountWordsActivity : Activity finished
io.dapr.workflows.WorkflowContext : Workflow finished with result: 60
```
### Suspend/Resume Workflow example
In this example, we start a workflow that executes an activity and then waits for an event. While the workflow instance
is waiting for the event, we execute a suspend workflow operation. Once we check the state of the instance, a resume
operation is executed.
To start the workflow, you can run:
<!-- STEP
name: Start Suspend/Resume Workflow
match_order: none
output_match_mode: substring
background: true
sleep: 1
timeout_seconds: 2
-->
<!-- Timeout for above service must be more than sleep + timeout for the client-->
```sh
curl -X POST "localhost:8080/wfp/suspendresume?orderId=123" -H 'Content-Type: application/json'
```
<!-- END_STEP -->
In the application output you should see the workflow activities being executed.
```bash
io.dapr.workflows.WorkflowContext : Starting Workflow: io.dapr.springboot.examples.wfp.suspendresume.SuspendResumeWorkflow
i.d.s.e.w.WorkflowPatternsRestController : Workflow instance 2de2b968-900a-4f5b-9092-b26aefbfc6b3 started
i.d.s.e.w.s.PerformTaskActivity : Starting Activity: io.dapr.springboot.examples.wfp.suspendresume.PerformTaskActivity
i.d.s.e.w.s.PerformTaskActivity : Running activity...
i.d.s.e.w.s.PerformTaskActivity : Completing activity...
io.dapr.workflows.WorkflowContext : Waiting for approval...
```
You should see the Workflow ID that was created, in this example you don't need to remember this id,
as you can use the orderId to find the right instance.
<!-- STEP
name: Suspend Workflow Operation
match_order: none
output_match_mode: substring
expected_stdout_lines:
- 'SUSPENDED'
background: true
sleep: 5
timeout_seconds: 10
-->
<!-- Timeout for above service must be more than sleep + timeout for the client-->
Let's suspend the workflow instance by sending the following request:
```sh
curl -X POST "localhost:8080/wfp/suspendresume/suspend?orderId=123" -H 'Content-Type: application/json'
```
<!-- END_STEP -->
You should see the output of the requests:
```sh
SUSPENDED
```
Now, let's resume the workflow instance:
<!-- STEP
name: Resume Workflow Operation
match_order: none
output_match_mode: substring
expected_stdout_lines:
- 'RUNNING'
background: true
sleep: 5
timeout_seconds: 10
-->
<!-- Timeout for above service must be more than sleep + timeout for the client-->
To send the event you can run:
```sh
curl -X POST "localhost:8080/wfp/suspendresume/resume?orderId=123" -H 'Content-Type: application/json'
```
<!-- END_STEP -->
You should see the output of the requests:
```sh
RUNNING
```
Now, let's send the event that the instance is waiting to validate that the workflow complete after
being suspended and resumed.
<!-- STEP
name: Send External Event
match_order: none
output_match_mode: substring
expected_stdout_lines:
- '{"approved":true}'
background: true
sleep: 5
timeout_seconds: 10
-->
<!-- Timeout for above service must be more than sleep + timeout for the client-->
To send the event you can run:
```sh
curl -X POST "localhost:8080/wfp/suspendresume/continue?orderId=123&decision=true" -H 'Content-Type: application/json'
```
<!-- END_STEP -->
The output of the request contains the output of the workflow based on the `decision` parameter that we sent.
```bash
{"approved":true}
```
In the application output you should see, that the workflow instance completed correctly:
```sh
i.d.s.e.w.WorkflowPatternsRestController : Workflow instance 2de2b968-900a-4f5b-9092-b26aefbfc6b3 continue
io.dapr.workflows.WorkflowContext : approval-event arrived
i.d.s.e.w.s.PerformTaskActivity : Starting Activity: io.dapr.springboot.examples.wfp.suspendresume.PerformTaskActivity
i.d.s.e.w.s.PerformTaskActivity : Running activity...
i.d.s.e.w.s.PerformTaskActivity : Completing activity...
```
## Testing workflow executions
Workflow execution can be tested using Testcontainers and you can find all the tests for the patterns covered in this
application [here](test/java/io/dapr/springboot/examples/wfp/TestWorkflowPatternsApplication.java).
application [here](test/java/io/dapr/springboot/examples/wfp/TestWorkflowPatternsApplication.java).

View File

@ -24,6 +24,7 @@ import io.dapr.springboot.examples.wfp.fanoutin.FanOutInWorkflow;
import io.dapr.springboot.examples.wfp.fanoutin.Result;
import io.dapr.springboot.examples.wfp.remoteendpoint.Payload;
import io.dapr.springboot.examples.wfp.remoteendpoint.RemoteEndpointWorkflow;
import io.dapr.springboot.examples.wfp.suspendresume.SuspendResumeWorkflow;
import io.dapr.workflows.client.DaprWorkflowClient;
import io.dapr.workflows.client.WorkflowInstanceStatus;
import org.slf4j.Logger;
@ -153,4 +154,39 @@ public class WorkflowPatternsRestController {
return workflowInstanceStatus.readOutputAs(Payload.class);
}
@PostMapping("wfp/suspendresume")
public String suspendResume(@RequestParam("orderId") String orderId) {
String instanceId = daprWorkflowClient.scheduleNewWorkflow(SuspendResumeWorkflow.class);
logger.info("Workflow instance " + instanceId + " started");
ordersToApprove.put(orderId, instanceId);
return instanceId;
}
@PostMapping("wfp/suspendresume/suspend")
public String suspendResumeExecuteSuspend(@RequestParam("orderId") String orderId) {
String instanceId = ordersToApprove.get(orderId);
daprWorkflowClient.suspendWorkflow(instanceId, "testing suspend");
WorkflowInstanceStatus instanceState = daprWorkflowClient.getInstanceState(instanceId, false);
return instanceState.getRuntimeStatus().name();
}
@PostMapping("wfp/suspendresume/resume")
public String suspendResumeExecuteResume(@RequestParam("orderId") String orderId) {
String instanceId = ordersToApprove.get(orderId);
daprWorkflowClient.resumeWorkflow(instanceId, "testing resume");
WorkflowInstanceStatus instanceState = daprWorkflowClient.getInstanceState(instanceId, false);
return instanceState.getRuntimeStatus().name();
}
@PostMapping("wfp/suspendresume/continue")
public Decision suspendResumeContinue(@RequestParam("orderId") String orderId, @RequestParam("decision") Boolean decision)
throws TimeoutException {
String instanceId = ordersToApprove.get(orderId);
logger.info("Workflow instance " + instanceId + " continue");
daprWorkflowClient.raiseEvent(instanceId, "Approval", decision);
WorkflowInstanceStatus workflowInstanceStatus = daprWorkflowClient
.waitForInstanceCompletion(instanceId, null, true);
return workflowInstanceStatus.readOutputAs(Decision.class);
}
}

View File

@ -0,0 +1,43 @@
/*
* Copyright 2025 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.springboot.examples.wfp.suspendresume;
import io.dapr.workflows.WorkflowActivity;
import io.dapr.workflows.WorkflowActivityContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
@Component
public class PerformTaskActivity implements WorkflowActivity {
@Override
public Object run(WorkflowActivityContext ctx) {
Logger logger = LoggerFactory.getLogger(PerformTaskActivity.class);
logger.info("Starting Activity: " + ctx.getName());
logger.info("Running activity...");
//Sleeping for 5 seconds to simulate long running operation
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
logger.info("Completing activity...");
return "OK";
}
}

View File

@ -0,0 +1,40 @@
/*
* Copyright 2025 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.springboot.examples.wfp.suspendresume;
import io.dapr.springboot.examples.wfp.externalevent.Decision;
import io.dapr.workflows.Workflow;
import io.dapr.workflows.WorkflowStub;
import org.springframework.stereotype.Component;
@Component
public class SuspendResumeWorkflow implements Workflow {
@Override
public WorkflowStub create() {
return ctx -> {
ctx.getLogger().info("Starting Workflow: " + ctx.getName());
ctx.callActivity(PerformTaskActivity.class.getName(), String.class).await();
ctx.getLogger().info("Waiting for approval...");
Boolean approved = ctx.waitForExternalEvent("Approval", boolean.class).await();
ctx.getLogger().info("approval-event arrived");
ctx.callActivity(PerformTaskActivity.class.getName(), String.class).await();
ctx.complete(new Decision(approved));
};
}
}

View File

@ -13,10 +13,10 @@ limitations under the License.
package io.dapr.springboot.examples.wfp;
import io.dapr.client.DaprClient;
import io.dapr.springboot.DaprAutoConfiguration;
import io.dapr.springboot.examples.wfp.continueasnew.CleanUpLog;
import io.dapr.springboot.examples.wfp.remoteendpoint.Payload;
import io.dapr.workflows.client.WorkflowRuntimeStatus;
import io.github.microcks.testcontainers.MicrocksContainersEnsemble;
import io.restassured.RestAssured;
import io.restassured.http.ContentType;
@ -32,15 +32,13 @@ import static io.restassured.RestAssured.given;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@SpringBootTest(classes = {TestWorkflowPatternsApplication.class, DaprTestContainersConfig.class,
DaprAutoConfiguration.class, },
webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
class WorkflowPatternsAppTests {
@Autowired
private DaprClient daprClient;
@Autowired
private MicrocksContainersEnsemble ensemble;
@ -160,4 +158,47 @@ class WorkflowPatternsAppTests {
.getServiceInvocationsCount("API Payload Processor", "1.0.0"));
}
@Test
void testSuspendResume() {
String instanceId = given()
.queryParam("orderId", "123")
.when()
.post("/wfp/suspendresume")
.then()
.statusCode(200).extract().asString();
assertNotNull(instanceId);
// The workflow is waiting on an event, let's suspend the workflow
String state = given()
.queryParam("orderId", "123")
.when()
.post("/wfp/suspendresume/suspend")
.then()
.statusCode(200).extract().asString();
assertEquals(WorkflowRuntimeStatus.SUSPENDED.name(), state);
// The let's resume the suspended workflow and check the state
state = given()
.queryParam("orderId", "123")
.when()
.post("/wfp/suspendresume/resume")
.then()
.statusCode(200).extract().asString();
assertEquals(WorkflowRuntimeStatus.RUNNING.name(), state);
// Now complete the workflow by sending an event
given()
.queryParam("orderId", "123")
.queryParam("decision", false)
.when()
.post("/wfp/suspendresume/continue")
.then()
.statusCode(200).body("approved", equalTo(false));
}
}