diff --git a/src/control/destination/background/client.rs b/src/control/destination/background/client.rs index 8760497c5..65ecc2edf 100644 --- a/src/control/destination/background/client.rs +++ b/src/control/destination/background/client.rs @@ -1,30 +1,25 @@ -use std::{ - fmt, - io, - time::{Duration, Instant}, -}; +use std::time::Duration; use bytes::Bytes; -use futures::{Async, Future, Poll}; +use futures::Poll; use h2; use http; -use tokio::timer::Delay; use tower_h2::{self, BoxBody, RecvBody}; use tower_add_origin::AddOrigin; -use tower_reconnect::{ - Reconnect, - Error as ReconnectError, - ResponseFuture as ReconnectFuture, -}; use Conditional; use dns; +use proxy; use svc; -use timeout::{Timeout, Error as TimeoutError}; +use timeout::Timeout; use transport::{tls, HostAndPort, LookupAddressAndConnect}; /// Type of the client service stack used to make destination requests. -pub(super) struct ClientService(AddOrigin, ::logging::ContextualExecutor< @@ -35,7 +30,8 @@ pub(super) struct ClientService(AddOrigin, BoxBody, > - >>>>); + > +>; /// The state needed to bind a new controller client stack. pub(super) struct BindClient { @@ -46,54 +42,24 @@ pub(super) struct BindClient { log_ctx: ::logging::Client<&'static str, HostAndPort>, } -/// Wait a duration if inner `poll_ready` returns an error. -//TODO: move to tower-backoff -struct Backoff { - inner: S, - timer: Delay, - waiting: bool, - wait_dur: Duration, -} - -/// Log errors talking to the controller in human format. -struct LogErrors { - inner: S, -} - -// We want some friendly logs, but the stack of services don't have fmt::Display -// errors, so we have to build that ourselves. For now, this hard codes the -// expected error stack, and so any new middleware added will need to adjust this. -// -// The dead_code allowance is because rustc is being stupid and doesn't see it -// is used down below. -#[allow(dead_code)] -type LogError = ReconnectError< - tower_h2::client::Error, - tower_h2::client::ConnectError< - TimeoutError< - io::Error - > - > ->; - // ===== impl ClientService ===== impl svc::Service for ClientService { type Request = http::Request; type Response = http::Response; - type Error = LogError; - type Future = ReconnectFuture< - tower_h2::client::Connect< - Timeout, - ::logging::ContextualExecutor< - ::logging::Client< - &'static str, - HostAndPort, - > - >, BoxBody>>; + type Error = ::Error; + type Future = ::Future; fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.0.poll_ready() + // If an error occurs on established connection, log it; otherwise the `reconnect` module will handle re-establishing a client. + loop { + match self.0.poll_ready() { + Ok(v) => return Ok(v), + Err(e) => { + info!("Controller client error: {}", e) + } + } + } } fn call(&mut self, request: Self::Request) -> Self::Future { @@ -150,185 +116,9 @@ impl svc::Stack for BindClient { self.log_ctx.clone().executor() ); - let reconnect = Reconnect::new(h2_client); - let log_errors = LogErrors::new(reconnect); - let backoff = Backoff::new(log_errors, self.backoff_delay); - Ok(ClientService(AddOrigin::new(backoff, scheme, authority))) + let reconnect = proxy::reconnect::Service::new(self.host_and_port.clone(), h2_client) + .with_fixed_backoff(self.backoff_delay); + Ok(ClientService(AddOrigin::new(reconnect, scheme, authority))) } } - -// ===== impl Backoff ===== - -impl Backoff -where - S: svc::Service, -{ - fn new(inner: S, wait_dur: Duration) -> Self { - Backoff { - inner, - timer: Delay::new(Instant::now() + wait_dur), - waiting: false, - wait_dur, - } - } - - fn poll_timer(&mut self) -> Poll<(), S::Error> { - debug_assert!(self.waiting, "poll_timer expects to be waiting"); - - match self.timer.poll().expect("timer shouldn't error") { - Async::Ready(()) => { - self.waiting = false; - Ok(Async::Ready(())) - } - Async::NotReady => { - Ok(Async::NotReady) - } - } - } -} - -impl svc::Service for Backoff -where - S: svc::Service, - S::Error: fmt::Debug, -{ - type Request = S::Request; - type Response = S::Response; - type Error = S::Error; - type Future = S::Future; - - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - if self.waiting { - try_ready!(self.poll_timer()); - } - - match self.inner.poll_ready() { - Err(_err) => { - trace!("backoff: controller error, waiting {:?}", self.wait_dur); - self.waiting = true; - self.timer.reset(Instant::now() + self.wait_dur); - self.poll_timer() - } - ok => ok, - } - } - - fn call(&mut self, req: Self::Request) -> Self::Future { - self.inner.call(req) - } -} - - -// ===== impl LogErrors ===== - -impl LogErrors -where - S: svc::Service, -{ - fn new(service: S) -> Self { - LogErrors { - inner: service, - } - } -} - -impl svc::Service for LogErrors -where - S: svc::Service, -{ - type Request = S::Request; - type Response = S::Response; - type Error = S::Error; - type Future = S::Future; - - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.inner.poll_ready().map_err(|e| { - error!("controller error: {}", HumanError(&e)); - e - }) - } - - fn call(&mut self, req: Self::Request) -> Self::Future { - self.inner.call(req) - } -} - -struct HumanError<'a>(&'a LogError); - -impl<'a> fmt::Display for HumanError<'a> { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match *self.0 { - ReconnectError::Inner(ref e) => { - fmt::Display::fmt(e, f) - }, - ReconnectError::Connect(ref e) => { - fmt::Display::fmt(e, f) - }, - ReconnectError::NotReady => { - // this error should only happen if we `call` the service - // when it isn't ready, which is really more of a bug on - // our side... - f.pad("bug: called service when not ready") - }, - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use futures::future; - use svc::Service as _Service; - use tokio::runtime::current_thread::Runtime; - - struct MockService { - polls: usize, - succeed_after: usize, - } - - impl svc::Service for MockService { - type Request = (); - type Response = (); - type Error = (); - type Future = future::FutureResult<(), ()>; - - fn poll_ready(&mut self) -> Poll<(), ()> { - self.polls += 1; - if self.polls > self.succeed_after { - Ok(().into()) - } else { - Err(()) - } - } - - fn call(&mut self, _req: ()) -> Self::Future { - if self.polls > self.succeed_after { - future::ok(()) - } else { - future::err(()) - } - } - } - - fn succeed_after(cnt: usize) -> MockService { - MockService { - polls: 0, - succeed_after: cnt, - } - } - - - #[test] - fn backoff() { - let mock = succeed_after(2); - let mut backoff = Backoff::new(mock, Duration::from_millis(5)); - let mut rt = Runtime::new().unwrap(); - - // The simple existance of this test checks that `Backoff` doesn't - // hang after seeing an error in `inner.poll_ready()`, but registers - // to poll again. - rt.block_on(future::poll_fn(|| backoff.poll_ready())).unwrap(); - } -} - diff --git a/src/lib.rs b/src/lib.rs index d3f201724..b168e9269 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -37,7 +37,6 @@ extern crate tokio_timer; extern crate tower_add_origin; extern crate tower_grpc; extern crate tower_h2; -extern crate tower_reconnect; extern crate tower_util; extern crate trust_dns_resolver; extern crate try_lock; diff --git a/src/proxy/reconnect.rs b/src/proxy/reconnect.rs index 2f9b2b364..8ab19795e 100644 --- a/src/proxy/reconnect.rs +++ b/src/proxy/reconnect.rs @@ -1,17 +1,23 @@ +extern crate tower_reconnect; + use futures::{task, Async, Future, Poll}; use std::fmt; use std::marker::PhantomData; -use tower_reconnect; +use std::time::Duration; +pub use self::tower_reconnect::{Error, Reconnect}; +use tokio_timer::{clock, Delay}; use svc; #[derive(Debug)] pub struct Layer { + backoff: Backoff, _p: PhantomData (T, M)>, } #[derive(Debug)] pub struct Stack { + backoff: Backoff, inner: M, _p: PhantomData T>, } @@ -25,19 +31,28 @@ where T: fmt::Debug, N: svc::NewService, { - inner: tower_reconnect::Reconnect, + inner: Reconnect, /// The target, used for debug logging. target: T, + backoff: Backoff, + active_backoff: Option, + /// Prevents logging repeated connect errors. /// /// Set back to false after a connect succeeds, to log about future errors. mute_connect_error_log: bool, } +#[derive(Clone, Debug)] +enum Backoff { + None, + Fixed(Duration), +} + pub struct ResponseFuture { - inner: as svc::Service>::Future, + inner: as svc::Service>::Future, } // === impl Layer === @@ -50,14 +65,25 @@ where { pub fn new() -> Self { Self { + backoff: Backoff::None, _p: PhantomData, } } + + // TODO: once a stacked clients needs backoff... + // + // pub fn with_fixed_backoff(self, wait: Duration) -> Self { + // Self { + // backoff: Backoff::Fixed(wait), + // .. self + // } + // } } impl Clone for Layer { fn clone(&self) -> Self { Self { + backoff: self.backoff.clone(), _p: PhantomData, } } @@ -76,6 +102,7 @@ where fn bind(&self, inner: M) -> Self::Stack { Stack { inner, + backoff: self.backoff.clone(), _p: PhantomData, } } @@ -86,6 +113,7 @@ where impl Clone for Stack { fn clone(&self) -> Self { Self { + backoff: self.backoff.clone(), inner: self.inner.clone(), _p: PhantomData, } @@ -104,8 +132,10 @@ where fn make(&self, target: &T) -> Result { let new_service = self.inner.make(target)?; Ok(Service { - inner: tower_reconnect::Reconnect::new(new_service), + inner: Reconnect::new(new_service), target: target.clone(), + backoff: self.backoff.clone(), + active_backoff: None, mute_connect_error_log: false, }) } @@ -113,6 +143,30 @@ where // === impl Service === +impl Service +where + T: fmt::Debug, + N: svc::NewService, + N::InitError: fmt::Display, +{ + pub fn new(target: T, new_service: N) -> Self { + Self { + inner: Reconnect::new(new_service), + target, + backoff: Backoff::None, + active_backoff: None, + mute_connect_error_log: false, + } + } + + pub fn with_fixed_backoff(self, wait: Duration) -> Self { + Self { + backoff: Backoff::Fixed(wait), + .. self + } + } +} + impl svc::Service for Service where T: fmt::Debug, @@ -125,6 +179,22 @@ where type Future = ResponseFuture; fn poll_ready(&mut self) -> Poll<(), Self::Error> { + match self.backoff { + Backoff::None => {} + Backoff::Fixed(_) => { + if let Some(delay) = self.active_backoff.as_mut() { + match delay.poll() { + Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::Ready(())) => {}, + Err(e) => { + error!("timer failed; continuing without backoff: {}", e); + } + } + } + } + }; + self.active_backoff = None; + match self.inner.poll_ready() { Ok(Async::NotReady) => Ok(Async::NotReady), Ok(ready) => { @@ -132,12 +202,12 @@ where Ok(ready) } - Err(tower_reconnect::Error::Inner(err)) => { + Err(Error::Inner(err)) => { self.mute_connect_error_log = false; Err(err) } - Err(tower_reconnect::Error::Connect(err)) => { + Err(Error::Connect(err)) => { // A connection could not be established to the target. // This is only logged as a warning at most once. Subsequent @@ -149,6 +219,15 @@ where debug!("connect error to {:?}: {}", self.target, err); } + // Set a backoff if appropriate. + // + // This future need not be polled immediately because the + // task is notified below. + self.active_backoff = match self.backoff { + Backoff::None => None, + Backoff::Fixed(wait) => Some(Delay::new(clock::now() + wait)), + }; + // The inner service is now idle and will renew its internal // state on the next poll. Instead of doing this immediately, // the task is scheduled to be polled again only if the caller @@ -160,7 +239,7 @@ where Ok(Async::NotReady) } - Err(tower_reconnect::Error::NotReady) => { + Err(Error::NotReady) => { unreachable!("poll_ready can't fail with NotReady"); } } @@ -187,8 +266,97 @@ impl Future for ResponseFuture { fn poll(&mut self) -> Poll { self.inner.poll().map_err(|e| match e { - tower_reconnect::Error::Inner(err) => err, + Error::Inner(err) => err, _ => unreachable!("response future must fail with inner error"), }) } } + +#[cfg(test)] +mod tests { + use super::*; + use futures::{future, Future}; + use std::sync::atomic::{AtomicUsize, Ordering::Relaxed}; + use svc::Service as _Service; + use std::{error, fmt, time}; + use tokio::runtime::current_thread::Runtime; + + struct NewService { + fails: AtomicUsize, + } + + struct Service {} + + struct InitFuture { + should_fail: bool, + } + + #[derive(Debug)] + struct InitErr {} + + impl svc::NewService for NewService { + type Request = (); + type Response = (); + type Error = (); + type Service = Service; + type InitError = InitErr; + type Future = InitFuture; + + fn new_service(&self) -> Self::Future { + InitFuture { + should_fail: self.fails.fetch_sub(1, Relaxed) > 0, + } + } + } + + impl svc::Service for Service { + type Request = (); + type Response = (); + type Error = (); + type Future = future::FutureResult<(), ()>; + + fn poll_ready(&mut self) -> Poll<(), ()> { + Ok(().into()) + } + + fn call(&mut self, _req: ()) -> Self::Future { + future::ok(()) + } + } + + impl Future for InitFuture { + type Item = Service; + type Error = InitErr; + + fn poll(&mut self) -> Poll { + if self.should_fail { + return Err(InitErr {}) + } + + Ok(Service{}.into()) + } + } + + impl fmt::Display for InitErr { + fn fmt(&self, _: &mut fmt::Formatter) -> fmt::Result { + Ok(()) + } + } + impl error::Error for InitErr {} + + #[test] + fn reconnects_with_backoff() { + let mock = NewService { fails: 2.into() }; + let mut backoff = super::Service::new("test", mock) + .with_fixed_backoff(Duration::from_millis(100)); + let mut rt = Runtime::new().unwrap(); + + // Checks that, after the inner NewService fails to connect twice, it + // succeeds on a third attempt. + let t0 = time::Instant::now(); + let f = future::poll_fn(|| backoff.poll_ready()); + rt.block_on(f).unwrap(); + + assert!(t0.elapsed() >= Duration::from_millis(200)) + } +}