diff --git a/proxy/src/bind.rs b/proxy/src/bind.rs index 9924fbaa7..ad576c976 100644 --- a/proxy/src/bind.rs +++ b/proxy/src/bind.rs @@ -2,6 +2,7 @@ use std::error::Error; use std::fmt; use std::default::Default; use std::marker::PhantomData; +use std::net::SocketAddr; use std::sync::Arc; use std::sync::atomic::AtomicUsize; @@ -132,7 +133,11 @@ pub type HttpResponse = http::Response>; pub type HttpRequest = http::Request>; -pub type Client = transparency::Client, B>; +pub type Client = transparency::Client< + sensor::Connect, + ::logging::ClientExecutor<&'static str, SocketAddr>, + B, +>; #[derive(Copy, Clone, Debug)] pub enum BufferSpawnError { @@ -218,9 +223,12 @@ where &client_ctx, ); + let log = ::logging::Client::proxy(&self.ctx, addr) + .with_protocol(protocol.clone()); let client = transparency::Client::new( protocol, connect, + log.executor(), ); let sensors = self.sensors.http( @@ -376,11 +384,15 @@ where let ready = match self.binding { // A service is already bound, so poll its readiness. Binding::Bound(ref mut svc) | - Binding::BindsPerRequest { next: Some(ref mut svc) } => svc.poll_ready(), + Binding::BindsPerRequest { next: Some(ref mut svc) } => { + trace!("poll_ready: stack already bound"); + svc.poll_ready() + } // If no stack has been bound, bind it now so that its readiness can be // checked. Store it so it can be consumed to dispatch the next request. Binding::BindsPerRequest { ref mut next } => { + trace!("poll_ready: binding stack"); let mut svc = self.bind.bind_stack(&self.endpoint, &self.protocol); let ready = svc.poll_ready(); *next = Some(svc); @@ -400,12 +412,16 @@ where if !self.debounce_connect_error_log { self.debounce_connect_error_log = true; warn!("connect error to {:?}: {}", self.endpoint, err); + } else { + debug!("connect error to {:?}: {}", self.endpoint, err); } match self.binding { Binding::Bound(ref mut svc) => { + trace!("poll_ready: binding stack after error"); *svc = self.bind.bind_stack(&self.endpoint, &self.protocol); }, Binding::BindsPerRequest { ref mut next } => { + trace!("poll_ready: dropping bound stack after error"); next.take(); } } @@ -425,6 +441,7 @@ where // don't debounce on NotReady... Ok(Async::NotReady) => Ok(Async::NotReady), other => { + trace!("poll_ready: ready for business"); self.debounce_connect_error_log = false; other }, diff --git a/proxy/src/control/destination/background.rs b/proxy/src/control/destination/background.rs index 3e4689ad8..20f212bcf 100644 --- a/proxy/src/control/destination/background.rs +++ b/proxy/src/control/destination/background.rs @@ -32,7 +32,6 @@ use control::{ AddOrigin, Backoff, LogErrors }; use dns::{self, IpAddrListFuture}; -use task::LazyExecutor; use telemetry::metrics::DstLabels; use transport::{DnsNameAndPort, HostAndPort, LookupAddressAndConnect}; use timeout::Timeout; @@ -78,18 +77,18 @@ pub(super) fn task( { // Build up the Controller Client Stack let mut client = { - let ctx = ("controller-client", format!("{:?}", host_and_port)); let scheme = http::uri::Scheme::from_shared(Bytes::from_static(b"http")).unwrap(); let authority = http::uri::Authority::from(&host_and_port); let connect = Timeout::new( - LookupAddressAndConnect::new(host_and_port, dns_resolver.clone()), + LookupAddressAndConnect::new(host_and_port.clone(), dns_resolver.clone()), Duration::from_secs(3), ); + let log = ::logging::admin().client("control", host_and_port.clone()); let h2_client = tower_h2::client::Connect::new( connect, h2::client::Builder::default(), - ::logging::context_executor(ctx, LazyExecutor), + log.executor() ); let reconnect = Reconnect::new(h2_client); diff --git a/proxy/src/control/mod.rs b/proxy/src/control/mod.rs index 088d38064..6517a952f 100644 --- a/proxy/src/control/mod.rs +++ b/proxy/src/control/mod.rs @@ -47,7 +47,7 @@ impl Backoff { impl Service for Backoff where S: Service, - S::Error: ::std::fmt::Debug, + S::Error: fmt::Debug, { type Request = S::Request; type Response = S::Response; diff --git a/proxy/src/inbound.rs b/proxy/src/inbound.rs index 727ebfd18..140512e70 100644 --- a/proxy/src/inbound.rs +++ b/proxy/src/inbound.rs @@ -10,7 +10,6 @@ use conduit_proxy_router::Recognize; use bind; use ctx; -use task::LazyExecutor; type Bind = bind::Bind, B>; @@ -88,7 +87,10 @@ where let endpoint = (*addr).into(); let binding = self.bind.bind_service(&endpoint, proto); - Buffer::new(binding, &LazyExecutor) + + let log = ::logging::proxy().client("in", "local") + .with_remote(*addr); + Buffer::new(binding, &log.executor()) .map(|buffer| { InFlightLimit::new(buffer, MAX_IN_FLIGHT) }) diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index 7a44fe5e5..f3ae54803 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -219,7 +219,7 @@ where panic!("invalid DNS configuration: {:?}", e); }); - let (control, control_bg) = control::destination::new( + let (resolver, resolver_bg) = control::destination::new( dns_resolver.clone(), config.pod_namespace.clone(), control_host_and_port @@ -245,7 +245,6 @@ where config.inbound_router_max_idle_age, ); serve( - "inbound", inbound_listener, router, config.private_connect_timeout, @@ -264,12 +263,11 @@ where let ctx = ctx::Proxy::outbound(&process_ctx); let bind = bind.clone().with_ctx(ctx.clone()); let router = Router::new( - Outbound::new(bind, control, config.bind_timeout), + Outbound::new(bind, resolver, config.bind_timeout), config.outbound_router_capacity, config.outbound_router_max_idle_age, ); serve( - "outbound", outbound_listener, router, config.public_connect_timeout, @@ -283,35 +281,32 @@ where trace!("running"); - let (_tx, controller_shutdown_signal) = futures::sync::oneshot::channel::<()>(); + let (_tx, admin_shutdown_signal) = futures::sync::oneshot::channel::<()>(); { thread::Builder::new() - .name("controller-client".into()) + .name("admin".into()) .spawn(move || { use conduit_proxy_controller_grpc::tap::server::TapServer; let mut rt = current_thread::Runtime::new() - .expect("initialize controller-client thread runtime"); + .expect("initialize admin thread runtime"); - let new_service = TapServer::new(observe); + let tap = serve_tap(control_listener, TapServer::new(observe)); - let server = serve_control(control_listener, new_service); + let metrics_server = telemetry.serve_metrics(metrics_listener); - let metrics_server = telemetry - .serve_metrics(metrics_listener); - - let fut = control_bg.join4( - server.map_err(|_| {}), - telemetry, - metrics_server.map_err(|_| {}), - ).map(|_| {}); - let fut = ::logging::context_future("controller-client", fut); + let fut = ::logging::admin().bg("resolver").future(resolver_bg) + .join4( + ::logging::admin().bg("telemetry").future(telemetry), + tap.map_err(|_| {}), + metrics_server.map_err(|_| {}), + ).map(|_| {}); rt.spawn(Box::new(fut)); - let shutdown = controller_shutdown_signal.then(|_| Ok::<(), ()>(())); - rt.block_on(shutdown).expect("controller api"); - trace!("controller client shutdown finished"); + let shutdown = admin_shutdown_signal.then(|_| Ok::<(), ()>(())); + rt.block_on(shutdown).expect("admin"); + trace!("admin shutdown finished"); }) .expect("initialize controller api thread"); trace!("controller client thread spawned"); @@ -335,7 +330,6 @@ where } fn serve( - name: &'static str, bound_port: BoundPort, router: Router, tcp_connect_timeout: Duration, @@ -400,7 +394,7 @@ where let listen_addr = bound_port.local_addr(); let server = Server::new( listen_addr, - proxy_ctx, + proxy_ctx.clone(), sensors, get_orig_dst, stack, @@ -408,20 +402,22 @@ where disable_protocol_detection_ports, drain_rx.clone(), ); + let log = server.log().clone(); - - let accept = bound_port.listen_and_fold( - (), - move |(), (connection, remote_addr)| { - let s = server.serve(connection, remote_addr); - let s = ::logging::context_future((name, remote_addr), s); - let r = DefaultExecutor::current() - .spawn(Box::new(s)) - .map_err(task::Error::into_io); - future::result(r) - }, - ); - let accept = ::logging::context_future(name, accept); + let accept = { + let fut = bound_port.listen_and_fold( + (), + move |(), (connection, remote_addr)| { + let s = server.serve(connection, remote_addr); + // Logging context is configured by the server. + let r = DefaultExecutor::current() + .spawn(Box::new(s)) + .map_err(task::Error::into_io); + future::result(r) + }, + ); + log.future(fut) + }; let accept_until = Cancelable { future: accept, @@ -460,7 +456,7 @@ where } } -fn serve_control( +fn serve_tap( bound_port: BoundPort, new_service: N, ) -> impl Future + 'static @@ -475,28 +471,36 @@ where tower_h2::server::Connection< connection::Connection, N, - task::LazyExecutor, + ::logging::ServerExecutor, B, () >: Future, { + let log = logging::admin().server("tap", bound_port.local_addr()); + let h2_builder = h2::server::Builder::default(); let server = tower_h2::Server::new( new_service, h2_builder, - task::LazyExecutor + log.clone().executor(), ); - bound_port.listen_and_fold( - server, - move |server, (session, _)| { - let s = server.serve(session).map_err(|_| ()); - let s = ::logging::context_future("serve_control", s); - let r = executor::current_thread::TaskExecutor::current() - .spawn_local(Box::new(s)) - .map(move |_| server) - .map_err(task::Error::into_io); - future::result(r) - }, - ) + let fut = { + let log = log.clone(); + bound_port.listen_and_fold( + server, + move |server, (session, remote)| { + let log = log.clone().with_remote(remote); + let serve = server.serve(session).map_err(|_| ()); + + let r = executor::current_thread::TaskExecutor::current() + .spawn_local(Box::new(log.future(serve))) + .map(move |_| server) + .map_err(task::Error::into_io); + future::result(r) + }, + ) + }; + + log.future(fut) } diff --git a/proxy/src/logging.rs b/proxy/src/logging.rs index 073e52a44..586792f0a 100644 --- a/proxy/src/logging.rs +++ b/proxy/src/logging.rs @@ -2,6 +2,7 @@ use std::cell::RefCell; use std::env; use std::io::Write; use std::fmt; +use std::net::SocketAddr; use std::sync::Arc; use env_logger; @@ -12,7 +13,7 @@ use log::{Level}; const ENV_LOG: &str = "CONDUIT_PROXY_LOG"; thread_local! { - static CONTEXT: RefCell> = RefCell::new(Vec::new()); + static CONTEXT: RefCell> = RefCell::new(Vec::new()); } pub fn init() { @@ -28,10 +29,10 @@ pub fn init() { }; writeln!( fmt, - "{} {} {:?}{}", + "{} {}{} {}", level, - record.target(), Context(&ctxt.borrow()), + record.target(), record.args() ) }) @@ -40,43 +41,42 @@ pub fn init() { .init(); } -/// Execute a closure with a `Debug` item attached to allow log messages. +/// Execute a closure with a `Display` item attached to allow log messages. pub fn context(context: &T, mut closure: F) -> U where - T: ::std::fmt::Debug + 'static, + T: fmt::Display + 'static, F: FnMut() -> U, { let _guard = ContextGuard::new(context); closure() } -/// Wrap a `Future` with a `Debug` value that will be inserted into all logs +/// Wrap a `Future` with a `Display` value that will be inserted into all logs /// created by this Future. -pub fn context_future(context: T, future: F) -> ContextualFuture { +pub fn context_future(context: T, future: F) -> ContextualFuture { ContextualFuture { context, - future, + future: Some(future), } } -/// Wrap an `Executor` to spawn futures that have a reference to the `Debug` +/// Wrap `task::LazyExecutor` to spawn futures that have a reference to the `Display` /// value, inserting it into all logs created by this future. -pub fn context_executor(context: T, executor: E) -> ContextualExecutor { +pub fn context_executor(context: T) -> ContextualExecutor { ContextualExecutor { context: Arc::new(context), - executor, } } #[derive(Debug)] -pub struct ContextualFuture { +pub struct ContextualFuture { context: T, - future: F, + future: Option, } impl Future for ContextualFuture where - T: ::std::fmt::Debug + 'static, + T: fmt::Display + 'static, F: Future, { type Item = F::Item; @@ -84,39 +84,71 @@ where fn poll(&mut self) -> Poll { let ctxt = &self.context; - let fut = &mut self.future; + let fut = self.future.as_mut().expect("poll after drop"); context(ctxt, || fut.poll()) } } - -#[derive(Clone, Debug)] -pub struct ContextualExecutor { - context: Arc, - executor: E, +impl Drop for ContextualFuture +where + T: fmt::Display + 'static, + F: Future, +{ + fn drop(&mut self) { + if self.future.is_some() { + let ctxt = &self.context; + let fut = &mut self.future; + context(ctxt, || drop(fut.take())) + } + } } -impl Executor for ContextualExecutor +#[derive(Debug)] +pub struct ContextualExecutor { + context: Arc, +} + +impl ::tokio::executor::Executor for ContextualExecutor where - T: ::std::fmt::Debug + 'static, - E: Executor, F>>, - F: Future, + T: fmt::Display + 'static + Send + Sync, { - fn execute(&self, future: F) -> Result<(), ExecuteError> { + fn spawn( + &mut self, + future: Box + 'static + Send> + ) -> ::std::result::Result<(), ::tokio::executor::SpawnError> { let fut = context_future(self.context.clone(), future); - match self.executor.execute(fut) { + ::task::LazyExecutor.spawn(Box::new(fut)) + } +} + +impl Executor for ContextualExecutor +where + T: fmt::Display + 'static + Send + Sync, + F: Future + 'static + Send, +{ + fn execute(&self, future: F) -> ::std::result::Result<(), ExecuteError> { + let fut = context_future(self.context.clone(), future); + match ::task::LazyExecutor.execute(fut) { Ok(()) => Ok(()), Err(err) => { let kind = err.kind(); - let future = err.into_future(); - Err(ExecuteError::new(kind, future.future)) + let mut future = err.into_future(); + Err(ExecuteError::new(kind, future.future.take().expect("future"))) } } } } -struct Context<'a>(&'a [*const fmt::Debug]); +impl Clone for ContextualExecutor { + fn clone(&self) -> Self { + Self { + context: self.context.clone(), + } + } +} -impl<'a> fmt::Debug for Context<'a> { +struct Context<'a>(&'a [*const fmt::Display]); + +impl<'a> fmt::Display for Context<'a> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { if self.0.is_empty() { return Ok(()); @@ -125,8 +157,7 @@ impl<'a> fmt::Debug for Context<'a> { for item in self.0 { // See `fn context()` for comments about this unsafe. let item = unsafe { &**item }; - item.fmt(f)?; - f.write_str(", ")?; + write!(f, "{} ", item)?; } Ok(()) } @@ -136,17 +167,17 @@ impl<'a> fmt::Debug for Context<'a> { /// /// Specifically, this protects even if the passed function panics, /// as destructors are run while unwinding. -struct ContextGuard<'a>(&'a (fmt::Debug + 'static)); +struct ContextGuard<'a>(&'a (fmt::Display + 'static)); impl<'a> ContextGuard<'a> { - fn new(context: &'a (fmt::Debug + 'static)) -> Self { + fn new(context: &'a (fmt::Display + 'static)) -> Self { // This is a raw pointer because of lifetime conflicts that require // the thread local to have a static lifetime. // // We don't want to require a static lifetime, and in fact, // only use the reference within this closure, so converting // to a raw pointer is safe. - let raw = context as *const fmt::Debug; + let raw = context as *const fmt::Display; CONTEXT.with(|ctxt| { ctxt.borrow_mut().push(raw); }); @@ -161,3 +192,177 @@ impl<'a> Drop for ContextGuard<'a> { }); } } + +pub fn admin() -> Section { + Section::Admin +} + +pub fn proxy() -> Section { + Section::Proxy +} + +#[derive(Copy, Clone)] +pub enum Section { + Proxy, + Admin, +} + +/// A utility for logging actions taken on behalf of a server task. +#[derive(Clone)] +pub struct Server { + section: Section, + name: &'static str, + listen: SocketAddr, + remote: Option, +} + +/// A utility for logging actions taken on behalf of a client task. +#[derive(Clone)] +pub struct Client { + section: Section, + client: C, + dst: D, + protocol: Option<::bind::Protocol>, + remote: Option, +} + +/// A utility for logging actions taken on behalf of a background task. +#[derive(Clone)] +pub struct Bg { + section: Section, + name: &'static str, +} + +impl Section { + pub fn bg(&self, name: &'static str) -> Bg { + Bg { + section: *self, + name, + } + } + + pub fn server(&self, name: &'static str, listen: SocketAddr) -> Server { + Server { + section: *self, + name, + listen, + remote: None, + } + } + + pub fn client(&self, client: C, dst: D) -> Client { + Client { + section: *self, + client, + dst, + protocol: None, + remote: None, + } + } +} + +impl fmt::Display for Section { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + Section::Proxy => "proxy".fmt(f), + Section::Admin => "admin".fmt(f), + } + } +} + +pub type BgFuture = ContextualFuture; +pub type ClientExecutor = ContextualExecutor>; +pub type ServerExecutor = ContextualExecutor; +pub type ServerFuture = ContextualFuture; + +impl Server { + pub fn proxy(ctx: &::ctx::Proxy, listen: SocketAddr) -> Self { + let name = if ctx.is_inbound() { + "in" + } else { + "out" + }; + Section::Proxy.server(name, listen) + } + + pub fn with_remote(self, remote: SocketAddr) -> Self { + Self { + remote: Some(remote), + .. self + } + } + + pub fn executor(self) -> ServerExecutor { + context_executor(self) + } + + pub fn future(self, f: F) -> ServerFuture { + context_future(self, f) + } +} + +impl fmt::Display for Server { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}={{server={} listen={}", self.section, self.name, self.listen)?; + if let Some(remote) = self.remote { + write!(f, " remote={}", remote)?; + } + write!(f, "}}") + } +} + +impl Client<&'static str, D> { + pub fn proxy(ctx: &::ctx::Proxy, dst: D) -> Self { + let name = if ctx.is_inbound() { + "in" + } else { + "out" + }; + Section::Proxy.client(name, dst) + } +} + +impl Client { + pub fn with_protocol(self, p: ::bind::Protocol) -> Self { + Self { + protocol: Some(p), + .. self + } + } + + pub fn with_remote(self, remote: SocketAddr) -> Self { + Self { + remote: Some(remote), + .. self + } + } + + pub fn executor(self) -> ClientExecutor { + context_executor(self) + } +} + +impl fmt::Display for Client { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}={{client={} dst={}", self.section, self.client, self.dst)?; + if let Some(ref proto) = self.protocol { + write!(f, " proto={:?}", proto)?; + } + if let Some(remote) = self.remote { + write!(f, " remote={}", remote)?; + } + write!(f, "}}") + } +} + +impl Bg { + pub fn future(self, f: F) -> BgFuture { + context_future(self, f) + } +} + +impl fmt::Display for Bg { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}={{bg={}}}", self.section, self.name) + } +} diff --git a/proxy/src/outbound.rs b/proxy/src/outbound.rs index fb0aceab1..b2b69d8e4 100644 --- a/proxy/src/outbound.rs +++ b/proxy/src/outbound.rs @@ -16,7 +16,6 @@ use conduit_proxy_router::Recognize; use bind::{self, Bind, Protocol}; use control::destination::{self, Bind as BindTrait, Resolution}; use ctx; -use task::LazyExecutor; use timeout::Timeout; use transparency::h1; use transport::{DnsNameAndPort, Host, HostAndPort}; @@ -150,19 +149,19 @@ where let loaded = tower_balance::load::WithPendingRequests::new(resolve); - // We can't use `rand::thread_rng` here because the returned `Service` // needs to be `Send`, so instead, we use `LazyRng`, which calls // `rand::thread_rng()` when it is *used*. let balance = tower_balance::power_of_two_choices(loaded, LazyThreadRng); - let buffer = Buffer::new(balance, &LazyExecutor) + let log = ::logging::proxy().client("out", Dst(dest.clone())) + .with_protocol(protocol.clone()); + let buffer = Buffer::new(balance, &log.executor()) .map_err(|_| bind::BufferSpawnError::Outbound)?; let timeout = Timeout::new(buffer, self.bind_timeout); Ok(InFlightLimit::new(timeout, MAX_IN_FLIGHT)) - } } @@ -195,7 +194,7 @@ where // closing down when the connection is no longer usable. if let Some((addr, bind)) = opt.take() { let svc = bind.bind(&addr.into()) - .map_err(|_| BindError::External{ addr })?; + .map_err(|_| BindError::External { addr })?; Ok(Async::Ready(Change::Insert(addr, svc))) } else { Ok(Async::NotReady) @@ -232,3 +231,16 @@ impl error::Error for BindError { fn cause(&self) -> Option<&error::Error> { None } } + +struct Dst(Destination); + +impl fmt::Display for Dst { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self.0 { + Destination::Hostname(ref name) => { + write!(f, "{}:{}", name.host, name.port) + } + Destination::ImplicitOriginalDst(ref addr) => addr.fmt(f), + } + } +} diff --git a/proxy/src/task.rs b/proxy/src/task.rs index 71563552c..d6681f925 100644 --- a/proxy/src/task.rs +++ b/proxy/src/task.rs @@ -33,6 +33,9 @@ use std::{ #[derive(Copy, Clone, Debug, Default)] pub struct LazyExecutor; +#[derive(Copy, Clone, Debug, Default)] +pub struct BoxExecutor(E); + /// Indicates which Tokio `Runtime` should be used for the main proxy. /// /// This is either a `tokio::runtime::current_thread::Runtime`, or a @@ -104,6 +107,53 @@ where } } +// ===== impl BoxExecutor =====; + +impl BoxExecutor { + pub fn new(e: E) -> Self { + BoxExecutor(e) + } +} + +impl TokioExecutor for BoxExecutor { + fn spawn( + &mut self, + future: Box + 'static + Send> + ) -> Result<(), SpawnError> { + self.0.spawn(future) + } + + fn status(&self) -> Result<(), SpawnError> { + self.0.status() + } +} + +impl Executor for BoxExecutor +where + F: Future + 'static + Send, + E: TokioExecutor, + E: Executor + Send + 'static>>, +{ + fn execute(&self, future: F) -> Result<(), ExecuteError> { + // Check the status of the executor first, so that we can return the + // future in the `ExecuteError`. If we just called `spawn` and + // `map_err`ed the error into an `ExecuteError`, we'd have to move the + // future into the closure, but it was already moved into `spawn`. + if let Err(e) = self.0.status() { + if e.is_at_capacity() { + return Err(ExecuteError::new(ExecuteErrorKind::NoCapacity, future)); + } else if e.is_shutdown() { + return Err(ExecuteError::new(ExecuteErrorKind::Shutdown, future)); + } else { + panic!("unexpected `SpawnError`: {:?}", e); + } + }; + self.0.execute(Box::new(future)) + .expect("spawn() errored but status() was Ok"); + Ok(()) + } +} + // ===== impl MainRuntime ===== impl MainRuntime { diff --git a/proxy/src/telemetry/control.rs b/proxy/src/telemetry/control.rs index 4d2cbf9d9..e92564c6c 100644 --- a/proxy/src/telemetry/control.rs +++ b/proxy/src/telemetry/control.rs @@ -96,25 +96,33 @@ impl Control { -> impl Future { use hyper; + + let log = ::logging::admin().server("metrics", bound_port.local_addr()); + let service = self.metrics_service.clone(); - bound_port.listen_and_fold( - hyper::server::conn::Http::new(), - move |hyper, (conn, _)| { - let service = service.clone(); - let serve = hyper.serve_connection(conn, service) - .map(|_| {}) - .map_err(|e| { - error!("error serving prometheus metrics: {:?}", e); - }); - let serve = ::logging::context_future("serve_metrics", serve); + let fut = { + let log = log.clone(); + bound_port.listen_and_fold( + hyper::server::conn::Http::new(), + move |hyper, (conn, remote)| { + let service = service.clone(); + let serve = hyper.serve_connection(conn, service) + .map(|_| {}) + .map_err(|e| { + error!("error serving prometheus metrics: {:?}", e); + }); + let serve = log.clone().with_remote(remote).future(serve); - let r = TaskExecutor::current() - .spawn_local(Box::new(serve)) - .map(move |()| hyper) - .map_err(task::Error::into_io); + let r = TaskExecutor::current() + .spawn_local(Box::new(serve)) + .map(move |()| hyper) + .map_err(task::Error::into_io); - future::result(r) - }) + future::result(r) + }) + }; + + log.future(fut) } } diff --git a/proxy/src/transparency/client.rs b/proxy/src/transparency/client.rs index bef111798..8e72facfa 100644 --- a/proxy/src/transparency/client.rs +++ b/proxy/src/transparency/client.rs @@ -1,14 +1,15 @@ use bytes::IntoBuf; -use futures::{Async, Future, Poll}; +use futures::{future, Async, Future, Poll}; use h2; use http; use hyper; +use tokio::executor::Executor; use tokio_connect::Connect; use tower_service::{Service, NewService}; use tower_h2; use bind; -use task::LazyExecutor; +use task::BoxExecutor; use telemetry::sensor::http::RequestBody; use super::glue::{BodyStream, HttpBody, HyperConnect}; @@ -16,75 +17,91 @@ type HyperClient = hyper::Client, BodyStream>>; /// A `NewService` that can speak either HTTP/1 or HTTP/2. -pub struct Client +pub struct Client where B: tower_h2::Body + 'static, + C: Connect + 'static, + E: Executor + Clone, + E: future::Executor + Send + 'static>> + Send + Sync + 'static, { - inner: ClientInner, + inner: ClientInner, } -enum ClientInner +enum ClientInner where B: tower_h2::Body + 'static, + C: Connect + 'static, + E: Executor + Clone, + E: future::Executor + Send + 'static>> + Send + Sync + 'static, { Http1(HyperClient), - Http2(tower_h2::client::Connect>), + Http2(tower_h2::client::Connect, RequestBody>), } /// A `Future` returned from `Client::new_service()`. -pub struct ClientNewServiceFuture +pub struct ClientNewServiceFuture where B: tower_h2::Body + 'static, C: Connect + 'static, + E: Executor + Clone, + E: future::Executor + Send + 'static>> + Send + Sync + 'static, { - inner: ClientNewServiceFutureInner, + inner: ClientNewServiceFutureInner, } -enum ClientNewServiceFutureInner +enum ClientNewServiceFutureInner where B: tower_h2::Body + 'static, C: Connect + 'static, + E: Executor + Clone, + E: future::Executor + Send + 'static>> + Send + Sync + 'static, { Http1(Option>), - Http2(tower_h2::client::ConnectFuture>), + Http2(tower_h2::client::ConnectFuture, RequestBody>), } /// The `Service` yielded by `Client::new_service()`. -pub struct ClientService +pub struct ClientService where B: tower_h2::Body + 'static, C: Connect, + E: Executor + Clone, + E: future::Executor + Send + 'static>> + Send + Sync + 'static, { - inner: ClientServiceInner, + inner: ClientServiceInner, } -enum ClientServiceInner +enum ClientServiceInner where B: tower_h2::Body + 'static, - C: Connect + C: Connect, + E: Executor + Clone, + E: future::Executor + Send + 'static>> + Send + Sync + 'static, { Http1(HyperClient), Http2(tower_h2::client::Connection< ::Connected, - LazyExecutor, + BoxExecutor, RequestBody, >), } -impl Client +impl Client where C: Connect + Clone + Send + Sync + 'static, C::Future: Send + 'static, C::Connected: Send, + E: Executor + Clone, + E: future::Executor + Send + 'static>> + Send + Sync + 'static, B: tower_h2::Body + Send + 'static, ::Buf: Send + 'static, { /// Create a new `Client`, bound to a specific protocol (HTTP/1 or HTTP/2). - pub fn new(protocol: &bind::Protocol, connect: C) -> Self { + pub fn new(protocol: &bind::Protocol, connect: C, executor: E) -> Self { match *protocol { bind::Protocol::Http1 { was_absolute_form, .. } => { let h1 = hyper::Client::builder() - .executor(LazyExecutor) + .executor(executor) // hyper should never try to automatically set the Host // header, instead always just passing whatever we received. .set_host(false) @@ -98,7 +115,7 @@ where // h2 currently doesn't handle PUSH_PROMISE that well, so we just // disable it for now. h2_builder.enable_push(false); - let h2 = tower_h2::client::Connect::new(connect, h2_builder, LazyExecutor); + let h2 = tower_h2::client::Connect::new(connect, h2_builder, BoxExecutor::new(executor)); Client { inner: ClientInner::Http2(h2), @@ -108,11 +125,13 @@ where } } -impl NewService for Client +impl NewService for Client where C: Connect + Clone + Send + Sync + 'static, C::Future: Send + 'static, C::Connected: Send, + E: Executor + Clone, + E: future::Executor + Send + 'static>> + Send + Sync + 'static, B: tower_h2::Body + Send + 'static, ::Buf: Send + 'static, { @@ -120,8 +139,8 @@ where type Response = http::Response; type Error = tower_h2::client::Error; type InitError = tower_h2::client::ConnectError; - type Service = ClientService; - type Future = ClientNewServiceFuture; + type Service = ClientService; + type Future = ClientNewServiceFuture; fn new_service(&self) -> Self::Future { let inner = match self.inner { @@ -138,15 +157,17 @@ where } } -impl Future for ClientNewServiceFuture +impl Future for ClientNewServiceFuture where C: Connect + Send + 'static, C::Connected: Send, C::Future: Send + 'static, B: tower_h2::Body + Send + 'static, + E: Executor + Clone, + E: future::Executor + Send + 'static>> + Send + Sync + 'static, ::Buf: Send + 'static, { - type Item = ClientService; + type Item = ClientService; type Error = tower_h2::client::ConnectError; fn poll(&mut self) -> Poll { @@ -165,11 +186,13 @@ where } } -impl Service for ClientService +impl Service for ClientService where C: Connect + Send + Sync + 'static, C::Connected: Send, C::Future: Send + 'static, + E: Executor + Clone, + E: future::Executor + Send + 'static>> + Send + Sync + 'static, B: tower_h2::Body + Send + 'static, ::Buf: Send + 'static, { @@ -186,6 +209,8 @@ where } fn call(&mut self, req: Self::Request) -> Self::Future { + debug!("ClientService::call method={} uri={} headers={:?} ext={:?}", + req.method(), req.uri(), req.headers(), req.extensions()); match self.inner { ClientServiceInner::Http1(ref h1) => { let mut req = hyper::Request::from(req.map(BodyStream::new)); diff --git a/proxy/src/transparency/server.rs b/proxy/src/transparency/server.rs index 2af45196b..d11e13b58 100644 --- a/proxy/src/transparency/server.rs +++ b/proxy/src/transparency/server.rs @@ -15,7 +15,6 @@ use connection::{Connection, Peek}; use ctx::Proxy as ProxyCtx; use ctx::transport::{Server as ServerCtx}; use drain; -use task::LazyExecutor; use telemetry::Sensors; use transport::GetOriginalDst; use super::glue::{HttpBody, HttpBodyNewSvc, HyperServerSvc}; @@ -37,12 +36,17 @@ where drain_signal: drain::Watch, get_orig_dst: G, h1: hyper::server::conn::Http, - h2: tower_h2::Server, LazyExecutor, B>, + h2: tower_h2::Server< + HttpBodyNewSvc, + ::logging::ServerExecutor, + B + >, listen_addr: SocketAddr, new_service: S, proxy_ctx: Arc, sensors: Sensors, tcp: tcp::Proxy, + log: ::logging::Server, } impl Server @@ -79,6 +83,7 @@ where ) -> Self { let recv_body_svc = HttpBodyNewSvc::new(stack.clone()); let tcp = tcp::Proxy::new(tcp_connect_timeout, sensors.clone()); + let log = ::logging::Server::proxy(&proxy_ctx, listen_addr); Server { disable_protocol_detection_ports, drain_signal, @@ -87,16 +92,21 @@ where h2: tower_h2::Server::new( recv_body_svc, Default::default(), - LazyExecutor, + log.clone().executor(), ), listen_addr, new_service: stack, proxy_ctx, sensors, tcp, + log, } } + pub fn log(&self) -> &::logging::Server { + &self.log + } + /// Handle a new connection. /// /// This will peek on the connection for the first bytes to determine @@ -117,6 +127,8 @@ where &remote_addr, &orig_dst, ); + let log = self.log.clone() + .with_remote(remote_addr); // record telemetry let io = self.sensors.accept(connection, opened_at, &srv_ctx); @@ -139,7 +151,7 @@ where self.drain_signal.clone(), ); - return Either::B(fut); + return log.future(Either::B(fut)); } // try to sniff protocol @@ -148,7 +160,7 @@ where let tcp = self.tcp.clone(); let new_service = self.new_service.clone(); let drain_signal = self.drain_signal.clone(); - Either::A(io.peek() + let fut = Either::A(io.peek() .map_err(|e| debug!("peek error: {}", e)) .and_then(move |io| { if let Some(proto) = Protocol::detect(io.peeked()) { @@ -171,7 +183,6 @@ where }, Protocol::Http2 => { trace!("transparency detected HTTP/2"); - let set_ctx = move |request: &mut http::Request<()>| { request.extensions_mut().insert(srv_ctx.clone()); }; @@ -194,7 +205,9 @@ where drain_signal, )) } - })) + })); + + log.future(fut) } } diff --git a/proxy/src/transport/connect.rs b/proxy/src/transport/connect.rs index 6f1b8b5d2..61940a86e 100644 --- a/proxy/src/transport/connect.rs +++ b/proxy/src/transport/connect.rs @@ -1,7 +1,7 @@ use futures::Future; use tokio_connect; -use std::io; +use std::{fmt, io}; use std::net::{IpAddr, SocketAddr}; use std::str::FromStr; @@ -78,6 +78,19 @@ impl<'a> From<&'a HostAndPort> for http::uri::Authority { } } +impl fmt::Display for HostAndPort { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self.host { + Host::DnsName(ref dns) => { + write!(f, "{}:{}", dns, self.port) + } + Host::Ip(ref ip) => { + write!(f, "{}:{}", ip, self.port) + } + } + } +} + // ===== impl Connect ===== impl Connect {