Implement vertx-kafka-client instrumentation; batch processing (#5982)

* Implement vertx-kafka-client instrumentation; batch processing

* try-finally just in case

* Add to supported libraries list

* Update instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/InstrumentedBatchRecordsHandler.java

Co-authored-by: Trask Stalnaker <trask.stalnaker@gmail.com>

Mateusz Rzeszutek 2022-05-10 12:00:54 +02:00 committed by GitHub
parent c22af231bd
commit 2fad192fc1
16 changed files with 727 additions and 64 deletions

File: docs/supported-libraries.md

@@ -114,6 +114,7 @@ These are the supported libraries and frameworks:
| [Vaadin](https://vaadin.com/) | 14.2+ |
| [Vert.x Web](https://vertx.io/docs/vertx-web/java/) | 3.0+ |
| [Vert.x HttpClient](https://vertx.io/docs/apidocs/io/vertx/core/http/HttpClient.html) | 3.0+ |
| [Vert.x Kafka Client](https://vertx.io/docs/vertx-kafka-client/java/) | 3.6+ |
| [Vert.x RxJava2](https://vertx.io/docs/vertx-rx/java2/) | 3.5+ |
## Application Servers

File: KafkaBatchProcessAttributesGetter.java

@@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/
-package io.opentelemetry.javaagent.instrumentation.spring.kafka;
+package io.opentelemetry.instrumentation.kafka.internal;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;

File: KafkaBatchProcessSpanLinksExtractor.java

@@ -3,13 +3,12 @@
* SPDX-License-Identifier: Apache-2.0
*/
-package io.opentelemetry.javaagent.instrumentation.spring.kafka;
+package io.opentelemetry.instrumentation.kafka.internal;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.ContextPropagators;
import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksExtractor;
-import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerRecordGetter;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
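Judging by the imports above, the extractor reuses the single-record propagation getter and adds one span link per record in the batch. A sketch of that shape, assuming SpanLinksExtractor.fromUpstreamRequest is available in this version of instrumentation-api (not the verbatim source):

final class KafkaBatchProcessSpanLinksExtractor
    implements SpanLinksExtractor<ConsumerRecords<?, ?>> {

  private final SpanLinksExtractor<ConsumerRecord<?, ?>> singleRecordLinkExtractor;

  KafkaBatchProcessSpanLinksExtractor(ContextPropagators propagators) {
    // KafkaConsumerRecordGetter reads the propagated trace context from record headers
    this.singleRecordLinkExtractor =
        SpanLinksExtractor.fromUpstreamRequest(propagators, KafkaConsumerRecordGetter.INSTANCE);
  }

  @Override
  public void extract(
      SpanLinksBuilder spanLinks, Context parentContext, ConsumerRecords<?, ?> records) {
    for (ConsumerRecord<?, ?> record : records) {
      // each record may originate from a different producer trace; link them all
      singleRecordLinkExtractor.extract(spanLinks, parentContext, record);
    }
  }
}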

File: KafkaInstrumenterFactory.java

@@ -19,6 +19,7 @@ import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttr
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
@@ -118,4 +119,19 @@ public final class KafkaInstrumenterFactory {
return builder.newConsumerInstrumenter(KafkaConsumerRecordGetter.INSTANCE);
}
}
public Instrumenter<ConsumerRecords<?, ?>, Void> createBatchProcessInstrumenter() {
KafkaBatchProcessAttributesGetter getter = KafkaBatchProcessAttributesGetter.INSTANCE;
MessageOperation operation = MessageOperation.PROCESS;
return Instrumenter.<ConsumerRecords<?, ?>, Void>builder(
openTelemetry,
instrumentationName,
MessagingSpanNameExtractor.create(getter, operation))
.addAttributesExtractor(MessagingAttributesExtractor.create(getter, operation))
.addSpanLinksExtractor(
new KafkaBatchProcessSpanLinksExtractor(openTelemetry.getPropagators()))
.setErrorCauseExtractor(errorCauseExtractor)
.newInstrumenter(SpanKindExtractor.alwaysConsumer());
}
}
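A minimal usage sketch for the new factory method; this mirrors what VertxKafkaSingletons does further down, with an illustrative instrumentation name:

KafkaInstrumenterFactory factory =
    new KafkaInstrumenterFactory(GlobalOpenTelemetry.get(), "io.opentelemetry.example-kafka-1.0");

// batch: one CONSUMER "process" span per polled batch, with links to each record's producer span
Instrumenter<ConsumerRecords<?, ?>, Void> batchProcessInstrumenter =
    factory.createBatchProcessInstrumenter();
// single record: one CONSUMER "process" span per record
Instrumenter<ConsumerRecord<?, ?>, Void> processInstrumenter =
    factory.createConsumerProcessInstrumenter();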

File: SpringKafkaSingletons.java

@@ -7,10 +7,6 @@ package io.opentelemetry.javaagent.instrumentation.spring.kafka;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
-import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
-import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
-import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
-import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
import io.opentelemetry.instrumentation.kafka.internal.KafkaInstrumenterFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -19,26 +19,15 @@ public final class SpringKafkaSingletons {
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.spring-kafka-2.7";
-  private static final Instrumenter<ConsumerRecords<?, ?>, Void> BATCH_PROCESS_INSTRUMENTER =
-      buildBatchProcessInstrumenter();
-  private static final Instrumenter<ConsumerRecord<?, ?>, Void> PROCESS_INSTRUMENTER =
-      new KafkaInstrumenterFactory(GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME)
-          .setErrorCauseExtractor(SpringKafkaErrorCauseExtractor.INSTANCE)
-          .createConsumerProcessInstrumenter();
-
-  private static Instrumenter<ConsumerRecords<?, ?>, Void> buildBatchProcessInstrumenter() {
-    KafkaBatchProcessAttributesGetter getter = KafkaBatchProcessAttributesGetter.INSTANCE;
-    MessageOperation operation = MessageOperation.PROCESS;
-    return Instrumenter.<ConsumerRecords<?, ?>, Void>builder(
-            GlobalOpenTelemetry.get(),
-            INSTRUMENTATION_NAME,
-            MessagingSpanNameExtractor.create(getter, operation))
-        .addAttributesExtractor(MessagingAttributesExtractor.create(getter, operation))
-        .addSpanLinksExtractor(
-            new KafkaBatchProcessSpanLinksExtractor(GlobalOpenTelemetry.getPropagators()))
-        .setErrorCauseExtractor(SpringKafkaErrorCauseExtractor.INSTANCE)
-        .newInstrumenter(SpanKindExtractor.alwaysConsumer());
-  }
+  private static final Instrumenter<ConsumerRecords<?, ?>, Void> BATCH_PROCESS_INSTRUMENTER;
+  private static final Instrumenter<ConsumerRecord<?, ?>, Void> PROCESS_INSTRUMENTER;
+
+  static {
+    KafkaInstrumenterFactory factory =
+        new KafkaInstrumenterFactory(GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME)
+            .setErrorCauseExtractor(SpringKafkaErrorCauseExtractor.INSTANCE);
+    BATCH_PROCESS_INSTRUMENTER = factory.createBatchProcessInstrumenter();
+    PROCESS_INSTRUMENTER = factory.createConsumerProcessInstrumenter();
+  }
public static Instrumenter<ConsumerRecords<?, ?>, Void> batchProcessInstrumenter() {

File: instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/InstrumentedBatchRecordsHandler.java

@@ -0,0 +1,69 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_6;
import static io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_6.VertxKafkaSingletons.batchProcessInstrumenter;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.field.VirtualField;
import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing;
import io.vertx.core.Handler;
import javax.annotation.Nullable;
import org.apache.kafka.clients.consumer.ConsumerRecords;
public final class InstrumentedBatchRecordsHandler<K, V> implements Handler<ConsumerRecords<K, V>> {
private final VirtualField<ConsumerRecords<K, V>, Context> receiveContextField;
@Nullable private final Handler<ConsumerRecords<K, V>> delegate;
public InstrumentedBatchRecordsHandler(
VirtualField<ConsumerRecords<K, V>, Context> receiveContextField,
@Nullable Handler<ConsumerRecords<K, V>> delegate) {
this.receiveContextField = receiveContextField;
this.delegate = delegate;
}
@Override
public void handle(ConsumerRecords<K, V> records) {
Context parentContext = getParentContext(records);
if (!batchProcessInstrumenter().shouldStart(parentContext, records)) {
callDelegateHandler(records);
return;
}
// the instrumenter iterates over the records when adding links, so we need to suppress the
// per-record tracing while it runs
boolean previousWrappingEnabled = KafkaClientsConsumerProcessTracing.setEnabled(false);
try {
Context context = batchProcessInstrumenter().start(parentContext, records);
Throwable error = null;
try (Scope ignored = context.makeCurrent()) {
callDelegateHandler(records);
} catch (Throwable t) {
error = t;
throw t;
} finally {
batchProcessInstrumenter().end(context, records, null, error);
}
} finally {
KafkaClientsConsumerProcessTracing.setEnabled(previousWrappingEnabled);
}
}
private Context getParentContext(ConsumerRecords<K, V> records) {
Context receiveContext = receiveContextField.get(records);
// use the receive CONSUMER span as parent if it's available
return receiveContext != null ? receiveContext : Context.current();
}
private void callDelegateHandler(ConsumerRecords<K, V> records) {
if (delegate != null) {
delegate.handle(records);
}
}
}
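The setEnabled(false)/restore dance above works because the kafka-clients instrumentation consults a shared flag before wrapping record iteration. A minimal sketch of such a toggle, assuming a simple thread-local design (the real KafkaClientsConsumerProcessTracing lives in the agent bootstrap and may differ):

public final class ConsumerProcessTracingToggle {
  // per-thread flag; wrapping is on unless a handler explicitly suspends it
  private static final ThreadLocal<Boolean> wrappingEnabled =
      ThreadLocal.withInitial(() -> true);

  // returns the previous value so callers can restore it in a finally block,
  // exactly as InstrumentedBatchRecordsHandler does above
  public static boolean setEnabled(boolean enabled) {
    boolean previous = wrappingEnabled.get();
    wrappingEnabled.set(enabled);
    return previous;
  }

  public static boolean wrappingEnabled() {
    return wrappingEnabled.get();
  }

  private ConsumerProcessTracingToggle() {}
}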

File: InstrumentedSingleRecordHandler.java

@@ -47,8 +47,8 @@ public final class InstrumentedSingleRecordHandler<K, V> implements Handler<ConsumerRecord<K, V>> {
}
}
-  private Context getParentContext(ConsumerRecord<K, V> records) {
-    Context receiveContext = receiveContextField.get(records);
+  private Context getParentContext(ConsumerRecord<K, V> record) {
+    Context receiveContext = receiveContextField.get(record);
// use the receive CONSUMER span as parent if it's available
return receiveContext != null ? receiveContext : Context.current();

File: KafkaReadStreamImplInstrumentation.java

@@ -64,7 +64,10 @@ public class KafkaReadStreamImplInstrumentation implements TypeInstrumentation {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static <K, V> void onEnter(
@Advice.Argument(value = 0, readOnly = false) Handler<ConsumerRecords<K, V>> handler) {
-    // TODO: next PR
+    VirtualField<ConsumerRecords<K, V>, Context> receiveContextField =
+        VirtualField.find(ConsumerRecords.class, Context.class);
+    handler = new InstrumentedBatchRecordsHandler<>(receiveContextField, handler);
}
}
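VirtualField attaches extra state to instances of an existing class without changing its shape (the agent backs it with either an injected field or a weak map). A short sketch of the API as used here; note that storing the receive context happens in the kafka-clients receive instrumentation, which is not part of this commit:

// look up (or register) a virtual Context field on ConsumerRecords
VirtualField<ConsumerRecords<K, V>, Context> receiveContextField =
    VirtualField.find(ConsumerRecords.class, Context.class);

// receive side (elsewhere): remember the CONSUMER "receive" span's context
receiveContextField.set(records, receiveContext);

// handler side (above): read it back; fall back to Context.current() if absent
Context parentContext = receiveContextField.get(records);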

File: VertxKafkaSingletons.java

@@ -9,14 +9,25 @@ import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.kafka.internal.KafkaInstrumenterFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
public final class VertxKafkaSingletons {
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.vertx-kafka-client-3.5";
-  private static final Instrumenter<ConsumerRecord<?, ?>, Void> PROCESS_INSTRUMENTER =
-      new KafkaInstrumenterFactory(GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME)
-          .createConsumerProcessInstrumenter();
+  private static final Instrumenter<ConsumerRecords<?, ?>, Void> BATCH_PROCESS_INSTRUMENTER;
+  private static final Instrumenter<ConsumerRecord<?, ?>, Void> PROCESS_INSTRUMENTER;
+
+  static {
+    KafkaInstrumenterFactory factory =
+        new KafkaInstrumenterFactory(GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME);
+    BATCH_PROCESS_INSTRUMENTER = factory.createBatchProcessInstrumenter();
+    PROCESS_INSTRUMENTER = factory.createConsumerProcessInstrumenter();
+  }
+
+  public static Instrumenter<ConsumerRecords<?, ?>, Void> batchProcessInstrumenter() {
+    return BATCH_PROCESS_INSTRUMENTER;
+  }
public static Instrumenter<ConsumerRecord<?, ?>, Void> processInstrumenter() {
return PROCESS_INSTRUMENTER;

File: BatchRecordsVertxKafkaTest.java

@@ -0,0 +1,236 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_6;
import static io.opentelemetry.api.common.AttributeKey.longKey;
import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanKind;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
import static org.junit.jupiter.api.Assertions.assertTrue;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.sdk.trace.data.LinkData;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.data.StatusData;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.assertj.core.api.AbstractLongAssert;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.MethodOrderer.OrderAnnotation;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
// ordering is needed to ensure that the error case runs last - throwing errors in the batch handler
// is possible and tolerated, but it messes up the internal state of the vertx kafka consumer
@TestMethodOrder(OrderAnnotation.class)
class BatchRecordsVertxKafkaTest extends AbstractVertxKafkaTest {
static final CountDownLatch consumerReady = new CountDownLatch(1);
@BeforeAll
static void setUpTopicAndConsumer() {
// in Vertx, a batch handler is something that runs in addition to the regular single record
// handler -- the KafkaConsumer won't start polling unless you set the regular handler
kafkaConsumer.batchHandler(BatchRecordsHandler.INSTANCE);
kafkaConsumer.handler(record -> testing.runWithSpan("process " + record.value(), () -> {}));
kafkaConsumer.partitionsAssignedHandler(partitions -> consumerReady.countDown());
kafkaConsumer.subscribe("testBatchTopic");
}
@Order(1)
@Test
void shouldCreateSpansForBatchReceiveAndProcess() throws InterruptedException {
assertTrue(consumerReady.await(30, TimeUnit.SECONDS));
sendBatchMessages(
KafkaProducerRecord.create("testBatchTopic", "10", "testSpan1"),
KafkaProducerRecord.create("testBatchTopic", "20", "testSpan2"));
AtomicReference<SpanData> producer1 = new AtomicReference<>();
AtomicReference<SpanData> producer2 = new AtomicReference<>();
testing.waitAndAssertSortedTraces(
orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER),
trace -> {
trace.hasSpansSatisfyingExactly(
span -> span.hasName("producer"),
span ->
span.hasName("testBatchTopic send")
.hasKind(SpanKind.PRODUCER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testBatchTopic"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic")),
span ->
span.hasName("testBatchTopic send")
.hasKind(SpanKind.PRODUCER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testBatchTopic"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic")));
producer1.set(trace.getSpan(1));
producer2.set(trace.getSpan(2));
},
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("testBatchTopic receive")
.hasKind(SpanKind.CONSUMER)
.hasNoParent()
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testBatchTopic"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive")),
// batch consumer
span ->
span.hasName("testBatchTopic process")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(0))
.hasLinks(
LinkData.create(producer1.get().getSpanContext()),
LinkData.create(producer2.get().getSpanContext()))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testBatchTopic"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "process")),
span -> span.hasName("batch consumer").hasParent(trace.getSpan(1)),
// single consumer 1
span ->
span.hasName("testBatchTopic process")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(0))
.hasLinks(LinkData.create(producer1.get().getSpanContext()))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testBatchTopic"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
satisfies(
SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES,
AbstractLongAssert::isNotNegative),
satisfies(
SemanticAttributes.MESSAGING_KAFKA_PARTITION,
AbstractLongAssert::isNotNegative),
satisfies(longKey("kafka.offset"), AbstractLongAssert::isNotNegative),
satisfies(
longKey("kafka.record.queue_time_ms"),
AbstractLongAssert::isNotNegative)),
span -> span.hasName("process testSpan1").hasParent(trace.getSpan(3)),
// single consumer 2
span ->
span.hasName("testBatchTopic process")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(0))
.hasLinks(LinkData.create(producer2.get().getSpanContext()))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testBatchTopic"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
satisfies(
SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES,
AbstractLongAssert::isNotNegative),
satisfies(
SemanticAttributes.MESSAGING_KAFKA_PARTITION,
AbstractLongAssert::isNotNegative),
satisfies(longKey("kafka.offset"), AbstractLongAssert::isNotNegative),
satisfies(
longKey("kafka.record.queue_time_ms"),
AbstractLongAssert::isNotNegative)),
span -> span.hasName("process testSpan2").hasParent(trace.getSpan(5))));
}
@Order(2)
@Test
void shouldHandleFailureInKafkaBatchListener() throws InterruptedException {
assertTrue(consumerReady.await(30, TimeUnit.SECONDS));
sendBatchMessages(KafkaProducerRecord.create("testBatchTopic", "10", "error"));
// make sure that the consumer eats up any leftover records
kafkaConsumer.resume();
AtomicReference<SpanData> producer = new AtomicReference<>();
// the regular handler is not being called if the batch one fails
testing.waitAndAssertSortedTraces(
orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER),
trace -> {
trace.hasSpansSatisfyingExactly(
span -> span.hasName("producer"),
span ->
span.hasName("testBatchTopic send")
.hasKind(SpanKind.PRODUCER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testBatchTopic"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic")));
producer.set(trace.getSpan(1));
},
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("testBatchTopic receive")
.hasKind(SpanKind.CONSUMER)
.hasNoParent()
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testBatchTopic"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive")),
// batch consumer
span ->
span.hasName("testBatchTopic process")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(0))
.hasLinks(LinkData.create(producer.get().getSpanContext()))
.hasStatus(StatusData.error())
.hasException(new IllegalArgumentException("boom"))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testBatchTopic"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "process")),
span -> span.hasName("batch consumer").hasParent(trace.getSpan(1)),
// single consumer
span ->
span.hasName("testBatchTopic process")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testBatchTopic"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
satisfies(
SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES,
AbstractLongAssert::isNotNegative),
satisfies(
SemanticAttributes.MESSAGING_KAFKA_PARTITION,
AbstractLongAssert::isNotNegative),
satisfies(longKey("kafka.offset"), AbstractLongAssert::isNotNegative),
satisfies(
longKey("kafka.record.queue_time_ms"),
AbstractLongAssert::isNotNegative)),
span -> span.hasName("process error").hasParent(trace.getSpan(3))));
}
}
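For orientation, the trace topology the happy-path test above asserts, sketched:

trace 1 (root kind INTERNAL):
  producer
    testBatchTopic send (PRODUCER)
    testBatchTopic send (PRODUCER)

trace 2 (root kind CONSUMER):
  testBatchTopic receive
    testBatchTopic process   (batch, links -> both send spans)
      batch consumer
    testBatchTopic process   (record 1, link -> first send)
      process testSpan1
    testBatchTopic process   (record 2, link -> second send)
      process testSpan2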

File: SingleRecordVertxKafkaTest.java

@@ -49,11 +49,10 @@ class SingleRecordVertxKafkaTest extends AbstractVertxKafkaTest {
CountDownLatch sent = new CountDownLatch(1);
testing.runWithSpan(
"producer",
-        () -> {
-          kafkaProducer.write(
-              KafkaProducerRecord.create("testSingleTopic", "10", "testSpan"),
-              result -> sent.countDown());
-        });
+        () ->
+            sendRecord(
+                KafkaProducerRecord.create("testSingleTopic", "10", "testSpan"),
+                result -> sent.countDown()));
assertTrue(sent.await(30, TimeUnit.SECONDS));
AtomicReference<SpanData> producer = new AtomicReference<>();
@@ -115,11 +114,10 @@ class SingleRecordVertxKafkaTest extends AbstractVertxKafkaTest {
CountDownLatch sent = new CountDownLatch(1);
testing.runWithSpan(
"producer",
-        () -> {
-          kafkaProducer.write(
-              KafkaProducerRecord.create("testSingleTopic", "10", "error"),
-              result -> sent.countDown());
-        });
+        () ->
+            sendRecord(
+                KafkaProducerRecord.create("testSingleTopic", "10", "error"),
+                result -> sent.countDown()));
assertTrue(sent.await(30, TimeUnit.SECONDS));
AtomicReference<SpanData> producer = new AtomicReference<>();

File: NoReceiveTelemetryBatchRecordsVertxKafkaTest.java

@@ -0,0 +1,198 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_6;
import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanKind;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
import static org.junit.jupiter.api.Assertions.assertTrue;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.sdk.trace.data.LinkData;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.data.StatusData;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.assertj.core.api.AbstractLongAssert;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
// ordering is needed to ensure that the error case runs last - throwing errors in the batch handler
// is possible and tolerated, but it messes up the internal state of the vertx kafka consumer
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
class NoReceiveTelemetryBatchRecordsVertxKafkaTest extends AbstractVertxKafkaTest {
static final CountDownLatch consumerReady = new CountDownLatch(1);
@BeforeAll
static void setUpTopicAndConsumer() {
// in Vertx, a batch handler is something that runs in addition to the regular single record
// handler -- the KafkaConsumer won't start polling unless you set the regular handler
kafkaConsumer.batchHandler(BatchRecordsHandler.INSTANCE);
kafkaConsumer.handler(record -> testing.runWithSpan("process " + record.value(), () -> {}));
kafkaConsumer.partitionsAssignedHandler(partitions -> consumerReady.countDown());
kafkaConsumer.subscribe("testBatchTopic");
}
@Order(1)
@Test
void shouldCreateSpansForBatchReceiveAndProcess() throws InterruptedException {
assertTrue(consumerReady.await(30, TimeUnit.SECONDS));
sendBatchMessages(
KafkaProducerRecord.create("testBatchTopic", "10", "testSpan1"),
KafkaProducerRecord.create("testBatchTopic", "20", "testSpan2"));
AtomicReference<SpanData> producer1 = new AtomicReference<>();
AtomicReference<SpanData> producer2 = new AtomicReference<>();
testing.waitAndAssertSortedTraces(
orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER),
trace -> {
trace.hasSpansSatisfyingExactly(
span -> span.hasName("producer"),
// first record
span ->
span.hasName("testBatchTopic send")
.hasKind(SpanKind.PRODUCER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testBatchTopic"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic")),
span ->
span.hasName("testBatchTopic process")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(1))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testBatchTopic"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
satisfies(
SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES,
AbstractLongAssert::isNotNegative),
satisfies(
SemanticAttributes.MESSAGING_KAFKA_PARTITION,
AbstractLongAssert::isNotNegative)),
span -> span.hasName("process testSpan1").hasParent(trace.getSpan(2)),
// second record
span ->
span.hasName("testBatchTopic send")
.hasKind(SpanKind.PRODUCER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testBatchTopic"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic")),
span ->
span.hasName("testBatchTopic process")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(4))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testBatchTopic"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
satisfies(
SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES,
AbstractLongAssert::isNotNegative),
satisfies(
SemanticAttributes.MESSAGING_KAFKA_PARTITION,
AbstractLongAssert::isNotNegative)),
span -> span.hasName("process testSpan2").hasParent(trace.getSpan(5)));
producer1.set(trace.getSpan(1));
producer2.set(trace.getSpan(4));
},
trace ->
trace.hasSpansSatisfyingExactly(
// batch consumer
span ->
span.hasName("testBatchTopic process")
.hasKind(SpanKind.CONSUMER)
.hasNoParent()
.hasLinks(
LinkData.create(producer1.get().getSpanContext()),
LinkData.create(producer2.get().getSpanContext()))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testBatchTopic"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "process")),
span -> span.hasName("batch consumer").hasParent(trace.getSpan(0))));
}
@Order(2)
@Test
void shouldHandleFailureInKafkaBatchListener() throws InterruptedException {
assertTrue(consumerReady.await(30, TimeUnit.SECONDS));
sendBatchMessages(KafkaProducerRecord.create("testBatchTopic", "10", "error"));
// make sure that the consumer eats up any leftover records
kafkaConsumer.resume();
AtomicReference<SpanData> producer = new AtomicReference<>();
// the regular handler is not being called if the batch one fails
testing.waitAndAssertSortedTraces(
orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER),
trace -> {
trace.hasSpansSatisfyingExactly(
span -> span.hasName("producer"),
span ->
span.hasName("testBatchTopic send")
.hasKind(SpanKind.PRODUCER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testBatchTopic"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic")),
span ->
span.hasName("testBatchTopic process")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(1))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testBatchTopic"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
satisfies(
SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES,
AbstractLongAssert::isNotNegative),
satisfies(
SemanticAttributes.MESSAGING_KAFKA_PARTITION,
AbstractLongAssert::isNotNegative)),
span -> span.hasName("process error").hasParent(trace.getSpan(2)));
producer.set(trace.getSpan(1));
},
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("testBatchTopic process")
.hasKind(SpanKind.CONSUMER)
.hasNoParent()
.hasLinks(LinkData.create(producer.get().getSpanContext()))
.hasStatus(StatusData.error())
.hasException(new IllegalArgumentException("boom"))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testBatchTopic"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "process")),
span -> span.hasName("batch consumer").hasParent(trace.getSpan(0))));
}
}

File: NoReceiveTelemetrySingleRecordVertxKafkaTest.java

@@ -44,11 +44,10 @@ class NoReceiveTelemetrySingleRecordVertxKafkaTest extends AbstractVertxKafkaTest {
CountDownLatch sent = new CountDownLatch(1);
testing.runWithSpan(
"producer",
-        () -> {
-          kafkaProducer.write(
-              KafkaProducerRecord.create("testSingleTopic", "10", "testSpan"),
-              result -> sent.countDown());
-        });
+        () ->
+            sendRecord(
+                KafkaProducerRecord.create("testSingleTopic", "10", "testSpan"),
+                result -> sent.countDown()));
assertTrue(sent.await(30, TimeUnit.SECONDS));
testing.waitAndAssertTraces(
@@ -88,11 +87,10 @@ class NoReceiveTelemetrySingleRecordVertxKafkaTest extends AbstractVertxKafkaTest {
CountDownLatch sent = new CountDownLatch(1);
testing.runWithSpan(
"producer",
-        () -> {
-          kafkaProducer.write(
-              KafkaProducerRecord.create("testSingleTopic", "10", "error"),
-              result -> sent.countDown());
-        });
+        () ->
+            sendRecord(
+                KafkaProducerRecord.create("testSingleTopic", "10", "error"),
+                result -> sent.countDown()));
assertTrue(sent.await(30, TimeUnit.SECONDS));
testing.waitAndAssertTraces(

File: AbstractVertxKafkaTest.java

@@ -5,13 +5,24 @@
package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_6;
import static org.junit.jupiter.api.Assertions.assertTrue;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import io.vertx.kafka.client.producer.RecordMetadata;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.AfterAll;
@@ -33,7 +44,7 @@ public abstract class AbstractVertxKafkaTest {
static KafkaContainer kafka;
static Vertx vertx;
-  protected static KafkaProducer<String, String> kafkaProducer;
+  static KafkaProducer<String, String> kafkaProducer;
protected static KafkaConsumer<String, String> kafkaConsumer;
@BeforeAll
@@ -85,4 +96,90 @@
props.put("value.deserializer", StringDeserializer.class);
return props;
}
@SafeVarargs
protected final void sendBatchMessages(KafkaProducerRecord<String, String>... records)
throws InterruptedException {
// This test assumes that messages are sent and received as a batch. Occasionally it happens
// that the messages are not received as a batch, but one by one. This doesn't match what the
// assertion expects. To reduce flakiness we retry the test when messages weren't received as
// a batch.
int maxAttempts = 5;
for (int i = 1; i <= maxAttempts; i++) {
BatchRecordsHandler.reset();
kafkaConsumer.pause();
// wait a bit to ensure that the consumer has really paused
Thread.sleep(1000);
CountDownLatch sent = new CountDownLatch(records.length);
testing.runWithSpan(
"producer",
() -> {
for (KafkaProducerRecord<String, String> record : records) {
sendRecord(record, result -> sent.countDown());
}
});
assertTrue(sent.await(30, TimeUnit.SECONDS));
kafkaConsumer.resume();
BatchRecordsHandler.waitForMessages();
if (BatchRecordsHandler.getLastBatchSize() == records.length) {
break;
} else if (i < maxAttempts) {
testing.waitForTraces(2);
Thread.sleep(1_000); // sleep a bit to give time for all the spans to arrive
testing.clearData();
logger.info("Messages weren't received as batch, retrying");
}
}
}
private static final MethodHandle SEND_METHOD;
static {
MethodHandles.Lookup lookup = MethodHandles.publicLookup();
MethodHandle sendMethod = null;
// versions 3.6+
try {
sendMethod =
lookup.findVirtual(
KafkaProducer.class,
"write",
MethodType.methodType(KafkaProducer.class, KafkaProducerRecord.class, Handler.class));
} catch (NoSuchMethodException | IllegalAccessException ignored) {
// ignore
}
// versions 4+
if (sendMethod == null) {
try {
sendMethod =
lookup.findVirtual(
KafkaProducer.class,
"send",
MethodType.methodType(
KafkaProducer.class, KafkaProducerRecord.class, Handler.class));
} catch (NoSuchMethodException | IllegalAccessException ignored) {
// ignore
}
}
if (sendMethod == null) {
throw new AssertionError("Could not find send/write method on KafkaProducer");
}
SEND_METHOD = sendMethod;
}
protected static void sendRecord(
KafkaProducerRecord<String, String> record, Handler<AsyncResult<RecordMetadata>> handler) {
try {
SEND_METHOD.invoke(kafkaProducer, record, handler);
} catch (Throwable e) {
throw new AssertionError("Failed producer send/write invocation", e);
}
}
}
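With the method handle in place, sendRecord dispatches to write(record, handler) on Vert.x 3.x and send(record, handler) on 4.x, so test code stays version-agnostic. Usage, as in the single-record tests above:

CountDownLatch sent = new CountDownLatch(1);
testing.runWithSpan(
    "producer",
    () ->
        sendRecord(
            KafkaProducerRecord.create("testSingleTopic", "10", "testSpan"),
            result -> sent.countDown()));
assertTrue(sent.await(30, TimeUnit.SECONDS));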

File: BatchRecordsHandler.java

@@ -0,0 +1,52 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_6;
import io.opentelemetry.instrumentation.testing.GlobalTraceUtil;
import io.vertx.core.Handler;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import io.vertx.kafka.client.consumer.KafkaConsumerRecords;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
public final class BatchRecordsHandler implements Handler<KafkaConsumerRecords<String, String>> {
public static final BatchRecordsHandler INSTANCE = new BatchRecordsHandler();
private static final AtomicInteger lastBatchSize = new AtomicInteger();
private static volatile CountDownLatch messageReceived = new CountDownLatch(2);
private BatchRecordsHandler() {}
@Override
public void handle(KafkaConsumerRecords<String, String> records) {
lastBatchSize.set(records.size());
IntStream.range(0, records.size()).forEach(it -> messageReceived.countDown());
GlobalTraceUtil.runWithSpan("batch consumer", () -> {});
for (int i = 0; i < records.size(); ++i) {
KafkaConsumerRecord<String, String> record = records.recordAt(i);
if (record.value().equals("error")) {
throw new IllegalArgumentException("boom");
}
}
}
public static void reset() {
messageReceived = new CountDownLatch(2);
lastBatchSize.set(0);
}
public static void waitForMessages() throws InterruptedException {
messageReceived.await(30, TimeUnit.SECONDS);
}
public static int getLastBatchSize() {
return lastBatchSize.get();
}
}

File: InstrumentationTestRunner.java

@@ -94,25 +94,25 @@ public abstract class InstrumentationTestRunner {
assertions.forEach(assertionsList::add);
try {
-      await()
-          .untilAsserted(
-              () -> {
-                List<List<SpanData>> traces = waitForTraces(assertionsList.size());
-                if (traceComparator != null) {
-                  traces.sort(traceComparator);
-                }
-                TracesAssert.assertThat(traces).hasTracesSatisfyingExactly(assertionsList);
-              });
+      await().untilAsserted(() -> doAssertTraces(traceComparator, assertionsList));
} catch (ConditionTimeoutException e) {
// Don't throw this failure since the stack is the awaitility thread, causing confusion.
// Instead, just assert one more time on the test thread, which will fail with a better stack
// trace.
// TODO(anuraaga): There is probably a better way to do this.
-      List<List<SpanData>> traces = waitForTraces(assertionsList.size());
-      TracesAssert.assertThat(traces).hasTracesSatisfyingExactly(assertionsList);
+      doAssertTraces(traceComparator, assertionsList);
}
}
private <T extends Consumer<TraceAssert>> void doAssertTraces(
@Nullable Comparator<List<SpanData>> traceComparator, List<T> assertionsList) {
List<List<SpanData>> traces = waitForTraces(assertionsList.size());
if (traceComparator != null) {
traces.sort(traceComparator);
}
TracesAssert.assertThat(traces).hasTracesSatisfyingExactly(assertionsList);
}
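For reference, this is how the Vert.x Kafka tests above call the sorted variant; orderByRootSpanKind builds the Comparator that doAssertTraces applies before matching. A schematic example (span assertions abbreviated):

testing.waitAndAssertSortedTraces(
    orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER),
    // after sorting, the INTERNAL-rooted producer trace is always asserted first
    trace -> trace.hasSpansSatisfyingExactly(span -> span.hasName("producer")),
    trace ->
        trace.hasSpansSatisfyingExactly(
            span -> span.hasName("testBatchTopic receive").hasKind(SpanKind.CONSUMER)));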
/**
* Runs the provided {@code callback} inside the scope of an INTERNAL span with name {@code
* spanName}.