From 30ae471dda075a15c7126f6cd2f07c320bf08bcf Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Wed, 30 May 2018 13:41:59 -0700 Subject: [PATCH] proxy: Add rich logging contexts (#1037) While debugging proxy issues, I found it necessary to change how logging contexts are instrumented, especially for clients. This change moves away from using `Debug` types to in favor of `Display` types. Furthermore, the `logging` module now provides a uniform set of logging contexts to be used throughout the application. All clients, servers, and background tasks should now be instrumented so that their log messages contain predictable metadata. Some small improvements have been made to ensure that logging contexts are correct when a `Future` is dropped (which is important for some H2 uses, especially). --- proxy/src/bind.rs | 21 +- proxy/src/control/destination/background.rs | 7 +- proxy/src/control/mod.rs | 2 +- proxy/src/inbound.rs | 6 +- proxy/src/lib.rs | 106 ++++---- proxy/src/logging.rs | 275 +++++++++++++++++--- proxy/src/outbound.rs | 22 +- proxy/src/task.rs | 50 ++++ proxy/src/telemetry/control.rs | 40 +-- proxy/src/transparency/client.rs | 75 ++++-- proxy/src/transparency/server.rs | 27 +- proxy/src/transport/connect.rs | 15 +- 12 files changed, 497 insertions(+), 149 deletions(-) diff --git a/proxy/src/bind.rs b/proxy/src/bind.rs index 9924fbaa7..ad576c976 100644 --- a/proxy/src/bind.rs +++ b/proxy/src/bind.rs @@ -2,6 +2,7 @@ use std::error::Error; use std::fmt; use std::default::Default; use std::marker::PhantomData; +use std::net::SocketAddr; use std::sync::Arc; use std::sync::atomic::AtomicUsize; @@ -132,7 +133,11 @@ pub type HttpResponse = http::Response>; pub type HttpRequest = http::Request>; -pub type Client = transparency::Client, B>; +pub type Client = transparency::Client< + sensor::Connect, + ::logging::ClientExecutor<&'static str, SocketAddr>, + B, +>; #[derive(Copy, Clone, Debug)] pub enum BufferSpawnError { @@ -218,9 +223,12 @@ where &client_ctx, ); + let log = ::logging::Client::proxy(&self.ctx, addr) + .with_protocol(protocol.clone()); let client = transparency::Client::new( protocol, connect, + log.executor(), ); let sensors = self.sensors.http( @@ -376,11 +384,15 @@ where let ready = match self.binding { // A service is already bound, so poll its readiness. Binding::Bound(ref mut svc) | - Binding::BindsPerRequest { next: Some(ref mut svc) } => svc.poll_ready(), + Binding::BindsPerRequest { next: Some(ref mut svc) } => { + trace!("poll_ready: stack already bound"); + svc.poll_ready() + } // If no stack has been bound, bind it now so that its readiness can be // checked. Store it so it can be consumed to dispatch the next request. Binding::BindsPerRequest { ref mut next } => { + trace!("poll_ready: binding stack"); let mut svc = self.bind.bind_stack(&self.endpoint, &self.protocol); let ready = svc.poll_ready(); *next = Some(svc); @@ -400,12 +412,16 @@ where if !self.debounce_connect_error_log { self.debounce_connect_error_log = true; warn!("connect error to {:?}: {}", self.endpoint, err); + } else { + debug!("connect error to {:?}: {}", self.endpoint, err); } match self.binding { Binding::Bound(ref mut svc) => { + trace!("poll_ready: binding stack after error"); *svc = self.bind.bind_stack(&self.endpoint, &self.protocol); }, Binding::BindsPerRequest { ref mut next } => { + trace!("poll_ready: dropping bound stack after error"); next.take(); } } @@ -425,6 +441,7 @@ where // don't debounce on NotReady... Ok(Async::NotReady) => Ok(Async::NotReady), other => { + trace!("poll_ready: ready for business"); self.debounce_connect_error_log = false; other }, diff --git a/proxy/src/control/destination/background.rs b/proxy/src/control/destination/background.rs index 3e4689ad8..20f212bcf 100644 --- a/proxy/src/control/destination/background.rs +++ b/proxy/src/control/destination/background.rs @@ -32,7 +32,6 @@ use control::{ AddOrigin, Backoff, LogErrors }; use dns::{self, IpAddrListFuture}; -use task::LazyExecutor; use telemetry::metrics::DstLabels; use transport::{DnsNameAndPort, HostAndPort, LookupAddressAndConnect}; use timeout::Timeout; @@ -78,18 +77,18 @@ pub(super) fn task( { // Build up the Controller Client Stack let mut client = { - let ctx = ("controller-client", format!("{:?}", host_and_port)); let scheme = http::uri::Scheme::from_shared(Bytes::from_static(b"http")).unwrap(); let authority = http::uri::Authority::from(&host_and_port); let connect = Timeout::new( - LookupAddressAndConnect::new(host_and_port, dns_resolver.clone()), + LookupAddressAndConnect::new(host_and_port.clone(), dns_resolver.clone()), Duration::from_secs(3), ); + let log = ::logging::admin().client("control", host_and_port.clone()); let h2_client = tower_h2::client::Connect::new( connect, h2::client::Builder::default(), - ::logging::context_executor(ctx, LazyExecutor), + log.executor() ); let reconnect = Reconnect::new(h2_client); diff --git a/proxy/src/control/mod.rs b/proxy/src/control/mod.rs index 088d38064..6517a952f 100644 --- a/proxy/src/control/mod.rs +++ b/proxy/src/control/mod.rs @@ -47,7 +47,7 @@ impl Backoff { impl Service for Backoff where S: Service, - S::Error: ::std::fmt::Debug, + S::Error: fmt::Debug, { type Request = S::Request; type Response = S::Response; diff --git a/proxy/src/inbound.rs b/proxy/src/inbound.rs index 727ebfd18..140512e70 100644 --- a/proxy/src/inbound.rs +++ b/proxy/src/inbound.rs @@ -10,7 +10,6 @@ use conduit_proxy_router::Recognize; use bind; use ctx; -use task::LazyExecutor; type Bind = bind::Bind, B>; @@ -88,7 +87,10 @@ where let endpoint = (*addr).into(); let binding = self.bind.bind_service(&endpoint, proto); - Buffer::new(binding, &LazyExecutor) + + let log = ::logging::proxy().client("in", "local") + .with_remote(*addr); + Buffer::new(binding, &log.executor()) .map(|buffer| { InFlightLimit::new(buffer, MAX_IN_FLIGHT) }) diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index 7a44fe5e5..f3ae54803 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -219,7 +219,7 @@ where panic!("invalid DNS configuration: {:?}", e); }); - let (control, control_bg) = control::destination::new( + let (resolver, resolver_bg) = control::destination::new( dns_resolver.clone(), config.pod_namespace.clone(), control_host_and_port @@ -245,7 +245,6 @@ where config.inbound_router_max_idle_age, ); serve( - "inbound", inbound_listener, router, config.private_connect_timeout, @@ -264,12 +263,11 @@ where let ctx = ctx::Proxy::outbound(&process_ctx); let bind = bind.clone().with_ctx(ctx.clone()); let router = Router::new( - Outbound::new(bind, control, config.bind_timeout), + Outbound::new(bind, resolver, config.bind_timeout), config.outbound_router_capacity, config.outbound_router_max_idle_age, ); serve( - "outbound", outbound_listener, router, config.public_connect_timeout, @@ -283,35 +281,32 @@ where trace!("running"); - let (_tx, controller_shutdown_signal) = futures::sync::oneshot::channel::<()>(); + let (_tx, admin_shutdown_signal) = futures::sync::oneshot::channel::<()>(); { thread::Builder::new() - .name("controller-client".into()) + .name("admin".into()) .spawn(move || { use conduit_proxy_controller_grpc::tap::server::TapServer; let mut rt = current_thread::Runtime::new() - .expect("initialize controller-client thread runtime"); + .expect("initialize admin thread runtime"); - let new_service = TapServer::new(observe); + let tap = serve_tap(control_listener, TapServer::new(observe)); - let server = serve_control(control_listener, new_service); + let metrics_server = telemetry.serve_metrics(metrics_listener); - let metrics_server = telemetry - .serve_metrics(metrics_listener); - - let fut = control_bg.join4( - server.map_err(|_| {}), - telemetry, - metrics_server.map_err(|_| {}), - ).map(|_| {}); - let fut = ::logging::context_future("controller-client", fut); + let fut = ::logging::admin().bg("resolver").future(resolver_bg) + .join4( + ::logging::admin().bg("telemetry").future(telemetry), + tap.map_err(|_| {}), + metrics_server.map_err(|_| {}), + ).map(|_| {}); rt.spawn(Box::new(fut)); - let shutdown = controller_shutdown_signal.then(|_| Ok::<(), ()>(())); - rt.block_on(shutdown).expect("controller api"); - trace!("controller client shutdown finished"); + let shutdown = admin_shutdown_signal.then(|_| Ok::<(), ()>(())); + rt.block_on(shutdown).expect("admin"); + trace!("admin shutdown finished"); }) .expect("initialize controller api thread"); trace!("controller client thread spawned"); @@ -335,7 +330,6 @@ where } fn serve( - name: &'static str, bound_port: BoundPort, router: Router, tcp_connect_timeout: Duration, @@ -400,7 +394,7 @@ where let listen_addr = bound_port.local_addr(); let server = Server::new( listen_addr, - proxy_ctx, + proxy_ctx.clone(), sensors, get_orig_dst, stack, @@ -408,20 +402,22 @@ where disable_protocol_detection_ports, drain_rx.clone(), ); + let log = server.log().clone(); - - let accept = bound_port.listen_and_fold( - (), - move |(), (connection, remote_addr)| { - let s = server.serve(connection, remote_addr); - let s = ::logging::context_future((name, remote_addr), s); - let r = DefaultExecutor::current() - .spawn(Box::new(s)) - .map_err(task::Error::into_io); - future::result(r) - }, - ); - let accept = ::logging::context_future(name, accept); + let accept = { + let fut = bound_port.listen_and_fold( + (), + move |(), (connection, remote_addr)| { + let s = server.serve(connection, remote_addr); + // Logging context is configured by the server. + let r = DefaultExecutor::current() + .spawn(Box::new(s)) + .map_err(task::Error::into_io); + future::result(r) + }, + ); + log.future(fut) + }; let accept_until = Cancelable { future: accept, @@ -460,7 +456,7 @@ where } } -fn serve_control( +fn serve_tap( bound_port: BoundPort, new_service: N, ) -> impl Future + 'static @@ -475,28 +471,36 @@ where tower_h2::server::Connection< connection::Connection, N, - task::LazyExecutor, + ::logging::ServerExecutor, B, () >: Future, { + let log = logging::admin().server("tap", bound_port.local_addr()); + let h2_builder = h2::server::Builder::default(); let server = tower_h2::Server::new( new_service, h2_builder, - task::LazyExecutor + log.clone().executor(), ); - bound_port.listen_and_fold( - server, - move |server, (session, _)| { - let s = server.serve(session).map_err(|_| ()); - let s = ::logging::context_future("serve_control", s); - let r = executor::current_thread::TaskExecutor::current() - .spawn_local(Box::new(s)) - .map(move |_| server) - .map_err(task::Error::into_io); - future::result(r) - }, - ) + let fut = { + let log = log.clone(); + bound_port.listen_and_fold( + server, + move |server, (session, remote)| { + let log = log.clone().with_remote(remote); + let serve = server.serve(session).map_err(|_| ()); + + let r = executor::current_thread::TaskExecutor::current() + .spawn_local(Box::new(log.future(serve))) + .map(move |_| server) + .map_err(task::Error::into_io); + future::result(r) + }, + ) + }; + + log.future(fut) } diff --git a/proxy/src/logging.rs b/proxy/src/logging.rs index 073e52a44..586792f0a 100644 --- a/proxy/src/logging.rs +++ b/proxy/src/logging.rs @@ -2,6 +2,7 @@ use std::cell::RefCell; use std::env; use std::io::Write; use std::fmt; +use std::net::SocketAddr; use std::sync::Arc; use env_logger; @@ -12,7 +13,7 @@ use log::{Level}; const ENV_LOG: &str = "CONDUIT_PROXY_LOG"; thread_local! { - static CONTEXT: RefCell> = RefCell::new(Vec::new()); + static CONTEXT: RefCell> = RefCell::new(Vec::new()); } pub fn init() { @@ -28,10 +29,10 @@ pub fn init() { }; writeln!( fmt, - "{} {} {:?}{}", + "{} {}{} {}", level, - record.target(), Context(&ctxt.borrow()), + record.target(), record.args() ) }) @@ -40,43 +41,42 @@ pub fn init() { .init(); } -/// Execute a closure with a `Debug` item attached to allow log messages. +/// Execute a closure with a `Display` item attached to allow log messages. pub fn context(context: &T, mut closure: F) -> U where - T: ::std::fmt::Debug + 'static, + T: fmt::Display + 'static, F: FnMut() -> U, { let _guard = ContextGuard::new(context); closure() } -/// Wrap a `Future` with a `Debug` value that will be inserted into all logs +/// Wrap a `Future` with a `Display` value that will be inserted into all logs /// created by this Future. -pub fn context_future(context: T, future: F) -> ContextualFuture { +pub fn context_future(context: T, future: F) -> ContextualFuture { ContextualFuture { context, - future, + future: Some(future), } } -/// Wrap an `Executor` to spawn futures that have a reference to the `Debug` +/// Wrap `task::LazyExecutor` to spawn futures that have a reference to the `Display` /// value, inserting it into all logs created by this future. -pub fn context_executor(context: T, executor: E) -> ContextualExecutor { +pub fn context_executor(context: T) -> ContextualExecutor { ContextualExecutor { context: Arc::new(context), - executor, } } #[derive(Debug)] -pub struct ContextualFuture { +pub struct ContextualFuture { context: T, - future: F, + future: Option, } impl Future for ContextualFuture where - T: ::std::fmt::Debug + 'static, + T: fmt::Display + 'static, F: Future, { type Item = F::Item; @@ -84,39 +84,71 @@ where fn poll(&mut self) -> Poll { let ctxt = &self.context; - let fut = &mut self.future; + let fut = self.future.as_mut().expect("poll after drop"); context(ctxt, || fut.poll()) } } - -#[derive(Clone, Debug)] -pub struct ContextualExecutor { - context: Arc, - executor: E, +impl Drop for ContextualFuture +where + T: fmt::Display + 'static, + F: Future, +{ + fn drop(&mut self) { + if self.future.is_some() { + let ctxt = &self.context; + let fut = &mut self.future; + context(ctxt, || drop(fut.take())) + } + } } -impl Executor for ContextualExecutor +#[derive(Debug)] +pub struct ContextualExecutor { + context: Arc, +} + +impl ::tokio::executor::Executor for ContextualExecutor where - T: ::std::fmt::Debug + 'static, - E: Executor, F>>, - F: Future, + T: fmt::Display + 'static + Send + Sync, { - fn execute(&self, future: F) -> Result<(), ExecuteError> { + fn spawn( + &mut self, + future: Box + 'static + Send> + ) -> ::std::result::Result<(), ::tokio::executor::SpawnError> { let fut = context_future(self.context.clone(), future); - match self.executor.execute(fut) { + ::task::LazyExecutor.spawn(Box::new(fut)) + } +} + +impl Executor for ContextualExecutor +where + T: fmt::Display + 'static + Send + Sync, + F: Future + 'static + Send, +{ + fn execute(&self, future: F) -> ::std::result::Result<(), ExecuteError> { + let fut = context_future(self.context.clone(), future); + match ::task::LazyExecutor.execute(fut) { Ok(()) => Ok(()), Err(err) => { let kind = err.kind(); - let future = err.into_future(); - Err(ExecuteError::new(kind, future.future)) + let mut future = err.into_future(); + Err(ExecuteError::new(kind, future.future.take().expect("future"))) } } } } -struct Context<'a>(&'a [*const fmt::Debug]); +impl Clone for ContextualExecutor { + fn clone(&self) -> Self { + Self { + context: self.context.clone(), + } + } +} -impl<'a> fmt::Debug for Context<'a> { +struct Context<'a>(&'a [*const fmt::Display]); + +impl<'a> fmt::Display for Context<'a> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { if self.0.is_empty() { return Ok(()); @@ -125,8 +157,7 @@ impl<'a> fmt::Debug for Context<'a> { for item in self.0 { // See `fn context()` for comments about this unsafe. let item = unsafe { &**item }; - item.fmt(f)?; - f.write_str(", ")?; + write!(f, "{} ", item)?; } Ok(()) } @@ -136,17 +167,17 @@ impl<'a> fmt::Debug for Context<'a> { /// /// Specifically, this protects even if the passed function panics, /// as destructors are run while unwinding. -struct ContextGuard<'a>(&'a (fmt::Debug + 'static)); +struct ContextGuard<'a>(&'a (fmt::Display + 'static)); impl<'a> ContextGuard<'a> { - fn new(context: &'a (fmt::Debug + 'static)) -> Self { + fn new(context: &'a (fmt::Display + 'static)) -> Self { // This is a raw pointer because of lifetime conflicts that require // the thread local to have a static lifetime. // // We don't want to require a static lifetime, and in fact, // only use the reference within this closure, so converting // to a raw pointer is safe. - let raw = context as *const fmt::Debug; + let raw = context as *const fmt::Display; CONTEXT.with(|ctxt| { ctxt.borrow_mut().push(raw); }); @@ -161,3 +192,177 @@ impl<'a> Drop for ContextGuard<'a> { }); } } + +pub fn admin() -> Section { + Section::Admin +} + +pub fn proxy() -> Section { + Section::Proxy +} + +#[derive(Copy, Clone)] +pub enum Section { + Proxy, + Admin, +} + +/// A utility for logging actions taken on behalf of a server task. +#[derive(Clone)] +pub struct Server { + section: Section, + name: &'static str, + listen: SocketAddr, + remote: Option, +} + +/// A utility for logging actions taken on behalf of a client task. +#[derive(Clone)] +pub struct Client { + section: Section, + client: C, + dst: D, + protocol: Option<::bind::Protocol>, + remote: Option, +} + +/// A utility for logging actions taken on behalf of a background task. +#[derive(Clone)] +pub struct Bg { + section: Section, + name: &'static str, +} + +impl Section { + pub fn bg(&self, name: &'static str) -> Bg { + Bg { + section: *self, + name, + } + } + + pub fn server(&self, name: &'static str, listen: SocketAddr) -> Server { + Server { + section: *self, + name, + listen, + remote: None, + } + } + + pub fn client(&self, client: C, dst: D) -> Client { + Client { + section: *self, + client, + dst, + protocol: None, + remote: None, + } + } +} + +impl fmt::Display for Section { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + Section::Proxy => "proxy".fmt(f), + Section::Admin => "admin".fmt(f), + } + } +} + +pub type BgFuture = ContextualFuture; +pub type ClientExecutor = ContextualExecutor>; +pub type ServerExecutor = ContextualExecutor; +pub type ServerFuture = ContextualFuture; + +impl Server { + pub fn proxy(ctx: &::ctx::Proxy, listen: SocketAddr) -> Self { + let name = if ctx.is_inbound() { + "in" + } else { + "out" + }; + Section::Proxy.server(name, listen) + } + + pub fn with_remote(self, remote: SocketAddr) -> Self { + Self { + remote: Some(remote), + .. self + } + } + + pub fn executor(self) -> ServerExecutor { + context_executor(self) + } + + pub fn future(self, f: F) -> ServerFuture { + context_future(self, f) + } +} + +impl fmt::Display for Server { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}={{server={} listen={}", self.section, self.name, self.listen)?; + if let Some(remote) = self.remote { + write!(f, " remote={}", remote)?; + } + write!(f, "}}") + } +} + +impl Client<&'static str, D> { + pub fn proxy(ctx: &::ctx::Proxy, dst: D) -> Self { + let name = if ctx.is_inbound() { + "in" + } else { + "out" + }; + Section::Proxy.client(name, dst) + } +} + +impl Client { + pub fn with_protocol(self, p: ::bind::Protocol) -> Self { + Self { + protocol: Some(p), + .. self + } + } + + pub fn with_remote(self, remote: SocketAddr) -> Self { + Self { + remote: Some(remote), + .. self + } + } + + pub fn executor(self) -> ClientExecutor { + context_executor(self) + } +} + +impl fmt::Display for Client { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}={{client={} dst={}", self.section, self.client, self.dst)?; + if let Some(ref proto) = self.protocol { + write!(f, " proto={:?}", proto)?; + } + if let Some(remote) = self.remote { + write!(f, " remote={}", remote)?; + } + write!(f, "}}") + } +} + +impl Bg { + pub fn future(self, f: F) -> BgFuture { + context_future(self, f) + } +} + +impl fmt::Display for Bg { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}={{bg={}}}", self.section, self.name) + } +} diff --git a/proxy/src/outbound.rs b/proxy/src/outbound.rs index fb0aceab1..b2b69d8e4 100644 --- a/proxy/src/outbound.rs +++ b/proxy/src/outbound.rs @@ -16,7 +16,6 @@ use conduit_proxy_router::Recognize; use bind::{self, Bind, Protocol}; use control::destination::{self, Bind as BindTrait, Resolution}; use ctx; -use task::LazyExecutor; use timeout::Timeout; use transparency::h1; use transport::{DnsNameAndPort, Host, HostAndPort}; @@ -150,19 +149,19 @@ where let loaded = tower_balance::load::WithPendingRequests::new(resolve); - // We can't use `rand::thread_rng` here because the returned `Service` // needs to be `Send`, so instead, we use `LazyRng`, which calls // `rand::thread_rng()` when it is *used*. let balance = tower_balance::power_of_two_choices(loaded, LazyThreadRng); - let buffer = Buffer::new(balance, &LazyExecutor) + let log = ::logging::proxy().client("out", Dst(dest.clone())) + .with_protocol(protocol.clone()); + let buffer = Buffer::new(balance, &log.executor()) .map_err(|_| bind::BufferSpawnError::Outbound)?; let timeout = Timeout::new(buffer, self.bind_timeout); Ok(InFlightLimit::new(timeout, MAX_IN_FLIGHT)) - } } @@ -195,7 +194,7 @@ where // closing down when the connection is no longer usable. if let Some((addr, bind)) = opt.take() { let svc = bind.bind(&addr.into()) - .map_err(|_| BindError::External{ addr })?; + .map_err(|_| BindError::External { addr })?; Ok(Async::Ready(Change::Insert(addr, svc))) } else { Ok(Async::NotReady) @@ -232,3 +231,16 @@ impl error::Error for BindError { fn cause(&self) -> Option<&error::Error> { None } } + +struct Dst(Destination); + +impl fmt::Display for Dst { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self.0 { + Destination::Hostname(ref name) => { + write!(f, "{}:{}", name.host, name.port) + } + Destination::ImplicitOriginalDst(ref addr) => addr.fmt(f), + } + } +} diff --git a/proxy/src/task.rs b/proxy/src/task.rs index 71563552c..d6681f925 100644 --- a/proxy/src/task.rs +++ b/proxy/src/task.rs @@ -33,6 +33,9 @@ use std::{ #[derive(Copy, Clone, Debug, Default)] pub struct LazyExecutor; +#[derive(Copy, Clone, Debug, Default)] +pub struct BoxExecutor(E); + /// Indicates which Tokio `Runtime` should be used for the main proxy. /// /// This is either a `tokio::runtime::current_thread::Runtime`, or a @@ -104,6 +107,53 @@ where } } +// ===== impl BoxExecutor =====; + +impl BoxExecutor { + pub fn new(e: E) -> Self { + BoxExecutor(e) + } +} + +impl TokioExecutor for BoxExecutor { + fn spawn( + &mut self, + future: Box + 'static + Send> + ) -> Result<(), SpawnError> { + self.0.spawn(future) + } + + fn status(&self) -> Result<(), SpawnError> { + self.0.status() + } +} + +impl Executor for BoxExecutor +where + F: Future + 'static + Send, + E: TokioExecutor, + E: Executor + Send + 'static>>, +{ + fn execute(&self, future: F) -> Result<(), ExecuteError> { + // Check the status of the executor first, so that we can return the + // future in the `ExecuteError`. If we just called `spawn` and + // `map_err`ed the error into an `ExecuteError`, we'd have to move the + // future into the closure, but it was already moved into `spawn`. + if let Err(e) = self.0.status() { + if e.is_at_capacity() { + return Err(ExecuteError::new(ExecuteErrorKind::NoCapacity, future)); + } else if e.is_shutdown() { + return Err(ExecuteError::new(ExecuteErrorKind::Shutdown, future)); + } else { + panic!("unexpected `SpawnError`: {:?}", e); + } + }; + self.0.execute(Box::new(future)) + .expect("spawn() errored but status() was Ok"); + Ok(()) + } +} + // ===== impl MainRuntime ===== impl MainRuntime { diff --git a/proxy/src/telemetry/control.rs b/proxy/src/telemetry/control.rs index 4d2cbf9d9..e92564c6c 100644 --- a/proxy/src/telemetry/control.rs +++ b/proxy/src/telemetry/control.rs @@ -96,25 +96,33 @@ impl Control { -> impl Future { use hyper; + + let log = ::logging::admin().server("metrics", bound_port.local_addr()); + let service = self.metrics_service.clone(); - bound_port.listen_and_fold( - hyper::server::conn::Http::new(), - move |hyper, (conn, _)| { - let service = service.clone(); - let serve = hyper.serve_connection(conn, service) - .map(|_| {}) - .map_err(|e| { - error!("error serving prometheus metrics: {:?}", e); - }); - let serve = ::logging::context_future("serve_metrics", serve); + let fut = { + let log = log.clone(); + bound_port.listen_and_fold( + hyper::server::conn::Http::new(), + move |hyper, (conn, remote)| { + let service = service.clone(); + let serve = hyper.serve_connection(conn, service) + .map(|_| {}) + .map_err(|e| { + error!("error serving prometheus metrics: {:?}", e); + }); + let serve = log.clone().with_remote(remote).future(serve); - let r = TaskExecutor::current() - .spawn_local(Box::new(serve)) - .map(move |()| hyper) - .map_err(task::Error::into_io); + let r = TaskExecutor::current() + .spawn_local(Box::new(serve)) + .map(move |()| hyper) + .map_err(task::Error::into_io); - future::result(r) - }) + future::result(r) + }) + }; + + log.future(fut) } } diff --git a/proxy/src/transparency/client.rs b/proxy/src/transparency/client.rs index bef111798..8e72facfa 100644 --- a/proxy/src/transparency/client.rs +++ b/proxy/src/transparency/client.rs @@ -1,14 +1,15 @@ use bytes::IntoBuf; -use futures::{Async, Future, Poll}; +use futures::{future, Async, Future, Poll}; use h2; use http; use hyper; +use tokio::executor::Executor; use tokio_connect::Connect; use tower_service::{Service, NewService}; use tower_h2; use bind; -use task::LazyExecutor; +use task::BoxExecutor; use telemetry::sensor::http::RequestBody; use super::glue::{BodyStream, HttpBody, HyperConnect}; @@ -16,75 +17,91 @@ type HyperClient = hyper::Client, BodyStream>>; /// A `NewService` that can speak either HTTP/1 or HTTP/2. -pub struct Client +pub struct Client where B: tower_h2::Body + 'static, + C: Connect + 'static, + E: Executor + Clone, + E: future::Executor + Send + 'static>> + Send + Sync + 'static, { - inner: ClientInner, + inner: ClientInner, } -enum ClientInner +enum ClientInner where B: tower_h2::Body + 'static, + C: Connect + 'static, + E: Executor + Clone, + E: future::Executor + Send + 'static>> + Send + Sync + 'static, { Http1(HyperClient), - Http2(tower_h2::client::Connect>), + Http2(tower_h2::client::Connect, RequestBody>), } /// A `Future` returned from `Client::new_service()`. -pub struct ClientNewServiceFuture +pub struct ClientNewServiceFuture where B: tower_h2::Body + 'static, C: Connect + 'static, + E: Executor + Clone, + E: future::Executor + Send + 'static>> + Send + Sync + 'static, { - inner: ClientNewServiceFutureInner, + inner: ClientNewServiceFutureInner, } -enum ClientNewServiceFutureInner +enum ClientNewServiceFutureInner where B: tower_h2::Body + 'static, C: Connect + 'static, + E: Executor + Clone, + E: future::Executor + Send + 'static>> + Send + Sync + 'static, { Http1(Option>), - Http2(tower_h2::client::ConnectFuture>), + Http2(tower_h2::client::ConnectFuture, RequestBody>), } /// The `Service` yielded by `Client::new_service()`. -pub struct ClientService +pub struct ClientService where B: tower_h2::Body + 'static, C: Connect, + E: Executor + Clone, + E: future::Executor + Send + 'static>> + Send + Sync + 'static, { - inner: ClientServiceInner, + inner: ClientServiceInner, } -enum ClientServiceInner +enum ClientServiceInner where B: tower_h2::Body + 'static, - C: Connect + C: Connect, + E: Executor + Clone, + E: future::Executor + Send + 'static>> + Send + Sync + 'static, { Http1(HyperClient), Http2(tower_h2::client::Connection< ::Connected, - LazyExecutor, + BoxExecutor, RequestBody, >), } -impl Client +impl Client where C: Connect + Clone + Send + Sync + 'static, C::Future: Send + 'static, C::Connected: Send, + E: Executor + Clone, + E: future::Executor + Send + 'static>> + Send + Sync + 'static, B: tower_h2::Body + Send + 'static, ::Buf: Send + 'static, { /// Create a new `Client`, bound to a specific protocol (HTTP/1 or HTTP/2). - pub fn new(protocol: &bind::Protocol, connect: C) -> Self { + pub fn new(protocol: &bind::Protocol, connect: C, executor: E) -> Self { match *protocol { bind::Protocol::Http1 { was_absolute_form, .. } => { let h1 = hyper::Client::builder() - .executor(LazyExecutor) + .executor(executor) // hyper should never try to automatically set the Host // header, instead always just passing whatever we received. .set_host(false) @@ -98,7 +115,7 @@ where // h2 currently doesn't handle PUSH_PROMISE that well, so we just // disable it for now. h2_builder.enable_push(false); - let h2 = tower_h2::client::Connect::new(connect, h2_builder, LazyExecutor); + let h2 = tower_h2::client::Connect::new(connect, h2_builder, BoxExecutor::new(executor)); Client { inner: ClientInner::Http2(h2), @@ -108,11 +125,13 @@ where } } -impl NewService for Client +impl NewService for Client where C: Connect + Clone + Send + Sync + 'static, C::Future: Send + 'static, C::Connected: Send, + E: Executor + Clone, + E: future::Executor + Send + 'static>> + Send + Sync + 'static, B: tower_h2::Body + Send + 'static, ::Buf: Send + 'static, { @@ -120,8 +139,8 @@ where type Response = http::Response; type Error = tower_h2::client::Error; type InitError = tower_h2::client::ConnectError; - type Service = ClientService; - type Future = ClientNewServiceFuture; + type Service = ClientService; + type Future = ClientNewServiceFuture; fn new_service(&self) -> Self::Future { let inner = match self.inner { @@ -138,15 +157,17 @@ where } } -impl Future for ClientNewServiceFuture +impl Future for ClientNewServiceFuture where C: Connect + Send + 'static, C::Connected: Send, C::Future: Send + 'static, B: tower_h2::Body + Send + 'static, + E: Executor + Clone, + E: future::Executor + Send + 'static>> + Send + Sync + 'static, ::Buf: Send + 'static, { - type Item = ClientService; + type Item = ClientService; type Error = tower_h2::client::ConnectError; fn poll(&mut self) -> Poll { @@ -165,11 +186,13 @@ where } } -impl Service for ClientService +impl Service for ClientService where C: Connect + Send + Sync + 'static, C::Connected: Send, C::Future: Send + 'static, + E: Executor + Clone, + E: future::Executor + Send + 'static>> + Send + Sync + 'static, B: tower_h2::Body + Send + 'static, ::Buf: Send + 'static, { @@ -186,6 +209,8 @@ where } fn call(&mut self, req: Self::Request) -> Self::Future { + debug!("ClientService::call method={} uri={} headers={:?} ext={:?}", + req.method(), req.uri(), req.headers(), req.extensions()); match self.inner { ClientServiceInner::Http1(ref h1) => { let mut req = hyper::Request::from(req.map(BodyStream::new)); diff --git a/proxy/src/transparency/server.rs b/proxy/src/transparency/server.rs index 2af45196b..d11e13b58 100644 --- a/proxy/src/transparency/server.rs +++ b/proxy/src/transparency/server.rs @@ -15,7 +15,6 @@ use connection::{Connection, Peek}; use ctx::Proxy as ProxyCtx; use ctx::transport::{Server as ServerCtx}; use drain; -use task::LazyExecutor; use telemetry::Sensors; use transport::GetOriginalDst; use super::glue::{HttpBody, HttpBodyNewSvc, HyperServerSvc}; @@ -37,12 +36,17 @@ where drain_signal: drain::Watch, get_orig_dst: G, h1: hyper::server::conn::Http, - h2: tower_h2::Server, LazyExecutor, B>, + h2: tower_h2::Server< + HttpBodyNewSvc, + ::logging::ServerExecutor, + B + >, listen_addr: SocketAddr, new_service: S, proxy_ctx: Arc, sensors: Sensors, tcp: tcp::Proxy, + log: ::logging::Server, } impl Server @@ -79,6 +83,7 @@ where ) -> Self { let recv_body_svc = HttpBodyNewSvc::new(stack.clone()); let tcp = tcp::Proxy::new(tcp_connect_timeout, sensors.clone()); + let log = ::logging::Server::proxy(&proxy_ctx, listen_addr); Server { disable_protocol_detection_ports, drain_signal, @@ -87,16 +92,21 @@ where h2: tower_h2::Server::new( recv_body_svc, Default::default(), - LazyExecutor, + log.clone().executor(), ), listen_addr, new_service: stack, proxy_ctx, sensors, tcp, + log, } } + pub fn log(&self) -> &::logging::Server { + &self.log + } + /// Handle a new connection. /// /// This will peek on the connection for the first bytes to determine @@ -117,6 +127,8 @@ where &remote_addr, &orig_dst, ); + let log = self.log.clone() + .with_remote(remote_addr); // record telemetry let io = self.sensors.accept(connection, opened_at, &srv_ctx); @@ -139,7 +151,7 @@ where self.drain_signal.clone(), ); - return Either::B(fut); + return log.future(Either::B(fut)); } // try to sniff protocol @@ -148,7 +160,7 @@ where let tcp = self.tcp.clone(); let new_service = self.new_service.clone(); let drain_signal = self.drain_signal.clone(); - Either::A(io.peek() + let fut = Either::A(io.peek() .map_err(|e| debug!("peek error: {}", e)) .and_then(move |io| { if let Some(proto) = Protocol::detect(io.peeked()) { @@ -171,7 +183,6 @@ where }, Protocol::Http2 => { trace!("transparency detected HTTP/2"); - let set_ctx = move |request: &mut http::Request<()>| { request.extensions_mut().insert(srv_ctx.clone()); }; @@ -194,7 +205,9 @@ where drain_signal, )) } - })) + })); + + log.future(fut) } } diff --git a/proxy/src/transport/connect.rs b/proxy/src/transport/connect.rs index 6f1b8b5d2..61940a86e 100644 --- a/proxy/src/transport/connect.rs +++ b/proxy/src/transport/connect.rs @@ -1,7 +1,7 @@ use futures::Future; use tokio_connect; -use std::io; +use std::{fmt, io}; use std::net::{IpAddr, SocketAddr}; use std::str::FromStr; @@ -78,6 +78,19 @@ impl<'a> From<&'a HostAndPort> for http::uri::Authority { } } +impl fmt::Display for HostAndPort { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self.host { + Host::DnsName(ref dns) => { + write!(f, "{}:{}", dns, self.port) + } + Host::Ip(ref ip) => { + write!(f, "{}:{}", ip, self.port) + } + } + } +} + // ===== impl Connect ===== impl Connect {