Update cassandra jmx metrics script to combine similar metrics into labelled metric (#118)
* Update cassandra script to combine similar metrics into labelled metric
* Appease spotless
* Rename status All to Ok
* Update abstract integration data point asserts
* Address PR feedback
* Update cassandra metrics documentation
This commit is contained in:
parent
f5a170c651
commit
2ebfe9917b
|
|
@ -17,26 +17,11 @@ These metrics are sourced from Cassandra's exposed Dropwizard Metrics for each n
|
|||
* Unit: `µs`
|
||||
* Instrument Type: DoubleValueObserver
|
||||
|
||||
* Name: `cassandra.client.request.range_slice.latency.count`
|
||||
* Description: Number of token range read request operations
|
||||
* Unit: `1`
|
||||
* Instrument Type: LongSumObserver
|
||||
|
||||
* Name: `cassandra.client.request.range_slice.latency.max`
|
||||
* Description: Maximum token range read request latency
|
||||
* Unit: `µs`
|
||||
* Instrument Type: DoubleValueObserver
|
||||
|
||||
* Name: `cassandra.client.request.range_slice.timeout.count`
|
||||
* Description: Number of token range read request timeouts encountered
|
||||
* Unit: `1`
|
||||
* Instrument Type: LongSumObserver
|
||||
|
||||
* Name: `cassandra.client.request.range_slice.unavailable.count`
|
||||
* Description: Number of token range read request unavailable exceptions encountered
|
||||
* Unit: `1`
|
||||
* Instrument Type: LongSumObserver
|
||||
|
||||
* Name: `cassandra.client.request.read.latency.50p`
|
||||
* Description: Standard read request latency - 50th percentile
|
||||
* Unit: `µs`
|
||||
|
|
@ -47,26 +32,11 @@ These metrics are sourced from Cassandra's exposed Dropwizard Metrics for each n
|
|||
* Unit: `µs`
|
||||
* Instrument Type: DoubleValueObserver
|
||||
|
||||
* Name: `cassandra.client.request.read.latency.count`
|
||||
* Description: Number of standard read request operations
|
||||
* Unit: `1`
|
||||
* Instrument Type: LongSumObserver
|
||||
|
||||
* Name: `cassandra.client.request.read.latency.max`
|
||||
* Description: Maximum standard read request latency
|
||||
* Unit: `µs`
|
||||
* Instrument Type: DoubleValueObserver
|
||||
|
||||
* Name: `cassandra.client.request.read.timeout.count`
|
||||
* Description: Number of standard read request timeouts encountered
|
||||
* Unit: `1`
|
||||
* Instrument Type: LongSumObserver
|
||||
|
||||
* Name: `cassandra.client.request.read.unavailable.count`
|
||||
* Description: Number of standard read request unavailable exceptions encountered
|
||||
* Unit: `1`
|
||||
* Instrument Type: LongSumObserver
|
||||
|
||||
* Name: `cassandra.client.request.write.latency.50p`
|
||||
* Description: Regular write request latency - 50th percentile
|
||||
* Unit: `µs`
|
||||
|
|
@ -77,23 +47,20 @@ These metrics are sourced from Cassandra's exposed Dropwizard Metrics for each n
|
|||
* Unit: `µs`
|
||||
* Instrument Type: DoubleValueObserver
|
||||
|
||||
* Name: `cassandra.client.request.write.latency.count`
|
||||
* Description: Number of regular write request operations
|
||||
* Unit: `1`
|
||||
* Instrument Type: LongSumObserver
|
||||
|
||||
* Name: `cassandra.client.request.write.latency.max`
|
||||
* Description: Maximum regular write request latency
|
||||
* Unit: `µs`
|
||||
* Instrument Type: DoubleValueObserver
|
||||
|
||||
* Name: `cassandra.client.request.write.timeout.count`
|
||||
* Description: Number of regular write request timeouts encountered
|
||||
* Name: `cassandra.client.request.count`
|
||||
* Description: Number of requests by operation
|
||||
* Labels: `operation`
|
||||
* Unit: `1`
|
||||
* Instrument Type: LongSumObserver
|
||||
|
||||
* Name: `cassandra.client.request.write.unavailable.count`
|
||||
* Description: Number of regular write request unavailable exceptions encountered
|
||||
* Name: `cassandra.client.request.error.count`
|
||||
* Description: Number of request errors by operation
|
||||
* Labels: `operation`, `status`
|
||||
* Unit: `1`
|
||||
* Instrument Type: LongSumObserver
|
||||
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ import io.grpc.stub.StreamObserver;
|
|||
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
|
||||
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse;
|
||||
import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc;
|
||||
import io.opentelemetry.proto.common.v1.KeyValue;
|
||||
import io.opentelemetry.proto.metrics.v1.Metric;
|
||||
import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
|
||||
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
|
||||
|
|
@ -23,7 +24,9 @@ import java.time.Duration;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingDeque;
|
||||
import java.util.function.Consumer;
|
||||
|
|
@ -179,6 +182,19 @@ public abstract class AbstractIntegrationTest {
|
|||
assertTypedPoints(metric.getSum().getDataPointsList(), types);
|
||||
}
|
||||
|
||||
protected void assertSumWithAttributes(
|
||||
Metric metric,
|
||||
String name,
|
||||
String description,
|
||||
String unit,
|
||||
List<Map<String, String>> attributeGroups) {
|
||||
assertThat(metric.getName()).isEqualTo(name);
|
||||
assertThat(metric.getDescription()).isEqualTo(description);
|
||||
assertThat(metric.getUnit()).isEqualTo(unit);
|
||||
assertThat(metric.hasSum()).isTrue();
|
||||
assertAttributedPoints(metric.getSum().getDataPointsList(), attributeGroups);
|
||||
}
|
||||
|
||||
private static final String expectedMeterVersion() {
|
||||
// Automatically set by gradle when running the tests
|
||||
String version = System.getProperty("gradle.project.version");
|
||||
|
|
@ -186,23 +202,39 @@ public abstract class AbstractIntegrationTest {
|
|||
return version;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private static void assertTypedPoints(List<NumberDataPoint> points, List<String> types) {
|
||||
List<Map<String, String>> expectedAttributes =
|
||||
types.stream()
|
||||
.map(
|
||||
type ->
|
||||
new HashMap<String, String>() {
|
||||
{
|
||||
put("name", type);
|
||||
}
|
||||
})
|
||||
.collect(Collectors.toList());
|
||||
|
||||
assertAttributedPoints(points, expectedAttributes);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private static void assertAttributedPoints(
|
||||
List<NumberDataPoint> points, List<Map<String, String>> attributeGroups) {
|
||||
assertThat(points)
|
||||
.extracting(
|
||||
numberDataPoint ->
|
||||
numberDataPoint.getAttributesList().stream()
|
||||
.collect(
|
||||
Collectors.toMap(
|
||||
KeyValue::getKey, keyValue -> keyValue.getValue().getStringValue())))
|
||||
.satisfiesExactlyInAnyOrder(
|
||||
types.stream()
|
||||
attributeGroups.stream()
|
||||
.map(
|
||||
type ->
|
||||
(Consumer<NumberDataPoint>)
|
||||
point ->
|
||||
assertThat(point.getAttributesList())
|
||||
.singleElement()
|
||||
.satisfies(
|
||||
attribute -> {
|
||||
assertThat(attribute.getKey()).isEqualTo("name");
|
||||
assertThat(attribute.getValue().getStringValue())
|
||||
.isEqualTo(type);
|
||||
}))
|
||||
expected ->
|
||||
(Consumer<Map<String, String>>)
|
||||
pointAttributes ->
|
||||
assertThat(pointAttributes)
|
||||
.containsExactlyInAnyOrderEntriesOf(expected))
|
||||
.toArray(Consumer[]::new));
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -7,6 +7,11 @@ package io.opentelemetry.contrib.jmxmetrics.target_systems;
|
|||
|
||||
import io.opentelemetry.contrib.jmxmetrics.AbstractIntegrationTest;
|
||||
import java.time.Duration;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.testcontainers.containers.GenericContainer;
|
||||
import org.testcontainers.containers.Network;
|
||||
|
|
@ -48,30 +53,12 @@ class CassandraIntegrationTest extends AbstractIntegrationTest {
|
|||
"cassandra.client.request.range_slice.latency.99p",
|
||||
"Token range read request latency - 99th percentile",
|
||||
"µs"),
|
||||
metric ->
|
||||
assertSum(
|
||||
metric,
|
||||
"cassandra.client.request.range_slice.latency.count",
|
||||
"Number of token range read request operations",
|
||||
"1"),
|
||||
metric ->
|
||||
assertGauge(
|
||||
metric,
|
||||
"cassandra.client.request.range_slice.latency.max",
|
||||
"Maximum token range read request latency",
|
||||
"µs"),
|
||||
metric ->
|
||||
assertSum(
|
||||
metric,
|
||||
"cassandra.client.request.range_slice.timeout.count",
|
||||
"Number of token range read request timeouts encountered",
|
||||
"1"),
|
||||
metric ->
|
||||
assertSum(
|
||||
metric,
|
||||
"cassandra.client.request.range_slice.unavailable.count",
|
||||
"Number of token range read request unavailable exceptions encountered",
|
||||
"1"),
|
||||
metric ->
|
||||
assertGauge(
|
||||
metric,
|
||||
|
|
@ -84,30 +71,12 @@ class CassandraIntegrationTest extends AbstractIntegrationTest {
|
|||
"cassandra.client.request.read.latency.99p",
|
||||
"Standard read request latency - 99th percentile",
|
||||
"µs"),
|
||||
metric ->
|
||||
assertSum(
|
||||
metric,
|
||||
"cassandra.client.request.read.latency.count",
|
||||
"Number of standard read request operations",
|
||||
"1"),
|
||||
metric ->
|
||||
assertGauge(
|
||||
metric,
|
||||
"cassandra.client.request.read.latency.max",
|
||||
"Maximum standard read request latency",
|
||||
"µs"),
|
||||
metric ->
|
||||
assertSum(
|
||||
metric,
|
||||
"cassandra.client.request.read.timeout.count",
|
||||
"Number of standard read request timeouts encountered",
|
||||
"1"),
|
||||
metric ->
|
||||
assertSum(
|
||||
metric,
|
||||
"cassandra.client.request.read.unavailable.count",
|
||||
"Number of standard read request unavailable exceptions encountered",
|
||||
"1"),
|
||||
metric ->
|
||||
assertGauge(
|
||||
metric,
|
||||
|
|
@ -120,30 +89,12 @@ class CassandraIntegrationTest extends AbstractIntegrationTest {
|
|||
"cassandra.client.request.write.latency.99p",
|
||||
"Regular write request latency - 99th percentile",
|
||||
"µs"),
|
||||
metric ->
|
||||
assertSum(
|
||||
metric,
|
||||
"cassandra.client.request.write.latency.count",
|
||||
"Number of regular write request operations",
|
||||
"1"),
|
||||
metric ->
|
||||
assertGauge(
|
||||
metric,
|
||||
"cassandra.client.request.write.latency.max",
|
||||
"Maximum regular write request latency",
|
||||
"µs"),
|
||||
metric ->
|
||||
assertSum(
|
||||
metric,
|
||||
"cassandra.client.request.write.timeout.count",
|
||||
"Number of regular write request timeouts encountered",
|
||||
"1"),
|
||||
metric ->
|
||||
assertSum(
|
||||
metric,
|
||||
"cassandra.client.request.write.unavailable.count",
|
||||
"Number of regular write request unavailable exceptions encountered",
|
||||
"1"),
|
||||
metric ->
|
||||
assertSum(
|
||||
metric,
|
||||
|
|
@ -175,6 +126,53 @@ class CassandraIntegrationTest extends AbstractIntegrationTest {
|
|||
"cassandra.storage.total_hints.in_progress.count",
|
||||
"Number of hints attempting to be sent currently",
|
||||
"1",
|
||||
false));
|
||||
false),
|
||||
metric ->
|
||||
assertSumWithAttributes(
|
||||
metric,
|
||||
"cassandra.client.request.count",
|
||||
"Number of requests by operation",
|
||||
"1",
|
||||
getRequestCountAttributes()),
|
||||
metric ->
|
||||
assertSumWithAttributes(
|
||||
metric,
|
||||
"cassandra.client.request.error.count",
|
||||
"Number of request errors by operation",
|
||||
"1",
|
||||
getRequestErrorCountAttributes()));
|
||||
}
|
||||
|
||||
private List<Map<String, String>> getRequestCountAttributes() {
|
||||
List<String> operations = Arrays.asList("RangeSlice", "Read", "Write");
|
||||
|
||||
return operations.stream()
|
||||
.map(
|
||||
op ->
|
||||
new HashMap<String, String>() {
|
||||
{
|
||||
put("operation", op);
|
||||
}
|
||||
})
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
private List<Map<String, String>> getRequestErrorCountAttributes() {
|
||||
List<String> operations = Arrays.asList("RangeSlice", "Read", "Write");
|
||||
List<String> statuses = Arrays.asList("Timeout", "Failure", "Unavailable");
|
||||
|
||||
return operations.stream()
|
||||
.flatMap(
|
||||
op ->
|
||||
statuses.stream()
|
||||
.map(
|
||||
st ->
|
||||
new HashMap<String, String>() {
|
||||
{
|
||||
put("operation", op);
|
||||
put("status", st);
|
||||
}
|
||||
}))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@
|
|||
def cassandraMetrics = "org.apache.cassandra.metrics"
|
||||
def clientRequest = "${cassandraMetrics}:type=ClientRequest"
|
||||
def clientRequestRangeSlice = "${clientRequest},scope=RangeSlice"
|
||||
|
||||
def clientRequestRangeSliceLatency = otel.mbean("${clientRequestRangeSlice},name=Latency")
|
||||
otel.instrument(clientRequestRangeSliceLatency,
|
||||
"cassandra.client.request.range_slice.latency.50p",
|
||||
|
|
@ -28,28 +29,11 @@ otel.instrument(clientRequestRangeSliceLatency,
|
|||
"Token range read request latency - 99th percentile", "µs", "99thPercentile",
|
||||
otel.&doubleValueCallback)
|
||||
|
||||
otel.instrument(clientRequestRangeSliceLatency,
|
||||
"cassandra.client.request.range_slice.latency.count",
|
||||
"Number of token range read request operations", "1", "Count",
|
||||
otel.&longCounterCallback)
|
||||
|
||||
otel.instrument(clientRequestRangeSliceLatency,
|
||||
"cassandra.client.request.range_slice.latency.max",
|
||||
"Maximum token range read request latency", "µs", "Max",
|
||||
otel.&doubleValueCallback)
|
||||
|
||||
def clientRequestRangeSliceTimeouts = otel.mbean("${clientRequestRangeSlice},name=Timeouts")
|
||||
otel.instrument(clientRequestRangeSliceTimeouts,
|
||||
"cassandra.client.request.range_slice.timeout.count",
|
||||
"Number of token range read request timeouts encountered", "1", "Count",
|
||||
otel.&longCounterCallback)
|
||||
|
||||
def clientRequestRangeSliceUnavailables = otel.mbean("${clientRequestRangeSlice},name=Unavailables")
|
||||
otel.instrument(clientRequestRangeSliceUnavailables,
|
||||
"cassandra.client.request.range_slice.unavailable.count",
|
||||
"Number of token range read request unavailable exceptions encountered", "1", "Count",
|
||||
otel.&longCounterCallback)
|
||||
|
||||
def clientRequestRead = "${clientRequest},scope=Read"
|
||||
def clientRequestReadLatency = otel.mbean("${clientRequestRead},name=Latency")
|
||||
otel.instrument(clientRequestReadLatency,
|
||||
|
|
@ -62,28 +46,11 @@ otel.instrument(clientRequestReadLatency,
|
|||
"Standard read request latency - 99th percentile", "µs", "99thPercentile",
|
||||
otel.&doubleValueCallback)
|
||||
|
||||
otel.instrument(clientRequestReadLatency,
|
||||
"cassandra.client.request.read.latency.count",
|
||||
"Number of standard read request operations", "1", "Count",
|
||||
otel.&longCounterCallback)
|
||||
|
||||
otel.instrument(clientRequestReadLatency,
|
||||
"cassandra.client.request.read.latency.max",
|
||||
"Maximum standard read request latency", "µs", "Max",
|
||||
otel.&doubleValueCallback)
|
||||
|
||||
def clientRequestReadTimeouts = otel.mbean("${clientRequestRead},name=Timeouts")
|
||||
otel.instrument(clientRequestReadTimeouts,
|
||||
"cassandra.client.request.read.timeout.count",
|
||||
"Number of standard read request timeouts encountered", "1", "Count",
|
||||
otel.&longCounterCallback)
|
||||
|
||||
def clientRequestReadUnavailables = otel.mbean("${clientRequestRead},name=Unavailables")
|
||||
otel.instrument(clientRequestReadUnavailables,
|
||||
"cassandra.client.request.read.unavailable.count",
|
||||
"Number of standard read request unavailable exceptions encountered", "1", "Count",
|
||||
otel.&longCounterCallback)
|
||||
|
||||
def clientRequestWrite = "${clientRequest},scope=Write"
|
||||
def clientRequestWriteLatency = otel.mbean("${clientRequestWrite},name=Latency")
|
||||
otel.instrument(clientRequestWriteLatency,
|
||||
|
|
@ -96,28 +63,11 @@ otel.instrument(clientRequestWriteLatency,
|
|||
"Regular write request latency - 99th percentile", "µs", "99thPercentile",
|
||||
otel.&doubleValueCallback)
|
||||
|
||||
otel.instrument(clientRequestWriteLatency,
|
||||
"cassandra.client.request.write.latency.count",
|
||||
"Number of regular write request operations", "1", "Count",
|
||||
otel.&longCounterCallback)
|
||||
|
||||
otel.instrument(clientRequestWriteLatency,
|
||||
"cassandra.client.request.write.latency.max",
|
||||
"Maximum regular write request latency", "µs", "Max",
|
||||
otel.&doubleValueCallback)
|
||||
|
||||
def clientRequestWriteTimeouts = otel.mbean("${clientRequestWrite},name=Timeouts")
|
||||
otel.instrument(clientRequestWriteTimeouts,
|
||||
"cassandra.client.request.write.timeout.count",
|
||||
"Number of regular write request timeouts encountered", "1", "Count",
|
||||
otel.&longCounterCallback)
|
||||
|
||||
def clientRequestWriteUnavailables = otel.mbean("${clientRequestWrite},name=Unavailables")
|
||||
otel.instrument(clientRequestWriteUnavailables,
|
||||
"cassandra.client.request.write.unavailable.count",
|
||||
"Number of regular write request unavailable exceptions encountered", "1", "Count",
|
||||
otel.&longCounterCallback)
|
||||
|
||||
def storage = "${cassandraMetrics}:type=Storage"
|
||||
def storageLoad = otel.mbean("${storage},name=Load")
|
||||
otel.instrument(storageLoad,
|
||||
|
|
@ -137,6 +87,7 @@ otel.instrument(storageTotalHintsInProgress,
|
|||
"Number of hints attempting to be sent currently", "1", "Count",
|
||||
otel.&longUpDownCounterCallback)
|
||||
|
||||
|
||||
def compaction = "${cassandraMetrics}:type=Compaction"
|
||||
def compactionPendingTasks = otel.mbean("${compaction},name=PendingTasks")
|
||||
otel.instrument(compactionPendingTasks,
|
||||
|
|
@ -149,3 +100,53 @@ otel.instrument(compactionCompletedTasks,
|
|||
"cassandra.compaction.tasks.completed",
|
||||
"Number of completed compactions since server [re]start", "1", "Value",
|
||||
otel.&longCounterCallback)
|
||||
|
||||
|
||||
// Latency mbeans of every client request scope. Their "Count" attribute is exported as a
// single counter whose data points are told apart by the "operation" label.
def clientRequests = otel.mbeans(
  [clientRequestRangeSlice, clientRequestRead, clientRequestWrite].collect { scopedName ->
    "${scopedName},name=Latency"
  })

otel.instrument(
  clientRequests,
  "cassandra.client.request.count",
  "Number of requests by operation",
  "1",
  // The label value is read from the "scope" key property of the mbean's object name.
  ["operation" : { mbean -> mbean.name().getKeyProperty("scope") }],
  "Count",
  otel.&longCounterCallback)
|
||||
|
||||
// Error mbeans (Unavailables, Timeouts, Failures) of every client request scope. Their "Count"
// attribute is exported as a single counter whose data points are told apart by the
// "operation" and "status" labels.
def clientRequestErrors = otel.mbeans([
  "${clientRequestRangeSlice},name=Unavailables",
  "${clientRequestRangeSlice},name=Timeouts",
  "${clientRequestRangeSlice},name=Failures",
  "${clientRequestRead},name=Unavailables",
  "${clientRequestRead},name=Timeouts",
  "${clientRequestRead},name=Failures",
  "${clientRequestWrite},name=Unavailables",
  "${clientRequestWrite},name=Timeouts",
  "${clientRequestWrite},name=Failures",
])

otel.instrument(clientRequestErrors,
  "cassandra.client.request.error.count",
  "Number of request errors by operation",
  "1",
  [
    // The operation is read from the "scope" key property of the mbean's object name.
    "operation" : { mbean -> mbean.name().getKeyProperty("scope") },
    // The status is the singular form of the mbean's "name" key property. Each case returns
    // directly, so no break is needed; an unrecognised name yields no value.
    "status" : { mbean ->
      switch(mbean.name().getKeyProperty("name")) {
        case "Unavailables":
          return "Unavailable"
        case "Timeouts":
          return "Timeout"
        case "Failures":
          return "Failure"
      }
    }
  ],
  "Count", otel.&longCounterCallback)
|
||||
|
|
|
|||
Loading…
Reference in New Issue