From 1af68d3a144983eab85170a6b3573d2b0c9d9c99 Mon Sep 17 00:00:00 2001 From: Brian Smith Date: Thu, 14 Dec 2017 13:19:05 -1000 Subject: [PATCH] Encapsulate listening port connection acceptance logic (#46) Previously every use of `BoundPort` repeated a bunch of logic. Move the repeated logic to `BoundPort` itself. Just remove the no-op handshaking logic; new handshaking logic will be added to `BoundPort` when TLS is added. --- proxy/src/connection.rs | 79 +++++++++++++++++++++++++++-------------- proxy/src/lib.rs | 76 +++++++++++++-------------------------- 2 files changed, 77 insertions(+), 78 deletions(-) diff --git a/proxy/src/connection.rs b/proxy/src/connection.rs index 03257a764..2bc5e7a0a 100644 --- a/proxy/src/connection.rs +++ b/proxy/src/connection.rs @@ -3,11 +3,12 @@ use std; use std::io; use std::net::SocketAddr; use tokio_core; -use tokio_core::net::{Incoming, TcpListener}; +use tokio_core::net::TcpListener; use tokio_core::reactor::Handle; use tokio_io::{AsyncRead, AsyncWrite}; use config::Addr; +use transport; pub type PlaintextSocket = tokio_core::net::TcpStream; @@ -22,13 +23,6 @@ pub enum Connection { Plain(PlaintextSocket), } -/// A connection handshake. -/// -/// Resolves to a connection ready to be used at the next layer. -pub struct Handshake { - plaintext_socket: Option, -} - // ===== impl BoundPort ===== impl BoundPort { @@ -45,19 +39,62 @@ impl BoundPort { self.local_addr } - pub fn listen(self, executor: &Handle) -> Result { - TcpListener::from_listener(self.inner, &self.local_addr, executor) - .map(|listener| listener.incoming()) + // Listen for incoming connections and dispatch them to the handler `f`. + // + // This ensures that every incoming connection has the correct options set. + // In the future it will also ensure that the connection is upgraded with + // TLS when needed. + pub fn listen_and_fold(self, executor: &Handle, initial: T, f: F) + -> Box + 'static> + where + F: Fn(T, (Connection, SocketAddr)) -> Fut + 'static, + T: 'static, + Fut: IntoFuture + 'static { + let fut = TcpListener::from_listener(self.inner, &self.local_addr, &executor) + .expect("from_listener") // TODO: get rid of this `expect()`. + .incoming() + .fold(initial, move |b, (socket, remote_addr)| { + // TODO: On Linux and most other platforms it would be better + // to set the `TCP_NODELAY` option on the bound socket and + // then have the listening sockets inherit it. However, that + // doesn't work on all platforms and also the underlying + // libraries don't have the necessary API for that, so just + // do it here. + if let Err(e) = socket.set_nodelay(true) { + warn!( + "could not set TCP_NODELAY on {:?}/{:?}: {}", + socket.local_addr(), + socket.peer_addr(), + e + ); + } + f(b, (Connection::Plain(socket), remote_addr)) + }); + + Box::new(fut.map(|_| ())) } } // ===== impl Connection ===== impl Connection { - /// Establish a connection backed by the provided `io`. - pub fn handshake(io: PlaintextSocket) -> Handshake { - Handshake { - plaintext_socket: Some(io), + pub fn original_dst_addr(&self) -> Option { + transport::get_original_dst(self.socket()) + } + + pub fn local_addr(&self) -> Result { + self.socket().local_addr() + } + + // This must never be made public so that in the future `Connection` can + // control access to the plaintext socket for TLS, to ensure no private + // data is accidentally writen to the socket and to ensure no unprotected + // data is read from the socket. Each piece of information needed about the + // underlying socket should be exposed by its own minimal accessor function + // as is done above. + fn socket(&self) -> &PlaintextSocket { + match self { + &Connection::Plain(ref socket) => socket } } } @@ -103,15 +140,3 @@ impl AsyncWrite for Connection { } } } - -// ===== impl Handshake ===== - -impl Future for Handshake { - type Item = Connection; - type Error = io::Error; - - fn poll(&mut self) -> Poll { - let plaintext_socket = self.plaintext_socket.take().expect("poll after complete"); - Ok(Connection::Plain(plaintext_socket).into()) - } -} diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index ec58f04a9..422bec827 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -188,7 +188,7 @@ impl Main { Inbound::new(default_addr, bind), ctx, sensors.clone(), - executor.clone(), + &executor, ); ::logging::context_future("inbound", fut) }; @@ -210,7 +210,7 @@ impl Main { Outbound::new(bind, control), ctx, sensors, - executor, + &executor, ); ::logging::context_future("outbound", fut) }; @@ -273,7 +273,7 @@ fn serve( recognize: R, proxy_ctx: Arc, sensors: telemetry::Sensors, - executor: Handle, + executor: &Handle, ) -> Box + 'static> where B: Body + Default + 'static, @@ -302,42 +302,29 @@ where h2_builder, ::logging::context_executor(("serve", listen_addr), executor.clone()), ); - let incoming = bound_port.listen(&executor).expect("listen"); - let f = incoming.fold( - (server, proxy_ctx, sensors, executor), - move |(server, proxy_ctx, sensors, executor), (socket, remote_addr)| { - if let Err(e) = socket.set_nodelay(true) { - warn!( - "could not set TCP_NODELAY on {:?}/{:?}: {}", - socket.local_addr(), - socket.peer_addr(), - e - ); - } - + bound_port.listen_and_fold( + executor, + (server, proxy_ctx, sensors, executor.clone()), + move |(server, proxy_ctx, sensors, executor), (connection, remote_addr)| { let opened_at = Instant::now(); - let orig_dst = transport::get_original_dst(&socket); - let local_addr = socket.local_addr().unwrap_or(listen_addr); + let orig_dst = connection.original_dst_addr(); + let local_addr = connection.local_addr().unwrap_or(listen_addr); let srv_ctx = ctx::transport::Server::new(&proxy_ctx, &local_addr, &remote_addr, &orig_dst); - connection::Connection::handshake(socket).map(move |session| { - let io = sensors.accept(session, opened_at, &srv_ctx); + let io = sensors.accept(connection, opened_at, &srv_ctx); - // TODO session context - let set_ctx = move |request: &mut http::Request<()>| { - request.extensions_mut().insert(Arc::clone(&srv_ctx)); - }; + // TODO session context + let set_ctx = move |request: &mut http::Request<()>| { + request.extensions_mut().insert(Arc::clone(&srv_ctx)); + }; - let s = server.serve_modified(io, set_ctx).map_err(|_| ()); - executor.spawn(::logging::context_future(("serve", local_addr), s)); + let s = server.serve_modified(io, set_ctx).map_err(|_| ()); + executor.spawn(::logging::context_future(("serve", local_addr), s)); - (server, proxy_ctx, sensors, executor) - }) + future::ok((server, proxy_ctx, sensors, executor)) }, - ); - - Box::new(f.map(|_| {})) + ) } fn serve_control( @@ -351,28 +338,15 @@ where N: NewService, Response = http::Response> + 'static, { let server = Server::new(new_service, h2_builder, executor.clone()); - let incoming = bound_port.listen(executor).expect("listen"); - let f = incoming.fold( + bound_port.listen_and_fold( + executor, (server, executor.clone()), - move |(server, executor), (socket, _)| { - if let Err(e) = socket.set_nodelay(true) { - warn!( - "could not set TCP_NODELAY on {:?}/{:?}: {}", - socket.local_addr(), - socket.peer_addr(), - e - ); - } + move |(server, executor), (session, _)| { + let s = server.serve(session).map_err(|_| ()); - connection::Connection::handshake(socket).map(move |session| { - let s = server.serve(session).map_err(|_| ()); + executor.spawn(::logging::context_future("serve_control", s)); - executor.spawn(::logging::context_future("serve_control", s)); - - (server, executor) - }) + future::ok((server, executor)) }, - ); - - Box::new(f.map(|_| {})) + ) }