Use namespace field of backend ref when it is set (#10909)

When the `namespace` field of a `backend_ref` of an `HttpRoute` is set, Linkerd ignores this field and instead assumes that the backend is in the same namespace as the parent `Service`.  

To properly handle the case where the backend is in a different namespace from the parent `Service`, we change the way that service metadata is stored in the policy controller outbound index.  Instead of keeping a separate service metadata map per namespace, we maintain one global service metadata map which is shared between all namespaces using an RwLock.  This allows us to make the two necessary changes:

1. When validating the existence of a backend service, we now look for it in the appropriate namespace instead of the Service's namespace
2. When constructing the backend authority, we use the appropriate namespace instead of the Service's namespace

We also add an API test for this situation.

Signed-off-by: Alex Leong <alex@buoyant.io>
This commit is contained in:
Alex Leong 2023-05-19 09:32:01 -07:00 committed by GitHub
parent d3b969cffb
commit 97f161c262
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 167 additions and 40 deletions

View File

@ -11,18 +11,19 @@ use linkerd_policy_controller_core::outbound::{
};
use linkerd_policy_controller_k8s_api::{policy as api, ResourceExt, Service, Time};
use parking_lot::RwLock;
use std::{net::IpAddr, num::NonZeroU16, sync::Arc, time};
use std::{hash::Hash, net::IpAddr, num::NonZeroU16, sync::Arc, time};
use tokio::sync::watch;
#[derive(Debug)]
pub struct Index {
namespaces: NamespaceIndex,
services: HashMap<IpAddr, ServiceRef>,
services_by_ip: HashMap<IpAddr, ServiceRef>,
service_info: HashMap<ServiceRef, ServiceInfo>,
}
pub type SharedIndex = Arc<RwLock<Index>>;
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct ServiceRef {
pub name: String,
pub namespace: String,
@ -39,7 +40,6 @@ struct NamespaceIndex {
struct Namespace {
service_routes: HashMap<ServicePort, ServiceRoutes>,
namespace: Arc<String>,
services: HashMap<String, ServiceInfo>,
}
#[derive(Debug, Default)]
@ -72,9 +72,8 @@ impl kubert::index::IndexNamespacedResource<api::HttpRoute> for Index {
.or_insert_with(|| Namespace {
service_routes: Default::default(),
namespace: Arc::new(ns),
services: Default::default(),
})
.apply(route, &self.namespaces.cluster_info);
.apply(route, &self.namespaces.cluster_info, &self.service_info);
}
fn delete(&mut self, namespace: String, name: String) {
@ -107,7 +106,7 @@ impl kubert::index::IndexNamespacedResource<Service> for Index {
name,
namespace: ns.clone(),
};
self.services.insert(addr, service_ref);
self.services_by_ip.insert(addr, service_ref);
}
Err(error) => {
tracing::error!(%error, service=name, cluster_ip, "invalid cluster ip");
@ -126,17 +125,22 @@ impl kubert::index::IndexNamespacedResource<Service> for Index {
.or_insert_with(|| Namespace {
service_routes: Default::default(),
namespace: Arc::new(ns),
services: Default::default(),
})
.update_service(service.name_unchecked(), service_info);
.update_service(service.name_unchecked(), &service_info);
self.service_info.insert(
ServiceRef {
name: service.name_unchecked(),
namespace: service.namespace().expect("Service must have Namespace"),
},
service_info,
);
}
fn delete(&mut self, namespace: String, name: String) {
if let Some(ns) = self.namespaces.by_ns.get_mut(&namespace) {
ns.services.remove(&name);
}
let service_ref = ServiceRef { name, namespace };
self.services.retain(|_, v| *v != service_ref);
self.service_info.remove(&service_ref);
self.services_by_ip.retain(|_, v| *v != service_ref);
}
}
@ -147,7 +151,8 @@ impl Index {
by_ns: HashMap::default(),
cluster_info,
},
services: HashMap::default(),
services_by_ip: HashMap::default(),
service_info: HashMap::default(),
}))
}
@ -164,24 +169,29 @@ impl Index {
.or_insert_with(|| Namespace {
service_routes: Default::default(),
namespace: Arc::new(namespace.to_string()),
services: Default::default(),
});
let key = ServicePort { service, port };
tracing::debug!(?key, "subscribing to service port");
let routes = ns.service_routes_or_default(key, &self.namespaces.cluster_info);
let routes =
ns.service_routes_or_default(key, &self.namespaces.cluster_info, &self.service_info);
Ok(routes.watch.subscribe())
}
pub fn lookup_service(&self, addr: IpAddr) -> Option<ServiceRef> {
self.services.get(&addr).cloned()
self.services_by_ip.get(&addr).cloned()
}
}
impl Namespace {
fn apply(&mut self, route: api::HttpRoute, cluster_info: &ClusterInfo) {
fn apply(
&mut self,
route: api::HttpRoute,
cluster_info: &ClusterInfo,
service_info: &HashMap<ServiceRef, ServiceInfo>,
) {
tracing::debug!(?route);
let name = route.name_unchecked();
let outbound_route = match self.convert_route(route.clone(), cluster_info) {
let outbound_route = match self.convert_route(route.clone(), cluster_info, service_info) {
Ok(route) => route,
Err(error) => {
tracing::error!(%error, "failed to convert HttpRoute");
@ -209,7 +219,8 @@ impl Namespace {
route = route.name_unchecked(),
"inserting route for service"
);
let service_routes = self.service_routes_or_default(service_port, cluster_info);
let service_routes =
self.service_routes_or_default(service_port, cluster_info, service_info);
service_routes.apply(name.clone(), outbound_route.clone());
} else {
tracing::warn!(?parent_ref, "ignoring parent_ref with port 0");
@ -220,7 +231,7 @@ impl Namespace {
}
}
fn update_service(&mut self, name: String, service: ServiceInfo) {
fn update_service(&mut self, name: String, service: &ServiceInfo) {
tracing::debug!(?name, ?service, "updating service");
for (svc_port, svc_routes) in self.service_routes.iter_mut() {
if svc_port.service != name {
@ -230,7 +241,6 @@ impl Namespace {
svc_routes.update_service(opaque, service.accrual);
}
self.services.insert(name, service);
}
fn delete(&mut self, name: String) {
@ -243,10 +253,15 @@ impl Namespace {
&mut self,
sp: ServicePort,
cluster: &ClusterInfo,
service_info: &HashMap<ServiceRef, ServiceInfo>,
) -> &mut ServiceRoutes {
self.service_routes.entry(sp.clone()).or_insert_with(|| {
let authority = cluster.service_dns_authority(&self.namespace, &sp.service, sp.port);
let (opaque, accrual) = match self.services.get(&sp.service) {
let service_ref = ServiceRef {
name: sp.service.clone(),
namespace: self.namespace.to_string(),
};
let (opaque, accrual) = match service_info.get(&service_ref) {
Some(svc) => (svc.opaque_ports.contains(&sp.port), svc.accrual),
None => (false, None),
};
@ -269,7 +284,12 @@ impl Namespace {
})
}
fn convert_route(&self, route: api::HttpRoute, cluster: &ClusterInfo) -> Result<HttpRoute> {
fn convert_route(
&self,
route: api::HttpRoute,
cluster: &ClusterInfo,
service_info: &HashMap<ServiceRef, ServiceInfo>,
) -> Result<HttpRoute> {
let hostnames = route
.spec
.hostnames
@ -283,7 +303,7 @@ impl Namespace {
.rules
.into_iter()
.flatten()
.map(|r| self.convert_rule(r, cluster))
.map(|r| self.convert_rule(r, cluster, service_info))
.collect::<Result<_>>()?;
let creation_timestamp = route.metadata.creation_timestamp.map(|Time(t)| t);
@ -299,6 +319,7 @@ impl Namespace {
&self,
rule: api::httproute::HttpRouteRule,
cluster: &ClusterInfo,
service_info: &HashMap<ServiceRef, ServiceInfo>,
) -> Result<HttpRouteRule> {
let matches = rule
.matches
@ -311,7 +332,7 @@ impl Namespace {
.backend_refs
.into_iter()
.flatten()
.filter_map(|b| convert_backend(&self.namespace, b, cluster, &self.services))
.filter_map(|b| convert_backend(&self.namespace, b, cluster, service_info))
.collect();
Ok(HttpRouteRule { matches, backends })
}
@ -321,7 +342,7 @@ fn convert_backend(
ns: &str,
backend: HttpBackendRef,
cluster: &ClusterInfo,
services: &HashMap<String, ServiceInfo>,
services: &HashMap<ServiceRef, ServiceInfo>,
) -> Option<Backend> {
backend.backend_ref.map(|backend| {
if !is_backend_service(&backend.inner) {
@ -354,8 +375,11 @@ fn convert_backend(
}
}
};
if !services.contains_key(&name) {
let service_ref = ServiceRef {
name: name.clone(),
namespace: backend.inner.namespace.unwrap_or_else(|| ns.to_string()),
};
if !services.contains_key(&service_ref) {
return Backend::Invalid {
weight: weight.into(),
message: format!("Service not found {name}"),
@ -364,7 +388,7 @@ fn convert_backend(
Backend::Service(WeightedService {
weight: weight.into(),
authority: cluster.service_dns_authority(ns, &name, port),
authority: cluster.service_dns_authority(&service_ref.namespace, &name, port),
name,
namespace: ns.to_string(),
port,

View File

@ -19,7 +19,7 @@ pub enum LinkerdInject {
}
/// Creates a cluster-scoped resource.
async fn create_cluster_scoped<T>(client: &kube::Client, obj: T) -> T
pub async fn create_cluster_scoped<T>(client: &kube::Client, obj: T) -> T
where
T: kube::Resource<Scope = kube::core::ClusterResourceScope>,
T: serde::Serialize + serde::de::DeserializeOwned + Clone + std::fmt::Debug,
@ -36,6 +36,21 @@ where
.expect("failed to create resource")
}
/// Creates a cluster-scoped resource.
pub async fn delete_cluster_scoped<T>(client: &kube::Client, obj: T)
where
T: kube::Resource<Scope = kube::core::ClusterResourceScope>,
T: serde::Serialize + serde::de::DeserializeOwned + Clone + std::fmt::Debug,
T::DynamicType: Default,
{
let params = kube::api::DeleteParams {
..Default::default()
};
let api = kube::Api::<T>::all(client.clone());
tracing::trace!(?obj, "Deleting");
api.delete(&obj.name_unchecked(), &params).await.unwrap();
}
/// Creates a namespace-scoped resource.
pub async fn create<T>(client: &kube::Client, obj: T) -> T
where

View File

@ -4,9 +4,10 @@ use futures::prelude::*;
use kube::ResourceExt;
use linkerd_policy_controller_k8s_api as k8s;
use linkerd_policy_test::{
assert_default_accrual_backoff, create, create_annotated_service, create_opaque_service,
create_service, grpc, mk_service, with_temp_ns,
assert_default_accrual_backoff, create, create_annotated_service, create_cluster_scoped,
create_opaque_service, create_service, delete_cluster_scoped, grpc, mk_service, with_temp_ns,
};
use maplit::{btreemap, convert_args};
use tokio::time;
#[tokio::test(flavor = "current_thread")]
@ -108,7 +109,11 @@ async fn service_with_http_routes_without_backends() {
assert_route_is_default(route, &svc, 4191);
});
let _route = create(&client, mk_http_route(&ns, "foo-route", &svc, 4191, None)).await;
let _route = create(
&client,
mk_http_route(&ns, "foo-route", &svc, 4191, None, None),
)
.await;
let config = rx
.next()
@ -149,11 +154,11 @@ async fn service_with_http_routes_with_backend() {
});
let backend_name = "backend";
let _backend_svc = create_service(&client, &ns, backend_name, 8888).await;
let backend_svc = create_service(&client, &ns, backend_name, 8888).await;
let backends = [backend_name];
let _route = create(
&client,
mk_http_route(&ns, "foo-route", &svc, 4191, Some(&backends)),
mk_http_route(&ns, "foo-route", &svc, 4191, Some(&backends), None),
)
.await;
@ -169,6 +174,7 @@ async fn service_with_http_routes_with_backend() {
let route = assert_singleton(routes);
let backends = route_backends_random_available(route);
let backend = assert_singleton(backends);
assert_backend_matches_service(backend.backend.as_ref().unwrap(), &backend_svc, 8888);
let filters = &backend.backend.as_ref().unwrap().filters;
assert_eq!(filters.len(), 0);
});
@ -176,6 +182,79 @@ async fn service_with_http_routes_with_backend() {
.await;
}
#[tokio::test(flavor = "current_thread")]
async fn service_with_http_routes_with_cross_namespace_backend() {
with_temp_ns(|client, ns| async move {
// Create a service
let svc = create_service(&client, &ns, "my-svc", 4191).await;
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await;
let config = rx
.next()
.await
.expect("watch must not fail")
.expect("watch must return an initial config");
tracing::trace!(?config);
// There should be a default route.
detect_http_routes(&config, |routes| {
let route = assert_singleton(routes);
assert_route_is_default(route, &svc, 4191);
});
let backend_ns_name = format!("{}-backend", ns);
let backend_ns = create_cluster_scoped(
&client,
k8s::Namespace {
metadata: k8s::ObjectMeta {
name: Some(backend_ns_name.clone()),
labels: Some(convert_args!(btreemap!(
"linkerd-policy-test" => std::thread::current().name().unwrap_or(""),
))),
..Default::default()
},
..Default::default()
},
)
.await;
let backend_name = "backend";
let backend_svc = create_service(&client, &backend_ns_name, backend_name, 8888).await;
let backends = [backend_name];
let _route = create(
&client,
mk_http_route(
&ns,
"foo-route",
&svc,
4191,
Some(&backends),
Some(backend_ns_name),
),
)
.await;
let config = rx
.next()
.await
.expect("watch must not fail")
.expect("watch must return an updated config");
tracing::trace!(?config);
// There should be a route with a backend with no filters.
detect_http_routes(&config, |routes| {
let route = assert_singleton(routes);
let backends = route_backends_random_available(route);
let backend = assert_singleton(backends);
assert_backend_matches_service(backend.backend.as_ref().unwrap(), &backend_svc, 8888);
let filters = &backend.backend.as_ref().unwrap().filters;
assert_eq!(filters.len(), 0);
});
delete_cluster_scoped(&client, backend_ns).await
})
.await;
}
// TODO: Test fails until handling of invalid backends is implemented.
#[tokio::test(flavor = "current_thread")]
async fn service_with_http_routes_with_invalid_backend() {
@ -200,7 +279,7 @@ async fn service_with_http_routes_with_invalid_backend() {
let backends = ["invalid-backend"];
let _route = create(
&client,
mk_http_route(&ns, "foo-route", &svc, 4191, Some(&backends)),
mk_http_route(&ns, "foo-route", &svc, 4191, Some(&backends), None),
)
.await;
@ -247,7 +326,11 @@ async fn service_with_multiple_http_routes() {
// Routes should be returned in sorted order by creation timestamp then
// name. To ensure that this test isn't timing dependant, routes should
// be created in alphabetical order.
let _a_route = create(&client, mk_http_route(&ns, "a-route", &svc, 4191, None)).await;
let _a_route = create(
&client,
mk_http_route(&ns, "a-route", &svc, 4191, None, None),
)
.await;
// First route update.
let config = rx
@ -257,7 +340,11 @@ async fn service_with_multiple_http_routes() {
.expect("watch must return an updated config");
tracing::trace!(?config);
let _b_route = create(&client, mk_http_route(&ns, "b-route", &svc, 4191, None)).await;
let _b_route = create(
&client,
mk_http_route(&ns, "b-route", &svc, 4191, None, None),
)
.await;
// Second route update.
let config = rx
@ -568,6 +655,7 @@ fn mk_http_route(
svc: &k8s::Service,
port: u16,
backends: Option<&[&str]>,
backends_ns: Option<String>,
) -> k8s::policy::HttpRoute {
use k8s::policy::httproute as api;
let backend_refs = backends.map(|names| {
@ -581,7 +669,7 @@ fn mk_http_route(
port: Some(8888),
group: None,
kind: None,
namespace: None,
namespace: backends_ns.clone(),
},
}),
filters: None,