From 1774c874004273a9921e85c9bcd8bdaceabb7585 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Fri, 3 Aug 2018 17:00:20 -0700 Subject: [PATCH] Refactor control::destination::background::client module (#38) This branch should not make any functional changes. This branch makes two minor refactorings to the `client` module in `control::destination::background`: 1. Remove the `AddOrigin` middleware and replace it with the `tower-add-origin` crate from `tower-http`. These middlewares are functionally identical, but the Tower version has tests. 2. Change `ClientService` from a type alias to a tuple struct. This means that some of the middleware that are used only in this module (`LogErrors` and `Backoff`) are no longer part of a publicly visible type and can be made private to the module. Signed-off-by: Eliza Weisman --- Cargo.lock | 12 +++ Cargo.toml | 1 + src/control/destination/background/client.rs | 95 +++++++++----------- src/lib.rs | 1 + 4 files changed, 54 insertions(+), 55 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9f75d6dab..4b1cfc09e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -506,6 +506,7 @@ dependencies = [ "tokio-rustls 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-signal 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-timer 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)", + "tower-add-origin 0.1.0 (git+https://github.com/tower-rs/tower-http)", "tower-balance 0.1.0 (git+https://github.com/tower-rs/tower)", "tower-buffer 0.1.0 (git+https://github.com/tower-rs/tower)", "tower-discover 0.1.0 (git+https://github.com/tower-rs/tower)", @@ -1218,6 +1219,16 @@ dependencies = [ "tokio-reactor 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "tower-add-origin" +version = "0.1.0" +source = "git+https://github.com/tower-rs/tower-http#c9e13f641a681b3ef01e96910789586e39aee2e2" +dependencies = [ + "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", + "http 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", + "tower-service 0.1.0 (git+https://github.com/tower-rs/tower)", +] + [[package]] name = "tower-balance" version = "0.1.0" @@ -1679,6 +1690,7 @@ dependencies = [ "checksum tokio-threadpool 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "b3c3873a6d8d0b636e024e77b9a82eaab6739578a06189ecd0e731c7308fbc5d" "checksum tokio-timer 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)" = "028b94314065b90f026a21826cffd62a4e40a92cda3e5c069cc7b02e5945f5e9" "checksum tokio-udp 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "137bda266504893ac4774e0ec4c2108f7ccdbcb7ac8dced6305fe9e4e0b5041a" +"checksum tower-add-origin 0.1.0 (git+https://github.com/tower-rs/tower-http)" = "" "checksum tower-balance 0.1.0 (git+https://github.com/tower-rs/tower)" = "" "checksum tower-buffer 0.1.0 (git+https://github.com/tower-rs/tower)" = "" "checksum tower-discover 0.1.0 (git+https://github.com/tower-rs/tower)" = "" diff --git a/Cargo.toml b/Cargo.toml index aab0f49d5..c757d384d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,6 +47,7 @@ tokio = "0.1.7" tokio-signal = "0.2" tokio-timer = "0.2.4" # for tokio_timer::clock tokio-connect = { git = "https://github.com/carllerche/tokio-connect" } +tower-add-origin = { git = "https://github.com/tower-rs/tower-http" } tower-balance = { git = "https://github.com/tower-rs/tower" } tower-buffer = { git = "https://github.com/tower-rs/tower" } tower-discover = { git = "https://github.com/tower-rs/tower" } diff --git a/src/control/destination/background/client.rs b/src/control/destination/background/client.rs index 6b54dcd16..f95ecce0d 100644 --- a/src/control/destination/background/client.rs +++ b/src/control/destination/background/client.rs @@ -10,9 +10,14 @@ use h2; use http; use tokio::timer::Delay; -use tower_h2::{self, BoxBody}; +use tower_h2::{self, BoxBody, RecvBody}; +use tower_add_origin::AddOrigin; use tower_service::Service; -use tower_reconnect::{Reconnect, Error as ReconnectError}; +use tower_reconnect::{ + Reconnect, + Error as ReconnectError, + ResponseFuture as ReconnectFuture, +}; use conditional::Conditional; use dns; use timeout::{Timeout, TimeoutError}; @@ -20,7 +25,7 @@ use transport::{tls, HostAndPort, LookupAddressAndConnect}; use watch_service::Rebind; /// Type of the client service stack used to make destination requests. -pub(super) type ClientService = AddOrigin, ::logging::ContextualExecutor< @@ -31,7 +36,7 @@ pub(super) type ClientService = AddOrigin, BoxBody, > - >>>>; + >>>>); /// The state needed to bind a new controller client stack. pub(super) struct BindClient { @@ -44,23 +49,15 @@ pub(super) struct BindClient { /// Wait a duration if inner `poll_ready` returns an error. //TODO: move to tower-backoff -pub(super) struct Backoff { +struct Backoff { inner: S, timer: Delay, waiting: bool, wait_dur: Duration, } - -/// Wraps an HTTP service, injecting authority and scheme on every request. -pub(super) struct AddOrigin { - authority: http::uri::Authority, - inner: S, - scheme: http::uri::Scheme, -} - /// Log errors talking to the controller in human format. -pub(super) struct LogErrors { +struct LogErrors { inner: S, } @@ -80,6 +77,31 @@ type LogError = ReconnectError< > >; +// ===== impl ClientService ===== + +impl Service for ClientService { + type Request = http::Request; + type Response = http::Response; + type Error = LogError; + type Future = ReconnectFuture< + tower_h2::client::Connect< + Timeout, + ::logging::ContextualExecutor< + ::logging::Client< + &'static str, + HostAndPort, + > + >, BoxBody>>; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + self.0.poll_ready() + } + + fn call(&mut self, request: Self::Request) -> Self::Future { + self.0.call(request) + } +} + // ===== impl BindClient ===== impl BindClient { @@ -133,8 +155,7 @@ impl Rebind for BindClient { let reconnect = Reconnect::new(h2_client); let log_errors = LogErrors::new(reconnect); let backoff = Backoff::new(log_errors, self.backoff_delay); - // TODO: Use AddOrigin in tower-http - AddOrigin::new(scheme, authority, backoff) + ClientService(AddOrigin::new(backoff, scheme, authority)) } } @@ -145,7 +166,7 @@ impl Backoff where S: Service, { - pub(super) fn new(inner: S, wait_dur: Duration) -> Self { + fn new(inner: S, wait_dur: Duration) -> Self { Backoff { inner, timer: Delay::new(Instant::now() + wait_dur), @@ -207,7 +228,7 @@ impl LogErrors where S: Service, { - pub(super) fn new(service: S) -> Self { + fn new(service: S) -> Self { LogErrors { inner: service, } @@ -235,7 +256,7 @@ where } } -pub(super) struct HumanError<'a>(&'a LogError); +struct HumanError<'a>(&'a LogError); impl<'a> fmt::Display for HumanError<'a> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { @@ -256,42 +277,6 @@ impl<'a> fmt::Display for HumanError<'a> { } } -// ===== impl AddOrigin ===== - -impl AddOrigin { - pub(super) fn new(scheme: http::uri::Scheme, auth: http::uri::Authority, service: S) -> Self { - AddOrigin { - authority: auth, - inner: service, - scheme, - } - } -} - -impl Service for AddOrigin -where - S: Service>, -{ - type Request = http::Request; - type Response = S::Response; - type Error = S::Error; - type Future = S::Future; - - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.inner.poll_ready() - } - - fn call(&mut self, req: Self::Request) -> Self::Future { - let (mut head, body) = req.into_parts(); - let mut uri: http::uri::Parts = head.uri.into(); - uri.scheme = Some(self.scheme.clone()); - uri.authority = Some(self.authority.clone()); - head.uri = http::Uri::from_parts(uri).expect("valid uri"); - - self.inner.call(http::Request::from_parts(head, body)) - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/src/lib.rs b/src/lib.rs index 4b0f61127..a0abb7e2f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -38,6 +38,7 @@ extern crate tempdir; extern crate tokio; extern crate tokio_connect; extern crate tokio_timer; +extern crate tower_add_origin; extern crate tower_balance; extern crate tower_buffer; extern crate tower_discover;