From afa7fef976326d533181182932f0b5494b765ee0 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Tue, 22 May 2018 14:57:00 -0700 Subject: [PATCH] proxy: Alter telemetry to use discrete instants (#980) Proxy tasks emit events to the telemetry system. These events are used to aggregate counts and latencies, as well as to inform Tap requests. Initially, these events included durations, describing the relevant time that elapsed between this event and another. This approach is somewhat inflexible -- it unnecessarily constrains the set of measurements that can be computed in the telemetry system. To remedy this, the `Event` types can be changed to report discrete `Instant`s (rather than `Duration`s). Then, when latencies are computed in the telemetry system, these discrete instants can be compared to produce durations. There are no functional changes in this PR. --- proxy/benches/record.rs | 41 +++++++++++---- proxy/src/control/pb.rs | 12 ++--- proxy/src/telemetry/event.rs | 21 +++++--- proxy/src/telemetry/metrics/record.rs | 32 +++++++---- proxy/src/telemetry/sensor/http.rs | 76 +++++++++++++++------------ 5 files changed, 113 insertions(+), 69 deletions(-) diff --git a/proxy/benches/record.rs b/proxy/benches/record.rs index 500a94285..372b09a96 100644 --- a/proxy/benches/record.rs +++ b/proxy/benches/record.rs @@ -18,7 +18,7 @@ use std::{ fmt, net::SocketAddr, sync::Arc, - time::{Duration, SystemTime}, + time::{Duration, Instant, SystemTime}, }; use test::Bencher; @@ -81,10 +81,13 @@ fn record_response_end(b: &mut Bencher) { let (_, rsp) = request("http://buoyant.io", &server, &client, 1); + let request_open_at = Instant::now(); + let response_open_at = request_open_at + Duration::from_millis(300); let end = event::StreamResponseEnd { grpc_status: None, - since_request_open: Duration::from_millis(300), - since_response_open: Duration::from_millis(0), + request_open_at, + response_open_at, + response_end_at: response_open_at, bytes_sent: 0, frames_sent: 0, }; @@ -110,22 
+113,30 @@ fn record_one_conn_request(b: &mut Bencher) { let server_transport = Arc::new(ctx::transport::Ctx::Server(server)); let client_transport = Arc::new(ctx::transport::Ctx::Client(client)); + let request_open_at = Instant::now(); + let request_end_at = request_open_at + Duration::from_millis(10); + let response_open_at = request_open_at + Duration::from_millis(300); + let response_end_at = response_open_at; + use Event::*; let events = vec![ TransportOpen(server_transport.clone()), TransportOpen(client_transport.clone()), StreamRequestOpen(req.clone()), StreamRequestEnd(req.clone(), event::StreamRequestEnd { - since_request_open: Duration::from_millis(10), + request_open_at, + request_end_at, }), StreamResponseOpen(rsp.clone(), event::StreamResponseOpen { - since_request_open: Duration::from_millis(300), + request_open_at, + response_open_at, }), StreamResponseEnd(rsp.clone(), event::StreamResponseEnd { grpc_status: None, - since_request_open: Duration::from_millis(300), - since_response_open: Duration::from_millis(0), + request_open_at, + response_open_at, + response_end_at, bytes_sent: 0, frames_sent: 0, }), @@ -159,6 +170,11 @@ fn record_many_dsts(b: &mut Bencher) { let mut events = Vec::new(); events.push(TransportOpen(server_transport.clone())); + let request_open_at = Instant::now(); + let request_end_at = request_open_at + Duration::from_millis(10); + let response_open_at = request_open_at + Duration::from_millis(300); + let response_end_at = response_open_at; + for n in 0..REQUESTS { let client = client(&proxy, vec![ ("service".into(), format!("svc{}", n)), @@ -173,16 +189,19 @@ fn record_many_dsts(b: &mut Bencher) { events.push(StreamRequestOpen(req.clone())); events.push(StreamRequestEnd(req.clone(), event::StreamRequestEnd { - since_request_open: Duration::from_millis(10), + request_open_at, + request_end_at, })); events.push(StreamResponseOpen(rsp.clone(), event::StreamResponseOpen { - since_request_open: Duration::from_millis(300), + 
request_open_at, + response_open_at, })); events.push(StreamResponseEnd(rsp.clone(), event::StreamResponseEnd { grpc_status: None, - since_request_open: Duration::from_millis(300), - since_response_open: Duration::from_millis(0), + request_open_at, + response_open_at, + response_end_at, bytes_sent: 0, frames_sent: 0, })); diff --git a/proxy/src/control/pb.rs b/proxy/src/control/pb.rs index 6bb314136..a9836fa84 100644 --- a/proxy/src/control/pb.rs +++ b/proxy/src/control/pb.rs @@ -39,8 +39,8 @@ impl event::StreamResponseEnd { base: 0, // TODO FIXME stream: ctx.id as u64, }), - since_request_init: Some(pb_duration(&self.since_request_open)), - since_response_init: Some(pb_duration(&self.since_response_open)), + since_request_init: Some(pb_elapsed(self.request_open_at, self.response_end_at)), + since_response_init: Some(pb_elapsed(self.response_open_at, self.response_end_at)), response_bytes: self.bytes_sent, eos, }; @@ -71,8 +71,8 @@ impl event::StreamResponseFail { base: 0, // TODO FIXME stream: ctx.id as u64, }), - since_request_init: Some(pb_duration(&self.since_request_open)), - since_response_init: Some(pb_duration(&self.since_response_open)), + since_request_init: Some(pb_elapsed(self.request_open_at, self.response_fail_at)), + since_response_init: Some(pb_elapsed(self.response_open_at, self.response_fail_at)), response_bytes: self.bytes_sent, eos: Some(self.error.into()), }; @@ -103,7 +103,7 @@ impl event::StreamRequestFail { base: 0, // TODO FIXME stream: ctx.id as u64, }), - since_request_init: Some(pb_duration(&self.since_request_open)), + since_request_init: Some(pb_elapsed(self.request_open_at, self.request_fail_at)), since_response_init: None, response_bytes: 0, eos: Some(self.error.into()), @@ -172,7 +172,7 @@ impl<'a> TryFrom<&'a Event> for common::TapEvent { // TODO FIXME stream: ctx.request.id as u64, }), - since_request_init: Some(pb_duration(&rsp.since_request_open)), + since_request_init: Some(pb_elapsed(rsp.request_open_at, 
rsp.response_open_at)), http_status: u32::from(ctx.status.as_u16()), }; diff --git a/proxy/src/telemetry/event.rs b/proxy/src/telemetry/event.rs index f07c0fa09..0de8edf85 100644 --- a/proxy/src/telemetry/event.rs +++ b/proxy/src/telemetry/event.rs @@ -1,5 +1,5 @@ use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use h2; @@ -33,24 +33,28 @@ pub struct TransportClose { #[derive(Clone, Debug)] pub struct StreamRequestFail { - pub since_request_open: Duration, + pub request_open_at: Instant, + pub request_fail_at: Instant, pub error: h2::Reason, } #[derive(Clone, Debug)] pub struct StreamRequestEnd { - pub since_request_open: Duration, + pub request_open_at: Instant, + pub request_end_at: Instant, } #[derive(Clone, Debug)] pub struct StreamResponseOpen { - pub since_request_open: Duration, + pub request_open_at: Instant, + pub response_open_at: Instant, } #[derive(Clone, Debug)] pub struct StreamResponseFail { - pub since_request_open: Duration, - pub since_response_open: Duration, + pub request_open_at: Instant, + pub response_open_at: Instant, + pub response_fail_at: Instant, pub error: h2::Reason, pub bytes_sent: u64, pub frames_sent: u32, @@ -58,9 +62,10 @@ pub struct StreamResponseFail { #[derive(Clone, Debug)] pub struct StreamResponseEnd { + pub request_open_at: Instant, + pub response_open_at: Instant, + pub response_end_at: Instant, pub grpc_status: Option, - pub since_request_open: Duration, - pub since_response_open: Duration, pub bytes_sent: u64, pub frames_sent: u32, } diff --git a/proxy/src/telemetry/metrics/record.rs b/proxy/src/telemetry/metrics/record.rs index bd844526a..e398204ec 100644 --- a/proxy/src/telemetry/metrics/record.rs +++ b/proxy/src/telemetry/metrics/record.rs @@ -51,16 +51,18 @@ impl Record { Event::StreamResponseOpen(_, _) => {}, Event::StreamResponseEnd(ref res, ref end) => { + let request_open_at = end.response_end_at - end.request_open_at; self.update(|metrics| { 
metrics.response(ResponseLabels::new(res, end.grpc_status)) - .end(end.since_request_open); + .end(request_open_at); }); }, Event::StreamResponseFail(ref res, ref fail) => { // TODO: do we care about the failure's error code here? + let request_open_at = fail.response_fail_at - fail.request_open_at; self.update(|metrics| { - metrics.response(ResponseLabels::fail(res)).end(fail.since_request_open) + metrics.response(ResponseLabels::fail(res)).end(request_open_at) }); }, @@ -91,7 +93,7 @@ mod test { Event, }; use ctx::{self, test_util::* }; - use std::time::Duration; + use std::time::{Duration, Instant}; #[test] fn record_response_end() { @@ -107,10 +109,13 @@ mod test { let (_, rsp) = request("http://buoyant.io", &server, &client, 1); + let request_open_at = Instant::now(); + let response_open_at = request_open_at + Duration::from_millis(300); let end = event::StreamResponseEnd { grpc_status: None, - since_request_open: Duration::from_millis(300), - since_response_open: Duration::from_millis(0), + request_open_at, + response_open_at, + response_end_at: response_open_at, bytes_sent: 0, frames_sent: 0, }; @@ -171,25 +176,32 @@ mod test { tx_bytes: 4321, }; + let request_open_at = Instant::now(); + let request_end_at = request_open_at + Duration::from_millis(10); + let response_open_at = request_open_at + Duration::from_millis(300); + let response_end_at = response_open_at; let events = vec![ TransportOpen(server_transport.clone()), TransportOpen(client_transport.clone()), StreamRequestOpen(req.clone()), StreamRequestEnd(req.clone(), event::StreamRequestEnd { - since_request_open: Duration::from_millis(10), + request_open_at, + request_end_at, }), StreamResponseOpen(rsp.clone(), event::StreamResponseOpen { - since_request_open: Duration::from_millis(300), + request_open_at, + response_open_at, }), StreamResponseEnd(rsp.clone(), event::StreamResponseEnd { grpc_status: None, - since_request_open: Duration::from_millis(300), - since_response_open: 
Duration::from_millis(0), + request_open_at, + response_open_at, + response_end_at, bytes_sent: 0, frames_sent: 0, }), - TransportClose( + TransportClose( server_transport.clone(), transport_close.clone(), ), diff --git a/proxy/src/telemetry/sensor/http.rs b/proxy/src/telemetry/sensor/http.rs index adedb2b52..005adbcd5 100644 --- a/proxy/src/telemetry/sensor/http.rs +++ b/proxy/src/telemetry/sensor/http.rs @@ -6,7 +6,7 @@ use std::default::Default; use std::marker::PhantomData; use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::time::{Duration, Instant}; +use std::time::Instant; use tower_service::{NewService, Service}; use tower_h2::{client, Body}; @@ -71,7 +71,7 @@ pub struct Respond { struct RespondInner { handle: super::Handle, ctx: Arc, - request_open: Instant, + request_open_at: Instant, } pub type ResponseBody = MeasuredBody; @@ -99,8 +99,8 @@ pub struct ResponseBodyInner { ctx: Arc, bytes_sent: u64, frames_sent: u32, - request_open: Instant, - response_open: Instant, + request_open_at: Instant, + response_open_at: Instant, } @@ -110,7 +110,7 @@ pub struct RequestBodyInner { ctx: Arc, bytes_sent: u64, frames_sent: u32, - request_open: Instant, + request_open_at: Instant, } // === NewHttp === @@ -227,7 +227,7 @@ where req.extensions_mut().remove::() ); let (inner, body_inner) = match metadata { - (Some(ctx), Some(RequestOpen(request_open))) => { + (Some(ctx), Some(RequestOpen(request_open_at))) => { let id = self.next_id.fetch_add(1, Ordering::SeqCst); let ctx = ctx::http::Request::new(&req, &ctx, &self.client_ctx, id); @@ -237,7 +237,7 @@ where let respond_inner = Some(RespondInner { ctx: ctx.clone(), handle: self.handle.clone(), - request_open, + request_open_at, }); let body_inner = if req.body().is_end_stream() { @@ -245,7 +245,8 @@ where Event::StreamRequestEnd( Arc::clone(&ctx), event::StreamRequestEnd { - since_request_open: request_open.elapsed(), + request_open_at, + request_end_at: request_open_at, }, ) }); @@ -254,17 
+255,17 @@ where Some(RequestBodyInner { ctx, handle: self.handle.clone(), - request_open, + request_open_at, frames_sent: 0, bytes_sent: 0, }) }; (respond_inner, body_inner) }, - (ctx, request_open) => { + (ctx, request_open_at) => { warn!( - "missing metadata for a request to {:?}; ctx={:?}; request_open={:?};", - req.uri(), ctx, request_open + "missing metadata for a request to {:?}; ctx={:?}; request_open_at={:?};", + req.uri(), ctx, request_open_at ); (None, None) }, @@ -304,16 +305,18 @@ where let RespondInner { ctx, mut handle, - request_open, + request_open_at, } = i; let ctx = ctx::http::Response::new(&rsp, &ctx); + let response_open_at = Instant::now(); handle.send(|| { Event::StreamResponseOpen( Arc::clone(&ctx), event::StreamResponseOpen { - since_request_open: request_open.elapsed(), + request_open_at, + response_open_at, }, ) }); @@ -329,8 +332,9 @@ where Arc::clone(&ctx), event::StreamResponseEnd { grpc_status, - since_request_open: request_open.elapsed(), - since_response_open: Duration::default(), + response_end_at: response_open_at, + request_open_at, + response_open_at, bytes_sent: 0, frames_sent: 0, }, @@ -344,8 +348,8 @@ where ctx, bytes_sent: 0, frames_sent: 0, - request_open, - response_open: Instant::now(), + request_open_at, + response_open_at, }) } }); @@ -365,7 +369,7 @@ where let RespondInner { ctx, mut handle, - request_open, + request_open_at, } = i; handle.send(|| { @@ -373,7 +377,8 @@ where Arc::clone(&ctx), event::StreamRequestFail { error, - since_request_open: request_open.elapsed(), + request_open_at, + request_fail_at: Instant::now(), }, ) }); @@ -509,8 +514,8 @@ impl BodySensor for ResponseBodyInner { let ResponseBodyInner { ctx, mut handle, - request_open, - response_open, + request_open_at, + response_open_at, bytes_sent, frames_sent, .. 
@@ -521,8 +526,9 @@ impl BodySensor for ResponseBodyInner { Arc::clone(&ctx), event::StreamResponseFail { error, - since_request_open: request_open.elapsed(), - since_response_open: response_open.elapsed(), + request_open_at, + response_open_at, + response_fail_at: Instant::now(), bytes_sent, frames_sent, }, @@ -534,8 +540,8 @@ impl BodySensor for ResponseBodyInner { let ResponseBodyInner { ctx, mut handle, - request_open, - response_open, + request_open_at, + response_open_at, bytes_sent, frames_sent, } = self; @@ -545,8 +551,9 @@ impl BodySensor for ResponseBodyInner { Arc::clone(&ctx), event::StreamResponseEnd { grpc_status, - since_request_open: request_open.elapsed(), - since_response_open: response_open.elapsed(), + request_open_at, + response_open_at, + response_end_at: Instant::now(), bytes_sent, frames_sent, }, @@ -569,7 +576,7 @@ impl BodySensor for RequestBodyInner { let RequestBodyInner { ctx, mut handle, - request_open, + request_open_at, .. } = self; @@ -578,7 +585,8 @@ impl BodySensor for RequestBodyInner { Arc::clone(&ctx), event::StreamRequestFail { error, - since_request_open: request_open.elapsed(), + request_open_at, + request_fail_at: Instant::now(), }, ) ) @@ -588,7 +596,7 @@ impl BodySensor for RequestBodyInner { let RequestBodyInner { ctx, mut handle, - request_open, + request_open_at, .. } = self; @@ -596,7 +604,8 @@ impl BodySensor for RequestBodyInner { event::Event::StreamRequestEnd( Arc::clone(&ctx), event::StreamRequestEnd { - since_request_open: request_open.elapsed(), + request_open_at, + request_end_at: Instant::now(), }, ) ) @@ -631,8 +640,7 @@ where } fn call(&mut self, mut req: Self::Request) -> Self::Future { - let request_open = Instant::now(); - req.extensions_mut().insert(RequestOpen(request_open)); + req.extensions_mut().insert(RequestOpen(Instant::now())); self.inner.call(req) } }