From d709ec37e3390f156a4215d891b8f4ef82da9d20 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 21 May 2018 15:40:33 -0700 Subject: [PATCH] proxy: Remove configure-and-bind-to-executor pattern (#967) A common pattern when using the old Tokio API was separating the configuration of a task from binding it to an executor to run on. This was often necessary when we wanted to construct a type corresponding to some task before the reactor on which it would execute was initialized. Typically, this was accomplished with two separate types, one of which represented the configuration and exposed only a method to take a reactor `Handle` and transform it to the other type, representing the actual task. After we migrate to the new Tokio API in #944, executors no longer need to be passed explictly, as we can use `DefaultExecutor::current` or `current_thread::TaskExecutor::current` to spawn a task on the current executor. Therefore, a lot of this complexity can be refactored away. This PR refactors the `Config` and `Process` structs in i`control::destination::background` into a single `Background` struct, and removes the `dns::Config` and `telemetry::MakeControl` structs (`dns::Resolver` and `telemetry::Control` are now constructed directly). It should not cause any functional changes. Closes #966 Signed-off-by: Eliza Weisman --- proxy/src/control/destination/background.rs | 129 ++++++++++++-------- proxy/src/control/destination/mod.rs | 32 +++-- proxy/src/control/mod.rs | 87 +------------ proxy/src/dns.rs | 27 ++-- proxy/src/lib.rs | 22 ++-- proxy/src/outbound.rs | 7 +- proxy/src/telemetry/control.rs | 42 ++----- proxy/src/telemetry/mod.rs | 7 +- proxy/src/transport/connect.rs | 2 +- 9 files changed, 144 insertions(+), 211 deletions(-) diff --git a/proxy/src/control/destination/background.rs b/proxy/src/control/destination/background.rs index 0bfca8159..6b44de6a6 100644 --- a/proxy/src/control/destination/background.rs +++ b/proxy/src/control/destination/background.rs @@ -1,5 +1,3 @@ -use futures::sync::mpsc; -use futures::{Async, Future, Stream}; use std::collections::{ hash_map::{Entry, HashMap}, VecDeque, @@ -8,8 +6,18 @@ use std::fmt; use std::iter::IntoIterator; use std::net::SocketAddr; use std::time::Duration; + +use bytes::Bytes; +use futures::{ + future, + sync::mpsc, + Async, Future, Stream, +}; +use h2; +use http; use tower_grpc as grpc; -use tower_h2::{BoxBody, HttpService, RecvBody}; +use tower_h2::{self, BoxBody, HttpService, RecvBody}; +use tower_reconnect::Reconnect; use conduit_proxy_controller_grpc::common::{Destination, TcpAddress}; use conduit_proxy_controller_grpc::destination::client::Destination as DestinationSvc; @@ -17,31 +25,28 @@ use conduit_proxy_controller_grpc::destination::update::Update as PbUpdate2; use conduit_proxy_controller_grpc::destination::{Update as PbUpdate, WeightedAddr}; use super::{Metadata, ResolveRequest, Responder, Update}; -use control::cache::{Cache, CacheChange, Exists}; -use control::fully_qualified_authority::FullyQualifiedAuthority; -use control::remote_stream::{Receiver, Remote}; +use control::{ + cache::{Cache, CacheChange, Exists}, + fully_qualified_authority::FullyQualifiedAuthority, + remote_stream::{Receiver, Remote}, + AddOrigin, Backoff, LogErrors +}; use dns::{self, IpAddrListFuture}; +use task::LazyExecutor; use telemetry::metrics::DstLabels; -use transport::DnsNameAndPort; +use transport::{DnsNameAndPort, HostAndPort, LookupAddressAndConnect}; +use timeout::Timeout; type DestinationServiceQuery = Remote; type UpdateRx = Receiver; -/// Stores the configuration for a destination background worker. -#[derive(Debug)] -pub struct Config { - request_rx: mpsc::UnboundedReceiver, - dns_config: dns::Config, - default_destination_namespace: String, -} - /// Satisfies resolutions as requested via `request_rx`. /// -/// As `Process` is polled with a client to Destination service, if the client to the +/// As the `Background` is polled with a client to Destination service, if the client to the /// service is healthy, it reads requests from `request_rx`, determines how to resolve the /// provided authority to a set of addresses, and ensures that resolution updates are /// propagated to all requesters. -pub struct Process> { +struct Background> { dns_resolver: dns::Resolver, default_destination_namespace: String, destinations: HashMap>, @@ -62,46 +67,74 @@ struct DestinationSet> { responders: Vec, } -// ==== impl Config ===== -impl Config { - pub(super) fn new( - request_rx: mpsc::UnboundedReceiver, - dns_config: dns::Config, - default_destination_namespace: String, - ) -> Self { - Self { - request_rx, - dns_config, - default_destination_namespace, - } - } +/// Returns a new discovery background task. +pub(super) fn task( + request_rx: mpsc::UnboundedReceiver, + dns_resolver: dns::Resolver, + default_destination_namespace: String, + host_and_port: HostAndPort, +) -> impl Future +{ + // Build up the Controller Client Stack + let mut client = { + let ctx = ("controller-client", format!("{:?}", host_and_port)); + let scheme = http::uri::Scheme::from_shared(Bytes::from_static(b"http")).unwrap(); + let authority = http::uri::Authority::from(&host_and_port); + let connect = Timeout::new( + LookupAddressAndConnect::new(host_and_port, dns_resolver.clone()), + Duration::from_secs(3), + ); - /// Bind this handle to start talking to the controller API. - pub fn process(self) -> Process - where - T: HttpService, - T::Error: fmt::Debug, - { - Process { - dns_resolver: dns::Resolver::new(self.dns_config), - default_destination_namespace: self.default_destination_namespace, - destinations: HashMap::new(), - reconnects: VecDeque::new(), - rpc_ready: false, - request_rx: self.request_rx, - } - } + let h2_client = tower_h2::client::Connect::new( + connect, + h2::client::Builder::default(), + ::logging::context_executor(ctx, LazyExecutor), + ); + + let reconnect = Reconnect::new(h2_client); + let log_errors = LogErrors::new(reconnect); + let backoff = Backoff::new(log_errors, Duration::from_secs(5)); + // TODO: Use AddOrigin in tower-http + AddOrigin::new(scheme, authority, backoff) + }; + + let mut disco = Background::new( + request_rx, + dns_resolver, + default_destination_namespace, + ); + + future::poll_fn(move || { + disco.poll_rpc(&mut client); + + Ok(Async::NotReady) + }) } -// ==== impl Process ===== +// ==== impl Background ===== -impl Process +impl Background where T: HttpService, T::Error: fmt::Debug, { - pub fn poll_rpc(&mut self, client: &mut T) { + fn new( + request_rx: mpsc::UnboundedReceiver, + dns_resolver: dns::Resolver, + default_destination_namespace: String, + ) -> Self { + Self { + dns_resolver, + default_destination_namespace, + destinations: HashMap::new(), + reconnects: VecDeque::new(), + rpc_ready: false, + request_rx, + } + } + + fn poll_rpc(&mut self, client: &mut T) { // This loop is make sure any streams that were found disconnected // in `poll_destinations` while the `rpc` service is ready should // be reconnected now, otherwise the task would just sleep... diff --git a/proxy/src/control/destination/mod.rs b/proxy/src/control/destination/mod.rs index aeb2d5535..724faa9c1 100644 --- a/proxy/src/control/destination/mod.rs +++ b/proxy/src/control/destination/mod.rs @@ -28,8 +28,13 @@ use std::collections::HashMap; use std::net::SocketAddr; use std::sync::{Arc, Weak}; -use futures::sync::mpsc; -use futures::{Async, Poll, Stream}; +use futures::{ + sync::mpsc, + Future, + Async, + Poll, + Stream +}; use futures_watch::{Store, Watch}; use http; use tower_discover::{Change, Discover}; @@ -37,14 +42,14 @@ use tower_service::Service; use dns; use telemetry::metrics::DstLabels; -use transport::DnsNameAndPort; +use transport::{DnsNameAndPort, HostAndPort}; pub mod background; mod endpoint; pub use self::endpoint::{DstLabelsWatch, Endpoint}; -/// A handle to request resolutions from a `Background`. +/// A handle to request resolutions from the background discovery task. #[derive(Clone, Debug)] pub struct Resolver { request_tx: mpsc::UnboundedSender, @@ -127,17 +132,24 @@ pub trait Bind { fn bind(&self, addr: &Self::Endpoint) -> Result; } -/// Creates a "channel" of `Resolver` to `Background` handles. +/// Returns a `Resolver` and a background task future. /// -/// The `Resolver` is used by a listener, the `Background` is consumed -/// on the controller thread. +/// The `Resolver` is used by a listener to request resolutions, while +/// the background future is executed on the controller thread's executor +/// to drive the background task. pub fn new( - dns_config: dns::Config, + dns_resolver: dns::Resolver, default_destination_namespace: String, -) -> (Resolver, background::Config) { + host_and_port: HostAndPort, +) -> (Resolver, impl Future) { let (request_tx, rx) = mpsc::unbounded(); let disco = Resolver { request_tx }; - let bg = background::Config::new(rx, dns_config, default_destination_namespace); + let bg = background::task( + rx, + dns_resolver, + default_destination_namespace, + host_and_port, + ); (disco, bg) } diff --git a/proxy/src/control/mod.rs b/proxy/src/control/mod.rs index 976baf0a3..088d38064 100644 --- a/proxy/src/control/mod.rs +++ b/proxy/src/control/mod.rs @@ -2,19 +2,14 @@ use std::fmt; use std::io; use std::time::{Duration, Instant}; -use bytes::Bytes; -use futures::{future, Async, Future, Poll}; -use h2; +use futures::{Async, Future, Poll}; use http; use tokio::timer::Delay; use tower_service::Service; use tower_h2; -use tower_reconnect::{Error as ReconnectError, Reconnect}; +use tower_reconnect::{Error as ReconnectError}; -use dns; -use task::LazyExecutor; -use transport::{DnsNameAndPort, HostAndPort, LookupAddressAndConnect}; -use timeout::{Timeout, TimeoutError}; +use timeout::TimeoutError; mod cache; pub mod destination; @@ -23,85 +18,9 @@ mod observe; pub mod pb; mod remote_stream; -use self::destination::{Resolver, Resolution}; pub use self::destination::Bind; pub use self::observe::Observe; -#[derive(Clone)] -pub struct Control { - disco: Resolver, -} - -pub struct Background { - disco: destination::background::Config, -} - -pub fn new(dns_config: dns::Config, default_destination_namespace: String) -> (Control, Background) -{ - let (tx, rx) = self::destination::new(dns_config, default_destination_namespace); - - let c = Control { - disco: tx, - }; - - let b = Background { - disco: rx, - }; - - (c, b) -} - -// ===== impl Control ===== - -impl Control { - pub fn resolve(&self, auth: &DnsNameAndPort, bind: B) -> Resolution { - self.disco.resolve(auth, bind) - } -} - -// ===== impl Background ===== - -impl Background { - pub fn bind( - self, - host_and_port: HostAndPort, - dns_config: dns::Config, - ) -> Box> { - // Build up the Controller Client Stack - let mut client = { - let ctx = ("controller-client", format!("{:?}", host_and_port)); - let scheme = http::uri::Scheme::from_shared(Bytes::from_static(b"http")).unwrap(); - let authority = http::uri::Authority::from(&host_and_port); - let dns_resolver = dns::Resolver::new(dns_config); - let connect = Timeout::new( - LookupAddressAndConnect::new(host_and_port, dns_resolver), - Duration::from_secs(3), - ); - - let h2_client = tower_h2::client::Connect::new( - connect, - h2::client::Builder::default(), - ::logging::context_executor(ctx, LazyExecutor), - ); - - let reconnect = Reconnect::new(h2_client); - let log_errors = LogErrors::new(reconnect); - let backoff = Backoff::new(log_errors, Duration::from_secs(5)); - // TODO: Use AddOrigin in tower-http - AddOrigin::new(scheme, authority, backoff) - }; - - let mut disco = self.disco.process(); - - let fut = future::poll_fn(move || { - disco.poll_rpc(&mut client); - - Ok(Async::NotReady) - }); - - Box::new(fut) - } -} // ===== Backoff ===== diff --git a/proxy/src/dns.rs b/proxy/src/dns.rs index a8d2a5537..7016427eb 100644 --- a/proxy/src/dns.rs +++ b/proxy/src/dns.rs @@ -11,18 +11,13 @@ use trust_dns_resolver::ResolverFuture; use trust_dns_resolver::lookup_ip::LookupIp; #[derive(Clone, Debug)] -pub struct Config { +pub struct Resolver { config: ResolverConfig, opts: ResolverOpts, } -#[derive(Clone, Debug)] -pub struct Resolver { - config: Config, -} - pub enum IpAddrFuture { - DNS(Box>), + DNS(Box + Send>), Fixed(IpAddr), } @@ -37,7 +32,7 @@ pub enum Response { } // `Box` implements `Future` so it doesn't need to be implemented manually. -pub type IpAddrListFuture = Box>; +pub type IpAddrListFuture = Box + Send>; /// A DNS name. #[derive(Clone, Debug, Eq, Hash, PartialEq)] @@ -63,23 +58,19 @@ impl AsRef for Name { } } -impl Config { +impl Resolver { /// TODO: Make this infallible, like it is in the `domain` crate. pub fn from_system_config() -> Result { let (config, opts) = trust_dns_resolver::system_conf::read_system_conf()?; trace!("DNS config: {:?}", &config); trace!("DNS opts: {:?}", &opts); - Ok(Config { - config, - opts - }) + Ok(Self::new(config, opts)) } -} -impl Resolver { - pub fn new(config: Config) -> Self { + pub fn new(config: ResolverConfig, opts: ResolverOpts) -> Self { Resolver { config, + opts, } } @@ -126,8 +117,8 @@ impl Resolver { { let name = name.clone(); // TODO: ref-count names. let resolver = ResolverFuture::new( - self.config.config, - self.config.opts + self.config, + self.opts ); resolver.and_then(move |r| r.lookup_ip(name.as_str())) } diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index a2ccf52b4..4a0676b7e 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -203,19 +203,25 @@ where config.outbound_ports_disable_protocol_detection, ); + let (taps, observe) = control::Observe::new(100); let (sensors, telemetry) = telemetry::new( &process_ctx, config.event_buffer_capacity, config.metrics_retain_idle, + &taps, ); - let dns_config = dns::Config::from_system_config() + let dns_resolver = dns::Resolver::from_system_config() .unwrap_or_else(|e| { // TODO: Make DNS configuration infallible. panic!("invalid DNS configuration: {:?}", e); }); - let (control, control_bg) = control::new(dns_config.clone(), config.pod_namespace.clone()); + let (control, control_bg) = control::destination::new( + dns_resolver.clone(), + config.pod_namespace.clone(), + control_host_and_port + ); let (drain_tx, drain_rx) = drain::channel(); @@ -285,24 +291,14 @@ where let mut rt = current_thread::Runtime::new() .expect("initialize controller-client thread runtime"); - let (taps, observe) = control::Observe::new(100); let new_service = TapServer::new(observe); let server = serve_control(control_listener, new_service); - let telemetry = telemetry - .make_control(&taps) - .expect("bad news in telemetry town"); - let metrics_server = telemetry .serve_metrics(metrics_listener); - let client = control_bg.bind( - control_host_and_port, - dns_config, - ); - - let fut = client.join4( + let fut = control_bg.join4( server.map_err(|_| {}), telemetry, metrics_server.map_err(|_| {}), diff --git a/proxy/src/outbound.rs b/proxy/src/outbound.rs index 7eac51fa5..fb0aceab1 100644 --- a/proxy/src/outbound.rs +++ b/proxy/src/outbound.rs @@ -14,8 +14,7 @@ use tower_h2; use conduit_proxy_router::Recognize; use bind::{self, Bind, Protocol}; -use control; -use control::destination::{Bind as BindTrait, Resolution}; +use control::destination::{self, Bind as BindTrait, Resolution}; use ctx; use task::LazyExecutor; use timeout::Timeout; @@ -27,7 +26,7 @@ type BindProtocol = bind::BindProtocol, B>; pub struct Outbound { bind: Bind, B>, - discovery: control::Control, + discovery: destination::Resolver, bind_timeout: Duration, } @@ -43,7 +42,7 @@ pub enum Destination { impl Outbound { pub fn new(bind: Bind, B>, - discovery: control::Control, + discovery: destination::Resolver, bind_timeout: Duration) -> Outbound { Self { diff --git a/proxy/src/telemetry/control.rs b/proxy/src/telemetry/control.rs index f912de09f..4d2cbf9d9 100644 --- a/proxy/src/telemetry/control.rs +++ b/proxy/src/telemetry/control.rs @@ -51,50 +51,32 @@ pub struct Control { } -// ===== impl MakeControl ===== +// ===== impl Control ===== -impl MakeControl { - /// Constructs a type that can instantiate a `Control`. +impl Control { + + /// Returns a new `Control`. /// /// # Arguments /// - `rx`: the `Receiver` side of the channel on which events are sent. /// - `process_ctx`: runtime process metadata. + /// - `taps`: shares a `Taps` instance. pub(super) fn new( rx: Receiver, process_ctx: &Arc, metrics_retain_idle: Duration, + taps: &Arc> ) -> Self { + let (metrics_record, metrics_service) = + metrics::new(&process_ctx, metrics_retain_idle); Self { - rx, - process_ctx: Arc::clone(process_ctx), - metrics_retain_idle, + metrics_record, + metrics_service, + rx: Some(rx), + taps: Some(taps.clone()), } } - /// Bind a `Control` with the current task executor. - /// - /// # Arguments - /// - `taps`: shares a `Taps` instance. - /// - /// # Returns - /// - `Ok(())` if the timeout was successfully created. - /// - `Err(io::Error)` if the timeout could not be created. - pub fn make_control(self, taps: &Arc>) -> io::Result { - let (metrics_record, metrics_service) = - metrics::new(&self.process_ctx, self.metrics_retain_idle); - - Ok(Control { - metrics_record, - metrics_service, - rx: Some(self.rx), - taps: Some(taps.clone()), - }) - } -} - -// ===== impl Control ===== - -impl Control { fn recv(&mut self) -> Poll, ()> { match self.rx.take() { None => Ok(Async::Ready(None)), diff --git a/proxy/src/telemetry/mod.rs b/proxy/src/telemetry/mod.rs index 99a408335..fd4c5220c 100644 --- a/proxy/src/telemetry/mod.rs +++ b/proxy/src/telemetry/mod.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::time::Duration; use futures_mpsc_lossy; @@ -32,9 +32,10 @@ pub fn new( process: &Arc, capacity: usize, metrics_retain_idle: Duration, -) -> (Sensors, MakeControl) { + taps: &Arc>, +) -> (Sensors, Control) { let (tx, rx) = futures_mpsc_lossy::channel(capacity); let s = Sensors::new(tx); - let c = MakeControl::new(rx, process, metrics_retain_idle); + let c = Control::new(rx, process, metrics_retain_idle, taps); (s, c) } diff --git a/proxy/src/transport/connect.rs b/proxy/src/transport/connect.rs index 3430a1779..6f1b8b5d2 100644 --- a/proxy/src/transport/connect.rs +++ b/proxy/src/transport/connect.rs @@ -116,7 +116,7 @@ impl LookupAddressAndConnect { impl tokio_connect::Connect for LookupAddressAndConnect { type Connected = connection::Connection; type Error = io::Error; - type Future = Box>; + type Future = Box + Send>; fn connect(&self) -> Self::Future { let port = self.host_and_port.port;