proxy: Expire metrics that have not been updated for 10 minutes (#880)

The proxy is now configured with the CONDUIT_PROXY_METRICS_RETAIN_IDLE
environment variable that dictates the amount of time that the proxy will retain
metrics that have not been updated.

A timestamp is maintained for each unique set of labels, indicating the last time
that the scope was updated. Then, when metrics are read, all metrics older than
CONDUIT_PROXY_METRICS_RETAIN_IDLE are dropped from the stats registry.

A ctx::test_util module has been added to aid testing.

Fixes #819
This commit is contained in:
Oliver Gould 2018-04-30 16:11:12 -07:00 committed by GitHub
parent 01aba7c711
commit 810f6bb719
10 changed files with 227 additions and 19 deletions

View File

@ -53,6 +53,9 @@ pub struct Config {
/// Event queue capacity.
pub event_buffer_capacity: usize,
/// Age after which metrics may be dropped.
pub metrics_retain_idle: Duration,
/// Timeout after which to cancel binding a request.
pub bind_timeout: Duration,
@ -128,6 +131,7 @@ pub const ENV_PRIVATE_FORWARD: &str = "CONDUIT_PROXY_PRIVATE_FORWARD";
pub const ENV_PUBLIC_LISTENER: &str = "CONDUIT_PROXY_PUBLIC_LISTENER";
pub const ENV_CONTROL_LISTENER: &str = "CONDUIT_PROXY_CONTROL_LISTENER";
pub const ENV_METRICS_LISTENER: &str = "CONDUIT_PROXY_METRICS_LISTENER";
// Name of the environment variable that sets how long idle metrics are retained.
// The `TryFrom<&Strings>` impl for `Config` parses it with `parse_number` and
// hands the result to `Duration::from_millis`.
pub const ENV_METRICS_RETAIN_IDLE: &str = "CONDUIT_PROXY_METRICS_RETAIN_IDLE";
const ENV_PRIVATE_CONNECT_TIMEOUT: &str = "CONDUIT_PROXY_PRIVATE_CONNECT_TIMEOUT";
const ENV_PUBLIC_CONNECT_TIMEOUT: &str = "CONDUIT_PROXY_PUBLIC_CONNECT_TIMEOUT";
pub const ENV_BIND_TIMEOUT: &str = "CONDUIT_PROXY_BIND_TIMEOUT";
@ -148,6 +152,7 @@ const DEFAULT_PRIVATE_LISTENER: &str = "tcp://127.0.0.1:4140";
const DEFAULT_PUBLIC_LISTENER: &str = "tcp://0.0.0.0:4143";
const DEFAULT_CONTROL_LISTENER: &str = "tcp://0.0.0.0:4190";
const DEFAULT_METRICS_LISTENER: &str = "tcp://127.0.0.1:4191";
// Ten minutes, expressed in milliseconds. The `TryFrom<&Strings>` impl for
// `Config` passes this value to `Duration::from_millis`, so a bare `10 * 60`
// would make the default retention 600ms rather than ten minutes.
const DEFAULT_METRICS_RETAIN_IDLE: u64 = 10 * 60 * 1_000;
const DEFAULT_PRIVATE_CONNECT_TIMEOUT_MS: u64 = 20;
const DEFAULT_PUBLIC_CONNECT_TIMEOUT_MS: u64 = 300;
const DEFAULT_BIND_TIMEOUT_MS: u64 = 10_000; // ten seconds, as in Linkerd.
@ -182,6 +187,7 @@ impl<'a> TryFrom<&'a Strings> for Config {
let bind_timeout = parse(strings, ENV_BIND_TIMEOUT, parse_number);
let resolv_conf_path = strings.get(ENV_RESOLV_CONF);
let event_buffer_capacity = parse(strings, ENV_EVENT_BUFFER_CAPACITY, parse_number);
let metrics_retain_idle = parse(strings, ENV_METRICS_RETAIN_IDLE, parse_number);
let pod_namespace = strings.get(ENV_POD_NAMESPACE).and_then(|maybe_value| {
// There cannot be a default pod namespace, and the pod namespace is required.
maybe_value.ok_or_else(|| {
@ -236,6 +242,10 @@ impl<'a> TryFrom<&'a Strings> for Config {
control_host_and_port: control_host_and_port?,
event_buffer_capacity: event_buffer_capacity?.unwrap_or(DEFAULT_EVENT_BUFFER_CAPACITY),
metrics_retain_idle: Duration::from_millis(
metrics_retain_idle?.unwrap_or(DEFAULT_METRICS_RETAIN_IDLE)
),
bind_timeout:
Duration::from_millis(bind_timeout?.unwrap_or(DEFAULT_BIND_TIMEOUT_MS)),
pod_namespace: pod_namespace?,

View File

@ -74,3 +74,61 @@ impl Proxy {
!self.is_inbound()
}
}
#[cfg(test)]
pub mod test_util {
    //! Constructors for the context values used by unit tests.

    use http;
    use futures_watch;
    use std::{
        fmt,
        net::SocketAddr,
        sync::Arc,
        time::SystemTime,
    };

    use ctx;
    use telemetry::metrics::DstLabels;

    /// The single fixed socket address used for every endpoint in these helpers.
    fn addr() -> SocketAddr {
        SocketAddr::from(([1, 2, 3, 4], 5678))
    }

    /// Builds a process context in the `"test"` namespace that starts now.
    pub fn process() -> Arc<ctx::Process> {
        let process = ctx::Process {
            scheduled_namespace: "test".into(),
            start_time: SystemTime::now(),
        };
        Arc::new(process)
    }

    /// Builds a server transport context for `proxy`, using `addr()` for all
    /// three address arguments.
    pub fn server(proxy: &Arc<ctx::Proxy>) -> Arc<ctx::transport::Server> {
        let local = addr();
        let remote = addr();
        let orig_dst = Some(addr());
        ctx::transport::Server::new(proxy, &local, &remote, &orig_dst)
    }

    /// Builds a client transport context for `proxy` whose destination labels
    /// are created from `labels`.
    pub fn client<L, S>(proxy: &Arc<ctx::Proxy>, labels: L) -> Arc<ctx::transport::Client>
    where
        L: IntoIterator<Item=(S, S)>,
        S: fmt::Display,
    {
        let dst_labels = DstLabels::new(labels);
        // The store half of the watch is not needed by callers; it is dropped
        // when this function returns.
        let (labels_watch, _store) = futures_watch::Watch::new(dst_labels);
        let remote = addr();
        ctx::transport::Client::new(proxy, &remote, Some(labels_watch))
    }

    /// Builds a GET request context for `uri` together with a `200 OK`
    /// response context for that request.
    pub fn request(
        uri: &str,
        server: &Arc<ctx::transport::Server>,
        client: &Arc<ctx::transport::Client>,
        id: usize
    ) -> (Arc<ctx::http::Request>, Arc<ctx::http::Response>) {
        let http_req = http::Request::get(uri).body(()).unwrap();
        let req = ctx::http::Request::new(&http_req, server, client, id);

        let http_rsp = http::Response::builder()
            .status(http::StatusCode::OK)
            .body(())
            .unwrap();
        let rsp = ctx::http::Response::new(&http_rsp, &req);

        (req, rsp)
    }
}

View File

@ -200,6 +200,7 @@ where
let (sensors, telemetry) = telemetry::new(
&process_ctx,
config.event_buffer_capacity,
config.metrics_retain_idle,
);
let dns_config = dns::Config::from_system_config()

View File

@ -1,5 +1,6 @@
use std::io;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use futures::{future, Async, Future, Poll, Stream};
use futures_mpsc_lossy::Receiver;
@ -18,6 +19,8 @@ pub struct MakeControl {
rx: Receiver<Event>,
process_ctx: Arc<ctx::Process>,
metrics_retain_idle: Duration,
}
/// Handles the receipt of events.
@ -59,10 +62,12 @@ impl MakeControl {
pub(super) fn new(
rx: Receiver<Event>,
process_ctx: &Arc<ctx::Process>,
metrics_retain_idle: Duration,
) -> Self {
Self {
rx,
process_ctx: Arc::clone(process_ctx),
metrics_retain_idle,
}
}
@ -77,7 +82,7 @@ impl MakeControl {
/// - `Err(io::Error)` if the timeout could not be created.
pub fn make_control(self, taps: &Arc<Mutex<Taps>>, handle: &Handle) -> io::Result<Control> {
let (metrics_record, metrics_service) =
metrics::new(&self.process_ctx);
metrics::new(&self.process_ctx, self.metrics_retain_idle);
Ok(Control {
metrics_record,

View File

@ -8,17 +8,18 @@ use super::{
Metric,
RequestLabels,
ResponseLabels,
Scopes
Scopes,
Stamped,
};
pub(super) type RequestScopes = Scopes<RequestLabels, RequestMetrics>;
pub(super) type RequestScopes = Scopes<RequestLabels, Stamped<RequestMetrics>>;
/// Metrics kept for one `RequestLabels` scope (stored as
/// `Stamped<RequestMetrics>` in `RequestScopes`).
#[derive(Debug, Default)]
pub(super) struct RequestMetrics {
total: Counter,
}
pub(super) type ResponseScopes = Scopes<ResponseLabels, ResponseMetrics>;
pub(super) type ResponseScopes = Scopes<ResponseLabels, Stamped<ResponseMetrics>>;
#[derive(Debug, Default)]
pub struct ResponseMetrics {

View File

@ -81,7 +81,7 @@ pub struct DstLabels {
// ===== impl RequestLabels =====
impl<'a> RequestLabels {
impl RequestLabels {
pub fn new(req: &ctx::http::Request) -> Self {
let direction = Direction::from_context(req.server.proxy.as_ref());

View File

@ -31,7 +31,7 @@ use std::fmt::{self, Display};
use std::hash::Hash;
use std::marker::PhantomData;
use std::sync::{Arc, Mutex};
use std::time::UNIX_EPOCH;
use std::time::{UNIX_EPOCH, Duration, Instant};
use indexmap::IndexMap;
@ -120,15 +120,21 @@ struct Scopes<L: Display + Hash + Eq, S> {
scopes: IndexMap<L, S>,
}
/// A value paired with the time it was last updated.
///
/// `stamp` is set to `Instant::now()` when a value is wrapped via `From` and
/// again on every call to `stamped()`. `Scopes::retain_since` compares it
/// against a cutoff to decide which scopes to keep.
#[derive(Debug)]
struct Stamped<T> {
// When `inner` was last wrapped or accessed through `stamped()`.
stamp: Instant,
// The wrapped value.
inner: T,
}
/// Construct the Prometheus metrics.
///
/// Returns the `Record` and `Serve` sides. The `Serve` side
/// is a Hyper service which can be used to create the server for the
/// scrape endpoint, while the `Record` side can receive updates to the
/// metrics by calling `record_event`.
pub fn new(process: &Arc<ctx::Process>) -> (Record, Serve){
pub fn new(process: &Arc<ctx::Process>, idle_retain: Duration) -> (Record, Serve){
let metrics = Arc::new(Mutex::new(Root::new(process)));
(Record::new(&metrics), Serve::new(&metrics))
(Record::new(&metrics), Serve::new(&metrics, idle_retain))
}
// ===== impl Metric =====
@ -184,22 +190,33 @@ impl Root {
fn request(&mut self, labels: RequestLabels) -> &mut http::RequestMetrics {
self.requests.scopes.entry(labels)
.or_insert_with(http::RequestMetrics::default)
.or_insert_with(|| http::RequestMetrics::default().into())
.stamped()
}
fn response(&mut self, labels: ResponseLabels) -> &mut http::ResponseMetrics {
self.responses.scopes.entry(labels)
.or_insert_with(http::ResponseMetrics::default)
.or_insert_with(|| http::ResponseMetrics::default().into())
.stamped()
}
fn transport(&mut self, labels: TransportLabels) -> &mut transport::OpenMetrics {
self.transports.scopes.entry(labels)
.or_insert_with(transport::OpenMetrics::default)
.or_insert_with(|| transport::OpenMetrics::default().into())
.stamped()
}
fn transport_close(&mut self, labels: TransportCloseLabels) -> &mut transport::CloseMetrics {
self.transport_closes.scopes.entry(labels)
.or_insert_with(transport::CloseMetrics::default)
.or_insert_with(|| transport::CloseMetrics::default().into())
.stamped()
}
/// Applies `retain_since(epoch)` to each of the four scope maps (requests,
/// responses, transports, transport closes), so that only scopes stamped at
/// or after `epoch` remain in the registry.
fn retain_since(&mut self, epoch: Instant) {
self.requests.retain_since(epoch);
self.responses.retain_since(epoch);
self.transports.retain_since(epoch);
self.transport_closes.retain_since(epoch);
}
}
@ -217,10 +234,117 @@ impl fmt::Display for Root {
}
}
// ===== impl Stamped =====
impl<T> Stamped<T> {
    /// Records the current time as the last-update stamp and returns mutable
    /// access to the wrapped value.
    fn stamped(&mut self) -> &mut T {
        let now = Instant::now();
        self.stamp = now;
        &mut self.inner
    }
}
impl<T> From<T> for Stamped<T> {
fn from(inner: T) -> Self {
Self {
inner,
stamp: Instant::now(),
}
}
}
impl<T> ::std::ops::Deref for Stamped<T> {
    type Target = T;

    /// Shared access to the wrapped value; the stamp is left untouched.
    fn deref(&self) -> &T {
        &self.inner
    }
}
// ===== impl Scopes =====
impl<L: Display + Hash + Eq, M> Default for Scopes<L, M> {
impl<L: Display + Hash + Eq, S> Default for Scopes<L, S> {
fn default() -> Self {
Scopes { scopes: IndexMap::default(), }
}
}
impl<L: Display + Hash + Eq, S> Scopes<L, Stamped<S>> {
fn retain_since(&mut self, epoch: Instant) {
self.scopes.retain(|_, v| v.stamp >= epoch);
}
}
#[cfg(test)]
mod tests {
use ctx::test_util::*;
use telemetry::event;
use super::*;
// Records one complete "route" against `root`: a client transport open, a
// request end, a response end, a transport close, and a transport-close
// event. The client's destination labels carry `team`, so each distinct
// `team` value stamps its own request/response scopes.
fn mock_route(
root: &mut Root,
proxy: &Arc<ctx::Proxy>,
server: &Arc<ctx::transport::Server>,
team: &str
) {
let client = client(&proxy, vec![("team", team)]);
let (req, rsp) = request("http://nba.com", &server, &client, 1);
let client_transport = Arc::new(ctx::transport::Ctx::Client(client));
let transport = TransportLabels::new(&client_transport);
root.transport(transport.clone()).open();
root.request(RequestLabels::new(&req)).end();
root.response(ResponseLabels::new(&rsp, None)).end(Duration::from_millis(10));
root.transport(transport).close(100, 200);
let end = TransportCloseLabels::new(&client_transport, &event::TransportClose {
clean: true,
duration: Duration::from_millis(15),
rx_bytes: 40,
tx_bytes: 0,
});
root.transport_close(end).close(Duration::from_millis(15));
}
// Checks that `Root::retain_since` drops exactly the scopes that were last
// stamped before the given instant.
//
// NOTE(review): the assertions after `retain_since(t1)` and
// `retain_since(t2)` assume that `Instant::now()` advances strictly between
// the recorded events and the captured `t1`/`t2`. `Scopes::retain_since`
// keeps scopes whose stamp is `>= epoch`, so a clock coarse enough to return
// equal instants would retain more scopes than asserted -- confirm this is
// acceptable on all target platforms.
#[test]
fn expiry() {
let process = process();
let proxy = ctx::Proxy::outbound(&process);
let server = server(&proxy);
let server_transport = Arc::new(ctx::transport::Ctx::Server(server.clone()));
let mut root = Root::default();
// t0: before anything has been recorded.
let t0 = Instant::now();
root.transport(TransportLabels::new(&server_transport)).open();
mock_route(&mut root, &proxy, &server, "warriors");
// t1: after the server transport and the "warriors" route were recorded.
let t1 = Instant::now();
mock_route(&mut root, &proxy, &server, "sixers");
// t2: after the "sixers" route was recorded.
let t2 = Instant::now();
// Baseline: scope counts after both routes have been recorded.
assert_eq!(root.requests.scopes.len(), 2);
assert_eq!(root.responses.scopes.len(), 2);
assert_eq!(root.transports.scopes.len(), 2);
assert_eq!(root.transport_closes.scopes.len(), 1);
// Everything was stamped at or after t0, so nothing is dropped.
root.retain_since(t0);
assert_eq!(root.requests.scopes.len(), 2);
assert_eq!(root.responses.scopes.len(), 2);
assert_eq!(root.transports.scopes.len(), 2);
assert_eq!(root.transport_closes.scopes.len(), 1);
// Only scopes stamped again by the "sixers" route (recorded after t1)
// are expected to survive.
root.retain_since(t1);
assert_eq!(root.requests.scopes.len(), 1);
assert_eq!(root.responses.scopes.len(), 1);
assert_eq!(root.transports.scopes.len(), 1);
assert_eq!(root.transport_closes.scopes.len(), 1);
// Nothing was recorded after t2, so every scope is expected to be dropped.
root.retain_since(t2);
assert_eq!(root.requests.scopes.len(), 0);
assert_eq!(root.responses.scopes.len(), 0);
assert_eq!(root.transports.scopes.len(), 0);
assert_eq!(root.transport_closes.scopes.len(), 0);
}
}

View File

@ -6,6 +6,7 @@ use hyper::header::{AcceptEncoding, ContentEncoding, ContentType, Encoding, Qual
use hyper::server::{Request, Response, Service};
use std::io::Write;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use super::Root;
@ -13,14 +14,16 @@ use super::Root;
#[derive(Debug, Clone)]
pub struct Serve {
// Metrics registry; `super::new` hands the same `Arc` to `Record::new`.
metrics: Arc<Mutex<Root>>,
// Retention period for idle scopes: before rendering, `call` invokes
// `retain_since(Instant::now() - idle_retain)` on the registry.
idle_retain: Duration,
}
// ===== impl Serve =====
impl Serve {
pub(super) fn new(metrics: &Arc<Mutex<Root>>) -> Self {
pub(super) fn new(metrics: &Arc<Mutex<Root>>, idle_retain: Duration) -> Self {
Serve {
metrics: metrics.clone(),
idle_retain,
}
}
@ -49,9 +52,12 @@ impl Service for Serve {
.with_status(StatusCode::NotFound));
}
let metrics = self.metrics.lock()
let mut metrics = self.metrics.lock()
.expect("metrics lock poisoned");
metrics.retain_since(Instant::now() - self.idle_retain);
let metrics = metrics;
let resp = if Self::is_gzip(&req) {
trace!("gzipping metrics");
let mut writer = GzEncoder::new(Vec::<u8>::new(), CompressionOptions::fast());

View File

@ -9,10 +9,11 @@ use super::{
Metric,
TransportLabels,
TransportCloseLabels,
Scopes
Scopes,
Stamped,
};
pub(super) type OpenScopes = Scopes<TransportLabels, OpenMetrics>;
pub(super) type OpenScopes = Scopes<TransportLabels, Stamped<OpenMetrics>>;
#[derive(Debug, Default)]
pub(super) struct OpenMetrics {
@ -22,7 +23,7 @@ pub(super) struct OpenMetrics {
read_bytes_total: Counter,
}
pub(super) type CloseScopes = Scopes<TransportCloseLabels, CloseMetrics>;
pub(super) type CloseScopes = Scopes<TransportCloseLabels, Stamped<CloseMetrics>>;
#[derive(Debug, Default)]
pub(super) struct CloseMetrics {

View File

@ -1,4 +1,5 @@
use std::sync::Arc;
use std::time::Duration;
use futures_mpsc_lossy;
@ -30,9 +31,10 @@ pub use self::sensor::Sensors;
pub fn new(
process: &Arc<ctx::Process>,
capacity: usize,
metrics_retain_idle: Duration,
) -> (Sensors, MakeControl) {
let (tx, rx) = futures_mpsc_lossy::channel(capacity);
let s = Sensors::new(tx);
let c = MakeControl::new(rx, process);
let c = MakeControl::new(rx, process, metrics_retain_idle);
(s, c)
}