diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaDecorator.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaDecorator.java index cbd222383a..bf3eeffff1 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaDecorator.java +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaDecorator.java @@ -3,8 +3,7 @@ package datadog.trace.instrumentation.kafka_clients; import datadog.trace.agent.decorator.ClientDecorator; import datadog.trace.api.DDSpanTypes; import datadog.trace.api.DDTags; -import io.opentracing.Scope; -import io.opentracing.Span; +import datadog.trace.instrumentation.api.AgentSpan; import io.opentracing.tag.Tags; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; @@ -54,8 +53,7 @@ public abstract class KafkaDecorator extends ClientDecorator { @Override protected abstract String spanKind(); - public void onConsume(final Scope scope, final ConsumerRecord record) { - final Span span = scope.span(); + public void onConsume(final AgentSpan span, final ConsumerRecord record) { if (record != null) { final String topic = record.topic() == null ? "kafka" : record.topic(); span.setTag(DDTags.RESOURCE_NAME, "Consume Topic " + topic); @@ -64,9 +62,8 @@ public abstract class KafkaDecorator extends ClientDecorator { } } - public void onProduce(final Scope scope, final ProducerRecord record) { + public void onProduce(final AgentSpan span, final ProducerRecord record) { if (record != null) { - final Span span = scope.span(); final String topic = record.topic() == null ? "kafka" : record.topic(); if (record.partition() != null) { diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java index f555215c4b..e592d95fb1 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java @@ -1,6 +1,10 @@ package datadog.trace.instrumentation.kafka_clients; +import static datadog.trace.instrumentation.api.AgentTracer.activateSpan; +import static datadog.trace.instrumentation.api.AgentTracer.propagate; +import static datadog.trace.instrumentation.api.AgentTracer.startSpan; import static datadog.trace.instrumentation.kafka_clients.KafkaDecorator.PRODUCER_DECORATE; +import static datadog.trace.instrumentation.kafka_clients.TextMapInjectAdapter.SETTER; import static java.util.Collections.singletonMap; import static net.bytebuddy.matcher.ElementMatchers.isMethod; import static net.bytebuddy.matcher.ElementMatchers.isPublic; @@ -9,10 +13,8 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArgument; import com.google.auto.service.AutoService; import datadog.trace.agent.tooling.Instrumenter; -import io.opentracing.Scope; -import io.opentracing.Span; -import io.opentracing.propagation.Format; -import io.opentracing.util.GlobalTracer; +import datadog.trace.instrumentation.api.AgentScope; +import datadog.trace.instrumentation.api.AgentSpan; import java.util.Map; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.method.MethodDescription; @@ -63,26 +65,22 @@ public final class KafkaProducerInstrumentation extends Instrumenter.Default { public static class ProducerAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) - public static Scope startSpan( + public static AgentScope onEnter( @Advice.FieldValue("apiVersions") final ApiVersions apiVersions, @Advice.Argument(value = 0, readOnly = false) ProducerRecord record, @Advice.Argument(value = 1, readOnly = false) Callback callback) { - final Scope scope = GlobalTracer.get().buildSpan("kafka.produce").startActive(false); - PRODUCER_DECORATE.afterStart(scope); - PRODUCER_DECORATE.onProduce(scope, record); + final AgentSpan span = startSpan("kafka.produce"); + PRODUCER_DECORATE.afterStart(span); + PRODUCER_DECORATE.onProduce(span, record); - callback = new ProducerCallback(callback, scope.span()); + callback = new ProducerCallback(callback, span); // Do not inject headers for batch versions below 2 // This is how similar check is being done in Kafka client itself: // https://github.com/apache/kafka/blob/05fcfde8f69b0349216553f711fdfc3f0259c601/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java#L411-L412 if (apiVersions.maxUsableProduceMagic() >= RecordBatch.MAGIC_VALUE_V2) { try { - GlobalTracer.get() - .inject( - scope.span().context(), - Format.Builtin.TEXT_MAP, - new TextMapInjectAdapter(record.headers())); + propagate().inject(span, record, SETTER); } catch (final IllegalStateException e) { // headers must be read-only from reused record. try again with new one. record = @@ -94,20 +92,16 @@ public final class KafkaProducerInstrumentation extends Instrumenter.Default { record.value(), record.headers()); - GlobalTracer.get() - .inject( - scope.span().context(), - Format.Builtin.TEXT_MAP, - new TextMapInjectAdapter(record.headers())); + propagate().inject(span, record, SETTER); } } - return scope; + return activateSpan(span, false); } @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) public static void stopSpan( - @Advice.Enter final Scope scope, @Advice.Thrown final Throwable throwable) { + @Advice.Enter final AgentScope scope, @Advice.Thrown final Throwable throwable) { PRODUCER_DECORATE.onError(scope, throwable); PRODUCER_DECORATE.beforeFinish(scope); scope.close(); @@ -116,16 +110,16 @@ public final class KafkaProducerInstrumentation extends Instrumenter.Default { public static class ProducerCallback implements Callback { private final Callback callback; - private final Span span; + private final AgentSpan span; - public ProducerCallback(final Callback callback, final Span span) { + public ProducerCallback(final Callback callback, final AgentSpan span) { this.callback = callback; this.span = span; } @Override public void onCompletion(final RecordMetadata metadata, final Exception exception) { - try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) { + try (final AgentScope scope = activateSpan(span, false)) { PRODUCER_DECORATE.onError(span, exception); try { if (callback != null) { diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TextMapExtractAdapter.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TextMapExtractAdapter.java index c16034c0d4..0d718a7bb4 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TextMapExtractAdapter.java +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TextMapExtractAdapter.java @@ -1,30 +1,31 @@ package datadog.trace.instrumentation.kafka_clients; -import io.opentracing.propagation.TextMap; +import datadog.trace.instrumentation.api.AgentPropagation; import java.nio.charset.StandardCharsets; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; +import java.util.ArrayList; +import java.util.List; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.header.Header; -import org.apache.kafka.common.header.Headers; -public class TextMapExtractAdapter implements TextMap { +public class TextMapExtractAdapter implements AgentPropagation.Getter { - private final Map map = new HashMap<>(); + public static final TextMapExtractAdapter GETTER = new TextMapExtractAdapter(); - public TextMapExtractAdapter(final Headers headers) { - for (final Header header : headers) { - map.put(header.key(), new String(header.value(), StandardCharsets.UTF_8)); + @Override + public Iterable keys(final ConsumerRecord carrier) { + final List keys = new ArrayList<>(); + for (final Header header : carrier.headers()) { + keys.add(header.key()); } + return keys; } @Override - public Iterator> iterator() { - return map.entrySet().iterator(); - } - - @Override - public void put(final String key, final String value) { - throw new UnsupportedOperationException("Use inject adapter instead"); + public String get(final ConsumerRecord carrier, final String key) { + final Header header = carrier.headers().lastHeader(key); + if (header == null) { + return null; + } + return new String(header.value(), StandardCharsets.UTF_8); } } diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TextMapInjectAdapter.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TextMapInjectAdapter.java index e270655a3c..90750fa5ed 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TextMapInjectAdapter.java +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TextMapInjectAdapter.java @@ -1,26 +1,15 @@ package datadog.trace.instrumentation.kafka_clients; -import io.opentracing.propagation.TextMap; +import datadog.trace.instrumentation.api.AgentPropagation; import java.nio.charset.StandardCharsets; -import java.util.Iterator; -import java.util.Map; -import org.apache.kafka.common.header.Headers; +import org.apache.kafka.clients.producer.ProducerRecord; -public class TextMapInjectAdapter implements TextMap { +public class TextMapInjectAdapter implements AgentPropagation.Setter { - private final Headers headers; - - public TextMapInjectAdapter(final Headers headers) { - this.headers = headers; - } + public static final TextMapInjectAdapter SETTER = new TextMapInjectAdapter(); @Override - public Iterator> iterator() { - throw new UnsupportedOperationException("Use extract adapter instead"); - } - - @Override - public void put(final String key, final String value) { - headers.remove(key).add(key, value.getBytes(StandardCharsets.UTF_8)); + public void set(final ProducerRecord carrier, final String key, final String value) { + carrier.headers().remove(key).add(key, value.getBytes(StandardCharsets.UTF_8)); } } diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterator.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterator.java index d6fca5a3b9..cff3770e14 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterator.java +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterator.java @@ -1,9 +1,13 @@ package datadog.trace.instrumentation.kafka_clients; -import io.opentracing.Scope; -import io.opentracing.SpanContext; -import io.opentracing.propagation.Format; -import io.opentracing.util.GlobalTracer; +import static datadog.trace.instrumentation.api.AgentTracer.activateSpan; +import static datadog.trace.instrumentation.api.AgentTracer.propagate; +import static datadog.trace.instrumentation.api.AgentTracer.startSpan; +import static datadog.trace.instrumentation.kafka_clients.TextMapExtractAdapter.GETTER; + +import datadog.trace.instrumentation.api.AgentScope; +import datadog.trace.instrumentation.api.AgentSpan; +import datadog.trace.instrumentation.api.AgentSpan.Context; import java.util.Iterator; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -18,7 +22,7 @@ public class TracingIterator implements Iterator { * Note: this may potentially create problems if this iterator is used from different threads. But * at the moment we cannot do much about this. */ - private Scope currentScope; + private AgentScope currentScope; public TracingIterator( final Iterator delegateIterator, @@ -50,13 +54,11 @@ public class TracingIterator implements Iterator { try { if (next != null) { - final SpanContext spanContext = - GlobalTracer.get() - .extract(Format.Builtin.TEXT_MAP, new TextMapExtractAdapter(next.headers())); - currentScope = - GlobalTracer.get().buildSpan(operationName).asChildOf(spanContext).startActive(true); - decorator.afterStart(currentScope); - decorator.onConsume(currentScope, next); + final Context spanContext = propagate().extract(next, GETTER); + final AgentSpan span = startSpan(operationName, spanContext); + decorator.afterStart(span); + decorator.onConsume(span, next); + currentScope = activateSpan(span, true); } } catch (final Exception e) { log.debug("Error during decoration", e);