feat: support batch (getmany) in aiokafka instrumentation (#3257)

* feat: support batch (getmany) in aiokafka instrumentation

* test: fix unclosed resources and typing

* test: add test_wrap_getmany

* fix: get unique topic list in batch

* fix: update typing, run pyupgrade

* fix: remove json.dumps from SERVER_ADDRESS attribute

* fix pylint

* fix: sync span_kind with spec

* add CHANGELOG entry

* remove changes not from this issue

* move types under TYPE_CHECKING

* move CHANGELOG entry to unreleased

* enable pyright for aiokafka, fix key type

* Update CHANGELOG.md

---------

Co-authored-by: Riccardo Magliocchetti <riccardo.magliocchetti@gmail.com>
Co-authored-by: Emídio Neto <9735060+emdneto@users.noreply.github.com>
This commit is contained in:
Dmitriy 2025-06-03 18:52:00 +05:00 committed by GitHub
parent b9a78e7475
commit 701d65b022
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 615 additions and 112 deletions

View File

@ -11,6 +11,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## Unreleased
### Added
- `opentelemetry-instrumentation-aiokafka` Add instrumentation of `consumer.getmany` (batch)
([#3257](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3257))
### Fixed
- `opentelemetry-instrumentation-system-metrics`: fix loading on Google Cloud Run

View File

@ -28,6 +28,7 @@ dependencies = [
"opentelemetry-api ~= 1.27",
"opentelemetry-instrumentation == 0.55b0.dev",
"opentelemetry-semantic-conventions == 0.55b0.dev",
"typing_extensions ~= 4.1",
]
[project.optional-dependencies]

View File

@ -93,15 +93,20 @@ API
___
"""
from __future__ import annotations
from asyncio import iscoroutinefunction
from typing import Collection
from typing import TYPE_CHECKING, Collection
import aiokafka
from wrapt import wrap_function_wrapper
from wrapt import (
wrap_function_wrapper, # type: ignore[reportUnknownVariableType]
)
from opentelemetry import trace
from opentelemetry.instrumentation.aiokafka.package import _instruments
from opentelemetry.instrumentation.aiokafka.utils import (
_wrap_getmany,
_wrap_getone,
_wrap_send,
)
@ -110,6 +115,21 @@ from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.semconv.schemas import Schemas
if TYPE_CHECKING:
from typing import TypedDict
from typing_extensions import Unpack
from .utils import ConsumeHookT, ProduceHookT
class InstrumentKwargs(TypedDict, total=False):
tracer_provider: trace.TracerProvider
async_produce_hook: ProduceHookT
async_consume_hook: ConsumeHookT
class UninstrumentKwargs(TypedDict, total=False):
pass
class AIOKafkaInstrumentor(BaseInstrumentor):
"""An instrumentor for kafka module
@ -119,7 +139,7 @@ class AIOKafkaInstrumentor(BaseInstrumentor):
def instrumentation_dependencies(self) -> Collection[str]:
return _instruments
def _instrument(self, **kwargs):
def _instrument(self, **kwargs: Unpack[InstrumentKwargs]):
"""Instruments the kafka module
Args:
@ -155,7 +175,13 @@ class AIOKafkaInstrumentor(BaseInstrumentor):
"getone",
_wrap_getone(tracer, async_consume_hook),
)
wrap_function_wrapper(
aiokafka.AIOKafkaConsumer,
"getmany",
_wrap_getmany(tracer, async_consume_hook),
)
def _uninstrument(self, **kwargs):
def _uninstrument(self, **kwargs: Unpack[UninstrumentKwargs]):
unwrap(aiokafka.AIOKafkaProducer, "send")
unwrap(aiokafka.AIOKafkaConsumer, "getone")
unwrap(aiokafka.AIOKafkaConsumer, "getmany")

View File

@ -1,9 +1,24 @@
from __future__ import annotations
import asyncio
import contextlib
import json
from logging import getLogger
from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple, Union
from typing import (
TYPE_CHECKING,
Any,
Awaitable,
Callable,
Dict,
MutableSequence,
Optional,
Protocol,
Sequence,
Tuple,
cast,
)
import aiokafka
from aiokafka import ConsumerRecord
from opentelemetry import context, propagate, trace
from opentelemetry.context import Context
@ -13,12 +28,58 @@ from opentelemetry.semconv.attributes import server_attributes
from opentelemetry.trace import Tracer
from opentelemetry.trace.span import Span
if TYPE_CHECKING:
from aiokafka.structs import RecordMetadata
class AIOKafkaGetOneProto(Protocol):
async def __call__(
self, *partitions: aiokafka.TopicPartition
) -> aiokafka.ConsumerRecord[object, object]: ...
class AIOKafkaGetManyProto(Protocol):
async def __call__(
self,
*partitions: aiokafka.TopicPartition,
timeout_ms: int = 0,
max_records: int | None = None,
) -> dict[
aiokafka.TopicPartition,
list[aiokafka.ConsumerRecord[object, object]],
]: ...
class AIOKafkaSendProto(Protocol):
async def __call__(
self,
topic: str,
value: object | None = None,
key: object | None = None,
partition: int | None = None,
timestamp_ms: int | None = None,
headers: HeadersT | None = None,
) -> asyncio.Future[RecordMetadata]: ...
ProduceHookT = Callable[
[Span, Tuple[Any, ...], Dict[str, Any]], Awaitable[None]
]
ConsumeHookT = Callable[
[
Span,
aiokafka.ConsumerRecord[object, object],
Tuple[aiokafka.TopicPartition, ...],
Dict[str, Any],
],
Awaitable[None],
]
HeadersT = Sequence[Tuple[str, Optional[bytes]]]
_LOG = getLogger(__name__)
def _extract_bootstrap_servers(
client: aiokafka.AIOKafkaClient,
) -> Union[str, List[str]]:
) -> str | list[str]:
return client._bootstrap_servers
@ -28,42 +89,44 @@ def _extract_client_id(client: aiokafka.AIOKafkaClient) -> str:
def _extract_consumer_group(
consumer: aiokafka.AIOKafkaConsumer,
) -> Optional[str]:
return consumer._group_id
) -> str | None:
return consumer._group_id # type: ignore[reportUnknownVariableType]
def _extract_argument(
key: str,
position: int,
default_value: Any,
args: Tuple[Any],
kwargs: Dict[str, Any],
args: tuple[Any, ...],
kwargs: dict[str, Any],
) -> Any:
if len(args) > position:
return args[position]
return kwargs.get(key, default_value)
def _extract_send_topic(args: Tuple[Any], kwargs: Dict[str, Any]) -> str:
def _extract_send_topic(args: tuple[Any, ...], kwargs: dict[str, Any]) -> str:
"""extract topic from `send` method arguments in AIOKafkaProducer class"""
return _extract_argument("topic", 0, "unknown", args, kwargs)
def _extract_send_value(
args: Tuple[Any], kwargs: Dict[str, Any]
) -> Optional[Any]:
args: tuple[Any, ...], kwargs: dict[str, Any]
) -> object | None:
"""extract value from `send` method arguments in AIOKafkaProducer class"""
return _extract_argument("value", 1, None, args, kwargs)
def _extract_send_key(
args: Tuple[Any], kwargs: Dict[str, Any]
) -> Optional[Any]:
args: tuple[Any, ...], kwargs: dict[str, Any]
) -> object | None:
"""extract key from `send` method arguments in AIOKafkaProducer class"""
return _extract_argument("key", 2, None, args, kwargs)
def _extract_send_headers(args: Tuple[Any], kwargs: Dict[str, Any]):
def _extract_send_headers(
args: tuple[Any, ...], kwargs: dict[str, Any]
) -> HeadersT | None:
"""extract headers from `send` method arguments in AIOKafkaProducer class"""
return _extract_argument("headers", 5, None, args, kwargs)
@ -77,18 +140,32 @@ def _move_headers_to_kwargs(
return args[:5], kwargs
def _deserialize_key(key: object | None) -> str | None:
if key is None:
return None
if isinstance(key, bytes):
with contextlib.suppress(UnicodeDecodeError):
return key.decode()
return str(key)
async def _extract_send_partition(
instance: aiokafka.AIOKafkaProducer,
args: Tuple[Any],
kwargs: Dict[str, Any],
) -> Optional[int]:
args: tuple[Any, ...],
kwargs: dict[str, Any],
) -> int | None:
"""extract partition `send` method arguments, using the `_partition` method in AIOKafkaProducer class"""
try:
topic = _extract_send_topic(args, kwargs)
key = _extract_send_key(args, kwargs)
value = _extract_send_value(args, kwargs)
partition = _extract_argument("partition", 3, None, args, kwargs)
key_bytes, value_bytes = instance._serialize(topic, key, value)
key_bytes, value_bytes = cast(
"tuple[bytes | None, bytes | None]",
instance._serialize(topic, key, value), # type: ignore[reportUnknownMemberType]
)
valid_types = (bytes, bytearray, memoryview, type(None))
if (
type(key_bytes) not in valid_types
@ -96,9 +173,9 @@ async def _extract_send_partition(
):
return None
await instance.client._wait_on_metadata(topic)
await instance.client._wait_on_metadata(topic) # type: ignore[reportUnknownMemberType]
return instance._partition(
return instance._partition( # type: ignore[reportUnknownMemberType]
topic, partition, key, value, key_bytes, value_bytes
)
except Exception as exception: # pylint: disable=W0703
@ -106,36 +183,29 @@ async def _extract_send_partition(
return None
ProduceHookT = Optional[Callable[[Span, Tuple, Dict], Awaitable[None]]]
ConsumeHookT = Optional[
Callable[[Span, ConsumerRecord, Tuple, Dict], Awaitable[None]]
]
HeadersT = List[Tuple[str, Optional[bytes]]]
class AIOKafkaContextGetter(textmap.Getter[HeadersT]):
def get(self, carrier: HeadersT, key: str) -> Optional[List[str]]:
if carrier is None:
return None
class AIOKafkaContextGetter(textmap.Getter["HeadersT"]):
def get(self, carrier: HeadersT, key: str) -> list[str] | None:
for item_key, value in carrier:
if item_key == key:
if value is not None:
return [value.decode()]
return None
def keys(self, carrier: HeadersT) -> List[str]:
if carrier is None:
return []
return [key for (key, value) in carrier]
def keys(self, carrier: HeadersT) -> list[str]:
return [key for (key, _) in carrier]
class AIOKafkaContextSetter(textmap.Setter[HeadersT]):
class AIOKafkaContextSetter(textmap.Setter["HeadersT"]):
def set(
self, carrier: HeadersT, key: Optional[str], value: Optional[str]
self, carrier: HeadersT, key: str | None, value: str | None
) -> None:
if carrier is None or key is None:
if key is None:
return
if not isinstance(carrier, MutableSequence):
_LOG.warning(
"Unable to set context in headers. Headers is immutable"
)
return
if value is not None:
@ -151,11 +221,11 @@ _aiokafka_setter = AIOKafkaContextSetter()
def _enrich_base_span(
span: Span,
*,
bootstrap_servers: Union[str, List[str]],
bootstrap_servers: str | list[str],
client_id: str,
topic: str,
partition: Optional[int],
key: Optional[Any],
partition: int | None,
key: str | None,
) -> None:
span.set_attribute(
messaging_attributes.MESSAGING_SYSTEM,
@ -182,11 +252,11 @@ def _enrich_base_span(
def _enrich_send_span(
span: Span,
*,
bootstrap_servers: Union[str, List[str]],
bootstrap_servers: str | list[str],
client_id: str,
topic: str,
partition: Optional[int],
key: Optional[str],
partition: int | None,
key: str | None,
) -> None:
if not span.is_recording():
return
@ -207,15 +277,15 @@ def _enrich_send_span(
)
def _enrich_anext_span(
def _enrich_getone_span(
span: Span,
*,
bootstrap_servers: Union[str, List[str]],
bootstrap_servers: str | list[str],
client_id: str,
consumer_group: Optional[str],
consumer_group: str | None,
topic: str,
partition: Optional[int],
key: Optional[str],
partition: int | None,
key: str | None,
offset: int,
) -> None:
if not span.is_recording():
@ -256,21 +326,99 @@ def _enrich_anext_span(
)
def _enrich_getmany_poll_span(
span: Span,
*,
bootstrap_servers: str | list[str],
client_id: str,
consumer_group: str | None,
message_count: int,
) -> None:
if not span.is_recording():
return
span.set_attribute(
messaging_attributes.MESSAGING_SYSTEM,
messaging_attributes.MessagingSystemValues.KAFKA.value,
)
span.set_attribute(
server_attributes.SERVER_ADDRESS, json.dumps(bootstrap_servers)
)
span.set_attribute(messaging_attributes.MESSAGING_CLIENT_ID, client_id)
if consumer_group is not None:
span.set_attribute(
messaging_attributes.MESSAGING_CONSUMER_GROUP_NAME, consumer_group
)
span.set_attribute(
messaging_attributes.MESSAGING_BATCH_MESSAGE_COUNT, message_count
)
span.set_attribute(
messaging_attributes.MESSAGING_OPERATION_NAME, "receive"
)
span.set_attribute(
messaging_attributes.MESSAGING_OPERATION_TYPE,
messaging_attributes.MessagingOperationTypeValues.RECEIVE.value,
)
def _enrich_getmany_topic_span(
span: Span,
*,
bootstrap_servers: str | list[str],
client_id: str,
consumer_group: str | None,
topic: str,
partition: int,
message_count: int,
) -> None:
if not span.is_recording():
return
_enrich_base_span(
span,
bootstrap_servers=bootstrap_servers,
client_id=client_id,
topic=topic,
partition=partition,
key=None,
)
if consumer_group is not None:
span.set_attribute(
messaging_attributes.MESSAGING_CONSUMER_GROUP_NAME, consumer_group
)
span.set_attribute(
messaging_attributes.MESSAGING_BATCH_MESSAGE_COUNT, message_count
)
span.set_attribute(
messaging_attributes.MESSAGING_OPERATION_NAME, "receive"
)
span.set_attribute(
messaging_attributes.MESSAGING_OPERATION_TYPE,
messaging_attributes.MessagingOperationTypeValues.RECEIVE.value,
)
def _get_span_name(operation: str, topic: str):
return f"{topic} {operation}"
def _wrap_send(
tracer: Tracer, async_produce_hook: ProduceHookT
) -> Callable[..., Awaitable[None]]:
def _wrap_send( # type: ignore[reportUnusedFunction]
tracer: Tracer, async_produce_hook: ProduceHookT | None
) -> Callable[..., Awaitable[asyncio.Future[RecordMetadata]]]:
async def _traced_send(
func: Callable[..., Awaitable[None]],
func: AIOKafkaSendProto,
instance: aiokafka.AIOKafkaProducer,
args: Tuple[Any],
kwargs: Dict[str, Any],
) -> None:
args: tuple[Any, ...],
kwargs: dict[str, Any],
) -> asyncio.Future[RecordMetadata]:
args, kwargs = _move_headers_to_kwargs(args, kwargs)
headers = _extract_send_headers(args, kwargs)
headers: HeadersT | None = _extract_send_headers(args, kwargs)
if headers is None:
headers = []
kwargs["headers"] = headers
@ -278,7 +426,7 @@ def _wrap_send(
topic = _extract_send_topic(args, kwargs)
bootstrap_servers = _extract_bootstrap_servers(instance.client)
client_id = _extract_client_id(instance.client)
key = _extract_send_key(args, kwargs)
key = _deserialize_key(_extract_send_key(args, kwargs))
partition = await _extract_send_partition(instance, args, kwargs)
span_name = _get_span_name("send", topic)
with tracer.start_as_current_span(
@ -310,15 +458,15 @@ def _wrap_send(
async def _create_consumer_span(
tracer: Tracer,
async_consume_hook: ConsumeHookT,
record: ConsumerRecord,
async_consume_hook: ConsumeHookT | None,
record: aiokafka.ConsumerRecord[object, object],
extracted_context: Context,
bootstrap_servers: Union[str, List[str]],
bootstrap_servers: str | list[str],
client_id: str,
consumer_group: Optional[str],
args: Tuple[Any],
kwargs: Dict[str, Any],
):
consumer_group: str | None,
args: tuple[aiokafka.TopicPartition, ...],
kwargs: dict[str, Any],
) -> trace.Span:
span_name = _get_span_name("receive", record.topic)
with tracer.start_as_current_span(
span_name,
@ -327,14 +475,14 @@ async def _create_consumer_span(
) as span:
new_context = trace.set_span_in_context(span, extracted_context)
token = context.attach(new_context)
_enrich_anext_span(
_enrich_getone_span(
span,
bootstrap_servers=bootstrap_servers,
client_id=client_id,
consumer_group=consumer_group,
topic=record.topic,
partition=record.partition,
key=record.key,
key=_deserialize_key(record.key),
offset=record.offset,
)
try:
@ -344,16 +492,18 @@ async def _create_consumer_span(
_LOG.exception(hook_exception)
context.detach(token)
return span
def _wrap_getone(
tracer: Tracer, async_consume_hook: ConsumeHookT
) -> Callable[..., Awaitable[aiokafka.ConsumerRecord]]:
async def _traced_next(
func: Callable[..., Awaitable[aiokafka.ConsumerRecord]],
def _wrap_getone( # type: ignore[reportUnusedFunction]
tracer: Tracer, async_consume_hook: ConsumeHookT | None
) -> Callable[..., Awaitable[aiokafka.ConsumerRecord[object, object]]]:
async def _traced_getone(
func: AIOKafkaGetOneProto,
instance: aiokafka.AIOKafkaConsumer,
args: Tuple[Any],
kwargs: Dict[str, Any],
) -> aiokafka.ConsumerRecord:
args: tuple[aiokafka.TopicPartition, ...],
kwargs: dict[str, Any],
) -> aiokafka.ConsumerRecord[object, object]:
record = await func(*args, **kwargs)
if record:
@ -377,4 +527,81 @@ def _wrap_getone(
)
return record
return _traced_next
return _traced_getone
def _wrap_getmany( # type: ignore[reportUnusedFunction]
tracer: Tracer, async_consume_hook: ConsumeHookT | None
) -> Callable[
...,
Awaitable[
dict[
aiokafka.TopicPartition,
list[aiokafka.ConsumerRecord[object, object]],
]
],
]:
async def _traced_getmany(
func: AIOKafkaGetManyProto,
instance: aiokafka.AIOKafkaConsumer,
args: tuple[aiokafka.TopicPartition, ...],
kwargs: dict[str, Any],
) -> dict[
aiokafka.TopicPartition, list[aiokafka.ConsumerRecord[object, object]]
]:
records = await func(*args, **kwargs)
if records:
bootstrap_servers = _extract_bootstrap_servers(instance._client)
client_id = _extract_client_id(instance._client)
consumer_group = _extract_consumer_group(instance)
span_name = _get_span_name(
"receive",
", ".join(sorted({topic.topic for topic in records.keys()})),
)
with tracer.start_as_current_span(
span_name, kind=trace.SpanKind.CONSUMER
) as poll_span:
_enrich_getmany_poll_span(
poll_span,
bootstrap_servers=bootstrap_servers,
client_id=client_id,
consumer_group=consumer_group,
message_count=sum(len(r) for r in records.values()),
)
for topic, topic_records in records.items():
span_name = _get_span_name("receive", topic.topic)
with tracer.start_as_current_span(
span_name, kind=trace.SpanKind.CONSUMER
) as topic_span:
_enrich_getmany_topic_span(
topic_span,
bootstrap_servers=bootstrap_servers,
client_id=client_id,
consumer_group=consumer_group,
topic=topic.topic,
partition=topic.partition,
message_count=len(topic_records),
)
for record in topic_records:
extracted_context = propagate.extract(
record.headers, getter=_aiokafka_getter
)
record_span = await _create_consumer_span(
tracer,
async_consume_hook,
record,
extracted_context,
bootstrap_servers,
client_id,
consumer_group,
args,
kwargs,
)
topic_span.add_link(record_span.get_span_context())
return records
return _traced_getmany

View File

@ -12,10 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import uuid
from typing import Any, List, Sequence, Tuple
from typing import Any, Sequence, cast
from unittest import IsolatedAsyncioTestCase, TestCase, mock
import aiokafka
from aiokafka import (
AIOKafkaConsumer,
AIOKafkaProducer,
@ -44,6 +47,9 @@ class TestAIOKafkaInstrumentor(TestCase):
self.assertTrue(
isinstance(AIOKafkaConsumer.getone, BoundFunctionWrapper)
)
self.assertTrue(
isinstance(AIOKafkaConsumer.getmany, BoundFunctionWrapper)
)
instrumentation.uninstrument()
self.assertFalse(
@ -52,12 +58,15 @@ class TestAIOKafkaInstrumentor(TestCase):
self.assertFalse(
isinstance(AIOKafkaConsumer.getone, BoundFunctionWrapper)
)
self.assertFalse(
isinstance(AIOKafkaConsumer.getmany, BoundFunctionWrapper)
)
class TestAIOKafkaInstrumentation(TestBase, IsolatedAsyncioTestCase):
@staticmethod
def consumer_record_factory(
number: int, headers: Tuple[Tuple[str, bytes], ...]
number: int, headers: tuple[tuple[str, bytes], ...]
) -> ConsumerRecord:
return ConsumerRecord(
f"topic_{number}",
@ -73,6 +82,34 @@ class TestAIOKafkaInstrumentation(TestBase, IsolatedAsyncioTestCase):
headers=headers,
)
@staticmethod
def consumer_batch_factory(
*headers: tuple[tuple[str, bytes], ...],
) -> dict[aiokafka.TopicPartition, list[aiokafka.ConsumerRecord]]:
records = {}
for number, record_headers in enumerate(headers, start=1):
records[
aiokafka.TopicPartition(
topic=f"topic_{number}", partition=number
)
] = [
ConsumerRecord(
f"topic_{number}",
number,
number,
number,
number,
f"key_{number}".encode(),
f"value_{number}".encode(),
None,
number,
number,
headers=record_headers,
)
]
return records
@staticmethod
async def consumer_factory(**consumer_kwargs: Any) -> AIOKafkaConsumer:
consumer = AIOKafkaConsumer(**consumer_kwargs)
@ -83,6 +120,7 @@ class TestAIOKafkaInstrumentation(TestBase, IsolatedAsyncioTestCase):
await consumer.start()
consumer._fetcher.next_record = mock.AsyncMock()
consumer._fetcher.fetched_records = mock.AsyncMock()
return consumer
@ -100,16 +138,22 @@ class TestAIOKafkaInstrumentation(TestBase, IsolatedAsyncioTestCase):
return producer
async def test_getone(self) -> None:
AIOKafkaInstrumentor().uninstrument()
def setUp(self):
super().setUp()
AIOKafkaInstrumentor().instrument(tracer_provider=self.tracer_provider)
def tearDown(self):
super().tearDown()
AIOKafkaInstrumentor().uninstrument()
async def test_getone(self) -> None:
client_id = str(uuid.uuid4())
group_id = str(uuid.uuid4())
consumer = await self.consumer_factory(
client_id=client_id, group_id=group_id
)
next_record_mock: mock.AsyncMock = consumer._fetcher.next_record
self.addAsyncCleanup(consumer.stop)
next_record_mock = cast(mock.AsyncMock, consumer._fetcher.next_record)
expected_spans = [
{
@ -191,7 +235,8 @@ class TestAIOKafkaInstrumentation(TestBase, IsolatedAsyncioTestCase):
)
consumer = await self.consumer_factory()
next_record_mock: mock.AsyncMock = consumer._fetcher.next_record
self.addAsyncCleanup(consumer.stop)
next_record_mock = cast(mock.AsyncMock, consumer._fetcher.next_record)
self.memory_exporter.clear()
@ -223,7 +268,8 @@ class TestAIOKafkaInstrumentation(TestBase, IsolatedAsyncioTestCase):
)
consumer = await self.consumer_factory()
next_record_mock: mock.AsyncMock = consumer._fetcher.next_record
self.addAsyncCleanup(consumer.stop)
next_record_mock = cast(mock.AsyncMock, consumer._fetcher.next_record)
next_record_mock.side_effect = [
self.consumer_record_factory(1, headers=())
@ -233,13 +279,121 @@ class TestAIOKafkaInstrumentation(TestBase, IsolatedAsyncioTestCase):
async_consume_hook_mock.assert_awaited_once()
async def test_send(self) -> None:
AIOKafkaInstrumentor().uninstrument()
AIOKafkaInstrumentor().instrument(tracer_provider=self.tracer_provider)
async def test_getmany(self) -> None:
client_id = str(uuid.uuid4())
group_id = str(uuid.uuid4())
consumer = await self.consumer_factory(
client_id=client_id, group_id=group_id
)
self.addAsyncCleanup(consumer.stop)
fetched_records_mock = cast(
mock.AsyncMock, consumer._fetcher.fetched_records
)
expected_spans = [
{
"name": "topic_1 receive",
"kind": SpanKind.CONSUMER,
"attributes": {
messaging_attributes.MESSAGING_SYSTEM: messaging_attributes.MessagingSystemValues.KAFKA.value,
server_attributes.SERVER_ADDRESS: '"localhost"',
messaging_attributes.MESSAGING_CLIENT_ID: client_id,
messaging_attributes.MESSAGING_DESTINATION_NAME: "topic_1",
messaging_attributes.MESSAGING_DESTINATION_PARTITION_ID: "1",
messaging_attributes.MESSAGING_KAFKA_MESSAGE_KEY: "key_1",
messaging_attributes.MESSAGING_CONSUMER_GROUP_NAME: group_id,
messaging_attributes.MESSAGING_OPERATION_NAME: "receive",
messaging_attributes.MESSAGING_OPERATION_TYPE: messaging_attributes.MessagingOperationTypeValues.RECEIVE.value,
messaging_attributes.MESSAGING_KAFKA_MESSAGE_OFFSET: 1,
messaging_attributes.MESSAGING_MESSAGE_ID: "topic_1.1.1",
},
},
{
"name": "topic_1 receive",
"kind": SpanKind.CONSUMER,
"attributes": {
messaging_attributes.MESSAGING_SYSTEM: messaging_attributes.MessagingSystemValues.KAFKA.value,
server_attributes.SERVER_ADDRESS: '"localhost"',
messaging_attributes.MESSAGING_CLIENT_ID: client_id,
messaging_attributes.MESSAGING_DESTINATION_NAME: "topic_1",
messaging_attributes.MESSAGING_DESTINATION_PARTITION_ID: "1",
messaging_attributes.MESSAGING_CONSUMER_GROUP_NAME: group_id,
messaging_attributes.MESSAGING_OPERATION_NAME: "receive",
messaging_attributes.MESSAGING_OPERATION_TYPE: messaging_attributes.MessagingOperationTypeValues.RECEIVE.value,
messaging_attributes.MESSAGING_BATCH_MESSAGE_COUNT: 1,
},
},
{
"name": "topic_2 receive",
"kind": SpanKind.CONSUMER,
"attributes": {
messaging_attributes.MESSAGING_SYSTEM: messaging_attributes.MessagingSystemValues.KAFKA.value,
server_attributes.SERVER_ADDRESS: '"localhost"',
messaging_attributes.MESSAGING_CLIENT_ID: client_id,
messaging_attributes.MESSAGING_DESTINATION_NAME: "topic_2",
messaging_attributes.MESSAGING_DESTINATION_PARTITION_ID: "2",
messaging_attributes.MESSAGING_KAFKA_MESSAGE_KEY: "key_2",
messaging_attributes.MESSAGING_CONSUMER_GROUP_NAME: group_id,
messaging_attributes.MESSAGING_OPERATION_NAME: "receive",
messaging_attributes.MESSAGING_OPERATION_TYPE: messaging_attributes.MessagingOperationTypeValues.RECEIVE.value,
messaging_attributes.MESSAGING_KAFKA_MESSAGE_OFFSET: 2,
messaging_attributes.MESSAGING_MESSAGE_ID: "topic_2.2.2",
},
},
{
"name": "topic_2 receive",
"kind": SpanKind.CONSUMER,
"attributes": {
messaging_attributes.MESSAGING_SYSTEM: messaging_attributes.MessagingSystemValues.KAFKA.value,
server_attributes.SERVER_ADDRESS: '"localhost"',
messaging_attributes.MESSAGING_CLIENT_ID: client_id,
messaging_attributes.MESSAGING_DESTINATION_NAME: "topic_2",
messaging_attributes.MESSAGING_DESTINATION_PARTITION_ID: "2",
messaging_attributes.MESSAGING_CONSUMER_GROUP_NAME: group_id,
messaging_attributes.MESSAGING_OPERATION_NAME: "receive",
messaging_attributes.MESSAGING_OPERATION_TYPE: messaging_attributes.MessagingOperationTypeValues.RECEIVE.value,
messaging_attributes.MESSAGING_BATCH_MESSAGE_COUNT: 1,
},
},
{
"name": "topic_1, topic_2 receive",
"kind": SpanKind.CONSUMER,
"attributes": {
messaging_attributes.MESSAGING_SYSTEM: messaging_attributes.MessagingSystemValues.KAFKA.value,
server_attributes.SERVER_ADDRESS: '"localhost"',
messaging_attributes.MESSAGING_CLIENT_ID: client_id,
messaging_attributes.MESSAGING_CONSUMER_GROUP_NAME: group_id,
messaging_attributes.MESSAGING_OPERATION_NAME: "receive",
messaging_attributes.MESSAGING_OPERATION_TYPE: messaging_attributes.MessagingOperationTypeValues.RECEIVE.value,
messaging_attributes.MESSAGING_BATCH_MESSAGE_COUNT: 2,
},
},
]
self.memory_exporter.clear()
fetched_records_mock.side_effect = [
self.consumer_batch_factory(
(
(
"traceparent",
b"00-03afa25236b8cd948fa853d67038ac79-405ff022e8247c46-01",
),
),
(),
),
]
await consumer.getmany()
fetched_records_mock.assert_awaited_with((), 0.0, max_records=None)
span_list = self.memory_exporter.get_finished_spans()
self._compare_spans(span_list, expected_spans)
async def test_send(self) -> None:
producer = await self.producer_factory()
add_message_mock: mock.AsyncMock = (
producer._message_accumulator.add_message
self.addAsyncCleanup(producer.stop)
add_message_mock = cast(
mock.AsyncMock, producer._message_accumulator.add_message
)
tracer = self.tracer_provider.get_tracer(__name__)
@ -269,12 +423,10 @@ class TestAIOKafkaInstrumentation(TestBase, IsolatedAsyncioTestCase):
)
async def test_send_baggage(self) -> None:
AIOKafkaInstrumentor().uninstrument()
AIOKafkaInstrumentor().instrument(tracer_provider=self.tracer_provider)
producer = await self.producer_factory()
add_message_mock: mock.AsyncMock = (
producer._message_accumulator.add_message
self.addAsyncCleanup(producer.stop)
add_message_mock = cast(
mock.AsyncMock, producer._message_accumulator.add_message
)
tracer = self.tracer_provider.get_tracer(__name__)
@ -303,20 +455,23 @@ class TestAIOKafkaInstrumentation(TestBase, IsolatedAsyncioTestCase):
)
producer = await self.producer_factory()
self.addAsyncCleanup(producer.stop)
await producer.send("topic_1", b"value_1")
async_produce_hook_mock.assert_awaited_once()
def _compare_spans(
self, spans: Sequence[ReadableSpan], expected_spans: List[dict]
self, spans: Sequence[ReadableSpan], expected_spans: list[dict]
) -> None:
self.assertEqual(len(spans), len(expected_spans))
for span, expected_span in zip(spans, expected_spans):
self.assertEqual(expected_span["name"], span.name)
self.assertEqual(expected_span["kind"], span.kind)
self.assertEqual(expected_span["name"], span.name, msg=span.name)
self.assertEqual(expected_span["kind"], span.kind, msg=span.name)
self.assertEqual(
expected_span["attributes"], dict(span.attributes)
expected_span["attributes"],
dict(span.attributes),
msg=span.name,
)
async def test_send_and_wait(self) -> None:

View File

@ -12,9 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=unnecessary-dunder-call
from __future__ import annotations
from unittest import IsolatedAsyncioTestCase, mock
import aiokafka
from opentelemetry.instrumentation.aiokafka.utils import (
AIOKafkaContextGetter,
AIOKafkaContextSetter,
@ -23,6 +26,7 @@ from opentelemetry.instrumentation.aiokafka.utils import (
_create_consumer_span,
_extract_send_partition,
_get_span_name,
_wrap_getmany,
_wrap_getone,
_wrap_send,
)
@ -42,7 +46,7 @@ class TestUtils(IsolatedAsyncioTestCase):
carrier_list = [("key1", b"val1")]
context_setter.set(carrier_list, "key2", "val2")
self.assertTrue(("key2", "val2".encode()) in carrier_list)
self.assertTrue(("key2", b"val2") in carrier_list)
def test_context_getter(self) -> None:
context_setter = AIOKafkaContextSetter()
@ -174,7 +178,7 @@ class TestUtils(IsolatedAsyncioTestCase):
@mock.patch(
"opentelemetry.instrumentation.aiokafka.utils._extract_consumer_group"
)
async def test_wrap_next(
async def test_wrap_getone(
self,
extract_consumer_group: mock.MagicMock,
extract_client_id: mock.MagicMock,
@ -184,12 +188,12 @@ class TestUtils(IsolatedAsyncioTestCase):
) -> None:
tracer = mock.MagicMock()
consume_hook = mock.AsyncMock()
original_next_callback = mock.AsyncMock()
original_getone_callback = mock.AsyncMock()
kafka_consumer = mock.MagicMock()
wrapped_next = _wrap_getone(tracer, consume_hook)
record = await wrapped_next(
original_next_callback, kafka_consumer, self.args, self.kwargs
wrapped_getone = _wrap_getone(tracer, consume_hook)
record = await wrapped_getone(
original_getone_callback, kafka_consumer, self.args, self.kwargs
)
extract_bootstrap_servers.assert_called_once_with(
@ -203,10 +207,10 @@ class TestUtils(IsolatedAsyncioTestCase):
extract_consumer_group.assert_called_once_with(kafka_consumer)
consumer_group = extract_consumer_group.return_value
original_next_callback.assert_awaited_once_with(
original_getone_callback.assert_awaited_once_with(
*self.args, **self.kwargs
)
self.assertEqual(record, original_next_callback.return_value)
self.assertEqual(record, original_getone_callback.return_value)
extract.assert_called_once_with(
record.headers, getter=_aiokafka_getter
@ -225,10 +229,90 @@ class TestUtils(IsolatedAsyncioTestCase):
self.kwargs,
)
@mock.patch("opentelemetry.propagate.extract")
@mock.patch(
"opentelemetry.instrumentation.aiokafka.utils._create_consumer_span"
)
@mock.patch(
"opentelemetry.instrumentation.aiokafka.utils._enrich_getmany_topic_span"
)
@mock.patch(
"opentelemetry.instrumentation.aiokafka.utils._enrich_getmany_poll_span"
)
@mock.patch(
"opentelemetry.instrumentation.aiokafka.utils._extract_bootstrap_servers"
)
@mock.patch(
"opentelemetry.instrumentation.aiokafka.utils._extract_client_id"
)
@mock.patch(
"opentelemetry.instrumentation.aiokafka.utils._extract_consumer_group"
)
# pylint: disable=too-many-locals
async def test_wrap_getmany(
self,
extract_consumer_group: mock.MagicMock,
extract_client_id: mock.MagicMock,
extract_bootstrap_servers: mock.MagicMock,
_enrich_getmany_poll_span: mock.MagicMock,
_enrich_getmany_topic_span: mock.MagicMock,
_create_consumer_span: mock.MagicMock,
extract: mock.MagicMock,
) -> None:
tracer = mock.MagicMock()
consume_hook = mock.AsyncMock()
record_mock = mock.MagicMock()
original_getmany_callback = mock.AsyncMock(
return_value={
aiokafka.TopicPartition(topic="topic_1", partition=0): [
record_mock
]
}
)
kafka_consumer = mock.MagicMock()
wrapped_getmany = _wrap_getmany(tracer, consume_hook)
records = await wrapped_getmany(
original_getmany_callback, kafka_consumer, self.args, self.kwargs
)
extract_bootstrap_servers.assert_called_once_with(
kafka_consumer._client
)
bootstrap_servers = extract_bootstrap_servers.return_value
extract_client_id.assert_called_once_with(kafka_consumer._client)
client_id = extract_client_id.return_value
extract_consumer_group.assert_called_once_with(kafka_consumer)
consumer_group = extract_consumer_group.return_value
original_getmany_callback.assert_awaited_once_with(
*self.args, **self.kwargs
)
self.assertEqual(records, original_getmany_callback.return_value)
extract.assert_called_once_with(
record_mock.headers, getter=_aiokafka_getter
)
context = extract.return_value
_create_consumer_span.assert_called_once_with(
tracer,
consume_hook,
record_mock,
context,
bootstrap_servers,
client_id,
consumer_group,
self.args,
self.kwargs,
)
@mock.patch("opentelemetry.trace.set_span_in_context")
@mock.patch("opentelemetry.context.attach")
@mock.patch(
"opentelemetry.instrumentation.aiokafka.utils._enrich_anext_span"
"opentelemetry.instrumentation.aiokafka.utils._enrich_getone_span"
)
@mock.patch("opentelemetry.context.detach")
async def test_create_consumer_span(
@ -276,7 +360,7 @@ class TestUtils(IsolatedAsyncioTestCase):
consumer_group=consumer_group,
topic=record.topic,
partition=record.partition,
key=record.key,
key=str(record.key),
offset=record.offset,
)
consume_hook.assert_awaited_once_with(

View File

@ -194,6 +194,7 @@ pythonVersion = "3.9"
reportPrivateUsage = false # Ignore private attributes added by instrumentation packages.
# Add progressively instrumentation packages here.
include = [
"instrumentation/opentelemetry-instrumentation-aiokafka",
"instrumentation/opentelemetry-instrumentation-asyncclick",
"instrumentation/opentelemetry-instrumentation-threading",
"instrumentation-genai/opentelemetry-instrumentation-vertexai",
@ -201,6 +202,7 @@ include = [
# We should also add type hints to the test suite - It helps on finding bugs.
# We are excluding for now because it's easier, and more important to add to the instrumentation packages.
exclude = [
"instrumentation/opentelemetry-instrumentation-aiokafka/tests/**/*.py",
"instrumentation/opentelemetry-instrumentation-asyncclick/tests/**/*.py",
"instrumentation/opentelemetry-instrumentation-threading/tests/**",
"instrumentation-genai/opentelemetry-instrumentation-vertexai/tests/**/*.py",

View File

@ -1048,6 +1048,7 @@ deps =
{toxinidir}/util/opentelemetry-util-http
{toxinidir}/instrumentation-genai/opentelemetry-instrumentation-vertexai[instruments]
{toxinidir}/instrumentation-genai/opentelemetry-instrumentation-google-genai[instruments]
{toxinidir}/instrumentation/opentelemetry-instrumentation-aiokafka[instruments]
{toxinidir}/instrumentation/opentelemetry-instrumentation-asyncclick[instruments]
commands =

View File

@ -2275,6 +2275,7 @@ dependencies = [
{ name = "opentelemetry-api" },
{ name = "opentelemetry-instrumentation" },
{ name = "opentelemetry-semantic-conventions" },
{ name = "typing-extensions" },
]
[package.optional-dependencies]
@ -2288,6 +2289,7 @@ requires-dist = [
{ name = "opentelemetry-api", git = "https://github.com/open-telemetry/opentelemetry-python?subdirectory=opentelemetry-api&branch=main" },
{ name = "opentelemetry-instrumentation", editable = "opentelemetry-instrumentation" },
{ name = "opentelemetry-semantic-conventions", git = "https://github.com/open-telemetry/opentelemetry-python?subdirectory=opentelemetry-semantic-conventions&branch=main" },
{ name = "typing-extensions", specifier = "~=4.1" },
]
provides-extras = ["instruments"]