proxy: Add rich logging contexts (#1037)

While debugging proxy issues, I found it necessary to change how logging contexts are
instrumented, especially for clients.

This change moves away from using `Debug` types to in favor of `Display` types.
Furthermore, the `logging` module now provides a uniform set of logging contexts to be
used throughout the application.  All clients, servers, and background tasks should now be
instrumented so that their log messages contain predictable metadata.

Some small improvements have been made to ensure that logging contexts are correct
when a `Future` is dropped (which is important for some H2 uses, especially).
This commit is contained in:
Oliver Gould 2018-05-30 13:41:59 -07:00 committed by GitHub
parent db2478f5a2
commit 294de5b3c4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 497 additions and 149 deletions

View File

@ -2,6 +2,7 @@ use std::error::Error;
use std::fmt; use std::fmt;
use std::default::Default; use std::default::Default;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::AtomicUsize; use std::sync::atomic::AtomicUsize;
@ -132,7 +133,11 @@ pub type HttpResponse = http::Response<sensor::http::ResponseBody<HttpBody>>;
pub type HttpRequest<B> = http::Request<sensor::http::RequestBody<B>>; pub type HttpRequest<B> = http::Request<sensor::http::RequestBody<B>>;
pub type Client<B> = transparency::Client<sensor::Connect<transport::Connect>, B>; pub type Client<B> = transparency::Client<
sensor::Connect<transport::Connect>,
::logging::ClientExecutor<&'static str, SocketAddr>,
B,
>;
#[derive(Copy, Clone, Debug)] #[derive(Copy, Clone, Debug)]
pub enum BufferSpawnError { pub enum BufferSpawnError {
@ -218,9 +223,12 @@ where
&client_ctx, &client_ctx,
); );
let log = ::logging::Client::proxy(&self.ctx, addr)
.with_protocol(protocol.clone());
let client = transparency::Client::new( let client = transparency::Client::new(
protocol, protocol,
connect, connect,
log.executor(),
); );
let sensors = self.sensors.http( let sensors = self.sensors.http(
@ -376,11 +384,15 @@ where
let ready = match self.binding { let ready = match self.binding {
// A service is already bound, so poll its readiness. // A service is already bound, so poll its readiness.
Binding::Bound(ref mut svc) | Binding::Bound(ref mut svc) |
Binding::BindsPerRequest { next: Some(ref mut svc) } => svc.poll_ready(), Binding::BindsPerRequest { next: Some(ref mut svc) } => {
trace!("poll_ready: stack already bound");
svc.poll_ready()
}
// If no stack has been bound, bind it now so that its readiness can be // If no stack has been bound, bind it now so that its readiness can be
// checked. Store it so it can be consumed to dispatch the next request. // checked. Store it so it can be consumed to dispatch the next request.
Binding::BindsPerRequest { ref mut next } => { Binding::BindsPerRequest { ref mut next } => {
trace!("poll_ready: binding stack");
let mut svc = self.bind.bind_stack(&self.endpoint, &self.protocol); let mut svc = self.bind.bind_stack(&self.endpoint, &self.protocol);
let ready = svc.poll_ready(); let ready = svc.poll_ready();
*next = Some(svc); *next = Some(svc);
@ -400,12 +412,16 @@ where
if !self.debounce_connect_error_log { if !self.debounce_connect_error_log {
self.debounce_connect_error_log = true; self.debounce_connect_error_log = true;
warn!("connect error to {:?}: {}", self.endpoint, err); warn!("connect error to {:?}: {}", self.endpoint, err);
} else {
debug!("connect error to {:?}: {}", self.endpoint, err);
} }
match self.binding { match self.binding {
Binding::Bound(ref mut svc) => { Binding::Bound(ref mut svc) => {
trace!("poll_ready: binding stack after error");
*svc = self.bind.bind_stack(&self.endpoint, &self.protocol); *svc = self.bind.bind_stack(&self.endpoint, &self.protocol);
}, },
Binding::BindsPerRequest { ref mut next } => { Binding::BindsPerRequest { ref mut next } => {
trace!("poll_ready: dropping bound stack after error");
next.take(); next.take();
} }
} }
@ -425,6 +441,7 @@ where
// don't debounce on NotReady... // don't debounce on NotReady...
Ok(Async::NotReady) => Ok(Async::NotReady), Ok(Async::NotReady) => Ok(Async::NotReady),
other => { other => {
trace!("poll_ready: ready for business");
self.debounce_connect_error_log = false; self.debounce_connect_error_log = false;
other other
}, },

View File

@ -32,7 +32,6 @@ use control::{
AddOrigin, Backoff, LogErrors AddOrigin, Backoff, LogErrors
}; };
use dns::{self, IpAddrListFuture}; use dns::{self, IpAddrListFuture};
use task::LazyExecutor;
use telemetry::metrics::DstLabels; use telemetry::metrics::DstLabels;
use transport::{DnsNameAndPort, HostAndPort, LookupAddressAndConnect}; use transport::{DnsNameAndPort, HostAndPort, LookupAddressAndConnect};
use timeout::Timeout; use timeout::Timeout;
@ -78,18 +77,18 @@ pub(super) fn task(
{ {
// Build up the Controller Client Stack // Build up the Controller Client Stack
let mut client = { let mut client = {
let ctx = ("controller-client", format!("{:?}", host_and_port));
let scheme = http::uri::Scheme::from_shared(Bytes::from_static(b"http")).unwrap(); let scheme = http::uri::Scheme::from_shared(Bytes::from_static(b"http")).unwrap();
let authority = http::uri::Authority::from(&host_and_port); let authority = http::uri::Authority::from(&host_and_port);
let connect = Timeout::new( let connect = Timeout::new(
LookupAddressAndConnect::new(host_and_port, dns_resolver.clone()), LookupAddressAndConnect::new(host_and_port.clone(), dns_resolver.clone()),
Duration::from_secs(3), Duration::from_secs(3),
); );
let log = ::logging::admin().client("control", host_and_port.clone());
let h2_client = tower_h2::client::Connect::new( let h2_client = tower_h2::client::Connect::new(
connect, connect,
h2::client::Builder::default(), h2::client::Builder::default(),
::logging::context_executor(ctx, LazyExecutor), log.executor()
); );
let reconnect = Reconnect::new(h2_client); let reconnect = Reconnect::new(h2_client);

View File

@ -47,7 +47,7 @@ impl<S> Backoff<S> {
impl<S> Service for Backoff<S> impl<S> Service for Backoff<S>
where where
S: Service, S: Service,
S::Error: ::std::fmt::Debug, S::Error: fmt::Debug,
{ {
type Request = S::Request; type Request = S::Request;
type Response = S::Response; type Response = S::Response;

View File

@ -10,7 +10,6 @@ use conduit_proxy_router::Recognize;
use bind; use bind;
use ctx; use ctx;
use task::LazyExecutor;
type Bind<B> = bind::Bind<Arc<ctx::Proxy>, B>; type Bind<B> = bind::Bind<Arc<ctx::Proxy>, B>;
@ -88,7 +87,10 @@ where
let endpoint = (*addr).into(); let endpoint = (*addr).into();
let binding = self.bind.bind_service(&endpoint, proto); let binding = self.bind.bind_service(&endpoint, proto);
Buffer::new(binding, &LazyExecutor)
let log = ::logging::proxy().client("in", "local")
.with_remote(*addr);
Buffer::new(binding, &log.executor())
.map(|buffer| { .map(|buffer| {
InFlightLimit::new(buffer, MAX_IN_FLIGHT) InFlightLimit::new(buffer, MAX_IN_FLIGHT)
}) })

View File

@ -219,7 +219,7 @@ where
panic!("invalid DNS configuration: {:?}", e); panic!("invalid DNS configuration: {:?}", e);
}); });
let (control, control_bg) = control::destination::new( let (resolver, resolver_bg) = control::destination::new(
dns_resolver.clone(), dns_resolver.clone(),
config.pod_namespace.clone(), config.pod_namespace.clone(),
control_host_and_port control_host_and_port
@ -245,7 +245,6 @@ where
config.inbound_router_max_idle_age, config.inbound_router_max_idle_age,
); );
serve( serve(
"inbound",
inbound_listener, inbound_listener,
router, router,
config.private_connect_timeout, config.private_connect_timeout,
@ -264,12 +263,11 @@ where
let ctx = ctx::Proxy::outbound(&process_ctx); let ctx = ctx::Proxy::outbound(&process_ctx);
let bind = bind.clone().with_ctx(ctx.clone()); let bind = bind.clone().with_ctx(ctx.clone());
let router = Router::new( let router = Router::new(
Outbound::new(bind, control, config.bind_timeout), Outbound::new(bind, resolver, config.bind_timeout),
config.outbound_router_capacity, config.outbound_router_capacity,
config.outbound_router_max_idle_age, config.outbound_router_max_idle_age,
); );
serve( serve(
"outbound",
outbound_listener, outbound_listener,
router, router,
config.public_connect_timeout, config.public_connect_timeout,
@ -283,35 +281,32 @@ where
trace!("running"); trace!("running");
let (_tx, controller_shutdown_signal) = futures::sync::oneshot::channel::<()>(); let (_tx, admin_shutdown_signal) = futures::sync::oneshot::channel::<()>();
{ {
thread::Builder::new() thread::Builder::new()
.name("controller-client".into()) .name("admin".into())
.spawn(move || { .spawn(move || {
use conduit_proxy_controller_grpc::tap::server::TapServer; use conduit_proxy_controller_grpc::tap::server::TapServer;
let mut rt = current_thread::Runtime::new() let mut rt = current_thread::Runtime::new()
.expect("initialize controller-client thread runtime"); .expect("initialize admin thread runtime");
let new_service = TapServer::new(observe); let tap = serve_tap(control_listener, TapServer::new(observe));
let server = serve_control(control_listener, new_service); let metrics_server = telemetry.serve_metrics(metrics_listener);
let metrics_server = telemetry let fut = ::logging::admin().bg("resolver").future(resolver_bg)
.serve_metrics(metrics_listener); .join4(
::logging::admin().bg("telemetry").future(telemetry),
let fut = control_bg.join4( tap.map_err(|_| {}),
server.map_err(|_| {}), metrics_server.map_err(|_| {}),
telemetry, ).map(|_| {});
metrics_server.map_err(|_| {}),
).map(|_| {});
let fut = ::logging::context_future("controller-client", fut);
rt.spawn(Box::new(fut)); rt.spawn(Box::new(fut));
let shutdown = controller_shutdown_signal.then(|_| Ok::<(), ()>(())); let shutdown = admin_shutdown_signal.then(|_| Ok::<(), ()>(()));
rt.block_on(shutdown).expect("controller api"); rt.block_on(shutdown).expect("admin");
trace!("controller client shutdown finished"); trace!("admin shutdown finished");
}) })
.expect("initialize controller api thread"); .expect("initialize controller api thread");
trace!("controller client thread spawned"); trace!("controller client thread spawned");
@ -335,7 +330,6 @@ where
} }
fn serve<R, B, E, F, G>( fn serve<R, B, E, F, G>(
name: &'static str,
bound_port: BoundPort, bound_port: BoundPort,
router: Router<R>, router: Router<R>,
tcp_connect_timeout: Duration, tcp_connect_timeout: Duration,
@ -400,7 +394,7 @@ where
let listen_addr = bound_port.local_addr(); let listen_addr = bound_port.local_addr();
let server = Server::new( let server = Server::new(
listen_addr, listen_addr,
proxy_ctx, proxy_ctx.clone(),
sensors, sensors,
get_orig_dst, get_orig_dst,
stack, stack,
@ -408,20 +402,22 @@ where
disable_protocol_detection_ports, disable_protocol_detection_ports,
drain_rx.clone(), drain_rx.clone(),
); );
let log = server.log().clone();
let accept = {
let accept = bound_port.listen_and_fold( let fut = bound_port.listen_and_fold(
(), (),
move |(), (connection, remote_addr)| { move |(), (connection, remote_addr)| {
let s = server.serve(connection, remote_addr); let s = server.serve(connection, remote_addr);
let s = ::logging::context_future((name, remote_addr), s); // Logging context is configured by the server.
let r = DefaultExecutor::current() let r = DefaultExecutor::current()
.spawn(Box::new(s)) .spawn(Box::new(s))
.map_err(task::Error::into_io); .map_err(task::Error::into_io);
future::result(r) future::result(r)
}, },
); );
let accept = ::logging::context_future(name, accept); log.future(fut)
};
let accept_until = Cancelable { let accept_until = Cancelable {
future: accept, future: accept,
@ -460,7 +456,7 @@ where
} }
} }
fn serve_control<N, B>( fn serve_tap<N, B>(
bound_port: BoundPort, bound_port: BoundPort,
new_service: N, new_service: N,
) -> impl Future<Item = (), Error = io::Error> + 'static ) -> impl Future<Item = (), Error = io::Error> + 'static
@ -475,28 +471,36 @@ where
tower_h2::server::Connection< tower_h2::server::Connection<
connection::Connection, connection::Connection,
N, N,
task::LazyExecutor, ::logging::ServerExecutor,
B, B,
() ()
>: Future<Item = ()>, >: Future<Item = ()>,
{ {
let log = logging::admin().server("tap", bound_port.local_addr());
let h2_builder = h2::server::Builder::default(); let h2_builder = h2::server::Builder::default();
let server = tower_h2::Server::new( let server = tower_h2::Server::new(
new_service, new_service,
h2_builder, h2_builder,
task::LazyExecutor log.clone().executor(),
); );
bound_port.listen_and_fold(
server,
move |server, (session, _)| {
let s = server.serve(session).map_err(|_| ());
let s = ::logging::context_future("serve_control", s);
let r = executor::current_thread::TaskExecutor::current() let fut = {
.spawn_local(Box::new(s)) let log = log.clone();
.map(move |_| server) bound_port.listen_and_fold(
.map_err(task::Error::into_io); server,
future::result(r) move |server, (session, remote)| {
}, let log = log.clone().with_remote(remote);
) let serve = server.serve(session).map_err(|_| ());
let r = executor::current_thread::TaskExecutor::current()
.spawn_local(Box::new(log.future(serve)))
.map(move |_| server)
.map_err(task::Error::into_io);
future::result(r)
},
)
};
log.future(fut)
} }

View File

@ -2,6 +2,7 @@ use std::cell::RefCell;
use std::env; use std::env;
use std::io::Write; use std::io::Write;
use std::fmt; use std::fmt;
use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use env_logger; use env_logger;
@ -12,7 +13,7 @@ use log::{Level};
const ENV_LOG: &str = "CONDUIT_PROXY_LOG"; const ENV_LOG: &str = "CONDUIT_PROXY_LOG";
thread_local! { thread_local! {
static CONTEXT: RefCell<Vec<*const fmt::Debug>> = RefCell::new(Vec::new()); static CONTEXT: RefCell<Vec<*const fmt::Display>> = RefCell::new(Vec::new());
} }
pub fn init() { pub fn init() {
@ -28,10 +29,10 @@ pub fn init() {
}; };
writeln!( writeln!(
fmt, fmt,
"{} {} {:?}{}", "{} {}{} {}",
level, level,
record.target(),
Context(&ctxt.borrow()), Context(&ctxt.borrow()),
record.target(),
record.args() record.args()
) )
}) })
@ -40,43 +41,42 @@ pub fn init() {
.init(); .init();
} }
/// Execute a closure with a `Debug` item attached to allow log messages. /// Execute a closure with a `Display` item attached to allow log messages.
pub fn context<T, F, U>(context: &T, mut closure: F) -> U pub fn context<T, F, U>(context: &T, mut closure: F) -> U
where where
T: ::std::fmt::Debug + 'static, T: fmt::Display + 'static,
F: FnMut() -> U, F: FnMut() -> U,
{ {
let _guard = ContextGuard::new(context); let _guard = ContextGuard::new(context);
closure() closure()
} }
/// Wrap a `Future` with a `Debug` value that will be inserted into all logs /// Wrap a `Future` with a `Display` value that will be inserted into all logs
/// created by this Future. /// created by this Future.
pub fn context_future<T, F>(context: T, future: F) -> ContextualFuture<T, F> { pub fn context_future<T: fmt::Display, F: Future>(context: T, future: F) -> ContextualFuture<T, F> {
ContextualFuture { ContextualFuture {
context, context,
future, future: Some(future),
} }
} }
/// Wrap an `Executor` to spawn futures that have a reference to the `Debug` /// Wrap `task::LazyExecutor` to spawn futures that have a reference to the `Display`
/// value, inserting it into all logs created by this future. /// value, inserting it into all logs created by this future.
pub fn context_executor<T, E>(context: T, executor: E) -> ContextualExecutor<T, E> { pub fn context_executor<T: fmt::Display>(context: T) -> ContextualExecutor<T> {
ContextualExecutor { ContextualExecutor {
context: Arc::new(context), context: Arc::new(context),
executor,
} }
} }
#[derive(Debug)] #[derive(Debug)]
pub struct ContextualFuture<T, F> { pub struct ContextualFuture<T: fmt::Display + 'static, F: Future> {
context: T, context: T,
future: F, future: Option<F>,
} }
impl<T, F> Future for ContextualFuture<T, F> impl<T, F> Future for ContextualFuture<T, F>
where where
T: ::std::fmt::Debug + 'static, T: fmt::Display + 'static,
F: Future, F: Future,
{ {
type Item = F::Item; type Item = F::Item;
@ -84,39 +84,71 @@ where
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let ctxt = &self.context; let ctxt = &self.context;
let fut = &mut self.future; let fut = self.future.as_mut().expect("poll after drop");
context(ctxt, || fut.poll()) context(ctxt, || fut.poll())
} }
} }
impl<T, F> Drop for ContextualFuture<T, F>
#[derive(Clone, Debug)] where
pub struct ContextualExecutor<T, E> { T: fmt::Display + 'static,
context: Arc<T>, F: Future,
executor: E, {
fn drop(&mut self) {
if self.future.is_some() {
let ctxt = &self.context;
let fut = &mut self.future;
context(ctxt, || drop(fut.take()))
}
}
} }
impl<T, E, F> Executor<F> for ContextualExecutor<T, E> #[derive(Debug)]
pub struct ContextualExecutor<T> {
context: Arc<T>,
}
impl<T> ::tokio::executor::Executor for ContextualExecutor<T>
where where
T: ::std::fmt::Debug + 'static, T: fmt::Display + 'static + Send + Sync,
E: Executor<ContextualFuture<Arc<T>, F>>,
F: Future<Item = (), Error = ()>,
{ {
fn execute(&self, future: F) -> Result<(), ExecuteError<F>> { fn spawn(
&mut self,
future: Box<Future<Item = (), Error = ()> + 'static + Send>
) -> ::std::result::Result<(), ::tokio::executor::SpawnError> {
let fut = context_future(self.context.clone(), future); let fut = context_future(self.context.clone(), future);
match self.executor.execute(fut) { ::task::LazyExecutor.spawn(Box::new(fut))
}
}
impl<T, F> Executor<F> for ContextualExecutor<T>
where
T: fmt::Display + 'static + Send + Sync,
F: Future<Item = (), Error = ()> + 'static + Send,
{
fn execute(&self, future: F) -> ::std::result::Result<(), ExecuteError<F>> {
let fut = context_future(self.context.clone(), future);
match ::task::LazyExecutor.execute(fut) {
Ok(()) => Ok(()), Ok(()) => Ok(()),
Err(err) => { Err(err) => {
let kind = err.kind(); let kind = err.kind();
let future = err.into_future(); let mut future = err.into_future();
Err(ExecuteError::new(kind, future.future)) Err(ExecuteError::new(kind, future.future.take().expect("future")))
} }
} }
} }
} }
struct Context<'a>(&'a [*const fmt::Debug]); impl<T> Clone for ContextualExecutor<T> {
fn clone(&self) -> Self {
Self {
context: self.context.clone(),
}
}
}
impl<'a> fmt::Debug for Context<'a> { struct Context<'a>(&'a [*const fmt::Display]);
impl<'a> fmt::Display for Context<'a> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
if self.0.is_empty() { if self.0.is_empty() {
return Ok(()); return Ok(());
@ -125,8 +157,7 @@ impl<'a> fmt::Debug for Context<'a> {
for item in self.0 { for item in self.0 {
// See `fn context()` for comments about this unsafe. // See `fn context()` for comments about this unsafe.
let item = unsafe { &**item }; let item = unsafe { &**item };
item.fmt(f)?; write!(f, "{} ", item)?;
f.write_str(", ")?;
} }
Ok(()) Ok(())
} }
@ -136,17 +167,17 @@ impl<'a> fmt::Debug for Context<'a> {
/// ///
/// Specifically, this protects even if the passed function panics, /// Specifically, this protects even if the passed function panics,
/// as destructors are run while unwinding. /// as destructors are run while unwinding.
struct ContextGuard<'a>(&'a (fmt::Debug + 'static)); struct ContextGuard<'a>(&'a (fmt::Display + 'static));
impl<'a> ContextGuard<'a> { impl<'a> ContextGuard<'a> {
fn new(context: &'a (fmt::Debug + 'static)) -> Self { fn new(context: &'a (fmt::Display + 'static)) -> Self {
// This is a raw pointer because of lifetime conflicts that require // This is a raw pointer because of lifetime conflicts that require
// the thread local to have a static lifetime. // the thread local to have a static lifetime.
// //
// We don't want to require a static lifetime, and in fact, // We don't want to require a static lifetime, and in fact,
// only use the reference within this closure, so converting // only use the reference within this closure, so converting
// to a raw pointer is safe. // to a raw pointer is safe.
let raw = context as *const fmt::Debug; let raw = context as *const fmt::Display;
CONTEXT.with(|ctxt| { CONTEXT.with(|ctxt| {
ctxt.borrow_mut().push(raw); ctxt.borrow_mut().push(raw);
}); });
@ -161,3 +192,177 @@ impl<'a> Drop for ContextGuard<'a> {
}); });
} }
} }
pub fn admin() -> Section {
Section::Admin
}
pub fn proxy() -> Section {
Section::Proxy
}
#[derive(Copy, Clone)]
pub enum Section {
Proxy,
Admin,
}
/// A utility for logging actions taken on behalf of a server task.
#[derive(Clone)]
pub struct Server {
section: Section,
name: &'static str,
listen: SocketAddr,
remote: Option<SocketAddr>,
}
/// A utility for logging actions taken on behalf of a client task.
#[derive(Clone)]
pub struct Client<C: fmt::Display, D: fmt::Display> {
section: Section,
client: C,
dst: D,
protocol: Option<::bind::Protocol>,
remote: Option<SocketAddr>,
}
/// A utility for logging actions taken on behalf of a background task.
#[derive(Clone)]
pub struct Bg {
section: Section,
name: &'static str,
}
impl Section {
pub fn bg(&self, name: &'static str) -> Bg {
Bg {
section: *self,
name,
}
}
pub fn server(&self, name: &'static str, listen: SocketAddr) -> Server {
Server {
section: *self,
name,
listen,
remote: None,
}
}
pub fn client<C: fmt::Display, D: fmt::Display>(&self, client: C, dst: D) -> Client<C, D> {
Client {
section: *self,
client,
dst,
protocol: None,
remote: None,
}
}
}
impl fmt::Display for Section {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Section::Proxy => "proxy".fmt(f),
Section::Admin => "admin".fmt(f),
}
}
}
pub type BgFuture<F> = ContextualFuture<Bg, F>;
pub type ClientExecutor<C, D> = ContextualExecutor<Client<C, D>>;
pub type ServerExecutor = ContextualExecutor<Server>;
pub type ServerFuture<F> = ContextualFuture<Server, F>;
impl Server {
pub fn proxy(ctx: &::ctx::Proxy, listen: SocketAddr) -> Self {
let name = if ctx.is_inbound() {
"in"
} else {
"out"
};
Section::Proxy.server(name, listen)
}
pub fn with_remote(self, remote: SocketAddr) -> Self {
Self {
remote: Some(remote),
.. self
}
}
pub fn executor(self) -> ServerExecutor {
context_executor(self)
}
pub fn future<F: Future>(self, f: F) -> ServerFuture<F> {
context_future(self, f)
}
}
impl fmt::Display for Server {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}={{server={} listen={}", self.section, self.name, self.listen)?;
if let Some(remote) = self.remote {
write!(f, " remote={}", remote)?;
}
write!(f, "}}")
}
}
impl<D: fmt::Display> Client<&'static str, D> {
pub fn proxy(ctx: &::ctx::Proxy, dst: D) -> Self {
let name = if ctx.is_inbound() {
"in"
} else {
"out"
};
Section::Proxy.client(name, dst)
}
}
impl<C: fmt::Display, D: fmt::Display> Client<C, D> {
pub fn with_protocol(self, p: ::bind::Protocol) -> Self {
Self {
protocol: Some(p),
.. self
}
}
pub fn with_remote(self, remote: SocketAddr) -> Self {
Self {
remote: Some(remote),
.. self
}
}
pub fn executor(self) -> ClientExecutor<C, D> {
context_executor(self)
}
}
impl<C: fmt::Display, D: fmt::Display> fmt::Display for Client<C, D> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}={{client={} dst={}", self.section, self.client, self.dst)?;
if let Some(ref proto) = self.protocol {
write!(f, " proto={:?}", proto)?;
}
if let Some(remote) = self.remote {
write!(f, " remote={}", remote)?;
}
write!(f, "}}")
}
}
impl Bg {
pub fn future<F: Future>(self, f: F) -> BgFuture<F> {
context_future(self, f)
}
}
impl fmt::Display for Bg {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}={{bg={}}}", self.section, self.name)
}
}

