Merge pull request #854 from DataDog/mar-kolya/kafka-client-service-name

Set kafka client service name to application default service name.
This commit is contained in:
Nikolay Martynov 2019-05-24 10:51:16 -04:00 committed by GitHub
commit bc37601fc2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 24 additions and 14 deletions

View File

@ -12,6 +12,11 @@ import org.apache.kafka.clients.producer.ProducerRecord;
public abstract class KafkaDecorator extends ClientDecorator {
public static final KafkaDecorator PRODUCER_DECORATE =
new KafkaDecorator() {
@Override
protected String service() {
return "kafka";
}
@Override
protected String spanKind() {
return Tags.SPAN_KIND_PRODUCER;
@ -25,6 +30,16 @@ public abstract class KafkaDecorator extends ClientDecorator {
public static final KafkaDecorator CONSUMER_DECORATE =
new KafkaDecorator() {
@Override
protected String service() {
/*
Use default service name. Common use-case here is to have consumer span parent
children spans in instrumented application. Since service name is inherited it makes
sense to default that to application service name rather than 'kafka'.
*/
return null;
}
@Override
protected String spanKind() {
return Tags.SPAN_KIND_CONSUMER;
@ -41,11 +56,6 @@ public abstract class KafkaDecorator extends ClientDecorator {
return new String[] {"kafka"};
}
@Override
protected String service() {
return "kafka";
}
@Override
protected String component() {
return "java-kafka";

View File

@ -94,7 +94,7 @@ class KafkaClientTest extends AgentTestRunner {
trace(1, 1) {
// CONSUMER span 0
span(0) {
serviceName "kafka"
serviceName "unnamed-java-app"
operationName "kafka.consume"
resourceName "Consume Topic $SHARED_TOPIC"
spanType "queue"

View File

@ -80,13 +80,13 @@ class KafkaStreamsTest extends AgentTestRunner {
KStream<String, String> textLines = builder.stream(STREAM_PENDING)
def values = textLines
.mapValues(new ValueMapper<String, String>() {
@Override
String apply(String textLine) {
TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces
getTestTracer().activeSpan().setTag("asdf", "testing")
return textLine.toLowerCase()
}
})
@Override
String apply(String textLine) {
TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces
getTestTracer().activeSpan().setTag("asdf", "testing")
return textLine.toLowerCase()
}
})
KafkaStreams streams
try {
@ -172,7 +172,7 @@ class KafkaStreamsTest extends AgentTestRunner {
trace(2, 1) {
// CONSUMER span 0
span(0) {
serviceName "kafka"
serviceName "unnamed-java-app"
operationName "kafka.consume"
resourceName "Consume Topic $STREAM_PROCESSED"
spanType "queue"