mirror of https://github.com/linkerd/linkerd2.git
proxy: use original dst if authority doesnt look like local service (#397)
The proxy will check that the requested authority looks like a local service, and if it doesn't, it will no longer ask the Destination service about the request, instead just using the SO_ORIGINAL_DST, enabling egress naturally. The rules used to determine if it looks like a local service come from this comment: > If default_zone.is_none() and the name is in the form $a.$b.svc, or if !default_zone.is_none() and the name is in the form $a.$b.svc.$default_zone, for some a and some b, then use the Destination service. Otherwise, use the IP given.
This commit is contained in:
parent
b9b16195b8
commit
236f71fbe0
|
@ -142,9 +142,9 @@ const ENV_PUBLIC_CONNECT_TIMEOUT: &str = "CONDUIT_PROXY_PUBLIC_CONNECT_TIMEOUT";
|
|||
|
||||
const ENV_NODE_NAME: &str = "CONDUIT_PROXY_NODE_NAME";
|
||||
const ENV_POD_NAME: &str = "CONDUIT_PROXY_POD_NAME";
|
||||
const ENV_POD_NAMESPACE: &str = "CONDUIT_PROXY_POD_NAMESPACE";
|
||||
const ENV_POD_ZONE: &str = "CONDUIT_PROXY_POD_ZONE";
|
||||
const ENV_DESTINATIONS_AUTOCOMPLETE_FQDN: &str = "CONDUIT_PROXY_DESTINATIONS_AUTOCOMPLETE_FQDN";
|
||||
pub const ENV_POD_NAMESPACE: &str = "CONDUIT_PROXY_POD_NAMESPACE";
|
||||
pub const ENV_POD_ZONE: &str = "CONDUIT_PROXY_POD_ZONE";
|
||||
pub const ENV_DESTINATIONS_AUTOCOMPLETE_FQDN: &str = "CONDUIT_PROXY_DESTINATIONS_AUTOCOMPLETE_FQDN";
|
||||
|
||||
pub const ENV_CONTROL_URL: &str = "CONDUIT_PROXY_CONTROL_URL";
|
||||
const ENV_RESOLV_CONF: &str = "CONDUIT_RESOLV_CONF";
|
||||
|
|
|
@ -13,12 +13,14 @@ impl FullyQualifiedAuthority {
|
|||
/// Case folding is not done; that is done internally inside `Authority`.
|
||||
///
|
||||
/// This assumes the authority is syntactically valid.
|
||||
pub fn new(authority: &Authority, default_namespace: Option<&str>,
|
||||
///
|
||||
/// Returns `None` is authority doesn't look like a local Kubernetes service.
|
||||
pub fn normalize(authority: &Authority, default_namespace: Option<&str>,
|
||||
default_zone: Option<&str>)
|
||||
-> FullyQualifiedAuthority {
|
||||
-> Option<FullyQualifiedAuthority> {
|
||||
// Don't change IP-address-based authorities.
|
||||
if IpAddr::from_str(authority.host()).is_ok() {
|
||||
return FullyQualifiedAuthority(authority.clone())
|
||||
return None;
|
||||
};
|
||||
|
||||
// TODO: `Authority` doesn't have a way to get the serialized form of the
|
||||
|
@ -36,9 +38,11 @@ impl FullyQualifiedAuthority {
|
|||
if name.ends_with('.') {
|
||||
let authority = authority.clone().into_bytes();
|
||||
let normalized = authority.slice(0, authority.len() - 1);
|
||||
return FullyQualifiedAuthority(Authority::from_shared(normalized).unwrap());
|
||||
return Some(FullyQualifiedAuthority(Authority::from_shared(normalized).unwrap()));
|
||||
}
|
||||
let mut parts = name.split('.');
|
||||
|
||||
// parts should have a maximum 4 of pieces (name, namespace, svc, zone)
|
||||
let mut parts = name.splitn(4, '.');
|
||||
|
||||
// `Authority` guarantees the name has at least one part.
|
||||
assert!(parts.next().is_some());
|
||||
|
@ -52,19 +56,34 @@ impl FullyQualifiedAuthority {
|
|||
};
|
||||
|
||||
// Rewrite "$name.$namespace" -> "$name.$namespace.svc".
|
||||
let (has_svc, append_svc) = if let Some(part) = parts.next() {
|
||||
(part.eq_ignore_ascii_case("svc"), false)
|
||||
let append_svc = if let Some(part) = parts.next() {
|
||||
if !part.eq_ignore_ascii_case("svc") {
|
||||
// if not "$name.$namespace.svc", treat as external
|
||||
return None;
|
||||
}
|
||||
|
||||
false
|
||||
} else if has_explicit_namespace {
|
||||
true
|
||||
} else if namespace_to_append.is_none() {
|
||||
// We can't append ".svc" without a namespace, so treat as external.
|
||||
return None;
|
||||
} else {
|
||||
let has_namespace =
|
||||
has_explicit_namespace || namespace_to_append.is_some();
|
||||
(has_namespace, has_namespace)
|
||||
true
|
||||
};
|
||||
|
||||
// Rewrite "$name.$namespace.svc" -> "$name.$namespace.svc.$zone".
|
||||
let zone_to_append = if has_svc && parts.next().is_none() {
|
||||
default_zone
|
||||
} else {
|
||||
let zone_to_append = if let Some(zone) = parts.next() {
|
||||
if let Some(default_zone) = default_zone {
|
||||
if !zone.eq_ignore_ascii_case(default_zone) {
|
||||
// if "a.b.svc.foo" and zone is not "foo",
|
||||
// treat as external
|
||||
return None;
|
||||
}
|
||||
}
|
||||
None
|
||||
} else {
|
||||
default_zone
|
||||
};
|
||||
|
||||
let mut additional_len = 0;
|
||||
|
@ -80,7 +99,7 @@ impl FullyQualifiedAuthority {
|
|||
|
||||
// If we're not going to change anything then don't allocate anything.
|
||||
if additional_len == 0 {
|
||||
return FullyQualifiedAuthority(authority.clone());
|
||||
return Some(FullyQualifiedAuthority(authority.clone()));
|
||||
}
|
||||
|
||||
// `authority.as_str().len()` includes the length of `colon_port`.
|
||||
|
@ -100,8 +119,8 @@ impl FullyQualifiedAuthority {
|
|||
}
|
||||
normalized.extend_from_slice(colon_port.as_bytes());
|
||||
|
||||
FullyQualifiedAuthority(Authority::from_shared(normalized.freeze())
|
||||
.expect("syntactically-valid authority"))
|
||||
Some(FullyQualifiedAuthority(Authority::from_shared(normalized.freeze())
|
||||
.expect("syntactically-valid authority")))
|
||||
}
|
||||
|
||||
pub fn without_trailing_dot(&self) -> &str {
|
||||
|
@ -120,13 +139,30 @@ mod tests {
|
|||
use http::uri::Authority;
|
||||
|
||||
let input = Authority::from_shared(Bytes::from(input.as_bytes())).unwrap();
|
||||
let output = super::FullyQualifiedAuthority::new(
|
||||
&input, default_namespace, default_zone);
|
||||
let output = match super::FullyQualifiedAuthority::normalize(
|
||||
&input, default_namespace, default_zone) {
|
||||
Some(output) => output,
|
||||
None => panic!(
|
||||
"unexpected None for input={:?}, default_namespace={:?}, default_zone={:?}",
|
||||
input,
|
||||
default_namespace,
|
||||
default_zone
|
||||
),
|
||||
};
|
||||
output.without_trailing_dot().into()
|
||||
}
|
||||
|
||||
assert_eq!("name",
|
||||
f("name", None, None));
|
||||
fn none(input: &str, default_namespace: Option<&str>,
|
||||
default_zone: Option<&str>) {
|
||||
use bytes::Bytes;
|
||||
use http::uri::Authority;
|
||||
|
||||
let input = Authority::from_shared(Bytes::from(input.as_bytes())).unwrap();
|
||||
assert_eq!(None, super::FullyQualifiedAuthority::normalize(
|
||||
&input, default_namespace, default_zone));
|
||||
}
|
||||
|
||||
none("name", None, None);
|
||||
assert_eq!("name.namespace.svc",
|
||||
f("name.namespace", None, None));
|
||||
assert_eq!("name.namespace.svc",
|
||||
|
@ -147,14 +183,12 @@ mod tests {
|
|||
assert_eq!("name.namespace.svc.cluster.local",
|
||||
f("name.namespace.svc.cluster.local", Some("namespace"), None));
|
||||
|
||||
assert_eq!("name",
|
||||
f("name", None, Some("cluster.local")));
|
||||
none("name", None, Some("cluster.local"));
|
||||
assert_eq!("name.namespace.svc.cluster.local",
|
||||
f("name.namespace", None, Some("cluster.local")));
|
||||
assert_eq!("name.namespace.svc.cluster.local",
|
||||
f("name.namespace.svc", None, Some("cluster.local")));
|
||||
assert_eq!("name.namespace.svc.cluster",
|
||||
f("name.namespace.svc.cluster", None, Some("cluster.local")));
|
||||
none("name.namespace.svc.cluster", None, Some("cluster.local"));
|
||||
assert_eq!("name.namespace.svc.cluster.local",
|
||||
f("name.namespace.svc.cluster.local", None, Some("cluster.local")));
|
||||
|
||||
|
@ -164,8 +198,7 @@ mod tests {
|
|||
f("name.namespace", Some("namespace"), Some("cluster.local")));
|
||||
assert_eq!("name.namespace.svc.cluster.local",
|
||||
f("name.namespace.svc", Some("namespace"), Some("cluster.local")));
|
||||
assert_eq!("name.namespace.svc.cluster",
|
||||
f("name.namespace.svc.cluster", Some("namespace"), Some("cluster.local")));
|
||||
none("name.namespace.svc.cluster", Some("namespace"), Some("cluster.local"));
|
||||
assert_eq!("name.namespace.svc.cluster.local",
|
||||
f("name.namespace.svc.cluster.local", Some("namespace"), Some("cluster.local")));
|
||||
|
||||
|
@ -208,29 +241,23 @@ mod tests {
|
|||
f("name.namespace:1234", Some("namespace"), Some("cluster.local")));
|
||||
assert_eq!("name.namespace.svc.cluster.local:1234",
|
||||
f("name.namespace.svc:1234", Some("namespace"), Some("cluster.local")));
|
||||
assert_eq!("name.namespace.svc.cluster:1234",
|
||||
f("name.namespace.svc.cluster:1234", Some("namespace"), Some("cluster.local")));
|
||||
none("name.namespace.svc.cluster:1234", Some("namespace"), Some("cluster.local"));
|
||||
assert_eq!("name.namespace.svc.cluster.local:1234",
|
||||
f("name.namespace.svc.cluster.local:1234", Some("namespace"), Some("cluster.local")));
|
||||
|
||||
// "SVC" is recognized as being equivalent to "svc"
|
||||
assert_eq!("name.namespace.SVC.cluster.local",
|
||||
f("name.namespace.SVC", Some("namespace"), Some("cluster.local")));
|
||||
assert_eq!("name.namespace.SVC.cluster",
|
||||
f("name.namespace.SVC.cluster", Some("namespace"), Some("cluster.local")));
|
||||
none("name.namespace.SVC.cluster", Some("namespace"), Some("cluster.local"));
|
||||
assert_eq!("name.namespace.SVC.cluster.local",
|
||||
f("name.namespace.SVC.cluster.local", Some("namespace"), Some("cluster.local")));
|
||||
|
||||
// IPv4 addresses are left unchanged.
|
||||
assert_eq!("1.2.3.4",
|
||||
f("1.2.3.4", Some("namespace"), Some("cluster.local")));
|
||||
assert_eq!("1.2.3.4:1234",
|
||||
f("1.2.3.4:1234", Some("namespace"), Some("cluster.local")));
|
||||
none("1.2.3.4", Some("namespace"), Some("cluster.local"));
|
||||
none("1.2.3.4:1234", Some("namespace"), Some("cluster.local"));
|
||||
|
||||
// IPv6 addresses are left unchanged.
|
||||
assert_eq!("[::1]",
|
||||
f("[::1]", Some("namespace"), Some("cluster.local")));
|
||||
assert_eq!("[::1]:1234",
|
||||
f("[::1]:1234", Some("namespace"), Some("cluster.local")));
|
||||
none("[::1]", Some("namespace"), Some("cluster.local"));
|
||||
none("[::1]:1234", Some("namespace"), Some("cluster.local"));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,21 +1,24 @@
|
|||
use std::net::SocketAddr;
|
||||
|
||||
use futures::{Async, Poll};
|
||||
use http;
|
||||
use rand;
|
||||
use std::sync::Arc;
|
||||
use tower;
|
||||
use tower_balance::{self, choose, load, Balance};
|
||||
use tower_buffer::Buffer;
|
||||
use tower_discover::{Change, Discover};
|
||||
use tower_h2;
|
||||
use conduit_proxy_router::Recognize;
|
||||
|
||||
use bind::{self, Bind, Protocol};
|
||||
use control::{self, discovery};
|
||||
use control::discovery::Bind as BindTrait;
|
||||
use ctx;
|
||||
use fully_qualified_authority::FullyQualifiedAuthority;
|
||||
|
||||
type BindProtocol<B> = bind::BindProtocol<Arc<ctx::Proxy>, B>;
|
||||
|
||||
type Discovery<B> = discovery::Watch<BindProtocol<B>>;
|
||||
|
||||
pub struct Outbound<B> {
|
||||
bind: Bind<Arc<ctx::Proxy>, B>,
|
||||
discovery: control::Control,
|
||||
|
@ -38,6 +41,12 @@ impl<B> Outbound<B> {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
|
||||
pub enum Destination {
|
||||
LocalSvc(FullyQualifiedAuthority),
|
||||
External(SocketAddr),
|
||||
}
|
||||
|
||||
impl<B> Recognize for Outbound<B>
|
||||
where
|
||||
B: tower_h2::Body + 'static,
|
||||
|
@ -45,7 +54,7 @@ where
|
|||
type Request = http::Request<B>;
|
||||
type Response = bind::HttpResponse;
|
||||
type Error = <Self::Service as tower::Service>::Error;
|
||||
type Key = (FullyQualifiedAuthority, Protocol);
|
||||
type Key = (Destination, Protocol);
|
||||
type RouteError = ();
|
||||
type Service = Buffer<Balance<
|
||||
load::WithPendingRequests<Discovery<B>>,
|
||||
|
@ -53,18 +62,38 @@ where
|
|||
>>;
|
||||
|
||||
fn recognize(&self, req: &Self::Request) -> Option<Self::Key> {
|
||||
req.uri().authority_part().map(|authority| {
|
||||
let auth = FullyQualifiedAuthority::new(
|
||||
let local = req.uri().authority_part().and_then(|authority| {
|
||||
FullyQualifiedAuthority::normalize(
|
||||
authority,
|
||||
self.default_namespace.as_ref().map(|s| s.as_ref()),
|
||||
self.default_zone.as_ref().map(|s| s.as_ref()));
|
||||
self.default_zone.as_ref().map(|s| s.as_ref()))
|
||||
|
||||
let proto = match req.version() {
|
||||
http::Version::HTTP_2 => Protocol::Http2,
|
||||
_ => Protocol::Http1,
|
||||
};
|
||||
(auth, proto)
|
||||
})
|
||||
});
|
||||
|
||||
// If we can't fully qualify the authority as a local service,
|
||||
// and there is no original dst, then we have nothing! In that
|
||||
// case, we return `None`, which results an "unrecognized" error.
|
||||
//
|
||||
// In practice, this shouldn't ever happen, since we expect the proxy
|
||||
// to be run on Linux servers, with iptables setup, so there should
|
||||
// always be an original destination.
|
||||
let dest = if let Some(local) = local {
|
||||
Destination::LocalSvc(local)
|
||||
} else {
|
||||
let orig_dst = req.extensions()
|
||||
.get::<Arc<ctx::transport::Server>>()
|
||||
.and_then(|ctx| {
|
||||
ctx.orig_dst_if_not_local()
|
||||
});
|
||||
Destination::External(orig_dst?)
|
||||
};
|
||||
|
||||
let proto = match req.version() {
|
||||
http::Version::HTTP_2 => Protocol::Http2,
|
||||
_ => Protocol::Http1,
|
||||
};
|
||||
|
||||
Some((dest, proto))
|
||||
}
|
||||
|
||||
/// Builds a dynamic, load balancing service.
|
||||
|
@ -80,13 +109,20 @@ where
|
|||
&mut self,
|
||||
key: &Self::Key,
|
||||
) -> Result<Self::Service, Self::RouteError> {
|
||||
let &(ref authority, protocol) = key;
|
||||
debug!("building outbound {:?} client to {:?}", protocol, authority);
|
||||
let &(ref dest, protocol) = key;
|
||||
debug!("building outbound {:?} client to {:?}", protocol, dest);
|
||||
|
||||
let resolve = self.discovery.resolve(
|
||||
authority,
|
||||
self.bind.clone().with_protocol(protocol),
|
||||
);
|
||||
let resolve = match *dest {
|
||||
Destination::LocalSvc(ref authority) => {
|
||||
Discovery::LocalSvc(self.discovery.resolve(
|
||||
authority,
|
||||
self.bind.clone().with_protocol(protocol),
|
||||
))
|
||||
},
|
||||
Destination::External(addr) => {
|
||||
Discovery::External(Some((addr, self.bind.clone().with_protocol(protocol))))
|
||||
}
|
||||
};
|
||||
|
||||
let loaded = tower_balance::load::WithPendingRequests::new(resolve);
|
||||
|
||||
|
@ -99,3 +135,39 @@ where
|
|||
Buffer::new(balance, self.bind.executor()).map_err(|_| {})
|
||||
}
|
||||
}
|
||||
|
||||
pub enum Discovery<B> {
|
||||
LocalSvc(discovery::Watch<BindProtocol<B>>),
|
||||
External(Option<(SocketAddr, BindProtocol<B>)>),
|
||||
}
|
||||
|
||||
impl<B> Discover for Discovery<B>
|
||||
where
|
||||
B: tower_h2::Body + 'static,
|
||||
{
|
||||
type Key = SocketAddr;
|
||||
type Request = http::Request<B>;
|
||||
type Response = bind::HttpResponse;
|
||||
type Error = <bind::Service<B> as tower::Service>::Error;
|
||||
type Service = bind::Service<B>;
|
||||
type DiscoverError = ();
|
||||
|
||||
fn poll(&mut self) -> Poll<Change<Self::Key, Self::Service>, Self::DiscoverError> {
|
||||
match *self {
|
||||
Discovery::LocalSvc(ref mut w) => w.poll(),
|
||||
Discovery::External(ref mut opt) => {
|
||||
// This "discovers" a single address for an external service
|
||||
// that never has another change. This can mean it floats
|
||||
// in the Balancer forever. However, when we finally add
|
||||
// circuit-breaking, this should be able to take care of itself,
|
||||
// closing down when the connection is no longer usable.
|
||||
if let Some((addr, bind)) = opt.take() {
|
||||
let svc = bind.bind(&addr)?;
|
||||
Ok(Async::Ready(Change::Insert(addr, svc)))
|
||||
} else {
|
||||
Ok(Async::NotReady)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -7,10 +7,10 @@ fn outbound_asks_controller_api() {
|
|||
|
||||
let srv = server::new().route("/", "hello").route("/bye", "bye").run();
|
||||
let ctrl = controller::new()
|
||||
.destination("test.conduit.local", srv.addr)
|
||||
.destination("disco.test.svc.cluster.local", srv.addr)
|
||||
.run();
|
||||
let proxy = proxy::new().controller(ctrl).outbound(srv).run();
|
||||
let client = client::new(proxy.outbound, "test.conduit.local");
|
||||
let client = client::new(proxy.outbound, "disco.test.svc.cluster.local");
|
||||
|
||||
assert_eq!(client.get("/"), "hello");
|
||||
assert_eq!(client.get("/bye"), "bye");
|
||||
|
@ -22,11 +22,11 @@ fn outbound_reconnects_if_controller_stream_ends() {
|
|||
|
||||
let srv = server::new().route("/recon", "nect").run();
|
||||
let ctrl = controller::new()
|
||||
.destination_close("test.conduit.local")
|
||||
.destination("test.conduit.local", srv.addr)
|
||||
.destination_close("disco.test.svc.cluster.local")
|
||||
.destination("disco.test.svc.cluster.local", srv.addr)
|
||||
.run();
|
||||
let proxy = proxy::new().controller(ctrl).outbound(srv).run();
|
||||
let client = client::new(proxy.outbound, "test.conduit.local");
|
||||
let client = client::new(proxy.outbound, "disco.test.svc.cluster.local");
|
||||
|
||||
assert_eq!(client.get("/recon"), "nect");
|
||||
}
|
||||
|
@ -39,18 +39,18 @@ fn outbound_updates_newer_services() {
|
|||
//at the same time, do that here
|
||||
let srv = server::http1().route("/h1", "hello h1").run();
|
||||
let ctrl = controller::new()
|
||||
.destination("test.conduit.local", srv.addr)
|
||||
.destination("disco.test.svc.cluster.local", srv.addr)
|
||||
.run();
|
||||
let proxy = proxy::new().controller(ctrl).outbound(srv).run();
|
||||
// the HTTP2 service starts watching first, receiving an addr
|
||||
// from the controller
|
||||
let client1 = client::http2(proxy.outbound, "test.conduit.local");
|
||||
let client1 = client::http2(proxy.outbound, "disco.test.svc.cluster.local");
|
||||
client1.get("/h2"); // 500, ignore
|
||||
|
||||
// a new HTTP1 service needs to be build now, while the HTTP2
|
||||
// service already exists, so make sure previously sent addrs
|
||||
// get into the newer service
|
||||
let client2 = client::http1(proxy.outbound, "test.conduit.local");
|
||||
let client2 = client::http1(proxy.outbound, "disco.test.svc.cluster.local");
|
||||
assert_eq!(client2.get("/h1"), "hello h1");
|
||||
}
|
||||
|
||||
|
@ -60,3 +60,25 @@ fn outbound_times_out() {
|
|||
// Currently, the outbound router will wait forever until discovery tells
|
||||
// it where to send the request. It should probably time out eventually.
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn outbound_uses_orig_dst_if_not_local_svc() {
|
||||
let _ = env_logger::init();
|
||||
|
||||
let srv = server::new()
|
||||
.route("/", "hello")
|
||||
.route("/bye", "bye")
|
||||
.run();
|
||||
let ctrl = controller::new()
|
||||
// no controller rule for srv
|
||||
.run();
|
||||
let proxy = proxy::new()
|
||||
.controller(ctrl)
|
||||
// set outbound orig_dst to srv
|
||||
.outbound(srv)
|
||||
.run();
|
||||
let client = client::new(proxy.outbound, "versioncheck.conduit.io");
|
||||
|
||||
assert_eq!(client.get("/"), "hello");
|
||||
assert_eq!(client.get("/bye"), "bye");
|
||||
}
|
||||
|
|
|
@ -111,6 +111,10 @@ fn run(proxy: Proxy) -> Listening {
|
|||
env.put(config::ENV_PUBLIC_LISTENER, "tcp://127.0.0.1:0".to_owned());
|
||||
env.put(config::ENV_CONTROL_LISTENER, "tcp://127.0.0.1:0".to_owned());
|
||||
|
||||
env.put(config::ENV_POD_NAMESPACE, "test".to_owned());
|
||||
env.put(config::ENV_POD_ZONE, "cluster.local".to_owned());
|
||||
env.put(config::ENV_DESTINATIONS_AUTOCOMPLETE_FQDN, "Kubernetes".to_owned());
|
||||
|
||||
let mut config = config::Config::try_from(&env).unwrap();
|
||||
|
||||
// TODO: We currently can't use `config::ENV_METRICS_FLUSH_INTERVAL_SECS`
|
||||
|
|
|
@ -18,7 +18,7 @@ fn inbound_sends_telemetry() {
|
|||
.inbound(srv)
|
||||
.metrics_flush_interval(Duration::from_millis(500))
|
||||
.run();
|
||||
let client = client::new(proxy.inbound, "test.conduit.local");
|
||||
let client = client::new(proxy.inbound, "tele.test.svc.cluster.local");
|
||||
|
||||
info!("client.get(/hey)");
|
||||
assert_eq!(client.get("/hey"), "hello");
|
||||
|
@ -30,11 +30,11 @@ fn inbound_sends_telemetry() {
|
|||
// process
|
||||
assert_eq!(report.process.as_ref().unwrap().node, "");
|
||||
assert_eq!(report.process.as_ref().unwrap().scheduled_instance, "");
|
||||
assert_eq!(report.process.as_ref().unwrap().scheduled_namespace, "");
|
||||
assert_eq!(report.process.as_ref().unwrap().scheduled_namespace, "test");
|
||||
// requests
|
||||
assert_eq!(report.requests.len(), 1);
|
||||
let req = &report.requests[0];
|
||||
assert_eq!(req.ctx.as_ref().unwrap().authority, "test.conduit.local");
|
||||
assert_eq!(req.ctx.as_ref().unwrap().authority, "tele.test.svc.cluster.local");
|
||||
//assert_eq!(req.ctx.as_ref().unwrap().method, GET);
|
||||
assert_eq!(req.count, 1);
|
||||
assert_eq!(req.responses.len(), 1);
|
||||
|
@ -68,7 +68,7 @@ fn http1_inbound_sends_telemetry() {
|
|||
.inbound(srv)
|
||||
.metrics_flush_interval(Duration::from_millis(500))
|
||||
.run();
|
||||
let client = client::http1(proxy.inbound, "test.conduit.local");
|
||||
let client = client::http1(proxy.inbound, "tele.test.svc.cluster.local");
|
||||
|
||||
info!("client.get(/hey)");
|
||||
assert_eq!(client.get("/hey"), "hello");
|
||||
|
@ -80,7 +80,7 @@ fn http1_inbound_sends_telemetry() {
|
|||
// requests
|
||||
assert_eq!(report.requests.len(), 1);
|
||||
let req = &report.requests[0];
|
||||
assert_eq!(req.ctx.as_ref().unwrap().authority, "test.conduit.local");
|
||||
assert_eq!(req.ctx.as_ref().unwrap().authority, "tele.test.svc.cluster.local");
|
||||
//assert_eq!(req.ctx.as_ref().unwrap().method, GET);
|
||||
assert_eq!(req.count, 1);
|
||||
assert_eq!(req.responses.len(), 1);
|
||||
|
@ -117,7 +117,7 @@ fn inbound_aggregates_telemetry_over_several_requests() {
|
|||
.inbound(srv)
|
||||
.metrics_flush_interval(Duration::from_millis(500))
|
||||
.run();
|
||||
let client = client::new(proxy.inbound, "test.conduit.local");
|
||||
let client = client::new(proxy.inbound, "tele.test.svc.cluster.local");
|
||||
|
||||
info!("client.get(/hey)");
|
||||
assert_eq!(client.get("/hey"), "hello");
|
||||
|
@ -130,17 +130,13 @@ fn inbound_aggregates_telemetry_over_several_requests() {
|
|||
let report = reports.wait().next().unwrap().unwrap();
|
||||
// proxy inbound
|
||||
assert_eq!(report.proxy, 0);
|
||||
// process
|
||||
assert_eq!(report.process.as_ref().unwrap().node, "");
|
||||
assert_eq!(report.process.as_ref().unwrap().scheduled_instance, "");
|
||||
assert_eq!(report.process.as_ref().unwrap().scheduled_namespace, "");
|
||||
|
||||
// requests -----------------------
|
||||
assert_eq!(report.requests.len(), 2);
|
||||
|
||||
// -- first request -----------------
|
||||
let req = &report.requests[0];
|
||||
assert_eq!(req.ctx.as_ref().unwrap().authority, "test.conduit.local");
|
||||
assert_eq!(req.ctx.as_ref().unwrap().authority, "tele.test.svc.cluster.local");
|
||||
assert_eq!(req.count, 1);
|
||||
assert_eq!(req.responses.len(), 1);
|
||||
// ---- response --------------------
|
||||
|
@ -160,7 +156,7 @@ fn inbound_aggregates_telemetry_over_several_requests() {
|
|||
|
||||
// -- second request ----------------
|
||||
let req = &report.requests[1];
|
||||
assert_eq!(req.ctx.as_ref().unwrap().authority, "test.conduit.local");
|
||||
assert_eq!(req.ctx.as_ref().unwrap().authority, "tele.test.svc.cluster.local");
|
||||
// repeated twice
|
||||
assert_eq!(req.count, 2);
|
||||
assert_eq!(req.responses.len(), 1);
|
||||
|
@ -203,7 +199,7 @@ fn records_latency_statistics() {
|
|||
.inbound(srv)
|
||||
.metrics_flush_interval(Duration::from_secs(5))
|
||||
.run();
|
||||
let client = client::new(proxy.inbound, "test.conduit.local");
|
||||
let client = client::new(proxy.inbound, "tele.test.svc.cluster.local");
|
||||
|
||||
info!("client.get(/hey)");
|
||||
assert_eq!(client.get("/hey"), "hello");
|
||||
|
@ -219,7 +215,7 @@ fn records_latency_statistics() {
|
|||
assert_eq!(report.requests.len(), 2);
|
||||
// first request
|
||||
let req = &report.requests[0];
|
||||
assert_eq!(req.ctx.as_ref().unwrap().authority, "test.conduit.local");
|
||||
assert_eq!(req.ctx.as_ref().unwrap().authority, "tele.test.svc.cluster.local");
|
||||
let res = &req.responses[0];
|
||||
// response latencies should always have a length equal to the number
|
||||
// of latency buckets in the latency histogram.
|
||||
|
@ -239,7 +235,7 @@ fn records_latency_statistics() {
|
|||
|
||||
// second request
|
||||
let req = &report.requests.get(1).expect("second report");
|
||||
assert_eq!(req.ctx.as_ref().unwrap().authority, "test.conduit.local");
|
||||
assert_eq!(req.ctx.as_ref().unwrap().authority, "tele.test.svc.cluster.local");
|
||||
assert_eq!(req.count, 2);
|
||||
assert_eq!(req.responses.len(), 1);
|
||||
let res = req.responses.get(0).expect("responses[0]");
|
||||
|
|
|
@ -7,10 +7,10 @@ fn outbound_http1() {
|
|||
|
||||
let srv = server::http1().route("/", "hello h1").run();
|
||||
let ctrl = controller::new()
|
||||
.destination("test.conduit.local", srv.addr)
|
||||
.destination("transparency.test.svc.cluster.local", srv.addr)
|
||||
.run();
|
||||
let proxy = proxy::new().controller(ctrl).outbound(srv).run();
|
||||
let client = client::http1(proxy.outbound, "test.conduit.local");
|
||||
let client = client::http1(proxy.outbound, "transparency.test.svc.cluster.local");
|
||||
|
||||
assert_eq!(client.get("/"), "hello h1");
|
||||
}
|
||||
|
@ -25,7 +25,7 @@ fn inbound_http1() {
|
|||
.controller(ctrl)
|
||||
.inbound(srv)
|
||||
.run();
|
||||
let client = client::http1(proxy.inbound, "test.conduit.local");
|
||||
let client = client::http1(proxy.inbound, "transparency.test.svc.cluster.local");
|
||||
|
||||
assert_eq!(client.get("/"), "hello h1");
|
||||
}
|
||||
|
@ -70,7 +70,7 @@ fn http1_removes_connection_headers() {
|
|||
.controller(ctrl)
|
||||
.inbound(srv)
|
||||
.run();
|
||||
let client = client::http1(proxy.inbound, "test.conduit.local");
|
||||
let client = client::http1(proxy.inbound, "transparency.test.svc.cluster.local");
|
||||
|
||||
let res = client.request(client.request_builder("/")
|
||||
.header("x-foo-bar", "baz")
|
||||
|
@ -84,7 +84,7 @@ fn http1_removes_connection_headers() {
|
|||
fn http10_with_host() {
|
||||
let _ = env_logger::init();
|
||||
|
||||
let host = "test.conduit.local";
|
||||
let host = "transparency.test.svc.cluster.local";
|
||||
let srv = server::http1()
|
||||
.route_fn("/", move |req| {
|
||||
assert_eq!(req.version(), http::Version::HTTP_10);
|
||||
|
@ -144,7 +144,7 @@ fn http10_without_host() {
|
|||
fn http11_absolute_uri_differs_from_host() {
|
||||
let _ = env_logger::init();
|
||||
|
||||
let host = "test.conduit.local";
|
||||
let host = "transparency.test.svc.cluster.local";
|
||||
let srv = server::http1()
|
||||
.route_fn("/", move |req| {
|
||||
assert_eq!(req.version(), http::Version::HTTP_11);
|
||||
|
|
Loading…
Reference in New Issue