mirror of https://github.com/linkerd/linkerd2.git
rfc: proxy: Split `control::discovery` into submodules (#955)
While preparing #946, I was again struck by the `discovery` module being very weighty (nearly 800 dense lines). The intent of this change is only to improve readability. There are no functional changes. The following aesthetic changes have been made: * `control::discovery` has been renamed to `control::destination` to be more consistent with the rest of conduit's terminology (destinations aren't the only thing that need to be discovered). * In that vein, the `Discovery` type has been renamed `Resolver` (since it exposes one function, `resolve`). * The `Watch` type has been renamed `Resolution`. This disambiguates the type form `futures_watch::Watch`(which is used in the same code) and makes it more clearly the product of a `Resolver`. * The `Background` and `DiscoveryWork` names were very opaque. `Background` is now `background::Config` to indicate that it can't actually _do_ anything; and `DiscoveryWork` is now `background::Process` to indicate that it's responsible for processing destination updates. * `DestinationSet` is now a private implementation detail in the `background` module. * An internal `ResolveRequest` type replaces an unnamed tuple (now that it's used across files). * `rustfmt` has been run on `background.rs` and `endpoint.rs`
This commit is contained in:
parent
b28193817c
commit
fbba6bf4c8
|
@ -13,7 +13,7 @@ use tower_h2;
|
|||
use tower_reconnect::Reconnect;
|
||||
|
||||
use control;
|
||||
use control::discovery::Endpoint;
|
||||
use control::destination::Endpoint;
|
||||
use ctx;
|
||||
use telemetry::{self, sensor};
|
||||
use transparency::{self, HttpBody, h1};
|
||||
|
@ -253,7 +253,7 @@ impl<C, B> Bind<C, B> {
|
|||
}
|
||||
}
|
||||
|
||||
impl<B> control::discovery::Bind for BindProtocol<Arc<ctx::Proxy>, B>
|
||||
impl<B> control::destination::Bind for BindProtocol<Arc<ctx::Proxy>, B>
|
||||
where
|
||||
B: tower_h2::Body + 'static,
|
||||
{
|
||||
|
|
|
@ -1,78 +1,48 @@
|
|||
use std::{cmp, fmt, hash};
|
||||
use std::collections::VecDeque;
|
||||
use std::collections::hash_map::{Entry, HashMap};
|
||||
use futures::sync::mpsc;
|
||||
use futures::{Async, Future, Stream};
|
||||
use std::collections::{
|
||||
hash_map::{Entry, HashMap},
|
||||
VecDeque,
|
||||
};
|
||||
use std::fmt;
|
||||
use std::iter::IntoIterator;
|
||||
use std::net::SocketAddr;
|
||||
use std::time::Duration;
|
||||
|
||||
use futures::{Async, Future, Poll, Stream};
|
||||
use futures::sync::mpsc;
|
||||
use futures_watch;
|
||||
use http;
|
||||
use tokio_core::reactor::Handle;
|
||||
use tower_service::Service;
|
||||
use tower_h2::{HttpService, BoxBody, RecvBody};
|
||||
use tower_discover::{Change, Discover};
|
||||
use tower_grpc as grpc;
|
||||
|
||||
use dns::{self, IpAddrListFuture};
|
||||
use super::fully_qualified_authority::FullyQualifiedAuthority;
|
||||
use tower_h2::{BoxBody, HttpService, RecvBody};
|
||||
|
||||
use conduit_proxy_controller_grpc::common::{Destination, TcpAddress};
|
||||
use conduit_proxy_controller_grpc::destination::{
|
||||
Update as PbUpdate,
|
||||
WeightedAddr,
|
||||
};
|
||||
use conduit_proxy_controller_grpc::destination::client::Destination as DestinationSvc;
|
||||
use conduit_proxy_controller_grpc::destination::update::Update as PbUpdate2;
|
||||
use conduit_proxy_controller_grpc::destination::client::{Destination as DestinationSvc};
|
||||
use transport::DnsNameAndPort;
|
||||
use conduit_proxy_controller_grpc::destination::{Update as PbUpdate, WeightedAddr};
|
||||
|
||||
use super::{Metadata, ResolveRequest, Update};
|
||||
use control::cache::{Cache, CacheChange, Exists};
|
||||
use control::remote_stream::{Remote, Receiver};
|
||||
|
||||
use ::telemetry::metrics::DstLabels;
|
||||
|
||||
/// A handle to start watching a destination for address changes.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Discovery {
|
||||
tx: mpsc::UnboundedSender<(DnsNameAndPort, mpsc::UnboundedSender<Update>)>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Endpoint {
|
||||
address: SocketAddr,
|
||||
dst_labels: Option<DstLabelsWatch>,
|
||||
}
|
||||
|
||||
pub type DstLabelsWatch = futures_watch::Watch<Option<DstLabels>>;
|
||||
use control::fully_qualified_authority::FullyQualifiedAuthority;
|
||||
use control::remote_stream::{Receiver, Remote};
|
||||
use dns::{self, IpAddrListFuture};
|
||||
use telemetry::metrics::DstLabels;
|
||||
use transport::DnsNameAndPort;
|
||||
|
||||
type DestinationServiceQuery<T> = Remote<PbUpdate, T>;
|
||||
type UpdateRx<T> = Receiver<PbUpdate, T>;
|
||||
|
||||
/// A `tower_discover::Discover`, given to a `tower_balance::Balance`.
|
||||
/// Stores the configuration for a destination background worker.
|
||||
#[derive(Debug)]
|
||||
pub struct Watch<B> {
|
||||
rx: mpsc::UnboundedReceiver<Update>,
|
||||
/// Map associating addresses with the `Store` for the watch on that
|
||||
/// service's metric labels (as provided by the Destination service).
|
||||
///
|
||||
/// This is used to update the `Labeled` middleware on those services
|
||||
/// without requiring the service stack to be re-bound.
|
||||
metric_labels: HashMap<SocketAddr, futures_watch::Store<Option<DstLabels>>>,
|
||||
bind: B,
|
||||
}
|
||||
|
||||
/// A background handle to eventually bind on the controller thread.
|
||||
#[derive(Debug)]
|
||||
pub struct Background {
|
||||
rx: mpsc::UnboundedReceiver<(DnsNameAndPort, mpsc::UnboundedSender<Update>)>,
|
||||
pub struct Config {
|
||||
request_rx: mpsc::UnboundedReceiver<ResolveRequest>,
|
||||
dns_config: dns::Config,
|
||||
default_destination_namespace: String,
|
||||
}
|
||||
|
||||
/// A future returned from `Background::work()`, doing the work of talking to
|
||||
/// the controller destination API.
|
||||
pub struct DiscoveryWork<T: HttpService<ResponseBody = RecvBody>> {
|
||||
/// Satisfies resolutions as requested via `request_rx`.
|
||||
///
|
||||
/// As `Process` is polled with a client to Destination service, if the client to the
|
||||
/// service is healthy, it reads requests from `request_rx`, determines how to resolve the
|
||||
/// provided authority to a set of addresses, and ensures that resolution updates are
|
||||
/// propagated to all requesters.
|
||||
pub struct Process<T: HttpService<ResponseBody = RecvBody>> {
|
||||
dns_resolver: dns::Resolver,
|
||||
default_destination_namespace: String,
|
||||
destinations: HashMap<DnsNameAndPort, DestinationSet<T>>,
|
||||
|
@ -82,16 +52,10 @@ pub struct DiscoveryWork<T: HttpService<ResponseBody = RecvBody>> {
|
|||
/// Each poll, records whether the rpc service was till ready.
|
||||
rpc_ready: bool,
|
||||
/// A receiver of new watch requests.
|
||||
rx: mpsc::UnboundedReceiver<(DnsNameAndPort, mpsc::UnboundedSender<Update>)>,
|
||||
}
|
||||
|
||||
/// Any additional metadata describing a discovered service.
|
||||
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
|
||||
struct Metadata {
|
||||
/// A set of Prometheus metric labels describing the destination.
|
||||
metric_labels: Option<DstLabels>,
|
||||
request_rx: mpsc::UnboundedReceiver<ResolveRequest>,
|
||||
}
|
||||
|
||||
/// Holds the state of a single resolution.
|
||||
struct DestinationSet<T: HttpService<ResponseBody = RecvBody>> {
|
||||
addrs: Exists<Cache<SocketAddr, Metadata>>,
|
||||
query: Option<DestinationServiceQuery<T>>,
|
||||
|
@ -99,211 +63,41 @@ struct DestinationSet<T: HttpService<ResponseBody = RecvBody>> {
|
|||
txs: Vec<mpsc::UnboundedSender<Update>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
enum Update {
|
||||
Insert(SocketAddr, Metadata),
|
||||
Remove(SocketAddr),
|
||||
ChangeMetadata(SocketAddr, Metadata),
|
||||
}
|
||||
// ==== impl Config =====
|
||||
|
||||
/// Bind a `SocketAddr` with a protocol.
|
||||
pub trait Bind {
|
||||
/// The type of endpoint upon which a `Service` is bound.
|
||||
type Endpoint;
|
||||
|
||||
/// Requests handled by the discovered services
|
||||
type Request;
|
||||
|
||||
/// Responses given by the discovered services
|
||||
type Response;
|
||||
|
||||
/// Errors produced by the discovered services
|
||||
type Error;
|
||||
|
||||
type BindError;
|
||||
|
||||
/// The discovered `Service` instance.
|
||||
type Service: Service<Request = Self::Request, Response = Self::Response, Error = Self::Error>;
|
||||
|
||||
/// Bind a service from an endpoint.
|
||||
fn bind(&self, addr: &Self::Endpoint) -> Result<Self::Service, Self::BindError>;
|
||||
}
|
||||
|
||||
/// Creates a "channel" of `Discovery` to `Background` handles.
|
||||
///
|
||||
/// The `Discovery` is used by a listener, the `Background` is consumed
|
||||
/// on the controller thread.
|
||||
pub fn new(dns_config: dns::Config, default_destination_namespace: String) -> (Discovery, Background) {
|
||||
let (tx, rx) = mpsc::unbounded();
|
||||
(
|
||||
Discovery {
|
||||
tx,
|
||||
},
|
||||
Background {
|
||||
rx,
|
||||
impl Config {
|
||||
pub(super) fn new(
|
||||
request_rx: mpsc::UnboundedReceiver<ResolveRequest>,
|
||||
dns_config: dns::Config,
|
||||
default_destination_namespace: String,
|
||||
) -> Self {
|
||||
Self {
|
||||
request_rx,
|
||||
dns_config,
|
||||
default_destination_namespace,
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
// ==== impl Discovery =====
|
||||
|
||||
impl Discovery {
|
||||
/// Start watching for address changes for a certain authority.
|
||||
pub fn resolve<B>(&self, authority: &DnsNameAndPort, bind: B) -> Watch<B> {
|
||||
trace!("resolve; authority={:?}", authority);
|
||||
let (tx, rx) = mpsc::unbounded();
|
||||
self.tx
|
||||
.unbounded_send((authority.clone(), tx))
|
||||
.expect("unbounded can't fail");
|
||||
|
||||
Watch {
|
||||
rx,
|
||||
metric_labels: HashMap::new(),
|
||||
bind,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ==== impl Endpoint =====
|
||||
|
||||
impl Endpoint {
|
||||
pub fn new(address: SocketAddr, dst_labels: DstLabelsWatch) -> Self {
|
||||
Self {
|
||||
address,
|
||||
dst_labels: Some(dst_labels),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn address(&self) -> SocketAddr {
|
||||
self.address
|
||||
}
|
||||
|
||||
pub fn dst_labels(&self) -> Option<&DstLabelsWatch> {
|
||||
self.dst_labels.as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<SocketAddr> for Endpoint {
|
||||
fn from(address: SocketAddr) -> Self {
|
||||
Self {
|
||||
address,
|
||||
dst_labels: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl hash::Hash for Endpoint {
|
||||
fn hash<H: hash::Hasher>(&self, state: &mut H) {
|
||||
self.address.hash(state)
|
||||
}
|
||||
}
|
||||
|
||||
impl cmp::PartialEq for Endpoint {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.address.eq(&other.address)
|
||||
}
|
||||
}
|
||||
|
||||
impl cmp::Eq for Endpoint {}
|
||||
|
||||
// ==== impl Watch =====
|
||||
|
||||
impl<B> Watch<B> {
|
||||
fn update_metadata(&mut self, addr: SocketAddr, meta: Metadata) -> Result<(), ()> {
|
||||
if let Some(store) = self.metric_labels.get_mut(&addr) {
|
||||
store.store(meta.metric_labels)
|
||||
.map_err(|e| {
|
||||
error!("update_metadata: label store error: {:?}", e);
|
||||
})
|
||||
.map(|_| ())
|
||||
} else {
|
||||
// The store has already been removed, so nobody cares about
|
||||
// the metadata change. We expect that this shouldn't happen,
|
||||
// but if it does, log a warning and handle it gracefully.
|
||||
warn!(
|
||||
"update_metadata: ignoring ChangeMetadata for {:?} \
|
||||
because the service no longer exists.",
|
||||
addr
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<B, A> Discover for Watch<B>
|
||||
where
|
||||
B: Bind<Endpoint = Endpoint, Request = http::Request<A>>,
|
||||
{
|
||||
type Key = SocketAddr;
|
||||
type Request = B::Request;
|
||||
type Response = B::Response;
|
||||
type Error = B::Error;
|
||||
type Service = B::Service;
|
||||
type DiscoverError = ();
|
||||
|
||||
fn poll(&mut self) -> Poll<Change<Self::Key, Self::Service>, Self::DiscoverError> {
|
||||
loop {
|
||||
let up = self.rx.poll();
|
||||
trace!("watch: {:?}", up);
|
||||
let update = try_ready!(up).expect("discovery stream must be infinite");
|
||||
|
||||
match update {
|
||||
Update::Insert(addr, meta) => {
|
||||
// Construct a watch for the `Labeled` middleware that will
|
||||
// wrap the bound service, and insert the store into our map
|
||||
// so it can be updated later.
|
||||
let (labels_watch, labels_store) =
|
||||
futures_watch::Watch::new(meta.metric_labels);
|
||||
self.metric_labels.insert(addr, labels_store);
|
||||
|
||||
let endpoint = Endpoint::new(addr, labels_watch.clone());
|
||||
|
||||
let service = self.bind.bind(&endpoint)
|
||||
.map_err(|_| ())?;
|
||||
|
||||
return Ok(Async::Ready(Change::Insert(addr, service)))
|
||||
},
|
||||
Update::ChangeMetadata(addr, meta) => {
|
||||
// Update metadata and continue polling `rx`.
|
||||
self.update_metadata(addr, meta)?;
|
||||
},
|
||||
Update::Remove(addr) => {
|
||||
// It's safe to drop the store handle here, even if
|
||||
// the `Labeled` middleware using the watch handle
|
||||
// still exists --- it will simply read the final
|
||||
// value from the watch.
|
||||
self.metric_labels.remove(&addr);
|
||||
return Ok(Async::Ready(Change::Remove(addr)));
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ==== impl Background =====
|
||||
|
||||
impl Background {
|
||||
/// Bind this handle to start talking to the controller API.
|
||||
pub fn work<T>(self, executor: &Handle) -> DiscoveryWork<T>
|
||||
where T: HttpService<RequestBody = BoxBody, ResponseBody = RecvBody>,
|
||||
T::Error: fmt::Debug,
|
||||
pub fn process<T>(self, executor: &Handle) -> Process<T>
|
||||
where
|
||||
T: HttpService<RequestBody = BoxBody, ResponseBody = RecvBody>,
|
||||
T::Error: fmt::Debug,
|
||||
{
|
||||
DiscoveryWork {
|
||||
Process {
|
||||
dns_resolver: dns::Resolver::new(self.dns_config, executor),
|
||||
default_destination_namespace: self.default_destination_namespace,
|
||||
destinations: HashMap::new(),
|
||||
reconnects: VecDeque::new(),
|
||||
rpc_ready: false,
|
||||
rx: self.rx,
|
||||
request_rx: self.request_rx,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ==== impl DiscoveryWork =====
|
||||
// ==== impl Process =====
|
||||
|
||||
impl<T> DiscoveryWork<T>
|
||||
impl<T> Process<T>
|
||||
where
|
||||
T: HttpService<RequestBody = BoxBody, ResponseBody = RecvBody>,
|
||||
T::Error: fmt::Debug,
|
||||
|
@ -328,16 +122,16 @@ where
|
|||
match client.poll_ready() {
|
||||
Ok(Async::Ready(())) => {
|
||||
self.rpc_ready = true;
|
||||
}
|
||||
},
|
||||
Ok(Async::NotReady) => {
|
||||
self.rpc_ready = false;
|
||||
break;
|
||||
}
|
||||
},
|
||||
Err(err) => {
|
||||
warn!("Destination.Get poll_ready error: {:?}", err);
|
||||
self.rpc_ready = false;
|
||||
break;
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
// handle any pending reconnects first
|
||||
|
@ -346,40 +140,40 @@ where
|
|||
}
|
||||
|
||||
// check for any new watches
|
||||
match self.rx.poll() {
|
||||
Ok(Async::Ready(Some((auth, tx)))) => {
|
||||
trace!("Destination.Get {:?}", auth);
|
||||
match self.destinations.entry(auth) {
|
||||
match self.request_rx.poll() {
|
||||
Ok(Async::Ready(Some(ResolveRequest {
|
||||
authority,
|
||||
update_tx,
|
||||
}))) => {
|
||||
trace!("Destination.Get {:?}", authority);
|
||||
match self.destinations.entry(authority) {
|
||||
Entry::Occupied(mut occ) => {
|
||||
let set = occ.get_mut();
|
||||
// we may already know of some addresses here, so push
|
||||
// them onto the new watch first
|
||||
match set.addrs {
|
||||
Exists::Yes(ref cache) => {
|
||||
for (&addr, meta) in cache {
|
||||
let update = Update::Insert(
|
||||
addr,
|
||||
meta.clone()
|
||||
);
|
||||
tx.unbounded_send(update)
|
||||
.expect("unbounded_send does not fail");
|
||||
}
|
||||
Exists::Yes(ref cache) => for (&addr, meta) in cache {
|
||||
let update = Update::Insert(addr, meta.clone());
|
||||
update_tx
|
||||
.unbounded_send(update)
|
||||
.expect("unbounded_send does not fail");
|
||||
},
|
||||
Exists::No | Exists::Unknown => (),
|
||||
}
|
||||
set.txs.push(tx);
|
||||
}
|
||||
set.txs.push(update_tx);
|
||||
},
|
||||
Entry::Vacant(vac) => {
|
||||
let query = Self::query_destination_service_if_relevant(
|
||||
&self.default_destination_namespace,
|
||||
client,
|
||||
vac.key(),
|
||||
"connect");
|
||||
"connect",
|
||||
);
|
||||
let mut set = DestinationSet {
|
||||
addrs: Exists::Unknown,
|
||||
query,
|
||||
dns_query: None,
|
||||
txs: vec![tx],
|
||||
txs: vec![update_tx],
|
||||
};
|
||||
// If the authority is one for which the Destination service is never
|
||||
// relevant (e.g. an absolute name that doesn't end in ".svc.$zone." in
|
||||
|
@ -388,16 +182,17 @@ where
|
|||
set.reset_dns_query(
|
||||
&self.dns_resolver,
|
||||
Duration::from_secs(0),
|
||||
vac.key());
|
||||
vac.key(),
|
||||
);
|
||||
}
|
||||
vac.insert(set);
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
},
|
||||
Ok(Async::Ready(None)) => {
|
||||
trace!("Discover tx is dropped, shutdown?");
|
||||
return;
|
||||
}
|
||||
},
|
||||
Ok(Async::NotReady) => break,
|
||||
Err(_) => unreachable!("unbounded receiver doesn't error"),
|
||||
}
|
||||
|
@ -414,7 +209,8 @@ where
|
|||
&self.default_destination_namespace,
|
||||
client,
|
||||
&auth,
|
||||
"reconnect");
|
||||
"reconnect",
|
||||
);
|
||||
return true;
|
||||
} else {
|
||||
trace!("reconnect no longer needed: {:?}", auth);
|
||||
|
@ -427,7 +223,7 @@ where
|
|||
for (auth, set) in &mut self.destinations {
|
||||
// Query the Destination service first.
|
||||
let (new_query, found_by_destination_service) = match set.query.take() {
|
||||
Some(Remote::ConnectedOrConnecting{ rx }) => {
|
||||
Some(Remote::ConnectedOrConnecting { rx }) => {
|
||||
let (new_query, found_by_destination_service) =
|
||||
set.poll_destination_service(auth, rx);
|
||||
if let Remote::NeedsReconnect = new_query {
|
||||
|
@ -473,36 +269,45 @@ where
|
|||
default_destination_namespace: &str,
|
||||
client: &mut T,
|
||||
auth: &DnsNameAndPort,
|
||||
connect_or_reconnect: &str)
|
||||
-> Option<DestinationServiceQuery<T>>
|
||||
{
|
||||
trace!("DestinationServiceQuery {} {:?}", connect_or_reconnect, auth);
|
||||
FullyQualifiedAuthority::normalize(auth, default_destination_namespace)
|
||||
.map(|auth| {
|
||||
let req = Destination {
|
||||
scheme: "k8s".into(),
|
||||
path: auth.without_trailing_dot().to_owned(),
|
||||
};
|
||||
let mut svc = DestinationSvc::new(client.lift_ref());
|
||||
let response = svc.get(grpc::Request::new(req));
|
||||
Remote::ConnectedOrConnecting { rx: Receiver::new(response) }
|
||||
})
|
||||
connect_or_reconnect: &str,
|
||||
) -> Option<DestinationServiceQuery<T>> {
|
||||
trace!(
|
||||
"DestinationServiceQuery {} {:?}",
|
||||
connect_or_reconnect,
|
||||
auth
|
||||
);
|
||||
FullyQualifiedAuthority::normalize(auth, default_destination_namespace).map(|auth| {
|
||||
let req = Destination {
|
||||
scheme: "k8s".into(),
|
||||
path: auth.without_trailing_dot().to_owned(),
|
||||
};
|
||||
let mut svc = DestinationSvc::new(client.lift_ref());
|
||||
let response = svc.get(grpc::Request::new(req));
|
||||
Remote::ConnectedOrConnecting {
|
||||
rx: Receiver::new(response),
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl DestinationSet =====
|
||||
|
||||
impl<T> DestinationSet<T>
|
||||
where T: HttpService<RequestBody = BoxBody, ResponseBody = RecvBody>,
|
||||
T::Error: fmt::Debug
|
||||
where
|
||||
T: HttpService<RequestBody = BoxBody, ResponseBody = RecvBody>,
|
||||
T::Error: fmt::Debug,
|
||||
{
|
||||
fn reset_dns_query(
|
||||
&mut self,
|
||||
dns_resolver: &dns::Resolver,
|
||||
delay: Duration,
|
||||
authority: &DnsNameAndPort)
|
||||
{
|
||||
trace!("resetting DNS query for {} with delay {:?}", authority.host, delay);
|
||||
authority: &DnsNameAndPort,
|
||||
) {
|
||||
trace!(
|
||||
"resetting DNS query for {} with delay {:?}",
|
||||
authority.host,
|
||||
delay
|
||||
);
|
||||
self.reset_on_next_modification();
|
||||
self.dns_query = Some(dns_resolver.resolve_all_ips(delay, &authority.host));
|
||||
}
|
||||
|
@ -514,9 +319,8 @@ impl<T> DestinationSet<T>
|
|||
fn poll_destination_service(
|
||||
&mut self,
|
||||
auth: &DnsNameAndPort,
|
||||
mut rx: UpdateRx<T>)
|
||||
-> (DestinationServiceQuery<T>, Exists<()>)
|
||||
{
|
||||
mut rx: UpdateRx<T>,
|
||||
) -> (DestinationServiceQuery<T>, Exists<()>) {
|
||||
let mut exists = Exists::Unknown;
|
||||
|
||||
loop {
|
||||
|
@ -524,7 +328,9 @@ impl<T> DestinationSet<T>
|
|||
Ok(Async::Ready(Some(update))) => match update.update {
|
||||
Some(PbUpdate2::Add(a_set)) => {
|
||||
let set_labels = a_set.metric_labels;
|
||||
let addrs = a_set.addrs.into_iter()
|
||||
let addrs = a_set
|
||||
.addrs
|
||||
.into_iter()
|
||||
.filter_map(|pb| pb_to_addr_meta(pb, &set_labels));
|
||||
self.add(auth, addrs)
|
||||
},
|
||||
|
@ -532,7 +338,10 @@ impl<T> DestinationSet<T>
|
|||
exists = Exists::Yes(());
|
||||
self.remove(
|
||||
auth,
|
||||
r_set.addrs.iter().filter_map(|addr| pb_to_sock_addr(addr.clone()))
|
||||
r_set
|
||||
.addrs
|
||||
.iter()
|
||||
.filter_map(|addr| pb_to_sock_addr(addr.clone())),
|
||||
);
|
||||
},
|
||||
Some(PbUpdate2::NoEndpoints(ref no_endpoints)) if no_endpoints.exists => {
|
||||
|
@ -558,7 +367,7 @@ impl<T> DestinationSet<T>
|
|||
Err(err) => {
|
||||
warn!("Destination.Get stream errored for {:?}: {:?}", auth, err);
|
||||
return (Remote::NeedsReconnect, exists);
|
||||
}
|
||||
},
|
||||
};
|
||||
}
|
||||
}
|
||||
|
@ -574,13 +383,26 @@ impl<T> DestinationSet<T>
|
|||
return;
|
||||
},
|
||||
Ok(Async::Ready(dns::Response::Exists(ips))) => {
|
||||
trace!("positive result of DNS query for {:?}: {:?}", authority, ips);
|
||||
self.add(authority, ips.iter().map(|ip| {
|
||||
(SocketAddr::from((ip, authority.port)), Metadata::no_metadata())
|
||||
}));
|
||||
trace!(
|
||||
"positive result of DNS query for {:?}: {:?}",
|
||||
authority,
|
||||
ips
|
||||
);
|
||||
self.add(
|
||||
authority,
|
||||
ips.iter().map(|ip| {
|
||||
(
|
||||
SocketAddr::from((ip, authority.port)),
|
||||
Metadata::no_metadata(),
|
||||
)
|
||||
}),
|
||||
);
|
||||
},
|
||||
Ok(Async::Ready(dns::Response::DoesNotExist)) => {
|
||||
trace!("negative result (NXDOMAIN) of DNS query for {:?}", authority);
|
||||
trace!(
|
||||
"negative result (NXDOMAIN) of DNS query for {:?}",
|
||||
authority
|
||||
);
|
||||
self.no_endpoints(authority, false);
|
||||
},
|
||||
Err(e) => {
|
||||
|
@ -596,38 +418,39 @@ impl<T> DestinationSet<T>
|
|||
}
|
||||
}
|
||||
|
||||
impl <T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> {
|
||||
impl<T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> {
|
||||
fn reset_on_next_modification(&mut self) {
|
||||
match self.addrs {
|
||||
Exists::Yes(ref mut cache) => {
|
||||
cache.set_reset_on_next_modification();
|
||||
},
|
||||
Exists::No |
|
||||
Exists::Unknown => (),
|
||||
Exists::No | Exists::Unknown => (),
|
||||
}
|
||||
}
|
||||
|
||||
fn add<A>(&mut self, authority_for_logging: &DnsNameAndPort, addrs_to_add: A)
|
||||
where A: Iterator<Item = (SocketAddr, Metadata)>
|
||||
where
|
||||
A: Iterator<Item = (SocketAddr, Metadata)>,
|
||||
{
|
||||
let mut cache = match self.addrs.take() {
|
||||
Exists::Yes(mut cache) => cache,
|
||||
Exists::Unknown | Exists::No => Cache::new(),
|
||||
};
|
||||
cache.update_union(
|
||||
addrs_to_add,
|
||||
&mut |change| Self::on_change(&mut self.txs, authority_for_logging, change));
|
||||
cache.update_union(addrs_to_add, &mut |change| {
|
||||
Self::on_change(&mut self.txs, authority_for_logging, change)
|
||||
});
|
||||
self.addrs = Exists::Yes(cache);
|
||||
}
|
||||
|
||||
fn remove<A>(&mut self, authority_for_logging: &DnsNameAndPort, addrs_to_remove: A)
|
||||
where A: Iterator<Item = SocketAddr>
|
||||
where
|
||||
A: Iterator<Item = SocketAddr>,
|
||||
{
|
||||
let cache = match self.addrs.take() {
|
||||
Exists::Yes(mut cache) => {
|
||||
cache.remove(
|
||||
addrs_to_remove,
|
||||
&mut |change| Self::on_change(&mut self.txs, authority_for_logging, change));
|
||||
cache.remove(addrs_to_remove, &mut |change| {
|
||||
Self::on_change(&mut self.txs, authority_for_logging, change)
|
||||
});
|
||||
cache
|
||||
},
|
||||
Exists::Unknown | Exists::No => Cache::new(),
|
||||
|
@ -636,12 +459,16 @@ impl <T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> {
|
|||
}
|
||||
|
||||
fn no_endpoints(&mut self, authority_for_logging: &DnsNameAndPort, exists: bool) {
|
||||
trace!("no endpoints for {:?} that is known to {}", authority_for_logging,
|
||||
if exists { "exist" } else { "not exist" });
|
||||
trace!(
|
||||
"no endpoints for {:?} that is known to {}",
|
||||
authority_for_logging,
|
||||
if exists { "exist" } else { "not exist" }
|
||||
);
|
||||
match self.addrs.take() {
|
||||
Exists::Yes(mut cache) => {
|
||||
cache.clear(
|
||||
&mut |change| Self::on_change(&mut self.txs, authority_for_logging, change));
|
||||
cache.clear(&mut |change| {
|
||||
Self::on_change(&mut self.txs, authority_for_logging, change)
|
||||
});
|
||||
},
|
||||
Exists::Unknown | Exists::No => (),
|
||||
};
|
||||
|
@ -652,38 +479,33 @@ impl <T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> {
|
|||
};
|
||||
}
|
||||
|
||||
fn on_change(txs: &mut Vec<mpsc::UnboundedSender<Update>>,
|
||||
authority_for_logging: &DnsNameAndPort,
|
||||
change: CacheChange<SocketAddr, Metadata>) {
|
||||
fn on_change(
|
||||
txs: &mut Vec<mpsc::UnboundedSender<Update>>,
|
||||
authority_for_logging: &DnsNameAndPort,
|
||||
change: CacheChange<SocketAddr, Metadata>,
|
||||
) {
|
||||
let (update_str, update, addr) = match change {
|
||||
CacheChange::Insertion { key, value } =>
|
||||
("insert", Update::Insert(key, value.clone()), key),
|
||||
CacheChange::Removal { key } =>
|
||||
("remove", Update::Remove(key), key),
|
||||
CacheChange::Modification { key, new_value } =>
|
||||
("change metadata for", Update::ChangeMetadata(key, new_value.clone()), key),
|
||||
CacheChange::Insertion { key, value } => {
|
||||
("insert", Update::Insert(key, value.clone()), key)
|
||||
},
|
||||
CacheChange::Removal { key } => ("remove", Update::Remove(key), key),
|
||||
CacheChange::Modification { key, new_value } => (
|
||||
"change metadata for",
|
||||
Update::ChangeMetadata(key, new_value.clone()),
|
||||
key,
|
||||
),
|
||||
};
|
||||
trace!("{} {:?} for {:?}", update_str, addr, authority_for_logging);
|
||||
// retain is used to drop any senders that are dead
|
||||
txs.retain(|tx| {
|
||||
tx.unbounded_send(update.clone()).is_ok()
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl Metadata =====
|
||||
|
||||
impl Metadata {
|
||||
fn no_metadata() -> Self {
|
||||
Metadata {
|
||||
metric_labels: None,
|
||||
}
|
||||
txs.retain(|tx| tx.unbounded_send(update.clone()).is_ok());
|
||||
}
|
||||
}
|
||||
|
||||
/// Construct a new labeled `SocketAddr `from a protobuf `WeightedAddr`.
|
||||
fn pb_to_addr_meta(pb: WeightedAddr, set_labels: &HashMap<String, String>)
|
||||
-> Option<(SocketAddr, Metadata)> {
|
||||
fn pb_to_addr_meta(
|
||||
pb: WeightedAddr,
|
||||
set_labels: &HashMap<String, String>,
|
||||
) -> Option<(SocketAddr, Metadata)> {
|
||||
let addr = pb.addr.and_then(pb_to_sock_addr)?;
|
||||
let label_iter = set_labels.iter().chain(pb.metric_labels.iter());
|
||||
let meta = Metadata {
|
||||
|
@ -715,7 +537,7 @@ fn pb_to_sock_addr(pb: TcpAddress) -> Option<SocketAddr> {
|
|||
Some(Ip::Ipv4(octets)) => {
|
||||
let ipv4 = Ipv4Addr::from(octets);
|
||||
Some(SocketAddr::from((ipv4, pb.port as u16)))
|
||||
}
|
||||
},
|
||||
Some(Ip::Ipv6(v6)) => {
|
||||
let octets = [
|
||||
(v6.first >> 56) as u8,
|
||||
|
@ -737,7 +559,7 @@ fn pb_to_sock_addr(pb: TcpAddress) -> Option<SocketAddr> {
|
|||
];
|
||||
let ipv6 = Ipv6Addr::from(octets);
|
||||
Some(SocketAddr::from((ipv6, pb.port as u16)))
|
||||
}
|
||||
},
|
||||
None => None,
|
||||
},
|
||||
None => None,
|
|
@ -0,0 +1,57 @@
|
|||
use futures_watch;
|
||||
use std::{cmp, hash, net::SocketAddr};
|
||||
|
||||
use telemetry::metrics::DstLabels;
|
||||
|
||||
pub type DstLabelsWatch = futures_watch::Watch<Option<DstLabels>>;
|
||||
|
||||
/// An individual traffic target.
|
||||
///
|
||||
/// Equality, Ordering, and hashability is determined soley by the Endpoint's address.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Endpoint {
|
||||
address: SocketAddr,
|
||||
dst_labels: Option<DstLabelsWatch>,
|
||||
}
|
||||
|
||||
// ==== impl Endpoint =====
|
||||
|
||||
impl Endpoint {
|
||||
pub fn new(address: SocketAddr, dst_labels: DstLabelsWatch) -> Self {
|
||||
Self {
|
||||
address,
|
||||
dst_labels: Some(dst_labels),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn address(&self) -> SocketAddr {
|
||||
self.address
|
||||
}
|
||||
|
||||
pub fn dst_labels(&self) -> Option<&DstLabelsWatch> {
|
||||
self.dst_labels.as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<SocketAddr> for Endpoint {
|
||||
fn from(address: SocketAddr) -> Self {
|
||||
Self {
|
||||
address,
|
||||
dst_labels: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl hash::Hash for Endpoint {
|
||||
fn hash<H: hash::Hasher>(&self, state: &mut H) {
|
||||
self.address.hash(state)
|
||||
}
|
||||
}
|
||||
|
||||
impl cmp::PartialEq for Endpoint {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.address.eq(&other.address)
|
||||
}
|
||||
}
|
||||
|
||||
impl cmp::Eq for Endpoint {}
|
|
@ -0,0 +1,203 @@
|
|||
use std::collections::HashMap;
|
||||
use std::net::SocketAddr;
|
||||
|
||||
use futures::sync::mpsc;
|
||||
use futures::{Async, Poll, Stream};
|
||||
use futures_watch::{Store, Watch};
|
||||
use http;
|
||||
use tower_discover::{Change, Discover};
|
||||
use tower_service::Service;
|
||||
|
||||
use dns;
|
||||
use telemetry::metrics::DstLabels;
|
||||
use transport::DnsNameAndPort;
|
||||
|
||||
pub mod background;
|
||||
mod endpoint;
|
||||
|
||||
pub use self::endpoint::{DstLabelsWatch, Endpoint};
|
||||
|
||||
/// A handle to request resolutions from a `Background`.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Resolver {
|
||||
request_tx: mpsc::UnboundedSender<ResolveRequest>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct ResolveRequest {
|
||||
authority: DnsNameAndPort,
|
||||
update_tx: mpsc::UnboundedSender<Update>,
|
||||
}
|
||||
|
||||
/// A `tower_discover::Discover`, given to a `tower_balance::Balance`.
|
||||
#[derive(Debug)]
|
||||
pub struct Resolution<B> {
|
||||
update_rx: mpsc::UnboundedReceiver<Update>,
|
||||
/// Map associating addresses with the `Store` for the watch on that
|
||||
/// service's metric labels (as provided by the Destination service).
|
||||
///
|
||||
/// This is used to update the `Labeled` middleware on those services
|
||||
/// without requiring the service stack to be re-bound.
|
||||
metric_labels: HashMap<SocketAddr, Store<Option<DstLabels>>>,
|
||||
bind: B,
|
||||
}
|
||||
|
||||
/// .
|
||||
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
|
||||
struct Metadata {
|
||||
/// A set of Prometheus metric labels describing the destination.
|
||||
metric_labels: Option<DstLabels>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
enum Update {
|
||||
Insert(SocketAddr, Metadata),
|
||||
Remove(SocketAddr),
|
||||
ChangeMetadata(SocketAddr, Metadata),
|
||||
}
|
||||
|
||||
/// Bind a `SocketAddr` with a protocol.
|
||||
pub trait Bind {
|
||||
/// The type of endpoint upon which a `Service` is bound.
|
||||
type Endpoint;
|
||||
|
||||
/// Requests handled by the discovered services
|
||||
type Request;
|
||||
|
||||
/// Responses given by the discovered services
|
||||
type Response;
|
||||
|
||||
/// Errors produced by the discovered services
|
||||
type Error;
|
||||
|
||||
type BindError;
|
||||
|
||||
/// The discovered `Service` instance.
|
||||
type Service: Service<Request = Self::Request, Response = Self::Response, Error = Self::Error>;
|
||||
|
||||
/// Bind a service from an endpoint.
|
||||
fn bind(&self, addr: &Self::Endpoint) -> Result<Self::Service, Self::BindError>;
|
||||
}
|
||||
|
||||
/// Creates a "channel" of `Resolver` to `Background` handles.
|
||||
///
|
||||
/// The `Resolver` is used by a listener, the `Background` is consumed
|
||||
/// on the controller thread.
|
||||
pub fn new(
|
||||
dns_config: dns::Config,
|
||||
default_destination_namespace: String,
|
||||
) -> (Resolver, background::Config) {
|
||||
let (request_tx, rx) = mpsc::unbounded();
|
||||
let disco = Resolver { request_tx };
|
||||
let bg = background::Config::new(rx, dns_config, default_destination_namespace);
|
||||
(disco, bg)
|
||||
}
|
||||
|
||||
// ==== impl Resolver =====
|
||||
|
||||
impl Resolver {
|
||||
/// Start watching for address changes for a certain authority.
|
||||
pub fn resolve<B>(&self, authority: &DnsNameAndPort, bind: B) -> Resolution<B> {
|
||||
trace!("resolve; authority={:?}", authority);
|
||||
let (update_tx, update_rx) = mpsc::unbounded();
|
||||
let req = {
|
||||
let authority = authority.clone();
|
||||
ResolveRequest {
|
||||
authority,
|
||||
update_tx,
|
||||
}
|
||||
};
|
||||
self.request_tx
|
||||
.unbounded_send(req)
|
||||
.expect("unbounded can't fail");
|
||||
|
||||
Resolution {
|
||||
update_rx,
|
||||
metric_labels: HashMap::new(),
|
||||
bind,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ==== impl Resolution =====
|
||||
|
||||
impl<B> Resolution<B> {
|
||||
fn update_metadata(&mut self, addr: SocketAddr, meta: Metadata) -> Result<(), ()> {
|
||||
if let Some(store) = self.metric_labels.get_mut(&addr) {
|
||||
store
|
||||
.store(meta.metric_labels)
|
||||
.map_err(|e| {
|
||||
error!("update_metadata: label store error: {:?}", e);
|
||||
})
|
||||
.map(|_| ())
|
||||
} else {
|
||||
// The store has already been removed, so nobody cares about
|
||||
// the metadata change. We expect that this shouldn't happen,
|
||||
// but if it does, log a warning and handle it gracefully.
|
||||
warn!(
|
||||
"update_metadata: ignoring ChangeMetadata for {:?} because the service no longer \
|
||||
exists.",
|
||||
addr
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<B, A> Discover for Resolution<B>
|
||||
where
|
||||
B: Bind<Endpoint = Endpoint, Request = http::Request<A>>,
|
||||
{
|
||||
type Key = SocketAddr;
|
||||
type Request = B::Request;
|
||||
type Response = B::Response;
|
||||
type Error = B::Error;
|
||||
type Service = B::Service;
|
||||
type DiscoverError = ();
|
||||
|
||||
fn poll(&mut self) -> Poll<Change<Self::Key, Self::Service>, Self::DiscoverError> {
|
||||
loop {
|
||||
let up = self.update_rx.poll();
|
||||
trace!("watch: {:?}", up);
|
||||
let update = try_ready!(up).expect("destination stream must be infinite");
|
||||
|
||||
match update {
|
||||
Update::Insert(addr, meta) => {
|
||||
// Construct a watch for the `Labeled` middleware that will
|
||||
// wrap the bound service, and insert the store into our map
|
||||
// so it can be updated later.
|
||||
let (labels_watch, labels_store) = Watch::new(meta.metric_labels);
|
||||
self.metric_labels.insert(addr, labels_store);
|
||||
|
||||
let endpoint = Endpoint::new(addr, labels_watch.clone());
|
||||
|
||||
let service = self.bind.bind(&endpoint).map_err(|_| ())?;
|
||||
|
||||
return Ok(Async::Ready(Change::Insert(addr, service)));
|
||||
},
|
||||
Update::ChangeMetadata(addr, meta) => {
|
||||
// Update metadata and continue polling `rx`.
|
||||
self.update_metadata(addr, meta)?;
|
||||
},
|
||||
Update::Remove(addr) => {
|
||||
// It's safe to drop the store handle here, even if
|
||||
// the `Labeled` middleware using the watch handle
|
||||
// still exists --- it will simply read the final
|
||||
// value from the watch.
|
||||
self.metric_labels.remove(&addr);
|
||||
return Ok(Async::Ready(Change::Remove(addr)));
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl Metadata =====
|
||||
|
||||
impl Metadata {
|
||||
fn no_metadata() -> Self {
|
||||
Metadata {
|
||||
metric_labels: None,
|
||||
}
|
||||
}
|
||||
}
|
|
@ -21,28 +21,28 @@ use transport::{DnsNameAndPort, HostAndPort, LookupAddressAndConnect};
|
|||
use timeout::{Timeout, TimeoutError};
|
||||
|
||||
mod cache;
|
||||
pub mod discovery;
|
||||
pub mod destination;
|
||||
mod fully_qualified_authority;
|
||||
mod observe;
|
||||
pub mod pb;
|
||||
mod remote_stream;
|
||||
|
||||
use self::discovery::{Background as DiscoBg, Discovery, Watch};
|
||||
pub use self::discovery::Bind;
|
||||
use self::destination::{Resolver, Resolution};
|
||||
pub use self::destination::Bind;
|
||||
pub use self::observe::Observe;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Control {
|
||||
disco: Discovery,
|
||||
disco: Resolver,
|
||||
}
|
||||
|
||||
pub struct Background {
|
||||
disco: DiscoBg,
|
||||
disco: destination::background::Config,
|
||||
}
|
||||
|
||||
pub fn new(dns_config: dns::Config, default_destination_namespace: String) -> (Control, Background)
|
||||
{
|
||||
let (tx, rx) = self::discovery::new(dns_config, default_destination_namespace);
|
||||
let (tx, rx) = self::destination::new(dns_config, default_destination_namespace);
|
||||
|
||||
let c = Control {
|
||||
disco: tx,
|
||||
|
@ -58,7 +58,7 @@ pub fn new(dns_config: dns::Config, default_destination_namespace: String) -> (C
|
|||
// ===== impl Control =====
|
||||
|
||||
impl Control {
|
||||
pub fn resolve<B>(&self, auth: &DnsNameAndPort, bind: B) -> Watch<B> {
|
||||
pub fn resolve<B>(&self, auth: &DnsNameAndPort, bind: B) -> Resolution<B> {
|
||||
self.disco.resolve(auth, bind)
|
||||
}
|
||||
}
|
||||
|
@ -97,7 +97,7 @@ impl Background {
|
|||
AddOrigin::new(scheme, authority, backoff)
|
||||
};
|
||||
|
||||
let mut disco = self.disco.work(executor);
|
||||
let mut disco = self.disco.process(executor);
|
||||
|
||||
let fut = future::poll_fn(move || {
|
||||
disco.poll_rpc(&mut client);
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use http;
|
||||
use std::sync::Arc;
|
||||
|
||||
use control::discovery::DstLabelsWatch;
|
||||
use control::destination::DstLabelsWatch;
|
||||
use ctx;
|
||||
|
||||
|
||||
|
|
|
@ -2,7 +2,7 @@ use std::{cmp, hash};
|
|||
use std::net::{IpAddr, SocketAddr};
|
||||
use std::sync::Arc;
|
||||
|
||||
use control::discovery::DstLabelsWatch;
|
||||
use control::destination::DstLabelsWatch;
|
||||
use ctx;
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
|
||||
|
|
|
@ -14,8 +14,8 @@ use tower_h2;
|
|||
use conduit_proxy_router::Recognize;
|
||||
|
||||
use bind::{self, Bind, Protocol};
|
||||
use control::{self, discovery};
|
||||
use control::discovery::Bind as BindTrait;
|
||||
use control;
|
||||
use control::destination::{Bind as BindTrait, Resolution};
|
||||
use ctx;
|
||||
use timeout::Timeout;
|
||||
use transparency::h1;
|
||||
|
@ -170,7 +170,7 @@ where
|
|||
}
|
||||
|
||||
pub enum Discovery<B> {
|
||||
NamedSvc(discovery::Watch<BindProtocol<B>>),
|
||||
NamedSvc(Resolution<BindProtocol<B>>),
|
||||
ImplicitOriginalDst(Option<(SocketAddr, BindProtocol<B>)>),
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue