diff --git a/policy-controller/core/src/outbound.rs b/policy-controller/core/src/outbound.rs index ca2ac8730..a895fb961 100644 --- a/policy-controller/core/src/outbound.rs +++ b/policy-controller/core/src/outbound.rs @@ -1,6 +1,6 @@ use crate::routes::{ - FailureInjectorFilter, GroupKindNamespaceName, HeaderModifierFilter, HostMatch, HttpRouteMatch, - RequestRedirectFilter, + FailureInjectorFilter, GroupKindNamespaceName, GrpcRouteMatch, HeaderModifierFilter, HostMatch, + HttpRouteMatch, RequestRedirectFilter, }; use ahash::AHashMap as HashMap; use anyhow::Result; @@ -32,6 +32,7 @@ pub struct OutboundDiscoverTarget { #[derive(Clone, Debug, PartialEq)] pub struct OutboundPolicy { pub http_routes: RouteSet, + pub grpc_routes: RouteSet, pub authority: String, pub name: String, pub namespace: String, diff --git a/policy-controller/core/src/routes.rs b/policy-controller/core/src/routes.rs index 41cbf7ae3..4051ea559 100644 --- a/policy-controller/core/src/routes.rs +++ b/policy-controller/core/src/routes.rs @@ -72,6 +72,12 @@ pub struct HttpRouteMatch { pub method: Option, } +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct GrpcRouteMatch { + pub headers: Vec, + pub method: Option, +} + #[derive(Clone, Debug)] pub enum PathMatch { Exact(String), @@ -91,6 +97,12 @@ pub enum QueryParamMatch { Regex(String, Regex), } +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct GrpcMethodMatch { + pub method: Option, + pub service: Option, +} + // === impl GroupKindName === impl Ord for GroupKindName { diff --git a/policy-controller/grpc/src/outbound.rs b/policy-controller/grpc/src/outbound.rs index b05c68f8d..dd771fd37 100644 --- a/policy-controller/grpc/src/outbound.rs +++ b/policy-controller/grpc/src/outbound.rs @@ -18,8 +18,9 @@ use linkerd_policy_controller_core::{ }, routes::GroupKindNamespaceName, }; -use std::{num::NonZeroU16, str::FromStr, sync::Arc, time}; +use std::{iter, num::NonZeroU16, str::FromStr, sync::Arc, time}; +mod grpc; mod http; #[derive(Clone, Debug)] @@ -231,7 +232,25 @@ fn to_service(outbound: OutboundPolicy) -> outbound::OutboundPolicy { let mut http_routes = outbound.http_routes.into_iter().collect::>(); http_routes.sort_by(timestamp_then_name); - http::protocol(backend, http_routes.into_iter(), accrual) + let mut http_routes = http_routes.into_iter().peekable(); + + let mut grpc_routes = outbound.grpc_routes.into_iter().collect::>(); + grpc_routes.sort_by(timestamp_then_name); + let mut grpc_routes = grpc_routes.into_iter().peekable(); + + // If both HTTP and gRPC routes are present, we choose the route kind by which has the oldest timestamp. + match (http_routes.peek(), grpc_routes.peek()) { + (Some(http), Some(grpc)) => { + if timestamp_then_name(http, grpc).is_gt() { + grpc::protocol(backend, grpc_routes, accrual) + } else { + http::protocol(backend, http_routes, accrual) + } + } + (Some((_, _http)), None) => http::protocol(backend, http_routes, accrual), + (None, Some((_, _grpc))) => grpc::protocol(backend, grpc_routes, accrual), + (None, None) => http::protocol(backend, iter::empty(), accrual), + } }; let metadata = Metadata { diff --git a/policy-controller/grpc/src/outbound/grpc.rs b/policy-controller/grpc/src/outbound/grpc.rs new file mode 100644 index 000000000..0924bc0dc --- /dev/null +++ b/policy-controller/grpc/src/outbound/grpc.rs @@ -0,0 +1,240 @@ +use std::net::SocketAddr; + +use super::{convert_duration, default_balancer_config, default_queue_config}; +use crate::routes::{ + convert_host_match, convert_request_header_modifier_filter, grpc::convert_match, +}; +use linkerd2_proxy_api::{destination, grpc_route, http_route, meta, outbound}; +use linkerd_policy_controller_core::{ + outbound::{Backend, Filter, OutboundRoute, OutboundRouteRule}, + routes::{FailureInjectorFilter, GroupKindNamespaceName, GrpcRouteMatch}, +}; + +pub(crate) fn protocol( + default_backend: outbound::Backend, + routes: impl Iterator)>, + failure_accrual: Option, +) -> outbound::proxy_protocol::Kind { + let routes = routes + .map(|(gknn, route)| convert_outbound_route(gknn, route, default_backend.clone())) + .collect::>(); + outbound::proxy_protocol::Kind::Grpc(outbound::proxy_protocol::Grpc { + routes, + failure_accrual, + }) +} + +fn convert_outbound_route( + gknn: GroupKindNamespaceName, + OutboundRoute { + hostnames, + rules, + creation_timestamp: _, + }: OutboundRoute, + backend: outbound::Backend, +) -> outbound::GrpcRoute { + let metadata = Some(meta::Metadata { + kind: Some(meta::metadata::Kind::Resource(meta::Resource { + group: gknn.group.to_string(), + kind: gknn.kind.to_string(), + namespace: gknn.namespace.to_string(), + name: gknn.name.to_string(), + ..Default::default() + })), + }); + + let hosts = hostnames.into_iter().map(convert_host_match).collect(); + + let rules = rules + .into_iter() + .map( + |OutboundRouteRule { + matches, + backends, + request_timeout, + backend_request_timeout, + filters, + }| { + let backend_request_timeout = backend_request_timeout + .and_then(|d| convert_duration("backend request_timeout", d)); + let backends = backends + .into_iter() + .map(|backend| convert_backend(backend_request_timeout.clone(), backend)) + .collect::>(); + let dist = if backends.is_empty() { + outbound::grpc_route::distribution::Kind::FirstAvailable( + outbound::grpc_route::distribution::FirstAvailable { + backends: vec![outbound::grpc_route::RouteBackend { + backend: Some(backend.clone()), + filters: vec![], + request_timeout: backend_request_timeout, + }], + }, + ) + } else { + outbound::grpc_route::distribution::Kind::RandomAvailable( + outbound::grpc_route::distribution::RandomAvailable { backends }, + ) + }; + outbound::grpc_route::Rule { + matches: matches.into_iter().map(convert_match).collect(), + backends: Some(outbound::grpc_route::Distribution { kind: Some(dist) }), + filters: filters.into_iter().map(convert_to_filter).collect(), + request_timeout: request_timeout + .and_then(|d| convert_duration("request timeout", d)), + } + }, + ) + .collect(); + + outbound::GrpcRoute { + metadata, + hosts, + rules, + } +} + +fn convert_backend( + request_timeout: Option, + backend: Backend, +) -> outbound::grpc_route::WeightedRouteBackend { + match backend { + Backend::Addr(addr) => { + let socket_addr = SocketAddr::new(addr.addr, addr.port.get()); + outbound::grpc_route::WeightedRouteBackend { + weight: addr.weight, + backend: Some(outbound::grpc_route::RouteBackend { + backend: Some(outbound::Backend { + metadata: None, + queue: Some(default_queue_config()), + kind: Some(outbound::backend::Kind::Forward( + destination::WeightedAddr { + addr: Some(socket_addr.into()), + weight: addr.weight, + ..Default::default() + }, + )), + }), + filters: Default::default(), + request_timeout, + }), + } + } + Backend::Service(svc) => { + if svc.exists { + let filters = svc.filters.into_iter().map(convert_to_filter).collect(); + outbound::grpc_route::WeightedRouteBackend { + weight: svc.weight, + backend: Some(outbound::grpc_route::RouteBackend { + backend: Some(outbound::Backend { + metadata: Some(meta::Metadata { + kind: Some(meta::metadata::Kind::Resource(meta::Resource { + group: "core".to_string(), + kind: "Service".to_string(), + name: svc.name, + namespace: svc.namespace, + section: Default::default(), + port: u16::from(svc.port).into(), + })), + }), + queue: Some(default_queue_config()), + kind: Some(outbound::backend::Kind::Balancer( + outbound::backend::BalanceP2c { + discovery: Some(outbound::backend::EndpointDiscovery { + kind: Some(outbound::backend::endpoint_discovery::Kind::Dst( + outbound::backend::endpoint_discovery::DestinationGet { + path: svc.authority, + }, + )), + }), + load: Some(default_balancer_config()), + }, + )), + }), + filters, + request_timeout, + }), + } + } else { + outbound::grpc_route::WeightedRouteBackend { + weight: svc.weight, + backend: Some(outbound::grpc_route::RouteBackend { + backend: Some(outbound::Backend { + metadata: Some(meta::Metadata { + kind: Some(meta::metadata::Kind::Default("invalid".to_string())), + }), + queue: Some(default_queue_config()), + kind: None, + }), + filters: vec![outbound::grpc_route::Filter { + kind: Some(outbound::grpc_route::filter::Kind::FailureInjector( + grpc_route::GrpcFailureInjector { + code: 500, + message: format!("Service not found {}", svc.name), + ratio: None, + }, + )), + }], + request_timeout, + }), + } + } + } + Backend::Invalid { weight, message } => outbound::grpc_route::WeightedRouteBackend { + weight, + backend: Some(outbound::grpc_route::RouteBackend { + backend: Some(outbound::Backend { + metadata: Some(meta::Metadata { + kind: Some(meta::metadata::Kind::Default("invalid".to_string())), + }), + queue: Some(default_queue_config()), + kind: None, + }), + filters: vec![outbound::grpc_route::Filter { + kind: Some(outbound::grpc_route::filter::Kind::FailureInjector( + grpc_route::GrpcFailureInjector { + code: 500, + message, + ratio: None, + }, + )), + }], + request_timeout, + }), + }, + } +} + +fn convert_to_filter(filter: Filter) -> outbound::grpc_route::Filter { + use outbound::grpc_route::filter::Kind as GrpcFilterKind; + + outbound::grpc_route::Filter { + kind: match filter { + Filter::FailureInjector(FailureInjectorFilter { + status, + message, + ratio, + }) => Some(GrpcFilterKind::FailureInjector( + grpc_route::GrpcFailureInjector { + code: u32::from(status.as_u16()), + message, + ratio: Some(http_route::Ratio { + numerator: ratio.numerator, + denominator: ratio.denominator, + }), + }, + )), + Filter::RequestHeaderModifier(filter) => Some(GrpcFilterKind::RequestHeaderModifier( + convert_request_header_modifier_filter(filter), + )), + Filter::RequestRedirect(filter) => { + tracing::warn!(filter = ?filter, "declining to convert invalid filter type for GrpcRoute"); + None + } + Filter::ResponseHeaderModifier(filter) => { + tracing::warn!(filter = ?filter, "declining to convert invalid filter type for GrpcRoute"); + None + } + }, + } +} diff --git a/policy-controller/grpc/src/routes.rs b/policy-controller/grpc/src/routes.rs index 62f8128dc..86d679cac 100644 --- a/policy-controller/grpc/src/routes.rs +++ b/policy-controller/grpc/src/routes.rs @@ -3,6 +3,7 @@ use linkerd_policy_controller_core::routes::{ HeaderModifierFilter, HostMatch, PathModifier, RequestRedirectFilter, }; +pub(crate) mod grpc; pub(crate) mod http; pub(crate) fn convert_host_match(h: HostMatch) -> proto::HostMatch { diff --git a/policy-controller/grpc/src/routes/grpc.rs b/policy-controller/grpc/src/routes/grpc.rs new file mode 100644 index 000000000..6c574cbf2 --- /dev/null +++ b/policy-controller/grpc/src/routes/grpc.rs @@ -0,0 +1,29 @@ +use linkerd2_proxy_api::{grpc_route, http_route}; +use linkerd_policy_controller_core::routes::{GrpcRouteMatch, HeaderMatch}; + +pub(crate) fn convert_match( + GrpcRouteMatch { headers, method }: GrpcRouteMatch, +) -> grpc_route::GrpcRouteMatch { + let headers = headers + .into_iter() + .map(|rule| match rule { + HeaderMatch::Exact(name, value) => http_route::HeaderMatch { + name: name.to_string(), + value: Some(http_route::header_match::Value::Exact( + value.as_bytes().to_vec(), + )), + }, + HeaderMatch::Regex(name, re) => http_route::HeaderMatch { + name: name.to_string(), + value: Some(http_route::header_match::Value::Regex(re.to_string())), + }, + }) + .collect(); + + let rpc = method.map(|value| grpc_route::GrpcRpcMatch { + method: value.method.unwrap_or_default(), + service: value.service.unwrap_or_default(), + }); + + grpc_route::GrpcRouteMatch { rpc, headers } +} diff --git a/policy-controller/k8s/index/src/outbound/index.rs b/policy-controller/k8s/index/src/outbound/index.rs index feb86aef1..1e102c4f1 100644 --- a/policy-controller/k8s/index/src/outbound/index.rs +++ b/policy-controller/k8s/index/src/outbound/index.rs @@ -1,13 +1,13 @@ use crate::{ ports::{ports_annotation, PortSet}, - routes::{ExplicitGKN, HttpRouteResource}, + routes::{ExplicitGKN, HttpRouteResource, ImpliedGKN}, ClusterInfo, }; use ahash::AHashMap as HashMap; use anyhow::{bail, ensure, Result}; use linkerd_policy_controller_core::{ outbound::{Backend, Backoff, FailureAccrual, OutboundPolicy, OutboundRoute, RouteSet}, - routes::{GroupKindNamespaceName, HttpRouteMatch}, + routes::{GroupKindNamespaceName, GrpcRouteMatch, HttpRouteMatch}, }; use linkerd_policy_controller_k8s_api::{ gateway::{self as k8s_gateway_api, ParentReference}, @@ -24,6 +24,7 @@ pub struct Index { service_info: HashMap, } +mod grpc; mod http; pub mod metrics; @@ -51,6 +52,7 @@ struct Namespace { /// Stores the route resources (by service name) that do not /// explicitly target a port. service_http_routes: HashMap>, + service_grpc_routes: HashMap>, namespace: Arc, } @@ -82,6 +84,7 @@ struct RoutesWatch { opaque: bool, accrual: Option, http_routes: RouteSet, + grpc_routes: RouteSet, watch: watch::Sender, } @@ -116,6 +119,21 @@ impl kubert::index::IndexNamespacedResource for Inde } } +impl kubert::index::IndexNamespacedResource for Index { + fn apply(&mut self, route: k8s_gateway_api::GrpcRoute) { + self.apply_grpc(route) + } + + fn delete(&mut self, namespace: String, name: String) { + let gknn = name + .gkn::() + .namespaced(namespace); + for ns_index in self.namespaces.by_ns.values_mut() { + ns_index.delete_grpc_route(&gknn); + } + } +} + impl kubert::index::IndexNamespacedResource for Index { fn apply(&mut self, service: Service) { let name = service.name_unchecked(); @@ -162,6 +180,7 @@ impl kubert::index::IndexNamespacedResource for Index { .entry(ns.clone()) .or_insert_with(|| Namespace { service_http_routes: Default::default(), + service_grpc_routes: Default::default(), service_port_routes: Default::default(), namespace: Arc::new(ns), }) @@ -214,6 +233,7 @@ impl Index { .or_insert_with(|| Namespace { namespace: Arc::new(service_namespace.to_string()), service_http_routes: Default::default(), + service_grpc_routes: Default::default(), service_port_routes: Default::default(), }); @@ -259,6 +279,7 @@ impl Index { .or_insert_with(|| Namespace { namespace: Arc::new(ns), service_http_routes: Default::default(), + service_grpc_routes: Default::default(), service_port_routes: Default::default(), }) .apply_http_route( @@ -270,6 +291,42 @@ impl Index { } } + fn apply_grpc(&mut self, route: k8s_gateway_api::GrpcRoute) { + tracing::debug!(name = route.name_unchecked(), "indexing grpcroute"); + + for parent_ref in route.spec.inner.parent_refs.iter().flatten() { + if !is_parent_service(parent_ref) { + continue; + } + + if !route_accepted_by_service(route.status.as_ref().map(|s| &s.inner), &parent_ref.name) + { + continue; + } + + let ns = parent_ref + .namespace + .clone() + .unwrap_or_else(|| route.namespace().expect("GrpcRoute must have a namespace")); + + self.namespaces + .by_ns + .entry(ns.clone()) + .or_insert_with(|| Namespace { + namespace: Arc::new(ns), + service_http_routes: Default::default(), + service_grpc_routes: Default::default(), + service_port_routes: Default::default(), + }) + .apply_grpc_route( + route.clone(), + parent_ref, + &self.namespaces.cluster_info, + &self.service_info, + ); + } + } + fn reindex_services(&mut self) { for ns in self.namespaces.by_ns.values_mut() { ns.reindex_services(&self.service_info); @@ -336,6 +393,68 @@ impl Namespace { } } + fn apply_grpc_route( + &mut self, + route: k8s_gateway_api::GrpcRoute, + parent_ref: &ParentReference, + cluster_info: &ClusterInfo, + service_info: &HashMap, + ) { + tracing::debug!(?route); + + let outbound_route = + match grpc::convert_route(&self.namespace, route.clone(), cluster_info, service_info) { + Ok(route) => route, + Err(error) => { + tracing::error!(%error, "failed to convert route"); + return; + } + }; + + tracing::debug!(?outbound_route); + + let gknn = route + .gkn() + .namespaced(route.namespace().expect("Route must have namespace")); + + let port = parent_ref.port.and_then(NonZeroU16::new); + + if let Some(port) = port { + let service_port = ServicePort { + port, + service: parent_ref.name.clone(), + }; + + tracing::debug!( + ?service_port, + route = route.name_unchecked(), + "inserting grpcroute for service" + ); + + let service_routes = + self.service_routes_or_default(service_port, cluster_info, service_info); + + service_routes.apply_grpc_route(gknn, outbound_route); + } else { + // If the parent_ref doesn't include a port, apply this route + // to all ServiceRoutes which match the Service name. + self.service_port_routes.iter_mut().for_each( + |(ServicePort { service, port: _ }, routes)| { + if service == &parent_ref.name { + routes.apply_grpc_route(gknn.clone(), outbound_route.clone()); + } + }, + ); + + // Also add the route to the list of routes that target the + // Service without specifying a port. + self.service_grpc_routes + .entry(parent_ref.name.clone()) + .or_default() + .insert(gknn, outbound_route); + } + } + fn reindex_services(&mut self, service_info: &HashMap) { let update_service = |backend: &mut Backend| { if let Backend::Service(svc) = backend { @@ -354,8 +473,13 @@ impl Namespace { .values_mut() .flat_map(|route| route.rules.iter_mut()) .flat_map(|rule| rule.backends.iter_mut()); + let grpc_backends = watch + .grpc_routes + .values_mut() + .flat_map(|route| route.rules.iter_mut()) + .flat_map(|rule| rule.backends.iter_mut()); - http_backends.for_each(update_service); + http_backends.chain(grpc_backends).for_each(update_service); watch.send_if_modified(); } } @@ -386,6 +510,17 @@ impl Namespace { }); } + fn delete_grpc_route(&mut self, gknn: &GroupKindNamespaceName) { + for service in self.service_port_routes.values_mut() { + service.delete_grpc_route(gknn); + } + + self.service_grpc_routes.retain(|_, routes| { + routes.remove(gknn); + !routes.is_empty() + }); + } + fn service_routes_or_default( &mut self, sp: ServicePort, @@ -415,6 +550,11 @@ impl Namespace { .get(&sp.service) .cloned() .unwrap_or_default(); + let grpc_routes = self + .service_grpc_routes + .get(&sp.service) + .cloned() + .unwrap_or_default(); let mut service_routes = ServiceRoutes { opaque, @@ -432,6 +572,9 @@ impl Namespace { let (producer_http_routes, consumer_http_routes): (Vec<_>, Vec<_>) = http_routes .into_iter() .partition(|(gknn, _)| gknn.namespace == *self.namespace); + let (producer_grpc_routes, consumer_grpc_routes): (Vec<_>, Vec<_>) = grpc_routes + .into_iter() + .partition(|(gknn, _)| gknn.namespace == *self.namespace); for (consumer_gknn, consumer_route) in consumer_http_routes { // Consumer routes should only apply to watches from the @@ -441,6 +584,14 @@ impl Namespace { consumer_watch.insert_http_route(consumer_gknn.clone(), consumer_route.clone()); } + for (consumer_gknn, consumer_route) in consumer_grpc_routes { + // Consumer routes should only apply to watches from the + // consumer namespace. + let consumer_watch = + service_routes.watch_for_ns_or_default(consumer_gknn.namespace.to_string()); + + consumer_watch.insert_grpc_route(consumer_gknn.clone(), consumer_route.clone()); + } for (producer_gknn, producer_route) in producer_http_routes { // Insert the route into the producer namespace. @@ -462,6 +613,26 @@ impl Namespace { }); } + for (producer_gknn, producer_route) in producer_grpc_routes { + // Insert the route into the producer namespace. + let producer_watch = + service_routes.watch_for_ns_or_default(producer_gknn.namespace.to_string()); + + producer_watch.insert_grpc_route(producer_gknn.clone(), producer_route.clone()); + + // Producer routes apply to clients in all namespaces, so + // apply it to watches for all other namespaces too. + service_routes + .watches_by_ns + .iter_mut() + .filter(|(namespace, _)| { + namespace.as_str() != producer_gknn.namespace.as_ref() + }) + .for_each(|(_, watch)| { + watch.insert_grpc_route(producer_gknn.clone(), producer_route.clone()) + }); + } + service_routes }) } @@ -514,6 +685,11 @@ impl ServiceRoutes { .get(self.namespace.as_ref()) .map(|watch| watch.http_routes.clone()) .unwrap_or_default(); + let grpc_routes = self + .watches_by_ns + .get(self.namespace.as_ref()) + .map(|watch| watch.grpc_routes.clone()) + .unwrap_or_default(); self.watches_by_ns.entry(namespace).or_insert_with(|| { let (sender, _) = watch::channel(OutboundPolicy { @@ -521,6 +697,7 @@ impl ServiceRoutes { opaque: self.opaque, accrual: self.accrual, http_routes: http_routes.clone(), + grpc_routes: grpc_routes.clone(), name: self.name.to_string(), authority: self.authority.clone(), namespace: self.namespace.to_string(), @@ -528,6 +705,7 @@ impl ServiceRoutes { RoutesWatch { http_routes, + grpc_routes, watch: sender, opaque: self.opaque, accrual: self.accrual, @@ -561,6 +739,32 @@ impl ServiceRoutes { } } + fn apply_grpc_route( + &mut self, + gknn: GroupKindNamespaceName, + route: OutboundRoute, + ) { + if *gknn.namespace == *self.namespace { + // This is a producer namespace route. + let watch = self.watch_for_ns_or_default(gknn.namespace.to_string()); + + watch.insert_grpc_route(gknn.clone(), route.clone()); + + // Producer routes apply to clients in all namespaces, so + // apply it to watches for all other namespaces too. + for (ns, ns_watch) in self.watches_by_ns.iter_mut() { + if ns != &gknn.namespace { + ns_watch.insert_grpc_route(gknn.clone(), route.clone()); + } + } + } else { + // This is a consumer namespace route and should only apply to + // watches from that namespace. + let watch = self.watch_for_ns_or_default(gknn.namespace.to_string()); + watch.insert_grpc_route(gknn, route); + } + } + fn update_service(&mut self, opaque: bool, accrual: Option) { self.opaque = opaque; self.accrual = accrual; @@ -576,6 +780,12 @@ impl ServiceRoutes { watch.remove_http_route(gknn); } } + + fn delete_grpc_route(&mut self, gknn: &GroupKindNamespaceName) { + for watch in self.watches_by_ns.values_mut() { + watch.remove_grpc_route(gknn); + } + } } impl RoutesWatch { @@ -588,6 +798,11 @@ impl RoutesWatch { modified = true; } + if self.grpc_routes != policy.grpc_routes { + policy.grpc_routes = self.grpc_routes.clone(); + modified = true; + } + if self.opaque != policy.opaque { policy.opaque = self.opaque; modified = true; @@ -612,10 +827,25 @@ impl RoutesWatch { self.send_if_modified(); } + fn insert_grpc_route( + &mut self, + gknn: GroupKindNamespaceName, + route: OutboundRoute, + ) { + self.grpc_routes.insert(gknn, route); + + self.send_if_modified(); + } + fn remove_http_route(&mut self, gknn: &GroupKindNamespaceName) { self.http_routes.remove(gknn); self.send_if_modified(); } + + fn remove_grpc_route(&mut self, gknn: &GroupKindNamespaceName) { + self.grpc_routes.remove(gknn); + self.send_if_modified(); + } } fn parse_accrual_config( diff --git a/policy-controller/k8s/index/src/outbound/tests/routes.rs b/policy-controller/k8s/index/src/outbound/tests/routes.rs index 8074a0f70..f4e23ff7c 100644 --- a/policy-controller/k8s/index/src/outbound/tests/routes.rs +++ b/policy-controller/k8s/index/src/outbound/tests/routes.rs @@ -1 +1,2 @@ +mod grpc; mod http; diff --git a/policy-controller/k8s/index/src/outbound/tests/routes/grpc.rs b/policy-controller/k8s/index/src/outbound/tests/routes/grpc.rs new file mode 100644 index 000000000..d8245bdb1 --- /dev/null +++ b/policy-controller/k8s/index/src/outbound/tests/routes/grpc.rs @@ -0,0 +1,174 @@ +use kube::Resource; +use linkerd_policy_controller_core::{ + outbound::{Backend, WeightedService}, + routes::GroupKindNamespaceName, + POLICY_CONTROLLER_NAME, +}; +use linkerd_policy_controller_k8s_api::gateway as k8s_gateway_api; +use tracing::Level; + +use super::super::*; + +#[test] +fn backend_service() { + tracing_subscriber::fmt() + .with_max_level(Level::TRACE) + .try_init() + .ok(); + + let test = TestConfig::default(); + // Create apex service. + let apex = mk_service("ns", "apex", 8080); + test.index.write().apply(apex); + + // Create httproute. + let route = mk_route("ns", "route", 8080, "apex", "backend"); + test.index.write().apply(route); + + let mut rx = test + .index + .write() + .outbound_policy_rx( + "apex".to_string(), + "ns".to_string(), + 8080.try_into().unwrap(), + "ns".to_string(), + ) + .expect("apex.ns should exist"); + + { + let policy = rx.borrow_and_update(); + let backend = policy + .grpc_routes + .get(&GroupKindNamespaceName { + group: k8s_gateway_api::GrpcRoute::group(&()), + kind: k8s_gateway_api::GrpcRoute::kind(&()), + namespace: "ns".into(), + name: "route".into(), + }) + .expect("route should exist") + .rules + .first() + .expect("rule should exist") + .backends + .first() + .expect("backend should exist"); + + let exists = match backend { + Backend::Service(WeightedService { exists, .. }) => exists, + _ => panic!("backend should be a service"), + }; + + // Backend should not exist. + assert!(!exists); + } + + // Create backend service. + let backend = mk_service("ns", "backend", 8080); + test.index.write().apply(backend); + assert!(rx.has_changed().unwrap()); + + { + let policy = rx.borrow_and_update(); + let backend = policy + .grpc_routes + .get(&GroupKindNamespaceName { + group: k8s_gateway_api::GrpcRoute::group(&()), + kind: k8s_gateway_api::GrpcRoute::kind(&()), + namespace: "ns".into(), + name: "route".into(), + }) + .expect("route should exist") + .rules + .first() + .expect("rule should exist") + .backends + .first() + .expect("backend should exist"); + + let exists = match backend { + Backend::Service(WeightedService { exists, .. }) => exists, + _ => panic!("backend should be a service"), + }; + + // Backend should exist. + assert!(exists); + } +} + +fn mk_route( + ns: impl ToString, + name: impl ToString, + port: u16, + parent: impl ToString, + backend: impl ToString, +) -> k8s_gateway_api::GrpcRoute { + use chrono::Utc; + use k8s::{policy::httproute::*, Time}; + + k8s_gateway_api::GrpcRoute { + metadata: k8s::ObjectMeta { + namespace: Some(ns.to_string()), + name: Some(name.to_string()), + creation_timestamp: Some(Time(Utc::now())), + ..Default::default() + }, + spec: k8s_gateway_api::GrpcRouteSpec { + inner: CommonRouteSpec { + parent_refs: Some(vec![ParentReference { + group: Some("core".to_string()), + kind: Some("Service".to_string()), + namespace: Some(ns.to_string()), + name: parent.to_string(), + section_name: None, + port: Some(port), + }]), + }, + hostnames: None, + rules: Some(vec![k8s_gateway_api::GrpcRouteRule { + matches: Some(vec![k8s_gateway_api::GrpcRouteMatch { + headers: None, + method: Some(k8s_gateway_api::GrpcMethodMatch::Exact { + method: Some("Test".to_string()), + service: Some("io.linkerd.Testing".to_string()), + }), + }]), + filters: None, + backend_refs: Some(vec![k8s_gateway_api::GrpcRouteBackendRef { + filters: None, + weight: None, + inner: BackendObjectReference { + group: Some("core".to_string()), + kind: Some("Service".to_string()), + namespace: Some(ns.to_string()), + name: backend.to_string(), + port: Some(port), + }, + }]), + }]), + }, + status: Some(k8s_gateway_api::GrpcRouteStatus { + inner: RouteStatus { + parents: vec![k8s::gateway::RouteParentStatus { + parent_ref: ParentReference { + group: Some("core".to_string()), + kind: Some("Service".to_string()), + namespace: Some(ns.to_string()), + name: parent.to_string(), + section_name: None, + port: Some(port), + }, + controller_name: POLICY_CONTROLLER_NAME.to_string(), + conditions: vec![k8s::Condition { + last_transition_time: Time(chrono::DateTime::::MIN_UTC), + message: "".to_string(), + observed_generation: None, + reason: "Accepted".to_string(), + status: "True".to_string(), + type_: "Accepted".to_string(), + }], + }], + }, + }), + } +} diff --git a/policy-controller/k8s/index/src/routes.rs b/policy-controller/k8s/index/src/routes.rs index 3877aba86..538d2bf4e 100644 --- a/policy-controller/k8s/index/src/routes.rs +++ b/policy-controller/k8s/index/src/routes.rs @@ -1,6 +1,7 @@ use linkerd_policy_controller_core::routes::{GroupKindName, GroupKindNamespaceName}; use linkerd_policy_controller_k8s_api::{gateway as api, policy, Resource, ResourceExt}; +pub mod grpc; pub mod http; #[derive(Debug, Clone)] diff --git a/policy-controller/k8s/index/src/routes/grpc.rs b/policy-controller/k8s/index/src/routes/grpc.rs new file mode 100644 index 000000000..1bb6e9756 --- /dev/null +++ b/policy-controller/k8s/index/src/routes/grpc.rs @@ -0,0 +1,22 @@ +use anyhow::Result; +use linkerd_policy_controller_core::routes; +use linkerd_policy_controller_k8s_api::gateway as k8s_gateway_api; + +pub fn try_match( + k8s_gateway_api::GrpcRouteMatch { headers, method }: k8s_gateway_api::GrpcRouteMatch, +) -> Result { + let headers = headers + .into_iter() + .flatten() + .map(super::http::header_match) + .collect::>()?; + + let method = method.map(|value| match value { + k8s_gateway_api::GrpcMethodMatch::Exact { method, service } + | k8s_gateway_api::GrpcMethodMatch::RegularExpression { method, service } => { + routes::GrpcMethodMatch { method, service } + } + }); + + Ok(routes::GrpcRouteMatch { headers, method }) +} diff --git a/policy-controller/src/main.rs b/policy-controller/src/main.rs index 552509921..594bf2b46 100644 --- a/policy-controller/src/main.rs +++ b/policy-controller/src/main.rs @@ -262,8 +262,11 @@ async fn main() -> Result<()> { let gateway_grpc_routes = runtime.watch_all::(watcher::Config::default()); + let gateway_grpc_routes_indexes = IndexList::new(outbound_index.clone()) + .push(status_index.clone()) + .shared(); tokio::spawn( - kubert::index::namespaced(status_index.clone(), gateway_grpc_routes) + kubert::index::namespaced(gateway_grpc_routes_indexes.clone(), gateway_grpc_routes) .instrument(info_span!("grpcroutes.gateway.networking.k8s.io")), );