diff --git a/proxy/src/control/destination/background.rs b/proxy/src/control/destination/background.rs index 0bfe65014..0390c0a4d 100644 --- a/proxy/src/control/destination/background.rs +++ b/proxy/src/control/destination/background.rs @@ -17,7 +17,7 @@ use conduit_proxy_controller_grpc::destination::client::Destination as Destinati use conduit_proxy_controller_grpc::destination::update::Update as PbUpdate2; use conduit_proxy_controller_grpc::destination::{Update as PbUpdate, WeightedAddr}; -use super::{Metadata, ResolveRequest, Update}; +use super::{Metadata, ResolveRequest, Responder, Update}; use control::cache::{Cache, CacheChange, Exists}; use control::fully_qualified_authority::FullyQualifiedAuthority; use control::remote_stream::{Receiver, Remote}; @@ -60,7 +60,7 @@ struct DestinationSet> { addrs: Exists>, query: Option>, dns_query: Option, - txs: Vec>, + responders: Vec, } // ==== impl Config ===== @@ -107,7 +107,8 @@ where // in `poll_destinations` while the `rpc` service is ready should // be reconnected now, otherwise the task would just sleep... loop { - self.poll_new_watches(client); + self.poll_resolve_requests(client); + self.retain_active_destinations(); self.poll_destinations(); if self.reconnects.is_empty() || !self.rpc_ready { @@ -116,7 +117,7 @@ where } } - fn poll_new_watches(&mut self, client: &mut T) { + fn poll_resolve_requests(&mut self, client: &mut T) { loop { // if rpc service isn't ready, not much we can do... 
match client.poll_ready() { @@ -141,12 +142,9 @@ where // check for any new watches match self.request_rx.poll() { - Ok(Async::Ready(Some(ResolveRequest { - authority, - update_tx, - }))) => { - trace!("Destination.Get {:?}", authority); - match self.destinations.entry(authority) { + Ok(Async::Ready(Some(resolve))) => { + trace!("Destination.Get {:?}", resolve.authority); + match self.destinations.entry(resolve.authority) { Entry::Occupied(mut occ) => { let set = occ.get_mut(); // we may already know of some addresses here, so push @@ -154,13 +152,13 @@ where match set.addrs { Exists::Yes(ref cache) => for (&addr, meta) in cache { let update = Update::Insert(addr, meta.clone()); - update_tx + resolve.responder.update_tx .unbounded_send(update) .expect("unbounded_send does not fail"); }, Exists::No | Exists::Unknown => (), } - set.txs.push(update_tx); + set.responders.push(resolve.responder); }, Entry::Vacant(vac) => { let query = Self::query_destination_service_if_relevant( @@ -173,7 +171,7 @@ where addrs: Exists::Unknown, query, dns_query: None, - txs: vec![update_tx], + responders: vec![resolve.responder], }; // If the authority is one for which the Destination service is never // relevant (e.g. an absolute name that doesn't end in ".svc.$zone." in @@ -219,6 +217,16 @@ where false } + /// Ensures that `destinations` is updated to only maintain active resolutions. + /// + /// If there are no active resolutions for a destination, the destination is removed. + fn retain_active_destinations(&mut self) { + self.destinations.retain(|_, ref mut dst| { + dst.responders.retain(|r| r.is_active()); + dst.responders.len() > 0 + }) + } + fn poll_destinations(&mut self) { for (auth, set) in &mut self.destinations { // Query the Destination service first. 
@@ -312,10 +320,10 @@ where self.dns_query = Some(dns_resolver.resolve_all_ips(delay, &authority.host)); } - // Processes Destination service updates from `rx`, returning the new query an an indication of - // any *change* to whether the service exists as far as the Destination service is concerned, - // where `Exists::Unknown` is to be interpreted as "no change in existence" instead of - // "unknown". + // Processes Destination service updates from `request_rx`, returning the new query + // and an indication of any *change* to whether the service exists as far as the + // Destination service is concerned, where `Exists::Unknown` is to be interpreted as + // "no change in existence" instead of "unknown". fn poll_destination_service( &mut self, auth: &DnsNameAndPort, @@ -437,7 +445,7 @@ impl> DestinationSet { Exists::Unknown | Exists::No => Cache::new(), }; cache.update_union(addrs_to_add, &mut |change| { - Self::on_change(&mut self.txs, authority_for_logging, change) + Self::on_change(&mut self.responders, authority_for_logging, change) }); self.addrs = Exists::Yes(cache); } @@ -449,7 +457,7 @@ impl> DestinationSet { let cache = match self.addrs.take() { Exists::Yes(mut cache) => { cache.remove(addrs_to_remove, &mut |change| { - Self::on_change(&mut self.txs, authority_for_logging, change) + Self::on_change(&mut self.responders, authority_for_logging, change) }); cache }, @@ -467,7 +475,7 @@ impl> DestinationSet { match self.addrs.take() { Exists::Yes(mut cache) => { cache.clear(&mut |change| { - Self::on_change(&mut self.txs, authority_for_logging, change) + Self::on_change(&mut self.responders, authority_for_logging, change) }); }, Exists::Unknown | Exists::No => (), @@ -480,7 +488,7 @@ impl> DestinationSet { } fn on_change( - txs: &mut Vec>, + responders: &mut Vec, authority_for_logging: &DnsNameAndPort, change: CacheChange, ) { @@ -497,7 +505,10 @@ impl> DestinationSet { }; trace!("{} {:?} for {:?}", update_str, addr, authority_for_logging); // retain is used 
to drop any senders that are dead - txs.retain(|tx| tx.unbounded_send(update.clone()).is_ok()); + responders.retain(|r| { + let sent = r.update_tx.unbounded_send(update.clone()); + sent.is_ok() + }); } } diff --git a/proxy/src/control/destination/mod.rs b/proxy/src/control/destination/mod.rs index 64d4c2ab5..aeb2d5535 100644 --- a/proxy/src/control/destination/mod.rs +++ b/proxy/src/control/destination/mod.rs @@ -1,5 +1,32 @@ +//! A client for the controller's Destination service. +//! +//! This client is split into two primary components: A `Resolver`, that routers use to +//! initiate service discovery for a given name, and a `background::Process` that +//! satisfies these resolution requests. These components are separated by a channel so +//! that the thread responsible for proxying data need not also do this administrative +//! work of communicating with the control plane. +//! +//! The number of active resolutions is not currently bounded by this module. Instead, we +//! trust that callers of `Resolver` enforce such a constraint (for example, via +//! `conduit_proxy_router`'s LRU cache). Additionally, users of this module must ensure +//! they consume resolutions as they are sent so that the response channels don't grow +//! without bounds. +//! +//! Furthermore, there are not currently any bounds on the number of endpoints that may be +//! returned for a single resolution. It is expected that the Destination service enforce +//! some reasonable upper bounds. +//! +//! ## TODO +//! +//! - Given that the underlying gRPC client has some max number of concurrent streams, we +//! actually do have an upper bound on concurrent resolutions. This needs to be made +//! more explicit. +//! - We need some means to limit the number of endpoints that can be returned for a +//! single resolution so that `control::Cache` is not effectively unbounded. 
+ use std::collections::HashMap; use std::net::SocketAddr; +use std::sync::{Arc, Weak}; use futures::sync::mpsc; use futures::{Async, Poll, Stream}; @@ -23,22 +50,43 @@ pub struct Resolver { request_tx: mpsc::UnboundedSender, } +/// Requests that resolution updates for `authority` be sent on `responder`. #[derive(Debug)] struct ResolveRequest { authority: DnsNameAndPort, + responder: Responder, +} + +/// A handle through which response updates may be sent. +#[derive(Debug)] +struct Responder { + /// Sends updates from the controller to a `Resolution`. update_tx: mpsc::UnboundedSender, + + /// Indicates whether the corresponding `Resolution` is still active. + active: Weak<()>, +} /// A `tower_discover::Discover`, given to a `tower_balance::Balance`. #[derive(Debug)] pub struct Resolution { + /// Receives updates from the controller. update_rx: mpsc::UnboundedReceiver, + + /// Allows `Responder` to detect when its `Resolution` has been lost. + /// + /// `Responder` holds a weak reference to this `Arc` and can determine when this + /// reference has been dropped. + _active: Arc<()>, + /// Map associating addresses with the `Store` for the watch on that /// service's metric labels (as provided by the Destination service). /// /// This is used to update the `Labeled` middleware on those services /// without requiring the service stack to be re-bound. metric_labels: HashMap>>, + + /// Binds an update endpoint to a Service. 
bind: B, } @@ -100,11 +148,15 @@ impl Resolver { pub fn resolve(&self, authority: &DnsNameAndPort, bind: B) -> Resolution { trace!("resolve; authority={:?}", authority); let (update_tx, update_rx) = mpsc::unbounded(); + let active = Arc::new(()); let req = { let authority = authority.clone(); ResolveRequest { authority, - update_tx, + responder: Responder { + update_tx, + active: Arc::downgrade(&active), + }, } }; self.request_tx @@ -113,6 +165,7 @@ impl Resolver { Resolution { update_rx, + _active: active, metric_labels: HashMap::new(), bind, } @@ -192,6 +245,14 @@ where } } +// ===== impl Responder ===== + +impl Responder { + fn is_active(&self) -> bool { + self.active.upgrade().is_some() + } +} + // ===== impl Metadata ===== impl Metadata {