Add kafka client metrics (#6138)

* Add kafka client metrics

* Spotless

* Refactor to general purpose bridge between kafka client metrics and opentelemetry metrics

* Include kafka prefix, fix typo

* Spotless, update readme with latest metric names

* PR feedback

* Map rate measureables to gauges instead of up down counters

* Spotless, quote attributes, log placeholder

* Move metric table printing to test, only retain most granular attribute set

* PR feedback

* Remove synchronization from metricChange

* remove kafka dependency

* PR feedback

* Fix reset

* Adjust configuration pattern to not rely on GlobalOpenTelemetry

* Merge into KafkaTelemetry

* Relocate readme and fix typo
jack-berg, 2022-07-06 11:34:14 -05:00, committed by GitHub
parent 3f3e94ddb6
commit 3e08f36cfa
9 changed files with 1164 additions and 1 deletion


@@ -6,6 +6,8 @@
package io.opentelemetry.instrumentation.kafkaclients
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
import org.testcontainers.utility.DockerImageName
import java.time.Duration
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.NewTopic
@@ -46,7 +48,7 @@ abstract class KafkaClientBaseTest extends InstrumentationSpecification {
static TopicPartition topicPartition = new TopicPartition(SHARED_TOPIC, 0)
def setupSpec() {
kafka = new KafkaContainer()
kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"))
.withLogConsumer(new Slf4jLogConsumer(logger))
.waitingFor(Wait.forLogMessage(".*started \\(kafka.server.KafkaServer\\).*", 1))
.withStartupTimeout(Duration.ofMinutes(1))


@@ -0,0 +1,225 @@
# Kafka Metrics
The Kafka client exposes metrics via the `org.apache.kafka.common.metrics.MetricsReporter` interface.
OpenTelemetry provides an implementation that bridges these metrics into OpenTelemetry metrics.
To use it, merge the config properties
from `KafkaTelemetry.create(OpenTelemetry).metricConfigProperties()`
into the configuration used when creating your producer or consumer.
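For example, a producer can be configured as follows. This is a minimal sketch: the broker address, the serializers, and the way the `OpenTelemetry` instance is obtained are assumptions for illustration, and a matching consumer sketch follows the table below.

```java
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.kafkaclients.KafkaTelemetry;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

class ProducerMetricsExample {

  // openTelemetry is whichever SDK instance the application already builds
  static KafkaProducer<String, String> createInstrumentedProducer(OpenTelemetry openTelemetry) {
    Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // merging these properties registers the OpenTelemetry MetricsReporter with the client
    config.putAll(KafkaTelemetry.create(openTelemetry).metricConfigProperties());
    return new KafkaProducer<>(config);
  }
}
```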
Note: Kafka reports several metrics at multiple attribute granularities. For
example, `records-consumed-total` is reported with attribute keys `[client-id]`
and `[client-id, topic]`. If you analyze the sum of records consumed while ignoring dimensions, backends
are likely to double count. The implementation detects this scenario and records only the most
granular set of attributes available. In the case
of `records-consumed-total`, it reports `[client-id, topic]` and ignores `[client-id]`.
The following table shows the full set of metrics exposed by the Kafka client, and the corresponding
OpenTelemetry metric each maps to (if available). Empty values in the Instrument Name, Instrument
Description, etc. columns indicate that there is no registered mapping for the metric and its data is NOT
collected.
| Metric Group | Metric Name | Attribute Keys | Instrument Name | Instrument Description | Instrument Type |
|--------------|-------------|----------------|-----------------|------------------------|-----------------|
| `app-info` | `commit-id` | `client-id` | | | |
| `app-info` | `start-time-ms` | `client-id` | | | |
| `app-info` | `version` | `client-id` | | | |
| `consumer-coordinator-metrics` | `assigned-partitions` | `client-id` | `kafka.consumer.assigned_partitions` | The number of partitions currently assigned to this consumer | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-coordinator-metrics` | `commit-latency-avg` | `client-id` | `kafka.consumer.commit_latency_avg` | The average time taken for a commit request | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-coordinator-metrics` | `commit-latency-max` | `client-id` | `kafka.consumer.commit_latency_max` | The max time taken for a commit request | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-coordinator-metrics` | `commit-rate` | `client-id` | `kafka.consumer.commit_rate` | The number of commit calls per second | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-coordinator-metrics` | `commit-total` | `client-id` | `kafka.consumer.commit_total` | The total number of commit calls | `DOUBLE_OBSERVABLE_COUNTER` |
| `consumer-coordinator-metrics` | `failed-rebalance-rate-per-hour` | `client-id` | `kafka.consumer.failed_rebalance_rate_per_hour` | The number of failed rebalance events per hour | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-coordinator-metrics` | `failed-rebalance-total` | `client-id` | `kafka.consumer.failed_rebalance_total` | The total number of failed rebalance events | `DOUBLE_OBSERVABLE_COUNTER` |
| `consumer-coordinator-metrics` | `heartbeat-rate` | `client-id` | `kafka.consumer.heartbeat_rate` | The number of heartbeats per second | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-coordinator-metrics` | `heartbeat-response-time-max` | `client-id` | `kafka.consumer.heartbeat_response_time_max` | The max time taken to receive a response to a heartbeat request | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-coordinator-metrics` | `heartbeat-total` | `client-id` | `kafka.consumer.heartbeat_total` | The total number of heartbeats | `DOUBLE_OBSERVABLE_COUNTER` |
| `consumer-coordinator-metrics` | `join-rate` | `client-id` | `kafka.consumer.join_rate` | The number of group joins per second | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-coordinator-metrics` | `join-time-avg` | `client-id` | `kafka.consumer.join_time_avg` | The average time taken for a group rejoin | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-coordinator-metrics` | `join-time-max` | `client-id` | `kafka.consumer.join_time_max` | The max time taken for a group rejoin | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-coordinator-metrics` | `join-total` | `client-id` | `kafka.consumer.join_total` | The total number of group joins | `DOUBLE_OBSERVABLE_COUNTER` |
| `consumer-coordinator-metrics` | `last-heartbeat-seconds-ago` | `client-id` | `kafka.consumer.last_heartbeat_seconds_ago` | The number of seconds since the last coordinator heartbeat was sent | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-coordinator-metrics` | `last-rebalance-seconds-ago` | `client-id` | `kafka.consumer.last_rebalance_seconds_ago` | The number of seconds since the last successful rebalance event | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-coordinator-metrics` | `partition-assigned-latency-avg` | `client-id` | `kafka.consumer.partition_assigned_latency_avg` | The average time taken for a partition-assigned rebalance listener callback | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-coordinator-metrics` | `partition-assigned-latency-max` | `client-id` | `kafka.consumer.partition_assigned_latency_max` | The max time taken for a partition-assigned rebalance listener callback | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-coordinator-metrics` | `partition-lost-latency-avg` | `client-id` | `kafka.consumer.partition_lost_latency_avg` | The average time taken for a partition-lost rebalance listener callback | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-coordinator-metrics` | `partition-lost-latency-max` | `client-id` | `kafka.consumer.partition_lost_latency_max` | The max time taken for a partition-lost rebalance listener callback | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-coordinator-metrics` | `partition-revoked-latency-avg` | `client-id` | `kafka.consumer.partition_revoked_latency_avg` | The average time taken for a partition-revoked rebalance listener callback | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-coordinator-metrics` | `partition-revoked-latency-max` | `client-id` | `kafka.consumer.partition_revoked_latency_max` | The max time taken for a partition-revoked rebalance listener callback | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-coordinator-metrics` | `rebalance-latency-avg` | `client-id` | `kafka.consumer.rebalance_latency_avg` | The average time taken for a group to complete a successful rebalance, which may be composed of several failed re-trials until it succeeded | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-coordinator-metrics` | `rebalance-latency-max` | `client-id` | `kafka.consumer.rebalance_latency_max` | The max time taken for a group to complete a successful rebalance, which may be composed of several failed re-trials until it succeeded | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-coordinator-metrics` | `rebalance-latency-total` | `client-id` | `kafka.consumer.rebalance_latency_total` | The total number of milliseconds this consumer has spent in successful rebalances since creation | `DOUBLE_OBSERVABLE_COUNTER` |
| `consumer-coordinator-metrics` | `rebalance-rate-per-hour` | `client-id` | `kafka.consumer.rebalance_rate_per_hour` | The number of successful rebalance events per hour, each event is composed of several failed re-trials until it succeeded | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-coordinator-metrics` | `rebalance-total` | `client-id` | `kafka.consumer.rebalance_total` | The total number of successful rebalance events, each event is composed of several failed re-trials until it succeeded | `DOUBLE_OBSERVABLE_COUNTER` |
| `consumer-coordinator-metrics` | `sync-rate` | `client-id` | `kafka.consumer.sync_rate` | The number of group syncs per second | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-coordinator-metrics` | `sync-time-avg` | `client-id` | `kafka.consumer.sync_time_avg` | The average time taken for a group sync | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-coordinator-metrics` | `sync-time-max` | `client-id` | `kafka.consumer.sync_time_max` | The max time taken for a group sync | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-coordinator-metrics` | `sync-total` | `client-id` | `kafka.consumer.sync_total` | The total number of group syncs | `DOUBLE_OBSERVABLE_COUNTER` |
| `consumer-fetch-manager-metrics` | `bytes-consumed-rate` | `client-id` | | | |
| `consumer-fetch-manager-metrics` | `bytes-consumed-rate` | `client-id`,`topic` | `kafka.consumer.bytes_consumed_rate` | The average number of bytes consumed per second | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-fetch-manager-metrics` | `bytes-consumed-total` | `client-id` | | | |
| `consumer-fetch-manager-metrics` | `bytes-consumed-total` | `client-id`,`topic` | `kafka.consumer.bytes_consumed_total` | The total number of bytes consumed | `DOUBLE_OBSERVABLE_COUNTER` |
| `consumer-fetch-manager-metrics` | `fetch-latency-avg` | `client-id` | `kafka.consumer.fetch_latency_avg` | The average time taken for a fetch request. | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-fetch-manager-metrics` | `fetch-latency-max` | `client-id` | `kafka.consumer.fetch_latency_max` | The max time taken for any fetch request. | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-fetch-manager-metrics` | `fetch-rate` | `client-id` | `kafka.consumer.fetch_rate` | The number of fetch requests per second. | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-fetch-manager-metrics` | `fetch-size-avg` | `client-id` | | | |
| `consumer-fetch-manager-metrics` | `fetch-size-avg` | `client-id`,`topic` | `kafka.consumer.fetch_size_avg` | The average number of bytes fetched per request | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-fetch-manager-metrics` | `fetch-size-max` | `client-id` | | | |
| `consumer-fetch-manager-metrics` | `fetch-size-max` | `client-id`,`topic` | `kafka.consumer.fetch_size_max` | The maximum number of bytes fetched per request | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-fetch-manager-metrics` | `fetch-throttle-time-avg` | `client-id` | `kafka.consumer.fetch_throttle_time_avg` | The average throttle time in ms | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-fetch-manager-metrics` | `fetch-throttle-time-max` | `client-id` | `kafka.consumer.fetch_throttle_time_max` | The maximum throttle time in ms | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-fetch-manager-metrics` | `fetch-total` | `client-id` | `kafka.consumer.fetch_total` | The total number of fetch requests. | `DOUBLE_OBSERVABLE_COUNTER` |
| `consumer-fetch-manager-metrics` | `preferred-read-replica` | `client-id`,`topic`,`partition` | | | |
| `consumer-fetch-manager-metrics` | `records-consumed-rate` | `client-id` | | | |
| `consumer-fetch-manager-metrics` | `records-consumed-rate` | `client-id`,`topic` | `kafka.consumer.records_consumed_rate` | The average number of records consumed per second | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-fetch-manager-metrics` | `records-consumed-total` | `client-id` | | | |
| `consumer-fetch-manager-metrics` | `records-consumed-total` | `client-id`,`topic` | `kafka.consumer.records_consumed_total` | The total number of records consumed | `DOUBLE_OBSERVABLE_COUNTER` |
| `consumer-fetch-manager-metrics` | `records-lag` | `client-id`,`topic`,`partition` | `kafka.consumer.records_lag` | The latest lag of the partition | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-fetch-manager-metrics` | `records-lag-avg` | `client-id`,`topic`,`partition` | `kafka.consumer.records_lag_avg` | The average lag of the partition | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-fetch-manager-metrics` | `records-lag-max` | `client-id` | | | |
| `consumer-fetch-manager-metrics` | `records-lag-max` | `client-id`,`topic`,`partition` | `kafka.consumer.records_lag_max` | The maximum lag in terms of number of records for any partition in this window | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-fetch-manager-metrics` | `records-lead` | `client-id`,`topic`,`partition` | `kafka.consumer.records_lead` | The latest lead of the partition | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-fetch-manager-metrics` | `records-lead-avg` | `client-id`,`topic`,`partition` | `kafka.consumer.records_lead_avg` | The average lead of the partition | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-fetch-manager-metrics` | `records-lead-min` | `client-id` | | | |
| `consumer-fetch-manager-metrics` | `records-lead-min` | `client-id`,`topic`,`partition` | `kafka.consumer.records_lead_min` | The minimum lead in terms of number of records for any partition in this window | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-fetch-manager-metrics` | `records-per-request-avg` | `client-id` | | | |
| `consumer-fetch-manager-metrics` | `records-per-request-avg` | `client-id`,`topic` | `kafka.consumer.records_per_request_avg` | The average number of records in each request | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-metrics` | `connection-close-rate` | `client-id` | `kafka.consumer.connection_close_rate` | The number of connections closed per second | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-metrics` | `connection-close-total` | `client-id` | `kafka.consumer.connection_close_total` | The total number of connections closed | `DOUBLE_OBSERVABLE_COUNTER` |
| `consumer-metrics` | `connection-count` | `client-id` | `kafka.consumer.connection_count` | The current number of active connections. | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-metrics` | `connection-creation-rate` | `client-id` | `kafka.consumer.connection_creation_rate` | The number of new connections established per second | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-metrics` | `connection-creation-total` | `client-id` | `kafka.consumer.connection_creation_total` | The total number of new connections established | `DOUBLE_OBSERVABLE_COUNTER` |
| `consumer-metrics` | `failed-authentication-rate` | `client-id` | `kafka.consumer.failed_authentication_rate` | The number of connections with failed authentication per second | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-metrics` | `failed-authentication-total` | `client-id` | `kafka.consumer.failed_authentication_total` | The total number of connections with failed authentication | `DOUBLE_OBSERVABLE_COUNTER` |
| `consumer-metrics` | `failed-reauthentication-rate` | `client-id` | `kafka.consumer.failed_reauthentication_rate` | The number of failed re-authentication of connections per second | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-metrics` | `failed-reauthentication-total` | `client-id` | `kafka.consumer.failed_reauthentication_total` | The total number of failed re-authentication of connections | `DOUBLE_OBSERVABLE_COUNTER` |
| `consumer-metrics` | `incoming-byte-rate` | `client-id` | | | |
| `consumer-metrics` | `incoming-byte-total` | `client-id` | | | |
| `consumer-metrics` | `io-ratio` | `client-id` | `kafka.consumer.io_ratio` | The fraction of time the I/O thread spent doing I/O | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-metrics` | `io-time-ns-avg` | `client-id` | `kafka.consumer.io_time_ns_avg` | The average length of time for I/O per select call in nanoseconds. | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-metrics` | `io-wait-ratio` | `client-id` | `kafka.consumer.io_wait_ratio` | The fraction of time the I/O thread spent waiting | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-metrics` | `io-wait-time-ns-avg` | `client-id` | `kafka.consumer.io_wait_time_ns_avg` | The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds. | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-metrics` | `io-waittime-total` | `client-id` | `kafka.consumer.io_waittime_total` | The total time the I/O thread spent waiting | `DOUBLE_OBSERVABLE_COUNTER` |
| `consumer-metrics` | `iotime-total` | `client-id` | `kafka.consumer.iotime_total` | The total time the I/O thread spent doing I/O | `DOUBLE_OBSERVABLE_COUNTER` |
| `consumer-metrics` | `last-poll-seconds-ago` | `client-id` | `kafka.consumer.last_poll_seconds_ago` | The number of seconds since the last poll() invocation. | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-metrics` | `network-io-rate` | `client-id` | `kafka.consumer.network_io_rate` | The number of network operations (reads or writes) on all connections per second | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-metrics` | `network-io-total` | `client-id` | `kafka.consumer.network_io_total` | The total number of network operations (reads or writes) on all connections | `DOUBLE_OBSERVABLE_COUNTER` |
| `consumer-metrics` | `outgoing-byte-rate` | `client-id` | | | |
| `consumer-metrics` | `outgoing-byte-total` | `client-id` | | | |
| `consumer-metrics` | `poll-idle-ratio-avg` | `client-id` | `kafka.consumer.poll_idle_ratio_avg` | The average fraction of time the consumer's poll() is idle as opposed to waiting for the user code to process records. | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-metrics` | `reauthentication-latency-avg` | `client-id` | `kafka.consumer.reauthentication_latency_avg` | The average latency observed due to re-authentication | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-metrics` | `reauthentication-latency-max` | `client-id` | `kafka.consumer.reauthentication_latency_max` | The max latency observed due to re-authentication | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-metrics` | `request-rate` | `client-id` | | | |
| `consumer-metrics` | `request-size-avg` | `client-id` | | | |
| `consumer-metrics` | `request-size-max` | `client-id` | | | |
| `consumer-metrics` | `request-total` | `client-id` | | | |
| `consumer-metrics` | `response-rate` | `client-id` | | | |
| `consumer-metrics` | `response-total` | `client-id` | | | |
| `consumer-metrics` | `select-rate` | `client-id` | `kafka.consumer.select_rate` | The number of times the I/O layer checked for new I/O to perform per second | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-metrics` | `select-total` | `client-id` | `kafka.consumer.select_total` | The total number of times the I/O layer checked for new I/O to perform | `DOUBLE_OBSERVABLE_COUNTER` |
| `consumer-metrics` | `successful-authentication-no-reauth-total` | `client-id` | `kafka.consumer.successful_authentication_no_reauth_total` | The total number of connections with successful authentication where the client does not support re-authentication | `DOUBLE_OBSERVABLE_COUNTER` |
| `consumer-metrics` | `successful-authentication-rate` | `client-id` | `kafka.consumer.successful_authentication_rate` | The number of connections with successful authentication per second | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-metrics` | `successful-authentication-total` | `client-id` | `kafka.consumer.successful_authentication_total` | The total number of connections with successful authentication | `DOUBLE_OBSERVABLE_COUNTER` |
| `consumer-metrics` | `successful-reauthentication-rate` | `client-id` | `kafka.consumer.successful_reauthentication_rate` | The number of successful re-authentication of connections per second | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-metrics` | `successful-reauthentication-total` | `client-id` | `kafka.consumer.successful_reauthentication_total` | The total number of successful re-authentication of connections | `DOUBLE_OBSERVABLE_COUNTER` |
| `consumer-metrics` | `time-between-poll-avg` | `client-id` | `kafka.consumer.time_between_poll_avg` | The average delay between invocations of poll(). | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-metrics` | `time-between-poll-max` | `client-id` | `kafka.consumer.time_between_poll_max` | The max delay between invocations of poll(). | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-node-metrics` | `incoming-byte-rate` | `client-id`,`node-id` | `kafka.consumer.incoming_byte_rate` | The number of bytes read off all sockets per second | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-node-metrics` | `incoming-byte-total` | `client-id`,`node-id` | `kafka.consumer.incoming_byte_total` | The total number of bytes read off all sockets | `DOUBLE_OBSERVABLE_COUNTER` |
| `consumer-node-metrics` | `outgoing-byte-rate` | `client-id`,`node-id` | `kafka.consumer.outgoing_byte_rate` | The number of outgoing bytes sent to all servers per second | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-node-metrics` | `outgoing-byte-total` | `client-id`,`node-id` | `kafka.consumer.outgoing_byte_total` | The total number of outgoing bytes sent to all servers | `DOUBLE_OBSERVABLE_COUNTER` |
| `consumer-node-metrics` | `request-latency-avg` | `client-id`,`node-id` | `kafka.consumer.request_latency_avg` | | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-node-metrics` | `request-latency-max` | `client-id`,`node-id` | `kafka.consumer.request_latency_max` | | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-node-metrics` | `request-rate` | `client-id`,`node-id` | `kafka.consumer.request_rate` | The number of requests sent per second | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-node-metrics` | `request-size-avg` | `client-id`,`node-id` | `kafka.consumer.request_size_avg` | The average size of requests sent. | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-node-metrics` | `request-size-max` | `client-id`,`node-id` | `kafka.consumer.request_size_max` | The maximum size of any request sent. | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-node-metrics` | `request-total` | `client-id`,`node-id` | `kafka.consumer.request_total` | The total number of requests sent | `DOUBLE_OBSERVABLE_COUNTER` |
| `consumer-node-metrics` | `response-rate` | `client-id`,`node-id` | `kafka.consumer.response_rate` | The number of responses received per second | `DOUBLE_OBSERVABLE_GAUGE` |
| `consumer-node-metrics` | `response-total` | `client-id`,`node-id` | `kafka.consumer.response_total` | The total number of responses received | `DOUBLE_OBSERVABLE_COUNTER` |
| `kafka-metrics-count` | `count` | `client-id` | | | |
| `producer-metrics` | `batch-size-avg` | `client-id` | `kafka.producer.batch_size_avg` | The average number of bytes sent per partition per-request. | `DOUBLE_OBSERVABLE_GAUGE` |
| `producer-metrics` | `batch-size-max` | `client-id` | `kafka.producer.batch_size_max` | The max number of bytes sent per partition per-request. | `DOUBLE_OBSERVABLE_GAUGE` |
| `producer-metrics` | `batch-split-rate` | `client-id` | `kafka.producer.batch_split_rate` | The average number of batch splits per second | `DOUBLE_OBSERVABLE_GAUGE` |
| `producer-metrics` | `batch-split-total` | `client-id` | `kafka.producer.batch_split_total` | The total number of batch splits | `DOUBLE_OBSERVABLE_COUNTER` |
| `producer-metrics` | `buffer-available-bytes` | `client-id` | `kafka.producer.buffer_available_bytes` | The total amount of buffer memory that is not being used (either unallocated or in the free list). | `DOUBLE_OBSERVABLE_GAUGE` |
| `producer-metrics` | `buffer-exhausted-rate` | `client-id` | `kafka.producer.buffer_exhausted_rate` | The average per-second number of record sends that are dropped due to buffer exhaustion | `DOUBLE_OBSERVABLE_GAUGE` |
| `producer-metrics` | `buffer-exhausted-total` | `client-id` | `kafka.producer.buffer_exhausted_total` | The total number of record sends that are dropped due to buffer exhaustion | `DOUBLE_OBSERVABLE_COUNTER` |
| `producer-metrics` | `buffer-total-bytes` | `client-id` | `kafka.producer.buffer_total_bytes` | The maximum amount of buffer memory the client can use (whether or not it is currently used). | `DOUBLE_OBSERVABLE_GAUGE` |
| `producer-metrics` | `bufferpool-wait-ratio` | `client-id` | `kafka.producer.bufferpool_wait_ratio` | The fraction of time an appender waits for space allocation. | `DOUBLE_OBSERVABLE_GAUGE` |
| `producer-metrics` | `bufferpool-wait-time-total` | `client-id` | `kafka.producer.bufferpool_wait_time_total` | The total time an appender waits for space allocation. | `DOUBLE_OBSERVABLE_COUNTER` |
| `producer-metrics` | `compression-rate-avg` | `client-id` | `kafka.producer.compression_rate_avg` | The average compression rate of record batches. | `DOUBLE_OBSERVABLE_GAUGE` |
| `producer-metrics` | `connection-close-rate` | `client-id` | `kafka.producer.connection_close_rate` | The number of connections closed per second | `DOUBLE_OBSERVABLE_GAUGE` |
| `producer-metrics` | `connection-close-total` | `client-id` | `kafka.producer.connection_close_total` | The total number of connections closed | `DOUBLE_OBSERVABLE_COUNTER` |
| `producer-metrics` | `connection-count` | `client-id` | `kafka.producer.connection_count` | The current number of active connections. | `DOUBLE_OBSERVABLE_GAUGE` |
| `producer-metrics` | `connection-creation-rate` | `client-id` | `kafka.producer.connection_creation_rate` | The number of new connections established per second | `DOUBLE_OBSERVABLE_GAUGE` |
| `producer-metrics` | `connection-creation-total` | `client-id` | `kafka.producer.connection_creation_total` | The total number of new connections established | `DOUBLE_OBSERVABLE_COUNTER` |
| `producer-metrics` | `failed-authentication-rate` | `client-id` | `kafka.producer.failed_authentication_rate` | The number of connections with failed authentication per second | `DOUBLE_OBSERVABLE_GAUGE` |
| `producer-metrics` | `failed-authentication-total` | `client-id` | `kafka.producer.failed_authentication_total` | The total number of connections with failed authentication | `DOUBLE_OBSERVABLE_COUNTER` |
| `producer-metrics` | `failed-reauthentication-rate` | `client-id` | `kafka.producer.failed_reauthentication_rate` | The number of failed re-authentication of connections per second | `DOUBLE_OBSERVABLE_GAUGE` |
| `producer-metrics` | `failed-reauthentication-total` | `client-id` | `kafka.producer.failed_reauthentication_total` | The total number of failed re-authentication of connections | `DOUBLE_OBSERVABLE_COUNTER` |
| `producer-metrics` | `incoming-byte-rate` | `client-id` | | | |
| `producer-metrics` | `incoming-byte-total` | `client-id` | | | |
| `producer-metrics` | `io-ratio` | `client-id` | `kafka.producer.io_ratio` | The fraction of time the I/O thread spent doing I/O | `DOUBLE_OBSERVABLE_GAUGE` |
| `producer-metrics` | `io-time-ns-avg` | `client-id` | `kafka.producer.io_time_ns_avg` | The average length of time for I/O per select call in nanoseconds. | `DOUBLE_OBSERVABLE_GAUGE` |
| `producer-metrics` | `io-wait-ratio` | `client-id` | `kafka.producer.io_wait_ratio` | The fraction of time the I/O thread spent waiting | `DOUBLE_OBSERVABLE_GAUGE` |
| `producer-metrics` | `io-wait-time-ns-avg` | `client-id` | `kafka.producer.io_wait_time_ns_avg` | The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds. | `DOUBLE_OBSERVABLE_GAUGE` |
| `producer-metrics` | `io-waittime-total` | `client-id` | `kafka.producer.io_waittime_total` | The total time the I/O thread spent waiting | `DOUBLE_OBSERVABLE_COUNTER` |
| `producer-metrics` | `iotime-total` | `client-id` | `kafka.producer.iotime_total` | The total time the I/O thread spent doing I/O | `DOUBLE_OBSERVABLE_COUNTER` |
| `producer-metrics` | `metadata-age` | `client-id` | `kafka.producer.metadata_age` | The age in seconds of the current producer metadata being used. | `DOUBLE_OBSERVABLE_GAUGE` |
| `producer-metrics` | `network-io-rate` | `client-id` | `kafka.producer.network_io_rate` | The number of network operations (reads or writes) on all connections per second | `DOUBLE_OBSERVABLE_GAUGE` |
| `producer-metrics` | `network-io-total` | `client-id` | `kafka.producer.network_io_total` | The total number of network operations (reads or writes) on all connections | `DOUBLE_OBSERVABLE_COUNTER` |
| `producer-metrics` | `outgoing-byte-rate` | `client-id` | | | |
| `producer-metrics` | `outgoing-byte-total` | `client-id` | | | |
| `producer-metrics` | `produce-throttle-time-avg` | `client-id` | `kafka.producer.produce_throttle_time_avg` | The average time in ms a request was throttled by a broker | `DOUBLE_OBSERVABLE_GAUGE` |
| `producer-metrics` | `produce-throttle-time-max` | `client-id` | `kafka.producer.produce_throttle_time_max` | The maximum time in ms a request was throttled by a broker | `DOUBLE_OBSERVABLE_GAUGE` |
| `producer-metrics` | `reauthentication-latency-avg` | `client-id` | `kafka.producer.reauthentication_latency_avg` | The average latency observed due to re-authentication | `DOUBLE_OBSERVABLE_GAUGE` |
| `producer-metrics` | `reauthentication-latency-max` | `client-id` | `kafka.producer.reauthentication_latency_max` | The max latency observed due to re-authentication | `DOUBLE_OBSERVABLE_GAUGE` |
| `producer-metrics` | `record-error-rate` | `client-id` | | | |
| `producer-metrics` | `record-error-total` | `client-id` | | | |
| `producer-metrics` | `record-queue-time-avg` | `client-id` | `kafka.producer.record_queue_time_avg` | The average time in ms record batches spent in the send buffer. | `DOUBLE_OBSERVABLE_GAUGE` |
| `producer-metrics` | `record-queue-time-max` | `client-id` | `kafka.producer.record_queue_time_max` | The maximum time in ms record batches spent in the send buffer. | `DOUBLE_OBSERVABLE_GAUGE` |
| `producer-metrics` | `record-retry-rate` | `client-id` | | | |
| `producer-metrics` | `record-retry-total` | `client-id` | | | |
| `producer-metrics` | `record-send-rate` | `client-id` | | | |
| `producer-metrics` | `record-send-total` | `client-id` | | | |
| `producer-metrics` | `record-size-avg` | `client-id` | `kafka.producer.record_size_avg` | The average record size | `DOUBLE_OBSERVABLE_GAUGE` |
| `producer-metrics` | `record-size-max` | `client-id` | `kafka.producer.record_size_max` | The maximum record size | `DOUBLE_OBSERVABLE_GAUGE` |
| `producer-metrics` | `records-per-request-avg` | `client-id` | `kafka.producer.records_per_request_avg` | The average number of records per request. | `DOUBLE_OBSERVABLE_GAUGE` |
| `producer-metrics` | `request-latency-avg` | `client-id` | | | |
| `producer-metrics` | `request-latency-max` | `client-id` | | | |
| `producer-metrics` | `request-rate` | `client-id` | | | |
| `producer-metrics` | `request-size-avg` | `client-id` | | | |
| `producer-metrics` | `request-size-max` | `client-id` | | | |
| `producer-metrics` | `request-total` | `client-id` | | | |
| `producer-metrics` | `requests-in-flight` | `client-id` | `kafka.producer.requests_in_flight` | The current number of in-flight requests awaiting a response. | `DOUBLE_OBSERVABLE_GAUGE` |
| `producer-metrics` | `response-rate` | `client-id` | | | |
| `producer-metrics` | `response-total` | `client-id` | | | |
| `producer-metrics` | `select-rate` | `client-id` | `kafka.producer.select_rate` | The number of times the I/O layer checked for new I/O to perform per second | `DOUBLE_OBSERVABLE_GAUGE` |
| `producer-metrics` | `select-total` | `client-id` | `kafka.producer.select_total` | The total number of times the I/O layer checked for new I/O to perform | `DOUBLE_OBSERVABLE_COUNTER` |
| `producer-metrics` | `successful-authentication-no-reauth-total` | `client-id` | `kafka.producer.successful_authentication_no_reauth_total` | The total number of connections with successful authentication where the client does not support re-authentication | `DOUBLE_OBSERVABLE_COUNTER` |
| `producer-metrics` | `successful-authentication-rate` | `client-id` | `kafka.producer.successful_authentication_rate` | The number of connections with successful authentication per second | `DOUBLE_OBSERVABLE_GAUGE` |
| `producer-metrics` | `successful-authentication-total` | `client-id` | `kafka.producer.successful_authentication_total` | The total number of connections with successful authentication | `DOUBLE_OBSERVABLE_COUNTER` |
| `producer-metrics` | `successful-reauthentication-rate` | `client-id` | `kafka.producer.successful_reauthentication_rate` | The number of successful re-authentication of connections per second | `DOUBLE_OBSERVABLE_GAUGE` |
| `producer-metrics` | `successful-reauthentication-total` | `client-id` | `kafka.producer.successful_reauthentication_total` | The total number of successful re-authentication of connections | `DOUBLE_OBSERVABLE_COUNTER` |
| `producer-metrics` | `waiting-threads` | `client-id` | `kafka.producer.waiting_threads` | The number of user threads blocked waiting for buffer memory to enqueue their records | `DOUBLE_OBSERVABLE_GAUGE` |
| `producer-node-metrics` | `incoming-byte-rate` | `client-id`,`node-id` | `kafka.producer.incoming_byte_rate` | The number of bytes read off all sockets per second | `DOUBLE_OBSERVABLE_GAUGE` |
| `producer-node-metrics` | `incoming-byte-total` | `client-id`,`node-id` | `kafka.producer.incoming_byte_total` | The total number of bytes read off all sockets | `DOUBLE_OBSERVABLE_COUNTER` |
| `producer-node-metrics` | `outgoing-byte-rate` | `client-id`,`node-id` | `kafka.producer.outgoing_byte_rate` | The number of outgoing bytes sent to all servers per second | `DOUBLE_OBSERVABLE_GAUGE` |
| `producer-node-metrics` | `outgoing-byte-total` | `client-id`,`node-id` | `kafka.producer.outgoing_byte_total` | The total number of outgoing bytes sent to all servers | `DOUBLE_OBSERVABLE_COUNTER` |
| `producer-node-metrics` | `request-latency-avg` | `client-id`,`node-id` | `kafka.producer.request_latency_avg` | The average request latency in ms | `DOUBLE_OBSERVABLE_GAUGE` |
| `producer-node-metrics` | `request-latency-max` | `client-id`,`node-id` | `kafka.producer.request_latency_max` | The maximum request latency in ms | `DOUBLE_OBSERVABLE_GAUGE` |
| `producer-node-metrics` | `request-rate` | `client-id`,`node-id` | `kafka.producer.request_rate` | The number of requests sent per second | `DOUBLE_OBSERVABLE_GAUGE` |
| `producer-node-metrics` | `request-size-avg` | `client-id`,`node-id` | `kafka.producer.request_size_avg` | The average size of requests sent. | `DOUBLE_OBSERVABLE_GAUGE` |
| `producer-node-metrics` | `request-size-max` | `client-id`,`node-id` | `kafka.producer.request_size_max` | The maximum size of any request sent. | `DOUBLE_OBSERVABLE_GAUGE` |
| `producer-node-metrics` | `request-total` | `client-id`,`node-id` | `kafka.producer.request_total` | The total number of requests sent | `DOUBLE_OBSERVABLE_COUNTER` |
| `producer-node-metrics` | `response-rate` | `client-id`,`node-id` | `kafka.producer.response_rate` | The number of responses received per second | `DOUBLE_OBSERVABLE_GAUGE` |
| `producer-node-metrics` | `response-total` | `client-id`,`node-id` | `kafka.producer.response_total` | The total number of responses received | `DOUBLE_OBSERVABLE_COUNTER` |
| `producer-topic-metrics` | `byte-rate` | `client-id`,`topic` | `kafka.producer.byte_rate` | The average number of bytes sent per second for a topic. | `DOUBLE_OBSERVABLE_GAUGE` |
| `producer-topic-metrics` | `byte-total` | `client-id`,`topic` | `kafka.producer.byte_total` | The total number of bytes sent for a topic. | `DOUBLE_OBSERVABLE_COUNTER` |
| `producer-topic-metrics` | `compression-rate` | `client-id`,`topic` | `kafka.producer.compression_rate` | The average compression rate of record batches for a topic. | `DOUBLE_OBSERVABLE_GAUGE` |
| `producer-topic-metrics` | `record-error-rate` | `client-id`,`topic` | `kafka.producer.record_error_rate` | The average per-second number of record sends that resulted in errors | `DOUBLE_OBSERVABLE_GAUGE` |
| `producer-topic-metrics` | `record-error-total` | `client-id`,`topic` | `kafka.producer.record_error_total` | The total number of record sends that resulted in errors | `DOUBLE_OBSERVABLE_COUNTER` |
| `producer-topic-metrics` | `record-retry-rate` | `client-id`,`topic` | `kafka.producer.record_retry_rate` | The average per-second number of retried record sends | `DOUBLE_OBSERVABLE_GAUGE` |
| `producer-topic-metrics` | `record-retry-total` | `client-id`,`topic` | `kafka.producer.record_retry_total` | The total number of retried record sends | `DOUBLE_OBSERVABLE_COUNTER` |
| `producer-topic-metrics` | `record-send-rate` | `client-id`,`topic` | `kafka.producer.record_send_rate` | The average number of records sent per second. | `DOUBLE_OBSERVABLE_GAUGE` |
| `producer-topic-metrics` | `record-send-total` | `client-id`,`topic` | `kafka.producer.record_send_total` | The total number of records sent. | `DOUBLE_OBSERVABLE_COUNTER` |
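
For consumers, the setup mirrors the producer sketch above (again illustrative: the group id, broker address, and deserializers are assumptions):

```java
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.kafkaclients.KafkaTelemetry;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

class ConsumerMetricsExample {

  static KafkaConsumer<String, String> createInstrumentedConsumer(OpenTelemetry openTelemetry) {
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-group"); // assumed group id
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    // merging these properties registers the OpenTelemetry MetricsReporter with the client
    config.putAll(KafkaTelemetry.create(openTelemetry).metricConfigProperties());
    return new KafkaConsumer<>(config);
  }
}
```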


@@ -10,6 +10,10 @@ dependencies {
testImplementation("com.fasterxml.jackson.core:jackson-databind:2.10.2")
testImplementation("org.testcontainers:kafka")
testImplementation("org.testcontainers:junit-jupiter")
testCompileOnly("com.google.auto.value:auto-value-annotations")
testAnnotationProcessor("com.google.auto.value:auto-value")
}
tasks {


@@ -17,17 +17,25 @@ import io.opentelemetry.context.propagation.TextMapSetter;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerRecordGetter;
import io.opentelemetry.instrumentation.kafka.internal.KafkaHeadersSetter;
import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetryMetricsReporter;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.function.BiFunction;
import java.util.logging.Logger;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.metrics.MetricsReporter;
public final class KafkaTelemetry {
private static final Logger logger = Logger.getLogger(KafkaTelemetry.class.getName());
@@ -76,6 +84,41 @@ public final class KafkaTelemetry {
return new TracingConsumer<>(consumer, this);
}
/**
* Produces a set of kafka client config properties (consumer or producer) to register a {@link
* MetricsReporter} that records metrics to an {@code openTelemetry} instance. Add these resulting
* properties to the configuration map used to initialize a {@link KafkaConsumer} or {@link
* KafkaProducer}.
*
* <p>For producers:
*
* <pre>{@code
* // Map<String, Object> config = new HashMap<>();
* // config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
* // config.putAll(kafkaTelemetry.metricConfigProperties());
* // try (KafkaProducer<?, ?> producer = new KafkaProducer<>(config)) { ... }
* }</pre>
*
* <p>For consumers:
*
* <pre>{@code
* // Map<String, Object> config = new HashMap<>();
* // config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
* // config.putAll(kafkaTelemetry.metricConfigProperties());
* // try (KafkaConsumer<?, ?> consumer = new KafkaConsumer<>(config)) { ... }
* }</pre>
*
* @return the kafka client properties
*/
public Map<String, ?> metricConfigProperties() {
Map<String, Object> config = new HashMap<>();
config.put(
CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
OpenTelemetryMetricsReporter.class.getName());
config.put(OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTANCE, openTelemetry);
return Collections.unmodifiableMap(config);
}
/**
* Build and inject span into record.
*


@@ -0,0 +1,519 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.kafka.internal;
import static java.lang.System.lineSeparator;
import static java.util.Comparator.comparing;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import com.google.auto.value.AutoValue;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.instrumentation.kafkaclients.KafkaTelemetry;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.data.PointData;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
@Testcontainers
class OpenTelemetryMetricsReporterTest {
private static final Logger logger =
LoggerFactory.getLogger(OpenTelemetryMetricsReporterTest.class);
private static final List<String> TOPICS = Arrays.asList("foo", "bar", "baz", "qux");
private static final Random RANDOM = new Random();
@RegisterExtension
static final InstrumentationExtension testing = LibraryInstrumentationExtension.create();
private static KafkaContainer kafka;
private static KafkaProducer<byte[], byte[]> producer;
private static KafkaConsumer<byte[], byte[]> consumer;
@BeforeAll
static void beforeAll() {
kafka =
new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"))
.withLogConsumer(new Slf4jLogConsumer(logger))
.waitingFor(Wait.forLogMessage(".*started \\(kafka.server.KafkaServer\\).*", 1))
.withStartupTimeout(Duration.ofMinutes(1));
kafka.start();
producer = new KafkaProducer<>(producerConfig());
consumer = new KafkaConsumer<>(consumerConfig());
}
@AfterAll
static void afterAll() {
kafka.stop();
producer.close();
consumer.close();
}
@AfterEach
void tearDown() {
OpenTelemetryMetricsReporter.resetForTest();
}
private static Map<String, Object> producerConfig() {
Map<String, Object> producerConfig = new HashMap<>();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-client-id");
producerConfig.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
producerConfig.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
producerConfig.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
producerConfig.putAll(
KafkaTelemetry.create(testing.getOpenTelemetry()).metricConfigProperties());
producerConfig.merge(
CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
TestMetricsReporter.class.getName(),
(o, o2) -> o + "," + o2);
return producerConfig;
}
private static Map<String, Object> consumerConfig() {
Map<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-group");
consumerConfig.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
consumerConfig.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
consumerConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2);
consumerConfig.putAll(
KafkaTelemetry.create(testing.getOpenTelemetry()).metricConfigProperties());
consumerConfig.merge(
CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
TestMetricsReporter.class.getName(),
(o, o2) -> o + "," + o2);
return consumerConfig;
}
@Test
void badConfig() {
// Bad producer config
assertThatThrownBy(
() -> {
Map<String, Object> producerConfig = producerConfig();
producerConfig.remove(OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTANCE);
new KafkaProducer<>(producerConfig).close();
})
.hasRootCauseInstanceOf(IllegalStateException.class)
.hasRootCauseMessage("Missing required configuration property: opentelemetry.instance");
assertThatThrownBy(
() -> {
Map<String, Object> producerConfig = producerConfig();
producerConfig.put(
OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTANCE, "foo");
new KafkaProducer<>(producerConfig).close();
})
.hasRootCauseInstanceOf(IllegalStateException.class)
.hasRootCauseMessage(
"Configuration property opentelemetry.instance is not instance of OpenTelemetry");
// Bad consumer config
assertThatThrownBy(
() -> {
Map<String, Object> consumerConfig = consumerConfig();
consumerConfig.remove(OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTANCE);
new KafkaConsumer<>(consumerConfig).close();
})
.hasRootCauseInstanceOf(IllegalStateException.class)
.hasRootCauseMessage("Missing required configuration property: opentelemetry.instance");
assertThatThrownBy(
() -> {
Map<String, Object> consumerConfig = consumerConfig();
consumerConfig.put(
OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTANCE, "foo");
new KafkaConsumer<>(consumerConfig).close();
})
.hasRootCauseInstanceOf(IllegalStateException.class)
.hasRootCauseMessage(
"Configuration property opentelemetry.instance is not instance of OpenTelemetry");
}
@Test
void observeMetrics() {
produceRecords();
consumeRecords();
Set<String> expectedMetricNames =
new HashSet<>(
Arrays.asList(
"kafka.consumer.commit_latency_avg",
"kafka.consumer.commit_latency_max",
"kafka.consumer.commit_rate",
"kafka.consumer.commit_total",
"kafka.consumer.failed_rebalance_rate_per_hour",
"kafka.consumer.failed_rebalance_total",
"kafka.consumer.heartbeat_rate",
"kafka.consumer.heartbeat_response_time_max",
"kafka.consumer.heartbeat_total",
"kafka.consumer.join_rate",
"kafka.consumer.join_time_avg",
"kafka.consumer.join_time_max",
"kafka.consumer.join_total",
"kafka.consumer.last_heartbeat_seconds_ago",
"kafka.consumer.last_rebalance_seconds_ago",
"kafka.consumer.partition_assigned_latency_avg",
"kafka.consumer.partition_assigned_latency_max",
"kafka.consumer.partition_lost_latency_avg",
"kafka.consumer.partition_lost_latency_max",
"kafka.consumer.partition_revoked_latency_avg",
"kafka.consumer.partition_revoked_latency_max",
"kafka.consumer.rebalance_latency_avg",
"kafka.consumer.rebalance_latency_max",
"kafka.consumer.rebalance_latency_total",
"kafka.consumer.rebalance_rate_per_hour",
"kafka.consumer.rebalance_total",
"kafka.consumer.sync_rate",
"kafka.consumer.sync_time_avg",
"kafka.consumer.sync_time_max",
"kafka.consumer.sync_total",
"kafka.consumer.bytes_consumed_rate",
"kafka.consumer.bytes_consumed_total",
"kafka.consumer.fetch_latency_avg",
"kafka.consumer.fetch_latency_max",
"kafka.consumer.fetch_rate",
"kafka.consumer.fetch_size_avg",
"kafka.consumer.fetch_size_max",
"kafka.consumer.fetch_throttle_time_avg",
"kafka.consumer.fetch_throttle_time_max",
"kafka.consumer.fetch_total",
"kafka.consumer.records_consumed_rate",
"kafka.consumer.records_consumed_total",
"kafka.consumer.records_lag",
"kafka.consumer.records_lag_avg",
"kafka.consumer.records_lag_max",
"kafka.consumer.records_lead",
"kafka.consumer.records_lead_avg",
"kafka.consumer.records_lead_min",
"kafka.consumer.records_per_request_avg",
"kafka.consumer.connection_close_rate",
"kafka.consumer.connection_close_total",
"kafka.consumer.connection_count",
"kafka.consumer.connection_creation_rate",
"kafka.consumer.connection_creation_total",
"kafka.consumer.failed_authentication_rate",
"kafka.consumer.failed_authentication_total",
"kafka.consumer.failed_reauthentication_rate",
"kafka.consumer.failed_reauthentication_total",
"kafka.consumer.incoming_byte_rate",
"kafka.consumer.incoming_byte_total",
"kafka.consumer.io_ratio",
"kafka.consumer.io_time_ns_avg",
"kafka.consumer.io_wait_ratio",
"kafka.consumer.io_wait_time_ns_avg",
"kafka.consumer.io_waittime_total",
"kafka.consumer.iotime_total",
"kafka.consumer.last_poll_seconds_ago",
"kafka.consumer.network_io_rate",
"kafka.consumer.network_io_total",
"kafka.consumer.outgoing_byte_rate",
"kafka.consumer.outgoing_byte_total",
"kafka.consumer.poll_idle_ratio_avg",
"kafka.consumer.reauthentication_latency_avg",
"kafka.consumer.reauthentication_latency_max",
"kafka.consumer.request_rate",
"kafka.consumer.request_size_avg",
"kafka.consumer.request_size_max",
"kafka.consumer.request_total",
"kafka.consumer.response_rate",
"kafka.consumer.response_total",
"kafka.consumer.select_rate",
"kafka.consumer.select_total",
"kafka.consumer.successful_authentication_no_reauth_total",
"kafka.consumer.successful_authentication_rate",
"kafka.consumer.successful_authentication_total",
"kafka.consumer.successful_reauthentication_rate",
"kafka.consumer.successful_reauthentication_total",
"kafka.consumer.time_between_poll_avg",
"kafka.consumer.time_between_poll_max",
"kafka.consumer.incoming_byte_rate",
"kafka.consumer.incoming_byte_total",
"kafka.consumer.outgoing_byte_rate",
"kafka.consumer.outgoing_byte_total",
"kafka.consumer.request_latency_avg",
"kafka.consumer.request_latency_max",
"kafka.consumer.request_rate",
"kafka.consumer.request_size_avg",
"kafka.consumer.request_size_max",
"kafka.consumer.request_total",
"kafka.consumer.response_rate",
"kafka.consumer.response_total",
"kafka.producer.batch_size_avg",
"kafka.producer.batch_size_max",
"kafka.producer.batch_split_rate",
"kafka.producer.batch_split_total",
"kafka.producer.buffer_available_bytes",
"kafka.producer.buffer_exhausted_rate",
"kafka.producer.buffer_exhausted_total",
"kafka.producer.buffer_total_bytes",
"kafka.producer.bufferpool_wait_ratio",
"kafka.producer.bufferpool_wait_time_total",
"kafka.producer.compression_rate_avg",
"kafka.producer.connection_close_rate",
"kafka.producer.connection_close_total",
"kafka.producer.connection_count",
"kafka.producer.connection_creation_rate",
"kafka.producer.connection_creation_total",
"kafka.producer.failed_authentication_rate",
"kafka.producer.failed_authentication_total",
"kafka.producer.failed_reauthentication_rate",
"kafka.producer.failed_reauthentication_total",
"kafka.producer.incoming_byte_rate",
"kafka.producer.incoming_byte_total",
"kafka.producer.io_ratio",
"kafka.producer.io_time_ns_avg",
"kafka.producer.io_wait_ratio",
"kafka.producer.io_wait_time_ns_avg",
"kafka.producer.io_waittime_total",
"kafka.producer.iotime_total",
"kafka.producer.metadata_age",
"kafka.producer.network_io_rate",
"kafka.producer.network_io_total",
"kafka.producer.outgoing_byte_rate",
"kafka.producer.outgoing_byte_total",
"kafka.producer.produce_throttle_time_avg",
"kafka.producer.produce_throttle_time_max",
"kafka.producer.reauthentication_latency_avg",
"kafka.producer.reauthentication_latency_max",
"kafka.producer.record_error_rate",
"kafka.producer.record_error_total",
"kafka.producer.record_queue_time_avg",
"kafka.producer.record_queue_time_max",
"kafka.producer.record_retry_rate",
"kafka.producer.record_retry_total",
"kafka.producer.record_send_rate",
"kafka.producer.record_send_total",
"kafka.producer.record_size_avg",
"kafka.producer.record_size_max",
"kafka.producer.records_per_request_avg",
"kafka.producer.request_latency_avg",
"kafka.producer.request_latency_max",
"kafka.producer.request_rate",
"kafka.producer.request_size_avg",
"kafka.producer.request_size_max",
"kafka.producer.request_total",
"kafka.producer.requests_in_flight",
"kafka.producer.response_rate",
"kafka.producer.response_total",
"kafka.producer.select_rate",
"kafka.producer.select_total",
"kafka.producer.successful_authentication_no_reauth_total",
"kafka.producer.successful_authentication_rate",
"kafka.producer.successful_authentication_total",
"kafka.producer.successful_reauthentication_rate",
"kafka.producer.successful_reauthentication_total",
"kafka.producer.waiting_threads",
"kafka.producer.incoming_byte_rate",
"kafka.producer.incoming_byte_total",
"kafka.producer.outgoing_byte_rate",
"kafka.producer.outgoing_byte_total",
"kafka.producer.request_latency_avg",
"kafka.producer.request_latency_max",
"kafka.producer.request_rate",
"kafka.producer.request_size_avg",
"kafka.producer.request_size_max",
"kafka.producer.request_total",
"kafka.producer.response_rate",
"kafka.producer.response_total",
"kafka.producer.byte_rate",
"kafka.producer.byte_total",
"kafka.producer.compression_rate",
"kafka.producer.record_error_rate",
"kafka.producer.record_error_total",
"kafka.producer.record_retry_rate",
"kafka.producer.record_retry_total",
"kafka.producer.record_send_rate",
"kafka.producer.record_send_total"));
List<MetricData> metrics = testing.metrics();
Set<String> metricNames = metrics.stream().map(MetricData::getName).collect(toSet());
assertThat(metricNames).containsAll(expectedMetricNames);
assertThat(metrics)
.allSatisfy(
metricData -> {
Set<String> expectedKeys =
metricData.getData().getPoints().stream()
.findFirst()
.map(
point ->
point.getAttributes().asMap().keySet().stream()
.map(AttributeKey::getKey)
.collect(toSet()))
.orElse(Collections.emptySet());
assertThat(metricData.getData().getPoints())
.extracting(PointData::getAttributes)
.extracting(
attributes ->
attributes.asMap().keySet().stream()
.map(AttributeKey::getKey)
.collect(toSet()))
.allSatisfy(attributeKeys -> assertThat(attributeKeys).isEqualTo(expectedKeys));
});
// Print mapping table
printMappingTable();
}
private static void produceRecords() {
for (int i = 0; i < 100; i++) {
producer.send(
new ProducerRecord<>(
TOPICS.get(RANDOM.nextInt(TOPICS.size())),
0,
System.currentTimeMillis(),
"key".getBytes(StandardCharsets.UTF_8),
"value".getBytes(StandardCharsets.UTF_8)));
}
}
private static void consumeRecords() {
consumer.subscribe(TOPICS);
Instant stopTime = Instant.now().plusSeconds(10);
while (Instant.now().isBefore(stopTime)) {
consumer.poll(Duration.ofSeconds(1));
}
}
/**
* Print a table mapping kafka metrics to equivalent OpenTelemetry metrics, in markdown format.
*/
private static void printMappingTable() {
StringBuilder sb = new StringBuilder();
// Append table headers
sb.append(
"| Metric Group | Metric Name | Attribute Keys | Instrument Name | Instrument Description | Instrument Type |")
.append(lineSeparator())
.append(
"|--------------|-------------|----------------|-----------------|------------------------|-----------------|")
.append(lineSeparator());
Map<String, List<KafkaMetricId>> kafkaMetricsByGroup =
TestMetricsReporter.seenMetrics.stream().collect(groupingBy(KafkaMetricId::getGroup));
List<RegisteredObservable> registeredObservables =
OpenTelemetryMetricsReporter.getRegisteredObservables();
// Iterate through groups in alpha order
for (String group : kafkaMetricsByGroup.keySet().stream().sorted().collect(toList())) {
List<KafkaMetricId> kafkaMetricIds =
kafkaMetricsByGroup.get(group).stream()
.sorted(
comparing(KafkaMetricId::getName)
.thenComparing(kafkaMetricId -> kafkaMetricId.getAttributeKeys().size()))
.collect(toList());
// Iterate through metrics in alpha order by name
for (KafkaMetricId kafkaMetricId : kafkaMetricIds) {
// Find first (there may be multiple) registered instrument that matches the kafkaMetricId
Optional<InstrumentDescriptor> descriptor =
registeredObservables.stream()
.filter(
registeredObservable ->
KafkaMetricId.create(registeredObservable.getKafkaMetricName())
.equals(kafkaMetricId))
.findFirst()
.map(RegisteredObservable::getInstrumentDescriptor);
// Append table row
sb.append(
String.format(
"| %s | %s | %s | %s | %s | %s |%n",
"`" + group + "`",
"`" + kafkaMetricId.getName() + "`",
kafkaMetricId.getAttributeKeys().stream()
.map(key -> "`" + key + "`")
.collect(joining(",")),
descriptor.map(i -> "`" + i.getName() + "`").orElse(""),
descriptor.map(InstrumentDescriptor::getDescription).orElse(""),
descriptor.map(i -> "`" + i.getInstrumentType() + "`").orElse("")));
}
}
logger.info("Mapping table" + lineSeparator() + sb);
}
/**
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
* any time.
*/
public static class TestMetricsReporter implements MetricsReporter {
private static final Set<KafkaMetricId> seenMetrics = new HashSet<>();
@Override
public void init(List<KafkaMetric> list) {
list.forEach(this::metricChange);
}
@Override
public void metricChange(KafkaMetric kafkaMetric) {
seenMetrics.add(KafkaMetricId.create(kafkaMetric.metricName()));
}
@Override
public void metricRemoval(KafkaMetric kafkaMetric) {}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> map) {}
}
@AutoValue
abstract static class KafkaMetricId {
abstract String getGroup();
abstract String getName();
abstract Set<String> getAttributeKeys();
static KafkaMetricId create(MetricName metricName) {
return new AutoValue_OpenTelemetryMetricsReporterTest_KafkaMetricId(
metricName.group(), metricName.name(), metricName.tags().keySet());
}
}
}
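
For reference, a minimal end-to-end sketch of how an application would enable the bridge on a producer. `KafkaTelemetry.create(OpenTelemetry).metricConfigProperties()` is the documented entry point; the bootstrap address, serializers, and the `openTelemetry` parameter below are placeholder assumptions, not part of this diff:

```java
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.kafkaclients.KafkaTelemetry;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;

class MetricsBridgeExample {
  static KafkaProducer<byte[], byte[]> createProducer(OpenTelemetry openTelemetry) {
    Map<String, Object> config = new HashMap<>();
    // Placeholder connection/serialization settings - substitute your own.
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    // Merge in the metrics-reporter configuration provided by KafkaTelemetry.
    config.putAll(KafkaTelemetry.create(openTelemetry).metricConfigProperties());
    return new KafkaProducer<>(config);
  }
}
```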

View File

@ -0,0 +1,32 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.kafka.internal;
import com.google.auto.value.AutoValue;
/** A description of an OpenTelemetry metric instrument. */
@AutoValue
abstract class InstrumentDescriptor {
static final String INSTRUMENT_TYPE_DOUBLE_OBSERVABLE_GAUGE = "DOUBLE_OBSERVABLE_GAUGE";
static final String INSTRUMENT_TYPE_DOUBLE_OBSERVABLE_COUNTER = "DOUBLE_OBSERVABLE_COUNTER";
abstract String getName();
abstract String getDescription();
abstract String getInstrumentType();
static InstrumentDescriptor createDoubleGauge(String name, String description) {
return new AutoValue_InstrumentDescriptor(
name, description, INSTRUMENT_TYPE_DOUBLE_OBSERVABLE_GAUGE);
}
static InstrumentDescriptor createDoubleCounter(String name, String description) {
return new AutoValue_InstrumentDescriptor(
name, description, INSTRUMENT_TYPE_DOUBLE_OBSERVABLE_COUNTER);
}
}
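
For orientation, a sketch of the two factories in use. The gauge's name and description match the README table; the counter's description string is purely illustrative:

```java
class DescriptorExamples {
  // Gauge descriptor, e.g. for commit-latency-avg (values from the README table):
  static final InstrumentDescriptor GAUGE =
      InstrumentDescriptor.createDoubleGauge(
          "kafka.consumer.commit_latency_avg", "The average time taken for a commit request");

  // Counter descriptor, e.g. for record-send-total (description is a placeholder):
  static final InstrumentDescriptor COUNTER =
      InstrumentDescriptor.createDoubleCounter(
          "kafka.producer.record_send_total", "The total number of records sent");
}
```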

View File

@ -0,0 +1,145 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.kafka.internal;
import static io.opentelemetry.instrumentation.kafka.internal.InstrumentDescriptor.INSTRUMENT_TYPE_DOUBLE_OBSERVABLE_COUNTER;
import static io.opentelemetry.instrumentation.kafka.internal.InstrumentDescriptor.INSTRUMENT_TYPE_DOUBLE_OBSERVABLE_GAUGE;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.ObservableDoubleMeasurement;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Measurable;
/** A registry mapping kafka metrics to corresponding OpenTelemetry metric definitions. */
final class KafkaMetricRegistry {
private static final Set<String> groups = new HashSet<>(Arrays.asList("consumer", "producer"));
private static final Map<Class<?>, String> measurableToInstrumentType = new HashMap<>();
private static final Map<String, String> descriptionCache = new ConcurrentHashMap<>();
static {
Map<String, String> classNameToType = new HashMap<>();
classNameToType.put(
"org.apache.kafka.common.metrics.stats.Rate", INSTRUMENT_TYPE_DOUBLE_OBSERVABLE_GAUGE);
classNameToType.put(
"org.apache.kafka.common.metrics.stats.Avg", INSTRUMENT_TYPE_DOUBLE_OBSERVABLE_GAUGE);
classNameToType.put(
"org.apache.kafka.common.metrics.stats.Max", INSTRUMENT_TYPE_DOUBLE_OBSERVABLE_GAUGE);
classNameToType.put(
"org.apache.kafka.common.metrics.stats.Value", INSTRUMENT_TYPE_DOUBLE_OBSERVABLE_GAUGE);
classNameToType.put(
"org.apache.kafka.common.metrics.stats.CumulativeSum",
INSTRUMENT_TYPE_DOUBLE_OBSERVABLE_COUNTER);
classNameToType.put(
"org.apache.kafka.common.metrics.stats.CumulativeCount",
INSTRUMENT_TYPE_DOUBLE_OBSERVABLE_COUNTER);
for (Map.Entry<String, String> entry : classNameToType.entrySet()) {
try {
measurableToInstrumentType.put(Class.forName(entry.getKey()), entry.getValue());
} catch (ClassNotFoundException e) {
// Class doesn't exist in this version of kafka client - skip
}
}
}
@Nullable
static RegisteredObservable getRegisteredObservable(Meter meter, KafkaMetric kafkaMetric) {
// If the metric is not a Measurable, we can't map it to an instrument
Class<? extends Measurable> measurable = getMeasurable(kafkaMetric);
if (measurable == null) {
return null;
}
MetricName metricName = kafkaMetric.metricName();
Optional<String> matchingGroup =
groups.stream().filter(group -> metricName.group().contains(group)).findFirst();
// Only map metrics that have a matching group
if (!matchingGroup.isPresent()) {
return null;
}
String instrumentName =
"kafka." + matchingGroup.get() + "." + metricName.name().replace("-", "_");
String instrumentDescription =
descriptionCache.computeIfAbsent(instrumentName, s -> metricName.description());
String instrumentType =
measurableToInstrumentType.getOrDefault(
measurable, INSTRUMENT_TYPE_DOUBLE_OBSERVABLE_GAUGE);
InstrumentDescriptor instrumentDescriptor =
toInstrumentDescriptor(instrumentType, instrumentName, instrumentDescription);
Attributes attributes = toAttributes(metricName.tags());
AutoCloseable observable =
createObservable(meter, attributes, instrumentDescriptor, kafkaMetric);
return RegisteredObservable.create(metricName, instrumentDescriptor, attributes, observable);
}
@Nullable
private static Class<? extends Measurable> getMeasurable(KafkaMetric kafkaMetric) {
try {
return kafkaMetric.measurable().getClass();
} catch (IllegalStateException e) {
return null;
}
}
private static InstrumentDescriptor toInstrumentDescriptor(
String instrumentType, String instrumentName, String instrumentDescription) {
switch (instrumentType) {
case INSTRUMENT_TYPE_DOUBLE_OBSERVABLE_GAUGE:
return InstrumentDescriptor.createDoubleGauge(instrumentName, instrumentDescription);
case INSTRUMENT_TYPE_DOUBLE_OBSERVABLE_COUNTER:
return InstrumentDescriptor.createDoubleCounter(instrumentName, instrumentDescription);
default: // Continue below to throw
}
throw new IllegalStateException("Unrecognized instrument type. This is a bug.");
}
private static Attributes toAttributes(Map<String, String> tags) {
AttributesBuilder attributesBuilder = Attributes.builder();
tags.forEach(attributesBuilder::put);
return attributesBuilder.build();
}
private static AutoCloseable createObservable(
Meter meter,
Attributes attributes,
InstrumentDescriptor instrumentDescriptor,
KafkaMetric kafkaMetric) {
Consumer<ObservableDoubleMeasurement> callback =
observableMeasurement -> observableMeasurement.record(kafkaMetric.value(), attributes);
switch (instrumentDescriptor.getInstrumentType()) {
case INSTRUMENT_TYPE_DOUBLE_OBSERVABLE_GAUGE:
return meter
.gaugeBuilder(instrumentDescriptor.getName())
.setDescription(instrumentDescriptor.getDescription())
.buildWithCallback(callback);
case INSTRUMENT_TYPE_DOUBLE_OBSERVABLE_COUNTER:
return meter
.counterBuilder(instrumentDescriptor.getName())
.setDescription(instrumentDescriptor.getDescription())
.ofDoubles()
.buildWithCallback(callback);
default: // Continue below to throw
}
// TODO: add support for other instrument types and value types as needed for new instruments.
// This should not happen.
throw new IllegalStateException("Unrecognized instrument type. This is a bug.");
}
private KafkaMetricRegistry() {}
}
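
To make the derivation above concrete, a hypothetical walk-through (the group name is an assumption; the `kafka.consumer.records_consumed_total` result follows from the naming scheme in `getRegisteredObservable`):

```java
class NamingWalkthrough {
  static String deriveInstrumentName() {
    String group = "consumer-fetch-manager-metrics"; // hypothetical kafka metric group
    String name = "records-consumed-total";          // kafka metric name
    // "consumer" is the first of {"consumer", "producer"} contained in the group name,
    // so the derived instrument name is "kafka.consumer.records_consumed_total".
    String matchingGroup = group.contains("consumer") ? "consumer" : "producer";
    return "kafka." + matchingGroup + "." + name.replace("-", "_");
  }
}
```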

View File

@ -0,0 +1,162 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.kafka.internal;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.instrumentation.api.internal.GuardedBy;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;
/**
* A {@link MetricsReporter} which bridges Kafka metrics to OpenTelemetry metrics.
*
* <p>To configure, use:
*
* <pre>{@code
* KafkaTelemetry.create(OpenTelemetry).metricConfigProperties()
* }</pre>
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public final class OpenTelemetryMetricsReporter implements MetricsReporter {
public static final String CONFIG_KEY_OPENTELEMETRY_INSTANCE = "opentelemetry.instance";
private static final Logger logger =
Logger.getLogger(OpenTelemetryMetricsReporter.class.getName());
private volatile Meter meter;
private static final Object lock = new Object();
@GuardedBy("lock")
private static final List<RegisteredObservable> registeredObservables = new ArrayList<>();
/** Reset for test by closing all registered instruments. */
static void resetForTest() {
closeAllInstruments();
}
// Visible for test
static List<RegisteredObservable> getRegisteredObservables() {
synchronized (lock) {
return new ArrayList<>(registeredObservables);
}
}
@Override
public void init(List<KafkaMetric> metrics) {
metrics.forEach(this::metricChange);
}
@Override
public void metricChange(KafkaMetric metric) {
Meter currentMeter = meter;
if (currentMeter == null) {
// Ignore if meter hasn't been initialized in configure(Map<String, ?>)
return;
}
RegisteredObservable registeredObservable =
KafkaMetricRegistry.getRegisteredObservable(currentMeter, metric);
if (registeredObservable == null) {
logger.log(
Level.FINEST, "Metric changed but cannot map to instrument: {0}", metric.metricName());
return;
}
Set<AttributeKey<?>> attributeKeys = registeredObservable.getAttributes().asMap().keySet();
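// Kafka reports some metrics at multiple granularities, e.g. records-consumed-total with
// [client-id] and with [client-id, topic]. When the incoming metric shares a descriptor
// with an already registered instrument but carries a strict superset of its attribute
// keys, the lower-dimension instrument is closed and replaced to avoid double counting.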
synchronized (lock) {
for (Iterator<RegisteredObservable> it = registeredObservables.iterator(); it.hasNext(); ) {
RegisteredObservable curRegisteredObservable = it.next();
Set<AttributeKey<?>> curAttributeKeys =
curRegisteredObservable.getAttributes().asMap().keySet();
if (curRegisteredObservable.getKafkaMetricName().equals(metric.metricName())) {
logger.log(Level.FINEST, "Replacing instrument: {0}", curRegisteredObservable);
closeInstrument(curRegisteredObservable.getObservable());
it.remove();
} else if (curRegisteredObservable
.getInstrumentDescriptor()
.equals(registeredObservable.getInstrumentDescriptor())
&& attributeKeys.size() > curAttributeKeys.size()
&& attributeKeys.containsAll(curAttributeKeys)) {
logger.log(
Level.FINEST,
"Replacing instrument with higher dimension version: {0}",
curRegisteredObservable);
closeInstrument(curRegisteredObservable.getObservable());
it.remove();
}
}
registeredObservables.add(registeredObservable);
}
}
@Override
public void metricRemoval(KafkaMetric metric) {
logger.log(Level.FINEST, "Metric removed: {0}", metric.metricName());
synchronized (lock) {
for (Iterator<RegisteredObservable> it = registeredObservables.iterator(); it.hasNext(); ) {
RegisteredObservable current = it.next();
if (current.getKafkaMetricName().equals(metric.metricName())) {
closeInstrument(current.getObservable());
it.remove();
}
}
}
}
@Override
public void close() {
closeAllInstruments();
}
private static void closeAllInstruments() {
synchronized (lock) {
for (Iterator<RegisteredObservable> it = registeredObservables.iterator(); it.hasNext(); ) {
closeInstrument(it.next().getObservable());
it.remove();
}
}
}
private static void closeInstrument(AutoCloseable observable) {
try {
observable.close();
} catch (Exception e) {
throw new IllegalStateException("Error occurred closing instrument", e);
}
}
@Override
public void configure(Map<String, ?> configs) {
Object openTelemetry = configs.get(CONFIG_KEY_OPENTELEMETRY_INSTANCE);
if (openTelemetry == null) {
throw new IllegalStateException(
"Missing required configuration property: " + CONFIG_KEY_OPENTELEMETRY_INSTANCE);
}
if (!(openTelemetry instanceof OpenTelemetry)) {
throw new IllegalStateException(
"Configuration property "
+ CONFIG_KEY_OPENTELEMETRY_INSTANCE
+ " is not instance of OpenTelemetry");
}
meter = ((OpenTelemetry) openTelemetry).getMeter("io.opentelemetry.kafka-clients");
}
}
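
The `configure` override above implies the wiring that `metricConfigProperties()` must produce: the reporter class name goes under Kafka's standard `metric.reporters` key, and the live `OpenTelemetry` instance is passed by reference as a config value, which is why it is type-checked rather than instantiated from a class name. A sketch, assuming only APIs visible in this diff plus Kafka's `CommonClientConfigs`:

```java
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetryMetricsReporter;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.CommonClientConfigs;

class ReporterWiringSketch {
  static Map<String, Object> metricConfig(OpenTelemetry openTelemetry) {
    Map<String, Object> config = new HashMap<>();
    config.put(
        CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
        OpenTelemetryMetricsReporter.class.getName());
    // Passed as an object, not a string: configure(Map) casts it back to OpenTelemetry.
    config.put(OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTANCE, openTelemetry);
    return config;
  }
}
```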

View File

@ -0,0 +1,31 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.kafka.internal;
import com.google.auto.value.AutoValue;
import io.opentelemetry.api.common.Attributes;
import org.apache.kafka.common.MetricName;
@AutoValue
abstract class RegisteredObservable {
abstract MetricName getKafkaMetricName();
abstract InstrumentDescriptor getInstrumentDescriptor();
abstract Attributes getAttributes();
abstract AutoCloseable getObservable();
static RegisteredObservable create(
MetricName metricName,
InstrumentDescriptor instrumentDescriptor,
Attributes attributes,
AutoCloseable observable) {
return new AutoValue_RegisteredObservable(
metricName, instrumentDescriptor, attributes, observable);
}
}