Instrument reactor-kafka (#8439)
This commit is contained in:
parent
f00393260c
commit
cdb08c9dca
|
@ -51,5 +51,10 @@ public final class KafkaConsumerContextUtil {
|
|||
recordsConsumerField.set(records, consumer);
|
||||
}
|
||||
|
||||
public static void copy(ConsumerRecord<?, ?> from, ConsumerRecord<?, ?> to) {
|
||||
recordContextField.set(to, recordContextField.get(from));
|
||||
recordConsumerField.set(to, recordConsumerField.get(from));
|
||||
}
|
||||
|
||||
private KafkaConsumerContextUtil() {}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,48 @@
|
|||
plugins {
|
||||
id("otel.javaagent-instrumentation")
|
||||
}
|
||||
|
||||
muzzle {
|
||||
pass {
|
||||
group.set("io.projectreactor.kafka")
|
||||
module.set("reactor-kafka")
|
||||
// TODO: add support for 1.3
|
||||
versions.set("[1.0.0,1.3.0)")
|
||||
}
|
||||
}
|
||||
|
||||
dependencies {
|
||||
compileOnly("com.google.auto.value:auto-value-annotations")
|
||||
annotationProcessor("com.google.auto.value:auto-value")
|
||||
|
||||
bootstrap(project(":instrumentation:kafka:kafka-clients:kafka-clients-0.11:bootstrap"))
|
||||
|
||||
implementation(project(":instrumentation:kafka:kafka-clients:kafka-clients-common:library"))
|
||||
implementation(project(":instrumentation:reactor:reactor-3.1:library"))
|
||||
|
||||
library("io.projectreactor.kafka:reactor-kafka:1.0.0.RELEASE")
|
||||
|
||||
testInstrumentation(project(":instrumentation:kafka:kafka-clients:kafka-clients-0.11:javaagent"))
|
||||
testInstrumentation(project(":instrumentation:reactor:reactor-3.1:javaagent"))
|
||||
|
||||
testImplementation("org.testcontainers:kafka")
|
||||
|
||||
testLibrary("io.projectreactor:reactor-test:3.1.0.RELEASE")
|
||||
|
||||
latestDepTestLibrary("io.projectreactor:reactor-core:3.4.+")
|
||||
// TODO: add support for 1.3
|
||||
latestDepTestLibrary("io.projectreactor.kafka:reactor-kafka:1.2.+")
|
||||
}
|
||||
|
||||
tasks {
|
||||
test {
|
||||
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)
|
||||
|
||||
jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=true")
|
||||
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true")
|
||||
}
|
||||
|
||||
check {
|
||||
dependsOn(testing.suites)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,43 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.reactor.kafka.v1_0;
|
||||
|
||||
import static net.bytebuddy.matcher.ElementMatchers.named;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.returns;
|
||||
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
|
||||
import net.bytebuddy.asm.Advice;
|
||||
import net.bytebuddy.description.type.TypeDescription;
|
||||
import net.bytebuddy.matcher.ElementMatcher;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
// handles versions 1.0.0 - 1.2.+
|
||||
public class DefaultKafkaReceiverInstrumentation implements TypeInstrumentation {
|
||||
|
||||
@Override
|
||||
public ElementMatcher<TypeDescription> typeMatcher() {
|
||||
return named("reactor.kafka.receiver.internals.DefaultKafkaReceiver");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void transform(TypeTransformer transformer) {
|
||||
transformer.applyAdviceToMethod(
|
||||
named("createConsumerFlux").and(returns(named("reactor.core.publisher.Flux"))),
|
||||
this.getClass().getName() + "$CreateConsumerFluxAdvice");
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
public static class CreateConsumerFluxAdvice {
|
||||
|
||||
@Advice.OnMethodExit(suppress = Throwable.class)
|
||||
public static void onExit(@Advice.Return(readOnly = false) Flux<?> flux) {
|
||||
if (!(flux instanceof TracingDisablingKafkaFlux)) {
|
||||
flux = new TracingDisablingKafkaFlux<>(flux);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,125 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.reactor.kafka.v1_0;
|
||||
|
||||
import static io.opentelemetry.javaagent.instrumentation.reactor.kafka.v1_0.ReactorKafkaSingletons.processInstrumenter;
|
||||
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.context.Scope;
|
||||
import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContext;
|
||||
import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContextUtil;
|
||||
import io.opentelemetry.instrumentation.kafka.internal.KafkaProcessRequest;
|
||||
import io.opentelemetry.instrumentation.reactor.v3_1.ContextPropagationOperator;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.reactivestreams.Subscription;
|
||||
import reactor.core.CoreSubscriber;
|
||||
import reactor.core.Scannable;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.FluxOperator;
|
||||
import reactor.core.publisher.Operators;
|
||||
|
||||
final class InstrumentedKafkaFlux<R extends ConsumerRecord<?, ?>> extends FluxOperator<R, R> {
|
||||
|
||||
InstrumentedKafkaFlux(Flux<R> source) {
|
||||
super(source);
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public void subscribe(CoreSubscriber<? super R> actual) {
|
||||
source.subscribe(new InstrumentedSubscriber((CoreSubscriber<ConsumerRecord<?, ?>>) actual));
|
||||
}
|
||||
|
||||
static final class InstrumentedSubscriber
|
||||
implements CoreSubscriber<ConsumerRecord<?, ?>>, Subscription, Scannable {
|
||||
|
||||
private final CoreSubscriber<ConsumerRecord<?, ?>> actual;
|
||||
private final Context currentContext;
|
||||
private Subscription subscription;
|
||||
|
||||
InstrumentedSubscriber(CoreSubscriber<ConsumerRecord<?, ?>> actual) {
|
||||
this.actual = actual;
|
||||
currentContext =
|
||||
ContextPropagationOperator.getOpenTelemetryContext(
|
||||
actual.currentContext(), Context.current());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onSubscribe(Subscription s) {
|
||||
if (Operators.validate(this.subscription, s)) {
|
||||
this.subscription = s;
|
||||
|
||||
actual.onSubscribe(this);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public reactor.util.context.Context currentContext() {
|
||||
return actual.currentContext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNext(ConsumerRecord<?, ?> record) {
|
||||
KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.get(record);
|
||||
Context receiveContext = consumerContext.getContext();
|
||||
// use the receive CONSUMER span as parent if it's available
|
||||
Context parentContext = receiveContext != null ? receiveContext : currentContext;
|
||||
|
||||
KafkaProcessRequest request = KafkaProcessRequest.create(consumerContext, record);
|
||||
if (!processInstrumenter().shouldStart(parentContext, request)) {
|
||||
actual.onNext(record);
|
||||
return;
|
||||
}
|
||||
|
||||
Context context = processInstrumenter().start(parentContext, request);
|
||||
Throwable error = null;
|
||||
try (Scope ignored = context.makeCurrent()) {
|
||||
actual.onNext(record);
|
||||
} catch (Throwable t) {
|
||||
error = t;
|
||||
throw t;
|
||||
} finally {
|
||||
processInstrumenter().end(context, request, null, error);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable throwable) {
|
||||
try (Scope ignored = currentContext.makeCurrent()) {
|
||||
actual.onError(throwable);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onComplete() {
|
||||
try (Scope ignored = currentContext.makeCurrent()) {
|
||||
actual.onComplete();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void request(long l) {
|
||||
subscription.request(l);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel() {
|
||||
subscription.cancel();
|
||||
}
|
||||
|
||||
@SuppressWarnings("rawtypes") // that's how the method is defined
|
||||
@Override
|
||||
public Object scanUnsafe(Attr key) {
|
||||
if (key == Attr.ACTUAL) {
|
||||
return actual;
|
||||
}
|
||||
if (key == Attr.PARENT) {
|
||||
return subscription;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,50 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.reactor.kafka.v1_0;
|
||||
|
||||
import java.util.function.Function;
|
||||
import org.apache.kafka.clients.consumer.Consumer;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.kafka.receiver.KafkaReceiver;
|
||||
import reactor.kafka.receiver.ReceiverRecord;
|
||||
import reactor.kafka.sender.TransactionManager;
|
||||
|
||||
public final class InstrumentedKafkaReceiver<K, V> implements KafkaReceiver<K, V> {
|
||||
|
||||
private final KafkaReceiver<K, V> actual;
|
||||
|
||||
public InstrumentedKafkaReceiver(KafkaReceiver<K, V> actual) {
|
||||
this.actual = actual;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<ReceiverRecord<K, V>> receive() {
|
||||
return new InstrumentedKafkaFlux<>(actual.receive());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<Flux<ConsumerRecord<K, V>>> receiveAutoAck() {
|
||||
return actual.receiveAutoAck().map(InstrumentedKafkaFlux::new);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<ConsumerRecord<K, V>> receiveAtmostOnce() {
|
||||
return new InstrumentedKafkaFlux<>(actual.receiveAtmostOnce());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<Flux<ConsumerRecord<K, V>>> receiveExactlyOnce(
|
||||
TransactionManager transactionManager) {
|
||||
return actual.receiveAutoAck().map(InstrumentedKafkaFlux::new);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Mono<T> doOnConsumer(Function<Consumer<K, V>, ? extends T> function) {
|
||||
return actual.doOnConsumer(function);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,43 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.reactor.kafka.v1_0;
|
||||
|
||||
import static net.bytebuddy.matcher.ElementMatchers.isStatic;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.named;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.returns;
|
||||
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
|
||||
import net.bytebuddy.asm.Advice;
|
||||
import net.bytebuddy.description.type.TypeDescription;
|
||||
import net.bytebuddy.matcher.ElementMatcher;
|
||||
import reactor.kafka.receiver.KafkaReceiver;
|
||||
|
||||
public class KafkaReceiverInstrumentation implements TypeInstrumentation {
|
||||
|
||||
@Override
|
||||
public ElementMatcher<TypeDescription> typeMatcher() {
|
||||
return named("reactor.kafka.receiver.KafkaReceiver");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void transform(TypeTransformer transformer) {
|
||||
transformer.applyAdviceToMethod(
|
||||
named("create").and(isStatic()).and(returns(named("reactor.kafka.receiver.KafkaReceiver"))),
|
||||
this.getClass().getName() + "$CreateAdvice");
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
public static class CreateAdvice {
|
||||
|
||||
@Advice.OnMethodExit(suppress = Throwable.class)
|
||||
public static void onExit(@Advice.Return(readOnly = false) KafkaReceiver<?, ?> receiver) {
|
||||
if (!(receiver instanceof InstrumentedKafkaReceiver)) {
|
||||
receiver = new InstrumentedKafkaReceiver<>(receiver);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,29 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.reactor.kafka.v1_0;
|
||||
|
||||
import static java.util.Arrays.asList;
|
||||
|
||||
import com.google.auto.service.AutoService;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
|
||||
import java.util.List;
|
||||
|
||||
@AutoService(InstrumentationModule.class)
|
||||
public class ReactorKafkaInstrumentationModule extends InstrumentationModule {
|
||||
|
||||
public ReactorKafkaInstrumentationModule() {
|
||||
super("reactor-kafka", "reactor-kafka-1.0");
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<TypeInstrumentation> typeInstrumentations() {
|
||||
return asList(
|
||||
new KafkaReceiverInstrumentation(),
|
||||
new ReceiverRecordInstrumentation(),
|
||||
new DefaultKafkaReceiverInstrumentation());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,34 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.reactor.kafka.v1_0;
|
||||
|
||||
import io.opentelemetry.api.GlobalOpenTelemetry;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
|
||||
import io.opentelemetry.instrumentation.kafka.internal.KafkaInstrumenterFactory;
|
||||
import io.opentelemetry.instrumentation.kafka.internal.KafkaProcessRequest;
|
||||
import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig;
|
||||
import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig;
|
||||
|
||||
final class ReactorKafkaSingletons {
|
||||
|
||||
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.reactor-kafka-1.0";
|
||||
|
||||
private static final Instrumenter<KafkaProcessRequest, Void> PROCESS_INSTRUMENTER =
|
||||
new KafkaInstrumenterFactory(GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME)
|
||||
.setCapturedHeaders(ExperimentalConfig.get().getMessagingHeaders())
|
||||
.setCaptureExperimentalSpanAttributes(
|
||||
InstrumentationConfig.get()
|
||||
.getBoolean("otel.instrumentation.kafka.experimental-span-attributes", false))
|
||||
.setMessagingReceiveInstrumentationEnabled(
|
||||
ExperimentalConfig.get().messagingReceiveInstrumentationEnabled())
|
||||
.createConsumerProcessInstrumenter();
|
||||
|
||||
public static Instrumenter<KafkaProcessRequest, Void> processInstrumenter() {
|
||||
return PROCESS_INSTRUMENTER;
|
||||
}
|
||||
|
||||
private ReactorKafkaSingletons() {}
|
||||
}
|
|
@ -0,0 +1,44 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.reactor.kafka.v1_0;
|
||||
|
||||
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.named;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
|
||||
|
||||
import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContextUtil;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
|
||||
import net.bytebuddy.asm.Advice;
|
||||
import net.bytebuddy.description.type.TypeDescription;
|
||||
import net.bytebuddy.matcher.ElementMatcher;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import reactor.kafka.receiver.ReceiverRecord;
|
||||
|
||||
public class ReceiverRecordInstrumentation implements TypeInstrumentation {
|
||||
@Override
|
||||
public ElementMatcher<TypeDescription> typeMatcher() {
|
||||
return named("reactor.kafka.receiver.ReceiverRecord");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void transform(TypeTransformer transformer) {
|
||||
transformer.applyAdviceToMethod(
|
||||
isConstructor()
|
||||
.and(takesArgument(0, named("org.apache.kafka.clients.consumer.ConsumerRecord"))),
|
||||
this.getClass().getName() + "$ConstructorAdvice");
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
public static class ConstructorAdvice {
|
||||
|
||||
@Advice.OnMethodExit(suppress = Throwable.class)
|
||||
public static void onExit(
|
||||
@Advice.This ReceiverRecord<?, ?> copy, @Advice.Argument(0) ConsumerRecord<?, ?> original) {
|
||||
KafkaConsumerContextUtil.copy(original, copy);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,93 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.reactor.kafka.v1_0;
|
||||
|
||||
import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing;
|
||||
import org.reactivestreams.Subscription;
|
||||
import reactor.core.CoreSubscriber;
|
||||
import reactor.core.Scannable;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.FluxOperator;
|
||||
import reactor.core.publisher.Operators;
|
||||
|
||||
public final class TracingDisablingKafkaFlux<T> extends FluxOperator<T, T> {
|
||||
|
||||
public TracingDisablingKafkaFlux(Flux<? extends T> source) {
|
||||
super(source);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void subscribe(CoreSubscriber<? super T> actual) {
|
||||
source.subscribe(new TracingDisablingSubscriber<>(actual));
|
||||
}
|
||||
|
||||
static final class TracingDisablingSubscriber<T>
|
||||
implements CoreSubscriber<T>, Subscription, Scannable {
|
||||
|
||||
private final CoreSubscriber<T> actual;
|
||||
private Subscription subscription;
|
||||
|
||||
TracingDisablingSubscriber(CoreSubscriber<T> actual) {
|
||||
this.actual = actual;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onSubscribe(Subscription s) {
|
||||
if (Operators.validate(this.subscription, s)) {
|
||||
this.subscription = s;
|
||||
|
||||
actual.onSubscribe(this);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public reactor.util.context.Context currentContext() {
|
||||
return actual.currentContext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNext(T record) {
|
||||
boolean previous = KafkaClientsConsumerProcessTracing.setEnabled(false);
|
||||
try {
|
||||
actual.onNext(record);
|
||||
} finally {
|
||||
KafkaClientsConsumerProcessTracing.setEnabled(previous);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable throwable) {
|
||||
actual.onError(throwable);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onComplete() {
|
||||
actual.onComplete();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void request(long l) {
|
||||
subscription.request(l);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel() {
|
||||
subscription.cancel();
|
||||
}
|
||||
|
||||
@SuppressWarnings("rawtypes") // that's how the method is defined
|
||||
@Override
|
||||
public Object scanUnsafe(Attr key) {
|
||||
if (key == Attr.ACTUAL) {
|
||||
return actual;
|
||||
}
|
||||
if (key == Attr.PARENT) {
|
||||
return subscription;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,171 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.reactor.kafka.v1_0;
|
||||
|
||||
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
|
||||
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
|
||||
import static java.util.Collections.singleton;
|
||||
|
||||
import io.opentelemetry.api.common.AttributeKey;
|
||||
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
|
||||
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
|
||||
import io.opentelemetry.sdk.testing.assertj.AttributeAssertion;
|
||||
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
import org.assertj.core.api.AbstractLongAssert;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.testcontainers.containers.KafkaContainer;
|
||||
import org.testcontainers.containers.output.Slf4jLogConsumer;
|
||||
import org.testcontainers.containers.wait.strategy.Wait;
|
||||
import org.testcontainers.utility.DockerImageName;
|
||||
import reactor.kafka.receiver.KafkaReceiver;
|
||||
import reactor.kafka.receiver.ReceiverOptions;
|
||||
import reactor.kafka.sender.KafkaSender;
|
||||
import reactor.kafka.sender.SenderOptions;
|
||||
|
||||
abstract class AbstractReactorKafkaTest {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(AbstractReactorKafkaTest.class);
|
||||
|
||||
@RegisterExtension
|
||||
protected static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
|
||||
|
||||
static KafkaContainer kafka;
|
||||
static KafkaSender<String, String> sender;
|
||||
static KafkaReceiver<String, String> receiver;
|
||||
|
||||
@BeforeAll
|
||||
static void setUpAll() {
|
||||
kafka =
|
||||
new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.10"))
|
||||
.withEnv("KAFKA_HEAP_OPTS", "-Xmx256m")
|
||||
.withLogConsumer(new Slf4jLogConsumer(logger))
|
||||
.waitingFor(Wait.forLogMessage(".*started \\(kafka.server.KafkaServer\\).*", 1))
|
||||
.withStartupTimeout(Duration.ofMinutes(1));
|
||||
kafka.start();
|
||||
|
||||
sender = KafkaSender.create(SenderOptions.create(producerProps()));
|
||||
receiver =
|
||||
KafkaReceiver.create(
|
||||
ReceiverOptions.<String, String>create(consumerProps())
|
||||
.subscription(singleton("testTopic")));
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
static void tearDownAll() {
|
||||
if (sender != null) {
|
||||
sender.close();
|
||||
}
|
||||
kafka.stop();
|
||||
}
|
||||
|
||||
private static Properties producerProps() {
|
||||
Properties props = new Properties();
|
||||
props.put("bootstrap.servers", kafka.getBootstrapServers());
|
||||
props.put("retries", 0);
|
||||
props.put("key.serializer", StringSerializer.class);
|
||||
props.put("value.serializer", StringSerializer.class);
|
||||
return props;
|
||||
}
|
||||
|
||||
private static Properties consumerProps() {
|
||||
Properties props = new Properties();
|
||||
props.put("bootstrap.servers", kafka.getBootstrapServers());
|
||||
props.put("group.id", "test");
|
||||
props.put("enable.auto.commit", true);
|
||||
props.put("auto.commit.interval.ms", 10);
|
||||
props.put("session.timeout.ms", 30000);
|
||||
props.put("auto.offset.reset", "earliest");
|
||||
props.put("key.deserializer", StringDeserializer.class);
|
||||
props.put("value.deserializer", StringDeserializer.class);
|
||||
return props;
|
||||
}
|
||||
|
||||
protected static List<AttributeAssertion> sendAttributes(ProducerRecord<String, String> record) {
|
||||
List<AttributeAssertion> assertions =
|
||||
new ArrayList<>(
|
||||
Arrays.asList(
|
||||
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
|
||||
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, record.topic()),
|
||||
equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
|
||||
satisfies(
|
||||
SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID,
|
||||
stringAssert -> stringAssert.startsWith("producer")),
|
||||
satisfies(
|
||||
SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION,
|
||||
AbstractLongAssert::isNotNegative),
|
||||
satisfies(
|
||||
SemanticAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET,
|
||||
AbstractLongAssert::isNotNegative)));
|
||||
String messageKey = record.key();
|
||||
if (messageKey != null) {
|
||||
assertions.add(equalTo(SemanticAttributes.MESSAGING_KAFKA_MESSAGE_KEY, messageKey));
|
||||
}
|
||||
return assertions;
|
||||
}
|
||||
|
||||
protected static List<AttributeAssertion> receiveAttributes(String topic) {
|
||||
return new ArrayList<>(
|
||||
Arrays.asList(
|
||||
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
|
||||
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, topic),
|
||||
equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
|
||||
equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive"),
|
||||
satisfies(
|
||||
SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID,
|
||||
stringAssert -> stringAssert.startsWith("consumer"))));
|
||||
}
|
||||
|
||||
protected static List<AttributeAssertion> processAttributes(
|
||||
ProducerRecord<String, String> record) {
|
||||
List<AttributeAssertion> assertions =
|
||||
new ArrayList<>(
|
||||
Arrays.asList(
|
||||
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
|
||||
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, record.topic()),
|
||||
equalTo(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"),
|
||||
equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
|
||||
satisfies(
|
||||
SemanticAttributes.MESSAGING_KAFKA_CLIENT_ID,
|
||||
stringAssert -> stringAssert.startsWith("consumer")),
|
||||
satisfies(
|
||||
SemanticAttributes.MESSAGING_KAFKA_SOURCE_PARTITION,
|
||||
AbstractLongAssert::isNotNegative),
|
||||
satisfies(
|
||||
SemanticAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET,
|
||||
AbstractLongAssert::isNotNegative)));
|
||||
if (Boolean.getBoolean("otel.instrumentation.kafka.experimental-span-attributes")) {
|
||||
assertions.add(
|
||||
satisfies(
|
||||
AttributeKey.longKey("kafka.record.queue_time_ms"),
|
||||
AbstractLongAssert::isNotNegative));
|
||||
}
|
||||
String messageKey = record.key();
|
||||
if (messageKey != null) {
|
||||
assertions.add(equalTo(SemanticAttributes.MESSAGING_KAFKA_MESSAGE_KEY, messageKey));
|
||||
}
|
||||
String messageValue = record.value();
|
||||
if (messageValue != null) {
|
||||
assertions.add(
|
||||
equalTo(
|
||||
SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES,
|
||||
messageValue.getBytes(StandardCharsets.UTF_8).length));
|
||||
}
|
||||
return assertions;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,67 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.reactor.kafka.v1_0;
|
||||
|
||||
import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanKind;
|
||||
|
||||
import io.opentelemetry.api.trace.SpanKind;
|
||||
import io.opentelemetry.instrumentation.testing.internal.AutoCleanupExtension;
|
||||
import io.opentelemetry.sdk.trace.data.LinkData;
|
||||
import io.opentelemetry.sdk.trace.data.SpanData;
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||
import reactor.core.Disposable;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.kafka.sender.SenderRecord;
|
||||
|
||||
public class ReactorKafkaInstrumentationTest extends AbstractReactorKafkaTest {
|
||||
|
||||
@RegisterExtension static final AutoCleanupExtension cleanup = AutoCleanupExtension.create();
|
||||
|
||||
@Test
|
||||
void shouldCreateSpansForSingleRecordProcess() {
|
||||
Disposable disposable =
|
||||
receiver.receive().subscribe(record -> testing.runWithSpan("consumer", () -> {}));
|
||||
cleanup.deferCleanup(disposable::dispose);
|
||||
|
||||
SenderRecord<String, String, Object> record =
|
||||
SenderRecord.create("testTopic", 0, null, "10", "testSpan", null);
|
||||
Flux<?> producer = sender.send(Flux.just(record));
|
||||
testing.runWithSpan("producer", () -> producer.blockLast(Duration.ofSeconds(30)));
|
||||
|
||||
AtomicReference<SpanData> producerSpan = new AtomicReference<>();
|
||||
|
||||
testing.waitAndAssertSortedTraces(
|
||||
orderByRootSpanKind(SpanKind.INTERNAL, SpanKind.CONSUMER),
|
||||
trace -> {
|
||||
trace.hasSpansSatisfyingExactly(
|
||||
span -> span.hasName("producer"),
|
||||
span ->
|
||||
span.hasName("testTopic send")
|
||||
.hasKind(SpanKind.PRODUCER)
|
||||
.hasParent(trace.getSpan(0))
|
||||
.hasAttributesSatisfyingExactly(sendAttributes(record)));
|
||||
|
||||
producerSpan.set(trace.getSpan(1));
|
||||
},
|
||||
trace ->
|
||||
trace.hasSpansSatisfyingExactly(
|
||||
span ->
|
||||
span.hasName("testTopic receive")
|
||||
.hasKind(SpanKind.CONSUMER)
|
||||
.hasNoParent()
|
||||
.hasAttributesSatisfyingExactly(receiveAttributes("testTopic")),
|
||||
span ->
|
||||
span.hasName("testTopic process")
|
||||
.hasKind(SpanKind.CONSUMER)
|
||||
.hasParent(trace.getSpan(0))
|
||||
.hasLinks(LinkData.create(producerSpan.get().getSpanContext()))
|
||||
.hasAttributesSatisfyingExactly(processAttributes(record)),
|
||||
span -> span.hasName("consumer").hasParent(trace.getSpan(1))));
|
||||
}
|
||||
}
|
|
@ -419,6 +419,7 @@ hideFromDependabot(":instrumentation:ratpack:ratpack-1.7:library")
|
|||
hideFromDependabot(":instrumentation:reactor:reactor-3.1:javaagent")
|
||||
hideFromDependabot(":instrumentation:reactor:reactor-3.1:library")
|
||||
hideFromDependabot(":instrumentation:reactor:reactor-3.1:testing")
|
||||
hideFromDependabot(":instrumentation:reactor:reactor-kafka-1.0:javaagent")
|
||||
hideFromDependabot(":instrumentation:reactor:reactor-netty:reactor-netty-0.9:javaagent")
|
||||
hideFromDependabot(":instrumentation:reactor:reactor-netty:reactor-netty-1.0:javaagent")
|
||||
hideFromDependabot(":instrumentation:reactor:reactor-netty:reactor-netty-1.0:javaagent-unit-tests")
|
||||
|
|
Loading…
Reference in New Issue