diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index 4a0676b7e..23a29e2ea 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -53,7 +53,10 @@ use std::thread; use std::time::Duration; use indexmap::IndexSet; -use tokio::{executor, runtime::current_thread}; +use tokio::{ + executor::{self, DefaultExecutor, Executor}, + runtime::current_thread, +}; use tower_service::NewService; use tower_fn::*; use conduit_proxy_router::{Recognize, Router, Error as RouteError}; @@ -242,7 +245,8 @@ where config.inbound_router_capacity, config.inbound_router_max_idle_age, ); - let fut = serve( + serve( + "inbound", inbound_listener, router, config.private_connect_timeout, @@ -251,8 +255,7 @@ where sensors.clone(), get_original_dst.clone(), drain_rx.clone(), - ); - ::logging::context_future("inbound", fut) + ) }; // Setup the private listener. This will listen on a locally accessible @@ -266,7 +269,8 @@ where config.outbound_router_capacity, config.outbound_router_max_idle_age, ); - let fut = serve( + serve( + "outbound", outbound_listener, router, config.public_connect_timeout, @@ -275,8 +279,7 @@ where sensors, get_original_dst, drain_rx, - ); - ::logging::context_future("outbound", fut) + ) }; trace!("running"); @@ -333,6 +336,7 @@ where } fn serve( + name: &'static str, bound_port: BoundPort, router: Router, tcp_connect_timeout: Duration, @@ -410,10 +414,15 @@ where let accept = bound_port.listen_and_fold( (), move |(), (connection, remote_addr)| { - server.serve(connection, remote_addr); - Ok(()) + let s = server.serve(connection, remote_addr); + let s = ::logging::context_future((name, remote_addr), s); + let r = DefaultExecutor::current() + .spawn(Box::new(s)) + .map_err(task::Error::into_io); + future::result(r) }, ); + let accept = ::logging::context_future(name, accept); let accept_until = Cancelable { future: accept, diff --git a/proxy/src/transparency/server.rs b/proxy/src/transparency/server.rs index 1b90878df..2af45196b 100644 --- a/proxy/src/transparency/server.rs +++ b/proxy/src/transparency/server.rs @@ -7,7 +7,6 @@ use futures::{future::Either, Future}; use http; use hyper; use indexmap::IndexSet; -use tokio::executor::{Executor, DefaultExecutor}; use tokio::io::{AsyncRead, AsyncWrite}; use tower_service::NewService; use tower_h2; @@ -104,7 +103,9 @@ where /// what protocol the connection is speaking. From there, the connection /// will be mapped into respective services, and spawned into an /// executor. - pub fn serve(&self, connection: Connection, remote_addr: SocketAddr) { + pub fn serve(&self, connection: Connection, remote_addr: SocketAddr) + -> impl Future + { let opened_at = Instant::now(); // create Server context @@ -138,10 +139,7 @@ where self.drain_signal.clone(), ); - DefaultExecutor::current() - .spawn(Box::new(fut)) - .expect("spawn TCP server task"); - return; + return Either::B(fut); } // try to sniff protocol @@ -150,7 +148,7 @@ where let tcp = self.tcp.clone(); let new_service = self.new_service.clone(); let drain_signal = self.drain_signal.clone(); - let fut = io.peek() + Either::A(io.peek() .map_err(|e| debug!("peek error: {}", e)) .and_then(move |io| { if let Some(proto) = Protocol::detect(io.peeked()) { @@ -196,11 +194,7 @@ where drain_signal, )) } - }); - - DefaultExecutor::current() - .spawn(Box::new(fut)) - .expect("spawn transparent server task") + })) } }