Compare commits

...

11 Commits
v0.8.3 ... main

Author SHA1 Message Date
Roberto Rodriguez a475687445
Merge pull request #185 from sicoyle/tweak-da-workflow
fix(tracing): enable tracing on durable agent + add quickstart
2025-09-04 16:14:26 -04:00
Samantha Coyle 6e6ff447be
style: make linter happy
Signed-off-by: Samantha Coyle <sam@diagrid.io>
2025-09-04 09:01:31 -05:00
Samantha Coyle dde2ab0d2c
fix: add method i missed
Signed-off-by: Samantha Coyle <sam@diagrid.io>
2025-09-02 13:16:49 -05:00
Samantha Coyle 99defbabe1
Merge branch 'main' into tweak-da-workflow 2025-09-02 13:01:27 -05:00
Yaron Schneider fbb3bfd61f
change agents version in quickstarts (#190)
Signed-off-by: yaron2 <schneider.yaron@live.com>
2025-08-31 12:06:26 -04:00
Yaron Schneider 224c61c6c2
Merge branch 'main' into tweak-da-workflow 2025-08-30 16:15:52 -07:00
Samantha Coyle 423de2a7a1
fix: update for tests to pass
Signed-off-by: Samantha Coyle <sam@diagrid.io>
2025-08-27 10:43:28 -04:00
Samantha Coyle 00e0863bef
style: add file i missed for formatting
Signed-off-by: Samantha Coyle <sam@diagrid.io>
2025-08-27 08:04:44 -04:00
Samantha Coyle 4b081c6984
style: tox -e ruff
Signed-off-by: Samantha Coyle <sam@diagrid.io>
2025-08-27 08:01:08 -04:00
Samantha Coyle e31484dde3
style: update readme with durable agent tracing quickstart too
Signed-off-by: Samantha Coyle <sam@diagrid.io>
2025-08-27 07:54:04 -04:00
Samantha Coyle 54d40dbcdb
fix(tracing): enable tracing on durable agent + quickstart
Signed-off-by: Samantha Coyle <sam@diagrid.io>
2025-08-26 16:00:12 -04:00
29 changed files with 393 additions and 161 deletions

View File

@ -101,7 +101,7 @@ class Agent(AgentBase):
)
# Process conversation iterations and return the result
return await self.process_iterations(messages)
return await self.conversation(messages)
async def execute_tools(self, tool_calls: List[ToolCall]) -> List[ToolMessage]:
"""
@ -180,7 +180,7 @@ class Agent(AgentBase):
# Run all tool calls concurrently, but bounded by max_concurrent
return await asyncio.gather(*(run_and_record(tc) for tc in tool_calls))
async def process_iterations(self, messages: List[Dict[str, Any]]) -> Any:
async def conversation(self, messages: List[Dict[str, Any]]) -> Any:
"""
Drives the agent conversation iteratively until a final answer or max iterations is reached.
Handles tool calls, updates memory, and returns the final assistant message.

View File

