convert kafka stream test from groovy to java (#12437)

Co-authored-by: Jay DeLuca <jaydeluca4@gmail.com>
Co-authored-by: Lauri Tulmin <ltulmin@splunk.com>
shalk(xiao kun) authored on 2024-10-30 23:29:57 +08:00, committed by GitHub
parent 0b9dffaf98
commit 3b0e7b8ef2
7 changed files with 782 additions and 540 deletions
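The conversion applies one pattern across all three test classes below; a minimal sketch of that shape (class and method names here are illustrative, not part of the diff):

/*
 * Spock -> JUnit 5 shape used throughout this commit:
 * - extends AgentInstrumentationSpecification   -> @RegisterExtension AgentInstrumentationExtension
 * - @Shared fields, setupSpec()/cleanupSpec()   -> static fields, @BeforeAll/@AfterAll
 * - def "feature name"() with setup/when/then   -> @Test method, @DisplayName keeping the name
 * - assertTraces(n) { trace(...) { span(...) } } -> testing.waitAndAssertTraces(trace -> ...)
 */
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class MigratedTest {
  @RegisterExtension
  static final InstrumentationExtension testing = AgentInstrumentationExtension.create();

  @BeforeAll
  static void setup() {}

  @AfterAll
  static void cleanup() {}

  @DisplayName("feature name")
  @Test
  void featureName() {
    // when/then blocks become plain statements followed by
    // testing.waitAndAssertTraces(trace -> trace.hasSpansSatisfyingExactly(...));
  }
}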


@@ -1,122 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.IntegerDeserializer
import org.apache.kafka.common.serialization.IntegerSerializer
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.testcontainers.containers.output.Slf4jLogConsumer
import org.testcontainers.containers.wait.strategy.Wait
import org.testcontainers.kafka.KafkaContainer
import org.testcontainers.utility.DockerImageName
import spock.lang.Shared
import java.time.Duration
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
class KafkaStreamsBaseTest extends AgentInstrumentationSpecification {
private static final Logger logger = LoggerFactory.getLogger("io.opentelemetry.KafkaStreamsBaseTest")
protected static final STREAM_PENDING = "test.pending"
protected static final STREAM_PROCESSED = "test.processed"
@Shared
static KafkaContainer kafka
@Shared
static Producer<Integer, String> producer
@Shared
static Consumer<Integer, String> consumer
@Shared
static CountDownLatch consumerReady = new CountDownLatch(1)
def setupSpec() {
kafka = new KafkaContainer(DockerImageName.parse("apache/kafka:3.8.0"))
.withEnv("KAFKA_HEAP_OPTS", "-Xmx256m")
.withLogConsumer(new Slf4jLogConsumer(logger))
.waitingFor(Wait.forLogMessage(".*started \\(kafka.server.Kafka.*Server\\).*", 1))
.withStartupTimeout(Duration.ofMinutes(1))
kafka.start()
// create test topic
AdminClient.create(["bootstrap.servers": kafka.bootstrapServers]).withCloseable { admin ->
admin.createTopics([
new NewTopic(STREAM_PENDING, 1, (short) 1),
new NewTopic(STREAM_PROCESSED, 1, (short) 1),
]).all().get(10, TimeUnit.SECONDS)
}
producer = new KafkaProducer<>(producerProps(kafka.bootstrapServers))
def consumerProps = [
"bootstrap.servers" : kafka.bootstrapServers,
"group.id" : "test",
"enable.auto.commit" : "true",
"auto.commit.interval.ms": "10",
"session.timeout.ms" : "30000",
"key.deserializer" : IntegerDeserializer,
"value.deserializer" : StringDeserializer
]
consumer = new KafkaConsumer<>(consumerProps)
consumer.subscribe([STREAM_PROCESSED], new ConsumerRebalanceListener() {
@Override
void onPartitionsRevoked(Collection<TopicPartition> collection) {
}
@Override
void onPartitionsAssigned(Collection<TopicPartition> collection) {
consumerReady.countDown()
}
})
}
def cleanupSpec() {
consumer?.close()
producer?.close()
kafka.stop()
}
static Map<String, Object> producerProps(String servers) {
// values copied from spring's KafkaTestUtils
return [
"bootstrap.servers": servers,
"retries" : 0,
"batch.size" : "16384",
"linger.ms" : 1,
"buffer.memory" : "33554432",
"key.serializer" : IntegerSerializer,
"value.serializer" : StringSerializer
]
}
// Kafka's eventual-consistency behavior forces us to make a couple of empty poll() calls until the consumer is assigned a topic partition
static void awaitUntilConsumerIsReady() {
if (consumerReady.await(0, TimeUnit.SECONDS)) {
return
}
for (i in 0..<10) {
consumer.poll(0)
if (consumerReady.await(1, TimeUnit.SECONDS)) {
break
}
}
if (consumerReady.getCount() != 0) {
throw new AssertionError("Consumer wasn't assigned any partitions!")
}
consumer.seekToBeginning([])
}
}
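The readiness loop above survives the conversion essentially unchanged (see awaitUntilConsumerIsReady() in the Java base class later in this diff). As a sketch only — the commit keeps the manual CountDownLatch loop — the same check could be phrased with Awaitility, assuming that dependency were added:

// Hypothetical Awaitility variant; not part of this commit.
static void awaitUntilConsumerIsReadyAlternative() {
  org.awaitility.Awaitility.await()
      .atMost(java.time.Duration.ofSeconds(11))
      .until(
          () -> {
            consumer.poll(0); // empty poll nudges partition assignment forward
            return consumerReady.await(1, java.util.concurrent.TimeUnit.SECONDS);
          });
  consumer.seekToBeginning(java.util.Collections.emptyList());
}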


@@ -1,232 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.api.trace.Span
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator
import io.opentelemetry.context.Context
import io.opentelemetry.context.propagation.TextMapGetter
import io.opentelemetry.sdk.trace.data.SpanData
import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.header.Headers
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.ValueMapper
import java.time.Duration
import static io.opentelemetry.api.trace.SpanKind.CONSUMER
import static io.opentelemetry.api.trace.SpanKind.PRODUCER
class KafkaStreamsDefaultTest extends KafkaStreamsBaseTest {
def "test kafka produce and consume with streams in-between"() {
setup:
def config = new Properties()
config.putAll(producerProps(KafkaStreamsBaseTest.kafka.bootstrapServers))
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application")
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName())
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())
// CONFIGURE PROCESSOR
def builder
try {
// Different class names for test and latestDepTest.
builder = Class.forName("org.apache.kafka.streams.kstream.KStreamBuilder").newInstance()
} catch (ClassNotFoundException | NoClassDefFoundError e) {
builder = Class.forName("org.apache.kafka.streams.StreamsBuilder").newInstance()
}
KStream<Integer, String> textLines = builder.stream(STREAM_PENDING)
def values = textLines
.mapValues(new ValueMapper<String, String>() {
@Override
String apply(String textLine) {
Span.current().setAttribute("asdf", "testing")
return textLine.toLowerCase()
}
})
KafkaStreams streams
try {
// Different api for test and latestDepTest.
values.to(Serdes.Integer(), Serdes.String(), STREAM_PROCESSED)
streams = new KafkaStreams(builder, config)
} catch (MissingMethodException e) {
def producer = Class.forName("org.apache.kafka.streams.kstream.Produced")
.with(Serdes.Integer(), Serdes.String())
values.to(STREAM_PROCESSED, producer)
streams = new KafkaStreams(builder.build(), config)
}
streams.start()
when:
String greeting = "TESTING TESTING 123!"
KafkaStreamsBaseTest.producer.send(new ProducerRecord<>(STREAM_PENDING, 10, greeting))
then:
awaitUntilConsumerIsReady()
def records = KafkaStreamsBaseTest.consumer.poll(Duration.ofSeconds(10).toMillis())
Headers receivedHeaders = null
for (record in records) {
Span.current().setAttribute("testing", 123)
assert record.key() == 10
assert record.value() == greeting.toLowerCase()
if (receivedHeaders == null) {
receivedHeaders = record.headers()
}
}
assertTraces(3) {
traces.sort(orderByRootSpanName(
STREAM_PENDING + " publish",
STREAM_PENDING + " receive",
STREAM_PROCESSED + " receive"))
SpanData producerPending, producerProcessed
trace(0, 1) {
// kafka-clients PRODUCER
span(0) {
name STREAM_PENDING + " publish"
kind PRODUCER
hasNoParent()
attributes {
"$MessagingIncubatingAttributes.MESSAGING_SYSTEM" "kafka"
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" STREAM_PENDING
"$MessagingIncubatingAttributes.MESSAGING_OPERATION" "publish"
"messaging.client_id" { it.startsWith("producer") }
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID" String
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_KEY" "10"
}
}
producerPending = span(0)
}
trace(1, 3) {
// kafka-clients CONSUMER receive
span(0) {
name STREAM_PENDING + " receive"
kind CONSUMER
hasNoParent()
attributes {
"$MessagingIncubatingAttributes.MESSAGING_SYSTEM" "kafka"
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" STREAM_PENDING
"$MessagingIncubatingAttributes.MESSAGING_OPERATION" "receive"
"messaging.client_id" { it.endsWith("consumer") }
"$MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT" 1
if (Boolean.getBoolean("testLatestDeps")) {
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_CONSUMER_GROUP" "test-application"
}
}
}
// kafka-stream CONSUMER
span(1) {
name STREAM_PENDING + " process"
kind CONSUMER
childOf span(0)
hasLink(producerPending)
attributes {
"$MessagingIncubatingAttributes.MESSAGING_SYSTEM" "kafka"
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" STREAM_PENDING
"$MessagingIncubatingAttributes.MESSAGING_OPERATION" "process"
"messaging.client_id" { it.endsWith("consumer") }
"$MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE" Long
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID" String
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_KEY" "10"
"kafka.record.queue_time_ms" { it >= 0 }
"asdf" "testing"
if (Boolean.getBoolean("testLatestDeps")) {
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_CONSUMER_GROUP" "test-application"
}
}
}
// kafka-clients PRODUCER
span(2) {
name STREAM_PROCESSED + " publish"
kind PRODUCER
childOf span(1)
attributes {
"$MessagingIncubatingAttributes.MESSAGING_SYSTEM" "kafka"
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" STREAM_PROCESSED
"$MessagingIncubatingAttributes.MESSAGING_OPERATION" "publish"
"messaging.client_id" { it.endsWith("producer") }
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID" String
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0
}
}
producerProcessed = span(2)
}
trace(2, 2) {
// kafka-clients CONSUMER receive
span(0) {
name STREAM_PROCESSED + " receive"
kind CONSUMER
hasNoParent()
attributes {
"$MessagingIncubatingAttributes.MESSAGING_SYSTEM" "kafka"
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" STREAM_PROCESSED
"$MessagingIncubatingAttributes.MESSAGING_OPERATION" "receive"
"messaging.client_id" { it.startsWith("consumer") }
"$MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT" 1
if (Boolean.getBoolean("testLatestDeps")) {
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_CONSUMER_GROUP" "test"
}
}
}
// kafka-clients CONSUMER process
span(1) {
name STREAM_PROCESSED + " process"
kind CONSUMER
childOf span(0)
hasLink producerProcessed
attributes {
"$MessagingIncubatingAttributes.MESSAGING_SYSTEM" "kafka"
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" STREAM_PROCESSED
"$MessagingIncubatingAttributes.MESSAGING_OPERATION" "process"
"messaging.client_id" { it.startsWith("consumer") }
"$MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE" Long
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID" String
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_KEY" "10"
if (Boolean.getBoolean("testLatestDeps")) {
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_CONSUMER_GROUP" "test"
}
"kafka.record.queue_time_ms" { it >= 0 }
"testing" 123
}
}
}
}
receivedHeaders.iterator().hasNext()
def traceparent = new String(receivedHeaders.headers("traceparent").iterator().next().value())
Context context = W3CTraceContextPropagator.instance.extract(Context.root(), "", new TextMapGetter<String>() {
@Override
Iterable<String> keys(String carrier) {
return Collections.singleton("traceparent")
}
@Override
String get(String carrier, String key) {
if (key == "traceparent") {
return traceparent
}
return null
}
})
def spanContext = Span.fromContext(context).getSpanContext()
def streamTrace = traces.find { it.size() == 3 }
def streamSendSpan = streamTrace[2]
spanContext.traceId == streamSendSpan.traceId
spanContext.spanId == streamSendSpan.spanId
}
}
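The traceparent header that this test (and the getContext() helper in the Java version later in this diff) extracts follows the W3C Trace Context format; the canonical example value from the spec reads:

traceparent: 00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01
             (version - 16-byte trace-id - 8-byte parent span-id - trace-flags, 01 = sampled)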


@@ -1,186 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.api.trace.Span
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator
import io.opentelemetry.context.Context
import io.opentelemetry.context.propagation.TextMapGetter
import io.opentelemetry.sdk.trace.data.SpanData
import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.header.Headers
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.ValueMapper
import java.time.Duration
import static io.opentelemetry.api.trace.SpanKind.CONSUMER
import static io.opentelemetry.api.trace.SpanKind.PRODUCER
class KafkaStreamsSuppressReceiveSpansTest extends KafkaStreamsBaseTest {
def "test kafka produce and consume with streams in-between"() {
setup:
def config = new Properties()
config.putAll(producerProps(KafkaStreamsBaseTest.kafka.bootstrapServers))
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application")
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName())
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())
// CONFIGURE PROCESSOR
def builder
try {
// Different class names for test and latestDepTest.
builder = Class.forName("org.apache.kafka.streams.kstream.KStreamBuilder").newInstance()
} catch (ClassNotFoundException | NoClassDefFoundError e) {
builder = Class.forName("org.apache.kafka.streams.StreamsBuilder").newInstance()
}
KStream<Integer, String> textLines = builder.stream(STREAM_PENDING)
def values = textLines
.mapValues(new ValueMapper<String, String>() {
@Override
String apply(String textLine) {
Span.current().setAttribute("asdf", "testing")
return textLine.toLowerCase()
}
})
KafkaStreams streams
try {
// Different api for test and latestDepTest.
values.to(Serdes.Integer(), Serdes.String(), STREAM_PROCESSED)
streams = new KafkaStreams(builder, config)
} catch (MissingMethodException e) {
def producer = Class.forName("org.apache.kafka.streams.kstream.Produced")
.with(Serdes.Integer(), Serdes.String())
values.to(STREAM_PROCESSED, producer)
streams = new KafkaStreams(builder.build(), config)
}
streams.start()
when:
String greeting = "TESTING TESTING 123!"
KafkaStreamsBaseTest.producer.send(new ProducerRecord<>(STREAM_PENDING, 10, greeting))
then:
// check that the message was received
def records = KafkaStreamsBaseTest.consumer.poll(Duration.ofSeconds(10).toMillis())
Headers receivedHeaders = null
for (record in records) {
Span.current().setAttribute("testing", 123)
assert record.key() == 10
assert record.value() == greeting.toLowerCase()
if (receivedHeaders == null) {
receivedHeaders = record.headers()
}
}
SpanData streamSendSpan
assertTraces(1) {
trace(0, 4) {
// kafka-clients PRODUCER
span(0) {
name STREAM_PENDING + " publish"
kind PRODUCER
hasNoParent()
attributes {
"$MessagingIncubatingAttributes.MESSAGING_SYSTEM" "kafka"
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" STREAM_PENDING
"$MessagingIncubatingAttributes.MESSAGING_OPERATION" "publish"
"messaging.client_id" "producer-1"
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID" String
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_KEY" "10"
}
}
// kafka-stream CONSUMER
span(1) {
name STREAM_PENDING + " process"
kind CONSUMER
childOf span(0)
attributes {
"$MessagingIncubatingAttributes.MESSAGING_SYSTEM" "kafka"
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" STREAM_PENDING
"$MessagingIncubatingAttributes.MESSAGING_OPERATION" "process"
"messaging.client_id" { it.endsWith("consumer") }
"$MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE" Long
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID" String
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_KEY" "10"
"kafka.record.queue_time_ms" { it >= 0 }
"asdf" "testing"
if (Boolean.getBoolean("testLatestDeps")) {
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_CONSUMER_GROUP" "test-application"
}
}
}
streamSendSpan = span(2)
// kafka-clients PRODUCER
span(2) {
name STREAM_PROCESSED + " publish"
kind PRODUCER
childOf span(1)
attributes {
"$MessagingIncubatingAttributes.MESSAGING_SYSTEM" "kafka"
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" STREAM_PROCESSED
"$MessagingIncubatingAttributes.MESSAGING_OPERATION" "publish"
"messaging.client_id" String
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID" String
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0
}
}
// kafka-clients CONSUMER process
span(3) {
name STREAM_PROCESSED + " process"
kind CONSUMER
childOf span(2)
attributes {
"$MessagingIncubatingAttributes.MESSAGING_SYSTEM" "kafka"
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" STREAM_PROCESSED
"$MessagingIncubatingAttributes.MESSAGING_OPERATION" "process"
"messaging.client_id" { it.startsWith("consumer") }
"$MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE" Long
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID" String
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_KEY" "10"
if (Boolean.getBoolean("testLatestDeps")) {
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_CONSUMER_GROUP" "test"
}
"kafka.record.queue_time_ms" { it >= 0 }
"testing" 123
}
}
}
}
receivedHeaders.iterator().hasNext()
def traceparent = new String(receivedHeaders.headers("traceparent").iterator().next().value())
Context context = W3CTraceContextPropagator.instance.extract(Context.root(), "", new TextMapGetter<String>() {
@Override
Iterable<String> keys(String carrier) {
return Collections.singleton("traceparent")
}
@Override
String get(String carrier, String key) {
if (key == "traceparent") {
return traceparent
}
return null
}
})
def spanContext = Span.fromContext(context).getSpanContext()
spanContext.traceId == streamSendSpan.traceId
spanContext.spanId == streamSendSpan.spanId
}
}


@@ -0,0 +1,183 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.kafkastreams;
import static java.util.Arrays.asList;
import static java.util.Collections.singleton;
import com.google.common.collect.ImmutableMap;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.kafka.KafkaContainer;
import org.testcontainers.utility.DockerImageName;
abstract class KafkaStreamsBaseTest {
@RegisterExtension
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
protected static final AttributeKey<String> MESSAGING_CLIENT_ID =
AttributeKey.stringKey("messaging.client_id");
protected static final String STREAM_PENDING = "test.pending";
protected static final String STREAM_PROCESSED = "test.processed";
static KafkaContainer kafka;
static Producer<Integer, String> producer;
static Consumer<Integer, String> consumer;
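// counted down by the ConsumerRebalanceListener registered in setup() once the consumer is assigned its partitions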
static CountDownLatch consumerReady = new CountDownLatch(1);
@BeforeAll
static void setup() throws ExecutionException, InterruptedException, TimeoutException {
kafka =
new KafkaContainer(DockerImageName.parse("apache/kafka:3.8.0"))
.withEnv("KAFKA_HEAP_OPTS", "-Xmx256m")
.waitingFor(Wait.forLogMessage(".*started \\(kafka.server.Kafka.*Server\\).*", 1))
.withStartupTimeout(Duration.ofMinutes(1));
kafka.start();
// create test topic
try (AdminClient adminClient =
AdminClient.create(ImmutableMap.of("bootstrap.servers", kafka.getBootstrapServers()))) {
adminClient
.createTopics(
asList(
new NewTopic(STREAM_PENDING, 1, (short) 1),
new NewTopic(STREAM_PROCESSED, 1, (short) 1)))
.all()
.get(10, TimeUnit.SECONDS);
}
producer = new KafkaProducer<>(producerProps(kafka.getBootstrapServers()));
Map<String, Object> consumerProps =
ImmutableMap.of(
"bootstrap.servers",
kafka.getBootstrapServers(),
"group.id",
"test",
"enable.auto.commit",
"true",
"auto.commit.interval.ms",
"10",
"session.timeout.ms",
"30000",
"key.deserializer",
IntegerDeserializer.class,
"value.deserializer",
StringDeserializer.class);
consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(
singleton(STREAM_PROCESSED),
new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> collection) {}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> collection) {
consumerReady.countDown();
}
});
}
@AfterAll
static void cleanup() {
consumer.close();
producer.close();
kafka.stop();
}
static Map<String, Object> producerProps(String servers) {
// values copied from spring's KafkaTestUtils
return ImmutableMap.of(
"bootstrap.servers",
servers,
"retries",
0,
"batch.size",
"16384",
"linger.ms",
1,
"buffer.memory",
"33554432",
"key.serializer",
IntegerSerializer.class,
"value.serializer",
StringSerializer.class);
}
// Kafka's eventual-consistency behavior forces us to make a couple of empty poll() calls
// until the consumer is assigned a topic partition
@SuppressWarnings("PreferJavaTimeOverload")
static void awaitUntilConsumerIsReady() throws InterruptedException {
if (consumerReady.await(0, TimeUnit.SECONDS)) {
return;
}
for (int i = 0; i < 10; i++) {
consumer.poll(0);
if (consumerReady.await(1, TimeUnit.SECONDS)) {
break;
}
}
if (consumerReady.getCount() != 0) {
throw new AssertionError("Consumer wasn't assigned any partitions!");
}
consumer.seekToBeginning(Collections.emptyList());
}
static Context getContext(Headers headers) {
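// Rebuilds a Context from the traceparent header that the producer instrumentation injected into the record.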
String traceparent =
new String(
headers.headers("traceparent").iterator().next().value(), StandardCharsets.UTF_8);
return W3CTraceContextPropagator.getInstance()
.extract(
Context.root(),
"",
new TextMapGetter<String>() {
@Override
public String get(String carrier, String key) {
if ("traceparent".equals(key)) {
return traceparent;
}
return null;
}
@Override
public Iterable<String> keys(String carrier) {
return Collections.singleton("traceparent");
}
});
}
}
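Both test classes below use this helper the same way (abbreviated from their bodies): the headers of the first record polled from STREAM_PROCESSED are turned back into a SpanContext, which is then matched against the producer span.

Headers receivedHeaders = records.iterator().next().headers(); // first record polled from STREAM_PROCESSED
SpanContext receivedContext = Span.fromContext(getContext(receivedHeaders)).getSpanContext();
// receivedContext.getTraceId()/getSpanId() are asserted against the
// STREAM_PROCESSED + " publish" span via hasTraceId()/hasSpanId().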


@@ -0,0 +1,282 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.kafkastreams;
import static io.opentelemetry.api.common.AttributeKey.longKey;
import static io.opentelemetry.api.common.AttributeKey.stringKey;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
import static java.util.Arrays.asList;
import static org.assertj.core.api.Assertions.assertThat;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil;
import io.opentelemetry.sdk.testing.assertj.AttributeAssertion;
import io.opentelemetry.sdk.trace.data.LinkData;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
class KafkaStreamsDefaultTest extends KafkaStreamsBaseTest {
@SuppressWarnings("deprecation") // using deprecated semconv
@DisplayName("test kafka produce and consume with streams in-between")
@Test
void testKafkaProduceAndConsumeWithStreamsInBetween() throws Exception {
Properties config = new Properties();
config.putAll(producerProps(KafkaStreamsBaseTest.kafka.getBootstrapServers()));
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
config.put(
StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// CONFIGURE PROCESSOR
KafkaStreamsReflectionUtil.StreamBuilder streamBuilder =
KafkaStreamsReflectionUtil.createBuilder();
KStream<Integer, String> textLines = streamBuilder.stream(STREAM_PENDING);
KStream<Integer, String> values =
textLines.mapValues(
textLine -> {
Span.current().setAttribute("asdf", "testing");
return textLine.toLowerCase(Locale.ROOT);
});
KafkaStreams streams = streamBuilder.createStreams(values, config, STREAM_PROCESSED);
streams.start();
String greeting = "TESTING TESTING 123!";
KafkaStreamsBaseTest.producer.send(new ProducerRecord<>(STREAM_PENDING, 10, greeting));
awaitUntilConsumerIsReady();
@SuppressWarnings("PreferJavaTimeOverload")
ConsumerRecords<Integer, String> records =
KafkaStreamsBaseTest.consumer.poll(Duration.ofSeconds(10).toMillis());
Headers receivedHeaders = null;
for (ConsumerRecord<Integer, String> record : records) {
Span.current().setAttribute("testing", 123);
assertThat(record.key()).isEqualTo(10);
assertThat(record.value()).isEqualTo(greeting.toLowerCase(Locale.ROOT));
if (receivedHeaders == null) {
receivedHeaders = record.headers();
}
}
assertThat(receivedHeaders).isNotEmpty();
SpanContext receivedContext = Span.fromContext(getContext(receivedHeaders)).getSpanContext();
AtomicReference<SpanData> producerPendingRef = new AtomicReference<>();
AtomicReference<SpanData> producerProcessedRef = new AtomicReference<>();
testing.waitAndAssertSortedTraces(
TelemetryDataUtil.orderByRootSpanName(
STREAM_PENDING + " publish",
STREAM_PENDING + " receive",
STREAM_PROCESSED + " receive"),
trace -> {
trace.hasSpansSatisfyingExactly(
// kafka-clients PRODUCER
span ->
span.hasName(STREAM_PENDING + " publish")
.hasKind(SpanKind.PRODUCER)
.hasNoParent()
.hasAttributesSatisfyingExactly(
equalTo(
MessagingIncubatingAttributes.MESSAGING_SYSTEM,
MessagingIncubatingAttributes.MessagingSystemIncubatingValues.KAFKA),
equalTo(
MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME,
STREAM_PENDING),
equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "publish"),
satisfies(MESSAGING_CLIENT_ID, k -> k.startsWith("producer")),
satisfies(
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
k -> k.isInstanceOf(String.class)),
equalTo(MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, 0),
equalTo(
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_KEY, "10")));
producerPendingRef.set(trace.getSpan(0));
},
trace -> {
trace.hasSpansSatisfyingExactly(
// kafka-clients CONSUMER receive
span -> {
List<AttributeAssertion> assertions =
new ArrayList<>(
asList(
equalTo(
MessagingIncubatingAttributes.MESSAGING_SYSTEM,
MessagingIncubatingAttributes.MessagingSystemIncubatingValues
.KAFKA),
equalTo(
MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME,
STREAM_PENDING),
equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "receive"),
satisfies(MESSAGING_CLIENT_ID, k -> k.endsWith("consumer")),
equalTo(
MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT, 1)));
if (Boolean.getBoolean("testLatestDeps")) {
assertions.add(
equalTo(
MessagingIncubatingAttributes.MESSAGING_KAFKA_CONSUMER_GROUP,
"test-application"));
}
span.hasName(STREAM_PENDING + " receive")
.hasKind(SpanKind.CONSUMER)
.hasNoParent()
.hasAttributesSatisfyingExactly(assertions);
},
// kafka-stream CONSUMER
span -> {
List<AttributeAssertion> assertions =
new ArrayList<>(
asList(
equalTo(
MessagingIncubatingAttributes.MESSAGING_SYSTEM,
MessagingIncubatingAttributes.MessagingSystemIncubatingValues
.KAFKA),
equalTo(
MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME,
STREAM_PENDING),
equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "process"),
satisfies(MESSAGING_CLIENT_ID, k -> k.endsWith("consumer")),
satisfies(
MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE,
k -> k.isInstanceOf(Long.class)),
satisfies(
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
k -> k.isInstanceOf(String.class)),
equalTo(
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, 0),
equalTo(
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_KEY, "10"),
satisfies(
longKey("kafka.record.queue_time_ms"),
k -> k.isGreaterThanOrEqualTo(0)),
equalTo(stringKey("asdf"), "testing")));
if (Boolean.getBoolean("testLatestDeps")) {
assertions.add(
equalTo(
MessagingIncubatingAttributes.MESSAGING_KAFKA_CONSUMER_GROUP,
"test-application"));
}
span.hasName(STREAM_PENDING + " process")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(0))
.hasLinks(LinkData.create(producerPendingRef.get().getSpanContext()))
.hasAttributesSatisfyingExactly(assertions);
},
// kafka-clients PRODUCER
span ->
span.hasName(STREAM_PROCESSED + " publish")
.hasKind(SpanKind.PRODUCER)
.hasParent(trace.getSpan(1))
.hasTraceId(receivedContext.getTraceId())
.hasSpanId(receivedContext.getSpanId())
.hasAttributesSatisfyingExactly(
equalTo(
MessagingIncubatingAttributes.MESSAGING_SYSTEM,
MessagingIncubatingAttributes.MessagingSystemIncubatingValues.KAFKA),
equalTo(
MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME,
STREAM_PROCESSED),
equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "publish"),
satisfies(MESSAGING_CLIENT_ID, k -> k.endsWith("producer")),
satisfies(
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
k -> k.isInstanceOf(String.class)),
equalTo(
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, 0)));
producerProcessedRef.set(trace.getSpan(2));
},
trace ->
trace.hasSpansSatisfyingExactly(
// kafka-clients CONSUMER receive
span -> {
List<AttributeAssertion> assertions =
new ArrayList<>(
asList(
equalTo(
MessagingIncubatingAttributes.MESSAGING_SYSTEM,
MessagingIncubatingAttributes.MessagingSystemIncubatingValues
.KAFKA),
equalTo(
MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME,
STREAM_PROCESSED),
equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "receive"),
satisfies(MESSAGING_CLIENT_ID, k -> k.startsWith("consumer")),
equalTo(
MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT, 1)));
if (Boolean.getBoolean("testLatestDeps")) {
assertions.add(
equalTo(
MessagingIncubatingAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, "test"));
}
span.hasName(STREAM_PROCESSED + " receive")
.hasKind(SpanKind.CONSUMER)
.hasNoParent()
.hasAttributesSatisfyingExactly(assertions);
},
// kafka-clients CONSUMER process
span -> {
List<AttributeAssertion> assertions =
new ArrayList<>(
asList(
equalTo(
MessagingIncubatingAttributes.MESSAGING_SYSTEM,
MessagingIncubatingAttributes.MessagingSystemIncubatingValues
.KAFKA),
equalTo(
MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME,
STREAM_PROCESSED),
equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "process"),
satisfies(MESSAGING_CLIENT_ID, k -> k.startsWith("consumer")),
satisfies(
MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE,
k -> k.isInstanceOf(Long.class)),
satisfies(
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
k -> k.isInstanceOf(String.class)),
equalTo(
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, 0),
equalTo(
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_KEY, "10"),
satisfies(
longKey("kafka.record.queue_time_ms"),
k -> k.isGreaterThanOrEqualTo(0)),
equalTo(longKey("testing"), 123)));
if (Boolean.getBoolean("testLatestDeps")) {
assertions.add(
equalTo(
MessagingIncubatingAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, "test"));
}
span.hasName(STREAM_PROCESSED + " process")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(0))
.hasLinks(LinkData.create(producerProcessedRef.get().getSpanContext()))
.hasAttributesSatisfyingExactly(assertions);
}));
}
}


@@ -0,0 +1,105 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.kafkastreams;
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.KStream;
/**
* Kafka streams reflection util which is used to be compatible with different versions of kafka
* streams.
*/
class KafkaStreamsReflectionUtil {
private KafkaStreamsReflectionUtil() {}
static class StreamBuilder {
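// Holds either an org.apache.kafka.streams.kstream.KStreamBuilder (old API) or an
// org.apache.kafka.streams.StreamsBuilder (new API), resolved reflectively in createBuilder() below.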
private final Object builder;
StreamBuilder(Object builder) {
this.builder = builder;
}
@SuppressWarnings("unchecked")
KStream<Integer, String> stream(String topic)
throws Exception { // Different api for test and latestDepTest.
Method method;
Object[] arguments;
try {
// equivalent to:
// ((org.apache.kafka.streams.kstream.KStreamBuilder)builder).stream(STREAM_PENDING);
method = builder.getClass().getMethod("stream", String[].class);
String[] topics = new String[] {topic};
arguments = new Object[] {topics};
} catch (Exception exception) {
// equivalent to:
// ((org.apache.kafka.streams.StreamsBuilder)builder).stream(STREAM_PENDING);
method = builder.getClass().getMethod("stream", String.class);
arguments = new Object[] {topic};
}
return (KStream<Integer, String>) method.invoke(builder, arguments);
}
KafkaStreams createStreams(KStream<Integer, String> values, Properties config, String topic)
throws Exception {
Constructor<?> constructor;
// Different api for test and latestDepTest.
try {
// equivalent to:
// values.to(Serdes.Integer(), Serdes.String(), STREAM_PROCESSED);
// return new KafkaStreams(builder, config);
KStream.class
.getMethod("to", Serde.class, Serde.class, String.class)
.invoke(values, Serdes.Integer(), Serdes.String(), topic);
Class<?> topologyBuilderClass =
Class.forName("org.apache.kafka.streams.processor.TopologyBuilder");
constructor = KafkaStreams.class.getConstructor(topologyBuilderClass, Properties.class);
} catch (Exception exception) {
constructor = null;
}
if (constructor != null) {
return (KafkaStreams) constructor.newInstance(builder, config);
}
// equivalent to:
// Produced<Integer, String> produced = Produced.with(Serdes.Integer(), Serdes.String());
// values.to(STREAM_PROCESSED, produced);
//
// Topology topology = builder.build();
// new KafkaStreams(topology, props);
Class<?> producedClass = Class.forName("org.apache.kafka.streams.kstream.Produced");
Method producedWith = producedClass.getMethod("with", Serde.class, Serde.class);
Object producer = producedWith.invoke(null, Serdes.Integer(), Serdes.String());
KStream.class.getMethod("to", String.class, producedClass).invoke(values, topic, producer);
Object topology = builder.getClass().getMethod("build").invoke(builder);
Class<?> topologyClass = Class.forName("org.apache.kafka.streams.Topology");
constructor = KafkaStreams.class.getConstructor(topologyClass, Properties.class);
return (KafkaStreams) constructor.newInstance(topology, config);
}
}
static StreamBuilder createBuilder() throws Exception {
Class<?> builderClass;
try {
// Different class names for test and latestDepTest.
builderClass = Class.forName("org.apache.kafka.streams.kstream.KStreamBuilder");
} catch (Exception e) {
builderClass = Class.forName("org.apache.kafka.streams.StreamsBuilder");
}
return new StreamBuilder(builderClass.getConstructor().newInstance());
}
}
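Usage, as in both test classes in this commit:

KafkaStreamsReflectionUtil.StreamBuilder streamBuilder = KafkaStreamsReflectionUtil.createBuilder();
KStream<Integer, String> textLines = streamBuilder.stream(STREAM_PENDING);
KStream<Integer, String> values = textLines.mapValues(textLine -> textLine.toLowerCase(Locale.ROOT));
KafkaStreams streams = streamBuilder.createStreams(values, config, STREAM_PROCESSED);
streams.start();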


@@ -0,0 +1,212 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.kafkastreams;
import static io.opentelemetry.api.common.AttributeKey.longKey;
import static io.opentelemetry.api.common.AttributeKey.stringKey;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
import static java.util.Arrays.asList;
import static org.assertj.core.api.Assertions.assertThat;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.sdk.testing.assertj.AttributeAssertion;
import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
class KafkaStreamsSuppressReceiveSpansTest extends KafkaStreamsBaseTest {
@SuppressWarnings("deprecation") // using deprecated semconv
@DisplayName("test kafka produce and consume with streams in-between")
@Test
void testKafkaProduceAndConsumeWithStreamsInBetween() throws Exception {
Properties config = new Properties();
config.putAll(producerProps(KafkaStreamsBaseTest.kafka.getBootstrapServers()));
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
config.put(
StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// CONFIGURE PROCESSOR
KafkaStreamsReflectionUtil.StreamBuilder streamBuilder =
KafkaStreamsReflectionUtil.createBuilder();
KStream<Integer, String> textLines = streamBuilder.stream(STREAM_PENDING);
KStream<Integer, String> values =
textLines.mapValues(
textLine -> {
Span.current().setAttribute("asdf", "testing");
return textLine.toLowerCase(Locale.ROOT);
});
KafkaStreams streams = streamBuilder.createStreams(values, config, STREAM_PROCESSED);
streams.start();
String greeting = "TESTING TESTING 123!";
KafkaStreamsBaseTest.producer.send(new ProducerRecord<>(STREAM_PENDING, 10, greeting));
// check that the message was received
@SuppressWarnings("PreferJavaTimeOverload")
ConsumerRecords<Integer, String> records =
KafkaStreamsBaseTest.consumer.poll(Duration.ofSeconds(10).toMillis());
Headers receivedHeaders = null;
for (ConsumerRecord<Integer, String> record : records) {
Span.current().setAttribute("testing", 123);
assertThat(record.key()).isEqualTo(10);
assertThat(record.value()).isEqualTo(greeting.toLowerCase(Locale.ROOT));
if (receivedHeaders == null) {
receivedHeaders = record.headers();
}
}
assertThat(receivedHeaders).isNotEmpty();
SpanContext receivedContext = Span.fromContext(getContext(receivedHeaders)).getSpanContext();
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
// kafka-clients PRODUCER
span ->
span.hasName(STREAM_PENDING + " publish")
.hasKind(SpanKind.PRODUCER)
.hasNoParent()
.hasAttributesSatisfyingExactly(
equalTo(
MessagingIncubatingAttributes.MESSAGING_SYSTEM,
MessagingIncubatingAttributes.MessagingSystemIncubatingValues
.KAFKA),
equalTo(
MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME,
STREAM_PENDING),
equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "publish"),
equalTo(MESSAGING_CLIENT_ID, "producer-1"),
satisfies(
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
k -> k.isInstanceOf(String.class)),
equalTo(
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, 0),
equalTo(
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_KEY, "10")),
// kafka-stream CONSUMER
span -> {
List<AttributeAssertion> assertions =
new ArrayList<>(
asList(
equalTo(
MessagingIncubatingAttributes.MESSAGING_SYSTEM,
MessagingIncubatingAttributes.MessagingSystemIncubatingValues
.KAFKA),
equalTo(
MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME,
STREAM_PENDING),
equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "process"),
satisfies(MESSAGING_CLIENT_ID, k -> k.endsWith("consumer")),
satisfies(
MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE,
k -> k.isInstanceOf(Long.class)),
satisfies(
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
k -> k.isInstanceOf(String.class)),
equalTo(
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, 0),
equalTo(
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_KEY, "10"),
satisfies(
longKey("kafka.record.queue_time_ms"),
k -> k.isGreaterThanOrEqualTo(0)),
equalTo(stringKey("asdf"), "testing")));
if (Boolean.getBoolean("testLatestDeps")) {
assertions.add(
equalTo(
MessagingIncubatingAttributes.MESSAGING_KAFKA_CONSUMER_GROUP,
"test-application"));
}
span.hasName(STREAM_PENDING + " process")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(assertions);
},
// kafka-clients PRODUCER
span -> {
span.hasName(STREAM_PROCESSED + " publish")
.hasKind(SpanKind.PRODUCER)
.hasParent(trace.getSpan(1))
.hasTraceId(receivedContext.getTraceId())
.hasSpanId(receivedContext.getSpanId())
.hasAttributesSatisfyingExactly(
equalTo(
MessagingIncubatingAttributes.MESSAGING_SYSTEM,
MessagingIncubatingAttributes.MessagingSystemIncubatingValues.KAFKA),
equalTo(
MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME,
STREAM_PROCESSED),
equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "publish"),
satisfies(MESSAGING_CLIENT_ID, k -> k.isInstanceOf(String.class)),
satisfies(
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
k -> k.isInstanceOf(String.class)),
equalTo(MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, 0));
},
// kafka-clients CONSUMER process
span -> {
List<AttributeAssertion> assertions =
new ArrayList<>(
asList(
equalTo(
MessagingIncubatingAttributes.MESSAGING_SYSTEM,
MessagingIncubatingAttributes.MessagingSystemIncubatingValues
.KAFKA),
equalTo(
MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME,
STREAM_PROCESSED),
equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "process"),
satisfies(MESSAGING_CLIENT_ID, k -> k.startsWith("consumer")),
satisfies(
MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE,
k -> k.isInstanceOf(Long.class)),
satisfies(
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
k -> k.isInstanceOf(String.class)),
equalTo(
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, 0),
equalTo(
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_KEY, "10"),
satisfies(
longKey("kafka.record.queue_time_ms"),
k -> k.isGreaterThanOrEqualTo(0)),
equalTo(longKey("testing"), 123)));
if (Boolean.getBoolean("testLatestDeps")) {
assertions.add(
equalTo(
MessagingIncubatingAttributes.MESSAGING_KAFKA_CONSUMER_GROUP, "test"));
}
span.hasName(STREAM_PROCESSED + " process")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(2))
.hasAttributesSatisfyingExactly(assertions);
}));
}
}