proxy: Record HTTP latency at first data frame (#981)

Currently, the proxy records a request's latency as the time between
when a request is opened and when its response stream completes. This is
not what we intend to record, especially when a response is long-lived.

In order to more accurately record latency, we want to track the time at
which the first response body frame is received (which is a close
approximation of time-to-first-byte).

Telemetry aggregation has been changed to use the first-frame time to
compute latencies; tests have been updated to exercise this behavior; and
the metrics documentation has been updated to reflect this change.

Addresses #818 
Relates to #980
This commit is contained in:
Oliver Gould 2018-05-23 16:02:44 -07:00 committed by GitHub
parent f41e74fd2c
commit 1d5ef1e4d5
4 changed files with 65 additions and 47 deletions

View File

@ -82,12 +82,15 @@ fn record_response_end(b: &mut Bencher) {
let (_, rsp) = request("http://buoyant.io", &server, &client, 1);
let request_open_at = Instant::now();
let response_open_at = request_open_at + Duration::from_millis(300);
let response_open_at = request_open_at + Duration::from_millis(100);
let response_first_frame_at = response_open_at + Duration::from_millis(100);
let response_end_at = response_open_at + Duration::from_millis(100);
let end = event::StreamResponseEnd {
grpc_status: None,
request_open_at,
response_open_at,
response_end_at: response_open_at,
response_first_frame_at,
response_end_at,
bytes_sent: 0,
frames_sent: 0,
};
@ -115,8 +118,9 @@ fn record_one_conn_request(b: &mut Bencher) {
let request_open_at = Instant::now();
let request_end_at = request_open_at + Duration::from_millis(10);
let response_open_at = request_open_at + Duration::from_millis(300);
let response_end_at = response_open_at;
let response_open_at = request_open_at + Duration::from_millis(100);
let response_first_frame_at = response_open_at + Duration::from_millis(100);
let response_end_at = response_open_at + Duration::from_millis(100);
use Event::*;
let events = vec![
@ -136,6 +140,7 @@ fn record_one_conn_request(b: &mut Bencher) {
grpc_status: None,
request_open_at,
response_open_at,
response_first_frame_at,
response_end_at,
bytes_sent: 0,
frames_sent: 0,
@ -172,8 +177,9 @@ fn record_many_dsts(b: &mut Bencher) {
let request_open_at = Instant::now();
let request_end_at = request_open_at + Duration::from_millis(10);
let response_open_at = request_open_at + Duration::from_millis(300);
let response_end_at = response_open_at;
let response_open_at = request_open_at + Duration::from_millis(100);
let response_first_frame_at = response_open_at + Duration::from_millis(100);
let response_end_at = response_open_at + Duration::from_millis(100);
for n in 0..REQUESTS {
let client = client(&proxy, vec![
@ -201,6 +207,7 @@ fn record_many_dsts(b: &mut Bencher) {
grpc_status: None,
request_open_at,
response_open_at,
response_first_frame_at,
response_end_at,
bytes_sent: 0,
frames_sent: 0,

View File

@ -54,6 +54,7 @@ pub struct StreamResponseOpen {
pub struct StreamResponseFail {
pub request_open_at: Instant,
pub response_open_at: Instant,
pub response_first_frame_at: Option<Instant>,
pub response_fail_at: Instant,
pub error: h2::Reason,
pub bytes_sent: u64,
@ -64,6 +65,7 @@ pub struct StreamResponseFail {
pub struct StreamResponseEnd {
pub request_open_at: Instant,
pub response_open_at: Instant,
pub response_first_frame_at: Instant,
pub response_end_at: Instant,
pub grpc_status: Option<u32>,
pub bytes_sent: u64,

View File

@ -51,18 +51,19 @@ impl Record {
Event::StreamResponseOpen(_, _) => {},
Event::StreamResponseEnd(ref res, ref end) => {
let request_open_at = end.response_end_at - end.request_open_at;
let latency = end.response_first_frame_at - end.request_open_at;
self.update(|metrics| {
metrics.response(ResponseLabels::new(res, end.grpc_status))
.end(request_open_at);
.end(latency);
});
},
Event::StreamResponseFail(ref res, ref fail) => {
// TODO: do we care about the failure's error code here?
let request_open_at = fail.response_fail_at - fail.request_open_at;
let first_frame_at = fail.response_first_frame_at.unwrap_or(fail.response_fail_at);
let latency = first_frame_at - fail.request_open_at;
self.update(|metrics| {
metrics.response(ResponseLabels::fail(res)).end(request_open_at)
metrics.response(ResponseLabels::fail(res)).end(latency)
});
},
@ -110,12 +111,15 @@ mod test {
let (_, rsp) = request("http://buoyant.io", &server, &client, 1);
let request_open_at = Instant::now();
let response_open_at = request_open_at + Duration::from_millis(300);
let response_open_at = request_open_at + Duration::from_millis(100);
let response_first_frame_at = response_open_at + Duration::from_millis(100);
let response_end_at = response_first_frame_at + Duration::from_millis(100);
let end = event::StreamResponseEnd {
grpc_status: None,
request_open_at,
response_open_at,
response_end_at: response_open_at,
response_first_frame_at,
response_end_at,
bytes_sent: 0,
frames_sent: 0,
};
@ -141,9 +145,9 @@ mod test {
assert_eq!(scope.total(), 1);
scope.latency().assert_bucket_exactly(300, 1);
scope.latency().assert_lt_exactly(300, 0);
scope.latency().assert_gt_exactly(300, 0);
scope.latency().assert_bucket_exactly(200, 1);
scope.latency().assert_lt_exactly(200, 0);
scope.latency().assert_gt_exactly(200, 0);
}
}
@ -178,8 +182,9 @@ mod test {
let request_open_at = Instant::now();
let request_end_at = request_open_at + Duration::from_millis(10);
let response_open_at = request_open_at + Duration::from_millis(300);
let response_end_at = response_open_at;
let response_open_at = request_open_at + Duration::from_millis(100);
let response_first_frame_at = response_open_at + Duration::from_millis(100);
let response_end_at = response_first_frame_at + Duration::from_millis(100);
let events = vec![
TransportOpen(server_transport.clone()),
TransportOpen(client_transport.clone()),
@ -197,6 +202,7 @@ mod test {
grpc_status: None,
request_open_at,
response_open_at,
response_first_frame_at,
response_end_at,
bytes_sent: 0,
frames_sent: 0,
@ -261,9 +267,9 @@ mod test {
assert_eq!(response_scope.total(), 1);
response_scope.latency()
.assert_bucket_exactly(300, 1)
.assert_gt_exactly(300, 0)
.assert_lt_exactly(300, 0);
.assert_bucket_exactly(200, 1)
.assert_gt_exactly(200, 0)
.assert_lt_exactly(200, 0);
// === server transport open scope ======================
let srv_transport_scope = lock

View File

@ -87,10 +87,9 @@ pub struct MeasuredBody<B, I: BodySensor> {
/// The `inner` portion of a `MeasuredBody`, with differing implementations
/// for request and response streams.
pub trait BodySensor: Sized {
fn frame(&mut self, bytes: usize);
fn fail(self, reason: h2::Reason);
fn end(self, grpc_status: Option<u32>);
fn frames_sent(&mut self) -> &mut u32;
fn bytes_sent(&mut self) -> &mut u64;
}
#[derive(Debug)]
@ -101,6 +100,7 @@ pub struct ResponseBodyInner {
frames_sent: u32,
request_open_at: Instant,
response_open_at: Instant,
response_first_frame_at: Option<Instant>,
}
@ -332,9 +332,10 @@ where
Arc::clone(&ctx),
event::StreamResponseEnd {
grpc_status,
response_end_at: response_open_at,
request_open_at,
response_open_at,
response_first_frame_at: response_open_at,
response_end_at: response_open_at,
bytes_sent: 0,
frames_sent: 0,
},
@ -350,6 +351,7 @@ where
frames_sent: 0,
request_open_at,
response_open_at,
response_first_frame_at: None,
})
}
});
@ -438,15 +440,14 @@ where
}
fn poll_data(&mut self) -> Poll<Option<Self::Data>, h2::Error> {
let frame = try_ready!(self.sense_err(|b| b.poll_data()));
let frame = frame.map(|frame| {
let frame = frame.into_buf();
let frame = try_ready!(self.sense_err(|b| b.poll_data()))
.map(|f| f.into_buf());
if let Some(ref f) = frame {
if let Some(ref mut inner) = self.inner {
*inner.frames_sent() += 1;
*inner.bytes_sent() += frame.remaining() as u64;
inner.frame(f.remaining());
}
frame
});
}
// If the frame ended the stream, send the end of stream event now,
// as we may not be polled again.
@ -510,12 +511,21 @@ where
impl BodySensor for ResponseBodyInner {
fn frame(&mut self, bytes: usize) {
self.frames_sent += 1;
self.bytes_sent += bytes as u64;
if self.response_first_frame_at.is_none() {
self.response_first_frame_at = Some(Instant::now());
}
}
fn fail(self, error: h2::Reason) {
let ResponseBodyInner {
ctx,
mut handle,
request_open_at,
response_open_at,
response_first_frame_at,
bytes_sent,
frames_sent,
..
@ -528,6 +538,7 @@ impl BodySensor for ResponseBodyInner {
error,
request_open_at,
response_open_at,
response_first_frame_at,
response_fail_at: Instant::now(),
bytes_sent,
frames_sent,
@ -542,9 +553,11 @@ impl BodySensor for ResponseBodyInner {
mut handle,
request_open_at,
response_open_at,
response_first_frame_at,
bytes_sent,
frames_sent,
} = self;
let response_end_at = Instant::now();
handle.send(||
event::Event::StreamResponseEnd(
@ -553,25 +566,23 @@ impl BodySensor for ResponseBodyInner {
grpc_status,
request_open_at,
response_open_at,
response_end_at: Instant::now(),
response_first_frame_at: response_first_frame_at.unwrap_or(response_end_at),
response_end_at,
bytes_sent,
frames_sent,
},
)
)
}
fn frames_sent(&mut self) -> &mut u32 {
&mut self.frames_sent
}
fn bytes_sent(&mut self) -> &mut u64 {
&mut self.bytes_sent
}
}
impl BodySensor for RequestBodyInner {
fn frame(&mut self, bytes: usize) {
self.frames_sent += 1;
self.bytes_sent += bytes as u64;
}
fn fail(self, error: h2::Reason) {
let RequestBodyInner {
ctx,
@ -610,14 +621,6 @@ impl BodySensor for RequestBodyInner {
)
)
}
fn frames_sent(&mut self) -> &mut u32 {
&mut self.frames_sent
}
fn bytes_sent(&mut self) -> &mut u64 {
&mut self.bytes_sent
}
}
impl<S> TimestampRequestOpen<S> {