Handle Scope in Kafka producer properly

Holding onto scope in `Callback` is bad because that code may run on
different thread.
This commit is contained in:
Nikolay Martynov 2019-05-24 16:27:28 -04:00
parent a3a98ceac8
commit 0b85f048d1
1 changed files with 14 additions and 12 deletions

View File

@ -10,6 +10,7 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import com.google.auto.service.AutoService; import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.agent.tooling.Instrumenter;
import io.opentracing.Scope; import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.propagation.Format; import io.opentracing.propagation.Format;
import io.opentracing.util.GlobalTracer; import io.opentracing.util.GlobalTracer;
import java.util.Map; import java.util.Map;
@ -70,7 +71,7 @@ public final class KafkaProducerInstrumentation extends Instrumenter.Default {
PRODUCER_DECORATE.afterStart(scope); PRODUCER_DECORATE.afterStart(scope);
PRODUCER_DECORATE.onProduce(scope, record); PRODUCER_DECORATE.onProduce(scope, record);
callback = new ProducerCallback(callback, scope); callback = new ProducerCallback(callback, scope.span());
// Do not inject headers for batch versions below 2 // Do not inject headers for batch versions below 2
// This is how similar check is being done in Kafka client itself: // This is how similar check is being done in Kafka client itself:
@ -115,24 +116,25 @@ public final class KafkaProducerInstrumentation extends Instrumenter.Default {
public static class ProducerCallback implements Callback { public static class ProducerCallback implements Callback {
private final Callback callback; private final Callback callback;
private final Scope scope; private final Span span;
public ProducerCallback(final Callback callback, final Scope scope) { public ProducerCallback(final Callback callback, final Span span) {
this.callback = callback; this.callback = callback;
this.scope = scope; this.span = span;
} }
@Override @Override
public void onCompletion(final RecordMetadata metadata, final Exception exception) { public void onCompletion(final RecordMetadata metadata, final Exception exception) {
PRODUCER_DECORATE.onError(scope, exception); try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) {
try { PRODUCER_DECORATE.onError(span, exception);
if (callback != null) { try {
callback.onCompletion(metadata, exception); if (callback != null) {
callback.onCompletion(metadata, exception);
}
} finally {
PRODUCER_DECORATE.beforeFinish(span);
span.finish();
} }
} finally {
PRODUCER_DECORATE.beforeFinish(scope);
scope.span().finish();
scope.close();
} }
} }
} }