updated cookbook notebooks to show sync and async with workflow monitoring

This commit is contained in:
Roberto Rodriguez 2025-04-22 11:57:31 -04:00
parent 41f234ca33
commit e8d96e186d
8 changed files with 132 additions and 7 deletions

View File

@ -1,6 +1,7 @@
import logging
from dapr_agents.workflow import WorkflowApp, workflow, task
from dapr_agents.types import DaprWorkflowContext
import logging
@workflow(name='random_workflow')
def task_chain_workflow(ctx:DaprWorkflowContext, input: int):
@ -32,6 +33,6 @@ if __name__ == '__main__':
wfapp = WorkflowApp()
results = wfapp.run_and_monitor_workflow(task_chain_workflow, input=10)
results = wfapp.run_and_monitor_workflow_sync(task_chain_workflow, input=10)
print(f"Results: {results}")

View File

@ -0,0 +1,40 @@
import asyncio
import logging
from dapr_agents.workflow import WorkflowApp, workflow, task
from dapr_agents.types import DaprWorkflowContext
@workflow(name="random_workflow")
def task_chain_workflow(ctx: DaprWorkflowContext, input: int):
result1 = yield ctx.call_activity(step1, input=input)
result2 = yield ctx.call_activity(step2, input=result1)
result3 = yield ctx.call_activity(step3, input=result2)
return [result1, result2, result3]
@task
def step1(activity_input: int) -> int:
print(f"Step 1: Received input: {activity_input}.")
return activity_input + 1
@task
def step2(activity_input: int) -> int:
print(f"Step 2: Received input: {activity_input}.")
return activity_input * 2
@task
def step3(activity_input: int) -> int:
print(f"Step 3: Received input: {activity_input}.")
return activity_input ^ 2
async def main():
logging.basicConfig(level=logging.INFO)
wfapp = WorkflowApp()
result = await wfapp.run_and_monitor_workflow_async(
task_chain_workflow,
input=10
)
print(f"Results: {result}")
if __name__ == "__main__":
asyncio.run(main())

View File

@ -31,5 +31,5 @@ if __name__ == '__main__':
wfapp = WorkflowApp()
# Run workflow
results = wfapp.run_and_monitor_workflow(task_chain_workflow)
results = wfapp.run_and_monitor_workflow_sync(task_chain_workflow)
print(results)

View File

@ -0,0 +1,40 @@
import asyncio
import logging
from dapr_agents.workflow import WorkflowApp, workflow, task
from dapr_agents.types import DaprWorkflowContext
from dotenv import load_dotenv
# Define Workflow logic
@workflow(name='lotr_workflow')
def task_chain_workflow(ctx: DaprWorkflowContext):
result1 = yield ctx.call_activity(get_character)
result2 = yield ctx.call_activity(get_line, input={"character": result1})
return result2
@task(description="""
Pick a random character from The Lord of the Rings\n
and respond with the character's name ONLY
""")
def get_character() -> str:
pass
@task(description="What is a famous line by {character}",)
def get_line(character: str) -> str:
pass
async def main():
logging.basicConfig(level=logging.INFO)
# Load environment variables
load_dotenv()
# Initialize the WorkflowApp
wfapp = WorkflowApp()
# Run workflow
result = await wfapp.run_and_monitor_workflow_async(task_chain_workflow)
print(f"Results: {result}")
if __name__ == "__main__":
asyncio.run(main())

View File

@ -1,8 +1,8 @@
import logging
from dapr_agents.workflow import WorkflowApp, workflow, task
from dapr_agents.types import DaprWorkflowContext
from pydantic import BaseModel
from dotenv import load_dotenv
import logging
@workflow
def question(ctx:DaprWorkflowContext, input:int):
@ -25,5 +25,9 @@ if __name__ == '__main__':
wfapp = WorkflowApp()
results = wfapp.run_and_monitor_workflow(workflow=question, input="Scooby Doo")
results = wfapp.run_and_monitor_workflow_sync(
workflow=question,
input="Scooby Doo"
)
print(results)

View File

@ -0,0 +1,40 @@
import asyncio
import logging
from dapr_agents.workflow import WorkflowApp, workflow, task
from dapr_agents.types import DaprWorkflowContext
from pydantic import BaseModel
from dotenv import load_dotenv
@workflow
def question(ctx:DaprWorkflowContext, input:int):
step1 = yield ctx.call_activity(ask, input=input)
return step1
class Dog(BaseModel):
name: str
bio: str
breed: str
@task("Who was {name}?")
def ask(name:str) -> Dog:
pass
async def main():
logging.basicConfig(level=logging.INFO)
# Load environment variables
load_dotenv()
# Initialize the WorkflowApp
wfapp = WorkflowApp()
# Run workflow
result = await wfapp.run_and_monitor_workflow_async(
workflow=question,
input="Scooby Doo"
)
print(f"Results: {result}")
if __name__ == "__main__":
asyncio.run(main())

View File

@ -358,4 +358,4 @@ if __name__ == '__main__':
raise ValueError("PDF URL must be provided via CLI or config file.")
# Run the workflow
wfapp.run_and_monitor_workflow(workflow=doc2podcast, input=user_input)
wfapp.run_and_monitor_workflow_sync(workflow=doc2podcast, input=user_input)

View File

@ -54,7 +54,7 @@ if __name__ == '__main__':
wfapp = WorkflowApp()
results = wfapp.run_and_monitor_workflow(workflow=test_workflow)
results = wfapp.run_and_monitor_workflow_sync(workflow=test_workflow)
logging.info("Workflow results: %s", results)
logging.info("Workflow completed successfully.")