Fixes and Improvements to botocore instrumentation (#150)

This commit is contained in:
(Eliseo) Nathaniel Ruiz Nowell 2020-11-18 12:39:15 -08:00 committed by GitHub
parent c8904cef1a
commit fd493f446f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 339 additions and 208 deletions

View File

@ -48,13 +48,15 @@ from boto.connection import AWSAuthConnection, AWSQueryConnection
from wrapt import wrap_function_wrapper
from opentelemetry.instrumentation.boto.version import __version__
from opentelemetry.instrumentation.botocore import add_span_arg_tags, unwrap
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.sdk.trace import Resource
from opentelemetry.trace import SpanKind, get_tracer
logger = logging.getLogger(__name__)
# Per-service block list: maps an AWS service name to the flattened parameter
# keys (e.g. "params.Body") that `add_span_arg_tags` skips instead of
# recording them as span attributes.
SERVICE_PARAMS_BLOCK_LIST = {"s3": ["params.Body"]}
def _get_instance_region_name(instance):
region = getattr(instance, "region", None)
@ -201,3 +203,50 @@ class BotoInstrumentor(BaseInstrumentor):
args,
kwargs,
)
def flatten_dict(dict_, sep=".", prefix=""):
    """Flatten a nested dict into a single-level dict.

    Keys of nested dicts are joined with ``sep`` in order of nesting, so
    ``{"a": {"b": 1}}`` becomes ``{"a.b": 1}``. A value that is not a dict is
    returned wrapped as ``{prefix: value}``. Empty nested dicts contribute no
    entries.
    """
    # NOTE: This should probably be in `opentelemetry.instrumentation.utils`.
    # adapted from https://stackoverflow.com/a/19647596
    if not isinstance(dict_, dict):
        # Leaf value: the caller passed the key it lives under as `prefix`.
        return {prefix: dict_}

    flattened = {}
    for outer_key, outer_value in dict_.items():
        nested = flatten_dict(outer_value, sep, outer_key)
        for inner_key, leaf in nested.items():
            # Only prepend when there is an enclosing key to prepend.
            full_key = prefix + sep + inner_key if prefix else inner_key
            flattened[full_key] = leaf
    return flattened
def add_span_arg_tags(span, aws_service, args, args_names, args_traced):
    """Record selected call arguments as flattened attributes on ``span``.

    ``args_names`` and ``args`` are paired positionally and only the pairs
    whose name is in ``args_traced`` are kept. The kept values are flattened
    with ``flatten_dict`` and every resulting key that is not listed for
    ``aws_service`` in ``SERVICE_PARAMS_BLOCK_LIST`` is set on the span.

    Nothing is recorded when the span is not recording, or when
    ``aws_service`` is ``"kms"`` or ``"sts"``.
    """

    def truncate_arg_value(value, max_len=1024):
        """Replace ``bytes`` values longer than ``max_len`` with ``b"..."``.

        Useful for parameters like "Body" in `put_object` operations.
        """
        oversized = isinstance(value, bytes) and len(value) > max_len
        return b"..." if oversized else value

    if not span.is_recording():
        return

    # Do not trace `Key Management Service` or `Secure Token Service` API calls
    # over concerns of security leaks.
    if aws_service in {"kms", "sts"}:
        return

    traced = {
        name: value
        for name, value in zip(args_names, args)
        if name in args_traced
    }
    blocked_keys = SERVICE_PARAMS_BLOCK_LIST.get(aws_service, {})
    for param_key, value in flatten_dict(traced).items():
        if param_key not in blocked_keys:
            span.set_attribute(param_key, truncate_arg_value(value))

View File

