Compare commits

...

17 Commits
v0.8.1 ... main

Author SHA1 Message Date
Roberto Rodriguez a475687445
Merge pull request #185 from sicoyle/tweak-da-workflow
fix(tracing): enable tracing on durable agent + add quickstart
2025-09-04 16:14:26 -04:00
Samantha Coyle 6e6ff447be
style: make linter happy
Signed-off-by: Samantha Coyle <sam@diagrid.io>
2025-09-04 09:01:31 -05:00
Samantha Coyle dde2ab0d2c
fix: add method i missed
Signed-off-by: Samantha Coyle <sam@diagrid.io>
2025-09-02 13:16:49 -05:00
Samantha Coyle 99defbabe1
Merge branch 'main' into tweak-da-workflow 2025-09-02 13:01:27 -05:00
Yaron Schneider fbb3bfd61f
change agents version in quickstarts (#190)
Signed-off-by: yaron2 <schneider.yaron@live.com>
2025-08-31 12:06:26 -04:00
Yaron Schneider 224c61c6c2
Merge branch 'main' into tweak-da-workflow 2025-08-30 16:15:52 -07:00
Yaron Schneider d4e6c76353
Fix hang after multiple .run() calls (#189)
* fix hang after multiple .run() calls

Signed-off-by: yaron2 <schneider.yaron@live.com>

* linter

Signed-off-by: yaron2 <schneider.yaron@live.com>

---------

Signed-off-by: yaron2 <schneider.yaron@live.com>
2025-08-30 18:56:16 -04:00
Samantha Coyle 423de2a7a1
fix: update for tests to pass
Signed-off-by: Samantha Coyle <sam@diagrid.io>
2025-08-27 10:43:28 -04:00
Samantha Coyle 00e0863bef
style: add file i missed for formatting
Signed-off-by: Samantha Coyle <sam@diagrid.io>
2025-08-27 08:04:44 -04:00
Samantha Coyle 4b081c6984
style: tox -e ruff
Signed-off-by: Samantha Coyle <sam@diagrid.io>
2025-08-27 08:01:08 -04:00
Samantha Coyle e31484dde3
style: update readme with durable agent tracing quickstart too
Signed-off-by: Samantha Coyle <sam@diagrid.io>
2025-08-27 07:54:04 -04:00
Samantha Coyle 54d40dbcdb
fix(tracing): enable tracing on durable agent + quickstart
Signed-off-by: Samantha Coyle <sam@diagrid.io>
2025-08-26 16:00:12 -04:00
Sam 649d45fa2e
docs: rm our repo dapr docs as in 1.16 preview now (#178)
Signed-off-by: Samantha Coyle <sam@diagrid.io>
Co-authored-by: Yaron Schneider <schneider.yaron@live.com>
2025-08-13 15:56:38 -07:00
Sam e03dfefcc6
fix: speed up deps installed + handle errs in quickstarts better (#177)
* fix: speed up deps installed + handle errs in quickstarts better

Signed-off-by: Samantha Coyle <sam@diagrid.io>

* docs: update docs to use uv too

Signed-off-by: Samantha Coyle <sam@diagrid.io>

* docs: add mapping on cmds for myself

Signed-off-by: Samantha Coyle <sam@diagrid.io>

---------

Signed-off-by: Samantha Coyle <sam@diagrid.io>
2025-08-13 15:47:20 -07:00
Roberto Rodriguez 3ef87c28e6
fix: correct Pydantic type generation for anyOf/oneOf in MCP tool schemas (#176)
* fix: handle anyOf/oneOf in JSON Schema to generate correct Pydantic types for MCP tools

Signed-off-by: Roberto Rodriguez <9653181+Cyb3rWard0g@users.noreply.github.com>

* fix: Lint

Signed-off-by: Roberto Rodriguez <9653181+Cyb3rWard0g@users.noreply.github.com>

* fix: omit None values when serializing tool args to avoid sending nulls for non-nullable fields

Signed-off-by: Roberto Rodriguez <9653181+Cyb3rWard0g@users.noreply.github.com>

* Updated dependency

Signed-off-by: Roberto Rodriguez <9653181+Cyb3rWard0g@users.noreply.github.com>

---------

Signed-off-by: Roberto Rodriguez <9653181+Cyb3rWard0g@users.noreply.github.com>
2025-08-13 12:10:26 -07:00
Roberto Rodriguez d325cc07de
Update Tool Execution Final Message and Dependencies (#175)
* Update final message when max iteration hits in durable agent

Signed-off-by: Roberto Rodriguez <9653181+Cyb3rWard0g@users.noreply.github.com>

* Update dependencies

Signed-off-by: Roberto Rodriguez <9653181+Cyb3rWard0g@users.noreply.github.com>

* Fix lint

Signed-off-by: Roberto Rodriguez <9653181+Cyb3rWard0g@users.noreply.github.com>

* Update testenv deps to include vectorstore

Signed-off-by: Roberto Rodriguez <9653181+Cyb3rWard0g@users.noreply.github.com>

---------

Signed-off-by: Roberto Rodriguez <9653181+Cyb3rWard0g@users.noreply.github.com>
2025-08-11 12:15:13 -07:00
Albert Callarisa 249ea5ec43
Add partition key to state operations (#173)
Signed-off-by: Albert Callarisa <albert@diagrid.io>
Co-authored-by: Yaron Schneider <schneider.yaron@live.com>
2025-08-05 16:28:35 -07:00
109 changed files with 1312 additions and 2442 deletions

View File

@ -1,77 +0,0 @@
name: docs
on:
push:
branches:
- main
paths:
- docs/**
- '!docs/development/**'
pull_request:
branches:
- main
paths:
- docs/**
- '!docs/development/**'
workflow_dispatch:
permissions:
contents: write
jobs:
changed_files:
runs-on: ubuntu-latest
name: Review changed files
outputs:
docs_any_changed: ${{ steps.changed-files.outputs.docs_any_changed }}
steps:
- uses: actions/checkout@v4
- name: Get changed files
id: changed-files
uses: tj-actions/changed-files@v45
with:
files_yaml: |
docs:
- 'docs/**'
- 'mkdocs.yml'
- '!docs/development/**'
base_sha: 'main'
documentation_validation:
needs: changed_files
name: Documentation validation
if: needs.changed_files.outputs.docs_any_changed == 'true'
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Remove plugins from mkdocs configuration
run: |
sed -i '/^plugins:/,/^[^ ]/d' mkdocs.yml
- name: Install Python dependencies
run: |
pip install mkdocs-material
pip install .[recommended,git,imaging]
pip install mkdocs-jupyter
- name: Validate build
run: mkdocs build
deploy:
if: github.ref == 'refs/heads/main'
runs-on: ubuntu-latest
needs: documentation_validation
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: 3.x
- run: echo "cache_id=$(date --utc '+%V')" >> $GITHUB_ENV
- uses: actions/cache@v4
with:
key: mkdocs-material-${{ env.cache_id }}
path: .cache
restore-keys: |
mkdocs-material-
- name: Install Python dependencies
run: |
pip install mkdocs-material
pip install .[recommended,git,imaging]
pip install mkdocs-jupyter
- run: mkdocs gh-deploy --force

4
.gitignore vendored
View File

@ -177,3 +177,7 @@ db/
# Requirements files since we use pyproject.toml instead
dev-requirements.txt
docker-entrypoint-initdb.d/
*requirements.txt

View File

@ -34,7 +34,7 @@ Dapr Agents builds on top of Dapr's Workflow API, which under the hood represent
### Data-Centric AI Agents
With built-in connectivity to over 50 enterprise data sources, Dapr Agents efficiently handles structured and unstructured data. From basic [PDF extraction](./docs/concepts/arxiv_fetcher.md) to large-scale database interactions, it enables seamless data-driven AI workflows with minimal code changes. Dapr's [bindings](https://docs.dapr.io/reference/components-reference/supported-bindings/) and [state stores](https://docs.dapr.io/reference/components-reference/supported-state-stores/) provide access to a large number of data sources that can be used to ingest data to an agent.
With built-in connectivity to over 50 enterprise data sources, Dapr Agents efficiently handles structured and unstructured data. From basic [PDF extraction](https://v1-16.docs.dapr.io/developing-applications/dapr-agents/dapr-agents-integrations/#arxiv-fetcher) to large-scale database interactions, it enables seamless data-driven AI workflows with minimal code changes. Dapr's [bindings](https://docs.dapr.io/reference/components-reference/supported-bindings/) and [state stores](https://docs.dapr.io/reference/components-reference/supported-state-stores/) provide access to a large number of data sources that can be used to ingest data to an agent.
### Accelerated Development

View File

@ -101,7 +101,7 @@ class Agent(AgentBase):
)
# Process conversation iterations and return the result
return await self.process_iterations(messages)
return await self.conversation(messages)
async def execute_tools(self, tool_calls: List[ToolCall]) -> List[ToolMessage]:
"""
@ -180,7 +180,7 @@ class Agent(AgentBase):
# Run all tool calls concurrently, but bounded by max_concurrent
return await asyncio.gather(*(run_and_record(tc) for tc in tool_calls))
async def process_iterations(self, messages: List[Dict[str, Any]]) -> Any:
async def conversation(self, messages: List[Dict[str, Any]]) -> Any:
"""
Drives the agent conversation iteratively until a final answer or max iterations is reached.
Handles tool calls, updates memory, and returns the final assistant message.

View File

@ -86,7 +86,10 @@ class DurableAgent(AgenticWorkflow, AgentBase):
"pubsub_name": self.message_bus_name,
"orchestrator": False,
}
self.register_agentic_system()
if not self.wf_runtime_is_running:
self.start_runtime()
async def run(self, input_data: Union[str, Dict[str, Any]]) -> Any:
"""
@ -97,9 +100,6 @@ class DurableAgent(AgenticWorkflow, AgentBase):
Returns:
Any: The final output from the workflow execution.
"""
# Make sure the Dapr runtime is running
if not self.wf_runtime_is_running:
self.start_runtime()
# Prepare input payload for workflow
if isinstance(input_data, dict):
@ -209,6 +209,8 @@ class DurableAgent(AgenticWorkflow, AgentBase):
# 🔴 If this was the last turn, stop here—even though there were tool calls
if turn == self.max_iterations:
final_message = response_message
# Make sure content exists and is a string
final_message["content"] = final_message.get("content") or ""
final_message[
"content"
] += "\n\n⚠️ Stopped: reached max iterations."
@ -223,6 +225,8 @@ class DurableAgent(AgenticWorkflow, AgentBase):
# 🔴 If it happened to be the last turn, banner it
if turn == self.max_iterations:
# Again, ensure content is never None
final_message["content"] = final_message.get("content") or ""
final_message["content"] += "\n\n⚠️ Stopped: reached max iterations."
break # exit loop with final_message

View File

@ -9,7 +9,7 @@ Span Hierarchy by Agent Type:
**Regular Agent (Direct Execution):**
- Agent.run (AGENT span) - Root span for agent execution
Agent.process_iterations (CHAIN span) - Processing and reasoning logic
Agent.conversation (CHAIN span) - Processing and reasoning logic
Agent.execute_tools (TOOL span) - Batch tool coordination
AgentToolExecutor.run_tool (TOOL span) - Individual tool execution
ChatClient.generate (LLM span) - Language model interactions
@ -121,6 +121,20 @@ class DaprAgentsInstrumentor(BaseInstrumentor):
"""
return ("dapr-agents",)
def instrument(self, **kwargs: Any) -> None:
"""
Public method to instrument Dapr Agents with OpenTelemetry tracing.
This method is called by users to enable instrumentation. It delegates
to the private _instrument method which contains the actual implementation.
Args:
**kwargs: Instrumentation configuration including:
- tracer_provider: Optional OpenTelemetry tracer provider
- Additional provider-specific configuration
"""
self._instrument(**kwargs)
def _instrument(self, **kwargs: Any) -> None:
"""
Instrument Dapr Agents with comprehensive OpenTelemetry tracing.
@ -151,10 +165,17 @@ class DaprAgentsInstrumentor(BaseInstrumentor):
# Apply W3C context propagation fix for Dapr Workflows (critical for proper tracing)
self._apply_context_propagation_fix()
# Apply method wrappers in logical order for complete tracing coverage
self._apply_agent_wrappers() # Regular agent execution paths
self._apply_workflow_wrappers() # Workflow-based agent execution
self._apply_llm_wrappers() # LLM provider integrations
# Apply agent wrappers (Regular Agent class execution paths)
self._apply_agent_wrappers()
# Register workflow instrumentation callback for lazy loading of wrappers.
# This is needed for DurableAgent instrumentation,
# because the WorkflowApp is not available immediately when the instrumentor is initialized,
# so we must register the callback to properly instrument the WorkflowApp.
self._register_workflow_instrumentation_callback()
# LLM provider integrations
self._apply_llm_wrappers()
logger.info("✅ Dapr Agents OpenTelemetry instrumentation enabled")
@ -310,19 +331,13 @@ class DaprAgentsInstrumentor(BaseInstrumentor):
logger.debug(
f"🔗 Storing W3C context for instance {instance_id}"
)
logger.debug(
f"🔗 Traceparent: {global_context.get('traceparent')}"
)
# Store W3C context with specific workflow instance ID
store_workflow_context(instance_id, global_context)
else:
logger.warning(
"⚠️ No global workflow context found for storage"
f"⚠️ No global workflow context found for storage - instance {instance_id}"
)
else:
logger.warning(
"⚠️ No workflow instance_id - cannot store W3C context"
"⚠️ No workflow instance_id - cannot access W3C context"
)
try:
@ -373,33 +388,35 @@ class DaprAgentsInstrumentor(BaseInstrumentor):
Instruments core agent methods to create AGENT spans (top-level) and CHAIN spans
(processing steps) for comprehensive agent execution tracing in Phoenix UI.
"""
try:
from dapr_agents.agents.agent.agent import Agent
from dapr_agents.tool.executor import AgentToolExecutor
# Main agent run wrapper (AGENT span - top level)
wrap_function_wrapper(
module="dapr_agents.agents.agent.agent",
name="Agent.run",
target=Agent.run,
wrapper=AgentRunWrapper(self._tracer),
)
# Process iterations wrapper (CHAIN span - processing steps)
wrap_function_wrapper(
module="dapr_agents.agents.agent.agent",
name="Agent.process_iterations",
target=Agent.conversation,
wrapper=ProcessIterationsWrapper(self._tracer),
)
# Tool execution batch wrapper (TOOL span - batch execution)
wrap_function_wrapper(
module="dapr_agents.agents.agent.agent",
name="Agent.execute_tools",
target=Agent.execute_tools,
wrapper=ExecuteToolsWrapper(self._tracer),
)
# Individual tool execution wrapper (TOOL span - actual tool execution)
wrap_function_wrapper(
module="dapr_agents.tool.executor",
name="AgentToolExecutor.run_tool",
target=AgentToolExecutor.run_tool,
wrapper=RunToolWrapper(self._tracer),
)
except Exception as e:
logger.error(f"Error applying agent wrappers: {e}", exc_info=True)
def _apply_workflow_wrappers(self) -> None:
"""
@ -409,29 +426,37 @@ class DaprAgentsInstrumentor(BaseInstrumentor):
that capture the complete workflow lifecycle from start to completion, enabling
detailed monitoring of workflow orchestration in Phoenix UI.
"""
# Workflow execution wrapper for DurableAgent (AGENT span with monitoring)
# This captures the complete workflow lifecycle from start to completion
wrap_function_wrapper(
module="dapr_agents.workflow.base",
name="WorkflowApp.run_and_monitor_workflow_async",
wrapper=WorkflowMonitorWrapper(self._tracer),
)
try:
from dapr_agents.workflow.base import WorkflowApp
from dapr_agents.workflow.task import WorkflowTask
# Workflow execution wrapper for Orchestrators (AGENT span, fire-and-forget)
# This captures orchestrator workflow starts (no monitoring since they're event-driven)
wrap_function_wrapper(
module="dapr_agents.workflow.base",
name="WorkflowApp.run_workflow",
wrapper=WorkflowRunWrapper(self._tracer),
)
original_run_and_monitor = WorkflowApp.run_and_monitor_workflow_async
original_run_workflow = WorkflowApp.run_workflow
original_task_call = WorkflowTask.__call__
# Create wrapper instances
monitor_wrapper = WorkflowMonitorWrapper(self._tracer)
run_wrapper = WorkflowRunWrapper(self._tracer)
task_wrapper = WorkflowTaskWrapper(self._tracer)
# Workflow task execution wrapper (TOOL/LLM/TASK spans for individual workflow tasks)
# This captures individual task execution within workflows, using ctx.instance_id for grouping
wrap_function_wrapper(
module="dapr_agents.workflow.task",
name="WorkflowTask.__call__",
wrapper=WorkflowTaskWrapper(self._tracer),
# Create wrapped versions that can be bound to instances
def wrapped_run_and_monitor_workflow_async(self, *args, **kwargs):
return monitor_wrapper(original_run_and_monitor, self, args, kwargs)
def wrapped_run_workflow(self, *args, **kwargs):
return run_wrapper(original_run_workflow, self, args, kwargs)
def wrapped_task_call(self, *args, **kwargs):
return task_wrapper(original_task_call, self, args, kwargs)
# Replace the methods on the classes with our wrapped versions
# This is the only way to instrument Pydantic model methods
WorkflowApp.run_and_monitor_workflow_async = (
wrapped_run_and_monitor_workflow_async
)
WorkflowApp.run_workflow = wrapped_run_workflow
WorkflowTask.__call__ = wrapped_task_call
except Exception as e:
logger.error(f"Error applying workflow wrappers: {e}", exc_info=True)
def _apply_llm_wrappers(self) -> None:
"""
@ -496,6 +521,31 @@ class DaprAgentsInstrumentor(BaseInstrumentor):
return chat_client_classes
def _register_workflow_instrumentation_callback(self):
"""
Register workflow instrumentation callback for lazy loading of wrappers.
This method registers a callback in the WorkflowApp class that will be
invoked when start_runtime() is called. This ensures workflow wrappers
are applied at the right time (when classes are fully loaded) while
maintaining user control over instrumentation.
"""
try:
from dapr_agents.workflow.base import WorkflowApp
# Register the workflow wrapper application as a class method
# This will be called when any WorkflowApp instance starts its runtime
setattr(
WorkflowApp,
WorkflowApp.INSTRUMENTATION_CALLBACK_ATTR,
self._apply_workflow_wrappers,
)
except ImportError as e:
logger.warning(f"Could not register workflow instrumentation callback: {e}")
except Exception as e:
logger.warning(f"Error registering workflow instrumentation callback: {e}")
def _uninstrument(self, **kwargs: Any) -> None:
"""
Uninstrument Dapr Agents by clearing the tracer.

View File

@ -237,9 +237,9 @@ class AgentRunWrapper:
class ProcessIterationsWrapper:
"""
Wrapper for Agent.process_iterations() method to create CHAIN spans for processing logic.
Wrapper for Agent.conversation() method to create CHAIN spans for processing logic.
This wrapper instruments the Agent.process_iterations() method, creating
This wrapper instruments the Agent.conversation() method, creating
CHAIN spans that capture the iterative processing and workflow execution
within an agent's reasoning cycle.
@ -265,16 +265,16 @@ class ProcessIterationsWrapper:
def __call__(self, wrapped: Any, instance: Any, args: Any, kwargs: Any) -> Any:
"""
Wrap Agent.process_iterations() method with CHAIN span tracing.
Wrap Agent.conversation() method with CHAIN span tracing.
Creates CHAIN spans that capture the iterative processing logic
and reasoning flow within agent execution, providing visibility
into the step-by-step processing workflow.
Args:
wrapped (callable): Original Agent.process_iterations method to be instrumented
wrapped (callable): Original Agent.conversation method to be instrumented
instance (Agent): Agent instance containing metadata and configuration
args (tuple): Positional arguments passed to process_iterations method
args (tuple): Positional arguments passed to conversation method
kwargs (dict): Keyword arguments passed to the original method
Returns:
@ -289,7 +289,7 @@ class ProcessIterationsWrapper:
# Extract agent information for span naming
agent_name = getattr(instance, "name", instance.__class__.__name__)
span_name = f"{agent_name}.process_iterations"
span_name = f"{agent_name}.conversation"
# Build span attributes
attributes = {
@ -319,15 +319,15 @@ class ProcessIterationsWrapper:
"""
Handle asynchronous process iterations execution with comprehensive span tracing.
Manages async process_iterations execution by creating CHAIN spans with
Manages async conversation execution by creating CHAIN spans with
proper attribute handling, result extraction, and error management for
iterative agent processing workflows.
Args:
wrapped (callable): Original async process_iterations method to execute
wrapped (callable): Original async conversation method to execute
args (tuple): Positional arguments for the wrapped method
kwargs (dict): Keyword arguments for the wrapped method
span_name (str): Name for the created span (e.g., "MyAgent.process_iterations")
span_name (str): Name for the created span (e.g., "MyAgent.conversation")
attributes (dict): Span attributes including agent name and processing context
Returns:
@ -359,17 +359,17 @@ class ProcessIterationsWrapper:
self, wrapped: Any, args: Any, kwargs: Any, span_name: str, attributes: dict
) -> Any:
"""
Handle synchronous process iterations execution with comprehensive span tracing.
Handle synchronous conversation execution with comprehensive span tracing.
Manages sync process_iterations execution by creating CHAIN spans with
Manages sync conversation execution by creating CHAIN spans with
proper attribute handling, result extraction, and error management for
iterative agent processing workflows.
Args:
wrapped (callable): Original sync process_iterations method to execute
wrapped (callable): Original sync conversation method to execute
args (tuple): Positional arguments for the wrapped method
kwargs (dict): Keyword arguments for the wrapped method
span_name (str): Name for the created span (e.g., "MyAgent.process_iterations")
span_name (str): Name for the created span (e.g., "MyAgent.conversation")
attributes (dict): Span attributes including agent name and processing context
Returns:

View File

@ -1,4 +1,5 @@
import logging
import asyncio
from typing import Any, Dict
from ..constants import (
@ -202,11 +203,14 @@ class WorkflowMonitorWrapper:
"""
Wrap WorkflowApp.run_and_monitor_workflow_async with AGENT span tracing.
Creates the top-level AGENT span for DurableAgent workflow execution and
stores the global workflow context immediately for task correlation.
Args:
wrapped: Original WorkflowApp.run_and_monitor_workflow_async method
instance: WorkflowApp instance (DurableAgent)
args: Positional arguments (workflow, input)
kwargs: Keyword arguments
wrapped (callable): Original WorkflowApp.run_and_monitor_workflow_async method
instance (Any): WorkflowApp instance (DurableAgent)
args (tuple): Positional arguments for the wrapped method
kwargs (dict): Keyword arguments for the wrapped method
Returns:
Any: Result from wrapped method execution (workflow output)
@ -217,9 +221,58 @@ class WorkflowMonitorWrapper:
):
return wrapped(*args, **kwargs)
# Extract arguments
arguments = bind_arguments(wrapped, *args, **kwargs)
workflow = arguments.get("workflow")
workflow_name = self._extract_workflow_name(args, kwargs)
# Extract agent name from the instance
agent_name = getattr(instance, "name", "DurableAgent")
attributes = self._build_workflow_attributes(
workflow_name, agent_name, args, kwargs
)
# Store global context immediately when this method is called
# This ensures workflow tasks can access it before async execution begins
try:
from ..context_propagation import extract_otel_context
from ..context_storage import store_workflow_context
captured_context = extract_otel_context()
if captured_context.get("traceparent"):
logger.debug(
f"Captured traceparent: {captured_context.get('traceparent')}"
)
store_workflow_context("__global_workflow_context__", captured_context)
else:
logger.warning(
"No traceparent found in captured context during __call__"
)
except Exception as e:
logger.warning(f"Failed to capture/store workflow context in __call__: {e}")
# Handle async vs sync execution
if asyncio.iscoroutinefunction(wrapped):
return self._handle_async_execution(
wrapped, args, kwargs, attributes, workflow_name, agent_name
)
else:
return self._handle_sync_execution(
wrapped, args, kwargs, attributes, workflow_name, agent_name
)
def _extract_workflow_name(self, args: Any, kwargs: Any) -> str:
"""
Extract workflow name from method arguments.
Args:
args: Positional arguments
kwargs: Keyword arguments
Returns:
str: Workflow name
"""
if args and len(args) > 0:
workflow = args[0]
else:
workflow = kwargs.get("workflow")
# Extract workflow name
workflow_name = (
@ -228,47 +281,42 @@ class WorkflowMonitorWrapper:
else getattr(workflow, "__name__", "unknown_workflow")
)
# Build span attributes
attributes = self._build_monitor_attributes(instance, workflow_name, arguments)
return workflow_name
# Debug logging to confirm wrapper is being called
logger.debug(
f"🔍 WorkflowMonitorWrapper creating AGENT span for workflow: {workflow_name}"
)
# Handle async execution
return self._handle_async_execution(
wrapped, args, kwargs, attributes, workflow_name
)
def _build_monitor_attributes(
self, instance: Any, workflow_name: str, arguments: Dict[str, Any]
def _build_workflow_attributes(
self, workflow_name: str, agent_name: str, args: Any, kwargs: Any
) -> Dict[str, Any]:
"""
Build span attributes for DurableAgent workflow monitoring.
Build span attributes for workflow execution.
Args:
instance: WorkflowApp instance (DurableAgent)
workflow_name: Name of the workflow being monitored
arguments: Bound method arguments from the wrapped call
workflow_name: Name of the workflow
agent_name: Name of the agent
args: Positional arguments
kwargs: Keyword arguments
Returns:
Dict[str, Any]: Span attributes for the AGENT span
"""
agent_name = getattr(instance, "name", instance.__class__.__name__)
# Build basic attributes
attributes = {
"openinference.span.kind": "AGENT", # DurableAgent workflow execution is the agent action
INPUT_MIME_TYPE: "application/json",
OUTPUT_MIME_TYPE: "application/json",
"workflow.name": workflow_name,
"workflow.operation": "run_and_monitor",
"agent.execution_mode": "workflow_based",
"agent.name": agent_name,
"agent.type": instance.__class__.__name__,
OUTPUT_MIME_TYPE: "application/json",
}
# Serialize input arguments
attributes[INPUT_VALUE] = safe_json_dumps(arguments)
# Add input payload if available
if args and len(args) > 1:
# Second argument is typically the input
input_data = args[1]
if input_data is not None:
attributes[INPUT_VALUE] = safe_json_dumps(input_data)
attributes[INPUT_MIME_TYPE] = "application/json"
elif "input" in kwargs and kwargs["input"] is not None:
attributes[INPUT_VALUE] = safe_json_dumps(kwargs["input"])
attributes[INPUT_MIME_TYPE] = "application/json"
# Add context attributes
attributes.update(get_attributes_from_context())
@ -282,6 +330,7 @@ class WorkflowMonitorWrapper:
kwargs: Any,
attributes: Dict[str, Any],
workflow_name: str,
agent_name: str,
) -> Any:
"""
Handle async workflow monitoring execution with context propagation.
@ -301,42 +350,12 @@ class WorkflowMonitorWrapper:
span_name = f"Agent.{workflow_name}"
# Debug logging to confirm span creation
logger.debug(f"Creating AGENT span: {span_name}")
logger.debug(f"Creating AGENT span: {span_name}")
with self._tracer.start_as_current_span(
span_name, attributes=attributes
) as span:
try:
# Debug logging to confirm span context
logger.debug(f"📋 AGENT span context: {span.get_span_context()}")
# CRITICAL: Capture and store OpenTelemetry context BEFORE executing workflow
# This must happen before wrapped() is called so tasks can access it
try:
from ..context_propagation import extract_otel_context
from ..context_storage import store_workflow_context
captured_context = extract_otel_context()
if captured_context.get("traceparent"):
logger.debug("🔗 Captured workflow context in AGENT span")
logger.debug(
f"🔗 Traceparent: {captured_context.get('traceparent')}"
)
# Store context IMMEDIATELY for workflow tasks to use
store_workflow_context(
"__global_workflow_context__", captured_context
)
logger.debug(
"🔗 Stored global workflow context for task correlation"
)
else:
logger.warning("⚠️ No traceparent found in captured context")
except Exception as e:
logger.warning(
f"⚠️ Failed to capture/store workflow context: {e}"
)
# Execute workflow and get result
result = await wrapped(*args, **kwargs)
@ -349,17 +368,69 @@ class WorkflowMonitorWrapper:
span.set_attribute(OUTPUT_VALUE, safe_json_dumps(result))
span.set_status(Status(StatusCode.OK))
logger.debug(f"🎯 AGENT span completed successfully: {span_name}")
logger.debug(f"AGENT span completed successfully: {span_name}")
return result
except Exception as e:
span.set_status(Status(StatusCode.ERROR, str(e)))
span.record_exception(e)
logger.error(f"AGENT span failed: {span_name} - {e}")
logger.error(f"AGENT span failed: {span_name} - {e}")
raise
return async_wrapper()
def _handle_sync_execution(
self,
wrapped: Any,
args: Any,
kwargs: Any,
attributes: Dict[str, Any],
workflow_name: str,
agent_name: str,
) -> Any:
"""
Handle synchronous workflow monitoring execution with context propagation.
Args:
wrapped: Original synchronous method to execute
args: Positional arguments for the wrapped method
kwargs: Keyword arguments for the wrapped method
attributes: Pre-built span attributes
workflow_name: Name of the workflow for span naming
Returns:
Any: Result from wrapped method execution
"""
span_name = f"Agent.{workflow_name}"
# Debug logging to confirm span creation
logger.debug(f"Creating AGENT span: {span_name}")
with self._tracer.start_as_current_span(
span_name, attributes=attributes
) as span:
try:
# Execute workflow and get result
result = wrapped(*args, **kwargs)
# Set output attributes - handle both string and object results consistently
if isinstance(result, str):
# If result is already a JSON string, use it directly
span.set_attribute(OUTPUT_VALUE, result)
else:
# If result is an object, serialize it to match input format
span.set_attribute(OUTPUT_VALUE, safe_json_dumps(result))
span.set_status(Status(StatusCode.OK))
logger.debug(f"AGENT span completed successfully: {span_name}")
return result
except Exception as e:
span.set_status(Status(StatusCode.ERROR, str(e)))
span.record_exception(e)
logger.error(f"AGENT span failed: {span_name} - {e}")
raise
# ============================================================================
# Exported Classes

View File

@ -250,7 +250,7 @@ class AgentTool(BaseModel):
if self.args_model:
try:
validated_args = self.args_model(**kwargs)
return validated_args.model_dump()
return validated_args.model_dump(exclude_none=True)
except ValidationError as ve:
logger.debug(f"Validation failed for tool '{self.name}': {ve}")
raise ToolError(f"Validation error in tool '{self.name}': {ve}") from ve

View File

@ -45,52 +45,45 @@ def create_pydantic_model_from_schema(
# Process each property in the schema
for field_name, field_props in properties.items():
# Get field type information
json_type = field_props.get("type", "string")
# Handle complex type definitions (arrays, unions, etc.)
if isinstance(json_type, list):
# Process union types (e.g., ["string", "null"])
has_null = "null" in json_type
non_null_types = [t for t in json_type if t != "null"]
if not non_null_types:
# Only null type specified
field_type = Optional[str]
else:
# Use the first non-null type
# TODO: Proper union type handling would be better but more complex
primary_type = non_null_types[0]
# --- Handle anyOf/oneOf for nullable/union fields ---
if "anyOf" in field_props or "oneOf" in field_props:
variants = field_props.get("anyOf") or field_props.get("oneOf")
types = [v.get("type", "string") for v in variants]
has_null = "null" in types
non_null_variants = [v for v in variants if v.get("type") != "null"]
if non_null_variants:
primary_type = non_null_variants[0].get("type", "string")
field_type = TYPE_MAPPING.get(primary_type, str)
# Make optional if null is included
# Handle array/object with items/properties
if primary_type == "array" and "items" in non_null_variants[0]:
item_type = non_null_variants[0]["items"].get("type", "string")
field_type = List[TYPE_MAPPING.get(item_type, str)]
elif primary_type == "object":
field_type = dict
else:
field_type = str
if has_null:
field_type = Optional[field_type]
else:
# Simple type
# --- Fallback to "type" ---
json_type = field_props.get("type", "string")
field_type = TYPE_MAPPING.get(json_type, str)
# Handle arrays with item type information
if json_type == "array" or (
isinstance(json_type, list) and "array" in json_type
):
# Get the items type if specified
if "items" in field_props:
items_type = field_props["items"].get("type", "string")
if isinstance(items_type, str):
item_py_type = TYPE_MAPPING.get(items_type, str)
field_type = List[item_py_type]
if json_type == "array" and "items" in field_props:
item_type = field_props["items"].get("type", "string")
field_type = List[TYPE_MAPPING.get(item_type, str)]
# Set default value based on required status
if field_name in required:
default = ... # Required field
default = ...
else:
default = None
# Make optional if not already
if not isinstance(field_type, type(Optional)):
if not (
hasattr(field_type, "__origin__")
and field_type.__origin__ is Optional
):
field_type = Optional[field_type]
# Add field with description and default value
field_description = field_props.get("description", "")
fields[field_name] = (
field_type,

View File

@ -288,7 +288,10 @@ class AgenticWorkflow(
store_name=store_name,
key=store_key,
value=json.dumps({}),
state_metadata={"contentType": "application/json"},
state_metadata={
"contentType": "application/json",
"partitionKey": store_key,
},
options=StateOptions(
concurrency=Concurrency.first_write,
consistency=Consistency.strong,
@ -315,7 +318,10 @@ class AgenticWorkflow(
operation_type=TransactionOperationType.upsert,
)
],
transactional_metadata={"contentType": "application/json"},
transactional_metadata={
"contentType": "application/json",
"partitionKey": store_key,
},
)
except Exception as e:
raise e

View File

@ -15,6 +15,7 @@ from dapr.ext.workflow import (
from dapr.ext.workflow.workflow_state import WorkflowState
from durabletask import task as dtask
from pydantic import BaseModel, ConfigDict, Field
from typing import ClassVar
from dapr_agents.agents.base import ChatClientBase
from dapr_agents.types.workflow import DaprWorkflowStatus
@ -31,6 +32,11 @@ class WorkflowApp(BaseModel):
A Pydantic-based class to encapsulate a Dapr Workflow runtime and manage workflows and tasks.
"""
# Class-level constant for instrumentation callback
# This is used to register the workflow instrumentation callback with the instrumentor
# This is needed because the instrumentor is not available immediately when the WorkflowApp is initialized
INSTRUMENTATION_CALLBACK_ATTR: ClassVar[str] = "_workflow_instrumentation_callback"
llm: Optional[ChatClientBase] = Field(
default=None,
description="The default LLM client for tasks that explicitly require an LLM but don't specify one (optional).",
@ -436,6 +442,10 @@ class WorkflowApp(BaseModel):
logger.info("Starting workflow runtime.")
self.wf_runtime.start()
self.wf_runtime_is_running = True
# Apply workflow instrumentation if callback was registered
if hasattr(self.__class__, self.INSTRUMENTATION_CALLBACK_ATTR):
getattr(self.__class__, self.INSTRUMENTATION_CALLBACK_ATTR)()
else:
logger.debug("Workflow runtime already running; skipping.")
@ -465,10 +475,6 @@ class WorkflowApp(BaseModel):
Exception: If workflow execution fails.
"""
try:
# Start Workflow Runtime
if not self.wf_runtime_is_running:
self.start_runtime()
# Generate unique instance ID
instance_id = uuid.uuid4().hex
@ -631,8 +637,6 @@ class WorkflowApp(BaseModel):
raise
finally:
logger.info(f"Finished workflow with Instance ID: {instance_id}.")
# Off-load the stop_runtime call as it may block.
await asyncio.to_thread(self.stop_runtime)
def run_and_monitor_workflow_sync(
self,

View File

@ -1,5 +0,0 @@
FROM squidfunk/mkdocs-material
RUN apk update && \
apk add gcc python3-dev musl-dev linux-headers && \
pip install mkdocs-jupyter

View File

@ -1,220 +0,0 @@
# Agents
Agents in `Dapr Agents` are autonomous systems powered by Large Language Models (LLMs), designed to execute tasks, reason through problems, and collaborate within workflows. Acting as intelligent building blocks, agents seamlessly combine LLM-driven reasoning with tool integration, memory, and collaboration features to enable scalable, production-grade agentic systems.
![](../img/concepts-agents.png)
## Core Features
### 1. LLM Integration
Dapr Agents provides a unified interface to connect with LLM inference APIs. This abstraction allows developers to seamlessly integrate their agents with cutting-edge language models for reasoning and decision-making.
### 2. Structured Outputs
Agents in Dapr Agents leverage structured output capabilities, such as [OpenAIs Function Calling](https://platform.openai.com/docs/guides/function-calling), to generate predictable and reliable results. These outputs follow [JSON Schema Draft 2020-12](https://json-schema.org/draft/2020-12/release-notes.html) and [OpenAPI Specification v3.1.0](https://github.com/OAI/OpenAPI-Specification) standards, enabling easy interoperability and tool integration.
### 3. Tool Selection
Agents dynamically select the appropriate tool for a given task, using LLMs to analyze requirements and choose the best action. This is supported directly through LLM parametric knowledge and enhanced by [Function Calling](https://platform.openai.com/docs/guides/function-calling), ensuring tools are invoked efficiently and accurately.
### 4. MCP Support
Dapr Agents includes built-in support for the [Model Context Protocol (MCP)](https://modelcontextprotocol.io/), enabling agents to dynamically discover and invoke external tools through a standardized interface. Using the provided MCPClient, agents can connect to MCP servers via two transport options: stdio for local development and sse for remote or distributed environments.
Once connected, the MCP client fetches all available tools from the server and prepares them for immediate use within the agents toolset. This allows agents to incorporate capabilities exposed by external processes—such as local Python scripts or remote services without hardcoding or preloading them. Agents can invoke these tools at runtime, expanding their behavior based on whats offered by the active MCP server.
### 5. Memory
Agents retain context across interactions, enhancing their ability to provide coherent and adaptive responses. Memory options range from simple in-memory lists for managing chat history to vector databases for semantic search and retrieval. Dapr Agents also integrates with [Dapr state stores](https://docs.dapr.io/developing-applications/building-blocks/state-management/howto-get-save-state/), enabling scalable and persistent memory for advanced use cases from 28 different state store providers.
### 6. Prompt Flexibility
Dapr Agents supports flexible prompt templates to shape agent behavior and reasoning. Users can define placeholders within prompts, enabling dynamic input of context for inference calls. By leveraging prompt formatting with [Jinja templates](https://jinja.palletsprojects.com/en/stable/templates/), users can include loops, conditions, and variables, providing precise control over the structure and content of prompts. This flexibility ensures that LLM responses are tailored to the task at hand, offering modularity and adaptability for diverse use cases.
### 7. Agent Services
Agents are exposed as independent services using [FastAPI and Dapr applications](https://docs.dapr.io/developing-applications/sdks/python/python-sdk-extensions/python-fastapi/). This modular approach separates the agents logic from its service layer, enabling seamless reuse, deployment, and integration into multi-agent systems.
### 8. Message-Driven Communication
Agents collaborate through [Pub/Sub messaging](https://docs.dapr.io/developing-applications/building-blocks/pubsub/pubsub-overview/), enabling event-driven communication and task distribution. This message-driven architecture allows agents to work asynchronously, share updates, and respond to real-time events, ensuring effective collaboration in distributed systems.
### 9. Workflow Orchestration
Dapr Agents supports both deterministic and event-driven workflows to manage multi-agent systems via [Dapr Workflows](https://docs.dapr.io/developing-applications/building-blocks/workflow/workflow-overview/). Deterministic workflows provide clear, repeatable processes, while event-driven workflows allow for dynamic, adaptive collaboration between agents in centralized or decentralized architectures.
## Agent Types
Dapr Agents provides two agent types, each designed for different use cases:
### Agent
The `Agent` class is a conversational agent that manages tool calls and conversations using a language model. It provides immediate, synchronous execution with built-in conversation memory and tool integration capabilities.
**Key Characteristics:**
- Synchronous execution with immediate responses
- Built-in conversation memory and tool history tracking
- Iterative conversation processing with max iteration limits
- Direct tool execution and result processing
- Graceful shutdown support with cancellation handling
**When to use:**
- Building conversational assistants that need immediate responses
- Scenarios requiring real-time tool execution and conversation flow
- When you need direct control over the conversation loop
- Quick prototyping and development of agent interactions
**Example Usage:**
```python
from dapr_agents import Agent
from dapr_agents.llm.dapr import DaprChatClient
from dapr_agents.memory import ConversationDaprStateMemory
# Create an incident intake agent
agent = Agent(
name="IncidentIntakeBot",
role="Incident Reporting Assistant",
instructions=[
"Collect detailed operational incident information",
"Retain user inputs across sessions for audit and recovery",
"Use memory to guide follow-up questions based on previous input",
"Update incident records as new details are provided",
],
tools=[incident_lookup_tool, escalate_incident_tool, update_incident_tool],
)
# Conversation history is preserved across calls
agent.run("first input")
agent.run("second input")
```
### DurableAgent
The `DurableAgent` class is a workflow-based agent that extends the standard Agent with Dapr Workflows for long-running, fault-tolerant, and durable execution. It provides persistent state management, automatic retry mechanisms, and deterministic execution across failures.
**Key Characteristics:**
- Workflow-based execution using Dapr Workflows
- Persistent workflow state management across sessions and failures
- Automatic retry and recovery mechanisms
- Deterministic execution with checkpointing
- Built-in message routing and agent communication
- Supports complex orchestration patterns and multi-agent collaboration
**When to use:**
- Multi-step workflows that span time or systems
- Tasks requiring guaranteed progress tracking and state persistence
- Scenarios where operations may pause, fail, or need recovery without data loss
- Complex agent orchestration and multi-agent collaboration
- Production systems requiring fault tolerance and scalability
**Example Usage:**
```python
from dapr_agents import DurableAgent
from dapr_agents.llm.dapr import DaprChatClient
from dapr_agents.memory import ConversationDaprStateMemory
# Create an onboarding workflow agent
durable_agent = DurableAgent(
name="OnboardingWorkflowBot",
role="Employee Onboarding Coordinator",
instructions=[
"Guide and automate multi-step onboarding processes",
"Track progress and retain state across sessions and failures",
"Coordinate with tools to provision accounts, access, and resources",
],
llm=OpenAIChatClient(),
tools=[
provision_email_account,
setup_github_access,
assign_kubernetes_namespace,
configure_slack_workspace,
request_hardware_kit,
],
message_bus_name="messagepubsub",
state_store_name="workflowstatestore",
state_key="workflow_state",
agents_registry_store_name="agentstatestore",
agents_registry_key="agents_registry",
),
```
## Agent Patterns
In Dapr Agents, Agent Patterns define the built-in loops that allow agents to dynamically handle tasks. These patterns enable agents to iteratively reason, act, and adapt, making them flexible and capable problem-solvers. By embedding these patterns, Dapr Agents ensures agents can independently complete tasks without requiring external orchestration.
### Tool Calling
Tool Calling is an essential pattern in autonomous agent design, allowing AI agents to interact dynamically with external tools based on user input. One reliable method for enabling this is through [OpenAI's Function Calling](https://platform.openai.com/docs/guides/function-calling) capability. This feature allows developers to describe functions to models trained to generate structured JSON objects containing the necessary arguments for tool execution, based on user queries.
#### How It Works
![](../img/concepts_agents_toolcall_flow.png)
1. The user submits a query specifying a task and the available tools.
2. The LLM analyzes the query and selects the right tool for the task.
3. The LLM provides a structured JSON output containing the tools unique ID, name, and arguments.
4. The AI agent parses the JSON, executes the tool with the provided arguments, and sends the results back as a tool message.
5. The LLM then summarizes the tool's execution results within the users context to deliver a comprehensive final response.
!!! info
Steps 2-4 can be repeated multiple times, depending on the task's complexity.
This pattern is highly flexible and supports multiple iterations of tool selection and execution, empowering agents to handle dynamic and multi-step tasks more effectively.
### ReAct
The [ReAct (Reason + Act)](https://arxiv.org/pdf/2210.03629.pdf) pattern was introduced in 2022 to enhance the capabilities of LLM-based AI agents by combining reasoning with action. This approach allows agents not only to reason through complex tasks but also to interact with the environment, taking actions based on their reasoning and observing the outcomes. ReAct enables AI agents to dynamically adapt to tasks by reasoning about the next steps and executing actions in real time.
#### How It Works
![](../img/concepts_agents_react_flow.png)
* **Thought (Reasoning)**: The agent analyzes the situation and generates a thought or a plan based on the input.
* **Action**: The agent takes an action based on its reasoning.
* **Observation**: After the action is executed, the agent observes the results or feedback from the environment, assessing the effectiveness of its action.
!!! info
These steps create a cycle that allows the agent to continuously think, act, and learn from the results.
The ReAct pattern gives the agent the flexibility to adapt based on task complexity:
* Deep reasoning tasks: The agent goes through multiple cycles of thought, action, and observation to refine its responses.
* Action-driven tasks: The agent focuses more on timely actions and thinks critically only at key decision points.
ReAct empowers agents to navigate complex, real-world environments efficiently, making them better suited for scenarios that require both deep reasoning and timely actions.
## Agent Collaboration using Workflows
While patterns empower individual agents, workflows enable the coordination of multiple agents to achieve shared goals. In Dapr Agents, workflows serve as a higher-level framework for organizing how agents collaborate and distribute tasks.
Workflows can orchestrate agents, each equipped with their own built-in patterns, to handle different parts of a larger process. For example, one agent might gather data using tools, another might analyze the results, and a third might generate a report. The workflow manages the communication and sequencing between these agents, ensuring smooth collaboration.
Interestingly, workflows can also define loops similar to agent patterns. Instead of relying on an agents built-in tool-calling loop, you can design workflows to orchestrate tool usage, reasoning, and action. This gives you the flexibility to use workflows to define both multi-agent collaboration and complex task handling for a single agent.
### Random Workflow
In a Random Workflow, the next agent to handle a task is selected randomly. This approach:
* Encourages diversity in agent responses and strategies.
* Simplifies orchestration in cases where task assignment does not depend on specific agent roles or expertise.
* Random workflows are particularly useful for exploratory tasks or brainstorming scenarios where agent collaboration benefits from randomness.
### Round Robin Workflow
The Round Robin Workflow assigns tasks sequentially to agents in a fixed order. This method:
* Ensures equal participation among agents.
* Is ideal for scenarios requiring predictable task distribution, such as routine monitoring or repetitive processes.
* For example, in a team of monitoring agents, each agent takes turns analyzing incoming data streams in a predefined order.
### LLM-Based Workflow
The LLM-Based Workflow relies on the reasoning capabilities of an LLM to dynamically choose the next agent based on:
* Task Context: The nature and requirements of the current task.
* Chat History: Previous agent responses and interactions.
* Agent Metadata: Attributes like expertise, availability, or priorities.
This approach ensures that the most suitable agent is selected for each task, optimizing collaboration and efficiency. For example, in a multi-agent customer support system, the LLM can assign tasks to agents based on customer issues, agent expertise, and workload distribution.

View File

@ -1,167 +0,0 @@
# Arxiv Fetcher
The Arxiv Fetcher module in `Dapr Agents` provides a powerful interface to interact with the [arXiv API](https://info.arxiv.org/help/api/index.html). It is designed to help users programmatically search for, retrieve, and download scientific papers from arXiv. With advanced querying capabilities, metadata extraction, and support for downloading PDF files, the Arxiv Fetcher is ideal for researchers, developers, and teams working with academic literature.
## Why Use the Arxiv Fetcher?
The Arxiv Fetcher simplifies the process of accessing research papers, offering features like:
* **Automated Literature Search**: Query arXiv for specific topics, keywords, or authors.
* **Metadata Retrieval**: Extract structured metadata, such as titles, abstracts, authors, categories, and submission dates.
* **Precise Filtering**: Limit search results by date ranges (e.g., retrieve the latest research in a field).
* **PDF Downloading**: Fetch full-text PDFs of papers for offline use.
## How to Use the Arxiv Fetcher
### Step 1: Install Required Modules
!!! info
The Arxiv Fetcher relies on a [lightweight Python wrapper](https://github.com/lukasschwab/arxiv.py) for the arXiv API, which is not included in the Dapr Agents core module. This design choice helps maintain modularity and avoids adding unnecessary dependencies for users who may not require this functionality. To use the Arxiv Fetcher, ensure you install the [library](https://pypi.org/project/arxiv/) separately.
```python
pip install arxiv
```
### Step 2: Initialize the Fetcher
Set up the `ArxivFetcher` to begin interacting with the arXiv API.
```python
from dapr_agents.document import ArxivFetcher
# Initialize the fetcher
fetcher = ArxivFetcher()
```
### Step 3: Perform Searches
**Basic Search by Query String**
Search for papers using simple keywords. The results are returned as Document objects, each containing:
* `text`: The abstract of the paper.
* `metadata`: Structured metadata such as title, authors, categories, and submission dates.
```python
# Search for papers related to "machine learning"
results = fetcher.search(query="machine learning", max_results=5)
# Display metadata and summaries
for doc in results:
print(f"Title: {doc.metadata['title']}")
print(f"Authors: {', '.join(doc.metadata['authors'])}")
print(f"Summary: {doc.text}\n")
```
**Advanced Querying**
Refine searches using logical operators like AND, OR, and NOT or perform field-specific searches, such as by author.
Examples:
Search for papers on "agents" and "cybersecurity":
```python
results = fetcher.search(query="all:(agents AND cybersecurity)", max_results=10)
```
Exclude specific terms (e.g., "quantum" but not "computing"):
```python
results = fetcher.search(query="all:(quantum NOT computing)", max_results=10)
```
Search for papers by a specific author:
```python
results = fetcher.search(query='au:"John Doe"', max_results=10)
```
**Filter Papers by Date**
Limit search results to a specific time range, such as papers submitted in the last 24 hours.
```python
from datetime import datetime, timedelta
# Calculate the date range
last_24_hours = (datetime.now() - timedelta(days=1)).strftime("%Y%m%d")
today = datetime.now().strftime("%Y%m%d")
# Search for recent papers
recent_results = fetcher.search(
query="all:(agents AND cybersecurity)",
from_date=last_24_hours,
to_date=today,
max_results=5
)
# Display metadata
for doc in recent_results:
print(f"Title: {doc.metadata['title']}")
print(f"Authors: {', '.join(doc.metadata['authors'])}")
print(f"Published: {doc.metadata['published']}")
print(f"Summary: {doc.text}\n")
```
### Step 4: Download PDFs
Fetch the full-text PDFs of papers for offline use. Metadata is preserved alongside the downloaded files.
```python
import os
from pathlib import Path
# Create a directory for downloads
os.makedirs("arxiv_papers", exist_ok=True)
# Download PDFs
download_results = fetcher.search(
query="all:(agents AND cybersecurity)",
max_results=5,
download=True,
dirpath=Path("arxiv_papers")
)
for paper in download_results:
print(f"Downloaded Paper: {paper['title']}")
print(f"File Path: {paper['file_path']}\n")
```
### Step 5: Extract and Process PDF Content
Use `PyPDFReader` from `Dapr Agents` to extract content from downloaded PDFs. Each page is treated as a separate Document object with metadata.
```python
from pathlib import Path
from dapr_agents.document import PyPDFReader
reader = PyPDFReader()
docs_read = []
for paper in download_results:
local_pdf_path = Path(paper["file_path"])
documents = reader.load(local_pdf_path, additional_metadata=paper)
docs_read.extend(documents)
# Verify results
print(f"Extracted {len(docs_read)} documents.")
print(f"First document text: {docs_read[0].text}")
print(f"Metadata: {docs_read[0].metadata}")
```
## Practical Applications
The Arxiv Fetcher enables various use cases for researchers and developers:
* Literature Reviews: Quickly retrieve and organize relevant papers on a given topic or by a specific author.
* Trend Analysis: Identify the latest research in a domain by filtering for recent submissions.
* Offline Research Workflows: Download and process PDFs for local analysis and archiving.
## Next Steps
While the Arxiv Fetcher provides robust functionality for retrieving and processing research papers, its output can be integrated into advanced workflows:
* Building a Searchable Knowledge Base: Combine fetched papers with tools like text splitting and vector embeddings for advanced search capabilities.
* Retrieval-Augmented Generation (RAG): Use processed papers as inputs for RAG pipelines to power question-answering systems.
* Automated Literature Surveys: Generate summaries or insights based on the fetched and processed research.

View File

@ -1,53 +0,0 @@
# Messaging
Messaging is how agents communicate, collaborate, and adapt in workflows. It enables them to share updates, execute tasks, and respond to events seamlessly. Messaging is one of the main components of `event-driven` agentic workflows, ensuring tasks remain scalable, adaptable, and decoupled. Built entirely around the `Pub/Sub (publish/subscribe)` model, messaging leverages a message bus to facilitate communication across agents, services, and workflows.
## Key Role of Messaging in Agentic Workflows
Messaging connects agents in workflows, enabling real-time communication and coordination. It acts as the backbone of event-driven interactions, ensuring that agents work together effectively without requiring direct connections.
Through messaging, agents can:
* **Collaborate Across Tasks**: Agents exchange messages to share updates, broadcast events, or deliver task results.
* **Orchestrate Workflows**: Tasks are triggered and coordinated through published messages, enabling workflows to adjust dynamically.
* **Respond to Events**: Agents adapt to real-time changes by subscribing to relevant topics and processing events as they occur.
By using messaging, workflows remain modular and scalable, with agents focusing on their specific roles while seamlessly participating in the broader system.
## How Messaging Works
Messaging relies on the `Pub/Sub` model, which organizes communication into topics. These topics act as channels where agents can publish and subscribe to messages, enabling efficient and decoupled communication.
### Message Bus and Topics
The message bus serves as the central system that manages topics and message delivery. Agents interact with the message bus to send and receive messages:
* **Publishing Messages**: Agents publish messages to a specific topic, making the information available to all subscribed agents.
* **Subscribing to Topics**: Agents subscribe to topics relevant to their roles, ensuring they only receive the messages they need.
* **Broadcasting Updates**: Multiple agents can subscribe to the same topic, allowing them to act on shared events or updates.
### Scalability and Adaptability
The message bus ensures that communication scales effortlessly, whether you are adding new agents, expanding workflows, or adapting to changing requirements. Agents remain loosely coupled, allowing workflows to evolve without disruptions.
## Messaging in Event-Driven Workflows
Event-driven workflows depend on messaging to enable dynamic and real-time interactions. Unlike deterministic workflows, which follow a fixed sequence of tasks, event-driven workflows respond to the messages and events flowing through the system.
* **Real-Time Triggers**: Agents can initiate tasks or workflows by publishing specific events.
* **Asynchronous Execution**: Tasks are coordinated through messages, allowing agents to operate independently and in parallel.
* **Dynamic Adaptation**: Agents adjust their behavior based on the messages they receive, ensuring workflows remain flexible and resilient.
## Why Pub/Sub Messaging for Agentic Workflows?
Pub/Sub messaging is essential for event-driven agentic workflows because it:
* **Decouples Components**: Agents publish messages without needing to know which agents will receive them, promoting modular and scalable designs.
* **Enables Real-Time Communication**: Messages are delivered as events occur, allowing agents to react instantly.
* **Fosters Collaboration**: Multiple agents can subscribe to the same topic, making it easy to share updates or divide responsibilities.
This messaging framework ensures that agents operate efficiently, workflows remain flexible, and systems can scale dynamically.
## Conclusion
Messaging is the backbone of event-driven agentic workflows. By leveraging a robust Pub/Sub model, agents communicate efficiently, adapt dynamically, and collaborate seamlessly. This foundation ensures that workflows scale, evolve, and respond in real time, empowering agents to achieve their goals in a shared, dynamic environment.

View File

@ -1,136 +0,0 @@
# Text Splitter
The Text Splitter module is a foundational tool in `Dapr Agents` designed to preprocess documents for use in [Retrieval-Augmented Generation (RAG)](https://en.wikipedia.org/wiki/Retrieval-augmented_generation) workflows and other `in-context learning` applications. Its primary purpose is to break large documents into smaller, meaningful chunks that can be embedded, indexed, and efficiently retrieved based on user queries.
By focusing on manageable chunk sizes and preserving contextual integrity through overlaps, the Text Splitter ensures documents are processed in a way that supports downstream tasks like question answering, summarization, and document retrieval.
## Why Use a Text Splitter?
When building RAG pipelines, splitting text into smaller chunks serves these key purposes:
* Enabling Effective Indexing: Chunks are embedded and stored in a vector database, making them retrievable based on similarity to user queries.
* Maintaining Semantic Coherence: Overlapping chunks help retain context across splits, ensuring the system can connect related pieces of information.
* Handling Model Limitations: Many models have input size limits. Splitting ensures text fits within these constraints while remaining meaningful.
This step is crucial for preparing knowledge to be embedded into a searchable format, forming the backbone of retrieval-based workflows.
## Strategies for Text Splitting
The Text Splitter supports multiple strategies to handle different types of documents effectively. These strategies balance the size of each chunk with the need to maintain context.
### 1. Character-Based Length
* How It Works: Counts the number of characters in each chunk.
* Use Case: Simple and effective for text splitting without dependency on external tokenization tools.
Example:
```python
from dapr_agents.document.splitter.text import TextSplitter
# Character-based splitter (default)
splitter = TextSplitter(chunk_size=1024, chunk_overlap=200)
```
### 2. Token-Based Length
* How It Works: Counts tokens, which are the semantic units used by language models (e.g., words or subwords).
* Use Case: Ensures compatibility with models like GPT, where token limits are critical.
**Example**:
```python
import tiktoken
from dapr_agents.document.splitter.text import TextSplitter
enc = tiktoken.get_encoding("cl100k_base")
def length_function(text: str) -> int:
return len(enc.encode(text))
splitter = TextSplitter(
chunk_size=1024,
chunk_overlap=200,
chunk_size_function=length_function
)
```
The flexibility to define the chunk size function makes the Text Splitter adaptable to various scenarios.
## Chunk Overlap
To preserve context, the Text Splitter includes a chunk overlap feature. This ensures that parts of one chunk carry over into the next, helping maintain continuity when chunks are processed sequentially.
Example:
* With `chunk_size=1024` and `chunk_overlap=200`, the last `200` tokens or characters of one chunk appear at the start of the next.
* This design helps in tasks like text generation, where maintaining context across chunks is essential.
## How to Use the Text Splitter
Heres a practical example of using the Text Splitter to process a PDF document:
## Step 1: Load a PDF
```python
import requests
from pathlib import Path
# Download PDF
pdf_url = "https://arxiv.org/pdf/2412.05265.pdf"
local_pdf_path = Path("arxiv_paper.pdf")
if not local_pdf_path.exists():
response = requests.get(pdf_url)
response.raise_for_status()
with open(local_pdf_path, "wb") as pdf_file:
pdf_file.write(response.content)
```
### Step 2: Read the Document
For this example, we use Dapr Agents' `PyPDFReader`.
!!! info
The PyPDF Reader relies on the [pypdf python library](https://pypi.org/project/pypdf/), which is not included in the Dapr Agents core module. This design choice helps maintain modularity and avoids adding unnecessary dependencies for users who may not require this functionality. To use the PyPDF Reader, ensure that you install the library separately.
```python
pip install pypdf
```
Then, initialize the reader to load the PDF file.
```python
from dapr_agents.document.reader.pdf.pypdf import PyPDFReader
reader = PyPDFReader()
documents = reader.load(local_pdf_path)
```
### Step 3: Split the Document
```python
splitter = TextSplitter(
chunk_size=1024,
chunk_overlap=200,
chunk_size_function=length_function
)
chunked_documents = splitter.split_documents(documents)
```
### Step 4: Analyze Results
```python
print(f"Original document pages: {len(documents)}")
print(f"Total chunks: {len(chunked_documents)}")
print(f"First chunk: {chunked_documents[0]}")
```
## Key Features
* Hierarchical Splitting: Splits text by separators (e.g., paragraphs), then refines chunks further if needed.
* Customizable Chunk Size: Supports character-based and token-based length functions.
* Overlap for Context: Retains portions of one chunk in the next to maintain continuity.
* Metadata Preservation: Each chunk retains metadata like page numbers and start/end indices for easier mapping.
By understanding and leveraging the `Text Splitter`, you can preprocess large documents effectively, ensuring they are ready for embedding, indexing, and retrieval in advanced workflows like RAG pipelines.

View File

@ -8,20 +8,41 @@ This project uses modern Python packaging with `pyproject.toml`. Dependencies ar
- Test dependencies are in `[project.optional-dependencies.test]`
- Development dependencies are in `[project.optional-dependencies.dev]`
### Working within a virtual environment
Create your python virtual environment:
```bash
python -m venv venv
source venv/bin/activate
```
### Generating Requirements Files
If you need to generate requirements files (e.g., for deployment or specific environments):
#### Option 1 - Using pip-tools:
```bash
# Install dev tools
pip install -e ".[dev]"
# Generate requirements.txt
pip-compile pyproject.toml
# Generate dev-requirements.txt
pip-compile pyproject.toml --extra dev
pip-compile pyproject.toml # --extra dev
```
#### Option 2 - Using uv:
```bash
# Generate lock file with all dependencies
uv lock --all-extras
# Install everything from lock file
uv sync --all-extras
```
### Installing Dependencies
#### Option 1 - Using pip:
```bash
# Install main package with test dependencies
pip install -e ".[test]"
@ -33,6 +54,32 @@ pip install -e ".[dev]"
pip install -e ".[test,dev]"
```
#### Option 2 - Using uv:
```bash
# Install main package with test dependencies
uv sync --extra=test
# Install main package with development dependencies
uv sync --extra=dev
# Install main package with all optional dependencies
uv sync --all-extras
# Install in editable mode with all extras
uv sync --all-extras --editable
```
## Command Mapping
| pip/pip-tools command | uv equivalent |
|----------------------|---------------|
| `pip-compile pyproject.toml` | `uv lock` |
| `pip-compile --all-extras` | `uv lock` (automatic) |
| `pip install -r requirements.txt` | `uv sync` |
| `pip install -e .` | `uv sync --editable` |
| `pip install -e ".[dev]"` | `uv sync --extra=dev` |
| `pip install -e ".[test,dev]"` | `uv sync --all-extras` |
## Testing
The project uses pytest for testing. To run tests:
@ -65,9 +112,12 @@ tox -e type
## Development Workflow
### Option 1 - Using pip:
1. Install development dependencies:
```bash
pip install -e ".[dev]"
# Alternatively, you can use uv with:
# uv sync --extra=dev
```
2. Run tests before making changes:

Binary file not shown.

Before

Width:  |  Height:  |  Size: 7.7 KiB

View File

@ -1,123 +0,0 @@
# Installation
## Install Dapr Agents
!!! info
make sure you have Python already installed. `Python >=3.10`
### As a Python package using Pip
```bash
pip install dapr-agents
```
## Install Dapr CLI
Install the Dapr CLI to manage Dapr-related tasks like running applications with sidecars, viewing logs, and launching the Dapr dashboard. It works seamlessly with both self-hosted and Kubernetes environments. For a complete step-by-step guide, visit the official [Dapr CLI installation page](https://docs.dapr.io/getting-started/install-dapr-cli/).
Verify the CLI is installed by restarting your terminal/command prompt and running the following:
```bash
dapr -h
```
## Initialize Dapr in Local Mode
!!! info
Make sure you have [Docker](https://docs.docker.com/get-started/get-docker/) already installed. I use [Docker Desktop](https://www.docker.com/products/docker-desktop/).
Initialize Dapr locally to set up a self-hosted environment for development. This process fetches and installs the Dapr sidecar binaries, runs essential services as Docker containers, and prepares a default components folder for your application. For detailed steps, see the official [guide on initializing Dapr locally](https://docs.dapr.io/getting-started/install-dapr-selfhost/).
![](../img/home_installation_init.png)
To initialize the Dapr control plane containers and create a default configuration file, run:
```bash
dapr init
```
Verify you have container instances with `daprio/dapr`, `openzipkin/zipkin`, and `redis` images running:
```bash
docker ps
```
## Enable Redis Insights
Dapr uses [Redis](https://docs.dapr.io/reference/components-reference/supported-state-stores/setup-redis/) by default for state management and pub/sub messaging, which are fundamental to Dapr Agents's agentic workflows. These capabilities enable the following:
* Viewing Pub/Sub Messages: Monitor and inspect messages exchanged between agents in event-driven workflows.
* Inspecting State Information: Access and analyze shared state data among agents.
* Debugging and Monitoring Events: Track workflow events in real time to ensure smooth operations and identify issues.
To make these insights more accessible, you can leverage Redis Insight.
```bash
docker run --rm -d --name redisinsight -p 5540:5540 redis/redisinsight:latest
```
Once running, access the Redis Insight interface at `http://localhost:5540/`
### Connection Configuration
* Port: 6379
* Host (Linux): 172.17.0.1
* Host (Windows/Mac): host.docker.internal (example `host.docker.internal:6379`)
Redis Insight makes it easy to visualize and manage the data powering your agentic workflows, ensuring efficient debugging, monitoring, and optimization.
![](../img/home_installation_redis_dashboard.png)
## Using custom endpoints
### Azure hosted OpenAI endpoint
In order to use Azure OpenAI for the model you'll need the following `.env` file:
```env
AZURE_OPENAI_API_KEY=your_custom_key
AZURE_OPENAI_ENDPOINT=your_custom_endpoint
AZURE_OPENAI_DEPLOYMENT=your_custom_model
AZURE_OPENAI_API_VERSION="azure_openai_api_version"
```
**NB!** the `AZURE_OPENAI_DEPLOYMENT` refers to the _model_, e.g., `gpt-4o`. `AZURE_OPENAI_API_VERSION` has been tested to work against `2024-08-01-preview`.
Then instantiate the agent(s) as well as the orchestrator as follows:
```python
from dapr_agents import DurableAgent, OpenAIChatClient
from dotenv import load_dotenv
import asyncio
import logging
import os
async def main():
llm = OpenAIChatClient(
api_key=os.getenv("AZURE_OPENAI_API_KEY"),
azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
azure_deployment=os.getenv("AZURE_OPENAI_DEPLOYMENT"),
api_version=os.getenv("AZURE_OPENAI_API_VERSION")
)
try:
elf_service = DurableAgent(
name="Legolas", role="Elf",
goal="Act as a scout, marksman, and protector, using keen senses and deadly accuracy to ensure the success of the journey.",
instructions=[
"Speak like Legolas, with grace, wisdom, and keen observation.",
"Be swift, silent, and precise, moving effortlessly across any terrain.",
"Use superior vision and heightened senses to scout ahead and detect threats.",
"Excel in ranged combat, delivering pinpoint arrow strikes from great distances.",
"Respond concisely, accurately, and relevantly, ensuring clarity and strict alignment with the task."],
llm=llm, # Note the explicit reference to the above OpenAIChatClient
message_bus_name="messagepubsub",
state_store_name="workflowstatestore",
state_key="workflow_state",
agents_registry_store_name="agentstatestore",
agents_registry_key="agents_registry",
)
...
```
The above is taken from [multi-agent quick starter](https://github.com/dapr/dapr-agents/blob/main/quickstarts/05-multi-agent-workflow-dapr-workflows/services/elf/app.py#L1-L23).

View File

@ -1,83 +0,0 @@
# Core Principles
![](../img/concepts-agents-overview.png)
## 1. Agent-Centric Design
Dapr Agents is designed to place agents, powered by LLMs, at the core of task execution and workflow orchestration. This principle emphasizes:
* LLM-Powered Agents: Dapr Agents enables the creation of agents that leverage LLMs for reasoning, dynamic decision-making, and natural language interactions.
* Adaptive Task Handling: Agents in Dapr Agents are equipped with flexible patterns like tool calling and reasoning loops (e.g., ReAct), allowing them to autonomously tackle complex and evolving tasks.
* Seamless Integration: Dapr Agents framework allows agents to act as modular, reusable building blocks that integrate seamlessly into workflows, whether they operate independently or collaboratively.
While Dapr Agents centers around agents, it also recognizes the versatility of using LLMs directly in deterministic workflows or simpler task sequences. In scenarios where the agent's built-in task-handling patterns, like `tool calling` or `ReAct` loops, are unnecessary, LLMs can act as core components for reasoning and decision-making. This flexibility ensures users can adapt Dapr Agents to suit diverse needs without being confined to a single approach.
!!! info
Agents are not standalone; they are building blocks in larger, orchestrated workflows.
## 2. Decoupled Infrastructure Design
Dapr Agents ensures a clean separation between agents and the underlying infrastructure, emphasizing simplicity, scalability, and adaptability:
* Agent Simplicity: Agents focus purely on reasoning and task execution, while Pub/Sub messaging, routing, and validation are managed externally by modular infrastructure components.
* Scalable and Adaptable Systems: By offloading non-agent-specific responsibilities, Dapr Agents allows agents to scale independently and adapt seamlessly to new use cases or integrations.
!!! info
Decoupling infrastructure keeps agents focused on tasks while enabling seamless scalability and integration across systems.
![](../img/home_concepts_principles_decoupled.png)
## 3. Modular Component Model
Dapr Agents utilizes [Dapr's pluggable component framework](https://docs.dapr.io/concepts/components-concept/) and building blocks to simplify development and enhance flexibility:
* Building Blocks for Core Functionality: Dapr provides API building blocks, such as Pub/Sub messaging, state management, service invocation, and more, to address common microservice challenges and promote best practices.
* Interchangeable Components: Each building block operates on swappable components (e.g., Redis, Kafka, Azure CosmosDB), allowing you to replace implementations without changing application code.
* Seamless Transitions: Develop locally with default configurations and deploy effortlessly to cloud environments by simply updating component definitions.
* Scalable Foundations: Build resilient and adaptable architectures using Daprs modular, production-ready building blocks.
!!! info
Developers can easily switch between different components (e.g., Redis to DynamoDB) based on their deployment environment, ensuring portability and adaptability.
![](../img/home_concepts_principles_modular.png)
## 4. Actor-Based Model for Agents
Dapr Agents leverages [Daprs Virtual Actor model](https://docs.dapr.io/developing-applications/building-blocks/actors/actors-overview/) to enable agents to function efficiently and flexibly within distributed environments. Each agent in Dapr Agents is instantiated as an instance of a class, wrapped and managed by a virtual actor. This design offers:
* Stateful Agents: Virtual actors allow agents to store and recall information across tasks, maintaining context and continuity for workflows.
* Dynamic Lifecycle Management: Virtual actors are automatically instantiated when invoked and deactivated when idle. This eliminates the need for explicit creation or cleanup, ensuring resource efficiency and simplicity.
* Location Transparency: Agents can be accessed and operate seamlessly, regardless of where they are located in the system. The underlying runtime handles their mobility, enabling fault-tolerance and dynamic load balancing.
* Scalable Execution: Agents process one task at a time, avoiding concurrency issues, and scale dynamically across nodes to meet workload demands.
This model ensures agents remain focused on their core logic, while the infrastructure abstracts complexities like state management, fault recovery, and resource optimization.
!!! info
Dapr Agents use of virtual actors makes agents always addressable and highly scalable, enabling them to operate reliably and efficiently in distributed, high-demand environments.
## 5. Message-Driven Communication
Dapr Agents emphasizes the use of Pub/Sub messaging for event-driven communication between agents. This principle ensures:
* Decoupled Architecture: Asynchronous communication for scalability and modularity.
* Real-Time Adaptability: Agents react dynamically to events for faster, more flexible task execution.
* Seamless Collaboration: Agents share updates, distribute tasks, and respond to events in a highly coordinated way.
!!! info
Pub/Sub messaging serves as the backbone for Dapr Agents event-driven workflows, enabling agents to communicate and collaborate in real time.
![](../img/home_concepts_principles_message.png)
## 6. Workflow-Oriented Design
Dapr Agents embraces workflows as a foundational concept, integrating [Dapr Workflows](https://docs.dapr.io/developing-applications/building-blocks/workflow/workflow-overview/) to support both deterministic and event-driven task orchestration. This dual approach enables robust and adaptive systems:
* Deterministic Workflows: Dapr Agents uses Dapr Workflows for stateful, predictable task sequences. These workflows ensure reliable execution, fault tolerance, and state persistence, making them ideal for structured, multi-step processes that require clear, repeatable logic.
* Event-Driven Workflows: By combining Dapr Workflows with Pub/Sub messaging, Dapr Agents supports workflows that adapt to real-time events. This facilitates decentralized, asynchronous collaboration between agents, allowing workflows to dynamically adjust to changing scenarios.
By integrating these paradigms, Dapr Agents enables workflows that combine the reliability of deterministic execution with the adaptability of event-driven processes, ensuring flexibility and resilience in a wide range of applications.
!!! info
Dapr Agents workflows blend structured, predictable logic with the dynamic responsiveness of event-driven systems, empowering both centralized and decentralized workflows.
![](../img/home_concepts_principles_workflows.png)

View File

@ -1,462 +0,0 @@
# Agentic Workflows
!!! info
This quickstart requires `Dapr CLI` and `Docker`. You must have your [local Dapr environment set up](../installation.md).
Traditional workflows follow fixed, step-by-step processes, while autonomous agents make real-time decisions based on reasoning and available data. Agentic workflows combine the best of both approaches, integrating structured execution with reasoning loops to enable more adaptive decision-making.
This allows systems to analyze information, adjust to new conditions, and refine actions dynamically rather than strictly following a predefined sequence. By incorporating planning, feedback loops, and model-driven adjustments, agentic workflows provide both scalability and predictability while still allowing for autonomous adaptation.
In `Dapr Agents`, agentic workflows leverage LLM-based tasks, reasoning loop patterns, and an event-driven system powered by pub/sub messaging and a shared message bus. Agents operate autonomously, responding to events in real time, making decisions, and collaborating dynamically. This makes the system highly adaptable—agents can communicate, share tasks, and adjust based on new information, ensuring fluid coordination across distributed environments. This approach is particularly useful for decentralized systems that require flexible, intelligent collaboration across multiple agents and applications.
!!! tip
We will demonstrate this concept using the [Multi-Agent Workflow Guide](https://github.com/dapr/dapr-agents/tree/main/cookbook/workflows/multi_agents/basic_lotr_agents_as_workflows) from our Cookbook, which outlines a step-by-step guide to implementing a basic agentic workflow.
## Agents as Services: Dapr Workflows
In `Dapr Agents`, agents can be implemented using [Dapr Workflows](https://docs.dapr.io/developing-applications/building-blocks/workflow/workflow-overview/), both of which are exposed as microservices via [FastAPI servers](https://docs.dapr.io/developing-applications/sdks/python/python-sdk-extensions/python-fastapi/).
### Agents as Dapr Workflows (Orchestration, Complex Execution)
[Dapr Workflows](https://docs.dapr.io/developing-applications/building-blocks/workflow/workflow-overview/) define the structured execution of agent behaviors, reasoning loops, and tool selection. Workflows allow agents to:
✅ Define complex execution sequences instead of just reacting to events.
✅ Integrate with message buses to listen and act on real-time inputs.
✅ Orchestrate multi-step reasoning, retrieval-augmented generation (RAG), and tool use.
✅ Best suited for goal-driven, structured, and iterative decision-making workflows.
🚀 Dapr agents uses Dapr Workflows for orchestration and complex multi-agent collaboration.
**Example: An Agent as a Dapr Workflow**
```python
from dapr_agents import DurableAgent
from dotenv import load_dotenv
import asyncio
import logging
async def main():
try:
# Define Agent
wizard_service = DurableAgent(
name="Gandalf",
role="Wizard",
goal="Guide the Fellowship with wisdom and strategy, using magic and insight to ensure the downfall of Sauron.",
instructions=[
"Speak like Gandalf, with wisdom, patience, and a touch of mystery.",
"Provide strategic counsel, always considering the long-term consequences of actions.",
"Use magic sparingly, applying it when necessary to guide or protect.",
"Encourage allies to find strength within themselves rather than relying solely on your power.",
"Respond concisely, accurately, and relevantly, ensuring clarity and strict alignment with the task."
],
message_bus_name="messagepubsub",
state_store_name="agenticworkflowstate",
state_key="workflow_state",
agents_registry_store_name="agentsregistrystore",
agents_registry_key="agents_registry",
)
await wizard_service.start()
except Exception as e:
print(f"Error starting service: {e}")
if __name__ == "__main__":
load_dotenv()
logging.basicConfig(level=logging.INFO)
asyncio.run(main())
```
Here, `Gandalf` is an `DurableAgent` implemented as a workflow, meaning it executes structured reasoning, plans actions, and integrates tools within a managed workflow execution loop.
### How We Use Dapr Workflows for Orchestration
In dapr agents, the orchestrator itself is a Dapr Workflow, which:
✅ Coordinates execution of agentic workflows (LLM-driven or rule-based).
✅ Delegates tasks to agents implemented as either other workflows.
✅ Manages reasoning loops, plan adaptation, and error handling dynamically.
🚀 The LLM default orchestrator is a Dapr Workflow that interacts with agent workflows.
**Example: The Orchestrator as a Dapr Workflow**
```python
from dapr_agents import LLMOrchestrator
from dotenv import load_dotenv
import asyncio
import logging
async def main():
try:
agentic_orchestrator = LLMOrchestrator(
name="Orchestrator",
message_bus_name="messagepubsub",
state_store_name="agenticworkflowstate",
state_key="workflow_state",
agents_registry_store_name="agentsregistrystore",
agents_registry_key="agents_registry",
max_iterations=25
).as_service(port=8009)
await agentic_orchestrator.start()
except Exception as e:
print(f"Error starting service: {e}")
if __name__ == "__main__":
load_dotenv()
logging.basicConfig(level=logging.INFO)
asyncio.run(main())
```
This orchestrator acts as a central controller, ensuring that agentic workflows communicate effectively, execute tasks in order, and handle iterative reasoning loops.
## Structuring A Multi-Agent Project
The way to structure such a project is straightforward. We organize our services into a directory that contains individual folders for each agent, along with a `components` directory for Dapr resources configurations. Each agent service includes its own app.py file, where the FastAPI server and the agent logic are defined.
```
dapr.yaml # Dapr main config file
components/ # Dapr resource files
├── statestore.yaml # State store configuration
├── pubsub.yaml # Pub/Sub configuration
└── ... # Other Dapr components
services/ # Directory for agent services
├── agent1/ # First agent's service
│ ├── app.py # FastAPI app for agent1
│ └── ... # Additional agent1 files
│── agent2/ # Second agent's service
│ ├── app.py # FastAPI app for agent2
│ └── ... # Additional agent2 files
└── ... # More agents
```
## Set Up an Environment Variables File
This example uses our default `LLM Orchestrator`. Therefore, you have to create an `.env` file to securely store your Inference Service (i.e. OpenAI) API keys and other sensitive information. For example:
```
OPENAI_API_KEY="your-api-key"
OPENAI_BASE_URL="https://api.openai.com/v1"
```
## Define Your First Agent Service
Let's start by definining a `Hobbit` service with a specific `name`, `role`, `goal` and `instructions`.
```
services/ # Directory for agent services
├── hobbit/ # Hobbit Service
│ ├── app.py # Dapr Enabled FastAPI app for Hobbit
```
Create the `app.py` script and provide the following information.
```python
from dapr_agents import DurableAgent
from dotenv import load_dotenv
import asyncio
import logging
async def main():
try:
# Define Agent and expose it as a service
hobbit_agent = DurableAgent(
role="Hobbit",
name="Frodo",
goal="Carry the One Ring to Mount Doom, resisting its corruptive power while navigating danger and uncertainty.",
instructions=[
"Speak like Frodo, with humility, determination, and a growing sense of resolve.",
"Endure hardships and temptations, staying true to the mission even when faced with doubt.",
"Seek guidance and trust allies, but bear the ultimate burden alone when necessary.",
"Move carefully through enemy-infested lands, avoiding unnecessary risks.",
"Respond concisely, accurately, and relevantly, ensuring clarity and strict alignment with the task."
],
message_bus_name="messagepubsub",
agents_registry_store_name="agentsregistrystore",
agents_registry_key="agents_registry",
).as_service(8001_)
await hobbit_service.start()
except Exception as e:
print(f"Error starting service: {e}")
if __name__ == "__main__":
load_dotenv()
logging.basicConfig(level=logging.INFO)
asyncio.run(main())
```
Now, you can define multiple services following this format, but it's essential to pay attention to key areas to ensure everything runs smoothly. Specifically, focus on correctly configuring the components (e.g., `statestore` and `pubsub` names) and incrementing the ports for each service.
Key Considerations:
* Ensure the `message_bus_name` matches the `pub/sub` component name in your `pubsub.yaml` file.
* Verify the `agents_registry_store_name` matches the state store component defined in your `agentstate.yaml` file.
* Increment the `service_port` for each new agent service (e.g., 8001, 8002, 8003).
* Customize the Agent parameters (`role`, `name`, `goal`, and `instructions`) to match the behavior you want for each service.
## The Multi-App Run template file
The `Multi-App Run Template` File is a YAML configuration file named `dapr.yaml` that allows you to run multiple applications simultaneously. This file is placed at the same level as the `components/` and `services/` directories, ensuring a consistent and organized project structure.
```
dapr.yaml # The Multi-App Run template
components/ # Dapr configuration files
├── statestore.yaml # State store configuration
├── pubsub.yaml # Pub/Sub configuration
└── ... # Other Dapr components
services/ # Directory for agent services
├── agent1/ # First agent's service
│ ├── app.py # FastAPI app for agent1
│ └── ... # Additional agent1 files
│── agent2/ # Second agent's service
│ ├── app.py # FastAPI app for agent2
│ └── ... # Additional agent2 files
└── ... # More agents
```
Following our current scenario, we can set the following `Multi-App Run` template file:
```yaml
# https://docs.dapr.io/developing-applications/local-development/multi-app-dapr-run/multi-app-template/#template-properties
version: 1
common:
resourcesPath: ./components
logLevel: info
appLogDestination: console
daprdLogDestination: console
configFilePath: config.yaml
apps:
- appID: HobbitApp
appDirPath: ./services/hobbit/
appPort: 8001
command: ["python3", "app.py"]
- appID: WizardApp
appDirPath: ./services/wizard/
appPort: 8002
command: ["python3", "app.py"]
...
- appID: RangerApp
appDirPath: ./services/ranger/
appPort: 8007
command: ["python3", "app.py"]
- appID: WorkflowApp
appDirPath: ./services/workflow-llm/
command: ["python3", "app.py"]
appPort: 8009
```
## Starting All Service Servers
!!! tip
Make sure you have your environment variables set up in an `.env` file so that the library can pick it up and use it to communicate with `OpenAI` services. We set them up in the [LLM Inference Client](llm.md) section
To start all the service servers defined in your project, you can use the `Dapr CLI` with the `Multi-App Run template` file. When you provide a directory path, the CLI will look for the `dapr.yaml` file (the default name for the template) in that directory. If the file is not found, the CLI will return an error.
To execute the command, ensure you are in the root directory where the dapr.yaml file is located, then run:
```bash
dapr run -f .
```
This command reads the `dapr.yaml` file and starts all the services specified in the template.
## Monitor Services Initialization
- Verify agent console logs: Each service outputs logs to confirm successful initialization.
![](../../img/workflows_llm_agent_initialization_hobbit.png)
- Verify orchestrator console logs: The workflow is initialized showing workflow and task registrations.
![](../../img/workflows_llm_orchestrator_initialization.png)
- Verify Redis entries: Access the Redis Insight interface at `http://localhost:5540/`
![](../../img/workflows_llm_redis_agents_metadata.png)
## Start Workflow via an HTTP Request
Once all services are running, you can initiate the workflow by making an HTTP POST request to the Agentic Workflow Service. This service orchestrates the workflow, triggering agent actions and handling communication among agents. The `.as_service(port=8004)` is required on the orchestrator to enable the HTTP endpoint and built-in `start-workflow` route.
Heres an example of how to start the workflow using `curl`:
```bash
curl -i -X POST http://localhost:8009/start-workflow \
-H "Content-Type: application/json" \
-d '{"task": "Lets solve the riddle to open the Doors of Durin and enter Moria."}'
```
```
HTTP/1.1 200 OK
date: Sat, 22 Feb 2025 06:12:35 GMT
server: uvicorn
content-length: 104
content-type: application/json
{"message":"Workflow initiated successfully.","workflow_instance_id":"8cd46d085d6a44fbb46e1c7c92abdd0f"}
```
In this example:
* The request is sent to the Agentic Workflow Service running on port `8009`.
* The message parameter is passed as input to the `LLM Workflow`, which is then used to generate the plan and trigger the agentic workflow.
* This command demonstrates how to interact with the Agentic Workflow Service to kick off a new workflow.
## Starting the Workflow by Publishing a `TriggerAction` Message (Optional)
Agentic workflows can also be triggered by publishing a message to the orchestrator's pub/sub topic. This is an optional method instead of making HTTP requests and enables fully message-driven coordination.
### Step 1: Create a Trigger Script
Create a Python file (e.g., trigger.py) in your `services/client/` directory with the following content:
```Python
#!/usr/bin/env python3
import json
import sys
import time
import argparse
from dapr.clients import DaprClient
PUBSUB_NAME = "messagepubsub"
def main(topic, max_attempts=10, retry_delay=1):
message = {
"task": "How to get to Mordor? We all need to help!"
}
time.sleep(5) # Give orchestrators time to come online
for attempt in range(1, max_attempts + 1):
try:
print(f"📢 Attempt {attempt}: Publishing to topic '{topic}'...")
with DaprClient() as client:
client.publish_event(
pubsub_name=PUBSUB_NAME,
topic_name=topic,
data=json.dumps(message),
data_content_type="application/json",
publish_metadata={"cloudevent.type": "TriggerAction"}
)
print(f"✅ Message published to '{topic}'")
sys.exit(0)
except Exception as e:
print(f"❌ Publish failed: {e}")
if attempt < max_attempts:
print(f"⏳ Retrying in {retry_delay}s...")
time.sleep(retry_delay)
print("❌ Failed to publish message after multiple attempts.")
sys.exit(1)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Trigger a workflow by publishing to a Dapr topic.")
parser.add_argument("--orchestrator", type=str, default="LLMOrchestrator", help="Target orchestrator topic")
args = parser.parse_args()
main(args.orchestrator)
```
### Step 2: Run the Trigger Script
Once all your services are running, run the script with the target orchestrator topic:
```bash
python3 trigger.py --orchestrator LLMOrchestrator
```
Or if youre running the `RandomOrchestrator` workflow:
```bash
python3 trigger.py --orchestrator RandomOrchestrator
```
This will publish a `TriggerAction` message to the orchestrators topic, kicking off the workflow.
In this example:
* The message is published to the orchestrator topic (e.g., `LLMOrchestrator`) via Dapr's Pub/Sub.
* The orchestrator service listens on its topic and receives the message of type `TriggerAction`.
* The message payload (e.g., a task) is used to generate the plan and initiate the agentic workflow.
* This approach is fully decoupled from HTTP—no more direct POST requests to a service endpoint.
* It enables truly asynchronous and event-driven orchestration, making your system more scalable and resilient.
## Monitoring Workflow Execution
- Check console logs to trace activities in the workflow.
![](../../img/workflows_llm_console_logs_activities.png)
![](../../img/workflows_llm_console_logs_activities_chat_completions.png)
- Verify Redis entries: Access the Redis Insight interface at `http://localhost:5540/`
![](../../img/workflows_llm_redis_broadcast_channel.png)
- You can also check the `Workflow State` in the Redis Insight interface at `http://localhost:5540`. You can click on it, copy the log entry and paste it in your favorite editor. It is a `JSON` object. You will be able to see the chat history, the plan and tasks being completed.
![](../../img/workflows_llm_redis_workflow_state.png)
![](../../img/workflows_llm_redis_workflow_state_edit_mode.png)
![](../../img/workflows_llm_workflow_state_json_object.png)
![](../../img/workflows_llm_workflow_state_json_object_plan.png)
- As mentioned earlier, when we ran dapr init, Dapr initialized, a `Zipkin` container instance, used for observability and tracing. Open `http://localhost:9411/zipkin/` in your browser to view traces > Find a Trace > Run Query.
![](../../img/workflows_llm_zipkin_portal.png)
- Select the trace entry with multiple spans labeled `<workflow name>: /taskhubsidecarservice/startinstance.`. When you open this entry, youll see details about how each task or activity in the workflow was executed. If any task failed, the error will also be visible here.
![](../../img/workflows_llm_zipkin_spans_start.png)
- Check console logs to validate if workflow was executed successfuly.
![](../../img/workflows_llm_console_logs_complete.png)
## Switching Orchestrator
You can easily switch to a different `Orchestrator` type by updating the `dapr.yaml` file.
### Available Workflow Options
* **RoundRobin**: Cycles through agents in a fixed order, ensuring each agent gets an equal opportunity to process tasks.
* **Random**: Selects an agent randomly for each task.
* **LLM-based**: Uses a large language model (e.g., GPT-4o) to determine the most suitable agent based on the message and context.
### Switching to the Random Workflow
- Update dapr.yaml: Modify the appDirPath for the workflow service to point to the `workflow-random` directory:
```yaml
- appID: WorkflowApp
appDirPath: ./services/workflow-random/
command: ["python3", "app.py"]
```
### Reset Redis Database
1. Access the Redis Insight interface at `http://localhost:5540/`
2. In the search bar type `*` to select all items in the database.
3. Click on `Bulk Actions` > `Delete` > `Delete`
![](../../img/workflows_llm_redis_reset.png)
You should see an empty database now:
![](../../img/workflows_llm_redis_empty.png)
### Testing New Workflow
Restart the services with `dapr run -f` . and send a message to the workflow. Always ensure your `.env` file is configured correctly and contains the necessary credentials if needed.

View File

@ -1,21 +0,0 @@
# Dapr Agents Quickstarts
[Quickstarts](https://github.com/dapr/dapr-agents/tree/main/quickstarts) demonstrate how to use Dapr Agents to build applications with LLM-powered autonomous agents and event-driven workflows. Each quickstart builds upon the previous one, introducing new concepts incrementally.
!!! info
Not all quickstarts require Docker, but it is recommended to have your [local Dapr environment set up](../installation.md) with Docker for the best development experience and to follow the steps in this guide seamlessly.
## Quickstarts
| Scenario | What You'll Learn |
|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| --- |
| [Hello World](https://github.com/dapr/dapr-agents/tree/main/quickstarts/01-hello-world)<br>A rapid introduction that demonstrates core Dapr Agents concepts through simple, practical examples. | - **Basic LLM Usage**: Simple text generation with OpenAI models <br> - **Creating Agents**: Building agents with custom tools in under 20 lines of code <br> - **ReAct Pattern**: Implementing reasoning and action cycles in agents <br> - **Simple Workflows**: Setting up multi-step LLM processes |
| [LLM Call with Dapr Chat Client](https://github.com/dapr/dapr-agents/tree/main/quickstarts/02_llm_call_dapr)<br>Explore interaction with Language Models through Dapr Agents' `DaprChatClient`, featuring basic text generation with plain text prompts and templates. | - **Text Completion**: Generating responses to prompts <br> - **Swapping LLM providers**: Switching LLM backends without application code change <br> - **Resilience**: Setting timeout, retry and circuit-breaking <br> - **PII Obfuscation**: Automatically detect and mask sensitive user information |
| [LLM Call with OpenAI Client](https://github.com/dapr/dapr-agents/tree/main/quickstarts/02_llm_call_open_ai)<br>Discover how to leverage native LLM client libraries with Dapr Agents using the OpenAI Client for chat completion, audio processing, and embeddings. | - **Text Completion**: Generating responses to prompts <br> - **Structured Outputs**: Converting LLM responses to Pydantic objects <br><br> *Note: Other quickstarts for specific clients are available for [Elevenlabs](https://github.com/dapr/dapr-agents/tree/main/quickstarts/02_llm_call_elevenlabs), [Hugging Face](https://github.com/dapr/dapr-agents/tree/main/quickstarts/02_llm_call_hugging_face), and [Nvidia](https://github.com/dapr/dapr-agents/tree/main/quickstarts/02_llm_call_nvidia).* |
| [Agent Tool Call](https://github.com/dapr/dapr-agents/tree/main/quickstarts/03-agent-tool-call)<br>Build your first AI agent with custom tools by creating a practical weather assistant that fetches information and performs actions. | - **Tool Definition**: Creating reusable tools with the `@tool` decorator <br> - **Agent Configuration**: Setting up agents with roles, goals, and tools <br> - **Function Calling**: Enabling LLMs to execute Python functions |
| [Agentic Workflow](https://github.com/dapr/dapr-agents/tree/main/quickstarts/04-agentic-workflow)<br>Dive into stateful workflows with Dapr Agents by orchestrating sequential and parallel tasks through powerful workflow capabilities. | - **LLM-powered Tasks**: Using language models in workflows <br> - **Task Chaining**: Creating resilient multi-step processes executing in sequence <br> - **Fan-out/Fan-in**: Executing activities in parallel; then synchronizing these activities until all preceding activities have completed |
| [Multi-Agent Workflows](https://github.com/dapr/dapr-agents/tree/main/quickstarts/05-multi-agent-workflow-dapr-workflows)<br>Explore advanced event-driven workflows featuring a Lord of the Rings themed multi-agent system where autonomous agents collaborate to solve problems. | - **Multi-agent Systems**: Creating a network of specialized agents <br> - **Event-driven Architecture**: Implementing pub/sub messaging between agents <br> - **Actor Model**: Using Dapr Actors for stateful agent management <br> - **Workflow Orchestration**: Coordinating agents through different selection strategies <br><br> *Note: To see Actor-based workflow see [Multi-Agent Actors](https://github.com/dapr/dapr-agents/tree/main/quickstarts/05-multi-agent-workflow-actors).* |
| [Multi-Agent Workflow on Kubernetes](https://github.com/dapr/dapr-agents/tree/main/quickstarts/07-k8s-multi-agent-workflow)<br>Run multi-agent workflows in Kubernetes, demonstrating deployment and orchestration of event-driven agent systems in a containerized environment. | - **Kubernetes Deployment**: Running agents on Kubernetes <br> - **Container Orchestration**: Managing agent lifecycles with K8s <br> - **Service Communication**: Inter-agent communication in K8s |
| [Document Agent with Chainlit](https://github.com/dapr/dapr-agents/tree/main/quickstarts/06-document-agent-chainlit)<br>Create a conversational agent with an operational UI that can upload, and learn unstructured documents while retaining long-term memory. | - **Conversational Document Agent**: Upload and converse over unstructured documents <br> - **Cloud Agnostic Storage**: Upload files to multiple storage providers <br> - **Conversation Memory Storage**: Persists conversation history using external storage. |
| [Data Agent with MCP and Chainlit](https://github.com/dapr/dapr-agents/tree/main/quickstarts/08-data-agent-mcp-chainlit)<br>Build a conversational agent over a Postgres database using Model Composition Protocol (MCP) with a ChatGPT-like interface. | - **Database Querying**: Natural language queries to relational databases <br> - **MCP Integration**: Connecting to databases without DB-specific code <br> - **Data Analysis**: Complex data analysis through conversation |

View File

@ -1,19 +0,0 @@
# Why Dapr Agents
Dapr Agents is an open-source framework for building and orchestrating LLM-based autonomous agents, designed to simplify the complexity of creating scalable agentic workflows and microservices. Inspired by the growing need for frameworks that integrate seamlessly with distributed systems, Dapr Agents enables developers to focus on designing intelligent agents without getting bogged down by infrastructure concerns.
## The Problem
Many agentic frameworks today attempt to redefine how microservices are built and orchestrated by developing their own platforms for workflows, Pub/Sub messaging, state management, and service communication. While these efforts showcase innovation, they often lead to a steep learning curve, fragmented systems, and unnecessary complexity when scaling or adapting to new environments.
Many of these frameworks require developers to adopt entirely new paradigms or recreate foundational infrastructure, rather than building on existing solutions that are proven to handle these challenges at scale. This added complexity often diverts focus from the primary goal: designing and implementing intelligent, effective agents.
## Dapr Agents' Approach
Dapr Agents takes a distinct approach by building on [Dapr](https://dapr.io/), a portable and event-driven runtime optimized for distributed systems. Dapr offers built-in APIs and patterns such as state management, Pub/Sub messaging, service invocation, and virtual actors—that eliminate the need to recreate foundational components from scratch. By integrating seamlessly with Dapr, Dapr Agents empowers developers to focus on the intelligence and behavior of LLM-powered agents while leveraging a proven framework for scalability and reliability.
Rather than reinventing microservices, Dapr Agents enables developers to design, test, and deploy agents that seamlessly integrate as collaborative services within larger systems. Whether experimenting with a single agent or orchestrating workflows involving multiple agents, Dapr Agents simplifies the exploration and implementation of scalable agentic workflows.
## Conclusion
Dapr Agents provides a unified framework for designing, deploying, and orchestrating LLM-powered agents. By leveraging Daprs runtime and modular components, Dapr Agents allows developers to focus on building intelligent systems without worrying about the complexities of distributed infrastructure. Whether you're creating standalone agents or orchestrating multi-agent workflows, Dapr Agents empowers you to explore the future of intelligent, scalable, and collaborative systems.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 105 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 59 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 53 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 143 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 152 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 115 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 364 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 154 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 243 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 513 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 134 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 127 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 233 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 240 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 593 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 582 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 325 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 540 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 406 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 580 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 251 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 493 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 510 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 547 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 387 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 309 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 402 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 458 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 541 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 150 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 122 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 459 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 377 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 390 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 223 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 120 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 324 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 482 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 329 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 555 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 225 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 406 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 356 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 409 KiB

View File

@ -1,87 +0,0 @@
# Dapr Agents: A Framework for Agentic AI Systems
Dapr Agents is a developer framework designed to build production-grade resilient AI agent systems that operate at scale. Built on top of the battle-tested Dapr project, it enables software developers to create AI agents that reason, act, and collaborate using Large Language Models (LLMs), while leveraging built-in observability and stateful workflow execution to guarantee agentic workflows complete successfully, no matter how complex.
![](./img/logo-workflows.png)
## Key Features
- **Scale and Efficiency**: Run thousands of agents efficiently on a single core. Dapr distributes single and multi-agent apps transparently across fleets of machines and handles their lifecycle.
- **Workflow Resilience**: Automatically retries agentic workflows and ensures task completion.
- **Kubernetes-Native**: Easily deploy and manage agents in Kubernetes environments.
- **Data-Driven Agents**: Directly integrate with databases, documents, and unstructured data by connecting to dozens of different data sources.
- **Multi-Agent Systems**: Secure and observable by default, enabling collaboration between agents.
- **Vendor-Neutral & Open Source**: Avoid vendor lock-in and gain flexibility across cloud and on-premises deployments.
- **Platform-Ready**: Built-in RBAC, access scopes and declarative resources enable platform teams to integrate Dapr agents into their systems.
## Why Choose Dapr Agents?
### Scalable Workflows as a First Class Citizen
Dapr Agents uses a [durable-execution workflow engine](https://docs.dapr.io/developing-applications/building-blocks/workflow/workflow-overview/) that guarantees each agent task executes to completion in the face of network interruptions, node crashes and other types of disruptive failures. Developers do not need to know about the underlying concepts of the workflow engine - simply write an agent that performs any number of tasks and these will get automatically distributed across the cluster. If any task fails, it will be retried and recover its state from where it left off.
### Cost-Effective AI Adoption
Dapr Agents builds on top of Dapr's Workflow API, which under the hood represents each agent as an actor, a single unit of compute and state that is thread-safe and natively distributed, lending itself well to an agentic Scale-To-Zero architecture. This minimizes infrastructure costs, making AI adoption accessible to everyone. The underlying virtual actor model allows thousands of agents to run on demand on a single core machine with double-digit millisecond latency when scaling from zero. When unused, the agents are reclaimed by the system but retain their state until the next time they are needed. With this design, there's no trade-off between performance and resource efficiency.
### Data-Centric AI Agents
With built-in connectivity to over 50 enterprise data sources, Dapr Agents efficiently handles structured and unstructured data. From basic [PDF extraction](./concepts/arxiv_fetcher.md) to large-scale database interactions, it enables seamless data-driven AI workflows with minimal code changes. Dapr's [bindings](https://docs.dapr.io/reference/components-reference/supported-bindings/) and [state stores](https://docs.dapr.io/reference/components-reference/supported-state-stores/), along with [MCP](https://modelcontextprotocol.io/) support, provide access to a large number of data sources that can be used to ingest data to an agent.
### Accelerated Development
Dapr Agents provides a set of AI features that give developers a complete API surface to tackle common problems. Some of these include:
- Multi-agent communications
- Structured outputs
- Multiple LLM providers
- Contextual memory
- Flexible prompting
- Intelligent tool selection
- [MCP integration](https://docs.anthropic.com/en/docs/agents-and-tools/mcp)
### Integrated Security and Reliability
By building on top of Dapr, platform and infrastructure teams can apply Dapr's [resiliency policies](https://docs.dapr.io/operations/resiliency/policies/) to the database and/or message broker of their choice that are used by Dapr Agents. These policies include timeouts, retry/backoffs and circuit breakers. When it comes to security, Dapr provides the option to scope access to a given database or message broker to one or more agentic app deployments. In addition, Dapr Agents uses mTLS to encrypt the communication layer of its underlying components.
### Built-in Messaging and State Infrastructure
* 🎯 **Service-to-Service Invocation**: Facilitates direct communication between agents with built-in service discovery, error handling, and distributed tracing. Agents can leverage this for synchronous messaging in multi-agent workflows.
* ⚡️ **Publish and Subscribe**: Supports loosely coupled collaboration between agents through a shared message bus. This enables real-time, event-driven interactions critical for task distribution and coordination.
* 🔄 **Durable Workflow**: Defines long-running, persistent workflows that combine deterministic processes with LLM-based decision-making. Dapr Agents uses this to orchestrate complex multi-step agentic workflows seamlessly.
* 🧠 **State Management**: Provides a flexible key-value store for agents to retain context across interactions, ensuring continuity and adaptability during workflows.
* 🤖 **Actors**: Implements the Virtual Actor pattern, allowing agents to operate as self-contained, stateful units that handle messages sequentially. This eliminates concurrency concerns and enhances scalability in agentic systems.
### Vendor-Neutral and Open Source
As a part of **CNCF**, Dapr Agents is vendor-neutral, eliminating concerns about lock-in, intellectual property risks, or proprietary restrictions. Organizations gain full flexibility and control over their AI applications using open-source software they can audit and contribute to.
## Getting Started
<div class="grid cards" markdown>
- :material-clock-fast:{ .lg .middle } __Set up in 2 minutes__
---
Install [`Dapr Agents`](https://github.com/dapr/dapr-agents) with [`pip`](#) and set up your Dapr environment in minutes
[:octicons-arrow-right-24: Installation](home/installation.md)
- :material-rocket-launch:{ .lg .middle } __Start experimenting__
---
Build your first agent and design a custom workflow to get started with Dapr Agents.
[:octicons-arrow-right-24: Quickstarts](home/quickstarts/index.md)
- :material-lightbulb-on:{ .lg .middle } __Learn more__
---
Learn more about Dapr Agents and its main components!
[:octicons-arrow-right-24: Concepts](concepts/agents.md)
</div>

Binary file not shown.

Before

Width:  |  Height:  |  Size: 36 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 32 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 117 KiB

View File

@ -21,9 +21,8 @@ dependencies = [
"pydantic>=2.11.3,<3.0.0",
"jinja2>=3.1.0,<4.0.0",
"pyyaml>=6.0,<7.0.0",
"requests>=2.31.0,<3.0.0",
"openapi-pydantic>=0.3.0,<0.4.0",
"openapi-schema-pydantic>=1.2.4,<2.0.0",
"requests>=2.32.4,<3.0.0",
"openapi-pydantic>=0.5.0,<0.6.0",
"rich>=13.9.4,<14.0.0",
"openai>=1.75.0,<2.0.0",
"azure-identity>=1.21.0,<2.0.0",
@ -36,9 +35,7 @@ dependencies = [
"cloudevents>=1.11.0,<2.0.0",
"numpy>=2.2.2,<3.0.0",
"mcp>=1.7.1,<2.0.0",
"pip-tools>=7.4.1,<8.0.0",
"sentence-transformers>=4.1.0,<5.0.0",
"chromadb>=1.0.13,<2.0.0",
"python-dotenv>=1.1.1,<2.0.0",
"posthog<6.0.0",
]
classifiers = [
@ -64,6 +61,8 @@ test = [
dev = [
"mypy>=1.15.0,<2.0.0",
"tox>=4.0.0,<5.0.0",
"pip-tools>=7.4.1,<8.0.0",
"cachetools>=6.1.0,<7.0.0",
]
observability = [
"opentelemetry-api>=1.12.0,<1.35.0",
@ -77,7 +76,6 @@ vectorstore = [
"torch>=2.7.0",
"sentence-transformers>=4.1.0,<5.0.0",
"chromadb>=0.4.22,<2.0.0",
"posthog<6.0.0",
]
[project.urls]

View File

@ -1,5 +1,5 @@
import asyncio
from dapr_agents import tool, Agent
from dapr_agents import tool, Agent, OpenAIChatClient
from dotenv import load_dotenv
load_dotenv()
@ -17,6 +17,7 @@ async def main():
role="Weather Assistant",
instructions=["Help users with weather information"],
tools=[my_weather_func],
llm=OpenAIChatClient(model="gpt-3.5-turbo"),
)
from opentelemetry import trace
@ -49,9 +50,12 @@ async def main():
instrumentor.instrument(tracer_provider=tracer_provider, skip_dep_check=True)
# Run the agent
try:
await weather_agent.run(
"What is the weather in Virginia, New York and Washington DC?"
)
except Exception as e:
print(f"Error: {e}")
if __name__ == "__main__":

View File

@ -1,5 +1,5 @@
import asyncio
from dapr_agents import tool, Agent
from dapr_agents import tool, Agent, OpenAIChatClient
from dotenv import load_dotenv
load_dotenv()
@ -17,6 +17,7 @@ async def main():
role="Weather Assistant",
instructions=["Help users with weather information"],
tools=[my_weather_func],
llm=OpenAIChatClient(model="gpt-3.5-turbo"),
)
from opentelemetry import trace
@ -47,9 +48,12 @@ async def main():
instrumentor.instrument(tracer_provider=tracer_provider, skip_dep_check=True)
# Run the agent
try:
await weather_agent.run(
"What is the weather in Virginia, New York and Washington DC?"
)
except Exception as e:
print(f"Error: {e}")
if __name__ == "__main__":

View File

@ -1,7 +1,8 @@
dapr-agents>=0.8.1
python-dotenv
dapr-agents>=0.8.3
opentelemetry-sdk>=1.25.0
opentelemetry-exporter-zipkin-json
opentelemetry-exporter-zipkin-json==1.25.0
opentelemetry-exporter-otlp>=1.25.0
opentelemetry-proto>=1.25.0
protobuf>=4.22
# For local development use local changes by commenting out the dapr-agents line above and uncommenting the line below:
# -e ../../

View File

@ -7,12 +7,15 @@ from dapr_agents.types.message import LLMChatResponse
load_dotenv()
# Initialize the OpenAI chat client
llm = OpenAIChatClient()
llm = OpenAIChatClient(model="gpt-3.5-turbo")
# Generate a response from the LLM
try:
response: LLMChatResponse = llm.generate("Tell me a joke")
# Print the Message content if it exists
if response.get_message() is not None:
content = response.get_message().content
print("Got response:", content)
except Exception as e:
print(f"Error: {e}")

View File

@ -1,5 +1,6 @@
import asyncio
from dapr_agents import tool, Agent
from dapr_agents import OpenAIChatClient
from dotenv import load_dotenv
load_dotenv()
@ -17,10 +18,13 @@ async def main():
role="Weather Assistant",
instructions=["Help users with weather information"],
tools=[my_weather_func],
llm=OpenAIChatClient(model="gpt-3.5-turbo"),
)
try:
response = await weather_agent.run("What's the weather?")
print(response)
except Exception as e:
print(f"Error: {e}")
if __name__ == "__main__":

View File

@ -10,7 +10,7 @@ import asyncio
import logging
from typing import List
from pydantic import BaseModel, Field
from dapr_agents import tool, DurableAgent
from dapr_agents import tool, DurableAgent, OpenAIChatClient
from dapr_agents.memory import ConversationDaprStateMemory
from dotenv import load_dotenv
@ -58,6 +58,7 @@ async def main():
memory=ConversationDaprStateMemory(
store_name="conversationstore", session_id="my-unique-id"
),
llm=OpenAIChatClient(model="gpt-3.5-turbo"),
)
travel_planner.as_service(port=8001)

View File

@ -1,6 +1,5 @@
from dapr_agents.workflow import WorkflowApp, workflow, task
from dapr.ext.workflow import DaprWorkflowContext
from dotenv import load_dotenv
load_dotenv()
@ -31,6 +30,9 @@ def write_blog(outline: str) -> str:
if __name__ == "__main__":
wfapp = WorkflowApp()
try:
results = wfapp.run_and_monitor_workflow_sync(analyze_topic, input="AI Agents")
if len(results) > 0:
print(f"Result: {results}")
except Exception as e:
print(f"Error: {e}")

View File

@ -2,7 +2,7 @@ import logging
from dotenv import load_dotenv
from dapr_agents import Agent
from dapr_agents import Agent, OpenAIChatClient
from dapr_agents.document.embedder.sentence import SentenceTransformerEmbedder
from dapr_agents.storage.vectorstores import ChromaVectorStore
from dapr_agents.tool import tool
@ -110,14 +110,20 @@ async def main():
],
tools=[search_documents, add_document, add_machine_learning_doc],
vector_store=vector_store,
llm=OpenAIChatClient(model="gpt-3.5-turbo"),
)
try:
logging.info("Starting Vector Database Agent...")
await agent.run("Add a machine learning basics document")
logging.info("Add Machine Learning Document Response:")
except Exception as e:
print(f"Error: {e}")
try:
logging.info("Searching for machine learning documents...")
await agent.run("Search for documents about machine learning")
except Exception as e:
print(f"Error: {e}")
if __name__ == "__main__":

View File

@ -1,2 +1,3 @@
dapr-agents>=0.8.1
python-dotenv
dapr-agents>=0.8.3
# For local development use local changes by commenting out the line above and uncommenting the line below:
# -e ../../

View File

@ -1,2 +1,3 @@
dapr-agents>=0.8.1
python-dotenv
dapr-agents>=0.8.3
# For local development use local changes by commenting out the dapr-agents line above and uncommenting the line below:
# -e ../../

View File

@ -1,3 +1,4 @@
dapr-agents>=0.8.1
python-dotenv
dapr-agents>=0.8.3
elevenlabs
# For local development use local changes by commenting out the dapr-agentsline above and uncommenting the line below:
# -e ../../

View File

@ -1 +1,3 @@
dapr-agents>=0.8.1
dapr-agents>=0.8.3
# For local development use local changes by commenting out the dapr-agentsline above and uncommenting the line below:
# -e ../../

View File

@ -1,3 +1,4 @@
dapr-agents>=0.8.1
python-dotenv
tiktoken
dapr-agents>=0.8.3
tiktoken==0.9.0
# For local development use local changes by commenting out the dapr-agents line above and uncommenting the line below:
# -e ../../

View File

@ -1,3 +1,4 @@
dapr-agents>=0.8.1
python-dotenv
tiktoken
dapr-agents>=0.8.3
tiktoken==0.9.0
# For local development use local changes by commenting out the dapr-agents line above and uncommenting the line below:
# -e ../../

View File

@ -325,6 +325,8 @@ if __name__ == "__main__":
asyncio.run(main())
```
Alternatively, you can use the DurableAgent with this same setup using `weather_durable_agent_tracing.py`.
#### Run with Observability
1. Ensure Phoenix server is running (see setup steps above)
@ -335,6 +337,11 @@ if __name__ == "__main__":
python weather_agent_tracing.py
```
Alternatively, you can run the DurableAgent using:
```bash
dapr run --app-id weatheragent --resources-path ./components -- python weather_durable_agent_tracing.py
```
3. View traces in Phoenix UI at [http://localhost:6006](http://localhost:6006)
![agent span results](AgentSpanResults.png)

View File

@ -0,0 +1,16 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: agentstatestore
spec:
type: state.redis
version: v1
metadata:
- name: redisHost
value: localhost:6379
- name: redisPassword
value: ""
- name: enableTLS
value: "false"
- name: keyPrefix
value: none

View File

@ -0,0 +1,16 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: messagepubsub
spec:
type: pubsub.redis
version: v1
metadata:
- name: redisHost
value: localhost:6379
- name: redisPassword
value: ""
- name: consumerID
value: "weather-agent-group"
- name: enableTLS
value: "false"

View File

@ -0,0 +1,16 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: workflowstatestore
spec:
type: state.redis
version: v1
metadata:
- name: redisHost
value: localhost:6379
- name: redisPassword
value: ""
- name: actorStateStore
value: "true"
- name: keyPrefix
value: none

View File

@ -1,4 +1,5 @@
dapr-agents[observability]>=0.8.1
python-dotenv
arize-phoenix
arize-phoenix-otel
dapr-agents[observability]>=0.8.3
arize-phoenix>=11.22.0,<12.0.0
arize-phoenix-otel>=0.12.0,<0.13.0
# For local development use local changes by commenting out the dapr-agents line above and uncommenting the line below:
# -e ../../

View File

@ -16,7 +16,6 @@ AIAgent = Agent(
"At the end, provide a concise summary that combines the weather information for all requested locations and any other actions you performed.",
],
memory=ConversationDaprStateMemory(store_name="historystore", session_id="some-id"),
pattern="toolcalling",
tools=tools,
)

View File

@ -0,0 +1,47 @@
import asyncio
from weather_tools import tools
from dapr_agents import DurableAgent
from dotenv import load_dotenv
load_dotenv()
# Wrap your async call
async def main():
from phoenix.otel import register
from dapr_agents.observability import DaprAgentsInstrumentor
# Register Dapr Agents with Phoenix OpenTelemetry
tracer_provider = register(
project_name="dapr-weather-durable-agent",
protocol="http/protobuf",
)
# Initialize Dapr Agents OpenTelemetry instrumentor
try:
instrumentor = DaprAgentsInstrumentor()
instrumentor.instrument(tracer_provider=tracer_provider, skip_dep_check=True)
except Exception as e:
raise
AIAgent = DurableAgent(
name="Steviee",
role="Weather Assistant",
goal="Assist Humans with weather related tasks.",
instructions=[
"Always answer the user's main weather question directly and clearly.",
"If you perform any additional actions (like jumping), summarize those actions and their results.",
"At the end, provide a concise summary that combines the weather information for all requested locations and any other actions you performed.",
],
tools=tools,
message_bus_name="messagepubsub",
state_store_name="workflowstatestore",
agents_registry_store_name="agentstatestore",
history_store_name="historystore",
)
await AIAgent.run("What is the weather in Virginia, New York and Washington DC?")
if __name__ == "__main__":
asyncio.run(main())

View File

@ -1,3 +1,4 @@
dapr-agents>=0.8.1
python-dotenv
arize-phoenix-otel
dapr-agents>=0.8.3
arize-phoenix-otel>=0.12.0,<0.13.0
# For local development use local changes by commenting out the dapr-agents line above and uncommenting the line below:
# -e ../../

View File

@ -1,2 +1,3 @@
dapr-agents>=0.8.1
python-dotenv
dapr-agents>=0.8.3
# For local development use local changes by commenting out the dapr-agents line above and uncommenting the line below:
# -e ../../

View File

@ -1 +1 @@
requests
requests==2.32.4

View File

@ -1,2 +1,3 @@
dapr-agents>=0.8.1
python-dotenv
dapr-agents>=0.8.3
# For local development use local changes by commenting out the dapr-agents line above and uncommenting the line below:
# -e ../../

View File

@ -6,6 +6,7 @@ from unstructured.partition.pdf import partition_pdf
from dapr_agents import Agent
from dapr_agents.memory import ConversationDaprStateMemory
from dapr_agents.types import AssistantMessage
from dapr_agents import OpenAIChatClient
load_dotenv()
@ -23,6 +24,7 @@ agent = Agent(
memory=ConversationDaprStateMemory(
store_name="conversationstore", session_id="my-unique-id"
),
llm=OpenAIChatClient(model="gpt-3.5-turbo"),
)

View File

@ -1,5 +1,5 @@
dapr-agents>=0.8.1
python-dotenv
requests
chainlit
unstructured[all-docs]
dapr-agents>=0.8.3
chainlit==2.6.8
unstructured[all-docs]==0.18.11
# For local development use local changes by commenting out the dapr-agents line above and uncommenting the line below:
# -e ../../

Some files were not shown because too many files have changed in this diff Show More