proxy: Add tcp_connections_open gauge (#791)

Depends on #785.

This PR adds the `tcp_connections_open` gauge to the proxy's TCP metrics. It also adds some tests for that metric.
This commit is contained in:
Eliza Weisman 2018-04-24 10:17:48 -07:00 committed by GitHub
parent b053b16e9d
commit 5d29c270bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 163 additions and 9 deletions

View File

@ -49,8 +49,10 @@ pub struct TransportLabels {
/// Labels describing the end of a TCP connection /// Labels describing the end of a TCP connection
#[derive(Clone, Debug, Eq, PartialEq, Hash)] #[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub struct TransportCloseLabels { pub struct TransportCloseLabels {
transport: TransportLabels, /// Labels describing the TCP connection that closed.
pub(super) transport: TransportLabels,
/// Was the transport closed successfully?
classification: Classification, classification: Classification,
} }

View File

@ -72,6 +72,7 @@ struct Metrics {
tcp_connect_close_total: Metric<Counter, Arc<TransportCloseLabels>>, tcp_connect_close_total: Metric<Counter, Arc<TransportCloseLabels>>,
tcp_connection_duration: Metric<Histogram, Arc<TransportCloseLabels>>, tcp_connection_duration: Metric<Histogram, Arc<TransportCloseLabels>>,
tcp_connections_open: Metric<Gauge, Arc<TransportLabels>>,
sent_bytes: Metric<Counter, Arc<TransportCloseLabels>>, sent_bytes: Metric<Counter, Arc<TransportCloseLabels>>,
received_bytes: Metric<Counter, Arc<TransportCloseLabels>>, received_bytes: Metric<Counter, Arc<TransportCloseLabels>>,
@ -108,6 +109,10 @@ struct Metric<M, L: Hash + Eq> {
#[derive(Copy, Debug, Default, Clone, Eq, PartialEq)] #[derive(Copy, Debug, Default, Clone, Eq, PartialEq)]
pub struct Counter(Wrapping<u64>); pub struct Counter(Wrapping<u64>);
/// A Prometheus gauge
#[derive(Copy, Debug, Default, Clone, Eq, PartialEq)]
pub struct Gauge(u64);
/// Tracks Prometheus metrics /// Tracks Prometheus metrics
#[derive(Debug)] #[derive(Debug)]
pub struct Aggregate { pub struct Aggregate {
@ -191,6 +196,11 @@ impl Metrics {
"A histogram of the duration of the lifetime of a connection, in milliseconds", "A histogram of the duration of the lifetime of a connection, in milliseconds",
); );
let tcp_connections_open = Metric::<Gauge, Arc<TransportLabels>>::new(
"tcp_connections_open",
"A gauge of the number of transport connections currently open.",
);
let received_bytes = Metric::<Counter, Arc<TransportCloseLabels>>::new( let received_bytes = Metric::<Counter, Arc<TransportCloseLabels>>::new(
"received_bytes", "received_bytes",
"A counter of the total number of recieved bytes." "A counter of the total number of recieved bytes."
@ -210,6 +220,7 @@ impl Metrics {
tcp_connect_open_total, tcp_connect_open_total,
tcp_connect_close_total, tcp_connect_close_total,
tcp_connection_duration, tcp_connection_duration,
tcp_connections_open,
received_bytes, received_bytes,
sent_bytes, sent_bytes,
start_time, start_time,
@ -280,6 +291,14 @@ impl Metrics {
.or_insert_with(Histogram::default) .or_insert_with(Histogram::default)
} }
fn tcp_connections_open(&mut self,
labels: &Arc<TransportLabels>)
-> &mut Gauge {
self.tcp_connections_open.values
.entry(labels.clone())
.or_insert_with(Gauge::default)
}
fn sent_bytes(&mut self, fn sent_bytes(&mut self,
labels: &Arc<TransportCloseLabels>) labels: &Arc<TransportCloseLabels>)
-> &mut Counter { -> &mut Counter {
@ -307,6 +326,7 @@ impl fmt::Display for Metrics {
writeln!(f, "{}", self.tcp_connect_open_total)?; writeln!(f, "{}", self.tcp_connect_open_total)?;
writeln!(f, "{}", self.tcp_connect_close_total)?; writeln!(f, "{}", self.tcp_connect_close_total)?;
writeln!(f, "{}", self.tcp_connection_duration)?; writeln!(f, "{}", self.tcp_connection_duration)?;
writeln!(f, "{}", self.tcp_connections_open)?;
writeln!(f, "{}", self.sent_bytes)?; writeln!(f, "{}", self.sent_bytes)?;
writeln!(f, "{}", self.received_bytes)?; writeln!(f, "{}", self.received_bytes)?;
writeln!(f, "process_start_time_seconds {}", self.start_time)?; writeln!(f, "process_start_time_seconds {}", self.start_time)?;
@ -354,6 +374,37 @@ impl Counter {
} }
} }
// ===== impl Gauge =====
impl fmt::Display for Gauge {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.0)
}
}
impl Gauge {
/// Increment the gauge by one.
pub fn incr(&mut self) -> &mut Self {
if let Some(new_value) = self.0.checked_add(1) {
(*self).0 = new_value;
} else {
warn!("Gauge::incr() would wrap!");
}
self
}
/// Decrement the gauge by one.
pub fn decr(&mut self) -> &mut Self {
if let Some(new_value) = self.0.checked_sub(1) {
(*self).0 = new_value;
} else {
warn!("Gauge::decr() called on a gauge with value 0");
}
self
}
}
// ===== impl Metric ===== // ===== impl Metric =====
impl<M, L: Hash + Eq> Metric<M, L> { impl<M, L: Hash + Eq> Metric<M, L> {
@ -392,6 +443,30 @@ where
} }
} }
impl<L> fmt::Display for Metric<Gauge, L>
where
L: fmt::Display,
L: Hash + Eq,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f,
"# HELP {name} {help}\n# TYPE {name} gauge\n",
name = self.name,
help = self.help,
)?;
for (labels, value) in &self.values {
write!(f, "{name}{{{labels}}} {value}\n",
name = self.name,
labels = labels,
value = value,
)?;
}
Ok(())
}
}
impl<L> fmt::Display for Metric<Histogram, L> where impl<L> fmt::Display for Metric<Histogram, L> where
L: fmt::Display, L: fmt::Display,
L: Hash + Eq, L: Hash + Eq,
@ -504,13 +579,16 @@ impl Aggregate {
Event::TransportOpen(ref ctx) => { Event::TransportOpen(ref ctx) => {
let labels = Arc::new(TransportLabels::new(ctx)); let labels = Arc::new(TransportLabels::new(ctx));
self.update(|metrics| match ctx.as_ref() { self.update(|metrics| {
&ctx::transport::Ctx::Server(_) => { match ctx.as_ref() {
*metrics.tcp_accept_open_total(&labels).incr(); &ctx::transport::Ctx::Server(_) => {
}, *metrics.tcp_accept_open_total(&labels).incr();
&ctx::transport::Ctx::Client(_) => { *metrics.tcp_connections_open(&labels).incr();
*metrics.tcp_connect_open_total(&labels).incr(); },
}, &ctx::transport::Ctx::Client(_) => {
*metrics.tcp_connect_open_total(&labels).incr();
},
}
}) })
}, },
@ -523,6 +601,20 @@ impl Aggregate {
match ctx.as_ref() { match ctx.as_ref() {
&ctx::transport::Ctx::Server(_) => { &ctx::transport::Ctx::Server(_) => {
*metrics.tcp_accept_close_total(&labels).incr(); *metrics.tcp_accept_close_total(&labels).incr();
// We don't use the accessor method here like we do
// for all the other metrics so that we can just
// use the transport open labels in `labels`
// without having to ref-count it separately. Since
// we're handling a close event, we expect those
// labels to already be in the map from when the
// transport was opened.
*metrics.tcp_connections_open.values
.get_mut(&labels.transport)
.expect(
"observed TransportClose event for a \
transport that was not counted"
)
.decr();
}, },
&ctx::transport::Ctx::Client(_) => { &ctx::transport::Ctx::Client(_) => {
*metrics.tcp_connect_close_total(&labels).incr(); *metrics.tcp_connect_close_total(&labels).incr();

View File

@ -771,7 +771,7 @@ mod transport {
assert_eq!(client2.get("/"), "hello"); assert_eq!(client2.get("/"), "hello");
// server connection should be pooled // server connection should be pooled
assert_contains!(metrics.get("/metrics"), assert_contains!(metrics.get("/metrics"),
"tcp_connect_open_total{direction=\"outbound\",classification=\"success\"} 1"); "tcp_connect_open_total{direction=\"outbound\"} 1");
} }
#[test] #[test]
@ -1028,4 +1028,64 @@ mod transport {
drop(tcp_client); drop(tcp_client);
assert_contains!(metrics.get("/metrics"), &expected); assert_contains!(metrics.get("/metrics"), &expected);
} }
#[test]
#[cfg_attr(not(feature = "flaky_tests"), ignore)]
fn outbound_tcp_connections_open() {
let _ = env_logger::try_init();
let TcpFixture { client, metrics, proxy: _proxy } =
TcpFixture::outbound();
let msg1 = "custom tcp hello";
let msg2 = "custom tcp bye";
let tcp_client = client.connect();
tcp_client.write(msg1);
assert_eq!(tcp_client.read(), msg2.as_bytes());
assert_contains!(metrics.get("/metrics"),
"tcp_connections_open{direction=\"outbound\"} 1");
drop(tcp_client);
assert_contains!(metrics.get("/metrics"),
"tcp_connections_open{direction=\"outbound\"} 0");
let tcp_client = client.connect();
tcp_client.write(msg1);
assert_eq!(tcp_client.read(), msg2.as_bytes());
assert_contains!(metrics.get("/metrics"),
"tcp_connections_open{direction=\"outbound\"} 1");
drop(tcp_client);
assert_contains!(metrics.get("/metrics"),
"tcp_connections_open{direction=\"outbound\"} 0");
}
#[test]
#[cfg_attr(not(feature = "flaky_tests"), ignore)]
fn outbound_http_tcp_connections_open() {
let _ = env_logger::try_init();
let Fixture { client, metrics, proxy } =
Fixture::outbound();
info!("client.get(/)");
assert_eq!(client.get("/"), "hello");
assert_contains!(metrics.get("/metrics"),
"tcp_connections_open{direction=\"outbound\"} 1");
drop(client);
assert_contains!(metrics.get("/metrics"),
"tcp_connections_open{direction=\"outbound\"} 0");
// create a new client to force a new connection
let client = client::new(proxy.outbound, "tele.test.svc.cluster.local");
info!("client.get(/)");
assert_eq!(client.get("/"), "hello");
assert_contains!(metrics.get("/metrics"),
"tcp_connections_open{direction=\"outbound\"} 1");
drop(client);
assert_contains!(metrics.get("/metrics"),
"tcp_connections_open{direction=\"outbound\"} 0");
}
} }