From 5d29c270bfbd4681921fbc203210667bc9f49962 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Tue, 24 Apr 2018 10:17:48 -0700 Subject: [PATCH] proxy: Add tcp_connections_open gauge (#791) Depends on #785. This PR adds the `tcp_connections_open` gauge to the proxy's TCP metrics. It also adds some tests for that metric. --- proxy/src/telemetry/metrics/labels.rs | 4 +- proxy/src/telemetry/metrics/mod.rs | 106 ++++++++++++++++++++++++-- proxy/tests/telemetry.rs | 62 ++++++++++++++- 3 files changed, 163 insertions(+), 9 deletions(-) diff --git a/proxy/src/telemetry/metrics/labels.rs b/proxy/src/telemetry/metrics/labels.rs index 04397a81f..266cc9c70 100644 --- a/proxy/src/telemetry/metrics/labels.rs +++ b/proxy/src/telemetry/metrics/labels.rs @@ -49,8 +49,10 @@ pub struct TransportLabels { /// Labels describing the end of a TCP connection #[derive(Clone, Debug, Eq, PartialEq, Hash)] pub struct TransportCloseLabels { - transport: TransportLabels, + /// Labels describing the TCP connection that closed. + pub(super) transport: TransportLabels, + /// Was the transport closed successfully? classification: Classification, } diff --git a/proxy/src/telemetry/metrics/mod.rs b/proxy/src/telemetry/metrics/mod.rs index f961e03d3..253b2039f 100644 --- a/proxy/src/telemetry/metrics/mod.rs +++ b/proxy/src/telemetry/metrics/mod.rs @@ -72,6 +72,7 @@ struct Metrics { tcp_connect_close_total: Metric>, tcp_connection_duration: Metric>, + tcp_connections_open: Metric>, sent_bytes: Metric>, received_bytes: Metric>, @@ -108,6 +109,10 @@ struct Metric { #[derive(Copy, Debug, Default, Clone, Eq, PartialEq)] pub struct Counter(Wrapping); +/// A Prometheus gauge +#[derive(Copy, Debug, Default, Clone, Eq, PartialEq)] +pub struct Gauge(u64); + /// Tracks Prometheus metrics #[derive(Debug)] pub struct Aggregate { @@ -191,6 +196,11 @@ impl Metrics { "A histogram of the duration of the lifetime of a connection, in milliseconds", ); + let tcp_connections_open = Metric::>::new( + "tcp_connections_open", + "A gauge of the number of transport connections currently open.", + ); + let received_bytes = Metric::>::new( "received_bytes", "A counter of the total number of recieved bytes." @@ -210,6 +220,7 @@ impl Metrics { tcp_connect_open_total, tcp_connect_close_total, tcp_connection_duration, + tcp_connections_open, received_bytes, sent_bytes, start_time, @@ -280,6 +291,14 @@ impl Metrics { .or_insert_with(Histogram::default) } + fn tcp_connections_open(&mut self, + labels: &Arc) + -> &mut Gauge { + self.tcp_connections_open.values + .entry(labels.clone()) + .or_insert_with(Gauge::default) + } + fn sent_bytes(&mut self, labels: &Arc) -> &mut Counter { @@ -307,6 +326,7 @@ impl fmt::Display for Metrics { writeln!(f, "{}", self.tcp_connect_open_total)?; writeln!(f, "{}", self.tcp_connect_close_total)?; writeln!(f, "{}", self.tcp_connection_duration)?; + writeln!(f, "{}", self.tcp_connections_open)?; writeln!(f, "{}", self.sent_bytes)?; writeln!(f, "{}", self.received_bytes)?; writeln!(f, "process_start_time_seconds {}", self.start_time)?; @@ -354,6 +374,37 @@ impl Counter { } } +// ===== impl Gauge ===== + +impl fmt::Display for Gauge { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", self.0) + } +} + +impl Gauge { + /// Increment the gauge by one. + pub fn incr(&mut self) -> &mut Self { + if let Some(new_value) = self.0.checked_add(1) { + (*self).0 = new_value; + } else { + warn!("Gauge::incr() would wrap!"); + } + self + } + + /// Decrement the gauge by one. + pub fn decr(&mut self) -> &mut Self { + if let Some(new_value) = self.0.checked_sub(1) { + (*self).0 = new_value; + } else { + warn!("Gauge::decr() called on a gauge with value 0"); + } + self + } + +} + // ===== impl Metric ===== impl Metric { @@ -392,6 +443,30 @@ where } } +impl fmt::Display for Metric +where + L: fmt::Display, + L: Hash + Eq, +{ + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, + "# HELP {name} {help}\n# TYPE {name} gauge\n", + name = self.name, + help = self.help, + )?; + + for (labels, value) in &self.values { + write!(f, "{name}{{{labels}}} {value}\n", + name = self.name, + labels = labels, + value = value, + )?; + } + + Ok(()) + } +} + impl fmt::Display for Metric where L: fmt::Display, L: Hash + Eq, @@ -504,13 +579,16 @@ impl Aggregate { Event::TransportOpen(ref ctx) => { let labels = Arc::new(TransportLabels::new(ctx)); - self.update(|metrics| match ctx.as_ref() { - &ctx::transport::Ctx::Server(_) => { - *metrics.tcp_accept_open_total(&labels).incr(); - }, - &ctx::transport::Ctx::Client(_) => { - *metrics.tcp_connect_open_total(&labels).incr(); - }, + self.update(|metrics| { + match ctx.as_ref() { + &ctx::transport::Ctx::Server(_) => { + *metrics.tcp_accept_open_total(&labels).incr(); + *metrics.tcp_connections_open(&labels).incr(); + }, + &ctx::transport::Ctx::Client(_) => { + *metrics.tcp_connect_open_total(&labels).incr(); + }, + } }) }, @@ -523,6 +601,20 @@ impl Aggregate { match ctx.as_ref() { &ctx::transport::Ctx::Server(_) => { *metrics.tcp_accept_close_total(&labels).incr(); + // We don't use the accessor method here like we do + // for all the other metrics so that we can just + // use the transport open labels in `labels` + // without having to ref-count it separately. Since + // we're handling a close event, we expect those + // labels to already be in the map from when the + // transport was opened. + *metrics.tcp_connections_open.values + .get_mut(&labels.transport) + .expect( + "observed TransportClose event for a \ + transport that was not counted" + ) + .decr(); }, &ctx::transport::Ctx::Client(_) => { *metrics.tcp_connect_close_total(&labels).incr(); diff --git a/proxy/tests/telemetry.rs b/proxy/tests/telemetry.rs index 166ee68fb..ac1ebbede 100644 --- a/proxy/tests/telemetry.rs +++ b/proxy/tests/telemetry.rs @@ -771,7 +771,7 @@ mod transport { assert_eq!(client2.get("/"), "hello"); // server connection should be pooled assert_contains!(metrics.get("/metrics"), - "tcp_connect_open_total{direction=\"outbound\",classification=\"success\"} 1"); + "tcp_connect_open_total{direction=\"outbound\"} 1"); } #[test] @@ -1028,4 +1028,64 @@ mod transport { drop(tcp_client); assert_contains!(metrics.get("/metrics"), &expected); } + + #[test] + #[cfg_attr(not(feature = "flaky_tests"), ignore)] + fn outbound_tcp_connections_open() { + let _ = env_logger::try_init(); + let TcpFixture { client, metrics, proxy: _proxy } = + TcpFixture::outbound(); + + let msg1 = "custom tcp hello"; + let msg2 = "custom tcp bye"; + + let tcp_client = client.connect(); + + tcp_client.write(msg1); + assert_eq!(tcp_client.read(), msg2.as_bytes()); + assert_contains!(metrics.get("/metrics"), + "tcp_connections_open{direction=\"outbound\"} 1"); + drop(tcp_client); + assert_contains!(metrics.get("/metrics"), + "tcp_connections_open{direction=\"outbound\"} 0"); + let tcp_client = client.connect(); + + tcp_client.write(msg1); + assert_eq!(tcp_client.read(), msg2.as_bytes()); + assert_contains!(metrics.get("/metrics"), + "tcp_connections_open{direction=\"outbound\"} 1"); + + drop(tcp_client); + assert_contains!(metrics.get("/metrics"), + "tcp_connections_open{direction=\"outbound\"} 0"); + } + + #[test] + #[cfg_attr(not(feature = "flaky_tests"), ignore)] + fn outbound_http_tcp_connections_open() { + let _ = env_logger::try_init(); + let Fixture { client, metrics, proxy } = + Fixture::outbound(); + + info!("client.get(/)"); + assert_eq!(client.get("/"), "hello"); + + assert_contains!(metrics.get("/metrics"), + "tcp_connections_open{direction=\"outbound\"} 1"); + drop(client); + assert_contains!(metrics.get("/metrics"), + "tcp_connections_open{direction=\"outbound\"} 0"); + + // create a new client to force a new connection + let client = client::new(proxy.outbound, "tele.test.svc.cluster.local"); + + info!("client.get(/)"); + assert_eq!(client.get("/"), "hello"); + assert_contains!(metrics.get("/metrics"), + "tcp_connections_open{direction=\"outbound\"} 1"); + + drop(client); + assert_contains!(metrics.get("/metrics"), + "tcp_connections_open{direction=\"outbound\"} 0"); + } }