Feature/add boto3 sqs instrumentation (#1081)
* Add Boto3SQS Instrumentation * Add basic tests * Add context setting list * Fix linting * CR and lint fixes * Add newline * Run tox generate * Change the dependency version * Fix linting * PR fixes. Remove walrus operator and use the `start_as_current_span` * Run lint and generate Co-authored-by: Srikanth Chekuri <srikanth.chekuri92@gmail.com> Co-authored-by: Diego Hurtado <ocelotl@users.noreply.github.com>
This commit is contained in:
parent
a5ec3f7f55
commit
8a7a3f1993
|
|
@ -7,6 +7,10 @@ components:
|
|||
- oxeye-nikolay
|
||||
- nikosokolik
|
||||
|
||||
instrumentation/opentelemetry-instrumentation-boto3sqs:
|
||||
- oxeye-nikolay
|
||||
- nikosokolik
|
||||
|
||||
propagator/opentelemetry-propagator-aws-xray:
|
||||
- NathanielRN
|
||||
|
||||
|
|
|
|||
|
|
@ -28,6 +28,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||
([#1065](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1065))
|
||||
- `opentelemetry-instrumentation-redis` now instruments asynchronous Redis clients, if the installed redis-py includes async support (>=4.2.0).
|
||||
([#1076](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1076))
|
||||
- `opentelemetry-instrumentation-boto3sqs` added AWS's SQS instrumentation.
|
||||
([#1081](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1081))
|
||||
|
||||
## [1.11.1-0.30b1](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.11.1-0.30b1) - 2022-04-21
|
||||
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@
|
|||
| [opentelemetry-instrumentation-asyncpg](./opentelemetry-instrumentation-asyncpg) | asyncpg >= 0.12.0 |
|
||||
| [opentelemetry-instrumentation-aws-lambda](./opentelemetry-instrumentation-aws-lambda) | aws_lambda |
|
||||
| [opentelemetry-instrumentation-boto](./opentelemetry-instrumentation-boto) | boto~=2.0 |
|
||||
| [opentelemetry-instrumentation-boto3sqs](./opentelemetry-instrumentation-boto3sqs) | boto3 ~= 1.0 |
|
||||
| [opentelemetry-instrumentation-botocore](./opentelemetry-instrumentation-botocore) | botocore ~= 1.0 |
|
||||
| [opentelemetry-instrumentation-celery](./opentelemetry-instrumentation-celery) | celery >= 4.0, < 6.0 |
|
||||
| [opentelemetry-instrumentation-dbapi](./opentelemetry-instrumentation-dbapi) | dbapi |
|
||||
|
|
|
|||
|
|
@ -0,0 +1,23 @@
|
|||
OpenTelemetry Boto3 SQS Instrumentation
|
||||
=======================================
|
||||
|
||||
|pypi|
|
||||
|
||||
.. |pypi| image:: https://badge.fury.io/py/opentelemetry-instrumentation-boto3sqs.svg
|
||||
:target: https://pypi.org/project/opentelemetry-instrumentation-boto3sqs/
|
||||
|
||||
This library allows tracing requests made by the Boto3 library to the SQS service.
|
||||
|
||||
Installation
|
||||
------------
|
||||
|
||||
::
|
||||
|
||||
pip install opentelemetry-instrumentation-boto3sqs
|
||||
|
||||
|
||||
References
|
||||
----------
|
||||
|
||||
* `OpenTelemetry boto3sqs/ Tracing <https://opentelemetry-python-contrib.readthedocs.io/en/latest/instrumentation/boto3sqs/boto3sqs.html>`_
|
||||
* `OpenTelemetry Project <https://opentelemetry.io/>`_
|
||||
|
|
@ -0,0 +1,59 @@
|
|||
# Copyright The OpenTelemetry Authors
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
[metadata]
|
||||
# opentelemetry-instrumentation plus the name of the library being instrument e.g
|
||||
# name = opentelemetry-instrumentation-sqlalchemy
|
||||
name = opentelemetry-instrumentation-boto3sqs
|
||||
# a description of the instrumentation e.g
|
||||
# description = SQLAlchemy tracing for OpenTelemetry
|
||||
description = Boto3 SQS service tracing for OpenTelemetry
|
||||
long_description = file: README.rst
|
||||
long_description_content_type = text/x-rst
|
||||
author = OpenTelemetry Authors
|
||||
author_email = cncf-opentelemetry-contributors@lists.cncf.io
|
||||
# url of the instrumentation e.g
|
||||
url = https://github.com/open-telemetry/opentelemetry-python-contrib/instrumentation/opentelemetry-instrumentation-boto3sqs
|
||||
platforms = any
|
||||
license = Apache-2.0
|
||||
classifiers =
|
||||
Development Status :: 4 - Beta
|
||||
Intended Audience :: Developers
|
||||
License :: OSI Approved :: Apache Software License
|
||||
Programming Language :: Python
|
||||
Programming Language :: Python :: 3
|
||||
Programming Language :: Python :: 3.6
|
||||
Programming Language :: Python :: 3.7
|
||||
Programming Language :: Python :: 3.8
|
||||
Programming Language :: Python :: 3.9
|
||||
Programming Language :: Python :: 3.10
|
||||
|
||||
[options]
|
||||
python_requires = >=3.6
|
||||
package_dir=
|
||||
=src
|
||||
packages=find_namespace:
|
||||
install_requires =
|
||||
opentelemetry-api ~= 1.3
|
||||
wrapt >= 1.0.0, < 2.0.0
|
||||
|
||||
[options.extras_require]
|
||||
test =
|
||||
|
||||
[options.packages.find]
|
||||
where = src
|
||||
|
||||
[options.entry_points]
|
||||
opentelemetry_instrumentor =
|
||||
boto3sqs = opentelemetry.instrumentation.boto3sqs:Boto3SQSInstrumentation
|
||||
|
|
@ -0,0 +1,99 @@
|
|||
# Copyright The OpenTelemetry Authors
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
# DO NOT EDIT. THIS FILE WAS AUTOGENERATED FROM templates/instrumentation_setup.py.txt.
|
||||
# RUN `python scripts/generate_setup.py` TO REGENERATE.
|
||||
|
||||
|
||||
import distutils.cmd
|
||||
import json
|
||||
import os
|
||||
from configparser import ConfigParser
|
||||
|
||||
import setuptools
|
||||
|
||||
config = ConfigParser()
|
||||
config.read("setup.cfg")
|
||||
|
||||
# We provide extras_require parameter to setuptools.setup later which
|
||||
# overwrites the extras_require section from setup.cfg. To support extras_require
|
||||
# section in setup.cfg, we load it here and merge it with the extras_require param.
|
||||
extras_require = {}
|
||||
if "options.extras_require" in config:
|
||||
for key, value in config["options.extras_require"].items():
|
||||
extras_require[key] = [v for v in value.split("\n") if v.strip()]
|
||||
|
||||
BASE_DIR = os.path.dirname(__file__)
|
||||
PACKAGE_INFO = {}
|
||||
|
||||
VERSION_FILENAME = os.path.join(
|
||||
BASE_DIR,
|
||||
"src",
|
||||
"opentelemetry",
|
||||
"instrumentation",
|
||||
"boto3sqs",
|
||||
"version.py",
|
||||
)
|
||||
with open(VERSION_FILENAME, encoding="utf-8") as f:
|
||||
exec(f.read(), PACKAGE_INFO)
|
||||
|
||||
PACKAGE_FILENAME = os.path.join(
|
||||
BASE_DIR,
|
||||
"src",
|
||||
"opentelemetry",
|
||||
"instrumentation",
|
||||
"boto3sqs",
|
||||
"package.py",
|
||||
)
|
||||
with open(PACKAGE_FILENAME, encoding="utf-8") as f:
|
||||
exec(f.read(), PACKAGE_INFO)
|
||||
|
||||
# Mark any instruments/runtime dependencies as test dependencies as well.
|
||||
extras_require["instruments"] = PACKAGE_INFO["_instruments"]
|
||||
test_deps = extras_require.get("test", [])
|
||||
for dep in extras_require["instruments"]:
|
||||
test_deps.append(dep)
|
||||
|
||||
extras_require["test"] = test_deps
|
||||
|
||||
|
||||
class JSONMetadataCommand(distutils.cmd.Command):
|
||||
|
||||
description = (
|
||||
"print out package metadata as JSON. This is used by OpenTelemetry dev scripts to ",
|
||||
"auto-generate code in other places",
|
||||
)
|
||||
user_options = []
|
||||
|
||||
def initialize_options(self):
|
||||
pass
|
||||
|
||||
def finalize_options(self):
|
||||
pass
|
||||
|
||||
def run(self):
|
||||
metadata = {
|
||||
"name": config["metadata"]["name"],
|
||||
"version": PACKAGE_INFO["__version__"],
|
||||
"instruments": PACKAGE_INFO["_instruments"],
|
||||
}
|
||||
print(json.dumps(metadata))
|
||||
|
||||
|
||||
setuptools.setup(
|
||||
cmdclass={"meta": JSONMetadataCommand},
|
||||
version=PACKAGE_INFO["__version__"],
|
||||
extras_require=extras_require,
|
||||
)
|
||||
|
|
@ -0,0 +1,426 @@
|
|||
# Copyright The OpenTelemetry Authors
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""
|
||||
Instrument `boto3sqs`_ to trace SQS applications.
|
||||
|
||||
.. _boto3sqs: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs.html
|
||||
|
||||
|
||||
Usage
|
||||
-----
|
||||
|
||||
.. code:: python
|
||||
|
||||
import boto3
|
||||
from opentelemetry.instrumentation.boto3sqs import Boto3SQSInstrumentor
|
||||
|
||||
|
||||
Boto3SQSInstrumentor().instrument()
|
||||
"""
|
||||
import logging
|
||||
from typing import Any, Collection, Dict, Generator, List, Optional
|
||||
|
||||
import boto3
|
||||
import botocore.client
|
||||
from wrapt import wrap_function_wrapper
|
||||
|
||||
from opentelemetry import context, propagate, trace
|
||||
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
|
||||
from opentelemetry.instrumentation.utils import (
|
||||
_SUPPRESS_INSTRUMENTATION_KEY,
|
||||
unwrap,
|
||||
)
|
||||
from opentelemetry.propagators.textmap import CarrierT, Getter, Setter
|
||||
from opentelemetry.semconv.trace import (
|
||||
MessagingDestinationKindValues,
|
||||
MessagingOperationValues,
|
||||
SpanAttributes,
|
||||
)
|
||||
from opentelemetry.trace import Link, Span, SpanKind, Tracer, TracerProvider
|
||||
|
||||
from .package import _instruments
|
||||
from .version import __version__
|
||||
|
||||
_logger = logging.getLogger(__name__)
|
||||
# We use this prefix so we can request all instrumentation MessageAttributeNames with a wildcard, without harming
|
||||
# existing filters
|
||||
_OPENTELEMETRY_ATTRIBUTE_IDENTIFIER: str = "otel."
|
||||
_OTEL_IDENTIFIER_LENGTH = len(_OPENTELEMETRY_ATTRIBUTE_IDENTIFIER)
|
||||
|
||||
|
||||
class Boto3SQSGetter(Getter):
|
||||
def get(self, carrier: CarrierT, key: str) -> Optional[List[str]]:
|
||||
value = carrier.get(f"{_OPENTELEMETRY_ATTRIBUTE_IDENTIFIER}{key}", {})
|
||||
if not value:
|
||||
return None
|
||||
return [value.get("StringValue")]
|
||||
|
||||
def keys(self, carrier: CarrierT) -> List[str]:
|
||||
return [
|
||||
key[_OTEL_IDENTIFIER_LENGTH:]
|
||||
if key.startswith(_OPENTELEMETRY_ATTRIBUTE_IDENTIFIER)
|
||||
else key
|
||||
for key in carrier.keys()
|
||||
]
|
||||
|
||||
|
||||
class Boto3SQSSetter(Setter):
|
||||
def set(self, carrier: CarrierT, key: str, value: str) -> None:
|
||||
# This is a limitation defined by AWS for SQS MessageAttributes size
|
||||
if len(carrier.items()) < 10:
|
||||
carrier[f"{_OPENTELEMETRY_ATTRIBUTE_IDENTIFIER}{key}"] = {
|
||||
"StringValue": value,
|
||||
"DataType": "String",
|
||||
}
|
||||
else:
|
||||
_logger.warning(
|
||||
"Boto3 SQS instrumentation: cannot set context propagation on SQS/SNS message due to maximum amount of "
|
||||
"MessageAttributes"
|
||||
)
|
||||
|
||||
|
||||
boto3sqs_getter = Boto3SQSGetter()
|
||||
boto3sqs_setter = Boto3SQSSetter()
|
||||
|
||||
|
||||
# pylint: disable=attribute-defined-outside-init
|
||||
class Boto3SQSInstrumentor(BaseInstrumentor):
|
||||
received_messages_spans: Dict[str, Span] = {}
|
||||
current_span_related_to_token: Span = None
|
||||
current_context_token = None
|
||||
|
||||
class ContextableList(list):
|
||||
"""
|
||||
Since the classic way to process SQS messages is using a `for` loop, without a well defined scope like a
|
||||
callback - we are doing something similar to the instrumentaiton of Kafka-python and instrumenting the
|
||||
`__iter__` functions and the `__getitem__` functions to set the span context of the addressed message. Since
|
||||
the return value from an `SQS.ReceiveMessage` returns a builtin list, we cannot wrap it and change all of the
|
||||
calls for `list.__iter__` and `list.__getitem__` - therefore we use ContextableList. It is bound to the
|
||||
received_messages_spans dict
|
||||
"""
|
||||
|
||||
def __getitem__(self, key: int) -> Any:
|
||||
retval = super(
|
||||
Boto3SQSInstrumentor.ContextableList, self
|
||||
).__getitem__(key)
|
||||
if not isinstance(retval, dict):
|
||||
return retval
|
||||
receipt_handle = retval.get("ReceiptHandle")
|
||||
if not receipt_handle:
|
||||
return retval
|
||||
started_span = Boto3SQSInstrumentor.received_messages_spans.get(
|
||||
receipt_handle
|
||||
)
|
||||
if started_span is None:
|
||||
return retval
|
||||
if Boto3SQSInstrumentor.current_context_token:
|
||||
context.detach(Boto3SQSInstrumentor.current_context_token)
|
||||
Boto3SQSInstrumentor.current_context_token = context.attach(
|
||||
trace.set_span_in_context(started_span)
|
||||
)
|
||||
Boto3SQSInstrumentor.current_span_related_to_token = started_span
|
||||
return retval
|
||||
|
||||
def __iter__(self) -> Generator:
|
||||
index = 0
|
||||
while index < len(self):
|
||||
yield self[index]
|
||||
index = index + 1
|
||||
|
||||
def instrumentation_dependencies(self) -> Collection[str]:
|
||||
return _instruments
|
||||
|
||||
@staticmethod
|
||||
def _enrich_span(
|
||||
span: Span,
|
||||
queue_name: str,
|
||||
conversation_id: Optional[str] = None,
|
||||
operation: Optional[MessagingOperationValues] = None,
|
||||
message_id: Optional[str] = None,
|
||||
) -> None:
|
||||
if not span.is_recording():
|
||||
return
|
||||
span.set_attribute(SpanAttributes.MESSAGING_SYSTEM, "aws.sqs")
|
||||
span.set_attribute(SpanAttributes.MESSAGING_DESTINATION, queue_name)
|
||||
span.set_attribute(
|
||||
SpanAttributes.MESSAGING_DESTINATION_KIND,
|
||||
MessagingDestinationKindValues.QUEUE.value,
|
||||
)
|
||||
if operation:
|
||||
span.set_attribute(
|
||||
SpanAttributes.MESSAGING_OPERATION, operation.value
|
||||
)
|
||||
else:
|
||||
span.set_attribute(SpanAttributes.MESSAGING_TEMP_DESTINATION, True)
|
||||
if conversation_id:
|
||||
span.set_attribute(
|
||||
SpanAttributes.MESSAGING_CONVERSATION_ID, conversation_id
|
||||
)
|
||||
if message_id:
|
||||
span.set_attribute(SpanAttributes.MESSAGING_MESSAGE_ID, message_id)
|
||||
|
||||
@staticmethod
|
||||
def _safe_end_processing_span(receipt_handle: str) -> None:
|
||||
started_span: Span = Boto3SQSInstrumentor.received_messages_spans.pop(
|
||||
receipt_handle, None
|
||||
)
|
||||
if started_span:
|
||||
if (
|
||||
Boto3SQSInstrumentor.current_span_related_to_token
|
||||
== started_span
|
||||
):
|
||||
context.detach(Boto3SQSInstrumentor.current_context_token)
|
||||
Boto3SQSInstrumentor.current_context_token = None
|
||||
started_span.end()
|
||||
|
||||
@staticmethod
|
||||
def _extract_queue_name_from_url(queue_url: str) -> str:
|
||||
# A Queue name cannot have the `/` char, therefore we can return the part after the last /
|
||||
return queue_url.split("/")[-1]
|
||||
|
||||
def _create_processing_span(
|
||||
self, queue_name: str, receipt_handle: str, message: Dict[str, Any]
|
||||
) -> None:
|
||||
message_attributes = message.get("MessageAttributes", {})
|
||||
links = []
|
||||
ctx = propagate.extract(message_attributes, getter=boto3sqs_getter)
|
||||
if ctx:
|
||||
for item in ctx.values():
|
||||
if hasattr(item, "get_span_context"):
|
||||
links.append(Link(context=item.get_span_context()))
|
||||
span = self._tracer.start_span(
|
||||
name=f"{queue_name} process", links=links, kind=SpanKind.CONSUMER
|
||||
)
|
||||
with trace.use_span(span):
|
||||
message_id = message.get("MessageId")
|
||||
Boto3SQSInstrumentor.received_messages_spans[receipt_handle] = span
|
||||
Boto3SQSInstrumentor._enrich_span(
|
||||
span,
|
||||
queue_name,
|
||||
message_id=message_id,
|
||||
operation=MessagingOperationValues.PROCESS,
|
||||
)
|
||||
|
||||
def _wrap_send_message(self) -> None:
|
||||
def send_wrapper(wrapped, instance, args, kwargs):
|
||||
if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY):
|
||||
return wrapped(*args, **kwargs)
|
||||
queue_url = kwargs.get("QueueUrl")
|
||||
# The method expect QueueUrl and Entries params, so if they are None, we call wrapped to receive the
|
||||
# original exception
|
||||
queue_name = Boto3SQSInstrumentor._extract_queue_name_from_url(
|
||||
queue_url
|
||||
)
|
||||
with self._tracer.start_as_current_span(
|
||||
name=f"{queue_name} send",
|
||||
kind=SpanKind.PRODUCER,
|
||||
end_on_exit=True,
|
||||
) as span:
|
||||
Boto3SQSInstrumentor._enrich_span(span, queue_name)
|
||||
attributes = kwargs.pop("MessageAttributes", {})
|
||||
propagate.inject(attributes, setter=boto3sqs_setter)
|
||||
retval = wrapped(*args, MessageAttributes=attributes, **kwargs)
|
||||
message_id = retval.get("MessageId")
|
||||
if message_id:
|
||||
if span.is_recording():
|
||||
span.set_attribute(
|
||||
SpanAttributes.MESSAGING_MESSAGE_ID, message_id
|
||||
)
|
||||
return retval
|
||||
|
||||
wrap_function_wrapper(self._sqs_class, "send_message", send_wrapper)
|
||||
|
||||
def _wrap_send_message_batch(self) -> None:
|
||||
def send_batch_wrapper(wrapped, instance, args, kwargs):
|
||||
queue_url = kwargs.get("QueueUrl")
|
||||
entries = kwargs.get("Entries")
|
||||
# The method expect QueueUrl and Entries params, so if they are None, we call wrapped to receive the
|
||||
# original exception
|
||||
if (
|
||||
context.get_value(_SUPPRESS_INSTRUMENTATION_KEY)
|
||||
or not queue_url
|
||||
or not entries
|
||||
):
|
||||
return wrapped(*args, **kwargs)
|
||||
queue_name = Boto3SQSInstrumentor._extract_queue_name_from_url(
|
||||
queue_url
|
||||
)
|
||||
ids_to_spans: Dict[str, Span] = {}
|
||||
for entry in entries:
|
||||
entry_id = entry["Id"]
|
||||
span = self._tracer.start_span(
|
||||
name=f"{queue_name} send",
|
||||
kind=SpanKind.PRODUCER,
|
||||
)
|
||||
ids_to_spans[entry_id] = span
|
||||
Boto3SQSInstrumentor._enrich_span(
|
||||
span, queue_name, conversation_id=entry_id
|
||||
)
|
||||
with trace.use_span(span):
|
||||
if "MessageAttributes" not in entry:
|
||||
entry["MessageAttributes"] = {}
|
||||
propagate.inject(
|
||||
entry["MessageAttributes"], setter=boto3sqs_setter
|
||||
)
|
||||
retval = wrapped(*args, **kwargs)
|
||||
for successful_messages in retval["Successful"]:
|
||||
message_identifier = successful_messages["Id"]
|
||||
message_span = ids_to_spans.get(message_identifier)
|
||||
if message_span:
|
||||
if message_span.is_recording():
|
||||
message_span.set_attribute(
|
||||
SpanAttributes.MESSAGING_MESSAGE_ID,
|
||||
successful_messages.get("MessageId"),
|
||||
)
|
||||
for span in ids_to_spans.values():
|
||||
span.end()
|
||||
return retval
|
||||
|
||||
wrap_function_wrapper(
|
||||
self._sqs_class, "send_message_batch", send_batch_wrapper
|
||||
)
|
||||
|
||||
def _wrap_receive_message(self) -> None:
|
||||
def receive_message_wrapper(wrapped, instance, args, kwargs):
|
||||
queue_url = kwargs.get("QueueUrl")
|
||||
message_attribute_names = kwargs.pop("MessageAttributeNames", [])
|
||||
message_attribute_names.append(
|
||||
f"{_OPENTELEMETRY_ATTRIBUTE_IDENTIFIER}*"
|
||||
)
|
||||
queue_name = Boto3SQSInstrumentor._extract_queue_name_from_url(
|
||||
queue_url
|
||||
)
|
||||
with self._tracer.start_as_current_span(
|
||||
name=f"{queue_name} receive",
|
||||
end_on_exit=True,
|
||||
kind=SpanKind.CONSUMER,
|
||||
) as span:
|
||||
Boto3SQSInstrumentor._enrich_span(
|
||||
span,
|
||||
queue_name,
|
||||
operation=MessagingOperationValues.RECEIVE,
|
||||
)
|
||||
retval = wrapped(
|
||||
*args,
|
||||
MessageAttributeNames=message_attribute_names,
|
||||
**kwargs,
|
||||
)
|
||||
messages = retval.get("Messages", [])
|
||||
if not messages:
|
||||
return retval
|
||||
for message in messages:
|
||||
receipt_handle = message.get("ReceiptHandle")
|
||||
if not receipt_handle:
|
||||
continue
|
||||
Boto3SQSInstrumentor._safe_end_processing_span(
|
||||
receipt_handle
|
||||
)
|
||||
self._create_processing_span(
|
||||
queue_name, receipt_handle, message
|
||||
)
|
||||
retval["Messages"] = Boto3SQSInstrumentor.ContextableList(
|
||||
messages
|
||||
)
|
||||
return retval
|
||||
|
||||
wrap_function_wrapper(
|
||||
self._sqs_class, "receive_message", receive_message_wrapper
|
||||
)
|
||||
|
||||
def _wrap_delete_message(self) -> None:
|
||||
def delete_message_wrapper(wrapped, instance, args, kwargs):
|
||||
receipt_handle = kwargs.get("ReceiptHandle")
|
||||
if receipt_handle:
|
||||
Boto3SQSInstrumentor._safe_end_processing_span(receipt_handle)
|
||||
return wrapped(*args, **kwargs)
|
||||
|
||||
wrap_function_wrapper(
|
||||
self._sqs_class, "delete_message", delete_message_wrapper
|
||||
)
|
||||
|
||||
def _wrap_delete_message_batch(self) -> None:
|
||||
def delete_message_wrapper_batch(wrapped, instance, args, kwargs):
|
||||
entries = kwargs.get("Entries")
|
||||
for entry in entries:
|
||||
receipt_handle = entry.get("ReceiptHandle")
|
||||
if receipt_handle:
|
||||
Boto3SQSInstrumentor._safe_end_processing_span(
|
||||
receipt_handle
|
||||
)
|
||||
return wrapped(*args, **kwargs)
|
||||
|
||||
wrap_function_wrapper(
|
||||
self._sqs_class,
|
||||
"delete_message_batch",
|
||||
delete_message_wrapper_batch,
|
||||
)
|
||||
|
||||
def _wrap_client_creation(self) -> None:
|
||||
"""
|
||||
Since botocore creates classes on the fly using schemas, the SQS class is not necesraily created upon the call
|
||||
of `instrument()`. Therefore we need to wrap the creation of the boto3 client, which triggers the creation of
|
||||
the SQS client.
|
||||
"""
|
||||
|
||||
def client_wrapper(wrapped, instance, args, kwargs):
|
||||
retval = wrapped(*args, **kwargs)
|
||||
if not self._did_decorate:
|
||||
self._decorate_sqs()
|
||||
return retval
|
||||
|
||||
wrap_function_wrapper(boto3, "client", client_wrapper)
|
||||
|
||||
def _decorate_sqs(self) -> None:
|
||||
"""
|
||||
Since botocore creates classes on the fly using schemas, we try to find the class that inherits from the base
|
||||
class and is SQS to wrap.
|
||||
"""
|
||||
# We define SQS client as the only client that implements send_message_batch
|
||||
sqs_class = [
|
||||
cls
|
||||
for cls in botocore.client.BaseClient.__subclasses__()
|
||||
if hasattr(cls, "send_message_batch")
|
||||
]
|
||||
if sqs_class:
|
||||
self._sqs_class = sqs_class[0]
|
||||
self._did_decorate = True
|
||||
self._wrap_send_message()
|
||||
self._wrap_send_message_batch()
|
||||
self._wrap_receive_message()
|
||||
self._wrap_delete_message()
|
||||
self._wrap_delete_message_batch()
|
||||
|
||||
def _un_decorate_sqs(self) -> None:
|
||||
if self._did_decorate:
|
||||
unwrap(self._sqs_class, "send_message")
|
||||
unwrap(self._sqs_class, "send_message_batch")
|
||||
unwrap(self._sqs_class, "receive_message")
|
||||
unwrap(self._sqs_class, "delete_message")
|
||||
unwrap(self._sqs_class, "delete_message_batch")
|
||||
self._did_decorate = False
|
||||
|
||||
def _instrument(self, **kwargs: Dict[str, Any]) -> None:
|
||||
self._did_decorate: bool = False
|
||||
self._tracer_provider: Optional[TracerProvider] = kwargs.get(
|
||||
"tracer_provider"
|
||||
)
|
||||
self._tracer: Tracer = trace.get_tracer(
|
||||
__name__, __version__, self._tracer_provider
|
||||
)
|
||||
self._wrap_client_creation()
|
||||
self._decorate_sqs()
|
||||
|
||||
def _uninstrument(self, **kwargs: Dict[str, Any]) -> None:
|
||||
unwrap(boto3, "client")
|
||||
self._un_decorate_sqs()
|
||||
|
|
@ -0,0 +1,16 @@
|
|||
# Copyright The OpenTelemetry Authors
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from typing import Collection
|
||||
|
||||
_instruments: Collection[str] = ("boto3 ~= 1.0",)
|
||||
|
|
@ -0,0 +1,15 @@
|
|||
# Copyright The OpenTelemetry Authors
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
__version__ = "0.31b0"
|
||||
|
|
@ -0,0 +1,152 @@
|
|||
# Copyright The OpenTelemetry Authors
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# pylint: disable=no-name-in-module
|
||||
|
||||
from unittest import TestCase
|
||||
|
||||
import boto3
|
||||
import botocore.client
|
||||
from wrapt import BoundFunctionWrapper, FunctionWrapper
|
||||
|
||||
from opentelemetry.instrumentation.boto3sqs import (
|
||||
_OPENTELEMETRY_ATTRIBUTE_IDENTIFIER,
|
||||
Boto3SQSGetter,
|
||||
Boto3SQSInstrumentor,
|
||||
Boto3SQSSetter,
|
||||
)
|
||||
|
||||
|
||||
# pylint: disable=attribute-defined-outside-init
|
||||
class TestBoto3SQSInstrumentor(TestCase):
|
||||
def define_sqs_mock(self) -> None:
|
||||
# pylint: disable=R0201
|
||||
class SQSClientMock(botocore.client.BaseClient):
|
||||
def send_message(self, *args, **kwargs):
|
||||
...
|
||||
|
||||
def send_message_batch(self, *args, **kwargs):
|
||||
...
|
||||
|
||||
def receive_message(self, *args, **kwargs):
|
||||
...
|
||||
|
||||
def delete_message(self, *args, **kwargs):
|
||||
...
|
||||
|
||||
def delete_message_batch(self, *args, **kwargs):
|
||||
...
|
||||
|
||||
self._boto_sqs_mock = SQSClientMock
|
||||
|
||||
def test_instrument_api_before_client_init(self) -> None:
|
||||
instrumentation = Boto3SQSInstrumentor()
|
||||
|
||||
instrumentation.instrument()
|
||||
self.assertTrue(isinstance(boto3.client, FunctionWrapper))
|
||||
instrumentation.uninstrument()
|
||||
|
||||
def test_instrument_api_after_client_init(self) -> None:
|
||||
self.define_sqs_mock()
|
||||
instrumentation = Boto3SQSInstrumentor()
|
||||
|
||||
instrumentation.instrument()
|
||||
self.assertTrue(isinstance(boto3.client, FunctionWrapper))
|
||||
self.assertTrue(
|
||||
isinstance(self._boto_sqs_mock.send_message, BoundFunctionWrapper)
|
||||
)
|
||||
self.assertTrue(
|
||||
isinstance(
|
||||
self._boto_sqs_mock.send_message_batch, BoundFunctionWrapper
|
||||
)
|
||||
)
|
||||
self.assertTrue(
|
||||
isinstance(
|
||||
self._boto_sqs_mock.receive_message, BoundFunctionWrapper
|
||||
)
|
||||
)
|
||||
self.assertTrue(
|
||||
isinstance(
|
||||
self._boto_sqs_mock.delete_message, BoundFunctionWrapper
|
||||
)
|
||||
)
|
||||
self.assertTrue(
|
||||
isinstance(
|
||||
self._boto_sqs_mock.delete_message_batch, BoundFunctionWrapper
|
||||
)
|
||||
)
|
||||
instrumentation.uninstrument()
|
||||
|
||||
|
||||
class TestBoto3SQSGetter(TestCase):
|
||||
def setUp(self) -> None:
|
||||
self.getter = Boto3SQSGetter()
|
||||
|
||||
def test_get_none(self) -> None:
|
||||
carrier = {}
|
||||
value = self.getter.get(carrier, "test")
|
||||
self.assertIsNone(value)
|
||||
|
||||
def test_get_value(self) -> None:
|
||||
key = "test"
|
||||
value = "value"
|
||||
carrier = {
|
||||
f"{_OPENTELEMETRY_ATTRIBUTE_IDENTIFIER}{key}": {
|
||||
"StringValue": value,
|
||||
"DataType": "String",
|
||||
}
|
||||
}
|
||||
val = self.getter.get(carrier, key)
|
||||
self.assertEqual(val, [value])
|
||||
|
||||
def test_keys(self):
|
||||
key1 = "test1"
|
||||
value1 = "value1"
|
||||
key2 = "test2"
|
||||
value2 = "value2"
|
||||
carrier = {
|
||||
f"{_OPENTELEMETRY_ATTRIBUTE_IDENTIFIER}{key1}": {
|
||||
"StringValue": value1,
|
||||
"DataType": "String",
|
||||
},
|
||||
key2: {"StringValue": value2, "DataType": "String"},
|
||||
}
|
||||
keys = self.getter.keys(carrier)
|
||||
self.assertEqual(keys, [key1, key2])
|
||||
|
||||
def test_keys_empty(self):
|
||||
keys = self.getter.keys({})
|
||||
self.assertEqual(keys, [])
|
||||
|
||||
|
||||
class TestBoto3SQSSetter(TestCase):
|
||||
def setUp(self) -> None:
|
||||
self.setter = Boto3SQSSetter()
|
||||
|
||||
def test_simple(self):
|
||||
original_key = "SomeHeader"
|
||||
original_value = {"NumberValue": 1, "DataType": "Number"}
|
||||
carrier = {original_key: original_value.copy()}
|
||||
key = "test"
|
||||
value = "value"
|
||||
self.setter.set(carrier, key, value)
|
||||
# Ensure the original value is not harmed
|
||||
for dict_key, dict_val in carrier[original_key].items():
|
||||
self.assertEqual(original_value[dict_key], dict_val)
|
||||
# Ensure the new key is added well
|
||||
self.assertIn(
|
||||
f"{_OPENTELEMETRY_ATTRIBUTE_IDENTIFIER}{key}", carrier.keys()
|
||||
)
|
||||
new_value = carrier[f"{_OPENTELEMETRY_ATTRIBUTE_IDENTIFIER}{key}"]
|
||||
self.assertEqual(new_value["StringValue"], value)
|
||||
|
|
@ -34,6 +34,7 @@ install_requires =
|
|||
opentelemetry-instrumentation-asyncpg==0.31b0
|
||||
opentelemetry-instrumentation-aws-lambda==0.31b0
|
||||
opentelemetry-instrumentation-boto==0.31b0
|
||||
opentelemetry-instrumentation-boto3sqs==0.31b0
|
||||
opentelemetry-instrumentation-botocore==0.31b0
|
||||
opentelemetry-instrumentation-celery==0.31b0
|
||||
opentelemetry-instrumentation-dbapi==0.31b0
|
||||
|
|
|
|||
|
|
@ -36,6 +36,10 @@ libraries = {
|
|||
"library": "boto~=2.0",
|
||||
"instrumentation": "opentelemetry-instrumentation-boto==0.31b0",
|
||||
},
|
||||
"boto3": {
|
||||
"library": "boto3 ~= 1.0",
|
||||
"instrumentation": "opentelemetry-instrumentation-boto3sqs==0.31b0",
|
||||
},
|
||||
"botocore": {
|
||||
"library": "botocore ~= 1.0",
|
||||
"instrumentation": "opentelemetry-instrumentation-botocore==0.31b0",
|
||||
|
|
|
|||
Loading…
Reference in New Issue