Add net.peer.ip in requests & urllib3 instrumentations. (#661)

This commit is contained in:
Christian Neumüller 2021-09-27 19:21:26 +02:00 committed by GitHub
parent 201aa2bb1b
commit efaa257a63
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 492 additions and 4 deletions

View File

@ -6,7 +6,7 @@ on:
- 'release/*'
pull_request:
env:
CORE_REPO_SHA: c49ad57bfe35cfc69bfa863d74058ca9bec55fc3
CORE_REPO_SHA: d9c22a87b6bfc5ec332588c764f82c32f068b2c3
jobs:
build:

View File

@ -28,6 +28,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `opentelemetry-instrumentation-urllib3` Updated `_RequestHookT` with two additional fields - the request body and the request headers
([#660](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/660))
### Added
- `opentelemetry-instrumentation-urllib3`, `opentelemetry-instrumentation-requests`
The `net.peer.ip` attribute is set to the IP of the connected HTTP server or proxy
using a new instrumentor in `opententelemetry-util-http`
([#661](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/661))
## [1.5.0-0.24b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.5.0-0.24b0) - 2021-08-26
### Added

View File

@ -53,6 +53,7 @@ from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import SpanKind, get_tracer
from opentelemetry.trace.status import Status
from opentelemetry.util.http import remove_url_credentials
from opentelemetry.util.http.httplib import set_ip_on_next_http_connection
# A key to a context variable to avoid creating duplicate spans when instrumenting
# both, Session.request and Session.send, since Session.request calls into Session.send
@ -133,7 +134,7 @@ def _instrument(tracer, span_callback=None, name_callback=None):
with tracer.start_as_current_span(
span_name, kind=SpanKind.CLIENT
) as span:
) as span, set_ip_on_next_http_connection(span):
exception = None
if span.is_recording():
span.set_attribute(SpanAttributes.HTTP_METHOD, method)

View File

@ -0,0 +1,80 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import requests
from opentelemetry import trace
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.test.httptest import HttpTestBase
from opentelemetry.test.test_base import TestBase
from opentelemetry.util.http.httplib import HttpClientInstrumentor
class TestURLLib3InstrumentorWithRealSocket(HttpTestBase, TestBase):
def setUp(self):
super().setUp()
self.assert_ip = self.server.server_address[0]
self.http_host = ":".join(map(str, self.server.server_address[:2]))
self.http_url_base = "http://" + self.http_host
self.http_url = self.http_url_base + "/status/200"
HttpClientInstrumentor().instrument()
RequestsInstrumentor().instrument()
def tearDown(self):
super().tearDown()
HttpClientInstrumentor().uninstrument()
RequestsInstrumentor().uninstrument()
@staticmethod
def perform_request(url: str) -> requests.Response:
return requests.get(url)
def test_basic_http_success(self):
response = self.perform_request(self.http_url)
self.assert_success_span(response)
def test_basic_http_success_using_connection_pool(self):
with requests.Session() as session:
response = session.get(self.http_url)
self.assert_success_span(response)
# Test that when re-using an existing connection, everything still works.
# Especially relevant for IP capturing.
response = session.get(self.http_url)
self.assert_success_span(response)
def assert_span(self, num_spans=1): # TODO: Move this to TestBase
span_list = self.memory_exporter.get_finished_spans()
self.assertEqual(num_spans, len(span_list))
if num_spans == 0:
return None
self.memory_exporter.clear()
if num_spans == 1:
return span_list[0]
return span_list
def assert_success_span(self, response: requests.Response):
self.assertEqual("Hello!", response.text)
span = self.assert_span()
self.assertIs(trace.SpanKind.CLIENT, span.kind)
self.assertEqual("HTTP GET", span.name)
attributes = {
"http.status_code": 200,
"net.peer.ip": self.assert_ip,
}
self.assertGreaterEqual(span.attributes.items(), attributes.items())

View File

@ -41,6 +41,7 @@ install_requires =
opentelemetry-api ~= 1.3
opentelemetry-semantic-conventions == 0.24b0
opentelemetry-instrumentation == 0.24b0
opentelemetry-util-http == 0.24b0
wrapt >= 1.0.0, < 2.0.0
[options.extras_require]

View File

@ -81,6 +81,7 @@ from opentelemetry.propagate import inject
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import Span, SpanKind, get_tracer
from opentelemetry.trace.status import Status
from opentelemetry.util.http.httplib import set_ip_on_next_http_connection
# A key to a context variable to avoid creating duplicate spans when instrumenting
# both, Session.request and Session.send, since Session.request calls into Session.send
@ -168,7 +169,7 @@ def _instrument(
with tracer.start_as_current_span(
span_name, kind=SpanKind.CLIENT, attributes=span_attributes
) as span:
) as span, set_ip_on_next_http_connection(span):
if callable(request_hook):
request_hook(span, instance, headers, body)
inject(headers)

View File

@ -0,0 +1,86 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import urllib3
import urllib3.exceptions
from opentelemetry import trace
from opentelemetry.instrumentation.urllib3 import URLLib3Instrumentor
from opentelemetry.test.httptest import HttpTestBase
from opentelemetry.test.test_base import TestBase
from opentelemetry.util.http.httplib import HttpClientInstrumentor
class TestURLLib3InstrumentorWithRealSocket(HttpTestBase, TestBase):
def setUp(self):
super().setUp()
self.assert_ip = self.server.server_address[0]
self.http_host = ":".join(map(str, self.server.server_address[:2]))
self.http_url_base = "http://" + self.http_host
self.http_url = self.http_url_base + "/status/200"
HttpClientInstrumentor().instrument()
URLLib3Instrumentor().instrument()
def tearDown(self):
super().tearDown()
HttpClientInstrumentor().uninstrument()
URLLib3Instrumentor().uninstrument()
@staticmethod
def perform_request(url: str) -> urllib3.response.HTTPResponse:
with urllib3.PoolManager() as pool:
resp = pool.request("GET", url)
resp.close()
return resp
def test_basic_http_success(self):
response = self.perform_request(self.http_url)
self.assert_success_span(response, self.http_url)
def test_basic_http_success_using_connection_pool(self):
with urllib3.HTTPConnectionPool(self.http_host, timeout=3) as pool:
response = pool.request("GET", "/status/200")
self.assert_success_span(response, self.http_url)
# Test that when re-using an existing connection, everything still works.
# Especially relevant for IP capturing.
response = pool.request("GET", "/status/200")
self.assert_success_span(response, self.http_url)
def assert_span(self, num_spans=1):
span_list = self.memory_exporter.get_finished_spans()
self.assertEqual(num_spans, len(span_list))
if num_spans == 0:
return None
self.memory_exporter.clear()
if num_spans == 1:
return span_list[0]
return span_list
def assert_success_span(
self, response: urllib3.response.HTTPResponse, url: str
):
self.assertEqual(b"Hello!", response.data)
span = self.assert_span()
self.assertIs(trace.SpanKind.CLIENT, span.kind)
self.assertEqual("HTTP GET", span.name)
attributes = {
"http.status_code": 200,
"net.peer.ip": self.assert_ip,
}
self.assertGreaterEqual(span.attributes.items(), attributes.items())

View File

@ -236,7 +236,7 @@ commands_pre =
grpc: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-grpc[test]
falcon,flask,django,pyramid,tornado,starlette,fastapi,aiohttp,asgi,requests,urllib,wsgi: pip install {toxinidir}/util/opentelemetry-util-http[test]
falcon,flask,django,pyramid,tornado,starlette,fastapi,aiohttp,asgi,requests,urllib,urllib3,wsgi: pip install {toxinidir}/util/opentelemetry-util-http[test]
wsgi,falcon,flask,django,pyramid: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-wsgi[test]
asgi,starlette,fastapi: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-asgi[test]

View File

@ -40,3 +40,7 @@ packages=find_namespace:
[options.packages.find]
where = src
[options.entry_points]
opentelemetry_instrumentor =
httplib = opentelemetry.util.http.httplib:HttpClientInstrumentor

View File

@ -0,0 +1,175 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This library provides functionality to enrich HTTP client spans with IPs. It does
not create spans on its own.
"""
import contextlib
import http.client
import logging
import socket # pylint:disable=unused-import # Used for typing
import typing
from typing import Collection
import wrapt
from opentelemetry import context
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace.span import Span
_STATE_KEY = "httpbase_instrumentation_state"
logger = logging.getLogger(__name__)
class HttpClientInstrumentor(BaseInstrumentor):
def instrumentation_dependencies(self) -> Collection[str]:
return () # This instruments http.client from stdlib; no extra deps.
def _instrument(self, **kwargs):
"""Instruments the http.client module (not creating spans on its own)"""
_instrument()
def _uninstrument(self, **kwargs):
_uninstrument()
def _remove_nonrecording(spanlist: typing.List[Span]):
idx = len(spanlist) - 1
while idx >= 0:
if not spanlist[idx].is_recording():
logger.debug("Span is not recording: %s", spanlist[idx])
islast = idx + 1 == len(spanlist)
if not islast:
spanlist[idx] = spanlist[len(spanlist) - 1]
spanlist.pop()
if islast:
if idx == 0:
return False # We removed everything
idx -= 1
else:
idx -= 1
return True
def trysetip(conn: http.client.HTTPConnection, loglevel=logging.DEBUG) -> bool:
"""Tries to set the net.peer.ip semantic attribute on the current span from the given
HttpConnection.
Returns False if the connection is not yet established, False if the IP was captured
or there is no need to capture it.
"""
state = _getstate()
if not state:
return True
spanlist = state.get("need_ip") # type: typing.List[Span]
if not spanlist:
return True
# Remove all non-recording spans from the list.
if not _remove_nonrecording(spanlist):
return True
sock = "<property not accessed>"
try:
sock = conn.sock # type: typing.Optional[socket.socket]
logger.debug("Got socket: %s", sock)
if sock is None:
return False
addr = sock.getpeername()
if addr and addr[0]:
ip = addr[0]
except Exception: # pylint:disable=broad-except
logger.log(
loglevel,
"Failed to get peer address from %s",
sock,
exc_info=True,
stack_info=True,
)
else:
for span in spanlist:
span.set_attribute(SpanAttributes.NET_PEER_IP, ip)
return True
def _instrumented_connect(
wrapped, instance: http.client.HTTPConnection, args, kwargs
):
result = wrapped(*args, **kwargs)
trysetip(instance, loglevel=logging.WARNING)
return result
def instrument_connect(module, name="connect"):
"""Instrument additional connect() methods, e.g. for derived classes."""
wrapt.wrap_function_wrapper(
module, name, _instrumented_connect,
)
def _instrument():
def instrumented_send(
wrapped, instance: http.client.HTTPConnection, args, kwargs
):
done = trysetip(instance)
result = wrapped(*args, **kwargs)
if not done:
trysetip(instance, loglevel=logging.WARNING)
return result
wrapt.wrap_function_wrapper(
http.client.HTTPConnection, "send", instrumented_send,
)
instrument_connect(http.client.HTTPConnection)
# No need to instrument HTTPSConnection, as it calls super().connect()
def _getstate() -> typing.Optional[dict]:
return context.get_value(_STATE_KEY)
@contextlib.contextmanager
def set_ip_on_next_http_connection(span: Span):
state = _getstate()
if not state:
token = context.attach(
context.set_value(_STATE_KEY, {"need_ip": [span]})
)
try:
yield
finally:
context.detach(token)
else:
spans = state["need_ip"] # type: typing.List[Span]
spans.append(span)
try:
yield
finally:
try:
spans.remove(span)
except ValueError: # Span might have become non-recording
pass
def _uninstrument():
unwrap(http.client.HTTPConnection, "send")
unwrap(http.client.HTTPConnection, "connect")

View File

@ -0,0 +1,133 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from http.client import HTTPConnection, HTTPResponse, HTTPSConnection
from opentelemetry import trace
from opentelemetry.test.httptest import HttpTestBase
from opentelemetry.test.test_base import TestBase
from opentelemetry.trace.span import INVALID_SPAN
from opentelemetry.util.http.httplib import (
HttpClientInstrumentor,
set_ip_on_next_http_connection,
)
# pylint: disable=too-many-public-methods
class TestHttpBase(TestBase, HttpTestBase):
def setUp(self):
super().setUp()
HttpClientInstrumentor().instrument()
self.server_thread, self.server = self.run_server()
def tearDown(self):
HttpClientInstrumentor().uninstrument()
self.server.shutdown()
self.server_thread.join()
super().tearDown()
def assert_span(self, exporter=None, num_spans=1):
if exporter is None:
exporter = self.memory_exporter
span_list = exporter.get_finished_spans()
self.assertEqual(num_spans, len(span_list))
if num_spans == 0:
return None
if num_spans == 1:
return span_list[0]
return span_list
def test_basic(self):
resp, body = self.perform_request()
assert resp.status == 200
assert body == b"Hello!"
self.assert_span(num_spans=0)
def test_basic_with_span(self):
tracer = trace.get_tracer(__name__)
with tracer.start_as_current_span(
"HTTP GET"
) as span, set_ip_on_next_http_connection(span):
resp, body = self.perform_request()
assert resp.status == 200
assert body == b"Hello!"
span = self.assert_span(num_spans=1)
self.assertEqual(span.attributes, {"net.peer.ip": "127.0.0.1"})
def test_with_nested_span(self):
tracer = trace.get_tracer(__name__)
with tracer.start_as_current_span(
"requests HTTP GET"
) as span, set_ip_on_next_http_connection(span):
with tracer.start_as_current_span(
"urllib3 HTTP GET"
) as span2, set_ip_on_next_http_connection(span2):
resp, body = self.perform_request()
assert resp.status == 200
assert body == b"Hello!"
for span in self.assert_span(num_spans=2):
self.assertEqual(span.attributes, {"net.peer.ip": "127.0.0.1"})
def test_with_nested_nonrecording_span(self):
tracer = trace.get_tracer(__name__)
with tracer.start_as_current_span(
"requests HTTP GET"
) as span, set_ip_on_next_http_connection(span):
with trace.use_span(INVALID_SPAN), set_ip_on_next_http_connection(
INVALID_SPAN
):
resp, body = self.perform_request()
assert resp.status == 200
assert body == b"Hello!"
span = self.assert_span(num_spans=1)
self.assertEqual(span.attributes, {"net.peer.ip": "127.0.0.1"})
def test_with_only_nonrecording_span(self):
with trace.use_span(INVALID_SPAN), set_ip_on_next_http_connection(
INVALID_SPAN
):
resp, body = self.perform_request()
assert resp.status == 200
assert body == b"Hello!"
self.assert_span(num_spans=0)
def perform_request(self, secure=False) -> HTTPResponse:
conn_cls = HTTPSConnection if secure else HTTPConnection
conn = conn_cls(self.server.server_address[0], self.server.server_port)
resp = None
try:
conn.request("GET", "/", headers={"Connection": "close"})
resp = conn.getresponse()
return resp, resp.read()
finally:
if resp:
resp.close()
conn.close()
def test_uninstrument(self):
HttpClientInstrumentor().uninstrument()
tracer = trace.get_tracer(__name__)
with tracer.start_as_current_span(
"HTTP GET"
) as span, set_ip_on_next_http_connection(span):
body = self.perform_request()[1]
self.assertEqual(b"Hello!", body)
# We should have a span, but it should have no attributes
self.assertFalse(self.assert_span(num_spans=1).attributes)
# instrument again to avoid warning message on tearDown
HttpClientInstrumentor().instrument()