Make `control::Cache` key-value in order to store discovery metadata (#688)
This PR changes the proxy's `control::Cache` module from a set to a key-value map. This change is made in order to use the values in the map to store metadata from the Destination API, but allow evictions and insertions to be based only on the `SocketAddr` of the destination entry. This will make code in PR #661 much simpler, by removing the need to wrap `SocketAddr`s in the cache in a `Labeled` struct for storing metadata, and the need for custom `Borrow` implementations on that type. Furthermore, I've changed from using a standard library `HashSet`/`HashMap` as the underlying collection to using `IndexMap`, as we suspect that this will result in performance improvements. Currently, as `master` has no additional metadata to associate with cache entries, the type of the values in the map is `()`. When #661 merges, the values will actually contain metadata. If we suspect that there are many other use-cases for `control::Cache` where it will be treated as a set rather than a map, we may want to provide a separate set of impls for `Cache<T, ()>` (like `std::HashSet`) to make the API more ergonomic in this case.
This commit is contained in:
parent
3c5f1c824f
commit
d39457f10d
|
@ -1,9 +1,11 @@
|
|||
use std;
|
||||
use std::collections::HashSet;
|
||||
use indexmap::{map, IndexMap};
|
||||
use std::borrow::Borrow;
|
||||
use std::hash::Hash;
|
||||
use std::iter::IntoIterator;
|
||||
use std::mem;
|
||||
|
||||
/// A cache that supports incremental updates with lazy resetting on
|
||||
/// invalidation.
|
||||
/// A key-value cache that supports incremental updates with lazy resetting
|
||||
/// on invalidation.
|
||||
///
|
||||
/// When the cache `c` initially becomes invalid (i.e. it becomes
|
||||
/// potentially out of sync with the data source so that incremental updates
|
||||
|
@ -11,20 +13,27 @@ use std::mem;
|
|||
/// incremental update will then replace the entire contents of the cache,
|
||||
/// instead of incrementally augmenting it. Until that next modification,
|
||||
/// however, the stale contents of the cache will be made available.
|
||||
pub struct Cache<T> {
|
||||
values: HashSet<T>,
|
||||
pub struct Cache<K, V> {
|
||||
inner: IndexMap<K, V>,
|
||||
reset_on_next_modification: bool,
|
||||
}
|
||||
|
||||
pub enum Exists<T> {
|
||||
Unknown, // Unknown if the item exists or not
|
||||
/// Unknown if the item exists or not.
|
||||
Unknown,
|
||||
/// Affirmatively known to exist.
|
||||
Yes(T),
|
||||
No, // Affirmatively known to not exist.
|
||||
/// Affirmatively known to not exist.
|
||||
No,
|
||||
}
|
||||
|
||||
pub enum CacheChange {
|
||||
/// A new key was inserted.
|
||||
Insertion,
|
||||
/// A key-value pair was removed.
|
||||
Removal,
|
||||
/// The value mapped to an existing key was changed.
|
||||
Modification,
|
||||
}
|
||||
|
||||
// ===== impl Exists =====
|
||||
|
@ -37,54 +46,85 @@ impl<T> Exists<T> {
|
|||
|
||||
// ===== impl Cache =====
|
||||
|
||||
impl<T> Cache<T> where T: Clone + Copy + Eq + std::hash::Hash {
|
||||
impl<K, V> Cache<K, V>
|
||||
where
|
||||
K: Copy + Clone,
|
||||
K: Hash + Eq,
|
||||
V: PartialEq + Clone,
|
||||
{
|
||||
pub fn new() -> Self {
|
||||
Cache {
|
||||
values: HashSet::new(),
|
||||
inner: IndexMap::new(),
|
||||
reset_on_next_modification: true,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn values(&self) -> &HashSet<T> {
|
||||
&self.values
|
||||
}
|
||||
|
||||
pub fn set_reset_on_next_modification(&mut self) {
|
||||
self.reset_on_next_modification = true;
|
||||
}
|
||||
|
||||
pub fn extend<I, F>(&mut self, iter: I, on_change: &mut F)
|
||||
where I: Iterator<Item = T>,
|
||||
F: FnMut(T, CacheChange),
|
||||
/// Update the cache to contain the union of its current contents and the
|
||||
/// key-value pairs in `iter`. Pairs not present in the cache will be
|
||||
/// inserted, and keys present in both the cache and the iterator will be
|
||||
/// updated so that their inner match those in the iterator.
|
||||
pub fn update_union<I, F>(&mut self, iter: I, on_change: &mut F)
|
||||
where
|
||||
I: Iterator<Item = (K, V)>,
|
||||
F: FnMut((K, V), CacheChange),
|
||||
{
|
||||
fn extend_inner<T, I, F>(values: &mut HashSet<T>, iter: I, on_change: &mut F)
|
||||
where T: Copy + Eq + std::hash::Hash, I: Iterator<Item = T>, F: FnMut(T, CacheChange)
|
||||
fn update_inner<K, V, I, F>(
|
||||
inner: &mut IndexMap<K, V>,
|
||||
iter: I,
|
||||
on_change: &mut F
|
||||
)
|
||||
where
|
||||
K: Eq + Hash + Copy + Clone,
|
||||
V: PartialEq + Clone,
|
||||
I: Iterator<Item = (K, V)>,
|
||||
F: FnMut((K, V), CacheChange),
|
||||
{
|
||||
for value in iter {
|
||||
if values.insert(value) {
|
||||
on_change(value, CacheChange::Insertion);
|
||||
for (key, value) in iter {
|
||||
match inner.insert(key, value.clone()) {
|
||||
// If the returned value is equal to the inserted value,
|
||||
// then the inserted key was already present in the map
|
||||
// and the value is unchanged. Do nothing.
|
||||
Some(ref old_value) if old_value == &value => {},
|
||||
// If `insert` returns `Some` with a different value than
|
||||
// the one we inserted, then we changed the value for that
|
||||
// key.
|
||||
Some(_) =>
|
||||
on_change((key, value), CacheChange::Modification),
|
||||
// If `insert` returns `None`, then there was no old value
|
||||
// previously present. Therefore, we inserted a new value
|
||||
// into the cache.
|
||||
None => on_change((key, value), CacheChange::Insertion),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !self.reset_on_next_modification {
|
||||
extend_inner(&mut self.values, iter, on_change);
|
||||
update_inner(&mut self.inner, iter, on_change);
|
||||
} else {
|
||||
let to_insert = iter.collect::<HashSet<T>>();
|
||||
extend_inner(&mut self.values, to_insert.iter().map(|value| *value), on_change);
|
||||
self.retain(&to_insert, on_change);
|
||||
let to_insert = iter.collect::<IndexMap<K, V>>();
|
||||
update_inner(
|
||||
&mut self.inner,
|
||||
to_insert.iter().map(|(k, v)| (*k, v.clone())),
|
||||
on_change,
|
||||
);
|
||||
self.update_intersection(to_insert, on_change);
|
||||
}
|
||||
self.reset_on_next_modification = false;
|
||||
}
|
||||
|
||||
pub fn remove<I, F>(&mut self, iter: I, on_change: &mut F)
|
||||
where I: Iterator<Item = T>,
|
||||
F: FnMut(T, CacheChange)
|
||||
where
|
||||
I: Iterator<Item = K>,
|
||||
F: FnMut((K, V), CacheChange),
|
||||
{
|
||||
if !self.reset_on_next_modification {
|
||||
for value in iter {
|
||||
if self.values.remove(&value) {
|
||||
on_change(value, CacheChange::Removal);
|
||||
for key in iter {
|
||||
if let Some(value) = self.inner.remove(&key) {
|
||||
on_change((key, value), CacheChange::Removal);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
@ -93,106 +133,151 @@ impl<T> Cache<T> where T: Clone + Copy + Eq + std::hash::Hash {
|
|||
self.reset_on_next_modification = false;
|
||||
}
|
||||
|
||||
pub fn clear<F>(&mut self, on_change: &mut F) where F: FnMut(T, CacheChange) {
|
||||
self.retain(&HashSet::new(), on_change)
|
||||
pub fn clear<F>(&mut self, on_change: &mut F)
|
||||
where
|
||||
F: FnMut((K, V), CacheChange),
|
||||
{
|
||||
self.update_intersection(IndexMap::new(), on_change)
|
||||
}
|
||||
|
||||
pub fn retain<F>(&mut self, to_retain: &HashSet<T>, mut on_change: F)
|
||||
where F: FnMut(T, CacheChange)
|
||||
/// Update the cache to contain the intersection of its current contents
|
||||
/// and the key-value pairs in `to_update`. Pairs not present in
|
||||
/// `to_update` will be removed from the cache, and any keys in the cache
|
||||
/// with different inner from those in `to_update` will be ovewritten to
|
||||
/// match `to_update`.
|
||||
pub fn update_intersection<F, Q>(
|
||||
&mut self,
|
||||
mut to_update: IndexMap<Q, V>,
|
||||
mut on_change: F
|
||||
)
|
||||
where
|
||||
F: FnMut((K, V), CacheChange),
|
||||
K: Borrow<Q>,
|
||||
Q: Hash + Eq,
|
||||
{
|
||||
self.values.retain(|value| {
|
||||
let retain = to_retain.contains(&value);
|
||||
if !retain {
|
||||
on_change(*value, CacheChange::Removal)
|
||||
self.inner.retain(|key, value| {
|
||||
match to_update.remove(key.borrow()) {
|
||||
// New value matches old value. Do nothing.
|
||||
Some(ref new_value) if new_value == value => true,
|
||||
// If the new value isn't equal to the old value, overwrite
|
||||
// the old value.
|
||||
Some(new_value) => {
|
||||
let _ = mem::replace(value, new_value.clone());
|
||||
on_change((*key, new_value), CacheChange::Modification);
|
||||
true
|
||||
},
|
||||
// Key doesn't exist, remove it from the map.
|
||||
None => {
|
||||
on_change((*key, value.clone()), CacheChange::Removal);
|
||||
false
|
||||
}
|
||||
}
|
||||
retain
|
||||
});
|
||||
self.reset_on_next_modification = false;
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, K, V> IntoIterator for &'a Cache<K, V>
|
||||
where
|
||||
K: Hash + Eq,
|
||||
{
|
||||
type IntoIter = map::Iter<'a, K, V>;
|
||||
type Item = <map::Iter<'a, K, V> as Iterator>::Item;
|
||||
|
||||
fn into_iter(self) -> Self::IntoIter {
|
||||
self.inner.iter()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn extend_reset_on_next_modification() {
|
||||
let original_values = [1, 2, 3, 4].iter().cloned().collect::<HashSet<usize>>();
|
||||
|
||||
fn update_union_reset_on_next_modification() {
|
||||
let original_values = indexmap!{ 1 => (), 2 => (), 3 => (), 4 => () };
|
||||
// One original value, one new value.
|
||||
let new_values = [3, 5].iter().cloned().collect::<HashSet<usize>>();
|
||||
let new_values = indexmap!{3 => (), 5 => ()};
|
||||
|
||||
{
|
||||
let mut cache = Cache {
|
||||
values: original_values.clone(),
|
||||
inner: original_values.clone(),
|
||||
reset_on_next_modification: true,
|
||||
};
|
||||
cache.extend(new_values.iter().cloned(), &mut |_, _| ());
|
||||
assert_eq!(&cache.values, &new_values);
|
||||
cache.update_union(
|
||||
new_values.iter().map(|(&k, v)| (k, v.clone())),
|
||||
&mut |_, _| (),
|
||||
);
|
||||
assert_eq!(&cache.inner, &new_values);
|
||||
assert_eq!(cache.reset_on_next_modification, false);
|
||||
}
|
||||
|
||||
{
|
||||
let mut cache = Cache {
|
||||
values: original_values.clone(),
|
||||
inner: original_values.clone(),
|
||||
reset_on_next_modification: false,
|
||||
};
|
||||
cache.extend(new_values.iter().cloned(), &mut |_, _| ());
|
||||
assert_eq!(&cache.values,
|
||||
&[1, 2, 3, 4, 5].iter().cloned().collect::<HashSet<usize>>());
|
||||
cache.update_union(
|
||||
new_values.iter().map(|(&k, v)| (k, v.clone())),
|
||||
&mut |_, _| (),
|
||||
);
|
||||
assert_eq!(
|
||||
&cache.inner,
|
||||
&indexmap!{ 1 => (), 2 => (), 3 => (), 4 => (), 5 => () }
|
||||
);
|
||||
assert_eq!(cache.reset_on_next_modification, false);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn remove_reset_on_next_modification() {
|
||||
let original_values = [1, 2, 3, 4].iter().cloned().collect::<HashSet<usize>>();
|
||||
let original_values = indexmap!{ 1 => (), 2 => (), 3 => (), 4 => () };
|
||||
|
||||
// One original value, one new value.
|
||||
let to_remove = [3, 5].iter().cloned().collect::<HashSet<usize>>();
|
||||
let to_remove = indexmap!{ 3 => (), 5 => ()};
|
||||
|
||||
{
|
||||
let mut cache = Cache {
|
||||
values: original_values.clone(),
|
||||
inner: original_values.clone(),
|
||||
reset_on_next_modification: true,
|
||||
};
|
||||
cache.remove(to_remove.iter().cloned(), &mut |_, _| ());
|
||||
assert_eq!(&cache.values, &HashSet::new());
|
||||
cache.remove(to_remove.iter().map(|(&k, _)| k), &mut |_, _| ());
|
||||
assert_eq!(&cache.inner, &IndexMap::new());
|
||||
assert_eq!(cache.reset_on_next_modification, false);
|
||||
}
|
||||
|
||||
{
|
||||
let mut cache = Cache {
|
||||
values: original_values.clone(),
|
||||
inner: original_values.clone(),
|
||||
reset_on_next_modification: false,
|
||||
};
|
||||
cache.remove(to_remove.iter().cloned(), &mut |_, _| ());
|
||||
assert_eq!(&cache.values, &[1, 2, 4].iter().cloned().collect::<HashSet<usize>>());
|
||||
cache.remove(to_remove.iter().map(|(&k, _)| k), &mut |_, _| ());
|
||||
assert_eq!(&cache.inner, &indexmap!{1 => (), 2 => (), 4 => ()});
|
||||
assert_eq!(cache.reset_on_next_modification, false);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn clear_reset_on_next_modification() {
|
||||
let original_values = [1, 2, 3, 4].iter().cloned().collect::<HashSet<usize>>();
|
||||
let original_values = indexmap!{ 1 => (), 2 => (), 3 => (), 4 => () };
|
||||
|
||||
{
|
||||
let mut cache = Cache {
|
||||
values: original_values.clone(),
|
||||
inner: original_values.clone(),
|
||||
reset_on_next_modification: true,
|
||||
};
|
||||
cache.clear(&mut |_, _| ());
|
||||
assert_eq!(&cache.values, &HashSet::new());
|
||||
assert_eq!(&cache.inner, &IndexMap::new());
|
||||
assert_eq!(cache.reset_on_next_modification, false);
|
||||
}
|
||||
|
||||
{
|
||||
let mut cache = Cache {
|
||||
values: original_values.clone(),
|
||||
inner: original_values.clone(),
|
||||
reset_on_next_modification: false,
|
||||
};
|
||||
cache.clear(&mut |_, _| ());
|
||||
assert_eq!(&cache.values, &HashSet::new());
|
||||
assert_eq!(&cache.inner, &IndexMap::new());
|
||||
assert_eq!(cache.reset_on_next_modification, false);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -56,7 +56,7 @@ pub struct DiscoveryWork<T: HttpService<ResponseBody = RecvBody>> {
|
|||
}
|
||||
|
||||
struct DestinationSet<T: HttpService<ResponseBody = RecvBody>> {
|
||||
addrs: Exists<Cache<SocketAddr>>,
|
||||
addrs: Exists<Cache<SocketAddr, ()>>,
|
||||
query: DestinationServiceQuery<T>,
|
||||
txs: Vec<mpsc::UnboundedSender<Update>>,
|
||||
}
|
||||
|
@ -187,7 +187,10 @@ where
|
|||
let service = self.bind.bind(&addr).map_err(|_| ())?;
|
||||
|
||||
Ok(Async::Ready(Change::Insert(addr, service)))
|
||||
}
|
||||
},
|
||||
// TODO: handle metadata changes by changing the labeling
|
||||
// middleware to hold a `futures-watch::Watch` on the label value,
|
||||
// so it can be updated.
|
||||
Update::Remove(addr) => Ok(Async::Ready(Change::Remove(addr))),
|
||||
}
|
||||
}
|
||||
|
@ -265,7 +268,7 @@ where
|
|||
// them onto the new watch first
|
||||
match set.addrs {
|
||||
Exists::Yes(ref cache) => {
|
||||
for &addr in cache.values().iter() {
|
||||
for (&addr, _) in cache {
|
||||
tx.unbounded_send(Update::Insert(addr))
|
||||
.expect("unbounded_send does not fail");
|
||||
}
|
||||
|
@ -398,9 +401,9 @@ impl <T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> {
|
|||
Exists::Yes(mut cache) => cache,
|
||||
Exists::Unknown | Exists::No => Cache::new(),
|
||||
};
|
||||
cache.extend(
|
||||
addrs_to_add,
|
||||
&mut |addr, change| Self::on_change(&mut self.txs, authority_for_logging, addr,
|
||||
cache.update_union(
|
||||
addrs_to_add.map(|a| (a, ())),
|
||||
&mut |(addr, _), change| Self::on_change(&mut self.txs, authority_for_logging, addr,
|
||||
change));
|
||||
self.addrs = Exists::Yes(cache);
|
||||
}
|
||||
|
@ -412,7 +415,7 @@ impl <T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> {
|
|||
Exists::Yes(mut cache) => {
|
||||
cache.remove(
|
||||
addrs_to_remove,
|
||||
&mut |addr, change| Self::on_change(&mut self.txs, authority_for_logging, addr,
|
||||
&mut |(addr, _), change| Self::on_change(&mut self.txs, authority_for_logging, addr,
|
||||
change));
|
||||
cache
|
||||
},
|
||||
|
@ -427,7 +430,7 @@ impl <T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> {
|
|||
match self.addrs.take() {
|
||||
Exists::Yes(mut cache) => {
|
||||
cache.clear(
|
||||
&mut |addr, change| Self::on_change(&mut self.txs, authority_for_logging, addr,
|
||||
&mut |(addr, _), change| Self::on_change(&mut self.txs, authority_for_logging, addr,
|
||||
change));
|
||||
},
|
||||
Exists::Unknown | Exists::No => (),
|
||||
|
@ -447,6 +450,10 @@ impl <T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> {
|
|||
match change {
|
||||
CacheChange::Insertion => ("insert", Update::Insert),
|
||||
CacheChange::Removal => ("remove", Update::Remove),
|
||||
CacheChange::Modification => {
|
||||
// TODO: generate `ChangeMetadata` events.
|
||||
return;
|
||||
}
|
||||
};
|
||||
trace!("{} {:?} for {:?}", update_str, addr, authority_for_logging);
|
||||
// retain is used to drop any senders that are dead
|
||||
|
|
|
@ -21,6 +21,7 @@ extern crate libc;
|
|||
#[macro_use]
|
||||
extern crate log;
|
||||
extern crate ns_dns_tokio;
|
||||
#[cfg_attr(test, macro_use)]
|
||||
extern crate indexmap;
|
||||
extern crate prost;
|
||||
extern crate prost_types;
|
||||
|
|
Loading…
Reference in New Issue