Proxy: Limit the max number of in-flight requests. (#398)

Currently, the max number of in-flight requests in the proxy is
unbounded. This is due to the `Buffer` middleware being unbounded.

This is resolved by adding an instance of `InFlightLimit` around
`Buffer`, capping the max number of in-flight requests for a given
endpoint.

Currently, the limit is hardcoded to 10,000. However, this will
eventually become a configuration value.

Fixes #287

Signed-off-by: Carl Lerche <me@carllerche.com>
This commit is contained in:
Carl Lerche 2018-02-20 19:56:21 -08:00 committed by GitHub
parent c579a8fe8d
commit 287128885e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 63 additions and 30 deletions

35
Cargo.lock generated
View File

@ -157,6 +157,7 @@ dependencies = [
"tower-discover 0.1.0 (git+https://github.com/tower-rs/tower)",
"tower-grpc 0.1.0 (git+https://github.com/tower-rs/tower-grpc)",
"tower-h2 0.1.0 (git+https://github.com/tower-rs/tower-h2)",
"tower-in-flight-limit 0.1.0 (git+https://github.com/tower-rs/tower)",
"tower-reconnect 0.1.0 (git+https://github.com/tower-rs/tower)",
"tower-util 0.1.0 (git+https://github.com/tower-rs/tower)",
"url 1.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -997,7 +998,7 @@ dependencies = [
[[package]]
name = "tower"
version = "0.1.0"
source = "git+https://github.com/tower-rs/tower#c06aa5452d130da948c0b9a3b587edc094b19045"
source = "git+https://github.com/tower-rs/tower#41c54b208e9dcc89ef9e83c0acd584d66b6a90b8"
dependencies = [
"futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)",
]
@ -1005,7 +1006,7 @@ dependencies = [
[[package]]
name = "tower-balance"
version = "0.1.0"
source = "git+https://github.com/tower-rs/tower#c06aa5452d130da948c0b9a3b587edc094b19045"
source = "git+https://github.com/tower-rs/tower#41c54b208e9dcc89ef9e83c0acd584d66b6a90b8"
dependencies = [
"futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1018,7 +1019,7 @@ dependencies = [
[[package]]
name = "tower-buffer"
version = "0.1.0"
source = "git+https://github.com/tower-rs/tower#c06aa5452d130da948c0b9a3b587edc094b19045"
source = "git+https://github.com/tower-rs/tower#41c54b208e9dcc89ef9e83c0acd584d66b6a90b8"
dependencies = [
"futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)",
"tower 0.1.0 (git+https://github.com/tower-rs/tower)",
@ -1027,7 +1028,7 @@ dependencies = [
[[package]]
name = "tower-discover"
version = "0.1.0"
source = "git+https://github.com/tower-rs/tower#c06aa5452d130da948c0b9a3b587edc094b19045"
source = "git+https://github.com/tower-rs/tower#41c54b208e9dcc89ef9e83c0acd584d66b6a90b8"
dependencies = [
"futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)",
"tower 0.1.0 (git+https://github.com/tower-rs/tower)",
@ -1073,10 +1074,29 @@ dependencies = [
"tower 0.1.0 (git+https://github.com/tower-rs/tower)",
]
[[package]]
name = "tower-in-flight-limit"
version = "0.1.0"
source = "git+https://github.com/tower-rs/tower#41c54b208e9dcc89ef9e83c0acd584d66b6a90b8"
dependencies = [
"futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)",
"tower 0.1.0 (git+https://github.com/tower-rs/tower)",
"tower-ready-service 0.1.0 (git+https://github.com/tower-rs/tower)",
]
[[package]]
name = "tower-ready-service"
version = "0.1.0"
source = "git+https://github.com/tower-rs/tower#41c54b208e9dcc89ef9e83c0acd584d66b6a90b8"
dependencies = [
"futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)",
"tower 0.1.0 (git+https://github.com/tower-rs/tower)",
]
[[package]]
name = "tower-reconnect"
version = "0.1.0"
source = "git+https://github.com/tower-rs/tower#c06aa5452d130da948c0b9a3b587edc094b19045"
source = "git+https://github.com/tower-rs/tower#41c54b208e9dcc89ef9e83c0acd584d66b6a90b8"
dependencies = [
"futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1086,10 +1106,11 @@ dependencies = [
[[package]]
name = "tower-util"
version = "0.1.0"
source = "git+https://github.com/tower-rs/tower#c06aa5452d130da948c0b9a3b587edc094b19045"
source = "git+https://github.com/tower-rs/tower#41c54b208e9dcc89ef9e83c0acd584d66b6a90b8"
dependencies = [
"futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)",
"tower 0.1.0 (git+https://github.com/tower-rs/tower)",
"tower-ready-service 0.1.0 (git+https://github.com/tower-rs/tower)",
]
[[package]]
@ -1333,6 +1354,8 @@ dependencies = [
"checksum tower-grpc 0.1.0 (git+https://github.com/tower-rs/tower-grpc)" = "<none>"
"checksum tower-grpc-build 0.1.0 (git+https://github.com/tower-rs/tower-grpc)" = "<none>"
"checksum tower-h2 0.1.0 (git+https://github.com/tower-rs/tower-h2)" = "<none>"
"checksum tower-in-flight-limit 0.1.0 (git+https://github.com/tower-rs/tower)" = "<none>"
"checksum tower-ready-service 0.1.0 (git+https://github.com/tower-rs/tower)" = "<none>"
"checksum tower-reconnect 0.1.0 (git+https://github.com/tower-rs/tower)" = "<none>"
"checksum tower-util 0.1.0 (git+https://github.com/tower-rs/tower)" = "<none>"
"checksum unicase 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "284b6d3db520d67fbe88fd778c21510d1b0ba4a551e5d0fbb023d33405f6de8a"

View File

@ -36,15 +36,16 @@ ns-dns-tokio = "0.4"
#futures-watch = { git = "https://github.com/carllerche/better-future" }
tokio-connect = { git = "https://github.com/carllerche/tokio-connect" }
tower = { git = "https://github.com/tower-rs/tower" }
tower-balance = { git = "https://github.com/tower-rs/tower" }
tower-buffer = { git = "https://github.com/tower-rs/tower" }
tower-discover = { git = "https://github.com/tower-rs/tower" }
tower-grpc = { git = "https://github.com/tower-rs/tower-grpc" }
tower-h2 = { git = "https://github.com/tower-rs/tower-h2" }
tower-reconnect = { git = "https://github.com/tower-rs/tower" }
tower-util = { git = "https://github.com/tower-rs/tower" }
tokio-connect = { git = "https://github.com/carllerche/tokio-connect" }
tower = { git = "https://github.com/tower-rs/tower" }
tower-balance = { git = "https://github.com/tower-rs/tower" }
tower-buffer = { git = "https://github.com/tower-rs/tower" }
tower-discover = { git = "https://github.com/tower-rs/tower" }
tower-grpc = { git = "https://github.com/tower-rs/tower-grpc" }
tower-h2 = { git = "https://github.com/tower-rs/tower-h2" }
tower-reconnect = { git = "https://github.com/tower-rs/tower" }
tower-in-flight-limit = { git = "https://github.com/tower-rs/tower" }
tower-util = { git = "https://github.com/tower-rs/tower" }
[target.'cfg(target_os = "linux")'.dependencies]
libc = "0.2"

View File

@ -4,6 +4,7 @@ use std::sync::Arc;
use http;
use tower;
use tower_buffer::{self, Buffer};
use tower_in_flight_limit::{self, InFlightLimit};
use tower_h2;
use conduit_proxy_router::Recognize;
@ -17,6 +18,8 @@ pub struct Inbound<B> {
bind: Bind<B>,
}
const MAX_IN_FLIGHT: usize = 10_000;
// ===== impl Inbound =====
impl<B> Inbound<B> {
@ -34,12 +37,14 @@ where
{
type Request = http::Request<B>;
type Response = bind::HttpResponse;
type Error = tower_buffer::Error<
<bind::Service<B> as tower::Service>::Error
type Error = tower_in_flight_limit::Error<
tower_buffer::Error<
<bind::Service<B> as tower::Service>::Error
>
>;
type Key = (SocketAddr, bind::Protocol);
type RouteError = ();
type Service = Buffer<bind::Service<B>>;
type Service = InFlightLimit<Buffer<bind::Service<B>>>;
fn recognize(&self, req: &Self::Request) -> Option<Self::Key> {
let key = req.extensions()
@ -72,11 +77,11 @@ where
let &(ref addr, proto) = key;
debug!("building inbound {:?} client to {}", proto, addr);
// Wrap with buffering. This currently is an unbounded buffer, which
// is not ideal.
//
// TODO: Don't use unbounded buffering.
Buffer::new(self.bind.bind_service(addr, proto), self.bind.executor()).map_err(|_| {})
Buffer::new(self.bind.bind_service(addr, proto), self.bind.executor())
.map(|buffer| {
InFlightLimit::new(buffer, MAX_IN_FLIGHT)
})
.map_err(|_| {})
}
}

View File

@ -41,6 +41,7 @@ extern crate tower_h2;
extern crate tower_reconnect;
extern crate conduit_proxy_router;
extern crate tower_util;
extern crate tower_in_flight_limit;
extern crate url;
use futures::*;

View File

@ -8,6 +8,7 @@ use tower;
use tower_balance::{self, choose, load, Balance};
use tower_buffer::Buffer;
use tower_discover::{Change, Discover};
use tower_in_flight_limit::InFlightLimit;
use tower_h2;
use conduit_proxy_router::Recognize;
@ -26,6 +27,8 @@ pub struct Outbound<B> {
default_zone: Option<String>,
}
const MAX_IN_FLIGHT: usize = 10_000;
// ===== impl Outbound =====
impl<B> Outbound<B> {
@ -56,10 +59,10 @@ where
type Error = <Self::Service as tower::Service>::Error;
type Key = (Destination, Protocol);
type RouteError = ();
type Service = Buffer<Balance<
type Service = InFlightLimit<Buffer<Balance<
load::WithPendingRequests<Discovery<B>>,
choose::PowerOfTwoChoices<rand::ThreadRng>,
>>;
>>>;
fn recognize(&self, req: &Self::Request) -> Option<Self::Key> {
let local = req.uri().authority_part().and_then(|authority| {
@ -128,11 +131,11 @@ where
let balance = tower_balance::power_of_two_choices(loaded, rand::thread_rng());
// Wrap with buffering. This currently is an unbounded buffer,
// which is not ideal.
//
// TODO: Don't use unbounded buffering.
Buffer::new(balance, self.bind.executor()).map_err(|_| {})
Buffer::new(balance, self.bind.executor())
.map(|buffer| {
InFlightLimit::new(buffer, MAX_IN_FLIGHT)
})
.map_err(|_| {})
}
}