Add `assert_eventually!` macro to help de-flake telemetry tests (#669)

Closes #615.

Based on @olix0r's suggestion in https://github.com/runconduit/conduit/issues/613#issuecomment-376024744, this PR adds an `assert_eventually!` macro to retry an assertion a set number of times, waiting for 15 ms between retries. This is loosely based on ScalaTest's [eventually](http://doc.scalatest.org/1.8/org/scalatest/concurrent/Eventually.html).

I've rewritten the flaky telemetry tests to use the `assert_eventually!` macro, to compensate for delays in the served metrics being updated between client requests and metrics scrapes.
This commit is contained in:
Eliza Weisman 2018-04-05 11:23:34 -07:00 committed by GitHub
parent 6b370b4466
commit 49bf01b0da
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 104 additions and 56 deletions

View File

@ -30,6 +30,71 @@ use self::tower_h2::{Body, RecvBody};
use std::net::SocketAddr;
pub use std::time::Duration;
/// Environment variable for overriding the test patience.
pub const ENV_TEST_PATIENCE_MS: &'static str = "RUST_TEST_PATIENCE_MS";
pub const DEFAULT_TEST_PATIENCE: Duration = Duration::from_millis(15);
/// Retry an assertion up to a specified number of times, waiting
/// `RUST_TEST_PATIENCE_MS` between retries.
///
/// If the assertion is successful after a retry, execution will continue
/// normally. If all retries are exhausted and the assertion still fails,
/// `assert_eventually!` will panic as though a regular `assert!` had failed.
/// Note that other panics elsewhere in the code under test will not be
/// prevented.
///
/// This should be used sparingly, but is often useful in end-to-end testing
/// where a desired state may not be reached immediately. For example, when
/// some state updates asynchronously and there's no obvious way for the test
/// to wait for an update to occur before making assertions.
///
/// The `RUST_TEST_PATIENCE_MS` environment variable may be used to customize
/// the backoff duration between retries. This may be useful for purposes such
/// compensating for decreased performance on CI.
#[macro_export]
macro_rules! assert_eventually {
($cond:expr, retries: $retries:expr, $($arg:tt)+) => {
{
use std::{env, u64};
use std::time::{Instant, Duration};
use std::str::FromStr;
// TODO: don't do this *every* time eventually is called (lazy_static?)
let patience = env::var($crate::support::ENV_TEST_PATIENCE_MS).ok()
.map(|s| {
let millis = u64::from_str(&s)
.expect(
"Could not parse RUST_TEST_PATIENCE_MS environment \
variable."
);
Duration::from_millis(millis)
})
.unwrap_or($crate::support::DEFAULT_TEST_PATIENCE);
let start_t = Instant::now();
for i in 0..($retries + 1) {
if $cond {
break;
} else if i == $retries {
panic!(
"assertion failed after {:?} (retried {} times): {}",
start_t.elapsed(), i, format_args!($($arg)+)
)
} else {
::std::thread::sleep(patience);
}
}
}
};
($cond:expr, $($arg:tt)+) => {
assert_eventually!($cond, retries: 5, $($arg)+)
};
($cond:expr, retries: $retries:expr) => {
assert_eventually!($cond, retries: $retries, stringify!($cond))
};
($cond:expr) => {
assert_eventually!($cond, retries: 5, stringify!($cond))
};
}
pub mod client;
pub mod controller;
pub mod proxy;
@ -64,3 +129,9 @@ impl Stream for RecvBodyStream {
pub fn s(bytes: &[u8]) -> &str {
::std::str::from_utf8(bytes.as_ref()).unwrap()
}
#[test]
#[should_panic]
fn assert_eventually() {
assert_eventually!(false)
}

View File

@ -1,6 +1,7 @@
#[macro_use]
extern crate log;
#[macro_use]
mod support;
use self::support::*;
@ -262,13 +263,11 @@ fn telemetry_report_errors_are_ignored() {}
macro_rules! assert_contains {
($scrape:expr, $contains:expr) => {
assert!($scrape.contains($contains), "metrics scrape:\n{:8}\ndid not contain:\n{:8}", $scrape, $contains)
assert_eventually!($scrape.contains($contains), "metrics scrape:\n{:8}\ndid not contain:\n{:8}", $scrape, $contains)
}
}
// https://github.com/runconduit/conduit/issues/613
#[test]
#[cfg_attr(not(feature = "flaky_tests"), ignore)]
fn metrics_endpoint_inbound_request_count() {
let _ = env_logger::try_init();
@ -291,15 +290,12 @@ fn metrics_endpoint_inbound_request_count() {
info!("client.get(/hey)");
assert_eq!(client.get("/hey"), "hello");
let scrape = metrics.get("/metrics");
// after seeing a request, the request count should be 1.
assert_contains!(scrape, "request_total{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\"} 1");
assert_contains!(metrics.get("/metrics"), "request_total{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\"} 1");
}
// https://github.com/runconduit/conduit/issues/613
#[test]
#[cfg_attr(not(feature = "flaky_tests"), ignore)]
fn metrics_endpoint_outbound_request_count() {
let _ = env_logger::try_init();
@ -324,9 +320,8 @@ fn metrics_endpoint_outbound_request_count() {
info!("client.get(/hey)");
assert_eq!(client.get("/hey"), "hello");
let scrape = metrics.get("/metrics");
// after seeing a request, the request count should be 1.
assert_contains!(scrape, "request_total{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\"} 1");
assert_contains!(metrics.get("/metrics"), "request_total{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\"} 1");
}
@ -385,9 +380,7 @@ mod response_classification {
})
}
// https://github.com/runconduit/conduit/issues/613
#[test]
#[cfg_attr(not(feature = "flaky_tests"), ignore)]
fn inbound_http() {
let _ = env_logger::try_init();
let srv = make_test_server().run();
@ -408,18 +401,15 @@ mod response_classification {
);
assert_eq!(&request.status(), status);
let scrape = metrics.get("/metrics");
for status in &STATUSES[0..i] {
// assert that the current status code is incremented, *and* that
// all previous requests are *not* incremented.
assert_contains!(scrape, &expected_metric(status, "inbound"))
assert_contains!(metrics.get("/metrics"), &expected_metric(status, "inbound"))
}
}
}
// https://github.com/runconduit/conduit/issues/613
#[test]
#[cfg_attr(not(feature = "flaky_tests"), ignore)]
fn outbound_http() {
let _ = env_logger::try_init();
let srv = make_test_server().run();
@ -442,11 +432,10 @@ mod response_classification {
);
assert_eq!(&request.status(), status);
let scrape = metrics.get("/metrics");
for status in &STATUSES[0..i] {
// assert that the current status code is incremented, *and* that
// all previous requests are *not* incremented.
assert_contains!(scrape, &expected_metric(status, "outbound"))
assert_contains!(metrics.get("/metrics"), &expected_metric(status, "outbound"))
}
}
}
@ -479,13 +468,12 @@ fn metrics_endpoint_inbound_response_latency() {
info!("client.get(/hey)");
assert_eq!(client.get("/hey"), "hello");
let scrape = metrics.get("/metrics");
// assert the >=1000ms bucket is incremented by our request with 500ms
// extra latency.
assert_contains!(scrape,
assert_contains!(metrics.get("/metrics"),
"response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",classification=\"success\",status_code=\"200\",le=\"1000\"} 1");
// the histogram's count should be 1.
assert_contains!(scrape,
assert_contains!(metrics.get("/metrics"),
"response_latency_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",classification=\"success\",status_code=\"200\"} 1");
// TODO: we're not going to make any assertions about the
// response_latency_ms_sum stat, since its granularity depends on the actual
@ -496,46 +484,44 @@ fn metrics_endpoint_inbound_response_latency() {
info!("client.get(/hi)");
assert_eq!(client.get("/hi"), "good morning");
let scrape = metrics.get("/metrics");
// request with 40ms extra latency should fall into the 50ms bucket.
assert_contains!(scrape,
assert_contains!(metrics.get("/metrics"),
"response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",classification=\"success\",status_code=\"200\",le=\"50\"} 1");
// 1000ms bucket should be incremented as well, since it counts *all*
// bservations less than or equal to 1000ms, even if they also increment
// other buckets.
assert_contains!(scrape,
assert_contains!(metrics.get("/metrics"),
"response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",classification=\"success\",status_code=\"200\",le=\"1000\"} 2");
// the histogram's total count should be 2.
assert_contains!(scrape,
assert_contains!(metrics.get("/metrics"),
"response_latency_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",classification=\"success\",status_code=\"200\"} 2");
info!("client.get(/hi)");
assert_eq!(client.get("/hi"), "good morning");
let scrape = metrics.get("/metrics");
// request with 40ms extra latency should fall into the 50ms bucket.
assert_contains!(scrape,
assert_contains!(metrics.get("/metrics"),
"response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",classification=\"success\",status_code=\"200\",le=\"50\"} 2");
// 1000ms bucket should be incremented as well.
assert_contains!(scrape,
assert_contains!(metrics.get("/metrics"),
"response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",classification=\"success\",status_code=\"200\",le=\"1000\"} 3");
// the histogram's total count should be 3.
assert_contains!(scrape,
assert_contains!(metrics.get("/metrics"),
"response_latency_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",classification=\"success\",status_code=\"200\"} 3");
info!("client.get(/hey)");
assert_eq!(client.get("/hey"), "hello");
let scrape = metrics.get("/metrics");
// 50ms bucket should be un-changed by the request with 500ms latency.
assert_contains!(scrape,
assert_contains!(metrics.get("/metrics"),
"response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",classification=\"success\",status_code=\"200\",le=\"50\"} 2");
// 1000ms bucket should be incremented.
assert_contains!(scrape,
assert_contains!(metrics.get("/metrics"),
"response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",classification=\"success\",status_code=\"200\",le=\"1000\"} 4");
// the histogram's total count should be 4.
assert_contains!(scrape,
assert_contains!(metrics.get("/metrics"),
"response_latency_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",classification=\"success\",status_code=\"200\"} 4");
}
@ -568,13 +554,12 @@ fn metrics_endpoint_outbound_response_latency() {
info!("client.get(/hey)");
assert_eq!(client.get("/hey"), "hello");
let scrape = metrics.get("/metrics");
// assert the >=1000ms bucket is incremented by our request with 500ms
// extra latency.
assert_contains!(scrape,
assert_contains!(metrics.get("/metrics"),
"response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",classification=\"success\",status_code=\"200\",le=\"1000\"} 1");
// the histogram's count should be 1.
assert_contains!(scrape,
assert_contains!(metrics.get("/metrics"),
"response_latency_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",classification=\"success\",status_code=\"200\"} 1");
// TODO: we're not going to make any assertions about the
// response_latency_ms_sum stat, since its granularity depends on the actual
@ -585,46 +570,42 @@ fn metrics_endpoint_outbound_response_latency() {
info!("client.get(/hi)");
assert_eq!(client.get("/hi"), "good morning");
let scrape = metrics.get("/metrics");
// request with 40ms extra latency should fall into the 50ms bucket.
assert_contains!(scrape,
assert_contains!(metrics.get("/metrics"),
"response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",classification=\"success\",status_code=\"200\",le=\"50\"} 1");
// 1000ms bucket should be incremented as well, since it counts *all*
// bservations less than or equal to 1000ms, even if they also increment
// other buckets.
assert_contains!(scrape,
assert_contains!(metrics.get("/metrics"),
"response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",classification=\"success\",status_code=\"200\",le=\"1000\"} 2");
// the histogram's total count should be 2.
assert_contains!(scrape,
assert_contains!(metrics.get("/metrics"),
"response_latency_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",classification=\"success\",status_code=\"200\"} 2");
info!("client.get(/hi)");
assert_eq!(client.get("/hi"), "good morning");
let scrape = metrics.get("/metrics");
// request with 40ms extra latency should fall into the 50ms bucket.
assert_contains!(scrape,
assert_contains!(metrics.get("/metrics"),
"response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",classification=\"success\",status_code=\"200\",le=\"50\"} 2");
// 1000ms bucket should be incremented as well.
assert_contains!(scrape,
assert_contains!(metrics.get("/metrics"),
"response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",classification=\"success\",status_code=\"200\",le=\"1000\"} 3");
// the histogram's total count should be 3.
assert_contains!(scrape,
assert_contains!(metrics.get("/metrics"),
"response_latency_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",classification=\"success\",status_code=\"200\"} 3");
info!("client.get(/hey)");
assert_eq!(client.get("/hey"), "hello");
let scrape = metrics.get("/metrics");
// 50ms bucket should be un-changed by the request with 500ms latency.
assert_contains!(scrape,
assert_contains!(metrics.get("/metrics"),
"response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",classification=\"success\",status_code=\"200\",le=\"50\"} 2");
// 1000ms bucket should be incremented.
assert_contains!(scrape,
assert_contains!(metrics.get("/metrics"),
"response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",classification=\"success\",status_code=\"200\",le=\"1000\"} 4");
// the histogram's total count should be 4.
assert_contains!(scrape,
assert_contains!(metrics.get("/metrics"),
"response_latency_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",classification=\"success\",status_code=\"200\"} 4");
}
@ -652,16 +633,14 @@ fn metrics_endpoint_inbound_request_duration() {
info!("client.get(/hey)");
assert_eq!(client.get("/hey"), "hello");
let scrape = metrics.get("/metrics");
assert_contains!(scrape,
assert_contains!(metrics.get("/metrics"),
"request_duration_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\"} 1");
// request without body should also increment request_duration
info!("client.get(/hey)");
assert_eq!(client.get("/hey"), "hello");
let scrape = metrics.get("/metrics");
assert_contains!(scrape,
assert_contains!(metrics.get("/metrics"),
"request_duration_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\"} 2");
}
@ -689,15 +668,13 @@ fn metrics_endpoint_outbound_request_duration() {
info!("client.get(/hey)");
assert_eq!(client.get("/hey"), "hello");
let scrape = metrics.get("/metrics");
assert_contains!(scrape,
assert_contains!(metrics.get("/metrics"),
"request_duration_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\"} 1");
info!("client.get(/hey)");
assert_eq!(client.get("/hey"), "hello");
let scrape = metrics.get("/metrics");
assert_contains!(scrape,
assert_contains!(metrics.get("/metrics"),
"request_duration_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\"} 2");
}