use dapr client in python sdk for LLM

Signed-off-by: Filinto Duran <1373693+filintod@users.noreply.github.com>
This commit is contained in:
Filinto Duran 2025-09-08 15:11:45 -05:00
parent 249ea5ec43
commit aa613f87ca
No known key found for this signature in database
GPG Key ID: FCA5E10CA2CB458A
16 changed files with 542 additions and 59 deletions

View File

@ -14,7 +14,8 @@ from typing import (
Union,
)
from dapr.clients.grpc._request import ConversationInput
# Alpha2 uses message variants; keep alpha1 import removed
from dapr.clients.grpc import conversation as dapr_conversation
from pydantic import BaseModel, Field
from dapr_agents.llm.chat import ChatClientBase
@ -86,17 +87,22 @@ class DaprChatClient(DaprInferenceClientBase, ChatClientBase):
def translate_response(self, response: dict, model: str) -> dict:
"""
Convert Dapr response into OpenAI-style ChatCompletion dict.
Convert Dapr Alpha2 response into OpenAI-style ChatCompletion dict.
"""
choices = [
{
"finish_reason": "stop",
"index": idx,
"message": {"role": "assistant", "content": out["result"]},
"logprobs": None,
}
for idx, out in enumerate(response.get("outputs", []))
]
choices = []
outputs = response.get("outputs", []) or []
for out in outputs:
for ch in out.get("choices", []) or []:
msg = ch.get("message", {})
finish = ch.get("finish_reason", "stop")
choices.append(
{
"finish_reason": finish,
"index": len(choices),
"message": msg,
"logprobs": None,
}
)
return {
"choices": choices,
"created": int(time.time()),
@ -105,20 +111,64 @@ class DaprChatClient(DaprInferenceClientBase, ChatClientBase):
"usage": {"total_tokens": "-1"},
}
def convert_to_conversation_inputs(
self, inputs: List[Dict[str, Any]]
) -> List[ConversationInput]:
def _to_alpha2_messages(self, inputs: List[Dict[str, Any]]) -> List[dapr_conversation.ConversationInputAlpha2]:
"""
Map normalized messages into Dapr ConversationInput objects.
Convert normalized messages into a single Alpha2 conversation input containing
a sequence of message variants (system/user/assistant/tool).
"""
return [
ConversationInput(
content=item["content"],
role=item.get("role"),
scrub_pii=bool(item.get("scrubPII")),
)
for item in inputs
]
alpha2_messages: List[Any] = []
for item in inputs:
role = item.get("role")
content = item.get("content")
if role == "system":
alpha2_messages.append(dapr_conversation.create_system_message(content))
elif role == "user":
alpha2_messages.append(dapr_conversation.create_user_message(content))
elif role == "assistant":
# Support assistant with tool_calls when present (Alpha2 requires this for tool responses)
tool_calls_src = item.get("tool_calls") or []
if tool_calls_src:
tool_calls = []
for tc in tool_calls_src:
fn = (tc or {}).get("function", {})
tool_calls.append(
dapr_conversation.ConversationToolCalls(
id=(tc or {}).get("id", ""),
function=dapr_conversation.ConversationToolCallsOfFunction(
name=fn.get("name", ""),
arguments=fn.get("arguments", ""),
),
)
)
alpha2_messages.append(
dapr_conversation.ConversationMessage(
of_assistant=dapr_conversation.ConversationMessageOfAssistant(
content=[
dapr_conversation.ConversationMessageContent(text=content)
]
if content
else [],
tool_calls=tool_calls,
)
)
)
else:
alpha2_messages.append(dapr_conversation.create_assistant_message(content))
elif role == "tool":
tool_call_id = item.get("tool_call_id") or ""
tool_name = item.get("name") or "tool"
alpha2_messages.append(
dapr_conversation.create_tool_message(
tool_id=tool_call_id,
name=tool_name,
content=content,
)
)
else:
# default to user if unknown
alpha2_messages.append(dapr_conversation.create_user_message(content))
return [dapr_conversation.ConversationInputAlpha2(messages=alpha2_messages)]
def generate(
self,
@ -205,23 +255,25 @@ class DaprChatClient(DaprInferenceClientBase, ChatClientBase):
structured_mode=structured_mode,
)
# 6) Convert to Dapr inputs & call
conv_inputs = self.convert_to_conversation_inputs(params["inputs"])
# 6) Build Alpha2 inputs and call the Alpha2 API with tools
alpha2_inputs = self._to_alpha2_messages(params["inputs"])
try:
logger.info("Invoking the Dapr Conversation API.")
raw = self.client.chat_completion(
logger.info("Invoking the Dapr Conversation API (Alpha2).")
raw = self.client.chat_completion_alpha2(
llm=llm_component or self._llm_component,
conversation_inputs=conv_inputs,
inputs=alpha2_inputs,
scrub_pii=scrubPII,
temperature=temperature,
tools=params.get("tools"),
tool_choice=params.get("tool_choice"),
)
normalized = self.translate_response(
raw, llm_component or self._llm_component
)
logger.info("Chat completion retrieved successfully.")
logger.info("Chat completion (Alpha2) retrieved successfully.")
except Exception as e:
logger.error(
f"An error occurred during the Dapr Conversation API call: {e}"
f"An error occurred during the Dapr Conversation Alpha2 API call: {e}"
)
raise

View File

@ -1,11 +1,11 @@
from dapr_agents.types.llm import DaprInferenceClientConfig
from dapr_agents.llm.base import LLMClientBase
from dapr.clients import DaprClient
from dapr.clients.grpc._request import ConversationInput
from dapr.clients.grpc._response import ConversationResponse
from typing import Dict, Any, List
from dapr.clients.grpc import conversation as dapr_conversation
from typing import Dict, Any, List, Optional
from pydantic import model_validator
import json
import logging
logger = logging.getLogger(__name__)
@ -15,34 +15,114 @@ class DaprInferenceClient:
def __init__(self):
self.dapr_client = DaprClient()
def translate_to_json(self, response: ConversationResponse) -> dict:
response_dict = {
"outputs": [
{
"result": output.result,
}
for output in response.outputs
]
}
# ──────────────────────────────────────────────────────────────────────────
# Alpha2 (Tool Calling) support
# ──────────────────────────────────────────────────────────────────────────
def _alpha2_tools_from_openai_like(
self, tools: Optional[List[Dict[str, Any]]]
) -> Optional[List[dapr_conversation.ConversationTools]]:
"""
Convert OpenAI-style tools (type=function, function={name, description, parameters})
into Dapr ConversationTools objects for Alpha2.
"""
if not tools:
return None
converted: List[dapr_conversation.ConversationTools] = []
for tool in tools:
fn = tool.get("function", {}) if isinstance(tool, dict) else {}
name = fn.get("name")
description = fn.get("description")
parameters = fn.get("parameters")
function_spec = dapr_conversation.ConversationToolsFunction(
name=name or "",
description=description or "",
parameters=parameters or {},
)
conv_tool = dapr_conversation.ConversationTools(function=function_spec)
converted.append(conv_tool)
return converted
return response_dict
def chat_completion(
def chat_completion_alpha2(
self,
*,
llm: str,
conversation_inputs: List[ConversationInput],
scrub_pii: bool | None = None,
temperature: float | None = None,
) -> Any:
response = self.dapr_client.converse_alpha1(
inputs: List[dapr_conversation.ConversationInputAlpha2],
scrub_pii: Optional[bool] = None,
temperature: Optional[float] = None,
tools: Optional[List[Dict[str, Any]]] = None,
tool_choice: Optional[str] = None,
context_id: Optional[str] = None,
parameters: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
"""
Invoke Dapr Conversation API Alpha2 with optional tool-calling support and
convert the response into a simplified OpenAI-like JSON envelope.
"""
conv_tools = self._alpha2_tools_from_openai_like(tools)
if not temperature:
temperature = 1
response_alpha2 = self.dapr_client.converse_alpha2(
name=llm,
inputs=conversation_inputs,
inputs=inputs,
context_id=context_id,
parameters=parameters,
scrub_pii=scrub_pii,
temperature=temperature,
tools=conv_tools,
tool_choice=tool_choice,
)
output = self.translate_to_json(response)
return output
outputs: List[Dict[str, Any]] = []
for output in getattr(response_alpha2, "outputs", []) or []:
choices_list: List[Dict[str, Any]] = []
for choice in getattr(output, "choices", []) or []:
msg = getattr(choice, "message", None)
content = getattr(msg, "content", None) if msg else None
# Convert tool calls if present
tool_calls_json: Optional[List[Dict[str, Any]]] = None
if msg and getattr(msg, "tool_calls", None):
tool_calls_json = []
for tc in msg.tool_calls:
fn = getattr(tc, "function", None)
arguments = getattr(fn, "arguments", None) if fn else None
if isinstance(arguments, (dict, list)):
try:
arguments = json.dumps(arguments)
except Exception:
arguments = str(arguments)
elif arguments is None:
arguments = ""
tool_calls_json.append(
{
"id": getattr(tc, "id", ""),
"type": "function",
"function": {
"name": getattr(fn, "name", "") if fn else "",
"arguments": arguments,
},
}
)
choices_list.append(
{
"message": {
"role": "assistant",
"content": content,
**({"tool_calls": tool_calls_json} if tool_calls_json else {}),
},
"finish_reason": "stop",
}
)
outputs.append({"choices": choices_list})
return {
"context_id": getattr(response_alpha2, "context_id", None),
"outputs": outputs,
}
class DaprInferenceClientBase(LLMClientBase):

View File

@ -1,11 +1,13 @@
import logging
import time
from typing import Any, Dict
from typing import Any, Dict, Optional, List
from dapr_agents.types.message import (
AssistantMessage,
LLMChatCandidate,
LLMChatResponse,
ToolCall,
FunctionCall,
)
logger = logging.getLogger(__name__)
@ -22,17 +24,41 @@ def process_dapr_chat_response(response: Dict[str, Any]) -> LLMChatResponse:
LLMChatResponse: Contains a list of candidates and metadata.
"""
# 1) Extract each choice → build AssistantMessage + LLMChatCandidate
candidates = []
candidates: List[LLMChatCandidate] = []
for choice in response.get("choices", []):
msg = choice.get("message", {})
# Build tool_calls if present (OpenAI-like)
tool_calls: Optional[List[ToolCall]] = None
if msg.get("tool_calls"):
tool_calls = []
for tc in msg["tool_calls"]:
try:
tool_calls.append(
ToolCall(
id=tc.get("id", ""),
type=tc.get("type", "function"),
function=FunctionCall(
name=tc.get("function", {}).get("name", ""),
arguments=tc.get("function", {}).get("arguments", ""),
),
)
)
except Exception:
logger.exception(f"Invalid tool_call entry: {tc}")
function_call = None
if tool_calls:
function_call = tool_calls[0].function
assistant_message = AssistantMessage(
content=msg.get("content"),
# Dapr currently never returns refusals, tool_calls or function_call here
tool_calls=tool_calls,
function_call=function_call,
)
candidate = LLMChatCandidate(
message=assistant_message,
finish_reason=choice.get("finish_reason"),
# Dapr translate_response includes index & no logprobs
index=choice.get("index"),
logprobs=choice.get("logprobs"),
)

View File

@ -162,7 +162,7 @@ def to_function_call_definition(
fmt = format_type.lower()
# OpenAIstyle wrapper schema:
if fmt in ("openai", "nvidia", "huggingface"):
if fmt in ("openai", "nvidia", "huggingface", "dapr"):
return to_openai_function_call_definition(
name, description, args_schema, use_deprecated
)

View File

@ -59,8 +59,10 @@ class ToolHelper:
return validate_and_format_tool(tool, tool_format, use_deprecated)
if not isinstance(tool, AgentTool):
raise TypeError(f"Unsupported tool type: {type(tool).__name__}")
# Treat 'dapr' like OpenAI-style function tools; Dapr client will convert
fmt = tool_format if tool_format != "dapr" else "openai"
return tool.to_function_call(
format_type=tool_format, use_deprecated=use_deprecated
format_type=fmt, use_deprecated=use_deprecated
)
@staticmethod

View File

@ -0,0 +1,28 @@
# Durable Agent Multi-Tool with Dapr LLM (Alpha2)
This quickstart demonstrates a Durable Agent that may call multiple tools (weather, calculator, web search) using Dapr Conversation API Alpha2 with tool-calling.
## Setup
```bash
python -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
```
Set environment for your Dapr sidecar and LLM component:
```bash
export DAPR_HTTP_PORT=3500
export DAPR_GRPC_PORT=56178
export DAPR_LLM_COMPONENT_DEFAULT=openai # or google
```
Optionally set API keys for your chosen LLM component (e.g., OPENAI_API_KEY).
## Run
```bash
python multi_tool_agent_dapr.py
```
The agent will orchestrate tool calls to answer a multi-step query.

View File

@ -0,0 +1,51 @@
from dapr_agents import DurableAgent
from dapr_agents.llm.dapr import DaprChatClient
from dotenv import load_dotenv
from multi_tools import tools
import asyncio
import logging
import os
async def main():
load_dotenv()
logging.basicConfig(level=logging.INFO)
# Ensure default Dapr LLM component is set (e.g., "openai" or "google")
os.environ.setdefault("DAPR_LLM_COMPONENT_DEFAULT", "openai")
agent = DurableAgent(
role="Research And Weather Assistant",
name="Alex",
goal=(
"Help humans get weather and general information; when needed, use tools like"
" weather lookup, calculator, and web search to answer multi-part queries."
),
instructions=[
"Be concise and accurate.",
"Use the calculator for numeric expressions.",
"Use web search for general facts when asked.",
"Use the weather tool for location-based weather.",
],
message_bus_name="pubsub",#messagepubsub",
state_store_name="statestore",#workflowstatestore",
state_key="workflow_state",
agents_registry_store_name="statestore",#workflowstatestore",
agents_registry_key="agents_registry",
tools=tools,
llm=DaprChatClient(),
)
# An example prompt that can require multiple tool calls
prompt = (
"What's the current weather in Boston, MA, then compute (14*7)+23, and finally "
"search for the official tourism site for Boston?"
)
await agent.run(prompt)
if __name__ == "__main__":
asyncio.run(main())

View File

@ -0,0 +1,92 @@
from dapr_agents import tool
from pydantic import BaseModel, Field
from typing import List
class GetWeatherSchema(BaseModel):
location: str = Field(description="location to get weather for")
@tool(args_model=GetWeatherSchema)
def get_weather(location: str) -> str:
"""Get weather information based on location."""
import random
temperature = random.randint(60, 85)
return f"{location}: {temperature}F."
class CalculateSchema(BaseModel):
expression: str = Field(description="Arithmetic expression like '2+2' or '14*7+23'")
@tool(args_model=CalculateSchema)
def calculate(expression: str) -> str:
"""Evaluate a simple arithmetic expression safely."""
import math
import operator
import re
# Very basic evaluator supporting + - * / parentheses and integers
tokens = re.findall(r"\d+|[()+\-*/]", expression.replace(" ", ""))
if not tokens:
return "Invalid expression"
def precedence(op):
return {"+": 1, "-": 1, "*": 2, "/": 2}.get(op, 0)
def apply_op(a, b, op):
ops = {
"+": operator.add,
"-": operator.sub,
"*": operator.mul,
"/": operator.truediv,
}
return str(ops[op](float(a), float(b)))
values = []
ops = []
for tok in tokens:
if tok.isdigit():
values.append(tok)
elif tok == "(":
ops.append(tok)
elif tok == ")":
while ops and ops[-1] != "(":
b = values.pop()
a = values.pop()
op = ops.pop()
values.append(apply_op(a, b, op))
ops.pop()
else:
while ops and precedence(ops[-1]) >= precedence(tok):
b = values.pop()
a = values.pop()
op = ops.pop()
values.append(apply_op(a, b, op))
ops.append(tok)
while ops:
b = values.pop()
a = values.pop()
op = ops.pop()
values.append(apply_op(a, b, op))
return values[-1]
class SearchSchema(BaseModel):
query: str = Field(description="web search query")
limit: int = Field(default=3, ge=1, le=10, description="max results")
@tool(args_model=SearchSchema)
def web_search(query: str, limit: int = 3) -> List[str]:
"""Fake web search that returns example links for a query."""
base = "https://example.org/search?q="
return [f"{base}{query}&n={i+1}" for i in range(limit)]
tools = [get_weather, calculate, web_search]

View File

@ -0,0 +1,3 @@
dapr-agents
python-dotenv

View File

@ -0,0 +1,41 @@
# Durable Agent Tool Call with Dapr LLM (Alpha2)
This quickstart mirrors `03-durable-agent-tool-call/` but uses the Dapr Conversation API Alpha2 as the LLM provider with tool calling.
## Prerequisites
- Python 3.10+
- Dapr CLI installed and initialized
## Setup
```bash
python -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
```
Create a `.env` with any provider-specific keys your chosen Dapr LLM component requires (e.g., OpenAI):
```env
OPENAI_API_KEY=your_api_key_here
```
Set the default Dapr LLM component name (matches a `components/*.yaml`):
```bash
export DAPR_LLM_COMPONENT_DEFAULT=openai
```
## Run
```bash
dapr run --app-id durableweatherapp \
--resources-path ./components \
-- python durable_weather_agent_dapr.py
```
## Files
- `durable_weather_agent_dapr.py`: Durable agent using `llm_provider="dapr"`
- `weather_tools.py`: sample tools
- `components/`: Dapr components for LLM and state/pubsub
Notes:
- Alpha2 currently does not support streaming; this example is non-streaming.
- Tool calling is enabled via Alpha2 `converse_alpha2` under the hood.

View File

@ -0,0 +1,13 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: openai
spec:
type: llm.openai
version: v1
metadata:
- name: apiKey
secretKeyRef:
name: openai_api_key
key: OPENAI_API_KEY

View File

@ -0,0 +1,11 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: messagepubsub
spec:
type: pubsub.redis
version: v1
metadata:
- name: redisHost
value: localhost:6379

View File

@ -0,0 +1,11 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: workflowstatestore
spec:
type: state.redis
version: v1
metadata:
- name: redisHost
value: localhost:6379

View File

@ -0,0 +1,40 @@
from dapr_agents import DurableAgent
from dapr_agents.llm.dapr import DaprChatClient
from dotenv import load_dotenv
from weather_tools import tools
import asyncio
import logging
import os
async def main():
load_dotenv()
logging.basicConfig(level=logging.INFO)
# Ensure default Dapr LLM component is set (e.g., "openai")
os.environ.setdefault("DAPR_LLM_COMPONENT_DEFAULT", "openai")
agent = DurableAgent(
role="Weather Assistant",
name="Stevie",
goal="Help humans get weather and location info using smart tools.",
instructions=[
"Respond clearly and helpfully to weather-related questions.",
"Use tools when appropriate to fetch weather data.",
],
message_bus_name="pubsub",#"messagepubsub",
state_store_name="statestore",#"workflowstatestore",
state_key="workflow_state",
agents_registry_store_name="statestore",#"workflowstatestore",
agents_registry_key="agents_registry",
tools=tools,
llm=DaprChatClient(),
)
await agent.run("What's the weather in Boston?")
if __name__ == "__main__":
asyncio.run(main())

View File

@ -0,0 +1,3 @@
dapr-agents
python-dotenv

View File

@ -0,0 +1,30 @@
from dapr_agents import tool
from pydantic import BaseModel, Field
class GetWeatherSchema(BaseModel):
location: str = Field(description="location to get weather for")
@tool(args_model=GetWeatherSchema)
def get_weather(location: str) -> str:
"""Get weather information based on location."""
import random
temperature = random.randint(60, 80)
return f"{location}: {temperature}F."
class JumpSchema(BaseModel):
distance: str = Field(description="Distance for agent to jump")
@tool(args_model=JumpSchema)
def jump(distance: str) -> str:
"""Jump a specific distance."""
return f"I jumped the following distance {distance}"
tools = [get_weather, jump]