diff --git a/proxy/src/control/cache.rs b/proxy/src/control/cache.rs index 3f93628eb..c795df675 100644 --- a/proxy/src/control/cache.rs +++ b/proxy/src/control/cache.rs @@ -1,5 +1,5 @@ -use indexmap::{map, IndexMap}; -use std::borrow::Borrow; +use indexmap::{IndexMap, IndexSet}; +use indexmap::map::{self, Entry}; use std::hash::Hash; use std::iter::IntoIterator; use std::mem; @@ -27,13 +27,14 @@ pub enum Exists { No, } -pub enum CacheChange { +#[derive(Debug, Eq, PartialEq)] +pub enum CacheChange<'value, K, V: 'value> { /// A new key was inserted. - Insertion, + Insertion { key: K, value: &'value V }, /// A key-value pair was removed. - Removal, + Removal { key: K }, /// The value mapped to an existing key was changed. - Modification, + Modification { key: K, new_value: &'value V }, } // ===== impl Exists ===== @@ -48,9 +49,8 @@ impl Exists { impl Cache where - K: Copy + Clone, - K: Hash + Eq, - V: PartialEq + Clone, + K: Copy + Hash + Eq, + V: PartialEq, { pub fn new() -> Self { Cache { @@ -70,48 +70,62 @@ where pub fn update_union(&mut self, iter: I, on_change: &mut F) where I: Iterator, - F: FnMut((K, V), CacheChange), + F: for<'value> FnMut(CacheChange<'value, K, V>), { - fn update_inner( + fn update_inner( inner: &mut IndexMap, - iter: I, + key: K, + new_value: V, on_change: &mut F ) where - K: Eq + Hash + Copy + Clone, - V: PartialEq + Clone, - I: Iterator, - F: FnMut((K, V), CacheChange), + K: Eq + Hash + Copy, + V: PartialEq, + F: for<'value> FnMut(CacheChange<'value, K, V>), { - for (key, value) in iter { - match inner.insert(key, value.clone()) { - // If the returned value is equal to the inserted value, - // then the inserted key was already present in the map - // and the value is unchanged. Do nothing. - Some(ref old_value) if old_value == &value => {}, - // If `insert` returns `Some` with a different value than - // the one we inserted, then we changed the value for that - // key. - Some(_) => - on_change((key, value), CacheChange::Modification), - // If `insert` returns `None`, then there was no old value - // previously present. Therefore, we inserted a new value - // into the cache. - None => on_change((key, value), CacheChange::Insertion), + match inner.entry(key) { + Entry::Occupied(ref mut entry) if *entry.get() != new_value => { + on_change(CacheChange::Modification { + key, + new_value: &new_value, + }); + entry.insert(new_value); + }, + Entry::Vacant(entry) => { + on_change(CacheChange::Insertion { + key, + value: &new_value, + }); + entry.insert(new_value); + }, + Entry::Occupied(_) => { + // Entry is occupied but the value is the same as the new + // value, so skip it. } } } if !self.reset_on_next_modification { - update_inner(&mut self.inner, iter, on_change); + // We don't need to invalidate the cache, so just update + // to add the new keys. + iter.for_each(|(k, v)| { + update_inner(&mut self.inner, k, v, on_change) + }); } else { - let to_insert = iter.collect::>(); - update_inner( - &mut self.inner, - to_insert.iter().map(|(k, v)| (*k, v.clone())), - on_change, - ); - self.update_intersection(to_insert, on_change); + // The cache was invalidated, so after updating entries present + // in `to_insert`, remove any keys not present in `to_insert`. + let retained_keys: IndexSet = iter + .map(|(k, v)| { + update_inner(&mut self.inner, k, v, on_change); + k + }) + .collect(); + self.inner.retain(|key, _| if retained_keys.contains(key) { + true + } else { + on_change(CacheChange::Removal { key: *key }); + false + }); } self.reset_on_next_modification = false; } @@ -119,12 +133,12 @@ where pub fn remove(&mut self, iter: I, on_change: &mut F) where I: Iterator, - F: FnMut((K, V), CacheChange), + F: for<'value> FnMut(CacheChange<'value, K, V>), { if !self.reset_on_next_modification { for key in iter { - if let Some(value) = self.inner.remove(&key) { - on_change((key, value), CacheChange::Removal); + if let Some(_) = self.inner.remove(&key) { + on_change(CacheChange::Removal { key }); } } } else { @@ -135,44 +149,11 @@ where pub fn clear(&mut self, on_change: &mut F) where - F: FnMut((K, V), CacheChange), + F: for<'value> FnMut(CacheChange<'value, K, V>), { - self.update_intersection(IndexMap::new(), on_change) - } - - /// Update the cache to contain the intersection of its current contents - /// and the key-value pairs in `to_update`. Pairs not present in - /// `to_update` will be removed from the cache, and any keys in the cache - /// with different inner from those in `to_update` will be ovewritten to - /// match `to_update`. - pub fn update_intersection( - &mut self, - mut to_update: IndexMap, - mut on_change: F - ) - where - F: FnMut((K, V), CacheChange), - K: Borrow, - Q: Hash + Eq, - { - self.inner.retain(|key, value| { - match to_update.remove(key.borrow()) { - // New value matches old value. Do nothing. - Some(ref new_value) if new_value == value => true, - // If the new value isn't equal to the old value, overwrite - // the old value. - Some(new_value) => { - let _ = mem::replace(value, new_value.clone()); - on_change((*key, new_value), CacheChange::Modification); - true - }, - // Key doesn't exist, remove it from the map. - None => { - on_change((*key, value.clone()), CacheChange::Removal); - false - } - } - }); + for (key, _) in self.inner.drain(..) { + on_change(CacheChange::Removal { key }) + }; self.reset_on_next_modification = false; } } @@ -193,6 +174,87 @@ where mod tests { use super::*; + #[test] + fn update_union_fires_events() { + let mut cache = Cache { + inner: indexmap!{ 1 => "one", 2 => "two", }, + reset_on_next_modification: false, + }; + + let mut insertions = 0; + let mut modifications = 0; + + cache.update_union( + indexmap!{ 1 => "one", 2 => "2", 3 => "three"}.into_iter(), + &mut |change: CacheChange| match change { + CacheChange::Removal { .. } => { + panic!("no removals should have been fired!"); + }, + CacheChange::Insertion { key, value } => { + insertions += 1; + assert_eq!(key, 3); + assert_eq!(value, &"three"); + }, + CacheChange::Modification { key, new_value } => { + modifications += 1; + assert_eq!(key, 2); + assert_eq!(new_value, &"2"); + } + } + ); + + cache.update_union( + indexmap!{ 1 => "1", 2 => "2", 3 => "3"}.into_iter(), + &mut |change: CacheChange| match change { + CacheChange::Removal { .. } => { + panic!("no removals should have been fired!"); + }, + CacheChange::Insertion { .. } => { + panic!("no insertions should have been fired!"); + }, + CacheChange::Modification { key, new_value } => { + modifications += 1; + assert!(key == 1 || key == 3); + assert!(new_value == &"1" || new_value == &"3") + } + } + ); + assert_eq!(insertions, 1); + assert_eq!(modifications, 3); + + cache.update_union( + indexmap!{ 4 => "four", 5 => "five"}.into_iter(), + &mut |change: CacheChange| match change { + CacheChange::Removal { .. } => { + panic!("no removals should have been fired!"); + }, + CacheChange::Insertion { key, value } => { + insertions += 1; + assert!(key == 4 || key == 5); + assert!(value == &"four" || value == &"five") + }, + CacheChange::Modification { .. } => { + panic!("no insertions should have been fired!"); + } + } + ); + assert_eq!(insertions, 3); + assert_eq!(modifications, 3); + } + + #[test] + fn clear_fires_removal_event() { + let mut cache = Cache { + inner: indexmap!{ 1 => () }, + reset_on_next_modification: false, + }; + cache.clear( + &mut |change| { + assert_eq!(change, CacheChange::Removal{ key: 1, }); + }, + ) + } + #[test] fn update_union_reset_on_next_modification() { let original_values = indexmap!{ 1 => (), 2 => (), 3 => (), 4 => () }; @@ -206,7 +268,7 @@ mod tests { }; cache.update_union( new_values.iter().map(|(&k, v)| (k, v.clone())), - &mut |_, _| (), + &mut |_| (), ); assert_eq!(&cache.inner, &new_values); assert_eq!(cache.reset_on_next_modification, false); @@ -219,7 +281,7 @@ mod tests { }; cache.update_union( new_values.iter().map(|(&k, v)| (k, v.clone())), - &mut |_, _| (), + &mut |_| (), ); assert_eq!( &cache.inner, @@ -241,7 +303,7 @@ mod tests { inner: original_values.clone(), reset_on_next_modification: true, }; - cache.remove(to_remove.iter().map(|(&k, _)| k), &mut |_, _| ()); + cache.remove(to_remove.iter().map(|(&k, _)| k), &mut |_| ()); assert_eq!(&cache.inner, &IndexMap::new()); assert_eq!(cache.reset_on_next_modification, false); } @@ -251,7 +313,7 @@ mod tests { inner: original_values.clone(), reset_on_next_modification: false, }; - cache.remove(to_remove.iter().map(|(&k, _)| k), &mut |_, _| ()); + cache.remove(to_remove.iter().map(|(&k, _)| k), &mut |_| ()); assert_eq!(&cache.inner, &indexmap!{1 => (), 2 => (), 4 => ()}); assert_eq!(cache.reset_on_next_modification, false); } @@ -266,7 +328,7 @@ mod tests { inner: original_values.clone(), reset_on_next_modification: true, }; - cache.clear(&mut |_, _| ()); + cache.clear(&mut |_| ()); assert_eq!(&cache.inner, &IndexMap::new()); assert_eq!(cache.reset_on_next_modification, false); } @@ -276,7 +338,7 @@ mod tests { inner: original_values.clone(), reset_on_next_modification: false, }; - cache.clear(&mut |_, _| ()); + cache.clear(&mut |_| ()); assert_eq!(&cache.inner, &IndexMap::new()); assert_eq!(cache.reset_on_next_modification, false); } diff --git a/proxy/src/control/discovery.rs b/proxy/src/control/discovery.rs index fb22a9109..f0bfa3ec9 100644 --- a/proxy/src/control/discovery.rs +++ b/proxy/src/control/discovery.rs @@ -127,7 +127,7 @@ enum RxError { Stream(grpc::Error), } -#[derive(Debug)] +#[derive(Debug, Clone)] enum Update { Insert(SocketAddr, Metadata), Remove(SocketAddr), @@ -611,14 +611,7 @@ impl > DestinationSet { }; cache.update_union( addrs_to_add, - &mut |(addr, meta), change| Self::on_change( - &mut self.txs, - authority_for_logging, - addr, - meta, - change, - ) - ); + &mut |change| Self::on_change(&mut self.txs, authority_for_logging, change)); self.addrs = Exists::Yes(cache); } @@ -629,14 +622,7 @@ impl > DestinationSet { Exists::Yes(mut cache) => { cache.remove( addrs_to_remove, - &mut |(addr, meta), change| Self::on_change( - &mut self.txs, - authority_for_logging, - addr, - meta, - change, - ) - ); + &mut |change| Self::on_change(&mut self.txs, authority_for_logging, change)); cache }, Exists::Unknown | Exists::No => Cache::new(), @@ -650,14 +636,7 @@ impl > DestinationSet { match self.addrs.take() { Exists::Yes(mut cache) => { cache.clear( - &mut |(addr, meta), change| Self::on_change( - &mut self.txs, - authority_for_logging, - addr, - meta, - change - ) - ); + &mut |change| Self::on_change(&mut self.txs, authority_for_logging, change)); }, Exists::Unknown | Exists::No => (), }; @@ -670,21 +649,19 @@ impl > DestinationSet { fn on_change(txs: &mut Vec>, authority_for_logging: &DnsNameAndPort, - addr: SocketAddr, - meta: Metadata, - change: CacheChange) { - let (update_str, update_constructor): (&'static str, fn(SocketAddr, Metadata) -> Update) = - match change { - CacheChange::Insertion => ("insert", Update::Insert), - CacheChange::Removal => - ("remove", |addr, _| Update::Remove(addr)), - CacheChange::Modification => - ("change metadata for", Update::ChangeMetadata), - }; + change: CacheChange) { + let (update_str, update, addr) = match change { + CacheChange::Insertion { key, value } => + ("insert", Update::Insert(key, value.clone()), key), + CacheChange::Removal { key } => + ("remove", Update::Remove(key), key), + CacheChange::Modification { key, new_value } => + ("change metadata for", Update::ChangeMetadata(key, new_value.clone()), key), + }; trace!("{} {:?} for {:?}", update_str, addr, authority_for_logging); // retain is used to drop any senders that are dead txs.retain(|tx| { - tx.unbounded_send(update_constructor(addr, meta.clone())).is_ok() + tx.unbounded_send(update.clone()).is_ok() }); } }