From 2e296dbf69376a831f9f5c060e7c97e01b4fb8a2 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Fri, 27 Apr 2018 14:18:23 -0700 Subject: [PATCH] proxy: wrap connections in Transport sensor before peeking (#851) In case there are any errors while peeking the connection to do protocol detection, the sensors will now be in place to detect them. Besides just errors, this will also allow reporting about connections that are accepted, but then immediately closed. Additionally: - add write_buf implementation for Transport sensor, can help performance for http1/http2 - add better logs for tcp connections errors - add printlns for when tests fail Signed-off-by: Sean McArthur --- proxy/src/bind.rs | 2 - proxy/src/connection.rs | 49 ++++++++++----- proxy/src/ctx/transport.rs | 19 +----- proxy/src/inbound.rs | 5 +- proxy/src/telemetry/sensor/transport.rs | 24 ++++++- proxy/src/transparency/server.rs | 83 +++++++------------------ proxy/src/transparency/tcp.rs | 4 +- proxy/tests/support/mod.rs | 11 ++++ proxy/tests/support/proxy.rs | 20 +++++- proxy/tests/support/server.rs | 23 ++++++- proxy/tests/support/tcp.rs | 17 +++-- 11 files changed, 144 insertions(+), 113 deletions(-) diff --git a/proxy/src/bind.rs b/proxy/src/bind.rs index 63f4f386b..c47fb2615 100644 --- a/proxy/src/bind.rs +++ b/proxy/src/bind.rs @@ -13,7 +13,6 @@ use tower; use tower_h2; use tower_reconnect::Reconnect; -use conduit_proxy_controller_grpc; use conduit_proxy_router::Reuse; use control; @@ -185,7 +184,6 @@ where let client_ctx = ctx::transport::Client::new( &self.ctx, &addr, - conduit_proxy_controller_grpc::common::Protocol::Http, ep.dst_labels().cloned(), ); diff --git a/proxy/src/connection.rs b/proxy/src/connection.rs index 9d6ebfbd3..c35665ab1 100644 --- a/proxy/src/connection.rs +++ b/proxy/src/connection.rs @@ -36,6 +36,17 @@ pub enum Connection { Plain(PlaintextSocket), } +/// A trait describing that a type can peek (such as MSG_PEEK). +pub trait Peek { + fn peek(&mut self, buf: &mut [u8]) -> io::Result; +} + +/// A future of when some `Peek` fulfills with some bytes. +#[derive(Debug)] +pub struct PeekFuture { + inner: Option<(T, B)>, +} + // ===== impl BoundPort ===== impl BoundPort { @@ -105,12 +116,6 @@ impl Connection { self.socket().local_addr() } - pub fn peek_future>(self, buf: T) -> Peek { - Peek { - inner: Some((self, buf)) - } - } - // This must never be made public so that in the future `Connection` can // control access to the plaintext socket for TLS, to ensure no private // data is accidentally writen to the socket and to ensure no unprotected @@ -190,23 +195,37 @@ impl AsyncWrite for Connection { } } -// impl Peek +impl Peek for Connection { + fn peek(&mut self, buf: &mut [u8]) -> io::Result { + use self::Connection::*; -pub struct Peek { - inner: Option<(Connection, T)>, + match *self { + Plain(ref mut t) => t.peek(buf), + } + } } -impl> Future for Peek { - type Item = (Connection, T, usize); +// impl PeekFuture + +impl> PeekFuture { + pub fn new(io: T, buf: B) -> Self { + PeekFuture { + inner: Some((io, buf)), + } + } +} + +impl> Future for PeekFuture { + type Item = (T, B, usize); type Error = std::io::Error; fn poll(&mut self) -> Poll { - let (conn, mut buf) = self.inner.take().expect("polled after completed"); - match conn.socket().peek(buf.as_mut()) { - Ok(n) => Ok(Async::Ready((conn, buf, n))), + let (mut io, mut buf) = self.inner.take().expect("polled after completed"); + match io.peek(buf.as_mut()) { + Ok(n) => Ok(Async::Ready((io, buf, n))), Err(e) => match e.kind() { std::io::ErrorKind::WouldBlock => { - self.inner = Some((conn, buf)); + self.inner = Some((io, buf)); Ok(Async::NotReady) }, _ => Err(e) diff --git a/proxy/src/ctx/transport.rs b/proxy/src/ctx/transport.rs index 0ca97d9ba..2cd225c6c 100644 --- a/proxy/src/ctx/transport.rs +++ b/proxy/src/ctx/transport.rs @@ -2,8 +2,6 @@ use std::{cmp, hash}; use std::net::{IpAddr, SocketAddr}; use std::sync::Arc; -use conduit_proxy_controller_grpc::common::Protocol; - use control::discovery::DstLabelsWatch; use ctx; @@ -20,7 +18,6 @@ pub struct Server { pub remote: SocketAddr, pub local: SocketAddr, pub orig_dst: Option, - pub protocol: Protocol, } /// Identifies a connection from the proxy to another process. @@ -28,7 +25,6 @@ pub struct Server { pub struct Client { pub proxy: Arc, pub remote: SocketAddr, - pub protocol: Protocol, pub dst_labels: Option, } @@ -39,13 +35,6 @@ impl Ctx { Ctx::Server(ref ctx) => &ctx.proxy, } } - - pub fn protocol(&self) -> Protocol { - match *self { - Ctx::Client(ref ctx) => ctx.protocol, - Ctx::Server(ref ctx) => ctx.protocol, - } - } } impl Server { @@ -54,14 +43,12 @@ impl Server { local: &SocketAddr, remote: &SocketAddr, orig_dst: &Option, - protocol: Protocol, ) -> Arc { let s = Server { proxy: Arc::clone(proxy), local: *local, remote: *remote, orig_dst: *orig_dst, - protocol: protocol, }; Arc::new(s) @@ -95,13 +82,11 @@ impl Client { pub fn new( proxy: &Arc, remote: &SocketAddr, - protocol: Protocol, dst_labels: Option, ) -> Arc { let c = Client { proxy: Arc::clone(proxy), remote: *remote, - protocol, dst_labels, }; @@ -113,7 +98,6 @@ impl hash::Hash for Client { fn hash(&self, state: &mut H) { self.proxy.hash(state); self.remote.hash(state); - self.protocol.hash(state); // ignore dst_labels } } @@ -121,8 +105,7 @@ impl hash::Hash for Client { impl cmp::PartialEq for Client { fn eq(&self, other: &Self) -> bool { self.proxy.eq(&other.proxy) && - self.remote.eq(&other.remote) && - self.protocol.eq(&other.protocol) + self.remote.eq(&other.remote) } } diff --git a/proxy/src/inbound.rs b/proxy/src/inbound.rs index 4241f4b02..c23a05bfa 100644 --- a/proxy/src/inbound.rs +++ b/proxy/src/inbound.rs @@ -94,7 +94,6 @@ mod tests { use conduit_proxy_router::Recognize; use super::Inbound; - use conduit_proxy_controller_grpc::common::Protocol; use bind::{self, Bind, Host}; use ctx; @@ -114,7 +113,7 @@ mod tests { let inbound = new_inbound(None, &ctx); - let srv_ctx = ctx::transport::Server::new(&ctx, &local, &remote, &Some(orig_dst), Protocol::Http); + let srv_ctx = ctx::transport::Server::new(&ctx, &local, &remote, &Some(orig_dst)); let rec = srv_ctx.orig_dst_if_not_local().map(|addr| bind::Protocol::Http1(Host::NoAuthority).into_key(addr) @@ -143,7 +142,6 @@ mod tests { &local, &remote, &None, - Protocol::Http, )); inbound.recognize(&req) == default.map(|addr| @@ -179,7 +177,6 @@ mod tests { &local, &remote, &Some(local), - Protocol::Http, )); inbound.recognize(&req) == default.map(|addr| diff --git a/proxy/src/telemetry/sensor/transport.rs b/proxy/src/telemetry/sensor/transport.rs index 38bffdc4d..3f56ef4c4 100644 --- a/proxy/src/telemetry/sensor/transport.rs +++ b/proxy/src/telemetry/sensor/transport.rs @@ -1,10 +1,12 @@ -use futures::{Future, Poll}; +use bytes::Buf; +use futures::{Async, Future, Poll}; use std::io; use std::sync::Arc; use std::time::Instant; use tokio_connect; use tokio_io::{AsyncRead, AsyncWrite}; +use connection::Peek; use ctx; use telemetry::event; @@ -132,7 +134,7 @@ impl io::Read for Transport { let bytes = self.sense_err(move |io| io.read(buf))?; if let Some(inner) = self.1.as_mut() { - inner.rx_bytes += bytes as u64; + inner.rx_bytes += bytes as u64; } Ok(bytes) @@ -148,7 +150,7 @@ impl io::Write for Transport { let bytes = self.sense_err(move |io| io.write(buf))?; if let Some(inner) = self.1.as_mut() { - inner.tx_bytes += bytes as u64; + inner.tx_bytes += bytes as u64; } Ok(bytes) @@ -165,6 +167,22 @@ impl AsyncWrite for Transport { fn shutdown(&mut self) -> Poll<(), io::Error> { self.sense_err(|io| io.shutdown()) } + + fn write_buf(&mut self, buf: &mut B) -> Poll { + let bytes = try_ready!(self.sense_err(|io| io.write_buf(buf))); + + if let Some(inner) = self.1.as_mut() { + inner.tx_bytes += bytes as u64; + } + + Ok(Async::Ready(bytes)) + } +} + +impl Peek for Transport { + fn peek(&mut self, buf: &mut [u8]) -> io::Result { + self.sense_err(|io| io.peek(buf)) + } } // === impl Connect === diff --git a/proxy/src/transparency/server.rs b/proxy/src/transparency/server.rs index f7c3ce503..d66b3ae87 100644 --- a/proxy/src/transparency/server.rs +++ b/proxy/src/transparency/server.rs @@ -8,11 +8,11 @@ use http; use hyper; use indexmap::IndexSet; use tokio_core::reactor::Handle; +use tokio_io::{AsyncRead, AsyncWrite}; use tower::NewService; use tower_h2; -use conduit_proxy_controller_grpc::common; -use connection::Connection; +use connection::{Connection, PeekFuture}; use ctx::Proxy as ProxyCtx; use ctx::transport::{Server as ServerCtx}; use drain; @@ -99,6 +99,15 @@ where // create Server context let orig_dst = connection.original_dst_addr(&self.get_orig_dst); let local_addr = connection.local_addr().unwrap_or(self.listen_addr); + let srv_ctx = ServerCtx::new( + &self.proxy_ctx, + &local_addr, + &remote_addr, + &orig_dst, + ); + + // record telemetry + let io = self.sensors.accept(connection, opened_at, &srv_ctx); // We are using the port from the connection's SO_ORIGINAL_DST to // determine whether to skip protocol detection, not any port that @@ -113,44 +122,25 @@ where trace!("protocol detection disabled for {:?}", orig_dst); let fut = tcp_serve( &self.tcp, - connection, + io, + srv_ctx, self.drain_signal.clone(), - &self.sensors, - opened_at, - &self.proxy_ctx, - LocalAddr(&local_addr), - RemoteAddr(&remote_addr), - OrigDst(&orig_dst), ); self.executor.spawn(fut); return; } // try to sniff protocol - let proxy_ctx = self.proxy_ctx.clone(); let sniff = [0u8; 32]; - let sensors = self.sensors.clone(); let h1 = self.h1.clone(); let h2 = self.h2.clone(); let tcp = self.tcp.clone(); let new_service = self.new_service.clone(); let drain_signal = self.drain_signal.clone(); - let fut = connection - .peek_future(sniff) - .map_err(|_| ()) - .and_then(move |(connection, sniff, n)| -> Box> { + let fut = PeekFuture::new(io, sniff) + .map_err(|e| debug!("peek error: {}", e)) + .and_then(move |(io, sniff, n)| -> Box> { if let Some(proto) = Protocol::detect(&sniff[..n]) { - let srv_ctx = ServerCtx::new( - &proxy_ctx, - &local_addr, - &remote_addr, - &orig_dst, - common::Protocol::Http, - ); - - // record telemetry - let io = sensors.accept(connection, opened_at, &srv_ctx); - match proto { Protocol::Http1 => { trace!("transparency detected HTTP/1"); @@ -187,14 +177,9 @@ where trace!("transparency did not detect protocol, treating as TCP"); tcp_serve( &tcp, - connection, + io, + srv_ctx, drain_signal, - &sensors, - opened_at, - &proxy_ctx, - LocalAddr(&local_addr), - RemoteAddr(&remote_addr), - OrigDst(&orig_dst), ) } }); @@ -203,37 +188,13 @@ where } } -// These newtypes act as a form of keyword arguments. -// -// It should be easier to notice when wrapping `LocalAddr(remote_addr)` at -// the call site, then simply passing multiple socket addr arguments. -struct LocalAddr<'a>(&'a SocketAddr); -struct RemoteAddr<'a>(&'a SocketAddr); -struct OrigDst<'a>(&'a Option); - -fn tcp_serve( +fn tcp_serve( tcp: &tcp::Proxy, - connection: Connection, + io: T, + srv_ctx: Arc, drain_signal: drain::Watch, - sensors: &Sensors, - opened_at: Instant, - proxy_ctx: &Arc, - local_addr: LocalAddr, - remote_addr: RemoteAddr, - orig_dst: OrigDst, ) -> Box> { - let srv_ctx = ServerCtx::new( - proxy_ctx, - local_addr.0, - remote_addr.0, - orig_dst.0, - common::Protocol::Tcp, - ); - - // record telemetry - let tcp_in = sensors.accept(connection, opened_at, &srv_ctx); - - let fut = tcp.serve(tcp_in, srv_ctx); + let fut = tcp.serve(io, srv_ctx); // There's nothing to do when drain is signaled, we just have to hope // the sockets finish soon. However, the drain signal still needs to diff --git a/proxy/src/transparency/tcp.rs b/proxy/src/transparency/tcp.rs index 9784fc0f8..4c3e96398 100644 --- a/proxy/src/transparency/tcp.rs +++ b/proxy/src/transparency/tcp.rs @@ -8,7 +8,6 @@ use tokio_connect::Connect; use tokio_core::reactor::Handle; use tokio_io::{AsyncRead, AsyncWrite}; -use conduit_proxy_controller_grpc::common; use ctx::transport::{Client as ClientCtx, Server as ServerCtx}; use telemetry::Sensors; use timeout::Timeout; @@ -60,7 +59,6 @@ impl Proxy { let client_ctx = ClientCtx::new( &srv_ctx.proxy, &orig_dst, - common::Protocol::Tcp, None, ); let c = Timeout::new( @@ -74,7 +72,7 @@ impl Proxy { .map_err(|e| debug!("tcp connect error: {:?}", e)) .and_then(move |tcp_out| { Duplex::new(tcp_in, tcp_out) - .map_err(|e| debug!("tcp error: {}", e)) + .map_err(|e| error!("tcp duplex error: {}", e)) }); Box::new(fut) } diff --git a/proxy/tests/support/mod.rs b/proxy/tests/support/mod.rs index 48ab3dc82..bbf47d078 100644 --- a/proxy/tests/support/mod.rs +++ b/proxy/tests/support/mod.rs @@ -154,6 +154,17 @@ pub fn s(bytes: &[u8]) -> &str { ::std::str::from_utf8(bytes.as_ref()).unwrap() } +/// The Rust test runner creates a thread per unit test, naming it after +/// the function name. If still in that thread, this can be useful to allow +/// associating test logs with a specific test, since tests *can* be run in +/// parallel. +pub fn thread_name() -> String { + ::std::thread::current() + .name() + .unwrap_or("") + .to_owned() +} + #[test] #[should_panic] fn assert_eventually() { diff --git a/proxy/tests/support/proxy.rs b/proxy/tests/support/proxy.rs index 312d1f3b9..39b1cf417 100644 --- a/proxy/tests/support/proxy.rs +++ b/proxy/tests/support/proxy.rs @@ -175,8 +175,9 @@ fn run(proxy: Proxy, mut env: config::TestEnv) -> Listening { rx = Box::new(rx.select(fut).then(|_| Ok(()))); } + let tname = format!("support proxy (test={})", thread_name()); ::std::thread::Builder::new() - .name("support proxy".into()) + .name(tname) .spawn(move || { let _c = controller; @@ -220,6 +221,23 @@ fn run(proxy: Proxy, mut env: config::TestEnv) -> Listening { let (control_addr, inbound_addr, outbound_addr, metrics_addr) = running_rx.wait().unwrap(); + // printlns will show if the test fails... + println!( + "proxy running; control={}, inbound={}{}, outbound={}{}, metrics={}", + control_addr, + inbound_addr, + inbound + .as_ref() + .map(|i| format!(" (SO_ORIGINAL_DST={})", i.addr)) + .unwrap_or_else(String::new), + outbound_addr, + outbound + .as_ref() + .map(|o| format!(" (SO_ORIGINAL_DST={})", o.addr)) + .unwrap_or_else(String::new), + metrics_addr, + ); + Listening { control: control_addr, inbound: inbound_addr, diff --git a/proxy/tests/support/server.rs b/proxy/tests/support/server.rs index a1b9f224b..7419bc646 100644 --- a/proxy/tests/support/server.rs +++ b/proxy/tests/support/server.rs @@ -38,6 +38,12 @@ impl Listening { } } +impl Drop for Listening { + fn drop(&mut self) { + println!("server Listening dropped; addr={}", self.addr); + } +} + impl Server { fn new(run: Run) -> Self { Server { @@ -103,7 +109,13 @@ impl Server { let (addr_tx, addr_rx) = oneshot::channel(); let conn_count = Arc::new(AtomicUsize::from(0)); let srv_conn_count = Arc::clone(&conn_count); - ::std::thread::Builder::new().name("support server".into()).spawn(move || { + let version = self.version; + let tname = format!( + "support {:?} server (test={})", + version, + thread_name(), + ); + ::std::thread::Builder::new().name(tname).spawn(move || { let mut core = Core::new().unwrap(); let reactor = core.handle(); @@ -172,6 +184,13 @@ impl Server { let addr = addr_rx.wait().expect("addr"); + // printlns will show if the test fails... + println!( + "{:?} server running; addr={}", + version, + addr, + ); + Listening { addr, shutdown: tx, @@ -180,7 +199,7 @@ impl Server { } } -#[derive(Debug)] +#[derive(Clone, Copy, Debug)] enum Run { Http1, Http2, diff --git a/proxy/tests/support/tcp.rs b/proxy/tests/support/tcp.rs index 21c646cdb..481b91641 100644 --- a/proxy/tests/support/tcp.rs +++ b/proxy/tests/support/tcp.rs @@ -59,6 +59,7 @@ impl TcpClient { let tx = rx.map_err(|_| panic!("tcp connect dropped")) .wait() .unwrap(); + println!("tcp client (addr={}): connected", self.addr); TcpConn { addr: self.addr, tx, @@ -110,6 +111,7 @@ impl TcpConn { } pub fn try_read(&self) -> io::Result> { + println!("tcp client (addr={}): read", self.addr); let (tx, rx) = oneshot::channel(); let _ = self.tx.unbounded_send((None, tx)); rx.map_err(|_| panic!("tcp read dropped")) @@ -119,6 +121,7 @@ impl TcpConn { } pub fn write>>(&self, buf: T) { + println!("tcp client (addr={}): write", self.addr); let (tx, rx) = oneshot::channel(); let _ = self.tx.unbounded_send((Some(buf.into()), tx)); rx.map_err(|_| panic!("tcp write dropped")) @@ -131,8 +134,8 @@ impl TcpConn { fn run_client(addr: SocketAddr) -> TcpSender { let (tx, rx) = mpsc::unbounded(); - let thread_name = format!("support tcp client (addr={})", addr); - ::std::thread::Builder::new().name(thread_name).spawn(move || { + let tname = format!("support tcp client (addr={})", addr); + ::std::thread::Builder::new().name(tname).spawn(move || { let mut core = Core::new().unwrap(); let handle = core.handle(); @@ -188,6 +191,8 @@ fn run_client(addr: SocketAddr) -> TcpSender { }).map_err(|e| println!("client error: {:?}", e)); core.run(work).unwrap(); }).unwrap(); + + println!("tcp client (addr={}) thread running", addr); tx } @@ -199,8 +204,8 @@ fn run_server(tcp: TcpServer) -> server::Listening { let any_port = SocketAddr::from(([127, 0, 0, 1], 0)); let std_listener = StdTcpListener::bind(&any_port).expect("bind"); let addr = std_listener.local_addr().expect("local_addr"); - let thread_name = format!("support tcp server (addr={})", addr); - ::std::thread::Builder::new().name(thread_name).spawn(move || { + let tname = format!("support tcp server (addr={})", addr); + ::std::thread::Builder::new().name(tname).spawn(move || { let mut core = Core::new().unwrap(); let reactor = core.handle(); @@ -233,6 +238,10 @@ fn run_server(tcp: TcpServer) -> server::Listening { }).unwrap(); started_rx.wait().expect("support tcp server started"); + + // printlns will show if the test fails... + println!("tcp server (addr={}): running", addr); + server::Listening { addr, shutdown: tx,