From d6cd34fc98e9e2d0a8fd3d631b9577a51a6101f1 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 11 Jan 2018 16:00:38 -0800 Subject: [PATCH] Add Protocol field to Transports telemetry (#138) See #132. This PR adds a protocol field to the ClientTransport and ServerTransport messages, and modifies the proxy to report a value for this field (currently, it's only ever HTTP). Currently, HTTP/1 and HTTP/2 are collapsed into one Protocol variant, see #132 (comment). I expect that we can treat H1 as a subset of H2 as far as metrics goes. Note that after discussing it with @klingerf, I learned that the control plane telemetry API currently does not do anything with the ClientTransport and ServerTransport messages, so beyond regenerating the protobuf-generated code, no controller changes were actually necessary. As we actually add metrics to TCP transports, we'll want to make some additions to the telemetry API to ingest these metrics. If any metrics are shared between HTTP and raw TCP transports (say, bytes sent), we'll want to differentiate between them in Prometheus. All the metrics that the control plane currently ingests from telemetry reports are likely to be HTTP-specific (requests, responses, response latencies), or at least, do not apply to raw TCP. Actually adding metrics to raw TCP transports will probably have to wait until there are raw TCP transports implemented in the proxy... Signed-off-by: Eliza Weisman --- proxy/src/bind.rs | 8 ++++++-- proxy/src/control/pb.rs | 9 ++++++++- proxy/src/ctx/transport.rs | 13 ++++++++++++- proxy/src/inbound.rs | 25 ++++++++++++++++++++++--- proxy/src/lib.rs | 10 +++++++++- proxy/src/telemetry/metrics.rs | 15 ++++++++++++--- 6 files changed, 69 insertions(+), 11 deletions(-) diff --git a/proxy/src/bind.rs b/proxy/src/bind.rs index c04d74097..05d539bfb 100644 --- a/proxy/src/bind.rs +++ b/proxy/src/bind.rs @@ -134,13 +134,17 @@ where { pub fn bind_service(&self, addr: &SocketAddr) -> Service { trace!("bind_service {}", addr); - let client_ctx = ctx::transport::Client::new(&self.ctx, addr); + let client_ctx = ctx::transport::Client::new( + &self.ctx, + addr, + control::pb::proxy::common::Protocol::Http, + ); // Map a socket address to an HTTP/2.0 connection. let connect = { let c = Timeout::new( transport::Connect::new(*addr, &self.executor), - self.connect_timeout, + self.connect_timeout, &self.executor, ); diff --git a/proxy/src/control/pb.rs b/proxy/src/control/pb.rs index 8790f8326..feffd8743 100644 --- a/proxy/src/control/pb.rs +++ b/proxy/src/control/pb.rs @@ -2,7 +2,7 @@ #![cfg_attr(feature = "cargo-clippy", allow(clippy))] use std::error::Error; -use std::fmt; +use std::{fmt, hash}; use std::sync::Arc; use http; @@ -396,6 +396,13 @@ impl<'a> From<&'a ::std::net::SocketAddr> for common::TcpAddress { } } +impl hash::Hash for common::Protocol { + // it's necessary to implement Hash for Protocol as it's a field on + // ctx::Transport, which derives Hash. + fn hash(&self, state: &mut H) { + (*self as i32).hash(state) + } +} fn pb_duration(d: &::std::time::Duration) -> ::prost_types::Duration { let seconds = if d.as_secs() > ::std::i64::MAX as u64 { diff --git a/proxy/src/ctx/transport.rs b/proxy/src/ctx/transport.rs index 9cc87639b..a9b542eb4 100644 --- a/proxy/src/ctx/transport.rs +++ b/proxy/src/ctx/transport.rs @@ -1,6 +1,8 @@ use std::net::SocketAddr; use std::sync::Arc; +use control::pb::common::Protocol; + use ctx; #[derive(Clone, Debug, PartialEq, Eq, Hash)] @@ -16,6 +18,7 @@ pub struct Server { pub remote: SocketAddr, pub local: SocketAddr, pub orig_dst: Option, + pub protocol: Protocol, } /// Identifies a connection from the proxy to another process. @@ -23,6 +26,7 @@ pub struct Server { pub struct Client { pub proxy: Arc, pub remote: SocketAddr, + pub protocol: Protocol, } impl Ctx { @@ -40,12 +44,14 @@ impl Server { local: &SocketAddr, remote: &SocketAddr, orig_dst: &Option, + protocol: Protocol, ) -> Arc { let s = Server { proxy: Arc::clone(proxy), local: *local, remote: *remote, orig_dst: *orig_dst, + protocol: protocol, }; Arc::new(s) @@ -53,10 +59,15 @@ impl Server { } impl Client { - pub fn new(proxy: &Arc, remote: &SocketAddr) -> Arc { + pub fn new( + proxy: &Arc, + remote: &SocketAddr, + protocol: Protocol, + ) -> Arc { let c = Client { proxy: Arc::clone(proxy), remote: *remote, + protocol: protocol, }; Arc::new(c) diff --git a/proxy/src/inbound.rs b/proxy/src/inbound.rs index dfb3eadab..2c2f2b6db 100644 --- a/proxy/src/inbound.rs +++ b/proxy/src/inbound.rs @@ -117,6 +117,7 @@ mod tests { use tower_router::Recognize; use super::Inbound; + use control::pb::common::Protocol; use bind::Bind; use ctx; @@ -182,7 +183,13 @@ mod tests { let mut req = http::Request::new(()); req.extensions_mut() - .insert(ctx::transport::Server::new(&ctx, &local, &remote, &Some(orig_dst))); + .insert(ctx::transport::Server::new( + &ctx, + &local, + &remote, + &Some(orig_dst), + Protocol::Http, + )); let rec = if Inbound::<()>::same_addr(&orig_dst, &local) { None @@ -204,7 +211,13 @@ mod tests { let mut req = http::Request::new(()); req.extensions_mut() - .insert(ctx::transport::Server::new(&ctx, &local, &remote, &None)); + .insert(ctx::transport::Server::new( + &ctx, + &local, + &remote, + &None, + Protocol::Http, + )); inbound.recognize(&req) == default } @@ -230,7 +243,13 @@ mod tests { let mut req = http::Request::new(()); req.extensions_mut() - .insert(ctx::transport::Server::new(&ctx, &local, &remote, &Some(local))); + .insert(ctx::transport::Server::new( + &ctx, + &local, + &remote, + &Some(local), + Protocol::Http, + )); inbound.recognize(&req) == default } diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index db77f0607..f5be3265b 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -317,8 +317,16 @@ where let opened_at = Instant::now(); let orig_dst = connection.original_dst_addr(); let local_addr = connection.local_addr().unwrap_or(listen_addr); + // TODO: detect protocol. + let protocol = control::pb::common::Protocol::Http; let srv_ctx = - ctx::transport::Server::new(&proxy_ctx, &local_addr, &remote_addr, &orig_dst); + ctx::transport::Server::new( + &proxy_ctx, + &local_addr, + &remote_addr, + &orig_dst, + protocol, + ); let io = sensors.accept(connection, opened_at, &srv_ctx); diff --git a/proxy/src/telemetry/metrics.rs b/proxy/src/telemetry/metrics.rs index 260fdf687..30ec8c88e 100644 --- a/proxy/src/telemetry/metrics.rs +++ b/proxy/src/telemetry/metrics.rs @@ -6,7 +6,7 @@ use std::time::Duration; use http; use ordermap::OrderMap; -use control::pb::common::{HttpMethod, TcpAddress}; +use control::pb::common::{HttpMethod, TcpAddress, Protocol}; use control::pb::proxy::telemetry::{ eos_ctx, ClientTransport, @@ -81,6 +81,7 @@ enum End { #[derive(Debug, Default)] struct TransportStats { + protocol: Protocol, connects: u32, disconnects: Vec, } @@ -199,11 +200,17 @@ impl Metrics { let source = s.remote.ip(); self.sources .entry(source) - .or_insert_with(TransportStats::default) + .or_insert_with(|| TransportStats { + protocol: s.protocol, + ..TransportStats::default() + }) } ctx::transport::Ctx::Client(ref c) => self.destinations .entry(c.remote) - .or_insert_with(TransportStats::default), + .or_insert_with(|| TransportStats { + protocol: c.protocol, + ..TransportStats::default() + }) } } @@ -216,6 +223,7 @@ impl Metrics { source_ip: Some(ip.into()), connects: stats.connects, disconnects: stats.disconnects, + protocol: stats.protocol as i32, }) } @@ -227,6 +235,7 @@ impl Metrics { }), connects: stats.connects, disconnects: stats.disconnects, + protocol: stats.protocol as i32, }); }