Decouple reconnect & connect layers from proxy::http::client (#101)
Previously, the `client` module was responsible for instrument reconnects. Now, the reconnect module becomes its own stack layer that composes over NewService stacks. Additionally, the `proxy::http::client` module can now layer over an underlying Connect stack.
This commit is contained in:
parent
978fed1cf6
commit
f1210b4947
|
@ -23,7 +23,7 @@ use proxy::{
|
||||||
http::{
|
http::{
|
||||||
balance, client, insert_target, metrics::timestamp_request_open, normalize_uri, router,
|
balance, client, insert_target, metrics::timestamp_request_open, normalize_uri, router,
|
||||||
},
|
},
|
||||||
limit, timeout,
|
limit, reconnect, timeout,
|
||||||
};
|
};
|
||||||
use svc::{self, Layer as _Layer, Stack as _Stack};
|
use svc::{self, Layer as _Layer, Stack as _Stack};
|
||||||
use tap;
|
use tap;
|
||||||
|
@ -269,10 +269,14 @@ where
|
||||||
.and_then(normalize_uri::Layer::new())
|
.and_then(normalize_uri::Layer::new())
|
||||||
.and_then(svc::stack_per_request::Layer::new());
|
.and_then(svc::stack_per_request::Layer::new());
|
||||||
|
|
||||||
|
let client = reconnect::Layer::new()
|
||||||
|
.and_then(client::Layer::new("out"))
|
||||||
|
.bind(connect.clone());
|
||||||
|
|
||||||
let capacity = config.outbound_router_capacity;
|
let capacity = config.outbound_router_capacity;
|
||||||
let max_idle_age = config.outbound_router_max_idle_age;
|
let max_idle_age = config.outbound_router_max_idle_age;
|
||||||
let router = router_layer
|
let router = router_layer
|
||||||
.bind(client::Stack::new("out", connect.clone()))
|
.bind(client)
|
||||||
.make(&router::Config::new("out", capacity, max_idle_age))
|
.make(&router::Config::new("out", capacity, max_idle_age))
|
||||||
.expect("outbound router");
|
.expect("outbound router");
|
||||||
|
|
||||||
|
@ -332,11 +336,15 @@ where
|
||||||
.and_then(normalize_uri::Layer::new())
|
.and_then(normalize_uri::Layer::new())
|
||||||
.and_then(svc::stack_per_request::Layer::new());
|
.and_then(svc::stack_per_request::Layer::new());
|
||||||
|
|
||||||
|
let client = reconnect::Layer::new()
|
||||||
|
.and_then(client::Layer::new("in"))
|
||||||
|
.bind(connect.clone());
|
||||||
|
|
||||||
// Build a router using the above policy
|
// Build a router using the above policy
|
||||||
let capacity = config.inbound_router_capacity;
|
let capacity = config.inbound_router_capacity;
|
||||||
let max_idle_age = config.inbound_router_max_idle_age;
|
let max_idle_age = config.inbound_router_max_idle_age;
|
||||||
let router = router_layer
|
let router = router_layer
|
||||||
.bind(client::Stack::new("in", connect.clone()))
|
.bind(client)
|
||||||
.make(&router::Config::new("in", capacity, max_idle_age))
|
.make(&router::Config::new("in", capacity, max_idle_age))
|
||||||
.expect("inbound router");
|
.expect("inbound router");
|
||||||
|
|
||||||
|
|
|
@ -11,7 +11,6 @@ use tower_h2;
|
||||||
use super::{h1, Settings};
|
use super::{h1, Settings};
|
||||||
use super::glue::{BodyPayload, HttpBody, HyperConnect};
|
use super::glue::{BodyPayload, HttpBody, HyperConnect};
|
||||||
use super::upgrade::{HttpConnect, Http11Upgrade};
|
use super::upgrade::{HttpConnect, Http11Upgrade};
|
||||||
use super::super::Reconnect;
|
|
||||||
use svc;
|
use svc;
|
||||||
use task::BoxExecutor;
|
use task::BoxExecutor;
|
||||||
use transport::connect;
|
use transport::connect;
|
||||||
|
@ -26,6 +25,15 @@ pub struct Config {
|
||||||
_p: (),
|
_p: (),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Configurs an HTTP client that uses a `C`-typed connector
|
||||||
|
///
|
||||||
|
/// The `proxy_name` is used for diagnostics (logging, mostly).
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Layer<B> {
|
||||||
|
proxy_name: &'static str,
|
||||||
|
_p: PhantomData<fn() -> B>,
|
||||||
|
}
|
||||||
|
|
||||||
/// Configurs an HTTP client that uses a `C`-typed connector
|
/// Configurs an HTTP client that uses a `C`-typed connector
|
||||||
///
|
///
|
||||||
/// The `proxy_name` is used for diagnostics (logging, mostly).
|
/// The `proxy_name` is used for diagnostics (logging, mostly).
|
||||||
|
@ -144,31 +152,73 @@ impl Config {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// === impl Stack ===
|
// === impl Layer ===
|
||||||
|
|
||||||
impl<C, B> Stack<C, B>
|
impl<B> Layer<B>
|
||||||
where
|
where
|
||||||
C: svc::Stack<connect::Target>,
|
B: tower_h2::Body + Send + 'static,
|
||||||
C::Value: connect::Connect + Clone + Send + Sync + 'static,
|
<B::Data as IntoBuf>::Buf: Send + 'static,
|
||||||
B: tower_h2::Body + 'static,
|
|
||||||
{
|
{
|
||||||
pub fn new(proxy_name: &'static str, connect: C) -> Self {
|
pub fn new(proxy_name: &'static str) -> Self {
|
||||||
Self {
|
Self {
|
||||||
connect,
|
|
||||||
proxy_name,
|
proxy_name,
|
||||||
_p: PhantomData,
|
_p: PhantomData,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<B> Clone for Layer<B>
|
||||||
|
where
|
||||||
|
B: tower_h2::Body + 'static,
|
||||||
|
<B::Data as IntoBuf>::Buf: Send + 'static,
|
||||||
|
{
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
Self {
|
||||||
|
proxy_name: self.proxy_name,
|
||||||
|
_p: PhantomData,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, C, B> svc::Layer<T, connect::Target, C> for Layer<B>
|
||||||
|
where
|
||||||
|
T: Into<Config> + Clone,
|
||||||
|
C: svc::Stack<connect::Target>,
|
||||||
|
C::Value: connect::Connect + Clone + Send + Sync + 'static,
|
||||||
|
<C::Value as connect::Connect>::Connected: Send,
|
||||||
|
<C::Value as connect::Connect>::Future: Send + 'static,
|
||||||
|
<C::Value as connect::Connect>::Error: error::Error + Send + Sync,
|
||||||
|
B: tower_h2::Body + Send + 'static,
|
||||||
|
<B::Data as IntoBuf>::Buf: Send + 'static,
|
||||||
|
{
|
||||||
|
type Value = <Stack<C, B> as svc::Stack<T>>::Value;
|
||||||
|
type Error = <Stack<C, B> as svc::Stack<T>>::Error;
|
||||||
|
type Stack = Stack<C, B>;
|
||||||
|
|
||||||
|
fn bind(&self, connect: C) -> Self::Stack {
|
||||||
|
Stack {
|
||||||
|
connect,
|
||||||
|
proxy_name: self.proxy_name,
|
||||||
|
_p: PhantomData,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// === impl Stack ===
|
||||||
|
|
||||||
impl<C, B> Clone for Stack<C, B>
|
impl<C, B> Clone for Stack<C, B>
|
||||||
where
|
where
|
||||||
C: svc::Stack<connect::Target> + Clone,
|
C: svc::Stack<connect::Target> + Clone,
|
||||||
C::Value: connect::Connect + Clone + Send + Sync + 'static,
|
C::Value: connect::Connect + Clone + Send + Sync + 'static,
|
||||||
B: tower_h2::Body + 'static,
|
B: tower_h2::Body + 'static,
|
||||||
|
<B::Data as IntoBuf>::Buf: Send + 'static,
|
||||||
{
|
{
|
||||||
fn clone(&self) -> Self {
|
fn clone(&self) -> Self {
|
||||||
Self::new(self.proxy_name, self.connect.clone())
|
Self {
|
||||||
|
proxy_name: self.proxy_name,
|
||||||
|
connect: self.connect.clone(),
|
||||||
|
_p: PhantomData,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -183,10 +233,7 @@ where
|
||||||
B: tower_h2::Body + Send + 'static,
|
B: tower_h2::Body + Send + 'static,
|
||||||
<B::Data as IntoBuf>::Buf: Send + 'static,
|
<B::Data as IntoBuf>::Buf: Send + 'static,
|
||||||
{
|
{
|
||||||
type Value = Reconnect<
|
type Value = Client<C::Value, ::logging::ClientExecutor<&'static str, net::SocketAddr>, B>;
|
||||||
Config,
|
|
||||||
Client<C::Value, ::logging::ClientExecutor<&'static str, net::SocketAddr>, B>,
|
|
||||||
>;
|
|
||||||
type Error = C::Error;
|
type Error = C::Error;
|
||||||
|
|
||||||
fn make(&self, t: &T) -> Result<Self::Value, Self::Error> {
|
fn make(&self, t: &T) -> Result<Self::Value, Self::Error> {
|
||||||
|
@ -196,8 +243,7 @@ where
|
||||||
.with_settings(config.settings.clone())
|
.with_settings(config.settings.clone())
|
||||||
.executor();
|
.executor();
|
||||||
debug!("building client={:?}", config);
|
debug!("building client={:?}", config);
|
||||||
let client = Client::new(&config.settings, connect, executor);
|
Ok(Client::new(&config.settings, connect, executor))
|
||||||
Ok(Reconnect::new(config.clone(), client))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -6,13 +6,12 @@ pub mod buffer;
|
||||||
pub mod http;
|
pub mod http;
|
||||||
pub mod limit;
|
pub mod limit;
|
||||||
mod protocol;
|
mod protocol;
|
||||||
mod reconnect;
|
pub mod reconnect;
|
||||||
pub mod resolve;
|
pub mod resolve;
|
||||||
pub mod server;
|
pub mod server;
|
||||||
mod tcp;
|
mod tcp;
|
||||||
pub mod timeout;
|
pub mod timeout;
|
||||||
|
|
||||||
pub use self::reconnect::Reconnect;
|
|
||||||
pub use self::resolve::{Resolve, Resolution};
|
pub use self::resolve::{Resolve, Resolution};
|
||||||
pub use self::server::{Server, Source};
|
pub use self::server::{Server, Source};
|
||||||
|
|
||||||
|
|
|
@ -1,15 +1,26 @@
|
||||||
use std::fmt;
|
|
||||||
|
|
||||||
use futures::{task, Async, Future, Poll};
|
use futures::{task, Async, Future, Poll};
|
||||||
|
use std::fmt;
|
||||||
|
use std::marker::PhantomData;
|
||||||
use tower_reconnect;
|
use tower_reconnect;
|
||||||
|
|
||||||
use svc;
|
use svc;
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Layer<T, M> {
|
||||||
|
_p: PhantomData<fn() -> (T, M)>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Stack<T, M> {
|
||||||
|
inner: M,
|
||||||
|
_p: PhantomData<fn() -> T>,
|
||||||
|
}
|
||||||
|
|
||||||
/// Wraps `tower_reconnect`, handling errors.
|
/// Wraps `tower_reconnect`, handling errors.
|
||||||
///
|
///
|
||||||
/// Ensures that the underlying service is ready and, if the underlying service
|
/// Ensures that the underlying service is ready and, if the underlying service
|
||||||
/// fails to become ready, rebuilds the inner stack.
|
/// fails to become ready, rebuilds the inner stack.
|
||||||
pub struct Reconnect<T, N>
|
pub struct Service<T, N>
|
||||||
where
|
where
|
||||||
T: fmt::Debug,
|
T: fmt::Debug,
|
||||||
N: svc::NewService,
|
N: svc::NewService,
|
||||||
|
@ -29,26 +40,80 @@ pub struct ResponseFuture<N: svc::NewService> {
|
||||||
inner: <tower_reconnect::Reconnect<N> as svc::Service>::Future,
|
inner: <tower_reconnect::Reconnect<N> as svc::Service>::Future,
|
||||||
}
|
}
|
||||||
|
|
||||||
// ===== impl Reconnect =====
|
// === impl Layer ===
|
||||||
|
|
||||||
|
impl<T, M> Layer<T, M>
|
||||||
impl<T, N> Reconnect<T, N>
|
|
||||||
where
|
where
|
||||||
T: fmt::Debug,
|
T: fmt::Debug,
|
||||||
N: svc::NewService,
|
M: svc::Stack<T>,
|
||||||
N::InitError: fmt::Display,
|
M::Value: svc::NewService,
|
||||||
{
|
{
|
||||||
pub fn new(target: T, new_service: N) -> Self {
|
pub fn new() -> Self {
|
||||||
let inner = tower_reconnect::Reconnect::new(new_service);
|
|
||||||
Self {
|
Self {
|
||||||
target,
|
_p: PhantomData,
|
||||||
inner,
|
|
||||||
mute_connect_error_log: false,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T, N> svc::Service for Reconnect<T, N>
|
impl<T, M> Clone for Layer<T, M> {
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
Self {
|
||||||
|
_p: PhantomData,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, M> svc::Layer<T, T, M> for Layer<T, M>
|
||||||
|
where
|
||||||
|
T: Clone + fmt::Debug,
|
||||||
|
M: svc::Stack<T>,
|
||||||
|
M::Value: svc::NewService,
|
||||||
|
{
|
||||||
|
type Value = <Stack<T, M> as svc::Stack<T>>::Value;
|
||||||
|
type Error = <Stack<T, M> as svc::Stack<T>>::Error;
|
||||||
|
type Stack = Stack<T, M>;
|
||||||
|
|
||||||
|
fn bind(&self, inner: M) -> Self::Stack {
|
||||||
|
Stack {
|
||||||
|
inner,
|
||||||
|
_p: PhantomData,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// === impl Stack ===
|
||||||
|
|
||||||
|
impl<T, M: Clone> Clone for Stack<T, M> {
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
Self {
|
||||||
|
inner: self.inner.clone(),
|
||||||
|
_p: PhantomData,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, M> svc::Stack<T> for Stack<T, M>
|
||||||
|
where
|
||||||
|
T: Clone + fmt::Debug,
|
||||||
|
M: svc::Stack<T>,
|
||||||
|
M::Value: svc::NewService,
|
||||||
|
{
|
||||||
|
type Value = Service<T, M::Value>;
|
||||||
|
type Error = M::Error;
|
||||||
|
|
||||||
|
fn make(&self, target: &T) -> Result<Self::Value, Self::Error> {
|
||||||
|
let new_service = self.inner.make(target)?;
|
||||||
|
Ok(Service {
|
||||||
|
inner: tower_reconnect::Reconnect::new(new_service),
|
||||||
|
target: target.clone(),
|
||||||
|
mute_connect_error_log: false,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// === impl Service ===
|
||||||
|
|
||||||
|
impl<T, N> svc::Service for Service<T, N>
|
||||||
where
|
where
|
||||||
T: fmt::Debug,
|
T: fmt::Debug,
|
||||||
N: svc::NewService,
|
N: svc::NewService,
|
||||||
|
@ -63,13 +128,11 @@ where
|
||||||
match self.inner.poll_ready() {
|
match self.inner.poll_ready() {
|
||||||
Ok(Async::NotReady) => Ok(Async::NotReady),
|
Ok(Async::NotReady) => Ok(Async::NotReady),
|
||||||
Ok(ready) => {
|
Ok(ready) => {
|
||||||
trace!("poll_ready: ready for business");
|
|
||||||
self.mute_connect_error_log = false;
|
self.mute_connect_error_log = false;
|
||||||
Ok(ready)
|
Ok(ready)
|
||||||
}
|
}
|
||||||
|
|
||||||
Err(tower_reconnect::Error::Inner(err)) => {
|
Err(tower_reconnect::Error::Inner(err)) => {
|
||||||
trace!("poll_ready: inner error, debouncing");
|
|
||||||
self.mute_connect_error_log = false;
|
self.mute_connect_error_log = false;
|
||||||
Err(err)
|
Err(err)
|
||||||
}
|
}
|
||||||
|
@ -110,7 +173,7 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: fmt::Debug, N: svc::NewService> fmt::Debug for Reconnect<T, N> {
|
impl<T: fmt::Debug, N: svc::NewService> fmt::Debug for Service<T, N> {
|
||||||
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||||
fmt.debug_struct("Reconnect")
|
fmt.debug_struct("Reconnect")
|
||||||
.field("target", &self.target)
|
.field("target", &self.target)
|
||||||
|
|
Loading…
Reference in New Issue