Merge pull request #911 from DataDog/mar-kolya/kafka-trace-records

Instrument records(TopicPartition) in kafka consumer
This commit is contained in:
Nikolay Martynov 2019-07-12 18:43:45 -04:00 committed by GitHub
commit 65c6b1fa97
7 changed files with 283 additions and 90 deletions
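
At a glance, the gap being closed: `ConsumerRecords.records(String topic)` was already instrumented, while the `records(TopicPartition)` overload returned an untraced `List`. A minimal sketch of the newly covered call path, assuming a Kafka 2.x client (`poll(Duration)` replaced `poll(long)` in 2.0) and illustrative topic, partition, and config values:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RecordsByPartitionSketch {
  public static void main(final String[] args) {
    final Properties props = new Properties(); // bootstrap.servers, deserializers, etc. elided
    final TopicPartition partition = new TopicPartition("my-topic", 0); // illustrative

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.assign(Collections.singletonList(partition));
      final ConsumerRecords<String, String> polled = consumer.poll(Duration.ofMillis(500));

      // With this change the returned List is a TracingList, so iterating it
      // opens one kafka.consume span per record, parented via the record headers.
      final List<ConsumerRecord<String, String>> records = polled.records(partition);
      for (final ConsumerRecord<String, String> record : records) {
        // application processing runs inside the record's active span scope
      }
    }
  }
}
```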

View File: KafkaConsumerInstrumentation.java

@@ -12,6 +12,7 @@ import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
@@ -41,7 +42,8 @@ public final class KafkaConsumerInstrumentation extends Instrumenter.Default {
packageName + ".KafkaDecorator$2",
packageName + ".TextMapExtractAdapter",
packageName + ".TracingIterable",
packageName + ".TracingIterable$TracingIterator",
packageName + ".TracingIterator",
packageName + ".TracingList",
};
}
@@ -55,6 +57,13 @@ public final class KafkaConsumerInstrumentation extends Instrumenter.Default {
.and(takesArgument(0, String.class))
.and(returns(Iterable.class)),
IterableAdvice.class.getName());
transformers.put(
isMethod()
.and(isPublic())
.and(named("records"))
.and(takesArgument(0, named("org.apache.kafka.common.TopicPartition")))
.and(returns(List.class)),
ListAdvice.class.getName());
transformers.put(
isMethod()
.and(isPublic())
@@ -75,13 +84,22 @@ public final class KafkaConsumerInstrumentation extends Instrumenter.Default {
}
}
public static class ListAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void wrap(@Advice.Return(readOnly = false) List<ConsumerRecord> list) {
if (list != null) {
list = new TracingList(list, "kafka.consume", CONSUMER_DECORATE);
}
}
}
public static class IteratorAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void wrap(@Advice.Return(readOnly = false) Iterator<ConsumerRecord> iterator) {
if (iterator != null) {
iterator =
new TracingIterable.TracingIterator(iterator, "kafka.consume", CONSUMER_DECORATE);
iterator = new TracingIterator(iterator, "kafka.consume", CONSUMER_DECORATE);
}
}
}
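
Both advice classes lean on the same Byte Buddy mechanism: a parameter annotated `@Advice.Return(readOnly = false)` on an `@Advice.OnMethodExit` method can be reassigned, and Byte Buddy rewrites the instrumented method so the new value is what callers receive. A standalone sketch of that pattern (the class and the unmodifiable-list wrapping are hypothetical, not part of this PR):

```java
import java.util.Collections;
import java.util.List;
import net.bytebuddy.asm.Advice;

public class WrapReturnAdvice {
  // suppress = Throwable.class keeps a failure in the advice from
  // breaking the instrumented application method.
  @Advice.OnMethodExit(suppress = Throwable.class)
  public static void wrap(@Advice.Return(readOnly = false) List<String> result) {
    if (result != null) {
      // Reassigning the annotated parameter replaces the method's return value.
      result = Collections.unmodifiableList(result);
    }
  }
}
```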

View File: TracingIterable.java

@@ -1,91 +1,24 @@
package datadog.trace.instrumentation.kafka_clients;
import io.opentracing.Scope;
import io.opentracing.SpanContext;
import io.opentracing.propagation.Format;
import io.opentracing.util.GlobalTracer;
import java.util.Iterator;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class TracingIterable implements Iterable<ConsumerRecord> {
private final Iterable<ConsumerRecord> delegateIterable;
private final Iterable<ConsumerRecord> delegate;
private final String operationName;
private final KafkaDecorator decorator;
public TracingIterable(
final Iterable<ConsumerRecord> delegateIterable,
final Iterable<ConsumerRecord> delegate,
final String operationName,
final KafkaDecorator decorator) {
this.delegateIterable = delegateIterable;
this.delegate = delegate;
this.operationName = operationName;
this.decorator = decorator;
}
@Override
public Iterator<ConsumerRecord> iterator() {
return new TracingIterator(delegateIterable.iterator(), operationName, decorator);
}
@Slf4j
public static class TracingIterator implements Iterator<ConsumerRecord> {
private final Iterator<ConsumerRecord> delegateIterator;
private final String operationName;
private final KafkaDecorator decorator;
/**
* Note: this may potentially create problems if this iterator is used from different threads.
* But at the moment we cannot do much about this.
*/
private Scope currentScope;
public TracingIterator(
final Iterator<ConsumerRecord> delegateIterator,
final String operationName,
final KafkaDecorator decorator) {
this.delegateIterator = delegateIterator;
this.operationName = operationName;
this.decorator = decorator;
}
@Override
public boolean hasNext() {
if (currentScope != null) {
currentScope.close();
currentScope = null;
}
return delegateIterator.hasNext();
}
@Override
public ConsumerRecord next() {
if (currentScope != null) {
// in case they didn't call hasNext()...
currentScope.close();
currentScope = null;
}
final ConsumerRecord next = delegateIterator.next();
try {
if (next != null) {
final SpanContext spanContext =
GlobalTracer.get()
.extract(Format.Builtin.TEXT_MAP, new TextMapExtractAdapter(next.headers()));
currentScope =
GlobalTracer.get().buildSpan(operationName).asChildOf(spanContext).startActive(true);
decorator.afterStart(currentScope);
decorator.onConsume(currentScope, next);
}
} catch (final Exception e) {
log.debug("Error during decoration", e);
}
return next;
}
@Override
public void remove() {
delegateIterator.remove();
}
return new TracingIterator(delegate.iterator(), operationName, decorator);
}
}

View File: TracingIterator.java

@@ -0,0 +1,71 @@
package datadog.trace.instrumentation.kafka_clients;
import io.opentracing.Scope;
import io.opentracing.SpanContext;
import io.opentracing.propagation.Format;
import io.opentracing.util.GlobalTracer;
import java.util.Iterator;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@Slf4j
public class TracingIterator implements Iterator<ConsumerRecord> {
private final Iterator<ConsumerRecord> delegateIterator;
private final String operationName;
private final KafkaDecorator decorator;
/**
* Note: this may create problems if the iterator is used from multiple threads, but at the
* moment we cannot do much about that.
*/
private Scope currentScope;
public TracingIterator(
final Iterator<ConsumerRecord> delegateIterator,
final String operationName,
final KafkaDecorator decorator) {
this.delegateIterator = delegateIterator;
this.operationName = operationName;
this.decorator = decorator;
}
@Override
public boolean hasNext() {
if (currentScope != null) {
currentScope.close();
currentScope = null;
}
return delegateIterator.hasNext();
}
@Override
public ConsumerRecord next() {
if (currentScope != null) {
// in case they didn't call hasNext()...
currentScope.close();
currentScope = null;
}
final ConsumerRecord next = delegateIterator.next();
try {
if (next != null) {
final SpanContext spanContext =
GlobalTracer.get()
.extract(Format.Builtin.TEXT_MAP, new TextMapExtractAdapter(next.headers()));
currentScope =
GlobalTracer.get().buildSpan(operationName).asChildOf(spanContext).startActive(true);
decorator.afterStart(currentScope);
decorator.onConsume(currentScope, next);
}
} catch (final Exception e) {
log.debug("Error during decoration", e);
}
return next;
}
@Override
public void remove() {
delegateIterator.remove();
}
}
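
The span-per-record lifecycle above hinges on OpenTracing 0.31 scope management: `next()` extracts the upstream context from the record headers and activates a scope, and the following `hasNext()`/`next()` call closes it, so each record's processing runs inside its own `kafka.consume` span. A condensed sketch of the extract-and-activate step, using the standard `io.opentracing.propagation.TextMapExtractAdapter` over a plain map in place of the PR's vendored adapter over Kafka record headers; header values are illustrative:

```java
import io.opentracing.Scope;
import io.opentracing.SpanContext;
import io.opentracing.propagation.Format;
import io.opentracing.propagation.TextMapExtractAdapter; // OpenTracing 0.31 helper over a Map
import io.opentracing.util.GlobalTracer;
import java.util.HashMap;
import java.util.Map;

public class ExtractSketch {
  public static void main(final String[] args) {
    final Map<String, String> headers = new HashMap<>();
    headers.put("x-datadog-trace-id", "123");  // illustrative values
    headers.put("x-datadog-parent-id", "456");

    final SpanContext parent =
        GlobalTracer.get().extract(Format.Builtin.TEXT_MAP, new TextMapExtractAdapter(headers));
    // startActive(true) finishes the span when the scope is closed,
    // which is exactly what hasNext()/next() do for the previous record.
    try (Scope scope =
        GlobalTracer.get().buildSpan("kafka.consume").asChildOf(parent).startActive(true)) {
      // record processing happens here, inside the active span
    }
  }
}
```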

View File: TracingList.java

@@ -0,0 +1,142 @@
package datadog.trace.instrumentation.kafka_clients;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class TracingList implements List<ConsumerRecord> {
private final List<ConsumerRecord> delegate;
private final String operationName;
private final KafkaDecorator decorator;
public TracingList(
final List<ConsumerRecord> delegate,
final String operationName,
final KafkaDecorator decorator) {
this.delegate = delegate;
this.operationName = operationName;
this.decorator = decorator;
}
@Override
public int size() {
return delegate.size();
}
@Override
public boolean isEmpty() {
return delegate.isEmpty();
}
@Override
public boolean contains(final Object o) {
return delegate.contains(o);
}
@Override
public Iterator<ConsumerRecord> iterator() {
return new TracingIterator(delegate.iterator(), operationName, decorator);
}
@Override
public Object[] toArray() {
return delegate.toArray();
}
@Override
public <T> T[] toArray(final T[] a) {
return delegate.toArray(a);
}
@Override
public boolean add(final ConsumerRecord consumerRecord) {
return delegate.add(consumerRecord);
}
@Override
public boolean remove(final Object o) {
return delegate.remove(o);
}
@Override
public boolean containsAll(final Collection<?> c) {
return delegate.containsAll(c);
}
@Override
public boolean addAll(final Collection<? extends ConsumerRecord> c) {
return delegate.addAll(c);
}
@Override
public boolean addAll(final int index, final Collection<? extends ConsumerRecord> c) {
return delegate.addAll(index, c);
}
@Override
public boolean removeAll(final Collection<?> c) {
return delegate.removeAll(c);
}
@Override
public boolean retainAll(final Collection<?> c) {
return delegate.retainAll(c);
}
@Override
public void clear() {
delegate.clear();
}
@Override
public ConsumerRecord get(final int index) {
// TODO: should this be instrumented as well?
return delegate.get(index);
}
@Override
public ConsumerRecord set(final int index, final ConsumerRecord element) {
return delegate.set(index, element);
}
@Override
public void add(final int index, final ConsumerRecord element) {
delegate.add(index, element);
}
@Override
public ConsumerRecord remove(final int index) {
return delegate.remove(index);
}
@Override
public int indexOf(final Object o) {
return delegate.indexOf(o);
}
@Override
public int lastIndexOf(final Object o) {
return delegate.lastIndexOf(o);
}
@Override
public ListIterator<ConsumerRecord> listIterator() {
// TODO: the ListIterator API does not lend itself well to instrumentation in the context
// of a Kafka consumer, so we skip it for now
return delegate.listIterator();
}
@Override
public ListIterator<ConsumerRecord> listIterator(final int index) {
// TODO: the ListIterator API does not lend itself well to instrumentation in the context
// of a Kafka consumer, so we skip it for now
return delegate.listIterator(index);
}
@Override
public List<ConsumerRecord> subList(final int fromIndex, final int toIndex) {
return new TracingList(delegate.subList(fromIndex, toIndex), operationName, decorator);
}
}
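
TracingList is a plain forwarding wrapper: the read paths that yield records (`iterator()`, `subList(...)`) re-wrap so tracing survives composition, while everything else delegates untouched. A hypothetical usage sketch; it assumes `CONSUMER_DECORATE` is reachable as a static field on `KafkaDecorator`, as the static import in the instrumentation suggests, and uses an empty delegate to stay self-contained:

```java
import datadog.trace.instrumentation.kafka_clients.KafkaDecorator;
import datadog.trace.instrumentation.kafka_clients.TracingList;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class TracingListSketch {
  public static void main(final String[] args) {
    // In real use the delegate comes from ConsumerRecords.records(TopicPartition).
    final List<ConsumerRecord> rawRecords = new ArrayList<>();
    final List<ConsumerRecord> traced =
        new TracingList(rawRecords, "kafka.consume", KafkaDecorator.CONSUMER_DECORATE);

    // Direct iteration and iteration over any subList both go through
    // TracingIterator, emitting one kafka.consume span per record.
    for (final ConsumerRecord record : traced.subList(0, traced.size())) {
      System.out.println(record.offset());
    }
  }
}
```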

View File: KafkaStreamsProcessorInstrumentation.java

@@ -24,7 +24,11 @@ import org.apache.kafka.streams.processor.internals.StampedRecord;
public class KafkaStreamsProcessorInstrumentation {
// These two instrumentations work together to apply StreamTask.process.
// The combination of these are needed because there's not a good instrumentation point.
// The combination of these is needed because there's no good instrumentation point.
// FIXME: this instrumentation takes a somewhat strange approach. Kafka Streams
// defines notions of 'processor', 'source' and 'sink'; there is no 'consumer' as such.
// Also, this instrumentation doesn't define a 'producer', making it 'asymmetric' and
// resulting in awkward tests and traces. We may want to revisit this in the future.
@AutoService(Instrumenter.class)
public static class StartInstrumentation extends Instrumenter.Default {
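
For readers new to Kafka Streams, a minimal topology illustrating the 'source'/'processor'/'sink' notions the FIXME above refers to (topic names illustrative, using the `StreamsBuilder` API from kafka-streams 1.0+):

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;

public class TopologySketch {
  public static Topology build() {
    final StreamsBuilder builder = new StreamsBuilder();
    builder.<String, String>stream("input-topic") // source: consumes from a topic
        .mapValues(v -> v.toLowerCase())          // processor: transforms records
        .to("output-topic");                      // sink: produces to a topic
    return builder.build();
  }
}
```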

View File: KafkaStreamsTest.groovy

@@ -57,7 +57,7 @@ class KafkaStreamsTest extends AgentTestRunner {
void onMessage(ConsumerRecord<String, String> record) {
// ensure consistent ordering of traces
// this is the last processing step so we should see 3 traces here
TEST_WRITER.waitForTraces(2)
TEST_WRITER.waitForTraces(3)
getTestTracer().activeSpan().setTag("testing", 123)
records.add(record)
}
@@ -80,13 +80,13 @@ class KafkaStreamsTest extends AgentTestRunner {
KStream<String, String> textLines = builder.stream(STREAM_PENDING)
def values = textLines
.mapValues(new ValueMapper<String, String>() {
@Override
String apply(String textLine) {
TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces
getTestTracer().activeSpan().setTag("asdf", "testing")
return textLine.toLowerCase()
}
})
@Override
String apply(String textLine) {
TEST_WRITER.waitForTraces(2) // ensure consistent ordering of traces
getTestTracer().activeSpan().setTag("asdf", "testing")
return textLine.toLowerCase()
}
})
KafkaStreams streams
try {
@@ -115,7 +115,14 @@ class KafkaStreamsTest extends AgentTestRunner {
received.value() == greeting.toLowerCase()
received.key() == null
assertTraces(3) {
if (TEST_WRITER[1][0].operationName == "kafka.produce") {
// Make sure the order of the first two traces is deterministic.
// Unfortunately there seems to be no better way to control this ordering from the code
def tmp = TEST_WRITER[1][0]
TEST_WRITER[1][0] = TEST_WRITER[0][0]
TEST_WRITER[0][0] = tmp
}
assertTraces(4) {
trace(0, 1) {
// PRODUCER span 0
span(0) {
@@ -132,7 +139,25 @@
}
}
}
trace(1, 2) {
trace(1, 1) {
// CONSUMER span 0
span(0) {
serviceName "kafka"
operationName "kafka.consume"
resourceName "Consume Topic $STREAM_PENDING"
spanType "queue"
errored false
childOf TEST_WRITER[0][0]
tags {
"component" "java-kafka"
"span.kind" "consumer"
"partition" { it >= 0 }
"offset" 0
defaultTags(true)
}
}
}
trace(2, 2) {
// STREAMING span 0
span(0) {
@@ -169,7 +194,7 @@
}
}
}
trace(2, 1) {
trace(3, 1) {
// CONSUMER span 0
span(0) {
serviceName "kafka"
@@ -177,7 +202,7 @@
resourceName "Consume Topic $STREAM_PROCESSED"
spanType "queue"
errored false
childOf TEST_WRITER[1][0]
childOf TEST_WRITER[2][0]
tags {
"component" "java-kafka"
"span.kind" "consumer"
@@ -192,8 +217,8 @@
def headers = received.headers()
headers.iterator().hasNext()
new String(headers.headers("x-datadog-trace-id").iterator().next().value()) == "${TEST_WRITER[1][0].traceId}"
new String(headers.headers("x-datadog-parent-id").iterator().next().value()) == "${TEST_WRITER[1][0].spanId}"
new String(headers.headers("x-datadog-trace-id").iterator().next().value()) == "${TEST_WRITER[2][0].traceId}"
new String(headers.headers("x-datadog-parent-id").iterator().next().value()) == "${TEST_WRITER[2][0].spanId}"
cleanup:

View File: DDTracer.java

@@ -441,7 +441,7 @@ public class DDTracer implements io.opentracing.Tracer, Closeable, datadog.trace
@Deprecated
private static Map<String, String> customRuntimeTags(
final String runtimeId, Map<String, String> applicationRootSpanTags) {
final String runtimeId, final Map<String, String> applicationRootSpanTags) {
final Map<String, String> runtimeTags = new HashMap<>(applicationRootSpanTags);
runtimeTags.put(Config.RUNTIME_ID_TAG, runtimeId);
return Collections.unmodifiableMap(runtimeTags);