diff --git a/proxy/src/bind.rs b/proxy/src/bind.rs index 223a15740..c33caf839 100644 --- a/proxy/src/bind.rs +++ b/proxy/src/bind.rs @@ -13,7 +13,7 @@ use tower_h2; use tower_reconnect::Reconnect; use control; -use control::discovery::Endpoint; +use control::destination::Endpoint; use ctx; use telemetry::{self, sensor}; use transparency::{self, HttpBody, h1}; @@ -253,7 +253,7 @@ impl Bind { } } -impl control::discovery::Bind for BindProtocol, B> +impl control::destination::Bind for BindProtocol, B> where B: tower_h2::Body + 'static, { diff --git a/proxy/src/control/discovery.rs b/proxy/src/control/destination/background.rs similarity index 55% rename from proxy/src/control/discovery.rs rename to proxy/src/control/destination/background.rs index 841b37c34..0bfe65014 100644 --- a/proxy/src/control/discovery.rs +++ b/proxy/src/control/destination/background.rs @@ -1,78 +1,48 @@ -use std::{cmp, fmt, hash}; -use std::collections::VecDeque; -use std::collections::hash_map::{Entry, HashMap}; +use futures::sync::mpsc; +use futures::{Async, Future, Stream}; +use std::collections::{ + hash_map::{Entry, HashMap}, + VecDeque, +}; +use std::fmt; use std::iter::IntoIterator; use std::net::SocketAddr; use std::time::Duration; - -use futures::{Async, Future, Poll, Stream}; -use futures::sync::mpsc; -use futures_watch; -use http; use tokio_core::reactor::Handle; -use tower_service::Service; -use tower_h2::{HttpService, BoxBody, RecvBody}; -use tower_discover::{Change, Discover}; use tower_grpc as grpc; - -use dns::{self, IpAddrListFuture}; -use super::fully_qualified_authority::FullyQualifiedAuthority; +use tower_h2::{BoxBody, HttpService, RecvBody}; use conduit_proxy_controller_grpc::common::{Destination, TcpAddress}; -use conduit_proxy_controller_grpc::destination::{ - Update as PbUpdate, - WeightedAddr, -}; +use conduit_proxy_controller_grpc::destination::client::Destination as DestinationSvc; use conduit_proxy_controller_grpc::destination::update::Update as PbUpdate2; -use conduit_proxy_controller_grpc::destination::client::{Destination as DestinationSvc}; -use transport::DnsNameAndPort; +use conduit_proxy_controller_grpc::destination::{Update as PbUpdate, WeightedAddr}; +use super::{Metadata, ResolveRequest, Update}; use control::cache::{Cache, CacheChange, Exists}; -use control::remote_stream::{Remote, Receiver}; - -use ::telemetry::metrics::DstLabels; - -/// A handle to start watching a destination for address changes. -#[derive(Clone, Debug)] -pub struct Discovery { - tx: mpsc::UnboundedSender<(DnsNameAndPort, mpsc::UnboundedSender)>, -} - -#[derive(Clone, Debug)] -pub struct Endpoint { - address: SocketAddr, - dst_labels: Option, -} - -pub type DstLabelsWatch = futures_watch::Watch>; +use control::fully_qualified_authority::FullyQualifiedAuthority; +use control::remote_stream::{Receiver, Remote}; +use dns::{self, IpAddrListFuture}; +use telemetry::metrics::DstLabels; +use transport::DnsNameAndPort; type DestinationServiceQuery = Remote; type UpdateRx = Receiver; -/// A `tower_discover::Discover`, given to a `tower_balance::Balance`. +/// Stores the configuration for a destination background worker. #[derive(Debug)] -pub struct Watch { - rx: mpsc::UnboundedReceiver, - /// Map associating addresses with the `Store` for the watch on that - /// service's metric labels (as provided by the Destination service). - /// - /// This is used to update the `Labeled` middleware on those services - /// without requiring the service stack to be re-bound. - metric_labels: HashMap>>, - bind: B, -} - -/// A background handle to eventually bind on the controller thread. -#[derive(Debug)] -pub struct Background { - rx: mpsc::UnboundedReceiver<(DnsNameAndPort, mpsc::UnboundedSender)>, +pub struct Config { + request_rx: mpsc::UnboundedReceiver, dns_config: dns::Config, default_destination_namespace: String, } -/// A future returned from `Background::work()`, doing the work of talking to -/// the controller destination API. -pub struct DiscoveryWork> { +/// Satisfies resolutions as requested via `request_rx`. +/// +/// As `Process` is polled with a client to Destination service, if the client to the +/// service is healthy, it reads requests from `request_rx`, determines how to resolve the +/// provided authority to a set of addresses, and ensures that resolution updates are +/// propagated to all requesters. +pub struct Process> { dns_resolver: dns::Resolver, default_destination_namespace: String, destinations: HashMap>, @@ -82,16 +52,10 @@ pub struct DiscoveryWork> { /// Each poll, records whether the rpc service was till ready. rpc_ready: bool, /// A receiver of new watch requests. - rx: mpsc::UnboundedReceiver<(DnsNameAndPort, mpsc::UnboundedSender)>, -} - -/// Any additional metadata describing a discovered service. -#[derive(Clone, Debug, Hash, Eq, PartialEq)] -struct Metadata { - /// A set of Prometheus metric labels describing the destination. - metric_labels: Option, + request_rx: mpsc::UnboundedReceiver, } +/// Holds the state of a single resolution. struct DestinationSet> { addrs: Exists>, query: Option>, @@ -99,211 +63,41 @@ struct DestinationSet> { txs: Vec>, } -#[derive(Debug, Clone)] -enum Update { - Insert(SocketAddr, Metadata), - Remove(SocketAddr), - ChangeMetadata(SocketAddr, Metadata), -} +// ==== impl Config ===== -/// Bind a `SocketAddr` with a protocol. -pub trait Bind { - /// The type of endpoint upon which a `Service` is bound. - type Endpoint; - - /// Requests handled by the discovered services - type Request; - - /// Responses given by the discovered services - type Response; - - /// Errors produced by the discovered services - type Error; - - type BindError; - - /// The discovered `Service` instance. - type Service: Service; - - /// Bind a service from an endpoint. - fn bind(&self, addr: &Self::Endpoint) -> Result; -} - -/// Creates a "channel" of `Discovery` to `Background` handles. -/// -/// The `Discovery` is used by a listener, the `Background` is consumed -/// on the controller thread. -pub fn new(dns_config: dns::Config, default_destination_namespace: String) -> (Discovery, Background) { - let (tx, rx) = mpsc::unbounded(); - ( - Discovery { - tx, - }, - Background { - rx, +impl Config { + pub(super) fn new( + request_rx: mpsc::UnboundedReceiver, + dns_config: dns::Config, + default_destination_namespace: String, + ) -> Self { + Self { + request_rx, dns_config, default_destination_namespace, - }, - ) -} - -// ==== impl Discovery ===== - -impl Discovery { - /// Start watching for address changes for a certain authority. - pub fn resolve(&self, authority: &DnsNameAndPort, bind: B) -> Watch { - trace!("resolve; authority={:?}", authority); - let (tx, rx) = mpsc::unbounded(); - self.tx - .unbounded_send((authority.clone(), tx)) - .expect("unbounded can't fail"); - - Watch { - rx, - metric_labels: HashMap::new(), - bind, - } - } -} - -// ==== impl Endpoint ===== - -impl Endpoint { - pub fn new(address: SocketAddr, dst_labels: DstLabelsWatch) -> Self { - Self { - address, - dst_labels: Some(dst_labels), } } - pub fn address(&self) -> SocketAddr { - self.address - } - - pub fn dst_labels(&self) -> Option<&DstLabelsWatch> { - self.dst_labels.as_ref() - } -} - -impl From for Endpoint { - fn from(address: SocketAddr) -> Self { - Self { - address, - dst_labels: None, - } - } -} - -impl hash::Hash for Endpoint { - fn hash(&self, state: &mut H) { - self.address.hash(state) - } -} - -impl cmp::PartialEq for Endpoint { - fn eq(&self, other: &Self) -> bool { - self.address.eq(&other.address) - } -} - -impl cmp::Eq for Endpoint {} - -// ==== impl Watch ===== - -impl Watch { - fn update_metadata(&mut self, addr: SocketAddr, meta: Metadata) -> Result<(), ()> { - if let Some(store) = self.metric_labels.get_mut(&addr) { - store.store(meta.metric_labels) - .map_err(|e| { - error!("update_metadata: label store error: {:?}", e); - }) - .map(|_| ()) - } else { - // The store has already been removed, so nobody cares about - // the metadata change. We expect that this shouldn't happen, - // but if it does, log a warning and handle it gracefully. - warn!( - "update_metadata: ignoring ChangeMetadata for {:?} \ - because the service no longer exists.", - addr - ); - Ok(()) - } - } -} - -impl Discover for Watch -where - B: Bind>, -{ - type Key = SocketAddr; - type Request = B::Request; - type Response = B::Response; - type Error = B::Error; - type Service = B::Service; - type DiscoverError = (); - - fn poll(&mut self) -> Poll, Self::DiscoverError> { - loop { - let up = self.rx.poll(); - trace!("watch: {:?}", up); - let update = try_ready!(up).expect("discovery stream must be infinite"); - - match update { - Update::Insert(addr, meta) => { - // Construct a watch for the `Labeled` middleware that will - // wrap the bound service, and insert the store into our map - // so it can be updated later. - let (labels_watch, labels_store) = - futures_watch::Watch::new(meta.metric_labels); - self.metric_labels.insert(addr, labels_store); - - let endpoint = Endpoint::new(addr, labels_watch.clone()); - - let service = self.bind.bind(&endpoint) - .map_err(|_| ())?; - - return Ok(Async::Ready(Change::Insert(addr, service))) - }, - Update::ChangeMetadata(addr, meta) => { - // Update metadata and continue polling `rx`. - self.update_metadata(addr, meta)?; - }, - Update::Remove(addr) => { - // It's safe to drop the store handle here, even if - // the `Labeled` middleware using the watch handle - // still exists --- it will simply read the final - // value from the watch. - self.metric_labels.remove(&addr); - return Ok(Async::Ready(Change::Remove(addr))); - }, - } - } - } -} - -// ==== impl Background ===== - -impl Background { /// Bind this handle to start talking to the controller API. - pub fn work(self, executor: &Handle) -> DiscoveryWork - where T: HttpService, - T::Error: fmt::Debug, + pub fn process(self, executor: &Handle) -> Process + where + T: HttpService, + T::Error: fmt::Debug, { - DiscoveryWork { + Process { dns_resolver: dns::Resolver::new(self.dns_config, executor), default_destination_namespace: self.default_destination_namespace, destinations: HashMap::new(), reconnects: VecDeque::new(), rpc_ready: false, - rx: self.rx, + request_rx: self.request_rx, } } } -// ==== impl DiscoveryWork ===== +// ==== impl Process ===== -impl DiscoveryWork +impl Process where T: HttpService, T::Error: fmt::Debug, @@ -328,16 +122,16 @@ where match client.poll_ready() { Ok(Async::Ready(())) => { self.rpc_ready = true; - } + }, Ok(Async::NotReady) => { self.rpc_ready = false; break; - } + }, Err(err) => { warn!("Destination.Get poll_ready error: {:?}", err); self.rpc_ready = false; break; - } + }, } // handle any pending reconnects first @@ -346,40 +140,40 @@ where } // check for any new watches - match self.rx.poll() { - Ok(Async::Ready(Some((auth, tx)))) => { - trace!("Destination.Get {:?}", auth); - match self.destinations.entry(auth) { + match self.request_rx.poll() { + Ok(Async::Ready(Some(ResolveRequest { + authority, + update_tx, + }))) => { + trace!("Destination.Get {:?}", authority); + match self.destinations.entry(authority) { Entry::Occupied(mut occ) => { let set = occ.get_mut(); // we may already know of some addresses here, so push // them onto the new watch first match set.addrs { - Exists::Yes(ref cache) => { - for (&addr, meta) in cache { - let update = Update::Insert( - addr, - meta.clone() - ); - tx.unbounded_send(update) - .expect("unbounded_send does not fail"); - } + Exists::Yes(ref cache) => for (&addr, meta) in cache { + let update = Update::Insert(addr, meta.clone()); + update_tx + .unbounded_send(update) + .expect("unbounded_send does not fail"); }, Exists::No | Exists::Unknown => (), } - set.txs.push(tx); - } + set.txs.push(update_tx); + }, Entry::Vacant(vac) => { let query = Self::query_destination_service_if_relevant( &self.default_destination_namespace, client, vac.key(), - "connect"); + "connect", + ); let mut set = DestinationSet { addrs: Exists::Unknown, query, dns_query: None, - txs: vec![tx], + txs: vec![update_tx], }; // If the authority is one for which the Destination service is never // relevant (e.g. an absolute name that doesn't end in ".svc.$zone." in @@ -388,16 +182,17 @@ where set.reset_dns_query( &self.dns_resolver, Duration::from_secs(0), - vac.key()); + vac.key(), + ); } vac.insert(set); - } + }, } - } + }, Ok(Async::Ready(None)) => { trace!("Discover tx is dropped, shutdown?"); return; - } + }, Ok(Async::NotReady) => break, Err(_) => unreachable!("unbounded receiver doesn't error"), } @@ -414,7 +209,8 @@ where &self.default_destination_namespace, client, &auth, - "reconnect"); + "reconnect", + ); return true; } else { trace!("reconnect no longer needed: {:?}", auth); @@ -427,7 +223,7 @@ where for (auth, set) in &mut self.destinations { // Query the Destination service first. let (new_query, found_by_destination_service) = match set.query.take() { - Some(Remote::ConnectedOrConnecting{ rx }) => { + Some(Remote::ConnectedOrConnecting { rx }) => { let (new_query, found_by_destination_service) = set.poll_destination_service(auth, rx); if let Remote::NeedsReconnect = new_query { @@ -473,36 +269,45 @@ where default_destination_namespace: &str, client: &mut T, auth: &DnsNameAndPort, - connect_or_reconnect: &str) - -> Option> - { - trace!("DestinationServiceQuery {} {:?}", connect_or_reconnect, auth); - FullyQualifiedAuthority::normalize(auth, default_destination_namespace) - .map(|auth| { - let req = Destination { - scheme: "k8s".into(), - path: auth.without_trailing_dot().to_owned(), - }; - let mut svc = DestinationSvc::new(client.lift_ref()); - let response = svc.get(grpc::Request::new(req)); - Remote::ConnectedOrConnecting { rx: Receiver::new(response) } - }) + connect_or_reconnect: &str, + ) -> Option> { + trace!( + "DestinationServiceQuery {} {:?}", + connect_or_reconnect, + auth + ); + FullyQualifiedAuthority::normalize(auth, default_destination_namespace).map(|auth| { + let req = Destination { + scheme: "k8s".into(), + path: auth.without_trailing_dot().to_owned(), + }; + let mut svc = DestinationSvc::new(client.lift_ref()); + let response = svc.get(grpc::Request::new(req)); + Remote::ConnectedOrConnecting { + rx: Receiver::new(response), + } + }) } } // ===== impl DestinationSet ===== impl DestinationSet - where T: HttpService, - T::Error: fmt::Debug +where + T: HttpService, + T::Error: fmt::Debug, { fn reset_dns_query( &mut self, dns_resolver: &dns::Resolver, delay: Duration, - authority: &DnsNameAndPort) - { - trace!("resetting DNS query for {} with delay {:?}", authority.host, delay); + authority: &DnsNameAndPort, + ) { + trace!( + "resetting DNS query for {} with delay {:?}", + authority.host, + delay + ); self.reset_on_next_modification(); self.dns_query = Some(dns_resolver.resolve_all_ips(delay, &authority.host)); } @@ -514,9 +319,8 @@ impl DestinationSet fn poll_destination_service( &mut self, auth: &DnsNameAndPort, - mut rx: UpdateRx) - -> (DestinationServiceQuery, Exists<()>) - { + mut rx: UpdateRx, + ) -> (DestinationServiceQuery, Exists<()>) { let mut exists = Exists::Unknown; loop { @@ -524,7 +328,9 @@ impl DestinationSet Ok(Async::Ready(Some(update))) => match update.update { Some(PbUpdate2::Add(a_set)) => { let set_labels = a_set.metric_labels; - let addrs = a_set.addrs.into_iter() + let addrs = a_set + .addrs + .into_iter() .filter_map(|pb| pb_to_addr_meta(pb, &set_labels)); self.add(auth, addrs) }, @@ -532,7 +338,10 @@ impl DestinationSet exists = Exists::Yes(()); self.remove( auth, - r_set.addrs.iter().filter_map(|addr| pb_to_sock_addr(addr.clone())) + r_set + .addrs + .iter() + .filter_map(|addr| pb_to_sock_addr(addr.clone())), ); }, Some(PbUpdate2::NoEndpoints(ref no_endpoints)) if no_endpoints.exists => { @@ -558,7 +367,7 @@ impl DestinationSet Err(err) => { warn!("Destination.Get stream errored for {:?}: {:?}", auth, err); return (Remote::NeedsReconnect, exists); - } + }, }; } } @@ -574,13 +383,26 @@ impl DestinationSet return; }, Ok(Async::Ready(dns::Response::Exists(ips))) => { - trace!("positive result of DNS query for {:?}: {:?}", authority, ips); - self.add(authority, ips.iter().map(|ip| { - (SocketAddr::from((ip, authority.port)), Metadata::no_metadata()) - })); + trace!( + "positive result of DNS query for {:?}: {:?}", + authority, + ips + ); + self.add( + authority, + ips.iter().map(|ip| { + ( + SocketAddr::from((ip, authority.port)), + Metadata::no_metadata(), + ) + }), + ); }, Ok(Async::Ready(dns::Response::DoesNotExist)) => { - trace!("negative result (NXDOMAIN) of DNS query for {:?}", authority); + trace!( + "negative result (NXDOMAIN) of DNS query for {:?}", + authority + ); self.no_endpoints(authority, false); }, Err(e) => { @@ -596,38 +418,39 @@ impl DestinationSet } } -impl > DestinationSet { +impl> DestinationSet { fn reset_on_next_modification(&mut self) { match self.addrs { Exists::Yes(ref mut cache) => { cache.set_reset_on_next_modification(); }, - Exists::No | - Exists::Unknown => (), + Exists::No | Exists::Unknown => (), } } fn add(&mut self, authority_for_logging: &DnsNameAndPort, addrs_to_add: A) - where A: Iterator + where + A: Iterator, { let mut cache = match self.addrs.take() { Exists::Yes(mut cache) => cache, Exists::Unknown | Exists::No => Cache::new(), }; - cache.update_union( - addrs_to_add, - &mut |change| Self::on_change(&mut self.txs, authority_for_logging, change)); + cache.update_union(addrs_to_add, &mut |change| { + Self::on_change(&mut self.txs, authority_for_logging, change) + }); self.addrs = Exists::Yes(cache); } fn remove(&mut self, authority_for_logging: &DnsNameAndPort, addrs_to_remove: A) - where A: Iterator + where + A: Iterator, { let cache = match self.addrs.take() { Exists::Yes(mut cache) => { - cache.remove( - addrs_to_remove, - &mut |change| Self::on_change(&mut self.txs, authority_for_logging, change)); + cache.remove(addrs_to_remove, &mut |change| { + Self::on_change(&mut self.txs, authority_for_logging, change) + }); cache }, Exists::Unknown | Exists::No => Cache::new(), @@ -636,12 +459,16 @@ impl > DestinationSet { } fn no_endpoints(&mut self, authority_for_logging: &DnsNameAndPort, exists: bool) { - trace!("no endpoints for {:?} that is known to {}", authority_for_logging, - if exists { "exist" } else { "not exist" }); + trace!( + "no endpoints for {:?} that is known to {}", + authority_for_logging, + if exists { "exist" } else { "not exist" } + ); match self.addrs.take() { Exists::Yes(mut cache) => { - cache.clear( - &mut |change| Self::on_change(&mut self.txs, authority_for_logging, change)); + cache.clear(&mut |change| { + Self::on_change(&mut self.txs, authority_for_logging, change) + }); }, Exists::Unknown | Exists::No => (), }; @@ -652,38 +479,33 @@ impl > DestinationSet { }; } - fn on_change(txs: &mut Vec>, - authority_for_logging: &DnsNameAndPort, - change: CacheChange) { + fn on_change( + txs: &mut Vec>, + authority_for_logging: &DnsNameAndPort, + change: CacheChange, + ) { let (update_str, update, addr) = match change { - CacheChange::Insertion { key, value } => - ("insert", Update::Insert(key, value.clone()), key), - CacheChange::Removal { key } => - ("remove", Update::Remove(key), key), - CacheChange::Modification { key, new_value } => - ("change metadata for", Update::ChangeMetadata(key, new_value.clone()), key), + CacheChange::Insertion { key, value } => { + ("insert", Update::Insert(key, value.clone()), key) + }, + CacheChange::Removal { key } => ("remove", Update::Remove(key), key), + CacheChange::Modification { key, new_value } => ( + "change metadata for", + Update::ChangeMetadata(key, new_value.clone()), + key, + ), }; trace!("{} {:?} for {:?}", update_str, addr, authority_for_logging); // retain is used to drop any senders that are dead - txs.retain(|tx| { - tx.unbounded_send(update.clone()).is_ok() - }); - } -} - -// ===== impl Metadata ===== - -impl Metadata { - fn no_metadata() -> Self { - Metadata { - metric_labels: None, - } + txs.retain(|tx| tx.unbounded_send(update.clone()).is_ok()); } } /// Construct a new labeled `SocketAddr `from a protobuf `WeightedAddr`. -fn pb_to_addr_meta(pb: WeightedAddr, set_labels: &HashMap) - -> Option<(SocketAddr, Metadata)> { +fn pb_to_addr_meta( + pb: WeightedAddr, + set_labels: &HashMap, +) -> Option<(SocketAddr, Metadata)> { let addr = pb.addr.and_then(pb_to_sock_addr)?; let label_iter = set_labels.iter().chain(pb.metric_labels.iter()); let meta = Metadata { @@ -715,7 +537,7 @@ fn pb_to_sock_addr(pb: TcpAddress) -> Option { Some(Ip::Ipv4(octets)) => { let ipv4 = Ipv4Addr::from(octets); Some(SocketAddr::from((ipv4, pb.port as u16))) - } + }, Some(Ip::Ipv6(v6)) => { let octets = [ (v6.first >> 56) as u8, @@ -737,7 +559,7 @@ fn pb_to_sock_addr(pb: TcpAddress) -> Option { ]; let ipv6 = Ipv6Addr::from(octets); Some(SocketAddr::from((ipv6, pb.port as u16))) - } + }, None => None, }, None => None, diff --git a/proxy/src/control/destination/endpoint.rs b/proxy/src/control/destination/endpoint.rs new file mode 100644 index 000000000..27cc5a3c6 --- /dev/null +++ b/proxy/src/control/destination/endpoint.rs @@ -0,0 +1,57 @@ +use futures_watch; +use std::{cmp, hash, net::SocketAddr}; + +use telemetry::metrics::DstLabels; + +pub type DstLabelsWatch = futures_watch::Watch>; + +/// An individual traffic target. +/// +/// Equality, Ordering, and hashability is determined soley by the Endpoint's address. +#[derive(Clone, Debug)] +pub struct Endpoint { + address: SocketAddr, + dst_labels: Option, +} + +// ==== impl Endpoint ===== + +impl Endpoint { + pub fn new(address: SocketAddr, dst_labels: DstLabelsWatch) -> Self { + Self { + address, + dst_labels: Some(dst_labels), + } + } + + pub fn address(&self) -> SocketAddr { + self.address + } + + pub fn dst_labels(&self) -> Option<&DstLabelsWatch> { + self.dst_labels.as_ref() + } +} + +impl From for Endpoint { + fn from(address: SocketAddr) -> Self { + Self { + address, + dst_labels: None, + } + } +} + +impl hash::Hash for Endpoint { + fn hash(&self, state: &mut H) { + self.address.hash(state) + } +} + +impl cmp::PartialEq for Endpoint { + fn eq(&self, other: &Self) -> bool { + self.address.eq(&other.address) + } +} + +impl cmp::Eq for Endpoint {} diff --git a/proxy/src/control/destination/mod.rs b/proxy/src/control/destination/mod.rs new file mode 100644 index 000000000..64d4c2ab5 --- /dev/null +++ b/proxy/src/control/destination/mod.rs @@ -0,0 +1,203 @@ +use std::collections::HashMap; +use std::net::SocketAddr; + +use futures::sync::mpsc; +use futures::{Async, Poll, Stream}; +use futures_watch::{Store, Watch}; +use http; +use tower_discover::{Change, Discover}; +use tower_service::Service; + +use dns; +use telemetry::metrics::DstLabels; +use transport::DnsNameAndPort; + +pub mod background; +mod endpoint; + +pub use self::endpoint::{DstLabelsWatch, Endpoint}; + +/// A handle to request resolutions from a `Background`. +#[derive(Clone, Debug)] +pub struct Resolver { + request_tx: mpsc::UnboundedSender, +} + +#[derive(Debug)] +struct ResolveRequest { + authority: DnsNameAndPort, + update_tx: mpsc::UnboundedSender, +} + +/// A `tower_discover::Discover`, given to a `tower_balance::Balance`. +#[derive(Debug)] +pub struct Resolution { + update_rx: mpsc::UnboundedReceiver, + /// Map associating addresses with the `Store` for the watch on that + /// service's metric labels (as provided by the Destination service). + /// + /// This is used to update the `Labeled` middleware on those services + /// without requiring the service stack to be re-bound. + metric_labels: HashMap>>, + bind: B, +} + +/// . +#[derive(Clone, Debug, Hash, Eq, PartialEq)] +struct Metadata { + /// A set of Prometheus metric labels describing the destination. + metric_labels: Option, +} + +#[derive(Debug, Clone)] +enum Update { + Insert(SocketAddr, Metadata), + Remove(SocketAddr), + ChangeMetadata(SocketAddr, Metadata), +} + +/// Bind a `SocketAddr` with a protocol. +pub trait Bind { + /// The type of endpoint upon which a `Service` is bound. + type Endpoint; + + /// Requests handled by the discovered services + type Request; + + /// Responses given by the discovered services + type Response; + + /// Errors produced by the discovered services + type Error; + + type BindError; + + /// The discovered `Service` instance. + type Service: Service; + + /// Bind a service from an endpoint. + fn bind(&self, addr: &Self::Endpoint) -> Result; +} + +/// Creates a "channel" of `Resolver` to `Background` handles. +/// +/// The `Resolver` is used by a listener, the `Background` is consumed +/// on the controller thread. +pub fn new( + dns_config: dns::Config, + default_destination_namespace: String, +) -> (Resolver, background::Config) { + let (request_tx, rx) = mpsc::unbounded(); + let disco = Resolver { request_tx }; + let bg = background::Config::new(rx, dns_config, default_destination_namespace); + (disco, bg) +} + +// ==== impl Resolver ===== + +impl Resolver { + /// Start watching for address changes for a certain authority. + pub fn resolve(&self, authority: &DnsNameAndPort, bind: B) -> Resolution { + trace!("resolve; authority={:?}", authority); + let (update_tx, update_rx) = mpsc::unbounded(); + let req = { + let authority = authority.clone(); + ResolveRequest { + authority, + update_tx, + } + }; + self.request_tx + .unbounded_send(req) + .expect("unbounded can't fail"); + + Resolution { + update_rx, + metric_labels: HashMap::new(), + bind, + } + } +} + +// ==== impl Resolution ===== + +impl Resolution { + fn update_metadata(&mut self, addr: SocketAddr, meta: Metadata) -> Result<(), ()> { + if let Some(store) = self.metric_labels.get_mut(&addr) { + store + .store(meta.metric_labels) + .map_err(|e| { + error!("update_metadata: label store error: {:?}", e); + }) + .map(|_| ()) + } else { + // The store has already been removed, so nobody cares about + // the metadata change. We expect that this shouldn't happen, + // but if it does, log a warning and handle it gracefully. + warn!( + "update_metadata: ignoring ChangeMetadata for {:?} because the service no longer \ + exists.", + addr + ); + Ok(()) + } + } +} + +impl Discover for Resolution +where + B: Bind>, +{ + type Key = SocketAddr; + type Request = B::Request; + type Response = B::Response; + type Error = B::Error; + type Service = B::Service; + type DiscoverError = (); + + fn poll(&mut self) -> Poll, Self::DiscoverError> { + loop { + let up = self.update_rx.poll(); + trace!("watch: {:?}", up); + let update = try_ready!(up).expect("destination stream must be infinite"); + + match update { + Update::Insert(addr, meta) => { + // Construct a watch for the `Labeled` middleware that will + // wrap the bound service, and insert the store into our map + // so it can be updated later. + let (labels_watch, labels_store) = Watch::new(meta.metric_labels); + self.metric_labels.insert(addr, labels_store); + + let endpoint = Endpoint::new(addr, labels_watch.clone()); + + let service = self.bind.bind(&endpoint).map_err(|_| ())?; + + return Ok(Async::Ready(Change::Insert(addr, service))); + }, + Update::ChangeMetadata(addr, meta) => { + // Update metadata and continue polling `rx`. + self.update_metadata(addr, meta)?; + }, + Update::Remove(addr) => { + // It's safe to drop the store handle here, even if + // the `Labeled` middleware using the watch handle + // still exists --- it will simply read the final + // value from the watch. + self.metric_labels.remove(&addr); + return Ok(Async::Ready(Change::Remove(addr))); + }, + } + } + } +} + +// ===== impl Metadata ===== + +impl Metadata { + fn no_metadata() -> Self { + Metadata { + metric_labels: None, + } + } +} diff --git a/proxy/src/control/mod.rs b/proxy/src/control/mod.rs index 884f11a3a..3f3930ff9 100644 --- a/proxy/src/control/mod.rs +++ b/proxy/src/control/mod.rs @@ -21,28 +21,28 @@ use transport::{DnsNameAndPort, HostAndPort, LookupAddressAndConnect}; use timeout::{Timeout, TimeoutError}; mod cache; -pub mod discovery; +pub mod destination; mod fully_qualified_authority; mod observe; pub mod pb; mod remote_stream; -use self::discovery::{Background as DiscoBg, Discovery, Watch}; -pub use self::discovery::Bind; +use self::destination::{Resolver, Resolution}; +pub use self::destination::Bind; pub use self::observe::Observe; #[derive(Clone)] pub struct Control { - disco: Discovery, + disco: Resolver, } pub struct Background { - disco: DiscoBg, + disco: destination::background::Config, } pub fn new(dns_config: dns::Config, default_destination_namespace: String) -> (Control, Background) { - let (tx, rx) = self::discovery::new(dns_config, default_destination_namespace); + let (tx, rx) = self::destination::new(dns_config, default_destination_namespace); let c = Control { disco: tx, @@ -58,7 +58,7 @@ pub fn new(dns_config: dns::Config, default_destination_namespace: String) -> (C // ===== impl Control ===== impl Control { - pub fn resolve(&self, auth: &DnsNameAndPort, bind: B) -> Watch { + pub fn resolve(&self, auth: &DnsNameAndPort, bind: B) -> Resolution { self.disco.resolve(auth, bind) } } @@ -97,7 +97,7 @@ impl Background { AddOrigin::new(scheme, authority, backoff) }; - let mut disco = self.disco.work(executor); + let mut disco = self.disco.process(executor); let fut = future::poll_fn(move || { disco.poll_rpc(&mut client); diff --git a/proxy/src/ctx/http.rs b/proxy/src/ctx/http.rs index f72754c0e..80ff43710 100644 --- a/proxy/src/ctx/http.rs +++ b/proxy/src/ctx/http.rs @@ -1,7 +1,7 @@ use http; use std::sync::Arc; -use control::discovery::DstLabelsWatch; +use control::destination::DstLabelsWatch; use ctx; diff --git a/proxy/src/ctx/transport.rs b/proxy/src/ctx/transport.rs index 2cd225c6c..47055c166 100644 --- a/proxy/src/ctx/transport.rs +++ b/proxy/src/ctx/transport.rs @@ -2,7 +2,7 @@ use std::{cmp, hash}; use std::net::{IpAddr, SocketAddr}; use std::sync::Arc; -use control::discovery::DstLabelsWatch; +use control::destination::DstLabelsWatch; use ctx; #[derive(Clone, Debug, PartialEq, Eq, Hash)] diff --git a/proxy/src/outbound.rs b/proxy/src/outbound.rs index f4cc67346..b1270123a 100644 --- a/proxy/src/outbound.rs +++ b/proxy/src/outbound.rs @@ -14,8 +14,8 @@ use tower_h2; use conduit_proxy_router::Recognize; use bind::{self, Bind, Protocol}; -use control::{self, discovery}; -use control::discovery::Bind as BindTrait; +use control; +use control::destination::{Bind as BindTrait, Resolution}; use ctx; use timeout::Timeout; use transparency::h1; @@ -170,7 +170,7 @@ where } pub enum Discovery { - NamedSvc(discovery::Watch>), + NamedSvc(Resolution>), ImplicitOriginalDst(Option<(SocketAddr, BindProtocol)>), }