Update kafka-streams-0.11 to new agent api
This commit is contained in:
parent
7259e288ff
commit
5f0a2ae6b4
|
|
@ -3,8 +3,7 @@ package datadog.trace.instrumentation.kafka_streams;
|
|||
import datadog.trace.agent.decorator.ClientDecorator;
|
||||
import datadog.trace.api.DDSpanTypes;
|
||||
import datadog.trace.api.DDTags;
|
||||
import io.opentracing.Scope;
|
||||
import io.opentracing.Span;
|
||||
import datadog.trace.instrumentation.api.AgentSpan;
|
||||
import io.opentracing.tag.Tags;
|
||||
import org.apache.kafka.streams.processor.internals.StampedRecord;
|
||||
|
||||
|
|
@ -36,8 +35,7 @@ public class KafkaStreamsDecorator extends ClientDecorator {
|
|||
return DDSpanTypes.MESSAGE_CONSUMER;
|
||||
}
|
||||
|
||||
public void onConsume(final Scope scope, final StampedRecord record) {
|
||||
final Span span = scope.span();
|
||||
public void onConsume(final AgentSpan span, final StampedRecord record) {
|
||||
if (record != null) {
|
||||
final String topic = record.topic() == null ? "kafka" : record.topic();
|
||||
span.setTag(DDTags.RESOURCE_NAME, "Consume Topic " + topic);
|
||||
|
|
|
|||
|
|
@ -1,6 +1,12 @@
|
|||
package datadog.trace.instrumentation.kafka_streams;
|
||||
|
||||
import static datadog.trace.instrumentation.api.AgentTracer.activateSpan;
|
||||
import static datadog.trace.instrumentation.api.AgentTracer.activeScope;
|
||||
import static datadog.trace.instrumentation.api.AgentTracer.activeSpan;
|
||||
import static datadog.trace.instrumentation.api.AgentTracer.propagate;
|
||||
import static datadog.trace.instrumentation.api.AgentTracer.startSpan;
|
||||
import static datadog.trace.instrumentation.kafka_streams.KafkaStreamsDecorator.CONSUMER_DECORATE;
|
||||
import static datadog.trace.instrumentation.kafka_streams.TextMapExtractAdapter.GETTER;
|
||||
import static java.util.Collections.singletonMap;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.isPackagePrivate;
|
||||
|
|
@ -11,10 +17,9 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
|
|||
|
||||
import com.google.auto.service.AutoService;
|
||||
import datadog.trace.agent.tooling.Instrumenter;
|
||||
import io.opentracing.Scope;
|
||||
import io.opentracing.SpanContext;
|
||||
import io.opentracing.propagation.Format;
|
||||
import io.opentracing.util.GlobalTracer;
|
||||
import datadog.trace.context.TraceScope;
|
||||
import datadog.trace.instrumentation.api.AgentSpan;
|
||||
import datadog.trace.instrumentation.api.AgentSpan.Context;
|
||||
import java.util.Map;
|
||||
import net.bytebuddy.asm.Advice;
|
||||
import net.bytebuddy.description.method.MethodDescription;
|
||||
|
|
@ -65,23 +70,18 @@ public class KafkaStreamsProcessorInstrumentation {
|
|||
public static class StartSpanAdvice {
|
||||
|
||||
@Advice.OnMethodExit(suppress = Throwable.class)
|
||||
public static void startSpan(@Advice.Return final StampedRecord record) {
|
||||
public static void onExit(@Advice.Return final StampedRecord record) {
|
||||
if (record == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
final SpanContext extractedContext =
|
||||
GlobalTracer.get()
|
||||
.extract(
|
||||
Format.Builtin.TEXT_MAP, new TextMapExtractAdapter(record.value.headers()));
|
||||
final Context extractedContext = propagate().extract(record.value, GETTER);
|
||||
|
||||
final Scope scope =
|
||||
GlobalTracer.get()
|
||||
.buildSpan("kafka.consume")
|
||||
.asChildOf(extractedContext)
|
||||
.startActive(true);
|
||||
CONSUMER_DECORATE.afterStart(scope);
|
||||
CONSUMER_DECORATE.onConsume(scope, record);
|
||||
final AgentSpan span = startSpan("kafka.consume", extractedContext);
|
||||
CONSUMER_DECORATE.afterStart(span);
|
||||
CONSUMER_DECORATE.onConsume(span, record);
|
||||
|
||||
activateSpan(span, true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -119,10 +119,13 @@ public class KafkaStreamsProcessorInstrumentation {
|
|||
|
||||
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
|
||||
public static void stopSpan(@Advice.Thrown final Throwable throwable) {
|
||||
final Scope scope = GlobalTracer.get().scopeManager().active();
|
||||
final AgentSpan span = activeSpan();
|
||||
if (span != null) {
|
||||
CONSUMER_DECORATE.onError(span, throwable);
|
||||
CONSUMER_DECORATE.beforeFinish(span);
|
||||
}
|
||||
final TraceScope scope = activeScope();
|
||||
if (scope != null) {
|
||||
CONSUMER_DECORATE.onError(scope, throwable);
|
||||
CONSUMER_DECORATE.beforeFinish(scope);
|
||||
scope.close();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,30 +1,31 @@
|
|||
package datadog.trace.instrumentation.kafka_streams;
|
||||
|
||||
import io.opentracing.propagation.TextMap;
|
||||
import datadog.trace.instrumentation.api.AgentPropagation;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.common.header.Header;
|
||||
import org.apache.kafka.common.header.Headers;
|
||||
|
||||
public class TextMapExtractAdapter implements TextMap {
|
||||
public class TextMapExtractAdapter implements AgentPropagation.Getter<ConsumerRecord> {
|
||||
|
||||
private final Map<String, String> map = new HashMap<>();
|
||||
public static final TextMapExtractAdapter GETTER = new TextMapExtractAdapter();
|
||||
|
||||
public TextMapExtractAdapter(final Headers headers) {
|
||||
for (final Header header : headers) {
|
||||
map.put(header.key(), new String(header.value(), StandardCharsets.UTF_8));
|
||||
@Override
|
||||
public Iterable<String> keys(final ConsumerRecord carrier) {
|
||||
final List<String> keys = new ArrayList<>();
|
||||
for (final Header header : carrier.headers()) {
|
||||
keys.add(header.key());
|
||||
}
|
||||
return keys;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<Map.Entry<String, String>> iterator() {
|
||||
return map.entrySet().iterator();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void put(final String key, final String value) {
|
||||
throw new UnsupportedOperationException("Use inject adapter instead");
|
||||
public String get(final ConsumerRecord carrier, final String key) {
|
||||
final Header header = carrier.headers().lastHeader(key);
|
||||
if (header == null) {
|
||||
return null;
|
||||
}
|
||||
return new String(header.value(), StandardCharsets.UTF_8);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue