proxy: use original dst if authority doesnt look like local service (#397)
The proxy will check that the requested authority looks like a local service, and if it doesn't, it will no longer ask the Destination service about the request, instead just using the SO_ORIGINAL_DST, enabling egress naturally. The rules used to determine if it looks like a local service come from this comment: > If default_zone.is_none() and the name is in the form $a.$b.svc, or if !default_zone.is_none() and the name is in the form $a.$b.svc.$default_zone, for some a and some b, then use the Destination service. Otherwise, use the IP given.
This commit is contained in:
parent
ea06854fca
commit
30dc48db51
|
|
@ -142,9 +142,9 @@ const ENV_PUBLIC_CONNECT_TIMEOUT: &str = "CONDUIT_PROXY_PUBLIC_CONNECT_TIMEOUT";
|
||||||
|
|
||||||
const ENV_NODE_NAME: &str = "CONDUIT_PROXY_NODE_NAME";
|
const ENV_NODE_NAME: &str = "CONDUIT_PROXY_NODE_NAME";
|
||||||
const ENV_POD_NAME: &str = "CONDUIT_PROXY_POD_NAME";
|
const ENV_POD_NAME: &str = "CONDUIT_PROXY_POD_NAME";
|
||||||
const ENV_POD_NAMESPACE: &str = "CONDUIT_PROXY_POD_NAMESPACE";
|
pub const ENV_POD_NAMESPACE: &str = "CONDUIT_PROXY_POD_NAMESPACE";
|
||||||
const ENV_POD_ZONE: &str = "CONDUIT_PROXY_POD_ZONE";
|
pub const ENV_POD_ZONE: &str = "CONDUIT_PROXY_POD_ZONE";
|
||||||
const ENV_DESTINATIONS_AUTOCOMPLETE_FQDN: &str = "CONDUIT_PROXY_DESTINATIONS_AUTOCOMPLETE_FQDN";
|
pub const ENV_DESTINATIONS_AUTOCOMPLETE_FQDN: &str = "CONDUIT_PROXY_DESTINATIONS_AUTOCOMPLETE_FQDN";
|
||||||
|
|
||||||
pub const ENV_CONTROL_URL: &str = "CONDUIT_PROXY_CONTROL_URL";
|
pub const ENV_CONTROL_URL: &str = "CONDUIT_PROXY_CONTROL_URL";
|
||||||
const ENV_RESOLV_CONF: &str = "CONDUIT_RESOLV_CONF";
|
const ENV_RESOLV_CONF: &str = "CONDUIT_RESOLV_CONF";
|
||||||
|
|
|
||||||
|
|
@ -13,12 +13,14 @@ impl FullyQualifiedAuthority {
|
||||||
/// Case folding is not done; that is done internally inside `Authority`.
|
/// Case folding is not done; that is done internally inside `Authority`.
|
||||||
///
|
///
|
||||||
/// This assumes the authority is syntactically valid.
|
/// This assumes the authority is syntactically valid.
|
||||||
pub fn new(authority: &Authority, default_namespace: Option<&str>,
|
///
|
||||||
|
/// Returns `None` is authority doesn't look like a local Kubernetes service.
|
||||||
|
pub fn normalize(authority: &Authority, default_namespace: Option<&str>,
|
||||||
default_zone: Option<&str>)
|
default_zone: Option<&str>)
|
||||||
-> FullyQualifiedAuthority {
|
-> Option<FullyQualifiedAuthority> {
|
||||||
// Don't change IP-address-based authorities.
|
// Don't change IP-address-based authorities.
|
||||||
if IpAddr::from_str(authority.host()).is_ok() {
|
if IpAddr::from_str(authority.host()).is_ok() {
|
||||||
return FullyQualifiedAuthority(authority.clone())
|
return None;
|
||||||
};
|
};
|
||||||
|
|
||||||
// TODO: `Authority` doesn't have a way to get the serialized form of the
|
// TODO: `Authority` doesn't have a way to get the serialized form of the
|
||||||
|
|
@ -36,9 +38,11 @@ impl FullyQualifiedAuthority {
|
||||||
if name.ends_with('.') {
|
if name.ends_with('.') {
|
||||||
let authority = authority.clone().into_bytes();
|
let authority = authority.clone().into_bytes();
|
||||||
let normalized = authority.slice(0, authority.len() - 1);
|
let normalized = authority.slice(0, authority.len() - 1);
|
||||||
return FullyQualifiedAuthority(Authority::from_shared(normalized).unwrap());
|
return Some(FullyQualifiedAuthority(Authority::from_shared(normalized).unwrap()));
|
||||||
}
|
}
|
||||||
let mut parts = name.split('.');
|
|
||||||
|
// parts should have a maximum 4 of pieces (name, namespace, svc, zone)
|
||||||
|
let mut parts = name.splitn(4, '.');
|
||||||
|
|
||||||
// `Authority` guarantees the name has at least one part.
|
// `Authority` guarantees the name has at least one part.
|
||||||
assert!(parts.next().is_some());
|
assert!(parts.next().is_some());
|
||||||
|
|
@ -52,19 +56,34 @@ impl FullyQualifiedAuthority {
|
||||||
};
|
};
|
||||||
|
|
||||||
// Rewrite "$name.$namespace" -> "$name.$namespace.svc".
|
// Rewrite "$name.$namespace" -> "$name.$namespace.svc".
|
||||||
let (has_svc, append_svc) = if let Some(part) = parts.next() {
|
let append_svc = if let Some(part) = parts.next() {
|
||||||
(part.eq_ignore_ascii_case("svc"), false)
|
if !part.eq_ignore_ascii_case("svc") {
|
||||||
|
// if not "$name.$namespace.svc", treat as external
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
false
|
||||||
|
} else if has_explicit_namespace {
|
||||||
|
true
|
||||||
|
} else if namespace_to_append.is_none() {
|
||||||
|
// We can't append ".svc" without a namespace, so treat as external.
|
||||||
|
return None;
|
||||||
} else {
|
} else {
|
||||||
let has_namespace =
|
true
|
||||||
has_explicit_namespace || namespace_to_append.is_some();
|
|
||||||
(has_namespace, has_namespace)
|
|
||||||
};
|
};
|
||||||
|
|
||||||
// Rewrite "$name.$namespace.svc" -> "$name.$namespace.svc.$zone".
|
// Rewrite "$name.$namespace.svc" -> "$name.$namespace.svc.$zone".
|
||||||
let zone_to_append = if has_svc && parts.next().is_none() {
|
let zone_to_append = if let Some(zone) = parts.next() {
|
||||||
default_zone
|
if let Some(default_zone) = default_zone {
|
||||||
} else {
|
if !zone.eq_ignore_ascii_case(default_zone) {
|
||||||
|
// if "a.b.svc.foo" and zone is not "foo",
|
||||||
|
// treat as external
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
}
|
||||||
None
|
None
|
||||||
|
} else {
|
||||||
|
default_zone
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut additional_len = 0;
|
let mut additional_len = 0;
|
||||||
|
|
@ -80,7 +99,7 @@ impl FullyQualifiedAuthority {
|
||||||
|
|
||||||
// If we're not going to change anything then don't allocate anything.
|
// If we're not going to change anything then don't allocate anything.
|
||||||
if additional_len == 0 {
|
if additional_len == 0 {
|
||||||
return FullyQualifiedAuthority(authority.clone());
|
return Some(FullyQualifiedAuthority(authority.clone()));
|
||||||
}
|
}
|
||||||
|
|
||||||
// `authority.as_str().len()` includes the length of `colon_port`.
|
// `authority.as_str().len()` includes the length of `colon_port`.
|
||||||
|
|
@ -100,8 +119,8 @@ impl FullyQualifiedAuthority {
|
||||||
}
|
}
|
||||||
normalized.extend_from_slice(colon_port.as_bytes());
|
normalized.extend_from_slice(colon_port.as_bytes());
|
||||||
|
|
||||||
FullyQualifiedAuthority(Authority::from_shared(normalized.freeze())
|
Some(FullyQualifiedAuthority(Authority::from_shared(normalized.freeze())
|
||||||
.expect("syntactically-valid authority"))
|
.expect("syntactically-valid authority")))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn without_trailing_dot(&self) -> &str {
|
pub fn without_trailing_dot(&self) -> &str {
|
||||||
|
|
@ -120,13 +139,30 @@ mod tests {
|
||||||
use http::uri::Authority;
|
use http::uri::Authority;
|
||||||
|
|
||||||
let input = Authority::from_shared(Bytes::from(input.as_bytes())).unwrap();
|
let input = Authority::from_shared(Bytes::from(input.as_bytes())).unwrap();
|
||||||
let output = super::FullyQualifiedAuthority::new(
|
let output = match super::FullyQualifiedAuthority::normalize(
|
||||||
&input, default_namespace, default_zone);
|
&input, default_namespace, default_zone) {
|
||||||
|
Some(output) => output,
|
||||||
|
None => panic!(
|
||||||
|
"unexpected None for input={:?}, default_namespace={:?}, default_zone={:?}",
|
||||||
|
input,
|
||||||
|
default_namespace,
|
||||||
|
default_zone
|
||||||
|
),
|
||||||
|
};
|
||||||
output.without_trailing_dot().into()
|
output.without_trailing_dot().into()
|
||||||
}
|
}
|
||||||
|
|
||||||
assert_eq!("name",
|
fn none(input: &str, default_namespace: Option<&str>,
|
||||||
f("name", None, None));
|
default_zone: Option<&str>) {
|
||||||
|
use bytes::Bytes;
|
||||||
|
use http::uri::Authority;
|
||||||
|
|
||||||
|
let input = Authority::from_shared(Bytes::from(input.as_bytes())).unwrap();
|
||||||
|
assert_eq!(None, super::FullyQualifiedAuthority::normalize(
|
||||||
|
&input, default_namespace, default_zone));
|
||||||
|
}
|
||||||
|
|
||||||
|
none("name", None, None);
|
||||||
assert_eq!("name.namespace.svc",
|
assert_eq!("name.namespace.svc",
|
||||||
f("name.namespace", None, None));
|
f("name.namespace", None, None));
|
||||||
assert_eq!("name.namespace.svc",
|
assert_eq!("name.namespace.svc",
|
||||||
|
|
@ -147,14 +183,12 @@ mod tests {
|
||||||
assert_eq!("name.namespace.svc.cluster.local",
|
assert_eq!("name.namespace.svc.cluster.local",
|
||||||
f("name.namespace.svc.cluster.local", Some("namespace"), None));
|
f("name.namespace.svc.cluster.local", Some("namespace"), None));
|
||||||
|
|
||||||
assert_eq!("name",
|
none("name", None, Some("cluster.local"));
|
||||||
f("name", None, Some("cluster.local")));
|
|
||||||
assert_eq!("name.namespace.svc.cluster.local",
|
assert_eq!("name.namespace.svc.cluster.local",
|
||||||
f("name.namespace", None, Some("cluster.local")));
|
f("name.namespace", None, Some("cluster.local")));
|
||||||
assert_eq!("name.namespace.svc.cluster.local",
|
assert_eq!("name.namespace.svc.cluster.local",
|
||||||
f("name.namespace.svc", None, Some("cluster.local")));
|
f("name.namespace.svc", None, Some("cluster.local")));
|
||||||
assert_eq!("name.namespace.svc.cluster",
|
none("name.namespace.svc.cluster", None, Some("cluster.local"));
|
||||||
f("name.namespace.svc.cluster", None, Some("cluster.local")));
|
|
||||||
assert_eq!("name.namespace.svc.cluster.local",
|
assert_eq!("name.namespace.svc.cluster.local",
|
||||||
f("name.namespace.svc.cluster.local", None, Some("cluster.local")));
|
f("name.namespace.svc.cluster.local", None, Some("cluster.local")));
|
||||||
|
|
||||||
|
|
@ -164,8 +198,7 @@ mod tests {
|
||||||
f("name.namespace", Some("namespace"), Some("cluster.local")));
|
f("name.namespace", Some("namespace"), Some("cluster.local")));
|
||||||
assert_eq!("name.namespace.svc.cluster.local",
|
assert_eq!("name.namespace.svc.cluster.local",
|
||||||
f("name.namespace.svc", Some("namespace"), Some("cluster.local")));
|
f("name.namespace.svc", Some("namespace"), Some("cluster.local")));
|
||||||
assert_eq!("name.namespace.svc.cluster",
|
none("name.namespace.svc.cluster", Some("namespace"), Some("cluster.local"));
|
||||||
f("name.namespace.svc.cluster", Some("namespace"), Some("cluster.local")));
|
|
||||||
assert_eq!("name.namespace.svc.cluster.local",
|
assert_eq!("name.namespace.svc.cluster.local",
|
||||||
f("name.namespace.svc.cluster.local", Some("namespace"), Some("cluster.local")));
|
f("name.namespace.svc.cluster.local", Some("namespace"), Some("cluster.local")));
|
||||||
|
|
||||||
|
|
@ -208,29 +241,23 @@ mod tests {
|
||||||
f("name.namespace:1234", Some("namespace"), Some("cluster.local")));
|
f("name.namespace:1234", Some("namespace"), Some("cluster.local")));
|
||||||
assert_eq!("name.namespace.svc.cluster.local:1234",
|
assert_eq!("name.namespace.svc.cluster.local:1234",
|
||||||
f("name.namespace.svc:1234", Some("namespace"), Some("cluster.local")));
|
f("name.namespace.svc:1234", Some("namespace"), Some("cluster.local")));
|
||||||
assert_eq!("name.namespace.svc.cluster:1234",
|
none("name.namespace.svc.cluster:1234", Some("namespace"), Some("cluster.local"));
|
||||||
f("name.namespace.svc.cluster:1234", Some("namespace"), Some("cluster.local")));
|
|
||||||
assert_eq!("name.namespace.svc.cluster.local:1234",
|
assert_eq!("name.namespace.svc.cluster.local:1234",
|
||||||
f("name.namespace.svc.cluster.local:1234", Some("namespace"), Some("cluster.local")));
|
f("name.namespace.svc.cluster.local:1234", Some("namespace"), Some("cluster.local")));
|
||||||
|
|
||||||
// "SVC" is recognized as being equivalent to "svc"
|
// "SVC" is recognized as being equivalent to "svc"
|
||||||
assert_eq!("name.namespace.SVC.cluster.local",
|
assert_eq!("name.namespace.SVC.cluster.local",
|
||||||
f("name.namespace.SVC", Some("namespace"), Some("cluster.local")));
|
f("name.namespace.SVC", Some("namespace"), Some("cluster.local")));
|
||||||
assert_eq!("name.namespace.SVC.cluster",
|
none("name.namespace.SVC.cluster", Some("namespace"), Some("cluster.local"));
|
||||||
f("name.namespace.SVC.cluster", Some("namespace"), Some("cluster.local")));
|
|
||||||
assert_eq!("name.namespace.SVC.cluster.local",
|
assert_eq!("name.namespace.SVC.cluster.local",
|
||||||
f("name.namespace.SVC.cluster.local", Some("namespace"), Some("cluster.local")));
|
f("name.namespace.SVC.cluster.local", Some("namespace"), Some("cluster.local")));
|
||||||
|
|
||||||
// IPv4 addresses are left unchanged.
|
// IPv4 addresses are left unchanged.
|
||||||
assert_eq!("1.2.3.4",
|
none("1.2.3.4", Some("namespace"), Some("cluster.local"));
|
||||||
f("1.2.3.4", Some("namespace"), Some("cluster.local")));
|
none("1.2.3.4:1234", Some("namespace"), Some("cluster.local"));
|
||||||
assert_eq!("1.2.3.4:1234",
|
|
||||||
f("1.2.3.4:1234", Some("namespace"), Some("cluster.local")));
|
|
||||||
|
|
||||||
// IPv6 addresses are left unchanged.
|
// IPv6 addresses are left unchanged.
|
||||||
assert_eq!("[::1]",
|
none("[::1]", Some("namespace"), Some("cluster.local"));
|
||||||
f("[::1]", Some("namespace"), Some("cluster.local")));
|
none("[::1]:1234", Some("namespace"), Some("cluster.local"));
|
||||||
assert_eq!("[::1]:1234",
|
|
||||||
f("[::1]:1234", Some("namespace"), Some("cluster.local")));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,21 +1,24 @@
|
||||||
|
use std::net::SocketAddr;
|
||||||
|
|
||||||
|
use futures::{Async, Poll};
|
||||||
use http;
|
use http;
|
||||||
use rand;
|
use rand;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tower;
|
use tower;
|
||||||
use tower_balance::{self, choose, load, Balance};
|
use tower_balance::{self, choose, load, Balance};
|
||||||
use tower_buffer::Buffer;
|
use tower_buffer::Buffer;
|
||||||
|
use tower_discover::{Change, Discover};
|
||||||
use tower_h2;
|
use tower_h2;
|
||||||
use conduit_proxy_router::Recognize;
|
use conduit_proxy_router::Recognize;
|
||||||
|
|
||||||
use bind::{self, Bind, Protocol};
|
use bind::{self, Bind, Protocol};
|
||||||
use control::{self, discovery};
|
use control::{self, discovery};
|
||||||
|
use control::discovery::Bind as BindTrait;
|
||||||
use ctx;
|
use ctx;
|
||||||
use fully_qualified_authority::FullyQualifiedAuthority;
|
use fully_qualified_authority::FullyQualifiedAuthority;
|
||||||
|
|
||||||
type BindProtocol<B> = bind::BindProtocol<Arc<ctx::Proxy>, B>;
|
type BindProtocol<B> = bind::BindProtocol<Arc<ctx::Proxy>, B>;
|
||||||
|
|
||||||
type Discovery<B> = discovery::Watch<BindProtocol<B>>;
|
|
||||||
|
|
||||||
pub struct Outbound<B> {
|
pub struct Outbound<B> {
|
||||||
bind: Bind<Arc<ctx::Proxy>, B>,
|
bind: Bind<Arc<ctx::Proxy>, B>,
|
||||||
discovery: control::Control,
|
discovery: control::Control,
|
||||||
|
|
@ -38,6 +41,12 @@ impl<B> Outbound<B> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
|
||||||
|
pub enum Destination {
|
||||||
|
LocalSvc(FullyQualifiedAuthority),
|
||||||
|
External(SocketAddr),
|
||||||
|
}
|
||||||
|
|
||||||
impl<B> Recognize for Outbound<B>
|
impl<B> Recognize for Outbound<B>
|
||||||
where
|
where
|
||||||
B: tower_h2::Body + 'static,
|
B: tower_h2::Body + 'static,
|
||||||
|
|
@ -45,7 +54,7 @@ where
|
||||||
type Request = http::Request<B>;
|
type Request = http::Request<B>;
|
||||||
type Response = bind::HttpResponse;
|
type Response = bind::HttpResponse;
|
||||||
type Error = <Self::Service as tower::Service>::Error;
|
type Error = <Self::Service as tower::Service>::Error;
|
||||||
type Key = (FullyQualifiedAuthority, Protocol);
|
type Key = (Destination, Protocol);
|
||||||
type RouteError = ();
|
type RouteError = ();
|
||||||
type Service = Buffer<Balance<
|
type Service = Buffer<Balance<
|
||||||
load::WithPendingRequests<Discovery<B>>,
|
load::WithPendingRequests<Discovery<B>>,
|
||||||
|
|
@ -53,18 +62,38 @@ where
|
||||||
>>;
|
>>;
|
||||||
|
|
||||||
fn recognize(&self, req: &Self::Request) -> Option<Self::Key> {
|
fn recognize(&self, req: &Self::Request) -> Option<Self::Key> {
|
||||||
req.uri().authority_part().map(|authority| {
|
let local = req.uri().authority_part().and_then(|authority| {
|
||||||
let auth = FullyQualifiedAuthority::new(
|
FullyQualifiedAuthority::normalize(
|
||||||
authority,
|
authority,
|
||||||
self.default_namespace.as_ref().map(|s| s.as_ref()),
|
self.default_namespace.as_ref().map(|s| s.as_ref()),
|
||||||
self.default_zone.as_ref().map(|s| s.as_ref()));
|
self.default_zone.as_ref().map(|s| s.as_ref()))
|
||||||
|
|
||||||
let proto = match req.version() {
|
});
|
||||||
http::Version::HTTP_2 => Protocol::Http2,
|
|
||||||
_ => Protocol::Http1,
|
// If we can't fully qualify the authority as a local service,
|
||||||
};
|
// and there is no original dst, then we have nothing! In that
|
||||||
(auth, proto)
|
// case, we return `None`, which results an "unrecognized" error.
|
||||||
})
|
//
|
||||||
|
// In practice, this shouldn't ever happen, since we expect the proxy
|
||||||
|
// to be run on Linux servers, with iptables setup, so there should
|
||||||
|
// always be an original destination.
|
||||||
|
let dest = if let Some(local) = local {
|
||||||
|
Destination::LocalSvc(local)
|
||||||
|
} else {
|
||||||
|
let orig_dst = req.extensions()
|
||||||
|
.get::<Arc<ctx::transport::Server>>()
|
||||||
|
.and_then(|ctx| {
|
||||||
|
ctx.orig_dst_if_not_local()
|
||||||
|
});
|
||||||
|
Destination::External(orig_dst?)
|
||||||
|
};
|
||||||
|
|
||||||
|
let proto = match req.version() {
|
||||||
|
http::Version::HTTP_2 => Protocol::Http2,
|
||||||
|
_ => Protocol::Http1,
|
||||||
|
};
|
||||||
|
|
||||||
|
Some((dest, proto))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Builds a dynamic, load balancing service.
|
/// Builds a dynamic, load balancing service.
|
||||||
|
|
@ -80,13 +109,20 @@ where
|
||||||
&mut self,
|
&mut self,
|
||||||
key: &Self::Key,
|
key: &Self::Key,
|
||||||
) -> Result<Self::Service, Self::RouteError> {
|
) -> Result<Self::Service, Self::RouteError> {
|
||||||
let &(ref authority, protocol) = key;
|
let &(ref dest, protocol) = key;
|
||||||
debug!("building outbound {:?} client to {:?}", protocol, authority);
|
debug!("building outbound {:?} client to {:?}", protocol, dest);
|
||||||
|
|
||||||
let resolve = self.discovery.resolve(
|
let resolve = match *dest {
|
||||||
authority,
|
Destination::LocalSvc(ref authority) => {
|
||||||
self.bind.clone().with_protocol(protocol),
|
Discovery::LocalSvc(self.discovery.resolve(
|
||||||
);
|
authority,
|
||||||
|
self.bind.clone().with_protocol(protocol),
|
||||||
|
))
|
||||||
|
},
|
||||||
|
Destination::External(addr) => {
|
||||||
|
Discovery::External(Some((addr, self.bind.clone().with_protocol(protocol))))
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
let loaded = tower_balance::load::WithPendingRequests::new(resolve);
|
let loaded = tower_balance::load::WithPendingRequests::new(resolve);
|
||||||
|
|
||||||
|
|
@ -99,3 +135,39 @@ where
|
||||||
Buffer::new(balance, self.bind.executor()).map_err(|_| {})
|
Buffer::new(balance, self.bind.executor()).map_err(|_| {})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub enum Discovery<B> {
|
||||||
|
LocalSvc(discovery::Watch<BindProtocol<B>>),
|
||||||
|
External(Option<(SocketAddr, BindProtocol<B>)>),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<B> Discover for Discovery<B>
|
||||||
|
where
|
||||||
|
B: tower_h2::Body + 'static,
|
||||||
|
{
|
||||||
|
type Key = SocketAddr;
|
||||||
|
type Request = http::Request<B>;
|
||||||
|
type Response = bind::HttpResponse;
|
||||||
|
type Error = <bind::Service<B> as tower::Service>::Error;
|
||||||
|
type Service = bind::Service<B>;
|
||||||
|
type DiscoverError = ();
|
||||||
|
|
||||||
|
fn poll(&mut self) -> Poll<Change<Self::Key, Self::Service>, Self::DiscoverError> {
|
||||||
|
match *self {
|
||||||
|
Discovery::LocalSvc(ref mut w) => w.poll(),
|
||||||
|
Discovery::External(ref mut opt) => {
|
||||||
|
// This "discovers" a single address for an external service
|
||||||
|
// that never has another change. This can mean it floats
|
||||||
|
// in the Balancer forever. However, when we finally add
|
||||||
|
// circuit-breaking, this should be able to take care of itself,
|
||||||
|
// closing down when the connection is no longer usable.
|
||||||
|
if let Some((addr, bind)) = opt.take() {
|
||||||
|
let svc = bind.bind(&addr)?;
|
||||||
|
Ok(Async::Ready(Change::Insert(addr, svc)))
|
||||||
|
} else {
|
||||||
|
Ok(Async::NotReady)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -7,10 +7,10 @@ fn outbound_asks_controller_api() {
|
||||||
|
|
||||||
let srv = server::new().route("/", "hello").route("/bye", "bye").run();
|
let srv = server::new().route("/", "hello").route("/bye", "bye").run();
|
||||||
let ctrl = controller::new()
|
let ctrl = controller::new()
|
||||||
.destination("test.conduit.local", srv.addr)
|
.destination("disco.test.svc.cluster.local", srv.addr)
|
||||||
.run();
|
.run();
|
||||||
let proxy = proxy::new().controller(ctrl).outbound(srv).run();
|
let proxy = proxy::new().controller(ctrl).outbound(srv).run();
|
||||||
let client = client::new(proxy.outbound, "test.conduit.local");
|
let client = client::new(proxy.outbound, "disco.test.svc.cluster.local");
|
||||||
|
|
||||||
assert_eq!(client.get("/"), "hello");
|
assert_eq!(client.get("/"), "hello");
|
||||||
assert_eq!(client.get("/bye"), "bye");
|
assert_eq!(client.get("/bye"), "bye");
|
||||||
|
|
@ -22,11 +22,11 @@ fn outbound_reconnects_if_controller_stream_ends() {
|
||||||
|
|
||||||
let srv = server::new().route("/recon", "nect").run();
|
let srv = server::new().route("/recon", "nect").run();
|
||||||
let ctrl = controller::new()
|
let ctrl = controller::new()
|
||||||
.destination_close("test.conduit.local")
|
.destination_close("disco.test.svc.cluster.local")
|
||||||
.destination("test.conduit.local", srv.addr)
|
.destination("disco.test.svc.cluster.local", srv.addr)
|
||||||
.run();
|
.run();
|
||||||
let proxy = proxy::new().controller(ctrl).outbound(srv).run();
|
let proxy = proxy::new().controller(ctrl).outbound(srv).run();
|
||||||
let client = client::new(proxy.outbound, "test.conduit.local");
|
let client = client::new(proxy.outbound, "disco.test.svc.cluster.local");
|
||||||
|
|
||||||
assert_eq!(client.get("/recon"), "nect");
|
assert_eq!(client.get("/recon"), "nect");
|
||||||
}
|
}
|
||||||
|
|
@ -39,18 +39,18 @@ fn outbound_updates_newer_services() {
|
||||||
//at the same time, do that here
|
//at the same time, do that here
|
||||||
let srv = server::http1().route("/h1", "hello h1").run();
|
let srv = server::http1().route("/h1", "hello h1").run();
|
||||||
let ctrl = controller::new()
|
let ctrl = controller::new()
|
||||||
.destination("test.conduit.local", srv.addr)
|
.destination("disco.test.svc.cluster.local", srv.addr)
|
||||||
.run();
|
.run();
|
||||||
let proxy = proxy::new().controller(ctrl).outbound(srv).run();
|
let proxy = proxy::new().controller(ctrl).outbound(srv).run();
|
||||||
// the HTTP2 service starts watching first, receiving an addr
|
// the HTTP2 service starts watching first, receiving an addr
|
||||||
// from the controller
|
// from the controller
|
||||||
let client1 = client::http2(proxy.outbound, "test.conduit.local");
|
let client1 = client::http2(proxy.outbound, "disco.test.svc.cluster.local");
|
||||||
client1.get("/h2"); // 500, ignore
|
client1.get("/h2"); // 500, ignore
|
||||||
|
|
||||||
// a new HTTP1 service needs to be build now, while the HTTP2
|
// a new HTTP1 service needs to be build now, while the HTTP2
|
||||||
// service already exists, so make sure previously sent addrs
|
// service already exists, so make sure previously sent addrs
|
||||||
// get into the newer service
|
// get into the newer service
|
||||||
let client2 = client::http1(proxy.outbound, "test.conduit.local");
|
let client2 = client::http1(proxy.outbound, "disco.test.svc.cluster.local");
|
||||||
assert_eq!(client2.get("/h1"), "hello h1");
|
assert_eq!(client2.get("/h1"), "hello h1");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -60,3 +60,25 @@ fn outbound_times_out() {
|
||||||
// Currently, the outbound router will wait forever until discovery tells
|
// Currently, the outbound router will wait forever until discovery tells
|
||||||
// it where to send the request. It should probably time out eventually.
|
// it where to send the request. It should probably time out eventually.
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn outbound_uses_orig_dst_if_not_local_svc() {
|
||||||
|
let _ = env_logger::init();
|
||||||
|
|
||||||
|
let srv = server::new()
|
||||||
|
.route("/", "hello")
|
||||||
|
.route("/bye", "bye")
|
||||||
|
.run();
|
||||||
|
let ctrl = controller::new()
|
||||||
|
// no controller rule for srv
|
||||||
|
.run();
|
||||||
|
let proxy = proxy::new()
|
||||||
|
.controller(ctrl)
|
||||||
|
// set outbound orig_dst to srv
|
||||||
|
.outbound(srv)
|
||||||
|
.run();
|
||||||
|
let client = client::new(proxy.outbound, "versioncheck.conduit.io");
|
||||||
|
|
||||||
|
assert_eq!(client.get("/"), "hello");
|
||||||
|
assert_eq!(client.get("/bye"), "bye");
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -111,6 +111,10 @@ fn run(proxy: Proxy) -> Listening {
|
||||||
env.put(config::ENV_PUBLIC_LISTENER, "tcp://127.0.0.1:0".to_owned());
|
env.put(config::ENV_PUBLIC_LISTENER, "tcp://127.0.0.1:0".to_owned());
|
||||||
env.put(config::ENV_CONTROL_LISTENER, "tcp://127.0.0.1:0".to_owned());
|
env.put(config::ENV_CONTROL_LISTENER, "tcp://127.0.0.1:0".to_owned());
|
||||||
|
|
||||||
|
env.put(config::ENV_POD_NAMESPACE, "test".to_owned());
|
||||||
|
env.put(config::ENV_POD_ZONE, "cluster.local".to_owned());
|
||||||
|
env.put(config::ENV_DESTINATIONS_AUTOCOMPLETE_FQDN, "Kubernetes".to_owned());
|
||||||
|
|
||||||
let mut config = config::Config::try_from(&env).unwrap();
|
let mut config = config::Config::try_from(&env).unwrap();
|
||||||
|
|
||||||
// TODO: We currently can't use `config::ENV_METRICS_FLUSH_INTERVAL_SECS`
|
// TODO: We currently can't use `config::ENV_METRICS_FLUSH_INTERVAL_SECS`
|
||||||
|
|
|
||||||
|
|
@ -18,7 +18,7 @@ fn inbound_sends_telemetry() {
|
||||||
.inbound(srv)
|
.inbound(srv)
|
||||||
.metrics_flush_interval(Duration::from_millis(500))
|
.metrics_flush_interval(Duration::from_millis(500))
|
||||||
.run();
|
.run();
|
||||||
let client = client::new(proxy.inbound, "test.conduit.local");
|
let client = client::new(proxy.inbound, "tele.test.svc.cluster.local");
|
||||||
|
|
||||||
info!("client.get(/hey)");
|
info!("client.get(/hey)");
|
||||||
assert_eq!(client.get("/hey"), "hello");
|
assert_eq!(client.get("/hey"), "hello");
|
||||||
|
|
@ -30,11 +30,11 @@ fn inbound_sends_telemetry() {
|
||||||
// process
|
// process
|
||||||
assert_eq!(report.process.as_ref().unwrap().node, "");
|
assert_eq!(report.process.as_ref().unwrap().node, "");
|
||||||
assert_eq!(report.process.as_ref().unwrap().scheduled_instance, "");
|
assert_eq!(report.process.as_ref().unwrap().scheduled_instance, "");
|
||||||
assert_eq!(report.process.as_ref().unwrap().scheduled_namespace, "");
|
assert_eq!(report.process.as_ref().unwrap().scheduled_namespace, "test");
|
||||||
// requests
|
// requests
|
||||||
assert_eq!(report.requests.len(), 1);
|
assert_eq!(report.requests.len(), 1);
|
||||||
let req = &report.requests[0];
|
let req = &report.requests[0];
|
||||||
assert_eq!(req.ctx.as_ref().unwrap().authority, "test.conduit.local");
|
assert_eq!(req.ctx.as_ref().unwrap().authority, "tele.test.svc.cluster.local");
|
||||||
//assert_eq!(req.ctx.as_ref().unwrap().method, GET);
|
//assert_eq!(req.ctx.as_ref().unwrap().method, GET);
|
||||||
assert_eq!(req.count, 1);
|
assert_eq!(req.count, 1);
|
||||||
assert_eq!(req.responses.len(), 1);
|
assert_eq!(req.responses.len(), 1);
|
||||||
|
|
@ -68,7 +68,7 @@ fn http1_inbound_sends_telemetry() {
|
||||||
.inbound(srv)
|
.inbound(srv)
|
||||||
.metrics_flush_interval(Duration::from_millis(500))
|
.metrics_flush_interval(Duration::from_millis(500))
|
||||||
.run();
|
.run();
|
||||||
let client = client::http1(proxy.inbound, "test.conduit.local");
|
let client = client::http1(proxy.inbound, "tele.test.svc.cluster.local");
|
||||||
|
|
||||||
info!("client.get(/hey)");
|
info!("client.get(/hey)");
|
||||||
assert_eq!(client.get("/hey"), "hello");
|
assert_eq!(client.get("/hey"), "hello");
|
||||||
|
|
@ -80,7 +80,7 @@ fn http1_inbound_sends_telemetry() {
|
||||||
// requests
|
// requests
|
||||||
assert_eq!(report.requests.len(), 1);
|
assert_eq!(report.requests.len(), 1);
|
||||||
let req = &report.requests[0];
|
let req = &report.requests[0];
|
||||||
assert_eq!(req.ctx.as_ref().unwrap().authority, "test.conduit.local");
|
assert_eq!(req.ctx.as_ref().unwrap().authority, "tele.test.svc.cluster.local");
|
||||||
//assert_eq!(req.ctx.as_ref().unwrap().method, GET);
|
//assert_eq!(req.ctx.as_ref().unwrap().method, GET);
|
||||||
assert_eq!(req.count, 1);
|
assert_eq!(req.count, 1);
|
||||||
assert_eq!(req.responses.len(), 1);
|
assert_eq!(req.responses.len(), 1);
|
||||||
|
|
@ -117,7 +117,7 @@ fn inbound_aggregates_telemetry_over_several_requests() {
|
||||||
.inbound(srv)
|
.inbound(srv)
|
||||||
.metrics_flush_interval(Duration::from_millis(500))
|
.metrics_flush_interval(Duration::from_millis(500))
|
||||||
.run();
|
.run();
|
||||||
let client = client::new(proxy.inbound, "test.conduit.local");
|
let client = client::new(proxy.inbound, "tele.test.svc.cluster.local");
|
||||||
|
|
||||||
info!("client.get(/hey)");
|
info!("client.get(/hey)");
|
||||||
assert_eq!(client.get("/hey"), "hello");
|
assert_eq!(client.get("/hey"), "hello");
|
||||||
|
|
@ -130,17 +130,13 @@ fn inbound_aggregates_telemetry_over_several_requests() {
|
||||||
let report = reports.wait().next().unwrap().unwrap();
|
let report = reports.wait().next().unwrap().unwrap();
|
||||||
// proxy inbound
|
// proxy inbound
|
||||||
assert_eq!(report.proxy, 0);
|
assert_eq!(report.proxy, 0);
|
||||||
// process
|
|
||||||
assert_eq!(report.process.as_ref().unwrap().node, "");
|
|
||||||
assert_eq!(report.process.as_ref().unwrap().scheduled_instance, "");
|
|
||||||
assert_eq!(report.process.as_ref().unwrap().scheduled_namespace, "");
|
|
||||||
|
|
||||||
// requests -----------------------
|
// requests -----------------------
|
||||||
assert_eq!(report.requests.len(), 2);
|
assert_eq!(report.requests.len(), 2);
|
||||||
|
|
||||||
// -- first request -----------------
|
// -- first request -----------------
|
||||||
let req = &report.requests[0];
|
let req = &report.requests[0];
|
||||||
assert_eq!(req.ctx.as_ref().unwrap().authority, "test.conduit.local");
|
assert_eq!(req.ctx.as_ref().unwrap().authority, "tele.test.svc.cluster.local");
|
||||||
assert_eq!(req.count, 1);
|
assert_eq!(req.count, 1);
|
||||||
assert_eq!(req.responses.len(), 1);
|
assert_eq!(req.responses.len(), 1);
|
||||||
// ---- response --------------------
|
// ---- response --------------------
|
||||||
|
|
@ -160,7 +156,7 @@ fn inbound_aggregates_telemetry_over_several_requests() {
|
||||||
|
|
||||||
// -- second request ----------------
|
// -- second request ----------------
|
||||||
let req = &report.requests[1];
|
let req = &report.requests[1];
|
||||||
assert_eq!(req.ctx.as_ref().unwrap().authority, "test.conduit.local");
|
assert_eq!(req.ctx.as_ref().unwrap().authority, "tele.test.svc.cluster.local");
|
||||||
// repeated twice
|
// repeated twice
|
||||||
assert_eq!(req.count, 2);
|
assert_eq!(req.count, 2);
|
||||||
assert_eq!(req.responses.len(), 1);
|
assert_eq!(req.responses.len(), 1);
|
||||||
|
|
@ -203,7 +199,7 @@ fn records_latency_statistics() {
|
||||||
.inbound(srv)
|
.inbound(srv)
|
||||||
.metrics_flush_interval(Duration::from_secs(5))
|
.metrics_flush_interval(Duration::from_secs(5))
|
||||||
.run();
|
.run();
|
||||||
let client = client::new(proxy.inbound, "test.conduit.local");
|
let client = client::new(proxy.inbound, "tele.test.svc.cluster.local");
|
||||||
|
|
||||||
info!("client.get(/hey)");
|
info!("client.get(/hey)");
|
||||||
assert_eq!(client.get("/hey"), "hello");
|
assert_eq!(client.get("/hey"), "hello");
|
||||||
|
|
@ -219,7 +215,7 @@ fn records_latency_statistics() {
|
||||||
assert_eq!(report.requests.len(), 2);
|
assert_eq!(report.requests.len(), 2);
|
||||||
// first request
|
// first request
|
||||||
let req = &report.requests[0];
|
let req = &report.requests[0];
|
||||||
assert_eq!(req.ctx.as_ref().unwrap().authority, "test.conduit.local");
|
assert_eq!(req.ctx.as_ref().unwrap().authority, "tele.test.svc.cluster.local");
|
||||||
let res = &req.responses[0];
|
let res = &req.responses[0];
|
||||||
// response latencies should always have a length equal to the number
|
// response latencies should always have a length equal to the number
|
||||||
// of latency buckets in the latency histogram.
|
// of latency buckets in the latency histogram.
|
||||||
|
|
@ -239,7 +235,7 @@ fn records_latency_statistics() {
|
||||||
|
|
||||||
// second request
|
// second request
|
||||||
let req = &report.requests.get(1).expect("second report");
|
let req = &report.requests.get(1).expect("second report");
|
||||||
assert_eq!(req.ctx.as_ref().unwrap().authority, "test.conduit.local");
|
assert_eq!(req.ctx.as_ref().unwrap().authority, "tele.test.svc.cluster.local");
|
||||||
assert_eq!(req.count, 2);
|
assert_eq!(req.count, 2);
|
||||||
assert_eq!(req.responses.len(), 1);
|
assert_eq!(req.responses.len(), 1);
|
||||||
let res = req.responses.get(0).expect("responses[0]");
|
let res = req.responses.get(0).expect("responses[0]");
|
||||||
|
|
|
||||||
|
|
@ -7,10 +7,10 @@ fn outbound_http1() {
|
||||||
|
|
||||||
let srv = server::http1().route("/", "hello h1").run();
|
let srv = server::http1().route("/", "hello h1").run();
|
||||||
let ctrl = controller::new()
|
let ctrl = controller::new()
|
||||||
.destination("test.conduit.local", srv.addr)
|
.destination("transparency.test.svc.cluster.local", srv.addr)
|
||||||
.run();
|
.run();
|
||||||
let proxy = proxy::new().controller(ctrl).outbound(srv).run();
|
let proxy = proxy::new().controller(ctrl).outbound(srv).run();
|
||||||
let client = client::http1(proxy.outbound, "test.conduit.local");
|
let client = client::http1(proxy.outbound, "transparency.test.svc.cluster.local");
|
||||||
|
|
||||||
assert_eq!(client.get("/"), "hello h1");
|
assert_eq!(client.get("/"), "hello h1");
|
||||||
}
|
}
|
||||||
|
|
@ -25,7 +25,7 @@ fn inbound_http1() {
|
||||||
.controller(ctrl)
|
.controller(ctrl)
|
||||||
.inbound(srv)
|
.inbound(srv)
|
||||||
.run();
|
.run();
|
||||||
let client = client::http1(proxy.inbound, "test.conduit.local");
|
let client = client::http1(proxy.inbound, "transparency.test.svc.cluster.local");
|
||||||
|
|
||||||
assert_eq!(client.get("/"), "hello h1");
|
assert_eq!(client.get("/"), "hello h1");
|
||||||
}
|
}
|
||||||
|
|
@ -70,7 +70,7 @@ fn http1_removes_connection_headers() {
|
||||||
.controller(ctrl)
|
.controller(ctrl)
|
||||||
.inbound(srv)
|
.inbound(srv)
|
||||||
.run();
|
.run();
|
||||||
let client = client::http1(proxy.inbound, "test.conduit.local");
|
let client = client::http1(proxy.inbound, "transparency.test.svc.cluster.local");
|
||||||
|
|
||||||
let res = client.request(client.request_builder("/")
|
let res = client.request(client.request_builder("/")
|
||||||
.header("x-foo-bar", "baz")
|
.header("x-foo-bar", "baz")
|
||||||
|
|
@ -84,7 +84,7 @@ fn http1_removes_connection_headers() {
|
||||||
fn http10_with_host() {
|
fn http10_with_host() {
|
||||||
let _ = env_logger::init();
|
let _ = env_logger::init();
|
||||||
|
|
||||||
let host = "test.conduit.local";
|
let host = "transparency.test.svc.cluster.local";
|
||||||
let srv = server::http1()
|
let srv = server::http1()
|
||||||
.route_fn("/", move |req| {
|
.route_fn("/", move |req| {
|
||||||
assert_eq!(req.version(), http::Version::HTTP_10);
|
assert_eq!(req.version(), http::Version::HTTP_10);
|
||||||
|
|
@ -144,7 +144,7 @@ fn http10_without_host() {
|
||||||
fn http11_absolute_uri_differs_from_host() {
|
fn http11_absolute_uri_differs_from_host() {
|
||||||
let _ = env_logger::init();
|
let _ = env_logger::init();
|
||||||
|
|
||||||
let host = "test.conduit.local";
|
let host = "transparency.test.svc.cluster.local";
|
||||||
let srv = server::http1()
|
let srv = server::http1()
|
||||||
.route_fn("/", move |req| {
|
.route_fn("/", move |req| {
|
||||||
assert_eq!(req.version(), http::Version::HTTP_11);
|
assert_eq!(req.version(), http::Version::HTTP_11);
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue