diff --git a/Cargo.lock b/Cargo.lock index 98a457c8e..5cd58583e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -562,7 +562,7 @@ dependencies = [ "tower-reconnect 0.1.0 (git+https://github.com/tower-rs/tower)", "tower-service 0.1.0 (git+https://github.com/tower-rs/tower)", "tower-util 0.1.0 (git+https://github.com/tower-rs/tower)", - "trust-dns-resolver 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)", + "trust-dns-resolver 0.10.0 (git+https://github.com/bluejekyll/trust-dns?rev=c017c114)", "try-lock 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "untrusted 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", "webpki 0.18.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1434,7 +1434,7 @@ dependencies = [ [[package]] name = "trust-dns-proto" version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" +source = "git+https://github.com/bluejekyll/trust-dns?rev=c017c114#c017c114177d826d00710572ff1fcb103f752f43" dependencies = [ "byteorder 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "failure 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1457,7 +1457,7 @@ dependencies = [ [[package]] name = "trust-dns-resolver" version = "0.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" +source = "git+https://github.com/bluejekyll/trust-dns?rev=c017c114#c017c114177d826d00710572ff1fcb103f752f43" dependencies = [ "cfg-if 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "failure 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1469,7 +1469,7 @@ dependencies = [ "resolv-conf 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", "smallvec 0.6.3 (registry+https://github.com/rust-lang/crates.io-index)", "tokio 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", - "trust-dns-proto 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", + "trust-dns-proto 0.5.0 (git+https://github.com/bluejekyll/trust-dns?rev=c017c114)", ] [[package]] @@ -1784,8 +1784,8 @@ dependencies = [ "checksum tower-reconnect 0.1.0 (git+https://github.com/tower-rs/tower)" = "" "checksum tower-service 0.1.0 (git+https://github.com/tower-rs/tower)" = "" "checksum tower-util 0.1.0 (git+https://github.com/tower-rs/tower)" = "" -"checksum trust-dns-proto 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "0838272e89f1c693b4df38dc353412e389cf548ceed6f9fd1af5a8d6e0e7cf74" -"checksum trust-dns-resolver 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)" = "4e913a5df94658858e548cc95a3212797ee524e487ede091c32f27ca26e11620" +"checksum trust-dns-proto 0.5.0 (git+https://github.com/bluejekyll/trust-dns?rev=c017c114)" = "" +"checksum trust-dns-resolver 0.10.0 (git+https://github.com/bluejekyll/trust-dns?rev=c017c114)" = "" "checksum try-lock 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "119b532a17fbe772d360be65617310164549a07c25a1deab04c84168ce0d4545" "checksum ucd-util 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "fd2be2d6639d0f8fe6cdda291ad456e23629558d466e2789d2c3e9892bda285d" "checksum unicode-bidi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "49f2bd0c6468a8230e1db229cff8029217cf623c767ea5d60bfbd42729ea54d5" diff --git a/Cargo.toml b/Cargo.toml index 6c23bcfbf..d599ace3e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -69,8 +69,8 @@ tower-h2 = { git = "https://github.com/tower-rs/tower-h2" } tower-h2-balance = { git = "https://github.com/tower-rs/tower-h2" } tower-grpc = { git = "https://github.com/tower-rs/tower-grpc" } -# dns -trust-dns-resolver = { version = "0.10.0", default-features = false } +# FIXME update to a release when available (>0.10) +trust-dns-resolver = { git = "https://github.com/bluejekyll/trust-dns", rev = "c017c114", default-features = false } # tls ring = "0.13" diff --git a/src/addr.rs b/src/addr.rs index 159d04bb4..99b357f8d 100644 --- a/src/addr.rs +++ b/src/addr.rs @@ -30,23 +30,29 @@ pub enum Error { // === impl Addr === impl Addr { - pub fn new(host: &str, port: u16) -> Result { + pub fn from_str(hostport: &str) -> Result { + SocketAddr::from_str(hostport) + .map(Addr::Socket) + .or_else(|_| NameAddr::from_str(hostport).map(Addr::Name)) + } + + pub fn from_str_and_port(host: &str, port: u16) -> Result { IpAddr::from_str(host) .map(|ip| Addr::Socket((ip, port).into())) - .or_else(|_| NameAddr::new(host, port).map(Addr::Name)) + .or_else(|_| NameAddr::from_str_and_port(host, port).map(Addr::Name)) } pub fn from_authority_and_default_port( a: &http::uri::Authority, default_port: u16, ) -> Result { - Self::new(a.host(), a.port().unwrap_or(default_port)) + Self::from_str_and_port(a.host(), a.port().unwrap_or(default_port)) } pub fn from_authority_with_port(a: &http::uri::Authority) -> Result { a.port() .ok_or(Error::MissingPort) - .and_then(|p| Self::new(a.host(), p)) + .and_then(|p| Self::from_str_and_port(a.host(), p)) } pub fn port(&self) -> u16 { @@ -96,16 +102,36 @@ impl Addr { impl fmt::Display for Addr { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { - Addr::Name(NameAddr { ref name, port }) => write!(f, "{}:{}", name, port), - Addr::Socket(addr) => write!(f, "{}", addr), + Addr::Name(name) => name.fmt(f), + Addr::Socket(addr) => addr.fmt(f), } } } +impl From for Addr { + fn from(na: NameAddr) -> Self { + Addr::Name(na) + } +} + // === impl NameAddr === impl NameAddr { - pub fn new(host: &str, port: u16) -> Result { + pub fn new(name: Name, port: u16) -> Self { + NameAddr { name, port } + } + + pub fn from_str(hostport: &str) -> Result { + let mut parts = hostport.rsplitn(2, ':'); + let port = parts + .next() + .and_then(|p| p.parse::().ok()) + .ok_or(Error::MissingPort)?; + let host = parts.next().ok_or(Error::InvalidHost)?; + Self::from_str_and_port(host, port) + } + + pub fn from_str_and_port(host: &str, port: u16) -> Result { if host.is_empty() { return Err(Error::InvalidHost); } @@ -119,13 +145,13 @@ impl NameAddr { a: &http::uri::Authority, default_port: u16, ) -> Result { - Self::new(a.host(), a.port().unwrap_or(default_port)) + Self::from_str_and_port(a.host(), a.port().unwrap_or(default_port)) } pub fn from_authority_with_port(a: &http::uri::Authority) -> Result { a.port() .ok_or(Error::MissingPort) - .and_then(|p| Self::new(a.host(), p)) + .and_then(|p| Self::from_str_and_port(a.host(), p)) } pub fn name(&self) -> &Name { @@ -141,37 +167,36 @@ impl NameAddr { } pub fn as_authority(&self) -> http::uri::Authority { - http::uri::Authority::from_str(self.name.as_ref()) + http::uri::Authority::from_str(&format!("{}", self)) .expect("NameAddr must be valid authority") } } impl fmt::Display for NameAddr { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{}:{}", self.name, self.port) + + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}:{}", self.name.without_trailing_dot(), self.port) } } #[cfg(test)] mod tests { use super::*; - use http::uri::Authority; #[test] fn test_is_loopback() { let cases = &[ - ("localhost", false), // Not absolute - ("localhost.", true), - ("LocalhOsT.", true), // Case-insensitive - ("mlocalhost.", false), // prefixed - ("localhost1.", false), // suffixed - ("127.0.0.1", true), // IPv4 - ("[::1]", true), // IPv6 + ("localhost:80", false), // Not absolute + ("localhost.:80", true), + ("LocalhOsT.:80", true), // Case-insensitive + ("mlocalhost.:80", false), // prefixed + ("localhost1.:80", false), // suffixed + ("127.0.0.1:80", true), // IPv4 + ("[::1]:80", true), // IPv6 ]; for (host, expected_result) in cases { - let authority = Authority::from_static(host); - let hp = Addr::from_authority_and_default_port(&authority, 80).unwrap(); - assert_eq!(hp.is_loopback(), *expected_result, "{:?}", host) + let a = Addr::from_str(host).unwrap(); + assert_eq!(a.is_loopback(), *expected_result, "{:?}", host) } } } diff --git a/src/app/config.rs b/src/app/config.rs index 4114d1574..59f9c5bbd 100644 --- a/src/app/config.rs +++ b/src/app/config.rs @@ -11,6 +11,7 @@ use indexmap::IndexSet; use trust_dns_resolver::config::ResolverOpts; use addr; +use dns; use convert::TryFrom; use transport::tls; use {Conditional, Addr}; @@ -59,6 +60,12 @@ pub struct Config { /// active concurrently. pub destination_concurrency_limit: usize, + /// Configured by `ENV_DESTINATION_GET_SUFFIXES`. + pub destination_get_suffixes: Vec, + + /// Configured by `ENV_DESTINATION_PROFILE_SUFFIXES`. + pub destination_profile_suffixes: Vec, + pub tls_settings: Conditional, /// The path to "/etc/resolv.conf" @@ -123,6 +130,7 @@ pub enum Error { pub enum ParseError { EnvironmentUnsupported, NotADuration, + NotADomainSuffix, NotANumber, HostIsNotAnIpAddress, NotUnicode, @@ -188,6 +196,28 @@ pub const ENV_OUTBOUND_ROUTER_CAPACITY: &str = "LINKERD2_PROXY_OUTBOUND_ROUTER_C pub const ENV_INBOUND_ROUTER_MAX_IDLE_AGE: &str = "LINKERD2_PROXY_INBOUND_ROUTER_MAX_IDLE_AGE"; pub const ENV_OUTBOUND_ROUTER_MAX_IDLE_AGE: &str = "LINKERD2_PROXY_OUTBOUND_ROUTER_MAX_IDLE_AGE"; +/// Constrains which destination names are resolved through the destination +/// service. +/// +/// The value is a comma-separated list of domain name suffixes that may be +/// resolved via the destination service. A value of `.` indicates that all +/// domains should be resolved via the service. +/// +/// If specified and empty, the destination service is not used for resolution. +/// +/// If unspecified, a default value is used. +pub const ENV_DESTINATION_GET_SUFFIXES: &str = "LINKERD2_PROXY_DESTINATION_GET_SUFFIXES"; + +/// Constrains which destination names may be used for profile/route discovery. +/// +/// The value is a comma-separated list of domain name suffixes that may be +/// resolved via the destination service. A value of `.` indicates that all +/// domains should be discovered via the service. +/// +/// If specified and empty, the destination service is not used for route discovery. +/// +/// If unspecified, a default value is used. +pub const ENV_DESTINATION_PROFILE_SUFFIXES: &str = "LINKERD2_PROXY_DESTINATION_PROFILE_SUFFIXES"; /// Limits the maximum number of outbound Destination service queries. /// @@ -249,6 +279,9 @@ const DEFAULT_OUTBOUND_ROUTER_MAX_IDLE_AGE: Duration = Duration::from_secs(60); const DEFAULT_DESTINATION_CLIENT_CONCURRENCY_LIMIT: usize = 100; +const DEFAULT_DESTINATION_GET_SUFFIXES: &str = "svc.cluster.local."; +const DEFAULT_DESTINATION_PROFILE_SUFFIXES: &str = "svc.cluster.local."; + // By default, we keep a list of known assigned ports of server-first protocols. // // https://www.iana.org/assignments/service-names-port-numbers/service-names-port-numbers.txt @@ -300,6 +333,10 @@ impl<'a> TryFrom<&'a Strings> for Config { let outbound_router_max_idle_age = parse(strings, ENV_OUTBOUND_ROUTER_MAX_IDLE_AGE, parse_duration); let destination_concurrency_limit = parse(strings, ENV_DESTINATION_CLIENT_CONCURRENCY_LIMIT, parse_number); + let destination_get_suffixes = + parse(strings, ENV_DESTINATION_GET_SUFFIXES, parse_dns_suffixes); + let destination_profile_suffixes = + parse(strings, ENV_DESTINATION_PROFILE_SUFFIXES, parse_dns_suffixes); let tls_trust_anchors = parse(strings, ENV_TLS_TRUST_ANCHORS, parse_path); let tls_end_entity_cert = parse(strings, ENV_TLS_CERT, parse_path); let tls_private_key = parse(strings, ENV_TLS_PRIVATE_KEY, parse_path); @@ -440,6 +477,12 @@ impl<'a> TryFrom<&'a Strings> for Config { destination_concurrency_limit: destination_concurrency_limit? .unwrap_or(DEFAULT_DESTINATION_CLIENT_CONCURRENCY_LIMIT), + destination_get_suffixes: destination_get_suffixes? + .unwrap_or(parse_dns_suffixes(DEFAULT_DESTINATION_GET_SUFFIXES).unwrap()), + + destination_profile_suffixes: destination_profile_suffixes? + .unwrap_or(parse_dns_suffixes(DEFAULT_DESTINATION_PROFILE_SUFFIXES).unwrap()), + tls_settings, resolv_conf_path: resolv_conf_path? @@ -600,6 +643,28 @@ where } } +fn parse_dns_suffixes(list: &str) -> Result, ParseError> { + let mut suffixes = Vec::new(); + for item in list.split(',') { + let item = item.trim(); + if !item.is_empty() { + let sfx = parse_dns_suffix(item)?; + suffixes.push(sfx); + } + } + + Ok(suffixes) +} + +fn parse_dns_suffix(s: &str) -> Result { + if s == "." { + return Ok(dns::Suffix::Root); + } + + dns::Name::try_from(s.as_bytes()) + .map(dns::Suffix::Name) + .map_err(|_| ParseError::NotADomainSuffix) +} #[cfg(test)] mod tests { @@ -672,4 +737,37 @@ mod tests { fn parse_duration_number_without_unit_is_invalid() { assert_eq!(parse_duration("1"), Err(ParseError::NotADuration)); } + + #[test] + fn dns_suffixes() { + fn p(s: &str) -> Result, ParseError> { + let sfxs = parse_dns_suffixes(s)? + .into_iter() + .map(|s| format!("{}", s)) + .collect(); + + Ok(sfxs) + } + + assert_eq!(p(""), Ok(vec![]), "empty string"); + assert_eq!(p(",,,"), Ok(vec![]), "empty list components are ignored"); + assert_eq!(p("."), Ok(vec![".".to_owned()]), "root is valid"); + assert_eq!(p("a.b.c"), Ok(vec!["a.b.c".to_owned()]), "a name without trailing dot"); + assert_eq!(p("a.b.c."), Ok(vec!["a.b.c.".to_owned()]), "a name with a trailing dot"); + assert_eq!( + p(" a.b.c. , d.e.f. "), + Ok(vec!["a.b.c.".to_owned(), "d.e.f.".to_owned()]), + "whitespace is ignored" + ); + assert_eq!( + p("a .b.c"), + Err(ParseError::NotADomainSuffix), + "whitespace not allowed within a name" + ); + assert_eq!( + p("mUlti.CasE.nAmE"), + Ok(vec!["multi.case.name".to_owned()]), + "names are coerced to lowercase" + ); + } } diff --git a/src/app/control.rs b/src/app/control.rs index a98e63623..18c9a8a42 100644 --- a/src/app/control.rs +++ b/src/app/control.rs @@ -8,7 +8,7 @@ use {Conditional, Addr}; #[derive(Clone, Debug)] pub struct Config { - host_and_port: Addr, + addr: Addr, tls_server_identity: Conditional, tls_config: tls::ConditionalClientConfig, backoff: Duration, @@ -18,13 +18,13 @@ pub struct Config { impl Config { pub fn new( - host_and_port: Addr, + addr: Addr, tls_server_identity: Conditional, backoff: Duration, connect_timeout: Duration, ) -> Self { Self { - host_and_port, + addr, tls_server_identity, tls_config: Conditional::None(tls::ReasonForNoTls::Disabled), backoff, @@ -46,7 +46,7 @@ impl svc::watch::WithUpdate for Config { impl fmt::Display for Config { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - fmt::Display::fmt(&self.host_and_port, f) + fmt::Display::fmt(&self.addr, f) } } @@ -116,13 +116,13 @@ pub mod add_origin { fn make(&self, config: &super::Config) -> Result { let inner = self.inner.make(config)?; let scheme = uri::Scheme::from_shared(Bytes::from_static(b"http")).unwrap(); - let authority = config.host_and_port.as_authority(); + let authority = config.addr.as_authority(); Ok(AddOrigin::new(inner, scheme, authority)) } } } -/// Resolves the controller's `host_and_port` once before building a client. +/// Resolves the controller's `addr` once before building a client. pub mod resolve { use futures::{Future, Poll}; use std::marker::PhantomData; @@ -133,6 +133,7 @@ pub mod resolve { use dns; use svc; use transport::{connect, tls}; + use Addr; #[derive(Debug)] pub struct Layer { @@ -171,6 +172,7 @@ pub mod resolve { stack: M, }, Inner(::Future), + Invalid(Option), } #[derive(Debug)] @@ -252,13 +254,16 @@ pub mod resolve { type Future = Init; fn new_service(&self) -> Self::Future { - Init { - state: State::Resolve { - future: self.dns.resolve_one_ip(&self.config.host_and_port), + let state = match self.config.addr { + Addr::Socket(sa) => State::make_inner(sa, &self.config, &self.stack), + Addr::Name(ref na) => State::Resolve { + future: self.dns.resolve_one_ip(na.name()), stack: self.stack.clone(), config: self.config.clone(), }, - } + }; + + Init { state } } } @@ -284,32 +289,46 @@ pub mod resolve { ref stack, } => { let ip = try_ready!(future.poll().map_err(Error::Dns)); - let sa = SocketAddr::from((ip, config.host_and_port.port())); - - let tls = config.tls_server_identity.as_ref().and_then(|id| { - config - .tls_config - .as_ref() - .map(|config| tls::ConnectionConfig { - server_identity: id.clone(), - config: config.clone(), - }) - }); - let target = client::Target { - connect: connect::Target::new(sa, tls), - builder: config.builder.clone(), - log_ctx: ::logging::admin() - .client("control", config.host_and_port.clone()), - }; - - let inner = stack.make(&target).map_err(Error::Invalid)?; - State::Inner(svc::NewService::new_service(&inner)) + let sa = SocketAddr::from((ip, config.addr.port())); + State::make_inner(sa, &config, &stack) + } + State::Invalid(ref mut e) => { + return Err(Error::Invalid(e.take().expect("future polled after failure"))); } }; } } } + impl State + where + M: svc::Stack, + M::Value: svc::NewService, + { + fn make_inner(addr: SocketAddr, config: &super::Config, stack: &M) -> Self { + let tls = config.tls_server_identity.as_ref().and_then(|id| { + config + .tls_config + .as_ref() + .map(|config| tls::ConnectionConfig { + server_identity: id.clone(), + config: config.clone(), + }) + }); + + let target = client::Target { + connect: connect::Target::new(addr, tls), + builder: config.builder.clone(), + log_ctx: ::logging::admin().client("control", config.addr.clone()), + }; + + match stack.make(&target) { + Ok(n) => State::Inner(svc::NewService::new_service(&n)), + Err(e) => State::Invalid(Some(e)), + } + } + } + // === impl Error === impl fmt::Display for Error { diff --git a/src/app/dst.rs b/src/app/dst.rs new file mode 100644 index 000000000..5ca77ab2c --- /dev/null +++ b/src/app/dst.rs @@ -0,0 +1,87 @@ +use http; +use std::fmt; + +use proxy::http::{metrics::classify::CanClassify, profiles}; +use {Addr, NameAddr}; + +use super::classify; + +#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] +pub enum Direction { + In, + Out, +} + +#[derive(Clone, Debug)] +pub struct Route { + pub dst_addr: DstAddr, + pub route: profiles::Route, +} + +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub struct DstAddr { + addr: Addr, + direction: Direction, +} + +// === impl Route === + +impl CanClassify for Route { + type Classify = classify::Request; + + fn classify(&self) -> classify::Request { + self.route.response_classes().clone().into() + } +} + +// === impl DstAddr === + +impl AsRef for DstAddr { + fn as_ref(&self) -> &Addr { + &self.addr + } +} + +impl DstAddr { + pub fn outbound(addr: Addr) -> Self { + DstAddr { addr, direction: Direction::Out } + } + + pub fn inbound(addr: Addr) -> Self { + DstAddr { addr, direction: Direction::In } + } + + pub fn direction(&self) -> Direction { + self.direction + } +} + +impl<'t> From<&'t DstAddr> for http::header::HeaderValue { + fn from(a: &'t DstAddr) -> Self { + http::header::HeaderValue::from_str(&format!("{}", a)) + .expect("addr must be a valid header") + } +} + +impl fmt::Display for DstAddr { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + self.addr.fmt(f) + } +} + +impl profiles::CanGetDestination for DstAddr { + fn get_destination(&self) -> Option<&NameAddr> { + self.addr.name_addr() + } +} + +impl profiles::WithRoute for DstAddr { + type Output = Route; + + fn with_route(self, route: profiles::Route) -> Self::Output { + Route { + dst_addr: self, + route, + } + } +} diff --git a/src/app/inbound.rs b/src/app/inbound.rs index 8f46d6f0b..43e8ff1b7 100644 --- a/src/app/inbound.rs +++ b/src/app/inbound.rs @@ -3,9 +3,9 @@ use std::fmt; use std::net::SocketAddr; use super::classify; -use proxy::http::{client, normalize_uri::ShouldNormalizeUri, router, Settings}; +use super::dst::DstAddr; +use proxy::http::{router, settings}; use proxy::server::Source; -use svc::stack_per_request::ShouldStackPerRequest; use tap; use transport::{connect, tls}; use {Conditional, NameAddr}; @@ -14,31 +14,16 @@ use {Conditional, NameAddr}; pub struct Endpoint { pub addr: SocketAddr, pub dst_name: Option, - pub settings: Settings, pub source_tls_status: tls::Status, } -// === Recognize === - #[derive(Clone, Debug, Default)] -pub struct Recognize { +pub struct RecognizeEndpoint { default_addr: Option, } // === impl Endpoint === -impl ShouldNormalizeUri for Endpoint { - fn should_normalize_uri(&self) -> bool { - !self.settings.is_http2() && !self.settings.was_absolute_form() - } -} - -impl ShouldStackPerRequest for Endpoint { - fn should_stack_per_request(&self) -> bool { - !self.settings.is_http2() && !self.settings.can_reuse_clients() - } -} - impl classify::CanClassify for Endpoint { type Classify = classify::Request; @@ -47,12 +32,20 @@ impl classify::CanClassify for Endpoint { } } -// Makes it possible to build a client::Stack. -impl From for client::Config { - fn from(ep: Endpoint) -> Self { +impl Endpoint { + pub fn dst_name(&self) -> Option<&NameAddr> { + self.dst_name.as_ref() + } + + fn target(&self) -> connect::Target { let tls = Conditional::None(tls::ReasonForNoTls::InternalTraffic); - let connect = connect::Target::new(ep.addr, tls); - client::Config::new(connect, ep.settings) + connect::Target::new(self.addr, tls) + } +} + +impl settings::router::HasConnect for Endpoint { + fn connect(&self) -> connect::Target { + self.target() } } @@ -60,7 +53,7 @@ impl From for tap::Endpoint { fn from(ep: Endpoint) -> Self { tap::Endpoint { direction: tap::Direction::In, - client: ep.into(), + target: ep.target(), labels: Default::default(), } } @@ -72,44 +65,40 @@ impl fmt::Display for Endpoint { } } -impl Recognize { +// === impl RecognizeEndpoint === + +impl RecognizeEndpoint { pub fn new(default_addr: Option) -> Self { Self { default_addr } } } -impl router::Recognize> for Recognize { +impl router::Recognize> for RecognizeEndpoint { type Target = Endpoint; fn recognize(&self, req: &http::Request) -> Option { let src = req.extensions().get::(); - let source_tls_status = src - .map(|s| s.tls_status.clone()) - .unwrap_or_else(|| Conditional::None(tls::ReasonForNoTls::Disabled)); - + debug!("inbound endpoint: src={:?}", src); let addr = src .and_then(Source::orig_dst_if_not_local) .or(self.default_addr)?; - let dst_name = super::http_request_addr(req) - .ok() - .and_then(|h| h.into_name_addr()); - let settings = Settings::from_request(req); + let source_tls_status = src + .map(|s| s.tls_status.clone()) + .unwrap_or_else(|| Conditional::None(tls::ReasonForNoTls::Disabled)); - let ep = Endpoint { + let dst_name = req + .extensions() + .get::() + .and_then(|a| a.as_ref().name_addr()) + .cloned(); + debug!("inbound endpoint: dst={:?}", dst_name); + + Some(Endpoint { addr, dst_name, - settings, source_tls_status, - }; - debug!("recognize: src={:?} ep={:?}", src, ep); - Some(ep) - } -} - -impl fmt::Display for Recognize { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "in") + }) } } @@ -230,24 +219,17 @@ mod tests { use http; use std::net; - use super::{Endpoint, Recognize}; - use proxy::http::router::Recognize as _Recognize; - use proxy::http::settings::Settings; + use super::{Endpoint, RecognizeEndpoint}; + use proxy::http::router::Recognize; use proxy::server::Source; use transport::tls; use Conditional; fn make_h1_endpoint(addr: net::SocketAddr) -> Endpoint { - let settings = Settings::Http1 { - is_h1_upgrade: false, - was_absolute_form: false, - stack_per_request: true, - }; let source_tls_status = TLS_DISABLED; Endpoint { addr, dst_name: None, - settings, source_tls_status, } } @@ -267,7 +249,7 @@ mod tests { let mut req = http::Request::new(()); req.extensions_mut().insert(src); - Recognize::default().recognize(&req) == rec + RecognizeEndpoint::default().recognize(&req) == rec } fn recognize_default_no_orig_dst( @@ -279,12 +261,12 @@ mod tests { req.extensions_mut() .insert(Source::for_test(remote, local, None, TLS_DISABLED)); - Recognize::new(default).recognize(&req) == default.map(make_h1_endpoint) + RecognizeEndpoint::new(default).recognize(&req) == default.map(make_h1_endpoint) } fn recognize_default_no_ctx(default: Option) -> bool { let req = http::Request::new(()); - Recognize::new(default).recognize(&req) == default.map(make_h1_endpoint) + RecognizeEndpoint::new(default).recognize(&req) == default.map(make_h1_endpoint) } fn recognize_default_no_loop( @@ -296,7 +278,7 @@ mod tests { req.extensions_mut() .insert(Source::for_test(remote, local, Some(local), TLS_DISABLED)); - Recognize::new(default).recognize(&req) == default.map(make_h1_endpoint) + RecognizeEndpoint::new(default).recognize(&req) == default.map(make_h1_endpoint) } } } diff --git a/src/app/main.rs b/src/app/main.rs index 5f63ff299..e4e19e923 100644 --- a/src/app/main.rs +++ b/src/app/main.rs @@ -21,17 +21,23 @@ use metrics::{self, FmtMetrics}; use never::Never; use proxy::{ self, buffer, - http::{client, insert_target, normalize_uri, router}, + http::{client, insert_target, normalize_uri, profiles, router, settings}, limit, reconnect, timeout, }; -use svc::{self, Layer as _Layer, Stack as _Stack}; +use svc::{ + self, shared, + stack::{map_target, phantom_data}, + Layer, Stack, +}; use tap; use task; use telemetry; use transport::{self, connect, tls, BoundPort, Connection, GetOriginalDst}; -use Conditional; +use {Addr, Conditional}; use super::config::Config; +use super::dst::DstAddr; +use super::profiles::Client as ProfilesClient; /// Runs a sidecar proxy. /// @@ -77,7 +83,8 @@ where let control_listener = BoundPort::new( config.control_listener.addr, Conditional::None(tls::ReasonForNoIdentity::NotImplementedForTap.into()), - ).expect("controller listener bind"); + ) + .expect("controller listener bind"); let inbound_listener = { let tls = config.tls_settings.as_ref().and_then(|settings| { @@ -95,7 +102,8 @@ where let outbound_listener = BoundPort::new( config.outbound_listener.addr, Conditional::None(tls::ReasonForNoTls::InternalTraffic), - ).expect("private listener bind"); + ) + .expect("private listener bind"); let runtime = runtime.into(); @@ -103,7 +111,8 @@ where let metrics_listener = BoundPort::new( config.metrics_listener.addr, Conditional::None(tls::ReasonForNoIdentity::NotImplementedForMetrics.into()), - ).expect("metrics listener bind"); + ) + .expect("metrics listener bind"); Main { config, @@ -230,7 +239,7 @@ where .push(reconnect::layer().with_fixed_backoff(config.control_backoff_delay)) .push(proxy::timeout::layer(config.control_connect_timeout)) .push(svc::watch::layer(tls_client_config.clone())) - .push(svc::stack::phantom_data::layer()) + .push(phantom_data::layer()) .push(control::add_origin::Layer::new()) .push(buffer::layer()) .push(limit::layer(config.destination_concurrency_limit)); @@ -258,6 +267,7 @@ where controller.clone(), dns_resolver.clone(), config.namespaces.clone(), + config.destination_get_suffixes, config.destination_concurrency_limit, ); resolver_bg_tx @@ -265,76 +275,152 @@ where .ok() .expect("admin thread must receive resolver task"); + let profiles_client = ProfilesClient::new(controller, Duration::from_secs(3)); + let outbound = { - use super::outbound::{ - discovery::Resolve, orig_proto_upgrade, Endpoint, Recognize, - }; - use super::profiles::Client as ProfilesClient; + use super::outbound::{discovery::Resolve, orig_proto_upgrade, Endpoint}; use proxy::{ - http::{balance, metrics, profiles}, + canonicalize, + http::{balance, header_from_target, metrics}, resolve, }; + let profiles_client = profiles_client.clone(); + let capacity = config.outbound_router_capacity; + let max_idle_age = config.outbound_router_max_idle_age; let endpoint_http_metrics = endpoint_http_metrics.clone(); + let route_http_metrics = route_http_metrics.clone(); + let profile_suffixes = config.destination_profile_suffixes.clone(); - // As the outbound proxy accepts connections, we don't do any - // special transport-level handling. - let accept = transport_metrics.accept("outbound").bind(()); - - // Establishes connections to remote peers. + // Establishes connections to remote peers (for both TCP + // forwarding and HTTP proxying). let connect = connect::Stack::new() .push(proxy::timeout::layer(config.outbound_connect_timeout)) .push(transport_metrics.connect("outbound")); + // Instantiates an HTTP client for for a `client::Config` let client_stack = connect .clone() .push(client::layer("out")) - .push(svc::stack::map_target::layer(|ep: &Endpoint| { - client::Config::from(ep.clone()) - })) - .push(reconnect::layer()); - - let endpoint_stack = client_stack + .push(reconnect::layer()) .push(svc::stack_per_request::layer()) - .push(normalize_uri::layer()) + .push(normalize_uri::layer()); + + // A per-`outbound::Endpoint` stack that: + // + // 1. Records http metrics with per-endpoint labels. + // 2. Instruments `tap` inspection. + // 3. Changes request/response versions when the endpoint + // supports protocol upgrade (and the request may be upgraded). + // 4. Routes requests to the correct client (based on the + // request version and headers). + let endpoint_stack = client_stack + .push(buffer::layer()) + .push(settings::router::layer::()) .push(orig_proto_upgrade::layer()) .push(tap::layer(tap_next_id.clone(), taps.clone())) - .push(metrics::layer::<_, classify::Response>(endpoint_http_metrics)) - .push(svc::watch::layer(tls_client_config)) - .push(buffer::layer()); + .push(metrics::layer::<_, classify::Response>( + endpoint_http_metrics, + )) + .push(svc::watch::layer(tls_client_config)); - let profiles_client = ProfilesClient::new( - controller, - Duration::from_secs(3), - control::KubernetesNormalize::new(config.namespaces.pod.clone()), - ); + // A per-`dst::Route` layer that uses profile data to configure + // a per-route layer. + // + // The `classify` module installs a `classify::Response` + // extension into each request so that all lower metrics + // implementations can use the route-specific configuration. + let dst_route_layer = phantom_data::layer() + .push(metrics::layer::<_, classify::Response>(route_http_metrics)) + .push(classify::layer()); - let dst_route_stack = endpoint_stack + // A per-`DstAddr` stack that does the following: + // + // 1. Adds the `CANONICAL_DST_HEADER` from the `DstAddr`. + // 2. Determines the profile of the destination and applies + // per-route policy. + // 3. Creates a load balancer , configured by resolving the + // `DstAddr` with a resolver. + let dst_stack = endpoint_stack .push(resolve::layer(Resolve::new(resolver))) .push(balance::layer()) .push(buffer::layer()) .push(profiles::router::layer( + profile_suffixes, profiles_client, - svc::stack::phantom_data::layer() - .push(metrics::layer::<_, classify::Response>(route_http_metrics)) - .push(classify::layer()), + dst_route_layer, )) + .push(header_from_target::layer(super::CANONICAL_DST_HEADER)); + + // Routes request using the `DstAddr` extension. + // + // This is shared across addr-stacks so that multiple addrs that + // canonicalize to the same DstAddr use the same dst-stack service. + // + // Note: This router could be replaced with a Stack-based + // router, since the `DstAddr` is known at construction-time. + // But for now it's more important to use the request router's + // caching logic. + let dst_router = dst_stack + .push(buffer::layer()) + .push(router::layer(|req: &http::Request<_>| { + let addr = req.extensions().get::().cloned(); + debug!("outbound dst={:?}", addr); + addr + })) + .make(&router::Config::new("out dst", capacity, max_idle_age)) + .map(shared::stack) + .expect("outbound dst router") + .push(phantom_data::layer()); + + // Canonicalizes the request-specified `Addr` via DNS, and + // annotates each request with a `DstAddr` so that it may be + // routed by the dst_router. + let addr_stack = dst_router + .push(insert_target::layer()) + .push(map_target::layer(|addr: &Addr| { + DstAddr::outbound(addr.clone()) + })) + .push(canonicalize::layer(dns_resolver)); + + // Routes requests to an `Addr`: + // + // 1. If the request is HTTP/2 and has an :authority, this value + // is used. + // + // 2. If the request is absolute-form HTTP/1, the URI's + // authority is used. + // + // 3. If the request has an HTTP/1 Host header, it is used. + // + // 4. Finally, if the Source had an SO_ORIGINAL_DST, this TCP + // address is used. + let addr_router = addr_stack .push(buffer::layer()) .push(timeout::layer(config.bind_timeout)) .push(limit::layer(MAX_IN_FLIGHT)) - .push(router::layer(Recognize::new())); + .push(router::layer(|req: &http::Request<_>| { + let addr = super::http_request_authority_addr(req) + .or_else(|_| super::http_request_host_addr(req)) + .or_else(|_| super::http_request_orig_dst_addr(req)) + .ok(); + debug!("outbound addr={:?}", addr); + addr + })) + .make(&router::Config::new("out addr", capacity, max_idle_age)) + .map(shared::stack) + .expect("outbound addr router") + .push(phantom_data::layer()); - let capacity = config.outbound_router_capacity; - let max_idle_age = config.outbound_router_max_idle_age; - let router = dst_route_stack - .make(&router::Config::new("out", capacity, max_idle_age)) - .expect("outbound router"); + // Instantiates an HTTP service for each `Source` using the + // shared `addr_router`. The `Source` is stored in the request's + // extensions so that it can be used by the `addr_router`. + let server_stack = addr_router + .push(insert_target::layer()); - // As HTTP requests are accepted, we add some request extensions - // including metadata about the request's origin. - let server_stack = svc::stack::phantom_data::layer() - .push(insert_target::layer()) - .bind(svc::shared::stack(router)); + // Instantiated for each TCP connection received from the local + // application (including HTTP connections). + let accept = transport_metrics.accept("outbound").bind(()); serve( "out", @@ -345,66 +431,128 @@ where config.outbound_ports_disable_protocol_detection, get_original_dst.clone(), drain_rx.clone(), - ).map_err(|e| error!("outbound proxy background task failed: {}", e)) + ) + .map_err(|e| error!("outbound proxy background task failed: {}", e)) }; let inbound = { - use super::inbound::{self, Endpoint}; + use super::inbound::{orig_proto_downgrade, rewrite_loopback_addr, Endpoint, RecognizeEndpoint}; use proxy::http::metrics; - // As the inbound proxy accepts connections, we don't do any - // special transport-level handling. - let accept = transport_metrics.accept("inbound").bind(()); + let capacity = config.inbound_router_capacity; + let max_idle_age = config.inbound_router_max_idle_age; + let profile_suffixes = config.destination_profile_suffixes; + let default_fwd_addr = config.inbound_forward.map(|a| a.into()); - // Establishes connections to the local application. + // Establishes connections to the local application (for both + // TCP forwarding and HTTP proxying). let connect = connect::Stack::new() .push(proxy::timeout::layer(config.inbound_connect_timeout)) .push(transport_metrics.connect("inbound")) - .push(inbound::rewrite_loopback_addr::layer()); + .push(rewrite_loopback_addr::layer()); + + // Instantiates an HTTP client for for a `client::Config` + let client_stack = connect + .clone() + .push(client::layer("in")) + .push(reconnect::layer()) + .push(svc::stack_per_request::layer()) + .push(normalize_uri::layer()); // A stack configured by `router::Config`, responsible for building // a router made of route stacks configured by `inbound::Endpoint`. // // If there is no `SO_ORIGINAL_DST` for an inbound socket, // `default_fwd_addr` may be used. - // - // `normalize_uri` and `stack_per_request` are applied on the stack - // selectively. For HTTP/2 stacks, for instance, neither service will be - // employed. - let default_fwd_addr = config.inbound_forward.map(|a| a.into()); - let stack = connect - .clone() - .push(client::layer("in")) - .push(svc::stack::map_target::layer(|ep: &Endpoint| { - client::Config::from(ep.clone()) - })) - .push(reconnect::layer()) - .push(svc::stack_per_request::layer()) - .push(normalize_uri::layer()) + let endpoint_router = client_stack + .push(buffer::layer()) + .push(settings::router::layer::()) .push(tap::layer(tap_next_id, taps)) - .push(metrics::layer::<_, classify::Response>(endpoint_http_metrics)) - .push(classify::layer()) + .push(metrics::layer::<_, classify::Response>( + endpoint_http_metrics, + )) + .push(buffer::layer()) + .push(router::layer(RecognizeEndpoint::new(default_fwd_addr))) + .make(&router::Config::new("in endpoint", capacity, max_idle_age)) + .map(shared::stack) + .expect("inbound endpoint router"); + + // A per-`dst::Route` layer that uses profile data to configure + // a per-route layer. + // + // The `classify` module installs a `classify::Response` + // extension into each request so that all lower metrics + // implementations can use the route-specific configuration. + let dst_route_stack = phantom_data::layer() + .push(metrics::layer::<_, classify::Response>(route_http_metrics)) + .push(classify::layer()); + + // A per-`DstAddr` stack that does the following: + // + // 1. Determines the profile of the destination and applies + // per-route policy. + // 2. Annotates the request with the `DstAddr` so that + // `RecognizeEndpoint` can use the value. + let dst_stack = endpoint_router + .push(phantom_data::layer()) + .push(insert_target::layer()) + .push(buffer::layer()) + .push(profiles::router::layer( + profile_suffixes, + profiles_client, + dst_route_stack, + )); + + // Routes requests to a `DstAddr`. + // + // 1. If the CANONICAL_DST_HEADER is set by the remote peer, + // this value is used to construct a DstAddr. + // + // 2. If the request is HTTP/2 and has an :authority, this value + // is used. + // + // 3. If the request is absolute-form HTTP/1, the URI's + // authority is used. + // + // 4. If the request has an HTTP/1 Host header, it is used. + // + // 5. Finally, if the Source had an SO_ORIGINAL_DST, this TCP + // address is used. + let dst_router = dst_stack .push(buffer::layer()) .push(limit::layer(MAX_IN_FLIGHT)) - .push(router::layer(inbound::Recognize::new(default_fwd_addr))); + .push(router::layer(|req: &http::Request<_>| { + let canonical = req + .headers() + .get(super::CANONICAL_DST_HEADER) + .and_then(|dst| dst.to_str().ok()) + .and_then(|d| Addr::from_str(d).ok()); + info!("inbound canonical={:?}", canonical); - // Build a router using the above policy - let capacity = config.inbound_router_capacity; - let max_idle_age = config.inbound_router_max_idle_age; - let router = stack - .make(&router::Config::new("in", capacity, max_idle_age)) - .expect("inbound router"); + let dst = canonical + .or_else(|| super::http_request_authority_addr(req).ok()) + .or_else(|| super::http_request_host_addr(req).ok()) + .or_else(|| super::http_request_orig_dst_addr(req).ok()); + info!("inbound dst={:?}", dst); + dst.map(DstAddr::inbound) + })) + .make(&router::Config::new("in dst", capacity, max_idle_age)) + .map(shared::stack) + .expect("inbound dst router"); - // As HTTP requests are accepted, we add some request extensions - // including metadata about the request's origin. + // As HTTP requests are accepted, the `Source` connection + // metadata is stored on each request's extensions. // // Furthermore, HTTP/2 requests may be downgraded to HTTP/1.1 per // `orig-proto` headers. This happens in the source stack so that // the router need not detect whether a request _will be_ downgraded. - let source_stack = svc::stack::phantom_data::layer() - .push(inbound::orig_proto_downgrade::layer()) - .push(insert_target::layer()) - .bind(svc::shared::stack(router)); + let source_stack = dst_router + .push(orig_proto_downgrade::layer()) + .push(insert_target::layer()); + + // As the inbound proxy accepts connections, we don't do any + // special transport-level handling. + let accept = transport_metrics.accept("inbound").bind(()); serve( "in", @@ -415,7 +563,8 @@ where config.inbound_ports_disable_protocol_detection, get_original_dst.clone(), drain_rx.clone(), - ).map_err(|e| error!("inbound proxy background task failed: {}", e)) + ) + .map_err(|e| error!("inbound proxy background task failed: {}", e)) }; inbound.join(outbound).map(|_| {}) diff --git a/src/app/metric_labels.rs b/src/app/metric_labels.rs index 7b426b38e..707dfe585 100644 --- a/src/app/metric_labels.rs +++ b/src/app/metric_labels.rs @@ -8,7 +8,7 @@ use metrics::FmtLabels; use transport::tls; use {Conditional, NameAddr}; -use super::{classify, inbound, outbound}; +use super::{classify, dst, inbound, outbound}; #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct EndpointLabels { @@ -21,8 +21,7 @@ pub struct EndpointLabels { #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct RouteLabels { - direction: Direction, - dst: Dst, + dst: dst::DstAddr, labels: Option, } @@ -35,16 +34,12 @@ enum Direction { #[derive(Clone, Debug, PartialEq, Eq, Hash)] struct Authority<'a>(&'a NameAddr); -#[derive(Clone, Debug, PartialEq, Eq, Hash)] -struct Dst(outbound::Destination); - // === impl RouteLabels === -impl From for RouteLabels { - fn from(r: outbound::Route) -> Self { +impl From for RouteLabels { + fn from(r: dst::Route) -> Self { RouteLabels { - dst: Dst(r.dst.clone()), - direction: Direction::Out, + dst: r.dst_addr, labels: prefix_labels("rt", r.route.labels().as_ref().into_iter()), } } @@ -52,7 +47,7 @@ impl From for RouteLabels { impl FmtLabels for RouteLabels { fn fmt_labels(&self, f: &mut fmt::Formatter) -> fmt::Result { - (&self.dst, &self.direction).fmt_labels(f)?; + self.dst.fmt_labels(f)?; if let Some(labels) = self.labels.as_ref() { write!(f, ",{}", labels)?; @@ -93,7 +88,7 @@ impl From for EndpointLabels { fn from(ep: outbound::Endpoint) -> Self { Self { addr: ep.connect.addr, - dst_name: ep.dst.addr.into_name_addr(), + dst_name: ep.dst_name, direction: Direction::Out, tls_status: ep.connect.tls_status(), labels: prefix_labels("dst", ep.metadata.labels().into_iter()), @@ -128,23 +123,22 @@ impl FmtLabels for Direction { impl<'a> FmtLabels for Authority<'a> { fn fmt_labels(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self.0.port() { - 80 => write!(f, "authority=\"{}\"", self.0.name()), - _ => write!(f, "authority=\"{}\"", self.0), + if self.0.port() == 80 { + write!(f, "authority=\"{}\"", self.0.name().without_trailing_dot()) + } else { + write!(f, "authority=\"{}\"", self.0) } } } -impl FmtLabels for Dst { +impl FmtLabels for dst::DstAddr { fn fmt_labels(&self, f: &mut fmt::Formatter) -> fmt::Result { - let proto = if self.0.settings.is_http2() { - "h2" - } else { - "h1" - }; - write!(f, "dst=\"{}\",dst_protocol=\"{}\"", self.0.addr, proto)?; + match self.direction() { + dst::Direction::In => Direction::In.fmt_labels(f)?, + dst::Direction::Out => Direction::Out.fmt_labels(f)?, + } - Ok(()) + write!(f, ",dst=\"{}\"", self.as_ref()) } } diff --git a/src/app/mod.rs b/src/app/mod.rs index b5a0484bd..4cfc42812 100644 --- a/src/app/mod.rs +++ b/src/app/mod.rs @@ -5,6 +5,7 @@ use http; mod classify; pub mod config; mod control; +mod dst; mod inbound; mod main; mod metric_labels; @@ -14,6 +15,8 @@ mod profiles; pub use self::main::Main; use addr::{self, Addr}; +const CANONICAL_DST_HEADER: &'static str = "l5d-dst-canonical"; + pub fn init() -> Result { use convert::TryFrom; use logging; @@ -22,24 +25,29 @@ pub fn init() -> Result { config::Config::try_from(&config::Env) } -fn http_request_addr(req: &http::Request) -> Result { - use proxy::{http::h1, Source}; - const DEFAULT_PORT: u16 = 80; +const DEFAULT_PORT: u16 = 80; +fn http_request_authority_addr(req: &http::Request) -> Result { req.uri() .authority_part() .ok_or(addr::Error::InvalidHost) .and_then(|a| Addr::from_authority_and_default_port(a, DEFAULT_PORT)) - .or_else(|_| { - h1::authority_from_host(req) - .ok_or(addr::Error::InvalidHost) - .and_then(|a| Addr::from_authority_and_default_port(&a, DEFAULT_PORT)) - }) - .or_else(|e| { - req.extensions() - .get::() - .and_then(|src| src.orig_dst_if_not_local()) - .map(Addr::Socket) - .ok_or(e) - }) +} + +fn http_request_host_addr(req: &http::Request) -> Result { + use proxy::http::h1; + + h1::authority_from_host(req) + .ok_or(addr::Error::InvalidHost) + .and_then(|a| Addr::from_authority_and_default_port(&a, DEFAULT_PORT)) +} + +fn http_request_orig_dst_addr(req: &http::Request) -> Result { + use proxy::Source; + + req.extensions() + .get::() + .and_then(|src| src.orig_dst_if_not_local()) + .map(Addr::Socket) + .ok_or(addr::Error::InvalidHost) } diff --git a/src/app/outbound.rs b/src/app/outbound.rs index 960cc1ff1..e5c453f61 100644 --- a/src/app/outbound.rs +++ b/src/app/outbound.rs @@ -1,42 +1,19 @@ -use http; use std::fmt; -use app::classify; use control::destination::{Metadata, ProtocolHint}; -use proxy::http::{ - metrics::classify::CanClassify, - client, - normalize_uri::ShouldNormalizeUri, - profiles::{self, CanGetDestination}, - router, Settings, -}; -use svc::{self, stack_per_request::ShouldStackPerRequest}; +use proxy::http::settings; +use svc; use tap; use transport::{connect, tls}; -use {Addr, NameAddr}; +use NameAddr; #[derive(Clone, Debug)] pub struct Endpoint { - pub dst: Destination, + pub dst_name: Option, pub connect: connect::Target, pub metadata: Metadata, } -#[derive(Clone, Debug, PartialEq, Eq, Hash)] -pub struct Destination { - pub addr: Addr, - pub settings: Settings, -} - -#[derive(Clone, Debug)] -pub struct Route { - pub dst: Destination, - pub route: profiles::Route, -} - -#[derive(Clone, Debug, Default)] -pub struct Recognize; - // === impl Endpoint === impl Endpoint { @@ -48,15 +25,9 @@ impl Endpoint { } } -impl ShouldNormalizeUri for Endpoint { - fn should_normalize_uri(&self) -> bool { - !self.dst.settings.is_http2() && !self.dst.settings.was_absolute_form() - } -} - -impl ShouldStackPerRequest for Endpoint { - fn should_stack_per_request(&self) -> bool { - !self.dst.settings.is_http2() && !self.dst.settings.can_reuse_clients() +impl settings::router::HasConnect for Endpoint { + fn connect(&self) -> connect::Target { + self.connect.clone() } } @@ -81,90 +52,23 @@ impl svc::watch::WithUpdate for Endpoint { } } -// Makes it possible to build a client::Stack. -impl From for client::Config { - fn from(ep: Endpoint) -> Self { - client::Config::new(ep.connect, ep.dst.settings) - } -} - impl From for tap::Endpoint { fn from(ep: Endpoint) -> Self { // TODO add route labels... tap::Endpoint { direction: tap::Direction::Out, labels: ep.metadata.labels().clone(), - client: ep.into(), + target: ep.connect.clone(), } } } -// === impl Route === - -impl CanClassify for Route { - type Classify = classify::Request; - - fn classify(&self) -> classify::Request { - self.route.response_classes().clone().into() - } -} - -// === impl Recognize === - -impl Recognize { - pub fn new() -> Self { - Self {} - } -} - -impl router::Recognize> for Recognize { - type Target = Destination; - - fn recognize(&self, req: &http::Request) -> Option { - let addr = super::http_request_addr(req).ok()?; - let settings = Settings::from_request(req); - let dst = Destination::new(addr, settings); - debug!("recognize: dst={:?}", dst); - Some(dst) - } -} - -// === impl Destination === - -impl Destination { - pub fn new(addr: Addr, settings: Settings) -> Self { - Self { addr, settings } - } -} - -impl CanGetDestination for Destination { - fn get_destination(&self) -> Option<&NameAddr> { - match self.addr { - Addr::Name(ref name) => Some(name), - _ => None, - } - } -} - -impl fmt::Display for Destination { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - self.addr.fmt(f) - } -} - -impl profiles::WithRoute for Destination { - type Output = Route; - - fn with_route(self, route: profiles::Route) -> Self::Output { - Route { dst: self, route } - } -} - pub mod discovery { use futures::{Async, Poll}; use std::net::SocketAddr; - use super::{Destination, Endpoint}; + use super::super::dst::DstAddr; + use super::Endpoint; use control::destination::Metadata; use proxy::resolve; use transport::{connect, tls}; @@ -175,8 +79,8 @@ pub mod discovery { #[derive(Debug)] pub enum Resolution { - Name(Destination, R), - Addr(Destination, Option), + Name(NameAddr, R), + Addr(Option), } // === impl Resolve === @@ -190,17 +94,17 @@ pub mod discovery { } } - impl resolve::Resolve for Resolve + impl resolve::Resolve for Resolve where R: resolve::Resolve, { type Endpoint = Endpoint; type Resolution = Resolution; - fn resolve(&self, dst: &Destination) -> Self::Resolution { - match dst.addr { - Addr::Name(ref name) => Resolution::Name(dst.clone(), self.0.resolve(&name)), - Addr::Socket(ref addr) => Resolution::Addr(dst.clone(), Some(*addr)), + fn resolve(&self, dst: &DstAddr) -> Self::Resolution { + match dst.as_ref() { + Addr::Name(ref name) => Resolution::Name(name.clone(), self.0.resolve(&name)), + Addr::Socket(ref addr) => Resolution::Addr(Some(*addr)), } } } @@ -216,7 +120,7 @@ pub mod discovery { fn poll(&mut self) -> Poll, Self::Error> { match self { - Resolution::Name(ref dst, ref mut res) => match try_ready!(res.poll()) { + Resolution::Name(ref name, ref mut res) => match try_ready!(res.poll()) { resolve::Update::Remove(addr) => { Ok(Async::Ready(resolve::Update::Remove(addr))) } @@ -230,23 +134,22 @@ pub mod discovery { Conditional::Some(_) => tls::ReasonForNoTls::NoConfig, }; let ep = Endpoint { - dst: dst.clone(), + dst_name: Some(name.clone()), connect: connect::Target::new(addr, Conditional::None(tls)), metadata, }; Ok(Async::Ready(resolve::Update::Add(addr, ep))) } }, - Resolution::Addr(ref dst, ref mut addr) => match addr.take() { + Resolution::Addr(ref mut addr) => match addr.take() { Some(addr) => { let tls = tls::ReasonForNoIdentity::NoAuthorityInHttpRequest; let ep = Endpoint { - dst: dst.clone(), + dst_name: None, connect: connect::Target::new(addr, Conditional::None(tls.into())), metadata: Metadata::none(tls), }; - let up = resolve::Update::Add(addr, ep); - Ok(Async::Ready(up)) + Ok(Async::Ready(resolve::Update::Add(addr, ep))) } None => Ok(Async::NotReady), }, @@ -259,7 +162,7 @@ pub mod orig_proto_upgrade { use http; use super::Endpoint; - use proxy::http::{orig_proto, Settings}; + use proxy::http::orig_proto; use svc; #[derive(Debug, Clone)] @@ -302,13 +205,8 @@ pub mod orig_proto_upgrade { type Error = M::Error; fn make(&self, endpoint: &Endpoint) -> Result { - if endpoint.can_use_orig_proto() - && !endpoint.dst.settings.is_http2() - && !endpoint.dst.settings.is_h1_upgrade() - { - let mut upgraded = endpoint.clone(); - upgraded.dst.settings = Settings::Http2; - self.inner.make(&upgraded).map(|i| svc::Either::A(i.into())) + if endpoint.can_use_orig_proto() { + self.inner.make(&endpoint).map(|i| svc::Either::A(i.into())) } else { self.inner.make(&endpoint).map(svc::Either::B) } diff --git a/src/app/profiles.rs b/src/app/profiles.rs index 4823f2de5..9f7f2460b 100644 --- a/src/app/profiles.rs +++ b/src/app/profiles.rs @@ -9,14 +9,12 @@ use tower_h2::{Body, BoxBody, Data, HttpService}; use api::destination as api; -use control; use proxy::http::profiles; use NameAddr; #[derive(Clone, Debug)] -pub struct Client { +pub struct Client { service: Option, - normalize_name: N, backoff: Duration, } @@ -36,35 +34,31 @@ enum State { // === impl Client === -impl Client +impl Client where T: HttpService + Clone, T::ResponseBody: Body, T::Error: fmt::Debug, - N: control::Normalize, { - pub fn new(service: Option, backoff: Duration, normalize_name: N) -> Self { + pub fn new(service: Option, backoff: Duration) -> Self { Self { service, backoff, - normalize_name, } } } -impl profiles::GetRoutes for Client +impl profiles::GetRoutes for Client where T: HttpService + Clone, T::ResponseBody: Body, T::Error: fmt::Debug, - N: control::Normalize, { type Stream = Rx; fn get_routes(&self, dst: &NameAddr) -> Option { - let fqa = self.normalize_name.normalize(dst)?; Some(Rx { - dst: fqa.without_trailing_dot().to_owned(), + dst: format!("{}", dst), state: State::Disconnected, service: self.service.clone(), backoff: self.backoff, diff --git a/src/conditional.rs b/src/conditional.rs index 32838803f..989d37c6c 100644 --- a/src/conditional.rs +++ b/src/conditional.rs @@ -40,4 +40,15 @@ where { self.and_then(|c| Conditional::Some(f(c))) } + + pub fn is_none(&self) -> bool { + match self { + Conditional::None(_) => true, + Conditional::Some(_) => false, + } + } + + pub fn is_some(&self) -> bool { + !self.is_none() + } } diff --git a/src/control/destination/background/mod.rs b/src/control/destination/background/mod.rs index d86a63d06..31fc7b0f7 100644 --- a/src/control/destination/background/mod.rs +++ b/src/control/destination/background/mod.rs @@ -25,7 +25,6 @@ use super::{ResolveRequest, Update}; use app::config::Namespaces; use control::{ cache::Exists, - fully_qualified_authority::{KubernetesNormalize, Normalize as _Normalize}, remote_stream::{Receiver, Remote}, }; use dns; @@ -68,6 +67,7 @@ struct DestinationCache { /// query. struct NewQuery { namespaces: Namespaces, + suffixes: Vec, /// Used for counting the number of currently-active queries. /// /// Each active query will hold a `Weak` reference back to this `Arc`, and @@ -96,10 +96,11 @@ where request_rx: mpsc::UnboundedReceiver, dns_resolver: dns::Resolver, namespaces: Namespaces, + suffixes: Vec, concurrency_limit: usize, ) -> Self { Self { - new_query: NewQuery::new(namespaces, concurrency_limit), + new_query: NewQuery::new(namespaces, suffixes, concurrency_limit), dns_resolver, dsts: DestinationCache::new(), rpc_ready: false, @@ -305,9 +306,10 @@ where impl NewQuery { - fn new(namespaces: Namespaces, concurrency_limit: usize) -> Self { + fn new(namespaces: Namespaces, suffixes: Vec, concurrency_limit: usize) -> Self { Self { namespaces, + suffixes, concurrency_limit, active_query_handle: Arc::new(()), } @@ -338,7 +340,7 @@ impl NewQuery { fn query_destination_service_if_relevant( &self, client: Option<&mut T>, - auth: &NameAddr, + dst: &NameAddr, connect_or_reconnect: &str, ) -> DestinationServiceQuery where @@ -346,36 +348,30 @@ impl NewQuery { T::ResponseBody: Body, T::Error: fmt::Debug, { - trace!( - "DestinationServiceQuery {} {:?}", - connect_or_reconnect, - auth - ); - let default_ns = self.namespaces.pod.clone(); - let client_and_authority = client.and_then(|client| { - KubernetesNormalize::new(default_ns) - .normalize(auth) - .map(|auth| (auth, client)) - }); - match client_and_authority { + trace!("DestinationServiceQuery {} {:?}", connect_or_reconnect, dst); + if !self.suffixes.iter().any(|s| s.contains(dst.name())) { + debug!("dst={} not in suffixes", dst.name()); + return DestinationServiceQuery::Inactive; + } + match client { // If we were able to normalize the authority (indicating we should // query the Destination service), but we're out of queries, return // None and set the "needs query" flag. - Some((ref auth, _)) if !self.has_more_queries() => { + Some(_) if !self.has_more_queries() => { warn!( "Can't query Destination service for {:?}, maximum \ number of queries ({}) reached.", - auth, + dst, self.concurrency_limit, ); DestinationServiceQuery::NoCapacity }, // We should query the Destination service and there is sufficient // query capacity, so we're good to go! - Some((auth, client)) => { + Some(client) => { let req = GetDestination { scheme: "k8s".into(), - path: auth.without_trailing_dot().to_owned(), + path: format!("{}", dst), }; let mut svc = Destination::new(client.lift_ref()); let response = svc.get(grpc::Request::new(req)); @@ -387,7 +383,7 @@ impl NewQuery { }, // This authority should not query the Destination service. Return // None, but set the "needs query" flag to false. - None => DestinationServiceQuery::Inactive + _ => DestinationServiceQuery::Inactive } } diff --git a/src/control/destination/mod.rs b/src/control/destination/mod.rs index fbb3fd3e9..d4071f820 100644 --- a/src/control/destination/mod.rs +++ b/src/control/destination/mod.rs @@ -114,6 +114,7 @@ pub fn new( mut client: Option, dns_resolver: dns::Resolver, namespaces: Namespaces, + suffixes: Vec, concurrency_limit: usize, ) -> (Resolver, impl Future) where @@ -127,6 +128,7 @@ where rx, dns_resolver, namespaces, + suffixes, concurrency_limit, ); let task = future::poll_fn(move || bg.poll_rpc(&mut client)); diff --git a/src/control/fully_qualified_authority.rs b/src/control/fully_qualified_authority.rs deleted file mode 100644 index a347173c3..000000000 --- a/src/control/fully_qualified_authority.rs +++ /dev/null @@ -1,236 +0,0 @@ -use bytes::{BytesMut}; - -use NameAddr; - -pub trait Normalize { - fn normalize(&self, authority: &NameAddr) -> Option; -} - -#[derive(Clone, Debug)] -pub struct KubernetesNormalize { - default_namespace: String, -} - -/// A normalized `Authority`. -#[derive(Clone, Debug, Eq, PartialEq)] -pub struct FullyQualifiedAuthority(String); - -impl KubernetesNormalize { - pub fn new(default_namespace: String) -> Self { - Self { default_namespace } - } -} - -impl Normalize for KubernetesNormalize { - /// Normalizes the name according to Kubernetes service naming conventions. - /// Case folding is not done; that is done internally inside `Authority`. - fn normalize(&self, authority: &NameAddr) -> Option { - let name: &str = authority.name().as_ref(); - - // parts should have a maximum 4 of pieces (name, namespace, svc, zone) - let mut parts = name.splitn(4, '.'); - - // `dns::Name` guarantees the name has at least one part. - assert!(parts.next().is_some()); - - // Rewrite "$name" -> "$name.$default_namespace". - let has_explicit_namespace = match parts.next() { - Some("") => { - // "$name." is an external absolute name. - return None; - }, - Some(_) => true, - None => false, - }; - let namespace_to_append = if !has_explicit_namespace { - Some(&self.default_namespace) - } else { - None - }; - - // Rewrite "$name.$namespace" -> "$name.$namespace.svc". - let append_svc = if let Some(part) = parts.next() { - if !part.eq_ignore_ascii_case("svc") { - // If not "$name.$namespace.svc", treat as external. - return None; - } - false - } else if has_explicit_namespace { - true - } else if namespace_to_append.is_none() { - // We can't append ".svc" without a namespace, so treat as external. - return None; - } else { - true - }; - - // Rewrite "$name.$namespace.svc" -> "$name.$namespace.svc.$zone". - static DEFAULT_ZONE: &str = "cluster.local"; // TODO: make configurable. - let (zone_to_append, strip_last) = if let Some(zone) = parts.next() { - let (zone, strip_last) = - if zone.ends_with('.') { - (&zone[..zone.len() - 1], true) - } else { - (zone, false) - }; - if !zone.eq_ignore_ascii_case(DEFAULT_ZONE) { - // "a.b.svc." is an external absolute name. - // "a.b.svc.foo" is external if the default zone is not - // "foo". - return None; - } - (None, strip_last) - } else { - (Some(DEFAULT_ZONE), false) - }; - - let mut additional_len = 0; - if let Some(namespace) = namespace_to_append { - additional_len += 1 + namespace.len(); // "." + namespace - } - if append_svc { - additional_len += 4; // ".svc" - } - if let Some(zone) = zone_to_append { - additional_len += 1 + zone.len(); // "." + zone - } - - let port_str_len = match authority.port() { - 80 => 0, // XXX: Assumes http://, which is all we support right now. - p if p >= 10000 => 1 + 5, - p if p >= 1000 => 1 + 4, - p if p >= 100 => 1 + 3, - p if p >= 10 => 1 + 2, - _ => 1, - }; - - let mut normalized = BytesMut::with_capacity(name.len() + additional_len + port_str_len); - normalized.extend_from_slice(name.as_bytes()); - if let Some(namespace) = namespace_to_append { - normalized.extend_from_slice(b"."); - normalized.extend_from_slice(namespace.as_bytes()); - } - if append_svc { - normalized.extend_from_slice(b".svc"); - } - if let Some(zone) = zone_to_append { - normalized.extend_from_slice(b"."); - normalized.extend_from_slice(zone.as_bytes()); - } - - if strip_last { - let new_len = normalized.len() - 1; - normalized.truncate(new_len); - } - - // Append the port - if port_str_len > 0 { - normalized.extend_from_slice(b":"); - let port = authority.port().to_string(); - normalized.extend_from_slice(port.as_ref()); - } - - Some(FullyQualifiedAuthority(String::from_utf8(normalized.freeze().to_vec()).unwrap())) - } -} - -impl FullyQualifiedAuthority { - pub fn without_trailing_dot(&self) -> &str { - &self.0 - } -} - -#[cfg(test)] -mod tests { - use http::uri::Authority; - use std::str::FromStr; - - use {Addr, NameAddr}; - use super::Normalize; - - #[test] - fn test_normalized_authority() { - fn dns_name_and_port_from_str(input: &str) -> NameAddr { - let authority = Authority::from_str(input).unwrap(); - match Addr::from_authority_and_default_port(&authority, 80) { - Ok(Addr::Name(name)) => name, - Err(e) => { - unreachable!("{:?} when parsing {:?}", e, input) - }, - _ => unreachable!("Not a DNS name: {:?}", input), - } - } - - fn local(input: &str, default_namespace: &str) -> String { - let name = dns_name_and_port_from_str(input); - let output = super::KubernetesNormalize::new(default_namespace.to_owned()).normalize(&name); - assert!(output.is_some(), "input: {}", input); - output.unwrap().without_trailing_dot().into() - } - - fn external(input: &str, default_namespace: &str) { - let name = dns_name_and_port_from_str(input); - let output = super::KubernetesNormalize::new(default_namespace.to_owned()).normalize(&name); - assert!(output.is_none(), "input: {}", input); - } - - assert_eq!("name.namespace.svc.cluster.local", local("name", "namespace")); - assert_eq!("name.namespace.svc.cluster.local", local("name.namespace", "namespace")); - assert_eq!("name.namespace.svc.cluster.local", - local("name.namespace.svc", "namespace")); - external("name.namespace.svc.cluster", "namespace"); - assert_eq!("name.namespace.svc.cluster.local", - local("name.namespace.svc.cluster.local", "namespace")); - - // Fully-qualified names end with a dot and aren't modified except by removing the dot. - external("name.", "namespace"); - external("name.namespace.", "namespace"); - external("name.namespace.svc.", "namespace"); - external("name.namespace.svc.cluster.", "namespace"); - external("name.namespace.svc.acluster.local.", "namespace"); - assert_eq!("name.namespace.svc.cluster.local", - local("name.namespace.svc.cluster.local.", "namespace")); - - // Irrespective of how other absolute names are resolved, "localhost." - // absolute names aren't ever resolved through the destination service, - // as prescribed by https://tools.ietf.org/html/rfc6761#section-6.3: - // - // The domain "localhost." and any names falling within ".localhost." - // are special in the following ways: [...] - // - // Name resolution APIs and libraries SHOULD recognize localhost - // names as special and SHOULD always return the IP loopback address - // for address queries [...] Name resolution APIs SHOULD NOT send - // queries for localhost names to their configured caching DNS server(s). - external("localhost.", "namespace"); - external("name.localhost.", "namespace"); - external("name.namespace.svc.localhost.", "namespace"); - - // Although it probably isn't the desired behavior in almost any circumstance, match - // standard behavior for non-absolute "localhost" and names that end with - // ".localhost" at least until we're comfortable implementing - // https://wiki.tools.ietf.org/html/draft-ietf-dnsop-let-localhost-be-localhost. - assert_eq!("localhost.namespace.svc.cluster.local", - local("localhost", "namespace")); - assert_eq!("name.localhost.svc.cluster.local", - local("name.localhost", "namespace")); - - // Ports are preserved. - assert_eq!("name.namespace.svc.cluster.local:1234", - local("name:1234", "namespace")); - assert_eq!("name.namespace.svc.cluster.local:1234", - local("name.namespace:1234", "namespace")); - assert_eq!("name.namespace.svc.cluster.local:1234", - local("name.namespace.svc:1234", "namespace")); - external("name.namespace.svc.cluster:1234", "namespace"); - assert_eq!("name.namespace.svc.cluster.local:1234", - local("name.namespace.svc.cluster.local:1234", "namespace")); - - // "SVC" is recognized as being equivalent to "svc" - assert_eq!("name.namespace.svc.cluster.local", - local("name.namespace.SVC", "namespace")); - external("name.namespace.SVC.cluster", "namespace"); - assert_eq!("name.namespace.svc.cluster.local", - local("name.namespace.SVC.cluster.local", "namespace")); - } -} diff --git a/src/control/mod.rs b/src/control/mod.rs index b501c8c13..af0c8cec5 100644 --- a/src/control/mod.rs +++ b/src/control/mod.rs @@ -1,11 +1,9 @@ mod cache; pub mod destination; -mod fully_qualified_authority; mod observe; pub mod pb; mod remote_stream; mod serve_http; -pub use self::fully_qualified_authority::{Normalize, KubernetesNormalize, FullyQualifiedAuthority}; pub use self::observe::Observe; pub use self::serve_http::serve_http; diff --git a/src/control/pb.rs b/src/control/pb.rs index 3838d26a2..ef674eebc 100644 --- a/src/control/pb.rs +++ b/src/control/pb.rs @@ -46,7 +46,7 @@ impl event::StreamResponseEnd { proxy_direction: ctx.endpoint.direction.as_pb().into(), source: Some((&ctx.source.remote).into()), source_meta: Some(ctx.source.src_meta()), - destination: Some((&ctx.endpoint.client.target.addr).into()), + destination: Some((&ctx.endpoint.target.addr).into()), destination_meta: Some(ctx.endpoint.dst_meta()), event: Some(tap::tap_event::Event::Http(tap::tap_event::Http { event: Some(tap::tap_event::http::Event::ResponseEnd(end)), @@ -72,7 +72,7 @@ impl event::StreamResponseFail { proxy_direction: ctx.endpoint.direction.as_pb().into(), source: Some((&ctx.source.remote).into()), source_meta: Some(ctx.source.src_meta()), - destination: Some((&ctx.endpoint.client.target.addr).into()), + destination: Some((&ctx.endpoint.target.addr).into()), destination_meta: Some(ctx.endpoint.dst_meta()), event: Some(tap::tap_event::Event::Http(tap::tap_event::Http { event: Some(tap::tap_event::http::Event::ResponseEnd(end)), @@ -98,7 +98,7 @@ impl event::StreamRequestFail { proxy_direction: ctx.endpoint.direction.as_pb().into(), source: Some((&ctx.source.remote).into()), source_meta: Some(ctx.source.src_meta()), - destination: Some((&ctx.endpoint.client.target.addr).into()), + destination: Some((&ctx.endpoint.target.addr).into()), destination_meta: Some(ctx.endpoint.dst_meta()), event: Some(tap::tap_event::Event::Http(tap::tap_event::Http { event: Some(tap::tap_event::http::Event::ResponseEnd(end)), @@ -131,7 +131,7 @@ impl<'a> TryFrom<&'a Event> for tap::TapEvent { proxy_direction: ctx.endpoint.direction.as_pb().into(), source: Some((&ctx.source.remote).into()), source_meta: Some(ctx.source.src_meta()), - destination: Some((&ctx.endpoint.client.target.addr).into()), + destination: Some((&ctx.endpoint.target.addr).into()), destination_meta: Some(ctx.endpoint.dst_meta()), event: Some(tap::tap_event::Event::Http(tap::tap_event::Http { event: Some(tap::tap_event::http::Event::RequestInit(init)), @@ -154,7 +154,7 @@ impl<'a> TryFrom<&'a Event> for tap::TapEvent { proxy_direction: ctx.request.endpoint.direction.as_pb().into(), source: Some((&ctx.request.source.remote).into()), source_meta: Some(ctx.request.source.src_meta()), - destination: Some((&ctx.request.endpoint.client.target.addr).into()), + destination: Some((&ctx.request.endpoint.target.addr).into()), destination_meta: Some(ctx.request.endpoint.dst_meta()), event: Some(tap::tap_event::Event::Http(tap::tap_event::Http { event: Some(tap::tap_event::http::Event::ResponseInit(init)), @@ -204,7 +204,7 @@ impl event::Endpoint { fn dst_meta(&self) -> tap::tap_event::EndpointMeta { let mut meta = tap::tap_event::EndpointMeta::default(); meta.labels.extend(self.labels.clone()); - meta.labels.insert("tls".to_owned(), format!("{}", self.client.target.tls_status())); + meta.labels.insert("tls".to_owned(), format!("{}", self.target.tls_status())); meta } } diff --git a/src/dns.rs b/src/dns.rs index 61c88893d..1b7e7da0b 100644 --- a/src/dns.rs +++ b/src/dns.rs @@ -1,30 +1,26 @@ +use convert::TryFrom; use futures::prelude::*; -use std::fmt; -use std::net::IpAddr; +use std::{fmt, net}; use std::time::Instant; use tokio::timer::Delay; use trust_dns_resolver::{ - self, config::{ResolverConfig, ResolverOpts}, - error::{ResolveError, ResolveErrorKind}, - lookup_ip::LookupIp, + lookup_ip::{LookupIp}, + system_conf, AsyncResolver, + BackgroundLookupIp, }; +pub use trust_dns_resolver::error::{ResolveError, ResolveErrorKind}; + use app::config::Config; use transport::tls; -use Addr; #[derive(Clone)] pub struct Resolver { resolver: AsyncResolver, } -pub enum IpAddrFuture { - DNS(Box + Send>), - Fixed(IpAddr), -} - #[derive(Debug)] pub enum Error { NoAddressesFound, @@ -36,8 +32,11 @@ pub enum Response { DoesNotExist { retry_after: Option }, } -// `Box` implements `Future` so it doesn't need to be implemented manually. -pub type IpAddrListFuture = Box + Send>; +pub struct IpAddrFuture(::logging::ContextualFuture); + +pub struct RefineFuture(::logging::ContextualFuture); + +pub type IpAddrListFuture = Box + Send>; /// A valid DNS name. /// @@ -46,19 +45,70 @@ pub type IpAddrListFuture = Box + Send /// valid certificate. pub type Name = tls::DnsName; -struct ResolveAllCtx(Name); +#[derive(Clone, Debug, Eq, PartialEq, Hash)] +pub enum Suffix { + Root, // The `.` suffix. + Name(Name), +} -impl fmt::Display for ResolveAllCtx { +struct Ctx(Name); + +pub struct Refine { + pub name: Name, + pub valid_until: Instant, +} + +impl fmt::Display for Ctx { fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { - write!(f, "resolve_all_ips={}", self.0) + write!(f, "dns={}", self.0) } } -struct ResolveOneCtx(Name); +impl fmt::Display for Suffix { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + Suffix::Root => write!(f, "."), + Suffix::Name(n) => n.fmt(f), + } + } +} -impl fmt::Display for ResolveOneCtx { - fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { - write!(f, "resolve_one_ip={}", self.0) +impl From for Suffix { + fn from(n: Name) -> Self { + Suffix::Name(n) + } +} + +impl<'s> TryFrom<&'s str> for Suffix { + type Err = >::Err; + fn try_from(s: &str) -> Result { + if s == "." { + Ok(Suffix::Root) + } else { + Name::try_from(s.as_bytes()).map(|n| n.into()) + } + } +} + +impl Suffix { + pub fn contains(&self, name: &Name) -> bool { + match self { + Suffix::Root => true, + Suffix::Name(ref sfx) => { + let name = name.without_trailing_dot(); + let sfx = sfx.without_trailing_dot(); + name.ends_with(sfx) && { + name.len() == sfx.len() || { + // foo.bar.bah (11) + // bar.bah (7) + let idx = name.len() - sfx.len(); + let (hd, _) = name.split_at(idx); + hd.ends_with('.') + } + } + + } + } } } @@ -76,7 +126,7 @@ impl Resolver { /// TODO: This should be infallible like it is in the `domain` crate. pub fn from_system_config_and_env(env_config: &Config) -> Result<(Self, impl Future + Send), ResolveError> { - let (config, opts) = trust_dns_resolver::system_conf::read_system_conf()?; + let (config, opts) = system_conf::read_system_conf()?; let opts = env_config.configure_resolver_opts(opts); trace!("DNS config: {:?}", &config); trace!("DNS opts: {:?}", &opts); @@ -99,21 +149,10 @@ impl Resolver { (resolver, background) } - pub fn resolve_one_ip(&self, host: &Addr) -> IpAddrFuture { - match host { - Addr::Name(n) => { - let name = n.name(); - let ctx = ResolveOneCtx(name.clone()); - let f = ::logging::context_future(ctx, self.lookup_ip(name)); - IpAddrFuture::DNS(Box::new(f)) - } - Addr::Socket(addr) => IpAddrFuture::Fixed(addr.ip()), - } - } + pub fn resolve_all_ips(&self, deadline: Instant, name: &Name) -> IpAddrListFuture { + let lookup = self.resolver.lookup_ip(name.as_ref()); - pub fn resolve_all_ips(&self, deadline: Instant, host: &Name) -> IpAddrListFuture { - let name = host.clone(); - let lookup = self.lookup_ip(&name); + // FIXME this delay logic is really confusing... let f = Delay::new(deadline) .then(move |_| { trace!("after delay"); @@ -121,24 +160,33 @@ impl Resolver { }) .then(move |result| { trace!("completed with {:?}", &result); - match result { - Ok(ips) => Ok(Response::Exists(ips)), - Err(e) => { - if let &ResolveErrorKind::NoRecordsFound { valid_until, .. } = e.kind() { - Ok(Response::DoesNotExist { retry_after: valid_until }) - } else { - Err(e) - } + result.map(Response::Exists).or_else(|e| { + if let &ResolveErrorKind::NoRecordsFound { valid_until, .. } = e.kind() { + Ok(Response::DoesNotExist { retry_after: valid_until }) + } else { + Err(e) } - } + }) }); - Box::new(::logging::context_future(ResolveAllCtx(name), f)) + + Box::new(::logging::context_future(Ctx(name.clone()), f)) } - fn lookup_ip(&self, name: &Name) - -> impl Future - { - self.resolver.lookup_ip(name.as_ref()) + pub fn resolve_one_ip(&self, name: &Name) -> IpAddrFuture { + let f = self.resolver.lookup_ip(name.as_ref()); + IpAddrFuture(::logging::context_future(Ctx(name.clone()), f)) + } + + /// Attempts to refine `name` to a fully-qualified name. + /// + /// This method does DNS resolution for `name` and ignores the IP address + /// result, instead returning the `Name` that was resolved. + /// + /// For example, a name like `web` may be refined to `web.example.com.`, + /// depending on the DNS search path. + pub fn refine(&self, name: &Name) -> RefineFuture { + let f = self.resolver.lookup_ip(name.as_ref()); + RefineFuture(::logging::context_future(Ctx(name.clone()), f)) } } @@ -153,45 +201,45 @@ impl fmt::Debug for Resolver { } impl Future for IpAddrFuture { - type Item = IpAddr; + type Item = net::IpAddr; type Error = Error; fn poll(&mut self) -> Poll { - match *self { - IpAddrFuture::DNS(ref mut inner) => match inner.poll() { - Ok(Async::NotReady) => { - trace!("dns not ready"); - Ok(Async::NotReady) - } , - Ok(Async::Ready(ips)) => { - match ips.iter().next() { - Some(ip) => { - trace!("DNS resolution found: {:?}", ip); - Ok(Async::Ready(ip)) - }, - None => { - trace!("DNS resolution did not find anything"); - Err(Error::NoAddressesFound) - } - } - }, - Err(e) => Err(Error::ResolutionFailed(e)), - }, - IpAddrFuture::Fixed(addr) => Ok(Async::Ready(addr)), - } + let ips = try_ready!(self.0.poll().map_err(Error::ResolutionFailed)); + ips.iter() + .next() + .map(Async::Ready) + .ok_or_else(|| Error::NoAddressesFound) + } +} + +impl Future for RefineFuture { + type Item = Refine; + type Error = ResolveError; + + fn poll(&mut self) -> Poll { + let lookup = try_ready!(self.0.poll()); + let valid_until = lookup.valid_until(); + + let n = lookup.query().name(); + let name = Name::try_from(n.to_ascii().as_bytes()) + .expect("Name returned from resolver must be valid"); + + let refine = Refine { name, valid_until }; + Ok(Async::Ready(refine)) } } #[cfg(test)] mod tests { - use super::Name; + use super::{Name, Suffix}; + use convert::TryFrom; #[test] fn test_dns_name_parsing() { // Stack sure `dns::Name`'s validation isn't too strict. It is // implemented in terms of `webpki::DNSName` which has many more tests // at https://github.com/briansmith/webpki/blob/master/tests/dns_name_tests.rs. - use convert::TryFrom; struct Case { input: &'static str, @@ -232,4 +280,40 @@ mod tests { assert!(Name::try_from(case.as_bytes()).is_err()); } } + + #[test] + fn suffix_valid() { + for (name, suffix) in &[ + ("a", "."), + ("a.", "."), + ("a.b", "."), + ("a.b.", "."), + ("b.c", "b.c"), + ("b.c", "b.c"), + ("a.b.c", "b.c"), + ("a.b.c", "b.c."), + ("a.b.c.", "b.c"), + ("hacker.example.com", "example.com"), + ] { + let n = Name::try_from(name.as_bytes()).unwrap(); + let s = Suffix::try_from(suffix).unwrap(); + assert!(s.contains(&n), format!("{} should contain {}", suffix, name)); + } + } + + #[test] + fn suffix_invalid() { + for (name, suffix) in &[ + ("a", "b"), + ("b", "a.b"), + ("b.a", "b"), + ("hackerexample.com", "example.com"), + ] { + let n = Name::try_from(name.as_bytes()).unwrap(); + let s = Suffix::try_from(suffix).unwrap(); + assert!(!s.contains(&n), format!("{} should not contain {}", suffix, name)); + } + + assert!(Suffix::try_from("").is_err(), "suffix must not be empty"); + } } diff --git a/src/main.rs b/src/main.rs index a38b34382..3ebc922f9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,7 @@ #![deny(warnings)] +#![recursion_limit="128"] + extern crate linkerd2_proxy; #[macro_use] extern crate log; diff --git a/src/proxy/canonicalize.rs b/src/proxy/canonicalize.rs new file mode 100644 index 000000000..38b399e9f --- /dev/null +++ b/src/proxy/canonicalize.rs @@ -0,0 +1,262 @@ +//! A stack module that lazily, dynamically resolves an `Addr` target, via DNS, +//! to determine it's canonical fully qualified domain name. +//! +//! For example, an application may set an authority value like `web:8080` with a +//! resolv.conf(5) search path of `example.com example.net`. In such a case, +//! this module may build its inner stack with either `web.example.com.:8080`, +//! `web.example.net.:8080`, or `web:8080`, depending on the state of DNS. +//! +//! DNS TTLs are honored and, if the resolution changes, the inner stack is +//! rebuilt with the updated value. + +use futures::{future, Async, Future, Poll}; +use std::time::Duration; +use std::{error, fmt}; +use tokio_timer::{clock, Delay, Timeout}; + +use dns; +use svc; +use {Addr, NameAddr}; + +/// The amount of time to wait for a DNS query to succeed before falling back to +/// an uncanonicalized address. +const DEFAULT_TIMEOUT: Duration = Duration::from_millis(100); + +/// Duration to wait before polling DNS again after an error (or a NXDOMAIN +/// response with no TTL). +const DNS_ERROR_TTL: Duration = Duration::from_secs(3); + +#[derive(Debug, Clone)] +pub struct Layer { + resolver: dns::Resolver, + timeout: Duration, +} + +#[derive(Clone, Debug)] +pub struct Stack> { + resolver: dns::Resolver, + inner: M, + timeout: Duration, +} + +pub struct Service> { + original: NameAddr, + canonical: Option, + resolver: dns::Resolver, + service: Option, + stack: M, + state: State, + timeout: Duration, +} + +enum State { + Pending(Timeout), + ValidUntil(Delay), +} + +#[derive(Debug)] +pub enum Error { + Stack(M), + Service(S), +} + +// === Layer === + +// FIXME the resolver should be abstracted to a trait so that this can be tested +// without a real DNS service. +pub fn layer(resolver: dns::Resolver) -> Layer { + Layer { + resolver, + timeout: DEFAULT_TIMEOUT, + } +} + +impl svc::Layer for Layer +where + M: svc::Stack + Clone, + M::Value: svc::Service, +{ + type Value = as svc::Stack>::Value; + type Error = as svc::Stack>::Error; + type Stack = Stack; + + fn bind(&self, inner: M) -> Self::Stack { + Stack { + inner, + resolver: self.resolver.clone(), + timeout: self.timeout, + } + } +} + +// === impl Stack === + +impl svc::Stack for Stack +where + M: svc::Stack + Clone, + M::Value: svc::Service, +{ + type Value = svc::Either, M::Value>; + type Error = M::Error; + + fn make(&self, addr: &Addr) -> Result { + match addr { + Addr::Name(na) => { + let svc = Service::new( + na.clone(), + self.inner.clone(), + self.resolver.clone(), + self.timeout, + ); + Ok(svc::Either::A(svc)) + } + Addr::Socket(_) => self.inner.make(&addr).map(svc::Either::B), + } + } +} + +// === impl Service === + +impl Service +where + M: svc::Stack, + M::Value: svc::Service, +{ + fn new(original: NameAddr, stack: M, resolver: dns::Resolver, timeout: Duration) -> Self { + trace!("refining name={}", original.name()); + let f = resolver.refine(original.name()); + let state = State::Pending(Timeout::new(f, timeout)); + + Self { + original, + canonical: None, + stack, + service: None, + resolver, + state, + timeout, + } + } + + fn poll_state(&mut self) -> Poll<(), M::Error> { + loop { + self.state = match self.state { + State::Pending(ref mut fut) => match fut.poll() { + Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::Ready(refine)) => { + trace!( + "update name={}, refined={}", + self.original.name(), + refine.name + ); + // If the resolved name is a new name, bind a + // service with it and set a delay that will notify + // when the resolver should be consulted again. + let canonical = NameAddr::new(refine.name, self.original.port()); + if self.canonical.as_ref() != Some(&canonical) { + let service = self.stack.make(&canonical.clone().into())?; + self.service = Some(service); + self.canonical = Some(canonical); + } + + State::ValidUntil(Delay::new(refine.valid_until)) + } + Err(e) => { + error!("failed to resolve {}: {:?}", self.original.name(), e); + + // If there was an error and there was no + // previously-built service, create one using the + // original name. + if self.service.is_none() { + let addr = self.original.clone().into(); + let service = self.stack.make(&addr)?; + self.service = Some(service); + + // self.canonical is NOT set here, because a + // canonical name has not been determined. + debug_assert!(self.canonical.is_none()); + } + + let valid_until = e + .into_inner() + .and_then(|e| match e.kind() { + dns::ResolveErrorKind::NoRecordsFound { valid_until, .. } => { + *valid_until + } + _ => None, + }) + .unwrap_or_else(|| clock::now() + DNS_ERROR_TTL); + + State::ValidUntil(Delay::new(valid_until)) + } + }, + + State::ValidUntil(ref mut f) => match f.poll().expect("timer must not fail") { + Async::NotReady => return Ok(Async::NotReady), + Async::Ready(()) => { + trace!("refresh name={}", self.original.name()); + // The last resolution's TTL expired, so issue a new DNS query. + let f = self.resolver.refine(self.original.name()); + State::Pending(Timeout::new(f, self.timeout)) + } + }, + }; + } + } +} + +impl svc::Service for Service +where + M: svc::Stack, + M::Value: svc::Service, +{ + type Request = ::Request; + type Response = ::Response; + type Error = Error::Error>; + type Future = future::MapErr< + ::Future, + fn(::Error) -> Self::Error, + >; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + self.poll_state().map_err(Error::Stack)?; + match self.service.as_mut() { + Some(ref mut svc) => { + trace!("checking service readiness"); + svc.poll_ready().map_err(Error::Service) + } + None => { + trace!("resolution has not completed"); + Ok(Async::NotReady) + } + } + } + + fn call(&mut self, req: Self::Request) -> Self::Future { + self.service + .as_mut() + .expect("poll_ready must be called first") + .call(req) + .map_err(Error::Service) + } +} + +// === impl Error === + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + Error::Stack(e) => e.fmt(f), + Error::Service(e) => e.fmt(f), + } + } +} + +impl error::Error for Error { + fn cause(&self) -> Option<&error::Error> { + match self { + Error::Stack(e) => e.cause(), + Error::Service(e) => e.cause(), + } + } +} diff --git a/src/proxy/http/client.rs b/src/proxy/http/client.rs index fd9a6dc1b..13b2392ba 100644 --- a/src/proxy/http/client.rs +++ b/src/proxy/http/client.rs @@ -3,22 +3,23 @@ use futures::{future, Async, Future, Poll}; use h2; use http; use hyper; -use std::{self, error, fmt, net}; +use std::{error, fmt, net}; use std::marker::PhantomData; use tokio::executor::Executor; use tower_h2; use super::{h1, Settings}; use super::glue::{BodyPayload, HttpBody, HyperConnect}; +use super::normalize_uri::ShouldNormalizeUri; use super::upgrade::{HttpConnect, Http11Upgrade}; -use svc; +use svc::{self, stack_per_request::ShouldStackPerRequest}; use task::BoxExecutor; use transport::connect; /// Configurs an HTTP Client `Service` `Stack`. /// /// `settings` determines whether an HTTP/1 or HTTP/2 client is used. -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Eq, Hash, PartialEq)] pub struct Config { pub target: connect::Target, pub settings: Settings, @@ -152,6 +153,25 @@ impl Config { } } +impl ShouldNormalizeUri for Config { + fn should_normalize_uri(&self) -> bool { + !self.settings.is_http2() && !self.settings.was_absolute_form() + } +} + +impl ShouldStackPerRequest for Config { + fn should_stack_per_request(&self) -> bool { + !self.settings.is_http2() && !self.settings.can_reuse_clients() + } +} + +impl fmt::Display for Config { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + self.target.addr.fmt(f) + } +} + + // === impl Layer === pub fn layer(proxy_name: &'static str) -> Layer @@ -460,8 +480,8 @@ impl fmt::Display for Error { } } -impl std::error::Error for Error { - fn cause(&self) -> Option<&std::error::Error> { +impl error::Error for Error { + fn cause(&self) -> Option<&error::Error> { match self { Error::Http1(e) => e.cause(), Error::Http2(e) => e.cause(), diff --git a/src/proxy/http/header_from_target.rs b/src/proxy/http/header_from_target.rs new file mode 100644 index 000000000..b3e070a49 --- /dev/null +++ b/src/proxy/http/header_from_target.rs @@ -0,0 +1,99 @@ +use futures::Poll; +use http; +use http::header::{IntoHeaderName, HeaderValue}; + +use svc; + +/// Wraps HTTP `Service` `Stack`s so that a displayable `T` is cloned into each request's +/// extensions. +#[derive(Debug, Clone)] +pub struct Layer { + header: H, +} + +/// Wraps an HTTP `Service` so that the Stack's `T -typed target` is cloned into +/// each request's extensions. +#[derive(Clone, Debug)] +pub struct Stack { + header: H, + inner: M, +} + +#[derive(Clone, Debug)] +pub struct Service { + header: H, + value: HeaderValue, + inner: S, +} + +// === impl Layer === + +pub fn layer(header: H) -> Layer +where + H: IntoHeaderName + Clone +{ + Layer { header } +} + +impl svc::Layer for Layer +where + H: IntoHeaderName + Clone, + T: Clone + Send + Sync + 'static, + HeaderValue: for<'t> From<&'t T>, + M: svc::Stack, + M::Value: svc::Service>, +{ + type Value = as svc::Stack>::Value; + type Error = as svc::Stack>::Error; + type Stack = Stack; + + fn bind(&self, inner: M) -> Self::Stack { + Stack { + header: self.header.clone(), + inner, + } + } +} + +// === impl Stack === + +impl svc::Stack for Stack +where + H: IntoHeaderName + Clone, + T: Clone + Send + Sync + 'static, + HeaderValue: for<'t> From<&'t T>, + M: svc::Stack, + M::Value: svc::Service>, +{ + type Value = Service; + type Error = M::Error; + + fn make(&self, t: &T) -> Result { + let inner = self.inner.make(t)?; + let header = self.header.clone(); + let value = t.into(); + Ok(Service { header, inner, value }) + } +} + +// === impl Service === + +impl svc::Service for Service +where + H: IntoHeaderName + Clone, + S: svc::Service>, +{ + type Request = S::Request; + type Response = S::Response; + type Error = S::Error; + type Future = S::Future; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + self.inner.poll_ready() + } + + fn call(&mut self, mut req: Self::Request) -> Self::Future { + req.headers_mut().insert(self.header.clone(), self.value.clone()); + self.inner.call(req) + } +} diff --git a/src/proxy/http/mod.rs b/src/proxy/http/mod.rs index 5d24214d7..a3d10db26 100644 --- a/src/proxy/http/mod.rs +++ b/src/proxy/http/mod.rs @@ -2,6 +2,7 @@ pub mod balance; pub mod client; pub(super) mod glue; pub mod h1; +pub mod header_from_target; pub mod insert_target; pub mod metrics; pub mod normalize_uri; diff --git a/src/proxy/http/orig_proto.rs b/src/proxy/http/orig_proto.rs index 695c4ae99..53eee4c32 100644 --- a/src/proxy/http/orig_proto.rs +++ b/src/proxy/http/orig_proto.rs @@ -48,66 +48,59 @@ where } fn call(&mut self, mut req: Self::Request) -> Self::Future { - let mut downgrade_response = false; - - if req.version() != http::Version::HTTP_2 { - debug!("upgrading {:?} to HTTP2 with orig-proto", req.version()); - - // absolute-form is far less common, origin-form is the usual, - // so only encode the extra information if it's different than - // the normal. - let was_absolute_form = h1::is_absolute_form(req.uri()); - if !was_absolute_form { - // Since the version is going to set to HTTP_2, the NormalizeUri - // middleware won't normalize the URI automatically, so it - // needs to be done now. - h1::normalize_our_view_of_uri(&mut req); - } - - let val = match (req.version(), was_absolute_form) { - (http::Version::HTTP_11, false) => "HTTP/1.1", - (http::Version::HTTP_11, true) => "HTTP/1.1; absolute-form", - (http::Version::HTTP_10, false) => "HTTP/1.0", - (http::Version::HTTP_10, true) => "HTTP/1.0; absolute-form", - (v, _) => unreachable!("bad orig-proto version: {:?}", v), - }; - req.headers_mut().insert( - L5D_ORIG_PROTO, - HeaderValue::from_static(val) - ); - - // transfer-encoding is illegal in HTTP2 - req.headers_mut().remove(TRANSFER_ENCODING); - - *req.version_mut() = http::Version::HTTP_2; - downgrade_response = true; - } - - let fut = self.inner.call(req); - - if downgrade_response { - fut.map(|mut res| { - debug_assert_eq!(res.version(), http::Version::HTTP_2); - let version = if let Some(orig_proto) = res.headers().get(L5D_ORIG_PROTO) { - debug!("downgrading {} response: {:?}", L5D_ORIG_PROTO, orig_proto); - if orig_proto == "HTTP/1.1" { - http::Version::HTTP_11 - } else if orig_proto == "HTTP/1.0" { - http::Version::HTTP_10 - } else { - warn!("unknown {} header value: {:?}", L5D_ORIG_PROTO, orig_proto); - res.version() - } - } else { - res.version() - }; - *res.version_mut() = version; - res - }) - } else { + if req.version() == http::Version::HTTP_2 || h1::wants_upgrade(&req) { // Just passing through... - fut.map(|res| res) + return self.inner.call(req).map(|res| res) } + + debug!("upgrading {:?} to HTTP2 with orig-proto", req.version()); + + // absolute-form is far less common, origin-form is the usual, + // so only encode the extra information if it's different than + // the normal. + let was_absolute_form = h1::is_absolute_form(req.uri()); + if !was_absolute_form { + // Since the version is going to set to HTTP_2, the NormalizeUri + // middleware won't normalize the URI automatically, so it + // needs to be done now. + h1::normalize_our_view_of_uri(&mut req); + } + + let val = match (req.version(), was_absolute_form) { + (http::Version::HTTP_11, false) => "HTTP/1.1", + (http::Version::HTTP_11, true) => "HTTP/1.1; absolute-form", + (http::Version::HTTP_10, false) => "HTTP/1.0", + (http::Version::HTTP_10, true) => "HTTP/1.0; absolute-form", + (v, _) => unreachable!("bad orig-proto version: {:?}", v), + }; + req.headers_mut().insert( + L5D_ORIG_PROTO, + HeaderValue::from_static(val) + ); + + // transfer-encoding is illegal in HTTP2 + req.headers_mut().remove(TRANSFER_ENCODING); + + *req.version_mut() = http::Version::HTTP_2; + + self.inner.call(req).map(|mut res| { + debug_assert_eq!(res.version(), http::Version::HTTP_2); + let version = if let Some(orig_proto) = res.headers().get(L5D_ORIG_PROTO) { + debug!("downgrading {} response: {:?}", L5D_ORIG_PROTO, orig_proto); + if orig_proto == "HTTP/1.1" { + http::Version::HTTP_11 + } else if orig_proto == "HTTP/1.0" { + http::Version::HTTP_10 + } else { + warn!("unknown {} header value: {:?}", L5D_ORIG_PROTO, orig_proto); + res.version() + } + } else { + res.version() + }; + *res.version_mut() = version; + res + }) } } diff --git a/src/proxy/http/profiles.rs b/src/proxy/http/profiles.rs index ebbfc859f..60a691f8a 100644 --- a/src/proxy/http/profiles.rs +++ b/src/proxy/http/profiles.rs @@ -172,11 +172,13 @@ pub mod router { use http; use std::{error, fmt}; + use dns; use svc; use super::*; - pub fn layer(get_routes: G, route_layer: R) -> Layer + pub fn layer(suffixes: Vec, get_routes: G, route_layer: R) + -> Layer where T: CanGetDestination + WithRoute + Clone, M: svc::Stack, @@ -186,11 +188,11 @@ pub mod router { ::Output, ::Output, svc::shared::Stack, - > - + Clone, + > + Clone, R::Value: svc::Service, { Layer { + suffixes, get_routes, route_layer, default_route: Route::default(), @@ -203,6 +205,7 @@ pub mod router { get_routes: G, route_layer: R, default_route: Route, + suffixes: Vec, _p: ::std::marker::PhantomData M>, } @@ -212,6 +215,7 @@ pub mod router { get_routes: G, route_layer: R, default_route: Route, + suffixes: Vec, } #[derive(Debug)] @@ -254,8 +258,7 @@ pub mod router { ::Output, ::Output, svc::shared::Stack, - > - + Clone, + > + Clone, R::Value: svc::Service, { type Value = as svc::Stack>::Value; @@ -268,6 +271,7 @@ pub mod router { get_routes: self.get_routes.clone(), route_layer: self.route_layer.clone(), default_route: self.default_route.clone(), + suffixes: self.suffixes.clone(), } } } @@ -282,8 +286,7 @@ pub mod router { ::Output, ::Output, svc::shared::Stack, - > - + Clone, + > + Clone, R::Value: svc::Service, { type Value = Service; @@ -298,9 +301,21 @@ pub mod router { stack.make(&t).map_err(Error::Route)? }; - let route_stream = target - .get_destination() - .and_then(|d| self.get_routes.get_routes(&d)); + let route_stream = match target.get_destination() { + Some(ref dst) => { + if self.suffixes.iter().any(|s| s.contains(dst.name())) { + debug!("fetching routes for {:?}", dst); + self.get_routes.get_routes(&dst) + } else { + debug!("skipping route discovery for dst={:?}", dst); + None + } + } + None => { + debug!("no destination for routes"); + None + } + }; Ok(Service { target: target.clone(), @@ -360,10 +375,12 @@ pub mod router { fn call(&mut self, req: Self::Request) -> Self::Future { for (ref condition, ref mut service) in &mut self.routes { if condition.is_match(&req) { + trace!("using configured route: {:?}", condition); return service.call(req); } } + trace!("using default route"); self.default_route.call(req) } } diff --git a/src/proxy/http/router.rs b/src/proxy/http/router.rs index 04917de77..7889b7cc7 100644 --- a/src/proxy/http/router.rs +++ b/src/proxy/http/router.rs @@ -182,6 +182,7 @@ where } fn call(&mut self, request: Self::Request) -> Self::Future { + trace!("routing..."); let inner = self.inner.call(request); ResponseFuture { inner } } diff --git a/src/proxy/http/settings.rs b/src/proxy/http/settings.rs index 69460d2a0..942679367 100644 --- a/src/proxy/http/settings.rs +++ b/src/proxy/http/settings.rs @@ -10,13 +10,6 @@ pub enum Settings { Http1 { /// Indicates whether a new service must be created for each request. stack_per_request: bool, - /// Whether the request wants to use HTTP/1.1's Upgrade mechanism. - /// - /// Since these cannot be translated into orig-proto, it must be - /// tracked here so as to allow those with `is_h1_upgrade: true` to - /// use an explicitly HTTP/1 service, instead of one that might - /// utilize orig-proto. - is_h1_upgrade: bool, /// Whether or not the request URI was in absolute form. /// /// This is used to configure Hyper's behaviour at the connection @@ -31,6 +24,9 @@ pub enum Settings { // ===== impl Settings ===== impl Settings { + // The router need only have enough capacity for each `Settings` variant. + const ROUTER_CAPACITY: usize = 5; + pub fn from_request(req: &http::Request) -> Self { if req.version() == http::Version::HTTP_2 { return Settings::Http2; @@ -49,9 +45,8 @@ impl Settings { .unwrap_or(true); Settings::Http1 { - was_absolute_form: super::h1::is_absolute_form(req.uri()), - is_h1_upgrade: super::h1::wants_upgrade(req), stack_per_request: is_missing_authority, + was_absolute_form: super::h1::is_absolute_form(req.uri()), } } @@ -74,13 +69,6 @@ impl Settings { } } - pub fn is_h1_upgrade(&self) -> bool { - match self { - Settings::Http1 { is_h1_upgrade, .. } => *is_h1_upgrade, - Settings::Http2 => false, - } - } - pub fn is_http2(&self) -> bool { match self { Settings::Http1 { .. } => false, @@ -88,3 +76,179 @@ impl Settings { } } } + +pub mod router { + extern crate linkerd2_router as rt; + + use futures::{Future, Poll}; + use http; + use std::{error, fmt}; + use std::marker::PhantomData; + + use super::Settings; + use proxy::http::client::Config; + use proxy::http::HasH2Reason; + use svc; + use transport::connect; + + pub trait HasConnect { + fn connect(&self) -> connect::Target; + } + + #[derive(Clone, Debug)] + pub struct Layer(PhantomData); + + #[derive(Clone, Debug)] + pub struct Stack(M); + + pub struct Service + where + M: svc::Stack, + M::Value: svc::Service>, + { + router: Router, + } + + pub struct ResponseFuture + where + M: svc::Stack, + M::Value: svc::Service>, + { + inner: as svc::Service>::Future + } + + #[derive(Debug)] + pub enum Error { + Service(E), + Stack(M), + } + + pub struct Recognize(connect::Target); + + type Router = rt::Router, Recognize, M>; + + pub fn layer() -> Layer { + Layer(PhantomData) + } + + impl svc::Layer for Layer + where + T: HasConnect, + M: svc::Stack + Clone, + M::Value: svc::Service>, + { + type Value = as svc::Stack>::Value; + type Error = as svc::Stack>::Error; + type Stack = Stack; + + fn bind(&self, inner: M) -> Self::Stack { + Stack(inner) + } + } + + impl svc::Stack for Stack + where + T: HasConnect, + M: svc::Stack + Clone, + M::Value: svc::Service>, + { + type Value = Service; + type Error = M::Error; + + fn make(&self, target: &T) -> Result { + use std::time::Duration; + + let router = Router::new( + Recognize(target.connect()), + self.0.clone(), + Settings::ROUTER_CAPACITY, + // Doesn't matter, since we are guaranteed to have enough capacity. + Duration::from_secs(0), + ); + + Ok(Service { router }) + } + } + + impl rt::Recognize> for Recognize { + type Target = Config; + + fn recognize(&self, req: &http::Request) -> Option { + let settings = Settings::from_request(req); + Some(Config::new(self.0.clone(), settings)) + } + } + + impl svc::Service for Service + where + M: svc::Stack, + M::Value: svc::Service>, + { + type Request = as svc::Service>::Request; + type Response = as svc::Service>::Response; + type Error = Error<::Error, M::Error>; + type Future = ResponseFuture; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + match self.router.poll_ready() { + Ok(async) => Ok(async), + Err(rt::Error::Inner(e)) => Err(Error::Service(e)), + Err(rt::Error::Route(e)) => Err(Error::Stack(e)), + Err(rt::Error::NoCapacity(_)) | Err(rt::Error::NotRecognized) => { + unreachable!("router must reliably dispatch"); + } + } + } + + fn call(&mut self, req: Self::Request) -> Self::Future { + ResponseFuture { inner: self.router.call(req) } + } + } + + impl Future for ResponseFuture + where + M: svc::Stack, + M::Value: svc::Service>, + { + type Item = as svc::Service>::Response; + type Error = Error<::Error, M::Error>; + + fn poll(&mut self) -> Poll { + match self.inner.poll() { + Ok(async) => Ok(async), + Err(rt::Error::Inner(e)) => Err(Error::Service(e)), + Err(rt::Error::Route(e)) => Err(Error::Stack(e)), + Err(rt::Error::NoCapacity(_)) | Err(rt::Error::NotRecognized) => { + unreachable!("router must reliably dispatch"); + } + } + } + } + + impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + Error::Service(e) => e.fmt(f), + Error::Stack(e) => e.fmt(f), + } + } + } + + impl error::Error for Error { + fn cause(&self) -> Option<&error::Error> { + match self { + Error::Service(e) => e.cause(), + Error::Stack(e) => e.cause(), + } + } + } + + impl HasH2Reason for Error { + fn h2_reason(&self) -> Option<::h2::Reason> { + match self { + Error::Service(e) => e.h2_reason(), + Error::Stack(_) => None, + } + } + } +} diff --git a/src/proxy/mod.rs b/src/proxy/mod.rs index 5f735e529..c8fa9ca43 100644 --- a/src/proxy/mod.rs +++ b/src/proxy/mod.rs @@ -3,6 +3,7 @@ use tokio::io::{AsyncRead, AsyncWrite}; pub mod buffer; +pub mod canonicalize; pub mod http; pub mod limit; mod protocol; diff --git a/src/tap/event.rs b/src/tap/event.rs index ab87c0e02..dbb9b28aa 100644 --- a/src/tap/event.rs +++ b/src/tap/event.rs @@ -3,7 +3,8 @@ use http; use indexmap::IndexMap; use std::time::Instant; -use proxy::{http::client, Source}; +use proxy::Source; +use transport::connect; // TODO this should be replaced with a string name. #[derive(Copy, Clone, Debug)] @@ -12,7 +13,7 @@ pub enum Direction { In, Out } #[derive(Clone, Debug)] pub struct Endpoint { pub direction: Direction, - pub client: client::Config, + pub target: connect::Target, pub labels: IndexMap, } diff --git a/src/tap/match_.rs b/src/tap/match_.rs index 1085c7ea9..c4da4149a 100644 --- a/src/tap/match_.rs +++ b/src/tap/match_.rs @@ -95,12 +95,12 @@ impl Match { Match::Destination(ref dst) => match *ev { Event::StreamRequestOpen(ref req) | Event::StreamRequestFail(ref req, _) => { - dst.matches(&req.endpoint.client.target.addr) + dst.matches(&req.endpoint.target.addr) } Event::StreamResponseOpen(ref rsp, _) | Event::StreamResponseFail(ref rsp, _) | Event::StreamResponseEnd(ref rsp, _) => - dst.matches(&rsp.request.endpoint.client.target.addr), + dst.matches(&rsp.request.endpoint.target.addr), _ => false, }, diff --git a/src/transport/connect.rs b/src/transport/connect.rs index 543ed3bc7..11b5ee5b1 100644 --- a/src/transport/connect.rs +++ b/src/transport/connect.rs @@ -2,8 +2,8 @@ extern crate tokio_connect; pub use self::tokio_connect::Connect; -use std::io; use std::net::SocketAddr; +use std::{hash, io}; use never::Never; use svc; @@ -12,6 +12,10 @@ use transport::{connection, tls}; #[derive(Debug, Clone)] pub struct Stack {} +/// A TCP connection target, optionally with TLS. +/// +/// Comparison operations ignore the TLS ClientConfig and only account for the +/// TLS status. #[derive(Clone, Debug)] pub struct Target { pub addr: SocketAddr, @@ -22,10 +26,7 @@ pub struct Target { // ===== impl Target ===== impl Target { - pub fn new( - addr: SocketAddr, - tls: tls::ConditionalConnectionConfig - ) -> Self { + pub fn new(addr: SocketAddr, tls: tls::ConditionalConnectionConfig) -> Self { Self { addr, tls, _p: () } } @@ -44,6 +45,21 @@ impl Connect for Target { } } +impl hash::Hash for Target { + fn hash(&self, state: &mut H) { + self.addr.hash(state); + self.tls_status().is_some().hash(state); + } +} + +impl PartialEq for Target { + fn eq(&self, other: &Target) -> bool { + self.addr.eq(&other.addr) && self.tls_status().is_some().eq(&other.tls_status().is_some()) + } +} + +impl Eq for Target {} + // ===== impl Stack ===== impl Stack { diff --git a/src/transport/tls/dns_name.rs b/src/transport/tls/dns_name.rs index 9a2ef14ad..868ffee86 100644 --- a/src/transport/tls/dns_name.rs +++ b/src/transport/tls/dns_name.rs @@ -12,6 +12,10 @@ impl DnsName { pub fn is_localhost(&self) -> bool { *self == DnsName::try_from("localhost.".as_bytes()).unwrap() } + + pub fn without_trailing_dot(&self) -> &str { + self.as_ref().trim_end_matches('.') + } } impl fmt::Display for DnsName { @@ -56,4 +60,20 @@ mod tests { assert_eq!(dns_name.is_localhost(), *expected_result, "{:?}", dns_name) } } + + #[test] + fn test_without_trailing_dot() { + let cases = &[ + ("localhost", "localhost"), + ("localhost.", "localhost"), + ("LocalhOsT.", "localhost"), + ("web.svc.local", "web.svc.local"), + ("web.svc.local.", "web.svc.local"), + ]; + for (host, expected_result) in cases { + let dns_name = DnsName::try_from(host.as_bytes()).expect(&format!("'{}' was invalid", host)); + assert_eq!(dns_name.without_trailing_dot(), *expected_result, "{:?}", dns_name) + } + assert!(DnsName::try_from(".".as_bytes()).is_err()); + } } diff --git a/tests/discovery.rs b/tests/discovery.rs index ec7c8ffdc..91fe10539 100644 --- a/tests/discovery.rs +++ b/tests/discovery.rs @@ -1,4 +1,5 @@ #![deny(warnings)] +#![recursion_limit="128"] #[macro_use] mod support; use self::support::*; @@ -11,7 +12,7 @@ macro_rules! generate_outbound_dns_limit_test { (server: $make_server:path, client: $make_client:path) => { #[test] fn outbound_dest_limit_does_not_limit_dns() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); let srv = $make_server().route("/", "hello").run(); let srv_addr = srv.addr; @@ -71,7 +72,7 @@ macro_rules! generate_tests { #[test] fn outbound_asks_controller_api() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); let srv = $make_server().route("/", "hello").route("/bye", "bye").run(); let ctrl = controller::new() @@ -86,7 +87,7 @@ macro_rules! generate_tests { #[test] fn outbound_dest_concurrency_limit() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); let srv = $make_server().route("/", "hello").run(); let srv_addr = srv.addr; @@ -158,7 +159,7 @@ macro_rules! generate_tests { #[test] fn outbound_reconnects_if_controller_stream_ends() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); let srv = $make_server().route("/recon", "nect").run(); @@ -196,16 +197,22 @@ macro_rules! generate_tests { }) } + fn init_env() -> app::config::TestEnv { + let _ = env_logger_init(); + let mut env = app::config::TestEnv::new(); + + // The bind timeout must be high enough to allow a DNS timeout. + env.put(app::config::ENV_BIND_TIMEOUT, "1s".to_owned()); + + env + } + fn outbound_destinations_reset_on_reconnect(f: F) where F: Fn() -> Option + Send + 'static { use std::thread; - let _ = env_logger::try_init(); - let mut env = app::config::TestEnv::new(); - - // set the bind timeout to 100 ms. - env.put(app::config::ENV_BIND_TIMEOUT, "100ms".to_owned()); + let env = init_env(); let srv = $make_server().route("/", "hello").run(); let ctrl = controller::new(); @@ -242,11 +249,7 @@ macro_rules! generate_tests { #[test] #[cfg_attr(not(feature = "flaky_tests"), ignore)] fn outbound_times_out() { - let _ = env_logger::try_init(); - let mut env = app::config::TestEnv::new(); - - // set the bind timeout to 100 ms. - env.put(app::config::ENV_BIND_TIMEOUT, "100ms".to_owned()); + let env = init_env(); let srv = $make_server().route("/hi", "hello").run(); let ctrl = controller::new(); @@ -268,7 +271,7 @@ macro_rules! generate_tests { #[test] fn outbound_asks_controller_without_orig_dst() { - let _ = env_logger::try_init(); + let _ = init_env(); let srv = $make_server() .route("/", "hello") @@ -291,7 +294,7 @@ macro_rules! generate_tests { #[test] fn outbound_error_reconnects_after_backoff() { - let _ = env_logger::try_init(); + let mut env = init_env(); let srv = $make_server() .route("/", "hello") @@ -306,9 +309,6 @@ macro_rules! generate_tests { dst_tx.send_addr(srv.addr); // but don't drop, to not trigger stream closing reconnects - let mut env = app::config::TestEnv::new(); - - // set the backoff timeout to 100 ms. env.put(app::config::ENV_CONTROL_BACKOFF_DELAY, "100ms".to_owned()); let proxy = proxy::new() @@ -367,7 +367,7 @@ mod http1 { #[test] fn outbound_updates_newer_services() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); let srv = server::http1().route("/h1", "hello h1").run(); @@ -402,7 +402,7 @@ mod proxy_to_proxy { #[test] fn outbound_http1() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); // Instead of a second proxy, this mocked h2 server will be the target. let srv = server::http2() @@ -433,7 +433,7 @@ mod proxy_to_proxy { #[test] fn inbound_http1() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); let srv = server::http1() .route_fn("/h1", |req| { diff --git a/tests/shutdown.rs b/tests/shutdown.rs index 1c370dcae..fdd3d43ae 100644 --- a/tests/shutdown.rs +++ b/tests/shutdown.rs @@ -1,10 +1,11 @@ #![deny(warnings)] +#![recursion_limit="128"] mod support; use self::support::*; #[test] fn h2_goaways_connections() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); let (shdn, rx) = shutdown_signal(); @@ -24,7 +25,7 @@ fn h2_goaways_connections() { #[test] fn h2_exercise_goaways_connections() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); const RESPONSE_SIZE: usize = 1024 * 16; const NUM_REQUESTS: usize = 50; @@ -84,7 +85,7 @@ fn h2_exercise_goaways_connections() { #[test] fn http1_closes_idle_connections() { use std::cell::RefCell; - let _ = env_logger::try_init(); + let _ = env_logger_init(); let (shdn, rx) = shutdown_signal(); @@ -119,7 +120,7 @@ fn http1_closes_idle_connections() { #[test] fn tcp_waits_for_proxies_to_close() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); let (shdn, rx) = shutdown_signal(); let msg1 = "custom tcp hello"; diff --git a/tests/support/controller.rs b/tests/support/controller.rs index 89d8d160a..f6c7fdcfc 100644 --- a/tests/support/controller.rs +++ b/tests/support/controller.rs @@ -48,9 +48,14 @@ impl Controller { pub fn destination_tx(&self, dest: &str) -> DstSender { let (tx, rx) = sync::mpsc::unbounded(); + let path = if dest.contains(":") { + dest.to_owned() + } else { + format!("{}:80", dest) + }; let dst = pb::GetDestination { scheme: "k8s".into(), - path: dest.into(), + path, }; self.expect_dst_calls .lock() diff --git a/tests/support/mod.rs b/tests/support/mod.rs index fa1002ec2..59ad273a6 100644 --- a/tests/support/mod.rs +++ b/tests/support/mod.rs @@ -22,7 +22,7 @@ extern crate tower_h2; extern crate tower_grpc; extern crate tower_service; extern crate log; -pub extern crate env_logger; +extern crate env_logger; use std::fmt; pub use std::collections::HashMap; @@ -50,6 +50,25 @@ use self::tower_service::{NewService, Service}; pub const ENV_TEST_PATIENCE_MS: &'static str = "RUST_TEST_PATIENCE_MS"; pub const DEFAULT_TEST_PATIENCE: Duration = Duration::from_millis(15); +/// By default, disable logging in modules that are expected to error in tests. +const DEFAULT_LOG: &'static str = + "error,\ + linkerd2_proxy::proxy::canonicalize=off,\ + linkerd2_proxy::proxy::http::router=off,\ + linkerd2_proxy::proxy::tcp=off"; + +pub fn env_logger_init() { + use std::env; + + let log = env::var("LINKERD2_PROXY_LOG").unwrap_or_else(|_| DEFAULT_LOG.to_owned()); + env::set_var("RUST_LOG", &log); + env::set_var("LINKERD2_PROXY_LOG", &log); + + if let Err(e) = env_logger::try_init() { + eprintln!("Failed to initialize logger: {}", e); + } +} + /// Retry an assertion up to a specified number of times, waiting /// `RUST_TEST_PATIENCE_MS` between retries. /// diff --git a/tests/telemetry.rs b/tests/telemetry.rs index baf187a7e..7450afe11 100644 --- a/tests/telemetry.rs +++ b/tests/telemetry.rs @@ -1,4 +1,5 @@ #![deny(warnings)] +#![recursion_limit="128"] #[macro_use] extern crate log; extern crate regex; @@ -108,7 +109,7 @@ impl TcpFixture { #[test] fn metrics_endpoint_inbound_request_count() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); let Fixture { client, metrics, proxy: _proxy } = Fixture::inbound(); // prior to seeing any requests, request count should be empty. @@ -125,7 +126,7 @@ fn metrics_endpoint_inbound_request_count() { #[test] fn metrics_endpoint_outbound_request_count() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); let Fixture { client, metrics, proxy: _proxy } = Fixture::outbound(); // prior to seeing any requests, request count should be empty. @@ -210,7 +211,7 @@ mod response_classification { #[test] fn inbound_http() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); let Fixture { client, metrics, proxy: _proxy } = Fixture::inbound_with_server(make_test_server()); @@ -235,7 +236,7 @@ mod response_classification { #[test] fn outbound_http() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); let Fixture { client, metrics, proxy: _proxy } = Fixture::outbound_with_server(make_test_server()); @@ -271,7 +272,7 @@ mod response_classification { #[test] #[cfg_attr(not(feature = "flaky_tests"), ignore)] fn metrics_endpoint_inbound_response_latency() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); info!("running test server"); let srv = server::new() @@ -347,7 +348,7 @@ fn metrics_endpoint_inbound_response_latency() { #[test] #[cfg_attr(not(feature = "flaky_tests"), ignore)] fn metrics_endpoint_outbound_response_latency() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); info!("running test server"); let srv = server::new() @@ -451,7 +452,7 @@ mod outbound_dst_labels { #[test] fn multiple_addr_labels() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); let (Fixture { client, metrics, proxy: _proxy }, addr, dst_tx) = fixture("labeled.test.svc.cluster.local"); @@ -475,7 +476,7 @@ mod outbound_dst_labels { #[test] fn multiple_addrset_labels() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); let (Fixture { client, metrics, proxy: _proxy }, addr, dst_tx) = fixture("labeled.test.svc.cluster.local"); @@ -500,7 +501,7 @@ mod outbound_dst_labels { #[test] fn labeled_addr_and_addrset() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); let (Fixture { client, metrics, proxy: _proxy }, addr, dst_tx) = fixture("labeled.test.svc.cluster.local"); @@ -529,7 +530,7 @@ mod outbound_dst_labels { #[test] #[cfg_attr(not(feature = "flaky_tests"), ignore)] fn controller_updates_addr_labels() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); info!("running test server"); let (Fixture { client, metrics, proxy: _proxy }, addr, dst_tx) = @@ -586,7 +587,7 @@ mod outbound_dst_labels { #[test] #[cfg_attr(not(feature = "flaky_tests"), ignore)] fn controller_updates_set_labels() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); info!("running test server"); let (Fixture { client, metrics, proxy: _proxy }, addr, dst_tx) = fixture("labeled.test.svc.cluster.local"); @@ -637,7 +638,7 @@ mod outbound_dst_labels { #[test] fn metrics_have_no_double_commas() { // Test for regressions to linkerd/linkerd2#600. - let _ = env_logger::try_init(); + let _ = env_logger_init(); info!("running test server"); let inbound_srv = server::new().route("/hey", "hello").run(); @@ -690,7 +691,7 @@ mod transport { #[test] #[cfg_attr(not(feature = "flaky_tests"), ignore)] fn inbound_http_accept() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); let Fixture { client, metrics, proxy } = Fixture::inbound(); info!("client.get(/)"); @@ -722,7 +723,7 @@ mod transport { #[test] #[cfg_attr(not(feature = "flaky_tests"), ignore)] fn inbound_http_connect() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); let Fixture { client, metrics, proxy } = Fixture::inbound(); info!("client.get(/)"); @@ -743,7 +744,7 @@ mod transport { #[test] #[cfg_attr(not(feature = "flaky_tests"), ignore)] fn outbound_http_accept() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); let Fixture { client, metrics, proxy } = Fixture::outbound(); info!("client.get(/)"); @@ -775,7 +776,7 @@ mod transport { #[test] #[cfg_attr(not(feature = "flaky_tests"), ignore)] fn outbound_http_connect() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); let Fixture { client, metrics, proxy } = Fixture::outbound(); info!("client.get(/)"); @@ -796,7 +797,7 @@ mod transport { #[test] #[cfg_attr(not(feature = "flaky_tests"), ignore)] fn inbound_tcp_connect() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); let TcpFixture { client, metrics, proxy: _proxy } = TcpFixture::inbound(); @@ -812,7 +813,7 @@ mod transport { #[cfg(macos)] #[cfg_attr(not(feature = "flaky_tests"), ignore)] fn inbound_tcp_connect_err() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); let srv = tcp::server() .accept_fut(move |sock| { drop(sock); @@ -843,7 +844,7 @@ mod transport { #[cfg(macos)] #[cfg_attr(not(feature = "flaky_tests"), ignore)] fn outbound_tcp_connect_err() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); let srv = tcp::server() .accept_fut(move |sock| { drop(sock); @@ -873,7 +874,7 @@ mod transport { #[test] #[cfg_attr(not(feature = "flaky_tests"), ignore)] fn inbound_tcp_accept() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); let TcpFixture { client, metrics, proxy: _proxy } = TcpFixture::inbound(); @@ -905,7 +906,7 @@ mod transport { #[test] #[cfg_attr(not(feature = "flaky_tests"), ignore)] fn inbound_tcp_duration() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); let TcpFixture { client, metrics, proxy: _proxy } = TcpFixture::inbound(); @@ -942,7 +943,7 @@ mod transport { #[test] #[cfg_attr(not(feature = "flaky_tests"), ignore)] fn inbound_tcp_write_bytes_total() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); let TcpFixture { client, metrics, proxy: _proxy } = TcpFixture::inbound(); let src_expected = format!( @@ -968,7 +969,7 @@ mod transport { #[test] #[cfg_attr(not(feature = "flaky_tests"), ignore)] fn inbound_tcp_read_bytes_total() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); let TcpFixture { client, metrics, proxy: _proxy } = TcpFixture::inbound(); let src_expected = format!( @@ -993,7 +994,7 @@ mod transport { #[test] #[cfg_attr(not(feature = "flaky_tests"), ignore)] fn outbound_tcp_connect() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); let TcpFixture { client, metrics, proxy: _proxy } = TcpFixture::outbound(); @@ -1008,7 +1009,7 @@ mod transport { #[test] #[cfg_attr(not(feature = "flaky_tests"), ignore)] fn outbound_tcp_accept() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); let TcpFixture { client, metrics, proxy: _proxy } = TcpFixture::outbound(); @@ -1039,7 +1040,7 @@ mod transport { #[test] #[cfg_attr(not(feature = "flaky_tests"), ignore)] fn outbound_tcp_duration() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); let TcpFixture { client, metrics, proxy: _proxy } = TcpFixture::outbound(); @@ -1076,7 +1077,7 @@ mod transport { #[test] #[cfg_attr(not(feature = "flaky_tests"), ignore)] fn outbound_tcp_write_bytes_total() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); let TcpFixture { client, metrics, proxy: _proxy } = TcpFixture::outbound(); let src_expected = format!( @@ -1102,7 +1103,7 @@ mod transport { #[test] #[cfg_attr(not(feature = "flaky_tests"), ignore)] fn outbound_tcp_read_bytes_total() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); let TcpFixture { client, metrics, proxy: _proxy } = TcpFixture::outbound(); let src_expected = format!( @@ -1128,7 +1129,7 @@ mod transport { #[test] #[cfg_attr(not(feature = "flaky_tests"), ignore)] fn outbound_tcp_open_connections() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); let TcpFixture { client, metrics, proxy: _proxy } = TcpFixture::outbound(); @@ -1156,7 +1157,7 @@ mod transport { #[test] #[cfg_attr(not(feature = "flaky_tests"), ignore)] fn outbound_http_tcp_open_connections() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); let Fixture { client, metrics, proxy } = Fixture::outbound(); @@ -1187,7 +1188,7 @@ mod transport { #[test] #[cfg_attr(not(feature = "flaky_tests"), ignore)] fn metrics_compression() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); let Fixture { client, metrics, proxy: _proxy } = Fixture::inbound(); diff --git a/tests/transparency.rs b/tests/transparency.rs index 532cedf13..990e4de0a 100644 --- a/tests/transparency.rs +++ b/tests/transparency.rs @@ -1,11 +1,12 @@ #![deny(warnings)] +#![recursion_limit="128"] #[macro_use] mod support; use self::support::*; #[test] fn outbound_http1() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); let srv = server::http1().route("/", "hello h1").run(); let ctrl = controller::new() @@ -19,7 +20,7 @@ fn outbound_http1() { #[test] fn inbound_http1() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); let srv = server::http1().route("/", "hello h1").run(); let proxy = proxy::new() @@ -32,7 +33,7 @@ fn inbound_http1() { #[test] fn outbound_tcp() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); let msg1 = "custom tcp hello"; let msg2 = "custom tcp bye"; @@ -57,7 +58,7 @@ fn outbound_tcp() { #[test] fn inbound_tcp() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); let msg1 = "custom tcp hello"; let msg2 = "custom tcp bye"; @@ -84,7 +85,7 @@ fn inbound_tcp() { fn tcp_server_first() { use std::sync::mpsc; - let _ = env_logger::try_init(); + let _ = env_logger_init(); let msg1 = "custom tcp server starts"; let msg2 = "custom tcp client second"; @@ -120,7 +121,7 @@ fn tcp_server_first() { #[test] fn tcp_with_no_orig_dst() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); let srv = server::tcp() .accept(move |_| "don't read me") @@ -146,7 +147,7 @@ fn tcp_with_no_orig_dst() { fn tcp_connections_close_if_client_closes() { use std::sync::mpsc; - let _ = env_logger::try_init(); + let _ = env_logger_init(); let msg1 = "custom tcp hello"; let msg2 = "custom tcp bye"; @@ -193,7 +194,7 @@ macro_rules! http1_tests { (proxy: $proxy:expr) => { #[test] fn inbound_http1() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); let srv = server::http1().route("/", "hello h1").run(); let proxy = $proxy(srv); @@ -204,7 +205,7 @@ macro_rules! http1_tests { #[test] fn http1_removes_connection_headers() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); let srv = server::http1() .route_fn("/", |req| { @@ -241,7 +242,7 @@ macro_rules! http1_tests { #[test] fn http10_with_host() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); let host = "transparency.test.svc.cluster.local"; let srv = server::http1() @@ -267,7 +268,7 @@ macro_rules! http1_tests { #[test] fn http11_absolute_uri_differs_from_host() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); // We shouldn't touch the URI or the Host, just pass directly as we got. let auth = "transparency.test.svc.cluster.local"; @@ -292,7 +293,7 @@ macro_rules! http1_tests { #[test] fn http11_upgrades() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); // To simplify things for this test, we just use the test TCP // client and server to do an HTTP upgrade. @@ -369,7 +370,7 @@ macro_rules! http1_tests { #[test] fn http11_upgrade_h2_stripped() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); // If an `h2` upgrade over HTTP/1.1 were to go by the proxy, // and it succeeded, there would an h2 connection, but it would @@ -405,7 +406,7 @@ macro_rules! http1_tests { #[test] fn http11_connect() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); // To simplify things for this test, we just use the test TCP // client and server to do an HTTP CONNECT. @@ -476,7 +477,7 @@ macro_rules! http1_tests { #[test] fn http11_connect_bad_requests() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); let srv = server::tcp() .accept(move |_sock| -> Vec { @@ -542,7 +543,7 @@ macro_rules! http1_tests { #[test] fn http1_request_with_body_content_length() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); let srv = server::http1() .route_fn("/", |req| { @@ -566,7 +567,7 @@ macro_rules! http1_tests { #[test] fn http1_request_with_body_chunked() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); let srv = server::http1() .route_async("/", |req| { @@ -603,7 +604,7 @@ macro_rules! http1_tests { #[test] fn http1_requests_without_body_doesnt_add_transfer_encoding() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); let srv = server::http1() .route_fn("/", |req| { @@ -644,7 +645,7 @@ macro_rules! http1_tests { #[test] fn http1_content_length_zero_is_preserved() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); let srv = server::http1() .route_fn("/", |req| { @@ -688,7 +689,7 @@ macro_rules! http1_tests { #[test] fn http1_bodyless_responses() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); let req_status_header = "x-test-status-requested"; @@ -752,7 +753,7 @@ macro_rules! http1_tests { #[test] fn http1_head_responses() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); let srv = server::http1() .route_fn("/", move |req| { @@ -785,7 +786,7 @@ macro_rules! http1_tests { #[test] fn http1_response_end_of_file() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); // test both http/1.0 and 1.1 let srv = server::tcp() @@ -888,7 +889,7 @@ mod proxy_to_proxy { fn http10_without_host() { // Without a host or authority, there's no way to route this test, // so its not part of the proxy_to_proxy set. - let _ = env_logger::try_init(); + let _ = env_logger_init(); let srv = server::http1() .route_fn("/", move |req| { @@ -921,7 +922,7 @@ fn http10_without_host() { #[test] fn http1_one_connection_per_host() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); let srv = server::http1() .route("/body", "hello hosts") @@ -966,7 +967,7 @@ fn http1_one_connection_per_host() { #[test] fn http1_requests_without_host_have_unique_connections() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); let srv = server::http1() .route("/", "unique hosts") @@ -1021,7 +1022,7 @@ fn http1_requests_without_host_have_unique_connections() { #[test] #[cfg_attr(not(feature = "flaky_tests"), ignore)] fn retry_reconnect_errors() { - let _ = env_logger::try_init(); + let _ = env_logger_init(); // Used to delay `listen` in the server, to force connection refused errors. let (tx, rx) = oneshot::channel();