Proxy: Map unqualified/partially-qualified names to FQDN (#59)

* Proxy: Map unqualified/partially-qualified names to FQDN

Previously we required the service to fully qualify all service names
for outbound traffic. Many services are written assuming that
Kubernetes will complete names using its DNS search path, and those
services weren't working with Conduit.

Now add an option, used by default, to fully-qualify the domain names.
Currently only Kubernetes-like name completion for services is
supported, but the configuration syntax is open-ended to allow for
alternatives in the future. Also, the auto-completion can be disabled
for applications that prefer to ensure they're always using unambiguous
names. Once routing is implemented then it is likely that (default)
routing rules will replace these hard-coded rules.

Unit tests for the name completion logic are included.

Part of the solution for #9. The changes to `conduit inject` to
actually use this facility will be in another PR.
This commit is contained in:
Brian Smith 2017-12-19 11:59:26 -10:00 committed by GitHub
parent fdf9f1a81c
commit 1164759540
6 changed files with 315 additions and 16 deletions

View File

@ -51,7 +51,17 @@ pub struct Config {
pub pod_name: Option<String>,
pub pod_namespace: Option<String>,
pub pod_zone: Option<String>,
pub node_name: Option<String>,
/// Should we use `pod_namespace` and/or `pod_zone` to map unqualified/partially-qualified
/// to fully-qualified names using the given platform's conventions?
destinations_autocomplete_fqdn: Option<Environment>,
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum Environment {
Kubernetes,
}
/// Configuration settings for binding a listener.
@ -76,6 +86,7 @@ pub enum Error {
#[derive(Clone, Debug)]
pub enum ParseError {
EnviromentUnsupported,
NotANumber,
HostIsNotAnIpAddress,
NotUnicode,
@ -132,6 +143,8 @@ const ENV_PUBLIC_CONNECT_TIMEOUT: &str = "CONDUIT_PROXY_PUBLIC_CONNECT_TIMEOUT";
const ENV_NODE_NAME: &str = "CONDUIT_PROXY_NODE_NAME";
const ENV_POD_NAME: &str = "CONDUIT_PROXY_POD_NAME";
const ENV_POD_NAMESPACE: &str = "CONDUIT_PROXY_POD_NAMESPACE";
const ENV_POD_ZONE: &str = "CONDUIT_PROXY_POD_ZONE";
const ENV_DESTINATIONS_AUTOCOMPLETE_FQDN: &str = "CONDUIT_PROXY_DESTINATIONS_AUTOCOMPLETE_FQDN";
pub const ENV_CONTROL_URL: &str = "CONDUIT_PROXY_CONTROL_URL";
const ENV_RESOLV_CONF: &str = "CONDUIT_RESOLV_CONF";
@ -170,7 +183,10 @@ impl<'a> TryFrom<&'a Strings> for Config {
let report_timeout = parse(strings, ENV_REPORT_TIMEOUT_SECS, parse_number);
let pod_name = strings.get(ENV_POD_NAME);
let pod_namespace = strings.get(ENV_POD_NAMESPACE);
let pod_zone = strings.get(ENV_POD_ZONE);
let node_name = strings.get(ENV_NODE_NAME);
let destinations_autocomplete_fqdn =
parse(strings, ENV_DESTINATIONS_AUTOCOMPLETE_FQDN, parse_environment);
Ok(Config {
private_listener: Listener {
@ -204,11 +220,29 @@ impl<'a> TryFrom<&'a Strings> for Config {
Duration::from_secs(report_timeout?.unwrap_or(DEFAULT_REPORT_TIMEOUT_SECS)),
pod_name: pod_name?,
pod_namespace: pod_namespace?,
pod_zone: pod_zone?,
node_name: node_name?,
destinations_autocomplete_fqdn: destinations_autocomplete_fqdn?,
})
}
}
impl Config {
pub fn default_destination_namespace(&self) -> Option<&String> {
match self.destinations_autocomplete_fqdn {
Some(Environment::Kubernetes) => self.pod_namespace.as_ref(),
None => None,
}
}
pub fn default_destination_zone(&self) -> Option<&String> {
match self.destinations_autocomplete_fqdn {
Some(Environment::Kubernetes) => self.pod_zone.as_ref(),
None => None,
}
}
}
// ===== impl Addr =====
impl FromStr for Addr {
@ -275,6 +309,13 @@ impl Strings for TestEnv {
// ===== Parsing =====
fn parse_environment(s: &str) -> Result<Environment, ParseError> {
match s {
"Kubernetes" => Ok(Environment::Kubernetes),
_ => Err(ParseError::EnviromentUnsupported),
}
}
fn parse_number<T>(s: &str) -> Result<T, ParseError> where T: FromStr {
s.parse().map_err(|_| ParseError::NotANumber)
}

View File

@ -4,11 +4,12 @@ use std::net::SocketAddr;
use futures::{Async, Future, Poll, Stream};
use futures::sync::mpsc;
use http::uri::Authority;
use tower::Service;
use tower_discover::{Change, Discover};
use tower_grpc;
use fully_qualified_authority::FullyQualifiedAuthority;
use super::codec::Protobuf;
use super::pb::common::{Destination, TcpAddress};
use super::pb::proxy::destination::Update as PbUpdate;
@ -24,7 +25,7 @@ pub type ClientBody = ::tower_grpc::client::codec::EncodingBody<
/// A handle to start watching a destination for address changes.
#[derive(Clone, Debug)]
pub struct Discovery {
tx: mpsc::UnboundedSender<(Authority, mpsc::UnboundedSender<Update>)>,
tx: mpsc::UnboundedSender<(FullyQualifiedAuthority, mpsc::UnboundedSender<Update>)>,
}
/// A `tower_discover::Discover`, given to a `tower_balance::Balance`.
@ -37,7 +38,7 @@ pub struct Watch<B> {
/// A background handle to eventually bind on the controller thread.
#[derive(Debug)]
pub struct Background {
rx: mpsc::UnboundedReceiver<(Authority, mpsc::UnboundedSender<Update>)>,
rx: mpsc::UnboundedReceiver<(FullyQualifiedAuthority, mpsc::UnboundedSender<Update>)>,
}
type DiscoveryWatch<F> = DestinationSet<
@ -51,14 +52,14 @@ type DiscoveryWatch<F> = DestinationSet<
/// the controller destination API.
#[derive(Debug)]
pub struct DiscoveryWork<F> {
destinations: HashMap<Authority, DiscoveryWatch<F>>,
destinations: HashMap<FullyQualifiedAuthority, DiscoveryWatch<F>>,
/// A queue of authorities that need to be reconnected.
reconnects: VecDeque<Authority>,
reconnects: VecDeque<FullyQualifiedAuthority>,
/// The Destination.Get RPC client service.
/// Each poll, records whether the rpc service was till ready.
rpc_ready: bool,
/// A receiver of new watch requests.
rx: mpsc::UnboundedReceiver<(Authority, mpsc::UnboundedSender<Update>)>,
rx: mpsc::UnboundedReceiver<(FullyQualifiedAuthority, mpsc::UnboundedSender<Update>)>,
}
#[derive(Debug)]
@ -115,7 +116,7 @@ pub fn new() -> (Discovery, Background) {
impl Discovery {
/// Start watching for address changes for a certain authority.
pub fn resolve<B>(&self, authority: &Authority, bind: B) -> Watch<B> {
pub fn resolve<B>(&self, authority: &FullyQualifiedAuthority, bind: B) -> Watch<B> {
trace!("resolve; authority={:?}", authority);
let (tx, rx) = mpsc::unbounded();
self.tx
@ -251,7 +252,7 @@ where
Entry::Vacant(vac) => {
let req = Destination {
scheme: "k8s".into(),
path: vac.key().as_str().into(),
path: vac.key().without_trailing_dot().into(),
};
let stream = DestinationSvc::new(&mut rpc).get(req);
vac.insert(DestinationSet {
@ -292,7 +293,7 @@ where
trace!("Destination.Get reconnect {:?}", auth);
let req = Destination {
scheme: "k8s".into(),
path: auth.as_str().into(),
path: auth.without_trailing_dot().into(),
};
set.rx = DestinationSvc::new(&mut rpc).get(req);
set.needs_reconnect = false;
@ -346,7 +347,7 @@ where
};
if needs_reconnect {
set.needs_reconnect = true;
self.reconnects.push_back(Authority::clone(auth));
self.reconnects.push_back(FullyQualifiedAuthority::clone(auth));
}
}
}

View File

@ -17,6 +17,7 @@ use tower_reconnect::Reconnect;
use url::HostAndPort;
use dns;
use fully_qualified_authority::FullyQualifiedAuthority;
use transport::LookupAddressAndConnect;
use timeout::Timeout;
@ -57,7 +58,7 @@ pub fn new() -> (Control, Background) {
// ===== impl Control =====
impl Control {
pub fn resolve<B>(&self, auth: &http::uri::Authority, bind: B) -> Watch<B> {
pub fn resolve<B>(&self, auth: &FullyQualifiedAuthority, bind: B) -> Watch<B> {
self.disco.resolve(auth, bind)
}
}

View File

@ -0,0 +1,237 @@
use bytes::BytesMut;
use std::ascii::AsciiExt;
use std::net::IpAddr;
use std::str::FromStr;
use http::uri::Authority;
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct FullyQualifiedAuthority(Authority);
impl FullyQualifiedAuthority {
/// Normalizes the name according to Kubernetes service naming conventions.
/// Case folding is not done; that is done internally inside `Authority`.
///
/// This assumes the authority is syntactically valid.
pub fn new(authority: &Authority, default_namespace: Option<&str>,
default_zone: Option<&str>)
-> FullyQualifiedAuthority {
// Don't change IP-address-based authorities.
if IpAddr::from_str(authority.host()).is_ok() {
return FullyQualifiedAuthority(authority.clone())
};
// TODO: `Authority` doesn't have a way to get the serialized form of the
// port, so do it ourselves.
let (name, colon_port) = {
let authority = authority.as_str();
match authority.rfind(':') {
Some(p) => authority.split_at(p),
None => (authority, ""),
}
};
// A fully qualified name ending with a dot is normalized by removing the
// dot and doing nothing else.
if name.ends_with('.') {
let authority = authority.clone().into_bytes();
let normalized = authority.slice(0, authority.len() - 1);
return FullyQualifiedAuthority(Authority::from_shared(normalized).unwrap());
}
let mut parts = name.split('.');
// `Authority` guarantees the name has at least one part.
assert!(parts.next().is_some());
// Rewrite "$name" -> "$name.$default_namespace".
let has_explicit_namespace = parts.next().is_some();
let namespace_to_append = if !has_explicit_namespace {
default_namespace
} else {
None
};
// Rewrite "$name.$namespace" -> "$name.$namespace.svc".
let (has_svc, append_svc) = if let Some(part) = parts.next() {
(part.eq_ignore_ascii_case("svc"), false)
} else {
let has_namespace =
has_explicit_namespace || namespace_to_append.is_some();
(has_namespace, has_namespace)
};
// Rewrite "$name.$namespace.svc" -> "$name.$namespace.svc.$zone".
let zone_to_append = if has_svc && parts.next().is_none() {
default_zone
} else {
None
};
let mut additional_len = 0;
if let Some(namespace) = namespace_to_append {
additional_len += 1 + namespace.len(); // "." + namespace
}
if append_svc {
additional_len += 4; // ".svc"
}
if let Some(zone) = zone_to_append {
additional_len += 1 + zone.len(); // "." + zone
}
// If we're not going to change anything then don't allocate anything.
if additional_len == 0 {
return FullyQualifiedAuthority(authority.clone());
}
// `authority.as_str().len()` includes the length of `colon_port`.
let mut normalized =
BytesMut::with_capacity(authority.as_str().len() + additional_len);
normalized.extend_from_slice(name.as_bytes());
if let Some(namespace) = namespace_to_append {
normalized.extend_from_slice(b".");
normalized.extend_from_slice(namespace.as_bytes());
}
if append_svc {
normalized.extend_from_slice(b".svc");
}
if let Some(zone) = zone_to_append {
normalized.extend_from_slice(b".");
normalized.extend_from_slice(zone.as_bytes());
}
normalized.extend_from_slice(colon_port.as_bytes());
FullyQualifiedAuthority(Authority::from_shared(normalized.freeze())
.expect("syntactically-valid authority"))
}
pub fn without_trailing_dot(&self) -> &str {
self.0.as_str()
}
}
#[cfg(test)]
mod tests {
#[test]
fn test_normalized_authority() {
fn f(input: &str, default_namespace: Option<&str>,
default_zone: Option<&str>)
-> String {
use bytes::Bytes;
use http::uri::Authority;
let input = Authority::from_shared(Bytes::from(input.as_bytes())).unwrap();
let output = super::FullyQualifiedAuthority::new(
&input, default_namespace, default_zone);
output.without_trailing_dot().into()
}
assert_eq!("name",
f("name", None, None));
assert_eq!("name.namespace.svc",
f("name.namespace", None, None));
assert_eq!("name.namespace.svc",
f("name.namespace.svc", None, None));
assert_eq!("name.namespace.svc.cluster",
f("name.namespace.svc.cluster", None, None));
assert_eq!("name.namespace.svc.cluster.local",
f("name.namespace.svc.cluster.local", None, None));
assert_eq!("name.namespace.svc",
f("name", Some("namespace"), None));
assert_eq!("name.namespace.svc",
f("name.namespace", Some("namespace"), None));
assert_eq!("name.namespace.svc",
f("name.namespace.svc", Some("namespace"), None));
assert_eq!("name.namespace.svc.cluster",
f("name.namespace.svc.cluster", Some("namespace"), None));
assert_eq!("name.namespace.svc.cluster.local",
f("name.namespace.svc.cluster.local", Some("namespace"), None));
assert_eq!("name",
f("name", None, Some("cluster.local")));
assert_eq!("name.namespace.svc.cluster.local",
f("name.namespace", None, Some("cluster.local")));
assert_eq!("name.namespace.svc.cluster.local",
f("name.namespace.svc", None, Some("cluster.local")));
assert_eq!("name.namespace.svc.cluster",
f("name.namespace.svc.cluster", None, Some("cluster.local")));
assert_eq!("name.namespace.svc.cluster.local",
f("name.namespace.svc.cluster.local", None, Some("cluster.local")));
assert_eq!("name.namespace.svc.cluster.local",
f("name", Some("namespace"), Some("cluster.local")));
assert_eq!("name.namespace.svc.cluster.local",
f("name.namespace", Some("namespace"), Some("cluster.local")));
assert_eq!("name.namespace.svc.cluster.local",
f("name.namespace.svc", Some("namespace"), Some("cluster.local")));
assert_eq!("name.namespace.svc.cluster",
f("name.namespace.svc.cluster", Some("namespace"), Some("cluster.local")));
assert_eq!("name.namespace.svc.cluster.local",
f("name.namespace.svc.cluster.local", Some("namespace"), Some("cluster.local")));
// Fully-qualified names end with a dot and aren't modified except by removing the dot.
assert_eq!("name",
f("name.", None, None));
assert_eq!("name.namespace",
f("name.namespace.", None, None));
assert_eq!("name.namespace.svc",
f("name.namespace.svc.", None, None));
assert_eq!("name.namespace.svc.cluster",
f("name.namespace.svc.cluster.", None, None));
assert_eq!("name.namespace.svc.cluster.local",
f("name.namespace.svc.cluster.local.", None, None));
assert_eq!("name",
f("name.", Some("namespace"), None));
assert_eq!("name.namespace",
f("name.namespace.", Some("namespace"), None));
assert_eq!("name.namespace.svc",
f("name.namespace.svc.", Some("namespace"), None));
assert_eq!("name.namespace.svc.cluster",
f("name.namespace.svc.cluster.", Some("namespace"), None));
assert_eq!("name.namespace.svc.cluster.local",
f("name.namespace.svc.cluster.local.", Some("namespace"), None));
assert_eq!("name",
f("name.", Some("namespace"), Some("cluster.local")));
assert_eq!("name.namespace",
f("name.namespace.", Some("namespace"), Some("cluster.local")));
assert_eq!("name.namespace.svc",
f("name.namespace.svc.", Some("namespace"), Some("cluster.local")));
assert_eq!("name.namespace.svc.cluster",
f("name.namespace.svc.cluster.", Some("namespace"), Some("cluster.local")));
assert_eq!("name.namespace.svc.cluster.local",
f("name.namespace.svc.cluster.local.", Some("namespace"), Some("cluster.local")));
// Ports are preserved.
assert_eq!("name.namespace.svc.cluster.local:1234",
f("name:1234", Some("namespace"), Some("cluster.local")));
assert_eq!("name.namespace.svc.cluster.local:1234",
f("name.namespace:1234", Some("namespace"), Some("cluster.local")));
assert_eq!("name.namespace.svc.cluster.local:1234",
f("name.namespace.svc:1234", Some("namespace"), Some("cluster.local")));
assert_eq!("name.namespace.svc.cluster:1234",
f("name.namespace.svc.cluster:1234", Some("namespace"), Some("cluster.local")));
assert_eq!("name.namespace.svc.cluster.local:1234",
f("name.namespace.svc.cluster.local:1234", Some("namespace"), Some("cluster.local")));
// "SVC" is recognized as being equivalent to "svc"
assert_eq!("name.namespace.SVC.cluster.local",
f("name.namespace.SVC", Some("namespace"), Some("cluster.local")));
assert_eq!("name.namespace.SVC.cluster",
f("name.namespace.SVC.cluster", Some("namespace"), Some("cluster.local")));
assert_eq!("name.namespace.SVC.cluster.local",
f("name.namespace.SVC.cluster.local", Some("namespace"), Some("cluster.local")));
// IPv4 addresses are left unchanged.
assert_eq!("1.2.3.4",
f("1.2.3.4", Some("namespace"), Some("cluster.local")));
assert_eq!("1.2.3.4:1234",
f("1.2.3.4:1234", Some("namespace"), Some("cluster.local")));
// IPv6 addresses are left unchanged.
assert_eq!("[::1]",
f("[::1]", Some("namespace"), Some("cluster.local")));
assert_eq!("[::1]:1234",
f("[::1]:1234", Some("namespace"), Some("cluster.local")));
}
}

View File

@ -62,6 +62,7 @@ pub mod control;
pub mod convert;
mod ctx;
mod dns;
mod fully_qualified_authority;
mod inbound;
mod logging;
mod map_err;
@ -204,10 +205,16 @@ impl Main {
.map_or_else(|| bind.clone(), |t| bind.clone().with_connect_timeout(t))
.with_ctx(ctx.clone());
let outgoing = Outbound::new(
bind,
control,
config.default_destination_namespace().cloned(),
config.default_destination_zone().cloned());
let fut = serve(
outbound_listener,
h2::server::Builder::default(),
Outbound::new(bind, control),
outgoing,
ctx,
sensors,
&executor,

View File

@ -11,6 +11,7 @@ use tower_router::Recognize;
use bind::Bind;
use control;
use ctx;
use fully_qualified_authority::FullyQualifiedAuthority;
use telemetry;
use transport;
@ -29,15 +30,21 @@ type Error = tower_buffer::Error<
pub struct Outbound<B> {
bind: Bind<Arc<ctx::Proxy>, B>,
discovery: control::Control,
default_namespace: Option<String>,
default_zone: Option<String>,
}
// ===== impl Outbound =====
impl<B> Outbound<B> {
pub fn new(bind: Bind<Arc<ctx::Proxy>, B>, discovery: control::Control) -> Self {
pub fn new(bind: Bind<Arc<ctx::Proxy>, B>, discovery: control::Control,
default_namespace: Option<String>, default_zone: Option<String>)
-> Outbound<B> {
Self {
bind,
discovery,
default_namespace,
default_zone,
}
}
}
@ -49,12 +56,17 @@ where
type Request = http::Request<B>;
type Response = http::Response<telemetry::sensor::http::ResponseBody<tower_h2::RecvBody>>;
type Error = Error;
type Key = http::uri::Authority;
type Key = FullyQualifiedAuthority;
type RouteError = ();
type Service = Buffer<Balance<Discovery<B>>>;
fn recognize(&self, req: &Self::Request) -> Option<Self::Key> {
req.uri().authority_part().cloned()
req.uri().authority_part().map(|authority|
FullyQualifiedAuthority::new(
authority,
self.default_namespace.as_ref().map(|s| s.as_ref()),
self.default_zone.as_ref().map(|s| s.as_ref()))
)
}
/// Builds a dynamic, load balancing service.
@ -68,7 +80,7 @@ where
/// changed.
fn bind_service(
&mut self,
authority: &http::uri::Authority,
authority: &FullyQualifiedAuthority,
) -> Result<Self::Service, Self::RouteError> {
debug!("building outbound client to {:?}", authority);