Migrate jmx-metrics IT to Java (#104)

* Migrate jmx-metrics integration tests to Java.

* More

* Remove default unit assertions

* Fixes
This commit is contained in:
Anuraag Agrawal 2021-09-30 08:00:31 +09:00 committed by GitHub
parent 5b98ae9ab2
commit 7315737bc8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 1144 additions and 1732 deletions

View File

@ -51,7 +51,7 @@ jobs:
if: always() if: always()
with: with:
name: integration-test-results name: integration-test-results
path: jmx-metrics/build/reports/tests/test path: jmx-metrics/build/reports/tests/integrationTest
publish-snapshots: publish-snapshots:
name: publish-snapshots name: publish-snapshots
runs-on: ubuntu-latest runs-on: ubuntu-latest

View File

@ -94,7 +94,7 @@ jobs:
if: always() if: always()
with: with:
name: integration-test-results name: integration-test-results
path: jmx-metrics/build/reports/tests/test path: jmx-metrics/build/reports/tests/integrationTest
publish: publish:
name: publish name: publish
runs-on: ubuntu-latest runs-on: ubuntu-latest

View File

@ -51,4 +51,4 @@ jobs:
if: always() if: always()
with: with:
name: integration-test-results name: integration-test-results
path: jmx-metrics/build/reports/tests/test path: jmx-metrics/build/reports/tests/integrationTest

View File

@ -53,7 +53,7 @@ jobs:
if: always() if: always()
with: with:
name: integration-test-results name: integration-test-results
path: jmx-metrics/build/reports/tests/test path: jmx-metrics/build/reports/tests/integrationTest
publish: publish:
name: publish name: publish
runs-on: ubuntu-latest runs-on: ubuntu-latest

View File

@ -27,16 +27,6 @@ tasks {
} }
} }
val integrationTest by registering {
dependsOn(test)
}
test {
if (gradle.startParameter.taskNames.contains(integrationTest.name)) {
systemProperty("ojc.integration.tests", "true")
}
}
withType<Test>().configureEach { withType<Test>().configureEach {
useJUnitPlatform() useJUnitPlatform()

View File

@ -14,6 +14,7 @@ rootProject.extra["versions"] = dependencyVersions
val DEPENDENCY_BOMS = listOf( val DEPENDENCY_BOMS = listOf(
"com.fasterxml.jackson:jackson-bom:2.12.3", "com.fasterxml.jackson:jackson-bom:2.12.3",
"com.google.guava:guava-bom:30.1.1-jre", "com.google.guava:guava-bom:30.1.1-jre",
"com.linecorp.armeria:armeria-bom:1.11.0",
"org.junit:junit-bom:5.7.2", "org.junit:junit-bom:5.7.2",
"com.linecorp.armeria:armeria-bom:1.9.1", "com.linecorp.armeria:armeria-bom:1.9.1",
"io.grpc:grpc-bom:1.39.0", "io.grpc:grpc-bom:1.39.0",

View File

@ -1,6 +1,7 @@
plugins { plugins {
application application
id("com.github.johnrengelman.shadow") id("com.github.johnrengelman.shadow")
id("org.unbroken-dome.test-sets")
id("otel.groovy-conventions") id("otel.groovy-conventions")
id("otel.publish-conventions") id("otel.publish-conventions")
@ -23,6 +24,10 @@ repositories {
val groovyVersion = "2.5.11" val groovyVersion = "2.5.11"
testSets {
create("integrationTest")
}
dependencies { dependencies {
api(platform("org.codehaus.groovy:groovy-bom:$groovyVersion")) api(platform("org.codehaus.groovy:groovy-bom:$groovyVersion"))
@ -36,6 +41,7 @@ dependencies {
implementation("io.opentelemetry:opentelemetry-sdk") implementation("io.opentelemetry:opentelemetry-sdk")
implementation("io.opentelemetry:opentelemetry-sdk-metrics") implementation("io.opentelemetry:opentelemetry-sdk-metrics")
implementation("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure") implementation("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure")
implementation("io.opentelemetry:opentelemetry-sdk-metrics-testing")
implementation("io.opentelemetry:opentelemetry-sdk-testing") implementation("io.opentelemetry:opentelemetry-sdk-testing")
implementation("io.opentelemetry:opentelemetry-exporter-logging") implementation("io.opentelemetry:opentelemetry-exporter-logging")
implementation("io.opentelemetry:opentelemetry-exporter-otlp-metrics") implementation("io.opentelemetry:opentelemetry-exporter-otlp-metrics")
@ -55,6 +61,10 @@ dependencies {
testImplementation("org.apache.httpcomponents.client5:httpclient5-fluent:5.0.1") testImplementation("org.apache.httpcomponents.client5:httpclient5-fluent:5.0.1")
testImplementation("org.testcontainers:testcontainers") testImplementation("org.testcontainers:testcontainers")
testImplementation("io.opentelemetry:opentelemetry-proto") testImplementation("io.opentelemetry:opentelemetry-proto")
add("integrationTestImplementation", "com.linecorp.armeria:armeria-grpc")
add("integrationTestImplementation", "com.linecorp.armeria:armeria-junit5")
add("integrationTestImplementation", "org.testcontainers:junit-jupiter")
} }
tasks { tasks {

View File

@ -0,0 +1,235 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.jmxmetrics;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.testcontainers.Testcontainers.exposeHostPorts;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.server.grpc.GrpcService;
import com.linecorp.armeria.testing.junit5.server.ServerExtension;
import io.grpc.stub.StreamObserver;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse;
import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc;
import io.opentelemetry.proto.metrics.v1.Metric;
import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestInstance;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.MountableFile;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@Testcontainers(disabledWithoutDocker = true)
public abstract class AbstractIntegrationTest {
private final boolean configFromStdin;
private final String configName;
private GenericContainer<?> scraper;
private OtlpGrpcServer otlpServer;
protected AbstractIntegrationTest(boolean configFromStdin, String configName) {
this.configFromStdin = configFromStdin;
this.configName = configName;
}
@BeforeAll
void beforeAll() {
otlpServer = new OtlpGrpcServer();
otlpServer.start();
exposeHostPorts(otlpServer.httpPort());
String scraperJarPath = System.getProperty("shadow.jar.path");
List<String> scraperCommand = new ArrayList<>();
scraperCommand.add("java");
scraperCommand.add("-cp");
scraperCommand.add("/app/OpenTelemetryJava.jar");
scraperCommand.add("-Dotel.jmx.username=cassandra");
scraperCommand.add("-Dotel.jmx.password=cassandra");
scraperCommand.add(
"-Dotel.exporter.otlp.endpoint=http://host.testcontainers.internal:"
+ otlpServer.httpPort());
scraperCommand.add("io.opentelemetry.contrib.jmxmetrics.JmxMetrics");
scraperCommand.add("-config");
if (configFromStdin) {
String cmd = String.join(" ", scraperCommand);
scraperCommand = Arrays.asList("sh", "-c", "cat /app/" + configName + " | " + cmd + " -");
} else {
scraperCommand.add("/app/" + configName);
}
scraper =
new GenericContainer<>("openjdk:8u272-jre-slim")
.withNetwork(Network.SHARED)
.withCopyFileToContainer(
MountableFile.forHostPath(scraperJarPath), "/app/OpenTelemetryJava.jar")
.withCopyFileToContainer(
MountableFile.forClasspathResource("script.groovy"), "/app/script.groovy")
.withCopyFileToContainer(
MountableFile.forClasspathResource(configName), "/app/" + configName)
.withCommand(scraperCommand.toArray(new String[0]))
.withStartupTimeout(Duration.ofSeconds(120))
.waitingFor(Wait.forLogMessage(".*Started GroovyRunner.*", 1));
scraper.start();
}
@AfterAll
void afterAll() {
otlpServer.stop();
}
@BeforeEach
void beforeEach() {
otlpServer.reset();
}
protected void waitAndAssertMetrics(Consumer<Metric>... assertions) {
await()
.atMost(Duration.ofSeconds(30))
.untilAsserted(
() -> {
List<Metric> metrics =
otlpServer.getMetrics().stream()
.map(ExportMetricsServiceRequest::getResourceMetricsList)
.flatMap(
rm ->
rm.stream()
.map(ResourceMetrics::getInstrumentationLibraryMetricsList))
.flatMap(Collection::stream)
.filter(
ilm ->
ilm.getInstrumentationLibrary()
.getName()
.equals("io.opentelemetry.contrib.jmxmetrics")
&& ilm.getInstrumentationLibrary()
.getVersion()
.equals(expectedMeterVersion()))
.flatMap(ilm -> ilm.getMetricsList().stream())
.collect(Collectors.toList());
assertThat(metrics).isNotEmpty();
for (Consumer<Metric> assertion : assertions) {
assertThat(metrics).anySatisfy(assertion);
}
});
}
protected void assertGauge(Metric metric, String name, String description, String unit) {
assertThat(metric.getName()).isEqualTo(name);
assertThat(metric.getDescription()).isEqualTo(description);
assertThat(metric.getUnit()).isEqualTo(unit);
assertThat(metric.hasGauge()).isTrue();
assertThat(metric.getGauge().getDataPointsList())
.satisfiesExactly(point -> assertThat(point.getAttributesList()).isEmpty());
}
protected void assertSum(Metric metric, String name, String description, String unit) {
assertThat(metric.getName()).isEqualTo(name);
assertThat(metric.getDescription()).isEqualTo(description);
assertThat(metric.getUnit()).isEqualTo(unit);
assertThat(metric.hasSum()).isTrue();
assertThat(metric.getSum().getDataPointsList())
.satisfiesExactly(point -> assertThat(point.getAttributesList()).isEmpty());
}
protected void assertTypedGauge(
Metric metric, String name, String description, String unit, List<String> types) {
assertThat(metric.getName()).isEqualTo(name);
assertThat(metric.getDescription()).isEqualTo(description);
assertThat(metric.getUnit()).isEqualTo(unit);
assertThat(metric.hasGauge()).isTrue();
assertTypedPoints(metric.getGauge().getDataPointsList(), types);
}
protected void assertTypedSum(
Metric metric, String name, String description, String unit, List<String> types) {
assertThat(metric.getName()).isEqualTo(name);
assertThat(metric.getDescription()).isEqualTo(description);
assertThat(metric.getUnit()).isEqualTo(unit);
assertThat(metric.hasSum()).isTrue();
assertTypedPoints(metric.getSum().getDataPointsList(), types);
}
private static final String expectedMeterVersion() {
// Automatically set by gradle when running the tests
String version = System.getProperty("gradle.project.version");
assert version != null && !version.isEmpty();
return version;
}
@SuppressWarnings("unchecked")
private static void assertTypedPoints(List<NumberDataPoint> points, List<String> types) {
assertThat(points)
.satisfiesExactlyInAnyOrder(
types.stream()
.map(
type ->
(Consumer<NumberDataPoint>)
point ->
assertThat(point.getAttributesList())
.singleElement()
.satisfies(
attribute -> {
assertThat(attribute.getKey()).isEqualTo("name");
assertThat(attribute.getValue().getStringValue())
.isEqualTo(type);
}))
.toArray(Consumer[]::new));
}
private static class OtlpGrpcServer extends ServerExtension {
private final BlockingQueue<ExportMetricsServiceRequest> metricRequests =
new LinkedBlockingDeque<>();
List<ExportMetricsServiceRequest> getMetrics() {
return new ArrayList<>(metricRequests);
}
void reset() {
metricRequests.clear();
}
@Override
protected void configure(ServerBuilder sb) {
sb.service(
GrpcService.builder()
.addService(
new MetricsServiceGrpc.MetricsServiceImplBase() {
@Override
public void export(
ExportMetricsServiceRequest request,
StreamObserver<ExportMetricsServiceResponse> responseObserver) {
metricRequests.add(request);
responseObserver.onNext(ExportMetricsServiceResponse.getDefaultInstance());
responseObserver.onCompleted();
}
})
.build());
sb.http(0);
}
}
}

View File

@ -0,0 +1,70 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.jmxmetrics;
import static org.assertj.core.api.Assertions.assertThat;
import io.opentelemetry.proto.common.v1.AnyValue;
import io.opentelemetry.proto.common.v1.KeyValue;
import java.time.Duration;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.utility.MountableFile;
abstract class OtlpIntegrationTest extends AbstractIntegrationTest {
OtlpIntegrationTest(boolean configFromStdin) {
super(configFromStdin, "otlp_config.properties");
}
@Container
GenericContainer<?> cassandra =
new GenericContainer<>("cassandra:3.11")
.withNetwork(Network.SHARED)
.withEnv("LOCAL_JMX", "no")
.withCopyFileToContainer(
MountableFile.forClasspathResource("cassandra/jmxremote.password", 0400),
"/etc/cassandra/jmxremote.password")
.withNetworkAliases("cassandra")
.withExposedPorts(7199)
.withStartupTimeout(Duration.ofSeconds(120))
.waitingFor(Wait.forListeningPort());
@Test
void endToEnd() {
waitAndAssertMetrics(
metric -> {
assertThat(metric.getName()).isEqualTo("cassandra.storage.load");
assertThat(metric.getDescription())
.isEqualTo("Size, in bytes, of the on disk data size this node manages");
assertThat(metric.getUnit()).isEqualTo("By");
assertThat(metric.hasHistogram()).isTrue();
assertThat(metric.getHistogram().getDataPointsList())
.satisfiesExactly(
point ->
assertThat(point.getAttributesList())
.containsExactly(
KeyValue.newBuilder()
.setKey("myKey")
.setValue(AnyValue.newBuilder().setStringValue("myVal"))
.build()));
});
}
static class ConfigFromStdin extends OtlpIntegrationTest {
ConfigFromStdin() {
super(true);
}
}
static class ConfigFromFile extends OtlpIntegrationTest {
ConfigFromFile() {
super(false);
}
}
}

View File

@ -0,0 +1,178 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.jmxmetrics.target_systems;
import io.opentelemetry.contrib.jmxmetrics.AbstractIntegrationTest;
import java.time.Duration;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.utility.MountableFile;
class CassandraIntegrationTest extends AbstractIntegrationTest {
CassandraIntegrationTest() {
super(false, "target-systems/cassandra.properties");
}
@Container
GenericContainer<?> cassandra =
new GenericContainer<>("cassandra:3.11")
.withNetwork(Network.SHARED)
.withEnv("LOCAL_JMX", "no")
.withCopyFileToContainer(
MountableFile.forClasspathResource("cassandra/jmxremote.password", 0400),
"/etc/cassandra/jmxremote.password")
.withNetworkAliases("cassandra")
.withExposedPorts(7199)
.withStartupTimeout(Duration.ofSeconds(120))
.waitingFor(Wait.forListeningPort());
@Test
void endToEnd() {
waitAndAssertMetrics(
metric ->
assertGauge(
metric,
"cassandra.client.request.range_slice.latency.50p",
"Token range read request latency - 50th percentile",
"µs"),
metric ->
assertGauge(
metric,
"cassandra.client.request.range_slice.latency.99p",
"Token range read request latency - 99th percentile",
"µs"),
metric ->
assertSum(
metric,
"cassandra.client.request.range_slice.latency.count",
"Total token range read request latency",
"µs"),
metric ->
assertGauge(
metric,
"cassandra.client.request.range_slice.latency.max",
"Maximum token range read request latency",
"µs"),
metric ->
assertSum(
metric,
"cassandra.client.request.range_slice.timeout.count",
"Number of token range read request timeouts encountered",
"1"),
metric ->
assertSum(
metric,
"cassandra.client.request.range_slice.unavailable.count",
"Number of token range read request unavailable exceptions encountered",
"1"),
metric ->
assertGauge(
metric,
"cassandra.client.request.read.latency.50p",
"Standard read request latency - 50th percentile",
"µs"),
metric ->
assertGauge(
metric,
"cassandra.client.request.read.latency.99p",
"Standard read request latency - 99th percentile",
"µs"),
metric ->
assertSum(
metric,
"cassandra.client.request.read.latency.count",
"Total standard read request latency",
"µs"),
metric ->
assertGauge(
metric,
"cassandra.client.request.read.latency.max",
"Maximum standard read request latency",
"µs"),
metric ->
assertSum(
metric,
"cassandra.client.request.read.timeout.count",
"Number of standard read request timeouts encountered",
"1"),
metric ->
assertSum(
metric,
"cassandra.client.request.read.unavailable.count",
"Number of standard read request unavailable exceptions encountered",
"1"),
metric ->
assertGauge(
metric,
"cassandra.client.request.write.latency.50p",
"Regular write request latency - 50th percentile",
"µs"),
metric ->
assertGauge(
metric,
"cassandra.client.request.write.latency.99p",
"Regular write request latency - 99th percentile",
"µs"),
metric ->
assertSum(
metric,
"cassandra.client.request.write.latency.count",
"Total regular write request latency",
"µs"),
metric ->
assertGauge(
metric,
"cassandra.client.request.write.latency.max",
"Maximum regular write request latency",
"µs"),
metric ->
assertSum(
metric,
"cassandra.client.request.write.timeout.count",
"Number of regular write request timeouts encountered",
"1"),
metric ->
assertSum(
metric,
"cassandra.client.request.write.unavailable.count",
"Number of regular write request unavailable exceptions encountered",
"1"),
metric ->
assertSum(
metric,
"cassandra.compaction.tasks.completed",
"Number of completed compactions since server [re]start",
"1"),
metric ->
assertGauge(
metric,
"cassandra.compaction.tasks.pending",
"Estimated number of compactions remaining to perform",
"1"),
metric ->
assertSum(
metric,
"cassandra.storage.load.count",
"Size of the on disk data size this node manages",
"by"),
metric ->
assertSum(
metric,
"cassandra.storage.total_hints.count",
"Number of hint messages written to this node since [re]start",
"1"),
metric ->
assertSum(
metric,
"cassandra.storage.total_hints.in_progress.count",
"Number of hints attempting to be sent currently",
"1"));
}
}

View File

@ -0,0 +1,87 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.jmxmetrics.target_systems;
import io.opentelemetry.contrib.jmxmetrics.AbstractIntegrationTest;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.utility.MountableFile;
class JvmTargetSystemIntegrationTest extends AbstractIntegrationTest {
JvmTargetSystemIntegrationTest() {
super(false, "target-systems/jvm.properties");
}
@Container
GenericContainer<?> cassandra =
new GenericContainer<>("cassandra:3.11")
.withNetwork(Network.SHARED)
.withEnv("LOCAL_JMX", "no")
.withCopyFileToContainer(
MountableFile.forClasspathResource("cassandra/jmxremote.password", 0400),
"/etc/cassandra/jmxremote.password")
.withNetworkAliases("cassandra")
.withExposedPorts(7199)
.withStartupTimeout(Duration.ofSeconds(120))
.waitingFor(Wait.forListeningPort());
@Test
void endToEnd() {
List<String> gcLabels =
Arrays.asList(
"Code Cache",
"Par Eden Space",
"CMS Old Gen",
"Compressed Class Space",
"Metaspace",
"Par Survivor Space");
waitAndAssertMetrics(
metric -> assertGauge(metric, "jvm.classes.loaded", "number of loaded classes", "1"),
metric ->
assertTypedSum(
metric,
"jvm.gc.collections.count",
"total number of collections that have occurred",
"1",
Arrays.asList("ConcurrentMarkSweep", "ParNew")),
metric ->
assertTypedSum(
metric,
"jvm.gc.collections.elapsed",
"the approximate accumulated collection elapsed time in milliseconds",
"ms",
Arrays.asList("ConcurrentMarkSweep", "ParNew")),
metric -> assertGauge(metric, "jvm.memory.heap.committed", "current heap usage", "by"),
metric -> assertGauge(metric, "jvm.memory.heap.init", "current heap usage", "by"),
metric -> assertGauge(metric, "jvm.memory.heap.max", "current heap usage", "by"),
metric -> assertGauge(metric, "jvm.memory.heap.used", "current heap usage", "by"),
metric ->
assertGauge(metric, "jvm.memory.nonheap.committed", "current non-heap usage", "by"),
metric -> assertGauge(metric, "jvm.memory.nonheap.init", "current non-heap usage", "by"),
metric -> assertGauge(metric, "jvm.memory.nonheap.max", "current non-heap usage", "by"),
metric -> assertGauge(metric, "jvm.memory.nonheap.used", "current non-heap usage", "by"),
metric ->
assertTypedGauge(
metric, "jvm.memory.pool.committed", "current memory pool usage", "by", gcLabels),
metric ->
assertTypedGauge(
metric, "jvm.memory.pool.init", "current memory pool usage", "by", gcLabels),
metric ->
assertTypedGauge(
metric, "jvm.memory.pool.max", "current memory pool usage", "by", gcLabels),
metric ->
assertTypedGauge(
metric, "jvm.memory.pool.used", "current memory pool usage", "by", gcLabels),
metric -> assertGauge(metric, "jvm.threads.count", "number of threads", "1"));
}
}

View File

@ -0,0 +1,550 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.jmxmetrics.target_systems;
import static org.assertj.core.api.Assertions.assertThat;
import io.opentelemetry.contrib.jmxmetrics.AbstractIntegrationTest;
import io.opentelemetry.proto.metrics.v1.Metric;
import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import org.junit.jupiter.api.Test;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.lifecycle.Startable;
import org.testcontainers.utility.MountableFile;
abstract class KafkaIntegrationTest extends AbstractIntegrationTest {
protected KafkaIntegrationTest(String configName) {
super(false, configName);
}
@Container
GenericContainer<?> zookeeper =
new GenericContainer<>("zookeeper:3.5")
.withNetwork(Network.SHARED)
.withNetworkAliases("zookeeper")
.withStartupTimeout(Duration.ofSeconds(120))
.waitingFor(Wait.forListeningPort());
@Container
GenericContainer<?> kafka =
new GenericContainer<>("bitnami/kafka:latest")
.withNetwork(Network.SHARED)
.withEnv("KAFKA_CFG_ZOOKEEPER_CONNECT", "zookeeper:2181")
.withEnv("ALLOW_PLAINTEXT_LISTENER", "yes")
.withEnv("JMX_PORT", "7199")
.withNetworkAliases("kafka")
.withExposedPorts(7199)
.withStartupTimeout(Duration.ofSeconds(120))
.waitingFor(Wait.forListeningPort())
.dependsOn(zookeeper);
Startable createTopics =
new Startable() {
@Override
public void start() {
try {
kafka.execInContainer(
"sh",
"-c",
"unset JMX_PORT; for i in `seq 3`; do kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test-topic-$i; done");
} catch (Exception e) {
throw new AssertionError(e);
}
}
@Override
public void stop() {}
@Override
public Set<Startable> getDependencies() {
return Collections.singleton(kafka);
}
};
static class KafkaBrokerTargetIntegrationTest extends KafkaIntegrationTest {
KafkaBrokerTargetIntegrationTest() {
super("target-systems/kafka.properties");
}
@Test
void endToEnd() {
waitAndAssertMetrics(
metric -> assertGauge(metric, "kafka.bytes.in", "bytes in per second from clients", "by"),
metric -> assertGauge(metric, "kafka.bytes.out", "bytes out per second to clients", "by"),
metric ->
assertGauge(
metric, "kafka.controller.active.count", "controller is active on broker", "1"),
metric ->
assertGauge(
metric,
"kafka.fetch.consumer.total.time.99p",
"fetch consumer request time - 99th percentile",
"ms"),
metric ->
assertSum(
metric,
"kafka.fetch.consumer.total.time.count",
"fetch consumer request count",
"1"),
metric ->
assertGauge(
metric,
"kafka.fetch.consumer.total.time.median",
"fetch consumer request time - 50th percentile",
"ms"),
metric ->
assertGauge(
metric,
"kafka.fetch.follower.total.time.99p",
"fetch follower request time - 99th percentile",
"ms"),
metric ->
assertSum(
metric,
"kafka.fetch.follower.total.time.count",
"fetch follower request count",
"1"),
metric ->
assertGauge(
metric,
"kafka.fetch.follower.total.time.median",
"fetch follower request time - 50th percentile",
"ms"),
metric ->
assertGauge(metric, "kafka.isr.shrinks", "in-sync replica shrinks per second", "1"),
metric ->
assertGauge(
metric,
"kafka.leader.election.rate",
"leader election rate - non-zero indicates broker failures",
"1"),
metric ->
assertGauge(
metric,
"kafka.max.lag",
"max lag in messages between follower and leader replicas",
"1"),
metric ->
assertGauge(metric, "kafka.messages.in", "number of messages in per second", "1"),
metric ->
assertGauge(
metric,
"kafka.partitions.offline.count",
"number of partitions without an active leader",
"1"),
metric ->
assertGauge(
metric,
"kafka.partitions.underreplicated.count",
"number of under replicated partitions",
"1"),
metric ->
assertGauge(
metric,
"kafka.produce.total.time.99p",
"produce request time - 99th percentile",
"ms"),
metric ->
assertSum(metric, "kafka.produce.total.time.count", "produce request count", "1"),
metric ->
assertGauge(
metric,
"kafka.produce.total.time.median",
"produce request time - 50th percentile",
"ms"),
metric -> assertGauge(metric, "kafka.request.queue", "size of the request queue", "1"),
metric ->
assertGauge(
metric,
"kafka.unclean.election.rate",
"unclean leader election rate - non-zero indicates broker failures",
"1"));
}
}
static class KafkaConsumerIntegrationTest extends KafkaIntegrationTest {
KafkaConsumerIntegrationTest() {
super("target-systems/kafka-consumer.properties");
}
@Container
GenericContainer<?> consumer =
new GenericContainer<>("bitnami/kafka:latest")
.withNetwork(Network.SHARED)
.withEnv("KAFKA_CFG_ZOOKEEPER_CONNECT", "zookeeper:2181")
.withEnv("ALLOW_PLAINTEXT_LISTENER", "yes")
.withEnv("JMX_PORT", "7199")
.withNetworkAliases("kafka-consumer")
.withExposedPorts(7199)
.withCommand(
"kafka-console-consumer.sh",
"--bootstrap-server",
"kafka:9092",
"--whitelist",
"test-topic-.*",
"--max-messages",
"100")
.withStartupTimeout(Duration.ofSeconds(120))
.waitingFor(Wait.forListeningPort())
.dependsOn(createTopics);
@Test
void endToEnd() {
List<String> topics = Arrays.asList("test-topic-1", "test-topic-2", "test-topic-3");
waitAndAssertMetrics(
metric ->
assertKafkaGauge(
metric,
"kafka.consumer.bytes-consumed-rate",
"The average number of bytes consumed per second",
"by",
topics),
metric ->
assertKafkaGauge(
metric,
"kafka.consumer.fetch-rate",
"The number of fetch requests for all topics per second",
"1"),
metric ->
assertKafkaGauge(
metric,
"kafka.consumer.fetch-size-avg",
"The average number of bytes fetched per request",
"by",
topics),
metric ->
assertKafkaGauge(
metric,
"kafka.consumer.records-consumed-rate",
"The average number of records consumed per second",
"1",
topics),
metric ->
assertKafkaGauge(
metric,
"kafka.consumer.records-lag-max",
"Number of messages the consumer lags behind the producer",
"1"),
metric ->
assertKafkaGauge(
metric,
"kafka.consumer.total.bytes-consumed-rate",
"The average number of bytes consumed for all topics per second",
"by"),
metric ->
assertKafkaGauge(
metric,
"kafka.consumer.total.fetch-size-avg",
"The average number of bytes fetched per request for all topics",
"by"),
metric ->
assertKafkaGauge(
metric,
"kafka.consumer.total.records-consumed-rate",
"The average number of records consumed for all topics per second",
"1"));
}
}
static class KafkaProducerIntegrationTest extends KafkaIntegrationTest {
KafkaProducerIntegrationTest() {
super("target-systems/kafka-producer.properties");
}
@Container
GenericContainer<?> producer =
new GenericContainer<>("bitnami/kafka:latest")
.withNetwork(Network.SHARED)
.withEnv("KAFKA_CFG_ZOOKEEPER_CONNECT", "zookeeper:2181")
.withEnv("ALLOW_PLAINTEXT_LISTENER", "yes")
.withEnv("JMX_PORT", "7199")
.withNetworkAliases("kafka-producer")
.withExposedPorts(7199)
.withCopyFileToContainer(
MountableFile.forClasspathResource("target-systems/kafka-producer.sh"),
"/usr/bin/kafka-producer.sh")
.withCommand("kafka-producer.sh")
.withStartupTimeout(Duration.ofSeconds(120))
.withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("kafka-producer")))
.waitingFor(Wait.forListeningPort())
.dependsOn(createTopics);
@Test
void endToEnd() {
List<String> topics = Collections.singletonList("test-topic-1");
waitAndAssertMetrics(
metric ->
assertKafkaGauge(
metric,
"kafka.producer.byte-rate",
"The average number of bytes sent per second for a topic",
"by",
topics),
metric ->
assertKafkaGauge(
metric,
"kafka.producer.compression-rate",
"The average compression rate of record batches for a topic",
"1",
topics),
metric ->
assertKafkaGauge(
metric,
"kafka.producer.io-wait-time-ns-avg",
"The average length of time the I/O thread spent waiting for a socket ready for reads or writes",
"ns"),
metric ->
assertKafkaGauge(
metric,
"kafka.producer.outgoing-byte-rate",
"The average number of outgoing bytes sent per second to all servers",
"by"),
metric ->
assertKafkaGauge(
metric,
"kafka.producer.record-error-rate",
"The average per-second number of record sends that resulted in errors for a topic",
"1",
topics),
metric ->
assertKafkaGauge(
metric,
"kafka.producer.record-retry-rate",
"The average per-second number of retried record sends for a topic",
"1",
topics),
metric ->
assertKafkaGauge(
metric,
"kafka.producer.record-send-rate",
"The average number of records sent per second for a topic",
"1",
topics),
metric ->
assertKafkaGauge(
metric,
"kafka.producer.request-latency-avg",
"The average request latency",
"ms"),
metric ->
assertKafkaGauge(
metric,
"kafka.producer.request-rate",
"The average number of requests sent per second",
"1"),
metric ->
assertKafkaGauge(
metric, "kafka.producer.response-rate", "Responses received per second", "1"));
}
}
static class KafkaAndJvmIntegrationText extends KafkaIntegrationTest {
KafkaAndJvmIntegrationText() {
super("target-systems/jvm-and-kafka.properties");
}
@Test
void endToEnd() {
List<String> gcLabels =
Arrays.asList(
"CodeHeap 'non-nmethods'",
"CodeHeap 'non-profiled nmethods'",
"CodeHeap 'profiled nmethods'",
"Compressed Class Space",
"G1 Eden Space",
"G1 Old Gen",
"G1 Survivor Space",
"Metaspace");
waitAndAssertMetrics(
metric -> assertGauge(metric, "jvm.classes.loaded", "number of loaded classes", "1"),
metric ->
assertTypedSum(
metric,
"jvm.gc.collections.count",
"total number of collections that have occurred",
"1",
Arrays.asList("G1 Young Generation", "G1 Old Generation")),
metric ->
assertTypedSum(
metric,
"jvm.gc.collections.elapsed",
"the approximate accumulated collection elapsed time in milliseconds",
"ms",
Arrays.asList("G1 Young Generation", "G1 Old Generation")),
metric -> assertGauge(metric, "jvm.memory.heap.committed", "current heap usage", "by"),
metric -> assertGauge(metric, "jvm.memory.heap.init", "current heap usage", "by"),
metric -> assertGauge(metric, "jvm.memory.heap.max", "current heap usage", "by"),
metric -> assertGauge(metric, "jvm.memory.heap.used", "current heap usage", "by"),
metric ->
assertGauge(metric, "jvm.memory.nonheap.committed", "current non-heap usage", "by"),
metric -> assertGauge(metric, "jvm.memory.nonheap.init", "current non-heap usage", "by"),
metric -> assertGauge(metric, "jvm.memory.nonheap.max", "current non-heap usage", "by"),
metric -> assertGauge(metric, "jvm.memory.nonheap.used", "current non-heap usage", "by"),
metric ->
assertTypedGauge(
metric, "jvm.memory.pool.committed", "current memory pool usage", "by", gcLabels),
metric ->
assertTypedGauge(
metric, "jvm.memory.pool.init", "current memory pool usage", "by", gcLabels),
metric ->
assertTypedGauge(
metric, "jvm.memory.pool.max", "current memory pool usage", "by", gcLabels),
metric ->
assertTypedGauge(
metric, "jvm.memory.pool.used", "current memory pool usage", "by", gcLabels),
metric -> assertGauge(metric, "jvm.threads.count", "number of threads", "1"),
metric -> assertGauge(metric, "kafka.bytes.in", "bytes in per second from clients", "by"),
metric -> assertGauge(metric, "kafka.bytes.out", "bytes out per second to clients", "by"),
metric ->
assertGauge(
metric, "kafka.controller.active.count", "controller is active on broker", "1"),
metric ->
assertGauge(
metric,
"kafka.fetch.consumer.total.time.99p",
"fetch consumer request time - 99th percentile",
"ms"),
metric ->
assertSum(
metric,
"kafka.fetch.consumer.total.time.count",
"fetch consumer request count",
"1"),
metric ->
assertGauge(
metric,
"kafka.fetch.consumer.total.time.median",
"fetch consumer request time - 50th percentile",
"ms"),
metric ->
assertGauge(
metric,
"kafka.fetch.follower.total.time.99p",
"fetch follower request time - 99th percentile",
"ms"),
metric ->
assertSum(
metric,
"kafka.fetch.follower.total.time.count",
"fetch follower request count",
"1"),
metric ->
assertGauge(
metric,
"kafka.fetch.follower.total.time.median",
"fetch follower request time - 50th percentile",
"ms"),
metric ->
assertGauge(metric, "kafka.isr.shrinks", "in-sync replica shrinks per second", "1"),
metric ->
assertGauge(
metric,
"kafka.leader.election.rate",
"leader election rate - non-zero indicates broker failures",
"1"),
metric ->
assertGauge(
metric,
"kafka.max.lag",
"max lag in messages between follower and leader replicas",
"1"),
metric ->
assertGauge(metric, "kafka.messages.in", "number of messages in per second", "1"),
metric ->
assertGauge(
metric,
"kafka.partitions.offline.count",
"number of partitions without an active leader",
"1"),
metric ->
assertGauge(
metric,
"kafka.partitions.underreplicated.count",
"number of under replicated partitions",
"1"),
metric ->
assertGauge(
metric,
"kafka.produce.total.time.99p",
"produce request time - 99th percentile",
"ms"),
metric ->
assertSum(metric, "kafka.produce.total.time.count", "produce request count", "1"),
metric ->
assertGauge(
metric,
"kafka.produce.total.time.median",
"produce request time - 50th percentile",
"ms"),
metric -> assertGauge(metric, "kafka.request.queue", "size of the request queue", "1"),
metric ->
assertGauge(
metric,
"kafka.unclean.election.rate",
"unclean leader election rate - non-zero indicates broker failures",
"1"));
}
}
static void assertKafkaGauge(Metric metric, String name, String description, String unit) {
assertThat(metric.getName()).isEqualTo(name);
assertThat(metric.getDescription()).isEqualTo(description);
assertThat(metric.getUnit()).isEqualTo(unit);
assertThat(metric.hasGauge()).isTrue();
assertThat(metric.getGauge().getDataPointsList())
.satisfiesExactly(
point ->
assertThat(point.getAttributesList())
.singleElement()
.satisfies(
attribute -> {
assertThat(attribute.getKey()).isEqualTo("client-id");
assertThat(attribute.getValue().getStringValue()).isNotEmpty();
}));
}
@SuppressWarnings("unchecked")
static void assertKafkaGauge(
Metric metric, String name, String description, String unit, List<String> topics) {
assertThat(metric.getName()).isEqualTo(name);
assertThat(metric.getDescription()).isEqualTo(description);
assertThat(metric.getUnit()).isEqualTo(unit);
assertThat(metric.hasGauge()).isTrue();
assertThat(metric.getGauge().getDataPointsList())
.satisfiesExactlyInAnyOrder(
topics.stream()
.map(
topic ->
(Consumer<NumberDataPoint>)
point ->
assertThat(point.getAttributesList())
.satisfiesExactlyInAnyOrder(
attribute -> {
assertThat(attribute.getKey()).isEqualTo("client-id");
assertThat(attribute.getValue().getStringValue())
.isNotEmpty();
},
attribute -> {
assertThat(attribute.getKey()).isEqualTo("topic");
assertThat(attribute.getValue().getStringValue())
.isEqualTo(topic);
}))
.toArray(Consumer[]::new));
}
}

View File

@ -0,0 +1 @@
cassandra cassandra

View File

@ -1,257 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.jmxmetrics
import static org.junit.Assert.assertTrue
import java.time.Duration
import java.util.concurrent.TimeUnit
import io.grpc.ServerBuilder
import io.grpc.stub.StreamObserver
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse
import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc
import io.opentelemetry.proto.metrics.v1.ResourceMetrics
import org.slf4j.LoggerFactory
import org.testcontainers.containers.GenericContainer
import org.testcontainers.containers.Network
import org.testcontainers.containers.output.Slf4jLogConsumer
import org.testcontainers.containers.wait.strategy.Wait
import org.testcontainers.images.builder.ImageFromDockerfile
import org.testcontainers.lifecycle.Startable
import org.testcontainers.utility.MountableFile
import spock.lang.Shared
import spock.lang.Specification
import spock.lang.Unroll
@Unroll
class IntegrationTest extends Specification{
@Shared
def targets
@Shared
def targetContainers = []
@Shared
def jmxExtensionAppContainer
@Shared
def jmxExposedPort
void configureContainers(String configName, int otlpPort, int prometheusPort, boolean configFromStdin) {
def jarPath = System.getProperty("shadow.jar.path")
def scriptName = "script.groovy"
def scriptPath = ClassLoader.getSystemClassLoader().getResource(scriptName).path
def configPath = ClassLoader.getSystemClassLoader().getResource(configName).path
def network = Network.SHARED
def jvmCommand = [
"java",
"-cp",
"/app/OpenTelemetryJava.jar",
"-Dotel.jmx.username=cassandra",
"-Dotel.jmx.password=cassandra",
"-Dotel.exporter.otlp.endpoint=http://host.testcontainers.internal:${otlpPort}",
"io.opentelemetry.contrib.jmxmetrics.JmxMetrics",
"-config",
]
if (configFromStdin) {
def cmd = jvmCommand.join(' ')
jvmCommand = [
"sh",
"-c",
"cat /app/${configName} | ${cmd} -",
]
} else {
jvmCommand.add("/app/${configName}")
}
if ("cassandra" in targets) {
def dockerfile = ("FROM cassandra:3.11\nENV LOCAL_JMX=no\n"
+ "RUN echo 'cassandra cassandra' > /etc/cassandra/jmxremote.password\n"
+ "RUN chmod 0400 /etc/cassandra/jmxremote.password\n")
targetContainers.add(
new GenericContainer<>(
new ImageFromDockerfile().withFileFromString( "Dockerfile", dockerfile)
).withNetwork(network)
.withNetworkAliases("cassandra")
.withExposedPorts(7199)
.withStartupTimeout(Duration.ofSeconds(120))
.waitingFor(Wait.forListeningPort())
)
}
if (targets.any { it.contains("kafka") }) {
def zookeeper = new GenericContainer<>("zookeeper:3.5")
.withNetwork(network)
.withNetworkAliases("zookeeper")
.withStartupTimeout(Duration.ofSeconds(120))
.waitingFor(Wait.forListeningPort())
targetContainers.add(zookeeper)
def kafka = new GenericContainer<>("bitnami/kafka:latest")
.withNetwork(network)
.withEnv([ "KAFKA_CFG_ZOOKEEPER_CONNECT" : "zookeeper:2181", "ALLOW_PLAINTEXT_LISTENER" : "yes", "JMX_PORT": "7199"])
.withNetworkAliases("kafka")
.withExposedPorts(7199)
.withStartupTimeout(Duration.ofSeconds(120))
.waitingFor(Wait.forListeningPort())
.dependsOn(zookeeper)
targetContainers.add(kafka)
if (targets.any {
it in [
"kafka-consumer",
"kafka-producer"
]
}) {
def createTopics = new Startable() {
@Override
void start() {
kafka.execInContainer(
"sh", "-c",
"unset JMX_PORT; for i in `seq 3`; do kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test-topic-\$i; done"
)
}
@Override
void stop() { }
}
if ("kafka-consumer" in targets) {
def kafkaConsumer = new GenericContainer<>("bitnami/kafka:latest")
.withNetwork(network)
.withEnv([ "KAFKA_CFG_ZOOKEEPER_CONNECT" : "zookeeper:2181", "ALLOW_PLAINTEXT_LISTENER" : "yes", "JMX_PORT": "7199"])
.withNetworkAliases("kafka-consumer")
.withExposedPorts(7199)
.withCommand("kafka-console-consumer.sh", "--bootstrap-server", "kafka:9092",
"--whitelist", "test-topic-.*", "--max-messages", "100"
).withStartupTimeout(Duration.ofSeconds(120))
.waitingFor(Wait.forListeningPort())
.dependsOn(kafka, createTopics)
targetContainers.add(kafkaConsumer)
} else {
def producerPath = ClassLoader.getSystemClassLoader().getResource("target-systems/kafka-producer.sh").path
def kafkaProducer = new GenericContainer<>("bitnami/kafka:latest")
.withNetwork(network)
.withEnv([ "KAFKA_CFG_ZOOKEEPER_CONNECT" : "zookeeper:2181", "ALLOW_PLAINTEXT_LISTENER" : "yes", "JMX_PORT": "7199"])
.withNetworkAliases("kafka-producer")
.withExposedPorts(7199)
.withCopyFileToContainer(MountableFile.forHostPath(producerPath), "/usr/bin/kafka-producer.sh")
.withCommand( "kafka-producer.sh")
.withStartupTimeout(Duration.ofSeconds(120))
.waitingFor(Wait.forListeningPort())
.dependsOn(kafka, createTopics)
targetContainers.add(kafkaProducer)
}
}
}
targetContainers.each {
it.start()
}
jmxExtensionAppContainer =
new GenericContainer<>("openjdk:8u272-jre-slim")
.withNetwork(network)
.withCopyFileToContainer(MountableFile.forHostPath(jarPath), "/app/OpenTelemetryJava.jar")
.withCopyFileToContainer(
MountableFile.forHostPath(scriptPath), "/app/${scriptName}")
.withCopyFileToContainer(
MountableFile.forHostPath(configPath), "/app/${configName}")
.withCommand(jvmCommand as String[])
.withStartupTimeout(Duration.ofSeconds(120))
.waitingFor(Wait.forLogMessage(".*Started GroovyRunner.*", 1))
.dependsOn(targetContainers)
if (prometheusPort != 0) {
jmxExtensionAppContainer.withExposedPorts(prometheusPort)
}
jmxExtensionAppContainer.start()
targetContainers.each {
assertTrue(it.running)
}
assertTrue(jmxExtensionAppContainer.running)
if (prometheusPort != 0) {
jmxExposedPort = jmxExtensionAppContainer.getMappedPort(prometheusPort)
}
}
}
class OtlpIntegrationTest extends IntegrationTest {
@Shared
def collector
@Shared
def collectorServer
@Shared
def otlpPort
def setup() {
// set up a collector per test to avoid noisy neighbor
otlpPort = availablePort()
collector = new Collector()
collectorServer = ServerBuilder.forPort(otlpPort).addService(collector).build()
collectorServer.start()
}
def cleanup() {
collectorServer.shutdownNow()
collectorServer.awaitTermination(5, TimeUnit.SECONDS)
}
static def availablePort() {
def sock = new ServerSocket(0);
def port = sock.getLocalPort()
sock.close()
return port
}
static final class Collector extends MetricsServiceGrpc.MetricsServiceImplBase {
private final List<ResourceMetrics> receivedMetrics = new ArrayList<>()
private final Object monitor = new Object()
@Override
void export(
ExportMetricsServiceRequest request,
StreamObserver<ExportMetricsServiceResponse> responseObserver) {
synchronized (receivedMetrics) {
receivedMetrics.addAll(request.resourceMetricsList)
}
synchronized (monitor) {
monitor.notify()
}
responseObserver.onNext(ExportMetricsServiceResponse.newBuilder().build())
responseObserver.onCompleted()
}
List<ResourceMetrics> getReceivedMetrics() {
List<ResourceMetrics> received
try {
synchronized (monitor) {
monitor.wait(15000)
}
} catch (final InterruptedException e) {
assertTrue(e.message, false)
}
synchronized (receivedMetrics) {
received = new ArrayList<>(receivedMetrics)
receivedMetrics.clear()
}
return received
}
}
static final String expectedMeterVersion() {
// Automatically set by gradle when running the tests
def version = System.getProperty("gradle.project.version")
assert version != null && version != ""
return version
}
}

View File

@ -16,7 +16,6 @@ import javax.management.remote.JMXServiceURL
import spock.lang.Shared import spock.lang.Shared
import spock.lang.Specification import spock.lang.Specification
class OtelHelperJmxTest extends Specification { class OtelHelperJmxTest extends Specification {
static String thingName = 'io.opentelemetry.extensions.metrics.jmx:type=OtelHelperJmxTest.Thing' static String thingName = 'io.opentelemetry.extensions.metrics.jmx:type=OtelHelperJmxTest.Thing'
@ -35,12 +34,19 @@ class OtelHelperJmxTest extends Specification {
} }
private JMXServiceURL setupServer(Map env) { private JMXServiceURL setupServer(Map env) {
def serviceUrl = new JMXServiceURL('rmi', 'localhost', OtlpIntegrationTest.availablePort()) def serviceUrl = new JMXServiceURL('rmi', 'localhost', availablePort())
jmxServer = JMXConnectorServerFactory.newJMXConnectorServer(serviceUrl, env, getPlatformMBeanServer()) jmxServer = JMXConnectorServerFactory.newJMXConnectorServer(serviceUrl, env, getPlatformMBeanServer())
jmxServer.start() jmxServer.start()
return jmxServer.getAddress() return jmxServer.getAddress()
} }
private static def availablePort() {
def sock = new ServerSocket(0);
def port = sock.getLocalPort()
sock.close()
return port
}
private OtelHelper setupHelper(JmxConfig config) { private OtelHelper setupHelper(JmxConfig config) {
return new OtelHelper(new JmxClient(config), new GroovyMetricEnvironment(config)) return new OtelHelper(new JmxClient(config), new GroovyMetricEnvironment(config))
} }

View File

@ -1,92 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.jmxmetrics
import io.opentelemetry.proto.common.v1.AnyValue
import io.opentelemetry.proto.common.v1.InstrumentationLibrary
import io.opentelemetry.proto.common.v1.KeyValue
import io.opentelemetry.proto.metrics.v1.Histogram
import io.opentelemetry.proto.metrics.v1.HistogramDataPoint
import io.opentelemetry.proto.metrics.v1.InstrumentationLibraryMetrics
import io.opentelemetry.proto.metrics.v1.Metric
import io.opentelemetry.proto.metrics.v1.ResourceMetrics
import org.testcontainers.Testcontainers
import spock.lang.Requires
import spock.lang.Timeout
import spock.lang.Unroll
@Requires({
System.getProperty('ojc.integration.tests') == 'true'
})
@Timeout(90)
class OtlpIntegrationTests extends OtlpIntegrationTest {
@Unroll
def 'end to end with stdin config: #useStdin'() {
setup: 'we configure JMX metrics gatherer and target server'
targets = ["cassandra"]
Testcontainers.exposeHostPorts(otlpPort)
configureContainers('otlp_config.properties', otlpPort, 0, useStdin)
expect:
when: 'we receive metrics from the JMX metrics gatherer'
List<ResourceMetrics> receivedMetrics = collector.receivedMetrics
then: 'they are of the expected size'
receivedMetrics.size() == 1
when: "we examine the received metric's instrumentation library metrics lists"
ResourceMetrics receivedMetric = receivedMetrics.get(0)
List<InstrumentationLibraryMetrics> ilMetrics =
receivedMetric.instrumentationLibraryMetricsList
then: 'they of the expected size'
ilMetrics.size() == 1
when: 'we examine the instrumentation library'
InstrumentationLibraryMetrics ilMetric = ilMetrics.get(0)
InstrumentationLibrary il = ilMetric.instrumentationLibrary
then: 'it is of the expected content'
il.name == 'io.opentelemetry.contrib.jmxmetrics'
il.version == expectedMeterVersion()
when: 'we examine the instrumentation library metric metrics list'
List<Metric> metrics = ilMetric.metricsList
then: 'it is of the expected size'
metrics.size() == 1
when: 'we examine the metric metadata'
Metric metric = metrics.get(0)
then: 'it is of the expected content'
metric.name == 'cassandra.storage.load'
metric.description == 'Size, in bytes, of the on disk data size this node manages'
metric.unit == 'By'
metric.hasHistogram()
when: 'we examine the datapoints'
Histogram datapoints = metric.histogram
then: 'they are of the expected size'
datapoints.dataPointsCount == 1
when: 'we example the datapoint labels and sum'
HistogramDataPoint datapoint = datapoints.getDataPoints(0)
List<KeyValue> attributes = datapoint.attributesList
def sum = datapoint.sum
then: 'they are of the expected content'
attributes.size() == 1
attributes.get(0) == KeyValue.newBuilder().setKey("myKey").setValue(AnyValue.newBuilder().setStringValue("myVal")).build()
datapoint.count == 1
datapoint.sum == sum
cleanup:
targetContainers.each { it.stop() }
jmxExtensionAppContainer.stop()
where:
useStdin | _
false | _
true | _
}
}

View File

@ -1,223 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.jmxmetrics
import static org.awaitility.Awaitility.await
import io.opentelemetry.proto.common.v1.InstrumentationLibrary
import io.opentelemetry.proto.common.v1.KeyValue
import io.opentelemetry.proto.metrics.v1.Gauge
import io.opentelemetry.proto.metrics.v1.InstrumentationLibraryMetrics
import io.opentelemetry.proto.metrics.v1.Metric
import io.opentelemetry.proto.metrics.v1.ResourceMetrics
import io.opentelemetry.proto.metrics.v1.Sum
import java.util.concurrent.TimeUnit
import org.testcontainers.Testcontainers
import spock.lang.Requires
import spock.lang.Timeout
@Requires({
System.getProperty('ojc.integration.tests') == 'true'
})
@Timeout(90)
class CassandraIntegrationTests extends OtlpIntegrationTest {
def 'end to end'() {
setup: 'we configure JMX metrics gatherer and target server to use Cassandra as target system'
targets = ["cassandra"]
Testcontainers.exposeHostPorts(otlpPort)
configureContainers('target-systems/cassandra.properties', otlpPort, 0, false)
ArrayList<Metric> metrics
await().atMost(30, TimeUnit.SECONDS).untilAsserted {
List<ResourceMetrics> receivedMetrics = collector.receivedMetrics
assert receivedMetrics.size() == 1
ResourceMetrics receivedMetric = receivedMetrics.get(0)
List<InstrumentationLibraryMetrics> ilMetrics =
receivedMetric.instrumentationLibraryMetricsList
assert ilMetrics.size() == 1
InstrumentationLibraryMetrics ilMetric = ilMetrics.get(0)
InstrumentationLibrary il = ilMetric.instrumentationLibrary
assert il.name == 'io.opentelemetry.contrib.jmxmetrics'
assert il.version == expectedMeterVersion()
metrics = ilMetric.metricsList as ArrayList
metrics.sort{ a, b -> a.name <=> b.name}
assert metrics.size() == 23
}
def expectedMetrics = [
[
'cassandra.client.request.range_slice.latency.50p',
'Token range read request latency - 50th percentile',
'µs',
Gauge
],
[
'cassandra.client.request.range_slice.latency.99p',
'Token range read request latency - 99th percentile',
'µs',
Gauge
],
[
'cassandra.client.request.range_slice.latency.count',
'Total token range read request latency',
'µs',
Sum
],
[
'cassandra.client.request.range_slice.latency.max',
'Maximum token range read request latency',
'µs',
Gauge,
],
[
'cassandra.client.request.range_slice.timeout.count',
'Number of token range read request timeouts encountered',
'1',
Sum,
],
[
'cassandra.client.request.range_slice.unavailable.count',
'Number of token range read request unavailable exceptions encountered',
'1',
Sum,
],
[
'cassandra.client.request.read.latency.50p',
'Standard read request latency - 50th percentile',
'µs',
Gauge,
],
[
'cassandra.client.request.read.latency.99p',
'Standard read request latency - 99th percentile',
'µs',
Gauge,
],
[
'cassandra.client.request.read.latency.count',
'Total standard read request latency',
'µs',
Sum,
],
[
'cassandra.client.request.read.latency.max',
'Maximum standard read request latency',
'µs',
Gauge,
],
[
'cassandra.client.request.read.timeout.count',
'Number of standard read request timeouts encountered',
'1',
Sum,
],
[
'cassandra.client.request.read.unavailable.count',
'Number of standard read request unavailable exceptions encountered',
'1',
Sum,
],
[
'cassandra.client.request.write.latency.50p',
'Regular write request latency - 50th percentile',
'µs',
Gauge,
],
[
'cassandra.client.request.write.latency.99p',
'Regular write request latency - 99th percentile',
'µs',
Gauge,
],
[
'cassandra.client.request.write.latency.count',
'Total regular write request latency',
'µs',
Sum,
],
[
'cassandra.client.request.write.latency.max',
'Maximum regular write request latency',
'µs',
Gauge,
],
[
'cassandra.client.request.write.timeout.count',
'Number of regular write request timeouts encountered',
'1',
Sum,
],
[
'cassandra.client.request.write.unavailable.count',
'Number of regular write request unavailable exceptions encountered',
'1',
Sum,
],
[
'cassandra.compaction.tasks.completed',
'Number of completed compactions since server [re]start',
'1',
Sum,
],
[
'cassandra.compaction.tasks.pending',
'Estimated number of compactions remaining to perform',
'1',
Gauge,
],
[
'cassandra.storage.load.count',
'Size of the on disk data size this node manages',
'by',
Sum,
],
[
'cassandra.storage.total_hints.count',
'Number of hint messages written to this node since [re]start',
'1',
Sum,
],
[
'cassandra.storage.total_hints.in_progress.count',
'Number of hints attempting to be sent currently',
'1',
Sum,
],
].eachWithIndex{ item, index ->
Metric metric = metrics.get(index)
assert metric.name == item[0]
assert metric.description == item[1]
assert metric.unit == item[2]
def datapoint
switch(item[3]) {
case Gauge:
assert metric.hasGauge()
Gauge datapoints = metric.gauge
assert datapoints.dataPointsCount == 1
datapoint = datapoints.getDataPoints(0)
break
case Sum:
assert metric.hasSum()
Sum datapoints = metric.sum
assert datapoints.dataPointsCount == 1
datapoint = datapoints.getDataPoints(0)
break
default:
assert false, "Invalid expected data type: ${item[3]}"
}
List<KeyValue> labels = datapoint.attributesList
assert labels.size() == 0
}
cleanup:
targetContainers.each { it.stop() }
jmxExtensionAppContainer.stop()
}
}

View File

@ -1,244 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.jmxmetrics
import static org.awaitility.Awaitility.await
import io.opentelemetry.proto.common.v1.InstrumentationLibrary
import io.opentelemetry.proto.common.v1.KeyValue
import io.opentelemetry.proto.metrics.v1.InstrumentationLibraryMetrics
import io.opentelemetry.proto.metrics.v1.IntGauge
import io.opentelemetry.proto.metrics.v1.IntSum
import io.opentelemetry.proto.metrics.v1.Metric
import io.opentelemetry.proto.metrics.v1.ResourceMetrics
import java.util.concurrent.TimeUnit
import org.testcontainers.Testcontainers
import spock.lang.Requires
import spock.lang.Timeout
@Requires({
System.getProperty('ojc.integration.tests') == 'true'
})
@Timeout(90)
class JVMTargetSystemIntegrationTests extends OtlpIntegrationTest {
def 'end to end'() {
setup: 'we configure JMX metrics gatherer and target server to use JVM as target system'
targets = ["cassandra"]
Testcontainers.exposeHostPorts(otlpPort)
configureContainers('target-systems/jvm.properties', otlpPort, 0, false)
ArrayList<Metric> metrics
await().atMost(30, TimeUnit.SECONDS).untilAsserted {
List<ResourceMetrics> receivedMetrics = collector.receivedMetrics
assert receivedMetrics.size() == 1
ResourceMetrics receivedMetric = receivedMetrics.get(0)
List<InstrumentationLibraryMetrics> ilMetrics =
receivedMetric.instrumentationLibraryMetricsList
assert ilMetrics.size() == 1
InstrumentationLibraryMetrics ilMetric = ilMetrics.get(0)
InstrumentationLibrary il = ilMetric.instrumentationLibrary
assert il.name == 'io.opentelemetry.contrib.jmxmetrics'
assert il.version == expectedMeterVersion()
when: 'we examine the instrumentation library metric metrics list'
metrics = ilMetric.metricsList as ArrayList
metrics.sort{ a, b -> a.name <=> b.name}
assert metrics.size() == 16
}
def expectedMetrics = [
[
'jvm.classes.loaded',
'number of loaded classes',
'1',
[],
IntGauge
],
[
'jvm.gc.collections.count',
'total number of collections that have occurred',
'1',
[
"ConcurrentMarkSweep",
"ParNew"
],
IntSum
],
[
'jvm.gc.collections.elapsed',
'the approximate accumulated collection elapsed time in milliseconds',
'ms',
[
"ConcurrentMarkSweep",
"ParNew"
],
IntSum
],
[
'jvm.memory.heap.committed',
'current heap usage',
'by',
[],
IntGauge
],
[
'jvm.memory.heap.init',
'current heap usage',
'by',
[],
IntGauge
],
[
'jvm.memory.heap.max',
'current heap usage',
'by',
[],
IntGauge
],
[
'jvm.memory.heap.used',
'current heap usage',
'by',
[],
IntGauge
],
[
'jvm.memory.nonheap.committed',
'current non-heap usage',
'by',
[],
IntGauge
],
[
'jvm.memory.nonheap.init',
'current non-heap usage',
'by',
[],
IntGauge
],
[
'jvm.memory.nonheap.max',
'current non-heap usage',
'by',
[],
IntGauge
],
[
'jvm.memory.nonheap.used',
'current non-heap usage',
'by',
[],
IntGauge
],
[
'jvm.memory.pool.committed',
'current memory pool usage',
'by',
[
"Code Cache",
"Par Eden Space",
"CMS Old Gen",
"Compressed Class Space",
"Metaspace",
"Par Survivor Space"
],
IntGauge
],
[
'jvm.memory.pool.init',
'current memory pool usage',
'by',
[
"Code Cache",
"Par Eden Space",
"CMS Old Gen",
"Compressed Class Space",
"Metaspace",
"Par Survivor Space"
],
IntGauge
],
[
'jvm.memory.pool.max',
'current memory pool usage',
'by',
[
"Code Cache",
"Par Eden Space",
"CMS Old Gen",
"Compressed Class Space",
"Metaspace",
"Par Survivor Space"
],
IntGauge
],
[
'jvm.memory.pool.used',
'current memory pool usage',
'by',
[
"Code Cache",
"Par Eden Space",
"CMS Old Gen",
"Compressed Class Space",
"Metaspace",
"Par Survivor Space"
],
IntGauge
],
[
'jvm.threads.count',
'number of threads',
'1',
[],
IntGauge
],
].eachWithIndex{ item, index ->
def expectedType = item[4]
Metric metric = metrics.get(index)
assert metric.name == item[0]
assert metric.description == item[1]
assert metric.unit == item[2]
def datapoints
if (expectedType == IntGauge) {
assert metric.hasGauge()
datapoints = metric.gauge
} else {
assert metric.hasSum()
datapoints = metric.sum
}
def expectedLabelCount = item[3].size()
def expectedLabels = item[3] as Set
def expectedDatapointCount = expectedLabelCount == 0 ? 1 : expectedLabelCount
assert datapoints.dataPointsCount == expectedDatapointCount
(0..<expectedDatapointCount).each { i ->
def datapoint = datapoints.getDataPoints(i)
List<KeyValue> labels = datapoint.attributesList
if (expectedLabelCount != 0) {
assert labels.size() == 1
assert labels[0].key == 'name'
def value = labels[0].value.stringValue
assert expectedLabels.remove(value)
} else {
assert labels.size() == 0
}
}
assert expectedLabels.size() == 0
}
cleanup:
targetContainers.each { it.stop() }
jmxExtensionAppContainer.stop()
}
}

View File

@ -1,144 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.jmxmetrics
import static org.awaitility.Awaitility.await
import io.opentelemetry.proto.common.v1.KeyValue
import io.opentelemetry.proto.metrics.v1.InstrumentationLibraryMetrics
import io.opentelemetry.proto.metrics.v1.Metric
import io.opentelemetry.proto.metrics.v1.ResourceMetrics
import java.util.concurrent.TimeUnit
import org.testcontainers.Testcontainers
import spock.lang.Requires
import spock.lang.Timeout
@Requires({
System.getProperty('ojc.integration.tests') == 'true'
})
@Timeout(90)
class KafkaConsumerTargetSystemIntegrationTests extends OtlpIntegrationTest {
def 'end to end'() {
setup: 'we configure JMX metrics gatherer and target server to use Kafka as target system'
targets = ["kafka-consumer"]
Testcontainers.exposeHostPorts(otlpPort)
configureContainers('target-systems/kafka-consumer.properties', otlpPort, 0, false)
ArrayList<Metric> metrics
await().atMost(30, TimeUnit.SECONDS).untilAsserted {
List<ResourceMetrics> receivedMetrics = collector.receivedMetrics
assert receivedMetrics.size() == 1
ResourceMetrics receivedMetric = receivedMetrics.get(0)
List<InstrumentationLibraryMetrics> ilMetrics =
receivedMetric.instrumentationLibraryMetricsList
assert ilMetrics.size() == 1
InstrumentationLibraryMetrics ilMetric = ilMetrics.get(0)
metrics = ilMetric.metricsList as ArrayList
metrics.sort{ a, b -> a.name <=> b.name}
assert metrics.size() == 8
}
def expectedTopics = [
'test-topic-1',
'test-topic-2',
'test-topic-3'
]
[
[
'kafka.consumer.bytes-consumed-rate',
'The average number of bytes consumed per second',
'by',
['client-id' : '', 'topic' : expectedTopics.clone() ],
],
[
'kafka.consumer.fetch-rate',
'The number of fetch requests for all topics per second',
'1',
['client-id' : '']
],
[
'kafka.consumer.fetch-size-avg',
'The average number of bytes fetched per request',
'by',
['client-id' : '', 'topic' : expectedTopics.clone() ],
],
[
'kafka.consumer.records-consumed-rate',
'The average number of records consumed per second',
'1',
['client-id' : '', 'topic' : expectedTopics.clone() ],
],
[
'kafka.consumer.records-lag-max',
'Number of messages the consumer lags behind the producer',
'1',
['client-id' : '']
],
[
'kafka.consumer.total.bytes-consumed-rate',
'The average number of bytes consumed for all topics per second',
'by',
['client-id' : '']
],
[
'kafka.consumer.total.fetch-size-avg',
'The average number of bytes fetched per request for all topics',
'by',
['client-id' : '']
],
[
'kafka.consumer.total.records-consumed-rate',
'The average number of records consumed for all topics per second',
'1',
['client-id' : '']
],
].eachWithIndex{ item, index ->
Metric metric = metrics.get(index)
assert metric.name == item[0]
assert metric.description == item[1]
assert metric.unit == item[2]
assert metric.hasGauge()
def datapoints = metric.gauge
Map<String, String> expectedLabels = item[3]
def expectedLabelCount = expectedLabels.size()
assert datapoints.dataPointsCount == expectedLabelCount == 1 ? 1 : 3
(0..<datapoints.dataPointsCount).each { i ->
def datapoint = datapoints.getDataPoints(i)
List<KeyValue> labels = datapoint.attributesList
assert labels.size() == expectedLabelCount
(0..<expectedLabelCount).each { j ->
def key = labels[j].key
assert expectedLabels.containsKey(key)
def value = expectedLabels[key]
if (!value.empty) {
def actual = labels[j].value.stringValue
assert value.contains(actual)
value.remove(actual)
if (value.empty) {
expectedLabels.remove(key)
}
}
}
}
assert expectedLabels == ['client-id': '']
}
cleanup:
targetContainers.each { it.stop() }
jmxExtensionAppContainer.stop()
}
}

View File

@ -1,149 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.jmxmetrics
import static org.awaitility.Awaitility.await
import io.opentelemetry.proto.common.v1.KeyValue
import io.opentelemetry.proto.metrics.v1.InstrumentationLibraryMetrics
import io.opentelemetry.proto.metrics.v1.Metric
import io.opentelemetry.proto.metrics.v1.ResourceMetrics
import java.util.concurrent.TimeUnit
import org.testcontainers.Testcontainers
import spock.lang.Requires
import spock.lang.Timeout
@Requires({
System.getProperty('ojc.integration.tests') == 'true'
})
@Timeout(90)
class KafkaProducerTargetSystemIntegrationTests extends OtlpIntegrationTest {
def 'end to end'() {
setup: 'we configure JMX metrics gatherer and target server to use Kafka as target system'
targets = ["kafka-producer"]
Testcontainers.exposeHostPorts(otlpPort)
configureContainers('target-systems/kafka-producer.properties', otlpPort, 0, false)
ArrayList<Metric> metrics
await().atMost(30, TimeUnit.SECONDS).untilAsserted {
List<ResourceMetrics> receivedMetrics = collector.receivedMetrics
assert receivedMetrics.size() == 1
ResourceMetrics receivedMetric = receivedMetrics.get(0)
List<InstrumentationLibraryMetrics> ilMetrics =
receivedMetric.instrumentationLibraryMetricsList
assert ilMetrics.size() == 1
InstrumentationLibraryMetrics ilMetric = ilMetrics.get(0)
metrics = ilMetric.metricsList as ArrayList
metrics.sort{ a, b -> a.name <=> b.name}
metrics.size() == 10
}
[
[
"kafka.producer.byte-rate",
"The average number of bytes sent per second for a topic",
"by",
['client-id' : '', 'topic' : ['test-topic-1']],
],
[
"kafka.producer.compression-rate",
"The average compression rate of record batches for a topic",
"1",
['client-id' : '', 'topic' : ['test-topic-1']],
],
[
"kafka.producer.io-wait-time-ns-avg",
"The average length of time the I/O thread spent waiting for a socket ready for reads or writes",
"ns",
['client-id' : ''],
],
[
"kafka.producer.outgoing-byte-rate",
"The average number of outgoing bytes sent per second to all servers",
"by",
['client-id' : ''],
],
[
"kafka.producer.record-error-rate",
"The average per-second number of record sends that resulted in errors for a topic",
"1",
['client-id' : '', 'topic' : ['test-topic-1']],
],
[
"kafka.producer.record-retry-rate",
"The average per-second number of retried record sends for a topic",
"1",
['client-id' : '', 'topic' : ['test-topic-1']],
],
[
"kafka.producer.record-send-rate",
"The average number of records sent per second for a topic",
"1",
['client-id' : '', 'topic' : ['test-topic-1']],
],
[
"kafka.producer.request-latency-avg",
"The average request latency",
"ms",
['client-id' : ''],
],
[
"kafka.producer.request-rate",
"The average number of requests sent per second",
"1",
['client-id' : ''],
],
[
"kafka.producer.response-rate",
"Responses received per second",
"1",
['client-id' : ''],
],
].eachWithIndex{ item, index ->
Metric metric = metrics.get(index)
assert metric.name == item[0]
assert metric.description == item[1]
assert metric.unit == item[2]
assert metric.hasGauge()
def datapoints = metric.gauge
Map<String, String> expectedLabels = item[3]
def expectedLabelCount = expectedLabels.size()
assert datapoints.dataPointsCount == 1
def datapoint = datapoints.getDataPoints(0)
List<KeyValue> labels = datapoint.attributesList
assert labels.size() == expectedLabelCount
(0..<expectedLabelCount).each { j ->
def key = labels[j].key
assert expectedLabels.containsKey(key)
def value = expectedLabels[key]
if (!value.empty) {
def actual = labels[j].value.stringValue
assert value.contains(actual)
value.remove(actual)
if (value.empty) {
expectedLabels.remove(key)
}
}
}
assert expectedLabels == ['client-id': '']
}
cleanup:
targetContainers.each { it.stop() }
println jmxExtensionAppContainer.getLogs()
jmxExtensionAppContainer.stop()
}
}

View File

@ -1,206 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.jmxmetrics
import static org.awaitility.Awaitility.await
import io.opentelemetry.proto.common.v1.KeyValue
import io.opentelemetry.proto.metrics.v1.Gauge
import io.opentelemetry.proto.metrics.v1.InstrumentationLibraryMetrics
import io.opentelemetry.proto.metrics.v1.Metric
import io.opentelemetry.proto.metrics.v1.ResourceMetrics
import io.opentelemetry.proto.metrics.v1.Sum
import java.util.concurrent.TimeUnit
import org.testcontainers.Testcontainers
import spock.lang.Requires
import spock.lang.Timeout
@Requires({
System.getProperty('ojc.integration.tests') == 'true'
})
@Timeout(90)
class KafkaTargetSystemIntegrationTests extends OtlpIntegrationTest {
def 'end to end'() {
setup: 'we configure JMX metrics gatherer and target server to use Kafka as target system'
targets = ["kafka"]
Testcontainers.exposeHostPorts(otlpPort)
configureContainers('target-systems/kafka.properties', otlpPort, 0, false)
ArrayList<Metric> metrics
await().atMost(30, TimeUnit.SECONDS).untilAsserted {
List<ResourceMetrics> receivedMetrics = collector.receivedMetrics
assert receivedMetrics.size() == 1
ResourceMetrics receivedMetric = receivedMetrics.get(0)
List<InstrumentationLibraryMetrics> ilMetrics =
receivedMetric.instrumentationLibraryMetricsList
assert ilMetrics.size() == 1
InstrumentationLibraryMetrics ilMetric = ilMetrics.get(0)
metrics = ilMetric.metricsList as ArrayList
metrics.sort{ a, b -> a.name <=> b.name}
assert metrics.size() == 21
}
def expectedMetrics = [
[
'kafka.bytes.in',
'bytes in per second from clients',
'by',
Gauge,
],
[
'kafka.bytes.out',
'bytes out per second to clients',
'by',
Gauge,
],
[
'kafka.controller.active.count',
'controller is active on broker',
'1',
Gauge,
],
[
'kafka.fetch.consumer.total.time.99p',
'fetch consumer request time - 99th percentile',
'ms',
Gauge,
],
[
'kafka.fetch.consumer.total.time.count',
'fetch consumer request count',
'1',
Sum,
],
[
'kafka.fetch.consumer.total.time.median',
'fetch consumer request time - 50th percentile',
'ms',
Gauge,
],
[
'kafka.fetch.follower.total.time.99p',
'fetch follower request time - 99th percentile',
'ms',
Gauge,
],
[
'kafka.fetch.follower.total.time.count',
'fetch follower request count',
'1',
Sum,
],
[
'kafka.fetch.follower.total.time.median',
'fetch follower request time - 50th percentile',
'ms',
Gauge,
],
[
'kafka.isr.expands',
'in-sync replica expands per second',
'1',
Gauge,
],
[
'kafka.isr.shrinks',
'in-sync replica shrinks per second',
'1',
Gauge,
],
[
'kafka.leader.election.rate',
'leader election rate - non-zero indicates broker failures',
'1',
Gauge,
],
[
'kafka.max.lag',
'max lag in messages between follower and leader replicas',
'1',
Gauge,
],
[
'kafka.messages.in',
'number of messages in per second',
'1',
Gauge,
],
[
'kafka.partitions.offline.count',
'number of partitions without an active leader',
'1',
Gauge,
],
[
'kafka.partitions.underreplicated.count',
'number of under replicated partitions',
'1',
Gauge,
],
[
'kafka.produce.total.time.99p',
'produce request time - 99th percentile',
'ms',
Gauge,
],
[
'kafka.produce.total.time.count',
'produce request count',
'1',
Sum,
],
[
'kafka.produce.total.time.median',
'produce request time - 50th percentile',
'ms',
Gauge,
],
[
'kafka.request.queue',
'size of the request queue',
'1',
Gauge,
],
[
'kafka.unclean.election.rate',
'unclean leader election rate - non-zero indicates broker failures',
'1',
Gauge,
],
].eachWithIndex{ item, index ->
Metric metric = metrics.get(index)
assert metric.name == item[0]
assert metric.description == item[1]
assert metric.unit == item[2]
def datapoint
switch(item[3]) {
case Gauge:
assert metric.hasGauge()
Gauge datapoints = metric.gauge
assert datapoints.dataPointsCount == 1
datapoint = datapoints.getDataPoints(0)
break
case Sum:
assert metric.hasSum()
Sum datapoints = metric.sum
assert datapoints.dataPointsCount == 1
datapoint = datapoints.getDataPoints(0)
break
default:
assert false, "Invalid expected data type: ${item[3]}"
}
List<KeyValue> labels = datapoint.attributesList
assert labels.size() == 0
}
cleanup:
targetContainers.each { it.stop() }
jmxExtensionAppContainer.stop()
}
}

View File

@ -1,401 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.jmxmetrics
import static org.awaitility.Awaitility.await
import io.opentelemetry.proto.common.v1.InstrumentationLibrary
import io.opentelemetry.proto.common.v1.KeyValue
import io.opentelemetry.proto.metrics.v1.Gauge
import io.opentelemetry.proto.metrics.v1.InstrumentationLibraryMetrics
import io.opentelemetry.proto.metrics.v1.Metric
import io.opentelemetry.proto.metrics.v1.ResourceMetrics
import io.opentelemetry.proto.metrics.v1.Sum
import java.util.concurrent.TimeUnit
import org.testcontainers.Testcontainers
import spock.lang.Requires
import spock.lang.Timeout
@Requires({
System.getProperty('ojc.integration.tests') == 'true'
})
@Timeout(90)
class MultipleTargetSystemsIntegrationTests extends OtlpIntegrationTest {
def 'end to end'() {
setup: 'we configure JMX metrics gatherer and target server to use JVM and Kafka as target systems'
targets = ["kafka"]
Testcontainers.exposeHostPorts(otlpPort)
configureContainers('target-systems/jvm-and-kafka.properties', otlpPort, 0, false)
ArrayList<Metric> metrics
await().atMost(30, TimeUnit.SECONDS).untilAsserted {
List<ResourceMetrics> receivedMetrics = collector.receivedMetrics
assert receivedMetrics.size() == 1
ResourceMetrics receivedMetric = receivedMetrics.get(0)
List<InstrumentationLibraryMetrics> ilMetrics =
receivedMetric.instrumentationLibraryMetricsList
assert ilMetrics.size() == 1
InstrumentationLibraryMetrics ilMetric = ilMetrics.get(0)
InstrumentationLibrary il = ilMetric.instrumentationLibrary
assert il.name == 'io.opentelemetry.contrib.jmxmetrics'
assert il.version == expectedMeterVersion()
metrics = ilMetric.metricsList as ArrayList
metrics.sort{ a, b -> a.name <=> b.name}
assert metrics.size() == 37
}
def expectedMetrics = [
[
'jvm.classes.loaded',
'number of loaded classes',
'1',
[],
Gauge
],
[
'jvm.gc.collections.count',
'total number of collections that have occurred',
'1',
[
"G1 Young Generation",
"G1 Old Generation",
],
Sum
],
[
'jvm.gc.collections.elapsed',
'the approximate accumulated collection elapsed time in milliseconds',
'ms',
[
"G1 Young Generation",
"G1 Old Generation",
],
Sum
],
[
'jvm.memory.heap.committed',
'current heap usage',
'by',
[],
Gauge
],
[
'jvm.memory.heap.init',
'current heap usage',
'by',
[],
Gauge
],
[
'jvm.memory.heap.max',
'current heap usage',
'by',
[],
Gauge
],
[
'jvm.memory.heap.used',
'current heap usage',
'by',
[],
Gauge
],
[
'jvm.memory.nonheap.committed',
'current non-heap usage',
'by',
[],
Gauge
],
[
'jvm.memory.nonheap.init',
'current non-heap usage',
'by',
[],
Gauge
],
[
'jvm.memory.nonheap.max',
'current non-heap usage',
'by',
[],
Gauge
],
[
'jvm.memory.nonheap.used',
'current non-heap usage',
'by',
[],
Gauge
],
[
'jvm.memory.pool.committed',
'current memory pool usage',
'by',
[
"CodeHeap 'non-nmethods'",
"CodeHeap 'non-profiled nmethods'",
"CodeHeap 'profiled nmethods'",
"Compressed Class Space",
"G1 Eden Space",
"G1 Old Gen",
"G1 Survivor Space",
"Metaspace",
],
Gauge
],
[
'jvm.memory.pool.init',
'current memory pool usage',
'by',
[
"CodeHeap 'non-nmethods'",
"CodeHeap 'non-profiled nmethods'",
"CodeHeap 'profiled nmethods'",
"Compressed Class Space",
"G1 Eden Space",
"G1 Old Gen",
"G1 Survivor Space",
"Metaspace",
],
Gauge
],
[
'jvm.memory.pool.max',
'current memory pool usage',
'by',
[
"CodeHeap 'non-nmethods'",
"CodeHeap 'non-profiled nmethods'",
"CodeHeap 'profiled nmethods'",
"Compressed Class Space",
"G1 Eden Space",
"G1 Old Gen",
"G1 Survivor Space",
"Metaspace",
],
Gauge
],
[
'jvm.memory.pool.used',
'current memory pool usage',
'by',
[
"CodeHeap 'non-nmethods'",
"CodeHeap 'non-profiled nmethods'",
"CodeHeap 'profiled nmethods'",
"Compressed Class Space",
"G1 Eden Space",
"G1 Old Gen",
"G1 Survivor Space",
"Metaspace",
],
Gauge
],
[
'jvm.threads.count',
'number of threads',
'1',
[],
Gauge
],
[
'kafka.bytes.in',
'bytes in per second from clients',
'by',
[],
Gauge,
],
[
'kafka.bytes.out',
'bytes out per second to clients',
'by',
[],
Gauge,
],
[
'kafka.controller.active.count',
'controller is active on broker',
'1',
[],
Gauge,
],
[
'kafka.fetch.consumer.total.time.99p',
'fetch consumer request time - 99th percentile',
'ms',
[],
Gauge,
],
[
'kafka.fetch.consumer.total.time.count',
'fetch consumer request count',
'1',
[],
Sum,
],
[
'kafka.fetch.consumer.total.time.median',
'fetch consumer request time - 50th percentile',
'ms',
[],
Gauge,
],
[
'kafka.fetch.follower.total.time.99p',
'fetch follower request time - 99th percentile',
'ms',
[],
Gauge,
],
[
'kafka.fetch.follower.total.time.count',
'fetch follower request count',
'1',
[],
Sum,
],
[
'kafka.fetch.follower.total.time.median',
'fetch follower request time - 50th percentile',
'ms',
[],
Gauge,
],
[
'kafka.isr.expands',
'in-sync replica expands per second',
'1',
[],
Gauge,
],
[
'kafka.isr.shrinks',
'in-sync replica shrinks per second',
'1',
[],
Gauge,
],
[
'kafka.leader.election.rate',
'leader election rate - non-zero indicates broker failures',
'1',
[],
Gauge,
],
[
'kafka.max.lag',
'max lag in messages between follower and leader replicas',
'1',
[],
Gauge,
],
[
'kafka.messages.in',
'number of messages in per second',
'1',
[],
Gauge,
],
[
'kafka.partitions.offline.count',
'number of partitions without an active leader',
'1',
[],
Gauge,
],
[
'kafka.partitions.underreplicated.count',
'number of under replicated partitions',
'1',
[],
Gauge,
],
[
'kafka.produce.total.time.99p',
'produce request time - 99th percentile',
'ms',
[],
Gauge,
],
[
'kafka.produce.total.time.count',
'produce request count',
'1',
[],
Sum,
],
[
'kafka.produce.total.time.median',
'produce request time - 50th percentile',
'ms',
[],
Gauge,
],
[
'kafka.request.queue',
'size of the request queue',
'1',
[],
Gauge,
],
[
'kafka.unclean.election.rate',
'unclean leader election rate - non-zero indicates broker failures',
'1',
[],
Gauge,
],
].eachWithIndex{ item, index ->
def expectedType = item[4]
Metric metric = metrics.get(index)
assert metric.name == item[0]
assert metric.description == item[1]
assert metric.unit == item[2]
def datapoints
switch(expectedType) {
case Gauge :
assert metric.hasGauge()
datapoints = metric.gauge
break
default:
assert metric.hasSum()
datapoints = metric.sum
}
def expectedLabelCount = item[3].size()
def expectedLabels = item[3] as Set
def expectedDatapointCount = expectedLabelCount == 0 ? 1 : expectedLabelCount
assert datapoints.dataPointsCount == expectedDatapointCount
(0..<expectedDatapointCount).each { i ->
def datapoint = datapoints.getDataPoints(i)
List<KeyValue> labels = datapoint.attributesList
if (expectedLabelCount != 0) {
assert labels.size() == 1
assert labels[0].key == 'name'
def value = labels[0].value.stringValue
assert expectedLabels.remove(value)
} else {
assert labels.size() == 0
}
}
assert expectedLabels.size() == 0
}
cleanup:
targetContainers.each { it.stop() }
jmxExtensionAppContainer.stop()
}
}