From a615834f7b72d234e2c09f437e1e045ba20928ff Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 1 Aug 2018 14:14:34 -0700 Subject: [PATCH] Refactor `control::destination::background` into smaller modules (#34) This branch is purely a refactor and should result in no functional changes. The `control::destination::background` module has become quite large, making the code difficult to read and review changes to. This branch separates out the `DestinationSet` type and the destination service client code into their own modules inside of `background`. Furthermore, it rolls the `control::utils` module into the `client` submodule of `background`, as that code is only used in the `client` module. I think there's some additional work that can be done to make this code clearer beyond simply splitting apart some of these large files, and I intend to do some refactoring in additional follow-up branches. Signed-off-by: Eliza Weisman --- src/control/destination/background.rs | 742 ------------------ .../background/client.rs} | 233 ++++-- .../destination/background/destination_set.rs | 352 +++++++++ src/control/destination/background/mod.rs | 336 ++++++++ src/control/mod.rs | 1 - 5 files changed, 852 insertions(+), 812 deletions(-) delete mode 100644 src/control/destination/background.rs rename src/control/{util.rs => destination/background/client.rs} (65%) create mode 100644 src/control/destination/background/destination_set.rs create mode 100644 src/control/destination/background/mod.rs diff --git a/src/control/destination/background.rs b/src/control/destination/background.rs deleted file mode 100644 index e4daee480..000000000 --- a/src/control/destination/background.rs +++ /dev/null @@ -1,742 +0,0 @@ -use std::collections::{ - hash_map::{Entry, HashMap}, - VecDeque, -}; -use std::fmt; -use std::iter::IntoIterator; -use std::net::SocketAddr; -use std::time::{Instant, Duration}; - -use bytes::Bytes; -use futures::{ - future, - sync::mpsc, - Async, Future, Poll, Stream, -}; -use h2; -use http; -use tower_grpc as grpc; -use tower_h2::{self, BoxBody, HttpService, RecvBody}; -use tower_reconnect::Reconnect; - -use linkerd2_proxy_api::destination::client::Destination; -use linkerd2_proxy_api::destination::update::Update as PbUpdate2; -use linkerd2_proxy_api::destination::{ - GetDestination, - Update as PbUpdate, - WeightedAddr, -}; -use linkerd2_proxy_api::net::TcpAddress; - -use super::{Metadata, ResolveRequest, Responder, Update}; -use config::Namespaces; -use control::{ - cache::{Cache, CacheChange, Exists}, - fully_qualified_authority::FullyQualifiedAuthority, - remote_stream::{Receiver, Remote}, - util::{AddOrigin, Backoff, LogErrors}, -}; -use dns::{self, IpAddrListFuture}; -use telemetry::metrics::DstLabels; -use transport::{DnsNameAndPort, HostAndPort, LookupAddressAndConnect}; -use timeout::Timeout; -use transport::tls; -use conditional::Conditional; -use watch_service::{Rebind, WatchService}; -use futures_watch::Watch; - -type DestinationServiceQuery = Remote; -type UpdateRx = Receiver; - -/// Type of the client service stack used to make destination requests. -type ClientService = AddOrigin, - ::logging::ContextualExecutor< - ::logging::Client< - &'static str, - HostAndPort - > - >, - BoxBody, - > - >>>>; - -/// Satisfies resolutions as requested via `request_rx`. -/// -/// As the `Background` is polled with a client to Destination service, if the client to the -/// service is healthy, it reads requests from `request_rx`, determines how to resolve the -/// provided authority to a set of addresses, and ensures that resolution updates are -/// propagated to all requesters. -struct Background> { - dns_resolver: dns::Resolver, - namespaces: Namespaces, - destinations: HashMap>, - /// A queue of authorities that need to be reconnected. - reconnects: VecDeque, - /// The Destination.Get RPC client service. - /// Each poll, records whether the rpc service was till ready. - rpc_ready: bool, - /// A receiver of new watch requests. - request_rx: mpsc::UnboundedReceiver, -} - -/// Holds the state of a single resolution. -struct DestinationSet> { - addrs: Exists>, - query: Option>, - dns_query: Option, - responders: Vec, -} - -/// The state needed to bind a new controller client stack. -struct BindClient { - backoff_delay: Duration, - identity: Conditional, - host_and_port: HostAndPort, - dns_resolver: dns::Resolver, - log_ctx: ::logging::Client<&'static str, HostAndPort>, -} - -/// Returns a new discovery background task. -pub(super) fn task( - request_rx: mpsc::UnboundedReceiver, - dns_resolver: dns::Resolver, - namespaces: Namespaces, - host_and_port: Option, - controller_tls: tls::ConditionalConnectionConfig, - control_backoff_delay: Duration, -) -> impl Future -{ - // Build up the Controller Client Stack - let mut client = host_and_port.map(|host_and_port| { - let (identity, watch) = match controller_tls { - Conditional::Some(cfg) => - (Conditional::Some(cfg.server_identity), cfg.config), - Conditional::None(reason) => { - // If there's no connection config, then construct a new - // `Watch` that never updates to construct the `WatchService`. - // We do this here rather than calling `ClientConfig::no_tls` - // in order to propagate the reason for no TLS to the watch. - let (watch, _) = Watch::new(Conditional::None(reason)); - (Conditional::None(reason), watch) - }, - }; - let bind_client = BindClient::new( - identity, - &dns_resolver, - host_and_port, - control_backoff_delay, - ); - WatchService::new(watch, bind_client) - }); - - let mut disco = Background::new( - request_rx, - dns_resolver, - namespaces, - ); - - future::poll_fn(move || { - disco.poll_rpc(&mut client) - }) -} - -// ==== impl Background ===== - -impl Background -where - T: HttpService, - T::Error: fmt::Debug, -{ - fn new( - request_rx: mpsc::UnboundedReceiver, - dns_resolver: dns::Resolver, - namespaces: Namespaces, - ) -> Self { - Self { - dns_resolver, - namespaces, - destinations: HashMap::new(), - reconnects: VecDeque::new(), - rpc_ready: false, - request_rx, - } - } - - fn poll_rpc(&mut self, client: &mut Option) -> Poll<(), ()> { - // This loop is make sure any streams that were found disconnected - // in `poll_destinations` while the `rpc` service is ready should - // be reconnected now, otherwise the task would just sleep... - loop { - if let Async::Ready(()) = self.poll_resolve_requests(client) { - // request_rx has closed, meaning the main thread is terminating. - return Ok(Async::Ready(())); - } - self.retain_active_destinations(); - self.poll_destinations(); - - if self.reconnects.is_empty() || !self.rpc_ready { - return Ok(Async::NotReady); - } - } - } - - fn poll_resolve_requests(&mut self, client: &mut Option) -> Async<()> { - loop { - if let Some(client) = client { - // if rpc service isn't ready, not much we can do... - match client.poll_ready() { - Ok(Async::Ready(())) => { - self.rpc_ready = true; - }, - Ok(Async::NotReady) => { - self.rpc_ready = false; - return Async::NotReady; - }, - Err(err) => { - warn!("Destination.Get poll_ready error: {:?}", err); - self.rpc_ready = false; - return Async::NotReady; - }, - } - - // handle any pending reconnects first - if self.poll_reconnect(client) { - continue; - } - } - - // check for any new watches - match self.request_rx.poll() { - Ok(Async::Ready(Some(resolve))) => { - trace!("Destination.Get {:?}", resolve.authority); - match self.destinations.entry(resolve.authority) { - Entry::Occupied(mut occ) => { - let set = occ.get_mut(); - // we may already know of some addresses here, so push - // them onto the new watch first - match set.addrs { - Exists::Yes(ref cache) => for (&addr, meta) in cache { - let update = Update::Bind(addr, meta.clone()); - resolve.responder.update_tx - .unbounded_send(update) - .expect("unbounded_send does not fail"); - }, - Exists::No | Exists::Unknown => (), - } - set.responders.push(resolve.responder); - }, - Entry::Vacant(vac) => { - let pod_namespace = &self.namespaces.pod; - let query = client.as_mut().and_then(|client| { - Self::query_destination_service_if_relevant( - pod_namespace, - client, - vac.key(), - "connect", - ) - }); - let mut set = DestinationSet { - addrs: Exists::Unknown, - query, - dns_query: None, - responders: vec![resolve.responder], - }; - // If the authority is one for which the Destination service is never - // relevant (e.g. an absolute name that doesn't end in ".svc.$zone." in - // Kubernetes), or if we don't have a `client`, then immediately start - // polling DNS. - if set.query.is_none() { - set.reset_dns_query( - &self.dns_resolver, - Instant::now(), - vac.key(), - ); - } - vac.insert(set); - }, - } - }, - Ok(Async::Ready(None)) => { - trace!("Discover tx is dropped, shutdown"); - return Async::Ready(()); - }, - Ok(Async::NotReady) => return Async::NotReady, - Err(_) => unreachable!("unbounded receiver doesn't error"), - } - } - } - - /// Tries to reconnect next watch stream. Returns true if reconnection started. - fn poll_reconnect(&mut self, client: &mut T) -> bool { - debug_assert!(self.rpc_ready); - - while let Some(auth) = self.reconnects.pop_front() { - if let Some(set) = self.destinations.get_mut(&auth) { - set.query = Self::query_destination_service_if_relevant( - &self.namespaces.pod, - client, - &auth, - "reconnect", - ); - return true; - } else { - trace!("reconnect no longer needed: {:?}", auth); - } - } - false - } - - /// Ensures that `destinations` is updated to only maintain active resolutions. - /// - /// If there are no active resolutions for a destination, the destination is removed. - fn retain_active_destinations(&mut self) { - self.destinations.retain(|_, ref mut dst| { - dst.responders.retain(|r| r.is_active()); - dst.responders.len() > 0 - }) - } - - fn poll_destinations(&mut self) { - for (auth, set) in &mut self.destinations { - // Query the Destination service first. - let (new_query, found_by_destination_service) = match set.query.take() { - Some(Remote::ConnectedOrConnecting { rx }) => { - let (new_query, found_by_destination_service) = - set.poll_destination_service( - auth, rx, self.namespaces.tls_controller.as_ref().map(|s| s.as_ref())); - if let Remote::NeedsReconnect = new_query { - set.reset_on_next_modification(); - self.reconnects.push_back(auth.clone()); - } - (Some(new_query), found_by_destination_service) - }, - query => (query, Exists::Unknown), - }; - set.query = new_query; - - // Any active response from the Destination service cancels the DNS query except for a - // positive assertion that the service doesn't exist. - // - // Any disconnection from the Destination service has no effect on the DNS query; we - // assume that if we were querying DNS before, we should continue to do so, and if we - // weren't querying DNS then we shouldn't start now. In particular, temporary - // disruptions of connectivity to the Destination service do not cause a fallback to - // DNS. - match found_by_destination_service { - Exists::Yes(()) => { - // Stop polling DNS on any active update from the Destination service. - set.dns_query = None; - }, - Exists::No => { - // Fall back to DNS. - set.reset_dns_query(&self.dns_resolver, Instant::now(), auth); - }, - Exists::Unknown => (), // No change from Destination service's perspective. - } - - // Poll DNS after polling the Destination service. This may reset the DNS query but it - // won't affect the Destination Service query. - set.poll_dns(&self.dns_resolver, auth); - } - } - - /// Initiates a query `query` to the Destination service and returns it as - /// `Some(query)` if the given authority's host is of a form suitable for using to - /// query the Destination service. Otherwise, returns `None`. - fn query_destination_service_if_relevant( - default_destination_namespace: &str, - client: &mut T, - auth: &DnsNameAndPort, - connect_or_reconnect: &str, - ) -> Option> { - trace!( - "DestinationServiceQuery {} {:?}", - connect_or_reconnect, - auth - ); - FullyQualifiedAuthority::normalize(auth, default_destination_namespace).map(|auth| { - let req = GetDestination { - scheme: "k8s".into(), - path: auth.without_trailing_dot().to_owned(), - }; - let mut svc = Destination::new(client.lift_ref()); - let response = svc.get(grpc::Request::new(req)); - Remote::ConnectedOrConnecting { - rx: Receiver::new(response), - } - }) - } -} - -// ===== impl DestinationSet ===== - -impl DestinationSet -where - T: HttpService, - T::Error: fmt::Debug, -{ - fn reset_dns_query( - &mut self, - dns_resolver: &dns::Resolver, - deadline: Instant, - authority: &DnsNameAndPort, - ) { - trace!( - "resetting DNS query for {} at {:?}", - authority.host, - deadline - ); - self.reset_on_next_modification(); - self.dns_query = Some(dns_resolver.resolve_all_ips(deadline, &authority.host)); - } - - // Processes Destination service updates from `request_rx`, returning the new query - // and an indication of any *change* to whether the service exists as far as the - // Destination service is concerned, where `Exists::Unknown` is to be interpreted as - // "no change in existence" instead of "unknown". - fn poll_destination_service( - &mut self, - auth: &DnsNameAndPort, - mut rx: UpdateRx, - tls_controller_namespace: Option<&str>, - ) -> (DestinationServiceQuery, Exists<()>) { - let mut exists = Exists::Unknown; - - loop { - match rx.poll() { - Ok(Async::Ready(Some(update))) => match update.update { - Some(PbUpdate2::Add(a_set)) => { - let set_labels = a_set.metric_labels; - let addrs = a_set - .addrs - .into_iter() - .filter_map(|pb| - pb_to_addr_meta(pb, &set_labels, tls_controller_namespace)); - self.add(auth, addrs) - }, - Some(PbUpdate2::Remove(r_set)) => { - exists = Exists::Yes(()); - self.remove( - auth, - r_set - .addrs - .iter() - .filter_map(|addr| pb_to_sock_addr(addr.clone())), - ); - }, - Some(PbUpdate2::NoEndpoints(ref no_endpoints)) if no_endpoints.exists => { - exists = Exists::Yes(()); - self.no_endpoints(auth, no_endpoints.exists); - }, - Some(PbUpdate2::NoEndpoints(no_endpoints)) => { - debug_assert!(!no_endpoints.exists); - exists = Exists::No; - }, - None => (), - }, - Ok(Async::Ready(None)) => { - trace!( - "Destination.Get stream ended for {:?}, must reconnect", - auth - ); - return (Remote::NeedsReconnect, exists); - }, - Ok(Async::NotReady) => { - return (Remote::ConnectedOrConnecting { rx }, exists); - }, - Err(err) => { - warn!("Destination.Get stream errored for {:?}: {:?}", auth, err); - return (Remote::NeedsReconnect, exists); - }, - }; - } - } - - fn poll_dns(&mut self, dns_resolver: &dns::Resolver, authority: &DnsNameAndPort) { - // Duration to wait before polling DNS again after an error - // (or a NXDOMAIN response with no TTL). - const DNS_ERROR_TTL: Duration = Duration::from_secs(5); - - trace!("checking DNS for {:?}", authority); - while let Some(mut query) = self.dns_query.take() { - trace!("polling DNS for {:?}", authority); - let deadline = match query.poll() { - Ok(Async::NotReady) => { - trace!("DNS query not ready {:?}", authority); - self.dns_query = Some(query); - return; - }, - Ok(Async::Ready(dns::Response::Exists(ips))) => { - trace!( - "positive result of DNS query for {:?}: {:?}", - authority, - ips - ); - self.add( - authority, - ips.iter().map(|ip| { - ( - SocketAddr::from((ip, authority.port)), - Metadata::no_metadata(), - ) - }), - ); - - // Poll again after the deadline on the DNS response. - ips.valid_until() - }, - Ok(Async::Ready(dns::Response::DoesNotExist { retry_after })) => { - trace!( - "negative result (NXDOMAIN) of DNS query for {:?}", - authority - ); - self.no_endpoints(authority, false); - // Poll again after the deadline on the DNS response, if - // there is one. - retry_after.unwrap_or_else(|| Instant::now() + DNS_ERROR_TTL) - }, - Err(e) => { - // Do nothing so that the most recent non-error response is used until a - // non-error response is received - trace!("DNS resolution failed for {}: {}", &authority.host, e); - - // Poll again after the default wait time. - Instant::now() + DNS_ERROR_TTL - }, - }; - self.reset_dns_query(dns_resolver, deadline, &authority) - } - } -} - -impl> DestinationSet { - fn reset_on_next_modification(&mut self) { - match self.addrs { - Exists::Yes(ref mut cache) => { - cache.set_reset_on_next_modification(); - }, - Exists::No | Exists::Unknown => (), - } - } - - fn add(&mut self, authority_for_logging: &DnsNameAndPort, addrs_to_add: A) - where - A: Iterator, - { - let mut cache = match self.addrs.take() { - Exists::Yes(mut cache) => cache, - Exists::Unknown | Exists::No => Cache::new(), - }; - cache.update_union(addrs_to_add, &mut |change| { - Self::on_change(&mut self.responders, authority_for_logging, change) - }); - self.addrs = Exists::Yes(cache); - } - - fn remove(&mut self, authority_for_logging: &DnsNameAndPort, addrs_to_remove: A) - where - A: Iterator, - { - let cache = match self.addrs.take() { - Exists::Yes(mut cache) => { - cache.remove(addrs_to_remove, &mut |change| { - Self::on_change(&mut self.responders, authority_for_logging, change) - }); - cache - }, - Exists::Unknown | Exists::No => Cache::new(), - }; - self.addrs = Exists::Yes(cache); - } - - fn no_endpoints(&mut self, authority_for_logging: &DnsNameAndPort, exists: bool) { - trace!( - "no endpoints for {:?} that is known to {}", - authority_for_logging, - if exists { "exist" } else { "not exist" } - ); - match self.addrs.take() { - Exists::Yes(mut cache) => { - cache.clear(&mut |change| { - Self::on_change(&mut self.responders, authority_for_logging, change) - }); - }, - Exists::Unknown | Exists::No => (), - }; - self.addrs = if exists { - Exists::Yes(Cache::new()) - } else { - Exists::No - }; - } - - fn on_change( - responders: &mut Vec, - authority_for_logging: &DnsNameAndPort, - change: CacheChange, - ) { - let (update_str, update, addr) = match change { - CacheChange::Insertion { key, value } => { - ("insert", Update::Bind(key, value.clone()), key) - }, - CacheChange::Removal { key } => ("remove", Update::Remove(key), key), - CacheChange::Modification { key, new_value } => ( - "change metadata for", - Update::Bind(key, new_value.clone()), - key, - ), - }; - trace!("{} {:?} for {:?}", update_str, addr, authority_for_logging); - // retain is used to drop any senders that are dead - responders.retain(|r| { - let sent = r.update_tx.unbounded_send(update.clone()); - sent.is_ok() - }); - } -} - -// ===== impl BindClient ===== - -impl BindClient { - fn new( - identity: Conditional, - dns_resolver: &dns::Resolver, - host_and_port: HostAndPort, - backoff_delay: Duration, - ) -> Self { - let log_ctx = ::logging::admin().client("control", host_and_port.clone()); - Self { - backoff_delay, - identity, - dns_resolver: dns_resolver.clone(), - host_and_port, - log_ctx, - } - } -} -impl Rebind for BindClient { - type Service = ClientService; - fn rebind( - &mut self, - client_cfg: &tls::ConditionalClientConfig, - ) -> Self::Service { - let conn_cfg = match (&self.identity, client_cfg) { - (Conditional::Some(ref id), Conditional::Some(ref cfg)) => - Conditional::Some(tls::ConnectionConfig { - server_identity: id.clone(), - config: cfg.clone(), - }), - (Conditional::None(ref reason), _) | - (_, Conditional::None(ref reason)) => - Conditional::None(reason.clone()), - }; - let scheme = http::uri::Scheme::from_shared(Bytes::from_static(b"http")).unwrap(); - let authority = http::uri::Authority::from(&self.host_and_port); - let connect = Timeout::new( - LookupAddressAndConnect::new(self.host_and_port.clone(), - self.dns_resolver.clone(), - conn_cfg), - Duration::from_secs(3), - ); - let h2_client = tower_h2::client::Connect::new( - connect, - h2::client::Builder::default(), - self.log_ctx.clone().executor() - ); - - let reconnect = Reconnect::new(h2_client); - let log_errors = LogErrors::new(reconnect); - let backoff = Backoff::new(log_errors, self.backoff_delay); - // TODO: Use AddOrigin in tower-http - AddOrigin::new(scheme, authority, backoff) - } - -} - -/// Construct a new labeled `SocketAddr `from a protobuf `WeightedAddr`. -fn pb_to_addr_meta( - pb: WeightedAddr, - set_labels: &HashMap, - tls_controller_namespace: Option<&str>, -) -> Option<(SocketAddr, Metadata)> { - let addr = pb.addr.and_then(pb_to_sock_addr)?; - - let mut labels = set_labels.iter() - .chain(pb.metric_labels.iter()) - .collect::>(); - labels.sort_by(|(k0, _), (k1, _)| k0.cmp(k1)); - - let mut tls_identity = - Conditional::None(tls::ReasonForNoIdentity::NotProvidedByServiceDiscovery); - if let Some(pb) = pb.tls_identity { - match tls::Identity::maybe_from_protobuf(tls_controller_namespace, pb) { - Ok(Some(identity)) => { - tls_identity = Conditional::Some(identity); - }, - Ok(None) => (), - Err(e) => { - error!("Failed to parse TLS identity: {:?}", e); - // XXX: Wallpaper over the error and keep going without TLS. - // TODO: Hard fail here once the TLS infrastructure has been - // validated. - }, - } - }; - - let meta = Metadata::new(DstLabels::new(labels.into_iter()), tls_identity); - Some((addr, meta)) -} - -fn pb_to_sock_addr(pb: TcpAddress) -> Option { - use linkerd2_proxy_api::net::ip_address::Ip; - use std::net::{Ipv4Addr, Ipv6Addr}; - /* - current structure is: - TcpAddress { - ip: Option, - }>, - port: u32, - } - */ - match pb.ip { - Some(ip) => match ip.ip { - Some(Ip::Ipv4(octets)) => { - let ipv4 = Ipv4Addr::from(octets); - Some(SocketAddr::from((ipv4, pb.port as u16))) - }, - Some(Ip::Ipv6(v6)) => { - let octets = [ - (v6.first >> 56) as u8, - (v6.first >> 48) as u8, - (v6.first >> 40) as u8, - (v6.first >> 32) as u8, - (v6.first >> 24) as u8, - (v6.first >> 16) as u8, - (v6.first >> 8) as u8, - v6.first as u8, - (v6.last >> 56) as u8, - (v6.last >> 48) as u8, - (v6.last >> 40) as u8, - (v6.last >> 32) as u8, - (v6.last >> 24) as u8, - (v6.last >> 16) as u8, - (v6.last >> 8) as u8, - v6.last as u8, - ]; - let ipv6 = Ipv6Addr::from(octets); - Some(SocketAddr::from((ipv6, pb.port as u16))) - }, - None => None, - }, - None => None, - } -} diff --git a/src/control/util.rs b/src/control/destination/background/client.rs similarity index 65% rename from src/control/util.rs rename to src/control/destination/background/client.rs index c731ead2d..6b54dcd16 100644 --- a/src/control/util.rs +++ b/src/control/destination/background/client.rs @@ -1,16 +1,46 @@ +use std::{ + fmt, + io, + time::{Duration, Instant}, +}; + +use bytes::Bytes; use futures::{Async, Future, Poll}; +use h2; use http; -use std::fmt; -use std::io; -use std::time::{Duration, Instant}; use tokio::timer::Delay; + +use tower_h2::{self, BoxBody}; use tower_service::Service; -use tower_h2; -use tower_reconnect::{Error as ReconnectError}; +use tower_reconnect::{Reconnect, Error as ReconnectError}; +use conditional::Conditional; +use dns; +use timeout::{Timeout, TimeoutError}; +use transport::{tls, HostAndPort, LookupAddressAndConnect}; +use watch_service::Rebind; -use timeout::TimeoutError; +/// Type of the client service stack used to make destination requests. +pub(super) type ClientService = AddOrigin, + ::logging::ContextualExecutor< + ::logging::Client< + &'static str, + HostAndPort + > + >, + BoxBody, + > + >>>>; -// ===== Backoff ===== +/// The state needed to bind a new controller client stack. +pub(super) struct BindClient { + backoff_delay: Duration, + identity: Conditional, + host_and_port: HostAndPort, + dns_resolver: dns::Resolver, + log_ctx: ::logging::Client<&'static str, HostAndPort>, +} /// Wait a duration if inner `poll_ready` returns an error. //TODO: move to tower-backoff @@ -21,6 +51,96 @@ pub(super) struct Backoff { wait_dur: Duration, } + +/// Wraps an HTTP service, injecting authority and scheme on every request. +pub(super) struct AddOrigin { + authority: http::uri::Authority, + inner: S, + scheme: http::uri::Scheme, +} + +/// Log errors talking to the controller in human format. +pub(super) struct LogErrors { + inner: S, +} + +// We want some friendly logs, but the stack of services don't have fmt::Display +// errors, so we have to build that ourselves. For now, this hard codes the +// expected error stack, and so any new middleware added will need to adjust this. +// +// The dead_code allowance is because rustc is being stupid and doesn't see it +// is used down below. +#[allow(dead_code)] +type LogError = ReconnectError< + tower_h2::client::Error, + tower_h2::client::ConnectError< + TimeoutError< + io::Error + > + > +>; + +// ===== impl BindClient ===== + +impl BindClient { + pub(super) fn new( + identity: Conditional, + dns_resolver: &dns::Resolver, + host_and_port: HostAndPort, + backoff_delay: Duration, + ) -> Self { + let log_ctx = ::logging::admin().client("control", host_and_port.clone()); + Self { + backoff_delay, + identity, + dns_resolver: dns_resolver.clone(), + host_and_port, + log_ctx, + } + } +} + +impl Rebind for BindClient { + type Service = ClientService; + fn rebind( + &mut self, + client_cfg: &tls::ConditionalClientConfig, + ) -> Self::Service { + let conn_cfg = match (&self.identity, client_cfg) { + (Conditional::Some(ref id), Conditional::Some(ref cfg)) => + Conditional::Some(tls::ConnectionConfig { + server_identity: id.clone(), + config: cfg.clone(), + }), + (Conditional::None(ref reason), _) | + (_, Conditional::None(ref reason)) => + Conditional::None(reason.clone()), + }; + let scheme = http::uri::Scheme::from_shared(Bytes::from_static(b"http")).unwrap(); + let authority = http::uri::Authority::from(&self.host_and_port); + let connect = Timeout::new( + LookupAddressAndConnect::new(self.host_and_port.clone(), + self.dns_resolver.clone(), + conn_cfg), + Duration::from_secs(3), + ); + let h2_client = tower_h2::client::Connect::new( + connect, + h2::client::Builder::default(), + self.log_ctx.clone().executor() + ); + + let reconnect = Reconnect::new(h2_client); + let log_errors = LogErrors::new(reconnect); + let backoff = Backoff::new(log_errors, self.backoff_delay); + // TODO: Use AddOrigin in tower-http + AddOrigin::new(scheme, authority, backoff) + } + +} + +// ===== impl Backoff ===== + impl Backoff where S: Service, @@ -80,69 +200,8 @@ where } } -/// Wraps an HTTP service, injecting authority and scheme on every request. -pub(super) struct AddOrigin { - authority: http::uri::Authority, - inner: S, - scheme: http::uri::Scheme, -} -impl AddOrigin { - pub(super) fn new(scheme: http::uri::Scheme, auth: http::uri::Authority, service: S) -> Self { - AddOrigin { - authority: auth, - inner: service, - scheme, - } - } -} - -impl Service for AddOrigin -where - S: Service>, -{ - type Request = http::Request; - type Response = S::Response; - type Error = S::Error; - type Future = S::Future; - - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.inner.poll_ready() - } - - fn call(&mut self, req: Self::Request) -> Self::Future { - let (mut head, body) = req.into_parts(); - let mut uri: http::uri::Parts = head.uri.into(); - uri.scheme = Some(self.scheme.clone()); - uri.authority = Some(self.authority.clone()); - head.uri = http::Uri::from_parts(uri).expect("valid uri"); - - self.inner.call(http::Request::from_parts(head, body)) - } -} - -// ===== impl LogErrors - -/// Log errors talking to the controller in human format. -pub(super) struct LogErrors { - inner: S, -} - -// We want some friendly logs, but the stack of services don't have fmt::Display -// errors, so we have to build that ourselves. For now, this hard codes the -// expected error stack, and so any new middleware added will need to adjust this. -// -// The dead_code allowance is because rustc is being stupid and doesn't see it -// is used down below. -#[allow(dead_code)] -type LogError = ReconnectError< - tower_h2::client::Error, - tower_h2::client::ConnectError< - TimeoutError< - io::Error - > - > ->; +// ===== impl LogErrors ===== impl LogErrors where @@ -197,6 +256,42 @@ impl<'a> fmt::Display for HumanError<'a> { } } +// ===== impl AddOrigin ===== + +impl AddOrigin { + pub(super) fn new(scheme: http::uri::Scheme, auth: http::uri::Authority, service: S) -> Self { + AddOrigin { + authority: auth, + inner: service, + scheme, + } + } +} + +impl Service for AddOrigin +where + S: Service>, +{ + type Request = http::Request; + type Response = S::Response; + type Error = S::Error; + type Future = S::Future; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + self.inner.poll_ready() + } + + fn call(&mut self, req: Self::Request) -> Self::Future { + let (mut head, body) = req.into_parts(); + let mut uri: http::uri::Parts = head.uri.into(); + uri.scheme = Some(self.scheme.clone()); + uri.authority = Some(self.authority.clone()); + head.uri = http::Uri::from_parts(uri).expect("valid uri"); + + self.inner.call(http::Request::from_parts(head, body)) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/control/destination/background/destination_set.rs b/src/control/destination/background/destination_set.rs new file mode 100644 index 000000000..86eef319b --- /dev/null +++ b/src/control/destination/background/destination_set.rs @@ -0,0 +1,352 @@ +use std::{ + collections::HashMap, + fmt, + iter::IntoIterator, + net::SocketAddr, + time::{Instant, Duration}, +}; + +use futures::{Async, Future, Stream,}; +use tower_h2::{BoxBody, HttpService, RecvBody}; + +use linkerd2_proxy_api::{ + destination::{ + update::Update as PbUpdate2, + WeightedAddr, + }, + net::TcpAddress, +}; + +use super::super::{Metadata, Responder, Update}; +use control::{ + cache::{Cache, CacheChange, Exists}, + remote_stream::Remote, +}; +use dns::{self, IpAddrListFuture}; +use telemetry::metrics::DstLabels; +use transport::{tls, DnsNameAndPort}; +use conditional::Conditional; + +use super::{DestinationServiceQuery, UpdateRx}; + +/// Holds the state of a single resolution. +pub(super) struct DestinationSet> { + pub addrs: Exists>, + pub query: Option>, + pub dns_query: Option, + pub responders: Vec, +} + +// ===== impl DestinationSet ===== + +impl DestinationSet +where + T: HttpService, + T::Error: fmt::Debug, +{ + pub(super) fn reset_dns_query( + &mut self, + dns_resolver: &dns::Resolver, + deadline: Instant, + authority: &DnsNameAndPort, + ) { + trace!( + "resetting DNS query for {} at {:?}", + authority.host, + deadline + ); + self.reset_on_next_modification(); + self.dns_query = Some(dns_resolver.resolve_all_ips(deadline, &authority.host)); + } + + // Processes Destination service updates from `request_rx`, returning the new query + // and an indication of any *change* to whether the service exists as far as the + // Destination service is concerned, where `Exists::Unknown` is to be interpreted as + // "no change in existence" instead of "unknown". + pub(super) fn poll_destination_service( + &mut self, + auth: &DnsNameAndPort, + mut rx: UpdateRx, + tls_controller_namespace: Option<&str>, + ) -> (DestinationServiceQuery, Exists<()>) { + let mut exists = Exists::Unknown; + + loop { + match rx.poll() { + Ok(Async::Ready(Some(update))) => match update.update { + Some(PbUpdate2::Add(a_set)) => { + let set_labels = a_set.metric_labels; + let addrs = a_set + .addrs + .into_iter() + .filter_map(|pb| + pb_to_addr_meta(pb, &set_labels, tls_controller_namespace)); + self.add(auth, addrs) + }, + Some(PbUpdate2::Remove(r_set)) => { + exists = Exists::Yes(()); + self.remove( + auth, + r_set + .addrs + .iter() + .filter_map(|addr| pb_to_sock_addr(addr.clone())), + ); + }, + Some(PbUpdate2::NoEndpoints(ref no_endpoints)) if no_endpoints.exists => { + exists = Exists::Yes(()); + self.no_endpoints(auth, no_endpoints.exists); + }, + Some(PbUpdate2::NoEndpoints(no_endpoints)) => { + debug_assert!(!no_endpoints.exists); + exists = Exists::No; + }, + None => (), + }, + Ok(Async::Ready(None)) => { + trace!( + "Destination.Get stream ended for {:?}, must reconnect", + auth + ); + return (Remote::NeedsReconnect, exists); + }, + Ok(Async::NotReady) => { + return (Remote::ConnectedOrConnecting { rx }, exists); + }, + Err(err) => { + warn!("Destination.Get stream errored for {:?}: {:?}", auth, err); + return (Remote::NeedsReconnect, exists); + }, + }; + } + } + + pub(super) fn poll_dns(&mut self, dns_resolver: &dns::Resolver, authority: &DnsNameAndPort) { + // Duration to wait before polling DNS again after an error + // (or a NXDOMAIN response with no TTL). + const DNS_ERROR_TTL: Duration = Duration::from_secs(5); + + trace!("checking DNS for {:?}", authority); + while let Some(mut query) = self.dns_query.take() { + trace!("polling DNS for {:?}", authority); + let deadline = match query.poll() { + Ok(Async::NotReady) => { + trace!("DNS query not ready {:?}", authority); + self.dns_query = Some(query); + return; + }, + Ok(Async::Ready(dns::Response::Exists(ips))) => { + trace!( + "positive result of DNS query for {:?}: {:?}", + authority, + ips + ); + self.add( + authority, + ips.iter().map(|ip| { + ( + SocketAddr::from((ip, authority.port)), + Metadata::no_metadata(), + ) + }), + ); + + // Poll again after the deadline on the DNS response. + ips.valid_until() + }, + Ok(Async::Ready(dns::Response::DoesNotExist { retry_after })) => { + trace!( + "negative result (NXDOMAIN) of DNS query for {:?}", + authority + ); + self.no_endpoints(authority, false); + // Poll again after the deadline on the DNS response, if + // there is one. + retry_after.unwrap_or_else(|| Instant::now() + DNS_ERROR_TTL) + }, + Err(e) => { + // Do nothing so that the most recent non-error response is used until a + // non-error response is received + trace!("DNS resolution failed for {}: {}", &authority.host, e); + + // Poll again after the default wait time. + Instant::now() + DNS_ERROR_TTL + }, + }; + self.reset_dns_query(dns_resolver, deadline, &authority) + } + } +} + +impl> DestinationSet { + pub(super) fn reset_on_next_modification(&mut self) { + match self.addrs { + Exists::Yes(ref mut cache) => { + cache.set_reset_on_next_modification(); + }, + Exists::No | Exists::Unknown => (), + } + } + + fn add(&mut self, authority_for_logging: &DnsNameAndPort, addrs_to_add: A) + where + A: Iterator, + { + let mut cache = match self.addrs.take() { + Exists::Yes(mut cache) => cache, + Exists::Unknown | Exists::No => Cache::new(), + }; + cache.update_union(addrs_to_add, &mut |change| { + Self::on_change(&mut self.responders, authority_for_logging, change) + }); + self.addrs = Exists::Yes(cache); + } + + fn remove(&mut self, authority_for_logging: &DnsNameAndPort, addrs_to_remove: A) + where + A: Iterator, + { + let cache = match self.addrs.take() { + Exists::Yes(mut cache) => { + cache.remove(addrs_to_remove, &mut |change| { + Self::on_change(&mut self.responders, authority_for_logging, change) + }); + cache + }, + Exists::Unknown | Exists::No => Cache::new(), + }; + self.addrs = Exists::Yes(cache); + } + + fn no_endpoints(&mut self, authority_for_logging: &DnsNameAndPort, exists: bool) { + trace!( + "no endpoints for {:?} that is known to {}", + authority_for_logging, + if exists { "exist" } else { "not exist" } + ); + match self.addrs.take() { + Exists::Yes(mut cache) => { + cache.clear(&mut |change| { + Self::on_change(&mut self.responders, authority_for_logging, change) + }); + }, + Exists::Unknown | Exists::No => (), + }; + self.addrs = if exists { + Exists::Yes(Cache::new()) + } else { + Exists::No + }; + } + + fn on_change( + responders: &mut Vec, + authority_for_logging: &DnsNameAndPort, + change: CacheChange, + ) { + let (update_str, update, addr) = match change { + CacheChange::Insertion { key, value } => { + ("insert", Update::Bind(key, value.clone()), key) + }, + CacheChange::Removal { key } => ("remove", Update::Remove(key), key), + CacheChange::Modification { key, new_value } => ( + "change metadata for", + Update::Bind(key, new_value.clone()), + key, + ), + }; + trace!("{} {:?} for {:?}", update_str, addr, authority_for_logging); + // retain is used to drop any senders that are dead + responders.retain(|r| { + let sent = r.update_tx.unbounded_send(update.clone()); + sent.is_ok() + }); + } +} + + +/// Construct a new labeled `SocketAddr `from a protobuf `WeightedAddr`. +fn pb_to_addr_meta( + pb: WeightedAddr, + set_labels: &HashMap, + tls_controller_namespace: Option<&str>, +) -> Option<(SocketAddr, Metadata)> { + let addr = pb.addr.and_then(pb_to_sock_addr)?; + + let mut labels = set_labels.iter() + .chain(pb.metric_labels.iter()) + .collect::>(); + labels.sort_by(|(k0, _), (k1, _)| k0.cmp(k1)); + + let mut tls_identity = + Conditional::None(tls::ReasonForNoIdentity::NotProvidedByServiceDiscovery); + if let Some(pb) = pb.tls_identity { + match tls::Identity::maybe_from_protobuf(tls_controller_namespace, pb) { + Ok(Some(identity)) => { + tls_identity = Conditional::Some(identity); + }, + Ok(None) => (), + Err(e) => { + error!("Failed to parse TLS identity: {:?}", e); + // XXX: Wallpaper over the error and keep going without TLS. + // TODO: Hard fail here once the TLS infrastructure has been + // validated. + }, + } + }; + + let meta = Metadata::new(DstLabels::new(labels.into_iter()), tls_identity); + Some((addr, meta)) +} + +fn pb_to_sock_addr(pb: TcpAddress) -> Option { + use linkerd2_proxy_api::net::ip_address::Ip; + use std::net::{Ipv4Addr, Ipv6Addr}; + /* + current structure is: + TcpAddress { + ip: Option, + }>, + port: u32, + } + */ + match pb.ip { + Some(ip) => match ip.ip { + Some(Ip::Ipv4(octets)) => { + let ipv4 = Ipv4Addr::from(octets); + Some(SocketAddr::from((ipv4, pb.port as u16))) + }, + Some(Ip::Ipv6(v6)) => { + let octets = [ + (v6.first >> 56) as u8, + (v6.first >> 48) as u8, + (v6.first >> 40) as u8, + (v6.first >> 32) as u8, + (v6.first >> 24) as u8, + (v6.first >> 16) as u8, + (v6.first >> 8) as u8, + v6.first as u8, + (v6.last >> 56) as u8, + (v6.last >> 48) as u8, + (v6.last >> 40) as u8, + (v6.last >> 32) as u8, + (v6.last >> 24) as u8, + (v6.last >> 16) as u8, + (v6.last >> 8) as u8, + v6.last as u8, + ]; + let ipv6 = Ipv6Addr::from(octets); + Some(SocketAddr::from((ipv6, pb.port as u16))) + }, + None => None, + }, + None => None, + } +} diff --git a/src/control/destination/background/mod.rs b/src/control/destination/background/mod.rs new file mode 100644 index 000000000..6ab0c5685 --- /dev/null +++ b/src/control/destination/background/mod.rs @@ -0,0 +1,336 @@ +use std::{ + collections::{ + hash_map::{Entry, HashMap}, + VecDeque, + }, + fmt, + time::{Instant, Duration}, +}; +use futures::{ + future, + sync::mpsc, + Async, Future, Poll, Stream, +}; +use tower_grpc as grpc; +use tower_h2::{BoxBody, HttpService, RecvBody}; + +use linkerd2_proxy_api::destination::client::Destination; +use linkerd2_proxy_api::destination::{ + GetDestination, + Update as PbUpdate, +}; + +use super::{ResolveRequest, Update}; +use config::Namespaces; +use control::{ + cache::Exists, + fully_qualified_authority::FullyQualifiedAuthority, + remote_stream::{Receiver, Remote}, +}; +use dns; +use transport::{tls, DnsNameAndPort, HostAndPort}; +use conditional::Conditional; +use watch_service::WatchService; +use futures_watch::Watch; + +mod client; +mod destination_set; + +use self::{ + client::BindClient, + destination_set::DestinationSet, +}; + +type DestinationServiceQuery = Remote; +type UpdateRx = Receiver; + +/// Satisfies resolutions as requested via `request_rx`. +/// +/// As the `Background` is polled with a client to Destination service, if the client to the +/// service is healthy, it reads requests from `request_rx`, determines how to resolve the +/// provided authority to a set of addresses, and ensures that resolution updates are +/// propagated to all requesters. +struct Background> { + dns_resolver: dns::Resolver, + namespaces: Namespaces, + destinations: HashMap>, + /// A queue of authorities that need to be reconnected. + reconnects: VecDeque, + /// The Destination.Get RPC client service. + /// Each poll, records whether the rpc service was till ready. + rpc_ready: bool, + /// A receiver of new watch requests. + request_rx: mpsc::UnboundedReceiver, +} + +/// Returns a new discovery background task. +pub(super) fn task( + request_rx: mpsc::UnboundedReceiver, + dns_resolver: dns::Resolver, + namespaces: Namespaces, + host_and_port: Option, + controller_tls: tls::ConditionalConnectionConfig, + control_backoff_delay: Duration, +) -> impl Future +{ + // Build up the Controller Client Stack + let mut client = host_and_port.map(|host_and_port| { + let (identity, watch) = match controller_tls { + Conditional::Some(cfg) => + (Conditional::Some(cfg.server_identity), cfg.config), + Conditional::None(reason) => { + // If there's no connection config, then construct a new + // `Watch` that never updates to construct the `WatchService`. + // We do this here rather than calling `ClientConfig::no_tls` + // in order to propagate the reason for no TLS to the watch. + let (watch, _) = Watch::new(Conditional::None(reason)); + (Conditional::None(reason), watch) + }, + }; + let bind_client = BindClient::new( + identity, + &dns_resolver, + host_and_port, + control_backoff_delay, + ); + WatchService::new(watch, bind_client) + }); + + let mut disco = Background::new( + request_rx, + dns_resolver, + namespaces, + ); + + future::poll_fn(move || { + disco.poll_rpc(&mut client) + }) +} + +// ==== impl Background ===== + +impl Background +where + T: HttpService, + T::Error: fmt::Debug, +{ + fn new( + request_rx: mpsc::UnboundedReceiver, + dns_resolver: dns::Resolver, + namespaces: Namespaces, + ) -> Self { + Self { + dns_resolver, + namespaces, + destinations: HashMap::new(), + reconnects: VecDeque::new(), + rpc_ready: false, + request_rx, + } + } + + fn poll_rpc(&mut self, client: &mut Option) -> Poll<(), ()> { + // This loop is make sure any streams that were found disconnected + // in `poll_destinations` while the `rpc` service is ready should + // be reconnected now, otherwise the task would just sleep... + loop { + if let Async::Ready(()) = self.poll_resolve_requests(client) { + // request_rx has closed, meaning the main thread is terminating. + return Ok(Async::Ready(())); + } + self.retain_active_destinations(); + self.poll_destinations(); + + if self.reconnects.is_empty() || !self.rpc_ready { + return Ok(Async::NotReady); + } + } + } + + fn poll_resolve_requests(&mut self, client: &mut Option) -> Async<()> { + loop { + if let Some(client) = client { + // if rpc service isn't ready, not much we can do... + match client.poll_ready() { + Ok(Async::Ready(())) => { + self.rpc_ready = true; + }, + Ok(Async::NotReady) => { + self.rpc_ready = false; + return Async::NotReady; + }, + Err(err) => { + warn!("Destination.Get poll_ready error: {:?}", err); + self.rpc_ready = false; + return Async::NotReady; + }, + } + + // handle any pending reconnects first + if self.poll_reconnect(client) { + continue; + } + } + + // check for any new watches + match self.request_rx.poll() { + Ok(Async::Ready(Some(resolve))) => { + trace!("Destination.Get {:?}", resolve.authority); + match self.destinations.entry(resolve.authority) { + Entry::Occupied(mut occ) => { + let set = occ.get_mut(); + // we may already know of some addresses here, so push + // them onto the new watch first + match set.addrs { + Exists::Yes(ref cache) => for (&addr, meta) in cache { + let update = Update::Bind(addr, meta.clone()); + resolve.responder.update_tx + .unbounded_send(update) + .expect("unbounded_send does not fail"); + }, + Exists::No | Exists::Unknown => (), + } + set.responders.push(resolve.responder); + }, + Entry::Vacant(vac) => { + let pod_namespace = &self.namespaces.pod; + let query = client.as_mut().and_then(|client| { + Self::query_destination_service_if_relevant( + pod_namespace, + client, + vac.key(), + "connect", + ) + }); + let mut set = DestinationSet { + addrs: Exists::Unknown, + query, + dns_query: None, + responders: vec![resolve.responder], + }; + // If the authority is one for which the Destination service is never + // relevant (e.g. an absolute name that doesn't end in ".svc.$zone." in + // Kubernetes), or if we don't have a `client`, then immediately start + // polling DNS. + if set.query.is_none() { + set.reset_dns_query( + &self.dns_resolver, + Instant::now(), + vac.key(), + ); + } + vac.insert(set); + }, + } + }, + Ok(Async::Ready(None)) => { + trace!("Discover tx is dropped, shutdown"); + return Async::Ready(()); + }, + Ok(Async::NotReady) => return Async::NotReady, + Err(_) => unreachable!("unbounded receiver doesn't error"), + } + } + } + + /// Tries to reconnect next watch stream. Returns true if reconnection started. + fn poll_reconnect(&mut self, client: &mut T) -> bool { + debug_assert!(self.rpc_ready); + + while let Some(auth) = self.reconnects.pop_front() { + if let Some(set) = self.destinations.get_mut(&auth) { + set.query = Self::query_destination_service_if_relevant( + &self.namespaces.pod, + client, + &auth, + "reconnect", + ); + return true; + } else { + trace!("reconnect no longer needed: {:?}", auth); + } + } + false + } + + /// Ensures that `destinations` is updated to only maintain active resolutions. + /// + /// If there are no active resolutions for a destination, the destination is removed. + fn retain_active_destinations(&mut self) { + self.destinations.retain(|_, ref mut dst| { + dst.responders.retain(|r| r.is_active()); + dst.responders.len() > 0 + }) + } + + fn poll_destinations(&mut self) { + for (auth, set) in &mut self.destinations { + // Query the Destination service first. + let (new_query, found_by_destination_service) = match set.query.take() { + Some(Remote::ConnectedOrConnecting { rx }) => { + let (new_query, found_by_destination_service) = + set.poll_destination_service( + auth, rx, self.namespaces.tls_controller.as_ref().map(|s| s.as_ref())); + if let Remote::NeedsReconnect = new_query { + set.reset_on_next_modification(); + self.reconnects.push_back(auth.clone()); + } + (Some(new_query), found_by_destination_service) + }, + query => (query, Exists::Unknown), + }; + set.query = new_query; + + // Any active response from the Destination service cancels the DNS query except for a + // positive assertion that the service doesn't exist. + // + // Any disconnection from the Destination service has no effect on the DNS query; we + // assume that if we were querying DNS before, we should continue to do so, and if we + // weren't querying DNS then we shouldn't start now. In particular, temporary + // disruptions of connectivity to the Destination service do not cause a fallback to + // DNS. + match found_by_destination_service { + Exists::Yes(()) => { + // Stop polling DNS on any active update from the Destination service. + set.dns_query = None; + }, + Exists::No => { + // Fall back to DNS. + set.reset_dns_query(&self.dns_resolver, Instant::now(), auth); + }, + Exists::Unknown => (), // No change from Destination service's perspective. + } + + // Poll DNS after polling the Destination service. This may reset the DNS query but it + // won't affect the Destination Service query. + set.poll_dns(&self.dns_resolver, auth); + } + } + + /// Initiates a query `query` to the Destination service and returns it as + /// `Some(query)` if the given authority's host is of a form suitable for using to + /// query the Destination service. Otherwise, returns `None`. + fn query_destination_service_if_relevant( + default_destination_namespace: &str, + client: &mut T, + auth: &DnsNameAndPort, + connect_or_reconnect: &str, + ) -> Option> { + trace!( + "DestinationServiceQuery {} {:?}", + connect_or_reconnect, + auth + ); + FullyQualifiedAuthority::normalize(auth, default_destination_namespace).map(|auth| { + let req = GetDestination { + scheme: "k8s".into(), + path: auth.without_trailing_dot().to_owned(), + }; + let mut svc = Destination::new(client.lift_ref()); + let response = svc.get(grpc::Request::new(req)); + Remote::ConnectedOrConnecting { + rx: Receiver::new(response), + } + }) + } +} diff --git a/src/control/mod.rs b/src/control/mod.rs index 7360896b4..3d3fbbc62 100644 --- a/src/control/mod.rs +++ b/src/control/mod.rs @@ -4,7 +4,6 @@ mod fully_qualified_authority; mod observe; pub mod pb; mod remote_stream; -mod util; pub use self::destination::Bind; pub use self::observe::Observe;