diff --git a/policy-controller/core/src/http_route.rs b/policy-controller/core/src/http_route.rs index 738c36ad8..be0ea740b 100644 --- a/policy-controller/core/src/http_route.rs +++ b/policy-controller/core/src/http_route.rs @@ -8,8 +8,7 @@ pub use http::{ Method, StatusCode, }; use regex::Regex; -use std::net::IpAddr; -use std::num::NonZeroU16; +use std::{net::IpAddr, num::NonZeroU16}; #[derive(Clone, Debug, PartialEq, Eq)] pub struct InboundHttpRoute { @@ -54,7 +53,7 @@ pub struct OutboundHttpRouteRule { pub enum Backend { Addr(WeightedAddr), Dst(WeightedDst), - InvalidDst(WeightedDst), + InvalidDst { weight: u32, message: String }, } #[derive(Clone, Debug, PartialEq, Eq)] diff --git a/policy-controller/grpc/src/lib.rs b/policy-controller/grpc/src/lib.rs index 7cf15c75a..ae613ad71 100644 --- a/policy-controller/grpc/src/lib.rs +++ b/policy-controller/grpc/src/lib.rs @@ -725,6 +725,7 @@ fn convert_outbound_http_route( fn convert_http_backend(backend: Backend) -> outbound::http_route::WeightedRouteBackend { match backend { Backend::Addr(addr) => outbound::http_route::WeightedRouteBackend { + weight: addr.weight, backend: Some(outbound::http_route::RouteBackend { backend: Some(outbound::Backend { metadata: None, @@ -739,9 +740,9 @@ fn convert_http_backend(backend: Backend) -> outbound::http_route::WeightedRoute }), filters: Default::default(), }), - weight: addr.weight, }, Backend::Dst(dst) => outbound::http_route::WeightedRouteBackend { + weight: dst.weight, backend: Some(outbound::http_route::RouteBackend { backend: Some(outbound::Backend { metadata: None, @@ -761,43 +762,28 @@ fn convert_http_backend(backend: Backend) -> outbound::http_route::WeightedRoute }), filters: Default::default(), }), - weight: dst.weight, }, - Backend::InvalidDst(dst) => outbound::http_route::WeightedRouteBackend { + Backend::InvalidDst { weight, message } => outbound::http_route::WeightedRouteBackend { + weight, backend: Some(outbound::http_route::RouteBackend { - backend: Some(outbound::Backend { - metadata: None, - queue: Some(default_queue_config()), - kind: Some(outbound::backend::Kind::Balancer( - outbound::backend::BalanceP2c { - discovery: Some(outbound::backend::EndpointDiscovery { - kind: Some(outbound::backend::endpoint_discovery::Kind::Dst( - outbound::backend::endpoint_discovery::DestinationGet { - path: dst.authority.clone(), - }, - )), - }), - load: Some(default_balancer_config()), - }, - )), - }), + backend: None, filters: vec![outbound::http_route::Filter { kind: Some(outbound::http_route::filter::Kind::FailureInjector( api::http_route::HttpFailureInjector { status: 500, - message: format!("backend {} is invalid", dst.authority), + message, ratio: None, }, )), }], }), - weight: dst.weight, }, } } fn default_http_backend(outbound: &OutboundPolicy) -> outbound::http_route::WeightedRouteBackend { outbound::http_route::WeightedRouteBackend { + weight: 1, backend: Some(outbound::http_route::RouteBackend { backend: Some(outbound::Backend { metadata: Some(Metadata { @@ -819,7 +805,6 @@ fn default_http_backend(outbound: &OutboundPolicy) -> outbound::http_route::Weig }), filters: Default::default(), }), - weight: 1, } } @@ -879,6 +864,7 @@ fn default_queue_config() -> outbound::Queue { } } +// TODO(ver) this conversion should be made in the api crate. fn convert_tcp_address(ip_addr: IpAddr, port: NonZeroU16) -> TcpAddress { let ip = match ip_addr { IpAddr::V4(ipv4) => Ip::Ipv4(ipv4.into()), diff --git a/policy-controller/k8s/index/src/cluster_info.rs b/policy-controller/k8s/index/src/cluster_info.rs new file mode 100644 index 000000000..749482a16 --- /dev/null +++ b/policy-controller/k8s/index/src/cluster_info.rs @@ -0,0 +1,55 @@ +use std::num::NonZeroU16; + +use crate::{pod::PortSet, DefaultPolicy}; +use linkerd_policy_controller_core::IpNet; +use tokio::time; + +/// Holds cluster metadata. +#[derive(Clone, Debug)] +pub struct ClusterInfo { + /// Networks including PodIPs in this cluster. + /// + /// Unfortunately, there's no way to discover this at runtime. + pub networks: Vec, + + /// The namespace where the linkerd control plane is deployed + pub control_plane_ns: String, + + /// E.g. "cluster.local" + pub dns_domain: String, + + /// The cluster's mesh identity trust domain. + pub identity_domain: String, + + /// The cluster-wide default policy. + pub default_policy: DefaultPolicy, + + /// The cluster-wide default protocol detection timeout. + pub default_detect_timeout: time::Duration, + + /// The default set of ports to be marked opaque. + pub default_opaque_ports: PortSet, + + /// The networks that probes are expected to be from. + pub probe_networks: Vec, +} + +impl ClusterInfo { + pub(crate) fn service_account_identity(&self, ns: &str, sa: &str) -> String { + format!( + "{}.{}.serviceaccount.identity.{}.{}", + sa, ns, self.control_plane_ns, self.identity_domain + ) + } + + pub(crate) fn namespace_identity(&self, ns: &str) -> String { + format!( + "*.{}.serviceaccount.identity.{}.{}", + ns, self.control_plane_ns, self.identity_domain + ) + } + + pub(crate) fn service_dns_authority(&self, ns: &str, svc: &str, port: NonZeroU16) -> String { + format!("{}.{}.svc.{}:{port}", svc, ns, self.dns_domain) + } +} diff --git a/policy-controller/k8s/index/src/lib.rs b/policy-controller/k8s/index/src/lib.rs index 6d8a1effb..53533b704 100644 --- a/policy-controller/k8s/index/src/lib.rs +++ b/policy-controller/k8s/index/src/lib.rs @@ -24,6 +24,7 @@ #![forbid(unsafe_code)] pub mod authorization_policy; +mod cluster_info; mod defaults; pub mod http_route; mod index; @@ -37,51 +38,9 @@ mod server_authorization; #[cfg(test)] mod tests; -use linkerd_policy_controller_core::IpNet; -use std::time; - pub use self::{ + cluster_info::ClusterInfo, defaults::DefaultPolicy, index::{Index, SharedIndex}, pod::{parse_portset, PortSet}, }; - -/// Holds cluster metadata. -#[derive(Clone, Debug)] -pub struct ClusterInfo { - /// Networks including PodIPs in this cluster. - /// - /// Unfortunately, there's no way to discover this at runtime. - pub networks: Vec, - - /// The namespace where the linkerd control plane is deployed - pub control_plane_ns: String, - - /// The cluster's mesh identity trust domain. - pub identity_domain: String, - - /// The cluster-wide default policy. - pub default_policy: DefaultPolicy, - - /// The cluster-wide default protocol detection timeout. - pub default_detect_timeout: time::Duration, - - /// The networks that probes are expected to be from. - pub probe_networks: Vec, -} - -impl ClusterInfo { - fn service_account_identity(&self, ns: &str, sa: &str) -> String { - format!( - "{}.{}.serviceaccount.identity.{}.{}", - sa, ns, self.control_plane_ns, self.identity_domain - ) - } - - fn namespace_identity(&self, ns: &str) -> String { - format!( - "*.{}.serviceaccount.identity.{}.{}", - ns, self.control_plane_ns, self.identity_domain - ) - } -} diff --git a/policy-controller/k8s/index/src/outbound_index.rs b/policy-controller/k8s/index/src/outbound_index.rs index 6e95e0f45..47aed2ed5 100644 --- a/policy-controller/k8s/index/src/outbound_index.rs +++ b/policy-controller/k8s/index/src/outbound_index.rs @@ -1,8 +1,11 @@ -use crate::pod::ports_annotation; -use crate::{http_route::InboundRouteBinding, pod::PortSet}; +use crate::{ + http_route::InboundRouteBinding, + pod::{ports_annotation, PortSet}, + ClusterInfo, +}; use ahash::AHashMap as HashMap; use anyhow::Result; -use k8s_gateway_api::HttpBackendRef; +use k8s_gateway_api::{BackendObjectReference, HttpBackendRef, ParentReference}; use linkerd_policy_controller_core::{ http_route::{Backend, OutboundHttpRoute, OutboundHttpRouteRule, WeightedDst}, OutboundPolicy, @@ -17,32 +20,31 @@ use tokio::sync::watch; use super::http_route::convert; -pub type SharedIndex = Arc>; - #[derive(Debug)] pub struct Index { namespaces: NamespaceIndex, services: HashMap, - default_opaque_ports: PortSet, } +pub type SharedIndex = Arc>; + #[derive(Debug, Clone, PartialEq, Eq)] pub struct ServiceRef { pub name: String, pub namespace: String, } +/// Holds all `Pod`, `Server`, and `ServerAuthorization` indices by-namespace. #[derive(Debug)] -pub struct NamespaceIndex { +struct NamespaceIndex { + cluster_info: Arc, by_ns: HashMap, - cluster_domain: Arc, } -#[derive(Debug, Default)] +#[derive(Debug)] struct Namespace { service_routes: HashMap, namespace: Arc, - cluster_domain: Arc, services: HashMap, } @@ -74,10 +76,9 @@ impl kubert::index::IndexNamespacedResource for Index { .or_insert_with(|| Namespace { service_routes: Default::default(), namespace: Arc::new(ns), - cluster_domain: self.namespaces.cluster_domain.clone(), services: Default::default(), }) - .apply(route); + .apply(route, &self.namespaces.cluster_info); } fn delete(&mut self, namespace: String, name: String) { @@ -113,7 +114,7 @@ impl kubert::index::IndexNamespacedResource for Index { let opaque_ports = ports_annotation(service.annotations(), "config.linkerd.io/opaque-ports") - .unwrap_or_else(|| self.default_opaque_ports.clone()); + .unwrap_or_else(|| self.namespaces.cluster_info.default_opaque_ports.clone()); let service_info = ServiceInfo { opaque_ports }; self.namespaces @@ -122,7 +123,6 @@ impl kubert::index::IndexNamespacedResource for Index { .or_insert_with(|| Namespace { service_routes: Default::default(), namespace: Arc::new(ns), - cluster_domain: self.namespaces.cluster_domain.clone(), services: Default::default(), }) .update_service(service.name_unchecked(), service_info); @@ -138,14 +138,13 @@ impl kubert::index::IndexNamespacedResource for Index { } impl Index { - pub fn shared(cluster_domain: String, default_opaque_ports: PortSet) -> SharedIndex { + pub fn shared(cluster_info: Arc) -> SharedIndex { Arc::new(RwLock::new(Self { namespaces: NamespaceIndex { by_ns: HashMap::default(), - cluster_domain: Arc::new(cluster_domain), + cluster_info, }, services: HashMap::default(), - default_opaque_ports, })) } @@ -162,7 +161,6 @@ impl Index { .or_insert_with(|| Namespace { service_routes: Default::default(), namespace: Arc::new(namespace.to_string()), - cluster_domain: self.namespaces.cluster_domain.clone(), services: Default::default(), }); let key = ServicePort { @@ -170,7 +168,7 @@ impl Index { port, }; tracing::debug!(?key, "subscribing to service port"); - let routes = ns.service_routes_or_default(key); + let routes = ns.service_routes_or_default(key, &self.namespaces.cluster_info); Ok(routes.watch.subscribe()) } @@ -180,12 +178,16 @@ impl Index { } impl Namespace { - fn apply(&mut self, route: HttpRoute) { + fn apply(&mut self, route: HttpRoute, cluster_info: &ClusterInfo) { tracing::debug!(?route); let name = route.name_unchecked(); - let outbound_route = match self.convert_route(route.clone()) { + let outbound_route = match self.convert_route(route.clone(), cluster_info) { Ok(route) => route, Err(error) => { + // XXX(ver) This is likely to fire whenever we process routes + // that target servers, for instance. Ultimately, we should + // unify the handling. Either that or we should reduce the log + // level to avoid user-facing noise. tracing::error!(%error, "failed to convert HttpRoute"); return; } @@ -193,31 +195,34 @@ impl Namespace { tracing::debug!(?outbound_route); for parent_ref in route.spec.inner.parent_refs.iter().flatten() { - if parent_ref.kind.as_deref() == Some("Service") { - if let Some(port) = parent_ref.port { - if let Some(port) = NonZeroU16::new(port) { - let service_port = ServicePort { - port, - service: parent_ref.name.clone(), - }; - tracing::debug!( - ?service_port, - route = route.name_unchecked(), - "inserting route for service" - ); - let service_routes = self.service_routes_or_default(service_port); - service_routes.apply(name.clone(), outbound_route.clone()); - } else { - tracing::warn!(?parent_ref, "ignoring parent_ref with port 0"); - } - } else { - tracing::warn!(?parent_ref, "ignoring parent_ref without port"); - } - } else { + if !is_parent_service(parent_ref) { + // XXX(ver) This is likely to fire whenever we process routes + // that only target inbound resources. tracing::warn!( parent_kind = parent_ref.kind.as_deref(), "ignoring parent_ref with non-Service kind" ); + continue; + } + + if let Some(port) = parent_ref.port { + if let Some(port) = NonZeroU16::new(port) { + let service_port = ServicePort { + port, + service: parent_ref.name.clone(), + }; + tracing::debug!( + ?service_port, + route = route.name_unchecked(), + "inserting route for service" + ); + let service_routes = self.service_routes_or_default(service_port, cluster_info); + service_routes.apply(name.clone(), outbound_route.clone()); + } else { + tracing::warn!(?parent_ref, "ignoring parent_ref with port 0"); + } + } else { + tracing::warn!(?parent_ref, "ignoring parent_ref without port"); } } } @@ -237,10 +242,15 @@ impl Namespace { } } - fn service_routes_or_default(&mut self, service_port: ServicePort) -> &mut ServiceRoutes { - let authority = format!( - "{}.{}.svc.{}:{}", - service_port.service, self.namespace, self.cluster_domain, service_port.port + fn service_routes_or_default( + &mut self, + service_port: ServicePort, + cluster: &ClusterInfo, + ) -> &mut ServiceRoutes { + let authority = cluster.service_dns_authority( + &self.namespace, + &service_port.service, + service_port.port, ); self.service_routes .entry(service_port.clone()) @@ -263,7 +273,7 @@ impl Namespace { }) } - fn convert_route(&self, route: HttpRoute) -> Result { + fn convert_route(&self, route: HttpRoute, cluster: &ClusterInfo) -> Result { let hostnames = route .spec .hostnames @@ -277,7 +287,7 @@ impl Namespace { .rules .into_iter() .flatten() - .map(|r| self.convert_rule(r)) + .map(|r| self.convert_rule(r, cluster)) .collect::>()?; let creation_timestamp = route.metadata.creation_timestamp.map(|Time(t)| t); @@ -289,7 +299,11 @@ impl Namespace { }) } - fn convert_rule(&self, rule: HttpRouteRule) -> Result { + fn convert_rule( + &self, + rule: HttpRouteRule, + cluster: &ClusterInfo, + ) -> Result { let matches = rule .matches .into_iter() @@ -301,31 +315,90 @@ impl Namespace { .backend_refs .into_iter() .flatten() - .filter_map(|b| self.convert_backend(b)) + .filter_map(|b| convert_backend(&self.namespace, b, cluster, &self.services)) .collect(); Ok(OutboundHttpRouteRule { matches, backends }) } +} - fn convert_backend(&self, backend: HttpBackendRef) -> Option { - backend.backend_ref.map(|backend| { - let port = backend.inner.port.unwrap_or_else(|| { - tracing::warn!(?backend, "missing port in backend_ref"); - u16::default() - }); - let dst = WeightedDst { +fn convert_backend( + ns: &str, + backend: HttpBackendRef, + cluster: &ClusterInfo, + services: &HashMap, +) -> Option { + backend.backend_ref.map(|backend| { + if !is_backend_service(&backend.inner) { + return Backend::InvalidDst { weight: backend.weight.unwrap_or(1).into(), - authority: format!( - "{}.{}.svc.{}:{port}", - backend.inner.name, self.namespace, self.cluster_domain, + message: format!( + "unsupported backend type {group} {kind}", + group = backend.inner.group.as_deref().unwrap_or("core"), + kind = backend.inner.kind.as_deref().unwrap_or(""), ), }; - if self.services.contains_key(&backend.inner.name) { - Backend::Dst(dst) - } else { - Backend::InvalidDst(dst) + } + + let name = backend.inner.name; + let weight = backend.weight.unwrap_or(1); + + // The gateway API dictates: + // + // Port is required when the referent is a Kubernetes Service. + let port = match backend + .inner + .port + .and_then(|p| NonZeroU16::try_from(p).ok()) + { + Some(port) => port, + None => { + return Backend::InvalidDst { + weight: weight.into(), + message: format!("missing port for backend Service {name}"), + } } + }; + + if !services.contains_key(&name) { + return Backend::InvalidDst { + weight: weight.into(), + message: format!("Service not found {name}"), + }; + } + + Backend::Dst(WeightedDst { + weight: weight.into(), + authority: cluster.service_dns_authority(ns, &name, port), }) - } + }) +} + +#[inline] +fn is_parent_service(parent: &ParentReference) -> bool { + parent + .kind + .as_deref() + .map(|k| is_service(parent.group.as_deref(), k)) + // Parent refs require a `kind`. + .unwrap_or(false) +} + +#[inline] +fn is_backend_service(backend: &BackendObjectReference) -> bool { + is_service( + backend.group.as_deref(), + // Backends default to `Service` if no kind is specified. + backend.kind.as_deref().unwrap_or("Service"), + ) +} + +#[inline] +fn is_service(group: Option<&str>, kind: &str) -> bool { + // If the group is not specified or empty, assume it's 'core'. + group + .map(|g| g.eq_ignore_ascii_case("core") || g.is_empty()) + .unwrap_or(true) + && kind.eq_ignore_ascii_case("Service") } impl ServiceRoutes { diff --git a/policy-controller/k8s/index/src/tests.rs b/policy-controller/k8s/index/src/tests.rs index e3ce0c9e9..d30edc68e 100644 --- a/policy-controller/k8s/index/src/tests.rs +++ b/policy-controller/k8s/index/src/tests.rs @@ -199,8 +199,10 @@ impl TestConfig { networks: vec![cluster_net], control_plane_ns: "linkerd".to_string(), identity_domain: "cluster.example.com".into(), + dns_domain: "cluster.example.com".into(), default_policy, default_detect_timeout: detect_timeout, + default_opaque_ports: Default::default(), probe_networks, }; let index = Index::shared(cluster.clone()); diff --git a/policy-controller/src/main.rs b/policy-controller/src/main.rs index c0a0ed233..d9c938325 100644 --- a/policy-controller/src/main.rs +++ b/policy-controller/src/main.rs @@ -11,7 +11,7 @@ use linkerd_policy_controller::{ }; use linkerd_policy_controller_k8s_index::parse_portset; use linkerd_policy_controller_k8s_status::{self as status}; -use std::net::SocketAddr; +use std::{net::SocketAddr, sync::Arc}; use tokio::{sync::mpsc, time::Duration}; use tonic::transport::Server; use tracing::{info, info_span, instrument, Instrument}; @@ -118,18 +118,21 @@ async fn main() -> Result<()> { let probe_networks = probe_networks.map(|IpNets(nets)| nets).unwrap_or_default(); let default_opaque_ports = parse_portset(&default_opaque_ports)?; - - // Build the index data structure, which will be used to process events from all watches - // The lookup handle is used by the gRPC server. - let index = Index::shared(ClusterInfo { + let cluster_info = Arc::new(ClusterInfo { networks: cluster_networks.clone(), identity_domain, control_plane_ns: control_plane_namespace.clone(), + dns_domain: cluster_domain, default_policy, default_detect_timeout: DETECT_TIMEOUT, + default_opaque_ports, probe_networks, }); - let outbound_index = outbound_index::Index::shared(cluster_domain, default_opaque_ports); + + // Build the index data structure, which will be used to process events from all watches + // The lookup handle is used by the gRPC server. + let index = Index::shared(cluster_info.clone()); + let outbound_index = outbound_index::Index::shared(cluster_info); let indexes = IndexPair::shared(index.clone(), outbound_index.clone()); // Spawn resource indexers that update the index and publish lookups for the gRPC server.