From 68d80ac7f9cd6b41b93a62c61af88a3a774ff13e Mon Sep 17 00:00:00 2001 From: Casper Guldbech Nielsen Date: Mon, 5 May 2025 10:46:17 -0700 Subject: [PATCH] feat: add helper functions for extracting and restoring OTel context for context propagation Signed-off-by: Casper Guldbech Nielsen --- dapr_agents/agent/telemetry/otel.py | 38 +++++++++++++++++++++++++---- 1 file changed, 33 insertions(+), 5 deletions(-) diff --git a/dapr_agents/agent/telemetry/otel.py b/dapr_agents/agent/telemetry/otel.py index 89479e5..de1e739 100644 --- a/dapr_agents/agent/telemetry/otel.py +++ b/dapr_agents/agent/telemetry/otel.py @@ -1,5 +1,5 @@ from logging import Logger -from typing import Union +from typing import Any, Union import functools @@ -16,6 +16,7 @@ from opentelemetry.trace import set_tracer_provider from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter +from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator class DaprAgentsOTel: @@ -174,12 +175,39 @@ def async_span_decorator(name): @functools.wraps(func) async def wrapper(self, *args, **kwargs): tracer = getattr(self, "_tracer", None) - if tracer: - with tracer.start_as_current_span(name): - return await func(self, *args, **kwargs) - else: + if not tracer: + return await func(self, *args, **kwargs) + + # Extract context from kwargs if available + otel_context = kwargs.pop("otel_context", None) + context = None + + if otel_context: + # Restore parent context from carrier + from opentelemetry.trace.propagation.tracecontext import ( + TraceContextTextMapPropagator, + ) + + carrier = otel_context + context = TraceContextTextMapPropagator().extract(carrier=carrier) + + # Start span with parent context if available + with tracer.start_as_current_span(name, context=context): return await func(self, *args, **kwargs) return wrapper return decorator + + +def extract_otel_context(): + """Extract current OpenTelemetry context for cross-boundary propagation""" + carrier: dict[Any, Any] = {} + TraceContextTextMapPropagator().inject(carrier) + return carrier + + +def restore_otel_context(carrier): + """Restore OpenTelemetry context from carrier""" + context = TraceContextTextMapPropagator().extract(carrier) + return context