proxy: clean up some logs and a few warnings in proxy tests (#780)

Signed-off-by: Sean McArthur <sean@seanmonstar.com>
This commit is contained in:
Sean McArthur 2018-04-17 12:53:20 -07:00 committed by GitHub
parent cf2d7b1d7d
commit 3cd16e8e40
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 34 additions and 25 deletions

View File

@ -96,7 +96,6 @@ impl Control {
match self.rx.take() {
None => Ok(Async::Ready(None)),
Some(mut rx) => {
trace!("recv.poll({:?})", rx);
match rx.poll() {
Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
ev => {
@ -138,7 +137,6 @@ impl Future for Control {
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
trace!("poll");
loop {
match try_ready!(self.recv()) {
Some(ev) => {
@ -151,7 +149,7 @@ impl Future for Control {
self.metrics_aggregate.record_event(&ev);
}
None => {
warn!("events finished");
debug!("events finished");
return Ok(Async::Ready(()));
}
};

View File

@ -50,7 +50,7 @@ impl Proxy {
);
orig_dst
} else {
debug!(
warn!(
"tcp accepted, no SO_ORIGINAL_DST to forward: remote={}",
srv_ctx.remote,
);

View File

@ -1,3 +1,4 @@
#![deny(warnings)]
mod support;
use self::support::*;

View File

@ -1,3 +1,4 @@
#![deny(warnings)]
mod support;
use self::support::*;
@ -158,3 +159,4 @@ fn tcp_waits_for_proxies_to_close() {
tcp_client.write(msg1);
assert_eq!(tcp_client.read(), msg2.as_bytes());
}

View File

@ -112,7 +112,7 @@ enum Run {
}
fn run(addr: SocketAddr, version: Run) -> (Sender, Running) {
let (tx, mut rx) = mpsc::unbounded::<(Request, oneshot::Sender<Result<Response, String>>)>();
let (tx, rx) = mpsc::unbounded::<(Request, oneshot::Sender<Result<Response, String>>)>();
let (running_tx, running_rx) = running();
::std::thread::Builder::new().name("support client".into()).spawn(move || {

View File

@ -9,7 +9,6 @@ use std::sync::{Arc, Mutex};
use conduit_proxy_controller_grpc as pb;
use self::bytes::BufMut;
use self::futures::sync::mpsc;
use self::prost::Message;
pub fn new() -> Controller {
@ -35,7 +34,7 @@ impl Controller {
}
}
pub fn destination(mut self, dest: &str, addr: SocketAddr) -> Self {
pub fn destination(self, dest: &str, addr: SocketAddr) -> Self {
self.destination_fn(dest, move || Some(destination_update(
addr,
HashMap::new(),
@ -43,7 +42,7 @@ impl Controller {
)))
}
pub fn labeled_destination(mut self, dest: &str, addr: SocketAddr,
pub fn labeled_destination(self, dest: &str, addr: SocketAddr,
addr_labels: HashMap<String, String>,
set_labels:HashMap<String, String>)
-> Self
@ -65,7 +64,7 @@ impl Controller {
}
pub fn destination_close(mut self, dest: &str) -> Self {
pub fn destination_close(self, dest: &str) -> Self {
self.destination_fn(dest, || None)
}

View File

@ -1,4 +1,9 @@
#![allow(unused)]
// The support mod is compiled for all the integration tests, which are each
// compiled as separate crates. Each only uses a subset of this module, which
// means some of it is unused.
//
// Note, lints like `unused_variable` should not be ignored.
#![allow(dead_code)]
extern crate bytes;
pub extern crate conduit_proxy_controller_grpc;
@ -18,8 +23,6 @@ extern crate log;
pub extern crate env_logger;
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
pub use std::time::Duration;
use self::bytes::{BigEndian, BytesMut};

View File

@ -85,7 +85,7 @@ impl Proxy {
self.run_with_test_env(config::TestEnv::new())
}
pub fn run_with_test_env(self, mut env: config::TestEnv) -> Listening {
pub fn run_with_test_env(self, env: config::TestEnv) -> Listening {
run(self, env)
}
}
@ -163,7 +163,7 @@ fn run(proxy: Proxy, mut env: config::TestEnv) -> Listening {
);
}
let mut config = config::Config::try_from(&env).unwrap();
let config = config::Config::try_from(&env).unwrap();
let (running_tx, running_rx) = oneshot::channel();
let (tx, mut rx) = shutdown_signal();

View File

@ -62,7 +62,7 @@ impl TcpClient {
}
impl TcpServer {
pub fn accept<F, U>(mut self, cb: F) -> Self
pub fn accept<F, U>(self, cb: F) -> Self
where
F: FnOnce(Vec<u8>) -> U + Send + 'static,
U: Into<Vec<u8>>,
@ -122,7 +122,7 @@ impl TcpConn {
fn run_client(addr: SocketAddr) -> TcpSender {
let (tx, rx) = mpsc::unbounded();
::std::thread::Builder::new().name("support client".into()).spawn(move || {
::std::thread::Builder::new().name("support tcp client".into()).spawn(move || {
let mut core = Core::new().unwrap();
let handle = core.handle();
@ -186,7 +186,7 @@ fn run_server(tcp: TcpServer) -> server::Listening {
let (addr_tx, addr_rx) = oneshot::channel();
let conn_count = Arc::new(AtomicUsize::from(0));
let srv_conn_count = Arc::clone(&conn_count);
::std::thread::Builder::new().name("support server".into()).spawn(move || {
::std::thread::Builder::new().name("support tcp server".into()).spawn(move || {
let mut core = Core::new().unwrap();
let reactor = core.handle();
@ -198,17 +198,21 @@ fn run_server(tcp: TcpServer) -> server::Listening {
let mut accepts = tcp.accepts;
let work = bind.incoming().for_each(move |(sock, _)| {
let cb = accepts.pop_front().expect("no more accepts");
srv_conn_count.fetch_add(1, Ordering::Release);
let listen = bind
.incoming()
.for_each(move |(sock, _)| {
let cb = accepts.pop_front().expect("no more accepts");
srv_conn_count.fetch_add(1, Ordering::Release);
let fut = cb.call_box(sock);
let fut = cb.call_box(sock);
reactor.spawn(fut);
Ok(())
});
reactor.spawn(fut);
Ok(())
})
.map_err(|e| panic!("tcp accept error: {}", e));
core.run(work).unwrap();
core.handle().spawn(listen);
core.run(rx).unwrap();
}).unwrap();
let addr = addr_rx.wait().expect("addr");

View File

@ -1,3 +1,4 @@
#![deny(warnings)]
#[macro_use]
extern crate log;

View File

@ -1,3 +1,4 @@
#![deny(warnings)]
mod support;
use self::support::*;