diff --git a/src/bind.rs b/src/bind.rs index 8bd01ce86..3dd0b9a4a 100644 --- a/src/bind.rs +++ b/src/bind.rs @@ -29,6 +29,7 @@ use watch_service::{WatchService, Rebind}; pub struct Bind { ctx: C, sensors: telemetry::Sensors, + transport_registry: telemetry::transport::Registry, tls_client_config: tls::ClientConfigWatch, _p: PhantomData B>, } @@ -149,7 +150,7 @@ pub type HttpResponse = http::Response>; pub type HttpRequest = http::Request>; pub type Client = transparency::Client< - sensor::Connect, + telemetry::transport::Connect, ::logging::ClientExecutor<&'static str, SocketAddr>, B, >; @@ -181,10 +182,14 @@ impl Error for BufferSpawnError { } impl Bind<(), B> { - pub fn new(tls_client_config: tls::ClientConfigWatch) -> Self { + pub fn new( + transport_registry: telemetry::transport::Registry, + tls_client_config: tls::ClientConfigWatch + ) -> Self { Self { ctx: (), sensors: telemetry::Sensors::null(), + transport_registry, tls_client_config, _p: PhantomData, } @@ -201,6 +206,7 @@ impl Bind<(), B> { Bind { ctx, sensors: self.sensors, + transport_registry: self.transport_registry, tls_client_config: self.tls_client_config, _p: PhantomData, } @@ -212,6 +218,7 @@ impl Clone for Bind { Self { ctx: self.ctx.clone(), sensors: self.sensors.clone(), + transport_registry: self.transport_registry.clone(), tls_client_config: self.tls_client_config.clone(), _p: PhantomData, } @@ -261,10 +268,9 @@ where ); // Map a socket address to a connection. 
- let connect = self.sensors.connect( - transport::Connect::new(addr, tls), - &client_ctx, - ); + let connect = self.transport_registry + .new_connect(client_ctx.as_ref(), transport::Connect::new(addr, tls)); + let log = ::logging::Client::proxy(self.ctx, addr) .with_protocol(protocol.clone()); diff --git a/src/inbound.rs b/src/inbound.rs index a9c440282..b033dfb46 100644 --- a/src/inbound.rs +++ b/src/inbound.rs @@ -114,8 +114,11 @@ mod tests { use tls; fn new_inbound(default: Option, ctx: ctx::Proxy) -> Inbound<()> { - let bind = Bind::new(tls::ClientConfig::no_tls()).with_ctx(ctx); - Inbound::new(default, bind) + let bind = Bind::new( + ::telemetry::transport::Registry::default(), + tls::ClientConfig::no_tls() + ); + Inbound::new(default, bind.with_ctx(ctx)) } fn make_key_http1(addr: net::SocketAddr) -> (net::SocketAddr, bind::Protocol) { diff --git a/src/lib.rs b/src/lib.rs index 3876e568c..bc0ae802f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -251,7 +251,7 @@ where ); let (taps, observe) = control::Observe::new(100); - let (sensors, tls_config_sensor, metrics_server) = telemetry::new( + let (sensors, transport_registry, tls_config_sensor, metrics_server) = telemetry::new( start_time, config.metrics_retain_idle, &taps, @@ -286,7 +286,8 @@ where let (drain_tx, drain_rx) = drain::channel(); - let bind = Bind::new(tls_client_config).with_sensors(sensors.clone()); + let bind = Bind::new(transport_registry.clone(), tls_client_config) + .with_sensors(sensors.clone()); // Setup the public listener. 
This will listen on a publicly accessible // address and listen for inbound connections that should be forwarded @@ -307,7 +308,7 @@ where config.private_connect_timeout, config.inbound_ports_disable_protocol_detection, ctx, - sensors.clone(), + transport_registry.clone(), get_original_dst.clone(), drain_rx.clone(), ) @@ -330,7 +331,7 @@ where config.public_connect_timeout, config.outbound_ports_disable_protocol_detection, ctx, - sensors, + transport_registry, get_original_dst, drain_rx, ) @@ -392,7 +393,7 @@ fn serve( tcp_connect_timeout: Duration, disable_protocol_detection_ports: IndexSet, proxy_ctx: ctx::Proxy, - sensors: telemetry::Sensors, + transport_registry: telemetry::transport::Registry, get_orig_dst: G, drain_rx: drain::Watch, ) -> impl Future + Send + 'static @@ -453,7 +454,7 @@ where let server = Server::new( listen_addr, proxy_ctx, - sensors, + transport_registry, get_orig_dst, stack, tcp_connect_timeout, diff --git a/src/telemetry/event.rs b/src/telemetry/event.rs index 65af9d36a..037d77dd2 100644 --- a/src/telemetry/event.rs +++ b/src/telemetry/event.rs @@ -1,5 +1,5 @@ use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Instant; use h2; @@ -7,9 +7,6 @@ use ctx; #[derive(Clone, Debug)] pub enum Event { - TransportOpen(Arc), - TransportClose(Arc, TransportClose), - StreamRequestOpen(Arc), StreamRequestFail(Arc, StreamRequestFail), StreamRequestEnd(Arc, StreamRequestEnd), @@ -19,19 +16,6 @@ pub enum Event { StreamResponseEnd(Arc, StreamResponseEnd), } -#[derive(Clone, Debug)] -pub struct TransportClose { - /// Indicates that the transport was closed without error. - // TODO include details. 
- pub clean: bool, - pub errno: Option, - - pub duration: Duration, - - pub rx_bytes: u64, - pub tx_bytes: u64, -} - #[derive(Clone, Debug)] pub struct StreamRequestFail { pub request_open_at: Instant, @@ -72,26 +56,3 @@ pub struct StreamResponseEnd { pub bytes_sent: u64, pub frames_sent: u32, } - -// ===== impl Event ===== - -impl Event { - pub fn is_http(&self) -> bool { - match *self { - Event::StreamRequestOpen(_) | - Event::StreamRequestFail(_, _) | - Event::StreamRequestEnd(_, _) | - Event::StreamResponseOpen(_, _) | - Event::StreamResponseFail(_, _) | - Event::StreamResponseEnd(_, _) => true, - _ => false, - } - } - - pub fn is_transport(&self) -> bool { - match *self { - Event::TransportOpen(_) | Event::TransportClose(_, _) => true, - _ => false, - } - } -} diff --git a/src/telemetry/metrics/mod.rs b/src/telemetry/metrics/mod.rs index c2e16771a..8c6e70a5a 100644 --- a/src/telemetry/metrics/mod.rs +++ b/src/telemetry/metrics/mod.rs @@ -82,11 +82,11 @@ struct Stamped { pub fn new( idle_retain: Duration, process: process::Report, + transport_report: transport::Report, tls: tls_config_reload::Report ) -> (Record, Serve) { - let (transports, transports_report) = transport::new(); - let metrics = Arc::new(Mutex::new(Root::new(process, transports_report, tls))); - (Record::new(&metrics, transports), Serve::new(&metrics, idle_retain)) + let metrics = Arc::new(Mutex::new(Root::new(process, transport_report, tls))); + (Record::new(&metrics), Serve::new(&metrics, idle_retain)) } // ===== impl Root ===== diff --git a/src/telemetry/metrics/record.rs b/src/telemetry/metrics/record.rs index dccefc5ff..ee7b470cc 100644 --- a/src/telemetry/metrics/record.rs +++ b/src/telemetry/metrics/record.rs @@ -6,20 +6,18 @@ use super::labels::{ RequestLabels, ResponseLabels, }; -use super::transport; /// Tracks Prometheus metrics #[derive(Clone, Debug)] pub struct Record { metrics: Arc>, - transports: transport::Registry, } // ===== impl Record ===== impl Record { - pub(super) fn 
new(metrics: &Arc>, transports: transport::Registry) -> Self { - Self { metrics: metrics.clone(), transports } + pub(super) fn new(metrics: &Arc>) -> Self { + Self { metrics: metrics.clone() } } #[inline] @@ -66,22 +64,6 @@ impl Record { metrics.response(ResponseLabels::fail(res)).end(latency) }); }, - - Event::TransportOpen(ref ctx) => { - self.transports.open(ctx); - }, - - Event::TransportClose(ref ctx, ref close) => { - let eos = if close.clean { - transport::Eos::Clean - } else { - transport::Eos::Error { - errno: close.errno.map(|e| e.into()) - } - }; - self.transports - .close(ctx, eos, close.duration, close.rx_bytes, close.tx_bytes); - }, }; } } @@ -102,6 +84,16 @@ mod test { const TLS_DISABLED: Conditional<(), tls::ReasonForNoTls> = Conditional::None(tls::ReasonForNoTls::Disabled); + fn new_record() -> super::Record { + let (r, _) = metrics::new( + Duration::from_secs(100), + Default::default(), + Default::default(), + Default::default() + ); + r + } + fn test_record_response_end_outbound(client_tls: TlsStatus, server_tls: TlsStatus) { let proxy = ctx::Proxy::Outbound; let server = server(proxy, server_tls); @@ -128,7 +120,7 @@ mod test { frames_sent: 0, }; - let (mut r, _) = metrics::new(Duration::from_secs(100), Default::default(), Default::default()); + let mut r = new_record(); let ev = Event::StreamResponseEnd(rsp.clone(), end.clone()); let labels = labels::ResponseLabels::new(&rsp, None); @@ -162,7 +154,6 @@ mod test { fn test_record_one_conn_request_outbound(client_tls: TlsStatus, server_tls: TlsStatus) { use self::Event::*; use self::labels::*; - use std::sync::Arc; let proxy = ctx::Proxy::Outbound; let server = server(proxy, server_tls); @@ -174,17 +165,6 @@ mod test { ], client_tls); let (req, rsp) = request("http://buoyant.io", &server, &client); - let server_transport = - Arc::new(ctx::transport::Ctx::Server(server.clone())); - let client_transport = - Arc::new(ctx::transport::Ctx::Client(client.clone())); - let transport_close = 
event::TransportClose { - clean: true, - errno: None, - duration: Duration::from_secs(30_000), - rx_bytes: 4321, - tx_bytes: 4321, - }; let request_open_at = Instant::now(); let request_end_at = request_open_at + Duration::from_millis(10); @@ -192,8 +172,6 @@ mod test { let response_first_frame_at = response_open_at + Duration::from_millis(100); let response_end_at = response_first_frame_at + Duration::from_millis(100); let events = vec![ - TransportOpen(server_transport.clone()), - TransportOpen(client_transport.clone()), StreamRequestOpen(req.clone()), StreamRequestEnd(req.clone(), event::StreamRequestEnd { request_open_at, @@ -213,17 +191,9 @@ mod test { bytes_sent: 0, frames_sent: 0, }), - TransportClose( - server_transport.clone(), - transport_close.clone(), - ), - TransportClose( - client_transport.clone(), - transport_close.clone(), - ), - ]; + ]; - let (mut r, _) = metrics::new(Duration::from_secs(1000), Default::default(), Default::default()); + let mut r = new_record(); let req_labels = RequestLabels::new(&req); let rsp_labels = ResponseLabels::new(&rsp, None); @@ -235,8 +205,6 @@ mod test { let lock = r.metrics.lock().expect("lock"); assert!(lock.requests.get(&req_labels).is_none()); assert!(lock.responses.get(&rsp_labels).is_none()); - assert_eq!(r.transports.open_total(&server_transport), 0); - assert_eq!(r.transports.open_total(&client_transport), 0); } for e in &events { @@ -267,26 +235,6 @@ mod test { .assert_gt_exactly(200, 0) .assert_lt_exactly(200, 0); } - - use super::transport::Eos; - let transport_duration: u64 = 30_000 * 1_000; - let t = r.transports; - - assert_eq!(t.open_total(&server_transport), 1); - assert_eq!(t.rx_tx_bytes_total(&server_transport), (4321, 4321)); - assert_eq!(t.close_total(&server_transport, Eos::Clean), 1); - t.connection_durations(&server_transport, Eos::Clean) - .assert_bucket_exactly(transport_duration, 1) - .assert_gt_exactly(transport_duration, 0) - .assert_lt_exactly(transport_duration, 0); - - 
assert_eq!(t.open_total(&client_transport), 1); - assert_eq!(t.rx_tx_bytes_total(&client_transport), (4321, 4321)); - assert_eq!(t.close_total(&server_transport, Eos::Clean), 1); - t.connection_durations(&server_transport, Eos::Clean) - .assert_bucket_exactly(transport_duration, 1) - .assert_gt_exactly(transport_duration, 0) - .assert_lt_exactly(transport_duration, 0); } } diff --git a/src/telemetry/mod.rs b/src/telemetry/mod.rs index 23c356aee..9a490d8a9 100644 --- a/src/telemetry/mod.rs +++ b/src/telemetry/mod.rs @@ -22,7 +22,7 @@ mod process; pub mod sensor; pub mod tap; pub mod tls_config_reload; -mod transport; +pub mod transport; use self::errno::Errno; pub use self::event::Event; @@ -33,11 +33,17 @@ pub fn new( start_time: SystemTime, metrics_retain_idle: Duration, taps: &Arc>, -) -> (Sensors, tls_config_reload::Sensor, ServeMetrics) { +) -> (Sensors, transport::Registry, tls_config_reload::Sensor, ServeMetrics) { let process = process::Report::new(start_time); + let (transport_registry, transport_report) = transport::new(); let (tls_config_sensor, tls_config_fmt) = tls_config_reload::new(); - let (record, serve) = metrics::new(metrics_retain_idle, process, tls_config_fmt); + let (record, serve) = metrics::new( + metrics_retain_idle, + process, + transport_report, + tls_config_fmt + ); let s = Sensors::new(record, taps); - (s, tls_config_sensor, serve) + (s, transport_registry, tls_config_sensor, serve) } diff --git a/src/telemetry/sensor/mod.rs b/src/telemetry/sensor/mod.rs index ad6384ec4..94b575479 100644 --- a/src/telemetry/sensor/mod.rs +++ b/src/telemetry/sensor/mod.rs @@ -1,22 +1,16 @@ use std::sync::{Arc, Mutex}; -use std::time::Instant; use http::{Request, Response}; -use tokio_connect; -use tokio::io::{AsyncRead, AsyncWrite}; use tower_service::NewService; use tower_h2::Body; use ctx; use telemetry::{event, metrics, tap}; -use transport::Connection; use transparency::ClientError; pub mod http; -mod transport; pub use self::http::{Http, NewHttp}; 
-pub use self::transport::{Connect, Transport}; #[derive(Clone, Debug)] struct Inner { @@ -62,27 +56,6 @@ impl Sensors { Sensors(None) } - pub fn accept( - &self, - io: T, - opened_at: Instant, - ctx: &Arc, - ) -> Transport - where - T: AsyncRead + AsyncWrite, - { - debug!("server connection open"); - let ctx = Arc::new(ctx::transport::Ctx::Server(Arc::clone(ctx))); - Transport::open(io, opened_at, Handle(self.0.clone()), ctx) - } - - pub fn connect(&self, connect: C, ctx: &Arc) -> Connect - where - C: tokio_connect::Connect, - { - Connect::new(connect, Handle(self.0.clone()), ctx) - } - pub fn http( &self, new_service: N, diff --git a/src/telemetry/sensor/transport.rs b/src/telemetry/sensor/transport.rs deleted file mode 100644 index 2444110a8..000000000 --- a/src/telemetry/sensor/transport.rs +++ /dev/null @@ -1,252 +0,0 @@ -use bytes::Buf; -use futures::{Async, Future, Poll}; -use std::io; -use std::sync::Arc; -use std::time::Instant; -use tokio_connect; -use tokio::io::{AsyncRead, AsyncWrite}; - -use transport::{Connection, Peek}; -use ctx; -use telemetry::event; - -/// Wraps a transport with telemetry. -#[derive(Debug)] -pub struct Transport(T, Option); - -#[derive(Debug)] -struct Inner { - handle: super::Handle, - ctx: Arc, - opened_at: Instant, - - rx_bytes: u64, - tx_bytes: u64, -} - -/// Builds client transports with telemetry. -#[derive(Clone, Debug)] -pub struct Connect { - underlying: C, - handle: super::Handle, - ctx: Arc, -} - -/// Adds telemetry to a pending client transport. -#[derive(Clone, Debug)] -pub struct Connecting { - underlying: C::Future, - handle: super::Handle, - ctx: Arc, -} - -// === impl Transport === - -impl Transport { - /// Wraps a transport with telemetry and emits a transport open event. 
- pub(super) fn open( - io: T, - opened_at: Instant, - mut handle: super::Handle, - ctx: Arc, - ) -> Self { - handle.send(|| event::Event::TransportOpen(Arc::clone(&ctx))); - - Transport( - io, - Some(Inner { - ctx, - handle, - opened_at, - rx_bytes: 0, - tx_bytes: 0, - }), - ) - } - - /// Wraps an operation on the underlying transport with error telemetry. - /// - /// If the transport operation results in a non-recoverable error, a transport close - /// event is emitted. - fn sense_err(&mut self, op: F) -> io::Result - where - F: FnOnce(&mut T) -> io::Result, - { - match op(&mut self.0) { - Ok(v) => Ok(v), - Err(e) => { - if e.kind() != io::ErrorKind::WouldBlock { - if let Some(Inner { - mut handle, - ctx, - opened_at, - rx_bytes, - tx_bytes, - }) = self.1.take() - { - let errno = e.raw_os_error(); - handle.send(move || { - let duration = opened_at.elapsed(); - let ev = event::TransportClose { - duration, - clean: false, - errno, - rx_bytes, - tx_bytes, - }; - event::Event::TransportClose(ctx, ev) - }); - } - } - - Err(e) - } - } - } -} - -impl Drop for Transport { - fn drop(&mut self) { - if let Some(Inner { - mut handle, - ctx, - opened_at, - rx_bytes, - tx_bytes, - }) = self.1.take() - { - handle.send(move || { - let duration = opened_at.elapsed(); - let ev = event::TransportClose { - clean: true, - errno: None, - duration, - rx_bytes, - tx_bytes, - }; - event::Event::TransportClose(ctx, ev) - }); - } - } -} - -impl io::Read for Transport { - fn read(&mut self, mut buf: &mut [u8]) -> io::Result { - let bytes = self.sense_err(move |io| io.read(buf))?; - - if let Some(inner) = self.1.as_mut() { - inner.rx_bytes += bytes as u64; - } - - Ok(bytes) - } -} - -impl io::Write for Transport { - fn flush(&mut self) -> io::Result<()> { - self.sense_err(|io| io.flush()) - } - - fn write(&mut self, buf: &[u8]) -> io::Result { - let bytes = self.sense_err(move |io| io.write(buf))?; - - if let Some(inner) = self.1.as_mut() { - inner.tx_bytes += bytes as u64; - } - - Ok(bytes) 
- } -} - -impl AsyncRead for Transport { - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { - self.0.prepare_uninitialized_buffer(buf) - } -} - -impl AsyncWrite for Transport { - fn shutdown(&mut self) -> Poll<(), io::Error> { - self.sense_err(|io| io.shutdown()) - } - - fn write_buf(&mut self, buf: &mut B) -> Poll { - let bytes = try_ready!(self.sense_err(|io| io.write_buf(buf))); - - if let Some(inner) = self.1.as_mut() { - inner.tx_bytes += bytes as u64; - } - - Ok(Async::Ready(bytes)) - } -} - -impl Peek for Transport { - fn poll_peek(&mut self) -> Poll { - self.sense_err(|io| io.poll_peek()) - } - - fn peeked(&self) -> &[u8] { - self.0.peeked() - } -} - -// === impl Connect === - -impl Connect -where - C: tokio_connect::Connect, -{ - /// Returns a `Connect` to `addr` and `handle`. - pub(super) fn new( - underlying: C, - handle: super::Handle, - ctx: &Arc, - ) -> Self { - Connect { - underlying, - handle, - ctx: Arc::clone(ctx), - } - } -} - -impl tokio_connect::Connect for Connect -where - C: tokio_connect::Connect, -{ - type Connected = Transport; - type Error = C::Error; - type Future = Connecting; - - fn connect(&self) -> Self::Future { - Connecting { - underlying: self.underlying.connect(), - handle: self.handle.clone(), - ctx: Arc::clone(&self.ctx), - } - } -} - -// === impl Connecting === - -impl Future for Connecting -where - C: tokio_connect::Connect, -{ - type Item = Transport; - type Error = C::Error; - - fn poll(&mut self) -> Poll { - let io = try_ready!(self.underlying.poll()); - debug!("client connection open"); - let ctx = ctx::transport::Client::new( - self.ctx.proxy, - &self.ctx.remote, - self.ctx.metadata.clone(), - io.tls_status(), - ); - let ctx = Arc::new(ctx.into()); - let trans = Transport::open(io, Instant::now(), self.handle.clone(), ctx); - Ok(trans.into()) - } -} diff --git a/src/telemetry/tap/mod.rs b/src/telemetry/tap/mod.rs index 857f74080..5b8e592ee 100644 --- a/src/telemetry/tap/mod.rs +++ 
b/src/telemetry/tap/mod.rs @@ -37,7 +37,7 @@ impl Taps { /// pub(super) fn inspect(&mut self, ev: &Event) { - if !ev.is_http() || self.by_id.is_empty() { + if self.by_id.is_empty() { return; } debug!("inspect taps={:?} event={:?}", self.by_id.keys().collect::>(), ev); diff --git a/src/telemetry/transport/io.rs b/src/telemetry/transport/io.rs new file mode 100644 index 000000000..5340ded3f --- /dev/null +++ b/src/telemetry/transport/io.rs @@ -0,0 +1,159 @@ +use bytes::Buf; +use futures::{Async, Future, Poll}; +use std::io; +use tokio_connect; +use tokio::io::{AsyncRead, AsyncWrite}; + +use transport::{Connection, Peek}; + +use super::{NewSensor, Sensor, Eos}; + +/// Wraps a transport with telemetry. +#[derive(Debug)] +pub struct Io { + io: T, + sensor: Sensor, +} + +/// Builds client transports with telemetry. +#[derive(Clone, Debug)] +pub struct Connect { + underlying: C, + new_sensor: NewSensor, +} + +/// Adds telemetry to a pending client transport. +#[derive(Clone, Debug)] +pub struct Connecting { + underlying: C::Future, + new_sensor: Option, +} + +// === impl Io === + +impl Io { + pub(super) fn new(io: T, sensor: Sensor) -> Self { + Self { io, sensor } + } + + /// Wraps an operation on the underlying transport with error telemetry. + /// + /// If the transport operation results in a non-recoverable error, record a + /// transport closure. 
+ fn sense_err(&mut self, op: F) -> io::Result + where + F: FnOnce(&mut T) -> io::Result, + { + match op(&mut self.io) { + Ok(v) => Ok(v), + Err(e) => { + if e.kind() != io::ErrorKind::WouldBlock { + let eos = Eos::Error { errno: e.raw_os_error().map(|e| e.into()) }; + self.sensor.record_close(eos); + } + + Err(e) + } + } + } +} + +impl io::Read for Io { + fn read(&mut self, mut buf: &mut [u8]) -> io::Result { + let bytes = self.sense_err(move |io| io.read(buf))?; + self.sensor.record_read(bytes); + + Ok(bytes) + } +} + +impl io::Write for Io { + fn flush(&mut self) -> io::Result<()> { + self.sense_err(|io| io.flush()) + } + + fn write(&mut self, buf: &[u8]) -> io::Result { + let bytes = self.sense_err(move |io| io.write(buf))?; + self.sensor.record_write(bytes); + + Ok(bytes) + } +} + +impl AsyncRead for Io { + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + self.io.prepare_uninitialized_buffer(buf) + } +} + +impl AsyncWrite for Io { + fn shutdown(&mut self) -> Poll<(), io::Error> { + self.sense_err(|io| io.shutdown()) + } + + fn write_buf(&mut self, buf: &mut B) -> Poll { + let bytes = try_ready!(self.sense_err(|io| io.write_buf(buf))); + self.sensor.record_write(bytes); + + Ok(Async::Ready(bytes)) + } +} + +impl Peek for Io { + fn poll_peek(&mut self) -> Poll { + self.sense_err(|io| io.poll_peek()) + } + + fn peeked(&self) -> &[u8] { + self.io.peeked() + } +} + +// === impl Connect === + +impl Connect +where + C: tokio_connect::Connect, +{ + /// Returns a `Connect` that wraps `underlying` and builds a sensor from `new_sensor` for each connection.
+ pub(super) fn new(underlying: C, new_sensor: NewSensor) -> Self { + Self { underlying, new_sensor } + } +} + +impl tokio_connect::Connect for Connect +where + C: tokio_connect::Connect, +{ + type Connected = Io; + type Error = C::Error; + type Future = Connecting; + + fn connect(&self) -> Self::Future { + Connecting { + underlying: self.underlying.connect(), + new_sensor: Some(self.new_sensor.clone()), + } + } +} + +// === impl Connecting === + +impl Future for Connecting +where + C: tokio_connect::Connect, +{ + type Item = Io; + type Error = C::Error; + + fn poll(&mut self) -> Poll { + let io = try_ready!(self.underlying.poll()); + debug!("client connection open"); + + let sensor = self.new_sensor.take() + .expect("future must not be polled after ready") + .new_sensor(); + let t = Io::new(io, sensor); + Ok(t.into()) + } +} diff --git a/src/telemetry/transport/mod.rs b/src/telemetry/transport/mod.rs index 46a0131af..7bb29e72d 100644 --- a/src/telemetry/transport/mod.rs +++ b/src/telemetry/transport/mod.rs @@ -1,17 +1,24 @@ use indexmap::IndexMap; use std::fmt; -use std::sync::{Arc, Mutex}; -use std::time::Duration; +use std::sync::{Arc, Mutex, MutexGuard}; +use std::time::Instant; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio_connect; use ctx; use telemetry::Errno; use telemetry::metrics::{ latency, - prom::{FmtLabels, FmtMetrics}, + prom::{FmtLabels, FmtMetric, FmtMetrics, Metric}, Counter, Gauge, Histogram, }; +use transport::Connection; + +mod io; + +pub use self::io::{Connect, Connecting, Io}; metrics! { tcp_open_total: Counter { "Total count of opened connections" }, @@ -32,16 +39,15 @@ pub fn new() -> (Registry, Report) { #[derive(Debug, Default)] pub struct Report(Arc>); -/// Supports recording telemetry metrics. -#[derive(Clone, Debug)] +/// Instruments transports to record telemetry. 
+#[derive(Clone, Debug, Default)] pub struct Registry(Arc>); -#[derive(Debug, Default)] -struct Inner(IndexMap); - -/// Describes the dimensions across which transport metrics are aggregated. +/// Describes a class of transport. /// -/// Implements `fmt::Display` to render a comma-separated list of key-value pairs. +/// A `Metrics` type exists for each unique `Key`. +/// +/// Implements `FmtLabels`. #[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] struct Key { proxy: ctx::Proxy, @@ -49,7 +55,10 @@ struct Key { tls_status: ctx::transport::TlsStatus, } -/// Holds all of the metrics for a class of transport. +/// Stores a class of transport's metrics. +/// +/// TODO We should probably use AtomicUsize for most of these counters so that +/// simple increments don't require a lock. Especially for read|write_bytes_total. #[derive(Debug, Default)] struct Metrics { open_total: Counter, @@ -60,9 +69,11 @@ struct Metrics { by_eos: IndexMap, } -#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] -enum Peer { Src, Dst } - +/// Describes a class of transport end. +/// +/// An `EosMetrics` type exists for each unique `Key` and `Eos` pair. +/// +/// Implements `FmtLabels`. #[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] pub enum Eos { Clean, @@ -78,110 +89,157 @@ struct EosMetrics { connection_duration: Histogram, } +/// Tracks the state of a single instance of `Io` throughout its lifetime. +#[derive(Debug)] +struct Sensor { + metrics: Option>>, + opened_at: Instant, +} + +/// Lazily builds instances of `Sensor`. +#[derive(Clone, Debug)] +struct NewSensor(Option>>); + +/// Shares state between `Report` and `Registry`. +#[derive(Debug, Default)] +struct Inner(IndexMap>>); + +#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] +enum Peer { + /// Represents the side of the proxy that accepts connections. + Src, + /// Represents the side of the proxy that opens connections.
+ Dst, +} + +// ===== impl Inner ===== + impl Inner { fn is_empty(&self) -> bool { self.0.is_empty() } - /// Iterates over all metrics. - fn iter(&self) -> impl Iterator { + fn iter(&self) -> impl Iterator)> { self.0.iter() + .filter_map(|(k, l)| l.lock().ok().map(move |m| (k, m))) } - /// Iterates over all end-of-stream metrics. - fn iter_eos(&self) -> impl Iterator { - self.iter() - .flat_map(|(k, t)| { - t.by_eos.iter().map(move |(e, m)| ((k ,e), m)) - }) + /// Formats a metric across all instances of `Metrics` in the registry. + fn fmt_by(&self, f: &mut fmt::Formatter, metric: Metric, get_metric: F) + -> fmt::Result + where + F: Fn(&Metrics) -> &M, + M: FmtMetric, + { + for (key, m) in self.iter() { + get_metric(&*m).fmt_metric_labeled(f, metric.name, key)?; + } + + Ok(()) } - fn get_or_default(&mut self, k: Key) -> &mut Metrics { - self.0.entry(k).or_insert_with(|| Metrics::default()) + /// Formats a metric across all instances of `EosMetrics` in the registry. + fn fmt_eos_by(&self, f: &mut fmt::Formatter, metric: Metric, get_metric: F) + -> fmt::Result + where + F: Fn(&EosMetrics) -> &M, + M: FmtMetric, + { + for (key, metrics) in self.iter() { + for (eos, m) in (*metrics).by_eos.iter() { + get_metric(&*m).fmt_metric_labeled(f, metric.name, (key, eos))?; + } + } + + Ok(()) + } + + fn get_or_default(&mut self, k: Key) -> &Arc> { + self.0.entry(k).or_insert_with(|| Default::default()) } } // ===== impl Registry ===== impl Registry { - pub fn open(&mut self, ctx: &ctx::transport::Ctx) { - let mut inner = match self.0.lock() { - Err(_) => return, - Ok(lock) => lock, - }; - let metrics = inner.get_or_default(Key::new(ctx)); - metrics.open_total.incr(); - metrics.open_connections.incr(); + pub fn new_connect(&self, ctx: &ctx::transport::Client, inner: C) -> Connect + where + C: tokio_connect::Connect, + { + let metrics = match self.0.lock() { + Ok(mut inner) => { + Some(inner.get_or_default(Key::client(ctx)).clone()) + } + Err(_) => { + error!("unable to lock 
metrics registry"); + None + } + }; + Connect::new(inner, NewSensor(metrics)) } - pub fn close( - &mut self, - ctx: &ctx::transport::Ctx, - eos: Eos, - duration: Duration, - rx: u64, - tx: u64, - ) { - let mut inner = match self.0.lock() { - Err(_) => return, - Ok(lock) => lock, + pub fn accept(&self, ctx: &ctx::transport::Server, io: T) -> Io + where + T: AsyncRead + AsyncWrite, + { + let metrics = match self.0.lock() { + Ok(mut inner) => Some(inner.get_or_default(Key::server(ctx)).clone()), + Err(_) => { + error!("unable to lock metrics registry"); + None + } }; - - let key = Key::new(ctx); - let metrics = inner.get_or_default(key); - metrics.open_connections.decr(); - metrics.read_bytes_total += rx; - metrics.write_bytes_total += tx; - - let class = metrics.by_eos - .entry(eos) - .or_insert_with(|| EosMetrics::default()); - class.close_total.incr(); - class.connection_duration.add(duration); + Io::new(io, Sensor::open(metrics)) } +} + +#[cfg(test)] +impl Registry { - #[cfg(test)] pub fn open_total(&self, ctx: &ctx::transport::Ctx) -> u64 { self.0.lock().unwrap().0 .get(&Key::new(ctx)) - .map(|m| m.open_total.into()) + .map(|m| m.lock().unwrap().open_total.into()) .unwrap_or(0) } - // #[cfg(test)] // pub fn open_connections(&self, ctx: &ctx::transport::Ctx) -> u64 { // self.0.lock().unwrap().0 // .get(&Key::new(ctx)) - // .map(|m| m.open_connections.into()) + // .map(|m| m.lock().unwrap().open_connections.into()) // .unwrap_or(0) // } - #[cfg(test)] pub fn rx_tx_bytes_total(&self, ctx: &ctx::transport::Ctx) -> (u64, u64) { self.0.lock().unwrap().0 .get(&Key::new(ctx)) - .map(|m| (m.read_bytes_total.into(), m.write_bytes_total.into())) + .map(|m| { + let m = m.lock().unwrap(); + (m.read_bytes_total.into(), m.write_bytes_total.into()) + }) .unwrap_or((0, 0)) } - #[cfg(test)] pub fn close_total(&self, ctx: &ctx::transport::Ctx, eos: Eos) -> u64 { self.0.lock().unwrap().0 .get(&Key::new(ctx)) - .and_then(move |m| m.by_eos.get(&eos).map(|m| m.close_total.into())) + 
.and_then(move |m| m.lock().unwrap().by_eos.get(&eos).map(|m| m.close_total.into())) .unwrap_or(0) } - #[cfg(test)] pub fn connection_durations(&self, ctx: &ctx::transport::Ctx, eos: Eos) -> Histogram { self.0.lock().unwrap().0 .get(&Key::new(ctx)) - .and_then(move |m| m.by_eos.get(&eos).map(|m| m.connection_duration.clone())) + .and_then(move |m| { + m.lock().unwrap().by_eos.get(&eos).map(|m| m.connection_duration.clone()) + }) .unwrap_or_default() } } +// ===== impl Report ===== + impl FmtMetrics for Report { fn fmt_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result { let metrics = match self.0.lock() { @@ -194,63 +252,141 @@ impl FmtMetrics for Report { } tcp_open_total.fmt_help(f)?; - tcp_open_total.fmt_scopes(f, metrics.iter(), |m| &m.open_total)?; + metrics.fmt_by(f, tcp_open_total, |m| &m.open_total)?; tcp_open_connections.fmt_help(f)?; - tcp_open_connections.fmt_scopes(f, metrics.iter(), |m| &m.open_connections)?; + metrics.fmt_by(f, tcp_open_connections, |m| &m.open_connections)?; tcp_read_bytes_total.fmt_help(f)?; - tcp_read_bytes_total.fmt_scopes(f, metrics.iter(), |m| &m.read_bytes_total)?; + metrics.fmt_by(f, tcp_read_bytes_total, |m| &m.read_bytes_total)?; tcp_write_bytes_total.fmt_help(f)?; - tcp_write_bytes_total.fmt_scopes(f, metrics.iter(), |m| &m.write_bytes_total)?; + metrics.fmt_by(f, tcp_write_bytes_total, |m| &m.write_bytes_total)?; tcp_close_total.fmt_help(f)?; - tcp_close_total.fmt_scopes(f, metrics.iter_eos(), |e| &e.close_total)?; + metrics.fmt_eos_by(f, tcp_close_total, |e| &e.close_total)?; tcp_connection_duration_ms.fmt_help(f)?; - tcp_connection_duration_ms.fmt_scopes(f, metrics.iter_eos(), |e| &e.connection_duration)?; + metrics.fmt_eos_by(f, tcp_connection_duration_ms, |e| &e.connection_duration)?; Ok(()) } } +// ===== impl Sensor ===== + +impl Sensor { + + pub fn open(metrics: Option>>) -> Self { + if let Some(ref m) = metrics { + if let Ok(mut m) = m.lock() { + m.open_total.incr(); + m.open_connections.incr(); + } + } + Self { 
+ metrics, + opened_at: Instant::now(), + } + } + + pub fn record_read(&mut self, sz: usize) { + if let Some(ref m) = self.metrics { + if let Ok(mut m) = m.lock() { + m.read_bytes_total += sz as u64; + } + } + } + + pub fn record_write(&mut self, sz: usize) { + if let Some(ref m) = self.metrics { + if let Ok(mut m) = m.lock() { + m.write_bytes_total += sz as u64; + } + } + } + + pub fn record_close(&mut self, eos: Eos) { + // When closed, the metrics structure is dropped so that no further + // updates can occur (i.e. so that an additional close won't be recorded + // on Drop). + if let Some(m) = self.metrics.take() { + let duration = self.opened_at.elapsed(); + if let Ok(mut m) = m.lock() { + m.open_connections.decr(); + + let class = m.by_eos.entry(eos).or_insert_with(|| EosMetrics::default()); + class.close_total.incr(); + class.connection_duration.add(duration); + } + } + } +} + +impl Drop for Sensor { + fn drop(&mut self) { + self.record_close(Eos::Clean) + } +} + +// ===== impl NewSensor ===== + +impl NewSensor { + fn new_sensor(mut self) -> Sensor { + Sensor::open(self.0.take()) + } +} + // ===== impl Key ===== +impl Key { + #[cfg(test)] + fn new(ctx: &ctx::transport::Ctx) -> Self { + match ctx { + ctx::transport::Ctx::Client(ctx) => Self::client(ctx), + ctx::transport::Ctx::Server(ctx) => Self::server(ctx), + } + } + + fn client(ctx: &ctx::transport::Client) -> Self { + Self { + proxy: ctx.proxy, + peer: Peer::Dst, + tls_status: ctx.tls_status.into(), + } + } + + fn server(ctx: &ctx::transport::Server) -> Self { + Self { + proxy: ctx.proxy, + peer: Peer::Src, + tls_status: ctx.tls_status.into(), + } + } +} + impl FmtLabels for Key { fn fmt_labels(&self, f: &mut fmt::Formatter) -> fmt::Result { ((self.proxy, self.peer), self.tls_status).fmt_labels(f) } } -impl Key { - fn new(ctx: &ctx::transport::Ctx) -> Self { - Self { - proxy: ctx.proxy(), - peer: match *ctx { - ctx::transport::Ctx::Server(_) => Peer::Src, - ctx::transport::Ctx::Client(_) => Peer::Dst, - 
}, - tls_status: ctx.tls_status().into(), - } - } -} +// ===== impl Peer ===== impl FmtLabels for Peer { fn fmt_labels(&self, f: &mut fmt::Formatter) -> fmt::Result { - match *self { + match self { Peer::Src => f.pad("peer=\"src\""), Peer::Dst => f.pad("peer=\"dst\""), } } } - // ===== impl Eos ===== impl FmtLabels for Eos { fn fmt_labels(&self, f: &mut fmt::Formatter) -> fmt::Result { - match &self { + match self { Eos::Clean => f.pad("classification=\"success\""), Eos::Error { errno } => { f.pad("classification=\"failure\"")?; diff --git a/src/transparency/server.rs b/src/transparency/server.rs index f9ecdf4bc..11beb7760 100644 --- a/src/transparency/server.rs +++ b/src/transparency/server.rs @@ -3,7 +3,7 @@ use std::{ fmt, net::SocketAddr, sync::Arc, - time::{Duration, Instant}, + time::Duration, }; use futures::{future::Either, Future}; @@ -18,7 +18,7 @@ use transport::{Connection, Peek}; use ctx::Proxy as ProxyCtx; use ctx::transport::{Server as ServerCtx}; use drain; -use telemetry::Sensors; +use telemetry; use transport::GetOriginalDst; use super::glue::{HttpBody, HttpBodyNewSvc, HyperServerSvc}; use super::protocol::Protocol; @@ -47,7 +47,7 @@ where listen_addr: SocketAddr, new_service: S, proxy_ctx: ProxyCtx, - sensors: Sensors, + transport_registry: telemetry::transport::Registry, tcp: tcp::Proxy, log: ::logging::Server, } @@ -76,7 +76,7 @@ where pub fn new( listen_addr: SocketAddr, proxy_ctx: ProxyCtx, - sensors: Sensors, + transport_registry: telemetry::transport::Registry, get_orig_dst: G, stack: S, tcp_connect_timeout: Duration, @@ -84,7 +84,7 @@ where drain_signal: drain::Watch, ) -> Self { let recv_body_svc = HttpBodyNewSvc::new(stack.clone()); - let tcp = tcp::Proxy::new(tcp_connect_timeout, sensors.clone()); + let tcp = tcp::Proxy::new(tcp_connect_timeout, transport_registry.clone()); let log = ::logging::Server::proxy(proxy_ctx, listen_addr); Server { disable_protocol_detection_ports, @@ -99,7 +99,7 @@ where listen_addr, new_service: stack, 
proxy_ctx, - sensors, + transport_registry, tcp, log, } @@ -118,8 +118,6 @@ where pub fn serve(&self, connection: Connection, remote_addr: SocketAddr) -> impl Future { - let opened_at = Instant::now(); - // create Server context let orig_dst = connection.original_dst_addr(&self.get_orig_dst); let local_addr = connection.local_addr().unwrap_or(self.listen_addr); @@ -134,7 +132,7 @@ where .with_remote(remote_addr); // record telemetry - let io = self.sensors.accept(connection, opened_at, &srv_ctx); + let io = self.transport_registry.accept(&srv_ctx, connection); // We are using the port from the connection's SO_ORIGINAL_DST to // determine whether to skip protocol detection, not any port that diff --git a/src/transparency/tcp.rs b/src/transparency/tcp.rs index e04b8862f..0040f5e6e 100644 --- a/src/transparency/tcp.rs +++ b/src/transparency/tcp.rs @@ -13,7 +13,7 @@ use ctx::transport::{ Client as ClientCtx, Server as ServerCtx, }; -use telemetry::Sensors; +use telemetry; use timeout::Timeout; use transport::{self, tls}; use ctx::transport::TlsStatus; @@ -22,15 +22,18 @@ use ctx::transport::TlsStatus; #[derive(Debug, Clone)] pub struct Proxy { connect_timeout: Duration, - sensors: Sensors, + transport_registry: telemetry::transport::Registry, } impl Proxy { /// Create a new TCP `Proxy`. - pub fn new(connect_timeout: Duration, sensors: Sensors) -> Self { + pub fn new( + connect_timeout: Duration, + transport_registry: telemetry::transport::Registry + ) -> Self { Self { connect_timeout, - sensors, + transport_registry, } } @@ -72,7 +75,7 @@ impl Proxy { transport::Connect::new(orig_dst, tls), self.connect_timeout, ); - let connect = self.sensors.connect(c, &client_ctx); + let connect = self.transport_registry.new_connect(&client_ctx, c); future::Either::A(connect.connect() .map_err(move |e| error!("tcp connect error to {}: {:?}", orig_dst, e))