Add support to instrument httpx when using proxy (#2664)

This commit is contained in:
Emídio Neto 2024-07-17 14:46:31 -03:00 committed by GitHub
parent 5a7935ff1f
commit e6c27e0800
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 205 additions and 7 deletions

View File

@ -31,6 +31,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#2630](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2630))
- `opentelemetry-instrumentation-system-metrics` Add support for capture open file descriptors
([#2652](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2652))
- `opentelemetry-instrumentation-httpx` Add support for instrument client with proxy
([#2664](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2664))
- `opentelemetry-instrumentation-aiohttp-client` Implement new semantic convention opt-in migration
([#2673](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2673))
- `opentelemetry-instrumentation-django` Add `http.target` to Django duration metric attributes
@ -63,7 +65,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `opentelemetry-instrumentation-asgi` Fix generation of `http.target` and `http.url` attributes for ASGI apps
using sub apps
([#2477](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2477))
- `opentelemetry-instrumentation-aws-lambda` Bugfix: AWS Lambda event source key incorrect for SNS in instrumentation library.
- `opentelemetry-instrumentation-aws-lambda` Bugfix: AWS Lambda event source key incorrect for SNS in instrumentation library.
([#2612](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2612))
- `opentelemetry-instrumentation-asyncio` instrumented `asyncio.wait_for` properly raises `asyncio.TimeoutError` as expected
([#2637](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2637))

View File

@ -640,6 +640,7 @@ class _InstrumentedClient(httpx.Client):
super().__init__(*args, **kwargs)
self._original_transport = self._transport
self._original_mounts = self._mounts.copy()
self._is_instrumented_by_opentelemetry = True
self._transport = SyncOpenTelemetryTransport(
@ -648,6 +649,21 @@ class _InstrumentedClient(httpx.Client):
request_hook=_InstrumentedClient._request_hook,
response_hook=_InstrumentedClient._response_hook,
)
self._mounts.update(
{
url_pattern: (
SyncOpenTelemetryTransport(
transport,
tracer_provider=_InstrumentedClient._tracer_provider,
request_hook=_InstrumentedClient._request_hook,
response_hook=_InstrumentedClient._response_hook,
)
if transport is not None
else transport
)
for url_pattern, transport in self._original_mounts.items()
}
)
class _InstrumentedAsyncClient(httpx.AsyncClient):
@ -659,6 +675,7 @@ class _InstrumentedAsyncClient(httpx.AsyncClient):
super().__init__(*args, **kwargs)
self._original_transport = self._transport
self._original_mounts = self._mounts.copy()
self._is_instrumented_by_opentelemetry = True
self._transport = AsyncOpenTelemetryTransport(
@ -668,6 +685,22 @@ class _InstrumentedAsyncClient(httpx.AsyncClient):
response_hook=_InstrumentedAsyncClient._response_hook,
)
self._mounts.update(
{
url_pattern: (
AsyncOpenTelemetryTransport(
transport,
tracer_provider=_InstrumentedAsyncClient._tracer_provider,
request_hook=_InstrumentedAsyncClient._request_hook,
response_hook=_InstrumentedAsyncClient._response_hook,
)
if transport is not None
else transport
)
for url_pattern, transport in self._original_mounts.items()
}
)
class HTTPXClientInstrumentor(BaseInstrumentor):
# pylint: disable=protected-access,attribute-defined-outside-init
@ -752,6 +785,7 @@ class HTTPXClientInstrumentor(BaseInstrumentor):
if not client._is_instrumented_by_opentelemetry:
if isinstance(client, httpx.Client):
client._original_transport = client._transport
client._original_mounts = client._mounts.copy()
transport = client._transport or httpx.HTTPTransport()
client._transport = SyncOpenTelemetryTransport(
transport,
@ -760,8 +794,25 @@ class HTTPXClientInstrumentor(BaseInstrumentor):
response_hook=response_hook,
)
client._is_instrumented_by_opentelemetry = True
client._mounts.update(
{
url_pattern: (
SyncOpenTelemetryTransport(
transport,
tracer_provider=tracer_provider,
request_hook=request_hook,
response_hook=response_hook,
)
if transport is not None
else transport
)
for url_pattern, transport in client._original_mounts.items()
}
)
if isinstance(client, httpx.AsyncClient):
transport = client._transport or httpx.AsyncHTTPTransport()
client._original_mounts = client._mounts.copy()
client._transport = AsyncOpenTelemetryTransport(
transport,
tracer_provider=tracer_provider,
@ -769,6 +820,21 @@ class HTTPXClientInstrumentor(BaseInstrumentor):
response_hook=response_hook,
)
client._is_instrumented_by_opentelemetry = True
client._mounts.update(
{
url_pattern: (
AsyncOpenTelemetryTransport(
transport,
tracer_provider=tracer_provider,
request_hook=request_hook,
response_hook=response_hook,
)
if transport is not None
else transport
)
for url_pattern, transport in client._original_mounts.items()
}
)
else:
_logger.warning(
"Attempting to instrument Httpx client while already instrumented"
@ -787,6 +853,9 @@ class HTTPXClientInstrumentor(BaseInstrumentor):
client._transport = client._original_transport
del client._original_transport
client._is_instrumented_by_opentelemetry = False
if hasattr(client, "_original_mounts"):
client._mounts = client._original_mounts.copy()
del client._original_mounts
else:
_logger.warning(
"Attempting to uninstrument Httpx "

View File

@ -530,6 +530,7 @@ class BaseTestCases:
tracer_provider: typing.Optional["TracerProvider"] = None,
request_hook: typing.Optional["RequestHook"] = None,
response_hook: typing.Optional["ResponseHook"] = None,
**kwargs,
):
pass
@ -539,6 +540,7 @@ class BaseTestCases:
transport: typing.Union[
SyncOpenTelemetryTransport, AsyncOpenTelemetryTransport, None
] = None,
**kwargs,
):
pass
@ -643,6 +645,30 @@ class BaseTestCases:
self.assertFalse(mock_span.set_attribute.called)
self.assertFalse(mock_span.set_status.called)
@respx.mock
def test_client_mounts_with_instrumented_transport(self):
https_url = "https://mock/status/200"
respx.get(https_url).mock(httpx.Response(200))
proxy_mounts = {
"http://": self.create_transport(
proxy=httpx.Proxy("http://localhost:8080")
),
"https://": self.create_transport(
proxy=httpx.Proxy("http://localhost:8443")
),
}
client1 = self.create_client(mounts=proxy_mounts)
client2 = self.create_client(mounts=proxy_mounts)
self.perform_request(self.URL, client=client1)
self.perform_request(https_url, client=client2)
spans = self.assert_span(num_spans=2)
self.assertEqual(
spans[0].attributes[SpanAttributes.HTTP_URL], self.URL
)
self.assertEqual(
spans[1].attributes[SpanAttributes.HTTP_URL], https_url
)
class BaseInstrumentorTest(BaseTest, metaclass=abc.ABCMeta):
@abc.abstractmethod
def create_client(
@ -650,15 +676,39 @@ class BaseTestCases:
transport: typing.Union[
SyncOpenTelemetryTransport, AsyncOpenTelemetryTransport, None
] = None,
**kwargs,
):
pass
@abc.abstractmethod
def create_proxy_transport(self, url: str):
pass
def setUp(self):
super().setUp()
HTTPXClientInstrumentor().instrument()
self.client = self.create_client()
HTTPXClientInstrumentor().uninstrument()
def create_proxy_mounts(self):
return {
"http://": self.create_proxy_transport(
"http://localhost:8080"
),
"https://": self.create_proxy_transport(
"http://localhost:8080"
),
}
def assert_proxy_mounts(self, mounts, num_mounts, transport_type):
self.assertEqual(len(mounts), num_mounts)
for transport in mounts:
with self.subTest(transport):
self.assertIsInstance(
transport,
transport_type,
)
def test_custom_tracer_provider(self):
resource = resources.Resource.create({})
result = self.create_tracer_provider(resource=resource)
@ -855,6 +905,71 @@ class BaseTestCases:
self.assertEqual(result.text, "Hello!")
self.assert_span()
def test_instrument_proxy(self):
proxy_mounts = self.create_proxy_mounts()
HTTPXClientInstrumentor().instrument()
client = self.create_client(mounts=proxy_mounts)
self.perform_request(self.URL, client=client)
self.assert_span(num_spans=1)
self.assert_proxy_mounts(
client._mounts.values(),
2,
(SyncOpenTelemetryTransport, AsyncOpenTelemetryTransport),
)
HTTPXClientInstrumentor().uninstrument()
def test_instrument_client_with_proxy(self):
proxy_mounts = self.create_proxy_mounts()
client = self.create_client(mounts=proxy_mounts)
self.assert_proxy_mounts(
client._mounts.values(),
2,
(httpx.HTTPTransport, httpx.AsyncHTTPTransport),
)
HTTPXClientInstrumentor().instrument_client(client)
result = self.perform_request(self.URL, client=client)
self.assertEqual(result.text, "Hello!")
self.assert_span(num_spans=1)
self.assert_proxy_mounts(
client._mounts.values(),
2,
(SyncOpenTelemetryTransport, AsyncOpenTelemetryTransport),
)
HTTPXClientInstrumentor().uninstrument_client(client)
def test_uninstrument_client_with_proxy(self):
proxy_mounts = self.create_proxy_mounts()
HTTPXClientInstrumentor().instrument()
client = self.create_client(mounts=proxy_mounts)
self.assert_proxy_mounts(
client._mounts.values(),
2,
(SyncOpenTelemetryTransport, AsyncOpenTelemetryTransport),
)
HTTPXClientInstrumentor().uninstrument_client(client)
result = self.perform_request(self.URL, client=client)
self.assertEqual(result.text, "Hello!")
self.assert_span(num_spans=0)
self.assert_proxy_mounts(
client._mounts.values(),
2,
(httpx.HTTPTransport, httpx.AsyncHTTPTransport),
)
# Test that other clients as well as instance client is still
# instrumented
client2 = self.create_client()
result = self.perform_request(self.URL, client=client2)
self.assertEqual(result.text, "Hello!")
self.assert_span()
self.memory_exporter.clear()
result = self.perform_request(self.URL)
self.assertEqual(result.text, "Hello!")
self.assert_span()
class TestSyncIntegration(BaseTestCases.BaseManualTest):
def setUp(self):
@ -871,8 +986,9 @@ class TestSyncIntegration(BaseTestCases.BaseManualTest):
tracer_provider: typing.Optional["TracerProvider"] = None,
request_hook: typing.Optional["RequestHook"] = None,
response_hook: typing.Optional["ResponseHook"] = None,
**kwargs,
):
transport = httpx.HTTPTransport()
transport = httpx.HTTPTransport(**kwargs)
telemetry_transport = SyncOpenTelemetryTransport(
transport,
tracer_provider=tracer_provider,
@ -884,8 +1000,9 @@ class TestSyncIntegration(BaseTestCases.BaseManualTest):
def create_client(
self,
transport: typing.Optional[SyncOpenTelemetryTransport] = None,
**kwargs,
):
return httpx.Client(transport=transport)
return httpx.Client(transport=transport, **kwargs)
def perform_request(
self,
@ -921,8 +1038,9 @@ class TestAsyncIntegration(BaseTestCases.BaseManualTest):
tracer_provider: typing.Optional["TracerProvider"] = None,
request_hook: typing.Optional["AsyncRequestHook"] = None,
response_hook: typing.Optional["AsyncResponseHook"] = None,
**kwargs,
):
transport = httpx.AsyncHTTPTransport()
transport = httpx.AsyncHTTPTransport(**kwargs)
telemetry_transport = AsyncOpenTelemetryTransport(
transport,
tracer_provider=tracer_provider,
@ -934,8 +1052,9 @@ class TestAsyncIntegration(BaseTestCases.BaseManualTest):
def create_client(
self,
transport: typing.Optional[AsyncOpenTelemetryTransport] = None,
**kwargs,
):
return httpx.AsyncClient(transport=transport)
return httpx.AsyncClient(transport=transport, **kwargs)
def perform_request(
self,
@ -977,8 +1096,9 @@ class TestSyncInstrumentationIntegration(BaseTestCases.BaseInstrumentorTest):
def create_client(
self,
transport: typing.Optional[SyncOpenTelemetryTransport] = None,
**kwargs,
):
return httpx.Client()
return httpx.Client(**kwargs)
def perform_request(
self,
@ -991,6 +1111,9 @@ class TestSyncInstrumentationIntegration(BaseTestCases.BaseInstrumentorTest):
return self.client.request(method, url, headers=headers)
return client.request(method, url, headers=headers)
def create_proxy_transport(self, url):
return httpx.HTTPTransport(proxy=httpx.Proxy(url))
class TestAsyncInstrumentationIntegration(BaseTestCases.BaseInstrumentorTest):
response_hook = staticmethod(_async_response_hook)
@ -1007,8 +1130,9 @@ class TestAsyncInstrumentationIntegration(BaseTestCases.BaseInstrumentorTest):
def create_client(
self,
transport: typing.Optional[AsyncOpenTelemetryTransport] = None,
**kwargs,
):
return httpx.AsyncClient()
return httpx.AsyncClient(**kwargs)
def perform_request(
self,
@ -1027,6 +1151,9 @@ class TestAsyncInstrumentationIntegration(BaseTestCases.BaseInstrumentorTest):
return _async_call(_perform_request())
def create_proxy_transport(self, url):
return httpx.AsyncHTTPTransport(proxy=httpx.Proxy(url))
def test_basic_multiple(self):
# We need to create separate clients because in httpx >= 0.19,
# closing the client after "with" means the second http call fails