From 70d2f57e2106dc78939e3d78ac524aa3ba2e34c2 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Fri, 26 Oct 2018 15:49:31 -0700 Subject: [PATCH] Make `resolve` a stack module (#106) The `proxy::http::balance` module uses the `proxy::resolve::Resolve` trait to implement a `Discover`. This coupling between the balance and resolve modules prevents integrating the destination profile API such that there is a per-route, per-endpoint stack. This change makes the `balance` stack generic over a stack that produces a `Discover`. The `resolve` module now implements a stack that produces a `Discover` and is generic over a per-endpoint stack. --- src/app/main.rs | 34 ++++----- src/lib.rs | 3 + src/proxy/http/balance.rs | 155 ++++++-------------------------------- src/proxy/resolve.rs | 141 +++++++++++++++++++++++++++++++++- 4 files changed, 177 insertions(+), 156 deletions(-) diff --git a/src/app/main.rs b/src/app/main.rs index 379092be7..d8d663168 100644 --- a/src/app/main.rs +++ b/src/app/main.rs @@ -20,10 +20,8 @@ use logging; use metrics; use proxy::{ self, buffer, - http::{ - balance, client, insert_target, metrics::timestamp_request_open, normalize_uri, router, - }, - limit, reconnect, timeout + http::{client, insert_target, metrics::timestamp_request_open, normalize_uri, router}, + limit, reconnect, timeout, }; use svc::{self, Layer as _Layer, Stack as _Stack}; use tap; @@ -223,7 +221,11 @@ where let (drain_tx, drain_rx) = drain::channel(); let outbound = { - use super::outbound; + use super::outbound::{discovery::Resolve, orig_proto_upgrade, Recognize}; + use proxy::{ + http::{balance, metrics}, + resolve, + }; let http_metrics = http_metrics.clone(); @@ -252,31 +254,25 @@ where // settings. Stack layers above this operate on an `Endpoint` with // the TLS client config is marked as `NoConfig` when the endpoint // has a TLS identity. - let router_layer = router::Layer::new(outbound::Recognize::new()) + let router_stack = router::Layer::new(Recognize::new()) .and_then(limit::Layer::new(MAX_IN_FLIGHT)) .and_then(timeout::Layer::new(config.bind_timeout)) .and_then(buffer::Layer::new()) - .and_then(balance::Layer::new(outbound::discovery::Resolve::new( - resolver, - ))) - .and_then(outbound::orig_proto_upgrade::Layer::new()) + .and_then(balance::layer()) + .and_then(resolve::layer(Resolve::new(resolver))) + .and_then(orig_proto_upgrade::Layer::new()) .and_then(svc::watch::layer(tls_client_config)) - .and_then(proxy::http::metrics::Layer::new( - http_metrics, - classify::Classify, - )) + .and_then(metrics::Layer::new(http_metrics, classify::Classify)) .and_then(tap::Layer::new(tap_next_id.clone(), taps.clone())) .and_then(normalize_uri::Layer::new()) - .and_then(svc::stack_per_request::Layer::new()); - - let client = reconnect::Layer::new() + .and_then(svc::stack_per_request::Layer::new()) + .and_then(reconnect::Layer::new()) .and_then(client::Layer::new("out")) .bind(connect.clone()); let capacity = config.outbound_router_capacity; let max_idle_age = config.outbound_router_max_idle_age; - let router = router_layer - .bind(client) + let router = router_stack .make(&router::Config::new("out", capacity, max_idle_age)) .expect("outbound router"); diff --git a/src/lib.rs b/src/lib.rs index b168e9269..706d6ef70 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,9 @@ #![cfg_attr(feature = "cargo-clippy", allow(new_without_default_derive))] #![deny(warnings)] +// Stack type inference requires a deeper recursion limit +#![recursion_limit="128"] + extern crate bytes; extern crate env_logger; extern crate linkerd2_fs_watch as fs_watch; diff --git a/src/proxy/http/balance.rs b/src/proxy/http/balance.rs index 2e8f47c31..9b4521806 100644 --- a/src/proxy/http/balance.rs +++ b/src/proxy/http/balance.rs @@ -2,69 +2,40 @@ extern crate tower_balance; extern crate tower_discover; extern crate tower_h2_balance; -use futures::{Async, Poll}; +use self::tower_discover::Discover; use http; -use std::{error, fmt}; -use std::marker::PhantomData; -use std::net::SocketAddr; use std::time::Duration; use tower_h2::Body; pub use self::tower_balance::{choose::PowerOfTwoChoices, load::WithPeakEwma, Balance}; -use self::tower_discover::{Change, Discover}; pub use self::tower_h2_balance::{PendingUntilFirstData, PendingUntilFirstDataBody}; -use proxy::resolve::{Resolve, Resolution, Update}; use svc; /// Configures a stack to resolve `T` typed targets to balance requests over /// `M`-typed endpoint stacks. #[derive(Clone, Debug)] -pub struct Layer { +pub struct Layer { decay: Duration, - resolve: R, - _p: PhantomData (T, M)>, } /// Resolves `T` typed targets to balance requests over `M`-typed endpoint stacks. #[derive(Clone, Debug)] -pub struct Stack { +pub struct Stack { decay: Duration, - resolve: R, inner: M, - _p: PhantomData T>, -} - -/// Observes an `R`-typed resolution stream, using an `M`-typed endpoint stack to -/// build a service for each endpoint. -pub struct Service> { - resolution: R, - make: M, } // === impl Layer === -impl Layer -where - R: Resolve + Clone, - R::Endpoint: fmt::Debug, - M: svc::Stack + Clone, - M::Value: svc::Service< - Request = http::Request, - Response = http::Response, - >, - A: Body, - B: Body, -{ - pub const DEFAULT_DECAY: Duration = Duration::from_secs(10); - - pub fn new(resolve: R) -> Self { - Self { - resolve, - decay: Self::DEFAULT_DECAY, - _p: PhantomData, - } +pub fn layer() -> Layer { + Layer { + decay: Layer::DEFAULT_DECAY, } +} + +impl Layer { + const DEFAULT_DECAY: Duration = Duration::from_secs(10); // pub fn with_decay(self, decay: Duration) -> Self { // Self { @@ -74,125 +45,41 @@ where // } } -impl svc::Layer for Layer +impl svc::Layer for Layer where - R: Resolve + Clone, - R::Endpoint: fmt::Debug, - M: svc::Stack + Clone, - M::Value: svc::Service< - Request = http::Request, - Response = http::Response, - >, + M: svc::Stack + Clone, + M::Value: Discover, Response = http::Response>, A: Body, B: Body, { - type Value = as svc::Stack>::Value; - type Error = as svc::Stack>::Error; - type Stack = Stack; + type Value = as svc::Stack>::Value; + type Error = as svc::Stack>::Error; + type Stack = Stack; fn bind(&self, inner: M) -> Self::Stack { Stack { decay: self.decay, - resolve: self.resolve.clone(), inner, - _p: PhantomData, } } } // === impl Stack === -impl svc::Stack for Stack +impl svc::Stack for Stack where - R: Resolve, - R::Endpoint: fmt::Debug, - M: svc::Stack + Clone, - M::Value: svc::Service< - Request = http::Request, - Response = http::Response, - >, + M: svc::Stack + Clone, + M::Value: Discover, Response = http::Response>, A: Body, B: Body, { - type Value = Balance< - WithPeakEwma, PendingUntilFirstData>, - PowerOfTwoChoices, - >; + type Value = Balance, PowerOfTwoChoices>; type Error = M::Error; fn make(&self, target: &T) -> Result { - let discover = Service { - resolution: self.resolve.resolve(&target), - make: self.inner.clone(), - }; - + let discover = self.inner.make(target)?; let instrument = PendingUntilFirstData::default(); let loaded = WithPeakEwma::new(discover, self.decay, instrument); Ok(Balance::p2c(loaded)) } } - -// === impl Service === - -impl Discover for Service -where - R: Resolution, - R::Endpoint: fmt::Debug, - M: svc::Stack, - M::Value: svc::Service, -{ - type Key = SocketAddr; - type Request = ::Request; - type Response = ::Response; - type Error = ::Error; - type Service = M::Value; - type DiscoverError = Error; - - fn poll(&mut self) - -> Poll, Self::DiscoverError> - { - loop { - let up = try_ready!(self.resolution.poll().map_err(Error::Resolve)); - trace!("watch: {:?}", up); - match up { - Update::Add(addr, target) => { - // We expect the load balancer to handle duplicate inserts - // by replacing the old endpoint with the new one, so - // insertions of new endpoints and metadata changes for - // existing ones can be handled in the same way. - let svc = self.make.make(&target).map_err(Error::Stack)?; - return Ok(Async::Ready(Change::Insert(addr, svc))); - }, - Update::Remove(addr) => { - return Ok(Async::Ready(Change::Remove(addr))); - }, - } - } - } -} - -// === impl Error === - -#[derive(Debug)] -pub enum Error { - Resolve(R), - Stack(M), -} - -impl fmt::Display for Error<(), M> -where - M: fmt::Display, -{ - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - Error::Resolve(()) => unreachable!("resolution must succeed"), - Error::Stack(e) => e.fmt(f), - } - } -} - -impl error::Error for Error<(), M> -where - M: error::Error, -{ -} diff --git a/src/proxy/resolve.rs b/src/proxy/resolve.rs index f9a2b5aba..e1fcf65fe 100644 --- a/src/proxy/resolve.rs +++ b/src/proxy/resolve.rs @@ -1,5 +1,11 @@ -use futures::Poll; +extern crate tower_discover; + +use futures::{Async, Poll}; use std::net::SocketAddr; +use std::{error, fmt}; + +pub use self::tower_discover::Change; +use svc; /// Resolves `T`-typed names/addresses as a `Resolution`. pub trait Resolve { @@ -17,9 +23,138 @@ pub trait Resolution { fn poll(&mut self) -> Poll, Self::Error>; } - -#[derive(Debug, Clone)] +#[derive(Clone, Debug)] pub enum Update { Add(SocketAddr, T), Remove(SocketAddr), } + +#[derive(Clone, Debug)] +pub struct Layer { + resolve: R, +} + +#[derive(Clone, Debug)] +pub struct Stack { + resolve: R, + inner: M, +} + +/// Observes an `R`-typed resolution stream, using an `M`-typed endpoint stack to +/// build a service for each endpoint. +#[derive(Clone, Debug)] +pub struct Discover> { + resolution: R, + make: M, +} + +// === impl Layer === + +pub fn layer(resolve: R) -> Layer +where + R: Resolve + Clone, + R::Endpoint: fmt::Debug, +{ + Layer { + resolve, + } +} + +impl svc::Layer for Layer +where + R: Resolve + Clone, + R::Endpoint: fmt::Debug, + M: svc::Stack + Clone, + M::Value: svc::Service, +{ + type Value = as svc::Stack>::Value; + type Error = as svc::Stack>::Error; + type Stack = Stack; + + fn bind(&self, inner: M) -> Self::Stack { + Stack { + resolve: self.resolve.clone(), + inner, + } + } +} + +// === impl Stack === + +impl svc::Stack for Stack +where + R: Resolve, + R::Endpoint: fmt::Debug, + M: svc::Stack + Clone, + M::Value: svc::Service, +{ + type Value = Discover; + type Error = M::Error; + + fn make(&self, target: &T) -> Result { + let resolution = self.resolve.resolve(target); + Ok(Discover { + resolution, + make: self.inner.clone(), + }) + } +} + +// === impl Discover === + +impl tower_discover::Discover for Discover +where + R: Resolution, + R::Endpoint: fmt::Debug, + M: svc::Stack, + M::Value: svc::Service, +{ + type Key = SocketAddr; + type Request = ::Request; + type Response = ::Response; + type Error = ::Error; + type Service = M::Value; + type DiscoverError = Error; + + fn poll(&mut self) -> Poll, Self::DiscoverError> { + loop { + let up = try_ready!(self.resolution.poll().map_err(Error::Resolve)); + trace!("watch: {:?}", up); + match up { + Update::Add(addr, target) => { + // We expect the load balancer to handle duplicate inserts + // by replacing the old endpoint with the new one, so + // insertions of new endpoints and metadata changes for + // existing ones can be handled in the same way. + let svc = self.make.make(&target).map_err(Error::Stack)?; + return Ok(Async::Ready(Change::Insert(addr, svc))); + } + Update::Remove(addr) => { + return Ok(Async::Ready(Change::Remove(addr))); + } + } + } + } +} + +// === impl Error === + +#[derive(Debug)] +pub enum Error { + Resolve(R), + Stack(M), +} + +impl fmt::Display for Error<(), M> +where + M: fmt::Display, +{ + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + Error::Resolve(()) => unreachable!("resolution must succeed"), + Error::Stack(e) => e.fmt(f), + } + } +} + +impl error::Error for Error<(), M> where M: error::Error {}