Improve aiokafka instrumentation examples (#3466)

This commit is contained in:
Andre Murbach Maidl 2025-05-07 06:46:13 -03:00 committed by GitHub
parent 5c76d04a35
commit 45797ec3a1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 32 additions and 8 deletions

View File

@ -20,6 +20,7 @@ Usage
.. code:: python
import asyncio
from opentelemetry.instrumentation.aiokafka import AIOKafkaInstrumentor
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
@ -27,13 +28,27 @@ Usage
AIOKafkaInstrumentor().instrument()
# report a span of type producer with the default settings
producer = AIOKafkaProducer(bootstrap_servers=['localhost:9092'])
await producer.send('my-topic', b'raw_bytes')
async def produce():
producer = AIOKafkaProducer(bootstrap_servers=['localhost:9092'])
await producer.start()
try:
await producer.send_and_wait('my-topic', b'raw_bytes')
finally:
await producer.stop()
# report a span of type consumer with the default settings
consumer = AIOKafkaConsumer('my-topic', group_id='my-group', bootstrap_servers=['localhost:9092'])
async for message in consumer:
# process message
async def consume():
consumer = AIOKafkaConsumer('my-topic', group_id='my-group', bootstrap_servers=['localhost:9092'])
await consumer.start()
try:
async for message in consumer:
# process message
print(message)
finally:
await consumer.stop()
asyncio.run(produce())
asyncio.run(consume())
The _instrument() method accepts the following keyword args:
tracer_provider (TracerProvider) - an optional tracer provider
@ -47,12 +62,14 @@ for example:
.. code:: python
from opentelemetry.instrumentation.kafka import AIOKafkaInstrumentor
import asyncio
from opentelemetry.instrumentation.aiokafka import AIOKafkaInstrumentor
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
async def async_produce_hook(span, args, kwargs):
if span and span.is_recording():
span.set_attribute("custom_user_attribute_from_async_response_hook", "some-value")
async def async_consume_hook(span, record, args, kwargs):
if span and span.is_recording():
span.set_attribute("custom_user_attribute_from_consume_hook", "some-value")
@ -62,8 +79,15 @@ for example:
# Using kafka as normal now will automatically generate spans,
# including user custom attributes added from the hooks
producer = AIOKafkaProducer(bootstrap_servers=['localhost:9092'])
await producer.send('my-topic', b'raw_bytes')
async def produce():
producer = AIOKafkaProducer(bootstrap_servers=['localhost:9092'])
await producer.start()
try:
await producer.send_and_wait('my-topic', b'raw_bytes')
finally:
await producer.stop()
asyncio.run(produce())
API
___