Replaced pure Dapr workflow example with parallel workflow example (#41)

* Replaced pure Dapr workflow example with parallel workflow example

Signed-off-by: Bilgin Ibryam <bibryam@gmail.com>

* Fixes parallel workflows

Signed-off-by: Elena Kolevska <elena@kolevska.com>

---------

Signed-off-by: Bilgin Ibryam <bibryam@gmail.com>
Signed-off-by: Elena Kolevska <elena@kolevska.com>
Co-authored-by: Elena Kolevska <elena@kolevska.com>
Bilgin Ibryam 2025-03-11 21:55:32 +00:00 committed by GitHub
parent 9e54b91c61
commit 2307007cac
3 changed files with 239 additions and 121 deletions

README.md

@@ -1,6 +1,6 @@
# Agentic Workflow with Task Chain
# Agentic Workflow Patterns
This quickstart demonstrates how to create stateful task chains using both pure Dapr Workflows and Dapr Agents' enhanced workflow capabilities. You'll learn how to orchestrate multiple tasks that use LLM inference, seeing firsthand how Dapr Agents simplifies and improves the workflow development experience.
This quickstart demonstrates how to orchestrate sequential and parallel tasks using Dapr Agents' workflow capabilities.
## Prerequisites
@@ -68,102 +68,11 @@ spec:
## Examples
### 1. Dapr Workflow Without Dapr Agents
### 1. Sequential Task Execution
This example shows a basic task chain using pure Dapr workflows:
This example demonstrates the Chaining Pattern by executing two activities in sequence:
```python
# workflow_dapr.py
import dapr.ext.workflow as wf
from dotenv import load_dotenv
from openai import OpenAI
from time import sleep
# Load environment variables
load_dotenv()

# Initialize Workflow Instance
wfr = wf.WorkflowRuntime()

# Define Workflow logic
@wfr.workflow(name='lotr_workflow')
def task_chain_workflow(ctx: wf.DaprWorkflowContext):
    result1 = yield ctx.call_activity(get_character)
    result2 = yield ctx.call_activity(get_line, input=result1)
    return result2

# Activity 1
@wfr.activity(name='step1')
def get_character(ctx):
    client = OpenAI()
    response = client.chat.completions.create(
        messages=[
            {
                "role": "user",
                "content": "Pick a random character from The Lord of the Rings and respond with the character name only"
            }
        ],
        model='gpt-4o'
    )
    character = response.choices[0].message.content
    print(f"Character: {character}")
    return character

# Activity 2
@wfr.activity(name='step2')
def get_line(ctx, character: str):
    client = OpenAI()
    response = client.chat.completions.create(
        messages=[
            {
                "role": "user",
                "content": f"What is a famous line by {character}"
            }
        ],
        model='gpt-4o'
    )
    line = response.choices[0].message.content
    print(f"Line: {line}")
    return line

if __name__ == '__main__':
    wfr.start()
    sleep(5)  # wait for workflow runtime to start

    wf_client = wf.DaprWorkflowClient()
    instance_id = wf_client.schedule_new_workflow(workflow=task_chain_workflow)
    print(f'Workflow started. Instance ID: {instance_id}')
    state = wf_client.wait_for_workflow_completion(instance_id)
    print(f'Workflow completed! Status: {state.runtime_status}')

    wfr.shutdown()
```
Run the pure Dapr workflow:
<!-- STEP
name: Run text completion example
expected_stdout_lines:
- "== APP == Workflow started. Instance ID:"
- "== APP == Character:"
- "== APP == Line:"
- "== APP == Workflow completed! Status: WorkflowStatus.COMPLETED"
timeout_seconds: 30
output_match_mode: substring
-->
```bash
dapr run --app-id dapr-wf --resources-path components/ -- python workflow_dapr.py
```
<!-- END_STEP -->
**Expected output:** The workflow will select a random Lord of the Rings character and then generate a famous line by that character.
### 2. Dapr Agents Workflow
This example demonstrates how Dapr Agents simplifies the same workflow:
```python
# workflow_dapr_agent.py
from dapr_agents.workflow import WorkflowApp, workflow, task
from dapr_agents.types import DaprWorkflowContext
from dotenv import load_dotenv
@@ -198,7 +107,7 @@ if __name__ == '__main__':
print(f"Famous Line: {results}")
```
Run the Dapr Agents workflow:
Run the sequential task chain workflow:
<!-- STEP
name: Run text completion example
@@ -210,41 +119,142 @@ timeout_seconds: 30
output_match_mode: substring
-->
```bash
dapr run --app-id dapr-agent-wf --resources-path components/ -- python workflow_dapr_agent.py
dapr run --app-id dapr-agent-wf --resources-path components/ -- python sequential_workflow.py
```
<!-- END_STEP -->
**Expected output:** Similar to the pure Dapr workflow, but with significantly less code.
**How it works:**
In this chaining pattern, the workflow executes tasks in strict sequence (condensed in the sketch after this list):
1. The `get_character()` task executes first and returns a character name
2. Only after completion, the `get_line()` task runs using that character name as input
3. Each task awaits the previous task's completion before starting
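Condensed from the `sequential_workflow.py` added in this change, the whole chain reduces to two `yield ctx.call_activity(...)` calls, with the first task's output feeding the second task's input:

```python
from dotenv import load_dotenv

from dapr_agents.workflow import WorkflowApp, workflow, task
from dapr_agents.types import DaprWorkflowContext

load_dotenv()

@workflow(name='task_chain_workflow')
def task_chain_workflow(ctx: DaprWorkflowContext):
    # Step 1: pick a character; the workflow suspends until the task completes.
    character = yield ctx.call_activity(get_character)
    # Step 2: runs only after step 1, consuming its result as input.
    line = yield ctx.call_activity(get_line, input={"character": character})
    return line

@task(description="Pick a random character from The Lord of the Rings and respond with the character's name only")
def get_character() -> str:
    pass  # body is empty: the LLM fulfills the task from the description

@task(description="What is a famous line by {character}")
def get_line(character: str) -> str:
    pass

if __name__ == '__main__':
    wfapp = WorkflowApp()
    print(f"Famous Line: {wfapp.run_and_monitor_workflow(task_chain_workflow)}")
```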
## Key Concepts

- **Dapr Workflow**: A durable, resilient orchestration of activities
- **Workflow Activities**: Individual tasks within a workflow
- **Task Chain**: A sequence of tasks where each depends on the previous
- **WorkflowApp**: Dapr Agents' simplified workflow interface
- **LLM Tasks**: Automatically implemented tasks powered by LLMs

## Key Differences and Benefits

### Pure Dapr Workflow Approach
- Requires manual OpenAI client setup
- Explicit handling of API calls and responses
- More boilerplate code
- Direct control over workflow execution

### Dapr Agents Approach
- Automatic LLM client management
- Simplified task definitions using decorators
- Built-in prompt templating
- Reduced boilerplate code

## Dapr Integration

This quickstart demonstrates core Dapr capabilities:

- **Durability**: Workflows survive process restarts or crashes
- **State Management**: Workflow state is persisted in a distributed state store
- **Actor Model**: Tasks run as reliable, stateful actors within the workflow
- **Event Handling**: Workflows can react to external events

### 2. Parallel Task Execution

This example demonstrates the Fan-out/Fan-in Pattern with a research use case. It executes three activities in parallel, then waits for all of them to complete before running any subsequent activity.

```python
import logging
from typing import List

from dotenv import load_dotenv
from pydantic import BaseModel, Field

from dapr_agents.workflow import WorkflowApp, workflow, task
from dapr_agents.types import DaprWorkflowContext

# Load environment variables
load_dotenv()

# Configure logging
logging.basicConfig(level=logging.INFO)
# Define a structured model for a single question
class Question(BaseModel):
    """Represents a single research question."""
    text: str = Field(..., description="A research question related to the topic.")

# Define a model that holds multiple questions
class Questions(BaseModel):
    """Encapsulates a list of research questions."""
    questions: List[Question] = Field(..., description="A list of research questions generated for the topic.")

# Define Workflow logic
@workflow(name="research_workflow")
def research_workflow(ctx: DaprWorkflowContext, topic: str):
    """Defines a Dapr workflow for researching a given topic."""
    # Generate research questions
    questions: Questions = yield ctx.call_activity(generate_questions, input={"topic": topic})

    # Gather information for each question in parallel
    parallel_tasks = [
        ctx.call_activity(gather_information, input={"question": q["text"]})
        for q in questions["questions"]
    ]
    research_results = yield wfapp.when_all(parallel_tasks)  # Ensure wfapp is initialized

    # Synthesize the results into a final report
    final_report = yield ctx.call_activity(
        synthesize_results, input={"topic": topic, "research_results": research_results}
    )
    return final_report

@task(description="Generate 3 focused research questions about {topic}.")
def generate_questions(topic: str) -> Questions:
    """Generates three research questions related to the given topic."""
    pass

@task(description="Research information to answer this question: {question}. Provide a detailed response.")
def gather_information(question: str) -> str:
    """Fetches relevant information based on the research question provided."""
    pass

@task(description="Create a comprehensive research report on {topic} based on the following research: {research_results}")
def synthesize_results(topic: str, research_results: List[str]) -> str:
    """Synthesizes the gathered research into a structured report."""
    pass

if __name__ == "__main__":
    wfapp = WorkflowApp()

    research_topic = "The environmental impact of quantum computing"

    logging.info(f"Starting research workflow on: {research_topic}")
    results = wfapp.run_and_monitor_workflow(research_workflow, input=research_topic)
    logging.info(f"\nResearch Report:\n{results}")
```
Run the parallel research workflow:
<!-- STEP
name: Run parallel workflows example
expected_stdout_lines:
- "Starting research workflow on: The environmental impact of quantum computing"
- "Research Report:"
output_match_mode: substring
-->
```bash
dapr run --app-id dapr-agent-research --resources-path components/ -- python parallel_workflow.py
```
<!-- END_STEP -->
**How it works:**
This fan-out/fan-in pattern combines sequential and parallel execution:
1. First, `generate_questions()` executes sequentially
2. Multiple `gather_information()` tasks run in parallel, gathered with `wfapp.when_all()`
3. The workflow waits for all parallel tasks to complete
4. Finally, `synthesize_results()` executes with all gathered data
## Additional Workflow Patterns
Beyond the patterns demonstrated in the examples, Dapr Agents supports other workflow patterns:
### Monitor Pattern
The **Monitor Pattern** periodically executes tasks in a loop at specified intervals. It's useful for scheduled jobs, polling external systems, or resource cleanup operations.
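A rough sketch of what this could look like (not part of this change), assuming `DaprWorkflowContext` passes through the underlying Dapr workflow context's `create_timer` and `continue_as_new`; the job-status tasks are illustrative only:

```python
from datetime import timedelta

from dapr_agents.workflow import workflow, task
from dapr_agents.types import DaprWorkflowContext

@workflow(name="job_monitor")
def job_monitor(ctx: DaprWorkflowContext, job_id: str):
    # Poll once per iteration.
    status = yield ctx.call_activity(check_status, input={"job_id": job_id})
    if "fail" in status.lower():
        yield ctx.call_activity(report_problem, input={"job_id": job_id, "status": status})

    # Sleep durably, then restart the workflow with a fresh history.
    yield ctx.create_timer(timedelta(minutes=5))  # assumed: durable timer as in plain Dapr Workflows
    ctx.continue_as_new(job_id)                   # assumed: restarts the instance with new input

# Hypothetical tasks, named for illustration only
@task(description="Summarize the current status of job {job_id} in one short sentence.")
def check_status(job_id: str) -> str:
    pass

@task(description="Draft a brief alert about job {job_id}, whose status is: {status}")
def report_problem(job_id: str, status: str) -> str:
    pass
```

Scheduling it would use the same `WorkflowApp` pattern as the examples above; the loop keeps running until the workflow instance is terminated.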
### External System Interaction Pattern
The **External System Interaction Pattern** allows workflows to wait for events from external systems before continuing. This pattern is ideal for approval workflows, integrating with external services, or implementing human-in-the-loop processes.
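A hedged sketch of that shape (also not part of this change), assuming `wait_for_external_event` is passed through from the underlying Dapr workflow context; the `approval` event name and order task are hypothetical:

```python
from dapr_agents.workflow import WorkflowApp, workflow, task
from dapr_agents.types import DaprWorkflowContext

@workflow(name="approval_workflow")
def approval_workflow(ctx: DaprWorkflowContext, order_id: str):
    # Suspend durably until an external system raises an "approval" event
    # for this workflow instance (e.g. via the Dapr workflow management API).
    approved: bool = yield ctx.wait_for_external_event("approval")  # assumed pass-through
    if not approved:
        return f"Order {order_id} was rejected"
    confirmation = yield ctx.call_activity(confirm_order, input={"order_id": order_id})
    return confirmation

# Hypothetical task, named for illustration only
@task(description="Write a short confirmation message for the approved order {order_id}.")
def confirm_order(order_id: str) -> str:
    pass

if __name__ == "__main__":
    wfapp = WorkflowApp()
    print(wfapp.run_and_monitor_workflow(approval_workflow, input="order-123"))
```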
## Integration with Dapr

Dapr Agents workflows leverage Dapr's core capabilities:

- **Durability**: Workflows can survive process restarts
- **Actor Model**: Tasks run as reliable, stateful actors
- **Observability**: Workflow status tracking
## Troubleshooting
@@ -254,4 +264,4 @@ This quickstart demonstrates core Dapr capabilities:
## Next Steps
After completing this quickstart, move on to the [Multi-Agent Workflow quickstart](../05-multi-agent-workflow/README.md) to learn how to create distributed systems of collaborating agents.
After completing this quickstart, move on to the [Multi-Agent Workflow quickstart](../05-multi-agent-workflow-dapr-workflows/README.md) to learn how to create distributed systems of collaborating agents.

parallel_workflow.py

@@ -0,0 +1,78 @@
import logging
from typing import List
from dotenv import load_dotenv
from pydantic import BaseModel, Field
from dapr_agents.workflow import WorkflowApp, workflow, task
from dapr_agents.types import DaprWorkflowContext
# Load environment variables
load_dotenv()
# Configure logging
logging.basicConfig(level=logging.INFO)
# Define a structured model for a single question
class Question(BaseModel):
    """Represents a single research question."""
    text: str = Field(..., description="A research question related to the topic.")

# Define a model that holds multiple questions
class Questions(BaseModel):
    """Encapsulates a list of research questions."""
    questions: List[Question] = Field(..., description="A list of research questions generated for the topic.")

# Define Workflow logic
@workflow(name="research_workflow")
def research_workflow(ctx: DaprWorkflowContext, topic: str):
    """Defines a Dapr workflow for researching a given topic."""
    # Generate research questions
    questions: Questions = yield ctx.call_activity(generate_questions, input={"topic": topic})

    # Gather information for each question in parallel
    parallel_tasks = [
        ctx.call_activity(gather_information, input={"question": q["text"]})
        for q in questions["questions"]
    ]
    research_results = yield wfapp.when_all(parallel_tasks)  # Ensure wfapp is initialized

    # Synthesize the results into a final report
    final_report = yield ctx.call_activity(
        synthesize_results, input={"topic": topic, "research_results": research_results}
    )
    return final_report

@task(description="Generate 3 focused research questions about {topic}.")
def generate_questions(topic: str) -> Questions:
    """Generates three research questions related to the given topic."""
    pass

@task(description="Research information to answer this question: {question}. Provide a detailed response.")
def gather_information(question: str) -> str:
    """Fetches relevant information based on the research question provided."""
    pass

@task(description="Create a comprehensive research report on {topic} based on the following research: {research_results}")
def synthesize_results(topic: str, research_results: List[str]) -> str:
    """Synthesizes the gathered research into a structured report."""
    pass

if __name__ == "__main__":
    wfapp = WorkflowApp()

    research_topic = "The environmental impact of quantum computing"

    logging.info(f"Starting research workflow on: {research_topic}")
    results = wfapp.run_and_monitor_workflow(research_workflow, input=research_topic)
    if len(results) > 0:
        logging.info(f"\nResearch Report:\n{results}")

sequential_workflow.py

@@ -0,0 +1,30 @@
from dapr_agents.workflow import WorkflowApp, workflow, task
from dapr_agents.types import DaprWorkflowContext
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Define Workflow logic
@workflow(name='task_chain_workflow')
def task_chain_workflow(ctx: DaprWorkflowContext):
    result1 = yield ctx.call_activity(get_character)
    result2 = yield ctx.call_activity(get_line, input={"character": result1})
    return result2

@task(description="""
    Pick a random character from The Lord of the Rings\n
    and respond with the character's name only
""")
def get_character() -> str:
    pass

@task(description="What is a famous line by {character}")
def get_line(character: str) -> str:
    pass

if __name__ == '__main__':
    wfapp = WorkflowApp()

    results = wfapp.run_and_monitor_workflow(task_chain_workflow)
    print(f"Famous Line: {results}")