mirror of https://github.com/linkerd/linkerd2.git
proxy: Add transport-level metrics (#785)
This branch adds all the transport-level Prometheus metrics as described in #742, with the exception of the `tcp_connections_open` gauge (to be added in a subsequent branch). A brief description of the metrics added in this branch: * `tcp_accept_open_total`: counter of the number of connections accepted by the proxy * `tcp_accept_close_total`: counter of the number of accepted connections that have closed * `tcp_connect_open_total`: counter of the number of connections opened by the proxy * `tcp_connect_close_total`: counter of the number of connections opened by the proxy that have been closed. * `tcp_connection_duration_ms`: histogram of the total duration of each TCP connection (incremented on connection close) * `sent_bytes`: counter of the total number of bytes sent on TCP connections (incremented on connection close) * `received_bytes`: counter of the total number of bytes received on TCP connections (incremented on connection close) These metrics are labeled with the direction (inbound or outbound) and whether the connection was proxied as raw TCP or corresponds to an HTTP request. Additionally, I've added several proxy tests for these metrics. Note that there are some cases which are currently untested; in particular, while there are tests for the `tcp_accept_close_total` counter, it's more difficult to test the `tcp_connect_close_total` counter, due to connection pooling. I'd like to improve the tests for this code in additional branches.
This commit is contained in:
parent
689c42263a
commit
674ce87588
|
@ -27,9 +27,8 @@ pub struct TransportClose {
|
|||
|
||||
pub duration: Duration,
|
||||
|
||||
// TODO
|
||||
//pub rx_bytes: usize,
|
||||
//pub tx_bytes: usize,
|
||||
pub rx_bytes: u64,
|
||||
pub tx_bytes: u64,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
|
|
|
@ -38,6 +38,13 @@ pub struct ResponseLabels {
|
|||
classification: Classification,
|
||||
}
|
||||
|
||||
/// Labels describing a TCP connection
|
||||
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
|
||||
pub struct TransportLabels {
|
||||
/// Was the transport opened in the inbound or outbound direction?
|
||||
direction: Direction,
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
|
||||
enum Classification {
|
||||
Success,
|
||||
|
@ -255,3 +262,19 @@ impl fmt::Display for DstLabels {
|
|||
self.formatted.fmt(f)
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl TransportLabels =====
|
||||
|
||||
impl TransportLabels {
|
||||
pub fn new(ctx: &ctx::transport::Ctx) -> Self {
|
||||
TransportLabels {
|
||||
direction: Direction::from_context(&ctx.proxy()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for TransportLabels {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "{}", self.direction)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -49,7 +49,7 @@ use telemetry::event::Event;
|
|||
mod labels;
|
||||
mod latency;
|
||||
|
||||
use self::labels::{RequestLabels, ResponseLabels};
|
||||
use self::labels::{RequestLabels, ResponseLabels, TransportLabels};
|
||||
use self::latency::{BUCKET_BOUNDS, Histogram};
|
||||
pub use self::labels::DstLabels;
|
||||
|
||||
|
@ -62,6 +62,17 @@ struct Metrics {
|
|||
response_duration: Metric<Histogram, Arc<ResponseLabels>>,
|
||||
response_latency: Metric<Histogram, Arc<ResponseLabels>>,
|
||||
|
||||
tcp_accept_open_total: Metric<Counter, Arc<TransportLabels>>,
|
||||
tcp_accept_close_total: Metric<Counter, Arc<TransportLabels>>,
|
||||
|
||||
tcp_connect_open_total: Metric<Counter, Arc<TransportLabels>>,
|
||||
tcp_connect_close_total: Metric<Counter, Arc<TransportLabels>>,
|
||||
|
||||
tcp_connection_duration: Metric<Histogram, Arc<TransportLabels>>,
|
||||
|
||||
sent_bytes: Metric<Counter, Arc<TransportLabels>>,
|
||||
received_bytes: Metric<Counter, Arc<TransportLabels>>,
|
||||
|
||||
start_time: u64,
|
||||
}
|
||||
|
||||
|
@ -162,12 +173,58 @@ impl Metrics {
|
|||
stream has completed.",
|
||||
);
|
||||
|
||||
let tcp_accept_open_total = Metric::<Counter, Arc<TransportLabels>>::new(
|
||||
"tcp_accept_open_total",
|
||||
"A counter of the total number of transport connections which \
|
||||
have been accepted by the proxy.",
|
||||
);
|
||||
|
||||
let tcp_accept_close_total = Metric::<Counter, Arc<TransportLabels>>::new(
|
||||
"tcp_accept_close_total",
|
||||
"A counter of the total number of transport connections accepted \
|
||||
by the proxy which have been closed.",
|
||||
);
|
||||
|
||||
let tcp_connect_open_total = Metric::<Counter, Arc<TransportLabels>>::new(
|
||||
"tcp_connect_open_total",
|
||||
"A counter of the total number of transport connections which \
|
||||
have been opened by the proxy.",
|
||||
);
|
||||
|
||||
let tcp_connect_close_total = Metric::<Counter, Arc<TransportLabels>>::new(
|
||||
"tcp_connect_close_total",
|
||||
"A counter of the total number of transport connections opened \
|
||||
by the proxy which have been closed.",
|
||||
);
|
||||
|
||||
let tcp_connection_duration = Metric::<Histogram, Arc<TransportLabels>>::new(
|
||||
"tcp_connection_duration_ms",
|
||||
"A histogram of the duration of the lifetime of a connection, in milliseconds",
|
||||
);
|
||||
|
||||
let received_bytes = Metric::<Counter, Arc<TransportLabels>>::new(
|
||||
"received_bytes",
|
||||
"A counter of the total number of recieved bytes."
|
||||
);
|
||||
|
||||
let sent_bytes = Metric::<Counter, Arc<TransportLabels>>::new(
|
||||
"sent_bytes",
|
||||
"A counter of the total number of sent bytes."
|
||||
);
|
||||
|
||||
Metrics {
|
||||
request_total,
|
||||
request_duration,
|
||||
response_total,
|
||||
response_duration,
|
||||
response_latency,
|
||||
tcp_accept_open_total,
|
||||
tcp_accept_close_total,
|
||||
tcp_connect_open_total,
|
||||
tcp_connect_close_total,
|
||||
tcp_connection_duration,
|
||||
received_bytes,
|
||||
sent_bytes,
|
||||
start_time,
|
||||
}
|
||||
}
|
||||
|
@ -177,7 +234,7 @@ impl Metrics {
|
|||
-> &mut Counter {
|
||||
self.request_total.values
|
||||
.entry(labels.clone())
|
||||
.or_insert_with(Default::default)
|
||||
.or_insert_with(Counter::default)
|
||||
}
|
||||
|
||||
fn request_duration(&mut self,
|
||||
|
@ -185,7 +242,7 @@ impl Metrics {
|
|||
-> &mut Histogram {
|
||||
self.request_duration.values
|
||||
.entry(labels.clone())
|
||||
.or_insert_with(Default::default)
|
||||
.or_insert_with(Histogram::default)
|
||||
}
|
||||
|
||||
fn response_duration(&mut self,
|
||||
|
@ -193,7 +250,7 @@ impl Metrics {
|
|||
-> &mut Histogram {
|
||||
self.response_duration.values
|
||||
.entry(labels.clone())
|
||||
.or_insert_with(Default::default)
|
||||
.or_insert_with(Histogram::default)
|
||||
}
|
||||
|
||||
fn response_latency(&mut self,
|
||||
|
@ -201,7 +258,7 @@ impl Metrics {
|
|||
-> &mut Histogram {
|
||||
self.response_latency.values
|
||||
.entry(labels.clone())
|
||||
.or_insert_with(Default::default)
|
||||
.or_insert_with(Histogram::default)
|
||||
}
|
||||
|
||||
fn response_total(&mut self,
|
||||
|
@ -209,21 +266,82 @@ impl Metrics {
|
|||
-> &mut Counter {
|
||||
self.response_total.values
|
||||
.entry(labels.clone())
|
||||
.or_insert_with(Default::default)
|
||||
.or_insert_with(Counter::default)
|
||||
}
|
||||
|
||||
fn tcp_accept_open_total(&mut self,
|
||||
labels: &Arc<TransportLabels>)
|
||||
-> &mut Counter {
|
||||
self.tcp_accept_open_total.values
|
||||
.entry(labels.clone())
|
||||
.or_insert_with(Counter::default)
|
||||
}
|
||||
|
||||
fn tcp_accept_close_total(&mut self,
|
||||
labels: &Arc<TransportLabels>)
|
||||
-> &mut Counter {
|
||||
self.tcp_accept_close_total.values
|
||||
.entry(labels.clone())
|
||||
.or_insert_with(Counter::default)
|
||||
}
|
||||
|
||||
fn tcp_connect_open_total(&mut self,
|
||||
labels: &Arc<TransportLabels>)
|
||||
-> &mut Counter {
|
||||
self.tcp_connect_open_total.values
|
||||
.entry(labels.clone())
|
||||
.or_insert_with(Counter::default)
|
||||
}
|
||||
|
||||
fn tcp_connect_close_total(&mut self,
|
||||
labels: &Arc<TransportLabels>)
|
||||
-> &mut Counter {
|
||||
self.tcp_connect_close_total.values
|
||||
.entry(labels.clone())
|
||||
.or_insert_with(Counter::default)
|
||||
}
|
||||
|
||||
fn tcp_connection_duration(&mut self,
|
||||
labels: &Arc<TransportLabels>)
|
||||
-> &mut Histogram {
|
||||
self.tcp_connection_duration.values
|
||||
.entry(labels.clone())
|
||||
.or_insert_with(Histogram::default)
|
||||
}
|
||||
|
||||
fn sent_bytes(&mut self,
|
||||
labels: &Arc<TransportLabels>)
|
||||
-> &mut Counter {
|
||||
self.sent_bytes.values
|
||||
.entry(labels.clone())
|
||||
.or_insert_with(Counter::default)
|
||||
}
|
||||
|
||||
fn received_bytes(&mut self,
|
||||
labels: &Arc<TransportLabels>)
|
||||
-> &mut Counter {
|
||||
self.received_bytes.values
|
||||
.entry(labels.clone())
|
||||
.or_insert_with(Counter::default)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
impl fmt::Display for Metrics {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "{}\n{}\n{}\n{}\n{}\nprocess_start_time_seconds {}\n",
|
||||
self.request_total,
|
||||
self.request_duration,
|
||||
self.response_total,
|
||||
self.response_duration,
|
||||
self.response_latency,
|
||||
self.start_time,
|
||||
)
|
||||
writeln!(f, "{}", self.request_total)?;
|
||||
writeln!(f, "{}", self.request_duration)?;
|
||||
writeln!(f, "{}", self.response_total)?;
|
||||
writeln!(f, "{}", self.response_duration)?;
|
||||
writeln!(f, "{}", self.response_latency)?;
|
||||
writeln!(f, "{}", self.tcp_accept_open_total)?;
|
||||
writeln!(f, "{}", self.tcp_accept_close_total)?;
|
||||
writeln!(f, "{}", self.tcp_connect_open_total)?;
|
||||
writeln!(f, "{}", self.tcp_connect_close_total)?;
|
||||
writeln!(f, "{}", self.tcp_connection_duration)?;
|
||||
writeln!(f, "{}", self.sent_bytes)?;
|
||||
writeln!(f, "{}", self.received_bytes)?;
|
||||
writeln!(f, "{}", self.start_time)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -249,6 +367,13 @@ impl ops::Add for Counter {
|
|||
}
|
||||
}
|
||||
|
||||
impl ops::AddAssign<u64> for Counter {
|
||||
fn add_assign(&mut self, rhs: u64) {
|
||||
(*self).0 += Wrapping(rhs)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
impl Counter {
|
||||
|
||||
/// Increment the counter by one.
|
||||
|
@ -414,8 +539,35 @@ impl Aggregate {
|
|||
});
|
||||
},
|
||||
|
||||
Event::TransportOpen(_) | Event::TransportClose(_, _) => {
|
||||
// TODO: we don't collect any metrics around transport events.
|
||||
Event::TransportOpen(ref ctx) => {
|
||||
let labels = Arc::new(TransportLabels::new(ctx));
|
||||
self.update(|metrics| match ctx.as_ref() {
|
||||
&ctx::transport::Ctx::Server(_) => {
|
||||
*metrics.tcp_accept_open_total(&labels).incr();
|
||||
},
|
||||
&ctx::transport::Ctx::Client(_) => {
|
||||
*metrics.tcp_connect_open_total(&labels).incr();
|
||||
},
|
||||
})
|
||||
},
|
||||
|
||||
Event::TransportClose(ref ctx, ref close) => {
|
||||
// TODO: use the `clean` field in `close` to record whether or not
|
||||
// there was an error.
|
||||
let labels = Arc::new(TransportLabels::new(ctx));
|
||||
self.update(|metrics| {
|
||||
*metrics.tcp_connection_duration(&labels) += close.duration;
|
||||
*metrics.sent_bytes(&labels) += close.tx_bytes as u64;
|
||||
*metrics.received_bytes(&labels) += close.tx_bytes as u64;
|
||||
match ctx.as_ref() {
|
||||
&ctx::transport::Ctx::Server(_) => {
|
||||
*metrics.tcp_accept_close_total(&labels).incr();
|
||||
},
|
||||
&ctx::transport::Ctx::Client(_) => {
|
||||
*metrics.tcp_connect_close_total(&labels).incr();
|
||||
},
|
||||
};
|
||||
})
|
||||
},
|
||||
};
|
||||
}
|
||||
|
|
|
@ -18,9 +18,8 @@ struct Inner {
|
|||
ctx: Arc<ctx::transport::Ctx>,
|
||||
opened_at: Instant,
|
||||
|
||||
// TODO
|
||||
//rx_bytes: usize,
|
||||
//tx_bytes: usize,
|
||||
rx_bytes: u64,
|
||||
tx_bytes: u64,
|
||||
}
|
||||
|
||||
/// Builds client transports with telemetry.
|
||||
|
@ -59,6 +58,8 @@ impl<T: AsyncRead + AsyncWrite> Transport<T> {
|
|||
ctx,
|
||||
handle,
|
||||
opened_at,
|
||||
rx_bytes: 0,
|
||||
tx_bytes: 0,
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
@ -79,6 +80,8 @@ impl<T: AsyncRead + AsyncWrite> Transport<T> {
|
|||
mut handle,
|
||||
ctx,
|
||||
opened_at,
|
||||
rx_bytes,
|
||||
tx_bytes,
|
||||
}) = self.1.take()
|
||||
{
|
||||
handle.send(move || {
|
||||
|
@ -86,6 +89,8 @@ impl<T: AsyncRead + AsyncWrite> Transport<T> {
|
|||
let ev = event::TransportClose {
|
||||
duration,
|
||||
clean: false,
|
||||
rx_bytes,
|
||||
tx_bytes,
|
||||
};
|
||||
event::Event::TransportClose(ctx, ev)
|
||||
});
|
||||
|
@ -104,6 +109,8 @@ impl<T> Drop for Transport<T> {
|
|||
mut handle,
|
||||
ctx,
|
||||
opened_at,
|
||||
rx_bytes,
|
||||
tx_bytes,
|
||||
}) = self.1.take()
|
||||
{
|
||||
handle.send(move || {
|
||||
|
@ -111,6 +118,8 @@ impl<T> Drop for Transport<T> {
|
|||
let ev = event::TransportClose {
|
||||
clean: true,
|
||||
duration,
|
||||
rx_bytes,
|
||||
tx_bytes,
|
||||
};
|
||||
event::Event::TransportClose(ctx, ev)
|
||||
});
|
||||
|
@ -120,7 +129,13 @@ impl<T> Drop for Transport<T> {
|
|||
|
||||
impl<T: AsyncRead + AsyncWrite> io::Read for Transport<T> {
|
||||
fn read(&mut self, mut buf: &mut [u8]) -> io::Result<usize> {
|
||||
self.sense_err(move |io| io.read(buf))
|
||||
let bytes = self.sense_err(move |io| io.read(buf))?;
|
||||
|
||||
if let Some(inner) = self.1.as_mut() {
|
||||
inner.rx_bytes += bytes as u64;
|
||||
}
|
||||
|
||||
Ok(bytes)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -130,7 +145,13 @@ impl<T: AsyncRead + AsyncWrite> io::Write for Transport<T> {
|
|||
}
|
||||
|
||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||
self.sense_err(move |io| io.write(buf))
|
||||
let bytes = self.sense_err(move |io| io.write(buf))?;
|
||||
|
||||
if let Some(inner) = self.1.as_mut() {
|
||||
inner.tx_bytes += bytes as u64;
|
||||
}
|
||||
|
||||
Ok(bytes)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -110,7 +110,7 @@ pub mod client;
|
|||
pub mod controller;
|
||||
pub mod proxy;
|
||||
pub mod server;
|
||||
mod tcp;
|
||||
pub mod tcp;
|
||||
|
||||
pub fn shutdown_signal() -> (Shutdown, ShutdownRx) {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
|
|
@ -18,6 +18,12 @@ struct Fixture {
|
|||
proxy: proxy::Listening,
|
||||
}
|
||||
|
||||
struct TcpFixture {
|
||||
client: tcp::TcpClient,
|
||||
metrics: client::Client,
|
||||
proxy: proxy::Listening,
|
||||
}
|
||||
|
||||
impl Fixture {
|
||||
fn inbound() -> Self {
|
||||
info!("running test server");
|
||||
|
@ -66,6 +72,48 @@ impl Fixture {
|
|||
}
|
||||
}
|
||||
|
||||
impl TcpFixture {
|
||||
fn server() -> server::Listening {
|
||||
let msg1 = "custom tcp hello";
|
||||
let msg2 = "custom tcp bye";
|
||||
|
||||
server::tcp()
|
||||
.accept(move |read| {
|
||||
assert_eq!(read, msg1.as_bytes());
|
||||
msg2
|
||||
})
|
||||
.accept(move |read| {
|
||||
assert_eq!(read, msg1.as_bytes());
|
||||
msg2
|
||||
})
|
||||
.run()
|
||||
}
|
||||
|
||||
fn inbound() -> Self {
|
||||
let ctrl = controller::new().run();
|
||||
let proxy = proxy::new()
|
||||
.controller(ctrl)
|
||||
.inbound(TcpFixture::server())
|
||||
.run();
|
||||
|
||||
let client = client::tcp(proxy.inbound);
|
||||
let metrics = client::http1(proxy.metrics, "localhost");
|
||||
TcpFixture { client, metrics, proxy }
|
||||
}
|
||||
|
||||
fn outbound() -> Self {
|
||||
let ctrl = controller::new().run();
|
||||
let proxy = proxy::new()
|
||||
.controller(ctrl)
|
||||
.outbound(TcpFixture::server())
|
||||
.run();
|
||||
|
||||
let client = client::tcp(proxy.outbound);
|
||||
let metrics = client::http1(proxy.metrics, "localhost");
|
||||
TcpFixture { client, metrics, proxy }
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn metrics_endpoint_inbound_request_count() {
|
||||
let _ = env_logger::try_init();
|
||||
|
@ -665,3 +713,368 @@ fn metrics_have_no_double_commas() {
|
|||
let scrape = metrics.get("/metrics");
|
||||
assert!(!scrape.contains(",,"), "outbound metrics had double comma");
|
||||
}
|
||||
|
||||
mod transport {
|
||||
use super::support::*;
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
#[cfg_attr(not(feature = "flaky_tests"), ignore)]
|
||||
fn inbound_http_accept() {
|
||||
let _ = env_logger::try_init();
|
||||
let Fixture { client, metrics, proxy } = Fixture::inbound();
|
||||
|
||||
info!("client.get(/)");
|
||||
assert_eq!(client.get("/"), "hello");
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"tcp_accept_open_total{direction=\"inbound\"} 1"
|
||||
);
|
||||
// drop the client to force the connection to close.
|
||||
drop(client);
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"tcp_accept_close_total{direction=\"inbound\"} 1"
|
||||
);
|
||||
|
||||
// create a new client to force a new connection
|
||||
let client = client::new(proxy.inbound, "tele.test.svc.cluster.local");
|
||||
|
||||
info!("client.get(/)");
|
||||
assert_eq!(client.get("/"), "hello");
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"tcp_accept_open_total{direction=\"inbound\"} 2"
|
||||
);
|
||||
// drop the client to force the connection to close.
|
||||
drop(client);
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"tcp_accept_close_total{direction=\"inbound\"} 2"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg_attr(not(feature = "flaky_tests"), ignore)]
|
||||
fn inbound_http_connect() {
|
||||
let _ = env_logger::try_init();
|
||||
let Fixture { client, metrics, proxy } = Fixture::inbound();
|
||||
|
||||
info!("client.get(/)");
|
||||
assert_eq!(client.get("/"), "hello");
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"tcp_connect_open_total{direction=\"inbound\"} 1");
|
||||
|
||||
// create a new client to force a new connection
|
||||
let client = client::new(proxy.inbound, "tele.test.svc.cluster.local");
|
||||
|
||||
info!("client.get(/)");
|
||||
assert_eq!(client.get("/"), "hello");
|
||||
// server connection should be pooled
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"tcp_connect_open_total{direction=\"inbound\"} 1");
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg_attr(not(feature = "flaky_tests"), ignore)]
|
||||
fn outbound_http_accept() {
|
||||
let _ = env_logger::try_init();
|
||||
let Fixture { client, metrics, proxy } = Fixture::outbound();
|
||||
|
||||
info!("client.get(/)");
|
||||
assert_eq!(client.get("/"), "hello");
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"tcp_accept_open_total{direction=\"outbound\"} 1"
|
||||
);
|
||||
// drop the client to force the connection to close.
|
||||
drop(client);
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"tcp_accept_close_total{direction=\"outbound\"} 1"
|
||||
);
|
||||
|
||||
// create a new client to force a new connection
|
||||
let client = client::new(proxy.outbound, "tele.test.svc.cluster.local");
|
||||
|
||||
info!("client.get(/)");
|
||||
assert_eq!(client.get("/"), "hello");
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"tcp_accept_open_total{direction=\"outbound\"} 2"
|
||||
);
|
||||
// drop the client to force the connection to close.
|
||||
drop(client);
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"tcp_accept_close_total{direction=\"outbound\"} 2"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg_attr(not(feature = "flaky_tests"), ignore)]
|
||||
fn outbound_http_connect() {
|
||||
let _ = env_logger::try_init();
|
||||
let Fixture { client, metrics, proxy } = Fixture::outbound();
|
||||
|
||||
info!("client.get(/)");
|
||||
assert_eq!(client.get("/"), "hello");
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"tcp_connect_open_total{direction=\"outbound\"} 1");
|
||||
|
||||
// create a new client to force a new connection
|
||||
let client2 = client::new(proxy.outbound, "tele.test.svc.cluster.local");
|
||||
|
||||
info!("client.get(/)");
|
||||
assert_eq!(client2.get("/"), "hello");
|
||||
// server connection should be pooled
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"tcp_connect_open_total{direction=\"outbound\"} 1");
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg_attr(not(feature = "flaky_tests"), ignore)]
|
||||
fn inbound_tcp_connect() {
|
||||
let _ = env_logger::try_init();
|
||||
let TcpFixture { client, metrics, proxy: _proxy } =
|
||||
TcpFixture::inbound();
|
||||
|
||||
let msg1 = "custom tcp hello";
|
||||
let msg2 = "custom tcp bye";
|
||||
|
||||
let tcp_client = client.connect();
|
||||
|
||||
tcp_client.write(msg1);
|
||||
assert_eq!(tcp_client.read(), msg2.as_bytes());
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"tcp_connect_open_total{direction=\"inbound\"} 1");
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg_attr(not(feature = "flaky_tests"), ignore)]
|
||||
fn inbound_tcp_accept() {
|
||||
let _ = env_logger::try_init();
|
||||
let TcpFixture { client, metrics, proxy: _proxy } =
|
||||
TcpFixture::inbound();
|
||||
|
||||
let msg1 = "custom tcp hello";
|
||||
let msg2 = "custom tcp bye";
|
||||
|
||||
let tcp_client = client.connect();
|
||||
|
||||
tcp_client.write(msg1);
|
||||
assert_eq!(tcp_client.read(), msg2.as_bytes());
|
||||
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"tcp_accept_open_total{direction=\"inbound\"} 1");
|
||||
|
||||
drop(tcp_client);
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"tcp_accept_close_total{direction=\"inbound\"} 1");
|
||||
|
||||
let tcp_client = client.connect();
|
||||
|
||||
tcp_client.write(msg1);
|
||||
assert_eq!(tcp_client.read(), msg2.as_bytes());
|
||||
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"tcp_accept_open_total{direction=\"inbound\"} 2");
|
||||
drop(tcp_client);
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"tcp_accept_close_total{direction=\"inbound\"} 2");
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg_attr(not(feature = "flaky_tests"), ignore)]
|
||||
fn inbound_tcp_duration() {
|
||||
let _ = env_logger::try_init();
|
||||
let TcpFixture { client, metrics, proxy: _proxy } =
|
||||
TcpFixture::inbound();
|
||||
|
||||
let msg1 = "custom tcp hello";
|
||||
let msg2 = "custom tcp bye";
|
||||
|
||||
let tcp_client = client.connect();
|
||||
|
||||
tcp_client.write(msg1);
|
||||
assert_eq!(tcp_client.read(), msg2.as_bytes());
|
||||
drop(tcp_client);
|
||||
// TODO: make assertions about buckets
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"tcp_connection_duration_ms_count{direction=\"inbound\"} 2");
|
||||
|
||||
let tcp_client = client.connect();
|
||||
|
||||
tcp_client.write(msg1);
|
||||
assert_eq!(tcp_client.read(), msg2.as_bytes());
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"tcp_connection_duration_ms_count{direction=\"inbound\"} 2");
|
||||
|
||||
drop(tcp_client);
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"tcp_connection_duration_ms_count{direction=\"inbound\"} 4");
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg_attr(not(feature = "flaky_tests"), ignore)]
|
||||
fn inbound_tcp_sent_bytes() {
|
||||
let _ = env_logger::try_init();
|
||||
let TcpFixture { client, metrics, proxy: _proxy } =
|
||||
TcpFixture::inbound();
|
||||
|
||||
let msg1 = "custom tcp hello";
|
||||
let msg2 = "custom tcp bye";
|
||||
let expected = format!(
|
||||
"sent_bytes{{direction=\"inbound\"}} {}",
|
||||
msg1.len() + msg2.len()
|
||||
);
|
||||
|
||||
let tcp_client = client.connect();
|
||||
|
||||
tcp_client.write(msg1);
|
||||
assert_eq!(tcp_client.read(), msg2.as_bytes());
|
||||
drop(tcp_client);
|
||||
assert_contains!(metrics.get("/metrics"), &expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg_attr(not(feature = "flaky_tests"), ignore)]
|
||||
fn inbound_tcp_received_bytes() {
|
||||
let _ = env_logger::try_init();
|
||||
let TcpFixture { client, metrics, proxy: _proxy } =
|
||||
TcpFixture::inbound();
|
||||
|
||||
let msg1 = "custom tcp hello";
|
||||
let msg2 = "custom tcp bye";
|
||||
let expected = format!(
|
||||
"received_bytes{{direction=\"inbound\"}} {}",
|
||||
msg1.len() + msg2.len()
|
||||
);
|
||||
|
||||
let tcp_client = client.connect();
|
||||
|
||||
tcp_client.write(msg1);
|
||||
assert_eq!(tcp_client.read(), msg2.as_bytes());
|
||||
drop(tcp_client);
|
||||
assert_contains!(metrics.get("/metrics"), &expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg_attr(not(feature = "flaky_tests"), ignore)]
|
||||
fn outbound_tcp_connect() {
|
||||
let _ = env_logger::try_init();
|
||||
let TcpFixture { client, metrics, proxy: _proxy } =
|
||||
TcpFixture::outbound();
|
||||
|
||||
let msg1 = "custom tcp hello";
|
||||
let msg2 = "custom tcp bye";
|
||||
|
||||
let tcp_client = client.connect();
|
||||
|
||||
tcp_client.write(msg1);
|
||||
assert_eq!(tcp_client.read(), msg2.as_bytes());
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"tcp_connect_open_total{direction=\"outbound\"} 1");
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg_attr(not(feature = "flaky_tests"), ignore)]
|
||||
fn outbound_tcp_accept() {
|
||||
let _ = env_logger::try_init();
|
||||
let TcpFixture { client, metrics, proxy: _proxy } =
|
||||
TcpFixture::outbound();
|
||||
|
||||
let msg1 = "custom tcp hello";
|
||||
let msg2 = "custom tcp bye";
|
||||
|
||||
let tcp_client = client.connect();
|
||||
|
||||
tcp_client.write(msg1);
|
||||
assert_eq!(tcp_client.read(), msg2.as_bytes());
|
||||
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"tcp_accept_open_total{direction=\"outbound\"} 1");
|
||||
|
||||
drop(tcp_client);
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"tcp_accept_close_total{direction=\"outbound\"} 1");
|
||||
|
||||
let tcp_client = client.connect();
|
||||
|
||||
tcp_client.write(msg1);
|
||||
assert_eq!(tcp_client.read(), msg2.as_bytes());
|
||||
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"tcp_accept_open_total{direction=\"outbound\"} 2");
|
||||
drop(tcp_client);
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"tcp_accept_close_total{direction=\"outbound\"} 2");
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg_attr(not(feature = "flaky_tests"), ignore)]
|
||||
fn outbound_tcp_duration() {
|
||||
let _ = env_logger::try_init();
|
||||
let TcpFixture { client, metrics, proxy: _proxy } =
|
||||
TcpFixture::outbound();
|
||||
|
||||
let msg1 = "custom tcp hello";
|
||||
let msg2 = "custom tcp bye";
|
||||
|
||||
let tcp_client = client.connect();
|
||||
|
||||
tcp_client.write(msg1);
|
||||
assert_eq!(tcp_client.read(), msg2.as_bytes());
|
||||
drop(tcp_client);
|
||||
// TODO: make assertions about buckets
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"tcp_connection_duration_ms_count{direction=\"outbound\"} 2");
|
||||
|
||||
let tcp_client = client.connect();
|
||||
|
||||
tcp_client.write(msg1);
|
||||
assert_eq!(tcp_client.read(), msg2.as_bytes());
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"tcp_connection_duration_ms_count{direction=\"outbound\"} 2");
|
||||
|
||||
drop(tcp_client);
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"tcp_connection_duration_ms_count{direction=\"outbound\"} 4");
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg_attr(not(feature = "flaky_tests"), ignore)]
|
||||
fn outbound_tcp_sent_bytes() {
|
||||
let _ = env_logger::try_init();
|
||||
let TcpFixture { client, metrics, proxy: _proxy } =
|
||||
TcpFixture::outbound();
|
||||
|
||||
let msg1 = "custom tcp hello";
|
||||
let msg2 = "custom tcp bye";
|
||||
let expected = format!(
|
||||
"sent_bytes{{direction=\"outbound\"}} {}",
|
||||
msg1.len() + msg2.len()
|
||||
);
|
||||
|
||||
let tcp_client = client.connect();
|
||||
|
||||
tcp_client.write(msg1);
|
||||
assert_eq!(tcp_client.read(), msg2.as_bytes());
|
||||
drop(tcp_client);
|
||||
assert_contains!(metrics.get("/metrics"), &expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg_attr(not(feature = "flaky_tests"), ignore)]
|
||||
fn outbound_tcp_received_bytes() {
|
||||
let _ = env_logger::try_init();
|
||||
let TcpFixture { client, metrics, proxy: _proxy } =
|
||||
TcpFixture::outbound();
|
||||
|
||||
let msg1 = "custom tcp hello";
|
||||
let msg2 = "custom tcp bye";
|
||||
let expected = format!(
|
||||
"received_bytes{{direction=\"outbound\"}} {}",
|
||||
msg1.len() + msg2.len()
|
||||
);
|
||||
|
||||
let tcp_client = client.connect();
|
||||
|
||||
tcp_client.write(msg1);
|
||||
assert_eq!(tcp_client.read(), msg2.as_bytes());
|
||||
drop(tcp_client);
|
||||
assert_contains!(metrics.get("/metrics"), &expected);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue