diff --git a/docs/supported-libraries.md b/docs/supported-libraries.md
index 074c98a3bd..38c9a025a7 100644
--- a/docs/supported-libraries.md
+++ b/docs/supported-libraries.md
@@ -114,6 +114,7 @@ These are the supported libraries and frameworks:
 | [Vaadin](https://vaadin.com/) | 14.2+ |
 | [Vert.x Web](https://vertx.io/docs/vertx-web/java/) | 3.0+ |
 | [Vert.x HttpClient](https://vertx.io/docs/apidocs/io/vertx/core/http/HttpClient.html) | 3.0+ |
+| [Vert.x Kafka Client](https://vertx.io/docs/vertx-kafka-client/java/) | 3.6+ |
 | [Vert.x RxJava2](https://vertx.io/docs/vertx-rx/java2/) | 3.5+ |
 
 ## Application Servers
diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/KafkaBatchProcessAttributesGetter.java b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaBatchProcessAttributesGetter.java
similarity index 96%
rename from instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/KafkaBatchProcessAttributesGetter.java
rename to instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaBatchProcessAttributesGetter.java
index 04e548c8b5..15c4306dc1 100644
--- a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/KafkaBatchProcessAttributesGetter.java
+++ b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaBatchProcessAttributesGetter.java
@@ -3,7 +3,7 @@
  * SPDX-License-Identifier: Apache-2.0
  */
 
-package io.opentelemetry.javaagent.instrumentation.spring.kafka;
+package io.opentelemetry.instrumentation.kafka.internal;
 
 import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
 import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/KafkaBatchProcessSpanLinksExtractor.java b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaBatchProcessSpanLinksExtractor.java
similarity index 89%
rename from instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/KafkaBatchProcessSpanLinksExtractor.java
rename to instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaBatchProcessSpanLinksExtractor.java
index fb793e0930..9011d67d4a 100644
--- a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/KafkaBatchProcessSpanLinksExtractor.java
+++ b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaBatchProcessSpanLinksExtractor.java
@@ -3,13 +3,12 @@
  * SPDX-License-Identifier: Apache-2.0
  */
 
-package io.opentelemetry.javaagent.instrumentation.spring.kafka;
+package io.opentelemetry.instrumentation.kafka.internal;
 
 import io.opentelemetry.context.Context;
 import io.opentelemetry.context.propagation.ContextPropagators;
 import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksBuilder;
 import
io.opentelemetry.instrumentation.api.instrumenter.SpanLinksExtractor; -import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerRecordGetter; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaInstrumenterFactory.java b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaInstrumenterFactory.java index 82f60dde1a..8628f2c582 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaInstrumenterFactory.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaInstrumenterFactory.java @@ -19,6 +19,7 @@ import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttr import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor; import java.util.Collections; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.ProducerRecord; /** @@ -118,4 +119,19 @@ public final class KafkaInstrumenterFactory { return builder.newConsumerInstrumenter(KafkaConsumerRecordGetter.INSTANCE); } } + + public Instrumenter, Void> createBatchProcessInstrumenter() { + KafkaBatchProcessAttributesGetter getter = KafkaBatchProcessAttributesGetter.INSTANCE; + MessageOperation operation = MessageOperation.PROCESS; + + return Instrumenter., Void>builder( + openTelemetry, + instrumentationName, + MessagingSpanNameExtractor.create(getter, operation)) + .addAttributesExtractor(MessagingAttributesExtractor.create(getter, operation)) + .addSpanLinksExtractor( + new KafkaBatchProcessSpanLinksExtractor(openTelemetry.getPropagators())) + .setErrorCauseExtractor(errorCauseExtractor) + .newInstrumenter(SpanKindExtractor.alwaysConsumer()); + } } diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/SpringKafkaSingletons.java b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/SpringKafkaSingletons.java index 0274f7c9bc..ec8f802c6d 100644 --- a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/SpringKafkaSingletons.java +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/SpringKafkaSingletons.java @@ -7,10 +7,6 @@ package io.opentelemetry.javaagent.instrumentation.spring.kafka; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; -import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor; -import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; -import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor; -import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor; import io.opentelemetry.instrumentation.kafka.internal.KafkaInstrumenterFactory; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; 
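The new createBatchProcessInstrumenter() shown above moves the batch-span wiring that used to live in SpringKafkaSingletons into the shared KafkaInstrumenterFactory, so any Kafka-based instrumentation can obtain both the single-record and the batch instrumenter from one factory. A minimal sketch of the resulting usage pattern follows; the class name and instrumentation name are hypothetical, but the calls mirror the SpringKafkaSingletons and VertxKafkaSingletons hunks in this diff, and the generic signatures (ConsumerRecords<?, ?> for batches, ConsumerRecord<?, ?> for single records) are the ones the factory methods return.

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.kafka.internal.KafkaInstrumenterFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

// hypothetical singletons class; the pattern mirrors SpringKafkaSingletons / VertxKafkaSingletons
final class ExampleKafkaSingletons {

  private static final String INSTRUMENTATION_NAME = "io.opentelemetry.example-kafka";

  private static final Instrumenter<ConsumerRecords<?, ?>, Void> BATCH_PROCESS_INSTRUMENTER;
  private static final Instrumenter<ConsumerRecord<?, ?>, Void> PROCESS_INSTRUMENTER;

  static {
    // build both instrumenters from the same factory so they share configuration
    KafkaInstrumenterFactory factory =
        new KafkaInstrumenterFactory(GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME);
    BATCH_PROCESS_INSTRUMENTER = factory.createBatchProcessInstrumenter();
    PROCESS_INSTRUMENTER = factory.createConsumerProcessInstrumenter();
  }

  static Instrumenter<ConsumerRecords<?, ?>, Void> batchProcessInstrumenter() {
    return BATCH_PROCESS_INSTRUMENTER;
  }

  static Instrumenter<ConsumerRecord<?, ?>, Void> processInstrumenter() {
    return PROCESS_INSTRUMENTER;
  }

  private ExampleKafkaSingletons() {}
}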
@@ -19,26 +15,15 @@ public final class SpringKafkaSingletons { private static final String INSTRUMENTATION_NAME = "io.opentelemetry.spring-kafka-2.7"; - private static final Instrumenter, Void> BATCH_PROCESS_INSTRUMENTER = - buildBatchProcessInstrumenter(); - private static final Instrumenter, Void> PROCESS_INSTRUMENTER = - new KafkaInstrumenterFactory(GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME) - .setErrorCauseExtractor(SpringKafkaErrorCauseExtractor.INSTANCE) - .createConsumerProcessInstrumenter(); + private static final Instrumenter, Void> BATCH_PROCESS_INSTRUMENTER; + private static final Instrumenter, Void> PROCESS_INSTRUMENTER; - private static Instrumenter, Void> buildBatchProcessInstrumenter() { - KafkaBatchProcessAttributesGetter getter = KafkaBatchProcessAttributesGetter.INSTANCE; - MessageOperation operation = MessageOperation.PROCESS; - - return Instrumenter., Void>builder( - GlobalOpenTelemetry.get(), - INSTRUMENTATION_NAME, - MessagingSpanNameExtractor.create(getter, operation)) - .addAttributesExtractor(MessagingAttributesExtractor.create(getter, operation)) - .addSpanLinksExtractor( - new KafkaBatchProcessSpanLinksExtractor(GlobalOpenTelemetry.getPropagators())) - .setErrorCauseExtractor(SpringKafkaErrorCauseExtractor.INSTANCE) - .newInstrumenter(SpanKindExtractor.alwaysConsumer()); + static { + KafkaInstrumenterFactory factory = + new KafkaInstrumenterFactory(GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME) + .setErrorCauseExtractor(SpringKafkaErrorCauseExtractor.INSTANCE); + BATCH_PROCESS_INSTRUMENTER = factory.createBatchProcessInstrumenter(); + PROCESS_INSTRUMENTER = factory.createConsumerProcessInstrumenter(); } public static Instrumenter, Void> batchProcessInstrumenter() { diff --git a/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/InstrumentedBatchRecordsHandler.java b/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/InstrumentedBatchRecordsHandler.java new file mode 100644 index 0000000000..9e6a20f181 --- /dev/null +++ b/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/InstrumentedBatchRecordsHandler.java @@ -0,0 +1,69 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_6; + +import static io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_6.VertxKafkaSingletons.batchProcessInstrumenter; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.instrumentation.api.field.VirtualField; +import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing; +import io.vertx.core.Handler; +import javax.annotation.Nullable; +import org.apache.kafka.clients.consumer.ConsumerRecords; + +public final class InstrumentedBatchRecordsHandler implements Handler> { + + private final VirtualField, Context> receiveContextField; + @Nullable private final Handler> delegate; + + public InstrumentedBatchRecordsHandler( + VirtualField, Context> receiveContextField, + @Nullable Handler> delegate) { + this.receiveContextField = receiveContextField; + this.delegate = delegate; + } + + @Override + public void handle(ConsumerRecords records) { + Context parentContext = getParentContext(records); + + if (!batchProcessInstrumenter().shouldStart(parentContext, records)) { + 
callDelegateHandler(records); + return; + } + + // the instrumenter iterates over records when adding links, we need to suppress that + boolean previousWrappingEnabled = KafkaClientsConsumerProcessTracing.setEnabled(false); + try { + Context context = batchProcessInstrumenter().start(parentContext, records); + Throwable error = null; + try (Scope ignored = context.makeCurrent()) { + callDelegateHandler(records); + } catch (Throwable t) { + error = t; + throw t; + } finally { + batchProcessInstrumenter().end(context, records, null, error); + } + } finally { + KafkaClientsConsumerProcessTracing.setEnabled(previousWrappingEnabled); + } + } + + private Context getParentContext(ConsumerRecords records) { + Context receiveContext = receiveContextField.get(records); + + // use the receive CONSUMER span as parent if it's available + return receiveContext != null ? receiveContext : Context.current(); + } + + private void callDelegateHandler(ConsumerRecords records) { + if (delegate != null) { + delegate.handle(records); + } + } +} diff --git a/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/InstrumentedSingleRecordHandler.java b/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/InstrumentedSingleRecordHandler.java index f404be0159..38e8298190 100644 --- a/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/InstrumentedSingleRecordHandler.java +++ b/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/InstrumentedSingleRecordHandler.java @@ -47,8 +47,8 @@ public final class InstrumentedSingleRecordHandler implements Handler records) { - Context receiveContext = receiveContextField.get(records); + private Context getParentContext(ConsumerRecord record) { + Context receiveContext = receiveContextField.get(record); // use the receive CONSUMER span as parent if it's available return receiveContext != null ? 
receiveContext : Context.current(); diff --git a/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/KafkaReadStreamImplInstrumentation.java b/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/KafkaReadStreamImplInstrumentation.java index ba53efb2b8..fb420c3761 100644 --- a/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/KafkaReadStreamImplInstrumentation.java +++ b/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/KafkaReadStreamImplInstrumentation.java @@ -64,7 +64,10 @@ public class KafkaReadStreamImplInstrumentation implements TypeInstrumentation { @Advice.OnMethodEnter(suppress = Throwable.class) public static void onEnter( @Advice.Argument(value = 0, readOnly = false) Handler> handler) { - // TODO: next PR + + VirtualField, Context> receiveContextField = + VirtualField.find(ConsumerRecords.class, Context.class); + handler = new InstrumentedBatchRecordsHandler<>(receiveContextField, handler); } } diff --git a/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/VertxKafkaSingletons.java b/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/VertxKafkaSingletons.java index e2135bb8ca..88bc4f9f5c 100644 --- a/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/VertxKafkaSingletons.java +++ b/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/VertxKafkaSingletons.java @@ -9,14 +9,25 @@ import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.opentelemetry.instrumentation.kafka.internal.KafkaInstrumenterFactory; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; public final class VertxKafkaSingletons { private static final String INSTRUMENTATION_NAME = "io.opentelemetry.vertx-kafka-client-3.5"; - private static final Instrumenter, Void> PROCESS_INSTRUMENTER = - new KafkaInstrumenterFactory(GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME) - .createConsumerProcessInstrumenter(); + private static final Instrumenter, Void> BATCH_PROCESS_INSTRUMENTER; + private static final Instrumenter, Void> PROCESS_INSTRUMENTER; + + static { + KafkaInstrumenterFactory factory = + new KafkaInstrumenterFactory(GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME); + BATCH_PROCESS_INSTRUMENTER = factory.createBatchProcessInstrumenter(); + PROCESS_INSTRUMENTER = factory.createConsumerProcessInstrumenter(); + } + + public static Instrumenter, Void> batchProcessInstrumenter() { + return BATCH_PROCESS_INSTRUMENTER; + } public static Instrumenter, Void> processInstrumenter() { return PROCESS_INSTRUMENTER; diff --git a/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/BatchRecordsVertxKafkaTest.java b/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/BatchRecordsVertxKafkaTest.java new file mode 100644 index 
0000000000..73bd1af936 --- /dev/null +++ b/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/BatchRecordsVertxKafkaTest.java @@ -0,0 +1,236 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_6; + +import static io.opentelemetry.api.common.AttributeKey.longKey; +import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanKind; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.sdk.trace.data.LinkData; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.data.StatusData; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import io.vertx.kafka.client.producer.KafkaProducerRecord; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.assertj.core.api.AbstractLongAssert; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.MethodOrderer.OrderAnnotation; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestMethodOrder; + +// ordering is needed to ensure that the error case runs last - throwing errors in the batch handler +// is possible and tolerated, but it messes up the internal state of the vertx kafka consumer +@TestMethodOrder(OrderAnnotation.class) +class BatchRecordsVertxKafkaTest extends AbstractVertxKafkaTest { + + static final CountDownLatch consumerReady = new CountDownLatch(1); + + @BeforeAll + static void setUpTopicAndConsumer() { + // in Vertx, a batch handler is something that runs in addition to the regular single record + // handler -- the KafkaConsumer won't start polling unless you set the regular handler + kafkaConsumer.batchHandler(BatchRecordsHandler.INSTANCE); + kafkaConsumer.handler(record -> testing.runWithSpan("process " + record.value(), () -> {})); + + kafkaConsumer.partitionsAssignedHandler(partitions -> consumerReady.countDown()); + kafkaConsumer.subscribe("testBatchTopic"); + } + + @Order(1) + @Test + void shouldCreateSpansForBatchReceiveAndProcess() throws InterruptedException { + assertTrue(consumerReady.await(30, TimeUnit.SECONDS)); + + sendBatchMessages( + KafkaProducerRecord.create("testBatchTopic", "10", "testSpan1"), + KafkaProducerRecord.create("testBatchTopic", "20", "testSpan2")); + + AtomicReference producer1 = new AtomicReference<>(); + AtomicReference producer2 = new AtomicReference<>(); + + testing.waitAndAssertSortedTraces( + orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER), + trace -> { + trace.hasSpansSatisfyingExactly( + span -> span.hasName("producer"), + span -> + span.hasName("testBatchTopic send") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testBatchTopic"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic")), + span -> + span.hasName("testBatchTopic send") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + 
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testBatchTopic"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"))); + + producer1.set(trace.getSpan(1)); + producer2.set(trace.getSpan(2)); + }, + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("testBatchTopic receive") + .hasKind(SpanKind.CONSUMER) + .hasNoParent() + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testBatchTopic"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive")), + + // batch consumer + span -> + span.hasName("testBatchTopic process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(0)) + .hasLinks( + LinkData.create(producer1.get().getSpanContext()), + LinkData.create(producer2.get().getSpanContext())) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testBatchTopic"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "process")), + span -> span.hasName("batch consumer").hasParent(trace.getSpan(1)), + + // single consumer 1 + span -> + span.hasName("testBatchTopic process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(0)) + .hasLinks(LinkData.create(producer1.get().getSpanContext())) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testBatchTopic"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"), + satisfies( + SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, + AbstractLongAssert::isNotNegative), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_PARTITION, + AbstractLongAssert::isNotNegative), + satisfies(longKey("kafka.offset"), AbstractLongAssert::isNotNegative), + satisfies( + longKey("kafka.record.queue_time_ms"), + AbstractLongAssert::isNotNegative)), + span -> span.hasName("process testSpan1").hasParent(trace.getSpan(3)), + + // single consumer 2 + span -> + span.hasName("testBatchTopic process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(0)) + .hasLinks(LinkData.create(producer2.get().getSpanContext())) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testBatchTopic"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"), + satisfies( + SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, + AbstractLongAssert::isNotNegative), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_PARTITION, + AbstractLongAssert::isNotNegative), + satisfies(longKey("kafka.offset"), AbstractLongAssert::isNotNegative), + satisfies( + longKey("kafka.record.queue_time_ms"), + AbstractLongAssert::isNotNegative)), + span -> span.hasName("process testSpan2").hasParent(trace.getSpan(5)))); + } + + @Order(2) + @Test + void shouldHandleFailureInKafkaBatchListener() throws InterruptedException { + assertTrue(consumerReady.await(30, TimeUnit.SECONDS)); + + sendBatchMessages(KafkaProducerRecord.create("testBatchTopic", "10", "error")); + // make sure that the consumer eats up any leftover records + kafkaConsumer.resume(); + + 
AtomicReference producer = new AtomicReference<>(); + + // the regular handler is not being called if the batch one fails + testing.waitAndAssertSortedTraces( + orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER), + trace -> { + trace.hasSpansSatisfyingExactly( + span -> span.hasName("producer"), + span -> + span.hasName("testBatchTopic send") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testBatchTopic"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"))); + + producer.set(trace.getSpan(1)); + }, + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("testBatchTopic receive") + .hasKind(SpanKind.CONSUMER) + .hasNoParent() + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testBatchTopic"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive")), + + // batch consumer + span -> + span.hasName("testBatchTopic process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(0)) + .hasLinks(LinkData.create(producer.get().getSpanContext())) + .hasStatus(StatusData.error()) + .hasException(new IllegalArgumentException("boom")) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testBatchTopic"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "process")), + span -> span.hasName("batch consumer").hasParent(trace.getSpan(1)), + + // single consumer + span -> + span.hasName("testBatchTopic process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testBatchTopic"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"), + satisfies( + SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, + AbstractLongAssert::isNotNegative), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_PARTITION, + AbstractLongAssert::isNotNegative), + satisfies(longKey("kafka.offset"), AbstractLongAssert::isNotNegative), + satisfies( + longKey("kafka.record.queue_time_ms"), + AbstractLongAssert::isNotNegative)), + span -> span.hasName("process error").hasParent(trace.getSpan(3)))); + } +} diff --git a/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/SingleRecordVertxKafkaTest.java b/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/SingleRecordVertxKafkaTest.java index caac2a862b..306d26993b 100644 --- a/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/SingleRecordVertxKafkaTest.java +++ b/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/SingleRecordVertxKafkaTest.java @@ -49,11 +49,10 @@ class SingleRecordVertxKafkaTest extends AbstractVertxKafkaTest { CountDownLatch sent = new CountDownLatch(1); testing.runWithSpan( "producer", - () -> { - kafkaProducer.write( - 
KafkaProducerRecord.create("testSingleTopic", "10", "testSpan"), - result -> sent.countDown()); - }); + () -> + sendRecord( + KafkaProducerRecord.create("testSingleTopic", "10", "testSpan"), + result -> sent.countDown())); assertTrue(sent.await(30, TimeUnit.SECONDS)); AtomicReference producer = new AtomicReference<>(); @@ -115,11 +114,10 @@ class SingleRecordVertxKafkaTest extends AbstractVertxKafkaTest { CountDownLatch sent = new CountDownLatch(1); testing.runWithSpan( "producer", - () -> { - kafkaProducer.write( - KafkaProducerRecord.create("testSingleTopic", "10", "error"), - result -> sent.countDown()); - }); + () -> + sendRecord( + KafkaProducerRecord.create("testSingleTopic", "10", "error"), + result -> sent.countDown())); assertTrue(sent.await(30, TimeUnit.SECONDS)); AtomicReference producer = new AtomicReference<>(); diff --git a/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/testNoReceiveTelemetry/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/NoReceiveTelemetryBatchRecordsVertxKafkaTest.java b/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/testNoReceiveTelemetry/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/NoReceiveTelemetryBatchRecordsVertxKafkaTest.java new file mode 100644 index 0000000000..93d4937acf --- /dev/null +++ b/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/testNoReceiveTelemetry/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/NoReceiveTelemetryBatchRecordsVertxKafkaTest.java @@ -0,0 +1,198 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_6; + +import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanKind; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.sdk.trace.data.LinkData; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.data.StatusData; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import io.vertx.kafka.client.producer.KafkaProducerRecord; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.assertj.core.api.AbstractLongAssert; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestMethodOrder; + +// ordering is needed to ensure that the error case runs last - throwing errors in the batch handler +// is possible and tolerated, but it messes up the internal state of the vertx kafka consumer +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +class NoReceiveTelemetryBatchRecordsVertxKafkaTest extends AbstractVertxKafkaTest { + + static final CountDownLatch consumerReady = new CountDownLatch(1); + + @BeforeAll + static void setUpTopicAndConsumer() { + // in Vertx, a batch handler is something that runs in addition to the regular single record + // handler -- the KafkaConsumer won't start polling unless you set the regular handler + kafkaConsumer.batchHandler(BatchRecordsHandler.INSTANCE); + kafkaConsumer.handler(record -> testing.runWithSpan("process " + record.value(), () -> {})); + + 
kafkaConsumer.partitionsAssignedHandler(partitions -> consumerReady.countDown()); + kafkaConsumer.subscribe("testBatchTopic"); + } + + @Order(1) + @Test + void shouldCreateSpansForBatchReceiveAndProcess() throws InterruptedException { + assertTrue(consumerReady.await(30, TimeUnit.SECONDS)); + + sendBatchMessages( + KafkaProducerRecord.create("testBatchTopic", "10", "testSpan1"), + KafkaProducerRecord.create("testBatchTopic", "20", "testSpan2")); + + AtomicReference producer1 = new AtomicReference<>(); + AtomicReference producer2 = new AtomicReference<>(); + + testing.waitAndAssertSortedTraces( + orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER), + trace -> { + trace.hasSpansSatisfyingExactly( + span -> span.hasName("producer"), + + // first record + span -> + span.hasName("testBatchTopic send") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testBatchTopic"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic")), + span -> + span.hasName("testBatchTopic process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testBatchTopic"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"), + satisfies( + SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, + AbstractLongAssert::isNotNegative), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_PARTITION, + AbstractLongAssert::isNotNegative)), + span -> span.hasName("process testSpan1").hasParent(trace.getSpan(2)), + + // second record + span -> + span.hasName("testBatchTopic send") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testBatchTopic"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic")), + span -> + span.hasName("testBatchTopic process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(4)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testBatchTopic"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"), + satisfies( + SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, + AbstractLongAssert::isNotNegative), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_PARTITION, + AbstractLongAssert::isNotNegative)), + span -> span.hasName("process testSpan2").hasParent(trace.getSpan(5))); + + producer1.set(trace.getSpan(1)); + producer2.set(trace.getSpan(4)); + }, + trace -> + trace.hasSpansSatisfyingExactly( + // batch consumer + span -> + span.hasName("testBatchTopic process") + .hasKind(SpanKind.CONSUMER) + .hasNoParent() + .hasLinks( + LinkData.create(producer1.get().getSpanContext()), + LinkData.create(producer2.get().getSpanContext())) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testBatchTopic"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "process")), + span -> span.hasName("batch 
consumer").hasParent(trace.getSpan(0)))); + } + + @Order(2) + @Test + void shouldHandleFailureInKafkaBatchListener() throws InterruptedException { + assertTrue(consumerReady.await(30, TimeUnit.SECONDS)); + + sendBatchMessages(KafkaProducerRecord.create("testBatchTopic", "10", "error")); + // make sure that the consumer eats up any leftover records + kafkaConsumer.resume(); + + AtomicReference producer = new AtomicReference<>(); + + // the regular handler is not being called if the batch one fails + testing.waitAndAssertSortedTraces( + orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER), + trace -> { + trace.hasSpansSatisfyingExactly( + span -> span.hasName("producer"), + span -> + span.hasName("testBatchTopic send") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testBatchTopic"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic")), + span -> + span.hasName("testBatchTopic process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testBatchTopic"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"), + satisfies( + SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, + AbstractLongAssert::isNotNegative), + satisfies( + SemanticAttributes.MESSAGING_KAFKA_PARTITION, + AbstractLongAssert::isNotNegative)), + span -> span.hasName("process error").hasParent(trace.getSpan(2))); + + producer.set(trace.getSpan(1)); + }, + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("testBatchTopic process") + .hasKind(SpanKind.CONSUMER) + .hasNoParent() + .hasLinks(LinkData.create(producer.get().getSpanContext())) + .hasStatus(StatusData.error()) + .hasException(new IllegalArgumentException("boom")) + .hasAttributesSatisfyingExactly( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION, "testBatchTopic"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "process")), + span -> span.hasName("batch consumer").hasParent(trace.getSpan(0)))); + } +} diff --git a/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/testNoReceiveTelemetry/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/NoReceiveTelemetrySingleRecordVertxKafkaTest.java b/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/testNoReceiveTelemetry/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/NoReceiveTelemetrySingleRecordVertxKafkaTest.java index f37c964d1b..3a1f70094b 100644 --- a/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/testNoReceiveTelemetry/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/NoReceiveTelemetrySingleRecordVertxKafkaTest.java +++ b/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/testNoReceiveTelemetry/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/NoReceiveTelemetrySingleRecordVertxKafkaTest.java @@ -44,11 +44,10 @@ class NoReceiveTelemetrySingleRecordVertxKafkaTest extends AbstractVertxKafkaTes CountDownLatch sent = new CountDownLatch(1); testing.runWithSpan( "producer", - () -> { - kafkaProducer.write( - KafkaProducerRecord.create("testSingleTopic", "10", "testSpan"), - result 
-> sent.countDown()); - }); + () -> + sendRecord( + KafkaProducerRecord.create("testSingleTopic", "10", "testSpan"), + result -> sent.countDown())); assertTrue(sent.await(30, TimeUnit.SECONDS)); testing.waitAndAssertTraces( @@ -88,11 +87,10 @@ class NoReceiveTelemetrySingleRecordVertxKafkaTest extends AbstractVertxKafkaTes CountDownLatch sent = new CountDownLatch(1); testing.runWithSpan( "producer", - () -> { - kafkaProducer.write( - KafkaProducerRecord.create("testSingleTopic", "10", "error"), - result -> sent.countDown()); - }); + () -> + sendRecord( + KafkaProducerRecord.create("testSingleTopic", "10", "error"), + result -> sent.countDown())); assertTrue(sent.await(30, TimeUnit.SECONDS)); testing.waitAndAssertTraces( diff --git a/instrumentation/vertx/vertx-kafka-client-3.6/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/AbstractVertxKafkaTest.java b/instrumentation/vertx/vertx-kafka-client-3.6/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/AbstractVertxKafkaTest.java index d96a0921a6..978dc6d9eb 100644 --- a/instrumentation/vertx/vertx-kafka-client-3.6/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/AbstractVertxKafkaTest.java +++ b/instrumentation/vertx/vertx-kafka-client-3.6/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/AbstractVertxKafkaTest.java @@ -5,13 +5,24 @@ package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_6; +import static org.junit.jupiter.api.Assertions.assertTrue; + import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.vertx.core.AsyncResult; +import io.vertx.core.Handler; import io.vertx.core.Vertx; import io.vertx.kafka.client.consumer.KafkaConsumer; import io.vertx.kafka.client.producer.KafkaProducer; +import io.vertx.kafka.client.producer.KafkaProducerRecord; +import io.vertx.kafka.client.producer.RecordMetadata; +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.MethodType; import java.time.Duration; import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.junit.jupiter.api.AfterAll; @@ -33,7 +44,7 @@ public abstract class AbstractVertxKafkaTest { static KafkaContainer kafka; static Vertx vertx; - protected static KafkaProducer kafkaProducer; + static KafkaProducer kafkaProducer; protected static KafkaConsumer kafkaConsumer; @BeforeAll @@ -85,4 +96,90 @@ public abstract class AbstractVertxKafkaTest { props.put("value.deserializer", StringDeserializer.class); return props; } + + @SafeVarargs + protected final void sendBatchMessages(KafkaProducerRecord... records) + throws InterruptedException { + // This test assumes that messages are sent and received as a batch. Occasionally it happens + // that the messages are not received as a batch, but one by one. This doesn't match what the + // assertion expects. To reduce flakiness we retry the test when messages weren't received as + // a batch. 
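+    // each attempt below: reset the handler state, pause the consumer, send all records inside a
+    // "producer" span, resume the consumer, wait for the batch handler, then check the batch size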
+ int maxAttempts = 5; + for (int i = 1; i <= maxAttempts; i++) { + BatchRecordsHandler.reset(); + kafkaConsumer.pause(); + + // wait a bit to ensure that the consumer has really paused + Thread.sleep(1000); + + CountDownLatch sent = new CountDownLatch(records.length); + testing.runWithSpan( + "producer", + () -> { + for (KafkaProducerRecord record : records) { + sendRecord(record, result -> sent.countDown()); + } + }); + assertTrue(sent.await(30, TimeUnit.SECONDS)); + + kafkaConsumer.resume(); + BatchRecordsHandler.waitForMessages(); + + if (BatchRecordsHandler.getLastBatchSize() == records.length) { + break; + } else if (i < maxAttempts) { + testing.waitForTraces(2); + Thread.sleep(1_000); // sleep a bit to give time for all the spans to arrive + testing.clearData(); + logger.info("Messages weren't received as batch, retrying"); + } + } + } + + private static final MethodHandle SEND_METHOD; + + static { + MethodHandles.Lookup lookup = MethodHandles.publicLookup(); + MethodHandle sendMethod = null; + + // versions 3.6+ + try { + sendMethod = + lookup.findVirtual( + KafkaProducer.class, + "write", + MethodType.methodType(KafkaProducer.class, KafkaProducerRecord.class, Handler.class)); + } catch (NoSuchMethodException | IllegalAccessException ignored) { + // ignore + } + + // versions 4+ + if (sendMethod == null) { + try { + sendMethod = + lookup.findVirtual( + KafkaProducer.class, + "send", + MethodType.methodType( + KafkaProducer.class, KafkaProducerRecord.class, Handler.class)); + } catch (NoSuchMethodException | IllegalAccessException ignored) { + // ignore + } + } + + if (sendMethod == null) { + throw new AssertionError("Could not find send/write method on KafkaProducer"); + } + SEND_METHOD = sendMethod; + } + + protected static void sendRecord( + KafkaProducerRecord record, Handler> handler) { + + try { + SEND_METHOD.invoke(kafkaProducer, record, handler); + } catch (Throwable e) { + throw new AssertionError("Failed producer send/write invocation", e); + } + } } diff --git a/instrumentation/vertx/vertx-kafka-client-3.6/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/BatchRecordsHandler.java b/instrumentation/vertx/vertx-kafka-client-3.6/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/BatchRecordsHandler.java new file mode 100644 index 0000000000..997498b4af --- /dev/null +++ b/instrumentation/vertx/vertx-kafka-client-3.6/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/BatchRecordsHandler.java @@ -0,0 +1,52 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.vertx.kafka.v3_6; + +import io.opentelemetry.instrumentation.testing.GlobalTraceUtil; +import io.vertx.core.Handler; +import io.vertx.kafka.client.consumer.KafkaConsumerRecord; +import io.vertx.kafka.client.consumer.KafkaConsumerRecords; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.IntStream; + +public final class BatchRecordsHandler implements Handler> { + + public static final BatchRecordsHandler INSTANCE = new BatchRecordsHandler(); + + private static final AtomicInteger lastBatchSize = new AtomicInteger(); + private static volatile CountDownLatch messageReceived = new CountDownLatch(2); + + private BatchRecordsHandler() {} + + @Override + public void handle(KafkaConsumerRecords records) { + 
lastBatchSize.set(records.size()); + IntStream.range(0, records.size()).forEach(it -> messageReceived.countDown()); + + GlobalTraceUtil.runWithSpan("batch consumer", () -> {}); + for (int i = 0; i < records.size(); ++i) { + KafkaConsumerRecord record = records.recordAt(i); + if (record.value().equals("error")) { + throw new IllegalArgumentException("boom"); + } + } + } + + public static void reset() { + messageReceived = new CountDownLatch(2); + lastBatchSize.set(0); + } + + public static void waitForMessages() throws InterruptedException { + messageReceived.await(30, TimeUnit.SECONDS); + } + + public static int getLastBatchSize() { + return lastBatchSize.get(); + } +} diff --git a/testing-common/src/main/java/io/opentelemetry/instrumentation/testing/InstrumentationTestRunner.java b/testing-common/src/main/java/io/opentelemetry/instrumentation/testing/InstrumentationTestRunner.java index 31c90b64b8..064e57d834 100644 --- a/testing-common/src/main/java/io/opentelemetry/instrumentation/testing/InstrumentationTestRunner.java +++ b/testing-common/src/main/java/io/opentelemetry/instrumentation/testing/InstrumentationTestRunner.java @@ -94,25 +94,25 @@ public abstract class InstrumentationTestRunner { assertions.forEach(assertionsList::add); try { - await() - .untilAsserted( - () -> { - List> traces = waitForTraces(assertionsList.size()); - if (traceComparator != null) { - traces.sort(traceComparator); - } - TracesAssert.assertThat(traces).hasTracesSatisfyingExactly(assertionsList); - }); + await().untilAsserted(() -> doAssertTraces(traceComparator, assertionsList)); } catch (ConditionTimeoutException e) { // Don't throw this failure since the stack is the awaitility thread, causing confusion. // Instead, just assert one more time on the test thread, which will fail with a better stack // trace. // TODO(anuraaga): There is probably a better way to do this. - List> traces = waitForTraces(assertionsList.size()); - TracesAssert.assertThat(traces).hasTracesSatisfyingExactly(assertionsList); + doAssertTraces(traceComparator, assertionsList); } } + private > void doAssertTraces( + @Nullable Comparator> traceComparator, List assertionsList) { + List> traces = waitForTraces(assertionsList.size()); + if (traceComparator != null) { + traces.sort(traceComparator); + } + TracesAssert.assertThat(traces).hasTracesSatisfyingExactly(assertionsList); + } + /** * Runs the provided {@code callback} inside the scope of an INTERNAL span with name {@code * spanName}.
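For reference, the batch handler that KafkaReadStreamImplInstrumentation wraps is the one applications register through the Vert.x Kafka client API. Below is a minimal, hypothetical consumer setup (the broker address, group id, and topic are placeholders, not values from this change); under the javaagent, the handler passed to batchHandler(...) is wrapped by InstrumentedBatchRecordsHandler, producing the "<topic> process" CONSUMER spans with links to the producer spans that the tests above assert on.

import io.vertx.core.Vertx;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import java.util.HashMap;
import java.util.Map;

public class VertxKafkaBatchConsumerSketch {

  public static void main(String[] args) {
    Vertx vertx = Vertx.vertx();

    Map<String, String> config = new HashMap<>();
    config.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
    config.put("group.id", "example-group"); // placeholder consumer group
    config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    config.put("auto.offset.reset", "earliest");

    KafkaConsumer<String, String> consumer = KafkaConsumer.create(vertx, config);

    // the batch handler runs in addition to the single-record handler; the javaagent wraps it
    // with InstrumentedBatchRecordsHandler so one CONSUMER "process" span covers the whole batch
    consumer.batchHandler(records -> {
      for (int i = 0; i < records.size(); i++) {
        System.out.println("batch record: " + records.recordAt(i).value());
      }
    });

    // the Vert.x consumer only starts polling once the regular single-record handler is set
    consumer.handler(record -> System.out.println("record: " + record.value()));

    consumer.subscribe("example-topic"); // placeholder topic
  }
}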