JMX Scraper: Kafka server, producer and consumer YAMLs and integration tests added (#1670)

Robert Niedziela 2025-02-14 18:01:21 +01:00 committed by GitHub
parent a5e9082385
commit b92a465bc8
13 changed files with 987 additions and 72 deletions

View File

@ -140,51 +140,51 @@ abstract class KafkaIntegrationTest extends AbstractIntegrationTest {
metric,
"kafka.partition.count",
"The number of partitions on the broker",
"{partitions}"),
"{partition}"),
metric ->
assertGauge(
metric,
"kafka.partition.offline",
"The number of partitions offline",
"{partitions}"),
"{partition}"),
metric ->
assertGauge(
metric,
"kafka.partition.under_replicated",
"The number of under replicated partitions",
"{partitions}"),
"{partition}"),
metric ->
assertSumWithAttributes(
metric,
"kafka.isr.operation.count",
"The number of in-sync replica shrink and expand operations",
"{operations}",
"{operation}",
attrs -> attrs.containsOnly(entry("operation", "shrink")),
attrs -> attrs.containsOnly(entry("operation", "expand"))),
metric ->
assertGauge(
metric,
"kafka.controller.active.count",
"controller is active on broker",
"{controllers}"),
"For KRaft mode, the number of active controllers in the cluster. For ZooKeeper, indicates whether the broker is the controller broker.",
"{controller}"),
metric ->
assertSum(
metric,
"kafka.leader.election.rate",
"leader election rate - increasing indicates broker failures",
"{elections}"),
"Leader election rate - increasing indicates broker failures",
"{election}"),
metric ->
assertGauge(
metric,
"kafka.max.lag",
"max lag in messages between follower and leader replicas",
"{messages}"),
"Max lag in messages between follower and leader replicas",
"{message}"),
metric ->
assertSum(
metric,
"kafka.unclean.election.rate",
"unclean leader election rate - increasing indicates broker failures",
"{elections}"));
"Unclean leader election rate - increasing indicates broker failures",
"{election}"));
}
static class KafkaBrokerTargetIntegrationTest extends KafkaIntegrationTest {
@ -235,52 +235,52 @@ abstract class KafkaIntegrationTest extends AbstractIntegrationTest {
metric,
"kafka.consumer.bytes-consumed-rate",
"The average number of bytes consumed per second",
"by",
"By",
topics),
metric ->
assertKafkaGauge(
metric,
"kafka.consumer.fetch-rate",
"The number of fetch requests for all topics per second",
"1"),
"{request}"),
metric ->
assertKafkaGauge(
metric,
"kafka.consumer.fetch-size-avg",
"The average number of bytes fetched per request",
"by",
"By",
topics),
metric ->
assertKafkaGauge(
metric,
"kafka.consumer.records-consumed-rate",
"The average number of records consumed per second",
"1",
"{record}",
topics),
metric ->
assertKafkaGauge(
metric,
"kafka.consumer.records-lag-max",
"Number of messages the consumer lags behind the producer",
"1"),
"{record}"),
metric ->
assertKafkaGauge(
metric,
"kafka.consumer.total.bytes-consumed-rate",
"The average number of bytes consumed for all topics per second",
"by"),
"By"),
metric ->
assertKafkaGauge(
metric,
"kafka.consumer.total.fetch-size-avg",
"The average number of bytes fetched per request for all topics",
"by"),
"By"),
metric ->
assertKafkaGauge(
metric,
"kafka.consumer.total.records-consumed-rate",
"The average number of records consumed for all topics per second",
"1"));
"{record}"));
}
}
@ -300,14 +300,14 @@ abstract class KafkaIntegrationTest extends AbstractIntegrationTest {
metric,
"kafka.producer.byte-rate",
"The average number of bytes sent per second for a topic",
"by",
"By",
topics),
metric ->
assertKafkaGauge(
metric,
"kafka.producer.compression-rate",
"The average compression rate of record batches for a topic",
"1",
"{ratio}",
topics),
metric ->
assertKafkaGauge(
@ -320,27 +320,27 @@ abstract class KafkaIntegrationTest extends AbstractIntegrationTest {
metric,
"kafka.producer.outgoing-byte-rate",
"The average number of outgoing bytes sent per second to all servers",
"by"),
"By"),
metric ->
assertKafkaGauge(
metric,
"kafka.producer.record-error-rate",
"The average per-second number of record sends that resulted in errors for a topic",
"1",
"{record}",
topics),
metric ->
assertKafkaGauge(
metric,
"kafka.producer.record-retry-rate",
"The average per-second number of retried record sends for a topic",
"1",
"{record}",
topics),
metric ->
assertKafkaGauge(
metric,
"kafka.producer.record-send-rate",
"The average number of records sent per second for a topic",
"1",
"{record}",
topics),
metric ->
assertKafkaGauge(
@ -353,10 +353,13 @@ abstract class KafkaIntegrationTest extends AbstractIntegrationTest {
metric,
"kafka.producer.request-rate",
"The average number of requests sent per second",
"1"),
"{request}"),
metric ->
assertKafkaGauge(
metric, "kafka.producer.response-rate", "Responses received per second", "1"));
metric,
"kafka.producer.response-rate",
"Responses received per second",
"{response}"));
}
}

View File

@ -16,45 +16,45 @@
def consumerFetchManagerMetrics = otel.mbeans("kafka.consumer:client-id=*,type=consumer-fetch-manager-metrics")
otel.instrument(consumerFetchManagerMetrics, "kafka.consumer.fetch-rate",
"The number of fetch requests for all topics per second", "1",
"The number of fetch requests for all topics per second", "{request}",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }],
"fetch-rate", otel.&doubleValueCallback)
otel.instrument(consumerFetchManagerMetrics, "kafka.consumer.records-lag-max",
"Number of messages the consumer lags behind the producer", "1",
"Number of messages the consumer lags behind the producer", "{record}",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }],
"records-lag-max", otel.&doubleValueCallback)
otel.instrument(consumerFetchManagerMetrics, "kafka.consumer.total.bytes-consumed-rate",
"The average number of bytes consumed for all topics per second", "by",
"The average number of bytes consumed for all topics per second", "By",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }],
"bytes-consumed-rate", otel.&doubleValueCallback)
otel.instrument(consumerFetchManagerMetrics, "kafka.consumer.total.fetch-size-avg",
"The average number of bytes fetched per request for all topics", "by",
"The average number of bytes fetched per request for all topics", "By",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }],
"fetch-size-avg", otel.&doubleValueCallback)
otel.instrument(consumerFetchManagerMetrics, "kafka.consumer.total.records-consumed-rate",
"The average number of records consumed for all topics per second", "1",
"The average number of records consumed for all topics per second", "{record}",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }],
"records-consumed-rate", otel.&doubleValueCallback)
def consumerFetchManagerMetricsByTopic = otel.mbeans("kafka.consumer:client-id=*,topic=*,type=consumer-fetch-manager-metrics")
otel.instrument(consumerFetchManagerMetricsByTopic, "kafka.consumer.bytes-consumed-rate",
"The average number of bytes consumed per second", "by",
"The average number of bytes consumed per second", "By",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") },
"topic" : { mbean -> mbean.name().getKeyProperty("topic") }],
"bytes-consumed-rate", otel.&doubleValueCallback)
otel.instrument(consumerFetchManagerMetricsByTopic, "kafka.consumer.fetch-size-avg",
"The average number of bytes fetched per request", "by",
"The average number of bytes fetched per request", "By",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") },
"topic" : { mbean -> mbean.name().getKeyProperty("topic") }],
"fetch-size-avg", otel.&doubleValueCallback)
otel.instrument(consumerFetchManagerMetricsByTopic, "kafka.consumer.records-consumed-rate",
"The average number of records consumed per second", "1",
"The average number of records consumed per second", "{record}",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") },
"topic" : { mbean -> mbean.name().getKeyProperty("topic") }],
"records-consumed-rate", otel.&doubleValueCallback)

View File

@ -20,7 +20,7 @@ otel.instrument(producerMetrics, "kafka.producer.io-wait-time-ns-avg",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }],
"io-wait-time-ns-avg", otel.&doubleValueCallback)
otel.instrument(producerMetrics, "kafka.producer.outgoing-byte-rate",
"The average number of outgoing bytes sent per second to all servers", "by",
"The average number of outgoing bytes sent per second to all servers", "By",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }],
"outgoing-byte-rate", otel.&doubleValueCallback)
otel.instrument(producerMetrics, "kafka.producer.request-latency-avg",
@ -28,37 +28,37 @@ otel.instrument(producerMetrics, "kafka.producer.request-latency-avg",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }],
"request-latency-avg", otel.&doubleValueCallback)
otel.instrument(producerMetrics, "kafka.producer.request-rate",
"The average number of requests sent per second", "1",
"The average number of requests sent per second", "{request}",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }],
"request-rate", otel.&doubleValueCallback)
otel.instrument(producerMetrics, "kafka.producer.response-rate",
"Responses received per second", "1",
"Responses received per second", "{response}",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }],
"response-rate", otel.&doubleValueCallback)
def producerTopicMetrics = otel.mbeans("kafka.producer:client-id=*,topic=*,type=producer-topic-metrics")
otel.instrument(producerTopicMetrics, "kafka.producer.byte-rate",
"The average number of bytes sent per second for a topic", "by",
"The average number of bytes sent per second for a topic", "By",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") },
"topic" : { mbean -> mbean.name().getKeyProperty("topic") }],
"byte-rate", otel.&doubleValueCallback)
otel.instrument(producerTopicMetrics, "kafka.producer.compression-rate",
"The average compression rate of record batches for a topic", "1",
"The average compression rate of record batches for a topic", "{ratio}",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") },
"topic" : { mbean -> mbean.name().getKeyProperty("topic") }],
"compression-rate", otel.&doubleValueCallback)
otel.instrument(producerTopicMetrics, "kafka.producer.record-error-rate",
"The average per-second number of record sends that resulted in errors for a topic", "1",
"The average per-second number of record sends that resulted in errors for a topic", "{record}",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") },
"topic" : { mbean -> mbean.name().getKeyProperty("topic") }],
"record-error-rate", otel.&doubleValueCallback)
otel.instrument(producerTopicMetrics, "kafka.producer.record-retry-rate",
"The average per-second number of retried record sends for a topic", "1",
"The average per-second number of retried record sends for a topic", "{record}",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") },
"topic" : { mbean -> mbean.name().getKeyProperty("topic") }],
"record-retry-rate", otel.&doubleValueCallback)
otel.instrument(producerTopicMetrics, "kafka.producer.record-send-rate",
"The average number of records sent per second for a topic", "1",
"The average number of records sent per second for a topic", "{record}",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") },
"topic" : { mbean -> mbean.name().getKeyProperty("topic") }],
"record-send-rate", otel.&doubleValueCallback)

View File

@ -18,7 +18,7 @@ def messagesInPerSec = otel.mbean("kafka.server:type=BrokerTopicMetrics,name=Mes
otel.instrument(messagesInPerSec,
"kafka.message.count",
"The number of messages received by the broker",
"{messages}",
"{message}",
"Count", otel.&longCounterCallback)
def requests = otel.mbeans(["kafka.server:type=BrokerTopicMetrics,name=TotalProduceRequestsPerSec",
@ -26,7 +26,7 @@ def requests = otel.mbeans(["kafka.server:type=BrokerTopicMetrics,name=TotalProd
otel.instrument(requests,
"kafka.request.count",
"The number of requests received by the broker",
"{requests}",
"{request}",
[
"type" : { mbean -> switch(mbean.name().getKeyProperty("name")) {
case "TotalProduceRequestsPerSec":
@ -45,7 +45,7 @@ def failedRequests = otel.mbeans(["kafka.server:type=BrokerTopicMetrics,name=Fai
otel.instrument(failedRequests,
"kafka.request.failed",
"The number of requests to the broker resulting in a failure",
"{requests}",
"{request}",
[
"type" : { mbean -> switch(mbean.name().getKeyProperty("name")) {
case "FailedProduceRequestsPerSec":
@ -100,7 +100,7 @@ def network = otel.mbeans(["kafka.server:type=BrokerTopicMetrics,name=BytesInPer
otel.instrument(network,
"kafka.network.io",
"The bytes received or sent by the broker",
"by",
"By",
[
"state" : { mbean -> switch(mbean.name().getKeyProperty("name")) {
case "BytesInPerSec":
@ -119,7 +119,7 @@ def purgatorySize = otel.mbeans(["kafka.server:type=DelayedOperationPurgatory,na
otel.instrument(purgatorySize,
"kafka.purgatory.size",
"The number of requests waiting in purgatory",
"{requests}",
"{request}",
[
"type" : { mbean -> mbean.name().getKeyProperty("delayedOperation").toLowerCase() },
],
@ -129,21 +129,21 @@ def partitionCount = otel.mbean("kafka.server:type=ReplicaManager,name=Partition
otel.instrument(partitionCount,
"kafka.partition.count",
"The number of partitions on the broker",
"{partitions}",
"{partition}",
"Value", otel.&longValueCallback)
def partitionOffline = otel.mbean("kafka.controller:type=KafkaController,name=OfflinePartitionsCount")
otel.instrument(partitionOffline,
"kafka.partition.offline",
"The number of partitions offline",
"{partitions}",
"{partition}",
"Value", otel.&longValueCallback)
def partitionUnderReplicated = otel.mbean("kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions")
otel.instrument(partitionUnderReplicated,
"kafka.partition.under_replicated",
"The number of under replicated partitions",
"{partitions}",
"{partition}",
"Value", otel.&longValueCallback)
def isrOperations = otel.mbeans(["kafka.server:type=ReplicaManager,name=IsrShrinksPerSec",
@ -151,7 +151,7 @@ def isrOperations = otel.mbeans(["kafka.server:type=ReplicaManager,name=IsrShrin
otel.instrument(isrOperations,
"kafka.isr.operation.count",
"The number of in-sync replica shrink and expand operations",
"{operations}",
"{operation}",
[
"operation" : { mbean -> switch(mbean.name().getKeyProperty("name")) {
case "IsrShrinksPerSec":
@ -167,29 +167,29 @@ otel.instrument(isrOperations,
def maxLag = otel.mbean("kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica")
otel.instrument(maxLag, "kafka.max.lag", "max lag in messages between follower and leader replicas",
"{messages}", "Value", otel.&longValueCallback)
otel.instrument(maxLag, "kafka.max.lag", "Max lag in messages between follower and leader replicas",
"{message}", "Value", otel.&longValueCallback)
def activeControllerCount = otel.mbean("kafka.controller:type=KafkaController,name=ActiveControllerCount")
otel.instrument(activeControllerCount, "kafka.controller.active.count", "controller is active on broker",
"{controllers}", "Value", otel.&longValueCallback)
otel.instrument(activeControllerCount, "kafka.controller.active.count", "For KRaft mode, the number of active controllers in the cluster. For ZooKeeper, indicates whether the broker is the controller broker.",
"{controller}", "Value", otel.&longValueCallback)
def leaderElectionRate = otel.mbean("kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs")
otel.instrument(leaderElectionRate, "kafka.leader.election.rate", "leader election rate - increasing indicates broker failures",
"{elections}", "Count", otel.&longCounterCallback)
otel.instrument(leaderElectionRate, "kafka.leader.election.rate", "Leader election rate - increasing indicates broker failures",
"{election}", "Count", otel.&longCounterCallback)
def uncleanLeaderElections = otel.mbean("kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec")
otel.instrument(uncleanLeaderElections, "kafka.unclean.election.rate", "unclean leader election rate - increasing indicates broker failures",
"{elections}", "Count", otel.&longCounterCallback)
otel.instrument(uncleanLeaderElections, "kafka.unclean.election.rate", "Unclean leader election rate - increasing indicates broker failures",
"{election}", "Count", otel.&longCounterCallback)
def requestQueueSize = otel.mbean("kafka.network:type=RequestChannel,name=RequestQueueSize")
otel.instrument(requestQueueSize, "kafka.request.queue", "size of the request queue",
"{requests}", "Value", otel.&longValueCallback)
otel.instrument(requestQueueSize, "kafka.request.queue", "Size of the request queue",
"{request}", "Value", otel.&longValueCallback)
def logFlushRate = otel.mbean("kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs")
otel.instrument(logFlushRate, "kafka.logs.flush.time.count", "log flush count",
otel.instrument(logFlushRate, "kafka.logs.flush.time.count", "Log flush count",
"ms", "Count", otel.&longCounterCallback)
otel.instrument(logFlushRate, "kafka.logs.flush.time.median", "log flush time - 50th percentile",
otel.instrument(logFlushRate, "kafka.logs.flush.time.median", "Log flush time - 50th percentile",
"ms", "50thPercentile", otel.&doubleValueCallback)
otel.instrument(logFlushRate, "kafka.logs.flush.time.99p", "log flush time - 99th percentile",
otel.instrument(logFlushRate, "kafka.logs.flush.time.99p", "Log flush time - 99th percentile",
"ms", "99thPercentile", otel.&doubleValueCallback)

View File

@ -22,6 +22,7 @@ import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
@ -56,6 +57,7 @@ public abstract class TargetSystemIntegrationTest {
private static Network network;
private static OtlpGrpcServer otlpServer;
private Collection<GenericContainer<?>> prerequisiteContainers;
private GenericContainer<?> target;
private JmxScraperContainer scraper;
@ -86,12 +88,23 @@ public abstract class TargetSystemIntegrationTest {
@AfterEach
void afterEach() {
if (target != null && target.isRunning()) {
target.stop();
}
if (scraper != null && scraper.isRunning()) {
scraper.stop();
}
if (target != null && target.isRunning()) {
target.stop();
}
if (prerequisiteContainers != null) {
prerequisiteContainers.forEach(
container -> {
if (container.isRunning()) {
container.stop();
}
});
}
if (otlpServer != null) {
otlpServer.reset();
}
@ -103,14 +116,31 @@ public abstract class TargetSystemIntegrationTest {
@Test
void endToEndTest(@TempDir Path tmpDir) {
startContainers(tmpDir);
verifyMetrics();
}
protected void startContainers(Path tmpDir) {
prerequisiteContainers = createPrerequisiteContainers();
target =
createTargetContainer(JMX_PORT)
.withLogConsumer(new Slf4jLogConsumer(targetSystemLogger))
.withNetwork(network)
.withNetworkAliases(TARGET_SYSTEM_NETWORK_ALIAS);
// If there are any containers that must be started before the target, initialize them here
// and make the target depend on them, so it is started only after its dependencies.
for (GenericContainer<?> container : prerequisiteContainers) {
container.withNetwork(network);
target.dependsOn(container);
}
// The target container must be running before the scraper container is customized,
// to allow interactions with the container, such as copying files into it.
target.start();
// Create and initialize scraper container
scraper =
new JmxScraperContainer(otlpEndpoint, scraperBaseImage())
.withLogConsumer(new Slf4jLogConsumer(jmxScraperLogger))
@ -119,14 +149,13 @@ public abstract class TargetSystemIntegrationTest {
scraper = customizeScraperContainer(scraper, target, tmpDir);
scraper.start();
verifyMetrics();
}
protected void verifyMetrics() {
MetricsVerifier metricsVerifier = createMetricsVerifier();
await()
.atMost(Duration.ofSeconds(60))
.pollInterval(Duration.ofSeconds(1))
.untilAsserted(
() -> {
List<ExportMetricsServiceRequest> receivedMetrics = otlpServer.getMetrics();
@ -158,6 +187,10 @@ public abstract class TargetSystemIntegrationTest {
return scraper;
}
protected Collection<GenericContainer<?>> createPrerequisiteContainers() {
return Collections.emptyList();
}
private static class OtlpGrpcServer extends ServerExtension {
private final BlockingQueue<ExportMetricsServiceRequest> metricRequests =
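The key change in this base class is the new createPrerequisiteContainers() hook: a subclass returns containers that must be up before the target, the base class attaches them to the shared network and makes the target dependsOn() them, and afterEach() now stops the scraper first, then the target, then the prerequisites. Below is a minimal sketch of a subclass using the hook; the container images and the "example" target-system name are placeholders only, and the Kafka tests added later in this commit are the real implementations.

import io.opentelemetry.contrib.jmxscraper.JmxScraperContainer;
import io.opentelemetry.contrib.jmxscraper.target_systems.MetricsVerifier;
import io.opentelemetry.contrib.jmxscraper.target_systems.TargetSystemIntegrationTest;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;

// Sketch only: illustrates the new prerequisite-container hook, not part of the commit.
class ExampleIntegrationTest extends TargetSystemIntegrationTest {

  @Override
  protected Collection<GenericContainer<?>> createPrerequisiteContainers() {
    // Started before the target; the base class wires the network and dependsOn() for us
    GenericContainer<?> backend =
        new GenericContainer<>("example/backend:latest") // placeholder image
            .withNetworkAliases("backend");
    return Collections.singletonList(backend);
  }

  @Override
  protected GenericContainer<?> createTargetContainer(int jmxPort) {
    return new GenericContainer<>("example/target:latest") // placeholder image
        .withEnv("JMX_PORT", Integer.toString(jmxPort))
        .withExposedPorts(jmxPort)
        .waitingFor(Wait.forListeningPorts(jmxPort));
  }

  @Override
  protected JmxScraperContainer customizeScraperContainer(
      JmxScraperContainer scraper, GenericContainer<?> target, Path tempDir) {
    return scraper.withTargetSystem("example"); // placeholder target system name
  }

  @Override
  protected MetricsVerifier createMetricsVerifier() {
    return MetricsVerifier.create(); // real tests add per-metric assertions here
  }
}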

View File

@ -0,0 +1,142 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.jmxscraper.target_systems.kafka;
import static io.opentelemetry.contrib.jmxscraper.assertions.DataPointAttributes.attribute;
import static io.opentelemetry.contrib.jmxscraper.assertions.DataPointAttributes.attributeGroup;
import static io.opentelemetry.contrib.jmxscraper.assertions.DataPointAttributes.attributeWithAnyValue;
import static io.opentelemetry.contrib.jmxscraper.target_systems.kafka.KafkaContainerFactory.createKafkaConsumerContainer;
import static io.opentelemetry.contrib.jmxscraper.target_systems.kafka.KafkaContainerFactory.createKafkaContainer;
import static io.opentelemetry.contrib.jmxscraper.target_systems.kafka.KafkaContainerFactory.createKafkaProducerContainer;
import static io.opentelemetry.contrib.jmxscraper.target_systems.kafka.KafkaContainerFactory.createZookeeperContainer;
import io.opentelemetry.contrib.jmxscraper.JmxScraperContainer;
import io.opentelemetry.contrib.jmxscraper.assertions.AttributeMatcher;
import io.opentelemetry.contrib.jmxscraper.target_systems.MetricsVerifier;
import io.opentelemetry.contrib.jmxscraper.target_systems.TargetSystemIntegrationTest;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
public class KafkaConsumerIntegrationTest extends TargetSystemIntegrationTest {
@Override
protected Collection<GenericContainer<?>> createPrerequisiteContainers() {
GenericContainer<?> zookeeper =
createZookeeperContainer()
.withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("zookeeper")))
.withNetworkAliases("zookeeper");
GenericContainer<?> kafka =
createKafkaContainer()
.withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("kafka")))
.withNetworkAliases("kafka")
.dependsOn(zookeeper);
GenericContainer<?> kafkaProducer =
createKafkaProducerContainer()
.withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("kafka-producer")))
.withNetworkAliases("kafka-producer")
.dependsOn(kafka);
return Arrays.asList(zookeeper, kafka, kafkaProducer);
}
@Override
protected GenericContainer<?> createTargetContainer(int jmxPort) {
return createKafkaConsumerContainer()
.withEnv("JMX_PORT", Integer.toString(jmxPort))
.withExposedPorts(jmxPort)
.waitingFor(Wait.forListeningPorts(jmxPort));
}
@Override
protected JmxScraperContainer customizeScraperContainer(
JmxScraperContainer scraper, GenericContainer<?> target, Path tempDir) {
return scraper.withTargetSystem("kafka-consumer");
}
@Override
protected MetricsVerifier createMetricsVerifier() {
// TODO: change to follow semconv
AttributeMatcher clientIdAttribute = attributeWithAnyValue("client-id");
AttributeMatcher topicAttribute = attribute("topic", "test-topic-1");
return MetricsVerifier.create()
.add(
"kafka.consumer.fetch-rate",
metric ->
metric
.hasDescription("The number of fetch requests for all topics per second")
.hasUnit("{request}")
.isGauge()
.hasDataPointsWithOneAttribute(clientIdAttribute))
.add(
"kafka.consumer.total.bytes-consumed-rate",
metric ->
metric
.hasDescription(
"The average number of bytes consumed for all topics per second")
.hasUnit("By")
.isGauge()
.hasDataPointsWithOneAttribute(clientIdAttribute))
.add(
"kafka.consumer.total.fetch-size-avg",
metric ->
metric
.hasDescription(
"The average number of bytes fetched per request for all topics")
.hasUnit("By")
.isGauge()
.hasDataPointsWithOneAttribute(clientIdAttribute))
.add(
"kafka.consumer.total.records-consumed-rate",
metric ->
metric
.hasDescription(
"The average number of records consumed for all topics per second")
.hasUnit("{record}")
.isGauge()
.hasDataPointsWithOneAttribute(clientIdAttribute))
.add(
"kafka.consumer.records-lag-max",
metric ->
metric
.hasDescription("Number of messages the consumer lags behind the producer")
.hasUnit("{record}")
.isGauge()
.hasDataPointsWithOneAttribute(clientIdAttribute))
.add(
"kafka.consumer.bytes-consumed-rate",
metric ->
metric
.hasDescription("The average number of bytes consumed per second")
.hasUnit("By")
.isGauge()
.hasDataPointsWithAttributes(attributeGroup(clientIdAttribute, topicAttribute)))
.add(
"kafka.consumer.fetch-size-avg",
metric ->
metric
.hasDescription("The average number of bytes fetched per request")
.hasUnit("By")
.isGauge()
.hasDataPointsWithAttributes(attributeGroup(clientIdAttribute, topicAttribute)))
.add(
"kafka.consumer.records-consumed-rate",
metric ->
metric
.hasDescription("The average number of records consumed per second")
.hasUnit("{record}")
.isGauge()
.hasDataPointsWithAttributes(
attributeGroup(clientIdAttribute, topicAttribute)));
}
}

View File

@ -0,0 +1,61 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.jmxscraper.target_systems.kafka;
import java.time.Duration;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
public class KafkaContainerFactory {
private static final int KAFKA_PORT = 9092;
private static final String KAFKA_BROKER = "kafka:" + KAFKA_PORT;
private static final String KAFKA_DOCKER_IMAGE = "bitnami/kafka:2.8.1";
private KafkaContainerFactory() {}
public static GenericContainer<?> createZookeeperContainer() {
return new GenericContainer<>("zookeeper:3.5")
.withStartupTimeout(Duration.ofMinutes(2))
.waitingFor(Wait.forListeningPort());
}
public static GenericContainer<?> createKafkaContainer() {
return new GenericContainer<>(KAFKA_DOCKER_IMAGE)
.withEnv("KAFKA_CFG_ZOOKEEPER_CONNECT", "zookeeper:2181")
.withEnv("ALLOW_PLAINTEXT_LISTENER", "yes") // Removed in 3.5.1
.withStartupTimeout(Duration.ofMinutes(2))
.withExposedPorts(KAFKA_PORT)
.waitingFor(
Wait.forLogMessage(".*KafkaServer.*started \\(kafka.server.KafkaServer\\).*", 1));
}
public static GenericContainer<?> createKafkaProducerContainer() {
return new GenericContainer<>(KAFKA_DOCKER_IMAGE)
.withCommand(
"sh",
"-c",
"echo 'Sending messages to test-topic-1'; "
+ "i=1; while true; do echo \"Message $i\"; sleep .25; i=$((i+1)); done | /opt/bitnami/kafka/bin/kafka-console-producer.sh --bootstrap-server "
+ KAFKA_BROKER
+ " --topic test-topic-1;")
.withStartupTimeout(Duration.ofMinutes(2))
.waitingFor(Wait.forLogMessage(".*Welcome to the Bitnami kafka container.*", 1));
}
public static GenericContainer<?> createKafkaConsumerContainer() {
return new GenericContainer<>(KAFKA_DOCKER_IMAGE)
.withCommand(
"kafka-console-consumer.sh",
"--bootstrap-server",
KAFKA_BROKER,
"--whitelist",
"test-topic-.*",
"--max-messages",
"100")
.withStartupTimeout(Duration.ofMinutes(2))
.waitingFor(Wait.forListeningPort());
}
}
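The broker address is hard-coded as kafka:9092 (KAFKA_BROKER), so the producer and consumer helpers only work when the broker container joins the same Docker network under the alias "kafka", and the broker reaches ZooKeeper through the "zookeeper" alias referenced by KAFKA_CFG_ZOOKEEPER_CONNECT. A minimal standalone sketch of that wiring with plain Testcontainers calls follows; the integration tests in this commit do the equivalent through TargetSystemIntegrationTest, so this is illustrative only.

import io.opentelemetry.contrib.jmxscraper.target_systems.kafka.KafkaContainerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;

// Sketch only: shows why the network aliases must match the constants above.
class KafkaContainersWiringSketch {
  static void startKafkaWithProducer() {
    Network network = Network.newNetwork();
    GenericContainer<?> zookeeper =
        KafkaContainerFactory.createZookeeperContainer()
            .withNetwork(network)
            .withNetworkAliases("zookeeper"); // matches KAFKA_CFG_ZOOKEEPER_CONNECT
    GenericContainer<?> kafka =
        KafkaContainerFactory.createKafkaContainer()
            .withNetwork(network)
            .withNetworkAliases("kafka") // must match the KAFKA_BROKER host above
            .dependsOn(zookeeper);
    GenericContainer<?> producer =
        KafkaContainerFactory.createKafkaProducerContainer()
            .withNetwork(network)
            .dependsOn(kafka);
    producer.start(); // dependsOn() makes Testcontainers start zookeeper and kafka first
  }
}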

View File

@ -0,0 +1,215 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.jmxscraper.target_systems.kafka;
import static io.opentelemetry.contrib.jmxscraper.assertions.DataPointAttributes.attribute;
import static io.opentelemetry.contrib.jmxscraper.assertions.DataPointAttributes.attributeGroup;
import static io.opentelemetry.contrib.jmxscraper.target_systems.kafka.KafkaContainerFactory.createKafkaContainer;
import static io.opentelemetry.contrib.jmxscraper.target_systems.kafka.KafkaContainerFactory.createZookeeperContainer;
import io.opentelemetry.contrib.jmxscraper.JmxScraperContainer;
import io.opentelemetry.contrib.jmxscraper.assertions.AttributeMatcherGroup;
import io.opentelemetry.contrib.jmxscraper.target_systems.MetricsVerifier;
import io.opentelemetry.contrib.jmxscraper.target_systems.TargetSystemIntegrationTest;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
public class KafkaIntegrationTest extends TargetSystemIntegrationTest {
@Override
protected Collection<GenericContainer<?>> createPrerequisiteContainers() {
GenericContainer<?> zookeeper =
createZookeeperContainer()
.withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("zookeeper")))
.withNetworkAliases("zookeeper");
return Collections.singletonList(zookeeper);
}
@Override
protected GenericContainer<?> createTargetContainer(int jmxPort) {
return createKafkaContainer().withEnv("JMX_PORT", Integer.toString(jmxPort));
}
@Override
protected JmxScraperContainer customizeScraperContainer(
JmxScraperContainer scraper, GenericContainer<?> target, Path tempDir) {
return scraper.withTargetSystem("kafka");
}
@Override
protected MetricsVerifier createMetricsVerifier() {
AttributeMatcherGroup[] requestTypes = {
attributeGroup(attribute("type", "Produce")),
attributeGroup(attribute("type", "FetchFollower")),
attributeGroup(attribute("type", "FetchConsumer"))
};
return MetricsVerifier.create()
.add(
"kafka.message.count",
metric ->
metric
.hasDescription("The number of messages received by the broker")
.hasUnit("{message}")
.isCounter()
.hasDataPointsWithoutAttributes())
.add(
"kafka.request.count",
metric ->
metric
.hasDescription("The number of requests received by the broker")
.hasUnit("{request}")
.isCounter()
.hasDataPointsWithAttributes(
attributeGroup(attribute("type", "produce")),
attributeGroup(attribute("type", "fetch"))))
.add(
"kafka.request.failed",
metric ->
metric
.hasDescription("The number of requests to the broker resulting in a failure")
.hasUnit("{request}")
.isCounter()
.hasDataPointsWithAttributes(
attributeGroup(attribute("type", "produce")),
attributeGroup(attribute("type", "fetch"))))
.add(
"kafka.network.io",
metric ->
metric
.hasDescription("The bytes received or sent by the broker")
.hasUnit("By")
.isCounter()
.hasDataPointsWithAttributes(
attributeGroup(attribute("direction", "in")),
attributeGroup(attribute("direction", "out"))))
.add(
"kafka.purgatory.size",
metric ->
metric
.hasDescription("The number of requests waiting in purgatory")
.hasUnit("{request}")
.isGauge()
.hasDataPointsWithAttributes(
attributeGroup(attribute("type", "Produce")),
attributeGroup(attribute("type", "Fetch"))))
.add(
"kafka.request.time.total",
metric ->
metric
.hasDescription("The total time the broker has taken to service requests")
.hasUnit("ms")
.isCounter()
.hasDataPointsWithAttributes(requestTypes))
.add(
"kafka.request.time.50p",
metric ->
metric
.hasDescription(
"The 50th percentile time the broker has taken to service requests")
.hasUnit("ms")
.isGauge()
.hasDataPointsWithAttributes(requestTypes))
.add(
"kafka.request.time.99p",
metric ->
metric
.hasDescription(
"The 99th percentile time the broker has taken to service requests")
.hasUnit("ms")
.isGauge()
.hasDataPointsWithAttributes(requestTypes))
.add(
"kafka.request.time.avg",
metric ->
metric
.hasDescription("The average time the broker has taken to service requests")
.hasUnit("ms")
.isGauge()
.hasDataPointsWithAttributes(requestTypes))
.add(
"kafka.request.queue",
metric ->
metric
.hasDescription("Size of the request queue")
.hasUnit("{request}")
.isGauge()
.hasDataPointsWithoutAttributes())
.add(
"kafka.partition.count",
metric ->
metric
.hasDescription("The number of partitions on the broker")
.hasUnit("{partition}")
.isGauge()
.hasDataPointsWithoutAttributes())
.add(
"kafka.partition.offline",
metric ->
metric
.hasDescription("The number of partitions offline")
.hasUnit("{partition}")
.isGauge()
.hasDataPointsWithoutAttributes())
.add(
"kafka.partition.under_replicated",
metric ->
metric
.hasDescription("The number of under replicated partitions")
.hasUnit("{partition}")
.isGauge()
.hasDataPointsWithoutAttributes())
.add(
"kafka.isr.operation.count",
metric ->
metric
.hasDescription("The number of in-sync replica shrink and expand operations")
.hasUnit("{operation}")
.isCounter()
.hasDataPointsWithAttributes(
attributeGroup(attribute("operation", "shrink")),
attributeGroup(attribute("operation", "expand"))))
.add(
"kafka.controller.active.count",
metric ->
metric
.hasDescription(
"For KRaft mode, the number of active controllers in the cluster. For ZooKeeper, indicates whether the broker is the controller broker.")
.hasUnit("{controller}")
.isGauge()
.hasDataPointsWithoutAttributes())
.add(
"kafka.leader.election.rate",
metric ->
metric
.hasDescription("The leader election count")
.hasUnit("{election}")
.isCounter()
.hasDataPointsWithoutAttributes())
.add(
"kafka.unclean.election.rate",
metric ->
metric
.hasDescription(
"Unclean leader election count - increasing indicates broker failures") // CHANGED
.hasUnit("{election}")
.isCounter()
.hasDataPointsWithoutAttributes())
.add(
"kafka.max.lag",
metric ->
metric
.hasDescription(
"The max lag in messages between follower and leader replicas") // CHANGED
.hasUnit("{message}")
.isGauge()
.hasDataPointsWithoutAttributes());
}
}

View File

@ -0,0 +1,155 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.jmxscraper.target_systems.kafka;
import static io.opentelemetry.contrib.jmxscraper.assertions.DataPointAttributes.attribute;
import static io.opentelemetry.contrib.jmxscraper.assertions.DataPointAttributes.attributeGroup;
import static io.opentelemetry.contrib.jmxscraper.assertions.DataPointAttributes.attributeWithAnyValue;
import static io.opentelemetry.contrib.jmxscraper.target_systems.kafka.KafkaContainerFactory.createKafkaContainer;
import static io.opentelemetry.contrib.jmxscraper.target_systems.kafka.KafkaContainerFactory.createKafkaProducerContainer;
import static io.opentelemetry.contrib.jmxscraper.target_systems.kafka.KafkaContainerFactory.createZookeeperContainer;
import io.opentelemetry.contrib.jmxscraper.JmxScraperContainer;
import io.opentelemetry.contrib.jmxscraper.assertions.AttributeMatcher;
import io.opentelemetry.contrib.jmxscraper.target_systems.MetricsVerifier;
import io.opentelemetry.contrib.jmxscraper.target_systems.TargetSystemIntegrationTest;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
public class KafkaProducerIntegrationTest extends TargetSystemIntegrationTest {
@Override
protected Collection<GenericContainer<?>> createPrerequisiteContainers() {
GenericContainer<?> zookeeper =
createZookeeperContainer()
.withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("zookeeper")))
.withNetworkAliases("zookeeper");
GenericContainer<?> kafka =
createKafkaContainer()
.withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("kafka")))
.withNetworkAliases("kafka")
.dependsOn(zookeeper);
return Arrays.asList(zookeeper, kafka);
}
@Override
protected GenericContainer<?> createTargetContainer(int jmxPort) {
return createKafkaProducerContainer()
.withEnv("JMX_PORT", Integer.toString(jmxPort))
.withExposedPorts(jmxPort)
.waitingFor(Wait.forListeningPorts(jmxPort));
}
@Override
protected JmxScraperContainer customizeScraperContainer(
JmxScraperContainer scraper, GenericContainer<?> target, Path tempDir) {
return scraper.withTargetSystem("kafka-producer");
}
@Override
protected MetricsVerifier createMetricsVerifier() {
// TODO: change to follow semconv
AttributeMatcher clientIdAttribute = attributeWithAnyValue("client-id");
AttributeMatcher topicAttribute = attribute("topic", "test-topic-1");
return MetricsVerifier.create()
.add(
"kafka.producer.io-wait-time-ns-avg",
metric ->
metric
.hasDescription(
"The average length of time the I/O thread spent waiting for a socket ready for reads or writes")
.hasUnit("ns")
.isGauge()
.hasDataPointsWithOneAttribute(clientIdAttribute))
.add(
"kafka.producer.outgoing-byte-rate",
metric ->
metric
.hasDescription(
"The average number of outgoing bytes sent per second to all servers")
.hasUnit("By")
.isGauge()
.hasDataPointsWithOneAttribute(clientIdAttribute))
.add(
"kafka.producer.request-latency-avg",
metric ->
metric
.hasDescription("The average request latency")
.hasUnit("ms")
.isGauge()
.hasDataPointsWithOneAttribute(clientIdAttribute))
.add(
"kafka.producer.request-rate",
metric ->
metric
.hasDescription("The average number of requests sent per second")
.hasUnit("{request}")
.isGauge()
.hasDataPointsWithOneAttribute(clientIdAttribute))
.add(
"kafka.producer.response-rate",
metric ->
metric
.hasDescription("Responses received per second")
.hasUnit("{response}")
.isGauge()
.hasDataPointsWithOneAttribute(clientIdAttribute))
// Per topic metrics
.add(
"kafka.producer.byte-rate",
metric ->
metric
.hasDescription("The average number of bytes sent per second for a topic")
.hasUnit("By")
.isGauge()
.hasDataPointsWithAttributes(attributeGroup(clientIdAttribute, topicAttribute)))
.add(
"kafka.producer.compression-rate",
metric ->
metric
.hasDescription(
"The average compression rate of record batches for a topic, defined as the average ratio of the compressed batch size divided by the uncompressed size")
.hasUnit("{ratio}")
.isGauge()
.hasDataPointsWithAttributes(attributeGroup(clientIdAttribute, topicAttribute)))
.add(
"kafka.producer.record-error-rate",
metric ->
metric
.hasDescription(
"The average per-second number of record sends that resulted in errors for a topic")
.hasUnit("{record}")
.isGauge()
.hasDataPointsWithAttributes(attributeGroup(clientIdAttribute, topicAttribute)))
.add(
"kafka.producer.record-retry-rate",
metric ->
metric
.hasDescription(
"The average per-second number of retried record sends for a topic")
.hasUnit("{record}")
.isGauge()
.hasDataPointsWithAttributes(attributeGroup(clientIdAttribute, topicAttribute)))
.add(
"kafka.producer.record-send-rate",
metric ->
metric
.hasDescription("The average number of records sent per second for a topic")
.hasUnit("{record}")
.isGauge()
.hasDataPointsWithAttributes(
attributeGroup(clientIdAttribute, topicAttribute)));
}
}

View File

@ -237,7 +237,7 @@ public class JmxScraper {
RuleParser parserInstance = RuleParser.get();
parserInstance.addMetricDefsTo(conf, inputStream, system);
} else {
throw new IllegalArgumentException("No support for system" + system);
throw new IllegalArgumentException("No support for system " + system);
}
} catch (Exception e) {
throw new IllegalStateException("Error while loading rules for system " + system, e);

View File

@ -0,0 +1,45 @@
---
# Kafka Consumer metrics
rules:
- bean: kafka.consumer:client-id=*,type=consumer-fetch-manager-metrics
metricAttribute:
client-id: param(client-id)
prefix: kafka.consumer.
type: gauge
mapping:
fetch-rate:
desc: The number of fetch requests for all topics per second
unit: '{request}'
bytes-consumed-rate:
metric: total.bytes-consumed-rate
desc: The average number of bytes consumed for all topics per second
unit: By
fetch-size-avg:
metric: total.fetch-size-avg
desc: The average number of bytes fetched per request for all topics
unit: By
records-consumed-rate:
metric: total.records-consumed-rate
desc: The average number of records consumed for all topics per second
unit: '{record}'
records-lag-max:
desc: Number of messages the consumer lags behind the producer
unit: '{record}'
- bean: kafka.consumer:client-id=*,topic=*,type=consumer-fetch-manager-metrics
metricAttribute:
client-id: param(client-id)
topic: param(topic)
prefix: kafka.consumer.
type: gauge
mapping:
bytes-consumed-rate:
desc: The average number of bytes consumed per second
unit: By
fetch-size-avg:
desc: The average number of bytes fetched per request
unit: By
records-consumed-rate:
desc: The average number of records consumed per second
unit: '{record}'

View File

@ -0,0 +1,48 @@
---
# Kafka Producer metrics
rules:
- bean: kafka.producer:client-id=*,type=producer-metrics
metricAttribute:
client-id: param(client-id)
prefix: kafka.producer.
type: gauge
mapping:
io-wait-time-ns-avg:
desc: The average length of time the I/O thread spent waiting for a socket ready for reads or writes
unit: ns
outgoing-byte-rate:
desc: The average number of outgoing bytes sent per second to all servers
unit: By
request-latency-avg:
desc: The average request latency
unit: ms
request-rate:
desc: The average number of requests sent per second
unit: '{request}'
response-rate:
desc: Responses received per second
unit: '{response}'
# per topic metrics
- bean: kafka.producer:client-id=*,topic=*,type=producer-topic-metrics
metricAttribute:
client-id: param(client-id)
topic: param(topic)
prefix: kafka.producer.
type: gauge
mapping:
byte-rate:
desc: The average number of bytes sent per second for a topic
unit: By
compression-rate:
desc: The average compression rate of record batches for a topic, defined as the average ratio of the compressed batch size divided by the uncompressed size
unit: '{ratio}'
record-error-rate:
desc: The average per-second number of record sends that resulted in errors for a topic
unit: '{record}'
record-retry-rate:
desc: The average per-second number of retried record sends for a topic
unit: '{record}'
record-send-rate:
desc: The average number of records sent per second for a topic
unit: '{record}'

View File

@ -0,0 +1,213 @@
---
rules:
# Broker metrics
- bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec
mapping:
Count:
metric: kafka.message.count
type: counter
desc: The number of messages received by the broker
unit: "{message}"
- bean: kafka.server:type=BrokerTopicMetrics,name=TotalFetchRequestsPerSec
metricAttribute:
type: const(fetch)
mapping:
Count:
metric: &metric kafka.request.count
type: &type counter
desc: &desc The number of requests received by the broker
unit: &unit "{request}"
- bean: kafka.server:type=BrokerTopicMetrics,name=TotalProduceRequestsPerSec
metricAttribute:
type: const(produce)
mapping:
Count:
metric: *metric
type: *type
desc: *desc
unit: *unit
- bean: kafka.server:type=BrokerTopicMetrics,name=FailedFetchRequestsPerSec
metricAttribute:
type: const(fetch)
mapping:
Count:
metric: &metric kafka.request.failed
type: &type counter
desc: &desc The number of requests to the broker resulting in a failure
unit: &unit "{request}"
- bean: kafka.server:type=BrokerTopicMetrics,name=FailedProduceRequestsPerSec
metricAttribute:
type: const(produce)
mapping:
Count:
metric: *metric
type: *type
desc: *desc
unit: *unit
- beans:
- kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce
- kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer
- kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower
metricAttribute:
type: param(request)
unit: ms
mapping:
Count:
metric: kafka.request.time.total
type: counter
desc: The total time the broker has taken to service requests
50thPercentile:
metric: kafka.request.time.50p
type: gauge
desc: The 50th percentile time the broker has taken to service requests
99thPercentile:
metric: kafka.request.time.99p
type: gauge
desc: The 99th percentile time the broker has taken to service requests
# Added
Mean:
metric: kafka.request.time.avg
type: gauge
desc: The average time the broker has taken to service requests
- bean: kafka.network:type=RequestChannel,name=RequestQueueSize
mapping:
Value:
metric: kafka.request.queue
type: gauge
desc: Size of the request queue
unit: "{request}"
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
metricAttribute:
direction: const(in)
mapping:
Count:
metric: &metric kafka.network.io
type: &type counter
desc: &desc The bytes received or sent by the broker
unit: &unit By
- bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec
metricAttribute:
direction: const(out)
mapping:
Count:
metric: *metric
type: *type
desc: *desc
unit: *unit
- beans:
- kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Produce
- kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Fetch
metricAttribute:
type: param(delayedOperation)
mapping:
Value:
metric: kafka.purgatory.size
type: gauge
desc: The number of requests waiting in purgatory
unit: "{request}"
- bean: kafka.server:type=ReplicaManager,name=PartitionCount
mapping:
Value:
metric: kafka.partition.count
type: gauge
desc: The number of partitions on the broker
unit: "{partition}"
- bean: kafka.controller:type=KafkaController,name=OfflinePartitionsCount
mapping:
Value:
metric: kafka.partition.offline
type: gauge
desc: The number of partitions offline
unit: "{partition}"
- bean: kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
mapping:
Value:
metric: kafka.partition.under_replicated
type: gauge
desc: The number of under replicated partitions
unit: "{partition}"
- bean: kafka.server:type=ReplicaManager,name=IsrShrinksPerSec
metricAttribute:
operation: const(shrink)
mapping:
Count:
metric: kafka.isr.operation.count
type: counter
desc: The number of in-sync replica shrink and expand operations
unit: "{operation}"
- bean: kafka.server:type=ReplicaManager,name=IsrExpandsPerSec
metricAttribute:
operation: const(expand)
mapping:
Count:
metric: kafka.isr.operation.count
type: counter
desc: The number of in-sync replica shrink and expand operations
unit: "{operation}"
- bean: kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica
mapping:
Value:
metric: kafka.max.lag
type: gauge
desc: The max lag in messages between follower and leader replicas
unit: "{message}"
- bean: kafka.controller:type=KafkaController,name=ActiveControllerCount
mapping:
Value:
metric: kafka.controller.active.count
type: gauge
desc: For KRaft mode, the number of active controllers in the cluster. For ZooKeeper, indicates whether the broker is the controller broker.
unit: "{controller}"
- bean: kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs
mapping:
Count:
metric: kafka.leader.election.rate
type: counter
desc: The leader election count
unit: "{election}"
- bean: kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec
mapping:
Count:
metric: kafka.unclean.election.rate
type: counter
desc: Unclean leader election count - increasing indicates broker failures
unit: "{election}"
# Log metrics
- bean: kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs
unit: ms
type: gauge
prefix: kafka.logs.flush.
mapping:
Count:
metric: count
unit: '{flush}'
type: counter
desc: Log flush count
50thPercentile:
metric: time.50p
desc: Log flush time - 50th percentile
99thPercentile:
metric: time.99p
desc: Log flush time - 99th percentile