From b0526e4af7cdea4f427ab33dc444f94a5b599064 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Tue, 21 Aug 2018 15:17:38 -0700 Subject: [PATCH] Consolidate HTTP telemetry in telemetry::http (#79) In preparation for further simplifications to HTTP telemetry, this change consolidates all HTTP-specific logic under the `telemetry::http` module. Specifically, the following modules have been moved: - `telemetry::event`; - `telemetry::metrics::labels`; - `telemetry::metrics::record`; - `telemetry::sensors`; and - `telemetry::sensors::http`. This change takes pains to avoid changing any implementation details, so some types and methods have been made public temporarily while the interface boundaries are not well defined. This will be fixed in a subsequent change. --- src/bind.rs | 8 +-- src/control/pb.rs | 2 +- src/lib.rs | 2 +- src/outbound.rs | 2 +- src/telemetry/{ => http}/event.rs | 0 src/telemetry/{metrics => http}/labels.rs | 2 +- .../{metrics/http.rs => http/mod.rs} | 60 ++++++++++++++-- src/telemetry/{metrics => http}/record.rs | 23 ++---- .../{sensor/mod.rs => http/sensors.rs} | 19 +++-- .../{sensor/http.rs => http/service.rs} | 18 ++--- src/telemetry/metrics/mod.rs | 70 ++++--------------- src/telemetry/mod.rs | 7 +- src/transparency/client.rs | 2 +- 13 files changed, 104 insertions(+), 111 deletions(-) rename src/telemetry/{ => http}/event.rs (100%) rename src/telemetry/{metrics => http}/labels.rs (99%) rename src/telemetry/{metrics/http.rs => http/mod.rs} (69%) rename src/telemetry/{metrics => http}/record.rs (95%) rename src/telemetry/{sensor/mod.rs => http/sensors.rs} (76%) rename src/telemetry/{sensor/http.rs => http/service.rs} (98%) diff --git a/src/bind.rs b/src/bind.rs index e9d784806..ae8761c25 100644 --- a/src/bind.rs +++ b/src/bind.rs @@ -12,7 +12,7 @@ use tower_reconnect::{Reconnect, Error as ReconnectError}; use control; use control::destination::Endpoint; use ctx; -use telemetry::{self, sensor}; +use telemetry; use transparency::{self, HttpBody, h1, orig_proto}; use transport; use tls; @@ -143,11 +143,11 @@ pub type Stack = WatchService>; type StackInner = Reconnect>>>; -pub type NewHttp = sensor::NewHttp, B, HttpBody>; +pub type NewHttp = telemetry::http::service::NewHttp, B, HttpBody>; -pub type HttpResponse = http::Response>; +pub type HttpResponse = http::Response>; -pub type HttpRequest = http::Request>; +pub type HttpRequest = http::Request>; pub type Client = transparency::Client< telemetry::transport::Connect, diff --git a/src/control/pb.rs b/src/control/pb.rs index 1b1acc28a..83f3efd2a 100644 --- a/src/control/pb.rs +++ b/src/control/pb.rs @@ -8,7 +8,7 @@ use std::sync::Arc; use linkerd2_proxy_api::*; use convert::*; use ctx; -use telemetry::{event, Event}; +use telemetry::http::event::{self, Event}; #[derive(Debug, Clone)] pub struct UnknownEvent; diff --git a/src/lib.rs b/src/lib.rs index df1865223..3701a75c2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -450,7 +450,7 @@ where // Install the request open timestamp module at the very top // of the stack, in order to take the timestamp as close as // possible to the beginning of the request's lifetime. - telemetry::sensor::http::TimestampRequestOpen::new(map_err) + telemetry::http::service::TimestampRequestOpen::new(map_err) })); let listen_addr = bound_port.local_addr(); diff --git a/src/outbound.rs b/src/outbound.rs index c6f7b0ecf..4f47217bd 100644 --- a/src/outbound.rs +++ b/src/outbound.rs @@ -17,7 +17,7 @@ use linkerd2_proxy_router::Recognize; use bind::{self, Bind, Protocol}; use control::destination::{self, Bind as BindTrait, Resolution}; use ctx; -use telemetry::sensor::http::{ResponseBody as SensorBody}; +use telemetry::http::service::{ResponseBody as SensorBody}; use timeout::Timeout; use transparency::{h1, HttpBody}; use transport::{DnsNameAndPort, Host, HostAndPort}; diff --git a/src/telemetry/event.rs b/src/telemetry/http/event.rs similarity index 100% rename from src/telemetry/event.rs rename to src/telemetry/http/event.rs diff --git a/src/telemetry/metrics/labels.rs b/src/telemetry/http/labels.rs similarity index 99% rename from src/telemetry/metrics/labels.rs rename to src/telemetry/http/labels.rs index 9b31fdf0c..7d41df14f 100644 --- a/src/telemetry/metrics/labels.rs +++ b/src/telemetry/http/labels.rs @@ -7,7 +7,7 @@ use http; use ctx; use conditional::Conditional; -use super::prom::FmtLabels; +use telemetry::metrics::FmtLabels; use transport::tls; #[derive(Clone, Debug, Eq, PartialEq, Hash)] diff --git a/src/telemetry/metrics/http.rs b/src/telemetry/http/mod.rs similarity index 69% rename from src/telemetry/metrics/http.rs rename to src/telemetry/http/mod.rs index 8a07e9be9..18e7a1a84 100644 --- a/src/telemetry/metrics/http.rs +++ b/src/telemetry/http/mod.rs @@ -1,17 +1,24 @@ use std::fmt; -use std::time::Duration; +use std::time::{Duration, Instant}; -use super::{ +use super::metrics::{ latency, - prom::FmtMetrics, Counter, + FmtMetrics, Histogram, - RequestLabels, - ResponseLabels, Scopes, - Stamped, }; +pub mod event; +mod labels; +mod record; +mod sensors; +pub mod service; + +pub use self::labels::{RequestLabels, ResponseLabels}; +pub use self::record::Record; +pub use self::sensors::Sensors; + pub(super) type RequestScopes = Scopes>; #[derive(Debug, Default)] @@ -27,6 +34,12 @@ pub struct ResponseMetrics { latency: Histogram, } +#[derive(Debug)] +pub(super) struct Stamped { + stamp: Instant, + inner: T, +} + // ===== impl RequestScopes ===== impl RequestScopes { @@ -107,3 +120,38 @@ impl ResponseMetrics { &self.latency } } + +// ===== impl Stamped ===== + +impl Stamped { + pub fn stamp(&self) -> Instant { + self.stamp + } + + pub fn stamped(&mut self) -> &mut T { + self.stamp = Instant::now(); + &mut self.inner + } +} + +impl Default for Stamped { + fn default() -> Self { + T::default().into() + } +} + +impl From for Stamped { + fn from(inner: T) -> Self { + Self { + inner, + stamp: Instant::now(), + } + } +} + +impl ::std::ops::Deref for Stamped { + type Target = T; + fn deref(&self) -> &Self::Target { + &self.inner + } +} diff --git a/src/telemetry/metrics/record.rs b/src/telemetry/http/record.rs similarity index 95% rename from src/telemetry/metrics/record.rs rename to src/telemetry/http/record.rs index 7d872f1cd..54ec99c29 100644 --- a/src/telemetry/metrics/record.rs +++ b/src/telemetry/http/record.rs @@ -1,7 +1,7 @@ use std::sync::{Arc, Mutex}; -use telemetry::event::Event; -use super::Root; +use telemetry::http::event::Event; +use telemetry::metrics::Root; use super::labels::{ RequestLabels, ResponseLabels, @@ -16,7 +16,7 @@ pub struct Record { // ===== impl Record ===== impl Record { - pub(super) fn new(metrics: &Arc>) -> Self { + pub fn new(metrics: &Arc>) -> Self { Self { metrics: metrics.clone() } } @@ -75,14 +75,11 @@ impl Record { #[cfg(test)] mod test { - use telemetry::{ - event, - metrics::{self, labels}, - Event, - }; - use ctx::{self, test_util::*, transport::TlsStatus}; use std::time::{Duration, Instant}; + + use ctx::{self, test_util::*, transport::TlsStatus}; use conditional::Conditional; + use telemetry::http::{event::{self, Event}, labels}; use tls; const TLS_ENABLED: Conditional<(), tls::ReasonForNoTls> = Conditional::Some(()); @@ -90,13 +87,7 @@ mod test { Conditional::None(tls::ReasonForNoTls::Disabled); fn new_record() -> super::Record { - let (r, _) = metrics::new( - Duration::from_secs(100), - Default::default(), - Default::default(), - Default::default() - ); - r + super::Record::new(&Default::default()) } fn test_record_response_end_outbound(client_tls: TlsStatus, server_tls: TlsStatus) { diff --git a/src/telemetry/sensor/mod.rs b/src/telemetry/http/sensors.rs similarity index 76% rename from src/telemetry/sensor/mod.rs rename to src/telemetry/http/sensors.rs index 85f2c45d4..4d7339a3a 100644 --- a/src/telemetry/sensor/mod.rs +++ b/src/telemetry/http/sensors.rs @@ -5,29 +5,28 @@ use tower_service::NewService; use tower_h2::Body; use ctx; -use telemetry::{event, metrics, tap}; +use telemetry::tap; use transparency::ClientError; -pub mod http; - -pub use self::http::{Http, NewHttp}; +use super::{event, Record}; +use super::service::{NewHttp, RequestBody}; #[derive(Clone, Debug)] struct Inner { - metrics: metrics::Record, + metrics: Record, taps: Arc>, } /// Accepts events from sensors. #[derive(Clone, Debug)] -struct Handle(Inner); +pub(super) struct Handle(Inner); /// Supports the creation of telemetry scopes. #[derive(Clone, Debug)] pub struct Sensors(Inner); impl Handle { - fn send(&mut self, mk: F) + pub fn send(&mut self, mk: F) where F: FnOnce() -> event::Event, { @@ -43,7 +42,7 @@ impl Handle { } impl Sensors { - pub(super) fn new(metrics: metrics::Record, taps: &Arc>) -> Self { + pub fn new(metrics: Record, taps: &Arc>) -> Self { Sensors(Inner { metrics, taps: taps.clone(), @@ -52,7 +51,7 @@ impl Sensors { #[cfg(test)] pub fn for_test() -> Self { - Self::new(metrics::Record::for_test(), &Default::default()) + Self::new(Record::for_test(), &Default::default()) } pub fn http( @@ -64,7 +63,7 @@ impl Sensors { A: Body + 'static, B: Body + 'static, N: NewService< - Request = Request>, + Request = Request>, Response = Response, Error = ClientError > diff --git a/src/telemetry/sensor/http.rs b/src/telemetry/http/service.rs similarity index 98% rename from src/telemetry/sensor/http.rs rename to src/telemetry/http/service.rs index a5de0a3dd..0c78bda7f 100644 --- a/src/telemetry/sensor/http.rs +++ b/src/telemetry/http/service.rs @@ -10,9 +10,11 @@ use tower_service::{NewService, Service}; use tower_h2::Body; use ctx; -use telemetry::event::{self, Event}; use transparency::ClientError; +use super::event::{self, Event}; +use super::sensors::Handle; + const GRPC_STATUS: &str = "grpc-status"; /// A `RequestOpen` timestamp. @@ -36,14 +38,14 @@ pub struct TimestampRequestOpen { pub struct NewHttp { new_service: N, - handle: super::Handle, + handle: Handle, client_ctx: Arc, _p: PhantomData<(A, B)>, } pub struct Init { future: F, - handle: super::Handle, + handle: Handle, client_ctx: Arc, _p: PhantomData<(A, B)>, } @@ -52,7 +54,7 @@ pub struct Init { #[derive(Debug)] pub struct Http { service: S, - handle: super::Handle, + handle: Handle, client_ctx: Arc, _p: PhantomData<(A, B)>, } @@ -66,7 +68,7 @@ pub struct Respond { #[derive(Debug)] struct RespondInner { - handle: super::Handle, + handle: Handle, ctx: Arc, request_open_at: Instant, } @@ -91,7 +93,7 @@ pub trait BodySensor: Sized { #[derive(Debug)] pub struct ResponseBodyInner { - handle: super::Handle, + handle: Handle, ctx: Arc, bytes_sent: u64, frames_sent: u32, @@ -103,7 +105,7 @@ pub struct ResponseBodyInner { #[derive(Debug)] pub struct RequestBodyInner { - handle: super::Handle, + handle: Handle, ctx: Arc, bytes_sent: u64, frames_sent: u32, @@ -125,7 +127,7 @@ where { pub(super) fn new( new_service: N, - handle: super::Handle, + handle: Handle, client_ctx: &Arc, ) -> Self { Self { diff --git a/src/telemetry/metrics/mod.rs b/src/telemetry/metrics/mod.rs index 8c6e70a5a..c12443889 100644 --- a/src/telemetry/metrics/mod.rs +++ b/src/telemetry/metrics/mod.rs @@ -34,45 +34,29 @@ use std::time::{Duration, Instant}; mod counter; mod gauge; mod histogram; -mod http; -mod labels; pub mod latency; pub mod prom; -mod record; mod scopes; mod serve; pub use self::counter::Counter; pub use self::gauge::Gauge; pub use self::histogram::Histogram; -use self::labels::{ - RequestLabels, - ResponseLabels, -}; pub use self::prom::{FmtMetrics, FmtLabels, FmtMetric}; -pub use self::record::Record; pub use self::scopes::Scopes; pub use self::serve::Serve; -use super::transport; -use super::process; -use super::tls_config_reload; +use super::{http, process, tls_config_reload, transport}; /// The root scope for all runtime metrics. #[derive(Debug, Default)] -struct Root { - requests: http::RequestScopes, - responses: http::ResponseScopes, +pub struct Root { + pub(super) requests: http::RequestScopes, + pub(super) responses: http::ResponseScopes, transports: transport::Report, tls_config_reload: tls_config_reload::Report, process: process::Report, } -#[derive(Debug)] -struct Stamped { - stamp: Instant, - inner: T, -} - /// Construct the Prometheus metrics. /// /// Returns the `Record` and `Serve` sides. The `Serve` side @@ -84,9 +68,9 @@ pub fn new( process: process::Report, transport_report: transport::Report, tls: tls_config_reload::Report -) -> (Record, Serve) { +) -> (http::Record, Serve) { let metrics = Arc::new(Mutex::new(Root::new(process, transport_report, tls))); - (Record::new(&metrics), Serve::new(&metrics, idle_retain)) + (http::Record::new(&metrics), Serve::new(&metrics, idle_retain)) } // ===== impl Root ===== @@ -105,17 +89,17 @@ impl Root { } } - fn request(&mut self, labels: RequestLabels) -> &mut http::RequestMetrics { + pub(super) fn request(&mut self, labels: http::RequestLabels) -> &mut http::RequestMetrics { self.requests.get_or_default(labels).stamped() } - fn response(&mut self, labels: ResponseLabels) -> &mut http::ResponseMetrics { + pub(super) fn response(&mut self, labels: http::ResponseLabels) -> &mut http::ResponseMetrics { self.responses.get_or_default(labels).stamped() } fn retain_since(&mut self, epoch: Instant) { - self.requests.retain(|_, v| v.stamp >= epoch); - self.responses.retain(|_, v| v.stamp >= epoch); + self.requests.retain(|_, v| v.stamp() >= epoch); + self.responses.retain(|_, v| v.stamp() >= epoch); } } @@ -131,36 +115,6 @@ impl FmtMetrics for Root { } } -// ===== impl Stamped ===== - -impl Stamped { - fn stamped(&mut self) -> &mut T { - self.stamp = Instant::now(); - &mut self.inner - } -} - -impl Default for Stamped { - fn default() -> Self { - T::default().into() - } -} - -impl From for Stamped { - fn from(inner: T) -> Self { - Self { - inner, - stamp: Instant::now(), - } - } -} - -impl ::std::ops::Deref for Stamped { - type Target = T; - fn deref(&self) -> &Self::Target { - &self.inner - } -} #[cfg(test)] mod tests { @@ -181,8 +135,8 @@ mod tests { ) { let client = client(proxy, indexmap!["team".into() => team.into(),], TLS_DISABLED); let (req, rsp) = request("http://nba.com", &server, &client); - root.request(RequestLabels::new(&req)).end(); - root.response(ResponseLabels::new(&rsp, None)).end(Duration::from_millis(10)); + root.request(http::RequestLabels::new(&req)).end(); + root.response(http::ResponseLabels::new(&rsp, None)).end(Duration::from_millis(10)); } #[test] diff --git a/src/telemetry/mod.rs b/src/telemetry/mod.rs index 9a490d8a9..2643037ee 100644 --- a/src/telemetry/mod.rs +++ b/src/telemetry/mod.rs @@ -16,18 +16,17 @@ macro_rules! metrics { } mod errno; -pub mod event; +pub mod http; mod metrics; mod process; -pub mod sensor; pub mod tap; pub mod tls_config_reload; pub mod transport; use self::errno::Errno; -pub use self::event::Event; +pub use self::http::event::Event; pub use self::metrics::{Serve as ServeMetrics}; -pub use self::sensor::Sensors; +pub use self::http::Sensors; pub fn new( start_time: SystemTime, diff --git a/src/transparency/client.rs b/src/transparency/client.rs index 40d0ed5db..0b5de8d7a 100644 --- a/src/transparency/client.rs +++ b/src/transparency/client.rs @@ -10,7 +10,7 @@ use tower_h2; use bind; use task::BoxExecutor; -use telemetry::sensor::http::RequestBody; +use telemetry::http::service::RequestBody; use super::glue::{BodyPayload, HttpBody, HyperConnect}; use super::h1; use super::upgrade::{HttpConnect, Http11Upgrade};