From d4d0e579c2dc626ff72f2403505c59efb0fe0379 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Wed, 25 Apr 2018 14:06:33 -0700 Subject: [PATCH] Introduce the `peer` label to transport metrics (#848) Previously, the proxy exposed separate _accept_ and _connect_ metrics for some metric types, but not for all. This leads to confusing aggregations, particularly for read and write taotals. This change primarily introduces the `peer` prometheus label (with possible values _src_ or _dst_) to indicate which side of the proxy the metric reflects. Additionally, the `received_bytes` and `sent_bytes` metrics have been renamed as `tcp_read_bytes_total` and `tcp_write_bytes_total`, resectively. This more naturally fits into existing idioms. Stream classification is not applied to these metrics, as we plan to increment them throughout stream lifetime and not only on close. The `tcp_connections_open` metric has also been renamed to `tcp_open_connections` to reflect Prometheus idioms. Finally, `msg1` and `msg2` have been constified in telemetry test fixtures so that tests are somewhat easier to read. --- proxy/src/telemetry/metrics/labels.rs | 15 +- proxy/src/telemetry/metrics/mod.rs | 299 +++++++++++--------------- proxy/tests/telemetry.rs | 288 +++++++++++++------------ 3 files changed, 294 insertions(+), 308 deletions(-) diff --git a/proxy/src/telemetry/metrics/labels.rs b/proxy/src/telemetry/metrics/labels.rs index 266cc9c70..c32837878 100644 --- a/proxy/src/telemetry/metrics/labels.rs +++ b/proxy/src/telemetry/metrics/labels.rs @@ -44,8 +44,13 @@ pub struct ResponseLabels { pub struct TransportLabels { /// Was the transport opened in the inbound or outbound direction? direction: Direction, + + peer: Peer, } +#[derive(Clone, Debug, Eq, PartialEq, Hash)] +pub enum Peer { Src, Dst } + /// Labels describing the end of a TCP connection #[derive(Clone, Debug, Eq, PartialEq, Hash)] pub struct TransportCloseLabels { @@ -289,13 +294,21 @@ impl TransportLabels { pub fn new(ctx: &ctx::transport::Ctx) -> Self { TransportLabels { direction: Direction::from_context(&ctx.proxy()), + peer: match *ctx { + ctx::transport::Ctx::Server(_) => Peer::Src, + ctx::transport::Ctx::Client(_) => Peer::Dst, + }, } } } impl fmt::Display for TransportLabels { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - fmt::Display::fmt(&self.direction, f) + fmt::Display::fmt(&self.direction, f)?; + f.pad(match self.peer { + Peer::Src => ",peer=\"src\"", + Peer::Dst => ",peer=\"dst\"", + }) } } diff --git a/proxy/src/telemetry/metrics/mod.rs b/proxy/src/telemetry/metrics/mod.rs index 70ebc5c7e..703df0604 100644 --- a/proxy/src/telemetry/metrics/mod.rs +++ b/proxy/src/telemetry/metrics/mod.rs @@ -68,21 +68,23 @@ struct Metrics { response_total: Metric>, response_latency: Metric>, - tcp_accept_open_total: Metric>, - tcp_accept_close_total: Metric>, - - tcp_connect_open_total: Metric>, - tcp_connect_close_total: Metric>, - - tcp_connection_duration: Metric>, - tcp_connections_open: Metric>, - - sent_bytes: Metric>, - received_bytes: Metric>, + tcp: TcpMetrics, start_time: u64, } +#[derive(Debug, Clone)] +struct TcpMetrics { + open_total: Metric>, + close_total: Metric>, + + connection_duration: Metric>, + open_connections: Metric>, + + write_bytes_total: Metric>, + read_bytes_total: Metric>, +} + #[derive(Debug, Clone)] struct Metric { name: &'static str, @@ -170,62 +172,11 @@ impl Metrics { stream has completed.", ); - let tcp_accept_open_total = Metric::>::new( - "tcp_accept_open_total", - "A counter of the total number of transport connections which \ - have been accepted by the proxy.", - ); - - let tcp_accept_close_total = Metric::>::new( - "tcp_accept_close_total", - "A counter of the total number of transport connections accepted \ - by the proxy which have been closed.", - ); - - let tcp_connect_open_total = Metric::>::new( - "tcp_connect_open_total", - "A counter of the total number of transport connections which \ - have been opened by the proxy.", - ); - - let tcp_connect_close_total = Metric::>::new( - "tcp_connect_close_total", - "A counter of the total number of transport connections opened \ - by the proxy which have been closed.", - ); - - let tcp_connection_duration = Metric::>::new( - "tcp_connection_duration_ms", - "A histogram of the duration of the lifetime of a connection, in milliseconds", - ); - - let tcp_connections_open = Metric::>::new( - "tcp_connections_open", - "A gauge of the number of transport connections currently open.", - ); - - let received_bytes = Metric::>::new( - "received_bytes", - "A counter of the total number of recieved bytes." - ); - - let sent_bytes = Metric::>::new( - "sent_bytes", - "A counter of the total number of sent bytes." - ); - Metrics { request_total, response_total, response_latency, - tcp_accept_open_total, - tcp_accept_close_total, - tcp_connect_open_total, - tcp_connect_close_total, - tcp_connection_duration, - tcp_connections_open, - received_bytes, - sent_bytes, + tcp: TcpMetrics::new(), start_time, } } @@ -254,68 +205,8 @@ impl Metrics { .or_insert_with(Counter::default) } - fn tcp_accept_open_total(&mut self, - labels: &Arc) - -> &mut Counter { - self.tcp_accept_open_total.values - .entry(labels.clone()) - .or_insert_with(Default::default) - } - - fn tcp_accept_close_total(&mut self, - labels: &Arc) - -> &mut Counter { - self.tcp_accept_close_total.values - .entry(labels.clone()) - .or_insert_with(Counter::default) - } - - fn tcp_connect_open_total(&mut self, - labels: &Arc) - -> &mut Counter { - self.tcp_connect_open_total.values - .entry(labels.clone()) - .or_insert_with(Default::default) - } - - fn tcp_connect_close_total(&mut self, - labels: &Arc) - -> &mut Counter { - self.tcp_connect_close_total.values - .entry(labels.clone()) - .or_insert_with(Counter::default) - } - - fn tcp_connection_duration(&mut self, - labels: &Arc) - -> &mut Histogram { - self.tcp_connection_duration.values - .entry(labels.clone()) - .or_insert_with(Histogram::default) - } - - fn tcp_connections_open(&mut self, - labels: &Arc) - -> &mut Gauge { - self.tcp_connections_open.values - .entry(labels.clone()) - .or_insert_with(Gauge::default) - } - - fn sent_bytes(&mut self, - labels: &Arc) - -> &mut Counter { - self.sent_bytes.values - .entry(labels.clone()) - .or_insert_with(Counter::default) - } - - fn received_bytes(&mut self, - labels: &Arc) - -> &mut Counter { - self.received_bytes.values - .entry(labels.clone()) - .or_insert_with(Counter::default) + fn tcp(&mut self) -> &mut TcpMetrics { + &mut self.tcp } } @@ -324,19 +215,106 @@ impl fmt::Display for Metrics { writeln!(f, "{}", self.request_total)?; writeln!(f, "{}", self.response_total)?; writeln!(f, "{}", self.response_latency)?; - writeln!(f, "{}", self.tcp_accept_open_total)?; - writeln!(f, "{}", self.tcp_accept_close_total)?; - writeln!(f, "{}", self.tcp_connect_open_total)?; - writeln!(f, "{}", self.tcp_connect_close_total)?; - writeln!(f, "{}", self.tcp_connection_duration)?; - writeln!(f, "{}", self.tcp_connections_open)?; - writeln!(f, "{}", self.sent_bytes)?; - writeln!(f, "{}", self.received_bytes)?; + writeln!(f, "{}", self.tcp)?; + writeln!(f, "process_start_time_seconds {}", self.start_time)?; Ok(()) } } +// ===== impl TcpMetrics ===== + +impl TcpMetrics { + pub fn new() -> TcpMetrics { + let open_total = Metric::>::new( + "tcp_open_total", + "A counter of the total number of transport connections.", + ); + + let close_total = Metric::>::new( + "tcp_close_total", + "A counter of the total number of transport connections.", + ); + + let connection_duration = Metric::>::new( + "tcp_connection_duration_ms", + "A histogram of the duration of the lifetime of connections, in milliseconds", + ); + + let open_connections = Metric::>::new( + "tcp_open_connections", + "A gauge of the number of transport connections currently open.", + ); + + let read_bytes_total = Metric::>::new( + "tcp_read_bytes_total", + "A counter of the total number of recieved bytes." + ); + + let write_bytes_total = Metric::>::new( + "tcp_write_bytes_total", + "A counter of the total number of sent bytes." + ); + + Self { + open_total, + close_total, + connection_duration, + open_connections, + read_bytes_total, + write_bytes_total, + } + } + + fn open_total(&mut self, labels: &Arc) -> &mut Counter { + self.open_total.values + .entry(labels.clone()) + .or_insert_with(Default::default) + } + + fn close_total(&mut self, labels: &Arc) -> &mut Counter { + self.close_total.values + .entry(labels.clone()) + .or_insert_with(Counter::default) + } + + fn connection_duration(&mut self, labels: &Arc) -> &mut Histogram { + self.connection_duration.values + .entry(labels.clone()) + .or_insert_with(Histogram::default) + } + + fn open_connections(&mut self, labels: &Arc) -> &mut Gauge { + self.open_connections.values + .entry(labels.clone()) + .or_insert_with(Gauge::default) + } + + fn write_bytes_total(&mut self, labels: &Arc) -> &mut Counter { + self.write_bytes_total.values + .entry(labels.clone()) + .or_insert_with(Counter::default) + } + + fn read_bytes_total(&mut self, labels: &Arc) -> &mut Counter { + self.read_bytes_total.values + .entry(labels.clone()) + .or_insert_with(Counter::default) + } +} + +impl fmt::Display for TcpMetrics { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + writeln!(f, "{}", self.open_total)?; + writeln!(f, "{}", self.close_total)?; + writeln!(f, "{}", self.connection_duration)?; + writeln!(f, "{}", self.open_connections)?; + writeln!(f, "{}", self.write_bytes_total)?; + writeln!(f, "{}", self.read_bytes_total)?; + + Ok(()) + } +} // ===== impl Counter ===== @@ -583,46 +561,31 @@ impl Aggregate { Event::TransportOpen(ref ctx) => { let labels = Arc::new(TransportLabels::new(ctx)); self.update(|metrics| { - match ctx.as_ref() { - &ctx::transport::Ctx::Server(_) => { - *metrics.tcp_accept_open_total(&labels).incr(); - *metrics.tcp_connections_open(&labels).incr(); - }, - &ctx::transport::Ctx::Client(_) => { - *metrics.tcp_connect_open_total(&labels).incr(); - }, - } + *metrics.tcp().open_total(&labels).incr(); + *metrics.tcp().open_connections(&labels).incr(); }) }, Event::TransportClose(ref ctx, ref close) => { - let labels = Arc::new(TransportCloseLabels::new(ctx, close)); + let labels = Arc::new(TransportLabels::new(ctx)); + let close_labels = Arc::new(TransportCloseLabels::new(ctx, close)); self.update(|metrics| { - *metrics.tcp_connection_duration(&labels) += close.duration; - *metrics.sent_bytes(&labels) += close.tx_bytes as u64; - *metrics.received_bytes(&labels) += close.tx_bytes as u64; - match ctx.as_ref() { - &ctx::transport::Ctx::Server(_) => { - *metrics.tcp_accept_close_total(&labels).incr(); - // We don't use the accessor method here like we do - // for all the other metrics so that we can just - // use the transport open labels in `labels` - // without having to ref-count it separately. Since - // we're handling a close event, we expect those - // labels to already be in the map from when the - // transport was opened. - *metrics.tcp_connections_open.values - .get_mut(&labels.transport) - .expect( - "observed TransportClose event for a \ - transport that was not counted" - ) - .decr(); - }, - &ctx::transport::Ctx::Client(_) => { - *metrics.tcp_connect_close_total(&labels).incr(); - }, - }; + *metrics.tcp().write_bytes_total(&labels) += close.tx_bytes as u64; + *metrics.tcp().read_bytes_total(&labels) += close.rx_bytes as u64; + + *metrics.tcp().connection_duration(&close_labels) += close.duration; + *metrics.tcp().close_total(&close_labels).incr(); + + let metrics = metrics.tcp().open_connections.values.get_mut(&labels); + debug_assert!(metrics.is_some()); + match metrics { + Some(m) => { + *m.decr(); + } + None => { + error!("Closed transport missing from metrics registry: {{{}}}", labels); + } + } }) }, }; diff --git a/proxy/tests/telemetry.rs b/proxy/tests/telemetry.rs index 7907a1b21..fa882c291 100644 --- a/proxy/tests/telemetry.rs +++ b/proxy/tests/telemetry.rs @@ -75,18 +75,18 @@ impl Fixture { } impl TcpFixture { - fn server() -> server::Listening { - let msg1 = "custom tcp hello"; - let msg2 = "custom tcp bye"; + const HELLO_MSG: &'static str = "custom tcp hello"; + const BYE_MSG: &'static str = "custom tcp bye"; + fn server() -> server::Listening { server::tcp() .accept(move |read| { - assert_eq!(read, msg1.as_bytes()); - msg2 + assert_eq!(read, Self::HELLO_MSG.as_bytes()); + TcpFixture::BYE_MSG }) .accept(move |read| { - assert_eq!(read, msg1.as_bytes()); - msg2 + assert_eq!(read, Self::HELLO_MSG.as_bytes()); + TcpFixture::BYE_MSG }) .run() } @@ -680,12 +680,12 @@ mod transport { info!("client.get(/)"); assert_eq!(client.get("/"), "hello"); assert_contains!(metrics.get("/metrics"), - "{direction=\"inbound\"} 1" + "tcp_open_total{direction=\"inbound\",peer=\"src\"} 1" ); // drop the client to force the connection to close. drop(client); assert_contains!(metrics.get("/metrics"), - "tcp_accept_close_total{direction=\"inbound\",classification=\"success\"} 1" + "tcp_close_total{direction=\"inbound\",peer=\"src\",classification=\"success\"} 1" ); // create a new client to force a new connection @@ -694,12 +694,12 @@ mod transport { info!("client.get(/)"); assert_eq!(client.get("/"), "hello"); assert_contains!(metrics.get("/metrics"), - "tcp_accept_open_total{direction=\"inbound\"} 2" + "tcp_open_total{direction=\"inbound\",peer=\"src\"} 2" ); // drop the client to force the connection to close. drop(client); assert_contains!(metrics.get("/metrics"), - "tcp_accept_close_total{direction=\"inbound\",classification=\"success\"} 2" + "tcp_close_total{direction=\"inbound\",peer=\"src\",classification=\"success\"} 2" ); } @@ -712,7 +712,7 @@ mod transport { info!("client.get(/)"); assert_eq!(client.get("/"), "hello"); assert_contains!(metrics.get("/metrics"), - "tcp_connect_open_total{direction=\"inbound\"} 1"); + "tcp_open_total{direction=\"inbound\",peer=\"dst\"} 1"); // create a new client to force a new connection let client = client::new(proxy.inbound, "tele.test.svc.cluster.local"); @@ -721,7 +721,7 @@ mod transport { assert_eq!(client.get("/"), "hello"); // server connection should be pooled assert_contains!(metrics.get("/metrics"), - "tcp_connect_open_total{direction=\"inbound\"} 1"); + "tcp_open_total{direction=\"inbound\",peer=\"dst\"} 1"); } #[test] @@ -733,12 +733,12 @@ mod transport { info!("client.get(/)"); assert_eq!(client.get("/"), "hello"); assert_contains!(metrics.get("/metrics"), - "tcp_accept_open_total{direction=\"outbound\"} 1" + "tcp_open_total{direction=\"outbound\",peer=\"src\"} 1" ); // drop the client to force the connection to close. drop(client); assert_contains!(metrics.get("/metrics"), - "tcp_accept_close_total{direction=\"outbound\",classification=\"success\"} 1" + "tcp_close_total{direction=\"outbound\",peer=\"src\",classification=\"success\"} 1" ); // create a new client to force a new connection @@ -747,12 +747,12 @@ mod transport { info!("client.get(/)"); assert_eq!(client.get("/"), "hello"); assert_contains!(metrics.get("/metrics"), - "tcp_accept_open_total{direction=\"outbound\"} 2" + "tcp_open_total{direction=\"outbound\",peer=\"src\"} 2" ); // drop the client to force the connection to close. drop(client); assert_contains!(metrics.get("/metrics"), - "tcp_accept_close_total{direction=\"outbound\",classification=\"success\"} 2" + "tcp_close_total{direction=\"outbound\",peer=\"src\",classification=\"success\"} 2" ); } @@ -765,7 +765,7 @@ mod transport { info!("client.get(/)"); assert_eq!(client.get("/"), "hello"); assert_contains!(metrics.get("/metrics"), - "tcp_connect_open_total{direction=\"outbound\"} 1"); + "tcp_open_total{direction=\"outbound\",peer=\"dst\"} 1"); // create a new client to force a new connection let client2 = client::new(proxy.outbound, "tele.test.svc.cluster.local"); @@ -774,7 +774,7 @@ mod transport { assert_eq!(client2.get("/"), "hello"); // server connection should be pooled assert_contains!(metrics.get("/metrics"), - "tcp_connect_open_total{direction=\"outbound\"} 1"); + "tcp_open_total{direction=\"outbound\",peer=\"dst\"} 1"); } #[test] @@ -784,15 +784,12 @@ mod transport { let TcpFixture { client, metrics, proxy: _proxy } = TcpFixture::inbound(); - let msg1 = "custom tcp hello"; - let msg2 = "custom tcp bye"; - let tcp_client = client.connect(); - tcp_client.write(msg1); - assert_eq!(tcp_client.read(), msg2.as_bytes()); + tcp_client.write(TcpFixture::HELLO_MSG); + assert_eq!(tcp_client.read(), TcpFixture::BYE_MSG.as_bytes()); assert_contains!(metrics.get("/metrics"), - "tcp_connect_open_total{direction=\"inbound\"} 1"); + "tcp_open_total{direction=\"inbound\",peer=\"dst\"} 1"); } #[test] @@ -802,31 +799,28 @@ mod transport { let TcpFixture { client, metrics, proxy: _proxy } = TcpFixture::inbound(); - let msg1 = "custom tcp hello"; - let msg2 = "custom tcp bye"; - let tcp_client = client.connect(); - tcp_client.write(msg1); - assert_eq!(tcp_client.read(), msg2.as_bytes()); + tcp_client.write(TcpFixture::HELLO_MSG); + assert_eq!(tcp_client.read(), TcpFixture::BYE_MSG.as_bytes()); assert_contains!(metrics.get("/metrics"), - "tcp_accept_open_total{direction=\"inbound\"} 1"); + "tcp_open_total{direction=\"inbound\",peer=\"src\"} 1"); drop(tcp_client); assert_contains!(metrics.get("/metrics"), - "tcp_accept_close_total{direction=\"inbound\",classification=\"success\"} 1"); + "tcp_close_total{direction=\"inbound\",peer=\"src\",classification=\"success\"} 1"); let tcp_client = client.connect(); - tcp_client.write(msg1); - assert_eq!(tcp_client.read(), msg2.as_bytes()); + tcp_client.write(TcpFixture::HELLO_MSG); + assert_eq!(tcp_client.read(), TcpFixture::BYE_MSG.as_bytes()); assert_contains!(metrics.get("/metrics"), - "tcp_accept_open_total{direction=\"inbound\"} 2"); + "tcp_open_total{direction=\"inbound\",peer=\"src\"} 2"); drop(tcp_client); assert_contains!(metrics.get("/metrics"), - "tcp_accept_close_total{direction=\"inbound\",classification=\"success\"} 2"); + "tcp_close_total{direction=\"inbound\",peer=\"src\",classification=\"success\"} 2"); } // https://github.com/runconduit/conduit/issues/831 @@ -837,73 +831,85 @@ mod transport { let TcpFixture { client, metrics, proxy: _proxy } = TcpFixture::inbound(); - let msg1 = "custom tcp hello"; - let msg2 = "custom tcp bye"; - let tcp_client = client.connect(); - tcp_client.write(msg1); - assert_eq!(tcp_client.read(), msg2.as_bytes()); + tcp_client.write(TcpFixture::HELLO_MSG); + assert_eq!(tcp_client.read(), TcpFixture::BYE_MSG.as_bytes()); drop(tcp_client); // TODO: make assertions about buckets - assert_contains!(metrics.get("/metrics"), - "tcp_connection_duration_ms_count{direction=\"inbound\",classification=\"success\"} 2"); + let out = metrics.get("/metrics"); + assert_contains!(out, + "tcp_connection_duration_ms_count{direction=\"inbound\",peer=\"src\",classification=\"success\"} 1"); + assert_contains!(out, + "tcp_connection_duration_ms_count{direction=\"inbound\",peer=\"dst\",classification=\"success\"} 1"); let tcp_client = client.connect(); - tcp_client.write(msg1); - assert_eq!(tcp_client.read(), msg2.as_bytes()); - assert_contains!(metrics.get("/metrics"), - "tcp_connection_duration_ms_count{direction=\"inbound\",classification=\"success\"} 2"); + tcp_client.write(TcpFixture::HELLO_MSG); + assert_eq!(tcp_client.read(), TcpFixture::BYE_MSG.as_bytes()); + let out = metrics.get("/metrics"); + assert_contains!(out, + "tcp_connection_duration_ms_count{direction=\"inbound\",peer=\"src\",classification=\"success\"} 1"); + assert_contains!(out, + "tcp_connection_duration_ms_count{direction=\"inbound\",peer=\"dst\",classification=\"success\"} 1"); drop(tcp_client); - assert_contains!(metrics.get("/metrics"), - "tcp_connection_duration_ms_count{direction=\"inbound\",classification=\"success\"} 4"); - } + let out = metrics.get("/metrics"); + assert_contains!(out, + "tcp_connection_duration_ms_count{direction=\"inbound\",peer=\"src\",classification=\"success\"} 2"); + assert_contains!(out, + "tcp_connection_duration_ms_count{direction=\"inbound\",peer=\"dst\",classification=\"success\"} 2"); } #[test] #[cfg_attr(not(feature = "flaky_tests"), ignore)] - fn inbound_tcp_sent_bytes() { + fn inbound_tcp_write_bytes_total() { let _ = env_logger::try_init(); let TcpFixture { client, metrics, proxy: _proxy } = TcpFixture::inbound(); - - let msg1 = "custom tcp hello"; - let msg2 = "custom tcp bye"; - let expected = format!( - "sent_bytes{{direction=\"inbound\",classification=\"success\"}} {}", - msg1.len() + msg2.len() + let src_expected = format!( + "tcp_write_bytes_total{{direction=\"inbound\",peer=\"src\"}} {}", + TcpFixture::BYE_MSG.len() + ); + let dst_expected = format!( + "tcp_write_bytes_total{{direction=\"inbound\",peer=\"dst\"}} {}", + TcpFixture::HELLO_MSG.len() ); let tcp_client = client.connect(); - tcp_client.write(msg1); - assert_eq!(tcp_client.read(), msg2.as_bytes()); + tcp_client.write(TcpFixture::HELLO_MSG); + assert_eq!(tcp_client.read(), TcpFixture::BYE_MSG.as_bytes()); drop(tcp_client); - assert_contains!(metrics.get("/metrics"), &expected); + + let out = metrics.get("/metrics"); + assert_contains!(out, &src_expected); + assert_contains!(out, &dst_expected); } #[test] #[cfg_attr(not(feature = "flaky_tests"), ignore)] - fn inbound_tcp_received_bytes() { + fn inbound_tcp_read_bytes_total() { let _ = env_logger::try_init(); let TcpFixture { client, metrics, proxy: _proxy } = TcpFixture::inbound(); - - let msg1 = "custom tcp hello"; - let msg2 = "custom tcp bye"; - let expected = format!( - "sent_bytes{{direction=\"inbound\",classification=\"success\"}} {}", - msg1.len() + msg2.len() + let src_expected = format!( + "tcp_read_bytes_total{{direction=\"inbound\",peer=\"src\"}} {}", + TcpFixture::HELLO_MSG.len() + ); + let dst_expected = format!( + "tcp_read_bytes_total{{direction=\"inbound\",peer=\"dst\"}} {}", + TcpFixture::BYE_MSG.len() ); let tcp_client = client.connect(); - tcp_client.write(msg1); - assert_eq!(tcp_client.read(), msg2.as_bytes()); + tcp_client.write(TcpFixture::HELLO_MSG); + assert_eq!(tcp_client.read(), TcpFixture::BYE_MSG.as_bytes()); drop(tcp_client); - assert_contains!(metrics.get("/metrics"), &expected); - } + + let out = metrics.get("/metrics"); + assert_contains!(out, &src_expected); + assert_contains!(out, &dst_expected); } #[test] #[cfg_attr(not(feature = "flaky_tests"), ignore)] @@ -912,15 +918,12 @@ mod transport { let TcpFixture { client, metrics, proxy: _proxy } = TcpFixture::outbound(); - let msg1 = "custom tcp hello"; - let msg2 = "custom tcp bye"; - let tcp_client = client.connect(); - tcp_client.write(msg1); - assert_eq!(tcp_client.read(), msg2.as_bytes()); + tcp_client.write(TcpFixture::HELLO_MSG); + assert_eq!(tcp_client.read(), TcpFixture::BYE_MSG.as_bytes()); assert_contains!(metrics.get("/metrics"), - "tcp_connect_open_total{direction=\"outbound\"} 1"); + "tcp_open_total{direction=\"outbound\",peer=\"dst\"} 1"); } #[test] @@ -930,31 +933,28 @@ mod transport { let TcpFixture { client, metrics, proxy: _proxy } = TcpFixture::outbound(); - let msg1 = "custom tcp hello"; - let msg2 = "custom tcp bye"; - let tcp_client = client.connect(); - tcp_client.write(msg1); - assert_eq!(tcp_client.read(), msg2.as_bytes()); + tcp_client.write(TcpFixture::HELLO_MSG); + assert_eq!(tcp_client.read(), TcpFixture::BYE_MSG.as_bytes()); assert_contains!(metrics.get("/metrics"), - "tcp_accept_open_total{direction=\"outbound\"} 1"); + "tcp_open_total{direction=\"outbound\",peer=\"src\"} 1"); drop(tcp_client); assert_contains!(metrics.get("/metrics"), - "tcp_accept_close_total{direction=\"outbound\",classification=\"success\"} 1"); + "tcp_close_total{direction=\"outbound\",peer=\"src\",classification=\"success\"} 1"); let tcp_client = client.connect(); - tcp_client.write(msg1); - assert_eq!(tcp_client.read(), msg2.as_bytes()); + tcp_client.write(TcpFixture::HELLO_MSG); + assert_eq!(tcp_client.read(), TcpFixture::BYE_MSG.as_bytes()); assert_contains!(metrics.get("/metrics"), - "tcp_accept_open_total{direction=\"outbound\"} 2"); + "tcp_open_total{direction=\"outbound\",peer=\"src\"} 2"); drop(tcp_client); assert_contains!(metrics.get("/metrics"), - "tcp_accept_close_total{direction=\"outbound\",classification=\"success\"} 2"); + "tcp_close_total{direction=\"outbound\",peer=\"src\",classification=\"success\"} 2"); } #[test] @@ -964,108 +964,118 @@ mod transport { let TcpFixture { client, metrics, proxy: _proxy } = TcpFixture::outbound(); - let msg1 = "custom tcp hello"; - let msg2 = "custom tcp bye"; - let tcp_client = client.connect(); - tcp_client.write(msg1); - assert_eq!(tcp_client.read(), msg2.as_bytes()); + tcp_client.write(TcpFixture::HELLO_MSG); + assert_eq!(tcp_client.read(), TcpFixture::BYE_MSG.as_bytes()); drop(tcp_client); // TODO: make assertions about buckets - assert_contains!(metrics.get("/metrics"), - "tcp_connection_duration_ms_count{direction=\"outbound\",classification=\"success\"} 2"); + let out = metrics.get("/metrics"); + assert_contains!(out, + "tcp_connection_duration_ms_count{direction=\"outbound\",peer=\"src\",classification=\"success\"} 1"); + assert_contains!(out, + "tcp_connection_duration_ms_count{direction=\"outbound\",peer=\"dst\",classification=\"success\"} 1"); let tcp_client = client.connect(); - tcp_client.write(msg1); - assert_eq!(tcp_client.read(), msg2.as_bytes()); - assert_contains!(metrics.get("/metrics"), - "tcp_connection_duration_ms_count{direction=\"outbound\",classification=\"success\"} 2"); + tcp_client.write(TcpFixture::HELLO_MSG); + assert_eq!(tcp_client.read(), TcpFixture::BYE_MSG.as_bytes()); + let out = metrics.get("/metrics"); + assert_contains!(out, + "tcp_connection_duration_ms_count{direction=\"outbound\",peer=\"src\",classification=\"success\"} 1"); + assert_contains!(out, + "tcp_connection_duration_ms_count{direction=\"outbound\",peer=\"dst\",classification=\"success\"} 1"); drop(tcp_client); - assert_contains!(metrics.get("/metrics"), - "tcp_connection_duration_ms_count{direction=\"outbound\",classification=\"success\"} 4"); + let out = metrics.get("/metrics"); + assert_contains!(out, + "tcp_connection_duration_ms_count{direction=\"outbound\",peer=\"src\",classification=\"success\"} 2"); + assert_contains!(out, + "tcp_connection_duration_ms_count{direction=\"outbound\",peer=\"dst\",classification=\"success\"} 2"); } #[test] #[cfg_attr(not(feature = "flaky_tests"), ignore)] - fn outbound_tcp_sent_bytes() { + fn outbound_tcp_write_bytes_total() { let _ = env_logger::try_init(); let TcpFixture { client, metrics, proxy: _proxy } = TcpFixture::outbound(); - - let msg1 = "custom tcp hello"; - let msg2 = "custom tcp bye"; - let expected = format!( - "sent_bytes{{direction=\"outbound\",classification=\"success\"}} {}", - msg1.len() + msg2.len() + let src_expected = format!( + "tcp_write_bytes_total{{direction=\"outbound\",peer=\"src\"}} {}", + TcpFixture::BYE_MSG.len() + ); + let dst_expected = format!( + "tcp_write_bytes_total{{direction=\"outbound\",peer=\"dst\"}} {}", + TcpFixture::HELLO_MSG.len() ); let tcp_client = client.connect(); - tcp_client.write(msg1); - assert_eq!(tcp_client.read(), msg2.as_bytes()); + tcp_client.write(TcpFixture::HELLO_MSG); + assert_eq!(tcp_client.read(), TcpFixture::BYE_MSG.as_bytes()); drop(tcp_client); - assert_contains!(metrics.get("/metrics"), &expected); - } + + let out = metrics.get("/metrics"); + assert_contains!(out, &src_expected); + assert_contains!(out, &dst_expected); } #[test] #[cfg_attr(not(feature = "flaky_tests"), ignore)] - fn outbound_tcp_received_bytes() { + fn outbound_tcp_read_bytes_total() { let _ = env_logger::try_init(); let TcpFixture { client, metrics, proxy: _proxy } = TcpFixture::outbound(); - - let msg1 = "custom tcp hello"; - let msg2 = "custom tcp bye"; - let expected = format!( - "received_bytes{{direction=\"outbound\",classification=\"success\"}} {}", - msg1.len() + msg2.len() + let src_expected = format!( + "tcp_read_bytes_total{{direction=\"outbound\",peer=\"src\"}} {}", + TcpFixture::HELLO_MSG.len() + ); + let dst_expected = format!( + "tcp_read_bytes_total{{direction=\"outbound\",peer=\"dst\"}} {}", + TcpFixture::BYE_MSG.len() ); let tcp_client = client.connect(); - tcp_client.write(msg1); - assert_eq!(tcp_client.read(), msg2.as_bytes()); + tcp_client.write(TcpFixture::HELLO_MSG); + assert_eq!(tcp_client.read(), TcpFixture::BYE_MSG.as_bytes()); drop(tcp_client); - assert_contains!(metrics.get("/metrics"), &expected); + + let out = metrics.get("/metrics"); + assert_contains!(out, &src_expected); + assert_contains!(out, &dst_expected); } #[test] #[cfg_attr(not(feature = "flaky_tests"), ignore)] - fn outbound_tcp_connections_open() { + fn outbound_tcp_open_connections() { let _ = env_logger::try_init(); let TcpFixture { client, metrics, proxy: _proxy } = TcpFixture::outbound(); - let msg1 = "custom tcp hello"; - let msg2 = "custom tcp bye"; - let tcp_client = client.connect(); - tcp_client.write(msg1); - assert_eq!(tcp_client.read(), msg2.as_bytes()); + tcp_client.write(TcpFixture::HELLO_MSG); + assert_eq!(tcp_client.read(), TcpFixture::BYE_MSG.as_bytes()); assert_contains!(metrics.get("/metrics"), - "tcp_connections_open{direction=\"outbound\"} 1"); + "tcp_open_connections{direction=\"outbound\",peer=\"src\"} 1"); drop(tcp_client); assert_contains!(metrics.get("/metrics"), - "tcp_connections_open{direction=\"outbound\"} 0"); + "tcp_open_connections{direction=\"outbound\",peer=\"src\"} 0"); let tcp_client = client.connect(); - tcp_client.write(msg1); - assert_eq!(tcp_client.read(), msg2.as_bytes()); + tcp_client.write(TcpFixture::HELLO_MSG); + assert_eq!(tcp_client.read(), TcpFixture::BYE_MSG.as_bytes()); assert_contains!(metrics.get("/metrics"), - "tcp_connections_open{direction=\"outbound\"} 1"); + "tcp_open_connections{direction=\"outbound\",peer=\"src\"} 1"); drop(tcp_client); assert_contains!(metrics.get("/metrics"), - "tcp_connections_open{direction=\"outbound\"} 0"); + "tcp_open_connections{direction=\"outbound\",peer=\"src\"} 0"); } #[test] #[cfg_attr(not(feature = "flaky_tests"), ignore)] - fn outbound_http_tcp_connections_open() { + fn outbound_http_tcp_open_connections() { let _ = env_logger::try_init(); let Fixture { client, metrics, proxy } = Fixture::outbound(); @@ -1074,10 +1084,10 @@ mod transport { assert_eq!(client.get("/"), "hello"); assert_contains!(metrics.get("/metrics"), - "tcp_connections_open{direction=\"outbound\"} 1"); + "tcp_open_connections{direction=\"outbound\",peer=\"src\"} 1"); drop(client); assert_contains!(metrics.get("/metrics"), - "tcp_connections_open{direction=\"outbound\"} 0"); + "tcp_open_connections{direction=\"outbound\",peer=\"src\"} 0"); // create a new client to force a new connection let client = client::new(proxy.outbound, "tele.test.svc.cluster.local"); @@ -1085,11 +1095,11 @@ mod transport { info!("client.get(/)"); assert_eq!(client.get("/"), "hello"); assert_contains!(metrics.get("/metrics"), - "tcp_connections_open{direction=\"outbound\"} 1"); + "tcp_open_connections{direction=\"outbound\",peer=\"src\"} 1"); drop(client); assert_contains!(metrics.get("/metrics"), - "tcp_connections_open{direction=\"outbound\"} 0"); + "tcp_open_connections{direction=\"outbound\",peer=\"src\"} 0"); } }