Merge pull request #747 from DataDog/tyler/kafka-decorator

Migrate kafka instrumentation to Decorator.
Commit 246bdfe081 by Tyler Benson, 2019-03-01 08:26:13 -08:00 (committed by GitHub)
6 changed files with 204 additions and 129 deletions
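
Note on the shape of the change: the hand-rolled OpenTracing tagging inside each advice class is replaced with shared decorator instances (KafkaDecorator, KafkaStreamsDecorator) built on datadog.trace.agent.decorator.ClientDecorator, and every call site now follows the same lifecycle: afterStart, then onProduce/onConsume, then onError and beforeFinish before the scope is closed. A minimal sketch of that lifecycle on the producer side, using the names exactly as they appear in the diffs below; the wrapper class and the send body are illustrative, not part of the PR:

// Illustrative sketch only. PRODUCER_DECORATE and its methods come from the diffs below;
// the class, method, and error handling around them are made up for the example.
import static datadog.trace.instrumentation.kafka_clients.KafkaDecorator.PRODUCER_DECORATE;

import io.opentracing.Scope;
import io.opentracing.util.GlobalTracer;
import org.apache.kafka.clients.producer.ProducerRecord;

class DecoratorLifecycleSketch {
  void tracedSend(final ProducerRecord record) {
    // startActive(false): the span is finished explicitly (in the PR, by ProducerCallback).
    final Scope scope = GlobalTracer.get().buildSpan("kafka.produce").startActive(false);
    PRODUCER_DECORATE.afterStart(scope);        // presumably applies service/component/span.kind/span.type
    PRODUCER_DECORATE.onProduce(scope, record); // resource name "Produce Topic <topic>", kafka.partition
    try {
      // ... hand the record to the real KafkaProducer.send(...) ...
    } catch (final RuntimeException e) {
      PRODUCER_DECORATE.onError(scope, e);
      throw e;
    } finally {
      PRODUCER_DECORATE.beforeFinish(scope);
      scope.span().finish();
      scope.close();
    }
  }
}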

Changed file: KafkaConsumerInstrumentation.java (datadog.trace.instrumentation.kafka_clients)

@@ -1,5 +1,6 @@
 package datadog.trace.instrumentation.kafka_clients;
 
+import static datadog.trace.instrumentation.kafka_clients.KafkaDecorator.CONSUMER_DECORATE;
 import static net.bytebuddy.matcher.ElementMatchers.isMethod;
 import static net.bytebuddy.matcher.ElementMatchers.isPublic;
 import static net.bytebuddy.matcher.ElementMatchers.named;
@@ -9,13 +10,6 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
 
 import com.google.auto.service.AutoService;
 import datadog.trace.agent.tooling.Instrumenter;
-import datadog.trace.api.DDSpanTypes;
-import datadog.trace.api.DDTags;
-import io.opentracing.SpanContext;
-import io.opentracing.Tracer;
-import io.opentracing.propagation.Format;
-import io.opentracing.tag.Tags;
-import io.opentracing.util.GlobalTracer;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -27,14 +21,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 
 @AutoService(Instrumenter.class)
 public final class KafkaConsumerInstrumentation extends Instrumenter.Default {
-  private static final String[] HELPER_CLASS_NAMES =
-      new String[] {
-        "datadog.trace.instrumentation.kafka_clients.TextMapExtractAdapter",
-        "datadog.trace.instrumentation.kafka_clients.TracingIterable",
-        "datadog.trace.instrumentation.kafka_clients.TracingIterable$TracingIterator",
-        "datadog.trace.instrumentation.kafka_clients.TracingIterable$SpanBuilderDecorator",
-        "datadog.trace.instrumentation.kafka_clients.KafkaConsumerInstrumentation$ConsumeScopeAction"
-      };
 
   public KafkaConsumerInstrumentation() {
     super("kafka");
@@ -47,7 +33,16 @@ public final class KafkaConsumerInstrumentation extends Instrumenter.Default {
 
   @Override
   public String[] helperClassNames() {
-    return HELPER_CLASS_NAMES;
+    return new String[] {
+      "datadog.trace.agent.decorator.BaseDecorator",
+      "datadog.trace.agent.decorator.ClientDecorator",
+      packageName + ".KafkaDecorator",
+      packageName + ".KafkaDecorator$1",
+      packageName + ".KafkaDecorator$2",
+      packageName + ".TextMapExtractAdapter",
+      packageName + ".TracingIterable",
+      packageName + ".TracingIterable$TracingIterator",
+    };
   }
 
   @Override
@@ -75,7 +70,7 @@ public final class KafkaConsumerInstrumentation extends Instrumenter.Default {
     @Advice.OnMethodExit(suppress = Throwable.class)
     public static void wrap(@Advice.Return(readOnly = false) Iterable<ConsumerRecord> iterable) {
       if (iterable != null) {
-        iterable = new TracingIterable<>(iterable, "kafka.consume", ConsumeScopeAction.INSTANCE);
+        iterable = new TracingIterable(iterable, "kafka.consume", CONSUMER_DECORATE);
       }
     }
   }
@@ -86,31 +81,8 @@ public final class KafkaConsumerInstrumentation extends Instrumenter.Default {
     public static void wrap(@Advice.Return(readOnly = false) Iterator<ConsumerRecord> iterator) {
       if (iterator != null) {
         iterator =
-            new TracingIterable.TracingIterator<>(
-                iterator, "kafka.consume", ConsumeScopeAction.INSTANCE);
+            new TracingIterable.TracingIterator(iterator, "kafka.consume", CONSUMER_DECORATE);
       }
     }
   }
-
-  public static class ConsumeScopeAction
-      implements TracingIterable.SpanBuilderDecorator<ConsumerRecord> {
-    public static final ConsumeScopeAction INSTANCE = new ConsumeScopeAction();
-
-    @Override
-    public void decorate(final Tracer.SpanBuilder spanBuilder, final ConsumerRecord record) {
-      final String topic = record.topic() == null ? "kafka" : record.topic();
-      final SpanContext spanContext =
-          GlobalTracer.get()
-              .extract(Format.Builtin.TEXT_MAP, new TextMapExtractAdapter(record.headers()));
-      spanBuilder
-          .asChildOf(spanContext)
-          .withTag(DDTags.SERVICE_NAME, "kafka")
-          .withTag(DDTags.RESOURCE_NAME, "Consume Topic " + topic)
-          .withTag(DDTags.SPAN_TYPE, DDSpanTypes.MESSAGE_CONSUMER)
-          .withTag(Tags.COMPONENT.getKey(), "java-kafka")
-          .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CONSUMER)
-          .withTag("partition", record.partition())
-          .withTag("offset", record.offset());
-    }
-  }
 }

New file: KafkaDecorator.java (datadog.trace.instrumentation.kafka_clients)

@@ -0,0 +1,79 @@
+package datadog.trace.instrumentation.kafka_clients;
+
+import datadog.trace.agent.decorator.ClientDecorator;
+import datadog.trace.api.DDSpanTypes;
+import datadog.trace.api.DDTags;
+import io.opentracing.Scope;
+import io.opentracing.Span;
+import io.opentracing.tag.Tags;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+public abstract class KafkaDecorator extends ClientDecorator {
+  public static final KafkaDecorator PRODUCER_DECORATE =
+      new KafkaDecorator() {
+        @Override
+        protected String spanKind() {
+          return Tags.SPAN_KIND_PRODUCER;
+        }
+
+        @Override
+        protected String spanType() {
+          return DDSpanTypes.MESSAGE_PRODUCER;
+        }
+      };
+
+  public static final KafkaDecorator CONSUMER_DECORATE =
+      new KafkaDecorator() {
+        @Override
+        protected String spanKind() {
+          return Tags.SPAN_KIND_CONSUMER;
+        }
+
+        @Override
+        protected String spanType() {
+          return DDSpanTypes.MESSAGE_CONSUMER;
+        }
+      };
+
+  @Override
+  protected String[] instrumentationNames() {
+    return new String[] {"kafka"};
+  }
+
+  @Override
+  protected String service() {
+    return "kafka";
+  }
+
+  @Override
+  protected String component() {
+    return "java-kafka";
+  }
+
+  @Override
+  protected abstract String spanKind();
+
+  public void onConsume(final Scope scope, final ConsumerRecord record) {
+    final Span span = scope.span();
+    if (record != null) {
+      final String topic = record.topic() == null ? "kafka" : record.topic();
+      span.setTag(DDTags.RESOURCE_NAME, "Consume Topic " + topic);
+      span.setTag("partition", record.partition());
+      span.setTag("offset", record.offset());
+    }
+  }
+
+  public void onProduce(final Scope scope, final ProducerRecord record) {
+    if (record != null) {
+      final Span span = scope.span();
+      final String topic = record.topic() == null ? "kafka" : record.topic();
+
+      if (record.partition() != null) {
+        span.setTag("kafka.partition", record.partition());
+      }
+
+      span.setTag(DDTags.RESOURCE_NAME, "Produce Topic " + topic);
+    }
+  }
+}
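
PRODUCER_DECORATE and CONSUMER_DECORATE above are anonymous subclasses, which is why the instrumentations in this PR list KafkaDecorator$1 and KafkaDecorator$2 in helperClassNames(); afterStart, onError, and beforeFinish come from the ClientDecorator/BaseDecorator base classes, which are not part of this diff. A quick illustrative check of those synthetic names, assuming javac's usual numbering of anonymous classes in declaration order:

// Illustrative, not part of the PR: shows why helperClassNames() must include
// KafkaDecorator$1 and KafkaDecorator$2 (the two anonymous subclasses declared above).
import datadog.trace.instrumentation.kafka_clients.KafkaDecorator;

class AnonymousDecoratorNames {
  public static void main(final String[] args) {
    // Expected output (assumption based on declaration order):
    //   datadog.trace.instrumentation.kafka_clients.KafkaDecorator$1  (PRODUCER_DECORATE)
    //   datadog.trace.instrumentation.kafka_clients.KafkaDecorator$2  (CONSUMER_DECORATE)
    System.out.println(KafkaDecorator.PRODUCER_DECORATE.getClass().getName());
    System.out.println(KafkaDecorator.CONSUMER_DECORATE.getClass().getName());
  }
}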

Changed file: KafkaProducerInstrumentation.java (datadog.trace.instrumentation.kafka_clients)

@@ -1,6 +1,6 @@
 package datadog.trace.instrumentation.kafka_clients;
 
-import static io.opentracing.log.Fields.ERROR_OBJECT;
+import static datadog.trace.instrumentation.kafka_clients.KafkaDecorator.PRODUCER_DECORATE;
 import static java.util.Collections.singletonMap;
 import static net.bytebuddy.matcher.ElementMatchers.isMethod;
 import static net.bytebuddy.matcher.ElementMatchers.isPublic;
@@ -9,14 +9,9 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
 
 import com.google.auto.service.AutoService;
 import datadog.trace.agent.tooling.Instrumenter;
-import datadog.trace.api.DDSpanTypes;
-import datadog.trace.api.DDTags;
 import io.opentracing.Scope;
-import io.opentracing.Span;
 import io.opentracing.propagation.Format;
-import io.opentracing.tag.Tags;
 import io.opentracing.util.GlobalTracer;
-import java.util.Collections;
 import java.util.Map;
 import net.bytebuddy.asm.Advice;
 import net.bytebuddy.description.method.MethodDescription;
@@ -28,14 +23,6 @@ import org.apache.kafka.clients.producer.RecordMetadata;
 
 @AutoService(Instrumenter.class)
 public final class KafkaProducerInstrumentation extends Instrumenter.Default {
-  private static final String[] HELPER_CLASS_NAMES =
-      new String[] {
-        "datadog.trace.instrumentation.kafka_clients.TextMapInjectAdapter",
-        KafkaProducerInstrumentation.class.getName() + "$ProducerCallback"
-      };
-
-  private static final String OPERATION = "kafka.produce";
-  private static final String COMPONENT_NAME = "java-kafka";
 
   public KafkaProducerInstrumentation() {
     super("kafka");
@@ -48,7 +35,15 @@ public final class KafkaProducerInstrumentation extends Instrumenter.Default {
 
   @Override
   public String[] helperClassNames() {
-    return HELPER_CLASS_NAMES;
+    return new String[] {
+      "datadog.trace.agent.decorator.BaseDecorator",
+      "datadog.trace.agent.decorator.ClientDecorator",
+      packageName + ".KafkaDecorator",
+      packageName + ".KafkaDecorator$1",
+      packageName + ".KafkaDecorator$2",
+      packageName + ".TextMapInjectAdapter",
+      KafkaProducerInstrumentation.class.getName() + "$ProducerCallback"
+    };
   }
 
   @Override
@@ -68,22 +63,12 @@ public final class KafkaProducerInstrumentation extends Instrumenter.Default {
     public static Scope startSpan(
         @Advice.Argument(value = 0, readOnly = false) ProducerRecord record,
         @Advice.Argument(value = 1, readOnly = false) Callback callback) {
-      final Scope scope = GlobalTracer.get().buildSpan(OPERATION).startActive(false);
+      final Scope scope = GlobalTracer.get().buildSpan("kafka.produce").startActive(false);
+      PRODUCER_DECORATE.afterStart(scope);
+      PRODUCER_DECORATE.onProduce(scope, record);
       callback = new ProducerCallback(callback, scope);
-
-      final Span span = scope.span();
-      final String topic = record.topic() == null ? "kafka" : record.topic();
-      if (record.partition() != null) {
-        span.setTag("kafka.partition", record.partition());
-      }
-      Tags.COMPONENT.set(span, COMPONENT_NAME);
-      Tags.SPAN_KIND.set(span, Tags.SPAN_KIND_PRODUCER);
-      span.setTag(DDTags.RESOURCE_NAME, "Produce Topic " + topic);
-      span.setTag(DDTags.SPAN_TYPE, DDSpanTypes.MESSAGE_PRODUCER);
-      span.setTag(DDTags.SERVICE_NAME, "kafka");
 
       try {
         GlobalTracer.get()
             .inject(
@@ -114,12 +99,8 @@ public final class KafkaProducerInstrumentation extends Instrumenter.Default {
     @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
     public static void stopSpan(
         @Advice.Enter final Scope scope, @Advice.Thrown final Throwable throwable) {
-      if (throwable != null) {
-        final Span span = scope.span();
-        Tags.ERROR.set(span, true);
-        span.log(Collections.singletonMap(ERROR_OBJECT, throwable));
-        span.finish();
-      }
+      PRODUCER_DECORATE.onError(scope, throwable);
+      PRODUCER_DECORATE.beforeFinish(scope);
       scope.close();
     }
   }
@@ -135,15 +116,13 @@ public final class KafkaProducerInstrumentation extends Instrumenter.Default {
 
     @Override
     public void onCompletion(final RecordMetadata metadata, final Exception exception) {
-      if (exception != null) {
-        Tags.ERROR.set(scope.span(), Boolean.TRUE);
-        scope.span().log(Collections.singletonMap(ERROR_OBJECT, exception));
-      }
+      PRODUCER_DECORATE.onError(scope, exception);
       try {
         if (callback != null) {
           callback.onCompletion(metadata, exception);
         }
       } finally {
+        PRODUCER_DECORATE.beforeFinish(scope);
         scope.span().finish();
         scope.close();
       }
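
The producer advice still injects the active span context into the record headers so the consumer side can use it as the parent; the extract half is visible in TracingIterable below. A sketch of that round trip follows. The inject arguments are partly inferred, since the call is truncated in the hunk above, and TextMapInjectAdapter is assumed to wrap the record headers the same way TextMapExtractAdapter does:

// Illustrative sketch; the inject argument list and the TextMapInjectAdapter constructor
// are assumptions, while the extract call is copied from the TracingIterable diff below.
import datadog.trace.instrumentation.kafka_clients.TextMapExtractAdapter;
import datadog.trace.instrumentation.kafka_clients.TextMapInjectAdapter;
import io.opentracing.Scope;
import io.opentracing.SpanContext;
import io.opentracing.propagation.Format;
import io.opentracing.util.GlobalTracer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;

class HeaderPropagationSketch {
  // Producer side: write the active span context into the outgoing record's headers.
  static void inject(final Scope scope, final ProducerRecord record) {
    GlobalTracer.get()
        .inject(
            scope.span().context(),
            Format.Builtin.TEXT_MAP,
            new TextMapInjectAdapter(record.headers()));
  }

  // Consumer side: read it back and use it as the parent of the "kafka.consume" span.
  static SpanContext extract(final ConsumerRecord record) {
    return GlobalTracer.get()
        .extract(Format.Builtin.TEXT_MAP, new TextMapExtractAdapter(record.headers()));
  }
}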

Changed file: TracingIterable.java (datadog.trace.instrumentation.kafka_clients)

@@ -1,42 +1,44 @@
 package datadog.trace.instrumentation.kafka_clients;
 
 import io.opentracing.Scope;
-import io.opentracing.Tracer;
+import io.opentracing.SpanContext;
+import io.opentracing.propagation.Format;
 import io.opentracing.util.GlobalTracer;
 import java.util.Iterator;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 
-public class TracingIterable<T> implements Iterable<T> {
-  private final Iterable<T> delegateIterable;
+public class TracingIterable implements Iterable<ConsumerRecord> {
+  private final Iterable<ConsumerRecord> delegateIterable;
   private final String operationName;
-  private final SpanBuilderDecorator<T> decorator;
+  private final KafkaDecorator decorator;
 
   public TracingIterable(
-      final Iterable<T> delegateIterable,
+      final Iterable<ConsumerRecord> delegateIterable,
       final String operationName,
-      final SpanBuilderDecorator<T> decorator) {
+      final KafkaDecorator decorator) {
     this.delegateIterable = delegateIterable;
     this.operationName = operationName;
     this.decorator = decorator;
   }
 
   @Override
-  public Iterator<T> iterator() {
-    return new TracingIterator<>(delegateIterable.iterator(), operationName, decorator);
+  public Iterator<ConsumerRecord> iterator() {
+    return new TracingIterator(delegateIterable.iterator(), operationName, decorator);
   }
 
   @Slf4j
-  public static class TracingIterator<T> implements Iterator<T> {
-    private final Iterator<T> delegateIterator;
+  public static class TracingIterator implements Iterator<ConsumerRecord> {
+    private final Iterator<ConsumerRecord> delegateIterator;
     private final String operationName;
-    private final SpanBuilderDecorator<T> decorator;
+    private final KafkaDecorator decorator;
     private Scope currentScope;
 
     public TracingIterator(
-        final Iterator<T> delegateIterator,
+        final Iterator<ConsumerRecord> delegateIterator,
         final String operationName,
-        final SpanBuilderDecorator<T> decorator) {
+        final KafkaDecorator decorator) {
       this.delegateIterator = delegateIterator;
       this.operationName = operationName;
       this.decorator = decorator;
@@ -52,20 +54,24 @@ public class TracingIterable<T> implements Iterable<T> {
     }
 
     @Override
-    public T next() {
+    public ConsumerRecord next() {
       if (currentScope != null) {
         // in case they didn't call hasNext()...
        currentScope.close();
         currentScope = null;
       }
-      final T next = delegateIterator.next();
+      final ConsumerRecord next = delegateIterator.next();
       try {
         if (next != null) {
-          final Tracer.SpanBuilder spanBuilder = GlobalTracer.get().buildSpan(operationName);
-          decorator.decorate(spanBuilder, next);
-          currentScope = spanBuilder.startActive(true);
+          final SpanContext spanContext =
+              GlobalTracer.get()
+                  .extract(Format.Builtin.TEXT_MAP, new TextMapExtractAdapter(next.headers()));
+          currentScope =
+              GlobalTracer.get().buildSpan(operationName).asChildOf(spanContext).startActive(true);
+          decorator.afterStart(currentScope);
+          decorator.onConsume(currentScope, next);
         }
       } catch (final Exception e) {
         log.debug("Error during decoration", e);
@@ -78,8 +84,4 @@ public class TracingIterable<T> implements Iterable<T> {
       delegateIterator.remove();
     }
   }
-
-  public interface SpanBuilderDecorator<T> {
-    void decorate(Tracer.SpanBuilder spanBuilder, T context);
-  }
 }
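
With the type parameter removed, TracingIterable is now fixed to ConsumerRecord and carries a KafkaDecorator in place of the deleted SpanBuilderDecorator interface. From application code the iteration looks unchanged: each step opens a "kafka.consume" scope, parented via the record headers, and the iterator itself closes that scope when it advances. An illustrative loop; the wrapper class and process() are hypothetical:

// Illustrative only; 'records' stands for what the instrumented consumer hands back,
// i.e. the original iterable wrapped in a TracingIterable (see KafkaConsumerInstrumentation above).
import org.apache.kafka.clients.consumer.ConsumerRecord;

class TracedIterationSketch {
  static void drain(final Iterable<ConsumerRecord> records) {
    for (final ConsumerRecord record : records) {
      // A "kafka.consume" scope is active here; TracingIterator closes it on the next
      // hasNext()/next() call, not when this loop body ends.
      process(record);
    }
  }

  static void process(final ConsumerRecord record) {
    // hypothetical application code
  }
}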

New file: KafkaStreamsDecorator.java (datadog.trace.instrumentation.kafka_streams)

@@ -0,0 +1,48 @@
+package datadog.trace.instrumentation.kafka_streams;
+
+import datadog.trace.agent.decorator.ClientDecorator;
+import datadog.trace.api.DDSpanTypes;
+import datadog.trace.api.DDTags;
+import io.opentracing.Scope;
+import io.opentracing.Span;
+import io.opentracing.tag.Tags;
+import org.apache.kafka.streams.processor.internals.StampedRecord;
+
+public class KafkaStreamsDecorator extends ClientDecorator {
+  public static final KafkaStreamsDecorator CONSUMER_DECORATE = new KafkaStreamsDecorator();
+
+  @Override
+  protected String[] instrumentationNames() {
+    return new String[] {"kafka", "kafka-streams"};
+  }
+
+  @Override
+  protected String service() {
+    return "kafka";
+  }
+
+  @Override
+  protected String component() {
+    return "java-kafka";
+  }
+
+  @Override
+  protected String spanKind() {
+    return Tags.SPAN_KIND_CONSUMER;
+  }
+
+  @Override
+  protected String spanType() {
+    return DDSpanTypes.MESSAGE_CONSUMER;
+  }
+
+  public void onConsume(final Scope scope, final StampedRecord record) {
+    final Span span = scope.span();
+    if (record != null) {
+      final String topic = record.topic() == null ? "kafka" : record.topic();
+      span.setTag(DDTags.RESOURCE_NAME, "Consume Topic " + topic);
+      span.setTag("partition", record.partition());
+      span.setTag("offset", record.offset());
+    }
+  }
+}

Changed file: KafkaStreamsProcessorInstrumentation.java (datadog.trace.instrumentation.kafka_streams)

@@ -1,6 +1,6 @@
 package datadog.trace.instrumentation.kafka_streams;
 
-import static io.opentracing.log.Fields.ERROR_OBJECT;
+import static datadog.trace.instrumentation.kafka_streams.KafkaStreamsDecorator.CONSUMER_DECORATE;
 import static java.util.Collections.singletonMap;
 import static net.bytebuddy.matcher.ElementMatchers.isMethod;
 import static net.bytebuddy.matcher.ElementMatchers.isPackagePrivate;
@@ -11,15 +11,10 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
 
 import com.google.auto.service.AutoService;
 import datadog.trace.agent.tooling.Instrumenter;
-import datadog.trace.api.DDSpanTypes;
-import datadog.trace.api.DDTags;
 import io.opentracing.Scope;
-import io.opentracing.Span;
 import io.opentracing.SpanContext;
 import io.opentracing.propagation.Format;
-import io.opentracing.tag.Tags;
 import io.opentracing.util.GlobalTracer;
-import java.util.Collections;
 import java.util.Map;
 import net.bytebuddy.asm.Advice;
 import net.bytebuddy.description.method.MethodDescription;
@@ -31,9 +26,6 @@ public class KafkaStreamsProcessorInstrumentation {
   // These two instrumentations work together to apply StreamTask.process.
   // The combination of these are needed because there's not a good instrumentation point.
 
-  public static final String[] HELPER_CLASS_NAMES =
-      new String[] {"datadog.trace.instrumentation.kafka_streams.TextMapExtractAdapter"};
-
   @AutoService(Instrumenter.class)
   public static class StartInstrumentation extends Instrumenter.Default {
@@ -48,7 +40,12 @@
 
     @Override
     public String[] helperClassNames() {
-      return HELPER_CLASS_NAMES;
+      return new String[] {
+        "datadog.trace.agent.decorator.BaseDecorator",
+        "datadog.trace.agent.decorator.ClientDecorator",
+        packageName + ".KafkaStreamsDecorator",
+        packageName + ".TextMapExtractAdapter"
+      };
     }
 
     @Override
@@ -74,17 +71,13 @@
                   .extract(
                       Format.Builtin.TEXT_MAP, new TextMapExtractAdapter(record.value.headers()));
 
+          final Scope scope =
               GlobalTracer.get()
                   .buildSpan("kafka.consume")
                   .asChildOf(extractedContext)
-                  .withTag(DDTags.SERVICE_NAME, "kafka")
-                  .withTag(DDTags.RESOURCE_NAME, "Consume Topic " + record.topic())
-                  .withTag(DDTags.SPAN_TYPE, DDSpanTypes.MESSAGE_CONSUMER)
-                  .withTag(Tags.COMPONENT.getKey(), "java-kafka")
-                  .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CONSUMER)
-                  .withTag("partition", record.partition())
-                  .withTag("offset", record.offset())
                   .startActive(true);
+          CONSUMER_DECORATE.afterStart(scope);
+          CONSUMER_DECORATE.onConsume(scope, record);
         }
       }
     }
@@ -103,7 +96,12 @@
 
     @Override
     public String[] helperClassNames() {
-      return HELPER_CLASS_NAMES;
+      return new String[] {
+        "datadog.trace.agent.decorator.BaseDecorator",
+        "datadog.trace.agent.decorator.ClientDecorator",
+        packageName + ".KafkaStreamsDecorator",
+        packageName + ".TextMapExtractAdapter"
+      };
     }
 
     @Override
@@ -119,11 +117,8 @@
     public static void stopSpan(@Advice.Thrown final Throwable throwable) {
       final Scope scope = GlobalTracer.get().scopeManager().active();
       if (scope != null) {
-        if (throwable != null) {
-          final Span span = scope.span();
-          Tags.ERROR.set(span, Boolean.TRUE);
-          span.log(Collections.singletonMap(ERROR_OBJECT, throwable));
-        }
+        CONSUMER_DECORATE.onError(scope, throwable);
+        CONSUMER_DECORATE.beforeFinish(scope);
         scope.close();
       }
     }
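
For kafka-streams the existing two-advice scheme described in the comment above is kept; only the tagging and error handling move into CONSUMER_DECORATE, and the stop advice now passes the thrown value to onError unconditionally (null when there was no error). Taken together, the start and stop advices amount to roughly the following around StreamTask.process, written out as a single method purely for illustration:

// Illustrative combination of StartInstrumentation and StopInstrumentation; the wrapper
// class and the processing body are hypothetical, while the tracing calls mirror the diff above.
import static datadog.trace.instrumentation.kafka_streams.KafkaStreamsDecorator.CONSUMER_DECORATE;

import datadog.trace.instrumentation.kafka_streams.TextMapExtractAdapter;
import io.opentracing.Scope;
import io.opentracing.SpanContext;
import io.opentracing.propagation.Format;
import io.opentracing.util.GlobalTracer;
import org.apache.kafka.streams.processor.internals.StampedRecord;

class StreamTaskTracingSketch {
  static void tracedProcess(final StampedRecord record) {
    final SpanContext extracted =
        GlobalTracer.get()
            .extract(Format.Builtin.TEXT_MAP, new TextMapExtractAdapter(record.value.headers()));
    final Scope scope =
        GlobalTracer.get().buildSpan("kafka.consume").asChildOf(extracted).startActive(true);
    CONSUMER_DECORATE.afterStart(scope);
    CONSUMER_DECORATE.onConsume(scope, record);
    Throwable thrown = null;
    try {
      // ... the real record processing happens here ...
    } catch (final RuntimeException e) {
      thrown = e;
      throw e;
    } finally {
      CONSUMER_DECORATE.onError(scope, thrown);   // null is passed when nothing was thrown
      CONSUMER_DECORATE.beforeFinish(scope);
      scope.close(); // startActive(true): closing the scope also finishes the span
    }
  }
}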