Move control::discovery::Cache into its own module (#672)

The proxy's control::discovery module is becoming a bit dense in terms
of what it implements.

In order to make this code more understandable, and to be able to use a
similar caching strategy in other parts of the controller, the
`control::cache` module now holds discovery's cache implementation.

This module is only visible within the `control` module, and it now
exposes two new public methods: `values()` and
`set_reset_on_next_modification()`.
This commit is contained in:
Oliver Gould 2018-04-04 14:27:04 -07:00 committed by GitHub
parent 01628bfa43
commit 2dc964c583
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 205 additions and 191 deletions

199
proxy/src/control/cache.rs Normal file
View File

@ -0,0 +1,199 @@
use std;
use std::collections::HashSet;
use std::mem;
/// A cache that supports incremental updates with lazy resetting on
/// invalidation.
///
/// When the cache `c` initially becomes invalid (i.e. it becomes
/// potentially out of sync with the data source so that incremental updates
/// would stop working), call `c.reset_on_next_modification()`; the next
/// incremental update will then replace the entire contents of the cache,
/// instead of incrementally augmenting it. Until that next modification,
/// however, the stale contents of the cache will be made available.
pub struct Cache<T> {
values: HashSet<T>,
reset_on_next_modification: bool,
}
pub enum Exists<T> {
Unknown, // Unknown if the item exists or not
Yes(T),
No, // Affirmatively known to not exist.
}
pub enum CacheChange {
Insertion,
Removal,
}
// ===== impl Exists =====
impl<T> Exists<T> {
pub fn take(&mut self) -> Exists<T> {
mem::replace(self, Exists::Unknown)
}
}
// ===== impl Cache =====
impl<T> Cache<T> where T: Clone + Copy + Eq + std::hash::Hash {
pub fn new() -> Self {
Cache {
values: HashSet::new(),
reset_on_next_modification: true,
}
}
pub fn values(&self) -> &HashSet<T> {
&self.values
}
pub fn set_reset_on_next_modification(&mut self) {
self.reset_on_next_modification = true;
}
pub fn extend<I, F>(&mut self, iter: I, on_change: &mut F)
where I: Iterator<Item = T>,
F: FnMut(T, CacheChange),
{
fn extend_inner<T, I, F>(values: &mut HashSet<T>, iter: I, on_change: &mut F)
where T: Copy + Eq + std::hash::Hash, I: Iterator<Item = T>, F: FnMut(T, CacheChange)
{
for value in iter {
if values.insert(value) {
on_change(value, CacheChange::Insertion);
}
}
}
if !self.reset_on_next_modification {
extend_inner(&mut self.values, iter, on_change);
} else {
let to_insert = iter.collect::<HashSet<T>>();
extend_inner(&mut self.values, to_insert.iter().map(|value| *value), on_change);
self.retain(&to_insert, on_change);
}
self.reset_on_next_modification = false;
}
pub fn remove<I, F>(&mut self, iter: I, on_change: &mut F)
where I: Iterator<Item = T>,
F: FnMut(T, CacheChange)
{
if !self.reset_on_next_modification {
for value in iter {
if self.values.remove(&value) {
on_change(value, CacheChange::Removal);
}
}
} else {
self.clear(on_change);
}
self.reset_on_next_modification = false;
}
pub fn clear<F>(&mut self, on_change: &mut F) where F: FnMut(T, CacheChange) {
self.retain(&HashSet::new(), on_change)
}
pub fn retain<F>(&mut self, to_retain: &HashSet<T>, mut on_change: F)
where F: FnMut(T, CacheChange)
{
self.values.retain(|value| {
let retain = to_retain.contains(&value);
if !retain {
on_change(*value, CacheChange::Removal)
}
retain
});
self.reset_on_next_modification = false;
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn extend_reset_on_next_modification() {
let original_values = [1, 2, 3, 4].iter().cloned().collect::<HashSet<usize>>();
// One original value, one new value.
let new_values = [3, 5].iter().cloned().collect::<HashSet<usize>>();
{
let mut cache = Cache {
values: original_values.clone(),
reset_on_next_modification: true,
};
cache.extend(new_values.iter().cloned(), &mut |_, _| ());
assert_eq!(&cache.values, &new_values);
assert_eq!(cache.reset_on_next_modification, false);
}
{
let mut cache = Cache {
values: original_values.clone(),
reset_on_next_modification: false,
};
cache.extend(new_values.iter().cloned(), &mut |_, _| ());
assert_eq!(&cache.values,
&[1, 2, 3, 4, 5].iter().cloned().collect::<HashSet<usize>>());
assert_eq!(cache.reset_on_next_modification, false);
}
}
#[test]
fn remove_reset_on_next_modification() {
let original_values = [1, 2, 3, 4].iter().cloned().collect::<HashSet<usize>>();
// One original value, one new value.
let to_remove = [3, 5].iter().cloned().collect::<HashSet<usize>>();
{
let mut cache = Cache {
values: original_values.clone(),
reset_on_next_modification: true,
};
cache.remove(to_remove.iter().cloned(), &mut |_, _| ());
assert_eq!(&cache.values, &HashSet::new());
assert_eq!(cache.reset_on_next_modification, false);
}
{
let mut cache = Cache {
values: original_values.clone(),
reset_on_next_modification: false,
};
cache.remove(to_remove.iter().cloned(), &mut |_, _| ());
assert_eq!(&cache.values, &[1, 2, 4].iter().cloned().collect::<HashSet<usize>>());
assert_eq!(cache.reset_on_next_modification, false);
}
}
#[test]
fn clear_reset_on_next_modification() {
let original_values = [1, 2, 3, 4].iter().cloned().collect::<HashSet<usize>>();
{
let mut cache = Cache {
values: original_values.clone(),
reset_on_next_modification: true,
};
cache.clear(&mut |_, _| ());
assert_eq!(&cache.values, &HashSet::new());
assert_eq!(cache.reset_on_next_modification, false);
}
{
let mut cache = Cache {
values: original_values.clone(),
reset_on_next_modification: false,
};
cache.clear(&mut |_, _| ());
assert_eq!(&cache.values, &HashSet::new());
assert_eq!(cache.reset_on_next_modification, false);
}
}
}

View File

@ -1,9 +1,7 @@
use std;
use std::collections::{HashSet, VecDeque};
use std::collections::VecDeque;
use std::collections::hash_map::{Entry, HashMap};
use std::net::SocketAddr;
use std::fmt;
use std::mem;
use futures::{Async, Future, Poll, Stream};
use futures::sync::mpsc;
@ -19,6 +17,8 @@ use conduit_proxy_controller_grpc::destination::Update as PbUpdate;
use conduit_proxy_controller_grpc::destination::update::Update as PbUpdate2;
use conduit_proxy_controller_grpc::destination::client::{Destination as DestinationSvc};
use control::cache::{Cache, CacheChange, Exists};
/// A handle to start watching a destination for address changes.
#[derive(Clone, Debug)]
pub struct Discovery {
@ -61,32 +61,6 @@ struct DestinationSet<T: HttpService<ResponseBody = RecvBody>> {
txs: Vec<mpsc::UnboundedSender<Update>>,
}
enum Exists<T> {
Unknown, // Unknown if the item exists or not
Yes(T),
No, // Affirmatively known to not exist.
}
/// A cache that supports incremental updates with lazy resetting on
/// invalidation.
///
/// When the cache `c` initially becomes invalid (i.e. it becomes
/// potentially out of sync with the data source so that incremental updates
/// would stop working), call `c.reset_on_next_modification()`; the next
/// incremental update will then replace the entire contents of the cache,
/// instead of incrementally augmenting it. Until that next modification,
/// however, the stale contents of the cache will be made available.
struct Cache<T> {
values: HashSet<T>,
reset_on_next_modification: bool,
}
impl<T> Exists<T> {
fn take(&mut self) -> Exists<T> {
mem::replace(self, Exists::Unknown)
}
}
enum DestinationServiceQuery<T: HttpService<ResponseBody = RecvBody>> {
NeedsReconnect,
ConnectedOrConnecting {
@ -291,7 +265,7 @@ where
// them onto the new watch first
match set.addrs {
Exists::Yes(ref cache) => {
for &addr in cache.values.iter() {
for &addr in cache.values().iter() {
tx.unbounded_send(Update::Insert(addr))
.expect("unbounded_send does not fail");
}
@ -410,7 +384,7 @@ impl <T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> {
fn reset_on_next_modification(&mut self) {
match self.addrs {
Exists::Yes(ref mut cache) => {
cache.reset_on_next_modification = true;
cache.set_reset_on_next_modification();
},
Exists::No |
Exists::Unknown => (),
@ -482,79 +456,6 @@ impl <T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> {
}
}
// ===== impl Cache =====
enum CacheChange {
Insertion,
Removal,
}
impl<T> Cache<T> where T: Clone + Copy + Eq + std::hash::Hash {
fn new() -> Self {
Cache {
values: HashSet::new(),
reset_on_next_modification: true,
}
}
fn extend<I, F>(&mut self, iter: I, on_change: &mut F)
where I: Iterator<Item = T>,
F: FnMut(T, CacheChange),
{
fn extend_inner<T, I, F>(values: &mut HashSet<T>, iter: I, on_change: &mut F)
where T: Copy + Eq + std::hash::Hash, I: Iterator<Item = T>, F: FnMut(T, CacheChange)
{
for value in iter {
if values.insert(value) {
on_change(value, CacheChange::Insertion);
}
}
}
if !self.reset_on_next_modification {
extend_inner(&mut self.values, iter, on_change);
} else {
let to_insert = iter.collect::<HashSet<T>>();
extend_inner(&mut self.values, to_insert.iter().map(|value| *value), on_change);
self.retain(&to_insert, on_change);
}
self.reset_on_next_modification = false;
}
fn remove<I, F>(&mut self, iter: I, on_change: &mut F)
where I: Iterator<Item = T>,
F: FnMut(T, CacheChange)
{
if !self.reset_on_next_modification {
for value in iter {
if self.values.remove(&value) {
on_change(value, CacheChange::Removal);
}
}
} else {
self.clear(on_change);
}
self.reset_on_next_modification = false;
}
fn clear<F>(&mut self, on_change: &mut F) where F: FnMut(T, CacheChange) {
self.retain(&HashSet::new(), on_change)
}
fn retain<F>(&mut self, to_retain: &HashSet<T>, mut on_change: F)
where F: FnMut(T, CacheChange)
{
self.values.retain(|value| {
let retain = to_retain.contains(&value);
if !retain {
on_change(*value, CacheChange::Removal)
}
retain
});
self.reset_on_next_modification = false;
}
}
// ===== impl Bind =====
impl<F, S, E> Bind for F
@ -651,90 +552,3 @@ fn pb_to_sock_addr(pb: TcpAddress) -> Option<SocketAddr> {
None => None,
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn cache_extend_reset_on_next_modification() {
let original_values = [1, 2, 3, 4].iter().cloned().collect::<HashSet<usize>>();
// One original value, one new value.
let new_values = [3, 5].iter().cloned().collect::<HashSet<usize>>();
{
let mut cache = Cache {
values: original_values.clone(),
reset_on_next_modification: true,
};
cache.extend(new_values.iter().cloned(), &mut |_, _| ());
assert_eq!(&cache.values, &new_values);
assert_eq!(cache.reset_on_next_modification, false);
}
{
let mut cache = Cache {
values: original_values.clone(),
reset_on_next_modification: false,
};
cache.extend(new_values.iter().cloned(), &mut |_, _| ());
assert_eq!(&cache.values,
&[1, 2, 3, 4, 5].iter().cloned().collect::<HashSet<usize>>());
assert_eq!(cache.reset_on_next_modification, false);
}
}
#[test]
fn cache_remove_reset_on_next_modification() {
let original_values = [1, 2, 3, 4].iter().cloned().collect::<HashSet<usize>>();
// One original value, one new value.
let to_remove = [3, 5].iter().cloned().collect::<HashSet<usize>>();
{
let mut cache = Cache {
values: original_values.clone(),
reset_on_next_modification: true,
};
cache.remove(to_remove.iter().cloned(), &mut |_, _| ());
assert_eq!(&cache.values, &HashSet::new());
assert_eq!(cache.reset_on_next_modification, false);
}
{
let mut cache = Cache {
values: original_values.clone(),
reset_on_next_modification: false,
};
cache.remove(to_remove.iter().cloned(), &mut |_, _| ());
assert_eq!(&cache.values, &[1, 2, 4].iter().cloned().collect::<HashSet<usize>>());
assert_eq!(cache.reset_on_next_modification, false);
}
}
#[test]
fn cache_clear_reset_on_next_modification() {
let original_values = [1, 2, 3, 4].iter().cloned().collect::<HashSet<usize>>();
{
let mut cache = Cache {
values: original_values.clone(),
reset_on_next_modification: true,
};
cache.clear(&mut |_, _| ());
assert_eq!(&cache.values, &HashSet::new());
assert_eq!(cache.reset_on_next_modification, false);
}
{
let mut cache = Cache {
values: original_values.clone(),
reset_on_next_modification: false,
};
cache.clear(&mut |_, _| ());
assert_eq!(&cache.values, &HashSet::new());
assert_eq!(cache.reset_on_next_modification, false);
}
}
}

View File

@ -21,6 +21,7 @@ use fully_qualified_authority::FullyQualifiedAuthority;
use transport::{HostAndPort, LookupAddressAndConnect};
use timeout::{Timeout, TimeoutError};
mod cache;
pub mod discovery;
mod observe;
pub mod pb;