Fix flaky kafka metrics test (#6511)

This commit is contained in:
Lauri Tulmin 2022-08-26 01:22:38 +03:00 committed by GitHub
parent dd752816b7
commit 33d2e40a9e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 12 additions and 36 deletions

View File

@ -273,18 +273,8 @@ class OpenTelemetryMetricsReporterTest {
"kafka.consumer.successful_reauthentication_total",
"kafka.consumer.time_between_poll_avg",
"kafka.consumer.time_between_poll_max",
"kafka.consumer.incoming_byte_rate",
"kafka.consumer.incoming_byte_total",
"kafka.consumer.outgoing_byte_rate",
"kafka.consumer.outgoing_byte_total",
"kafka.consumer.request_latency_avg",
"kafka.consumer.request_latency_max",
"kafka.consumer.request_rate",
"kafka.consumer.request_size_avg",
"kafka.consumer.request_size_max",
"kafka.consumer.request_total",
"kafka.consumer.response_rate",
"kafka.consumer.response_total",
"kafka.producer.batch_size_avg",
"kafka.producer.batch_size_max",
"kafka.producer.batch_split_rate",
@ -350,27 +340,9 @@ class OpenTelemetryMetricsReporterTest {
"kafka.producer.successful_reauthentication_rate",
"kafka.producer.successful_reauthentication_total",
"kafka.producer.waiting_threads",
"kafka.producer.incoming_byte_rate",
"kafka.producer.incoming_byte_total",
"kafka.producer.outgoing_byte_rate",
"kafka.producer.outgoing_byte_total",
"kafka.producer.request_latency_avg",
"kafka.producer.request_latency_max",
"kafka.producer.request_rate",
"kafka.producer.request_size_avg",
"kafka.producer.request_size_max",
"kafka.producer.request_total",
"kafka.producer.response_rate",
"kafka.producer.response_total",
"kafka.producer.byte_rate",
"kafka.producer.byte_total",
"kafka.producer.compression_rate",
"kafka.producer.record_error_rate",
"kafka.producer.record_error_total",
"kafka.producer.record_retry_rate",
"kafka.producer.record_retry_total",
"kafka.producer.record_send_rate",
"kafka.producer.record_send_total"));
"kafka.producer.compression_rate"));
List<MetricData> metrics = testing.metrics();
Set<String> metricNames = metrics.stream().map(MetricData::getName).collect(toSet());

View File

@ -17,6 +17,7 @@ import io.opentelemetry.sdk.logs.data.LogData;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricReader;
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
import io.opentelemetry.sdk.testing.exporter.InMemoryMetricExporter;
import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter;
@ -40,6 +41,7 @@ public final class LibraryTestRunner extends InstrumentationTestRunner {
private static final OpenTelemetrySdk openTelemetry;
private static final InMemorySpanExporter testSpanExporter;
private static final InMemoryMetricExporter testMetricExporter;
private static final MetricReader metricReader;
private static boolean forceFlushCalled;
static {
@ -48,6 +50,13 @@ public final class LibraryTestRunner extends InstrumentationTestRunner {
testSpanExporter = InMemorySpanExporter.create();
testMetricExporter = InMemoryMetricExporter.create(AggregationTemporality.DELTA);
metricReader =
PeriodicMetricReader.builder(testMetricExporter)
// Set really long interval. We'll call forceFlush when we need the metrics
// instead of collecting them periodically.
.setInterval(Duration.ofNanos(Long.MAX_VALUE))
.build();
openTelemetry =
OpenTelemetrySdk.builder()
.setTracerProvider(
@ -56,13 +65,7 @@ public final class LibraryTestRunner extends InstrumentationTestRunner {
.addSpanProcessor(SimpleSpanProcessor.create(LoggingSpanExporter.create()))
.addSpanProcessor(SimpleSpanProcessor.create(testSpanExporter))
.build())
.setMeterProvider(
SdkMeterProvider.builder()
.registerMetricReader(
PeriodicMetricReader.builder(testMetricExporter)
.setInterval(Duration.ofMillis(100))
.build())
.build())
.setMeterProvider(SdkMeterProvider.builder().registerMetricReader(metricReader).build())
.setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance()))
.buildAndRegisterGlobal();
}
@ -114,6 +117,7 @@ public final class LibraryTestRunner extends InstrumentationTestRunner {
@Override
public List<MetricData> getExportedMetrics() {
metricReader.forceFlush().join(10, TimeUnit.SECONDS);
return testMetricExporter.getFinishedMetricItems();
}