From e098e6bd64ed3c45d96790d4ba95d5a942bb0cae Mon Sep 17 00:00:00 2001 From: Tyler Benson Date: Thu, 28 Feb 2019 16:27:09 -0800 Subject: [PATCH] Migrate kafka instrumentation to Decorator. --- .../KafkaConsumerInstrumentation.java | 54 +++---------- .../kafka_clients/KafkaDecorator.java | 79 +++++++++++++++++++ .../KafkaProducerInstrumentation.java | 57 +++++-------- .../kafka_clients/TracingIterable.java | 46 +++++------ .../kafka_streams/KafkaStreamsDecorator.java | 48 +++++++++++ .../KafkaStreamsProcessorInstrumentation.java | 49 ++++++------ 6 files changed, 204 insertions(+), 129 deletions(-) create mode 100644 dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaDecorator.java create mode 100644 dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/KafkaStreamsDecorator.java diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInstrumentation.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInstrumentation.java index 899ff6ac00..9653a61b0b 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInstrumentation.java @@ -1,5 +1,6 @@ package datadog.trace.instrumentation.kafka_clients; +import static datadog.trace.instrumentation.kafka_clients.KafkaDecorator.CONSUMER_DECORATE; import static net.bytebuddy.matcher.ElementMatchers.isMethod; import static net.bytebuddy.matcher.ElementMatchers.isPublic; import static net.bytebuddy.matcher.ElementMatchers.named; @@ -9,13 +10,6 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArguments; import com.google.auto.service.AutoService; import datadog.trace.agent.tooling.Instrumenter; -import datadog.trace.api.DDSpanTypes; -import datadog.trace.api.DDTags; -import io.opentracing.SpanContext; -import io.opentracing.Tracer; -import io.opentracing.propagation.Format; -import io.opentracing.tag.Tags; -import io.opentracing.util.GlobalTracer; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -27,14 +21,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; @AutoService(Instrumenter.class) public final class KafkaConsumerInstrumentation extends Instrumenter.Default { - private static final String[] HELPER_CLASS_NAMES = - new String[] { - "datadog.trace.instrumentation.kafka_clients.TextMapExtractAdapter", - "datadog.trace.instrumentation.kafka_clients.TracingIterable", - "datadog.trace.instrumentation.kafka_clients.TracingIterable$TracingIterator", - "datadog.trace.instrumentation.kafka_clients.TracingIterable$SpanBuilderDecorator", - "datadog.trace.instrumentation.kafka_clients.KafkaConsumerInstrumentation$ConsumeScopeAction" - }; public KafkaConsumerInstrumentation() { super("kafka"); @@ -47,7 +33,16 @@ public final class KafkaConsumerInstrumentation extends Instrumenter.Default { @Override public String[] helperClassNames() { - return HELPER_CLASS_NAMES; + return new String[] { + "datadog.trace.agent.decorator.BaseDecorator", + "datadog.trace.agent.decorator.ClientDecorator", + packageName + ".KafkaDecorator", + packageName + ".KafkaDecorator$1", + packageName + ".KafkaDecorator$2", + packageName + ".TextMapExtractAdapter", + packageName + ".TracingIterable", + packageName + ".TracingIterable$TracingIterator", + }; } @Override @@ -75,7 +70,7 @@ public final class KafkaConsumerInstrumentation extends Instrumenter.Default { @Advice.OnMethodExit(suppress = Throwable.class) public static void wrap(@Advice.Return(readOnly = false) Iterable iterable) { if (iterable != null) { - iterable = new TracingIterable<>(iterable, "kafka.consume", ConsumeScopeAction.INSTANCE); + iterable = new TracingIterable(iterable, "kafka.consume", CONSUMER_DECORATE); } } } @@ -86,31 +81,8 @@ public final class KafkaConsumerInstrumentation extends Instrumenter.Default { public static void wrap(@Advice.Return(readOnly = false) Iterator iterator) { if (iterator != null) { iterator = - new TracingIterable.TracingIterator<>( - iterator, "kafka.consume", ConsumeScopeAction.INSTANCE); + new TracingIterable.TracingIterator(iterator, "kafka.consume", CONSUMER_DECORATE); } } } - - public static class ConsumeScopeAction - implements TracingIterable.SpanBuilderDecorator { - public static final ConsumeScopeAction INSTANCE = new ConsumeScopeAction(); - - @Override - public void decorate(final Tracer.SpanBuilder spanBuilder, final ConsumerRecord record) { - final String topic = record.topic() == null ? "kafka" : record.topic(); - final SpanContext spanContext = - GlobalTracer.get() - .extract(Format.Builtin.TEXT_MAP, new TextMapExtractAdapter(record.headers())); - spanBuilder - .asChildOf(spanContext) - .withTag(DDTags.SERVICE_NAME, "kafka") - .withTag(DDTags.RESOURCE_NAME, "Consume Topic " + topic) - .withTag(DDTags.SPAN_TYPE, DDSpanTypes.MESSAGE_CONSUMER) - .withTag(Tags.COMPONENT.getKey(), "java-kafka") - .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CONSUMER) - .withTag("partition", record.partition()) - .withTag("offset", record.offset()); - } - } } diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaDecorator.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaDecorator.java new file mode 100644 index 0000000000..cbd222383a --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaDecorator.java @@ -0,0 +1,79 @@ +package datadog.trace.instrumentation.kafka_clients; + +import datadog.trace.agent.decorator.ClientDecorator; +import datadog.trace.api.DDSpanTypes; +import datadog.trace.api.DDTags; +import io.opentracing.Scope; +import io.opentracing.Span; +import io.opentracing.tag.Tags; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerRecord; + +public abstract class KafkaDecorator extends ClientDecorator { + public static final KafkaDecorator PRODUCER_DECORATE = + new KafkaDecorator() { + @Override + protected String spanKind() { + return Tags.SPAN_KIND_PRODUCER; + } + + @Override + protected String spanType() { + return DDSpanTypes.MESSAGE_PRODUCER; + } + }; + + public static final KafkaDecorator CONSUMER_DECORATE = + new KafkaDecorator() { + @Override + protected String spanKind() { + return Tags.SPAN_KIND_CONSUMER; + } + + @Override + protected String spanType() { + return DDSpanTypes.MESSAGE_CONSUMER; + } + }; + + @Override + protected String[] instrumentationNames() { + return new String[] {"kafka"}; + } + + @Override + protected String service() { + return "kafka"; + } + + @Override + protected String component() { + return "java-kafka"; + } + + @Override + protected abstract String spanKind(); + + public void onConsume(final Scope scope, final ConsumerRecord record) { + final Span span = scope.span(); + if (record != null) { + final String topic = record.topic() == null ? "kafka" : record.topic(); + span.setTag(DDTags.RESOURCE_NAME, "Consume Topic " + topic); + span.setTag("partition", record.partition()); + span.setTag("offset", record.offset()); + } + } + + public void onProduce(final Scope scope, final ProducerRecord record) { + if (record != null) { + final Span span = scope.span(); + + final String topic = record.topic() == null ? "kafka" : record.topic(); + if (record.partition() != null) { + span.setTag("kafka.partition", record.partition()); + } + + span.setTag(DDTags.RESOURCE_NAME, "Produce Topic " + topic); + } + } +} diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java index 972f1f9659..4e29178577 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java @@ -1,6 +1,6 @@ package datadog.trace.instrumentation.kafka_clients; -import static io.opentracing.log.Fields.ERROR_OBJECT; +import static datadog.trace.instrumentation.kafka_clients.KafkaDecorator.PRODUCER_DECORATE; import static java.util.Collections.singletonMap; import static net.bytebuddy.matcher.ElementMatchers.isMethod; import static net.bytebuddy.matcher.ElementMatchers.isPublic; @@ -9,14 +9,9 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArgument; import com.google.auto.service.AutoService; import datadog.trace.agent.tooling.Instrumenter; -import datadog.trace.api.DDSpanTypes; -import datadog.trace.api.DDTags; import io.opentracing.Scope; -import io.opentracing.Span; import io.opentracing.propagation.Format; -import io.opentracing.tag.Tags; import io.opentracing.util.GlobalTracer; -import java.util.Collections; import java.util.Map; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.method.MethodDescription; @@ -28,14 +23,6 @@ import org.apache.kafka.clients.producer.RecordMetadata; @AutoService(Instrumenter.class) public final class KafkaProducerInstrumentation extends Instrumenter.Default { - private static final String[] HELPER_CLASS_NAMES = - new String[] { - "datadog.trace.instrumentation.kafka_clients.TextMapInjectAdapter", - KafkaProducerInstrumentation.class.getName() + "$ProducerCallback" - }; - - private static final String OPERATION = "kafka.produce"; - private static final String COMPONENT_NAME = "java-kafka"; public KafkaProducerInstrumentation() { super("kafka"); @@ -48,7 +35,15 @@ public final class KafkaProducerInstrumentation extends Instrumenter.Default { @Override public String[] helperClassNames() { - return HELPER_CLASS_NAMES; + return new String[] { + "datadog.trace.agent.decorator.BaseDecorator", + "datadog.trace.agent.decorator.ClientDecorator", + packageName + ".KafkaDecorator", + packageName + ".KafkaDecorator$1", + packageName + ".KafkaDecorator$2", + packageName + ".TextMapInjectAdapter", + KafkaProducerInstrumentation.class.getName() + "$ProducerCallback" + }; } @Override @@ -68,22 +63,12 @@ public final class KafkaProducerInstrumentation extends Instrumenter.Default { public static Scope startSpan( @Advice.Argument(value = 0, readOnly = false) ProducerRecord record, @Advice.Argument(value = 1, readOnly = false) Callback callback) { - final Scope scope = GlobalTracer.get().buildSpan(OPERATION).startActive(false); + final Scope scope = GlobalTracer.get().buildSpan("kafka.produce").startActive(false); + PRODUCER_DECORATE.afterStart(scope); + PRODUCER_DECORATE.onProduce(scope, record); + callback = new ProducerCallback(callback, scope); - final Span span = scope.span(); - final String topic = record.topic() == null ? "kafka" : record.topic(); - if (record.partition() != null) { - span.setTag("kafka.partition", record.partition()); - } - - Tags.COMPONENT.set(span, COMPONENT_NAME); - Tags.SPAN_KIND.set(span, Tags.SPAN_KIND_PRODUCER); - - span.setTag(DDTags.RESOURCE_NAME, "Produce Topic " + topic); - span.setTag(DDTags.SPAN_TYPE, DDSpanTypes.MESSAGE_PRODUCER); - span.setTag(DDTags.SERVICE_NAME, "kafka"); - try { GlobalTracer.get() .inject( @@ -114,12 +99,8 @@ public final class KafkaProducerInstrumentation extends Instrumenter.Default { @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) public static void stopSpan( @Advice.Enter final Scope scope, @Advice.Thrown final Throwable throwable) { - if (throwable != null) { - final Span span = scope.span(); - Tags.ERROR.set(span, true); - span.log(Collections.singletonMap(ERROR_OBJECT, throwable)); - span.finish(); - } + PRODUCER_DECORATE.onError(scope, throwable); + PRODUCER_DECORATE.beforeFinish(scope); scope.close(); } } @@ -135,15 +116,13 @@ public final class KafkaProducerInstrumentation extends Instrumenter.Default { @Override public void onCompletion(final RecordMetadata metadata, final Exception exception) { - if (exception != null) { - Tags.ERROR.set(scope.span(), Boolean.TRUE); - scope.span().log(Collections.singletonMap(ERROR_OBJECT, exception)); - } + PRODUCER_DECORATE.onError(scope, exception); try { if (callback != null) { callback.onCompletion(metadata, exception); } } finally { + PRODUCER_DECORATE.beforeFinish(scope); scope.span().finish(); scope.close(); } diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterable.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterable.java index fea7908ea2..01135fea39 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterable.java +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterable.java @@ -1,42 +1,44 @@ package datadog.trace.instrumentation.kafka_clients; import io.opentracing.Scope; -import io.opentracing.Tracer; +import io.opentracing.SpanContext; +import io.opentracing.propagation.Format; import io.opentracing.util.GlobalTracer; import java.util.Iterator; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; -public class TracingIterable implements Iterable { - private final Iterable delegateIterable; +public class TracingIterable implements Iterable { + private final Iterable delegateIterable; private final String operationName; - private final SpanBuilderDecorator decorator; + private final KafkaDecorator decorator; public TracingIterable( - final Iterable delegateIterable, + final Iterable delegateIterable, final String operationName, - final SpanBuilderDecorator decorator) { + final KafkaDecorator decorator) { this.delegateIterable = delegateIterable; this.operationName = operationName; this.decorator = decorator; } @Override - public Iterator iterator() { - return new TracingIterator<>(delegateIterable.iterator(), operationName, decorator); + public Iterator iterator() { + return new TracingIterator(delegateIterable.iterator(), operationName, decorator); } @Slf4j - public static class TracingIterator implements Iterator { - private final Iterator delegateIterator; + public static class TracingIterator implements Iterator { + private final Iterator delegateIterator; private final String operationName; - private final SpanBuilderDecorator decorator; + private final KafkaDecorator decorator; private Scope currentScope; public TracingIterator( - final Iterator delegateIterator, + final Iterator delegateIterator, final String operationName, - final SpanBuilderDecorator decorator) { + final KafkaDecorator decorator) { this.delegateIterator = delegateIterator; this.operationName = operationName; this.decorator = decorator; @@ -52,20 +54,24 @@ public class TracingIterable implements Iterable { } @Override - public T next() { + public ConsumerRecord next() { if (currentScope != null) { // in case they didn't call hasNext()... currentScope.close(); currentScope = null; } - final T next = delegateIterator.next(); + final ConsumerRecord next = delegateIterator.next(); try { if (next != null) { - final Tracer.SpanBuilder spanBuilder = GlobalTracer.get().buildSpan(operationName); - decorator.decorate(spanBuilder, next); - currentScope = spanBuilder.startActive(true); + final SpanContext spanContext = + GlobalTracer.get() + .extract(Format.Builtin.TEXT_MAP, new TextMapExtractAdapter(next.headers())); + currentScope = + GlobalTracer.get().buildSpan(operationName).asChildOf(spanContext).startActive(true); + decorator.afterStart(currentScope); + decorator.onConsume(currentScope, next); } } catch (final Exception e) { log.debug("Error during decoration", e); @@ -78,8 +84,4 @@ public class TracingIterable implements Iterable { delegateIterator.remove(); } } - - public interface SpanBuilderDecorator { - void decorate(Tracer.SpanBuilder spanBuilder, T context); - } } diff --git a/dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/KafkaStreamsDecorator.java b/dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/KafkaStreamsDecorator.java new file mode 100644 index 0000000000..31e9eb65a9 --- /dev/null +++ b/dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/KafkaStreamsDecorator.java @@ -0,0 +1,48 @@ +package datadog.trace.instrumentation.kafka_streams; + +import datadog.trace.agent.decorator.ClientDecorator; +import datadog.trace.api.DDSpanTypes; +import datadog.trace.api.DDTags; +import io.opentracing.Scope; +import io.opentracing.Span; +import io.opentracing.tag.Tags; +import org.apache.kafka.streams.processor.internals.StampedRecord; + +public class KafkaStreamsDecorator extends ClientDecorator { + public static final KafkaStreamsDecorator CONSUMER_DECORATE = new KafkaStreamsDecorator(); + + @Override + protected String[] instrumentationNames() { + return new String[] {"kafka", "kafka-streams"}; + } + + @Override + protected String service() { + return "kafka"; + } + + @Override + protected String component() { + return "java-kafka"; + } + + @Override + protected String spanKind() { + return Tags.SPAN_KIND_CONSUMER; + } + + @Override + protected String spanType() { + return DDSpanTypes.MESSAGE_CONSUMER; + } + + public void onConsume(final Scope scope, final StampedRecord record) { + final Span span = scope.span(); + if (record != null) { + final String topic = record.topic() == null ? "kafka" : record.topic(); + span.setTag(DDTags.RESOURCE_NAME, "Consume Topic " + topic); + span.setTag("partition", record.partition()); + span.setTag("offset", record.offset()); + } + } +} diff --git a/dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/KafkaStreamsProcessorInstrumentation.java b/dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/KafkaStreamsProcessorInstrumentation.java index 81c7bc1c60..07fa3754c5 100644 --- a/dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/KafkaStreamsProcessorInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/KafkaStreamsProcessorInstrumentation.java @@ -1,6 +1,6 @@ package datadog.trace.instrumentation.kafka_streams; -import static io.opentracing.log.Fields.ERROR_OBJECT; +import static datadog.trace.instrumentation.kafka_streams.KafkaStreamsDecorator.CONSUMER_DECORATE; import static java.util.Collections.singletonMap; import static net.bytebuddy.matcher.ElementMatchers.isMethod; import static net.bytebuddy.matcher.ElementMatchers.isPackagePrivate; @@ -11,15 +11,10 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArguments; import com.google.auto.service.AutoService; import datadog.trace.agent.tooling.Instrumenter; -import datadog.trace.api.DDSpanTypes; -import datadog.trace.api.DDTags; import io.opentracing.Scope; -import io.opentracing.Span; import io.opentracing.SpanContext; import io.opentracing.propagation.Format; -import io.opentracing.tag.Tags; import io.opentracing.util.GlobalTracer; -import java.util.Collections; import java.util.Map; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.method.MethodDescription; @@ -31,9 +26,6 @@ public class KafkaStreamsProcessorInstrumentation { // These two instrumentations work together to apply StreamTask.process. // The combination of these are needed because there's not a good instrumentation point. - public static final String[] HELPER_CLASS_NAMES = - new String[] {"datadog.trace.instrumentation.kafka_streams.TextMapExtractAdapter"}; - @AutoService(Instrumenter.class) public static class StartInstrumentation extends Instrumenter.Default { @@ -48,7 +40,12 @@ public class KafkaStreamsProcessorInstrumentation { @Override public String[] helperClassNames() { - return HELPER_CLASS_NAMES; + return new String[] { + "datadog.trace.agent.decorator.BaseDecorator", + "datadog.trace.agent.decorator.ClientDecorator", + packageName + ".KafkaStreamsDecorator", + packageName + ".TextMapExtractAdapter" + }; } @Override @@ -74,17 +71,13 @@ public class KafkaStreamsProcessorInstrumentation { .extract( Format.Builtin.TEXT_MAP, new TextMapExtractAdapter(record.value.headers())); - GlobalTracer.get() - .buildSpan("kafka.consume") - .asChildOf(extractedContext) - .withTag(DDTags.SERVICE_NAME, "kafka") - .withTag(DDTags.RESOURCE_NAME, "Consume Topic " + record.topic()) - .withTag(DDTags.SPAN_TYPE, DDSpanTypes.MESSAGE_CONSUMER) - .withTag(Tags.COMPONENT.getKey(), "java-kafka") - .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CONSUMER) - .withTag("partition", record.partition()) - .withTag("offset", record.offset()) - .startActive(true); + final Scope scope = + GlobalTracer.get() + .buildSpan("kafka.consume") + .asChildOf(extractedContext) + .startActive(true); + CONSUMER_DECORATE.afterStart(scope); + CONSUMER_DECORATE.onConsume(scope, record); } } } @@ -103,7 +96,12 @@ public class KafkaStreamsProcessorInstrumentation { @Override public String[] helperClassNames() { - return HELPER_CLASS_NAMES; + return new String[] { + "datadog.trace.agent.decorator.BaseDecorator", + "datadog.trace.agent.decorator.ClientDecorator", + packageName + ".KafkaStreamsDecorator", + packageName + ".TextMapExtractAdapter" + }; } @Override @@ -119,11 +117,8 @@ public class KafkaStreamsProcessorInstrumentation { public static void stopSpan(@Advice.Thrown final Throwable throwable) { final Scope scope = GlobalTracer.get().scopeManager().active(); if (scope != null) { - if (throwable != null) { - final Span span = scope.span(); - Tags.ERROR.set(span, Boolean.TRUE); - span.log(Collections.singletonMap(ERROR_OBJECT, throwable)); - } + CONSUMER_DECORATE.onError(scope, throwable); + CONSUMER_DECORATE.beforeFinish(scope); scope.close(); } }