View File

@ -16,7 +16,6 @@ use conduit_proxy_router::Recognize;
use bind::{self, Bind, Protocol}; use bind::{self, Bind, Protocol};
use control::destination::{self, Bind as BindTrait, Resolution}; use control::destination::{self, Bind as BindTrait, Resolution};
use ctx; use ctx;
use task::LazyExecutor;
use timeout::Timeout; use timeout::Timeout;
use transparency::h1; use transparency::h1;
use transport::{DnsNameAndPort, Host, HostAndPort}; use transport::{DnsNameAndPort, Host, HostAndPort};
@ -150,19 +149,19 @@ where
let loaded = tower_balance::load::WithPendingRequests::new(resolve); let loaded = tower_balance::load::WithPendingRequests::new(resolve);
// We can't use `rand::thread_rng` here because the returned `Service` // We can't use `rand::thread_rng` here because the returned `Service`
// needs to be `Send`, so instead, we use `LazyRng`, which calls // needs to be `Send`, so instead, we use `LazyRng`, which calls
// `rand::thread_rng()` when it is *used*. // `rand::thread_rng()` when it is *used*.
let balance = tower_balance::power_of_two_choices(loaded, LazyThreadRng); let balance = tower_balance::power_of_two_choices(loaded, LazyThreadRng);
let buffer = Buffer::new(balance, &LazyExecutor) let log = ::logging::proxy().client("out", Dst(dest.clone()))
.with_protocol(protocol.clone());
let buffer = Buffer::new(balance, &log.executor())
.map_err(|_| bind::BufferSpawnError::Outbound)?; .map_err(|_| bind::BufferSpawnError::Outbound)?;
let timeout = Timeout::new(buffer, self.bind_timeout); let timeout = Timeout::new(buffer, self.bind_timeout);
Ok(InFlightLimit::new(timeout, MAX_IN_FLIGHT)) Ok(InFlightLimit::new(timeout, MAX_IN_FLIGHT))
} }
} }
@ -195,7 +194,7 @@ where
// closing down when the connection is no longer usable. // closing down when the connection is no longer usable.
if let Some((addr, bind)) = opt.take() { if let Some((addr, bind)) = opt.take() {
let svc = bind.bind(&addr.into()) let svc = bind.bind(&addr.into())
.map_err(|_| BindError::External{ addr })?; .map_err(|_| BindError::External { addr })?;
Ok(Async::Ready(Change::Insert(addr, svc))) Ok(Async::Ready(Change::Insert(addr, svc)))
} else { } else {
Ok(Async::NotReady) Ok(Async::NotReady)
@ -232,3 +231,16 @@ impl error::Error for BindError {
fn cause(&self) -> Option<&error::Error> { None } fn cause(&self) -> Option<&error::Error> { None }
} }
struct Dst(Destination);
impl fmt::Display for Dst {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self.0 {
Destination::Hostname(ref name) => {
write!(f, "{}:{}", name.host, name.port)
}
Destination::ImplicitOriginalDst(ref addr) => addr.fmt(f),
}
}
}

