Introduce the `peer` label to transport metrics (#848)

Previously, the proxy exposed separate _accept_ and _connect_ metrics
for some metric types, but not for all. This leads to confusing
aggregations, particularly for read and write taotals.

This change primarily introduces the `peer` prometheus label (with
possible values _src_ or _dst_) to indicate which side of the proxy the
metric reflects.

Additionally, the `received_bytes` and `sent_bytes` metrics have been
renamed as `tcp_read_bytes_total` and `tcp_write_bytes_total`,
resectively. This more naturally fits into existing idioms.  Stream
classification is not applied to these metrics, as we plan to increment
them throughout stream lifetime and not only on close.

The `tcp_connections_open` metric has also been renamed to
`tcp_open_connections` to reflect Prometheus idioms.

Finally, `msg1` and `msg2` have been constified in telemetry test
fixtures so that tests are somewhat easier to read.
This commit is contained in:
Oliver Gould 2018-04-25 14:06:33 -07:00 committed by GitHub
parent 0c23ad416b
commit d4d0e579c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 294 additions and 308 deletions

View File

@ -44,8 +44,13 @@ pub struct ResponseLabels {
pub struct TransportLabels {
/// Was the transport opened in the inbound or outbound direction?
direction: Direction,
peer: Peer,
}
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub enum Peer { Src, Dst }
/// Labels describing the end of a TCP connection
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub struct TransportCloseLabels {
@ -289,13 +294,21 @@ impl TransportLabels {
pub fn new(ctx: &ctx::transport::Ctx) -> Self {
TransportLabels {
direction: Direction::from_context(&ctx.proxy()),
peer: match *ctx {
ctx::transport::Ctx::Server(_) => Peer::Src,
ctx::transport::Ctx::Client(_) => Peer::Dst,
},
}
}
}
impl fmt::Display for TransportLabels {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Display::fmt(&self.direction, f)
fmt::Display::fmt(&self.direction, f)?;
f.pad(match self.peer {
Peer::Src => ",peer=\"src\"",
Peer::Dst => ",peer=\"dst\"",
})
}
}

View File

@ -68,21 +68,23 @@ struct Metrics {
response_total: Metric<Counter, Arc<ResponseLabels>>,
response_latency: Metric<Histogram, Arc<ResponseLabels>>,
tcp_accept_open_total: Metric<Counter, Arc<TransportLabels>>,
tcp_accept_close_total: Metric<Counter, Arc<TransportCloseLabels>>,
tcp_connect_open_total: Metric<Counter, Arc<TransportLabels>>,
tcp_connect_close_total: Metric<Counter, Arc<TransportCloseLabels>>,
tcp_connection_duration: Metric<Histogram, Arc<TransportCloseLabels>>,
tcp_connections_open: Metric<Gauge, Arc<TransportLabels>>,
sent_bytes: Metric<Counter, Arc<TransportCloseLabels>>,
received_bytes: Metric<Counter, Arc<TransportCloseLabels>>,
tcp: TcpMetrics,
start_time: u64,
}
#[derive(Debug, Clone)]
struct TcpMetrics {
open_total: Metric<Counter, Arc<TransportLabels>>,
close_total: Metric<Counter, Arc<TransportCloseLabels>>,
connection_duration: Metric<Histogram, Arc<TransportCloseLabels>>,
open_connections: Metric<Gauge, Arc<TransportLabels>>,
write_bytes_total: Metric<Counter, Arc<TransportLabels>>,
read_bytes_total: Metric<Counter, Arc<TransportLabels>>,
}
#[derive(Debug, Clone)]
struct Metric<M, L: Hash + Eq> {
name: &'static str,
@ -170,62 +172,11 @@ impl Metrics {
stream has completed.",
);
let tcp_accept_open_total = Metric::<Counter, Arc<TransportLabels>>::new(
"tcp_accept_open_total",
"A counter of the total number of transport connections which \
have been accepted by the proxy.",
);
let tcp_accept_close_total = Metric::<Counter, Arc<TransportCloseLabels>>::new(
"tcp_accept_close_total",
"A counter of the total number of transport connections accepted \
by the proxy which have been closed.",
);
let tcp_connect_open_total = Metric::<Counter, Arc<TransportLabels>>::new(
"tcp_connect_open_total",
"A counter of the total number of transport connections which \
have been opened by the proxy.",
);
let tcp_connect_close_total = Metric::<Counter, Arc<TransportCloseLabels>>::new(
"tcp_connect_close_total",
"A counter of the total number of transport connections opened \
by the proxy which have been closed.",
);
let tcp_connection_duration = Metric::<Histogram, Arc<TransportCloseLabels>>::new(
"tcp_connection_duration_ms",
"A histogram of the duration of the lifetime of a connection, in milliseconds",
);
let tcp_connections_open = Metric::<Gauge, Arc<TransportLabels>>::new(
"tcp_connections_open",
"A gauge of the number of transport connections currently open.",
);
let received_bytes = Metric::<Counter, Arc<TransportCloseLabels>>::new(
"received_bytes",
"A counter of the total number of recieved bytes."
);
let sent_bytes = Metric::<Counter, Arc<TransportCloseLabels>>::new(
"sent_bytes",
"A counter of the total number of sent bytes."
);
Metrics {
request_total,
response_total,
response_latency,
tcp_accept_open_total,
tcp_accept_close_total,
tcp_connect_open_total,
tcp_connect_close_total,
tcp_connection_duration,
tcp_connections_open,
received_bytes,
sent_bytes,
tcp: TcpMetrics::new(),
start_time,
}
}
@ -254,68 +205,8 @@ impl Metrics {
.or_insert_with(Counter::default)
}
fn tcp_accept_open_total(&mut self,
labels: &Arc<TransportLabels>)
-> &mut Counter {
self.tcp_accept_open_total.values
.entry(labels.clone())
.or_insert_with(Default::default)
}
fn tcp_accept_close_total(&mut self,
labels: &Arc<TransportCloseLabels>)
-> &mut Counter {
self.tcp_accept_close_total.values
.entry(labels.clone())
.or_insert_with(Counter::default)
}
fn tcp_connect_open_total(&mut self,
labels: &Arc<TransportLabels>)
-> &mut Counter {
self.tcp_connect_open_total.values
.entry(labels.clone())
.or_insert_with(Default::default)
}
fn tcp_connect_close_total(&mut self,
labels: &Arc<TransportCloseLabels>)
-> &mut Counter {
self.tcp_connect_close_total.values
.entry(labels.clone())
.or_insert_with(Counter::default)
}
fn tcp_connection_duration(&mut self,
labels: &Arc<TransportCloseLabels>)
-> &mut Histogram {
self.tcp_connection_duration.values
.entry(labels.clone())
.or_insert_with(Histogram::default)
}
fn tcp_connections_open(&mut self,
labels: &Arc<TransportLabels>)
-> &mut Gauge {
self.tcp_connections_open.values
.entry(labels.clone())
.or_insert_with(Gauge::default)
}
fn sent_bytes(&mut self,
labels: &Arc<TransportCloseLabels>)
-> &mut Counter {
self.sent_bytes.values
.entry(labels.clone())
.or_insert_with(Counter::default)
}
fn received_bytes(&mut self,
labels: &Arc<TransportCloseLabels>)
-> &mut Counter {
self.received_bytes.values
.entry(labels.clone())
.or_insert_with(Counter::default)
fn tcp(&mut self) -> &mut TcpMetrics {
&mut self.tcp
}
}
@ -324,19 +215,106 @@ impl fmt::Display for Metrics {
writeln!(f, "{}", self.request_total)?;
writeln!(f, "{}", self.response_total)?;
writeln!(f, "{}", self.response_latency)?;
writeln!(f, "{}", self.tcp_accept_open_total)?;
writeln!(f, "{}", self.tcp_accept_close_total)?;
writeln!(f, "{}", self.tcp_connect_open_total)?;
writeln!(f, "{}", self.tcp_connect_close_total)?;
writeln!(f, "{}", self.tcp_connection_duration)?;
writeln!(f, "{}", self.tcp_connections_open)?;
writeln!(f, "{}", self.sent_bytes)?;
writeln!(f, "{}", self.received_bytes)?;
writeln!(f, "{}", self.tcp)?;
writeln!(f, "process_start_time_seconds {}", self.start_time)?;
Ok(())
}
}
// ===== impl TcpMetrics =====
impl TcpMetrics {
pub fn new() -> TcpMetrics {
let open_total = Metric::<Counter, Arc<TransportLabels>>::new(
"tcp_open_total",
"A counter of the total number of transport connections.",
);
let close_total = Metric::<Counter, Arc<TransportCloseLabels>>::new(
"tcp_close_total",
"A counter of the total number of transport connections.",
);
let connection_duration = Metric::<Histogram, Arc<TransportCloseLabels>>::new(
"tcp_connection_duration_ms",
"A histogram of the duration of the lifetime of connections, in milliseconds",
);
let open_connections = Metric::<Gauge, Arc<TransportLabels>>::new(
"tcp_open_connections",
"A gauge of the number of transport connections currently open.",
);
let read_bytes_total = Metric::<Counter, Arc<TransportLabels>>::new(
"tcp_read_bytes_total",
"A counter of the total number of recieved bytes."
);
let write_bytes_total = Metric::<Counter, Arc<TransportLabels>>::new(
"tcp_write_bytes_total",
"A counter of the total number of sent bytes."
);
Self {
open_total,
close_total,
connection_duration,
open_connections,
read_bytes_total,
write_bytes_total,
}
}
fn open_total(&mut self, labels: &Arc<TransportLabels>) -> &mut Counter {
self.open_total.values
.entry(labels.clone())
.or_insert_with(Default::default)
}
fn close_total(&mut self, labels: &Arc<TransportCloseLabels>) -> &mut Counter {
self.close_total.values
.entry(labels.clone())
.or_insert_with(Counter::default)
}
fn connection_duration(&mut self, labels: &Arc<TransportCloseLabels>) -> &mut Histogram {
self.connection_duration.values
.entry(labels.clone())
.or_insert_with(Histogram::default)
}
fn open_connections(&mut self, labels: &Arc<TransportLabels>) -> &mut Gauge {
self.open_connections.values
.entry(labels.clone())
.or_insert_with(Gauge::default)
}
fn write_bytes_total(&mut self, labels: &Arc<TransportLabels>) -> &mut Counter {
self.write_bytes_total.values
.entry(labels.clone())
.or_insert_with(Counter::default)
}
fn read_bytes_total(&mut self, labels: &Arc<TransportLabels>) -> &mut Counter {
self.read_bytes_total.values
.entry(labels.clone())
.or_insert_with(Counter::default)
}
}
impl fmt::Display for TcpMetrics {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
writeln!(f, "{}", self.open_total)?;
writeln!(f, "{}", self.close_total)?;
writeln!(f, "{}", self.connection_duration)?;
writeln!(f, "{}", self.open_connections)?;
writeln!(f, "{}", self.write_bytes_total)?;
writeln!(f, "{}", self.read_bytes_total)?;
Ok(())
}
}
// ===== impl Counter =====
@ -583,46 +561,31 @@ impl Aggregate {
Event::TransportOpen(ref ctx) => {
let labels = Arc::new(TransportLabels::new(ctx));
self.update(|metrics| {
match ctx.as_ref() {
&ctx::transport::Ctx::Server(_) => {
*metrics.tcp_accept_open_total(&labels).incr();
*metrics.tcp_connections_open(&labels).incr();
},
&ctx::transport::Ctx::Client(_) => {
*metrics.tcp_connect_open_total(&labels).incr();
},
}
*metrics.tcp().open_total(&labels).incr();
*metrics.tcp().open_connections(&labels).incr();
})
},
Event::TransportClose(ref ctx, ref close) => {
let labels = Arc::new(TransportCloseLabels::new(ctx, close));
let labels = Arc::new(TransportLabels::new(ctx));
let close_labels = Arc::new(TransportCloseLabels::new(ctx, close));
self.update(|metrics| {
*metrics.tcp_connection_duration(&labels) += close.duration;
*metrics.sent_bytes(&labels) += close.tx_bytes as u64;
*metrics.received_bytes(&labels) += close.tx_bytes as u64;
match ctx.as_ref() {
&ctx::transport::Ctx::Server(_) => {
*metrics.tcp_accept_close_total(&labels).incr();
// We don't use the accessor method here like we do
// for all the other metrics so that we can just
// use the transport open labels in `labels`
// without having to ref-count it separately. Since
// we're handling a close event, we expect those
// labels to already be in the map from when the
// transport was opened.
*metrics.tcp_connections_open.values
.get_mut(&labels.transport)
.expect(
"observed TransportClose event for a \
transport that was not counted"
)
.decr();
},
&ctx::transport::Ctx::Client(_) => {
*metrics.tcp_connect_close_total(&labels).incr();
},
};
*metrics.tcp().write_bytes_total(&labels) += close.tx_bytes as u64;
*metrics.tcp().read_bytes_total(&labels) += close.rx_bytes as u64;
*metrics.tcp().connection_duration(&close_labels) += close.duration;
*metrics.tcp().close_total(&close_labels).incr();
let metrics = metrics.tcp().open_connections.values.get_mut(&labels);
debug_assert!(metrics.is_some());
match metrics {
Some(m) => {
*m.decr();
}
None => {
error!("Closed transport missing from metrics registry: {{{}}}", labels);
}
}
})
},
};

View File

@ -75,18 +75,18 @@ impl Fixture {
}
impl TcpFixture {
fn server() -> server::Listening {
let msg1 = "custom tcp hello";
let msg2 = "custom tcp bye";
const HELLO_MSG: &'static str = "custom tcp hello";
const BYE_MSG: &'static str = "custom tcp bye";
fn server() -> server::Listening {
server::tcp()
.accept(move |read| {
assert_eq!(read, msg1.as_bytes());
msg2
assert_eq!(read, Self::HELLO_MSG.as_bytes());
TcpFixture::BYE_MSG
})
.accept(move |read| {
assert_eq!(read, msg1.as_bytes());
msg2
assert_eq!(read, Self::HELLO_MSG.as_bytes());
TcpFixture::BYE_MSG
})
.run()
}
@ -680,12 +680,12 @@ mod transport {
info!("client.get(/)");
assert_eq!(client.get("/"), "hello");
assert_contains!(metrics.get("/metrics"),
"{direction=\"inbound\"} 1"
"tcp_open_total{direction=\"inbound\",peer=\"src\"} 1"
);
// drop the client to force the connection to close.
drop(client);
assert_contains!(metrics.get("/metrics"),
"tcp_accept_close_total{direction=\"inbound\",classification=\"success\"} 1"
"tcp_close_total{direction=\"inbound\",peer=\"src\",classification=\"success\"} 1"
);
// create a new client to force a new connection
@ -694,12 +694,12 @@ mod transport {
info!("client.get(/)");
assert_eq!(client.get("/"), "hello");
assert_contains!(metrics.get("/metrics"),
"tcp_accept_open_total{direction=\"inbound\"} 2"
"tcp_open_total{direction=\"inbound\",peer=\"src\"} 2"
);
// drop the client to force the connection to close.
drop(client);
assert_contains!(metrics.get("/metrics"),
"tcp_accept_close_total{direction=\"inbound\",classification=\"success\"} 2"
"tcp_close_total{direction=\"inbound\",peer=\"src\",classification=\"success\"} 2"
);
}
@ -712,7 +712,7 @@ mod transport {
info!("client.get(/)");
assert_eq!(client.get("/"), "hello");
assert_contains!(metrics.get("/metrics"),
"tcp_connect_open_total{direction=\"inbound\"} 1");
"tcp_open_total{direction=\"inbound\",peer=\"dst\"} 1");
// create a new client to force a new connection
let client = client::new(proxy.inbound, "tele.test.svc.cluster.local");
@ -721,7 +721,7 @@ mod transport {
assert_eq!(client.get("/"), "hello");
// server connection should be pooled
assert_contains!(metrics.get("/metrics"),
"tcp_connect_open_total{direction=\"inbound\"} 1");
"tcp_open_total{direction=\"inbound\",peer=\"dst\"} 1");
}
#[test]
@ -733,12 +733,12 @@ mod transport {
info!("client.get(/)");
assert_eq!(client.get("/"), "hello");
assert_contains!(metrics.get("/metrics"),
"tcp_accept_open_total{direction=\"outbound\"} 1"
"tcp_open_total{direction=\"outbound\",peer=\"src\"} 1"
);
// drop the client to force the connection to close.
drop(client);
assert_contains!(metrics.get("/metrics"),
"tcp_accept_close_total{direction=\"outbound\",classification=\"success\"} 1"
"tcp_close_total{direction=\"outbound\",peer=\"src\",classification=\"success\"} 1"
);
// create a new client to force a new connection
@ -747,12 +747,12 @@ mod transport {
info!("client.get(/)");
assert_eq!(client.get("/"), "hello");
assert_contains!(metrics.get("/metrics"),
"tcp_accept_open_total{direction=\"outbound\"} 2"
"tcp_open_total{direction=\"outbound\",peer=\"src\"} 2"
);
// drop the client to force the connection to close.
drop(client);
assert_contains!(metrics.get("/metrics"),
"tcp_accept_close_total{direction=\"outbound\",classification=\"success\"} 2"
"tcp_close_total{direction=\"outbound\",peer=\"src\",classification=\"success\"} 2"
);
}
@ -765,7 +765,7 @@ mod transport {
info!("client.get(/)");
assert_eq!(client.get("/"), "hello");
assert_contains!(metrics.get("/metrics"),
"tcp_connect_open_total{direction=\"outbound\"} 1");
"tcp_open_total{direction=\"outbound\",peer=\"dst\"} 1");
// create a new client to force a new connection
let client2 = client::new(proxy.outbound, "tele.test.svc.cluster.local");
@ -774,7 +774,7 @@ mod transport {
assert_eq!(client2.get("/"), "hello");
// server connection should be pooled
assert_contains!(metrics.get("/metrics"),
"tcp_connect_open_total{direction=\"outbound\"} 1");
"tcp_open_total{direction=\"outbound\",peer=\"dst\"} 1");
}
#[test]
@ -784,15 +784,12 @@ mod transport {
let TcpFixture { client, metrics, proxy: _proxy } =
TcpFixture::inbound();
let msg1 = "custom tcp hello";
let msg2 = "custom tcp bye";
let tcp_client = client.connect();
tcp_client.write(msg1);
assert_eq!(tcp_client.read(), msg2.as_bytes());
tcp_client.write(TcpFixture::HELLO_MSG);
assert_eq!(tcp_client.read(), TcpFixture::BYE_MSG.as_bytes());
assert_contains!(metrics.get("/metrics"),
"tcp_connect_open_total{direction=\"inbound\"} 1");
"tcp_open_total{direction=\"inbound\",peer=\"dst\"} 1");
}
#[test]
@ -802,31 +799,28 @@ mod transport {
let TcpFixture { client, metrics, proxy: _proxy } =
TcpFixture::inbound();
let msg1 = "custom tcp hello";
let msg2 = "custom tcp bye";
let tcp_client = client.connect();
tcp_client.write(msg1);
assert_eq!(tcp_client.read(), msg2.as_bytes());
tcp_client.write(TcpFixture::HELLO_MSG);
assert_eq!(tcp_client.read(), TcpFixture::BYE_MSG.as_bytes());
assert_contains!(metrics.get("/metrics"),
"tcp_accept_open_total{direction=\"inbound\"} 1");
"tcp_open_total{direction=\"inbound\",peer=\"src\"} 1");
drop(tcp_client);
assert_contains!(metrics.get("/metrics"),
"tcp_accept_close_total{direction=\"inbound\",classification=\"success\"} 1");
"tcp_close_total{direction=\"inbound\",peer=\"src\",classification=\"success\"} 1");
let tcp_client = client.connect();
tcp_client.write(msg1);
assert_eq!(tcp_client.read(), msg2.as_bytes());
tcp_client.write(TcpFixture::HELLO_MSG);
assert_eq!(tcp_client.read(), TcpFixture::BYE_MSG.as_bytes());
assert_contains!(metrics.get("/metrics"),
"tcp_accept_open_total{direction=\"inbound\"} 2");
"tcp_open_total{direction=\"inbound\",peer=\"src\"} 2");
drop(tcp_client);
assert_contains!(metrics.get("/metrics"),
"tcp_accept_close_total{direction=\"inbound\",classification=\"success\"} 2");
"tcp_close_total{direction=\"inbound\",peer=\"src\",classification=\"success\"} 2");
}
// https://github.com/runconduit/conduit/issues/831
@ -837,73 +831,85 @@ mod transport {
let TcpFixture { client, metrics, proxy: _proxy } =
TcpFixture::inbound();
let msg1 = "custom tcp hello";
let msg2 = "custom tcp bye";
let tcp_client = client.connect();
tcp_client.write(msg1);
assert_eq!(tcp_client.read(), msg2.as_bytes());
tcp_client.write(TcpFixture::HELLO_MSG);
assert_eq!(tcp_client.read(), TcpFixture::BYE_MSG.as_bytes());
drop(tcp_client);
// TODO: make assertions about buckets
assert_contains!(metrics.get("/metrics"),
"tcp_connection_duration_ms_count{direction=\"inbound\",classification=\"success\"} 2");
let out = metrics.get("/metrics");
assert_contains!(out,
"tcp_connection_duration_ms_count{direction=\"inbound\",peer=\"src\",classification=\"success\"} 1");
assert_contains!(out,
"tcp_connection_duration_ms_count{direction=\"inbound\",peer=\"dst\",classification=\"success\"} 1");
let tcp_client = client.connect();
tcp_client.write(msg1);
assert_eq!(tcp_client.read(), msg2.as_bytes());
assert_contains!(metrics.get("/metrics"),
"tcp_connection_duration_ms_count{direction=\"inbound\",classification=\"success\"} 2");
tcp_client.write(TcpFixture::HELLO_MSG);
assert_eq!(tcp_client.read(), TcpFixture::BYE_MSG.as_bytes());
let out = metrics.get("/metrics");
assert_contains!(out,
"tcp_connection_duration_ms_count{direction=\"inbound\",peer=\"src\",classification=\"success\"} 1");
assert_contains!(out,
"tcp_connection_duration_ms_count{direction=\"inbound\",peer=\"dst\",classification=\"success\"} 1");
drop(tcp_client);
assert_contains!(metrics.get("/metrics"),
"tcp_connection_duration_ms_count{direction=\"inbound\",classification=\"success\"} 4");
}
let out = metrics.get("/metrics");
assert_contains!(out,
"tcp_connection_duration_ms_count{direction=\"inbound\",peer=\"src\",classification=\"success\"} 2");
assert_contains!(out,
"tcp_connection_duration_ms_count{direction=\"inbound\",peer=\"dst\",classification=\"success\"} 2"); }
#[test]
#[cfg_attr(not(feature = "flaky_tests"), ignore)]
fn inbound_tcp_sent_bytes() {
fn inbound_tcp_write_bytes_total() {
let _ = env_logger::try_init();
let TcpFixture { client, metrics, proxy: _proxy } =
TcpFixture::inbound();
let msg1 = "custom tcp hello";
let msg2 = "custom tcp bye";
let expected = format!(
"sent_bytes{{direction=\"inbound\",classification=\"success\"}} {}",
msg1.len() + msg2.len()
let src_expected = format!(
"tcp_write_bytes_total{{direction=\"inbound\",peer=\"src\"}} {}",
TcpFixture::BYE_MSG.len()
);
let dst_expected = format!(
"tcp_write_bytes_total{{direction=\"inbound\",peer=\"dst\"}} {}",
TcpFixture::HELLO_MSG.len()
);
let tcp_client = client.connect();
tcp_client.write(msg1);
assert_eq!(tcp_client.read(), msg2.as_bytes());
tcp_client.write(TcpFixture::HELLO_MSG);
assert_eq!(tcp_client.read(), TcpFixture::BYE_MSG.as_bytes());
drop(tcp_client);
assert_contains!(metrics.get("/metrics"), &expected);
let out = metrics.get("/metrics");
assert_contains!(out, &src_expected);
assert_contains!(out, &dst_expected);
}
#[test]
#[cfg_attr(not(feature = "flaky_tests"), ignore)]
fn inbound_tcp_received_bytes() {
fn inbound_tcp_read_bytes_total() {
let _ = env_logger::try_init();
let TcpFixture { client, metrics, proxy: _proxy } =
TcpFixture::inbound();
let msg1 = "custom tcp hello";
let msg2 = "custom tcp bye";
let expected = format!(
"sent_bytes{{direction=\"inbound\",classification=\"success\"}} {}",
msg1.len() + msg2.len()
let src_expected = format!(
"tcp_read_bytes_total{{direction=\"inbound\",peer=\"src\"}} {}",
TcpFixture::HELLO_MSG.len()
);
let dst_expected = format!(
"tcp_read_bytes_total{{direction=\"inbound\",peer=\"dst\"}} {}",
TcpFixture::BYE_MSG.len()
);
let tcp_client = client.connect();
tcp_client.write(msg1);
assert_eq!(tcp_client.read(), msg2.as_bytes());
tcp_client.write(TcpFixture::HELLO_MSG);
assert_eq!(tcp_client.read(), TcpFixture::BYE_MSG.as_bytes());
drop(tcp_client);
assert_contains!(metrics.get("/metrics"), &expected);
}
let out = metrics.get("/metrics");
assert_contains!(out, &src_expected);
assert_contains!(out, &dst_expected); }
#[test]
#[cfg_attr(not(feature = "flaky_tests"), ignore)]
@ -912,15 +918,12 @@ mod transport {
let TcpFixture { client, metrics, proxy: _proxy } =
TcpFixture::outbound();
let msg1 = "custom tcp hello";
let msg2 = "custom tcp bye";
let tcp_client = client.connect();
tcp_client.write(msg1);
assert_eq!(tcp_client.read(), msg2.as_bytes());
tcp_client.write(TcpFixture::HELLO_MSG);
assert_eq!(tcp_client.read(), TcpFixture::BYE_MSG.as_bytes());
assert_contains!(metrics.get("/metrics"),
"tcp_connect_open_total{direction=\"outbound\"} 1");
"tcp_open_total{direction=\"outbound\",peer=\"dst\"} 1");
}
#[test]
@ -930,31 +933,28 @@ mod transport {
let TcpFixture { client, metrics, proxy: _proxy } =
TcpFixture::outbound();
let msg1 = "custom tcp hello";
let msg2 = "custom tcp bye";
let tcp_client = client.connect();
tcp_client.write(msg1);
assert_eq!(tcp_client.read(), msg2.as_bytes());
tcp_client.write(TcpFixture::HELLO_MSG);
assert_eq!(tcp_client.read(), TcpFixture::BYE_MSG.as_bytes());
assert_contains!(metrics.get("/metrics"),
"tcp_accept_open_total{direction=\"outbound\"} 1");
"tcp_open_total{direction=\"outbound\",peer=\"src\"} 1");
drop(tcp_client);
assert_contains!(metrics.get("/metrics"),
"tcp_accept_close_total{direction=\"outbound\",classification=\"success\"} 1");
"tcp_close_total{direction=\"outbound\",peer=\"src\",classification=\"success\"} 1");
let tcp_client = client.connect();
tcp_client.write(msg1);
assert_eq!(tcp_client.read(), msg2.as_bytes());
tcp_client.write(TcpFixture::HELLO_MSG);
assert_eq!(tcp_client.read(), TcpFixture::BYE_MSG.as_bytes());
assert_contains!(metrics.get("/metrics"),
"tcp_accept_open_total{direction=\"outbound\"} 2");
"tcp_open_total{direction=\"outbound\",peer=\"src\"} 2");
drop(tcp_client);
assert_contains!(metrics.get("/metrics"),
"tcp_accept_close_total{direction=\"outbound\",classification=\"success\"} 2");
"tcp_close_total{direction=\"outbound\",peer=\"src\",classification=\"success\"} 2");
}
#[test]
@ -964,108 +964,118 @@ mod transport {
let TcpFixture { client, metrics, proxy: _proxy } =
TcpFixture::outbound();
let msg1 = "custom tcp hello";
let msg2 = "custom tcp bye";
let tcp_client = client.connect();
tcp_client.write(msg1);
assert_eq!(tcp_client.read(), msg2.as_bytes());
tcp_client.write(TcpFixture::HELLO_MSG);
assert_eq!(tcp_client.read(), TcpFixture::BYE_MSG.as_bytes());
drop(tcp_client);
// TODO: make assertions about buckets
assert_contains!(metrics.get("/metrics"),
"tcp_connection_duration_ms_count{direction=\"outbound\",classification=\"success\"} 2");
let out = metrics.get("/metrics");
assert_contains!(out,
"tcp_connection_duration_ms_count{direction=\"outbound\",peer=\"src\",classification=\"success\"} 1");
assert_contains!(out,
"tcp_connection_duration_ms_count{direction=\"outbound\",peer=\"dst\",classification=\"success\"} 1");
let tcp_client = client.connect();
tcp_client.write(msg1);
assert_eq!(tcp_client.read(), msg2.as_bytes());
assert_contains!(metrics.get("/metrics"),
"tcp_connection_duration_ms_count{direction=\"outbound\",classification=\"success\"} 2");
tcp_client.write(TcpFixture::HELLO_MSG);
assert_eq!(tcp_client.read(), TcpFixture::BYE_MSG.as_bytes());
let out = metrics.get("/metrics");
assert_contains!(out,
"tcp_connection_duration_ms_count{direction=\"outbound\",peer=\"src\",classification=\"success\"} 1");
assert_contains!(out,
"tcp_connection_duration_ms_count{direction=\"outbound\",peer=\"dst\",classification=\"success\"} 1");
drop(tcp_client);
assert_contains!(metrics.get("/metrics"),
"tcp_connection_duration_ms_count{direction=\"outbound\",classification=\"success\"} 4");
let out = metrics.get("/metrics");
assert_contains!(out,
"tcp_connection_duration_ms_count{direction=\"outbound\",peer=\"src\",classification=\"success\"} 2");
assert_contains!(out,
"tcp_connection_duration_ms_count{direction=\"outbound\",peer=\"dst\",classification=\"success\"} 2");
}
#[test]
#[cfg_attr(not(feature = "flaky_tests"), ignore)]
fn outbound_tcp_sent_bytes() {
fn outbound_tcp_write_bytes_total() {
let _ = env_logger::try_init();
let TcpFixture { client, metrics, proxy: _proxy } =
TcpFixture::outbound();
let msg1 = "custom tcp hello";
let msg2 = "custom tcp bye";
let expected = format!(
"sent_bytes{{direction=\"outbound\",classification=\"success\"}} {}",
msg1.len() + msg2.len()
let src_expected = format!(
"tcp_write_bytes_total{{direction=\"outbound\",peer=\"src\"}} {}",
TcpFixture::BYE_MSG.len()
);
let dst_expected = format!(
"tcp_write_bytes_total{{direction=\"outbound\",peer=\"dst\"}} {}",
TcpFixture::HELLO_MSG.len()
);
let tcp_client = client.connect();
tcp_client.write(msg1);
assert_eq!(tcp_client.read(), msg2.as_bytes());
tcp_client.write(TcpFixture::HELLO_MSG);
assert_eq!(tcp_client.read(), TcpFixture::BYE_MSG.as_bytes());
drop(tcp_client);
assert_contains!(metrics.get("/metrics"), &expected);
}
let out = metrics.get("/metrics");
assert_contains!(out, &src_expected);
assert_contains!(out, &dst_expected); }
#[test]
#[cfg_attr(not(feature = "flaky_tests"), ignore)]
fn outbound_tcp_received_bytes() {
fn outbound_tcp_read_bytes_total() {
let _ = env_logger::try_init();
let TcpFixture { client, metrics, proxy: _proxy } =
TcpFixture::outbound();
let msg1 = "custom tcp hello";
let msg2 = "custom tcp bye";
let expected = format!(
"received_bytes{{direction=\"outbound\",classification=\"success\"}} {}",
msg1.len() + msg2.len()
let src_expected = format!(
"tcp_read_bytes_total{{direction=\"outbound\",peer=\"src\"}} {}",
TcpFixture::HELLO_MSG.len()
);
let dst_expected = format!(
"tcp_read_bytes_total{{direction=\"outbound\",peer=\"dst\"}} {}",
TcpFixture::BYE_MSG.len()
);
let tcp_client = client.connect();
tcp_client.write(msg1);
assert_eq!(tcp_client.read(), msg2.as_bytes());
tcp_client.write(TcpFixture::HELLO_MSG);
assert_eq!(tcp_client.read(), TcpFixture::BYE_MSG.as_bytes());
drop(tcp_client);
assert_contains!(metrics.get("/metrics"), &expected);
let out = metrics.get("/metrics");
assert_contains!(out, &src_expected);
assert_contains!(out, &dst_expected);
}
#[test]
#[cfg_attr(not(feature = "flaky_tests"), ignore)]
fn outbound_tcp_connections_open() {
fn outbound_tcp_open_connections() {
let _ = env_logger::try_init();
let TcpFixture { client, metrics, proxy: _proxy } =
TcpFixture::outbound();
let msg1 = "custom tcp hello";
let msg2 = "custom tcp bye";
let tcp_client = client.connect();
tcp_client.write(msg1);
assert_eq!(tcp_client.read(), msg2.as_bytes());
tcp_client.write(TcpFixture::HELLO_MSG);
assert_eq!(tcp_client.read(), TcpFixture::BYE_MSG.as_bytes());
assert_contains!(metrics.get("/metrics"),
"tcp_connections_open{direction=\"outbound\"} 1");
"tcp_open_connections{direction=\"outbound\",peer=\"src\"} 1");
drop(tcp_client);
assert_contains!(metrics.get("/metrics"),
"tcp_connections_open{direction=\"outbound\"} 0");
"tcp_open_connections{direction=\"outbound\",peer=\"src\"} 0");
let tcp_client = client.connect();
tcp_client.write(msg1);
assert_eq!(tcp_client.read(), msg2.as_bytes());
tcp_client.write(TcpFixture::HELLO_MSG);
assert_eq!(tcp_client.read(), TcpFixture::BYE_MSG.as_bytes());
assert_contains!(metrics.get("/metrics"),
"tcp_connections_open{direction=\"outbound\"} 1");
"tcp_open_connections{direction=\"outbound\",peer=\"src\"} 1");
drop(tcp_client);
assert_contains!(metrics.get("/metrics"),
"tcp_connections_open{direction=\"outbound\"} 0");
"tcp_open_connections{direction=\"outbound\",peer=\"src\"} 0");
}
#[test]
#[cfg_attr(not(feature = "flaky_tests"), ignore)]
fn outbound_http_tcp_connections_open() {
fn outbound_http_tcp_open_connections() {
let _ = env_logger::try_init();
let Fixture { client, metrics, proxy } =
Fixture::outbound();
@ -1074,10 +1084,10 @@ mod transport {
assert_eq!(client.get("/"), "hello");
assert_contains!(metrics.get("/metrics"),
"tcp_connections_open{direction=\"outbound\"} 1");
"tcp_open_connections{direction=\"outbound\",peer=\"src\"} 1");
drop(client);
assert_contains!(metrics.get("/metrics"),
"tcp_connections_open{direction=\"outbound\"} 0");
"tcp_open_connections{direction=\"outbound\",peer=\"src\"} 0");
// create a new client to force a new connection
let client = client::new(proxy.outbound, "tele.test.svc.cluster.local");
@ -1085,11 +1095,11 @@ mod transport {
info!("client.get(/)");
assert_eq!(client.get("/"), "hello");
assert_contains!(metrics.get("/metrics"),
"tcp_connections_open{direction=\"outbound\"} 1");
"tcp_open_connections{direction=\"outbound\",peer=\"src\"} 1");
drop(client);
assert_contains!(metrics.get("/metrics"),
"tcp_connections_open{direction=\"outbound\"} 0");
"tcp_open_connections{direction=\"outbound\",peer=\"src\"} 0");
}
}