refactor: Build stacks from bottom-to-top (#111)

Previously, stacks were built with `Layer::and_then`. This pattern
severely impacts compile-times as stack complexity grows.

In order to ameliorate this, `app::main` has been changed to build
stacks from the "bottom" (endpoint client) to "top" (serverside
connection) by _push_-ing Layers onto a concrete stack, i.e. and not
composing layers for an abstract stack.

While doing this, we take the oppportunity to remove a ton of
now-unnecessary `PhantomData`. A new, dedicated `phantom_data` stack
module can be used to aid type inference as needed.

Other stack utilities like `map_target` and `map_err` have been
introduced to assist this transition.

Furthermore, all instances of `Layer::new` have been changed to a free
`fn layer` to improve readability.

This change sets up two upcoming changes: a stack-oriented `controller`
client and, subsequently, service-profile-based routing.
This commit is contained in:
Oliver Gould 2018-10-29 14:02:42 -07:00 committed by GitHub
parent 70d2f57e21
commit 4e0a1f0100
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 523 additions and 449 deletions

View File

@ -4,7 +4,7 @@ use futures::Poll;
use svc;
/// Describes two alternate `Layer`s, `Stacks`s or `Service`s.
#[derive(Debug)]
#[derive(Clone, Debug)]
pub enum Either<A, B> {
A(A),
B(B),

View File

@ -10,15 +10,15 @@ use std::marker::PhantomData;
/// ```ignore
/// impl<M: Stack<SocketAddr>> Layer<Authority, SocketAddr, M> for BalanceLayer<M> { ... }
/// ```
pub trait Layer<T, U, M: super::Stack<U>> {
pub trait Layer<T, U, S: super::Stack<U>> {
type Value;
type Error;
type Stack: super::Stack<T, Value = Self::Value, Error = Self::Error>;
/// Produce a `Stack` value from a `M` value.
fn bind(&self, next: M) -> Self::Stack;
/// Produces a `Stack` value from a `M` value.
fn bind(&self, next: S) -> Self::Stack;
/// Compose this `Layer` with another.
/// Produces a new Layer with this layer wrapping the provided inner layer.
fn and_then<V, N, L>(self, inner: L)
-> AndThen<U, Self, L>
where
@ -32,6 +32,42 @@ pub trait Layer<T, U, M: super::Stack<U>> {
_p: PhantomData,
}
}
/// Produces a new Layer with another layer wrapping this one.
fn push<R, L>(self, outer: L)
-> AndThen<T, L, Self>
where
L: Layer<R, T, Self::Stack>,
Self: Sized,
{
AndThen {
outer,
inner: self,
_p: PhantomData,
}
}
/// Wraps this layer such that stack errors are modified by `map_err`.
fn map_err<M>(self, map_err: M)
-> AndThen<T, super::map_err::Layer<M>, Self>
where
Self: Sized,
M: super::map_err::MapErr<Self::Error>,
super::map_err::Layer<M>: Layer<T, T, Self::Stack>,
{
super::map_err::layer(map_err).and_then(self)
}
}
/// The identity layer.
impl<T, M: super::Stack<T>> Layer<T, T, M> for () {
type Value = M::Value;
type Error = M::Error;
type Stack = M;
fn bind(&self, inner: M) -> M {
inner
}
}
/// Combines two `Layers` as one.

View File

@ -3,10 +3,11 @@ extern crate futures;
extern crate log;
extern crate tower_service as svc;
use std::marker::PhantomData;
pub mod either;
pub mod layer;
mod map_err;
pub mod map_target;
pub mod phantom_data;
pub mod stack_new_service;
pub mod stack_per_request;
pub mod watch;
@ -39,29 +40,45 @@ pub trait Stack<T> {
{
layer.bind(self)
}
/// Wraps this `Stack` such that errors are altered by `map_err`
fn map_err<M>(self, map_err: M) -> map_err::Stack<Self, M>
where
M: map_err::MapErr<Self::Error>,
Self: Sized,
{
map_err::stack(self, map_err)
}
}
/// Implements `Stack<T>` for any `T` by cloning a `V`-typed value.
#[derive(Debug)]
pub struct Shared<T, V: Clone>(V, PhantomData<fn() -> T>);
pub mod shared {
use std::{error, fmt};
impl<T, V: Clone> Shared<T, V> {
pub fn new(v: V) -> Self {
Shared(v, PhantomData)
pub fn stack<V: Clone>(v: V) -> Stack<V> {
Stack(v)
}
}
impl<T, V: Clone> Clone for Shared<T, V> {
fn clone(&self) -> Self {
Self::new(self.0.clone())
#[derive(Clone, Debug)]
pub struct Stack<V: Clone>(V);
#[derive(Debug)]
pub enum Error {}
impl<T, V: Clone> super::Stack<T> for Stack<V> {
type Value = V;
type Error = Error;
fn make(&self, _: &T) -> Result<V, Error> {
Ok(self.0.clone())
}
}
}
impl<T, V: Clone> Stack<T> for Shared<T, V> {
type Value = V;
type Error = ();
fn make(&self, _: &T) -> Result<V, ()> {
Ok(self.0.clone())
impl fmt::Display for Error {
fn fmt(&self, _: &mut fmt::Formatter) -> fmt::Result {
unreachable!()
}
}
impl error::Error for Error {}
}

72
lib/stack/src/map_err.rs Normal file
View File

@ -0,0 +1,72 @@
pub fn layer<E, M>(map_err: M) -> Layer<M>
where
M: MapErr<E>,
{
Layer(map_err)
}
pub(super) fn stack<T, S, M>(inner: S, map_err: M) -> Stack<S, M>
where
S: super::Stack<T>,
M: MapErr<S::Error>,
{
Stack {
inner,
map_err,
}
}
pub trait MapErr<Input> {
type Output;
fn map_err(&self, e: Input) -> Self::Output;
}
#[derive(Clone, Debug)]
pub struct Layer<M>(M);
#[derive(Clone, Debug)]
pub struct Stack<S, M> {
inner: S,
map_err: M,
}
impl<T, S, M> super::Layer<T, T, S> for Layer<M>
where
S: super::Stack<T>,
M: MapErr<S::Error> + Clone,
{
type Value = <Stack<S, M> as super::Stack<T>>::Value;
type Error = <Stack<S, M> as super::Stack<T>>::Error;
type Stack = Stack<S, M>;
fn bind(&self, inner: S) -> Self::Stack {
Stack {
inner,
map_err: self.0.clone(),
}
}
}
impl<T, S, M> super::Stack<T> for Stack<S, M>
where
S: super::Stack<T>,
M: MapErr<S::Error>,
{
type Value = S::Value;
type Error = M::Output;
fn make(&self, target: &T) -> Result<Self::Value, Self::Error> {
self.inner.make(target).map_err(|e| self.map_err.map_err(e))
}
}
impl<F, I, O> MapErr<I> for F
where
F: Fn(I) -> O,
{
type Output = O;
fn map_err(&self, i: I) -> O {
(self)(i)
}
}

View File

@ -0,0 +1,62 @@
pub fn layer<T, M>(map_target: M) -> Layer<M>
where
M: MapTarget<T>,
{
Layer(map_target)
}
pub trait MapTarget<T> {
type Target;
fn map_target(&self, t: &T) -> Self::Target;
}
#[derive(Clone, Debug)]
pub struct Layer<M>(M);
#[derive(Clone, Debug)]
pub struct Stack<S, M> {
inner: S,
map_target: M,
}
impl<T, S, M> super::Layer<T, M::Target, S> for Layer<M>
where
S: super::Stack<M::Target>,
M: MapTarget<T> + Clone,
{
type Value = <Stack<S, M> as super::Stack<T>>::Value;
type Error = <Stack<S, M> as super::Stack<T>>::Error;
type Stack = Stack<S, M>;
fn bind(&self, inner: S) -> Self::Stack {
Stack {
inner,
map_target: self.0.clone(),
}
}
}
impl<T, S, M> super::Stack<T> for Stack<S, M>
where
S: super::Stack<M::Target>,
M: MapTarget<T>,
{
type Value = S::Value;
type Error = S::Error;
fn make(&self, target: &T) -> Result<Self::Value, Self::Error> {
self.inner.make(&self.map_target.map_target(target))
}
}
impl<F, T, U> MapTarget<T> for F
where
F: Fn(&T) -> U,
{
type Target = U;
fn map_target(&self, t: &T) -> U {
(self)(t)
}
}

View File

@ -0,0 +1,39 @@
use std::marker::PhantomData;
pub fn layer<T, M>() -> Layer<T, M>
where
M: super::Stack<T>,
{
Layer(PhantomData)
}
#[derive(Clone, Debug)]
pub struct Layer<T, M>(PhantomData<fn() -> (T, M)>);
#[derive(Clone, Debug)]
pub struct Stack<T, M> {
inner: M,
_p: PhantomData<fn() -> T>,
}
impl<T, M: super::Stack<T>> super::Layer<T, T, M> for Layer<T, M> {
type Value = <Stack<T, M> as super::Stack<T>>::Value;
type Error = <Stack<T, M> as super::Stack<T>>::Error;
type Stack = Stack<T, M>;
fn bind(&self, inner: M) -> Self::Stack {
Stack {
inner,
_p: PhantomData
}
}
}
impl<T, M: super::Stack<T>> super::Stack<T> for Stack<T, M> {
type Value = M::Value;
type Error = M::Error;
fn make(&self, target: &T) -> Result<Self::Value, Self::Error> {
self.inner.make(target)
}
}

View File

@ -2,7 +2,6 @@
use futures::Poll;
use std::fmt;
use std::marker::PhantomData;
use svc;
@ -12,14 +11,13 @@ pub trait ShouldStackPerRequest {
/// A `Layer` produces a `Service` `Stack` that creates a new service for each
/// request.
#[derive(Debug)]
pub struct Layer<T, M>(PhantomData<fn() -> (T, M)>);
#[derive(Clone, Debug)]
pub struct Layer();
/// A `Stack` that builds a new `Service` for each request it serves.
#[derive(Debug)]
pub struct Stack<T, M: super::Stack<T>> {
#[derive(Clone, Debug)]
pub struct Stack<M> {
inner: M,
_p: PhantomData<fn() -> T>,
}
/// A `Service` that uses a new inner service for each request.
@ -36,6 +34,7 @@ pub struct Service<T, M: super::Stack<T>> {
/// A helper that asserts `M` can successfully build services for a specific
/// value of `T`.
#[derive(Clone, Debug)]
struct StackValid<T, M: super::Stack<T>> {
target: T,
make: M,
@ -43,63 +42,28 @@ struct StackValid<T, M: super::Stack<T>> {
// === Layer ===
impl<T, N> Layer<T, N>
where
T: ShouldStackPerRequest + Clone,
N: super::Stack<T> + Clone,
N::Error: fmt::Debug,
{
pub fn new() -> Self {
Layer(PhantomData)
}
pub fn layer() -> Layer {
Layer()
}
impl<T, N> Clone for Layer<T, N>
impl<T, N> super::Layer<T, T, N> for Layer
where
T: ShouldStackPerRequest + Clone,
N: super::Stack<T> + Clone,
N::Error: fmt::Debug,
{
fn clone(&self) -> Self {
Self::new()
}
}
impl<T, N> super::Layer<T, T, N> for Layer<T, N>
where
T: ShouldStackPerRequest + Clone,
N: super::Stack<T> + Clone,
N::Error: fmt::Debug,
{
type Value = <Stack<T, N> as super::Stack<T>>::Value;
type Error = <Stack<T, N> as super::Stack<T>>::Error;
type Stack = Stack<T, N>;
type Value = <Stack<N> as super::Stack<T>>::Value;
type Error = <Stack<N> as super::Stack<T>>::Error;
type Stack = Stack<N>;
fn bind(&self, inner: N) -> Self::Stack {
Stack {
inner,
_p: PhantomData,
}
Stack { inner }
}
}
// === Stack ===
impl<T, N> Clone for Stack<T, N>
where
T: ShouldStackPerRequest + Clone,
N: super::Stack<T> + Clone,
N::Error: fmt::Debug,
{
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
_p: PhantomData,
}
}
}
impl<T, N> super::Stack<T> for Stack<T, N>
impl<T, N> super::Stack<T> for Stack<N>
where
T: ShouldStackPerRequest + Clone,
N: super::Stack<T> + Clone,
@ -160,6 +124,19 @@ where
}
}
impl<T, N> Clone for Service<T, N>
where
T: ShouldStackPerRequest + Clone,
N: super::Stack<T> + Clone,
{
fn clone(&self) -> Self {
Self {
next: None,
make: self.make.clone(),
}
}
}
// === StackValid ===
impl<T, M> StackValid<T, M>

View File

@ -3,6 +3,7 @@ extern crate futures_watch;
use self::futures_watch::Watch;
use futures::{future::MapErr, Async, Future, Poll, Stream};
use std::{error, fmt};
use std::marker::PhantomData;
use svc;
@ -14,14 +15,16 @@ pub trait WithUpdate<U> {
}
#[derive(Debug)]
pub struct Layer<U> {
pub struct Layer<T: WithUpdate<U>, U, M> {
watch: Watch<U>,
_p: PhantomData<fn() -> (T, M)>,
}
#[derive(Debug)]
pub struct Stack<U, M> {
pub struct Stack<T: WithUpdate<U>, U, M> {
watch: Watch<U>,
inner: M,
_p: PhantomData<fn() -> T>,
}
/// A Service that updates itself as a Watch updates.
@ -45,47 +48,58 @@ pub struct CloneUpdate {}
// === impl Layer ===
pub fn layer<U>(watch: Watch<U>) -> Layer<U> {
Layer { watch }
}
impl<U> Clone for Layer<U> {
fn clone(&self) -> Self {
Self {
watch: self.watch.clone(),
}
}
}
impl<T, U, M> super::Layer<T, T::Updated, M> for Layer<U>
pub fn layer<T, U, M>(watch: Watch<U>) -> Layer<T, U, M>
where
T: WithUpdate<U> + Clone,
M: super::Stack<T::Updated> + Clone,
{
type Value = <Stack<U, M> as super::Stack<T>>::Value;
type Error = <Stack<U, M> as super::Stack<T>>::Error;
type Stack = Stack<U, M>;
Layer {
watch,
_p: PhantomData,
}
}
impl<T, U, M> Clone for Layer<T, U, M>
where
T: WithUpdate<U> + Clone,
M: super::Stack<T::Updated> + Clone,
{
fn clone(&self) -> Self {
layer(self.watch.clone())
}
}
impl<T, U, M> super::Layer<T, T::Updated, M> for Layer<T, U, M>
where
T: WithUpdate<U> + Clone,
M: super::Stack<T::Updated> + Clone,
{
type Value = <Stack<T, U, M> as super::Stack<T>>::Value;
type Error = <Stack<T, U, M> as super::Stack<T>>::Error;
type Stack = Stack<T, U, M>;
fn bind(&self, inner: M) -> Self::Stack {
Stack {
inner,
watch: self.watch.clone(),
_p: PhantomData,
}
}
}
// === impl Stack ===
impl<U, M: Clone> Clone for Stack<U, M> {
impl<T: WithUpdate<U>, U, M: Clone> Clone for Stack<T, U, M> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
watch: self.watch.clone(),
_p: PhantomData,
}
}
}
impl<T, U, M> super::Stack<T> for Stack<U, M>
impl<T, U, M> super::Stack<T> for Stack<T, U, M>
where
T: WithUpdate<U> + Clone,
M: super::Stack<T::Updated> + Clone,

View File

@ -130,10 +130,8 @@ pub mod orig_proto_downgrade {
// === impl Layer ===
impl Layer {
pub fn new() -> Self {
Layer
}
pub fn layer() -> Layer {
Layer
}
impl<M, A, B> svc::Layer<Source, Source, M> for Layer

View File

@ -221,7 +221,7 @@ where
let (drain_tx, drain_rx) = drain::channel();
let outbound = {
use super::outbound::{discovery::Resolve, orig_proto_upgrade, Recognize};
use super::outbound::{discovery::Resolve, orig_proto_upgrade, Endpoint, Recognize};
use proxy::{
http::{balance, metrics},
resolve,
@ -234,54 +234,53 @@ where
let accept = transport_metrics.accept("outbound").bind(());
// Establishes connections to remote peers.
let connect = transport_metrics
.connect("outbound")
.and_then(proxy::timeout::Layer::new(config.outbound_connect_timeout))
.bind(connect::Stack::new());
let connect = connect::Stack::new()
.push(proxy::timeout::layer(config.outbound_connect_timeout))
.push(transport_metrics.connect("outbound"));
// As HTTP requests are accepted, we add some request extensions
// including metadata about the request's origin.
let source_layer =
timestamp_request_open::Layer::new().and_then(insert_target::Layer::new());
let client_stack = connect
.clone()
.push(client::layer("out"))
.push(svc::stack::map_target::layer(|ep: &Endpoint| {
client::Config::from(ep.clone())
}))
.push(reconnect::layer());
// `normalize_uri` and `stack_per_request` are applied on the stack
// selectively. For HTTP/2 stacks, for instance, neither service will be
// employed.
//
// The TLS status of outbound requests depends on the local
// configuration. As the local configuration changes, the inner
// stack (including a Client) is rebuilt with the appropriate
// settings. Stack layers above this operate on an `Endpoint` with
// the TLS client config is marked as `NoConfig` when the endpoint
// has a TLS identity.
let router_stack = router::Layer::new(Recognize::new())
.and_then(limit::Layer::new(MAX_IN_FLIGHT))
.and_then(timeout::Layer::new(config.bind_timeout))
.and_then(buffer::Layer::new())
.and_then(balance::layer())
.and_then(resolve::layer(Resolve::new(resolver)))
.and_then(orig_proto_upgrade::Layer::new())
.and_then(svc::watch::layer(tls_client_config))
.and_then(metrics::Layer::new(http_metrics, classify::Classify))
.and_then(tap::Layer::new(tap_next_id.clone(), taps.clone()))
.and_then(normalize_uri::Layer::new())
.and_then(svc::stack_per_request::Layer::new())
.and_then(reconnect::Layer::new())
.and_then(client::Layer::new("out"))
.bind(connect.clone());
let endpoint_stack = client_stack
.push(svc::stack_per_request::layer())
.push(normalize_uri::layer())
.push(orig_proto_upgrade::layer())
.push(tap::layer(tap_next_id.clone(), taps.clone()))
.push(metrics::layer(http_metrics, classify::Classify))
.push(svc::watch::layer(tls_client_config));
let dst_router_stack = endpoint_stack
.push(resolve::layer(Resolve::new(resolver)))
.push(balance::layer())
.push(buffer::layer())
.push(timeout::layer(config.bind_timeout))
.push(limit::layer(MAX_IN_FLIGHT))
.push(router::layer(Recognize::new()));
let capacity = config.outbound_router_capacity;
let max_idle_age = config.outbound_router_max_idle_age;
let router = router_stack
let router = dst_router_stack
.make(&router::Config::new("out", capacity, max_idle_age))
.expect("outbound router");
// As HTTP requests are accepted, we add some request extensions
// including metadata about the request's origin.
let server_stack = svc::stack::phantom_data::layer()
.push(insert_target::layer())
.push(timestamp_request_open::layer())
.bind(svc::shared::stack(router));
serve(
"out",
outbound_listener,
accept,
connect,
source_layer.bind(svc::Shared::new(router)),
server_stack.map_err(|_| {}),
config.outbound_ports_disable_protocol_detection,
get_original_dst.clone(),
drain_rx.clone(),
@ -289,27 +288,17 @@ where
};
let inbound = {
use super::inbound;
use super::inbound::{self, Endpoint};
use proxy::http::metrics;
// As the inbound proxy accepts connections, we don't do any
// special transport-level handling.
let accept = transport_metrics.accept("inbound").bind(());
// Establishes connections to the local application.
let connect = transport_metrics
.connect("inbound")
.and_then(proxy::timeout::Layer::new(config.inbound_connect_timeout))
.bind(connect::Stack::new());
// As HTTP requests are accepted, we add some request extensions
// including metadata about the request's origin.
//
// Furthermore, HTTP/2 requests may be downgraded to HTTP/1.1 per
// `orig-proto` headers. This happens in the source stack so that
// the router need not detect whether a request _will be_ downgraded.
let source_layer = timestamp_request_open::Layer::new()
.and_then(insert_target::Layer::new())
.and_then(inbound::orig_proto_downgrade::Layer::new());
let connect = connect::Stack::new()
.push(proxy::timeout::layer(config.inbound_connect_timeout))
.push(transport_metrics.connect("inbound"));
// A stack configured by `router::Config`, responsible for building
// a router made of route stacks configured by `inbound::Endpoint`.
@ -321,35 +310,46 @@ where
// selectively. For HTTP/2 stacks, for instance, neither service will be
// employed.
let default_fwd_addr = config.inbound_forward.map(|a| a.into());
let router_layer = router::Layer::new(inbound::Recognize::new(default_fwd_addr))
.and_then(limit::Layer::new(MAX_IN_FLIGHT))
.and_then(buffer::Layer::new())
.and_then(proxy::http::metrics::Layer::new(
http_metrics,
classify::Classify,
))
.and_then(tap::Layer::new(tap_next_id, taps))
.and_then(normalize_uri::Layer::new())
.and_then(svc::stack_per_request::Layer::new());
let client = reconnect::Layer::new()
.and_then(client::Layer::new("in"))
.bind(connect.clone());
let stack = connect
.clone()
.push(client::layer("in"))
.push(svc::stack::map_target::layer(|ep: &Endpoint| {
client::Config::from(ep.clone())
}))
.push(reconnect::layer())
.push(svc::stack_per_request::layer())
.push(normalize_uri::layer())
.push(tap::layer(tap_next_id, taps))
.push(metrics::layer(http_metrics, classify::Classify))
.push(buffer::layer())
.push(limit::layer(MAX_IN_FLIGHT))
.push(router::layer(inbound::Recognize::new(default_fwd_addr)));
// Build a router using the above policy
let capacity = config.inbound_router_capacity;
let max_idle_age = config.inbound_router_max_idle_age;
let router = router_layer
.bind(client)
let router = stack
.make(&router::Config::new("in", capacity, max_idle_age))
.expect("inbound router");
// As HTTP requests are accepted, we add some request extensions
// including metadata about the request's origin.
//
// Furthermore, HTTP/2 requests may be downgraded to HTTP/1.1 per
// `orig-proto` headers. This happens in the source stack so that
// the router need not detect whether a request _will be_ downgraded.
let source_stack = svc::stack::phantom_data::layer()
.push(inbound::orig_proto_downgrade::layer())
.push(insert_target::layer())
.push(timestamp_request_open::layer())
.bind(svc::shared::stack(router));
serve(
"in",
inbound_listener,
accept,
connect,
source_layer.bind(svc::Shared::new(router)),
source_stack.map_err(|_| {}),
config.inbound_ports_disable_protocol_detection,
get_original_dst.clone(),
drain_rx.clone(),

View File

@ -214,13 +214,10 @@ pub mod orig_proto_upgrade {
inner: M,
}
impl Layer {
pub fn new() -> Self {
Layer
}
pub fn layer() -> Layer {
Layer
}
impl<M, A, B> svc::Layer<Endpoint, Endpoint, M> for Layer
where
M: svc::Stack<Endpoint>,

View File

@ -2,9 +2,6 @@
#![cfg_attr(feature = "cargo-clippy", allow(new_without_default_derive))]
#![deny(warnings)]
// Stack type inference requires a deeper recursion limit
#![recursion_limit="128"]
extern crate bytes;
extern crate env_logger;
extern crate linkerd2_fs_watch as fs_watch;

View File

@ -1,15 +1,15 @@
extern crate tower_buffer;
use std::fmt;
use std::{error, fmt};
pub use self::tower_buffer::{Buffer, SpawnError};
pub use self::tower_buffer::{Buffer, Error as ServiceError, SpawnError};
use logging;
use svc;
/// Wraps `Service` stacks with a `Buffer`.
#[derive(Debug, Clone)]
pub struct Layer;
pub struct Layer();
/// Produces `Service`s wrapped with a `Buffer`
#[derive(Debug, Clone)]
@ -24,10 +24,8 @@ pub enum Error<M, S> {
// === impl Layer ===
impl Layer {
pub fn new() -> Self {
Layer
}
pub fn layer() -> Layer {
Layer()
}
impl<T, M> svc::Layer<T, T, M> for Layer
@ -43,15 +41,12 @@ where
type Stack = Stack<M>;
fn bind(&self, inner: M) -> Self::Stack {
Stack {
inner,
}
Stack { inner }
}
}
// === impl Stack ===
impl<T, M> svc::Stack<T> for Stack<M>
where
T: fmt::Display + Clone + Send + Sync + 'static,
@ -80,3 +75,21 @@ impl<M: fmt::Debug, S> fmt::Debug for Error<M, S> {
}
}
}
impl<M: fmt::Display, S> fmt::Display for Error<M, S> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match self {
Error::Stack(e) => fmt::Display::fmt(e, fmt),
Error::Spawn(_) => write!(fmt, "Stack built without an executor"),
}
}
}
impl<M: error::Error, S> error::Error for Error<M, S> {
fn cause(&self) -> Option<&error::Error> {
match self {
Error::Stack(e) => e.cause(),
Error::Spawn(_) => None,
}
}
}

View File

@ -2,9 +2,9 @@ extern crate tower_balance;
extern crate tower_discover;
extern crate tower_h2_balance;
use self::tower_discover::Discover;
use http;
use std::time::Duration;
use self::tower_discover::Discover;
use tower_h2::Body;
pub use self::tower_balance::{choose::PowerOfTwoChoices, load::WithPeakEwma, Balance};

View File

@ -154,16 +154,14 @@ impl Config {
// === impl Layer ===
impl<B> Layer<B>
pub fn layer<B>(proxy_name: &'static str) -> Layer<B>
where
B: tower_h2::Body + Send + 'static,
<B::Data as IntoBuf>::Buf: Send + 'static,
{
pub fn new(proxy_name: &'static str) -> Self {
Self {
proxy_name,
_p: PhantomData,
}
Layer {
proxy_name,
_p: PhantomData,
}
}
@ -180,9 +178,8 @@ where
}
}
impl<T, C, B> svc::Layer<T, connect::Target, C> for Layer<B>
impl<C, B> svc::Layer<Config, connect::Target, C> for Layer<B>
where
T: Into<Config> + Clone,
C: svc::Stack<connect::Target>,
C::Value: connect::Connect + Clone + Send + Sync + 'static,
<C::Value as connect::Connect>::Connected: Send,
@ -191,8 +188,8 @@ where
B: tower_h2::Body + Send + 'static,
<B::Data as IntoBuf>::Buf: Send + 'static,
{
type Value = <Stack<C, B> as svc::Stack<T>>::Value;
type Error = <Stack<C, B> as svc::Stack<T>>::Error;
type Value = <Stack<C, B> as svc::Stack<Config>>::Value;
type Error = <Stack<C, B> as svc::Stack<Config>>::Error;
type Stack = Stack<C, B>;
fn bind(&self, connect: C) -> Self::Stack {
@ -222,9 +219,8 @@ where
}
}
impl<T, C, B> svc::Stack<T> for Stack<C, B>
impl<C, B> svc::Stack<Config> for Stack<C, B>
where
T: Into<Config> + Clone,
C: svc::Stack<connect::Target>,
C::Value: connect::Connect + Clone + Send + Sync + 'static,
<C::Value as connect::Connect>::Connected: Send,
@ -236,13 +232,12 @@ where
type Value = Client<C::Value, ::logging::ClientExecutor<&'static str, net::SocketAddr>, B>;
type Error = C::Error;
fn make(&self, t: &T) -> Result<Self::Value, Self::Error> {
let config = t.clone().into();
fn make(&self, config: &Config) -> Result<Self::Value, Self::Error> {
debug!("building client={:?}", config);
let connect = self.connect.make(&config.target)?;
let executor = ::logging::Client::proxy(self.proxy_name, config.target.addr)
.with_settings(config.settings.clone())
.executor();
debug!("building client={:?}", config);
Ok(Client::new(&config.settings, connect, executor))
}
}
@ -261,7 +256,7 @@ where
<B::Data as IntoBuf>::Buf: Send + 'static,
{
/// Create a new `Client`, bound to a specific protocol (HTTP/1 or HTTP/2).
fn new(settings: &Settings, connect: C, executor: E) -> Self {
pub fn new(settings: &Settings, connect: C, executor: E) -> Self {
match settings {
Settings::Http1 { was_absolute_form, .. } => {
let h1 = hyper::Client::builder()

View File

@ -21,10 +21,8 @@ pub struct Service<T, S> {
// === impl Layer ===
impl Layer {
pub fn new() -> Self {
Layer
}
pub fn layer() -> Layer {
Layer
}
impl<T, M, B> svc::Layer<T, T, M> for Layer

View File

@ -12,7 +12,7 @@ mod service;
pub mod timestamp_request_open;
pub use self::report::Report;
pub use self::service::Layer;
pub use self::service::layer;
pub fn new<T, C>(retain_idle: Duration) -> (Arc<Mutex<Registry<T, C>>>, Report<T, C>)
where

View File

@ -91,30 +91,27 @@ where
// ===== impl Stack =====
impl<M, K, C> Layer<M, K, C>
pub fn layer<M, K, C, T, A, B>(registry: Arc<Mutex<Registry<K, C::Class>>>, classify: C)
-> Layer<M, K, C>
where
K: Clone + Hash + Eq,
C: Classify<Error = h2::Error> + Clone,
C::Class: Hash + Eq,
C::ClassifyResponse: Send + Sync + 'static,
T: Clone + Debug,
K: From<T>,
M: svc::Stack<T>,
M::Value: svc::Service<
Request = http::Request<RequestBody<A, C::Class>>,
Response = http::Response<B>,
>,
A: tower_h2::Body,
B: tower_h2::Body,
{
pub fn new<T, A, B>(registry: Arc<Mutex<Registry<K, C::Class>>>, classify: C) -> Self
where
T: Clone + Debug,
K: From<T>,
M: svc::Stack<T>,
M::Value: svc::Service<
Request = http::Request<RequestBody<A, C::Class>>,
Response = http::Response<B>,
>,
A: tower_h2::Body,
B: tower_h2::Body,
{
Self {
classify,
registry,
_p: PhantomData,
}
Layer {
classify,
registry,
_p: PhantomData,
}
}

View File

@ -24,8 +24,8 @@ pub struct TimestampRequestOpen<S> {
}
/// Layers a `TimestampRequestOpen` middleware on an HTTP client.
#[derive(Debug)]
pub struct Layer<M>(::std::marker::PhantomData<fn() -> (M)>);
#[derive(Clone, Debug)]
pub struct Layer();
/// Uses an `M`-typed `Stack` to build a `TimestampRequestOpen` service.
#[derive(Clone, Debug)]
@ -54,19 +54,11 @@ where
// === impl Layer ===
impl<M> Layer<M> {
pub fn new() -> Self {
Layer(::std::marker::PhantomData)
}
pub fn layer() -> Layer {
Layer()
}
impl<M> Clone for Layer<M> {
fn clone(&self) -> Self {
Self::new()
}
}
impl<T, B, M> svc::Layer<T, T, M> for Layer<M>
impl<T, B, M> svc::Layer<T, T, M> for Layer
where
M: svc::Stack<T>,
M::Value: svc::Service<Request = http::Request<B>>,

View File

@ -1,6 +1,5 @@
use futures::Poll;
use http;
use std::marker::PhantomData;
use super::h1;
use svc;
@ -9,77 +8,43 @@ pub trait ShouldNormalizeUri {
fn should_normalize_uri(&self) -> bool;
}
pub struct Layer<T, M>(PhantomData<fn() -> (T, M)>);
#[derive(Clone, Debug)]
pub struct Layer();
pub struct Stack<T, N: svc::Stack<T>> {
#[derive(Clone, Debug)]
pub struct Stack<N> {
inner: N,
_p: PhantomData<T>,
}
#[derive(Copy, Clone, Debug)]
#[derive(Clone, Debug)]
pub struct Service<S> {
inner: S,
}
// === impl Layer ===
impl<T, B, M> Layer<T, M>
where
T: ShouldNormalizeUri,
M: svc::Stack<T>,
M::Value: svc::Service<Request = http::Request<B>>,
{
pub fn new() -> Self {
Layer(PhantomData)
}
pub fn layer() -> Layer {
Layer()
}
impl<T, B, M> Clone for Layer<T, M>
impl<T, B, M> svc::Layer<T, T, M> for Layer
where
T: ShouldNormalizeUri,
M: svc::Stack<T>,
M::Value: svc::Service<Request = http::Request<B>>,
{
fn clone(&self) -> Self {
Self::new()
}
}
impl<T, B, M> svc::Layer<T, T, M> for Layer<T, M>
where
T: ShouldNormalizeUri,
M: svc::Stack<T>,
M::Value: svc::Service<Request = http::Request<B>>,
{
type Value = <Stack<T, M> as svc::Stack<T>>::Value;
type Error = <Stack<T, M> as svc::Stack<T>>::Error;
type Stack = Stack<T, M>;
type Value = <Stack<M> as svc::Stack<T>>::Value;
type Error = <Stack<M> as svc::Stack<T>>::Error;
type Stack = Stack<M>;
fn bind(&self, inner: M) -> Self::Stack {
Stack {
inner,
_p: PhantomData,
}
Stack { inner }
}
}
// === impl Stack ===
impl<T, B, M> Clone for Stack<T, M>
where
T: ShouldNormalizeUri,
M: svc::Stack<T> + Clone,
M::Value: svc::Service<Request = http::Request<B>>,
{
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
_p: PhantomData,
}
}
}
impl<T, B, M> svc::Stack<T> for Stack<T, M>
impl<T, B, M> svc::Stack<T> for Stack<M>
where
T: ShouldNormalizeUri,
M: svc::Stack<T>,

View File

@ -2,9 +2,9 @@ use futures::{Future, Poll};
use h2;
use http;
use http::header::CONTENT_LENGTH;
use std::{error, fmt};
use std::marker::PhantomData;
use std::time::Duration;
use std::{error, fmt};
use svc;
@ -25,17 +25,14 @@ pub struct Config {
/// A `Rec`-typed `Recognize` instance is used to produce a target for each
/// `Req`-typed request. If the router doesn't already have a `Service` for this
/// target, it uses a `Stk`-typed `Service` stack.
#[derive(Debug)]
pub struct Layer<Req, Rec, Stk> {
#[derive(Clone, Debug)]
pub struct Layer<Req, Rec: Recognize<Req>> {
recognize: Rec,
_p: PhantomData<fn() -> (Req, Stk)>,
_p: PhantomData<fn() -> Req>,
}
pub struct Stack<Req, Rec, Stk>
where
Rec: Recognize<Req>,
Stk: svc::Stack<Rec::Target> + Clone,
{
#[derive(Clone, Debug)]
pub struct Stack<Req, Rec: Recognize<Req>, Stk> {
recognize: Rec,
inner: Stk,
_p: PhantomData<fn() -> Req>,
@ -76,27 +73,27 @@ impl fmt::Display for Config {
// === impl Layer ===
impl<Req, Rec, Stk> Layer<Req, Rec, Stk>
pub fn layer<Rec, Req>(recognize: Rec) -> Layer<Req, Rec>
where
Rec: Recognize<Req> + Clone,
Rec: Recognize<Req> + Clone + Send + Sync + 'static,
{
pub fn new(recognize: Rec) -> Self {
Layer { recognize, _p: PhantomData }
Layer {
recognize,
_p: PhantomData,
}
}
impl<T, Req, Rec, Stk, B> svc::Layer<T, Rec::Target, Stk> for Layer<Req, Rec, Stk>
impl<Req, Rec, Stk, B> svc::Layer<Config, Rec::Target, Stk> for Layer<Req, Rec>
where
Rec: Recognize<Req> + Clone,
Rec: Recognize<Req> + Clone + Send + Sync + 'static,
Stk: svc::Stack<Rec::Target> + Clone + Send + Sync + 'static,
Stk::Value: svc::Service<Response = http::Response<B>>,
Stk::Value: svc::Service<Request = Req, Response = http::Response<B>>,
<Stk::Value as svc::Service>::Error: error::Error,
Stk::Error: fmt::Debug,
B: Default + Send + 'static,
Stack<Req, Rec, Stk>: svc::Stack<T>,
{
type Value = <Stack<Req, Rec, Stk> as svc::Stack<T>>::Value;
type Error = <Stack<Req, Rec, Stk> as svc::Stack<T>>::Error;
type Value = <Stack<Req, Rec, Stk> as svc::Stack<Config>>::Value;
type Error = <Stack<Req, Rec, Stk> as svc::Stack<Config>>::Error;
type Stack = Stack<Req, Rec, Stk>;
fn bind(&self, inner: Stk) -> Self::Stack {
@ -110,20 +107,6 @@ where
// === impl Stack ===
impl<Req, Rec, Stk> Clone for Stack<Req, Rec, Stk>
where
Rec: Recognize<Req> + Clone,
Stk: svc::Stack<Rec::Target> + Clone,
{
fn clone(&self) -> Self {
Self {
recognize: self.recognize.clone(),
inner: self.inner.clone(),
_p: PhantomData
}
}
}
impl<Req, Rec, Stk, B> svc::Stack<Config> for Stack<Req, Rec, Stk>
where
Rec: Recognize<Req> + Clone + Send + Sync + 'static,

View File

@ -1,43 +1,31 @@
extern crate tower_in_flight_limit;
use std::fmt;
use std::marker::PhantomData;
pub use self::tower_in_flight_limit::InFlightLimit;
use svc;
/// Wraps `Service` stacks with an `InFlightLimit`.
#[derive(Debug)]
pub struct Layer<T, M> {
#[derive(Clone, Debug)]
pub struct Layer {
max_in_flight: usize,
_p: PhantomData<fn() -> (T, M)>
}
/// Produces `Services` wrapped with an `InFlightLimit`.
#[derive(Debug)]
pub struct Stack<T, M> {
#[derive(Clone, Debug)]
pub struct Stack<M> {
max_in_flight: usize,
inner: M,
_p: PhantomData<fn() -> T>
}
impl<T, M> Layer<T, M> {
pub fn new(max_in_flight: usize) -> Self {
Layer {
max_in_flight,
_p: PhantomData
}
}
// === impl Layer ===
pub fn layer(max_in_flight: usize) -> Layer {
Layer { max_in_flight }
}
impl<T, M> Clone for Layer<T, M> {
fn clone(&self) -> Self {
Self::new(self.max_in_flight)
}
}
impl<T, M> svc::Layer<T, T, M> for Layer<T, M>
impl<T, M> svc::Layer<T, T, M> for Layer
where
T: fmt::Display + Clone + Send + Sync + 'static,
M: svc::Stack<T>,
@ -45,30 +33,21 @@ where
<M::Value as svc::Service>::Request: Send,
<M::Value as svc::Service>::Future: Send,
{
type Value = <Stack<T, M> as svc::Stack<T>>::Value;
type Error = <Stack<T, M> as svc::Stack<T>>::Error;
type Stack = Stack<T, M>;
type Value = <Stack<M> as svc::Stack<T>>::Value;
type Error = <Stack<M> as svc::Stack<T>>::Error;
type Stack = Stack<M>;
fn bind(&self, inner: M) -> Self::Stack {
Stack {
inner,
max_in_flight: self.max_in_flight,
_p: PhantomData
}
}
}
impl<T, M: Clone> Clone for Stack<T, M> {
fn clone(&self) -> Self {
Self {
max_in_flight: self.max_in_flight,
inner: self.inner.clone(),
_p: PhantomData,
}
}
}
// === impl Stack ===
impl<T, M> svc::Stack<T> for Stack<T, M>
impl<T, M> svc::Stack<T> for Stack<M>
where
T: fmt::Display + Clone + Send + Sync + 'static,
M: svc::Stack<T>,

View File

@ -2,24 +2,21 @@ extern crate tower_reconnect;
use futures::{task, Async, Future, Poll};
use std::fmt;
use std::marker::PhantomData;
use std::time::Duration;
pub use self::tower_reconnect::{Error, Reconnect};
use tokio_timer::{clock, Delay};
use svc;
#[derive(Debug)]
pub struct Layer<T, M> {
#[derive(Clone, Debug)]
pub struct Layer {
backoff: Backoff,
_p: PhantomData<fn() -> (T, M)>,
}
#[derive(Debug)]
pub struct Stack<T, M> {
#[derive(Clone, Debug)]
pub struct Stack<M> {
backoff: Backoff,
inner: M,
_p: PhantomData<fn() -> T>,
}
/// Wraps `tower_reconnect`, handling errors.
@ -57,70 +54,43 @@ pub struct ResponseFuture<N: svc::NewService> {
// === impl Layer ===
impl<T, M> Layer<T, M>
where
T: fmt::Debug,
M: svc::Stack<T>,
M::Value: svc::NewService,
{
pub fn new() -> Self {
Self {
backoff: Backoff::None,
_p: PhantomData,
}
}
// TODO: once a stacked clients needs backoff...
//
// pub fn with_fixed_backoff(self, wait: Duration) -> Self {
// Self {
// backoff: Backoff::Fixed(wait),
// .. self
// }
// }
}
impl<T, M> Clone for Layer<T, M> {
fn clone(&self) -> Self {
Self {
backoff: self.backoff.clone(),
_p: PhantomData,
}
pub fn layer() -> Layer {
Layer {
backoff: Backoff::None,
}
}
impl<T, M> svc::Layer<T, T, M> for Layer<T, M>
// TODO: once a stacked clients needs backoff...
// impl Layer {
// pub fn with_fixed_backoff(self, wait: Duration) -> Self {
// Self {
// backoff: Backoff::Fixed(wait),
// .. self
// }
// }
// }
impl<T, M> svc::Layer<T, T, M> for Layer
where
T: Clone + fmt::Debug,
M: svc::Stack<T>,
M::Value: svc::NewService,
{
type Value = <Stack<T, M> as svc::Stack<T>>::Value;
type Error = <Stack<T, M> as svc::Stack<T>>::Error;
type Stack = Stack<T, M>;
type Value = <Stack<M> as svc::Stack<T>>::Value;
type Error = <Stack<M> as svc::Stack<T>>::Error;
type Stack = Stack<M>;
fn bind(&self, inner: M) -> Self::Stack {
Stack {
inner,
backoff: self.backoff.clone(),
_p: PhantomData,
}
}
}
// === impl Stack ===
impl<T, M: Clone> Clone for Stack<T, M> {
fn clone(&self) -> Self {
Self {
backoff: self.backoff.clone(),
inner: self.inner.clone(),
_p: PhantomData,
}
}
}
impl<T, M> svc::Stack<T> for Stack<T, M>
impl<T, M> svc::Stack<T> for Stack<M>
where
T: Clone + fmt::Debug,
M: svc::Stack<T>,

View File

@ -1,67 +1,42 @@
// TODO move to `timeout` crate.
use std::marker::PhantomData;
use std::time::Duration;
use svc;
pub use timeout::Timeout;
#[derive(Debug)]
pub struct Layer<T, M> {
#[derive(Clone, Debug)]
pub struct Layer {
timeout: Duration,
_p: PhantomData<fn() -> (T, M)>
}
#[derive(Debug)]
pub struct Stack<T, M> {
#[derive(Clone, Debug)]
pub struct Stack<M> {
inner: M,
timeout: Duration,
_p: PhantomData<fn() -> T>
}
impl<T, M> Layer<T, M> {
pub fn new(timeout: Duration) -> Self {
Self {
timeout,
_p: PhantomData
}
}
pub fn layer(timeout: Duration) -> Layer {
Layer { timeout }
}
impl<T, M> Clone for Layer<T, M> {
fn clone(&self) -> Self {
Self::new(self.timeout)
}
}
impl<T, M> svc::Layer<T, T, M> for Layer<T, M>
impl<T, M> svc::Layer<T, T, M> for Layer
where
M: svc::Stack<T>,
{
type Value = <Stack<T, M> as svc::Stack<T>>::Value;
type Error = <Stack<T, M> as svc::Stack<T>>::Error;
type Stack = Stack<T, M>;
type Value = <Stack<M> as svc::Stack<T>>::Value;
type Error = <Stack<M> as svc::Stack<T>>::Error;
type Stack = Stack<M>;
fn bind(&self, inner: M) -> Self::Stack {
Stack {
inner,
timeout: self.timeout,
_p: PhantomData
}
}
}
impl<T, M: Clone> Clone for Stack<T, M> {
fn clone(&self) -> Self {
Stack {
inner: self.inner.clone(),
timeout: self.timeout,
_p: PhantomData,
}
}
}
impl<T, M> svc::Stack<T> for Stack<T, M>
impl<T, M> svc::Stack<T> for Stack<M>
where
M: svc::Stack<T>,
{

View File

@ -4,10 +4,10 @@ extern crate tower_service;
pub use self::tower_service::{NewService, Service};
pub use self::stack::{
shared,
stack_per_request,
watch,
Either,
Layer,
Stack,
Shared,
};

View File

@ -11,7 +11,7 @@ mod service;
pub use self::event::{Direction, Endpoint, Event};
pub use self::match_::InvalidMatch;
use self::match_::*;
pub use self::service::{Layer, Stack, RequestBody, Service};
pub use self::service::layer;
#[derive(Clone, Debug, Default)]
pub struct NextId(Arc<AtomicUsize>);

View File

@ -82,7 +82,7 @@ pub struct ResponseBody<B> {
// === Layer ===
impl<T, M, A, B> Layer<T, M>
pub fn layer<T, M, A, B>(next_id: NextId, taps: Arc<Mutex<Taps>>) -> Layer<T, M>
where
T: Clone + Into<event::Endpoint>,
M: svc::Stack<T>,
@ -94,12 +94,10 @@ where
A: Body,
B: Body,
{
pub fn new(next_id: NextId, taps: Arc<Mutex<Taps>>) -> Self {
Self {
next_id,
taps,
_p: PhantomData,
}
Layer {
next_id,
taps,
_p: PhantomData,
}
}