Encapsulate listening port connection acceptance logic (#46)

Previously every use of `BoundPort` repeated a bunch of logic.

Move the repeated logic to `BoundPort` itself. Just remove the no-op
handshaking logic; new handshaking logic will be added to `BoundPort`
when TLS is added.
This commit is contained in:
Brian Smith 2017-12-14 13:19:05 -10:00 committed by GitHub
parent 95cb05d3a9
commit 1af68d3a14
2 changed files with 77 additions and 78 deletions

View File

@ -3,11 +3,12 @@ use std;
use std::io; use std::io;
use std::net::SocketAddr; use std::net::SocketAddr;
use tokio_core; use tokio_core;
use tokio_core::net::{Incoming, TcpListener}; use tokio_core::net::TcpListener;
use tokio_core::reactor::Handle; use tokio_core::reactor::Handle;
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use config::Addr; use config::Addr;
use transport;
pub type PlaintextSocket = tokio_core::net::TcpStream; pub type PlaintextSocket = tokio_core::net::TcpStream;
@ -22,13 +23,6 @@ pub enum Connection {
Plain(PlaintextSocket), Plain(PlaintextSocket),
} }
/// A connection handshake.
///
/// Resolves to a connection ready to be used at the next layer.
pub struct Handshake {
plaintext_socket: Option<PlaintextSocket>,
}
// ===== impl BoundPort ===== // ===== impl BoundPort =====
impl BoundPort { impl BoundPort {
@ -45,19 +39,62 @@ impl BoundPort {
self.local_addr self.local_addr
} }
pub fn listen(self, executor: &Handle) -> Result<Incoming, io::Error> { // Listen for incoming connections and dispatch them to the handler `f`.
TcpListener::from_listener(self.inner, &self.local_addr, executor) //
.map(|listener| listener.incoming()) // This ensures that every incoming connection has the correct options set.
// In the future it will also ensure that the connection is upgraded with
// TLS when needed.
pub fn listen_and_fold<T, F, Fut>(self, executor: &Handle, initial: T, f: F)
-> Box<Future<Item = (), Error = io::Error> + 'static>
where
F: Fn(T, (Connection, SocketAddr)) -> Fut + 'static,
T: 'static,
Fut: IntoFuture<Item = T, Error = std::io::Error> + 'static {
let fut = TcpListener::from_listener(self.inner, &self.local_addr, &executor)
.expect("from_listener") // TODO: get rid of this `expect()`.
.incoming()
.fold(initial, move |b, (socket, remote_addr)| {
// TODO: On Linux and most other platforms it would be better
// to set the `TCP_NODELAY` option on the bound socket and
// then have the listening sockets inherit it. However, that
// doesn't work on all platforms and also the underlying
// libraries don't have the necessary API for that, so just
// do it here.
if let Err(e) = socket.set_nodelay(true) {
warn!(
"could not set TCP_NODELAY on {:?}/{:?}: {}",
socket.local_addr(),
socket.peer_addr(),
e
);
}
f(b, (Connection::Plain(socket), remote_addr))
});
Box::new(fut.map(|_| ()))
} }
} }
// ===== impl Connection ===== // ===== impl Connection =====
impl Connection { impl Connection {
/// Establish a connection backed by the provided `io`. pub fn original_dst_addr(&self) -> Option<SocketAddr> {
pub fn handshake(io: PlaintextSocket) -> Handshake { transport::get_original_dst(self.socket())
Handshake { }
plaintext_socket: Some(io),
pub fn local_addr(&self) -> Result<SocketAddr, std::io::Error> {
self.socket().local_addr()
}
// This must never be made public so that in the future `Connection` can
// control access to the plaintext socket for TLS, to ensure no private
// data is accidentally writen to the socket and to ensure no unprotected
// data is read from the socket. Each piece of information needed about the
// underlying socket should be exposed by its own minimal accessor function
// as is done above.
fn socket(&self) -> &PlaintextSocket {
match self {
&Connection::Plain(ref socket) => socket
} }
} }
} }
@ -103,15 +140,3 @@ impl AsyncWrite for Connection {
} }
} }
} }
// ===== impl Handshake =====
impl Future for Handshake {
type Item = Connection;
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, io::Error> {
let plaintext_socket = self.plaintext_socket.take().expect("poll after complete");
Ok(Connection::Plain(plaintext_socket).into())
}
}

View File

