rfc: proxy: Split `control::discovery` into submodules (#955)

While preparing #946, I was again struck by the `discovery` module being very weighty
(nearly 800 dense lines). The intent of this change is only to improve readability. There
are no functional changes. The following aesthetic changes have been made:

* `control::discovery` has been renamed to `control::destination` to be more consistent
  with the rest of conduit's terminology (destinations aren't the only thing that need to
  be discovered).
* In that vein, the `Discovery` type has been renamed `Resolver` (since it exposes one
  function, `resolve`).
* The `Watch` type has been renamed `Resolution`. This disambiguates the type form
  `futures_watch::Watch`(which is used in the same code) and makes it more clearly the
  product of a `Resolver`.
* The `Background` and `DiscoveryWork` names were very opaque.  `Background` is now
  `background::Config` to indicate that it can't actually _do_ anything; and
  `DiscoveryWork` is now `background::Process` to indicate that it's responsible for
  processing destination updates.
* `DestinationSet` is now a private implementation detail in the `background` module.
* An internal `ResolveRequest` type replaces an unnamed tuple (now that it's used across
  files).
* `rustfmt` has been run on `background.rs` and `endpoint.rs`
This commit is contained in:
Oliver Gould 2018-05-15 17:23:01 -07:00 committed by GitHub
parent 86a75907ca
commit b23ed6e651
8 changed files with 445 additions and 363 deletions

View File

