proxy: Alter telemetry to use discrete instants (#980)

Proxy tasks emit events to the telemetry system. These events are used
aggregate counts and latencies, as well as to inform Tap requests.
Initially, these events included durations, describing the relevant time
that elapsed between this event and another.

This approach is somewhat inflexible -- it unnecessarily constrains the
set of measurements that can computed in the telemetry system.

To remedy this, the `Event` types can be changed to report discrete
`Instant`s (rather than `Duration`s). Then, when latencies are computed
in the telemetry system, these discrete instants can be compared to
produce durations.

There are no functional changes in this PR.
This commit is contained in:
Oliver Gould 2018-05-22 14:57:00 -07:00 committed by GitHub
parent 8a1a3b31d4
commit 41d9f915ed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 118 additions and 70 deletions

View File

@ -18,7 +18,7 @@ use std::{
fmt,
net::SocketAddr,
sync::Arc,
time::{Duration, SystemTime},
time::{Duration, Instant, SystemTime},
};
use test::Bencher;
@ -81,10 +81,13 @@ fn record_response_end(b: &mut Bencher) {
let (_, rsp) = request("http://buoyant.io", &server, &client, 1);
let request_open_at = Instant::now();
let response_open_at = request_open_at + Duration::from_millis(300);
let end = event::StreamResponseEnd {
grpc_status: None,
since_request_open: Duration::from_millis(300),
since_response_open: Duration::from_millis(0),
request_open_at,
response_open_at,
response_end_at: response_open_at,
bytes_sent: 0,
frames_sent: 0,
};
@ -110,22 +113,30 @@ fn record_one_conn_request(b: &mut Bencher) {
let server_transport = Arc::new(ctx::transport::Ctx::Server(server));
let client_transport = Arc::new(ctx::transport::Ctx::Client(client));
let request_open_at = Instant::now();
let request_end_at = request_open_at + Duration::from_millis(10);
let response_open_at = request_open_at + Duration::from_millis(300);
let response_end_at = response_open_at;
use Event::*;
let events = vec![
TransportOpen(server_transport.clone()),
TransportOpen(client_transport.clone()),
StreamRequestOpen(req.clone()),
StreamRequestEnd(req.clone(), event::StreamRequestEnd {
since_request_open: Duration::from_millis(10),
request_open_at,
request_end_at,
}),
StreamResponseOpen(rsp.clone(), event::StreamResponseOpen {
since_request_open: Duration::from_millis(300),
request_open_at,
response_open_at,
}),
StreamResponseEnd(rsp.clone(), event::StreamResponseEnd {
grpc_status: None,
since_request_open: Duration::from_millis(300),
since_response_open: Duration::from_millis(0),
request_open_at,
response_open_at,
response_end_at,
bytes_sent: 0,
frames_sent: 0,
}),
@ -159,6 +170,11 @@ fn record_many_dsts(b: &mut Bencher) {
let mut events = Vec::new();
events.push(TransportOpen(server_transport.clone()));
let request_open_at = Instant::now();
let request_end_at = request_open_at + Duration::from_millis(10);
let response_open_at = request_open_at + Duration::from_millis(300);
let response_end_at = response_open_at;
for n in 0..REQUESTS {
let client = client(&proxy, vec![
("service".into(), format!("svc{}", n)),
@ -173,16 +189,19 @@ fn record_many_dsts(b: &mut Bencher) {
events.push(StreamRequestOpen(req.clone()));
events.push(StreamRequestEnd(req.clone(), event::StreamRequestEnd {
since_request_open: Duration::from_millis(10),
request_open_at,
request_end_at,
}));
events.push(StreamResponseOpen(rsp.clone(), event::StreamResponseOpen {
since_request_open: Duration::from_millis(300),
request_open_at,
response_open_at,
}));
events.push(StreamResponseEnd(rsp.clone(), event::StreamResponseEnd {
grpc_status: None,
since_request_open: Duration::from_millis(300),
since_response_open: Duration::from_millis(0),
request_open_at,
response_open_at,
response_end_at,
bytes_sent: 0,
frames_sent: 0,
}));

View File

@ -34,8 +34,12 @@ mod gen {
}
}
pub fn pb_elapsed(t0: ::std::time::Instant, t1: ::std::time::Instant) -> ::prost_types::Duration {
pb_duration(t1 - t0)
}
/// Converts a Rust Duration to a Protobuf Duration.
pub fn pb_duration(d: &::std::time::Duration) -> ::prost_types::Duration {
pub fn pb_duration(d: ::std::time::Duration) -> ::prost_types::Duration {
let seconds = if d.as_secs() > ::std::i64::MAX as u64 {
::std::i64::MAX
} else {

View File

@ -39,8 +39,8 @@ impl event::StreamResponseEnd {
base: 0, // TODO FIXME
stream: ctx.id as u64,
}),
since_request_init: Some(pb_duration(&self.since_request_open)),
since_response_init: Some(pb_duration(&self.since_response_open)),
since_request_init: Some(pb_elapsed(self.request_open_at, self.response_end_at)),
since_response_init: Some(pb_elapsed(self.response_open_at, self.response_end_at)),
response_bytes: self.bytes_sent,
eos,
};
@ -71,8 +71,8 @@ impl event::StreamResponseFail {
base: 0, // TODO FIXME
stream: ctx.id as u64,
}),
since_request_init: Some(pb_duration(&self.since_request_open)),
since_response_init: Some(pb_duration(&self.since_response_open)),
since_request_init: Some(pb_elapsed(self.request_open_at, self.response_fail_at)),
since_response_init: Some(pb_elapsed(self.response_open_at, self.response_fail_at)),
response_bytes: self.bytes_sent,
eos: Some(self.error.into()),
};
@ -103,7 +103,7 @@ impl event::StreamRequestFail {
base: 0, // TODO FIXME
stream: ctx.id as u64,
}),
since_request_init: Some(pb_duration(&self.since_request_open)),
since_request_init: Some(pb_elapsed(self.request_open_at, self.request_fail_at)),
since_response_init: None,
response_bytes: 0,
eos: Some(self.error.into()),
@ -172,7 +172,7 @@ impl<'a> TryFrom<&'a Event> for common::TapEvent {
// TODO FIXME
stream: ctx.request.id as u64,
}),
since_request_init: Some(pb_duration(&rsp.since_request_open)),
since_request_init: Some(pb_elapsed(rsp.request_open_at, rsp.response_open_at)),
http_status: u32::from(ctx.status.as_u16()),
};

View File

@ -1,5 +1,5 @@
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};
use h2;
@ -33,24 +33,28 @@ pub struct TransportClose {
#[derive(Clone, Debug)]
pub struct StreamRequestFail {
pub since_request_open: Duration,
pub request_open_at: Instant,
pub request_fail_at: Instant,
pub error: h2::Reason,
}
#[derive(Clone, Debug)]
pub struct StreamRequestEnd {
pub since_request_open: Duration,
pub request_open_at: Instant,
pub request_end_at: Instant,
}
#[derive(Clone, Debug)]
pub struct StreamResponseOpen {
pub since_request_open: Duration,
pub request_open_at: Instant,
pub response_open_at: Instant,
}
#[derive(Clone, Debug)]
pub struct StreamResponseFail {
pub since_request_open: Duration,
pub since_response_open: Duration,
pub request_open_at: Instant,
pub response_open_at: Instant,
pub response_fail_at: Instant,
pub error: h2::Reason,
pub bytes_sent: u64,
pub frames_sent: u32,
@ -58,9 +62,10 @@ pub struct StreamResponseFail {
#[derive(Clone, Debug)]
pub struct StreamResponseEnd {
pub request_open_at: Instant,
pub response_open_at: Instant,
pub response_end_at: Instant,
pub grpc_status: Option<u32>,
pub since_request_open: Duration,
pub since_response_open: Duration,
pub bytes_sent: u64,
pub frames_sent: u32,
}

View File

@ -51,16 +51,18 @@ impl Record {
Event::StreamResponseOpen(_, _) => {},
Event::StreamResponseEnd(ref res, ref end) => {
let request_open_at = end.response_end_at - end.request_open_at;
self.update(|metrics| {
metrics.response(ResponseLabels::new(res, end.grpc_status))
.end(end.since_request_open);
.end(request_open_at);
});
},
Event::StreamResponseFail(ref res, ref fail) => {
// TODO: do we care about the failure's error code here?
let request_open_at = fail.response_fail_at - fail.request_open_at;
self.update(|metrics| {
metrics.response(ResponseLabels::fail(res)).end(fail.since_request_open)
metrics.response(ResponseLabels::fail(res)).end(request_open_at)
});
},
@ -91,7 +93,7 @@ mod test {
Event,
};
use ctx::{self, test_util::* };
use std::time::Duration;
use std::time::{Duration, Instant};
#[test]
fn record_response_end() {
@ -107,10 +109,13 @@ mod test {
let (_, rsp) = request("http://buoyant.io", &server, &client, 1);
let request_open_at = Instant::now();
let response_open_at = request_open_at + Duration::from_millis(300);
let end = event::StreamResponseEnd {
grpc_status: None,
since_request_open: Duration::from_millis(300),
since_response_open: Duration::from_millis(0),
request_open_at,
response_open_at,
response_end_at: response_open_at,
bytes_sent: 0,
frames_sent: 0,
};
@ -171,25 +176,32 @@ mod test {
tx_bytes: 4321,
};
let request_open_at = Instant::now();
let request_end_at = request_open_at + Duration::from_millis(10);
let response_open_at = request_open_at + Duration::from_millis(300);
let response_end_at = response_open_at;
let events = vec![
TransportOpen(server_transport.clone()),
TransportOpen(client_transport.clone()),
StreamRequestOpen(req.clone()),
StreamRequestEnd(req.clone(), event::StreamRequestEnd {
since_request_open: Duration::from_millis(10),
request_open_at,
request_end_at,
}),
StreamResponseOpen(rsp.clone(), event::StreamResponseOpen {
since_request_open: Duration::from_millis(300),
request_open_at,
response_open_at,
}),
StreamResponseEnd(rsp.clone(), event::StreamResponseEnd {
grpc_status: None,
since_request_open: Duration::from_millis(300),
since_response_open: Duration::from_millis(0),
request_open_at,
response_open_at,
response_end_at,
bytes_sent: 0,
frames_sent: 0,
}),
TransportClose(
TransportClose(
server_transport.clone(),
transport_close.clone(),
),

View File

@ -6,7 +6,7 @@ use std::default::Default;
use std::marker::PhantomData;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use std::time::Instant;
use tower_service::{NewService, Service};
use tower_h2::{client, Body};
@ -71,7 +71,7 @@ pub struct Respond<F, B> {
struct RespondInner {
handle: super::Handle,
ctx: Arc<ctx::http::Request>,
request_open: Instant,
request_open_at: Instant,
}
pub type ResponseBody<B> = MeasuredBody<B, ResponseBodyInner>;
@ -99,8 +99,8 @@ pub struct ResponseBodyInner {
ctx: Arc<ctx::http::Response>,
bytes_sent: u64,
frames_sent: u32,
request_open: Instant,
response_open: Instant,
request_open_at: Instant,
response_open_at: Instant,
}
@ -110,7 +110,7 @@ pub struct RequestBodyInner {
ctx: Arc<ctx::http::Request>,
bytes_sent: u64,
frames_sent: u32,
request_open: Instant,
request_open_at: Instant,
}
// === NewHttp ===
@ -227,7 +227,7 @@ where
req.extensions_mut().remove::<RequestOpen>()
);
let (inner, body_inner) = match metadata {
(Some(ctx), Some(RequestOpen(request_open))) => {
(Some(ctx), Some(RequestOpen(request_open_at))) => {
let id = self.next_id.fetch_add(1, Ordering::SeqCst);
let ctx = ctx::http::Request::new(&req, &ctx, &self.client_ctx, id);
@ -237,7 +237,7 @@ where
let respond_inner = Some(RespondInner {
ctx: ctx.clone(),
handle: self.handle.clone(),
request_open,
request_open_at,
});
let body_inner =
if req.body().is_end_stream() {
@ -245,7 +245,8 @@ where
Event::StreamRequestEnd(
Arc::clone(&ctx),
event::StreamRequestEnd {
since_request_open: request_open.elapsed(),
request_open_at,
request_end_at: request_open_at,
},
)
});
@ -254,17 +255,17 @@ where
Some(RequestBodyInner {
ctx,
handle: self.handle.clone(),
request_open,
request_open_at,
frames_sent: 0,
bytes_sent: 0,
})
};
(respond_inner, body_inner)
},
(ctx, request_open) => {
(ctx, request_open_at) => {
warn!(
"missing metadata for a request to {:?}; ctx={:?}; request_open={:?};",
req.uri(), ctx, request_open
"missing metadata for a request to {:?}; ctx={:?}; request_open_at={:?};",
req.uri(), ctx, request_open_at
);
(None, None)
},
@ -304,16 +305,18 @@ where
let RespondInner {
ctx,
mut handle,
request_open,
request_open_at,
} = i;
let ctx = ctx::http::Response::new(&rsp, &ctx);
let response_open_at = Instant::now();
handle.send(|| {
Event::StreamResponseOpen(
Arc::clone(&ctx),
event::StreamResponseOpen {
since_request_open: request_open.elapsed(),
request_open_at,
response_open_at,
},
)
});
@ -329,8 +332,9 @@ where
Arc::clone(&ctx),
event::StreamResponseEnd {
grpc_status,
since_request_open: request_open.elapsed(),
since_response_open: Duration::default(),
response_end_at: response_open_at,
request_open_at,
response_open_at,
bytes_sent: 0,
frames_sent: 0,
},
@ -344,8 +348,8 @@ where
ctx,
bytes_sent: 0,
frames_sent: 0,
request_open,
response_open: Instant::now(),
request_open_at,
response_open_at,
})
}
});
@ -365,7 +369,7 @@ where
let RespondInner {
ctx,
mut handle,
request_open,
request_open_at,
} = i;
handle.send(|| {
@ -373,7 +377,8 @@ where
Arc::clone(&ctx),
event::StreamRequestFail {
error,
since_request_open: request_open.elapsed(),
request_open_at,
request_fail_at: Instant::now(),
},
)
});
@ -509,8 +514,8 @@ impl BodySensor for ResponseBodyInner {
let ResponseBodyInner {
ctx,
mut handle,
request_open,
response_open,
request_open_at,
response_open_at,
bytes_sent,
frames_sent,
..
@ -521,8 +526,9 @@ impl BodySensor for ResponseBodyInner {
Arc::clone(&ctx),
event::StreamResponseFail {
error,
since_request_open: request_open.elapsed(),
since_response_open: response_open.elapsed(),
request_open_at,
response_open_at,
response_fail_at: Instant::now(),
bytes_sent,
frames_sent,
},
@ -534,8 +540,8 @@ impl BodySensor for ResponseBodyInner {
let ResponseBodyInner {
ctx,
mut handle,
request_open,
response_open,
request_open_at,
response_open_at,
bytes_sent,
frames_sent,
} = self;
@ -545,8 +551,9 @@ impl BodySensor for ResponseBodyInner {
Arc::clone(&ctx),
event::StreamResponseEnd {
grpc_status,
since_request_open: request_open.elapsed(),
since_response_open: response_open.elapsed(),
request_open_at,
response_open_at,
response_end_at: Instant::now(),
bytes_sent,
frames_sent,
},
@ -569,7 +576,7 @@ impl BodySensor for RequestBodyInner {
let RequestBodyInner {
ctx,
mut handle,
request_open,
request_open_at,
..
} = self;
@ -578,7 +585,8 @@ impl BodySensor for RequestBodyInner {
Arc::clone(&ctx),
event::StreamRequestFail {
error,
since_request_open: request_open.elapsed(),
request_open_at,
request_fail_at: Instant::now(),
},
)
)
@ -588,7 +596,7 @@ impl BodySensor for RequestBodyInner {
let RequestBodyInner {
ctx,
mut handle,
request_open,
request_open_at,
..
} = self;
@ -596,7 +604,8 @@ impl BodySensor for RequestBodyInner {
event::Event::StreamRequestEnd(
Arc::clone(&ctx),
event::StreamRequestEnd {
since_request_open: request_open.elapsed(),
request_open_at,
request_end_at: Instant::now(),
},
)
)
@ -631,8 +640,7 @@ where
}
fn call(&mut self, mut req: Self::Request) -> Self::Future {
let request_open = Instant::now();
req.extensions_mut().insert(RequestOpen(request_open));
req.extensions_mut().insert(RequestOpen(Instant::now()));
self.inner.call(req)
}
}