Add external events, use flush true for print statements

Signed-off-by: Marc Duiker <marcduiker@users.noreply.github.com>
This commit is contained in:
Marc Duiker 2025-05-08 10:22:44 +02:00
parent 5324c1621f
commit cd27d4d8d0
No known key found for this signature in database
GPG Key ID: 6A36EA7754473DD7
11 changed files with 269 additions and 8 deletions

View File

@ -0,0 +1,126 @@
# External Events
This tutorial demonstrates how to author a workflow where the workflow will wait until it receives an external event. This pattern is often applied in workflows that require an approval step. For more information about the external system interaction pattern see the [Dapr docs](https://docs.dapr.io/developing-applications/building-blocks/workflow/workflow-patterns/#external-system-interaction).
## Inspect the code
Open the `external_events_workflow.py` file in the `tutorials/workflow/python/external-system-interaction/external_events` folder. This file contains the definition for the workflow. It is an order workflow that requests an external approval if the order has a total price greater than 250 dollars.
```mermaid
graph LR
SW((Start
Workflow))
IF1{Is TotalPrice
> 250?}
IF2{Is Order Approved
or TotalPrice < 250?}
WAIT[Wait for
approval event]
EX{Event
received?}
PO[Process Order]
SN[Send Notification]
EW((End
Workflow))
SW --> IF1
IF1 -->|Yes| WAIT
IF1 -->|No| IF2
EX -->|Yes| IF2
EX -->|No| SN
WAIT --> EX
IF2 -->|Yes| PO
PO --> SN
IF2 -->|No| SN
SN --> EW
```
## Run the tutorial
1. Use a terminal to navigate to the `tutorials/workflow/python/external-system-interaction/external_events` folder.
2. Install the dependencies using pip:
```bash
pip3 install -r requirements.txt
```
3. Navigate one level back to the `external-system-interaction` folder and use the Dapr CLI to run the Dapr Multi-App run file
<!-- STEP
name: Run multi app run template
expected_stdout_lines:
- 'Started Dapr with app id "externalevents"'
expected_stderr_lines:
working_dir: .
output_match_mode: substring
background: true
sleep: 15
timeout_seconds: 30
-->
```bash
dapr run -f .
```
<!-- END_STEP -->
4. Use the POST request in the [`externalevents.http`](./externalevents.http) file to start the workflow, or use this cURL command:
```bash
curl -i --request POST \
--url http://localhost:5258/start \
--header 'content-type: application/json' \
--data '{"id": "b7dd836b-e913-4446-9912-d400befebec5","description": "Rubber ducks","quantity": 100,"totalPrice": 500}'
```
The input for the workflow is an `Order` object:
```json
{
"id": "{{orderId}}",
"description": "Rubber ducks",
"quantity": 100,
"totalPrice": 500
}
```
5. Use the GET request in the [`externalevents.http`](./externalevents.http) file to get the status of the workflow, or use this cURL command:
```bash
curl --request GET --url http://localhost:3558/v1.0/workflows/dapr/b7dd836b-e913-4446-9912-d400befebec5
```
The workflow should still be running since it is waiting for an external event.
The app logs should look similar to the following:
```text
== APP - externalevents == Received order: Order { Id = b7dd836b-e913-4446-9912-d400befebec5, Description = Rubber ducks, Quantity = 100, TotalPrice = 500 }.
```
6. Use the POST request in the [`externalevents.http`](./externalevents.http) file to send an `approval-event` to the workflow, or use this cURL command:
```bash
curl -i --request POST \
--url http://localhost:3558/v1.0/workflows/dapr/b7dd836b-e913-4446-9912-d400befebec5/raiseEvent/approval-event \
--header 'content-type: application/json' \
--data '{"order_id": "b7dd836b-e913-4446-9912-d400befebec5","is_approved": true}'
```
The payload for the event is an `ApprovalStatus` object:
```json
{
"order_id": "{{instanceId}}",
"is_approved": true
}
```
*The workflow will only wait for the external approval event for 2 minutes before timing out. In this case you will need to start a new order workflow instance.*
7. Again use the GET request in the [`externalevents.http`](./externalevents.http) file to get the status of the workflow. The workflow should now be completed.
The expected serialized output of the workflow is:
```txt
"\"Order b7dd836b-e913-4446-9912-d400befebec5 has been approved.\""
```
8. Stop the Dapr Multi-App run process by pressing `Ctrl+C`.

View File

@ -0,0 +1,11 @@
version: 1
common:
resourcesPath: ../../resources
apps:
- appID: externalevents
appDirPath: external_events
appPort: 5258
daprHTTPPort: 3558
command: ["python3", "app.py"]
appLogDestination: console
daprdLogDestination: console

View File

@ -0,0 +1,29 @@
from fastapi import FastAPI, status
from contextlib import asynccontextmanager
from external_events_workflow import wf_runtime, external_events_workflow, Order
import dapr.ext.workflow as wf
import uvicorn
@asynccontextmanager
async def lifespan(app: FastAPI):
wf_runtime.start()
yield
wf_runtime.shutdown()
app = FastAPI(lifespan=lifespan)
@app.post("/start", status_code=status.HTTP_202_ACCEPTED)
async def start_workflow(order: Order):
"""
The DaprWorkflowClient is the API to manage workflows.
Here it is used to schedule a new workflow instance.
"""
wf_client = wf.DaprWorkflowClient()
instance_id = wf_client.schedule_new_workflow(
workflow=external_events_workflow,
input=order
)
return {"instance_id": instance_id}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=5258, log_level="debug")

View File

@ -0,0 +1,61 @@
from dataclasses import dataclass
from datetime import timedelta
import dapr.ext.workflow as wf
wf_runtime = wf.WorkflowRuntime()
@dataclass
class ApprovalStatus:
order_id: str
is_approved: bool
@staticmethod
def from_dict(dict):
return ApprovalStatus(**dict)
@dataclass
class Order:
id: str
description: str
quantity: int
total_price: float
@wf_runtime.workflow(name='external_events_workflow')
def external_events_workflow(ctx: wf.DaprWorkflowContext, order: Order):
approval_status = ApprovalStatus(order.id, True)
if order.total_price > 250:
yield ctx.call_activity(request_approval, input=order)
approval_status_task = ctx.wait_for_external_event(name='approval-event')
timeout_task = ctx.create_timer(fire_at=timedelta(minutes=2))
winner = yield wf.when_any([approval_status_task, timeout_task])
if winner == timeout_task:
notification_message = f"Approval request for order {order.id} timed out."
yield ctx.call_activity(send_notification, input=order)
return notification_message
approval_status = ApprovalStatus.from_dict(approval_status_task.get_result())
if approval_status.is_approved:
yield ctx.call_activity(process_order, input=order)
notification_message = f"Order {order.id} has been approved." if approval_status.is_approved else f"Order {order.id} has been rejected."
yield ctx.call_activity(send_notification, input=notification_message)
return notification_message
@wf_runtime.activity(name='request_approval')
def request_approval(ctx: wf.WorkflowActivityContext, order: Order) -> None:
print(f'request_approval: Request approval for order: {order.id}.', flush=True)
# Imagine an approval request being sent to another system
@wf_runtime.activity(name='send_notification')
def send_notification(ctx: wf.WorkflowActivityContext, message: str) -> None:
print(f'send_notification: {message}.', flush=True)
# Imagine a notification being sent to the user
@wf_runtime.activity(name='process_order')
def process_order(ctx: wf.WorkflowActivityContext, order: Order) -> None:
print(f'process_order: Processed order: {order.id}.', flush=True)
# Imagine the order being processed by another system

View File

@ -0,0 +1,3 @@
dapr>=1.15.0
dapr-ext-workflow>=1.15.0
fastapi>=0.115.0

View File

@ -0,0 +1,30 @@
@apphost=http://localhost:5258
### Start the external_events_workflow
# @name startWorkflowRequest
@orderId={{$guid}}
POST {{ apphost }}/start
Content-Type: application/json
{
"id": "{{orderId}}",
"description": "Rubber ducks",
"quantity": 100,
"total_price": 500
}
### Get the workflow status
@instanceId={{startWorkflowRequest.response.body.instance_id}}
@daprHost=http://localhost:3558
GET {{ daprHost }}/v1.0/workflows/dapr/{{ instanceId }}
### Send an approval event
@eventName=approval-event
POST {{ daprHost }}/v1.0/workflows/dapr/{{ instanceId }}/raiseEvent/{{ eventName }}
Content-Type: application/json
{
"order_id": "{{instanceId}}",
"is_approved": true
}

View File

@ -0,0 +1,2 @@
include ../../../../docker.mk
include ../../../../validate.mk

View File

@ -22,5 +22,5 @@ def fanoutfanin_workflow(ctx: wf.DaprWorkflowContext, words: List[str]):
@wf_runtime.activity(name='get_word_length')
def get_word_length(ctx: wf.WorkflowActivityContext, word: str) -> WordLength:
print(f'get_word_length: Received input: {word}.')
print(f'get_word_length: Received input: {word}.', flush=True)
return WordLength(word=word, length=len(word))

View File

@ -30,10 +30,10 @@ There can only be one input parameter. Use a class if multiple input values are
"""
@wf_runtime.activity(name='activity1')
def activity1(ctx: wf.WorkflowActivityContext, act_input: str) -> str:
print(f'activity1: Received input: {act_input}.')
print(f'activity1: Received input: {act_input}.', flush=True)
return f"{act_input} Two"
@wf_runtime.activity(name='activity2')
def activity2(ctx: wf.WorkflowActivityContext, act_input: str) -> str:
print(f'activity2: Received input: {act_input}.')
print(f'activity2: Received input: {act_input}.', flush=True)
return f"{act_input} Three"

View File

@ -23,6 +23,6 @@ def monitor_workflow(ctx: wf.DaprWorkflowContext, counter: int):
@wf_runtime.activity(name='check_status')
def check_status(ctx: wf.WorkflowActivityContext, act_input: int) -> Status:
print(f'check_status: Received input: {act_input}.')
print(f'check_status: Received input: {act_input}.', flush=True)
isReady = True if random.randint(0, act_input) > 1 else False
return Status(isReady=isReady)

View File

@ -1,4 +1,3 @@
import dapr.ext.workflow as wf
wf_runtime = wf.WorkflowRuntime()
@ -12,15 +11,15 @@ def chaining_workflow(ctx: wf.DaprWorkflowContext, wf_input: str):
@wf_runtime.activity(name='activity1')
def activity1(ctx: wf.WorkflowActivityContext, act_input: str) -> str:
print(f'activity1: Received input: {act_input}.')
print(f'activity1: Received input: {act_input}.', flush=True)
return f"{act_input} is"
@wf_runtime.activity(name='activity2')
def activity2(ctx: wf.WorkflowActivityContext, act_input: str) -> str:
print(f'activity2: Received input: {act_input}.')
print(f'activity2: Received input: {act_input}.', flush=True)
return f"{act_input} task"
@wf_runtime.activity(name='activity3')
def activity3(ctx: wf.WorkflowActivityContext, act_input: str) -> str:
print(f'activity3: Received input: {act_input}.')
print(f'activity3: Received input: {act_input}.', flush=True)
return f"{act_input} chaining"