feat: add ability to optionally disable internal HTTP send and receive spans (#2802)

This commit is contained in:
Tobias Backer Dirks 2024-09-10 17:45:06 +03:00 committed by GitHub
parent 9cced9757a
commit 02c956190d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 159 additions and 68 deletions

View File

@ -14,6 +14,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `opentelemetry-instrumentation-kafka-python` Instrument temporary fork, kafka-python-ng - `opentelemetry-instrumentation-kafka-python` Instrument temporary fork, kafka-python-ng
inside kafka-python's instrumentation inside kafka-python's instrumentation
([#2537](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2537)) ([#2537](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2537))
- `opentelemetry-instrumentation-asgi`, `opentelemetry-instrumentation-fastapi` Add ability to disable internal HTTP send and receive spans
([#2802](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2802))
### Breaking changes ### Breaking changes

View File

@ -483,7 +483,7 @@ def get_default_span_details(scope: dict) -> Tuple[str, dict]:
def _collect_target_attribute( def _collect_target_attribute(
scope: typing.Dict[str, typing.Any] scope: typing.Dict[str, typing.Any],
) -> typing.Optional[str]: ) -> typing.Optional[str]:
""" """
Returns the target path as defined by the Semantic Conventions. Returns the target path as defined by the Semantic Conventions.
@ -529,6 +529,7 @@ class OpenTelemetryMiddleware:
the current globally configured one is used. the current globally configured one is used.
meter_provider: The optional meter provider to use. If omitted meter_provider: The optional meter provider to use. If omitted
the current globally configured one is used. the current globally configured one is used.
exclude_spans: Optionally exclude HTTP `send` and/or `receive` spans from the trace.
""" """
# pylint: disable=too-many-branches # pylint: disable=too-many-branches
@ -547,6 +548,7 @@ class OpenTelemetryMiddleware:
http_capture_headers_server_request: list[str] | None = None, http_capture_headers_server_request: list[str] | None = None,
http_capture_headers_server_response: list[str] | None = None, http_capture_headers_server_response: list[str] | None = None,
http_capture_headers_sanitize_fields: list[str] | None = None, http_capture_headers_sanitize_fields: list[str] | None = None,
exclude_spans: list[typing.Literal["receive", "send"]] | None = None,
): ):
# initialize semantic conventions opt-in if needed # initialize semantic conventions opt-in if needed
_OpenTelemetrySemanticConventionStability._initialize() _OpenTelemetrySemanticConventionStability._initialize()
@ -653,6 +655,12 @@ class OpenTelemetryMiddleware:
) )
or [] or []
) )
self.exclude_receive_span = (
"receive" in exclude_spans if exclude_spans else False
)
self.exclude_send_span = (
"send" in exclude_spans if exclude_spans else False
)
# pylint: disable=too-many-statements # pylint: disable=too-many-statements
async def __call__( async def __call__(
@ -796,8 +804,10 @@ class OpenTelemetryMiddleware:
span.end() span.end()
# pylint: enable=too-many-branches # pylint: enable=too-many-branches
def _get_otel_receive(self, server_span_name, scope, receive): def _get_otel_receive(self, server_span_name, scope, receive):
if self.exclude_receive_span:
return receive
@wraps(receive) @wraps(receive)
async def otel_receive(): async def otel_receive():
with self.tracer.start_as_current_span( with self.tracer.start_as_current_span(
@ -821,6 +831,66 @@ class OpenTelemetryMiddleware:
return otel_receive return otel_receive
def _set_send_span(
self,
server_span_name,
scope,
send,
message,
status_code,
expecting_trailers,
):
"""Set send span attributes and status code."""
with self.tracer.start_as_current_span(
" ".join((server_span_name, scope["type"], "send"))
) as send_span:
if callable(self.client_response_hook):
self.client_response_hook(send_span, scope, message)
if send_span.is_recording():
if message["type"] == "http.response.start":
expecting_trailers = message.get("trailers", False)
send_span.set_attribute("asgi.event.type", message["type"])
if status_code:
set_status_code(
send_span,
status_code,
None,
self._sem_conv_opt_in_mode,
)
return expecting_trailers
def _set_server_span(
self, server_span, message, status_code, duration_attrs
):
"""Set server span attributes and status code."""
if (
server_span.is_recording()
and server_span.kind == trace.SpanKind.SERVER
and "headers" in message
):
custom_response_attributes = (
collect_custom_headers_attributes(
message,
self.http_capture_headers_sanitize_fields,
self.http_capture_headers_server_response,
normalise_response_header_name,
)
if self.http_capture_headers_server_response
else {}
)
if len(custom_response_attributes) > 0:
server_span.set_attributes(custom_response_attributes)
if status_code:
set_status_code(
server_span,
status_code,
duration_attrs,
self._sem_conv_opt_in_mode,
)
def _get_otel_send( def _get_otel_send(
self, self,
server_span, server_span,
@ -834,74 +904,46 @@ class OpenTelemetryMiddleware:
@wraps(send) @wraps(send)
async def otel_send(message: dict[str, Any]): async def otel_send(message: dict[str, Any]):
nonlocal expecting_trailers nonlocal expecting_trailers
with self.tracer.start_as_current_span(
" ".join((server_span_name, scope["type"], "send"))
) as send_span:
if callable(self.client_response_hook):
self.client_response_hook(send_span, scope, message)
status_code = None status_code = None
if message["type"] == "http.response.start": if message["type"] == "http.response.start":
status_code = message["status"] status_code = message["status"]
elif message["type"] == "websocket.send": elif message["type"] == "websocket.send":
status_code = 200 status_code = 200
if send_span.is_recording(): if not self.exclude_send_span:
if message["type"] == "http.response.start": expecting_trailers = self._set_send_span(
expecting_trailers = message.get("trailers", False) server_span_name,
send_span.set_attribute("asgi.event.type", message["type"]) scope,
if ( send,
server_span.is_recording() message,
and server_span.kind == trace.SpanKind.SERVER status_code,
and "headers" in message expecting_trailers,
): )
custom_response_attributes = (
collect_custom_headers_attributes(
message,
self.http_capture_headers_sanitize_fields,
self.http_capture_headers_server_response,
normalise_response_header_name,
)
if self.http_capture_headers_server_response
else {}
)
if len(custom_response_attributes) > 0:
server_span.set_attributes(
custom_response_attributes
)
if status_code:
# We record metrics only once
set_status_code(
server_span,
status_code,
duration_attrs,
self._sem_conv_opt_in_mode,
)
set_status_code(
send_span,
status_code,
None,
self._sem_conv_opt_in_mode,
)
propagator = get_global_response_propagator() self._set_server_span(
if propagator: server_span, message, status_code, duration_attrs
propagator.inject( )
message,
context=set_span_in_context(
server_span, trace.context_api.Context()
),
setter=asgi_setter,
)
content_length = asgi_getter.get(message, "content-length") propagator = get_global_response_propagator()
if content_length: if propagator:
try: propagator.inject(
self.content_length_header = int(content_length[0]) message,
except ValueError: context=set_span_in_context(
pass server_span, trace.context_api.Context()
),
setter=asgi_setter,
)
content_length = asgi_getter.get(message, "content-length")
if content_length:
try:
self.content_length_header = int(content_length[0])
except ValueError:
pass
await send(message)
await send(message)
# pylint: disable=too-many-boolean-expressions # pylint: disable=too-many-boolean-expressions
if ( if (
not expecting_trailers not expecting_trailers

View File

@ -566,6 +566,30 @@ class TestAsgiApplication(AsyncAsgiTestBase):
_SIMULATED_BACKGROUND_TASK_EXECUTION_TIME_S * 10**9, _SIMULATED_BACKGROUND_TASK_EXECUTION_TIME_S * 10**9,
) )
async def test_exclude_internal_spans(self):
"""Test that internal spans are excluded from the emitted spans when
the `exclude_receive_span` or `exclude_send_span` attributes are set.
"""
cases = [
(["receive", "send"], ["GET / http receive", "GET / http send"]),
(["send"], ["GET / http send"]),
(["receive"], ["GET / http receive"]),
([], []),
]
for exclude_spans, excluded_spans in cases:
self.memory_exporter.clear()
app = otel_asgi.OpenTelemetryMiddleware(
simple_asgi, exclude_spans=exclude_spans
)
self.seed_app(app)
await self.send_default_request()
await self.get_all_output()
span_list = self.memory_exporter.get_finished_spans()
self.assertTrue(span_list)
for span in span_list:
for excluded_span in excluded_spans:
self.assertNotEqual(span.name, excluded_span)
async def test_trailers(self): async def test_trailers(self):
"""Test that trailers are emitted as expected and that the server span is ended """Test that trailers are emitted as expected and that the server span is ended
BEFORE the background task is finished.""" BEFORE the background task is finished."""

View File

@ -179,7 +179,7 @@ API
from __future__ import annotations from __future__ import annotations
import logging import logging
from typing import Collection from typing import Collection, Literal
import fastapi import fastapi
from starlette.routing import Match from starlette.routing import Match
@ -222,7 +222,7 @@ class FastAPIInstrumentor(BaseInstrumentor):
@staticmethod @staticmethod
def instrument_app( def instrument_app(
app: fastapi.FastAPI, app,
server_request_hook: ServerRequestHook = None, server_request_hook: ServerRequestHook = None,
client_request_hook: ClientRequestHook = None, client_request_hook: ClientRequestHook = None,
client_response_hook: ClientResponseHook = None, client_response_hook: ClientResponseHook = None,
@ -232,8 +232,28 @@ class FastAPIInstrumentor(BaseInstrumentor):
http_capture_headers_server_request: list[str] | None = None, http_capture_headers_server_request: list[str] | None = None,
http_capture_headers_server_response: list[str] | None = None, http_capture_headers_server_response: list[str] | None = None,
http_capture_headers_sanitize_fields: list[str] | None = None, http_capture_headers_sanitize_fields: list[str] | None = None,
exclude_spans: list[Literal["receive", "send"]] | None = None,
): ):
"""Instrument an uninstrumented FastAPI application.""" """Instrument an uninstrumented FastAPI application.
Args:
app: The fastapi ASGI application callable to forward requests to.
server_request_hook: Optional callback which is called with the server span and ASGI
scope object for every incoming request.
client_request_hook: Optional callback which is called with the internal span, and ASGI
scope and event which are sent as dictionaries for when the method receive is called.
client_response_hook: Optional callback which is called with the internal span, and ASGI
scope and event which are sent as dictionaries for when the method send is called.
tracer_provider: The optional tracer provider to use. If omitted
the current globally configured one is used.
meter_provider: The optional meter provider to use. If omitted
the current globally configured one is used.
excluded_urls: Optional comma delimited string of regexes to match URLs that should not be traced.
http_capture_headers_server_request: Optional list of HTTP headers to capture from the request.
http_capture_headers_server_response: Optional list of HTTP headers to capture from the response.
http_capture_headers_sanitize_fields: Optional list of HTTP headers to sanitize.
exclude_spans: Optionally exclude HTTP `send` and/or `receive` spans from the trace.
"""
if not hasattr(app, "_is_instrumented_by_opentelemetry"): if not hasattr(app, "_is_instrumented_by_opentelemetry"):
app._is_instrumented_by_opentelemetry = False app._is_instrumented_by_opentelemetry = False
@ -273,6 +293,7 @@ class FastAPIInstrumentor(BaseInstrumentor):
http_capture_headers_server_request=http_capture_headers_server_request, http_capture_headers_server_request=http_capture_headers_server_request,
http_capture_headers_server_response=http_capture_headers_server_response, http_capture_headers_server_response=http_capture_headers_server_response,
http_capture_headers_sanitize_fields=http_capture_headers_sanitize_fields, http_capture_headers_sanitize_fields=http_capture_headers_sanitize_fields,
exclude_spans=exclude_spans,
) )
app._is_instrumented_by_opentelemetry = True app._is_instrumented_by_opentelemetry = True
if app not in _InstrumentedFastAPI._instrumented_fastapi_apps: if app not in _InstrumentedFastAPI._instrumented_fastapi_apps:
@ -323,6 +344,7 @@ class FastAPIInstrumentor(BaseInstrumentor):
else parse_excluded_urls(_excluded_urls) else parse_excluded_urls(_excluded_urls)
) )
_InstrumentedFastAPI._meter_provider = kwargs.get("meter_provider") _InstrumentedFastAPI._meter_provider = kwargs.get("meter_provider")
_InstrumentedFastAPI._exclude_spans = kwargs.get("exclude_spans")
fastapi.FastAPI = _InstrumentedFastAPI fastapi.FastAPI = _InstrumentedFastAPI
def _uninstrument(self, **kwargs): def _uninstrument(self, **kwargs):
@ -373,6 +395,7 @@ class _InstrumentedFastAPI(fastapi.FastAPI):
http_capture_headers_server_request=_InstrumentedFastAPI._http_capture_headers_server_request, http_capture_headers_server_request=_InstrumentedFastAPI._http_capture_headers_server_request,
http_capture_headers_server_response=_InstrumentedFastAPI._http_capture_headers_server_response, http_capture_headers_server_response=_InstrumentedFastAPI._http_capture_headers_server_response,
http_capture_headers_sanitize_fields=_InstrumentedFastAPI._http_capture_headers_sanitize_fields, http_capture_headers_sanitize_fields=_InstrumentedFastAPI._http_capture_headers_sanitize_fields,
exclude_spans=_InstrumentedFastAPI._exclude_spans,
) )
self._is_instrumented_by_opentelemetry = True self._is_instrumented_by_opentelemetry = True
_InstrumentedFastAPI._instrumented_fastapi_apps.add(self) _InstrumentedFastAPI._instrumented_fastapi_apps.add(self)