diff --git a/proxy/src/connection.rs b/proxy/src/connection.rs index 9057ff0a7..106901731 100644 --- a/proxy/src/connection.rs +++ b/proxy/src/connection.rs @@ -101,13 +101,13 @@ impl BoundPort { // In the future it will also ensure that the connection is upgraded with // TLS when needed. pub fn listen_and_fold(self, initial: T, f: F) - -> Box + Send + 'static> + -> impl Future + Send + 'static where F: Fn(T, (Connection, SocketAddr)) -> Fut + Send + 'static, T: Send + 'static, Fut: IntoFuture + Send + 'static, ::Future: Send, { - let fut = future::lazy(move || { + future::lazy(move || { // Create the TCP listener lazily, so that it's not bound to a // reactor until the future is run. This will avoid // `Handle::current()` creating a mew thread for the global @@ -129,9 +129,7 @@ impl BoundPort { f(b, (Connection::plain(socket), remote_addr)) }) ) - .map(|_| ()); - - Box::new(fut) + .map(|_| ()) } } diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index bd849151f..a2ccf52b4 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -345,7 +345,7 @@ fn serve( sensors: telemetry::Sensors, get_orig_dst: G, drain_rx: drain::Watch, -) -> Box + Send + 'static> +) -> impl Future + Send + 'static where B: tower_h2::Body + Default + Send + 'static, ::Buf: Send, @@ -426,9 +426,9 @@ where // As soon as we get a shutdown signal, the listener // is canceled immediately. - Box::new(drain_rx.watch(accept_until, |accept| { + drain_rx.watch(accept_until, |accept| { accept.canceled = true; - })) + }) } /// Can cancel a future by setting a flag. @@ -459,7 +459,7 @@ where fn serve_control( bound_port: BoundPort, new_service: N, -) -> Box + 'static> +) -> impl Future + 'static where B: tower_h2::Body + Send + 'static, ::Buf: Send, diff --git a/proxy/src/telemetry/control.rs b/proxy/src/telemetry/control.rs index c817ddddb..f912de09f 100644 --- a/proxy/src/telemetry/control.rs +++ b/proxy/src/telemetry/control.rs @@ -111,7 +111,7 @@ impl Control { } pub fn serve_metrics(&self, bound_port: connection::BoundPort) - -> Box> + -> impl Future { use hyper; let service = self.metrics_service.clone(); diff --git a/proxy/src/transparency/server.rs b/proxy/src/transparency/server.rs index 5fd3e00cd..1b90878df 100644 --- a/proxy/src/transparency/server.rs +++ b/proxy/src/transparency/server.rs @@ -3,7 +3,7 @@ use std::net::SocketAddr; use std::sync::Arc; use std::time::{Duration, Instant}; -use futures::Future; +use futures::{future::Either, Future}; use http; use hyper; use indexmap::IndexSet; @@ -139,7 +139,7 @@ where ); DefaultExecutor::current() - .spawn(fut) + .spawn(Box::new(fut)) .expect("spawn TCP server task"); return; } @@ -152,13 +152,13 @@ where let drain_signal = self.drain_signal.clone(); let fut = io.peek() .map_err(|e| debug!("peek error: {}", e)) - .and_then(move |io| -> Box + Send> { + .and_then(move |io| { if let Some(proto) = Protocol::detect(io.peeked()) { - match proto { + Either::A(match proto { Protocol::Http1 => { trace!("transparency detected HTTP/1"); - Box::new(new_service.new_service() + let fut = new_service.new_service() .map_err(|_| ()) .and_then(move |s| { let svc = HyperServerSvc::new(s, srv_ctx); @@ -168,7 +168,8 @@ where }) .map(|_| ()) .map_err(|e| trace!("http1 server error: {:?}", e)) - })) + }); + Either::A(fut) }, Protocol::Http2 => { trace!("transparency detected HTTP/2"); @@ -183,17 +184,17 @@ where }) .map_err(|e| trace!("h2 server error: {:?}", e)); - Box::new(fut) + Either::B(fut) } - } + }) } else { trace!("transparency did not detect protocol, treating as TCP"); - tcp_serve( + Either::B(tcp_serve( &tcp, io, srv_ctx, drain_signal, - ) + )) } }); @@ -208,11 +209,11 @@ fn tcp_serve( io: T, srv_ctx: Arc, drain_signal: drain::Watch, -) -> Box + Send + 'static> { +) -> impl Future + Send + 'static { let fut = tcp.serve(io, srv_ctx); // There's nothing to do when drain is signaled, we just have to hope // the sockets finish soon. However, the drain signal still needs to // 'watch' the TCP future so that the process doesn't close early. - Box::new(drain_signal.watch(fut, |_| ())) + drain_signal.watch(fut, |_| ()) } diff --git a/proxy/src/transparency/tcp.rs b/proxy/src/transparency/tcp.rs index 333926225..2221ace7a 100644 --- a/proxy/src/transparency/tcp.rs +++ b/proxy/src/transparency/tcp.rs @@ -30,7 +30,7 @@ impl Proxy { /// Serve a TCP connection, trying to forward it to its destination. pub fn serve(&self, tcp_in: T, srv_ctx: Arc) - -> Box + Send> + -> impl Future + Send where T: AsyncRead + AsyncWrite + Send + 'static, { @@ -51,7 +51,7 @@ impl Proxy { "tcp accepted, no SO_ORIGINAL_DST to forward: remote={}", srv_ctx.remote, ); - return Box::new(future::ok(())); + return future::Either::B(future::ok(())); }; let client_ctx = ClientCtx::new( @@ -65,13 +65,12 @@ impl Proxy { ); let connect = self.sensors.connect(c, &client_ctx); - let fut = connect.connect() + future::Either::A(connect.connect() .map_err(move |e| error!("tcp connect error to {}: {:?}", orig_dst, e)) .and_then(move |tcp_out| { Duplex::new(tcp_in, tcp_out) .map_err(|e| error!("tcp duplex error: {}", e)) - }); - Box::new(fut) + })) } }