mirror of https://github.com/linkerd/linkerd2.git
policy: Use ClusterInfo in outbound indexer (#10493)
We have a `ClusterInfo` type that includes all cluster-level configuration. The outbound indexer does not use this type and, instead, passes around individual configuration values. This change updates the outbound indexer to use the ClusterInfo configuration type. It also moves the type to its own module. This is largely a reorganization PR, though there is one notable change: InvalidDst response no longer reference a backend. Instead, the backend is left empty, since it's not possible to route requests to it.
This commit is contained in:
parent
d5c895452c
commit
6e9244f485
|
@ -8,8 +8,7 @@ pub use http::{
|
||||||
Method, StatusCode,
|
Method, StatusCode,
|
||||||
};
|
};
|
||||||
use regex::Regex;
|
use regex::Regex;
|
||||||
use std::net::IpAddr;
|
use std::{net::IpAddr, num::NonZeroU16};
|
||||||
use std::num::NonZeroU16;
|
|
||||||
|
|
||||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||||
pub struct InboundHttpRoute {
|
pub struct InboundHttpRoute {
|
||||||
|
@ -54,7 +53,7 @@ pub struct OutboundHttpRouteRule {
|
||||||
pub enum Backend {
|
pub enum Backend {
|
||||||
Addr(WeightedAddr),
|
Addr(WeightedAddr),
|
||||||
Dst(WeightedDst),
|
Dst(WeightedDst),
|
||||||
InvalidDst(WeightedDst),
|
InvalidDst { weight: u32, message: String },
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||||
|
|
|
@ -725,6 +725,7 @@ fn convert_outbound_http_route(
|
||||||
fn convert_http_backend(backend: Backend) -> outbound::http_route::WeightedRouteBackend {
|
fn convert_http_backend(backend: Backend) -> outbound::http_route::WeightedRouteBackend {
|
||||||
match backend {
|
match backend {
|
||||||
Backend::Addr(addr) => outbound::http_route::WeightedRouteBackend {
|
Backend::Addr(addr) => outbound::http_route::WeightedRouteBackend {
|
||||||
|
weight: addr.weight,
|
||||||
backend: Some(outbound::http_route::RouteBackend {
|
backend: Some(outbound::http_route::RouteBackend {
|
||||||
backend: Some(outbound::Backend {
|
backend: Some(outbound::Backend {
|
||||||
metadata: None,
|
metadata: None,
|
||||||
|
@ -739,9 +740,9 @@ fn convert_http_backend(backend: Backend) -> outbound::http_route::WeightedRoute
|
||||||
}),
|
}),
|
||||||
filters: Default::default(),
|
filters: Default::default(),
|
||||||
}),
|
}),
|
||||||
weight: addr.weight,
|
|
||||||
},
|
},
|
||||||
Backend::Dst(dst) => outbound::http_route::WeightedRouteBackend {
|
Backend::Dst(dst) => outbound::http_route::WeightedRouteBackend {
|
||||||
|
weight: dst.weight,
|
||||||
backend: Some(outbound::http_route::RouteBackend {
|
backend: Some(outbound::http_route::RouteBackend {
|
||||||
backend: Some(outbound::Backend {
|
backend: Some(outbound::Backend {
|
||||||
metadata: None,
|
metadata: None,
|
||||||
|
@ -761,43 +762,28 @@ fn convert_http_backend(backend: Backend) -> outbound::http_route::WeightedRoute
|
||||||
}),
|
}),
|
||||||
filters: Default::default(),
|
filters: Default::default(),
|
||||||
}),
|
}),
|
||||||
weight: dst.weight,
|
|
||||||
},
|
},
|
||||||
Backend::InvalidDst(dst) => outbound::http_route::WeightedRouteBackend {
|
Backend::InvalidDst { weight, message } => outbound::http_route::WeightedRouteBackend {
|
||||||
|
weight,
|
||||||
backend: Some(outbound::http_route::RouteBackend {
|
backend: Some(outbound::http_route::RouteBackend {
|
||||||
backend: Some(outbound::Backend {
|
backend: None,
|
||||||
metadata: None,
|
|
||||||
queue: Some(default_queue_config()),
|
|
||||||
kind: Some(outbound::backend::Kind::Balancer(
|
|
||||||
outbound::backend::BalanceP2c {
|
|
||||||
discovery: Some(outbound::backend::EndpointDiscovery {
|
|
||||||
kind: Some(outbound::backend::endpoint_discovery::Kind::Dst(
|
|
||||||
outbound::backend::endpoint_discovery::DestinationGet {
|
|
||||||
path: dst.authority.clone(),
|
|
||||||
},
|
|
||||||
)),
|
|
||||||
}),
|
|
||||||
load: Some(default_balancer_config()),
|
|
||||||
},
|
|
||||||
)),
|
|
||||||
}),
|
|
||||||
filters: vec![outbound::http_route::Filter {
|
filters: vec![outbound::http_route::Filter {
|
||||||
kind: Some(outbound::http_route::filter::Kind::FailureInjector(
|
kind: Some(outbound::http_route::filter::Kind::FailureInjector(
|
||||||
api::http_route::HttpFailureInjector {
|
api::http_route::HttpFailureInjector {
|
||||||
status: 500,
|
status: 500,
|
||||||
message: format!("backend {} is invalid", dst.authority),
|
message,
|
||||||
ratio: None,
|
ratio: None,
|
||||||
},
|
},
|
||||||
)),
|
)),
|
||||||
}],
|
}],
|
||||||
}),
|
}),
|
||||||
weight: dst.weight,
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn default_http_backend(outbound: &OutboundPolicy) -> outbound::http_route::WeightedRouteBackend {
|
fn default_http_backend(outbound: &OutboundPolicy) -> outbound::http_route::WeightedRouteBackend {
|
||||||
outbound::http_route::WeightedRouteBackend {
|
outbound::http_route::WeightedRouteBackend {
|
||||||
|
weight: 1,
|
||||||
backend: Some(outbound::http_route::RouteBackend {
|
backend: Some(outbound::http_route::RouteBackend {
|
||||||
backend: Some(outbound::Backend {
|
backend: Some(outbound::Backend {
|
||||||
metadata: Some(Metadata {
|
metadata: Some(Metadata {
|
||||||
|
@ -819,7 +805,6 @@ fn default_http_backend(outbound: &OutboundPolicy) -> outbound::http_route::Weig
|
||||||
}),
|
}),
|
||||||
filters: Default::default(),
|
filters: Default::default(),
|
||||||
}),
|
}),
|
||||||
weight: 1,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -879,6 +864,7 @@ fn default_queue_config() -> outbound::Queue {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO(ver) this conversion should be made in the api crate.
|
||||||
fn convert_tcp_address(ip_addr: IpAddr, port: NonZeroU16) -> TcpAddress {
|
fn convert_tcp_address(ip_addr: IpAddr, port: NonZeroU16) -> TcpAddress {
|
||||||
let ip = match ip_addr {
|
let ip = match ip_addr {
|
||||||
IpAddr::V4(ipv4) => Ip::Ipv4(ipv4.into()),
|
IpAddr::V4(ipv4) => Ip::Ipv4(ipv4.into()),
|
||||||
|
|
|
@ -0,0 +1,55 @@
|
||||||
|
use std::num::NonZeroU16;
|
||||||
|
|
||||||
|
use crate::{pod::PortSet, DefaultPolicy};
|
||||||
|
use linkerd_policy_controller_core::IpNet;
|
||||||
|
use tokio::time;
|
||||||
|
|
||||||
|
/// Holds cluster metadata.
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
pub struct ClusterInfo {
|
||||||
|
/// Networks including PodIPs in this cluster.
|
||||||
|
///
|
||||||
|
/// Unfortunately, there's no way to discover this at runtime.
|
||||||
|
pub networks: Vec<IpNet>,
|
||||||
|
|
||||||
|
/// The namespace where the linkerd control plane is deployed
|
||||||
|
pub control_plane_ns: String,
|
||||||
|
|
||||||
|
/// E.g. "cluster.local"
|
||||||
|
pub dns_domain: String,
|
||||||
|
|
||||||
|
/// The cluster's mesh identity trust domain.
|
||||||
|
pub identity_domain: String,
|
||||||
|
|
||||||
|
/// The cluster-wide default policy.
|
||||||
|
pub default_policy: DefaultPolicy,
|
||||||
|
|
||||||
|
/// The cluster-wide default protocol detection timeout.
|
||||||
|
pub default_detect_timeout: time::Duration,
|
||||||
|
|
||||||
|
/// The default set of ports to be marked opaque.
|
||||||
|
pub default_opaque_ports: PortSet,
|
||||||
|
|
||||||
|
/// The networks that probes are expected to be from.
|
||||||
|
pub probe_networks: Vec<IpNet>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ClusterInfo {
|
||||||
|
pub(crate) fn service_account_identity(&self, ns: &str, sa: &str) -> String {
|
||||||
|
format!(
|
||||||
|
"{}.{}.serviceaccount.identity.{}.{}",
|
||||||
|
sa, ns, self.control_plane_ns, self.identity_domain
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn namespace_identity(&self, ns: &str) -> String {
|
||||||
|
format!(
|
||||||
|
"*.{}.serviceaccount.identity.{}.{}",
|
||||||
|
ns, self.control_plane_ns, self.identity_domain
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn service_dns_authority(&self, ns: &str, svc: &str, port: NonZeroU16) -> String {
|
||||||
|
format!("{}.{}.svc.{}:{port}", svc, ns, self.dns_domain)
|
||||||
|
}
|
||||||
|
}
|
|
@ -24,6 +24,7 @@
|
||||||
#![forbid(unsafe_code)]
|
#![forbid(unsafe_code)]
|
||||||
|
|
||||||
pub mod authorization_policy;
|
pub mod authorization_policy;
|
||||||
|
mod cluster_info;
|
||||||
mod defaults;
|
mod defaults;
|
||||||
pub mod http_route;
|
pub mod http_route;
|
||||||
mod index;
|
mod index;
|
||||||
|
@ -37,51 +38,9 @@ mod server_authorization;
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests;
|
mod tests;
|
||||||
|
|
||||||
use linkerd_policy_controller_core::IpNet;
|
|
||||||
use std::time;
|
|
||||||
|
|
||||||
pub use self::{
|
pub use self::{
|
||||||
|
cluster_info::ClusterInfo,
|
||||||
defaults::DefaultPolicy,
|
defaults::DefaultPolicy,
|
||||||
index::{Index, SharedIndex},
|
index::{Index, SharedIndex},
|
||||||
pod::{parse_portset, PortSet},
|
pod::{parse_portset, PortSet},
|
||||||
};
|
};
|
||||||
|
|
||||||
/// Holds cluster metadata.
|
|
||||||
#[derive(Clone, Debug)]
|
|
||||||
pub struct ClusterInfo {
|
|
||||||
/// Networks including PodIPs in this cluster.
|
|
||||||
///
|
|
||||||
/// Unfortunately, there's no way to discover this at runtime.
|
|
||||||
pub networks: Vec<IpNet>,
|
|
||||||
|
|
||||||
/// The namespace where the linkerd control plane is deployed
|
|
||||||
pub control_plane_ns: String,
|
|
||||||
|
|
||||||
/// The cluster's mesh identity trust domain.
|
|
||||||
pub identity_domain: String,
|
|
||||||
|
|
||||||
/// The cluster-wide default policy.
|
|
||||||
pub default_policy: DefaultPolicy,
|
|
||||||
|
|
||||||
/// The cluster-wide default protocol detection timeout.
|
|
||||||
pub default_detect_timeout: time::Duration,
|
|
||||||
|
|
||||||
/// The networks that probes are expected to be from.
|
|
||||||
pub probe_networks: Vec<IpNet>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ClusterInfo {
|
|
||||||
fn service_account_identity(&self, ns: &str, sa: &str) -> String {
|
|
||||||
format!(
|
|
||||||
"{}.{}.serviceaccount.identity.{}.{}",
|
|
||||||
sa, ns, self.control_plane_ns, self.identity_domain
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn namespace_identity(&self, ns: &str) -> String {
|
|
||||||
format!(
|
|
||||||
"*.{}.serviceaccount.identity.{}.{}",
|
|
||||||
ns, self.control_plane_ns, self.identity_domain
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -1,8 +1,11 @@
|
||||||
use crate::pod::ports_annotation;
|
use crate::{
|
||||||
use crate::{http_route::InboundRouteBinding, pod::PortSet};
|
http_route::InboundRouteBinding,
|
||||||
|
pod::{ports_annotation, PortSet},
|
||||||
|
ClusterInfo,
|
||||||
|
};
|
||||||
use ahash::AHashMap as HashMap;
|
use ahash::AHashMap as HashMap;
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use k8s_gateway_api::HttpBackendRef;
|
use k8s_gateway_api::{BackendObjectReference, HttpBackendRef, ParentReference};
|
||||||
use linkerd_policy_controller_core::{
|
use linkerd_policy_controller_core::{
|
||||||
http_route::{Backend, OutboundHttpRoute, OutboundHttpRouteRule, WeightedDst},
|
http_route::{Backend, OutboundHttpRoute, OutboundHttpRouteRule, WeightedDst},
|
||||||
OutboundPolicy,
|
OutboundPolicy,
|
||||||
|
@ -17,32 +20,31 @@ use tokio::sync::watch;
|
||||||
|
|
||||||
use super::http_route::convert;
|
use super::http_route::convert;
|
||||||
|
|
||||||
pub type SharedIndex = Arc<RwLock<Index>>;
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Index {
|
pub struct Index {
|
||||||
namespaces: NamespaceIndex,
|
namespaces: NamespaceIndex,
|
||||||
services: HashMap<IpAddr, ServiceRef>,
|
services: HashMap<IpAddr, ServiceRef>,
|
||||||
default_opaque_ports: PortSet,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub type SharedIndex = Arc<RwLock<Index>>;
|
||||||
|
|
||||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
pub struct ServiceRef {
|
pub struct ServiceRef {
|
||||||
pub name: String,
|
pub name: String,
|
||||||
pub namespace: String,
|
pub namespace: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Holds all `Pod`, `Server`, and `ServerAuthorization` indices by-namespace.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct NamespaceIndex {
|
struct NamespaceIndex {
|
||||||
|
cluster_info: Arc<ClusterInfo>,
|
||||||
by_ns: HashMap<String, Namespace>,
|
by_ns: HashMap<String, Namespace>,
|
||||||
cluster_domain: Arc<String>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Default)]
|
#[derive(Debug)]
|
||||||
struct Namespace {
|
struct Namespace {
|
||||||
service_routes: HashMap<ServicePort, ServiceRoutes>,
|
service_routes: HashMap<ServicePort, ServiceRoutes>,
|
||||||
namespace: Arc<String>,
|
namespace: Arc<String>,
|
||||||
cluster_domain: Arc<String>,
|
|
||||||
services: HashMap<String, ServiceInfo>,
|
services: HashMap<String, ServiceInfo>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -74,10 +76,9 @@ impl kubert::index::IndexNamespacedResource<HttpRoute> for Index {
|
||||||
.or_insert_with(|| Namespace {
|
.or_insert_with(|| Namespace {
|
||||||
service_routes: Default::default(),
|
service_routes: Default::default(),
|
||||||
namespace: Arc::new(ns),
|
namespace: Arc::new(ns),
|
||||||
cluster_domain: self.namespaces.cluster_domain.clone(),
|
|
||||||
services: Default::default(),
|
services: Default::default(),
|
||||||
})
|
})
|
||||||
.apply(route);
|
.apply(route, &self.namespaces.cluster_info);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn delete(&mut self, namespace: String, name: String) {
|
fn delete(&mut self, namespace: String, name: String) {
|
||||||
|
@ -113,7 +114,7 @@ impl kubert::index::IndexNamespacedResource<Service> for Index {
|
||||||
|
|
||||||
let opaque_ports =
|
let opaque_ports =
|
||||||
ports_annotation(service.annotations(), "config.linkerd.io/opaque-ports")
|
ports_annotation(service.annotations(), "config.linkerd.io/opaque-ports")
|
||||||
.unwrap_or_else(|| self.default_opaque_ports.clone());
|
.unwrap_or_else(|| self.namespaces.cluster_info.default_opaque_ports.clone());
|
||||||
let service_info = ServiceInfo { opaque_ports };
|
let service_info = ServiceInfo { opaque_ports };
|
||||||
|
|
||||||
self.namespaces
|
self.namespaces
|
||||||
|
@ -122,7 +123,6 @@ impl kubert::index::IndexNamespacedResource<Service> for Index {
|
||||||
.or_insert_with(|| Namespace {
|
.or_insert_with(|| Namespace {
|
||||||
service_routes: Default::default(),
|
service_routes: Default::default(),
|
||||||
namespace: Arc::new(ns),
|
namespace: Arc::new(ns),
|
||||||
cluster_domain: self.namespaces.cluster_domain.clone(),
|
|
||||||
services: Default::default(),
|
services: Default::default(),
|
||||||
})
|
})
|
||||||
.update_service(service.name_unchecked(), service_info);
|
.update_service(service.name_unchecked(), service_info);
|
||||||
|
@ -138,14 +138,13 @@ impl kubert::index::IndexNamespacedResource<Service> for Index {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Index {
|
impl Index {
|
||||||
pub fn shared(cluster_domain: String, default_opaque_ports: PortSet) -> SharedIndex {
|
pub fn shared(cluster_info: Arc<ClusterInfo>) -> SharedIndex {
|
||||||
Arc::new(RwLock::new(Self {
|
Arc::new(RwLock::new(Self {
|
||||||
namespaces: NamespaceIndex {
|
namespaces: NamespaceIndex {
|
||||||
by_ns: HashMap::default(),
|
by_ns: HashMap::default(),
|
||||||
cluster_domain: Arc::new(cluster_domain),
|
cluster_info,
|
||||||
},
|
},
|
||||||
services: HashMap::default(),
|
services: HashMap::default(),
|
||||||
default_opaque_ports,
|
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -162,7 +161,6 @@ impl Index {
|
||||||
.or_insert_with(|| Namespace {
|
.or_insert_with(|| Namespace {
|
||||||
service_routes: Default::default(),
|
service_routes: Default::default(),
|
||||||
namespace: Arc::new(namespace.to_string()),
|
namespace: Arc::new(namespace.to_string()),
|
||||||
cluster_domain: self.namespaces.cluster_domain.clone(),
|
|
||||||
services: Default::default(),
|
services: Default::default(),
|
||||||
});
|
});
|
||||||
let key = ServicePort {
|
let key = ServicePort {
|
||||||
|
@ -170,7 +168,7 @@ impl Index {
|
||||||
port,
|
port,
|
||||||
};
|
};
|
||||||
tracing::debug!(?key, "subscribing to service port");
|
tracing::debug!(?key, "subscribing to service port");
|
||||||
let routes = ns.service_routes_or_default(key);
|
let routes = ns.service_routes_or_default(key, &self.namespaces.cluster_info);
|
||||||
Ok(routes.watch.subscribe())
|
Ok(routes.watch.subscribe())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -180,12 +178,16 @@ impl Index {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Namespace {
|
impl Namespace {
|
||||||
fn apply(&mut self, route: HttpRoute) {
|
fn apply(&mut self, route: HttpRoute, cluster_info: &ClusterInfo) {
|
||||||
tracing::debug!(?route);
|
tracing::debug!(?route);
|
||||||
let name = route.name_unchecked();
|
let name = route.name_unchecked();
|
||||||
let outbound_route = match self.convert_route(route.clone()) {
|
let outbound_route = match self.convert_route(route.clone(), cluster_info) {
|
||||||
Ok(route) => route,
|
Ok(route) => route,
|
||||||
Err(error) => {
|
Err(error) => {
|
||||||
|
// XXX(ver) This is likely to fire whenever we process routes
|
||||||
|
// that target servers, for instance. Ultimately, we should
|
||||||
|
// unify the handling. Either that or we should reduce the log
|
||||||
|
// level to avoid user-facing noise.
|
||||||
tracing::error!(%error, "failed to convert HttpRoute");
|
tracing::error!(%error, "failed to convert HttpRoute");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -193,7 +195,16 @@ impl Namespace {
|
||||||
tracing::debug!(?outbound_route);
|
tracing::debug!(?outbound_route);
|
||||||
|
|
||||||
for parent_ref in route.spec.inner.parent_refs.iter().flatten() {
|
for parent_ref in route.spec.inner.parent_refs.iter().flatten() {
|
||||||
if parent_ref.kind.as_deref() == Some("Service") {
|
if !is_parent_service(parent_ref) {
|
||||||
|
// XXX(ver) This is likely to fire whenever we process routes
|
||||||
|
// that only target inbound resources.
|
||||||
|
tracing::warn!(
|
||||||
|
parent_kind = parent_ref.kind.as_deref(),
|
||||||
|
"ignoring parent_ref with non-Service kind"
|
||||||
|
);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
if let Some(port) = parent_ref.port {
|
if let Some(port) = parent_ref.port {
|
||||||
if let Some(port) = NonZeroU16::new(port) {
|
if let Some(port) = NonZeroU16::new(port) {
|
||||||
let service_port = ServicePort {
|
let service_port = ServicePort {
|
||||||
|
@ -205,7 +216,7 @@ impl Namespace {
|
||||||
route = route.name_unchecked(),
|
route = route.name_unchecked(),
|
||||||
"inserting route for service"
|
"inserting route for service"
|
||||||
);
|
);
|
||||||
let service_routes = self.service_routes_or_default(service_port);
|
let service_routes = self.service_routes_or_default(service_port, cluster_info);
|
||||||
service_routes.apply(name.clone(), outbound_route.clone());
|
service_routes.apply(name.clone(), outbound_route.clone());
|
||||||
} else {
|
} else {
|
||||||
tracing::warn!(?parent_ref, "ignoring parent_ref with port 0");
|
tracing::warn!(?parent_ref, "ignoring parent_ref with port 0");
|
||||||
|
@ -213,12 +224,6 @@ impl Namespace {
|
||||||
} else {
|
} else {
|
||||||
tracing::warn!(?parent_ref, "ignoring parent_ref without port");
|
tracing::warn!(?parent_ref, "ignoring parent_ref without port");
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
tracing::warn!(
|
|
||||||
parent_kind = parent_ref.kind.as_deref(),
|
|
||||||
"ignoring parent_ref with non-Service kind"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -237,10 +242,15 @@ impl Namespace {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn service_routes_or_default(&mut self, service_port: ServicePort) -> &mut ServiceRoutes {
|
fn service_routes_or_default(
|
||||||
let authority = format!(
|
&mut self,
|
||||||
"{}.{}.svc.{}:{}",
|
service_port: ServicePort,
|
||||||
service_port.service, self.namespace, self.cluster_domain, service_port.port
|
cluster: &ClusterInfo,
|
||||||
|
) -> &mut ServiceRoutes {
|
||||||
|
let authority = cluster.service_dns_authority(
|
||||||
|
&self.namespace,
|
||||||
|
&service_port.service,
|
||||||
|
service_port.port,
|
||||||
);
|
);
|
||||||
self.service_routes
|
self.service_routes
|
||||||
.entry(service_port.clone())
|
.entry(service_port.clone())
|
||||||
|
@ -263,7 +273,7 @@ impl Namespace {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn convert_route(&self, route: HttpRoute) -> Result<OutboundHttpRoute> {
|
fn convert_route(&self, route: HttpRoute, cluster: &ClusterInfo) -> Result<OutboundHttpRoute> {
|
||||||
let hostnames = route
|
let hostnames = route
|
||||||
.spec
|
.spec
|
||||||
.hostnames
|
.hostnames
|
||||||
|
@ -277,7 +287,7 @@ impl Namespace {
|
||||||
.rules
|
.rules
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.flatten()
|
.flatten()
|
||||||
.map(|r| self.convert_rule(r))
|
.map(|r| self.convert_rule(r, cluster))
|
||||||
.collect::<Result<_>>()?;
|
.collect::<Result<_>>()?;
|
||||||
|
|
||||||
let creation_timestamp = route.metadata.creation_timestamp.map(|Time(t)| t);
|
let creation_timestamp = route.metadata.creation_timestamp.map(|Time(t)| t);
|
||||||
|
@ -289,7 +299,11 @@ impl Namespace {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn convert_rule(&self, rule: HttpRouteRule) -> Result<OutboundHttpRouteRule> {
|
fn convert_rule(
|
||||||
|
&self,
|
||||||
|
rule: HttpRouteRule,
|
||||||
|
cluster: &ClusterInfo,
|
||||||
|
) -> Result<OutboundHttpRouteRule> {
|
||||||
let matches = rule
|
let matches = rule
|
||||||
.matches
|
.matches
|
||||||
.into_iter()
|
.into_iter()
|
||||||
|
@ -301,31 +315,90 @@ impl Namespace {
|
||||||
.backend_refs
|
.backend_refs
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.flatten()
|
.flatten()
|
||||||
.filter_map(|b| self.convert_backend(b))
|
.filter_map(|b| convert_backend(&self.namespace, b, cluster, &self.services))
|
||||||
.collect();
|
.collect();
|
||||||
Ok(OutboundHttpRouteRule { matches, backends })
|
Ok(OutboundHttpRouteRule { matches, backends })
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn convert_backend(&self, backend: HttpBackendRef) -> Option<Backend> {
|
fn convert_backend(
|
||||||
|
ns: &str,
|
||||||
|
backend: HttpBackendRef,
|
||||||
|
cluster: &ClusterInfo,
|
||||||
|
services: &HashMap<String, ServiceInfo>,
|
||||||
|
) -> Option<Backend> {
|
||||||
backend.backend_ref.map(|backend| {
|
backend.backend_ref.map(|backend| {
|
||||||
let port = backend.inner.port.unwrap_or_else(|| {
|
if !is_backend_service(&backend.inner) {
|
||||||
tracing::warn!(?backend, "missing port in backend_ref");
|
return Backend::InvalidDst {
|
||||||
u16::default()
|
|
||||||
});
|
|
||||||
let dst = WeightedDst {
|
|
||||||
weight: backend.weight.unwrap_or(1).into(),
|
weight: backend.weight.unwrap_or(1).into(),
|
||||||
authority: format!(
|
message: format!(
|
||||||
"{}.{}.svc.{}:{port}",
|
"unsupported backend type {group} {kind}",
|
||||||
backend.inner.name, self.namespace, self.cluster_domain,
|
group = backend.inner.group.as_deref().unwrap_or("core"),
|
||||||
|
kind = backend.inner.kind.as_deref().unwrap_or("<empty>"),
|
||||||
),
|
),
|
||||||
};
|
};
|
||||||
if self.services.contains_key(&backend.inner.name) {
|
|
||||||
Backend::Dst(dst)
|
|
||||||
} else {
|
|
||||||
Backend::InvalidDst(dst)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let name = backend.inner.name;
|
||||||
|
let weight = backend.weight.unwrap_or(1);
|
||||||
|
|
||||||
|
// The gateway API dictates:
|
||||||
|
//
|
||||||
|
// Port is required when the referent is a Kubernetes Service.
|
||||||
|
let port = match backend
|
||||||
|
.inner
|
||||||
|
.port
|
||||||
|
.and_then(|p| NonZeroU16::try_from(p).ok())
|
||||||
|
{
|
||||||
|
Some(port) => port,
|
||||||
|
None => {
|
||||||
|
return Backend::InvalidDst {
|
||||||
|
weight: weight.into(),
|
||||||
|
message: format!("missing port for backend Service {name}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if !services.contains_key(&name) {
|
||||||
|
return Backend::InvalidDst {
|
||||||
|
weight: weight.into(),
|
||||||
|
message: format!("Service not found {name}"),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
Backend::Dst(WeightedDst {
|
||||||
|
weight: weight.into(),
|
||||||
|
authority: cluster.service_dns_authority(ns, &name, port),
|
||||||
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn is_parent_service(parent: &ParentReference) -> bool {
|
||||||
|
parent
|
||||||
|
.kind
|
||||||
|
.as_deref()
|
||||||
|
.map(|k| is_service(parent.group.as_deref(), k))
|
||||||
|
// Parent refs require a `kind`.
|
||||||
|
.unwrap_or(false)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn is_backend_service(backend: &BackendObjectReference) -> bool {
|
||||||
|
is_service(
|
||||||
|
backend.group.as_deref(),
|
||||||
|
// Backends default to `Service` if no kind is specified.
|
||||||
|
backend.kind.as_deref().unwrap_or("Service"),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn is_service(group: Option<&str>, kind: &str) -> bool {
|
||||||
|
// If the group is not specified or empty, assume it's 'core'.
|
||||||
|
group
|
||||||
|
.map(|g| g.eq_ignore_ascii_case("core") || g.is_empty())
|
||||||
|
.unwrap_or(true)
|
||||||
|
&& kind.eq_ignore_ascii_case("Service")
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ServiceRoutes {
|
impl ServiceRoutes {
|
||||||
|
|
|
@ -199,8 +199,10 @@ impl TestConfig {
|
||||||
networks: vec![cluster_net],
|
networks: vec![cluster_net],
|
||||||
control_plane_ns: "linkerd".to_string(),
|
control_plane_ns: "linkerd".to_string(),
|
||||||
identity_domain: "cluster.example.com".into(),
|
identity_domain: "cluster.example.com".into(),
|
||||||
|
dns_domain: "cluster.example.com".into(),
|
||||||
default_policy,
|
default_policy,
|
||||||
default_detect_timeout: detect_timeout,
|
default_detect_timeout: detect_timeout,
|
||||||
|
default_opaque_ports: Default::default(),
|
||||||
probe_networks,
|
probe_networks,
|
||||||
};
|
};
|
||||||
let index = Index::shared(cluster.clone());
|
let index = Index::shared(cluster.clone());
|
||||||
|
|
|
@ -11,7 +11,7 @@ use linkerd_policy_controller::{
|
||||||
};
|
};
|
||||||
use linkerd_policy_controller_k8s_index::parse_portset;
|
use linkerd_policy_controller_k8s_index::parse_portset;
|
||||||
use linkerd_policy_controller_k8s_status::{self as status};
|
use linkerd_policy_controller_k8s_status::{self as status};
|
||||||
use std::net::SocketAddr;
|
use std::{net::SocketAddr, sync::Arc};
|
||||||
use tokio::{sync::mpsc, time::Duration};
|
use tokio::{sync::mpsc, time::Duration};
|
||||||
use tonic::transport::Server;
|
use tonic::transport::Server;
|
||||||
use tracing::{info, info_span, instrument, Instrument};
|
use tracing::{info, info_span, instrument, Instrument};
|
||||||
|
@ -118,18 +118,21 @@ async fn main() -> Result<()> {
|
||||||
let probe_networks = probe_networks.map(|IpNets(nets)| nets).unwrap_or_default();
|
let probe_networks = probe_networks.map(|IpNets(nets)| nets).unwrap_or_default();
|
||||||
|
|
||||||
let default_opaque_ports = parse_portset(&default_opaque_ports)?;
|
let default_opaque_ports = parse_portset(&default_opaque_ports)?;
|
||||||
|
let cluster_info = Arc::new(ClusterInfo {
|
||||||
// Build the index data structure, which will be used to process events from all watches
|
|
||||||
// The lookup handle is used by the gRPC server.
|
|
||||||
let index = Index::shared(ClusterInfo {
|
|
||||||
networks: cluster_networks.clone(),
|
networks: cluster_networks.clone(),
|
||||||
identity_domain,
|
identity_domain,
|
||||||
control_plane_ns: control_plane_namespace.clone(),
|
control_plane_ns: control_plane_namespace.clone(),
|
||||||
|
dns_domain: cluster_domain,
|
||||||
default_policy,
|
default_policy,
|
||||||
default_detect_timeout: DETECT_TIMEOUT,
|
default_detect_timeout: DETECT_TIMEOUT,
|
||||||
|
default_opaque_ports,
|
||||||
probe_networks,
|
probe_networks,
|
||||||
});
|
});
|
||||||
let outbound_index = outbound_index::Index::shared(cluster_domain, default_opaque_ports);
|
|
||||||
|
// Build the index data structure, which will be used to process events from all watches
|
||||||
|
// The lookup handle is used by the gRPC server.
|
||||||
|
let index = Index::shared(cluster_info.clone());
|
||||||
|
let outbound_index = outbound_index::Index::shared(cluster_info);
|
||||||
let indexes = IndexPair::shared(index.clone(), outbound_index.clone());
|
let indexes = IndexPair::shared(index.clone(), outbound_index.clone());
|
||||||
|
|
||||||
// Spawn resource indexers that update the index and publish lookups for the gRPC server.
|
// Spawn resource indexers that update the index and publish lookups for the gRPC server.
|
||||||
|
|
Loading…
Reference in New Issue