proxy: Fix Inotify falling back to polling when files don't exist yet (#1119)
This PR changes the proxy's Inotify watch code to avoid always falling back to polling the filesystem when the watched files don't exist yet. It also contains some additional cleanup and refactoring of the inotify code, including moving the non-TLS-specific filesystem watching code out of the `tls::config` module and into a new `fs_watch` module. In addition, it adds tests for both the polling-based and inotify-based watch implementations, and changes the polling-based watches to hash the files rather than using timestamps from the file's metadata to detect changes. These changes are originally from #1094 and #1091, respectively, but they're included here because @briansmith asked that all the changes be made in one PR. Closes #1094. Closes #1091. Fixes #1090. Fixes #1097. Fixes #1061. Signed-off-by: Eliza Weisman <eliza@buoyant.io>
This commit is contained in:
parent
a5cf4c2db9
commit
24d54dc2d2
|
@ -148,6 +148,7 @@ dependencies = [
|
|||
"regex 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"ring 0.13.0-alpha5 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"rustls 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"tokio 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"tokio-connect 0.1.0 (git+https://github.com/carllerche/tokio-connect)",
|
||||
"tokio-io 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
|
|
|
@ -71,3 +71,6 @@ flate2 = { version = "1.0.1", default-features = false, features = ["rust_backen
|
|||
# `tokio-io` is needed for TCP tests, because `tokio::io` doesn't re-export
|
||||
# the `read` function.
|
||||
tokio-io = "0.1.6"
|
||||
# Used for filesystem watch tests. We should switch from `tempdir` to `tempfile` once
|
||||
# `prost-build` switches to it.`
|
||||
tempdir = "0.3"
|
||||
|
|
|
@ -0,0 +1,692 @@
|
|||
use std::{fs, io, cell::RefCell, path::{Path, PathBuf}, time::{Duration, Instant}};
|
||||
|
||||
use futures::Stream;
|
||||
use ring::digest::{self, Digest};
|
||||
|
||||
use tokio::timer::Interval;
|
||||
|
||||
/// Stream changes to the files at a group of paths.
|
||||
pub fn stream_changes<I, P>(paths: I, interval: Duration) -> impl Stream<Item = (), Error = ()>
|
||||
where
|
||||
I: IntoIterator<Item = P>,
|
||||
P: AsRef<Path>,
|
||||
{
|
||||
// If we're on Linux, first atttempt to start an Inotify watch on the
|
||||
// paths. If this fails, fall back to polling the filesystem.
|
||||
#[cfg(target_os = "linux")]
|
||||
{
|
||||
stream_changes_inotify(paths, interval)
|
||||
}
|
||||
|
||||
// If we're not on Linux, we can't use inotify, so simply poll the fs.
|
||||
// TODO: Use other FS events APIs (such as `kqueue`) as well, when
|
||||
// they're available.
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
{
|
||||
stream_changes_polling(paths, interval)
|
||||
}
|
||||
}
|
||||
|
||||
/// Stream changes by polling the filesystem.
|
||||
///
|
||||
/// This will calculate the SHA-384 hash of each of files at the paths
|
||||
/// described by this `CommonSettings` every `interval`, and attempt to
|
||||
/// load a new `CommonConfig` from the files again if any of the hashes
|
||||
/// has changed.
|
||||
///
|
||||
/// This is used on operating systems other than Linux, or on Linux if
|
||||
/// our attempt to use `inotify` failed.
|
||||
pub fn stream_changes_polling<I, P>(
|
||||
paths: I,
|
||||
interval: Duration,
|
||||
) -> impl Stream<Item = (), Error = ()>
|
||||
where
|
||||
I: IntoIterator<Item = P>,
|
||||
P: AsRef<Path>,
|
||||
{
|
||||
let files = paths.into_iter().map(PathAndHash::new).collect::<Vec<_>>();
|
||||
|
||||
Interval::new(Instant::now(), interval)
|
||||
.map_err(|e| error!("timer error: {:?}", e))
|
||||
.filter_map(move |_| {
|
||||
for file in &files {
|
||||
match file.has_changed() {
|
||||
Ok(true) => {
|
||||
trace!("{:?} changed", &file.path);
|
||||
return Some(());
|
||||
}
|
||||
Ok(false) => {
|
||||
// If the hash hasn't changed, keep going.
|
||||
}
|
||||
Err(ref e) if e.kind() == io::ErrorKind::NotFound => {
|
||||
// A file not found error indicates that the file
|
||||
// has been deleted.
|
||||
trace!("{:?} deleted", &file.path);
|
||||
return Some(());
|
||||
}
|
||||
Err(ref e) => {
|
||||
warn!("error hashing {:?}: {}", &file.path, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
None
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
pub fn stream_changes_inotify<I, P>(
|
||||
paths: I,
|
||||
interval: Duration,
|
||||
) -> impl Stream<Item = (), Error = ()>
|
||||
where
|
||||
I: IntoIterator<Item = P>,
|
||||
P: AsRef<Path>,
|
||||
{
|
||||
use stream;
|
||||
|
||||
let paths: Vec<PathBuf> = paths
|
||||
.into_iter()
|
||||
.map(|p| p.as_ref().to_path_buf())
|
||||
.collect();
|
||||
let polls = Box::new(stream_changes_polling(paths.clone(), interval));
|
||||
match inotify::WatchStream::new(paths) {
|
||||
Ok(watch) => {
|
||||
let stream = inotify::FallbackStream { watch, polls };
|
||||
stream::Either::A(stream)
|
||||
}
|
||||
Err(e) => {
|
||||
// If initializing the `Inotify` instance failed, it probably won't
|
||||
// succeed in the future (it's likely that inotify unsupported on
|
||||
// this OS).
|
||||
warn!("inotify init error: {}, falling back to polling", e);
|
||||
stream::Either::B(polls)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
struct PathAndHash {
|
||||
/// The path to the file.
|
||||
path: PathBuf,
|
||||
|
||||
/// The last SHA-384 digest of the file, if we have previously hashed it.
|
||||
last_hash: RefCell<Option<Digest>>,
|
||||
}
|
||||
|
||||
impl PathAndHash {
|
||||
fn new<P: AsRef<Path>>(path: P) -> Self {
|
||||
Self {
|
||||
path: path.as_ref().to_path_buf(),
|
||||
last_hash: RefCell::new(None),
|
||||
}
|
||||
}
|
||||
|
||||
fn has_changed(&self) -> io::Result<bool> {
|
||||
let contents = fs::read(&self.path)?;
|
||||
let hash = Some(digest::digest(&digest::SHA256, &contents[..]));
|
||||
let changed = self.last_hash.borrow().as_ref().map(Digest::as_ref)
|
||||
!= hash.as_ref().map(Digest::as_ref);
|
||||
if changed {
|
||||
self.last_hash.replace(hash);
|
||||
}
|
||||
Ok(changed)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
pub mod inotify {
|
||||
use futures::{Async, Poll, Stream};
|
||||
use inotify::{Event, EventMask, EventStream, Inotify, WatchMask};
|
||||
use std::{io, path::PathBuf};
|
||||
|
||||
pub struct WatchStream {
|
||||
inotify: Inotify,
|
||||
stream: EventStream,
|
||||
paths: Vec<PathBuf>,
|
||||
}
|
||||
|
||||
pub struct FallbackStream {
|
||||
pub watch: WatchStream,
|
||||
pub polls: Box<Stream<Item = (), Error = ()> + Send>,
|
||||
}
|
||||
|
||||
impl WatchStream {
|
||||
pub fn new(paths: Vec<PathBuf>) -> Result<Self, io::Error> {
|
||||
let mut inotify = Inotify::init()?;
|
||||
let stream = inotify.event_stream();
|
||||
|
||||
let mut watch_stream = WatchStream {
|
||||
inotify,
|
||||
stream,
|
||||
paths,
|
||||
};
|
||||
|
||||
watch_stream.add_paths()?;
|
||||
|
||||
Ok(watch_stream)
|
||||
}
|
||||
|
||||
fn add_paths(&mut self) -> Result<(), io::Error> {
|
||||
let mask = WatchMask::CREATE | WatchMask::MODIFY | WatchMask::DELETE
|
||||
| WatchMask::DELETE_SELF | WatchMask::MOVE
|
||||
| WatchMask::MOVE_SELF;
|
||||
for path in &self.paths {
|
||||
let watch_path = path.canonicalize().unwrap_or_else(|e| {
|
||||
trace!("canonicalize({:?}): {:?}", &path, e);
|
||||
path.parent().unwrap_or_else(|| path.as_ref()).to_path_buf()
|
||||
});
|
||||
self.inotify.add_watch(&watch_path, mask)?;
|
||||
trace!("watch {:?} (for {:?})", watch_path, path);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for WatchStream {
|
||||
type Item = ();
|
||||
type Error = io::Error;
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
loop {
|
||||
match try_ready!(self.stream.poll()) {
|
||||
Some(Event { mask, name, .. }) => {
|
||||
if mask.contains(EventMask::IGNORED) {
|
||||
// This event fires if we removed a watch. Poll the
|
||||
// stream again.
|
||||
continue;
|
||||
}
|
||||
trace!("event={:?}; path={:?}", mask, name);
|
||||
if mask.contains(
|
||||
EventMask::DELETE & EventMask::DELETE_SELF & EventMask::CREATE,
|
||||
) {
|
||||
self.add_paths()?;
|
||||
}
|
||||
return Ok(Async::Ready(Some(())));
|
||||
}
|
||||
None => {
|
||||
debug!("watch stream ending");
|
||||
return Ok(Async::Ready(None));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for FallbackStream {
|
||||
type Item = ();
|
||||
type Error = ();
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
self.watch.poll().or_else(|e| {
|
||||
warn!("watch error: {:?}, polling the fs until next change", e);
|
||||
self.polls.poll()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use task::test_util::BlockOnFor;
|
||||
|
||||
use tempdir::TempDir;
|
||||
use tokio::runtime::current_thread::Runtime;
|
||||
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
use std::os::unix::fs::symlink;
|
||||
use std::{fs::{self, File}, io::Write, path::Path, time::Duration};
|
||||
|
||||
use futures::{Future, Sink, Stream};
|
||||
use futures_watch::{Watch, WatchError};
|
||||
|
||||
struct Fixture {
|
||||
paths: Vec<PathBuf>,
|
||||
dir: TempDir,
|
||||
rt: Runtime,
|
||||
}
|
||||
|
||||
impl Fixture {
|
||||
fn new() -> Fixture {
|
||||
let _ = ::env_logger::try_init();
|
||||
let dir = TempDir::new("test").unwrap();
|
||||
let paths = vec![
|
||||
dir.path().join("a"),
|
||||
dir.path().join("b"),
|
||||
dir.path().join("c"),
|
||||
];
|
||||
let rt = Runtime::new().unwrap();
|
||||
Fixture { paths, dir, rt }
|
||||
}
|
||||
|
||||
fn test_polling(self, test: fn(Self, Watch<()>, Box<Future<Item = (), Error = ()>>)) {
|
||||
let stream = stream_changes_polling(self.paths.clone(), Duration::from_secs(1));
|
||||
let (watch, bg) = watch_stream(stream);
|
||||
test(self, watch, bg)
|
||||
}
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
fn test_inotify(self, test: fn(Self, Watch<()>, Box<Future<Item = (), Error = ()>>)) {
|
||||
let paths = self.paths.clone();
|
||||
let stream = inotify::WatchStream::new(paths)
|
||||
.unwrap()
|
||||
.map_err(|e| panic!("{}", e));
|
||||
let (watch, bg) = watch_stream(stream);
|
||||
test(self, watch, bg)
|
||||
}
|
||||
}
|
||||
|
||||
fn create_file<P: AsRef<Path>>(path: P) -> io::Result<File> {
|
||||
let f = File::create(path)?;
|
||||
println!("created {:?}", f);
|
||||
Ok(f)
|
||||
}
|
||||
|
||||
fn create_and_write<P: AsRef<Path>>(path: P, s: &[u8]) -> io::Result<File> {
|
||||
let mut f = File::create(path)?;
|
||||
f.write_all(s)?;
|
||||
println!("created and wrote to {:?}", f);
|
||||
Ok(f)
|
||||
}
|
||||
|
||||
fn watch_stream(
|
||||
stream: impl Stream<Item = (), Error = ()> + 'static,
|
||||
) -> (Watch<()>, Box<Future<Item = (), Error = ()>>) {
|
||||
let (watch, store) = Watch::new(());
|
||||
// Use a watch so we can start running the stream immediately but also
|
||||
// wait on stream updates.
|
||||
let f = stream
|
||||
.forward(store.sink_map_err(|_| ()))
|
||||
.map(|_| ())
|
||||
.map_err(|_| ());
|
||||
|
||||
(watch, Box::new(f))
|
||||
}
|
||||
|
||||
fn next_change(
|
||||
rt: &mut Runtime,
|
||||
watch: Watch<()>,
|
||||
) -> Result<(Option<()>, Watch<()>), WatchError> {
|
||||
let next = watch.into_future().map_err(|(e, _)| e);
|
||||
// Rust will print a warning if a test runs longer than 60 seconds,
|
||||
// so we'll use that as the timeout after which we'll kill the test
|
||||
// if we don't see a change.
|
||||
rt.block_on_for(Duration::from_secs(60), next)
|
||||
}
|
||||
|
||||
fn test_detects_create(
|
||||
fixture: Fixture,
|
||||
watch: Watch<()>,
|
||||
bg: Box<Future<Item = (), Error = ()>>,
|
||||
) {
|
||||
let Fixture {
|
||||
paths,
|
||||
dir: _dir,
|
||||
mut rt,
|
||||
} = fixture;
|
||||
|
||||
rt.spawn(bg);
|
||||
|
||||
paths.iter().fold(watch, |watch, path| {
|
||||
create_file(path).unwrap();
|
||||
|
||||
let (item, watch) = next_change(&mut rt, watch).unwrap();
|
||||
assert!(item.is_some());
|
||||
watch
|
||||
});
|
||||
}
|
||||
|
||||
fn test_detects_delete_and_recreate(
|
||||
fixture: Fixture,
|
||||
watch: Watch<()>,
|
||||
bg: Box<Future<Item = (), Error = ()>>,
|
||||
) {
|
||||
let Fixture {
|
||||
paths,
|
||||
dir: _dir,
|
||||
mut rt,
|
||||
} = fixture;
|
||||
rt.spawn(bg);
|
||||
|
||||
let watch = paths.iter().fold(watch, |watch, ref path| {
|
||||
create_and_write(path, b"A").unwrap();
|
||||
|
||||
let (item, watch) = next_change(&mut rt, watch).unwrap();
|
||||
assert!(item.is_some());
|
||||
watch
|
||||
});
|
||||
|
||||
let watch = paths.iter().fold(watch, |watch, ref path| {
|
||||
fs::remove_file(path).unwrap();
|
||||
println!("deleted {:?}", path);
|
||||
|
||||
let (item, watch) = next_change(&mut rt, watch).unwrap();
|
||||
assert!(item.is_some());
|
||||
watch
|
||||
});
|
||||
|
||||
paths.iter().fold(watch, |watch, ref path| {
|
||||
create_and_write(path, b"B").unwrap();
|
||||
|
||||
let (item, watch) = next_change(&mut rt, watch).unwrap();
|
||||
assert!(item.is_some());
|
||||
watch
|
||||
});
|
||||
}
|
||||
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
fn test_detects_create_symlink(
|
||||
fixture: Fixture,
|
||||
watch: Watch<()>,
|
||||
bg: Box<Future<Item = (), Error = ()>>,
|
||||
) {
|
||||
let Fixture { paths, dir, mut rt } = fixture;
|
||||
|
||||
let data_path = dir.path().join("data");
|
||||
fs::create_dir(&data_path).unwrap();
|
||||
|
||||
let data_paths = paths
|
||||
.iter()
|
||||
.map(|p| {
|
||||
let path = data_path.clone().join(p.file_name().unwrap());
|
||||
create_file(&path).unwrap();
|
||||
path
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
rt.spawn(bg);
|
||||
|
||||
data_paths
|
||||
.iter()
|
||||
.zip(paths.iter())
|
||||
.fold(watch, |watch, (path, target_path)| {
|
||||
symlink(path, target_path).unwrap();
|
||||
|
||||
let (item, watch) = next_change(&mut rt, watch).unwrap();
|
||||
assert!(item.is_some());
|
||||
watch
|
||||
});
|
||||
}
|
||||
|
||||
// Test for when the watched files are symlinks to a file inside of a
|
||||
// directory which is also a symlink (as is the case with Kubernetes
|
||||
// ConfigMap/Secret volume mounts).
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
fn test_detects_create_double_symlink(
|
||||
fixture: Fixture,
|
||||
watch: Watch<()>,
|
||||
bg: Box<Future<Item = (), Error = ()>>,
|
||||
) {
|
||||
let Fixture { paths, dir, mut rt } = fixture;
|
||||
|
||||
let real_data_path = dir.path().join("real_data");
|
||||
let data_path = dir.path().join("data");
|
||||
fs::create_dir(&real_data_path).unwrap();
|
||||
symlink(&real_data_path, &data_path).unwrap();
|
||||
|
||||
for path in &paths {
|
||||
let path = real_data_path.clone().join(path.file_name().unwrap());
|
||||
create_file(&path).unwrap();
|
||||
}
|
||||
|
||||
// -- Below this point, the watch is running. -----------------------
|
||||
rt.spawn(bg);
|
||||
|
||||
paths.iter().fold(watch, |watch, path| {
|
||||
let file_name = path.file_name().unwrap();
|
||||
symlink(data_path.clone().join(file_name), path).unwrap();
|
||||
|
||||
let (item, watch) = next_change(&mut rt, watch).unwrap();
|
||||
assert!(item.is_some());
|
||||
watch
|
||||
});
|
||||
}
|
||||
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
fn test_detects_modification_symlink(
|
||||
fixture: Fixture,
|
||||
watch: Watch<()>,
|
||||
bg: Box<Future<Item = (), Error = ()>>,
|
||||
) {
|
||||
let Fixture { paths, dir, mut rt } = fixture;
|
||||
|
||||
let data_path = dir.path().join("data");
|
||||
fs::create_dir(&data_path).unwrap();
|
||||
|
||||
let data_paths = paths
|
||||
.iter()
|
||||
.map(|p| {
|
||||
let path = data_path.clone().join(p.file_name().unwrap());
|
||||
path
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let mut data_files = data_paths
|
||||
.iter()
|
||||
.map(|path| create_and_write(path, b"a").unwrap())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
for (path, target_path) in data_paths.iter().zip(paths.iter()) {
|
||||
// Don't assert that events are seen here, as we haven't started
|
||||
// running the watch yet.
|
||||
symlink(path, target_path).unwrap();
|
||||
}
|
||||
|
||||
// -- Below this point, the watch is running. -----------------------
|
||||
rt.spawn(bg);
|
||||
|
||||
data_files.iter_mut().fold(watch, |watch, file| {
|
||||
file.write_all(b"b").unwrap();
|
||||
|
||||
let (item, watch) = next_change(&mut rt, watch).unwrap();
|
||||
assert!(item.is_some());
|
||||
watch
|
||||
});
|
||||
}
|
||||
|
||||
fn test_detects_modification(
|
||||
fixture: Fixture,
|
||||
watch: Watch<()>,
|
||||
bg: Box<Future<Item = (), Error = ()>>,
|
||||
) {
|
||||
let Fixture {
|
||||
paths,
|
||||
dir: _dir,
|
||||
mut rt,
|
||||
} = fixture;
|
||||
|
||||
let mut files = paths
|
||||
.iter()
|
||||
.map(|path| create_and_write(path, b"a").unwrap())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
rt.spawn(bg);
|
||||
|
||||
files.iter_mut().fold(watch, |watch, file| {
|
||||
file.write_all(b"b").unwrap();
|
||||
let (item, watch) = next_change(&mut rt, watch).unwrap();
|
||||
assert!(item.is_some());
|
||||
watch
|
||||
});
|
||||
}
|
||||
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
fn test_detects_modification_double_symlink(
|
||||
fixture: Fixture,
|
||||
watch: Watch<()>,
|
||||
bg: Box<Future<Item = (), Error = ()>>,
|
||||
) {
|
||||
let Fixture { paths, dir, mut rt } = fixture;
|
||||
|
||||
let real_data_path = dir.path().join("real_data");
|
||||
let data_path = dir.path().join("data");
|
||||
fs::create_dir(&real_data_path).unwrap();
|
||||
symlink(&real_data_path, &data_path).unwrap();
|
||||
|
||||
let mut files = paths
|
||||
.iter()
|
||||
.map(|p| {
|
||||
let path = real_data_path.clone().join(p.file_name().unwrap());
|
||||
create_and_write(path, b"a").unwrap()
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
for path in &paths {
|
||||
let file_path = data_path.clone().join(path.file_name().unwrap());
|
||||
// Don't assert that events are seen here, as we haven't started
|
||||
// running the watch yet.
|
||||
symlink(file_path, path).unwrap();
|
||||
}
|
||||
|
||||
// -- Below this point, the watch is running. -----------------------
|
||||
rt.spawn(bg);
|
||||
|
||||
files.iter_mut().fold(watch, |watch, file| {
|
||||
file.write_all(b"b").unwrap();
|
||||
let (item, watch) = next_change(&mut rt, watch).unwrap();
|
||||
assert!(item.is_some());
|
||||
watch
|
||||
});
|
||||
}
|
||||
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
fn test_detects_double_symlink_retargeting(
|
||||
fixture: Fixture,
|
||||
watch: Watch<()>,
|
||||
bg: Box<Future<Item = (), Error = ()>>,
|
||||
) {
|
||||
let Fixture { paths, dir, mut rt } = fixture;
|
||||
|
||||
let real_data_path = dir.path().join("real_data");
|
||||
let real_data_path_2 = dir.path().join("real_data_2");
|
||||
let data_path = dir.path().join("data");
|
||||
fs::create_dir(&real_data_path).unwrap();
|
||||
fs::create_dir(&real_data_path_2).unwrap();
|
||||
symlink(&real_data_path, &data_path).unwrap();
|
||||
|
||||
// Create the first set of files.
|
||||
// We won't assert that any changes are detected until we actually
|
||||
// start the watch.
|
||||
for path in &paths {
|
||||
let path = real_data_path.clone().join(path.file_name().unwrap());
|
||||
create_and_write(path, b"a").unwrap();
|
||||
}
|
||||
|
||||
// Symlink those files into `data_path`
|
||||
for path in &paths {
|
||||
let data_file_path = data_path.clone().join(path.file_name().unwrap());
|
||||
symlink(data_file_path, path).unwrap();
|
||||
}
|
||||
|
||||
// Create the second set of files.
|
||||
for path in &paths {
|
||||
let path = real_data_path_2.clone().join(path.file_name().unwrap());
|
||||
create_and_write(path, b"b").unwrap();
|
||||
}
|
||||
|
||||
// -- Below this point, the watch is running. -----------------------
|
||||
rt.spawn(bg);
|
||||
|
||||
let (item, watch) = next_change(&mut rt, watch).unwrap();
|
||||
assert!(item.is_some());
|
||||
|
||||
fs::remove_dir_all(&data_path).unwrap();
|
||||
symlink(&real_data_path_2, &data_path).unwrap();
|
||||
|
||||
let (item, _) = next_change(&mut rt, watch).unwrap();
|
||||
assert!(item.is_some());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn polling_detects_create() {
|
||||
Fixture::new().test_polling(test_detects_create)
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg(target_os = "linux")]
|
||||
fn inotify_detects_create() {
|
||||
Fixture::new().test_inotify(test_detects_create)
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
fn polling_detects_create_symlink() {
|
||||
Fixture::new().test_polling(test_detects_create_symlink)
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg(target_os = "linux")]
|
||||
fn inotify_detects_create_symlink() {
|
||||
Fixture::new().test_inotify(test_detects_create_symlink)
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
fn polling_detects_create_double_symlink() {
|
||||
Fixture::new().test_polling(test_detects_create_double_symlink)
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg(target_os = "linux")]
|
||||
fn inotify_detects_create_double_symlink() {
|
||||
Fixture::new().test_inotify(test_detects_create_double_symlink)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn polling_detects_modification() {
|
||||
Fixture::new().test_polling(test_detects_modification)
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg(target_os = "linux")]
|
||||
fn inotify_detects_modification() {
|
||||
Fixture::new().test_inotify(test_detects_modification)
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
fn polling_detects_modification_symlink() {
|
||||
Fixture::new().test_polling(test_detects_modification_symlink)
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg(target_os = "linux")]
|
||||
fn inotify_detects_modification_symlink() {
|
||||
Fixture::new().test_inotify(test_detects_modification_symlink)
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
fn polling_detects_modification_double_symlink() {
|
||||
Fixture::new().test_polling(test_detects_modification_double_symlink)
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg(target_os = "linux")]
|
||||
fn inotify_detects_modification_double_symlink() {
|
||||
Fixture::new().test_inotify(test_detects_modification_double_symlink)
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
fn polling_detects_double_symlink_retargeting() {
|
||||
Fixture::new().test_polling(test_detects_double_symlink_retargeting)
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg(target_os = "linux")]
|
||||
fn inotify_detects_double_symlink_retargeting() {
|
||||
Fixture::new().test_inotify(test_detects_double_symlink_retargeting)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn polling_detects_delete_and_recreate() {
|
||||
Fixture::new().test_polling(test_detects_delete_and_recreate)
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg(target_os = "linux")]
|
||||
fn inotify_detects_delete_and_recreate() {
|
||||
Fixture::new().test_inotify(test_detects_delete_and_recreate)
|
||||
}
|
||||
|
||||
}
|
|
@ -30,6 +30,9 @@ extern crate prost_types;
|
|||
extern crate quickcheck;
|
||||
extern crate rand;
|
||||
extern crate regex;
|
||||
extern crate ring;
|
||||
#[cfg(test)]
|
||||
extern crate tempdir;
|
||||
extern crate tokio;
|
||||
extern crate tokio_connect;
|
||||
extern crate tower_balance;
|
||||
|
@ -72,10 +75,12 @@ pub mod convert;
|
|||
pub mod ctx;
|
||||
mod dns;
|
||||
mod drain;
|
||||
pub mod fs_watch;
|
||||
mod inbound;
|
||||
mod logging;
|
||||
mod map_err;
|
||||
mod outbound;
|
||||
pub mod stream;
|
||||
pub mod task;
|
||||
pub mod telemetry;
|
||||
mod transparency;
|
||||
|
|
|
@ -0,0 +1,43 @@
|
|||
use futures::{Poll, Stream};
|
||||
|
||||
/// Like `futures::future::Either` but for `Stream`s.
|
||||
///
|
||||
/// Combines two different `Stream`s yielding the same item and error
|
||||
/// types into a single type.
|
||||
///
|
||||
// TODO: This is probably useful outside of Conduit as well. Perhaps it
|
||||
// deserves to be in a library...
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum Either<A, B> {
|
||||
A(A),
|
||||
B(B),
|
||||
}
|
||||
|
||||
// We could implement this function using the exact same code as
|
||||
// `futures::future::Either`, if we needed it. I've commented it
|
||||
// out, as we currently don't need it.
|
||||
// impl<T, A, B> Either<(T, A), (T, B)> {
|
||||
// /// Splits out the homogeneous type from an either of tuples.
|
||||
// pub fn split(self) -> (T, Either<A, B>) {
|
||||
// match self {
|
||||
// Either::A((a, b)) => (a, Either::A(b)),
|
||||
// Either::B((a, b)) => (a, Either::B(b)),
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
impl<A, B> Stream for Either<A, B>
|
||||
where
|
||||
A: Stream,
|
||||
B: Stream<Item = A::Item, Error = A::Error>,
|
||||
{
|
||||
type Item = A::Item;
|
||||
type Error = A::Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<A::Item>, A::Error> {
|
||||
match *self {
|
||||
Either::A(ref mut a) => a.poll(),
|
||||
Either::B(ref mut b) => b.poll(),
|
||||
}
|
||||
}
|
||||
}
|
|
@ -263,3 +263,50 @@ impl StdError for Error {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub mod test_util {
|
||||
use futures::Future;
|
||||
use tokio::{
|
||||
runtime::current_thread,
|
||||
timer,
|
||||
};
|
||||
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
/// A trait that allows an executor to execute a future for up to a given
|
||||
/// time limit, and then panics if the future has not finished.
|
||||
///
|
||||
/// This is intended for use in cases where the failure mode of some future
|
||||
/// is to wait forever, rather than returning an error. When this happens,
|
||||
/// it can make debugging test failures much more difficult, as killing
|
||||
/// the tests when one has been waiting for over a minute prevents any
|
||||
/// remaining tests from running, and doesn't print any output from the
|
||||
/// killed test.
|
||||
pub trait BlockOnFor {
|
||||
/// Runs the provided future for up to `timeout`, blocking the thread
|
||||
/// until the future completes.
|
||||
fn block_on_for<F>(&mut self, timeout: Duration, f: F) -> Result<F::Item, F::Error>
|
||||
where
|
||||
F: Future;
|
||||
}
|
||||
|
||||
impl BlockOnFor for current_thread::Runtime {
|
||||
fn block_on_for<F>(&mut self, timeout: Duration, f: F) -> Result<F::Item, F::Error>
|
||||
where
|
||||
F: Future
|
||||
{
|
||||
let f = timer::Deadline::new(f, Instant::now() + timeout);
|
||||
match self.block_on(f) {
|
||||
Ok(item) => Ok(item),
|
||||
Err(e) => if e.is_inner() {
|
||||
return Err(e.into_inner().unwrap());
|
||||
} else if e.is_timer() {
|
||||
panic!("timer error: {}", e.into_timer().unwrap());
|
||||
} else {
|
||||
panic!("assertion failed: future did not finish within {:?}", timeout);
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,11 +6,11 @@ use std::{
|
|||
use super::{
|
||||
config,
|
||||
|
||||
ring::{self, rand, signature},
|
||||
rustls,
|
||||
untrusted,
|
||||
webpki,
|
||||
};
|
||||
use ring::{self, rand, signature};
|
||||
|
||||
pub struct CertResolver {
|
||||
certified_key: rustls::sign::CertifiedKey,
|
||||
|
|
|
@ -3,7 +3,7 @@ use std::{
|
|||
io::{self, Cursor, Read},
|
||||
path::PathBuf,
|
||||
sync::Arc,
|
||||
time::{Duration, Instant, SystemTime,},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use super::{
|
||||
|
@ -16,7 +16,6 @@ use super::{
|
|||
|
||||
use futures::{future, Future, Stream};
|
||||
use futures_watch::Watch;
|
||||
use tokio::timer::Interval;
|
||||
|
||||
/// Not-yet-validated settings that are used for both TLS clients and TLS
|
||||
/// servers.
|
||||
|
@ -67,8 +66,6 @@ pub enum Error {
|
|||
EndEntityCertIsNotValid(webpki::Error),
|
||||
InvalidPrivateKey,
|
||||
TimeConversionFailed,
|
||||
#[cfg(target_os = "linux")]
|
||||
InotifyInit(io::Error),
|
||||
}
|
||||
|
||||
impl CommonSettings {
|
||||
|
@ -85,149 +82,20 @@ impl CommonSettings {
|
|||
/// The returned stream consists of each subsequent successfully loaded
|
||||
/// `CommonSettings` after each change. If the settings could not be
|
||||
/// reloaded (i.e., they were malformed), nothing is sent.
|
||||
fn stream_changes(self, interval: Duration)
|
||||
pub fn stream_changes(self, interval: Duration)
|
||||
-> impl Stream<Item = CommonConfig, Error = ()>
|
||||
{
|
||||
// If we're on Linux, first atttempt to start an Inotify watch on the
|
||||
// paths. If this fails, fall back to polling the filesystem.
|
||||
#[cfg(target_os = "linux")]
|
||||
let changes: Box<Stream<Item = (), Error = ()> + Send> =
|
||||
match self.stream_changes_inotify() {
|
||||
Ok(s) => Box::new(s),
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"inotify init error: {:?}, falling back to polling",
|
||||
e
|
||||
);
|
||||
Box::new(self.stream_changes_polling(interval))
|
||||
},
|
||||
};
|
||||
|
||||
// If we're not on Linux, we can't use inotify, so simply poll the fs.
|
||||
// TODO: Use other FS events APIs (such as `kqueue`) as well, when
|
||||
// they're available.
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
let changes = self.stream_changes_polling(interval);
|
||||
|
||||
changes.filter_map(move |_|
|
||||
CommonConfig::load_from_disk(&self)
|
||||
.map_err(|e| warn!("error reloading TLS config: {:?}, falling back", e))
|
||||
.ok()
|
||||
)
|
||||
|
||||
}
|
||||
|
||||
/// Stream changes by polling the filesystem.
|
||||
///
|
||||
/// This will poll the filesystem for changes to the files at the paths
|
||||
/// described by this `CommonSettings` every `interval`, and attempt to
|
||||
/// load a new `CommonConfig` from the files again after each change.
|
||||
///
|
||||
/// This is used on operating systems other than Linux, or on Linux if
|
||||
/// our attempt to use `inotify` failed.
|
||||
fn stream_changes_polling(&self, interval: Duration)
|
||||
-> impl Stream<Item = (), Error = ()>
|
||||
{
|
||||
fn last_modified(path: &PathBuf) -> Option<SystemTime> {
|
||||
// We have to canonicalize the path _every_ time we poll the fs,
|
||||
// rather than once when we start watching, because if it's a
|
||||
// symlink, the target may change. If that happened, and we
|
||||
// continued watching the original canonical path, we wouldn't see
|
||||
// any subsequent changes to the new symlink target.
|
||||
path.canonicalize()
|
||||
.and_then(|canonical| {
|
||||
trace!("last_modified: {:?} -> {:?}", path, canonical);
|
||||
canonical.symlink_metadata()
|
||||
.and_then(|meta| meta.modified())
|
||||
})
|
||||
.map_err(|e| if e.kind() != io::ErrorKind::NotFound {
|
||||
// Don't log if the files don't exist, since this
|
||||
// makes the logs *quite* noisy.
|
||||
warn!("error reading metadata for {:?}: {}", path, e)
|
||||
})
|
||||
.ok()
|
||||
}
|
||||
|
||||
let paths = self.paths().iter()
|
||||
.map(|&p| p.clone())
|
||||
.collect::<Vec<PathBuf>>();
|
||||
|
||||
let mut max: Option<SystemTime> = None;
|
||||
|
||||
Interval::new(Instant::now(), interval)
|
||||
.map_err(|e| error!("timer error: {:?}", e))
|
||||
.collect::<Vec<_>>();
|
||||
::fs_watch::stream_changes(paths, interval)
|
||||
.filter_map(move |_| {
|
||||
for path in &paths {
|
||||
let t = last_modified(path);
|
||||
if t > max {
|
||||
max = t;
|
||||
trace!("{:?} changed at {:?}", path, t);
|
||||
return Some(());
|
||||
}
|
||||
}
|
||||
None
|
||||
CommonConfig::load_from_disk(&self)
|
||||
.map_err(|e| warn!("error reloading TLS config: {:?}, falling back", e))
|
||||
.ok()
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
fn stream_changes_inotify(&self)
|
||||
-> Result<impl Stream<Item = (), Error = ()>, Error>
|
||||
{
|
||||
use std::{collections::HashSet, path::Path};
|
||||
use inotify::{Inotify, WatchMask};
|
||||
|
||||
// Use a broad watch mask so that we will pick up any events that might
|
||||
// indicate a change to the watched files.
|
||||
//
|
||||
// Such a broad mask may lead to reloading certs multiple times when k8s
|
||||
// modifies a ConfigMap or Secret, which is a multi-step process that we
|
||||
// see as a series CREATE, MOVED_TO, MOVED_FROM, and DELETE events.
|
||||
// However, we want to catch single events that might occur when the
|
||||
// files we're watching *don't* live in a k8s ConfigMap/Secret.
|
||||
let mask = WatchMask::CREATE
|
||||
| WatchMask::MODIFY
|
||||
| WatchMask::DELETE
|
||||
| WatchMask::MOVE
|
||||
;
|
||||
let mut inotify = Inotify::init().map_err(Error::InotifyInit)?;
|
||||
|
||||
let paths = self.paths();
|
||||
let paths = paths.into_iter()
|
||||
.map(|path| {
|
||||
// If the path to watch has a parent, watch that instead. This
|
||||
// will allow us to pick up events to files in k8s ConfigMaps
|
||||
// or Secrets (which we wouldn't detect if we watch the file
|
||||
// itself, as they are double-symlinked).
|
||||
//
|
||||
// This may also result in some false positives (if a file we
|
||||
// *don't* care about in the same dir changes, we'll still
|
||||
// reload), but that's unlikely to be a problem.
|
||||
let parent = path
|
||||
.parent()
|
||||
.map(Path::to_path_buf)
|
||||
.unwrap_or(path.to_path_buf());
|
||||
trace!("will watch {:?} for {:?}", parent, path);
|
||||
path
|
||||
})
|
||||
// Collect the paths into a `HashSet` eliminates any duplicates, to
|
||||
// conserve the number of inotify watches we create.
|
||||
.collect::<HashSet<_>>();
|
||||
|
||||
for path in paths {
|
||||
inotify.add_watch(path, mask)
|
||||
.map_err(|e| Error::Io(path.to_path_buf(), e))?;
|
||||
trace!("inotify: watch {:?}", path);
|
||||
}
|
||||
|
||||
let events = inotify.into_event_stream()
|
||||
.map(|ev| {
|
||||
trace!("inotify: event={:?}; path={:?};", ev.mask, ev.name);
|
||||
})
|
||||
.map_err(|e| error!("inotify watch error: {}", e));
|
||||
trace!("started inotify watch");
|
||||
|
||||
Ok(events)
|
||||
}
|
||||
}
|
||||
|
||||
impl CommonConfig {
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
// These crates are only used within the `tls` module.
|
||||
extern crate ring;
|
||||
extern crate rustls;
|
||||
extern crate tokio_rustls;
|
||||
extern crate untrusted;
|
||||
|
|
Loading…
Reference in New Issue