proxy: clean up some logs and a few warnings in proxy tests (#780)

Signed-off-by: Sean McArthur <sean@seanmonstar.com>
This commit is contained in:
Sean McArthur 2018-04-17 12:53:20 -07:00 committed by GitHub
parent 239b362e9a
commit 3e2d782d19
11 changed files with 34 additions and 25 deletions

View File

@ -96,7 +96,6 @@ impl Control {
match self.rx.take() { match self.rx.take() {
None => Ok(Async::Ready(None)), None => Ok(Async::Ready(None)),
Some(mut rx) => { Some(mut rx) => {
trace!("recv.poll({:?})", rx);
match rx.poll() { match rx.poll() {
Ok(Async::Ready(None)) => Ok(Async::Ready(None)), Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
ev => { ev => {
@ -138,7 +137,6 @@ impl Future for Control {
type Error = (); type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
trace!("poll");
loop { loop {
match try_ready!(self.recv()) { match try_ready!(self.recv()) {
Some(ev) => { Some(ev) => {
@ -151,7 +149,7 @@ impl Future for Control {
self.metrics_aggregate.record_event(&ev); self.metrics_aggregate.record_event(&ev);
} }
None => { None => {
warn!("events finished"); debug!("events finished");
return Ok(Async::Ready(())); return Ok(Async::Ready(()));
} }
}; };

View File

@ -50,7 +50,7 @@ impl Proxy {
); );
orig_dst orig_dst
} else { } else {
debug!( warn!(
"tcp accepted, no SO_ORIGINAL_DST to forward: remote={}", "tcp accepted, no SO_ORIGINAL_DST to forward: remote={}",
srv_ctx.remote, srv_ctx.remote,
); );

View File

@ -1,3 +1,4 @@
#![deny(warnings)]
mod support; mod support;
use self::support::*; use self::support::*;

View File

@ -1,3 +1,4 @@
#![deny(warnings)]
mod support; mod support;
use self::support::*; use self::support::*;
@ -158,3 +159,4 @@ fn tcp_waits_for_proxies_to_close() {
tcp_client.write(msg1); tcp_client.write(msg1);
assert_eq!(tcp_client.read(), msg2.as_bytes()); assert_eq!(tcp_client.read(), msg2.as_bytes());
} }

View File

@ -112,7 +112,7 @@ enum Run {
} }
fn run(addr: SocketAddr, version: Run) -> (Sender, Running) { fn run(addr: SocketAddr, version: Run) -> (Sender, Running) {
let (tx, mut rx) = mpsc::unbounded::<(Request, oneshot::Sender<Result<Response, String>>)>(); let (tx, rx) = mpsc::unbounded::<(Request, oneshot::Sender<Result<Response, String>>)>();
let (running_tx, running_rx) = running(); let (running_tx, running_rx) = running();
::std::thread::Builder::new().name("support client".into()).spawn(move || { ::std::thread::Builder::new().name("support client".into()).spawn(move || {

View File

@ -9,7 +9,6 @@ use std::sync::{Arc, Mutex};
use conduit_proxy_controller_grpc as pb; use conduit_proxy_controller_grpc as pb;
use self::bytes::BufMut; use self::bytes::BufMut;
use self::futures::sync::mpsc;
use self::prost::Message; use self::prost::Message;
pub fn new() -> Controller { pub fn new() -> Controller {
@ -35,7 +34,7 @@ impl Controller {
} }
} }
pub fn destination(mut self, dest: &str, addr: SocketAddr) -> Self { pub fn destination(self, dest: &str, addr: SocketAddr) -> Self {
self.destination_fn(dest, move || Some(destination_update( self.destination_fn(dest, move || Some(destination_update(
addr, addr,
HashMap::new(), HashMap::new(),
@ -43,7 +42,7 @@ impl Controller {
))) )))
} }
pub fn labeled_destination(mut self, dest: &str, addr: SocketAddr, pub fn labeled_destination(self, dest: &str, addr: SocketAddr,
addr_labels: HashMap<String, String>, addr_labels: HashMap<String, String>,
set_labels:HashMap<String, String>) set_labels:HashMap<String, String>)
-> Self -> Self
@ -65,7 +64,7 @@ impl Controller {
} }
pub fn destination_close(mut self, dest: &str) -> Self { pub fn destination_close(self, dest: &str) -> Self {
self.destination_fn(dest, || None) self.destination_fn(dest, || None)
} }

View File

@ -1,4 +1,9 @@
#![allow(unused)] // The support mod is compiled for all the integration tests, which are each
// compiled as separate crates. Each only uses a subset of this module, which
// means some of it is unused.
//
// Note, lints like `unused_variable` should not be ignored.
#![allow(dead_code)]
extern crate bytes; extern crate bytes;
pub extern crate conduit_proxy_controller_grpc; pub extern crate conduit_proxy_controller_grpc;
@ -18,8 +23,6 @@ extern crate log;
pub extern crate env_logger; pub extern crate env_logger;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
pub use std::time::Duration; pub use std::time::Duration;
use self::bytes::{BigEndian, BytesMut}; use self::bytes::{BigEndian, BytesMut};

View File

@ -85,7 +85,7 @@ impl Proxy {
self.run_with_test_env(config::TestEnv::new()) self.run_with_test_env(config::TestEnv::new())
} }
pub fn run_with_test_env(self, mut env: config::TestEnv) -> Listening { pub fn run_with_test_env(self, env: config::TestEnv) -> Listening {
run(self, env) run(self, env)
} }
} }
@ -163,7 +163,7 @@ fn run(proxy: Proxy, mut env: config::TestEnv) -> Listening {
); );
} }
let mut config = config::Config::try_from(&env).unwrap(); let config = config::Config::try_from(&env).unwrap();
let (running_tx, running_rx) = oneshot::channel(); let (running_tx, running_rx) = oneshot::channel();
let (tx, mut rx) = shutdown_signal(); let (tx, mut rx) = shutdown_signal();

View File

@ -62,7 +62,7 @@ impl TcpClient {
} }
impl TcpServer { impl TcpServer {
pub fn accept<F, U>(mut self, cb: F) -> Self pub fn accept<F, U>(self, cb: F) -> Self
where where
F: FnOnce(Vec<u8>) -> U + Send + 'static, F: FnOnce(Vec<u8>) -> U + Send + 'static,
U: Into<Vec<u8>>, U: Into<Vec<u8>>,
@ -122,7 +122,7 @@ impl TcpConn {
fn run_client(addr: SocketAddr) -> TcpSender { fn run_client(addr: SocketAddr) -> TcpSender {
let (tx, rx) = mpsc::unbounded(); let (tx, rx) = mpsc::unbounded();
::std::thread::Builder::new().name("support client".into()).spawn(move || { ::std::thread::Builder::new().name("support tcp client".into()).spawn(move || {
let mut core = Core::new().unwrap(); let mut core = Core::new().unwrap();
let handle = core.handle(); let handle = core.handle();
@ -186,7 +186,7 @@ fn run_server(tcp: TcpServer) -> server::Listening {
let (addr_tx, addr_rx) = oneshot::channel(); let (addr_tx, addr_rx) = oneshot::channel();
let conn_count = Arc::new(AtomicUsize::from(0)); let conn_count = Arc::new(AtomicUsize::from(0));
let srv_conn_count = Arc::clone(&conn_count); let srv_conn_count = Arc::clone(&conn_count);
::std::thread::Builder::new().name("support server".into()).spawn(move || { ::std::thread::Builder::new().name("support tcp server".into()).spawn(move || {
let mut core = Core::new().unwrap(); let mut core = Core::new().unwrap();
let reactor = core.handle(); let reactor = core.handle();
@ -198,17 +198,21 @@ fn run_server(tcp: TcpServer) -> server::Listening {
let mut accepts = tcp.accepts; let mut accepts = tcp.accepts;
let work = bind.incoming().for_each(move |(sock, _)| { let listen = bind
let cb = accepts.pop_front().expect("no more accepts"); .incoming()
srv_conn_count.fetch_add(1, Ordering::Release); .for_each(move |(sock, _)| {
let cb = accepts.pop_front().expect("no more accepts");
srv_conn_count.fetch_add(1, Ordering::Release);
let fut = cb.call_box(sock); let fut = cb.call_box(sock);
reactor.spawn(fut); reactor.spawn(fut);
Ok(()) Ok(())
}); })
.map_err(|e| panic!("tcp accept error: {}", e));
core.run(work).unwrap(); core.handle().spawn(listen);
core.run(rx).unwrap();
}).unwrap(); }).unwrap();
let addr = addr_rx.wait().expect("addr"); let addr = addr_rx.wait().expect("addr");

View File

@ -1,3 +1,4 @@
#![deny(warnings)]
#[macro_use] #[macro_use]
extern crate log; extern crate log;

View File

@ -1,3 +1,4 @@
#![deny(warnings)]
mod support; mod support;
use self::support::*; use self::support::*;