From 2ed7c4ad406ba7501090f123902599873c6241a4 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Thu, 7 Jun 2018 22:06:12 -0700 Subject: [PATCH] proxy: Use a PeakEWMA outbound load balancer (#1080) `tower-balance` has been updated with a Peak-EWMA load balancer; and a new crate, `tower-h2-balance` has been introduced to make the load balancer aware of some H2 stream events. The Peak-EWMA balancer is designed to reduce tail latency by maintaining an Exponentially Weighted Moving Average of latencies to each endpoint which decay over a 10s window. --- Cargo.lock | 56 +++++++++++++++++++++++++++---------------- proxy/Cargo.toml | 24 ++++++++++--------- proxy/src/lib.rs | 5 ++-- proxy/src/outbound.rs | 30 +++++++++++++---------- proxy/src/rng.rs | 26 -------------------- 5 files changed, 70 insertions(+), 71 deletions(-) delete mode 100644 proxy/src/rng.rs diff --git a/Cargo.lock b/Cargo.lock index 876b09f7e..882e9a30f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -149,7 +149,7 @@ dependencies = [ "regex 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "ring 0.13.0-alpha4 (registry+https://github.com/rust-lang/crates.io-index)", "rustls 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-connect 0.1.0 (git+https://github.com/carllerche/tokio-connect)", "tokio-io 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-rustls 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -159,6 +159,7 @@ dependencies = [ "tower-discover 0.1.0 (git+https://github.com/tower-rs/tower)", "tower-grpc 0.1.0 (git+https://github.com/tower-rs/tower-grpc)", "tower-h2 0.1.0 (git+https://github.com/tower-rs/tower-h2)", + "tower-h2-balance 0.1.0 (git+https://github.com/tower-rs/tower-h2)", "tower-in-flight-limit 0.1.0 (git+https://github.com/tower-rs/tower)", "tower-reconnect 0.1.0 (git+https://github.com/tower-rs/tower)", "tower-service 0.1.0 (git+https://github.com/tower-rs/tower)", @@ -433,12 +434,12 @@ dependencies = [ "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", "net2 0.2.32 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.39 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-executor 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-reactor 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-tcp 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-timer 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-timer 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)", "want 0.0.4 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -1029,7 +1030,7 @@ dependencies = [ [[package]] name = "tokio" -version = "0.1.6" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1039,8 +1040,8 @@ dependencies = [ "tokio-io 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-reactor 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-tcp 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-threadpool 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-timer 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-threadpool 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-timer 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-udp 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -1068,7 +1069,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-threadpool 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-threadpool 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -1100,7 +1101,7 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "rustls 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", "webpki 0.18.0-alpha4 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -1134,7 +1135,7 @@ dependencies = [ [[package]] name = "tokio-threadpool" -version = "0.1.3" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "crossbeam-deque 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1147,7 +1148,7 @@ dependencies = [ [[package]] name = "tokio-timer" -version = "0.2.3" +version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1170,12 +1171,13 @@ dependencies = [ [[package]] name = "tower-balance" version = "0.1.0" -source = "git+https://github.com/tower-rs/tower#9d49396eb807e6913c46cd877d6f8d039d3d5e1d" +source = "git+https://github.com/tower-rs/tower#fcdc9d27777d82ab75e58aa67ff2aac1b89ccc82" dependencies = [ "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", "indexmap 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", - "rand 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-timer 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)", "tower-discover 0.1.0 (git+https://github.com/tower-rs/tower)", "tower-service 0.1.0 (git+https://github.com/tower-rs/tower)", ] @@ -1192,7 +1194,7 @@ dependencies = [ [[package]] name = "tower-discover" version = "0.1.0" -source = "git+https://github.com/tower-rs/tower#9d49396eb807e6913c46cd877d6f8d039d3d5e1d" +source = "git+https://github.com/tower-rs/tower#fcdc9d27777d82ab75e58aa67ff2aac1b89ccc82" dependencies = [ "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", "tower-service 0.1.0 (git+https://github.com/tower-rs/tower)", @@ -1226,7 +1228,7 @@ dependencies = [ [[package]] name = "tower-h2" version = "0.1.0" -source = "git+https://github.com/tower-rs/tower-h2#abb40873322af7c014d2cb3a4caf03f5f88175e9" +source = "git+https://github.com/tower-rs/tower-h2#760f9fc1c83c3a96edb6fbddb6b0cd3cac73d8ac" dependencies = [ "bytes 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1238,6 +1240,19 @@ dependencies = [ "tower-service 0.1.0 (git+https://github.com/tower-rs/tower)", ] +[[package]] +name = "tower-h2-balance" +version = "0.1.0" +source = "git+https://github.com/tower-rs/tower-h2#760f9fc1c83c3a96edb6fbddb6b0cd3cac73d8ac" +dependencies = [ + "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", + "h2 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)", + "http 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", + "tower-balance 0.1.0 (git+https://github.com/tower-rs/tower)", + "tower-h2 0.1.0 (git+https://github.com/tower-rs/tower-h2)", + "tower-service 0.1.0 (git+https://github.com/tower-rs/tower)", +] + [[package]] name = "tower-in-flight-limit" version = "0.1.0" @@ -1260,7 +1275,7 @@ dependencies = [ [[package]] name = "tower-service" version = "0.1.0" -source = "git+https://github.com/tower-rs/tower#9d49396eb807e6913c46cd877d6f8d039d3d5e1d" +source = "git+https://github.com/tower-rs/tower#fcdc9d27777d82ab75e58aa67ff2aac1b89ccc82" dependencies = [ "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -1292,7 +1307,7 @@ dependencies = [ "tokio-io 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-reactor 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-tcp 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-timer 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-timer 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-udp 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "url 1.7.0 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -1311,7 +1326,7 @@ dependencies = [ "lru-cache 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "resolv-conf 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", "smallvec 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", "trust-dns-proto 0.4.0 (git+https://github.com/bluejekyll/trust-dns)", ] @@ -1586,7 +1601,7 @@ dependencies = [ "checksum termion 1.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "689a3bdfaab439fd92bc87df5c4c78417d3cbe537487274e9b0b2dce76e92096" "checksum thread_local 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "279ef31c19ededf577bfd12dfae728040a21f635b06a24cd670ff510edd38963" "checksum time 0.1.39 (registry+https://github.com/rust-lang/crates.io-index)" = "a15375f1df02096fb3317256ce2cee6a1f42fc84ea5ad5fc8c421cfe40c73098" -"checksum tokio 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "7d00555353b013e170ed8bc4e13f648a317d1fd12157dbcae13f7013f6cf29f5" +"checksum tokio 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "8ee337e5f4e501fc32966fec6fe0ca0cc1c237b0b1b14a335f8bfe3c5f06e286" "checksum tokio-connect 0.1.0 (git+https://github.com/carllerche/tokio-connect)" = "" "checksum tokio-executor 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "8cac2a7883ff3567e9d66bb09100d09b33d90311feca0206c7ca034bc0c55113" "checksum tokio-fs 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "76766830bbf9a2d5bfb50c95350d56a2e79e2c80f675967fff448bc615899708" @@ -1595,8 +1610,8 @@ dependencies = [ "checksum tokio-rustls 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7abfd34d641c0cade850bd94bd0bbfa548b88adb2f08ae237a129144034ff9ae" "checksum tokio-signal 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7a6a5bf935a0151cc8899aa806ce6a425bdaec79ed4034de1a1e6bfa247e2def" "checksum tokio-tcp 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ec9b094851aadd2caf83ba3ad8e8c4ce65a42104f7b94d9e6550023f0407853f" -"checksum tokio-threadpool 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "5783254b10c7c84a56f62c74766ef7e5b83d1f13053218c7cab8d3f2c826fa0e" -"checksum tokio-timer 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "535fed0ccee189f3d48447587697ba3fd234b3dbbb091f0ec4613ddfec0a7c4c" +"checksum tokio-threadpool 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "b3c3873a6d8d0b636e024e77b9a82eaab6739578a06189ecd0e731c7308fbc5d" +"checksum tokio-timer 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)" = "028b94314065b90f026a21826cffd62a4e40a92cda3e5c069cc7b02e5945f5e9" "checksum tokio-udp 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "137bda266504893ac4774e0ec4c2108f7ccdbcb7ac8dced6305fe9e4e0b5041a" "checksum tower-balance 0.1.0 (git+https://github.com/tower-rs/tower)" = "" "checksum tower-buffer 0.1.0 (git+https://github.com/tower-rs/tower)" = "" @@ -1604,6 +1619,7 @@ dependencies = [ "checksum tower-grpc 0.1.0 (git+https://github.com/tower-rs/tower-grpc)" = "" "checksum tower-grpc-build 0.1.0 (git+https://github.com/tower-rs/tower-grpc)" = "" "checksum tower-h2 0.1.0 (git+https://github.com/tower-rs/tower-h2)" = "" +"checksum tower-h2-balance 0.1.0 (git+https://github.com/tower-rs/tower-h2)" = "" "checksum tower-in-flight-limit 0.1.0 (git+https://github.com/tower-rs/tower)" = "" "checksum tower-reconnect 0.1.0 (git+https://github.com/tower-rs/tower)" = "" "checksum tower-service 0.1.0 (git+https://github.com/tower-rs/tower)" = "" diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index 45405b99d..a9d7d2421 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -27,30 +27,32 @@ hyper = "0.12" ipnet = "1.0" log = "0.4.1" indexmap = "1.0.0" +prost = "0.3.0" +prost-types = "0.3.0" rand = "0.4" # for config parsing regex = "1.0.0" -tokio = "0.1.6" +# networking +tokio = "0.1.7" tokio-signal = "0.2" - -prost = "0.3.0" -prost-types = "0.3.0" - -trust-dns-resolver = { default-features = false, git = "https://github.com/bluejekyll/trust-dns" } - tokio-connect = { git = "https://github.com/carllerche/tokio-connect" } -tower-service = { git = "https://github.com/tower-rs/tower" } tower-balance = { git = "https://github.com/tower-rs/tower" } tower-buffer = { git = "https://github.com/tower-rs/tower" } tower-discover = { git = "https://github.com/tower-rs/tower" } -tower-grpc = { git = "https://github.com/tower-rs/tower-grpc" } -tower-h2 = { git = "https://github.com/tower-rs/tower-h2" } -tower-reconnect = { git = "https://github.com/tower-rs/tower" } tower-in-flight-limit = { git = "https://github.com/tower-rs/tower" } +tower-reconnect = { git = "https://github.com/tower-rs/tower" } +tower-service = { git = "https://github.com/tower-rs/tower" } tower-util = { git = "https://github.com/tower-rs/tower" } +tower-h2 = { git = "https://github.com/tower-rs/tower-h2" } +tower-h2-balance = { git = "https://github.com/tower-rs/tower-h2" } +tower-grpc = { git = "https://github.com/tower-rs/tower-grpc" } +# dns +trust-dns-resolver = { default-features = false, git = "https://github.com/bluejekyll/trust-dns" } + +# tls ring = "0.13.0-alpha4" webpki = "0.18.0-alpha4" rustls = "0.12.0" diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index 5469f7510..5cdfe34fe 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -38,6 +38,7 @@ extern crate tower_buffer; extern crate tower_discover; extern crate tower_grpc; extern crate tower_h2; +extern crate tower_h2_balance; extern crate tower_reconnect; extern crate tower_service; extern crate conduit_proxy_router; @@ -81,7 +82,6 @@ mod transparency; mod transport; pub mod timeout; mod tower_fn; // TODO: move to tower-fn -mod rng; use bind::Bind; use connection::BoundPort; @@ -356,12 +356,13 @@ fn serve( ) -> impl Future + Send + 'static where B: tower_h2::Body + Default + Send + 'static, + B::Data: Send, ::Buf: Send, E: Error + Send + 'static, F: Error + Send + 'static, R: Recognize< Request = http::Request, - Response = http::Response>, + Response = http::Response, Error = E, RouteError = F, > diff --git a/proxy/src/outbound.rs b/proxy/src/outbound.rs index b2b69d8e4..28dfb2d95 100644 --- a/proxy/src/outbound.rs +++ b/proxy/src/outbound.rs @@ -6,20 +6,21 @@ use std::sync::Arc; use http; use futures::{Async, Poll}; use tower_service as tower; -use tower_balance::{self, choose, load, Balance}; +use tower_balance::{choose, load, Balance}; use tower_buffer::Buffer; use tower_discover::{Change, Discover}; use tower_in_flight_limit::InFlightLimit; use tower_h2; +use tower_h2_balance::{PendingUntilFirstData, PendingUntilFirstDataBody}; use conduit_proxy_router::Recognize; use bind::{self, Bind, Protocol}; use control::destination::{self, Bind as BindTrait, Resolution}; use ctx; +use telemetry::sensor::http::{ResponseBody as SensorBody}; use timeout::Timeout; -use transparency::h1; +use transparency::{h1, HttpBody}; use transport::{DnsNameAndPort, Host, HostAndPort}; -use rng::LazyThreadRng; type BindProtocol = bind::BindProtocol, B>; @@ -31,6 +32,9 @@ pub struct Outbound { const MAX_IN_FLIGHT: usize = 10_000; +/// This default is used by Finagle. +const DEFAULT_DECAY: Duration = Duration::from_secs(10); + #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub enum Destination { Hostname(DnsNameAndPort), @@ -72,13 +76,16 @@ where ::Buf: Send, { type Request = http::Request; - type Response = bind::HttpResponse; + type Response = http::Response, + >>; type Error = ::Error; type Key = (Destination, Protocol); type RouteError = bind::BufferSpawnError; type Service = InFlightLimit>, - choose::PowerOfTwoChoices + load::WithPeakEwma, PendingUntilFirstData>, + choose::PowerOfTwoChoices, >>>>; fn recognize(&self, req: &Self::Request) -> Option { @@ -147,12 +154,11 @@ where } }; - let loaded = tower_balance::load::WithPendingRequests::new(resolve); - - // We can't use `rand::thread_rng` here because the returned `Service` - // needs to be `Send`, so instead, we use `LazyRng`, which calls - // `rand::thread_rng()` when it is *used*. - let balance = tower_balance::power_of_two_choices(loaded, LazyThreadRng); + let balance = { + let instrument = PendingUntilFirstData::default(); + let loaded = load::WithPeakEwma::new(resolve, DEFAULT_DECAY, instrument); + Balance::p2c(loaded) + }; let log = ::logging::proxy().client("out", Dst(dest.clone())) .with_protocol(protocol.clone()); diff --git a/proxy/src/rng.rs b/proxy/src/rng.rs deleted file mode 100644 index 9f7920d66..000000000 --- a/proxy/src/rng.rs +++ /dev/null @@ -1,26 +0,0 @@ -use rand; - -/// An empty type which implements `rand::Rng` by lazily getting the current -/// `thread_rng` when its' called. -/// -/// This can be used in cases where we need a type to be `Send`, but wish to -/// use the thread-local RNG. -#[derive(Copy, Clone, Debug, Default)] -pub struct LazyThreadRng; - -// ===== impl LazyRng ===== - -impl rand::Rng for LazyThreadRng { - fn next_u32(&mut self) -> u32 { - rand::thread_rng().next_u32() - } - - fn next_u64(&mut self) -> u64 { - rand::thread_rng().next_u64() - } - - #[inline] - fn fill_bytes(&mut self, bytes: &mut [u8]) { - rand::thread_rng().fill_bytes(bytes) - } -}