Adding an option to manually disable Kafka headers (DataDog/dd-trace-java#1448)

Tyler Benson 2020-05-12 18:04:24 -04:00 committed by Trask Stalnaker
parent c58d259459
commit dbe1c00ac1
3 changed files with 92 additions and 1 deletion
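For context, the new switch is an ordinary agent configuration key, `kafka.client.propagation.enabled`, defaulting to true. A hedged usage sketch, assuming the `ota.` system-property prefix and `OTA_` environment-variable prefix this agent used for its settings (the prefixes do not appear in this diff):

  # Disable context propagation through Kafka headers (property prefix assumed).
  java -javaagent:opentelemetry-auto.jar \
       -Dota.kafka.client.propagation.enabled=false \
       -jar my-app.jar

  # Equivalent environment variable (name derivation assumed).
  export OTA_KAFKA_CLIENT_PROPAGATION_ENABLED=false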

View File: Config.java

@@ -73,6 +73,8 @@ public class Config {
  public static final String RUNTIME_CONTEXT_FIELD_INJECTION =
      "trace.runtime.context.field.injection";
  public static final String KAFKA_CLIENT_PROPAGATION_ENABLED = "kafka.client.propagation.enabled";
  public static final String LOG_INJECTION_ENABLED = "log.injection.enabled";
  public static final String EXPERIMENTAL_LOG_CAPTURE_THRESHOLD =
      "experimental.log.capture.threshold";
@@ -93,6 +95,8 @@ public class Config {
  public static final boolean DEFAULT_LOG_INJECTION_ENABLED = false;
  public static final String DEFAULT_EXPERIMENTAL_LOG_CAPTURE_THRESHOLD = null;
  public static final boolean DEFAULT_KAFKA_CLIENT_PROPAGATION_ENABLED = true;
  private static final String DEFAULT_TRACE_ANNOTATIONS = null;
  private static final boolean DEFAULT_TRACE_EXECUTORS_ALL = false;
  private static final String DEFAULT_TRACE_EXECUTORS = "";
@@ -142,6 +146,8 @@ public class Config {
  @Getter private final boolean sqlNormalizerEnabled;
  @Getter private final boolean kafkaClientPropagationEnabled;

  // Values from an optionally provided properties file
  private static Properties propertiesFromConfigFile;
@@ -203,6 +209,10 @@ public class Config {
    sqlNormalizerEnabled =
        getBooleanSettingFromEnvironment(SQL_NORMALIZER_ENABLED, DEFAULT_SQL_NORMALIZER_ENABLED);
    kafkaClientPropagationEnabled =
        getBooleanSettingFromEnvironment(
            KAFKA_CLIENT_PROPAGATION_ENABLED, DEFAULT_KAFKA_CLIENT_PROPAGATION_ENABLED);

    log.debug("New instance: {}", this);
  }
@@ -262,6 +272,10 @@ public class Config {
    sqlNormalizerEnabled =
        getPropertyBooleanValue(properties, SQL_NORMALIZER_ENABLED, parent.sqlNormalizerEnabled);
    kafkaClientPropagationEnabled =
        getPropertyBooleanValue(
            properties, KAFKA_CLIENT_PROPAGATION_ENABLED, parent.kafkaClientPropagationEnabled);

    log.debug("New instance: {}", this);
  }
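The constructor reads the flag through `getBooleanSettingFromEnvironment`, which, judging by its name, presumably checks a system property first and then a derived environment variable. A minimal Java sketch of that resolution, assuming the `ota.`/`OTA_` prefixes and dot-to-underscore name mangling mentioned above (none of which appear in this diff):

  final class SettingResolver {
    // Hedged sketch: system property wins, then the env var derived from the
    // property name, then the supplied default. Both prefixes are assumptions.
    static boolean getBooleanSetting(final String name, final boolean defaultValue) {
      final String fromProperty = System.getProperty("ota." + name);
      if (fromProperty != null) {
        return Boolean.parseBoolean(fromProperty);
      }
      final String envName = "OTA_" + name.toUpperCase().replace('.', '_');
      final String fromEnv = System.getenv(envName);
      return fromEnv != null ? Boolean.parseBoolean(fromEnv) : defaultValue;
    }
  }

With this shape, `kafka.client.propagation.enabled` would resolve from `-Dota.kafka.client.propagation.enabled` or `OTA_KAFKA_CLIENT_PROPAGATION_ENABLED`, falling back to the default of `true`.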

View File: KafkaProducerInstrumentation.java

@@ -30,6 +30,7 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import com.google.auto.service.AutoService;
import io.grpc.Context;
import io.opentelemetry.OpenTelemetry;
import io.opentelemetry.auto.config.Config;
import io.opentelemetry.auto.instrumentation.api.SpanWithScope;
import io.opentelemetry.auto.tooling.Instrumenter;
import io.opentelemetry.context.Scope;
@@ -94,7 +95,12 @@ public final class KafkaProducerInstrumentation extends Instrumenter.Default {
      // Do not inject headers for batch versions below 2.
      // This is how a similar check is done in the Kafka client itself:
      // https://github.com/apache/kafka/blob/05fcfde8f69b0349216553f711fdfc3f0259c601/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java#L411-L412
      if (apiVersions.maxUsableProduceMagic() >= RecordBatch.MAGIC_VALUE_V2) {
      // Also, do not inject headers if propagation is disabled via system property or
      // environment variable. This helps in mixed client environments, where clients
      // older than 0.11, which do not support headers, attempt to read messages that
      // were produced by 0.11+ clients while the brokers' message format (magic value)
      // is >= 2.
      if (apiVersions.maxUsableProduceMagic() >= RecordBatch.MAGIC_VALUE_V2
          && Config.get().isKafkaClientPropagationEnabled()) {
        final Context context = withSpan(span, Context.current());
        try {
          OpenTelemetry.getPropagators()
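The call truncated above presumably injects the current context into the outgoing record's headers via the configured propagators. A hedged Java sketch of the setter side of that injection, using only the stable kafka-clients `Headers` API (the exact OpenTelemetry propagator interface varies by version and is not shown in this diff):

  import java.nio.charset.StandardCharsets;
  import org.apache.kafka.common.header.Headers;

  final class KafkaHeaderSetter {
    // Hedged sketch: write one propagation field into the record's headers,
    // removing any stale value first so retries do not accumulate duplicates.
    static void set(final Headers headers, final String key, final String value) {
      headers.remove(key);
      headers.add(key, value.getBytes(StandardCharsets.UTF_8));
    }
  }

This write is exactly the step that magic value < 2 (the pre-0.11 message format) cannot represent, and the step the new flag now skips when propagation is disabled.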

View File: KafkaClientTest.groovy

@@ -13,6 +13,7 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import io.opentelemetry.auto.config.Config
import io.opentelemetry.auto.test.AgentTestRunner
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
@@ -29,10 +30,12 @@ import org.springframework.kafka.listener.MessageListener
import org.springframework.kafka.test.rule.KafkaEmbedded
import org.springframework.kafka.test.utils.ContainerTestUtils
import org.springframework.kafka.test.utils.KafkaTestUtils
import spock.lang.Unroll
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import static io.opentelemetry.auto.test.utils.ConfigUtils.withConfigOverride
import static io.opentelemetry.trace.Span.Kind.CONSUMER
import static io.opentelemetry.trace.Span.Kind.PRODUCER
@@ -187,4 +190,72 @@ class KafkaClientTest extends AgentTestRunner {
  }

  @Unroll
  def "test kafka client header propagation manual config"() {
    setup:
    def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
    def producerFactory = new DefaultKafkaProducerFactory<String, String>(senderProps)
    def kafkaTemplate = new KafkaTemplate<String, String>(producerFactory)

    // set up the Kafka consumer properties
    def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka)

    // create a Kafka consumer factory
    def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProperties)

    // set the topic that needs to be consumed
    def containerProperties
    try {
      // Different class names for test and latestDepTest.
      containerProperties = Class.forName("org.springframework.kafka.listener.config.ContainerProperties").newInstance(SHARED_TOPIC)
    } catch (ClassNotFoundException | NoClassDefFoundError e) {
      containerProperties = Class.forName("org.springframework.kafka.listener.ContainerProperties").newInstance(SHARED_TOPIC)
    }

    // create a Kafka MessageListenerContainer
    def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties)

    // create a thread-safe queue to store the received messages
    def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()

    // set up a Kafka message listener
    container.setupMessageListener(new MessageListener<String, String>() {
      @Override
      void onMessage(ConsumerRecord<String, String> record) {
        TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces
        records.add(record)
      }
    })

    // start the container and underlying message listener
    container.start()

    // wait until the container has the required number of assigned partitions
    ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic())

    when:
    String message = "Testing without headers"
    withConfigOverride(Config.KAFKA_CLIENT_PROPAGATION_ENABLED, value) {
      kafkaTemplate.send(SHARED_TOPIC, message)
    }

    then:
    // check that the message was received
    def received = records.poll(5, TimeUnit.SECONDS)
    received.headers().iterator().hasNext() == expected

    cleanup:
    producerFactory.stop()
    container?.stop()

    where:
    value                                                           | expected
    "false"                                                         | false
    "true"                                                          | true
    String.valueOf(Config.DEFAULT_KAFKA_CLIENT_PROPAGATION_ENABLED) | true
  }
}
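The `withConfigOverride` helper in the `when:` block temporarily overrides a single config key for the duration of the closure. A hedged Java sketch of what such a helper likely does (the real `ConfigUtils` implementation may also need to force `Config` to re-read its values, which is not shown here):

  final class ConfigOverride {
    // Hedged sketch: set the system property, run the block, then restore the
    // previous value so other tests see an unchanged environment.
    static void withProperty(final String key, final String value, final Runnable block) {
      final String previous = System.setProperty(key, value);
      try {
        block.run();
      } finally {
        if (previous == null) {
          System.clearProperty(key);
        } else {
          System.setProperty(key, previous);
        }
      }
    }
  }

The `where:` table then drives three unrolled cases: propagation explicitly disabled (no headers on the received record), explicitly enabled (headers present), and the default, which the Config change above sets to `true`.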