Introduce an optional fixed backoff between reconnects (#105)
The control client implements a backoff service that dampens reconnect attempts to the control plane by waiting a fixed period of time after a failure. Furthermore, the control client logs errors each time a reconnect attempt fails. This change moves backoff logic from control::destination::background::client to proxy::reconnect. Because the reconnect module handles connection errors uniformly, muting repeated errors, it also has enough context to know when a backoff should be applied -- when the underlying NewService cannot produce a Service. If polling the inner service fails once the Service has been established, we do not want to apply a backoff, since this may just be the result of a connection being terminated, a process being restarted, etc.
This commit is contained in:
parent
66b1731f19
commit
6f031bdca0
|
@ -1,30 +1,25 @@
|
|||
use std::{
|
||||
fmt,
|
||||
io,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use std::time::Duration;
|
||||
|
||||
use bytes::Bytes;
|
||||
use futures::{Async, Future, Poll};
|
||||
use futures::Poll;
|
||||
use h2;
|
||||
use http;
|
||||
use tokio::timer::Delay;
|
||||
|
||||
use tower_h2::{self, BoxBody, RecvBody};
|
||||
use tower_add_origin::AddOrigin;
|
||||
use tower_reconnect::{
|
||||
Reconnect,
|
||||
Error as ReconnectError,
|
||||
ResponseFuture as ReconnectFuture,
|
||||
};
|
||||
use Conditional;
|
||||
use dns;
|
||||
use proxy;
|
||||
use svc;
|
||||
use timeout::{Timeout, Error as TimeoutError};
|
||||
use timeout::Timeout;
|
||||
use transport::{tls, HostAndPort, LookupAddressAndConnect};
|
||||
|
||||
/// Type of the client service stack used to make destination requests.
|
||||
pub(super) struct ClientService(AddOrigin<Backoff<LogErrors<Reconnect<
|
||||
pub(super) struct ClientService(Service);
|
||||
|
||||
type Service = AddOrigin<
|
||||
proxy::reconnect::Service<
|
||||
HostAndPort,
|
||||
tower_h2::client::Connect<
|
||||
Timeout<LookupAddressAndConnect>,
|
||||
::logging::ContextualExecutor<
|
||||
|
@ -35,7 +30,8 @@ pub(super) struct ClientService(AddOrigin<Backoff<LogErrors<Reconnect<
|
|||
>,
|
||||
BoxBody,
|
||||
>
|
||||
>>>>);
|
||||
>
|
||||
>;
|
||||
|
||||
/// The state needed to bind a new controller client stack.
|
||||
pub(super) struct BindClient {
|
||||
|
@ -46,54 +42,24 @@ pub(super) struct BindClient {
|
|||
log_ctx: ::logging::Client<&'static str, HostAndPort>,
|
||||
}
|
||||
|
||||
/// Wait a duration if inner `poll_ready` returns an error.
|
||||
//TODO: move to tower-backoff
|
||||
struct Backoff<S> {
|
||||
inner: S,
|
||||
timer: Delay,
|
||||
waiting: bool,
|
||||
wait_dur: Duration,
|
||||
}
|
||||
|
||||
/// Log errors talking to the controller in human format.
|
||||
struct LogErrors<S> {
|
||||
inner: S,
|
||||
}
|
||||
|
||||
// We want some friendly logs, but the stack of services doesn't have fmt::Display
|
||||
// errors, so we have to build that ourselves. For now, this hard codes the
|
||||
// expected error stack, and so any new middleware added will need to adjust this.
|
||||
//
|
||||
// The dead_code allowance is because rustc is being stupid and doesn't see it
|
||||
// is used down below.
|
||||
#[allow(dead_code)]
|
||||
type LogError = ReconnectError<
|
||||
tower_h2::client::Error,
|
||||
tower_h2::client::ConnectError<
|
||||
TimeoutError<
|
||||
io::Error
|
||||
>
|
||||
>
|
||||
>;
|
||||
|
||||
// ===== impl ClientService =====
|
||||
|
||||
impl svc::Service for ClientService {
|
||||
type Request = http::Request<BoxBody>;
|
||||
type Response = http::Response<RecvBody>;
|
||||
type Error = LogError;
|
||||
type Future = ReconnectFuture<
|
||||
tower_h2::client::Connect<
|
||||
Timeout<LookupAddressAndConnect>,
|
||||
::logging::ContextualExecutor<
|
||||
::logging::Client<
|
||||
&'static str,
|
||||
HostAndPort,
|
||||
>
|
||||
>, BoxBody>>;
|
||||
type Error = <Service as svc::Service>::Error;
|
||||
type Future = <Service as svc::Service>::Future;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
self.0.poll_ready()
|
||||
// If an error occurs on an established connection, log it; otherwise the `reconnect` module will handle re-establishing a client.
|
||||
loop {
|
||||
match self.0.poll_ready() {
|
||||
Ok(v) => return Ok(v),
|
||||
Err(e) => {
|
||||
info!("Controller client error: {}", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn call(&mut self, request: Self::Request) -> Self::Future {
|
||||
|
@ -150,185 +116,9 @@ impl svc::Stack<tls::ConditionalClientConfig> for BindClient {
|
|||
self.log_ctx.clone().executor()
|
||||
);
|
||||
|
||||
let reconnect = Reconnect::new(h2_client);
|
||||
let log_errors = LogErrors::new(reconnect);
|
||||
let backoff = Backoff::new(log_errors, self.backoff_delay);
|
||||
Ok(ClientService(AddOrigin::new(backoff, scheme, authority)))
|
||||
let reconnect = proxy::reconnect::Service::new(self.host_and_port.clone(), h2_client)
|
||||
.with_fixed_backoff(self.backoff_delay);
|
||||
Ok(ClientService(AddOrigin::new(reconnect, scheme, authority)))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// ===== impl Backoff =====
|
||||
|
||||
impl<S> Backoff<S>
|
||||
where
|
||||
S: svc::Service,
|
||||
{
|
||||
fn new(inner: S, wait_dur: Duration) -> Self {
|
||||
Backoff {
|
||||
inner,
|
||||
timer: Delay::new(Instant::now() + wait_dur),
|
||||
waiting: false,
|
||||
wait_dur,
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_timer(&mut self) -> Poll<(), S::Error> {
|
||||
debug_assert!(self.waiting, "poll_timer expects to be waiting");
|
||||
|
||||
match self.timer.poll().expect("timer shouldn't error") {
|
||||
Async::Ready(()) => {
|
||||
self.waiting = false;
|
||||
Ok(Async::Ready(()))
|
||||
}
|
||||
Async::NotReady => {
|
||||
Ok(Async::NotReady)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> svc::Service for Backoff<S>
|
||||
where
|
||||
S: svc::Service,
|
||||
S::Error: fmt::Debug,
|
||||
{
|
||||
type Request = S::Request;
|
||||
type Response = S::Response;
|
||||
type Error = S::Error;
|
||||
type Future = S::Future;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
if self.waiting {
|
||||
try_ready!(self.poll_timer());
|
||||
}
|
||||
|
||||
match self.inner.poll_ready() {
|
||||
Err(_err) => {
|
||||
trace!("backoff: controller error, waiting {:?}", self.wait_dur);
|
||||
self.waiting = true;
|
||||
self.timer.reset(Instant::now() + self.wait_dur);
|
||||
self.poll_timer()
|
||||
}
|
||||
ok => ok,
|
||||
}
|
||||
}
|
||||
|
||||
fn call(&mut self, req: Self::Request) -> Self::Future {
|
||||
self.inner.call(req)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// ===== impl LogErrors =====
|
||||
|
||||
impl<S> LogErrors<S>
|
||||
where
|
||||
S: svc::Service<Error=LogError>,
|
||||
{
|
||||
fn new(service: S) -> Self {
|
||||
LogErrors {
|
||||
inner: service,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> svc::Service for LogErrors<S>
|
||||
where
|
||||
S: svc::Service<Error=LogError>,
|
||||
{
|
||||
type Request = S::Request;
|
||||
type Response = S::Response;
|
||||
type Error = S::Error;
|
||||
type Future = S::Future;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
self.inner.poll_ready().map_err(|e| {
|
||||
error!("controller error: {}", HumanError(&e));
|
||||
e
|
||||
})
|
||||
}
|
||||
|
||||
fn call(&mut self, req: Self::Request) -> Self::Future {
|
||||
self.inner.call(req)
|
||||
}
|
||||
}
|
||||
|
||||
struct HumanError<'a>(&'a LogError);
|
||||
|
||||
impl<'a> fmt::Display for HumanError<'a> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
match *self.0 {
|
||||
ReconnectError::Inner(ref e) => {
|
||||
fmt::Display::fmt(e, f)
|
||||
},
|
||||
ReconnectError::Connect(ref e) => {
|
||||
fmt::Display::fmt(e, f)
|
||||
},
|
||||
ReconnectError::NotReady => {
|
||||
// this error should only happen if we `call` the service
|
||||
// when it isn't ready, which is really more of a bug on
|
||||
// our side...
|
||||
f.pad("bug: called service when not ready")
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use futures::future;
|
||||
use svc::Service as _Service;
|
||||
use tokio::runtime::current_thread::Runtime;
|
||||
|
||||
struct MockService {
|
||||
polls: usize,
|
||||
succeed_after: usize,
|
||||
}
|
||||
|
||||
impl svc::Service for MockService {
|
||||
type Request = ();
|
||||
type Response = ();
|
||||
type Error = ();
|
||||
type Future = future::FutureResult<(), ()>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), ()> {
|
||||
self.polls += 1;
|
||||
if self.polls > self.succeed_after {
|
||||
Ok(().into())
|
||||
} else {
|
||||
Err(())
|
||||
}
|
||||
}
|
||||
|
||||
fn call(&mut self, _req: ()) -> Self::Future {
|
||||
if self.polls > self.succeed_after {
|
||||
future::ok(())
|
||||
} else {
|
||||
future::err(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn succeed_after(cnt: usize) -> MockService {
|
||||
MockService {
|
||||
polls: 0,
|
||||
succeed_after: cnt,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#[test]
|
||||
fn backoff() {
|
||||
let mock = succeed_after(2);
|
||||
let mut backoff = Backoff::new(mock, Duration::from_millis(5));
|
||||
let mut rt = Runtime::new().unwrap();
|
||||
|
||||
// The simple existence of this test checks that `Backoff` doesn't
|
||||
// hang after seeing an error in `inner.poll_ready()`, but registers
|
||||
// to poll again.
|
||||
rt.block_on(future::poll_fn(|| backoff.poll_ready())).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -37,7 +37,6 @@ extern crate tokio_timer;
|
|||
extern crate tower_add_origin;
|
||||
extern crate tower_grpc;
|
||||
extern crate tower_h2;
|
||||
extern crate tower_reconnect;
|
||||
extern crate tower_util;
|
||||
extern crate trust_dns_resolver;
|
||||
extern crate try_lock;
|
||||
|
|
|
@ -1,17 +1,23 @@
|
|||
extern crate tower_reconnect;
|
||||
|
||||
use futures::{task, Async, Future, Poll};
|
||||
use std::fmt;
|
||||
use std::marker::PhantomData;
|
||||
use tower_reconnect;
|
||||
use std::time::Duration;
|
||||
pub use self::tower_reconnect::{Error, Reconnect};
|
||||
use tokio_timer::{clock, Delay};
|
||||
|
||||
use svc;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Layer<T, M> {
|
||||
backoff: Backoff,
|
||||
_p: PhantomData<fn() -> (T, M)>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Stack<T, M> {
|
||||
backoff: Backoff,
|
||||
inner: M,
|
||||
_p: PhantomData<fn() -> T>,
|
||||
}
|
||||
|
@ -25,19 +31,28 @@ where
|
|||
T: fmt::Debug,
|
||||
N: svc::NewService,
|
||||
{
|
||||
inner: tower_reconnect::Reconnect<N>,
|
||||
inner: Reconnect<N>,
|
||||
|
||||
/// The target, used for debug logging.
|
||||
target: T,
|
||||
|
||||
backoff: Backoff,
|
||||
active_backoff: Option<Delay>,
|
||||
|
||||
/// Prevents logging repeated connect errors.
|
||||
///
|
||||
/// Set back to false after a connect succeeds, to log about future errors.
|
||||
mute_connect_error_log: bool,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
enum Backoff {
|
||||
None,
|
||||
Fixed(Duration),
|
||||
}
|
||||
|
||||
pub struct ResponseFuture<N: svc::NewService> {
|
||||
inner: <tower_reconnect::Reconnect<N> as svc::Service>::Future,
|
||||
inner: <Reconnect<N> as svc::Service>::Future,
|
||||
}
|
||||
|
||||
// === impl Layer ===
|
||||
|
@ -50,14 +65,25 @@ where
|
|||
{
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
backoff: Backoff::None,
|
||||
_p: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: once a stacked client needs backoff...
|
||||
//
|
||||
// pub fn with_fixed_backoff(self, wait: Duration) -> Self {
|
||||
// Self {
|
||||
// backoff: Backoff::Fixed(wait),
|
||||
// .. self
|
||||
// }
|
||||
// }
|
||||
}
|
||||
|
||||
impl<T, M> Clone for Layer<T, M> {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
backoff: self.backoff.clone(),
|
||||
_p: PhantomData,
|
||||
}
|
||||
}
|
||||
|
@ -76,6 +102,7 @@ where
|
|||
fn bind(&self, inner: M) -> Self::Stack {
|
||||
Stack {
|
||||
inner,
|
||||
backoff: self.backoff.clone(),
|
||||
_p: PhantomData,
|
||||
}
|
||||
}
|
||||
|
@ -86,6 +113,7 @@ where
|
|||
impl<T, M: Clone> Clone for Stack<T, M> {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
backoff: self.backoff.clone(),
|
||||
inner: self.inner.clone(),
|
||||
_p: PhantomData,
|
||||
}
|
||||
|
@ -104,8 +132,10 @@ where
|
|||
fn make(&self, target: &T) -> Result<Self::Value, Self::Error> {
|
||||
let new_service = self.inner.make(target)?;
|
||||
Ok(Service {
|
||||
inner: tower_reconnect::Reconnect::new(new_service),
|
||||
inner: Reconnect::new(new_service),
|
||||
target: target.clone(),
|
||||
backoff: self.backoff.clone(),
|
||||
active_backoff: None,
|
||||
mute_connect_error_log: false,
|
||||
})
|
||||
}
|
||||
|
@ -113,6 +143,30 @@ where
|
|||
|
||||
// === impl Service ===
|
||||
|
||||
impl<T, N> Service<T, N>
|
||||
where
|
||||
T: fmt::Debug,
|
||||
N: svc::NewService,
|
||||
N::InitError: fmt::Display,
|
||||
{
|
||||
pub fn new(target: T, new_service: N) -> Self {
|
||||
Self {
|
||||
inner: Reconnect::new(new_service),
|
||||
target,
|
||||
backoff: Backoff::None,
|
||||
active_backoff: None,
|
||||
mute_connect_error_log: false,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_fixed_backoff(self, wait: Duration) -> Self {
|
||||
Self {
|
||||
backoff: Backoff::Fixed(wait),
|
||||
.. self
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, N> svc::Service for Service<T, N>
|
||||
where
|
||||
T: fmt::Debug,
|
||||
|
@ -125,6 +179,22 @@ where
|
|||
type Future = ResponseFuture<N>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
match self.backoff {
|
||||
Backoff::None => {}
|
||||
Backoff::Fixed(_) => {
|
||||
if let Some(delay) = self.active_backoff.as_mut() {
|
||||
match delay.poll() {
|
||||
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
||||
Ok(Async::Ready(())) => {},
|
||||
Err(e) => {
|
||||
error!("timer failed; continuing without backoff: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
self.active_backoff = None;
|
||||
|
||||
match self.inner.poll_ready() {
|
||||
Ok(Async::NotReady) => Ok(Async::NotReady),
|
||||
Ok(ready) => {
|
||||
|
@ -132,12 +202,12 @@ where
|
|||
Ok(ready)
|
||||
}
|
||||
|
||||
Err(tower_reconnect::Error::Inner(err)) => {
|
||||
Err(Error::Inner(err)) => {
|
||||
self.mute_connect_error_log = false;
|
||||
Err(err)
|
||||
}
|
||||
|
||||
Err(tower_reconnect::Error::Connect(err)) => {
|
||||
Err(Error::Connect(err)) => {
|
||||
// A connection could not be established to the target.
|
||||
|
||||
// This is only logged as a warning at most once. Subsequent
|
||||
|
@ -149,6 +219,15 @@ where
|
|||
debug!("connect error to {:?}: {}", self.target, err);
|
||||
}
|
||||
|
||||
// Set a backoff if appropriate.
|
||||
//
|
||||
// This future need not be polled immediately because the
|
||||
// task is notified below.
|
||||
self.active_backoff = match self.backoff {
|
||||
Backoff::None => None,
|
||||
Backoff::Fixed(wait) => Some(Delay::new(clock::now() + wait)),
|
||||
};
|
||||
|
||||
// The inner service is now idle and will renew its internal
|
||||
// state on the next poll. Instead of doing this immediately,
|
||||
// the task is scheduled to be polled again only if the caller
|
||||
|
@ -160,7 +239,7 @@ where
|
|||
Ok(Async::NotReady)
|
||||
}
|
||||
|
||||
Err(tower_reconnect::Error::NotReady) => {
|
||||
Err(Error::NotReady) => {
|
||||
unreachable!("poll_ready can't fail with NotReady");
|
||||
}
|
||||
}
|
||||
|
@ -187,8 +266,97 @@ impl<N: svc::NewService> Future for ResponseFuture<N> {
|
|||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
self.inner.poll().map_err(|e| match e {
|
||||
tower_reconnect::Error::Inner(err) => err,
|
||||
Error::Inner(err) => err,
|
||||
_ => unreachable!("response future must fail with inner error"),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use futures::{future, Future};
|
||||
use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
|
||||
use svc::Service as _Service;
|
||||
use std::{error, fmt, time};
|
||||
use tokio::runtime::current_thread::Runtime;
|
||||
|
||||
struct NewService {
|
||||
fails: AtomicUsize,
|
||||
}
|
||||
|
||||
struct Service {}
|
||||
|
||||
struct InitFuture {
|
||||
should_fail: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct InitErr {}
|
||||
|
||||
impl svc::NewService for NewService {
|
||||
type Request = ();
|
||||
type Response = ();
|
||||
type Error = ();
|
||||
type Service = Service;
|
||||
type InitError = InitErr;
|
||||
type Future = InitFuture;
|
||||
|
||||
fn new_service(&self) -> Self::Future {
|
||||
InitFuture {
|
||||
should_fail: self.fails.fetch_sub(1, Relaxed) > 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl svc::Service for Service {
|
||||
type Request = ();
|
||||
type Response = ();
|
||||
type Error = ();
|
||||
type Future = future::FutureResult<(), ()>;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), ()> {
|
||||
Ok(().into())
|
||||
}
|
||||
|
||||
fn call(&mut self, _req: ()) -> Self::Future {
|
||||
future::ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Future for InitFuture {
|
||||
type Item = Service;
|
||||
type Error = InitErr;
|
||||
|
||||
fn poll(&mut self) -> Poll<Service, InitErr> {
|
||||
if self.should_fail {
|
||||
return Err(InitErr {})
|
||||
}
|
||||
|
||||
Ok(Service{}.into())
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for InitErr {
|
||||
fn fmt(&self, _: &mut fmt::Formatter) -> fmt::Result {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
impl error::Error for InitErr {}
|
||||
|
||||
#[test]
|
||||
fn reconnects_with_backoff() {
|
||||
let mock = NewService { fails: 2.into() };
|
||||
let mut backoff = super::Service::new("test", mock)
|
||||
.with_fixed_backoff(Duration::from_millis(100));
|
||||
let mut rt = Runtime::new().unwrap();
|
||||
|
||||
// Checks that, after the inner NewService fails to connect twice, it
|
||||
// succeeds on a third attempt.
|
||||
let t0 = time::Instant::now();
|
||||
let f = future::poll_fn(|| backoff.poll_ready());
|
||||
rt.block_on(f).unwrap();
|
||||
|
||||
assert!(t0.elapsed() >= Duration::from_millis(200))
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue