mirror of https://github.com/linkerd/linkerd2.git
proxy: Reload TLS config on changes (#1056)
This PR modifies the proxy's TLS code so that the TLS config files are reloaded when any of them has changed (including if they did not previously exist). If reloading the configs returns an error, we log an error and continue using the old config. Currently, this is implemented by polling the file system for the time they were last modified at a fixed interval. However, I've implemented this so that the changes are passed around as a `Stream`, and that reloading and updating the config is in a separate function the one that detects changes. Therefore, it should be fairly easy to plug in support for `inotify` (and other FS watch APIs) later, as long as we can use them to generate a `Stream` of changes. Closes #369 Signed-off-by: Eliza Weisman <eliza@buoyant.io>
This commit is contained in:
parent
ec2433e9bd
commit
7220fb5367
|
@ -131,6 +131,7 @@ dependencies = [
|
|||
"flate2 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"futures-mpsc-lossy 0.3.0",
|
||||
"futures-watch 0.1.0 (git+https://github.com/carllerche/better-future)",
|
||||
"h2 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"http 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"httparse 1.2.4 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
|
@ -350,6 +351,15 @@ dependencies = [
|
|||
"futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "futures-watch"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/carllerche/better-future#07baa13e91fefe7a51533dfde7b4e69e109ebe14"
|
||||
dependencies = [
|
||||
"fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "gzip-header"
|
||||
version = "0.1.2"
|
||||
|
@ -1477,6 +1487,7 @@ dependencies = [
|
|||
"checksum fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7"
|
||||
"checksum futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)" = "1a70b146671de62ec8c8ed572219ca5d594d9b06c0b364d5e67b722fc559b48c"
|
||||
"checksum futures-cpupool 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "ab90cde24b3319636588d0c35fe03b1333857621051837ed769faefb4c2162e4"
|
||||
"checksum futures-watch 0.1.0 (git+https://github.com/carllerche/better-future)" = "<none>"
|
||||
"checksum gzip-header 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "0a9fcfe1c9ee125342355b2467bc29b9dfcb2124fcae27edb9cee6f4cc5ecd40"
|
||||
"checksum h2 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)" = "8111e316d0589775ee2bd671cdfdf3f63c9d97e21d8d16a88bb73dcf99bef7f5"
|
||||
"checksum heck 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ea04fa3ead4e05e51a7c806fc07271fdbde4e246a6c6d1efd52e72230b771b82"
|
||||
|
|
|
@ -19,6 +19,7 @@ bytes = "0.4"
|
|||
deflate = {version = "0.7.18", features = ["gzip"] }
|
||||
env_logger = { version = "0.5", default-features = false }
|
||||
futures = "0.1"
|
||||
futures-watch = { git = "https://github.com/carllerche/better-future" }
|
||||
h2 = "0.1.7"
|
||||
http = "0.1"
|
||||
httparse = "1.2"
|
||||
|
|
|
@ -97,7 +97,7 @@ impl BoundPort {
|
|||
// TLS when needed.
|
||||
pub fn listen_and_fold<T, F, Fut>(
|
||||
self,
|
||||
tls_config: Option<tls::ServerConfig>,
|
||||
tls_config: tls::ServerConfigWatch,
|
||||
initial: T,
|
||||
f: F)
|
||||
-> impl Future<Item = (), Error = io::Error> + Send + 'static
|
||||
|
@ -105,7 +105,8 @@ impl BoundPort {
|
|||
F: Fn(T, (Connection, SocketAddr)) -> Fut + Send + 'static,
|
||||
T: Send + 'static,
|
||||
Fut: IntoFuture<Item = T, Error = std::io::Error> + Send + 'static,
|
||||
<Fut as IntoFuture>::Future: Send, {
|
||||
<Fut as IntoFuture>::Future: Send,
|
||||
{
|
||||
future::lazy(move || {
|
||||
// Create the TCP listener lazily, so that it's not bound to a
|
||||
// reactor until the future is run. This will avoid
|
||||
|
@ -125,10 +126,10 @@ impl BoundPort {
|
|||
// libraries don't have the necessary API for that, so just
|
||||
// do it here.
|
||||
set_nodelay_or_warn(&socket);
|
||||
match tls_config.clone() {
|
||||
match tls_config.borrow().as_ref() {
|
||||
Some(tls_config) => {
|
||||
Either::A(
|
||||
tls::Connection::accept(socket, tls_config)
|
||||
tls::Connection::accept(socket, tls_config.clone())
|
||||
.map(move |tls| (Connection::new(Box::new(tls)), remote_addr)))
|
||||
},
|
||||
None => Either::B(future::ok((Connection::new(Box::new(socket)), remote_addr))),
|
||||
|
|
|
@ -10,6 +10,7 @@ extern crate deflate;
|
|||
#[macro_use]
|
||||
extern crate futures;
|
||||
extern crate futures_mpsc_lossy;
|
||||
extern crate futures_watch;
|
||||
extern crate h2;
|
||||
extern crate http;
|
||||
extern crate httparse;
|
||||
|
@ -229,20 +230,15 @@ where
|
|||
|
||||
let bind = Bind::new().with_sensors(sensors.clone());
|
||||
|
||||
// TODO: Load the TLS configuration asynchronously and watch for
|
||||
// changes to the files.
|
||||
let tls_config = config.tls_settings.and_then(|settings| {
|
||||
match tls::CommonConfig::load_from_disk(&settings) {
|
||||
Ok(config) => Some(config),
|
||||
Err(e) => {
|
||||
// Keep going without TLS if loading settings failed.
|
||||
error!("Error loading TLS configuration: {:?}", e);
|
||||
None
|
||||
}
|
||||
}
|
||||
});
|
||||
let tls_config_changes = config.tls_settings
|
||||
// TODO: determine the correct interval for this.
|
||||
.map(|settings| settings.stream_changes(Duration::from_secs(1)));
|
||||
|
||||
let (tls_server_config, tls_cfg_bg) =
|
||||
tls_config_changes
|
||||
.map(tls::ServerConfig::watch)
|
||||
.unwrap_or_else(tls::ServerConfig::no_tls);
|
||||
|
||||
let tls_server_config = tls_config.as_ref().map(tls::ServerConfig::from);
|
||||
|
||||
// Setup the public listener. This will listen on a publicly accessible
|
||||
// address and listen for inbound connections that should be forwarded
|
||||
|
@ -283,9 +279,11 @@ where
|
|||
config.outbound_router_capacity,
|
||||
config.outbound_router_max_idle_age,
|
||||
);
|
||||
// No TLS yet.
|
||||
let (no_tls, _) = tls::ServerConfig::no_tls();
|
||||
serve(
|
||||
outbound_listener,
|
||||
None, // No TLS
|
||||
no_tls,
|
||||
router,
|
||||
config.public_connect_timeout,
|
||||
config.outbound_ports_disable_protocol_detection,
|
||||
|
@ -318,7 +316,10 @@ where
|
|||
tap.map_err(|_| {}),
|
||||
metrics_server.map_err(|_| {}),
|
||||
::logging::admin().bg("dns-resolver").future(dns_bg),
|
||||
).map(|_| {});
|
||||
)
|
||||
// There's no `Future::join6` combinator...
|
||||
.join(::logging::admin().bg("tls-config").future(tls_cfg_bg))
|
||||
.map(|_| {});
|
||||
|
||||
rt.spawn(Box::new(fut));
|
||||
|
||||
|
@ -349,7 +350,7 @@ where
|
|||
|
||||
fn serve<R, B, E, F, G>(
|
||||
bound_port: BoundPort,
|
||||
tls_config: Option<tls::ServerConfig>,
|
||||
tls_config: tls::ServerConfigWatch,
|
||||
router: Router<R>,
|
||||
tcp_connect_timeout: Duration,
|
||||
disable_protocol_detection_ports: IndexSet<u16>,
|
||||
|
@ -504,11 +505,12 @@ where
|
|||
h2_builder,
|
||||
log.clone().executor(),
|
||||
);
|
||||
let (no_tls, _) = tls::ServerConfig::no_tls();
|
||||
|
||||
let fut = {
|
||||
let log = log.clone();
|
||||
bound_port.listen_and_fold(
|
||||
None, // No TLS
|
||||
no_tls,
|
||||
server,
|
||||
move |server, (session, remote)| {
|
||||
let log = log.clone().with_remote(remote);
|
||||
|
|
|
@ -98,12 +98,13 @@ impl Control {
|
|||
use hyper;
|
||||
|
||||
let log = ::logging::admin().server("metrics", bound_port.local_addr());
|
||||
let (no_tls, _) = ::tls::ServerConfig::no_tls();
|
||||
|
||||
let service = self.metrics_service.clone();
|
||||
let fut = {
|
||||
let log = log.clone();
|
||||
bound_port.listen_and_fold(
|
||||
None, // TODO: No TLS
|
||||
no_tls, // TODO: Serve over TLS.
|
||||
hyper::server::conn::Http::new(),
|
||||
move |hyper, (conn, remote)| {
|
||||
let service = service.clone();
|
||||
|
|
|
@ -1,8 +1,9 @@
|
|||
use std::{
|
||||
fs::File,
|
||||
fs::{self, File},
|
||||
io::{self, Cursor, Read},
|
||||
path::PathBuf,
|
||||
sync::Arc,
|
||||
time::{Duration, Instant, SystemTime,},
|
||||
};
|
||||
|
||||
use super::{
|
||||
|
@ -13,6 +14,12 @@ use super::{
|
|||
webpki,
|
||||
};
|
||||
|
||||
use futures::{future, Future, Sink, Stream};
|
||||
use futures_watch::Watch;
|
||||
use tokio::timer::Interval;
|
||||
|
||||
pub type ServerConfigWatch = Watch<Option<ServerConfig>>;
|
||||
|
||||
/// Not-yet-validated settings that are used for both TLS clients and TLS
|
||||
/// servers.
|
||||
///
|
||||
|
@ -42,6 +49,7 @@ pub struct CommonConfig {
|
|||
cert_resolver: Arc<CertResolver>,
|
||||
}
|
||||
|
||||
|
||||
/// Validated configuration for TLS servers.
|
||||
#[derive(Clone)]
|
||||
pub struct ServerConfig(pub(super) Arc<rustls::ServerConfig>);
|
||||
|
@ -57,6 +65,61 @@ pub enum Error {
|
|||
TimeConversionFailed,
|
||||
}
|
||||
|
||||
impl CommonSettings {
|
||||
|
||||
fn change_timestamps(&self, interval: Duration) -> impl Stream<Item = (), Error = ()> {
|
||||
let paths = [
|
||||
self.trust_anchors.clone(),
|
||||
self.end_entity_cert.clone(),
|
||||
self.private_key.clone(),
|
||||
];
|
||||
let mut max: Option<SystemTime> = None;
|
||||
Interval::new(Instant::now(), interval)
|
||||
.map_err(|e| error!("timer error: {:?}", e))
|
||||
.filter_map(move |_| {
|
||||
for path in &paths {
|
||||
let t = fs::metadata(path)
|
||||
.and_then(|meta| meta.modified())
|
||||
.map_err(|e| if e.kind() != io::ErrorKind::NotFound {
|
||||
// Don't log if the files don't exist, since this
|
||||
// makes the logs *quite* noisy.
|
||||
warn!("metadata for {:?}: {}", path, e)
|
||||
})
|
||||
.ok();
|
||||
if t > max {
|
||||
max = t;
|
||||
trace!("{:?} changed at {:?}", path, t);
|
||||
return Some(());
|
||||
}
|
||||
}
|
||||
None
|
||||
})
|
||||
}
|
||||
|
||||
/// Stream changes to the files described by this `CommonSettings`.
|
||||
///
|
||||
/// This will poll the filesystem for changes to the files at the paths
|
||||
/// described by this `CommonSettings` every `interval`, and attempt to
|
||||
/// load a new `CommonConfig` from the files again after each change.
|
||||
///
|
||||
/// The returned stream consists of each subsequent successfully loaded
|
||||
/// `CommonSettings` after each change. If the settings could not be
|
||||
/// reloaded (i.e., they were malformed), nothing is sent.
|
||||
///
|
||||
/// TODO: On Linux, this should be replaced with an `inotify` watch when
|
||||
/// available.
|
||||
pub fn stream_changes(self, interval: Duration)
|
||||
-> impl Stream<Item = CommonConfig, Error = ()>
|
||||
{
|
||||
self.change_timestamps(interval)
|
||||
.filter_map(move |_|
|
||||
CommonConfig::load_from_disk(&self)
|
||||
.map_err(|e| warn!("error reloading TLS config: {:?}, falling back", e))
|
||||
.ok()
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl CommonConfig {
|
||||
/// Loads a configuration from the given files and validates it. If an
|
||||
/// error is returned then the caller should try again after the files are
|
||||
|
@ -99,6 +162,7 @@ impl CommonConfig {
|
|||
cert_resolver: Arc::new(cert_resolver),
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
impl ServerConfig {
|
||||
|
@ -108,6 +172,37 @@ impl ServerConfig {
|
|||
config.cert_resolver = common.cert_resolver.clone();
|
||||
ServerConfig(Arc::new(config))
|
||||
}
|
||||
|
||||
/// Watch a `Stream` of changes to a `CommonConfig`, such as those returned by
|
||||
/// `CommonSettings::stream_changes`, and update a `futures_watch::Watch` cell
|
||||
/// with a `ServerConfig` generated from each change.
|
||||
pub fn watch<C>(changes: C)
|
||||
-> (ServerConfigWatch, Box<Future<Item=(), Error=()> + Send>)
|
||||
where
|
||||
C: Stream<Item = CommonConfig, Error = ()> + Send + 'static,
|
||||
{
|
||||
let (watch, store) = Watch::new(None);
|
||||
let server_configs = changes.map(|ref config| Self::from(config));
|
||||
let store = store
|
||||
.sink_map_err(|_| warn!("all server config watches dropped"));
|
||||
let f = server_configs.map(Some).forward(store)
|
||||
.map(|_| trace!("forwarding to server config watch finished."));
|
||||
|
||||
// NOTE: This function and `no_tls` return `Box<Future<...>>` rather
|
||||
// than `impl Future<...>` so that they can have the _same_
|
||||
// return types (impl Traits are not the same type unless the
|
||||
// original non-anonymized type was the same).
|
||||
(watch, Box::new(f))
|
||||
}
|
||||
|
||||
pub fn no_tls()
|
||||
-> (ServerConfigWatch, Box<Future<Item = (), Error = ()> + Send>)
|
||||
{
|
||||
let (watch, _) = Watch::new(None);
|
||||
let no_future = future::ok(());
|
||||
|
||||
(watch, Box::new(no_future))
|
||||
}
|
||||
}
|
||||
|
||||
fn load_file_contents(path: &PathBuf) -> Result<Vec<u8>, Error> {
|
||||
|
|
|
@ -10,6 +10,6 @@ mod cert_resolver;
|
|||
mod connection;
|
||||
|
||||
pub use self::{
|
||||
config::{CommonSettings, CommonConfig, Error, ServerConfig},
|
||||
config::{CommonSettings, CommonConfig, Error, ServerConfig, ServerConfigWatch},
|
||||
connection::Connection,
|
||||
};
|
||||
|
|
Loading…
Reference in New Issue