Hide HTTP telemetry implementation details (#80)

Previously, many of `telemetry::http`'s types and internal
implementation details were exposed to the rest of the telemetry system.
In preparation for further changes to support more granular locking,
this change makes metric storage and recording private implementation
details of the `http` module.

Following this, `telemetry::http` exposes a `Report` type for printing
metrics to the server and a `Sensors` type used to instrument stacks
with HTTP telemetry. These types share an internally-mutable metrics
registry that is private to the http module.

The `event` types continue to be exposed to support Tap, but the
convenience exports have been removed.

The `metrics::Root` type no longer needs to be shareable. This type will
be replaced in a followup change.
This commit is contained in:
Oliver Gould 2018-08-22 15:17:11 -07:00 committed by GitHub
parent b0526e4af7
commit dfcc0086b8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 230 additions and 218 deletions

View File

@ -1,4 +1,5 @@
use std::fmt;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use super::metrics::{
@ -8,6 +9,7 @@ use super::metrics::{
Histogram,
Scopes,
};
use telemetry::tap::Taps;
pub mod event;
mod labels;
@ -15,18 +17,53 @@ mod record;
mod sensors;
pub mod service;
pub use self::labels::{RequestLabels, ResponseLabels};
pub use self::record::Record;
use self::labels::{RequestLabels, ResponseLabels};
use self::record::Record;
pub use self::sensors::Sensors;
pub(super) type RequestScopes = Scopes<RequestLabels, Stamped<RequestMetrics>>;
metrics! {
request_total: Counter { "Total count of HTTP requests." },
response_total: Counter { "Total count of HTTP responses" },
response_latency_ms: Histogram<latency::Ms> {
"Elapsed times between a request's headers being received \
and its response stream completing"
}
}
pub fn new(taps: &Arc<Mutex<Taps>>) -> (Sensors, Report) {
let inner = Arc::new(Mutex::new(Inner::default()));
let sensors = Sensors::new(Record::new(Registry(inner.clone())), taps);
(sensors, Report(inner))
}
/// Updates HTTP metrics.
///
/// TODO Currently, this is only used by `Record`. Later, this will be made
/// public and `Record` will be obviated.
#[derive(Clone, Debug)]
struct Registry(Arc<Mutex<Inner>>);
/// Reports HTTP metrics for prometheus.
///
/// TODO retain_since should be done implicitly and should not be part of the
/// public interface.
#[derive(Clone, Debug)]
pub struct Report(Arc<Mutex<Inner>>);
#[derive(Debug, Default)]
pub(super) struct RequestMetrics {
struct Inner {
requests: RequestScopes,
responses: ResponseScopes,
}
type RequestScopes = Scopes<RequestLabels, Stamped<RequestMetrics>>;
#[derive(Debug, Default)]
struct RequestMetrics {
total: Counter,
}
pub(super) type ResponseScopes = Scopes<ResponseLabels, Stamped<ResponseMetrics>>;
type ResponseScopes = Scopes<ResponseLabels, Stamped<ResponseMetrics>>;
#[derive(Debug, Default)]
pub struct ResponseMetrics {
@ -35,27 +72,69 @@ pub struct ResponseMetrics {
}
#[derive(Debug)]
pub(super) struct Stamped<T> {
struct Stamped<T> {
stamp: Instant,
inner: T,
}
// ===== impl RequestScopes =====
// ===== impl Registry =====
impl RequestScopes {
metrics! {
request_total: Counter { "Total count of HTTP requests." }
impl Registry {
#[cfg(test)]
fn for_test() -> Self {
Registry(Arc::new(Mutex::new(Inner::default())))
}
fn end_request(&mut self, labels: RequestLabels) {
let mut inner = match self.0.lock() {
Err(_) => return,
Ok(lock) => lock,
};
inner.requests.get_or_default(labels).stamped().end()
}
fn end_response(&mut self, labels: ResponseLabels, latency: Duration) {
let mut inner = match self.0.lock() {
Err(_) => return,
Ok(lock) => lock,
};
inner.responses.get_or_default(labels).stamped().end(latency)
}
}
impl FmtMetrics for RequestScopes {
// ===== impl Report =====
impl Report {
pub(super) fn retain_since(&mut self, epoch: Instant) {
if let Ok(mut inner) = self.0.lock() {
inner.requests.retain(|_, v| v.stamp >= epoch);
inner.responses.retain(|_, v| v.stamp >= epoch);
}
}
}
impl FmtMetrics for Report {
fn fmt_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result {
if self.is_empty() {
return Ok(());
let inner = match self.0.lock() {
Err(_) => return Ok(()),
Ok(inner) => inner,
};
if !inner.requests.is_empty() {
request_total.fmt_help(f)?;
request_total.fmt_scopes(f, &inner.requests, |s| &s.total)?;
}
Self::request_total.fmt_help(f)?;
Self::request_total.fmt_scopes(f, self, |s| &s.total)?;
if !inner.responses.is_empty() {
response_total.fmt_help(f)?;
response_total.fmt_scopes(f, &inner.responses, |s| &s.total)?;
response_latency_ms.fmt_help(f)?;
response_latency_ms.fmt_scopes(f, &inner.responses, |s| &s.latency)?;
}
Ok(())
}
@ -74,34 +153,6 @@ impl RequestMetrics {
}
}
// ===== impl ResponseScopes =====
impl ResponseScopes {
metrics! {
response_total: Counter { "Total count of HTTP responses" },
response_latency_ms: Histogram<latency::Ms> {
"Elapsed times between a request's headers being received \
and its response stream completing"
}
}
}
impl FmtMetrics for ResponseScopes {
fn fmt_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result {
if self.is_empty() {
return Ok(());
}
Self::response_total.fmt_help(f)?;
Self::response_total.fmt_scopes(f, self, |s| &s.total)?;
Self::response_latency_ms.fmt_help(f)?;
Self::response_latency_ms.fmt_scopes(f, self, |s| &s.latency)?;
Ok(())
}
}
// ===== impl ResponseMetrics =====
impl ResponseMetrics {
@ -124,11 +175,7 @@ impl ResponseMetrics {
// ===== impl Stamped =====
impl<T> Stamped<T> {
pub fn stamp(&self) -> Instant {
self.stamp
}
pub fn stamped(&mut self) -> &mut T {
fn stamped(&mut self) -> &mut T {
self.stamp = Instant::now();
&mut self.inner
}
@ -155,3 +202,75 @@ impl<T> ::std::ops::Deref for Stamped<T> {
&self.inner
}
}
#[cfg(test)]
mod tests {
use std::sync::{Arc, Mutex};
use ctx;
use ctx::test_util::*;
use super::*;
use conditional::Conditional;
use tls;
const TLS_DISABLED: Conditional<(), tls::ReasonForNoTls> =
Conditional::None(tls::ReasonForNoTls::Disabled);
fn mock_route(
registry: &mut Registry,
proxy: ctx::Proxy,
server: &Arc<ctx::transport::Server>,
team: &str
) {
let client = client(proxy, indexmap!["team".into() => team.into(),], TLS_DISABLED);
let (req, rsp) = request("http://nba.com", &server, &client);
registry.end_request(RequestLabels::new(&req));
registry.end_response(ResponseLabels::new(&rsp, None), Duration::from_millis(10));
}
#[test]
fn expiry() {
let proxy = ctx::Proxy::Outbound;
let server = server(proxy, TLS_DISABLED);
let inner = Arc::new(Mutex::new(Inner::default()));
let mut registry = Registry(inner.clone());
let mut report = Report(inner.clone());
let t0 = Instant::now();
mock_route(&mut registry, proxy, &server, "warriors");
let t1 = Instant::now();
mock_route(&mut registry, proxy, &server, "sixers");
let t2 = Instant::now();
{
let inner = inner.lock().unwrap();
assert_eq!(inner.requests.len(), 2);
assert_eq!(inner.responses.len(), 2);
}
report.retain_since(t0);
{
let inner = inner.lock().unwrap();
assert_eq!(inner.requests.len(), 2);
assert_eq!(inner.responses.len(), 2);
}
report.retain_since(t1);
{
let inner = inner.lock().unwrap();
assert_eq!(inner.requests.len(), 1);
assert_eq!(inner.responses.len(), 1);
}
report.retain_since(t2);
{
let inner = inner.lock().unwrap();
assert_eq!(inner.requests.len(), 0);
assert_eq!(inner.responses.len(), 0);
}
}
}

View File

@ -1,35 +1,23 @@
use std::sync::{Arc, Mutex};
use telemetry::http::event::Event;
use telemetry::metrics::Root;
use super::labels::{
RequestLabels,
ResponseLabels,
};
use super::Registry;
use super::event::Event;
use super::labels::{RequestLabels, ResponseLabels};
/// Tracks Prometheus metrics
#[derive(Clone, Debug)]
pub struct Record {
metrics: Arc<Mutex<Root>>,
metrics: Registry,
}
// ===== impl Record =====
impl Record {
pub fn new(metrics: &Arc<Mutex<Root>>) -> Self {
Self { metrics: metrics.clone() }
pub(super) fn new(metrics: Registry) -> Self {
Self { metrics }
}
#[cfg(test)]
pub fn for_test() -> Self {
Self { metrics: Default::default() }
}
#[inline]
fn update<F: Fn(&mut Root)>(&mut self, f: F) {
let mut lock = self.metrics.lock()
.expect("metrics lock poisoned");
f(&mut *lock);
Self { metrics: Registry::for_test() }
}
/// Observe the given event.
@ -40,34 +28,25 @@ impl Record {
Event::StreamRequestOpen(_) => {},
Event::StreamRequestFail(ref req, _) => {
self.update(|metrics| {
metrics.request(RequestLabels::new(req)).end();
})
self.metrics.end_request(RequestLabels::new(req));
},
Event::StreamRequestEnd(ref req, _) => {
self.update(|metrics| {
metrics.request(RequestLabels::new(req)).end();
})
self.metrics.end_request(RequestLabels::new(req));
},
Event::StreamResponseOpen(_, _) => {},
Event::StreamResponseEnd(ref res, ref end) => {
let latency = end.response_first_frame_at - end.request_open_at;
self.update(|metrics| {
metrics.response(ResponseLabels::new(res, end.grpc_status))
.end(latency);
});
self.metrics.end_response(ResponseLabels::new(res, end.grpc_status), latency);
},
Event::StreamResponseFail(ref res, ref fail) => {
// TODO: do we care about the failure's error code here?
let first_frame_at = fail.response_first_frame_at.unwrap_or(fail.response_fail_at);
let latency = first_frame_at - fail.request_open_at;
self.update(|metrics| {
metrics.response(ResponseLabels::fail(res)).end(latency)
});
self.metrics.end_response(ResponseLabels::fail(res), latency);
},
};
}
@ -75,19 +54,22 @@ impl Record {
#[cfg(test)]
mod test {
use std::time::{Duration, Instant};
use super::*;
use super::super::event;
use super::super::labels::{RequestLabels, ResponseLabels};
use ctx::{self, test_util::*, transport::TlsStatus};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use conditional::Conditional;
use telemetry::http::{event::{self, Event}, labels};
use tls;
const TLS_ENABLED: Conditional<(), tls::ReasonForNoTls> = Conditional::Some(());
const TLS_DISABLED: Conditional<(), tls::ReasonForNoTls> =
Conditional::None(tls::ReasonForNoTls::Disabled);
fn new_record() -> super::Record {
super::Record::new(&Default::default())
fn new_record() -> (Record, Arc<Mutex<super::super::Inner>>) {
let inner = Arc::new(Mutex::new(super::super::Inner::default()));
(Record::new(Registry(inner.clone())), inner)
}
fn test_record_response_end_outbound(client_tls: TlsStatus, server_tls: TlsStatus) {
@ -116,23 +98,17 @@ mod test {
frames_sent: 0,
};
let mut r = new_record();
let (mut r, i) = new_record();
let ev = Event::StreamResponseEnd(rsp.clone(), end.clone());
let labels = labels::ResponseLabels::new(&rsp, None);
let labels = ResponseLabels::new(&rsp, None);
assert_eq!(labels.tls_status(), client_tls);
assert!(r.metrics.lock()
.expect("lock")
.responses
.get(&labels)
.is_none()
);
assert!(i.lock().unwrap().responses.get(&labels).is_none());
r.record_event(&ev);
{
let lock = r.metrics.lock()
.expect("lock");
let lock = i.lock().unwrap();
let scope = lock
.responses
.get(&labels)
@ -144,12 +120,10 @@ mod test {
scope.latency().assert_lt_exactly(200, 0);
scope.latency().assert_gt_exactly(200, 0);
}
}
fn test_record_one_conn_request_outbound(client_tls: TlsStatus, server_tls: TlsStatus) {
use self::Event::*;
use self::labels::*;
let proxy = ctx::Proxy::Outbound;
let server = server(proxy, server_tls);
@ -189,7 +163,7 @@ mod test {
}),
];
let mut r = new_record();
let (mut r, i) = new_record();
let req_labels = RequestLabels::new(&req);
let rsp_labels = ResponseLabels::new(&rsp, None);
@ -198,7 +172,7 @@ mod test {
assert_eq!(client_tls, rsp_labels.tls_status());
{
let lock = r.metrics.lock().expect("lock");
let lock = i.lock().unwrap();
assert!(lock.requests.get(&req_labels).is_none());
assert!(lock.responses.get(&rsp_labels).is_none());
}
@ -208,7 +182,7 @@ mod test {
}
{
let lock = r.metrics.lock().expect("lock");
let lock = i.lock().unwrap();
// === request scope ====================================
assert_eq!(

View File

@ -5,10 +5,10 @@ use tower_service::NewService;
use tower_h2::Body;
use ctx;
use telemetry::tap;
use telemetry::{http::event, tap};
use transparency::ClientError;
use super::{event, Record};
use super::record::Record;
use super::service::{NewHttp, RequestBody};
#[derive(Clone, Debug)]
@ -42,7 +42,7 @@ impl Handle {
}
impl Sensors {
pub fn new(metrics: Record, taps: &Arc<Mutex<tap::Taps>>) -> Self {
pub(super) fn new(metrics: Record, taps: &Arc<Mutex<tap::Taps>>) -> Self {
Sensors(Inner {
metrics,
taps: taps.clone(),

View File

@ -26,10 +26,8 @@
//! labels, we can add new labels or modify the existing ones without having
//! to worry about missing commas, double commas, or trailing commas at the
//! end of the label set (all of which will make Prometheus angry).
use std::default::Default;
use std::fmt;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use std::time::Instant;
mod counter;
mod gauge;
@ -48,65 +46,42 @@ pub use self::serve::Serve;
use super::{http, process, tls_config_reload, transport};
/// The root scope for all runtime metrics.
#[derive(Debug, Default)]
#[derive(Debug)]
pub struct Root {
pub(super) requests: http::RequestScopes,
pub(super) responses: http::ResponseScopes,
http: http::Report,
transports: transport::Report,
tls_config_reload: tls_config_reload::Report,
process: process::Report,
}
/// Construct the Prometheus metrics.
///
/// Returns the `Record` and `Serve` sides. The `Serve` side
/// is a Hyper service which can be used to create the server for the
/// scrape endpoint, while the `Record` side can receive updates to the
/// metrics by calling `record_event`.
pub fn new(
idle_retain: Duration,
process: process::Report,
transport_report: transport::Report,
tls: tls_config_reload::Report
) -> (http::Record, Serve) {
let metrics = Arc::new(Mutex::new(Root::new(process, transport_report, tls)));
(http::Record::new(&metrics), Serve::new(&metrics, idle_retain))
// ===== impl Root =====
impl Root {
pub(super) fn new(
http: http::Report,
transports: transport::Report,
tls_config_reload: tls_config_reload::Report,
process: process::Report,
) -> Self {
Self {
http,
transports,
tls_config_reload,
process,
}
}
// TODO this should be moved into `http`
fn retain_since(&mut self, epoch: Instant) {
self.http.retain_since(epoch);
}
}
// ===== impl Root =====
impl Root {
fn new(
process: process::Report,
transports: transport::Report,
tls_config_reload: tls_config_reload::Report
) -> Self {
Self {
process,
transports,
tls_config_reload,
.. Root::default()
}
}
pub(super) fn request(&mut self, labels: http::RequestLabels) -> &mut http::RequestMetrics {
self.requests.get_or_default(labels).stamped()
}
pub(super) fn response(&mut self, labels: http::ResponseLabels) -> &mut http::ResponseMetrics {
self.responses.get_or_default(labels).stamped()
}
fn retain_since(&mut self, epoch: Instant) {
self.requests.retain(|_, v| v.stamp() >= epoch);
self.responses.retain(|_, v| v.stamp() >= epoch);
}
}
impl FmtMetrics for Root {
fn fmt_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.requests.fmt_metrics(f)?;
self.responses.fmt_metrics(f)?;
self.http.fmt_metrics(f)?;
self.transports.fmt_metrics(f)?;
self.tls_config_reload.fmt_metrics(f)?;
self.process.fmt_metrics(f)?;
@ -114,60 +89,3 @@ impl FmtMetrics for Root {
Ok(())
}
}
#[cfg(test)]
mod tests {
use ctx;
use ctx::test_util::*;
use super::*;
use conditional::Conditional;
use tls;
const TLS_DISABLED: Conditional<(), tls::ReasonForNoTls> =
Conditional::None(tls::ReasonForNoTls::Disabled);
fn mock_route(
root: &mut Root,
proxy: ctx::Proxy,
server: &Arc<ctx::transport::Server>,
team: &str
) {
let client = client(proxy, indexmap!["team".into() => team.into(),], TLS_DISABLED);
let (req, rsp) = request("http://nba.com", &server, &client);
root.request(http::RequestLabels::new(&req)).end();
root.response(http::ResponseLabels::new(&rsp, None)).end(Duration::from_millis(10));
}
#[test]
fn expiry() {
let proxy = ctx::Proxy::Outbound;
let server = server(proxy, TLS_DISABLED);
let mut root = Root::default();
let t0 = Instant::now();
mock_route(&mut root, proxy, &server, "warriors");
let t1 = Instant::now();
mock_route(&mut root, proxy, &server, "sixers");
let t2 = Instant::now();
assert_eq!(root.requests.len(), 2);
assert_eq!(root.responses.len(), 2);
root.retain_since(t0);
assert_eq!(root.requests.len(), 2);
assert_eq!(root.responses.len(), 2);
root.retain_since(t1);
assert_eq!(root.requests.len(), 1);
assert_eq!(root.responses.len(), 1);
root.retain_since(t2);
assert_eq!(root.requests.len(), 0);
assert_eq!(root.responses.len(), 0);
}
}

View File

@ -35,7 +35,7 @@ enum ServeError {
// ===== impl Serve =====
impl Serve {
pub(super) fn new(metrics: &Arc<Mutex<Root>>, idle_retain: Duration) -> Self {
pub fn new(metrics: &Arc<Mutex<Root>>, idle_retain: Duration) -> Self {
Serve {
metrics: metrics.clone(),
idle_retain,

View File

@ -34,15 +34,16 @@ pub fn new(
taps: &Arc<Mutex<tap::Taps>>,
) -> (Sensors, transport::Registry, tls_config_reload::Sensor, ServeMetrics) {
let process = process::Report::new(start_time);
let (http_sensors, http_report) = http::new(taps);
let (transport_registry, transport_report) = transport::new();
let (tls_config_sensor, tls_config_fmt) = tls_config_reload::new();
let (record, serve) = metrics::new(
metrics_retain_idle,
process,
let report = Arc::new(Mutex::new(metrics::Root::new(
http_report,
transport_report,
tls_config_fmt
);
let s = Sensors::new(record, taps);
(s, transport_registry, tls_config_sensor, serve)
tls_config_fmt,
process,
)));
let serve = ServeMetrics::new(&report, metrics_retain_idle);
(http_sensors, transport_registry, tls_config_sensor, serve)
}