Proxy: Refactor DNS name parsing and normalization (#673)

Proxy: Refactor DNS name parsing and normalization

Only the destination service needs normalized names (and even then,
that's just temporary). The rest of the code needs the name as it was
given, except case-normalized (lowercased). Because DNS fallack isn't
implemented in service discovery yet, Outbound still a temporary
workaround using FullyQualifiedName to keep things working; thta will
be removed once DNS fallback is implemented in service discovery.

Signed-off-by: Brian Smith <brian@briansmith.org>
This commit is contained in:
Brian Smith 2018-04-05 12:32:12 -10:00 committed by GitHub
parent 28d5007cdf
commit 311ef410a8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 280 additions and 207 deletions

View File

@ -349,7 +349,7 @@ fn parse_url(s: &str) -> Result<HostAndPort, ParseError> {
// https://github.com/hyperium/http/issues/127. For now just ignore any
// fragment that is there.
HostAndPort::try_from(authority)
HostAndPort::normalize(authority, None)
.map_err(|e| ParseError::UrlError(UrlError::AuthorityError(e)))
}
@ -366,7 +366,7 @@ fn parse<T, Parse>(strings: &Strings, name: &str, parse: Parse) -> Result<Option
match strings.get(name)? {
Some(ref s) => {
let r = parse(s).map_err(|parse_error| {
error!("{} is not valid: {:?}", name, parse_error);
error!("{}={:?} is not valid: {:?}", name, s, parse_error);
Error::InvalidEnvVar
})?;
Ok(Some(r))

View File

@ -16,13 +16,14 @@ use conduit_proxy_controller_grpc::common::{Destination, TcpAddress};
use conduit_proxy_controller_grpc::destination::Update as PbUpdate;
use conduit_proxy_controller_grpc::destination::update::Update as PbUpdate2;
use conduit_proxy_controller_grpc::destination::client::{Destination as DestinationSvc};
use transport::DnsNameAndPort;
use control::cache::{Cache, CacheChange, Exists};
/// A handle to start watching a destination for address changes.
#[derive(Clone, Debug)]
pub struct Discovery {
tx: mpsc::UnboundedSender<(FullyQualifiedAuthority, mpsc::UnboundedSender<Update>)>,
tx: mpsc::UnboundedSender<(DnsNameAndPort, mpsc::UnboundedSender<Update>)>,
}
/// A `tower_discover::Discover`, given to a `tower_balance::Balance`.
@ -35,29 +36,28 @@ pub struct Watch<B> {
/// A background handle to eventually bind on the controller thread.
#[derive(Debug)]
pub struct Background {
rx: mpsc::UnboundedReceiver<(FullyQualifiedAuthority, mpsc::UnboundedSender<Update>)>,
rx: mpsc::UnboundedReceiver<(DnsNameAndPort, mpsc::UnboundedSender<Update>)>,
default_destination_namespace: String,
}
/// A future returned from `Background::work()`, doing the work of talking to
/// the controller destination API.
// TODO: debug impl
pub struct DiscoveryWork<T: HttpService<ResponseBody = RecvBody>> {
destinations: HashMap<
FullyQualifiedAuthority,
DestinationSet<T>
>,
default_destination_namespace: String,
destinations: HashMap<DnsNameAndPort, DestinationSet<T>>,
/// A queue of authorities that need to be reconnected.
reconnects: VecDeque<FullyQualifiedAuthority>,
reconnects: VecDeque<DnsNameAndPort>,
/// The Destination.Get RPC client service.
/// Each poll, records whether the rpc service was till ready.
rpc_ready: bool,
/// A receiver of new watch requests.
rx: mpsc::UnboundedReceiver<(FullyQualifiedAuthority, mpsc::UnboundedSender<Update>)>,
rx: mpsc::UnboundedReceiver<(DnsNameAndPort, mpsc::UnboundedSender<Update>)>,
}
struct DestinationSet<T: HttpService<ResponseBody = RecvBody>> {
addrs: Exists<Cache<SocketAddr>>,
query: DestinationServiceQuery<T>,
query: Option<DestinationServiceQuery<T>>,
txs: Vec<mpsc::UnboundedSender<Update>>,
}
@ -129,7 +129,7 @@ pub trait Bind {
///
/// The `Discovery` is used by a listener, the `Background` is consumed
/// on the controller thread.
pub fn new() -> (Discovery, Background) {
pub fn new(default_destination_namespace: String) -> (Discovery, Background) {
let (tx, rx) = mpsc::unbounded();
(
Discovery {
@ -137,6 +137,7 @@ pub fn new() -> (Discovery, Background) {
},
Background {
rx,
default_destination_namespace,
},
)
}
@ -145,7 +146,7 @@ pub fn new() -> (Discovery, Background) {
impl Discovery {
/// Start watching for address changes for a certain authority.
pub fn resolve<B>(&self, authority: &FullyQualifiedAuthority, bind: B) -> Watch<B> {
pub fn resolve<B>(&self, authority: &DnsNameAndPort, bind: B) -> Watch<B> {
trace!("resolve; authority={:?}", authority);
let (tx, rx) = mpsc::unbounded();
self.tx
@ -202,6 +203,7 @@ impl Background {
T::Error: fmt::Debug,
{
DiscoveryWork {
default_destination_namespace: self.default_destination_namespace,
destinations: HashMap::new(),
reconnects: VecDeque::new(),
rpc_ready: false,
@ -276,7 +278,11 @@ where
}
Entry::Vacant(vac) => {
let query =
DestinationServiceQuery::connect(client, vac.key(), "connect");
DestinationServiceQuery::connect_maybe(
&self.default_destination_namespace,
client,
vac.key(),
"connect");
vac.insert(DestinationSet {
addrs: Exists::Unknown,
query,
@ -301,7 +307,11 @@ where
while let Some(auth) = self.reconnects.pop_front() {
if let Some(set) = self.destinations.get_mut(&auth) {
set.query = DestinationServiceQuery::connect(client, &auth, "reconnect");
set.query = DestinationServiceQuery::connect_maybe(
&self.default_destination_namespace,
client,
&auth,
"reconnect");
return true;
} else {
trace!("reconnect no longer needed: {:?}", auth);
@ -314,10 +324,11 @@ where
for (auth, set) in &mut self.destinations {
let needs_reconnect = 'set: loop {
let poll_result = match set.query {
DestinationServiceQuery::NeedsReconnect => {
None |
Some(DestinationServiceQuery::NeedsReconnect) => {
continue;
},
DestinationServiceQuery::ConnectedOrConnecting{ ref mut rx } => {
Some(DestinationServiceQuery::ConnectedOrConnecting{ ref mut rx }) => {
rx.poll()
}
};
@ -353,9 +364,9 @@ where
};
if needs_reconnect {
set.query = DestinationServiceQuery::NeedsReconnect;
set.query = Some(DestinationServiceQuery::NeedsReconnect);
set.reset_on_next_modification();
self.reconnects.push_back(FullyQualifiedAuthority::clone(auth));
self.reconnects.push_back(auth.clone());
}
}
}
@ -365,16 +376,28 @@ where
// ===== impl DestinationServiceQuery =====
impl<T: HttpService<RequestBody = BoxBody, ResponseBody = RecvBody>> DestinationServiceQuery<T> {
fn connect(client: &mut T, auth: &FullyQualifiedAuthority, connect_or_reconnect: &str) -> Self {
// Initiates a query `query` to the Destination service and returns it as `Some(query)` if the
// given authority's host is of a form suitable for using to query the Destination service.
// Otherwise, returns `None`.
fn connect_maybe(
default_destination_namespace: &str,
client: &mut T,
auth: &DnsNameAndPort,
connect_or_reconnect: &str)
-> Option<Self>
{
trace!("DestinationServiceQuery {} {:?}", connect_or_reconnect, auth);
FullyQualifiedAuthority::normalize(auth, default_destination_namespace)
.map(|auth| {
let req = Destination {
scheme: "k8s".into(),
path: auth.without_trailing_dot().as_str().into(),
path: auth.without_trailing_dot().to_owned(),
};
// TODO: Can grpc::Request::new be removed?
let mut svc = DestinationSvc::new(client.lift_ref());
let response = svc.get(grpc::Request::new(req));
DestinationServiceQuery::ConnectedOrConnecting { rx: UpdateRx::Waiting(response) }
})
}
}
@ -391,7 +414,7 @@ impl <T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> {
}
}
fn add<A>(&mut self, authority_for_logging: &FullyQualifiedAuthority, addrs_to_add: A)
fn add<A>(&mut self, authority_for_logging: &DnsNameAndPort, addrs_to_add: A)
where A: Iterator<Item = SocketAddr>
{
let mut cache = match self.addrs.take() {
@ -405,7 +428,7 @@ impl <T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> {
self.addrs = Exists::Yes(cache);
}
fn remove<A>(&mut self, authority_for_logging: &FullyQualifiedAuthority, addrs_to_remove: A)
fn remove<A>(&mut self, authority_for_logging: &DnsNameAndPort, addrs_to_remove: A)
where A: Iterator<Item = SocketAddr>
{
let cache = match self.addrs.take() {
@ -421,7 +444,7 @@ impl <T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> {
self.addrs = Exists::Yes(cache);
}
fn no_endpoints(&mut self, authority_for_logging: &FullyQualifiedAuthority, exists: bool) {
fn no_endpoints(&mut self, authority_for_logging: &DnsNameAndPort, exists: bool) {
trace!("no endpoints for {:?} that is known to {}", authority_for_logging,
if exists { "exist" } else { "not exist" });
match self.addrs.take() {
@ -440,7 +463,7 @@ impl <T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> {
}
fn on_change(txs: &mut Vec<mpsc::UnboundedSender<Update>>,
authority_for_logging: &FullyQualifiedAuthority,
authority_for_logging: &DnsNameAndPort,
addr: SocketAddr,
change: CacheChange) {
let (update_str, update_constructor): (&'static str, fn(SocketAddr) -> Update) =

View File

@ -17,8 +17,7 @@ use tower_h2;
use tower_reconnect::{Error as ReconnectError, Reconnect};
use dns;
use fully_qualified_authority::FullyQualifiedAuthority;
use transport::{HostAndPort, LookupAddressAndConnect};
use transport::{DnsNameAndPort, HostAndPort, LookupAddressAndConnect};
use timeout::{Timeout, TimeoutError};
mod cache;
@ -41,8 +40,9 @@ pub struct Background {
disco: DiscoBg,
}
pub fn new() -> (Control, Background) {
let (tx, rx) = self::discovery::new();
pub fn new(default_destination_namespace: String) -> (Control, Background)
{
let (tx, rx) = self::discovery::new(default_destination_namespace);
let c = Control {
disco: tx,
@ -58,7 +58,7 @@ pub fn new() -> (Control, Background) {
// ===== impl Control =====
impl Control {
pub fn resolve<B>(&self, auth: &FullyQualifiedAuthority, bind: B) -> Watch<B> {
pub fn resolve<B>(&self, auth: &DnsNameAndPort, bind: B) -> Watch<B> {
self.disco.resolve(auth, bind)
}
}

View File

@ -3,6 +3,7 @@ use abstract_ns::HostResolve;
use domain;
use futures::prelude::*;
use ns_dns_tokio;
use std::fmt;
use std::net::IpAddr;
use std::path::Path;
use std::str::FromStr;
@ -18,15 +19,46 @@ pub struct Resolver(ns_dns_tokio::DnsResolver);
pub enum IpAddrFuture {
DNS(ns_dns_tokio::HostFuture),
Fixed(IpAddr),
InvalidDNSName(String),
}
pub enum Error {
InvalidDNSName(String),
NoAddressesFound,
ResolutionFailed(<ns_dns_tokio::HostFuture as Future>::Error),
}
/// A DNS name.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct Name(abstract_ns::Name);
impl fmt::Display for Name {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
self.0.fmt(f)
}
}
impl Name {
/// Parses the input string as a DNS name, normalizing it to lowercase.
pub fn normalize(s: &str) -> Result<Self, ()> {
// XXX: `abstract_ns::Name::from_str()` wrongly accepts IP addresses as
// domain names. Protect against this. TODO: Fix abstract_ns.
if let Ok(_) = IpAddr::from_str(s) {
return Err(());
}
// XXX: `abstract_ns::Name::from_str()` doesn't accept uppercase letters.
// TODO: Avoid this extra allocation.
let s = s.to_ascii_lowercase();
abstract_ns::Name::from_str(&s)
.map(Name)
.map_err(|_| ())
}
}
impl AsRef<str> for Name {
fn as_ref(&self) -> &str {
self.0.as_ref()
}
}
impl Config {
/// Note that this ignores any errors reading or parsing the resolve.conf
/// file, just like the `domain` crate does.
@ -49,10 +81,7 @@ impl Resolver {
match *host {
transport::Host::DnsName(ref name) => {
trace!("resolve {}", name);
match abstract_ns::Name::from_str(name) {
Ok(name) => IpAddrFuture::DNS(self.0.resolve_host(&name)),
Err(_) => IpAddrFuture::InvalidDNSName(name.clone()),
}
IpAddrFuture::DNS(self.0.resolve_host(&name.0))
}
transport::Host::Ip(addr) => IpAddrFuture::Fixed(addr),
}
@ -74,7 +103,55 @@ impl Future for IpAddrFuture {
Err(e) => Err(Error::ResolutionFailed(e)),
},
IpAddrFuture::Fixed(addr) => Ok(Async::Ready(addr)),
IpAddrFuture::InvalidDNSName(ref name) => Err(Error::InvalidDNSName(name.clone())),
}
}
}
#[cfg(test)]
mod tests {
use super::Name;
#[test]
fn test_dns_name_parsing() {
struct Case {
input: &'static str,
output: &'static str,
}
static VALID: &[Case] = &[
// Almost all digits and dots, similar to IPv4 addresses.
Case { input: "1.2.3.x", output: "1.2.3.x", },
Case { input: "1.2.3.x", output: "1.2.3.x", },
Case { input: "1.2.3.4A", output: "1.2.3.4a", },
Case { input: "a.1.2.3", output: "a.1.2.3", },
Case { input: "1.2.x.3", output: "1.2.x.3", },
Case { input: "a.b.c.d", output: "a.b.c.d", },
// Uppercase letters in labels
Case { input: "A.b.c.d", output: "a.b.c.d", },
Case { input: "a.mIddle.c", output: "a.middle.c", },
Case { input: "a.b.c.D", output: "a.b.c.d", },
];
for case in VALID {
let name = Name::normalize(case.input).expect("is a valid DNS name");
assert_eq!(name.as_ref(), case.output);
}
static INVALID: &[&str] = &[
"",
"1.2.3.4",
"::1",
"[::1]",
":1234",
"1.2.3.4:11234",
"abc.com:1234",
];
for case in INVALID {
assert!(Name::normalize(case).is_err(),
"{} is invalid", case);
}
}
}

View File

@ -1,60 +1,28 @@
use bytes::BytesMut;
use bytes::{BytesMut};
use std::net::IpAddr;
use std::str::FromStr;
use http::uri::Authority;
use transport::DnsNameAndPort;
/// A normalized `Authority`.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct FullyQualifiedAuthority(Authority);
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct NamedAddress {
pub name: FullyQualifiedAuthority,
pub use_destination_service: bool
}
pub struct FullyQualifiedAuthority(String);
impl FullyQualifiedAuthority {
/// Normalizes the name according to Kubernetes service naming conventions.
/// Case folding is not done; that is done internally inside `Authority`.
///
/// This assumes the authority is syntactically valid.
pub fn normalize(authority: &Authority, default_namespace: &str)
-> NamedAddress
{
// Don't change IP-address-based authorities.
if IpAddr::from_str(authority.host()).is_ok() {
return NamedAddress {
name: FullyQualifiedAuthority(authority.clone()),
use_destination_service: false,
}
};
// TODO: `Authority` doesn't have a way to get the serialized form of the
// port, so do it ourselves.
let (name, colon_port) = {
let authority = authority.as_str();
match authority.rfind(':') {
Some(p) => authority.split_at(p),
None => (authority, ""),
}
};
pub fn normalize(authority: &DnsNameAndPort, default_namespace: &str) -> Option<Self> {
let name: &str = authority.host.as_ref();
// parts should have a maximum 4 of pieces (name, namespace, svc, zone)
let mut parts = name.splitn(4, '.');
// `Authority` guarantees the name has at least one part.
// `dns::Name` guarantees the name has at least one part.
assert!(parts.next().is_some());
// Rewrite "$name" -> "$name.$default_namespace".
let has_explicit_namespace = match parts.next() {
Some("") => {
// "$name." is an external absolute name.
return NamedAddress {
name: FullyQualifiedAuthority(authority.clone()),
use_destination_service: false,
};
return None;
},
Some(_) => true,
None => false,
@ -69,20 +37,14 @@ impl FullyQualifiedAuthority {
let append_svc = if let Some(part) = parts.next() {
if !part.eq_ignore_ascii_case("svc") {
// If not "$name.$namespace.svc", treat as external.
return NamedAddress {
name: FullyQualifiedAuthority(authority.clone()),
use_destination_service: false,
};
return None;
}
false
} else if has_explicit_namespace {
true
} else if namespace_to_append.is_none() {
// We can't append ".svc" without a namespace, so treat as external.
return NamedAddress {
name: FullyQualifiedAuthority(authority.clone()),
use_destination_service: false,
}
return None;
} else {
true
};
@ -100,10 +62,7 @@ impl FullyQualifiedAuthority {
// "a.b.svc." is an external absolute name.
// "a.b.svc.foo" is external if the default zone is not
// "foo".
return NamedAddress {
name: FullyQualifiedAuthority(authority.clone()),
use_destination_service: false,
}
return None;
}
(None, strip_last)
} else {
@ -121,17 +80,16 @@ impl FullyQualifiedAuthority {
additional_len += 1 + zone.len(); // "." + zone
}
// If we're not going to change anything then don't allocate anything.
if additional_len == 0 && !strip_last {
return NamedAddress {
name: FullyQualifiedAuthority(authority.clone()),
use_destination_service: true,
}
}
let port_str_len = match authority.port {
80 => 0, // XXX: Assumes http://, which is all we support right now.
p if p >= 10000 => 1 + 5,
p if p >= 1000 => 1 + 4,
p if p >= 100 => 1 + 3,
p if p >= 10 => 1 + 2,
_ => 1,
};
// `authority.as_str().len()` includes the length of `colon_port`.
let mut normalized =
BytesMut::with_capacity(authority.as_str().len() + additional_len);
let mut normalized = BytesMut::with_capacity(name.len() + additional_len + port_str_len);
normalized.extend_from_slice(name.as_bytes());
if let Some(namespace) = namespace_to_append {
normalized.extend_from_slice(b".");
@ -144,52 +102,58 @@ impl FullyQualifiedAuthority {
normalized.extend_from_slice(b".");
normalized.extend_from_slice(zone.as_bytes());
}
normalized.extend_from_slice(colon_port.as_bytes());
if strip_last {
let new_len = normalized.len() - 1;
normalized.truncate(new_len);
}
let name = Authority::from_shared(normalized.freeze())
.expect("syntactically-valid authority");
let name = FullyQualifiedAuthority(name);
NamedAddress {
name,
use_destination_service: true,
}
// Append the port
if port_str_len > 0 {
normalized.extend_from_slice(b":");
let port = authority.port.to_string();
normalized.extend_from_slice(port.as_ref());
}
pub fn without_trailing_dot(&self) -> &Authority {
Some(FullyQualifiedAuthority(String::from_utf8(normalized.freeze().to_vec()).unwrap()))
}
pub fn without_trailing_dot(&self) -> &str {
&self.0
}
}
#[cfg(test)]
mod tests {
use transport::{DnsNameAndPort, Host, HostAndPort};
use http::uri::Authority;
use std::str::FromStr;
#[test]
fn test_normalized_authority() {
fn local(input: &str, default_namespace: &str) -> String {
use bytes::Bytes;
use http::uri::Authority;
fn dns_name_and_port_from_str(input: &str) -> DnsNameAndPort {
let authority = Authority::from_str(input).unwrap();
match HostAndPort::normalize(&authority, Some(80)) {
Ok(HostAndPort { host: Host::DnsName(host), port }) =>
DnsNameAndPort { host, port },
Err(e) => {
unreachable!("{:?} when parsing {:?}", e, input)
},
_ => unreachable!("Not a DNS name: {:?}", input),
}
}
let input = Authority::from_shared(Bytes::from(input.as_bytes()))
.unwrap();
let output = super::FullyQualifiedAuthority::normalize(
&input, default_namespace);
assert_eq!(output.use_destination_service, true, "input: {}", input);
output.name.without_trailing_dot().as_str().into()
fn local(input: &str, default_namespace: &str) -> String {
let name = dns_name_and_port_from_str(input);
let output = super::FullyQualifiedAuthority::normalize(&name, default_namespace);
assert!(output.is_some(), "input: {}", input);
output.unwrap().without_trailing_dot().into()
}
fn external(input: &str, default_namespace: &str) {
use bytes::Bytes;
use http::uri::Authority;
let input = Authority::from_shared(Bytes::from(input.as_bytes())).unwrap();
let output = super::FullyQualifiedAuthority::normalize(
&input, default_namespace);
assert_eq!(output.use_destination_service, false);
assert_eq!(output.name.without_trailing_dot().as_str(), input);
let name = dns_name_and_port_from_str(input);
let output = super::FullyQualifiedAuthority::normalize(&name, default_namespace);
assert!(output.is_none(), "input: {}", input);
}
assert_eq!("name.namespace.svc.cluster.local", local("name", "namespace"));
@ -245,20 +209,10 @@ mod tests {
local("name.namespace.svc.cluster.local:1234", "namespace"));
// "SVC" is recognized as being equivalent to "svc"
assert_eq!("name.namespace.SVC.cluster.local",
assert_eq!("name.namespace.svc.cluster.local",
local("name.namespace.SVC", "namespace"));
external("name.namespace.SVC.cluster", "namespace");
assert_eq!("name.namespace.SVC.cluster.local",
assert_eq!("name.namespace.svc.cluster.local",
local("name.namespace.SVC.cluster.local", "namespace"));
// IPv4 addresses are left unchanged.
external("1.2.3.4", "namespace");
external("1.2.3.4:1234", "namespace");
external("127.0.0.1", "namespace");
external("127.0.0.1:8080", "namespace");
// IPv6 addresses are left unchanged.
external("[::1]", "namespace");
external("[::1]:1234", "namespace");
}
}

View File

@ -202,11 +202,10 @@ where
config.metrics_flush_interval,
);
let (control, control_bg) = control::new();
let executor = core.handle();
let (control, control_bg) = control::new(config.pod_namespace.clone());
let dns_config = dns::Config::from_file(&config.resolv_conf_path);
let executor = core.handle();
let bind = Bind::new(executor.clone()).with_sensors(sensors.clone());

View File

@ -18,9 +18,10 @@ use bind::{self, Bind, Protocol};
use control::{self, discovery};
use control::discovery::Bind as BindTrait;
use ctx;
use fully_qualified_authority::{FullyQualifiedAuthority, NamedAddress};
use fully_qualified_authority::FullyQualifiedAuthority;
use timeout::Timeout;
use transparency::h1;
use transport::{DnsNameAndPort, Host, HostAndPort};
type BindProtocol<B> = bind::BindProtocol<Arc<ctx::Proxy>, B>;
@ -52,25 +53,11 @@ impl<B> Outbound<B> {
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum Destination {
LocalSvc(FullyQualifiedAuthority),
External(SocketAddr),
Hostname(DnsNameAndPort),
ExplicitIp(SocketAddr),
ImplicitOriginalDst(SocketAddr),
}
impl From<FullyQualifiedAuthority> for Destination {
#[inline]
fn from(authority: FullyQualifiedAuthority) -> Self {
Destination::LocalSvc(authority)
}
}
impl From<SocketAddr> for Destination {
#[inline]
fn from(addr: SocketAddr) -> Self {
Destination::External(addr)
}
}
impl<B> Recognize for Outbound<B>
where
B: tower_h2::Body + 'static,
@ -92,38 +79,46 @@ where
// by `NormalizeUri`, as we need to know whether the request will
// be routed by Host/authority or by SO_ORIGINAL_DST, in order to
// determine whether the service is reusable.
let local = req.uri().authority_part()
let authority = req.uri().authority_part()
.cloned()
// Therefore, we need to check the host header as well as the URI
// for a valid authority, before we fall back to SO_ORIGINAL_DST.
.or_else(|| h1::authority_from_host(req))
.map(|authority| {
FullyQualifiedAuthority::normalize(
&authority,
&self.default_namespace)
});
.or_else(|| h1::authority_from_host(req));
// If we can't fully qualify the authority as a local service,
// and there is no original dst, then we have nothing! In that
// case, we return `None`, which results an "unrecognized" error.
//
// In practice, this shouldn't ever happen, since we expect the proxy
// to be run on Linux servers, with iptables setup, so there should
// always be an original destination.
let dest = if let Some(NamedAddress {
name,
use_destination_service: true
}) = local {
Destination::LocalSvc(name)
} else {
let orig_dst = req.extensions()
// TODO: Return error when `HostAndPort::normalize()` fails.
let mut dest = match authority.as_ref()
.and_then(|auth| HostAndPort::normalize(auth, Some(80)).ok()) {
Some(HostAndPort { host: Host::DnsName(dns_name), port }) => {
let authority = DnsNameAndPort { host: dns_name, port };
// Work around the inability of control/discovery.rs to handle unnormalized names.
// TODO: Remove this use of `FullyQualifiedAuthority::normalize()` and use
// `Destination::Hostname` for all `Host::DnsName` values once DNS machinery is
// added to control/discovery.rs.
FullyQualifiedAuthority::normalize(&authority, &self.default_namespace)
.map(|_| Destination::Hostname(authority))
},
Some(HostAndPort { host: Host::Ip(ip), port }) =>
Some(Destination::ExplicitIp(SocketAddr::from((ip, port)))),
None => None
};
if dest.is_none() {
dest = req.extensions()
.get::<Arc<ctx::transport::Server>>()
.and_then(|ctx| {
ctx.orig_dst_if_not_local()
});
Destination::External(orig_dst?)
})
.map(Destination::ImplicitOriginalDst)
};
// If there is no authority in the request URI or in the Host header,
// and there is no original dst, then we have nothing! In that case,
// return `None`, which results an "unrecognized" error. In practice,
// this shouldn't ever happen, since we expect the proxy to be run on
// Linux servers, with iptables setup, so there should always be an
// original destination.
let dest = dest?;
Some(proto.into_key(dest))
}
@ -145,16 +140,15 @@ where
debug!("building outbound {:?} client to {:?}", protocol, dest);
let resolve = match *dest {
Destination::LocalSvc(ref authority) => {
Discovery::LocalSvc(self.discovery.resolve(
Destination::Hostname(ref authority) =>
Discovery::NamedSvc(self.discovery.resolve(
authority,
self.bind.clone().with_protocol(protocol.clone()),
))
},
Destination::External(addr) => {
Discovery::External(Some((addr, self.bind.clone()
.with_protocol(protocol.clone()))))
}
)),
Destination::ExplicitIp(addr) =>
Discovery::ExplicitIp((addr, self.bind.clone().with_protocol(protocol.clone()))),
Destination::ImplicitOriginalDst(addr) =>
Discovery::External(Some((addr, self.bind.clone().with_protocol(protocol.clone())))),
};
let loaded = tower_balance::load::WithPendingRequests::new(resolve);
@ -176,7 +170,8 @@ where
}
pub enum Discovery<B> {
LocalSvc(discovery::Watch<BindProtocol<B>>),
NamedSvc(discovery::Watch<BindProtocol<B>>),
ExplicitIp((SocketAddr, BindProtocol<B>)),
External(Option<(SocketAddr, BindProtocol<B>)>),
}
@ -193,8 +188,18 @@ where
fn poll(&mut self) -> Poll<Change<Self::Key, Self::Service>, Self::DiscoverError> {
match *self {
Discovery::LocalSvc(ref mut w) => w.poll()
Discovery::NamedSvc(ref mut w) => w.poll()
.map_err(|_| BindError::Internal),
Discovery::ExplicitIp((addr, ref bind)) => {
// This "discovers" a single address for a fixed IP address
// that never has another change. This can mean it floats
// in the Balancer forever. However, when we finally add
// circuit-breaking, this should be able to take care of itself,
// closing down when the connection is no longer usable.
let svc = bind.bind(&addr)
.map_err(|_| BindError::External{ addr })?;
Ok(Async::Ready(Change::Insert(addr, svc)))
},
Discovery::External(ref mut opt) => {
// This "discovers" a single address for an external service
// that never has another change. This can mean it floats

View File

@ -9,7 +9,6 @@ use std::str::FromStr;
use http;
use connection;
use convert;
use dns;
#[derive(Debug, Clone)]
@ -24,14 +23,24 @@ pub struct HostAndPort {
pub port: u16,
}
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct DnsNameAndPort {
pub host: dns::Name,
pub port: u16,
}
#[derive(Clone, Debug)]
pub enum Host {
DnsName(String),
DnsName(dns::Name),
Ip(IpAddr),
}
#[derive(Clone, Copy, Debug)]
pub enum HostAndPortError {
/// The host is not a valid DNS name or IP address.
InvalidHost,
/// The port is missing.
MissingPort,
}
@ -45,17 +54,23 @@ pub struct LookupAddressAndConnect {
// ===== impl HostAndPort =====
impl<'a> convert::TryFrom<&'a http::uri::Authority> for HostAndPort {
type Err = HostAndPortError;
fn try_from(a: &http::uri::Authority) -> Result<Self, Self::Err> {
impl HostAndPort {
pub fn normalize(a: &http::uri::Authority, default_port: Option<u16>)
-> Result<Self, HostAndPortError>
{
let host = {
let host = a.host();
match IpAddr::from_str(host) {
Err(_) => Host::DnsName(host.to_owned()),
Ok(ip) => Host::Ip(ip),
match dns::Name::normalize(a.host()) {
Ok(host) => Host::DnsName(host),
Err(_) => {
let ip: IpAddr = IpAddr::from_str(a.host())
.map_err(|_| HostAndPortError::InvalidHost)?;
Host::Ip(ip)
},
}
};
let port = a.port().ok_or_else(|| HostAndPortError::MissingPort)?;
let port = a.port()
.or(default_port)
.ok_or_else(|| HostAndPortError::MissingPort)?;
Ok(HostAndPort {
host,
port

View File

@ -3,7 +3,7 @@ mod so_original_dst;
pub use self::connect::{
Connect,
Host, HostAndPort, HostAndPortError,
DnsNameAndPort, Host, HostAndPort, HostAndPortError,
LookupAddressAndConnect,
};
pub use self::so_original_dst::{GetOriginalDst, SoOriginalDst};