use std::cell::RefCell;
use std::env;
use std::io::Write;
use std::fmt;
use std::net::SocketAddr;
use std::sync::Arc;

use env_logger;
use futures::{Future, Poll};
use futures::future::{ExecuteError, Executor};
use log::{Level};

/// Name of the environment variable whose value is handed to the logger's
/// filter parser in `init`.
const ENV_LOG: &str = "LINKERD2_PROXY_LOG";

thread_local! {
    // Per-thread stack of the `Display` contexts that are currently in scope.
    //
    // Entries are raw pointers because a thread local must have a `'static`
    // type, while the contexts themselves are only borrowed for the duration
    // of a `context()` call. `ContextGuard` pushes on creation and pops on
    // drop, so an entry never outlives the borrow it was created from.
    static CONTEXT: RefCell<Vec<*const fmt::Display>> = RefCell::new(Vec::new());
}

/// Installs the process-wide logger.
///
/// Every record is written as `<LEVEL> <contexts><target> <message>`, where
/// `<LEVEL>` is a fixed four-character tag and `<contexts>` is the rendering
/// of this thread's context stack (each entry followed by a single space, or
/// nothing at all when the stack is empty).
///
/// Filter directives are read from the `LINKERD2_PROXY_LOG` environment
/// variable; an unset or non-unicode value is treated as the empty string.
pub fn init() {
    env_logger::Builder::new()
        .format(|fmt, record| {
            CONTEXT.with(|ctxt| {
                // Fixed-width tags keep the columns of the log output aligned.
                let level = match record.level() {
                    Level::Trace => "TRCE",
                    Level::Debug => "DBUG",
                    Level::Info => "INFO",
                    Level::Warn => "WARN",
                    Level::Error => "ERR!",
                };
                writeln!(
                    fmt,
                    "{} {}{} {}",
                    level,
                    Context(&ctxt.borrow()),
                    record.target(),
                    record.args()
                )
            })
        })
        .parse(&env::var(ENV_LOG).unwrap_or_default())
        .init();
}

/// Execute a closure with a `Display` item attached to allow log messages.
///
/// `context` is pushed onto this thread's context stack for the duration of
/// the call and removed again afterwards, including when `closure` panics
/// (the guard's destructor runs during unwinding). The closure's return
/// value is passed through unchanged.
pub fn context<T, F, U>(context: &T, mut closure: F) -> U
where
    T: fmt::Display + 'static,
    F: FnMut() -> U,
{
    // Bound to a name (not `_`) so the guard lives until the end of the scope.
    let _guard = ContextGuard::new(context);
    closure()
}

/// Wrap a `Future` with a `Display` value that will be inserted into all logs
/// created by this Future.
pub fn context_future<T, F>(context: T, future: F) -> ContextualFuture<T, F>
where
    T: fmt::Display + 'static,
    F: Future,
{
    ContextualFuture {
        context,
        future: Some(future),
    }
}

