proxy: Track SingleUse services against router capacity (#902)

PR #898 introduces capacity limits to the balancer. However, because the
router supports "single-use" routes (routes that are bound only for the
life of a single HTTP/1 request), it is easy for a router to exceed its
configured capacity.

To fix this, the `Reuse` type is removed from the router library so that
_all_ routes are considered cacheable. It is now the responsibility of
the bound service to enforce its own client-retention policy.
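
As a rough sketch of the resulting shape (simplified names and types, not
the actual `Recognize`/`Router` from the diff below): recognition yields a
plain key, and every bound route passes through the same capacity-limited
cache.

```rust
use std::collections::HashMap;
use std::hash::Hash;

// Simplified model: `recognize` only names a route; it no longer carries a
// `Reuse` hint about whether the bound service may be cached.
trait Recognize {
    type Request;
    type Key: Clone + Eq + Hash;
    type Service;

    fn recognize(&self, req: &Self::Request) -> Option<Self::Key>;
    fn bind_service(&mut self, key: &Self::Key) -> Self::Service;
}

struct Router<R: Recognize> {
    recognize: R,
    routes: HashMap<R::Key, R::Service>,
    capacity: usize,
}

impl<R: Recognize> Router<R> {
    fn route(&mut self, req: &R::Request) -> Result<&mut R::Service, &'static str> {
        let key = self.recognize.recognize(req).ok_or("not recognized")?;
        if !self.routes.contains_key(&key) {
            // Every route is cached, so the capacity limit now applies even to
            // routes whose services will only ever serve a single request.
            if self.routes.len() == self.capacity {
                return Err("router at capacity");
            }
            let service = self.recognize.bind_service(&key);
            self.routes.insert(key.clone(), service);
        }
        Ok(self.routes.get_mut(&key).unwrap())
    }
}
```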

Previously, routes were not added to the cache when the bound service
could not be used to process more than a single request. Now, `Bind`
wraps each service it returns in a `Binding` type that dictates whether
a single client is reused or a new client is bound for each request.
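
A minimal model of that wrapping, using toy `Bind`/`Stack` stand-ins rather
than the proxy's tower-based types (the real `Binding` appears in the `bind`
module changes below):

```rust
// Toy stand-ins: the real `Stack` is a full client stack and the real `Bind`
// carries the proxy context needed to build one.
struct Stack { id: usize }
impl Stack {
    fn call(&mut self, req: &str) -> String {
        format!("stack #{} handled {}", self.id, req)
    }
}

struct Bind { bound: usize }
impl Bind {
    fn bind_stack(&mut self) -> Stack {
        self.bound += 1;
        Stack { id: self.bound }
    }
}

// The cached route is always a `Binding`; the enum decides whether the
// inner stack is reused across requests.
enum Binding {
    Bound(Stack),
    BindsPerRequest { bind: Bind, next: Option<Stack> },
}

impl Binding {
    fn call(&mut self, req: &str) -> String {
        match self {
            // One stack serves every request.
            Binding::Bound(stack) => stack.call(req),
            // A stack serves exactly one request; `next` may hold one that was
            // bound ahead of time (e.g. by a readiness check).
            Binding::BindsPerRequest { bind, next } => {
                let mut stack = next.take().unwrap_or_else(|| bind.bind_stack());
                stack.call(req)
            }
        }
    }
}
```

Because the router caches the `Binding` itself, clients that are bound per
request still count against the configured capacity.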

This enables all routes to be cached without changing connection-reuse
behavior.
Oliver Gould 2018-05-08 10:57:56 -07:00 committed by GitHub
parent a80da120ad
commit 3d6586a19f
4 changed files with 130 additions and 143 deletions

Changed file 1 of 4: the `conduit_proxy_router` library (`Recognize`, `Router`).

@@ -7,7 +7,6 @@ use indexmap::IndexMap;
 use tower_service::Service;
 use std::{error, fmt, mem};
-use std::convert::AsRef;
 use std::hash::Hash;
 use std::sync::{Arc, Mutex};
@@ -46,7 +45,7 @@ pub trait Recognize {
                         Error = Self::Error>;
     /// Determines the key for a route to handle the given request.
-    fn recognize(&self, req: &Self::Request) -> Option<Reuse<Self::Key>>;
+    fn recognize(&self, req: &Self::Request) -> Option<Self::Key>;
     /// Return a `Service` to handle requests.
     ///
@@ -57,18 +56,6 @@ pub trait Recognize {
 pub struct Single<S>(Option<S>);
-/// Whether or not the service to a given key may be cached.
-///
-/// Some services may, for various reasons, may not be able to
-/// be used to serve multiple requests. When this is the case,
-/// implementors of `recognize` may use `Reuse::SingleUse` to
-/// indicate that the service should not be cached.
-#[derive(Clone, Copy, Debug, Eq, PartialEq)]
-pub enum Reuse<T> {
-    Reusable(T),
-    SingleUse(T),
-}
 #[derive(Debug, PartialEq)]
 pub enum Error<T, U> {
     Inner(T),
@@ -147,42 +134,33 @@ where T: Recognize,
     /// Routes the request through an underlying service.
     ///
-    /// The response fails if the request cannot be routed.
+    /// The response fails when the request cannot be routed.
     fn call(&mut self, request: Self::Request) -> Self::Future {
         let inner = &mut *self.inner.lock().expect("lock router cache");
-        match inner.recognize.recognize(&request) {
-            None => ResponseFuture::not_recognized(),
-            Some(Reuse::SingleUse(key)) => {
-                // TODO Keep SingleUse services in the cache as well, so that their
-                // capacity is considered. To do this, we should move the Reuse logic into
-                // the returned service (and not the key).
-                let mut service = try_bind_route!(inner.recognize.bind_service(&key));
-                ResponseFuture::new(service.call(request))
-            }
-            Some(Reuse::Reusable(key)) => {
-                // First, try to load a cached route for `key`.
-                if let Some(service) = inner.routes.get_mut(&key) {
-                    return ResponseFuture::new(service.call(request));
-                }
-                // Since there wasn't a cached route, ensure that there is capacity for a
-                // new one.
-                if inner.routes.len() == inner.capacity {
-                    // TODO If the cache is full, evict the oldest inactive route. If all
-                    // routes are active, fail the request.
-                    return ResponseFuture::no_capacity(inner.capacity);
-                }
-                // Bind a new route, send the request on the route, and cache the route.
-                let mut service = try_bind_route!(inner.recognize.bind_service(&key));
-                let response = service.call(request);
-                inner.routes.insert(key, service);
-                ResponseFuture::new(response)
-            }
-        }
+        let key = match inner.recognize.recognize(&request) {
+            Some(key) => key,
+            None => return ResponseFuture::not_recognized(),
+        };
+        // First, try to load a cached route for `key`.
+        if let Some(service) = inner.routes.get_mut(&key) {
+            return ResponseFuture::new(service.call(request));
+        }
+        // Since there wasn't a cached route, ensure that there is capacity for a
+        // new one.
+        if inner.routes.len() == inner.capacity {
+            // TODO If the cache is full, evict the oldest inactive route. If all
+            // routes are active, fail the request.
+            return ResponseFuture::no_capacity(inner.capacity);
+        }
+        // Bind a new route, send the request on the route, and cache the route.
+        let mut service = try_bind_route!(inner.recognize.bind_service(&key));
+        let response = service.call(request);
+        inner.routes.insert(key, service);
+        ResponseFuture::new(response)
     }
 }
@@ -210,8 +188,8 @@ impl<S: Service> Recognize for Single<S> {
     type RouteError = ();
     type Service = S;
-    fn recognize(&self, _: &Self::Request) -> Option<Reuse<Self::Key>> {
-        Some(Reuse::Reusable(()))
+    fn recognize(&self, _: &Self::Request) -> Option<Self::Key> {
+        Some(())
     }
     fn bind_service(&mut self, _: &Self::Key) -> Result<S, Self::RouteError> {
@@ -302,22 +280,11 @@ where
     }
 }
-// ===== impl Reuse =====
-impl<T> AsRef<T> for Reuse<T> {
-    fn as_ref(&self) -> &T {
-        match *self {
-            Reuse::Reusable(ref key) => key,
-            Reuse::SingleUse(ref key) => key,
-        }
-    }
-}
 #[cfg(test)]
 mod tests {
     use futures::{Poll, Future, future};
     use tower_service::Service;
-    use super::{Error, Reuse, Router};
+    use super::{Error, Router};
     struct Recognize;
@@ -325,8 +292,7 @@ mod tests {
     enum Request {
         NotRecognized,
-        Reusable(usize),
-        SingleUse(usize),
+        Recgonized(usize),
     }
     impl super::Recognize for Recognize {
@@ -337,11 +303,10 @@ mod tests {
         type RouteError = ();
         type Service = MultiplyAndAssign;
-        fn recognize(&self, req: &Self::Request) -> Option<Reuse<Self::Key>> {
+        fn recognize(&self, req: &Self::Request) -> Option<Self::Key> {
             match *req {
                 Request::NotRecognized => None,
-                Request::Reusable(n) => Some(Reuse::Reusable(n)),
-                Request::SingleUse(n) => Some(Reuse::SingleUse(n)),
+                Request::Recgonized(n) => Some(n),
             }
         }
@@ -363,8 +328,7 @@ mod tests {
         fn call(&mut self, req: Self::Request) -> Self::Future {
             let n = match req {
                 Request::NotRecognized => unreachable!(),
-                Request::Reusable(n) => n,
-                Request::SingleUse(n) => n,
+                Request::Recgonized(n) => n,
             };
             self.0 *= n;
             future::ok(self.0)
@@ -390,46 +354,24 @@ mod tests {
     }
     #[test]
-    fn reuse_limited_by_capacity() {
+    fn cache_limited_by_capacity() {
         let mut router = Router::new(Recognize, 1);
-        let rsp = router.call_ok(Request::Reusable(2));
+        let rsp = router.call_ok(Request::Recgonized(2));
         assert_eq!(rsp, 2);
-        let rsp = router.call_err(Request::Reusable(3));
+        let rsp = router.call_err(Request::Recgonized(3));
         assert_eq!(rsp, Error::NoCapacity(1));
     }
     #[test]
-    fn reuse_shares_service() {
+    fn services_cached() {
         let mut router = Router::new(Recognize, 1);
-        let rsp = router.call_ok(Request::Reusable(2));
+        let rsp = router.call_ok(Request::Recgonized(2));
        assert_eq!(rsp, 2);
-        let rsp = router.call_ok(Request::Reusable(2));
+        let rsp = router.call_ok(Request::Recgonized(2));
        assert_eq!(rsp, 4);
     }
-    #[test]
-    fn single_use_does_not_share_service() {
-        let mut router = Router::new(Recognize, 1);
-        let rsp = router.call_ok(Request::SingleUse(2));
-        assert_eq!(rsp, 2);
-        let rsp = router.call_ok(Request::SingleUse(2));
-        assert_eq!(rsp, 2);
-    }
-    #[test]
-    fn single_use_not_cached_or_limited_by_capacity() {
-        let mut router = Router::new(Recognize, 1);
-        let rsp = router.call_ok(Request::Reusable(2));
-        assert_eq!(rsp, 2);
-        let rsp = router.call_ok(Request::SingleUse(2));
-        assert_eq!(rsp, 2);
-    }
 }

Changed file 2 of 4: the proxy's `bind` module (`Bind`, `Binding`, `Protocol`).

@@ -5,16 +5,13 @@ use std::marker::PhantomData;
 use std::sync::Arc;
 use std::sync::atomic::AtomicUsize;
-use futures::{Future, Poll};
-use futures::future::Map;
+use futures::{Future, Poll, future};
 use http::{self, uri};
 use tokio_core::reactor::Handle;
 use tower_service as tower;
 use tower_h2;
 use tower_reconnect::Reconnect;
-use conduit_proxy_router::Reuse;
 use control;
 use control::discovery::Endpoint;
 use ctx;
@@ -43,6 +40,26 @@ pub struct BindProtocol<C, B> {
     protocol: Protocol,
 }
+/// A type of service binding
+///
+/// Some services, for various reasons, may not be able to be used to serve multiple
+/// requests. The `BindsPerRequest` binding ensures that a new stack is bound for each
+/// request.
+///
+/// `Bound` serivces may be used to process an arbitrary number of requests.
+pub enum Binding<B: tower_h2::Body + 'static> {
+    Bound(Stack<B>),
+    BindsPerRequest {
+        endpoint: Endpoint,
+        protocol: Protocol,
+        bind: Bind<Arc<ctx::Proxy>, B>,
+        // When `poll_ready` is called, the _next_ service to be used may be bound
+        // ahead-of-time. This stack is used only to serve the next request to this
+        // service.
+        next: Option<Stack<B>>
+    },
+}
 /// Protocol portion of the `Recognize` key for a request.
 ///
 /// This marks whether to use HTTP/2 or HTTP/1.x for a request. In
@@ -73,7 +90,9 @@ pub struct NormalizeUri<S> {
     inner: S
 }
-pub type Service<B> = Reconnect<NormalizeUri<NewHttp<B>>>;
+pub type Service<B> = Binding<B>;
+pub type Stack<B> = Reconnect<NormalizeUri<NewHttp<B>>>;
 pub type NewHttp<B> = sensor::NewHttp<Client<B>, B, HttpBody>;
@@ -81,10 +100,7 @@ pub type HttpResponse = http::Response<sensor::http::ResponseBody<HttpBody>>;
 pub type HttpRequest<B> = http::Request<sensor::http::RequestBody<B>>;
-pub type Client<B> = transparency::Client<
-    sensor::Connect<transport::Connect>,
-    B,
->;
+pub type Client<B> = transparency::Client<sensor::Connect<transport::Connect>, B>;
 #[derive(Copy, Clone, Debug)]
 pub enum BufferSpawnError {
@@ -155,31 +171,17 @@ impl<C: Clone, B> Clone for Bind<C, B> {
 impl<C, B> Bind<C, B> {
-    // pub fn ctx(&self) -> &C {
-    //     &self.ctx
-    // }
     pub fn executor(&self) -> &Handle {
         &self.executor
     }
-    // pub fn req_ids(&self) -> &Arc<AtomicUsize> {
-    //     &self.req_ids
-    // }
-    // pub fn sensors(&self) -> &telemetry::Sensors {
-    //     &self.sensors
-    // }
 }
 impl<B> Bind<Arc<ctx::Proxy>, B>
 where
     B: tower_h2::Body + 'static,
 {
-    pub fn bind_service(&self, ep: &Endpoint, protocol: &Protocol) -> Service<B> {
-        trace!("bind_service endpoint={:?}, protocol={:?}", ep, protocol);
+    fn bind_stack(&self, ep: &Endpoint, protocol: &Protocol) -> Stack<B> {
+        debug!("bind_stack endpoint={:?}, protocol={:?}", ep, protocol);
         let addr = ep.address();
         let client_ctx = ctx::transport::Client::new(
             &self.ctx,
@@ -196,7 +198,7 @@ where
         let client = transparency::Client::new(
             protocol,
             connect,
-            self.executor.clone(),
+            self.executor.clone()
         );
         let sensors = self.sensors.http(
@@ -214,6 +216,19 @@ where
         // TODO: Add some sort of backoff logic.
         Reconnect::new(proxy)
     }
+    pub fn new_binding(&self, ep: &Endpoint, protocol: &Protocol) -> Binding<B> {
+        if protocol.can_reuse_clients() {
+            Binding::Bound(self.bind_stack(ep, protocol))
+        } else {
+            Binding::BindsPerRequest {
+                endpoint: ep.clone(),
+                protocol: protocol.clone(),
+                bind: self.clone(),
+                next: None
+            }
+        }
+    }
 }
 // ===== impl BindProtocol =====
@@ -240,7 +255,7 @@ where
     type BindError = ();
     fn bind(&self, ep: &Endpoint) -> Result<Self::Service, Self::BindError> {
-        Ok(self.bind.bind_service(ep, &self.protocol))
+        Ok(self.bind.new_binding(ep, &self.protocol))
     }
 }
@@ -272,7 +287,7 @@ where
     type Error = <Self::Service as tower::Service>::Error;
     type Service = NormalizeUri<S::Service>;
     type InitError = S::InitError;
-    type Future = Map<
+    type Future = future::Map<
         S::Future,
         fn(S::Service) -> NormalizeUri<S::Service>
     >;
@@ -306,6 +321,44 @@ where
     }
 }
+// ===== impl Binding =====
+impl<B: tower_h2::Body + 'static> tower::Service for Binding<B> {
+    type Request = <Stack<B> as tower::Service>::Request;
+    type Response = <Stack<B> as tower::Service>::Response;
+    type Error = <Stack<B> as tower::Service>::Error;
+    type Future = <Stack<B> as tower::Service>::Future;
+    fn poll_ready(&mut self) -> Poll<(), Self::Error> {
+        match *self {
+            // A service is already bound, so poll its readiness.
+            Binding::Bound(ref mut svc) |
+            Binding::BindsPerRequest { next: Some(ref mut svc), .. } => svc.poll_ready(),
+            // If no stack has been bound, bind it now so that its readiness can be
+            // checked. Store it so it can be consumed to dispatch the next request.
+            Binding::BindsPerRequest { ref endpoint, ref protocol, ref bind, ref mut next } => {
+                let mut svc = bind.bind_stack(endpoint, protocol);
+                let ready = svc.poll_ready()?;
+                *next = Some(svc);
+                Ok(ready)
+            }
+        }
+    }
+    fn call(&mut self, request: Self::Request) -> Self::Future {
+        match *self {
+            Binding::Bound(ref mut svc) => svc.call(request),
+            Binding::BindsPerRequest { ref endpoint, ref protocol, ref bind, ref mut next } => {
+                // If a service has already been bound in `poll_ready`, consume it.
+                // Otherwise, bind a new service on-the-spot.
+                let mut svc = next.take().unwrap_or_else(|| bind.bind_stack(endpoint, protocol));
+                svc.call(request)
+            }
+        }
+    }
+}
 // ===== impl Protocol =====
@@ -327,18 +380,10 @@ impl Protocol {
         Protocol::Http1(host)
     }
-    pub fn is_cachable(&self) -> bool {
+    pub fn can_reuse_clients(&self) -> bool {
         match *self {
             Protocol::Http2 | Protocol::Http1(Host::Authority(_)) => true,
             _ => false,
         }
     }
-    pub fn into_key<T>(self, key: T) -> Reuse<(T, Protocol)> {
-        if self.is_cachable() {
-            Reuse::Reusable((key, self))
-        } else {
-            Reuse::SingleUse((key, self))
-        }
-    }
 }

Changed file 3 of 4: the inbound `Recognize` implementation.

@@ -6,7 +6,7 @@ use tower_service as tower;
 use tower_buffer::{self, Buffer};
 use tower_in_flight_limit::{self, InFlightLimit};
 use tower_h2;
-use conduit_proxy_router::{Reuse, Recognize};
+use conduit_proxy_router::Recognize;
 use bind;
 use ctx;
@@ -46,7 +46,7 @@ where
     type RouteError = bind::BufferSpawnError;
     type Service = InFlightLimit<Buffer<bind::Service<B>>>;
-    fn recognize(&self, req: &Self::Request) -> Option<Reuse<Self::Key>> {
+    fn recognize(&self, req: &Self::Request) -> Option<Self::Key> {
         let key = req.extensions()
             .get::<Arc<ctx::transport::Server>>()
             .and_then(|ctx| {
@@ -57,7 +57,7 @@ where
         let proto = bind::Protocol::detect(req);
-        let key = key.map(move|addr| proto.into_key(addr));
+        let key = key.map(move |addr| (addr, proto));
         trace!("recognize key={:?}", key);
@@ -75,8 +75,8 @@ where
         debug!("building inbound {:?} client to {}", proto, addr);
         let endpoint = (*addr).into();
-        let bind = self.bind.bind_service(&endpoint, proto);
-        Buffer::new(bind, self.bind.executor())
+        let binding = self.bind.new_binding(&endpoint, proto);
+        Buffer::new(binding, self.bind.executor())
            .map(|buffer| {
                InFlightLimit::new(buffer, MAX_IN_FLIGHT)
            })
@@ -116,7 +116,7 @@ mod tests {
         let srv_ctx = ctx::transport::Server::new(&ctx, &local, &remote, &Some(orig_dst));
         let rec = srv_ctx.orig_dst_if_not_local().map(|addr|
-            bind::Protocol::Http1(Host::NoAuthority).into_key(addr)
+            (addr, bind::Protocol::Http1(Host::NoAuthority))
         );
         let mut req = http::Request::new(());
@@ -145,7 +145,7 @@
         ));
         inbound.recognize(&req) == default.map(|addr|
-            bind::Protocol::Http1(Host::NoAuthority).into_key(addr)
+            (addr, bind::Protocol::Http1(Host::NoAuthority))
         )
     }
@@ -157,7 +157,7 @@
         let req = http::Request::new(());
         inbound.recognize(&req) == default.map(|addr|
-            bind::Protocol::Http1(Host::NoAuthority).into_key(addr)
+            (addr, bind::Protocol::Http1(Host::NoAuthority))
         )
     }
@@ -180,7 +180,7 @@
         ));
         inbound.recognize(&req) == default.map(|addr|
-            bind::Protocol::Http1(Host::NoAuthority).into_key(addr)
+            (addr, bind::Protocol::Http1(Host::NoAuthority))
         )
     }
 }

Changed file 4 of 4: the outbound (load-balanced) `Recognize` implementation.

@@ -12,7 +12,7 @@ use tower_buffer::Buffer;
 use tower_discover::{Change, Discover};
 use tower_in_flight_limit::InFlightLimit;
 use tower_h2;
-use conduit_proxy_router::{Reuse, Recognize};
+use conduit_proxy_router::Recognize;
 use bind::{self, Bind, Protocol};
 use control::{self, discovery};
@@ -67,7 +67,7 @@ where
         choose::PowerOfTwoChoices<rand::ThreadRng>
     >>>>;
-    fn recognize(&self, req: &Self::Request) -> Option<Reuse<Self::Key>> {
+    fn recognize(&self, req: &Self::Request) -> Option<Self::Key> {
         let proto = bind::Protocol::detect(req);
         // The request URI and Host: header have not yet been normalized
@@ -106,7 +106,7 @@ where
         // original destination.
         let dest = dest?;
-        Some(proto.into_key(dest))
+        Some((dest, proto))
     }
     /// Builds a dynamic, load balancing service.