From 3f872136023c63ffeeb2263d202eaab6fee78252 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Tue, 5 Dec 2017 00:44:16 +0000 Subject: [PATCH] apply rustffmt on proxy, remove rustfmt.toml for now --- .rustfmt.toml | 84 --------- futures-mpsc-lossy/src/lib.rs | 9 +- proxy/build.rs | 8 +- proxy/src/app.rs | 1 - proxy/src/bind.rs | 11 +- proxy/src/config.rs | 78 +++++--- proxy/src/connection.rs | 4 +- proxy/src/control/codec.rs | 6 +- proxy/src/control/discovery.rs | 149 +++++++-------- proxy/src/control/mod.rs | 24 ++- proxy/src/control/observe.rs | 14 +- proxy/src/control/pb.rs | 202 ++++++++++---------- proxy/src/control/telemetry.rs | 74 ++++---- proxy/src/ctx/mod.rs | 23 ++- proxy/src/dns.rs | 27 ++- proxy/src/inbound.rs | 32 ++-- proxy/src/lib.rs | 90 ++++----- proxy/src/logging.rs | 7 +- proxy/src/main.rs | 5 +- proxy/src/map_err.rs | 46 +++-- proxy/src/outbound.rs | 16 +- proxy/src/telemetry/control.rs | 20 +- proxy/src/telemetry/event.rs | 11 +- proxy/src/telemetry/metrics.rs | 116 +++++++----- proxy/src/telemetry/mod.rs | 8 +- proxy/src/telemetry/sensor/http.rs | 46 ++--- proxy/src/telemetry/sensor/mod.rs | 28 ++- proxy/src/telemetry/sensor/transport.rs | 33 +++- proxy/src/telemetry/tap/match_.rs | 235 +++++++++++------------- proxy/src/telemetry/tap/mod.rs | 12 +- proxy/src/tower_fn.rs | 16 +- proxy/src/transport/connect.rs | 23 ++- proxy/src/transport/so_original_dst.rs | 4 +- proxy/tests/discovery.rs | 19 +- proxy/tests/support/client.rs | 27 +-- proxy/tests/support/controller.rs | 90 ++++----- proxy/tests/support/mod.rs | 16 +- proxy/tests/support/server.rs | 17 +- proxy/tests/telemetry.rs | 4 +- 39 files changed, 784 insertions(+), 851 deletions(-) delete mode 100644 .rustfmt.toml diff --git a/.rustfmt.toml b/.rustfmt.toml deleted file mode 100644 index 3e26058ba..000000000 --- a/.rustfmt.toml +++ /dev/null @@ -1,84 +0,0 @@ -# https://github.com/rust-lang-nursery/rustfmt/blob/master/Configurations.md -verbose = false -disable_all_formatting = false -skip_children = false -max_width = 100 -error_on_line_overflow = false -error_on_line_overflow_comments = false -tab_spaces = 4 -fn_call_width = 60 -struct_lit_width = 18 -struct_variant_width = 35 -force_explicit_abi = true -newline_style = "Unix" -fn_brace_style = "SameLineWhere" -item_brace_style = "SameLineWhere" -control_style = "Rfc" -control_brace_style = "AlwaysSameLine" -impl_empty_single_line = true -trailing_comma = "Vertical" -trailing_semicolon = true -fn_empty_single_line = true -fn_single_line = false -fn_return_indent = "WithArgs" -fn_args_paren_newline = false -fn_args_density = "Tall" -fn_args_layout = "Block" -array_layout = "Block" -array_width = 60 -array_horizontal_layout_threshold = 0 -type_punctuation_density = "Wide" -where_style = "Rfc" -where_density = "CompressedIfEmpty" -where_layout = "Vertical" -where_pred_indent = "Visual" -generics_indent = "Block" -struct_lit_style = "Block" -struct_lit_multiline_style = "ForceMulti" -fn_call_style = "Block" -report_todo = "Never" -report_fixme = "Never" -chain_indent = "Block" -chain_one_line_max = 60 -chain_split_single_child = false -imports_indent = "Block" -imports_layout = "HorizontalVertical" -reorder_extern_crates = true -reorder_extern_crates_in_group = true -reorder_imports = true -reorder_imports_in_group = true -reorder_imported_names = true -single_line_if_else_max_width = 50 -format_strings = true -force_format_strings = false -take_source_hints = false -hard_tabs = false -wrap_comments = false -comment_width = 80 -normalize_comments = false -wrap_match_arms = true -match_block_trailing_comma = true -indent_match_arms = true -match_pattern_separator_break_point = "Back" -closure_block_indent_threshold = 0 -space_before_type_annotation = false -space_after_type_annotation_colon = true -space_before_struct_lit_field_colon = false -space_after_struct_lit_field_colon = true -space_before_bound = false -space_after_bound_colon = true -spaces_around_ranges = false -spaces_within_angle_brackets = false -spaces_within_square_brackets = false -spaces_within_parens = false -use_try_shorthand = true -write_mode = "Overwrite" -condense_wildcard_suffixes = false -combine_control_expr = true -struct_field_align_threshold = 0 -remove_blank_lines_at_start_or_end_of_block = true -attributes_on_same_line_as_field = true -attributes_on_same_line_as_variant = true -multiline_closure_forces_block = false -multiline_match_arm_forces_block = false -merge_derives = true diff --git a/futures-mpsc-lossy/src/lib.rs b/futures-mpsc-lossy/src/lib.rs index d50144198..ed3f62308 100644 --- a/futures-mpsc-lossy/src/lib.rs +++ b/futures-mpsc-lossy/src/lib.rs @@ -92,7 +92,8 @@ impl Sender { } } - self.tx.unbounded_send(v) + self.tx + .unbounded_send(v) .map_err(|se| SendError::NoReceiver(se.into_inner())) } } @@ -103,8 +104,7 @@ impl Sink for Sender { type SinkError = SendError; fn start_send(&mut self, item: T) -> StartSend { - self.lossy_send(item) - .map(|_| AsyncSink::Ready) + self.lossy_send(item).map(|_| AsyncSink::Ready) } fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { @@ -136,8 +136,7 @@ impl fmt::Debug for Sender { impl SendError { pub fn into_inner(self) -> T { match self { - SendError::NoReceiver(v) | - SendError::Rejected(v) => v + SendError::NoReceiver(v) | SendError::Rejected(v) => v, } } } diff --git a/proxy/build.rs b/proxy/build.rs index da0886f38..c17687940 100644 --- a/proxy/build.rs +++ b/proxy/build.rs @@ -10,12 +10,8 @@ fn build_control() { "../proto/proxy/destination/destination.proto", "../proto/proxy/telemetry/telemetry.proto", ]; - let server_files = &[ - "../proto/proxy/tap/tap.proto", - ]; - let dirs = &[ - "../proto", - ]; + let server_files = &["../proto/proxy/tap/tap.proto"]; + let dirs = &["../proto"]; tower_grpc_build::Config::new() .enable_client(true) diff --git a/proxy/src/app.rs b/proxy/src/app.rs index e8d916b73..e83fb1858 100644 --- a/proxy/src/app.rs +++ b/proxy/src/app.rs @@ -5,4 +5,3 @@ pub fn init() -> Result { logging::init(); Config::load_from_env() } - diff --git a/proxy/src/bind.rs b/proxy/src/bind.rs index 72feaac4b..539d4fe14 100644 --- a/proxy/src/bind.rs +++ b/proxy/src/bind.rs @@ -1,7 +1,7 @@ use std::io; use std::marker::PhantomData; -use std::sync::Arc; use std::net::SocketAddr; +use std::sync::Arc; use std::sync::atomic::AtomicUsize; use std::time::Duration; @@ -65,14 +65,14 @@ impl Bind<(), B> { pub fn with_connect_timeout(self, connect_timeout: Duration) -> Self { Self { connect_timeout, - .. self + ..self } } pub fn with_sensors(self, sensors: telemetry::Sensors) -> Self { Self { sensors, - .. self + ..self } } @@ -87,7 +87,6 @@ impl Bind<(), B> { _p: PhantomData, } } - } impl Clone for Bind { @@ -139,14 +138,14 @@ where &self.executor, ); - self.sensors.connect(c, &client_ctx ) + self.sensors.connect(c, &client_ctx) }; // Establishes an HTTP/2.0 connection let client = tower_h2::client::Client::new( connect, self.h2_builder.clone(), - ::logging::context_executor(("client", *addr), self.executor.clone()) + ::logging::context_executor(("client", *addr), self.executor.clone()), ); let h2_proxy = self.sensors.http(self.req_ids.clone(), client, &client_ctx); diff --git a/proxy/src/config.rs b/proxy/src/config.rs index 1d536a629..fd7c6c206 100644 --- a/proxy/src/config.rs +++ b/proxy/src/config.rs @@ -1,7 +1,7 @@ use std::env; use std::net::SocketAddr; -use std::str::FromStr; use std::path::PathBuf; +use std::str::FromStr; use std::time::Duration; use url::{Host, HostAndPort, Url}; @@ -109,7 +109,7 @@ const ENV_RESOLV_CONF: &str = "CONDUIT_RESOLV_CONF"; const DEFAULT_EVENT_BUFFER_CAPACITY: usize = 10_000; // FIXME const DEFAULT_METRICS_FLUSH_INTERVAL_SECS: u64 = 10; const DEFAULT_PRIVATE_LISTENER: &str = "tcp://127.0.0.1:4140"; -const DEFAULT_PUBLIC_LISTENER: &str = "tcp://0.0.0.0:4143"; +const DEFAULT_PUBLIC_LISTENER: &str = "tcp://0.0.0.0:4143"; const DEFAULT_CONTROL_LISTENER: &str = "tcp://0.0.0.0:4190"; const DEFAULT_CONTROL_URL: &str = "tcp://proxy-api.conduit.svc.cluster.local:8086"; const DEFAULT_RESOLV_CONF: &str = "/etc/resolv.conf"; @@ -123,16 +123,16 @@ impl Config { None => DEFAULT_EVENT_BUFFER_CAPACITY, Some(c) => match c.parse() { Ok(c) => c, - Err(_) => return Err(Error::NotANumber(c)) - } + Err(_) => return Err(Error::NotANumber(c)), + }, }; let metrics_flush_interval = match env::var(ENV_METRICS_FLUSH_INTERVAL_SECS).ok() { None => Duration::from_secs(DEFAULT_METRICS_FLUSH_INTERVAL_SECS), Some(c) => match c.parse() { Ok(c) => Duration::from_secs(c), - Err(_) => return Err(Error::NotANumber(c)) - } + Err(_) => return Err(Error::NotANumber(c)), + }, }; Ok(Config { @@ -147,11 +147,13 @@ impl Config { }, private_forward: Addr::from_env_opt(ENV_PRIVATE_FORWARD)?, - public_connect_timeout: env::var(ENV_PUBLIC_CONNECT_TIMEOUT).ok() + public_connect_timeout: env::var(ENV_PUBLIC_CONNECT_TIMEOUT) + .ok() .and_then(|c| c.parse().ok()) .map(Duration::from_millis), - private_connect_timeout: env::var(ENV_PRIVATE_CONNECT_TIMEOUT).ok() + private_connect_timeout: env::var(ENV_PRIVATE_CONNECT_TIMEOUT) + .ok() .and_then(|c| c.parse().ok()) .map(Duration::from_millis), @@ -159,7 +161,10 @@ impl Config { .unwrap_or_else(|_| DEFAULT_RESOLV_CONF.into()) .into(), - control_host_and_port: control_host_and_port_from_env(ENV_CONTROL_URL, DEFAULT_CONTROL_URL)?, + control_host_and_port: control_host_and_port_from_env( + ENV_CONTROL_URL, + DEFAULT_CONTROL_URL, + )?, event_buffer_capacity, metrics_flush_interval, }) @@ -177,8 +182,7 @@ impl Addr { } fn from_env_or(key: &str, default: &str) -> Result { - let s = env::var(key) - .unwrap_or_else(|_| default.into()); + let s = env::var(key).unwrap_or_else(|_| default.into()); s.parse() } @@ -190,16 +194,20 @@ impl FromStr for Addr { fn from_str(s: &str) -> Result { match Url::parse(s) { Err(_) => Err(Error::InvalidAddr), - Ok(u) => { - match u.scheme() { - "tcp" => match u.with_default_port(|_| Err(())) { - Ok(HostAndPort { host: Host::Ipv4(ip), port }) => Ok(Addr(SocketAddr::new(ip.into(), port))), - Ok(HostAndPort { host: Host::Ipv6(ip), port }) => Ok(Addr(SocketAddr::new(ip.into(), port))), - _ => Err(Error::InvalidAddr), - }, + Ok(u) => match u.scheme() { + "tcp" => match u.with_default_port(|_| Err(())) { + Ok(HostAndPort { + host: Host::Ipv4(ip), + port, + }) => Ok(Addr(SocketAddr::new(ip.into(), port))), + Ok(HostAndPort { + host: Host::Ipv6(ip), + port, + }) => Ok(Addr(SocketAddr::new(ip.into(), port))), _ => Err(Error::InvalidAddr), - } - } + }, + _ => Err(Error::InvalidAddr), + }, } } } @@ -212,21 +220,37 @@ impl From for SocketAddr { fn control_host_and_port_from_env(key: &str, default: &str) -> Result { let s = env::var(key).unwrap_or_else(|_| default.into()); - let url = Url::parse(&s).map_err(|_| Error::ControlPlaneConfigError(s.clone(), UrlError::SyntaxError))?; - let host = url.host().ok_or_else(|| Error::ControlPlaneConfigError(s.clone(), UrlError::MissingHost))? + let url = Url::parse(&s).map_err(|_| { + Error::ControlPlaneConfigError(s.clone(), UrlError::SyntaxError) + })?; + let host = url.host() + .ok_or_else(|| { + Error::ControlPlaneConfigError(s.clone(), UrlError::MissingHost) + })? .to_owned(); if url.scheme() != "tcp" { - return Err(Error::ControlPlaneConfigError(s.clone(), UrlError::UnsupportedScheme)); + return Err(Error::ControlPlaneConfigError( + s.clone(), + UrlError::UnsupportedScheme, + )); } - let port = url.port().ok_or_else(|| Error::ControlPlaneConfigError(s.clone(), UrlError::MissingPort))?; + let port = url.port().ok_or_else(|| { + Error::ControlPlaneConfigError(s.clone(), UrlError::MissingPort) + })?; if url.path() != "/" { - return Err(Error::ControlPlaneConfigError(s.clone(), UrlError::PathNotAllowed)); + return Err(Error::ControlPlaneConfigError( + s.clone(), + UrlError::PathNotAllowed, + )); } if url.fragment().is_some() { - return Err(Error::ControlPlaneConfigError(s.clone(), UrlError::FragmentNotAllowed)); + return Err(Error::ControlPlaneConfigError( + s.clone(), + UrlError::FragmentNotAllowed, + )); } Ok(HostAndPort { host, - port + port, }) } diff --git a/proxy/src/connection.rs b/proxy/src/connection.rs index af45ee173..5b0071e9a 100644 --- a/proxy/src/connection.rs +++ b/proxy/src/connection.rs @@ -23,7 +23,9 @@ pub struct Handshake { impl Connection { /// Establish a connection backed by the provided `io`. pub fn handshake(io: PlaintextSocket) -> Handshake { - Handshake { plaintext_socket: Some(io) } + Handshake { + plaintext_socket: Some(io), + } } } diff --git a/proxy/src/control/codec.rs b/proxy/src/control/codec.rs index 344e498ab..4acfbefe0 100644 --- a/proxy/src/control/codec.rs +++ b/proxy/src/control/codec.rs @@ -50,13 +50,11 @@ impl Codec for Protobuf { trace!("decode; bytes={}", buf.remaining()); match Message::decode(buf) { - Ok(msg) => { - Ok(msg) - }, + Ok(msg) => Ok(msg), Err(err) => { debug!("decode error: {:?}", err); Err(err) - }, + } } } } diff --git a/proxy/src/control/discovery.rs b/proxy/src/control/discovery.rs index 51d845dd2..96c484fc7 100644 --- a/proxy/src/control/discovery.rs +++ b/proxy/src/control/discovery.rs @@ -1,24 +1,20 @@ -use std::collections::hash_map::{Entry, HashMap}; use std::collections::{HashSet, VecDeque}; +use std::collections::hash_map::{Entry, HashMap}; use std::net::SocketAddr; use futures::{Async, Future, Poll, Stream}; use futures::sync::mpsc; use http::uri::Authority; use tower::Service; -use tower_discover::{Discover, Change}; +use tower_discover::{Change, Discover}; use tower_grpc; use super::codec::Protobuf; use super::pb::common::{Destination, TcpAddress}; use super::pb::proxy::destination::Update as PbUpdate; +use super::pb::proxy::destination::client::Destination as DestinationSvc; +use super::pb::proxy::destination::client::destination_methods::Get as GetRpc; use super::pb::proxy::destination::update::Update as PbUpdate2; -use super::pb::proxy::destination::client::{ - Destination as DestinationSvc -}; -use super::pb::proxy::destination::client::destination_methods::{ - Get as GetRpc -}; pub type ClientBody = ::tower_grpc::client::codec::EncodingBody< Protobuf, @@ -44,16 +40,12 @@ pub struct Background { rx: mpsc::UnboundedReceiver<(Authority, mpsc::UnboundedSender)>, } -type DiscoveryWatch = - DestinationSet< - tower_grpc::client::Streaming< - tower_grpc::client::ResponseFuture< - Protobuf, - F, - >, - tower_grpc::client::codec::DecodingBody>, - > - >; +type DiscoveryWatch = DestinationSet< + tower_grpc::client::Streaming< + tower_grpc::client::ResponseFuture, F>, + tower_grpc::client::codec::DecodingBody>, + >, +>; /// A future returned from `Background::work()`, doing the work of talking to /// the controller destination API. @@ -97,9 +89,7 @@ pub trait Bind { type BindError; /// The discovered `Service` instance. - type Service: Service; + type Service: Service; /// Bind a socket address with a service. fn bind(&self, addr: &SocketAddr) -> Result; @@ -111,11 +101,14 @@ pub trait Bind { /// on the controller thread. pub fn new() -> (Discovery, Background) { let (tx, rx) = mpsc::unbounded(); - (Discovery { - tx, - }, Background { - rx, - }) + ( + Discovery { + tx, + }, + Background { + rx, + }, + ) } // ==== impl Discovery ===== @@ -125,7 +118,9 @@ impl Discovery { pub fn resolve(&self, authority: &Authority, bind: B) -> Watch { trace!("resolve; authority={:?}", authority); let (tx, rx) = mpsc::unbounded(); - self.tx.unbounded_send((authority.clone(), tx)).expect("unbounded can't fail"); + self.tx + .unbounded_send((authority.clone(), tx)) + .expect("unbounded can't fail"); Watch { rx, @@ -137,7 +132,8 @@ impl Discovery { // ==== impl Watch ===== impl Discover for Watch -where B: Bind, +where + B: Bind, { type Key = SocketAddr; type Request = B::Request; @@ -158,8 +154,7 @@ where B: Bind, match update { Update::Insert(addr) => { - let service = self.bind.bind(&addr) - .map_err(|_| ())?; + let service = self.bind.bind(&addr).map_err(|_| ())?; Ok(Async::Ready(Change::Insert(addr, service))) } @@ -186,16 +181,16 @@ impl Background { impl DiscoveryWork where - F: Future>, + F: Future>, F::Error: ::std::fmt::Debug, { pub fn poll_rpc(&mut self, client: &mut S) where S: Service< - Request=::http::Request, - Response=F::Item, - Error=F::Error, - Future=F, + Request = ::http::Request, + Response = F::Item, + Error = F::Error, + Future = F, >, { // This loop is make sure any streams that were found disconnected @@ -215,10 +210,10 @@ where fn poll_new_watches(&mut self, mut client: &mut S) where S: Service< - Request=::http::Request, - Response=F::Item, - Error=F::Error, - Future=F, + Request = ::http::Request, + Response = F::Item, + Error = F::Error, + Future = F, >, { loop { @@ -226,11 +221,11 @@ where match client.poll_ready() { Ok(Async::Ready(())) => { self.rpc_ready = true; - }, + } Ok(Async::NotReady) => { self.rpc_ready = false; break; - }, + } Err(err) => { warn!("Destination.Get poll_ready error: {:?}", err); self.rpc_ready = false; @@ -252,7 +247,7 @@ where match self.destinations.entry(auth) { Entry::Occupied(mut occ) => { occ.get_mut().tx = tx; - }, + } Entry::Vacant(vac) => { let req = Destination { scheme: "k8s".into(), @@ -267,11 +262,11 @@ where }); } } - }, + } Ok(Async::Ready(None)) => { trace!("Discover tx is dropped, shutdown?"); return; - }, + } Ok(Async::NotReady) => break, Err(_) => unreachable!("unbounded receiver doesn't error"), } @@ -282,10 +277,10 @@ where fn poll_reconnect(&mut self, client: &mut S) -> bool where S: Service< - Request=::http::Request, - Response=F::Item, - Error=F::Error, - Future=F, + Request = ::http::Request, + Response = F::Item, + Error = F::Error, + Future = F, >, { debug_assert!(self.rpc_ready); @@ -316,40 +311,37 @@ where } let needs_reconnect = 'set: loop { match set.rx.poll() { - Ok(Async::Ready(Some(update))) => { - match update.update { - Some(PbUpdate2::Add(a_set)) => { - for addr in a_set.addrs { - if let Some(addr) = addr.addr.and_then(pb_to_sock_addr) { - if set.addrs.insert(addr) { - trace!("update {:?} for {:?}", addr, auth); - let _ = set.tx.unbounded_send(Update::Insert(addr)); - } - } + Ok(Async::Ready(Some(update))) => match update.update { + Some(PbUpdate2::Add(a_set)) => for addr in a_set.addrs { + if let Some(addr) = addr.addr.and_then(pb_to_sock_addr) { + if set.addrs.insert(addr) { + trace!("update {:?} for {:?}", addr, auth); + let _ = set.tx.unbounded_send(Update::Insert(addr)); } - }, - Some(PbUpdate2::Remove(r_set)) => { - for addr in r_set.addrs { - if let Some(addr) = pb_to_sock_addr(addr) { - if set.addrs.remove(&addr) { - trace!("remove {:?} for {:?}", addr, auth); - let _ = set.tx.unbounded_send(Update::Remove(addr)); - } - } + } + }, + Some(PbUpdate2::Remove(r_set)) => for addr in r_set.addrs { + if let Some(addr) = pb_to_sock_addr(addr) { + if set.addrs.remove(&addr) { + trace!("remove {:?} for {:?}", addr, auth); + let _ = set.tx.unbounded_send(Update::Remove(addr)); } - }, - None => (), - } + } + }, + None => (), }, Ok(Async::Ready(None)) => { - trace!("Destination.Get stream ended for {:?}, must reconnect", auth); + trace!( + "Destination.Get stream ended for {:?}, must reconnect", + auth + ); break 'set true; - }, + } Ok(Async::NotReady) => break 'set false, Err(err) => { warn!("Destination.Get stream errored for {:?}: {:?}", auth, err); break 'set true; - }, + } } }; if needs_reconnect { @@ -363,8 +355,9 @@ where // ===== impl Bind ===== impl Bind for F -where F: Fn(&SocketAddr) -> Result, - S: Service, +where + F: Fn(&SocketAddr) -> Result, + S: Service, { type Request = S::Request; type Response = S::Response; @@ -378,8 +371,8 @@ where F: Fn(&SocketAddr) -> Result, } fn pb_to_sock_addr(pb: TcpAddress) -> Option { - use std::net::{Ipv4Addr, Ipv6Addr}; use super::pb::common::ip_address::Ip; + use std::net::{Ipv4Addr, Ipv6Addr}; /* current structure is: TcpAddress { @@ -401,7 +394,7 @@ fn pb_to_sock_addr(pb: TcpAddress) -> Option { Some(Ip::Ipv4(octets)) => { let ipv4 = Ipv4Addr::from(octets); Some(SocketAddr::from((ipv4, pb.port as u16))) - }, + } Some(Ip::Ipv6(v6)) => { let octets = [ (v6.first >> 56) as u8, @@ -423,7 +416,7 @@ fn pb_to_sock_addr(pb: TcpAddress) -> Option { ]; let ipv6 = Ipv6Addr::from(octets); Some(SocketAddr::from((ipv6, pb.port as u16))) - }, + } None => None, }, None => None, diff --git a/proxy/src/control/mod.rs b/proxy/src/control/mod.rs index 88bce4eb8..8f692f3b2 100644 --- a/proxy/src/control/mod.rs +++ b/proxy/src/control/mod.rs @@ -2,7 +2,7 @@ use std::marker::PhantomData; use std::time::{Duration, Instant}; use bytes::Bytes; -use futures::{Async, Future, future, Poll, Stream}; +use futures::{future, Async, Future, Poll, Stream}; use h2; use http; use tokio_core::reactor::{Handle, Timeout}; @@ -59,15 +59,22 @@ impl Control { // ===== impl Background ===== impl Background { - pub fn bind(self, events: S, host_and_port: HostAndPort, dns_config: dns::Config, executor: &Handle) -> Box> + pub fn bind( + self, + events: S, + host_and_port: HostAndPort, + dns_config: dns::Config, + executor: &Handle, + ) -> Box> where - S: Stream + 'static, + S: Stream + 'static, { // Build up the Controller Client Stack let mut client = { let ctx = ("controller-client", format!("{}", host_and_port)); let scheme = http::uri::Scheme::from_shared(Bytes::from_static(b"http")).unwrap(); - let authority = http::uri::Authority::from_shared(format!("{}", host_and_port).into()).unwrap(); + let authority = + http::uri::Authority::from_shared(format!("{}", host_and_port).into()).unwrap(); let dns_resolver = dns::Resolver::new(dns_config, executor); let connect = TimeoutConnect::new( @@ -78,7 +85,7 @@ impl Background { let h2_client = tower_h2::client::Client::new( connect, h2::client::Builder::default(), - ::logging::context_executor(ctx, executor.clone()) + ::logging::context_executor(ctx, executor.clone()), ); @@ -148,7 +155,7 @@ where self.waiting = true; self.timer.reset(Instant::now() + self.wait_dur); Ok(Async::NotReady) - }, + } ok => ok, } } @@ -177,7 +184,7 @@ impl AddOrigin { impl Service for AddOrigin where - S: Service>, + S: Service>, { type Request = http::Request; type Response = S::Response; @@ -205,7 +212,7 @@ struct EnumService(S, PhantomData); impl Service for EnumService where - S: Service>, + S: Service>, B: Into, { type Request = http::Request; @@ -256,7 +263,6 @@ impl tower_h2::Body for GrpcEncodingBody { GrpcEncodingBody::DestinationGet(ref mut b) => b.poll_trailers(), } } - } impl From for GrpcEncodingBody { diff --git a/proxy/src/control/observe.rs b/proxy/src/control/observe.rs index b810254a6..23a545291 100644 --- a/proxy/src/control/observe.rs +++ b/proxy/src/control/observe.rs @@ -1,17 +1,17 @@ use std::sync::{Arc, Mutex}; -use futures::{future, Stream, Poll}; +use futures::{future, Poll, Stream}; use futures_mpsc_lossy; use ordermap::OrderMap; use tower_grpc::{self, Request, Response}; use tower_grpc::codegen::server::grpc::ServerStreamingService; use control::pb::common::TapEvent; -use control::pb::proxy::tap::{ObserveRequest}; +use control::pb::proxy::tap::ObserveRequest; +use convert::*; use ctx; use telemetry::Event; use telemetry::tap::{Tap, Taps}; -use convert::*; #[derive(Clone, Debug)] pub struct Observe { @@ -58,10 +58,14 @@ impl ServerStreamingService for Observe { } let (_, req) = req.into_http().into_parts(); - let (tap, rx) = match req.match_.and_then(|m| Tap::new(&m, self.tap_capacity).ok()) { + let (tap, rx) = match req.match_ + .and_then(|m| Tap::new(&m, self.tap_capacity).ok()) + { Some(m) => m, None => { - return future::err(tower_grpc::Error::Grpc(tower_grpc::Status::INVALID_ARGUMENT)); + return future::err(tower_grpc::Error::Grpc( + tower_grpc::Status::INVALID_ARGUMENT, + )); } }; diff --git a/proxy/src/control/pb.rs b/proxy/src/control/pb.rs index f389648ed..8790f8326 100644 --- a/proxy/src/control/pb.rs +++ b/proxy/src/control/pb.rs @@ -7,9 +7,9 @@ use std::sync::Arc; use http; +use convert::*; use ctx; use telemetry::Event; -use convert::*; // re-export proxy here since we dont care about the other dirs pub use self::proxy::*; @@ -57,7 +57,7 @@ fn pb_response_end( source: Some((&ctx.server.remote).into()), target: Some((&ctx.client.remote).into()), event: Some(tap_event::Event::Http(tap_event::Http { - event: Some(tap_event::http::Event::ResponseEnd(end)) + event: Some(tap_event::http::Event::ResponseEnd(end)), })), } } @@ -73,7 +73,8 @@ impl fmt::Display for InvalidMethod { } impl Error for InvalidMethod { - #[inline] fn description(&self) -> &str { + #[inline] + fn description(&self) -> &str { "invalid http method" } } @@ -88,7 +89,8 @@ impl fmt::Display for InvalidScheme { } impl Error for InvalidScheme { - #[inline] fn description(&self) -> &str { + #[inline] + fn description(&self) -> &str { "invalid http scheme" } } @@ -103,7 +105,8 @@ impl fmt::Display for UnknownEvent { } impl Error for UnknownEvent { - #[inline] fn description(&self) -> &str { + #[inline] + fn description(&self) -> &str { "unknown tap event" } } @@ -124,7 +127,8 @@ impl<'a> TryFrom<&'a Event> for common::TapEvent { }), method: Some((&ctx.method).into()), scheme: ctx.uri.scheme().map(|s| s.into()), - authority: ctx.uri.authority_part() + authority: ctx.uri + .authority_part() .map(|a| a.as_str()) .unwrap_or_default() .into(), @@ -135,10 +139,10 @@ impl<'a> TryFrom<&'a Event> for common::TapEvent { source: Some((&ctx.server.remote).into()), target: Some((&ctx.client.remote).into()), event: Some(tap_event::Event::Http(tap_event::Http { - event: Some(tap_event::http::Event::RequestInit(init)) + event: Some(tap_event::http::Event::RequestInit(init)), })), } - }, + } Event::StreamResponseOpen(ref ctx, ref rsp) => { let init = tap_event::http::ResponseInit { @@ -155,40 +159,30 @@ impl<'a> TryFrom<&'a Event> for common::TapEvent { source: Some((&ctx.request.server.remote).into()), target: Some((&ctx.request.client.remote).into()), event: Some(tap_event::Event::Http(tap_event::Http { - event: Some(tap_event::http::Event::ResponseInit(init)) + event: Some(tap_event::http::Event::ResponseInit(init)), })), } } Event::StreamRequestFail(ref ctx, ref fail) => { - pb_response_end( - ctx, - fail.since_request_open, - None, - 0, - 0, - ) + pb_response_end(ctx, fail.since_request_open, None, 0, 0) } - Event::StreamResponseEnd(ref ctx, ref end) => { - pb_response_end( - &ctx.request, - end.since_request_open, - Some(end.since_response_open), - end.bytes_sent, - end.grpc_status.unwrap_or(0), - ) - } + Event::StreamResponseEnd(ref ctx, ref end) => pb_response_end( + &ctx.request, + end.since_request_open, + Some(end.since_response_open), + end.bytes_sent, + end.grpc_status.unwrap_or(0), + ), - Event::StreamResponseFail(ref ctx, ref fail) => { - pb_response_end( - &ctx.request, - fail.since_request_open, - Some(fail.since_response_open), - fail.bytes_sent, - 0, - ) - } + Event::StreamResponseFail(ref ctx, ref fail) => pb_response_end( + &ctx.request, + fail.since_request_open, + Some(fail.since_response_open), + fail.bytes_sent, + 0, + ), _ => return Err(UnknownEvent), }; @@ -200,36 +194,34 @@ impl<'a> TryFrom<&'a Event> for common::TapEvent { impl<'a> TryFrom<&'a common::http_method::Type> for http::Method { type Err = InvalidMethod; fn try_from(m: &'a common::http_method::Type) -> Result { - use http::HttpTryFrom; use self::common::http_method::*; + use http::HttpTryFrom; match *m { - Type::Registered(reg) => { - if reg == Registered::Get.into() { - Ok(http::Method::GET) - } else if reg == Registered::Post.into() { - Ok(http::Method::POST) - } else if reg == Registered::Put.into() { - Ok(http::Method::PUT) - } else if reg == Registered::Delete.into() { - Ok(http::Method::DELETE) - } else if reg == Registered::Patch.into() { - Ok(http::Method::PATCH) - } else if reg == Registered::Options.into() { - Ok(http::Method::OPTIONS) - } else if reg == Registered::Connect.into() { - Ok(http::Method::CONNECT) - } else if reg == Registered::Head.into() { - Ok(http::Method::HEAD) - } else if reg == Registered::Trace.into() { - Ok(http::Method::TRACE) - } else { - Err(InvalidMethod) - } + Type::Registered(reg) => if reg == Registered::Get.into() { + Ok(http::Method::GET) + } else if reg == Registered::Post.into() { + Ok(http::Method::POST) + } else if reg == Registered::Put.into() { + Ok(http::Method::PUT) + } else if reg == Registered::Delete.into() { + Ok(http::Method::DELETE) + } else if reg == Registered::Patch.into() { + Ok(http::Method::PATCH) + } else if reg == Registered::Options.into() { + Ok(http::Method::OPTIONS) + } else if reg == Registered::Connect.into() { + Ok(http::Method::CONNECT) + } else if reg == Registered::Head.into() { + Ok(http::Method::HEAD) + } else if reg == Registered::Trace.into() { + Ok(http::Method::TRACE) + } else { + Err(InvalidMethod) + }, + Type::Unregistered(ref m) => { + HttpTryFrom::try_from(m.as_str()).map_err(|_| InvalidMethod) } - Type::Unregistered(ref m) => - HttpTryFrom::try_from(m.as_str()) - .map_err(|_| InvalidMethod), } } } @@ -240,15 +232,13 @@ impl<'a> TryInto for &'a common::scheme::Type { use self::common::scheme::*; match *self { - Type::Registered(reg) => { - if reg == Registered::Http.into() { - Ok("http".into()) - } else if reg == Registered::Https.into() { - Ok("https".into()) - } else { - Err(InvalidScheme) - } - } + Type::Registered(reg) => if reg == Registered::Http.into() { + Ok("http".into()) + } else if reg == Registered::Https.into() { + Ok("https".into()) + } else { + Err(InvalidScheme) + }, Type::Unregistered(ref s) => Ok(s.clone()), } } @@ -275,7 +265,7 @@ impl<'a> From<&'a http::Method> for common::http_method::Type { impl<'a> From<&'a http::Method> for common::HttpMethod { fn from(m: &'a http::Method) -> Self { common::HttpMethod { - type_: Some(m.into()) + type_: Some(m.into()), } } } @@ -295,18 +285,18 @@ impl<'a> From<&'a str> for common::scheme::Type { impl<'a> From<&'a str> for common::Scheme { fn from(s: &'a str) -> Self { common::Scheme { - type_: Some(s.into()) + type_: Some(s.into()), } } } // ===== impl common::IpAddress ===== -impl From for common::IpAddress -where - common::ip_address::Ip: From +impl From for common::IpAddress +where + common::ip_address::Ip: From, { - #[inline] + #[inline] fn from(ip: T) -> Self { Self { ip: Some(ip.into()), @@ -317,14 +307,12 @@ where impl From<::std::net::IpAddr> for common::IpAddress { fn from(ip: ::std::net::IpAddr) -> Self { match ip { - ::std::net::IpAddr::V4(v4) => - Self { - ip: Some(v4.into()), - }, - ::std::net::IpAddr::V6(v6) => - Self { - ip: Some(v6.into()), - }, + ::std::net::IpAddr::V4(v4) => Self { + ip: Some(v4.into()), + }, + ::std::net::IpAddr::V6(v6) => Self { + ip: Some(v6.into()), + }, } } } @@ -333,22 +321,14 @@ impl From<::std::net::IpAddr> for common::IpAddress { impl From<[u8; 16]> for common::IPv6 { fn from(octets: [u8; 16]) -> Self { - let first = (u64::from(octets[0]) << 56) - + (u64::from(octets[1]) << 48) - + (u64::from(octets[2]) << 40) - + (u64::from(octets[3]) << 32) - + (u64::from(octets[4]) << 24) - + (u64::from(octets[5]) << 16) - + (u64::from(octets[6]) << 8) - + u64::from(octets[7]); - let last = (u64::from(octets[8]) << 56) - + (u64::from(octets[9]) << 48) - + (u64::from(octets[10]) << 40) - + (u64::from(octets[11]) << 32) - + (u64::from(octets[12]) << 24) - + (u64::from(octets[13]) << 16) - + (u64::from(octets[14]) << 8) - + u64::from(octets[15]); + let first = (u64::from(octets[0]) << 56) + (u64::from(octets[1]) << 48) + + (u64::from(octets[2]) << 40) + (u64::from(octets[3]) << 32) + + (u64::from(octets[4]) << 24) + (u64::from(octets[5]) << 16) + + (u64::from(octets[6]) << 8) + u64::from(octets[7]); + let last = (u64::from(octets[8]) << 56) + (u64::from(octets[9]) << 48) + + (u64::from(octets[10]) << 40) + (u64::from(octets[11]) << 32) + + (u64::from(octets[12]) << 24) + (u64::from(octets[13]) << 16) + + (u64::from(octets[14]) << 8) + u64::from(octets[15]); Self { first, last, @@ -357,7 +337,8 @@ impl From<[u8; 16]> for common::IPv6 { } impl From<::std::net::Ipv6Addr> for common::IPv6 { - #[inline] fn from(v6: ::std::net::Ipv6Addr) -> Self { + #[inline] + fn from(v6: ::std::net::Ipv6Addr) -> Self { Self::from(v6.octets()) } } @@ -382,25 +363,25 @@ impl<'a> From<&'a common::IPv6> for ::std::net::Ipv6Addr { impl From<[u8; 4]> for common::ip_address::Ip { fn from(octets: [u8; 4]) -> Self { common::ip_address::Ip::Ipv4( - u32::from(octets[0]) << 24 | - u32::from(octets[1]) << 16 | - u32::from(octets[2]) << 8 | - u32::from(octets[3]) + u32::from(octets[0]) << 24 | u32::from(octets[1]) << 16 | u32::from(octets[2]) << 8 + | u32::from(octets[3]), ) } } impl From<::std::net::Ipv4Addr> for common::ip_address::Ip { - #[inline] fn from(v4: ::std::net::Ipv4Addr) -> Self { + #[inline] + fn from(v4: ::std::net::Ipv4Addr) -> Self { Self::from(v4.octets()) } } impl From for common::ip_address::Ip -where - common::IPv6: From +where + common::IPv6: From, { - #[inline] fn from(t: T) -> Self { + #[inline] + fn from(t: T) -> Self { common::ip_address::Ip::Ipv6(common::IPv6::from(t)) } } @@ -429,5 +410,8 @@ fn pb_duration(d: &::std::time::Duration) -> ::prost_types::Duration { d.subsec_nanos() as i32 }; - ::prost_types::Duration { seconds, nanos } + ::prost_types::Duration { + seconds, + nanos, + } } diff --git a/proxy/src/control/telemetry.rs b/proxy/src/control/telemetry.rs index 034fdbb56..5d1123ca0 100644 --- a/proxy/src/control/telemetry.rs +++ b/proxy/src/control/telemetry.rs @@ -6,12 +6,8 @@ use tower_grpc; use super::codec::Protobuf; use super::pb::proxy::telemetry::{ReportRequest, ReportResponse}; -use super::pb::proxy::telemetry::client::{ - Telemetry as TelemetrySvc, -}; -use super::pb::proxy::telemetry::client::telemetry_methods::{ - Report as ReportRpc, -}; +use super::pb::proxy::telemetry::client::Telemetry as TelemetrySvc; +use super::pb::proxy::telemetry::client::telemetry_methods::Report as ReportRpc; pub type ClientBody = tower_grpc::client::codec::EncodingBody< Protobuf, @@ -20,12 +16,9 @@ pub type ClientBody = tower_grpc::client::codec::EncodingBody< type TelemetryStream = tower_grpc::client::BodyFuture< tower_grpc::client::Unary< - tower_grpc::client::ResponseFuture< - Protobuf, - F - >, + tower_grpc::client::ResponseFuture, F>, Protobuf, - > + >, >; #[derive(Debug)] @@ -36,9 +29,9 @@ pub struct Telemetry { impl Telemetry where - T: Stream, + T: Stream, T::Error: ::std::fmt::Debug, - F: Future>, + F: Future>, F::Error: ::std::fmt::Debug, { pub fn new(reports: T) -> Self { @@ -51,10 +44,10 @@ where pub fn poll_rpc(&mut self, client: &mut S) where S: Service< - Request=::http::Request, - Response=F::Item, - Error=F::Error, - Future=F, + Request = ::http::Request, + Response = F::Item, + Error = F::Error, + Future = F, >, { let grpc = tower_grpc::Client::new(Protobuf::new(), client); @@ -70,57 +63,58 @@ where trace!("report in flight to controller for {:?}", t0.elapsed()); self.in_flight = Some((t0, fut)); } - Ok(Async::Ready(_)) => trace!("report sent to controller in {:?}", t0.elapsed()), + Ok(Async::Ready(_)) => { + trace!("report sent to controller in {:?}", t0.elapsed()) + } Err(err) => warn!("controller error: {:?}", err), } } - let controller_ready = self.in_flight.is_none() && - match rpc.poll_ready() { - Ok(Async::Ready(_)) => true, - Ok(Async::NotReady) => { - trace!("controller unavailable"); - false - } - Err(err) => { - warn!("controller error: {:?}", err); - false - } - }; + let controller_ready = self.in_flight.is_none() && match rpc.poll_ready() { + Ok(Async::Ready(_)) => true, + Ok(Async::NotReady) => { + trace!("controller unavailable"); + false + } + Err(err) => { + warn!("controller error: {:?}", err); + false + } + }; match self.reports.poll() { Ok(Async::NotReady) => { return; - }, + } Ok(Async::Ready(None)) => { error!("report stream complete"); return; - }, + } Err(err) => { warn!("report stream error: {:?}", err); - }, + } Ok(Async::Ready(Some(report))) => { // Attempt to send the report. Continue looping so that `reports` is // polled until it's not ready. if !controller_ready { info!( "report dropped; requests={} accepts={} connects={}", - report.requests.len(), - report.server_transports.len(), - report.client_transports.len(), + report.requests.len(), + report.server_transports.len(), + report.client_transports.len(), ); } else { trace!( "report sent; requests={} accepts={} connects={}", - report.requests.len(), - report.server_transports.len(), - report.client_transports.len(), + report.requests.len(), + report.server_transports.len(), + report.client_transports.len(), ); let rep = TelemetrySvc::new(&mut rpc).report(report); self.in_flight = Some((Instant::now(), rep)); } - }, + } } } } diff --git a/proxy/src/ctx/mod.rs b/proxy/src/ctx/mod.rs index e8d5811d9..6118c6168 100644 --- a/proxy/src/ctx/mod.rs +++ b/proxy/src/ctx/mod.rs @@ -7,9 +7,9 @@ //! As a rule, context types should implement `Clone + Send + Sync`. This allows them to //! be stored in `http::Extensions`, for instance. Furthermore, because these contexts //! will be sent to a telemetry processing thread, we want to avoid excessive cloning. +use control::pb::proxy::telemetry as proto; use std::env; use std::sync::Arc; -use control::pb::proxy::telemetry as proto; pub mod http; pub mod transport; @@ -51,22 +51,21 @@ impl Process { Arc::new(Self { node: node.into(), scheduled_instance: instance.into(), - scheduled_namespace: ns.into() + scheduled_namespace: ns.into(), }) } /// Construct a new `Process` from environment variables. pub fn from_env() -> Arc { fn get_var(key: &str) -> String { - env::var(key) - .unwrap_or_else(|why| { - warn!( - "Process::from_env(): Failed to get value of {} environment variable: {:?}", - key, - why - ); - String::from("") - }) + env::var(key).unwrap_or_else(|why| { + warn!( + "Process::from_env(): Failed to get value of {} environment variable: {:?}", + key, + why + ); + String::from("") + }) } let node = get_var(::config::ENV_NODE_NAME); @@ -103,7 +102,7 @@ impl Proxy { pub fn is_inbound(&self) -> bool { match *self { Proxy::Inbound(_) => true, - Proxy::Outbound(_) => false + Proxy::Outbound(_) => false, } } diff --git a/proxy/src/dns.rs b/proxy/src/dns.rs index 27a0c3469..aea13d62a 100644 --- a/proxy/src/dns.rs +++ b/proxy/src/dns.rs @@ -1,9 +1,9 @@ -use futures::prelude::*; -use std::net::IpAddr; -use domain; -use ns_dns_tokio; use abstract_ns; use abstract_ns::HostResolve; +use domain; +use futures::prelude::*; +use ns_dns_tokio; +use std::net::IpAddr; use std::path::Path; use std::str::FromStr; use tokio_core::reactor::Handle; @@ -40,7 +40,9 @@ impl Config { impl Resolver { pub fn new(config: Config, executor: &Handle) -> Self { - Resolver(ns_dns_tokio::DnsResolver::new_from_resolver(domain::resolv::Resolver::from_conf(executor, config.0))) + Resolver(ns_dns_tokio::DnsResolver::new_from_resolver( + domain::resolv::Resolver::from_conf(executor, config.0), + )) } pub fn resolve_host(&self, host: &url::Host) -> IpAddrFuture { @@ -65,15 +67,12 @@ impl Future for IpAddrFuture { fn poll(&mut self) -> Poll { match *self { - IpAddrFuture::DNS(ref mut inner) => { - match inner.poll() { - Ok(Async::NotReady) => Ok(Async::NotReady), - Ok(Async::Ready(ips)) => - ips.pick_one() - .map(Async::Ready) - .ok_or(Error::NoAddressesFound), - Err(e) => Err(Error::ResolutionFailed(e)), - } + IpAddrFuture::DNS(ref mut inner) => match inner.poll() { + Ok(Async::NotReady) => Ok(Async::NotReady), + Ok(Async::Ready(ips)) => ips.pick_one() + .map(Async::Ready) + .ok_or(Error::NoAddressesFound), + Err(e) => Err(Error::ResolutionFailed(e)), }, IpAddrFuture::Fixed(addr) => Ok(Async::Ready(addr)), IpAddrFuture::InvalidDNSName(ref name) => Err(Error::InvalidDNSName(name.clone())), diff --git a/proxy/src/inbound.rs b/proxy/src/inbound.rs index 20a343c48..d011c5293 100644 --- a/proxy/src/inbound.rs +++ b/proxy/src/inbound.rs @@ -32,29 +32,25 @@ type CtxtExec = ::logging::ContextualExecutor<(&'static str, SocketAddr), Handle // ===== impl Inbound ===== impl Inbound { - pub fn new( - default_addr: Option, - bind: Bind - ) -> Self { + pub fn new(default_addr: Option, bind: Bind) -> Self { Self { default_addr, - bind + bind, } } fn same_addr(a0: &SocketAddr, a1: &SocketAddr) -> bool { - (a0.port() == a1.port()) && - match (a0.ip(), a1.ip()) { + (a0.port() == a1.port()) && match (a0.ip(), a1.ip()) { (IpAddr::V6(a0), IpAddr::V4(a1)) => a0.to_ipv4() == Some(a1), (IpAddr::V4(a0), IpAddr::V6(a1)) => Some(a0) == a1.to_ipv4(), - (a0, a1) => (a0 == a1) + (a0, a1) => (a0 == a1), } } } impl Recognize for Inbound where - B: tower_h2::Body + 'static + B: tower_h2::Body + 'static, { type Request = http::Request; type Response = http::Response>; @@ -66,11 +62,7 @@ where >; type Key = SocketAddr; type RouteError = (); - type Service = Buffer, - B, - tower_h2::RecvBody - >>>; + type Service = Buffer, B, tower_h2::RecvBody>>>; fn recognize(&self, req: &Self::Request) -> Option { let key = req.extensions() @@ -82,8 +74,11 @@ where Some(orig_dst) => { // If the original destination is actually the listening socket, // we don't want to create a loop. - if Self::same_addr(&orig_dst, &ctx.local) { None } - else { Some(orig_dst) } + if Self::same_addr(&orig_dst, &ctx.local) { + None + } else { + Some(orig_dst) + } } } }) @@ -107,8 +102,7 @@ where // is not ideal. // // TODO: Don't use unbounded buffering. - Buffer::new(self.bind.bind_service(addr), self.bind.executor()) - .map_err(|_| {}) + Buffer::new(self.bind.bind_service(addr), self.bind.executor()).map_err(|_| {}) } } @@ -122,9 +116,9 @@ mod tests { use tokio_core::reactor::Core; use tower_router::Recognize; + use super::Inbound; use bind::Bind; use ctx; - use super::Inbound; fn new_inbound(default: Option, ctx: &Arc) -> Inbound<()> { let core = Core::new().unwrap(); diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index c62e0dd51..f38b00f41 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -74,8 +74,8 @@ mod tower_fn; // TODO: move to tower-fn use bind::Bind; use control::pb::proxy::tap; use inbound::Inbound; -use outbound::Outbound; use map_err::MapErr; +use outbound::Outbound; /// Runs a sidecar proxy. /// @@ -99,13 +99,13 @@ pub struct Main { impl Main { pub fn new(config: config::Config) -> Self { - - let control_listener = StdTcpListener::bind(SocketAddr::from(config.control_listener.addr)) + let control_listener = StdTcpListener::bind(SocketAddr::from(config.control_listener.addr)) .expect("controller listener bind"); let inbound_listener = StdTcpListener::bind(SocketAddr::from(config.public_listener.addr)) .expect("public listener bind"); - let outbound_listener = StdTcpListener::bind(SocketAddr::from(config.private_listener.addr)) - .expect("private listener bind"); + let outbound_listener = StdTcpListener::bind( + SocketAddr::from(config.private_listener.addr), + ).expect("private listener bind"); Self { config, @@ -134,7 +134,7 @@ impl Main { pub fn run_until(self, shutdown_signal: F) where - F: Future, + F: Future, { let Main { config, @@ -146,10 +146,7 @@ impl Main { let control_host_and_port = config.control_host_and_port.clone(); info!("using controller at {:?}", control_host_and_port); - info!( - "routing on {:?}", - outbound_listener.local_addr().unwrap(), - ); + info!("routing on {:?}", outbound_listener.local_addr().unwrap(),); info!( "proxying on {:?} to {:?}", inbound_listener.local_addr().unwrap(), @@ -170,8 +167,7 @@ impl Main { let dns_config = dns::Config::from_file(&config.resolv_conf_path); - let bind = Bind::new(executor.clone()) - .with_sensors(sensors.clone()); + let bind = Bind::new(executor.clone()).with_sensors(sensors.clone()); // Setup the public listener. This will listen on a publicly accessible // address and listen for inbound connections that should be forwarded @@ -179,7 +175,9 @@ impl Main { let inbound = { let ctx = ctx::Proxy::inbound(&process_ctx); - let timeout = config.private_connect_timeout.unwrap_or_else(|| Duration::from_millis(20)); + let timeout = config + .private_connect_timeout + .unwrap_or_else(|| Duration::from_millis(20)); let bind = bind.clone() .with_connect_timeout(timeout) .with_ctx(ctx.clone()); @@ -203,11 +201,9 @@ impl Main { let outbound = { let ctx = ctx::Proxy::outbound(&process_ctx); - let bind = config.public_connect_timeout - .map_or_else( - || bind.clone(), - |t| bind.clone().with_connect_timeout(t), - ) + let bind = config + .public_connect_timeout + .map_or_else(|| bind.clone(), |t| bind.clone().with_connect_timeout(t)) .with_ctx(ctx.clone()); let fut = serve( @@ -233,36 +229,33 @@ impl Main { let (taps, observe) = control::Observe::new(100); - let new_service = tap::server::Tap::new_service() - .observe(observe); + let new_service = tap::server::Tap::new_service().observe(observe); let server = serve_control( control_listener, h2::server::Builder::default(), new_service, - &executor + &executor, ); let telemetry = telemetry .make_control(&taps, &executor) .expect("bad news in telemetry town"); - let client = control_bg.bind( - telemetry, - control_host_and_port, - dns_config, - &executor - ); + let client = + control_bg.bind(telemetry, control_host_and_port, dns_config, &executor); let fut = client.join(server.map_err(|_| {})).map(|_| {}); executor.spawn(::logging::context_future("controller-client", fut)); let shutdown = controller_shutdown_signal.then(|_| Ok::<(), ()>(())); core.run(shutdown).expect("controller api"); - }).expect("initialize controller api thread"); + }) + .expect("initialize controller api thread"); } - let fut = inbound.join(outbound) + let fut = inbound + .join(outbound) .map(|_| ()) .map_err(|err| error!("main error: {:?}", err)); @@ -288,12 +281,12 @@ where Response = http::Response>, Error = E, RouteError = F, - > + 'static, + > + + 'static, { let listen_addr = listen.local_addr().expect("local addr"); - let bind = TcpListener::from_listener(listen, &listen_addr, &executor) - .expect("bind"); + let bind = TcpListener::from_listener(listen, &listen_addr, &executor).expect("bind"); let router = Router::new(recognize); let stack = NewServiceFn::new(move || { @@ -304,19 +297,28 @@ where MapErr::new(router) }); - let server = Server::new(stack, h2_builder, ::logging::context_executor(("serve", listen_addr) , executor.clone())); + let server = Server::new( + stack, + h2_builder, + ::logging::context_executor(("serve", listen_addr), executor.clone()), + ); let f = bind.incoming().fold( (server, proxy_ctx, sensors, executor), move |(server, proxy_ctx, sensors, executor), (socket, remote_addr)| { if let Err(e) = socket.set_nodelay(true) { - warn!("could not set TCP_NODELAY on {:?}/{:?}: {}", - socket.local_addr(), socket.peer_addr(), e); + warn!( + "could not set TCP_NODELAY on {:?}/{:?}: {}", + socket.local_addr(), + socket.peer_addr(), + e + ); } let opened_at = Instant::now(); let orig_dst = transport::get_original_dst(&socket); let local_addr = socket.local_addr().unwrap_or(listen_addr); - let srv_ctx = ctx::transport::Server::new(&proxy_ctx, &local_addr, &remote_addr, &orig_dst); + let srv_ctx = + ctx::transport::Server::new(&proxy_ctx, &local_addr, &remote_addr, &orig_dst); connection::Connection::handshake(socket).map(move |session| { let io = sensors.accept(session, opened_at, &srv_ctx); @@ -345,22 +347,22 @@ fn serve_control( ) -> Box + 'static> where B: Body + 'static, - N: NewService< - Request = http::Request, - Response = http::Response, - > + 'static, + N: NewService, Response = http::Response> + 'static, { let listen_addr = listen.local_addr().expect("local addr"); - let bind = TcpListener::from_listener(listen, &listen_addr, executor) - .expect("bind"); + let bind = TcpListener::from_listener(listen, &listen_addr, executor).expect("bind"); let server = Server::new(new_service, h2_builder, executor.clone()); let f = bind.incoming().fold( (server, executor.clone()), move |(server, executor), (socket, _)| { if let Err(e) = socket.set_nodelay(true) { - warn!("could not set TCP_NODELAY on {:?}/{:?}: {}", - socket.local_addr(), socket.peer_addr(), e); + warn!( + "could not set TCP_NODELAY on {:?}/{:?}: {}", + socket.local_addr(), + socket.peer_addr(), + e + ); } connection::Connection::handshake(socket).map(move |session| { diff --git a/proxy/src/logging.rs b/proxy/src/logging.rs index f41edd1e2..8f33f62d8 100644 --- a/proxy/src/logging.rs +++ b/proxy/src/logging.rs @@ -4,9 +4,9 @@ use std::fmt; use std::rc::Rc; use chrono::Utc; -use futures::{Future, Poll}; -use futures::future::{Executor, ExecuteError}; use env_logger::LogBuilder; +use futures::{Future, Poll}; +use futures::future::{ExecuteError, Executor}; use log::LogLevel; const ENV_LOG: &str = "CONDUIT_PROXY_LOG"; @@ -39,7 +39,6 @@ pub fn init() { .parse(&env::var(ENV_LOG).unwrap_or_default()) .init() .expect("logger"); - } /// Execute a closure with a `Debug` item attached to allow log messages. @@ -112,7 +111,7 @@ impl Executor for ContextualExecutor where T: ::std::fmt::Debug + 'static, E: Executor, F>>, - F: Future, + F: Future, { fn execute(&self, future: F) -> Result<(), ExecuteError> { let fut = context_future(self.context.clone(), future); diff --git a/proxy/src/main.rs b/proxy/src/main.rs index 5eb1b819a..8b4514c15 100644 --- a/proxy/src/main.rs +++ b/proxy/src/main.rs @@ -11,8 +11,7 @@ fn main() { Err(e) => { eprintln!("configuration error: {:#?}", e); process::exit(64) - }, + } }; - conduit_proxy::Main::new(config) - .run(); + conduit_proxy::Main::new(config).run(); } diff --git a/proxy/src/map_err.rs b/proxy/src/map_err.rs index d8ae01a48..1d002aa73 100644 --- a/proxy/src/map_err.rs +++ b/proxy/src/map_err.rs @@ -21,19 +21,24 @@ pub struct ResponseFuture { // ===== impl MapErr ===== impl MapErr -where T: Service, - E: Debug, +where + T: Service, + E: Debug, { /// Crete a new `MapErr` pub fn new(inner: T) -> Self { - MapErr { inner, _p: PhantomData } + MapErr { + inner, + _p: PhantomData, + } } } impl Service for MapErr -where T: Service, Error = E>, - B: Default, - E: Debug, +where + T: Service, Error = E>, + B: Default, + E: Debug, { type Request = T::Request; type Response = T::Response; @@ -48,30 +53,33 @@ where T: Service, Error = E>, fn call(&mut self, request: Self::Request) -> Self::Future { let inner = self.inner.call(request); - ResponseFuture { inner, _p: PhantomData } + ResponseFuture { + inner, + _p: PhantomData, + } } } // ===== impl ResponseFuture ===== impl Future for ResponseFuture -where T: Future, Error = E>, - B: Default, - E: Debug, +where + T: Future, Error = E>, + B: Default, + E: Debug, { type Item = T::Item; type Error = h2::Error; fn poll(&mut self) -> Poll { - self.inner.poll() - .or_else(|e| { - error!("turning h2 error into 500: {:?}", e); - let response = http::Response::builder() - .status(500) - .body(Default::default()) - .unwrap(); + self.inner.poll().or_else(|e| { + error!("turning h2 error into 500: {:?}", e); + let response = http::Response::builder() + .status(500) + .body(Default::default()) + .unwrap(); - Ok(response.into()) - }) + Ok(response.into()) + }) } } diff --git a/proxy/src/outbound.rs b/proxy/src/outbound.rs index 483fbd3b7..b2041aec7 100644 --- a/proxy/src/outbound.rs +++ b/proxy/src/outbound.rs @@ -20,10 +20,10 @@ type Error = tower_buffer::Error< tower_balance::Error< tower_reconnect::Error< tower_h2::client::Error, - tower_h2::client::ConnectError> + tower_h2::client::ConnectError>, >, (), - > + >, >; pub struct Outbound { @@ -44,7 +44,7 @@ impl Outbound { impl Recognize for Outbound where - B: tower_h2::Body + 'static + B: tower_h2::Body + 'static, { type Request = http::Request; type Response = http::Response>; @@ -66,9 +66,10 @@ where /// /// Buffering is currently unbounded and does not apply timeouts. This must be /// changed. - fn bind_service(&mut self, authority: &http::uri::Authority) - -> Result - { + fn bind_service( + &mut self, + authority: &http::uri::Authority, + ) -> Result { debug!("building outbound client to {:?}", authority); let resolve = self.discovery.resolve(authority, self.bind.clone()); @@ -79,7 +80,6 @@ where // which is not ideal. // // TODO: Don't use unbounded buffering. - Buffer::new(balance, self.bind.executor()) - .map_err(|_| {}) + Buffer::new(balance, self.bind.executor()).map_err(|_| {}) } } diff --git a/proxy/src/telemetry/control.rs b/proxy/src/telemetry/control.rs index ff9fb013e..e7a426584 100644 --- a/proxy/src/telemetry/control.rs +++ b/proxy/src/telemetry/control.rs @@ -1,16 +1,16 @@ use std::{fmt, io}; -use std::time::{Duration, Instant}; use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; use futures::{Async, Future, Poll, Stream}; use futures_mpsc_lossy::Receiver; use tokio_core::reactor::{Handle, Timeout}; -use ctx; use super::event::Event; -use super::tap::Taps; use super::metrics::Metrics; +use super::tap::Taps; use control::pb::telemetry::ReportRequest; +use ctx; /// A `Control` which has been configured but not initialized. #[derive(Debug)] @@ -64,7 +64,7 @@ impl MakeControl { /// - `rx`: the `Receiver` side of the channel on which events are sent. /// - `flush_interval`: the maximum amount of time between sending reports to the /// controller. - pub (super) fn new( + pub(super) fn new( rx: Receiver, flush_interval: Duration, process_ctx: &Arc, @@ -138,7 +138,8 @@ impl Control { /// Reset the flush timeout. fn reset_timeout(&mut self) { trace!("flushing in {:?}", self.flush_interval); - self.flush_timeout.reset(Instant::now() + self.flush_interval); + self.flush_timeout + .reset(Instant::now() + self.flush_interval); } fn recv(&mut self) -> Async> { @@ -185,7 +186,9 @@ impl Stream for Control { } Async::Ready(None) => { warn!("events finished"); - let report = self.metrics.take().map(|mut m| Self::generate_report(&mut m)); + let report = self.metrics + .take() + .map(|mut m| Self::generate_report(&mut m)); if report.is_none() { return Ok(Async::Ready(None)); } @@ -226,7 +229,10 @@ impl fmt::Debug for Control { .field("rx", &self.rx) .field("taps", &self.taps) .field("flush_interval", &self.flush_interval) - .field("flush_timeout", &format!("Timeout({:?})", &self.flush_interval)) + .field( + "flush_timeout", + &format!("Timeout({:?})", &self.flush_interval), + ) .finish() } } diff --git a/proxy/src/telemetry/event.rs b/proxy/src/telemetry/event.rs index 60891351c..5ddcaa1de 100644 --- a/proxy/src/telemetry/event.rs +++ b/proxy/src/telemetry/event.rs @@ -76,18 +76,17 @@ impl Event { pub fn is_transport(&self) -> bool { match *self { - Event::TransportOpen(_) | - Event::TransportClose(_, _) => true, + Event::TransportOpen(_) | Event::TransportClose(_, _) => true, _ => false, } } pub fn proxy(&self) -> &Arc { match *self { - Event::TransportOpen(ref ctx) | - Event::TransportClose(ref ctx, _) => ctx.proxy(), - Event::StreamRequestOpen(ref req) | - Event::StreamRequestFail(ref req, _) => &req.server.proxy, + Event::TransportOpen(ref ctx) | Event::TransportClose(ref ctx, _) => ctx.proxy(), + Event::StreamRequestOpen(ref req) | Event::StreamRequestFail(ref req, _) => { + &req.server.proxy + } Event::StreamResponseOpen(ref rsp, _) | Event::StreamResponseFail(ref rsp, _) | Event::StreamResponseEnd(ref rsp, _) => &rsp.request.server.proxy, diff --git a/proxy/src/telemetry/metrics.rs b/proxy/src/telemetry/metrics.rs index 414f0f697..260fdf687 100644 --- a/proxy/src/telemetry/metrics.rs +++ b/proxy/src/telemetry/metrics.rs @@ -1,21 +1,18 @@ +use std::{u32, u64}; use std::net; use std::sync::Arc; use std::time::Duration; -use std::{u32, u64}; use http; use ordermap::OrderMap; -use ctx; -use control::pb::common::{ - TcpAddress, - HttpMethod, -}; +use control::pb::common::{HttpMethod, TcpAddress}; use control::pb::proxy::telemetry::{ eos_ctx, ClientTransport, EosCtx, EosScope, + Latency as PbLatency, ReportRequest, RequestCtx, RequestScope, @@ -24,9 +21,9 @@ use control::pb::proxy::telemetry::{ ServerTransport, StreamSummary, TransportSummary, - Latency as PbLatency, }; -use telemetry::event::{Event}; +use ctx; +use telemetry::event::Event; #[derive(Debug)] pub struct Metrics { @@ -113,24 +110,27 @@ impl Metrics { match *event { Event::TransportOpen(ref transport) => { self.transport(transport).connects += 1; - }, + } Event::TransportClose(ref transport, ref close) => { - self.transport(transport).disconnects.push(TransportSummary { - duration_ms: dur_to_ms(close.duration), - bytes_sent: 0, - }); - }, + self.transport(transport) + .disconnects + .push(TransportSummary { + duration_ms: dur_to_ms(close.duration), + bytes_sent: 0, + }); + } Event::StreamRequestOpen(ref req) => { self.request(req).count += 1; - }, + } Event::StreamRequestFail(ref req, ref fail) => { let stats = self.request(req) .responses .entry(None) .or_insert_with(Default::default); - let ends = stats.ends + let ends = stats + .ends .entry(End::Reset(fail.error.into())) .or_insert_with(Default::default); @@ -145,11 +145,11 @@ impl Metrics { bytes_sent: 0, frames_sent: 0, }); - }, + } Event::StreamResponseOpen(ref res, ref open) => { self.response(res).latencies.add(open.since_request_open); - }, + } Event::StreamResponseFail(ref res, ref fail) => { self.response_end(res, End::Reset(fail.error.into())) .push(EndStats { @@ -157,7 +157,7 @@ impl Metrics { bytes_sent: fail.bytes_sent, frames_sent: fail.frames_sent, }); - }, + } Event::StreamResponseEnd(ref res, ref end) => { let e = end.grpc_status.map(End::Grpc).unwrap_or(End::Other); self.response_end(res, e).push(EndStats { @@ -165,7 +165,7 @@ impl Metrics { bytes_sent: end.bytes_sent, frames_sent: end.frames_sent, }); - }, + } } } @@ -177,11 +177,20 @@ impl Metrics { fn response<'a>(&mut self, res: &'a Arc) -> &mut ResponseStats { let req = self.request(&res.request); - req.responses.entry(Some(res.status)).or_insert_with(Default::default) + req.responses + .entry(Some(res.status)) + .or_insert_with(Default::default) } - fn response_end<'a>(&mut self, res: &'a Arc, end: End) -> &mut Vec { - self.response(res).ends.entry(end).or_insert_with(Default::default) + fn response_end<'a>( + &mut self, + res: &'a Arc, + end: End, + ) -> &mut Vec { + self.response(res) + .ends + .entry(end) + .or_insert_with(Default::default) } fn transport<'a>(&mut self, transport: &'a ctx::transport::Ctx) -> &mut TransportStats { @@ -192,11 +201,9 @@ impl Metrics { .entry(source) .or_insert_with(TransportStats::default) } - ctx::transport::Ctx::Client(ref c) => { - self.destinations - .entry(c.remote) - .or_insert_with(TransportStats::default) - } + ctx::transport::Ctx::Client(ref c) => self.destinations + .entry(c.remote) + .or_insert_with(TransportStats::default), } } @@ -232,7 +239,6 @@ impl Metrics { let mut ends = Vec::with_capacity(res_stats.ends.len()); for (end, end_stats) in res_stats.ends { - let mut streams = Vec::with_capacity(end_stats.len()); for stats in end_stats { @@ -257,8 +263,10 @@ impl Metrics { responses.push(ResponseScope { - ctx: status_code.map(|code| ResponseCtx { - http_status_code: u32::from(code.as_u16()), + ctx: status_code.map(|code| { + ResponseCtx { + http_status_code: u32::from(code.as_u16()), + } }), ends: ends, response_latencies: res_stats.latencies.into(), @@ -269,7 +277,8 @@ impl Metrics { ctx: Some(RequestCtx { method: Some(HttpMethod::from(&req.method)), path: req.uri.path().to_string(), - authority: req.uri.authority_part() + authority: req.uri + .authority_part() .map(|a| a.to_string()) .unwrap_or_else(String::new), source_ip: Some(req.source.into()), @@ -311,12 +320,10 @@ impl From for Latency { }; // divide the duration as ms by ten to get the value in tenths of a ms. - let as_tenths = as_ms - .and_then(|ms| ms.checked_div(10)) - .unwrap_or_else(|| { - debug!("{:?} too large to convert to tenths of a millisecond!", dur); - u32::MAX - }); + let as_tenths = as_ms.and_then(|ms| ms.checked_div(10)).unwrap_or_else(|| { + debug!("{:?} too large to convert to tenths of a millisecond!", dur); + u32::MAX + }); Latency(as_tenths) } @@ -337,12 +344,14 @@ impl Into> for Latencies { fn into(mut self) -> Vec { // NOTE: `OrderMap.drain` means we can reuse the allocated memory --- can we // ensure we're not allocating a new OrderMap after covnerting to pb? - self.0.drain(..) - .map(|(Latency(latency), count)| + self.0 + .drain(..) + .map(|(Latency(latency), count)| { PbLatency { latency, count, - }) + } + }) .collect() } } @@ -372,14 +381,29 @@ mod tests { assert!(latencies.0.is_empty()); latencies.add(Duration::from_secs(10)); - assert_eq!(latencies.0.get(&Latency::from(Duration::from_secs(10))), Some(&1)); + assert_eq!( + latencies.0.get(&Latency::from(Duration::from_secs(10))), + Some(&1) + ); latencies.add(Duration::from_secs(15)); - assert_eq!(latencies.0.get(&Latency::from(Duration::from_secs(10))), Some(&1)); - assert_eq!(latencies.0.get(&Latency::from(Duration::from_secs(15))), Some(&1)); + assert_eq!( + latencies.0.get(&Latency::from(Duration::from_secs(10))), + Some(&1) + ); + assert_eq!( + latencies.0.get(&Latency::from(Duration::from_secs(15))), + Some(&1) + ); latencies.add(Duration::from_secs(10)); - assert_eq!(latencies.0.get(&Latency::from(Duration::from_secs(10))), Some(&2)); - assert_eq!(latencies.0.get(&Latency::from(Duration::from_secs(15))), Some(&1)); + assert_eq!( + latencies.0.get(&Latency::from(Duration::from_secs(10))), + Some(&2) + ); + assert_eq!( + latencies.0.get(&Latency::from(Duration::from_secs(15))), + Some(&1) + ); } } diff --git a/proxy/src/telemetry/mod.rs b/proxy/src/telemetry/mod.rs index b004f9e26..b9002421b 100644 --- a/proxy/src/telemetry/mod.rs +++ b/proxy/src/telemetry/mod.rs @@ -13,7 +13,7 @@ mod metrics; pub mod sensor; pub mod tap; -pub use self::control::{MakeControl, Control}; +pub use self::control::{Control, MakeControl}; pub use self::event::Event; pub use self::sensor::Sensors; @@ -39,10 +39,6 @@ pub fn new( ) -> (Sensors, MakeControl) { let (tx, rx) = futures_mpsc_lossy::channel(capacity); let s = Sensors::new(tx); - let c = MakeControl::new( - rx, - flush_interval, - process, - ); + let c = MakeControl::new(rx, flush_interval, process); (s, c) } diff --git a/proxy/src/telemetry/sensor/http.rs b/proxy/src/telemetry/sensor/http.rs index 0c2a409a8..138d46bcd 100644 --- a/proxy/src/telemetry/sensor/http.rs +++ b/proxy/src/telemetry/sensor/http.rs @@ -77,17 +77,14 @@ impl NewHttp where A: Body + 'static, B: Body + 'static, - N: NewService< - Request = http::Request, - Response = http::Response, - Error = client::Error> - + 'static + N: NewService, Response = http::Response, Error = client::Error> + + 'static, { pub(super) fn new( next_id: Arc, new_service: N, handle: &super::Handle, - client_ctx: &Arc + client_ctx: &Arc, ) -> Self { Self { next_id, @@ -103,11 +100,8 @@ impl NewService for NewHttp where A: Body + 'static, B: Body + 'static, - N: NewService< - Request = http::Request, - Response = http::Response, - Error = client::Error> - + 'static + N: NewService, Response = http::Response, Error = client::Error> + + 'static, { type Request = N::Request; type Response = http::Response>; @@ -158,18 +152,15 @@ impl Service for Http where A: Body + 'static, B: Body + 'static, - S: Service< - Request = http::Request, - Response = http::Response, - Error = client::Error> - + 'static + S: Service, Response = http::Response, Error = client::Error> + + 'static, { type Request = S::Request; type Response = http::Response>; type Error = S::Error; type Future = Respond; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { + fn poll_ready(&mut self) -> Poll<(), Self::Error> { self.service.poll_ready() } @@ -180,7 +171,8 @@ where let id = self.next_id.fetch_add(1, Ordering::SeqCst); let ctx = ctx::http::Request::new(&req, &ctx, &self.client_ctx, id); - self.handle.send(|| Event::StreamRequestOpen(Arc::clone(&ctx))); + self.handle + .send(|| Event::StreamRequestOpen(Arc::clone(&ctx))); Some(RespondInner { ctx, @@ -196,7 +188,7 @@ where Respond { future, inner, - _p: PhantomData + _p: PhantomData, } } } @@ -206,7 +198,7 @@ where impl Future for Respond where F: Future, Error = client::Error>, - B: Body + 'static + B: Body + 'static, { type Item = http::Response>; type Error = F::Error; @@ -230,7 +222,7 @@ where Arc::clone(&ctx), event::StreamResponseOpen { since_request_open: request_open.elapsed(), - } + }, ) }); @@ -249,7 +241,7 @@ where since_response_open: Duration::default(), bytes_sent: 0, frames_sent: 0, - } + }, ) }); @@ -294,7 +286,7 @@ where event::StreamRequestFail { error, since_request_open: request_open.elapsed(), - } + }, ) }); } @@ -341,21 +333,21 @@ impl ResponseBody { since_response_open: response_open.elapsed(), bytes_sent, frames_sent, - } + }, ) }); } } Err(e) - }, + } } } } impl Body for ResponseBody where - B: Body + 'static + B: Body + 'static, { /// The body chunk type type Data = ::Buf; @@ -406,7 +398,7 @@ where since_response_open: response_open.elapsed(), bytes_sent, frames_sent, - } + }, ) }) } diff --git a/proxy/src/telemetry/sensor/mod.rs b/proxy/src/telemetry/sensor/mod.rs index 356bda2c3..9785a7bab 100644 --- a/proxy/src/telemetry/sensor/mod.rs +++ b/proxy/src/telemetry/sensor/mod.rs @@ -15,7 +15,7 @@ use telemetry::event; pub mod http; mod transport; -pub use self::http::{NewHttp, Http}; +pub use self::http::{Http, NewHttp}; pub use self::transport::{Connect, Transport}; /// Accepts events from sensors. @@ -27,7 +27,10 @@ struct Handle(Option>); pub struct Sensors(Handle); impl Handle { - fn send(&mut self, mk: F) where F: FnOnce()-> event::Event { + fn send(&mut self, mk: F) + where + F: FnOnce() -> event::Event, + { if let Some(tx) = self.0.as_mut() { // We may want to capture timestamps here instead of on the consumer-side... That // level of precision doesn't necessarily seem worth it yet. @@ -51,8 +54,14 @@ impl Sensors { Sensors(Handle(None)) } - pub fn accept(&self, io: T, opened_at: Instant, ctx: &Arc) -> Transport - where T: AsyncRead + AsyncWrite + pub fn accept( + &self, + io: T, + opened_at: Instant, + ctx: &Arc, + ) -> Transport + where + T: AsyncRead + AsyncWrite, { debug!("server connection open"); let ctx = Arc::new(ctx::transport::Ctx::Server(Arc::clone(ctx))); @@ -60,7 +69,8 @@ impl Sensors { } pub fn connect(&self, connect: C, ctx: &Arc) -> Connect - where C: tokio_connect::Connect + where + C: tokio_connect::Connect, { Connect::new(connect, &self.0, ctx) } @@ -69,15 +79,13 @@ impl Sensors { &self, next_id: Arc, new_service: N, - client_ctx: &Arc + client_ctx: &Arc, ) -> NewHttp where A: Body + 'static, B: Body + 'static, - N: NewService< - Request = Request, - Response = Response, - Error = client::Error> + 'static + N: NewService, Response = Response, Error = client::Error> + + 'static, { NewHttp::new(next_id, new_service, &self.0, client_ctx) } diff --git a/proxy/src/telemetry/sensor/transport.rs b/proxy/src/telemetry/sensor/transport.rs index 672eab3b4..26c309768 100644 --- a/proxy/src/telemetry/sensor/transport.rs +++ b/proxy/src/telemetry/sensor/transport.rs @@ -43,7 +43,12 @@ pub struct Connecting { impl Transport { /// Wraps a transport with telemetry and emits a transport open event. - pub(super) fn open(io: T, opened_at: Instant, handle: &super::Handle, ctx: Arc) -> Self { + pub(super) fn open( + io: T, + opened_at: Instant, + handle: &super::Handle, + ctx: Arc, + ) -> Self { let mut handle = handle.clone(); handle.send(|| event::Event::TransportOpen(Arc::clone(&ctx))); @@ -70,27 +75,43 @@ impl Transport { Ok(v) => Ok(v), Err(e) => { if e.kind() != io::ErrorKind::WouldBlock { - if let Some(Inner { mut handle, ctx, opened_at }) = self.1.take() { + if let Some(Inner { + mut handle, + ctx, + opened_at, + }) = self.1.take() + { handle.send(move || { let duration = opened_at.elapsed(); - let ev = event::TransportClose { duration, clean: false }; + let ev = event::TransportClose { + duration, + clean: false, + }; event::Event::TransportClose(ctx, ev) }); } } Err(e) - }, + } } } } impl Drop for Transport { fn drop(&mut self) { - if let Some(Inner { mut handle, ctx, opened_at }) = self.1.take() { + if let Some(Inner { + mut handle, + ctx, + opened_at, + }) = self.1.take() + { handle.send(move || { let duration = opened_at.elapsed(); - let ev = event::TransportClose { clean: true, duration }; + let ev = event::TransportClose { + clean: true, + duration, + }; event::Event::TransportClose(ctx, ev) }); } diff --git a/proxy/src/telemetry/tap/match_.rs b/proxy/src/telemetry/tap/match_.rs index 21afa295c..598e3f858 100644 --- a/proxy/src/telemetry/tap/match_.rs +++ b/proxy/src/telemetry/tap/match_.rs @@ -1,5 +1,5 @@ -use std::net; use std::boxed::Box; +use std::net; use std::sync::Arc; use http; @@ -8,8 +8,8 @@ use ipnet::{Contains, Ipv4Net, Ipv6Net}; use super::Event; use control::pb::common::ip_address; use control::pb::tap::observe_request; -use ctx; use convert::*; +use ctx; #[derive(Clone, Debug)] pub(super) enum Match { @@ -40,7 +40,7 @@ pub(super) enum TcpMatch { #[derive(Clone, Debug)] pub(super) enum NetMatch { Net4(Ipv4Net), - Net6(Ipv6Net) , + Net6(Ipv6Net), } #[derive(Clone, Debug)] @@ -76,55 +76,44 @@ impl Match { Match::Not(ref not) => !not.matches(ev), - Match::Source(ref src) => { - match *ev { - Event::StreamRequestOpen(ref req) | Event::StreamRequestFail(ref req, _) => { - src.matches(&req.server.remote) - }, - Event::StreamResponseOpen(ref rsp, _) | - Event::StreamResponseFail(ref rsp, _) | - Event::StreamResponseEnd(ref rsp, _) => { - src.matches(&rsp.request.server.remote) - }, - _ => false, + Match::Source(ref src) => match *ev { + Event::StreamRequestOpen(ref req) | Event::StreamRequestFail(ref req, _) => { + src.matches(&req.server.remote) } - } + Event::StreamResponseOpen(ref rsp, _) | + Event::StreamResponseFail(ref rsp, _) | + Event::StreamResponseEnd(ref rsp, _) => src.matches(&rsp.request.server.remote), + _ => false, + }, - Match::Destination(ref dst) => { - match *ev { - Event::StreamRequestOpen(ref req) | Event::StreamRequestFail(ref req, _) => { - dst.matches(&req.client.remote) - } - Event::StreamResponseOpen(ref rsp, _) | - Event::StreamResponseFail(ref rsp, _) | - Event::StreamResponseEnd(ref rsp, _) => { - dst.matches(&rsp.request.client.remote) - } - _ => false, + Match::Destination(ref dst) => match *ev { + Event::StreamRequestOpen(ref req) | Event::StreamRequestFail(ref req, _) => { + dst.matches(&req.client.remote) } - } + Event::StreamResponseOpen(ref rsp, _) | + Event::StreamResponseFail(ref rsp, _) | + Event::StreamResponseEnd(ref rsp, _) => dst.matches(&rsp.request.client.remote), + _ => false, + }, - Match::Http(ref http) => { - match *ev { - Event::StreamRequestOpen(ref req) | - Event::StreamRequestFail(ref req, _) => { - http.matches(req) - } - - Event::StreamResponseOpen(ref rsp, _) | - Event::StreamResponseFail(ref rsp, _) | - Event::StreamResponseEnd(ref rsp, _) => { - http.matches(&rsp.request) - } - - _ => false, + Match::Http(ref http) => match *ev { + Event::StreamRequestOpen(ref req) | Event::StreamRequestFail(ref req, _) => { + http.matches(req) } - } + + Event::StreamResponseOpen(ref rsp, _) | + Event::StreamResponseFail(ref rsp, _) | + Event::StreamResponseEnd(ref rsp, _) => http.matches(&rsp.request), + + _ => false, + }, } } pub(super) fn new(match_: &observe_request::Match) -> Result { - match_.match_.as_ref() + match_ + .match_ + .as_ref() .map(Match::try_from) .unwrap_or_else(|| Err(InvalidMatch::Empty)) } @@ -154,12 +143,10 @@ impl<'a> TryFrom<&'a observe_request::match_::Match> for Match { match_::Match::Any(ref seq) => Match::Any(Self::from_seq(seq)?), - match_::Match::Not(ref m) => { - match m.match_.as_ref() { - Some(m) => Match::Not(Box::new(Self::try_from(m)?)), - None => return Err(InvalidMatch::Empty), - } - } + match_::Match::Not(ref m) => match m.match_.as_ref() { + Some(m) => Match::Not(Box::new(Self::try_from(m)?)), + None => return Err(InvalidMatch::Empty), + }, match_::Match::Source(ref src) => Match::Source(TcpMatch::try_from(src)?), @@ -179,13 +166,9 @@ impl TcpMatch { match *self { // If either a minimum or maximum is not specified, the range is considered to // be over a discrete value. - TcpMatch::PortRange(min, max) => { - min <= addr.port() && addr.port() <= max - } + TcpMatch::PortRange(min, max) => min <= addr.port() && addr.port() <= max, - TcpMatch::Net(ref net) => { - net.matches(&addr.ip()) - }, + TcpMatch::Net(ref net) => net.matches(&addr.ip()), } } } @@ -228,21 +211,16 @@ impl<'a> TryFrom<&'a observe_request::match_::Tcp> for TcpMatch { impl NetMatch { fn matches(&self, addr: &net::IpAddr) -> bool { match *self { - NetMatch::Net4(ref net) => { - match *addr { - net::IpAddr::V6(_) => false, - net::IpAddr::V4(ref addr) => net.contains(addr), - } - } - NetMatch::Net6(ref net) => { - match *addr { - net::IpAddr::V4(_) => false, - net::IpAddr::V6(ref addr) => net.contains(addr), - } - } + NetMatch::Net4(ref net) => match *addr { + net::IpAddr::V6(_) => false, + net::IpAddr::V4(ref addr) => net.contains(addr), + }, + NetMatch::Net6(ref net) => match *addr { + net::IpAddr::V4(_) => false, + net::IpAddr::V6(ref addr) => net.contains(addr), + }, } } - } impl<'a> TryFrom<&'a observe_request::match_::tcp::Netmask> for NetMatch { @@ -263,7 +241,8 @@ impl<'a> TryFrom<&'a observe_request::match_::tcp::Netmask> for NetMatch { let net = match *ip { ip_address::Ip::Ipv4(ref n) => { - let net = Ipv4Net::new((*n).into(), mask).map_err(|_| InvalidMatch::InvalidNetwork)?; + let net = + Ipv4Net::new((*n).into(), mask).map_err(|_| InvalidMatch::InvalidNetwork)?; NetMatch::Net4(net) } ip_address::Ip::Ipv6(ref ip6) => { @@ -281,21 +260,14 @@ impl<'a> TryFrom<&'a observe_request::match_::tcp::Netmask> for NetMatch { impl HttpMatch { fn matches(&self, req: &Arc) -> bool { match *self { - HttpMatch::Scheme(ref m) => { - req.uri.scheme() - .map(|s| *m == s) - .unwrap_or(false) - } + HttpMatch::Scheme(ref m) => req.uri.scheme().map(|s| *m == s).unwrap_or(false), - HttpMatch::Method(ref m) => { - *m == req.method - } + HttpMatch::Method(ref m) => *m == req.method, - HttpMatch::Authority(ref m) => { - req.uri.authority_part() - .map(|a| Self::matches_string(m, a.as_str())) - .unwrap_or(false) - } + HttpMatch::Authority(ref m) => req.uri + .authority_part() + .map(|a| Self::matches_string(m, a.as_str())) + .unwrap_or(false), HttpMatch::Path(ref m) => Self::matches_string(m, req.uri.path()), } @@ -303,7 +275,7 @@ impl HttpMatch { fn matches_string( string_match: &observe_request::match_::http::string_match::Match, - value: &str + value: &str, ) -> bool { use control::pb::proxy::tap::observe_request::match_::http::string_match::Match::*; @@ -312,7 +284,6 @@ impl HttpMatch { Prefix(ref prefix) => value.starts_with(prefix), } } - } impl<'a> TryFrom<&'a observe_request::match_::Http> for HttpMatch { @@ -320,51 +291,45 @@ impl<'a> TryFrom<&'a observe_request::match_::Http> for HttpMatch { fn try_from(m: &'a observe_request::match_::Http) -> Result { use control::pb::proxy::tap::observe_request::match_::http::Match as Pb; - m.match_.as_ref() + m.match_ + .as_ref() .ok_or_else(|| InvalidMatch::Empty) - .and_then(|m| { - match *m { - Pb::Scheme(ref s) => { - s.type_.as_ref() - .ok_or_else(|| InvalidMatch::Empty) - .and_then(|s| { - s.try_into() - .map(HttpMatch::Scheme) - .map_err(|_|InvalidMatch::InvalidScheme) - }) - } + .and_then(|m| match *m { + Pb::Scheme(ref s) => s.type_ + .as_ref() + .ok_or_else(|| InvalidMatch::Empty) + .and_then(|s| { + s.try_into() + .map(HttpMatch::Scheme) + .map_err(|_| InvalidMatch::InvalidScheme) + }), - Pb::Method(ref m) => { - m.type_.as_ref() - .ok_or_else(|| InvalidMatch::Empty) - .and_then(|m| { - m.try_into() - .map(HttpMatch::Method) - .map_err(|_| InvalidMatch::InvalidHttpMethod) - }) + Pb::Method(ref m) => m.type_ + .as_ref() + .ok_or_else(|| InvalidMatch::Empty) + .and_then(|m| { + m.try_into() + .map(HttpMatch::Method) + .map_err(|_| InvalidMatch::InvalidHttpMethod) + }), - } + Pb::Authority(ref a) => a.match_ + .as_ref() + .ok_or_else(|| InvalidMatch::Empty) + .map(|a| HttpMatch::Authority(a.clone())), - Pb::Authority(ref a) => { - a.match_.as_ref() - .ok_or_else(|| InvalidMatch::Empty) - .map(|a|HttpMatch::Authority(a.clone())) - } - - Pb::Path(ref p) => { - p.match_.as_ref() - .ok_or_else(|| InvalidMatch::Empty) - .map(|p| HttpMatch::Path(p.clone())) - } - } + Pb::Path(ref p) => p.match_ + .as_ref() + .ok_or_else(|| InvalidMatch::Empty) + .map(|p| HttpMatch::Path(p.clone())), }) } } #[cfg(test)] mod tests { - use std::net; use std::boxed::Box; + use std::net; use ipnet::{Contains, Ipv4Net, Ipv6Net}; use quickcheck::*; @@ -377,7 +342,7 @@ mod tests { fn arbitrary(g: &mut G) -> Self { ObserveRequest { limit: g.gen(), - match_: Arbitrary::arbitrary(g) + match_: Arbitrary::arbitrary(g), } } } @@ -385,7 +350,7 @@ mod tests { impl Arbitrary for observe_request::Match { fn arbitrary(g: &mut G) -> Self { observe_request::Match { - match_: Arbitrary::arbitrary(g) + match_: Arbitrary::arbitrary(g), } } } @@ -399,7 +364,7 @@ mod tests { 3 => observe_request::match_::Match::Source(Arbitrary::arbitrary(g)), 4 => observe_request::match_::Match::Destination(Arbitrary::arbitrary(g)), 5 => observe_request::match_::Match::Http(Arbitrary::arbitrary(g)), - _ => unreachable!() + _ => unreachable!(), } } } @@ -411,9 +376,12 @@ mod tests { } } - fn shrink(&self) -> Box> { - Box::new(self.matches.shrink() - .map(|matches| observe_request::match_::Seq { matches })) + fn shrink(&self) -> Box> { + Box::new(self.matches.shrink().map(|matches| { + observe_request::match_::Seq { + matches, + } + })) } } @@ -449,12 +417,15 @@ mod tests { impl Arbitrary for observe_request::match_::tcp::Netmask { fn arbitrary(g: &mut G) -> Self { let ip: Option = Arbitrary::arbitrary(g); - let mask = match ip.as_ref().and_then(|a| a.ip.as_ref()) { + let mask = match ip.as_ref().and_then(|a| a.ip.as_ref()) { Some(&ip_address::Ip::Ipv4(_)) => g.gen::() % 32 + 1, Some(&ip_address::Ip::Ipv6(_)) => g.gen::() % 128 + 1, None => 0u32, }; - observe_request::match_::tcp::Netmask { ip, mask } + observe_request::match_::tcp::Netmask { + ip, + mask, + } } } @@ -475,7 +446,7 @@ mod tests { 1 => http::Match::Method(HttpMethod::arbitrary(g)), 2 => http::Match::Authority(http::StringMatch::arbitrary(g)), 3 => http::Match::Path(http::StringMatch::arbitrary(g)), - _ => unreachable!() + _ => unreachable!(), } } } @@ -483,7 +454,7 @@ mod tests { impl Arbitrary for observe_request::match_::http::StringMatch { fn arbitrary(g: &mut G) -> Self { observe_request::match_::http::StringMatch { - match_: Arbitrary::arbitrary(g) + match_: Arbitrary::arbitrary(g), } } } @@ -495,7 +466,7 @@ mod tests { match g.gen::() % 2 { 0 => string_match::Match::Exact(String::arbitrary(g)), 1 => string_match::Match::Prefix(String::arbitrary(g)), - _ => unreachable!() + _ => unreachable!(), } } } @@ -503,7 +474,7 @@ mod tests { impl Arbitrary for IpAddress { fn arbitrary(g: &mut G) -> Self { IpAddress { - ip: Arbitrary::arbitrary(g) + ip: Arbitrary::arbitrary(g), } } } @@ -530,7 +501,7 @@ mod tests { impl Arbitrary for HttpMethod { fn arbitrary(g: &mut G) -> Self { HttpMethod { - type_: Arbitrary::arbitrary(g) + type_: Arbitrary::arbitrary(g), } } } @@ -547,7 +518,7 @@ mod tests { impl Arbitrary for Scheme { fn arbitrary(g: &mut G) -> Self { Scheme { - type_: Arbitrary::arbitrary(g) + type_: Arbitrary::arbitrary(g), } } } diff --git a/proxy/src/telemetry/tap/mod.rs b/proxy/src/telemetry/tap/mod.rs index 6f06b8068..9db6f1843 100644 --- a/proxy/src/telemetry/tap/mod.rs +++ b/proxy/src/telemetry/tap/mod.rs @@ -62,12 +62,16 @@ impl Taps { } impl Tap { - pub fn new(match_: &observe_request::Match, capacity: usize) - -> Result<(Tap, futures_mpsc_lossy::Receiver), InvalidMatch> - { + pub fn new( + match_: &observe_request::Match, + capacity: usize, + ) -> Result<(Tap, futures_mpsc_lossy::Receiver), InvalidMatch> { let (tx, rx) = futures_mpsc_lossy::channel(capacity); let match_ = Match::new(match_)?; - let tap = Tap { match_, tx }; + let tap = Tap { + match_, + tx, + }; Ok((tap, rx)) } diff --git a/proxy/src/tower_fn.rs b/proxy/src/tower_fn.rs index ce964e811..3c87236cc 100644 --- a/proxy/src/tower_fn.rs +++ b/proxy/src/tower_fn.rs @@ -1,22 +1,26 @@ use futures::future::{self, FutureResult}; -use tower::{Service, NewService}; +use tower::{NewService, Service}; pub struct NewServiceFn { f: T, } impl NewServiceFn -where T: Fn() -> N, - N: Service, +where + T: Fn() -> N, + N: Service, { pub fn new(f: T) -> Self { - NewServiceFn { f } + NewServiceFn { + f, + } } } impl NewService for NewServiceFn -where T: Fn() -> N, - N: Service, +where + T: Fn() -> N, + N: Service, { type Request = N::Request; type Response = N::Response; diff --git a/proxy/src/transport/connect.rs b/proxy/src/transport/connect.rs index de5352b92..884747900 100644 --- a/proxy/src/transport/connect.rs +++ b/proxy/src/transport/connect.rs @@ -54,8 +54,12 @@ impl Future for TcpStreamNewNoDelay { fn poll(&mut self) -> Poll { let tcp = try_ready!(self.0.poll()); if let Err(e) = tcp.set_nodelay(true) { - warn!("could not set TCP_NODELAY on {:?}/{:?}: {}", - tcp.local_addr(), tcp.peer_addr(), e); + warn!( + "could not set TCP_NODELAY on {:?}/{:?}: {}", + tcp.local_addr(), + tcp.peer_addr(), + e + ); } Ok(Async::Ready(tcp)) } @@ -90,7 +94,7 @@ impl LookupAddressAndConnect { pub fn new( host_and_port: url::HostAndPort, dns_resolver: dns::Resolver, - handle: &Handle + handle: &Handle, ) -> Self { Self { host_and_port, @@ -109,8 +113,11 @@ impl tokio_connect::Connect for LookupAddressAndConnect { let port = self.host_and_port.port; let handle = self.handle.clone(); let host = self.host_and_port.host.clone(); - let c = self.dns_resolver.resolve_host(&self.host_and_port.host) - .map_err(|_| io::Error::new(io::ErrorKind::NotFound, "DNS resolution failed")) + let c = self.dns_resolver + .resolve_host(&self.host_and_port.host) + .map_err(|_| { + io::Error::new(io::ErrorKind::NotFound, "DNS resolution failed") + }) .and_then(move |ip_addr: IpAddr| { info!("DNS resolved {} to {}", host, ip_addr); let addr = SocketAddr::from((ip_addr, port)); @@ -143,7 +150,11 @@ impl tokio_connect::Connect for TimeoutConnect { let connect = self.connect.connect(); let duration = self.timeout; let timeout = Timeout::new(duration, &self.handle).unwrap(); - TimeoutConnectFuture { connect, duration, timeout } + TimeoutConnectFuture { + connect, + duration, + timeout, + } } } diff --git a/proxy/src/transport/so_original_dst.rs b/proxy/src/transport/so_original_dst.rs index bb885df97..493632f2f 100644 --- a/proxy/src/transport/so_original_dst.rs +++ b/proxy/src/transport/so_original_dst.rs @@ -67,7 +67,7 @@ mod linux { ); let port = sa.sin_port; Ok(SocketAddr::V4(SocketAddrV4::new(ip, ntoh16(port)))) - }, + } libc::AF_INET6 => { assert!(len as usize >= mem::size_of::()); @@ -94,7 +94,7 @@ mod linux { Ok(SocketAddr::V6( SocketAddrV6::new(ip, ntoh16(port), flowinfo, scope_id), )) - }, + } _ => Err(io::Error::new( io::ErrorKind::InvalidInput, "invalid argument", diff --git a/proxy/tests/discovery.rs b/proxy/tests/discovery.rs index e1c2c6a4b..8486893fb 100644 --- a/proxy/tests/discovery.rs +++ b/proxy/tests/discovery.rs @@ -8,17 +8,11 @@ use self::support::*; fn outbound_asks_controller_api() { let _ = env_logger::init(); - let srv = server::new() - .route("/", "hello") - .route("/bye", "bye") - .run(); + let srv = server::new().route("/", "hello").route("/bye", "bye").run(); let ctrl = controller::new() .destination("test.conduit.local", srv.addr) .run(); - let proxy = proxy::new() - .controller(ctrl) - .outbound(srv) - .run(); + let proxy = proxy::new().controller(ctrl).outbound(srv).run(); let client = client::new(proxy.outbound, "test.conduit.local"); assert_eq!(client.get("/"), "hello"); @@ -29,17 +23,12 @@ fn outbound_asks_controller_api() { fn outbound_reconnects_if_controller_stream_ends() { let _ = env_logger::init(); - let srv = server::new() - .route("/recon", "nect") - .run(); + let srv = server::new().route("/recon", "nect").run(); let ctrl = controller::new() .destination_close("test.conduit.local") .destination("test.conduit.local", srv.addr) .run(); - let proxy = proxy::new() - .controller(ctrl) - .outbound(srv) - .run(); + let proxy = proxy::new().controller(ctrl).outbound(srv).run(); let client = client::new(proxy.outbound, "test.conduit.local"); assert_eq!(client.get("/recon"), "nect"); diff --git a/proxy/tests/support/client.rs b/proxy/tests/support/client.rs index 6eb46b7ed..d49627ba9 100644 --- a/proxy/tests/support/client.rs +++ b/proxy/tests/support/client.rs @@ -1,6 +1,6 @@ use support::*; -use self::futures::sync::{oneshot, mpsc}; +use self::futures::sync::{mpsc, oneshot}; use self::tokio_core::net::TcpStream; use self::tower_h2::client::Error; @@ -35,14 +35,14 @@ impl Client { .body(()) .unwrap(); let _ = self.tx.unbounded_send((req, tx)); - rx - .map_err(|_| panic!("client request dropped")) + rx.map_err(|_| panic!("client request dropped")) .and_then(|res| { let stream = RecvBodyStream(res.unwrap().into_parts().1); stream.concat2() - }).map(|body| { - ::std::str::from_utf8(&body).unwrap().to_string() - }).wait().unwrap() + }) + .map(|body| ::std::str::from_utf8(&body).unwrap().to_string()) + .wait() + .unwrap() } } @@ -56,17 +56,20 @@ fn run(addr: SocketAddr) -> Sender { let reactor = core.handle(); let conn = Conn(addr, reactor.clone()); - let h2 = tower_h2::Client::::new(conn, Default::default(), reactor.clone()); + let h2 = tower_h2::Client::::new( + conn, + Default::default(), + reactor.clone(), + ); let done = h2.new_service() .map_err(move |err| println!("connect error ({:?}): {:?}", addr, err)) .and_then(move |mut h2| { rx.for_each(move |(req, cb)| { - let fut = h2.call(req) - .then(|result| { - let _ = cb.send(result); - Ok(()) - }); + let fut = h2.call(req).then(|result| { + let _ = cb.send(result); + Ok(()) + }); reactor.spawn(fut); Ok(()) }) diff --git a/proxy/tests/support/controller.rs b/proxy/tests/support/controller.rs index 81bb511c5..6695006b8 100644 --- a/proxy/tests/support/controller.rs +++ b/proxy/tests/support/controller.rs @@ -35,7 +35,8 @@ impl Controller { } pub fn destination(mut self, dest: &str, addr: SocketAddr) -> Self { - self.destinations.push((dest.into(), Some(destination_update(addr)))); + self.destinations + .push((dest.into(), Some(destination_update(addr)))); self } @@ -68,7 +69,11 @@ struct Svc { } impl Svc { - fn route(&self, path: &str, body: RecvBodyStream) -> Box> { + fn route( + &self, + path: &str, + body: RecvBodyStream, + ) -> Box> { let mut rsp = http::Response::builder(); rsp.version(http::Version::HTTP_2); @@ -94,7 +99,7 @@ impl Svc { let rsp = rsp.body(body).unwrap(); Ok(rsp) })) - }, + } TELEMETRY_REPORT => { let mut reports = self.reports.clone(); Box::new(body.concat2().and_then(move |mut bytes| { @@ -121,7 +126,7 @@ impl Service for Svc { type Request = Request; type Response = Response; type Error = h2::Error; - type Future = Box>; + type Future = Box>; fn poll_ready(&mut self) -> Poll<(), Self::Error> { Ok(Async::Ready(())) @@ -160,11 +165,7 @@ impl Body for GrpcBody { fn poll_data(&mut self) -> Poll, self::h2::Error> { let data = self.message.split_off(0); - let data = if data.is_empty() { - None - } else { - Some(data) - }; + let data = if data.is_empty() { None } else { Some(data) }; Ok(Async::Ready(data)) } @@ -231,12 +232,11 @@ fn run(controller: Controller) -> Listening { }); - core.handle() - .spawn( - serve - .map(|_| ()) - .map_err(|e| println!("controller error: {}", e)) - ); + core.handle().spawn( + serve + .map(|_| ()) + .map_err(|e| println!("controller error: {}", e)), + ); core.run(rx).unwrap(); }) @@ -251,32 +251,26 @@ fn run(controller: Controller) -> Listening { fn destination_update(addr: SocketAddr) -> pb::destination::Update { pb::destination::Update { - update: Some( - pb::destination::update::Update::Add( - pb::destination::WeightedAddrSet { - addrs: vec![ - pb::destination::WeightedAddr { - addr: Some( - pb::common::TcpAddress { - ip: Some(ip_conv(addr.ip())), - port: u32::from(addr.port()), - } - ), - weight: 0, - } - ], - } - ) - ), + update: Some(pb::destination::update::Update::Add( + pb::destination::WeightedAddrSet { + addrs: vec![ + pb::destination::WeightedAddr { + addr: Some(pb::common::TcpAddress { + ip: Some(ip_conv(addr.ip())), + port: u32::from(addr.port()), + }), + weight: 0, + }, + ], + }, + )), } } fn ip_conv(ip: IpAddr) -> pb::common::IpAddress { match ip { - IpAddr::V4(v4) => { - pb::common::IpAddress { - ip: Some(pb::common::ip_address::Ip::Ipv4(v4.into())), - } + IpAddr::V4(v4) => pb::common::IpAddress { + ip: Some(pb::common::ip_address::Ip::Ipv4(v4.into())), }, IpAddr::V6(v6) => { let (first, last) = octets_to_u64s(v6.octets()); @@ -291,21 +285,13 @@ fn ip_conv(ip: IpAddr) -> pb::common::IpAddress { } fn octets_to_u64s(octets: [u8; 16]) -> (u64, u64) { - let first = (u64::from(octets[0]) << 56) - + (u64::from(octets[1]) << 48) - + (u64::from(octets[2]) << 40) - + (u64::from(octets[3]) << 32) - + (u64::from(octets[4]) << 24) - + (u64::from(octets[5]) << 16) - + (u64::from(octets[6]) << 8) - + u64::from(octets[7]); - let last = (u64::from(octets[8]) << 56) - + (u64::from(octets[9]) << 48) - + (u64::from(octets[10]) << 40) - + (u64::from(octets[11]) << 32) - + (u64::from(octets[12]) << 24) - + (u64::from(octets[13]) << 16) - + (u64::from(octets[14]) << 8) - + u64::from(octets[15]); + let first = (u64::from(octets[0]) << 56) + (u64::from(octets[1]) << 48) + + (u64::from(octets[2]) << 40) + (u64::from(octets[3]) << 32) + + (u64::from(octets[4]) << 24) + (u64::from(octets[5]) << 16) + + (u64::from(octets[6]) << 8) + u64::from(octets[7]); + let last = (u64::from(octets[8]) << 56) + (u64::from(octets[9]) << 48) + + (u64::from(octets[10]) << 40) + (u64::from(octets[11]) << 32) + + (u64::from(octets[12]) << 24) + (u64::from(octets[13]) << 16) + + (u64::from(octets[14]) << 8) + u64::from(octets[15]); (first, last) } diff --git a/proxy/tests/support/mod.rs b/proxy/tests/support/mod.rs index c86f202ff..5a179d8a0 100644 --- a/proxy/tests/support/mod.rs +++ b/proxy/tests/support/mod.rs @@ -2,29 +2,29 @@ extern crate bytes; extern crate conduit_proxy; -pub extern crate env_logger; extern crate futures; extern crate h2; extern crate http; extern crate prost; -extern crate tokio_core; extern crate tokio_connect; +extern crate tokio_core; extern crate tower; extern crate tower_h2; extern crate url; +pub extern crate env_logger; -use std::net::SocketAddr; -pub use std::time::Duration; use self::bytes::{BigEndian, Bytes, BytesMut}; pub use self::futures::*; use self::futures::sync::oneshot; -use self::http::{Request, HeaderMap}; +use self::http::{HeaderMap, Request}; use self::http::header::HeaderValue; use self::tokio_connect::Connect; use self::tokio_core::net::TcpListener; use self::tokio_core::reactor::{Core, Handle}; use self::tower::{NewService, Service}; use self::tower_h2::{Body, RecvBody}; +use std::net::SocketAddr; +pub use std::time::Duration; pub mod client; pub mod controller; @@ -32,7 +32,11 @@ pub mod proxy; pub mod server; pub type Shutdown = oneshot::Sender<()>; -pub type ShutdownRx = future::Then, Result<(), ()>, fn(Result<(), oneshot::Canceled>) -> Result<(), ()>>; +pub type ShutdownRx = future::Then< + oneshot::Receiver<()>, + Result<(), ()>, + fn(Result<(), oneshot::Canceled>) -> Result<(), ()>, +>; pub fn shutdown_signal() -> (oneshot::Sender<()>, ShutdownRx) { let (tx, rx) = oneshot::channel(); diff --git a/proxy/tests/support/server.rs b/proxy/tests/support/server.rs index 149c93ba3..ce76ab2c1 100644 --- a/proxy/tests/support/server.rs +++ b/proxy/tests/support/server.rs @@ -39,7 +39,11 @@ impl Server { let mut core = Core::new().unwrap(); let reactor = core.handle(); - let h2 = tower_h2::Server::new(NewSvc(Arc::new(self.routes)), Default::default(), reactor.clone()); + let h2 = tower_h2::Server::new( + NewSvc(Arc::new(self.routes)), + Default::default(), + reactor.clone(), + ); let addr = ([127, 0, 0, 1], 0).into(); let bind = TcpListener::bind(&addr, &reactor).expect("bind"); @@ -60,12 +64,11 @@ impl Server { Ok((h2, reactor)) }); - core.handle() - .spawn( - serve - .map(|_| ()) - .map_err(|e| println!("server error: {}", e)) - ); + core.handle().spawn( + serve + .map(|_| ()) + .map_err(|e| println!("server error: {}", e)), + ); info!("running"); core.run(rx).unwrap(); diff --git a/proxy/tests/telemetry.rs b/proxy/tests/telemetry.rs index bf0708f6a..8636912c2 100644 --- a/proxy/tests/telemetry.rs +++ b/proxy/tests/telemetry.rs @@ -9,9 +9,7 @@ fn inbound_sends_telemetry() { let _ = env_logger::init(); info!("running test server"); - let srv = server::new() - .route("/hey", "hello") - .run(); + let srv = server::new().route("/hey", "hello").run(); let mut ctrl = controller::new(); let reports = ctrl.reports();