From 33d2e40a9e511f927924ecaf6edde2ed743edd96 Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Fri, 26 Aug 2022 01:22:38 +0300 Subject: [PATCH] Fix flaky kafka metrics test (#6511) --- .../OpenTelemetryMetricsReporterTest.java | 30 +------------------ .../testing/LibraryTestRunner.java | 18 ++++++----- 2 files changed, 12 insertions(+), 36 deletions(-) diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafka/internal/OpenTelemetryMetricsReporterTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafka/internal/OpenTelemetryMetricsReporterTest.java index ddf5bf6f3b..304eb58295 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafka/internal/OpenTelemetryMetricsReporterTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafka/internal/OpenTelemetryMetricsReporterTest.java @@ -273,18 +273,8 @@ class OpenTelemetryMetricsReporterTest { "kafka.consumer.successful_reauthentication_total", "kafka.consumer.time_between_poll_avg", "kafka.consumer.time_between_poll_max", - "kafka.consumer.incoming_byte_rate", - "kafka.consumer.incoming_byte_total", - "kafka.consumer.outgoing_byte_rate", - "kafka.consumer.outgoing_byte_total", "kafka.consumer.request_latency_avg", "kafka.consumer.request_latency_max", - "kafka.consumer.request_rate", - "kafka.consumer.request_size_avg", - "kafka.consumer.request_size_max", - "kafka.consumer.request_total", - "kafka.consumer.response_rate", - "kafka.consumer.response_total", "kafka.producer.batch_size_avg", "kafka.producer.batch_size_max", "kafka.producer.batch_split_rate", @@ -350,27 +340,9 @@ class OpenTelemetryMetricsReporterTest { "kafka.producer.successful_reauthentication_rate", "kafka.producer.successful_reauthentication_total", "kafka.producer.waiting_threads", - "kafka.producer.incoming_byte_rate", - "kafka.producer.incoming_byte_total", - "kafka.producer.outgoing_byte_rate", - "kafka.producer.outgoing_byte_total", - "kafka.producer.request_latency_avg", - "kafka.producer.request_latency_max", - "kafka.producer.request_rate", - "kafka.producer.request_size_avg", - "kafka.producer.request_size_max", - "kafka.producer.request_total", - "kafka.producer.response_rate", - "kafka.producer.response_total", "kafka.producer.byte_rate", "kafka.producer.byte_total", - "kafka.producer.compression_rate", - "kafka.producer.record_error_rate", - "kafka.producer.record_error_total", - "kafka.producer.record_retry_rate", - "kafka.producer.record_retry_total", - "kafka.producer.record_send_rate", - "kafka.producer.record_send_total")); + "kafka.producer.compression_rate")); List metrics = testing.metrics(); Set metricNames = metrics.stream().map(MetricData::getName).collect(toSet()); diff --git a/testing-common/src/main/java/io/opentelemetry/instrumentation/testing/LibraryTestRunner.java b/testing-common/src/main/java/io/opentelemetry/instrumentation/testing/LibraryTestRunner.java index 940c2acf89..7b88a12348 100644 --- a/testing-common/src/main/java/io/opentelemetry/instrumentation/testing/LibraryTestRunner.java +++ b/testing-common/src/main/java/io/opentelemetry/instrumentation/testing/LibraryTestRunner.java @@ -17,6 +17,7 @@ import io.opentelemetry.sdk.logs.data.LogData; import io.opentelemetry.sdk.metrics.SdkMeterProvider; import io.opentelemetry.sdk.metrics.data.AggregationTemporality; import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.export.MetricReader; import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader; import io.opentelemetry.sdk.testing.exporter.InMemoryMetricExporter; import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter; @@ -40,6 +41,7 @@ public final class LibraryTestRunner extends InstrumentationTestRunner { private static final OpenTelemetrySdk openTelemetry; private static final InMemorySpanExporter testSpanExporter; private static final InMemoryMetricExporter testMetricExporter; + private static final MetricReader metricReader; private static boolean forceFlushCalled; static { @@ -48,6 +50,13 @@ public final class LibraryTestRunner extends InstrumentationTestRunner { testSpanExporter = InMemorySpanExporter.create(); testMetricExporter = InMemoryMetricExporter.create(AggregationTemporality.DELTA); + metricReader = + PeriodicMetricReader.builder(testMetricExporter) + // Set really long interval. We'll call forceFlush when we need the metrics + // instead of collecting them periodically. + .setInterval(Duration.ofNanos(Long.MAX_VALUE)) + .build(); + openTelemetry = OpenTelemetrySdk.builder() .setTracerProvider( @@ -56,13 +65,7 @@ public final class LibraryTestRunner extends InstrumentationTestRunner { .addSpanProcessor(SimpleSpanProcessor.create(LoggingSpanExporter.create())) .addSpanProcessor(SimpleSpanProcessor.create(testSpanExporter)) .build()) - .setMeterProvider( - SdkMeterProvider.builder() - .registerMetricReader( - PeriodicMetricReader.builder(testMetricExporter) - .setInterval(Duration.ofMillis(100)) - .build()) - .build()) + .setMeterProvider(SdkMeterProvider.builder().registerMetricReader(metricReader).build()) .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance())) .buildAndRegisterGlobal(); } @@ -114,6 +117,7 @@ public final class LibraryTestRunner extends InstrumentationTestRunner { @Override public List getExportedMetrics() { + metricReader.forceFlush().join(10, TimeUnit.SECONDS); return testMetricExporter.getFinishedMetricItems(); }