Support confluent kafka (#1111)
* add kafka instrumentation * add confluent kafka instrumentation * fix tests * change documentation * lint fix * fix lint Co-authored-by: Nikolay Sokolik <81902191+oxeye-nikolay@users.noreply.github.com>
This commit is contained in:
		
							parent
							
								
									5f7c293e0f
								
							
						
					
					
						commit
						a8b9829f76
					
				|  | @ -15,6 +15,10 @@ components: | |||
|     - ben-natan | ||||
|     - machine424 | ||||
| 
 | ||||
|   instrumentation/opentelemetry-instrumentation-confluent-kafka: | ||||
|     - oxeye-dorkolog | ||||
|     - dorkolog | ||||
| 
 | ||||
|   propagator/opentelemetry-propagator-aws-xray: | ||||
|     - NathanielRN | ||||
| 
 | ||||
|  |  | |||
|  | @ -17,6 +17,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 | |||
| - `opentelemetry-instrumentation-remoulade` Initial release | ||||
|   ([#1082](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1082)) | ||||
| 
 | ||||
| ### Added | ||||
| - Added `opentelemetry-instrumention-confluent-kafka` | ||||
|   ([#1111](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1111)) | ||||
| 
 | ||||
| 
 | ||||
| ## [1.12.0rc1-0.31b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.12.0rc1-0.31b0) - 2022-05-17 | ||||
| 
 | ||||
| ### Fixed | ||||
|  |  | |||
|  | @ -10,6 +10,7 @@ | |||
| | [opentelemetry-instrumentation-boto3sqs](./opentelemetry-instrumentation-boto3sqs) | boto3 ~= 1.0 | | ||||
| | [opentelemetry-instrumentation-botocore](./opentelemetry-instrumentation-botocore) | botocore ~= 1.0 | | ||||
| | [opentelemetry-instrumentation-celery](./opentelemetry-instrumentation-celery) | celery >= 4.0, < 6.0 | | ||||
| | [opentelemetry-instrumentation-confluent-kafka](./opentelemetry-instrumentation-confluent-kafka) | confluent-kafka ~= 1.8.2 | | ||||
| | [opentelemetry-instrumentation-dbapi](./opentelemetry-instrumentation-dbapi) | dbapi | | ||||
| | [opentelemetry-instrumentation-django](./opentelemetry-instrumentation-django) | django >= 1.10 | | ||||
| | [opentelemetry-instrumentation-elasticsearch](./opentelemetry-instrumentation-elasticsearch) | elasticsearch >= 2.0 | | ||||
|  |  | |||
|  | @ -0,0 +1,23 @@ | |||
| OpenTelemetry confluent-kafka Instrumentation | ||||
| ============================================= | ||||
| 
 | ||||
| |pypi| | ||||
| 
 | ||||
| .. |pypi| image:: https://badge.fury.io/py/opentelemetry-instrumentation-confluent-kafka.svg | ||||
|    :target: https://pypi.org/project/opentelemetry-instrumentation-confluent-kafka/ | ||||
| 
 | ||||
| This library allows tracing requests made by the confluent-kafka library. | ||||
| 
 | ||||
| Installation | ||||
| ------------ | ||||
| 
 | ||||
| :: | ||||
| 
 | ||||
|     pip install opentelemetry-instrumentation-confluent-kafka | ||||
| 
 | ||||
| 
 | ||||
| References | ||||
| ---------- | ||||
| 
 | ||||
| * `OpenTelemetry confluent-kafka/ Tracing <https://opentelemetry-python-contrib.readthedocs.io/en/latest/instrumentation/confluent-kafka/confluent-kafka.html>`_ | ||||
| * `OpenTelemetry Project <https://opentelemetry.io/>`_ | ||||
|  | @ -0,0 +1,57 @@ | |||
| # Copyright The OpenTelemetry Authors | ||||
| # | ||||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| # you may not use this file except in compliance with the License. | ||||
| # You may obtain a copy of the License at | ||||
| # | ||||
| #     http://www.apache.org/licenses/LICENSE-2.0 | ||||
| # | ||||
| # Unless required by applicable law or agreed to in writing, software | ||||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| # See the License for the specific language governing permissions and | ||||
| # limitations under the License. | ||||
| # | ||||
| [metadata] | ||||
| name = opentelemetry-instrumentation-confluent-kafka | ||||
| description = OpenTelemetry Confluent Kafka instrumentation | ||||
| long_description = file: README.rst | ||||
| long_description_content_type = text/x-rst | ||||
| author = OpenTelemetry Authors | ||||
| author_email = cncf-opentelemetry-contributors@lists.cncf.io | ||||
| url = https://github.com/open-telemetry/opentelemetry-python-contrib/tree/main/instrumentation/opentelemetry-instrumentation-confluent-kafka | ||||
| platforms = any | ||||
| license = Apache-2.0 | ||||
| classifiers = | ||||
|     Development Status :: 4 - Beta | ||||
|     Intended Audience :: Developers | ||||
|     License :: OSI Approved :: Apache Software License | ||||
|     Programming Language :: Python | ||||
|     Programming Language :: Python :: 3 | ||||
|     Programming Language :: Python :: 3.6 | ||||
|     Programming Language :: Python :: 3.7 | ||||
|     Programming Language :: Python :: 3.8 | ||||
|     Programming Language :: Python :: 3.9 | ||||
|     Programming Language :: Python :: 3.10 | ||||
| 
 | ||||
| [options] | ||||
| python_requires = >=3.6 | ||||
| package_dir= | ||||
|     =src | ||||
| packages=find_namespace: | ||||
| 
 | ||||
| install_requires = | ||||
|     opentelemetry-api ~= 1.3 | ||||
|     wrapt >= 1.0.0, < 2.0.0 | ||||
| 
 | ||||
| [options.extras_require] | ||||
| test = | ||||
|     # add any test dependencies here | ||||
|     confluent-kafka ~= 1.8.2 | ||||
| 
 | ||||
| [options.packages.find] | ||||
| where = src | ||||
| 
 | ||||
| [options.entry_points] | ||||
| opentelemetry_instrumentor = | ||||
|     confluent_kafka = opentelemetry.instrumentation.confluent_kafka:ConfluentKafkaInstrumentor | ||||
|  | @ -0,0 +1,99 @@ | |||
| # Copyright The OpenTelemetry Authors | ||||
| # | ||||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| # you may not use this file except in compliance with the License. | ||||
| # You may obtain a copy of the License at | ||||
| # | ||||
| #     http://www.apache.org/licenses/LICENSE-2.0 | ||||
| # | ||||
| # Unless required by applicable law or agreed to in writing, software | ||||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| # See the License for the specific language governing permissions and | ||||
| # limitations under the License. | ||||
| 
 | ||||
| 
 | ||||
| # DO NOT EDIT. THIS FILE WAS AUTOGENERATED FROM templates/instrumentation_setup.py.txt. | ||||
| # RUN `python scripts/generate_setup.py` TO REGENERATE. | ||||
| 
 | ||||
| 
 | ||||
| import distutils.cmd | ||||
| import json | ||||
| import os | ||||
| from configparser import ConfigParser | ||||
| 
 | ||||
| import setuptools | ||||
| 
 | ||||
| config = ConfigParser() | ||||
| config.read("setup.cfg") | ||||
| 
 | ||||
| # We provide extras_require parameter to setuptools.setup later which | ||||
| # overwrites the extras_require section from setup.cfg. To support extras_require | ||||
| # section in setup.cfg, we load it here and merge it with the extras_require param. | ||||
| extras_require = {} | ||||
| if "options.extras_require" in config: | ||||
|     for key, value in config["options.extras_require"].items(): | ||||
|         extras_require[key] = [v for v in value.split("\n") if v.strip()] | ||||
| 
 | ||||
| BASE_DIR = os.path.dirname(__file__) | ||||
| PACKAGE_INFO = {} | ||||
| 
 | ||||
| VERSION_FILENAME = os.path.join( | ||||
|     BASE_DIR, | ||||
|     "src", | ||||
|     "opentelemetry", | ||||
|     "instrumentation", | ||||
|     "confluent_kafka", | ||||
|     "version.py", | ||||
| ) | ||||
| with open(VERSION_FILENAME, encoding="utf-8") as f: | ||||
|     exec(f.read(), PACKAGE_INFO) | ||||
| 
 | ||||
| PACKAGE_FILENAME = os.path.join( | ||||
|     BASE_DIR, | ||||
|     "src", | ||||
|     "opentelemetry", | ||||
|     "instrumentation", | ||||
|     "confluent_kafka", | ||||
|     "package.py", | ||||
| ) | ||||
| with open(PACKAGE_FILENAME, encoding="utf-8") as f: | ||||
|     exec(f.read(), PACKAGE_INFO) | ||||
| 
 | ||||
| # Mark any instruments/runtime dependencies as test dependencies as well. | ||||
| extras_require["instruments"] = PACKAGE_INFO["_instruments"] | ||||
| test_deps = extras_require.get("test", []) | ||||
| for dep in extras_require["instruments"]: | ||||
|     test_deps.append(dep) | ||||
| 
 | ||||
| extras_require["test"] = test_deps | ||||
| 
 | ||||
| 
 | ||||
| class JSONMetadataCommand(distutils.cmd.Command): | ||||
| 
 | ||||
|     description = ( | ||||
|         "print out package metadata as JSON. This is used by OpenTelemetry dev scripts to ", | ||||
|         "auto-generate code in other places", | ||||
|     ) | ||||
|     user_options = [] | ||||
| 
 | ||||
|     def initialize_options(self): | ||||
|         pass | ||||
| 
 | ||||
|     def finalize_options(self): | ||||
|         pass | ||||
| 
 | ||||
|     def run(self): | ||||
|         metadata = { | ||||
|             "name": config["metadata"]["name"], | ||||
|             "version": PACKAGE_INFO["__version__"], | ||||
|             "instruments": PACKAGE_INFO["_instruments"], | ||||
|         } | ||||
|         print(json.dumps(metadata)) | ||||
| 
 | ||||
| 
 | ||||
| setuptools.setup( | ||||
|     cmdclass={"meta": JSONMetadataCommand}, | ||||
|     version=PACKAGE_INFO["__version__"], | ||||
|     extras_require=extras_require, | ||||
| ) | ||||
|  | @ -0,0 +1,360 @@ | |||
| # Copyright The OpenTelemetry Authors | ||||
| # | ||||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| # you may not use this file except in compliance with the License. | ||||
| # You may obtain a copy of the License at | ||||
| # | ||||
| #     http://www.apache.org/licenses/LICENSE-2.0 | ||||
| # | ||||
| # Unless required by applicable law or agreed to in writing, software | ||||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| # See the License for the specific language governing permissions and | ||||
| # limitations under the License. | ||||
| 
 | ||||
| """ | ||||
| Instrument `confluent-kafka-python` to report instrumentation-confluent-kafka produced and consumed messages | ||||
| 
 | ||||
| Usage | ||||
| ----- | ||||
| 
 | ||||
| ..code:: python | ||||
| 
 | ||||
|     from opentelemetry.instrumentation.confluentkafka import ConfluentKafkaInstrumentor | ||||
|     from confluent_kafka import Producer, Consumer | ||||
| 
 | ||||
|     # Instrument kafka | ||||
|     ConfluentKafkaInstrumentor().instrument() | ||||
| 
 | ||||
|     # report a span of type producer with the default settings | ||||
|     conf1 = {'bootstrap.servers': "localhost:9092"} | ||||
|     producer = Producer(conf1) | ||||
|     producer.produce('my-topic',b'raw_bytes') | ||||
| 
 | ||||
|     conf2 = {'bootstrap.servers': "localhost:9092", | ||||
|         'group.id': "foo", | ||||
|         'auto.offset.reset': 'smallest'} | ||||
|     # report a span of type consumer with the default settings | ||||
|     consumer = Consumer(conf2) | ||||
|     def basic_consume_loop(consumer, topics): | ||||
|         try: | ||||
|             consumer.subscribe(topics) | ||||
|             running = True | ||||
|             while running: | ||||
|                 msg = consumer.poll(timeout=1.0) | ||||
|                 if msg is None: continue | ||||
| 
 | ||||
|                 if msg.error(): | ||||
|                     if msg.error().code() == KafkaError._PARTITION_EOF: | ||||
|                         # End of partition event | ||||
|                         sys.stderr.write(f"{msg.topic()} [{msg.partition()}] reached end at offset {msg.offset()}}\n") | ||||
|                     elif msg.error(): | ||||
|                         raise KafkaException(msg.error()) | ||||
|                 else: | ||||
|                     msg_process(msg) | ||||
|         finally: | ||||
|             # Close down consumer to commit final offsets. | ||||
|             consumer.close() | ||||
| 
 | ||||
|     basic_consume_loop(consumer, "my-topic") | ||||
| 
 | ||||
| 
 | ||||
| The `_instrument` method accepts the following keyword args: | ||||
| tracer_provider (TracerProvider) - an optional tracer provider | ||||
| instrument_producer (Callable) - a function with extra user-defined logic to be performed before sending the message | ||||
|                           this function signature is: | ||||
|                           def instrument_producer(producer: Producer, tracer_provider=None) | ||||
| instrument_consumer (Callable) - a function with extra user-defined logic to be performed after consuming a message | ||||
|                           this function signature is: | ||||
|                           def instrument_consumer(consumer: Consumer, tracer_provider=None) | ||||
| for example: | ||||
| .. code: python | ||||
|     from opentelemetry.instrumentation.confluentkafka import ConfluentKafkaInstrumentor | ||||
|     from confluent_kafka import Producer, Consumer | ||||
| 
 | ||||
|     inst = ConfluentKafkaInstrumentor() | ||||
| 
 | ||||
|     p = confluent_kafka.Producer({'bootstrap.servers': 'localhost:29092'}) | ||||
|     c = confluent_kafka.Consumer({ | ||||
|         'bootstrap.servers': 'localhost:29092', | ||||
|         'group.id': 'mygroup', | ||||
|         'auto.offset.reset': 'earliest' | ||||
|     }) | ||||
| 
 | ||||
|     # instrument confluent kafka with produce and consume hooks | ||||
|     p = inst.instrument_producer(p, tracer_provider) | ||||
|     c = inst.instrument_consumer(c, tracer_provider=tracer_provider) | ||||
| 
 | ||||
| 
 | ||||
|     # Using kafka as normal now will automatically generate spans, | ||||
|     # including user custom attributes added from the hooks | ||||
|     conf = {'bootstrap.servers': "localhost:9092"} | ||||
|     p.produce('my-topic',b'raw_bytes') | ||||
|     msg = c.poll() | ||||
| 
 | ||||
| 
 | ||||
| API | ||||
| ___ | ||||
| """ | ||||
| from typing import Collection | ||||
| 
 | ||||
| import confluent_kafka | ||||
| import wrapt | ||||
| from confluent_kafka import Consumer, Producer | ||||
| 
 | ||||
| from opentelemetry import context, propagate, trace | ||||
| from opentelemetry.instrumentation.instrumentor import BaseInstrumentor | ||||
| from opentelemetry.instrumentation.utils import unwrap | ||||
| from opentelemetry.semconv.trace import MessagingOperationValues | ||||
| from opentelemetry.trace import Link, SpanKind, Tracer | ||||
| 
 | ||||
| from .package import _instruments | ||||
| from .utils import ( | ||||
|     KafkaPropertiesExtractor, | ||||
|     _enrich_span, | ||||
|     _get_span_name, | ||||
|     _kafka_getter, | ||||
|     _kafka_setter, | ||||
| ) | ||||
| from .version import __version__ | ||||
| 
 | ||||
| 
 | ||||
| class AutoInstrumentedProducer(Producer): | ||||
| 
 | ||||
|     # This method is deliberately implemented in order to allow wrapt to wrap this function | ||||
|     def produce( | ||||
|         self, topic, value=None, *args, **kwargs | ||||
|     ):  # pylint: disable=keyword-arg-before-vararg,useless-super-delegation | ||||
|         super().produce(topic, value, *args, **kwargs) | ||||
| 
 | ||||
| 
 | ||||
| class AutoInstrumentedConsumer(Consumer): | ||||
|     def __init__(self, config): | ||||
|         super().__init__(config) | ||||
|         self._current_consume_span = None | ||||
| 
 | ||||
|     # This method is deliberately implemented in order to allow wrapt to wrap this function | ||||
|     def poll(self, timeout=-1):  # pylint: disable=useless-super-delegation | ||||
|         return super().poll(timeout) | ||||
| 
 | ||||
| 
 | ||||
| class ProxiedProducer(Producer): | ||||
|     def __init__(self, producer: Producer, tracer: Tracer): | ||||
|         self._producer = producer | ||||
|         self._tracer = tracer | ||||
| 
 | ||||
|     def flush(self, timeout=-1): | ||||
|         self._producer.flush(timeout) | ||||
| 
 | ||||
|     def poll(self, timeout=-1): | ||||
|         self._producer.poll(timeout) | ||||
| 
 | ||||
|     def produce( | ||||
|         self, topic, value=None, *args, **kwargs | ||||
|     ):  # pylint: disable=keyword-arg-before-vararg | ||||
|         new_kwargs = kwargs.copy() | ||||
|         new_kwargs["topic"] = topic | ||||
|         new_kwargs["value"] = value | ||||
| 
 | ||||
|         return ConfluentKafkaInstrumentor.wrap_produce( | ||||
|             self._producer.produce, self, self._tracer, args, new_kwargs | ||||
|         ) | ||||
| 
 | ||||
|     def original_producer(self): | ||||
|         return self._producer | ||||
| 
 | ||||
| 
 | ||||
| class ProxiedConsumer(Consumer): | ||||
|     def __init__(self, consumer: Consumer, tracer: Tracer): | ||||
|         self._consumer = consumer | ||||
|         self._tracer = tracer | ||||
|         self._current_consume_span = None | ||||
|         self._current_context_token = None | ||||
| 
 | ||||
|     def committed(self, partitions, timeout=-1): | ||||
|         return self._consumer.committed(partitions, timeout) | ||||
| 
 | ||||
|     def consume( | ||||
|         self, num_messages=1, *args, **kwargs | ||||
|     ):  # pylint: disable=keyword-arg-before-vararg | ||||
|         return self._consumer.consume(num_messages, *args, **kwargs) | ||||
| 
 | ||||
|     def get_watermark_offsets( | ||||
|         self, partition, timeout=-1, *args, **kwargs | ||||
|     ):  # pylint: disable=keyword-arg-before-vararg | ||||
|         return self._consumer.get_watermark_offsets( | ||||
|             partition, timeout, *args, **kwargs | ||||
|         ) | ||||
| 
 | ||||
|     def offsets_for_times(self, partitions, timeout=-1): | ||||
|         return self._consumer.offsets_for_times(partitions, timeout) | ||||
| 
 | ||||
|     def poll(self, timeout=-1): | ||||
|         return ConfluentKafkaInstrumentor.wrap_poll( | ||||
|             self._consumer.poll, self, self._tracer, [timeout], {} | ||||
|         ) | ||||
| 
 | ||||
|     def subscribe( | ||||
|         self, topics, on_assign=lambda *args: None, *args, **kwargs | ||||
|     ):  # pylint: disable=keyword-arg-before-vararg | ||||
|         self._consumer.subscribe(topics, on_assign, *args, **kwargs) | ||||
| 
 | ||||
|     def original_consumer(self): | ||||
|         return self._consumer | ||||
| 
 | ||||
| 
 | ||||
| class ConfluentKafkaInstrumentor(BaseInstrumentor): | ||||
|     """An instrumentor for confluent kafka module | ||||
|     See `BaseInstrumentor` | ||||
|     """ | ||||
| 
 | ||||
|     # pylint: disable=attribute-defined-outside-init | ||||
|     @staticmethod | ||||
|     def instrument_producer( | ||||
|         producer: Producer, tracer_provider=None | ||||
|     ) -> ProxiedProducer: | ||||
|         tracer = trace.get_tracer( | ||||
|             __name__, __version__, tracer_provider=tracer_provider | ||||
|         ) | ||||
| 
 | ||||
|         manual_producer = ProxiedProducer(producer, tracer) | ||||
| 
 | ||||
|         return manual_producer | ||||
| 
 | ||||
|     @staticmethod | ||||
|     def instrument_consumer( | ||||
|         consumer: Consumer, tracer_provider=None | ||||
|     ) -> ProxiedConsumer: | ||||
|         tracer = trace.get_tracer( | ||||
|             __name__, __version__, tracer_provider=tracer_provider | ||||
|         ) | ||||
| 
 | ||||
|         manual_consumer = ProxiedConsumer(consumer, tracer) | ||||
| 
 | ||||
|         return manual_consumer | ||||
| 
 | ||||
|     @staticmethod | ||||
|     def uninstrument_producer(producer: Producer) -> Producer: | ||||
|         if isinstance(producer, ProxiedProducer): | ||||
|             return producer.original_producer() | ||||
|         return producer | ||||
| 
 | ||||
|     @staticmethod | ||||
|     def uninstrument_consumer(consumer: Consumer) -> Consumer: | ||||
|         if isinstance(consumer, ProxiedConsumer): | ||||
|             return consumer.original_consumer() | ||||
|         return consumer | ||||
| 
 | ||||
|     def instrumentation_dependencies(self) -> Collection[str]: | ||||
|         return _instruments | ||||
| 
 | ||||
|     def _instrument(self, **kwargs): | ||||
|         self._original_kafka_producer = confluent_kafka.Producer | ||||
|         self._original_kafka_consumer = confluent_kafka.Consumer | ||||
| 
 | ||||
|         confluent_kafka.Producer = AutoInstrumentedProducer | ||||
|         confluent_kafka.Consumer = AutoInstrumentedConsumer | ||||
| 
 | ||||
|         tracer_provider = kwargs.get("tracer_provider") | ||||
|         tracer = trace.get_tracer( | ||||
|             __name__, __version__, tracer_provider=tracer_provider | ||||
|         ) | ||||
| 
 | ||||
|         self._tracer = tracer | ||||
| 
 | ||||
|         def _inner_wrap_produce(func, instance, args, kwargs): | ||||
|             return ConfluentKafkaInstrumentor.wrap_produce( | ||||
|                 func, instance, self._tracer, args, kwargs | ||||
|             ) | ||||
| 
 | ||||
|         def _inner_wrap_poll(func, instance, args, kwargs): | ||||
|             return ConfluentKafkaInstrumentor.wrap_poll( | ||||
|                 func, instance, self._tracer, args, kwargs | ||||
|             ) | ||||
| 
 | ||||
|         wrapt.wrap_function_wrapper( | ||||
|             AutoInstrumentedProducer, | ||||
|             "produce", | ||||
|             _inner_wrap_produce, | ||||
|         ) | ||||
| 
 | ||||
|         wrapt.wrap_function_wrapper( | ||||
|             AutoInstrumentedConsumer, | ||||
|             "poll", | ||||
|             _inner_wrap_poll, | ||||
|         ) | ||||
| 
 | ||||
|     def _uninstrument(self, **kwargs): | ||||
|         confluent_kafka.Producer = self._original_kafka_producer | ||||
|         confluent_kafka.Consumer = self._original_kafka_consumer | ||||
| 
 | ||||
|         unwrap(AutoInstrumentedProducer, "produce") | ||||
|         unwrap(AutoInstrumentedConsumer, "poll") | ||||
| 
 | ||||
|     @staticmethod | ||||
|     def wrap_produce(func, instance, tracer, args, kwargs): | ||||
|         topic = kwargs.get("topic") | ||||
|         if not topic: | ||||
|             topic = args[0] | ||||
| 
 | ||||
|         span_name = _get_span_name("send", topic) | ||||
|         with tracer.start_as_current_span( | ||||
|             name=span_name, kind=trace.SpanKind.PRODUCER | ||||
|         ) as span: | ||||
|             headers = KafkaPropertiesExtractor.extract_produce_headers( | ||||
|                 args, kwargs | ||||
|             ) | ||||
|             if headers is None: | ||||
|                 headers = [] | ||||
|                 kwargs["headers"] = headers | ||||
| 
 | ||||
|             topic = KafkaPropertiesExtractor.extract_produce_topic(args) | ||||
|             _enrich_span( | ||||
|                 span, | ||||
|                 topic, | ||||
|                 operation=MessagingOperationValues.RECEIVE, | ||||
|             )  # Replace | ||||
|             propagate.inject( | ||||
|                 headers, | ||||
|                 setter=_kafka_setter, | ||||
|             ) | ||||
|             return func(*args, **kwargs) | ||||
| 
 | ||||
|     @staticmethod | ||||
|     def wrap_poll(func, instance, tracer, args, kwargs): | ||||
|         if instance._current_consume_span: | ||||
|             context.detach(instance._current_context_token) | ||||
|             instance._current_context_token = None | ||||
|             instance._current_consume_span.end() | ||||
|             instance._current_consume_span = None | ||||
| 
 | ||||
|         with tracer.start_as_current_span( | ||||
|             "recv", end_on_exit=True, kind=trace.SpanKind.CONSUMER | ||||
|         ): | ||||
|             record = func(*args, **kwargs) | ||||
|             if record: | ||||
|                 links = [] | ||||
|                 ctx = propagate.extract(record.headers(), getter=_kafka_getter) | ||||
|                 if ctx: | ||||
|                     for item in ctx.values(): | ||||
|                         if hasattr(item, "get_span_context"): | ||||
|                             links.append(Link(context=item.get_span_context())) | ||||
| 
 | ||||
|                 instance._current_consume_span = tracer.start_span( | ||||
|                     name=f"{record.topic()} process", | ||||
|                     links=links, | ||||
|                     kind=SpanKind.CONSUMER, | ||||
|                 ) | ||||
| 
 | ||||
|                 _enrich_span( | ||||
|                     instance._current_consume_span, | ||||
|                     record.topic(), | ||||
|                     record.partition(), | ||||
|                     record.offset(), | ||||
|                     operation=MessagingOperationValues.PROCESS, | ||||
|                 ) | ||||
|         instance._current_context_token = context.attach( | ||||
|             trace.set_span_in_context(instance._current_consume_span) | ||||
|         ) | ||||
| 
 | ||||
|         return record | ||||
|  | @ -0,0 +1,16 @@ | |||
| # Copyright The OpenTelemetry Authors | ||||
| # | ||||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| # you may not use this file except in compliance with the License. | ||||
| # You may obtain a copy of the License at | ||||
| # | ||||
| #     http://www.apache.org/licenses/LICENSE-2.0 | ||||
| # | ||||
| # Unless required by applicable law or agreed to in writing, software | ||||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| # See the License for the specific language governing permissions and | ||||
| # limitations under the License. | ||||
| 
 | ||||
| 
 | ||||
| _instruments = ("confluent-kafka ~= 1.8.2",) | ||||
|  | @ -0,0 +1,109 @@ | |||
| from logging import getLogger | ||||
| from typing import List, Optional | ||||
| 
 | ||||
| from opentelemetry.propagators import textmap | ||||
| from opentelemetry.semconv.trace import ( | ||||
|     MessagingDestinationKindValues, | ||||
|     MessagingOperationValues, | ||||
|     SpanAttributes, | ||||
| ) | ||||
| 
 | ||||
| _LOG = getLogger(__name__) | ||||
| 
 | ||||
| 
 | ||||
| class KafkaPropertiesExtractor: | ||||
|     @staticmethod | ||||
|     def extract_bootstrap_servers(instance): | ||||
|         return instance.config.get("bootstrap_servers") | ||||
| 
 | ||||
|     @staticmethod | ||||
|     def _extract_argument(key, position, default_value, args, kwargs): | ||||
|         if len(args) > position: | ||||
|             return args[position] | ||||
|         return kwargs.get(key, default_value) | ||||
| 
 | ||||
|     @staticmethod | ||||
|     def extract_produce_topic(args): | ||||
|         """extract topic from `produce` method arguments in Producer class""" | ||||
|         if len(args) > 0: | ||||
|             return args[0] | ||||
|         return "unknown" | ||||
| 
 | ||||
|     @staticmethod | ||||
|     def extract_produce_headers(args, kwargs): | ||||
|         """extract headers from `produce` method arguments in Producer class""" | ||||
|         return KafkaPropertiesExtractor._extract_argument( | ||||
|             "headers", 6, None, args, kwargs | ||||
|         ) | ||||
| 
 | ||||
| 
 | ||||
| class KafkaContextGetter(textmap.Getter): | ||||
|     def get(self, carrier: textmap.CarrierT, key: str) -> Optional[List[str]]: | ||||
|         if carrier is None: | ||||
|             return None | ||||
|         for item_key, value in carrier: | ||||
|             if item_key == key: | ||||
|                 if value is not None: | ||||
|                     return [value.decode()] | ||||
|         return None | ||||
| 
 | ||||
|     def keys(self, carrier: textmap.CarrierT) -> List[str]: | ||||
|         if carrier is None: | ||||
|             return [] | ||||
|         return [key for (key, value) in carrier] | ||||
| 
 | ||||
| 
 | ||||
| class KafkaContextSetter(textmap.Setter): | ||||
|     def set(self, carrier: textmap.CarrierT, key: str, value: str) -> None: | ||||
|         if carrier is None or key is None: | ||||
|             return | ||||
| 
 | ||||
|         if value: | ||||
|             value = value.encode() | ||||
|         carrier.append((key, value)) | ||||
| 
 | ||||
| 
 | ||||
| _kafka_getter = KafkaContextGetter() | ||||
| 
 | ||||
| 
 | ||||
| def _enrich_span( | ||||
|     span, | ||||
|     topic, | ||||
|     partition: Optional[int] = None, | ||||
|     offset: Optional[int] = None, | ||||
|     operation: Optional[MessagingOperationValues] = None, | ||||
| ): | ||||
| 
 | ||||
|     if not span.is_recording(): | ||||
|         return | ||||
| 
 | ||||
|     span.set_attribute(SpanAttributes.MESSAGING_SYSTEM, "kafka") | ||||
|     span.set_attribute(SpanAttributes.MESSAGING_DESTINATION, topic) | ||||
| 
 | ||||
|     if partition: | ||||
|         span.set_attribute(SpanAttributes.MESSAGING_KAFKA_PARTITION, partition) | ||||
| 
 | ||||
|     span.set_attribute( | ||||
|         SpanAttributes.MESSAGING_DESTINATION_KIND, | ||||
|         MessagingDestinationKindValues.QUEUE.value, | ||||
|     ) | ||||
| 
 | ||||
|     if operation: | ||||
|         span.set_attribute(SpanAttributes.MESSAGING_OPERATION, operation.value) | ||||
|     else: | ||||
|         span.set_attribute(SpanAttributes.MESSAGING_TEMP_DESTINATION, True) | ||||
| 
 | ||||
|     # https://stackoverflow.com/questions/65935155/identify-and-find-specific-message-in-kafka-topic | ||||
|     # A message within Kafka is uniquely defined by its topic name, topic partition and offset. | ||||
|     if partition and offset and topic: | ||||
|         span.set_attribute( | ||||
|             SpanAttributes.MESSAGING_MESSAGE_ID, | ||||
|             f"{topic}.{partition}.{offset}", | ||||
|         ) | ||||
| 
 | ||||
| 
 | ||||
| _kafka_setter = KafkaContextSetter() | ||||
| 
 | ||||
| 
 | ||||
| def _get_span_name(operation: str, topic: str): | ||||
|     return f"{topic} {operation}" | ||||
|  | @ -0,0 +1,15 @@ | |||
| # Copyright The OpenTelemetry Authors | ||||
| # | ||||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| # you may not use this file except in compliance with the License. | ||||
| # You may obtain a copy of the License at | ||||
| # | ||||
| #     http://www.apache.org/licenses/LICENSE-2.0 | ||||
| # | ||||
| # Unless required by applicable law or agreed to in writing, software | ||||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| # See the License for the specific language governing permissions and | ||||
| # limitations under the License. | ||||
| 
 | ||||
| __version__ = "0.31b0" | ||||
|  | @ -0,0 +1,60 @@ | |||
| # Copyright The OpenTelemetry Authors | ||||
| # | ||||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| # you may not use this file except in compliance with the License. | ||||
| # You may obtain a copy of the License at | ||||
| # | ||||
| #     http://www.apache.org/licenses/LICENSE-2.0 | ||||
| # | ||||
| # Unless required by applicable law or agreed to in writing, software | ||||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| # See the License for the specific language governing permissions and | ||||
| # limitations under the License. | ||||
| 
 | ||||
| # pylint: disable=no-name-in-module | ||||
| 
 | ||||
| from unittest import TestCase | ||||
| 
 | ||||
| from confluent_kafka import Consumer, Producer | ||||
| 
 | ||||
| from opentelemetry.instrumentation.confluent_kafka import ( | ||||
|     ConfluentKafkaInstrumentor, | ||||
|     ProxiedConsumer, | ||||
|     ProxiedProducer, | ||||
| ) | ||||
| 
 | ||||
| 
 | ||||
| class TestConfluentKafka(TestCase): | ||||
|     def test_instrument_api(self) -> None: | ||||
|         instrumentation = ConfluentKafkaInstrumentor() | ||||
| 
 | ||||
|         producer = Producer({"bootstrap.servers": "localhost:29092"}) | ||||
|         producer = instrumentation.instrument_producer(producer) | ||||
| 
 | ||||
|         self.assertEqual(producer.__class__, ProxiedProducer) | ||||
| 
 | ||||
|         producer = instrumentation.uninstrument_producer(producer) | ||||
|         self.assertEqual(producer.__class__, Producer) | ||||
| 
 | ||||
|         producer = Producer({"bootstrap.servers": "localhost:29092"}) | ||||
|         producer = instrumentation.instrument_producer(producer) | ||||
| 
 | ||||
|         self.assertEqual(producer.__class__, ProxiedProducer) | ||||
| 
 | ||||
|         producer = instrumentation.uninstrument_producer(producer) | ||||
|         self.assertEqual(producer.__class__, Producer) | ||||
| 
 | ||||
|         consumer = Consumer( | ||||
|             { | ||||
|                 "bootstrap.servers": "localhost:29092", | ||||
|                 "group.id": "mygroup", | ||||
|                 "auto.offset.reset": "earliest", | ||||
|             } | ||||
|         ) | ||||
| 
 | ||||
|         consumer = instrumentation.instrument_consumer(consumer) | ||||
|         self.assertEqual(consumer.__class__, ProxiedConsumer) | ||||
| 
 | ||||
|         consumer = instrumentation.uninstrument_consumer(consumer) | ||||
|         self.assertEqual(consumer.__class__, Consumer) | ||||
|  | @ -37,6 +37,7 @@ install_requires = | |||
| 	opentelemetry-instrumentation-boto3sqs==0.31b0 | ||||
| 	opentelemetry-instrumentation-botocore==0.31b0 | ||||
| 	opentelemetry-instrumentation-celery==0.31b0 | ||||
| 	opentelemetry-instrumentation-confluent-kafka==0.31b0 | ||||
| 	opentelemetry-instrumentation-dbapi==0.31b0 | ||||
| 	opentelemetry-instrumentation-django==0.31b0 | ||||
| 	opentelemetry-instrumentation-elasticsearch==0.31b0 | ||||
|  |  | |||
|  | @ -48,6 +48,10 @@ libraries = { | |||
|         "library": "celery >= 4.0, < 6.0", | ||||
|         "instrumentation": "opentelemetry-instrumentation-celery==0.31b0", | ||||
|     }, | ||||
|     "confluent-kafka": { | ||||
|         "library": "confluent-kafka ~= 1.8.2", | ||||
|         "instrumentation": "opentelemetry-instrumentation-confluent-kafka==0.31b0", | ||||
|     }, | ||||
|     "django": { | ||||
|         "library": "django >= 1.10", | ||||
|         "instrumentation": "opentelemetry-instrumentation-django==0.31b0", | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue