diff --git a/src/app/main.rs b/src/app/main.rs index c2c7a6698..450fdcae3 100644 --- a/src/app/main.rs +++ b/src/app/main.rs @@ -23,7 +23,7 @@ use proxy::{ http::{ balance, client, insert_target, metrics::timestamp_request_open, normalize_uri, router, }, - limit, timeout, + limit, reconnect, timeout, }; use svc::{self, Layer as _Layer, Stack as _Stack}; use tap; @@ -269,10 +269,14 @@ where .and_then(normalize_uri::Layer::new()) .and_then(svc::stack_per_request::Layer::new()); + let client = reconnect::Layer::new() + .and_then(client::Layer::new("out")) + .bind(connect.clone()); + let capacity = config.outbound_router_capacity; let max_idle_age = config.outbound_router_max_idle_age; let router = router_layer - .bind(client::Stack::new("out", connect.clone())) + .bind(client) .make(&router::Config::new("out", capacity, max_idle_age)) .expect("outbound router"); @@ -332,11 +336,15 @@ where .and_then(normalize_uri::Layer::new()) .and_then(svc::stack_per_request::Layer::new()); + let client = reconnect::Layer::new() + .and_then(client::Layer::new("in")) + .bind(connect.clone()); + // Build a router using the above policy let capacity = config.inbound_router_capacity; let max_idle_age = config.inbound_router_max_idle_age; let router = router_layer - .bind(client::Stack::new("in", connect.clone())) + .bind(client) .make(&router::Config::new("in", capacity, max_idle_age)) .expect("inbound router"); diff --git a/src/proxy/http/client.rs b/src/proxy/http/client.rs index 1072110d2..746c6a97f 100644 --- a/src/proxy/http/client.rs +++ b/src/proxy/http/client.rs @@ -11,7 +11,6 @@ use tower_h2; use super::{h1, Settings}; use super::glue::{BodyPayload, HttpBody, HyperConnect}; use super::upgrade::{HttpConnect, Http11Upgrade}; -use super::super::Reconnect; use svc; use task::BoxExecutor; use transport::connect; @@ -26,6 +25,15 @@ pub struct Config { _p: (), } +/// Configurs an HTTP client that uses a `C`-typed connector +/// +/// The `proxy_name` is used for diagnostics (logging, mostly). +#[derive(Debug)] +pub struct Layer { + proxy_name: &'static str, + _p: PhantomData B>, +} + /// Configurs an HTTP client that uses a `C`-typed connector /// /// The `proxy_name` is used for diagnostics (logging, mostly). @@ -144,31 +152,73 @@ impl Config { } } -// === impl Stack === +// === impl Layer === -impl Stack +impl Layer where - C: svc::Stack, - C::Value: connect::Connect + Clone + Send + Sync + 'static, - B: tower_h2::Body + 'static, + B: tower_h2::Body + Send + 'static, + ::Buf: Send + 'static, { - pub fn new(proxy_name: &'static str, connect: C) -> Self { + pub fn new(proxy_name: &'static str) -> Self { Self { - connect, proxy_name, _p: PhantomData, } } } +impl Clone for Layer +where + B: tower_h2::Body + 'static, + ::Buf: Send + 'static, +{ + fn clone(&self) -> Self { + Self { + proxy_name: self.proxy_name, + _p: PhantomData, + } + } +} + +impl svc::Layer for Layer +where + T: Into + Clone, + C: svc::Stack, + C::Value: connect::Connect + Clone + Send + Sync + 'static, + ::Connected: Send, + ::Future: Send + 'static, + ::Error: error::Error + Send + Sync, + B: tower_h2::Body + Send + 'static, + ::Buf: Send + 'static, +{ + type Value = as svc::Stack>::Value; + type Error = as svc::Stack>::Error; + type Stack = Stack; + + fn bind(&self, connect: C) -> Self::Stack { + Stack { + connect, + proxy_name: self.proxy_name, + _p: PhantomData, + } + } +} + +// === impl Stack === + impl Clone for Stack where C: svc::Stack + Clone, C::Value: connect::Connect + Clone + Send + Sync + 'static, B: tower_h2::Body + 'static, + ::Buf: Send + 'static, { fn clone(&self) -> Self { - Self::new(self.proxy_name, self.connect.clone()) + Self { + proxy_name: self.proxy_name, + connect: self.connect.clone(), + _p: PhantomData, + } } } @@ -183,10 +233,7 @@ where B: tower_h2::Body + Send + 'static, ::Buf: Send + 'static, { - type Value = Reconnect< - Config, - Client, B>, - >; + type Value = Client, B>; type Error = C::Error; fn make(&self, t: &T) -> Result { @@ -196,8 +243,7 @@ where .with_settings(config.settings.clone()) .executor(); debug!("building client={:?}", config); - let client = Client::new(&config.settings, connect, executor); - Ok(Reconnect::new(config.clone(), client)) + Ok(Client::new(&config.settings, connect, executor)) } } diff --git a/src/proxy/mod.rs b/src/proxy/mod.rs index 262525323..5f735e529 100644 --- a/src/proxy/mod.rs +++ b/src/proxy/mod.rs @@ -6,13 +6,12 @@ pub mod buffer; pub mod http; pub mod limit; mod protocol; -mod reconnect; +pub mod reconnect; pub mod resolve; pub mod server; mod tcp; pub mod timeout; -pub use self::reconnect::Reconnect; pub use self::resolve::{Resolve, Resolution}; pub use self::server::{Server, Source}; diff --git a/src/proxy/reconnect.rs b/src/proxy/reconnect.rs index a27f37cda..2f9b2b364 100644 --- a/src/proxy/reconnect.rs +++ b/src/proxy/reconnect.rs @@ -1,15 +1,26 @@ -use std::fmt; - use futures::{task, Async, Future, Poll}; +use std::fmt; +use std::marker::PhantomData; use tower_reconnect; use svc; +#[derive(Debug)] +pub struct Layer { + _p: PhantomData (T, M)>, +} + +#[derive(Debug)] +pub struct Stack { + inner: M, + _p: PhantomData T>, +} + /// Wraps `tower_reconnect`, handling errors. /// /// Ensures that the underlying service is ready and, if the underlying service /// fails to become ready, rebuilds the inner stack. -pub struct Reconnect +pub struct Service where T: fmt::Debug, N: svc::NewService, @@ -29,26 +40,80 @@ pub struct ResponseFuture { inner: as svc::Service>::Future, } -// ===== impl Reconnect ===== +// === impl Layer === - -impl Reconnect +impl Layer where T: fmt::Debug, - N: svc::NewService, - N::InitError: fmt::Display, + M: svc::Stack, + M::Value: svc::NewService, { - pub fn new(target: T, new_service: N) -> Self { - let inner = tower_reconnect::Reconnect::new(new_service); + pub fn new() -> Self { Self { - target, - inner, - mute_connect_error_log: false, + _p: PhantomData, } } } -impl svc::Service for Reconnect +impl Clone for Layer { + fn clone(&self) -> Self { + Self { + _p: PhantomData, + } + } +} + +impl svc::Layer for Layer +where + T: Clone + fmt::Debug, + M: svc::Stack, + M::Value: svc::NewService, +{ + type Value = as svc::Stack>::Value; + type Error = as svc::Stack>::Error; + type Stack = Stack; + + fn bind(&self, inner: M) -> Self::Stack { + Stack { + inner, + _p: PhantomData, + } + } +} + +// === impl Stack === + +impl Clone for Stack { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + _p: PhantomData, + } + } +} + +impl svc::Stack for Stack +where + T: Clone + fmt::Debug, + M: svc::Stack, + M::Value: svc::NewService, +{ + type Value = Service; + type Error = M::Error; + + fn make(&self, target: &T) -> Result { + let new_service = self.inner.make(target)?; + Ok(Service { + inner: tower_reconnect::Reconnect::new(new_service), + target: target.clone(), + mute_connect_error_log: false, + }) + } +} + +// === impl Service === + +impl svc::Service for Service where T: fmt::Debug, N: svc::NewService, @@ -63,13 +128,11 @@ where match self.inner.poll_ready() { Ok(Async::NotReady) => Ok(Async::NotReady), Ok(ready) => { - trace!("poll_ready: ready for business"); self.mute_connect_error_log = false; Ok(ready) } Err(tower_reconnect::Error::Inner(err)) => { - trace!("poll_ready: inner error, debouncing"); self.mute_connect_error_log = false; Err(err) } @@ -110,7 +173,7 @@ where } } -impl fmt::Debug for Reconnect { +impl fmt::Debug for Service { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fmt.debug_struct("Reconnect") .field("target", &self.target)