diff --git a/dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/KafkaStreamsSourceNodeRecordDeserializerInstrumentation.java b/dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/KafkaStreamsSourceNodeRecordDeserializerInstrumentation.java
index 8ed9d375ae..20449ceb7c 100644
--- a/dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/KafkaStreamsSourceNodeRecordDeserializerInstrumentation.java
+++ b/dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/KafkaStreamsSourceNodeRecordDeserializerInstrumentation.java
@@ -17,56 +17,54 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.record.TimestampType;
 
 // This is necessary because SourceNodeRecordDeserializer drops the headers. :-(
-public class KafkaStreamsSourceNodeRecordDeserializerInstrumentation {
+@AutoService(Instrumenter.class)
+public class KafkaStreamsSourceNodeRecordDeserializerInstrumentation
+    extends Instrumenter.Configurable {
 
-  @AutoService(Instrumenter.class)
-  public static class StartInstrumentation extends Instrumenter.Configurable {
+  public KafkaStreamsSourceNodeRecordDeserializerInstrumentation() {
+    super("kafka", "kafka-streams");
+  }
 
-    public StartInstrumentation() {
-      super("kafka", "kafka-streams");
-    }
+  @Override
+  public AgentBuilder apply(final AgentBuilder agentBuilder) {
+    return agentBuilder
+        .type(
+            named("org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer"),
+            classLoaderHasClasses("org.apache.kafka.streams.state.internals.KeyValueIterators"))
+        .transform(DDTransformers.defaultTransformers())
+        .transform(
+            DDAdvice.create()
+                .advice(
+                    isMethod()
+                        .and(isPublic())
+                        .and(named("deserialize"))
+                        .and(
+                            takesArgument(
+                                0, named("org.apache.kafka.clients.consumer.ConsumerRecord")))
+                        .and(returns(named("org.apache.kafka.clients.consumer.ConsumerRecord"))),
+                    SaveHeadersAdvice.class.getName()))
+        .asDecorator();
+  }
 
-    @Override
-    public AgentBuilder apply(final AgentBuilder agentBuilder) {
-      return agentBuilder
-          .type(
-              named("org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer"),
-              classLoaderHasClasses("org.apache.kafka.streams.state.internals.KeyValueIterators"))
-          .transform(DDTransformers.defaultTransformers())
-          .transform(
-              DDAdvice.create()
-                  .advice(
-                      isMethod()
-                          .and(isPublic())
-                          .and(named("deserialize"))
-                          .and(
-                              takesArgument(
-                                  0, named("org.apache.kafka.clients.consumer.ConsumerRecord")))
-                          .and(returns(named("org.apache.kafka.clients.consumer.ConsumerRecord"))),
-                      SaveHeadersAdvice.class.getName()))
-          .asDecorator();
-    }
+  public static class SaveHeadersAdvice {
 
-    public static class SaveHeadersAdvice {
-
-      @Advice.OnMethodExit(suppress = Throwable.class)
-      public static void saveHeaders(
-          @Advice.Argument(0) final ConsumerRecord incoming,
-          @Advice.Return(readOnly = false) ConsumerRecord result) {
-        result =
-            new ConsumerRecord<>(
-                result.topic(),
-                result.partition(),
-                result.offset(),
-                result.timestamp(),
-                TimestampType.CREATE_TIME,
-                result.checksum(),
-                result.serializedKeySize(),
-                result.serializedValueSize(),
-                result.key(),
-                result.value(),
-                incoming.headers());
-      }
+    @Advice.OnMethodExit(suppress = Throwable.class)
+    public static void saveHeaders(
+        @Advice.Argument(0) final ConsumerRecord incoming,
+        @Advice.Return(readOnly = false) ConsumerRecord result) {
+      result =
+          new ConsumerRecord<>(
+              result.topic(),
+              result.partition(),
+              result.offset(),
+              result.timestamp(),
+              TimestampType.CREATE_TIME,
+              result.checksum(),
+              result.serializedKeySize(),
+              result.serializedValueSize(),
+              result.key(),
+              result.value(),
+              incoming.headers());
     }
   }
 }
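For context, not part of the patch: as the comment in the diff notes, SourceNodeRecordDeserializer rebuilds the ConsumerRecord from the deserialized key and value without forwarding the incoming record's headers, which is where the tracer keeps its propagated context. The exit advice compensates by re-wrapping the returned record and carrying incoming.headers() over. Below is a minimal standalone sketch of that same re-wrapping, assuming a kafka-clients version in the 0.11/1.x range (where these ConsumerRecord constructors and checksum() still exist); the class, topic, and header names are made up for illustration.

import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.record.TimestampType;

// Hypothetical illustration only -- not part of the instrumentation above.
public class HeaderReWrapSketch {
  public static void main(final String[] args) {
    // A record as the consumer would hand it to Kafka Streams, carrying one header
    // (the header name is invented for this example).
    final ConsumerRecord<byte[], byte[]> incoming =
        new ConsumerRecord<>(
            "example-topic",
            0,
            0L,
            "key".getBytes(StandardCharsets.UTF_8),
            "value".getBytes(StandardCharsets.UTF_8));
    incoming.headers().add(new RecordHeader("x-example-trace-id", "123".getBytes(StandardCharsets.UTF_8)));

    // The deserializer builds a fresh record and does not pass the headers along;
    // the advice re-wraps the result and supplies incoming.headers() as the last
    // constructor argument, exactly as done in SaveHeadersAdvice.
    final ConsumerRecord<byte[], byte[]> rewrapped =
        new ConsumerRecord<>(
            incoming.topic(),
            incoming.partition(),
            incoming.offset(),
            incoming.timestamp(),
            TimestampType.CREATE_TIME,
            incoming.checksum(),
            incoming.serializedKeySize(),
            incoming.serializedValueSize(),
            incoming.key(),
            incoming.value(),
            incoming.headers());

    // The header survives the re-wrap, so downstream processors can still read it.
    System.out.println(rewrapped.headers().lastHeader("x-example-trace-id") != null);
  }
}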