diff --git a/proxy/src/bind.rs b/proxy/src/bind.rs index c04d74097..05d539bfb 100644 --- a/proxy/src/bind.rs +++ b/proxy/src/bind.rs @@ -134,13 +134,17 @@ where { pub fn bind_service(&self, addr: &SocketAddr) -> Service { trace!("bind_service {}", addr); - let client_ctx = ctx::transport::Client::new(&self.ctx, addr); + let client_ctx = ctx::transport::Client::new( + &self.ctx, + addr, + control::pb::proxy::common::Protocol::Http, + ); // Map a socket address to an HTTP/2.0 connection. let connect = { let c = Timeout::new( transport::Connect::new(*addr, &self.executor), - self.connect_timeout, + self.connect_timeout, &self.executor, ); diff --git a/proxy/src/control/pb.rs b/proxy/src/control/pb.rs index 8790f8326..feffd8743 100644 --- a/proxy/src/control/pb.rs +++ b/proxy/src/control/pb.rs @@ -2,7 +2,7 @@ #![cfg_attr(feature = "cargo-clippy", allow(clippy))] use std::error::Error; -use std::fmt; +use std::{fmt, hash}; use std::sync::Arc; use http; @@ -396,6 +396,13 @@ impl<'a> From<&'a ::std::net::SocketAddr> for common::TcpAddress { } } +impl hash::Hash for common::Protocol { + // it's necessary to implement Hash for Protocol as it's a field on + // ctx::Transport, which derives Hash. + fn hash(&self, state: &mut H) { + (*self as i32).hash(state) + } +} fn pb_duration(d: &::std::time::Duration) -> ::prost_types::Duration { let seconds = if d.as_secs() > ::std::i64::MAX as u64 { diff --git a/proxy/src/ctx/transport.rs b/proxy/src/ctx/transport.rs index 9cc87639b..a9b542eb4 100644 --- a/proxy/src/ctx/transport.rs +++ b/proxy/src/ctx/transport.rs @@ -1,6 +1,8 @@ use std::net::SocketAddr; use std::sync::Arc; +use control::pb::common::Protocol; + use ctx; #[derive(Clone, Debug, PartialEq, Eq, Hash)] @@ -16,6 +18,7 @@ pub struct Server { pub remote: SocketAddr, pub local: SocketAddr, pub orig_dst: Option, + pub protocol: Protocol, } /// Identifies a connection from the proxy to another process. @@ -23,6 +26,7 @@ pub struct Server { pub struct Client { pub proxy: Arc, pub remote: SocketAddr, + pub protocol: Protocol, } impl Ctx { @@ -40,12 +44,14 @@ impl Server { local: &SocketAddr, remote: &SocketAddr, orig_dst: &Option, + protocol: Protocol, ) -> Arc { let s = Server { proxy: Arc::clone(proxy), local: *local, remote: *remote, orig_dst: *orig_dst, + protocol: protocol, }; Arc::new(s) @@ -53,10 +59,15 @@ impl Server { } impl Client { - pub fn new(proxy: &Arc, remote: &SocketAddr) -> Arc { + pub fn new( + proxy: &Arc, + remote: &SocketAddr, + protocol: Protocol, + ) -> Arc { let c = Client { proxy: Arc::clone(proxy), remote: *remote, + protocol: protocol, }; Arc::new(c) diff --git a/proxy/src/inbound.rs b/proxy/src/inbound.rs index dfb3eadab..2c2f2b6db 100644 --- a/proxy/src/inbound.rs +++ b/proxy/src/inbound.rs @@ -117,6 +117,7 @@ mod tests { use tower_router::Recognize; use super::Inbound; + use control::pb::common::Protocol; use bind::Bind; use ctx; @@ -182,7 +183,13 @@ mod tests { let mut req = http::Request::new(()); req.extensions_mut() - .insert(ctx::transport::Server::new(&ctx, &local, &remote, &Some(orig_dst))); + .insert(ctx::transport::Server::new( + &ctx, + &local, + &remote, + &Some(orig_dst), + Protocol::Http, + )); let rec = if Inbound::<()>::same_addr(&orig_dst, &local) { None @@ -204,7 +211,13 @@ mod tests { let mut req = http::Request::new(()); req.extensions_mut() - .insert(ctx::transport::Server::new(&ctx, &local, &remote, &None)); + .insert(ctx::transport::Server::new( + &ctx, + &local, + &remote, + &None, + Protocol::Http, + )); inbound.recognize(&req) == default } @@ -230,7 +243,13 @@ mod tests { let mut req = http::Request::new(()); req.extensions_mut() - .insert(ctx::transport::Server::new(&ctx, &local, &remote, &Some(local))); + .insert(ctx::transport::Server::new( + &ctx, + &local, + &remote, + &Some(local), + Protocol::Http, + )); inbound.recognize(&req) == default } diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index db77f0607..f5be3265b 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -317,8 +317,16 @@ where let opened_at = Instant::now(); let orig_dst = connection.original_dst_addr(); let local_addr = connection.local_addr().unwrap_or(listen_addr); + // TODO: detect protocol. + let protocol = control::pb::common::Protocol::Http; let srv_ctx = - ctx::transport::Server::new(&proxy_ctx, &local_addr, &remote_addr, &orig_dst); + ctx::transport::Server::new( + &proxy_ctx, + &local_addr, + &remote_addr, + &orig_dst, + protocol, + ); let io = sensors.accept(connection, opened_at, &srv_ctx); diff --git a/proxy/src/telemetry/metrics.rs b/proxy/src/telemetry/metrics.rs index 260fdf687..30ec8c88e 100644 --- a/proxy/src/telemetry/metrics.rs +++ b/proxy/src/telemetry/metrics.rs @@ -6,7 +6,7 @@ use std::time::Duration; use http; use ordermap::OrderMap; -use control::pb::common::{HttpMethod, TcpAddress}; +use control::pb::common::{HttpMethod, TcpAddress, Protocol}; use control::pb::proxy::telemetry::{ eos_ctx, ClientTransport, @@ -81,6 +81,7 @@ enum End { #[derive(Debug, Default)] struct TransportStats { + protocol: Protocol, connects: u32, disconnects: Vec, } @@ -199,11 +200,17 @@ impl Metrics { let source = s.remote.ip(); self.sources .entry(source) - .or_insert_with(TransportStats::default) + .or_insert_with(|| TransportStats { + protocol: s.protocol, + ..TransportStats::default() + }) } ctx::transport::Ctx::Client(ref c) => self.destinations .entry(c.remote) - .or_insert_with(TransportStats::default), + .or_insert_with(|| TransportStats { + protocol: c.protocol, + ..TransportStats::default() + }) } } @@ -216,6 +223,7 @@ impl Metrics { source_ip: Some(ip.into()), connects: stats.connects, disconnects: stats.disconnects, + protocol: stats.protocol as i32, }) } @@ -227,6 +235,7 @@ impl Metrics { }), connects: stats.connects, disconnects: stats.disconnects, + protocol: stats.protocol as i32, }); }