Record transport telemetry without Events (#76)
Previousy, transport telemetry was recorded by emitting Events from an IO instance to an aggegator. This requires that each update take a global telemetry lock, and is an impediment to richer telemetry. This change removes the transport event types so that the Event and Record types are left only to represent HTTP telemetry. Now, the transport's IO type holds a reference to a shared `Metrics` structure. As the transport is used, metric values are updated immediately. A lock on the transport _registry_ is taken whenever a new transport is opened/accepted and when metrics are reported. Each transport class's metrics are now shared & locked independently, so it's possible for a transport to update its metrics while the registry is being manipulated. This has one functional change: the `tcp_read_bytes_total` and `tcp_write_bytes_total` counters are now updated instantaneously. Previously these values were only incremented on transport close, which is misleading, especially for long-lived connections. With this change, all transport-related telemetry logic lives in `telemetry::transport`.
This commit is contained in:
parent
912b9d3b7c
commit
447f3320a7
18
src/bind.rs
18
src/bind.rs
|
@ -29,6 +29,7 @@ use watch_service::{WatchService, Rebind};
|
|||
pub struct Bind<C, B> {
|
||||
ctx: C,
|
||||
sensors: telemetry::Sensors,
|
||||
transport_registry: telemetry::transport::Registry,
|
||||
tls_client_config: tls::ClientConfigWatch,
|
||||
_p: PhantomData<fn() -> B>,
|
||||
}
|
||||
|
@ -149,7 +150,7 @@ pub type HttpResponse = http::Response<sensor::http::ResponseBody<HttpBody>>;
|
|||
pub type HttpRequest<B> = http::Request<sensor::http::RequestBody<B>>;
|
||||
|
||||
pub type Client<B> = transparency::Client<
|
||||
sensor::Connect<transport::Connect>,
|
||||
telemetry::transport::Connect<transport::Connect>,
|
||||
::logging::ClientExecutor<&'static str, SocketAddr>,
|
||||
B,
|
||||
>;
|
||||
|
@ -181,10 +182,14 @@ impl Error for BufferSpawnError {
|
|||
}
|
||||
|
||||
impl<B> Bind<(), B> {
|
||||
pub fn new(tls_client_config: tls::ClientConfigWatch) -> Self {
|
||||
pub fn new(
|
||||
transport_registry: telemetry::transport::Registry,
|
||||
tls_client_config: tls::ClientConfigWatch
|
||||
) -> Self {
|
||||
Self {
|
||||
ctx: (),
|
||||
sensors: telemetry::Sensors::null(),
|
||||
transport_registry,
|
||||
tls_client_config,
|
||||
_p: PhantomData,
|
||||
}
|
||||
|
@ -201,6 +206,7 @@ impl<B> Bind<(), B> {
|
|||
Bind {
|
||||
ctx,
|
||||
sensors: self.sensors,
|
||||
transport_registry: self.transport_registry,
|
||||
tls_client_config: self.tls_client_config,
|
||||
_p: PhantomData,
|
||||
}
|
||||
|
@ -212,6 +218,7 @@ impl<C: Clone, B> Clone for Bind<C, B> {
|
|||
Self {
|
||||
ctx: self.ctx.clone(),
|
||||
sensors: self.sensors.clone(),
|
||||
transport_registry: self.transport_registry.clone(),
|
||||
tls_client_config: self.tls_client_config.clone(),
|
||||
_p: PhantomData,
|
||||
}
|
||||
|
@ -261,10 +268,9 @@ where
|
|||
);
|
||||
|
||||
// Map a socket address to a connection.
|
||||
let connect = self.sensors.connect(
|
||||
transport::Connect::new(addr, tls),
|
||||
&client_ctx,
|
||||
);
|
||||
let connect = self.transport_registry
|
||||
.new_connect(client_ctx.as_ref(), transport::Connect::new(addr, tls));
|
||||
|
||||
|
||||
let log = ::logging::Client::proxy(self.ctx, addr)
|
||||
.with_protocol(protocol.clone());
|
||||
|
|
|
@ -114,8 +114,11 @@ mod tests {
|
|||
use tls;
|
||||
|
||||
fn new_inbound(default: Option<net::SocketAddr>, ctx: ctx::Proxy) -> Inbound<()> {
|
||||
let bind = Bind::new(tls::ClientConfig::no_tls()).with_ctx(ctx);
|
||||
Inbound::new(default, bind)
|
||||
let bind = Bind::new(
|
||||
::telemetry::transport::Registry::default(),
|
||||
tls::ClientConfig::no_tls()
|
||||
);
|
||||
Inbound::new(default, bind.with_ctx(ctx))
|
||||
}
|
||||
|
||||
fn make_key_http1(addr: net::SocketAddr) -> (net::SocketAddr, bind::Protocol) {
|
||||
|
|
13
src/lib.rs
13
src/lib.rs
|
@ -251,7 +251,7 @@ where
|
|||
);
|
||||
|
||||
let (taps, observe) = control::Observe::new(100);
|
||||
let (sensors, tls_config_sensor, metrics_server) = telemetry::new(
|
||||
let (sensors, transport_registry, tls_config_sensor, metrics_server) = telemetry::new(
|
||||
start_time,
|
||||
config.metrics_retain_idle,
|
||||
&taps,
|
||||
|
@ -286,7 +286,8 @@ where
|
|||
|
||||
let (drain_tx, drain_rx) = drain::channel();
|
||||
|
||||
let bind = Bind::new(tls_client_config).with_sensors(sensors.clone());
|
||||
let bind = Bind::new(transport_registry.clone(), tls_client_config)
|
||||
.with_sensors(sensors.clone());
|
||||
|
||||
// Setup the public listener. This will listen on a publicly accessible
|
||||
// address and listen for inbound connections that should be forwarded
|
||||
|
@ -307,7 +308,7 @@ where
|
|||
config.private_connect_timeout,
|
||||
config.inbound_ports_disable_protocol_detection,
|
||||
ctx,
|
||||
sensors.clone(),
|
||||
transport_registry.clone(),
|
||||
get_original_dst.clone(),
|
||||
drain_rx.clone(),
|
||||
)
|
||||
|
@ -330,7 +331,7 @@ where
|
|||
config.public_connect_timeout,
|
||||
config.outbound_ports_disable_protocol_detection,
|
||||
ctx,
|
||||
sensors,
|
||||
transport_registry,
|
||||
get_original_dst,
|
||||
drain_rx,
|
||||
)
|
||||
|
@ -392,7 +393,7 @@ fn serve<R, B, E, F, G>(
|
|||
tcp_connect_timeout: Duration,
|
||||
disable_protocol_detection_ports: IndexSet<u16>,
|
||||
proxy_ctx: ctx::Proxy,
|
||||
sensors: telemetry::Sensors,
|
||||
transport_registry: telemetry::transport::Registry,
|
||||
get_orig_dst: G,
|
||||
drain_rx: drain::Watch,
|
||||
) -> impl Future<Item = (), Error = io::Error> + Send + 'static
|
||||
|
@ -453,7 +454,7 @@ where
|
|||
let server = Server::new(
|
||||
listen_addr,
|
||||
proxy_ctx,
|
||||
sensors,
|
||||
transport_registry,
|
||||
get_orig_dst,
|
||||
stack,
|
||||
tcp_connect_timeout,
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
use std::time::Instant;
|
||||
|
||||
use h2;
|
||||
|
||||
|
@ -7,9 +7,6 @@ use ctx;
|
|||
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum Event {
|
||||
TransportOpen(Arc<ctx::transport::Ctx>),
|
||||
TransportClose(Arc<ctx::transport::Ctx>, TransportClose),
|
||||
|
||||
StreamRequestOpen(Arc<ctx::http::Request>),
|
||||
StreamRequestFail(Arc<ctx::http::Request>, StreamRequestFail),
|
||||
StreamRequestEnd(Arc<ctx::http::Request>, StreamRequestEnd),
|
||||
|
@ -19,19 +16,6 @@ pub enum Event {
|
|||
StreamResponseEnd(Arc<ctx::http::Response>, StreamResponseEnd),
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct TransportClose {
|
||||
/// Indicates that the transport was closed without error.
|
||||
// TODO include details.
|
||||
pub clean: bool,
|
||||
pub errno: Option<i32>,
|
||||
|
||||
pub duration: Duration,
|
||||
|
||||
pub rx_bytes: u64,
|
||||
pub tx_bytes: u64,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct StreamRequestFail {
|
||||
pub request_open_at: Instant,
|
||||
|
@ -72,26 +56,3 @@ pub struct StreamResponseEnd {
|
|||
pub bytes_sent: u64,
|
||||
pub frames_sent: u32,
|
||||
}
|
||||
|
||||
// ===== impl Event =====
|
||||
|
||||
impl Event {
|
||||
pub fn is_http(&self) -> bool {
|
||||
match *self {
|
||||
Event::StreamRequestOpen(_) |
|
||||
Event::StreamRequestFail(_, _) |
|
||||
Event::StreamRequestEnd(_, _) |
|
||||
Event::StreamResponseOpen(_, _) |
|
||||
Event::StreamResponseFail(_, _) |
|
||||
Event::StreamResponseEnd(_, _) => true,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_transport(&self) -> bool {
|
||||
match *self {
|
||||
Event::TransportOpen(_) | Event::TransportClose(_, _) => true,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -82,11 +82,11 @@ struct Stamped<T> {
|
|||
pub fn new(
|
||||
idle_retain: Duration,
|
||||
process: process::Report,
|
||||
transport_report: transport::Report,
|
||||
tls: tls_config_reload::Report
|
||||
) -> (Record, Serve) {
|
||||
let (transports, transports_report) = transport::new();
|
||||
let metrics = Arc::new(Mutex::new(Root::new(process, transports_report, tls)));
|
||||
(Record::new(&metrics, transports), Serve::new(&metrics, idle_retain))
|
||||
let metrics = Arc::new(Mutex::new(Root::new(process, transport_report, tls)));
|
||||
(Record::new(&metrics), Serve::new(&metrics, idle_retain))
|
||||
}
|
||||
|
||||
// ===== impl Root =====
|
||||
|
|
|
@ -6,20 +6,18 @@ use super::labels::{
|
|||
RequestLabels,
|
||||
ResponseLabels,
|
||||
};
|
||||
use super::transport;
|
||||
|
||||
/// Tracks Prometheus metrics
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Record {
|
||||
metrics: Arc<Mutex<Root>>,
|
||||
transports: transport::Registry,
|
||||
}
|
||||
|
||||
// ===== impl Record =====
|
||||
|
||||
impl Record {
|
||||
pub(super) fn new(metrics: &Arc<Mutex<Root>>, transports: transport::Registry) -> Self {
|
||||
Self { metrics: metrics.clone(), transports }
|
||||
pub(super) fn new(metrics: &Arc<Mutex<Root>>) -> Self {
|
||||
Self { metrics: metrics.clone() }
|
||||
}
|
||||
|
||||
#[inline]
|
||||
|
@ -66,22 +64,6 @@ impl Record {
|
|||
metrics.response(ResponseLabels::fail(res)).end(latency)
|
||||
});
|
||||
},
|
||||
|
||||
Event::TransportOpen(ref ctx) => {
|
||||
self.transports.open(ctx);
|
||||
},
|
||||
|
||||
Event::TransportClose(ref ctx, ref close) => {
|
||||
let eos = if close.clean {
|
||||
transport::Eos::Clean
|
||||
} else {
|
||||
transport::Eos::Error {
|
||||
errno: close.errno.map(|e| e.into())
|
||||
}
|
||||
};
|
||||
self.transports
|
||||
.close(ctx, eos, close.duration, close.rx_bytes, close.tx_bytes);
|
||||
},
|
||||
};
|
||||
}
|
||||
}
|
||||
|
@ -102,6 +84,16 @@ mod test {
|
|||
const TLS_DISABLED: Conditional<(), tls::ReasonForNoTls> =
|
||||
Conditional::None(tls::ReasonForNoTls::Disabled);
|
||||
|
||||
fn new_record() -> super::Record {
|
||||
let (r, _) = metrics::new(
|
||||
Duration::from_secs(100),
|
||||
Default::default(),
|
||||
Default::default(),
|
||||
Default::default()
|
||||
);
|
||||
r
|
||||
}
|
||||
|
||||
fn test_record_response_end_outbound(client_tls: TlsStatus, server_tls: TlsStatus) {
|
||||
let proxy = ctx::Proxy::Outbound;
|
||||
let server = server(proxy, server_tls);
|
||||
|
@ -128,7 +120,7 @@ mod test {
|
|||
frames_sent: 0,
|
||||
};
|
||||
|
||||
let (mut r, _) = metrics::new(Duration::from_secs(100), Default::default(), Default::default());
|
||||
let mut r = new_record();
|
||||
let ev = Event::StreamResponseEnd(rsp.clone(), end.clone());
|
||||
let labels = labels::ResponseLabels::new(&rsp, None);
|
||||
|
||||
|
@ -162,7 +154,6 @@ mod test {
|
|||
fn test_record_one_conn_request_outbound(client_tls: TlsStatus, server_tls: TlsStatus) {
|
||||
use self::Event::*;
|
||||
use self::labels::*;
|
||||
use std::sync::Arc;
|
||||
|
||||
let proxy = ctx::Proxy::Outbound;
|
||||
let server = server(proxy, server_tls);
|
||||
|
@ -174,17 +165,6 @@ mod test {
|
|||
], client_tls);
|
||||
|
||||
let (req, rsp) = request("http://buoyant.io", &server, &client);
|
||||
let server_transport =
|
||||
Arc::new(ctx::transport::Ctx::Server(server.clone()));
|
||||
let client_transport =
|
||||
Arc::new(ctx::transport::Ctx::Client(client.clone()));
|
||||
let transport_close = event::TransportClose {
|
||||
clean: true,
|
||||
errno: None,
|
||||
duration: Duration::from_secs(30_000),
|
||||
rx_bytes: 4321,
|
||||
tx_bytes: 4321,
|
||||
};
|
||||
|
||||
let request_open_at = Instant::now();
|
||||
let request_end_at = request_open_at + Duration::from_millis(10);
|
||||
|
@ -192,8 +172,6 @@ mod test {
|
|||
let response_first_frame_at = response_open_at + Duration::from_millis(100);
|
||||
let response_end_at = response_first_frame_at + Duration::from_millis(100);
|
||||
let events = vec![
|
||||
TransportOpen(server_transport.clone()),
|
||||
TransportOpen(client_transport.clone()),
|
||||
StreamRequestOpen(req.clone()),
|
||||
StreamRequestEnd(req.clone(), event::StreamRequestEnd {
|
||||
request_open_at,
|
||||
|
@ -213,17 +191,9 @@ mod test {
|
|||
bytes_sent: 0,
|
||||
frames_sent: 0,
|
||||
}),
|
||||
TransportClose(
|
||||
server_transport.clone(),
|
||||
transport_close.clone(),
|
||||
),
|
||||
TransportClose(
|
||||
client_transport.clone(),
|
||||
transport_close.clone(),
|
||||
),
|
||||
];
|
||||
];
|
||||
|
||||
let (mut r, _) = metrics::new(Duration::from_secs(1000), Default::default(), Default::default());
|
||||
let mut r = new_record();
|
||||
|
||||
let req_labels = RequestLabels::new(&req);
|
||||
let rsp_labels = ResponseLabels::new(&rsp, None);
|
||||
|
@ -235,8 +205,6 @@ mod test {
|
|||
let lock = r.metrics.lock().expect("lock");
|
||||
assert!(lock.requests.get(&req_labels).is_none());
|
||||
assert!(lock.responses.get(&rsp_labels).is_none());
|
||||
assert_eq!(r.transports.open_total(&server_transport), 0);
|
||||
assert_eq!(r.transports.open_total(&client_transport), 0);
|
||||
}
|
||||
|
||||
for e in &events {
|
||||
|
@ -267,26 +235,6 @@ mod test {
|
|||
.assert_gt_exactly(200, 0)
|
||||
.assert_lt_exactly(200, 0);
|
||||
}
|
||||
|
||||
use super::transport::Eos;
|
||||
let transport_duration: u64 = 30_000 * 1_000;
|
||||
let t = r.transports;
|
||||
|
||||
assert_eq!(t.open_total(&server_transport), 1);
|
||||
assert_eq!(t.rx_tx_bytes_total(&server_transport), (4321, 4321));
|
||||
assert_eq!(t.close_total(&server_transport, Eos::Clean), 1);
|
||||
t.connection_durations(&server_transport, Eos::Clean)
|
||||
.assert_bucket_exactly(transport_duration, 1)
|
||||
.assert_gt_exactly(transport_duration, 0)
|
||||
.assert_lt_exactly(transport_duration, 0);
|
||||
|
||||
assert_eq!(t.open_total(&client_transport), 1);
|
||||
assert_eq!(t.rx_tx_bytes_total(&client_transport), (4321, 4321));
|
||||
assert_eq!(t.close_total(&server_transport, Eos::Clean), 1);
|
||||
t.connection_durations(&server_transport, Eos::Clean)
|
||||
.assert_bucket_exactly(transport_duration, 1)
|
||||
.assert_gt_exactly(transport_duration, 0)
|
||||
.assert_lt_exactly(transport_duration, 0);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -22,7 +22,7 @@ mod process;
|
|||
pub mod sensor;
|
||||
pub mod tap;
|
||||
pub mod tls_config_reload;
|
||||
mod transport;
|
||||
pub mod transport;
|
||||
|
||||
use self::errno::Errno;
|
||||
pub use self::event::Event;
|
||||
|
@ -33,11 +33,17 @@ pub fn new(
|
|||
start_time: SystemTime,
|
||||
metrics_retain_idle: Duration,
|
||||
taps: &Arc<Mutex<tap::Taps>>,
|
||||
) -> (Sensors, tls_config_reload::Sensor, ServeMetrics) {
|
||||
) -> (Sensors, transport::Registry, tls_config_reload::Sensor, ServeMetrics) {
|
||||
let process = process::Report::new(start_time);
|
||||
let (transport_registry, transport_report) = transport::new();
|
||||
let (tls_config_sensor, tls_config_fmt) = tls_config_reload::new();
|
||||
|
||||
let (record, serve) = metrics::new(metrics_retain_idle, process, tls_config_fmt);
|
||||
let (record, serve) = metrics::new(
|
||||
metrics_retain_idle,
|
||||
process,
|
||||
transport_report,
|
||||
tls_config_fmt
|
||||
);
|
||||
let s = Sensors::new(record, taps);
|
||||
(s, tls_config_sensor, serve)
|
||||
(s, transport_registry, tls_config_sensor, serve)
|
||||
}
|
||||
|
|
|
@ -1,22 +1,16 @@
|
|||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Instant;
|
||||
|
||||
use http::{Request, Response};
|
||||
use tokio_connect;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tower_service::NewService;
|
||||
use tower_h2::Body;
|
||||
|
||||
use ctx;
|
||||
use telemetry::{event, metrics, tap};
|
||||
use transport::Connection;
|
||||
use transparency::ClientError;
|
||||
|
||||
pub mod http;
|
||||
mod transport;
|
||||
|
||||
pub use self::http::{Http, NewHttp};
|
||||
pub use self::transport::{Connect, Transport};
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
struct Inner {
|
||||
|
@ -62,27 +56,6 @@ impl Sensors {
|
|||
Sensors(None)
|
||||
}
|
||||
|
||||
pub fn accept<T>(
|
||||
&self,
|
||||
io: T,
|
||||
opened_at: Instant,
|
||||
ctx: &Arc<ctx::transport::Server>,
|
||||
) -> Transport<T>
|
||||
where
|
||||
T: AsyncRead + AsyncWrite,
|
||||
{
|
||||
debug!("server connection open");
|
||||
let ctx = Arc::new(ctx::transport::Ctx::Server(Arc::clone(ctx)));
|
||||
Transport::open(io, opened_at, Handle(self.0.clone()), ctx)
|
||||
}
|
||||
|
||||
pub fn connect<C>(&self, connect: C, ctx: &Arc<ctx::transport::Client>) -> Connect<C>
|
||||
where
|
||||
C: tokio_connect::Connect<Connected = Connection>,
|
||||
{
|
||||
Connect::new(connect, Handle(self.0.clone()), ctx)
|
||||
}
|
||||
|
||||
pub fn http<N, A, B>(
|
||||
&self,
|
||||
new_service: N,
|
||||
|
|
|
@ -1,252 +0,0 @@
|
|||
use bytes::Buf;
|
||||
use futures::{Async, Future, Poll};
|
||||
use std::io;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
use tokio_connect;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
|
||||
use transport::{Connection, Peek};
|
||||
use ctx;
|
||||
use telemetry::event;
|
||||
|
||||
/// Wraps a transport with telemetry.
|
||||
#[derive(Debug)]
|
||||
pub struct Transport<T>(T, Option<Inner>);
|
||||
|
||||
#[derive(Debug)]
|
||||
struct Inner {
|
||||
handle: super::Handle,
|
||||
ctx: Arc<ctx::transport::Ctx>,
|
||||
opened_at: Instant,
|
||||
|
||||
rx_bytes: u64,
|
||||
tx_bytes: u64,
|
||||
}
|
||||
|
||||
/// Builds client transports with telemetry.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Connect<C> {
|
||||
underlying: C,
|
||||
handle: super::Handle,
|
||||
ctx: Arc<ctx::transport::Client>,
|
||||
}
|
||||
|
||||
/// Adds telemetry to a pending client transport.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Connecting<C: tokio_connect::Connect> {
|
||||
underlying: C::Future,
|
||||
handle: super::Handle,
|
||||
ctx: Arc<ctx::transport::Client>,
|
||||
}
|
||||
|
||||
// === impl Transport ===
|
||||
|
||||
impl<T: AsyncRead + AsyncWrite> Transport<T> {
|
||||
/// Wraps a transport with telemetry and emits a transport open event.
|
||||
pub(super) fn open(
|
||||
io: T,
|
||||
opened_at: Instant,
|
||||
mut handle: super::Handle,
|
||||
ctx: Arc<ctx::transport::Ctx>,
|
||||
) -> Self {
|
||||
handle.send(|| event::Event::TransportOpen(Arc::clone(&ctx)));
|
||||
|
||||
Transport(
|
||||
io,
|
||||
Some(Inner {
|
||||
ctx,
|
||||
handle,
|
||||
opened_at,
|
||||
rx_bytes: 0,
|
||||
tx_bytes: 0,
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
/// Wraps an operation on the underlying transport with error telemetry.
|
||||
///
|
||||
/// If the transport operation results in a non-recoverable error, a transport close
|
||||
/// event is emitted.
|
||||
fn sense_err<F, U>(&mut self, op: F) -> io::Result<U>
|
||||
where
|
||||
F: FnOnce(&mut T) -> io::Result<U>,
|
||||
{
|
||||
match op(&mut self.0) {
|
||||
Ok(v) => Ok(v),
|
||||
Err(e) => {
|
||||
if e.kind() != io::ErrorKind::WouldBlock {
|
||||
if let Some(Inner {
|
||||
mut handle,
|
||||
ctx,
|
||||
opened_at,
|
||||
rx_bytes,
|
||||
tx_bytes,
|
||||
}) = self.1.take()
|
||||
{
|
||||
let errno = e.raw_os_error();
|
||||
handle.send(move || {
|
||||
let duration = opened_at.elapsed();
|
||||
let ev = event::TransportClose {
|
||||
duration,
|
||||
clean: false,
|
||||
errno,
|
||||
rx_bytes,
|
||||
tx_bytes,
|
||||
};
|
||||
event::Event::TransportClose(ctx, ev)
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Drop for Transport<T> {
|
||||
fn drop(&mut self) {
|
||||
if let Some(Inner {
|
||||
mut handle,
|
||||
ctx,
|
||||
opened_at,
|
||||
rx_bytes,
|
||||
tx_bytes,
|
||||
}) = self.1.take()
|
||||
{
|
||||
handle.send(move || {
|
||||
let duration = opened_at.elapsed();
|
||||
let ev = event::TransportClose {
|
||||
clean: true,
|
||||
errno: None,
|
||||
duration,
|
||||
rx_bytes,
|
||||
tx_bytes,
|
||||
};
|
||||
event::Event::TransportClose(ctx, ev)
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: AsyncRead + AsyncWrite> io::Read for Transport<T> {
|
||||
fn read(&mut self, mut buf: &mut [u8]) -> io::Result<usize> {
|
||||
let bytes = self.sense_err(move |io| io.read(buf))?;
|
||||
|
||||
if let Some(inner) = self.1.as_mut() {
|
||||
inner.rx_bytes += bytes as u64;
|
||||
}
|
||||
|
||||
Ok(bytes)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: AsyncRead + AsyncWrite> io::Write for Transport<T> {
|
||||
fn flush(&mut self) -> io::Result<()> {
|
||||
self.sense_err(|io| io.flush())
|
||||
}
|
||||
|
||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||
let bytes = self.sense_err(move |io| io.write(buf))?;
|
||||
|
||||
if let Some(inner) = self.1.as_mut() {
|
||||
inner.tx_bytes += bytes as u64;
|
||||
}
|
||||
|
||||
Ok(bytes)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: AsyncRead + AsyncWrite> AsyncRead for Transport<T> {
|
||||
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
|
||||
self.0.prepare_uninitialized_buffer(buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: AsyncRead + AsyncWrite> AsyncWrite for Transport<T> {
|
||||
fn shutdown(&mut self) -> Poll<(), io::Error> {
|
||||
self.sense_err(|io| io.shutdown())
|
||||
}
|
||||
|
||||
fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
|
||||
let bytes = try_ready!(self.sense_err(|io| io.write_buf(buf)));
|
||||
|
||||
if let Some(inner) = self.1.as_mut() {
|
||||
inner.tx_bytes += bytes as u64;
|
||||
}
|
||||
|
||||
Ok(Async::Ready(bytes))
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: AsyncRead + AsyncWrite + Peek> Peek for Transport<T> {
|
||||
fn poll_peek(&mut self) -> Poll<usize, io::Error> {
|
||||
self.sense_err(|io| io.poll_peek())
|
||||
}
|
||||
|
||||
fn peeked(&self) -> &[u8] {
|
||||
self.0.peeked()
|
||||
}
|
||||
}
|
||||
|
||||
// === impl Connect ===
|
||||
|
||||
impl<C> Connect<C>
|
||||
where
|
||||
C: tokio_connect::Connect<Connected = Connection>,
|
||||
{
|
||||
/// Returns a `Connect` to `addr` and `handle`.
|
||||
pub(super) fn new(
|
||||
underlying: C,
|
||||
handle: super::Handle,
|
||||
ctx: &Arc<ctx::transport::Client>,
|
||||
) -> Self {
|
||||
Connect {
|
||||
underlying,
|
||||
handle,
|
||||
ctx: Arc::clone(ctx),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<C> tokio_connect::Connect for Connect<C>
|
||||
where
|
||||
C: tokio_connect::Connect<Connected = Connection>,
|
||||
{
|
||||
type Connected = Transport<C::Connected>;
|
||||
type Error = C::Error;
|
||||
type Future = Connecting<C>;
|
||||
|
||||
fn connect(&self) -> Self::Future {
|
||||
Connecting {
|
||||
underlying: self.underlying.connect(),
|
||||
handle: self.handle.clone(),
|
||||
ctx: Arc::clone(&self.ctx),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// === impl Connecting ===
|
||||
|
||||
impl<C> Future for Connecting<C>
|
||||
where
|
||||
C: tokio_connect::Connect<Connected = Connection>,
|
||||
{
|
||||
type Item = Transport<C::Connected>;
|
||||
type Error = C::Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
let io = try_ready!(self.underlying.poll());
|
||||
debug!("client connection open");
|
||||
let ctx = ctx::transport::Client::new(
|
||||
self.ctx.proxy,
|
||||
&self.ctx.remote,
|
||||
self.ctx.metadata.clone(),
|
||||
io.tls_status(),
|
||||
);
|
||||
let ctx = Arc::new(ctx.into());
|
||||
let trans = Transport::open(io, Instant::now(), self.handle.clone(), ctx);
|
||||
Ok(trans.into())
|
||||
}
|
||||
}
|
|
@ -37,7 +37,7 @@ impl Taps {
|
|||
|
||||
///
|
||||
pub(super) fn inspect(&mut self, ev: &Event) {
|
||||
if !ev.is_http() || self.by_id.is_empty() {
|
||||
if self.by_id.is_empty() {
|
||||
return;
|
||||
}
|
||||
debug!("inspect taps={:?} event={:?}", self.by_id.keys().collect::<Vec<_>>(), ev);
|
||||
|
|
|
@ -0,0 +1,159 @@
|
|||
use bytes::Buf;
|
||||
use futures::{Async, Future, Poll};
|
||||
use std::io;
|
||||
use tokio_connect;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
|
||||
use transport::{Connection, Peek};
|
||||
|
||||
use super::{NewSensor, Sensor, Eos};
|
||||
|
||||
/// Wraps a transport with telemetry.
|
||||
#[derive(Debug)]
|
||||
pub struct Io<T> {
|
||||
io: T,
|
||||
sensor: Sensor,
|
||||
}
|
||||
|
||||
/// Builds client transports with telemetry.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Connect<C> {
|
||||
underlying: C,
|
||||
new_sensor: NewSensor,
|
||||
}
|
||||
|
||||
/// Adds telemetry to a pending client transport.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Connecting<C: tokio_connect::Connect> {
|
||||
underlying: C::Future,
|
||||
new_sensor: Option<NewSensor>,
|
||||
}
|
||||
|
||||
// === impl Io ===
|
||||
|
||||
impl<T: AsyncRead + AsyncWrite> Io<T> {
|
||||
pub(super) fn new(io: T, sensor: Sensor) -> Self {
|
||||
Self { io, sensor }
|
||||
}
|
||||
|
||||
/// Wraps an operation on the underlying transport with error telemetry.
|
||||
///
|
||||
/// If the transport operation results in a non-recoverable error, record a
|
||||
/// transport closure.
|
||||
fn sense_err<F, U>(&mut self, op: F) -> io::Result<U>
|
||||
where
|
||||
F: FnOnce(&mut T) -> io::Result<U>,
|
||||
{
|
||||
match op(&mut self.io) {
|
||||
Ok(v) => Ok(v),
|
||||
Err(e) => {
|
||||
if e.kind() != io::ErrorKind::WouldBlock {
|
||||
let eos = Eos::Error { errno: e.raw_os_error().map(|e| e.into()) };
|
||||
self.sensor.record_close(eos);
|
||||
}
|
||||
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: AsyncRead + AsyncWrite> io::Read for Io<T> {
|
||||
fn read(&mut self, mut buf: &mut [u8]) -> io::Result<usize> {
|
||||
let bytes = self.sense_err(move |io| io.read(buf))?;
|
||||
self.sensor.record_read(bytes);
|
||||
|
||||
Ok(bytes)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: AsyncRead + AsyncWrite> io::Write for Io<T> {
|
||||
fn flush(&mut self) -> io::Result<()> {
|
||||
self.sense_err(|io| io.flush())
|
||||
}
|
||||
|
||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||
let bytes = self.sense_err(move |io| io.write(buf))?;
|
||||
self.sensor.record_write(bytes);
|
||||
|
||||
Ok(bytes)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: AsyncRead + AsyncWrite> AsyncRead for Io<T> {
|
||||
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
|
||||
self.io.prepare_uninitialized_buffer(buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: AsyncRead + AsyncWrite> AsyncWrite for Io<T> {
|
||||
fn shutdown(&mut self) -> Poll<(), io::Error> {
|
||||
self.sense_err(|io| io.shutdown())
|
||||
}
|
||||
|
||||
fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
|
||||
let bytes = try_ready!(self.sense_err(|io| io.write_buf(buf)));
|
||||
self.sensor.record_write(bytes);
|
||||
|
||||
Ok(Async::Ready(bytes))
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: AsyncRead + AsyncWrite + Peek> Peek for Io<T> {
|
||||
fn poll_peek(&mut self) -> Poll<usize, io::Error> {
|
||||
self.sense_err(|io| io.poll_peek())
|
||||
}
|
||||
|
||||
fn peeked(&self) -> &[u8] {
|
||||
self.io.peeked()
|
||||
}
|
||||
}
|
||||
|
||||
// === impl Connect ===
|
||||
|
||||
impl<C> Connect<C>
|
||||
where
|
||||
C: tokio_connect::Connect<Connected = Connection>,
|
||||
{
|
||||
/// Returns a `Connect` to `addr` and `handle`.
|
||||
pub(super) fn new(underlying: C, new_sensor: NewSensor) -> Self {
|
||||
Self { underlying, new_sensor }
|
||||
}
|
||||
}
|
||||
|
||||
impl<C> tokio_connect::Connect for Connect<C>
|
||||
where
|
||||
C: tokio_connect::Connect<Connected = Connection>,
|
||||
{
|
||||
type Connected = Io<C::Connected>;
|
||||
type Error = C::Error;
|
||||
type Future = Connecting<C>;
|
||||
|
||||
fn connect(&self) -> Self::Future {
|
||||
Connecting {
|
||||
underlying: self.underlying.connect(),
|
||||
new_sensor: Some(self.new_sensor.clone()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// === impl Connecting ===
|
||||
|
||||
impl<C> Future for Connecting<C>
|
||||
where
|
||||
C: tokio_connect::Connect<Connected = Connection>,
|
||||
{
|
||||
type Item = Io<C::Connected>;
|
||||
type Error = C::Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
let io = try_ready!(self.underlying.poll());
|
||||
debug!("client connection open");
|
||||
|
||||
let sensor = self.new_sensor.take()
|
||||
.expect("future must not be polled after ready")
|
||||
.new_sensor();
|
||||
let t = Io::new(io, sensor);
|
||||
Ok(t.into())
|
||||
}
|
||||
}
|
|
@ -1,17 +1,24 @@
|
|||
use indexmap::IndexMap;
|
||||
use std::fmt;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Duration;
|
||||
use std::sync::{Arc, Mutex, MutexGuard};
|
||||
use std::time::Instant;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio_connect;
|
||||
|
||||
use ctx;
|
||||
use telemetry::Errno;
|
||||
use telemetry::metrics::{
|
||||
latency,
|
||||
prom::{FmtLabels, FmtMetrics},
|
||||
prom::{FmtLabels, FmtMetric, FmtMetrics, Metric},
|
||||
Counter,
|
||||
Gauge,
|
||||
Histogram,
|
||||
};
|
||||
use transport::Connection;
|
||||
|
||||
mod io;
|
||||
|
||||
pub use self::io::{Connect, Connecting, Io};
|
||||
|
||||
metrics! {
|
||||
tcp_open_total: Counter { "Total count of opened connections" },
|
||||
|
@ -32,16 +39,15 @@ pub fn new() -> (Registry, Report) {
|
|||
#[derive(Debug, Default)]
|
||||
pub struct Report(Arc<Mutex<Inner>>);
|
||||
|
||||
/// Supports recording telemetry metrics.
|
||||
#[derive(Clone, Debug)]
|
||||
/// Instruments transports to record telemetry.
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub struct Registry(Arc<Mutex<Inner>>);
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct Inner(IndexMap<Key, Metrics>);
|
||||
|
||||
/// Describes the dimensions across which transport metrics are aggregated.
|
||||
/// Describes a class of transport.
|
||||
///
|
||||
/// Implements `fmt::Display` to render a comma-separated list of key-value pairs.
|
||||
/// A `Metrics` type exists for each unique `Key`.
|
||||
///
|
||||
/// Implements `FmtLabels`.
|
||||
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
|
||||
struct Key {
|
||||
proxy: ctx::Proxy,
|
||||
|
@ -49,7 +55,10 @@ struct Key {
|
|||
tls_status: ctx::transport::TlsStatus,
|
||||
}
|
||||
|
||||
/// Holds all of the metrics for a class of transport.
|
||||
/// Stores a class of transport's metrics.
|
||||
///
|
||||
/// TODO We should probaby use AtomicUsize for most of these counters so that
|
||||
/// simple increments don't require a lock. Especially for read|write_bytes_total.
|
||||
#[derive(Debug, Default)]
|
||||
struct Metrics {
|
||||
open_total: Counter,
|
||||
|
@ -60,9 +69,11 @@ struct Metrics {
|
|||
by_eos: IndexMap<Eos, EosMetrics>,
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
|
||||
enum Peer { Src, Dst }
|
||||
|
||||
/// Describes a class of transport end.
|
||||
///
|
||||
/// An `EosMetrics` type exists for each unique `Key` and `Eos` pair.
|
||||
///
|
||||
/// Implements `FmtLabels`.
|
||||
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
|
||||
pub enum Eos {
|
||||
Clean,
|
||||
|
@ -78,110 +89,157 @@ struct EosMetrics {
|
|||
connection_duration: Histogram<latency::Ms>,
|
||||
}
|
||||
|
||||
/// Tracks the state of a single instance of `Io` throughout its lifetime.
|
||||
#[derive(Debug)]
|
||||
struct Sensor {
|
||||
metrics: Option<Arc<Mutex<Metrics>>>,
|
||||
opened_at: Instant,
|
||||
}
|
||||
|
||||
/// Lazily builds instances of `Sensor`.
|
||||
#[derive(Clone, Debug)]
|
||||
struct NewSensor(Option<Arc<Mutex<Metrics>>>);
|
||||
|
||||
/// Shares state between `Report` and `Registry`.
|
||||
#[derive(Debug, Default)]
|
||||
struct Inner(IndexMap<Key, Arc<Mutex<Metrics>>>);
|
||||
|
||||
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
|
||||
enum Peer {
|
||||
/// Represents the side of the proxy that accepts connections.
|
||||
Src,
|
||||
/// Represents the side of the proxy that opens connections.
|
||||
Dst,
|
||||
}
|
||||
|
||||
// ===== impl Inner =====
|
||||
|
||||
impl Inner {
|
||||
fn is_empty(&self) -> bool {
|
||||
self.0.is_empty()
|
||||
}
|
||||
|
||||
/// Iterates over all metrics.
|
||||
fn iter(&self) -> impl Iterator<Item = (&Key, &Metrics)> {
|
||||
fn iter(&self) -> impl Iterator<Item = (&Key, MutexGuard<Metrics>)> {
|
||||
self.0.iter()
|
||||
.filter_map(|(k, l)| l.lock().ok().map(move |m| (k, m)))
|
||||
}
|
||||
|
||||
/// Iterates over all end-of-stream metrics.
|
||||
fn iter_eos(&self) -> impl Iterator<Item = ((&Key, &Eos), &EosMetrics)> {
|
||||
self.iter()
|
||||
.flat_map(|(k, t)| {
|
||||
t.by_eos.iter().map(move |(e, m)| ((k ,e), m))
|
||||
})
|
||||
/// Formats a metric across all instances of `Metrics` in the registry.
|
||||
fn fmt_by<F, M>(&self, f: &mut fmt::Formatter, metric: Metric<M>, get_metric: F)
|
||||
-> fmt::Result
|
||||
where
|
||||
F: Fn(&Metrics) -> &M,
|
||||
M: FmtMetric,
|
||||
{
|
||||
for (key, m) in self.iter() {
|
||||
get_metric(&*m).fmt_metric_labeled(f, metric.name, key)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_or_default(&mut self, k: Key) -> &mut Metrics {
|
||||
self.0.entry(k).or_insert_with(|| Metrics::default())
|
||||
/// Formats a metric across all instances of `EosMetrics` in the registry.
|
||||
fn fmt_eos_by<F, M>(&self, f: &mut fmt::Formatter, metric: Metric<M>, get_metric: F)
|
||||
-> fmt::Result
|
||||
where
|
||||
F: Fn(&EosMetrics) -> &M,
|
||||
M: FmtMetric,
|
||||
{
|
||||
for (key, metrics) in self.iter() {
|
||||
for (eos, m) in (*metrics).by_eos.iter() {
|
||||
get_metric(&*m).fmt_metric_labeled(f, metric.name, (key, eos))?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_or_default(&mut self, k: Key) -> &Arc<Mutex<Metrics>> {
|
||||
self.0.entry(k).or_insert_with(|| Default::default())
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl Registry =====
|
||||
|
||||
impl Registry {
|
||||
pub fn open(&mut self, ctx: &ctx::transport::Ctx) {
|
||||
let mut inner = match self.0.lock() {
|
||||
Err(_) => return,
|
||||
Ok(lock) => lock,
|
||||
};
|
||||
|
||||
let metrics = inner.get_or_default(Key::new(ctx));
|
||||
metrics.open_total.incr();
|
||||
metrics.open_connections.incr();
|
||||
pub fn new_connect<C>(&self, ctx: &ctx::transport::Client, inner: C) -> Connect<C>
|
||||
where
|
||||
C: tokio_connect::Connect<Connected = Connection>,
|
||||
{
|
||||
let metrics = match self.0.lock() {
|
||||
Ok(mut inner) => {
|
||||
Some(inner.get_or_default(Key::client(ctx)).clone())
|
||||
}
|
||||
Err(_) => {
|
||||
error!("unable to lock metrics registry");
|
||||
None
|
||||
}
|
||||
};
|
||||
Connect::new(inner, NewSensor(metrics))
|
||||
}
|
||||
|
||||
pub fn close(
|
||||
&mut self,
|
||||
ctx: &ctx::transport::Ctx,
|
||||
eos: Eos,
|
||||
duration: Duration,
|
||||
rx: u64,
|
||||
tx: u64,
|
||||
) {
|
||||
let mut inner = match self.0.lock() {
|
||||
Err(_) => return,
|
||||
Ok(lock) => lock,
|
||||
pub fn accept<T>(&self, ctx: &ctx::transport::Server, io: T) -> Io<T>
|
||||
where
|
||||
T: AsyncRead + AsyncWrite,
|
||||
{
|
||||
let metrics = match self.0.lock() {
|
||||
Ok(mut inner) => Some(inner.get_or_default(Key::server(ctx)).clone()),
|
||||
Err(_) => {
|
||||
error!("unable to lock metrics registry");
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
let key = Key::new(ctx);
|
||||
let metrics = inner.get_or_default(key);
|
||||
metrics.open_connections.decr();
|
||||
metrics.read_bytes_total += rx;
|
||||
metrics.write_bytes_total += tx;
|
||||
|
||||
let class = metrics.by_eos
|
||||
.entry(eos)
|
||||
.or_insert_with(|| EosMetrics::default());
|
||||
class.close_total.incr();
|
||||
class.connection_duration.add(duration);
|
||||
Io::new(io, Sensor::open(metrics))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl Registry {
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn open_total(&self, ctx: &ctx::transport::Ctx) -> u64 {
|
||||
self.0.lock().unwrap().0
|
||||
.get(&Key::new(ctx))
|
||||
.map(|m| m.open_total.into())
|
||||
.map(|m| m.lock().unwrap().open_total.into())
|
||||
.unwrap_or(0)
|
||||
}
|
||||
|
||||
// #[cfg(test)]
|
||||
// pub fn open_connections(&self, ctx: &ctx::transport::Ctx) -> u64 {
|
||||
// self.0.lock().unwrap().0
|
||||
// .get(&Key::new(ctx))
|
||||
// .map(|m| m.open_connections.into())
|
||||
// .map(|m| m.lock().unwrap().open_connections.into())
|
||||
// .unwrap_or(0)
|
||||
// }
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn rx_tx_bytes_total(&self, ctx: &ctx::transport::Ctx) -> (u64, u64) {
|
||||
self.0.lock().unwrap().0
|
||||
.get(&Key::new(ctx))
|
||||
.map(|m| (m.read_bytes_total.into(), m.write_bytes_total.into()))
|
||||
.map(|m| {
|
||||
let m = m.lock().unwrap();
|
||||
(m.read_bytes_total.into(), m.write_bytes_total.into())
|
||||
})
|
||||
.unwrap_or((0, 0))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn close_total(&self, ctx: &ctx::transport::Ctx, eos: Eos) -> u64 {
|
||||
self.0.lock().unwrap().0
|
||||
.get(&Key::new(ctx))
|
||||
.and_then(move |m| m.by_eos.get(&eos).map(|m| m.close_total.into()))
|
||||
.and_then(move |m| m.lock().unwrap().by_eos.get(&eos).map(|m| m.close_total.into()))
|
||||
.unwrap_or(0)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn connection_durations(&self, ctx: &ctx::transport::Ctx, eos: Eos) -> Histogram<latency::Ms> {
|
||||
self.0.lock().unwrap().0
|
||||
.get(&Key::new(ctx))
|
||||
.and_then(move |m| m.by_eos.get(&eos).map(|m| m.connection_duration.clone()))
|
||||
.and_then(move |m| {
|
||||
m.lock().unwrap().by_eos.get(&eos).map(|m| m.connection_duration.clone())
|
||||
})
|
||||
.unwrap_or_default()
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl Report =====
|
||||
|
||||
impl FmtMetrics for Report {
|
||||
fn fmt_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
let metrics = match self.0.lock() {
|
||||
|
@ -194,63 +252,141 @@ impl FmtMetrics for Report {
|
|||
}
|
||||
|
||||
tcp_open_total.fmt_help(f)?;
|
||||
tcp_open_total.fmt_scopes(f, metrics.iter(), |m| &m.open_total)?;
|
||||
metrics.fmt_by(f, tcp_open_total, |m| &m.open_total)?;
|
||||
|
||||
tcp_open_connections.fmt_help(f)?;
|
||||
tcp_open_connections.fmt_scopes(f, metrics.iter(), |m| &m.open_connections)?;
|
||||
metrics.fmt_by(f, tcp_open_connections, |m| &m.open_connections)?;
|
||||
|
||||
tcp_read_bytes_total.fmt_help(f)?;
|
||||
tcp_read_bytes_total.fmt_scopes(f, metrics.iter(), |m| &m.read_bytes_total)?;
|
||||
metrics.fmt_by(f, tcp_read_bytes_total, |m| &m.read_bytes_total)?;
|
||||
|
||||
tcp_write_bytes_total.fmt_help(f)?;
|
||||
tcp_write_bytes_total.fmt_scopes(f, metrics.iter(), |m| &m.write_bytes_total)?;
|
||||
metrics.fmt_by(f, tcp_write_bytes_total, |m| &m.write_bytes_total)?;
|
||||
|
||||
tcp_close_total.fmt_help(f)?;
|
||||
tcp_close_total.fmt_scopes(f, metrics.iter_eos(), |e| &e.close_total)?;
|
||||
metrics.fmt_eos_by(f, tcp_close_total, |e| &e.close_total)?;
|
||||
|
||||
tcp_connection_duration_ms.fmt_help(f)?;
|
||||
tcp_connection_duration_ms.fmt_scopes(f, metrics.iter_eos(), |e| &e.connection_duration)?;
|
||||
metrics.fmt_eos_by(f, tcp_connection_duration_ms, |e| &e.connection_duration)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl Sensor =====
|
||||
|
||||
impl Sensor {
|
||||
|
||||
pub fn open(metrics: Option<Arc<Mutex<Metrics>>>) -> Self {
|
||||
if let Some(ref m) = metrics {
|
||||
if let Ok(mut m) = m.lock() {
|
||||
m.open_total.incr();
|
||||
m.open_connections.incr();
|
||||
}
|
||||
}
|
||||
Self {
|
||||
metrics,
|
||||
opened_at: Instant::now(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn record_read(&mut self, sz: usize) {
|
||||
if let Some(ref m) = self.metrics {
|
||||
if let Ok(mut m) = m.lock() {
|
||||
m.read_bytes_total += sz as u64;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn record_write(&mut self, sz: usize) {
|
||||
if let Some(ref m) = self.metrics {
|
||||
if let Ok(mut m) = m.lock() {
|
||||
m.write_bytes_total += sz as u64;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn record_close(&mut self, eos: Eos) {
|
||||
// When closed, the metrics structure is dropped so that no further
|
||||
// updates can occur (i.e. so that an additional close won't be recorded
|
||||
// on Drop).
|
||||
if let Some(m) = self.metrics.take() {
|
||||
let duration = self.opened_at.elapsed();
|
||||
if let Ok(mut m) = m.lock() {
|
||||
m.open_connections.decr();
|
||||
|
||||
let class = m.by_eos.entry(eos).or_insert_with(|| EosMetrics::default());
|
||||
class.close_total.incr();
|
||||
class.connection_duration.add(duration);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Sensor {
|
||||
fn drop(&mut self) {
|
||||
self.record_close(Eos::Clean)
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl NewSensor =====
|
||||
|
||||
impl NewSensor {
|
||||
fn new_sensor(mut self) -> Sensor {
|
||||
Sensor::open(self.0.take())
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl Key =====
|
||||
|
||||
impl Key {
|
||||
#[cfg(test)]
|
||||
fn new(ctx: &ctx::transport::Ctx) -> Self {
|
||||
match ctx {
|
||||
ctx::transport::Ctx::Client(ctx) => Self::client(ctx),
|
||||
ctx::transport::Ctx::Server(ctx) => Self::server(ctx),
|
||||
}
|
||||
}
|
||||
|
||||
fn client(ctx: &ctx::transport::Client) -> Self {
|
||||
Self {
|
||||
proxy: ctx.proxy,
|
||||
peer: Peer::Dst,
|
||||
tls_status: ctx.tls_status.into(),
|
||||
}
|
||||
}
|
||||
|
||||
fn server(ctx: &ctx::transport::Server) -> Self {
|
||||
Self {
|
||||
proxy: ctx.proxy,
|
||||
peer: Peer::Src,
|
||||
tls_status: ctx.tls_status.into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl FmtLabels for Key {
|
||||
fn fmt_labels(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
((self.proxy, self.peer), self.tls_status).fmt_labels(f)
|
||||
}
|
||||
}
|
||||
|
||||
impl Key {
|
||||
fn new(ctx: &ctx::transport::Ctx) -> Self {
|
||||
Self {
|
||||
proxy: ctx.proxy(),
|
||||
peer: match *ctx {
|
||||
ctx::transport::Ctx::Server(_) => Peer::Src,
|
||||
ctx::transport::Ctx::Client(_) => Peer::Dst,
|
||||
},
|
||||
tls_status: ctx.tls_status().into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
// ===== impl Peer =====
|
||||
|
||||
impl FmtLabels for Peer {
|
||||
fn fmt_labels(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
match *self {
|
||||
match self {
|
||||
Peer::Src => f.pad("peer=\"src\""),
|
||||
Peer::Dst => f.pad("peer=\"dst\""),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// ===== impl Eos =====
|
||||
|
||||
impl FmtLabels for Eos {
|
||||
fn fmt_labels(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
match &self {
|
||||
match self {
|
||||
Eos::Clean => f.pad("classification=\"success\""),
|
||||
Eos::Error { errno } => {
|
||||
f.pad("classification=\"failure\"")?;
|
||||
|
|
|
@ -3,7 +3,7 @@ use std::{
|
|||
fmt,
|
||||
net::SocketAddr,
|
||||
sync::Arc,
|
||||
time::{Duration, Instant},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use futures::{future::Either, Future};
|
||||
|
@ -18,7 +18,7 @@ use transport::{Connection, Peek};
|
|||
use ctx::Proxy as ProxyCtx;
|
||||
use ctx::transport::{Server as ServerCtx};
|
||||
use drain;
|
||||
use telemetry::Sensors;
|
||||
use telemetry;
|
||||
use transport::GetOriginalDst;
|
||||
use super::glue::{HttpBody, HttpBodyNewSvc, HyperServerSvc};
|
||||
use super::protocol::Protocol;
|
||||
|
@ -47,7 +47,7 @@ where
|
|||
listen_addr: SocketAddr,
|
||||
new_service: S,
|
||||
proxy_ctx: ProxyCtx,
|
||||
sensors: Sensors,
|
||||
transport_registry: telemetry::transport::Registry,
|
||||
tcp: tcp::Proxy,
|
||||
log: ::logging::Server,
|
||||
}
|
||||
|
@ -76,7 +76,7 @@ where
|
|||
pub fn new(
|
||||
listen_addr: SocketAddr,
|
||||
proxy_ctx: ProxyCtx,
|
||||
sensors: Sensors,
|
||||
transport_registry: telemetry::transport::Registry,
|
||||
get_orig_dst: G,
|
||||
stack: S,
|
||||
tcp_connect_timeout: Duration,
|
||||
|
@ -84,7 +84,7 @@ where
|
|||
drain_signal: drain::Watch,
|
||||
) -> Self {
|
||||
let recv_body_svc = HttpBodyNewSvc::new(stack.clone());
|
||||
let tcp = tcp::Proxy::new(tcp_connect_timeout, sensors.clone());
|
||||
let tcp = tcp::Proxy::new(tcp_connect_timeout, transport_registry.clone());
|
||||
let log = ::logging::Server::proxy(proxy_ctx, listen_addr);
|
||||
Server {
|
||||
disable_protocol_detection_ports,
|
||||
|
@ -99,7 +99,7 @@ where
|
|||
listen_addr,
|
||||
new_service: stack,
|
||||
proxy_ctx,
|
||||
sensors,
|
||||
transport_registry,
|
||||
tcp,
|
||||
log,
|
||||
}
|
||||
|
@ -118,8 +118,6 @@ where
|
|||
pub fn serve(&self, connection: Connection, remote_addr: SocketAddr)
|
||||
-> impl Future<Item=(), Error=()>
|
||||
{
|
||||
let opened_at = Instant::now();
|
||||
|
||||
// create Server context
|
||||
let orig_dst = connection.original_dst_addr(&self.get_orig_dst);
|
||||
let local_addr = connection.local_addr().unwrap_or(self.listen_addr);
|
||||
|
@ -134,7 +132,7 @@ where
|
|||
.with_remote(remote_addr);
|
||||
|
||||
// record telemetry
|
||||
let io = self.sensors.accept(connection, opened_at, &srv_ctx);
|
||||
let io = self.transport_registry.accept(&srv_ctx, connection);
|
||||
|
||||
// We are using the port from the connection's SO_ORIGINAL_DST to
|
||||
// determine whether to skip protocol detection, not any port that
|
||||
|
|
|
@ -13,7 +13,7 @@ use ctx::transport::{
|
|||
Client as ClientCtx,
|
||||
Server as ServerCtx,
|
||||
};
|
||||
use telemetry::Sensors;
|
||||
use telemetry;
|
||||
use timeout::Timeout;
|
||||
use transport::{self, tls};
|
||||
use ctx::transport::TlsStatus;
|
||||
|
@ -22,15 +22,18 @@ use ctx::transport::TlsStatus;
|
|||
#[derive(Debug, Clone)]
|
||||
pub struct Proxy {
|
||||
connect_timeout: Duration,
|
||||
sensors: Sensors,
|
||||
transport_registry: telemetry::transport::Registry,
|
||||
}
|
||||
|
||||
impl Proxy {
|
||||
/// Create a new TCP `Proxy`.
|
||||
pub fn new(connect_timeout: Duration, sensors: Sensors) -> Self {
|
||||
pub fn new(
|
||||
connect_timeout: Duration,
|
||||
transport_registry: telemetry::transport::Registry
|
||||
) -> Self {
|
||||
Self {
|
||||
connect_timeout,
|
||||
sensors,
|
||||
transport_registry,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -72,7 +75,7 @@ impl Proxy {
|
|||
transport::Connect::new(orig_dst, tls),
|
||||
self.connect_timeout,
|
||||
);
|
||||
let connect = self.sensors.connect(c, &client_ctx);
|
||||
let connect = self.transport_registry.new_connect(&client_ctx, c);
|
||||
|
||||
future::Either::A(connect.connect()
|
||||
.map_err(move |e| error!("tcp connect error to {}: {:?}", orig_dst, e))
|
||||
|
|
Loading…
Reference in New Issue