mirror of https://github.com/linkerd/linkerd2.git
proxy/router: Implement LRU cache eviction (#925)
The router's cache has no means to evict unused entries when capacity is reached. This change does the following: - Wraps cache values in a smart pointer that tracks the last time of access for each entry. The smart pointer updates the access time when the reference to entry is dropped. - When capacity is not available, all nodes that have not been accessed within some minimal idle age are dropped. Accesses and updates to the map are O(1) when capacity is available. Reclaiming capacity is O(n), so it's expected that the router is configured with enough capacity such that capacity need not be reclaimed usually.
This commit is contained in:
parent
8cf5f45a58
commit
e91699bba2
|
@ -1,30 +1,66 @@
|
|||
use indexmap::IndexMap;
|
||||
use std::hash::Hash;
|
||||
use std::{hash::Hash, ops::{Deref, DerefMut}, time::{Duration, Instant}};
|
||||
|
||||
// Reexported so IndexMap isn't exposed.
|
||||
pub use indexmap::Equivalent;
|
||||
|
||||
/// A cache for routes
|
||||
/// An LRU cache
|
||||
///
|
||||
/// ## Assumptions
|
||||
///
|
||||
/// - `access` is common;
|
||||
/// - `store` is less common;
|
||||
/// - `capacity` is large enough..
|
||||
/// - `capacity` is large enough that idle vals need not be removed frequently.
|
||||
///
|
||||
/// ## Complexity
|
||||
///
|
||||
/// - `access` computes in O(1) time (amortized average).
|
||||
/// - `store` computes in O(1) time (average).
|
||||
// TODO LRU
|
||||
pub struct Cache<K: Hash + Eq, V> {
|
||||
vals: IndexMap<K, V>,
|
||||
/// - `reserve` computes in O(n) time (average) when capacity is not available,
|
||||
///
|
||||
/// ### TODO
|
||||
///
|
||||
/// The underlying datastructure could be improved somewhat so that `reserve` can evict
|
||||
/// unused nodes more efficiently. Given that eviction is intended to be rare, this is
|
||||
/// probably not a very high priority.
|
||||
pub struct Cache<K: Hash + Eq, V, N: Now = ()> {
|
||||
vals: IndexMap<K, Node<V>>,
|
||||
capacity: usize,
|
||||
max_idle_age: Duration,
|
||||
|
||||
/// The time source.
|
||||
now: N,
|
||||
}
|
||||
|
||||
/// Provides the current time within the module. Useful for testing.
|
||||
pub trait Now {
|
||||
fn now(&self) -> Instant;
|
||||
}
|
||||
|
||||
/// Wraps cache values so that each tracks its last access time.
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub struct Node<T> {
|
||||
value: T,
|
||||
last_access: Instant,
|
||||
}
|
||||
|
||||
/// A smart pointer that updates an access time when dropped.
|
||||
///
|
||||
/// Wraps a mutable reference to a `V`-typed value.
|
||||
///
|
||||
/// When the guard is dropped, the value's `last_access` time is updated with the provided
|
||||
/// time source.
|
||||
#[derive(Debug)]
|
||||
pub struct Access<'a, T: 'a, N: Now + 'a = ()> {
|
||||
node: &'a mut Node<T>,
|
||||
now: &'a N,
|
||||
}
|
||||
|
||||
/// A handle to a `Cache` that has capacity for at least one additional value.
|
||||
pub struct Reserve<'a, K: Hash + Eq + 'a, V: 'a> {
|
||||
vals: &'a mut IndexMap<K, V>,
|
||||
#[derive(Debug)]
|
||||
pub struct Reserve<'a, K: Hash + Eq + 'a, V: 'a, N: 'a> {
|
||||
vals: &'a mut IndexMap<K, Node<V>>,
|
||||
now: &'a N,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
|
@ -32,58 +68,186 @@ pub struct CapacityExhausted {
|
|||
pub capacity: usize,
|
||||
}
|
||||
|
||||
impl<K: Hash + Eq, V> Cache<K, V> {
|
||||
pub fn new(capacity: usize) -> Self {
|
||||
// ===== impl Cache =====
|
||||
|
||||
impl<K: Hash + Eq, V> Cache<K, V, ()> {
|
||||
pub fn new(capacity: usize, max_idle_age: Duration) -> Self {
|
||||
Self {
|
||||
capacity,
|
||||
vals: IndexMap::default(),
|
||||
max_idle_age,
|
||||
now: (),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<K: Hash + Eq, V, N: Now> Cache<K, V, N> {
|
||||
/// Accesses a route.
|
||||
// TODO track access times for each entry.
|
||||
pub fn access<Q>(&mut self, key: &Q) -> Option<&mut V>
|
||||
///
|
||||
/// A mutable reference to the route is wrapped in the returned `Access` to
|
||||
/// ensure that the access-time is updated when the reference is released.
|
||||
pub fn access<Q>(&mut self, key: &Q) -> Option<Access<V, N>>
|
||||
where
|
||||
Q: Hash + Equivalent<K>,
|
||||
{
|
||||
self.vals.get_mut(key)
|
||||
let v = self.vals.get_mut(key)?;
|
||||
Some(v.access(&self.now))
|
||||
}
|
||||
|
||||
/// Ensures that there is capacity to store an additional route.
|
||||
///
|
||||
/// Returns a handle that may be used to store an ite,. If there is no available
|
||||
/// capacity, idle entries may be evicted to create capacity.
|
||||
///
|
||||
/// An error is returned if there is no available capacity.
|
||||
// TODO evict old entries
|
||||
pub fn reserve(&mut self) -> Result<Reserve<K, V>, CapacityExhausted> {
|
||||
let avail = self.capacity - self.vals.len();
|
||||
if avail == 0 {
|
||||
// TODO If the cache is full, evict the oldest inactive route. If all
|
||||
// routes are active, fail the request.
|
||||
return Err(CapacityExhausted {
|
||||
capacity: self.capacity,
|
||||
pub fn reserve(&mut self) -> Result<Reserve<K, V, N>, CapacityExhausted> {
|
||||
if self.vals.len() == self.capacity {
|
||||
// Only whole seconds are used to determine whether a node should be retained.
|
||||
// This is intended to prevent the need for repetitive reservations when
|
||||
// entries are clustered in tight time ranges.
|
||||
let max_age = self.max_idle_age.as_secs();
|
||||
let now = self.now.now();
|
||||
self.vals.retain(|_, n| {
|
||||
let age = now - n.last_access();
|
||||
age.as_secs() <= max_age
|
||||
});
|
||||
|
||||
if self.vals.len() == self.capacity {
|
||||
return Err(CapacityExhausted {
|
||||
capacity: self.capacity,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Reserve {
|
||||
vals: &mut self.vals,
|
||||
now: &self.now,
|
||||
})
|
||||
}
|
||||
|
||||
/// Overrides the time source for tests.
|
||||
#[cfg(test)]
|
||||
fn with_clock<M: Now>(self, now: M) -> Cache<K, V, M> {
|
||||
Cache {
|
||||
now,
|
||||
vals: self.vals,
|
||||
capacity: self.capacity,
|
||||
max_idle_age: self.max_idle_age,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, K: Hash + Eq + 'a, V: 'a> Reserve<'a, K, V> {
|
||||
// ===== impl Reserve =====
|
||||
|
||||
impl<'a, K: Hash + Eq + 'a, V: 'a, N: Now + 'a> Reserve<'a, K, V, N> {
|
||||
/// Stores a route in the cache.
|
||||
pub fn store(self, key: K, val: V) {
|
||||
self.vals.insert(key, val);
|
||||
let node = Node::new(val.into(), self.now.now());
|
||||
self.vals.insert(key, node);
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl Access =====
|
||||
|
||||
impl<'a, T: 'a, N: Now + 'a> Deref for Access<'a, T, N> {
|
||||
type Target = T;
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.node
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T: 'a, N: Now + 'a> DerefMut for Access<'a, T, N> {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.node
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T: 'a, N: Now + 'a> Access<'a, T, N> {
|
||||
#[cfg(test)]
|
||||
fn last_access(&self) -> Instant {
|
||||
self.node.last_access
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T: 'a, N: Now + 'a> Drop for Access<'a, T, N> {
|
||||
fn drop(&mut self) {
|
||||
self.node.last_access = self.now.now();
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl Node =====
|
||||
|
||||
impl<T> Node<T> {
|
||||
pub fn new(value: T, last_access: Instant) -> Self {
|
||||
Node { value, last_access }
|
||||
}
|
||||
|
||||
pub fn access<'a, N: Now + 'a>(&'a mut self, now: &'a N) -> Access<'a, T, N> {
|
||||
Access { now, node: self }
|
||||
}
|
||||
|
||||
pub fn last_access(&self) -> Instant {
|
||||
self.last_access
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Deref for Node<T> {
|
||||
type Target = T;
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.value
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> DerefMut for Node<T> {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.value
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl Now =====
|
||||
|
||||
/// Default source of time.
|
||||
impl Now for () {
|
||||
fn now(&self) -> Instant {
|
||||
Instant::now()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use futures::Future;
|
||||
use std::{cell::RefCell, rc::Rc, time::{Duration, Instant}};
|
||||
use test_util::MultiplyAndAssign;
|
||||
use tower_service::Service;
|
||||
|
||||
/// A mocked instance of `Now` to drive tests.
|
||||
#[derive(Clone)]
|
||||
pub struct Clock(Rc<RefCell<Instant>>);
|
||||
|
||||
// ===== impl Clock =====
|
||||
|
||||
impl Default for Clock {
|
||||
fn default() -> Clock {
|
||||
Clock(Rc::new(RefCell::new(Instant::now())))
|
||||
}
|
||||
}
|
||||
|
||||
impl Clock {
|
||||
pub fn advance(&mut self, d: Duration) {
|
||||
*self.0.borrow_mut() += d;
|
||||
}
|
||||
}
|
||||
|
||||
impl Now for Clock {
|
||||
fn now(&self) -> Instant {
|
||||
self.0.borrow().clone()
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn reserve_and_store() {
|
||||
let mut cache = Cache::<_, MultiplyAndAssign>::new(2);
|
||||
let mut cache = Cache::<_, MultiplyAndAssign>::new(2, Duration::from_secs(1));
|
||||
|
||||
{
|
||||
let r = cache.reserve().expect("reserve");
|
||||
|
@ -106,7 +270,7 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn store_and_access() {
|
||||
let mut cache = Cache::<_, MultiplyAndAssign>::new(2);
|
||||
let mut cache = Cache::<_, MultiplyAndAssign>::new(2, Duration::from_secs(0));
|
||||
|
||||
assert!(cache.access(&1).is_none());
|
||||
assert!(cache.access(&2).is_none());
|
||||
|
@ -125,4 +289,130 @@ mod tests {
|
|||
assert!(cache.access(&1).is_some());
|
||||
assert!(cache.access(&2).is_some());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn reserve_does_nothing_when_capacity_exists() {
|
||||
let mut cache = Cache::<_, MultiplyAndAssign, _>::new(2, Duration::from_secs(0));
|
||||
|
||||
// Create a route that goes idle immediately:
|
||||
{
|
||||
let r = cache.reserve().expect("capacity");
|
||||
let mut service = MultiplyAndAssign::default();
|
||||
service.call(1.into()).wait().unwrap();
|
||||
r.store(1, service);
|
||||
};
|
||||
assert_eq!(cache.vals.len(), 1);
|
||||
|
||||
assert!(cache.reserve().is_ok());
|
||||
assert_eq!(cache.vals.len(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn reserve_honors_max_idle_age() {
|
||||
let mut clock = Clock::default();
|
||||
let mut cache = Cache::<_, MultiplyAndAssign, _>::new(1, Duration::from_secs(2))
|
||||
.with_clock(clock.clone());
|
||||
|
||||
// Touch `1` at 0s.
|
||||
cache
|
||||
.reserve()
|
||||
.expect("capacity")
|
||||
.store(1, MultiplyAndAssign::default());
|
||||
assert_eq!(
|
||||
cache.reserve().err(),
|
||||
Some(CapacityExhausted { capacity: 1 })
|
||||
);
|
||||
assert_eq!(cache.vals.len(), 1);
|
||||
|
||||
// No capacity at 1s.
|
||||
clock.advance(Duration::from_secs(1));
|
||||
assert_eq!(
|
||||
cache.reserve().err(),
|
||||
Some(CapacityExhausted { capacity: 1 })
|
||||
);
|
||||
assert_eq!(cache.vals.len(), 1);
|
||||
|
||||
// No capacity at 2s.
|
||||
clock.advance(Duration::from_secs(1));
|
||||
assert_eq!(
|
||||
cache.reserve().err(),
|
||||
Some(CapacityExhausted { capacity: 1 })
|
||||
);
|
||||
assert_eq!(cache.vals.len(), 1);
|
||||
|
||||
// Capacity at 3+s.
|
||||
clock.advance(Duration::from_secs(1));
|
||||
assert!(cache.reserve().is_ok());
|
||||
assert_eq!(cache.vals.len(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn last_access() {
|
||||
let mut clock = Clock::default();
|
||||
let mut cache =
|
||||
Cache::<_, MultiplyAndAssign>::new(1, Duration::from_secs(0)).with_clock(clock.clone());
|
||||
|
||||
let t0 = clock.now();
|
||||
cache
|
||||
.reserve()
|
||||
.expect("capacity")
|
||||
.store(333, MultiplyAndAssign::default());
|
||||
|
||||
clock.advance(Duration::from_secs(1));
|
||||
let t1 = clock.now();
|
||||
assert_eq!(cache.access(&333).map(|n| n.last_access()), Some(t0));
|
||||
|
||||
clock.advance(Duration::from_secs(1));
|
||||
assert_eq!(cache.access(&333).map(|n| n.last_access()), Some(t1));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn last_access_wiped_on_evict() {
|
||||
let mut clock = Clock::default();
|
||||
let mut cache =
|
||||
Cache::<_, MultiplyAndAssign>::new(1, Duration::from_secs(0)).with_clock(clock.clone());
|
||||
|
||||
let t0 = clock.now();
|
||||
cache
|
||||
.reserve()
|
||||
.expect("capacity")
|
||||
.store(333, MultiplyAndAssign::default());
|
||||
|
||||
clock.advance(Duration::from_secs(1));
|
||||
assert_eq!(cache.access(&333).map(|n| n.last_access()), Some(t0));
|
||||
|
||||
// Cause the router to evict the `333` route.
|
||||
clock.advance(Duration::from_secs(1));
|
||||
cache
|
||||
.reserve()
|
||||
.expect("capacity")
|
||||
.store(444, MultiplyAndAssign::default());
|
||||
|
||||
clock.advance(Duration::from_secs(1));
|
||||
let t1 = clock.now();
|
||||
cache
|
||||
.reserve()
|
||||
.expect("capacity")
|
||||
.store(333, MultiplyAndAssign::default());
|
||||
|
||||
clock.advance(Duration::from_secs(1));
|
||||
assert_eq!(cache.access(&333).map(|n| n.last_access()), Some(t1));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn node_access_updated_on_drop() {
|
||||
let mut clock = Clock::default();
|
||||
let t0 = clock.now();
|
||||
let mut node = Node::new(123, t0);
|
||||
|
||||
clock.advance(Duration::from_secs(1));
|
||||
{
|
||||
let access = node.access(&clock);
|
||||
assert_eq!(access.last_access(), t0);
|
||||
}
|
||||
|
||||
let t1 = clock.now();
|
||||
assert_eq!(node.last_access(), t1);
|
||||
assert_ne!(t0, t1);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@ use tower_service::Service;
|
|||
use std::{error, fmt, mem};
|
||||
use std::hash::Hash;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Duration;
|
||||
|
||||
mod cache;
|
||||
|
||||
|
@ -93,11 +94,11 @@ where T: Recognize,
|
|||
impl<T> Router<T>
|
||||
where T: Recognize
|
||||
{
|
||||
pub fn new(recognize: T, capacity: usize) -> Self {
|
||||
pub fn new(recognize: T, capacity: usize, max_idle_age: Duration) -> Self {
|
||||
Router {
|
||||
inner: Arc::new(Inner {
|
||||
recognize,
|
||||
cache: Mutex::new(Cache::new(capacity)),
|
||||
cache: Mutex::new(Cache::new(capacity, max_idle_age)),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
@ -134,7 +135,7 @@ where T: Recognize,
|
|||
let cache = &mut *self.inner.cache.lock().expect("lock router cache");
|
||||
|
||||
// First, try to load a cached route for `key`.
|
||||
if let Some(service) = cache.access(&key) {
|
||||
if let Some(mut service) = cache.access(&key) {
|
||||
return ResponseFuture::new(service.call(request));
|
||||
}
|
||||
|
||||
|
@ -253,18 +254,22 @@ where
|
|||
|
||||
#[cfg(test)]
|
||||
mod test_util {
|
||||
use futures::{Poll, Future, future};
|
||||
use futures::{Poll, future};
|
||||
use tower_service::Service;
|
||||
|
||||
pub struct Recognize;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct MultiplyAndAssign(usize);
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Request {
|
||||
NotRecognized,
|
||||
Recognized(usize),
|
||||
}
|
||||
|
||||
// ===== impl Recognize =====
|
||||
|
||||
impl super::Recognize for Recognize {
|
||||
type Request = Request;
|
||||
type Response = usize;
|
||||
|
@ -285,6 +290,14 @@ mod test_util {
|
|||
}
|
||||
}
|
||||
|
||||
// ===== impl MultiplyAndAssign =====
|
||||
|
||||
impl Default for MultiplyAndAssign {
|
||||
fn default() -> Self {
|
||||
MultiplyAndAssign(1)
|
||||
}
|
||||
}
|
||||
|
||||
impl Service for MultiplyAndAssign {
|
||||
type Request = Request;
|
||||
type Response = usize;
|
||||
|
@ -310,32 +323,29 @@ mod test_util {
|
|||
Request::Recognized(n)
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for MultiplyAndAssign {
|
||||
fn default() -> Self {
|
||||
MultiplyAndAssign(1)
|
||||
}
|
||||
}
|
||||
|
||||
impl super::Router<Recognize> {
|
||||
pub fn call_ok(&mut self, req: Request) -> usize {
|
||||
self.call(req).wait().expect("should route")
|
||||
}
|
||||
|
||||
pub fn call_err(&mut self, req: Request) -> super::Error<(), ()> {
|
||||
self.call(req).wait().expect_err("should not route")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use futures::Future;
|
||||
use std::time::Duration;
|
||||
use test_util::*;
|
||||
use tower_service::Service;
|
||||
use super::{Error, Router};
|
||||
|
||||
impl Router<Recognize> {
|
||||
fn call_ok(&mut self, req: Request) -> usize {
|
||||
self.call(req).wait().expect("should route")
|
||||
}
|
||||
|
||||
fn call_err(&mut self, req: Request) -> super::Error<(), ()> {
|
||||
self.call(req).wait().expect_err("should not route")
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn invalid() {
|
||||
let mut router = Router::new(Recognize, 1);
|
||||
let mut router = Router::new(Recognize, 1, Duration::from_secs(0));
|
||||
|
||||
let rsp = router.call_err(Request::NotRecognized);
|
||||
assert_eq!(rsp, Error::NotRecognized);
|
||||
|
@ -343,7 +353,7 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn cache_limited_by_capacity() {
|
||||
let mut router = Router::new(Recognize, 1);
|
||||
let mut router = Router::new(Recognize, 1, Duration::from_secs(1));
|
||||
|
||||
let rsp = router.call_ok(2.into());
|
||||
assert_eq!(rsp, 2);
|
||||
|
@ -354,7 +364,7 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn services_cached() {
|
||||
let mut router = Router::new(Recognize, 1);
|
||||
let mut router = Router::new(Recognize, 1, Duration::from_secs(0));
|
||||
|
||||
let rsp = router.call_ok(2.into());
|
||||
assert_eq!(rsp, 2);
|
||||
|
|
|
@ -48,6 +48,10 @@ pub struct Config {
|
|||
|
||||
pub outbound_router_capacity: usize,
|
||||
|
||||
pub inbound_router_max_idle_age: Duration,
|
||||
|
||||
pub outbound_router_max_idle_age: Duration,
|
||||
|
||||
/// The path to "/etc/resolv.conf"
|
||||
pub resolv_conf_path: PathBuf,
|
||||
|
||||
|
@ -147,6 +151,9 @@ pub const ENV_BIND_TIMEOUT: &str = "CONDUIT_PROXY_BIND_TIMEOUT";
|
|||
pub const ENV_INBOUND_ROUTER_CAPACITY: &str = "CONDUIT_PROXY_INBOUND_ROUTER_CAPACITY";
|
||||
pub const ENV_OUTBOUND_ROUTER_CAPACITY: &str = "CONDUIT_PROXY_OUTBOUND_ROUTER_CAPACITY";
|
||||
|
||||
pub const ENV_INBOUND_ROUTER_MAX_IDLE_AGE: &str = "CONDUIT_PROXY_INBOUND_ROUTER_MAX_IDLE_AGE";
|
||||
pub const ENV_OUTBOUND_ROUTER_MAX_IDLE_AGE: &str = "CONDUIT_PROXY_OUTBOUND_ROUTER_MAX_IDLE_AGE";
|
||||
|
||||
// These *disable* our protocol detection for connections whose SO_ORIGINAL_DST
|
||||
// has a port in the provided list.
|
||||
pub const ENV_INBOUND_PORTS_DISABLE_PROTOCOL_DETECTION: &str = "CONDUIT_PROXY_INBOUND_PORTS_DISABLE_PROTOCOL_DETECTION";
|
||||
|
@ -171,9 +178,12 @@ const DEFAULT_RESOLV_CONF: &str = "/etc/resolv.conf";
|
|||
|
||||
/// It's assumed that a typical proxy can serve inbound traffic for up to 100 pod-local
|
||||
/// HTTP services and may communicate with up to 10K external HTTP domains.
|
||||
const DEFAULT_INBOUND_ROUTER_CAPACITY: usize = 100;
|
||||
const DEFAULT_INBOUND_ROUTER_CAPACITY: usize = 100;
|
||||
const DEFAULT_OUTBOUND_ROUTER_CAPACITY: usize = 10_000;
|
||||
|
||||
const DEFAULT_INBOUND_ROUTER_MAX_IDLE_AGE: Duration = Duration::from_secs(60);
|
||||
const DEFAULT_OUTBOUND_ROUTER_MAX_IDLE_AGE: Duration = Duration::from_secs(60);
|
||||
|
||||
// By default, we keep a list of known assigned ports of server-first protocols.
|
||||
//
|
||||
// https://www.iana.org/assignments/service-names-port-numbers/service-names-port-numbers.txt
|
||||
|
@ -202,6 +212,8 @@ impl<'a> TryFrom<&'a Strings> for Config {
|
|||
let outbound_disable_ports = parse(strings, ENV_OUTBOUND_PORTS_DISABLE_PROTOCOL_DETECTION, parse_port_set);
|
||||
let inbound_router_capacity = parse(strings, ENV_INBOUND_ROUTER_CAPACITY, parse_number);
|
||||
let outbound_router_capacity = parse(strings, ENV_OUTBOUND_ROUTER_CAPACITY, parse_number);
|
||||
let inbound_router_max_idle_age = parse(strings, ENV_INBOUND_ROUTER_MAX_IDLE_AGE, parse_duration);
|
||||
let outbound_router_max_idle_age = parse(strings, ENV_OUTBOUND_ROUTER_MAX_IDLE_AGE, parse_duration);
|
||||
let bind_timeout = parse(strings, ENV_BIND_TIMEOUT, parse_duration);
|
||||
let resolv_conf_path = strings.get(ENV_RESOLV_CONF);
|
||||
let event_buffer_capacity = parse(strings, ENV_EVENT_BUFFER_CAPACITY, parse_number);
|
||||
|
@ -259,6 +271,11 @@ impl<'a> TryFrom<&'a Strings> for Config {
|
|||
outbound_router_capacity: outbound_router_capacity?
|
||||
.unwrap_or(DEFAULT_OUTBOUND_ROUTER_CAPACITY),
|
||||
|
||||
inbound_router_max_idle_age: inbound_router_max_idle_age?
|
||||
.unwrap_or(DEFAULT_INBOUND_ROUTER_MAX_IDLE_AGE),
|
||||
outbound_router_max_idle_age: outbound_router_max_idle_age?
|
||||
.unwrap_or(DEFAULT_OUTBOUND_ROUTER_MAX_IDLE_AGE),
|
||||
|
||||
resolv_conf_path: resolv_conf_path?
|
||||
.unwrap_or(DEFAULT_RESOLV_CONF.into())
|
||||
.into(),
|
||||
|
|
|
@ -227,10 +227,14 @@ where
|
|||
|
||||
let default_addr = config.private_forward.map(|a| a.into());
|
||||
|
||||
let fut = serve(
|
||||
inbound_listener,
|
||||
let router = Router::new(
|
||||
Inbound::new(default_addr, bind),
|
||||
config.inbound_router_capacity,
|
||||
config.inbound_router_max_idle_age,
|
||||
);
|
||||
let fut = serve(
|
||||
inbound_listener,
|
||||
router,
|
||||
config.private_connect_timeout,
|
||||
config.inbound_ports_disable_protocol_detection,
|
||||
ctx,
|
||||
|
@ -248,11 +252,14 @@ where
|
|||
let outbound = {
|
||||
let ctx = ctx::Proxy::outbound(&process_ctx);
|
||||
let bind = bind.clone().with_ctx(ctx.clone());
|
||||
let outgoing = Outbound::new(bind, control, config.bind_timeout);
|
||||
let router = Router::new(
|
||||
Outbound::new(bind, control, config.bind_timeout),
|
||||
config.outbound_router_capacity,
|
||||
config.outbound_router_max_idle_age,
|
||||
);
|
||||
let fut = serve(
|
||||
outbound_listener,
|
||||
outgoing,
|
||||
config.outbound_router_capacity,
|
||||
router,
|
||||
config.public_connect_timeout,
|
||||
config.outbound_ports_disable_protocol_detection,
|
||||
ctx,
|
||||
|
@ -328,8 +335,7 @@ where
|
|||
|
||||
fn serve<R, B, E, F, G>(
|
||||
bound_port: BoundPort,
|
||||
recognize: R,
|
||||
router_capacity: usize,
|
||||
router: Router<R>,
|
||||
tcp_connect_timeout: Duration,
|
||||
disable_protocol_detection_ports: IndexSet<u16>,
|
||||
proxy_ctx: Arc<ctx::Proxy>,
|
||||
|
@ -351,7 +357,6 @@ where
|
|||
+ 'static,
|
||||
G: GetOriginalDst + 'static,
|
||||
{
|
||||
let router = Router::new(recognize, router_capacity);
|
||||
let stack = Arc::new(NewServiceFn::new(move || {
|
||||
// Clone the router handle
|
||||
let router = router.clone();
|
||||
|
|
Loading…
Reference in New Issue