dapr-agents/quickstarts/01-hello-world/04_chain_tasks.py

37 lines
1012 B
Python

from dapr_agents.workflow import WorkflowApp, workflow, task
from dapr.ext.workflow import DaprWorkflowContext
from dotenv import load_dotenv
load_dotenv()
@workflow(name="analyze_topic")
def analyze_topic(ctx: DaprWorkflowContext, topic: str):
# Each step is durable and can be retried
outline = yield ctx.call_activity(create_outline, input=topic)
if len(outline) > 0:
print("Outline:", outline)
blog_post = yield ctx.call_activity(write_blog, input=outline)
if len(blog_post) > 0:
print("Blog post:", blog_post)
return blog_post
@task(description="Create a short outline about {topic}")
def create_outline(topic: str) -> str:
pass
@task(description="Write a short blog post following this outline: {outline}")
def write_blog(outline: str) -> str:
pass
if __name__ == "__main__":
wfapp = WorkflowApp()
results = wfapp.run_and_monitor_workflow_sync(analyze_topic, input="AI Agents")
if len(results) > 0:
print(f"Result: {results}")