From 4314f71ab78ef7868510f236bc5d3b4b39ccdd72 Mon Sep 17 00:00:00 2001 From: Tyler Benson Date: Tue, 30 Jan 2018 08:17:53 +1000 Subject: [PATCH] Kafka and Kafka Streams instrumentation This includes propagation and traces for producers and consumers. --- .../kafka-clients-0.11.gradle | 29 +++ .../KafkaConsumerInstrumentation.java | 104 +++++++++ .../KafkaProducerInstrumentation.java | 146 +++++++++++++ .../kafka_clients/TextMapExtractAdapter.java | 30 +++ .../kafka_clients/TextMapInjectAdapter.java | 26 +++ .../kafka_clients/TracingIterable.java | 84 ++++++++ .../src/test/groovy/KafkaTest.groovy | 138 ++++++++++++ .../kafka-streams-0.11.gradle | 33 +++ .../KafkaStreamsProcessorInstrumentation.java | 126 +++++++++++ ...NodeRecordDeserializerInstrumentation.java | 66 ++++++ .../kafka_streams/TextMapExtractAdapter.java | 30 +++ .../src/test/groovy/KafkaStreamsTest.groovy | 204 ++++++++++++++++++ .../trace/agent/test/AgentTestRunner.java | 15 +- .../trace/agent/tooling/HelperInjector.java | 7 +- settings.gradle | 2 + 15 files changed, 1038 insertions(+), 2 deletions(-) create mode 100644 dd-java-agent/instrumentation/kafka-clients-0.11/kafka-clients-0.11.gradle create mode 100644 dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInstrumentation.java create mode 100644 dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java create mode 100644 dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TextMapExtractAdapter.java create mode 100644 dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TextMapInjectAdapter.java create mode 100644 dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterable.java create mode 100644 dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaTest.groovy create mode 100644 dd-java-agent/instrumentation/kafka-streams-0.11/kafka-streams-0.11.gradle create mode 100644 dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/KafkaStreamsProcessorInstrumentation.java create mode 100644 dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/KafkaStreamsSourceNodeRecordDeserializerInstrumentation.java create mode 100644 dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/TextMapExtractAdapter.java create mode 100644 dd-java-agent/instrumentation/kafka-streams-0.11/src/test/groovy/KafkaStreamsTest.groovy diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/kafka-clients-0.11.gradle b/dd-java-agent/instrumentation/kafka-clients-0.11/kafka-clients-0.11.gradle new file mode 100644 index 0000000000..404aa8c8b0 --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/kafka-clients-0.11.gradle @@ -0,0 +1,29 @@ +apply plugin: 'version-scan' + +versionScan { + group = "org.apache.kafka" + module = "kafka-clients" + versions = "[0.11.0.0,)" + verifyPresent = [ + 'org.apache.kafka.common.header.Header' : null, + 'org.apache.kafka.common.header.Headers': null, + ] +} + +apply from: "${rootDir}/gradle/java.gradle" + +dependencies { + compileOnly group: 'org.apache.kafka', name: 'kafka-clients', version: '0.11.0.0' + + compile project(':dd-trace-ot') + compile project(':dd-java-agent:tooling') + + compile deps.bytebuddy + compile deps.opentracing + + testCompile project(':dd-java-agent:testing') + testCompile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.11.0.0' + testCompile group: 'org.springframework.kafka', name: 'spring-kafka', version: '1.3.3.RELEASE' + testCompile group: 'org.springframework.kafka', name: 'spring-kafka-test', version: '1.3.3.RELEASE' + testCompile group: 'javax.xml.bind', name: 'jaxb-api', version: '2.3.0' +} diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInstrumentation.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInstrumentation.java new file mode 100644 index 0000000000..2c5320111d --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInstrumentation.java @@ -0,0 +1,104 @@ +package datadog.trace.instrumentation.kafka_clients; + +import static datadog.trace.agent.tooling.ClassLoaderMatcher.classLoaderHasClasses; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.returns; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.DDAdvice; +import datadog.trace.agent.tooling.HelperInjector; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.api.DDSpanTypes; +import datadog.trace.api.DDTags; +import io.opentracing.SpanContext; +import io.opentracing.Tracer; +import io.opentracing.propagation.Format; +import io.opentracing.tag.Tags; +import io.opentracing.util.GlobalTracer; +import java.util.Iterator; +import net.bytebuddy.agent.builder.AgentBuilder; +import net.bytebuddy.asm.Advice; +import org.apache.kafka.clients.consumer.ConsumerRecord; + +@AutoService(Instrumenter.class) +public final class KafkaConsumerInstrumentation implements Instrumenter { + public static final HelperInjector HELPER_INJECTOR = + new HelperInjector( + "datadog.trace.instrumentation.kafka_clients.TextMapExtractAdapter", + "datadog.trace.instrumentation.kafka_clients.TracingIterable", + "datadog.trace.instrumentation.kafka_clients.TracingIterable$TracingIterator", + "datadog.trace.instrumentation.kafka_clients.TracingIterable$SpanBuilderDecorator", + "datadog.trace.instrumentation.kafka_clients.KafkaConsumerInstrumentation$ConsumeScopeAction"); + public static final ConsumeScopeAction CONSUME_ACTION = new ConsumeScopeAction(); + + private static final String OPERATION = "kafka.consume"; + private static final String COMPONENT_NAME = "java-kafka"; + + @Override + public AgentBuilder instrument(final AgentBuilder agentBuilder) { + return agentBuilder + .type( + named("org.apache.kafka.clients.consumer.ConsumerRecords"), + classLoaderHasClasses( + "org.apache.kafka.common.header.Header", "org.apache.kafka.common.header.Headers")) + .transform(HELPER_INJECTOR) + .transform( + DDAdvice.create() + .advice( + isMethod() + .and(isPublic()) + .and(named("records")) + .and(takesArgument(0, String.class)) + .and(returns(Iterable.class)), + IterableAdvice.class.getName()) + .advice( + isMethod() + .and(isPublic()) + .and(named("iterator")) + .and(takesArguments(0)) + .and(returns(Iterator.class)), + IteratorAdvice.class.getName())) + .asDecorator(); + } + + public static class IterableAdvice { + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void wrap(@Advice.Return(readOnly = false) Iterable iterable) { + iterable = new TracingIterable(iterable, OPERATION, CONSUME_ACTION); + } + } + + public static class IteratorAdvice { + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void wrap(@Advice.Return(readOnly = false) Iterator iterator) { + iterator = new TracingIterable.TracingIterator(iterator, OPERATION, CONSUME_ACTION); + } + } + + public static class ConsumeScopeAction + implements TracingIterable.SpanBuilderDecorator { + + @Override + public void decorate(final Tracer.SpanBuilder spanBuilder, final ConsumerRecord record) { + final String topic = record.topic() == null ? "unknown" : record.topic(); + final SpanContext spanContext = + GlobalTracer.get() + .extract(Format.Builtin.TEXT_MAP, new TextMapExtractAdapter(record.headers())); + spanBuilder + .asChildOf(spanContext) + .withTag(DDTags.SERVICE_NAME, "kafka") + .withTag(DDTags.RESOURCE_NAME, "Consume Topic " + topic) + .withTag(DDTags.SPAN_TYPE, DDSpanTypes.MESSAGE_CONSUMER) + .withTag(Tags.COMPONENT.getKey(), COMPONENT_NAME) + .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CONSUMER) + .withTag("partition", record.partition()) + .withTag("offset", record.offset()); + } + } +} diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java new file mode 100644 index 0000000000..a8c33bb652 --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java @@ -0,0 +1,146 @@ +package datadog.trace.instrumentation.kafka_clients; + +import static datadog.trace.agent.tooling.ClassLoaderMatcher.classLoaderHasClasses; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.DDAdvice; +import datadog.trace.agent.tooling.HelperInjector; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.api.DDSpanTypes; +import datadog.trace.api.DDTags; +import io.opentracing.Scope; +import io.opentracing.Span; +import io.opentracing.propagation.Format; +import io.opentracing.tag.Tags; +import io.opentracing.util.GlobalTracer; +import java.util.Collections; +import net.bytebuddy.agent.builder.AgentBuilder; +import net.bytebuddy.asm.Advice; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; + +@AutoService(Instrumenter.class) +public final class KafkaProducerInstrumentation implements Instrumenter { + public static final HelperInjector HELPER_INJECTOR = + new HelperInjector( + "datadog.trace.instrumentation.kafka_clients.TextMapInjectAdapter", + KafkaProducerInstrumentation.class.getName() + "$ProducerCallback"); + + private static final String OPERATION = "kafka.produce"; + private static final String COMPONENT_NAME = "java-kafka"; + + @Override + public AgentBuilder instrument(final AgentBuilder agentBuilder) { + return agentBuilder + .type( + named("org.apache.kafka.clients.producer.KafkaProducer"), + classLoaderHasClasses( + "org.apache.kafka.common.header.Header", "org.apache.kafka.common.header.Headers")) + .transform(HELPER_INJECTOR) + .transform( + DDAdvice.create() + .advice( + isMethod() + .and(isPublic()) + .and(named("send")) + .and( + takesArgument( + 0, named("org.apache.kafka.clients.producer.ProducerRecord"))) + .and(takesArgument(1, named("org.apache.kafka.clients.producer.Callback"))), + ProducerAdvice.class.getName())) + .asDecorator(); + } + + public static class ProducerAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static Scope startSpan( + @Advice.Argument(value = 0, readOnly = false) ProducerRecord record, + @Advice.Argument(value = 1, readOnly = false) Callback callback) { + final Scope scope = GlobalTracer.get().buildSpan(OPERATION).startActive(false); + callback = new ProducerCallback(callback, scope); + + final Span span = scope.span(); + final String topic = record.topic() == null ? "unknown" : record.topic(); + if (record.partition() != null) { + span.setTag("kafka.partition", record.partition()); + } + + Tags.COMPONENT.set(span, COMPONENT_NAME); + Tags.SPAN_KIND.set(span, Tags.SPAN_KIND_PRODUCER); + + span.setTag(DDTags.RESOURCE_NAME, "Produce Topic " + topic); + span.setTag(DDTags.SPAN_TYPE, DDSpanTypes.MESSAGE_PRODUCER); + span.setTag(DDTags.SERVICE_NAME, "kafka"); + + try { + GlobalTracer.get() + .inject( + scope.span().context(), + Format.Builtin.TEXT_MAP, + new TextMapInjectAdapter(record.headers())); + } catch (final IllegalStateException e) { + //headers must be read-only from reused record. try again with new one. + record = + new ProducerRecord<>( + record.topic(), + record.partition(), + record.timestamp(), + record.key(), + record.value(), + record.headers()); + + GlobalTracer.get() + .inject( + scope.span().context(), + Format.Builtin.TEXT_MAP, + new TextMapInjectAdapter(record.headers())); + } + + return scope; + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void stopSpan( + @Advice.Enter final Scope scope, @Advice.Thrown final Throwable throwable) { + if (throwable != null) { + final Span span = scope.span(); + Tags.ERROR.set(span, true); + span.log(Collections.singletonMap("error.object", throwable)); + span.finish(); + } + scope.close(); + } + } + + public static class ProducerCallback implements Callback { + private final Callback callback; + private final Scope scope; + + public ProducerCallback(final Callback callback, final Scope scope) { + this.callback = callback; + this.scope = scope; + } + + @Override + public void onCompletion(final RecordMetadata metadata, final Exception exception) { + if (exception != null) { + Tags.ERROR.set(scope.span(), Boolean.TRUE); + scope.span().log(Collections.singletonMap("error.object", exception)); + } + try { + if (callback != null) { + callback.onCompletion(metadata, exception); + } + } finally { + scope.span().finish(); + scope.close(); + } + } + } +} diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TextMapExtractAdapter.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TextMapExtractAdapter.java new file mode 100644 index 0000000000..c16034c0d4 --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TextMapExtractAdapter.java @@ -0,0 +1,30 @@ +package datadog.trace.instrumentation.kafka_clients; + +import io.opentracing.propagation.TextMap; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; + +public class TextMapExtractAdapter implements TextMap { + + private final Map map = new HashMap<>(); + + public TextMapExtractAdapter(final Headers headers) { + for (final Header header : headers) { + map.put(header.key(), new String(header.value(), StandardCharsets.UTF_8)); + } + } + + @Override + public Iterator> iterator() { + return map.entrySet().iterator(); + } + + @Override + public void put(final String key, final String value) { + throw new UnsupportedOperationException("Use inject adapter instead"); + } +} diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TextMapInjectAdapter.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TextMapInjectAdapter.java new file mode 100644 index 0000000000..2107e3cbef --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TextMapInjectAdapter.java @@ -0,0 +1,26 @@ +package datadog.trace.instrumentation.kafka_clients; + +import io.opentracing.propagation.TextMap; +import java.nio.charset.StandardCharsets; +import java.util.Iterator; +import java.util.Map; +import org.apache.kafka.common.header.Headers; + +public class TextMapInjectAdapter implements TextMap { + + private final Headers headers; + + public TextMapInjectAdapter(final Headers headers) { + this.headers = headers; + } + + @Override + public Iterator> iterator() { + throw new UnsupportedOperationException("Use extract adapter instead"); + } + + @Override + public void put(final String key, final String value) { + headers.add(key, value.getBytes(StandardCharsets.UTF_8)); + } +} diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterable.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterable.java new file mode 100644 index 0000000000..e2ff77f7dd --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterable.java @@ -0,0 +1,84 @@ +package datadog.trace.instrumentation.kafka_clients; + +import io.opentracing.Scope; +import io.opentracing.Tracer; +import io.opentracing.util.GlobalTracer; +import java.util.Iterator; +import lombok.extern.slf4j.Slf4j; + +public class TracingIterable implements Iterable { + private final Iterable delegateIterable; + private final String operationName; + private final SpanBuilderDecorator decorator; + + public TracingIterable( + final Iterable delegateIterable, + final String operationName, + final SpanBuilderDecorator decorator) { + this.delegateIterable = delegateIterable; + this.operationName = operationName; + this.decorator = decorator; + } + + @Override + public Iterator iterator() { + return new TracingIterator<>(delegateIterable.iterator(), operationName, decorator); + } + + @Slf4j + public static class TracingIterator implements Iterator { + private final Iterator delegateIterator; + private final String operationName; + private final SpanBuilderDecorator decorator; + + private Scope currentScope; + + public TracingIterator( + final Iterator delegateIterator, + final String operationName, + final SpanBuilderDecorator decorator) { + this.delegateIterator = delegateIterator; + this.operationName = operationName; + this.decorator = decorator; + } + + @Override + public boolean hasNext() { + if (currentScope != null) { + currentScope.close(); + currentScope = null; + } + return delegateIterator.hasNext(); + } + + @Override + public T next() { + if (currentScope != null) { + // in case they didn't call hasNext()... + currentScope.close(); + currentScope = null; + } + + final T next = delegateIterator.next(); + + try { + if (next != null) { + final Tracer.SpanBuilder spanBuilder = GlobalTracer.get().buildSpan(operationName); + decorator.decorate(spanBuilder, next); + currentScope = spanBuilder.startActive(true); + } + } finally { + return next; + } + } + + @Override + public void remove() { + delegateIterator.remove(); + } + } + + public interface SpanBuilderDecorator { + void decorate(Tracer.SpanBuilder spanBuilder, T context); + } +} diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaTest.groovy b/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaTest.groovy new file mode 100644 index 0000000000..08cec690da --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaTest.groovy @@ -0,0 +1,138 @@ +import ch.qos.logback.classic.Level +import ch.qos.logback.classic.Logger +import datadog.trace.agent.test.AgentTestRunner +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.junit.ClassRule +import org.slf4j.LoggerFactory +import org.springframework.kafka.core.DefaultKafkaConsumerFactory +import org.springframework.kafka.core.DefaultKafkaProducerFactory +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.listener.KafkaMessageListenerContainer +import org.springframework.kafka.listener.MessageListener +import org.springframework.kafka.listener.config.ContainerProperties +import org.springframework.kafka.test.rule.KafkaEmbedded +import org.springframework.kafka.test.utils.ContainerTestUtils +import org.springframework.kafka.test.utils.KafkaTestUtils +import spock.lang.Shared + +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.TimeUnit + +class KafkaTest extends AgentTestRunner { + static final SHARED_TOPIC = "shared.topic" + + static { + ((Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME)).setLevel(Level.WARN) + ((Logger) LoggerFactory.getLogger("datadog")).setLevel(Level.DEBUG) + } + + @Shared + @ClassRule + KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SHARED_TOPIC) + + def "test kafka produce and consume"() { + setup: + def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString()) + def producerFactory = new DefaultKafkaProducerFactory(senderProps) + def kafkaTemplate = new KafkaTemplate(producerFactory) + + // set up the Kafka consumer properties + def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka) + + // create a Kafka consumer factory + def consumerFactory = new DefaultKafkaConsumerFactory(consumerProperties) + + // set the topic that needs to be consumed + ContainerProperties containerProperties = new ContainerProperties(SHARED_TOPIC) + + // create a Kafka MessageListenerContainer + def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties) + + // create a thread safe queue to store the received message + def records = new LinkedBlockingQueue>() + + // setup a Kafka message listener + WRITER_PHASER.register() + container.setupMessageListener(new MessageListener() { + @Override + void onMessage(ConsumerRecord record) { + WRITER_PHASER.arriveAndAwaitAdvance() // ensure consistent ordering of traces + records.add(record) + } + }) + + // start the container and underlying message listener + container.start() + + // wait until the container has the required number of assigned partitions + ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic()) + + when: + String greeting = "Hello Spring Kafka Sender!" + kafkaTemplate.send(SHARED_TOPIC, greeting) + + + then: + // check that the message was received + def received = records.poll(5, TimeUnit.SECONDS) + received.value() == greeting + received.key() == null + + TEST_WRITER.waitForTraces(2) + TEST_WRITER.size() == 2 + + def t1 = TEST_WRITER.get(0) + t1.size() == 1 + def t2 = TEST_WRITER.get(1) + t2.size() == 1 + + and: // PRODUCER span 0 + def t1span1 = t1[0] + + t1span1.context().operationName == "kafka.produce" + t1span1.serviceName == "kafka" + t1span1.resourceName == "Produce Topic $SHARED_TOPIC" + t1span1.type == "queue" + !t1span1.context().getErrorFlag() + t1span1.context().parentId == 0 + + def t1tags1 = t1span1.context().tags + t1tags1["component"] == "java-kafka" + t1tags1["span.kind"] == "producer" + t1tags1["span.type"] == "queue" + t1tags1["thread.name"] != null + t1tags1["thread.id"] != null + t1tags1.size() == 5 + + and: // CONSUMER span 0 + def t2span1 = t2[0] + + t2span1.context().operationName == "kafka.consume" + t2span1.serviceName == "kafka" + t2span1.resourceName == "Consume Topic $SHARED_TOPIC" + t2span1.type == "queue" + !t2span1.context().getErrorFlag() + t2span1.context().parentId == t1span1.context().spanId + + def t2tags1 = t2span1.context().tags + t2tags1["component"] == "java-kafka" + t2tags1["span.kind"] == "consumer" + t1tags1["span.type"] == "queue" + t2tags1["partition"] >= 0 + t2tags1["offset"] == 0 + t2tags1["thread.name"] != null + t2tags1["thread.id"] != null + t2tags1.size() == 7 + + def headers = received.headers() + headers.iterator().hasNext() + new String(headers.headers("x-datadog-trace-id").iterator().next().value()) == "$t1span1.traceId" + new String(headers.headers("x-datadog-parent-id").iterator().next().value()) == "$t1span1.spanId" + + + cleanup: + producerFactory.stop() + container.stop() + } + +} diff --git a/dd-java-agent/instrumentation/kafka-streams-0.11/kafka-streams-0.11.gradle b/dd-java-agent/instrumentation/kafka-streams-0.11/kafka-streams-0.11.gradle new file mode 100644 index 0000000000..2b527c1de3 --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-streams-0.11/kafka-streams-0.11.gradle @@ -0,0 +1,33 @@ +apply plugin: 'version-scan' + +versionScan { + group = "org.apache.kafka" + module = "kafka-streams" + versions = "[0.11.0.0,)" + verifyPresent = [ + 'org.apache.kafka.streams.state.internals.CacheFunction' : null, + 'org.apache.kafka.streams.state.internals.InMemoryKeyValueStore': null, + ] +} + +apply from: "${rootDir}/gradle/java.gradle" + +dependencies { + compileOnly group: 'org.apache.kafka', name: 'kafka-streams', version: '0.11.0.0' + + compile project(':dd-trace-ot') + compile project(':dd-java-agent:tooling') + + compile deps.bytebuddy + compile deps.opentracing + + testCompile project(':dd-java-agent:testing') + // Include kafka-clients instrumentation for tests. + testCompile project(':dd-java-agent:instrumentation:kafka-clients-0.11') + + testCompile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.11.0.0' + testCompile group: 'org.apache.kafka', name: 'kafka-streams', version: '0.11.0.0' + testCompile group: 'org.springframework.kafka', name: 'spring-kafka', version: '1.3.3.RELEASE' + testCompile group: 'org.springframework.kafka', name: 'spring-kafka-test', version: '1.3.3.RELEASE' + testCompile group: 'javax.xml.bind', name: 'jaxb-api', version: '2.3.0' +} diff --git a/dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/KafkaStreamsProcessorInstrumentation.java b/dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/KafkaStreamsProcessorInstrumentation.java new file mode 100644 index 0000000000..e30f81ef26 --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/KafkaStreamsProcessorInstrumentation.java @@ -0,0 +1,126 @@ +package datadog.trace.instrumentation.kafka_streams; + +import static datadog.trace.agent.tooling.ClassLoaderMatcher.classLoaderHasClasses; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.isPackagePrivate; +import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.returns; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.DDAdvice; +import datadog.trace.agent.tooling.HelperInjector; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.api.DDSpanTypes; +import datadog.trace.api.DDTags; +import io.opentracing.Scope; +import io.opentracing.Span; +import io.opentracing.SpanContext; +import io.opentracing.propagation.Format; +import io.opentracing.tag.Tags; +import io.opentracing.util.GlobalTracer; +import java.util.Collections; +import net.bytebuddy.agent.builder.AgentBuilder; +import net.bytebuddy.asm.Advice; +import org.apache.kafka.streams.processor.internals.StampedRecord; + +public class KafkaStreamsProcessorInstrumentation { + // These two instrumentations work together to instrument StreamTask.process. + // The combination of these are needed because there's not a good instrumentation point. + + public static final HelperInjector HELPER_INJECTOR = + new HelperInjector("datadog.trace.instrumentation.kafka_streams.TextMapExtractAdapter"); + + private static final String OPERATION = "kafka.consume"; + private static final String COMPONENT_NAME = "java-kafka"; + + @AutoService(Instrumenter.class) + public static class StartInstrumentation implements Instrumenter { + + @Override + public AgentBuilder instrument(final AgentBuilder agentBuilder) { + return agentBuilder + .type( + named("org.apache.kafka.streams.processor.internals.PartitionGroup"), + classLoaderHasClasses("org.apache.kafka.streams.state.internals.KeyValueIterators")) + .transform(HELPER_INJECTOR) + .transform( + DDAdvice.create() + .advice( + isMethod() + .and(isPackagePrivate()) + .and(named("nextRecord")) + .and( + returns( + named( + "org.apache.kafka.streams.processor.internals.StampedRecord"))), + StartSpanAdvice.class.getName())) + .asDecorator(); + } + + public static class StartSpanAdvice { + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void startSpan(@Advice.Return final StampedRecord record) { + if (record == null) { + return; + } + + final SpanContext extractedContext = + GlobalTracer.get() + .extract( + Format.Builtin.TEXT_MAP, new TextMapExtractAdapter(record.value.headers())); + + GlobalTracer.get() + .buildSpan(OPERATION) + .asChildOf(extractedContext) + .withTag(DDTags.SERVICE_NAME, "kafka") + .withTag(DDTags.RESOURCE_NAME, "Consume Topic " + record.topic()) + .withTag(DDTags.SPAN_TYPE, DDSpanTypes.MESSAGE_CONSUMER) + .withTag(Tags.COMPONENT.getKey(), COMPONENT_NAME) + .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CONSUMER) + .withTag("partition", record.partition()) + .withTag("offset", record.offset()) + .startActive(true); + } + } + } + + @AutoService(Instrumenter.class) + public static class StopInstrumentation implements Instrumenter { + + @Override + public AgentBuilder instrument(final AgentBuilder agentBuilder) { + return agentBuilder + .type( + named("org.apache.kafka.streams.processor.internals.StreamTask"), + classLoaderHasClasses( + "org.apache.kafka.common.header.Header", + "org.apache.kafka.common.header.Headers")) + .transform(HELPER_INJECTOR) + .transform( + DDAdvice.create() + .advice( + isMethod().and(isPublic()).and(named("process")).and(takesArguments(0)), + StartSpanAdvice.class.getName())) + .asDecorator(); + } + + public static class StartSpanAdvice { + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void stopSpan(@Advice.Thrown final Throwable throwable) { + final Scope scope = GlobalTracer.get().scopeManager().active(); + if (scope != null) { + if (throwable != null) { + final Span span = scope.span(); + Tags.ERROR.set(span, Boolean.TRUE); + span.log(Collections.singletonMap("error.object", throwable)); + } + scope.close(); + } + } + } + } +} diff --git a/dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/KafkaStreamsSourceNodeRecordDeserializerInstrumentation.java b/dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/KafkaStreamsSourceNodeRecordDeserializerInstrumentation.java new file mode 100644 index 0000000000..4e64ddb6b5 --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/KafkaStreamsSourceNodeRecordDeserializerInstrumentation.java @@ -0,0 +1,66 @@ +package datadog.trace.instrumentation.kafka_streams; + +import static datadog.trace.agent.tooling.ClassLoaderMatcher.classLoaderHasClasses; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.returns; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.DDAdvice; +import datadog.trace.agent.tooling.Instrumenter; +import net.bytebuddy.agent.builder.AgentBuilder; +import net.bytebuddy.asm.Advice; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.record.TimestampType; + +// This is necessary because SourceNodeRecordDeserializer drops the headers. :-( +public class KafkaStreamsSourceNodeRecordDeserializerInstrumentation { + + @AutoService(Instrumenter.class) + public static class StartInstrumentation implements Instrumenter { + + @Override + public AgentBuilder instrument(final AgentBuilder agentBuilder) { + return agentBuilder + .type( + named("org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer"), + classLoaderHasClasses("org.apache.kafka.streams.state.internals.KeyValueIterators")) + .transform( + DDAdvice.create() + .advice( + isMethod() + .and(isPublic()) + .and(named("deserialize")) + .and( + takesArgument( + 0, named("org.apache.kafka.clients.consumer.ConsumerRecord"))) + .and(returns(named("org.apache.kafka.clients.consumer.ConsumerRecord"))), + SaveHeadersAdvice.class.getName())) + .asDecorator(); + } + + public static class SaveHeadersAdvice { + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void saveHeaders( + @Advice.Argument(0) final ConsumerRecord incoming, + @Advice.Return(readOnly = false) ConsumerRecord result) { + result = + new ConsumerRecord<>( + result.topic(), + result.partition(), + result.offset(), + result.timestamp(), + TimestampType.CREATE_TIME, + result.checksum(), + result.serializedKeySize(), + result.serializedValueSize(), + result.key(), + result.value(), + incoming.headers()); + } + } + } +} diff --git a/dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/TextMapExtractAdapter.java b/dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/TextMapExtractAdapter.java new file mode 100644 index 0000000000..7c5db64ccf --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/TextMapExtractAdapter.java @@ -0,0 +1,30 @@ +package datadog.trace.instrumentation.kafka_streams; + +import io.opentracing.propagation.TextMap; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; + +public class TextMapExtractAdapter implements TextMap { + + private final Map map = new HashMap<>(); + + public TextMapExtractAdapter(final Headers headers) { + for (final Header header : headers) { + map.put(header.key(), new String(header.value(), StandardCharsets.UTF_8)); + } + } + + @Override + public Iterator> iterator() { + return map.entrySet().iterator(); + } + + @Override + public void put(final String key, final String value) { + throw new UnsupportedOperationException("Use inject adapter instead"); + } +} diff --git a/dd-java-agent/instrumentation/kafka-streams-0.11/src/test/groovy/KafkaStreamsTest.groovy b/dd-java-agent/instrumentation/kafka-streams-0.11/src/test/groovy/KafkaStreamsTest.groovy new file mode 100644 index 0000000000..002868d811 --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-streams-0.11/src/test/groovy/KafkaStreamsTest.groovy @@ -0,0 +1,204 @@ +import ch.qos.logback.classic.Level +import ch.qos.logback.classic.Logger +import datadog.trace.agent.test.AgentTestRunner +import io.opentracing.util.GlobalTracer +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.serialization.Serdes +import org.apache.kafka.streams.KafkaStreams +import org.apache.kafka.streams.StreamsConfig +import org.apache.kafka.streams.kstream.KStream +import org.apache.kafka.streams.kstream.KStreamBuilder +import org.apache.kafka.streams.kstream.ValueMapper +import org.junit.ClassRule +import org.slf4j.LoggerFactory +import org.springframework.kafka.core.DefaultKafkaConsumerFactory +import org.springframework.kafka.core.DefaultKafkaProducerFactory +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.listener.KafkaMessageListenerContainer +import org.springframework.kafka.listener.MessageListener +import org.springframework.kafka.listener.config.ContainerProperties +import org.springframework.kafka.test.rule.KafkaEmbedded +import org.springframework.kafka.test.utils.ContainerTestUtils +import org.springframework.kafka.test.utils.KafkaTestUtils +import spock.lang.Shared + +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.TimeUnit + +class KafkaStreamsTest extends AgentTestRunner { + static final STREAM_PENDING = "test.pending" + static final STREAM_PROCESSED = "test.processed" + + static { + ((Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME)).setLevel(Level.WARN) + ((Logger) LoggerFactory.getLogger("datadog")).setLevel(Level.DEBUG) + } + + @Shared + @ClassRule + KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, STREAM_PENDING, STREAM_PROCESSED) + + def "test kafka produce and consume with streams in-between"() { + setup: + def config = new Properties() + def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString()) + config.putAll(senderProps) + config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application") + config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()) + config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()) + + // CONFIGURE CONSUMER + def consumerFactory = new DefaultKafkaConsumerFactory(KafkaTestUtils.consumerProps("sender", "false", embeddedKafka)) + def consumerContainer = new KafkaMessageListenerContainer<>(consumerFactory, new ContainerProperties(STREAM_PROCESSED)) + + // create a thread safe queue to store the processed message + WRITER_PHASER.register() + def records = new LinkedBlockingQueue>() + + // setup a Kafka message listener + consumerContainer.setupMessageListener(new MessageListener() { + @Override + void onMessage(ConsumerRecord record) { + WRITER_PHASER.arriveAndAwaitAdvance() // ensure consistent ordering of traces + GlobalTracer.get().activeSpan().setTag("testing", 123) + records.add(record) + } + }) + + // start the container and underlying message listener + consumerContainer.start() + + // wait until the container has the required number of assigned partitions + ContainerTestUtils.waitForAssignment(consumerContainer, embeddedKafka.getPartitionsPerTopic()) + + // CONFIGURE PROCESSOR + final KStreamBuilder builder = new KStreamBuilder() + KStream textLines = builder.stream(STREAM_PENDING) + textLines + .mapValues(new ValueMapper() { + @Override + String apply(String textLine) { + WRITER_PHASER.arriveAndAwaitAdvance() // ensure consistent ordering of traces + GlobalTracer.get().activeSpan().setTag("asdf", "testing") + return textLine.toLowerCase() + } + }) + .to(Serdes.String(), Serdes.String(), STREAM_PROCESSED) + KafkaStreams streams = new KafkaStreams(builder, config) + streams.start() + + // CONFIGURE PRODUCER + def producerFactory = new DefaultKafkaProducerFactory(senderProps) + def kafkaTemplate = new KafkaTemplate(producerFactory) + + when: + String greeting = "TESTING TESTING 123!" + kafkaTemplate.send(STREAM_PENDING, greeting) + + + then: + // check that the message was received + def received = records.poll(10, TimeUnit.SECONDS) + received.value() == greeting.toLowerCase() + received.key() == null + + TEST_WRITER.waitForTraces(3) + TEST_WRITER.size() == 3 + + def t1 = TEST_WRITER.get(0) + t1.size() == 1 + def t2 = TEST_WRITER.get(1) + t2.size() == 2 + def t3 = TEST_WRITER.get(2) + t3.size() == 1 + + and: // PRODUCER span 0 + def t1span1 = t1[0] + + t1span1.context().operationName == "kafka.produce" + t1span1.serviceName == "kafka" + t1span1.resourceName == "Produce Topic $STREAM_PENDING" + t1span1.type == "queue" + !t1span1.context().getErrorFlag() + t1span1.context().parentId == 0 + + def t1tags1 = t1span1.context().tags + t1tags1["component"] == "java-kafka" + t1tags1["span.kind"] == "producer" + t1tags1["span.type"] == "queue" + t1tags1["thread.name"] != null + t1tags1["thread.id"] != null + t1tags1.size() == 5 + + and: // STREAMING span 0 + def t2span1 = t2[0] + + t2span1.context().operationName == "kafka.consume" + t2span1.serviceName == "kafka" + t2span1.resourceName == "Consume Topic $STREAM_PENDING" + t2span1.type == "queue" + !t2span1.context().getErrorFlag() + t2span1.context().parentId == t1span1.context().spanId + + def t2tags1 = t2span1.context().tags + t2tags1["component"] == "java-kafka" + t2tags1["span.kind"] == "consumer" + t1tags1["span.type"] == "queue" + t2tags1["partition"] >= 0 + t2tags1["offset"] == 0 + t2tags1["thread.name"] != null + t2tags1["thread.id"] != null + t2tags1["asdf"] == "testing" + t2tags1.size() == 8 + + and: // STREAMING span 1 + def t2span2 = t2[1] + + t2span2.context().operationName == "kafka.produce" + t2span2.serviceName == "kafka" + t2span2.resourceName == "Produce Topic $STREAM_PROCESSED" + t2span2.type == "queue" + !t2span2.context().getErrorFlag() + t2span2.context().parentId == t2span1.context().spanId + + def t2tags2 = t2span2.context().tags + t2tags2["component"] == "java-kafka" + t2tags2["span.kind"] == "producer" + t2tags2["span.type"] == "queue" + t2tags2["thread.name"] != null + t2tags2["thread.id"] != null + t2tags2.size() == 5 + + and: // CONSUMER span 0 + def t3span1 = t3[0] + + t3span1.context().operationName == "kafka.consume" + t3span1.serviceName == "kafka" + t3span1.resourceName == "Consume Topic $STREAM_PROCESSED" + t3span1.type == "queue" + !t3span1.context().getErrorFlag() + t3span1.context().parentId == t2span2.context().spanId + + def t3tags1 = t3span1.context().tags + t3tags1["component"] == "java-kafka" + t3tags1["span.kind"] == "consumer" + t2tags2["span.type"] == "queue" + t3tags1["partition"] >= 0 + t3tags1["offset"] == 0 + t3tags1["thread.name"] != null + t3tags1["thread.id"] != null + t3tags1["testing"] == 123 + t3tags1.size() == 8 + + def headers = received.headers() + headers.iterator().hasNext() + new String(headers.headers("x-datadog-trace-id").iterator().next().value()) == "$t2span2.traceId" + new String(headers.headers("x-datadog-parent-id").iterator().next().value()) == "$t2span2.spanId" + + + cleanup: + producerFactory?.stop() + streams?.close() + consumerContainer?.stop() + } +} diff --git a/dd-java-agent/testing/src/main/java/datadog/trace/agent/test/AgentTestRunner.java b/dd-java-agent/testing/src/main/java/datadog/trace/agent/test/AgentTestRunner.java index e30e8e6080..f765fab94d 100644 --- a/dd-java-agent/testing/src/main/java/datadog/trace/agent/test/AgentTestRunner.java +++ b/dd-java-agent/testing/src/main/java/datadog/trace/agent/test/AgentTestRunner.java @@ -1,5 +1,6 @@ package datadog.trace.agent.test; +import datadog.opentracing.DDSpan; import datadog.opentracing.DDTracer; import datadog.opentracing.decorators.AbstractDecorator; import datadog.opentracing.decorators.DDDecoratorsFactory; @@ -10,6 +11,7 @@ import io.opentracing.Tracer; import java.lang.instrument.ClassFileTransformer; import java.lang.instrument.Instrumentation; import java.util.List; +import java.util.concurrent.Phaser; import net.bytebuddy.agent.ByteBuddyAgent; import org.junit.AfterClass; import org.junit.Before; @@ -45,8 +47,19 @@ public abstract class AgentTestRunner extends Specification { private static final Instrumentation instrumentation; private static ClassFileTransformer activeTransformer = null; + protected static final Phaser WRITER_PHASER = new Phaser(); + static { - TEST_WRITER = new ListWriter(); + WRITER_PHASER.register(); + TEST_WRITER = + new ListWriter() { + @Override + public boolean add(final List trace) { + final boolean result = super.add(trace); + WRITER_PHASER.arrive(); + return result; + } + }; TEST_TRACER = new DDTracer(TEST_WRITER); final List decorators = DDDecoratorsFactory.createBuiltinDecorators(); diff --git a/dd-java-agent/tooling/src/main/java/datadog/trace/agent/tooling/HelperInjector.java b/dd-java-agent/tooling/src/main/java/datadog/trace/agent/tooling/HelperInjector.java index d3fdd94a45..1a1cc9f5c8 100644 --- a/dd-java-agent/tooling/src/main/java/datadog/trace/agent/tooling/HelperInjector.java +++ b/dd-java-agent/tooling/src/main/java/datadog/trace/agent/tooling/HelperInjector.java @@ -85,7 +85,12 @@ public class HelperInjector implements Transformer { } } } catch (final Exception e) { - log.error("Failed to inject helper classes into " + classLoader, e); + log.error( + "Error preparing helpers for " + + typeDescription + + ". Failed to inject helper classes into " + + classLoader, + e); throw new RuntimeException(e); } injectedClassLoaders.add(classLoader); diff --git a/settings.gradle b/settings.gradle index c54b760d5a..1184c556cf 100644 --- a/settings.gradle +++ b/settings.gradle @@ -14,6 +14,8 @@ include ':dd-java-agent:instrumentation:datastax-cassandra-3.2' include ':dd-java-agent:instrumentation:jdbc' include ':dd-java-agent:instrumentation:jms-1' include ':dd-java-agent:instrumentation:jms-2' +include ':dd-java-agent:instrumentation:kafka-clients-0.11' +include ':dd-java-agent:instrumentation:kafka-streams-0.11' include ':dd-java-agent:instrumentation:mongo-3.1' include ':dd-java-agent:instrumentation:mongo-async-3.3' include ':dd-java-agent:instrumentation:okhttp-3'