From 1d5ef1e4d583af782e2d3652c6574a968f8f7ac8 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Wed, 23 May 2018 16:02:44 -0700 Subject: [PATCH] proxy: Record HTTP latency at first data frame (#981) Currently, the proxy records a request's latency as the time between when a request is opened and when its response stream completes. This is not what we intend to record, especially when a response is long-lived. In order to record latency more accurately, we want to track the time at which the first response body frame is received (which is a close approximation of time-to-first-byte). Telemetry aggregation has been changed to use the first-frame time to compute latencies; tests have been updated to exercise this behavior; and the metrics documentation has been updated to reflect this change. Addresses #818 Relates to #980 --- proxy/benches/record.rs | 19 ++++++--- proxy/src/telemetry/event.rs | 2 + proxy/src/telemetry/metrics/record.rs | 34 +++++++++------- proxy/src/telemetry/sensor/http.rs | 57 ++++++++++++++------------- 4 files changed, 65 insertions(+), 47 deletions(-) diff --git a/proxy/benches/record.rs b/proxy/benches/record.rs index 372b09a96..fea9922a0 100644 --- a/proxy/benches/record.rs +++ b/proxy/benches/record.rs @@ -82,12 +82,15 @@ fn record_response_end(b: &mut Bencher) { let (_, rsp) = request("http://buoyant.io", &server, &client, 1); let request_open_at = Instant::now(); - let response_open_at = request_open_at + Duration::from_millis(300); + let response_open_at = request_open_at + Duration::from_millis(100); + let response_first_frame_at = response_open_at + Duration::from_millis(100); + let response_end_at = response_open_at + Duration::from_millis(100); let end = event::StreamResponseEnd { grpc_status: None, request_open_at, response_open_at, - response_end_at: response_open_at, + response_first_frame_at, + response_end_at, bytes_sent: 0, frames_sent: 0, }; @@ -115,8 +118,9 @@ fn record_one_conn_request(b: &mut Bencher) { let request_open_at = 
Instant::now(); let request_end_at = request_open_at + Duration::from_millis(10); - let response_open_at = request_open_at + Duration::from_millis(300); - let response_end_at = response_open_at; + let response_open_at = request_open_at + Duration::from_millis(100); + let response_first_frame_at = response_open_at + Duration::from_millis(100); + let response_end_at = response_open_at + Duration::from_millis(100); use Event::*; let events = vec![ @@ -136,6 +140,7 @@ fn record_one_conn_request(b: &mut Bencher) { grpc_status: None, request_open_at, response_open_at, + response_first_frame_at, response_end_at, bytes_sent: 0, frames_sent: 0, @@ -172,8 +177,9 @@ fn record_many_dsts(b: &mut Bencher) { let request_open_at = Instant::now(); let request_end_at = request_open_at + Duration::from_millis(10); - let response_open_at = request_open_at + Duration::from_millis(300); - let response_end_at = response_open_at; + let response_open_at = request_open_at + Duration::from_millis(100); + let response_first_frame_at = response_open_at + Duration::from_millis(100); + let response_end_at = response_open_at + Duration::from_millis(100); for n in 0..REQUESTS { let client = client(&proxy, vec![ @@ -201,6 +207,7 @@ fn record_many_dsts(b: &mut Bencher) { grpc_status: None, request_open_at, response_open_at, + response_first_frame_at, response_end_at, bytes_sent: 0, frames_sent: 0, diff --git a/proxy/src/telemetry/event.rs b/proxy/src/telemetry/event.rs index 0de8edf85..81cf8bf31 100644 --- a/proxy/src/telemetry/event.rs +++ b/proxy/src/telemetry/event.rs @@ -54,6 +54,7 @@ pub struct StreamResponseOpen { pub struct StreamResponseFail { pub request_open_at: Instant, pub response_open_at: Instant, + pub response_first_frame_at: Option, pub response_fail_at: Instant, pub error: h2::Reason, pub bytes_sent: u64, @@ -64,6 +65,7 @@ pub struct StreamResponseFail { pub struct StreamResponseEnd { pub request_open_at: Instant, pub response_open_at: Instant, + pub response_first_frame_at: 
Instant, pub response_end_at: Instant, pub grpc_status: Option, pub bytes_sent: u64, diff --git a/proxy/src/telemetry/metrics/record.rs b/proxy/src/telemetry/metrics/record.rs index e398204ec..a8301c3a9 100644 --- a/proxy/src/telemetry/metrics/record.rs +++ b/proxy/src/telemetry/metrics/record.rs @@ -51,18 +51,19 @@ impl Record { Event::StreamResponseOpen(_, _) => {}, Event::StreamResponseEnd(ref res, ref end) => { - let request_open_at = end.response_end_at - end.request_open_at; + let latency = end.response_first_frame_at - end.request_open_at; self.update(|metrics| { metrics.response(ResponseLabels::new(res, end.grpc_status)) - .end(request_open_at); + .end(latency); }); }, Event::StreamResponseFail(ref res, ref fail) => { // TODO: do we care about the failure's error code here? - let request_open_at = fail.response_fail_at - fail.request_open_at; + let first_frame_at = fail.response_first_frame_at.unwrap_or(fail.response_fail_at); + let latency = first_frame_at - fail.request_open_at; self.update(|metrics| { - metrics.response(ResponseLabels::fail(res)).end(request_open_at) + metrics.response(ResponseLabels::fail(res)).end(latency) }); }, @@ -110,12 +111,15 @@ mod test { let (_, rsp) = request("http://buoyant.io", &server, &client, 1); let request_open_at = Instant::now(); - let response_open_at = request_open_at + Duration::from_millis(300); + let response_open_at = request_open_at + Duration::from_millis(100); + let response_first_frame_at = response_open_at + Duration::from_millis(100); + let response_end_at = response_first_frame_at + Duration::from_millis(100); let end = event::StreamResponseEnd { grpc_status: None, request_open_at, response_open_at, - response_end_at: response_open_at, + response_first_frame_at, + response_end_at, bytes_sent: 0, frames_sent: 0, }; @@ -141,9 +145,9 @@ mod test { assert_eq!(scope.total(), 1); - scope.latency().assert_bucket_exactly(300, 1); - scope.latency().assert_lt_exactly(300, 0); - 
scope.latency().assert_gt_exactly(300, 0); + scope.latency().assert_bucket_exactly(200, 1); + scope.latency().assert_lt_exactly(200, 0); + scope.latency().assert_gt_exactly(200, 0); } } @@ -178,8 +182,9 @@ mod test { let request_open_at = Instant::now(); let request_end_at = request_open_at + Duration::from_millis(10); - let response_open_at = request_open_at + Duration::from_millis(300); - let response_end_at = response_open_at; + let response_open_at = request_open_at + Duration::from_millis(100); + let response_first_frame_at = response_open_at + Duration::from_millis(100); + let response_end_at = response_first_frame_at + Duration::from_millis(100); let events = vec![ TransportOpen(server_transport.clone()), TransportOpen(client_transport.clone()), @@ -197,6 +202,7 @@ mod test { grpc_status: None, request_open_at, response_open_at, + response_first_frame_at, response_end_at, bytes_sent: 0, frames_sent: 0, @@ -261,9 +267,9 @@ mod test { assert_eq!(response_scope.total(), 1); response_scope.latency() - .assert_bucket_exactly(300, 1) - .assert_gt_exactly(300, 0) - .assert_lt_exactly(300, 0); + .assert_bucket_exactly(200, 1) + .assert_gt_exactly(200, 0) + .assert_lt_exactly(200, 0); // === server transport open scope ====================== let srv_transport_scope = lock diff --git a/proxy/src/telemetry/sensor/http.rs b/proxy/src/telemetry/sensor/http.rs index 005adbcd5..895188e6f 100644 --- a/proxy/src/telemetry/sensor/http.rs +++ b/proxy/src/telemetry/sensor/http.rs @@ -87,10 +87,9 @@ pub struct MeasuredBody { /// The `inner` portion of a `MeasuredBody`, with differing implementations /// for request and response streams. 
pub trait BodySensor: Sized { + fn frame(&mut self, bytes: usize); fn fail(self, reason: h2::Reason); fn end(self, grpc_status: Option); - fn frames_sent(&mut self) -> &mut u32; - fn bytes_sent(&mut self) -> &mut u64; } #[derive(Debug)] @@ -101,6 +100,7 @@ pub struct ResponseBodyInner { frames_sent: u32, request_open_at: Instant, response_open_at: Instant, + response_first_frame_at: Option, } @@ -332,9 +332,10 @@ where Arc::clone(&ctx), event::StreamResponseEnd { grpc_status, - response_end_at: response_open_at, request_open_at, response_open_at, + response_first_frame_at: response_open_at, + response_end_at: response_open_at, bytes_sent: 0, frames_sent: 0, }, @@ -350,6 +351,7 @@ where frames_sent: 0, request_open_at, response_open_at, + response_first_frame_at: None, }) } }); @@ -438,15 +440,14 @@ where } fn poll_data(&mut self) -> Poll, h2::Error> { - let frame = try_ready!(self.sense_err(|b| b.poll_data())); - let frame = frame.map(|frame| { - let frame = frame.into_buf(); + let frame = try_ready!(self.sense_err(|b| b.poll_data())) + .map(|f| f.into_buf()); + + if let Some(ref f) = frame { if let Some(ref mut inner) = self.inner { - *inner.frames_sent() += 1; - *inner.bytes_sent() += frame.remaining() as u64; + inner.frame(f.remaining()); } - frame - }); + } // If the frame ended the stream, send the end of stream event now, // as we may not be polled again. @@ -510,12 +511,21 @@ where impl BodySensor for ResponseBodyInner { + fn frame(&mut self, bytes: usize) { + self.frames_sent += 1; + self.bytes_sent += bytes as u64; + if self.response_first_frame_at.is_none() { + self.response_first_frame_at = Some(Instant::now()); + } + } + fn fail(self, error: h2::Reason) { let ResponseBodyInner { ctx, mut handle, request_open_at, response_open_at, + response_first_frame_at, bytes_sent, frames_sent, .. 
@@ -528,6 +538,7 @@ impl BodySensor for ResponseBodyInner { error, request_open_at, response_open_at, + response_first_frame_at, response_fail_at: Instant::now(), bytes_sent, frames_sent, @@ -542,9 +553,11 @@ impl BodySensor for ResponseBodyInner { mut handle, request_open_at, response_open_at, + response_first_frame_at, bytes_sent, frames_sent, } = self; + let response_end_at = Instant::now(); handle.send(|| event::Event::StreamResponseEnd( @@ -553,25 +566,23 @@ impl BodySensor for ResponseBodyInner { grpc_status, request_open_at, response_open_at, - response_end_at: Instant::now(), + response_first_frame_at: response_first_frame_at.unwrap_or(response_end_at), + response_end_at, bytes_sent, frames_sent, }, ) ) } - - fn frames_sent(&mut self) -> &mut u32 { - &mut self.frames_sent - } - - fn bytes_sent(&mut self) -> &mut u64 { - &mut self.bytes_sent - } } impl BodySensor for RequestBodyInner { + fn frame(&mut self, bytes: usize) { + self.frames_sent += 1; + self.bytes_sent += bytes as u64; + } + fn fail(self, error: h2::Reason) { let RequestBodyInner { ctx, @@ -610,14 +621,6 @@ impl BodySensor for RequestBodyInner { ) ) } - - fn frames_sent(&mut self) -> &mut u32 { - &mut self.frames_sent - } - - fn bytes_sent(&mut self) -> &mut u64 { - &mut self.bytes_sent - } } impl TimestampRequestOpen {