From 02c6887020010ead730d537bea765ef9774a0f2a Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Tue, 10 Apr 2018 14:15:37 -0700 Subject: [PATCH] proxy: improve graceful shutdown process (#684) - The listener is immediately closed on receipt of a shutdown signal. - All in-progress server connections are now counted, and the process will not shutdown until the connection count has dropped to zero. - In the case of HTTP1, idle connections are closed. In the case of HTTP2, the HTTP2 graceful shutdown steps are followed of sending various GOAWAYs. --- Cargo.lock | 14 +- Cargo.toml | 1 + proxy/Cargo.toml | 2 +- proxy/src/drain.rs | 267 ++++++++++++++++++++++++++++++ proxy/src/lib.rs | 52 +++++- proxy/src/transparency/server.rs | 35 +++- proxy/tests/discovery.rs | 12 +- proxy/tests/shutdown.rs | 154 +++++++++++++++++ proxy/tests/support/client.rs | 144 ++++++++++++---- proxy/tests/support/controller.rs | 1 - proxy/tests/support/mod.rs | 42 +++-- proxy/tests/support/proxy.rs | 21 ++- proxy/tests/support/server.rs | 26 ++- proxy/tests/transparency.rs | 8 +- 14 files changed, 708 insertions(+), 71 deletions(-) create mode 100644 proxy/src/drain.rs create mode 100644 proxy/tests/shutdown.rs diff --git a/Cargo.lock b/Cargo.lock index d377ecb1d..1d2e87cb2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -108,7 +108,7 @@ dependencies = [ "env_logger 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)", "futures-mpsc-lossy 0.3.0", - "h2 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", + "h2 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "http 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "httparse 1.2.4 (registry+https://github.com/rust-lang/crates.io-index)", "hyper 0.11.22 (registry+https://github.com/rust-lang/crates.io-index)", @@ -143,7 +143,7 @@ dependencies = [ "bytes 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "convert 0.3.0", "futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)", - "h2 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", + "h2 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "http 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "prost 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "prost-derive 0.3.2 (git+https://github.com/danburkert/prost)", @@ -249,7 +249,7 @@ dependencies = [ [[package]] name = "h2" -version = "0.1.3" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "byteorder 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -786,7 +786,7 @@ source = "git+https://github.com/tower-rs/tower-grpc#57d976aca89c13838b946dca9b6 dependencies = [ "bytes 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)", - "h2 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", + "h2 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "http 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", "prost 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -808,11 +808,11 @@ dependencies = [ [[package]] name = "tower-h2" version = "0.1.0" -source = "git+https://github.com/tower-rs/tower-h2#59f344f02d37a9e8596805f159bdca3af13ee7b0" +source = 
"git+https://github.com/tower-rs/tower-h2#26453f5f0afe2928b47af38713f880ecf27fa85f" dependencies = [ "bytes 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)", - "h2 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", + "h2 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "http 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-connect 0.1.0 (git+https://github.com/carllerche/tokio-connect)", @@ -957,7 +957,7 @@ dependencies = [ "checksum fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" "checksum futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)" = "0bab5b5e94f5c31fc764ba5dd9ad16568aae5d4825538c01d6bca680c9bf94a7" "checksum futures-cpupool 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "ab90cde24b3319636588d0c35fe03b1333857621051837ed769faefb4c2162e4" -"checksum h2 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "771f68c296fba5783c4eddd3aab1e7d4523ac88ecc5549ffb19782b6b261981b" +"checksum h2 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "065fb096fc65bbfb9c765d48c9f3f1a21cdb25ba0d3f82105b38f30ddffa2f7e" "checksum heck 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ea04fa3ead4e05e51a7c806fc07271fdbde4e246a6c6d1efd52e72230b771b82" "checksum http 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "75df369fd52c60635208a4d3e694777c099569b3dcf4844df8f652dc004644ab" "checksum httparse 1.2.4 (registry+https://github.com/rust-lang/crates.io-index)" = "c2f407128745b78abc95c0ffbe4e5d37427fdc0d45470710cfef8c44522a2e37" diff --git a/Cargo.toml b/Cargo.toml index cb9ddc34d..55af0a1ae 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,3 +9,4 @@ members = [ [patch.crates-io] prost-derive = { git = "https://github.com/danburkert/prost" } + diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index 3ee199113..10ac58ef1 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -19,7 +19,7 @@ bytes = "0.4" domain = "0.2.3" env_logger = { version = "0.5", default-features = false } futures = "0.1" -h2 = "0.1" +h2 = "0.1.5" http = "0.1" httparse = "1.2" hyper = { version = "0.11.22", default-features = false, features = ["compat"] } diff --git a/proxy/src/drain.rs b/proxy/src/drain.rs new file mode 100644 index 000000000..5ff536b48 --- /dev/null +++ b/proxy/src/drain.rs @@ -0,0 +1,267 @@ +use std::mem; + +use futures::{Async, Future, Poll, Stream}; +use futures::future::Shared; +use futures::sync::{mpsc, oneshot}; + +/// Creates a drain channel. +/// +/// The `Signal` is used to start a drain, and the `Watch` will be notified +/// when a drain is signaled. +pub fn channel() -> (Signal, Watch) { + let (tx, rx) = oneshot::channel(); + let (drained_tx, drained_rx) = mpsc::channel(0); + ( + Signal { + drained_rx, + tx, + }, + Watch { + drained_tx, + rx: rx.shared(), + }, + ) +} + +/// Send a drain command to all watchers. +/// +/// When a drain is started, this returns a `Drained` future which resolves +/// when all `Watch`ers have been dropped. +#[derive(Debug)] +pub struct Signal { + drained_rx: mpsc::Receiver, + tx: oneshot::Sender<()>, +} + +/// Watch for a drain command. +/// +/// This wraps another future and callback to be called when drain is triggered. 
+#[derive(Clone, Debug)] +pub struct Watch { + drained_tx: mpsc::Sender, + rx: Shared>, +} + +/// The wrapped watching `Future`. +#[derive(Debug)] +pub struct Watching { + future: A, + state: State, + watch: Watch, +} + +#[derive(Debug)] +enum State { + Watch(F), + Draining, +} + +//TODO: in Rust 1.26, replace this with `!`. +#[derive(Debug)] +enum Never {} + +/// A future that resolves when all `Watch`ers have been dropped (drained). +pub struct Drained { + drained_rx: mpsc::Receiver, +} + +// ===== impl Signal ===== + +impl Signal { + /// Start the draining process. + /// + /// A signal is sent to all futures watching for the signal. A new future + /// is returned from this method that resolves when all watchers have + /// completed. + pub fn drain(self) -> Drained { + let _ = self.tx.send(()); + Drained { + drained_rx: self.drained_rx, + } + } +} + +// ===== impl Watch ===== + +impl Watch { + /// Wrap a future and a callback that is triggered when drain is received. + /// + /// The callback receives a mutable reference to the original future, and + /// should be used to trigger any shutdown process for it. + pub fn watch(self, future: A, on_drain: F) -> Watching + where + A: Future, + F: FnOnce(&mut A), + { + Watching { + future, + state: State::Watch(on_drain), + watch: self, + } + } +} + +// ===== impl Watching ===== + +impl Future for Watching +where + A: Future, + F: FnOnce(&mut A), +{ + type Item = A::Item; + type Error = A::Error; + + fn poll(&mut self) -> Poll { + loop { + match mem::replace(&mut self.state, State::Draining) { + State::Watch(on_drain) => { + match self.watch.rx.poll() { + Ok(Async::Ready(_)) | Err(_) => { + // Drain has been triggered! + on_drain(&mut self.future); + }, + Ok(Async::NotReady) => { + self.state = State::Watch(on_drain); + return self.future.poll(); + } + } + }, + State::Draining => { + return self.future.poll(); + }, + } + } + } +} + +// ===== impl Drained ===== + +impl Future for Drained { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll { + match try_ready!(self.drained_rx.poll()) { + Some(never) => match never {}, + None => Ok(Async::Ready(())), + } + } +} + +#[cfg(test)] +mod tests { + use futures::{future, Async, Future, Poll}; + use super::*; + + struct TestMe { + draining: bool, + finished: bool, + poll_cnt: usize, + } + + impl Future for TestMe { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll { + self.poll_cnt += 1; + if self.finished { + Ok(Async::Ready(())) + } else { + Ok(Async::NotReady) + } + } + } + + #[test] + fn watch() { + future::lazy(|| { + let (tx, rx) = channel(); + let fut = TestMe { + draining: false, + finished: false, + poll_cnt: 0, + }; + + let mut watch = rx.watch(fut, |fut| { + fut.draining = true; + }); + + assert_eq!(watch.future.poll_cnt, 0); + + // First poll should poll the inner future + assert!(watch.poll().unwrap().is_not_ready()); + assert_eq!(watch.future.poll_cnt, 1); + + // Second poll should poll the inner future again + assert!(watch.poll().unwrap().is_not_ready()); + assert_eq!(watch.future.poll_cnt, 2); + + let mut draining = tx.drain(); + // Drain signaled, but needs another poll to be noticed. + assert!(!watch.future.draining); + assert_eq!(watch.future.poll_cnt, 2); + + // Now, poll after drain has been signaled. 
+ assert!(watch.poll().unwrap().is_not_ready()); + assert_eq!(watch.future.poll_cnt, 3); + assert!(watch.future.draining); + + // Draining is not ready until watcher completes + assert!(draining.poll().unwrap().is_not_ready()); + + // Finishing up the watch future + watch.future.finished = true; + assert!(watch.poll().unwrap().is_ready()); + assert_eq!(watch.future.poll_cnt, 4); + drop(watch); + + assert!(draining.poll().unwrap().is_ready()); + + Ok::<_, ()>(()) + }).wait().unwrap(); + } + + #[test] + fn watch_clones() { + future::lazy(|| { + let (tx, rx) = channel(); + + let fut1 = TestMe { + draining: false, + finished: false, + poll_cnt: 0, + }; + let fut2 = TestMe { + draining: false, + finished: false, + poll_cnt: 0, + }; + + let watch1 = rx.clone().watch(fut1, |fut| { + fut.draining = true; + }); + let watch2 = rx.watch(fut2, |fut| { + fut.draining = true; + }); + + let mut draining = tx.drain(); + + // Still 2 outstanding watchers + assert!(draining.poll().unwrap().is_not_ready()); + + // drop 1 for whatever reason + drop(watch1); + + // Still not ready, 1 other watcher still pending + assert!(draining.poll().unwrap().is_not_ready()); + + drop(watch2); + + // Now all watchers are gone, draining is complete + assert!(draining.poll().unwrap().is_ready()); + + Ok::<_, ()>(()) + }).wait().unwrap(); + } +} diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index ede1adf46..0a13f6b26 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -65,6 +65,7 @@ mod connection; pub mod control; mod ctx; mod dns; +mod drain; mod inbound; mod logging; mod map_err; @@ -207,6 +208,7 @@ where let (control, control_bg) = control::new(dns_config.clone(), config.pod_namespace.clone()); let executor = core.handle(); + let (drain_tx, drain_rx) = drain::channel(); let bind = Bind::new(executor.clone()).with_sensors(sensors.clone()); @@ -228,6 +230,7 @@ where ctx, sensors.clone(), get_original_dst.clone(), + drain_rx.clone(), &executor, ); ::logging::context_future("inbound", fut) @@ -248,6 +251,7 @@ where ctx, sensors, get_original_dst, + drain_rx, &executor, ); ::logging::context_future("outbound", fut) @@ -308,7 +312,12 @@ where .map_err(|err| error!("main error: {:?}", err)); core.handle().spawn(fut); + let shutdown_signal = shutdown_signal.and_then(move |()| { + debug!("shutdown signaled"); + drain_tx.drain() + }); core.run(shutdown_signal).expect("executor"); + debug!("shutdown complete"); } } @@ -320,6 +329,7 @@ fn serve( proxy_ctx: Arc, sensors: telemetry::Sensors, get_orig_dst: G, + drain_rx: drain::Watch, executor: &Handle, ) -> Box + 'static> where @@ -368,17 +378,55 @@ where stack, tcp_connect_timeout, disable_protocol_detection_ports, + drain_rx.clone(), executor.clone(), ); - bound_port.listen_and_fold( + + let accept = bound_port.listen_and_fold( executor, (), move |(), (connection, remote_addr)| { server.serve(connection, remote_addr); Ok(()) }, - ) + ); + + let accept_until = Cancelable { + future: accept, + canceled: false, + }; + + // As soon as we get a shutdown signal, the listener + // is canceled immediately. + Box::new(drain_rx.watch(accept_until, |accept| { + accept.canceled = true; + })) +} + +/// Can cancel a future by setting a flag. +/// +/// Used to 'watch' the accept futures, and close the listeners +/// as soon as the shutdown signal starts. 
+struct Cancelable { + future: F, + canceled: bool, +} + +impl Future for Cancelable +where + F: Future, +{ + type Item = (); + type Error = F::Error; + + fn poll(&mut self) -> Poll { + if self.canceled { + Ok(().into()) + } else { + self.future.poll() + } + } } fn serve_control( diff --git a/proxy/src/transparency/server.rs b/proxy/src/transparency/server.rs index b3e3edd77..f7c3ce503 100644 --- a/proxy/src/transparency/server.rs +++ b/proxy/src/transparency/server.rs @@ -15,6 +15,7 @@ use conduit_proxy_controller_grpc::common; use connection::Connection; use ctx::Proxy as ProxyCtx; use ctx::transport::{Server as ServerCtx}; +use drain; use telemetry::Sensors; use transport::GetOriginalDst; use super::glue::{HttpBody, HttpBodyNewSvc, HyperServerSvc}; @@ -26,12 +27,14 @@ use super::tcp; /// This type can `serve` new connections, determine what protocol /// the connection is speaking, and route it to the corresponding /// service. -pub struct Server +pub struct Server where S: NewService>, S::Future: 'static, + B: tower_h2::Body, { disable_protocol_detection_ports: IndexSet, + drain_signal: drain::Watch, executor: Handle, get_orig_dst: G, h1: hyper::server::Http, @@ -51,6 +54,7 @@ where > + Clone + 'static, S::Future: 'static, S::Error: fmt::Debug, + S::InitError: fmt::Debug, B: tower_h2::Body + 'static, G: GetOriginalDst, { @@ -63,12 +67,14 @@ where stack: S, tcp_connect_timeout: Duration, disable_protocol_detection_ports: IndexSet, + drain_signal: drain::Watch, executor: Handle, ) -> Self { let recv_body_svc = HttpBodyNewSvc::new(stack.clone()); let tcp = tcp::Proxy::new(tcp_connect_timeout, sensors.clone(), &executor); Server { disable_protocol_detection_ports, + drain_signal, executor: executor.clone(), get_orig_dst, h1: hyper::server::Http::new(), @@ -108,6 +114,7 @@ where let fut = tcp_serve( &self.tcp, connection, + self.drain_signal.clone(), &self.sensors, opened_at, &self.proxy_ctx, @@ -127,6 +134,7 @@ where let h2 = self.h2.clone(); let tcp = self.tcp.clone(); let new_service = self.new_service.clone(); + let drain_signal = self.drain_signal.clone(); let fut = connection .peek_future(sniff) .map_err(|_| ()) @@ -151,9 +159,12 @@ where .map_err(|_| ()) .and_then(move |s| { let svc = HyperServerSvc::new(s, srv_ctx); - h1.serve_connection(io, svc) + drain_signal + .watch(h1.serve_connection(io, svc), |conn| { + conn.disable_keep_alive(); + }) .map(|_| ()) - .map_err(|_| ()) + .map_err(|e| trace!("http1 server error: {:?}", e)) })) }, Protocol::Http2 => { @@ -162,7 +173,14 @@ where let set_ctx = move |request: &mut http::Request<()>| { request.extensions_mut().insert(srv_ctx.clone()); }; - Box::new(h2.serve_modified(io, set_ctx).map_err(|_| ())) + + let fut = drain_signal + .watch(h2.serve_modified(io, set_ctx), |conn| { + conn.graceful_shutdown(); + }) + .map_err(|e| trace!("h2 server error: {:?}", e)); + + Box::new(fut) } } } else { @@ -170,6 +188,7 @@ where tcp_serve( &tcp, connection, + drain_signal, &sensors, opened_at, &proxy_ctx, @@ -195,6 +214,7 @@ struct OrigDst<'a>(&'a Option); fn tcp_serve( tcp: &tcp::Proxy, connection: Connection, + drain_signal: drain::Watch, sensors: &Sensors, opened_at: Instant, proxy_ctx: &Arc, @@ -213,5 +233,10 @@ fn tcp_serve( // record telemetry let tcp_in = sensors.accept(connection, opened_at, &srv_ctx); - tcp.serve(tcp_in, srv_ctx) + let fut = tcp.serve(tcp_in, srv_ctx); + + // There's nothing to do when drain is signaled, we just have to hope + // the sockets finish soon. 
However, the drain signal still needs to + // 'watch' the TCP future so that the process doesn't close early. + Box::new(drain_signal.watch(fut, |_| ())) } diff --git a/proxy/tests/discovery.rs b/proxy/tests/discovery.rs index a970affea..98a89b084 100644 --- a/proxy/tests/discovery.rs +++ b/proxy/tests/discovery.rs @@ -201,7 +201,17 @@ fn outbound_updates_newer_services() { // the HTTP2 service starts watching first, receiving an addr // from the controller let client1 = client::http2(proxy.outbound, "disco.test.svc.cluster.local"); - client1.get("/h2"); // 500, ignore + + // This HTTP2 client tries to talk to our HTTP1 server, and the server + // will return an error (see above TODO). + // + // The reason to do this request at all is because the proxy will create + // an H2 service mapping when it sees an H2 request, and we want to test + // that when it sees H1 and tries to create a new mapping, the existing + // known Discovery information is shared with it. + let res = client1.request(&mut client1.request_builder("/h1")); + assert_eq!(res.status(), 500); + // a new HTTP1 service needs to be build now, while the HTTP2 // service already exists, so make sure previously sent addrs diff --git a/proxy/tests/shutdown.rs b/proxy/tests/shutdown.rs new file mode 100644 index 000000000..2582ee4d7 --- /dev/null +++ b/proxy/tests/shutdown.rs @@ -0,0 +1,154 @@ +mod support; +use self::support::*; + +#[test] +fn h2_goaways_connections() { + let _ = env_logger::try_init(); + + let (shdn, rx) = shutdown_signal(); + + let srv = server::http2().route("/", "hello").run(); + let ctrl = controller::new().run(); + let proxy = proxy::new() + .controller(ctrl) + .inbound(srv) + .shutdown_signal(rx) + .run(); + let client = client::http2(proxy.inbound, "shutdown.test.svc.cluster.local"); + + assert_eq!(client.get("/"), "hello"); + + shdn.signal(); + + client.wait_for_closed(); +} + +#[test] +fn h2_exercise_goaways_connections() { + let _ = env_logger::try_init(); + + const RESPONSE_SIZE: usize = 1024 * 16; + const NUM_REQUESTS: usize = 50; + + let (shdn, rx) = shutdown_signal(); + + let body = Bytes::from(vec![b'1'; RESPONSE_SIZE]); + let srv = server::http2() + .route_fn("/", move |_req| { + Response::builder() + .body(body.clone()) + .unwrap() + }) + .run(); + let ctrl = controller::new().run(); + let proxy = proxy::new() + .controller(ctrl) + .inbound(srv) + .shutdown_signal(rx) + .run(); + let client = client::http2(proxy.inbound, "shutdown.test.svc.cluster.local"); + + let reqs = (0..NUM_REQUESTS) + .into_iter() + .map(|_| { + client.request_async( + client.request_builder("/").method("GET") + ) + }) + .collect::>(); + + // Wait to get all responses (but not bodies) + let resps = future::join_all(reqs) + .wait() + .expect("reqs"); + + // Trigger a shutdown while bodies are still in progress. + shdn.signal(); + + let bodies = resps + .into_iter() + .map(|resp| { + resp + .into_body() + .concat2() + // Make sure the bodies weren't cut off + .map(|buf| assert_eq!(buf.len(), RESPONSE_SIZE)) + }) + .collect::>(); + + // See that the proxy gives us all the bodies. 
+ future::join_all(bodies) + .wait() + .expect("bodies"); + + client.wait_for_closed(); +} + +#[test] +fn http1_closes_idle_connections() { + use std::cell::RefCell; + let _ = env_logger::try_init(); + + let (shdn, rx) = shutdown_signal(); + + const RESPONSE_SIZE: usize = 1024 * 16; + let body = Bytes::from(vec![b'1'; RESPONSE_SIZE]); + + let shdn = RefCell::new(Some(shdn)); + let srv = server::http1() + .route_fn("/", move |_req| { + // Trigger a shutdown signal while the request is made + // but a response isn't returned yet. + shdn.borrow_mut() + .take() + .expect("only 1 request") + .signal(); + Response::builder() + .body(body.clone()) + .unwrap() + }) + .run(); + let ctrl = controller::new().run(); + let proxy = proxy::new() + .controller(ctrl) + .inbound(srv) + .shutdown_signal(rx) + .run(); + let client = client::http1(proxy.inbound, "shutdown.test.svc.cluster.local"); + + let res_body = client.get("/"); + assert_eq!(res_body.len(), RESPONSE_SIZE); + + client.wait_for_closed(); +} + +#[test] +fn tcp_waits_for_proxies_to_close() { + let _ = env_logger::try_init(); + + let (shdn, rx) = shutdown_signal(); + let msg1 = "custom tcp hello"; + let msg2 = "custom tcp bye"; + + let srv = server::tcp() + .accept(move |read| { + assert_eq!(read, msg1.as_bytes()); + msg2 + }) + .run(); + let ctrl = controller::new().run(); + let proxy = proxy::new() + .controller(ctrl) + .inbound(srv) + .shutdown_signal(rx) + .run(); + + let client = client::tcp(proxy.inbound); + + let tcp_client = client.connect(); + + shdn.signal(); + + tcp_client.write(msg1); + assert_eq!(tcp_client.read(), msg2.as_bytes()); +} diff --git a/proxy/tests/support/client.rs b/proxy/tests/support/client.rs index 0c467cd84..e5423a1c3 100644 --- a/proxy/tests/support/client.rs +++ b/proxy/tests/support/client.rs @@ -1,7 +1,11 @@ use support::*; +use std::cell::RefCell; +use std::io; + use self::futures::sync::{mpsc, oneshot}; use self::tokio_core::net::TcpStream; +use self::tokio_io::{AsyncRead, AsyncWrite}; type Request = http::Request<()>; type Response = http::Response; @@ -18,7 +22,7 @@ pub fn http1>(addr: SocketAddr, auth: T) -> Client { }) } -// This sends `GET http://foo.com/ HTTP/1.1` instead of just `GET / HTTP/1.1`. +/// This sends `GET http://foo.com/ HTTP/1.1` instead of just `GET / HTTP/1.1`. pub fn http1_absolute_uris>(addr: SocketAddr, auth: T) -> Client { Client::new(addr, auth.into(), Run::Http1 { absolute_uris: true, @@ -35,6 +39,9 @@ pub fn tcp(addr: SocketAddr) -> tcp::TcpClient { pub struct Client { authority: String, + /// This is a future that completes when the associated connection for + /// this Client has been dropped. + running: Running, tx: Sender, version: http::Version, } @@ -45,9 +52,11 @@ impl Client { Run::Http1 { .. 
} => http::Version::HTTP_11, Run::Http2 => http::Version::HTTP_2, }; + let (tx, running) = run(addr, r); Client { authority, - tx: run(addr, r), + running, + tx, version: v, } } @@ -55,20 +64,30 @@ impl Client { pub fn get(&self, path: &str) -> String { let mut req = self.request_builder(path); let res = self.request(req.method("GET")); + assert_eq!( + res.status(), + StatusCode::OK, + "client.get({:?}) expects 200 OK, got \"{}\"", + path, + res.status(), + ); let stream = res.into_parts().1; stream.concat2() .map(|body| ::std::str::from_utf8(&body).unwrap().to_string()) .wait() - .unwrap() + .expect("get() wait body") + } + + pub fn request_async(&self, builder: &mut http::request::Builder) -> Box + Send> { + let (tx, rx) = oneshot::channel(); + let _ = self.tx.unbounded_send((builder.body(()).unwrap(), tx)); + Box::new(rx.then(|oneshot_result| oneshot_result.expect("request canceled"))) } pub fn request(&self, builder: &mut http::request::Builder) -> Response { - let (tx, rx) = oneshot::channel(); - let _ = self.tx.unbounded_send((builder.body(()).unwrap(), tx)); - rx.map_err(|_| panic!("client request dropped")) + self.request_async(builder) .wait() - .map(|result| result.unwrap()) - .unwrap() + .expect("response") } pub fn request_builder(&self, path: &str) -> http::request::Builder { @@ -77,6 +96,12 @@ impl Client { .version(self.version); b } + + pub fn wait_for_closed(self) { + self.running + .wait() + .expect("wait_for_closed"); + } } enum Run { @@ -86,14 +111,19 @@ enum Run { Http2, } -fn run(addr: SocketAddr, version: Run) -> Sender { - let (tx, rx) = mpsc::unbounded::<(Request, oneshot::Sender>)>(); +fn run(addr: SocketAddr, version: Run) -> (Sender, Running) { + let (tx, mut rx) = mpsc::unbounded::<(Request, oneshot::Sender>)>(); + let (running_tx, running_rx) = running(); ::std::thread::Builder::new().name("support client".into()).spawn(move || { - let mut core = Core::new().unwrap(); + let mut core = Core::new().expect("client core new"); let reactor = core.handle(); - let conn = Conn(addr, reactor.clone()); + let conn = Conn { + addr, + handle: reactor.clone(), + running: RefCell::new(Some(running_tx)), + }; let work: Box> = match version { Run::Http1 { absolute_uris } => { @@ -127,13 +157,13 @@ fn run(addr: SocketAddr, version: Run) -> Sender { .map_err(|e| println!("client error: {:?}", e))) }, Run::Http2 => { - let h2 = tower_h2::client::Connect::::new( + let connect = tower_h2::client::Connect::::new( conn, Default::default(), reactor.clone(), ); - Box::new(h2.new_service() + Box::new(connect.new_service() .map_err(move |err| println!("connect error ({:?}): {:?}", addr, err)) .and_then(move |mut h2| { rx.for_each(move |(req, cb)| { @@ -157,34 +187,86 @@ fn run(addr: SocketAddr, version: Run) -> Sender { } }; - core.run(work).unwrap(); - }).unwrap(); - tx + core.run(work).expect("support client core run"); + }).expect("support client thread spawn"); + (tx, running_rx) } -struct Conn(SocketAddr, Handle); +/// The "connector". Clones `running` into new connections, so we can signal +/// when all connections are finally closed. +struct Conn { + addr: SocketAddr, + handle: Handle, + /// When this Sender drops, that should mean the connection is closed. 
+ running: RefCell>>, +} -impl Connect for Conn { - type Connected = TcpStream; - type Error = ::std::io::Error; - type Future = Box>; - - fn connect(&self) -> Self::Future { - let c = TcpStream::connect(&self.0, &self.1) - .and_then(|tcp| tcp.set_nodelay(true).map(move |_| tcp)); +impl Conn { + fn connect_(&self) -> Box> { + let running = self.running + .borrow_mut() + .take() + .expect("connected more than once"); + let c = TcpStream::connect(&self.addr, &self.handle) + .and_then(|tcp| tcp.set_nodelay(true).map(move |_| tcp)) + .map(move |tcp| RunningIo { + inner: tcp, + running: running, + }); Box::new(c) } } +impl Connect for Conn { + type Connected = RunningIo; + type Error = ::std::io::Error; + type Future = Box>; + + fn connect(&self) -> Self::Future { + self.connect_() + } +} impl hyper::client::Service for Conn { type Request = hyper::Uri; - type Response = TcpStream; - type Future = Box>; + type Response = RunningIo; + type Future = Box>; type Error = ::std::io::Error; fn call(&self, _: hyper::Uri) -> ::Future { - let c = TcpStream::connect(&self.0, &self.1) - .and_then(|tcp| tcp.set_nodelay(true).map(move |_| tcp)); - Box::new(c) + self.connect_() } } + +/// A wrapper around a TcpStream, allowing us to signal when the connection +/// is dropped. +struct RunningIo { + inner: TcpStream, + /// When this drops, the related Receiver is notified that the connection + /// is closed. + running: oneshot::Sender<()>, +} + +impl io::Read for RunningIo { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + self.inner.read(buf) + } +} + +impl io::Write for RunningIo { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.inner.write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + self.inner.flush() + } +} + +impl AsyncRead for RunningIo {} + +impl AsyncWrite for RunningIo { + fn shutdown(&mut self) -> Poll<(), io::Error> { + AsyncWrite::shutdown(&mut self.inner) + } +} + diff --git a/proxy/tests/support/controller.rs b/proxy/tests/support/controller.rs index 896a8c235..896e328bd 100644 --- a/proxy/tests/support/controller.rs +++ b/proxy/tests/support/controller.rs @@ -24,7 +24,6 @@ pub struct Controller { reports: Option>, } -#[derive(Debug)] pub struct Listening { pub addr: SocketAddr, shutdown: Shutdown, diff --git a/proxy/tests/support/mod.rs b/proxy/tests/support/mod.rs index 169494173..4cfa9ebb1 100644 --- a/proxy/tests/support/mod.rs +++ b/proxy/tests/support/mod.rs @@ -14,9 +14,16 @@ extern crate tokio_core; pub extern crate tokio_io; extern crate tower; extern crate tower_h2; +extern crate log; pub extern crate env_logger; -use self::bytes::{BigEndian, Bytes, BytesMut}; +use std::net::SocketAddr; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; +pub use std::time::Duration; + +use self::bytes::{BigEndian, BytesMut}; +pub use self::bytes::Bytes; pub use self::conduit_proxy::*; pub use self::futures::*; use self::futures::sync::oneshot; @@ -27,8 +34,6 @@ use self::tokio_core::net::{TcpListener, TcpStream}; use self::tokio_core::reactor::{Core, Handle}; use self::tower::{NewService, Service}; use self::tower_h2::{Body, RecvBody}; -use std::net::SocketAddr; -pub use std::time::Duration; /// Environment variable for overriding the test patience. 
pub const ENV_TEST_PATIENCE_MS: &'static str = "RUST_TEST_PATIENCE_MS"; @@ -103,18 +108,31 @@ pub mod proxy; pub mod server; mod tcp; -pub type Shutdown = oneshot::Sender<()>; -pub type ShutdownRx = future::Then< - oneshot::Receiver<()>, - Result<(), ()>, - fn(Result<(), oneshot::Canceled>) -> Result<(), ()>, ->; - -pub fn shutdown_signal() -> (oneshot::Sender<()>, ShutdownRx) { +pub fn shutdown_signal() -> (Shutdown, ShutdownRx) { let (tx, rx) = oneshot::channel(); - (tx, rx.then(|_| { Ok(()) } as _)) + (Shutdown { tx }, Box::new(rx.then(|_| Ok(())))) } +pub struct Shutdown { + tx: oneshot::Sender<()>, +} + +impl Shutdown { + pub fn signal(self) { + // a drop is enough + } +} + +pub type ShutdownRx = Box + Send>; + +/// A channel used to signal when a Client's related connection is running or closed. +pub fn running() -> (oneshot::Sender<()>, Running) { + let (tx, rx) = oneshot::channel(); + let rx = Box::new(rx.then(|_| Ok::<(), ()>(()))); + (tx, rx) +} + +pub type Running = Box + Send>; struct RecvBodyStream(tower_h2::RecvBody); diff --git a/proxy/tests/support/proxy.rs b/proxy/tests/support/proxy.rs index 960b3d237..28ce62518 100644 --- a/proxy/tests/support/proxy.rs +++ b/proxy/tests/support/proxy.rs @@ -8,7 +8,6 @@ pub fn new() -> Proxy { Proxy::new() } -#[derive(Debug)] pub struct Proxy { controller: Option, inbound: Option, @@ -17,9 +16,10 @@ pub struct Proxy { metrics_flush_interval: Option, inbound_disable_ports_protocol_detection: Option>, outbound_disable_ports_protocol_detection: Option>, + + shutdown_signal: Option + Send>>, } -#[derive(Debug)] pub struct Listening { pub control: SocketAddr, pub inbound: SocketAddr, @@ -42,6 +42,7 @@ impl Proxy { metrics_flush_interval: None, inbound_disable_ports_protocol_detection: None, outbound_disable_ports_protocol_detection: None, + shutdown_signal: None, } } @@ -75,6 +76,18 @@ impl Proxy { self } + pub fn shutdown_signal(mut self, sig: F) -> Self + where + F: Future + Send + 'static, + { + // It doesn't matter what kind of future you give us, + // we'll just wrap it up in a box and trigger when + // it triggers. The results are discarded. + let fut = Box::new(sig.then(|_| Ok(()))); + self.shutdown_signal = Some(fut); + self + } + pub fn run(self) -> Listening { self.run_with_test_env(config::TestEnv::new()) } @@ -170,6 +183,10 @@ fn run(proxy: Proxy, mut env: config::TestEnv) -> Listening { let (running_tx, running_rx) = oneshot::channel(); let (tx, mut rx) = shutdown_signal(); + if let Some(fut) = proxy.shutdown_signal { + rx = Box::new(rx.select(fut).then(|_| Ok(()))); + } + ::std::thread::Builder::new() .name("support proxy".into()) .spawn(move || { diff --git a/proxy/tests/support/server.rs b/proxy/tests/support/server.rs index 6845517fb..a1b9f224b 100644 --- a/proxy/tests/support/server.rs +++ b/proxy/tests/support/server.rs @@ -21,13 +21,11 @@ pub fn tcp() -> tcp::TcpServer { tcp::server() } -#[derive(Debug)] pub struct Server { routes: HashMap, version: Run, } -#[derive(Debug)] pub struct Listening { pub addr: SocketAddr, pub(super) shutdown: Shutdown, @@ -55,14 +53,28 @@ impl Server { Server::new(Run::Http2) } + /// Return a string body as a 200 OK response, with the string as + /// the response body. pub fn route(mut self, path: &str, resp: &str) -> Self { self.routes.insert(path.into(), Route::string(resp)); self } + /// Return a 200 OK response with no body when the path matches. 
+ pub fn route_empty_ok(self, path: &str) -> Self { + self.route_fn(path, |_| { + Response::builder() + .header("content-length", "0") + .body(Default::default()) + .unwrap() + }) + } + + /// Call a closure when the request matches, returning a response + /// to send back. pub fn route_fn(mut self, path: &str, cb: F) -> Self where - F: Fn(Request<()>) -> Response + Send + 'static, + F: Fn(Request<()>) -> Response + Send + 'static, { self.routes.insert(path.into(), Route(Box::new(cb))); self @@ -74,7 +86,7 @@ impl Server { resp: &str, latency: Duration ) -> Self { - let resp = resp.to_owned(); + let resp = Bytes::from(resp); let route = Route(Box::new(move |_| { thread::sleep(latency); http::Response::builder() @@ -202,11 +214,11 @@ impl Body for RspBody { } } -struct Route(Box) -> Response + Send>); +struct Route(Box) -> Response + Send>); impl Route { fn string(body: &str) -> Route { - let body = body.to_owned(); + let body = Bytes::from(body); Route(Box::new(move |_| { http::Response::builder() .status(200) @@ -239,7 +251,7 @@ impl Service for Svc { let rsp = match self.0.get(req.uri().path()) { Some(route) => { (route.0)(req.map(|_| ())) - .map(|s| RspBody::new(s.as_bytes().into())) + .map(|s| RspBody::new(s)) } None => { println!("server 404: {:?}", req.uri().path()); diff --git a/proxy/tests/transparency.rs b/proxy/tests/transparency.rs index e7d7e9f0b..30af33966 100644 --- a/proxy/tests/transparency.rs +++ b/proxy/tests/transparency.rs @@ -655,7 +655,9 @@ fn http1_response_end_of_file() { fn http1_one_connection_per_host() { let _ = env_logger::try_init(); - let srv = server::http1().route("/", "hello").run(); + let srv = server::http1() + .route_empty_ok("/") + .run(); let ctrl = controller::new() .run(); let proxy = proxy::new().controller(ctrl).inbound(srv).run(); @@ -714,7 +716,9 @@ fn http1_one_connection_per_host() { fn http1_requests_without_host_have_unique_connections() { let _ = env_logger::try_init(); - let srv = server::http1().route("/", "hello").run(); + let srv = server::http1() + .route_empty_ok("/") + .run(); let ctrl = controller::new() .run(); let proxy = proxy::new().controller(ctrl).inbound(srv).run();
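
For reference, the intended drain flow in one place: the sketch below condenses the wiring above (drain::channel() in proxy/src/lib.rs, Watch::watch() around each server connection in proxy/src/transparency/server.rs, and Signal::drain() on shutdown) into a single test-style example. It is written as if it were one more case in the tests module of proxy/src/drain.rs, relying on that module's existing `use futures::{future, Async, Future, Poll};` and `use super::*;` imports; `Conn` and `drain_flow` are hypothetical stand-ins for illustration only and are not part of this patch.

    // Hypothetical extra case for the `tests` module in proxy/src/drain.rs.
    // `Conn` stands in for a per-connection server future; the real callers
    // wrap e.g. `h1.serve_connection(..)` or `h2.serve_modified(..)` instead.
    struct Conn { draining: bool }

    impl Future for Conn {
        type Item = ();
        type Error = ();

        fn poll(&mut self) -> Poll<(), ()> {
            // Pretend the connection only finishes once it has been told to drain.
            if self.draining { Ok(Async::Ready(())) } else { Ok(Async::NotReady) }
        }
    }

    #[test]
    fn drain_flow() {
        future::lazy(|| {
            // One Signal for the process, one cloneable Watch per server task.
            let (signal, watch) = channel();

            // The closure runs exactly once, when drain is signaled; it should
            // start the connection's graceful shutdown (GOAWAY for HTTP/2,
            // disabling keep-alive for HTTP/1).
            let mut conn = watch.watch(Conn { draining: false }, |c| c.draining = true);
            assert!(conn.poll().unwrap().is_not_ready());

            // Shutdown: signal every watcher, then wait for them all to drop.
            let mut drained = signal.drain();
            assert!(conn.poll().unwrap().is_ready()); // callback fired, Conn finished
            drop(conn);
            assert!(drained.poll().unwrap().is_ready());

            Ok::<_, ()>(())
        }).wait().unwrap();
    }

Because `Drained` only resolves once every `Watch` clone has been dropped, the connection counting described in the commit message falls out of the channel itself: each in-progress connection holds a `Watch` (an mpsc sender clone), and shutdown completes only when that sender count reaches zero.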