router: Store `recognize` outside of lock (#913)
The router stores its cache and `Recognize` implementation within a `Mutex`, but there is no need for the recognizer to be locked. This change creates a new `Cache` type that is locked independently of `Recognize`. In order to accomplish this, `Recognize::bind_service` has been changed to take an immutable reference to its `self`. The (unused) `Single` type has been removed because it relied on `bind_service` being mutable.
This commit is contained in:
parent
ee839d5ba6
commit
b238d97137
|
@ -14,7 +14,7 @@ use std::sync::{Arc, Mutex};
|
|||
pub struct Router<T>
|
||||
where T: Recognize,
|
||||
{
|
||||
inner: Arc<Mutex<Inner<T>>>,
|
||||
inner: Arc<Inner<T>>,
|
||||
}
|
||||
|
||||
/// Provides a strategy for routing a Request to a Service.
|
||||
|
@ -51,11 +51,9 @@ pub trait Recognize {
|
|||
///
|
||||
/// The returned service must always be in the ready state (i.e.
|
||||
/// `poll_ready` must always return `Ready` or `Err`).
|
||||
fn bind_service(&mut self, key: &Self::Key) -> Result<Self::Service, Self::RouteError>;
|
||||
fn bind_service(&self, key: &Self::Key) -> Result<Self::Service, Self::RouteError>;
|
||||
}
|
||||
|
||||
pub struct Single<S>(Option<S>);
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum Error<T, U> {
|
||||
Inner(T),
|
||||
|
@ -73,8 +71,13 @@ where T: Recognize,
|
|||
struct Inner<T>
|
||||
where T: Recognize,
|
||||
{
|
||||
routes: IndexMap<T::Key, T::Service>,
|
||||
recognize: T,
|
||||
cache: Mutex<Cache<T::Key, T::Service>>,
|
||||
}
|
||||
|
||||
struct Cache<K: Hash + Eq, V>
|
||||
{
|
||||
routes: IndexMap<K, V>,
|
||||
capacity: usize,
|
||||
}
|
||||
|
||||
|
@ -95,11 +98,13 @@ where T: Recognize
|
|||
{
|
||||
pub fn new(recognize: T, capacity: usize) -> Self {
|
||||
Router {
|
||||
inner: Arc::new(Mutex::new(Inner {
|
||||
routes: IndexMap::default(),
|
||||
inner: Arc::new(Inner {
|
||||
recognize,
|
||||
capacity,
|
||||
})),
|
||||
cache: Mutex::new(Cache {
|
||||
routes: IndexMap::default(),
|
||||
capacity,
|
||||
}),
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -127,34 +132,34 @@ where T: Recognize,
|
|||
///
|
||||
/// The response fails when the request cannot be routed.
|
||||
fn call(&mut self, request: Self::Request) -> Self::Future {
|
||||
let inner = &mut *self.inner.lock().expect("lock router cache");
|
||||
|
||||
let key = match inner.recognize.recognize(&request) {
|
||||
let key = match self.inner.recognize.recognize(&request) {
|
||||
Some(key) => key,
|
||||
None => return ResponseFuture::not_recognized(),
|
||||
};
|
||||
|
||||
let cache = &mut *self.inner.cache.lock().expect("lock router cache");
|
||||
|
||||
// First, try to load a cached route for `key`.
|
||||
if let Some(service) = inner.routes.get_mut(&key) {
|
||||
if let Some(service) = cache.routes.get_mut(&key) {
|
||||
return ResponseFuture::new(service.call(request));
|
||||
}
|
||||
|
||||
// Since there wasn't a cached route, ensure that there is capacity for a
|
||||
// new one.
|
||||
if inner.routes.len() == inner.capacity {
|
||||
if cache.routes.len() == cache.capacity {
|
||||
// TODO If the cache is full, evict the oldest inactive route. If all
|
||||
// routes are active, fail the request.
|
||||
return ResponseFuture::no_capacity(inner.capacity);
|
||||
return ResponseFuture::no_capacity(cache.capacity);
|
||||
}
|
||||
|
||||
// Bind a new route, send the request on the route, and cache the route.
|
||||
let mut service = match inner.recognize.bind_service(&key) {
|
||||
let mut service = match self.inner.recognize.bind_service(&key) {
|
||||
Ok(svc) => svc,
|
||||
Err(e) => return ResponseFuture { state: State::RouteError(e) },
|
||||
};
|
||||
|
||||
let response = service.call(request);
|
||||
inner.routes.insert(key, service);
|
||||
cache.routes.insert(key, service);
|
||||
ResponseFuture::new(response)
|
||||
}
|
||||
}
|
||||
|
@ -167,31 +172,6 @@ where T: Recognize,
|
|||
}
|
||||
}
|
||||
|
||||
// ===== impl Single =====
|
||||
|
||||
impl<S: Service> Single<S> {
|
||||
pub fn new(svc: S) -> Self {
|
||||
Single(Some(svc))
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: Service> Recognize for Single<S> {
|
||||
type Request = S::Request;
|
||||
type Response = S::Response;
|
||||
type Error = S::Error;
|
||||
type Key = ();
|
||||
type RouteError = ();
|
||||
type Service = S;
|
||||
|
||||
fn recognize(&self, _: &Self::Request) -> Option<Self::Key> {
|
||||
Some(())
|
||||
}
|
||||
|
||||
fn bind_service(&mut self, _: &Self::Key) -> Result<S, Self::RouteError> {
|
||||
Ok(self.0.take().expect("static route bound twice"))
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl ResponseFuture =====
|
||||
|
||||
impl<T> ResponseFuture<T>
|
||||
|
@ -305,7 +285,7 @@ mod tests {
|
|||
}
|
||||
}
|
||||
|
||||
fn bind_service(&mut self, _: &Self::Key) -> Result<Self::Service, Self::RouteError> {
|
||||
fn bind_service(&self, _: &Self::Key) -> Result<Self::Service, Self::RouteError> {
|
||||
Ok(MultiplyAndAssign(1))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,6 +31,7 @@ use self::discovery::{Background as DiscoBg, Discovery, Watch};
|
|||
pub use self::discovery::Bind;
|
||||
pub use self::observe::Observe;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Control {
|
||||
disco: Discovery,
|
||||
}
|
||||
|
|
|
@ -70,7 +70,7 @@ where
|
|||
///
|
||||
/// Buffering is currently unbounded and does not apply timeouts. This must be
|
||||
/// changed.
|
||||
fn bind_service(&mut self, key: &Self::Key) -> Result<Self::Service, Self::RouteError> {
|
||||
fn bind_service(&self, key: &Self::Key) -> Result<Self::Service, Self::RouteError> {
|
||||
let &(ref addr, ref proto) = key;
|
||||
debug!("building inbound {:?} client to {}", proto, addr);
|
||||
|
||||
|
|
|
@ -119,7 +119,7 @@ where
|
|||
/// Buffering is currently unbounded and does not apply timeouts. This must be
|
||||
/// changed.
|
||||
fn bind_service(
|
||||
&mut self,
|
||||
&self,
|
||||
key: &Self::Key,
|
||||
) -> Result<Self::Service, Self::RouteError> {
|
||||
let &(ref dest, ref protocol) = key;
|
||||
|
|
Loading…
Reference in New Issue