Add async propagation flags for kafka consumer.
This commit is contained in:
parent
b3f15ca133
commit
7095ea3426
|
|
@ -59,6 +59,7 @@ public class TracingIterator implements Iterator<ConsumerRecord> {
|
||||||
decorator.afterStart(span);
|
decorator.afterStart(span);
|
||||||
decorator.onConsume(span, next);
|
decorator.onConsume(span, next);
|
||||||
currentScope = activateSpan(span, true);
|
currentScope = activateSpan(span, true);
|
||||||
|
currentScope.setAsyncPropagation(true);
|
||||||
}
|
}
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
log.debug("Error during decoration", e);
|
log.debug("Error during decoration", e);
|
||||||
|
|
|
||||||
|
|
@ -81,7 +81,7 @@ public class KafkaStreamsProcessorInstrumentation {
|
||||||
CONSUMER_DECORATE.afterStart(span);
|
CONSUMER_DECORATE.afterStart(span);
|
||||||
CONSUMER_DECORATE.onConsume(span, record);
|
CONSUMER_DECORATE.onConsume(span, record);
|
||||||
|
|
||||||
activateSpan(span, true);
|
activateSpan(span, true).setAsyncPropagation(true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -119,6 +119,7 @@ public class KafkaStreamsProcessorInstrumentation {
|
||||||
|
|
||||||
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
|
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
|
||||||
public static void stopSpan(@Advice.Thrown final Throwable throwable) {
|
public static void stopSpan(@Advice.Thrown final Throwable throwable) {
|
||||||
|
// This is dangerous... we assume the span/scope is the one we expect, but it may not be.
|
||||||
final AgentSpan span = activeSpan();
|
final AgentSpan span = activeSpan();
|
||||||
if (span != null) {
|
if (span != null) {
|
||||||
CONSUMER_DECORATE.onError(span, throwable);
|
CONSUMER_DECORATE.onError(span, throwable);
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue