From ae8b7d739156ab4b3a0aea763f2f5325a38c7271 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Tue, 26 Jun 2018 15:56:21 -0700 Subject: [PATCH] proxy: Fix false positives in polling-based fs watches (#1140) There are currently two issues which can lead to false positives (changes being reported when files have not actually changed) in the polling-based filesystem watch implementation. The first issue is that when checking each watched file for changes, the loop iterating over each path currently short-circuits as soon as it detects a change. This means that if two or more files have changed, the first time we poll the fs, we will see the first change, then if we poll again, we will see the next change, and so on. This branch fixes that issue by always hashing all the watched files, even if a change has already been detected. This way, if all the files change between one poll and the next, we no longer generate additional change events until a file actually changes again. The other issue is that the old implementation would treat any instance of a "file not found" error as indicating that the file had been deleted, and generate a change event. This leads to changes repeatedly being detected as long as a file does not exist, rather than a single time when the file's existence state actually changes. This branch fixes that issue as well, by only generating change events on "file not found" errors if the file existed the last time it was polled. Otherwise, if a file did not previously exist, we no longer generate a new event. I've verified both of these fixes through manual testing, as well as a new test for the second issue. The new test fails on master but passes on this branch. 
Signed-off-by: Eliza Weisman --- Cargo.lock | 1 + proxy/Cargo.toml | 1 + proxy/src/fs_watch.rs | 169 +++++++++++++++++++++++++++++++++--------- proxy/src/lib.rs | 1 + 4 files changed, 139 insertions(+), 33 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ef03c41ab..8392c536d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -155,6 +155,7 @@ dependencies = [ "tokio-io 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-rustls 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-signal 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-timer 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)", "tower-balance 0.1.0 (git+https://github.com/tower-rs/tower)", "tower-buffer 0.1.0 (git+https://github.com/tower-rs/tower)", "tower-discover 0.1.0 (git+https://github.com/tower-rs/tower)", diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index 37d7f0633..236897d56 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -37,6 +37,7 @@ regex = "1.0.0" # networking tokio = "0.1.7" tokio-signal = "0.2" +tokio-timer = "0.2.4" # for tokio_timer::clock tokio-connect = { git = "https://github.com/carllerche/tokio-connect" } tower-balance = { git = "https://github.com/tower-rs/tower" } tower-buffer = { git = "https://github.com/tower-rs/tower" } diff --git a/proxy/src/fs_watch.rs b/proxy/src/fs_watch.rs index 5a548612f..0be90133b 100644 --- a/proxy/src/fs_watch.rs +++ b/proxy/src/fs_watch.rs @@ -1,9 +1,9 @@ -use std::{fs, io, cell::RefCell, path::{Path, PathBuf}, time::{Duration, Instant}}; +use std::{fs, io, cell::RefCell, path::{Path, PathBuf}, time::Duration}; use futures::Stream; use ring::digest::{self, Digest}; -use tokio::timer::Interval; +use tokio_timer::{clock, Interval}; /// Stream changes to the files at a group of paths. 
pub fn stream_changes(paths: I, interval: Duration) -> impl Stream @@ -46,31 +46,24 @@ where { let files = paths.into_iter().map(PathAndHash::new).collect::>(); - Interval::new(Instant::now(), interval) + Interval::new(clock::now(), interval) .map_err(|e| error!("timer error: {:?}", e)) - .filter_map(move |_| { + .filter(move |_| { + let mut any_changes = false; for file in &files { - match file.has_changed() { - Ok(true) => { - trace!("{:?} changed", &file.path); - return Some(()); - } - Ok(false) => { - // If the hash hasn't changed, keep going. - } - Err(ref e) if e.kind() == io::ErrorKind::NotFound => { - // A file not found error indicates that the file - // has been deleted. - trace!("{:?} deleted", &file.path); - return Some(()); - } - Err(ref e) => { + let has_changed = file + .update_and_check() + .unwrap_or_else(|e| { warn!("error hashing {:?}: {}", &file.path, e); - } + false + }); + if has_changed { + any_changes = true; } } - None + any_changes }) + .map(|_| ()) } #[cfg(target_os = "linux")] @@ -110,26 +103,62 @@ struct PathAndHash { path: PathBuf, /// The last SHA-384 digest of the file, if we have previously hashed it. - last_hash: RefCell>, + curr_hash: RefCell>, } impl PathAndHash { fn new>(path: P) -> Self { Self { path: path.as_ref().to_path_buf(), - last_hash: RefCell::new(None), + curr_hash: RefCell::new(None), } } - fn has_changed(&self) -> io::Result { - let contents = fs::read(&self.path)?; - let hash = Some(digest::digest(&digest::SHA256, &contents[..])); - let changed = self.last_hash.borrow().as_ref().map(Digest::as_ref) - != hash.as_ref().map(Digest::as_ref); - if changed { - self.last_hash.replace(hash); + /// Attempts to update the hash for this file and checks if it has changed. + /// + /// # Returns + /// - `Ok(true)` if the file was read successfully and the hash has changed. + /// - `Ok(false)` if we were able to read the file but the hash has not + /// changed. + /// - `Err(io::Error)` if an error occurred reading the file. 
+ fn update_and_check(&self) -> io::Result { + match fs::read(&self.path) { + Ok(contents) => { + // If we were able to read the file, compare the hash of its + // current contents with the previous hash to determine if it + // has changed. + let curr_hash = Some(digest::digest(&digest::SHA256, &contents[..])); + let changed = { + // We can't compare `ring::Digest`s directly, so we have to + // borrow the hashes as byte slices to compare them. + let prev_hash = self.curr_hash.borrow(); + let prev_hash_bytes = prev_hash.as_ref().map(Digest::as_ref); + let curr_hash_bytes = curr_hash.as_ref().map(Digest::as_ref); + prev_hash_bytes != curr_hash_bytes + }; + if changed { + trace!("{:?} changed", &self.path); + self.curr_hash.replace(curr_hash); + } + Ok(changed) + }, + Err(ref e) if e.kind() == io::ErrorKind::NotFound => { + if self.curr_hash.borrow().is_some() { + // If we have a previous hash, then the file was deleted, + // so it has changed. + trace!("{:?} deleted", &self.path); + self.curr_hash.replace(None); + Ok(true) + } else { + // Otherwise, it didn't exist previously, so there hasn't + // been a change. + Ok(false) + } + }, + // Propagate any other errors. 
+ Err(e) => Err(e), } - Ok(changed) + } } @@ -231,12 +260,18 @@ mod tests { use tempdir::TempDir; use tokio::runtime::current_thread::Runtime; + use tokio_timer::{clock, Interval}; #[cfg(not(target_os = "windows"))] use std::os::unix::fs::symlink; - use std::{fs::{self, File}, io::Write, path::Path, time::Duration}; + use std::{ + fs::{self, File}, + io::Write, + path::Path, + time::Duration, + }; - use futures::{Future, Sink, Stream}; + use futures::{future, Future, Sink, Stream}; use futures_watch::{Watch, WatchError}; struct Fixture { @@ -373,6 +408,64 @@ mod tests { }); } + fn test_nonexistent_files_dont_file_delete_events( + fixture: Fixture, + watch: Watch<()>, + bg: Box>, + ) { + // This test confirms that when a file has been deleted, + // its nonexistence doesn't continuously generate new + // "file deleted" events. + let Fixture { + paths, + dir: _dir, + mut rt, + } = fixture; + + rt.spawn(bg); + + let watch = paths.iter().fold(watch, |watch, ref path| { + create_and_write(path, b"A").unwrap(); + + let (item, watch) = next_change(&mut rt, watch).unwrap(); + assert!(item.is_some()); + watch + }); + + let watch = paths.iter().fold(watch, |watch, ref path| { + fs::remove_file(path).unwrap(); + println!("deleted {:?}", path); + + let (item, watch) = next_change(&mut rt, watch).unwrap(); + assert!(item.is_some()); + watch + }); + + let watch2 = watch.clone(); + // Check if the stream has become ready every one second. + let stream = Interval::new(clock::now(), Duration::from_secs(1)) + .map(|_| { + let mut watch = watch2.clone(); + // The stream should not be ready, since the file + // system hasn't changed yet. 
+ assert!(!watch.poll().unwrap().is_ready()); + () + }) + .take(5) + .fold((), |_, ()| future::ok(())); + + rt.block_on(stream).unwrap(); + + // Creating the files again should generate a new event + paths.iter().fold(watch, |watch, ref path| { + create_and_write(path, b"B").unwrap(); + + let (item, watch) = next_change(&mut rt, watch).unwrap(); + assert!(item.is_some()); + watch + }); + } + #[cfg(not(target_os = "windows"))] fn test_detects_create_symlink( fixture: Fixture, @@ -689,4 +782,14 @@ mod tests { Fixture::new().test_inotify(test_detects_delete_and_recreate) } + #[test] + fn polling_nonexistent_files_dont_file_delete_events() { + Fixture::new().test_polling(test_nonexistent_files_dont_file_delete_events) + } + + #[test] + #[cfg(target_os = "linux")] + fn inotify_nonexistent_files_dont_file_delete_events() { + Fixture::new().test_inotify(test_nonexistent_files_dont_file_delete_events) + } } diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index 388195ba8..e9d0c2f0c 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -37,6 +37,7 @@ extern crate ring; extern crate tempdir; extern crate tokio; extern crate tokio_connect; +extern crate tokio_timer; extern crate tower_balance; extern crate tower_buffer; extern crate tower_discover;