diff --git a/Cargo.lock b/Cargo.lock index ea0adebcb..2aea7302e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -148,6 +148,7 @@ dependencies = [ "regex 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "ring 0.13.0-alpha5 (registry+https://github.com/rust-lang/crates.io-index)", "rustls 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", + "tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", "tokio 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-connect 0.1.0 (git+https://github.com/carllerche/tokio-connect)", "tokio-io 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index 2e37813d3..7ec3ab061 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -71,3 +71,6 @@ flate2 = { version = "1.0.1", default-features = false, features = ["rust_backen # `tokio-io` is needed for TCP tests, because `tokio::io` doesn't re-export # the `read` function. tokio-io = "0.1.6" +# Used for filesystem watch tests. We should switch from `tempdir` to `tempfile` once +# `prost-build` switches to it.` +tempdir = "0.3" diff --git a/proxy/src/fs_watch.rs b/proxy/src/fs_watch.rs new file mode 100644 index 000000000..5a548612f --- /dev/null +++ b/proxy/src/fs_watch.rs @@ -0,0 +1,692 @@ +use std::{fs, io, cell::RefCell, path::{Path, PathBuf}, time::{Duration, Instant}}; + +use futures::Stream; +use ring::digest::{self, Digest}; + +use tokio::timer::Interval; + +/// Stream changes to the files at a group of paths. +pub fn stream_changes(paths: I, interval: Duration) -> impl Stream +where + I: IntoIterator, + P: AsRef, +{ + // If we're on Linux, first atttempt to start an Inotify watch on the + // paths. If this fails, fall back to polling the filesystem. + #[cfg(target_os = "linux")] + { + stream_changes_inotify(paths, interval) + } + + // If we're not on Linux, we can't use inotify, so simply poll the fs. + // TODO: Use other FS events APIs (such as `kqueue`) as well, when + // they're available. + #[cfg(not(target_os = "linux"))] + { + stream_changes_polling(paths, interval) + } +} + +/// Stream changes by polling the filesystem. +/// +/// This will calculate the SHA-384 hash of each of files at the paths +/// described by this `CommonSettings` every `interval`, and attempt to +/// load a new `CommonConfig` from the files again if any of the hashes +/// has changed. +/// +/// This is used on operating systems other than Linux, or on Linux if +/// our attempt to use `inotify` failed. +pub fn stream_changes_polling( + paths: I, + interval: Duration, +) -> impl Stream +where + I: IntoIterator, + P: AsRef, +{ + let files = paths.into_iter().map(PathAndHash::new).collect::>(); + + Interval::new(Instant::now(), interval) + .map_err(|e| error!("timer error: {:?}", e)) + .filter_map(move |_| { + for file in &files { + match file.has_changed() { + Ok(true) => { + trace!("{:?} changed", &file.path); + return Some(()); + } + Ok(false) => { + // If the hash hasn't changed, keep going. + } + Err(ref e) if e.kind() == io::ErrorKind::NotFound => { + // A file not found error indicates that the file + // has been deleted. + trace!("{:?} deleted", &file.path); + return Some(()); + } + Err(ref e) => { + warn!("error hashing {:?}: {}", &file.path, e); + } + } + } + None + }) +} + +#[cfg(target_os = "linux")] +pub fn stream_changes_inotify( + paths: I, + interval: Duration, +) -> impl Stream +where + I: IntoIterator, + P: AsRef, +{ + use stream; + + let paths: Vec = paths + .into_iter() + .map(|p| p.as_ref().to_path_buf()) + .collect(); + let polls = Box::new(stream_changes_polling(paths.clone(), interval)); + match inotify::WatchStream::new(paths) { + Ok(watch) => { + let stream = inotify::FallbackStream { watch, polls }; + stream::Either::A(stream) + } + Err(e) => { + // If initializing the `Inotify` instance failed, it probably won't + // succeed in the future (it's likely that inotify unsupported on + // this OS). + warn!("inotify init error: {}, falling back to polling", e); + stream::Either::B(polls) + } + } +} + +#[derive(Clone, Debug)] +struct PathAndHash { + /// The path to the file. + path: PathBuf, + + /// The last SHA-384 digest of the file, if we have previously hashed it. + last_hash: RefCell>, +} + +impl PathAndHash { + fn new>(path: P) -> Self { + Self { + path: path.as_ref().to_path_buf(), + last_hash: RefCell::new(None), + } + } + + fn has_changed(&self) -> io::Result { + let contents = fs::read(&self.path)?; + let hash = Some(digest::digest(&digest::SHA256, &contents[..])); + let changed = self.last_hash.borrow().as_ref().map(Digest::as_ref) + != hash.as_ref().map(Digest::as_ref); + if changed { + self.last_hash.replace(hash); + } + Ok(changed) + } +} + +#[cfg(target_os = "linux")] +pub mod inotify { + use futures::{Async, Poll, Stream}; + use inotify::{Event, EventMask, EventStream, Inotify, WatchMask}; + use std::{io, path::PathBuf}; + + pub struct WatchStream { + inotify: Inotify, + stream: EventStream, + paths: Vec, + } + + pub struct FallbackStream { + pub watch: WatchStream, + pub polls: Box + Send>, + } + + impl WatchStream { + pub fn new(paths: Vec) -> Result { + let mut inotify = Inotify::init()?; + let stream = inotify.event_stream(); + + let mut watch_stream = WatchStream { + inotify, + stream, + paths, + }; + + watch_stream.add_paths()?; + + Ok(watch_stream) + } + + fn add_paths(&mut self) -> Result<(), io::Error> { + let mask = WatchMask::CREATE | WatchMask::MODIFY | WatchMask::DELETE + | WatchMask::DELETE_SELF | WatchMask::MOVE + | WatchMask::MOVE_SELF; + for path in &self.paths { + let watch_path = path.canonicalize().unwrap_or_else(|e| { + trace!("canonicalize({:?}): {:?}", &path, e); + path.parent().unwrap_or_else(|| path.as_ref()).to_path_buf() + }); + self.inotify.add_watch(&watch_path, mask)?; + trace!("watch {:?} (for {:?})", watch_path, path); + } + Ok(()) + } + } + + impl Stream for WatchStream { + type Item = (); + type Error = io::Error; + fn poll(&mut self) -> Poll, Self::Error> { + loop { + match try_ready!(self.stream.poll()) { + Some(Event { mask, name, .. }) => { + if mask.contains(EventMask::IGNORED) { + // This event fires if we removed a watch. Poll the + // stream again. + continue; + } + trace!("event={:?}; path={:?}", mask, name); + if mask.contains( + EventMask::DELETE & EventMask::DELETE_SELF & EventMask::CREATE, + ) { + self.add_paths()?; + } + return Ok(Async::Ready(Some(()))); + } + None => { + debug!("watch stream ending"); + return Ok(Async::Ready(None)); + } + } + } + } + } + + impl Stream for FallbackStream { + type Item = (); + type Error = (); + fn poll(&mut self) -> Poll, Self::Error> { + self.watch.poll().or_else(|e| { + warn!("watch error: {:?}, polling the fs until next change", e); + self.polls.poll() + }) + } + } + +} + +#[cfg(test)] +mod tests { + use super::*; + use task::test_util::BlockOnFor; + + use tempdir::TempDir; + use tokio::runtime::current_thread::Runtime; + + #[cfg(not(target_os = "windows"))] + use std::os::unix::fs::symlink; + use std::{fs::{self, File}, io::Write, path::Path, time::Duration}; + + use futures::{Future, Sink, Stream}; + use futures_watch::{Watch, WatchError}; + + struct Fixture { + paths: Vec, + dir: TempDir, + rt: Runtime, + } + + impl Fixture { + fn new() -> Fixture { + let _ = ::env_logger::try_init(); + let dir = TempDir::new("test").unwrap(); + let paths = vec![ + dir.path().join("a"), + dir.path().join("b"), + dir.path().join("c"), + ]; + let rt = Runtime::new().unwrap(); + Fixture { paths, dir, rt } + } + + fn test_polling(self, test: fn(Self, Watch<()>, Box>)) { + let stream = stream_changes_polling(self.paths.clone(), Duration::from_secs(1)); + let (watch, bg) = watch_stream(stream); + test(self, watch, bg) + } + + #[cfg(target_os = "linux")] + fn test_inotify(self, test: fn(Self, Watch<()>, Box>)) { + let paths = self.paths.clone(); + let stream = inotify::WatchStream::new(paths) + .unwrap() + .map_err(|e| panic!("{}", e)); + let (watch, bg) = watch_stream(stream); + test(self, watch, bg) + } + } + + fn create_file>(path: P) -> io::Result { + let f = File::create(path)?; + println!("created {:?}", f); + Ok(f) + } + + fn create_and_write>(path: P, s: &[u8]) -> io::Result { + let mut f = File::create(path)?; + f.write_all(s)?; + println!("created and wrote to {:?}", f); + Ok(f) + } + + fn watch_stream( + stream: impl Stream + 'static, + ) -> (Watch<()>, Box>) { + let (watch, store) = Watch::new(()); + // Use a watch so we can start running the stream immediately but also + // wait on stream updates. + let f = stream + .forward(store.sink_map_err(|_| ())) + .map(|_| ()) + .map_err(|_| ()); + + (watch, Box::new(f)) + } + + fn next_change( + rt: &mut Runtime, + watch: Watch<()>, + ) -> Result<(Option<()>, Watch<()>), WatchError> { + let next = watch.into_future().map_err(|(e, _)| e); + // Rust will print a warning if a test runs longer than 60 seconds, + // so we'll use that as the timeout after which we'll kill the test + // if we don't see a change. + rt.block_on_for(Duration::from_secs(60), next) + } + + fn test_detects_create( + fixture: Fixture, + watch: Watch<()>, + bg: Box>, + ) { + let Fixture { + paths, + dir: _dir, + mut rt, + } = fixture; + + rt.spawn(bg); + + paths.iter().fold(watch, |watch, path| { + create_file(path).unwrap(); + + let (item, watch) = next_change(&mut rt, watch).unwrap(); + assert!(item.is_some()); + watch + }); + } + + fn test_detects_delete_and_recreate( + fixture: Fixture, + watch: Watch<()>, + bg: Box>, + ) { + let Fixture { + paths, + dir: _dir, + mut rt, + } = fixture; + rt.spawn(bg); + + let watch = paths.iter().fold(watch, |watch, ref path| { + create_and_write(path, b"A").unwrap(); + + let (item, watch) = next_change(&mut rt, watch).unwrap(); + assert!(item.is_some()); + watch + }); + + let watch = paths.iter().fold(watch, |watch, ref path| { + fs::remove_file(path).unwrap(); + println!("deleted {:?}", path); + + let (item, watch) = next_change(&mut rt, watch).unwrap(); + assert!(item.is_some()); + watch + }); + + paths.iter().fold(watch, |watch, ref path| { + create_and_write(path, b"B").unwrap(); + + let (item, watch) = next_change(&mut rt, watch).unwrap(); + assert!(item.is_some()); + watch + }); + } + + #[cfg(not(target_os = "windows"))] + fn test_detects_create_symlink( + fixture: Fixture, + watch: Watch<()>, + bg: Box>, + ) { + let Fixture { paths, dir, mut rt } = fixture; + + let data_path = dir.path().join("data"); + fs::create_dir(&data_path).unwrap(); + + let data_paths = paths + .iter() + .map(|p| { + let path = data_path.clone().join(p.file_name().unwrap()); + create_file(&path).unwrap(); + path + }) + .collect::>(); + + rt.spawn(bg); + + data_paths + .iter() + .zip(paths.iter()) + .fold(watch, |watch, (path, target_path)| { + symlink(path, target_path).unwrap(); + + let (item, watch) = next_change(&mut rt, watch).unwrap(); + assert!(item.is_some()); + watch + }); + } + + // Test for when the watched files are symlinks to a file inside of a + // directory which is also a symlink (as is the case with Kubernetes + // ConfigMap/Secret volume mounts). + #[cfg(not(target_os = "windows"))] + fn test_detects_create_double_symlink( + fixture: Fixture, + watch: Watch<()>, + bg: Box>, + ) { + let Fixture { paths, dir, mut rt } = fixture; + + let real_data_path = dir.path().join("real_data"); + let data_path = dir.path().join("data"); + fs::create_dir(&real_data_path).unwrap(); + symlink(&real_data_path, &data_path).unwrap(); + + for path in &paths { + let path = real_data_path.clone().join(path.file_name().unwrap()); + create_file(&path).unwrap(); + } + + // -- Below this point, the watch is running. ----------------------- + rt.spawn(bg); + + paths.iter().fold(watch, |watch, path| { + let file_name = path.file_name().unwrap(); + symlink(data_path.clone().join(file_name), path).unwrap(); + + let (item, watch) = next_change(&mut rt, watch).unwrap(); + assert!(item.is_some()); + watch + }); + } + + #[cfg(not(target_os = "windows"))] + fn test_detects_modification_symlink( + fixture: Fixture, + watch: Watch<()>, + bg: Box>, + ) { + let Fixture { paths, dir, mut rt } = fixture; + + let data_path = dir.path().join("data"); + fs::create_dir(&data_path).unwrap(); + + let data_paths = paths + .iter() + .map(|p| { + let path = data_path.clone().join(p.file_name().unwrap()); + path + }) + .collect::>(); + + let mut data_files = data_paths + .iter() + .map(|path| create_and_write(path, b"a").unwrap()) + .collect::>(); + + for (path, target_path) in data_paths.iter().zip(paths.iter()) { + // Don't assert that events are seen here, as we haven't started + // running the watch yet. + symlink(path, target_path).unwrap(); + } + + // -- Below this point, the watch is running. ----------------------- + rt.spawn(bg); + + data_files.iter_mut().fold(watch, |watch, file| { + file.write_all(b"b").unwrap(); + + let (item, watch) = next_change(&mut rt, watch).unwrap(); + assert!(item.is_some()); + watch + }); + } + + fn test_detects_modification( + fixture: Fixture, + watch: Watch<()>, + bg: Box>, + ) { + let Fixture { + paths, + dir: _dir, + mut rt, + } = fixture; + + let mut files = paths + .iter() + .map(|path| create_and_write(path, b"a").unwrap()) + .collect::>(); + + rt.spawn(bg); + + files.iter_mut().fold(watch, |watch, file| { + file.write_all(b"b").unwrap(); + let (item, watch) = next_change(&mut rt, watch).unwrap(); + assert!(item.is_some()); + watch + }); + } + + #[cfg(not(target_os = "windows"))] + fn test_detects_modification_double_symlink( + fixture: Fixture, + watch: Watch<()>, + bg: Box>, + ) { + let Fixture { paths, dir, mut rt } = fixture; + + let real_data_path = dir.path().join("real_data"); + let data_path = dir.path().join("data"); + fs::create_dir(&real_data_path).unwrap(); + symlink(&real_data_path, &data_path).unwrap(); + + let mut files = paths + .iter() + .map(|p| { + let path = real_data_path.clone().join(p.file_name().unwrap()); + create_and_write(path, b"a").unwrap() + }) + .collect::>(); + + for path in &paths { + let file_path = data_path.clone().join(path.file_name().unwrap()); + // Don't assert that events are seen here, as we haven't started + // running the watch yet. + symlink(file_path, path).unwrap(); + } + + // -- Below this point, the watch is running. ----------------------- + rt.spawn(bg); + + files.iter_mut().fold(watch, |watch, file| { + file.write_all(b"b").unwrap(); + let (item, watch) = next_change(&mut rt, watch).unwrap(); + assert!(item.is_some()); + watch + }); + } + + #[cfg(not(target_os = "windows"))] + fn test_detects_double_symlink_retargeting( + fixture: Fixture, + watch: Watch<()>, + bg: Box>, + ) { + let Fixture { paths, dir, mut rt } = fixture; + + let real_data_path = dir.path().join("real_data"); + let real_data_path_2 = dir.path().join("real_data_2"); + let data_path = dir.path().join("data"); + fs::create_dir(&real_data_path).unwrap(); + fs::create_dir(&real_data_path_2).unwrap(); + symlink(&real_data_path, &data_path).unwrap(); + + // Create the first set of files. + // We won't assert that any changes are detected until we actually + // start the watch. + for path in &paths { + let path = real_data_path.clone().join(path.file_name().unwrap()); + create_and_write(path, b"a").unwrap(); + } + + // Symlink those files into `data_path` + for path in &paths { + let data_file_path = data_path.clone().join(path.file_name().unwrap()); + symlink(data_file_path, path).unwrap(); + } + + // Create the second set of files. + for path in &paths { + let path = real_data_path_2.clone().join(path.file_name().unwrap()); + create_and_write(path, b"b").unwrap(); + } + + // -- Below this point, the watch is running. ----------------------- + rt.spawn(bg); + + let (item, watch) = next_change(&mut rt, watch).unwrap(); + assert!(item.is_some()); + + fs::remove_dir_all(&data_path).unwrap(); + symlink(&real_data_path_2, &data_path).unwrap(); + + let (item, _) = next_change(&mut rt, watch).unwrap(); + assert!(item.is_some()); + } + + #[test] + fn polling_detects_create() { + Fixture::new().test_polling(test_detects_create) + } + + #[test] + #[cfg(target_os = "linux")] + fn inotify_detects_create() { + Fixture::new().test_inotify(test_detects_create) + } + + #[test] + #[cfg(not(target_os = "windows"))] + fn polling_detects_create_symlink() { + Fixture::new().test_polling(test_detects_create_symlink) + } + + #[test] + #[cfg(target_os = "linux")] + fn inotify_detects_create_symlink() { + Fixture::new().test_inotify(test_detects_create_symlink) + } + + #[test] + #[cfg(not(target_os = "windows"))] + fn polling_detects_create_double_symlink() { + Fixture::new().test_polling(test_detects_create_double_symlink) + } + + #[test] + #[cfg(target_os = "linux")] + fn inotify_detects_create_double_symlink() { + Fixture::new().test_inotify(test_detects_create_double_symlink) + } + + #[test] + fn polling_detects_modification() { + Fixture::new().test_polling(test_detects_modification) + } + + #[test] + #[cfg(target_os = "linux")] + fn inotify_detects_modification() { + Fixture::new().test_inotify(test_detects_modification) + } + + #[test] + #[cfg(not(target_os = "windows"))] + fn polling_detects_modification_symlink() { + Fixture::new().test_polling(test_detects_modification_symlink) + } + + #[test] + #[cfg(target_os = "linux")] + fn inotify_detects_modification_symlink() { + Fixture::new().test_inotify(test_detects_modification_symlink) + } + + #[test] + #[cfg(not(target_os = "windows"))] + fn polling_detects_modification_double_symlink() { + Fixture::new().test_polling(test_detects_modification_double_symlink) + } + + #[test] + #[cfg(target_os = "linux")] + fn inotify_detects_modification_double_symlink() { + Fixture::new().test_inotify(test_detects_modification_double_symlink) + } + + #[test] + #[cfg(not(target_os = "windows"))] + fn polling_detects_double_symlink_retargeting() { + Fixture::new().test_polling(test_detects_double_symlink_retargeting) + } + + #[test] + #[cfg(target_os = "linux")] + fn inotify_detects_double_symlink_retargeting() { + Fixture::new().test_inotify(test_detects_double_symlink_retargeting) + } + + #[test] + fn polling_detects_delete_and_recreate() { + Fixture::new().test_polling(test_detects_delete_and_recreate) + } + + #[test] + #[cfg(target_os = "linux")] + fn inotify_detects_delete_and_recreate() { + Fixture::new().test_inotify(test_detects_delete_and_recreate) + } + +} diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index 3e2b3f43c..6cd116a15 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -30,6 +30,9 @@ extern crate prost_types; extern crate quickcheck; extern crate rand; extern crate regex; +extern crate ring; +#[cfg(test)] +extern crate tempdir; extern crate tokio; extern crate tokio_connect; extern crate tower_balance; @@ -72,10 +75,12 @@ pub mod convert; pub mod ctx; mod dns; mod drain; +pub mod fs_watch; mod inbound; mod logging; mod map_err; mod outbound; +pub mod stream; pub mod task; pub mod telemetry; mod transparency; diff --git a/proxy/src/stream.rs b/proxy/src/stream.rs new file mode 100644 index 000000000..e28a6414c --- /dev/null +++ b/proxy/src/stream.rs @@ -0,0 +1,43 @@ +use futures::{Poll, Stream}; + +/// Like `futures::future::Either` but for `Stream`s. +/// +/// Combines two different `Stream`s yielding the same item and error +/// types into a single type. +/// +// TODO: This is probably useful outside of Conduit as well. Perhaps it +// deserves to be in a library... +#[derive(Clone, Debug)] +pub enum Either { + A(A), + B(B), +} + +// We could implement this function using the exact same code as +// `futures::future::Either`, if we needed it. I've commented it +// out, as we currently don't need it. +// impl Either<(T, A), (T, B)> { +// /// Splits out the homogeneous type from an either of tuples. +// pub fn split(self) -> (T, Either) { +// match self { +// Either::A((a, b)) => (a, Either::A(b)), +// Either::B((a, b)) => (a, Either::B(b)), +// } +// } +// } + +impl Stream for Either +where + A: Stream, + B: Stream, +{ + type Item = A::Item; + type Error = A::Error; + + fn poll(&mut self) -> Poll, A::Error> { + match *self { + Either::A(ref mut a) => a.poll(), + Either::B(ref mut b) => b.poll(), + } + } +} diff --git a/proxy/src/task.rs b/proxy/src/task.rs index d6681f925..8f744675c 100644 --- a/proxy/src/task.rs +++ b/proxy/src/task.rs @@ -263,3 +263,50 @@ impl StdError for Error { } } } + +#[cfg(test)] +pub mod test_util { + use futures::Future; + use tokio::{ + runtime::current_thread, + timer, + }; + + use std::time::{Duration, Instant}; + + /// A trait that allows an executor to execute a future for up to a given + /// time limit, and then panics if the future has not finished. + /// + /// This is intended for use in cases where the failure mode of some future + /// is to wait forever, rather than returning an error. When this happens, + /// it can make debugging test failures much more difficult, as killing + /// the tests when one has been waiting for over a minute prevents any + /// remaining tests from running, and doesn't print any output from the + /// killed test. + pub trait BlockOnFor { + /// Runs the provided future for up to `timeout`, blocking the thread + /// until the future completes. + fn block_on_for(&mut self, timeout: Duration, f: F) -> Result + where + F: Future; + } + + impl BlockOnFor for current_thread::Runtime { + fn block_on_for(&mut self, timeout: Duration, f: F) -> Result + where + F: Future + { + let f = timer::Deadline::new(f, Instant::now() + timeout); + match self.block_on(f) { + Ok(item) => Ok(item), + Err(e) => if e.is_inner() { + return Err(e.into_inner().unwrap()); + } else if e.is_timer() { + panic!("timer error: {}", e.into_timer().unwrap()); + } else { + panic!("assertion failed: future did not finish within {:?}", timeout); + }, + } + } + } +} diff --git a/proxy/src/transport/tls/cert_resolver.rs b/proxy/src/transport/tls/cert_resolver.rs index 89348d500..9d4f2ab58 100755 --- a/proxy/src/transport/tls/cert_resolver.rs +++ b/proxy/src/transport/tls/cert_resolver.rs @@ -6,11 +6,11 @@ use std::{ use super::{ config, - ring::{self, rand, signature}, rustls, untrusted, webpki, }; +use ring::{self, rand, signature}; pub struct CertResolver { certified_key: rustls::sign::CertifiedKey, diff --git a/proxy/src/transport/tls/config.rs b/proxy/src/transport/tls/config.rs index fae73e739..9ec4e5036 100644 --- a/proxy/src/transport/tls/config.rs +++ b/proxy/src/transport/tls/config.rs @@ -3,7 +3,7 @@ use std::{ io::{self, Cursor, Read}, path::PathBuf, sync::Arc, - time::{Duration, Instant, SystemTime,}, + time::Duration, }; use super::{ @@ -16,7 +16,6 @@ use super::{ use futures::{future, Future, Stream}; use futures_watch::Watch; -use tokio::timer::Interval; /// Not-yet-validated settings that are used for both TLS clients and TLS /// servers. @@ -67,8 +66,6 @@ pub enum Error { EndEntityCertIsNotValid(webpki::Error), InvalidPrivateKey, TimeConversionFailed, - #[cfg(target_os = "linux")] - InotifyInit(io::Error), } impl CommonSettings { @@ -85,149 +82,20 @@ impl CommonSettings { /// The returned stream consists of each subsequent successfully loaded /// `CommonSettings` after each change. If the settings could not be /// reloaded (i.e., they were malformed), nothing is sent. - fn stream_changes(self, interval: Duration) + pub fn stream_changes(self, interval: Duration) -> impl Stream { - // If we're on Linux, first atttempt to start an Inotify watch on the - // paths. If this fails, fall back to polling the filesystem. - #[cfg(target_os = "linux")] - let changes: Box + Send> = - match self.stream_changes_inotify() { - Ok(s) => Box::new(s), - Err(e) => { - warn!( - "inotify init error: {:?}, falling back to polling", - e - ); - Box::new(self.stream_changes_polling(interval)) - }, - }; - - // If we're not on Linux, we can't use inotify, so simply poll the fs. - // TODO: Use other FS events APIs (such as `kqueue`) as well, when - // they're available. - #[cfg(not(target_os = "linux"))] - let changes = self.stream_changes_polling(interval); - - changes.filter_map(move |_| - CommonConfig::load_from_disk(&self) - .map_err(|e| warn!("error reloading TLS config: {:?}, falling back", e)) - .ok() - ) - - } - - /// Stream changes by polling the filesystem. - /// - /// This will poll the filesystem for changes to the files at the paths - /// described by this `CommonSettings` every `interval`, and attempt to - /// load a new `CommonConfig` from the files again after each change. - /// - /// This is used on operating systems other than Linux, or on Linux if - /// our attempt to use `inotify` failed. - fn stream_changes_polling(&self, interval: Duration) - -> impl Stream - { - fn last_modified(path: &PathBuf) -> Option { - // We have to canonicalize the path _every_ time we poll the fs, - // rather than once when we start watching, because if it's a - // symlink, the target may change. If that happened, and we - // continued watching the original canonical path, we wouldn't see - // any subsequent changes to the new symlink target. - path.canonicalize() - .and_then(|canonical| { - trace!("last_modified: {:?} -> {:?}", path, canonical); - canonical.symlink_metadata() - .and_then(|meta| meta.modified()) - }) - .map_err(|e| if e.kind() != io::ErrorKind::NotFound { - // Don't log if the files don't exist, since this - // makes the logs *quite* noisy. - warn!("error reading metadata for {:?}: {}", path, e) - }) - .ok() - } - let paths = self.paths().iter() .map(|&p| p.clone()) - .collect::>(); - - let mut max: Option = None; - - Interval::new(Instant::now(), interval) - .map_err(|e| error!("timer error: {:?}", e)) + .collect::>(); + ::fs_watch::stream_changes(paths, interval) .filter_map(move |_| { - for path in &paths { - let t = last_modified(path); - if t > max { - max = t; - trace!("{:?} changed at {:?}", path, t); - return Some(()); - } - } - None + CommonConfig::load_from_disk(&self) + .map_err(|e| warn!("error reloading TLS config: {:?}, falling back", e)) + .ok() }) } - #[cfg(target_os = "linux")] - fn stream_changes_inotify(&self) - -> Result, Error> - { - use std::{collections::HashSet, path::Path}; - use inotify::{Inotify, WatchMask}; - - // Use a broad watch mask so that we will pick up any events that might - // indicate a change to the watched files. - // - // Such a broad mask may lead to reloading certs multiple times when k8s - // modifies a ConfigMap or Secret, which is a multi-step process that we - // see as a series CREATE, MOVED_TO, MOVED_FROM, and DELETE events. - // However, we want to catch single events that might occur when the - // files we're watching *don't* live in a k8s ConfigMap/Secret. - let mask = WatchMask::CREATE - | WatchMask::MODIFY - | WatchMask::DELETE - | WatchMask::MOVE - ; - let mut inotify = Inotify::init().map_err(Error::InotifyInit)?; - - let paths = self.paths(); - let paths = paths.into_iter() - .map(|path| { - // If the path to watch has a parent, watch that instead. This - // will allow us to pick up events to files in k8s ConfigMaps - // or Secrets (which we wouldn't detect if we watch the file - // itself, as they are double-symlinked). - // - // This may also result in some false positives (if a file we - // *don't* care about in the same dir changes, we'll still - // reload), but that's unlikely to be a problem. - let parent = path - .parent() - .map(Path::to_path_buf) - .unwrap_or(path.to_path_buf()); - trace!("will watch {:?} for {:?}", parent, path); - path - }) - // Collect the paths into a `HashSet` eliminates any duplicates, to - // conserve the number of inotify watches we create. - .collect::>(); - - for path in paths { - inotify.add_watch(path, mask) - .map_err(|e| Error::Io(path.to_path_buf(), e))?; - trace!("inotify: watch {:?}", path); - } - - let events = inotify.into_event_stream() - .map(|ev| { - trace!("inotify: event={:?}; path={:?};", ev.mask, ev.name); - }) - .map_err(|e| error!("inotify watch error: {}", e)); - trace!("started inotify watch"); - - Ok(events) - } } impl CommonConfig { diff --git a/proxy/src/transport/tls/mod.rs b/proxy/src/transport/tls/mod.rs index 94d5f0b80..7ac1136d8 100755 --- a/proxy/src/transport/tls/mod.rs +++ b/proxy/src/transport/tls/mod.rs @@ -1,5 +1,4 @@ // These crates are only used within the `tls` module. -extern crate ring; extern crate rustls; extern crate tokio_rustls; extern crate untrusted;