@ -188,7 +188,7 @@ impl Main {
Inbound::new(default_addr, bind), Inbound::new(default_addr, bind),
ctx, ctx,
sensors.clone(), sensors.clone(),
executor.clone(), &executor,
); );
::logging::context_future("inbound", fut) ::logging::context_future("inbound", fut)
}; };
@ -210,7 +210,7 @@ impl Main {
Outbound::new(bind, control), Outbound::new(bind, control),
ctx, ctx,
sensors, sensors,
executor, &executor,
); );
::logging::context_future("outbound", fut) ::logging::context_future("outbound", fut)
}; };
@ -273,7 +273,7 @@ fn serve<R, B, E, F>(
recognize: R, recognize: R,
proxy_ctx: Arc<ctx::Proxy>, proxy_ctx: Arc<ctx::Proxy>,
sensors: telemetry::Sensors, sensors: telemetry::Sensors,
executor: Handle, executor: &Handle,
) -> Box<Future<Item = (), Error = io::Error> + 'static> ) -> Box<Future<Item = (), Error = io::Error> + 'static>
where where
B: Body + Default + 'static, B: Body + Default + 'static,
@ -302,27 +302,17 @@ where
h2_builder, h2_builder,
::logging::context_executor(("serve", listen_addr), executor.clone()), ::logging::context_executor(("serve", listen_addr), executor.clone()),
); );
let incoming = bound_port.listen(&executor).expect("listen"); bound_port.listen_and_fold(
let f = incoming.fold( executor,
(server, proxy_ctx, sensors, executor), (server, proxy_ctx, sensors, executor.clone()),
move |(server, proxy_ctx, sensors, executor), (socket, remote_addr)| { move |(server, proxy_ctx, sensors, executor), (connection, remote_addr)| {
if let Err(e) = socket.set_nodelay(true) {
warn!(
"could not set TCP_NODELAY on {:?}/{:?}: {}",
socket.local_addr(),
socket.peer_addr(),
e
);
}
let opened_at = Instant::now(); let opened_at = Instant::now();
let orig_dst = transport::get_original_dst(&socket); let orig_dst = connection.original_dst_addr();
let local_addr = socket.local_addr().unwrap_or(listen_addr); let local_addr = connection.local_addr().unwrap_or(listen_addr);
let srv_ctx = let srv_ctx =
ctx::transport::Server::new(&proxy_ctx, &local_addr, &remote_addr, &orig_dst); ctx::transport::Server::new(&proxy_ctx, &local_addr, &remote_addr, &orig_dst);
connection::Connection::handshake(socket).map(move |session| { let io = sensors.accept(connection, opened_at, &srv_ctx);
let io = sensors.accept(session, opened_at, &srv_ctx);
// TODO session context // TODO session context
let set_ctx = move |request: &mut http::Request<()>| { let set_ctx = move |request: &mut http::Request<()>| {
@ -332,12 +322,9 @@ where
let s = server.serve_modified(io, set_ctx).map_err(|_| ()); let s = server.serve_modified(io, set_ctx).map_err(|_| ());
executor.spawn(::logging::context_future(("serve", local_addr), s)); executor.spawn(::logging::context_future(("serve", local_addr), s));
(server, proxy_ctx, sensors, executor) future::ok((server, proxy_ctx, sensors, executor))
})
}, },
); )
Box::new(f.map(|_| {}))
} }
fn serve_control<N, B>( fn serve_control<N, B>(
@ -351,28 +338,15 @@ where
N: NewService<Request = http::Request<RecvBody>, Response = http::Response<B>> + 'static, N: NewService<Request = http::Request<RecvBody>, Response = http::Response<B>> + 'static,
{ {
let server = Server::new(new_service, h2_builder, executor.clone()); let server = Server::new(new_service, h2_builder, executor.clone());
let incoming = bound_port.listen(executor).expect("listen"); bound_port.listen_and_fold(
let f = incoming.fold( executor,
(server, executor.clone()), (server, executor.clone()),
move |(server, executor), (socket, _)| { move |(server, executor), (session, _)| {
if let Err(e) = socket.set_nodelay(true) {
warn!(
"could not set TCP_NODELAY on {:?}/{:?}: {}",
socket.local_addr(),
socket.peer_addr(),
e
);
}
connection::Connection::handshake(socket).map(move |session| {
let s = server.serve(session).map_err(|_| ()); let s = server.serve(session).map_err(|_| ());
executor.spawn(::logging::context_future("serve_control", s)); executor.spawn(::logging::context_future("serve_control", s));
(server, executor) future::ok((server, executor))
})
}, },
); )
Box::new(f.map(|_| {}))
} }