# LLM-based Workflow Patterns
This quickstart demonstrates how to orchestrate sequential and parallel tasks using Dapr Agents' workflow capabilities powered by Language Models (LLMs). You'll learn how to build resilient, stateful workflows that leverage LLMs for reasoning, decision-making, and automation.
## Prerequisites
- Python 3.10 (recommended)
- pip package manager
- OpenAI API key
- Dapr CLI and Docker installed
## Environment Setup
```bash
# Create a virtual environment
python3.10 -m venv .venv

# Activate the virtual environment
# On Windows:
.venv\Scripts\activate

# On macOS/Linux:
source .venv/bin/activate

# Install dependencies
pip install -r requirements.txt
```
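For reference, `requirements.txt` pins the packages these examples import. A minimal sketch, assuming the published `dapr-agents` package (which pulls in the Dapr workflow extension) plus `python-dotenv` for loading the `.env` file:

```text
dapr-agents
python-dotenv
```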
## Configuration

1. Create a `.env` file in the project root:

   ```env
   OPENAI_API_KEY=your_api_key_here
   ```

2. Replace `your_api_key_here` with your actual OpenAI API key.

3. Make sure Dapr is initialized on your system:

   ```bash
   dapr init
   ```

4. Create the workflow state store component. Look at the `workflowstate.yaml` file in the `components` directory (the `actorStateStore` setting is required because Dapr workflows run on top of the actor runtime):
   ```yaml
   apiVersion: dapr.io/v1alpha1
   kind: Component
   metadata:
     name: workflowstatestore
   spec:
     type: state.redis
     version: v1
     metadata:
     - name: redisHost
       value: localhost:6379
     - name: redisPassword
       value: ""
     - name: actorStateStore
       value: "true"
   ```
## Examples

### 1. Sequential Task Execution
This example demonstrates the Chaining Pattern by executing two activities in sequence:
```python
from dapr_agents.workflow import WorkflowApp, workflow, task
from dapr.ext.workflow import DaprWorkflowContext
from dotenv import load_dotenv

# Load environment variables
load_dotenv()


# Define Workflow logic
@workflow(name='task_chain_workflow')
def task_chain_workflow(ctx: DaprWorkflowContext):
    result1 = yield ctx.call_activity(get_character)
    result2 = yield ctx.call_activity(get_line, input={"character": result1})
    return result2


@task(description="""
    Pick a random character from The Lord of the Rings
    and respond with the character's name only
""")
def get_character() -> str:
    pass


@task(description="What is a famous line by {character}")
def get_line(character: str) -> str:
    pass


if __name__ == '__main__':
    # Initialize the WorkflowApp
    wfapp = WorkflowApp()
    results = wfapp.run_and_monitor_workflow_sync(task_chain_workflow)
    print(f"Famous Line: {results}")
```
Run the sequential task chain workflow:
```bash
dapr run --app-id dapr-agent-wf --resources-path components/ -- python sequential_workflow.py
```
**How it works:** In this chaining pattern, the workflow executes tasks in strict sequence:

- The `get_character()` task executes first and returns a character name
- Only after it completes does the `get_line()` task run, using that character name as input
- Each task awaits the previous task's completion before starting
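Note that both tasks above have empty bodies: when a task defines only a description, Dapr Agents fills in the result by calling the LLM with that description as the prompt. As a rough sketch of how deterministic and LLM-backed steps can be mixed in one chain (assuming the `@task` decorator also accepts a plain function body and runs it as ordinary Python; `pick_character`, `shout`, and `mixed_chain` are hypothetical names):

```python
from dapr_agents.workflow import WorkflowApp, workflow, task
from dapr.ext.workflow import DaprWorkflowContext
from dotenv import load_dotenv

load_dotenv()


# LLM-backed step: the empty body means the description is used as the prompt
@task(description="Pick a random character from The Lord of the Rings and respond with the name only")
def pick_character() -> str:
    pass


# Deterministic step: a task with a real body is assumed to run as plain Python
@task
def shout(name: str) -> str:
    return f"{name.upper()}!!!"


@workflow(name="mixed_chain")
def mixed_chain(ctx: DaprWorkflowContext):
    name = yield ctx.call_activity(pick_character)
    loud = yield ctx.call_activity(shout, input={"name": name})
    return loud


if __name__ == "__main__":
    wfapp = WorkflowApp()
    print(wfapp.run_and_monitor_workflow_sync(mixed_chain))
```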
### 2. Parallel Task Execution
This example demonstrates the Fan-out/Fan-in Pattern with a research use case. It executes three activities in parallel, then synchronizes them: subsequent activities do not start until all of the parallel activities have completed.
```python
import logging
from typing import List

from dotenv import load_dotenv
from pydantic import BaseModel, Field

from dapr_agents.workflow import WorkflowApp, workflow, task
from dapr.ext.workflow import DaprWorkflowContext

# Load environment variables
load_dotenv()

# Configure logging
logging.basicConfig(level=logging.INFO)


# Define a structured model for a single question
class Question(BaseModel):
    """Represents a single research question."""
    text: str = Field(..., description="A research question related to the topic.")


# Define a model that holds multiple questions
class Questions(BaseModel):
    """Encapsulates a list of research questions."""
    questions: List[Question] = Field(
        ..., description="A list of research questions generated for the topic."
    )


# Define Workflow logic
@workflow(name="research_workflow")
def research_workflow(ctx: DaprWorkflowContext, topic: str):
    """Defines a Dapr workflow for researching a given topic."""
    # Generate research questions
    questions: Questions = yield ctx.call_activity(generate_questions, input={"topic": topic})

    # Gather information for each question in parallel
    parallel_tasks = [
        ctx.call_activity(gather_information, input={"question": q["text"]})
        for q in questions["questions"]
    ]
    # wfapp is created in the __main__ block below, before the workflow runs
    research_results = yield wfapp.when_all(parallel_tasks)

    # Synthesize the results into a final report
    final_report = yield ctx.call_activity(
        synthesize_results, input={"topic": topic, "research_results": research_results}
    )
    return final_report


@task(description="Generate 3 focused research questions about {topic}.")
def generate_questions(topic: str) -> Questions:
    """Generates three research questions related to the given topic."""
    pass


@task(description="Research information to answer this question: {question}. Provide a detailed response.")
def gather_information(question: str) -> str:
    """Fetches relevant information based on the research question provided."""
    pass


@task(description="Create a comprehensive research report on {topic} based on the following research: {research_results}")
def synthesize_results(topic: str, research_results: List[str]) -> str:
    """Synthesizes the gathered research into a structured report."""
    pass


if __name__ == "__main__":
    wfapp = WorkflowApp()

    research_topic = "The environmental impact of quantum computing"

    logging.info(f"Starting research workflow on: {research_topic}")
    results = wfapp.run_and_monitor_workflow_sync(research_workflow, input=research_topic)
    logging.info(f"\nResearch Report:\n{results}")
```
Run the parallel research workflow:
```bash
dapr run --app-id dapr-agent-research --resources-path components/ -- python parallel_workflow.py
```
**How it works:** This fan-out/fan-in pattern combines sequential and parallel execution:

- First, `generate_questions()` executes sequentially
- Multiple `gather_information()` tasks run in parallel via `wfapp.when_all()`
- The workflow waits for all parallel tasks to complete
- Finally, `synthesize_results()` executes with all gathered data
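The fan-out/fan-in shape itself is ordinary Python plus one synchronization point. Here is a minimal sketch, assuming the same `WorkflowApp.when_all()` helper used above; the `work` task and `fan_out_fan_in` workflow are hypothetical names for illustration:

```python
from typing import List

from dapr_agents.workflow import WorkflowApp, workflow, task
from dapr.ext.workflow import DaprWorkflowContext
from dotenv import load_dotenv

load_dotenv()


@task(description="Summarize this item in one sentence: {item}")
def work(item: str) -> str:
    pass


@workflow(name="fan_out_fan_in")
def fan_out_fan_in(ctx: DaprWorkflowContext, items: List[str]):
    # Fan-out: call_activity only schedules each task; nothing is awaited yet
    pending = [ctx.call_activity(work, input={"item": i}) for i in items]
    # Fan-in: when_all resumes the workflow once every scheduled task finishes
    results = yield wfapp.when_all(pending)
    return results


if __name__ == "__main__":
    # As in the example above, wfapp is created before the workflow runs
    wfapp = WorkflowApp()
    summaries = wfapp.run_and_monitor_workflow_sync(
        fan_out_fan_in, input=["apples", "oranges", "pears"]
    )
    print(summaries)
```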
## Integration with Dapr
Dapr Agents workflows leverage Dapr's core capabilities:
- **Durability**: Workflows survive process restarts or crashes
- **State Management**: Workflow state is persisted in a distributed state store
- **Actor Model**: Tasks run as reliable, stateful actors within the workflow
- **Event Handling**: Workflows can react to external events
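Because the state store above points at the Redis container that `dapr init` provisions (named `dapr_redis` by default), you can optionally peek at the persisted workflow state after a run. The exact key layout is an internal detail of the runtime, so treat this as a sanity check only:

```bash
# List keys written by the workflow runtime (names vary by app-id and actor type)
docker exec -it dapr_redis redis-cli KEYS '*'
```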
## Troubleshooting
- **Docker is Running**: Ensure Docker is running with `docker ps` and verify you have container instances with `daprio/dapr`, `openzipkin/zipkin`, and `redis` images running
- **Redis Connection**: Ensure Redis is running (it is installed automatically by `dapr init`)
- **Dapr Initialization**: If components aren't found, verify Dapr is initialized with `dapr init`
- **API Key**: Check your OpenAI API key if authentication fails
## Next Steps
After completing this quickstart, move on to the Multi-Agent Workflow quickstart to learn how to create distributed systems of collaborating agents.