diff --git a/proxy/src/bind.rs b/proxy/src/bind.rs index 63f4f386b..c47fb2615 100644 --- a/proxy/src/bind.rs +++ b/proxy/src/bind.rs @@ -13,7 +13,6 @@ use tower; use tower_h2; use tower_reconnect::Reconnect; -use conduit_proxy_controller_grpc; use conduit_proxy_router::Reuse; use control; @@ -185,7 +184,6 @@ where let client_ctx = ctx::transport::Client::new( &self.ctx, &addr, - conduit_proxy_controller_grpc::common::Protocol::Http, ep.dst_labels().cloned(), ); diff --git a/proxy/src/connection.rs b/proxy/src/connection.rs index 9d6ebfbd3..c35665ab1 100644 --- a/proxy/src/connection.rs +++ b/proxy/src/connection.rs @@ -36,6 +36,17 @@ pub enum Connection { Plain(PlaintextSocket), } +/// A trait describing that a type can peek (such as MSG_PEEK). +pub trait Peek { + fn peek(&mut self, buf: &mut [u8]) -> io::Result; +} + +/// A future of when some `Peek` fulfills with some bytes. +#[derive(Debug)] +pub struct PeekFuture { + inner: Option<(T, B)>, +} + // ===== impl BoundPort ===== impl BoundPort { @@ -105,12 +116,6 @@ impl Connection { self.socket().local_addr() } - pub fn peek_future>(self, buf: T) -> Peek { - Peek { - inner: Some((self, buf)) - } - } - // This must never be made public so that in the future `Connection` can // control access to the plaintext socket for TLS, to ensure no private // data is accidentally writen to the socket and to ensure no unprotected @@ -190,23 +195,37 @@ impl AsyncWrite for Connection { } } -// impl Peek +impl Peek for Connection { + fn peek(&mut self, buf: &mut [u8]) -> io::Result { + use self::Connection::*; -pub struct Peek { - inner: Option<(Connection, T)>, + match *self { + Plain(ref mut t) => t.peek(buf), + } + } } -impl> Future for Peek { - type Item = (Connection, T, usize); +// impl PeekFuture + +impl> PeekFuture { + pub fn new(io: T, buf: B) -> Self { + PeekFuture { + inner: Some((io, buf)), + } + } +} + +impl> Future for PeekFuture { + type Item = (T, B, usize); type Error = std::io::Error; fn poll(&mut self) -> Poll { - let (conn, mut buf) = self.inner.take().expect("polled after completed"); - match conn.socket().peek(buf.as_mut()) { - Ok(n) => Ok(Async::Ready((conn, buf, n))), + let (mut io, mut buf) = self.inner.take().expect("polled after completed"); + match io.peek(buf.as_mut()) { + Ok(n) => Ok(Async::Ready((io, buf, n))), Err(e) => match e.kind() { std::io::ErrorKind::WouldBlock => { - self.inner = Some((conn, buf)); + self.inner = Some((io, buf)); Ok(Async::NotReady) }, _ => Err(e) diff --git a/proxy/src/ctx/transport.rs b/proxy/src/ctx/transport.rs index 0ca97d9ba..2cd225c6c 100644 --- a/proxy/src/ctx/transport.rs +++ b/proxy/src/ctx/transport.rs @@ -2,8 +2,6 @@ use std::{cmp, hash}; use std::net::{IpAddr, SocketAddr}; use std::sync::Arc; -use conduit_proxy_controller_grpc::common::Protocol; - use control::discovery::DstLabelsWatch; use ctx; @@ -20,7 +18,6 @@ pub struct Server { pub remote: SocketAddr, pub local: SocketAddr, pub orig_dst: Option, - pub protocol: Protocol, } /// Identifies a connection from the proxy to another process. @@ -28,7 +25,6 @@ pub struct Server { pub struct Client { pub proxy: Arc, pub remote: SocketAddr, - pub protocol: Protocol, pub dst_labels: Option, } @@ -39,13 +35,6 @@ impl Ctx { Ctx::Server(ref ctx) => &ctx.proxy, } } - - pub fn protocol(&self) -> Protocol { - match *self { - Ctx::Client(ref ctx) => ctx.protocol, - Ctx::Server(ref ctx) => ctx.protocol, - } - } } impl Server { @@ -54,14 +43,12 @@ impl Server { local: &SocketAddr, remote: &SocketAddr, orig_dst: &Option, - protocol: Protocol, ) -> Arc { let s = Server { proxy: Arc::clone(proxy), local: *local, remote: *remote, orig_dst: *orig_dst, - protocol: protocol, }; Arc::new(s) @@ -95,13 +82,11 @@ impl Client { pub fn new( proxy: &Arc, remote: &SocketAddr, - protocol: Protocol, dst_labels: Option, ) -> Arc { let c = Client { proxy: Arc::clone(proxy), remote: *remote, - protocol, dst_labels, }; @@ -113,7 +98,6 @@ impl hash::Hash for Client { fn hash(&self, state: &mut H) { self.proxy.hash(state); self.remote.hash(state); - self.protocol.hash(state); // ignore dst_labels } } @@ -121,8 +105,7 @@ impl hash::Hash for Client { impl cmp::PartialEq for Client { fn eq(&self, other: &Self) -> bool { self.proxy.eq(&other.proxy) && - self.remote.eq(&other.remote) && - self.protocol.eq(&other.protocol) + self.remote.eq(&other.remote) } } diff --git a/proxy/src/inbound.rs b/proxy/src/inbound.rs index 4241f4b02..c23a05bfa 100644 --- a/proxy/src/inbound.rs +++ b/proxy/src/inbound.rs @@ -94,7 +94,6 @@ mod tests { use conduit_proxy_router::Recognize; use super::Inbound; - use conduit_proxy_controller_grpc::common::Protocol; use bind::{self, Bind, Host}; use ctx; @@ -114,7 +113,7 @@ mod tests { let inbound = new_inbound(None, &ctx); - let srv_ctx = ctx::transport::Server::new(&ctx, &local, &remote, &Some(orig_dst), Protocol::Http); + let srv_ctx = ctx::transport::Server::new(&ctx, &local, &remote, &Some(orig_dst)); let rec = srv_ctx.orig_dst_if_not_local().map(|addr| bind::Protocol::Http1(Host::NoAuthority).into_key(addr) @@ -143,7 +142,6 @@ mod tests { &local, &remote, &None, - Protocol::Http, )); inbound.recognize(&req) == default.map(|addr| @@ -179,7 +177,6 @@ mod tests { &local, &remote, &Some(local), - Protocol::Http, )); inbound.recognize(&req) == default.map(|addr| diff --git a/proxy/src/telemetry/sensor/transport.rs b/proxy/src/telemetry/sensor/transport.rs index 38bffdc4d..3f56ef4c4 100644 --- a/proxy/src/telemetry/sensor/transport.rs +++ b/proxy/src/telemetry/sensor/transport.rs @@ -1,10 +1,12 @@ -use futures::{Future, Poll}; +use bytes::Buf; +use futures::{Async, Future, Poll}; use std::io; use std::sync::Arc; use std::time::Instant; use tokio_connect; use tokio_io::{AsyncRead, AsyncWrite}; +use connection::Peek; use ctx; use telemetry::event; @@ -132,7 +134,7 @@ impl io::Read for Transport { let bytes = self.sense_err(move |io| io.read(buf))?; if let Some(inner) = self.1.as_mut() { - inner.rx_bytes += bytes as u64; + inner.rx_bytes += bytes as u64; } Ok(bytes) @@ -148,7 +150,7 @@ impl io::Write for Transport { let bytes = self.sense_err(move |io| io.write(buf))?; if let Some(inner) = self.1.as_mut() { - inner.tx_bytes += bytes as u64; + inner.tx_bytes += bytes as u64; } Ok(bytes) @@ -165,6 +167,22 @@ impl AsyncWrite for Transport { fn shutdown(&mut self) -> Poll<(), io::Error> { self.sense_err(|io| io.shutdown()) } + + fn write_buf(&mut self, buf: &mut B) -> Poll { + let bytes = try_ready!(self.sense_err(|io| io.write_buf(buf))); + + if let Some(inner) = self.1.as_mut() { + inner.tx_bytes += bytes as u64; + } + + Ok(Async::Ready(bytes)) + } +} + +impl Peek for Transport { + fn peek(&mut self, buf: &mut [u8]) -> io::Result { + self.sense_err(|io| io.peek(buf)) + } } // === impl Connect === diff --git a/proxy/src/transparency/server.rs b/proxy/src/transparency/server.rs index f7c3ce503..d66b3ae87 100644 --- a/proxy/src/transparency/server.rs +++ b/proxy/src/transparency/server.rs @@ -8,11 +8,11 @@ use http; use hyper; use indexmap::IndexSet; use tokio_core::reactor::Handle; +use tokio_io::{AsyncRead, AsyncWrite}; use tower::NewService; use tower_h2; -use conduit_proxy_controller_grpc::common; -use connection::Connection; +use connection::{Connection, PeekFuture}; use ctx::Proxy as ProxyCtx; use ctx::transport::{Server as ServerCtx}; use drain; @@ -99,6 +99,15 @@ where // create Server context let orig_dst = connection.original_dst_addr(&self.get_orig_dst); let local_addr = connection.local_addr().unwrap_or(self.listen_addr); + let srv_ctx = ServerCtx::new( + &self.proxy_ctx, + &local_addr, + &remote_addr, + &orig_dst, + ); + + // record telemetry + let io = self.sensors.accept(connection, opened_at, &srv_ctx); // We are using the port from the connection's SO_ORIGINAL_DST to // determine whether to skip protocol detection, not any port that @@ -113,44 +122,25 @@ where trace!("protocol detection disabled for {:?}", orig_dst); let fut = tcp_serve( &self.tcp, - connection, + io, + srv_ctx, self.drain_signal.clone(), - &self.sensors, - opened_at, - &self.proxy_ctx, - LocalAddr(&local_addr), - RemoteAddr(&remote_addr), - OrigDst(&orig_dst), ); self.executor.spawn(fut); return; } // try to sniff protocol - let proxy_ctx = self.proxy_ctx.clone(); let sniff = [0u8; 32]; - let sensors = self.sensors.clone(); let h1 = self.h1.clone(); let h2 = self.h2.clone(); let tcp = self.tcp.clone(); let new_service = self.new_service.clone(); let drain_signal = self.drain_signal.clone(); - let fut = connection - .peek_future(sniff) - .map_err(|_| ()) - .and_then(move |(connection, sniff, n)| -> Box> { + let fut = PeekFuture::new(io, sniff) + .map_err(|e| debug!("peek error: {}", e)) + .and_then(move |(io, sniff, n)| -> Box> { if let Some(proto) = Protocol::detect(&sniff[..n]) { - let srv_ctx = ServerCtx::new( - &proxy_ctx, - &local_addr, - &remote_addr, - &orig_dst, - common::Protocol::Http, - ); - - // record telemetry - let io = sensors.accept(connection, opened_at, &srv_ctx); - match proto { Protocol::Http1 => { trace!("transparency detected HTTP/1"); @@ -187,14 +177,9 @@ where trace!("transparency did not detect protocol, treating as TCP"); tcp_serve( &tcp, - connection, + io, + srv_ctx, drain_signal, - &sensors, - opened_at, - &proxy_ctx, - LocalAddr(&local_addr), - RemoteAddr(&remote_addr), - OrigDst(&orig_dst), ) } }); @@ -203,37 +188,13 @@ where } } -// These newtypes act as a form of keyword arguments. -// -// It should be easier to notice when wrapping `LocalAddr(remote_addr)` at -// the call site, then simply passing multiple socket addr arguments. -struct LocalAddr<'a>(&'a SocketAddr); -struct RemoteAddr<'a>(&'a SocketAddr); -struct OrigDst<'a>(&'a Option); - -fn tcp_serve( +fn tcp_serve( tcp: &tcp::Proxy, - connection: Connection, + io: T, + srv_ctx: Arc, drain_signal: drain::Watch, - sensors: &Sensors, - opened_at: Instant, - proxy_ctx: &Arc, - local_addr: LocalAddr, - remote_addr: RemoteAddr, - orig_dst: OrigDst, ) -> Box> { - let srv_ctx = ServerCtx::new( - proxy_ctx, - local_addr.0, - remote_addr.0, - orig_dst.0, - common::Protocol::Tcp, - ); - - // record telemetry - let tcp_in = sensors.accept(connection, opened_at, &srv_ctx); - - let fut = tcp.serve(tcp_in, srv_ctx); + let fut = tcp.serve(io, srv_ctx); // There's nothing to do when drain is signaled, we just have to hope // the sockets finish soon. However, the drain signal still needs to diff --git a/proxy/src/transparency/tcp.rs b/proxy/src/transparency/tcp.rs index 9784fc0f8..4c3e96398 100644 --- a/proxy/src/transparency/tcp.rs +++ b/proxy/src/transparency/tcp.rs @@ -8,7 +8,6 @@ use tokio_connect::Connect; use tokio_core::reactor::Handle; use tokio_io::{AsyncRead, AsyncWrite}; -use conduit_proxy_controller_grpc::common; use ctx::transport::{Client as ClientCtx, Server as ServerCtx}; use telemetry::Sensors; use timeout::Timeout; @@ -60,7 +59,6 @@ impl Proxy { let client_ctx = ClientCtx::new( &srv_ctx.proxy, &orig_dst, - common::Protocol::Tcp, None, ); let c = Timeout::new( @@ -74,7 +72,7 @@ impl Proxy { .map_err(|e| debug!("tcp connect error: {:?}", e)) .and_then(move |tcp_out| { Duplex::new(tcp_in, tcp_out) - .map_err(|e| debug!("tcp error: {}", e)) + .map_err(|e| error!("tcp duplex error: {}", e)) }); Box::new(fut) } diff --git a/proxy/tests/support/mod.rs b/proxy/tests/support/mod.rs index 48ab3dc82..bbf47d078 100644 --- a/proxy/tests/support/mod.rs +++ b/proxy/tests/support/mod.rs @@ -154,6 +154,17 @@ pub fn s(bytes: &[u8]) -> &str { ::std::str::from_utf8(bytes.as_ref()).unwrap() } +/// The Rust test runner creates a thread per unit test, naming it after +/// the function name. If still in that thread, this can be useful to allow +/// associating test logs with a specific test, since tests *can* be run in +/// parallel. +pub fn thread_name() -> String { + ::std::thread::current() + .name() + .unwrap_or("") + .to_owned() +} + #[test] #[should_panic] fn assert_eventually() { diff --git a/proxy/tests/support/proxy.rs b/proxy/tests/support/proxy.rs index 312d1f3b9..39b1cf417 100644 --- a/proxy/tests/support/proxy.rs +++ b/proxy/tests/support/proxy.rs @@ -175,8 +175,9 @@ fn run(proxy: Proxy, mut env: config::TestEnv) -> Listening { rx = Box::new(rx.select(fut).then(|_| Ok(()))); } + let tname = format!("support proxy (test={})", thread_name()); ::std::thread::Builder::new() - .name("support proxy".into()) + .name(tname) .spawn(move || { let _c = controller; @@ -220,6 +221,23 @@ fn run(proxy: Proxy, mut env: config::TestEnv) -> Listening { let (control_addr, inbound_addr, outbound_addr, metrics_addr) = running_rx.wait().unwrap(); + // printlns will show if the test fails... + println!( + "proxy running; control={}, inbound={}{}, outbound={}{}, metrics={}", + control_addr, + inbound_addr, + inbound + .as_ref() + .map(|i| format!(" (SO_ORIGINAL_DST={})", i.addr)) + .unwrap_or_else(String::new), + outbound_addr, + outbound + .as_ref() + .map(|o| format!(" (SO_ORIGINAL_DST={})", o.addr)) + .unwrap_or_else(String::new), + metrics_addr, + ); + Listening { control: control_addr, inbound: inbound_addr, diff --git a/proxy/tests/support/server.rs b/proxy/tests/support/server.rs index a1b9f224b..7419bc646 100644 --- a/proxy/tests/support/server.rs +++ b/proxy/tests/support/server.rs @@ -38,6 +38,12 @@ impl Listening { } } +impl Drop for Listening { + fn drop(&mut self) { + println!("server Listening dropped; addr={}", self.addr); + } +} + impl Server { fn new(run: Run) -> Self { Server { @@ -103,7 +109,13 @@ impl Server { let (addr_tx, addr_rx) = oneshot::channel(); let conn_count = Arc::new(AtomicUsize::from(0)); let srv_conn_count = Arc::clone(&conn_count); - ::std::thread::Builder::new().name("support server".into()).spawn(move || { + let version = self.version; + let tname = format!( + "support {:?} server (test={})", + version, + thread_name(), + ); + ::std::thread::Builder::new().name(tname).spawn(move || { let mut core = Core::new().unwrap(); let reactor = core.handle(); @@ -172,6 +184,13 @@ impl Server { let addr = addr_rx.wait().expect("addr"); + // printlns will show if the test fails... + println!( + "{:?} server running; addr={}", + version, + addr, + ); + Listening { addr, shutdown: tx, @@ -180,7 +199,7 @@ impl Server { } } -#[derive(Debug)] +#[derive(Clone, Copy, Debug)] enum Run { Http1, Http2, diff --git a/proxy/tests/support/tcp.rs b/proxy/tests/support/tcp.rs index 21c646cdb..481b91641 100644 --- a/proxy/tests/support/tcp.rs +++ b/proxy/tests/support/tcp.rs @@ -59,6 +59,7 @@ impl TcpClient { let tx = rx.map_err(|_| panic!("tcp connect dropped")) .wait() .unwrap(); + println!("tcp client (addr={}): connected", self.addr); TcpConn { addr: self.addr, tx, @@ -110,6 +111,7 @@ impl TcpConn { } pub fn try_read(&self) -> io::Result> { + println!("tcp client (addr={}): read", self.addr); let (tx, rx) = oneshot::channel(); let _ = self.tx.unbounded_send((None, tx)); rx.map_err(|_| panic!("tcp read dropped")) @@ -119,6 +121,7 @@ impl TcpConn { } pub fn write>>(&self, buf: T) { + println!("tcp client (addr={}): write", self.addr); let (tx, rx) = oneshot::channel(); let _ = self.tx.unbounded_send((Some(buf.into()), tx)); rx.map_err(|_| panic!("tcp write dropped")) @@ -131,8 +134,8 @@ impl TcpConn { fn run_client(addr: SocketAddr) -> TcpSender { let (tx, rx) = mpsc::unbounded(); - let thread_name = format!("support tcp client (addr={})", addr); - ::std::thread::Builder::new().name(thread_name).spawn(move || { + let tname = format!("support tcp client (addr={})", addr); + ::std::thread::Builder::new().name(tname).spawn(move || { let mut core = Core::new().unwrap(); let handle = core.handle(); @@ -188,6 +191,8 @@ fn run_client(addr: SocketAddr) -> TcpSender { }).map_err(|e| println!("client error: {:?}", e)); core.run(work).unwrap(); }).unwrap(); + + println!("tcp client (addr={}) thread running", addr); tx } @@ -199,8 +204,8 @@ fn run_server(tcp: TcpServer) -> server::Listening { let any_port = SocketAddr::from(([127, 0, 0, 1], 0)); let std_listener = StdTcpListener::bind(&any_port).expect("bind"); let addr = std_listener.local_addr().expect("local_addr"); - let thread_name = format!("support tcp server (addr={})", addr); - ::std::thread::Builder::new().name(thread_name).spawn(move || { + let tname = format!("support tcp server (addr={})", addr); + ::std::thread::Builder::new().name(tname).spawn(move || { let mut core = Core::new().unwrap(); let reactor = core.handle(); @@ -233,6 +238,10 @@ fn run_server(tcp: TcpServer) -> server::Listening { }).unwrap(); started_rx.wait().expect("support tcp server started"); + + // printlns will show if the test fails... + println!("tcp server (addr={}): running", addr); + server::Listening { addr, shutdown: tx,