From f931dec3b387a2ec3eff511a866cdd7df97f29f1 Mon Sep 17 00:00:00 2001 From: Brian Smith Date: Thu, 29 Mar 2018 16:50:08 -1000 Subject: [PATCH] Proxy: Completely replace current set of destinations on reconnect (#632) Previously, when the proxy was disconnected from the Destination service and then reconnected, the proxy would not forget old, outdated entries in its cache of endpoints. If those endpoints had been removed while the proxy was disconnected then the proxy would never become aware of that. Instead, on the first message after a reconnection, replace the entire set of cached entries with the new set, which may be empty. Prior to this change, the new test outbound_destinations_reset_on_reconnect_followed_by_no_endpoints_exists passed already but outbound_destinations_reset_on_reconnect_followed_by_add_none and outbound_destinations_reset_on_reconnect_followed_by_remove_none failed. Now all these tests pass. Fixes #573 Signed-off-by: Brian Smith --- proxy/src/control/discovery.rs | 284 ++++++++++++++++++++++++------ proxy/tests/discovery.rs | 73 ++++++++ proxy/tests/support/controller.rs | 28 +++ 3 files changed, 332 insertions(+), 53 deletions(-) diff --git a/proxy/src/control/discovery.rs b/proxy/src/control/discovery.rs index 49d050377..26f6ee63a 100644 --- a/proxy/src/control/discovery.rs +++ b/proxy/src/control/discovery.rs @@ -1,3 +1,4 @@ +use std; use std::collections::{HashSet, VecDeque}; use std::collections::hash_map::{Entry, HashMap}; use std::net::SocketAddr; @@ -55,7 +56,7 @@ pub struct DiscoveryWork> { } struct DestinationSet> { - addrs: Exists>, + addrs: Exists>, query: DestinationServiceQuery, txs: Vec>, } @@ -66,6 +67,20 @@ enum Exists { No, // Affirmatively known to not exist. } +/// A cache that supports incremental updates with lazy resetting on +/// invalidation. +/// +/// When the cache `c` initially becomes invalid (i.e. 
it becomes +/// potentially out of sync with the data source so that incremental updates +/// would stop working), call `c.reset_on_next_modification()`; the next +/// incremental update will then replace the entire contents of the cache, +/// instead of incrementally augmenting it. Until that next modification, +/// however, the stale contents of the cache will be made available. +struct Cache { + values: HashSet, + reset_on_next_modification: bool, +} + impl Exists { fn take(&mut self) -> Exists { mem::replace(self, Exists::Unknown) @@ -275,8 +290,8 @@ where // we may already know of some addresses here, so push // them onto the new watch first match set.addrs { - Exists::Yes(ref mut addrs) => { - for &addr in addrs.iter() { + Exists::Yes(ref cache) => { + for &addr in cache.values.iter() { tx.unbounded_send(Update::Insert(addr)) .expect("unbounded_send does not fail"); } @@ -365,6 +380,7 @@ where }; if needs_reconnect { set.query = DestinationServiceQuery::NeedsReconnect; + set.reset_on_next_modification(); self.reconnects.push_back(FullyQualifiedAuthority::clone(auth)); } } @@ -391,79 +407,154 @@ impl> Destination // ===== impl DestinationSet ===== impl > DestinationSet { - fn add(&mut self, authority_for_logging: &FullyQualifiedAuthority, addrs_to_add: Addrs) - where Addrs: Iterator - { - let mut addrs = match self.addrs.take() { - Exists::Yes(addrs) => addrs, - Exists::Unknown | Exists::No => { - trace!("adding entries for {:?} that wasn't known to exist. 
Now assuming it does.", - authority_for_logging); - HashSet::new() + fn reset_on_next_modification(&mut self) { + match self.addrs { + Exists::Yes(ref mut cache) => { + cache.reset_on_next_modification = true; }, - }; - for addr in addrs_to_add { - if addrs.insert(addr) { - trace!("update {:?} for {:?}", addr, authority_for_logging); - // retain is used to drop any senders that are dead - self.txs.retain(|tx| { - tx.unbounded_send(Update::Insert(addr)).is_ok() - }); - } + Exists::No | + Exists::Unknown => (), } - self.addrs = Exists::Yes(addrs); } - fn remove(&mut self, authority_for_logging: &FullyQualifiedAuthority, - addrs_to_remove: Addrs) - where Addrs: Iterator + fn add(&mut self, authority_for_logging: &FullyQualifiedAuthority, addrs_to_add: A) + where A: Iterator { - let addrs = match self.addrs.take() { - Exists::Yes(mut addrs) => { - for addr in addrs_to_remove { - if addrs.remove(&addr) { - self.notify_of_removal(addr, authority_for_logging) - } - } - addrs - }, - Exists::Unknown | Exists::No => { - trace!("remove addresses for {:?} that wasn't known to exist. 
Now assuming it does.", - authority_for_logging); - HashSet::new() - }, + let mut cache = match self.addrs.take() { + Exists::Yes(mut cache) => cache, + Exists::Unknown | Exists::No => Cache::new(), }; - self.addrs = Exists::Yes(addrs) + cache.extend( + addrs_to_add, + &mut |addr, change| Self::on_change(&mut self.txs, authority_for_logging, addr, + change)); + self.addrs = Exists::Yes(cache); + } + + fn remove(&mut self, authority_for_logging: &FullyQualifiedAuthority, addrs_to_remove: A) + where A: Iterator + { + let cache = match self.addrs.take() { + Exists::Yes(mut cache) => { + cache.remove( + addrs_to_remove, + &mut |addr, change| Self::on_change(&mut self.txs, authority_for_logging, addr, + change)); + cache + }, + Exists::Unknown | Exists::No => Cache::new(), + }; + self.addrs = Exists::Yes(cache); } fn no_endpoints(&mut self, authority_for_logging: &FullyQualifiedAuthority, exists: bool) { trace!("no endpoints for {:?} that is known to {}", authority_for_logging, - if exists { "exist"} else { "not exist"}); + if exists { "exist" } else { "not exist" }); match self.addrs.take() { - Exists::Yes(addrs) => { - for addr in addrs { - self.notify_of_removal(addr, authority_for_logging) - } + Exists::Yes(mut cache) => { + cache.clear( + &mut |addr, change| Self::on_change(&mut self.txs, authority_for_logging, addr, + change)); }, - Exists::No | Exists::Unknown => {}, - } + Exists::Unknown | Exists::No => (), + }; self.addrs = if exists { - Exists::Yes(HashSet::new()) + Exists::Yes(Cache::new()) } else { Exists::No }; } - fn notify_of_removal(&mut self, addr: SocketAddr, - authority_for_logging: &FullyQualifiedAuthority) { - trace!("remove {:?} for {:?}", addr, authority_for_logging); + fn on_change(txs: &mut Vec>, + authority_for_logging: &FullyQualifiedAuthority, + addr: SocketAddr, + change: CacheChange) { + let (update_str, update_constructor): (&'static str, fn(SocketAddr) -> Update) = + match change { + CacheChange::Insertion => ("insert", Update::Insert), + 
CacheChange::Removal => ("remove", Update::Remove), + }; + trace!("{} {:?} for {:?}", update_str, addr, authority_for_logging); // retain is used to drop any senders that are dead - self.txs.retain(|tx| { - tx.unbounded_send(Update::Remove(addr)).is_ok() + txs.retain(|tx| { + tx.unbounded_send(update_constructor(addr)).is_ok() }); } } +// ===== impl Cache ===== + +enum CacheChange { + Insertion, + Removal, +} + +impl Cache where T: Clone + Copy + Eq + std::hash::Hash { + fn new() -> Self { + Cache { + values: HashSet::new(), + reset_on_next_modification: true, + } + } + + fn extend(&mut self, iter: I, on_change: &mut F) + where I: Iterator, + F: FnMut(T, CacheChange), + { + fn extend_inner(values: &mut HashSet, iter: I, on_change: &mut F) + where T: Copy + Eq + std::hash::Hash, I: Iterator, F: FnMut(T, CacheChange) + { + for value in iter { + if values.insert(value) { + on_change(value, CacheChange::Insertion); + } + } + } + + if !self.reset_on_next_modification { + extend_inner(&mut self.values, iter, on_change); + } else { + let to_insert = iter.collect::>(); + extend_inner(&mut self.values, to_insert.iter().map(|value| *value), on_change); + self.retain(&to_insert, on_change); + } + self.reset_on_next_modification = false; + } + + fn remove(&mut self, iter: I, on_change: &mut F) + where I: Iterator, + F: FnMut(T, CacheChange) + { + if !self.reset_on_next_modification { + for value in iter { + if self.values.remove(&value) { + on_change(value, CacheChange::Removal); + } + } + } else { + self.clear(on_change); + } + self.reset_on_next_modification = false; + } + + fn clear(&mut self, on_change: &mut F) where F: FnMut(T, CacheChange) { + self.retain(&HashSet::new(), on_change) + } + + fn retain(&mut self, to_retain: &HashSet, mut on_change: F) + where F: FnMut(T, CacheChange) + { + self.values.retain(|value| { + let retain = to_retain.contains(&value); + if !retain { + on_change(*value, CacheChange::Removal) + } + retain + }); + self.reset_on_next_modification = 
false; + } +} + // ===== impl Bind ===== impl Bind for F @@ -560,3 +651,90 @@ fn pb_to_sock_addr(pb: TcpAddress) -> Option { None => None, } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn cache_extend_reset_on_next_modification() { + let original_values = [1, 2, 3, 4].iter().cloned().collect::>(); + + // One original value, one new value. + let new_values = [3, 5].iter().cloned().collect::>(); + + { + let mut cache = Cache { + values: original_values.clone(), + reset_on_next_modification: true, + }; + cache.extend(new_values.iter().cloned(), &mut |_, _| ()); + assert_eq!(&cache.values, &new_values); + assert_eq!(cache.reset_on_next_modification, false); + } + + { + let mut cache = Cache { + values: original_values.clone(), + reset_on_next_modification: false, + }; + cache.extend(new_values.iter().cloned(), &mut |_, _| ()); + assert_eq!(&cache.values, + &[1, 2, 3, 4, 5].iter().cloned().collect::>()); + assert_eq!(cache.reset_on_next_modification, false); + } + } + + #[test] + fn cache_remove_reset_on_next_modification() { + let original_values = [1, 2, 3, 4].iter().cloned().collect::>(); + + // One original value, one new value. 
+ let to_remove = [3, 5].iter().cloned().collect::>(); + + { + let mut cache = Cache { + values: original_values.clone(), + reset_on_next_modification: true, + }; + cache.remove(to_remove.iter().cloned(), &mut |_, _| ()); + assert_eq!(&cache.values, &HashSet::new()); + assert_eq!(cache.reset_on_next_modification, false); + } + + { + let mut cache = Cache { + values: original_values.clone(), + reset_on_next_modification: false, + }; + cache.remove(to_remove.iter().cloned(), &mut |_, _| ()); + assert_eq!(&cache.values, &[1, 2, 4].iter().cloned().collect::>()); + assert_eq!(cache.reset_on_next_modification, false); + } + } + + #[test] + fn cache_clear_reset_on_next_modification() { + let original_values = [1, 2, 3, 4].iter().cloned().collect::>(); + + { + let mut cache = Cache { + values: original_values.clone(), + reset_on_next_modification: true, + }; + cache.clear(&mut |_, _| ()); + assert_eq!(&cache.values, &HashSet::new()); + assert_eq!(cache.reset_on_next_modification, false); + } + + { + let mut cache = Cache { + values: original_values.clone(), + reset_on_next_modification: false, + }; + cache.clear(&mut |_, _| ()); + assert_eq!(&cache.values, &HashSet::new()); + assert_eq!(cache.reset_on_next_modification, false); + } + } +} diff --git a/proxy/tests/discovery.rs b/proxy/tests/discovery.rs index e730a21d3..e54355f61 100644 --- a/proxy/tests/discovery.rs +++ b/proxy/tests/discovery.rs @@ -3,6 +3,8 @@ use self::support::*; macro_rules! generate_tests { (server: $make_server:path, client: $make_client:path) => { + use conduit_proxy_controller_grpc as pb; + #[test] fn outbound_asks_controller_api() { let _ = env_logger::try_init(); @@ -32,6 +34,77 @@ macro_rules! 
generate_tests { assert_eq!(client.get("/recon"), "nect"); } + #[test] + #[cfg_attr(not(feature = "flaky_tests"), ignore)] + fn outbound_destinations_reset_on_reconnect_followed_by_no_endpoints_exists() { + outbound_destinations_reset_on_reconnect(move || { + Some(controller::destination_exists_with_no_endpoints()) + }) + } + + #[test] + #[cfg_attr(not(feature = "flaky_tests"), ignore)] + fn outbound_destinations_reset_on_reconnect_followed_by_add_none() { + outbound_destinations_reset_on_reconnect(move || { + Some(controller::destination_add_none()) + }) + } + + #[test] + #[cfg_attr(not(feature = "flaky_tests"), ignore)] + fn outbound_destinations_reset_on_reconnect_followed_by_remove_none() { + outbound_destinations_reset_on_reconnect(move || { + Some(controller::destination_remove_none()) + }) + } + + fn outbound_destinations_reset_on_reconnect(f: F) + where F: Fn() -> Option + Send + 'static + { + use std::thread; + let _ = env_logger::try_init(); + let mut env = config::TestEnv::new(); + + // set the bind timeout to 100 ms. + env.put(config::ENV_BIND_TIMEOUT, "100".to_owned()); + + let srv = $make_server().route("/", "hello").run(); + let ctrl = controller::new() + .destination("initially-exists.ns.svc.cluster.local", srv.addr) + .destination_close("trigger-close.ns.svc.cluster.local") + .destination_fn("initially-exists.ns.svc.cluster.local", f) + .run(); + + let proxy = proxy::new() + .controller(ctrl) + .outbound(srv) + .run_with_test_env(env); + + let initially_exists = + $make_client(proxy.outbound, "initially-exists.ns.svc.cluster.local"); + assert_eq!(initially_exists.get("/"), "hello"); + + // Try to access a different server which will trigger the `destination_close()` + // above. 
+ { + let trigger_close = + $make_client(proxy.outbound, "trigger-close.ns.svc.cluster.local"); + let mut req = trigger_close.request_builder("/"); + let rsp = trigger_close.request(req.method("GET")); + // the request should time out + assert_eq!(rsp.status(), http::StatusCode::INTERNAL_SERVER_ERROR); + } + + // Wait for the reconnect to happen. TODO: Replace this flaky logic. + thread::sleep(Duration::from_millis(1000)); + + // This will time out since there are no endpoints. + let mut req = initially_exists.request_builder("/"); + let rsp = initially_exists.request(req.method("GET")); + // the request should time out + assert_eq!(rsp.status(), http::StatusCode::INTERNAL_SERVER_ERROR); + } + #[test] #[cfg_attr(not(feature = "flaky_tests"), ignore)] fn outbound_times_out() { diff --git a/proxy/tests/support/controller.rs b/proxy/tests/support/controller.rs index 220b47453..687d5de7a 100644 --- a/proxy/tests/support/controller.rs +++ b/proxy/tests/support/controller.rs @@ -288,6 +288,34 @@ pub fn destination_update(addr: SocketAddr) -> pb::destination::Update { } } +pub fn destination_add_none() -> pb::destination::Update { + pb::destination::Update { + update: Some(pb::destination::update::Update::Add( + pb::destination::WeightedAddrSet { + addrs: Vec::new(), + }, + )), + } +} + +pub fn destination_remove_none() -> pb::destination::Update { + pb::destination::Update { + update: Some(pb::destination::update::Update::Remove( + pb::destination::AddrSet { + addrs: Vec::new(), + }, + )), + } +} + +pub fn destination_exists_with_no_endpoints() -> pb::destination::Update { + pb::destination::Update { + update: Some(pb::destination::update::Update::NoEndpoints ( + pb::destination::NoEndpoints { exists: true } + )), + } +} + fn ip_conv(ip: IpAddr) -> pb::common::IpAddress { match ip { IpAddr::V4(v4) => pb::common::IpAddress {