diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 09b767e..69e2e11 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -20,10 +20,10 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - - name: Set up Python 3.9 + - name: Set up Python 3.10 uses: actions/setup-python@v5 with: - python-version: 3.9 + python-version: "3.10" - name: Install dependencies run: | python -m pip install --upgrade pip diff --git a/dapr_agents/agent/telemetry/__init__.py b/dapr_agents/agent/telemetry/__init__.py new file mode 100644 index 0000000..d428d05 --- /dev/null +++ b/dapr_agents/agent/telemetry/__init__.py @@ -0,0 +1 @@ +from .otel import DaprAgentsOTel diff --git a/dapr_agents/agent/telemetry/otel.py b/dapr_agents/agent/telemetry/otel.py new file mode 100644 index 0000000..3b23efb --- /dev/null +++ b/dapr_agents/agent/telemetry/otel.py @@ -0,0 +1,144 @@ +from logging import Logger +from typing import Union + +from opentelemetry._logs import set_logger_provider +from opentelemetry.metrics import set_meter_provider +from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler +from opentelemetry.sdk._logs.export import BatchLogRecordProcessor +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader +from opentelemetry.sdk.resources import Resource, SERVICE_NAME +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.trace import set_tracer_provider +from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter +from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter +from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter + + +class DaprAgentsOTel: + """ + OpenTelemetry configuration for Dapr agents. + """ + + def __init__(self, service_name: str = "", otlp_endpoint: str = ""): + # Configure OpenTelemetry + self.service_name = service_name + self.otlp_endpoint = otlp_endpoint + + self.setup_resources() + + def setup_resources(self): + """ + Set up the resource for OpenTelemetry. + """ + + self._resource = Resource.create( + attributes={ + SERVICE_NAME: str(self.service_name), + } + ) + + def create_and_instrument_meter_provider( + self, + otlp_endpoint: str = "", + ) -> MeterProvider: + """ + Returns a `MeterProvider` that is configured to export metrics using the `PeriodicExportingMetricReader` + which means that metrics are exported periodically in the background. The interval can be set by + the environment variable `OTEL_METRIC_EXPORT_INTERVAL`. The default value is 60000ms (1 minute). + + Also sets the global OpenTelemetry meter provider to the returned meter provider. + """ + + # Ensure the endpoint is set correctly + endpoint = self._endpoint_validator( + endpoint=self.otlp_endpoint if otlp_endpoint == "" else otlp_endpoint, + telemetry_type="metrics", + ) + + metric_exporter = OTLPMetricExporter(endpoint=str(endpoint)) + metric_reader = PeriodicExportingMetricReader(metric_exporter) + meter_provider = MeterProvider( + resource=self._resource, metric_readers=[metric_reader] + ) + set_meter_provider(meter_provider) + return meter_provider + + def create_and_instrument_tracer_provider( + self, + otlp_endpoint: str = "", + ) -> TracerProvider: + """ + Returns a `TracerProvider` that is configured to export traces using the `BatchSpanProcessor` + which means that traces are exported in batches. The batch size can be set by + the environment variable `OTEL_TRACES_EXPORT_BATCH_SIZE`. The default value is 512. + Also sets the global OpenTelemetry tracer provider to the returned tracer provider. + """ + + # Ensure the endpoint is set correctly + endpoint = self._endpoint_validator( + endpoint=self.otlp_endpoint if otlp_endpoint == "" else otlp_endpoint, + telemetry_type="traces", + ) + + trace_exporter = OTLPSpanExporter(endpoint=str(endpoint)) + tracer_processor = BatchSpanProcessor(trace_exporter) + tracer_provider = TracerProvider(resource=self._resource) + tracer_provider.add_span_processor(tracer_processor) + set_tracer_provider(tracer_provider) + return tracer_provider + + def create_and_instrument_logging_provider( + self, + logger: Logger, + otlp_endpoint: str = "", + ) -> LoggerProvider: + """ + Returns a `LoggingProvider` that is configured to export logs using the `BatchLogProcessor` + which means that logs are exported in batches. The batch size can be set by + the environment variable `OTEL_LOGS_EXPORT_BATCH_SIZE`. The default value is 512. + Also sets the global OpenTelemetry logging provider to the returned logging provider. + """ + + # Ensure the endpoint is set correctly + endpoint = self._endpoint_validator( + endpoint=self.otlp_endpoint if otlp_endpoint == "" else otlp_endpoint, + telemetry_type="logs", + ) + + log_exporter = OTLPLogExporter(endpoint=str(endpoint)) + logging_provider = LoggerProvider(resource=self._resource) + logging_provider.add_log_record_processor(BatchLogRecordProcessor(log_exporter)) + set_logger_provider(logging_provider) + + handler = LoggingHandler(logger_provider=logging_provider) + logger.addHandler(handler) + return logging_provider + + def _endpoint_validator( + self, + endpoint: str, + telemetry_type: str, + ) -> Union[str | Exception]: + """ + Validates the endpoint and method. + """ + + if endpoint == "": + raise ValueError( + "OTLP endpoint must be set either in the environment variable OTEL_EXPORTER_OTLP_ENDPOINT or in the constructor." + ) + if endpoint.startswith("https://"): + raise NotImplementedError( + "OTLP over HTTPS is not supported. Please use HTTP." + ) + + endpoint = ( + endpoint + if endpoint.endswith(f"/v1/{telemetry_type}") + else f"{endpoint}/v1/{telemetry_type}" + ) + endpoint = endpoint if endpoint.startswith("http://") else f"http://{endpoint}" + + return endpoint diff --git a/dapr_agents/tool/http/__init__.py b/dapr_agents/tool/http/__init__.py new file mode 100644 index 0000000..9c9dabf --- /dev/null +++ b/dapr_agents/tool/http/__init__.py @@ -0,0 +1 @@ +from .client import DaprHTTPClient diff --git a/dapr_agents/tool/http/client.py b/dapr_agents/tool/http/client.py new file mode 100644 index 0000000..3590369 --- /dev/null +++ b/dapr_agents/tool/http/client.py @@ -0,0 +1,197 @@ +import os +from typing import Dict, Optional, Any, Union +from distutils.util import strtobool +import logging +import requests + +from pydantic import BaseModel, Field, PrivateAttr +from dapr_agents.types import ToolError +from dapr_agents import tool +from urllib.parse import urlparse +from opentelemetry.instrumentation.requests import RequestsInstrumentor +from opentelemetry import trace +from opentelemetry._logs import set_logger_provider + + +logger = logging.getLogger(__name__) + + +class DaprHTTPClient(BaseModel): + """ + Client for sending HTTP requests to Dapr endpoints. + """ + + dapr_app_id: Optional[str] = Field( + default="", description="Optional name of the Dapr App ID to invoke." + ) + + dapr_http_endpoint: Optional[str] = Field( + default="", + description="Optional name of the HTTPEndpoint to call for invocation", + ) + + http_endpoint: Optional[str] = Field( + default="", description="Optional FQDN URL to request to." + ) + + path: Optional[str] = Field( + default="", description="Optional name of the path to invoke." + ) + + headers: Optional[Dict[str, str]] = Field( + default={}, + description="Default headers to include in all requests.", + ) + + # Private attributes not exposed in model schema + _base_url: str = PrivateAttr(default="http://localhost:3500/v1.0/invoke") + + def model_post_init(self, __context: Any) -> None: + """Initialize the client after the model is created.""" + + try: + otel_enabled: bool = bool( + strtobool(os.getenv("DAPR_AGENTS_OTEL_ENABLED", "True")) + ) + except ValueError: + otel_enabled = False + + if otel_enabled: + from dapr_agents.agent.telemetry.otel import DaprAgentsOTel # type: ignore[import-not-found] + + otel_client = DaprAgentsOTel( + service_name=os.getenv("OTEL_SERVICE_NAME", "dapr-http-client"), + otlp_endpoint=os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", ""), + ) + tracer = otel_client.create_and_instrument_tracer_provider() + trace.set_tracer_provider(tracer) + + otel_logger = otel_client.create_and_instrument_logging_provider( + logger=logger, + ) + set_logger_provider(otel_logger) + + RequestsInstrumentor().instrument() + + logger.debug("Initializing DaprHTTPClient client") + + super().model_post_init(__context) + + def do_http_request( + self, + payload: dict[str, str], + endpoint: str = "", + path: str = "", + verb: str = "GET", + ) -> Union[tuple[int, str] | ToolError]: + """ + Send a POST request to the specified endpoint with the given input. + + Args: + endpoint_url (str): The host of the URI to send the request to. + payload (dict[str, str]): The payload to include in the request. + path (str): The path of the URI to invoke including any query strings appended. + verb (str): The HTTP verb. Either GET or POST. + Returns: + A tuple with the http status code and respose or a ToolError. + """ + + try: + url = self._validate_endpoint_type( + endpoint=endpoint, path=self.path if path == "" else path + ) + except ToolError as e: + logger.error(f"Error validating endpoint: {e}") + raise e + + logger.debug( + f"[HTTP] Sending POST request to '{url}' with input '{payload}' and headers '{self.headers}" + ) + + match verb.upper(): + case "GET": + response = requests.get(url=str(url), headers=self.headers) + case "POST": + response = requests.post( + url=str(url), headers=self.headers, json=payload + ) + case _: + raise ValueError( + f"Value for 'verb' not in expected format ['GET'|'POST']: {verb}" + ) + + logger.debug( + f"Request returned status code '{response.status_code}' and '{response.text}'" + ) + + if not response.ok: + raise ToolError( + f"Error occured sending the request. Received '{response.status_code}' - '{response.text}'" + ) + + return (response.status_code, response.text) + + def _validate_endpoint_type( + self, endpoint: str, path: Optional[str | None] + ) -> Union[str | ToolError]: + if path == "": + raise ToolError("No path provided. Please provide a valid path.") + + if isinstance(path, str) and path.startswith("/"): + # Remove leading slash + path = path[1:] + + try: + if self.dapr_app_id != "": + # Prefered option + if isinstance(self.dapr_app_id, str) and self.dapr_app_id.endswith("/"): + # Remove trailing slash + self.dapr_app_id = self.dapr_app_id[:-1] + url = f"{self._base_url}/{self.dapr_app_id}/method/{self.path if path == '' else path}" + elif self.dapr_http_endpoint != "": + # Dapr HTTPEndpoint + if isinstance( + self.dapr_http_endpoint, str + ) and self.dapr_http_endpoint.endswith("/"): + # Remove trailing slash + self.dapr_http_endpoint = self.dapr_http_endpoint[:-1] + url = f"{self._base_url}/{self.dapr_http_endpoint}/method/{self.path if path == '' else path}" + elif self.http_endpoint != "": + # FQDN URL + if isinstance(self.http_endpoint, str) and self.http_endpoint.endswith( + "/" + ): + # Remove trailing slash + self.http_endpoint = self.http_endpoint[:-1] + url = f"{self._base_url}/{self.http_endpoint}/method/{self.path if path == '' else path}" + elif endpoint != "": + # Fallback to default + if isinstance(endpoint, str) and endpoint.endswith("/"): + # Remove trailing slash + endpoint = endpoint[:-1] + url = f"{self._base_url}/{endpoint}/method/{self.path if path == '' else path}" + else: + raise ToolError( + "No endpoint provided. Please provide a valid dapr-app-id, HTTPEndpoint or endpoint." + ) + except Exception as e: + logger.error(f"Error validating endpoint: {e}") + raise ToolError( + "Error occured while validating the endpoint. Please check the provided values." + ) + + if not self._validate_url(url): + raise ToolError(f"'{url}' is not a valid URL.") + + return url + + def _validate_url(self, url) -> bool: + """ + Valides URL for HTTP requests + """ + logger.debug(f"[HTTP] Url to be validated: {url}") + try: + parsed_url = urlparse(url=url) + return all([parsed_url.scheme, parsed_url.netloc]) + except AttributeError: + return False diff --git a/dev-requirements.txt b/dev-requirements.txt index 3b24574..e68bb55 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -14,4 +14,7 @@ pyyaml==6.0.2 rich==13.9.4 huggingface_hub==0.27.1 numpy==2.2.2 -mypy==1.15.0 \ No newline at end of file +mypy==1.15.0 +opentelemetry-distro==0.53b1 +opentelemetry-exporter-otlp==1.32.1 +opentelemetry-instrumentation-requests==0.53b1 \ No newline at end of file diff --git a/mypy.ini b/mypy.ini index 4e4aa72..39a65cf 100644 --- a/mypy.ini +++ b/mypy.ini @@ -16,6 +16,9 @@ exclude = [mypy-dapr_agents.agent.*] ignore_errors = True +[mypy-dapr_agents.agent.telemetry.*] +ignore_errors = False + [mypy-dapr_agents.document.*] ignore_errors = True @@ -37,9 +40,21 @@ ignore_errors = True [mypy-dapr_agents.storage.*] ignore_errors = True -[mypy-dapr_agents.tool.*] +[mypy-dapr_agents.tool.mcp.*] ignore_errors = True +[mypy-dapr_agents.tool.utils.*] +ignore_errors = True + +[mypy-dapr_agents.tool.openapi.*] +ignore_errors = True + +[mypy-dapr_agents.tool.base.*] +ignore_errors = True + +[mypy-dapr_agents.tool.http.*] +ignore_errors = False + [mypy-dapr_agents.types.*] ignore_errors = True diff --git a/pyproject.toml b/pyproject.toml index 7c3cd47..9340afd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -43,6 +43,9 @@ dependencies = [ "rich == 13.9.4", "huggingface_hub == 0.30.2", "numpy == 2.2.2", + "opentelemetry-distro == 0.53b1", + "opentelemetry-exporter-otlp == 1.32.1", + "opentelemetry-instrumentation-requests == 0.53b1", ] [project.urls] diff --git a/requirements.txt b/requirements.txt index 6ee1dc6..1c9dd13 100644 --- a/requirements.txt +++ b/requirements.txt @@ -14,4 +14,7 @@ pyyaml==6.0.2 rich==13.9.4 huggingface_hub==0.30.2 numpy==2.2.2 -mcp==1.6.0 \ No newline at end of file +mcp==1.6.0 +opentelemetry-distro==0.53b1 +opentelemetry-exporter-otlp==1.32.1 +opentelemetry-instrumentation-requests==0.53b1 \ No newline at end of file