Fix kafka-streaming tests broken by records(TopicPartition) instrumentation

Nikolay Martynov 2019-07-11 10:31:18 -04:00
parent 3e8b65290a
commit 765b76b0ae
2 changed files with 36 additions and 14 deletions
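
For context, the records(TopicPartition) in the title is ConsumerRecords.records(TopicPartition) from kafka-clients, which Kafka Streams calls internally when draining a poll into its tasks; instrumenting it means every record a streams app pulls in now starts a CONSUMER span, which is why the test below expects one extra trace. A minimal sketch of that call path using a plain consumer loop (poll(Duration) assumes kafka-clients 2.0 or later; the drain method and printing are illustrative, not code from this repository):

    import java.time.Duration;
    import java.util.List;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.common.TopicPartition;

    class RecordsCallPath {
      // Drains one poll the way Kafka Streams does internally: per partition,
      // via ConsumerRecords.records(TopicPartition). The new advice wraps that
      // call, so every record surfaced here gets a CONSUMER span.
      static void drain(Consumer<String, String> consumer) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (TopicPartition partition : records.partitions()) {
          List<ConsumerRecord<String, String>> batch = records.records(partition);
          for (ConsumerRecord<String, String> record : batch) {
            System.out.println(record.topic() + "-" + record.partition() + "@" + record.offset());
          }
        }
      }
    }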

KafkaStreamsProcessorInstrumentation.java

@@ -24,7 +24,11 @@ import org.apache.kafka.streams.processor.internals.StampedRecord;
 public class KafkaStreamsProcessorInstrumentation {
   // These two instrumentations work together to apply StreamTask.process.
-  // The combination of these are needed because there's not a good instrumentation point.
+  // The combination of these is needed because there's no good instrumentation point.
+  // FIXME: this instrumentation takes a somewhat strange approach. It looks like Kafka Streams
+  // defines notions of 'processor', 'source' and 'sink'. There is no 'consumer' as such.
+  // Also this instrumentation doesn't define 'producer', making it 'asymmetric' - resulting
+  // in awkward tests and traces. We may want to revisit this in the future.
   @AutoService(Instrumenter.class)
   public static class StartInstrumentation extends Instrumenter.Default {
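
For readers unfamiliar with the terms in the FIXME above: 'source', 'processor' and 'sink' are nodes in the Kafka Streams Processor API, and none of them is a first-class consumer or producer, which is what makes the span model asymmetric. A rough sketch of such a topology (node and topic names are hypothetical, and this uses the Kafka 1.x Topology/AbstractProcessor API rather than the 0.11-era TopologyBuilder):

    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.processor.AbstractProcessor;

    class TopologySketch {
      // 'source' reads from Kafka (the closest thing to a consumer),
      // 'processor' holds user logic, and 'sink' writes back to Kafka
      // (the closest thing to a producer).
      static Topology build() {
        Topology topology = new Topology();
        topology.addSource("my-source", "input-topic");
        topology.addProcessor("my-processor", LowercaseProcessor::new, "my-source");
        topology.addSink("my-sink", "output-topic", "my-processor");
        return topology;
      }

      // Mirrors the test's mapValues step: lowercases each value.
      static class LowercaseProcessor extends AbstractProcessor<String, String> {
        @Override
        public void process(String key, String value) {
          context().forward(key, value.toLowerCase());
        }
      }
    }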

KafkaStreamsTest.groovy

@@ -80,13 +80,13 @@ class KafkaStreamsTest extends AgentTestRunner {
     KStream<String, String> textLines = builder.stream(STREAM_PENDING)
     def values = textLines
       .mapValues(new ValueMapper<String, String>() {
         @Override
         String apply(String textLine) {
           TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces
           getTestTracer().activeSpan().setTag("asdf", "testing")
           return textLine.toLowerCase()
         }
       })
     KafkaStreams streams
     try {
@@ -115,7 +115,7 @@ class KafkaStreamsTest extends AgentTestRunner {
     received.value() == greeting.toLowerCase()
     received.key() == null
-    assertTraces(3) {
+    assertTraces(4) {
       trace(0, 1) {
         // PRODUCER span 0
         span(0) {
@@ -132,7 +132,25 @@ class KafkaStreamsTest extends AgentTestRunner {
           }
         }
       }
-      trace(1, 2) {
+      trace(1, 1) {
+        // CONSUMER span 0
+        span(0) {
+          serviceName "kafka"
+          operationName "kafka.consume"
+          resourceName "Consume Topic $STREAM_PENDING"
+          spanType "queue"
+          errored false
+          childOf TEST_WRITER[0][0]
+          tags {
+            "component" "java-kafka"
+            "span.kind" "consumer"
+            "partition" { it >= 0 }
+            "offset" 0
+            defaultTags(true)
+          }
+        }
+      }
+      trace(2, 2) {
         // STREAMING span 0
         span(0) {
@@ -169,7 +187,7 @@ class KafkaStreamsTest extends AgentTestRunner {
           }
         }
       }
-      trace(2, 1) {
+      trace(3, 1) {
         // CONSUMER span 0
         span(0) {
           serviceName "kafka"
@@ -177,7 +195,7 @@ class KafkaStreamsTest extends AgentTestRunner {
           resourceName "Consume Topic $STREAM_PROCESSED"
           spanType "queue"
           errored false
-          childOf TEST_WRITER[1][0]
+          childOf TEST_WRITER[2][0]
           tags {
             "component" "java-kafka"
             "span.kind" "consumer"
@@ -192,8 +210,8 @@ class KafkaStreamsTest extends AgentTestRunner {
     def headers = received.headers()
     headers.iterator().hasNext()
-    new String(headers.headers("x-datadog-trace-id").iterator().next().value()) == "${TEST_WRITER[1][0].traceId}"
-    new String(headers.headers("x-datadog-parent-id").iterator().next().value()) == "${TEST_WRITER[1][0].spanId}"
+    new String(headers.headers("x-datadog-trace-id").iterator().next().value()) == "${TEST_WRITER[2][0].traceId}"
+    new String(headers.headers("x-datadog-parent-id").iterator().next().value()) == "${TEST_WRITER[2][0].spanId}"
     cleanup:
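
The last hunk follows the STREAMING span's move from trace 1 to trace 2: the headers on the record received from STREAM_PROCESSED must carry the trace context injected by the streams app's producer side. A small sketch of reading such a header with plain kafka-clients (the header name is the one asserted above; the class and method are illustrative):

    import java.nio.charset.StandardCharsets;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.header.Header;

    class TraceHeaderSketch {
      // Reads the trace id that the producer-side instrumentation injected
      // into the record's headers; returns null if the header is absent.
      static String injectedTraceId(ConsumerRecord<String, String> record) {
        Header header = record.headers().lastHeader("x-datadog-trace-id");
        return header == null ? null : new String(header.value(), StandardCharsets.UTF_8);
      }
    }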