mirror of https://github.com/linkerd/linkerd2.git
Add support for consumer routes (#11118)
The [xRoute Binding KEP](https://gateway-api.sigs.k8s.io/geps/gep-1426/#namespace-boundaries) states that HttpRoutes may be created in either the namespace of their parent Service (producer routes) or in the namespace of the client initiating requests to the service (consumer routes). Linkerd currently only indexes producer routes and ignores consumer routes. We add support for consumer routes by changing the way that HttpRoutes are indexed. We now index each route by the namespace of its parent service instead of by the namespace of the HttpRoute resource. We then further subdivide the `ServiceRoutes` struct to have a watch per-client-namespace instead of just a single watch. This is because clients from different namespaces will have a different view of the routes for a service. When an HttpRoute is updated, if it is a producer route, we apply that HttpRoute to watches for all of the client namespaces. If the route was a consumer route, then we only apply the HttpRoute to watches for that consumer namespace. We also add API tests for consumer and producer routes. A few noteworthy changes: * Because the namespace of the client factors into the lookup, we had to change the discovery target to a type which includes the client namespace. * Because a service may have routes from different namespaces, the route metadata now needs to track group, kind, name, AND namespace instead of just using the namespace of the service. This means that many uses of the `GroupKindName` type are replaced with a `GroupKindNamespaceName` type. Signed-off-by: Alex Leong <alex@buoyant.io>
This commit is contained in:
parent
f7e2ff4931
commit
393c22553b
|
@ -132,7 +132,7 @@ displayed.`,
|
|||
client := outbound.NewOutboundPoliciesClient(conn)
|
||||
|
||||
result, err = client.Get(cmd.Context(), &outbound.TrafficSpec{
|
||||
SourceWorkload: "diagnostics",
|
||||
SourceWorkload: "default:diagnostics",
|
||||
Target: &outbound.TrafficSpec_Authority{Authority: fmt.Sprintf("%s.%s.svc:%d", name, namespace, port)},
|
||||
})
|
||||
if err != nil {
|
||||
|
|
|
@ -15,6 +15,14 @@ pub struct GroupKindName {
|
|||
pub name: Cow<'static, str>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
|
||||
pub struct GroupKindNamespaceName {
|
||||
pub group: Cow<'static, str>,
|
||||
pub kind: Cow<'static, str>,
|
||||
pub namespace: Cow<'static, str>,
|
||||
pub name: Cow<'static, str>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
pub enum HostMatch {
|
||||
Exact(String),
|
||||
|
@ -107,6 +115,15 @@ impl GroupKindName {
|
|||
&& self.kind.eq_ignore_ascii_case(&other.kind)
|
||||
&& self.name.eq_ignore_ascii_case(&other.name)
|
||||
}
|
||||
|
||||
pub fn namespaced(self, namespace: String) -> GroupKindNamespaceName {
|
||||
GroupKindNamespaceName {
|
||||
group: self.group,
|
||||
kind: self.kind,
|
||||
namespace: namespace.into(),
|
||||
name: self.name,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// === impl PathMatch ===
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
use crate::http_route::{
|
||||
GroupKindName, HostMatch, HttpRouteMatch, RequestHeaderModifierFilter, RequestRedirectFilter,
|
||||
GroupKindNamespaceName, HostMatch, HttpRouteMatch, RequestHeaderModifierFilter,
|
||||
RequestRedirectFilter,
|
||||
};
|
||||
use ahash::AHashMap as HashMap;
|
||||
use anyhow::Result;
|
||||
|
@ -14,14 +15,21 @@ pub trait DiscoverOutboundPolicy<T> {
|
|||
|
||||
async fn watch_outbound_policy(&self, target: T) -> Result<Option<OutboundPolicyStream>>;
|
||||
|
||||
fn lookup_ip(&self, addr: IpAddr, port: NonZeroU16) -> Option<T>;
|
||||
fn lookup_ip(&self, addr: IpAddr, port: NonZeroU16, source_namespace: String) -> Option<T>;
|
||||
}
|
||||
|
||||
pub type OutboundPolicyStream = Pin<Box<dyn Stream<Item = OutboundPolicy> + Send + Sync + 'static>>;
|
||||
|
||||
pub struct OutboundDiscoverTarget {
|
||||
pub service_name: String,
|
||||
pub service_namespace: String,
|
||||
pub service_port: NonZeroU16,
|
||||
pub source_namespace: String,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub struct OutboundPolicy {
|
||||
pub http_routes: HashMap<GroupKindName, HttpRoute>,
|
||||
pub http_routes: HashMap<GroupKindNamespaceName, HttpRoute>,
|
||||
pub authority: String,
|
||||
pub name: String,
|
||||
pub namespace: String,
|
||||
|
|
|
@ -9,10 +9,10 @@ use linkerd2_proxy_api::{
|
|||
},
|
||||
};
|
||||
use linkerd_policy_controller_core::{
|
||||
http_route::GroupKindName,
|
||||
http_route::GroupKindNamespaceName,
|
||||
outbound::{
|
||||
Backend, DiscoverOutboundPolicy, Filter, HttpRoute, HttpRouteRule, OutboundPolicy,
|
||||
OutboundPolicyStream,
|
||||
Backend, DiscoverOutboundPolicy, Filter, HttpRoute, HttpRouteRule, OutboundDiscoverTarget,
|
||||
OutboundPolicy, OutboundPolicyStream,
|
||||
},
|
||||
};
|
||||
use std::{net::SocketAddr, num::NonZeroU16, sync::Arc, time};
|
||||
|
@ -27,7 +27,7 @@ pub struct OutboundPolicyServer<T> {
|
|||
|
||||
impl<T> OutboundPolicyServer<T>
|
||||
where
|
||||
T: DiscoverOutboundPolicy<(String, String, NonZeroU16)> + Send + Sync + 'static,
|
||||
T: DiscoverOutboundPolicy<OutboundDiscoverTarget> + Send + Sync + 'static,
|
||||
{
|
||||
pub fn new(discover: T, cluster_domain: impl Into<Arc<str>>, drain: drain::Watch) -> Self {
|
||||
Self {
|
||||
|
@ -41,16 +41,33 @@ where
|
|||
OutboundPoliciesServer::new(self)
|
||||
}
|
||||
|
||||
fn lookup(
|
||||
&self,
|
||||
spec: outbound::TrafficSpec,
|
||||
) -> Result<(String, String, NonZeroU16), tonic::Status> {
|
||||
fn lookup(&self, spec: outbound::TrafficSpec) -> Result<OutboundDiscoverTarget, tonic::Status> {
|
||||
let target = spec
|
||||
.target
|
||||
.ok_or_else(|| tonic::Status::invalid_argument("target is required"))?;
|
||||
let source_namespace = spec
|
||||
.source_workload
|
||||
.split_once(':')
|
||||
.ok_or_else(|| {
|
||||
tonic::Status::invalid_argument(format!(
|
||||
"failed to parse source workload: {}",
|
||||
spec.source_workload
|
||||
))
|
||||
})?
|
||||
.0
|
||||
.to_string();
|
||||
let target = match target {
|
||||
outbound::traffic_spec::Target::Addr(target) => target,
|
||||
outbound::traffic_spec::Target::Authority(auth) => return self.lookup_authority(&auth),
|
||||
outbound::traffic_spec::Target::Authority(auth) => {
|
||||
return self.lookup_authority(&auth).map(
|
||||
|(service_namespace, service_name, service_port)| OutboundDiscoverTarget {
|
||||
service_name,
|
||||
service_namespace,
|
||||
service_port,
|
||||
source_namespace,
|
||||
},
|
||||
)
|
||||
}
|
||||
};
|
||||
|
||||
let port = target
|
||||
|
@ -69,7 +86,7 @@ where
|
|||
})?;
|
||||
|
||||
self.index
|
||||
.lookup_ip(addr, port)
|
||||
.lookup_ip(addr, port, source_namespace)
|
||||
.ok_or_else(|| tonic::Status::not_found("No such service"))
|
||||
}
|
||||
|
||||
|
@ -119,7 +136,7 @@ where
|
|||
#[async_trait::async_trait]
|
||||
impl<T> OutboundPolicies for OutboundPolicyServer<T>
|
||||
where
|
||||
T: DiscoverOutboundPolicy<(String, String, NonZeroU16)> + Send + Sync + 'static,
|
||||
T: DiscoverOutboundPolicy<OutboundDiscoverTarget> + Send + Sync + 'static,
|
||||
{
|
||||
async fn get(
|
||||
&self,
|
||||
|
@ -215,9 +232,7 @@ fn to_service(outbound: OutboundPolicy) -> outbound::OutboundPolicy {
|
|||
|
||||
let mut http_routes: Vec<_> = http_routes
|
||||
.into_iter()
|
||||
.map(|(gkn, route)| {
|
||||
convert_outbound_http_route(outbound.namespace.clone(), gkn, route, backend.clone())
|
||||
})
|
||||
.map(|(gknn, route)| convert_outbound_http_route(gknn, route, backend.clone()))
|
||||
.collect();
|
||||
|
||||
if http_routes.is_empty() {
|
||||
|
@ -291,8 +306,7 @@ fn to_service(outbound: OutboundPolicy) -> outbound::OutboundPolicy {
|
|||
}
|
||||
|
||||
fn convert_outbound_http_route(
|
||||
namespace: String,
|
||||
gkn: GroupKindName,
|
||||
gknn: GroupKindNamespaceName,
|
||||
HttpRoute {
|
||||
hostnames,
|
||||
rules,
|
||||
|
@ -302,10 +316,10 @@ fn convert_outbound_http_route(
|
|||
) -> outbound::HttpRoute {
|
||||
let metadata = Some(Metadata {
|
||||
kind: Some(metadata::Kind::Resource(api::meta::Resource {
|
||||
group: gkn.group.to_string(),
|
||||
kind: gkn.kind.to_string(),
|
||||
namespace,
|
||||
name: gkn.name.to_string(),
|
||||
group: gknn.group.to_string(),
|
||||
kind: gknn.kind.to_string(),
|
||||
namespace: gknn.namespace.to_string(),
|
||||
name: gknn.name.to_string(),
|
||||
..Default::default()
|
||||
})),
|
||||
});
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use anyhow::{anyhow, bail, Result};
|
||||
use k8s_gateway_api as api;
|
||||
use kube::{Resource, ResourceExt};
|
||||
use linkerd_policy_controller_core::http_route::{self, GroupKindName};
|
||||
use linkerd_policy_controller_core::http_route::{self, GroupKindName, GroupKindNamespaceName};
|
||||
use linkerd_policy_controller_k8s_api::policy;
|
||||
use std::num::NonZeroU16;
|
||||
|
||||
|
@ -44,10 +44,12 @@ impl HttpRouteResource {
|
|||
}
|
||||
}
|
||||
|
||||
pub(crate) fn gkn(&self) -> GroupKindName {
|
||||
pub(crate) fn gknn(&self) -> GroupKindNamespaceName {
|
||||
match self {
|
||||
HttpRouteResource::Linkerd(route) => gkn_for_resource(route),
|
||||
HttpRouteResource::Gateway(route) => gkn_for_resource(route),
|
||||
HttpRouteResource::Linkerd(route) => gkn_for_resource(route)
|
||||
.namespaced(route.namespace().expect("Route must have namespace")),
|
||||
HttpRouteResource::Gateway(route) => gkn_for_resource(route)
|
||||
.namespaced(route.namespace().expect("Route must have namespace")),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -7,7 +7,7 @@ use ahash::AHashMap as HashMap;
|
|||
use anyhow::{bail, ensure, Result};
|
||||
use k8s_gateway_api::{BackendObjectReference, HttpBackendRef, ParentReference};
|
||||
use linkerd_policy_controller_core::{
|
||||
http_route::GroupKindName,
|
||||
http_route::GroupKindNamespaceName,
|
||||
outbound::{
|
||||
Backend, Backoff, FailureAccrual, Filter, HttpRoute, HttpRouteRule, OutboundPolicy,
|
||||
WeightedService,
|
||||
|
@ -48,7 +48,7 @@ struct Namespace {
|
|||
service_port_routes: HashMap<ServicePort, ServiceRoutes>,
|
||||
/// Stores the route resources (by service name) that do not
|
||||
/// explicitly target a port.
|
||||
service_routes: HashMap<String, HashMap<GroupKindName, HttpRoute>>,
|
||||
service_routes: HashMap<String, HashMap<GroupKindNamespaceName, HttpRoute>>,
|
||||
namespace: Arc<String>,
|
||||
}
|
||||
|
||||
|
@ -66,21 +66,32 @@ struct ServicePort {
|
|||
|
||||
#[derive(Debug)]
|
||||
struct ServiceRoutes {
|
||||
routes: HashMap<GroupKindName, HttpRoute>,
|
||||
watch: watch::Sender<OutboundPolicy>,
|
||||
namespace: Arc<String>,
|
||||
name: String,
|
||||
port: NonZeroU16,
|
||||
authority: String,
|
||||
watches_by_ns: HashMap<String, RoutesWatch>,
|
||||
opaque: bool,
|
||||
accrual: Option<FailureAccrual>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct RoutesWatch {
|
||||
opaque: bool,
|
||||
accrual: Option<FailureAccrual>,
|
||||
routes: HashMap<GroupKindNamespaceName, HttpRoute>,
|
||||
watch: watch::Sender<OutboundPolicy>,
|
||||
}
|
||||
|
||||
impl kubert::index::IndexNamespacedResource<api::HttpRoute> for Index {
|
||||
fn apply(&mut self, route: api::HttpRoute) {
|
||||
self.apply(HttpRouteResource::Linkerd(route))
|
||||
}
|
||||
|
||||
fn delete(&mut self, namespace: String, name: String) {
|
||||
if let Some(ns_index) = self.namespaces.by_ns.get_mut(&namespace) {
|
||||
let gkn = gkn_for_linkerd_http_route(name);
|
||||
ns_index.delete(gkn);
|
||||
let gknn = gkn_for_linkerd_http_route(name).namespaced(namespace);
|
||||
for ns_index in self.namespaces.by_ns.values_mut() {
|
||||
ns_index.delete(&gknn);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -91,9 +102,9 @@ impl kubert::index::IndexNamespacedResource<k8s_gateway_api::HttpRoute> for Inde
|
|||
}
|
||||
|
||||
fn delete(&mut self, namespace: String, name: String) {
|
||||
if let Some(ns_index) = self.namespaces.by_ns.get_mut(&namespace) {
|
||||
let gkn = gkn_for_gateway_http_route(name);
|
||||
ns_index.delete(gkn);
|
||||
let gknn = gkn_for_gateway_http_route(name).namespaced(namespace);
|
||||
for ns_index in self.namespaces.by_ns.values_mut() {
|
||||
ns_index.delete(&gknn);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -174,24 +185,29 @@ impl Index {
|
|||
|
||||
pub fn outbound_policy_rx(
|
||||
&mut self,
|
||||
namespace: String,
|
||||
service: String,
|
||||
port: NonZeroU16,
|
||||
service_name: String,
|
||||
service_namespace: String,
|
||||
service_port: NonZeroU16,
|
||||
source_namespace: String,
|
||||
) -> Result<watch::Receiver<OutboundPolicy>> {
|
||||
let ns = self
|
||||
.namespaces
|
||||
.by_ns
|
||||
.entry(namespace.clone())
|
||||
.entry(service_namespace.clone())
|
||||
.or_insert_with(|| Namespace {
|
||||
service_routes: Default::default(),
|
||||
service_port_routes: Default::default(),
|
||||
namespace: Arc::new(namespace.to_string()),
|
||||
namespace: Arc::new(service_namespace.to_string()),
|
||||
});
|
||||
let key = ServicePort { service, port };
|
||||
let key = ServicePort {
|
||||
service: service_name,
|
||||
port: service_port,
|
||||
};
|
||||
tracing::debug!(?key, "subscribing to service port");
|
||||
let routes =
|
||||
ns.service_routes_or_default(key, &self.namespaces.cluster_info, &self.service_info);
|
||||
Ok(routes.watch.subscribe())
|
||||
let watch = routes.watch_for_ns_or_default(source_namespace);
|
||||
Ok(watch.watch.subscribe())
|
||||
}
|
||||
|
||||
pub fn lookup_service(&self, addr: IpAddr) -> Option<ServiceRef> {
|
||||
|
@ -200,16 +216,33 @@ impl Index {
|
|||
|
||||
fn apply(&mut self, route: HttpRouteResource) {
|
||||
tracing::debug!(name = route.name(), "indexing route");
|
||||
let ns = route.namespace();
|
||||
self.namespaces
|
||||
.by_ns
|
||||
.entry(ns.clone())
|
||||
.or_insert_with(|| Namespace {
|
||||
service_routes: Default::default(),
|
||||
service_port_routes: Default::default(),
|
||||
namespace: Arc::new(ns),
|
||||
})
|
||||
.apply(route, &self.namespaces.cluster_info, &self.service_info);
|
||||
|
||||
for parent_ref in route.inner().parent_refs.iter().flatten() {
|
||||
if !is_parent_service(parent_ref) {
|
||||
continue;
|
||||
}
|
||||
if !route_accepted_by_service(route.status(), &parent_ref.name) {
|
||||
continue;
|
||||
}
|
||||
let ns = parent_ref
|
||||
.namespace
|
||||
.clone()
|
||||
.unwrap_or_else(|| route.namespace());
|
||||
self.namespaces
|
||||
.by_ns
|
||||
.entry(ns.clone())
|
||||
.or_insert_with(|| Namespace {
|
||||
service_routes: Default::default(),
|
||||
service_port_routes: Default::default(),
|
||||
namespace: Arc::new(ns),
|
||||
})
|
||||
.apply(
|
||||
route.clone(),
|
||||
parent_ref,
|
||||
&self.namespaces.cluster_info,
|
||||
&self.service_info,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -217,6 +250,7 @@ impl Namespace {
|
|||
fn apply(
|
||||
&mut self,
|
||||
route: HttpRouteResource,
|
||||
parent_ref: &ParentReference,
|
||||
cluster_info: &ClusterInfo,
|
||||
service_info: &HashMap<ServiceRef, ServiceInfo>,
|
||||
) {
|
||||
|
@ -230,45 +264,36 @@ impl Namespace {
|
|||
};
|
||||
tracing::debug!(?outbound_route);
|
||||
|
||||
for parent_ref in route.inner().parent_refs.iter().flatten() {
|
||||
if !is_parent_service(parent_ref) {
|
||||
continue;
|
||||
}
|
||||
if !route_accepted_by_service(route.status(), &parent_ref.name) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let port = parent_ref.port.and_then(NonZeroU16::new);
|
||||
if let Some(port) = port {
|
||||
let service_port = ServicePort {
|
||||
port,
|
||||
service: parent_ref.name.clone(),
|
||||
};
|
||||
tracing::debug!(
|
||||
?service_port,
|
||||
route = route.name(),
|
||||
"inserting route for service"
|
||||
);
|
||||
let service_routes =
|
||||
self.service_routes_or_default(service_port, cluster_info, service_info);
|
||||
service_routes.apply(route.gkn(), outbound_route.clone());
|
||||
} else {
|
||||
// If the parent_ref doesn't include a port, apply this route
|
||||
// to all ServiceRoutes which match the Service name.
|
||||
self.service_port_routes.iter_mut().for_each(
|
||||
|(ServicePort { service, port: _ }, routes)| {
|
||||
if service == &parent_ref.name {
|
||||
routes.apply(route.gkn(), outbound_route.clone());
|
||||
}
|
||||
},
|
||||
);
|
||||
// Also add the route to the list of routes that target the
|
||||
// Service without specifying a port.
|
||||
self.service_routes
|
||||
.entry(parent_ref.name.clone())
|
||||
.or_default()
|
||||
.insert(route.gkn(), outbound_route.clone());
|
||||
}
|
||||
let port = parent_ref.port.and_then(NonZeroU16::new);
|
||||
if let Some(port) = port {
|
||||
let service_port = ServicePort {
|
||||
port,
|
||||
service: parent_ref.name.clone(),
|
||||
};
|
||||
tracing::debug!(
|
||||
?service_port,
|
||||
route = route.name(),
|
||||
"inserting route for service"
|
||||
);
|
||||
let service_routes =
|
||||
self.service_routes_or_default(service_port, cluster_info, service_info);
|
||||
service_routes.apply(route.gknn(), outbound_route);
|
||||
} else {
|
||||
// If the parent_ref doesn't include a port, apply this route
|
||||
// to all ServiceRoutes which match the Service name.
|
||||
self.service_port_routes.iter_mut().for_each(
|
||||
|(ServicePort { service, port: _ }, routes)| {
|
||||
if service == &parent_ref.name {
|
||||
routes.apply(route.gknn(), outbound_route.clone());
|
||||
}
|
||||
},
|
||||
);
|
||||
// Also add the route to the list of routes that target the
|
||||
// Service without specifying a port.
|
||||
self.service_routes
|
||||
.entry(parent_ref.name.clone())
|
||||
.or_default()
|
||||
.insert(route.gknn(), outbound_route);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -284,12 +309,12 @@ impl Namespace {
|
|||
}
|
||||
}
|
||||
|
||||
fn delete(&mut self, gkn: GroupKindName) {
|
||||
fn delete(&mut self, gknn: &GroupKindNamespaceName) {
|
||||
for service in self.service_port_routes.values_mut() {
|
||||
service.delete(&gkn);
|
||||
service.delete(gknn);
|
||||
}
|
||||
for routes in self.service_routes.values_mut() {
|
||||
routes.remove(&gkn);
|
||||
routes.remove(gknn);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -321,21 +346,42 @@ impl Namespace {
|
|||
.cloned()
|
||||
.unwrap_or_default();
|
||||
|
||||
let (sender, _) = watch::channel(OutboundPolicy {
|
||||
http_routes: routes.clone(),
|
||||
let mut service_routes = ServiceRoutes {
|
||||
opaque,
|
||||
accrual,
|
||||
authority,
|
||||
name: sp.service.clone(),
|
||||
namespace: self.namespace.to_string(),
|
||||
namespace: self.namespace.clone(),
|
||||
name: sp.service,
|
||||
port: sp.port,
|
||||
opaque,
|
||||
accrual,
|
||||
});
|
||||
ServiceRoutes {
|
||||
routes,
|
||||
watch: sender,
|
||||
opaque,
|
||||
accrual,
|
||||
watches_by_ns: Default::default(),
|
||||
};
|
||||
|
||||
// Producer routes are routes in the same namespace as their
|
||||
// parent service. Consumer routes are routes in other
|
||||
// namespaces.
|
||||
let (producer_routes, consumer_routes): (Vec<_>, Vec<_>) = routes
|
||||
.into_iter()
|
||||
.partition(|(gknn, _route)| *gknn.namespace == *self.namespace);
|
||||
for (gknn, route) in consumer_routes {
|
||||
// Consumer routes should only apply to watches from the
|
||||
// consumer namespace.
|
||||
let watch = service_routes.watch_for_ns_or_default(gknn.namespace.to_string());
|
||||
watch.routes.insert(gknn, route);
|
||||
}
|
||||
for (gknn, route) in producer_routes {
|
||||
// Insert the route into the producer namespace.
|
||||
let watch = service_routes.watch_for_ns_or_default(gknn.namespace.to_string());
|
||||
watch.routes.insert(gknn.clone(), route.clone());
|
||||
// Producer routes apply to clients in all namespaces, so
|
||||
// apply it to watches for all other namespaces too.
|
||||
for (ns, watch) in service_routes.watches_by_ns.iter_mut() {
|
||||
if ns != &gknn.namespace {
|
||||
watch.routes.insert(gknn.clone(), route.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
service_routes
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -662,22 +708,75 @@ fn is_service(group: Option<&str>, kind: &str) -> bool {
|
|||
}
|
||||
|
||||
impl ServiceRoutes {
|
||||
fn apply(&mut self, gkn: GroupKindName, route: HttpRoute) {
|
||||
self.routes.insert(gkn, route);
|
||||
self.send_if_modified();
|
||||
fn watch_for_ns_or_default(&mut self, namespace: String) -> &mut RoutesWatch {
|
||||
// The routes from the producer namespace apply to watches in all
|
||||
// namespaces so we copy them.
|
||||
let routes = self
|
||||
.watches_by_ns
|
||||
.get(self.namespace.as_ref())
|
||||
.map(|watch| watch.routes.clone())
|
||||
.unwrap_or_default();
|
||||
self.watches_by_ns.entry(namespace).or_insert_with(|| {
|
||||
let (sender, _) = watch::channel(OutboundPolicy {
|
||||
http_routes: Default::default(),
|
||||
authority: self.authority.clone(),
|
||||
name: self.name.to_string(),
|
||||
namespace: self.namespace.to_string(),
|
||||
port: self.port,
|
||||
opaque: self.opaque,
|
||||
accrual: self.accrual,
|
||||
});
|
||||
RoutesWatch {
|
||||
opaque: self.opaque,
|
||||
accrual: self.accrual,
|
||||
routes,
|
||||
watch: sender,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn apply(&mut self, gknn: GroupKindNamespaceName, route: HttpRoute) {
|
||||
if *gknn.namespace == *self.namespace {
|
||||
// This is a producer namespace route.
|
||||
let watch = self.watch_for_ns_or_default(gknn.namespace.to_string());
|
||||
watch.routes.insert(gknn.clone(), route.clone());
|
||||
watch.send_if_modified();
|
||||
// Producer routes apply to clients in all namespaces, so
|
||||
// apply it to watches for all other namespaces too.
|
||||
for (ns, watch) in self.watches_by_ns.iter_mut() {
|
||||
if ns != &gknn.namespace {
|
||||
watch.routes.insert(gknn.clone(), route.clone());
|
||||
watch.send_if_modified();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// This is a consumer namespace route and should only apply to
|
||||
// watches from that namespace.
|
||||
let watch = self.watch_for_ns_or_default(gknn.namespace.to_string());
|
||||
watch.routes.insert(gknn, route);
|
||||
watch.send_if_modified();
|
||||
}
|
||||
}
|
||||
|
||||
fn update_service(&mut self, opaque: bool, accrual: Option<FailureAccrual>) {
|
||||
self.opaque = opaque;
|
||||
self.accrual = accrual;
|
||||
self.send_if_modified();
|
||||
for watch in self.watches_by_ns.values_mut() {
|
||||
watch.opaque = opaque;
|
||||
watch.accrual = accrual;
|
||||
watch.send_if_modified();
|
||||
}
|
||||
}
|
||||
|
||||
fn delete(&mut self, gkn: &GroupKindName) {
|
||||
self.routes.remove(gkn);
|
||||
self.send_if_modified();
|
||||
fn delete(&mut self, gknn: &GroupKindNamespaceName) {
|
||||
for watch in self.watches_by_ns.values_mut() {
|
||||
watch.routes.remove(gknn);
|
||||
watch.send_if_modified();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl RoutesWatch {
|
||||
fn send_if_modified(&mut self) {
|
||||
self.watch.send_if_modified(|policy| {
|
||||
let mut modified = false;
|
||||
|
|
|
@ -8,7 +8,7 @@ use linkerd_policy_controller_core::inbound::{
|
|||
DiscoverInboundServer, InboundServer, InboundServerStream,
|
||||
};
|
||||
use linkerd_policy_controller_core::outbound::{
|
||||
DiscoverOutboundPolicy, OutboundPolicy, OutboundPolicyStream,
|
||||
DiscoverOutboundPolicy, OutboundDiscoverTarget, OutboundPolicy, OutboundPolicyStream,
|
||||
};
|
||||
pub use linkerd_policy_controller_core::IpNet;
|
||||
pub use linkerd_policy_controller_grpc as grpc;
|
||||
|
@ -60,12 +60,22 @@ impl DiscoverInboundServer<(String, String, NonZeroU16)> for InboundDiscover {
|
|||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl DiscoverOutboundPolicy<(String, String, NonZeroU16)> for OutboundDiscover {
|
||||
impl DiscoverOutboundPolicy<OutboundDiscoverTarget> for OutboundDiscover {
|
||||
async fn get_outbound_policy(
|
||||
&self,
|
||||
(namespace, service, port): (String, String, NonZeroU16),
|
||||
OutboundDiscoverTarget {
|
||||
service_name,
|
||||
service_namespace,
|
||||
service_port,
|
||||
source_namespace,
|
||||
}: OutboundDiscoverTarget,
|
||||
) -> Result<Option<OutboundPolicy>> {
|
||||
let rx = match self.0.write().outbound_policy_rx(namespace, service, port) {
|
||||
let rx = match self.0.write().outbound_policy_rx(
|
||||
service_name,
|
||||
service_namespace,
|
||||
service_port,
|
||||
source_namespace,
|
||||
) {
|
||||
Ok(rx) => rx,
|
||||
Err(error) => {
|
||||
tracing::error!(%error, "failed to get outbound policy rx");
|
||||
|
@ -78,18 +88,40 @@ impl DiscoverOutboundPolicy<(String, String, NonZeroU16)> for OutboundDiscover {
|
|||
|
||||
async fn watch_outbound_policy(
|
||||
&self,
|
||||
(namespace, service, port): (String, String, NonZeroU16),
|
||||
OutboundDiscoverTarget {
|
||||
service_name,
|
||||
service_namespace,
|
||||
service_port,
|
||||
source_namespace,
|
||||
}: OutboundDiscoverTarget,
|
||||
) -> Result<Option<OutboundPolicyStream>> {
|
||||
match self.0.write().outbound_policy_rx(namespace, service, port) {
|
||||
match self.0.write().outbound_policy_rx(
|
||||
service_name,
|
||||
service_namespace,
|
||||
service_port,
|
||||
source_namespace,
|
||||
) {
|
||||
Ok(rx) => Ok(Some(Box::pin(tokio_stream::wrappers::WatchStream::new(rx)))),
|
||||
Err(_) => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
fn lookup_ip(&self, addr: IpAddr, port: NonZeroU16) -> Option<(String, String, NonZeroU16)> {
|
||||
fn lookup_ip(
|
||||
&self,
|
||||
addr: IpAddr,
|
||||
port: NonZeroU16,
|
||||
source_namespace: String,
|
||||
) -> Option<OutboundDiscoverTarget> {
|
||||
self.0
|
||||
.read()
|
||||
.lookup_service(addr)
|
||||
.map(|outbound::ServiceRef { namespace, name }| (namespace, name, port))
|
||||
.map(
|
||||
|outbound::ServiceRef { name, namespace }| OutboundDiscoverTarget {
|
||||
service_name: name,
|
||||
service_namespace: namespace,
|
||||
service_port: port,
|
||||
source_namespace,
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -263,7 +263,7 @@ impl Running {
|
|||
&self.name,
|
||||
|obj: Option<&k8s::Pod>| -> bool { obj.and_then(get_exit_code).is_some() },
|
||||
);
|
||||
let pod = match time::timeout(time::Duration::from_secs(30), finished).await {
|
||||
let pod = match time::timeout(time::Duration::from_secs(60), finished).await {
|
||||
Ok(Ok(Some(pod))) => pod,
|
||||
Ok(Ok(None)) => unreachable!("Condition must wait for pod"),
|
||||
Ok(Err(error)) => panic!("Failed to wait for exit code: {}: {}", self.name, error),
|
||||
|
|
|
@ -848,48 +848,226 @@ async fn http_route_with_no_port() {
|
|||
// Create a service
|
||||
let svc = create_service(&client, &ns, "my-svc", 4191).await;
|
||||
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
|
||||
let config = rx
|
||||
let mut rx_4191 = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
|
||||
let config_4191 = rx_4191
|
||||
.next()
|
||||
.await
|
||||
.expect("watch must not fail")
|
||||
.expect("watch must return an initial config");
|
||||
tracing::trace!(?config);
|
||||
tracing::trace!(?config_4191);
|
||||
|
||||
let mut rx_9999 = retry_watch_outbound_policy(&client, &ns, &svc, 9999).await;
|
||||
let config_9999 = rx_9999
|
||||
.next()
|
||||
.await
|
||||
.expect("watch must not fail")
|
||||
.expect("watch must return an initial config");
|
||||
tracing::trace!(?config_9999);
|
||||
|
||||
// There should be a default route.
|
||||
detect_http_routes(&config, |routes| {
|
||||
detect_http_routes(&config_4191, |routes| {
|
||||
let route = assert_singleton(routes);
|
||||
assert_route_is_default(route, &svc, 4191);
|
||||
});
|
||||
detect_http_routes(&config_9999, |routes| {
|
||||
let route = assert_singleton(routes);
|
||||
assert_route_is_default(route, &svc, 9999);
|
||||
});
|
||||
|
||||
let _route = create(&client, mk_http_route(&ns, "foo-route", &svc, None).build()).await;
|
||||
|
||||
let config = rx
|
||||
let config_4191 = rx_4191
|
||||
.next()
|
||||
.await
|
||||
.expect("watch must not fail")
|
||||
.expect("watch must return an updated config");
|
||||
tracing::trace!(?config);
|
||||
tracing::trace!(?config_4191);
|
||||
|
||||
// The route should apply to the service.
|
||||
detect_http_routes(&config, |routes| {
|
||||
detect_http_routes(&config_4191, |routes| {
|
||||
let route = assert_singleton(routes);
|
||||
assert_route_name_eq(route, "foo-route");
|
||||
});
|
||||
|
||||
let config_9999 = rx_9999
|
||||
.next()
|
||||
.await
|
||||
.expect("watch must not fail")
|
||||
.expect("watch must return an updated config");
|
||||
tracing::trace!(?config_9999);
|
||||
|
||||
// The route should apply to other ports too.
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 9999).await;
|
||||
let config = rx
|
||||
detect_http_routes(&config_9999, |routes| {
|
||||
let route = assert_singleton(routes);
|
||||
assert_route_name_eq(route, "foo-route");
|
||||
});
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "current_thread")]
|
||||
async fn producer_route() {
|
||||
with_temp_ns(|client, ns| async move {
|
||||
// Create a service
|
||||
let svc = create_service(&client, &ns, "my-svc", 4191).await;
|
||||
|
||||
let mut producer_rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
|
||||
let producer_config = producer_rx
|
||||
.next()
|
||||
.await
|
||||
.expect("watch must not fail")
|
||||
.expect("watch must return an initial config");
|
||||
tracing::trace!(?config);
|
||||
tracing::trace!(?producer_config);
|
||||
|
||||
detect_http_routes(&config, |routes| {
|
||||
let mut consumer_rx = retry_watch_outbound_policy(&client, "consumer_ns", &svc, 4191).await;
|
||||
let consumer_config = consumer_rx
|
||||
.next()
|
||||
.await
|
||||
.expect("watch must not fail")
|
||||
.expect("watch must return an initial config");
|
||||
tracing::trace!(?consumer_config);
|
||||
|
||||
// There should be a default route.
|
||||
detect_http_routes(&producer_config, |routes| {
|
||||
let route = assert_singleton(routes);
|
||||
assert_route_is_default(route, &svc, 4191);
|
||||
});
|
||||
detect_http_routes(&consumer_config, |routes| {
|
||||
let route = assert_singleton(routes);
|
||||
assert_route_is_default(route, &svc, 4191);
|
||||
});
|
||||
|
||||
// A route created in the same namespace as its parent service is called
|
||||
// a producer route. It should be returned in outbound policy requests
|
||||
// for that service from ALL namespaces.
|
||||
let _route = create(
|
||||
&client,
|
||||
mk_http_route(&ns, "foo-route", &svc, Some(4191)).build(),
|
||||
)
|
||||
.await;
|
||||
|
||||
let producer_config = producer_rx
|
||||
.next()
|
||||
.await
|
||||
.expect("watch must not fail")
|
||||
.expect("watch must return an updated config");
|
||||
tracing::trace!(?producer_config);
|
||||
let consumer_config = consumer_rx
|
||||
.next()
|
||||
.await
|
||||
.expect("watch must not fail")
|
||||
.expect("watch must return an initial config");
|
||||
tracing::trace!(?consumer_config);
|
||||
|
||||
// The route should be returned in queries from the producer namespace.
|
||||
detect_http_routes(&producer_config, |routes| {
|
||||
let route = assert_singleton(routes);
|
||||
assert_route_name_eq(route, "foo-route");
|
||||
});
|
||||
|
||||
// The route should be returned in queries from a consumer namespace.
|
||||
detect_http_routes(&consumer_config, |routes| {
|
||||
let route = assert_singleton(routes);
|
||||
assert_route_name_eq(route, "foo-route");
|
||||
});
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "current_thread")]
|
||||
async fn consumer_route() {
|
||||
with_temp_ns(|client, ns| async move {
|
||||
// Create a service
|
||||
let svc = create_service(&client, &ns, "my-svc", 4191).await;
|
||||
|
||||
let consumer_ns_name = format!("{}-consumer", ns);
|
||||
let consumer_ns = create_cluster_scoped(
|
||||
&client,
|
||||
k8s::Namespace {
|
||||
metadata: k8s::ObjectMeta {
|
||||
name: Some(consumer_ns_name.clone()),
|
||||
labels: Some(convert_args!(btreemap!(
|
||||
"linkerd-policy-test" => std::thread::current().name().unwrap_or(""),
|
||||
))),
|
||||
..Default::default()
|
||||
},
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await;
|
||||
|
||||
let mut producer_rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
|
||||
let producer_config = producer_rx
|
||||
.next()
|
||||
.await
|
||||
.expect("watch must not fail")
|
||||
.expect("watch must return an initial config");
|
||||
tracing::trace!(?producer_config);
|
||||
|
||||
let mut consumer_rx =
|
||||
retry_watch_outbound_policy(&client, &consumer_ns_name, &svc, 4191).await;
|
||||
let consumer_config = consumer_rx
|
||||
.next()
|
||||
.await
|
||||
.expect("watch must not fail")
|
||||
.expect("watch must return an initial config");
|
||||
tracing::trace!(?consumer_config);
|
||||
|
||||
let mut other_rx = retry_watch_outbound_policy(&client, "other_ns", &svc, 4191).await;
|
||||
let other_config = other_rx
|
||||
.next()
|
||||
.await
|
||||
.expect("watch must not fail")
|
||||
.expect("watch must return an initial config");
|
||||
tracing::trace!(?other_config);
|
||||
|
||||
// There should be a default route.
|
||||
detect_http_routes(&producer_config, |routes| {
|
||||
let route = assert_singleton(routes);
|
||||
assert_route_is_default(route, &svc, 4191);
|
||||
});
|
||||
detect_http_routes(&consumer_config, |routes| {
|
||||
let route = assert_singleton(routes);
|
||||
assert_route_is_default(route, &svc, 4191);
|
||||
});
|
||||
detect_http_routes(&other_config, |routes| {
|
||||
let route = assert_singleton(routes);
|
||||
assert_route_is_default(route, &svc, 4191);
|
||||
});
|
||||
|
||||
// A route created in a different namespace as its parent service is
|
||||
// called a consumer route. It should be returned in outbound policy
|
||||
// requests for that service ONLY when the request comes from the
|
||||
// consumer namespace.
|
||||
let _route = create(
|
||||
&client,
|
||||
mk_http_route(&consumer_ns_name, "foo-route", &svc, Some(4191)).build(),
|
||||
)
|
||||
.await;
|
||||
|
||||
// The route should NOT be returned in queries from the producer namespace.
|
||||
// There should be a default route.
|
||||
assert!(producer_rx.next().now_or_never().is_none());
|
||||
|
||||
// The route should be returned in queries from the same consumer
|
||||
// namespace.
|
||||
let consumer_config = consumer_rx
|
||||
.next()
|
||||
.await
|
||||
.expect("watch must not fail")
|
||||
.expect("watch must return an initial config");
|
||||
tracing::trace!(?consumer_config);
|
||||
|
||||
detect_http_routes(&consumer_config, |routes| {
|
||||
let route = assert_singleton(routes);
|
||||
assert_route_name_eq(route, "foo-route");
|
||||
});
|
||||
|
||||
// The route should NOT be returned in queries from a different consumer
|
||||
// namespace.
|
||||
assert!(other_rx.next().now_or_never().is_none());
|
||||
|
||||
delete_cluster_scoped(&client, consumer_ns).await;
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
|
|
@ -865,48 +865,225 @@ async fn http_route_with_no_port() {
|
|||
// Create a service
|
||||
let svc = create_service(&client, &ns, "my-svc", 4191).await;
|
||||
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
|
||||
let config = rx
|
||||
let mut rx_4191 = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
|
||||
let config_4191 = rx_4191
|
||||
.next()
|
||||
.await
|
||||
.expect("watch must not fail")
|
||||
.expect("watch must return an initial config");
|
||||
tracing::trace!(?config);
|
||||
tracing::trace!(?config_4191);
|
||||
|
||||
let mut rx_9999 = retry_watch_outbound_policy(&client, &ns, &svc, 9999).await;
|
||||
let config_9999 = rx_9999
|
||||
.next()
|
||||
.await
|
||||
.expect("watch must not fail")
|
||||
.expect("watch must return an initial config");
|
||||
tracing::trace!(?config_9999);
|
||||
|
||||
// There should be a default route.
|
||||
detect_http_routes(&config, |routes| {
|
||||
detect_http_routes(&config_4191, |routes| {
|
||||
let route = assert_singleton(routes);
|
||||
assert_route_is_default(route, &svc, 4191);
|
||||
});
|
||||
detect_http_routes(&config_9999, |routes| {
|
||||
let route = assert_singleton(routes);
|
||||
assert_route_is_default(route, &svc, 9999);
|
||||
});
|
||||
|
||||
let _route = create(&client, mk_http_route(&ns, "foo-route", &svc, None).build()).await;
|
||||
|
||||
let config = rx
|
||||
let config_4191 = rx_4191
|
||||
.next()
|
||||
.await
|
||||
.expect("watch must not fail")
|
||||
.expect("watch must return an updated config");
|
||||
tracing::trace!(?config);
|
||||
tracing::trace!(?config_4191);
|
||||
|
||||
// The route should apply to the service.
|
||||
detect_http_routes(&config, |routes| {
|
||||
detect_http_routes(&config_4191, |routes| {
|
||||
let route = assert_singleton(routes);
|
||||
assert_route_name_eq(route, "foo-route");
|
||||
});
|
||||
|
||||
let config_9999 = rx_9999
|
||||
.next()
|
||||
.await
|
||||
.expect("watch must not fail")
|
||||
.expect("watch must return an updated config");
|
||||
tracing::trace!(?config_9999);
|
||||
|
||||
// The route should apply to other ports too.
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 9999).await;
|
||||
let config = rx
|
||||
detect_http_routes(&config_9999, |routes| {
|
||||
let route = assert_singleton(routes);
|
||||
assert_route_name_eq(route, "foo-route");
|
||||
});
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "current_thread")]
|
||||
async fn producer_route() {
|
||||
with_temp_ns(|client, ns| async move {
|
||||
// Create a service
|
||||
let svc = create_service(&client, &ns, "my-svc", 4191).await;
|
||||
|
||||
let mut producer_rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
|
||||
let producer_config = producer_rx
|
||||
.next()
|
||||
.await
|
||||
.expect("watch must not fail")
|
||||
.expect("watch must return an initial config");
|
||||
tracing::trace!(?config);
|
||||
tracing::trace!(?producer_config);
|
||||
|
||||
detect_http_routes(&config, |routes| {
|
||||
let mut consumer_rx = retry_watch_outbound_policy(&client, "consumer_ns", &svc, 4191).await;
|
||||
let consumer_config = consumer_rx
|
||||
.next()
|
||||
.await
|
||||
.expect("watch must not fail")
|
||||
.expect("watch must return an initial config");
|
||||
tracing::trace!(?consumer_config);
|
||||
|
||||
// There should be a default route.
|
||||
detect_http_routes(&producer_config, |routes| {
|
||||
let route = assert_singleton(routes);
|
||||
assert_route_is_default(route, &svc, 4191);
|
||||
});
|
||||
detect_http_routes(&consumer_config, |routes| {
|
||||
let route = assert_singleton(routes);
|
||||
assert_route_is_default(route, &svc, 4191);
|
||||
});
|
||||
|
||||
// A route created in the same namespace as its parent service is called
|
||||
// a producer route. It should be returned in outbound policy requests
|
||||
// for that service from ALL namespaces.
|
||||
let _route = create(
|
||||
&client,
|
||||
mk_http_route(&ns, "foo-route", &svc, Some(4191)).build(),
|
||||
)
|
||||
.await;
|
||||
|
||||
let producer_config = producer_rx
|
||||
.next()
|
||||
.await
|
||||
.expect("watch must not fail")
|
||||
.expect("watch must return an updated config");
|
||||
tracing::trace!(?producer_config);
|
||||
let consumer_config = consumer_rx
|
||||
.next()
|
||||
.await
|
||||
.expect("watch must not fail")
|
||||
.expect("watch must return an initial config");
|
||||
tracing::trace!(?consumer_config);
|
||||
|
||||
// The route should be returned in queries from the producer namespace.
|
||||
detect_http_routes(&producer_config, |routes| {
|
||||
let route = assert_singleton(routes);
|
||||
assert_route_name_eq(route, "foo-route");
|
||||
});
|
||||
|
||||
// The route should be returned in queries from a consumer namespace.
|
||||
detect_http_routes(&consumer_config, |routes| {
|
||||
let route = assert_singleton(routes);
|
||||
assert_route_name_eq(route, "foo-route");
|
||||
});
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "current_thread")]
|
||||
async fn consumer_route() {
|
||||
with_temp_ns(|client, ns| async move {
|
||||
// Create a service
|
||||
let svc = create_service(&client, &ns, "my-svc", 4191).await;
|
||||
|
||||
let consumer_ns_name = format!("{}-consumer", ns);
|
||||
let consumer_ns = create_cluster_scoped(
|
||||
&client,
|
||||
k8s::Namespace {
|
||||
metadata: k8s::ObjectMeta {
|
||||
name: Some(consumer_ns_name.clone()),
|
||||
labels: Some(convert_args!(btreemap!(
|
||||
"linkerd-policy-test" => std::thread::current().name().unwrap_or(""),
|
||||
))),
|
||||
..Default::default()
|
||||
},
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await;
|
||||
|
||||
let mut producer_rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
|
||||
let producer_config = producer_rx
|
||||
.next()
|
||||
.await
|
||||
.expect("watch must not fail")
|
||||
.expect("watch must return an initial config");
|
||||
tracing::trace!(?producer_config);
|
||||
|
||||
let mut consumer_rx =
|
||||
retry_watch_outbound_policy(&client, &consumer_ns_name, &svc, 4191).await;
|
||||
let consumer_config = consumer_rx
|
||||
.next()
|
||||
.await
|
||||
.expect("watch must not fail")
|
||||
.expect("watch must return an initial config");
|
||||
tracing::trace!(?consumer_config);
|
||||
|
||||
let mut other_rx = retry_watch_outbound_policy(&client, "other_ns", &svc, 4191).await;
|
||||
let other_config = other_rx
|
||||
.next()
|
||||
.await
|
||||
.expect("watch must not fail")
|
||||
.expect("watch must return an initial config");
|
||||
tracing::trace!(?other_config);
|
||||
|
||||
// There should be a default route.
|
||||
detect_http_routes(&producer_config, |routes| {
|
||||
let route = assert_singleton(routes);
|
||||
assert_route_is_default(route, &svc, 4191);
|
||||
});
|
||||
detect_http_routes(&consumer_config, |routes| {
|
||||
let route = assert_singleton(routes);
|
||||
assert_route_is_default(route, &svc, 4191);
|
||||
});
|
||||
detect_http_routes(&other_config, |routes| {
|
||||
let route = assert_singleton(routes);
|
||||
assert_route_is_default(route, &svc, 4191);
|
||||
});
|
||||
|
||||
// A route created in a different namespace as its parent service is
|
||||
// called a consumer route. It should be returned in outbound policy
|
||||
// requests for that service ONLY when the request comes from the
|
||||
// consumer namespace.
|
||||
let _route = create(
|
||||
&client,
|
||||
mk_http_route(&consumer_ns_name, "foo-route", &svc, Some(4191)).build(),
|
||||
)
|
||||
.await;
|
||||
|
||||
// The route should NOT be returned in queries from the producer namespace.
|
||||
// There should be a default route.
|
||||
assert!(producer_rx.next().now_or_never().is_none());
|
||||
|
||||
// The route should be returned in queries from the same consumer
|
||||
// namespace.
|
||||
let consumer_config = consumer_rx
|
||||
.next()
|
||||
.await
|
||||
.expect("watch must not fail")
|
||||
.expect("watch must return an initial config");
|
||||
tracing::trace!(?consumer_config);
|
||||
detect_http_routes(&consumer_config, |routes| {
|
||||
let route = assert_singleton(routes);
|
||||
assert_route_name_eq(route, "foo-route");
|
||||
});
|
||||
|
||||
// The route should NOT be returned in queries from a different consumer
|
||||
// namespace.
|
||||
assert!(other_rx.next().now_or_never().is_none());
|
||||
|
||||
delete_cluster_scoped(&client, consumer_ns).await;
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue