Consolidate HTTP telemetry in telemetry::http (#79)

In preparation for further simplifications to HTTP telemetry, this
change consolidates all HTTP-specific logic under the `telemetry::http`
module.

Specifically, the following modules have been moved:
- `telemetry::event`;
- `telemetry::metrics::labels`;
- `telemetry::metrics::record`;
- `telemetry::sensors`; and
- `telemetry::sensors::http`.

This change takes pains to avoid changing any implementation details, so
some types and methods have been made public temporarily while the
interface boundaries are not well defined. This will be fixed in a
subsequent change.
This commit is contained in:
Oliver Gould 2018-08-21 15:17:38 -07:00 committed by GitHub
parent 40e9ffcaf8
commit b0526e4af7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 104 additions and 111 deletions

View File

@ -12,7 +12,7 @@ use tower_reconnect::{Reconnect, Error as ReconnectError};
use control; use control;
use control::destination::Endpoint; use control::destination::Endpoint;
use ctx; use ctx;
use telemetry::{self, sensor}; use telemetry;
use transparency::{self, HttpBody, h1, orig_proto}; use transparency::{self, HttpBody, h1, orig_proto};
use transport; use transport;
use tls; use tls;
@ -143,11 +143,11 @@ pub type Stack<B> = WatchService<tls::ConditionalClientConfig, RebindTls<B>>;
type StackInner<B> = Reconnect<orig_proto::Upgrade<NormalizeUri<NewHttp<B>>>>; type StackInner<B> = Reconnect<orig_proto::Upgrade<NormalizeUri<NewHttp<B>>>>;
pub type NewHttp<B> = sensor::NewHttp<Client<B>, B, HttpBody>; pub type NewHttp<B> = telemetry::http::service::NewHttp<Client<B>, B, HttpBody>;
pub type HttpResponse = http::Response<sensor::http::ResponseBody<HttpBody>>; pub type HttpResponse = http::Response<telemetry::http::service::ResponseBody<HttpBody>>;
pub type HttpRequest<B> = http::Request<sensor::http::RequestBody<B>>; pub type HttpRequest<B> = http::Request<telemetry::http::service::RequestBody<B>>;
pub type Client<B> = transparency::Client< pub type Client<B> = transparency::Client<
telemetry::transport::Connect<transport::Connect>, telemetry::transport::Connect<transport::Connect>,

View File

@ -8,7 +8,7 @@ use std::sync::Arc;
use linkerd2_proxy_api::*; use linkerd2_proxy_api::*;
use convert::*; use convert::*;
use ctx; use ctx;
use telemetry::{event, Event}; use telemetry::http::event::{self, Event};
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct UnknownEvent; pub struct UnknownEvent;

View File

@ -450,7 +450,7 @@ where
// Install the request open timestamp module at the very top // Install the request open timestamp module at the very top
// of the stack, in order to take the timestamp as close as // of the stack, in order to take the timestamp as close as
// possible to the beginning of the request's lifetime. // possible to the beginning of the request's lifetime.
telemetry::sensor::http::TimestampRequestOpen::new(map_err) telemetry::http::service::TimestampRequestOpen::new(map_err)
})); }));
let listen_addr = bound_port.local_addr(); let listen_addr = bound_port.local_addr();

View File

@ -17,7 +17,7 @@ use linkerd2_proxy_router::Recognize;
use bind::{self, Bind, Protocol}; use bind::{self, Bind, Protocol};
use control::destination::{self, Bind as BindTrait, Resolution}; use control::destination::{self, Bind as BindTrait, Resolution};
use ctx; use ctx;
use telemetry::sensor::http::{ResponseBody as SensorBody}; use telemetry::http::service::{ResponseBody as SensorBody};
use timeout::Timeout; use timeout::Timeout;
use transparency::{h1, HttpBody}; use transparency::{h1, HttpBody};
use transport::{DnsNameAndPort, Host, HostAndPort}; use transport::{DnsNameAndPort, Host, HostAndPort};

View File

