diff --git a/lib/stack/src/either.rs b/lib/stack/src/either.rs index 3da24b9a1..74582f4d8 100644 --- a/lib/stack/src/either.rs +++ b/lib/stack/src/either.rs @@ -4,7 +4,7 @@ use futures::Poll; use svc; /// Describes two alternate `Layer`s, `Stacks`s or `Service`s. -#[derive(Debug)] +#[derive(Clone, Debug)] pub enum Either { A(A), B(B), diff --git a/lib/stack/src/layer.rs b/lib/stack/src/layer.rs index b67e2635e..8dd285069 100644 --- a/lib/stack/src/layer.rs +++ b/lib/stack/src/layer.rs @@ -10,15 +10,15 @@ use std::marker::PhantomData; /// ```ignore /// impl> Layer for BalanceLayer { ... } /// ``` -pub trait Layer> { +pub trait Layer> { type Value; type Error; type Stack: super::Stack; - /// Produce a `Stack` value from a `M` value. - fn bind(&self, next: M) -> Self::Stack; + /// Produces a `Stack` value from a `M` value. + fn bind(&self, next: S) -> Self::Stack; - /// Compose this `Layer` with another. + /// Produces a new Layer with this layer wrapping the provided inner layer. fn and_then(self, inner: L) -> AndThen where @@ -32,6 +32,42 @@ pub trait Layer> { _p: PhantomData, } } + + /// Produces a new Layer with another layer wrapping this one. + fn push(self, outer: L) + -> AndThen + where + L: Layer, + Self: Sized, + { + AndThen { + outer, + inner: self, + _p: PhantomData, + } + } + + /// Wraps this layer such that stack errors are modified by `map_err`. + fn map_err(self, map_err: M) + -> AndThen, Self> + where + Self: Sized, + M: super::map_err::MapErr, + super::map_err::Layer: Layer, + { + super::map_err::layer(map_err).and_then(self) + } +} + +/// The identity layer. +impl> Layer for () { + type Value = M::Value; + type Error = M::Error; + type Stack = M; + + fn bind(&self, inner: M) -> M { + inner + } } /// Combines two `Layers` as one. diff --git a/lib/stack/src/lib.rs b/lib/stack/src/lib.rs index 3de44c100..920fb805e 100644 --- a/lib/stack/src/lib.rs +++ b/lib/stack/src/lib.rs @@ -3,10 +3,11 @@ extern crate futures; extern crate log; extern crate tower_service as svc; -use std::marker::PhantomData; - pub mod either; pub mod layer; +mod map_err; +pub mod map_target; +pub mod phantom_data; pub mod stack_new_service; pub mod stack_per_request; pub mod watch; @@ -39,29 +40,45 @@ pub trait Stack { { layer.bind(self) } + + /// Wraps this `Stack` such that errors are altered by `map_err` + fn map_err(self, map_err: M) -> map_err::Stack + where + M: map_err::MapErr, + Self: Sized, + { + map_err::stack(self, map_err) + } } /// Implements `Stack` for any `T` by cloning a `V`-typed value. -#[derive(Debug)] -pub struct Shared(V, PhantomData T>); +pub mod shared { + use std::{error, fmt}; -impl Shared { - pub fn new(v: V) -> Self { - Shared(v, PhantomData) + pub fn stack(v: V) -> Stack { + Stack(v) } -} -impl Clone for Shared { - fn clone(&self) -> Self { - Self::new(self.0.clone()) + #[derive(Clone, Debug)] + pub struct Stack(V); + + #[derive(Debug)] + pub enum Error {} + + impl super::Stack for Stack { + type Value = V; + type Error = Error; + + fn make(&self, _: &T) -> Result { + Ok(self.0.clone()) + } } -} -impl Stack for Shared { - type Value = V; - type Error = (); - - fn make(&self, _: &T) -> Result { - Ok(self.0.clone()) + impl fmt::Display for Error { + fn fmt(&self, _: &mut fmt::Formatter) -> fmt::Result { + unreachable!() + } } + + impl error::Error for Error {} } diff --git a/lib/stack/src/map_err.rs b/lib/stack/src/map_err.rs new file mode 100644 index 000000000..8b4107087 --- /dev/null +++ b/lib/stack/src/map_err.rs @@ -0,0 +1,72 @@ +pub fn layer(map_err: M) -> Layer +where + M: MapErr, +{ + Layer(map_err) +} + +pub(super) fn stack(inner: S, map_err: M) -> Stack +where + S: super::Stack, + M: MapErr, +{ + Stack { + inner, + map_err, + } +} + +pub trait MapErr { + type Output; + + fn map_err(&self, e: Input) -> Self::Output; +} + +#[derive(Clone, Debug)] +pub struct Layer(M); + +#[derive(Clone, Debug)] +pub struct Stack { + inner: S, + map_err: M, +} + +impl super::Layer for Layer +where + S: super::Stack, + M: MapErr + Clone, +{ + type Value = as super::Stack>::Value; + type Error = as super::Stack>::Error; + type Stack = Stack; + + fn bind(&self, inner: S) -> Self::Stack { + Stack { + inner, + map_err: self.0.clone(), + } + } +} + +impl super::Stack for Stack +where + S: super::Stack, + M: MapErr, +{ + type Value = S::Value; + type Error = M::Output; + + fn make(&self, target: &T) -> Result { + self.inner.make(target).map_err(|e| self.map_err.map_err(e)) + } +} + +impl MapErr for F +where + F: Fn(I) -> O, +{ + type Output = O; + fn map_err(&self, i: I) -> O { + (self)(i) + } +} diff --git a/lib/stack/src/map_target.rs b/lib/stack/src/map_target.rs new file mode 100644 index 000000000..5d1e02794 --- /dev/null +++ b/lib/stack/src/map_target.rs @@ -0,0 +1,62 @@ + +pub fn layer(map_target: M) -> Layer +where + M: MapTarget, +{ + Layer(map_target) +} + +pub trait MapTarget { + type Target; + + fn map_target(&self, t: &T) -> Self::Target; +} + +#[derive(Clone, Debug)] +pub struct Layer(M); + +#[derive(Clone, Debug)] +pub struct Stack { + inner: S, + map_target: M, +} + +impl super::Layer for Layer +where + S: super::Stack, + M: MapTarget + Clone, +{ + type Value = as super::Stack>::Value; + type Error = as super::Stack>::Error; + type Stack = Stack; + + fn bind(&self, inner: S) -> Self::Stack { + Stack { + inner, + map_target: self.0.clone(), + } + } +} + +impl super::Stack for Stack +where + S: super::Stack, + M: MapTarget, +{ + type Value = S::Value; + type Error = S::Error; + + fn make(&self, target: &T) -> Result { + self.inner.make(&self.map_target.map_target(target)) + } +} + +impl MapTarget for F +where + F: Fn(&T) -> U, +{ + type Target = U; + fn map_target(&self, t: &T) -> U { + (self)(t) + } +} diff --git a/lib/stack/src/phantom_data.rs b/lib/stack/src/phantom_data.rs new file mode 100644 index 000000000..29a7c27b6 --- /dev/null +++ b/lib/stack/src/phantom_data.rs @@ -0,0 +1,39 @@ +use std::marker::PhantomData; + +pub fn layer() -> Layer +where + M: super::Stack, +{ + Layer(PhantomData) +} + +#[derive(Clone, Debug)] +pub struct Layer(PhantomData (T, M)>); + +#[derive(Clone, Debug)] +pub struct Stack { + inner: M, + _p: PhantomData T>, +} + +impl> super::Layer for Layer { + type Value = as super::Stack>::Value; + type Error = as super::Stack>::Error; + type Stack = Stack; + + fn bind(&self, inner: M) -> Self::Stack { + Stack { + inner, + _p: PhantomData + } + } +} + +impl> super::Stack for Stack { + type Value = M::Value; + type Error = M::Error; + + fn make(&self, target: &T) -> Result { + self.inner.make(target) + } +} diff --git a/lib/stack/src/stack_per_request.rs b/lib/stack/src/stack_per_request.rs index 9a5c40981..50004ca51 100644 --- a/lib/stack/src/stack_per_request.rs +++ b/lib/stack/src/stack_per_request.rs @@ -2,7 +2,6 @@ use futures::Poll; use std::fmt; -use std::marker::PhantomData; use svc; @@ -12,14 +11,13 @@ pub trait ShouldStackPerRequest { /// A `Layer` produces a `Service` `Stack` that creates a new service for each /// request. -#[derive(Debug)] -pub struct Layer(PhantomData (T, M)>); +#[derive(Clone, Debug)] +pub struct Layer(); /// A `Stack` that builds a new `Service` for each request it serves. -#[derive(Debug)] -pub struct Stack> { +#[derive(Clone, Debug)] +pub struct Stack { inner: M, - _p: PhantomData T>, } /// A `Service` that uses a new inner service for each request. @@ -36,6 +34,7 @@ pub struct Service> { /// A helper that asserts `M` can successfully build services for a specific /// value of `T`. +#[derive(Clone, Debug)] struct StackValid> { target: T, make: M, @@ -43,63 +42,28 @@ struct StackValid> { // === Layer === -impl Layer -where - T: ShouldStackPerRequest + Clone, - N: super::Stack + Clone, - N::Error: fmt::Debug, -{ - pub fn new() -> Self { - Layer(PhantomData) - } +pub fn layer() -> Layer { + Layer() } -impl Clone for Layer +impl super::Layer for Layer where T: ShouldStackPerRequest + Clone, N: super::Stack + Clone, N::Error: fmt::Debug, { - fn clone(&self) -> Self { - Self::new() - } -} - -impl super::Layer for Layer -where - T: ShouldStackPerRequest + Clone, - N: super::Stack + Clone, - N::Error: fmt::Debug, -{ - type Value = as super::Stack>::Value; - type Error = as super::Stack>::Error; - type Stack = Stack; + type Value = as super::Stack>::Value; + type Error = as super::Stack>::Error; + type Stack = Stack; fn bind(&self, inner: N) -> Self::Stack { - Stack { - inner, - _p: PhantomData, - } + Stack { inner } } } // === Stack === -impl Clone for Stack -where - T: ShouldStackPerRequest + Clone, - N: super::Stack + Clone, - N::Error: fmt::Debug, -{ - fn clone(&self) -> Self { - Self { - inner: self.inner.clone(), - _p: PhantomData, - } - } -} - -impl super::Stack for Stack +impl super::Stack for Stack where T: ShouldStackPerRequest + Clone, N: super::Stack + Clone, @@ -160,6 +124,19 @@ where } } +impl Clone for Service +where + T: ShouldStackPerRequest + Clone, + N: super::Stack + Clone, +{ + fn clone(&self) -> Self { + Self { + next: None, + make: self.make.clone(), + } + } +} + // === StackValid === impl StackValid diff --git a/lib/stack/src/watch.rs b/lib/stack/src/watch.rs index 0d0f8c6ec..0614d84ad 100644 --- a/lib/stack/src/watch.rs +++ b/lib/stack/src/watch.rs @@ -3,6 +3,7 @@ extern crate futures_watch; use self::futures_watch::Watch; use futures::{future::MapErr, Async, Future, Poll, Stream}; use std::{error, fmt}; +use std::marker::PhantomData; use svc; @@ -14,14 +15,16 @@ pub trait WithUpdate { } #[derive(Debug)] -pub struct Layer { +pub struct Layer, U, M> { watch: Watch, + _p: PhantomData (T, M)>, } #[derive(Debug)] -pub struct Stack { +pub struct Stack, U, M> { watch: Watch, inner: M, + _p: PhantomData T>, } /// A Service that updates itself as a Watch updates. @@ -45,47 +48,58 @@ pub struct CloneUpdate {} // === impl Layer === -pub fn layer(watch: Watch) -> Layer { - Layer { watch } -} - -impl Clone for Layer { - fn clone(&self) -> Self { - Self { - watch: self.watch.clone(), - } - } -} - -impl super::Layer for Layer +pub fn layer(watch: Watch) -> Layer where T: WithUpdate + Clone, M: super::Stack + Clone, { - type Value = as super::Stack>::Value; - type Error = as super::Stack>::Error; - type Stack = Stack; + Layer { + watch, + _p: PhantomData, + } +} + +impl Clone for Layer +where + T: WithUpdate + Clone, + M: super::Stack + Clone, +{ + fn clone(&self) -> Self { + layer(self.watch.clone()) + } +} + +impl super::Layer for Layer +where + T: WithUpdate + Clone, + M: super::Stack + Clone, +{ + type Value = as super::Stack>::Value; + type Error = as super::Stack>::Error; + type Stack = Stack; fn bind(&self, inner: M) -> Self::Stack { Stack { inner, watch: self.watch.clone(), + _p: PhantomData, } } } // === impl Stack === -impl Clone for Stack { +impl, U, M: Clone> Clone for Stack { fn clone(&self) -> Self { Self { inner: self.inner.clone(), watch: self.watch.clone(), + _p: PhantomData, } } } -impl super::Stack for Stack +impl super::Stack for Stack where T: WithUpdate + Clone, M: super::Stack + Clone, diff --git a/src/app/inbound.rs b/src/app/inbound.rs index 5c0601045..d495d0426 100644 --- a/src/app/inbound.rs +++ b/src/app/inbound.rs @@ -130,10 +130,8 @@ pub mod orig_proto_downgrade { // === impl Layer === - impl Layer { - pub fn new() -> Self { - Layer - } + pub fn layer() -> Layer { + Layer } impl svc::Layer for Layer diff --git a/src/app/main.rs b/src/app/main.rs index d8d663168..c471a8332 100644 --- a/src/app/main.rs +++ b/src/app/main.rs @@ -221,7 +221,7 @@ where let (drain_tx, drain_rx) = drain::channel(); let outbound = { - use super::outbound::{discovery::Resolve, orig_proto_upgrade, Recognize}; + use super::outbound::{discovery::Resolve, orig_proto_upgrade, Endpoint, Recognize}; use proxy::{ http::{balance, metrics}, resolve, @@ -234,54 +234,53 @@ where let accept = transport_metrics.accept("outbound").bind(()); // Establishes connections to remote peers. - let connect = transport_metrics - .connect("outbound") - .and_then(proxy::timeout::Layer::new(config.outbound_connect_timeout)) - .bind(connect::Stack::new()); + let connect = connect::Stack::new() + .push(proxy::timeout::layer(config.outbound_connect_timeout)) + .push(transport_metrics.connect("outbound")); - // As HTTP requests are accepted, we add some request extensions - // including metadata about the request's origin. - let source_layer = - timestamp_request_open::Layer::new().and_then(insert_target::Layer::new()); + let client_stack = connect + .clone() + .push(client::layer("out")) + .push(svc::stack::map_target::layer(|ep: &Endpoint| { + client::Config::from(ep.clone()) + })) + .push(reconnect::layer()); - // `normalize_uri` and `stack_per_request` are applied on the stack - // selectively. For HTTP/2 stacks, for instance, neither service will be - // employed. - // - // The TLS status of outbound requests depends on the local - // configuration. As the local configuration changes, the inner - // stack (including a Client) is rebuilt with the appropriate - // settings. Stack layers above this operate on an `Endpoint` with - // the TLS client config is marked as `NoConfig` when the endpoint - // has a TLS identity. - let router_stack = router::Layer::new(Recognize::new()) - .and_then(limit::Layer::new(MAX_IN_FLIGHT)) - .and_then(timeout::Layer::new(config.bind_timeout)) - .and_then(buffer::Layer::new()) - .and_then(balance::layer()) - .and_then(resolve::layer(Resolve::new(resolver))) - .and_then(orig_proto_upgrade::Layer::new()) - .and_then(svc::watch::layer(tls_client_config)) - .and_then(metrics::Layer::new(http_metrics, classify::Classify)) - .and_then(tap::Layer::new(tap_next_id.clone(), taps.clone())) - .and_then(normalize_uri::Layer::new()) - .and_then(svc::stack_per_request::Layer::new()) - .and_then(reconnect::Layer::new()) - .and_then(client::Layer::new("out")) - .bind(connect.clone()); + let endpoint_stack = client_stack + .push(svc::stack_per_request::layer()) + .push(normalize_uri::layer()) + .push(orig_proto_upgrade::layer()) + .push(tap::layer(tap_next_id.clone(), taps.clone())) + .push(metrics::layer(http_metrics, classify::Classify)) + .push(svc::watch::layer(tls_client_config)); + + let dst_router_stack = endpoint_stack + .push(resolve::layer(Resolve::new(resolver))) + .push(balance::layer()) + .push(buffer::layer()) + .push(timeout::layer(config.bind_timeout)) + .push(limit::layer(MAX_IN_FLIGHT)) + .push(router::layer(Recognize::new())); let capacity = config.outbound_router_capacity; let max_idle_age = config.outbound_router_max_idle_age; - let router = router_stack + let router = dst_router_stack .make(&router::Config::new("out", capacity, max_idle_age)) .expect("outbound router"); + // As HTTP requests are accepted, we add some request extensions + // including metadata about the request's origin. + let server_stack = svc::stack::phantom_data::layer() + .push(insert_target::layer()) + .push(timestamp_request_open::layer()) + .bind(svc::shared::stack(router)); + serve( "out", outbound_listener, accept, connect, - source_layer.bind(svc::Shared::new(router)), + server_stack.map_err(|_| {}), config.outbound_ports_disable_protocol_detection, get_original_dst.clone(), drain_rx.clone(), @@ -289,27 +288,17 @@ where }; let inbound = { - use super::inbound; + use super::inbound::{self, Endpoint}; + use proxy::http::metrics; // As the inbound proxy accepts connections, we don't do any // special transport-level handling. let accept = transport_metrics.accept("inbound").bind(()); // Establishes connections to the local application. - let connect = transport_metrics - .connect("inbound") - .and_then(proxy::timeout::Layer::new(config.inbound_connect_timeout)) - .bind(connect::Stack::new()); - - // As HTTP requests are accepted, we add some request extensions - // including metadata about the request's origin. - // - // Furthermore, HTTP/2 requests may be downgraded to HTTP/1.1 per - // `orig-proto` headers. This happens in the source stack so that - // the router need not detect whether a request _will be_ downgraded. - let source_layer = timestamp_request_open::Layer::new() - .and_then(insert_target::Layer::new()) - .and_then(inbound::orig_proto_downgrade::Layer::new()); + let connect = connect::Stack::new() + .push(proxy::timeout::layer(config.inbound_connect_timeout)) + .push(transport_metrics.connect("inbound")); // A stack configured by `router::Config`, responsible for building // a router made of route stacks configured by `inbound::Endpoint`. @@ -321,35 +310,46 @@ where // selectively. For HTTP/2 stacks, for instance, neither service will be // employed. let default_fwd_addr = config.inbound_forward.map(|a| a.into()); - let router_layer = router::Layer::new(inbound::Recognize::new(default_fwd_addr)) - .and_then(limit::Layer::new(MAX_IN_FLIGHT)) - .and_then(buffer::Layer::new()) - .and_then(proxy::http::metrics::Layer::new( - http_metrics, - classify::Classify, - )) - .and_then(tap::Layer::new(tap_next_id, taps)) - .and_then(normalize_uri::Layer::new()) - .and_then(svc::stack_per_request::Layer::new()); - - let client = reconnect::Layer::new() - .and_then(client::Layer::new("in")) - .bind(connect.clone()); + let stack = connect + .clone() + .push(client::layer("in")) + .push(svc::stack::map_target::layer(|ep: &Endpoint| { + client::Config::from(ep.clone()) + })) + .push(reconnect::layer()) + .push(svc::stack_per_request::layer()) + .push(normalize_uri::layer()) + .push(tap::layer(tap_next_id, taps)) + .push(metrics::layer(http_metrics, classify::Classify)) + .push(buffer::layer()) + .push(limit::layer(MAX_IN_FLIGHT)) + .push(router::layer(inbound::Recognize::new(default_fwd_addr))); // Build a router using the above policy let capacity = config.inbound_router_capacity; let max_idle_age = config.inbound_router_max_idle_age; - let router = router_layer - .bind(client) + let router = stack .make(&router::Config::new("in", capacity, max_idle_age)) .expect("inbound router"); + // As HTTP requests are accepted, we add some request extensions + // including metadata about the request's origin. + // + // Furthermore, HTTP/2 requests may be downgraded to HTTP/1.1 per + // `orig-proto` headers. This happens in the source stack so that + // the router need not detect whether a request _will be_ downgraded. + let source_stack = svc::stack::phantom_data::layer() + .push(inbound::orig_proto_downgrade::layer()) + .push(insert_target::layer()) + .push(timestamp_request_open::layer()) + .bind(svc::shared::stack(router)); + serve( "in", inbound_listener, accept, connect, - source_layer.bind(svc::Shared::new(router)), + source_stack.map_err(|_| {}), config.inbound_ports_disable_protocol_detection, get_original_dst.clone(), drain_rx.clone(), diff --git a/src/app/outbound.rs b/src/app/outbound.rs index a66224a60..9f1e9ac21 100644 --- a/src/app/outbound.rs +++ b/src/app/outbound.rs @@ -214,13 +214,10 @@ pub mod orig_proto_upgrade { inner: M, } - impl Layer { - pub fn new() -> Self { - Layer - } + pub fn layer() -> Layer { + Layer } - impl svc::Layer for Layer where M: svc::Stack, diff --git a/src/lib.rs b/src/lib.rs index 706d6ef70..b168e9269 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,9 +2,6 @@ #![cfg_attr(feature = "cargo-clippy", allow(new_without_default_derive))] #![deny(warnings)] -// Stack type inference requires a deeper recursion limit -#![recursion_limit="128"] - extern crate bytes; extern crate env_logger; extern crate linkerd2_fs_watch as fs_watch; diff --git a/src/proxy/buffer.rs b/src/proxy/buffer.rs index ad3c20423..a6d6fcdde 100644 --- a/src/proxy/buffer.rs +++ b/src/proxy/buffer.rs @@ -1,15 +1,15 @@ extern crate tower_buffer; -use std::fmt; +use std::{error, fmt}; -pub use self::tower_buffer::{Buffer, SpawnError}; +pub use self::tower_buffer::{Buffer, Error as ServiceError, SpawnError}; use logging; use svc; /// Wraps `Service` stacks with a `Buffer`. #[derive(Debug, Clone)] -pub struct Layer; +pub struct Layer(); /// Produces `Service`s wrapped with a `Buffer` #[derive(Debug, Clone)] @@ -24,10 +24,8 @@ pub enum Error { // === impl Layer === -impl Layer { - pub fn new() -> Self { - Layer - } +pub fn layer() -> Layer { + Layer() } impl svc::Layer for Layer @@ -43,15 +41,12 @@ where type Stack = Stack; fn bind(&self, inner: M) -> Self::Stack { - Stack { - inner, - } + Stack { inner } } } // === impl Stack === - impl svc::Stack for Stack where T: fmt::Display + Clone + Send + Sync + 'static, @@ -80,3 +75,21 @@ impl fmt::Debug for Error { } } } + +impl fmt::Display for Error { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + match self { + Error::Stack(e) => fmt::Display::fmt(e, fmt), + Error::Spawn(_) => write!(fmt, "Stack built without an executor"), + } + } +} + +impl error::Error for Error { + fn cause(&self) -> Option<&error::Error> { + match self { + Error::Stack(e) => e.cause(), + Error::Spawn(_) => None, + } + } +} diff --git a/src/proxy/http/balance.rs b/src/proxy/http/balance.rs index 9b4521806..53aa8a6bf 100644 --- a/src/proxy/http/balance.rs +++ b/src/proxy/http/balance.rs @@ -2,9 +2,9 @@ extern crate tower_balance; extern crate tower_discover; extern crate tower_h2_balance; -use self::tower_discover::Discover; use http; use std::time::Duration; +use self::tower_discover::Discover; use tower_h2::Body; pub use self::tower_balance::{choose::PowerOfTwoChoices, load::WithPeakEwma, Balance}; diff --git a/src/proxy/http/client.rs b/src/proxy/http/client.rs index 746c6a97f..7808085a8 100644 --- a/src/proxy/http/client.rs +++ b/src/proxy/http/client.rs @@ -154,16 +154,14 @@ impl Config { // === impl Layer === -impl Layer +pub fn layer(proxy_name: &'static str) -> Layer where B: tower_h2::Body + Send + 'static, ::Buf: Send + 'static, { - pub fn new(proxy_name: &'static str) -> Self { - Self { - proxy_name, - _p: PhantomData, - } + Layer { + proxy_name, + _p: PhantomData, } } @@ -180,9 +178,8 @@ where } } -impl svc::Layer for Layer +impl svc::Layer for Layer where - T: Into + Clone, C: svc::Stack, C::Value: connect::Connect + Clone + Send + Sync + 'static, ::Connected: Send, @@ -191,8 +188,8 @@ where B: tower_h2::Body + Send + 'static, ::Buf: Send + 'static, { - type Value = as svc::Stack>::Value; - type Error = as svc::Stack>::Error; + type Value = as svc::Stack>::Value; + type Error = as svc::Stack>::Error; type Stack = Stack; fn bind(&self, connect: C) -> Self::Stack { @@ -222,9 +219,8 @@ where } } -impl svc::Stack for Stack +impl svc::Stack for Stack where - T: Into + Clone, C: svc::Stack, C::Value: connect::Connect + Clone + Send + Sync + 'static, ::Connected: Send, @@ -236,13 +232,12 @@ where type Value = Client, B>; type Error = C::Error; - fn make(&self, t: &T) -> Result { - let config = t.clone().into(); + fn make(&self, config: &Config) -> Result { + debug!("building client={:?}", config); let connect = self.connect.make(&config.target)?; let executor = ::logging::Client::proxy(self.proxy_name, config.target.addr) .with_settings(config.settings.clone()) .executor(); - debug!("building client={:?}", config); Ok(Client::new(&config.settings, connect, executor)) } } @@ -261,7 +256,7 @@ where ::Buf: Send + 'static, { /// Create a new `Client`, bound to a specific protocol (HTTP/1 or HTTP/2). - fn new(settings: &Settings, connect: C, executor: E) -> Self { + pub fn new(settings: &Settings, connect: C, executor: E) -> Self { match settings { Settings::Http1 { was_absolute_form, .. } => { let h1 = hyper::Client::builder() diff --git a/src/proxy/http/insert_target.rs b/src/proxy/http/insert_target.rs index ed4318082..37f7ff1f4 100644 --- a/src/proxy/http/insert_target.rs +++ b/src/proxy/http/insert_target.rs @@ -21,10 +21,8 @@ pub struct Service { // === impl Layer === -impl Layer { - pub fn new() -> Self { - Layer - } +pub fn layer() -> Layer { + Layer } impl svc::Layer for Layer diff --git a/src/proxy/http/metrics/mod.rs b/src/proxy/http/metrics/mod.rs index baa8fd8fb..613ad0e56 100644 --- a/src/proxy/http/metrics/mod.rs +++ b/src/proxy/http/metrics/mod.rs @@ -12,7 +12,7 @@ mod service; pub mod timestamp_request_open; pub use self::report::Report; -pub use self::service::Layer; +pub use self::service::layer; pub fn new(retain_idle: Duration) -> (Arc>>, Report) where diff --git a/src/proxy/http/metrics/service.rs b/src/proxy/http/metrics/service.rs index 40846d2ae..531c02743 100644 --- a/src/proxy/http/metrics/service.rs +++ b/src/proxy/http/metrics/service.rs @@ -91,30 +91,27 @@ where // ===== impl Stack ===== -impl Layer +pub fn layer(registry: Arc>>, classify: C) + -> Layer where K: Clone + Hash + Eq, C: Classify + Clone, C::Class: Hash + Eq, C::ClassifyResponse: Send + Sync + 'static, + T: Clone + Debug, + K: From, + M: svc::Stack, + M::Value: svc::Service< + Request = http::Request>, + Response = http::Response, + >, + A: tower_h2::Body, + B: tower_h2::Body, { - pub fn new(registry: Arc>>, classify: C) -> Self - where - T: Clone + Debug, - K: From, - M: svc::Stack, - M::Value: svc::Service< - Request = http::Request>, - Response = http::Response, - >, - A: tower_h2::Body, - B: tower_h2::Body, - { - Self { - classify, - registry, - _p: PhantomData, - } + Layer { + classify, + registry, + _p: PhantomData, } } diff --git a/src/proxy/http/metrics/timestamp_request_open.rs b/src/proxy/http/metrics/timestamp_request_open.rs index 6dbc35827..9188cf1c7 100644 --- a/src/proxy/http/metrics/timestamp_request_open.rs +++ b/src/proxy/http/metrics/timestamp_request_open.rs @@ -24,8 +24,8 @@ pub struct TimestampRequestOpen { } /// Layers a `TimestampRequestOpen` middleware on an HTTP client. -#[derive(Debug)] -pub struct Layer(::std::marker::PhantomData (M)>); +#[derive(Clone, Debug)] +pub struct Layer(); /// Uses an `M`-typed `Stack` to build a `TimestampRequestOpen` service. #[derive(Clone, Debug)] @@ -54,19 +54,11 @@ where // === impl Layer === -impl Layer { - pub fn new() -> Self { - Layer(::std::marker::PhantomData) - } +pub fn layer() -> Layer { + Layer() } -impl Clone for Layer { - fn clone(&self) -> Self { - Self::new() - } -} - -impl svc::Layer for Layer +impl svc::Layer for Layer where M: svc::Stack, M::Value: svc::Service>, diff --git a/src/proxy/http/normalize_uri.rs b/src/proxy/http/normalize_uri.rs index 1359c6ebb..9adf27361 100644 --- a/src/proxy/http/normalize_uri.rs +++ b/src/proxy/http/normalize_uri.rs @@ -1,6 +1,5 @@ use futures::Poll; use http; -use std::marker::PhantomData; use super::h1; use svc; @@ -9,77 +8,43 @@ pub trait ShouldNormalizeUri { fn should_normalize_uri(&self) -> bool; } -pub struct Layer(PhantomData (T, M)>); +#[derive(Clone, Debug)] +pub struct Layer(); -pub struct Stack> { +#[derive(Clone, Debug)] +pub struct Stack { inner: N, - _p: PhantomData, } -#[derive(Copy, Clone, Debug)] +#[derive(Clone, Debug)] pub struct Service { inner: S, } // === impl Layer === -impl Layer -where - T: ShouldNormalizeUri, - M: svc::Stack, - M::Value: svc::Service>, -{ - pub fn new() -> Self { - Layer(PhantomData) - } +pub fn layer() -> Layer { + Layer() } -impl Clone for Layer +impl svc::Layer for Layer where T: ShouldNormalizeUri, M: svc::Stack, M::Value: svc::Service>, { - fn clone(&self) -> Self { - Self::new() - } -} - -impl svc::Layer for Layer -where - T: ShouldNormalizeUri, - M: svc::Stack, - M::Value: svc::Service>, -{ - type Value = as svc::Stack>::Value; - type Error = as svc::Stack>::Error; - type Stack = Stack; + type Value = as svc::Stack>::Value; + type Error = as svc::Stack>::Error; + type Stack = Stack; fn bind(&self, inner: M) -> Self::Stack { - Stack { - inner, - _p: PhantomData, - } + Stack { inner } } } // === impl Stack === -impl Clone for Stack -where - T: ShouldNormalizeUri, - M: svc::Stack + Clone, - M::Value: svc::Service>, -{ - fn clone(&self) -> Self { - Self { - inner: self.inner.clone(), - _p: PhantomData, - } - } -} - -impl svc::Stack for Stack +impl svc::Stack for Stack where T: ShouldNormalizeUri, M: svc::Stack, diff --git a/src/proxy/http/router.rs b/src/proxy/http/router.rs index a9f29d8d6..4ab8e0217 100644 --- a/src/proxy/http/router.rs +++ b/src/proxy/http/router.rs @@ -2,9 +2,9 @@ use futures::{Future, Poll}; use h2; use http; use http::header::CONTENT_LENGTH; -use std::{error, fmt}; use std::marker::PhantomData; use std::time::Duration; +use std::{error, fmt}; use svc; @@ -25,17 +25,14 @@ pub struct Config { /// A `Rec`-typed `Recognize` instance is used to produce a target for each /// `Req`-typed request. If the router doesn't already have a `Service` for this /// target, it uses a `Stk`-typed `Service` stack. -#[derive(Debug)] -pub struct Layer { +#[derive(Clone, Debug)] +pub struct Layer> { recognize: Rec, - _p: PhantomData (Req, Stk)>, + _p: PhantomData Req>, } -pub struct Stack -where - Rec: Recognize, - Stk: svc::Stack + Clone, -{ +#[derive(Clone, Debug)] +pub struct Stack, Stk> { recognize: Rec, inner: Stk, _p: PhantomData Req>, @@ -76,27 +73,27 @@ impl fmt::Display for Config { // === impl Layer === -impl Layer +pub fn layer(recognize: Rec) -> Layer where - Rec: Recognize + Clone, + Rec: Recognize + Clone + Send + Sync + 'static, { - pub fn new(recognize: Rec) -> Self { - Layer { recognize, _p: PhantomData } + Layer { + recognize, + _p: PhantomData, } } -impl svc::Layer for Layer +impl svc::Layer for Layer where - Rec: Recognize + Clone, + Rec: Recognize + Clone + Send + Sync + 'static, Stk: svc::Stack + Clone + Send + Sync + 'static, - Stk::Value: svc::Service>, + Stk::Value: svc::Service>, ::Error: error::Error, Stk::Error: fmt::Debug, B: Default + Send + 'static, - Stack: svc::Stack, { - type Value = as svc::Stack>::Value; - type Error = as svc::Stack>::Error; + type Value = as svc::Stack>::Value; + type Error = as svc::Stack>::Error; type Stack = Stack; fn bind(&self, inner: Stk) -> Self::Stack { @@ -110,20 +107,6 @@ where // === impl Stack === -impl Clone for Stack -where - Rec: Recognize + Clone, - Stk: svc::Stack + Clone, -{ - fn clone(&self) -> Self { - Self { - recognize: self.recognize.clone(), - inner: self.inner.clone(), - _p: PhantomData - } - } -} - impl svc::Stack for Stack where Rec: Recognize + Clone + Send + Sync + 'static, diff --git a/src/proxy/limit.rs b/src/proxy/limit.rs index b86c57c29..1a25dac2c 100644 --- a/src/proxy/limit.rs +++ b/src/proxy/limit.rs @@ -1,43 +1,31 @@ extern crate tower_in_flight_limit; use std::fmt; -use std::marker::PhantomData; pub use self::tower_in_flight_limit::InFlightLimit; use svc; /// Wraps `Service` stacks with an `InFlightLimit`. -#[derive(Debug)] -pub struct Layer { +#[derive(Clone, Debug)] +pub struct Layer { max_in_flight: usize, - _p: PhantomData (T, M)> } /// Produces `Services` wrapped with an `InFlightLimit`. -#[derive(Debug)] -pub struct Stack { +#[derive(Clone, Debug)] +pub struct Stack { max_in_flight: usize, inner: M, - _p: PhantomData T> } -impl Layer { - pub fn new(max_in_flight: usize) -> Self { - Layer { - max_in_flight, - _p: PhantomData - } - } +// === impl Layer === + +pub fn layer(max_in_flight: usize) -> Layer { + Layer { max_in_flight } } -impl Clone for Layer { - fn clone(&self) -> Self { - Self::new(self.max_in_flight) - } -} - -impl svc::Layer for Layer +impl svc::Layer for Layer where T: fmt::Display + Clone + Send + Sync + 'static, M: svc::Stack, @@ -45,30 +33,21 @@ where ::Request: Send, ::Future: Send, { - type Value = as svc::Stack>::Value; - type Error = as svc::Stack>::Error; - type Stack = Stack; + type Value = as svc::Stack>::Value; + type Error = as svc::Stack>::Error; + type Stack = Stack; fn bind(&self, inner: M) -> Self::Stack { Stack { inner, max_in_flight: self.max_in_flight, - _p: PhantomData } } } -impl Clone for Stack { - fn clone(&self) -> Self { - Self { - max_in_flight: self.max_in_flight, - inner: self.inner.clone(), - _p: PhantomData, - } - } -} +// === impl Stack === -impl svc::Stack for Stack +impl svc::Stack for Stack where T: fmt::Display + Clone + Send + Sync + 'static, M: svc::Stack, diff --git a/src/proxy/reconnect.rs b/src/proxy/reconnect.rs index 8ab19795e..1e2583abc 100644 --- a/src/proxy/reconnect.rs +++ b/src/proxy/reconnect.rs @@ -2,24 +2,21 @@ extern crate tower_reconnect; use futures::{task, Async, Future, Poll}; use std::fmt; -use std::marker::PhantomData; use std::time::Duration; pub use self::tower_reconnect::{Error, Reconnect}; use tokio_timer::{clock, Delay}; use svc; -#[derive(Debug)] -pub struct Layer { +#[derive(Clone, Debug)] +pub struct Layer { backoff: Backoff, - _p: PhantomData (T, M)>, } -#[derive(Debug)] -pub struct Stack { +#[derive(Clone, Debug)] +pub struct Stack { backoff: Backoff, inner: M, - _p: PhantomData T>, } /// Wraps `tower_reconnect`, handling errors. @@ -57,70 +54,43 @@ pub struct ResponseFuture { // === impl Layer === -impl Layer -where - T: fmt::Debug, - M: svc::Stack, - M::Value: svc::NewService, -{ - pub fn new() -> Self { - Self { - backoff: Backoff::None, - _p: PhantomData, - } - } - - // TODO: once a stacked clients needs backoff... - // - // pub fn with_fixed_backoff(self, wait: Duration) -> Self { - // Self { - // backoff: Backoff::Fixed(wait), - // .. self - // } - // } -} - -impl Clone for Layer { - fn clone(&self) -> Self { - Self { - backoff: self.backoff.clone(), - _p: PhantomData, - } +pub fn layer() -> Layer { + Layer { + backoff: Backoff::None, } } -impl svc::Layer for Layer +// TODO: once a stacked clients needs backoff... +// impl Layer { +// pub fn with_fixed_backoff(self, wait: Duration) -> Self { +// Self { +// backoff: Backoff::Fixed(wait), +// .. self +// } +// } +// } + +impl svc::Layer for Layer where T: Clone + fmt::Debug, M: svc::Stack, M::Value: svc::NewService, { - type Value = as svc::Stack>::Value; - type Error = as svc::Stack>::Error; - type Stack = Stack; + type Value = as svc::Stack>::Value; + type Error = as svc::Stack>::Error; + type Stack = Stack; fn bind(&self, inner: M) -> Self::Stack { Stack { inner, backoff: self.backoff.clone(), - _p: PhantomData, } } } // === impl Stack === -impl Clone for Stack { - fn clone(&self) -> Self { - Self { - backoff: self.backoff.clone(), - inner: self.inner.clone(), - _p: PhantomData, - } - } -} - -impl svc::Stack for Stack +impl svc::Stack for Stack where T: Clone + fmt::Debug, M: svc::Stack, diff --git a/src/proxy/timeout.rs b/src/proxy/timeout.rs index 9f1846984..06777c0de 100644 --- a/src/proxy/timeout.rs +++ b/src/proxy/timeout.rs @@ -1,67 +1,42 @@ // TODO move to `timeout` crate. -use std::marker::PhantomData; use std::time::Duration; use svc; pub use timeout::Timeout; -#[derive(Debug)] -pub struct Layer { +#[derive(Clone, Debug)] +pub struct Layer { timeout: Duration, - _p: PhantomData (T, M)> } -#[derive(Debug)] -pub struct Stack { +#[derive(Clone, Debug)] +pub struct Stack { inner: M, timeout: Duration, - _p: PhantomData T> } -impl Layer { - pub fn new(timeout: Duration) -> Self { - Self { - timeout, - _p: PhantomData - } - } +pub fn layer(timeout: Duration) -> Layer { + Layer { timeout } } -impl Clone for Layer { - fn clone(&self) -> Self { - Self::new(self.timeout) - } -} - -impl svc::Layer for Layer +impl svc::Layer for Layer where M: svc::Stack, { - type Value = as svc::Stack>::Value; - type Error = as svc::Stack>::Error; - type Stack = Stack; + type Value = as svc::Stack>::Value; + type Error = as svc::Stack>::Error; + type Stack = Stack; fn bind(&self, inner: M) -> Self::Stack { Stack { inner, timeout: self.timeout, - _p: PhantomData } } } -impl Clone for Stack { - fn clone(&self) -> Self { - Stack { - inner: self.inner.clone(), - timeout: self.timeout, - _p: PhantomData, - } - } -} - -impl svc::Stack for Stack +impl svc::Stack for Stack where M: svc::Stack, { diff --git a/src/svc.rs b/src/svc.rs index 8566dbcf7..efb430346 100644 --- a/src/svc.rs +++ b/src/svc.rs @@ -4,10 +4,10 @@ extern crate tower_service; pub use self::tower_service::{NewService, Service}; pub use self::stack::{ + shared, stack_per_request, watch, Either, Layer, Stack, - Shared, }; diff --git a/src/tap/mod.rs b/src/tap/mod.rs index 43eb84617..9da36f902 100644 --- a/src/tap/mod.rs +++ b/src/tap/mod.rs @@ -11,7 +11,7 @@ mod service; pub use self::event::{Direction, Endpoint, Event}; pub use self::match_::InvalidMatch; use self::match_::*; -pub use self::service::{Layer, Stack, RequestBody, Service}; +pub use self::service::layer; #[derive(Clone, Debug, Default)] pub struct NextId(Arc); diff --git a/src/tap/service.rs b/src/tap/service.rs index 2f381d226..af8080a36 100644 --- a/src/tap/service.rs +++ b/src/tap/service.rs @@ -82,7 +82,7 @@ pub struct ResponseBody { // === Layer === -impl Layer +pub fn layer(next_id: NextId, taps: Arc>) -> Layer where T: Clone + Into, M: svc::Stack, @@ -94,12 +94,10 @@ where A: Body, B: Body, { - pub fn new(next_id: NextId, taps: Arc>) -> Self { - Self { - next_id, - taps, - _p: PhantomData, - } + Layer { + next_id, + taps, + _p: PhantomData, } }