diff --git a/dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/KafkaStreamsProcessorInstrumentation.java b/dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/KafkaStreamsProcessorInstrumentation.java index 07fa3754c5..fc8e4bdf19 100644 --- a/dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/KafkaStreamsProcessorInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/KafkaStreamsProcessorInstrumentation.java @@ -24,7 +24,11 @@ import org.apache.kafka.streams.processor.internals.StampedRecord; public class KafkaStreamsProcessorInstrumentation { // These two instrumentations work together to apply StreamTask.process. - // The combination of these are needed because there's not a good instrumentation point. + // The combination of these is needed because there's no good instrumentation point. + // FIXME: this instrumentation takes somewhat strange approach. It looks like Kafka Streams + // defines notions of 'processor', 'source' and 'sink'. There is no 'consumer' as such. + // Also this instrumentation doesn't define 'producer' making it 'asymmetric' - resulting + // in awkward tests and traces. We may want to revisit this in the future. @AutoService(Instrumenter.class) public static class StartInstrumentation extends Instrumenter.Default { diff --git a/dd-java-agent/instrumentation/kafka-streams-0.11/src/test/groovy/KafkaStreamsTest.groovy b/dd-java-agent/instrumentation/kafka-streams-0.11/src/test/groovy/KafkaStreamsTest.groovy index d74433ec11..f957b872eb 100644 --- a/dd-java-agent/instrumentation/kafka-streams-0.11/src/test/groovy/KafkaStreamsTest.groovy +++ b/dd-java-agent/instrumentation/kafka-streams-0.11/src/test/groovy/KafkaStreamsTest.groovy @@ -80,13 +80,13 @@ class KafkaStreamsTest extends AgentTestRunner { KStream textLines = builder.stream(STREAM_PENDING) def values = textLines .mapValues(new ValueMapper() { - @Override - String apply(String textLine) { - TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces - getTestTracer().activeSpan().setTag("asdf", "testing") - return textLine.toLowerCase() - } - }) + @Override + String apply(String textLine) { + TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces + getTestTracer().activeSpan().setTag("asdf", "testing") + return textLine.toLowerCase() + } + }) KafkaStreams streams try { @@ -115,7 +115,7 @@ class KafkaStreamsTest extends AgentTestRunner { received.value() == greeting.toLowerCase() received.key() == null - assertTraces(3) { + assertTraces(4) { trace(0, 1) { // PRODUCER span 0 span(0) { @@ -132,7 +132,25 @@ class KafkaStreamsTest extends AgentTestRunner { } } } - trace(1, 2) { + trace(1, 1) { + // CONSUMER span 0 + span(0) { + serviceName "kafka" + operationName "kafka.consume" + resourceName "Consume Topic $STREAM_PENDING" + spanType "queue" + errored false + childOf TEST_WRITER[0][0] + tags { + "component" "java-kafka" + "span.kind" "consumer" + "partition" { it >= 0 } + "offset" 0 + defaultTags(true) + } + } + } + trace(2, 2) { // STREAMING span 0 span(0) { @@ -169,7 +187,7 @@ class KafkaStreamsTest extends AgentTestRunner { } } } - trace(2, 1) { + trace(3, 1) { // CONSUMER span 0 span(0) { serviceName "kafka" @@ -177,7 +195,7 @@ class KafkaStreamsTest extends AgentTestRunner { resourceName "Consume Topic $STREAM_PROCESSED" spanType "queue" errored false - childOf TEST_WRITER[1][0] + childOf TEST_WRITER[2][0] tags { "component" "java-kafka" "span.kind" "consumer" @@ -192,8 +210,8 @@ class KafkaStreamsTest extends AgentTestRunner { def headers = received.headers() headers.iterator().hasNext() - new String(headers.headers("x-datadog-trace-id").iterator().next().value()) == "${TEST_WRITER[1][0].traceId}" - new String(headers.headers("x-datadog-parent-id").iterator().next().value()) == "${TEST_WRITER[1][0].spanId}" + new String(headers.headers("x-datadog-trace-id").iterator().next().value()) == "${TEST_WRITER[2][0].traceId}" + new String(headers.headers("x-datadog-parent-id").iterator().next().value()) == "${TEST_WRITER[2][0].spanId}" cleanup: