diff --git a/proxy/router/src/lib.rs b/proxy/router/src/lib.rs index 4b9b0bfc2..1abe59fa6 100644 --- a/proxy/router/src/lib.rs +++ b/proxy/router/src/lib.rs @@ -11,14 +11,19 @@ use std::convert::AsRef; use std::hash::Hash; use std::sync::{Arc, Mutex}; -/// Route requests based on the request authority +/// Routes requests based on a configurable `Key`. pub struct Router where T: Recognize, { inner: Arc>>, } -/// Route a request based on an authority +/// Provides a strategy for routing a Request to a Service. +/// +/// Implementors must provide a `Key` type that identifies each unique route. The +/// `recognize()` method is used to determine the key for a given request. This key is +/// used to look up a route in a cache (i.e. in `Router`), or can be passed to +/// `bind_service` to instantiate the identified route. pub trait Recognize { /// Requests handled by the discovered services type Request; @@ -29,21 +34,21 @@ pub trait Recognize { /// Errors produced by the discovered services type Error; - /// Key + /// Identifies a Route. type Key: Clone + Eq + Hash; /// Error produced by failed routing type RouteError; - /// The discovered `Service` instance. + /// A route. type Service: Service; - /// Obtains a Key for a request. + /// Determines the key for a route to handle the given request. fn recognize(&self, req: &Self::Request) -> Option>; - /// Return a `Service` to handle requests from the provided authority. + /// Return a `Service` to handle requests. /// /// The returned service must always be in the ready state (i.e. /// `poll_ready` must always return `Ready` or `Err`). @@ -64,10 +69,11 @@ pub enum Reuse { SingleUse(T), } -#[derive(Debug)] +#[derive(Debug, PartialEq)] pub enum Error { Inner(T), Route(U), + NoCapacity(usize), NotRecognized, } @@ -82,6 +88,7 @@ where T: Recognize, { routes: IndexMap, recognize: T, + capacity: usize, } enum State @@ -89,6 +96,7 @@ where T: Recognize, { Inner(::Future), RouteError(T::RouteError), + NoCapacity(usize), NotRecognized, Invalid, } @@ -98,16 +106,26 @@ where T: Recognize, impl Router where T: Recognize { - pub fn new(recognize: T) -> Self { + pub fn new(recognize: T, capacity: usize) -> Self { Router { inner: Arc::new(Mutex::new(Inner { - routes: Default::default(), + routes: IndexMap::default(), recognize, + capacity, })), } } } +macro_rules! try_bind_route { + ( $bind:expr ) => { + match $bind { + Ok(svc) => svc, + Err(e) => return ResponseFuture { state: State::RouteError(e) }, + } + } +} + impl Service for Router where T: Recognize, { @@ -116,42 +134,55 @@ where T: Recognize, type Error = Error; type Future = ResponseFuture; + /// Always ready to serve. + /// + /// Graceful backpressure is **not** supported at this level, since each request may + /// be routed to different resources. Instead, requests should be issued and each + /// route should support a queue of requests. + /// + /// TODO Attempt to free capacity in the router. fn poll_ready(&mut self) -> Poll<(), Self::Error> { Ok(().into()) } + /// Routes the request through an underlying service. + /// + /// The response fails if the request cannot be routed. fn call(&mut self, request: Self::Request) -> Self::Future { let inner = &mut *self.inner.lock().expect("lock router cache"); - let key = match inner.recognize.recognize(&request) { - Some(k) => k, - None => { - return ResponseFuture { state: State::NotRecognized }; - } - }; + match inner.recognize.recognize(&request) { + None => ResponseFuture::not_recognized(), - // Is the bound service for that key reusable? If `recognize` - // returned `SingleUse`, that indicates that the service may - // not be used to serve multiple requests. - if let Reuse::Reusable(ref k) = key { - if let Some(service) = inner.routes.get_mut(k) { + Some(Reuse::SingleUse(key)) => { + // TODO Keep SingleUse services in the cache as well, so that their + // capacity is considered. To do this, we should move the Reuse logic into + // the returned service (and not the key). + let mut service = try_bind_route!(inner.recognize.bind_service(&key)); + ResponseFuture::new(service.call(request)) + } + + Some(Reuse::Reusable(key)) => { + // First, try to load a cached route for `key`. + if let Some(service) = inner.routes.get_mut(&key) { + return ResponseFuture::new(service.call(request)); + } + + // Since there wasn't a cached route, ensure that there is capacity for a + // new one. + if inner.routes.len() == inner.capacity { + // TODO If the cache is full, evict the oldest inactive route. If all + // routes are active, fail the request. + return ResponseFuture::no_capacity(inner.capacity); + } + + // Bind a new route, send the request on the route, and cache the route. + let mut service = try_bind_route!(inner.recognize.bind_service(&key)); let response = service.call(request); - return ResponseFuture { state: State::Inner(response) }; + inner.routes.insert(key, service); + ResponseFuture::new(response) } - }; - - let mut service = match inner.recognize.bind_service(key.as_ref()) { - Ok(s) => s, - Err(e) => { - return ResponseFuture { state: State::RouteError(e) }; - } - }; - - let response = service.call(request); - if let Reuse::Reusable(k) = key { - inner.routes.insert(k, service); } - ResponseFuture { state: State::Inner(response) } } } @@ -163,8 +194,6 @@ where T: Recognize, } } -// ===== impl Recognize ===== - // ===== impl Single ===== impl Single { @@ -192,6 +221,22 @@ impl Recognize for Single { // ===== impl ResponseFuture ===== +impl ResponseFuture +where T: Recognize, +{ + fn new(inner: ::Future) -> Self { + ResponseFuture { state: State::Inner(inner) } + } + + fn not_recognized() -> Self { + ResponseFuture { state: State::NotRecognized } + } + + fn no_capacity(capacity: usize) -> Self { + ResponseFuture { state: State::NoCapacity(capacity) } + } +} + impl Future for ResponseFuture where T: Recognize, { @@ -210,12 +255,13 @@ where T: Recognize, } } NotRecognized => Err(Error::NotRecognized), + NoCapacity(capacity) => Err(Error::NoCapacity(capacity)), Invalid => panic!(), } } } -// ===== impl RouteError ===== +// ===== impl Error ===== impl fmt::Display for Error where @@ -228,6 +274,7 @@ where Error::Route(ref why) => write!(f, "route recognition failed: {}", why), Error::NotRecognized => f.pad("route not recognized"), + Error::NoCapacity(capacity) => write!(f, "router capacity reached ({})", capacity), } } } @@ -249,6 +296,7 @@ where match *self { Error::Inner(_) => "inner service error", Error::Route(_) => "route recognition failed", + Error::NoCapacity(_) => "router capacity reached", Error::NotRecognized => "route not recognized", } } @@ -264,3 +312,124 @@ impl AsRef for Reuse { } } } + +#[cfg(test)] +mod tests { + use futures::{Poll, Future, future}; + use tower_service::Service; + use super::{Error, Reuse, Router}; + + struct Recognize; + + struct MultiplyAndAssign(usize); + + enum Request { + NotRecognized, + Reusable(usize), + SingleUse(usize), + } + + impl super::Recognize for Recognize { + type Request = Request; + type Response = usize; + type Error = (); + type Key = usize; + type RouteError = (); + type Service = MultiplyAndAssign; + + fn recognize(&self, req: &Self::Request) -> Option> { + match *req { + Request::NotRecognized => None, + Request::Reusable(n) => Some(Reuse::Reusable(n)), + Request::SingleUse(n) => Some(Reuse::SingleUse(n)), + } + } + + fn bind_service(&mut self, _: &Self::Key) -> Result { + Ok(MultiplyAndAssign(1)) + } + } + + impl Service for MultiplyAndAssign { + type Request = Request; + type Response = usize; + type Error = (); + type Future = future::FutureResult; + + fn poll_ready(&mut self) -> Poll<(), ()> { + unimplemented!() + } + + fn call(&mut self, req: Self::Request) -> Self::Future { + let n = match req { + Request::NotRecognized => unreachable!(), + Request::Reusable(n) => n, + Request::SingleUse(n) => n, + }; + self.0 *= n; + future::ok(self.0) + } + } + + impl Router { + fn call_ok(&mut self, req: Request) -> usize { + self.call(req).wait().expect("should route") + } + + fn call_err(&mut self, req: Request) -> super::Error<(), ()> { + self.call(req).wait().expect_err("should not route") + } + } + + #[test] + fn invalid() { + let mut router = Router::new(Recognize, 1); + + let rsp = router.call_err(Request::NotRecognized); + assert_eq!(rsp, Error::NotRecognized); + } + + #[test] + fn reuse_limited_by_capacity() { + let mut router = Router::new(Recognize, 1); + + let rsp = router.call_ok(Request::Reusable(2)); + assert_eq!(rsp, 2); + + let rsp = router.call_err(Request::Reusable(3)); + assert_eq!(rsp, Error::NoCapacity(1)); + } + + #[test] + fn reuse_shares_service() { + let mut router = Router::new(Recognize, 1); + + let rsp = router.call_ok(Request::Reusable(2)); + assert_eq!(rsp, 2); + + let rsp = router.call_ok(Request::Reusable(2)); + assert_eq!(rsp, 4); + } + + #[test] + fn single_use_does_not_share_service() { + let mut router = Router::new(Recognize, 1); + + let rsp = router.call_ok(Request::SingleUse(2)); + assert_eq!(rsp, 2); + + let rsp = router.call_ok(Request::SingleUse(2)); + assert_eq!(rsp, 2); + } + + #[test] + fn single_use_not_cached_or_limited_by_capacity() { + let mut router = Router::new(Recognize, 1); + + let rsp = router.call_ok(Request::Reusable(2)); + assert_eq!(rsp, 2); + + let rsp = router.call_ok(Request::SingleUse(2)); + assert_eq!(rsp, 2); + } +} diff --git a/proxy/src/config.rs b/proxy/src/config.rs index b1aea398d..84d49ed70 100644 --- a/proxy/src/config.rs +++ b/proxy/src/config.rs @@ -44,6 +44,10 @@ pub struct Config { pub outbound_ports_disable_protocol_detection: IndexSet, + pub inbound_router_capacity: usize, + + pub outbound_router_capacity: usize, + /// The path to "/etc/resolv.conf" pub resolv_conf_path: PathBuf, @@ -136,6 +140,12 @@ const ENV_PRIVATE_CONNECT_TIMEOUT: &str = "CONDUIT_PROXY_PRIVATE_CONNECT_TIMEOUT const ENV_PUBLIC_CONNECT_TIMEOUT: &str = "CONDUIT_PROXY_PUBLIC_CONNECT_TIMEOUT"; pub const ENV_BIND_TIMEOUT: &str = "CONDUIT_PROXY_BIND_TIMEOUT"; +// Limits the number of HTTP routes that may be active in the proxy at any time. There is +// an inbound route for each local port that receives connections. There is an outbound +// route for each protocol and authority. +pub const ENV_INBOUND_ROUTER_CAPACITY: &str = "CONDUIT_PROXY_INBOUND_ROUTER_CAPACITY"; +pub const ENV_OUTBOUND_ROUTER_CAPACITY: &str = "CONDUIT_PROXY_OUTBOUND_ROUTER_CAPACITY"; + // These *disable* our protocol detection for connections whose SO_ORIGINAL_DST // has a port in the provided list. pub const ENV_INBOUND_PORTS_DISABLE_PROTOCOL_DETECTION: &str = "CONDUIT_PROXY_INBOUND_PORTS_DISABLE_PROTOCOL_DETECTION"; @@ -158,6 +168,11 @@ const DEFAULT_PUBLIC_CONNECT_TIMEOUT_MS: u64 = 300; const DEFAULT_BIND_TIMEOUT_MS: u64 = 10_000; // ten seconds, as in Linkerd. const DEFAULT_RESOLV_CONF: &str = "/etc/resolv.conf"; +/// It's assumed that a typical proxy can serve inbound traffic for up to 100 pod-local +/// HTTP services and may communicate with up to 10K external HTTP domains. +const DEFAULT_INBOUND_ROUTER_CAPACITY: usize = 100; +const DEFAULT_OUTBOUND_ROUTER_CAPACITY: usize = 10_000; + // By default, we keep a list of known assigned ports of server-first protocols. // // https://www.iana.org/assignments/service-names-port-numbers/service-names-port-numbers.txt @@ -184,6 +199,8 @@ impl<'a> TryFrom<&'a Strings> for Config { let private_connect_timeout = parse(strings, ENV_PRIVATE_CONNECT_TIMEOUT, parse_number); let inbound_disable_ports = parse(strings, ENV_INBOUND_PORTS_DISABLE_PROTOCOL_DETECTION, parse_port_set); let outbound_disable_ports = parse(strings, ENV_OUTBOUND_PORTS_DISABLE_PROTOCOL_DETECTION, parse_port_set); + let inbound_router_capacity = parse(strings, ENV_INBOUND_ROUTER_CAPACITY, parse_number); + let outbound_router_capacity = parse(strings, ENV_OUTBOUND_ROUTER_CAPACITY, parse_number); let bind_timeout = parse(strings, ENV_BIND_TIMEOUT, parse_number); let resolv_conf_path = strings.get(ENV_RESOLV_CONF); let event_buffer_capacity = parse(strings, ENV_EVENT_BUFFER_CAPACITY, parse_number); @@ -236,6 +253,12 @@ impl<'a> TryFrom<&'a Strings> for Config { .unwrap_or_else(|| default_disable_ports_protocol_detection()), outbound_ports_disable_protocol_detection: outbound_disable_ports? .unwrap_or_else(|| default_disable_ports_protocol_detection()), + + inbound_router_capacity: inbound_router_capacity? + .unwrap_or(DEFAULT_INBOUND_ROUTER_CAPACITY), + outbound_router_capacity: outbound_router_capacity? + .unwrap_or(DEFAULT_OUTBOUND_ROUTER_CAPACITY), + resolv_conf_path: resolv_conf_path? .unwrap_or(DEFAULT_RESOLV_CONF.into()) .into(), diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index 5ee70e8b3..5dfae3c1e 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -229,6 +229,7 @@ where let fut = serve( inbound_listener, Inbound::new(default_addr, bind), + config.inbound_router_capacity, config.private_connect_timeout, config.inbound_ports_disable_protocol_detection, ctx, @@ -250,6 +251,7 @@ where let fut = serve( outbound_listener, outgoing, + config.outbound_router_capacity, config.public_connect_timeout, config.outbound_ports_disable_protocol_detection, ctx, @@ -326,6 +328,7 @@ where fn serve( bound_port: BoundPort, recognize: R, + router_capacity: usize, tcp_connect_timeout: Duration, disable_protocol_detection_ports: IndexSet, proxy_ctx: Arc, @@ -347,7 +350,7 @@ where + 'static, G: GetOriginalDst + 'static, { - let router = Router::new(recognize); + let router = Router::new(recognize, router_capacity); let stack = Arc::new(NewServiceFn::new(move || { // Clone the router handle let router = router.clone(); @@ -367,6 +370,12 @@ where error!("turning route not recognized error into 500"); http::StatusCode::INTERNAL_SERVER_ERROR } + RouteError::NoCapacity(capacity) => { + // TODO For H2 streams, we should probably signal a protocol-level + // capacity change. + error!("router at capacity ({}); returning a 503", capacity); + http::StatusCode::SERVICE_UNAVAILABLE + } } });