From 55d7c2a6f554757897d2120d1d0cde8703518e77 Mon Sep 17 00:00:00 2001 From: Nikolay Martynov Date: Thu, 16 May 2019 13:55:47 -0400 Subject: [PATCH 1/2] Do not send headers to old kafka servers Kafka message bundles with versions below 2 (e.d. 0.10) do not support headers so do not inject them. Otherwise client gets really upset. This is how similar check is being done in Kafka client itself: https://github.com/apache/kafka/blob/05fcfde8f69b0349216553f711fdfc3f0259c601/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java#L411-L412 --- .../KafkaProducerInstrumentation.java | 48 +++++++++++-------- 1 file changed, 27 insertions(+), 21 deletions(-) diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java index 4e29178577..4fb2334859 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java @@ -17,9 +17,11 @@ import net.bytebuddy.asm.Advice; import net.bytebuddy.description.method.MethodDescription; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; +import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.record.RecordBatch; @AutoService(Instrumenter.class) public final class KafkaProducerInstrumentation extends Instrumenter.Default { @@ -61,6 +63,7 @@ public final class KafkaProducerInstrumentation extends Instrumenter.Default { @Advice.OnMethodEnter(suppress = Throwable.class) public static Scope startSpan( + @Advice.FieldValue("apiVersions") final ApiVersions apiVersions, @Advice.Argument(value = 0, readOnly = false) ProducerRecord record, @Advice.Argument(value = 1, readOnly = false) Callback callback) { final Scope scope = GlobalTracer.get().buildSpan("kafka.produce").startActive(false); @@ -69,28 +72,31 @@ public final class KafkaProducerInstrumentation extends Instrumenter.Default { callback = new ProducerCallback(callback, scope); - try { - GlobalTracer.get() - .inject( - scope.span().context(), - Format.Builtin.TEXT_MAP, - new TextMapInjectAdapter(record.headers())); - } catch (final IllegalStateException e) { - // headers must be read-only from reused record. try again with new one. - record = - new ProducerRecord<>( - record.topic(), - record.partition(), - record.timestamp(), - record.key(), - record.value(), - record.headers()); + // Do not inject headers for batch versions below 2 + if (apiVersions.maxUsableProduceMagic() >= RecordBatch.MAGIC_VALUE_V2) { + try { + GlobalTracer.get() + .inject( + scope.span().context(), + Format.Builtin.TEXT_MAP, + new TextMapInjectAdapter(record.headers())); + } catch (final IllegalStateException e) { + // headers must be read-only from reused record. try again with new one. + record = + new ProducerRecord<>( + record.topic(), + record.partition(), + record.timestamp(), + record.key(), + record.value(), + record.headers()); - GlobalTracer.get() - .inject( - scope.span().context(), - Format.Builtin.TEXT_MAP, - new TextMapInjectAdapter(record.headers())); + GlobalTracer.get() + .inject( + scope.span().context(), + Format.Builtin.TEXT_MAP, + new TextMapInjectAdapter(record.headers())); + } } return scope; From c02110a2a1d4847054f145ed2a2dbceb84844b19 Mon Sep 17 00:00:00 2001 From: Nikolay Martynov Date: Fri, 17 May 2019 11:10:12 -0400 Subject: [PATCH 2/2] Add comment explaining Kafka versions limitations check --- .../kafka_clients/KafkaProducerInstrumentation.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java index 4fb2334859..a0c8d7f4c1 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java @@ -73,6 +73,8 @@ public final class KafkaProducerInstrumentation extends Instrumenter.Default { callback = new ProducerCallback(callback, scope); // Do not inject headers for batch versions below 2 + // This is how similar check is being done in Kafka client itself: + // https://github.com/apache/kafka/blob/05fcfde8f69b0349216553f711fdfc3f0259c601/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java#L411-L412 if (apiVersions.maxUsableProduceMagic() >= RecordBatch.MAGIC_VALUE_V2) { try { GlobalTracer.get()