proxy: Fix missing logging contexts on inbound/outbound (#1025)

Changes to `BoundPort::listen_and_fold` inadvertently broke the 
`::logging::context_future`s on the `serve` futures for the Inbound and 
outbound proxies, leading to log messages that didn't have the appropriate
context. This fixes that.

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
This commit is contained in:
Eliza Weisman 2018-05-25 16:31:33 -07:00 committed by GitHub
parent 7764e97a25
commit 4cca72fb92
2 changed files with 24 additions and 21 deletions

View File

@ -53,7 +53,10 @@ use std::thread;
use std::time::Duration;
use indexmap::IndexSet;
use tokio::{executor, runtime::current_thread};
use tokio::{
executor::{self, DefaultExecutor, Executor},
runtime::current_thread,
};
use tower_service::NewService;
use tower_fn::*;
use conduit_proxy_router::{Recognize, Router, Error as RouteError};
@ -242,7 +245,8 @@ where
config.inbound_router_capacity,
config.inbound_router_max_idle_age,
);
let fut = serve(
serve(
"inbound",
inbound_listener,
router,
config.private_connect_timeout,
@ -251,8 +255,7 @@ where
sensors.clone(),
get_original_dst.clone(),
drain_rx.clone(),
);
::logging::context_future("inbound", fut)
)
};
// Setup the private listener. This will listen on a locally accessible
@ -266,7 +269,8 @@ where
config.outbound_router_capacity,
config.outbound_router_max_idle_age,
);
let fut = serve(
serve(
"outbound",
outbound_listener,
router,
config.public_connect_timeout,
@ -275,8 +279,7 @@ where
sensors,
get_original_dst,
drain_rx,
);
::logging::context_future("outbound", fut)
)
};
trace!("running");
@ -333,6 +336,7 @@ where
}
fn serve<R, B, E, F, G>(
name: &'static str,
bound_port: BoundPort,
router: Router<R>,
tcp_connect_timeout: Duration,
@ -410,10 +414,15 @@ where
let accept = bound_port.listen_and_fold(
(),
move |(), (connection, remote_addr)| {
server.serve(connection, remote_addr);
Ok(())
let s = server.serve(connection, remote_addr);
let s = ::logging::context_future((name, remote_addr), s);
let r = DefaultExecutor::current()
.spawn(Box::new(s))
.map_err(task::Error::into_io);
future::result(r)
},
);
let accept = ::logging::context_future(name, accept);
let accept_until = Cancelable {
future: accept,

View File

@ -7,7 +7,6 @@ use futures::{future::Either, Future};
use http;
use hyper;
use indexmap::IndexSet;
use tokio::executor::{Executor, DefaultExecutor};
use tokio::io::{AsyncRead, AsyncWrite};
use tower_service::NewService;
use tower_h2;
@ -104,7 +103,9 @@ where
/// what protocol the connection is speaking. From there, the connection
/// will be mapped into respective services, and spawned into an
/// executor.
pub fn serve(&self, connection: Connection, remote_addr: SocketAddr) {
pub fn serve(&self, connection: Connection, remote_addr: SocketAddr)
-> impl Future<Item=(), Error=()>
{
let opened_at = Instant::now();
// create Server context
@ -138,10 +139,7 @@ where
self.drain_signal.clone(),
);
DefaultExecutor::current()
.spawn(Box::new(fut))
.expect("spawn TCP server task");
return;
return Either::B(fut);
}
// try to sniff protocol
@ -150,7 +148,7 @@ where
let tcp = self.tcp.clone();
let new_service = self.new_service.clone();
let drain_signal = self.drain_signal.clone();
let fut = io.peek()
Either::A(io.peek()
.map_err(|e| debug!("peek error: {}", e))
.and_then(move |io| {
if let Some(proto) = Protocol::detect(io.peeked()) {
@ -196,11 +194,7 @@ where
drain_signal,
))
}
});
DefaultExecutor::current()
.spawn(Box::new(fut))
.expect("spawn transparent server task")
}))
}
}