diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java index 4e29178577..a0c8d7f4c1 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java @@ -17,9 +17,11 @@ import net.bytebuddy.asm.Advice; import net.bytebuddy.description.method.MethodDescription; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; +import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.record.RecordBatch; @AutoService(Instrumenter.class) public final class KafkaProducerInstrumentation extends Instrumenter.Default { @@ -61,6 +63,7 @@ public final class KafkaProducerInstrumentation extends Instrumenter.Default { @Advice.OnMethodEnter(suppress = Throwable.class) public static Scope startSpan( + @Advice.FieldValue("apiVersions") final ApiVersions apiVersions, @Advice.Argument(value = 0, readOnly = false) ProducerRecord record, @Advice.Argument(value = 1, readOnly = false) Callback callback) { final Scope scope = GlobalTracer.get().buildSpan("kafka.produce").startActive(false); @@ -69,28 +72,33 @@ public final class KafkaProducerInstrumentation extends Instrumenter.Default { callback = new ProducerCallback(callback, scope); - try { - GlobalTracer.get() - .inject( - scope.span().context(), - Format.Builtin.TEXT_MAP, - new TextMapInjectAdapter(record.headers())); - } catch (final IllegalStateException e) { - // headers must be read-only from reused record. try again with new one. - record = - new ProducerRecord<>( - record.topic(), - record.partition(), - record.timestamp(), - record.key(), - record.value(), - record.headers()); + // Do not inject headers for batch versions below 2 + // This is how similar check is being done in Kafka client itself: + // https://github.com/apache/kafka/blob/05fcfde8f69b0349216553f711fdfc3f0259c601/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java#L411-L412 + if (apiVersions.maxUsableProduceMagic() >= RecordBatch.MAGIC_VALUE_V2) { + try { + GlobalTracer.get() + .inject( + scope.span().context(), + Format.Builtin.TEXT_MAP, + new TextMapInjectAdapter(record.headers())); + } catch (final IllegalStateException e) { + // headers must be read-only from reused record. try again with new one. + record = + new ProducerRecord<>( + record.topic(), + record.partition(), + record.timestamp(), + record.key(), + record.value(), + record.headers()); - GlobalTracer.get() - .inject( - scope.span().context(), - Format.Builtin.TEXT_MAP, - new TextMapInjectAdapter(record.headers())); + GlobalTracer.get() + .inject( + scope.span().context(), + Format.Builtin.TEXT_MAP, + new TextMapInjectAdapter(record.headers())); + } } return scope;