Update kafka-clients-0.11 to new agent api

This commit is contained in:
Trask Stalnaker 2019-10-19 11:57:44 -07:00
parent b53652303c
commit 7259e288ff
5 changed files with 59 additions and 76 deletions

View File

@@ -3,8 +3,7 @@ package datadog.trace.instrumentation.kafka_clients;
import datadog.trace.agent.decorator.ClientDecorator;
import datadog.trace.api.DDSpanTypes;
import datadog.trace.api.DDTags;
import io.opentracing.Scope;
import io.opentracing.Span;
import datadog.trace.instrumentation.api.AgentSpan;
import io.opentracing.tag.Tags;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -54,8 +53,7 @@ public abstract class KafkaDecorator extends ClientDecorator {
@Override
protected abstract String spanKind();
public void onConsume(final Scope scope, final ConsumerRecord record) {
final Span span = scope.span();
public void onConsume(final AgentSpan span, final ConsumerRecord record) {
if (record != null) {
final String topic = record.topic() == null ? "kafka" : record.topic();
span.setTag(DDTags.RESOURCE_NAME, "Consume Topic " + topic);
@@ -64,9 +62,8 @@ public abstract class KafkaDecorator extends ClientDecorator {
}
}
public void onProduce(final Scope scope, final ProducerRecord record) {
public void onProduce(final AgentSpan span, final ProducerRecord record) {
if (record != null) {
final Span span = scope.span();
final String topic = record.topic() == null ? "kafka" : record.topic();
if (record.partition() != null) {

View File

@@ -1,6 +1,10 @@
package datadog.trace.instrumentation.kafka_clients;
import static datadog.trace.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.instrumentation.api.AgentTracer.propagate;
import static datadog.trace.instrumentation.api.AgentTracer.startSpan;
import static datadog.trace.instrumentation.kafka_clients.KafkaDecorator.PRODUCER_DECORATE;
import static datadog.trace.instrumentation.kafka_clients.TextMapInjectAdapter.SETTER;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
@@ -9,10 +13,8 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.propagation.Format;
import io.opentracing.util.GlobalTracer;
import datadog.trace.instrumentation.api.AgentScope;
import datadog.trace.instrumentation.api.AgentSpan;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
@@ -63,26 +65,22 @@ public final class KafkaProducerInstrumentation extends Instrumenter.Default {
public static class ProducerAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static Scope startSpan(
public static AgentScope onEnter(
@Advice.FieldValue("apiVersions") final ApiVersions apiVersions,
@Advice.Argument(value = 0, readOnly = false) ProducerRecord record,
@Advice.Argument(value = 1, readOnly = false) Callback callback) {
final Scope scope = GlobalTracer.get().buildSpan("kafka.produce").startActive(false);
PRODUCER_DECORATE.afterStart(scope);
PRODUCER_DECORATE.onProduce(scope, record);
final AgentSpan span = startSpan("kafka.produce");
PRODUCER_DECORATE.afterStart(span);
PRODUCER_DECORATE.onProduce(span, record);
callback = new ProducerCallback(callback, scope.span());
callback = new ProducerCallback(callback, span);
// Do not inject headers for batch versions below 2
// This is how similar check is being done in Kafka client itself:
// https://github.com/apache/kafka/blob/05fcfde8f69b0349216553f711fdfc3f0259c601/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java#L411-L412
if (apiVersions.maxUsableProduceMagic() >= RecordBatch.MAGIC_VALUE_V2) {
try {
GlobalTracer.get()
.inject(
scope.span().context(),
Format.Builtin.TEXT_MAP,
new TextMapInjectAdapter(record.headers()));
propagate().inject(span, record, SETTER);
} catch (final IllegalStateException e) {
// headers must be read-only from reused record. try again with new one.
record =
@@ -94,20 +92,16 @@ public final class KafkaProducerInstrumentation extends Instrumenter.Default {
record.value(),
record.headers());
GlobalTracer.get()
.inject(
scope.span().context(),
Format.Builtin.TEXT_MAP,
new TextMapInjectAdapter(record.headers()));
propagate().inject(span, record, SETTER);
}
}
return scope;
return activateSpan(span, false);
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(
@Advice.Enter final Scope scope, @Advice.Thrown final Throwable throwable) {
@Advice.Enter final AgentScope scope, @Advice.Thrown final Throwable throwable) {
PRODUCER_DECORATE.onError(scope, throwable);
PRODUCER_DECORATE.beforeFinish(scope);
scope.close();
@@ -116,16 +110,16 @@ public final class KafkaProducerInstrumentation extends Instrumenter.Default {
public static class ProducerCallback implements Callback {
private final Callback callback;
private final Span span;
private final AgentSpan span;
public ProducerCallback(final Callback callback, final Span span) {
public ProducerCallback(final Callback callback, final AgentSpan span) {
this.callback = callback;
this.span = span;
}
@Override
public void onCompletion(final RecordMetadata metadata, final Exception exception) {
try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) {
try (final AgentScope scope = activateSpan(span, false)) {
PRODUCER_DECORATE.onError(span, exception);
try {
if (callback != null) {

View File

@@ -1,30 +1,31 @@
package datadog.trace.instrumentation.kafka_clients;
import io.opentracing.propagation.TextMap;
import datadog.trace.instrumentation.api.AgentPropagation;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
public class TextMapExtractAdapter implements TextMap {
public class TextMapExtractAdapter implements AgentPropagation.Getter<ConsumerRecord> {
private final Map<String, String> map = new HashMap<>();
public static final TextMapExtractAdapter GETTER = new TextMapExtractAdapter();
public TextMapExtractAdapter(final Headers headers) {
for (final Header header : headers) {
map.put(header.key(), new String(header.value(), StandardCharsets.UTF_8));
@Override
public Iterable<String> keys(final ConsumerRecord carrier) {
final List<String> keys = new ArrayList<>();
for (final Header header : carrier.headers()) {
keys.add(header.key());
}
return keys;
}
@Override
public Iterator<Map.Entry<String, String>> iterator() {
return map.entrySet().iterator();
public String get(final ConsumerRecord carrier, final String key) {
final Header header = carrier.headers().lastHeader(key);
if (header == null) {
return null;
}
@Override
public void put(final String key, final String value) {
throw new UnsupportedOperationException("Use inject adapter instead");
return new String(header.value(), StandardCharsets.UTF_8);
}
}

View File

@@ -1,26 +1,15 @@
package datadog.trace.instrumentation.kafka_clients;
import io.opentracing.propagation.TextMap;
import datadog.trace.instrumentation.api.AgentPropagation;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Map;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.clients.producer.ProducerRecord;
public class TextMapInjectAdapter implements TextMap {
public class TextMapInjectAdapter implements AgentPropagation.Setter<ProducerRecord> {
private final Headers headers;
public TextMapInjectAdapter(final Headers headers) {
this.headers = headers;
}
public static final TextMapInjectAdapter SETTER = new TextMapInjectAdapter();
@Override
public Iterator<Map.Entry<String, String>> iterator() {
throw new UnsupportedOperationException("Use extract adapter instead");
}
@Override
public void put(final String key, final String value) {
headers.remove(key).add(key, value.getBytes(StandardCharsets.UTF_8));
public void set(final ProducerRecord carrier, final String key, final String value) {
carrier.headers().remove(key).add(key, value.getBytes(StandardCharsets.UTF_8));
}
}

View File

@@ -1,9 +1,13 @@
package datadog.trace.instrumentation.kafka_clients;
import io.opentracing.Scope;
import io.opentracing.SpanContext;
import io.opentracing.propagation.Format;
import io.opentracing.util.GlobalTracer;
import static datadog.trace.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.instrumentation.api.AgentTracer.propagate;
import static datadog.trace.instrumentation.api.AgentTracer.startSpan;
import static datadog.trace.instrumentation.kafka_clients.TextMapExtractAdapter.GETTER;
import datadog.trace.instrumentation.api.AgentScope;
import datadog.trace.instrumentation.api.AgentSpan;
import datadog.trace.instrumentation.api.AgentSpan.Context;
import java.util.Iterator;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -18,7 +22,7 @@ public class TracingIterator implements Iterator<ConsumerRecord> {
* Note: this may potentially create problems if this iterator is used from different threads. But
* at the moment we cannot do much about this.
*/
private Scope currentScope;
private AgentScope currentScope;
public TracingIterator(
final Iterator<ConsumerRecord> delegateIterator,
@@ -50,13 +54,11 @@ public class TracingIterator implements Iterator<ConsumerRecord> {
try {
if (next != null) {
final SpanContext spanContext =
GlobalTracer.get()
.extract(Format.Builtin.TEXT_MAP, new TextMapExtractAdapter(next.headers()));
currentScope =
GlobalTracer.get().buildSpan(operationName).asChildOf(spanContext).startActive(true);
decorator.afterStart(currentScope);
decorator.onConsume(currentScope, next);
final Context spanContext = propagate().extract(next, GETTER);
final AgentSpan span = startSpan(operationName, spanContext);
decorator.afterStart(span);
decorator.onConsume(span, next);
currentScope = activateSpan(span, true);
}
} catch (final Exception e) {
log.debug("Error during decoration", e);