@ -5,6 +5,8 @@
([#181](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/181))
- Make botocore instrumentation check if instrumentation has been suppressed
([#182](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/182))
- Botocore SpanKind as CLIENT and modify existing traced attributes
([#150](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/150))
## Version 0.13b0

View File

@ -49,12 +49,14 @@ API
import logging
from botocore.client import BaseClient
from botocore.exceptions import ClientError, ParamValidationError
from wrapt import ObjectProxy, wrap_function_wrapper
from opentelemetry import context as context_api
from opentelemetry import propagators
from opentelemetry.instrumentation.botocore.version import __version__
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.sdk.trace import Resource
from opentelemetry.trace import SpanKind, get_tracer
@ -70,15 +72,13 @@ def _patched_endpoint_prepare_request(wrapped, instance, args, kwargs):
class BotocoreInstrumentor(BaseInstrumentor):
"""A instrumentor for Botocore
"""An instrumentor for Botocore.
See `BaseInstrumentor`
"""
def _instrument(self, **kwargs):
# FIXME should the tracer provider be accessed via Configuration
# instead?
# pylint: disable=attribute-defined-outside-init
self._tracer = get_tracer(
__name__, __version__, kwargs.get("tracer_provider")
@ -99,137 +99,66 @@ class BotocoreInstrumentor(BaseInstrumentor):
def _uninstrument(self, **kwargs):
    """Undo the instrumentation by unwrapping ``BaseClient._make_api_call``.

    ``kwargs`` is accepted but not used here.
    """
    unwrap(BaseClient, "_make_api_call")
# pylint: disable=too-many-branches
def _patched_api_call(self, original_func, instance, args, kwargs):
if context_api.get_value("suppress_instrumentation"):
return original_func(*args, **kwargs)
endpoint_name = deep_getattr(instance, "_endpoint._endpoint_prefix")
# pylint: disable=protected-access
service_name = instance._service_model.service_name
operation_name, api_params = args
error = None
result = None
with self._tracer.start_as_current_span(
"{}.command".format(endpoint_name), kind=SpanKind.CONSUMER,
"{}".format(service_name), kind=SpanKind.CLIENT,
) as span:
if span.is_recording():
span.set_attribute("aws.operation", operation_name)
span.set_attribute("aws.region", instance.meta.region_name)
span.set_attribute("aws.service", service_name)
if "QueueUrl" in api_params:
span.set_attribute("aws.queue_url", api_params["QueueUrl"])
if "TableName" in api_params:
span.set_attribute(
"aws.table_name", api_params["TableName"]
)
operation = None
if args and span.is_recording():
operation = args[0]
span.resource = Resource(
attributes={
"endpoint": endpoint_name,
"operation": operation.lower(),
}
)
try:
result = original_func(*args, **kwargs)
except ClientError as ex:
error = ex
else:
span.resource = Resource(
attributes={"endpoint": endpoint_name}
)
add_span_arg_tags(
span,
endpoint_name,
args,
("action", "params", "path", "verb"),
{"params", "path", "verb"},
)
if error:
result = error.response
if span.is_recording():
region_name = deep_getattr(instance, "meta.region_name")
if "ResponseMetadata" in result:
metadata = result["ResponseMetadata"]
req_id = None
if "RequestId" in metadata:
req_id = metadata["RequestId"]
elif "HTTPHeaders" in metadata:
headers = metadata["HTTPHeaders"]
if "x-amzn-RequestId" in headers:
req_id = headers["x-amzn-RequestId"]
elif "x-amz-request-id" in headers:
req_id = headers["x-amz-request-id"]
elif "x-amz-id-2" in headers:
req_id = headers["x-amz-id-2"]
meta = {
"aws.agent": "botocore",
"aws.operation": operation,
"aws.region": region_name,
}
for key, value in meta.items():
span.set_attribute(key, value)
if req_id:
span.set_attribute(
"aws.request_id", req_id,
)
result = original_func(*args, **kwargs)
if "HTTPStatusCode" in metadata:
span.set_attribute(
"http.status_code", metadata["HTTPStatusCode"],
)
if span.is_recording():
span.set_attribute(
"http.status_code",
result["ResponseMetadata"]["HTTPStatusCode"],
)
span.set_attribute(
"retry_attempts",
result["ResponseMetadata"]["RetryAttempts"],
)
if error:
raise error
return result
def unwrap(obj, attr):
    """Restore ``obj.attr`` to the callable hidden behind a wrapt proxy.

    Does nothing when the attribute is missing or falsy, is not an
    ``ObjectProxy``, or has no ``__wrapped__`` attribute.
    """
    candidate = getattr(obj, attr, None)
    if not candidate:
        return
    if not isinstance(candidate, ObjectProxy):
        return
    if hasattr(candidate, "__wrapped__"):
        setattr(obj, attr, candidate.__wrapped__)
def add_span_arg_tags(span, endpoint_name, args, args_names, args_traced):
    """Record selected call arguments as flattened attributes on ``span``.

    ``args_names`` and ``args`` are paired positionally and only the pairs
    whose name is in ``args_traced`` are kept. Nested dicts are flattened into
    dotted keys; for the ``"s3"`` endpoint the ``"params.Body"`` key is
    skipped. Nothing is recorded when the span is not recording, or when
    ``endpoint_name`` is ``"kms"`` or ``"sts"``.
    """

    def truncate_arg_value(value, max_len=1024):
        """Replace ``bytes`` values longer than ``max_len`` with ``b"..."``.

        Useful for parameters like "Body" in `put_object` operations.
        """
        oversized = isinstance(value, bytes) and len(value) > max_len
        return b"..." if oversized else value

    def flatten_dict(dict_, sep=".", prefix=""):
        """Return a depth-1 dict whose keys join the nested keys with ``sep``."""
        # adapted from https://stackoverflow.com/a/19647596
        if not isinstance(dict_, dict):
            return {prefix: dict_}
        flattened = {}
        for outer_key, outer_value in dict_.items():
            nested = flatten_dict(outer_value, sep, outer_key)
            for inner_key, leaf in nested.items():
                full_key = prefix + sep + inner_key if prefix else inner_key
                flattened[full_key] = leaf
        return flattened

    if not span.is_recording():
        return
    if endpoint_name in {"kms", "sts"}:
        return

    selected = {
        name: value
        for name, value in zip(args_names, args)
        if name in args_traced
    }
    for key, value in flatten_dict(selected).items():
        if key in {"s3": ["params.Body"]}.get(endpoint_name, []):
            continue
        span.set_attribute(key, truncate_arg_value(value))
def deep_getattr(obj, attr_string, default=None):
    """Follow a dotted attribute path starting at ``obj``.

    ``attr_string`` is split on ``"."`` and each segment is looked up on the
    result of the previous one; for example ``"cluster.metadata"`` resolves to
    ``obj.cluster.metadata``. As soon as a segment is not reachable,
    ``default`` is returned instead.
    """
    # A private sentinel distinguishes "attribute missing" from an attribute
    # whose value is None (or anything else the caller might store).
    missing = object()
    target = obj
    for name in attr_string.split("."):
        target = getattr(target, name, missing)
        if target is missing:
            return default
    return target

View File

@ -17,29 +17,25 @@ from unittest.mock import Mock, patch
import botocore.session
from botocore.exceptions import ParamValidationError
from moto import ( # pylint: disable=import-error
mock_dynamodb2,
mock_ec2,
mock_kinesis,
mock_kms,
mock_lambda,
mock_s3,
mock_sqs,
mock_sts,
mock_xray,
)
from opentelemetry import propagators
from opentelemetry import trace as trace_api
from opentelemetry.context import attach, detach, set_value
from opentelemetry.instrumentation.botocore import BotocoreInstrumentor
from opentelemetry.sdk.resources import Resource
from opentelemetry.test.mock_textmap import MockTextMapPropagator
from opentelemetry.test.test_base import TestBase
def assert_span_http_status_code(span, code):
    """Assert that the span's "http.status_code" attribute equals ``code``."""
    recorded = span.attributes["http.status_code"]
    assert recorded == code, "%r != %r" % (recorded, code)
class TestBotocoreInstrumentor(TestBase):
"""Botocore integration testsuite"""
@ -66,20 +62,17 @@ class TestBotocoreInstrumentor(TestBase):
assert spans
span = spans[0]
self.assertEqual(len(spans), 1)
self.assertEqual(span.attributes["aws.agent"], "botocore")
self.assertEqual(span.attributes["aws.region"], "us-west-2")
self.assertEqual(span.attributes["aws.operation"], "DescribeInstances")
assert_span_http_status_code(span, 200)
self.assertEqual(
span.resource,
Resource(
attributes={
"endpoint": "ec2",
"operation": "describeinstances",
}
),
span.attributes,
{
"aws.operation": "DescribeInstances",
"aws.region": "us-west-2",
"aws.request_id": "fdcdcab1-ae5c-489e-9c33-4637c5dda355",
"aws.service": "ec2",
"http.status_code": 200,
},
)
self.assertEqual(span.name, "ec2.command")
self.assertEqual(span.name, "ec2")
@mock_ec2
def test_not_recording(self):
@ -117,13 +110,14 @@ class TestBotocoreInstrumentor(TestBase):
assert spans
span = spans[0]
self.assertEqual(len(spans), 2)
self.assertEqual(span.attributes["aws.operation"], "ListBuckets")
assert_span_http_status_code(span, 200)
self.assertEqual(
span.resource,
Resource(
attributes={"endpoint": "s3", "operation": "listbuckets"}
),
span.attributes,
{
"aws.operation": "ListBuckets",
"aws.region": "us-west-2",
"aws.service": "s3",
"http.status_code": 200,
},
)
# testing for span error
@ -134,10 +128,15 @@ class TestBotocoreInstrumentor(TestBase):
assert spans
span = spans[2]
self.assertEqual(
span.resource,
Resource(
attributes={"endpoint": "s3", "operation": "listobjects"}
),
span.attributes,
{
"aws.operation": "ListObjects",
"aws.region": "us-west-2",
"aws.service": "s3",
},
)
self.assertIs(
span.status.status_code, trace_api.status.StatusCode.ERROR,
)
# Comment test for issue 1088
@ -148,29 +147,42 @@ class TestBotocoreInstrumentor(TestBase):
location = {"LocationConstraint": "us-west-2"}
s3.create_bucket(Bucket="mybucket", CreateBucketConfiguration=location)
s3.put_object(**params)
s3.get_object(Bucket="mybucket", Key="foo")
spans = self.memory_exporter.get_finished_spans()
assert spans
span = spans[0]
self.assertEqual(len(spans), 2)
self.assertEqual(span.attributes["aws.operation"], "CreateBucket")
assert_span_http_status_code(span, 200)
self.assertEqual(len(spans), 3)
create_bucket_attributes = spans[0].attributes
self.assertEqual(
span.resource,
Resource(
attributes={"endpoint": "s3", "operation": "createbucket"}
),
create_bucket_attributes,
{
"aws.operation": "CreateBucket",
"aws.region": "us-west-2",
"aws.service": "s3",
"http.status_code": 200,
},
)
self.assertEqual(spans[1].attributes["aws.operation"], "PutObject")
put_object_attributes = spans[1].attributes
self.assertEqual(
spans[1].resource,
Resource(attributes={"endpoint": "s3", "operation": "putobject"}),
)
self.assertEqual(spans[1].attributes["params.Key"], str(params["Key"]))
self.assertEqual(
spans[1].attributes["params.Bucket"], str(params["Bucket"])
put_object_attributes,
{
"aws.operation": "PutObject",
"aws.region": "us-west-2",
"aws.service": "s3",
"http.status_code": 200,
},
)
self.assertTrue("params.Body" not in spans[1].attributes.keys())
get_object_attributes = spans[2].attributes
self.assertEqual(
get_object_attributes,
{
"aws.operation": "GetObject",
"aws.region": "us-west-2",
"aws.service": "s3",
"http.status_code": 200,
},
)
@mock_sqs
def test_sqs_client(self):
@ -182,14 +194,62 @@ class TestBotocoreInstrumentor(TestBase):
assert spans
span = spans[0]
self.assertEqual(len(spans), 1)
self.assertEqual(span.attributes["aws.region"], "us-east-1")
self.assertEqual(span.attributes["aws.operation"], "ListQueues")
assert_span_http_status_code(span, 200)
actual = span.attributes
self.assertRegex(actual["aws.request_id"], r"[A-Z0-9]{52}")
del actual["aws.request_id"]
self.assertEqual(
span.resource,
Resource(
attributes={"endpoint": "sqs", "operation": "listqueues"}
),
actual,
{
"aws.operation": "ListQueues",
"aws.region": "us-east-1",
"aws.service": "sqs",
"http.status_code": 200,
},
)
@mock_sqs
def test_sqs_send_message(self):
    """Check the span attributes recorded for SQS CreateQueue and SendMessage."""
    sqs = self.session.create_client("sqs", region_name="us-east-1")
    test_queue_name = "test_queue_name"

    response = sqs.create_queue(QueueName=test_queue_name)
    sqs.send_message(
        QueueUrl=response["QueueUrl"], MessageBody="Test SQS MESSAGE!"
    )

    spans = self.memory_exporter.get_finished_spans()
    assert spans
    self.assertEqual(len(spans), 2)

    # One expected attribute dict per finished span, in the order checked.
    expected_by_span = (
        {
            "aws.operation": "CreateQueue",
            "aws.region": "us-east-1",
            "aws.service": "sqs",
            "http.status_code": 200,
        },
        {
            "aws.operation": "SendMessage",
            "aws.queue_url": response["QueueUrl"],
            "aws.region": "us-east-1",
            "aws.service": "sqs",
            "http.status_code": 200,
        },
    )
    for span, expected in zip(spans, expected_by_span):
        attributes = span.attributes
        # The request id is matched by pattern rather than by value, then
        # removed so the remaining attributes can be compared exactly.
        self.assertRegex(attributes["aws.request_id"], r"[A-Z0-9]{52}")
        del attributes["aws.request_id"]
        self.assertEqual(attributes, expected)
@mock_kinesis
@ -204,14 +264,14 @@ class TestBotocoreInstrumentor(TestBase):
assert spans
span = spans[0]
self.assertEqual(len(spans), 1)
self.assertEqual(span.attributes["aws.region"], "us-east-1")
self.assertEqual(span.attributes["aws.operation"], "ListStreams")
assert_span_http_status_code(span, 200)
self.assertEqual(
span.resource,
Resource(
attributes={"endpoint": "kinesis", "operation": "liststreams"}
),
span.attributes,
{
"aws.operation": "ListStreams",
"aws.region": "us-east-1",
"aws.service": "kinesis",
"http.status_code": 200,
},
)
@mock_kinesis
@ -249,14 +309,14 @@ class TestBotocoreInstrumentor(TestBase):
assert spans
span = spans[0]
self.assertEqual(len(spans), 1)
self.assertEqual(span.attributes["aws.region"], "us-east-1")
self.assertEqual(span.attributes["aws.operation"], "ListFunctions")
assert_span_http_status_code(span, 200)
self.assertEqual(
span.resource,
Resource(
attributes={"endpoint": "lambda", "operation": "listfunctions"}
),
span.attributes,
{
"aws.operation": "ListFunctions",
"aws.region": "us-east-1",
"aws.service": "lambda",
"http.status_code": 200,
},
)
@mock_kms
@ -269,12 +329,38 @@ class TestBotocoreInstrumentor(TestBase):
assert spans
span = spans[0]
self.assertEqual(len(spans), 1)
self.assertEqual(span.attributes["aws.region"], "us-east-1")
self.assertEqual(span.attributes["aws.operation"], "ListKeys")
assert_span_http_status_code(span, 200)
self.assertEqual(
span.resource,
Resource(attributes={"endpoint": "kms", "operation": "listkeys"}),
span.attributes,
{
"aws.operation": "ListKeys",
"aws.region": "us-east-1",
"aws.service": "kms",
"http.status_code": 200,
},
)
# checking for protection on kms against security leak
self.assertTrue("params" not in span.attributes.keys())
@mock_sts
def test_sts_client(self):
sts = self.session.create_client("sts", region_name="us-east-1")
sts.get_caller_identity()
spans = self.memory_exporter.get_finished_spans()
assert spans
span = spans[0]
self.assertEqual(len(spans), 1)
self.assertEqual(
span.attributes,
{
"aws.operation": "GetCallerIdentity",
"aws.region": "us-east-1",
"aws.request_id": "c6104cbe-af31-11e0-8154-cbc7ccf896c7",
"aws.service": "sts",
"http.status_code": 200,
},
)
# checking for protection on sts against security leak
@ -299,25 +385,19 @@ class TestBotocoreInstrumentor(TestBase):
ec2.describe_instances()
spans = self.memory_exporter.get_finished_spans()
assert spans
span = spans[0]
self.assertEqual(len(spans), 1)
self.assertEqual(span.attributes["aws.agent"], "botocore")
self.assertEqual(span.attributes["aws.region"], "us-west-2")
span = spans[0]
describe_instances_attributes = spans[0].attributes
self.assertEqual(
span.attributes["aws.operation"], "DescribeInstances"
describe_instances_attributes,
{
"aws.operation": "DescribeInstances",
"aws.region": "us-west-2",
"aws.request_id": "fdcdcab1-ae5c-489e-9c33-4637c5dda355",
"aws.service": "ec2",
"http.status_code": 200,
},
)
assert_span_http_status_code(span, 200)
self.assertEqual(
span.resource,
Resource(
attributes={
"endpoint": "ec2",
"operation": "describeinstances",
}
),
)
self.assertEqual(span.name, "ec2.command")
self.assertIn(MockTextMapPropagator.TRACE_ID_KEY, headers)
self.assertEqual(
@ -345,3 +425,74 @@ class TestBotocoreInstrumentor(TestBase):
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(0, len(spans))
@mock_dynamodb2
def test_dynamodb_client(self):
    """Check the span attributes recorded for DynamoDB table and item calls."""
    ddb = self.session.create_client("dynamodb", region_name="us-west-2")
    test_table_name = "test_table_name"

    ddb.create_table(
        AttributeDefinitions=[
            {"AttributeName": "id", "AttributeType": "S"},
        ],
        KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
        ProvisionedThroughput={
            "ReadCapacityUnits": 5,
            "WriteCapacityUnits": 5,
        },
        TableName=test_table_name,
    )
    ddb.put_item(TableName=test_table_name, Item={"id": {"S": "test_key"}})
    ddb.get_item(TableName=test_table_name, Key={"id": {"S": "test_key"}})

    spans = self.memory_exporter.get_finished_spans()
    assert spans
    self.assertEqual(len(spans), 3)

    # The three spans are checked in this order; only the operation differs.
    expected_operations = ("CreateTable", "PutItem", "GetItem")
    for span, operation in zip(spans, expected_operations):
        attributes = span.attributes
        # The request id is matched by pattern rather than by value, then
        # removed so the remaining attributes can be compared exactly.
        self.assertRegex(attributes["aws.request_id"], r"[A-Z0-9]{52}")
        del attributes["aws.request_id"]
        self.assertEqual(
            attributes,
            {
                "aws.operation": operation,
                "aws.region": "us-west-2",
                "aws.service": "dynamodb",
                "aws.table_name": "test_table_name",
                "http.status_code": 200,
            },
        )