From b238d97137abd98c91f65e3d7571141640dcd409 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Thu, 10 May 2018 11:51:01 -0700 Subject: [PATCH] router: Store `recognize` outside of lock (#913) The router stores its cache and `Recognize` implementation within a `Mutex`, but there is no need for the recognizer to be locked. This change creates a new `Cache` type that is locked independently of `Recognize`. In order to accomplish this, `Recognize::bind_service` has been changed to take an immutable reference to its `self`. The (unused) `Single` type has been removed because it relied on `bind_service` being mutable. --- proxy/router/src/lib.rs | 66 ++++++++++++++-------------------------- proxy/src/control/mod.rs | 1 + proxy/src/inbound.rs | 2 +- proxy/src/outbound.rs | 2 +- 4 files changed, 26 insertions(+), 45 deletions(-) diff --git a/proxy/router/src/lib.rs b/proxy/router/src/lib.rs index f089cf227..a615ad601 100644 --- a/proxy/router/src/lib.rs +++ b/proxy/router/src/lib.rs @@ -14,7 +14,7 @@ use std::sync::{Arc, Mutex}; pub struct Router where T: Recognize, { - inner: Arc>>, + inner: Arc>, } /// Provides a strategy for routing a Request to a Service. @@ -51,11 +51,9 @@ pub trait Recognize { /// /// The returned service must always be in the ready state (i.e. /// `poll_ready` must always return `Ready` or `Err`). - fn bind_service(&mut self, key: &Self::Key) -> Result; + fn bind_service(&self, key: &Self::Key) -> Result; } -pub struct Single(Option); - #[derive(Debug, PartialEq)] pub enum Error { Inner(T), @@ -73,8 +71,13 @@ where T: Recognize, struct Inner where T: Recognize, { - routes: IndexMap, recognize: T, + cache: Mutex>, +} + +struct Cache +{ + routes: IndexMap, capacity: usize, } @@ -95,11 +98,13 @@ where T: Recognize { pub fn new(recognize: T, capacity: usize) -> Self { Router { - inner: Arc::new(Mutex::new(Inner { - routes: IndexMap::default(), + inner: Arc::new(Inner { recognize, - capacity, - })), + cache: Mutex::new(Cache { + routes: IndexMap::default(), + capacity, + }), + }), } } } @@ -127,34 +132,34 @@ where T: Recognize, /// /// The response fails when the request cannot be routed. fn call(&mut self, request: Self::Request) -> Self::Future { - let inner = &mut *self.inner.lock().expect("lock router cache"); - - let key = match inner.recognize.recognize(&request) { + let key = match self.inner.recognize.recognize(&request) { Some(key) => key, None => return ResponseFuture::not_recognized(), }; + let cache = &mut *self.inner.cache.lock().expect("lock router cache"); + // First, try to load a cached route for `key`. - if let Some(service) = inner.routes.get_mut(&key) { + if let Some(service) = cache.routes.get_mut(&key) { return ResponseFuture::new(service.call(request)); } // Since there wasn't a cached route, ensure that there is capacity for a // new one. - if inner.routes.len() == inner.capacity { + if cache.routes.len() == cache.capacity { // TODO If the cache is full, evict the oldest inactive route. If all // routes are active, fail the request. - return ResponseFuture::no_capacity(inner.capacity); + return ResponseFuture::no_capacity(cache.capacity); } // Bind a new route, send the request on the route, and cache the route. - let mut service = match inner.recognize.bind_service(&key) { + let mut service = match self.inner.recognize.bind_service(&key) { Ok(svc) => svc, Err(e) => return ResponseFuture { state: State::RouteError(e) }, }; let response = service.call(request); - inner.routes.insert(key, service); + cache.routes.insert(key, service); ResponseFuture::new(response) } } @@ -167,31 +172,6 @@ where T: Recognize, } } -// ===== impl Single ===== - -impl Single { - pub fn new(svc: S) -> Self { - Single(Some(svc)) - } -} - -impl Recognize for Single { - type Request = S::Request; - type Response = S::Response; - type Error = S::Error; - type Key = (); - type RouteError = (); - type Service = S; - - fn recognize(&self, _: &Self::Request) -> Option { - Some(()) - } - - fn bind_service(&mut self, _: &Self::Key) -> Result { - Ok(self.0.take().expect("static route bound twice")) - } -} - // ===== impl ResponseFuture ===== impl ResponseFuture @@ -305,7 +285,7 @@ mod tests { } } - fn bind_service(&mut self, _: &Self::Key) -> Result { + fn bind_service(&self, _: &Self::Key) -> Result { Ok(MultiplyAndAssign(1)) } } diff --git a/proxy/src/control/mod.rs b/proxy/src/control/mod.rs index 6fd972597..884f11a3a 100644 --- a/proxy/src/control/mod.rs +++ b/proxy/src/control/mod.rs @@ -31,6 +31,7 @@ use self::discovery::{Background as DiscoBg, Discovery, Watch}; pub use self::discovery::Bind; pub use self::observe::Observe; +#[derive(Clone)] pub struct Control { disco: Discovery, } diff --git a/proxy/src/inbound.rs b/proxy/src/inbound.rs index 77941e721..6e338dc67 100644 --- a/proxy/src/inbound.rs +++ b/proxy/src/inbound.rs @@ -70,7 +70,7 @@ where /// /// Buffering is currently unbounded and does not apply timeouts. This must be /// changed. - fn bind_service(&mut self, key: &Self::Key) -> Result { + fn bind_service(&self, key: &Self::Key) -> Result { let &(ref addr, ref proto) = key; debug!("building inbound {:?} client to {}", proto, addr); diff --git a/proxy/src/outbound.rs b/proxy/src/outbound.rs index d0deea4a3..7ad3150ce 100644 --- a/proxy/src/outbound.rs +++ b/proxy/src/outbound.rs @@ -119,7 +119,7 @@ where /// Buffering is currently unbounded and does not apply timeouts. This must be /// changed. fn bind_service( - &mut self, + &self, key: &Self::Key, ) -> Result { let &(ref dest, ref protocol) = key;