@ -9,7 +9,7 @@ Span Hierarchy by Agent Type:
**Regular Agent (Direct Execution):**
- Agent.run (AGENT span) - Root span for agent execution
Agent.process_iterations (CHAIN span) - Processing and reasoning logic
Agent.conversation (CHAIN span) - Processing and reasoning logic
Agent.execute_tools (TOOL span) - Batch tool coordination
AgentToolExecutor.run_tool (TOOL span) - Individual tool execution
ChatClient.generate (LLM span) - Language model interactions
@ -121,6 +121,20 @@ class DaprAgentsInstrumentor(BaseInstrumentor):
"""
return ("dapr-agents",)
def instrument(self, **kwargs: Any) -> None:
"""
Public method to instrument Dapr Agents with OpenTelemetry tracing.
This method is called by users to enable instrumentation. It delegates
to the private _instrument method which contains the actual implementation.
Args:
**kwargs: Instrumentation configuration including:
- tracer_provider: Optional OpenTelemetry tracer provider
- Additional provider-specific configuration
"""
self._instrument(**kwargs)
def _instrument(self, **kwargs: Any) -> None:
"""
Instrument Dapr Agents with comprehensive OpenTelemetry tracing.
@ -151,10 +165,17 @@ class DaprAgentsInstrumentor(BaseInstrumentor):
# Apply W3C context propagation fix for Dapr Workflows (critical for proper tracing)
self._apply_context_propagation_fix()
# Apply method wrappers in logical order for complete tracing coverage
self._apply_agent_wrappers() # Regular agent execution paths
self._apply_workflow_wrappers() # Workflow-based agent execution
self._apply_llm_wrappers() # LLM provider integrations
# Apply agent wrappers (Regular Agent class execution paths)
self._apply_agent_wrappers()
# Register workflow instrumentation callback for lazy loading of wrappers.
# This is needed for DurableAgent instrumentation,
# because the WorkflowApp is not available immediately when the instrumentor is initialized,
# so we must register the callback to properly instrument the WorkflowApp.
self._register_workflow_instrumentation_callback()
# LLM provider integrations
self._apply_llm_wrappers()
logger.info("✅ Dapr Agents OpenTelemetry instrumentation enabled")
@ -310,19 +331,13 @@ class DaprAgentsInstrumentor(BaseInstrumentor):
logger.debug(
f"🔗 Storing W3C context for instance {instance_id}"
)
logger.debug(
f"🔗 Traceparent: {global_context.get('traceparent')}"
)
# Store W3C context with specific workflow instance ID
store_workflow_context(instance_id, global_context)
else:
logger.warning(
"⚠️ No global workflow context found for storage"
f"⚠️ No global workflow context found for storage - instance {instance_id}"
)
else:
logger.warning(
"⚠️ No workflow instance_id - cannot store W3C context"
"⚠️ No workflow instance_id - cannot access W3C context"
)
try:
@ -373,33 +388,35 @@ class DaprAgentsInstrumentor(BaseInstrumentor):
Instruments core agent methods to create AGENT spans (top-level) and CHAIN spans
(processing steps) for comprehensive agent execution tracing in Phoenix UI.
"""
# Main agent run wrapper (AGENT span - top level)
wrap_function_wrapper(
module="dapr_agents.agents.agent.agent",
name="Agent.run",
wrapper=AgentRunWrapper(self._tracer),
)
try:
from dapr_agents.agents.agent.agent import Agent
from dapr_agents.tool.executor import AgentToolExecutor
# Process iterations wrapper (CHAIN span - processing steps)
wrap_function_wrapper(
module="dapr_agents.agents.agent.agent",
name="Agent.process_iterations",
wrapper=ProcessIterationsWrapper(self._tracer),
)
# Main agent run wrapper (AGENT span - top level)
wrap_function_wrapper(
target=Agent.run,
wrapper=AgentRunWrapper(self._tracer),
)
# Tool execution batch wrapper (TOOL span - batch execution)
wrap_function_wrapper(
module="dapr_agents.agents.agent.agent",
name="Agent.execute_tools",
wrapper=ExecuteToolsWrapper(self._tracer),
)
# Process iterations wrapper (CHAIN span - processing steps)
wrap_function_wrapper(
target=Agent.conversation,
wrapper=ProcessIterationsWrapper(self._tracer),
)
# Individual tool execution wrapper (TOOL span - actual tool execution)
wrap_function_wrapper(
module="dapr_agents.tool.executor",
name="AgentToolExecutor.run_tool",
wrapper=RunToolWrapper(self._tracer),
)
# Tool execution batch wrapper (TOOL span - batch execution)
wrap_function_wrapper(
target=Agent.execute_tools,
wrapper=ExecuteToolsWrapper(self._tracer),
)
# Individual tool execution wrapper (TOOL span - actual tool execution)
wrap_function_wrapper(
target=AgentToolExecutor.run_tool,
wrapper=RunToolWrapper(self._tracer),
)
except Exception as e:
logger.error(f"Error applying agent wrappers: {e}", exc_info=True)
def _apply_workflow_wrappers(self) -> None:
"""
@ -409,29 +426,37 @@ class DaprAgentsInstrumentor(BaseInstrumentor):
that capture the complete workflow lifecycle from start to completion, enabling
detailed monitoring of workflow orchestration in Phoenix UI.
"""
# Workflow execution wrapper for DurableAgent (AGENT span with monitoring)
# This captures the complete workflow lifecycle from start to completion
wrap_function_wrapper(
module="dapr_agents.workflow.base",
name="WorkflowApp.run_and_monitor_workflow_async",
wrapper=WorkflowMonitorWrapper(self._tracer),
)
try:
from dapr_agents.workflow.base import WorkflowApp
from dapr_agents.workflow.task import WorkflowTask
# Workflow execution wrapper for Orchestrators (AGENT span, fire-and-forget)
# This captures orchestrator workflow starts (no monitoring since they're event-driven)
wrap_function_wrapper(
module="dapr_agents.workflow.base",
name="WorkflowApp.run_workflow",
wrapper=WorkflowRunWrapper(self._tracer),
)
original_run_and_monitor = WorkflowApp.run_and_monitor_workflow_async
original_run_workflow = WorkflowApp.run_workflow
original_task_call = WorkflowTask.__call__
# Create wrapper instances
monitor_wrapper = WorkflowMonitorWrapper(self._tracer)
run_wrapper = WorkflowRunWrapper(self._tracer)
task_wrapper = WorkflowTaskWrapper(self._tracer)
# Workflow task execution wrapper (TOOL/LLM/TASK spans for individual workflow tasks)
# This captures individual task execution within workflows, using ctx.instance_id for grouping
wrap_function_wrapper(
module="dapr_agents.workflow.task",
name="WorkflowTask.__call__",
wrapper=WorkflowTaskWrapper(self._tracer),
)
# Create wrapped versions that can be bound to instances
def wrapped_run_and_monitor_workflow_async(self, *args, **kwargs):
return monitor_wrapper(original_run_and_monitor, self, args, kwargs)
def wrapped_run_workflow(self, *args, **kwargs):
return run_wrapper(original_run_workflow, self, args, kwargs)
def wrapped_task_call(self, *args, **kwargs):
return task_wrapper(original_task_call, self, args, kwargs)
# Replace the methods on the classes with our wrapped versions
# This is the only way to instrument Pydantic model methods
WorkflowApp.run_and_monitor_workflow_async = (
wrapped_run_and_monitor_workflow_async
)
WorkflowApp.run_workflow = wrapped_run_workflow
WorkflowTask.__call__ = wrapped_task_call
except Exception as e:
logger.error(f"Error applying workflow wrappers: {e}", exc_info=True)
def _apply_llm_wrappers(self) -> None:
"""
@ -496,6 +521,31 @@ class DaprAgentsInstrumentor(BaseInstrumentor):
return chat_client_classes
def _register_workflow_instrumentation_callback(self):
"""
Register workflow instrumentation callback for lazy loading of wrappers.
This method registers a callback in the WorkflowApp class that will be
invoked when start_runtime() is called. This ensures workflow wrappers
are applied at the right time (when classes are fully loaded) while
maintaining user control over instrumentation.
"""
try:
from dapr_agents.workflow.base import WorkflowApp
# Register the workflow wrapper application as a class method
# This will be called when any WorkflowApp instance starts its runtime
setattr(
WorkflowApp,
WorkflowApp.INSTRUMENTATION_CALLBACK_ATTR,
self._apply_workflow_wrappers,
)
except ImportError as e:
logger.warning(f"Could not register workflow instrumentation callback: {e}")
except Exception as e:
logger.warning(f"Error registering workflow instrumentation callback: {e}")
def _uninstrument(self, **kwargs: Any) -> None:
"""
Uninstrument Dapr Agents by clearing the tracer.

View File

@ -237,9 +237,9 @@ class AgentRunWrapper:
class ProcessIterationsWrapper:
"""
Wrapper for Agent.process_iterations() method to create CHAIN spans for processing logic.
Wrapper for Agent.conversation() method to create CHAIN spans for processing logic.
This wrapper instruments the Agent.process_iterations() method, creating
This wrapper instruments the Agent.conversation() method, creating
CHAIN spans that capture the iterative processing and workflow execution
within an agent's reasoning cycle.
@ -265,16 +265,16 @@ class ProcessIterationsWrapper:
def __call__(self, wrapped: Any, instance: Any, args: Any, kwargs: Any) -> Any:
"""
Wrap Agent.process_iterations() method with CHAIN span tracing.
Wrap Agent.conversation() method with CHAIN span tracing.
Creates CHAIN spans that capture the iterative processing logic
and reasoning flow within agent execution, providing visibility
into the step-by-step processing workflow.
Args:
wrapped (callable): Original Agent.process_iterations method to be instrumented
wrapped (callable): Original Agent.conversation method to be instrumented
instance (Agent): Agent instance containing metadata and configuration
args (tuple): Positional arguments passed to process_iterations method
args (tuple): Positional arguments passed to conversation method
kwargs (dict): Keyword arguments passed to the original method
Returns:
@ -289,7 +289,7 @@ class ProcessIterationsWrapper:
# Extract agent information for span naming
agent_name = getattr(instance, "name", instance.__class__.__name__)
span_name = f"{agent_name}.process_iterations"
span_name = f"{agent_name}.conversation"
# Build span attributes
attributes = {
@ -319,15 +319,15 @@ class ProcessIterationsWrapper:
"""
Handle asynchronous process iterations execution with comprehensive span tracing.
Manages async process_iterations execution by creating CHAIN spans with
Manages async conversation execution by creating CHAIN spans with
proper attribute handling, result extraction, and error management for
iterative agent processing workflows.
Args:
wrapped (callable): Original async process_iterations method to execute
wrapped (callable): Original async conversation method to execute
args (tuple): Positional arguments for the wrapped method
kwargs (dict): Keyword arguments for the wrapped method
span_name (str): Name for the created span (e.g., "MyAgent.process_iterations")
span_name (str): Name for the created span (e.g., "MyAgent.conversation")
attributes (dict): Span attributes including agent name and processing context
Returns:
@ -359,17 +359,17 @@ class ProcessIterationsWrapper:
self, wrapped: Any, args: Any, kwargs: Any, span_name: str, attributes: dict
) -> Any:
"""
Handle synchronous process iterations execution with comprehensive span tracing.
Handle synchronous conversation execution with comprehensive span tracing.
Manages sync process_iterations execution by creating CHAIN spans with
Manages sync conversation execution by creating CHAIN spans with
proper attribute handling, result extraction, and error management for
iterative agent processing workflows.
Args:
wrapped (callable): Original sync process_iterations method to execute
wrapped (callable): Original sync conversation method to execute
args (tuple): Positional arguments for the wrapped method
kwargs (dict): Keyword arguments for the wrapped method
span_name (str): Name for the created span (e.g., "MyAgent.process_iterations")
span_name (str): Name for the created span (e.g., "MyAgent.conversation")
attributes (dict): Span attributes including agent name and processing context
Returns:

View File

@ -1,4 +1,5 @@
import logging
import asyncio
from typing import Any, Dict
from ..constants import (
@ -202,11 +203,14 @@ class WorkflowMonitorWrapper:
"""
Wrap WorkflowApp.run_and_monitor_workflow_async with AGENT span tracing.
Creates the top-level AGENT span for DurableAgent workflow execution and
stores the global workflow context immediately for task correlation.
Args:
wrapped: Original WorkflowApp.run_and_monitor_workflow_async method
instance: WorkflowApp instance (DurableAgent)
args: Positional arguments (workflow, input)
kwargs: Keyword arguments
wrapped (callable): Original WorkflowApp.run_and_monitor_workflow_async method
instance (Any): WorkflowApp instance (DurableAgent)
args (tuple): Positional arguments for the wrapped method
kwargs (dict): Keyword arguments for the wrapped method
Returns:
Any: Result from wrapped method execution (workflow output)
@ -217,9 +221,58 @@ class WorkflowMonitorWrapper:
):
return wrapped(*args, **kwargs)
# Extract arguments
arguments = bind_arguments(wrapped, *args, **kwargs)
workflow = arguments.get("workflow")
workflow_name = self._extract_workflow_name(args, kwargs)
# Extract agent name from the instance
agent_name = getattr(instance, "name", "DurableAgent")
attributes = self._build_workflow_attributes(
workflow_name, agent_name, args, kwargs
)
# Store global context immediately when this method is called
# This ensures workflow tasks can access it before async execution begins
try:
from ..context_propagation import extract_otel_context
from ..context_storage import store_workflow_context
captured_context = extract_otel_context()
if captured_context.get("traceparent"):
logger.debug(
f"Captured traceparent: {captured_context.get('traceparent')}"
)
store_workflow_context("__global_workflow_context__", captured_context)
else:
logger.warning(
"No traceparent found in captured context during __call__"
)
except Exception as e:
logger.warning(f"Failed to capture/store workflow context in __call__: {e}")
# Handle async vs sync execution
if asyncio.iscoroutinefunction(wrapped):
return self._handle_async_execution(
wrapped, args, kwargs, attributes, workflow_name, agent_name
)
else:
return self._handle_sync_execution(
wrapped, args, kwargs, attributes, workflow_name, agent_name
)
def _extract_workflow_name(self, args: Any, kwargs: Any) -> str:
"""
Extract workflow name from method arguments.
Args:
args: Positional arguments
kwargs: Keyword arguments
Returns:
str: Workflow name
"""
if args and len(args) > 0:
workflow = args[0]
else:
workflow = kwargs.get("workflow")
# Extract workflow name
workflow_name = (
@ -228,47 +281,42 @@ class WorkflowMonitorWrapper:
else getattr(workflow, "__name__", "unknown_workflow")
)
# Build span attributes
attributes = self._build_monitor_attributes(instance, workflow_name, arguments)
return workflow_name
# Debug logging to confirm wrapper is being called
logger.debug(
f"🔍 WorkflowMonitorWrapper creating AGENT span for workflow: {workflow_name}"
)
# Handle async execution
return self._handle_async_execution(
wrapped, args, kwargs, attributes, workflow_name
)
def _build_monitor_attributes(
self, instance: Any, workflow_name: str, arguments: Dict[str, Any]
def _build_workflow_attributes(
self, workflow_name: str, agent_name: str, args: Any, kwargs: Any
) -> Dict[str, Any]:
"""
Build span attributes for DurableAgent workflow monitoring.
Build span attributes for workflow execution.
Args:
instance: WorkflowApp instance (DurableAgent)
workflow_name: Name of the workflow being monitored
arguments: Bound method arguments from the wrapped call
workflow_name: Name of the workflow
agent_name: Name of the agent
args: Positional arguments
kwargs: Keyword arguments
Returns:
Dict[str, Any]: Span attributes for the AGENT span
"""
agent_name = getattr(instance, "name", instance.__class__.__name__)
# Build basic attributes
attributes = {
"openinference.span.kind": "AGENT", # DurableAgent workflow execution is the agent action
INPUT_MIME_TYPE: "application/json",
OUTPUT_MIME_TYPE: "application/json",
"workflow.name": workflow_name,
"workflow.operation": "run_and_monitor",
"agent.execution_mode": "workflow_based",
"agent.name": agent_name,
"agent.type": instance.__class__.__name__,
OUTPUT_MIME_TYPE: "application/json",
}
# Serialize input arguments
attributes[INPUT_VALUE] = safe_json_dumps(arguments)
# Add input payload if available
if args and len(args) > 1:
# Second argument is typically the input
input_data = args[1]
if input_data is not None:
attributes[INPUT_VALUE] = safe_json_dumps(input_data)
attributes[INPUT_MIME_TYPE] = "application/json"
elif "input" in kwargs and kwargs["input"] is not None:
attributes[INPUT_VALUE] = safe_json_dumps(kwargs["input"])
attributes[INPUT_MIME_TYPE] = "application/json"
# Add context attributes
attributes.update(get_attributes_from_context())
@ -282,6 +330,7 @@ class WorkflowMonitorWrapper:
kwargs: Any,
attributes: Dict[str, Any],
workflow_name: str,
agent_name: str,
) -> Any:
"""
Handle async workflow monitoring execution with context propagation.
@ -301,42 +350,12 @@ class WorkflowMonitorWrapper:
span_name = f"Agent.{workflow_name}"
# Debug logging to confirm span creation
logger.debug(f"Creating AGENT span: {span_name}")
logger.debug(f"Creating AGENT span: {span_name}")
with self._tracer.start_as_current_span(
span_name, attributes=attributes
) as span:
try:
# Debug logging to confirm span context
logger.debug(f"📋 AGENT span context: {span.get_span_context()}")
# CRITICAL: Capture and store OpenTelemetry context BEFORE executing workflow
# This must happen before wrapped() is called so tasks can access it
try:
from ..context_propagation import extract_otel_context
from ..context_storage import store_workflow_context
captured_context = extract_otel_context()
if captured_context.get("traceparent"):
logger.debug("🔗 Captured workflow context in AGENT span")
logger.debug(
f"🔗 Traceparent: {captured_context.get('traceparent')}"
)
# Store context IMMEDIATELY for workflow tasks to use
store_workflow_context(
"__global_workflow_context__", captured_context
)
logger.debug(
"🔗 Stored global workflow context for task correlation"
)
else:
logger.warning("⚠️ No traceparent found in captured context")
except Exception as e:
logger.warning(
f"⚠️ Failed to capture/store workflow context: {e}"
)
# Execute workflow and get result
result = await wrapped(*args, **kwargs)
@ -349,17 +368,69 @@ class WorkflowMonitorWrapper:
span.set_attribute(OUTPUT_VALUE, safe_json_dumps(result))
span.set_status(Status(StatusCode.OK))
logger.debug(f"🎯 AGENT span completed successfully: {span_name}")
logger.debug(f"AGENT span completed successfully: {span_name}")
return result
except Exception as e:
span.set_status(Status(StatusCode.ERROR, str(e)))
span.record_exception(e)
logger.error(f"AGENT span failed: {span_name} - {e}")
logger.error(f"AGENT span failed: {span_name} - {e}")
raise
return async_wrapper()
def _handle_sync_execution(
self,
wrapped: Any,
args: Any,
kwargs: Any,
attributes: Dict[str, Any],
workflow_name: str,
agent_name: str,
) -> Any:
"""
Handle synchronous workflow monitoring execution with context propagation.
Args:
wrapped: Original synchronous method to execute
args: Positional arguments for the wrapped method
kwargs: Keyword arguments for the wrapped method
attributes: Pre-built span attributes
workflow_name: Name of the workflow for span naming
Returns:
Any: Result from wrapped method execution
"""
span_name = f"Agent.{workflow_name}"
# Debug logging to confirm span creation
logger.debug(f"Creating AGENT span: {span_name}")
with self._tracer.start_as_current_span(
span_name, attributes=attributes
) as span:
try:
# Execute workflow and get result
result = wrapped(*args, **kwargs)
# Set output attributes - handle both string and object results consistently
if isinstance(result, str):
# If result is already a JSON string, use it directly
span.set_attribute(OUTPUT_VALUE, result)
else:
# If result is an object, serialize it to match input format
span.set_attribute(OUTPUT_VALUE, safe_json_dumps(result))
span.set_status(Status(StatusCode.OK))
logger.debug(f"AGENT span completed successfully: {span_name}")
return result
except Exception as e:
span.set_status(Status(StatusCode.ERROR, str(e)))
span.record_exception(e)
logger.error(f"AGENT span failed: {span_name} - {e}")
raise
# ============================================================================
# Exported Classes

View File

@ -15,6 +15,7 @@ from dapr.ext.workflow import (
from dapr.ext.workflow.workflow_state import WorkflowState
from durabletask import task as dtask
from pydantic import BaseModel, ConfigDict, Field
from typing import ClassVar
from dapr_agents.agents.base import ChatClientBase
from dapr_agents.types.workflow import DaprWorkflowStatus
@ -31,6 +32,11 @@ class WorkflowApp(BaseModel):
A Pydantic-based class to encapsulate a Dapr Workflow runtime and manage workflows and tasks.
"""
# Class-level constant for instrumentation callback
# This is used to register the workflow instrumentation callback with the instrumentor
# This is needed because the instrumentor is not available immediately when the WorkflowApp is initialized
INSTRUMENTATION_CALLBACK_ATTR: ClassVar[str] = "_workflow_instrumentation_callback"
llm: Optional[ChatClientBase] = Field(
default=None,
description="The default LLM client for tasks that explicitly require an LLM but don't specify one (optional).",
@ -436,6 +442,10 @@ class WorkflowApp(BaseModel):
logger.info("Starting workflow runtime.")
self.wf_runtime.start()
self.wf_runtime_is_running = True
# Apply workflow instrumentation if callback was registered
if hasattr(self.__class__, self.INSTRUMENTATION_CALLBACK_ATTR):
getattr(self.__class__, self.INSTRUMENTATION_CALLBACK_ATTR)()
else:
logger.debug("Workflow runtime already running; skipping.")

View File

@ -1,4 +1,4 @@
dapr-agents>=0.8.1
dapr-agents>=0.8.3
opentelemetry-sdk>=1.25.0
opentelemetry-exporter-zipkin-json==1.25.0
opentelemetry-exporter-otlp>=1.25.0

View File

@ -1,3 +1,3 @@
dapr-agents>=0.8.1
dapr-agents>=0.8.3
# For local development use local changes by commenting out the line above and uncommenting the line below:
# -e ../../

View File

@ -1,3 +1,3 @@
dapr-agents>=0.8.1
dapr-agents>=0.8.3
# For local development use local changes by commenting out the dapr-agents line above and uncommenting the line below:
# -e ../../

View File

@ -1,4 +1,4 @@
dapr-agents>=0.8.1
dapr-agents>=0.8.3
elevenlabs
# For local development use local changes by commenting out the dapr-agentsline above and uncommenting the line below:
# -e ../../

View File

@ -1,3 +1,3 @@
dapr-agents>=0.8.1
dapr-agents>=0.8.3
# For local development use local changes by commenting out the dapr-agentsline above and uncommenting the line below:
# -e ../../

View File

@ -1,4 +1,4 @@
dapr-agents>=0.8.1
dapr-agents>=0.8.3
tiktoken==0.9.0
# For local development use local changes by commenting out the dapr-agents line above and uncommenting the line below:
# -e ../../

View File

@ -1,4 +1,4 @@
dapr-agents>=0.8.1
dapr-agents>=0.8.3
tiktoken==0.9.0
# For local development use local changes by commenting out the dapr-agents line above and uncommenting the line below:
# -e ../../

View File

@ -325,6 +325,8 @@ if __name__ == "__main__":
asyncio.run(main())
```
Alternatively, you can use the DurableAgent with this same setup using `weather_durable_agent_tracing.py`.
#### Run with Observability
1. Ensure Phoenix server is running (see setup steps above)
@ -335,6 +337,11 @@ if __name__ == "__main__":
python weather_agent_tracing.py
```
Alternatively, you can run the DurableAgent using:
```bash
dapr run --app-id weatheragent --resources-path ./components -- python weather_durable_agent_tracing.py
```
3. View traces in Phoenix UI at [http://localhost:6006](http://localhost:6006)
![agent span results](AgentSpanResults.png)

View File

@ -0,0 +1,16 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: agentstatestore
spec:
type: state.redis
version: v1
metadata:
- name: redisHost
value: localhost:6379
- name: redisPassword
value: ""
- name: enableTLS
value: "false"
- name: keyPrefix
value: none

View File

@ -0,0 +1,16 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: messagepubsub
spec:
type: pubsub.redis
version: v1
metadata:
- name: redisHost
value: localhost:6379
- name: redisPassword
value: ""
- name: consumerID
value: "weather-agent-group"
- name: enableTLS
value: "false"

View File

@ -0,0 +1,16 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: workflowstatestore
spec:
type: state.redis
version: v1
metadata:
- name: redisHost
value: localhost:6379
- name: redisPassword
value: ""
- name: actorStateStore
value: "true"
- name: keyPrefix
value: none

View File

@ -1,4 +1,4 @@
dapr-agents[observability]>=0.8.1
dapr-agents[observability]>=0.8.3
arize-phoenix>=11.22.0,<12.0.0
arize-phoenix-otel>=0.12.0,<0.13.0
# For local development use local changes by commenting out the dapr-agents line above and uncommenting the line below:

View File

@ -16,7 +16,6 @@ AIAgent = Agent(
"At the end, provide a concise summary that combines the weather information for all requested locations and any other actions you performed.",
],
memory=ConversationDaprStateMemory(store_name="historystore", session_id="some-id"),
pattern="toolcalling",
tools=tools,
)

View File

@ -0,0 +1,47 @@
import asyncio
from weather_tools import tools
from dapr_agents import DurableAgent
from dotenv import load_dotenv
load_dotenv()
# Wrap your async call
async def main():
from phoenix.otel import register
from dapr_agents.observability import DaprAgentsInstrumentor
# Register Dapr Agents with Phoenix OpenTelemetry
tracer_provider = register(
project_name="dapr-weather-durable-agent",
protocol="http/protobuf",
)
# Initialize Dapr Agents OpenTelemetry instrumentor
try:
instrumentor = DaprAgentsInstrumentor()
instrumentor.instrument(tracer_provider=tracer_provider, skip_dep_check=True)
except Exception as e:
raise
AIAgent = DurableAgent(
name="Steviee",
role="Weather Assistant",
goal="Assist Humans with weather related tasks.",
instructions=[
"Always answer the user's main weather question directly and clearly.",
"If you perform any additional actions (like jumping), summarize those actions and their results.",
"At the end, provide a concise summary that combines the weather information for all requested locations and any other actions you performed.",
],
tools=tools,
message_bus_name="messagepubsub",
state_store_name="workflowstatestore",
agents_registry_store_name="agentstatestore",
history_store_name="historystore",
)
await AIAgent.run("What is the weather in Virginia, New York and Washington DC?")
if __name__ == "__main__":
asyncio.run(main())

View File

@ -1,4 +1,4 @@
dapr-agents>=0.8.1
dapr-agents>=0.8.3
arize-phoenix-otel>=0.12.0,<0.13.0
# For local development use local changes by commenting out the dapr-agents line above and uncommenting the line below:
# -e ../../

View File

@ -1,3 +1,3 @@
dapr-agents>=0.8.1
dapr-agents>=0.8.3
# For local development use local changes by commenting out the dapr-agents line above and uncommenting the line below:
# -e ../../

View File

@ -1,3 +1,3 @@
dapr-agents>=0.8.1
dapr-agents>=0.8.3
# For local development use local changes by commenting out the dapr-agents line above and uncommenting the line below:
# -e ../../

View File

@ -1,4 +1,4 @@
dapr-agents>=0.8.1
dapr-agents>=0.8.3
chainlit==2.6.8
unstructured[all-docs]==0.18.11
# For local development use local changes by commenting out the dapr-agents line above and uncommenting the line below:

View File

@ -1,4 +1,4 @@
dapr-agents>=0.8.1
dapr-agents>=0.8.3
starlette==0.47.2
# For local development use local changes by commenting out the dapr-agents line above and uncommenting the line below:
# -e ../../

View File

@ -1,3 +1,3 @@
dapr-agents>=0.8.1
dapr-agents>=0.8.3
# For local development use local changes by commenting out the dapr-agents line above and uncommenting the line below:
# -e ../../

View File

@ -1,3 +1,3 @@
dapr-agents>=0.8.1
dapr-agents>=0.8.3
# For local development use local changes by commenting out the dapr-agents line above and uncommenting the line below:
# -e ../../

View File

@ -1,4 +1,4 @@
dapr-agents>=0.8.1
dapr-agents>=0.8.3
chainlit==2.6.8
psycopg[binary]==3.2.9
# For local development use local changes by commenting out the dapr-agents line above and uncommenting the line below:

View File

@ -1,4 +1,4 @@
dapr-agents>=0.8.1
dapr-agents>=0.8.3
arize-phoenix==11.22.0
# For local development use local changes by commenting out the dapr-agents line above and uncommenting the line below:
# -e ../../

View File

@ -214,14 +214,14 @@ class TestAgent:
await agent_with_tools.execute_tools([tool_call])
@pytest.mark.asyncio
async def test_process_iterations_max_reached(self, basic_agent):
async def test_conversation_max_reached(self, basic_agent):
"""Test that agent stops immediately when there are no tool calls."""
mock_response = Mock(spec=LLMChatResponse)
assistant_msg = AssistantMessage(content="Using tool", tool_calls=[])
mock_response.get_message.return_value = assistant_msg
basic_agent.llm.generate.return_value = mock_response
result = await basic_agent.process_iterations([])
result = await basic_agent.conversation([])
# current logic sees no tools ===> returns on first iteration
assert isinstance(result, AssistantMessage)
@ -229,14 +229,14 @@ class TestAgent:
assert basic_agent.llm.generate.call_count == 1
@pytest.mark.asyncio
async def test_process_iterations_with_llm_error(self, basic_agent):
async def test_conversation_with_llm_error(self, basic_agent):
"""Test handling of LLM errors during iterations."""
basic_agent.llm.generate.side_effect = Exception("LLM error")
with pytest.raises(
AgentError, match="Failed during chat generation: LLM error"
):
await basic_agent.process_iterations([])
await basic_agent.conversation([])
@pytest.mark.asyncio
async def test_run_tool_success(self, agent_with_tools):