proxy: Add rich logging contexts (#1037)

While debugging proxy issues, I found it necessary to change how logging contexts are
instrumented, especially for clients.

This change moves away from using `Debug` types to in favor of `Display` types.
Furthermore, the `logging` module now provides a uniform set of logging contexts to be
used throughout the application.  All clients, servers, and background tasks should now be
instrumented so that their log messages contain predictable metadata.

Some small improvements have been made to ensure that logging contexts are correct
when a `Future` is dropped (which is important for some H2 uses, especially).
This commit is contained in:
Oliver Gould 2018-05-30 13:41:59 -07:00 committed by GitHub
parent 91075e7d32
commit 30ae471dda
12 changed files with 497 additions and 149 deletions

View File

@ -2,6 +2,7 @@ use std::error::Error;
use std::fmt;
use std::default::Default;
use std::marker::PhantomData;
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
@ -132,7 +133,11 @@ pub type HttpResponse = http::Response<sensor::http::ResponseBody<HttpBody>>;
pub type HttpRequest<B> = http::Request<sensor::http::RequestBody<B>>;
pub type Client<B> = transparency::Client<sensor::Connect<transport::Connect>, B>;
pub type Client<B> = transparency::Client<
sensor::Connect<transport::Connect>,
::logging::ClientExecutor<&'static str, SocketAddr>,
B,
>;
#[derive(Copy, Clone, Debug)]
pub enum BufferSpawnError {
@ -218,9 +223,12 @@ where
&client_ctx,
);
let log = ::logging::Client::proxy(&self.ctx, addr)
.with_protocol(protocol.clone());
let client = transparency::Client::new(
protocol,
connect,
log.executor(),
);
let sensors = self.sensors.http(
@ -376,11 +384,15 @@ where
let ready = match self.binding {
// A service is already bound, so poll its readiness.
Binding::Bound(ref mut svc) |
Binding::BindsPerRequest { next: Some(ref mut svc) } => svc.poll_ready(),
Binding::BindsPerRequest { next: Some(ref mut svc) } => {
trace!("poll_ready: stack already bound");
svc.poll_ready()
}
// If no stack has been bound, bind it now so that its readiness can be
// checked. Store it so it can be consumed to dispatch the next request.
Binding::BindsPerRequest { ref mut next } => {
trace!("poll_ready: binding stack");
let mut svc = self.bind.bind_stack(&self.endpoint, &self.protocol);
let ready = svc.poll_ready();
*next = Some(svc);
@ -400,12 +412,16 @@ where
if !self.debounce_connect_error_log {
self.debounce_connect_error_log = true;
warn!("connect error to {:?}: {}", self.endpoint, err);
} else {
debug!("connect error to {:?}: {}", self.endpoint, err);
}
match self.binding {
Binding::Bound(ref mut svc) => {
trace!("poll_ready: binding stack after error");
*svc = self.bind.bind_stack(&self.endpoint, &self.protocol);
},
Binding::BindsPerRequest { ref mut next } => {
trace!("poll_ready: dropping bound stack after error");
next.take();
}
}
@ -425,6 +441,7 @@ where
// don't debounce on NotReady...
Ok(Async::NotReady) => Ok(Async::NotReady),
other => {
trace!("poll_ready: ready for business");
self.debounce_connect_error_log = false;
other
},

View File

@ -32,7 +32,6 @@ use control::{
AddOrigin, Backoff, LogErrors
};
use dns::{self, IpAddrListFuture};
use task::LazyExecutor;
use telemetry::metrics::DstLabels;
use transport::{DnsNameAndPort, HostAndPort, LookupAddressAndConnect};
use timeout::Timeout;
@ -78,18 +77,18 @@ pub(super) fn task(
{
// Build up the Controller Client Stack
let mut client = {
let ctx = ("controller-client", format!("{:?}", host_and_port));
let scheme = http::uri::Scheme::from_shared(Bytes::from_static(b"http")).unwrap();
let authority = http::uri::Authority::from(&host_and_port);
let connect = Timeout::new(
LookupAddressAndConnect::new(host_and_port, dns_resolver.clone()),
LookupAddressAndConnect::new(host_and_port.clone(), dns_resolver.clone()),
Duration::from_secs(3),
);
let log = ::logging::admin().client("control", host_and_port.clone());
let h2_client = tower_h2::client::Connect::new(
connect,
h2::client::Builder::default(),
::logging::context_executor(ctx, LazyExecutor),
log.executor()
);
let reconnect = Reconnect::new(h2_client);

View File

@ -47,7 +47,7 @@ impl<S> Backoff<S> {
impl<S> Service for Backoff<S>
where
S: Service,
S::Error: ::std::fmt::Debug,
S::Error: fmt::Debug,
{
type Request = S::Request;
type Response = S::Response;

View File

@ -10,7 +10,6 @@ use conduit_proxy_router::Recognize;
use bind;
use ctx;
use task::LazyExecutor;
type Bind<B> = bind::Bind<Arc<ctx::Proxy>, B>;
@ -88,7 +87,10 @@ where
let endpoint = (*addr).into();
let binding = self.bind.bind_service(&endpoint, proto);
Buffer::new(binding, &LazyExecutor)
let log = ::logging::proxy().client("in", "local")
.with_remote(*addr);
Buffer::new(binding, &log.executor())
.map(|buffer| {
InFlightLimit::new(buffer, MAX_IN_FLIGHT)
})

View File

@ -219,7 +219,7 @@ where
panic!("invalid DNS configuration: {:?}", e);
});
let (control, control_bg) = control::destination::new(
let (resolver, resolver_bg) = control::destination::new(
dns_resolver.clone(),
config.pod_namespace.clone(),
control_host_and_port
@ -245,7 +245,6 @@ where
config.inbound_router_max_idle_age,
);
serve(
"inbound",
inbound_listener,
router,
config.private_connect_timeout,
@ -264,12 +263,11 @@ where
let ctx = ctx::Proxy::outbound(&process_ctx);
let bind = bind.clone().with_ctx(ctx.clone());
let router = Router::new(
Outbound::new(bind, control, config.bind_timeout),
Outbound::new(bind, resolver, config.bind_timeout),
config.outbound_router_capacity,
config.outbound_router_max_idle_age,
);
serve(
"outbound",
outbound_listener,
router,
config.public_connect_timeout,
@ -283,35 +281,32 @@ where
trace!("running");
let (_tx, controller_shutdown_signal) = futures::sync::oneshot::channel::<()>();
let (_tx, admin_shutdown_signal) = futures::sync::oneshot::channel::<()>();
{
thread::Builder::new()
.name("controller-client".into())
.name("admin".into())
.spawn(move || {
use conduit_proxy_controller_grpc::tap::server::TapServer;
let mut rt = current_thread::Runtime::new()
.expect("initialize controller-client thread runtime");
.expect("initialize admin thread runtime");
let new_service = TapServer::new(observe);
let tap = serve_tap(control_listener, TapServer::new(observe));
let server = serve_control(control_listener, new_service);
let metrics_server = telemetry.serve_metrics(metrics_listener);
let metrics_server = telemetry
.serve_metrics(metrics_listener);
let fut = control_bg.join4(
server.map_err(|_| {}),
telemetry,
let fut = ::logging::admin().bg("resolver").future(resolver_bg)
.join4(
::logging::admin().bg("telemetry").future(telemetry),
tap.map_err(|_| {}),
metrics_server.map_err(|_| {}),
).map(|_| {});
let fut = ::logging::context_future("controller-client", fut);
rt.spawn(Box::new(fut));
let shutdown = controller_shutdown_signal.then(|_| Ok::<(), ()>(()));
rt.block_on(shutdown).expect("controller api");
trace!("controller client shutdown finished");
let shutdown = admin_shutdown_signal.then(|_| Ok::<(), ()>(()));
rt.block_on(shutdown).expect("admin");
trace!("admin shutdown finished");
})
.expect("initialize controller api thread");
trace!("controller client thread spawned");
@ -335,7 +330,6 @@ where
}
fn serve<R, B, E, F, G>(
name: &'static str,
bound_port: BoundPort,
router: Router<R>,
tcp_connect_timeout: Duration,
@ -400,7 +394,7 @@ where
let listen_addr = bound_port.local_addr();
let server = Server::new(
listen_addr,
proxy_ctx,
proxy_ctx.clone(),
sensors,
get_orig_dst,
stack,
@ -408,20 +402,22 @@ where
disable_protocol_detection_ports,
drain_rx.clone(),
);
let log = server.log().clone();
let accept = bound_port.listen_and_fold(
let accept = {
let fut = bound_port.listen_and_fold(
(),
move |(), (connection, remote_addr)| {
let s = server.serve(connection, remote_addr);
let s = ::logging::context_future((name, remote_addr), s);
// Logging context is configured by the server.
let r = DefaultExecutor::current()
.spawn(Box::new(s))
.map_err(task::Error::into_io);
future::result(r)
},
);
let accept = ::logging::context_future(name, accept);
log.future(fut)
};
let accept_until = Cancelable {
future: accept,
@ -460,7 +456,7 @@ where
}
}
fn serve_control<N, B>(
fn serve_tap<N, B>(
bound_port: BoundPort,
new_service: N,
) -> impl Future<Item = (), Error = io::Error> + 'static
@ -475,28 +471,36 @@ where
tower_h2::server::Connection<
connection::Connection,
N,
task::LazyExecutor,
::logging::ServerExecutor,
B,
()
>: Future<Item = ()>,
{
let log = logging::admin().server("tap", bound_port.local_addr());
let h2_builder = h2::server::Builder::default();
let server = tower_h2::Server::new(
new_service,
h2_builder,
task::LazyExecutor
log.clone().executor(),
);
let fut = {
let log = log.clone();
bound_port.listen_and_fold(
server,
move |server, (session, _)| {
let s = server.serve(session).map_err(|_| ());
let s = ::logging::context_future("serve_control", s);
move |server, (session, remote)| {
let log = log.clone().with_remote(remote);
let serve = server.serve(session).map_err(|_| ());
let r = executor::current_thread::TaskExecutor::current()
.spawn_local(Box::new(s))
.spawn_local(Box::new(log.future(serve)))
.map(move |_| server)
.map_err(task::Error::into_io);
future::result(r)
},
)
};
log.future(fut)
}

View File

@ -2,6 +2,7 @@ use std::cell::RefCell;
use std::env;
use std::io::Write;
use std::fmt;
use std::net::SocketAddr;
use std::sync::Arc;
use env_logger;
@ -12,7 +13,7 @@ use log::{Level};
const ENV_LOG: &str = "CONDUIT_PROXY_LOG";
thread_local! {
static CONTEXT: RefCell<Vec<*const fmt::Debug>> = RefCell::new(Vec::new());
static CONTEXT: RefCell<Vec<*const fmt::Display>> = RefCell::new(Vec::new());
}
pub fn init() {
@ -28,10 +29,10 @@ pub fn init() {
};
writeln!(
fmt,
"{} {} {:?}{}",
"{} {}{} {}",
level,
record.target(),
Context(&ctxt.borrow()),
record.target(),
record.args()
)
})
@ -40,43 +41,42 @@ pub fn init() {
.init();
}
/// Execute a closure with a `Debug` item attached to allow log messages.
/// Execute a closure with a `Display` item attached to allow log messages.
pub fn context<T, F, U>(context: &T, mut closure: F) -> U
where
T: ::std::fmt::Debug + 'static,
T: fmt::Display + 'static,
F: FnMut() -> U,
{
let _guard = ContextGuard::new(context);
closure()
}
/// Wrap a `Future` with a `Debug` value that will be inserted into all logs
/// Wrap a `Future` with a `Display` value that will be inserted into all logs
/// created by this Future.
pub fn context_future<T, F>(context: T, future: F) -> ContextualFuture<T, F> {
pub fn context_future<T: fmt::Display, F: Future>(context: T, future: F) -> ContextualFuture<T, F> {
ContextualFuture {
context,
future,
future: Some(future),
}
}
/// Wrap an `Executor` to spawn futures that have a reference to the `Debug`
/// Wrap `task::LazyExecutor` to spawn futures that have a reference to the `Display`
/// value, inserting it into all logs created by this future.
pub fn context_executor<T, E>(context: T, executor: E) -> ContextualExecutor<T, E> {
pub fn context_executor<T: fmt::Display>(context: T) -> ContextualExecutor<T> {
ContextualExecutor {
context: Arc::new(context),
executor,
}
}
#[derive(Debug)]
pub struct ContextualFuture<T, F> {
pub struct ContextualFuture<T: fmt::Display + 'static, F: Future> {
context: T,
future: F,
future: Option<F>,
}
impl<T, F> Future for ContextualFuture<T, F>
where
T: ::std::fmt::Debug + 'static,
T: fmt::Display + 'static,
F: Future,
{
type Item = F::Item;
@ -84,39 +84,71 @@ where
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let ctxt = &self.context;
let fut = &mut self.future;
let fut = self.future.as_mut().expect("poll after drop");
context(ctxt, || fut.poll())
}
}
#[derive(Clone, Debug)]
pub struct ContextualExecutor<T, E> {
context: Arc<T>,
executor: E,
impl<T, F> Drop for ContextualFuture<T, F>
where
T: fmt::Display + 'static,
F: Future,
{
fn drop(&mut self) {
if self.future.is_some() {
let ctxt = &self.context;
let fut = &mut self.future;
context(ctxt, || drop(fut.take()))
}
}
}
impl<T, E, F> Executor<F> for ContextualExecutor<T, E>
#[derive(Debug)]
pub struct ContextualExecutor<T> {
context: Arc<T>,
}
impl<T> ::tokio::executor::Executor for ContextualExecutor<T>
where
T: ::std::fmt::Debug + 'static,
E: Executor<ContextualFuture<Arc<T>, F>>,
F: Future<Item = (), Error = ()>,
T: fmt::Display + 'static + Send + Sync,
{
fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
fn spawn(
&mut self,
future: Box<Future<Item = (), Error = ()> + 'static + Send>
) -> ::std::result::Result<(), ::tokio::executor::SpawnError> {
let fut = context_future(self.context.clone(), future);
match self.executor.execute(fut) {
::task::LazyExecutor.spawn(Box::new(fut))
}
}
impl<T, F> Executor<F> for ContextualExecutor<T>
where
T: fmt::Display + 'static + Send + Sync,
F: Future<Item = (), Error = ()> + 'static + Send,
{
fn execute(&self, future: F) -> ::std::result::Result<(), ExecuteError<F>> {
let fut = context_future(self.context.clone(), future);
match ::task::LazyExecutor.execute(fut) {
Ok(()) => Ok(()),
Err(err) => {
let kind = err.kind();
let future = err.into_future();
Err(ExecuteError::new(kind, future.future))
let mut future = err.into_future();
Err(ExecuteError::new(kind, future.future.take().expect("future")))
}
}
}
}
struct Context<'a>(&'a [*const fmt::Debug]);
impl<T> Clone for ContextualExecutor<T> {
fn clone(&self) -> Self {
Self {
context: self.context.clone(),
}
}
}
impl<'a> fmt::Debug for Context<'a> {
struct Context<'a>(&'a [*const fmt::Display]);
impl<'a> fmt::Display for Context<'a> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
if self.0.is_empty() {
return Ok(());
@ -125,8 +157,7 @@ impl<'a> fmt::Debug for Context<'a> {
for item in self.0 {
// See `fn context()` for comments about this unsafe.
let item = unsafe { &**item };
item.fmt(f)?;
f.write_str(", ")?;
write!(f, "{} ", item)?;
}
Ok(())
}
@ -136,17 +167,17 @@ impl<'a> fmt::Debug for Context<'a> {
///
/// Specifically, this protects even if the passed function panics,
/// as destructors are run while unwinding.
struct ContextGuard<'a>(&'a (fmt::Debug + 'static));
struct ContextGuard<'a>(&'a (fmt::Display + 'static));
impl<'a> ContextGuard<'a> {
fn new(context: &'a (fmt::Debug + 'static)) -> Self {
fn new(context: &'a (fmt::Display + 'static)) -> Self {
// This is a raw pointer because of lifetime conflicts that require
// the thread local to have a static lifetime.
//
// We don't want to require a static lifetime, and in fact,
// only use the reference within this closure, so converting
// to a raw pointer is safe.
let raw = context as *const fmt::Debug;
let raw = context as *const fmt::Display;
CONTEXT.with(|ctxt| {
ctxt.borrow_mut().push(raw);
});
@ -161,3 +192,177 @@ impl<'a> Drop for ContextGuard<'a> {
});
}
}
pub fn admin() -> Section {
Section::Admin
}
pub fn proxy() -> Section {
Section::Proxy
}
#[derive(Copy, Clone)]
pub enum Section {
Proxy,
Admin,
}
/// A utility for logging actions taken on behalf of a server task.
#[derive(Clone)]
pub struct Server {
section: Section,
name: &'static str,
listen: SocketAddr,
remote: Option<SocketAddr>,
}
/// A utility for logging actions taken on behalf of a client task.
#[derive(Clone)]
pub struct Client<C: fmt::Display, D: fmt::Display> {
section: Section,
client: C,
dst: D,
protocol: Option<::bind::Protocol>,
remote: Option<SocketAddr>,
}
/// A utility for logging actions taken on behalf of a background task.
#[derive(Clone)]
pub struct Bg {
section: Section,
name: &'static str,
}
impl Section {
pub fn bg(&self, name: &'static str) -> Bg {
Bg {
section: *self,
name,
}
}
pub fn server(&self, name: &'static str, listen: SocketAddr) -> Server {
Server {
section: *self,
name,
listen,
remote: None,
}
}
pub fn client<C: fmt::Display, D: fmt::Display>(&self, client: C, dst: D) -> Client<C, D> {
Client {
section: *self,
client,
dst,
protocol: None,
remote: None,
}
}
}
impl fmt::Display for Section {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Section::Proxy => "proxy".fmt(f),
Section::Admin => "admin".fmt(f),
}
}
}
pub type BgFuture<F> = ContextualFuture<Bg, F>;
pub type ClientExecutor<C, D> = ContextualExecutor<Client<C, D>>;
pub type ServerExecutor = ContextualExecutor<Server>;
pub type ServerFuture<F> = ContextualFuture<Server, F>;
impl Server {
pub fn proxy(ctx: &::ctx::Proxy, listen: SocketAddr) -> Self {
let name = if ctx.is_inbound() {
"in"
} else {
"out"
};
Section::Proxy.server(name, listen)
}
pub fn with_remote(self, remote: SocketAddr) -> Self {
Self {
remote: Some(remote),
.. self
}
}
pub fn executor(self) -> ServerExecutor {
context_executor(self)
}
pub fn future<F: Future>(self, f: F) -> ServerFuture<F> {
context_future(self, f)
}
}
impl fmt::Display for Server {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}={{server={} listen={}", self.section, self.name, self.listen)?;
if let Some(remote) = self.remote {
write!(f, " remote={}", remote)?;
}
write!(f, "}}")
}
}
impl<D: fmt::Display> Client<&'static str, D> {
pub fn proxy(ctx: &::ctx::Proxy, dst: D) -> Self {
let name = if ctx.is_inbound() {
"in"
} else {
"out"
};
Section::Proxy.client(name, dst)
}
}
impl<C: fmt::Display, D: fmt::Display> Client<C, D> {
pub fn with_protocol(self, p: ::bind::Protocol) -> Self {
Self {
protocol: Some(p),
.. self
}
}
pub fn with_remote(self, remote: SocketAddr) -> Self {
Self {
remote: Some(remote),
.. self
}
}
pub fn executor(self) -> ClientExecutor<C, D> {
context_executor(self)
}
}
impl<C: fmt::Display, D: fmt::Display> fmt::Display for Client<C, D> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}={{client={} dst={}", self.section, self.client, self.dst)?;
if let Some(ref proto) = self.protocol {
write!(f, " proto={:?}", proto)?;
}
if let Some(remote) = self.remote {
write!(f, " remote={}", remote)?;
}
write!(f, "}}")
}
}
impl Bg {
pub fn future<F: Future>(self, f: F) -> BgFuture<F> {
context_future(self, f)
}
}
impl fmt::Display for Bg {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}={{bg={}}}", self.section, self.name)
}
}

View File

@ -16,7 +16,6 @@ use conduit_proxy_router::Recognize;
use bind::{self, Bind, Protocol};
use control::destination::{self, Bind as BindTrait, Resolution};
use ctx;
use task::LazyExecutor;
use timeout::Timeout;
use transparency::h1;
use transport::{DnsNameAndPort, Host, HostAndPort};
@ -150,19 +149,19 @@ where
let loaded = tower_balance::load::WithPendingRequests::new(resolve);
// We can't use `rand::thread_rng` here because the returned `Service`
// needs to be `Send`, so instead, we use `LazyRng`, which calls
// `rand::thread_rng()` when it is *used*.
let balance = tower_balance::power_of_two_choices(loaded, LazyThreadRng);
let buffer = Buffer::new(balance, &LazyExecutor)
let log = ::logging::proxy().client("out", Dst(dest.clone()))
.with_protocol(protocol.clone());
let buffer = Buffer::new(balance, &log.executor())
.map_err(|_| bind::BufferSpawnError::Outbound)?;
let timeout = Timeout::new(buffer, self.bind_timeout);
Ok(InFlightLimit::new(timeout, MAX_IN_FLIGHT))
}
}
@ -195,7 +194,7 @@ where
// closing down when the connection is no longer usable.
if let Some((addr, bind)) = opt.take() {
let svc = bind.bind(&addr.into())
.map_err(|_| BindError::External{ addr })?;
.map_err(|_| BindError::External { addr })?;
Ok(Async::Ready(Change::Insert(addr, svc)))
} else {
Ok(Async::NotReady)
@ -232,3 +231,16 @@ impl error::Error for BindError {
fn cause(&self) -> Option<&error::Error> { None }
}
struct Dst(Destination);
impl fmt::Display for Dst {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self.0 {
Destination::Hostname(ref name) => {
write!(f, "{}:{}", name.host, name.port)
}
Destination::ImplicitOriginalDst(ref addr) => addr.fmt(f),
}
}
}

View File

@ -33,6 +33,9 @@ use std::{
#[derive(Copy, Clone, Debug, Default)]
pub struct LazyExecutor;
#[derive(Copy, Clone, Debug, Default)]
pub struct BoxExecutor<E: TokioExecutor>(E);
/// Indicates which Tokio `Runtime` should be used for the main proxy.
///
/// This is either a `tokio::runtime::current_thread::Runtime`, or a
@ -104,6 +107,53 @@ where
}
}
// ===== impl BoxExecutor =====;
impl<E: TokioExecutor> BoxExecutor<E> {
pub fn new(e: E) -> Self {
BoxExecutor(e)
}
}
impl<E: TokioExecutor> TokioExecutor for BoxExecutor<E> {
fn spawn(
&mut self,
future: Box<Future<Item = (), Error = ()> + 'static + Send>
) -> Result<(), SpawnError> {
self.0.spawn(future)
}
fn status(&self) -> Result<(), SpawnError> {
self.0.status()
}
}
impl<F, E> Executor<F> for BoxExecutor<E>
where
F: Future<Item = (), Error = ()> + 'static + Send,
E: TokioExecutor,
E: Executor<Box<Future<Item = (), Error = ()> + Send + 'static>>,
{
fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
// Check the status of the executor first, so that we can return the
// future in the `ExecuteError`. If we just called `spawn` and
// `map_err`ed the error into an `ExecuteError`, we'd have to move the
// future into the closure, but it was already moved into `spawn`.
if let Err(e) = self.0.status() {
if e.is_at_capacity() {
return Err(ExecuteError::new(ExecuteErrorKind::NoCapacity, future));
} else if e.is_shutdown() {
return Err(ExecuteError::new(ExecuteErrorKind::Shutdown, future));
} else {
panic!("unexpected `SpawnError`: {:?}", e);
}
};
self.0.execute(Box::new(future))
.expect("spawn() errored but status() was Ok");
Ok(())
}
}
// ===== impl MainRuntime =====
impl MainRuntime {

View File

@ -96,17 +96,22 @@ impl Control {
-> impl Future<Item = (), Error = io::Error>
{
use hyper;
let log = ::logging::admin().server("metrics", bound_port.local_addr());
let service = self.metrics_service.clone();
let fut = {
let log = log.clone();
bound_port.listen_and_fold(
hyper::server::conn::Http::new(),
move |hyper, (conn, _)| {
move |hyper, (conn, remote)| {
let service = service.clone();
let serve = hyper.serve_connection(conn, service)
.map(|_| {})
.map_err(|e| {
error!("error serving prometheus metrics: {:?}", e);
});
let serve = ::logging::context_future("serve_metrics", serve);
let serve = log.clone().with_remote(remote).future(serve);
let r = TaskExecutor::current()
.spawn_local(Box::new(serve))
@ -115,6 +120,9 @@ impl Control {
future::result(r)
})
};
log.future(fut)
}
}

View File

@ -1,14 +1,15 @@
use bytes::IntoBuf;
use futures::{Async, Future, Poll};
use futures::{future, Async, Future, Poll};
use h2;
use http;
use hyper;
use tokio::executor::Executor;
use tokio_connect::Connect;
use tower_service::{Service, NewService};
use tower_h2;
use bind;
use task::LazyExecutor;
use task::BoxExecutor;
use telemetry::sensor::http::RequestBody;
use super::glue::{BodyStream, HttpBody, HyperConnect};
@ -16,75 +17,91 @@ type HyperClient<C, B> =
hyper::Client<HyperConnect<C>, BodyStream<RequestBody<B>>>;
/// A `NewService` that can speak either HTTP/1 or HTTP/2.
pub struct Client<C, B>
pub struct Client<C, E, B>
where
B: tower_h2::Body + 'static,
C: Connect + 'static,
E: Executor + Clone,
E: future::Executor<Box<Future<Item = (), Error = ()> + Send + 'static>> + Send + Sync + 'static,
{
inner: ClientInner<C, B>,
inner: ClientInner<C, E, B>,
}
enum ClientInner<C, B>
enum ClientInner<C, E, B>
where
B: tower_h2::Body + 'static,
C: Connect + 'static,
E: Executor + Clone,
E: future::Executor<Box<Future<Item = (), Error = ()> + Send + 'static>> + Send + Sync + 'static,
{
Http1(HyperClient<C, B>),
Http2(tower_h2::client::Connect<C, LazyExecutor, RequestBody<B>>),
Http2(tower_h2::client::Connect<C, BoxExecutor<E>, RequestBody<B>>),
}
/// A `Future` returned from `Client::new_service()`.
pub struct ClientNewServiceFuture<C, B>
pub struct ClientNewServiceFuture<C, E, B>
where
B: tower_h2::Body + 'static,
C: Connect + 'static,
E: Executor + Clone,
E: future::Executor<Box<Future<Item = (), Error = ()> + Send + 'static>> + Send + Sync + 'static,
{
inner: ClientNewServiceFutureInner<C, B>,
inner: ClientNewServiceFutureInner<C, E, B>,
}
enum ClientNewServiceFutureInner<C, B>
enum ClientNewServiceFutureInner<C, E, B>
where
B: tower_h2::Body + 'static,
C: Connect + 'static,
E: Executor + Clone,
E: future::Executor<Box<Future<Item = (), Error = ()> + Send + 'static>> + Send + Sync + 'static,
{
Http1(Option<HyperClient<C, B>>),
Http2(tower_h2::client::ConnectFuture<C, LazyExecutor, RequestBody<B>>),
Http2(tower_h2::client::ConnectFuture<C, BoxExecutor<E>, RequestBody<B>>),
}
/// The `Service` yielded by `Client::new_service()`.
pub struct ClientService<C, B>
pub struct ClientService<C, E, B>
where
B: tower_h2::Body + 'static,
C: Connect,
E: Executor + Clone,
E: future::Executor<Box<Future<Item = (), Error = ()> + Send + 'static>> + Send + Sync + 'static,
{
inner: ClientServiceInner<C, B>,
inner: ClientServiceInner<C, E, B>,
}
enum ClientServiceInner<C, B>
enum ClientServiceInner<C, E, B>
where
B: tower_h2::Body + 'static,
C: Connect
C: Connect,
E: Executor + Clone,
E: future::Executor<Box<Future<Item = (), Error = ()> + Send + 'static>> + Send + Sync + 'static,
{
Http1(HyperClient<C, B>),
Http2(tower_h2::client::Connection<
<C as Connect>::Connected,
LazyExecutor,
BoxExecutor<E>,
RequestBody<B>,
>),
}
impl<C, B> Client<C, B>
impl<C, E, B> Client<C, E, B>
where
C: Connect + Clone + Send + Sync + 'static,
C::Future: Send + 'static,
C::Connected: Send,
E: Executor + Clone,
E: future::Executor<Box<Future<Item = (), Error = ()> + Send + 'static>> + Send + Sync + 'static,
B: tower_h2::Body + Send + 'static,
<B::Data as IntoBuf>::Buf: Send + 'static,
{
/// Create a new `Client`, bound to a specific protocol (HTTP/1 or HTTP/2).
pub fn new(protocol: &bind::Protocol, connect: C) -> Self {
pub fn new(protocol: &bind::Protocol, connect: C, executor: E) -> Self {
match *protocol {
bind::Protocol::Http1 { was_absolute_form, .. } => {
let h1 = hyper::Client::builder()
.executor(LazyExecutor)
.executor(executor)
// hyper should never try to automatically set the Host
// header, instead always just passing whatever we received.
.set_host(false)
@ -98,7 +115,7 @@ where
// h2 currently doesn't handle PUSH_PROMISE that well, so we just
// disable it for now.
h2_builder.enable_push(false);
let h2 = tower_h2::client::Connect::new(connect, h2_builder, LazyExecutor);
let h2 = tower_h2::client::Connect::new(connect, h2_builder, BoxExecutor::new(executor));
Client {
inner: ClientInner::Http2(h2),
@ -108,11 +125,13 @@ where
}
}
impl<C, B> NewService for Client<C, B>
impl<C, E, B> NewService for Client<C, E, B>
where
C: Connect + Clone + Send + Sync + 'static,
C::Future: Send + 'static,
C::Connected: Send,
E: Executor + Clone,
E: future::Executor<Box<Future<Item = (), Error = ()> + Send + 'static>> + Send + Sync + 'static,
B: tower_h2::Body + Send + 'static,
<B::Data as IntoBuf>::Buf: Send + 'static,
{
@ -120,8 +139,8 @@ where
type Response = http::Response<HttpBody>;
type Error = tower_h2::client::Error;
type InitError = tower_h2::client::ConnectError<C::Error>;
type Service = ClientService<C, B>;
type Future = ClientNewServiceFuture<C, B>;
type Service = ClientService<C, E, B>;
type Future = ClientNewServiceFuture<C, E, B>;
fn new_service(&self) -> Self::Future {
let inner = match self.inner {
@ -138,15 +157,17 @@ where
}
}
impl<C, B> Future for ClientNewServiceFuture<C, B>
impl<C, E, B> Future for ClientNewServiceFuture<C, E, B>
where
C: Connect + Send + 'static,
C::Connected: Send,
C::Future: Send + 'static,
B: tower_h2::Body + Send + 'static,
E: Executor + Clone,
E: future::Executor<Box<Future<Item = (), Error = ()> + Send + 'static>> + Send + Sync + 'static,
<B::Data as IntoBuf>::Buf: Send + 'static,
{
type Item = ClientService<C, B>;
type Item = ClientService<C, E, B>;
type Error = tower_h2::client::ConnectError<C::Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
@ -165,11 +186,13 @@ where
}
}
impl<C, B> Service for ClientService<C, B>
impl<C, E, B> Service for ClientService<C, E, B>
where
C: Connect + Send + Sync + 'static,
C::Connected: Send,
C::Future: Send + 'static,
E: Executor + Clone,
E: future::Executor<Box<Future<Item = (), Error = ()> + Send + 'static>> + Send + Sync + 'static,
B: tower_h2::Body + Send + 'static,
<B::Data as IntoBuf>::Buf: Send + 'static,
{
@ -186,6 +209,8 @@ where
}
fn call(&mut self, req: Self::Request) -> Self::Future {
debug!("ClientService::call method={} uri={} headers={:?} ext={:?}",
req.method(), req.uri(), req.headers(), req.extensions());
match self.inner {
ClientServiceInner::Http1(ref h1) => {
let mut req = hyper::Request::from(req.map(BodyStream::new));

View File

@ -15,7 +15,6 @@ use connection::{Connection, Peek};
use ctx::Proxy as ProxyCtx;
use ctx::transport::{Server as ServerCtx};
use drain;
use task::LazyExecutor;
use telemetry::Sensors;
use transport::GetOriginalDst;
use super::glue::{HttpBody, HttpBodyNewSvc, HyperServerSvc};
@ -37,12 +36,17 @@ where
drain_signal: drain::Watch,
get_orig_dst: G,
h1: hyper::server::conn::Http,
h2: tower_h2::Server<HttpBodyNewSvc<S>, LazyExecutor, B>,
h2: tower_h2::Server<
HttpBodyNewSvc<S>,
::logging::ServerExecutor,
B
>,
listen_addr: SocketAddr,
new_service: S,
proxy_ctx: Arc<ProxyCtx>,
sensors: Sensors,
tcp: tcp::Proxy,
log: ::logging::Server,
}
impl<S, B, G> Server<S, B, G>
@ -79,6 +83,7 @@ where
) -> Self {
let recv_body_svc = HttpBodyNewSvc::new(stack.clone());
let tcp = tcp::Proxy::new(tcp_connect_timeout, sensors.clone());
let log = ::logging::Server::proxy(&proxy_ctx, listen_addr);
Server {
disable_protocol_detection_ports,
drain_signal,
@ -87,16 +92,21 @@ where
h2: tower_h2::Server::new(
recv_body_svc,
Default::default(),
LazyExecutor,
log.clone().executor(),
),
listen_addr,
new_service: stack,
proxy_ctx,
sensors,
tcp,
log,
}
}
pub fn log(&self) -> &::logging::Server {
&self.log
}
/// Handle a new connection.
///
/// This will peek on the connection for the first bytes to determine
@ -117,6 +127,8 @@ where
&remote_addr,
&orig_dst,
);
let log = self.log.clone()
.with_remote(remote_addr);
// record telemetry
let io = self.sensors.accept(connection, opened_at, &srv_ctx);
@ -139,7 +151,7 @@ where
self.drain_signal.clone(),
);
return Either::B(fut);
return log.future(Either::B(fut));
}
// try to sniff protocol
@ -148,7 +160,7 @@ where
let tcp = self.tcp.clone();
let new_service = self.new_service.clone();
let drain_signal = self.drain_signal.clone();
Either::A(io.peek()
let fut = Either::A(io.peek()
.map_err(|e| debug!("peek error: {}", e))
.and_then(move |io| {
if let Some(proto) = Protocol::detect(io.peeked()) {
@ -171,7 +183,6 @@ where
},
Protocol::Http2 => {
trace!("transparency detected HTTP/2");
let set_ctx = move |request: &mut http::Request<()>| {
request.extensions_mut().insert(srv_ctx.clone());
};
@ -194,7 +205,9 @@ where
drain_signal,
))
}
}))
}));
log.future(fut)
}
}

View File

@ -1,7 +1,7 @@
use futures::Future;
use tokio_connect;
use std::io;
use std::{fmt, io};
use std::net::{IpAddr, SocketAddr};
use std::str::FromStr;
@ -78,6 +78,19 @@ impl<'a> From<&'a HostAndPort> for http::uri::Authority {
}
}
impl fmt::Display for HostAndPort {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self.host {
Host::DnsName(ref dns) => {
write!(f, "{}:{}", dns, self.port)
}
Host::Ip(ref ip) => {
write!(f, "{}:{}", ip, self.port)
}
}
}
}
// ===== impl Connect =====
impl Connect {