@ -7,7 +7,7 @@ use http;
use ctx; use ctx;
use conditional::Conditional; use conditional::Conditional;
use super::prom::FmtLabels; use telemetry::metrics::FmtLabels;
use transport::tls; use transport::tls;
#[derive(Clone, Debug, Eq, PartialEq, Hash)] #[derive(Clone, Debug, Eq, PartialEq, Hash)]

View File

@ -1,17 +1,24 @@
use std::fmt; use std::fmt;
use std::time::Duration; use std::time::{Duration, Instant};
use super::{ use super::metrics::{
latency, latency,
prom::FmtMetrics,
Counter, Counter,
FmtMetrics,
Histogram, Histogram,
RequestLabels,
ResponseLabels,
Scopes, Scopes,
Stamped,
}; };
pub mod event;
mod labels;
mod record;
mod sensors;
pub mod service;
pub use self::labels::{RequestLabels, ResponseLabels};
pub use self::record::Record;
pub use self::sensors::Sensors;
pub(super) type RequestScopes = Scopes<RequestLabels, Stamped<RequestMetrics>>; pub(super) type RequestScopes = Scopes<RequestLabels, Stamped<RequestMetrics>>;
#[derive(Debug, Default)] #[derive(Debug, Default)]
@ -27,6 +34,12 @@ pub struct ResponseMetrics {
latency: Histogram<latency::Ms>, latency: Histogram<latency::Ms>,
} }
#[derive(Debug)]
pub(super) struct Stamped<T> {
stamp: Instant,
inner: T,
}
// ===== impl RequestScopes ===== // ===== impl RequestScopes =====
impl RequestScopes { impl RequestScopes {
@ -107,3 +120,38 @@ impl ResponseMetrics {
&self.latency &self.latency
} }
} }
// ===== impl Stamped =====
impl<T> Stamped<T> {
pub fn stamp(&self) -> Instant {
self.stamp
}
pub fn stamped(&mut self) -> &mut T {
self.stamp = Instant::now();
&mut self.inner
}
}
impl<T: Default> Default for Stamped<T> {
fn default() -> Self {
T::default().into()
}
}
impl<T> From<T> for Stamped<T> {
fn from(inner: T) -> Self {
Self {
inner,
stamp: Instant::now(),
}
}
}
impl<T> ::std::ops::Deref for Stamped<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.inner
}
}

View File

