mirror of https://github.com/linkerd/linkerd2.git
proxy: add SIGTERM and SIGINT handlers (#581)
When the proxy is run in a Docker container, it runs as PID 1, with no default signal handlers setup. In order to react to signals from Kubernetes about shutting down, we need to set up explicit handlers. This adds handlers for SIGTERM and SIGINT. Closes #549
This commit is contained in:
parent
e7c4a9d4b9
commit
cd59465366
|
@ -124,6 +124,7 @@ dependencies = [
|
|||
"tokio-connect 0.1.0 (git+https://github.com/carllerche/tokio-connect)",
|
||||
"tokio-core 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"tokio-io 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"tokio-signal 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"tower 0.1.0 (git+https://github.com/tower-rs/tower)",
|
||||
"tower-balance 0.1.0 (git+https://github.com/tower-rs/tower)",
|
||||
"tower-buffer 0.1.0 (git+https://github.com/tower-rs/tower)",
|
||||
|
@ -399,6 +400,15 @@ dependencies = [
|
|||
"winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "mio-uds"
|
||||
version = "0.6.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"libc 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"mio 0.6.14 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "miow"
|
||||
version = "0.2.1"
|
||||
|
@ -722,6 +732,20 @@ dependencies = [
|
|||
"futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-signal"
|
||||
version = "0.1.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
dependencies = [
|
||||
"futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"libc 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"mio 0.6.14 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"mio-uds 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"tokio-core 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"tokio-io 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"winapi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tower"
|
||||
version = "0.1.0"
|
||||
|
@ -956,6 +980,7 @@ dependencies = [
|
|||
"checksum log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "89f010e843f2b1a31dbd316b3b8d443758bc634bed37aabade59c686d644e0a2"
|
||||
"checksum mime 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "e2e00e17be181010a91dbfefb01660b17311059dc8c7f48b9017677721e732bd"
|
||||
"checksum mio 0.6.14 (registry+https://github.com/rust-lang/crates.io-index)" = "6d771e3ef92d58a8da8df7d6976bfca9371ed1de6619d9d5a5ce5b1f29b85bfe"
|
||||
"checksum mio-uds 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)" = "1731a873077147b626d89cc6c2a0db6288d607496c5d10c0cfcf3adc697ec673"
|
||||
"checksum miow 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8c1f2f3b1cf331de6896aabf6e9d55dca90356cc9960cca7eaaf408a355ae919"
|
||||
"checksum multimap 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2eb04b9f127583ed176e163fb9ec6f3e793b87e21deedd5734a69386a18a0151"
|
||||
"checksum net2 0.2.31 (registry+https://github.com/rust-lang/crates.io-index)" = "3a80f842784ef6c9a958b68b7516bc7e35883c614004dd94959a4dca1b716c09"
|
||||
|
@ -994,6 +1019,7 @@ dependencies = [
|
|||
"checksum tokio-core 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)" = "52b4e32d8edbf29501aabb3570f027c6ceb00ccef6538f4bddba0200503e74e8"
|
||||
"checksum tokio-io 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "b9532748772222bf70297ec0e2ad0f17213b4a7dd0e6afb68e0a0768f69f4e4f"
|
||||
"checksum tokio-service 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "24da22d077e0f15f55162bdbdc661228c1581892f52074fb242678d015b45162"
|
||||
"checksum tokio-signal 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "e8f46863230f9a05cf52d173721ec391b9c5782a2465f593029922b8782b9ffe"
|
||||
"checksum tower 0.1.0 (git+https://github.com/tower-rs/tower)" = "<none>"
|
||||
"checksum tower-balance 0.1.0 (git+https://github.com/tower-rs/tower)" = "<none>"
|
||||
"checksum tower-buffer 0.1.0 (git+https://github.com/tower-rs/tower)" = "<none>"
|
||||
|
|
|
@ -30,6 +30,7 @@ rand = "0.4"
|
|||
|
||||
tokio-core = "0.1"
|
||||
tokio-io = "0.1"
|
||||
tokio-signal = "0.1"
|
||||
|
||||
prost = "0.3.0"
|
||||
prost-types = "0.3.0"
|
||||
|
|
|
@ -103,6 +103,8 @@ pub struct Main<G> {
|
|||
outbound_listener: BoundPort,
|
||||
|
||||
get_original_dst: G,
|
||||
|
||||
reactor: Core,
|
||||
}
|
||||
|
||||
impl<G> Main<G>
|
||||
|
@ -117,12 +119,16 @@ where
|
|||
.expect("public listener bind");
|
||||
let outbound_listener = BoundPort::new(config.private_listener.addr)
|
||||
.expect("private listener bind");
|
||||
|
||||
let reactor = Core::new().expect("reactor");
|
||||
|
||||
Main {
|
||||
config,
|
||||
control_listener,
|
||||
inbound_listener,
|
||||
outbound_listener,
|
||||
get_original_dst,
|
||||
reactor,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -139,8 +145,8 @@ where
|
|||
self.outbound_listener.local_addr()
|
||||
}
|
||||
|
||||
pub fn run(self) {
|
||||
self.run_until(::futures::future::empty());
|
||||
pub fn handle(&self) -> Handle {
|
||||
self.reactor.handle()
|
||||
}
|
||||
|
||||
pub fn run_until<F>(self, shutdown_signal: F)
|
||||
|
@ -155,6 +161,7 @@ where
|
|||
inbound_listener,
|
||||
outbound_listener,
|
||||
get_original_dst,
|
||||
reactor: mut core,
|
||||
} = self;
|
||||
|
||||
let control_host_and_port = config.control_host_and_port.clone();
|
||||
|
@ -175,7 +182,6 @@ where
|
|||
|
||||
let (control, control_bg) = control::new();
|
||||
|
||||
let mut core = Core::new().expect("executor");
|
||||
let executor = core.handle();
|
||||
|
||||
let dns_config = dns::Config::from_file(&config.resolv_conf_path);
|
||||
|
|
|
@ -1,8 +1,13 @@
|
|||
#![deny(warnings)]
|
||||
|
||||
extern crate conduit_proxy;
|
||||
|
||||
#[macro_use] extern crate log;
|
||||
|
||||
use std::process;
|
||||
|
||||
mod signal;
|
||||
|
||||
// Look in lib.rs.
|
||||
fn main() {
|
||||
// Load configuration.
|
||||
|
@ -13,5 +18,7 @@ fn main() {
|
|||
process::exit(64)
|
||||
}
|
||||
};
|
||||
conduit_proxy::Main::new(config, conduit_proxy::SoOriginalDst).run();
|
||||
let main = conduit_proxy::Main::new(config, conduit_proxy::SoOriginalDst);
|
||||
let shutdown_signal = signal::shutdown(&main.handle());
|
||||
main.run_until(shutdown_signal);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,97 @@
|
|||
//! Unix signal handling for the proxy binary.
|
||||
|
||||
extern crate futures;
|
||||
extern crate tokio_core;
|
||||
extern crate tokio_signal;
|
||||
|
||||
use self::futures::Future;
|
||||
use self::tokio_core::reactor::Handle;
|
||||
|
||||
type ShutdownSignal = Box<Future<Item=(), Error=()> + Send>;
|
||||
|
||||
/// Returns a `Future` that completes when the proxy should start to shutdown.
|
||||
pub fn shutdown(handle: &Handle) -> ShutdownSignal {
|
||||
imp::shutdown(handle)
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
mod imp {
|
||||
use std::fmt;
|
||||
|
||||
use super::futures::{future, Future, Stream};
|
||||
use super::tokio_signal::unix::{Signal, SIGINT, SIGTERM};
|
||||
use super::{Handle, ShutdownSignal};
|
||||
|
||||
pub(super) fn shutdown(handle: &Handle) -> ShutdownSignal {
|
||||
// SIGTERM - Kubernetes sends this to start a graceful shutdown.
|
||||
// SIGINT - To allow Ctrl-c to emulate SIGTERM while developing.
|
||||
//
|
||||
// If you add to this list, you should also update DisplaySignal below
|
||||
// to output nicer signal names.
|
||||
let signals = [SIGINT, SIGTERM]
|
||||
.into_iter()
|
||||
.map(|&sig| {
|
||||
// Create a Future that completes the first
|
||||
// time the process receives 'sig'.
|
||||
Signal::new(sig, handle)
|
||||
.flatten_stream()
|
||||
.into_future()
|
||||
.map(move |_| {
|
||||
info!(
|
||||
// use target to remove 'imp' from output
|
||||
target: "conduit_proxy::signal",
|
||||
"received {}, starting shutdown",
|
||||
DisplaySignal(sig),
|
||||
);
|
||||
})
|
||||
});
|
||||
|
||||
// With a list of Futures that could notify us,
|
||||
// we just want to know when the first one triggers,
|
||||
// so select over them all.
|
||||
let on_any_signal = future::select_all(signals)
|
||||
.map(|_| ())
|
||||
.map_err(|_| unreachable!("Signal never returns an error"));
|
||||
|
||||
Box::new(on_any_signal)
|
||||
}
|
||||
|
||||
struct DisplaySignal(i32);
|
||||
|
||||
impl fmt::Display for DisplaySignal {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
let s = match self.0 {
|
||||
SIGINT => "SIGINT",
|
||||
SIGTERM => "SIGTERM",
|
||||
other => return write!(f, "signal {}", other),
|
||||
};
|
||||
f.write_str(s)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(unix))]
|
||||
mod imp {
|
||||
use super::{tokio_signal, Handle, ShutdownSignal};
|
||||
use super::futures::{Future, Stream};
|
||||
|
||||
pub(super) fn shutdown(handle: &Handle) -> ShutdownSignal {
|
||||
// On Windows, we don't have all the signals, but Windows also
|
||||
// isn't our expected deployment target. This implementation allows
|
||||
// developers on Windows to simulate proxy graceful shutdown
|
||||
// by pressing Ctrl-C.
|
||||
let on_ctrl_c = tokio_signal::ctrl_c(handle)
|
||||
.flatten_stream()
|
||||
.into_future()
|
||||
.map(|_| {
|
||||
info!(
|
||||
// use target to remove 'imp' from output
|
||||
target: "conduit_proxy::signal",
|
||||
"received Ctrl-C, starting shutdown",
|
||||
);
|
||||
})
|
||||
.map_err(|_| unreachable!("ctrl_c never returns errors"));
|
||||
|
||||
Box::new(on_ctrl_c)
|
||||
}
|
||||
}
|
|
@ -130,6 +130,15 @@ fn run(proxy: Proxy, mut env: config::TestEnv) -> Listening {
|
|||
config.metrics_flush_interval = dur;
|
||||
}
|
||||
|
||||
|
||||
let (running_tx, running_rx) = oneshot::channel();
|
||||
let (tx, mut rx) = shutdown_signal();
|
||||
|
||||
::std::thread::Builder::new()
|
||||
.name("support proxy".into())
|
||||
.spawn(move || {
|
||||
let _c = controller;
|
||||
|
||||
let mock_orig_dst = MockOriginalDst(Arc::new(Mutex::new(mock_orig_dst)));
|
||||
|
||||
let main = conduit_proxy::Main::new(config, mock_orig_dst.clone());
|
||||
|
@ -144,21 +153,24 @@ fn run(proxy: Proxy, mut env: config::TestEnv) -> Listening {
|
|||
inner.outbound_local_addr = Some(outbound_addr);
|
||||
}
|
||||
|
||||
let (running_tx, running_rx) = shutdown_signal();
|
||||
let (tx, rx) = shutdown_signal();
|
||||
// slip the running tx into the shutdown future, since the first time
|
||||
// the shutdown future is polled, that means all of the proxy is now
|
||||
// running.
|
||||
let addrs = (control_addr, inbound_addr, outbound_addr);
|
||||
let mut running = Some((running_tx, addrs));
|
||||
let on_shutdown = future::poll_fn(move || {
|
||||
if let Some((tx, addrs)) = running.take() {
|
||||
let _ = tx.send(addrs);
|
||||
}
|
||||
|
||||
::std::thread::Builder::new()
|
||||
.name("support proxy".into())
|
||||
.spawn(move || {
|
||||
let _c = controller;
|
||||
rx.poll()
|
||||
});
|
||||
|
||||
let _ = running_tx.send(());
|
||||
main.run_until(rx);
|
||||
main.run_until(on_shutdown);
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
running_rx.wait().unwrap();
|
||||
::std::thread::sleep(::std::time::Duration::from_millis(100));
|
||||
let (control_addr, inbound_addr, outbound_addr) = running_rx.wait().unwrap();
|
||||
|
||||
Listening {
|
||||
control: control_addr,
|
||||
|
|
Loading…
Reference in New Issue