Merge pull request #836 from DataDog/mar-kolya/do-not-send-headers-to-old-kafka

Do not send headers to old kafka servers
This commit is contained in:
Nikolay Martynov 2019-05-17 11:55:52 -04:00 committed by GitHub
commit 6287e552fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 29 additions and 21 deletions

View File

@ -17,9 +17,11 @@ import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.record.RecordBatch;
@AutoService(Instrumenter.class)
public final class KafkaProducerInstrumentation extends Instrumenter.Default {
@ -61,6 +63,7 @@ public final class KafkaProducerInstrumentation extends Instrumenter.Default {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static Scope startSpan(
@Advice.FieldValue("apiVersions") final ApiVersions apiVersions,
@Advice.Argument(value = 0, readOnly = false) ProducerRecord record,
@Advice.Argument(value = 1, readOnly = false) Callback callback) {
final Scope scope = GlobalTracer.get().buildSpan("kafka.produce").startActive(false);
@ -69,6 +72,10 @@ public final class KafkaProducerInstrumentation extends Instrumenter.Default {
callback = new ProducerCallback(callback, scope);
// Do not inject headers for batch versions below 2
// This is how similar check is being done in Kafka client itself:
// https://github.com/apache/kafka/blob/05fcfde8f69b0349216553f711fdfc3f0259c601/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java#L411-L412
if (apiVersions.maxUsableProduceMagic() >= RecordBatch.MAGIC_VALUE_V2) {
try {
GlobalTracer.get()
.inject(
@ -92,6 +99,7 @@ public final class KafkaProducerInstrumentation extends Instrumenter.Default {
Format.Builtin.TEXT_MAP,
new TextMapInjectAdapter(record.headers()));
}
}
return scope;
}