Support Spring Kafka 3 (#7271)

Fixes #7265

I took a look at the new Observation API, and I think that it still
makes sense to continue using the interceptors to implement this
instrumentation: they implement the OTel spec (which includes way more
attributes than the default observation convention implemented in
Spring), and cooperate with the Kafka client instrumentation and link
the receive and process spans together. And it's quite a simple change
in one of our interceptors, instead of rewriting everything.

(Draft because Spring Boot 3 hasn't released yet, and it is required to
run the tests. If we're not in a hurry this PR can wait a bit for that)
This commit is contained in:
Mateusz Rzeszutek 2022-11-29 11:07:57 +01:00 committed by GitHub
parent a87d66e496
commit 69938b3f79
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 149 additions and 40 deletions

View File

@ -6,7 +6,7 @@ muzzle {
pass {
group.set("org.springframework.kafka")
module.set("spring-kafka")
versions.set("[2.7.0,3)")
versions.set("[2.7.0,)")
assertInverse.set(true)
}
}
@ -27,21 +27,26 @@ dependencies {
testLibrary("org.springframework.boot:spring-boot-starter-test:2.5.3")
testLibrary("org.springframework.boot:spring-boot-starter:2.5.3")
latestDepTestLibrary("org.springframework.kafka:spring-kafka:2.+")
// TODO: temp change, will be reverted in #7271
latestDepTestLibrary("org.springframework.boot:spring-boot-starter-test:2.+")
latestDepTestLibrary("org.springframework.boot:spring-boot-starter:2.+")
}
val latestDepTest = findProperty("testLatestDeps") as Boolean
testing {
suites {
val testNoReceiveTelemetry by registering(JvmTestSuite::class) {
dependencies {
implementation("org.springframework.kafka:spring-kafka:2.7.0")
implementation(project(":instrumentation:spring:spring-kafka-2.7:testing"))
implementation("org.springframework.boot:spring-boot-starter-test:2.5.3")
implementation("org.springframework.boot:spring-boot-starter:2.5.3")
// the "library" configuration is not recognized by the test suite plugin
if (latestDepTest) {
implementation("org.springframework.kafka:spring-kafka:+")
implementation("org.springframework.boot:spring-boot-starter-test:+")
implementation("org.springframework.boot:spring-boot-starter:+")
} else {
implementation("org.springframework.kafka:spring-kafka:2.7.0")
implementation("org.springframework.boot:spring-boot-starter-test:2.5.3")
implementation("org.springframework.boot:spring-boot-starter:2.5.3")
}
}
targets {
@ -71,18 +76,28 @@ tasks {
}
}
configurations {
listOf(
testRuntimeClasspath,
named("testNoReceiveTelemetryRuntimeClasspath")
)
.forEach {
it.configure {
resolutionStrategy {
// requires old logback (and therefore also old slf4j)
force("ch.qos.logback:logback-classic:1.2.11")
force("org.slf4j:slf4j-api:1.7.36")
// spring 6 (which spring-kafka 3.+ uses) requires java 17
if (latestDepTest) {
otelJava {
minJavaVersionSupported.set(JavaVersion.VERSION_17)
}
}
// spring 6 uses slf4j 2.0
if (!latestDepTest) {
configurations {
listOf(
testRuntimeClasspath,
named("testNoReceiveTelemetryRuntimeClasspath")
)
.forEach {
it.configure {
resolutionStrategy {
// requires old logback (and therefore also old slf4j)
force("ch.qos.logback:logback-classic:1.2.11")
force("org.slf4j:slf4j-api:1.7.36")
}
}
}
}
}
}

View File

@ -49,7 +49,7 @@ class SpringKafkaTest extends AbstractSpringKafkaTest {
() -> {
kafkaTemplate.executeInTransaction(
ops -> {
ops.send("testSingleTopic", "10", "testSpan");
send("testSingleTopic", "10", "testSpan");
return 0;
});
});
@ -113,7 +113,7 @@ class SpringKafkaTest extends AbstractSpringKafkaTest {
() -> {
kafkaTemplate.executeInTransaction(
ops -> {
ops.send("testSingleTopic", "10", "error");
send("testSingleTopic", "10", "error");
return 0;
});
});
@ -240,7 +240,7 @@ class SpringKafkaTest extends AbstractSpringKafkaTest {
() -> {
kafkaTemplate.executeInTransaction(
ops -> {
ops.send("testBatchTopic", "10", "error");
send("testBatchTopic", "10", "error");
return 0;
});
});

View File

@ -18,17 +18,24 @@ dependencies {
testLibrary("org.springframework.boot:spring-boot-starter-test:2.5.3")
testLibrary("org.springframework.boot:spring-boot-starter:2.5.3")
latestDepTestLibrary("org.springframework.kafka:spring-kafka:2.+")
// TODO: temp change, will be reverted in #7271
latestDepTestLibrary("org.springframework.boot:spring-boot-starter-test:2.+")
latestDepTestLibrary("org.springframework.boot:spring-boot-starter:2.+")
}
configurations.testRuntimeClasspath {
resolutionStrategy {
// requires old logback (and therefore also old slf4j)
force("ch.qos.logback:logback-classic:1.2.11")
force("org.slf4j:slf4j-api:1.7.36")
val latestDepTest = findProperty("testLatestDeps") as Boolean
// spring 6 (which spring-kafka 3.+ uses) requires java 17
if (latestDepTest) {
otelJava {
minJavaVersionSupported.set(JavaVersion.VERSION_17)
}
}
// spring 6 uses slf4j 2.0
if (!latestDepTest) {
configurations.testRuntimeClasspath {
resolutionStrategy {
// requires old logback (and therefore also old slf4j)
force("ch.qos.logback:logback-classic:1.2.11")
force("org.slf4j:slf4j-api:1.7.36")
}
}
}

View File

@ -9,6 +9,9 @@ import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import javax.annotation.Nullable;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@ -20,6 +23,22 @@ final class InstrumentedRecordInterceptor<K, V> implements RecordInterceptor<K,
VirtualField.find(ConsumerRecord.class, Context.class);
private static final VirtualField<ConsumerRecord<?, ?>, State<ConsumerRecord<?, ?>>> stateField =
VirtualField.find(ConsumerRecord.class, State.class);
private static final MethodHandle interceptRecord;
static {
MethodHandle interceptRecordHandle;
try {
interceptRecordHandle =
MethodHandles.lookup()
.findVirtual(
RecordInterceptor.class,
"intercept",
MethodType.methodType(ConsumerRecord.class, ConsumerRecord.class));
} catch (NoSuchMethodException | IllegalAccessException e) {
interceptRecordHandle = null;
}
interceptRecord = interceptRecordHandle;
}
private final Instrumenter<ConsumerRecord<?, ?>, Void> processInstrumenter;
@Nullable private final RecordInterceptor<K, V> decorated;
@ -31,11 +50,25 @@ final class InstrumentedRecordInterceptor<K, V> implements RecordInterceptor<K,
this.decorated = decorated;
}
@SuppressWarnings("deprecation") // implementing deprecated method for better compatibility
@SuppressWarnings({
"deprecation",
"unchecked"
}) // implementing deprecated method (removed in 3.0) for better compatibility
@Override
public ConsumerRecord<K, V> intercept(ConsumerRecord<K, V> record) {
if (interceptRecord == null) {
return null;
}
start(record);
return decorated == null ? record : decorated.intercept(record);
if (decorated == null) {
return null;
}
try {
return (ConsumerRecord<K, V>) interceptRecord.invoke(decorated, record);
} catch (Throwable e) {
rethrow(e);
return null; // unreachable
}
}
@Override
@ -44,6 +77,11 @@ final class InstrumentedRecordInterceptor<K, V> implements RecordInterceptor<K,
return decorated == null ? record : decorated.intercept(record, consumer);
}
@SuppressWarnings("unchecked")
private static <E extends Throwable> void rethrow(Throwable e) throws E {
throw (E) e;
}
private void start(ConsumerRecord<K, V> record) {
Context parentContext = getParentContext(record);

View File

@ -29,7 +29,7 @@ public abstract class AbstractSpringKafkaNoReceiveTelemetryTest extends Abstract
() -> {
kafkaTemplate.executeInTransaction(
ops -> {
ops.send("testSingleTopic", "10", "testSpan");
send("testSingleTopic", "10", "testSpan");
return 0;
});
});
@ -75,7 +75,7 @@ public abstract class AbstractSpringKafkaNoReceiveTelemetryTest extends Abstract
() -> {
kafkaTemplate.executeInTransaction(
ops -> {
ops.send("testSingleTopic", "10", "error");
send("testSingleTopic", "10", "error");
return 0;
});
});
@ -177,7 +177,7 @@ public abstract class AbstractSpringKafkaNoReceiveTelemetryTest extends Abstract
() -> {
kafkaTemplate.executeInTransaction(
ops -> {
ops.send("testBatchTopic", "10", "error");
send("testBatchTopic", "10", "error");
return 0;
});
});

View File

@ -10,10 +10,14 @@ import static org.assertj.core.api.Assertions.assertThat;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.sdk.trace.data.LinkData;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
@ -23,7 +27,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.util.concurrent.ListenableFuture;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.DockerImageName;
@ -82,6 +88,49 @@ public abstract class AbstractSpringKafkaTest {
}
}
static final MethodHandle send;
static {
MethodHandle sendMethod = null;
Exception failure = null;
try {
sendMethod =
MethodHandles.lookup()
.findVirtual(
KafkaOperations.class,
"send",
MethodType.methodType(
ListenableFuture.class, String.class, Object.class, Object.class));
} catch (NoSuchMethodException e) {
// spring-kafka 3.0 changed the return type
try {
sendMethod =
MethodHandles.lookup()
.findVirtual(
KafkaOperations.class,
"send",
MethodType.methodType(
CompletableFuture.class, String.class, Object.class, Object.class));
} catch (NoSuchMethodException | IllegalAccessException ex) {
failure = ex;
}
} catch (IllegalAccessException e) {
failure = e;
}
if (sendMethod == null) {
throw new AssertionError("Could not find the KafkaOperations#send() method", failure);
}
send = sendMethod;
}
protected void send(String topic, String key, String data) {
try {
send.invoke(kafkaTemplate, topic, key, data);
} catch (Throwable e) {
throw new AssertionError(e);
}
}
protected void sendBatchMessages(Map<String, String> keyToData) throws InterruptedException {
// This test assumes that messages are sent and received as a batch. Occasionally it happens
// that the messages are not received as a batch, but one by one. This doesn't match what the
@ -97,7 +146,7 @@ public abstract class AbstractSpringKafkaTest {
() -> {
kafkaTemplate.executeInTransaction(
ops -> {
keyToData.forEach((key, data) -> ops.send("testBatchTopic", key, data));
keyToData.forEach((key, data) -> send("testBatchTopic", key, data));
return 0;
});
});