proxy: Drop destination resolutions when unused (#956)
A proxy dispatches requests over a constrained number of routes. When the router's upper bound is reached---and potentially in other future scenarios---router capacity is created by removing unused routes, their load balancers, and all related endpoint stacks. However, in the current regime, the controller subsystem will continue to monitor discovery observations. As the number of active observations expands over time, the controller task ends up with more and more work to do. This change introduces a shared liveness handle between the resolution returned to the load balancer and the state maintained when communicating with the service: the resolution owns an `Arc<()>` and the controller-side responder holds a `Weak<()>` to it, so the responder can detect when its resolution has been dropped. Before the controller polls its active resolutions, it first ensures that all unused resolutions are dropped.
This commit is contained in:
parent
4473fd114d
commit
cd923abf94
|
@ -17,7 +17,7 @@ use conduit_proxy_controller_grpc::destination::client::Destination as Destinati
|
|||
use conduit_proxy_controller_grpc::destination::update::Update as PbUpdate2;
|
||||
use conduit_proxy_controller_grpc::destination::{Update as PbUpdate, WeightedAddr};
|
||||
|
||||
use super::{Metadata, ResolveRequest, Update};
|
||||
use super::{Metadata, ResolveRequest, Responder, Update};
|
||||
use control::cache::{Cache, CacheChange, Exists};
|
||||
use control::fully_qualified_authority::FullyQualifiedAuthority;
|
||||
use control::remote_stream::{Receiver, Remote};
|
||||
|
@ -60,7 +60,7 @@ struct DestinationSet<T: HttpService<ResponseBody = RecvBody>> {
|
|||
addrs: Exists<Cache<SocketAddr, Metadata>>,
|
||||
query: Option<DestinationServiceQuery<T>>,
|
||||
dns_query: Option<IpAddrListFuture>,
|
||||
txs: Vec<mpsc::UnboundedSender<Update>>,
|
||||
responders: Vec<Responder>,
|
||||
}
|
||||
|
||||
// ==== impl Config =====
|
||||
|
@ -107,7 +107,8 @@ where
|
|||
// in `poll_destinations` while the `rpc` service is ready should
|
||||
// be reconnected now, otherwise the task would just sleep...
|
||||
loop {
|
||||
self.poll_new_watches(client);
|
||||
self.poll_resolve_requests(client);
|
||||
self.retain_active_destinations();
|
||||
self.poll_destinations();
|
||||
|
||||
if self.reconnects.is_empty() || !self.rpc_ready {
|
||||
|
@ -116,7 +117,7 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
fn poll_new_watches(&mut self, client: &mut T) {
|
||||
fn poll_resolve_requests(&mut self, client: &mut T) {
|
||||
loop {
|
||||
// if rpc service isn't ready, not much we can do...
|
||||
match client.poll_ready() {
|
||||
|
@ -141,12 +142,9 @@ where
|
|||
|
||||
// check for any new watches
|
||||
match self.request_rx.poll() {
|
||||
Ok(Async::Ready(Some(ResolveRequest {
|
||||
authority,
|
||||
update_tx,
|
||||
}))) => {
|
||||
trace!("Destination.Get {:?}", authority);
|
||||
match self.destinations.entry(authority) {
|
||||
Ok(Async::Ready(Some(resolve))) => {
|
||||
trace!("Destination.Get {:?}", resolve.authority);
|
||||
match self.destinations.entry(resolve.authority) {
|
||||
Entry::Occupied(mut occ) => {
|
||||
let set = occ.get_mut();
|
||||
// we may already know of some addresses here, so push
|
||||
|
@ -154,13 +152,13 @@ where
|
|||
match set.addrs {
|
||||
Exists::Yes(ref cache) => for (&addr, meta) in cache {
|
||||
let update = Update::Insert(addr, meta.clone());
|
||||
update_tx
|
||||
resolve.responder.update_tx
|
||||
.unbounded_send(update)
|
||||
.expect("unbounded_send does not fail");
|
||||
},
|
||||
Exists::No | Exists::Unknown => (),
|
||||
}
|
||||
set.txs.push(update_tx);
|
||||
set.responders.push(resolve.responder);
|
||||
},
|
||||
Entry::Vacant(vac) => {
|
||||
let query = Self::query_destination_service_if_relevant(
|
||||
|
@ -173,7 +171,7 @@ where
|
|||
addrs: Exists::Unknown,
|
||||
query,
|
||||
dns_query: None,
|
||||
txs: vec![update_tx],
|
||||
responders: vec![resolve.responder],
|
||||
};
|
||||
// If the authority is one for which the Destination service is never
|
||||
// relevant (e.g. an absolute name that doesn't end in ".svc.$zone." in
|
||||
|
@ -219,6 +217,16 @@ where
|
|||
false
|
||||
}
|
||||
|
||||
/// Ensures that `destinations` is updated to only maintain active resolutions.
|
||||
///
|
||||
/// If there are no active resolutions for a destination, the destination is removed.
|
||||
fn retain_active_destinations(&mut self) {
|
||||
self.destinations.retain(|_, ref mut dst| {
|
||||
dst.responders.retain(|r| r.is_active());
|
||||
dst.responders.len() > 0
|
||||
})
|
||||
}
|
||||
|
||||
fn poll_destinations(&mut self) {
|
||||
for (auth, set) in &mut self.destinations {
|
||||
// Query the Destination service first.
|
||||
|
@ -312,10 +320,10 @@ where
|
|||
self.dns_query = Some(dns_resolver.resolve_all_ips(delay, &authority.host));
|
||||
}
|
||||
|
||||
// Processes Destination service updates from `rx`, returning the new query and an indication of
|
||||
// any *change* to whether the service exists as far as the Destination service is concerned,
|
||||
// where `Exists::Unknown` is to be interpreted as "no change in existence" instead of
|
||||
// "unknown".
|
||||
// Processes Destination service updates from `request_rx`, returning the new query
|
||||
// and an indication of any *change* to whether the service exists as far as the
|
||||
// Destination service is concerned, where `Exists::Unknown` is to be interpreted as
|
||||
// "no change in existence" instead of "unknown".
|
||||
fn poll_destination_service(
|
||||
&mut self,
|
||||
auth: &DnsNameAndPort,
|
||||
|
@ -437,7 +445,7 @@ impl<T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> {
|
|||
Exists::Unknown | Exists::No => Cache::new(),
|
||||
};
|
||||
cache.update_union(addrs_to_add, &mut |change| {
|
||||
Self::on_change(&mut self.txs, authority_for_logging, change)
|
||||
Self::on_change(&mut self.responders, authority_for_logging, change)
|
||||
});
|
||||
self.addrs = Exists::Yes(cache);
|
||||
}
|
||||
|
@ -449,7 +457,7 @@ impl<T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> {
|
|||
let cache = match self.addrs.take() {
|
||||
Exists::Yes(mut cache) => {
|
||||
cache.remove(addrs_to_remove, &mut |change| {
|
||||
Self::on_change(&mut self.txs, authority_for_logging, change)
|
||||
Self::on_change(&mut self.responders, authority_for_logging, change)
|
||||
});
|
||||
cache
|
||||
},
|
||||
|
@ -467,7 +475,7 @@ impl<T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> {
|
|||
match self.addrs.take() {
|
||||
Exists::Yes(mut cache) => {
|
||||
cache.clear(&mut |change| {
|
||||
Self::on_change(&mut self.txs, authority_for_logging, change)
|
||||
Self::on_change(&mut self.responders, authority_for_logging, change)
|
||||
});
|
||||
},
|
||||
Exists::Unknown | Exists::No => (),
|
||||
|
@ -480,7 +488,7 @@ impl<T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> {
|
|||
}
|
||||
|
||||
fn on_change(
|
||||
txs: &mut Vec<mpsc::UnboundedSender<Update>>,
|
||||
responders: &mut Vec<Responder>,
|
||||
authority_for_logging: &DnsNameAndPort,
|
||||
change: CacheChange<SocketAddr, Metadata>,
|
||||
) {
|
||||
|
@ -497,7 +505,10 @@ impl<T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> {
|
|||
};
|
||||
trace!("{} {:?} for {:?}", update_str, addr, authority_for_logging);
|
||||
// retain is used to drop any senders that are dead
|
||||
txs.retain(|tx| tx.unbounded_send(update.clone()).is_ok());
|
||||
responders.retain(|r| {
|
||||
let sent = r.update_tx.unbounded_send(update.clone());
|
||||
sent.is_ok()
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,5 +1,32 @@
|
|||
//! A client for the controller's Destination service.
|
||||
//!
|
||||
//! This client is split into two primary components: A `Resolver`, that routers use to
|
||||
//! initiate service discovery for a given name, and a `background::Process` that
|
||||
//! satisfies these resolution requests. These components are separated by a channel so
|
||||
//! that the thread responsible for proxying data need not also do this administrative
|
||||
//! work of communicating with the control plane.
|
||||
//!
|
||||
//! The number of active resolutions is not currently bounded by this module. Instead, we
|
||||
//! trust that callers of `Resolver` enforce such a constraint (for example, via
|
||||
//! `conduit_proxy_router`'s LRU cache). Additionally, users of this module must ensure
|
||||
//! they consume resolutions as they are sent so that the response channels don't grow
|
||||
//! without bounds.
|
||||
//!
|
||||
//! Furthermore, there are not currently any bounds on the number of endpoints that may be
|
||||
//! returned for a single resolution. It is expected that the Destination service enforce
|
||||
//! some reasonable upper bounds.
|
||||
//!
|
||||
//! ## TODO
|
||||
//!
|
||||
//! - Given that the underlying gRPC client has some max number of concurrent streams, we
|
||||
//! actually do have an upper bound on concurrent resolutions. This needs to be made
|
||||
//! more explicit.
|
||||
//! - We need some means to limit the number of endpoints that can be returned for a
|
||||
//! single resolution so that `control::Cache` is not effectively unbounded.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::{Arc, Weak};
|
||||
|
||||
use futures::sync::mpsc;
|
||||
use futures::{Async, Poll, Stream};
|
||||
|
@ -23,22 +50,43 @@ pub struct Resolver {
|
|||
request_tx: mpsc::UnboundedSender<ResolveRequest>,
|
||||
}
|
||||
|
||||
/// Requests that resolution updates for `authority` be sent on `responder`.
|
||||
#[derive(Debug)]
|
||||
struct ResolveRequest {
|
||||
authority: DnsNameAndPort,
|
||||
responder: Responder,
|
||||
}
|
||||
|
||||
/// A handle through which response updates may be sent.
|
||||
#[derive(Debug)]
|
||||
struct Responder {
|
||||
/// Sends updates from the controller to a `Resolution`.
|
||||
update_tx: mpsc::UnboundedSender<Update>,
|
||||
|
||||
/// Indicates whether the corresponding `Resolution` is still active.
|
||||
active: Weak<()>,
|
||||
}
|
||||
|
||||
/// A `tower_discover::Discover`, given to a `tower_balance::Balance`.
|
||||
#[derive(Debug)]
|
||||
pub struct Resolution<B> {
|
||||
/// Receives updates from the controller.
|
||||
update_rx: mpsc::UnboundedReceiver<Update>,
|
||||
|
||||
/// Allows `Responder` to detect when its `Resolution` has been lost.
|
||||
///
|
||||
/// `Responder` holds a weak reference to this `Arc` and can determine when this
|
||||
/// reference has been dropped.
|
||||
_active: Arc<()>,
|
||||
|
||||
/// Map associating addresses with the `Store` for the watch on that
|
||||
/// service's metric labels (as provided by the Destination service).
|
||||
///
|
||||
/// This is used to update the `Labeled` middleware on those services
|
||||
/// without requiring the service stack to be re-bound.
|
||||
metric_labels: HashMap<SocketAddr, Store<Option<DstLabels>>>,
|
||||
|
||||
/// Binds an update endpoint to a Service.
|
||||
bind: B,
|
||||
}
|
||||
|
||||
|
@ -100,11 +148,15 @@ impl Resolver {
|
|||
pub fn resolve<B>(&self, authority: &DnsNameAndPort, bind: B) -> Resolution<B> {
|
||||
trace!("resolve; authority={:?}", authority);
|
||||
let (update_tx, update_rx) = mpsc::unbounded();
|
||||
let active = Arc::new(());
|
||||
let req = {
|
||||
let authority = authority.clone();
|
||||
ResolveRequest {
|
||||
authority,
|
||||
update_tx,
|
||||
responder: Responder {
|
||||
update_tx,
|
||||
active: Arc::downgrade(&active),
|
||||
},
|
||||
}
|
||||
};
|
||||
self.request_tx
|
||||
|
@ -113,6 +165,7 @@ impl Resolver {
|
|||
|
||||
Resolution {
|
||||
update_rx,
|
||||
_active: active,
|
||||
metric_labels: HashMap::new(),
|
||||
bind,
|
||||
}
|
||||
|
@ -192,6 +245,14 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
// ===== impl Responder =====
|
||||
|
||||
impl Responder {
|
||||
fn is_active(&self) -> bool {
|
||||
self.active.upgrade().is_some()
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl Metadata =====
|
||||
|
||||
impl Metadata {
|
||||
|
|
Loading…
Reference in New Issue