diff --git a/proxy/src/config.rs b/proxy/src/config.rs index c6e737fd1..b1763b36d 100644 --- a/proxy/src/config.rs +++ b/proxy/src/config.rs @@ -49,6 +49,9 @@ pub struct Config { /// Timeout after which to cancel telemetry reports. pub report_timeout: Duration, + /// Timeout after which to cancel binding a request. + pub bind_timeout: Duration, + pub pod_name: Option, pub pod_namespace: Option, pub pod_zone: Option, @@ -139,6 +142,7 @@ pub const ENV_PUBLIC_LISTENER: &str = "CONDUIT_PROXY_PUBLIC_LISTENER"; pub const ENV_CONTROL_LISTENER: &str = "CONDUIT_PROXY_CONTROL_LISTENER"; const ENV_PRIVATE_CONNECT_TIMEOUT: &str = "CONDUIT_PROXY_PRIVATE_CONNECT_TIMEOUT"; const ENV_PUBLIC_CONNECT_TIMEOUT: &str = "CONDUIT_PROXY_PUBLIC_CONNECT_TIMEOUT"; +pub const ENV_BIND_TIMEOUT: &str = "CONDUIT_PROXY_BIND_TIMEOUT"; const ENV_NODE_NAME: &str = "CONDUIT_PROXY_NODE_NAME"; const ENV_POD_NAME: &str = "CONDUIT_PROXY_POD_NAME"; @@ -157,6 +161,7 @@ const DEFAULT_PRIVATE_LISTENER: &str = "tcp://127.0.0.1:4140"; const DEFAULT_PUBLIC_LISTENER: &str = "tcp://0.0.0.0:4143"; const DEFAULT_CONTROL_LISTENER: &str = "tcp://0.0.0.0:4190"; const DEFAULT_PRIVATE_CONNECT_TIMEOUT_MS: u64 = 20; +const DEFAULT_BIND_TIMEOUT_MS: u64 = 10_000; // ten seconds, as in Linkerd. const DEFAULT_RESOLV_CONF: &str = "/etc/resolv.conf"; // ===== impl Config ===== @@ -174,6 +179,7 @@ impl<'a> TryFrom<&'a Strings> for Config { let private_forward = parse(strings, ENV_PRIVATE_FORWARD, str::parse); let public_connect_timeout = parse(strings, ENV_PUBLIC_CONNECT_TIMEOUT, parse_number); let private_connect_timeout = parse(strings, ENV_PRIVATE_CONNECT_TIMEOUT, parse_number); + let bind_timeout = parse(strings, ENV_BIND_TIMEOUT, parse_number); let resolv_conf_path = strings.get(ENV_RESOLV_CONF); let event_buffer_capacity = parse(strings, ENV_EVENT_BUFFER_CAPACITY, parse_number); let metrics_flush_interval_secs = @@ -226,6 +232,8 @@ impl<'a> TryFrom<&'a Strings> for Config { .unwrap_or(DEFAULT_METRICS_FLUSH_INTERVAL_SECS)), report_timeout: Duration::from_secs(report_timeout?.unwrap_or(DEFAULT_REPORT_TIMEOUT_SECS)), + bind_timeout: + Duration::from_millis(bind_timeout?.unwrap_or(DEFAULT_BIND_TIMEOUT_MS)), pod_name: pod_name?, pod_namespace: pod_namespace?, pod_zone: pod_zone?, diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index b56920cee..456d2dacd 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -223,7 +223,9 @@ where bind, control, config.default_destination_namespace().cloned(), - config.default_destination_zone().cloned()); + config.default_destination_zone().cloned(), + config.bind_timeout, + ); let fut = serve( outbound_listener, diff --git a/proxy/src/outbound.rs b/proxy/src/outbound.rs index 5d1c5bf94..394ca58df 100644 --- a/proxy/src/outbound.rs +++ b/proxy/src/outbound.rs @@ -1,4 +1,5 @@ use std::net::SocketAddr; +use std::time::Duration; use futures::{Async, Poll}; use http; @@ -17,6 +18,7 @@ use control::{self, discovery}; use control::discovery::Bind as BindTrait; use ctx; use fully_qualified_authority::FullyQualifiedAuthority; +use timeout::Timeout; type BindProtocol = bind::BindProtocol, B>; @@ -25,6 +27,7 @@ pub struct Outbound { discovery: control::Control, default_namespace: Option, default_zone: Option, + bind_timeout: Duration, } const MAX_IN_FLIGHT: usize = 10_000; @@ -32,14 +35,18 @@ const MAX_IN_FLIGHT: usize = 10_000; // ===== impl Outbound ===== impl Outbound { - pub fn new(bind: Bind, B>, discovery: control::Control, - default_namespace: Option, default_zone: Option) + pub fn new(bind: Bind, B>, + discovery: control::Control, + default_namespace: Option, + default_zone: Option, + bind_timeout: Duration,) -> Outbound { Self { bind, discovery, default_namespace, default_zone, + bind_timeout, } } } @@ -59,10 +66,10 @@ where type Error = ::Error; type Key = (Destination, Protocol); type RouteError = (); - type Service = InFlightLimit>, - choose::PowerOfTwoChoices, - >>>; + choose::PowerOfTwoChoices + >>>>; fn recognize(&self, req: &Self::Request) -> Option { let local = req.uri().authority_part().and_then(|authority| { @@ -131,11 +138,16 @@ where let balance = tower_balance::power_of_two_choices(loaded, rand::thread_rng()); - Buffer::new(balance, self.bind.executor()) - .map(|buffer| { - InFlightLimit::new(buffer, MAX_IN_FLIGHT) - }) - .map_err(|_| {}) + // use the same executor as the underlying `Bind` for the `Buffer` and + // `Timeout`. + let handle = self.bind.executor(); + + let buffer = Buffer::new(balance, handle).map_err(|_| {})?; + + let timeout = Timeout::new(buffer, self.bind_timeout, handle); + + Ok(InFlightLimit::new(timeout, MAX_IN_FLIGHT)) + } } diff --git a/proxy/src/timeout.rs b/proxy/src/timeout.rs index 5c605781b..ecc2fb0fd 100644 --- a/proxy/src/timeout.rs +++ b/proxy/src/timeout.rs @@ -11,7 +11,6 @@ use tokio_core::reactor::{Timeout as ReactorTimeout, Handle}; use tokio_io; use tower::Service; - /// A timeout that wraps an underlying operation. #[derive(Debug, Clone)] pub struct Timeout { @@ -53,7 +52,6 @@ impl Timeout { impl Service for Timeout where S: Service, - // E: Error, { type Request = S::Request; type Response = T; @@ -66,9 +64,8 @@ where fn call(&mut self, req: Self::Request) -> Self::Future { let duration = self.duration; - // TODO: should this panic or wrap the error? let timeout = ReactorTimeout::new(duration, &self.handle) - .expect("failed to create timeout!"); + .expect("reactor gone"); let inner = self.inner.call(req); TimeoutFuture { inner, @@ -82,7 +79,6 @@ where impl Connect for Timeout where C: Connect, - // C::Error: Error, { type Connected = C::Connected; type Error = TimeoutError; @@ -90,9 +86,8 @@ where fn connect(&self) -> Self::Future { let duration = self.duration; - // TODO: should this panic or wrap the error? let timeout = ReactorTimeout::new(duration, &self.handle) - .expect("failed to create timeout!"); + .expect("reactor gone"); let inner = self.inner.connect(); TimeoutFuture { inner, diff --git a/proxy/tests/discovery.rs b/proxy/tests/discovery.rs index 1179ec005..efaa35567 100644 --- a/proxy/tests/discovery.rs +++ b/proxy/tests/discovery.rs @@ -1,6 +1,8 @@ mod support; use self::support::*; +use std::thread; + #[test] fn outbound_asks_controller_api() { let _ = env_logger::init(); @@ -55,10 +57,36 @@ fn outbound_updates_newer_services() { } #[test] -#[ignore] +#[cfg_attr(not(feature = "flaky_tests"), ignore)] fn outbound_times_out() { - // Currently, the outbound router will wait forever until discovery tells - // it where to send the request. It should probably time out eventually. + let _ = env_logger::init(); + let mut env = config::TestEnv::new(); + + // set the bind timeout to 100 ms. + env.put(config::ENV_BIND_TIMEOUT, "100".to_owned()); + + let srv = server::http1().route("/h1", "hello h1").run(); + let addr = srv.addr.clone(); + let ctrl = controller::new() + // when the proxy requests the destination, sleep for 500 ms, and then + // return the correct destination + .destination_fn("disco.test.svc.cluster.local", move || { + thread::sleep(Duration::from_millis(500)); + Some(controller::destination_update(addr)) + }) + .run(); + + let proxy = proxy::new() + .controller(ctrl) + .outbound(srv) + .run_with_test_env(env); + + let client = client::http2(proxy.outbound, "disco.test.svc.cluster.local"); + let mut req = client.request_builder("/"); + let rsp = client.request(req.method("GET")); + + // the request should time out + assert_eq!(rsp.status(), http::StatusCode::INTERNAL_SERVER_ERROR); } #[test] diff --git a/proxy/tests/support/controller.rs b/proxy/tests/support/controller.rs index d3b98bfc7..74bf595e2 100644 --- a/proxy/tests/support/controller.rs +++ b/proxy/tests/support/controller.rs @@ -2,6 +2,7 @@ use support::*; +use std::fmt; use std::net::IpAddr; use std::sync::{Arc, Mutex}; @@ -14,9 +15,11 @@ pub fn new() -> Controller { Controller::new() } +struct Destination(Box Option + Send>); + #[derive(Debug)] pub struct Controller { - destinations: Vec<(String, Option)>, + destinations: Vec<(String, Destination)>, reports: Option>, } @@ -35,14 +38,21 @@ impl Controller { } pub fn destination(mut self, dest: &str, addr: SocketAddr) -> Self { + self.destination_fn(dest, move || Some(destination_update(addr))) + } + + pub fn destination_fn(mut self, dest: &str, f: F) -> Self + where + F: Fn() -> Option + Send + 'static, + { self.destinations - .push((dest.into(), Some(destination_update(addr)))); + .push((dest.into(), Destination(Box::new(f)))); self } + pub fn destination_close(mut self, dest: &str) -> Self { - self.destinations.push((dest.into(), None)); - self + self.destination_fn(dest, || None) } pub fn reports(&mut self) -> mpsc::UnboundedReceiver { @@ -57,11 +67,17 @@ impl Controller { } type Response = self::http::Response; -type Destinations = Arc)>>>; +type Destinations = Arc>>; const DESTINATION_GET: &str = "/conduit.proxy.destination.Destination/Get"; const TELEMETRY_REPORT: &str = "/conduit.proxy.telemetry.Telemetry/Report"; +impl fmt::Debug for Destination { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "Destination") + } +} + #[derive(Debug)] struct Svc { destinations: Destinations, @@ -85,7 +101,8 @@ impl Svc { let mut vec = destinations.lock().unwrap(); //TODO: decode `_bytes` and compare with `.0` if !vec.is_empty() { - vec.remove(0).1 + let Destination(f) = vec.remove(0).1; + f() } else { None } @@ -249,7 +266,7 @@ fn run(controller: Controller) -> Listening { } } -fn destination_update(addr: SocketAddr) -> pb::destination::Update { +pub fn destination_update(addr: SocketAddr) -> pb::destination::Update { pb::destination::Update { update: Some(pb::destination::update::Update::Add( pb::destination::WeightedAddrSet { diff --git a/proxy/tests/support/proxy.rs b/proxy/tests/support/proxy.rs index e94250bcb..eb601a752 100644 --- a/proxy/tests/support/proxy.rs +++ b/proxy/tests/support/proxy.rs @@ -58,7 +58,11 @@ impl Proxy { } pub fn run(self) -> Listening { - run(self) + self.run_with_test_env(config::TestEnv::new()) + } + + pub fn run_with_test_env(self, mut env: config::TestEnv) -> Listening { + run(self, env) } } @@ -90,7 +94,8 @@ impl conduit_proxy::GetOriginalDst for MockOriginalDst { } } -fn run(proxy: Proxy) -> Listening { + +fn run(proxy: Proxy, mut env: config::TestEnv) -> Listening { use self::conduit_proxy::config; let controller = proxy.controller.expect("proxy controller missing"); @@ -98,7 +103,6 @@ fn run(proxy: Proxy) -> Listening { let outbound = proxy.outbound; let mut mock_orig_dst = DstInner::default(); - let mut env = config::TestEnv::new(); env.put(config::ENV_CONTROL_URL, format!("tcp://{}", controller.addr)); env.put(config::ENV_PRIVATE_LISTENER, "tcp://127.0.0.1:0".to_owned()); if let Some(ref inbound) = inbound {