proxy: improve graceful shutdown process (#684)

- The listener is immediately closed on receipt of a shutdown signal.
- All in-progress server connections are now counted, and the process will
  not shut down until the connection count has dropped to zero.
- HTTP/1 idle connections are closed, while HTTP/2 connections follow the
  protocol's graceful shutdown steps, sending GOAWAY frames. (A simplified
  sketch of the drain mechanism behind this is included below.)
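
The new proxy/src/drain.rs module (included in full below) implements this with a
oneshot channel that broadcasts the shutdown signal and a zero-capacity mpsc channel
whose Sender halves are held by the watchers. What follows is a simplified,
self-contained sketch of that idea built directly on futures 0.1 primitives; it is
not the proxy's actual code, and all names in it are invented for the illustration.

    // Simplified illustration of the drain pattern, NOT the proxy's `drain` module:
    // a oneshot broadcasts "start draining", and a zero-capacity mpsc channel,
    // whose Sender halves are held by the watchers, tells the signaling side when
    // every watcher has gone away.
    extern crate futures;

    use futures::{Future, Stream};
    use futures::sync::{mpsc, oneshot};

    fn main() {
        let (drain_tx, drain_rx) = oneshot::channel::<()>();
        let (alive_tx, alive_rx) = mpsc::channel::<()>(0);

        // A "watcher": waits for the drain signal, performs its shutdown work,
        // and only then drops its `alive_tx` handle.
        let watcher = drain_rx.then(move |_| {
            // e.g. disable HTTP/1 keep-alive or send an HTTP/2 GOAWAY here
            drop(alive_tx);
            Ok::<(), ()>(())
        });

        // Signal the drain, then wait until the watcher set is empty: the
        // Receiver stream terminates once every Sender has been dropped.
        drain_tx.send(()).expect("watcher alive");
        let drained = alive_rx.for_each(|_| Ok(()));

        watcher.join(drained).wait().expect("drained");
    }
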
Sean McArthur 2018-04-10 14:15:37 -07:00 committed by GitHub
parent 91c359e612
commit 02c6887020
14 changed files with 708 additions and 71 deletions

Cargo.lock (generated, 14 changed lines)

@@ -108,7 +108,7 @@ dependencies = [
"env_logger 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-mpsc-lossy 0.3.0",
-"h2 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
+"h2 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
"http 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
"httparse 1.2.4 (registry+https://github.com/rust-lang/crates.io-index)",
"hyper 0.11.22 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -143,7 +143,7 @@ dependencies = [
"bytes 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
"convert 0.3.0",
"futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)",
-"h2 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
+"h2 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
"http 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
"prost 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"prost-derive 0.3.2 (git+https://github.com/danburkert/prost)",
@@ -249,7 +249,7 @@ dependencies = [
[[package]]
name = "h2"
-version = "0.1.3"
+version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"byteorder 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -786,7 +786,7 @@ source = "git+https://github.com/tower-rs/tower-grpc#57d976aca89c13838b946dca9b6
dependencies = [
"bytes 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)",
-"h2 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
+"h2 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
"http 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
"prost 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -808,11 +808,11 @@ dependencies = [
[[package]]
name = "tower-h2"
version = "0.1.0"
-source = "git+https://github.com/tower-rs/tower-h2#59f344f02d37a9e8596805f159bdca3af13ee7b0"
+source = "git+https://github.com/tower-rs/tower-h2#26453f5f0afe2928b47af38713f880ecf27fa85f"
dependencies = [
"bytes 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)",
-"h2 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
+"h2 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
"http 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-connect 0.1.0 (git+https://github.com/carllerche/tokio-connect)",
@@ -957,7 +957,7 @@ dependencies = [
"checksum fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7"
"checksum futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)" = "0bab5b5e94f5c31fc764ba5dd9ad16568aae5d4825538c01d6bca680c9bf94a7"
"checksum futures-cpupool 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "ab90cde24b3319636588d0c35fe03b1333857621051837ed769faefb4c2162e4"
-"checksum h2 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "771f68c296fba5783c4eddd3aab1e7d4523ac88ecc5549ffb19782b6b261981b"
+"checksum h2 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "065fb096fc65bbfb9c765d48c9f3f1a21cdb25ba0d3f82105b38f30ddffa2f7e"
"checksum heck 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ea04fa3ead4e05e51a7c806fc07271fdbde4e246a6c6d1efd52e72230b771b82"
"checksum http 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "75df369fd52c60635208a4d3e694777c099569b3dcf4844df8f652dc004644ab"
"checksum httparse 1.2.4 (registry+https://github.com/rust-lang/crates.io-index)" = "c2f407128745b78abc95c0ffbe4e5d37427fdc0d45470710cfef8c44522a2e37"


@@ -9,3 +9,4 @@ members = [
[patch.crates-io]
prost-derive = { git = "https://github.com/danburkert/prost" }


@@ -19,7 +19,7 @@ bytes = "0.4"
domain = "0.2.3"
env_logger = { version = "0.5", default-features = false }
futures = "0.1"
-h2 = "0.1"
+h2 = "0.1.5"
http = "0.1"
httparse = "1.2"
hyper = { version = "0.11.22", default-features = false, features = ["compat"] }

proxy/src/drain.rs (new file, 267 lines)

@@ -0,0 +1,267 @@
use std::mem;

use futures::{Async, Future, Poll, Stream};
use futures::future::Shared;
use futures::sync::{mpsc, oneshot};

/// Creates a drain channel.
///
/// The `Signal` is used to start a drain, and the `Watch` will be notified
/// when a drain is signaled.
pub fn channel() -> (Signal, Watch) {
    let (tx, rx) = oneshot::channel();
    let (drained_tx, drained_rx) = mpsc::channel(0);
    (
        Signal {
            drained_rx,
            tx,
        },
        Watch {
            drained_tx,
            rx: rx.shared(),
        },
    )
}

/// Send a drain command to all watchers.
///
/// When a drain is started, this returns a `Drained` future which resolves
/// when all `Watch`ers have been dropped.
#[derive(Debug)]
pub struct Signal {
    drained_rx: mpsc::Receiver<Never>,
    tx: oneshot::Sender<()>,
}

/// Watch for a drain command.
///
/// This wraps another future and callback to be called when drain is triggered.
#[derive(Clone, Debug)]
pub struct Watch {
    drained_tx: mpsc::Sender<Never>,
    rx: Shared<oneshot::Receiver<()>>,
}

/// The wrapped watching `Future`.
#[derive(Debug)]
pub struct Watching<A, F> {
    future: A,
    state: State<F>,
    watch: Watch,
}

#[derive(Debug)]
enum State<F> {
    Watch(F),
    Draining,
}

//TODO: in Rust 1.26, replace this with `!`.
#[derive(Debug)]
enum Never {}

/// A future that resolves when all `Watch`ers have been dropped (drained).
pub struct Drained {
    drained_rx: mpsc::Receiver<Never>,
}

// ===== impl Signal =====

impl Signal {
    /// Start the draining process.
    ///
    /// A signal is sent to all futures watching for the signal. A new future
    /// is returned from this method that resolves when all watchers have
    /// completed.
    pub fn drain(self) -> Drained {
        let _ = self.tx.send(());
        Drained {
            drained_rx: self.drained_rx,
        }
    }
}

// ===== impl Watch =====

impl Watch {
    /// Wrap a future and a callback that is triggered when drain is received.
    ///
    /// The callback receives a mutable reference to the original future, and
    /// should be used to trigger any shutdown process for it.
    pub fn watch<A, F>(self, future: A, on_drain: F) -> Watching<A, F>
    where
        A: Future,
        F: FnOnce(&mut A),
    {
        Watching {
            future,
            state: State::Watch(on_drain),
            watch: self,
        }
    }
}

// ===== impl Watching =====

impl<A, F> Future for Watching<A, F>
where
    A: Future,
    F: FnOnce(&mut A),
{
    type Item = A::Item;
    type Error = A::Error;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        loop {
            match mem::replace(&mut self.state, State::Draining) {
                State::Watch(on_drain) => {
                    match self.watch.rx.poll() {
                        Ok(Async::Ready(_)) | Err(_) => {
                            // Drain has been triggered!
                            on_drain(&mut self.future);
                        },
                        Ok(Async::NotReady) => {
                            self.state = State::Watch(on_drain);
                            return self.future.poll();
                        }
                    }
                },
                State::Draining => {
                    return self.future.poll();
                },
            }
        }
    }
}

// ===== impl Drained =====

impl Future for Drained {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        match try_ready!(self.drained_rx.poll()) {
            Some(never) => match never {},
            None => Ok(Async::Ready(())),
        }
    }
}

#[cfg(test)]
mod tests {
    use futures::{future, Async, Future, Poll};
    use super::*;

    struct TestMe {
        draining: bool,
        finished: bool,
        poll_cnt: usize,
    }

    impl Future for TestMe {
        type Item = ();
        type Error = ();

        fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
            self.poll_cnt += 1;
            if self.finished {
                Ok(Async::Ready(()))
            } else {
                Ok(Async::NotReady)
            }
        }
    }

    #[test]
    fn watch() {
        future::lazy(|| {
            let (tx, rx) = channel();
            let fut = TestMe {
                draining: false,
                finished: false,
                poll_cnt: 0,
            };

            let mut watch = rx.watch(fut, |fut| {
                fut.draining = true;
            });

            assert_eq!(watch.future.poll_cnt, 0);

            // First poll should poll the inner future
            assert!(watch.poll().unwrap().is_not_ready());
            assert_eq!(watch.future.poll_cnt, 1);

            // Second poll should poll the inner future again
            assert!(watch.poll().unwrap().is_not_ready());
            assert_eq!(watch.future.poll_cnt, 2);

            let mut draining = tx.drain();
            // Drain signaled, but needs another poll to be noticed.
            assert!(!watch.future.draining);
            assert_eq!(watch.future.poll_cnt, 2);

            // Now, poll after drain has been signaled.
            assert!(watch.poll().unwrap().is_not_ready());
            assert_eq!(watch.future.poll_cnt, 3);
            assert!(watch.future.draining);

            // Draining is not ready until watcher completes
            assert!(draining.poll().unwrap().is_not_ready());

            // Finishing up the watch future
            watch.future.finished = true;
            assert!(watch.poll().unwrap().is_ready());
            assert_eq!(watch.future.poll_cnt, 4);
            drop(watch);

            assert!(draining.poll().unwrap().is_ready());

            Ok::<_, ()>(())
        }).wait().unwrap();
    }

    #[test]
    fn watch_clones() {
        future::lazy(|| {
            let (tx, rx) = channel();

            let fut1 = TestMe {
                draining: false,
                finished: false,
                poll_cnt: 0,
            };
            let fut2 = TestMe {
                draining: false,
                finished: false,
                poll_cnt: 0,
            };

            let watch1 = rx.clone().watch(fut1, |fut| {
                fut.draining = true;
            });
            let watch2 = rx.watch(fut2, |fut| {
                fut.draining = true;
            });

            let mut draining = tx.drain();

            // Still 2 outstanding watchers
            assert!(draining.poll().unwrap().is_not_ready());

            // drop 1 for whatever reason
            drop(watch1);

            // Still not ready, 1 other watcher still pending
            assert!(draining.poll().unwrap().is_not_ready());

            drop(watch2);

            // Now all watchers are gone, draining is complete
            assert!(draining.poll().unwrap().is_ready());

            Ok::<_, ()>(())
        }).wait().unwrap();
    }
}


@@ -65,6 +65,7 @@ mod connection;
pub mod control;
mod ctx;
mod dns;
+mod drain;
mod inbound;
mod logging;
mod map_err;
@@ -207,6 +208,7 @@ where
let (control, control_bg) = control::new(dns_config.clone(), config.pod_namespace.clone());
let executor = core.handle();
+let (drain_tx, drain_rx) = drain::channel();
let bind = Bind::new(executor.clone()).with_sensors(sensors.clone());
@@ -228,6 +230,7 @@ where
ctx,
sensors.clone(),
get_original_dst.clone(),
+drain_rx.clone(),
&executor,
);
::logging::context_future("inbound", fut)
@@ -248,6 +251,7 @@ where
ctx,
sensors,
get_original_dst,
+drain_rx,
&executor,
);
::logging::context_future("outbound", fut)
@@ -308,7 +312,12 @@ where
.map_err(|err| error!("main error: {:?}", err));
core.handle().spawn(fut);
+let shutdown_signal = shutdown_signal.and_then(move |()| {
+debug!("shutdown signaled");
+drain_tx.drain()
+});
core.run(shutdown_signal).expect("executor");
+debug!("shutdown complete");
}
}
@@ -320,6 +329,7 @@ fn serve<R, B, E, F, G>(
proxy_ctx: Arc<ctx::Proxy>,
sensors: telemetry::Sensors,
get_orig_dst: G,
+drain_rx: drain::Watch,
executor: &Handle,
) -> Box<Future<Item = (), Error = io::Error> + 'static>
where
@@ -368,17 +378,55 @@ where
stack,
tcp_connect_timeout,
disable_protocol_detection_ports,
+drain_rx.clone(),
executor.clone(),
);
-bound_port.listen_and_fold(
+let accept = bound_port.listen_and_fold(
executor,
(),
move |(), (connection, remote_addr)| {
server.serve(connection, remote_addr);
Ok(())
},
-)
+);
+let accept_until = Cancelable {
+future: accept,
+canceled: false,
+};
+// As soon as we get a shutdown signal, the listener
+// is canceled immediately.
+Box::new(drain_rx.watch(accept_until, |accept| {
+accept.canceled = true;
+}))
+}
+/// Can cancel a future by setting a flag.
+///
+/// Used to 'watch' the accept futures, and close the listeners
+/// as soon as the shutdown signal starts.
+struct Cancelable<F> {
+future: F,
+canceled: bool,
+}
+impl<F> Future for Cancelable<F>
+where
+F: Future<Item=()>,
+{
+type Item = ();
+type Error = F::Error;
+fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+if self.canceled {
+Ok(().into())
+} else {
+self.future.poll()
+}
+}
}
fn serve_control<N, B>(


@@ -15,6 +15,7 @@ use conduit_proxy_controller_grpc::common;
use connection::Connection;
use ctx::Proxy as ProxyCtx;
use ctx::transport::{Server as ServerCtx};
+use drain;
use telemetry::Sensors;
use transport::GetOriginalDst;
use super::glue::{HttpBody, HttpBodyNewSvc, HyperServerSvc};
@@ -26,12 +27,14 @@ use super::tcp;
/// This type can `serve` new connections, determine what protocol
/// the connection is speaking, and route it to the corresponding
/// service.
-pub struct Server<S: NewService, B: tower_h2::Body, G>
+pub struct Server<S, B, G>
where
S: NewService<Request=http::Request<HttpBody>>,
S::Future: 'static,
+B: tower_h2::Body,
{
disable_protocol_detection_ports: IndexSet<u16>,
+drain_signal: drain::Watch,
executor: Handle,
get_orig_dst: G,
h1: hyper::server::Http,
@@ -51,6 +54,7 @@ where
> + Clone + 'static,
S::Future: 'static,
S::Error: fmt::Debug,
+S::InitError: fmt::Debug,
B: tower_h2::Body + 'static,
G: GetOriginalDst,
{
@@ -63,12 +67,14 @@ where
stack: S,
tcp_connect_timeout: Duration,
disable_protocol_detection_ports: IndexSet<u16>,
+drain_signal: drain::Watch,
executor: Handle,
) -> Self {
let recv_body_svc = HttpBodyNewSvc::new(stack.clone());
let tcp = tcp::Proxy::new(tcp_connect_timeout, sensors.clone(), &executor);
Server {
disable_protocol_detection_ports,
+drain_signal,
executor: executor.clone(),
get_orig_dst,
h1: hyper::server::Http::new(),
@@ -108,6 +114,7 @@ where
let fut = tcp_serve(
&self.tcp,
connection,
+self.drain_signal.clone(),
&self.sensors,
opened_at,
&self.proxy_ctx,
@@ -127,6 +134,7 @@ where
let h2 = self.h2.clone();
let tcp = self.tcp.clone();
let new_service = self.new_service.clone();
+let drain_signal = self.drain_signal.clone();
let fut = connection
.peek_future(sniff)
.map_err(|_| ())
@@ -151,9 +159,12 @@ where
.map_err(|_| ())
.and_then(move |s| {
let svc = HyperServerSvc::new(s, srv_ctx);
-h1.serve_connection(io, svc)
+drain_signal
+.watch(h1.serve_connection(io, svc), |conn| {
+conn.disable_keep_alive();
+})
.map(|_| ())
-.map_err(|_| ())
+.map_err(|e| trace!("http1 server error: {:?}", e))
}))
},
Protocol::Http2 => {
@@ -162,7 +173,14 @@ where
let set_ctx = move |request: &mut http::Request<()>| {
request.extensions_mut().insert(srv_ctx.clone());
};
-Box::new(h2.serve_modified(io, set_ctx).map_err(|_| ()))
+let fut = drain_signal
+.watch(h2.serve_modified(io, set_ctx), |conn| {
+conn.graceful_shutdown();
+})
+.map_err(|e| trace!("h2 server error: {:?}", e));
+Box::new(fut)
}
}
} else {
@@ -170,6 +188,7 @@ where
tcp_serve(
&tcp,
connection,
+drain_signal,
&sensors,
opened_at,
&proxy_ctx,
@@ -195,6 +214,7 @@ struct OrigDst<'a>(&'a Option<SocketAddr>);
fn tcp_serve(
tcp: &tcp::Proxy,
connection: Connection,
+drain_signal: drain::Watch,
sensors: &Sensors,
opened_at: Instant,
proxy_ctx: &Arc<ProxyCtx>,
@@ -213,5 +233,10 @@ fn tcp_serve(
// record telemetry
let tcp_in = sensors.accept(connection, opened_at, &srv_ctx);
-tcp.serve(tcp_in, srv_ctx)
+let fut = tcp.serve(tcp_in, srv_ctx);
+// There's nothing to do when drain is signaled, we just have to hope
+// the sockets finish soon. However, the drain signal still needs to
+// 'watch' the TCP future so that the process doesn't close early.
+Box::new(drain_signal.watch(fut, |_| ()))
}


@@ -201,7 +201,17 @@ fn outbound_updates_newer_services() {
// the HTTP2 service starts watching first, receiving an addr
// from the controller
let client1 = client::http2(proxy.outbound, "disco.test.svc.cluster.local");
-client1.get("/h2"); // 500, ignore
+// This HTTP2 client tries to talk to our HTTP1 server, and the server
+// will return an error (see above TODO).
+//
+// The reason to do this request at all is because the proxy will create
+// an H2 service mapping when it sees an H2 request, and we want to test
+// that when it sees H1 and tries to create a new mapping, the existing
+// known Discovery information is shared with it.
+let res = client1.request(&mut client1.request_builder("/h1"));
+assert_eq!(res.status(), 500);
// a new HTTP1 service needs to be build now, while the HTTP2
// service already exists, so make sure previously sent addrs

proxy/tests/shutdown.rs (new file, 154 lines)

@@ -0,0 +1,154 @@
mod support;
use self::support::*;

#[test]
fn h2_goaways_connections() {
    let _ = env_logger::try_init();

    let (shdn, rx) = shutdown_signal();

    let srv = server::http2().route("/", "hello").run();
    let ctrl = controller::new().run();
    let proxy = proxy::new()
        .controller(ctrl)
        .inbound(srv)
        .shutdown_signal(rx)
        .run();
    let client = client::http2(proxy.inbound, "shutdown.test.svc.cluster.local");

    assert_eq!(client.get("/"), "hello");

    shdn.signal();

    client.wait_for_closed();
}

#[test]
fn h2_exercise_goaways_connections() {
    let _ = env_logger::try_init();

    const RESPONSE_SIZE: usize = 1024 * 16;
    const NUM_REQUESTS: usize = 50;

    let (shdn, rx) = shutdown_signal();

    let body = Bytes::from(vec![b'1'; RESPONSE_SIZE]);
    let srv = server::http2()
        .route_fn("/", move |_req| {
            Response::builder()
                .body(body.clone())
                .unwrap()
        })
        .run();
    let ctrl = controller::new().run();
    let proxy = proxy::new()
        .controller(ctrl)
        .inbound(srv)
        .shutdown_signal(rx)
        .run();
    let client = client::http2(proxy.inbound, "shutdown.test.svc.cluster.local");

    let reqs = (0..NUM_REQUESTS)
        .into_iter()
        .map(|_| {
            client.request_async(
                client.request_builder("/").method("GET")
            )
        })
        .collect::<Vec<_>>();

    // Wait to get all responses (but not bodies)
    let resps = future::join_all(reqs)
        .wait()
        .expect("reqs");

    // Trigger a shutdown while bodies are still in progress.
    shdn.signal();

    let bodies = resps
        .into_iter()
        .map(|resp| {
            resp
                .into_body()
                .concat2()
                // Make sure the bodies weren't cut off
                .map(|buf| assert_eq!(buf.len(), RESPONSE_SIZE))
        })
        .collect::<Vec<_>>();

    // See that the proxy gives us all the bodies.
    future::join_all(bodies)
        .wait()
        .expect("bodies");

    client.wait_for_closed();
}

#[test]
fn http1_closes_idle_connections() {
    use std::cell::RefCell;

    let _ = env_logger::try_init();

    let (shdn, rx) = shutdown_signal();

    const RESPONSE_SIZE: usize = 1024 * 16;
    let body = Bytes::from(vec![b'1'; RESPONSE_SIZE]);

    let shdn = RefCell::new(Some(shdn));
    let srv = server::http1()
        .route_fn("/", move |_req| {
            // Trigger a shutdown signal while the request is made
            // but a response isn't returned yet.
            shdn.borrow_mut()
                .take()
                .expect("only 1 request")
                .signal();
            Response::builder()
                .body(body.clone())
                .unwrap()
        })
        .run();
    let ctrl = controller::new().run();
    let proxy = proxy::new()
        .controller(ctrl)
        .inbound(srv)
        .shutdown_signal(rx)
        .run();
    let client = client::http1(proxy.inbound, "shutdown.test.svc.cluster.local");

    let res_body = client.get("/");
    assert_eq!(res_body.len(), RESPONSE_SIZE);

    client.wait_for_closed();
}

#[test]
fn tcp_waits_for_proxies_to_close() {
    let _ = env_logger::try_init();

    let (shdn, rx) = shutdown_signal();
    let msg1 = "custom tcp hello";
    let msg2 = "custom tcp bye";

    let srv = server::tcp()
        .accept(move |read| {
            assert_eq!(read, msg1.as_bytes());
            msg2
        })
        .run();
    let ctrl = controller::new().run();
    let proxy = proxy::new()
        .controller(ctrl)
        .inbound(srv)
        .shutdown_signal(rx)
        .run();
    let client = client::tcp(proxy.inbound);

    let tcp_client = client.connect();

    shdn.signal();

    tcp_client.write(msg1);
    assert_eq!(tcp_client.read(), msg2.as_bytes());
}


@@ -1,7 +1,11 @@
use support::*;
+use std::cell::RefCell;
+use std::io;
use self::futures::sync::{mpsc, oneshot};
use self::tokio_core::net::TcpStream;
+use self::tokio_io::{AsyncRead, AsyncWrite};
type Request = http::Request<()>;
type Response = http::Response<BodyStream>;
@@ -18,7 +22,7 @@ pub fn http1<T: Into<String>>(addr: SocketAddr, auth: T) -> Client {
})
}
-// This sends `GET http://foo.com/ HTTP/1.1` instead of just `GET / HTTP/1.1`.
+/// This sends `GET http://foo.com/ HTTP/1.1` instead of just `GET / HTTP/1.1`.
pub fn http1_absolute_uris<T: Into<String>>(addr: SocketAddr, auth: T) -> Client {
Client::new(addr, auth.into(), Run::Http1 {
absolute_uris: true,
@@ -35,6 +39,9 @@ pub fn tcp(addr: SocketAddr) -> tcp::TcpClient {
pub struct Client {
authority: String,
+/// This is a future that completes when the associated connection for
+/// this Client has been dropped.
+running: Running,
tx: Sender,
version: http::Version,
}
@@ -45,9 +52,11 @@ impl Client {
Run::Http1 { .. } => http::Version::HTTP_11,
Run::Http2 => http::Version::HTTP_2,
};
+let (tx, running) = run(addr, r);
Client {
authority,
-tx: run(addr, r),
+running,
+tx,
version: v,
}
}
@@ -55,20 +64,30 @@ impl Client {
pub fn get(&self, path: &str) -> String {
let mut req = self.request_builder(path);
let res = self.request(req.method("GET"));
+assert_eq!(
+res.status(),
+StatusCode::OK,
+"client.get({:?}) expects 200 OK, got \"{}\"",
+path,
+res.status(),
+);
let stream = res.into_parts().1;
stream.concat2()
.map(|body| ::std::str::from_utf8(&body).unwrap().to_string())
.wait()
-.unwrap()
+.expect("get() wait body")
}
+pub fn request_async(&self, builder: &mut http::request::Builder) -> Box<Future<Item=Response, Error=String> + Send> {
+let (tx, rx) = oneshot::channel();
+let _ = self.tx.unbounded_send((builder.body(()).unwrap(), tx));
+Box::new(rx.then(|oneshot_result| oneshot_result.expect("request canceled")))
+}
pub fn request(&self, builder: &mut http::request::Builder) -> Response {
-let (tx, rx) = oneshot::channel();
+self.request_async(builder)
-let _ = self.tx.unbounded_send((builder.body(()).unwrap(), tx));
-rx.map_err(|_| panic!("client request dropped"))
.wait()
-.map(|result| result.unwrap())
-.unwrap()
+.expect("response")
}
pub fn request_builder(&self, path: &str) -> http::request::Builder {
@@ -77,6 +96,12 @@ impl Client {
.version(self.version);
b
}
+pub fn wait_for_closed(self) {
+self.running
+.wait()
+.expect("wait_for_closed");
+}
}
enum Run {
@@ -86,14 +111,19 @@ enum Run {
Http2,
}
-fn run(addr: SocketAddr, version: Run) -> Sender {
+fn run(addr: SocketAddr, version: Run) -> (Sender, Running) {
-let (tx, rx) = mpsc::unbounded::<(Request, oneshot::Sender<Result<Response, String>>)>();
+let (tx, mut rx) = mpsc::unbounded::<(Request, oneshot::Sender<Result<Response, String>>)>();
+let (running_tx, running_rx) = running();
::std::thread::Builder::new().name("support client".into()).spawn(move || {
-let mut core = Core::new().unwrap();
+let mut core = Core::new().expect("client core new");
let reactor = core.handle();
-let conn = Conn(addr, reactor.clone());
+let conn = Conn {
+addr,
+handle: reactor.clone(),
+running: RefCell::new(Some(running_tx)),
+};
let work: Box<Future<Item=(), Error=()>> = match version {
Run::Http1 { absolute_uris } => {
@@ -127,13 +157,13 @@ fn run(addr: SocketAddr, version: Run) -> Sender {
.map_err(|e| println!("client error: {:?}", e)))
},
Run::Http2 => {
-let h2 = tower_h2::client::Connect::<Conn, Handle, ()>::new(
+let connect = tower_h2::client::Connect::<Conn, Handle, ()>::new(
conn,
Default::default(),
reactor.clone(),
);
-Box::new(h2.new_service()
+Box::new(connect.new_service()
.map_err(move |err| println!("connect error ({:?}): {:?}", addr, err))
.and_then(move |mut h2| {
rx.for_each(move |(req, cb)| {
@@ -157,34 +187,86 @@ fn run(addr: SocketAddr, version: Run) -> Sender {
}
};
-core.run(work).unwrap();
+core.run(work).expect("support client core run");
-}).unwrap();
+}).expect("support client thread spawn");
-tx
+(tx, running_rx)
}
-struct Conn(SocketAddr, Handle);
+/// The "connector". Clones `running` into new connections, so we can signal
+/// when all connections are finally closed.
+struct Conn {
+addr: SocketAddr,
+handle: Handle,
+/// When this Sender drops, that should mean the connection is closed.
+running: RefCell<Option<oneshot::Sender<()>>>,
+}
-impl Connect for Conn {
+impl Conn {
-type Connected = TcpStream;
+fn connect_(&self) -> Box<Future<Item = RunningIo, Error = ::std::io::Error>> {
-type Error = ::std::io::Error;
+let running = self.running
-type Future = Box<Future<Item = TcpStream, Error = ::std::io::Error>>;
+.borrow_mut()
+.take()
-fn connect(&self) -> Self::Future {
+.expect("connected more than once");
-let c = TcpStream::connect(&self.0, &self.1)
+let c = TcpStream::connect(&self.addr, &self.handle)
-.and_then(|tcp| tcp.set_nodelay(true).map(move |_| tcp));
+.and_then(|tcp| tcp.set_nodelay(true).map(move |_| tcp))
+.map(move |tcp| RunningIo {
+inner: tcp,
+running: running,
+});
Box::new(c)
}
}
+impl Connect for Conn {
+type Connected = RunningIo;
+type Error = ::std::io::Error;
+type Future = Box<Future<Item = Self::Connected, Error = ::std::io::Error>>;
+fn connect(&self) -> Self::Future {
+self.connect_()
+}
+}
impl hyper::client::Service for Conn {
type Request = hyper::Uri;
-type Response = TcpStream;
+type Response = RunningIo;
-type Future = Box<Future<Item = TcpStream, Error = ::std::io::Error>>;
+type Future = Box<Future<Item = Self::Response, Error = ::std::io::Error>>;
type Error = ::std::io::Error;
fn call(&self, _: hyper::Uri) -> <Self as hyper::client::Service>::Future {
-let c = TcpStream::connect(&self.0, &self.1)
+self.connect_()
-.and_then(|tcp| tcp.set_nodelay(true).map(move |_| tcp));
-Box::new(c)
}
}
+/// A wrapper around a TcpStream, allowing us to signal when the connection
+/// is dropped.
+struct RunningIo {
+inner: TcpStream,
+/// When this drops, the related Receiver is notified that the connection
+/// is closed.
+running: oneshot::Sender<()>,
+}
+impl io::Read for RunningIo {
+fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+self.inner.read(buf)
+}
+}
+impl io::Write for RunningIo {
+fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+self.inner.write(buf)
+}
+fn flush(&mut self) -> io::Result<()> {
+self.inner.flush()
+}
+}
+impl AsyncRead for RunningIo {}
+impl AsyncWrite for RunningIo {
+fn shutdown(&mut self) -> Poll<(), io::Error> {
+AsyncWrite::shutdown(&mut self.inner)
+}
+}


@@ -24,7 +24,6 @@ pub struct Controller {
reports: Option<mpsc::UnboundedSender<pb::telemetry::ReportRequest>>,
}
-#[derive(Debug)]
pub struct Listening {
pub addr: SocketAddr,
shutdown: Shutdown,


@@ -14,9 +14,16 @@ extern crate tokio_core;
pub extern crate tokio_io;
extern crate tower;
extern crate tower_h2;
+extern crate log;
pub extern crate env_logger;
-use self::bytes::{BigEndian, Bytes, BytesMut};
+use std::net::SocketAddr;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+pub use std::time::Duration;
+use self::bytes::{BigEndian, BytesMut};
+pub use self::bytes::Bytes;
pub use self::conduit_proxy::*;
pub use self::futures::*;
use self::futures::sync::oneshot;
@@ -27,8 +34,6 @@ use self::tokio_core::net::{TcpListener, TcpStream};
use self::tokio_core::reactor::{Core, Handle};
use self::tower::{NewService, Service};
use self::tower_h2::{Body, RecvBody};
-use std::net::SocketAddr;
-pub use std::time::Duration;
/// Environment variable for overriding the test patience.
pub const ENV_TEST_PATIENCE_MS: &'static str = "RUST_TEST_PATIENCE_MS";
@@ -103,18 +108,31 @@ pub mod proxy;
pub mod server;
mod tcp;
-pub type Shutdown = oneshot::Sender<()>;
+pub fn shutdown_signal() -> (Shutdown, ShutdownRx) {
-pub type ShutdownRx = future::Then<
-oneshot::Receiver<()>,
-Result<(), ()>,
-fn(Result<(), oneshot::Canceled>) -> Result<(), ()>,
->;
-pub fn shutdown_signal() -> (oneshot::Sender<()>, ShutdownRx) {
let (tx, rx) = oneshot::channel();
-(tx, rx.then(|_| { Ok(()) } as _))
+(Shutdown { tx }, Box::new(rx.then(|_| Ok(()))))
}
+pub struct Shutdown {
+tx: oneshot::Sender<()>,
+}
+impl Shutdown {
+pub fn signal(self) {
+// a drop is enough
+}
+}
+pub type ShutdownRx = Box<Future<Item=(), Error=()> + Send>;
+/// A channel used to signal when a Client's related connection is running or closed.
+pub fn running() -> (oneshot::Sender<()>, Running) {
+let (tx, rx) = oneshot::channel();
+let rx = Box::new(rx.then(|_| Ok::<(), ()>(())));
+(tx, rx)
+}
+pub type Running = Box<Future<Item=(), Error=()> + Send>;
struct RecvBodyStream(tower_h2::RecvBody);


@@ -8,7 +8,6 @@ pub fn new() -> Proxy {
Proxy::new()
}
-#[derive(Debug)]
pub struct Proxy {
controller: Option<controller::Listening>,
inbound: Option<server::Listening>,
@@ -17,9 +16,10 @@ pub struct Proxy {
metrics_flush_interval: Option<Duration>,
inbound_disable_ports_protocol_detection: Option<Vec<u16>>,
outbound_disable_ports_protocol_detection: Option<Vec<u16>>,
+shutdown_signal: Option<Box<Future<Item=(), Error=()> + Send>>,
}
-#[derive(Debug)]
pub struct Listening {
pub control: SocketAddr,
pub inbound: SocketAddr,
@@ -42,6 +42,7 @@ impl Proxy {
metrics_flush_interval: None,
inbound_disable_ports_protocol_detection: None,
outbound_disable_ports_protocol_detection: None,
+shutdown_signal: None,
}
}
@@ -75,6 +76,18 @@ impl Proxy {
self
}
+pub fn shutdown_signal<F>(mut self, sig: F) -> Self
+where
+F: Future + Send + 'static,
+{
+// It doesn't matter what kind of future you give us,
+// we'll just wrap it up in a box and trigger when
+// it triggers. The results are discarded.
+let fut = Box::new(sig.then(|_| Ok(())));
+self.shutdown_signal = Some(fut);
+self
+}
pub fn run(self) -> Listening {
self.run_with_test_env(config::TestEnv::new())
}
@@ -170,6 +183,10 @@ fn run(proxy: Proxy, mut env: config::TestEnv) -> Listening {
let (running_tx, running_rx) = oneshot::channel();
let (tx, mut rx) = shutdown_signal();
+if let Some(fut) = proxy.shutdown_signal {
+rx = Box::new(rx.select(fut).then(|_| Ok(())));
+}
::std::thread::Builder::new()
.name("support proxy".into())
.spawn(move || {


@@ -21,13 +21,11 @@ pub fn tcp() -> tcp::TcpServer {
tcp::server()
}
-#[derive(Debug)]
pub struct Server {
routes: HashMap<String, Route>,
version: Run,
}
-#[derive(Debug)]
pub struct Listening {
pub addr: SocketAddr,
pub(super) shutdown: Shutdown,
@@ -55,14 +53,28 @@ impl Server {
Server::new(Run::Http2)
}
+/// Return a string body as a 200 OK response, with the string as
+/// the response body.
pub fn route(mut self, path: &str, resp: &str) -> Self {
self.routes.insert(path.into(), Route::string(resp));
self
}
+/// Return a 200 OK response with no body when the path matches.
+pub fn route_empty_ok(self, path: &str) -> Self {
+self.route_fn(path, |_| {
+Response::builder()
+.header("content-length", "0")
+.body(Default::default())
+.unwrap()
+})
+}
+/// Call a closure when the request matches, returning a response
+/// to send back.
pub fn route_fn<F>(mut self, path: &str, cb: F) -> Self
where
-F: Fn(Request<()>) -> Response<String> + Send + 'static,
+F: Fn(Request<()>) -> Response<Bytes> + Send + 'static,
{
self.routes.insert(path.into(), Route(Box::new(cb)));
self
@@ -74,7 +86,7 @@ impl Server {
resp: &str,
latency: Duration
) -> Self {
-let resp = resp.to_owned();
+let resp = Bytes::from(resp);
let route = Route(Box::new(move |_| {
thread::sleep(latency);
http::Response::builder()
@@ -202,11 +214,11 @@ impl Body for RspBody {
}
}
-struct Route(Box<Fn(Request<()>) -> Response<String> + Send>);
+struct Route(Box<Fn(Request<()>) -> Response<Bytes> + Send>);
impl Route {
fn string(body: &str) -> Route {
-let body = body.to_owned();
+let body = Bytes::from(body);
Route(Box::new(move |_| {
http::Response::builder()
.status(200)
@@ -239,7 +251,7 @@ impl Service for Svc {
let rsp = match self.0.get(req.uri().path()) {
Some(route) => {
(route.0)(req.map(|_| ()))
-.map(|s| RspBody::new(s.as_bytes().into()))
+.map(|s| RspBody::new(s))
}
None => {
println!("server 404: {:?}", req.uri().path());


@@ -655,7 +655,9 @@ fn http1_response_end_of_file() {
fn http1_one_connection_per_host() {
let _ = env_logger::try_init();
-let srv = server::http1().route("/", "hello").run();
+let srv = server::http1()
+.route_empty_ok("/")
+.run();
let ctrl = controller::new()
.run();
let proxy = proxy::new().controller(ctrl).inbound(srv).run();
@@ -714,7 +716,9 @@ fn http1_one_connection_per_host() {
fn http1_requests_without_host_have_unique_connections() {
let _ = env_logger::try_init();
-let srv = server::http1().route("/", "hello").run();
+let srv = server::http1()
+.route_empty_ok("/")
+.run();
let ctrl = controller::new()
.run();
let proxy = proxy::new().controller(ctrl).inbound(srv).run();