Extract common consumer-related kafka-clients parts to a new module (#3817)

This commit is contained in:
Mateusz Rzeszutek 2021-08-14 05:28:25 +02:00 committed by GitHub
parent 6c8d2e1fc3
commit 88be2940eb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 183 additions and 16 deletions

View File

@@ -12,6 +12,8 @@ muzzle {
}
dependencies {
implementation(project(":instrumentation:kafka-clients:kafka-clients-common:javaagent"))
library("org.apache.kafka:kafka-clients:0.11.0.0")
testLibrary("org.springframework.kafka:spring-kafka:1.3.3.RELEASE")

View File

@@ -6,11 +6,11 @@
package io.opentelemetry.javaagent.instrumentation.kafkaclients;
import static io.opentelemetry.api.trace.SpanKind.CONSUMER;
import static io.opentelemetry.javaagent.instrumentation.kafkaclients.TextMapExtractAdapter.GETTER;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.tracer.BaseTracer;
import io.opentelemetry.javaagent.instrumentation.kafka.KafkaHeadersGetter;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -18,6 +18,7 @@ import org.apache.kafka.common.record.TimestampType;
public class KafkaConsumerTracer extends BaseTracer {
private static final KafkaConsumerTracer TRACER = new KafkaConsumerTracer();
private static final KafkaHeadersGetter GETTER = new KafkaHeadersGetter();
public static KafkaConsumerTracer tracer() {
return TRACER;
@@ -45,7 +46,7 @@ public class KafkaConsumerTracer extends BaseTracer {
private Context extractParent(ConsumerRecord<?, ?> record) {
if (KafkaClientsConfig.isPropagationEnabled()) {
return extract(record.headers(), GETTER);
return extract(record, GETTER);
} else {
return Context.current();
}

View File

@@ -5,7 +5,7 @@ plugins {
dependencies {
library("org.apache.kafka:kafka-clients:2.4.0")
testInstrumentation(project(":instrumentation:kafka-clients-0.11:javaagent"))
testInstrumentation(project(":instrumentation:kafka-clients:kafka-clients-0.11:javaagent"))
testLibrary("org.springframework.kafka:spring-kafka:2.4.0.RELEASE")
testLibrary("org.springframework.kafka:spring-kafka-test:2.4.0.RELEASE")

View File

@@ -0,0 +1,7 @@
// Build script for the new kafka-clients-common javaagent module: holds the
// consumer-related helpers shared by the kafka-clients instrumentations.
plugins {
id("otel.javaagent-instrumentation")
}
dependencies {
// compileOnly: the kafka-clients API is provided at runtime by the instrumented application
compileOnly("org.apache.kafka:kafka-clients:0.11.0.0")
}

View File

@@ -0,0 +1,30 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.kafka;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.checkerframework.checker.nullness.qual.Nullable;
/**
 * Extracts additional Kafka consumer span attributes not covered by the generic messaging
 * attributes extractor: the record's partition and, when the record's value is {@code null},
 * the Kafka "tombstone" marker attribute.
 */
public final class KafkaConsumerAdditionalAttributesExtractor
extends AttributesExtractor<ConsumerRecord<?, ?>, Void> {
@Override
protected void onStart(AttributesBuilder attributes, ConsumerRecord<?, ?> consumerRecord) {
// partition() returns an int; the semantic-convention attribute is typed as long
set(
attributes,
SemanticAttributes.MESSAGING_KAFKA_PARTITION,
(long) consumerRecord.partition());
// a record with a null value is a Kafka tombstone (deletion marker for log compaction)
if (consumerRecord.value() == null) {
set(attributes, SemanticAttributes.MESSAGING_KAFKA_TOMBSTONE, true);
}
}
// all attributes are known when the span starts; nothing to add at end
@Override
protected void onEnd(
AttributesBuilder attributes, ConsumerRecord<?, ?> consumerRecord, @Nullable Void unused) {}
}

View File

@@ -0,0 +1,75 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.kafka;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.checkerframework.checker.nullness.qual.Nullable;
/**
 * Maps {@link ConsumerRecord} properties onto the stable messaging semantic-convention
 * attributes via the {@link MessagingAttributesExtractor} template methods.
 */
public final class KafkaConsumerAttributesExtractor
extends MessagingAttributesExtractor<ConsumerRecord<?, ?>, Void> {
@Override
protected String system(ConsumerRecord<?, ?> consumerRecord) {
return "kafka";
}
@Override
protected String destinationKind(ConsumerRecord<?, ?> consumerRecord) {
return SemanticAttributes.MessagingDestinationKindValues.TOPIC;
}
@Override
protected String destination(ConsumerRecord<?, ?> consumerRecord) {
return consumerRecord.topic();
}
// Kafka topics are durable, named destinations — never temporary
@Override
protected boolean temporaryDestination(ConsumerRecord<?, ?> consumerRecord) {
return false;
}
// protocol, protocolVersion, url and conversationId have no Kafka equivalent
@Override
protected @Nullable String protocol(ConsumerRecord<?, ?> consumerRecord) {
return null;
}
@Override
protected @Nullable String protocolVersion(ConsumerRecord<?, ?> consumerRecord) {
return null;
}
@Override
protected @Nullable String url(ConsumerRecord<?, ?> consumerRecord) {
return null;
}
@Override
protected @Nullable String conversationId(ConsumerRecord<?, ?> consumerRecord) {
return null;
}
// serialized (wire) size of the value in bytes; NOTE(review): presumably -1 when the
// value is null — confirm against the kafka-clients javadoc
@Override
protected Long messagePayloadSize(ConsumerRecord<?, ?> consumerRecord) {
return (long) consumerRecord.serializedValueSize();
}
@Override
protected @Nullable Long messagePayloadCompressedSize(ConsumerRecord<?, ?> consumerRecord) {
return null;
}
// this extractor is used on the consuming side, so the operation is always PROCESS
@Override
protected MessageOperation operation(ConsumerRecord<?, ?> consumerRecord) {
return MessageOperation.PROCESS;
}
// Kafka records carry no message id
@Override
protected @Nullable String messageId(ConsumerRecord<?, ?> consumerRecord, @Nullable Void unused) {
return null;
}
}

View File

@@ -0,0 +1,52 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.kafka;
import static io.opentelemetry.api.common.AttributeKey.longKey;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.instrumentation.api.config.Config;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.record.TimestampType;
import org.checkerframework.checker.nullness.qual.Nullable;
/**
 * Adds experimental (non-semantic-convention) Kafka consumer span attributes: the record
 * offset and the approximate time the record spent queued between produce and consume.
 * Callers should only register this extractor when {@link #isEnabled()} is true.
 */
public final class KafkaConsumerExperimentalAttributesExtractor
extends AttributesExtractor<ConsumerRecord<?, ?>, Void> {
private static final AttributeKey<Long> KAFKA_OFFSET = longKey("kafka.offset");
private static final AttributeKey<Long> KAFKA_RECORD_QUEUE_TIME_MS =
longKey("kafka.record.queue_time_ms");
// read once at class-load time; the experimental flag defaults to off
private static final boolean ENABLED =
Config.get()
.getBooleanProperty("otel.instrumentation.kafka.experimental-span-attributes", false);
public static boolean isEnabled() {
return ENABLED;
}
@Override
protected void onStart(AttributesBuilder attributes, ConsumerRecord<?, ?> consumerRecord) {
set(attributes, KAFKA_OFFSET, consumerRecord.offset());
// don't record a duration if the message was sent from an old Kafka client
if (consumerRecord.timestampType() != TimestampType.NO_TIMESTAMP_TYPE) {
long produceTime = consumerRecord.timestamp();
// this attribute shows how much time elapsed between the producer and the consumer of this
// message, which can be helpful for identifying queue bottlenecks
// (clamped at 0 to guard against clock skew between producer and consumer hosts)
set(
attributes,
KAFKA_RECORD_QUEUE_TIME_MS,
Math.max(0L, System.currentTimeMillis() - produceTime));
}
}
// nothing to record when the span ends
@Override
protected void onEnd(
AttributesBuilder attributes, ConsumerRecord<?, ?> consumerRecord, @Nullable Void unused) {}
}

View File

@@ -3,29 +3,28 @@
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.kafkaclients;
package io.opentelemetry.javaagent.instrumentation.kafka;
import io.opentelemetry.context.propagation.TextMapGetter;
import java.nio.charset.StandardCharsets;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
public class TextMapExtractAdapter implements TextMapGetter<Headers> {
public static final TextMapExtractAdapter GETTER = new TextMapExtractAdapter();
import org.checkerframework.checker.nullness.qual.Nullable;
public final class KafkaHeadersGetter implements TextMapGetter<ConsumerRecord<?, ?>> {
@Override
public Iterable<String> keys(Headers headers) {
return StreamSupport.stream(headers.spliterator(), false)
public Iterable<String> keys(ConsumerRecord<?, ?> carrier) {
return StreamSupport.stream(carrier.headers().spliterator(), false)
.map(Header::key)
.collect(Collectors.toList());
}
@Nullable
@Override
public String get(Headers headers, String key) {
Header header = headers.lastHeader(key);
public String get(@Nullable ConsumerRecord<?, ?> carrier, String key) {
Header header = carrier.headers().lastHeader(key);
if (header == null) {
return null;
}

View File

@@ -19,7 +19,7 @@ dependencies {
compileOnly("org.apache.kafka:kafka-streams:0.11.0.0")
// Include kafka-clients instrumentation for tests.
testInstrumentation(project(":instrumentation:kafka-clients-0.11:javaagent"))
testInstrumentation(project(":instrumentation:kafka-clients:kafka-clients-0.11:javaagent"))
testImplementation("org.apache.kafka:kafka-streams:0.11.0.0")
testImplementation("org.apache.kafka:kafka-clients:0.11.0.0")

View File

@@ -207,8 +207,9 @@ include(":instrumentation:jsf:jsf-testing-common")
include(":instrumentation:jsf:mojarra-1.2:javaagent")
include(":instrumentation:jsf:myfaces-1.2:javaagent")
include(":instrumentation:jsp-2.3:javaagent")
include(":instrumentation:kafka-clients-0.11:javaagent")
include(":instrumentation:kafka-clients-0.11:kafka-clients-2.4.0-testing")
include(":instrumentation:kafka-clients:kafka-clients-0.11:javaagent")
include(":instrumentation:kafka-clients:kafka-clients-2.4.0-testing")
include(":instrumentation:kafka-clients:kafka-clients-common:javaagent")
include(":instrumentation:kafka-streams-0.11:javaagent")
include(":instrumentation:kotlinx-coroutines:javaagent")
include(":instrumentation:kubernetes-client-7.0:javaagent")