mirror of https://github.com/dapr/dapr-agents.git
Compare commits
11 Commits
Author | SHA1 | Date |
---|---|---|
|
a475687445 | |
|
6e6ff447be | |
|
dde2ab0d2c | |
|
99defbabe1 | |
|
fbb3bfd61f | |
|
224c61c6c2 | |
|
423de2a7a1 | |
|
00e0863bef | |
|
4b081c6984 | |
|
e31484dde3 | |
|
54d40dbcdb |
|
@ -101,7 +101,7 @@ class Agent(AgentBase):
|
|||
)
|
||||
|
||||
# Process conversation iterations and return the result
|
||||
return await self.process_iterations(messages)
|
||||
return await self.conversation(messages)
|
||||
|
||||
async def execute_tools(self, tool_calls: List[ToolCall]) -> List[ToolMessage]:
|
||||
"""
|
||||
|
@ -180,7 +180,7 @@ class Agent(AgentBase):
|
|||
# Run all tool calls concurrently, but bounded by max_concurrent
|
||||
return await asyncio.gather(*(run_and_record(tc) for tc in tool_calls))
|
||||
|
||||
async def process_iterations(self, messages: List[Dict[str, Any]]) -> Any:
|
||||
async def conversation(self, messages: List[Dict[str, Any]]) -> Any:
|
||||
"""
|
||||
Drives the agent conversation iteratively until a final answer or max iterations is reached.
|
||||
Handles tool calls, updates memory, and returns the final assistant message.
|
||||
|
|
|
@ -9,7 +9,7 @@ Span Hierarchy by Agent Type:
|
|||
|
||||
**Regular Agent (Direct Execution):**
|
||||
- Agent.run (AGENT span) - Root span for agent execution
|
||||
└── Agent.process_iterations (CHAIN span) - Processing and reasoning logic
|
||||
└── Agent.conversation (CHAIN span) - Processing and reasoning logic
|
||||
├── Agent.execute_tools (TOOL span) - Batch tool coordination
|
||||
│ └── AgentToolExecutor.run_tool (TOOL span) - Individual tool execution
|
||||
└── ChatClient.generate (LLM span) - Language model interactions
|
||||
|
@ -121,6 +121,20 @@ class DaprAgentsInstrumentor(BaseInstrumentor):
|
|||
"""
|
||||
return ("dapr-agents",)
|
||||
|
||||
def instrument(self, **kwargs: Any) -> None:
|
||||
"""
|
||||
Public method to instrument Dapr Agents with OpenTelemetry tracing.
|
||||
|
||||
This method is called by users to enable instrumentation. It delegates
|
||||
to the private _instrument method which contains the actual implementation.
|
||||
|
||||
Args:
|
||||
**kwargs: Instrumentation configuration including:
|
||||
- tracer_provider: Optional OpenTelemetry tracer provider
|
||||
- Additional provider-specific configuration
|
||||
"""
|
||||
self._instrument(**kwargs)
|
||||
|
||||
def _instrument(self, **kwargs: Any) -> None:
|
||||
"""
|
||||
Instrument Dapr Agents with comprehensive OpenTelemetry tracing.
|
||||
|
@ -151,10 +165,17 @@ class DaprAgentsInstrumentor(BaseInstrumentor):
|
|||
# Apply W3C context propagation fix for Dapr Workflows (critical for proper tracing)
|
||||
self._apply_context_propagation_fix()
|
||||
|
||||
# Apply method wrappers in logical order for complete tracing coverage
|
||||
self._apply_agent_wrappers() # Regular agent execution paths
|
||||
self._apply_workflow_wrappers() # Workflow-based agent execution
|
||||
self._apply_llm_wrappers() # LLM provider integrations
|
||||
# Apply agent wrappers (Regular Agent class execution paths)
|
||||
self._apply_agent_wrappers()
|
||||
|
||||
# Register workflow instrumentation callback for lazy loading of wrappers.
|
||||
# This is needed for DurableAgent instrumentation,
|
||||
# because the WorkflowApp is not available immediately when the instrumentor is initialized,
|
||||
# so we must register the callback to properly instrument the WorkflowApp.
|
||||
self._register_workflow_instrumentation_callback()
|
||||
|
||||
# LLM provider integrations
|
||||
self._apply_llm_wrappers()
|
||||
|
||||
logger.info("✅ Dapr Agents OpenTelemetry instrumentation enabled")
|
||||
|
||||
|
@ -310,19 +331,13 @@ class DaprAgentsInstrumentor(BaseInstrumentor):
|
|||
logger.debug(
|
||||
f"🔗 Storing W3C context for instance {instance_id}"
|
||||
)
|
||||
logger.debug(
|
||||
f"🔗 Traceparent: {global_context.get('traceparent')}"
|
||||
)
|
||||
|
||||
# Store W3C context with specific workflow instance ID
|
||||
store_workflow_context(instance_id, global_context)
|
||||
else:
|
||||
logger.warning(
|
||||
"⚠️ No global workflow context found for storage"
|
||||
f"⚠️ No global workflow context found for storage - instance {instance_id}"
|
||||
)
|
||||
else:
|
||||
logger.warning(
|
||||
"⚠️ No workflow instance_id - cannot store W3C context"
|
||||
"⚠️ No workflow instance_id - cannot access W3C context"
|
||||
)
|
||||
|
||||
try:
|
||||
|
@ -373,33 +388,35 @@ class DaprAgentsInstrumentor(BaseInstrumentor):
|
|||
Instruments core agent methods to create AGENT spans (top-level) and CHAIN spans
|
||||
(processing steps) for comprehensive agent execution tracing in Phoenix UI.
|
||||
"""
|
||||
# Main agent run wrapper (AGENT span - top level)
|
||||
wrap_function_wrapper(
|
||||
module="dapr_agents.agents.agent.agent",
|
||||
name="Agent.run",
|
||||
wrapper=AgentRunWrapper(self._tracer),
|
||||
)
|
||||
try:
|
||||
from dapr_agents.agents.agent.agent import Agent
|
||||
from dapr_agents.tool.executor import AgentToolExecutor
|
||||
|
||||
# Process iterations wrapper (CHAIN span - processing steps)
|
||||
wrap_function_wrapper(
|
||||
module="dapr_agents.agents.agent.agent",
|
||||
name="Agent.process_iterations",
|
||||
wrapper=ProcessIterationsWrapper(self._tracer),
|
||||
)
|
||||
# Main agent run wrapper (AGENT span - top level)
|
||||
wrap_function_wrapper(
|
||||
target=Agent.run,
|
||||
wrapper=AgentRunWrapper(self._tracer),
|
||||
)
|
||||
|
||||
# Tool execution batch wrapper (TOOL span - batch execution)
|
||||
wrap_function_wrapper(
|
||||
module="dapr_agents.agents.agent.agent",
|
||||
name="Agent.execute_tools",
|
||||
wrapper=ExecuteToolsWrapper(self._tracer),
|
||||
)
|
||||
# Process iterations wrapper (CHAIN span - processing steps)
|
||||
wrap_function_wrapper(
|
||||
target=Agent.conversation,
|
||||
wrapper=ProcessIterationsWrapper(self._tracer),
|
||||
)
|
||||
|
||||
# Individual tool execution wrapper (TOOL span - actual tool execution)
|
||||
wrap_function_wrapper(
|
||||
module="dapr_agents.tool.executor",
|
||||
name="AgentToolExecutor.run_tool",
|
||||
wrapper=RunToolWrapper(self._tracer),
|
||||
)
|
||||
# Tool execution batch wrapper (TOOL span - batch execution)
|
||||
wrap_function_wrapper(
|
||||
target=Agent.execute_tools,
|
||||
wrapper=ExecuteToolsWrapper(self._tracer),
|
||||
)
|
||||
|
||||
# Individual tool execution wrapper (TOOL span - actual tool execution)
|
||||
wrap_function_wrapper(
|
||||
target=AgentToolExecutor.run_tool,
|
||||
wrapper=RunToolWrapper(self._tracer),
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Error applying agent wrappers: {e}", exc_info=True)
|
||||
|
||||
def _apply_workflow_wrappers(self) -> None:
|
||||
"""
|
||||
|
@ -409,29 +426,37 @@ class DaprAgentsInstrumentor(BaseInstrumentor):
|
|||
that capture the complete workflow lifecycle from start to completion, enabling
|
||||
detailed monitoring of workflow orchestration in Phoenix UI.
|
||||
"""
|
||||
# Workflow execution wrapper for DurableAgent (AGENT span with monitoring)
|
||||
# This captures the complete workflow lifecycle from start to completion
|
||||
wrap_function_wrapper(
|
||||
module="dapr_agents.workflow.base",
|
||||
name="WorkflowApp.run_and_monitor_workflow_async",
|
||||
wrapper=WorkflowMonitorWrapper(self._tracer),
|
||||
)
|
||||
try:
|
||||
from dapr_agents.workflow.base import WorkflowApp
|
||||
from dapr_agents.workflow.task import WorkflowTask
|
||||
|
||||
# Workflow execution wrapper for Orchestrators (AGENT span, fire-and-forget)
|
||||
# This captures orchestrator workflow starts (no monitoring since they're event-driven)
|
||||
wrap_function_wrapper(
|
||||
module="dapr_agents.workflow.base",
|
||||
name="WorkflowApp.run_workflow",
|
||||
wrapper=WorkflowRunWrapper(self._tracer),
|
||||
)
|
||||
original_run_and_monitor = WorkflowApp.run_and_monitor_workflow_async
|
||||
original_run_workflow = WorkflowApp.run_workflow
|
||||
original_task_call = WorkflowTask.__call__
|
||||
# Create wrapper instances
|
||||
monitor_wrapper = WorkflowMonitorWrapper(self._tracer)
|
||||
run_wrapper = WorkflowRunWrapper(self._tracer)
|
||||
task_wrapper = WorkflowTaskWrapper(self._tracer)
|
||||
|
||||
# Workflow task execution wrapper (TOOL/LLM/TASK spans for individual workflow tasks)
|
||||
# This captures individual task execution within workflows, using ctx.instance_id for grouping
|
||||
wrap_function_wrapper(
|
||||
module="dapr_agents.workflow.task",
|
||||
name="WorkflowTask.__call__",
|
||||
wrapper=WorkflowTaskWrapper(self._tracer),
|
||||
)
|
||||
# Create wrapped versions that can be bound to instances
|
||||
def wrapped_run_and_monitor_workflow_async(self, *args, **kwargs):
|
||||
return monitor_wrapper(original_run_and_monitor, self, args, kwargs)
|
||||
|
||||
def wrapped_run_workflow(self, *args, **kwargs):
|
||||
return run_wrapper(original_run_workflow, self, args, kwargs)
|
||||
|
||||
def wrapped_task_call(self, *args, **kwargs):
|
||||
return task_wrapper(original_task_call, self, args, kwargs)
|
||||
|
||||
# Replace the methods on the classes with our wrapped versions
|
||||
# This is the only way to instrument Pydantic model methods
|
||||
WorkflowApp.run_and_monitor_workflow_async = (
|
||||
wrapped_run_and_monitor_workflow_async
|
||||
)
|
||||
WorkflowApp.run_workflow = wrapped_run_workflow
|
||||
WorkflowTask.__call__ = wrapped_task_call
|
||||
except Exception as e:
|
||||
logger.error(f"Error applying workflow wrappers: {e}", exc_info=True)
|
||||
|
||||
def _apply_llm_wrappers(self) -> None:
|
||||
"""
|
||||
|
@ -496,6 +521,31 @@ class DaprAgentsInstrumentor(BaseInstrumentor):
|
|||
|
||||
return chat_client_classes
|
||||
|
||||
def _register_workflow_instrumentation_callback(self):
|
||||
"""
|
||||
Register workflow instrumentation callback for lazy loading of wrappers.
|
||||
|
||||
This method registers a callback in the WorkflowApp class that will be
|
||||
invoked when start_runtime() is called. This ensures workflow wrappers
|
||||
are applied at the right time (when classes are fully loaded) while
|
||||
maintaining user control over instrumentation.
|
||||
"""
|
||||
try:
|
||||
from dapr_agents.workflow.base import WorkflowApp
|
||||
|
||||
# Register the workflow wrapper application as a class method
|
||||
# This will be called when any WorkflowApp instance starts its runtime
|
||||
setattr(
|
||||
WorkflowApp,
|
||||
WorkflowApp.INSTRUMENTATION_CALLBACK_ATTR,
|
||||
self._apply_workflow_wrappers,
|
||||
)
|
||||
|
||||
except ImportError as e:
|
||||
logger.warning(f"Could not register workflow instrumentation callback: {e}")
|
||||
except Exception as e:
|
||||
logger.warning(f"Error registering workflow instrumentation callback: {e}")
|
||||
|
||||
def _uninstrument(self, **kwargs: Any) -> None:
|
||||
"""
|
||||
Uninstrument Dapr Agents by clearing the tracer.
|
||||
|
|
|
@ -237,9 +237,9 @@ class AgentRunWrapper:
|
|||
|
||||
class ProcessIterationsWrapper:
|
||||
"""
|
||||
Wrapper for Agent.process_iterations() method to create CHAIN spans for processing logic.
|
||||
Wrapper for Agent.conversation() method to create CHAIN spans for processing logic.
|
||||
|
||||
This wrapper instruments the Agent.process_iterations() method, creating
|
||||
This wrapper instruments the Agent.conversation() method, creating
|
||||
CHAIN spans that capture the iterative processing and workflow execution
|
||||
within an agent's reasoning cycle.
|
||||
|
||||
|
@ -265,16 +265,16 @@ class ProcessIterationsWrapper:
|
|||
|
||||
def __call__(self, wrapped: Any, instance: Any, args: Any, kwargs: Any) -> Any:
|
||||
"""
|
||||
Wrap Agent.process_iterations() method with CHAIN span tracing.
|
||||
Wrap Agent.conversation() method with CHAIN span tracing.
|
||||
|
||||
Creates CHAIN spans that capture the iterative processing logic
|
||||
and reasoning flow within agent execution, providing visibility
|
||||
into the step-by-step processing workflow.
|
||||
|
||||
Args:
|
||||
wrapped (callable): Original Agent.process_iterations method to be instrumented
|
||||
wrapped (callable): Original Agent.conversation method to be instrumented
|
||||
instance (Agent): Agent instance containing metadata and configuration
|
||||
args (tuple): Positional arguments passed to process_iterations method
|
||||
args (tuple): Positional arguments passed to conversation method
|
||||
kwargs (dict): Keyword arguments passed to the original method
|
||||
|
||||
Returns:
|
||||
|
@ -289,7 +289,7 @@ class ProcessIterationsWrapper:
|
|||
|
||||
# Extract agent information for span naming
|
||||
agent_name = getattr(instance, "name", instance.__class__.__name__)
|
||||
span_name = f"{agent_name}.process_iterations"
|
||||
span_name = f"{agent_name}.conversation"
|
||||
|
||||
# Build span attributes
|
||||
attributes = {
|
||||
|
@ -319,15 +319,15 @@ class ProcessIterationsWrapper:
|
|||
"""
|
||||
Handle asynchronous process iterations execution with comprehensive span tracing.
|
||||
|
||||
Manages async process_iterations execution by creating CHAIN spans with
|
||||
Manages async conversation execution by creating CHAIN spans with
|
||||
proper attribute handling, result extraction, and error management for
|
||||
iterative agent processing workflows.
|
||||
|
||||
Args:
|
||||
wrapped (callable): Original async process_iterations method to execute
|
||||
wrapped (callable): Original async conversation method to execute
|
||||
args (tuple): Positional arguments for the wrapped method
|
||||
kwargs (dict): Keyword arguments for the wrapped method
|
||||
span_name (str): Name for the created span (e.g., "MyAgent.process_iterations")
|
||||
span_name (str): Name for the created span (e.g., "MyAgent.conversation")
|
||||
attributes (dict): Span attributes including agent name and processing context
|
||||
|
||||
Returns:
|
||||
|
@ -359,17 +359,17 @@ class ProcessIterationsWrapper:
|
|||
self, wrapped: Any, args: Any, kwargs: Any, span_name: str, attributes: dict
|
||||
) -> Any:
|
||||
"""
|
||||
Handle synchronous process iterations execution with comprehensive span tracing.
|
||||
Handle synchronous conversation execution with comprehensive span tracing.
|
||||
|
||||
Manages sync process_iterations execution by creating CHAIN spans with
|
||||
Manages sync conversation execution by creating CHAIN spans with
|
||||
proper attribute handling, result extraction, and error management for
|
||||
iterative agent processing workflows.
|
||||
|
||||
Args:
|
||||
wrapped (callable): Original sync process_iterations method to execute
|
||||
wrapped (callable): Original sync conversation method to execute
|
||||
args (tuple): Positional arguments for the wrapped method
|
||||
kwargs (dict): Keyword arguments for the wrapped method
|
||||
span_name (str): Name for the created span (e.g., "MyAgent.process_iterations")
|
||||
span_name (str): Name for the created span (e.g., "MyAgent.conversation")
|
||||
attributes (dict): Span attributes including agent name and processing context
|
||||
|
||||
Returns:
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
import logging
|
||||
import asyncio
|
||||
from typing import Any, Dict
|
||||
|
||||
from ..constants import (
|
||||
|
@ -202,11 +203,14 @@ class WorkflowMonitorWrapper:
|
|||
"""
|
||||
Wrap WorkflowApp.run_and_monitor_workflow_async with AGENT span tracing.
|
||||
|
||||
Creates the top-level AGENT span for DurableAgent workflow execution and
|
||||
stores the global workflow context immediately for task correlation.
|
||||
|
||||
Args:
|
||||
wrapped: Original WorkflowApp.run_and_monitor_workflow_async method
|
||||
instance: WorkflowApp instance (DurableAgent)
|
||||
args: Positional arguments (workflow, input)
|
||||
kwargs: Keyword arguments
|
||||
wrapped (callable): Original WorkflowApp.run_and_monitor_workflow_async method
|
||||
instance (Any): WorkflowApp instance (DurableAgent)
|
||||
args (tuple): Positional arguments for the wrapped method
|
||||
kwargs (dict): Keyword arguments for the wrapped method
|
||||
|
||||
Returns:
|
||||
Any: Result from wrapped method execution (workflow output)
|
||||
|
@ -217,9 +221,58 @@ class WorkflowMonitorWrapper:
|
|||
):
|
||||
return wrapped(*args, **kwargs)
|
||||
|
||||
# Extract arguments
|
||||
arguments = bind_arguments(wrapped, *args, **kwargs)
|
||||
workflow = arguments.get("workflow")
|
||||
workflow_name = self._extract_workflow_name(args, kwargs)
|
||||
# Extract agent name from the instance
|
||||
agent_name = getattr(instance, "name", "DurableAgent")
|
||||
attributes = self._build_workflow_attributes(
|
||||
workflow_name, agent_name, args, kwargs
|
||||
)
|
||||
|
||||
# Store global context immediately when this method is called
|
||||
# This ensures workflow tasks can access it before async execution begins
|
||||
try:
|
||||
from ..context_propagation import extract_otel_context
|
||||
from ..context_storage import store_workflow_context
|
||||
|
||||
captured_context = extract_otel_context()
|
||||
if captured_context.get("traceparent"):
|
||||
logger.debug(
|
||||
f"Captured traceparent: {captured_context.get('traceparent')}"
|
||||
)
|
||||
|
||||
store_workflow_context("__global_workflow_context__", captured_context)
|
||||
else:
|
||||
logger.warning(
|
||||
"No traceparent found in captured context during __call__"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to capture/store workflow context in __call__: {e}")
|
||||
|
||||
# Handle async vs sync execution
|
||||
if asyncio.iscoroutinefunction(wrapped):
|
||||
return self._handle_async_execution(
|
||||
wrapped, args, kwargs, attributes, workflow_name, agent_name
|
||||
)
|
||||
else:
|
||||
return self._handle_sync_execution(
|
||||
wrapped, args, kwargs, attributes, workflow_name, agent_name
|
||||
)
|
||||
|
||||
def _extract_workflow_name(self, args: Any, kwargs: Any) -> str:
|
||||
"""
|
||||
Extract workflow name from method arguments.
|
||||
|
||||
Args:
|
||||
args: Positional arguments
|
||||
kwargs: Keyword arguments
|
||||
|
||||
Returns:
|
||||
str: Workflow name
|
||||
"""
|
||||
if args and len(args) > 0:
|
||||
workflow = args[0]
|
||||
else:
|
||||
workflow = kwargs.get("workflow")
|
||||
|
||||
# Extract workflow name
|
||||
workflow_name = (
|
||||
|
@ -228,47 +281,42 @@ class WorkflowMonitorWrapper:
|
|||
else getattr(workflow, "__name__", "unknown_workflow")
|
||||
)
|
||||
|
||||
# Build span attributes
|
||||
attributes = self._build_monitor_attributes(instance, workflow_name, arguments)
|
||||
return workflow_name
|
||||
|
||||
# Debug logging to confirm wrapper is being called
|
||||
logger.debug(
|
||||
f"🔍 WorkflowMonitorWrapper creating AGENT span for workflow: {workflow_name}"
|
||||
)
|
||||
|
||||
# Handle async execution
|
||||
return self._handle_async_execution(
|
||||
wrapped, args, kwargs, attributes, workflow_name
|
||||
)
|
||||
|
||||
def _build_monitor_attributes(
|
||||
self, instance: Any, workflow_name: str, arguments: Dict[str, Any]
|
||||
def _build_workflow_attributes(
|
||||
self, workflow_name: str, agent_name: str, args: Any, kwargs: Any
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Build span attributes for DurableAgent workflow monitoring.
|
||||
Build span attributes for workflow execution.
|
||||
|
||||
Args:
|
||||
instance: WorkflowApp instance (DurableAgent)
|
||||
workflow_name: Name of the workflow being monitored
|
||||
arguments: Bound method arguments from the wrapped call
|
||||
workflow_name: Name of the workflow
|
||||
agent_name: Name of the agent
|
||||
args: Positional arguments
|
||||
kwargs: Keyword arguments
|
||||
|
||||
Returns:
|
||||
Dict[str, Any]: Span attributes for the AGENT span
|
||||
"""
|
||||
agent_name = getattr(instance, "name", instance.__class__.__name__)
|
||||
|
||||
# Build basic attributes
|
||||
attributes = {
|
||||
"openinference.span.kind": "AGENT", # DurableAgent workflow execution is the agent action
|
||||
INPUT_MIME_TYPE: "application/json",
|
||||
OUTPUT_MIME_TYPE: "application/json",
|
||||
"workflow.name": workflow_name,
|
||||
"workflow.operation": "run_and_monitor",
|
||||
"agent.execution_mode": "workflow_based",
|
||||
"agent.name": agent_name,
|
||||
"agent.type": instance.__class__.__name__,
|
||||
OUTPUT_MIME_TYPE: "application/json",
|
||||
}
|
||||
|
||||
# Serialize input arguments
|
||||
attributes[INPUT_VALUE] = safe_json_dumps(arguments)
|
||||
# Add input payload if available
|
||||
if args and len(args) > 1:
|
||||
# Second argument is typically the input
|
||||
input_data = args[1]
|
||||
if input_data is not None:
|
||||
attributes[INPUT_VALUE] = safe_json_dumps(input_data)
|
||||
attributes[INPUT_MIME_TYPE] = "application/json"
|
||||
elif "input" in kwargs and kwargs["input"] is not None:
|
||||
attributes[INPUT_VALUE] = safe_json_dumps(kwargs["input"])
|
||||
attributes[INPUT_MIME_TYPE] = "application/json"
|
||||
|
||||
# Add context attributes
|
||||
attributes.update(get_attributes_from_context())
|
||||
|
@ -282,6 +330,7 @@ class WorkflowMonitorWrapper:
|
|||
kwargs: Any,
|
||||
attributes: Dict[str, Any],
|
||||
workflow_name: str,
|
||||
agent_name: str,
|
||||
) -> Any:
|
||||
"""
|
||||
Handle async workflow monitoring execution with context propagation.
|
||||
|
@ -301,42 +350,12 @@ class WorkflowMonitorWrapper:
|
|||
span_name = f"Agent.{workflow_name}"
|
||||
|
||||
# Debug logging to confirm span creation
|
||||
logger.debug(f"✅ Creating AGENT span: {span_name}")
|
||||
logger.debug(f"Creating AGENT span: {span_name}")
|
||||
|
||||
with self._tracer.start_as_current_span(
|
||||
span_name, attributes=attributes
|
||||
) as span:
|
||||
try:
|
||||
# Debug logging to confirm span context
|
||||
logger.debug(f"📋 AGENT span context: {span.get_span_context()}")
|
||||
|
||||
# CRITICAL: Capture and store OpenTelemetry context BEFORE executing workflow
|
||||
# This must happen before wrapped() is called so tasks can access it
|
||||
try:
|
||||
from ..context_propagation import extract_otel_context
|
||||
from ..context_storage import store_workflow_context
|
||||
|
||||
captured_context = extract_otel_context()
|
||||
if captured_context.get("traceparent"):
|
||||
logger.debug("🔗 Captured workflow context in AGENT span")
|
||||
logger.debug(
|
||||
f"🔗 Traceparent: {captured_context.get('traceparent')}"
|
||||
)
|
||||
|
||||
# Store context IMMEDIATELY for workflow tasks to use
|
||||
store_workflow_context(
|
||||
"__global_workflow_context__", captured_context
|
||||
)
|
||||
logger.debug(
|
||||
"🔗 Stored global workflow context for task correlation"
|
||||
)
|
||||
else:
|
||||
logger.warning("⚠️ No traceparent found in captured context")
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"⚠️ Failed to capture/store workflow context: {e}"
|
||||
)
|
||||
|
||||
# Execute workflow and get result
|
||||
result = await wrapped(*args, **kwargs)
|
||||
|
||||
|
@ -349,17 +368,69 @@ class WorkflowMonitorWrapper:
|
|||
span.set_attribute(OUTPUT_VALUE, safe_json_dumps(result))
|
||||
|
||||
span.set_status(Status(StatusCode.OK))
|
||||
logger.debug(f"🎯 AGENT span completed successfully: {span_name}")
|
||||
logger.debug(f"AGENT span completed successfully: {span_name}")
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
span.set_status(Status(StatusCode.ERROR, str(e)))
|
||||
span.record_exception(e)
|
||||
logger.error(f"❌ AGENT span failed: {span_name} - {e}")
|
||||
logger.error(f"AGENT span failed: {span_name} - {e}")
|
||||
raise
|
||||
|
||||
return async_wrapper()
|
||||
|
||||
def _handle_sync_execution(
|
||||
self,
|
||||
wrapped: Any,
|
||||
args: Any,
|
||||
kwargs: Any,
|
||||
attributes: Dict[str, Any],
|
||||
workflow_name: str,
|
||||
agent_name: str,
|
||||
) -> Any:
|
||||
"""
|
||||
Handle synchronous workflow monitoring execution with context propagation.
|
||||
|
||||
Args:
|
||||
wrapped: Original synchronous method to execute
|
||||
args: Positional arguments for the wrapped method
|
||||
kwargs: Keyword arguments for the wrapped method
|
||||
attributes: Pre-built span attributes
|
||||
workflow_name: Name of the workflow for span naming
|
||||
|
||||
Returns:
|
||||
Any: Result from wrapped method execution
|
||||
"""
|
||||
span_name = f"Agent.{workflow_name}"
|
||||
|
||||
# Debug logging to confirm span creation
|
||||
logger.debug(f"Creating AGENT span: {span_name}")
|
||||
|
||||
with self._tracer.start_as_current_span(
|
||||
span_name, attributes=attributes
|
||||
) as span:
|
||||
try:
|
||||
# Execute workflow and get result
|
||||
result = wrapped(*args, **kwargs)
|
||||
|
||||
# Set output attributes - handle both string and object results consistently
|
||||
if isinstance(result, str):
|
||||
# If result is already a JSON string, use it directly
|
||||
span.set_attribute(OUTPUT_VALUE, result)
|
||||
else:
|
||||
# If result is an object, serialize it to match input format
|
||||
span.set_attribute(OUTPUT_VALUE, safe_json_dumps(result))
|
||||
|
||||
span.set_status(Status(StatusCode.OK))
|
||||
logger.debug(f"AGENT span completed successfully: {span_name}")
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
span.set_status(Status(StatusCode.ERROR, str(e)))
|
||||
span.record_exception(e)
|
||||
logger.error(f"AGENT span failed: {span_name} - {e}")
|
||||
raise
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Exported Classes
|
||||
|
|
|
@ -15,6 +15,7 @@ from dapr.ext.workflow import (
|
|||
from dapr.ext.workflow.workflow_state import WorkflowState
|
||||
from durabletask import task as dtask
|
||||
from pydantic import BaseModel, ConfigDict, Field
|
||||
from typing import ClassVar
|
||||
|
||||
from dapr_agents.agents.base import ChatClientBase
|
||||
from dapr_agents.types.workflow import DaprWorkflowStatus
|
||||
|
@ -31,6 +32,11 @@ class WorkflowApp(BaseModel):
|
|||
A Pydantic-based class to encapsulate a Dapr Workflow runtime and manage workflows and tasks.
|
||||
"""
|
||||
|
||||
# Class-level constant for instrumentation callback
|
||||
# This is used to register the workflow instrumentation callback with the instrumentor
|
||||
# This is needed because the instrumentor is not available immediately when the WorkflowApp is initialized
|
||||
INSTRUMENTATION_CALLBACK_ATTR: ClassVar[str] = "_workflow_instrumentation_callback"
|
||||
|
||||
llm: Optional[ChatClientBase] = Field(
|
||||
default=None,
|
||||
description="The default LLM client for tasks that explicitly require an LLM but don't specify one (optional).",
|
||||
|
@ -436,6 +442,10 @@ class WorkflowApp(BaseModel):
|
|||
logger.info("Starting workflow runtime.")
|
||||
self.wf_runtime.start()
|
||||
self.wf_runtime_is_running = True
|
||||
|
||||
# Apply workflow instrumentation if callback was registered
|
||||
if hasattr(self.__class__, self.INSTRUMENTATION_CALLBACK_ATTR):
|
||||
getattr(self.__class__, self.INSTRUMENTATION_CALLBACK_ATTR)()
|
||||
else:
|
||||
logger.debug("Workflow runtime already running; skipping.")
|
||||
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
dapr-agents>=0.8.1
|
||||
dapr-agents>=0.8.3
|
||||
opentelemetry-sdk>=1.25.0
|
||||
opentelemetry-exporter-zipkin-json==1.25.0
|
||||
opentelemetry-exporter-otlp>=1.25.0
|
||||
|
|
|
@ -1,3 +1,3 @@
|
|||
dapr-agents>=0.8.1
|
||||
dapr-agents>=0.8.3
|
||||
# For local development use local changes by commenting out the line above and uncommenting the line below:
|
||||
# -e ../../
|
|
@ -1,3 +1,3 @@
|
|||
dapr-agents>=0.8.1
|
||||
dapr-agents>=0.8.3
|
||||
# For local development use local changes by commenting out the dapr-agents line above and uncommenting the line below:
|
||||
# -e ../../
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
dapr-agents>=0.8.1
|
||||
dapr-agents>=0.8.3
|
||||
elevenlabs
|
||||
# For local development use local changes by commenting out the dapr-agentsline above and uncommenting the line below:
|
||||
# -e ../../
|
||||
|
|
|
@ -1,3 +1,3 @@
|
|||
dapr-agents>=0.8.1
|
||||
dapr-agents>=0.8.3
|
||||
# For local development use local changes by commenting out the dapr-agentsline above and uncommenting the line below:
|
||||
# -e ../../
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
dapr-agents>=0.8.1
|
||||
dapr-agents>=0.8.3
|
||||
tiktoken==0.9.0
|
||||
# For local development use local changes by commenting out the dapr-agents line above and uncommenting the line below:
|
||||
# -e ../../
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
dapr-agents>=0.8.1
|
||||
dapr-agents>=0.8.3
|
||||
tiktoken==0.9.0
|
||||
# For local development use local changes by commenting out the dapr-agents line above and uncommenting the line below:
|
||||
# -e ../../
|
||||
|
|
|
@ -325,6 +325,8 @@ if __name__ == "__main__":
|
|||
asyncio.run(main())
|
||||
```
|
||||
|
||||
Alternatively, you can use the DurableAgent with this same setup using `weather_durable_agent_tracing.py`.
|
||||
|
||||
#### Run with Observability
|
||||
|
||||
1. Ensure Phoenix server is running (see setup steps above)
|
||||
|
@ -335,6 +337,11 @@ if __name__ == "__main__":
|
|||
python weather_agent_tracing.py
|
||||
```
|
||||
|
||||
Alternatively, you can run the DurableAgent using:
|
||||
```bash
|
||||
dapr run --app-id weatheragent --resources-path ./components -- python weather_durable_agent_tracing.py
|
||||
```
|
||||
|
||||
3. View traces in Phoenix UI at [http://localhost:6006](http://localhost:6006)
|
||||
|
||||

|
||||
|
|
|
@ -0,0 +1,16 @@
|
|||
apiVersion: dapr.io/v1alpha1
|
||||
kind: Component
|
||||
metadata:
|
||||
name: agentstatestore
|
||||
spec:
|
||||
type: state.redis
|
||||
version: v1
|
||||
metadata:
|
||||
- name: redisHost
|
||||
value: localhost:6379
|
||||
- name: redisPassword
|
||||
value: ""
|
||||
- name: enableTLS
|
||||
value: "false"
|
||||
- name: keyPrefix
|
||||
value: none
|
|
@ -0,0 +1,16 @@
|
|||
apiVersion: dapr.io/v1alpha1
|
||||
kind: Component
|
||||
metadata:
|
||||
name: messagepubsub
|
||||
spec:
|
||||
type: pubsub.redis
|
||||
version: v1
|
||||
metadata:
|
||||
- name: redisHost
|
||||
value: localhost:6379
|
||||
- name: redisPassword
|
||||
value: ""
|
||||
- name: consumerID
|
||||
value: "weather-agent-group"
|
||||
- name: enableTLS
|
||||
value: "false"
|
|
@ -0,0 +1,16 @@
|
|||
apiVersion: dapr.io/v1alpha1
|
||||
kind: Component
|
||||
metadata:
|
||||
name: workflowstatestore
|
||||
spec:
|
||||
type: state.redis
|
||||
version: v1
|
||||
metadata:
|
||||
- name: redisHost
|
||||
value: localhost:6379
|
||||
- name: redisPassword
|
||||
value: ""
|
||||
- name: actorStateStore
|
||||
value: "true"
|
||||
- name: keyPrefix
|
||||
value: none
|
|
@ -1,4 +1,4 @@
|
|||
dapr-agents[observability]>=0.8.1
|
||||
dapr-agents[observability]>=0.8.3
|
||||
arize-phoenix>=11.22.0,<12.0.0
|
||||
arize-phoenix-otel>=0.12.0,<0.13.0
|
||||
# For local development use local changes by commenting out the dapr-agents line above and uncommenting the line below:
|
||||
|
|
|
@ -16,7 +16,6 @@ AIAgent = Agent(
|
|||
"At the end, provide a concise summary that combines the weather information for all requested locations and any other actions you performed.",
|
||||
],
|
||||
memory=ConversationDaprStateMemory(store_name="historystore", session_id="some-id"),
|
||||
pattern="toolcalling",
|
||||
tools=tools,
|
||||
)
|
||||
|
||||
|
|
|
@ -0,0 +1,47 @@
|
|||
import asyncio
|
||||
from weather_tools import tools
|
||||
from dapr_agents import DurableAgent
|
||||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv()
|
||||
|
||||
|
||||
# Wrap your async call
|
||||
async def main():
|
||||
from phoenix.otel import register
|
||||
from dapr_agents.observability import DaprAgentsInstrumentor
|
||||
|
||||
# Register Dapr Agents with Phoenix OpenTelemetry
|
||||
tracer_provider = register(
|
||||
project_name="dapr-weather-durable-agent",
|
||||
protocol="http/protobuf",
|
||||
)
|
||||
|
||||
# Initialize Dapr Agents OpenTelemetry instrumentor
|
||||
try:
|
||||
instrumentor = DaprAgentsInstrumentor()
|
||||
instrumentor.instrument(tracer_provider=tracer_provider, skip_dep_check=True)
|
||||
except Exception as e:
|
||||
raise
|
||||
|
||||
AIAgent = DurableAgent(
|
||||
name="Steviee",
|
||||
role="Weather Assistant",
|
||||
goal="Assist Humans with weather related tasks.",
|
||||
instructions=[
|
||||
"Always answer the user's main weather question directly and clearly.",
|
||||
"If you perform any additional actions (like jumping), summarize those actions and their results.",
|
||||
"At the end, provide a concise summary that combines the weather information for all requested locations and any other actions you performed.",
|
||||
],
|
||||
tools=tools,
|
||||
message_bus_name="messagepubsub",
|
||||
state_store_name="workflowstatestore",
|
||||
agents_registry_store_name="agentstatestore",
|
||||
history_store_name="historystore",
|
||||
)
|
||||
|
||||
await AIAgent.run("What is the weather in Virginia, New York and Washington DC?")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
|
@ -1,4 +1,4 @@
|
|||
dapr-agents>=0.8.1
|
||||
dapr-agents>=0.8.3
|
||||
arize-phoenix-otel>=0.12.0,<0.13.0
|
||||
# For local development use local changes by commenting out the dapr-agents line above and uncommenting the line below:
|
||||
# -e ../../
|
||||
|
|
|
@ -1,3 +1,3 @@
|
|||
dapr-agents>=0.8.1
|
||||
dapr-agents>=0.8.3
|
||||
# For local development use local changes by commenting out the dapr-agents line above and uncommenting the line below:
|
||||
# -e ../../
|
||||
|
|
|
@ -1,3 +1,3 @@
|
|||
dapr-agents>=0.8.1
|
||||
dapr-agents>=0.8.3
|
||||
# For local development use local changes by commenting out the dapr-agents line above and uncommenting the line below:
|
||||
# -e ../../
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
dapr-agents>=0.8.1
|
||||
dapr-agents>=0.8.3
|
||||
chainlit==2.6.8
|
||||
unstructured[all-docs]==0.18.11
|
||||
# For local development use local changes by commenting out the dapr-agents line above and uncommenting the line below:
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
dapr-agents>=0.8.1
|
||||
dapr-agents>=0.8.3
|
||||
starlette==0.47.2
|
||||
# For local development use local changes by commenting out the dapr-agents line above and uncommenting the line below:
|
||||
# -e ../../
|
||||
|
|
|
@ -1,3 +1,3 @@
|
|||
dapr-agents>=0.8.1
|
||||
dapr-agents>=0.8.3
|
||||
# For local development use local changes by commenting out the dapr-agents line above and uncommenting the line below:
|
||||
# -e ../../
|
||||
|
|
|
@ -1,3 +1,3 @@
|
|||
dapr-agents>=0.8.1
|
||||
dapr-agents>=0.8.3
|
||||
# For local development use local changes by commenting out the dapr-agents line above and uncommenting the line below:
|
||||
# -e ../../
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
dapr-agents>=0.8.1
|
||||
dapr-agents>=0.8.3
|
||||
chainlit==2.6.8
|
||||
psycopg[binary]==3.2.9
|
||||
# For local development use local changes by commenting out the dapr-agents line above and uncommenting the line below:
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
dapr-agents>=0.8.1
|
||||
dapr-agents>=0.8.3
|
||||
arize-phoenix==11.22.0
|
||||
# For local development use local changes by commenting out the dapr-agents line above and uncommenting the line below:
|
||||
# -e ../../
|
||||
|
|
|
@ -214,14 +214,14 @@ class TestAgent:
|
|||
await agent_with_tools.execute_tools([tool_call])
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_process_iterations_max_reached(self, basic_agent):
|
||||
async def test_conversation_max_reached(self, basic_agent):
|
||||
"""Test that agent stops immediately when there are no tool calls."""
|
||||
mock_response = Mock(spec=LLMChatResponse)
|
||||
assistant_msg = AssistantMessage(content="Using tool", tool_calls=[])
|
||||
mock_response.get_message.return_value = assistant_msg
|
||||
basic_agent.llm.generate.return_value = mock_response
|
||||
|
||||
result = await basic_agent.process_iterations([])
|
||||
result = await basic_agent.conversation([])
|
||||
|
||||
# current logic sees no tools ===> returns on first iteration
|
||||
assert isinstance(result, AssistantMessage)
|
||||
|
@ -229,14 +229,14 @@ class TestAgent:
|
|||
assert basic_agent.llm.generate.call_count == 1
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_process_iterations_with_llm_error(self, basic_agent):
|
||||
async def test_conversation_with_llm_error(self, basic_agent):
|
||||
"""Test handling of LLM errors during iterations."""
|
||||
basic_agent.llm.generate.side_effect = Exception("LLM error")
|
||||
|
||||
with pytest.raises(
|
||||
AgentError, match="Failed during chat generation: LLM error"
|
||||
):
|
||||
await basic_agent.process_iterations([])
|
||||
await basic_agent.conversation([])
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_run_tool_success(self, agent_with_tools):
|
||||
|
|
Loading…
Reference in New Issue