diff --git a/proxy/src/control/destination/background.rs b/proxy/src/control/destination/background.rs index 0bfca8159..6b44de6a6 100644 --- a/proxy/src/control/destination/background.rs +++ b/proxy/src/control/destination/background.rs @@ -1,5 +1,3 @@ -use futures::sync::mpsc; -use futures::{Async, Future, Stream}; use std::collections::{ hash_map::{Entry, HashMap}, VecDeque, @@ -8,8 +6,18 @@ use std::fmt; use std::iter::IntoIterator; use std::net::SocketAddr; use std::time::Duration; + +use bytes::Bytes; +use futures::{ + future, + sync::mpsc, + Async, Future, Stream, +}; +use h2; +use http; use tower_grpc as grpc; -use tower_h2::{BoxBody, HttpService, RecvBody}; +use tower_h2::{self, BoxBody, HttpService, RecvBody}; +use tower_reconnect::Reconnect; use conduit_proxy_controller_grpc::common::{Destination, TcpAddress}; use conduit_proxy_controller_grpc::destination::client::Destination as DestinationSvc; @@ -17,31 +25,28 @@ use conduit_proxy_controller_grpc::destination::update::Update as PbUpdate2; use conduit_proxy_controller_grpc::destination::{Update as PbUpdate, WeightedAddr}; use super::{Metadata, ResolveRequest, Responder, Update}; -use control::cache::{Cache, CacheChange, Exists}; -use control::fully_qualified_authority::FullyQualifiedAuthority; -use control::remote_stream::{Receiver, Remote}; +use control::{ + cache::{Cache, CacheChange, Exists}, + fully_qualified_authority::FullyQualifiedAuthority, + remote_stream::{Receiver, Remote}, + AddOrigin, Backoff, LogErrors +}; use dns::{self, IpAddrListFuture}; +use task::LazyExecutor; use telemetry::metrics::DstLabels; -use transport::DnsNameAndPort; +use transport::{DnsNameAndPort, HostAndPort, LookupAddressAndConnect}; +use timeout::Timeout; type DestinationServiceQuery = Remote; type UpdateRx = Receiver; -/// Stores the configuration for a destination background worker. -#[derive(Debug)] -pub struct Config { - request_rx: mpsc::UnboundedReceiver, - dns_config: dns::Config, - default_destination_namespace: String, -} - /// Satisfies resolutions as requested via `request_rx`. /// -/// As `Process` is polled with a client to Destination service, if the client to the +/// As the `Background` is polled with a client to Destination service, if the client to the /// service is healthy, it reads requests from `request_rx`, determines how to resolve the /// provided authority to a set of addresses, and ensures that resolution updates are /// propagated to all requesters. -pub struct Process> { +struct Background> { dns_resolver: dns::Resolver, default_destination_namespace: String, destinations: HashMap>, @@ -62,46 +67,74 @@ struct DestinationSet> { responders: Vec, } -// ==== impl Config ===== -impl Config { - pub(super) fn new( - request_rx: mpsc::UnboundedReceiver, - dns_config: dns::Config, - default_destination_namespace: String, - ) -> Self { - Self { - request_rx, - dns_config, - default_destination_namespace, - } - } +/// Returns a new discovery background task. +pub(super) fn task( + request_rx: mpsc::UnboundedReceiver, + dns_resolver: dns::Resolver, + default_destination_namespace: String, + host_and_port: HostAndPort, +) -> impl Future +{ + // Build up the Controller Client Stack + let mut client = { + let ctx = ("controller-client", format!("{:?}", host_and_port)); + let scheme = http::uri::Scheme::from_shared(Bytes::from_static(b"http")).unwrap(); + let authority = http::uri::Authority::from(&host_and_port); + let connect = Timeout::new( + LookupAddressAndConnect::new(host_and_port, dns_resolver.clone()), + Duration::from_secs(3), + ); - /// Bind this handle to start talking to the controller API. - pub fn process(self) -> Process - where - T: HttpService, - T::Error: fmt::Debug, - { - Process { - dns_resolver: dns::Resolver::new(self.dns_config), - default_destination_namespace: self.default_destination_namespace, - destinations: HashMap::new(), - reconnects: VecDeque::new(), - rpc_ready: false, - request_rx: self.request_rx, - } - } + let h2_client = tower_h2::client::Connect::new( + connect, + h2::client::Builder::default(), + ::logging::context_executor(ctx, LazyExecutor), + ); + + let reconnect = Reconnect::new(h2_client); + let log_errors = LogErrors::new(reconnect); + let backoff = Backoff::new(log_errors, Duration::from_secs(5)); + // TODO: Use AddOrigin in tower-http + AddOrigin::new(scheme, authority, backoff) + }; + + let mut disco = Background::new( + request_rx, + dns_resolver, + default_destination_namespace, + ); + + future::poll_fn(move || { + disco.poll_rpc(&mut client); + + Ok(Async::NotReady) + }) } -// ==== impl Process ===== +// ==== impl Background ===== -impl Process +impl Background where T: HttpService, T::Error: fmt::Debug, { - pub fn poll_rpc(&mut self, client: &mut T) { + fn new( + request_rx: mpsc::UnboundedReceiver, + dns_resolver: dns::Resolver, + default_destination_namespace: String, + ) -> Self { + Self { + dns_resolver, + default_destination_namespace, + destinations: HashMap::new(), + reconnects: VecDeque::new(), + rpc_ready: false, + request_rx, + } + } + + fn poll_rpc(&mut self, client: &mut T) { // This loop is make sure any streams that were found disconnected // in `poll_destinations` while the `rpc` service is ready should // be reconnected now, otherwise the task would just sleep... diff --git a/proxy/src/control/destination/mod.rs b/proxy/src/control/destination/mod.rs index aeb2d5535..724faa9c1 100644 --- a/proxy/src/control/destination/mod.rs +++ b/proxy/src/control/destination/mod.rs @@ -28,8 +28,13 @@ use std::collections::HashMap; use std::net::SocketAddr; use std::sync::{Arc, Weak}; -use futures::sync::mpsc; -use futures::{Async, Poll, Stream}; +use futures::{ + sync::mpsc, + Future, + Async, + Poll, + Stream +}; use futures_watch::{Store, Watch}; use http; use tower_discover::{Change, Discover}; @@ -37,14 +42,14 @@ use tower_service::Service; use dns; use telemetry::metrics::DstLabels; -use transport::DnsNameAndPort; +use transport::{DnsNameAndPort, HostAndPort}; pub mod background; mod endpoint; pub use self::endpoint::{DstLabelsWatch, Endpoint}; -/// A handle to request resolutions from a `Background`. +/// A handle to request resolutions from the background discovery task. #[derive(Clone, Debug)] pub struct Resolver { request_tx: mpsc::UnboundedSender, @@ -127,17 +132,24 @@ pub trait Bind { fn bind(&self, addr: &Self::Endpoint) -> Result; } -/// Creates a "channel" of `Resolver` to `Background` handles. +/// Returns a `Resolver` and a background task future. /// -/// The `Resolver` is used by a listener, the `Background` is consumed -/// on the controller thread. +/// The `Resolver` is used by a listener to request resolutions, while +/// the background future is executed on the controller thread's executor +/// to drive the background task. pub fn new( - dns_config: dns::Config, + dns_resolver: dns::Resolver, default_destination_namespace: String, -) -> (Resolver, background::Config) { + host_and_port: HostAndPort, +) -> (Resolver, impl Future) { let (request_tx, rx) = mpsc::unbounded(); let disco = Resolver { request_tx }; - let bg = background::Config::new(rx, dns_config, default_destination_namespace); + let bg = background::task( + rx, + dns_resolver, + default_destination_namespace, + host_and_port, + ); (disco, bg) } diff --git a/proxy/src/control/mod.rs b/proxy/src/control/mod.rs index 976baf0a3..088d38064 100644 --- a/proxy/src/control/mod.rs +++ b/proxy/src/control/mod.rs @@ -2,19 +2,14 @@ use std::fmt; use std::io; use std::time::{Duration, Instant}; -use bytes::Bytes; -use futures::{future, Async, Future, Poll}; -use h2; +use futures::{Async, Future, Poll}; use http; use tokio::timer::Delay; use tower_service::Service; use tower_h2; -use tower_reconnect::{Error as ReconnectError, Reconnect}; +use tower_reconnect::{Error as ReconnectError}; -use dns; -use task::LazyExecutor; -use transport::{DnsNameAndPort, HostAndPort, LookupAddressAndConnect}; -use timeout::{Timeout, TimeoutError}; +use timeout::TimeoutError; mod cache; pub mod destination; @@ -23,85 +18,9 @@ mod observe; pub mod pb; mod remote_stream; -use self::destination::{Resolver, Resolution}; pub use self::destination::Bind; pub use self::observe::Observe; -#[derive(Clone)] -pub struct Control { - disco: Resolver, -} - -pub struct Background { - disco: destination::background::Config, -} - -pub fn new(dns_config: dns::Config, default_destination_namespace: String) -> (Control, Background) -{ - let (tx, rx) = self::destination::new(dns_config, default_destination_namespace); - - let c = Control { - disco: tx, - }; - - let b = Background { - disco: rx, - }; - - (c, b) -} - -// ===== impl Control ===== - -impl Control { - pub fn resolve(&self, auth: &DnsNameAndPort, bind: B) -> Resolution { - self.disco.resolve(auth, bind) - } -} - -// ===== impl Background ===== - -impl Background { - pub fn bind( - self, - host_and_port: HostAndPort, - dns_config: dns::Config, - ) -> Box> { - // Build up the Controller Client Stack - let mut client = { - let ctx = ("controller-client", format!("{:?}", host_and_port)); - let scheme = http::uri::Scheme::from_shared(Bytes::from_static(b"http")).unwrap(); - let authority = http::uri::Authority::from(&host_and_port); - let dns_resolver = dns::Resolver::new(dns_config); - let connect = Timeout::new( - LookupAddressAndConnect::new(host_and_port, dns_resolver), - Duration::from_secs(3), - ); - - let h2_client = tower_h2::client::Connect::new( - connect, - h2::client::Builder::default(), - ::logging::context_executor(ctx, LazyExecutor), - ); - - let reconnect = Reconnect::new(h2_client); - let log_errors = LogErrors::new(reconnect); - let backoff = Backoff::new(log_errors, Duration::from_secs(5)); - // TODO: Use AddOrigin in tower-http - AddOrigin::new(scheme, authority, backoff) - }; - - let mut disco = self.disco.process(); - - let fut = future::poll_fn(move || { - disco.poll_rpc(&mut client); - - Ok(Async::NotReady) - }); - - Box::new(fut) - } -} // ===== Backoff ===== diff --git a/proxy/src/dns.rs b/proxy/src/dns.rs index a8d2a5537..7016427eb 100644 --- a/proxy/src/dns.rs +++ b/proxy/src/dns.rs @@ -11,18 +11,13 @@ use trust_dns_resolver::ResolverFuture; use trust_dns_resolver::lookup_ip::LookupIp; #[derive(Clone, Debug)] -pub struct Config { +pub struct Resolver { config: ResolverConfig, opts: ResolverOpts, } -#[derive(Clone, Debug)] -pub struct Resolver { - config: Config, -} - pub enum IpAddrFuture { - DNS(Box>), + DNS(Box + Send>), Fixed(IpAddr), } @@ -37,7 +32,7 @@ pub enum Response { } // `Box` implements `Future` so it doesn't need to be implemented manually. -pub type IpAddrListFuture = Box>; +pub type IpAddrListFuture = Box + Send>; /// A DNS name. #[derive(Clone, Debug, Eq, Hash, PartialEq)] @@ -63,23 +58,19 @@ impl AsRef for Name { } } -impl Config { +impl Resolver { /// TODO: Make this infallible, like it is in the `domain` crate. pub fn from_system_config() -> Result { let (config, opts) = trust_dns_resolver::system_conf::read_system_conf()?; trace!("DNS config: {:?}", &config); trace!("DNS opts: {:?}", &opts); - Ok(Config { - config, - opts - }) + Ok(Self::new(config, opts)) } -} -impl Resolver { - pub fn new(config: Config) -> Self { + pub fn new(config: ResolverConfig, opts: ResolverOpts) -> Self { Resolver { config, + opts, } } @@ -126,8 +117,8 @@ impl Resolver { { let name = name.clone(); // TODO: ref-count names. let resolver = ResolverFuture::new( - self.config.config, - self.config.opts + self.config, + self.opts ); resolver.and_then(move |r| r.lookup_ip(name.as_str())) } diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index a2ccf52b4..4a0676b7e 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -203,19 +203,25 @@ where config.outbound_ports_disable_protocol_detection, ); + let (taps, observe) = control::Observe::new(100); let (sensors, telemetry) = telemetry::new( &process_ctx, config.event_buffer_capacity, config.metrics_retain_idle, + &taps, ); - let dns_config = dns::Config::from_system_config() + let dns_resolver = dns::Resolver::from_system_config() .unwrap_or_else(|e| { // TODO: Make DNS configuration infallible. panic!("invalid DNS configuration: {:?}", e); }); - let (control, control_bg) = control::new(dns_config.clone(), config.pod_namespace.clone()); + let (control, control_bg) = control::destination::new( + dns_resolver.clone(), + config.pod_namespace.clone(), + control_host_and_port + ); let (drain_tx, drain_rx) = drain::channel(); @@ -285,24 +291,14 @@ where let mut rt = current_thread::Runtime::new() .expect("initialize controller-client thread runtime"); - let (taps, observe) = control::Observe::new(100); let new_service = TapServer::new(observe); let server = serve_control(control_listener, new_service); - let telemetry = telemetry - .make_control(&taps) - .expect("bad news in telemetry town"); - let metrics_server = telemetry .serve_metrics(metrics_listener); - let client = control_bg.bind( - control_host_and_port, - dns_config, - ); - - let fut = client.join4( + let fut = control_bg.join4( server.map_err(|_| {}), telemetry, metrics_server.map_err(|_| {}), diff --git a/proxy/src/outbound.rs b/proxy/src/outbound.rs index 7eac51fa5..fb0aceab1 100644 --- a/proxy/src/outbound.rs +++ b/proxy/src/outbound.rs @@ -14,8 +14,7 @@ use tower_h2; use conduit_proxy_router::Recognize; use bind::{self, Bind, Protocol}; -use control; -use control::destination::{Bind as BindTrait, Resolution}; +use control::destination::{self, Bind as BindTrait, Resolution}; use ctx; use task::LazyExecutor; use timeout::Timeout; @@ -27,7 +26,7 @@ type BindProtocol = bind::BindProtocol, B>; pub struct Outbound { bind: Bind, B>, - discovery: control::Control, + discovery: destination::Resolver, bind_timeout: Duration, } @@ -43,7 +42,7 @@ pub enum Destination { impl Outbound { pub fn new(bind: Bind, B>, - discovery: control::Control, + discovery: destination::Resolver, bind_timeout: Duration) -> Outbound { Self { diff --git a/proxy/src/telemetry/control.rs b/proxy/src/telemetry/control.rs index f912de09f..4d2cbf9d9 100644 --- a/proxy/src/telemetry/control.rs +++ b/proxy/src/telemetry/control.rs @@ -51,50 +51,32 @@ pub struct Control { } -// ===== impl MakeControl ===== +// ===== impl Control ===== -impl MakeControl { - /// Constructs a type that can instantiate a `Control`. +impl Control { + + /// Returns a new `Control`. /// /// # Arguments /// - `rx`: the `Receiver` side of the channel on which events are sent. /// - `process_ctx`: runtime process metadata. + /// - `taps`: shares a `Taps` instance. pub(super) fn new( rx: Receiver, process_ctx: &Arc, metrics_retain_idle: Duration, + taps: &Arc> ) -> Self { + let (metrics_record, metrics_service) = + metrics::new(&process_ctx, metrics_retain_idle); Self { - rx, - process_ctx: Arc::clone(process_ctx), - metrics_retain_idle, + metrics_record, + metrics_service, + rx: Some(rx), + taps: Some(taps.clone()), } } - /// Bind a `Control` with the current task executor. - /// - /// # Arguments - /// - `taps`: shares a `Taps` instance. - /// - /// # Returns - /// - `Ok(())` if the timeout was successfully created. - /// - `Err(io::Error)` if the timeout could not be created. - pub fn make_control(self, taps: &Arc>) -> io::Result { - let (metrics_record, metrics_service) = - metrics::new(&self.process_ctx, self.metrics_retain_idle); - - Ok(Control { - metrics_record, - metrics_service, - rx: Some(self.rx), - taps: Some(taps.clone()), - }) - } -} - -// ===== impl Control ===== - -impl Control { fn recv(&mut self) -> Poll, ()> { match self.rx.take() { None => Ok(Async::Ready(None)), diff --git a/proxy/src/telemetry/mod.rs b/proxy/src/telemetry/mod.rs index 99a408335..fd4c5220c 100644 --- a/proxy/src/telemetry/mod.rs +++ b/proxy/src/telemetry/mod.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::time::Duration; use futures_mpsc_lossy; @@ -32,9 +32,10 @@ pub fn new( process: &Arc, capacity: usize, metrics_retain_idle: Duration, -) -> (Sensors, MakeControl) { + taps: &Arc>, +) -> (Sensors, Control) { let (tx, rx) = futures_mpsc_lossy::channel(capacity); let s = Sensors::new(tx); - let c = MakeControl::new(rx, process, metrics_retain_idle); + let c = Control::new(rx, process, metrics_retain_idle, taps); (s, c) } diff --git a/proxy/src/transport/connect.rs b/proxy/src/transport/connect.rs index 3430a1779..6f1b8b5d2 100644 --- a/proxy/src/transport/connect.rs +++ b/proxy/src/transport/connect.rs @@ -116,7 +116,7 @@ impl LookupAddressAndConnect { impl tokio_connect::Connect for LookupAddressAndConnect { type Connected = connection::Connection; type Error = io::Error; - type Future = Box>; + type Future = Box + Send>; fn connect(&self) -> Self::Future { let port = self.host_and_port.port;