Change import, add task chaining and fan-in fan-out

Signed-off-by: Marc Duiker <marcduiker@users.noreply.github.com>
Marc Duiker 2025-05-07 14:13:51 +02:00
parent 6b06b9c52d
commit 50f15e4d65
17 changed files with 341 additions and 8 deletions

View File

@@ -0,0 +1,102 @@
# Fan-out/Fan-in
This tutorial demonstrates how to author a workflow where multiple independent tasks can be scheduled and executed simultaneously. The workflow can either wait until all tasks are completed before proceeding, or continue as soon as the fastest task completes. For more information about the fan-out/fan-in pattern, see the [Dapr docs](https://docs.dapr.io/developing-applications/building-blocks/workflow/workflow-patterns/#fan-outfan-in).
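The two completion strategies correspond to two helpers in the Python workflow SDK: `wf.when_all` waits for every scheduled task, while `wf.when_any` resumes as soon as the first task finishes. A minimal sketch of both (the `process_item` activity and `example_workflow` are hypothetical placeholders, not part of this tutorial):
```python
import dapr.ext.workflow as wf

wf_runtime = wf.WorkflowRuntime()

@wf_runtime.activity(name='process_item')
def process_item(ctx: wf.WorkflowActivityContext, item: str) -> str:
    return item.upper()  # placeholder work

@wf_runtime.workflow(name='example_workflow')
def example_workflow(ctx: wf.DaprWorkflowContext, items: list):
    # Fan out: schedule one activity task per input item.
    tasks = [ctx.call_activity(process_item, input=item) for item in items]
    # Fan in: wait until all tasks are completed...
    all_results = yield wf.when_all(tasks)
    # ...or, alternatively, continue as soon as the fastest task completes:
    # first_result = yield wf.when_any(tasks)
    return all_results
```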
## Inspect the code
Open the `fanoutfanin_workflow.py` file in the `tutorials/workflow/python/fan-out-fan-in/fan_out_fan_in` folder. This file contains the definition for the workflow.
```mermaid
graph LR
SW((Start
Workflow))
subgraph for each word in the input
GWL[get_word_length]
end
SHORT[Select the
shortest word]
ALL[Wait until all tasks
are completed]
EW((End
Workflow))
SW --> GWL
GWL --> ALL
ALL --> SHORT
SHORT --> EW
```
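In the code, the fan-out is the list comprehension that schedules one `get_word_length` task per word, and the fan-in is the `wf.when_all` call. The relevant excerpt from `fanoutfanin_workflow.py` (the full file appears later in this diff):
```python
tasks = [
    ctx.call_activity(get_word_length, input=word) for word in words
]
all_word_lengths = yield wf.when_all(tasks)
```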
## Run the tutorial
1. Use a terminal to navigate to the `tutorials/workflow/python/fan-out-fan-in/fan_out_fan_in` folder.
2. Install the dependencies using pip:
```bash
pip3 install -r requirements.txt
```
3. Navigate back one level to the `fan-out-fan-in` folder and use the Dapr CLI to run the Dapr Multi-App run file:
<!-- STEP
name: Run multi app run template
expected_stdout_lines:
- 'Started Dapr with app id "fanoutfanin"'
expected_stderr_lines:
working_dir: .
output_match_mode: substring
background: true
sleep: 15
timeout_seconds: 30
-->
```bash
dapr run -f .
```
<!-- END_STEP -->
4. Use the POST request in the [`fanoutfanin.http`](./fanoutfanin.http) file to start the workflow, or use this cURL command:
```bash
curl -i --request POST \
--url http://localhost:5256/start \
--header 'content-type: application/json' \
--data '["which","word","is","the","shortest"]'
```
The input for the workflow is an array of strings:
```json
[
  "which",
  "word",
  "is",
  "the",
  "shortest"
]
```
The expected app logs are as follows:
```text
== APP - fanoutfanin == get_word_length: Received input: word.
== APP - fanoutfanin == get_word_length: Received input: is.
== APP - fanoutfanin == get_word_length: Received input: the.
== APP - fanoutfanin == get_word_length: Received input: shortest.
== APP - fanoutfanin == get_word_length: Received input: which.
```
> Note that the order of the logs may vary.
5. Use the GET request in the [`fanoutfanin.http`](./fanoutfanin.http) file to get the status of the workflow, or use this cURL command:
```bash
curl --request GET --url http://localhost:3556/v1.0/workflows/dapr/<INSTANCEID>
```
Where `<INSTANCEID>` is the workflow instance ID you received in the `instance_id` property in the previous step.
The expected serialized output of the workflow is:
```txt
"\"is\""
```
6. Stop the Dapr Multi-App run process by pressing `Ctrl+C`.
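As an alternative to the HTTP status endpoint in step 5, the workflow state can also be inspected from Python with the same client the app uses for scheduling. A hedged sketch (not part of this tutorial; it assumes the Dapr sidecar from the Multi-App run is still running, and `<INSTANCEID>` must be replaced with a real instance ID):
```python
import dapr.ext.workflow as wf

wf_client = wf.DaprWorkflowClient()

# Block until the instance finishes (or the timeout expires), then inspect it.
state = wf_client.wait_for_workflow_completion('<INSTANCEID>', timeout_in_seconds=30)
if state:
    print(state.runtime_status)     # e.g. COMPLETED
    print(state.serialized_output)  # e.g. "\"is\""
```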

View File

@@ -0,0 +1,11 @@
version: 1
common:
  resourcesPath: ../../resources
apps:
  - appID: fanoutfanin
    appDirPath: fan_out_fan_in
    appPort: 5256
    daprHTTPPort: 3556
    command: ["python3", "app.py"]
    appLogDestination: console
    daprdLogDestination: console

View File

@@ -0,0 +1,20 @@
from fastapi import FastAPI, status
from fanoutfanin_workflow import wf_runtime, fanoutfanin_workflow
from typing import List
import dapr.ext.workflow as wf
import uvicorn

app = FastAPI()

@app.post("/start", status_code=status.HTTP_202_ACCEPTED)
async def start_workflow(words: List[str]):
    wf_client = wf.DaprWorkflowClient()
    instance_id = wf_client.schedule_new_workflow(
        workflow=fanoutfanin_workflow,
        input=words
    )
    return {"instance_id": instance_id}

if __name__ == "__main__":
    wf_runtime.start()
    uvicorn.run(app, host="0.0.0.0", port=5256)
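FastAPI binds the JSON array in the request body directly to the `words: List[str]` parameter, so no wrapper object is needed. A quick local check with FastAPI's test client (hypothetical, not part of this commit, and it still needs a running Dapr sidecar because the endpoint schedules a real workflow):
```python
from fastapi.testclient import TestClient
from app import app

client = TestClient(app)
resp = client.post("/start", json=["which", "word", "is", "the", "shortest"])
print(resp.status_code)  # expected: 202
print(resp.json())       # expected: {"instance_id": "..."}
```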

View File

@@ -0,0 +1,26 @@
from dataclasses import dataclass
from typing import List
import dapr.ext.workflow as wf

wf_runtime = wf.WorkflowRuntime()

@dataclass
class WordLength:
    word: str
    length: int

@wf_runtime.workflow(name='fanoutfanin_workflow')
def fanoutfanin_workflow(ctx: wf.DaprWorkflowContext, words: List[str]):
    tasks = [
        ctx.call_activity(get_word_length, input=word) for word in words
    ]
    all_word_lengths = yield wf.when_all(tasks)
    sorted_word_lengths = sorted(all_word_lengths, key=lambda x: x.length)
    shortest_word = sorted_word_lengths[0]
    return shortest_word.word

@wf_runtime.activity(name='get_word_length')
def get_word_length(ctx: wf.WorkflowActivityContext, word: str) -> WordLength:
    print(f'get_word_length: Received input: {word}.')
    return WordLength(word=word, length=len(word))

View File

@@ -0,0 +1,2 @@
dapr>=1.15.0
dapr-ext-workflow>=1.15.0

View File

@@ -0,0 +1,19 @@
@apphost=http://localhost:5256
### Start the FanOutFanIn workflow
# @name startWorkflowRequest
POST {{ apphost }}/start
Content-Type: application/json
[
"which",
"word",
"is",
"the",
"shortest"
]
### Get the workflow status
@instanceId={{startWorkflowRequest.response.body.instance_id}}
@daprHost=http://localhost:3556
GET {{ daprHost }}/v1.0/workflows/dapr/{{ instanceId }}

View File

@@ -0,0 +1,2 @@
include ../../../../docker.mk
include ../../../../validate.mk

View File

@@ -69,7 +69,7 @@ graph LR
 curl --request GET --url http://localhost:3554/v1.0/workflows/dapr/<INSTANCEID>
 ```
-Where `<INSTANCEID>` is the workflow instance ID you received in the `Location` header in the previous step.
+Where `<INSTANCEID>` is the workflow instance ID you received in the `instance_id` property in the previous step.
 The expected serialized output of the workflow is:

View File

@@ -1,6 +1,6 @@
 from fastapi import FastAPI, status
-from dapr.ext.workflow import DaprWorkflowClient
 from basic_workflow import wf_runtime, basic_workflow
+import dapr.ext.workflow as wf
 import uvicorn
 app = FastAPI()
@@ -11,7 +11,7 @@ async def start_workflow(input: str):
     The DaprWorkflowClient is the API to manage workflows.
     Here it is used to schedule a new workflow instance.
     """
-    wf_client = DaprWorkflowClient()
+    wf_client = wf.DaprWorkflowClient()
     instance_id = wf_client.schedule_new_workflow(
         workflow=basic_workflow,
         input=input

View File

@@ -1,7 +1,7 @@
-from dapr.ext.workflow import DaprWorkflowContext, WorkflowActivityContext, WorkflowRuntime
+import dapr.ext.workflow as wf
-wf_runtime = WorkflowRuntime()
+wf_runtime = wf.WorkflowRuntime()
 
 """
 Workflows orchestrate activities and other (child)workflows, and include business logic (if/else or switch statements).
@@ -13,7 +13,7 @@ about the workflow instance, and methods to call activities.
 The second argument (`wf_input`) is the input to the workflow. It can be a simple or complex type.
 """
 @wf_runtime.workflow(name='basic_workflow')
-def basic_workflow(ctx: DaprWorkflowContext, wf_input: str):
+def basic_workflow(ctx: wf.DaprWorkflowContext, wf_input: str):
     result1 = yield ctx.call_activity(activity1, input=wf_input)
     result2 = yield ctx.call_activity(activity2, input=result1)
     return result2
@@ -29,11 +29,11 @@ The second argument (`act_input`) is the input parameter for the activity.
 There can only be one input parameter. Use a class if multiple input values are required.
 """
 @wf_runtime.activity(name='activity1')
-def activity1(ctx: WorkflowActivityContext, act_input: str) -> str:
+def activity1(ctx: wf.WorkflowActivityContext, act_input: str) -> str:
     print(f'activity1: Received input: {act_input}.')
     return f"{act_input} Two"
 @wf_runtime.activity(name='activity2')
-def activity2(ctx: WorkflowActivityContext, act_input: str) -> str:
+def activity2(ctx: wf.WorkflowActivityContext, act_input: str) -> str:
     print(f'activity2: Received input: {act_input}.')
     return f"{act_input} Three"

View File

@@ -0,0 +1,80 @@
# Task Chaining Pattern
This tutorial demonstrates how to chain multiple tasks together as a sequence in a workflow. For more information about the task chaining pattern, see the [Dapr docs](https://docs.dapr.io/developing-applications/building-blocks/workflow/workflow-patterns/#task-chaining).
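The pattern boils down to `yield`ing one activity at a time and passing each result as the input of the next call. A minimal sketch of that shape (the names are placeholders; the actual workflow for this tutorial is in `chaining_workflow.py`, shown further down in this diff):
```python
import dapr.ext.workflow as wf

wf_runtime = wf.WorkflowRuntime()

@wf_runtime.workflow(name='sketch_chain')
def sketch_chain(ctx: wf.DaprWorkflowContext, wf_input: str):
    # Each yield suspends the workflow until the activity result is durably recorded.
    step1 = yield ctx.call_activity(append_step, input=wf_input)
    step2 = yield ctx.call_activity(append_step, input=step1)
    return step2

@wf_runtime.activity(name='append_step')
def append_step(ctx: wf.WorkflowActivityContext, text: str) -> str:
    return f'{text}!'
```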
## Inspect the code
Open the `chaining_workflow.py` file in the `tutorials/workflow/python/task-chaining/task_chaining` folder. This file contains the definition for the workflow.
```mermaid
graph LR
SW((Start
Workflow))
A1[activity1]
A2[activity2]
A3[activity3]
EW((End
Workflow))
SW --> A1
A1 --> A2
A2 --> A3
A3 --> EW
```
## Run the tutorial
1. Use a terminal to navigate to the `tutorials/workflow/python/task-chaining/task_chaining` folder.
2. Install the dependencies using pip:
```bash
pip3 install -r requirements.txt
```
3. Navigate back one level to the `task-chaining` folder and use the Dapr CLI to run the Dapr Multi-App run file:
<!-- STEP
name: Run multi app run template
expected_stdout_lines:
- 'Started Dapr with app id "chaining"'
expected_stderr_lines:
working_dir: .
output_match_mode: substring
background: true
sleep: 15
timeout_seconds: 30
-->
```bash
dapr run -f .
```
<!-- END_STEP -->
4. Use the POST request in the [`chaining.http`](./chaining.http) file to start the workflow, or use this cURL command:
```bash
curl -i --request POST http://localhost:5255/start
```
The input for the workflow is a string with the value `This`. The expected app logs are as follows:
```text
== APP - chaining == activity1: Received input: This.
== APP - chaining == activity2: Received input: This is.
== APP - chaining == activity3: Received input: This is task.
```
5. Use the GET request in the [`chaining.http`](./chaining.http) file to get the status of the workflow, or use this cURL command:
```bash
curl --request GET --url http://localhost:3555/v1.0/workflows/dapr/<INSTANCEID>
```
Where `<INSTANCEID>` is the workflow instance ID you received in the `instance_id` property in the previous step.
The expected serialized output of the workflow is:
```txt
"\"This is task chaining\""
```
6. Stop the Dapr Multi-App run process by pressing `Ctrl+C`.
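The escaped quotes in the serialized output above come from two rounds of JSON serialization: the SDK serializes the workflow's string result, and the status API then returns that serialized form as a JSON string value. A quick way to reproduce the exact shape, assuming standard `json` semantics:
```python
import json

result = "This is task chaining"  # the workflow's return value
stored = json.dumps(result)       # serialized by the SDK, with surrounding quotes
shown = json.dumps(stored)        # as rendered by the status API
print(shown)                      # "\"This is task chaining\""
```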

View File

@@ -0,0 +1,11 @@
@apphost=http://localhost:5255
### Start the TaskChaining workflow
# @name startWorkflowRequest
POST {{ apphost }}/start
@instanceId={{startWorkflowRequest.response.body.instance_id}}
@daprHost=http://localhost:3555
### Get the workflow status
GET {{ daprHost }}/v1.0/workflows/dapr/{{ instanceId }}

View File

@@ -0,0 +1,11 @@
version: 1
common:
  resourcesPath: ../../resources
apps:
  - appID: chaining
    appDirPath: task_chaining
    appPort: 5255
    daprHTTPPort: 3555
    command: ["python3", "app.py"]
    appLogDestination: console
    daprdLogDestination: console

View File

@@ -0,0 +1,2 @@
include ../../../../docker.mk
include ../../../../validate.mk

View File

@@ -0,0 +1,19 @@
from fastapi import FastAPI, status
from chaining_workflow import wf_runtime, chaining_workflow
import dapr.ext.workflow as wf
import uvicorn

app = FastAPI()

@app.post("/start", status_code=status.HTTP_202_ACCEPTED)
async def start_workflow():
    wf_client = wf.DaprWorkflowClient()
    instance_id = wf_client.schedule_new_workflow(
        workflow=chaining_workflow,
        input="This"
    )
    return {"instance_id": instance_id}

if __name__ == "__main__":
    wf_runtime.start()
    uvicorn.run(app, host="0.0.0.0", port=5255)

View File

@@ -0,0 +1,26 @@
import dapr.ext.workflow as wf

wf_runtime = wf.WorkflowRuntime()

@wf_runtime.workflow(name='chaining_workflow')
def chaining_workflow(ctx: wf.DaprWorkflowContext, wf_input: str):
    result1 = yield ctx.call_activity(activity1, input=wf_input)
    result2 = yield ctx.call_activity(activity2, input=result1)
    wf_result = yield ctx.call_activity(activity3, input=result2)
    return wf_result

@wf_runtime.activity(name='activity1')
def activity1(ctx: wf.WorkflowActivityContext, act_input: str) -> str:
    print(f'activity1: Received input: {act_input}.')
    return f"{act_input} is"

@wf_runtime.activity(name='activity2')
def activity2(ctx: wf.WorkflowActivityContext, act_input: str) -> str:
    print(f'activity2: Received input: {act_input}.')
    return f"{act_input} task"

@wf_runtime.activity(name='activity3')
def activity3(ctx: wf.WorkflowActivityContext, act_input: str) -> str:
    print(f'activity3: Received input: {act_input}.')
    return f"{act_input} chaining"

View File

@@ -0,0 +1,2 @@
dapr>=1.15.0
dapr-ext-workflow>=1.15.0