Refactor control::destination::background::client module (#38)
This branch should not make any functional changes. This branch makes two minor refactorings to the `client` module in `control::destination::background`: 1. Remove the `AddOrigin` middleware and replace it with the `tower-add-origin` crate from `tower-http`. These middlewares are functionally identical, but the Tower version has tests. 2. Change `ClientService` from a type alias to a tuple struct. This means that some of the middleware that are used only in this module (`LogErrors` and `Backoff`) are no longer part of a publicly visible type and can be made private to the module. Signed-off-by: Eliza Weisman <eliza@buoyant.io>
This commit is contained in:
parent
ab1b280de8
commit
1774c87400
12
Cargo.lock
12
Cargo.lock
|
@ -506,6 +506,7 @@ dependencies = [
|
|||
"tokio-rustls 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"tokio-signal 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"tokio-timer 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"tower-add-origin 0.1.0 (git+https://github.com/tower-rs/tower-http)",
|
||||
"tower-balance 0.1.0 (git+https://github.com/tower-rs/tower)",
|
||||
"tower-buffer 0.1.0 (git+https://github.com/tower-rs/tower)",
|
||||
"tower-discover 0.1.0 (git+https://github.com/tower-rs/tower)",
|
||||
|
@ -1218,6 +1219,16 @@ dependencies = [
|
|||
"tokio-reactor 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tower-add-origin"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/tower-rs/tower-http#c9e13f641a681b3ef01e96910789586e39aee2e2"
|
||||
dependencies = [
|
||||
"futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"http 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"tower-service 0.1.0 (git+https://github.com/tower-rs/tower)",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tower-balance"
|
||||
version = "0.1.0"
|
||||
|
@ -1679,6 +1690,7 @@ dependencies = [
|
|||
"checksum tokio-threadpool 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "b3c3873a6d8d0b636e024e77b9a82eaab6739578a06189ecd0e731c7308fbc5d"
|
||||
"checksum tokio-timer 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)" = "028b94314065b90f026a21826cffd62a4e40a92cda3e5c069cc7b02e5945f5e9"
|
||||
"checksum tokio-udp 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "137bda266504893ac4774e0ec4c2108f7ccdbcb7ac8dced6305fe9e4e0b5041a"
|
||||
"checksum tower-add-origin 0.1.0 (git+https://github.com/tower-rs/tower-http)" = "<none>"
|
||||
"checksum tower-balance 0.1.0 (git+https://github.com/tower-rs/tower)" = "<none>"
|
||||
"checksum tower-buffer 0.1.0 (git+https://github.com/tower-rs/tower)" = "<none>"
|
||||
"checksum tower-discover 0.1.0 (git+https://github.com/tower-rs/tower)" = "<none>"
|
||||
|
|
|
@ -47,6 +47,7 @@ tokio = "0.1.7"
|
|||
tokio-signal = "0.2"
|
||||
tokio-timer = "0.2.4" # for tokio_timer::clock
|
||||
tokio-connect = { git = "https://github.com/carllerche/tokio-connect" }
|
||||
tower-add-origin = { git = "https://github.com/tower-rs/tower-http" }
|
||||
tower-balance = { git = "https://github.com/tower-rs/tower" }
|
||||
tower-buffer = { git = "https://github.com/tower-rs/tower" }
|
||||
tower-discover = { git = "https://github.com/tower-rs/tower" }
|
||||
|
|
|
@ -10,9 +10,14 @@ use h2;
|
|||
use http;
|
||||
use tokio::timer::Delay;
|
||||
|
||||
use tower_h2::{self, BoxBody};
|
||||
use tower_h2::{self, BoxBody, RecvBody};
|
||||
use tower_add_origin::AddOrigin;
|
||||
use tower_service::Service;
|
||||
use tower_reconnect::{Reconnect, Error as ReconnectError};
|
||||
use tower_reconnect::{
|
||||
Reconnect,
|
||||
Error as ReconnectError,
|
||||
ResponseFuture as ReconnectFuture,
|
||||
};
|
||||
use conditional::Conditional;
|
||||
use dns;
|
||||
use timeout::{Timeout, TimeoutError};
|
||||
|
@ -20,7 +25,7 @@ use transport::{tls, HostAndPort, LookupAddressAndConnect};
|
|||
use watch_service::Rebind;
|
||||
|
||||
/// Type of the client service stack used to make destination requests.
|
||||
pub(super) type ClientService = AddOrigin<Backoff<LogErrors<Reconnect<
|
||||
pub(super) struct ClientService(AddOrigin<Backoff<LogErrors<Reconnect<
|
||||
tower_h2::client::Connect<
|
||||
Timeout<LookupAddressAndConnect>,
|
||||
::logging::ContextualExecutor<
|
||||
|
@ -31,7 +36,7 @@ pub(super) type ClientService = AddOrigin<Backoff<LogErrors<Reconnect<
|
|||
>,
|
||||
BoxBody,
|
||||
>
|
||||
>>>>;
|
||||
>>>>);
|
||||
|
||||
/// The state needed to bind a new controller client stack.
|
||||
pub(super) struct BindClient {
|
||||
|
@ -44,23 +49,15 @@ pub(super) struct BindClient {
|
|||
|
||||
/// Wait a duration if inner `poll_ready` returns an error.
|
||||
//TODO: move to tower-backoff
|
||||
pub(super) struct Backoff<S> {
|
||||
struct Backoff<S> {
|
||||
inner: S,
|
||||
timer: Delay,
|
||||
waiting: bool,
|
||||
wait_dur: Duration,
|
||||
}
|
||||
|
||||
|
||||
/// Wraps an HTTP service, injecting authority and scheme on every request.
|
||||
pub(super) struct AddOrigin<S> {
|
||||
authority: http::uri::Authority,
|
||||
inner: S,
|
||||
scheme: http::uri::Scheme,
|
||||
}
|
||||
|
||||
/// Log errors talking to the controller in human format.
|
||||
pub(super) struct LogErrors<S> {
|
||||
struct LogErrors<S> {
|
||||
inner: S,
|
||||
}
|
||||
|
||||
|
@ -80,6 +77,31 @@ type LogError = ReconnectError<
|
|||
>
|
||||
>;
|
||||
|
||||
// ===== impl ClientService =====
|
||||
|
||||
impl Service for ClientService {
|
||||
type Request = http::Request<BoxBody>;
|
||||
type Response = http::Response<RecvBody>;
|
||||
type Error = LogError;
|
||||
type Future = ReconnectFuture<
|
||||
tower_h2::client::Connect<
|
||||
Timeout<LookupAddressAndConnect>,
|
||||
::logging::ContextualExecutor<
|
||||
::logging::Client<
|
||||
&'static str,
|
||||
HostAndPort,
|
||||
>
|
||||
>, BoxBody>>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
self.0.poll_ready()
|
||||
}
|
||||
|
||||
fn call(&mut self, request: Self::Request) -> Self::Future {
|
||||
self.0.call(request)
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl BindClient =====
|
||||
|
||||
impl BindClient {
|
||||
|
@ -133,8 +155,7 @@ impl Rebind<tls::ConditionalClientConfig> for BindClient {
|
|||
let reconnect = Reconnect::new(h2_client);
|
||||
let log_errors = LogErrors::new(reconnect);
|
||||
let backoff = Backoff::new(log_errors, self.backoff_delay);
|
||||
// TODO: Use AddOrigin in tower-http
|
||||
AddOrigin::new(scheme, authority, backoff)
|
||||
ClientService(AddOrigin::new(backoff, scheme, authority))
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -145,7 +166,7 @@ impl<S> Backoff<S>
|
|||
where
|
||||
S: Service,
|
||||
{
|
||||
pub(super) fn new(inner: S, wait_dur: Duration) -> Self {
|
||||
fn new(inner: S, wait_dur: Duration) -> Self {
|
||||
Backoff {
|
||||
inner,
|
||||
timer: Delay::new(Instant::now() + wait_dur),
|
||||
|
@ -207,7 +228,7 @@ impl<S> LogErrors<S>
|
|||
where
|
||||
S: Service<Error=LogError>,
|
||||
{
|
||||
pub(super) fn new(service: S) -> Self {
|
||||
fn new(service: S) -> Self {
|
||||
LogErrors {
|
||||
inner: service,
|
||||
}
|
||||
|
@ -235,7 +256,7 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
pub(super) struct HumanError<'a>(&'a LogError);
|
||||
struct HumanError<'a>(&'a LogError);
|
||||
|
||||
impl<'a> fmt::Display for HumanError<'a> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
|
@ -256,42 +277,6 @@ impl<'a> fmt::Display for HumanError<'a> {
|
|||
}
|
||||
}
|
||||
|
||||
// ===== impl AddOrigin =====
|
||||
|
||||
impl<S> AddOrigin<S> {
|
||||
pub(super) fn new(scheme: http::uri::Scheme, auth: http::uri::Authority, service: S) -> Self {
|
||||
AddOrigin {
|
||||
authority: auth,
|
||||
inner: service,
|
||||
scheme,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, B> Service for AddOrigin<S>
|
||||
where
|
||||
S: Service<Request = http::Request<B>>,
|
||||
{
|
||||
type Request = http::Request<B>;
|
||||
type Response = S::Response;
|
||||
type Error = S::Error;
|
||||
type Future = S::Future;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
self.inner.poll_ready()
|
||||
}
|
||||
|
||||
fn call(&mut self, req: Self::Request) -> Self::Future {
|
||||
let (mut head, body) = req.into_parts();
|
||||
let mut uri: http::uri::Parts = head.uri.into();
|
||||
uri.scheme = Some(self.scheme.clone());
|
||||
uri.authority = Some(self.authority.clone());
|
||||
head.uri = http::Uri::from_parts(uri).expect("valid uri");
|
||||
|
||||
self.inner.call(http::Request::from_parts(head, body))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
|
|
@ -38,6 +38,7 @@ extern crate tempdir;
|
|||
extern crate tokio;
|
||||
extern crate tokio_connect;
|
||||
extern crate tokio_timer;
|
||||
extern crate tower_add_origin;
|
||||
extern crate tower_balance;
|
||||
extern crate tower_buffer;
|
||||
extern crate tower_discover;
|
||||
|
|
Loading…
Reference in New Issue