HTTP semantic convention stability migration for aiohttp-client (#2673)

This commit is contained in:
Emídio Neto 2024-07-15 17:49:35 -03:00 committed by GitHub
parent 7c7a2a4312
commit 7e48ee7254
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 348 additions and 56 deletions

View File

@ -29,6 +29,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#2630](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2630))
- `opentelemetry-instrumentation-system-metrics` Add support for capture open file descriptors
([#2652](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2652))
- `opentelemetry-instrumentation-aiohttp-client` Implement new semantic convention opt-in migration
([#2673](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2673))
### Breaking changes

View File

@ -2,7 +2,7 @@
| Instrumentation | Supported Packages | Metrics support | Semconv status |
| --------------- | ------------------ | --------------- | -------------- |
| [opentelemetry-instrumentation-aio-pika](./opentelemetry-instrumentation-aio-pika) | aio_pika >= 7.2.0, < 10.0.0 | No | experimental
| [opentelemetry-instrumentation-aiohttp-client](./opentelemetry-instrumentation-aiohttp-client) | aiohttp ~= 3.0 | No | experimental
| [opentelemetry-instrumentation-aiohttp-client](./opentelemetry-instrumentation-aiohttp-client) | aiohttp ~= 3.0 | No | migration
| [opentelemetry-instrumentation-aiohttp-server](./opentelemetry-instrumentation-aiohttp-server) | aiohttp ~= 3.0 | No | experimental
| [opentelemetry-instrumentation-aiopg](./opentelemetry-instrumentation-aiopg) | aiopg >= 0.13.0, < 2.0.0 | No | experimental
| [opentelemetry-instrumentation-asgi](./opentelemetry-instrumentation-asgi) | asgiref ~= 3.0 | Yes | migration

View File

@ -90,19 +90,28 @@ import yarl
from opentelemetry import context as context_api
from opentelemetry import trace
from opentelemetry.instrumentation._semconv import (
_get_schema_url,
_HTTPStabilityMode,
_OpenTelemetrySemanticConventionStability,
_OpenTelemetryStabilitySignalType,
_report_new,
_set_http_method,
_set_http_url,
_set_status,
)
from opentelemetry.instrumentation.aiohttp_client.package import _instruments
from opentelemetry.instrumentation.aiohttp_client.version import __version__
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import (
http_status_to_status_code,
is_instrumentation_enabled,
unwrap,
)
from opentelemetry.propagate import inject
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.semconv.attributes.error_attributes import ERROR_TYPE
from opentelemetry.trace import Span, SpanKind, TracerProvider, get_tracer
from opentelemetry.trace.status import Status, StatusCode
from opentelemetry.util.http import remove_url_credentials
from opentelemetry.util.http import remove_url_credentials, sanitize_method
_UrlFilterT = typing.Optional[typing.Callable[[yarl.URL], str]]
_RequestHookT = typing.Optional[
@ -122,11 +131,46 @@ _ResponseHookT = typing.Optional[
]
def _get_span_name(method: str) -> str:
method = sanitize_method(method.upper().strip())
if method == "_OTHER":
method = "HTTP"
return method
def _set_http_status_code_attribute(
span,
status_code,
metric_attributes=None,
sem_conv_opt_in_mode=_HTTPStabilityMode.DEFAULT,
):
status_code_str = str(status_code)
try:
status_code = int(status_code)
except ValueError:
status_code = -1
if metric_attributes is None:
metric_attributes = {}
# When we have durations we should set metrics only once
# Also the decision to include status code on a histogram should
# not be dependent on tracing decisions.
_set_status(
span,
metric_attributes,
status_code,
status_code_str,
server_span=False,
sem_conv_opt_in_mode=sem_conv_opt_in_mode,
)
def create_trace_config(
url_filter: _UrlFilterT = None,
request_hook: _RequestHookT = None,
response_hook: _ResponseHookT = None,
tracer_provider: TracerProvider = None,
sem_conv_opt_in_mode: _HTTPStabilityMode = _HTTPStabilityMode.DEFAULT,
) -> aiohttp.TraceConfig:
"""Create an aiohttp-compatible trace configuration.
@ -167,9 +211,12 @@ def create_trace_config(
__name__,
__version__,
tracer_provider,
schema_url="https://opentelemetry.io/schemas/1.11.0",
schema_url=_get_schema_url(sem_conv_opt_in_mode),
)
# TODO: Use this when we have durations for aiohttp-client
metric_attributes = {}
def _end_trace(trace_config_ctx: types.SimpleNamespace):
context_api.detach(trace_config_ctx.token)
trace_config_ctx.span.end()
@ -183,18 +230,22 @@ def create_trace_config(
trace_config_ctx.span = None
return
http_method = params.method.upper()
request_span_name = f"{http_method}"
http_method = params.method
request_span_name = _get_span_name(http_method)
request_url = (
remove_url_credentials(trace_config_ctx.url_filter(params.url))
if callable(trace_config_ctx.url_filter)
else remove_url_credentials(str(params.url))
)
span_attributes = {
SpanAttributes.HTTP_METHOD: http_method,
SpanAttributes.HTTP_URL: request_url,
}
span_attributes = {}
_set_http_method(
span_attributes,
http_method,
request_span_name,
sem_conv_opt_in_mode,
)
_set_http_url(span_attributes, request_url, sem_conv_opt_in_mode)
trace_config_ctx.span = trace_config_ctx.tracer.start_span(
request_span_name, kind=SpanKind.CLIENT, attributes=span_attributes
@ -219,14 +270,13 @@ def create_trace_config(
if callable(response_hook):
response_hook(trace_config_ctx.span, params)
_set_http_status_code_attribute(
trace_config_ctx.span,
params.response.status,
metric_attributes,
sem_conv_opt_in_mode,
)
if trace_config_ctx.span.is_recording():
trace_config_ctx.span.set_status(
Status(http_status_to_status_code(int(params.response.status)))
)
trace_config_ctx.span.set_attribute(
SpanAttributes.HTTP_STATUS_CODE, params.response.status
)
_end_trace(trace_config_ctx)
async def on_request_exception(
@ -238,7 +288,13 @@ def create_trace_config(
return
if trace_config_ctx.span.is_recording() and params.exception:
trace_config_ctx.span.set_status(Status(StatusCode.ERROR))
exc_type = type(params.exception).__qualname__
if _report_new(sem_conv_opt_in_mode):
trace_config_ctx.span.set_attribute(ERROR_TYPE, exc_type)
trace_config_ctx.span.set_status(
Status(StatusCode.ERROR, exc_type)
)
trace_config_ctx.span.record_exception(params.exception)
if callable(response_hook):
@ -271,6 +327,7 @@ def _instrument(
trace_configs: typing.Optional[
typing.Sequence[aiohttp.TraceConfig]
] = None,
sem_conv_opt_in_mode: _HTTPStabilityMode = _HTTPStabilityMode.DEFAULT,
):
"""Enables tracing of all ClientSessions
@ -293,6 +350,7 @@ def _instrument(
request_hook=request_hook,
response_hook=response_hook,
tracer_provider=tracer_provider,
sem_conv_opt_in_mode=sem_conv_opt_in_mode,
)
trace_config._is_instrumented_by_opentelemetry = True
client_trace_configs.append(trace_config)
@ -344,12 +402,17 @@ class AioHttpClientInstrumentor(BaseInstrumentor):
``trace_configs``: An optional list of aiohttp.TraceConfig items, allowing customize enrichment of spans
based on aiohttp events (see specification: https://docs.aiohttp.org/en/stable/tracing_reference.html)
"""
_OpenTelemetrySemanticConventionStability._initialize()
_sem_conv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode(
_OpenTelemetryStabilitySignalType.HTTP,
)
_instrument(
tracer_provider=kwargs.get("tracer_provider"),
url_filter=kwargs.get("url_filter"),
request_hook=kwargs.get("request_hook"),
response_hook=kwargs.get("response_hook"),
trace_configs=kwargs.get("trace_configs"),
sem_conv_opt_in_mode=_sem_conv_opt_in_mode,
)
def _uninstrument(self, **kwargs):

View File

@ -14,3 +14,7 @@
_instruments = ("aiohttp ~= 3.0",)
_supports_metrics = False
_semconv_status = "migration"

View File

@ -28,10 +28,21 @@ from pkg_resources import iter_entry_points
from opentelemetry import trace as trace_api
from opentelemetry.instrumentation import aiohttp_client
from opentelemetry.instrumentation._semconv import (
OTEL_SEMCONV_STABILITY_OPT_IN,
_HTTPStabilityMode,
_OpenTelemetrySemanticConventionStability,
)
from opentelemetry.instrumentation.aiohttp_client import (
AioHttpClientInstrumentor,
)
from opentelemetry.instrumentation.utils import suppress_instrumentation
from opentelemetry.semconv.attributes.error_attributes import ERROR_TYPE
from opentelemetry.semconv.attributes.http_attributes import (
HTTP_REQUEST_METHOD,
HTTP_RESPONSE_STATUS_CODE,
)
from opentelemetry.semconv.attributes.url_attributes import URL_FULL
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.test.test_base import TestBase
from opentelemetry.trace import Span, StatusCode
@ -59,7 +70,23 @@ def run_with_test_server(
class TestAioHttpIntegration(TestBase):
def assert_spans(self, spans):
_test_status_codes = (
(HTTPStatus.OK, StatusCode.UNSET),
(HTTPStatus.TEMPORARY_REDIRECT, StatusCode.UNSET),
(HTTPStatus.NOT_FOUND, StatusCode.ERROR),
(HTTPStatus.BAD_REQUEST, StatusCode.ERROR),
(HTTPStatus.SERVICE_UNAVAILABLE, StatusCode.ERROR),
(HTTPStatus.GATEWAY_TIMEOUT, StatusCode.ERROR),
)
def setUp(self):
super().setUp()
_OpenTelemetrySemanticConventionStability._initialized = False
def assert_spans(self, spans, num_spans=1):
finished_spans = self.memory_exporter.get_finished_spans()
self.assertEqual(num_spans, len(finished_spans))
self.assertEqual(
[
(
@ -67,7 +94,7 @@ class TestAioHttpIntegration(TestBase):
(span.status.status_code, span.status.description),
dict(span.attributes),
)
for span in self.memory_exporter.get_finished_spans()
for span in finished_spans
],
spans,
)
@ -99,39 +126,72 @@ class TestAioHttpIntegration(TestBase):
return run_with_test_server(client_request, url, handler)
def test_status_codes(self):
for status_code, span_status in (
(HTTPStatus.OK, StatusCode.UNSET),
(HTTPStatus.TEMPORARY_REDIRECT, StatusCode.UNSET),
(HTTPStatus.SERVICE_UNAVAILABLE, StatusCode.ERROR),
(
HTTPStatus.GATEWAY_TIMEOUT,
StatusCode.ERROR,
),
):
for status_code, span_status in self._test_status_codes:
with self.subTest(status_code=status_code):
path = "test-path?query=param#foobar"
host, port = self._http_request(
trace_config=aiohttp_client.create_trace_config(),
url="/test-path?query=param#foobar",
url=f"/{path}",
status_code=status_code,
)
url = f"http://{host}:{port}/{path}"
attributes = {
SpanAttributes.HTTP_METHOD: "GET",
SpanAttributes.HTTP_URL: url,
SpanAttributes.HTTP_STATUS_CODE: status_code,
}
spans = [("GET", (span_status, None), attributes)]
self.assert_spans(spans)
self.memory_exporter.clear()
url = f"http://{host}:{port}/test-path?query=param#foobar"
self.assert_spans(
[
(
"GET",
(span_status, None),
{
SpanAttributes.HTTP_METHOD: "GET",
SpanAttributes.HTTP_URL: url,
SpanAttributes.HTTP_STATUS_CODE: int(
status_code
),
},
)
]
def test_status_codes_new_semconv(self):
for status_code, span_status in self._test_status_codes:
with self.subTest(status_code=status_code):
path = "test-path?query=param#foobar"
host, port = self._http_request(
trace_config=aiohttp_client.create_trace_config(
sem_conv_opt_in_mode=_HTTPStabilityMode.HTTP
),
url=f"/{path}",
status_code=status_code,
)
url = f"http://{host}:{port}/{path}"
attributes = {
HTTP_REQUEST_METHOD: "GET",
URL_FULL: url,
HTTP_RESPONSE_STATUS_CODE: status_code,
}
if status_code >= 400:
attributes[ERROR_TYPE] = str(status_code.value)
spans = [("GET", (span_status, None), attributes)]
self.assert_spans(spans)
self.memory_exporter.clear()
def test_status_codes_both_semconv(self):
for status_code, span_status in self._test_status_codes:
with self.subTest(status_code=status_code):
path = "test-path?query=param#foobar"
host, port = self._http_request(
trace_config=aiohttp_client.create_trace_config(
sem_conv_opt_in_mode=_HTTPStabilityMode.HTTP_DUP
),
url=f"/{path}",
status_code=status_code,
)
url = f"http://{host}:{port}/{path}"
attributes = {
HTTP_REQUEST_METHOD: "GET",
SpanAttributes.HTTP_METHOD: "GET",
URL_FULL: url,
SpanAttributes.HTTP_URL: url,
HTTP_RESPONSE_STATUS_CODE: status_code,
SpanAttributes.HTTP_STATUS_CODE: status_code,
}
if status_code >= 400:
attributes[ERROR_TYPE] = str(status_code.value)
spans = [("GET", (span_status, None), attributes)]
self.assert_spans(spans, 1)
self.memory_exporter.clear()
def test_schema_url(self):
@ -149,6 +209,40 @@ class TestAioHttpIntegration(TestBase):
)
self.memory_exporter.clear()
def test_schema_url_new_semconv(self):
with self.subTest(status_code=200):
self._http_request(
trace_config=aiohttp_client.create_trace_config(
sem_conv_opt_in_mode=_HTTPStabilityMode.HTTP
),
url="/test-path?query=param#foobar",
status_code=200,
)
span = self.memory_exporter.get_finished_spans()[0]
self.assertEqual(
span.instrumentation_info.schema_url,
"https://opentelemetry.io/schemas/v1.21.0",
)
self.memory_exporter.clear()
def test_schema_url_both_semconv(self):
with self.subTest(status_code=200):
self._http_request(
trace_config=aiohttp_client.create_trace_config(
sem_conv_opt_in_mode=_HTTPStabilityMode.HTTP_DUP
),
url="/test-path?query=param#foobar",
status_code=200,
)
span = self.memory_exporter.get_finished_spans()[0]
self.assertEqual(
span.instrumentation_info.schema_url,
"https://opentelemetry.io/schemas/v1.21.0",
)
self.memory_exporter.clear()
def test_not_recording(self):
mock_tracer = mock.Mock()
mock_span = mock.Mock()
@ -263,7 +357,7 @@ class TestAioHttpIntegration(TestBase):
[
(
"GET",
(expected_status, None),
(expected_status, "ClientConnectorError"),
{
SpanAttributes.HTTP_METHOD: "GET",
SpanAttributes.HTTP_URL: url,
@ -273,6 +367,89 @@ class TestAioHttpIntegration(TestBase):
)
self.memory_exporter.clear()
def test_basic_exception(self):
async def request_handler(request):
assert "traceparent" in request.headers
host, port = self._http_request(
trace_config=aiohttp_client.create_trace_config(),
url="/test",
request_handler=request_handler,
)
span = self.memory_exporter.get_finished_spans()[0]
self.assertEqual(len(span.events), 1)
self.assertEqual(span.events[0].name, "exception")
self.assert_spans(
[
(
"GET",
(StatusCode.ERROR, "ServerDisconnectedError"),
{
SpanAttributes.HTTP_METHOD: "GET",
SpanAttributes.HTTP_URL: f"http://{host}:{port}/test",
},
)
]
)
def test_basic_exception_new_semconv(self):
async def request_handler(request):
assert "traceparent" in request.headers
host, port = self._http_request(
trace_config=aiohttp_client.create_trace_config(
sem_conv_opt_in_mode=_HTTPStabilityMode.HTTP
),
url="/test",
request_handler=request_handler,
)
span = self.memory_exporter.get_finished_spans()[0]
self.assertEqual(len(span.events), 1)
self.assertEqual(span.events[0].name, "exception")
self.assert_spans(
[
(
"GET",
(StatusCode.ERROR, "ServerDisconnectedError"),
{
HTTP_REQUEST_METHOD: "GET",
URL_FULL: f"http://{host}:{port}/test",
ERROR_TYPE: "ServerDisconnectedError",
},
)
]
)
def test_basic_exception_both_semconv(self):
async def request_handler(request):
assert "traceparent" in request.headers
host, port = self._http_request(
trace_config=aiohttp_client.create_trace_config(
sem_conv_opt_in_mode=_HTTPStabilityMode.HTTP_DUP
),
url="/test",
request_handler=request_handler,
)
span = self.memory_exporter.get_finished_spans()[0]
self.assertEqual(len(span.events), 1)
self.assertEqual(span.events[0].name, "exception")
self.assert_spans(
[
(
"GET",
(StatusCode.ERROR, "ServerDisconnectedError"),
{
HTTP_REQUEST_METHOD: "GET",
URL_FULL: f"http://{host}:{port}/test",
ERROR_TYPE: "ServerDisconnectedError",
SpanAttributes.HTTP_METHOD: "GET",
SpanAttributes.HTTP_URL: f"http://{host}:{port}/test",
},
)
]
)
def test_timeout(self):
async def request_handler(request):
await asyncio.sleep(1)
@ -290,7 +467,7 @@ class TestAioHttpIntegration(TestBase):
[
(
"GET",
(StatusCode.ERROR, None),
(StatusCode.ERROR, "ServerTimeoutError"),
{
SpanAttributes.HTTP_METHOD: "GET",
SpanAttributes.HTTP_URL: f"http://{host}:{port}/test_timeout",
@ -317,7 +494,7 @@ class TestAioHttpIntegration(TestBase):
[
(
"GET",
(StatusCode.ERROR, None),
(StatusCode.ERROR, "TooManyRedirects"),
{
SpanAttributes.HTTP_METHOD: "GET",
SpanAttributes.HTTP_URL: f"http://{host}:{port}/test_too_many_redirects",
@ -374,6 +551,7 @@ class TestAioHttpClientInstrumentor(TestBase):
def setUp(self):
super().setUp()
AioHttpClientInstrumentor().instrument()
_OpenTelemetrySemanticConventionStability._initialized = False
def tearDown(self):
super().tearDown()
@ -414,6 +592,46 @@ class TestAioHttpClientInstrumentor(TestBase):
)
self.assertEqual(200, span.attributes[SpanAttributes.HTTP_STATUS_CODE])
def test_instrument_new_semconv(self):
AioHttpClientInstrumentor().uninstrument()
with mock.patch.dict(
"os.environ", {OTEL_SEMCONV_STABILITY_OPT_IN: "http"}
):
AioHttpClientInstrumentor().instrument()
host, port = run_with_test_server(
self.get_default_request(), self.URL, self.default_handler
)
span = self.assert_spans(1)
self.assertEqual("GET", span.name)
self.assertEqual("GET", span.attributes[HTTP_REQUEST_METHOD])
self.assertEqual(
f"http://{host}:{port}/test-path",
span.attributes[URL_FULL],
)
self.assertEqual(200, span.attributes[HTTP_RESPONSE_STATUS_CODE])
def test_instrument_both_semconv(self):
AioHttpClientInstrumentor().uninstrument()
with mock.patch.dict(
"os.environ", {OTEL_SEMCONV_STABILITY_OPT_IN: "http/dup"}
):
AioHttpClientInstrumentor().instrument()
host, port = run_with_test_server(
self.get_default_request(), self.URL, self.default_handler
)
url = f"http://{host}:{port}/test-path"
attributes = {
HTTP_REQUEST_METHOD: "GET",
SpanAttributes.HTTP_METHOD: "GET",
URL_FULL: url,
SpanAttributes.HTTP_URL: url,
HTTP_RESPONSE_STATUS_CODE: 200,
SpanAttributes.HTTP_STATUS_CODE: 200,
}
span = self.assert_spans(1)
self.assertEqual("GET", span.name)
self.assertEqual(span.attributes, attributes)
def test_instrument_with_custom_trace_config(self):
trace_config = aiohttp.TraceConfig()

View File

@ -451,7 +451,8 @@ def set_status_code(
metric_attributes,
status_code,
status_code_str,
sem_conv_opt_in_mode,
server_span=True,
sem_conv_opt_in_mode=sem_conv_opt_in_mode,
)

View File

@ -496,7 +496,8 @@ def add_response_attributes(
duration_attrs,
status_code,
status_code_str,
sem_conv_opt_in_mode,
server_span=True,
sem_conv_opt_in_mode=sem_conv_opt_in_mode,
)

View File

@ -349,10 +349,11 @@ def _set_http_flavor_version(result, version, sem_conv_opt_in_mode):
def _set_status(
span,
metrics_attributes,
status_code,
status_code_str,
sem_conv_opt_in_mode,
metrics_attributes: dict,
status_code: int,
status_code_str: str,
server_span: bool = True,
sem_conv_opt_in_mode: _HTTPStabilityMode = _HTTPStabilityMode.DEFAULT,
):
if status_code < 0:
metrics_attributes[ERROR_TYPE] = status_code_str
@ -366,7 +367,9 @@ def _set_status(
)
)
else:
status = http_status_to_status_code(status_code, server_span=True)
status = http_status_to_status_code(
status_code, server_span=server_span
)
if _report_old(sem_conv_opt_in_mode):
if span.is_recording():