View File

@ -33,6 +33,9 @@ use std::{
#[derive(Copy, Clone, Debug, Default)] #[derive(Copy, Clone, Debug, Default)]
pub struct LazyExecutor; pub struct LazyExecutor;
#[derive(Copy, Clone, Debug, Default)]
pub struct BoxExecutor<E: TokioExecutor>(E);
/// Indicates which Tokio `Runtime` should be used for the main proxy. /// Indicates which Tokio `Runtime` should be used for the main proxy.
/// ///
/// This is either a `tokio::runtime::current_thread::Runtime`, or a /// This is either a `tokio::runtime::current_thread::Runtime`, or a
@ -104,6 +107,53 @@ where
} }
} }
// ===== impl BoxExecutor =====;
impl<E: TokioExecutor> BoxExecutor<E> {
pub fn new(e: E) -> Self {
BoxExecutor(e)
}
}
impl<E: TokioExecutor> TokioExecutor for BoxExecutor<E> {
fn spawn(
&mut self,
future: Box<Future<Item = (), Error = ()> + 'static + Send>
) -> Result<(), SpawnError> {
self.0.spawn(future)
}
fn status(&self) -> Result<(), SpawnError> {
self.0.status()
}
}
impl<F, E> Executor<F> for BoxExecutor<E>
where
F: Future<Item = (), Error = ()> + 'static + Send,
E: TokioExecutor,
E: Executor<Box<Future<Item = (), Error = ()> + Send + 'static>>,
{
fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
// Check the status of the executor first, so that we can return the
// future in the `ExecuteError`. If we just called `spawn` and
// `map_err`ed the error into an `ExecuteError`, we'd have to move the
// future into the closure, but it was already moved into `spawn`.
if let Err(e) = self.0.status() {
if e.is_at_capacity() {
return Err(ExecuteError::new(ExecuteErrorKind::NoCapacity, future));
} else if e.is_shutdown() {
return Err(ExecuteError::new(ExecuteErrorKind::Shutdown, future));
} else {
panic!("unexpected `SpawnError`: {:?}", e);
}
};
self.0.execute(Box::new(future))
.expect("spawn() errored but status() was Ok");
Ok(())
}
}
// ===== impl MainRuntime ===== // ===== impl MainRuntime =====
impl MainRuntime { impl MainRuntime {

View File

@ -96,25 +96,33 @@ impl Control {
-> impl Future<Item = (), Error = io::Error> -> impl Future<Item = (), Error = io::Error>
{ {
use hyper; use hyper;
let log = ::logging::admin().server("metrics", bound_port.local_addr());
let service = self.metrics_service.clone(); let service = self.metrics_service.clone();
bound_port.listen_and_fold( let fut = {
hyper::server::conn::Http::new(), let log = log.clone();
move |hyper, (conn, _)| { bound_port.listen_and_fold(
let service = service.clone(); hyper::server::conn::Http::new(),
let serve = hyper.serve_connection(conn, service) move |hyper, (conn, remote)| {
.map(|_| {}) let service = service.clone();
.map_err(|e| { let serve = hyper.serve_connection(conn, service)
error!("error serving prometheus metrics: {:?}", e); .map(|_| {})
}); .map_err(|e| {
let serve = ::logging::context_future("serve_metrics", serve); error!("error serving prometheus metrics: {:?}", e);
});
let serve = log.clone().with_remote(remote).future(serve);
let r = TaskExecutor::current() let r = TaskExecutor::current()
.spawn_local(Box::new(serve)) .spawn_local(Box::new(serve))
.map(move |()| hyper) .map(move |()| hyper)
.map_err(task::Error::into_io); .map_err(task::Error::into_io);
future::result(r) future::result(r)
}) })
};
log.future(fut)
} }
} }

View File

@ -1,14 +1,15 @@
use bytes::IntoBuf; use bytes::IntoBuf;
use futures::{Async, Future, Poll}; use futures::{future, Async, Future, Poll};
use h2; use h2;
use http; use http;
use hyper; use hyper;
use tokio::executor::Executor;
use tokio_connect::Connect; use tokio_connect::Connect;
use tower_service::{Service, NewService}; use tower_service::{Service, NewService};
use tower_h2; use tower_h2;
use bind; use bind;
use task::LazyExecutor; use task::BoxExecutor;
use telemetry::sensor::http::RequestBody; use telemetry::sensor::http::RequestBody;
use super::glue::{BodyStream, HttpBody, HyperConnect}; use super::glue::{BodyStream, HttpBody, HyperConnect};
@ -16,75 +17,91 @@ type HyperClient<C, B> =
hyper::Client<HyperConnect<C>, BodyStream<RequestBody<B>>>; hyper::Client<HyperConnect<C>, BodyStream<RequestBody<B>>>;
/// A `NewService` that can speak either HTTP/1 or HTTP/2. /// A `NewService` that can speak either HTTP/1 or HTTP/2.
pub struct Client<C, B> pub struct Client<C, E, B>
where where
B: tower_h2::Body + 'static, B: tower_h2::Body + 'static,
C: Connect + 'static,
E: Executor + Clone,
E: future::Executor<Box<Future<Item = (), Error = ()> + Send + 'static>> + Send + Sync + 'static,
{ {
inner: ClientInner<C, B>, inner: ClientInner<C, E, B>,
} }
enum ClientInner<C, B> enum ClientInner<C, E, B>
where where
B: tower_h2::Body + 'static, B: tower_h2::Body + 'static,
C: Connect + 'static,
E: Executor + Clone,
E: future::Executor<Box<Future<Item = (), Error = ()> + Send + 'static>> + Send + Sync + 'static,
{ {
Http1(HyperClient<C, B>), Http1(HyperClient<C, B>),
Http2(tower_h2::client::Connect<C, LazyExecutor, RequestBody<B>>), Http2(tower_h2::client::Connect<C, BoxExecutor<E>, RequestBody<B>>),
} }
/// A `Future` returned from `Client::new_service()`. /// A `Future` returned from `Client::new_service()`.
pub struct ClientNewServiceFuture<C, B> pub struct ClientNewServiceFuture<C, E, B>
where where
B: tower_h2::Body + 'static, B: tower_h2::Body + 'static,
C: Connect + 'static, C: Connect + 'static,
E: Executor + Clone,
E: future::Executor<Box<Future<Item = (), Error = ()> + Send + 'static>> + Send + Sync + 'static,
{ {
inner: ClientNewServiceFutureInner<C, B>, inner: ClientNewServiceFutureInner<C, E, B>,
} }
enum ClientNewServiceFutureInner<C, B> enum ClientNewServiceFutureInner<C, E, B>
where where
B: tower_h2::Body + 'static, B: tower_h2::Body + 'static,
C: Connect + 'static, C: Connect + 'static,
E: Executor + Clone,
E: future::Executor<Box<Future<Item = (), Error = ()> + Send + 'static>> + Send + Sync + 'static,
{ {
Http1(Option<HyperClient<C, B>>), Http1(Option<HyperClient<C, B>>),
Http2(tower_h2::client::ConnectFuture<C, LazyExecutor, RequestBody<B>>), Http2(tower_h2::client::ConnectFuture<C, BoxExecutor<E>, RequestBody<B>>),
} }
/// The `Service` yielded by `Client::new_service()`. /// The `Service` yielded by `Client::new_service()`.
pub struct ClientService<C, B> pub struct ClientService<C, E, B>
where where
B: tower_h2::Body + 'static, B: tower_h2::Body + 'static,
C: Connect, C: Connect,
E: Executor + Clone,
E: future::Executor<Box<Future<Item = (), Error = ()> + Send + 'static>> + Send + Sync + 'static,
{ {
inner: ClientServiceInner<C, B>, inner: ClientServiceInner<C, E, B>,
} }
enum ClientServiceInner<C, B> enum ClientServiceInner<C, E, B>
where where
B: tower_h2::Body + 'static, B: tower_h2::Body + 'static,
C: Connect C: Connect,
E: Executor + Clone,
E: future::Executor<Box<Future<Item = (), Error = ()> + Send + 'static>> + Send + Sync + 'static,
{ {
Http1(HyperClient<C, B>), Http1(HyperClient<C, B>),
Http2(tower_h2::client::Connection< Http2(tower_h2::client::Connection<
<C as Connect>::Connected, <C as Connect>::Connected,
LazyExecutor, BoxExecutor<E>,
RequestBody<B>, RequestBody<B>,
>), >),
} }
impl<C, B> Client<C, B> impl<C, E, B> Client<C, E, B>
where where
C: Connect + Clone + Send + Sync + 'static, C: Connect + Clone + Send + Sync + 'static,
C::Future: Send + 'static, C::Future: Send + 'static,
C::Connected: Send, C::Connected: Send,
E: Executor + Clone,
E: future::Executor<Box<Future<Item = (), Error = ()> + Send + 'static>> + Send + Sync + 'static,
B: tower_h2::Body + Send + 'static, B: tower_h2::Body + Send + 'static,
<B::Data as IntoBuf>::Buf: Send + 'static, <B::Data as IntoBuf>::Buf: Send + 'static,
{ {
/// Create a new `Client`, bound to a specific protocol (HTTP/1 or HTTP/2). /// Create a new `Client`, bound to a specific protocol (HTTP/1 or HTTP/2).
pub fn new(protocol: &bind::Protocol, connect: C) -> Self { pub fn new(protocol: &bind::Protocol, connect: C, executor: E) -> Self {
match *protocol { match *protocol {
bind::Protocol::Http1 { was_absolute_form, .. } => { bind::Protocol::Http1 { was_absolute_form, .. } => {
let h1 = hyper::Client::builder() let h1 = hyper::Client::builder()
.executor(LazyExecutor) .executor(executor)
// hyper should never try to automatically set the Host // hyper should never try to automatically set the Host
// header, instead always just passing whatever we received. // header, instead always just passing whatever we received.
.set_host(false) .set_host(false)
@ -98,7 +115,7 @@ where
// h2 currently doesn't handle PUSH_PROMISE that well, so we just // h2 currently doesn't handle PUSH_PROMISE that well, so we just
// disable it for now. // disable it for now.
h2_builder.enable_push(false); h2_builder.enable_push(false);
let h2 = tower_h2::client::Connect::new(connect, h2_builder, LazyExecutor); let h2 = tower_h2::client::Connect::new(connect, h2_builder, BoxExecutor::new(executor));
Client { Client {
inner: ClientInner::Http2(h2), inner: ClientInner::Http2(h2),
@ -108,11 +125,13 @@ where
} }
} }
impl<C, B> NewService for Client<C, B> impl<C, E, B> NewService for Client<C, E, B>
where where
C: Connect + Clone + Send + Sync + 'static, C: Connect + Clone + Send + Sync + 'static,
C::Future: Send + 'static, C::Future: Send + 'static,
C::Connected: Send, C::Connected: Send,
E: Executor + Clone,
E: future::Executor<Box<Future<Item = (), Error = ()> + Send + 'static>> + Send + Sync + 'static,
B: tower_h2::Body + Send + 'static, B: tower_h2::Body + Send + 'static,
<B::Data as IntoBuf>::Buf: Send + 'static, <B::Data as IntoBuf>::Buf: Send + 'static,
{ {
@ -120,8 +139,8 @@ where
type Response = http::Response<HttpBody>; type Response = http::Response<HttpBody>;
type Error = tower_h2::client::Error; type Error = tower_h2::client::Error;
type InitError = tower_h2::client::ConnectError<C::Error>; type InitError = tower_h2::client::ConnectError<C::Error>;
type Service = ClientService<C, B>; type Service = ClientService<C, E, B>;
type Future = ClientNewServiceFuture<C, B>; type Future = ClientNewServiceFuture<C, E, B>;
fn new_service(&self) -> Self::Future { fn new_service(&self) -> Self::Future {
let inner = match self.inner { let inner = match self.inner {
@ -138,15 +157,17 @@ where
} }
} }
impl<C, B> Future for ClientNewServiceFuture<C, B> impl<C, E, B> Future for ClientNewServiceFuture<C, E, B>
where where
C: Connect + Send + 'static, C: Connect + Send + 'static,
C::Connected: Send, C::Connected: Send,
C::Future: Send + 'static, C::Future: Send + 'static,
B: tower_h2::Body + Send + 'static, B: tower_h2::Body + Send + 'static,
E: Executor + Clone,
E: future::Executor<Box<Future<Item = (), Error = ()> + Send + 'static>> + Send + Sync + 'static,
<B::Data as IntoBuf>::Buf: Send + 'static, <B::Data as IntoBuf>::Buf: Send + 'static,
{ {
type Item = ClientService<C, B>; type Item = ClientService<C, E, B>;
type Error = tower_h2::client::ConnectError<C::Error>; type Error = tower_h2::client::ConnectError<C::Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
@ -165,11 +186,13 @@ where
} }
} }
impl<C, B> Service for ClientService<C, B> impl<C, E, B> Service for ClientService<C, E, B>
where where
C: Connect + Send + Sync + 'static, C: Connect + Send + Sync + 'static,
C::Connected: Send, C::Connected: Send,
C::Future: Send + 'static, C::Future: Send + 'static,
E: Executor + Clone,
E: future::Executor<Box<Future<Item = (), Error = ()> + Send + 'static>> + Send + Sync + 'static,
B: tower_h2::Body + Send + 'static, B: tower_h2::Body + Send + 'static,
<B::Data as IntoBuf>::Buf: Send + 'static, <B::Data as IntoBuf>::Buf: Send + 'static,
{ {
@ -186,6 +209,8 @@ where
} }
fn call(&mut self, req: Self::Request) -> Self::Future { fn call(&mut self, req: Self::Request) -> Self::Future {
debug!("ClientService::call method={} uri={} headers={:?} ext={:?}",
req.method(), req.uri(), req.headers(), req.extensions());
match self.inner { match self.inner {
ClientServiceInner::Http1(ref h1) => { ClientServiceInner::Http1(ref h1) => {
let mut req = hyper::Request::from(req.map(BodyStream::new)); let mut req = hyper::Request::from(req.map(BodyStream::new));

View File

@ -15,7 +15,6 @@ use connection::{Connection, Peek};
use ctx::Proxy as ProxyCtx; use ctx::Proxy as ProxyCtx;
use ctx::transport::{Server as ServerCtx}; use ctx::transport::{Server as ServerCtx};
use drain; use drain;
use task::LazyExecutor;
use telemetry::Sensors; use telemetry::Sensors;
use transport::GetOriginalDst; use transport::GetOriginalDst;
use super::glue::{HttpBody, HttpBodyNewSvc, HyperServerSvc}; use super::glue::{HttpBody, HttpBodyNewSvc, HyperServerSvc};
@ -37,12 +36,17 @@ where
drain_signal: drain::Watch, drain_signal: drain::Watch,
get_orig_dst: G, get_orig_dst: G,
h1: hyper::server::conn::Http, h1: hyper::server::conn::Http,
h2: tower_h2::Server<HttpBodyNewSvc<S>, LazyExecutor, B>, h2: tower_h2::Server<
HttpBodyNewSvc<S>,
::logging::ServerExecutor,
B
>,
listen_addr: SocketAddr, listen_addr: SocketAddr,
new_service: S, new_service: S,
proxy_ctx: Arc<ProxyCtx>, proxy_ctx: Arc<ProxyCtx>,
sensors: Sensors, sensors: Sensors,
tcp: tcp::Proxy, tcp: tcp::Proxy,
log: ::logging::Server,
} }
impl<S, B, G> Server<S, B, G> impl<S, B, G> Server<S, B, G>
@ -79,6 +83,7 @@ where
) -> Self { ) -> Self {
let recv_body_svc = HttpBodyNewSvc::new(stack.clone()); let recv_body_svc = HttpBodyNewSvc::new(stack.clone());
let tcp = tcp::Proxy::new(tcp_connect_timeout, sensors.clone()); let tcp = tcp::Proxy::new(tcp_connect_timeout, sensors.clone());
let log = ::logging::Server::proxy(&proxy_ctx, listen_addr);
Server { Server {
disable_protocol_detection_ports, disable_protocol_detection_ports,
drain_signal, drain_signal,
@ -87,16 +92,21 @@ where
h2: tower_h2::Server::new( h2: tower_h2::Server::new(
recv_body_svc, recv_body_svc,
Default::default(), Default::default(),
LazyExecutor, log.clone().executor(),
), ),
listen_addr, listen_addr,
new_service: stack, new_service: stack,
proxy_ctx, proxy_ctx,
sensors, sensors,
tcp, tcp,
log,
} }
} }
pub fn log(&self) -> &::logging::Server {
&self.log
}
/// Handle a new connection. /// Handle a new connection.
/// ///
/// This will peek on the connection for the first bytes to determine /// This will peek on the connection for the first bytes to determine
@ -117,6 +127,8 @@ where
&remote_addr, &remote_addr,
&orig_dst, &orig_dst,
); );
let log = self.log.clone()
.with_remote(remote_addr);
// record telemetry // record telemetry
let io = self.sensors.accept(connection, opened_at, &srv_ctx); let io = self.sensors.accept(connection, opened_at, &srv_ctx);
@ -139,7 +151,7 @@ where
self.drain_signal.clone(), self.drain_signal.clone(),
); );
return Either::B(fut); return log.future(Either::B(fut));
} }
// try to sniff protocol // try to sniff protocol
@ -148,7 +160,7 @@ where
let tcp = self.tcp.clone(); let tcp = self.tcp.clone();
let new_service = self.new_service.clone(); let new_service = self.new_service.clone();
let drain_signal = self.drain_signal.clone(); let drain_signal = self.drain_signal.clone();
Either::A(io.peek() let fut = Either::A(io.peek()
.map_err(|e| debug!("peek error: {}", e)) .map_err(|e| debug!("peek error: {}", e))
.and_then(move |io| { .and_then(move |io| {
if let Some(proto) = Protocol::detect(io.peeked()) { if let Some(proto) = Protocol::detect(io.peeked()) {
@ -171,7 +183,6 @@ where
}, },
Protocol::Http2 => { Protocol::Http2 => {
trace!("transparency detected HTTP/2"); trace!("transparency detected HTTP/2");
let set_ctx = move |request: &mut http::Request<()>| { let set_ctx = move |request: &mut http::Request<()>| {
request.extensions_mut().insert(srv_ctx.clone()); request.extensions_mut().insert(srv_ctx.clone());
}; };
@ -194,7 +205,9 @@ where
drain_signal, drain_signal,
)) ))
} }
})) }));
log.future(fut)
} }
} }

View File

@ -1,7 +1,7 @@
use futures::Future; use futures::Future;
use tokio_connect; use tokio_connect;
use std::io; use std::{fmt, io};
use std::net::{IpAddr, SocketAddr}; use std::net::{IpAddr, SocketAddr};
use std::str::FromStr; use std::str::FromStr;
@ -78,6 +78,19 @@ impl<'a> From<&'a HostAndPort> for http::uri::Authority {
} }
} }
impl fmt::Display for HostAndPort {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self.host {
Host::DnsName(ref dns) => {
write!(f, "{}:{}", dns, self.port)
}
Host::Ip(ref ip) => {
write!(f, "{}:{}", ip, self.port)
}
}
}
}
// ===== impl Connect ===== // ===== impl Connect =====
impl Connect { impl Connect {