@ -1,7 +1,7 @@
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use telemetry::event::Event; use telemetry::http::event::Event;
use super::Root; use telemetry::metrics::Root;
use super::labels::{ use super::labels::{
RequestLabels, RequestLabels,
ResponseLabels, ResponseLabels,
@ -16,7 +16,7 @@ pub struct Record {
// ===== impl Record ===== // ===== impl Record =====
impl Record { impl Record {
pub(super) fn new(metrics: &Arc<Mutex<Root>>) -> Self { pub fn new(metrics: &Arc<Mutex<Root>>) -> Self {
Self { metrics: metrics.clone() } Self { metrics: metrics.clone() }
} }
@ -75,14 +75,11 @@ impl Record {
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use telemetry::{
event,
metrics::{self, labels},
Event,
};
use ctx::{self, test_util::*, transport::TlsStatus};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use ctx::{self, test_util::*, transport::TlsStatus};
use conditional::Conditional; use conditional::Conditional;
use telemetry::http::{event::{self, Event}, labels};
use tls; use tls;
const TLS_ENABLED: Conditional<(), tls::ReasonForNoTls> = Conditional::Some(()); const TLS_ENABLED: Conditional<(), tls::ReasonForNoTls> = Conditional::Some(());
@ -90,13 +87,7 @@ mod test {
Conditional::None(tls::ReasonForNoTls::Disabled); Conditional::None(tls::ReasonForNoTls::Disabled);
fn new_record() -> super::Record { fn new_record() -> super::Record {
let (r, _) = metrics::new( super::Record::new(&Default::default())
Duration::from_secs(100),
Default::default(),
Default::default(),
Default::default()
);
r
} }
fn test_record_response_end_outbound(client_tls: TlsStatus, server_tls: TlsStatus) { fn test_record_response_end_outbound(client_tls: TlsStatus, server_tls: TlsStatus) {

View File

@ -5,29 +5,28 @@ use tower_service::NewService;
use tower_h2::Body; use tower_h2::Body;
use ctx; use ctx;
use telemetry::{event, metrics, tap}; use telemetry::tap;
use transparency::ClientError; use transparency::ClientError;
pub mod http; use super::{event, Record};
use super::service::{NewHttp, RequestBody};
pub use self::http::{Http, NewHttp};
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
struct Inner { struct Inner {
metrics: metrics::Record, metrics: Record,
taps: Arc<Mutex<tap::Taps>>, taps: Arc<Mutex<tap::Taps>>,
} }
/// Accepts events from sensors. /// Accepts events from sensors.
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
struct Handle(Inner); pub(super) struct Handle(Inner);
/// Supports the creation of telemetry scopes. /// Supports the creation of telemetry scopes.
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct Sensors(Inner); pub struct Sensors(Inner);
impl Handle { impl Handle {
fn send<F>(&mut self, mk: F) pub fn send<F>(&mut self, mk: F)
where where
F: FnOnce() -> event::Event, F: FnOnce() -> event::Event,
{ {
@ -43,7 +42,7 @@ impl Handle {
} }
impl Sensors { impl Sensors {
pub(super) fn new(metrics: metrics::Record, taps: &Arc<Mutex<tap::Taps>>) -> Self { pub fn new(metrics: Record, taps: &Arc<Mutex<tap::Taps>>) -> Self {
Sensors(Inner { Sensors(Inner {
metrics, metrics,
taps: taps.clone(), taps: taps.clone(),
@ -52,7 +51,7 @@ impl Sensors {
#[cfg(test)] #[cfg(test)]
pub fn for_test() -> Self { pub fn for_test() -> Self {
Self::new(metrics::Record::for_test(), &Default::default()) Self::new(Record::for_test(), &Default::default())
} }
pub fn http<N, A, B>( pub fn http<N, A, B>(
@ -64,7 +63,7 @@ impl Sensors {
A: Body + 'static, A: Body + 'static,
B: Body + 'static, B: Body + 'static,
N: NewService< N: NewService<
Request = Request<http::RequestBody<A>>, Request = Request<RequestBody<A>>,
Response = Response<B>, Response = Response<B>,
Error = ClientError Error = ClientError
> >

View File

@ -10,9 +10,11 @@ use tower_service::{NewService, Service};
use tower_h2::Body; use tower_h2::Body;
use ctx; use ctx;
use telemetry::event::{self, Event};
use transparency::ClientError; use transparency::ClientError;
use super::event::{self, Event};
use super::sensors::Handle;
const GRPC_STATUS: &str = "grpc-status"; const GRPC_STATUS: &str = "grpc-status";
/// A `RequestOpen` timestamp. /// A `RequestOpen` timestamp.
@ -36,14 +38,14 @@ pub struct TimestampRequestOpen<S> {
pub struct NewHttp<N, A, B> { pub struct NewHttp<N, A, B> {
new_service: N, new_service: N,
handle: super::Handle, handle: Handle,
client_ctx: Arc<ctx::transport::Client>, client_ctx: Arc<ctx::transport::Client>,
_p: PhantomData<(A, B)>, _p: PhantomData<(A, B)>,
} }
pub struct Init<F, A, B> { pub struct Init<F, A, B> {
future: F, future: F,
handle: super::Handle, handle: Handle,
client_ctx: Arc<ctx::transport::Client>, client_ctx: Arc<ctx::transport::Client>,
_p: PhantomData<(A, B)>, _p: PhantomData<(A, B)>,
} }
@ -52,7 +54,7 @@ pub struct Init<F, A, B> {
#[derive(Debug)] #[derive(Debug)]
pub struct Http<S, A, B> { pub struct Http<S, A, B> {
service: S, service: S,
handle: super::Handle, handle: Handle,
client_ctx: Arc<ctx::transport::Client>, client_ctx: Arc<ctx::transport::Client>,
_p: PhantomData<(A, B)>, _p: PhantomData<(A, B)>,
} }
@ -66,7 +68,7 @@ pub struct Respond<F, B> {
#[derive(Debug)] #[derive(Debug)]
struct RespondInner { struct RespondInner {
handle: super::Handle, handle: Handle,
ctx: Arc<ctx::http::Request>, ctx: Arc<ctx::http::Request>,
request_open_at: Instant, request_open_at: Instant,
} }
@ -91,7 +93,7 @@ pub trait BodySensor: Sized {
#[derive(Debug)] #[derive(Debug)]
pub struct ResponseBodyInner { pub struct ResponseBodyInner {
handle: super::Handle, handle: Handle,
ctx: Arc<ctx::http::Response>, ctx: Arc<ctx::http::Response>,
bytes_sent: u64, bytes_sent: u64,
frames_sent: u32, frames_sent: u32,
@ -103,7 +105,7 @@ pub struct ResponseBodyInner {
#[derive(Debug)] #[derive(Debug)]
pub struct RequestBodyInner { pub struct RequestBodyInner {
handle: super::Handle, handle: Handle,
ctx: Arc<ctx::http::Request>, ctx: Arc<ctx::http::Request>,
bytes_sent: u64, bytes_sent: u64,
frames_sent: u32, frames_sent: u32,
@ -125,7 +127,7 @@ where
{ {
pub(super) fn new( pub(super) fn new(
new_service: N, new_service: N,
handle: super::Handle, handle: Handle,
client_ctx: &Arc<ctx::transport::Client>, client_ctx: &Arc<ctx::transport::Client>,
) -> Self { ) -> Self {
Self { Self {

View File

@ -34,45 +34,29 @@ use std::time::{Duration, Instant};
mod counter; mod counter;
mod gauge; mod gauge;
mod histogram; mod histogram;
mod http;
mod labels;
pub mod latency; pub mod latency;
pub mod prom; pub mod prom;
mod record;
mod scopes; mod scopes;
mod serve; mod serve;
pub use self::counter::Counter; pub use self::counter::Counter;
pub use self::gauge::Gauge; pub use self::gauge::Gauge;
pub use self::histogram::Histogram; pub use self::histogram::Histogram;
use self::labels::{
RequestLabels,
ResponseLabels,
};
pub use self::prom::{FmtMetrics, FmtLabels, FmtMetric}; pub use self::prom::{FmtMetrics, FmtLabels, FmtMetric};
pub use self::record::Record;
pub use self::scopes::Scopes; pub use self::scopes::Scopes;
pub use self::serve::Serve; pub use self::serve::Serve;
use super::transport; use super::{http, process, tls_config_reload, transport};
use super::process;
use super::tls_config_reload;
/// The root scope for all runtime metrics. /// The root scope for all runtime metrics.
#[derive(Debug, Default)] #[derive(Debug, Default)]
struct Root { pub struct Root {
requests: http::RequestScopes, pub(super) requests: http::RequestScopes,
responses: http::ResponseScopes, pub(super) responses: http::ResponseScopes,
transports: transport::Report, transports: transport::Report,
tls_config_reload: tls_config_reload::Report, tls_config_reload: tls_config_reload::Report,
process: process::Report, process: process::Report,
} }
#[derive(Debug)]
struct Stamped<T> {
stamp: Instant,
inner: T,
}
/// Construct the Prometheus metrics. /// Construct the Prometheus metrics.
/// ///
/// Returns the `Record` and `Serve` sides. The `Serve` side /// Returns the `Record` and `Serve` sides. The `Serve` side
@ -84,9 +68,9 @@ pub fn new(
process: process::Report, process: process::Report,
transport_report: transport::Report, transport_report: transport::Report,
tls: tls_config_reload::Report tls: tls_config_reload::Report
) -> (Record, Serve) { ) -> (http::Record, Serve) {
let metrics = Arc::new(Mutex::new(Root::new(process, transport_report, tls))); let metrics = Arc::new(Mutex::new(Root::new(process, transport_report, tls)));
(Record::new(&metrics), Serve::new(&metrics, idle_retain)) (http::Record::new(&metrics), Serve::new(&metrics, idle_retain))
} }
// ===== impl Root ===== // ===== impl Root =====
@ -105,17 +89,17 @@ impl Root {
} }
} }
fn request(&mut self, labels: RequestLabels) -> &mut http::RequestMetrics { pub(super) fn request(&mut self, labels: http::RequestLabels) -> &mut http::RequestMetrics {
self.requests.get_or_default(labels).stamped() self.requests.get_or_default(labels).stamped()
} }
fn response(&mut self, labels: ResponseLabels) -> &mut http::ResponseMetrics { pub(super) fn response(&mut self, labels: http::ResponseLabels) -> &mut http::ResponseMetrics {
self.responses.get_or_default(labels).stamped() self.responses.get_or_default(labels).stamped()
} }
fn retain_since(&mut self, epoch: Instant) { fn retain_since(&mut self, epoch: Instant) {
self.requests.retain(|_, v| v.stamp >= epoch); self.requests.retain(|_, v| v.stamp() >= epoch);
self.responses.retain(|_, v| v.stamp >= epoch); self.responses.retain(|_, v| v.stamp() >= epoch);
} }
} }
@ -131,36 +115,6 @@ impl FmtMetrics for Root {
} }
} }
// ===== impl Stamped =====
impl<T> Stamped<T> {
fn stamped(&mut self) -> &mut T {
self.stamp = Instant::now();
&mut self.inner
}
}
impl<T: Default> Default for Stamped<T> {
fn default() -> Self {
T::default().into()
}
}
impl<T> From<T> for Stamped<T> {
fn from(inner: T) -> Self {
Self {
inner,
stamp: Instant::now(),
}
}
}
impl<T> ::std::ops::Deref for Stamped<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
@ -181,8 +135,8 @@ mod tests {
) { ) {
let client = client(proxy, indexmap!["team".into() => team.into(),], TLS_DISABLED); let client = client(proxy, indexmap!["team".into() => team.into(),], TLS_DISABLED);
let (req, rsp) = request("http://nba.com", &server, &client); let (req, rsp) = request("http://nba.com", &server, &client);
root.request(RequestLabels::new(&req)).end(); root.request(http::RequestLabels::new(&req)).end();
root.response(ResponseLabels::new(&rsp, None)).end(Duration::from_millis(10)); root.response(http::ResponseLabels::new(&rsp, None)).end(Duration::from_millis(10));
} }
#[test] #[test]

View File

@ -16,18 +16,17 @@ macro_rules! metrics {
} }
mod errno; mod errno;
pub mod event; pub mod http;
mod metrics; mod metrics;
mod process; mod process;
pub mod sensor;
pub mod tap; pub mod tap;
pub mod tls_config_reload; pub mod tls_config_reload;
pub mod transport; pub mod transport;
use self::errno::Errno; use self::errno::Errno;
pub use self::event::Event; pub use self::http::event::Event;
pub use self::metrics::{Serve as ServeMetrics}; pub use self::metrics::{Serve as ServeMetrics};
pub use self::sensor::Sensors; pub use self::http::Sensors;
pub fn new( pub fn new(
start_time: SystemTime, start_time: SystemTime,

View File

@ -10,7 +10,7 @@ use tower_h2;
use bind; use bind;
use task::BoxExecutor; use task::BoxExecutor;
use telemetry::sensor::http::RequestBody; use telemetry::http::service::RequestBody;
use super::glue::{BodyPayload, HttpBody, HyperConnect}; use super::glue::{BodyPayload, HttpBody, HyperConnect};
use super::h1; use super::h1;
use super::upgrade::{HttpConnect, Http11Upgrade}; use super::upgrade::{HttpConnect, Http11Upgrade};