Use lifespan for wf_runtime, add monitor

Signed-off-by: Marc Duiker <marcduiker@users.noreply.github.com>
This commit is contained in:
Marc Duiker 2025-05-07 18:35:49 +02:00
parent 50f15e4d65
commit f74eef6442
No known key found for this signature in database
GPG Key ID: 6A36EA7754473DD7
15 changed files with 213 additions and 17 deletions

View File

@ -7,5 +7,6 @@ apps:
appPort: 5256
daprHTTPPort: 3556
command: ["dotnet", "run"]
appLogDestination: console
daprdLogDestination: console
appLogDestination: fileAndConsole
daprdLogDestination: fileAndConsole
logLevel: debug

View File

@ -3,9 +3,9 @@ common:
resourcesPath: ../../resources
apps:
- appID: monitor
appDirPath: Monitor
appDirPath: monitor
appPort: 5257
daprHTTPPort: 3557
command: ["dotnet", "run"]
command: ["python3", "app.py"]
appLogDestination: console
daprdLogDestination: console

View File

@ -1,10 +1,17 @@
from fastapi import FastAPI, status
from contextlib import asynccontextmanager
from fanoutfanin_workflow import wf_runtime, fanoutfanin_workflow
from typing import List
import dapr.ext.workflow as wf
import uvicorn
app = FastAPI()
@asynccontextmanager
async def lifespan(app: FastAPI):
wf_runtime.start()
yield
wf_runtime.shutdown()
app = FastAPI(lifespan=lifespan)
@app.post("/start", status_code=status.HTTP_202_ACCEPTED)
async def start_workflow(words: List[str]):
@ -16,5 +23,5 @@ async def start_workflow(words: List[str]):
return {"instance_id": instance_id}
if __name__ == "__main__":
wf_runtime.start()
uvicorn.run(app, host="0.0.0.0", port=5256)
uvicorn.run(app, host="0.0.0.0", port=5256, log_level="debug")

View File

@ -1,9 +1,16 @@
from fastapi import FastAPI, status
from contextlib import asynccontextmanager
from basic_workflow import wf_runtime, basic_workflow
import dapr.ext.workflow as wf
import uvicorn
app = FastAPI()
@asynccontextmanager
async def lifespan(app: FastAPI):
wf_runtime.start()
yield
wf_runtime.shutdown()
app = FastAPI(lifespan=lifespan)
@app.post("/start/{input}", status_code=status.HTTP_202_ACCEPTED)
async def start_workflow(input: str):
@ -19,5 +26,4 @@ async def start_workflow(input: str):
return {"instance_id": instance_id}
if __name__ == "__main__":
wf_runtime.start()
uvicorn.run(app, host="0.0.0.0", port=5254)
uvicorn.run(app, host="0.0.0.0", port=5254, log_level="debug")

View File

@ -1,6 +1,8 @@
import logging
import dapr.ext.workflow as wf
logger = logging.getLogger(__name__)
wf_runtime = wf.WorkflowRuntime()
"""
@ -30,10 +32,12 @@ There can only be one input parameter. Use a class if multiple input values are
"""
@wf_runtime.activity(name='activity1')
def activity1(ctx: wf.WorkflowActivityContext, act_input: str) -> str:
print(f'activity1: Received input: {act_input}.')
logger.info(f'activity1: Received input: {act_input}.')
#print(f'activity1: Received input: {act_input}.')
return f"{act_input} Two"
@wf_runtime.activity(name='activity2')
def activity2(ctx: wf.WorkflowActivityContext, act_input: str) -> str:
print(f'activity2: Received input: {act_input}.')
logger.info(f'activity2: Received input: {act_input}.')
#print(f'activity2: Received input: {act_input}.')
return f"{act_input} Three"

View File

@ -1,2 +1,3 @@
dapr>=1.15.0
dapr-ext-workflow>=1.15.0
dapr-ext-workflow>=1.15.0
fastapi>=0.115.0

View File