/// Wrap `task::LazyExecutor` to spawn futures that have a reference to the `Display`
/// value, inserting it into all logs created by this future.
pub fn context_executor(context: T) -> ContextualExecutor { ContextualExecutor { context: Arc::new(context), } } #[derive(Debug)] pub struct ContextualFuture { context: T, future: Option, } impl Future for ContextualFuture where T: fmt::Display + 'static, F: Future, { type Item = F::Item; type Error = F::Error; fn poll(&mut self) -> Poll { let ctxt = &self.context; let fut = self.future.as_mut().expect("poll after drop"); context(ctxt, || fut.poll()) } } impl Drop for ContextualFuture where T: fmt::Display + 'static, F: Future, { fn drop(&mut self) { if self.future.is_some() { let ctxt = &self.context; let fut = &mut self.future; context(ctxt, || drop(fut.take())) } } } #[derive(Debug)] pub struct ContextualExecutor { context: Arc, } impl ::tokio::executor::Executor for ContextualExecutor where T: fmt::Display + 'static + Send + Sync, { fn spawn( &mut self, future: Box + 'static + Send> ) -> ::std::result::Result<(), ::tokio::executor::SpawnError> { let fut = context_future(self.context.clone(), future); ::task::LazyExecutor.spawn(Box::new(fut)) } } impl Executor for ContextualExecutor where T: fmt::Display + 'static + Send + Sync, F: Future + 'static + Send, { fn execute(&self, future: F) -> ::std::result::Result<(), ExecuteError> { let fut = context_future(self.context.clone(), future); match ::task::LazyExecutor.execute(fut) { Ok(()) => Ok(()), Err(err) => { let kind = err.kind(); let mut future = err.into_future(); Err(ExecuteError::new(kind, future.future.take().expect("future"))) } } } } impl Clone for ContextualExecutor { fn clone(&self) -> Self { Self { context: self.context.clone(), } } } struct Context<'a>(&'a [*const fmt::Display]); impl<'a> fmt::Display for Context<'a> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { if self.0.is_empty() { return Ok(()); } for item in self.0 { // See `fn context()` for comments about this unsafe. 
let item = unsafe { &**item }; write!(f, "{} ", item)?; } Ok(()) } } /// Guards that the pushed context is removed from TLS afterwards. /// /// Specifically, this protects even if the passed function panics, /// as destructors are run while unwinding. struct ContextGuard<'a>(&'a (fmt::Display + 'static)); impl<'a> ContextGuard<'a> { fn new(context: &'a (fmt::Display + 'static)) -> Self { // This is a raw pointer because of lifetime conflicts that require // the thread local to have a static lifetime. // // We don't want to require a static lifetime, and in fact, // only use the reference within this closure, so converting // to a raw pointer is safe. let raw = context as *const fmt::Display; CONTEXT.with(|ctxt| { ctxt.borrow_mut().push(raw); }); ContextGuard(context) } } impl<'a> Drop for ContextGuard<'a> { fn drop(&mut self) { CONTEXT.with(|ctxt| { ctxt.borrow_mut().pop(); }); } } pub fn admin() -> Section { Section::Admin } pub fn proxy() -> Section { Section::Proxy } #[derive(Copy, Clone)] pub enum Section { Proxy, Admin, } /// A utility for logging actions taken on behalf of a server task. #[derive(Clone)] pub struct Server { section: Section, name: &'static str, listen: SocketAddr, remote: Option, } /// A utility for logging actions taken on behalf of a client task. #[derive(Clone)] pub struct Client { section: Section, client: C, dst: D, protocol: Option<::bind::Protocol>, remote: Option, } /// A utility for logging actions taken on behalf of a background task. 
#[derive(Clone)] pub struct Bg { section: Section, name: &'static str, } impl Section { pub fn bg(&self, name: &'static str) -> Bg { Bg { section: *self, name, } } pub fn server(&self, name: &'static str, listen: SocketAddr) -> Server { Server { section: *self, name, listen, remote: None, } } pub fn client(&self, client: C, dst: D) -> Client { Client { section: *self, client, dst, protocol: None, remote: None, } } } impl fmt::Display for Section { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match *self { Section::Proxy => "proxy".fmt(f), Section::Admin => "admin".fmt(f), } } } pub type BgFuture = ContextualFuture; pub type ClientExecutor = ContextualExecutor>; pub type ServerExecutor = ContextualExecutor; pub type ServerFuture = ContextualFuture; impl Server { pub fn proxy(ctx: &::ctx::Proxy, listen: SocketAddr) -> Self { let name = if ctx.is_inbound() { "in" } else { "out" }; Section::Proxy.server(name, listen) } pub fn with_remote(self, remote: SocketAddr) -> Self { Self { remote: Some(remote), .. self } } pub fn executor(self) -> ServerExecutor { context_executor(self) } pub fn future(self, f: F) -> ServerFuture { context_future(self, f) } } impl fmt::Display for Server { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "{}={{server={} listen={}", self.section, self.name, self.listen)?; if let Some(remote) = self.remote { write!(f, " remote={}", remote)?; } write!(f, "}}") } } impl Client<&'static str, D> { pub fn proxy(ctx: &::ctx::Proxy, dst: D) -> Self { let name = if ctx.is_inbound() { "in" } else { "out" }; Section::Proxy.client(name, dst) } } impl Client { pub fn with_protocol(self, p: ::bind::Protocol) -> Self { Self { protocol: Some(p), .. self } } pub fn with_remote(self, remote: SocketAddr) -> Self { Self { remote: Some(remote), .. 
self } } pub fn executor(self) -> ClientExecutor { context_executor(self) } } impl fmt::Display for Client { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "{}={{client={} dst={}", self.section, self.client, self.dst)?; if let Some(ref proto) = self.protocol { write!(f, " proto={:?}", proto)?; } if let Some(remote) = self.remote { write!(f, " remote={}", remote)?; } write!(f, "}}") } } impl Bg { pub fn future(self, f: F) -> BgFuture { context_future(self, f) } } impl fmt::Display for Bg { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "{}={{bg={}}}", self.section, self.name) } }