Proxy: Completely replace current set of destinations on reconnect (#632)

Previously, when the proxy was disconnected from the Destination
service and then reconnected, the proxy would not forget old, outdated
entries in its cache of endpoints. If those endpoints had been removed
while the proxy was disconnected then the proxy would never become
aware of that.

Instead, on the first message after a reconnection, replace the entire
set of cached entries with the new set, which may be empty.

Prior to this change, the new test
outbound_destinations_reset_on_reconnect_followed_by_no_endpoints_exists
already passed,
but outbound_destinations_reset_on_reconnect_followed_by_add_none
and outbound_destinations_reset_on_reconnect_followed_by_remove_none
failed. Now all of these tests pass.

Fixes #573

Signed-off-by: Brian Smith <brian@briansmith.org>
This commit is contained in:
Brian Smith 2018-03-29 16:50:08 -10:00 committed by GitHub
parent 8fe742e2de
commit f931dec3b3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 332 additions and 53 deletions

View File

@ -1,3 +1,4 @@
use std;
use std::collections::{HashSet, VecDeque};
use std::collections::hash_map::{Entry, HashMap};
use std::net::SocketAddr;
@ -55,7 +56,7 @@ pub struct DiscoveryWork<T: HttpService<ResponseBody = RecvBody>> {
}
struct DestinationSet<T: HttpService<ResponseBody = RecvBody>> {
addrs: Exists<HashSet<SocketAddr>>,
addrs: Exists<Cache<SocketAddr>>,
query: DestinationServiceQuery<T>,
txs: Vec<mpsc::UnboundedSender<Update>>,
}
@ -66,6 +67,20 @@ enum Exists<T> {
No, // Affirmatively known to not exist.
}
/// A cache that supports incremental updates with lazy resetting on
/// invalidation.
///
/// When the cache `c` initially becomes invalid (i.e. it becomes
/// potentially out of sync with the data source so that incremental updates
/// would stop working), call `c.reset_on_next_modification()`; the next
/// incremental update will then replace the entire contents of the cache,
/// instead of incrementally augmenting it. Until that next modification,
/// however, the stale contents of the cache will be made available.
struct Cache<T> {
    /// The current set of cached values.
    values: HashSet<T>,
    /// When true, the next modification (`extend`, `remove`, or `clear`)
    /// replaces the current contents instead of updating them incrementally.
    reset_on_next_modification: bool,
}
impl<T> Exists<T> {
fn take(&mut self) -> Exists<T> {
mem::replace(self, Exists::Unknown)
@ -275,8 +290,8 @@ where
// we may already know of some addresses here, so push
// them onto the new watch first
match set.addrs {
Exists::Yes(ref mut addrs) => {
for &addr in addrs.iter() {
Exists::Yes(ref cache) => {
for &addr in cache.values.iter() {
tx.unbounded_send(Update::Insert(addr))
.expect("unbounded_send does not fail");
}
@ -365,6 +380,7 @@ where
};
if needs_reconnect {
set.query = DestinationServiceQuery::NeedsReconnect;
set.reset_on_next_modification();
self.reconnects.push_back(FullyQualifiedAuthority::clone(auth));
}
}
@ -391,79 +407,154 @@ impl<T: HttpService<RequestBody = BoxBody, ResponseBody = RecvBody>> Destination
// ===== impl DestinationSet =====
impl <T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> {
fn add<Addrs>(&mut self, authority_for_logging: &FullyQualifiedAuthority, addrs_to_add: Addrs)
where Addrs: Iterator<Item = SocketAddr>
{
let mut addrs = match self.addrs.take() {
Exists::Yes(addrs) => addrs,
Exists::Unknown | Exists::No => {
trace!("adding entries for {:?} that wasn't known to exist. Now assuming it does.",
authority_for_logging);
HashSet::new()
fn reset_on_next_modification(&mut self) {
match self.addrs {
Exists::Yes(ref mut cache) => {
cache.reset_on_next_modification = true;
},
};
for addr in addrs_to_add {
if addrs.insert(addr) {
trace!("update {:?} for {:?}", addr, authority_for_logging);
// retain is used to drop any senders that are dead
self.txs.retain(|tx| {
tx.unbounded_send(Update::Insert(addr)).is_ok()
});
Exists::No |
Exists::Unknown => (),
}
}
self.addrs = Exists::Yes(addrs);
}
fn remove<Addrs>(&mut self, authority_for_logging: &FullyQualifiedAuthority,
addrs_to_remove: Addrs)
where Addrs: Iterator<Item = SocketAddr>
fn add<A>(&mut self, authority_for_logging: &FullyQualifiedAuthority, addrs_to_add: A)
where A: Iterator<Item = SocketAddr>
{
let addrs = match self.addrs.take() {
Exists::Yes(mut addrs) => {
for addr in addrs_to_remove {
if addrs.remove(&addr) {
self.notify_of_removal(addr, authority_for_logging)
}
}
addrs
},
Exists::Unknown | Exists::No => {
trace!("remove addresses for {:?} that wasn't known to exist. Now assuming it does.",
authority_for_logging);
HashSet::new()
},
let mut cache = match self.addrs.take() {
Exists::Yes(mut cache) => cache,
Exists::Unknown | Exists::No => Cache::new(),
};
self.addrs = Exists::Yes(addrs)
cache.extend(
addrs_to_add,
&mut |addr, change| Self::on_change(&mut self.txs, authority_for_logging, addr,
change));
self.addrs = Exists::Yes(cache);
}
fn remove<A>(&mut self, authority_for_logging: &FullyQualifiedAuthority, addrs_to_remove: A)
where A: Iterator<Item = SocketAddr>
{
let cache = match self.addrs.take() {
Exists::Yes(mut cache) => {
cache.remove(
addrs_to_remove,
&mut |addr, change| Self::on_change(&mut self.txs, authority_for_logging, addr,
change));
cache
},
Exists::Unknown | Exists::No => Cache::new(),
};
self.addrs = Exists::Yes(cache);
}
fn no_endpoints(&mut self, authority_for_logging: &FullyQualifiedAuthority, exists: bool) {
trace!("no endpoints for {:?} that is known to {}", authority_for_logging,
if exists { "exist"} else { "not exist"});
if exists { "exist" } else { "not exist" });
match self.addrs.take() {
Exists::Yes(addrs) => {
for addr in addrs {
self.notify_of_removal(addr, authority_for_logging)
}
Exists::Yes(mut cache) => {
cache.clear(
&mut |addr, change| Self::on_change(&mut self.txs, authority_for_logging, addr,
change));
},
Exists::No | Exists::Unknown => {},
}
Exists::Unknown | Exists::No => (),
};
self.addrs = if exists {
Exists::Yes(HashSet::new())
Exists::Yes(Cache::new())
} else {
Exists::No
};
}
fn notify_of_removal(&mut self, addr: SocketAddr,
authority_for_logging: &FullyQualifiedAuthority) {
trace!("remove {:?} for {:?}", addr, authority_for_logging);
fn on_change(txs: &mut Vec<mpsc::UnboundedSender<Update>>,
authority_for_logging: &FullyQualifiedAuthority,
addr: SocketAddr,
change: CacheChange) {
let (update_str, update_constructor): (&'static str, fn(SocketAddr) -> Update) =
match change {
CacheChange::Insertion => ("insert", Update::Insert),
CacheChange::Removal => ("remove", Update::Remove),
};
trace!("{} {:?} for {:?}", update_str, addr, authority_for_logging);
// retain is used to drop any senders that are dead
self.txs.retain(|tx| {
tx.unbounded_send(Update::Remove(addr)).is_ok()
txs.retain(|tx| {
tx.unbounded_send(update_constructor(addr)).is_ok()
});
}
}
// ===== impl Cache =====
/// The kind of change that `Cache` reports through its `on_change` callback.
enum CacheChange {
    /// A value was newly added to the cache.
    Insertion,
    /// A value was removed from the cache.
    Removal,
}
impl<T> Cache<T> where T: Copy + Eq + std::hash::Hash {
    /// Creates an empty cache whose first modification is treated as a full
    /// reset, so an initial `extend` simply installs the new contents.
    fn new() -> Self {
        Cache {
            values: HashSet::new(),
            reset_on_next_modification: true,
        }
    }

    /// Inserts every value yielded by `iter`, invoking `on_change` with
    /// `CacheChange::Insertion` for each value that was not already cached.
    ///
    /// If a reset is pending, the cache is instead replaced by exactly the
    /// values in `iter`; `on_change` is additionally invoked with
    /// `CacheChange::Removal` for every stale value that is dropped.
    fn extend<I, F>(&mut self, iter: I, on_change: &mut F)
        where I: Iterator<Item = T>,
              F: FnMut(T, CacheChange),
    {
        // Insertion logic shared by the incremental and reset paths.
        fn extend_inner<T, I, F>(values: &mut HashSet<T>, iter: I, on_change: &mut F)
            where T: Copy + Eq + std::hash::Hash, I: Iterator<Item = T>, F: FnMut(T, CacheChange)
        {
            for value in iter {
                if values.insert(value) {
                    on_change(value, CacheChange::Insertion);
                }
            }
        }

        if !self.reset_on_next_modification {
            extend_inner(&mut self.values, iter, on_change);
        } else {
            // On reset, `iter` is the complete new set: insert whatever is
            // missing, then drop everything that is not present in it.
            let to_insert = iter.collect::<HashSet<T>>();
            extend_inner(&mut self.values, to_insert.iter().cloned(), on_change);
            self.retain(&to_insert, on_change);
        }
        self.reset_on_next_modification = false;
    }

    /// Removes every value yielded by `iter`, invoking `on_change` with
    /// `CacheChange::Removal` for each value that was actually cached.
    ///
    /// If a reset is pending, the whole cache is cleared instead: the values
    /// in `iter` are then the only ones known to be absent, so none of the
    /// other cached values can be trusted either.
    fn remove<I, F>(&mut self, iter: I, on_change: &mut F)
        where I: Iterator<Item = T>,
              F: FnMut(T, CacheChange)
    {
        if !self.reset_on_next_modification {
            for value in iter {
                if self.values.remove(&value) {
                    on_change(value, CacheChange::Removal);
                }
            }
        } else {
            self.clear(on_change);
        }
        self.reset_on_next_modification = false;
    }

    /// Removes all values, invoking `on_change` for each removal.
    fn clear<F>(&mut self, on_change: &mut F) where F: FnMut(T, CacheChange) {
        self.retain(&HashSet::new(), on_change)
    }

    /// Keeps only the values contained in `to_retain`, invoking `on_change`
    /// for each value removed. Also marks any pending reset as serviced.
    fn retain<F>(&mut self, to_retain: &HashSet<T>, mut on_change: F)
        where F: FnMut(T, CacheChange)
    {
        self.values.retain(|value| {
            let retained = to_retain.contains(value);
            if !retained {
                on_change(*value, CacheChange::Removal)
            }
            retained
        });
        self.reset_on_next_modification = false;
    }
}
// ===== impl Bind =====
impl<F, S, E> Bind for F
@ -560,3 +651,90 @@ fn pb_to_sock_addr(pb: TcpAddress) -> Option<SocketAddr> {
None => None,
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Collects a slice of values into a `HashSet`.
    fn to_set(values: &[usize]) -> HashSet<usize> {
        values.iter().cloned().collect()
    }

    /// Constructs a cache seeded with `values` and the given reset flag.
    fn make_cache(values: &[usize], reset_on_next_modification: bool) -> Cache<usize> {
        Cache {
            values: to_set(values),
            reset_on_next_modification,
        }
    }

    #[test]
    fn cache_extend_reset_on_next_modification() {
        // One value already cached (3), one new value (5).
        let new_values = to_set(&[3, 5]);

        // Reset pending: `extend` replaces the contents wholesale.
        let mut cache = make_cache(&[1, 2, 3, 4], true);
        cache.extend(new_values.iter().cloned(), &mut |_, _| ());
        assert_eq!(&cache.values, &new_values);
        assert_eq!(cache.reset_on_next_modification, false);

        // No reset pending: `extend` merges into the existing contents.
        let mut cache = make_cache(&[1, 2, 3, 4], false);
        cache.extend(new_values.iter().cloned(), &mut |_, _| ());
        assert_eq!(&cache.values, &to_set(&[1, 2, 3, 4, 5]));
        assert_eq!(cache.reset_on_next_modification, false);
    }

    #[test]
    fn cache_remove_reset_on_next_modification() {
        // One value actually cached (3), one absent (5).
        let to_remove = to_set(&[3, 5]);

        // Reset pending: `remove` clears the entire cache.
        let mut cache = make_cache(&[1, 2, 3, 4], true);
        cache.remove(to_remove.iter().cloned(), &mut |_, _| ());
        assert_eq!(&cache.values, &HashSet::new());
        assert_eq!(cache.reset_on_next_modification, false);

        // No reset pending: only the listed values are removed.
        let mut cache = make_cache(&[1, 2, 3, 4], false);
        cache.remove(to_remove.iter().cloned(), &mut |_, _| ());
        assert_eq!(&cache.values, &to_set(&[1, 2, 4]));
        assert_eq!(cache.reset_on_next_modification, false);
    }

    #[test]
    fn cache_clear_reset_on_next_modification() {
        // `clear` empties the cache regardless of the reset flag, and the
        // flag is left cleared afterwards in both cases.
        for &reset in &[true, false] {
            let mut cache = make_cache(&[1, 2, 3, 4], reset);
            cache.clear(&mut |_, _| ());
            assert_eq!(&cache.values, &HashSet::new());
            assert_eq!(cache.reset_on_next_modification, false);
        }
    }
}

View File

@ -3,6 +3,8 @@ use self::support::*;
macro_rules! generate_tests {
(server: $make_server:path, client: $make_client:path) => {
use conduit_proxy_controller_grpc as pb;
#[test]
fn outbound_asks_controller_api() {
let _ = env_logger::try_init();
@ -32,6 +34,77 @@ macro_rules! generate_tests {
assert_eq!(client.get("/recon"), "nect");
}
#[test]
#[cfg_attr(not(feature = "flaky_tests"), ignore)]
// After a forced reconnect, a `NoEndpoints { exists: true }` message must
// replace (here: empty out) the proxy's cached endpoints for the destination.
fn outbound_destinations_reset_on_reconnect_followed_by_no_endpoints_exists() {
outbound_destinations_reset_on_reconnect(move || {
Some(controller::destination_exists_with_no_endpoints())
})
}
#[test]
#[cfg_attr(not(feature = "flaky_tests"), ignore)]
// After a forced reconnect, an `Add` update carrying zero addresses must
// still reset the cache, leaving the destination with no endpoints.
fn outbound_destinations_reset_on_reconnect_followed_by_add_none() {
outbound_destinations_reset_on_reconnect(move || {
Some(controller::destination_add_none())
})
}
#[test]
#[cfg_attr(not(feature = "flaky_tests"), ignore)]
// After a forced reconnect, a `Remove` update carrying zero addresses must
// still reset (clear) the cache, leaving the destination with no endpoints.
fn outbound_destinations_reset_on_reconnect_followed_by_remove_none() {
outbound_destinations_reset_on_reconnect(move || {
Some(controller::destination_remove_none())
})
}
// Shared driver for the three reset-on-reconnect tests above.
//
// Flow: resolve "initially-exists..." successfully, force the controller
// stream to close by querying "trigger-close..." (registered with
// `destination_close()`), then let the proxy reconnect, at which point `f`
// supplies the first destination update. All three `f` variants describe an
// empty endpoint set, so the final request must fail rather than reuse the
// stale pre-reconnect endpoint.
fn outbound_destinations_reset_on_reconnect<F>(f: F)
where F: Fn() -> Option<pb::destination::Update> + Send + 'static
{
use std::thread;
let _ = env_logger::try_init();
let mut env = config::TestEnv::new();
// set the bind timeout to 100 ms.
env.put(config::ENV_BIND_TIMEOUT, "100".to_owned());
let srv = $make_server().route("/", "hello").run();
// The controller first answers with `srv.addr`, then (after the close and
// reconnect) with whatever update `f` produces.
let ctrl = controller::new()
.destination("initially-exists.ns.svc.cluster.local", srv.addr)
.destination_close("trigger-close.ns.svc.cluster.local")
.destination_fn("initially-exists.ns.svc.cluster.local", f)
.run();
let proxy = proxy::new()
.controller(ctrl)
.outbound(srv)
.run_with_test_env(env);
// Sanity check: the destination resolves and serves before the reconnect.
let initially_exists =
$make_client(proxy.outbound, "initially-exists.ns.svc.cluster.local");
assert_eq!(initially_exists.get("/"), "hello");
// Try to access a different server which will trigger the `destination_close()`
// above.
{
let trigger_close =
$make_client(proxy.outbound, "trigger-close.ns.svc.cluster.local");
let mut req = trigger_close.request_builder("/");
let rsp = trigger_close.request(req.method("GET"));
// the request should time out
assert_eq!(rsp.status(), http::StatusCode::INTERNAL_SERVER_ERROR);
}
// Wait for the reconnect to happen. TODO: Replace this flaky logic.
thread::sleep(Duration::from_millis(1000));
// This will time out since there are no endpoints.
let mut req = initially_exists.request_builder("/");
let rsp = initially_exists.request(req.method("GET"));
// the request should time out
assert_eq!(rsp.status(), http::StatusCode::INTERNAL_SERVER_ERROR);
}
#[test]
#[cfg_attr(not(feature = "flaky_tests"), ignore)]
fn outbound_times_out() {

View File

@ -288,6 +288,34 @@ pub fn destination_update(addr: SocketAddr) -> pb::destination::Update {
}
}
/// Builds a Destination `Add` update that carries no addresses at all.
pub fn destination_add_none() -> pb::destination::Update {
    let empty_addrs = pb::destination::WeightedAddrSet {
        addrs: Vec::new(),
    };
    pb::destination::Update {
        update: Some(pb::destination::update::Update::Add(empty_addrs)),
    }
}
/// Builds a Destination `Remove` update that carries no addresses at all.
pub fn destination_remove_none() -> pb::destination::Update {
    let empty_addrs = pb::destination::AddrSet {
        addrs: Vec::new(),
    };
    pb::destination::Update {
        update: Some(pb::destination::update::Update::Remove(empty_addrs)),
    }
}
/// Builds a `NoEndpoints` update with `exists: true`: the destination is
/// valid but currently has no endpoints.
pub fn destination_exists_with_no_endpoints() -> pb::destination::Update {
    let no_endpoints = pb::destination::NoEndpoints { exists: true };
    pb::destination::Update {
        update: Some(pb::destination::update::Update::NoEndpoints(no_endpoints)),
    }
}
fn ip_conv(ip: IpAddr) -> pb::common::IpAddress {
match ip {
IpAddr::V4(v4) => pb::common::IpAddress {