@ -13,7 +13,7 @@ use tower_h2;
use tower_reconnect::Reconnect; use tower_reconnect::Reconnect;
use control; use control;
use control::discovery::Endpoint; use control::destination::Endpoint;
use ctx; use ctx;
use telemetry::{self, sensor}; use telemetry::{self, sensor};
use transparency::{self, HttpBody, h1}; use transparency::{self, HttpBody, h1};
@ -253,7 +253,7 @@ impl<C, B> Bind<C, B> {
} }
} }
impl<B> control::discovery::Bind for BindProtocol<Arc<ctx::Proxy>, B> impl<B> control::destination::Bind for BindProtocol<Arc<ctx::Proxy>, B>
where where
B: tower_h2::Body + 'static, B: tower_h2::Body + 'static,
{ {

View File

@ -1,78 +1,48 @@
use std::{cmp, fmt, hash}; use futures::sync::mpsc;
use std::collections::VecDeque; use futures::{Async, Future, Stream};
use std::collections::hash_map::{Entry, HashMap}; use std::collections::{
hash_map::{Entry, HashMap},
VecDeque,
};
use std::fmt;
use std::iter::IntoIterator; use std::iter::IntoIterator;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::time::Duration; use std::time::Duration;
use futures::{Async, Future, Poll, Stream};
use futures::sync::mpsc;
use futures_watch;
use http;
use tokio_core::reactor::Handle; use tokio_core::reactor::Handle;
use tower_service::Service;
use tower_h2::{HttpService, BoxBody, RecvBody};
use tower_discover::{Change, Discover};
use tower_grpc as grpc; use tower_grpc as grpc;
use tower_h2::{BoxBody, HttpService, RecvBody};
use dns::{self, IpAddrListFuture};
use super::fully_qualified_authority::FullyQualifiedAuthority;
use conduit_proxy_controller_grpc::common::{Destination, TcpAddress}; use conduit_proxy_controller_grpc::common::{Destination, TcpAddress};
use conduit_proxy_controller_grpc::destination::{ use conduit_proxy_controller_grpc::destination::client::Destination as DestinationSvc;
Update as PbUpdate,
WeightedAddr,
};
use conduit_proxy_controller_grpc::destination::update::Update as PbUpdate2; use conduit_proxy_controller_grpc::destination::update::Update as PbUpdate2;
use conduit_proxy_controller_grpc::destination::client::{Destination as DestinationSvc}; use conduit_proxy_controller_grpc::destination::{Update as PbUpdate, WeightedAddr};
use transport::DnsNameAndPort;
use super::{Metadata, ResolveRequest, Update};
use control::cache::{Cache, CacheChange, Exists}; use control::cache::{Cache, CacheChange, Exists};
use control::remote_stream::{Remote, Receiver}; use control::fully_qualified_authority::FullyQualifiedAuthority;
use control::remote_stream::{Receiver, Remote};
use ::telemetry::metrics::DstLabels; use dns::{self, IpAddrListFuture};
use telemetry::metrics::DstLabels;
/// A handle to start watching a destination for address changes. use transport::DnsNameAndPort;
#[derive(Clone, Debug)]
pub struct Discovery {
tx: mpsc::UnboundedSender<(DnsNameAndPort, mpsc::UnboundedSender<Update>)>,
}
#[derive(Clone, Debug)]
pub struct Endpoint {
address: SocketAddr,
dst_labels: Option<DstLabelsWatch>,
}
pub type DstLabelsWatch = futures_watch::Watch<Option<DstLabels>>;
type DestinationServiceQuery<T> = Remote<PbUpdate, T>; type DestinationServiceQuery<T> = Remote<PbUpdate, T>;
type UpdateRx<T> = Receiver<PbUpdate, T>; type UpdateRx<T> = Receiver<PbUpdate, T>;
/// A `tower_discover::Discover`, given to a `tower_balance::Balance`. /// Stores the configuration for a destination background worker.
#[derive(Debug)] #[derive(Debug)]
pub struct Watch<B> { pub struct Config {
rx: mpsc::UnboundedReceiver<Update>, request_rx: mpsc::UnboundedReceiver<ResolveRequest>,
/// Map associating addresses with the `Store` for the watch on that
/// service's metric labels (as provided by the Destination service).
///
/// This is used to update the `Labeled` middleware on those services
/// without requiring the service stack to be re-bound.
metric_labels: HashMap<SocketAddr, futures_watch::Store<Option<DstLabels>>>,
bind: B,
}
/// A background handle to eventually bind on the controller thread.
#[derive(Debug)]
pub struct Background {
rx: mpsc::UnboundedReceiver<(DnsNameAndPort, mpsc::UnboundedSender<Update>)>,
dns_config: dns::Config, dns_config: dns::Config,
default_destination_namespace: String, default_destination_namespace: String,
} }
/// A future returned from `Background::work()`, doing the work of talking to /// Satisfies resolutions as requested via `request_rx`.
/// the controller destination API. ///
pub struct DiscoveryWork<T: HttpService<ResponseBody = RecvBody>> { /// As `Process` is polled with a client to Destination service, if the client to the
/// service is healthy, it reads requests from `request_rx`, determines how to resolve the
/// provided authority to a set of addresses, and ensures that resolution updates are
/// propagated to all requesters.
pub struct Process<T: HttpService<ResponseBody = RecvBody>> {
dns_resolver: dns::Resolver, dns_resolver: dns::Resolver,
default_destination_namespace: String, default_destination_namespace: String,
destinations: HashMap<DnsNameAndPort, DestinationSet<T>>, destinations: HashMap<DnsNameAndPort, DestinationSet<T>>,
@ -82,16 +52,10 @@ pub struct DiscoveryWork<T: HttpService<ResponseBody = RecvBody>> {
/// Each poll, records whether the rpc service was till ready. /// Each poll, records whether the rpc service was till ready.
rpc_ready: bool, rpc_ready: bool,
/// A receiver of new watch requests. /// A receiver of new watch requests.
rx: mpsc::UnboundedReceiver<(DnsNameAndPort, mpsc::UnboundedSender<Update>)>, request_rx: mpsc::UnboundedReceiver<ResolveRequest>,
}
/// Any additional metadata describing a discovered service.
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
struct Metadata {
/// A set of Prometheus metric labels describing the destination.
metric_labels: Option<DstLabels>,
} }
/// Holds the state of a single resolution.
struct DestinationSet<T: HttpService<ResponseBody = RecvBody>> { struct DestinationSet<T: HttpService<ResponseBody = RecvBody>> {
addrs: Exists<Cache<SocketAddr, Metadata>>, addrs: Exists<Cache<SocketAddr, Metadata>>,
query: Option<DestinationServiceQuery<T>>, query: Option<DestinationServiceQuery<T>>,
@ -99,211 +63,41 @@ struct DestinationSet<T: HttpService<ResponseBody = RecvBody>> {
txs: Vec<mpsc::UnboundedSender<Update>>, txs: Vec<mpsc::UnboundedSender<Update>>,
} }
#[derive(Debug, Clone)] // ==== impl Config =====
enum Update {
Insert(SocketAddr, Metadata),
Remove(SocketAddr),
ChangeMetadata(SocketAddr, Metadata),
}
/// Bind a `SocketAddr` with a protocol. impl Config {
pub trait Bind { pub(super) fn new(
/// The type of endpoint upon which a `Service` is bound. request_rx: mpsc::UnboundedReceiver<ResolveRequest>,
type Endpoint; dns_config: dns::Config,
default_destination_namespace: String,
/// Requests handled by the discovered services ) -> Self {
type Request; Self {
request_rx,
/// Responses given by the discovered services
type Response;
/// Errors produced by the discovered services
type Error;
type BindError;
/// The discovered `Service` instance.
type Service: Service<Request = Self::Request, Response = Self::Response, Error = Self::Error>;
/// Bind a service from an endpoint.
fn bind(&self, addr: &Self::Endpoint) -> Result<Self::Service, Self::BindError>;
}
/// Creates a "channel" of `Discovery` to `Background` handles.
///
/// The `Discovery` is used by a listener, the `Background` is consumed
/// on the controller thread.
pub fn new(dns_config: dns::Config, default_destination_namespace: String) -> (Discovery, Background) {
let (tx, rx) = mpsc::unbounded();
(
Discovery {
tx,
},
Background {
rx,
dns_config, dns_config,
default_destination_namespace, default_destination_namespace,
},
)
}
// ==== impl Discovery =====
impl Discovery {
/// Start watching for address changes for a certain authority.
pub fn resolve<B>(&self, authority: &DnsNameAndPort, bind: B) -> Watch<B> {
trace!("resolve; authority={:?}", authority);
let (tx, rx) = mpsc::unbounded();
self.tx
.unbounded_send((authority.clone(), tx))
.expect("unbounded can't fail");
Watch {
rx,
metric_labels: HashMap::new(),
bind,
}
}
}
// ==== impl Endpoint =====
impl Endpoint {
pub fn new(address: SocketAddr, dst_labels: DstLabelsWatch) -> Self {
Self {
address,
dst_labels: Some(dst_labels),
} }
} }
pub fn address(&self) -> SocketAddr {
self.address
}
pub fn dst_labels(&self) -> Option<&DstLabelsWatch> {
self.dst_labels.as_ref()
}
}
impl From<SocketAddr> for Endpoint {
fn from(address: SocketAddr) -> Self {
Self {
address,
dst_labels: None,
}
}
}
impl hash::Hash for Endpoint {
fn hash<H: hash::Hasher>(&self, state: &mut H) {
self.address.hash(state)
}
}
impl cmp::PartialEq for Endpoint {
fn eq(&self, other: &Self) -> bool {
self.address.eq(&other.address)
}
}
impl cmp::Eq for Endpoint {}
// ==== impl Watch =====
impl<B> Watch<B> {
fn update_metadata(&mut self, addr: SocketAddr, meta: Metadata) -> Result<(), ()> {
if let Some(store) = self.metric_labels.get_mut(&addr) {
store.store(meta.metric_labels)
.map_err(|e| {
error!("update_metadata: label store error: {:?}", e);
})
.map(|_| ())
} else {
// The store has already been removed, so nobody cares about
// the metadata change. We expect that this shouldn't happen,
// but if it does, log a warning and handle it gracefully.
warn!(
"update_metadata: ignoring ChangeMetadata for {:?} \
because the service no longer exists.",
addr
);
Ok(())
}
}
}
impl<B, A> Discover for Watch<B>
where
B: Bind<Endpoint = Endpoint, Request = http::Request<A>>,
{
type Key = SocketAddr;
type Request = B::Request;
type Response = B::Response;
type Error = B::Error;
type Service = B::Service;
type DiscoverError = ();
fn poll(&mut self) -> Poll<Change<Self::Key, Self::Service>, Self::DiscoverError> {
loop {
let up = self.rx.poll();
trace!("watch: {:?}", up);
let update = try_ready!(up).expect("discovery stream must be infinite");
match update {
Update::Insert(addr, meta) => {
// Construct a watch for the `Labeled` middleware that will
// wrap the bound service, and insert the store into our map
// so it can be updated later.
let (labels_watch, labels_store) =
futures_watch::Watch::new(meta.metric_labels);
self.metric_labels.insert(addr, labels_store);
let endpoint = Endpoint::new(addr, labels_watch.clone());
let service = self.bind.bind(&endpoint)
.map_err(|_| ())?;
return Ok(Async::Ready(Change::Insert(addr, service)))
},
Update::ChangeMetadata(addr, meta) => {
// Update metadata and continue polling `rx`.
self.update_metadata(addr, meta)?;
},
Update::Remove(addr) => {
// It's safe to drop the store handle here, even if
// the `Labeled` middleware using the watch handle
// still exists --- it will simply read the final
// value from the watch.
self.metric_labels.remove(&addr);
return Ok(Async::Ready(Change::Remove(addr)));
},
}
}
}
}
// ==== impl Background =====
impl Background {
/// Bind this handle to start talking to the controller API. /// Bind this handle to start talking to the controller API.
pub fn work<T>(self, executor: &Handle) -> DiscoveryWork<T> pub fn process<T>(self, executor: &Handle) -> Process<T>
where T: HttpService<RequestBody = BoxBody, ResponseBody = RecvBody>, where
T::Error: fmt::Debug, T: HttpService<RequestBody = BoxBody, ResponseBody = RecvBody>,
T::Error: fmt::Debug,
{ {
DiscoveryWork { Process {
dns_resolver: dns::Resolver::new(self.dns_config, executor), dns_resolver: dns::Resolver::new(self.dns_config, executor),
default_destination_namespace: self.default_destination_namespace, default_destination_namespace: self.default_destination_namespace,
destinations: HashMap::new(), destinations: HashMap::new(),
reconnects: VecDeque::new(), reconnects: VecDeque::new(),
rpc_ready: false, rpc_ready: false,
rx: self.rx, request_rx: self.request_rx,
} }
} }
} }
// ==== impl DiscoveryWork ===== // ==== impl Process =====
impl<T> DiscoveryWork<T> impl<T> Process<T>
where where
T: HttpService<RequestBody = BoxBody, ResponseBody = RecvBody>, T: HttpService<RequestBody = BoxBody, ResponseBody = RecvBody>,
T::Error: fmt::Debug, T::Error: fmt::Debug,
@ -328,16 +122,16 @@ where
match client.poll_ready() { match client.poll_ready() {
Ok(Async::Ready(())) => { Ok(Async::Ready(())) => {
self.rpc_ready = true; self.rpc_ready = true;
} },
Ok(Async::NotReady) => { Ok(Async::NotReady) => {
self.rpc_ready = false; self.rpc_ready = false;
break; break;
} },
Err(err) => { Err(err) => {
warn!("Destination.Get poll_ready error: {:?}", err); warn!("Destination.Get poll_ready error: {:?}", err);
self.rpc_ready = false; self.rpc_ready = false;
break; break;
} },
} }
// handle any pending reconnects first // handle any pending reconnects first
@ -346,40 +140,40 @@ where
} }
// check for any new watches // check for any new watches
match self.rx.poll() { match self.request_rx.poll() {
Ok(Async::Ready(Some((auth, tx)))) => { Ok(Async::Ready(Some(ResolveRequest {
trace!("Destination.Get {:?}", auth); authority,
match self.destinations.entry(auth) { update_tx,
}))) => {
trace!("Destination.Get {:?}", authority);
match self.destinations.entry(authority) {
Entry::Occupied(mut occ) => { Entry::Occupied(mut occ) => {
let set = occ.get_mut(); let set = occ.get_mut();
// we may already know of some addresses here, so push // we may already know of some addresses here, so push
// them onto the new watch first // them onto the new watch first
match set.addrs { match set.addrs {
Exists::Yes(ref cache) => { Exists::Yes(ref cache) => for (&addr, meta) in cache {
for (&addr, meta) in cache { let update = Update::Insert(addr, meta.clone());
let update = Update::Insert( update_tx
addr, .unbounded_send(update)
meta.clone() .expect("unbounded_send does not fail");
);
tx.unbounded_send(update)
.expect("unbounded_send does not fail");
}
}, },
Exists::No | Exists::Unknown => (), Exists::No | Exists::Unknown => (),
} }
set.txs.push(tx); set.txs.push(update_tx);
} },
Entry::Vacant(vac) => { Entry::Vacant(vac) => {
let query = Self::query_destination_service_if_relevant( let query = Self::query_destination_service_if_relevant(
&self.default_destination_namespace, &self.default_destination_namespace,
client, client,
vac.key(), vac.key(),
"connect"); "connect",
);
let mut set = DestinationSet { let mut set = DestinationSet {
addrs: Exists::Unknown, addrs: Exists::Unknown,
query, query,
dns_query: None, dns_query: None,
txs: vec![tx], txs: vec![update_tx],
}; };
// If the authority is one for which the Destination service is never // If the authority is one for which the Destination service is never
// relevant (e.g. an absolute name that doesn't end in ".svc.$zone." in // relevant (e.g. an absolute name that doesn't end in ".svc.$zone." in
@ -388,16 +182,17 @@ where
set.reset_dns_query( set.reset_dns_query(
&self.dns_resolver, &self.dns_resolver,
Duration::from_secs(0), Duration::from_secs(0),
vac.key()); vac.key(),
);
} }
vac.insert(set); vac.insert(set);
} },
} }
} },
Ok(Async::Ready(None)) => { Ok(Async::Ready(None)) => {
trace!("Discover tx is dropped, shutdown?"); trace!("Discover tx is dropped, shutdown?");
return; return;
} },
Ok(Async::NotReady) => break, Ok(Async::NotReady) => break,
Err(_) => unreachable!("unbounded receiver doesn't error"), Err(_) => unreachable!("unbounded receiver doesn't error"),
} }
@ -414,7 +209,8 @@ where
&self.default_destination_namespace, &self.default_destination_namespace,
client, client,
&auth, &auth,
"reconnect"); "reconnect",
);
return true; return true;
} else { } else {
trace!("reconnect no longer needed: {:?}", auth); trace!("reconnect no longer needed: {:?}", auth);
@ -427,7 +223,7 @@ where
for (auth, set) in &mut self.destinations { for (auth, set) in &mut self.destinations {
// Query the Destination service first. // Query the Destination service first.
let (new_query, found_by_destination_service) = match set.query.take() { let (new_query, found_by_destination_service) = match set.query.take() {
Some(Remote::ConnectedOrConnecting{ rx }) => { Some(Remote::ConnectedOrConnecting { rx }) => {
let (new_query, found_by_destination_service) = let (new_query, found_by_destination_service) =
set.poll_destination_service(auth, rx); set.poll_destination_service(auth, rx);
if let Remote::NeedsReconnect = new_query { if let Remote::NeedsReconnect = new_query {
@ -473,36 +269,45 @@ where
default_destination_namespace: &str, default_destination_namespace: &str,
client: &mut T, client: &mut T,
auth: &DnsNameAndPort, auth: &DnsNameAndPort,
connect_or_reconnect: &str) connect_or_reconnect: &str,
-> Option<DestinationServiceQuery<T>> ) -> Option<DestinationServiceQuery<T>> {
{ trace!(
trace!("DestinationServiceQuery {} {:?}", connect_or_reconnect, auth); "DestinationServiceQuery {} {:?}",
FullyQualifiedAuthority::normalize(auth, default_destination_namespace) connect_or_reconnect,
.map(|auth| { auth
let req = Destination { );
scheme: "k8s".into(), FullyQualifiedAuthority::normalize(auth, default_destination_namespace).map(|auth| {
path: auth.without_trailing_dot().to_owned(), let req = Destination {
}; scheme: "k8s".into(),
let mut svc = DestinationSvc::new(client.lift_ref()); path: auth.without_trailing_dot().to_owned(),
let response = svc.get(grpc::Request::new(req)); };
Remote::ConnectedOrConnecting { rx: Receiver::new(response) } let mut svc = DestinationSvc::new(client.lift_ref());
}) let response = svc.get(grpc::Request::new(req));
Remote::ConnectedOrConnecting {
rx: Receiver::new(response),
}
})
} }
} }
// ===== impl DestinationSet ===== // ===== impl DestinationSet =====
impl<T> DestinationSet<T> impl<T> DestinationSet<T>
where T: HttpService<RequestBody = BoxBody, ResponseBody = RecvBody>, where
T::Error: fmt::Debug T: HttpService<RequestBody = BoxBody, ResponseBody = RecvBody>,
T::Error: fmt::Debug,
{ {
fn reset_dns_query( fn reset_dns_query(
&mut self, &mut self,
dns_resolver: &dns::Resolver, dns_resolver: &dns::Resolver,
delay: Duration, delay: Duration,
authority: &DnsNameAndPort) authority: &DnsNameAndPort,
{ ) {
trace!("resetting DNS query for {} with delay {:?}", authority.host, delay); trace!(
"resetting DNS query for {} with delay {:?}",
authority.host,
delay
);
self.reset_on_next_modification(); self.reset_on_next_modification();
self.dns_query = Some(dns_resolver.resolve_all_ips(delay, &authority.host)); self.dns_query = Some(dns_resolver.resolve_all_ips(delay, &authority.host));
} }
@ -514,9 +319,8 @@ impl<T> DestinationSet<T>
fn poll_destination_service( fn poll_destination_service(
&mut self, &mut self,
auth: &DnsNameAndPort, auth: &DnsNameAndPort,
mut rx: UpdateRx<T>) mut rx: UpdateRx<T>,
-> (DestinationServiceQuery<T>, Exists<()>) ) -> (DestinationServiceQuery<T>, Exists<()>) {
{
let mut exists = Exists::Unknown; let mut exists = Exists::Unknown;
loop { loop {
@ -524,7 +328,9 @@ impl<T> DestinationSet<T>
Ok(Async::Ready(Some(update))) => match update.update { Ok(Async::Ready(Some(update))) => match update.update {
Some(PbUpdate2::Add(a_set)) => { Some(PbUpdate2::Add(a_set)) => {
let set_labels = a_set.metric_labels; let set_labels = a_set.metric_labels;
let addrs = a_set.addrs.into_iter() let addrs = a_set
.addrs
.into_iter()
.filter_map(|pb| pb_to_addr_meta(pb, &set_labels)); .filter_map(|pb| pb_to_addr_meta(pb, &set_labels));
self.add(auth, addrs) self.add(auth, addrs)
}, },
@ -532,7 +338,10 @@ impl<T> DestinationSet<T>
exists = Exists::Yes(()); exists = Exists::Yes(());
self.remove( self.remove(
auth, auth,
r_set.addrs.iter().filter_map(|addr| pb_to_sock_addr(addr.clone())) r_set
.addrs
.iter()
.filter_map(|addr| pb_to_sock_addr(addr.clone())),
); );
}, },
Some(PbUpdate2::NoEndpoints(ref no_endpoints)) if no_endpoints.exists => { Some(PbUpdate2::NoEndpoints(ref no_endpoints)) if no_endpoints.exists => {
@ -558,7 +367,7 @@ impl<T> DestinationSet<T>
Err(err) => { Err(err) => {
warn!("Destination.Get stream errored for {:?}: {:?}", auth, err); warn!("Destination.Get stream errored for {:?}: {:?}", auth, err);
return (Remote::NeedsReconnect, exists); return (Remote::NeedsReconnect, exists);
} },
}; };
} }
} }
@ -574,13 +383,26 @@ impl<T> DestinationSet<T>
return; return;
}, },
Ok(Async::Ready(dns::Response::Exists(ips))) => { Ok(Async::Ready(dns::Response::Exists(ips))) => {
trace!("positive result of DNS query for {:?}: {:?}", authority, ips); trace!(
self.add(authority, ips.iter().map(|ip| { "positive result of DNS query for {:?}: {:?}",
(SocketAddr::from((ip, authority.port)), Metadata::no_metadata()) authority,
})); ips
);
self.add(
authority,
ips.iter().map(|ip| {
(
SocketAddr::from((ip, authority.port)),
Metadata::no_metadata(),
)
}),
);
}, },
Ok(Async::Ready(dns::Response::DoesNotExist)) => { Ok(Async::Ready(dns::Response::DoesNotExist)) => {
trace!("negative result (NXDOMAIN) of DNS query for {:?}", authority); trace!(
"negative result (NXDOMAIN) of DNS query for {:?}",
authority
);
self.no_endpoints(authority, false); self.no_endpoints(authority, false);
}, },
Err(e) => { Err(e) => {
@ -596,38 +418,39 @@ impl<T> DestinationSet<T>
} }
} }
impl <T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> { impl<T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> {
fn reset_on_next_modification(&mut self) { fn reset_on_next_modification(&mut self) {
match self.addrs { match self.addrs {
Exists::Yes(ref mut cache) => { Exists::Yes(ref mut cache) => {
cache.set_reset_on_next_modification(); cache.set_reset_on_next_modification();
}, },
Exists::No | Exists::No | Exists::Unknown => (),
Exists::Unknown => (),
} }
} }
fn add<A>(&mut self, authority_for_logging: &DnsNameAndPort, addrs_to_add: A) fn add<A>(&mut self, authority_for_logging: &DnsNameAndPort, addrs_to_add: A)
where A: Iterator<Item = (SocketAddr, Metadata)> where
A: Iterator<Item = (SocketAddr, Metadata)>,
{ {
let mut cache = match self.addrs.take() { let mut cache = match self.addrs.take() {
Exists::Yes(mut cache) => cache, Exists::Yes(mut cache) => cache,
Exists::Unknown | Exists::No => Cache::new(), Exists::Unknown | Exists::No => Cache::new(),
}; };
cache.update_union( cache.update_union(addrs_to_add, &mut |change| {
addrs_to_add, Self::on_change(&mut self.txs, authority_for_logging, change)
&mut |change| Self::on_change(&mut self.txs, authority_for_logging, change)); });
self.addrs = Exists::Yes(cache); self.addrs = Exists::Yes(cache);
} }
fn remove<A>(&mut self, authority_for_logging: &DnsNameAndPort, addrs_to_remove: A) fn remove<A>(&mut self, authority_for_logging: &DnsNameAndPort, addrs_to_remove: A)
where A: Iterator<Item = SocketAddr> where
A: Iterator<Item = SocketAddr>,
{ {
let cache = match self.addrs.take() { let cache = match self.addrs.take() {
Exists::Yes(mut cache) => { Exists::Yes(mut cache) => {
cache.remove( cache.remove(addrs_to_remove, &mut |change| {
addrs_to_remove, Self::on_change(&mut self.txs, authority_for_logging, change)
&mut |change| Self::on_change(&mut self.txs, authority_for_logging, change)); });
cache cache
}, },
Exists::Unknown | Exists::No => Cache::new(), Exists::Unknown | Exists::No => Cache::new(),
@ -636,12 +459,16 @@ impl <T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> {
} }
fn no_endpoints(&mut self, authority_for_logging: &DnsNameAndPort, exists: bool) { fn no_endpoints(&mut self, authority_for_logging: &DnsNameAndPort, exists: bool) {
trace!("no endpoints for {:?} that is known to {}", authority_for_logging, trace!(
if exists { "exist" } else { "not exist" }); "no endpoints for {:?} that is known to {}",
authority_for_logging,
if exists { "exist" } else { "not exist" }
);
match self.addrs.take() { match self.addrs.take() {
Exists::Yes(mut cache) => { Exists::Yes(mut cache) => {
cache.clear( cache.clear(&mut |change| {
&mut |change| Self::on_change(&mut self.txs, authority_for_logging, change)); Self::on_change(&mut self.txs, authority_for_logging, change)
});
}, },
Exists::Unknown | Exists::No => (), Exists::Unknown | Exists::No => (),
}; };
@ -652,38 +479,33 @@ impl <T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> {
}; };
} }
fn on_change(txs: &mut Vec<mpsc::UnboundedSender<Update>>, fn on_change(
authority_for_logging: &DnsNameAndPort, txs: &mut Vec<mpsc::UnboundedSender<Update>>,
change: CacheChange<SocketAddr, Metadata>) { authority_for_logging: &DnsNameAndPort,
change: CacheChange<SocketAddr, Metadata>,
) {
let (update_str, update, addr) = match change { let (update_str, update, addr) = match change {
CacheChange::Insertion { key, value } => CacheChange::Insertion { key, value } => {
("insert", Update::Insert(key, value.clone()), key), ("insert", Update::Insert(key, value.clone()), key)
CacheChange::Removal { key } => },
("remove", Update::Remove(key), key), CacheChange::Removal { key } => ("remove", Update::Remove(key), key),
CacheChange::Modification { key, new_value } => CacheChange::Modification { key, new_value } => (
("change metadata for", Update::ChangeMetadata(key, new_value.clone()), key), "change metadata for",
Update::ChangeMetadata(key, new_value.clone()),
key,
),
}; };
trace!("{} {:?} for {:?}", update_str, addr, authority_for_logging); trace!("{} {:?} for {:?}", update_str, addr, authority_for_logging);
// retain is used to drop any senders that are dead // retain is used to drop any senders that are dead
txs.retain(|tx| { txs.retain(|tx| tx.unbounded_send(update.clone()).is_ok());
tx.unbounded_send(update.clone()).is_ok()
});
}
}
// ===== impl Metadata =====
impl Metadata {
fn no_metadata() -> Self {
Metadata {
metric_labels: None,
}
} }
} }
/// Construct a new labeled `SocketAddr `from a protobuf `WeightedAddr`. /// Construct a new labeled `SocketAddr `from a protobuf `WeightedAddr`.
fn pb_to_addr_meta(pb: WeightedAddr, set_labels: &HashMap<String, String>) fn pb_to_addr_meta(
-> Option<(SocketAddr, Metadata)> { pb: WeightedAddr,
set_labels: &HashMap<String, String>,
) -> Option<(SocketAddr, Metadata)> {
let addr = pb.addr.and_then(pb_to_sock_addr)?; let addr = pb.addr.and_then(pb_to_sock_addr)?;
let label_iter = set_labels.iter().chain(pb.metric_labels.iter()); let label_iter = set_labels.iter().chain(pb.metric_labels.iter());
let meta = Metadata { let meta = Metadata {
@ -715,7 +537,7 @@ fn pb_to_sock_addr(pb: TcpAddress) -> Option<SocketAddr> {
Some(Ip::Ipv4(octets)) => { Some(Ip::Ipv4(octets)) => {
let ipv4 = Ipv4Addr::from(octets); let ipv4 = Ipv4Addr::from(octets);
Some(SocketAddr::from((ipv4, pb.port as u16))) Some(SocketAddr::from((ipv4, pb.port as u16)))
} },
Some(Ip::Ipv6(v6)) => { Some(Ip::Ipv6(v6)) => {
let octets = [ let octets = [
(v6.first >> 56) as u8, (v6.first >> 56) as u8,
@ -737,7 +559,7 @@ fn pb_to_sock_addr(pb: TcpAddress) -> Option<SocketAddr> {
]; ];
let ipv6 = Ipv6Addr::from(octets); let ipv6 = Ipv6Addr::from(octets);
Some(SocketAddr::from((ipv6, pb.port as u16))) Some(SocketAddr::from((ipv6, pb.port as u16)))
} },
None => None, None => None,
}, },
None => None, None => None,

View File

@ -0,0 +1,57 @@
use futures_watch;
use std::{cmp, hash, net::SocketAddr};
use telemetry::metrics::DstLabels;
pub type DstLabelsWatch = futures_watch::Watch<Option<DstLabels>>;
/// An individual traffic target.
///
/// Equality, Ordering, and hashability is determined soley by the Endpoint's address.
#[derive(Clone, Debug)]
pub struct Endpoint {
address: SocketAddr,
dst_labels: Option<DstLabelsWatch>,
}
// ==== impl Endpoint =====
impl Endpoint {
pub fn new(address: SocketAddr, dst_labels: DstLabelsWatch) -> Self {
Self {
address,
dst_labels: Some(dst_labels),
}
}
pub fn address(&self) -> SocketAddr {
self.address
}
pub fn dst_labels(&self) -> Option<&DstLabelsWatch> {
self.dst_labels.as_ref()
}
}
impl From<SocketAddr> for Endpoint {
fn from(address: SocketAddr) -> Self {
Self {
address,
dst_labels: None,
}
}
}
impl hash::Hash for Endpoint {
fn hash<H: hash::Hasher>(&self, state: &mut H) {
self.address.hash(state)
}
}
impl cmp::PartialEq for Endpoint {
fn eq(&self, other: &Self) -> bool {
self.address.eq(&other.address)
}
}
impl cmp::Eq for Endpoint {}

View File

@ -0,0 +1,203 @@
use std::collections::HashMap;
use std::net::SocketAddr;
use futures::sync::mpsc;
use futures::{Async, Poll, Stream};
use futures_watch::{Store, Watch};
use http;
use tower_discover::{Change, Discover};
use tower_service::Service;
use dns;
use telemetry::metrics::DstLabels;
use transport::DnsNameAndPort;
pub mod background;
mod endpoint;
pub use self::endpoint::{DstLabelsWatch, Endpoint};
/// A handle to request resolutions from a `Background`.
#[derive(Clone, Debug)]
pub struct Resolver {
request_tx: mpsc::UnboundedSender<ResolveRequest>,
}
#[derive(Debug)]
struct ResolveRequest {
authority: DnsNameAndPort,
update_tx: mpsc::UnboundedSender<Update>,
}
/// A `tower_discover::Discover`, given to a `tower_balance::Balance`.
#[derive(Debug)]
pub struct Resolution<B> {
update_rx: mpsc::UnboundedReceiver<Update>,
/// Map associating addresses with the `Store` for the watch on that
/// service's metric labels (as provided by the Destination service).
///
/// This is used to update the `Labeled` middleware on those services
/// without requiring the service stack to be re-bound.
metric_labels: HashMap<SocketAddr, Store<Option<DstLabels>>>,
bind: B,
}
/// .
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
struct Metadata {
/// A set of Prometheus metric labels describing the destination.
metric_labels: Option<DstLabels>,
}
#[derive(Debug, Clone)]
enum Update {
Insert(SocketAddr, Metadata),
Remove(SocketAddr),
ChangeMetadata(SocketAddr, Metadata),
}
/// Bind a `SocketAddr` with a protocol.
pub trait Bind {
/// The type of endpoint upon which a `Service` is bound.
type Endpoint;
/// Requests handled by the discovered services
type Request;
/// Responses given by the discovered services
type Response;
/// Errors produced by the discovered services
type Error;
type BindError;
/// The discovered `Service` instance.
type Service: Service<Request = Self::Request, Response = Self::Response, Error = Self::Error>;
/// Bind a service from an endpoint.
fn bind(&self, addr: &Self::Endpoint) -> Result<Self::Service, Self::BindError>;
}
/// Creates a "channel" of `Resolver` to `Background` handles.
///
/// The `Resolver` is used by a listener, the `Background` is consumed
/// on the controller thread.
pub fn new(
dns_config: dns::Config,
default_destination_namespace: String,
) -> (Resolver, background::Config) {
let (request_tx, rx) = mpsc::unbounded();
let disco = Resolver { request_tx };
let bg = background::Config::new(rx, dns_config, default_destination_namespace);
(disco, bg)
}
// ==== impl Resolver =====
impl Resolver {
/// Start watching for address changes for a certain authority.
pub fn resolve<B>(&self, authority: &DnsNameAndPort, bind: B) -> Resolution<B> {
trace!("resolve; authority={:?}", authority);
let (update_tx, update_rx) = mpsc::unbounded();
let req = {
let authority = authority.clone();
ResolveRequest {
authority,
update_tx,
}
};
self.request_tx
.unbounded_send(req)
.expect("unbounded can't fail");
Resolution {
update_rx,
metric_labels: HashMap::new(),
bind,
}
}
}
// ==== impl Resolution =====
impl<B> Resolution<B> {
fn update_metadata(&mut self, addr: SocketAddr, meta: Metadata) -> Result<(), ()> {
if let Some(store) = self.metric_labels.get_mut(&addr) {
store
.store(meta.metric_labels)
.map_err(|e| {
error!("update_metadata: label store error: {:?}", e);
})
.map(|_| ())
} else {
// The store has already been removed, so nobody cares about
// the metadata change. We expect that this shouldn't happen,
// but if it does, log a warning and handle it gracefully.
warn!(
"update_metadata: ignoring ChangeMetadata for {:?} because the service no longer \
exists.",
addr
);
Ok(())
}
}
}
impl<B, A> Discover for Resolution<B>
where
B: Bind<Endpoint = Endpoint, Request = http::Request<A>>,
{
type Key = SocketAddr;
type Request = B::Request;
type Response = B::Response;
type Error = B::Error;
type Service = B::Service;
type DiscoverError = ();
fn poll(&mut self) -> Poll<Change<Self::Key, Self::Service>, Self::DiscoverError> {
loop {
let up = self.update_rx.poll();
trace!("watch: {:?}", up);
let update = try_ready!(up).expect("destination stream must be infinite");
match update {
Update::Insert(addr, meta) => {
// Construct a watch for the `Labeled` middleware that will
// wrap the bound service, and insert the store into our map
// so it can be updated later.
let (labels_watch, labels_store) = Watch::new(meta.metric_labels);
self.metric_labels.insert(addr, labels_store);
let endpoint = Endpoint::new(addr, labels_watch.clone());
let service = self.bind.bind(&endpoint).map_err(|_| ())?;
return Ok(Async::Ready(Change::Insert(addr, service)));
},
Update::ChangeMetadata(addr, meta) => {
// Update metadata and continue polling `rx`.
self.update_metadata(addr, meta)?;
},
Update::Remove(addr) => {
// It's safe to drop the store handle here, even if
// the `Labeled` middleware using the watch handle
// still exists --- it will simply read the final
// value from the watch.
self.metric_labels.remove(&addr);
return Ok(Async::Ready(Change::Remove(addr)));
},
}
}
}
}
// ===== impl Metadata =====
impl Metadata {
fn no_metadata() -> Self {
Metadata {
metric_labels: None,
}
}
}

View File

@ -21,28 +21,28 @@ use transport::{DnsNameAndPort, HostAndPort, LookupAddressAndConnect};
use timeout::{Timeout, TimeoutError}; use timeout::{Timeout, TimeoutError};
mod cache; mod cache;
pub mod discovery; pub mod destination;
mod fully_qualified_authority; mod fully_qualified_authority;
mod observe; mod observe;
pub mod pb; pub mod pb;
mod remote_stream; mod remote_stream;
use self::discovery::{Background as DiscoBg, Discovery, Watch}; use self::destination::{Resolver, Resolution};
pub use self::discovery::Bind; pub use self::destination::Bind;
pub use self::observe::Observe; pub use self::observe::Observe;
#[derive(Clone)] #[derive(Clone)]
pub struct Control { pub struct Control {
disco: Discovery, disco: Resolver,
} }
pub struct Background { pub struct Background {
disco: DiscoBg, disco: destination::background::Config,
} }
pub fn new(dns_config: dns::Config, default_destination_namespace: String) -> (Control, Background) pub fn new(dns_config: dns::Config, default_destination_namespace: String) -> (Control, Background)
{ {
let (tx, rx) = self::discovery::new(dns_config, default_destination_namespace); let (tx, rx) = self::destination::new(dns_config, default_destination_namespace);
let c = Control { let c = Control {
disco: tx, disco: tx,
@ -58,7 +58,7 @@ pub fn new(dns_config: dns::Config, default_destination_namespace: String) -> (C
// ===== impl Control ===== // ===== impl Control =====
impl Control { impl Control {
pub fn resolve<B>(&self, auth: &DnsNameAndPort, bind: B) -> Watch<B> { pub fn resolve<B>(&self, auth: &DnsNameAndPort, bind: B) -> Resolution<B> {
self.disco.resolve(auth, bind) self.disco.resolve(auth, bind)
} }
} }
@ -97,7 +97,7 @@ impl Background {
AddOrigin::new(scheme, authority, backoff) AddOrigin::new(scheme, authority, backoff)
}; };
let mut disco = self.disco.work(executor); let mut disco = self.disco.process(executor);
let fut = future::poll_fn(move || { let fut = future::poll_fn(move || {
disco.poll_rpc(&mut client); disco.poll_rpc(&mut client);

View File

@ -1,7 +1,7 @@
use http; use http;
use std::sync::Arc; use std::sync::Arc;
use control::discovery::DstLabelsWatch; use control::destination::DstLabelsWatch;
use ctx; use ctx;

View File

@ -2,7 +2,7 @@ use std::{cmp, hash};
use std::net::{IpAddr, SocketAddr}; use std::net::{IpAddr, SocketAddr};
use std::sync::Arc; use std::sync::Arc;
use control::discovery::DstLabelsWatch; use control::destination::DstLabelsWatch;
use ctx; use ctx;
#[derive(Clone, Debug, PartialEq, Eq, Hash)] #[derive(Clone, Debug, PartialEq, Eq, Hash)]

View File

@ -14,8 +14,8 @@ use tower_h2;
use conduit_proxy_router::Recognize; use conduit_proxy_router::Recognize;
use bind::{self, Bind, Protocol}; use bind::{self, Bind, Protocol};
use control::{self, discovery}; use control;
use control::discovery::Bind as BindTrait; use control::destination::{Bind as BindTrait, Resolution};
use ctx; use ctx;
use timeout::Timeout; use timeout::Timeout;
use transparency::h1; use transparency::h1;
@ -170,7 +170,7 @@ where
} }
pub enum Discovery<B> { pub enum Discovery<B> {
NamedSvc(discovery::Watch<BindProtocol<B>>), NamedSvc(Resolution<BindProtocol<B>>),
ImplicitOriginalDst(Option<(SocketAddr, BindProtocol<B>)>), ImplicitOriginalDst(Option<(SocketAddr, BindProtocol<B>)>),
} }