mirror of https://github.com/linkerd/linkerd2.git
Proxy: Move DNS name normalization to service discovery (#722)
Only the destination service needs normalized names (and even then, that's just temporary). The rest of the code needs the name as it was given, except case-normalized (lowercased). Because DNS fallack isn't implemented in service discovery yet, Outbound still a temporary workaround using FullyQualifiedName to keep things working; thta will be removed once DNS fallback is implemented in service discovery. Signed-off-by: Brian Smith <brian@briansmith.org>
This commit is contained in:
parent
716b392231
commit
7d3b715c4d
|
@ -349,7 +349,7 @@ fn parse_url(s: &str) -> Result<HostAndPort, ParseError> {
|
|||
// https://github.com/hyperium/http/issues/127. For now just ignore any
|
||||
// fragment that is there.
|
||||
|
||||
HostAndPort::try_from(authority)
|
||||
HostAndPort::normalize(authority, None)
|
||||
.map_err(|e| ParseError::UrlError(UrlError::AuthorityError(e)))
|
||||
}
|
||||
|
||||
|
@ -366,7 +366,7 @@ fn parse<T, Parse>(strings: &Strings, name: &str, parse: Parse) -> Result<Option
|
|||
match strings.get(name)? {
|
||||
Some(ref s) => {
|
||||
let r = parse(s).map_err(|parse_error| {
|
||||
error!("{} is not valid: {:?}", name, parse_error);
|
||||
error!("{}={:?} is not valid: {:?}", name, s, parse_error);
|
||||
Error::InvalidEnvVar
|
||||
})?;
|
||||
Ok(Some(r))
|
||||
|
|
|
@ -16,13 +16,14 @@ use conduit_proxy_controller_grpc::common::{Destination, TcpAddress};
|
|||
use conduit_proxy_controller_grpc::destination::Update as PbUpdate;
|
||||
use conduit_proxy_controller_grpc::destination::update::Update as PbUpdate2;
|
||||
use conduit_proxy_controller_grpc::destination::client::{Destination as DestinationSvc};
|
||||
use transport::DnsNameAndPort;
|
||||
|
||||
use control::cache::{Cache, CacheChange, Exists};
|
||||
|
||||
/// A handle to start watching a destination for address changes.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Discovery {
|
||||
tx: mpsc::UnboundedSender<(FullyQualifiedAuthority, mpsc::UnboundedSender<Update>)>,
|
||||
tx: mpsc::UnboundedSender<(DnsNameAndPort, mpsc::UnboundedSender<Update>)>,
|
||||
}
|
||||
|
||||
/// A `tower_discover::Discover`, given to a `tower_balance::Balance`.
|
||||
|
@ -35,29 +36,28 @@ pub struct Watch<B> {
|
|||
/// A background handle to eventually bind on the controller thread.
|
||||
#[derive(Debug)]
|
||||
pub struct Background {
|
||||
rx: mpsc::UnboundedReceiver<(FullyQualifiedAuthority, mpsc::UnboundedSender<Update>)>,
|
||||
rx: mpsc::UnboundedReceiver<(DnsNameAndPort, mpsc::UnboundedSender<Update>)>,
|
||||
default_destination_namespace: String,
|
||||
}
|
||||
|
||||
/// A future returned from `Background::work()`, doing the work of talking to
|
||||
/// the controller destination API.
|
||||
// TODO: debug impl
|
||||
pub struct DiscoveryWork<T: HttpService<ResponseBody = RecvBody>> {
|
||||
destinations: HashMap<
|
||||
FullyQualifiedAuthority,
|
||||
DestinationSet<T>
|
||||
>,
|
||||
default_destination_namespace: String,
|
||||
destinations: HashMap<DnsNameAndPort, DestinationSet<T>>,
|
||||
/// A queue of authorities that need to be reconnected.
|
||||
reconnects: VecDeque<FullyQualifiedAuthority>,
|
||||
reconnects: VecDeque<DnsNameAndPort>,
|
||||
/// The Destination.Get RPC client service.
|
||||
/// Each poll, records whether the rpc service was till ready.
|
||||
rpc_ready: bool,
|
||||
/// A receiver of new watch requests.
|
||||
rx: mpsc::UnboundedReceiver<(FullyQualifiedAuthority, mpsc::UnboundedSender<Update>)>,
|
||||
rx: mpsc::UnboundedReceiver<(DnsNameAndPort, mpsc::UnboundedSender<Update>)>,
|
||||
}
|
||||
|
||||
struct DestinationSet<T: HttpService<ResponseBody = RecvBody>> {
|
||||
addrs: Exists<Cache<SocketAddr, ()>>,
|
||||
query: DestinationServiceQuery<T>,
|
||||
query: Option<DestinationServiceQuery<T>>,
|
||||
txs: Vec<mpsc::UnboundedSender<Update>>,
|
||||
}
|
||||
|
||||
|
@ -129,7 +129,7 @@ pub trait Bind {
|
|||
///
|
||||
/// The `Discovery` is used by a listener, the `Background` is consumed
|
||||
/// on the controller thread.
|
||||
pub fn new() -> (Discovery, Background) {
|
||||
pub fn new(default_destination_namespace: String) -> (Discovery, Background) {
|
||||
let (tx, rx) = mpsc::unbounded();
|
||||
(
|
||||
Discovery {
|
||||
|
@ -137,6 +137,7 @@ pub fn new() -> (Discovery, Background) {
|
|||
},
|
||||
Background {
|
||||
rx,
|
||||
default_destination_namespace,
|
||||
},
|
||||
)
|
||||
}
|
||||
|
@ -145,7 +146,7 @@ pub fn new() -> (Discovery, Background) {
|
|||
|
||||
impl Discovery {
|
||||
/// Start watching for address changes for a certain authority.
|
||||
pub fn resolve<B>(&self, authority: &FullyQualifiedAuthority, bind: B) -> Watch<B> {
|
||||
pub fn resolve<B>(&self, authority: &DnsNameAndPort, bind: B) -> Watch<B> {
|
||||
trace!("resolve; authority={:?}", authority);
|
||||
let (tx, rx) = mpsc::unbounded();
|
||||
self.tx
|
||||
|
@ -205,6 +206,7 @@ impl Background {
|
|||
T::Error: fmt::Debug,
|
||||
{
|
||||
DiscoveryWork {
|
||||
default_destination_namespace: self.default_destination_namespace,
|
||||
destinations: HashMap::new(),
|
||||
reconnects: VecDeque::new(),
|
||||
rpc_ready: false,
|
||||
|
@ -279,7 +281,11 @@ where
|
|||
}
|
||||
Entry::Vacant(vac) => {
|
||||
let query =
|
||||
DestinationServiceQuery::connect(client, vac.key(), "connect");
|
||||
DestinationServiceQuery::connect_maybe(
|
||||
&self.default_destination_namespace,
|
||||
client,
|
||||
vac.key(),
|
||||
"connect");
|
||||
vac.insert(DestinationSet {
|
||||
addrs: Exists::Unknown,
|
||||
query,
|
||||
|
@ -304,7 +310,11 @@ where
|
|||
|
||||
while let Some(auth) = self.reconnects.pop_front() {
|
||||
if let Some(set) = self.destinations.get_mut(&auth) {
|
||||
set.query = DestinationServiceQuery::connect(client, &auth, "reconnect");
|
||||
set.query = DestinationServiceQuery::connect_maybe(
|
||||
&self.default_destination_namespace,
|
||||
client,
|
||||
&auth,
|
||||
"reconnect");
|
||||
return true;
|
||||
} else {
|
||||
trace!("reconnect no longer needed: {:?}", auth);
|
||||
|
@ -317,10 +327,11 @@ where
|
|||
for (auth, set) in &mut self.destinations {
|
||||
let needs_reconnect = 'set: loop {
|
||||
let poll_result = match set.query {
|
||||
DestinationServiceQuery::NeedsReconnect => {
|
||||
None |
|
||||
Some(DestinationServiceQuery::NeedsReconnect) => {
|
||||
continue;
|
||||
},
|
||||
DestinationServiceQuery::ConnectedOrConnecting{ ref mut rx } => {
|
||||
Some(DestinationServiceQuery::ConnectedOrConnecting{ ref mut rx }) => {
|
||||
rx.poll()
|
||||
}
|
||||
};
|
||||
|
@ -356,9 +367,9 @@ where
|
|||
|
||||
};
|
||||
if needs_reconnect {
|
||||
set.query = DestinationServiceQuery::NeedsReconnect;
|
||||
set.query = Some(DestinationServiceQuery::NeedsReconnect);
|
||||
set.reset_on_next_modification();
|
||||
self.reconnects.push_back(FullyQualifiedAuthority::clone(auth));
|
||||
self.reconnects.push_back(auth.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -368,16 +379,28 @@ where
|
|||
// ===== impl DestinationServiceQuery =====
|
||||
|
||||
impl<T: HttpService<RequestBody = BoxBody, ResponseBody = RecvBody>> DestinationServiceQuery<T> {
|
||||
fn connect(client: &mut T, auth: &FullyQualifiedAuthority, connect_or_reconnect: &str) -> Self {
|
||||
// Initiates a query `query` to the Destination service and returns it as `Some(query)` if the
|
||||
// given authority's host is of a form suitable for using to query the Destination service.
|
||||
// Otherwise, returns `None`.
|
||||
fn connect_maybe(
|
||||
default_destination_namespace: &str,
|
||||
client: &mut T,
|
||||
auth: &DnsNameAndPort,
|
||||
connect_or_reconnect: &str)
|
||||
-> Option<Self>
|
||||
{
|
||||
trace!("DestinationServiceQuery {} {:?}", connect_or_reconnect, auth);
|
||||
let req = Destination {
|
||||
scheme: "k8s".into(),
|
||||
path: auth.without_trailing_dot().as_str().into(),
|
||||
};
|
||||
// TODO: Can grpc::Request::new be removed?
|
||||
let mut svc = DestinationSvc::new(client.lift_ref());
|
||||
let response = svc.get(grpc::Request::new(req));
|
||||
DestinationServiceQuery::ConnectedOrConnecting { rx: UpdateRx::Waiting(response) }
|
||||
FullyQualifiedAuthority::normalize(auth, default_destination_namespace)
|
||||
.map(|auth| {
|
||||
let req = Destination {
|
||||
scheme: "k8s".into(),
|
||||
path: auth.without_trailing_dot().to_owned(),
|
||||
};
|
||||
// TODO: Can grpc::Request::new be removed?
|
||||
let mut svc = DestinationSvc::new(client.lift_ref());
|
||||
let response = svc.get(grpc::Request::new(req));
|
||||
DestinationServiceQuery::ConnectedOrConnecting { rx: UpdateRx::Waiting(response) }
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -394,7 +417,7 @@ impl <T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> {
|
|||
}
|
||||
}
|
||||
|
||||
fn add<A>(&mut self, authority_for_logging: &FullyQualifiedAuthority, addrs_to_add: A)
|
||||
fn add<A>(&mut self, authority_for_logging: &DnsNameAndPort, addrs_to_add: A)
|
||||
where A: Iterator<Item = SocketAddr>
|
||||
{
|
||||
let mut cache = match self.addrs.take() {
|
||||
|
@ -408,7 +431,7 @@ impl <T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> {
|
|||
self.addrs = Exists::Yes(cache);
|
||||
}
|
||||
|
||||
fn remove<A>(&mut self, authority_for_logging: &FullyQualifiedAuthority, addrs_to_remove: A)
|
||||
fn remove<A>(&mut self, authority_for_logging: &DnsNameAndPort, addrs_to_remove: A)
|
||||
where A: Iterator<Item = SocketAddr>
|
||||
{
|
||||
let cache = match self.addrs.take() {
|
||||
|
@ -424,7 +447,7 @@ impl <T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> {
|
|||
self.addrs = Exists::Yes(cache);
|
||||
}
|
||||
|
||||
fn no_endpoints(&mut self, authority_for_logging: &FullyQualifiedAuthority, exists: bool) {
|
||||
fn no_endpoints(&mut self, authority_for_logging: &DnsNameAndPort, exists: bool) {
|
||||
trace!("no endpoints for {:?} that is known to {}", authority_for_logging,
|
||||
if exists { "exist" } else { "not exist" });
|
||||
match self.addrs.take() {
|
||||
|
@ -443,7 +466,7 @@ impl <T: HttpService<ResponseBody = RecvBody>> DestinationSet<T> {
|
|||
}
|
||||
|
||||
fn on_change(txs: &mut Vec<mpsc::UnboundedSender<Update>>,
|
||||
authority_for_logging: &FullyQualifiedAuthority,
|
||||
authority_for_logging: &DnsNameAndPort,
|
||||
addr: SocketAddr,
|
||||
change: CacheChange) {
|
||||
let (update_str, update_constructor): (&'static str, fn(SocketAddr) -> Update) =
|
||||
|
|
|
@ -17,8 +17,7 @@ use tower_h2;
|
|||
use tower_reconnect::{Error as ReconnectError, Reconnect};
|
||||
|
||||
use dns;
|
||||
use fully_qualified_authority::FullyQualifiedAuthority;
|
||||
use transport::{HostAndPort, LookupAddressAndConnect};
|
||||
use transport::{DnsNameAndPort, HostAndPort, LookupAddressAndConnect};
|
||||
use timeout::{Timeout, TimeoutError};
|
||||
|
||||
mod cache;
|
||||
|
@ -41,8 +40,9 @@ pub struct Background {
|
|||
disco: DiscoBg,
|
||||
}
|
||||
|
||||
pub fn new() -> (Control, Background) {
|
||||
let (tx, rx) = self::discovery::new();
|
||||
pub fn new(default_destination_namespace: String) -> (Control, Background)
|
||||
{
|
||||
let (tx, rx) = self::discovery::new(default_destination_namespace);
|
||||
|
||||
let c = Control {
|
||||
disco: tx,
|
||||
|
@ -58,7 +58,7 @@ pub fn new() -> (Control, Background) {
|
|||
// ===== impl Control =====
|
||||
|
||||
impl Control {
|
||||
pub fn resolve<B>(&self, auth: &FullyQualifiedAuthority, bind: B) -> Watch<B> {
|
||||
pub fn resolve<B>(&self, auth: &DnsNameAndPort, bind: B) -> Watch<B> {
|
||||
self.disco.resolve(auth, bind)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,60 +1,28 @@
|
|||
use bytes::BytesMut;
|
||||
use bytes::{BytesMut};
|
||||
|
||||
use std::net::IpAddr;
|
||||
use std::str::FromStr;
|
||||
|
||||
use http::uri::Authority;
|
||||
use transport::DnsNameAndPort;
|
||||
|
||||
/// A normalized `Authority`.
|
||||
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
|
||||
pub struct FullyQualifiedAuthority(Authority);
|
||||
|
||||
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
|
||||
pub struct NamedAddress {
|
||||
pub name: FullyQualifiedAuthority,
|
||||
pub use_destination_service: bool
|
||||
}
|
||||
pub struct FullyQualifiedAuthority(String);
|
||||
|
||||
impl FullyQualifiedAuthority {
|
||||
/// Normalizes the name according to Kubernetes service naming conventions.
|
||||
/// Case folding is not done; that is done internally inside `Authority`.
|
||||
///
|
||||
/// This assumes the authority is syntactically valid.
|
||||
pub fn normalize(authority: &Authority, default_namespace: &str)
|
||||
-> NamedAddress
|
||||
{
|
||||
// Don't change IP-address-based authorities.
|
||||
if IpAddr::from_str(authority.host()).is_ok() {
|
||||
return NamedAddress {
|
||||
name: FullyQualifiedAuthority(authority.clone()),
|
||||
use_destination_service: false,
|
||||
}
|
||||
};
|
||||
|
||||
// TODO: `Authority` doesn't have a way to get the serialized form of the
|
||||
// port, so do it ourselves.
|
||||
let (name, colon_port) = {
|
||||
let authority = authority.as_str();
|
||||
match authority.rfind(':') {
|
||||
Some(p) => authority.split_at(p),
|
||||
None => (authority, ""),
|
||||
}
|
||||
};
|
||||
pub fn normalize(authority: &DnsNameAndPort, default_namespace: &str) -> Option<Self> {
|
||||
let name: &str = authority.host.as_ref();
|
||||
|
||||
// parts should have a maximum 4 of pieces (name, namespace, svc, zone)
|
||||
let mut parts = name.splitn(4, '.');
|
||||
|
||||
// `Authority` guarantees the name has at least one part.
|
||||
// `dns::Name` guarantees the name has at least one part.
|
||||
assert!(parts.next().is_some());
|
||||
|
||||
// Rewrite "$name" -> "$name.$default_namespace".
|
||||
let has_explicit_namespace = match parts.next() {
|
||||
Some("") => {
|
||||
// "$name." is an external absolute name.
|
||||
return NamedAddress {
|
||||
name: FullyQualifiedAuthority(authority.clone()),
|
||||
use_destination_service: false,
|
||||
};
|
||||
return None;
|
||||
},
|
||||
Some(_) => true,
|
||||
None => false,
|
||||
|
@ -69,20 +37,14 @@ impl FullyQualifiedAuthority {
|
|||
let append_svc = if let Some(part) = parts.next() {
|
||||
if !part.eq_ignore_ascii_case("svc") {
|
||||
// If not "$name.$namespace.svc", treat as external.
|
||||
return NamedAddress {
|
||||
name: FullyQualifiedAuthority(authority.clone()),
|
||||
use_destination_service: false,
|
||||
};
|
||||
return None;
|
||||
}
|
||||
false
|
||||
} else if has_explicit_namespace {
|
||||
true
|
||||
} else if namespace_to_append.is_none() {
|
||||
// We can't append ".svc" without a namespace, so treat as external.
|
||||
return NamedAddress {
|
||||
name: FullyQualifiedAuthority(authority.clone()),
|
||||
use_destination_service: false,
|
||||
}
|
||||
return None;
|
||||
} else {
|
||||
true
|
||||
};
|
||||
|
@ -100,10 +62,7 @@ impl FullyQualifiedAuthority {
|
|||
// "a.b.svc." is an external absolute name.
|
||||
// "a.b.svc.foo" is external if the default zone is not
|
||||
// "foo".
|
||||
return NamedAddress {
|
||||
name: FullyQualifiedAuthority(authority.clone()),
|
||||
use_destination_service: false,
|
||||
}
|
||||
return None;
|
||||
}
|
||||
(None, strip_last)
|
||||
} else {
|
||||
|
@ -121,17 +80,16 @@ impl FullyQualifiedAuthority {
|
|||
additional_len += 1 + zone.len(); // "." + zone
|
||||
}
|
||||
|
||||
// If we're not going to change anything then don't allocate anything.
|
||||
if additional_len == 0 && !strip_last {
|
||||
return NamedAddress {
|
||||
name: FullyQualifiedAuthority(authority.clone()),
|
||||
use_destination_service: true,
|
||||
}
|
||||
}
|
||||
let port_str_len = match authority.port {
|
||||
80 => 0, // XXX: Assumes http://, which is all we support right now.
|
||||
p if p >= 10000 => 1 + 5,
|
||||
p if p >= 1000 => 1 + 4,
|
||||
p if p >= 100 => 1 + 3,
|
||||
p if p >= 10 => 1 + 2,
|
||||
_ => 1,
|
||||
};
|
||||
|
||||
// `authority.as_str().len()` includes the length of `colon_port`.
|
||||
let mut normalized =
|
||||
BytesMut::with_capacity(authority.as_str().len() + additional_len);
|
||||
let mut normalized = BytesMut::with_capacity(name.len() + additional_len + port_str_len);
|
||||
normalized.extend_from_slice(name.as_bytes());
|
||||
if let Some(namespace) = namespace_to_append {
|
||||
normalized.extend_from_slice(b".");
|
||||
|
@ -144,52 +102,58 @@ impl FullyQualifiedAuthority {
|
|||
normalized.extend_from_slice(b".");
|
||||
normalized.extend_from_slice(zone.as_bytes());
|
||||
}
|
||||
normalized.extend_from_slice(colon_port.as_bytes());
|
||||
|
||||
if strip_last {
|
||||
let new_len = normalized.len() - 1;
|
||||
normalized.truncate(new_len);
|
||||
}
|
||||
|
||||
let name = Authority::from_shared(normalized.freeze())
|
||||
.expect("syntactically-valid authority");
|
||||
let name = FullyQualifiedAuthority(name);
|
||||
NamedAddress {
|
||||
name,
|
||||
use_destination_service: true,
|
||||
// Append the port
|
||||
if port_str_len > 0 {
|
||||
normalized.extend_from_slice(b":");
|
||||
let port = authority.port.to_string();
|
||||
normalized.extend_from_slice(port.as_ref());
|
||||
}
|
||||
|
||||
Some(FullyQualifiedAuthority(String::from_utf8(normalized.freeze().to_vec()).unwrap()))
|
||||
}
|
||||
|
||||
pub fn without_trailing_dot(&self) -> &Authority {
|
||||
pub fn without_trailing_dot(&self) -> &str {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use transport::{DnsNameAndPort, Host, HostAndPort};
|
||||
use http::uri::Authority;
|
||||
use std::str::FromStr;
|
||||
|
||||
#[test]
|
||||
fn test_normalized_authority() {
|
||||
fn local(input: &str, default_namespace: &str) -> String {
|
||||
use bytes::Bytes;
|
||||
use http::uri::Authority;
|
||||
fn dns_name_and_port_from_str(input: &str) -> DnsNameAndPort {
|
||||
let authority = Authority::from_str(input).unwrap();
|
||||
match HostAndPort::normalize(&authority, Some(80)) {
|
||||
Ok(HostAndPort { host: Host::DnsName(host), port }) =>
|
||||
DnsNameAndPort { host, port },
|
||||
Err(e) => {
|
||||
unreachable!("{:?} when parsing {:?}", e, input)
|
||||
},
|
||||
_ => unreachable!("Not a DNS name: {:?}", input),
|
||||
}
|
||||
}
|
||||
|
||||
let input = Authority::from_shared(Bytes::from(input.as_bytes()))
|
||||
.unwrap();
|
||||
let output = super::FullyQualifiedAuthority::normalize(
|
||||
&input, default_namespace);
|
||||
assert_eq!(output.use_destination_service, true, "input: {}", input);
|
||||
output.name.without_trailing_dot().as_str().into()
|
||||
fn local(input: &str, default_namespace: &str) -> String {
|
||||
let name = dns_name_and_port_from_str(input);
|
||||
let output = super::FullyQualifiedAuthority::normalize(&name, default_namespace);
|
||||
assert!(output.is_some(), "input: {}", input);
|
||||
output.unwrap().without_trailing_dot().into()
|
||||
}
|
||||
|
||||
fn external(input: &str, default_namespace: &str) {
|
||||
use bytes::Bytes;
|
||||
use http::uri::Authority;
|
||||
|
||||
let input = Authority::from_shared(Bytes::from(input.as_bytes())).unwrap();
|
||||
let output = super::FullyQualifiedAuthority::normalize(
|
||||
&input, default_namespace);
|
||||
assert_eq!(output.use_destination_service, false);
|
||||
assert_eq!(output.name.without_trailing_dot().as_str(), input);
|
||||
let name = dns_name_and_port_from_str(input);
|
||||
let output = super::FullyQualifiedAuthority::normalize(&name, default_namespace);
|
||||
assert!(output.is_none(), "input: {}", input);
|
||||
}
|
||||
|
||||
assert_eq!("name.namespace.svc.cluster.local", local("name", "namespace"));
|
||||
|
@ -245,20 +209,10 @@ mod tests {
|
|||
local("name.namespace.svc.cluster.local:1234", "namespace"));
|
||||
|
||||
// "SVC" is recognized as being equivalent to "svc"
|
||||
assert_eq!("name.namespace.SVC.cluster.local",
|
||||
assert_eq!("name.namespace.svc.cluster.local",
|
||||
local("name.namespace.SVC", "namespace"));
|
||||
external("name.namespace.SVC.cluster", "namespace");
|
||||
assert_eq!("name.namespace.SVC.cluster.local",
|
||||
assert_eq!("name.namespace.svc.cluster.local",
|
||||
local("name.namespace.SVC.cluster.local", "namespace"));
|
||||
|
||||
// IPv4 addresses are left unchanged.
|
||||
external("1.2.3.4", "namespace");
|
||||
external("1.2.3.4:1234", "namespace");
|
||||
external("127.0.0.1", "namespace");
|
||||
external("127.0.0.1:8080", "namespace");
|
||||
|
||||
// IPv6 addresses are left unchanged.
|
||||
external("[::1]", "namespace");
|
||||
external("[::1]:1234", "namespace");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -203,11 +203,10 @@ where
|
|||
config.metrics_flush_interval,
|
||||
);
|
||||
|
||||
let (control, control_bg) = control::new();
|
||||
|
||||
let executor = core.handle();
|
||||
let (control, control_bg) = control::new(config.pod_namespace.clone());
|
||||
|
||||
let dns_config = dns::Config::from_file(&config.resolv_conf_path);
|
||||
let executor = core.handle();
|
||||
|
||||
let bind = Bind::new(executor.clone()).with_sensors(sensors.clone());
|
||||
|
||||
|
|
|
@ -18,9 +18,10 @@ use bind::{self, Bind, Protocol};
|
|||
use control::{self, discovery};
|
||||
use control::discovery::Bind as BindTrait;
|
||||
use ctx;
|
||||
use fully_qualified_authority::{FullyQualifiedAuthority, NamedAddress};
|
||||
use fully_qualified_authority::FullyQualifiedAuthority;
|
||||
use timeout::Timeout;
|
||||
use transparency::h1;
|
||||
use transport::{DnsNameAndPort, Host, HostAndPort};
|
||||
|
||||
type BindProtocol<B> = bind::BindProtocol<Arc<ctx::Proxy>, B>;
|
||||
|
||||
|
@ -52,8 +53,8 @@ impl<B> Outbound<B> {
|
|||
|
||||
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
|
||||
pub enum Destination {
|
||||
LocalSvc(FullyQualifiedAuthority),
|
||||
External(SocketAddr),
|
||||
Hostname(DnsNameAndPort),
|
||||
ImplicitOriginalDst(SocketAddr),
|
||||
}
|
||||
|
||||
impl<B> Recognize for Outbound<B>
|
||||
|
@ -77,38 +78,44 @@ where
|
|||
// by `NormalizeUri`, as we need to know whether the request will
|
||||
// be routed by Host/authority or by SO_ORIGINAL_DST, in order to
|
||||
// determine whether the service is reusable.
|
||||
let local = req.uri().authority_part()
|
||||
let authority = req.uri().authority_part()
|
||||
.cloned()
|
||||
// Therefore, we need to check the host header as well as the URI
|
||||
// for a valid authority, before we fall back to SO_ORIGINAL_DST.
|
||||
.or_else(|| h1::authority_from_host(req))
|
||||
.map(|authority| {
|
||||
FullyQualifiedAuthority::normalize(
|
||||
&authority,
|
||||
&self.default_namespace)
|
||||
});
|
||||
.or_else(|| h1::authority_from_host(req));
|
||||
|
||||
// If we can't fully qualify the authority as a local service,
|
||||
// and there is no original dst, then we have nothing! In that
|
||||
// case, we return `None`, which results an "unrecognized" error.
|
||||
//
|
||||
// In practice, this shouldn't ever happen, since we expect the proxy
|
||||
// to be run on Linux servers, with iptables setup, so there should
|
||||
// always be an original destination.
|
||||
let dest = if let Some(NamedAddress {
|
||||
name,
|
||||
use_destination_service: true
|
||||
}) = local {
|
||||
Destination::LocalSvc(name)
|
||||
} else {
|
||||
let orig_dst = req.extensions()
|
||||
// TODO: Return error when `HostAndPort::normalize()` fails.
|
||||
let mut dest = match authority.as_ref()
|
||||
.and_then(|auth| HostAndPort::normalize(auth, Some(80)).ok()) {
|
||||
Some(HostAndPort { host: Host::DnsName(dns_name), port }) => {
|
||||
let authority = DnsNameAndPort { host: dns_name, port };
|
||||
// Work around the inability of control/discovery.rs to handle unnormalized names.
|
||||
// TODO: Remove this use of `FullyQualifiedAuthority::normalize()` and use
|
||||
// `Destination::Hostname` for all `Host::DnsName` values once DNS machinery is
|
||||
// added to control/discovery.rs.
|
||||
FullyQualifiedAuthority::normalize(&authority, &self.default_namespace)
|
||||
.map(|_| Destination::Hostname(authority))
|
||||
},
|
||||
Some(HostAndPort { host: Host::Ip(_), .. }) |
|
||||
None => None,
|
||||
};
|
||||
|
||||
if dest.is_none() {
|
||||
dest = req.extensions()
|
||||
.get::<Arc<ctx::transport::Server>>()
|
||||
.and_then(|ctx| {
|
||||
ctx.orig_dst_if_not_local()
|
||||
});
|
||||
Destination::External(orig_dst?)
|
||||
})
|
||||
.map(Destination::ImplicitOriginalDst)
|
||||
};
|
||||
|
||||
// If there is no authority in the request URI or in the Host header,
|
||||
// and there is no original dst, then we have nothing! In that case,
|
||||
// return `None`, which results an "unrecognized" error. In practice,
|
||||
// this shouldn't ever happen, since we expect the proxy to be run on
|
||||
// Linux servers, with iptables setup, so there should always be an
|
||||
// original destination.
|
||||
let dest = dest?;
|
||||
|
||||
Some(proto.into_key(dest))
|
||||
}
|
||||
|
@ -130,14 +137,14 @@ where
|
|||
debug!("building outbound {:?} client to {:?}", protocol, dest);
|
||||
|
||||
let resolve = match *dest {
|
||||
Destination::LocalSvc(ref authority) => {
|
||||
Discovery::LocalSvc(self.discovery.resolve(
|
||||
Destination::Hostname(ref authority) => {
|
||||
Discovery::NamedSvc(self.discovery.resolve(
|
||||
authority,
|
||||
self.bind.clone().with_protocol(protocol.clone()),
|
||||
))
|
||||
},
|
||||
Destination::External(addr) => {
|
||||
Discovery::External(Some((addr, self.bind.clone()
|
||||
Destination::ImplicitOriginalDst(addr) => {
|
||||
Discovery::ImplicitOriginalDst(Some((addr, self.bind.clone()
|
||||
.with_protocol(protocol.clone()))))
|
||||
}
|
||||
};
|
||||
|
@ -161,8 +168,8 @@ where
|
|||
}
|
||||
|
||||
pub enum Discovery<B> {
|
||||
LocalSvc(discovery::Watch<BindProtocol<B>>),
|
||||
External(Option<(SocketAddr, BindProtocol<B>)>),
|
||||
NamedSvc(discovery::Watch<BindProtocol<B>>),
|
||||
ImplicitOriginalDst(Option<(SocketAddr, BindProtocol<B>)>),
|
||||
}
|
||||
|
||||
impl<B> Discover for Discovery<B>
|
||||
|
@ -178,9 +185,9 @@ where
|
|||
|
||||
fn poll(&mut self) -> Poll<Change<Self::Key, Self::Service>, Self::DiscoverError> {
|
||||
match *self {
|
||||
Discovery::LocalSvc(ref mut w) => w.poll()
|
||||
Discovery::NamedSvc(ref mut w) => w.poll()
|
||||
.map_err(|_| BindError::Internal),
|
||||
Discovery::External(ref mut opt) => {
|
||||
Discovery::ImplicitOriginalDst(ref mut opt) => {
|
||||
// This "discovers" a single address for an external service
|
||||
// that never has another change. This can mean it floats
|
||||
// in the Balancer forever. However, when we finally add
|
||||
|
|
|
@ -9,7 +9,6 @@ use std::str::FromStr;
|
|||
use http;
|
||||
|
||||
use connection;
|
||||
use convert;
|
||||
use dns;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
|
@ -24,6 +23,13 @@ pub struct HostAndPort {
|
|||
pub port: u16,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
|
||||
pub struct DnsNameAndPort {
|
||||
pub host: dns::Name,
|
||||
pub port: u16,
|
||||
}
|
||||
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum Host {
|
||||
DnsName(dns::Name),
|
||||
|
@ -48,9 +54,10 @@ pub struct LookupAddressAndConnect {
|
|||
|
||||
// ===== impl HostAndPort =====
|
||||
|
||||
impl<'a> convert::TryFrom<&'a http::uri::Authority> for HostAndPort {
|
||||
type Err = HostAndPortError;
|
||||
fn try_from(a: &http::uri::Authority) -> Result<Self, Self::Err> {
|
||||
impl HostAndPort {
|
||||
pub fn normalize(a: &http::uri::Authority, default_port: Option<u16>)
|
||||
-> Result<Self, HostAndPortError>
|
||||
{
|
||||
let host = {
|
||||
match dns::Name::normalize(a.host()) {
|
||||
Ok(host) => Host::DnsName(host),
|
||||
|
@ -61,7 +68,9 @@ impl<'a> convert::TryFrom<&'a http::uri::Authority> for HostAndPort {
|
|||
},
|
||||
}
|
||||
};
|
||||
let port = a.port().ok_or_else(|| HostAndPortError::MissingPort)?;
|
||||
let port = a.port()
|
||||
.or(default_port)
|
||||
.ok_or_else(|| HostAndPortError::MissingPort)?;
|
||||
Ok(HostAndPort {
|
||||
host,
|
||||
port
|
||||
|
|
|
@ -3,7 +3,7 @@ mod so_original_dst;
|
|||
|
||||
pub use self::connect::{
|
||||
Connect,
|
||||
Host, HostAndPort, HostAndPortError,
|
||||
DnsNameAndPort, Host, HostAndPort, HostAndPortError,
|
||||
LookupAddressAndConnect,
|
||||
};
|
||||
pub use self::so_original_dst::{GetOriginalDst, SoOriginalDst};
|
||||
|
|
Loading…
Reference in New Issue