diff --git a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py index 48b2bd2c2..db422db7f 100644 --- a/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-confluent-kafka/src/opentelemetry/instrumentation/confluent_kafka/__init__.py @@ -34,6 +34,9 @@ Usage # report a span of type consumer with the default settings consumer = Consumer(conf2) + def msg_process(msg): + print(msg) + def basic_consume_loop(consumer, topics): try: consumer.subscribe(topics) @@ -44,7 +47,7 @@ Usage if msg.error(): if msg.error().code() == KafkaError._PARTITION_EOF: # End of partition event - sys.stderr.write(f"{msg.topic() [{msg.partition()}] reached end at offset {msg.offset()}}") + sys.stderr.write(f"{msg.topic()} [{msg.partition()}] reached end at offset {msg.offset()}") elif msg.error(): raise KafkaException(msg.error()) else: @@ -53,7 +56,7 @@ Usage # Close down consumer to commit final offsets. consumer.close() - basic_consume_loop(consumer, "my-topic") + basic_consume_loop(consumer, ["my-topic"]) The _instrument method accepts the following keyword args: tracer_provider (TracerProvider) - an optional tracer provider @@ -72,14 +75,16 @@ The _instrument method accepts the following keyword args: .. code:: python from opentelemetry.instrumentation.confluent_kafka import ConfluentKafkaInstrumentor + from opentelemetry.trace import get_tracer_provider from confluent_kafka import Producer, Consumer inst = ConfluentKafkaInstrumentor() + tracer_provider = get_tracer_provider() - p = confluent_kafka.Producer({'bootstrap.servers': 'localhost:29092'}) - c = confluent_kafka.Consumer({ - 'bootstrap.servers': 'localhost:29092', + p = Producer({'bootstrap.servers': 'localhost:9092'}) + c = Consumer({ + 'bootstrap.servers': 'localhost:9092', 'group.id': 'mygroup', 'auto.offset.reset': 'earliest' })