diff --git a/proxy/router/src/lib.rs b/proxy/router/src/lib.rs index f089cf227..a615ad601 100644 --- a/proxy/router/src/lib.rs +++ b/proxy/router/src/lib.rs @@ -14,7 +14,7 @@ use std::sync::{Arc, Mutex}; pub struct Router where T: Recognize, { - inner: Arc>>, + inner: Arc>, } /// Provides a strategy for routing a Request to a Service. @@ -51,11 +51,9 @@ pub trait Recognize { /// /// The returned service must always be in the ready state (i.e. /// `poll_ready` must always return `Ready` or `Err`). - fn bind_service(&mut self, key: &Self::Key) -> Result; + fn bind_service(&self, key: &Self::Key) -> Result; } -pub struct Single(Option); - #[derive(Debug, PartialEq)] pub enum Error { Inner(T), @@ -73,8 +71,13 @@ where T: Recognize, struct Inner where T: Recognize, { - routes: IndexMap, recognize: T, + cache: Mutex>, +} + +struct Cache +{ + routes: IndexMap, capacity: usize, } @@ -95,11 +98,13 @@ where T: Recognize { pub fn new(recognize: T, capacity: usize) -> Self { Router { - inner: Arc::new(Mutex::new(Inner { - routes: IndexMap::default(), + inner: Arc::new(Inner { recognize, - capacity, - })), + cache: Mutex::new(Cache { + routes: IndexMap::default(), + capacity, + }), + }), } } } @@ -127,34 +132,34 @@ where T: Recognize, /// /// The response fails when the request cannot be routed. fn call(&mut self, request: Self::Request) -> Self::Future { - let inner = &mut *self.inner.lock().expect("lock router cache"); - - let key = match inner.recognize.recognize(&request) { + let key = match self.inner.recognize.recognize(&request) { Some(key) => key, None => return ResponseFuture::not_recognized(), }; + let cache = &mut *self.inner.cache.lock().expect("lock router cache"); + // First, try to load a cached route for `key`. - if let Some(service) = inner.routes.get_mut(&key) { + if let Some(service) = cache.routes.get_mut(&key) { return ResponseFuture::new(service.call(request)); } // Since there wasn't a cached route, ensure that there is capacity for a // new one. - if inner.routes.len() == inner.capacity { + if cache.routes.len() == cache.capacity { // TODO If the cache is full, evict the oldest inactive route. If all // routes are active, fail the request. - return ResponseFuture::no_capacity(inner.capacity); + return ResponseFuture::no_capacity(cache.capacity); } // Bind a new route, send the request on the route, and cache the route. - let mut service = match inner.recognize.bind_service(&key) { + let mut service = match self.inner.recognize.bind_service(&key) { Ok(svc) => svc, Err(e) => return ResponseFuture { state: State::RouteError(e) }, }; let response = service.call(request); - inner.routes.insert(key, service); + cache.routes.insert(key, service); ResponseFuture::new(response) } } @@ -167,31 +172,6 @@ where T: Recognize, } } -// ===== impl Single ===== - -impl Single { - pub fn new(svc: S) -> Self { - Single(Some(svc)) - } -} - -impl Recognize for Single { - type Request = S::Request; - type Response = S::Response; - type Error = S::Error; - type Key = (); - type RouteError = (); - type Service = S; - - fn recognize(&self, _: &Self::Request) -> Option { - Some(()) - } - - fn bind_service(&mut self, _: &Self::Key) -> Result { - Ok(self.0.take().expect("static route bound twice")) - } -} - // ===== impl ResponseFuture ===== impl ResponseFuture @@ -305,7 +285,7 @@ mod tests { } } - fn bind_service(&mut self, _: &Self::Key) -> Result { + fn bind_service(&self, _: &Self::Key) -> Result { Ok(MultiplyAndAssign(1)) } } diff --git a/proxy/src/control/mod.rs b/proxy/src/control/mod.rs index 6fd972597..884f11a3a 100644 --- a/proxy/src/control/mod.rs +++ b/proxy/src/control/mod.rs @@ -31,6 +31,7 @@ use self::discovery::{Background as DiscoBg, Discovery, Watch}; pub use self::discovery::Bind; pub use self::observe::Observe; +#[derive(Clone)] pub struct Control { disco: Discovery, } diff --git a/proxy/src/inbound.rs b/proxy/src/inbound.rs index 77941e721..6e338dc67 100644 --- a/proxy/src/inbound.rs +++ b/proxy/src/inbound.rs @@ -70,7 +70,7 @@ where /// /// Buffering is currently unbounded and does not apply timeouts. This must be /// changed. - fn bind_service(&mut self, key: &Self::Key) -> Result { + fn bind_service(&self, key: &Self::Key) -> Result { let &(ref addr, ref proto) = key; debug!("building inbound {:?} client to {}", proto, addr); diff --git a/proxy/src/outbound.rs b/proxy/src/outbound.rs index d0deea4a3..7ad3150ce 100644 --- a/proxy/src/outbound.rs +++ b/proxy/src/outbound.rs @@ -119,7 +119,7 @@ where /// Buffering is currently unbounded and does not apply timeouts. This must be /// changed. fn bind_service( - &mut self, + &self, key: &Self::Key, ) -> Result { let &(ref dest, ref protocol) = key;