opentelemetry-python-contrib/instrumentation/opentelemetry-instrumentati.../tests/test_urllib3_ip_support.py

223 lines
8.9 KiB
Python

# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from timeit import default_timer
import urllib3
import urllib3.exceptions
from urllib3.request import encode_multipart_formdata
from opentelemetry import trace
from opentelemetry.instrumentation.urllib3 import URLLib3Instrumentor
from opentelemetry.test.httptest import HttpTestBase
from opentelemetry.test.test_base import TestBase
from opentelemetry.util.http.httplib import HttpClientInstrumentor
class TestURLLib3InstrumentorWithRealSocket(HttpTestBase, TestBase):
def setUp(self):
super().setUp()
self.assert_ip = self.server.server_address[0]
self.http_host = ":".join(map(str, self.server.server_address[:2]))
self.http_url_base = "http://" + self.http_host
self.http_url = self.http_url_base + "/status/200"
HttpClientInstrumentor().instrument()
URLLib3Instrumentor().instrument()
def tearDown(self):
super().tearDown()
HttpClientInstrumentor().uninstrument()
URLLib3Instrumentor().uninstrument()
@staticmethod
def perform_request(url: str) -> urllib3.response.HTTPResponse:
with urllib3.PoolManager() as pool:
resp = pool.request("GET", url)
resp.close()
return resp
def test_basic_http_success(self):
response = self.perform_request(self.http_url)
self.assert_success_span(response, self.http_url)
def test_basic_http_success_using_connection_pool(self):
with urllib3.HTTPConnectionPool(self.http_host, timeout=3) as pool:
response = pool.request("GET", "/status/200")
self.assert_success_span(response, self.http_url)
# Test that when re-using an existing connection, everything still works.
# Especially relevant for IP capturing.
response = pool.request("GET", "/status/200")
self.assert_success_span(response, self.http_url)
def assert_span(self, num_spans=1):
span_list = self.memory_exporter.get_finished_spans()
self.assertEqual(num_spans, len(span_list))
if num_spans == 0:
return None
self.memory_exporter.clear()
if num_spans == 1:
return span_list[0]
return span_list
def assert_success_span(
self, response: urllib3.response.HTTPResponse, url: str
):
self.assertEqual(b"Hello!", response.data)
span = self.assert_span()
self.assertIs(trace.SpanKind.CLIENT, span.kind)
self.assertEqual("HTTP GET", span.name)
attributes = {
"http.status_code": 200,
"net.peer.ip": self.assert_ip,
}
self.assertGreaterEqual(span.attributes.items(), attributes.items())
class TestURLLib3InstrumentorMetric(HttpTestBase, TestBase):
def setUp(self):
super().setUp()
self.assert_ip = self.server.server_address[0]
self.assert_port = self.server.server_address[1]
self.http_host = ":".join(map(str, self.server.server_address[:2]))
self.http_url_base = "http://" + self.http_host
self.http_url = self.http_url_base + "/status/200"
URLLib3Instrumentor().instrument(meter_provider=self.meter_provider)
def tearDown(self):
super().tearDown()
URLLib3Instrumentor().uninstrument()
def test_metric_uninstrument(self):
with urllib3.PoolManager() as pool:
pool.request("GET", self.http_url)
URLLib3Instrumentor().uninstrument()
pool.request("GET", self.http_url)
metrics_list = self.memory_metrics_reader.get_metrics_data()
for resource_metric in metrics_list.resource_metrics:
for scope_metric in resource_metric.scope_metrics:
for metric in scope_metric.metrics:
for point in list(metric.data.data_points):
self.assertEqual(point.count, 1)
def test_basic_metric_check_client_size_get(self):
with urllib3.PoolManager() as pool:
start_time = default_timer()
response = pool.request("GET", self.http_url)
client_duration_estimated = (default_timer() - start_time) * 1000
expected_attributes = {
"http.status_code": 200,
"http.host": self.assert_ip,
"http.method": "GET",
"http.flavor": "1.1",
"http.scheme": "http",
"net.peer.name": self.assert_ip,
"net.peer.port": self.assert_port,
}
expected_data = {
"http.client.request.size": 0,
"http.client.response.size": len(response.data),
}
expected_metrics = [
"http.client.duration",
"http.client.request.size",
"http.client.response.size",
]
resource_metrics = (
self.memory_metrics_reader.get_metrics_data().resource_metrics
)
for metrics in resource_metrics:
for scope_metrics in metrics.scope_metrics:
self.assertEqual(len(scope_metrics.metrics), 3)
for metric in scope_metrics.metrics:
for data_point in metric.data.data_points:
if metric.name in expected_data:
self.assertEqual(
data_point.sum, expected_data[metric.name]
)
if metric.name == "http.client.duration":
self.assertAlmostEqual(
data_point.sum,
client_duration_estimated,
delta=1000,
)
self.assertIn(metric.name, expected_metrics)
self.assertDictEqual(
expected_attributes,
dict(data_point.attributes),
)
self.assertEqual(data_point.count, 1)
def test_basic_metric_check_client_size_post(self):
with urllib3.PoolManager() as pool:
start_time = default_timer()
data_fields = {"data": "test"}
response = pool.request("POST", self.http_url, fields=data_fields)
client_duration_estimated = (default_timer() - start_time) * 1000
expected_attributes = {
"http.status_code": 501,
"http.host": self.assert_ip,
"http.method": "POST",
"http.flavor": "1.1",
"http.scheme": "http",
"net.peer.name": self.assert_ip,
"net.peer.port": self.assert_port,
}
body = encode_multipart_formdata(data_fields)[0]
expected_data = {
"http.client.request.size": len(body),
"http.client.response.size": len(response.data),
}
expected_metrics = [
"http.client.duration",
"http.client.request.size",
"http.client.response.size",
]
resource_metrics = (
self.memory_metrics_reader.get_metrics_data().resource_metrics
)
for metrics in resource_metrics:
for scope_metrics in metrics.scope_metrics:
self.assertEqual(len(scope_metrics.metrics), 3)
for metric in scope_metrics.metrics:
for data_point in metric.data.data_points:
if metric.name in expected_data:
self.assertEqual(
data_point.sum, expected_data[metric.name]
)
if metric.name == "http.client.duration":
self.assertAlmostEqual(
data_point.sum,
client_duration_estimated,
delta=1000,
)
self.assertIn(metric.name, expected_metrics)
self.assertDictEqual(
expected_attributes,
dict(data_point.attributes),
)
self.assertEqual(data_point.count, 1)