Revert "Proxy: Refactor DNS name parsing and normalization (#673)" (#700)

This reverts commit 311ef410a8.

Signed-off-by: Brian Smith <brian@briansmith.org>
This commit is contained in:
Brian Smith 2018-04-05 16:49:32 -10:00 committed by GitHub
parent 1b223723bc
commit 7bc4ffd0a4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 207 additions and 280 deletions

View File

@ -349,7 +349,7 @@ fn parse_url(s: &str) -> Result<HostAndPort, ParseError> {
// https://github.com/hyperium/http/issues/127. For now just ignore any // https://github.com/hyperium/http/issues/127. For now just ignore any
// fragment that is there. // fragment that is there.
HostAndPort::normalize(authority, None) HostAndPort::try_from(authority)
.map_err(|e| ParseError::UrlError(UrlError::AuthorityError(e))) .map_err(|e| ParseError::UrlError(UrlError::AuthorityError(e)))
} }
@ -366,7 +366,7 @@ fn parse<T, Parse>(strings: &Strings, name: &str, parse: Parse) -> Result<Option
match strings.get(name)? { match strings.get(name)? {
Some(ref s) => { Some(ref s) => {
let r = parse(s).map_err(|parse_error| { let r = parse(s).map_err(|parse_error| {
error!("{}={:?} is not valid: {:?}", name, s, parse_error); error!("{} is not valid: {:?}", name, parse_error);
Error::InvalidEnvVar Error::InvalidEnvVar
})?; })?;
Ok(Some(r)) Ok(Some(r))

View File

@ -16,14 +16,13 @@ use conduit_proxy_controller_grpc::common::{Destination, TcpAddress};
use conduit_proxy_controller_grpc::destination::Update as PbUpdate; use conduit_proxy_controller_grpc::destination::Update as PbUpdate;
use conduit_proxy_controller_grpc::destination::update::Update as PbUpdate2; use conduit_proxy_controller_grpc::destination::update::Update as PbUpdate2;
use conduit_proxy_controller_grpc::destination::client::{Destination as DestinationSvc}; use conduit_proxy_controller_grpc::destination::client::{Destination as DestinationSvc};
use transport::DnsNameAndPort;
use control::cache::{Cache, CacheChange, Exists}; use control::cache::{Cache, CacheChange, Exists};
/// A handle to start watching a destination for address changes. /// A handle to start watching a destination for address changes.
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct Discovery { pub struct Discovery {
tx: mpsc::UnboundedSender<(DnsNameAndPort, mpsc::UnboundedSender<Update>)>, tx: mpsc::UnboundedSender<(FullyQualifiedAuthority, mpsc::UnboundedSender<Update>)>,
} }
/// A `tower_discover::Discover`, given to a `tower_balance::Balance`. /// A `tower_discover::Discover`, given to a `tower_balance::Balance`.
@ -36,28 +35,29 @@ pub struct Watch<B> {
/// A background handle to eventually bind on the controller thread. /// A background handle to eventually bind on the controller thread.
#[derive(Debug)] #[derive(Debug)]
pub struct Background { pub struct Background {
rx: mpsc::UnboundedReceiver<(DnsNameAndPort, mpsc::UnboundedSender<Update>)>, rx: mpsc::UnboundedReceiver<(FullyQualifiedAuthority, mpsc::UnboundedSender<Update>)>,
default_destination_namespace: String,
} }
/// A future returned from `Background::work()`, doing the work of talking to /// A future returned from `Background::work()`, doing the work of talking to
/// the controller destination API. /// the controller destination API.
// TODO: debug impl // TODO: debug impl
pub struct DiscoveryWork<T: HttpService<ResponseBody = RecvBody>> { pub struct DiscoveryWork<T: HttpService<ResponseBody = RecvBody>> {
default_destination_namespace: String, destinations: HashMap<
destinations: HashMap<DnsNameAndPort, DestinationSet<T>>, FullyQualifiedAuthority,
DestinationSet<T>
>,
/// A queue of authorities that need to be reconnected. /// A queue of authorities that need to be reconnected.
reconnects: VecDeque<DnsNameAndPort>, reconnects: VecDeque<FullyQualifiedAuthority>,
/// The Destination.Get RPC client service. /// The Destination.Get RPC client service.
/// Each poll, records whether the rpc service was till ready. /// Each poll, records whether the rpc service was till ready.
rpc_ready: bool, rpc_ready: bool,
/// A receiver of new watch requests. /// A receiver of new watch requests.
rx: mpsc::UnboundedReceiver<(DnsNameAndPort, mpsc::UnboundedSender<Update>)>, rx: mpsc::UnboundedReceiver<(FullyQualifiedAuthority, mpsc::UnboundedSender<Update>)>,
} }
struct DestinationSet<T: HttpService<ResponseBody = RecvBody>> { struct DestinationSet<T: HttpService<ResponseBody = RecvBody>> {
addrs: Exists<Cache<SocketAddr>>, addrs: Exists<Cache<SocketAddr>>,
query: Option<DestinationServiceQuery<T>>, query: DestinationServiceQuery<T>,
txs: Vec<mpsc::UnboundedSender<Update>>, txs: Vec<mpsc::UnboundedSender<Update>>,
} }
@ -129,7 +129,7 @@ pub trait Bind {
/// ///
/// The `Discovery` is used by a listener, the `Background` is consumed /// The `Discovery` is used by a listener, the `Background` is consumed
/// on the controller thread. /// on the controller thread.
pub fn new(default_destination_namespace: String) -> (Discovery, Background) { pub fn new() -> (Discovery, Background) {
let (tx, rx) = mpsc::unbounded(); let (tx, rx) = mpsc::unbounded();
( (
Discovery { Discovery {
@ -137,7 +137,6 @@ pub fn new(default_destination_namespace: String) -> (Discovery, Background) {
}, },
Background { Background {
rx, rx,
default_destination_namespace,
}, },
) )
} }
@ -146,7 +145,7 @@ pub fn new(default_destination_namespace: String) -> (Discovery, Background) {
impl Discovery { impl Discovery {
/// Start watching for address changes for a certain authority. /// Start watching for address changes for a certain authority.
pub fn resolve<B>(&self, authority: &DnsNameAndPort, bind: B) -> Watch<B> { pub fn resolve<B>(&self, authority: &FullyQualifiedAuthority, bind: B) -> Watch<B> {
trace!("resolve; authority={:?}", authority); trace!("resolve; authority={:?}", authority);
let (tx, rx) = mpsc::unbounded(); let (tx, rx) = mpsc::unbounded();
self.tx self.tx
@ -203,7 +202,6 @@ impl Background {
T::Error: fmt::Debug, T::Error: fmt::Debug,
{ {
DiscoveryWork { DiscoveryWork {
default_destination_namespace: self.default_destination_namespace,
destinations: HashMap::new(), destinations: HashMap::new(),
reconnects: VecDeque::new(), reconnects: VecDeque::new(),
rpc_ready: false, rpc_ready: false,
@ -278,11 +276,7 @@ where
} }
Entry::Vacant(vac) => { Entry::Vacant(vac) => {
let query = let query =
DestinationServiceQuery::connect_maybe( DestinationServiceQuery::connect(client, vac.key(), "connect");
&self.default_destination_namespace,
client,
vac.key(),
"connect");
vac.insert(DestinationSet { vac.insert(DestinationSet {
addrs: Exists::Unknown, addrs: Exists::Unknown,
query, query,
@ -307,11 +301,7 @@ where
while let Some(auth) = self.reconnects.pop_front() { while let Some(auth) = self.reconnects.pop_front() {
if let Some(set) = self.destinations.get_mut(&auth) { if let Some(set) = self.destinations.get_mut(&auth) {
set.query = DestinationServiceQuery::connect_maybe( set.query = DestinationServiceQuery::connect(client, &auth, "reconnect");
&self.default_destination_namespace,
client,
&auth,
"reconnect");
return true; return true;
} else { } else {
trace!("reconnect no longer needed: {:?}", auth); trace!("reconnect no longer needed: {:?}", auth);
@ -324,11 +314,10 @@ where
for (auth, set) in &mut self.destinations { for (auth, set) in &mut self.destinations {
let needs_reconnect = 'set: loop { let needs_reconnect = 'set: loop {
let poll_result = match set.query { let poll_result = match set.query {
None | DestinationServiceQuery::NeedsReconnect => {
Some(DestinationServiceQuery::NeedsReconnect) => {
continue; continue;
}, },
Some(DestinationServiceQuery::ConnectedOrConnecting{ ref mut rx }) => { DestinationServiceQuery::ConnectedOrConnecting{ ref mut rx } => {
rx.poll() rx.poll()
} }
}; };
@ -364,9 +353,9 @@ where
}; };
if needs_reconnect { if needs_reconnect {
set.query = Some(DestinationServiceQuery::NeedsReconnect); set.query = DestinationServiceQuery::NeedsReconnect;
set.reset_on_next_modification(); set.reset_on_next_modification();
self.reconnects.push_back(auth.clone()); self.reconnects.push_back(FullyQualifiedAuthority::clone(auth));
} }
} }
} }
@ -376,28 +365,16 @@ where
// ===== impl DestinationServiceQuery ===== // ===== impl DestinationServiceQuery =====
impl<T: HttpService<RequestBody = BoxBody, ResponseBody = RecvBody>> DestinationServiceQuery<T> { impl<T: HttpService<RequestBody = BoxBody, ResponseBody = RecvBody>> DestinationServiceQuery<T> {
// Initiates a query `query` to the Destination service and returns it as `Some(query)` if the fn connect(client: &mut T, auth: &FullyQualifiedAuthority, connect_or_reconnect: &str) -> Self {
// given authority's host is of a form suitable for using to query the Destination service.
// Otherwise, returns `None`.
fn connect_maybe(
default_destination_namespace: &str,
client: &mut T,
auth: &DnsNameAndPort,
connect_or_reconnect: &str)
-> Option<Self>
{
trace!("DestinationServiceQuery {} {:?}", connect_or_reconnect, auth); trace!("DestinationServiceQuery {} {:?}", connect_or_reconnect, auth);
FullyQualifiedAuthority::normalize(auth, default_destination_namespace)
.map(|auth| {
let req = Destination { let req = Destination {
scheme: "k8s".into(), scheme: "k8s".into(),
path: auth.without_trailing_dot().to_owned(), path: auth.without_trailing_dot().as_str().into(),
}; };
// TODO: Can grpc::Request::new be removed? // TODO: Can grpc::Request::new be removed?
let mut svc = DestinationSvc::new(client.lift_ref()); let mut svc = DestinationSvc::new(client.lift_ref());
let response = svc.get(grpc::Request::new(req)); let response = svc.get(grpc::Request::new(req));
DestinationServiceQuery::ConnectedOrConnecting { rx: UpdateRx::Waiting(response) } DestinationServiceQuery::ConnectedOrConnecting { rx: UpdateRx::Waiting(response) }
})
} }
} }
@ -414,7 +391,7 @@ impl <T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> {
} }
} }
fn add<A>(&mut self, authority_for_logging: &DnsNameAndPort, addrs_to_add: A) fn add<A>(&mut self, authority_for_logging: &FullyQualifiedAuthority, addrs_to_add: A)
where A: Iterator<Item = SocketAddr> where A: Iterator<Item = SocketAddr>
{ {
let mut cache = match self.addrs.take() { let mut cache = match self.addrs.take() {
@ -428,7 +405,7 @@ impl <T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> {
self.addrs = Exists::Yes(cache); self.addrs = Exists::Yes(cache);
} }
fn remove<A>(&mut self, authority_for_logging: &DnsNameAndPort, addrs_to_remove: A) fn remove<A>(&mut self, authority_for_logging: &FullyQualifiedAuthority, addrs_to_remove: A)
where A: Iterator<Item = SocketAddr> where A: Iterator<Item = SocketAddr>
{ {
let cache = match self.addrs.take() { let cache = match self.addrs.take() {
@ -444,7 +421,7 @@ impl <T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> {
self.addrs = Exists::Yes(cache); self.addrs = Exists::Yes(cache);
} }
fn no_endpoints(&mut self, authority_for_logging: &DnsNameAndPort, exists: bool) { fn no_endpoints(&mut self, authority_for_logging: &FullyQualifiedAuthority, exists: bool) {
trace!("no endpoints for {:?} that is known to {}", authority_for_logging, trace!("no endpoints for {:?} that is known to {}", authority_for_logging,
if exists { "exist" } else { "not exist" }); if exists { "exist" } else { "not exist" });
match self.addrs.take() { match self.addrs.take() {
@ -463,7 +440,7 @@ impl <T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> {
} }
fn on_change(txs: &mut Vec<mpsc::UnboundedSender<Update>>, fn on_change(txs: &mut Vec<mpsc::UnboundedSender<Update>>,
authority_for_logging: &DnsNameAndPort, authority_for_logging: &FullyQualifiedAuthority,
addr: SocketAddr, addr: SocketAddr,
change: CacheChange) { change: CacheChange) {
let (update_str, update_constructor): (&'static str, fn(SocketAddr) -> Update) = let (update_str, update_constructor): (&'static str, fn(SocketAddr) -> Update) =

View File

@ -17,7 +17,8 @@ use tower_h2;
use tower_reconnect::{Error as ReconnectError, Reconnect}; use tower_reconnect::{Error as ReconnectError, Reconnect};
use dns; use dns;
use transport::{DnsNameAndPort, HostAndPort, LookupAddressAndConnect}; use fully_qualified_authority::FullyQualifiedAuthority;
use transport::{HostAndPort, LookupAddressAndConnect};
use timeout::{Timeout, TimeoutError}; use timeout::{Timeout, TimeoutError};
mod cache; mod cache;
@ -40,9 +41,8 @@ pub struct Background {
disco: DiscoBg, disco: DiscoBg,
} }
pub fn new(default_destination_namespace: String) -> (Control, Background) pub fn new() -> (Control, Background) {
{ let (tx, rx) = self::discovery::new();
let (tx, rx) = self::discovery::new(default_destination_namespace);
let c = Control { let c = Control {
disco: tx, disco: tx,
@ -58,7 +58,7 @@ pub fn new(default_destination_namespace: String) -> (Control, Background)
// ===== impl Control ===== // ===== impl Control =====
impl Control { impl Control {
pub fn resolve<B>(&self, auth: &DnsNameAndPort, bind: B) -> Watch<B> { pub fn resolve<B>(&self, auth: &FullyQualifiedAuthority, bind: B) -> Watch<B> {
self.disco.resolve(auth, bind) self.disco.resolve(auth, bind)
} }
} }

View File

@ -3,7 +3,6 @@ use abstract_ns::HostResolve;
use domain; use domain;
use futures::prelude::*; use futures::prelude::*;
use ns_dns_tokio; use ns_dns_tokio;
use std::fmt;
use std::net::IpAddr; use std::net::IpAddr;
use std::path::Path; use std::path::Path;
use std::str::FromStr; use std::str::FromStr;
@ -19,46 +18,15 @@ pub struct Resolver(ns_dns_tokio::DnsResolver);
pub enum IpAddrFuture { pub enum IpAddrFuture {
DNS(ns_dns_tokio::HostFuture), DNS(ns_dns_tokio::HostFuture),
Fixed(IpAddr), Fixed(IpAddr),
InvalidDNSName(String),
} }
pub enum Error { pub enum Error {
InvalidDNSName(String),
NoAddressesFound, NoAddressesFound,
ResolutionFailed(<ns_dns_tokio::HostFuture as Future>::Error), ResolutionFailed(<ns_dns_tokio::HostFuture as Future>::Error),
} }
/// A DNS name.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct Name(abstract_ns::Name);
impl fmt::Display for Name {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
self.0.fmt(f)
}
}
impl Name {
/// Parses the input string as a DNS name, normalizing it to lowercase.
pub fn normalize(s: &str) -> Result<Self, ()> {
// XXX: `abstract_ns::Name::from_str()` wrongly accepts IP addresses as
// domain names. Protect against this. TODO: Fix abstract_ns.
if let Ok(_) = IpAddr::from_str(s) {
return Err(());
}
// XXX: `abstract_ns::Name::from_str()` doesn't accept uppercase letters.
// TODO: Avoid this extra allocation.
let s = s.to_ascii_lowercase();
abstract_ns::Name::from_str(&s)
.map(Name)
.map_err(|_| ())
}
}
impl AsRef<str> for Name {
fn as_ref(&self) -> &str {
self.0.as_ref()
}
}
impl Config { impl Config {
/// Note that this ignores any errors reading or parsing the resolve.conf /// Note that this ignores any errors reading or parsing the resolve.conf
/// file, just like the `domain` crate does. /// file, just like the `domain` crate does.
@ -81,7 +49,10 @@ impl Resolver {
match *host { match *host {
transport::Host::DnsName(ref name) => { transport::Host::DnsName(ref name) => {
trace!("resolve {}", name); trace!("resolve {}", name);
IpAddrFuture::DNS(self.0.resolve_host(&name.0)) match abstract_ns::Name::from_str(name) {
Ok(name) => IpAddrFuture::DNS(self.0.resolve_host(&name)),
Err(_) => IpAddrFuture::InvalidDNSName(name.clone()),
}
} }
transport::Host::Ip(addr) => IpAddrFuture::Fixed(addr), transport::Host::Ip(addr) => IpAddrFuture::Fixed(addr),
} }
@ -103,55 +74,7 @@ impl Future for IpAddrFuture {
Err(e) => Err(Error::ResolutionFailed(e)), Err(e) => Err(Error::ResolutionFailed(e)),
}, },
IpAddrFuture::Fixed(addr) => Ok(Async::Ready(addr)), IpAddrFuture::Fixed(addr) => Ok(Async::Ready(addr)),
} IpAddrFuture::InvalidDNSName(ref name) => Err(Error::InvalidDNSName(name.clone())),
}
}
#[cfg(test)]
mod tests {
use super::Name;
#[test]
fn test_dns_name_parsing() {
struct Case {
input: &'static str,
output: &'static str,
}
static VALID: &[Case] = &[
// Almost all digits and dots, similar to IPv4 addresses.
Case { input: "1.2.3.x", output: "1.2.3.x", },
Case { input: "1.2.3.x", output: "1.2.3.x", },
Case { input: "1.2.3.4A", output: "1.2.3.4a", },
Case { input: "a.1.2.3", output: "a.1.2.3", },
Case { input: "1.2.x.3", output: "1.2.x.3", },
Case { input: "a.b.c.d", output: "a.b.c.d", },
// Uppercase letters in labels
Case { input: "A.b.c.d", output: "a.b.c.d", },
Case { input: "a.mIddle.c", output: "a.middle.c", },
Case { input: "a.b.c.D", output: "a.b.c.d", },
];
for case in VALID {
let name = Name::normalize(case.input).expect("is a valid DNS name");
assert_eq!(name.as_ref(), case.output);
}
static INVALID: &[&str] = &[
"",
"1.2.3.4",
"::1",
"[::1]",
":1234",
"1.2.3.4:11234",
"abc.com:1234",
];
for case in INVALID {
assert!(Name::normalize(case).is_err(),
"{} is invalid", case);
} }
} }
} }

View File

@ -1,28 +1,60 @@
use bytes::{BytesMut}; use bytes::BytesMut;
use transport::DnsNameAndPort; use std::net::IpAddr;
use std::str::FromStr;
use http::uri::Authority;
/// A normalized `Authority`. /// A normalized `Authority`.
#[derive(Clone, Debug, Eq, Hash, PartialEq)] #[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct FullyQualifiedAuthority(String); pub struct FullyQualifiedAuthority(Authority);
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct NamedAddress {
pub name: FullyQualifiedAuthority,
pub use_destination_service: bool
}
impl FullyQualifiedAuthority { impl FullyQualifiedAuthority {
/// Normalizes the name according to Kubernetes service naming conventions. /// Normalizes the name according to Kubernetes service naming conventions.
/// Case folding is not done; that is done internally inside `Authority`. /// Case folding is not done; that is done internally inside `Authority`.
pub fn normalize(authority: &DnsNameAndPort, default_namespace: &str) -> Option<Self> { ///
let name: &str = authority.host.as_ref(); /// This assumes the authority is syntactically valid.
pub fn normalize(authority: &Authority, default_namespace: &str)
-> NamedAddress
{
// Don't change IP-address-based authorities.
if IpAddr::from_str(authority.host()).is_ok() {
return NamedAddress {
name: FullyQualifiedAuthority(authority.clone()),
use_destination_service: false,
}
};
// TODO: `Authority` doesn't have a way to get the serialized form of the
// port, so do it ourselves.
let (name, colon_port) = {
let authority = authority.as_str();
match authority.rfind(':') {
Some(p) => authority.split_at(p),
None => (authority, ""),
}
};
// parts should have a maximum 4 of pieces (name, namespace, svc, zone) // parts should have a maximum 4 of pieces (name, namespace, svc, zone)
let mut parts = name.splitn(4, '.'); let mut parts = name.splitn(4, '.');
// `dns::Name` guarantees the name has at least one part. // `Authority` guarantees the name has at least one part.
assert!(parts.next().is_some()); assert!(parts.next().is_some());
// Rewrite "$name" -> "$name.$default_namespace". // Rewrite "$name" -> "$name.$default_namespace".
let has_explicit_namespace = match parts.next() { let has_explicit_namespace = match parts.next() {
Some("") => { Some("") => {
// "$name." is an external absolute name. // "$name." is an external absolute name.
return None; return NamedAddress {
name: FullyQualifiedAuthority(authority.clone()),
use_destination_service: false,
};
}, },
Some(_) => true, Some(_) => true,
None => false, None => false,
@ -37,14 +69,20 @@ impl FullyQualifiedAuthority {
let append_svc = if let Some(part) = parts.next() { let append_svc = if let Some(part) = parts.next() {
if !part.eq_ignore_ascii_case("svc") { if !part.eq_ignore_ascii_case("svc") {
// If not "$name.$namespace.svc", treat as external. // If not "$name.$namespace.svc", treat as external.
return None; return NamedAddress {
name: FullyQualifiedAuthority(authority.clone()),
use_destination_service: false,
};
} }
false false
} else if has_explicit_namespace { } else if has_explicit_namespace {
true true
} else if namespace_to_append.is_none() { } else if namespace_to_append.is_none() {
// We can't append ".svc" without a namespace, so treat as external. // We can't append ".svc" without a namespace, so treat as external.
return None; return NamedAddress {
name: FullyQualifiedAuthority(authority.clone()),
use_destination_service: false,
}
} else { } else {
true true
}; };
@ -62,7 +100,10 @@ impl FullyQualifiedAuthority {
// "a.b.svc." is an external absolute name. // "a.b.svc." is an external absolute name.
// "a.b.svc.foo" is external if the default zone is not // "a.b.svc.foo" is external if the default zone is not
// "foo". // "foo".
return None; return NamedAddress {
name: FullyQualifiedAuthority(authority.clone()),
use_destination_service: false,
}
} }
(None, strip_last) (None, strip_last)
} else { } else {
@ -80,16 +121,17 @@ impl FullyQualifiedAuthority {
additional_len += 1 + zone.len(); // "." + zone additional_len += 1 + zone.len(); // "." + zone
} }
let port_str_len = match authority.port { // If we're not going to change anything then don't allocate anything.
80 => 0, // XXX: Assumes http://, which is all we support right now. if additional_len == 0 && !strip_last {
p if p >= 10000 => 1 + 5, return NamedAddress {
p if p >= 1000 => 1 + 4, name: FullyQualifiedAuthority(authority.clone()),
p if p >= 100 => 1 + 3, use_destination_service: true,
p if p >= 10 => 1 + 2, }
_ => 1, }
};
let mut normalized = BytesMut::with_capacity(name.len() + additional_len + port_str_len); // `authority.as_str().len()` includes the length of `colon_port`.
let mut normalized =
BytesMut::with_capacity(authority.as_str().len() + additional_len);
normalized.extend_from_slice(name.as_bytes()); normalized.extend_from_slice(name.as_bytes());
if let Some(namespace) = namespace_to_append { if let Some(namespace) = namespace_to_append {
normalized.extend_from_slice(b"."); normalized.extend_from_slice(b".");
@ -102,58 +144,52 @@ impl FullyQualifiedAuthority {
normalized.extend_from_slice(b"."); normalized.extend_from_slice(b".");
normalized.extend_from_slice(zone.as_bytes()); normalized.extend_from_slice(zone.as_bytes());
} }
normalized.extend_from_slice(colon_port.as_bytes());
if strip_last { if strip_last {
let new_len = normalized.len() - 1; let new_len = normalized.len() - 1;
normalized.truncate(new_len); normalized.truncate(new_len);
} }
// Append the port let name = Authority::from_shared(normalized.freeze())
if port_str_len > 0 { .expect("syntactically-valid authority");
normalized.extend_from_slice(b":"); let name = FullyQualifiedAuthority(name);
let port = authority.port.to_string(); NamedAddress {
normalized.extend_from_slice(port.as_ref()); name,
use_destination_service: true,
}
} }
Some(FullyQualifiedAuthority(String::from_utf8(normalized.freeze().to_vec()).unwrap())) pub fn without_trailing_dot(&self) -> &Authority {
}
pub fn without_trailing_dot(&self) -> &str {
&self.0 &self.0
} }
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use transport::{DnsNameAndPort, Host, HostAndPort};
use http::uri::Authority;
use std::str::FromStr;
#[test] #[test]
fn test_normalized_authority() { fn test_normalized_authority() {
fn dns_name_and_port_from_str(input: &str) -> DnsNameAndPort {
let authority = Authority::from_str(input).unwrap();
match HostAndPort::normalize(&authority, Some(80)) {
Ok(HostAndPort { host: Host::DnsName(host), port }) =>
DnsNameAndPort { host, port },
Err(e) => {
unreachable!("{:?} when parsing {:?}", e, input)
},
_ => unreachable!("Not a DNS name: {:?}", input),
}
}
fn local(input: &str, default_namespace: &str) -> String { fn local(input: &str, default_namespace: &str) -> String {
let name = dns_name_and_port_from_str(input); use bytes::Bytes;
let output = super::FullyQualifiedAuthority::normalize(&name, default_namespace); use http::uri::Authority;
assert!(output.is_some(), "input: {}", input);
output.unwrap().without_trailing_dot().into() let input = Authority::from_shared(Bytes::from(input.as_bytes()))
.unwrap();
let output = super::FullyQualifiedAuthority::normalize(
&input, default_namespace);
assert_eq!(output.use_destination_service, true, "input: {}", input);
output.name.without_trailing_dot().as_str().into()
} }
fn external(input: &str, default_namespace: &str) { fn external(input: &str, default_namespace: &str) {
let name = dns_name_and_port_from_str(input); use bytes::Bytes;
let output = super::FullyQualifiedAuthority::normalize(&name, default_namespace); use http::uri::Authority;
assert!(output.is_none(), "input: {}", input);
let input = Authority::from_shared(Bytes::from(input.as_bytes())).unwrap();
let output = super::FullyQualifiedAuthority::normalize(
&input, default_namespace);
assert_eq!(output.use_destination_service, false);
assert_eq!(output.name.without_trailing_dot().as_str(), input);
} }
assert_eq!("name.namespace.svc.cluster.local", local("name", "namespace")); assert_eq!("name.namespace.svc.cluster.local", local("name", "namespace"));
@ -209,10 +245,20 @@ mod tests {
local("name.namespace.svc.cluster.local:1234", "namespace")); local("name.namespace.svc.cluster.local:1234", "namespace"));
// "SVC" is recognized as being equivalent to "svc" // "SVC" is recognized as being equivalent to "svc"
assert_eq!("name.namespace.svc.cluster.local", assert_eq!("name.namespace.SVC.cluster.local",
local("name.namespace.SVC", "namespace")); local("name.namespace.SVC", "namespace"));
external("name.namespace.SVC.cluster", "namespace"); external("name.namespace.SVC.cluster", "namespace");
assert_eq!("name.namespace.svc.cluster.local", assert_eq!("name.namespace.SVC.cluster.local",
local("name.namespace.SVC.cluster.local", "namespace")); local("name.namespace.SVC.cluster.local", "namespace"));
// IPv4 addresses are left unchanged.
external("1.2.3.4", "namespace");
external("1.2.3.4:1234", "namespace");
external("127.0.0.1", "namespace");
external("127.0.0.1:8080", "namespace");
// IPv6 addresses are left unchanged.
external("[::1]", "namespace");
external("[::1]:1234", "namespace");
} }
} }

View File

@ -202,10 +202,11 @@ where
config.metrics_flush_interval, config.metrics_flush_interval,
); );
let (control, control_bg) = control::new(config.pod_namespace.clone()); let (control, control_bg) = control::new();
let executor = core.handle();
let dns_config = dns::Config::from_file(&config.resolv_conf_path); let dns_config = dns::Config::from_file(&config.resolv_conf_path);
let executor = core.handle();
let bind = Bind::new(executor.clone()).with_sensors(sensors.clone()); let bind = Bind::new(executor.clone()).with_sensors(sensors.clone());

View File

@ -18,10 +18,9 @@ use bind::{self, Bind, Protocol};
use control::{self, discovery}; use control::{self, discovery};
use control::discovery::Bind as BindTrait; use control::discovery::Bind as BindTrait;
use ctx; use ctx;
use fully_qualified_authority::FullyQualifiedAuthority; use fully_qualified_authority::{FullyQualifiedAuthority, NamedAddress};
use timeout::Timeout; use timeout::Timeout;
use transparency::h1; use transparency::h1;
use transport::{DnsNameAndPort, Host, HostAndPort};
type BindProtocol<B> = bind::BindProtocol<Arc<ctx::Proxy>, B>; type BindProtocol<B> = bind::BindProtocol<Arc<ctx::Proxy>, B>;
@ -53,11 +52,25 @@ impl<B> Outbound<B> {
#[derive(Clone, Debug, PartialEq, Eq, Hash)] #[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum Destination { pub enum Destination {
Hostname(DnsNameAndPort), LocalSvc(FullyQualifiedAuthority),
ExplicitIp(SocketAddr), External(SocketAddr),
ImplicitOriginalDst(SocketAddr),
} }
impl From<FullyQualifiedAuthority> for Destination {
#[inline]
fn from(authority: FullyQualifiedAuthority) -> Self {
Destination::LocalSvc(authority)
}
}
impl From<SocketAddr> for Destination {
#[inline]
fn from(addr: SocketAddr) -> Self {
Destination::External(addr)
}
}
impl<B> Recognize for Outbound<B> impl<B> Recognize for Outbound<B>
where where
B: tower_h2::Body + 'static, B: tower_h2::Body + 'static,
@ -79,46 +92,38 @@ where
// by `NormalizeUri`, as we need to know whether the request will // by `NormalizeUri`, as we need to know whether the request will
// be routed by Host/authority or by SO_ORIGINAL_DST, in order to // be routed by Host/authority or by SO_ORIGINAL_DST, in order to
// determine whether the service is reusable. // determine whether the service is reusable.
let authority = req.uri().authority_part() let local = req.uri().authority_part()
.cloned() .cloned()
// Therefore, we need to check the host header as well as the URI // Therefore, we need to check the host header as well as the URI
// for a valid authority, before we fall back to SO_ORIGINAL_DST. // for a valid authority, before we fall back to SO_ORIGINAL_DST.
.or_else(|| h1::authority_from_host(req)); .or_else(|| h1::authority_from_host(req))
.map(|authority| {
FullyQualifiedAuthority::normalize(
&authority,
&self.default_namespace)
});
// If we can't fully qualify the authority as a local service,
// TODO: Return error when `HostAndPort::normalize()` fails. // and there is no original dst, then we have nothing! In that
let mut dest = match authority.as_ref() // case, we return `None`, which results an "unrecognized" error.
.and_then(|auth| HostAndPort::normalize(auth, Some(80)).ok()) { //
Some(HostAndPort { host: Host::DnsName(dns_name), port }) => { // In practice, this shouldn't ever happen, since we expect the proxy
let authority = DnsNameAndPort { host: dns_name, port }; // to be run on Linux servers, with iptables setup, so there should
// Work around the inability of control/discovery.rs to handle unnormalized names. // always be an original destination.
// TODO: Remove this use of `FullyQualifiedAuthority::normalize()` and use let dest = if let Some(NamedAddress {
// `Destination::Hostname` for all `Host::DnsName` values once DNS machinery is name,
// added to control/discovery.rs. use_destination_service: true
FullyQualifiedAuthority::normalize(&authority, &self.default_namespace) }) = local {
.map(|_| Destination::Hostname(authority)) Destination::LocalSvc(name)
}, } else {
Some(HostAndPort { host: Host::Ip(ip), port }) => let orig_dst = req.extensions()
Some(Destination::ExplicitIp(SocketAddr::from((ip, port)))),
None => None
};
if dest.is_none() {
dest = req.extensions()
.get::<Arc<ctx::transport::Server>>() .get::<Arc<ctx::transport::Server>>()
.and_then(|ctx| { .and_then(|ctx| {
ctx.orig_dst_if_not_local() ctx.orig_dst_if_not_local()
}) });
.map(Destination::ImplicitOriginalDst) Destination::External(orig_dst?)
}; };
// If there is no authority in the request URI or in the Host header,
// and there is no original dst, then we have nothing! In that case,
// return `None`, which results an "unrecognized" error. In practice,
// this shouldn't ever happen, since we expect the proxy to be run on
// Linux servers, with iptables setup, so there should always be an
// original destination.
let dest = dest?;
Some(proto.into_key(dest)) Some(proto.into_key(dest))
} }
@ -140,15 +145,16 @@ where
debug!("building outbound {:?} client to {:?}", protocol, dest); debug!("building outbound {:?} client to {:?}", protocol, dest);
let resolve = match *dest { let resolve = match *dest {
Destination::Hostname(ref authority) => Destination::LocalSvc(ref authority) => {
Discovery::NamedSvc(self.discovery.resolve( Discovery::LocalSvc(self.discovery.resolve(
authority, authority,
self.bind.clone().with_protocol(protocol.clone()), self.bind.clone().with_protocol(protocol.clone()),
)), ))
Destination::ExplicitIp(addr) => },
Discovery::ExplicitIp((addr, self.bind.clone().with_protocol(protocol.clone()))), Destination::External(addr) => {
Destination::ImplicitOriginalDst(addr) => Discovery::External(Some((addr, self.bind.clone()
Discovery::External(Some((addr, self.bind.clone().with_protocol(protocol.clone())))), .with_protocol(protocol.clone()))))
}
}; };
let loaded = tower_balance::load::WithPendingRequests::new(resolve); let loaded = tower_balance::load::WithPendingRequests::new(resolve);
@ -170,8 +176,7 @@ where
} }
pub enum Discovery<B> { pub enum Discovery<B> {
NamedSvc(discovery::Watch<BindProtocol<B>>), LocalSvc(discovery::Watch<BindProtocol<B>>),
ExplicitIp((SocketAddr, BindProtocol<B>)),
External(Option<(SocketAddr, BindProtocol<B>)>), External(Option<(SocketAddr, BindProtocol<B>)>),
} }
@ -188,18 +193,8 @@ where
fn poll(&mut self) -> Poll<Change<Self::Key, Self::Service>, Self::DiscoverError> { fn poll(&mut self) -> Poll<Change<Self::Key, Self::Service>, Self::DiscoverError> {
match *self { match *self {
Discovery::NamedSvc(ref mut w) => w.poll() Discovery::LocalSvc(ref mut w) => w.poll()
.map_err(|_| BindError::Internal), .map_err(|_| BindError::Internal),
Discovery::ExplicitIp((addr, ref bind)) => {
// This "discovers" a single address for a fixed IP address
// that never has another change. This can mean it floats
// in the Balancer forever. However, when we finally add
// circuit-breaking, this should be able to take care of itself,
// closing down when the connection is no longer usable.
let svc = bind.bind(&addr)
.map_err(|_| BindError::External{ addr })?;
Ok(Async::Ready(Change::Insert(addr, svc)))
},
Discovery::External(ref mut opt) => { Discovery::External(ref mut opt) => {
// This "discovers" a single address for an external service // This "discovers" a single address for an external service
// that never has another change. This can mean it floats // that never has another change. This can mean it floats

View File

@ -9,6 +9,7 @@ use std::str::FromStr;
use http; use http;
use connection; use connection;
use convert;
use dns; use dns;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -23,24 +24,14 @@ pub struct HostAndPort {
pub port: u16, pub port: u16,
} }
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct DnsNameAndPort {
pub host: dns::Name,
pub port: u16,
}
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub enum Host { pub enum Host {
DnsName(dns::Name), DnsName(String),
Ip(IpAddr), Ip(IpAddr),
} }
#[derive(Clone, Copy, Debug)] #[derive(Clone, Copy, Debug)]
pub enum HostAndPortError { pub enum HostAndPortError {
/// The host is not a valid DNS name or IP address.
InvalidHost,
/// The port is missing. /// The port is missing.
MissingPort, MissingPort,
} }
@ -54,23 +45,17 @@ pub struct LookupAddressAndConnect {
// ===== impl HostAndPort ===== // ===== impl HostAndPort =====
impl HostAndPort { impl<'a> convert::TryFrom<&'a http::uri::Authority> for HostAndPort {
pub fn normalize(a: &http::uri::Authority, default_port: Option<u16>) type Err = HostAndPortError;
-> Result<Self, HostAndPortError> fn try_from(a: &http::uri::Authority) -> Result<Self, Self::Err> {
{
let host = { let host = {
match dns::Name::normalize(a.host()) { let host = a.host();
Ok(host) => Host::DnsName(host), match IpAddr::from_str(host) {
Err(_) => { Err(_) => Host::DnsName(host.to_owned()),
let ip: IpAddr = IpAddr::from_str(a.host()) Ok(ip) => Host::Ip(ip),
.map_err(|_| HostAndPortError::InvalidHost)?;
Host::Ip(ip)
},
} }
}; };
let port = a.port() let port = a.port().ok_or_else(|| HostAndPortError::MissingPort)?;
.or(default_port)
.ok_or_else(|| HostAndPortError::MissingPort)?;
Ok(HostAndPort { Ok(HostAndPort {
host, host,
port port

View File

@ -3,7 +3,7 @@ mod so_original_dst;
pub use self::connect::{ pub use self::connect::{
Connect, Connect,
DnsNameAndPort, Host, HostAndPort, HostAndPortError, Host, HostAndPort, HostAndPortError,
LookupAddressAndConnect, LookupAddressAndConnect,
}; };
pub use self::so_original_dst::{GetOriginalDst, SoOriginalDst}; pub use self::so_original_dst::{GetOriginalDst, SoOriginalDst};