proxy: Reload TLS config on changes (#1056)

This PR modifies the proxy's TLS code so that the TLS config files are reloaded
when any of them has changed (including if they did not previously exist).

If reloading the configs returns an error, we log an error and continue using
the old config.

Currently, this is implemented by polling the file system for the time they
were last modified at a fixed interval. However, I've implemented this so 
that the changes are passed around as a `Stream`, and that reloading and
updating the config is in a separate function the one that detects changes.
Therefore, it should be fairly easy to plug in support for `inotify` (and 
other FS watch APIs) later, as long as we can use them to generate a 
`Stream` of changes.

Closes #369 

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
This commit is contained in:
Eliza Weisman 2018-06-04 13:36:28 -07:00 committed by GitHub
parent f73e34e0d8
commit 896fe75929
7 changed files with 135 additions and 24 deletions

11
Cargo.lock generated
View File

@ -131,6 +131,7 @@ dependencies = [
"flate2 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "flate2 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-mpsc-lossy 0.3.0", "futures-mpsc-lossy 0.3.0",
"futures-watch 0.1.0 (git+https://github.com/carllerche/better-future)",
"h2 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)", "h2 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)",
"http 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "http 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
"httparse 1.2.4 (registry+https://github.com/rust-lang/crates.io-index)", "httparse 1.2.4 (registry+https://github.com/rust-lang/crates.io-index)",
@ -350,6 +351,15 @@ dependencies = [
"futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]]
name = "futures-watch"
version = "0.1.0"
source = "git+https://github.com/carllerche/better-future#07baa13e91fefe7a51533dfde7b4e69e109ebe14"
dependencies = [
"fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]] [[package]]
name = "gzip-header" name = "gzip-header"
version = "0.1.2" version = "0.1.2"
@ -1477,6 +1487,7 @@ dependencies = [
"checksum fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" "checksum fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7"
"checksum futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)" = "1a70b146671de62ec8c8ed572219ca5d594d9b06c0b364d5e67b722fc559b48c" "checksum futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)" = "1a70b146671de62ec8c8ed572219ca5d594d9b06c0b364d5e67b722fc559b48c"
"checksum futures-cpupool 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "ab90cde24b3319636588d0c35fe03b1333857621051837ed769faefb4c2162e4" "checksum futures-cpupool 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "ab90cde24b3319636588d0c35fe03b1333857621051837ed769faefb4c2162e4"
"checksum futures-watch 0.1.0 (git+https://github.com/carllerche/better-future)" = "<none>"
"checksum gzip-header 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "0a9fcfe1c9ee125342355b2467bc29b9dfcb2124fcae27edb9cee6f4cc5ecd40" "checksum gzip-header 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "0a9fcfe1c9ee125342355b2467bc29b9dfcb2124fcae27edb9cee6f4cc5ecd40"
"checksum h2 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)" = "8111e316d0589775ee2bd671cdfdf3f63c9d97e21d8d16a88bb73dcf99bef7f5" "checksum h2 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)" = "8111e316d0589775ee2bd671cdfdf3f63c9d97e21d8d16a88bb73dcf99bef7f5"
"checksum heck 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ea04fa3ead4e05e51a7c806fc07271fdbde4e246a6c6d1efd52e72230b771b82" "checksum heck 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ea04fa3ead4e05e51a7c806fc07271fdbde4e246a6c6d1efd52e72230b771b82"

View File

@ -19,6 +19,7 @@ bytes = "0.4"
deflate = {version = "0.7.18", features = ["gzip"] } deflate = {version = "0.7.18", features = ["gzip"] }
env_logger = { version = "0.5", default-features = false } env_logger = { version = "0.5", default-features = false }
futures = "0.1" futures = "0.1"
futures-watch = { git = "https://github.com/carllerche/better-future" }
h2 = "0.1.7" h2 = "0.1.7"
http = "0.1" http = "0.1"
httparse = "1.2" httparse = "1.2"

View File

@ -97,7 +97,7 @@ impl BoundPort {
// TLS when needed. // TLS when needed.
pub fn listen_and_fold<T, F, Fut>( pub fn listen_and_fold<T, F, Fut>(
self, self,
tls_config: Option<tls::ServerConfig>, tls_config: tls::ServerConfigWatch,
initial: T, initial: T,
f: F) f: F)
-> impl Future<Item = (), Error = io::Error> + Send + 'static -> impl Future<Item = (), Error = io::Error> + Send + 'static
@ -105,7 +105,8 @@ impl BoundPort {
F: Fn(T, (Connection, SocketAddr)) -> Fut + Send + 'static, F: Fn(T, (Connection, SocketAddr)) -> Fut + Send + 'static,
T: Send + 'static, T: Send + 'static,
Fut: IntoFuture<Item = T, Error = std::io::Error> + Send + 'static, Fut: IntoFuture<Item = T, Error = std::io::Error> + Send + 'static,
<Fut as IntoFuture>::Future: Send, { <Fut as IntoFuture>::Future: Send,
{
future::lazy(move || { future::lazy(move || {
// Create the TCP listener lazily, so that it's not bound to a // Create the TCP listener lazily, so that it's not bound to a
// reactor until the future is run. This will avoid // reactor until the future is run. This will avoid
@ -125,10 +126,10 @@ impl BoundPort {
// libraries don't have the necessary API for that, so just // libraries don't have the necessary API for that, so just
// do it here. // do it here.
set_nodelay_or_warn(&socket); set_nodelay_or_warn(&socket);
match tls_config.clone() { match tls_config.borrow().as_ref() {
Some(tls_config) => { Some(tls_config) => {
Either::A( Either::A(
tls::Connection::accept(socket, tls_config) tls::Connection::accept(socket, tls_config.clone())
.map(move |tls| (Connection::new(Box::new(tls)), remote_addr))) .map(move |tls| (Connection::new(Box::new(tls)), remote_addr)))
}, },
None => Either::B(future::ok((Connection::new(Box::new(socket)), remote_addr))), None => Either::B(future::ok((Connection::new(Box::new(socket)), remote_addr))),

View File

@ -10,6 +10,7 @@ extern crate deflate;
#[macro_use] #[macro_use]
extern crate futures; extern crate futures;
extern crate futures_mpsc_lossy; extern crate futures_mpsc_lossy;
extern crate futures_watch;
extern crate h2; extern crate h2;
extern crate http; extern crate http;
extern crate httparse; extern crate httparse;
@ -229,20 +230,15 @@ where
let bind = Bind::new().with_sensors(sensors.clone()); let bind = Bind::new().with_sensors(sensors.clone());
// TODO: Load the TLS configuration asynchronously and watch for let tls_config_changes = config.tls_settings
// changes to the files. // TODO: determine the correct interval for this.
let tls_config = config.tls_settings.and_then(|settings| { .map(|settings| settings.stream_changes(Duration::from_secs(1)));
match tls::CommonConfig::load_from_disk(&settings) {
Ok(config) => Some(config), let (tls_server_config, tls_cfg_bg) =
Err(e) => { tls_config_changes
// Keep going without TLS if loading settings failed. .map(tls::ServerConfig::watch)
error!("Error loading TLS configuration: {:?}", e); .unwrap_or_else(tls::ServerConfig::no_tls);
None
}
}
});
let tls_server_config = tls_config.as_ref().map(tls::ServerConfig::from);
// Setup the public listener. This will listen on a publicly accessible // Setup the public listener. This will listen on a publicly accessible
// address and listen for inbound connections that should be forwarded // address and listen for inbound connections that should be forwarded
@ -283,9 +279,11 @@ where
config.outbound_router_capacity, config.outbound_router_capacity,
config.outbound_router_max_idle_age, config.outbound_router_max_idle_age,
); );
// No TLS yet.
let (no_tls, _) = tls::ServerConfig::no_tls();
serve( serve(
outbound_listener, outbound_listener,
None, // No TLS no_tls,
router, router,
config.public_connect_timeout, config.public_connect_timeout,
config.outbound_ports_disable_protocol_detection, config.outbound_ports_disable_protocol_detection,
@ -318,7 +316,10 @@ where
tap.map_err(|_| {}), tap.map_err(|_| {}),
metrics_server.map_err(|_| {}), metrics_server.map_err(|_| {}),
::logging::admin().bg("dns-resolver").future(dns_bg), ::logging::admin().bg("dns-resolver").future(dns_bg),
).map(|_| {}); )
// There's no `Future::join6` combinator...
.join(::logging::admin().bg("tls-config").future(tls_cfg_bg))
.map(|_| {});
rt.spawn(Box::new(fut)); rt.spawn(Box::new(fut));
@ -349,7 +350,7 @@ where
fn serve<R, B, E, F, G>( fn serve<R, B, E, F, G>(
bound_port: BoundPort, bound_port: BoundPort,
tls_config: Option<tls::ServerConfig>, tls_config: tls::ServerConfigWatch,
router: Router<R>, router: Router<R>,
tcp_connect_timeout: Duration, tcp_connect_timeout: Duration,
disable_protocol_detection_ports: IndexSet<u16>, disable_protocol_detection_ports: IndexSet<u16>,
@ -504,11 +505,12 @@ where
h2_builder, h2_builder,
log.clone().executor(), log.clone().executor(),
); );
let (no_tls, _) = tls::ServerConfig::no_tls();
let fut = { let fut = {
let log = log.clone(); let log = log.clone();
bound_port.listen_and_fold( bound_port.listen_and_fold(
None, // No TLS no_tls,
server, server,
move |server, (session, remote)| { move |server, (session, remote)| {
let log = log.clone().with_remote(remote); let log = log.clone().with_remote(remote);

View File

@ -98,12 +98,13 @@ impl Control {
use hyper; use hyper;
let log = ::logging::admin().server("metrics", bound_port.local_addr()); let log = ::logging::admin().server("metrics", bound_port.local_addr());
let (no_tls, _) = ::tls::ServerConfig::no_tls();
let service = self.metrics_service.clone(); let service = self.metrics_service.clone();
let fut = { let fut = {
let log = log.clone(); let log = log.clone();
bound_port.listen_and_fold( bound_port.listen_and_fold(
None, // TODO: No TLS no_tls, // TODO: Serve over TLS.
hyper::server::conn::Http::new(), hyper::server::conn::Http::new(),
move |hyper, (conn, remote)| { move |hyper, (conn, remote)| {
let service = service.clone(); let service = service.clone();

View File

@ -1,8 +1,9 @@
use std::{ use std::{
fs::File, fs::{self, File},
io::{self, Cursor, Read}, io::{self, Cursor, Read},
path::PathBuf, path::PathBuf,
sync::Arc, sync::Arc,
time::{Duration, Instant, SystemTime,},
}; };
use super::{ use super::{
@ -13,6 +14,12 @@ use super::{
webpki, webpki,
}; };
use futures::{future, Future, Sink, Stream};
use futures_watch::Watch;
use tokio::timer::Interval;
pub type ServerConfigWatch = Watch<Option<ServerConfig>>;
/// Not-yet-validated settings that are used for both TLS clients and TLS /// Not-yet-validated settings that are used for both TLS clients and TLS
/// servers. /// servers.
/// ///
@ -42,6 +49,7 @@ pub struct CommonConfig {
cert_resolver: Arc<CertResolver>, cert_resolver: Arc<CertResolver>,
} }
/// Validated configuration for TLS servers. /// Validated configuration for TLS servers.
#[derive(Clone)] #[derive(Clone)]
pub struct ServerConfig(pub(super) Arc<rustls::ServerConfig>); pub struct ServerConfig(pub(super) Arc<rustls::ServerConfig>);
@ -57,6 +65,61 @@ pub enum Error {
TimeConversionFailed, TimeConversionFailed,
} }
impl CommonSettings {
fn change_timestamps(&self, interval: Duration) -> impl Stream<Item = (), Error = ()> {
let paths = [
self.trust_anchors.clone(),
self.end_entity_cert.clone(),
self.private_key.clone(),
];
let mut max: Option<SystemTime> = None;
Interval::new(Instant::now(), interval)
.map_err(|e| error!("timer error: {:?}", e))
.filter_map(move |_| {
for path in &paths {
let t = fs::metadata(path)
.and_then(|meta| meta.modified())
.map_err(|e| if e.kind() != io::ErrorKind::NotFound {
// Don't log if the files don't exist, since this
// makes the logs *quite* noisy.
warn!("metadata for {:?}: {}", path, e)
})
.ok();
if t > max {
max = t;
trace!("{:?} changed at {:?}", path, t);
return Some(());
}
}
None
})
}
/// Stream changes to the files described by this `CommonSettings`.
///
/// This will poll the filesystem for changes to the files at the paths
/// described by this `CommonSettings` every `interval`, and attempt to
/// load a new `CommonConfig` from the files again after each change.
///
/// The returned stream consists of each subsequent successfully loaded
/// `CommonSettings` after each change. If the settings could not be
/// reloaded (i.e., they were malformed), nothing is sent.
///
/// TODO: On Linux, this should be replaced with an `inotify` watch when
/// available.
pub fn stream_changes(self, interval: Duration)
-> impl Stream<Item = CommonConfig, Error = ()>
{
self.change_timestamps(interval)
.filter_map(move |_|
CommonConfig::load_from_disk(&self)
.map_err(|e| warn!("error reloading TLS config: {:?}, falling back", e))
.ok()
)
}
}
impl CommonConfig { impl CommonConfig {
/// Loads a configuration from the given files and validates it. If an /// Loads a configuration from the given files and validates it. If an
/// error is returned then the caller should try again after the files are /// error is returned then the caller should try again after the files are
@ -99,6 +162,7 @@ impl CommonConfig {
cert_resolver: Arc::new(cert_resolver), cert_resolver: Arc::new(cert_resolver),
}) })
} }
} }
impl ServerConfig { impl ServerConfig {
@ -108,6 +172,37 @@ impl ServerConfig {
config.cert_resolver = common.cert_resolver.clone(); config.cert_resolver = common.cert_resolver.clone();
ServerConfig(Arc::new(config)) ServerConfig(Arc::new(config))
} }
/// Watch a `Stream` of changes to a `CommonConfig`, such as those returned by
/// `CommonSettings::stream_changes`, and update a `futures_watch::Watch` cell
/// with a `ServerConfig` generated from each change.
pub fn watch<C>(changes: C)
-> (ServerConfigWatch, Box<Future<Item=(), Error=()> + Send>)
where
C: Stream<Item = CommonConfig, Error = ()> + Send + 'static,
{
let (watch, store) = Watch::new(None);
let server_configs = changes.map(|ref config| Self::from(config));
let store = store
.sink_map_err(|_| warn!("all server config watches dropped"));
let f = server_configs.map(Some).forward(store)
.map(|_| trace!("forwarding to server config watch finished."));
// NOTE: This function and `no_tls` return `Box<Future<...>>` rather
// than `impl Future<...>` so that they can have the _same_
// return types (impl Traits are not the same type unless the
// original non-anonymized type was the same).
(watch, Box::new(f))
}
pub fn no_tls()
-> (ServerConfigWatch, Box<Future<Item = (), Error = ()> + Send>)
{
let (watch, _) = Watch::new(None);
let no_future = future::ok(());
(watch, Box::new(no_future))
}
} }
fn load_file_contents(path: &PathBuf) -> Result<Vec<u8>, Error> { fn load_file_contents(path: &PathBuf) -> Result<Vec<u8>, Error> {

View File

@ -10,6 +10,6 @@ mod cert_resolver;
mod connection; mod connection;
pub use self::{ pub use self::{
config::{CommonSettings, CommonConfig, Error, ServerConfig}, config::{CommonSettings, CommonConfig, Error, ServerConfig, ServerConfigWatch},
connection::Connection, connection::Connection,
}; };