mirror of https://github.com/linkerd/linkerd2.git
Refactor control::Cache and add tests (#733)
Closes #713. This is a follow-up from #688. This PR makes a number of refactorings to the proxy's `control::Cache` module and removes all but one of the `clone` calls. The `CacheChange` enum now contains the changed key and a reference to the changed value when applicable. This simplifies `on_change` functions, which no longer have to take both a tuple of `(K, V)` and a `CacheChange` and can now simply destructure the `CacheChange`, and since the changed value is passed as a reference, the `on_change` function can now decide whether or not it should be cloned. This means that we can remove a majority of the clones previously present here. I've also rewritten `Cache::update_union` so that it no longer clones values (twice if the cache was invalidated). There's still one `clone` call in `Cache::update_intersection`, but it seems like it will be fairly tricky to remove. However, I've moved the `V: Clone` bound to that function specifically. `Cache::clear` and `Cache::update_union` so that they no longer call `Cache::update_intersection` internally, so they don't need a `V: Clone` bound. In addition, I've added some unit tests that test that `on_change` is called with the correct `CacheChange`s when key/value pairs are modified.
This commit is contained in:
parent
621f3c2e56
commit
64f4dfe07f
|
@ -1,5 +1,5 @@
|
|||
use indexmap::{map, IndexMap};
|
||||
use std::borrow::Borrow;
|
||||
use indexmap::{IndexMap, IndexSet};
|
||||
use indexmap::map::{self, Entry};
|
||||
use std::hash::Hash;
|
||||
use std::iter::IntoIterator;
|
||||
use std::mem;
|
||||
|
@ -27,13 +27,14 @@ pub enum Exists<T> {
|
|||
No,
|
||||
}
|
||||
|
||||
pub enum CacheChange {
|
||||
#[derive(Debug, Eq, PartialEq)]
|
||||
pub enum CacheChange<'value, K, V: 'value> {
|
||||
/// A new key was inserted.
|
||||
Insertion,
|
||||
Insertion { key: K, value: &'value V },
|
||||
/// A key-value pair was removed.
|
||||
Removal,
|
||||
Removal { key: K },
|
||||
/// The value mapped to an existing key was changed.
|
||||
Modification,
|
||||
Modification { key: K, new_value: &'value V },
|
||||
}
|
||||
|
||||
// ===== impl Exists =====
|
||||
|
@ -48,9 +49,8 @@ impl<T> Exists<T> {
|
|||
|
||||
impl<K, V> Cache<K, V>
|
||||
where
|
||||
K: Copy + Clone,
|
||||
K: Hash + Eq,
|
||||
V: PartialEq + Clone,
|
||||
K: Copy + Hash + Eq,
|
||||
V: PartialEq,
|
||||
{
|
||||
pub fn new() -> Self {
|
||||
Cache {
|
||||
|
@ -70,48 +70,62 @@ where
|
|||
pub fn update_union<I, F>(&mut self, iter: I, on_change: &mut F)
|
||||
where
|
||||
I: Iterator<Item = (K, V)>,
|
||||
F: FnMut((K, V), CacheChange),
|
||||
F: for<'value> FnMut(CacheChange<'value, K, V>),
|
||||
{
|
||||
fn update_inner<K, V, I, F>(
|
||||
fn update_inner<K, V, F>(
|
||||
inner: &mut IndexMap<K, V>,
|
||||
iter: I,
|
||||
key: K,
|
||||
new_value: V,
|
||||
on_change: &mut F
|
||||
)
|
||||
where
|
||||
K: Eq + Hash + Copy + Clone,
|
||||
V: PartialEq + Clone,
|
||||
I: Iterator<Item = (K, V)>,
|
||||
F: FnMut((K, V), CacheChange),
|
||||
K: Eq + Hash + Copy,
|
||||
V: PartialEq,
|
||||
F: for<'value> FnMut(CacheChange<'value, K, V>),
|
||||
{
|
||||
for (key, value) in iter {
|
||||
match inner.insert(key, value.clone()) {
|
||||
// If the returned value is equal to the inserted value,
|
||||
// then the inserted key was already present in the map
|
||||
// and the value is unchanged. Do nothing.
|
||||
Some(ref old_value) if old_value == &value => {},
|
||||
// If `insert` returns `Some` with a different value than
|
||||
// the one we inserted, then we changed the value for that
|
||||
// key.
|
||||
Some(_) =>
|
||||
on_change((key, value), CacheChange::Modification),
|
||||
// If `insert` returns `None`, then there was no old value
|
||||
// previously present. Therefore, we inserted a new value
|
||||
// into the cache.
|
||||
None => on_change((key, value), CacheChange::Insertion),
|
||||
match inner.entry(key) {
|
||||
Entry::Occupied(ref mut entry) if *entry.get() != new_value => {
|
||||
on_change(CacheChange::Modification {
|
||||
key,
|
||||
new_value: &new_value,
|
||||
});
|
||||
entry.insert(new_value);
|
||||
},
|
||||
Entry::Vacant(entry) => {
|
||||
on_change(CacheChange::Insertion {
|
||||
key,
|
||||
value: &new_value,
|
||||
});
|
||||
entry.insert(new_value);
|
||||
},
|
||||
Entry::Occupied(_) => {
|
||||
// Entry is occupied but the value is the same as the new
|
||||
// value, so skip it.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !self.reset_on_next_modification {
|
||||
update_inner(&mut self.inner, iter, on_change);
|
||||
// We don't need to invalidate the cache, so just update
|
||||
// to add the new keys.
|
||||
iter.for_each(|(k, v)| {
|
||||
update_inner(&mut self.inner, k, v, on_change)
|
||||
});
|
||||
} else {
|
||||
let to_insert = iter.collect::<IndexMap<K, V>>();
|
||||
update_inner(
|
||||
&mut self.inner,
|
||||
to_insert.iter().map(|(k, v)| (*k, v.clone())),
|
||||
on_change,
|
||||
);
|
||||
self.update_intersection(to_insert, on_change);
|
||||
// The cache was invalidated, so after updating entries present
|
||||
// in `to_insert`, remove any keys not present in `to_insert`.
|
||||
let retained_keys: IndexSet<K> = iter
|
||||
.map(|(k, v)| {
|
||||
update_inner(&mut self.inner, k, v, on_change);
|
||||
k
|
||||
})
|
||||
.collect();
|
||||
self.inner.retain(|key, _| if retained_keys.contains(key) {
|
||||
true
|
||||
} else {
|
||||
on_change(CacheChange::Removal { key: *key });
|
||||
false
|
||||
});
|
||||
}
|
||||
self.reset_on_next_modification = false;
|
||||
}
|
||||
|
@ -119,12 +133,12 @@ where
|
|||
pub fn remove<I, F>(&mut self, iter: I, on_change: &mut F)
|
||||
where
|
||||
I: Iterator<Item = K>,
|
||||
F: FnMut((K, V), CacheChange),
|
||||
F: for<'value> FnMut(CacheChange<'value, K, V>),
|
||||
{
|
||||
if !self.reset_on_next_modification {
|
||||
for key in iter {
|
||||
if let Some(value) = self.inner.remove(&key) {
|
||||
on_change((key, value), CacheChange::Removal);
|
||||
if let Some(_) = self.inner.remove(&key) {
|
||||
on_change(CacheChange::Removal { key });
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
@ -135,44 +149,11 @@ where
|
|||
|
||||
pub fn clear<F>(&mut self, on_change: &mut F)
|
||||
where
|
||||
F: FnMut((K, V), CacheChange),
|
||||
F: for<'value> FnMut(CacheChange<'value, K, V>),
|
||||
{
|
||||
self.update_intersection(IndexMap::new(), on_change)
|
||||
}
|
||||
|
||||
/// Update the cache to contain the intersection of its current contents
|
||||
/// and the key-value pairs in `to_update`. Pairs not present in
|
||||
/// `to_update` will be removed from the cache, and any keys in the cache
|
||||
/// with different inner from those in `to_update` will be ovewritten to
|
||||
/// match `to_update`.
|
||||
pub fn update_intersection<F, Q>(
|
||||
&mut self,
|
||||
mut to_update: IndexMap<Q, V>,
|
||||
mut on_change: F
|
||||
)
|
||||
where
|
||||
F: FnMut((K, V), CacheChange),
|
||||
K: Borrow<Q>,
|
||||
Q: Hash + Eq,
|
||||
{
|
||||
self.inner.retain(|key, value| {
|
||||
match to_update.remove(key.borrow()) {
|
||||
// New value matches old value. Do nothing.
|
||||
Some(ref new_value) if new_value == value => true,
|
||||
// If the new value isn't equal to the old value, overwrite
|
||||
// the old value.
|
||||
Some(new_value) => {
|
||||
let _ = mem::replace(value, new_value.clone());
|
||||
on_change((*key, new_value), CacheChange::Modification);
|
||||
true
|
||||
},
|
||||
// Key doesn't exist, remove it from the map.
|
||||
None => {
|
||||
on_change((*key, value.clone()), CacheChange::Removal);
|
||||
false
|
||||
}
|
||||
}
|
||||
});
|
||||
for (key, _) in self.inner.drain(..) {
|
||||
on_change(CacheChange::Removal { key })
|
||||
};
|
||||
self.reset_on_next_modification = false;
|
||||
}
|
||||
}
|
||||
|
@ -193,6 +174,87 @@ where
|
|||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn update_union_fires_events() {
|
||||
let mut cache = Cache {
|
||||
inner: indexmap!{ 1 => "one", 2 => "two", },
|
||||
reset_on_next_modification: false,
|
||||
};
|
||||
|
||||
let mut insertions = 0;
|
||||
let mut modifications = 0;
|
||||
|
||||
cache.update_union(
|
||||
indexmap!{ 1 => "one", 2 => "2", 3 => "three"}.into_iter(),
|
||||
&mut |change: CacheChange<usize, &str>| match change {
|
||||
CacheChange::Removal { .. } => {
|
||||
panic!("no removals should have been fired!");
|
||||
},
|
||||
CacheChange::Insertion { key, value } => {
|
||||
insertions += 1;
|
||||
assert_eq!(key, 3);
|
||||
assert_eq!(value, &"three");
|
||||
},
|
||||
CacheChange::Modification { key, new_value } => {
|
||||
modifications += 1;
|
||||
assert_eq!(key, 2);
|
||||
assert_eq!(new_value, &"2");
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
cache.update_union(
|
||||
indexmap!{ 1 => "1", 2 => "2", 3 => "3"}.into_iter(),
|
||||
&mut |change: CacheChange<usize, &str>| match change {
|
||||
CacheChange::Removal { .. } => {
|
||||
panic!("no removals should have been fired!");
|
||||
},
|
||||
CacheChange::Insertion { .. } => {
|
||||
panic!("no insertions should have been fired!");
|
||||
},
|
||||
CacheChange::Modification { key, new_value } => {
|
||||
modifications += 1;
|
||||
assert!(key == 1 || key == 3);
|
||||
assert!(new_value == &"1" || new_value == &"3")
|
||||
}
|
||||
}
|
||||
);
|
||||
assert_eq!(insertions, 1);
|
||||
assert_eq!(modifications, 3);
|
||||
|
||||
cache.update_union(
|
||||
indexmap!{ 4 => "four", 5 => "five"}.into_iter(),
|
||||
&mut |change: CacheChange<usize, &str>| match change {
|
||||
CacheChange::Removal { .. } => {
|
||||
panic!("no removals should have been fired!");
|
||||
},
|
||||
CacheChange::Insertion { key, value } => {
|
||||
insertions += 1;
|
||||
assert!(key == 4 || key == 5);
|
||||
assert!(value == &"four" || value == &"five")
|
||||
},
|
||||
CacheChange::Modification { .. } => {
|
||||
panic!("no insertions should have been fired!");
|
||||
}
|
||||
}
|
||||
);
|
||||
assert_eq!(insertions, 3);
|
||||
assert_eq!(modifications, 3);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn clear_fires_removal_event() {
|
||||
let mut cache = Cache {
|
||||
inner: indexmap!{ 1 => () },
|
||||
reset_on_next_modification: false,
|
||||
};
|
||||
cache.clear(
|
||||
&mut |change| {
|
||||
assert_eq!(change, CacheChange::Removal{ key: 1, });
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn update_union_reset_on_next_modification() {
|
||||
let original_values = indexmap!{ 1 => (), 2 => (), 3 => (), 4 => () };
|
||||
|
@ -206,7 +268,7 @@ mod tests {
|
|||
};
|
||||
cache.update_union(
|
||||
new_values.iter().map(|(&k, v)| (k, v.clone())),
|
||||
&mut |_, _| (),
|
||||
&mut |_| (),
|
||||
);
|
||||
assert_eq!(&cache.inner, &new_values);
|
||||
assert_eq!(cache.reset_on_next_modification, false);
|
||||
|
@ -219,7 +281,7 @@ mod tests {
|
|||
};
|
||||
cache.update_union(
|
||||
new_values.iter().map(|(&k, v)| (k, v.clone())),
|
||||
&mut |_, _| (),
|
||||
&mut |_| (),
|
||||
);
|
||||
assert_eq!(
|
||||
&cache.inner,
|
||||
|
@ -241,7 +303,7 @@ mod tests {
|
|||
inner: original_values.clone(),
|
||||
reset_on_next_modification: true,
|
||||
};
|
||||
cache.remove(to_remove.iter().map(|(&k, _)| k), &mut |_, _| ());
|
||||
cache.remove(to_remove.iter().map(|(&k, _)| k), &mut |_| ());
|
||||
assert_eq!(&cache.inner, &IndexMap::new());
|
||||
assert_eq!(cache.reset_on_next_modification, false);
|
||||
}
|
||||
|
@ -251,7 +313,7 @@ mod tests {
|
|||
inner: original_values.clone(),
|
||||
reset_on_next_modification: false,
|
||||
};
|
||||
cache.remove(to_remove.iter().map(|(&k, _)| k), &mut |_, _| ());
|
||||
cache.remove(to_remove.iter().map(|(&k, _)| k), &mut |_| ());
|
||||
assert_eq!(&cache.inner, &indexmap!{1 => (), 2 => (), 4 => ()});
|
||||
assert_eq!(cache.reset_on_next_modification, false);
|
||||
}
|
||||
|
@ -266,7 +328,7 @@ mod tests {
|
|||
inner: original_values.clone(),
|
||||
reset_on_next_modification: true,
|
||||
};
|
||||
cache.clear(&mut |_, _| ());
|
||||
cache.clear(&mut |_| ());
|
||||
assert_eq!(&cache.inner, &IndexMap::new());
|
||||
assert_eq!(cache.reset_on_next_modification, false);
|
||||
}
|
||||
|
@ -276,7 +338,7 @@ mod tests {
|
|||
inner: original_values.clone(),
|
||||
reset_on_next_modification: false,
|
||||
};
|
||||
cache.clear(&mut |_, _| ());
|
||||
cache.clear(&mut |_| ());
|
||||
assert_eq!(&cache.inner, &IndexMap::new());
|
||||
assert_eq!(cache.reset_on_next_modification, false);
|
||||
}
|
||||
|
|
|
@ -127,7 +127,7 @@ enum RxError<T> {
|
|||
Stream(grpc::Error),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, Clone)]
|
||||
enum Update {
|
||||
Insert(SocketAddr, Metadata),
|
||||
Remove(SocketAddr),
|
||||
|
@ -611,14 +611,7 @@ impl <T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> {
|
|||
};
|
||||
cache.update_union(
|
||||
addrs_to_add,
|
||||
&mut |(addr, meta), change| Self::on_change(
|
||||
&mut self.txs,
|
||||
authority_for_logging,
|
||||
addr,
|
||||
meta,
|
||||
change,
|
||||
)
|
||||
);
|
||||
&mut |change| Self::on_change(&mut self.txs, authority_for_logging, change));
|
||||
self.addrs = Exists::Yes(cache);
|
||||
}
|
||||
|
||||
|
@ -629,14 +622,7 @@ impl <T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> {
|
|||
Exists::Yes(mut cache) => {
|
||||
cache.remove(
|
||||
addrs_to_remove,
|
||||
&mut |(addr, meta), change| Self::on_change(
|
||||
&mut self.txs,
|
||||
authority_for_logging,
|
||||
addr,
|
||||
meta,
|
||||
change,
|
||||
)
|
||||
);
|
||||
&mut |change| Self::on_change(&mut self.txs, authority_for_logging, change));
|
||||
cache
|
||||
},
|
||||
Exists::Unknown | Exists::No => Cache::new(),
|
||||
|
@ -650,14 +636,7 @@ impl <T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> {
|
|||
match self.addrs.take() {
|
||||
Exists::Yes(mut cache) => {
|
||||
cache.clear(
|
||||
&mut |(addr, meta), change| Self::on_change(
|
||||
&mut self.txs,
|
||||
authority_for_logging,
|
||||
addr,
|
||||
meta,
|
||||
change
|
||||
)
|
||||
);
|
||||
&mut |change| Self::on_change(&mut self.txs, authority_for_logging, change));
|
||||
},
|
||||
Exists::Unknown | Exists::No => (),
|
||||
};
|
||||
|
@ -670,21 +649,19 @@ impl <T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> {
|
|||
|
||||
fn on_change(txs: &mut Vec<mpsc::UnboundedSender<Update>>,
|
||||
authority_for_logging: &DnsNameAndPort,
|
||||
addr: SocketAddr,
|
||||
meta: Metadata,
|
||||
change: CacheChange) {
|
||||
let (update_str, update_constructor): (&'static str, fn(SocketAddr, Metadata) -> Update) =
|
||||
match change {
|
||||
CacheChange::Insertion => ("insert", Update::Insert),
|
||||
CacheChange::Removal =>
|
||||
("remove", |addr, _| Update::Remove(addr)),
|
||||
CacheChange::Modification =>
|
||||
("change metadata for", Update::ChangeMetadata),
|
||||
};
|
||||
change: CacheChange<SocketAddr, Metadata>) {
|
||||
let (update_str, update, addr) = match change {
|
||||
CacheChange::Insertion { key, value } =>
|
||||
("insert", Update::Insert(key, value.clone()), key),
|
||||
CacheChange::Removal { key } =>
|
||||
("remove", Update::Remove(key), key),
|
||||
CacheChange::Modification { key, new_value } =>
|
||||
("change metadata for", Update::ChangeMetadata(key, new_value.clone()), key),
|
||||
};
|
||||
trace!("{} {:?} for {:?}", update_str, addr, authority_for_logging);
|
||||
// retain is used to drop any senders that are dead
|
||||
txs.retain(|tx| {
|
||||
tx.unbounded_send(update_constructor(addr, meta.clone())).is_ok()
|
||||
tx.unbounded_send(update.clone()).is_ok()
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue