diff --git a/proxy/router/src/lib.rs b/proxy/router/src/lib.rs index 1abe59fa6..8846b8b6f 100644 --- a/proxy/router/src/lib.rs +++ b/proxy/router/src/lib.rs @@ -7,7 +7,6 @@ use indexmap::IndexMap; use tower_service::Service; use std::{error, fmt, mem}; -use std::convert::AsRef; use std::hash::Hash; use std::sync::{Arc, Mutex}; @@ -46,7 +45,7 @@ pub trait Recognize { Error = Self::Error>; /// Determines the key for a route to handle the given request. - fn recognize(&self, req: &Self::Request) -> Option>; + fn recognize(&self, req: &Self::Request) -> Option; /// Return a `Service` to handle requests. /// @@ -57,18 +56,6 @@ pub trait Recognize { pub struct Single(Option); -/// Whether or not the service to a given key may be cached. -/// -/// Some services may, for various reasons, may not be able to -/// be used to serve multiple requests. When this is the case, -/// implementors of `recognize` may use `Reuse::SingleUse` to -/// indicate that the service should not be cached. -#[derive(Clone, Copy, Debug, Eq, PartialEq)] -pub enum Reuse { - Reusable(T), - SingleUse(T), -} - #[derive(Debug, PartialEq)] pub enum Error { Inner(T), @@ -147,42 +134,33 @@ where T: Recognize, /// Routes the request through an underlying service. /// - /// The response fails if the request cannot be routed. + /// The response fails when the request cannot be routed. fn call(&mut self, request: Self::Request) -> Self::Future { let inner = &mut *self.inner.lock().expect("lock router cache"); - match inner.recognize.recognize(&request) { - None => ResponseFuture::not_recognized(), + let key = match inner.recognize.recognize(&request) { + Some(key) => key, + None => return ResponseFuture::not_recognized(), + }; - Some(Reuse::SingleUse(key)) => { - // TODO Keep SingleUse services in the cache as well, so that their - // capacity is considered. To do this, we should move the Reuse logic into - // the returned service (and not the key). - let mut service = try_bind_route!(inner.recognize.bind_service(&key)); - ResponseFuture::new(service.call(request)) - } - - Some(Reuse::Reusable(key)) => { - // First, try to load a cached route for `key`. - if let Some(service) = inner.routes.get_mut(&key) { - return ResponseFuture::new(service.call(request)); - } - - // Since there wasn't a cached route, ensure that there is capacity for a - // new one. - if inner.routes.len() == inner.capacity { - // TODO If the cache is full, evict the oldest inactive route. If all - // routes are active, fail the request. - return ResponseFuture::no_capacity(inner.capacity); - } - - // Bind a new route, send the request on the route, and cache the route. - let mut service = try_bind_route!(inner.recognize.bind_service(&key)); - let response = service.call(request); - inner.routes.insert(key, service); - ResponseFuture::new(response) - } + // First, try to load a cached route for `key`. + if let Some(service) = inner.routes.get_mut(&key) { + return ResponseFuture::new(service.call(request)); } + + // Since there wasn't a cached route, ensure that there is capacity for a + // new one. + if inner.routes.len() == inner.capacity { + // TODO If the cache is full, evict the oldest inactive route. If all + // routes are active, fail the request. + return ResponseFuture::no_capacity(inner.capacity); + } + + // Bind a new route, send the request on the route, and cache the route. + let mut service = try_bind_route!(inner.recognize.bind_service(&key)); + let response = service.call(request); + inner.routes.insert(key, service); + ResponseFuture::new(response) } } @@ -210,8 +188,8 @@ impl Recognize for Single { type RouteError = (); type Service = S; - fn recognize(&self, _: &Self::Request) -> Option> { - Some(Reuse::Reusable(())) + fn recognize(&self, _: &Self::Request) -> Option { + Some(()) } fn bind_service(&mut self, _: &Self::Key) -> Result { @@ -302,22 +280,11 @@ where } } -// ===== impl Reuse ===== - -impl AsRef for Reuse { - fn as_ref(&self) -> &T { - match *self { - Reuse::Reusable(ref key) => key, - Reuse::SingleUse(ref key) => key, - } - } -} - #[cfg(test)] mod tests { use futures::{Poll, Future, future}; use tower_service::Service; - use super::{Error, Reuse, Router}; + use super::{Error, Router}; struct Recognize; @@ -325,8 +292,7 @@ mod tests { enum Request { NotRecognized, - Reusable(usize), - SingleUse(usize), + Recgonized(usize), } impl super::Recognize for Recognize { @@ -337,11 +303,10 @@ mod tests { type RouteError = (); type Service = MultiplyAndAssign; - fn recognize(&self, req: &Self::Request) -> Option> { + fn recognize(&self, req: &Self::Request) -> Option { match *req { Request::NotRecognized => None, - Request::Reusable(n) => Some(Reuse::Reusable(n)), - Request::SingleUse(n) => Some(Reuse::SingleUse(n)), + Request::Recgonized(n) => Some(n), } } @@ -363,8 +328,7 @@ mod tests { fn call(&mut self, req: Self::Request) -> Self::Future { let n = match req { Request::NotRecognized => unreachable!(), - Request::Reusable(n) => n, - Request::SingleUse(n) => n, + Request::Recgonized(n) => n, }; self.0 *= n; future::ok(self.0) @@ -390,46 +354,24 @@ mod tests { } #[test] - fn reuse_limited_by_capacity() { + fn cache_limited_by_capacity() { let mut router = Router::new(Recognize, 1); - let rsp = router.call_ok(Request::Reusable(2)); + let rsp = router.call_ok(Request::Recgonized(2)); assert_eq!(rsp, 2); - let rsp = router.call_err(Request::Reusable(3)); + let rsp = router.call_err(Request::Recgonized(3)); assert_eq!(rsp, Error::NoCapacity(1)); } #[test] - fn reuse_shares_service() { + fn services_cached() { let mut router = Router::new(Recognize, 1); - let rsp = router.call_ok(Request::Reusable(2)); + let rsp = router.call_ok(Request::Recgonized(2)); assert_eq!(rsp, 2); - let rsp = router.call_ok(Request::Reusable(2)); + let rsp = router.call_ok(Request::Recgonized(2)); assert_eq!(rsp, 4); } - - #[test] - fn single_use_does_not_share_service() { - let mut router = Router::new(Recognize, 1); - - let rsp = router.call_ok(Request::SingleUse(2)); - assert_eq!(rsp, 2); - - let rsp = router.call_ok(Request::SingleUse(2)); - assert_eq!(rsp, 2); - } - - #[test] - fn single_use_not_cached_or_limited_by_capacity() { - let mut router = Router::new(Recognize, 1); - - let rsp = router.call_ok(Request::Reusable(2)); - assert_eq!(rsp, 2); - - let rsp = router.call_ok(Request::SingleUse(2)); - assert_eq!(rsp, 2); - } } diff --git a/proxy/src/bind.rs b/proxy/src/bind.rs index e689df4c3..4955d4c40 100644 --- a/proxy/src/bind.rs +++ b/proxy/src/bind.rs @@ -5,16 +5,13 @@ use std::marker::PhantomData; use std::sync::Arc; use std::sync::atomic::AtomicUsize; -use futures::{Future, Poll}; -use futures::future::Map; +use futures::{Future, Poll, future}; use http::{self, uri}; use tokio_core::reactor::Handle; use tower_service as tower; use tower_h2; use tower_reconnect::Reconnect; -use conduit_proxy_router::Reuse; - use control; use control::discovery::Endpoint; use ctx; @@ -43,6 +40,26 @@ pub struct BindProtocol { protocol: Protocol, } +/// A type of service binding +/// +/// Some services, for various reasons, may not be able to be used to serve multiple +/// requests. The `BindsPerRequest` binding ensures that a new stack is bound for each +/// request. +/// +/// `Bound` serivces may be used to process an arbitrary number of requests. +pub enum Binding { + Bound(Stack), + BindsPerRequest { + endpoint: Endpoint, + protocol: Protocol, + bind: Bind, B>, + // When `poll_ready` is called, the _next_ service to be used may be bound + // ahead-of-time. This stack is used only to serve the next request to this + // service. + next: Option> + }, +} + /// Protocol portion of the `Recognize` key for a request. /// /// This marks whether to use HTTP/2 or HTTP/1.x for a request. In @@ -73,7 +90,9 @@ pub struct NormalizeUri { inner: S } -pub type Service = Reconnect>>; +pub type Service = Binding; + +pub type Stack = Reconnect>>; pub type NewHttp = sensor::NewHttp, B, HttpBody>; @@ -81,10 +100,7 @@ pub type HttpResponse = http::Response>; pub type HttpRequest = http::Request>; -pub type Client = transparency::Client< - sensor::Connect, - B, ->; +pub type Client = transparency::Client, B>; #[derive(Copy, Clone, Debug)] pub enum BufferSpawnError { @@ -155,31 +171,17 @@ impl Clone for Bind { impl Bind { - - // pub fn ctx(&self) -> &C { - // &self.ctx - // } - pub fn executor(&self) -> &Handle { &self.executor } - - // pub fn req_ids(&self) -> &Arc { - // &self.req_ids - // } - - // pub fn sensors(&self) -> &telemetry::Sensors { - // &self.sensors - // } - } impl Bind, B> where B: tower_h2::Body + 'static, { - pub fn bind_service(&self, ep: &Endpoint, protocol: &Protocol) -> Service { - trace!("bind_service endpoint={:?}, protocol={:?}", ep, protocol); + fn bind_stack(&self, ep: &Endpoint, protocol: &Protocol) -> Stack { + debug!("bind_stack endpoint={:?}, protocol={:?}", ep, protocol); let addr = ep.address(); let client_ctx = ctx::transport::Client::new( &self.ctx, @@ -196,7 +198,7 @@ where let client = transparency::Client::new( protocol, connect, - self.executor.clone(), + self.executor.clone() ); let sensors = self.sensors.http( @@ -214,6 +216,19 @@ where // TODO: Add some sort of backoff logic. Reconnect::new(proxy) } + + pub fn new_binding(&self, ep: &Endpoint, protocol: &Protocol) -> Binding { + if protocol.can_reuse_clients() { + Binding::Bound(self.bind_stack(ep, protocol)) + } else { + Binding::BindsPerRequest { + endpoint: ep.clone(), + protocol: protocol.clone(), + bind: self.clone(), + next: None + } + } + } } // ===== impl BindProtocol ===== @@ -240,7 +255,7 @@ where type BindError = (); fn bind(&self, ep: &Endpoint) -> Result { - Ok(self.bind.bind_service(ep, &self.protocol)) + Ok(self.bind.new_binding(ep, &self.protocol)) } } @@ -272,7 +287,7 @@ where type Error = ::Error; type Service = NormalizeUri; type InitError = S::InitError; - type Future = Map< + type Future = future::Map< S::Future, fn(S::Service) -> NormalizeUri >; @@ -306,6 +321,44 @@ where } } +// ===== impl Binding ===== + +impl tower::Service for Binding { + type Request = as tower::Service>::Request; + type Response = as tower::Service>::Response; + type Error = as tower::Service>::Error; + type Future = as tower::Service>::Future; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + match *self { + // A service is already bound, so poll its readiness. + Binding::Bound(ref mut svc) | + Binding::BindsPerRequest { next: Some(ref mut svc), .. } => svc.poll_ready(), + + // If no stack has been bound, bind it now so that its readiness can be + // checked. Store it so it can be consumed to dispatch the next request. + Binding::BindsPerRequest { ref endpoint, ref protocol, ref bind, ref mut next } => { + let mut svc = bind.bind_stack(endpoint, protocol); + let ready = svc.poll_ready()?; + *next = Some(svc); + Ok(ready) + } + } + } + + fn call(&mut self, request: Self::Request) -> Self::Future { + match *self { + Binding::Bound(ref mut svc) => svc.call(request), + Binding::BindsPerRequest { ref endpoint, ref protocol, ref bind, ref mut next } => { + // If a service has already been bound in `poll_ready`, consume it. + // Otherwise, bind a new service on-the-spot. + let mut svc = next.take().unwrap_or_else(|| bind.bind_stack(endpoint, protocol)); + svc.call(request) + } + } + } +} + // ===== impl Protocol ===== @@ -327,18 +380,10 @@ impl Protocol { Protocol::Http1(host) } - pub fn is_cachable(&self) -> bool { + pub fn can_reuse_clients(&self) -> bool { match *self { Protocol::Http2 | Protocol::Http1(Host::Authority(_)) => true, _ => false, } } - - pub fn into_key(self, key: T) -> Reuse<(T, Protocol)> { - if self.is_cachable() { - Reuse::Reusable((key, self)) - } else { - Reuse::SingleUse((key, self)) - } - } } diff --git a/proxy/src/inbound.rs b/proxy/src/inbound.rs index 8e303595e..77941e721 100644 --- a/proxy/src/inbound.rs +++ b/proxy/src/inbound.rs @@ -6,7 +6,7 @@ use tower_service as tower; use tower_buffer::{self, Buffer}; use tower_in_flight_limit::{self, InFlightLimit}; use tower_h2; -use conduit_proxy_router::{Reuse, Recognize}; +use conduit_proxy_router::Recognize; use bind; use ctx; @@ -46,7 +46,7 @@ where type RouteError = bind::BufferSpawnError; type Service = InFlightLimit>>; - fn recognize(&self, req: &Self::Request) -> Option> { + fn recognize(&self, req: &Self::Request) -> Option { let key = req.extensions() .get::>() .and_then(|ctx| { @@ -57,7 +57,7 @@ where let proto = bind::Protocol::detect(req); - let key = key.map(move|addr| proto.into_key(addr)); + let key = key.map(move |addr| (addr, proto)); trace!("recognize key={:?}", key); @@ -75,8 +75,8 @@ where debug!("building inbound {:?} client to {}", proto, addr); let endpoint = (*addr).into(); - let bind = self.bind.bind_service(&endpoint, proto); - Buffer::new(bind, self.bind.executor()) + let binding = self.bind.new_binding(&endpoint, proto); + Buffer::new(binding, self.bind.executor()) .map(|buffer| { InFlightLimit::new(buffer, MAX_IN_FLIGHT) }) @@ -116,7 +116,7 @@ mod tests { let srv_ctx = ctx::transport::Server::new(&ctx, &local, &remote, &Some(orig_dst)); let rec = srv_ctx.orig_dst_if_not_local().map(|addr| - bind::Protocol::Http1(Host::NoAuthority).into_key(addr) + (addr, bind::Protocol::Http1(Host::NoAuthority)) ); let mut req = http::Request::new(()); @@ -145,7 +145,7 @@ mod tests { )); inbound.recognize(&req) == default.map(|addr| - bind::Protocol::Http1(Host::NoAuthority).into_key(addr) + (addr, bind::Protocol::Http1(Host::NoAuthority)) ) } @@ -157,7 +157,7 @@ mod tests { let req = http::Request::new(()); inbound.recognize(&req) == default.map(|addr| - bind::Protocol::Http1(Host::NoAuthority).into_key(addr) + (addr, bind::Protocol::Http1(Host::NoAuthority)) ) } @@ -180,7 +180,7 @@ mod tests { )); inbound.recognize(&req) == default.map(|addr| - bind::Protocol::Http1(Host::NoAuthority).into_key(addr) + (addr, bind::Protocol::Http1(Host::NoAuthority)) ) } } diff --git a/proxy/src/outbound.rs b/proxy/src/outbound.rs index 85b10fe70..d0deea4a3 100644 --- a/proxy/src/outbound.rs +++ b/proxy/src/outbound.rs @@ -12,7 +12,7 @@ use tower_buffer::Buffer; use tower_discover::{Change, Discover}; use tower_in_flight_limit::InFlightLimit; use tower_h2; -use conduit_proxy_router::{Reuse, Recognize}; +use conduit_proxy_router::Recognize; use bind::{self, Bind, Protocol}; use control::{self, discovery}; @@ -67,7 +67,7 @@ where choose::PowerOfTwoChoices >>>>; - fn recognize(&self, req: &Self::Request) -> Option> { + fn recognize(&self, req: &Self::Request) -> Option { let proto = bind::Protocol::detect(req); // The request URI and Host: header have not yet been normalized @@ -106,7 +106,7 @@ where // original destination. let dest = dest?; - Some(proto.into_key(dest)) + Some((dest, proto)) } /// Builds a dynamic, load balancing service.