diff --git a/Cargo.lock b/Cargo.lock index d51926afd..ca2cf15fb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -124,6 +124,7 @@ dependencies = [ "tokio-connect 0.1.0 (git+https://github.com/carllerche/tokio-connect)", "tokio-core 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-signal 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "tower 0.1.0 (git+https://github.com/tower-rs/tower)", "tower-balance 0.1.0 (git+https://github.com/tower-rs/tower)", "tower-buffer 0.1.0 (git+https://github.com/tower-rs/tower)", @@ -399,6 +400,15 @@ dependencies = [ "winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "mio-uds" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "libc 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.6.14 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "miow" version = "0.2.1" @@ -722,6 +732,20 @@ dependencies = [ "futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "tokio-signal" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.6.14 (registry+https://github.com/rust-lang/crates.io-index)", + "mio-uds 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-core 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-io 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "tower" version = "0.1.0" @@ -956,6 +980,7 @@ dependencies = [ "checksum log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "89f010e843f2b1a31dbd316b3b8d443758bc634bed37aabade59c686d644e0a2" "checksum mime 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "e2e00e17be181010a91dbfefb01660b17311059dc8c7f48b9017677721e732bd" "checksum mio 0.6.14 (registry+https://github.com/rust-lang/crates.io-index)" = "6d771e3ef92d58a8da8df7d6976bfca9371ed1de6619d9d5a5ce5b1f29b85bfe" +"checksum mio-uds 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)" = "1731a873077147b626d89cc6c2a0db6288d607496c5d10c0cfcf3adc697ec673" "checksum miow 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8c1f2f3b1cf331de6896aabf6e9d55dca90356cc9960cca7eaaf408a355ae919" "checksum multimap 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2eb04b9f127583ed176e163fb9ec6f3e793b87e21deedd5734a69386a18a0151" "checksum net2 0.2.31 (registry+https://github.com/rust-lang/crates.io-index)" = "3a80f842784ef6c9a958b68b7516bc7e35883c614004dd94959a4dca1b716c09" @@ -994,6 +1019,7 @@ dependencies = [ "checksum tokio-core 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)" = "52b4e32d8edbf29501aabb3570f027c6ceb00ccef6538f4bddba0200503e74e8" "checksum tokio-io 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "b9532748772222bf70297ec0e2ad0f17213b4a7dd0e6afb68e0a0768f69f4e4f" "checksum tokio-service 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "24da22d077e0f15f55162bdbdc661228c1581892f52074fb242678d015b45162" +"checksum tokio-signal 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "e8f46863230f9a05cf52d173721ec391b9c5782a2465f593029922b8782b9ffe" "checksum tower 0.1.0 (git+https://github.com/tower-rs/tower)" = "" "checksum tower-balance 0.1.0 (git+https://github.com/tower-rs/tower)" = "" "checksum tower-buffer 0.1.0 (git+https://github.com/tower-rs/tower)" = "" diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index f462e2aab..3ee199113 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -30,6 +30,7 @@ rand = "0.4" tokio-core = "0.1" tokio-io = "0.1" +tokio-signal = "0.1" prost = "0.3.0" prost-types = "0.3.0" diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index 3fe35cea3..812453965 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -103,6 +103,8 @@ pub struct Main { outbound_listener: BoundPort, get_original_dst: G, + + reactor: Core, } impl Main @@ -117,12 +119,16 @@ where .expect("public listener bind"); let outbound_listener = BoundPort::new(config.private_listener.addr) .expect("private listener bind"); + + let reactor = Core::new().expect("reactor"); + Main { config, control_listener, inbound_listener, outbound_listener, get_original_dst, + reactor, } } @@ -139,8 +145,8 @@ where self.outbound_listener.local_addr() } - pub fn run(self) { - self.run_until(::futures::future::empty()); + pub fn handle(&self) -> Handle { + self.reactor.handle() } pub fn run_until(self, shutdown_signal: F) @@ -155,6 +161,7 @@ where inbound_listener, outbound_listener, get_original_dst, + reactor: mut core, } = self; let control_host_and_port = config.control_host_and_port.clone(); @@ -175,7 +182,6 @@ where let (control, control_bg) = control::new(); - let mut core = Core::new().expect("executor"); let executor = core.handle(); let dns_config = dns::Config::from_file(&config.resolv_conf_path); diff --git a/proxy/src/main.rs b/proxy/src/main.rs index 0c07eb88c..8a3cff498 100644 --- a/proxy/src/main.rs +++ b/proxy/src/main.rs @@ -1,8 +1,13 @@ #![deny(warnings)] + extern crate conduit_proxy; +#[macro_use] extern crate log; + use std::process; +mod signal; + // Look in lib.rs. fn main() { // Load configuration. @@ -13,5 +18,7 @@ fn main() { process::exit(64) } }; - conduit_proxy::Main::new(config, conduit_proxy::SoOriginalDst).run(); + let main = conduit_proxy::Main::new(config, conduit_proxy::SoOriginalDst); + let shutdown_signal = signal::shutdown(&main.handle()); + main.run_until(shutdown_signal); } diff --git a/proxy/src/signal.rs b/proxy/src/signal.rs new file mode 100644 index 000000000..9b32e1091 --- /dev/null +++ b/proxy/src/signal.rs @@ -0,0 +1,97 @@ +//! Unix signal handling for the proxy binary. + +extern crate futures; +extern crate tokio_core; +extern crate tokio_signal; + +use self::futures::Future; +use self::tokio_core::reactor::Handle; + +type ShutdownSignal = Box + Send>; + +/// Returns a `Future` that completes when the proxy should start to shutdown. +pub fn shutdown(handle: &Handle) -> ShutdownSignal { + imp::shutdown(handle) +} + +#[cfg(unix)] +mod imp { + use std::fmt; + + use super::futures::{future, Future, Stream}; + use super::tokio_signal::unix::{Signal, SIGINT, SIGTERM}; + use super::{Handle, ShutdownSignal}; + + pub(super) fn shutdown(handle: &Handle) -> ShutdownSignal { + // SIGTERM - Kubernetes sends this to start a graceful shutdown. + // SIGINT - To allow Ctrl-c to emulate SIGTERM while developing. + // + // If you add to this list, you should also update DisplaySignal below + // to output nicer signal names. + let signals = [SIGINT, SIGTERM] + .into_iter() + .map(|&sig| { + // Create a Future that completes the first + // time the process receives 'sig'. + Signal::new(sig, handle) + .flatten_stream() + .into_future() + .map(move |_| { + info!( + // use target to remove 'imp' from output + target: "conduit_proxy::signal", + "received {}, starting shutdown", + DisplaySignal(sig), + ); + }) + }); + + // With a list of Futures that could notify us, + // we just want to know when the first one triggers, + // so select over them all. + let on_any_signal = future::select_all(signals) + .map(|_| ()) + .map_err(|_| unreachable!("Signal never returns an error")); + + Box::new(on_any_signal) + } + + struct DisplaySignal(i32); + + impl fmt::Display for DisplaySignal { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let s = match self.0 { + SIGINT => "SIGINT", + SIGTERM => "SIGTERM", + other => return write!(f, "signal {}", other), + }; + f.write_str(s) + } + } +} + +#[cfg(not(unix))] +mod imp { + use super::{tokio_signal, Handle, ShutdownSignal}; + use super::futures::{Future, Stream}; + + pub(super) fn shutdown(handle: &Handle) -> ShutdownSignal { + // On Windows, we don't have all the signals, but Windows also + // isn't our expected deployment target. This implementation allows + // developers on Windows to simulate proxy graceful shutdown + // by pressing Ctrl-C. + let on_ctrl_c = tokio_signal::ctrl_c(handle) + .flatten_stream() + .into_future() + .map(|_| { + info!( + // use target to remove 'imp' from output + target: "conduit_proxy::signal", + "received Ctrl-C, starting shutdown", + ); + }) + .map_err(|_| unreachable!("ctrl_c never returns errors")); + + Box::new(on_ctrl_c) + } +} diff --git a/proxy/tests/support/proxy.rs b/proxy/tests/support/proxy.rs index dc0302fea..71ab97bec 100644 --- a/proxy/tests/support/proxy.rs +++ b/proxy/tests/support/proxy.rs @@ -130,35 +130,47 @@ fn run(proxy: Proxy, mut env: config::TestEnv) -> Listening { config.metrics_flush_interval = dur; } - let mock_orig_dst = MockOriginalDst(Arc::new(Mutex::new(mock_orig_dst))); - let main = conduit_proxy::Main::new(config, mock_orig_dst.clone()); - - let control_addr = main.control_addr(); - let inbound_addr = main.inbound_addr(); - let outbound_addr = main.outbound_addr(); - - { - let mut inner = mock_orig_dst.0.lock().unwrap(); - inner.inbound_local_addr = Some(inbound_addr); - inner.outbound_local_addr = Some(outbound_addr); - } - - let (running_tx, running_rx) = shutdown_signal(); - let (tx, rx) = shutdown_signal(); + let (running_tx, running_rx) = oneshot::channel(); + let (tx, mut rx) = shutdown_signal(); ::std::thread::Builder::new() .name("support proxy".into()) .spawn(move || { let _c = controller; - let _ = running_tx.send(()); - main.run_until(rx); + let mock_orig_dst = MockOriginalDst(Arc::new(Mutex::new(mock_orig_dst))); + + let main = conduit_proxy::Main::new(config, mock_orig_dst.clone()); + + let control_addr = main.control_addr(); + let inbound_addr = main.inbound_addr(); + let outbound_addr = main.outbound_addr(); + + { + let mut inner = mock_orig_dst.0.lock().unwrap(); + inner.inbound_local_addr = Some(inbound_addr); + inner.outbound_local_addr = Some(outbound_addr); + } + + // slip the running tx into the shutdown future, since the first time + // the shutdown future is polled, that means all of the proxy is now + // running. + let addrs = (control_addr, inbound_addr, outbound_addr); + let mut running = Some((running_tx, addrs)); + let on_shutdown = future::poll_fn(move || { + if let Some((tx, addrs)) = running.take() { + let _ = tx.send(addrs); + } + + rx.poll() + }); + + main.run_until(on_shutdown); }) .unwrap(); - running_rx.wait().unwrap(); - ::std::thread::sleep(::std::time::Duration::from_millis(100)); + let (control_addr, inbound_addr, outbound_addr) = running_rx.wait().unwrap(); Listening { control: control_addr,