From dbe1c00ac15ab25730ff1767b8c7b1dd00e3e4d8 Mon Sep 17 00:00:00 2001 From: Tyler Benson Date: Tue, 12 May 2020 18:04:24 -0400 Subject: [PATCH] Adding an option to manually disable Kafka headers (DataDog/dd-trace-java#1448) --- .../io/opentelemetry/auto/config/Config.java | 14 ++++ .../KafkaProducerInstrumentation.java | 8 ++- .../src/test/groovy/KafkaClientTest.groovy | 71 +++++++++++++++++++ 3 files changed, 92 insertions(+), 1 deletion(-) diff --git a/agent-bootstrap/src/main/java/io/opentelemetry/auto/config/Config.java b/agent-bootstrap/src/main/java/io/opentelemetry/auto/config/Config.java index ffe19909ea..824467a4a1 100644 --- a/agent-bootstrap/src/main/java/io/opentelemetry/auto/config/Config.java +++ b/agent-bootstrap/src/main/java/io/opentelemetry/auto/config/Config.java @@ -73,6 +73,8 @@ public class Config { public static final String RUNTIME_CONTEXT_FIELD_INJECTION = "trace.runtime.context.field.injection"; + public static final String KAFKA_CLIENT_PROPAGATION_ENABLED = "kafka.client.propagation.enabled"; + public static final String LOG_INJECTION_ENABLED = "log.injection.enabled"; public static final String EXPERIMENTAL_LOG_CAPTURE_THRESHOLD = "experimental.log.capture.threshold"; @@ -93,6 +95,8 @@ public class Config { public static final boolean DEFAULT_LOG_INJECTION_ENABLED = false; public static final String DEFAULT_EXPERIMENTAL_LOG_CAPTURE_THRESHOLD = null; + public static final boolean DEFAULT_KAFKA_CLIENT_PROPAGATION_ENABLED = true; + private static final String DEFAULT_TRACE_ANNOTATIONS = null; private static final boolean DEFAULT_TRACE_EXECUTORS_ALL = false; private static final String DEFAULT_TRACE_EXECUTORS = ""; @@ -142,6 +146,8 @@ public class Config { @Getter private final boolean sqlNormalizerEnabled; + @Getter private final boolean kafkaClientPropagationEnabled; + // Values from an optionally provided properties file private static Properties propertiesFromConfigFile; @@ -203,6 +209,10 @@ public class Config { sqlNormalizerEnabled = getBooleanSettingFromEnvironment(SQL_NORMALIZER_ENABLED, DEFAULT_SQL_NORMALIZER_ENABLED); + kafkaClientPropagationEnabled = + getBooleanSettingFromEnvironment( + KAFKA_CLIENT_PROPAGATION_ENABLED, DEFAULT_KAFKA_CLIENT_PROPAGATION_ENABLED); + log.debug("New instance: {}", this); } @@ -262,6 +272,10 @@ public class Config { sqlNormalizerEnabled = getPropertyBooleanValue(properties, SQL_NORMALIZER_ENABLED, parent.sqlNormalizerEnabled); + kafkaClientPropagationEnabled = + getPropertyBooleanValue( + properties, KAFKA_CLIENT_PROPAGATION_ENABLED, parent.kafkaClientPropagationEnabled); + log.debug("New instance: {}", this); } diff --git a/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafkaclients/KafkaProducerInstrumentation.java b/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafkaclients/KafkaProducerInstrumentation.java index 9223aaadde..bbcbe4a147 100644 --- a/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafkaclients/KafkaProducerInstrumentation.java +++ b/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/auto/instrumentation/kafkaclients/KafkaProducerInstrumentation.java @@ -30,6 +30,7 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArgument; import com.google.auto.service.AutoService; import io.grpc.Context; import io.opentelemetry.OpenTelemetry; +import io.opentelemetry.auto.config.Config; import io.opentelemetry.auto.instrumentation.api.SpanWithScope; import io.opentelemetry.auto.tooling.Instrumenter; import io.opentelemetry.context.Scope; @@ -94,7 +95,12 @@ public final class KafkaProducerInstrumentation extends Instrumenter.Default { // Do not inject headers for batch versions below 2 // This is how similar check is being done in Kafka client itself: // https://github.com/apache/kafka/blob/05fcfde8f69b0349216553f711fdfc3f0259c601/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java#L411-L412 - if (apiVersions.maxUsableProduceMagic() >= RecordBatch.MAGIC_VALUE_V2) { + // Also, do not inject headers if specified by JVM option or environment variable + // This can help in mixed client environments where clients < 0.11 that do not support + // headers attempt to read messages that were produced by clients > 0.11 and the magic + // value of the broker(s) is >= 2 + if (apiVersions.maxUsableProduceMagic() >= RecordBatch.MAGIC_VALUE_V2 + && Config.get().isKafkaClientPropagationEnabled()) { final Context context = withSpan(span, Context.current()); try { OpenTelemetry.getPropagators() diff --git a/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTest.groovy b/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTest.groovy index 371579d5c5..1f85d96345 100644 --- a/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTest.groovy +++ b/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTest.groovy @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +import io.opentelemetry.auto.config.Config import io.opentelemetry.auto.test.AgentTestRunner import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.consumer.ConsumerRecord @@ -29,10 +30,12 @@ import org.springframework.kafka.listener.MessageListener import org.springframework.kafka.test.rule.KafkaEmbedded import org.springframework.kafka.test.utils.ContainerTestUtils import org.springframework.kafka.test.utils.KafkaTestUtils +import spock.lang.Unroll import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.TimeUnit +import static io.opentelemetry.auto.test.utils.ConfigUtils.withConfigOverride import static io.opentelemetry.trace.Span.Kind.CONSUMER import static io.opentelemetry.trace.Span.Kind.PRODUCER @@ -187,4 +190,72 @@ class KafkaClientTest extends AgentTestRunner { } + @Unroll + def "test kafka client header propagation manual config"() { + setup: + def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString()) + def producerFactory = new DefaultKafkaProducerFactory(senderProps) + def kafkaTemplate = new KafkaTemplate(producerFactory) + + // set up the Kafka consumer properties + def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka) + + // create a Kafka consumer factory + def consumerFactory = new DefaultKafkaConsumerFactory(consumerProperties) + + // set the topic that needs to be consumed + def containerProperties + try { + // Different class names for test and latestDepTest. + containerProperties = Class.forName("org.springframework.kafka.listener.config.ContainerProperties").newInstance(SHARED_TOPIC) + } catch (ClassNotFoundException | NoClassDefFoundError e) { + containerProperties = Class.forName("org.springframework.kafka.listener.ContainerProperties").newInstance(SHARED_TOPIC) + } + + // create a Kafka MessageListenerContainer + def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties) + + // create a thread safe queue to store the received message + def records = new LinkedBlockingQueue>() + + // setup a Kafka message listener + container.setupMessageListener(new MessageListener() { + @Override + void onMessage(ConsumerRecord record) { + TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces + records.add(record) + } + }) + + // start the container and underlying message listener + container.start() + + // wait until the container has the required number of assigned partitions + ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic()) + + when: + String message = "Testing without headers" + withConfigOverride(Config.KAFKA_CLIENT_PROPAGATION_ENABLED, value) { + kafkaTemplate.send(SHARED_TOPIC, message) + } + + then: + // check that the message was received + def received = records.poll(5, TimeUnit.SECONDS) + + received.headers().iterator().hasNext() == expected + + cleanup: + producerFactory.stop() + container?.stop() + + where: + value | expected + "false" | false + "true" | true + String.valueOf(Config.DEFAULT_KAFKA_CLIENT_PROPAGATION_ENABLED) | true + + } + } +