From f0533aed1c6fc7d571a175fb75a3d493fc8f0112 Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Tue, 12 Sep 2023 16:23:48 +0300 Subject: [PATCH] Fix instrumentation for reactor kafka 1.3.21 (#9445) --- .../javaagent/build.gradle.kts | 2 ++ .../javaagent/build.gradle.kts | 25 ++++++++++++-- .../kafka/v1_0/InstrumentedKafkaReceiver.java | 21 +++++++++--- ...Access.java => KafkaReceiver13Access.java} | 15 +++++++-- .../ReactorKafka1321InstrumentationTest.java | 33 +++++++++++++++++++ 5 files changed, 88 insertions(+), 8 deletions(-) rename instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/{KafkaReceiver133Access.java => KafkaReceiver13Access.java} (75%) create mode 100644 instrumentation/reactor/reactor-kafka-1.0/javaagent/src/testV1_3_21/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ReactorKafka1321InstrumentationTest.java diff --git a/instrumentation/elasticsearch/elasticsearch-api-client-7.16/javaagent/build.gradle.kts b/instrumentation/elasticsearch/elasticsearch-api-client-7.16/javaagent/build.gradle.kts index 01e2416df2..1a1cca49c0 100644 --- a/instrumentation/elasticsearch/elasticsearch-api-client-7.16/javaagent/build.gradle.kts +++ b/instrumentation/elasticsearch/elasticsearch-api-client-7.16/javaagent/build.gradle.kts @@ -22,6 +22,8 @@ dependencies { testImplementation("com.fasterxml.jackson.core:jackson-databind:2.14.2") testImplementation("org.testcontainers:elasticsearch") + + latestDepTestLibrary("co.elastic.clients:elasticsearch-java:8.0.+") } tasks { diff --git a/instrumentation/reactor/reactor-kafka-1.0/javaagent/build.gradle.kts b/instrumentation/reactor/reactor-kafka-1.0/javaagent/build.gradle.kts index 68a9b40d28..43af8a9593 100644 --- a/instrumentation/reactor/reactor-kafka-1.0/javaagent/build.gradle.kts +++ b/instrumentation/reactor/reactor-kafka-1.0/javaagent/build.gradle.kts @@ -22,9 +22,9 @@ dependencies { implementation(project(":instrumentation:kafka:kafka-clients:kafka-clients-common:library")) implementation(project(":instrumentation:reactor:reactor-3.1:library")) - // using 1.3.0 to be able to implement several new KafkaReceiver methods added in 1.3.3 + // using 1.3 to be able to implement several new KafkaReceiver methods added in 1.3.3 and 1.3.21 // @NoMuzzle is used to ensure that this does not break muzzle checks - compileOnly("io.projectreactor.kafka:reactor-kafka:1.3.3") + compileOnly("io.projectreactor.kafka:reactor-kafka:1.3.21") testInstrumentation(project(":instrumentation:kafka:kafka-clients:kafka-clients-0.11:javaagent")) testInstrumentation(project(":instrumentation:reactor:reactor-3.1:javaagent")) @@ -60,6 +60,27 @@ testing { } } } + + val testV1_3_21 by registering(JvmTestSuite::class) { + dependencies { + implementation(project(":instrumentation:reactor:reactor-kafka-1.0:testing")) + + if (testLatestDeps) { + implementation("io.projectreactor.kafka:reactor-kafka:+") + implementation("io.projectreactor:reactor-core:3.4.+") + } else { + implementation("io.projectreactor.kafka:reactor-kafka:1.3.21") + } + } + + targets { + all { + testTask.configure { + systemProperty("hasConsumerGroupAndId", true) + } + } + } + } } } diff --git a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/InstrumentedKafkaReceiver.java b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/InstrumentedKafkaReceiver.java index 62b827f2fd..a834273217 100644 --- a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/InstrumentedKafkaReceiver.java +++ b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/InstrumentedKafkaReceiver.java @@ -25,7 +25,7 @@ public final class InstrumentedKafkaReceiver implements KafkaReceiver> receive(Integer prefetch) { - return wrap(KafkaReceiver133Access.receive(actual, prefetch)); + return wrap(KafkaReceiver13Access.receive(actual, prefetch)); } @Override @@ -36,7 +36,7 @@ public final class InstrumentedKafkaReceiver implements KafkaReceiver>> receiveAutoAck(Integer prefetch) { - return KafkaReceiver133Access.receiveAutoAck(actual, prefetch) + return KafkaReceiver13Access.receiveAutoAck(actual, prefetch) .map(InstrumentedKafkaReceiver::wrap); } @@ -48,7 +48,7 @@ public final class InstrumentedKafkaReceiver implements KafkaReceiver> receiveAtmostOnce(Integer prefetch) { - return wrap(KafkaReceiver133Access.receiveAtmostOnce(actual, prefetch)); + return wrap(KafkaReceiver13Access.receiveAtmostOnce(actual, prefetch)); } @Override @@ -66,7 +66,7 @@ public final class InstrumentedKafkaReceiver implements KafkaReceiver>> receiveExactlyOnce( TransactionManager transactionManager, Integer prefetch) { - return KafkaReceiver133Access.receiveExactlyOnce(actual, transactionManager, prefetch) + return KafkaReceiver13Access.receiveExactlyOnce(actual, transactionManager, prefetch) .map(InstrumentedKafkaReceiver::wrap); } @@ -75,6 +75,19 @@ public final class InstrumentedKafkaReceiver implements KafkaReceiver>> receiveBatch(Integer prefetch) { + return KafkaReceiver13Access.receiveBatch(actual, prefetch) + .map(InstrumentedKafkaReceiver::wrap); + } + + // added in 1.3.21 + @Override + public Flux>> receiveBatch() { + return KafkaReceiver13Access.receiveBatch(actual).map(InstrumentedKafkaReceiver::wrap); + } + private static > Flux wrap(Flux flux) { return flux instanceof InstrumentedKafkaFlux ? flux : new InstrumentedKafkaFlux<>(flux); } diff --git a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/KafkaReceiver133Access.java b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/KafkaReceiver13Access.java similarity index 75% rename from instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/KafkaReceiver133Access.java rename to instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/KafkaReceiver13Access.java index 2e21f39d88..de50fa9b35 100644 --- a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/KafkaReceiver133Access.java +++ b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/KafkaReceiver13Access.java @@ -12,7 +12,7 @@ import reactor.kafka.receiver.KafkaReceiver; import reactor.kafka.receiver.ReceiverRecord; import reactor.kafka.sender.TransactionManager; -final class KafkaReceiver133Access { +final class KafkaReceiver13Access { @NoMuzzle static Flux> receive(KafkaReceiver receiver, Integer prefetch) { @@ -37,5 +37,16 @@ final class KafkaReceiver133Access { return receiver.receiveExactlyOnce(transactionManager, prefetch); } - private KafkaReceiver133Access() {} + @NoMuzzle + static Flux>> receiveBatch( + KafkaReceiver receiver, Integer prefetch) { + return receiver.receiveBatch(prefetch); + } + + @NoMuzzle + static Flux>> receiveBatch(KafkaReceiver receiver) { + return receiver.receiveBatch(); + } + + private KafkaReceiver13Access() {} } diff --git a/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/testV1_3_21/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ReactorKafka1321InstrumentationTest.java b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/testV1_3_21/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ReactorKafka1321InstrumentationTest.java new file mode 100644 index 0000000000..c146c009cb --- /dev/null +++ b/instrumentation/reactor/reactor-kafka-1.0/javaagent/src/testV1_3_21/java/io/opentelemetry/javaagent/instrumentation/reactor/kafka/v1_0/ReactorKafka1321InstrumentationTest.java @@ -0,0 +1,33 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.reactor.kafka.v1_0; + +import org.junit.jupiter.api.Test; + +class ReactorKafka1321InstrumentationTest extends AbstractReactorKafkaTest { + + @Test + void receiveBatch() { + testSingleRecordProcess( + recordConsumer -> + receiver + .receiveBatch() + .concatMap(r -> r) + .doOnNext(r -> r.receiverOffset().acknowledge()) + .subscribe(recordConsumer)); + } + + @Test + void receiveBatchWithSize() { + testSingleRecordProcess( + recordConsumer -> + receiver + .receiveBatch(1) + .concatMap(r -> r) + .doOnNext(r -> r.receiverOffset().acknowledge()) + .subscribe(recordConsumer)); + } +}