Fix instrumentation for reactor kafka 1.3.21 (#9445)

This commit is contained in:
Lauri Tulmin 2023-09-12 16:23:48 +03:00 committed by GitHub
parent 7e16e40ca2
commit f0533aed1c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 88 additions and 8 deletions

View File

@ -22,6 +22,8 @@ dependencies {
testImplementation("com.fasterxml.jackson.core:jackson-databind:2.14.2")
testImplementation("org.testcontainers:elasticsearch")
latestDepTestLibrary("co.elastic.clients:elasticsearch-java:8.0.+")
}
tasks {

View File

@ -22,9 +22,9 @@ dependencies {
implementation(project(":instrumentation:kafka:kafka-clients:kafka-clients-common:library"))
implementation(project(":instrumentation:reactor:reactor-3.1:library"))
// using 1.3.0 to be able to implement several new KafkaReceiver methods added in 1.3.3
// using 1.3 to be able to implement several new KafkaReceiver methods added in 1.3.3 and 1.3.21
// @NoMuzzle is used to ensure that this does not break muzzle checks
compileOnly("io.projectreactor.kafka:reactor-kafka:1.3.3")
compileOnly("io.projectreactor.kafka:reactor-kafka:1.3.21")
testInstrumentation(project(":instrumentation:kafka:kafka-clients:kafka-clients-0.11:javaagent"))
testInstrumentation(project(":instrumentation:reactor:reactor-3.1:javaagent"))
@ -60,6 +60,27 @@ testing {
}
}
}
val testV1_3_21 by registering(JvmTestSuite::class) {
dependencies {
implementation(project(":instrumentation:reactor:reactor-kafka-1.0:testing"))
if (testLatestDeps) {
implementation("io.projectreactor.kafka:reactor-kafka:+")
implementation("io.projectreactor:reactor-core:3.4.+")
} else {
implementation("io.projectreactor.kafka:reactor-kafka:1.3.21")
}
}
targets {
all {
testTask.configure {
systemProperty("hasConsumerGroupAndId", true)
}
}
}
}
}
}

View File

@ -25,7 +25,7 @@ public final class InstrumentedKafkaReceiver<K, V> implements KafkaReceiver<K, V
// added in 1.3.3
@Override
public Flux<ReceiverRecord<K, V>> receive(Integer prefetch) {
return wrap(KafkaReceiver133Access.receive(actual, prefetch));
return wrap(KafkaReceiver13Access.receive(actual, prefetch));
}
@Override
@ -36,7 +36,7 @@ public final class InstrumentedKafkaReceiver<K, V> implements KafkaReceiver<K, V
// added in 1.3.3
@Override
public Flux<Flux<ConsumerRecord<K, V>>> receiveAutoAck(Integer prefetch) {
return KafkaReceiver133Access.receiveAutoAck(actual, prefetch)
return KafkaReceiver13Access.receiveAutoAck(actual, prefetch)
.map(InstrumentedKafkaReceiver::wrap);
}
@ -48,7 +48,7 @@ public final class InstrumentedKafkaReceiver<K, V> implements KafkaReceiver<K, V
// added in 1.3.3
@Override
public Flux<ConsumerRecord<K, V>> receiveAtmostOnce(Integer prefetch) {
return wrap(KafkaReceiver133Access.receiveAtmostOnce(actual, prefetch));
return wrap(KafkaReceiver13Access.receiveAtmostOnce(actual, prefetch));
}
@Override
@ -66,7 +66,7 @@ public final class InstrumentedKafkaReceiver<K, V> implements KafkaReceiver<K, V
@Override
public Flux<Flux<ConsumerRecord<K, V>>> receiveExactlyOnce(
TransactionManager transactionManager, Integer prefetch) {
return KafkaReceiver133Access.receiveExactlyOnce(actual, transactionManager, prefetch)
return KafkaReceiver13Access.receiveExactlyOnce(actual, transactionManager, prefetch)
.map(InstrumentedKafkaReceiver::wrap);
}
@ -75,6 +75,19 @@ public final class InstrumentedKafkaReceiver<K, V> implements KafkaReceiver<K, V
return actual.doOnConsumer(function);
}
// added in 1.3.21
@Override
public Flux<Flux<ReceiverRecord<K, V>>> receiveBatch(Integer prefetch) {
return KafkaReceiver13Access.receiveBatch(actual, prefetch)
.map(InstrumentedKafkaReceiver::wrap);
}
// added in 1.3.21
@Override
public Flux<Flux<ReceiverRecord<K, V>>> receiveBatch() {
return KafkaReceiver13Access.receiveBatch(actual).map(InstrumentedKafkaReceiver::wrap);
}
private static <K, V, R extends ConsumerRecord<K, V>> Flux<R> wrap(Flux<R> flux) {
return flux instanceof InstrumentedKafkaFlux ? flux : new InstrumentedKafkaFlux<>(flux);
}

View File

@ -12,7 +12,7 @@ import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverRecord;
import reactor.kafka.sender.TransactionManager;
final class KafkaReceiver133Access {
final class KafkaReceiver13Access {
@NoMuzzle
static <K, V> Flux<ReceiverRecord<K, V>> receive(KafkaReceiver<K, V> receiver, Integer prefetch) {
@ -37,5 +37,16 @@ final class KafkaReceiver133Access {
return receiver.receiveExactlyOnce(transactionManager, prefetch);
}
private KafkaReceiver133Access() {}
@NoMuzzle
static <K, V> Flux<Flux<ReceiverRecord<K, V>>> receiveBatch(
KafkaReceiver<K, V> receiver, Integer prefetch) {
return receiver.receiveBatch(prefetch);
}
@NoMuzzle
static <K, V> Flux<Flux<ReceiverRecord<K, V>>> receiveBatch(KafkaReceiver<K, V> receiver) {
return receiver.receiveBatch();
}
private KafkaReceiver13Access() {}
}

View File

@ -0,0 +1,33 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.reactor.kafka.v1_0;
import org.junit.jupiter.api.Test;
class ReactorKafka1321InstrumentationTest extends AbstractReactorKafkaTest {
@Test
void receiveBatch() {
testSingleRecordProcess(
recordConsumer ->
receiver
.receiveBatch()
.concatMap(r -> r)
.doOnNext(r -> r.receiverOffset().acknowledge())
.subscribe(recordConsumer));
}
@Test
void receiveBatchWithSize() {
testSingleRecordProcess(
recordConsumer ->
receiver
.receiveBatch(1)
.concatMap(r -> r)
.doOnNext(r -> r.receiverOffset().acknowledge())
.subscribe(recordConsumer));
}
}