diff --git a/proxy/src/bind.rs b/proxy/src/bind.rs index 539d4fe14..4d91c17e1 100644 --- a/proxy/src/bind.rs +++ b/proxy/src/bind.rs @@ -15,6 +15,7 @@ use control; use ctx; use telemetry; use transport; +use ::timeout::Timeout; const DEFAULT_TIMEOUT_MS: u64 = 300; @@ -120,6 +121,7 @@ impl Bind { // pub fn sensors(&self) -> &telemetry::Sensors { // &self.sensors // } + } impl Bind, B> @@ -132,9 +134,9 @@ where // Map a socket address to an HTTP/2.0 connection. let connect = { - let c = transport::TimeoutConnect::new( + let c = Timeout::new( transport::Connect::new(*addr, &self.executor), - self.connect_timeout, + self.connect_timeout, &self.executor, ); diff --git a/proxy/src/config.rs b/proxy/src/config.rs index 1b836b186..914d829a2 100644 --- a/proxy/src/config.rs +++ b/proxy/src/config.rs @@ -40,8 +40,11 @@ pub struct Config { /// Event queue capacity. pub event_buffer_capacity: usize, - /// Interval after which to flush metrics + /// Interval after which to flush metrics. pub metrics_flush_interval: Duration, + + /// Timeout after which to cancel telemetry reports. + pub report_timeout: Duration, } /// Configuration settings for binding a listener. @@ -100,6 +103,7 @@ pub enum UrlError { // Environment variables to look at when loading the configuration const ENV_EVENT_BUFFER_CAPACITY: &str = "CONDUIT_PROXY_EVENT_BUFFER_CAPACITY"; const ENV_METRICS_FLUSH_INTERVAL_SECS: &str = "CONDUIT_PROXY_METRICS_FLUSH_INTERVAL_SECS"; +const ENV_REPORT_TIMEOUT_SECS: &str = "CONDUIT_PROXY_REPORT_TIMEOUT_SECS"; const ENV_PRIVATE_LISTENER: &str = "CONDUIT_PROXY_PRIVATE_LISTENER"; const ENV_PRIVATE_FORWARD: &str = "CONDUIT_PROXY_PRIVATE_FORWARD"; const ENV_PUBLIC_LISTENER: &str = "CONDUIT_PROXY_PUBLIC_LISTENER"; @@ -118,6 +122,7 @@ const ENV_RESOLV_CONF: &str = "CONDUIT_RESOLV_CONF"; // Default values for various configuration fields const DEFAULT_EVENT_BUFFER_CAPACITY: usize = 10_000; // FIXME const DEFAULT_METRICS_FLUSH_INTERVAL_SECS: u64 = 10; +const DEFAULT_REPORT_TIMEOUT_SECS: u64 = 10; // TODO: is this a reasonable default? const DEFAULT_PRIVATE_LISTENER: &str = "tcp://127.0.0.1:4140"; const DEFAULT_PUBLIC_LISTENER: &str = "tcp://0.0.0.0:4143"; const DEFAULT_CONTROL_LISTENER: &str = "tcp://0.0.0.0:4190"; @@ -134,7 +139,13 @@ impl Config { let metrics_flush_interval = Duration::from_secs( env_var_parse(ENV_METRICS_FLUSH_INTERVAL_SECS, parse_number)? - .unwrap_or(DEFAULT_METRICS_FLUSH_INTERVAL_SECS)); + .unwrap_or(DEFAULT_METRICS_FLUSH_INTERVAL_SECS) + ); + + let report_timeout = Duration::from_secs( + env_var_parse(ENV_REPORT_TIMEOUT_SECS, parse_number)? + .unwrap_or(DEFAULT_REPORT_TIMEOUT_SECS) + ); Ok(Config { private_listener: Listener { @@ -166,6 +177,7 @@ impl Config { event_buffer_capacity, metrics_flush_interval, + report_timeout, }) } } diff --git a/proxy/src/control/mod.rs b/proxy/src/control/mod.rs index 8f692f3b2..6fe87fd46 100644 --- a/proxy/src/control/mod.rs +++ b/proxy/src/control/mod.rs @@ -5,14 +5,20 @@ use bytes::Bytes; use futures::{future, Async, Future, Poll, Stream}; use h2; use http; -use tokio_core::reactor::{Handle, Timeout}; +use tokio_core::reactor::{ + Handle, + // TODO: would rather just have Backoff in a separate file so this + // renaming import is not necessary. + Timeout as ReactorTimeout +}; use tower::Service; use tower_h2; use tower_reconnect::Reconnect; use url::HostAndPort; use dns; -use transport::{LookupAddressAndConnect, TimeoutConnect}; +use transport::LookupAddressAndConnect; +use timeout::Timeout; mod codec; pub mod discovery; @@ -64,6 +70,7 @@ impl Background { events: S, host_and_port: HostAndPort, dns_config: dns::Config, + report_timeout: Duration, executor: &Handle, ) -> Box> where @@ -77,11 +84,12 @@ impl Background { http::uri::Authority::from_shared(format!("{}", host_and_port).into()).unwrap(); let dns_resolver = dns::Resolver::new(dns_config, executor); - let connect = TimeoutConnect::new( + let connect = Timeout::new( LookupAddressAndConnect::new(host_and_port, dns_resolver, executor), Duration::from_secs(3), executor, ); + let h2_client = tower_h2::client::Client::new( connect, h2::client::Builder::default(), @@ -95,7 +103,7 @@ impl Background { }; let mut disco = self.disco.work(); - let mut telemetry = Telemetry::new(events); + let mut telemetry = Telemetry::new(events, report_timeout, executor); let fut = future::poll_fn(move || { trace!("poll rpc services"); @@ -114,7 +122,7 @@ impl Background { //TODO: move to tower-backoff struct Backoff { inner: S, - timer: Timeout, + timer: ReactorTimeout, waiting: bool, wait_dur: Duration, } @@ -123,7 +131,7 @@ impl Backoff { fn new(inner: S, wait_dur: Duration, handle: &Handle) -> Self { Backoff { inner, - timer: Timeout::new(wait_dur, handle).unwrap(), + timer: ReactorTimeout::new(wait_dur, handle).unwrap(), waiting: false, wait_dur, } diff --git a/proxy/src/control/telemetry.rs b/proxy/src/control/telemetry.rs index 5d1123ca0..bff954b8a 100644 --- a/proxy/src/control/telemetry.rs +++ b/proxy/src/control/telemetry.rs @@ -1,13 +1,15 @@ -use std::time::Instant; +use std::time::{Duration, Instant}; use futures::{Async, Future, Stream}; use tower::Service; use tower_grpc; +use tokio_core::reactor::Handle; use super::codec::Protobuf; use super::pb::proxy::telemetry::{ReportRequest, ReportResponse}; use super::pb::proxy::telemetry::client::Telemetry as TelemetrySvc; use super::pb::proxy::telemetry::client::telemetry_methods::Report as ReportRpc; +use ::timeout::{Timeout, TimeoutFuture}; pub type ClientBody = tower_grpc::client::codec::EncodingBody< Protobuf, @@ -16,7 +18,7 @@ pub type ClientBody = tower_grpc::client::codec::EncodingBody< type TelemetryStream = tower_grpc::client::BodyFuture< tower_grpc::client::Unary< - tower_grpc::client::ResponseFuture, F>, + tower_grpc::client::ResponseFuture, TimeoutFuture>, Protobuf, >, >; @@ -25,6 +27,8 @@ type TelemetryStream = tower_grpc::client::BodyFuture< pub struct Telemetry { reports: T, in_flight: Option<(Instant, TelemetryStream)>, + report_timeout: Duration, + handle: Handle, } impl Telemetry @@ -34,10 +38,12 @@ where F: Future>, F::Error: ::std::fmt::Debug, { - pub fn new(reports: T) -> Self { + pub fn new(reports: T, report_timeout: Duration, handle: &Handle) -> Self { Telemetry { reports, in_flight: None, + report_timeout, + handle: handle.clone(), } } @@ -50,6 +56,7 @@ where Future = F, >, { + let client = Timeout::new(client, self.report_timeout, &self.handle); let grpc = tower_grpc::Client::new(Protobuf::new(), client); let mut rpc = ReportRpc::new(grpc); @@ -60,8 +67,9 @@ where if let Some((t0, mut fut)) = self.in_flight.take() { match fut.poll() { Ok(Async::NotReady) => { + // TODO: can we just move this logging logic to `Timeout`? trace!("report in flight to controller for {:?}", t0.elapsed()); - self.in_flight = Some((t0, fut)); + self.in_flight = Some((t0, fut)) } Ok(Async::Ready(_)) => { trace!("report sent to controller in {:?}", t0.elapsed()) diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index f38b00f41..0f003d9cf 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -69,6 +69,7 @@ mod map_err; mod outbound; mod telemetry; mod transport; +pub mod timeout; mod tower_fn; // TODO: move to tower-fn use bind::Bind; @@ -242,8 +243,13 @@ impl Main { .make_control(&taps, &executor) .expect("bad news in telemetry town"); - let client = - control_bg.bind(telemetry, control_host_and_port, dns_config, &executor); + let client = control_bg.bind( + telemetry, + control_host_and_port, + dns_config, + config.report_timeout, + &executor + ); let fut = client.join(server.map_err(|_| {})).map(|_| {}); executor.spawn(::logging::context_future("controller-client", fut)); diff --git a/proxy/src/timeout.rs b/proxy/src/timeout.rs new file mode 100644 index 000000000..254b2e07f --- /dev/null +++ b/proxy/src/timeout.rs @@ -0,0 +1,179 @@ +// #![deny(missing_docs)] +use futures::{Async, Future, Poll}; + +use std::error::Error; +use std::fmt; +use std::time::Duration; + +use tokio_connect::Connect; +use tokio_core::reactor::{Timeout as ReactorTimeout, Handle}; +use tower::Service; + + +/// A timeout that wraps an underlying operation. +#[derive(Debug, Clone)] +pub struct Timeout { + inner: U, + duration: Duration, + handle: Handle, +} + +/// An error representing that an operation timed out. +#[derive(Debug)] +pub enum TimeoutError { + /// Indicates the underlying operation timed out. + Timeout(Duration), + /// Indicates that the underlying operation failed. + Error(E), +} + +/// A `Future` wrapped with a `Timeout`. +pub struct TimeoutFuture { + inner: F, + duration: Duration, + timeout: ReactorTimeout, +} + +//===== impl Timeout ===== + +impl Timeout { + + /// Construct a new `Timeout` wrapping `inner`. + pub fn new(inner: U, duration: Duration, handle: &Handle) -> Self { + Timeout { + inner, + duration, + handle: handle.clone(), + } + } + +} + +impl Service for Timeout +where + S: Service, + // E: Error, +{ + type Request = S::Request; + type Response = T; + type Error = TimeoutError; + type Future = TimeoutFuture; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + self.inner.poll_ready().map_err(Self::Error::from) + } + + fn call(&mut self, req: Self::Request) -> Self::Future { + let duration = self.duration; + // TODO: should this panic or wrap the error? + let timeout = ReactorTimeout::new(duration, &self.handle) + .expect("failed to create timeout!"); + let inner = self.inner.call(req); + TimeoutFuture { + inner, + duration, + timeout, + } + } +} + + +impl Connect for Timeout +where + C: Connect, + // C::Error: Error, +{ + type Connected = C::Connected; + type Error = TimeoutError; + type Future = TimeoutFuture; + + fn connect(&self) -> Self::Future { + let duration = self.duration; + // TODO: should this panic or wrap the error? + let timeout = ReactorTimeout::new(duration, &self.handle) + .expect("failed to create timeout!"); + let inner = self.inner.connect(); + TimeoutFuture { + inner, + duration, + timeout, + } + } +} + + +//===== impl TimeoutError ===== + +impl From for TimeoutError { + #[inline] fn from(error: E) -> Self { + TimeoutError::Error(error) + } +} + +impl fmt::Display for TimeoutError +where + E: fmt::Display +{ + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + TimeoutError::Timeout(ref duration) => + // TODO: format the duration nicer. + write!(f, "operation timed out after {:?}", duration), + TimeoutError::Error(ref err) => + write!(f, "inner operation failed: {}", err), + } + } +} + +impl Error for TimeoutError +where + E: Error +{ + fn cause(&self) -> Option<&Error> { + match *self { + TimeoutError::Error(ref err) => Some(err), + _ => None, + } + } + + fn description(&self) -> &str { + match *self { + TimeoutError::Timeout(_) => "operation timed out", + TimeoutError::Error(ref err) => err.description(), + } + } +} + +//===== impl TimeoutFuture ===== + +impl Future for TimeoutFuture +where + F: Future, + // F::Error: Error, +{ + type Item = F::Item; + type Error = TimeoutError; + fn poll(&mut self) -> Poll { + if let Async::Ready(item) = self.inner.poll().map_err(TimeoutError::from)? { + Ok(Async::Ready(item)) + } else if let Async::Ready(_) = self.timeout.poll().expect("timer failed") { + Err(TimeoutError::Timeout(self.duration)) + } else { + Ok(Async::NotReady) + } + } +} + +// We have to provide a custom implementation of Debug, because +// tokio_core::reactor::Timeout is not Debug. +impl fmt::Debug for TimeoutFuture +where + F: fmt::Debug +{ + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("TimeoutFuture") + .field("inner", &self.inner) + .field("duration", &self.duration) + .finish() + } +} \ No newline at end of file diff --git a/proxy/src/transport/connect.rs b/proxy/src/transport/connect.rs index 884747900..82c800f0a 100644 --- a/proxy/src/transport/connect.rs +++ b/proxy/src/transport/connect.rs @@ -1,14 +1,14 @@ use futures::{Async, Future, Poll}; use tokio_connect; use tokio_core::net::{TcpStream, TcpStreamNew}; -use tokio_core::reactor::{Handle, Timeout}; +use tokio_core::reactor::Handle; use url; use std::io; use std::net::{IpAddr, SocketAddr}; -use std::time::Duration; use dns; +use ::timeout; #[must_use = "futures do nothing unless polled"] pub struct TcpStreamNewNoDelay(TcpStreamNew); @@ -26,24 +26,8 @@ pub struct LookupAddressAndConnect { handle: Handle, } -#[derive(Debug, Clone)] -pub struct TimeoutConnect { - connect: C, - timeout: Duration, - handle: Handle, -} - -pub struct TimeoutConnectFuture { - connect: F, - duration: Duration, - timeout: Timeout, -} - -#[derive(Debug)] -pub enum TimeoutError { - Timeout(Duration), - Connect(E), -} +pub type TimeoutConnect = timeout::Timeout; +pub type TimeoutError = timeout::TimeoutError; // ===== impl TcpStreamNewNoDelay ===== @@ -126,51 +110,4 @@ impl tokio_connect::Connect for LookupAddressAndConnect { }); Box::new(c) } -} - -// ===== impl TimeoutConnect ===== - -impl TimeoutConnect { - /// Returns a `Connect` to `addr` and `handle`. - pub fn new(connect: C, timeout: Duration, handle: &Handle) -> Self { - Self { - connect, - timeout, - handle: handle.clone(), - } - } -} - -impl tokio_connect::Connect for TimeoutConnect { - type Connected = C::Connected; - type Error = TimeoutError; - type Future = TimeoutConnectFuture; - - fn connect(&self) -> Self::Future { - let connect = self.connect.connect(); - let duration = self.timeout; - let timeout = Timeout::new(duration, &self.handle).unwrap(); - TimeoutConnectFuture { - connect, - duration, - timeout, - } - } -} - -impl Future for TimeoutConnectFuture { - type Item = F::Item; - type Error = TimeoutError; - - fn poll(&mut self) -> Poll { - if let Async::Ready(tcp) = self.connect.poll().map_err(TimeoutError::Connect)? { - return Ok(Async::Ready(tcp)); - } - - if let Async::Ready(_) = self.timeout.poll().expect("timer failed") { - return Err(TimeoutError::Timeout(self.duration)); - } - - Ok(Async::NotReady) - } -} +} \ No newline at end of file