mirror of https://github.com/linkerd/linkerd2.git
proxy: Add classifications to TCP close stats (#790)
This PR adds a `classification` label to transport level metrics collected on transport close. Like the `classification` label on HTTP response metrics, the value may be either `"success"` or `"failure"`. The label value is determined based on the `clean` field on the `TransportClose` event, which indicates whether a transport closed cleanly or due to an error. I've updated the tests for transport-level metrics to reflect the addition of the new label. I'd like to also modify the test support code to allow us to close transports with errors, in order to test that the errors are correctly classified as failures.
This commit is contained in:
parent
326d9f493c
commit
c19da70965
|
@ -39,6 +39,13 @@ impl Ctx {
|
|||
Ctx::Server(ref ctx) => &ctx.proxy,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn protocol(&self) -> Protocol {
|
||||
match *self {
|
||||
Ctx::Client(ref ctx) => ctx.protocol,
|
||||
Ctx::Server(ref ctx) => ctx.protocol,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Server {
|
||||
|
|
|
@ -6,6 +6,7 @@ use std::sync::Arc;
|
|||
use http;
|
||||
|
||||
use ctx;
|
||||
use telemetry::event;
|
||||
|
||||
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
|
||||
pub struct RequestLabels {
|
||||
|
@ -45,6 +46,14 @@ pub struct TransportLabels {
|
|||
direction: Direction,
|
||||
}
|
||||
|
||||
/// Labels describing the end of a TCP connection
|
||||
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
|
||||
pub struct TransportCloseLabels {
|
||||
transport: TransportLabels,
|
||||
|
||||
classification: Classification,
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
|
||||
enum Classification {
|
||||
Success,
|
||||
|
@ -173,6 +182,14 @@ impl Classification {
|
|||
.unwrap_or_else(|| Classification::http_status(&rsp.status))
|
||||
}
|
||||
|
||||
fn transport_close(close: &event::TransportClose) -> Self {
|
||||
if close.clean {
|
||||
Classification::Success
|
||||
} else {
|
||||
Classification::Failure
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
impl fmt::Display for Classification {
|
||||
|
@ -263,6 +280,7 @@ impl fmt::Display for DstLabels {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
// ===== impl TransportLabels =====
|
||||
|
||||
impl TransportLabels {
|
||||
|
@ -275,6 +293,26 @@ impl TransportLabels {
|
|||
|
||||
impl fmt::Display for TransportLabels {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "{}", self.direction)
|
||||
fmt::Display::fmt(&self.direction, f)
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl TransportCloseLabels =====
|
||||
|
||||
impl TransportCloseLabels {
|
||||
pub fn new(ctx: &ctx::transport::Ctx,
|
||||
close: &event::TransportClose)
|
||||
-> Self {
|
||||
TransportCloseLabels {
|
||||
transport: TransportLabels::new(ctx),
|
||||
classification: Classification::transport_close(close),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for TransportCloseLabels {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "{},{}", self.transport, self.classification)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -49,7 +49,12 @@ use telemetry::event::Event;
|
|||
mod labels;
|
||||
mod latency;
|
||||
|
||||
use self::labels::{RequestLabels, ResponseLabels, TransportLabels};
|
||||
use self::labels::{
|
||||
RequestLabels,
|
||||
ResponseLabels,
|
||||
TransportLabels,
|
||||
TransportCloseLabels
|
||||
};
|
||||
use self::latency::{BUCKET_BOUNDS, Histogram};
|
||||
pub use self::labels::DstLabels;
|
||||
|
||||
|
@ -63,15 +68,15 @@ struct Metrics {
|
|||
response_latency: Metric<Histogram, Arc<ResponseLabels>>,
|
||||
|
||||
tcp_accept_open_total: Metric<Counter, Arc<TransportLabels>>,
|
||||
tcp_accept_close_total: Metric<Counter, Arc<TransportLabels>>,
|
||||
tcp_accept_close_total: Metric<Counter, Arc<TransportCloseLabels>>,
|
||||
|
||||
tcp_connect_open_total: Metric<Counter, Arc<TransportLabels>>,
|
||||
tcp_connect_close_total: Metric<Counter, Arc<TransportLabels>>,
|
||||
tcp_connect_close_total: Metric<Counter, Arc<TransportCloseLabels>>,
|
||||
|
||||
tcp_connection_duration: Metric<Histogram, Arc<TransportLabels>>,
|
||||
tcp_connection_duration: Metric<Histogram, Arc<TransportCloseLabels>>,
|
||||
|
||||
sent_bytes: Metric<Counter, Arc<TransportLabels>>,
|
||||
received_bytes: Metric<Counter, Arc<TransportLabels>>,
|
||||
sent_bytes: Metric<Counter, Arc<TransportCloseLabels>>,
|
||||
received_bytes: Metric<Counter, Arc<TransportCloseLabels>>,
|
||||
|
||||
start_time: u64,
|
||||
}
|
||||
|
@ -179,7 +184,7 @@ impl Metrics {
|
|||
have been accepted by the proxy.",
|
||||
);
|
||||
|
||||
let tcp_accept_close_total = Metric::<Counter, Arc<TransportLabels>>::new(
|
||||
let tcp_accept_close_total = Metric::<Counter, Arc<TransportCloseLabels>>::new(
|
||||
"tcp_accept_close_total",
|
||||
"A counter of the total number of transport connections accepted \
|
||||
by the proxy which have been closed.",
|
||||
|
@ -191,23 +196,23 @@ impl Metrics {
|
|||
have been opened by the proxy.",
|
||||
);
|
||||
|
||||
let tcp_connect_close_total = Metric::<Counter, Arc<TransportLabels>>::new(
|
||||
let tcp_connect_close_total = Metric::<Counter, Arc<TransportCloseLabels>>::new(
|
||||
"tcp_connect_close_total",
|
||||
"A counter of the total number of transport connections opened \
|
||||
by the proxy which have been closed.",
|
||||
);
|
||||
|
||||
let tcp_connection_duration = Metric::<Histogram, Arc<TransportLabels>>::new(
|
||||
let tcp_connection_duration = Metric::<Histogram, Arc<TransportCloseLabels>>::new(
|
||||
"tcp_connection_duration_ms",
|
||||
"A histogram of the duration of the lifetime of a connection, in milliseconds",
|
||||
);
|
||||
|
||||
let received_bytes = Metric::<Counter, Arc<TransportLabels>>::new(
|
||||
let received_bytes = Metric::<Counter, Arc<TransportCloseLabels>>::new(
|
||||
"received_bytes",
|
||||
"A counter of the total number of recieved bytes."
|
||||
);
|
||||
|
||||
let sent_bytes = Metric::<Counter, Arc<TransportLabels>>::new(
|
||||
let sent_bytes = Metric::<Counter, Arc<TransportCloseLabels>>::new(
|
||||
"sent_bytes",
|
||||
"A counter of the total number of sent bytes."
|
||||
);
|
||||
|
@ -274,11 +279,11 @@ impl Metrics {
|
|||
-> &mut Counter {
|
||||
self.tcp_accept_open_total.values
|
||||
.entry(labels.clone())
|
||||
.or_insert_with(Counter::default)
|
||||
.or_insert_with(Default::default)
|
||||
}
|
||||
|
||||
fn tcp_accept_close_total(&mut self,
|
||||
labels: &Arc<TransportLabels>)
|
||||
labels: &Arc<TransportCloseLabels>)
|
||||
-> &mut Counter {
|
||||
self.tcp_accept_close_total.values
|
||||
.entry(labels.clone())
|
||||
|
@ -290,11 +295,11 @@ impl Metrics {
|
|||
-> &mut Counter {
|
||||
self.tcp_connect_open_total.values
|
||||
.entry(labels.clone())
|
||||
.or_insert_with(Counter::default)
|
||||
.or_insert_with(Default::default)
|
||||
}
|
||||
|
||||
fn tcp_connect_close_total(&mut self,
|
||||
labels: &Arc<TransportLabels>)
|
||||
labels: &Arc<TransportCloseLabels>)
|
||||
-> &mut Counter {
|
||||
self.tcp_connect_close_total.values
|
||||
.entry(labels.clone())
|
||||
|
@ -302,7 +307,7 @@ impl Metrics {
|
|||
}
|
||||
|
||||
fn tcp_connection_duration(&mut self,
|
||||
labels: &Arc<TransportLabels>)
|
||||
labels: &Arc<TransportCloseLabels>)
|
||||
-> &mut Histogram {
|
||||
self.tcp_connection_duration.values
|
||||
.entry(labels.clone())
|
||||
|
@ -310,7 +315,7 @@ impl Metrics {
|
|||
}
|
||||
|
||||
fn sent_bytes(&mut self,
|
||||
labels: &Arc<TransportLabels>)
|
||||
labels: &Arc<TransportCloseLabels>)
|
||||
-> &mut Counter {
|
||||
self.sent_bytes.values
|
||||
.entry(labels.clone())
|
||||
|
@ -318,7 +323,7 @@ impl Metrics {
|
|||
}
|
||||
|
||||
fn received_bytes(&mut self,
|
||||
labels: &Arc<TransportLabels>)
|
||||
labels: &Arc<TransportCloseLabels>)
|
||||
-> &mut Counter {
|
||||
self.received_bytes.values
|
||||
.entry(labels.clone())
|
||||
|
@ -552,9 +557,7 @@ impl Aggregate {
|
|||
},
|
||||
|
||||
Event::TransportClose(ref ctx, ref close) => {
|
||||
// TODO: use the `clean` field in `close` to record whether or not
|
||||
// there was an error.
|
||||
let labels = Arc::new(TransportLabels::new(ctx));
|
||||
let labels = Arc::new(TransportCloseLabels::new(ctx, close));
|
||||
self.update(|metrics| {
|
||||
*metrics.tcp_connection_duration(&labels) += close.duration;
|
||||
*metrics.sent_bytes(&labels) += close.tx_bytes as u64;
|
||||
|
|
|
@ -732,7 +732,7 @@ mod transport {
|
|||
// drop the client to force the connection to close.
|
||||
drop(client);
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"tcp_accept_close_total{direction=\"inbound\"} 1"
|
||||
"tcp_accept_close_total{direction=\"inbound\",classification=\"success\"} 1"
|
||||
);
|
||||
|
||||
// create a new client to force a new connection
|
||||
|
@ -746,7 +746,7 @@ mod transport {
|
|||
// drop the client to force the connection to close.
|
||||
drop(client);
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"tcp_accept_close_total{direction=\"inbound\"} 2"
|
||||
"tcp_accept_close_total{direction=\"inbound\",classification=\"success\"} 2"
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -785,7 +785,7 @@ mod transport {
|
|||
// drop the client to force the connection to close.
|
||||
drop(client);
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"tcp_accept_close_total{direction=\"outbound\"} 1"
|
||||
"tcp_accept_close_total{direction=\"outbound\",classification=\"success\"} 1"
|
||||
);
|
||||
|
||||
// create a new client to force a new connection
|
||||
|
@ -799,7 +799,7 @@ mod transport {
|
|||
// drop the client to force the connection to close.
|
||||
drop(client);
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"tcp_accept_close_total{direction=\"outbound\"} 2"
|
||||
"tcp_accept_close_total{direction=\"outbound\",classification=\"success\"} 2"
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -821,7 +821,7 @@ mod transport {
|
|||
assert_eq!(client2.get("/"), "hello");
|
||||
// server connection should be pooled
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"tcp_connect_open_total{direction=\"outbound\"} 1");
|
||||
"tcp_connect_open_total{direction=\"outbound\",classification=\"success\"} 1");
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -843,7 +843,6 @@ mod transport {
|
|||
}
|
||||
|
||||
#[test]
|
||||
#[cfg_attr(not(feature = "flaky_tests"), ignore)]
|
||||
fn inbound_tcp_accept() {
|
||||
let _ = env_logger::try_init();
|
||||
let TcpFixture { client, metrics, proxy: _proxy } =
|
||||
|
@ -862,7 +861,7 @@ mod transport {
|
|||
|
||||
drop(tcp_client);
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"tcp_accept_close_total{direction=\"inbound\"} 1");
|
||||
"tcp_accept_close_total{direction=\"inbound\",classification=\"success\"} 1");
|
||||
|
||||
let tcp_client = client.connect();
|
||||
|
||||
|
@ -873,11 +872,10 @@ mod transport {
|
|||
"tcp_accept_open_total{direction=\"inbound\"} 2");
|
||||
drop(tcp_client);
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"tcp_accept_close_total{direction=\"inbound\"} 2");
|
||||
"tcp_accept_close_total{direction=\"inbound\",classification=\"success\"} 2");
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg_attr(not(feature = "flaky_tests"), ignore)]
|
||||
fn inbound_tcp_duration() {
|
||||
let _ = env_logger::try_init();
|
||||
let TcpFixture { client, metrics, proxy: _proxy } =
|
||||
|
@ -893,18 +891,18 @@ mod transport {
|
|||
drop(tcp_client);
|
||||
// TODO: make assertions about buckets
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"tcp_connection_duration_ms_count{direction=\"inbound\"} 2");
|
||||
"tcp_connection_duration_ms_count{direction=\"inbound\",classification=\"success\"} 2");
|
||||
|
||||
let tcp_client = client.connect();
|
||||
|
||||
tcp_client.write(msg1);
|
||||
assert_eq!(tcp_client.read(), msg2.as_bytes());
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"tcp_connection_duration_ms_count{direction=\"inbound\"} 2");
|
||||
"tcp_connection_duration_ms_count{direction=\"inbound\",classification=\"success\"} 2");
|
||||
|
||||
drop(tcp_client);
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"tcp_connection_duration_ms_count{direction=\"inbound\"} 4");
|
||||
"tcp_connection_duration_ms_count{direction=\"inbound\",classification=\"success\"} 4");
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -917,7 +915,7 @@ mod transport {
|
|||
let msg1 = "custom tcp hello";
|
||||
let msg2 = "custom tcp bye";
|
||||
let expected = format!(
|
||||
"sent_bytes{{direction=\"inbound\"}} {}",
|
||||
"sent_bytes{{direction=\"inbound\",classification=\"success\"}} {}",
|
||||
msg1.len() + msg2.len()
|
||||
);
|
||||
|
||||
|
@ -939,7 +937,7 @@ mod transport {
|
|||
let msg1 = "custom tcp hello";
|
||||
let msg2 = "custom tcp bye";
|
||||
let expected = format!(
|
||||
"received_bytes{{direction=\"inbound\"}} {}",
|
||||
"sent_bytes{{direction=\"inbound\",classification=\"success\"}} {}",
|
||||
msg1.len() + msg2.len()
|
||||
);
|
||||
|
||||
|
@ -989,7 +987,7 @@ mod transport {
|
|||
|
||||
drop(tcp_client);
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"tcp_accept_close_total{direction=\"outbound\"} 1");
|
||||
"tcp_accept_close_total{direction=\"outbound\",classification=\"success\"} 1");
|
||||
|
||||
let tcp_client = client.connect();
|
||||
|
||||
|
@ -1000,7 +998,7 @@ mod transport {
|
|||
"tcp_accept_open_total{direction=\"outbound\"} 2");
|
||||
drop(tcp_client);
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"tcp_accept_close_total{direction=\"outbound\"} 2");
|
||||
"tcp_accept_close_total{direction=\"outbound\",classification=\"success\"} 2");
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -1020,18 +1018,18 @@ mod transport {
|
|||
drop(tcp_client);
|
||||
// TODO: make assertions about buckets
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"tcp_connection_duration_ms_count{direction=\"outbound\"} 2");
|
||||
"tcp_connection_duration_ms_count{direction=\"outbound\",classification=\"success\"} 2");
|
||||
|
||||
let tcp_client = client.connect();
|
||||
|
||||
tcp_client.write(msg1);
|
||||
assert_eq!(tcp_client.read(), msg2.as_bytes());
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"tcp_connection_duration_ms_count{direction=\"outbound\"} 2");
|
||||
"tcp_connection_duration_ms_count{direction=\"outbound\",classification=\"success\"} 2");
|
||||
|
||||
drop(tcp_client);
|
||||
assert_contains!(metrics.get("/metrics"),
|
||||
"tcp_connection_duration_ms_count{direction=\"outbound\"} 4");
|
||||
"tcp_connection_duration_ms_count{direction=\"outbound\",classification=\"success\"} 4");
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -1044,7 +1042,7 @@ mod transport {
|
|||
let msg1 = "custom tcp hello";
|
||||
let msg2 = "custom tcp bye";
|
||||
let expected = format!(
|
||||
"sent_bytes{{direction=\"outbound\"}} {}",
|
||||
"sent_bytes{{direction=\"outbound\",classification=\"success\"}} {}",
|
||||
msg1.len() + msg2.len()
|
||||
);
|
||||
|
||||
|
@ -1066,7 +1064,7 @@ mod transport {
|
|||
let msg1 = "custom tcp hello";
|
||||
let msg2 = "custom tcp bye";
|
||||
let expected = format!(
|
||||
"received_bytes{{direction=\"outbound\"}} {}",
|
||||
"received_bytes{{direction=\"outbound\",classification=\"success\"}} {}",
|
||||
msg1.len() + msg2.len()
|
||||
);
|
||||
|
||||
|
|
Loading…
Reference in New Issue