Improve naming for instrumentation class.

This commit is contained in:
Tyler Benson 2018-03-29 12:43:32 +08:00
parent 513cded8aa
commit a1a2a0db27
1 changed files with 44 additions and 46 deletions

View File

@ -17,56 +17,54 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.record.TimestampType;
// This is necessary because SourceNodeRecordDeserializer drops the headers. :-( // This is necessary because SourceNodeRecordDeserializer drops the headers. :-(
public class KafkaStreamsSourceNodeRecordDeserializerInstrumentation { @AutoService(Instrumenter.class)
public class KafkaStreamsSourceNodeRecordDeserializerInstrumentation
extends Instrumenter.Configurable {
@AutoService(Instrumenter.class) public KafkaStreamsSourceNodeRecordDeserializerInstrumentation() {
public static class StartInstrumentation extends Instrumenter.Configurable { super("kafka", "kafka-streams");
}
public StartInstrumentation() { @Override
super("kafka", "kafka-streams"); public AgentBuilder apply(final AgentBuilder agentBuilder) {
} return agentBuilder
.type(
named("org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer"),
classLoaderHasClasses("org.apache.kafka.streams.state.internals.KeyValueIterators"))
.transform(DDTransformers.defaultTransformers())
.transform(
DDAdvice.create()
.advice(
isMethod()
.and(isPublic())
.and(named("deserialize"))
.and(
takesArgument(
0, named("org.apache.kafka.clients.consumer.ConsumerRecord")))
.and(returns(named("org.apache.kafka.clients.consumer.ConsumerRecord"))),
SaveHeadersAdvice.class.getName()))
.asDecorator();
}
@Override public static class SaveHeadersAdvice {
public AgentBuilder apply(final AgentBuilder agentBuilder) {
return agentBuilder
.type(
named("org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer"),
classLoaderHasClasses("org.apache.kafka.streams.state.internals.KeyValueIterators"))
.transform(DDTransformers.defaultTransformers())
.transform(
DDAdvice.create()
.advice(
isMethod()
.and(isPublic())
.and(named("deserialize"))
.and(
takesArgument(
0, named("org.apache.kafka.clients.consumer.ConsumerRecord")))
.and(returns(named("org.apache.kafka.clients.consumer.ConsumerRecord"))),
SaveHeadersAdvice.class.getName()))
.asDecorator();
}
public static class SaveHeadersAdvice { @Advice.OnMethodExit(suppress = Throwable.class)
public static void saveHeaders(
@Advice.OnMethodExit(suppress = Throwable.class) @Advice.Argument(0) final ConsumerRecord incoming,
public static void saveHeaders( @Advice.Return(readOnly = false) ConsumerRecord result) {
@Advice.Argument(0) final ConsumerRecord incoming, result =
@Advice.Return(readOnly = false) ConsumerRecord result) { new ConsumerRecord<>(
result = result.topic(),
new ConsumerRecord<>( result.partition(),
result.topic(), result.offset(),
result.partition(), result.timestamp(),
result.offset(), TimestampType.CREATE_TIME,
result.timestamp(), result.checksum(),
TimestampType.CREATE_TIME, result.serializedKeySize(),
result.checksum(), result.serializedValueSize(),
result.serializedKeySize(), result.key(),
result.serializedValueSize(), result.value(),
result.key(), incoming.headers());
result.value(),
incoming.headers());
}
} }
} }
} }