@ -0,0 +1,89 @@
# Monitor Pattern
This tutorial demonstrates how to run a workflow in a loop. This can be used for recurring tasks that need to be executed on a certain frequency (e.g. a clean-up job that runs every hour). For more information on the monitor pattern see the [Dapr docs](https://docs.dapr.io/developing-applications/building-blocks/workflow/workflow-patterns/#monitor).
## Inspect the code
Open the `monitor_workflow.py` file in the `tutorials/workflow/python/monitor-pattern/monitor` folder. This file contains the definition for the workflow that calls the `check_status` activity to checks to see if a fictional resource is ready. The `check_status` activity uses a random number generator to simulate the status of the resource. If the status is not ready, the workflow will wait for one second and is continued as a new instance.
```mermaid
graph LR
SW((Start
Workflow))
CHECK[check_status]
IF{Is Ready}
TIMER[Wait for a period of time]
NEW[Continue as a new instance]
EW((End
Workflow))
SW --> CHECK
CHECK --> IF
IF -->|Yes| EW
IF -->|No| TIMER
TIMER --> NEW
NEW --> SW
```
## Run the tutorial
1. Use a terminal to navigate to the `tutorials/workflow/python/monitor-pattern/monitor` folder.
2. Install the dependencies using pip:
```bash
pip3 install -r requirements.txt
```
3. Navigate one level up to the `monitor-pattern` folder and use the Dapr CLI to run the Dapr Multi-App run file
<!-- STEP
name: Run multi app run template
expected_stdout_lines:
- 'Started Dapr with app id "monitor"'
expected_stderr_lines:
working_dir: .
output_match_mode: substring
background: true
sleep: 15
timeout_seconds: 30
-->
```bash
dapr run -f .
```
<!-- END_STEP -->
4. Use the POST request in the [`monitor.http`](./monitor.http) file to start the workflow, or use this cURL command:
```bash
curl -i --request POST http://localhost:5257/start/0
```
The input for the workflow is an integer with the value `0`.
The expected app logs are as follows:
```text
== APP - monitor == CheckStatus: Received input: 0.
== APP - monitor == CheckStatus: Received input: 1.
== APP - monitor == CheckStatus: Received input: 2.
== APP - monitor == CheckStatus: Received input: 3.
```
*Note that the number of app log statements can vary due to the randomization in the `CheckStatus` activity.*
5. Use the GET request in the [`monitor.http`](./monitor.http) file to get the status of the workflow, or use this cURL command:
```bash
curl --request GET --url http://localhost:3557/v1.0/workflows/dapr/<INSTANCEID>
```
Where `<INSTANCEID>` is the workflow instance ID you received in the `Location` header in the previous step.
The expected serialized output of the workflow is:
```txt
"\"Status is healthy after checking 3 times.\""
```
*The actual number of checks can vary since some randomization is used in the `CheckStatus` activity.*
6. Stop the Dapr Multi-App run process by pressing `Ctrl+C`.

View File

@ -0,0 +1,11 @@
version: 1
common:
resourcesPath: ../../resources
apps:
- appID: monitor
appDirPath: monitor
appPort: 5257
daprHTTPPort: 3557
command: ["python3", "app.py"]
appLogDestination: console
daprdLogDestination: console

View File

@ -0,0 +1,2 @@
include ../../../../docker.mk
include ../../../../validate.mk

View File

@ -0,0 +1,12 @@
@apphost=http://localhost:5257
### Start the Monitor workflow
# @name startWorkflowRequest
@counter=0
POST {{ apphost }}/start/{{counter}}
### Get the workflow status
@instanceId={{startWorkflowRequest.response.body.instance_id}}
@daprHost=http://localhost:3557
GET {{ daprHost }}/v1.0/workflows/dapr/{{ instanceId }}

View File

@ -0,0 +1,25 @@
from fastapi import FastAPI, status
from contextlib import asynccontextmanager
from monitor_workflow import wf_runtime, monitor_workflow
import dapr.ext.workflow as wf
import uvicorn
@asynccontextmanager
async def lifespan(app: FastAPI):
wf_runtime.start()
yield
wf_runtime.shutdown()
app = FastAPI(lifespan=lifespan)
@app.post("/start/{counter}", status_code=status.HTTP_202_ACCEPTED)
async def start_workflow(counter: int):
wf_client = wf.DaprWorkflowClient()
instance_id = wf_client.schedule_new_workflow(
workflow=monitor_workflow,
input=counter
)
return {"instance_id": instance_id}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=5257, log_level="debug")

View File

@ -0,0 +1,28 @@
from dataclasses import dataclass
from datetime import timedelta
import random
import dapr.ext.workflow as wf
wf_runtime = wf.WorkflowRuntime()
@dataclass
class Status:
isReady: bool
@wf_runtime.workflow(name='monitor_workflow')
def monitor_workflow(ctx: wf.DaprWorkflowContext, counter: int):
status = yield ctx.call_activity(check_status, input=counter)
if not status.isReady:
yield ctx.create_timer(fire_at=timedelta(seconds=2))
counter += 1
yield ctx.continue_as_new(counter)
return f"Status is healthy after {counter} times."
@wf_runtime.activity(name='check_status')
def check_status(ctx: wf.WorkflowActivityContext, act_input: int) -> Status:
print(f'check_status: Received input: {act_input}.')
isReady = True if random.randint(0, act_input) > 1 else False
return Status(isReady=isReady)

View File

@ -0,0 +1,3 @@
dapr>=1.15.0
dapr-ext-workflow>=1.15.0
fastapi>=0.115.0

View File

@ -1,9 +1,16 @@
from fastapi import FastAPI, status
from contextlib import asynccontextmanager
from chaining_workflow import wf_runtime, chaining_workflow
import dapr.ext.workflow as wf
import uvicorn
app = FastAPI()
@asynccontextmanager
async def lifespan(app: FastAPI):
wf_runtime.start()
yield
wf_runtime.shutdown()
app = FastAPI(lifespan=lifespan)
@app.post("/start", status_code=status.HTTP_202_ACCEPTED)
async def start_workflow():
@ -15,5 +22,4 @@ async def start_workflow():
return {"instance_id": instance_id}
if __name__ == "__main__":
wf_runtime.start()
uvicorn.run(app, host="0.0.0.0", port=5255)
uvicorn.run(app, host="0.0.0.0", port=5255, log_level="debug")

View File

@ -1,2 +1,3 @@
dapr>=1.15.0
dapr-ext-workflow>=1.15.0
dapr-ext-workflow>=1.15.0
fastapi>=0.115.0