Make `resolve` a stack module (#106)
The `proxy::http::balance` module uses the `proxy::resolve::Resolve` trait to implement a `Discover`. This coupling between the balance and resolve modules prevents integrating the destination profile API such that there is a per-route, per-endpoint stack. This change makes the `balance` stack generic over a stack that produces a `Discover`. The `resolve` module now implements a stack that produces a `Discover` and is generic over a per-endpoint stack.
This commit is contained in:
parent
af4dd6fff3
commit
70d2f57e21
|
@ -20,10 +20,8 @@ use logging;
|
|||
use metrics;
|
||||
use proxy::{
|
||||
self, buffer,
|
||||
http::{
|
||||
balance, client, insert_target, metrics::timestamp_request_open, normalize_uri, router,
|
||||
},
|
||||
limit, reconnect, timeout
|
||||
http::{client, insert_target, metrics::timestamp_request_open, normalize_uri, router},
|
||||
limit, reconnect, timeout,
|
||||
};
|
||||
use svc::{self, Layer as _Layer, Stack as _Stack};
|
||||
use tap;
|
||||
|
@ -223,7 +221,11 @@ where
|
|||
let (drain_tx, drain_rx) = drain::channel();
|
||||
|
||||
let outbound = {
|
||||
use super::outbound;
|
||||
use super::outbound::{discovery::Resolve, orig_proto_upgrade, Recognize};
|
||||
use proxy::{
|
||||
http::{balance, metrics},
|
||||
resolve,
|
||||
};
|
||||
|
||||
let http_metrics = http_metrics.clone();
|
||||
|
||||
|
@ -252,31 +254,25 @@ where
|
|||
// settings. Stack layers above this operate on an `Endpoint` with
|
||||
// the TLS client config is marked as `NoConfig` when the endpoint
|
||||
// has a TLS identity.
|
||||
let router_layer = router::Layer::new(outbound::Recognize::new())
|
||||
let router_stack = router::Layer::new(Recognize::new())
|
||||
.and_then(limit::Layer::new(MAX_IN_FLIGHT))
|
||||
.and_then(timeout::Layer::new(config.bind_timeout))
|
||||
.and_then(buffer::Layer::new())
|
||||
.and_then(balance::Layer::new(outbound::discovery::Resolve::new(
|
||||
resolver,
|
||||
)))
|
||||
.and_then(outbound::orig_proto_upgrade::Layer::new())
|
||||
.and_then(balance::layer())
|
||||
.and_then(resolve::layer(Resolve::new(resolver)))
|
||||
.and_then(orig_proto_upgrade::Layer::new())
|
||||
.and_then(svc::watch::layer(tls_client_config))
|
||||
.and_then(proxy::http::metrics::Layer::new(
|
||||
http_metrics,
|
||||
classify::Classify,
|
||||
))
|
||||
.and_then(metrics::Layer::new(http_metrics, classify::Classify))
|
||||
.and_then(tap::Layer::new(tap_next_id.clone(), taps.clone()))
|
||||
.and_then(normalize_uri::Layer::new())
|
||||
.and_then(svc::stack_per_request::Layer::new());
|
||||
|
||||
let client = reconnect::Layer::new()
|
||||
.and_then(svc::stack_per_request::Layer::new())
|
||||
.and_then(reconnect::Layer::new())
|
||||
.and_then(client::Layer::new("out"))
|
||||
.bind(connect.clone());
|
||||
|
||||
let capacity = config.outbound_router_capacity;
|
||||
let max_idle_age = config.outbound_router_max_idle_age;
|
||||
let router = router_layer
|
||||
.bind(client)
|
||||
let router = router_stack
|
||||
.make(&router::Config::new("out", capacity, max_idle_age))
|
||||
.expect("outbound router");
|
||||
|
||||
|
|
|
@ -2,6 +2,9 @@
|
|||
#![cfg_attr(feature = "cargo-clippy", allow(new_without_default_derive))]
|
||||
#![deny(warnings)]
|
||||
|
||||
// Stack type inference requires a deeper recursion limit
|
||||
#![recursion_limit="128"]
|
||||
|
||||
extern crate bytes;
|
||||
extern crate env_logger;
|
||||
extern crate linkerd2_fs_watch as fs_watch;
|
||||
|
|
|
@ -2,69 +2,40 @@ extern crate tower_balance;
|
|||
extern crate tower_discover;
|
||||
extern crate tower_h2_balance;
|
||||
|
||||
use futures::{Async, Poll};
|
||||
use self::tower_discover::Discover;
|
||||
use http;
|
||||
use std::{error, fmt};
|
||||
use std::marker::PhantomData;
|
||||
use std::net::SocketAddr;
|
||||
use std::time::Duration;
|
||||
use tower_h2::Body;
|
||||
|
||||
pub use self::tower_balance::{choose::PowerOfTwoChoices, load::WithPeakEwma, Balance};
|
||||
use self::tower_discover::{Change, Discover};
|
||||
pub use self::tower_h2_balance::{PendingUntilFirstData, PendingUntilFirstDataBody};
|
||||
|
||||
use proxy::resolve::{Resolve, Resolution, Update};
|
||||
use svc;
|
||||
|
||||
/// Configures a stack to resolve `T` typed targets to balance requests over
|
||||
/// `M`-typed endpoint stacks.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Layer<T, R, M> {
|
||||
pub struct Layer {
|
||||
decay: Duration,
|
||||
resolve: R,
|
||||
_p: PhantomData<fn() -> (T, M)>,
|
||||
}
|
||||
|
||||
/// Resolves `T` typed targets to balance requests over `M`-typed endpoint stacks.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Stack<T, R, M> {
|
||||
pub struct Stack<M> {
|
||||
decay: Duration,
|
||||
resolve: R,
|
||||
inner: M,
|
||||
_p: PhantomData<fn() -> T>,
|
||||
}
|
||||
|
||||
/// Observes an `R`-typed resolution stream, using an `M`-typed endpoint stack to
|
||||
/// build a service for each endpoint.
|
||||
pub struct Service<R: Resolution, M: svc::Stack<R::Endpoint>> {
|
||||
resolution: R,
|
||||
make: M,
|
||||
}
|
||||
|
||||
// === impl Layer ===
|
||||
|
||||
impl<T, R, M, A, B> Layer<T, R, M>
|
||||
where
|
||||
R: Resolve<T> + Clone,
|
||||
R::Endpoint: fmt::Debug,
|
||||
M: svc::Stack<R::Endpoint> + Clone,
|
||||
M::Value: svc::Service<
|
||||
Request = http::Request<A>,
|
||||
Response = http::Response<B>,
|
||||
>,
|
||||
A: Body,
|
||||
B: Body,
|
||||
{
|
||||
pub const DEFAULT_DECAY: Duration = Duration::from_secs(10);
|
||||
|
||||
pub fn new(resolve: R) -> Self {
|
||||
Self {
|
||||
resolve,
|
||||
decay: Self::DEFAULT_DECAY,
|
||||
_p: PhantomData,
|
||||
}
|
||||
pub fn layer() -> Layer {
|
||||
Layer {
|
||||
decay: Layer::DEFAULT_DECAY,
|
||||
}
|
||||
}
|
||||
|
||||
impl Layer {
|
||||
const DEFAULT_DECAY: Duration = Duration::from_secs(10);
|
||||
|
||||
// pub fn with_decay(self, decay: Duration) -> Self {
|
||||
// Self {
|
||||
|
@ -74,125 +45,41 @@ where
|
|||
// }
|
||||
}
|
||||
|
||||
impl<T, R, M, A, B> svc::Layer<T, R::Endpoint, M> for Layer<T, R, M>
|
||||
impl<T, M, A, B> svc::Layer<T, T, M> for Layer
|
||||
where
|
||||
R: Resolve<T> + Clone,
|
||||
R::Endpoint: fmt::Debug,
|
||||
M: svc::Stack<R::Endpoint> + Clone,
|
||||
M::Value: svc::Service<
|
||||
Request = http::Request<A>,
|
||||
Response = http::Response<B>,
|
||||
>,
|
||||
M: svc::Stack<T> + Clone,
|
||||
M::Value: Discover<Request = http::Request<A>, Response = http::Response<B>>,
|
||||
A: Body,
|
||||
B: Body,
|
||||
{
|
||||
type Value = <Stack<T, R, M> as svc::Stack<T>>::Value;
|
||||
type Error = <Stack<T, R, M> as svc::Stack<T>>::Error;
|
||||
type Stack = Stack<T, R, M>;
|
||||
type Value = <Stack<M> as svc::Stack<T>>::Value;
|
||||
type Error = <Stack<M> as svc::Stack<T>>::Error;
|
||||
type Stack = Stack<M>;
|
||||
|
||||
fn bind(&self, inner: M) -> Self::Stack {
|
||||
Stack {
|
||||
decay: self.decay,
|
||||
resolve: self.resolve.clone(),
|
||||
inner,
|
||||
_p: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// === impl Stack ===
|
||||
|
||||
impl<T, R, M, A, B> svc::Stack<T> for Stack<T, R, M>
|
||||
impl<T, M, A, B> svc::Stack<T> for Stack<M>
|
||||
where
|
||||
R: Resolve<T>,
|
||||
R::Endpoint: fmt::Debug,
|
||||
M: svc::Stack<R::Endpoint> + Clone,
|
||||
M::Value: svc::Service<
|
||||
Request = http::Request<A>,
|
||||
Response = http::Response<B>,
|
||||
>,
|
||||
M: svc::Stack<T> + Clone,
|
||||
M::Value: Discover<Request = http::Request<A>, Response = http::Response<B>>,
|
||||
A: Body,
|
||||
B: Body,
|
||||
{
|
||||
type Value = Balance<
|
||||
WithPeakEwma<Service<R::Resolution, M>, PendingUntilFirstData>,
|
||||
PowerOfTwoChoices,
|
||||
>;
|
||||
type Value = Balance<WithPeakEwma<M::Value, PendingUntilFirstData>, PowerOfTwoChoices>;
|
||||
type Error = M::Error;
|
||||
|
||||
fn make(&self, target: &T) -> Result<Self::Value, Self::Error> {
|
||||
let discover = Service {
|
||||
resolution: self.resolve.resolve(&target),
|
||||
make: self.inner.clone(),
|
||||
};
|
||||
|
||||
let discover = self.inner.make(target)?;
|
||||
let instrument = PendingUntilFirstData::default();
|
||||
let loaded = WithPeakEwma::new(discover, self.decay, instrument);
|
||||
Ok(Balance::p2c(loaded))
|
||||
}
|
||||
}
|
||||
|
||||
// === impl Service ===
|
||||
|
||||
impl<R, M> Discover for Service<R, M>
|
||||
where
|
||||
R: Resolution,
|
||||
R::Endpoint: fmt::Debug,
|
||||
M: svc::Stack<R::Endpoint>,
|
||||
M::Value: svc::Service,
|
||||
{
|
||||
type Key = SocketAddr;
|
||||
type Request = <M::Value as svc::Service>::Request;
|
||||
type Response = <M::Value as svc::Service>::Response;
|
||||
type Error = <M::Value as svc::Service>::Error;
|
||||
type Service = M::Value;
|
||||
type DiscoverError = Error<R::Error, M::Error>;
|
||||
|
||||
fn poll(&mut self)
|
||||
-> Poll<Change<Self::Key, Self::Service>, Self::DiscoverError>
|
||||
{
|
||||
loop {
|
||||
let up = try_ready!(self.resolution.poll().map_err(Error::Resolve));
|
||||
trace!("watch: {:?}", up);
|
||||
match up {
|
||||
Update::Add(addr, target) => {
|
||||
// We expect the load balancer to handle duplicate inserts
|
||||
// by replacing the old endpoint with the new one, so
|
||||
// insertions of new endpoints and metadata changes for
|
||||
// existing ones can be handled in the same way.
|
||||
let svc = self.make.make(&target).map_err(Error::Stack)?;
|
||||
return Ok(Async::Ready(Change::Insert(addr, svc)));
|
||||
},
|
||||
Update::Remove(addr) => {
|
||||
return Ok(Async::Ready(Change::Remove(addr)));
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// === impl Error ===
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Error<R, M> {
|
||||
Resolve(R),
|
||||
Stack(M),
|
||||
}
|
||||
|
||||
impl<M> fmt::Display for Error<(), M>
|
||||
where
|
||||
M: fmt::Display,
|
||||
{
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
match self {
|
||||
Error::Resolve(()) => unreachable!("resolution must succeed"),
|
||||
Error::Stack(e) => e.fmt(f),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<M> error::Error for Error<(), M>
|
||||
where
|
||||
M: error::Error,
|
||||
{
|
||||
}
|
||||
|
|
|
@ -1,5 +1,11 @@
|
|||
use futures::Poll;
|
||||
extern crate tower_discover;
|
||||
|
||||
use futures::{Async, Poll};
|
||||
use std::net::SocketAddr;
|
||||
use std::{error, fmt};
|
||||
|
||||
pub use self::tower_discover::Change;
|
||||
use svc;
|
||||
|
||||
/// Resolves `T`-typed names/addresses as a `Resolution`.
|
||||
pub trait Resolve<T> {
|
||||
|
@ -17,9 +23,138 @@ pub trait Resolution {
|
|||
fn poll(&mut self) -> Poll<Update<Self::Endpoint>, Self::Error>;
|
||||
}
|
||||
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum Update<T> {
|
||||
Add(SocketAddr, T),
|
||||
Remove(SocketAddr),
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Layer<R> {
|
||||
resolve: R,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Stack<R, M> {
|
||||
resolve: R,
|
||||
inner: M,
|
||||
}
|
||||
|
||||
/// Observes an `R`-typed resolution stream, using an `M`-typed endpoint stack to
|
||||
/// build a service for each endpoint.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Discover<R: Resolution, M: svc::Stack<R::Endpoint>> {
|
||||
resolution: R,
|
||||
make: M,
|
||||
}
|
||||
|
||||
// === impl Layer ===
|
||||
|
||||
pub fn layer<T, R>(resolve: R) -> Layer<R>
|
||||
where
|
||||
R: Resolve<T> + Clone,
|
||||
R::Endpoint: fmt::Debug,
|
||||
{
|
||||
Layer {
|
||||
resolve,
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, R, M> svc::Layer<T, R::Endpoint, M> for Layer<R>
|
||||
where
|
||||
R: Resolve<T> + Clone,
|
||||
R::Endpoint: fmt::Debug,
|
||||
M: svc::Stack<R::Endpoint> + Clone,
|
||||
M::Value: svc::Service,
|
||||
{
|
||||
type Value = <Stack<R, M> as svc::Stack<T>>::Value;
|
||||
type Error = <Stack<R, M> as svc::Stack<T>>::Error;
|
||||
type Stack = Stack<R, M>;
|
||||
|
||||
fn bind(&self, inner: M) -> Self::Stack {
|
||||
Stack {
|
||||
resolve: self.resolve.clone(),
|
||||
inner,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// === impl Stack ===
|
||||
|
||||
impl<T, R, M> svc::Stack<T> for Stack<R, M>
|
||||
where
|
||||
R: Resolve<T>,
|
||||
R::Endpoint: fmt::Debug,
|
||||
M: svc::Stack<R::Endpoint> + Clone,
|
||||
M::Value: svc::Service,
|
||||
{
|
||||
type Value = Discover<R::Resolution, M>;
|
||||
type Error = M::Error;
|
||||
|
||||
fn make(&self, target: &T) -> Result<Self::Value, Self::Error> {
|
||||
let resolution = self.resolve.resolve(target);
|
||||
Ok(Discover {
|
||||
resolution,
|
||||
make: self.inner.clone(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// === impl Discover ===
|
||||
|
||||
impl<R, M> tower_discover::Discover for Discover<R, M>
|
||||
where
|
||||
R: Resolution,
|
||||
R::Endpoint: fmt::Debug,
|
||||
M: svc::Stack<R::Endpoint>,
|
||||
M::Value: svc::Service,
|
||||
{
|
||||
type Key = SocketAddr;
|
||||
type Request = <M::Value as svc::Service>::Request;
|
||||
type Response = <M::Value as svc::Service>::Response;
|
||||
type Error = <M::Value as svc::Service>::Error;
|
||||
type Service = M::Value;
|
||||
type DiscoverError = Error<R::Error, M::Error>;
|
||||
|
||||
fn poll(&mut self) -> Poll<Change<Self::Key, Self::Service>, Self::DiscoverError> {
|
||||
loop {
|
||||
let up = try_ready!(self.resolution.poll().map_err(Error::Resolve));
|
||||
trace!("watch: {:?}", up);
|
||||
match up {
|
||||
Update::Add(addr, target) => {
|
||||
// We expect the load balancer to handle duplicate inserts
|
||||
// by replacing the old endpoint with the new one, so
|
||||
// insertions of new endpoints and metadata changes for
|
||||
// existing ones can be handled in the same way.
|
||||
let svc = self.make.make(&target).map_err(Error::Stack)?;
|
||||
return Ok(Async::Ready(Change::Insert(addr, svc)));
|
||||
}
|
||||
Update::Remove(addr) => {
|
||||
return Ok(Async::Ready(Change::Remove(addr)));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// === impl Error ===
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Error<R, M> {
|
||||
Resolve(R),
|
||||
Stack(M),
|
||||
}
|
||||
|
||||
impl<M> fmt::Display for Error<(), M>
|
||||
where
|
||||
M: fmt::Display,
|
||||
{
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
match self {
|
||||
Error::Resolve(()) => unreachable!("resolution must succeed"),
|
||||
Error::Stack(e) => e.fmt(f),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<M> error::Error for Error<(), M> where M: error::Error {}
|
||||
|
|
Loading…
Reference in New Issue