mirror of https://github.com/linkerd/linkerd2.git
proxy: Track SingleUse services against router capacity (#902)
PR #898 introduces capacity limits to the balancer. However, because the router supports "single-use" routes--routes that are bound only for the life of a single HTTP1 request--it is easy for a router to exceed its configured capacity. In order to fix this, the `Reuse` type is removed from the router library so that _all_ routes are considered cacheable. It's now the responsibility of the bound service to enforce policies with regards to client retention. Routes were not added to the cache when the service could not be used to process more than a single request. Now, `Bind` wraps its returned services (via the `Binding` type), that dictate whether a single client is reused or if one is bound for each request. This enables all routes to be cached without changing behavior with regards to connection reuse.
This commit is contained in:
parent
f94856e489
commit
f4dba72cc3
|
@ -7,7 +7,6 @@ use indexmap::IndexMap;
|
|||
use tower_service::Service;
|
||||
|
||||
use std::{error, fmt, mem};
|
||||
use std::convert::AsRef;
|
||||
use std::hash::Hash;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
|
@ -46,7 +45,7 @@ pub trait Recognize {
|
|||
Error = Self::Error>;
|
||||
|
||||
/// Determines the key for a route to handle the given request.
|
||||
fn recognize(&self, req: &Self::Request) -> Option<Reuse<Self::Key>>;
|
||||
fn recognize(&self, req: &Self::Request) -> Option<Self::Key>;
|
||||
|
||||
/// Return a `Service` to handle requests.
|
||||
///
|
||||
|
@ -57,18 +56,6 @@ pub trait Recognize {
|
|||
|
||||
pub struct Single<S>(Option<S>);
|
||||
|
||||
/// Whether or not the service to a given key may be cached.
|
||||
///
|
||||
/// Some services may, for various reasons, may not be able to
|
||||
/// be used to serve multiple requests. When this is the case,
|
||||
/// implementors of `recognize` may use `Reuse::SingleUse` to
|
||||
/// indicate that the service should not be cached.
|
||||
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
|
||||
pub enum Reuse<T> {
|
||||
Reusable(T),
|
||||
SingleUse(T),
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum Error<T, U> {
|
||||
Inner(T),
|
||||
|
@ -147,42 +134,33 @@ where T: Recognize,
|
|||
|
||||
/// Routes the request through an underlying service.
|
||||
///
|
||||
/// The response fails if the request cannot be routed.
|
||||
/// The response fails when the request cannot be routed.
|
||||
fn call(&mut self, request: Self::Request) -> Self::Future {
|
||||
let inner = &mut *self.inner.lock().expect("lock router cache");
|
||||
|
||||
match inner.recognize.recognize(&request) {
|
||||
None => ResponseFuture::not_recognized(),
|
||||
let key = match inner.recognize.recognize(&request) {
|
||||
Some(key) => key,
|
||||
None => return ResponseFuture::not_recognized(),
|
||||
};
|
||||
|
||||
Some(Reuse::SingleUse(key)) => {
|
||||
// TODO Keep SingleUse services in the cache as well, so that their
|
||||
// capacity is considered. To do this, we should move the Reuse logic into
|
||||
// the returned service (and not the key).
|
||||
let mut service = try_bind_route!(inner.recognize.bind_service(&key));
|
||||
ResponseFuture::new(service.call(request))
|
||||
}
|
||||
|
||||
Some(Reuse::Reusable(key)) => {
|
||||
// First, try to load a cached route for `key`.
|
||||
if let Some(service) = inner.routes.get_mut(&key) {
|
||||
return ResponseFuture::new(service.call(request));
|
||||
}
|
||||
|
||||
// Since there wasn't a cached route, ensure that there is capacity for a
|
||||
// new one.
|
||||
if inner.routes.len() == inner.capacity {
|
||||
// TODO If the cache is full, evict the oldest inactive route. If all
|
||||
// routes are active, fail the request.
|
||||
return ResponseFuture::no_capacity(inner.capacity);
|
||||
}
|
||||
|
||||
// Bind a new route, send the request on the route, and cache the route.
|
||||
let mut service = try_bind_route!(inner.recognize.bind_service(&key));
|
||||
let response = service.call(request);
|
||||
inner.routes.insert(key, service);
|
||||
ResponseFuture::new(response)
|
||||
}
|
||||
// First, try to load a cached route for `key`.
|
||||
if let Some(service) = inner.routes.get_mut(&key) {
|
||||
return ResponseFuture::new(service.call(request));
|
||||
}
|
||||
|
||||
// Since there wasn't a cached route, ensure that there is capacity for a
|
||||
// new one.
|
||||
if inner.routes.len() == inner.capacity {
|
||||
// TODO If the cache is full, evict the oldest inactive route. If all
|
||||
// routes are active, fail the request.
|
||||
return ResponseFuture::no_capacity(inner.capacity);
|
||||
}
|
||||
|
||||
// Bind a new route, send the request on the route, and cache the route.
|
||||
let mut service = try_bind_route!(inner.recognize.bind_service(&key));
|
||||
let response = service.call(request);
|
||||
inner.routes.insert(key, service);
|
||||
ResponseFuture::new(response)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -210,8 +188,8 @@ impl<S: Service> Recognize for Single<S> {
|
|||
type RouteError = ();
|
||||
type Service = S;
|
||||
|
||||
fn recognize(&self, _: &Self::Request) -> Option<Reuse<Self::Key>> {
|
||||
Some(Reuse::Reusable(()))
|
||||
fn recognize(&self, _: &Self::Request) -> Option<Self::Key> {
|
||||
Some(())
|
||||
}
|
||||
|
||||
fn bind_service(&mut self, _: &Self::Key) -> Result<S, Self::RouteError> {
|
||||
|
@ -302,22 +280,11 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
// ===== impl Reuse =====
|
||||
|
||||
impl<T> AsRef<T> for Reuse<T> {
|
||||
fn as_ref(&self) -> &T {
|
||||
match *self {
|
||||
Reuse::Reusable(ref key) => key,
|
||||
Reuse::SingleUse(ref key) => key,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use futures::{Poll, Future, future};
|
||||
use tower_service::Service;
|
||||
use super::{Error, Reuse, Router};
|
||||
use super::{Error, Router};
|
||||
|
||||
struct Recognize;
|
||||
|
||||
|
@ -325,8 +292,7 @@ mod tests {
|
|||
|
||||
enum Request {
|
||||
NotRecognized,
|
||||
Reusable(usize),
|
||||
SingleUse(usize),
|
||||
Recgonized(usize),
|
||||
}
|
||||
|
||||
impl super::Recognize for Recognize {
|
||||
|
@ -337,11 +303,10 @@ mod tests {
|
|||
type RouteError = ();
|
||||
type Service = MultiplyAndAssign;
|
||||
|
||||
fn recognize(&self, req: &Self::Request) -> Option<Reuse<Self::Key>> {
|
||||
fn recognize(&self, req: &Self::Request) -> Option<Self::Key> {
|
||||
match *req {
|
||||
Request::NotRecognized => None,
|
||||
Request::Reusable(n) => Some(Reuse::Reusable(n)),
|
||||
Request::SingleUse(n) => Some(Reuse::SingleUse(n)),
|
||||
Request::Recgonized(n) => Some(n),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -363,8 +328,7 @@ mod tests {
|
|||
fn call(&mut self, req: Self::Request) -> Self::Future {
|
||||
let n = match req {
|
||||
Request::NotRecognized => unreachable!(),
|
||||
Request::Reusable(n) => n,
|
||||
Request::SingleUse(n) => n,
|
||||
Request::Recgonized(n) => n,
|
||||
};
|
||||
self.0 *= n;
|
||||
future::ok(self.0)
|
||||
|
@ -390,46 +354,24 @@ mod tests {
|
|||
}
|
||||
|
||||
#[test]
|
||||
fn reuse_limited_by_capacity() {
|
||||
fn cache_limited_by_capacity() {
|
||||
let mut router = Router::new(Recognize, 1);
|
||||
|
||||
let rsp = router.call_ok(Request::Reusable(2));
|
||||
let rsp = router.call_ok(Request::Recgonized(2));
|
||||
assert_eq!(rsp, 2);
|
||||
|
||||
let rsp = router.call_err(Request::Reusable(3));
|
||||
let rsp = router.call_err(Request::Recgonized(3));
|
||||
assert_eq!(rsp, Error::NoCapacity(1));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn reuse_shares_service() {
|
||||
fn services_cached() {
|
||||
let mut router = Router::new(Recognize, 1);
|
||||
|
||||
let rsp = router.call_ok(Request::Reusable(2));
|
||||
let rsp = router.call_ok(Request::Recgonized(2));
|
||||
assert_eq!(rsp, 2);
|
||||
|
||||
let rsp = router.call_ok(Request::Reusable(2));
|
||||
let rsp = router.call_ok(Request::Recgonized(2));
|
||||
assert_eq!(rsp, 4);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn single_use_does_not_share_service() {
|
||||
let mut router = Router::new(Recognize, 1);
|
||||
|
||||
let rsp = router.call_ok(Request::SingleUse(2));
|
||||
assert_eq!(rsp, 2);
|
||||
|
||||
let rsp = router.call_ok(Request::SingleUse(2));
|
||||
assert_eq!(rsp, 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn single_use_not_cached_or_limited_by_capacity() {
|
||||
let mut router = Router::new(Recognize, 1);
|
||||
|
||||
let rsp = router.call_ok(Request::Reusable(2));
|
||||
assert_eq!(rsp, 2);
|
||||
|
||||
let rsp = router.call_ok(Request::SingleUse(2));
|
||||
assert_eq!(rsp, 2);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,16 +5,13 @@ use std::marker::PhantomData;
|
|||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicUsize;
|
||||
|
||||
use futures::{Future, Poll};
|
||||
use futures::future::Map;
|
||||
use futures::{Future, Poll, future};
|
||||
use http::{self, uri};
|
||||
use tokio_core::reactor::Handle;
|
||||
use tower_service as tower;
|
||||
use tower_h2;
|
||||
use tower_reconnect::Reconnect;
|
||||
|
||||
use conduit_proxy_router::Reuse;
|
||||
|
||||
use control;
|
||||
use control::discovery::Endpoint;
|
||||
use ctx;
|
||||
|
@ -43,6 +40,26 @@ pub struct BindProtocol<C, B> {
|
|||
protocol: Protocol,
|
||||
}
|
||||
|
||||
/// A type of service binding
|
||||
///
|
||||
/// Some services, for various reasons, may not be able to be used to serve multiple
|
||||
/// requests. The `BindsPerRequest` binding ensures that a new stack is bound for each
|
||||
/// request.
|
||||
///
|
||||
/// `Bound` serivces may be used to process an arbitrary number of requests.
|
||||
pub enum Binding<B: tower_h2::Body + 'static> {
|
||||
Bound(Stack<B>),
|
||||
BindsPerRequest {
|
||||
endpoint: Endpoint,
|
||||
protocol: Protocol,
|
||||
bind: Bind<Arc<ctx::Proxy>, B>,
|
||||
// When `poll_ready` is called, the _next_ service to be used may be bound
|
||||
// ahead-of-time. This stack is used only to serve the next request to this
|
||||
// service.
|
||||
next: Option<Stack<B>>
|
||||
},
|
||||
}
|
||||
|
||||
/// Protocol portion of the `Recognize` key for a request.
|
||||
///
|
||||
/// This marks whether to use HTTP/2 or HTTP/1.x for a request. In
|
||||
|
@ -73,7 +90,9 @@ pub struct NormalizeUri<S> {
|
|||
inner: S
|
||||
}
|
||||
|
||||
pub type Service<B> = Reconnect<NormalizeUri<NewHttp<B>>>;
|
||||
pub type Service<B> = Binding<B>;
|
||||
|
||||
pub type Stack<B> = Reconnect<NormalizeUri<NewHttp<B>>>;
|
||||
|
||||
pub type NewHttp<B> = sensor::NewHttp<Client<B>, B, HttpBody>;
|
||||
|
||||
|
@ -81,10 +100,7 @@ pub type HttpResponse = http::Response<sensor::http::ResponseBody<HttpBody>>;
|
|||
|
||||
pub type HttpRequest<B> = http::Request<sensor::http::RequestBody<B>>;
|
||||
|
||||
pub type Client<B> = transparency::Client<
|
||||
sensor::Connect<transport::Connect>,
|
||||
B,
|
||||
>;
|
||||
pub type Client<B> = transparency::Client<sensor::Connect<transport::Connect>, B>;
|
||||
|
||||
#[derive(Copy, Clone, Debug)]
|
||||
pub enum BufferSpawnError {
|
||||
|
@ -155,31 +171,17 @@ impl<C: Clone, B> Clone for Bind<C, B> {
|
|||
|
||||
|
||||
impl<C, B> Bind<C, B> {
|
||||
|
||||
// pub fn ctx(&self) -> &C {
|
||||
// &self.ctx
|
||||
// }
|
||||
|
||||
pub fn executor(&self) -> &Handle {
|
||||
&self.executor
|
||||
}
|
||||
|
||||
// pub fn req_ids(&self) -> &Arc<AtomicUsize> {
|
||||
// &self.req_ids
|
||||
// }
|
||||
|
||||
// pub fn sensors(&self) -> &telemetry::Sensors {
|
||||
// &self.sensors
|
||||
// }
|
||||
|
||||
}
|
||||
|
||||
impl<B> Bind<Arc<ctx::Proxy>, B>
|
||||
where
|
||||
B: tower_h2::Body + 'static,
|
||||
{
|
||||
pub fn bind_service(&self, ep: &Endpoint, protocol: &Protocol) -> Service<B> {
|
||||
trace!("bind_service endpoint={:?}, protocol={:?}", ep, protocol);
|
||||
fn bind_stack(&self, ep: &Endpoint, protocol: &Protocol) -> Stack<B> {
|
||||
debug!("bind_stack endpoint={:?}, protocol={:?}", ep, protocol);
|
||||
let addr = ep.address();
|
||||
let client_ctx = ctx::transport::Client::new(
|
||||
&self.ctx,
|
||||
|
@ -196,7 +198,7 @@ where
|
|||
let client = transparency::Client::new(
|
||||
protocol,
|
||||
connect,
|
||||
self.executor.clone(),
|
||||
self.executor.clone()
|
||||
);
|
||||
|
||||
let sensors = self.sensors.http(
|
||||
|
@ -214,6 +216,19 @@ where
|
|||
// TODO: Add some sort of backoff logic.
|
||||
Reconnect::new(proxy)
|
||||
}
|
||||
|
||||
pub fn new_binding(&self, ep: &Endpoint, protocol: &Protocol) -> Binding<B> {
|
||||
if protocol.can_reuse_clients() {
|
||||
Binding::Bound(self.bind_stack(ep, protocol))
|
||||
} else {
|
||||
Binding::BindsPerRequest {
|
||||
endpoint: ep.clone(),
|
||||
protocol: protocol.clone(),
|
||||
bind: self.clone(),
|
||||
next: None
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl BindProtocol =====
|
||||
|
@ -240,7 +255,7 @@ where
|
|||
type BindError = ();
|
||||
|
||||
fn bind(&self, ep: &Endpoint) -> Result<Self::Service, Self::BindError> {
|
||||
Ok(self.bind.bind_service(ep, &self.protocol))
|
||||
Ok(self.bind.new_binding(ep, &self.protocol))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -272,7 +287,7 @@ where
|
|||
type Error = <Self::Service as tower::Service>::Error;
|
||||
type Service = NormalizeUri<S::Service>;
|
||||
type InitError = S::InitError;
|
||||
type Future = Map<
|
||||
type Future = future::Map<
|
||||
S::Future,
|
||||
fn(S::Service) -> NormalizeUri<S::Service>
|
||||
>;
|
||||
|
@ -306,6 +321,44 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
// ===== impl Binding =====
|
||||
|
||||
impl<B: tower_h2::Body + 'static> tower::Service for Binding<B> {
|
||||
type Request = <Stack<B> as tower::Service>::Request;
|
||||
type Response = <Stack<B> as tower::Service>::Response;
|
||||
type Error = <Stack<B> as tower::Service>::Error;
|
||||
type Future = <Stack<B> as tower::Service>::Future;
|
||||
|
||||
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
|
||||
match *self {
|
||||
// A service is already bound, so poll its readiness.
|
||||
Binding::Bound(ref mut svc) |
|
||||
Binding::BindsPerRequest { next: Some(ref mut svc), .. } => svc.poll_ready(),
|
||||
|
||||
// If no stack has been bound, bind it now so that its readiness can be
|
||||
// checked. Store it so it can be consumed to dispatch the next request.
|
||||
Binding::BindsPerRequest { ref endpoint, ref protocol, ref bind, ref mut next } => {
|
||||
let mut svc = bind.bind_stack(endpoint, protocol);
|
||||
let ready = svc.poll_ready()?;
|
||||
*next = Some(svc);
|
||||
Ok(ready)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn call(&mut self, request: Self::Request) -> Self::Future {
|
||||
match *self {
|
||||
Binding::Bound(ref mut svc) => svc.call(request),
|
||||
Binding::BindsPerRequest { ref endpoint, ref protocol, ref bind, ref mut next } => {
|
||||
// If a service has already been bound in `poll_ready`, consume it.
|
||||
// Otherwise, bind a new service on-the-spot.
|
||||
let mut svc = next.take().unwrap_or_else(|| bind.bind_stack(endpoint, protocol));
|
||||
svc.call(request)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl Protocol =====
|
||||
|
||||
|
||||
|
@ -327,18 +380,10 @@ impl Protocol {
|
|||
Protocol::Http1(host)
|
||||
}
|
||||
|
||||
pub fn is_cachable(&self) -> bool {
|
||||
pub fn can_reuse_clients(&self) -> bool {
|
||||
match *self {
|
||||
Protocol::Http2 | Protocol::Http1(Host::Authority(_)) => true,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn into_key<T>(self, key: T) -> Reuse<(T, Protocol)> {
|
||||
if self.is_cachable() {
|
||||
Reuse::Reusable((key, self))
|
||||
} else {
|
||||
Reuse::SingleUse((key, self))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,7 +6,7 @@ use tower_service as tower;
|
|||
use tower_buffer::{self, Buffer};
|
||||
use tower_in_flight_limit::{self, InFlightLimit};
|
||||
use tower_h2;
|
||||
use conduit_proxy_router::{Reuse, Recognize};
|
||||
use conduit_proxy_router::Recognize;
|
||||
|
||||
use bind;
|
||||
use ctx;
|
||||
|
@ -46,7 +46,7 @@ where
|
|||
type RouteError = bind::BufferSpawnError;
|
||||
type Service = InFlightLimit<Buffer<bind::Service<B>>>;
|
||||
|
||||
fn recognize(&self, req: &Self::Request) -> Option<Reuse<Self::Key>> {
|
||||
fn recognize(&self, req: &Self::Request) -> Option<Self::Key> {
|
||||
let key = req.extensions()
|
||||
.get::<Arc<ctx::transport::Server>>()
|
||||
.and_then(|ctx| {
|
||||
|
@ -57,7 +57,7 @@ where
|
|||
|
||||
let proto = bind::Protocol::detect(req);
|
||||
|
||||
let key = key.map(move|addr| proto.into_key(addr));
|
||||
let key = key.map(move |addr| (addr, proto));
|
||||
trace!("recognize key={:?}", key);
|
||||
|
||||
|
||||
|
@ -75,8 +75,8 @@ where
|
|||
debug!("building inbound {:?} client to {}", proto, addr);
|
||||
|
||||
let endpoint = (*addr).into();
|
||||
let bind = self.bind.bind_service(&endpoint, proto);
|
||||
Buffer::new(bind, self.bind.executor())
|
||||
let binding = self.bind.new_binding(&endpoint, proto);
|
||||
Buffer::new(binding, self.bind.executor())
|
||||
.map(|buffer| {
|
||||
InFlightLimit::new(buffer, MAX_IN_FLIGHT)
|
||||
})
|
||||
|
@ -116,7 +116,7 @@ mod tests {
|
|||
let srv_ctx = ctx::transport::Server::new(&ctx, &local, &remote, &Some(orig_dst));
|
||||
|
||||
let rec = srv_ctx.orig_dst_if_not_local().map(|addr|
|
||||
bind::Protocol::Http1(Host::NoAuthority).into_key(addr)
|
||||
(addr, bind::Protocol::Http1(Host::NoAuthority))
|
||||
);
|
||||
|
||||
let mut req = http::Request::new(());
|
||||
|
@ -145,7 +145,7 @@ mod tests {
|
|||
));
|
||||
|
||||
inbound.recognize(&req) == default.map(|addr|
|
||||
bind::Protocol::Http1(Host::NoAuthority).into_key(addr)
|
||||
(addr, bind::Protocol::Http1(Host::NoAuthority))
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -157,7 +157,7 @@ mod tests {
|
|||
let req = http::Request::new(());
|
||||
|
||||
inbound.recognize(&req) == default.map(|addr|
|
||||
bind::Protocol::Http1(Host::NoAuthority).into_key(addr)
|
||||
(addr, bind::Protocol::Http1(Host::NoAuthority))
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -180,7 +180,7 @@ mod tests {
|
|||
));
|
||||
|
||||
inbound.recognize(&req) == default.map(|addr|
|
||||
bind::Protocol::Http1(Host::NoAuthority).into_key(addr)
|
||||
(addr, bind::Protocol::Http1(Host::NoAuthority))
|
||||
)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -12,7 +12,7 @@ use tower_buffer::Buffer;
|
|||
use tower_discover::{Change, Discover};
|
||||
use tower_in_flight_limit::InFlightLimit;
|
||||
use tower_h2;
|
||||
use conduit_proxy_router::{Reuse, Recognize};
|
||||
use conduit_proxy_router::Recognize;
|
||||
|
||||
use bind::{self, Bind, Protocol};
|
||||
use control::{self, discovery};
|
||||
|
@ -67,7 +67,7 @@ where
|
|||
choose::PowerOfTwoChoices<rand::ThreadRng>
|
||||
>>>>;
|
||||
|
||||
fn recognize(&self, req: &Self::Request) -> Option<Reuse<Self::Key>> {
|
||||
fn recognize(&self, req: &Self::Request) -> Option<Self::Key> {
|
||||
let proto = bind::Protocol::detect(req);
|
||||
|
||||
// The request URI and Host: header have not yet been normalized
|
||||
|
@ -106,7 +106,7 @@ where
|
|||
// original destination.
|
||||
let dest = dest?;
|
||||
|
||||
Some(proto.into_key(dest))
|
||||
Some((dest, proto))
|
||||
}
|
||||
|
||||
/// Builds a dynamic, load balancing service.
|
||||
|
|
Loading…
Reference in New Issue