proxy: wrap connections in Transport sensor before peeking (#851)

In case there are any errors while peeking the connection to do protocol
detection, the sensors will now be in place to detect them. Besides just
errors, this will also allow reporting about connections that are
accepted, but then immediately closed.

Additionally:

- add write_buf implementation for Transport sensor, can help
  performance for http1/http2
- add better logs for tcp connections errors
- add printlns for when tests fail

Signed-off-by: Sean McArthur <sean@seanmonstar.com>
This commit is contained in:
Sean McArthur 2018-04-27 14:18:23 -07:00 committed by GitHub
parent 6bcc8b25fb
commit 2e296dbf69
11 changed files with 144 additions and 113 deletions

View File

@ -13,7 +13,6 @@ use tower;
use tower_h2; use tower_h2;
use tower_reconnect::Reconnect; use tower_reconnect::Reconnect;
use conduit_proxy_controller_grpc;
use conduit_proxy_router::Reuse; use conduit_proxy_router::Reuse;
use control; use control;
@ -185,7 +184,6 @@ where
let client_ctx = ctx::transport::Client::new( let client_ctx = ctx::transport::Client::new(
&self.ctx, &self.ctx,
&addr, &addr,
conduit_proxy_controller_grpc::common::Protocol::Http,
ep.dst_labels().cloned(), ep.dst_labels().cloned(),
); );

View File

@ -36,6 +36,17 @@ pub enum Connection {
Plain(PlaintextSocket), Plain(PlaintextSocket),
} }
/// A trait describing that a type can peek (such as MSG_PEEK).
pub trait Peek {
fn peek(&mut self, buf: &mut [u8]) -> io::Result<usize>;
}
/// A future of when some `Peek` fulfills with some bytes.
#[derive(Debug)]
pub struct PeekFuture<T, B> {
inner: Option<(T, B)>,
}
// ===== impl BoundPort ===== // ===== impl BoundPort =====
impl BoundPort { impl BoundPort {
@ -105,12 +116,6 @@ impl Connection {
self.socket().local_addr() self.socket().local_addr()
} }
pub fn peek_future<T: AsMut<[u8]>>(self, buf: T) -> Peek<T> {
Peek {
inner: Some((self, buf))
}
}
// This must never be made public so that in the future `Connection` can // This must never be made public so that in the future `Connection` can
// control access to the plaintext socket for TLS, to ensure no private // control access to the plaintext socket for TLS, to ensure no private
// data is accidentally writen to the socket and to ensure no unprotected // data is accidentally writen to the socket and to ensure no unprotected
@ -190,23 +195,37 @@ impl AsyncWrite for Connection {
} }
} }
// impl Peek impl Peek for Connection {
fn peek(&mut self, buf: &mut [u8]) -> io::Result<usize> {
use self::Connection::*;
pub struct Peek<T> { match *self {
inner: Option<(Connection, T)>, Plain(ref mut t) => t.peek(buf),
}
}
} }
impl<T: AsMut<[u8]>> Future for Peek<T> { // impl PeekFuture
type Item = (Connection, T, usize);
impl<T: Peek, B: AsMut<[u8]>> PeekFuture<T, B> {
pub fn new(io: T, buf: B) -> Self {
PeekFuture {
inner: Some((io, buf)),
}
}
}
impl<T: Peek, B: AsMut<[u8]>> Future for PeekFuture<T, B> {
type Item = (T, B, usize);
type Error = std::io::Error; type Error = std::io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let (conn, mut buf) = self.inner.take().expect("polled after completed"); let (mut io, mut buf) = self.inner.take().expect("polled after completed");
match conn.socket().peek(buf.as_mut()) { match io.peek(buf.as_mut()) {
Ok(n) => Ok(Async::Ready((conn, buf, n))), Ok(n) => Ok(Async::Ready((io, buf, n))),
Err(e) => match e.kind() { Err(e) => match e.kind() {
std::io::ErrorKind::WouldBlock => { std::io::ErrorKind::WouldBlock => {
self.inner = Some((conn, buf)); self.inner = Some((io, buf));
Ok(Async::NotReady) Ok(Async::NotReady)
}, },
_ => Err(e) _ => Err(e)

View File

@ -2,8 +2,6 @@ use std::{cmp, hash};
use std::net::{IpAddr, SocketAddr}; use std::net::{IpAddr, SocketAddr};
use std::sync::Arc; use std::sync::Arc;
use conduit_proxy_controller_grpc::common::Protocol;
use control::discovery::DstLabelsWatch; use control::discovery::DstLabelsWatch;
use ctx; use ctx;
@ -20,7 +18,6 @@ pub struct Server {
pub remote: SocketAddr, pub remote: SocketAddr,
pub local: SocketAddr, pub local: SocketAddr,
pub orig_dst: Option<SocketAddr>, pub orig_dst: Option<SocketAddr>,
pub protocol: Protocol,
} }
/// Identifies a connection from the proxy to another process. /// Identifies a connection from the proxy to another process.
@ -28,7 +25,6 @@ pub struct Server {
pub struct Client { pub struct Client {
pub proxy: Arc<ctx::Proxy>, pub proxy: Arc<ctx::Proxy>,
pub remote: SocketAddr, pub remote: SocketAddr,
pub protocol: Protocol,
pub dst_labels: Option<DstLabelsWatch>, pub dst_labels: Option<DstLabelsWatch>,
} }
@ -39,13 +35,6 @@ impl Ctx {
Ctx::Server(ref ctx) => &ctx.proxy, Ctx::Server(ref ctx) => &ctx.proxy,
} }
} }
pub fn protocol(&self) -> Protocol {
match *self {
Ctx::Client(ref ctx) => ctx.protocol,
Ctx::Server(ref ctx) => ctx.protocol,
}
}
} }
impl Server { impl Server {
@ -54,14 +43,12 @@ impl Server {
local: &SocketAddr, local: &SocketAddr,
remote: &SocketAddr, remote: &SocketAddr,
orig_dst: &Option<SocketAddr>, orig_dst: &Option<SocketAddr>,
protocol: Protocol,
) -> Arc<Server> { ) -> Arc<Server> {
let s = Server { let s = Server {
proxy: Arc::clone(proxy), proxy: Arc::clone(proxy),
local: *local, local: *local,
remote: *remote, remote: *remote,
orig_dst: *orig_dst, orig_dst: *orig_dst,
protocol: protocol,
}; };
Arc::new(s) Arc::new(s)
@ -95,13 +82,11 @@ impl Client {
pub fn new( pub fn new(
proxy: &Arc<ctx::Proxy>, proxy: &Arc<ctx::Proxy>,
remote: &SocketAddr, remote: &SocketAddr,
protocol: Protocol,
dst_labels: Option<DstLabelsWatch>, dst_labels: Option<DstLabelsWatch>,
) -> Arc<Client> { ) -> Arc<Client> {
let c = Client { let c = Client {
proxy: Arc::clone(proxy), proxy: Arc::clone(proxy),
remote: *remote, remote: *remote,
protocol,
dst_labels, dst_labels,
}; };
@ -113,7 +98,6 @@ impl hash::Hash for Client {
fn hash<H: hash::Hasher>(&self, state: &mut H) { fn hash<H: hash::Hasher>(&self, state: &mut H) {
self.proxy.hash(state); self.proxy.hash(state);
self.remote.hash(state); self.remote.hash(state);
self.protocol.hash(state);
// ignore dst_labels // ignore dst_labels
} }
} }
@ -121,8 +105,7 @@ impl hash::Hash for Client {
impl cmp::PartialEq for Client { impl cmp::PartialEq for Client {
fn eq(&self, other: &Self) -> bool { fn eq(&self, other: &Self) -> bool {
self.proxy.eq(&other.proxy) && self.proxy.eq(&other.proxy) &&
self.remote.eq(&other.remote) && self.remote.eq(&other.remote)
self.protocol.eq(&other.protocol)
} }
} }

View File

@ -94,7 +94,6 @@ mod tests {
use conduit_proxy_router::Recognize; use conduit_proxy_router::Recognize;
use super::Inbound; use super::Inbound;
use conduit_proxy_controller_grpc::common::Protocol;
use bind::{self, Bind, Host}; use bind::{self, Bind, Host};
use ctx; use ctx;
@ -114,7 +113,7 @@ mod tests {
let inbound = new_inbound(None, &ctx); let inbound = new_inbound(None, &ctx);
let srv_ctx = ctx::transport::Server::new(&ctx, &local, &remote, &Some(orig_dst), Protocol::Http); let srv_ctx = ctx::transport::Server::new(&ctx, &local, &remote, &Some(orig_dst));
let rec = srv_ctx.orig_dst_if_not_local().map(|addr| let rec = srv_ctx.orig_dst_if_not_local().map(|addr|
bind::Protocol::Http1(Host::NoAuthority).into_key(addr) bind::Protocol::Http1(Host::NoAuthority).into_key(addr)
@ -143,7 +142,6 @@ mod tests {
&local, &local,
&remote, &remote,
&None, &None,
Protocol::Http,
)); ));
inbound.recognize(&req) == default.map(|addr| inbound.recognize(&req) == default.map(|addr|
@ -179,7 +177,6 @@ mod tests {
&local, &local,
&remote, &remote,
&Some(local), &Some(local),
Protocol::Http,
)); ));
inbound.recognize(&req) == default.map(|addr| inbound.recognize(&req) == default.map(|addr|

View File

@ -1,10 +1,12 @@
use futures::{Future, Poll}; use bytes::Buf;
use futures::{Async, Future, Poll};
use std::io; use std::io;
use std::sync::Arc; use std::sync::Arc;
use std::time::Instant; use std::time::Instant;
use tokio_connect; use tokio_connect;
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use connection::Peek;
use ctx; use ctx;
use telemetry::event; use telemetry::event;
@ -132,7 +134,7 @@ impl<T: AsyncRead + AsyncWrite> io::Read for Transport<T> {
let bytes = self.sense_err(move |io| io.read(buf))?; let bytes = self.sense_err(move |io| io.read(buf))?;
if let Some(inner) = self.1.as_mut() { if let Some(inner) = self.1.as_mut() {
inner.rx_bytes += bytes as u64; inner.rx_bytes += bytes as u64;
} }
Ok(bytes) Ok(bytes)
@ -148,7 +150,7 @@ impl<T: AsyncRead + AsyncWrite> io::Write for Transport<T> {
let bytes = self.sense_err(move |io| io.write(buf))?; let bytes = self.sense_err(move |io| io.write(buf))?;
if let Some(inner) = self.1.as_mut() { if let Some(inner) = self.1.as_mut() {
inner.tx_bytes += bytes as u64; inner.tx_bytes += bytes as u64;
} }
Ok(bytes) Ok(bytes)
@ -165,6 +167,22 @@ impl<T: AsyncRead + AsyncWrite> AsyncWrite for Transport<T> {
fn shutdown(&mut self) -> Poll<(), io::Error> { fn shutdown(&mut self) -> Poll<(), io::Error> {
self.sense_err(|io| io.shutdown()) self.sense_err(|io| io.shutdown())
} }
fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
let bytes = try_ready!(self.sense_err(|io| io.write_buf(buf)));
if let Some(inner) = self.1.as_mut() {
inner.tx_bytes += bytes as u64;
}
Ok(Async::Ready(bytes))
}
}
impl<T: AsyncRead + AsyncWrite + Peek> Peek for Transport<T> {
fn peek(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.sense_err(|io| io.peek(buf))
}
} }
// === impl Connect === // === impl Connect ===

View File

@ -8,11 +8,11 @@ use http;
use hyper; use hyper;
use indexmap::IndexSet; use indexmap::IndexSet;
use tokio_core::reactor::Handle; use tokio_core::reactor::Handle;
use tokio_io::{AsyncRead, AsyncWrite};
use tower::NewService; use tower::NewService;
use tower_h2; use tower_h2;
use conduit_proxy_controller_grpc::common; use connection::{Connection, PeekFuture};
use connection::Connection;
use ctx::Proxy as ProxyCtx; use ctx::Proxy as ProxyCtx;
use ctx::transport::{Server as ServerCtx}; use ctx::transport::{Server as ServerCtx};
use drain; use drain;
@ -99,6 +99,15 @@ where
// create Server context // create Server context
let orig_dst = connection.original_dst_addr(&self.get_orig_dst); let orig_dst = connection.original_dst_addr(&self.get_orig_dst);
let local_addr = connection.local_addr().unwrap_or(self.listen_addr); let local_addr = connection.local_addr().unwrap_or(self.listen_addr);
let srv_ctx = ServerCtx::new(
&self.proxy_ctx,
&local_addr,
&remote_addr,
&orig_dst,
);
// record telemetry
let io = self.sensors.accept(connection, opened_at, &srv_ctx);
// We are using the port from the connection's SO_ORIGINAL_DST to // We are using the port from the connection's SO_ORIGINAL_DST to
// determine whether to skip protocol detection, not any port that // determine whether to skip protocol detection, not any port that
@ -113,44 +122,25 @@ where
trace!("protocol detection disabled for {:?}", orig_dst); trace!("protocol detection disabled for {:?}", orig_dst);
let fut = tcp_serve( let fut = tcp_serve(
&self.tcp, &self.tcp,
connection, io,
srv_ctx,
self.drain_signal.clone(), self.drain_signal.clone(),
&self.sensors,
opened_at,
&self.proxy_ctx,
LocalAddr(&local_addr),
RemoteAddr(&remote_addr),
OrigDst(&orig_dst),
); );
self.executor.spawn(fut); self.executor.spawn(fut);
return; return;
} }
// try to sniff protocol // try to sniff protocol
let proxy_ctx = self.proxy_ctx.clone();
let sniff = [0u8; 32]; let sniff = [0u8; 32];
let sensors = self.sensors.clone();
let h1 = self.h1.clone(); let h1 = self.h1.clone();
let h2 = self.h2.clone(); let h2 = self.h2.clone();
let tcp = self.tcp.clone(); let tcp = self.tcp.clone();
let new_service = self.new_service.clone(); let new_service = self.new_service.clone();
let drain_signal = self.drain_signal.clone(); let drain_signal = self.drain_signal.clone();
let fut = connection let fut = PeekFuture::new(io, sniff)
.peek_future(sniff) .map_err(|e| debug!("peek error: {}", e))
.map_err(|_| ()) .and_then(move |(io, sniff, n)| -> Box<Future<Item=(), Error=()>> {
.and_then(move |(connection, sniff, n)| -> Box<Future<Item=(), Error=()>> {
if let Some(proto) = Protocol::detect(&sniff[..n]) { if let Some(proto) = Protocol::detect(&sniff[..n]) {
let srv_ctx = ServerCtx::new(
&proxy_ctx,
&local_addr,
&remote_addr,
&orig_dst,
common::Protocol::Http,
);
// record telemetry
let io = sensors.accept(connection, opened_at, &srv_ctx);
match proto { match proto {
Protocol::Http1 => { Protocol::Http1 => {
trace!("transparency detected HTTP/1"); trace!("transparency detected HTTP/1");
@ -187,14 +177,9 @@ where
trace!("transparency did not detect protocol, treating as TCP"); trace!("transparency did not detect protocol, treating as TCP");
tcp_serve( tcp_serve(
&tcp, &tcp,
connection, io,
srv_ctx,
drain_signal, drain_signal,
&sensors,
opened_at,
&proxy_ctx,
LocalAddr(&local_addr),
RemoteAddr(&remote_addr),
OrigDst(&orig_dst),
) )
} }
}); });
@ -203,37 +188,13 @@ where
} }
} }
// These newtypes act as a form of keyword arguments. fn tcp_serve<T: AsyncRead + AsyncWrite + 'static>(
//
// It should be easier to notice when wrapping `LocalAddr(remote_addr)` at
// the call site, then simply passing multiple socket addr arguments.
struct LocalAddr<'a>(&'a SocketAddr);
struct RemoteAddr<'a>(&'a SocketAddr);
struct OrigDst<'a>(&'a Option<SocketAddr>);
fn tcp_serve(
tcp: &tcp::Proxy, tcp: &tcp::Proxy,
connection: Connection, io: T,
srv_ctx: Arc<ServerCtx>,
drain_signal: drain::Watch, drain_signal: drain::Watch,
sensors: &Sensors,
opened_at: Instant,
proxy_ctx: &Arc<ProxyCtx>,
local_addr: LocalAddr,
remote_addr: RemoteAddr,
orig_dst: OrigDst,
) -> Box<Future<Item=(), Error=()>> { ) -> Box<Future<Item=(), Error=()>> {
let srv_ctx = ServerCtx::new( let fut = tcp.serve(io, srv_ctx);
proxy_ctx,
local_addr.0,
remote_addr.0,
orig_dst.0,
common::Protocol::Tcp,
);
// record telemetry
let tcp_in = sensors.accept(connection, opened_at, &srv_ctx);
let fut = tcp.serve(tcp_in, srv_ctx);
// There's nothing to do when drain is signaled, we just have to hope // There's nothing to do when drain is signaled, we just have to hope
// the sockets finish soon. However, the drain signal still needs to // the sockets finish soon. However, the drain signal still needs to

View File

@ -8,7 +8,6 @@ use tokio_connect::Connect;
use tokio_core::reactor::Handle; use tokio_core::reactor::Handle;
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use conduit_proxy_controller_grpc::common;
use ctx::transport::{Client as ClientCtx, Server as ServerCtx}; use ctx::transport::{Client as ClientCtx, Server as ServerCtx};
use telemetry::Sensors; use telemetry::Sensors;
use timeout::Timeout; use timeout::Timeout;
@ -60,7 +59,6 @@ impl Proxy {
let client_ctx = ClientCtx::new( let client_ctx = ClientCtx::new(
&srv_ctx.proxy, &srv_ctx.proxy,
&orig_dst, &orig_dst,
common::Protocol::Tcp,
None, None,
); );
let c = Timeout::new( let c = Timeout::new(
@ -74,7 +72,7 @@ impl Proxy {
.map_err(|e| debug!("tcp connect error: {:?}", e)) .map_err(|e| debug!("tcp connect error: {:?}", e))
.and_then(move |tcp_out| { .and_then(move |tcp_out| {
Duplex::new(tcp_in, tcp_out) Duplex::new(tcp_in, tcp_out)
.map_err(|e| debug!("tcp error: {}", e)) .map_err(|e| error!("tcp duplex error: {}", e))
}); });
Box::new(fut) Box::new(fut)
} }

View File

@ -154,6 +154,17 @@ pub fn s(bytes: &[u8]) -> &str {
::std::str::from_utf8(bytes.as_ref()).unwrap() ::std::str::from_utf8(bytes.as_ref()).unwrap()
} }
/// The Rust test runner creates a thread per unit test, naming it after
/// the function name. If still in that thread, this can be useful to allow
/// associating test logs with a specific test, since tests *can* be run in
/// parallel.
pub fn thread_name() -> String {
::std::thread::current()
.name()
.unwrap_or("<no-name>")
.to_owned()
}
#[test] #[test]
#[should_panic] #[should_panic]
fn assert_eventually() { fn assert_eventually() {

View File

@ -175,8 +175,9 @@ fn run(proxy: Proxy, mut env: config::TestEnv) -> Listening {
rx = Box::new(rx.select(fut).then(|_| Ok(()))); rx = Box::new(rx.select(fut).then(|_| Ok(())));
} }
let tname = format!("support proxy (test={})", thread_name());
::std::thread::Builder::new() ::std::thread::Builder::new()
.name("support proxy".into()) .name(tname)
.spawn(move || { .spawn(move || {
let _c = controller; let _c = controller;
@ -220,6 +221,23 @@ fn run(proxy: Proxy, mut env: config::TestEnv) -> Listening {
let (control_addr, inbound_addr, outbound_addr, metrics_addr) = let (control_addr, inbound_addr, outbound_addr, metrics_addr) =
running_rx.wait().unwrap(); running_rx.wait().unwrap();
// printlns will show if the test fails...
println!(
"proxy running; control={}, inbound={}{}, outbound={}{}, metrics={}",
control_addr,
inbound_addr,
inbound
.as_ref()
.map(|i| format!(" (SO_ORIGINAL_DST={})", i.addr))
.unwrap_or_else(String::new),
outbound_addr,
outbound
.as_ref()
.map(|o| format!(" (SO_ORIGINAL_DST={})", o.addr))
.unwrap_or_else(String::new),
metrics_addr,
);
Listening { Listening {
control: control_addr, control: control_addr,
inbound: inbound_addr, inbound: inbound_addr,

View File

@ -38,6 +38,12 @@ impl Listening {
} }
} }
impl Drop for Listening {
fn drop(&mut self) {
println!("server Listening dropped; addr={}", self.addr);
}
}
impl Server { impl Server {
fn new(run: Run) -> Self { fn new(run: Run) -> Self {
Server { Server {
@ -103,7 +109,13 @@ impl Server {
let (addr_tx, addr_rx) = oneshot::channel(); let (addr_tx, addr_rx) = oneshot::channel();
let conn_count = Arc::new(AtomicUsize::from(0)); let conn_count = Arc::new(AtomicUsize::from(0));
let srv_conn_count = Arc::clone(&conn_count); let srv_conn_count = Arc::clone(&conn_count);
::std::thread::Builder::new().name("support server".into()).spawn(move || { let version = self.version;
let tname = format!(
"support {:?} server (test={})",
version,
thread_name(),
);
::std::thread::Builder::new().name(tname).spawn(move || {
let mut core = Core::new().unwrap(); let mut core = Core::new().unwrap();
let reactor = core.handle(); let reactor = core.handle();
@ -172,6 +184,13 @@ impl Server {
let addr = addr_rx.wait().expect("addr"); let addr = addr_rx.wait().expect("addr");
// printlns will show if the test fails...
println!(
"{:?} server running; addr={}",
version,
addr,
);
Listening { Listening {
addr, addr,
shutdown: tx, shutdown: tx,
@ -180,7 +199,7 @@ impl Server {
} }
} }
#[derive(Debug)] #[derive(Clone, Copy, Debug)]
enum Run { enum Run {
Http1, Http1,
Http2, Http2,

View File

@ -59,6 +59,7 @@ impl TcpClient {
let tx = rx.map_err(|_| panic!("tcp connect dropped")) let tx = rx.map_err(|_| panic!("tcp connect dropped"))
.wait() .wait()
.unwrap(); .unwrap();
println!("tcp client (addr={}): connected", self.addr);
TcpConn { TcpConn {
addr: self.addr, addr: self.addr,
tx, tx,
@ -110,6 +111,7 @@ impl TcpConn {
} }
pub fn try_read(&self) -> io::Result<Vec<u8>> { pub fn try_read(&self) -> io::Result<Vec<u8>> {
println!("tcp client (addr={}): read", self.addr);
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
let _ = self.tx.unbounded_send((None, tx)); let _ = self.tx.unbounded_send((None, tx));
rx.map_err(|_| panic!("tcp read dropped")) rx.map_err(|_| panic!("tcp read dropped"))
@ -119,6 +121,7 @@ impl TcpConn {
} }
pub fn write<T: Into<Vec<u8>>>(&self, buf: T) { pub fn write<T: Into<Vec<u8>>>(&self, buf: T) {
println!("tcp client (addr={}): write", self.addr);
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
let _ = self.tx.unbounded_send((Some(buf.into()), tx)); let _ = self.tx.unbounded_send((Some(buf.into()), tx));
rx.map_err(|_| panic!("tcp write dropped")) rx.map_err(|_| panic!("tcp write dropped"))
@ -131,8 +134,8 @@ impl TcpConn {
fn run_client(addr: SocketAddr) -> TcpSender { fn run_client(addr: SocketAddr) -> TcpSender {
let (tx, rx) = mpsc::unbounded(); let (tx, rx) = mpsc::unbounded();
let thread_name = format!("support tcp client (addr={})", addr); let tname = format!("support tcp client (addr={})", addr);
::std::thread::Builder::new().name(thread_name).spawn(move || { ::std::thread::Builder::new().name(tname).spawn(move || {
let mut core = Core::new().unwrap(); let mut core = Core::new().unwrap();
let handle = core.handle(); let handle = core.handle();
@ -188,6 +191,8 @@ fn run_client(addr: SocketAddr) -> TcpSender {
}).map_err(|e| println!("client error: {:?}", e)); }).map_err(|e| println!("client error: {:?}", e));
core.run(work).unwrap(); core.run(work).unwrap();
}).unwrap(); }).unwrap();
println!("tcp client (addr={}) thread running", addr);
tx tx
} }
@ -199,8 +204,8 @@ fn run_server(tcp: TcpServer) -> server::Listening {
let any_port = SocketAddr::from(([127, 0, 0, 1], 0)); let any_port = SocketAddr::from(([127, 0, 0, 1], 0));
let std_listener = StdTcpListener::bind(&any_port).expect("bind"); let std_listener = StdTcpListener::bind(&any_port).expect("bind");
let addr = std_listener.local_addr().expect("local_addr"); let addr = std_listener.local_addr().expect("local_addr");
let thread_name = format!("support tcp server (addr={})", addr); let tname = format!("support tcp server (addr={})", addr);
::std::thread::Builder::new().name(thread_name).spawn(move || { ::std::thread::Builder::new().name(tname).spawn(move || {
let mut core = Core::new().unwrap(); let mut core = Core::new().unwrap();
let reactor = core.handle(); let reactor = core.handle();
@ -233,6 +238,10 @@ fn run_server(tcp: TcpServer) -> server::Listening {
}).unwrap(); }).unwrap();
started_rx.wait().expect("support tcp server started"); started_rx.wait().expect("support tcp server started");
// printlns will show if the test fails...
println!("tcp server (addr={}): running", addr);
server::Listening { server::Listening {
addr, addr,
shutdown: tx, shutdown: tx,