Add timeout to in-flight telemetry reports (#12)

This PR adds a configurable timeout duration after which in-flight telemetry reports are dropped, cancelling the corresponding RPC request to the control plane.

I've also made the `Timeout` implementation used in `TimeoutConnect` generic, and reused it in multiple places, including the timeout for in-flight reports.

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
This commit is contained in:
Eliza Weisman 2017-12-13 15:07:36 -08:00 committed by GitHub
parent b4ace4642a
commit 97be2dd8cd
7 changed files with 236 additions and 84 deletions

View File

@ -15,6 +15,7 @@ use control;
use ctx; use ctx;
use telemetry; use telemetry;
use transport; use transport;
use ::timeout::Timeout;
const DEFAULT_TIMEOUT_MS: u64 = 300; const DEFAULT_TIMEOUT_MS: u64 = 300;
@ -120,6 +121,7 @@ impl<C, B> Bind<C, B> {
// pub fn sensors(&self) -> &telemetry::Sensors { // pub fn sensors(&self) -> &telemetry::Sensors {
// &self.sensors // &self.sensors
// } // }
} }
impl<B> Bind<Arc<ctx::Proxy>, B> impl<B> Bind<Arc<ctx::Proxy>, B>
@ -132,7 +134,7 @@ where
// Map a socket address to an HTTP/2.0 connection. // Map a socket address to an HTTP/2.0 connection.
let connect = { let connect = {
let c = transport::TimeoutConnect::new( let c = Timeout::new(
transport::Connect::new(*addr, &self.executor), transport::Connect::new(*addr, &self.executor),
self.connect_timeout, self.connect_timeout,
&self.executor, &self.executor,

View File

@ -40,8 +40,11 @@ pub struct Config {
/// Event queue capacity. /// Event queue capacity.
pub event_buffer_capacity: usize, pub event_buffer_capacity: usize,
/// Interval after which to flush metrics /// Interval after which to flush metrics.
pub metrics_flush_interval: Duration, pub metrics_flush_interval: Duration,
/// Timeout after which to cancel telemetry reports.
pub report_timeout: Duration,
} }
/// Configuration settings for binding a listener. /// Configuration settings for binding a listener.
@ -100,6 +103,7 @@ pub enum UrlError {
// Environment variables to look at when loading the configuration // Environment variables to look at when loading the configuration
const ENV_EVENT_BUFFER_CAPACITY: &str = "CONDUIT_PROXY_EVENT_BUFFER_CAPACITY"; const ENV_EVENT_BUFFER_CAPACITY: &str = "CONDUIT_PROXY_EVENT_BUFFER_CAPACITY";
const ENV_METRICS_FLUSH_INTERVAL_SECS: &str = "CONDUIT_PROXY_METRICS_FLUSH_INTERVAL_SECS"; const ENV_METRICS_FLUSH_INTERVAL_SECS: &str = "CONDUIT_PROXY_METRICS_FLUSH_INTERVAL_SECS";
const ENV_REPORT_TIMEOUT_SECS: &str = "CONDUIT_PROXY_REPORT_TIMEOUT_SECS";
const ENV_PRIVATE_LISTENER: &str = "CONDUIT_PROXY_PRIVATE_LISTENER"; const ENV_PRIVATE_LISTENER: &str = "CONDUIT_PROXY_PRIVATE_LISTENER";
const ENV_PRIVATE_FORWARD: &str = "CONDUIT_PROXY_PRIVATE_FORWARD"; const ENV_PRIVATE_FORWARD: &str = "CONDUIT_PROXY_PRIVATE_FORWARD";
const ENV_PUBLIC_LISTENER: &str = "CONDUIT_PROXY_PUBLIC_LISTENER"; const ENV_PUBLIC_LISTENER: &str = "CONDUIT_PROXY_PUBLIC_LISTENER";
@ -118,6 +122,7 @@ const ENV_RESOLV_CONF: &str = "CONDUIT_RESOLV_CONF";
// Default values for various configuration fields // Default values for various configuration fields
const DEFAULT_EVENT_BUFFER_CAPACITY: usize = 10_000; // FIXME const DEFAULT_EVENT_BUFFER_CAPACITY: usize = 10_000; // FIXME
const DEFAULT_METRICS_FLUSH_INTERVAL_SECS: u64 = 10; const DEFAULT_METRICS_FLUSH_INTERVAL_SECS: u64 = 10;
const DEFAULT_REPORT_TIMEOUT_SECS: u64 = 10; // TODO: is this a reasonable default?
const DEFAULT_PRIVATE_LISTENER: &str = "tcp://127.0.0.1:4140"; const DEFAULT_PRIVATE_LISTENER: &str = "tcp://127.0.0.1:4140";
const DEFAULT_PUBLIC_LISTENER: &str = "tcp://0.0.0.0:4143"; const DEFAULT_PUBLIC_LISTENER: &str = "tcp://0.0.0.0:4143";
const DEFAULT_CONTROL_LISTENER: &str = "tcp://0.0.0.0:4190"; const DEFAULT_CONTROL_LISTENER: &str = "tcp://0.0.0.0:4190";
@ -134,7 +139,13 @@ impl Config {
let metrics_flush_interval = Duration::from_secs( let metrics_flush_interval = Duration::from_secs(
env_var_parse(ENV_METRICS_FLUSH_INTERVAL_SECS, parse_number)? env_var_parse(ENV_METRICS_FLUSH_INTERVAL_SECS, parse_number)?
.unwrap_or(DEFAULT_METRICS_FLUSH_INTERVAL_SECS)); .unwrap_or(DEFAULT_METRICS_FLUSH_INTERVAL_SECS)
);
let report_timeout = Duration::from_secs(
env_var_parse(ENV_REPORT_TIMEOUT_SECS, parse_number)?
.unwrap_or(DEFAULT_REPORT_TIMEOUT_SECS)
);
Ok(Config { Ok(Config {
private_listener: Listener { private_listener: Listener {
@ -166,6 +177,7 @@ impl Config {
event_buffer_capacity, event_buffer_capacity,
metrics_flush_interval, metrics_flush_interval,
report_timeout,
}) })
} }
} }

View File

@ -5,14 +5,20 @@ use bytes::Bytes;
use futures::{future, Async, Future, Poll, Stream}; use futures::{future, Async, Future, Poll, Stream};
use h2; use h2;
use http; use http;
use tokio_core::reactor::{Handle, Timeout}; use tokio_core::reactor::{
Handle,
// TODO: would rather just have Backoff in a separate file so this
// renaming import is not necessary.
Timeout as ReactorTimeout
};
use tower::Service; use tower::Service;
use tower_h2; use tower_h2;
use tower_reconnect::Reconnect; use tower_reconnect::Reconnect;
use url::HostAndPort; use url::HostAndPort;
use dns; use dns;
use transport::{LookupAddressAndConnect, TimeoutConnect}; use transport::LookupAddressAndConnect;
use timeout::Timeout;
mod codec; mod codec;
pub mod discovery; pub mod discovery;
@ -64,6 +70,7 @@ impl Background {
events: S, events: S,
host_and_port: HostAndPort, host_and_port: HostAndPort,
dns_config: dns::Config, dns_config: dns::Config,
report_timeout: Duration,
executor: &Handle, executor: &Handle,
) -> Box<Future<Item = (), Error = ()>> ) -> Box<Future<Item = (), Error = ()>>
where where
@ -77,11 +84,12 @@ impl Background {
http::uri::Authority::from_shared(format!("{}", host_and_port).into()).unwrap(); http::uri::Authority::from_shared(format!("{}", host_and_port).into()).unwrap();
let dns_resolver = dns::Resolver::new(dns_config, executor); let dns_resolver = dns::Resolver::new(dns_config, executor);
let connect = TimeoutConnect::new( let connect = Timeout::new(
LookupAddressAndConnect::new(host_and_port, dns_resolver, executor), LookupAddressAndConnect::new(host_and_port, dns_resolver, executor),
Duration::from_secs(3), Duration::from_secs(3),
executor, executor,
); );
let h2_client = tower_h2::client::Client::new( let h2_client = tower_h2::client::Client::new(
connect, connect,
h2::client::Builder::default(), h2::client::Builder::default(),
@ -95,7 +103,7 @@ impl Background {
}; };
let mut disco = self.disco.work(); let mut disco = self.disco.work();
let mut telemetry = Telemetry::new(events); let mut telemetry = Telemetry::new(events, report_timeout, executor);
let fut = future::poll_fn(move || { let fut = future::poll_fn(move || {
trace!("poll rpc services"); trace!("poll rpc services");
@ -114,7 +122,7 @@ impl Background {
//TODO: move to tower-backoff //TODO: move to tower-backoff
struct Backoff<S> { struct Backoff<S> {
inner: S, inner: S,
timer: Timeout, timer: ReactorTimeout,
waiting: bool, waiting: bool,
wait_dur: Duration, wait_dur: Duration,
} }
@ -123,7 +131,7 @@ impl<S> Backoff<S> {
fn new(inner: S, wait_dur: Duration, handle: &Handle) -> Self { fn new(inner: S, wait_dur: Duration, handle: &Handle) -> Self {
Backoff { Backoff {
inner, inner,
timer: Timeout::new(wait_dur, handle).unwrap(), timer: ReactorTimeout::new(wait_dur, handle).unwrap(),
waiting: false, waiting: false,
wait_dur, wait_dur,
} }

View File

@ -1,13 +1,15 @@
use std::time::Instant; use std::time::{Duration, Instant};
use futures::{Async, Future, Stream}; use futures::{Async, Future, Stream};
use tower::Service; use tower::Service;
use tower_grpc; use tower_grpc;
use tokio_core::reactor::Handle;
use super::codec::Protobuf; use super::codec::Protobuf;
use super::pb::proxy::telemetry::{ReportRequest, ReportResponse}; use super::pb::proxy::telemetry::{ReportRequest, ReportResponse};
use super::pb::proxy::telemetry::client::Telemetry as TelemetrySvc; use super::pb::proxy::telemetry::client::Telemetry as TelemetrySvc;
use super::pb::proxy::telemetry::client::telemetry_methods::Report as ReportRpc; use super::pb::proxy::telemetry::client::telemetry_methods::Report as ReportRpc;
use ::timeout::{Timeout, TimeoutFuture};
pub type ClientBody = tower_grpc::client::codec::EncodingBody< pub type ClientBody = tower_grpc::client::codec::EncodingBody<
Protobuf<ReportRequest, ReportResponse>, Protobuf<ReportRequest, ReportResponse>,
@ -16,7 +18,7 @@ pub type ClientBody = tower_grpc::client::codec::EncodingBody<
type TelemetryStream<F> = tower_grpc::client::BodyFuture< type TelemetryStream<F> = tower_grpc::client::BodyFuture<
tower_grpc::client::Unary< tower_grpc::client::Unary<
tower_grpc::client::ResponseFuture<Protobuf<ReportRequest, ReportResponse>, F>, tower_grpc::client::ResponseFuture<Protobuf<ReportRequest, ReportResponse>, TimeoutFuture<F>>,
Protobuf<ReportRequest, ReportResponse>, Protobuf<ReportRequest, ReportResponse>,
>, >,
>; >;
@ -25,6 +27,8 @@ type TelemetryStream<F> = tower_grpc::client::BodyFuture<
pub struct Telemetry<T, F> { pub struct Telemetry<T, F> {
reports: T, reports: T,
in_flight: Option<(Instant, TelemetryStream<F>)>, in_flight: Option<(Instant, TelemetryStream<F>)>,
report_timeout: Duration,
handle: Handle,
} }
impl<T, F> Telemetry<T, F> impl<T, F> Telemetry<T, F>
@ -34,10 +38,12 @@ where
F: Future<Item = ::http::Response<::tower_h2::RecvBody>>, F: Future<Item = ::http::Response<::tower_h2::RecvBody>>,
F::Error: ::std::fmt::Debug, F::Error: ::std::fmt::Debug,
{ {
pub fn new(reports: T) -> Self { pub fn new(reports: T, report_timeout: Duration, handle: &Handle) -> Self {
Telemetry { Telemetry {
reports, reports,
in_flight: None, in_flight: None,
report_timeout,
handle: handle.clone(),
} }
} }
@ -50,6 +56,7 @@ where
Future = F, Future = F,
>, >,
{ {
let client = Timeout::new(client, self.report_timeout, &self.handle);
let grpc = tower_grpc::Client::new(Protobuf::new(), client); let grpc = tower_grpc::Client::new(Protobuf::new(), client);
let mut rpc = ReportRpc::new(grpc); let mut rpc = ReportRpc::new(grpc);
@ -60,8 +67,9 @@ where
if let Some((t0, mut fut)) = self.in_flight.take() { if let Some((t0, mut fut)) = self.in_flight.take() {
match fut.poll() { match fut.poll() {
Ok(Async::NotReady) => { Ok(Async::NotReady) => {
// TODO: can we just move this logging logic to `Timeout`?
trace!("report in flight to controller for {:?}", t0.elapsed()); trace!("report in flight to controller for {:?}", t0.elapsed());
self.in_flight = Some((t0, fut)); self.in_flight = Some((t0, fut))
} }
Ok(Async::Ready(_)) => { Ok(Async::Ready(_)) => {
trace!("report sent to controller in {:?}", t0.elapsed()) trace!("report sent to controller in {:?}", t0.elapsed())

View File

@ -69,6 +69,7 @@ mod map_err;
mod outbound; mod outbound;
mod telemetry; mod telemetry;
mod transport; mod transport;
pub mod timeout;
mod tower_fn; // TODO: move to tower-fn mod tower_fn; // TODO: move to tower-fn
use bind::Bind; use bind::Bind;
@ -242,8 +243,13 @@ impl Main {
.make_control(&taps, &executor) .make_control(&taps, &executor)
.expect("bad news in telemetry town"); .expect("bad news in telemetry town");
let client = let client = control_bg.bind(
control_bg.bind(telemetry, control_host_and_port, dns_config, &executor); telemetry,
control_host_and_port,
dns_config,
config.report_timeout,
&executor
);
let fut = client.join(server.map_err(|_| {})).map(|_| {}); let fut = client.join(server.map_err(|_| {})).map(|_| {});
executor.spawn(::logging::context_future("controller-client", fut)); executor.spawn(::logging::context_future("controller-client", fut));

179
proxy/src/timeout.rs Normal file
View File

@ -0,0 +1,179 @@
// #![deny(missing_docs)]
use futures::{Async, Future, Poll};
use std::error::Error;
use std::fmt;
use std::time::Duration;
use tokio_connect::Connect;
use tokio_core::reactor::{Timeout as ReactorTimeout, Handle};
use tower::Service;
/// A timeout that wraps an underlying operation.
#[derive(Debug, Clone)]
pub struct Timeout<U> {
inner: U,
duration: Duration,
handle: Handle,
}
/// An error representing that an operation timed out.
#[derive(Debug)]
pub enum TimeoutError<E> {
/// Indicates the underlying operation timed out.
Timeout(Duration),
/// Indicates that the underlying operation failed.
Error(E),
}
/// A `Future` wrapped with a `Timeout`.
pub struct TimeoutFuture<F> {
inner: F,
duration: Duration,
timeout: ReactorTimeout,
}
//===== impl Timeout =====
impl<U> Timeout<U> {
/// Construct a new `Timeout` wrapping `inner`.
pub fn new(inner: U, duration: Duration, handle: &Handle) -> Self {
Timeout {
inner,
duration,
handle: handle.clone(),
}
}
}
impl<S, T, E> Service for Timeout<S>
where
S: Service<Response=T, Error=E>,
// E: Error,
{
type Request = S::Request;
type Response = T;
type Error = TimeoutError<E>;
type Future = TimeoutFuture<S::Future>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.inner.poll_ready().map_err(Self::Error::from)
}
fn call(&mut self, req: Self::Request) -> Self::Future {
let duration = self.duration;
// TODO: should this panic or wrap the error?
let timeout = ReactorTimeout::new(duration, &self.handle)
.expect("failed to create timeout!");
let inner = self.inner.call(req);
TimeoutFuture {
inner,
duration,
timeout,
}
}
}
impl<C> Connect for Timeout<C>
where
C: Connect,
// C::Error: Error,
{
type Connected = C::Connected;
type Error = TimeoutError<C::Error>;
type Future = TimeoutFuture<C::Future>;
fn connect(&self) -> Self::Future {
let duration = self.duration;
// TODO: should this panic or wrap the error?
let timeout = ReactorTimeout::new(duration, &self.handle)
.expect("failed to create timeout!");
let inner = self.inner.connect();
TimeoutFuture {
inner,
duration,
timeout,
}
}
}
//===== impl TimeoutError =====
impl<E> From<E> for TimeoutError<E> {
#[inline] fn from(error: E) -> Self {
TimeoutError::Error(error)
}
}
impl<E> fmt::Display for TimeoutError<E>
where
E: fmt::Display
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
TimeoutError::Timeout(ref duration) =>
// TODO: format the duration nicer.
write!(f, "operation timed out after {:?}", duration),
TimeoutError::Error(ref err) =>
write!(f, "inner operation failed: {}", err),
}
}
}
impl<E> Error for TimeoutError<E>
where
E: Error
{
fn cause(&self) -> Option<&Error> {
match *self {
TimeoutError::Error(ref err) => Some(err),
_ => None,
}
}
fn description(&self) -> &str {
match *self {
TimeoutError::Timeout(_) => "operation timed out",
TimeoutError::Error(ref err) => err.description(),
}
}
}
//===== impl TimeoutFuture =====
impl<F> Future for TimeoutFuture<F>
where
F: Future,
// F::Error: Error,
{
type Item = F::Item;
type Error = TimeoutError<F::Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Async::Ready(item) = self.inner.poll().map_err(TimeoutError::from)? {
Ok(Async::Ready(item))
} else if let Async::Ready(_) = self.timeout.poll().expect("timer failed") {
Err(TimeoutError::Timeout(self.duration))
} else {
Ok(Async::NotReady)
}
}
}
// We have to provide a custom implementation of Debug, because
// tokio_core::reactor::Timeout is not Debug.
impl<F> fmt::Debug for TimeoutFuture<F>
where
F: fmt::Debug
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("TimeoutFuture")
.field("inner", &self.inner)
.field("duration", &self.duration)
.finish()
}
}

View File

@ -1,14 +1,14 @@
use futures::{Async, Future, Poll}; use futures::{Async, Future, Poll};
use tokio_connect; use tokio_connect;
use tokio_core::net::{TcpStream, TcpStreamNew}; use tokio_core::net::{TcpStream, TcpStreamNew};
use tokio_core::reactor::{Handle, Timeout}; use tokio_core::reactor::Handle;
use url; use url;
use std::io; use std::io;
use std::net::{IpAddr, SocketAddr}; use std::net::{IpAddr, SocketAddr};
use std::time::Duration;
use dns; use dns;
use ::timeout;
#[must_use = "futures do nothing unless polled"] #[must_use = "futures do nothing unless polled"]
pub struct TcpStreamNewNoDelay(TcpStreamNew); pub struct TcpStreamNewNoDelay(TcpStreamNew);
@ -26,24 +26,8 @@ pub struct LookupAddressAndConnect {
handle: Handle, handle: Handle,
} }
#[derive(Debug, Clone)] pub type TimeoutConnect<C> = timeout::Timeout<C>;
pub struct TimeoutConnect<C> { pub type TimeoutError<E> = timeout::TimeoutError<E>;
connect: C,
timeout: Duration,
handle: Handle,
}
pub struct TimeoutConnectFuture<F> {
connect: F,
duration: Duration,
timeout: Timeout,
}
#[derive(Debug)]
pub enum TimeoutError<E> {
Timeout(Duration),
Connect(E),
}
// ===== impl TcpStreamNewNoDelay ===== // ===== impl TcpStreamNewNoDelay =====
@ -127,50 +111,3 @@ impl tokio_connect::Connect for LookupAddressAndConnect {
Box::new(c) Box::new(c)
} }
} }
// ===== impl TimeoutConnect =====
impl<C: tokio_connect::Connect> TimeoutConnect<C> {
/// Returns a `Connect` to `addr` and `handle`.
pub fn new(connect: C, timeout: Duration, handle: &Handle) -> Self {
Self {
connect,
timeout,
handle: handle.clone(),
}
}
}
impl<C: tokio_connect::Connect> tokio_connect::Connect for TimeoutConnect<C> {
type Connected = C::Connected;
type Error = TimeoutError<C::Error>;
type Future = TimeoutConnectFuture<C::Future>;
fn connect(&self) -> Self::Future {
let connect = self.connect.connect();
let duration = self.timeout;
let timeout = Timeout::new(duration, &self.handle).unwrap();
TimeoutConnectFuture {
connect,
duration,
timeout,
}
}
}
impl<F: Future> Future for TimeoutConnectFuture<F> {
type Item = F::Item;
type Error = TimeoutError<F::Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Async::Ready(tcp) = self.connect.poll().map_err(TimeoutError::Connect)? {
return Ok(Async::Ready(tcp));
}
if let Async::Ready(_) = self.timeout.poll().expect("timer failed") {
return Err(TimeoutError::Timeout(self.duration));
}
Ok(Async::NotReady)
}
}