Do not send headers to old kafka servers

Kafka message bundles with versions below 2 (e.d. 0.10) do not support
headers so do not inject them. Otherwise client gets really upset.

This is how similar check is being done in Kafka client itself:
05fcfde8f6/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java (L411-L412)
This commit is contained in:
Nikolay Martynov 2019-05-16 13:55:47 -04:00
parent 4a8bea4602
commit 55d7c2a6f5
1 changed files with 27 additions and 21 deletions

View File

@ -17,9 +17,11 @@ import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.record.RecordBatch;
@AutoService(Instrumenter.class)
public final class KafkaProducerInstrumentation extends Instrumenter.Default {
@ -61,6 +63,7 @@ public final class KafkaProducerInstrumentation extends Instrumenter.Default {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static Scope startSpan(
@Advice.FieldValue("apiVersions") final ApiVersions apiVersions,
@Advice.Argument(value = 0, readOnly = false) ProducerRecord record,
@Advice.Argument(value = 1, readOnly = false) Callback callback) {
final Scope scope = GlobalTracer.get().buildSpan("kafka.produce").startActive(false);
@ -69,28 +72,31 @@ public final class KafkaProducerInstrumentation extends Instrumenter.Default {
callback = new ProducerCallback(callback, scope);
try {
GlobalTracer.get()
.inject(
scope.span().context(),
Format.Builtin.TEXT_MAP,
new TextMapInjectAdapter(record.headers()));
} catch (final IllegalStateException e) {
// headers must be read-only from reused record. try again with new one.
record =
new ProducerRecord<>(
record.topic(),
record.partition(),
record.timestamp(),
record.key(),
record.value(),
record.headers());
// Do not inject headers for batch versions below 2
if (apiVersions.maxUsableProduceMagic() >= RecordBatch.MAGIC_VALUE_V2) {
try {
GlobalTracer.get()
.inject(
scope.span().context(),
Format.Builtin.TEXT_MAP,
new TextMapInjectAdapter(record.headers()));
} catch (final IllegalStateException e) {
// headers must be read-only from reused record. try again with new one.
record =
new ProducerRecord<>(
record.topic(),
record.partition(),
record.timestamp(),
record.key(),
record.value(),
record.headers());
GlobalTracer.get()
.inject(
scope.span().context(),
Format.Builtin.TEXT_MAP,
new TextMapInjectAdapter(record.headers()));
GlobalTracer.get()
.inject(
scope.span().context(),
Format.Builtin.TEXT_MAP,
new TextMapInjectAdapter(record.headers()));
}
}
return scope;