Add workflow management

Signed-off-by: Marc Duiker <marcduiker@users.noreply.github.com>
This commit is contained in:
Marc Duiker 2025-05-08 11:59:26 +02:00
parent fee1a022f8
commit 00a4e38743
No known key found for this signature in database
GPG Key ID: 6A36EA7754473DD7
9 changed files with 158 additions and 2 deletions

View File

@ -13,7 +13,7 @@ For more information on workflow management, see the [Dapr docs](https://docs.da
## Inspect the code
Open the `Program.cs` file in the `tutorials/workflow/csharp/child-workflows/WorkflowManagement` folder. This file contains the endpoint definitions that use the workflow management API. The workflow that is being managed is named `NeverEndingWorkflow` and is a counter that will keep running once it's started.
Open the `Program.cs` file in the `tutorials/workflow/csharp/workflow-management/WorkflowManagement` folder. This file contains the endpoint definitions that use the workflow management API. The workflow that is being managed is named `NeverEndingWorkflow` and is a counter that will keep running once it's started.
## Run the tutorial

View File

@ -1,4 +1,3 @@
from dataclasses import dataclass
from datetime import timedelta
import random

View File

@ -0,0 +1,51 @@
# Workflow Management
This tutorial demonstrates the various APIs to manage a workflow instance, these methods include:
- Scheduling a workflow instance
- Getting the workflow instance state
- Suspending the workflow instance
- Resuming the workflow instance
- Terminating the workflow instance
- Purging the workflow instance
For more information on workflow management, see the [Dapr docs](https://docs.dapr.io/developing-applications/building-blocks/workflow/howto-manage-workflow/).
## Inspect the code
Open the `app.py` file in the `tutorials/workflow/python/workflow-management/workflow_-_management/workflow_management` folder. This file contains the endpoint definitions that use the workflow management API. The workflow that is being managed is named `never_ending_workflow` and is a counter that will keep running once it's started.
## Run the tutorial
1. Use a terminal to navigate to the `tutorials/workflow/python/workflow-management/workflow-management` folder.
2. Install the dependencies using pip:
```bash
pip3 install -r requirements.txt
```
3. Navigate one level back to the `workflow-management` folder and use the Dapr CLI to run the Dapr Multi-App run file
<!-- STEP
name: Run multi app run template
expected_stdout_lines:
- 'Started Dapr with app id "neverendingworkflow"'
expected_stderr_lines:
working_dir: .
output_match_mode: substring
background: true
sleep: 15
timeout_seconds: 30
-->
```bash
dapr run -f .
```
<!-- END_STEP -->
4. Use the first POST request in the [`workflowmanagement.http`](./workflowmanagement.http) file to start the workflow.
5. Use other requests in the [`workflowmanagement.http`](./workflowmanagement.http) file to perform other actions on the workflow, such as:
- Getting the workflow instance state.
- Suspending & resuming the workflow instance.
- Terminating the workflow instance.
- Purging the workflow instance.
6. Stop the Dapr Multi-App run process by pressing `Ctrl+C`.

View File

@ -0,0 +1,11 @@
version: 1
common:
resourcesPath: ../../resources
apps:
- appID: neverendingworkflow
appDirPath: workflow_management
appPort: 5262
daprHTTPPort: 3562
command: ["python3", "app.py"]
appLogDestination: console
daprdLogDestination: console

View File

@ -0,0 +1,2 @@
include ../../../../docker.mk
include ../../../../validate.mk

View File

@ -0,0 +1,46 @@
from fastapi import FastAPI, status
from contextlib import asynccontextmanager
from never_ending_workflow import wf_runtime, never_ending_workflow
import dapr.ext.workflow as wf
import uvicorn
@asynccontextmanager
async def lifespan(app: FastAPI):
wf_runtime.start()
yield
wf_runtime.shutdown()
app = FastAPI(lifespan=lifespan)
wf_client = wf.DaprWorkflowClient()
@app.post("/start/{counter}", status_code=status.HTTP_202_ACCEPTED)
async def start_workflow(counter: int):
instance_id = wf_client.schedule_new_workflow(
workflow=never_ending_workflow,
input=counter
)
return {"instance_id": instance_id}
@app.get("/status/{instance_id}")
async def get_status(instance_id: str):
wf_status = wf_client.get_workflow_state(instance_id)
return wf_status
@app.post("/suspend/{instance_id}", status_code=status.HTTP_202_ACCEPTED)
async def suspend_workflow(instance_id: str):
wf_client.pause_workflow(instance_id);
@app.post("/resume/{instance_id}", status_code=status.HTTP_202_ACCEPTED)
async def resume_workflow(instance_id: str):
wf_client.resume_workflow(instance_id);
@app.post("/terminate/{instance_id}", status_code=status.HTTP_202_ACCEPTED)
async def terminate_workflow(instance_id: str):
wf_client.terminate_workflow(instance_id);
@app.delete("/purge/{instance_id}", status_code=status.HTTP_202_ACCEPTED)
async def purge_workflow(instance_id: str):
wf_client.purge_workflow(instance_id);
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=5262)

View File

@ -0,0 +1,18 @@
from datetime import timedelta
import dapr.ext.workflow as wf
wf_runtime = wf.WorkflowRuntime()
@wf_runtime.workflow(name='never_ending_workflow')
def never_ending_workflow(ctx: wf.DaprWorkflowContext, counter: int):
yield ctx.call_activity(send_notification, input=counter)
yield ctx.create_timer(fire_at=timedelta(seconds=2))
counter += 1
yield ctx.continue_as_new(counter)
return True
@wf_runtime.activity(name='send_notification')
def send_notification(ctx: wf.WorkflowActivityContext, counter: int) -> None:
print(f'send_notification: Received input: {counter}.', flush=True)
# Imagine that a notification is sent to the user here.

View File

@ -0,0 +1,3 @@
dapr>=1.15.0
dapr-ext-workflow>=1.15.0
fastapi>=0.115.0

View File

@ -0,0 +1,26 @@
@apphost=http://localhost:5262
### Start the never_ending_workflow
# @name startWorkflowRequest
@counter=0
POST {{ apphost }}/start/{{counter}}
### Get the workflow status via the Dapr API
@instanceId={{startWorkflowRequest.response.body.instance_id}}
@daprHost=http://localhost:3562
GET {{ daprHost }}/v1.0/workflows/dapr/{{ instanceId }}
### Get the workflow status via the application
GET {{ apphost }}/status/{{ instanceId }}
### Suspend the workflow
POST {{ apphost }}/suspend/{{ instanceId }}
### Resume the workflow
POST {{ apphost }}/resume/{{ instanceId }}
### Terminate the workflow
POST {{ apphost }}/terminate/{{ instanceId }}
### Purge the workflow
DELETE {{ apphost }}/purge/{{ instanceId }}