mirror of https://github.com/linkerd/linkerd2.git
Proxy: Map unqualified/partially-qualified names to FQDN (#59)
* Proxy: Map unqualified/partially-qualified names to FQDN Previously we required the service to fully qualify all service names for outbound traffic. Many services are written assuming that Kubernetes will complete names using its DNS search path, and those services weren't working with Conduit. Now add an option, used by default, to fully-qualify the domain names. Currently only Kubernetes-like name completion for services is supported, but the configuration syntax is open-ended to allow for alternatives in the future. Also, the auto-completion can be disabled for applications that prefer to ensure they're always using unambiguous names. Once routing is implemented then it is likely that (default) routing rules will replace these hard-coded rules. Unit tests for the name completion logic are included. Part of the solution for #9. The changes to `conduit inject` to actually use this facility will be in another PR.
This commit is contained in:
parent
6f9e8be2ee
commit
8385a7a8c1
|
@ -51,7 +51,17 @@ pub struct Config {
|
|||
|
||||
pub pod_name: Option<String>,
|
||||
pub pod_namespace: Option<String>,
|
||||
pub pod_zone: Option<String>,
|
||||
pub node_name: Option<String>,
|
||||
|
||||
/// Should we use `pod_namespace` and/or `pod_zone` to map unqualified/partially-qualified
|
||||
/// to fully-qualified names using the given platform's conventions?
|
||||
destinations_autocomplete_fqdn: Option<Environment>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
|
||||
enum Environment {
|
||||
Kubernetes,
|
||||
}
|
||||
|
||||
/// Configuration settings for binding a listener.
|
||||
|
@ -76,6 +86,7 @@ pub enum Error {
|
|||
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum ParseError {
|
||||
EnviromentUnsupported,
|
||||
NotANumber,
|
||||
HostIsNotAnIpAddress,
|
||||
NotUnicode,
|
||||
|
@ -132,6 +143,8 @@ const ENV_PUBLIC_CONNECT_TIMEOUT: &str = "CONDUIT_PROXY_PUBLIC_CONNECT_TIMEOUT";
|
|||
const ENV_NODE_NAME: &str = "CONDUIT_PROXY_NODE_NAME";
|
||||
const ENV_POD_NAME: &str = "CONDUIT_PROXY_POD_NAME";
|
||||
const ENV_POD_NAMESPACE: &str = "CONDUIT_PROXY_POD_NAMESPACE";
|
||||
const ENV_POD_ZONE: &str = "CONDUIT_PROXY_POD_ZONE";
|
||||
const ENV_DESTINATIONS_AUTOCOMPLETE_FQDN: &str = "CONDUIT_PROXY_DESTINATIONS_AUTOCOMPLETE_FQDN";
|
||||
|
||||
pub const ENV_CONTROL_URL: &str = "CONDUIT_PROXY_CONTROL_URL";
|
||||
const ENV_RESOLV_CONF: &str = "CONDUIT_RESOLV_CONF";
|
||||
|
@ -170,7 +183,10 @@ impl<'a> TryFrom<&'a Strings> for Config {
|
|||
let report_timeout = parse(strings, ENV_REPORT_TIMEOUT_SECS, parse_number);
|
||||
let pod_name = strings.get(ENV_POD_NAME);
|
||||
let pod_namespace = strings.get(ENV_POD_NAMESPACE);
|
||||
let pod_zone = strings.get(ENV_POD_ZONE);
|
||||
let node_name = strings.get(ENV_NODE_NAME);
|
||||
let destinations_autocomplete_fqdn =
|
||||
parse(strings, ENV_DESTINATIONS_AUTOCOMPLETE_FQDN, parse_environment);
|
||||
|
||||
Ok(Config {
|
||||
private_listener: Listener {
|
||||
|
@ -204,11 +220,29 @@ impl<'a> TryFrom<&'a Strings> for Config {
|
|||
Duration::from_secs(report_timeout?.unwrap_or(DEFAULT_REPORT_TIMEOUT_SECS)),
|
||||
pod_name: pod_name?,
|
||||
pod_namespace: pod_namespace?,
|
||||
pod_zone: pod_zone?,
|
||||
node_name: node_name?,
|
||||
destinations_autocomplete_fqdn: destinations_autocomplete_fqdn?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Config {
|
||||
pub fn default_destination_namespace(&self) -> Option<&String> {
|
||||
match self.destinations_autocomplete_fqdn {
|
||||
Some(Environment::Kubernetes) => self.pod_namespace.as_ref(),
|
||||
None => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn default_destination_zone(&self) -> Option<&String> {
|
||||
match self.destinations_autocomplete_fqdn {
|
||||
Some(Environment::Kubernetes) => self.pod_zone.as_ref(),
|
||||
None => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl Addr =====
|
||||
|
||||
impl FromStr for Addr {
|
||||
|
@ -275,6 +309,13 @@ impl Strings for TestEnv {
|
|||
|
||||
// ===== Parsing =====
|
||||
|
||||
fn parse_environment(s: &str) -> Result<Environment, ParseError> {
|
||||
match s {
|
||||
"Kubernetes" => Ok(Environment::Kubernetes),
|
||||
_ => Err(ParseError::EnviromentUnsupported),
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_number<T>(s: &str) -> Result<T, ParseError> where T: FromStr {
|
||||
s.parse().map_err(|_| ParseError::NotANumber)
|
||||
}
|
||||
|
|
|
@ -4,11 +4,12 @@ use std::net::SocketAddr;
|
|||
|
||||
use futures::{Async, Future, Poll, Stream};
|
||||
use futures::sync::mpsc;
|
||||
use http::uri::Authority;
|
||||
use tower::Service;
|
||||
use tower_discover::{Change, Discover};
|
||||
use tower_grpc;
|
||||
|
||||
use fully_qualified_authority::FullyQualifiedAuthority;
|
||||
|
||||
use super::codec::Protobuf;
|
||||
use super::pb::common::{Destination, TcpAddress};
|
||||
use super::pb::proxy::destination::Update as PbUpdate;
|
||||
|
@ -24,7 +25,7 @@ pub type ClientBody = ::tower_grpc::client::codec::EncodingBody<
|
|||
/// A handle to start watching a destination for address changes.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Discovery {
|
||||
tx: mpsc::UnboundedSender<(Authority, mpsc::UnboundedSender<Update>)>,
|
||||
tx: mpsc::UnboundedSender<(FullyQualifiedAuthority, mpsc::UnboundedSender<Update>)>,
|
||||
}
|
||||
|
||||
/// A `tower_discover::Discover`, given to a `tower_balance::Balance`.
|
||||
|
@ -37,7 +38,7 @@ pub struct Watch<B> {
|
|||
/// A background handle to eventually bind on the controller thread.
|
||||
#[derive(Debug)]
|
||||
pub struct Background {
|
||||
rx: mpsc::UnboundedReceiver<(Authority, mpsc::UnboundedSender<Update>)>,
|
||||
rx: mpsc::UnboundedReceiver<(FullyQualifiedAuthority, mpsc::UnboundedSender<Update>)>,
|
||||
}
|
||||
|
||||
type DiscoveryWatch<F> = DestinationSet<
|
||||
|
@ -51,14 +52,14 @@ type DiscoveryWatch<F> = DestinationSet<
|
|||
/// the controller destination API.
|
||||
#[derive(Debug)]
|
||||
pub struct DiscoveryWork<F> {
|
||||
destinations: HashMap<Authority, DiscoveryWatch<F>>,
|
||||
destinations: HashMap<FullyQualifiedAuthority, DiscoveryWatch<F>>,
|
||||
/// A queue of authorities that need to be reconnected.
|
||||
reconnects: VecDeque<Authority>,
|
||||
reconnects: VecDeque<FullyQualifiedAuthority>,
|
||||
/// The Destination.Get RPC client service.
|
||||
/// Each poll, records whether the rpc service was till ready.
|
||||
rpc_ready: bool,
|
||||
/// A receiver of new watch requests.
|
||||
rx: mpsc::UnboundedReceiver<(Authority, mpsc::UnboundedSender<Update>)>,
|
||||
rx: mpsc::UnboundedReceiver<(FullyQualifiedAuthority, mpsc::UnboundedSender<Update>)>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
@ -115,7 +116,7 @@ pub fn new() -> (Discovery, Background) {
|
|||
|
||||
impl Discovery {
|
||||
/// Start watching for address changes for a certain authority.
|
||||
pub fn resolve<B>(&self, authority: &Authority, bind: B) -> Watch<B> {
|
||||
pub fn resolve<B>(&self, authority: &FullyQualifiedAuthority, bind: B) -> Watch<B> {
|
||||
trace!("resolve; authority={:?}", authority);
|
||||
let (tx, rx) = mpsc::unbounded();
|
||||
self.tx
|
||||
|
@ -251,7 +252,7 @@ where
|
|||
Entry::Vacant(vac) => {
|
||||
let req = Destination {
|
||||
scheme: "k8s".into(),
|
||||
path: vac.key().as_str().into(),
|
||||
path: vac.key().without_trailing_dot().into(),
|
||||
};
|
||||
let stream = DestinationSvc::new(&mut rpc).get(req);
|
||||
vac.insert(DestinationSet {
|
||||
|
@ -292,7 +293,7 @@ where
|
|||
trace!("Destination.Get reconnect {:?}", auth);
|
||||
let req = Destination {
|
||||
scheme: "k8s".into(),
|
||||
path: auth.as_str().into(),
|
||||
path: auth.without_trailing_dot().into(),
|
||||
};
|
||||
set.rx = DestinationSvc::new(&mut rpc).get(req);
|
||||
set.needs_reconnect = false;
|
||||
|
@ -346,7 +347,7 @@ where
|
|||
};
|
||||
if needs_reconnect {
|
||||
set.needs_reconnect = true;
|
||||
self.reconnects.push_back(Authority::clone(auth));
|
||||
self.reconnects.push_back(FullyQualifiedAuthority::clone(auth));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@ use tower_reconnect::Reconnect;
|
|||
use url::HostAndPort;
|
||||
|
||||
use dns;
|
||||
use fully_qualified_authority::FullyQualifiedAuthority;
|
||||
use transport::LookupAddressAndConnect;
|
||||
use timeout::Timeout;
|
||||
|
||||
|
@ -57,7 +58,7 @@ pub fn new() -> (Control, Background) {
|
|||
// ===== impl Control =====
|
||||
|
||||
impl Control {
|
||||
pub fn resolve<B>(&self, auth: &http::uri::Authority, bind: B) -> Watch<B> {
|
||||
pub fn resolve<B>(&self, auth: &FullyQualifiedAuthority, bind: B) -> Watch<B> {
|
||||
self.disco.resolve(auth, bind)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,237 @@
|
|||
use bytes::BytesMut;
|
||||
|
||||
use std::ascii::AsciiExt;
|
||||
use std::net::IpAddr;
|
||||
use std::str::FromStr;
|
||||
|
||||
use http::uri::Authority;
|
||||
|
||||
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
|
||||
pub struct FullyQualifiedAuthority(Authority);
|
||||
|
||||
impl FullyQualifiedAuthority {
|
||||
/// Normalizes the name according to Kubernetes service naming conventions.
|
||||
/// Case folding is not done; that is done internally inside `Authority`.
|
||||
///
|
||||
/// This assumes the authority is syntactically valid.
|
||||
pub fn new(authority: &Authority, default_namespace: Option<&str>,
|
||||
default_zone: Option<&str>)
|
||||
-> FullyQualifiedAuthority {
|
||||
// Don't change IP-address-based authorities.
|
||||
if IpAddr::from_str(authority.host()).is_ok() {
|
||||
return FullyQualifiedAuthority(authority.clone())
|
||||
};
|
||||
|
||||
// TODO: `Authority` doesn't have a way to get the serialized form of the
|
||||
// port, so do it ourselves.
|
||||
let (name, colon_port) = {
|
||||
let authority = authority.as_str();
|
||||
match authority.rfind(':') {
|
||||
Some(p) => authority.split_at(p),
|
||||
None => (authority, ""),
|
||||
}
|
||||
};
|
||||
|
||||
// A fully qualified name ending with a dot is normalized by removing the
|
||||
// dot and doing nothing else.
|
||||
if name.ends_with('.') {
|
||||
let authority = authority.clone().into_bytes();
|
||||
let normalized = authority.slice(0, authority.len() - 1);
|
||||
return FullyQualifiedAuthority(Authority::from_shared(normalized).unwrap());
|
||||
}
|
||||
let mut parts = name.split('.');
|
||||
|
||||
// `Authority` guarantees the name has at least one part.
|
||||
assert!(parts.next().is_some());
|
||||
|
||||
// Rewrite "$name" -> "$name.$default_namespace".
|
||||
let has_explicit_namespace = parts.next().is_some();
|
||||
let namespace_to_append = if !has_explicit_namespace {
|
||||
default_namespace
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// Rewrite "$name.$namespace" -> "$name.$namespace.svc".
|
||||
let (has_svc, append_svc) = if let Some(part) = parts.next() {
|
||||
(part.eq_ignore_ascii_case("svc"), false)
|
||||
} else {
|
||||
let has_namespace =
|
||||
has_explicit_namespace || namespace_to_append.is_some();
|
||||
(has_namespace, has_namespace)
|
||||
};
|
||||
|
||||
// Rewrite "$name.$namespace.svc" -> "$name.$namespace.svc.$zone".
|
||||
let zone_to_append = if has_svc && parts.next().is_none() {
|
||||
default_zone
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let mut additional_len = 0;
|
||||
if let Some(namespace) = namespace_to_append {
|
||||
additional_len += 1 + namespace.len(); // "." + namespace
|
||||
}
|
||||
if append_svc {
|
||||
additional_len += 4; // ".svc"
|
||||
}
|
||||
if let Some(zone) = zone_to_append {
|
||||
additional_len += 1 + zone.len(); // "." + zone
|
||||
}
|
||||
|
||||
// If we're not going to change anything then don't allocate anything.
|
||||
if additional_len == 0 {
|
||||
return FullyQualifiedAuthority(authority.clone());
|
||||
}
|
||||
|
||||
// `authority.as_str().len()` includes the length of `colon_port`.
|
||||
let mut normalized =
|
||||
BytesMut::with_capacity(authority.as_str().len() + additional_len);
|
||||
normalized.extend_from_slice(name.as_bytes());
|
||||
if let Some(namespace) = namespace_to_append {
|
||||
normalized.extend_from_slice(b".");
|
||||
normalized.extend_from_slice(namespace.as_bytes());
|
||||
}
|
||||
if append_svc {
|
||||
normalized.extend_from_slice(b".svc");
|
||||
}
|
||||
if let Some(zone) = zone_to_append {
|
||||
normalized.extend_from_slice(b".");
|
||||
normalized.extend_from_slice(zone.as_bytes());
|
||||
}
|
||||
normalized.extend_from_slice(colon_port.as_bytes());
|
||||
|
||||
FullyQualifiedAuthority(Authority::from_shared(normalized.freeze())
|
||||
.expect("syntactically-valid authority"))
|
||||
}
|
||||
|
||||
pub fn without_trailing_dot(&self) -> &str {
|
||||
self.0.as_str()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
#[test]
|
||||
fn test_normalized_authority() {
|
||||
fn f(input: &str, default_namespace: Option<&str>,
|
||||
default_zone: Option<&str>)
|
||||
-> String {
|
||||
use bytes::Bytes;
|
||||
use http::uri::Authority;
|
||||
|
||||
let input = Authority::from_shared(Bytes::from(input.as_bytes())).unwrap();
|
||||
let output = super::FullyQualifiedAuthority::new(
|
||||
&input, default_namespace, default_zone);
|
||||
output.without_trailing_dot().into()
|
||||
}
|
||||
|
||||
assert_eq!("name",
|
||||
f("name", None, None));
|
||||
assert_eq!("name.namespace.svc",
|
||||
f("name.namespace", None, None));
|
||||
assert_eq!("name.namespace.svc",
|
||||
f("name.namespace.svc", None, None));
|
||||
assert_eq!("name.namespace.svc.cluster",
|
||||
f("name.namespace.svc.cluster", None, None));
|
||||
assert_eq!("name.namespace.svc.cluster.local",
|
||||
f("name.namespace.svc.cluster.local", None, None));
|
||||
|
||||
assert_eq!("name.namespace.svc",
|
||||
f("name", Some("namespace"), None));
|
||||
assert_eq!("name.namespace.svc",
|
||||
f("name.namespace", Some("namespace"), None));
|
||||
assert_eq!("name.namespace.svc",
|
||||
f("name.namespace.svc", Some("namespace"), None));
|
||||
assert_eq!("name.namespace.svc.cluster",
|
||||
f("name.namespace.svc.cluster", Some("namespace"), None));
|
||||
assert_eq!("name.namespace.svc.cluster.local",
|
||||
f("name.namespace.svc.cluster.local", Some("namespace"), None));
|
||||
|
||||
assert_eq!("name",
|
||||
f("name", None, Some("cluster.local")));
|
||||
assert_eq!("name.namespace.svc.cluster.local",
|
||||
f("name.namespace", None, Some("cluster.local")));
|
||||
assert_eq!("name.namespace.svc.cluster.local",
|
||||
f("name.namespace.svc", None, Some("cluster.local")));
|
||||
assert_eq!("name.namespace.svc.cluster",
|
||||
f("name.namespace.svc.cluster", None, Some("cluster.local")));
|
||||
assert_eq!("name.namespace.svc.cluster.local",
|
||||
f("name.namespace.svc.cluster.local", None, Some("cluster.local")));
|
||||
|
||||
assert_eq!("name.namespace.svc.cluster.local",
|
||||
f("name", Some("namespace"), Some("cluster.local")));
|
||||
assert_eq!("name.namespace.svc.cluster.local",
|
||||
f("name.namespace", Some("namespace"), Some("cluster.local")));
|
||||
assert_eq!("name.namespace.svc.cluster.local",
|
||||
f("name.namespace.svc", Some("namespace"), Some("cluster.local")));
|
||||
assert_eq!("name.namespace.svc.cluster",
|
||||
f("name.namespace.svc.cluster", Some("namespace"), Some("cluster.local")));
|
||||
assert_eq!("name.namespace.svc.cluster.local",
|
||||
f("name.namespace.svc.cluster.local", Some("namespace"), Some("cluster.local")));
|
||||
|
||||
// Fully-qualified names end with a dot and aren't modified except by removing the dot.
|
||||
assert_eq!("name",
|
||||
f("name.", None, None));
|
||||
assert_eq!("name.namespace",
|
||||
f("name.namespace.", None, None));
|
||||
assert_eq!("name.namespace.svc",
|
||||
f("name.namespace.svc.", None, None));
|
||||
assert_eq!("name.namespace.svc.cluster",
|
||||
f("name.namespace.svc.cluster.", None, None));
|
||||
assert_eq!("name.namespace.svc.cluster.local",
|
||||
f("name.namespace.svc.cluster.local.", None, None));
|
||||
assert_eq!("name",
|
||||
f("name.", Some("namespace"), None));
|
||||
assert_eq!("name.namespace",
|
||||
f("name.namespace.", Some("namespace"), None));
|
||||
assert_eq!("name.namespace.svc",
|
||||
f("name.namespace.svc.", Some("namespace"), None));
|
||||
assert_eq!("name.namespace.svc.cluster",
|
||||
f("name.namespace.svc.cluster.", Some("namespace"), None));
|
||||
assert_eq!("name.namespace.svc.cluster.local",
|
||||
f("name.namespace.svc.cluster.local.", Some("namespace"), None));
|
||||
assert_eq!("name",
|
||||
f("name.", Some("namespace"), Some("cluster.local")));
|
||||
assert_eq!("name.namespace",
|
||||
f("name.namespace.", Some("namespace"), Some("cluster.local")));
|
||||
assert_eq!("name.namespace.svc",
|
||||
f("name.namespace.svc.", Some("namespace"), Some("cluster.local")));
|
||||
assert_eq!("name.namespace.svc.cluster",
|
||||
f("name.namespace.svc.cluster.", Some("namespace"), Some("cluster.local")));
|
||||
assert_eq!("name.namespace.svc.cluster.local",
|
||||
f("name.namespace.svc.cluster.local.", Some("namespace"), Some("cluster.local")));
|
||||
|
||||
// Ports are preserved.
|
||||
assert_eq!("name.namespace.svc.cluster.local:1234",
|
||||
f("name:1234", Some("namespace"), Some("cluster.local")));
|
||||
assert_eq!("name.namespace.svc.cluster.local:1234",
|
||||
f("name.namespace:1234", Some("namespace"), Some("cluster.local")));
|
||||
assert_eq!("name.namespace.svc.cluster.local:1234",
|
||||
f("name.namespace.svc:1234", Some("namespace"), Some("cluster.local")));
|
||||
assert_eq!("name.namespace.svc.cluster:1234",
|
||||
f("name.namespace.svc.cluster:1234", Some("namespace"), Some("cluster.local")));
|
||||
assert_eq!("name.namespace.svc.cluster.local:1234",
|
||||
f("name.namespace.svc.cluster.local:1234", Some("namespace"), Some("cluster.local")));
|
||||
|
||||
// "SVC" is recognized as being equivalent to "svc"
|
||||
assert_eq!("name.namespace.SVC.cluster.local",
|
||||
f("name.namespace.SVC", Some("namespace"), Some("cluster.local")));
|
||||
assert_eq!("name.namespace.SVC.cluster",
|
||||
f("name.namespace.SVC.cluster", Some("namespace"), Some("cluster.local")));
|
||||
assert_eq!("name.namespace.SVC.cluster.local",
|
||||
f("name.namespace.SVC.cluster.local", Some("namespace"), Some("cluster.local")));
|
||||
|
||||
// IPv4 addresses are left unchanged.
|
||||
assert_eq!("1.2.3.4",
|
||||
f("1.2.3.4", Some("namespace"), Some("cluster.local")));
|
||||
assert_eq!("1.2.3.4:1234",
|
||||
f("1.2.3.4:1234", Some("namespace"), Some("cluster.local")));
|
||||
|
||||
// IPv6 addresses are left unchanged.
|
||||
assert_eq!("[::1]",
|
||||
f("[::1]", Some("namespace"), Some("cluster.local")));
|
||||
assert_eq!("[::1]:1234",
|
||||
f("[::1]:1234", Some("namespace"), Some("cluster.local")));
|
||||
}
|
||||
}
|
|
@ -62,6 +62,7 @@ pub mod control;
|
|||
pub mod convert;
|
||||
mod ctx;
|
||||
mod dns;
|
||||
mod fully_qualified_authority;
|
||||
mod inbound;
|
||||
mod logging;
|
||||
mod map_err;
|
||||
|
@ -204,10 +205,16 @@ impl Main {
|
|||
.map_or_else(|| bind.clone(), |t| bind.clone().with_connect_timeout(t))
|
||||
.with_ctx(ctx.clone());
|
||||
|
||||
let outgoing = Outbound::new(
|
||||
bind,
|
||||
control,
|
||||
config.default_destination_namespace().cloned(),
|
||||
config.default_destination_zone().cloned());
|
||||
|
||||
let fut = serve(
|
||||
outbound_listener,
|
||||
h2::server::Builder::default(),
|
||||
Outbound::new(bind, control),
|
||||
outgoing,
|
||||
ctx,
|
||||
sensors,
|
||||
&executor,
|
||||
|
|
|
@ -11,6 +11,7 @@ use tower_router::Recognize;
|
|||
use bind::Bind;
|
||||
use control;
|
||||
use ctx;
|
||||
use fully_qualified_authority::FullyQualifiedAuthority;
|
||||
use telemetry;
|
||||
use transport;
|
||||
|
||||
|
@ -29,15 +30,21 @@ type Error = tower_buffer::Error<
|
|||
pub struct Outbound<B> {
|
||||
bind: Bind<Arc<ctx::Proxy>, B>,
|
||||
discovery: control::Control,
|
||||
default_namespace: Option<String>,
|
||||
default_zone: Option<String>,
|
||||
}
|
||||
|
||||
// ===== impl Outbound =====
|
||||
|
||||
impl<B> Outbound<B> {
|
||||
pub fn new(bind: Bind<Arc<ctx::Proxy>, B>, discovery: control::Control) -> Self {
|
||||
pub fn new(bind: Bind<Arc<ctx::Proxy>, B>, discovery: control::Control,
|
||||
default_namespace: Option<String>, default_zone: Option<String>)
|
||||
-> Outbound<B> {
|
||||
Self {
|
||||
bind,
|
||||
discovery,
|
||||
default_namespace,
|
||||
default_zone,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -49,12 +56,17 @@ where
|
|||
type Request = http::Request<B>;
|
||||
type Response = http::Response<telemetry::sensor::http::ResponseBody<tower_h2::RecvBody>>;
|
||||
type Error = Error;
|
||||
type Key = http::uri::Authority;
|
||||
type Key = FullyQualifiedAuthority;
|
||||
type RouteError = ();
|
||||
type Service = Buffer<Balance<Discovery<B>>>;
|
||||
|
||||
fn recognize(&self, req: &Self::Request) -> Option<Self::Key> {
|
||||
req.uri().authority_part().cloned()
|
||||
req.uri().authority_part().map(|authority|
|
||||
FullyQualifiedAuthority::new(
|
||||
authority,
|
||||
self.default_namespace.as_ref().map(|s| s.as_ref()),
|
||||
self.default_zone.as_ref().map(|s| s.as_ref()))
|
||||
)
|
||||
}
|
||||
|
||||
/// Builds a dynamic, load balancing service.
|
||||
|
@ -68,7 +80,7 @@ where
|
|||
/// changed.
|
||||
fn bind_service(
|
||||
&mut self,
|
||||
authority: &http::uri::Authority,
|
||||
authority: &FullyQualifiedAuthority,
|
||||
) -> Result<Self::Service, Self::RouteError> {
|
||||
debug!("building outbound client to {:?}", authority);
|
||||
|
||||
|
|
Loading…
Reference in New Issue