Change kafka non-spec'd attributes (#1904)

* Change kafka non-spec'd attributes

* Move to existing config place

* Shorten name, and match api Config name
Trask Stalnaker 2020-12-14 22:11:48 -08:00 committed by GitHub
parent 4af2655fa2
commit b8b0257e33
8 changed files with 65 additions and 30 deletions
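With this change, the non-spec'd span attributes (kafka.offset and kafka.record.queue_time_ms) are only recorded when otel.instrumentation.kafka.experimental-span-attributes is set to true (it defaults to false), while otel.instrumentation.kafka.client-propagation keeps its existing default of true. As a rough sketch of opting in at runtime, assuming the agent's usual system-property configuration and with placeholder jar paths:

java -javaagent:path/to/opentelemetry-javaagent-all.jar \
  -Dotel.instrumentation.kafka.experimental-span-attributes=true \
  -jar my-app.jar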

@@ -7,11 +7,16 @@ package io.opentelemetry.javaagent.instrumentation.kafkaclients;
import io.opentelemetry.instrumentation.api.config.Config;
public final class KafkaClientConfiguration {
public final class KafkaClientsConfig {
public static boolean isPropagationEnabled() {
return Config.get().getBooleanProperty("otel.instrumentation.kafka.client-propagation", true);
}
private KafkaClientConfiguration() {}
public static boolean captureExperimentalSpanAttributes() {
return Config.get()
.getBooleanProperty("otel.instrumentation.kafka.experimental-span-attributes", false);
}
private KafkaClientsConfig() {}
}
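A minimal usage sketch of the renamed config class, mirroring the KafkaConsumerTracer change below (the span and record variables are assumed to already be in scope):

if (KafkaClientsConfig.captureExperimentalSpanAttributes()) {
  // non-spec'd attribute, recorded only when explicitly opted in
  span.setAttribute("kafka.offset", record.offset());
}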

@@ -46,7 +46,7 @@ public class KafkaConsumerTracer extends BaseTracer {
}
private Context extractParent(ConsumerRecord<?, ?> record) {
if (KafkaClientConfiguration.isPropagationEnabled()) {
if (KafkaClientsConfig.isPropagationEnabled()) {
return extract(record.headers(), GETTER);
} else {
return Context.current();
@@ -60,19 +60,21 @@ public class KafkaConsumerTracer extends BaseTracer {
public void onConsume(Span span, long startTimeMillis, ConsumerRecord<?, ?> record) {
// TODO should we set topic + offset as messaging.message_id?
span.setAttribute(SemanticAttributes.MESSAGING_KAFKA_PARTITION, record.partition());
span.setAttribute("kafka-clients.offset", record.offset());
if (record.value() == null) {
span.setAttribute(SemanticAttributes.MESSAGING_KAFKA_TOMBSTONE, true);
}
// don't record a duration if the message was sent from an old Kafka client
if (record.timestampType() != TimestampType.NO_TIMESTAMP_TYPE) {
long produceTime = record.timestamp();
// this attribute shows how much time elapsed between the producer and the consumer of this
// message, which can be helpful for identifying queue bottlenecks
span.setAttribute(
"kafka-clients.record.queue_time_ms", Math.max(0L, startTimeMillis - produceTime));
if (KafkaClientsConfig.captureExperimentalSpanAttributes()) {
span.setAttribute("kafka.offset", record.offset());
// don't record a duration if the message was sent from an old Kafka client
if (record.timestampType() != TimestampType.NO_TIMESTAMP_TYPE) {
long produceTime = record.timestamp();
// this attribute shows how much time elapsed between the producer and the consumer of this
// message, which can be helpful for identifying queue bottlenecks
span.setAttribute(
"kafka.record.queue_time_ms", Math.max(0L, startTimeMillis - produceTime));
}
}
}
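For reference, the queue-time attribute set above is just the clamped difference between the consumer's start time and the record's producer timestamp; a standalone sketch with a hypothetical helper name:

// Hypothetical helper restating the computation above; the result is
// clamped at zero so clock skew between producer and consumer hosts
// never produces a negative duration.
static long queueTimeMs(ConsumerRecord<?, ?> record, long consumerStartMillis) {
  return Math.max(0L, consumerStartMillis - record.timestamp());
}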

@@ -36,7 +36,7 @@ public class KafkaProducerTracer extends BaseTracer {
// value of the broker(s) is >= 2
public boolean shouldPropagate(ApiVersions apiVersions) {
return apiVersions.maxUsableProduceMagic() >= RecordBatch.MAGIC_VALUE_V2
&& KafkaClientConfiguration.isPropagationEnabled();
&& KafkaClientsConfig.isPropagationEnabled();
}
public String spanNameOnProduce(ProducerRecord<?, ?> record) {

@@ -32,7 +32,7 @@ public class TracingIterator implements Iterator<ConsumerRecord<?, ?>> {
Iterator<ConsumerRecord<?, ?>> delegateIterator, KafkaConsumerTracer tracer) {
this.delegateIterator = delegateIterator;
this.tracer = tracer;
this.propagationEnabled = KafkaClientConfiguration.isPropagationEnabled();
this.propagationEnabled = KafkaClientsConfig.isPropagationEnabled();
}
@Override

@@ -23,6 +23,8 @@ import org.springframework.kafka.test.utils.KafkaTestUtils
class KafkaClientPropagationDisabledTest extends KafkaClientBaseTest {
static final PREVIOUS_CONFIG = ConfigUtils.updateConfigAndResetInstrumentation {
it.setProperty("otel.instrumentation.kafka.client-propagation", "false")
// TODO run tests both with and without experimental span attributes
it.setProperty("otel.instrumentation.kafka.experimental-span-attributes", "true")
}
def cleanupSpec() {
@@ -97,8 +99,8 @@ class KafkaClientPropagationDisabledTest extends KafkaClientBaseTest {
"${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
"${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
"${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
"kafka-clients.offset" 0
"kafka-clients.record.queue_time_ms" { it >= 0 }
"kafka.offset" 0
"kafka.record.queue_time_ms" { it >= 0 }
}
}
}

@@ -9,6 +9,7 @@ import static io.opentelemetry.instrumentation.test.utils.TraceUtils.basicSpan
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTrace
import io.opentelemetry.api.trace.attributes.SemanticAttributes
import io.opentelemetry.instrumentation.test.utils.ConfigUtils
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import org.apache.kafka.clients.consumer.ConsumerConfig
@@ -28,6 +29,14 @@ import org.springframework.kafka.test.utils.ContainerTestUtils
import org.springframework.kafka.test.utils.KafkaTestUtils
class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {
static final PREVIOUS_CONFIG = ConfigUtils.updateConfigAndResetInstrumentation {
// TODO run tests both with and without experimental span attributes
it.setProperty("otel.instrumentation.kafka.experimental-span-attributes", "true")
}
def cleanupSpec() {
ConfigUtils.setConfig(PREVIOUS_CONFIG)
}
def "test kafka produce and consume"() {
setup:
@@ -108,8 +117,8 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {
"${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
"${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
"${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
"kafka-clients.offset" 0
"kafka-clients.record.queue_time_ms" { it >= 0 }
"kafka.offset" 0
"kafka.record.queue_time_ms" { it >= 0 }
}
}
basicSpan(it, 3, "producer callback", span(0))
@@ -198,8 +207,8 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {
"${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
"${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
"${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
"kafka-clients.offset" 0
"kafka-clients.record.queue_time_ms" { it >= 0 }
"kafka.offset" 0
"kafka.record.queue_time_ms" { it >= 0 }
}
}
basicSpan(it, 3, "producer callback", span(0))
@@ -284,8 +293,8 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {
"${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
"${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
"${SemanticAttributes.MESSAGING_KAFKA_TOMBSTONE.key}" true
"kafka-clients.offset" 0
"kafka-clients.record.queue_time_ms" { it >= 0 }
"kafka.offset" 0
"kafka.record.queue_time_ms" { it >= 0 }
}
}
}
@@ -357,8 +366,8 @@ class KafkaClientPropagationEnabledTest extends KafkaClientBaseTest {
"${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
"${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
"${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
"kafka-clients.offset" 0
"kafka-clients.record.queue_time_ms" { it >= 0 }
"kafka.offset" 0
"kafka.record.queue_time_ms" { it >= 0 }
}
}
}

@@ -10,12 +10,17 @@ import static io.opentelemetry.javaagent.instrumentation.kafkastreams.TextMapExt
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Span.Kind;
import io.opentelemetry.api.trace.attributes.SemanticAttributes;
import io.opentelemetry.instrumentation.api.config.Config;
import io.opentelemetry.instrumentation.api.tracer.BaseTracer;
import org.apache.kafka.streams.processor.internals.StampedRecord;
public class KafkaStreamsTracer extends BaseTracer {
private static final KafkaStreamsTracer TRACER = new KafkaStreamsTracer();
private final boolean captureExperimentalSpanAttributes =
Config.get()
.getBooleanProperty("otel.instrumentation.kafka.experimental-span-attributes", false);
public static KafkaStreamsTracer tracer() {
return TRACER;
}
@@ -45,7 +50,9 @@ public class KafkaStreamsTracer extends BaseTracer {
public void onConsume(Span span, StampedRecord record) {
if (record != null) {
span.setAttribute(SemanticAttributes.MESSAGING_KAFKA_PARTITION, record.partition());
span.setAttribute("kafka-streams.offset", record.offset());
if (captureExperimentalSpanAttributes) {
span.setAttribute("kafka.offset", record.offset());
}
}
}

@@ -12,6 +12,7 @@ import io.opentelemetry.api.trace.propagation.HttpTraceContext
import io.opentelemetry.context.Context
import io.opentelemetry.context.propagation.TextMapPropagator
import io.opentelemetry.instrumentation.test.AgentTestRunner
import io.opentelemetry.instrumentation.test.utils.ConfigUtils
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import org.apache.kafka.clients.consumer.ConsumerRecord
@@ -32,6 +33,15 @@ import org.springframework.kafka.test.utils.KafkaTestUtils
import spock.lang.Shared
class KafkaStreamsTest extends AgentTestRunner {
static final PREVIOUS_CONFIG = ConfigUtils.updateConfigAndResetInstrumentation {
// TODO run tests both with and without experimental span attributes
it.setProperty("otel.instrumentation.kafka.experimental-span-attributes", "true")
}
def cleanupSpec() {
ConfigUtils.setConfig(PREVIOUS_CONFIG)
}
static final STREAM_PENDING = "test.pending"
static final STREAM_PROCESSED = "test.processed"
@@ -150,8 +160,8 @@ class KafkaStreamsTest extends AgentTestRunner {
"${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
"${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
"${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
"kafka-clients.offset" 0
"kafka-clients.record.queue_time_ms" { it >= 0 }
"kafka.offset" 0
"kafka.record.queue_time_ms" { it >= 0 }
}
}
// STREAMING span 1
@@ -166,7 +176,7 @@ class KafkaStreamsTest extends AgentTestRunner {
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
"${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
"${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
"kafka-streams.offset" 0
"kafka.offset" 0
"asdf" "testing"
}
}
@@ -195,8 +205,8 @@ class KafkaStreamsTest extends AgentTestRunner {
"${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
"${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
"${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
"kafka-clients.offset" 0
"kafka-clients.record.queue_time_ms" { it >= 0 }
"kafka.offset" 0
"kafka.record.queue_time_ms" { it >= 0 }
"testing" 123
}
}