Add support for grpcroute to outbound policy (#12761)

We update the policy-controller to support grpcroutes in the outbound policy API.  When a GrpcRoute resource has a Service as a parent ref, outbound policy requests for that service may return a proxy protocol of grpc with grpc routes.

* if a service has no HttpRoute or GrpcRoutes as children, we continue to return the default http route with a proxy protocol of Detect (so that the proxy is directed to detect Http1 vs Http2)
* similarly, if a service has HttpRoute children only, we continue to return those routes with a proxy protocol of Detect
* if a service has GrpcRoute children only, we return those routes with a proxy protocol of Grpc
* if a service has both types of routes as children, we determine the proxy protocol based on which route type has the oldest created_at timestamp as described in https://gateway-api.sigs.k8s.io/geps/gep-1016/#cross-serving and only return routes of the determined type

Signed-off-by: Alex Leong <alex@buoyant.io>
This commit is contained in:
Alex Leong 2024-06-25 15:45:49 -07:00 committed by GitHub
parent 4bf0ae33dc
commit a51d7a6444
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 741 additions and 8 deletions

View File

@ -1,6 +1,6 @@
use crate::routes::{
FailureInjectorFilter, GroupKindNamespaceName, HeaderModifierFilter, HostMatch, HttpRouteMatch,
RequestRedirectFilter,
FailureInjectorFilter, GroupKindNamespaceName, GrpcRouteMatch, HeaderModifierFilter, HostMatch,
HttpRouteMatch, RequestRedirectFilter,
};
use ahash::AHashMap as HashMap;
use anyhow::Result;
@ -32,6 +32,7 @@ pub struct OutboundDiscoverTarget {
#[derive(Clone, Debug, PartialEq)]
pub struct OutboundPolicy {
pub http_routes: RouteSet<HttpRouteMatch>,
pub grpc_routes: RouteSet<GrpcRouteMatch>,
pub authority: String,
pub name: String,
pub namespace: String,

View File

@ -72,6 +72,12 @@ pub struct HttpRouteMatch {
pub method: Option<Method>,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct GrpcRouteMatch {
pub headers: Vec<HeaderMatch>,
pub method: Option<GrpcMethodMatch>,
}
#[derive(Clone, Debug)]
pub enum PathMatch {
Exact(String),
@ -91,6 +97,12 @@ pub enum QueryParamMatch {
Regex(String, Regex),
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct GrpcMethodMatch {
pub method: Option<String>,
pub service: Option<String>,
}
// === impl GroupKindName ===
impl Ord for GroupKindName {

View File

@ -18,8 +18,9 @@ use linkerd_policy_controller_core::{
},
routes::GroupKindNamespaceName,
};
use std::{num::NonZeroU16, str::FromStr, sync::Arc, time};
use std::{iter, num::NonZeroU16, str::FromStr, sync::Arc, time};
mod grpc;
mod http;
#[derive(Clone, Debug)]
@ -231,7 +232,25 @@ fn to_service(outbound: OutboundPolicy) -> outbound::OutboundPolicy {
let mut http_routes = outbound.http_routes.into_iter().collect::<Vec<_>>();
http_routes.sort_by(timestamp_then_name);
http::protocol(backend, http_routes.into_iter(), accrual)
let mut http_routes = http_routes.into_iter().peekable();
let mut grpc_routes = outbound.grpc_routes.into_iter().collect::<Vec<_>>();
grpc_routes.sort_by(timestamp_then_name);
let mut grpc_routes = grpc_routes.into_iter().peekable();
// If both HTTP and gRPC routes are present, we choose the route kind by which has the oldest timestamp.
match (http_routes.peek(), grpc_routes.peek()) {
(Some(http), Some(grpc)) => {
if timestamp_then_name(http, grpc).is_gt() {
grpc::protocol(backend, grpc_routes, accrual)
} else {
http::protocol(backend, http_routes, accrual)
}
}
(Some((_, _http)), None) => http::protocol(backend, http_routes, accrual),
(None, Some((_, _grpc))) => grpc::protocol(backend, grpc_routes, accrual),
(None, None) => http::protocol(backend, iter::empty(), accrual),
}
};
let metadata = Metadata {

View File

@ -0,0 +1,240 @@
use std::net::SocketAddr;
use super::{convert_duration, default_balancer_config, default_queue_config};
use crate::routes::{
convert_host_match, convert_request_header_modifier_filter, grpc::convert_match,
};
use linkerd2_proxy_api::{destination, grpc_route, http_route, meta, outbound};
use linkerd_policy_controller_core::{
outbound::{Backend, Filter, OutboundRoute, OutboundRouteRule},
routes::{FailureInjectorFilter, GroupKindNamespaceName, GrpcRouteMatch},
};
pub(crate) fn protocol(
default_backend: outbound::Backend,
routes: impl Iterator<Item = (GroupKindNamespaceName, OutboundRoute<GrpcRouteMatch>)>,
failure_accrual: Option<outbound::FailureAccrual>,
) -> outbound::proxy_protocol::Kind {
let routes = routes
.map(|(gknn, route)| convert_outbound_route(gknn, route, default_backend.clone()))
.collect::<Vec<_>>();
outbound::proxy_protocol::Kind::Grpc(outbound::proxy_protocol::Grpc {
routes,
failure_accrual,
})
}
fn convert_outbound_route(
gknn: GroupKindNamespaceName,
OutboundRoute {
hostnames,
rules,
creation_timestamp: _,
}: OutboundRoute<GrpcRouteMatch>,
backend: outbound::Backend,
) -> outbound::GrpcRoute {
let metadata = Some(meta::Metadata {
kind: Some(meta::metadata::Kind::Resource(meta::Resource {
group: gknn.group.to_string(),
kind: gknn.kind.to_string(),
namespace: gknn.namespace.to_string(),
name: gknn.name.to_string(),
..Default::default()
})),
});
let hosts = hostnames.into_iter().map(convert_host_match).collect();
let rules = rules
.into_iter()
.map(
|OutboundRouteRule {
matches,
backends,
request_timeout,
backend_request_timeout,
filters,
}| {
let backend_request_timeout = backend_request_timeout
.and_then(|d| convert_duration("backend request_timeout", d));
let backends = backends
.into_iter()
.map(|backend| convert_backend(backend_request_timeout.clone(), backend))
.collect::<Vec<_>>();
let dist = if backends.is_empty() {
outbound::grpc_route::distribution::Kind::FirstAvailable(
outbound::grpc_route::distribution::FirstAvailable {
backends: vec![outbound::grpc_route::RouteBackend {
backend: Some(backend.clone()),
filters: vec![],
request_timeout: backend_request_timeout,
}],
},
)
} else {
outbound::grpc_route::distribution::Kind::RandomAvailable(
outbound::grpc_route::distribution::RandomAvailable { backends },
)
};
outbound::grpc_route::Rule {
matches: matches.into_iter().map(convert_match).collect(),
backends: Some(outbound::grpc_route::Distribution { kind: Some(dist) }),
filters: filters.into_iter().map(convert_to_filter).collect(),
request_timeout: request_timeout
.and_then(|d| convert_duration("request timeout", d)),
}
},
)
.collect();
outbound::GrpcRoute {
metadata,
hosts,
rules,
}
}
fn convert_backend(
request_timeout: Option<prost_types::Duration>,
backend: Backend,
) -> outbound::grpc_route::WeightedRouteBackend {
match backend {
Backend::Addr(addr) => {
let socket_addr = SocketAddr::new(addr.addr, addr.port.get());
outbound::grpc_route::WeightedRouteBackend {
weight: addr.weight,
backend: Some(outbound::grpc_route::RouteBackend {
backend: Some(outbound::Backend {
metadata: None,
queue: Some(default_queue_config()),
kind: Some(outbound::backend::Kind::Forward(
destination::WeightedAddr {
addr: Some(socket_addr.into()),
weight: addr.weight,
..Default::default()
},
)),
}),
filters: Default::default(),
request_timeout,
}),
}
}
Backend::Service(svc) => {
if svc.exists {
let filters = svc.filters.into_iter().map(convert_to_filter).collect();
outbound::grpc_route::WeightedRouteBackend {
weight: svc.weight,
backend: Some(outbound::grpc_route::RouteBackend {
backend: Some(outbound::Backend {
metadata: Some(meta::Metadata {
kind: Some(meta::metadata::Kind::Resource(meta::Resource {
group: "core".to_string(),
kind: "Service".to_string(),
name: svc.name,
namespace: svc.namespace,
section: Default::default(),
port: u16::from(svc.port).into(),
})),
}),
queue: Some(default_queue_config()),
kind: Some(outbound::backend::Kind::Balancer(
outbound::backend::BalanceP2c {
discovery: Some(outbound::backend::EndpointDiscovery {
kind: Some(outbound::backend::endpoint_discovery::Kind::Dst(
outbound::backend::endpoint_discovery::DestinationGet {
path: svc.authority,
},
)),
}),
load: Some(default_balancer_config()),
},
)),
}),
filters,
request_timeout,
}),
}
} else {
outbound::grpc_route::WeightedRouteBackend {
weight: svc.weight,
backend: Some(outbound::grpc_route::RouteBackend {
backend: Some(outbound::Backend {
metadata: Some(meta::Metadata {
kind: Some(meta::metadata::Kind::Default("invalid".to_string())),
}),
queue: Some(default_queue_config()),
kind: None,
}),
filters: vec![outbound::grpc_route::Filter {
kind: Some(outbound::grpc_route::filter::Kind::FailureInjector(
grpc_route::GrpcFailureInjector {
code: 500,
message: format!("Service not found {}", svc.name),
ratio: None,
},
)),
}],
request_timeout,
}),
}
}
}
Backend::Invalid { weight, message } => outbound::grpc_route::WeightedRouteBackend {
weight,
backend: Some(outbound::grpc_route::RouteBackend {
backend: Some(outbound::Backend {
metadata: Some(meta::Metadata {
kind: Some(meta::metadata::Kind::Default("invalid".to_string())),
}),
queue: Some(default_queue_config()),
kind: None,
}),
filters: vec![outbound::grpc_route::Filter {
kind: Some(outbound::grpc_route::filter::Kind::FailureInjector(
grpc_route::GrpcFailureInjector {
code: 500,
message,
ratio: None,
},
)),
}],
request_timeout,
}),
},
}
}
fn convert_to_filter(filter: Filter) -> outbound::grpc_route::Filter {
use outbound::grpc_route::filter::Kind as GrpcFilterKind;
outbound::grpc_route::Filter {
kind: match filter {
Filter::FailureInjector(FailureInjectorFilter {
status,
message,
ratio,
}) => Some(GrpcFilterKind::FailureInjector(
grpc_route::GrpcFailureInjector {
code: u32::from(status.as_u16()),
message,
ratio: Some(http_route::Ratio {
numerator: ratio.numerator,
denominator: ratio.denominator,
}),
},
)),
Filter::RequestHeaderModifier(filter) => Some(GrpcFilterKind::RequestHeaderModifier(
convert_request_header_modifier_filter(filter),
)),
Filter::RequestRedirect(filter) => {
tracing::warn!(filter = ?filter, "declining to convert invalid filter type for GrpcRoute");
None
}
Filter::ResponseHeaderModifier(filter) => {
tracing::warn!(filter = ?filter, "declining to convert invalid filter type for GrpcRoute");
None
}
},
}
}

View File

@ -3,6 +3,7 @@ use linkerd_policy_controller_core::routes::{
HeaderModifierFilter, HostMatch, PathModifier, RequestRedirectFilter,
};
pub(crate) mod grpc;
pub(crate) mod http;
pub(crate) fn convert_host_match(h: HostMatch) -> proto::HostMatch {

View File

@ -0,0 +1,29 @@
use linkerd2_proxy_api::{grpc_route, http_route};
use linkerd_policy_controller_core::routes::{GrpcRouteMatch, HeaderMatch};
pub(crate) fn convert_match(
GrpcRouteMatch { headers, method }: GrpcRouteMatch,
) -> grpc_route::GrpcRouteMatch {
let headers = headers
.into_iter()
.map(|rule| match rule {
HeaderMatch::Exact(name, value) => http_route::HeaderMatch {
name: name.to_string(),
value: Some(http_route::header_match::Value::Exact(
value.as_bytes().to_vec(),
)),
},
HeaderMatch::Regex(name, re) => http_route::HeaderMatch {
name: name.to_string(),
value: Some(http_route::header_match::Value::Regex(re.to_string())),
},
})
.collect();
let rpc = method.map(|value| grpc_route::GrpcRpcMatch {
method: value.method.unwrap_or_default(),
service: value.service.unwrap_or_default(),
});
grpc_route::GrpcRouteMatch { rpc, headers }
}

View File

@ -1,13 +1,13 @@
use crate::{
ports::{ports_annotation, PortSet},
routes::{ExplicitGKN, HttpRouteResource},
routes::{ExplicitGKN, HttpRouteResource, ImpliedGKN},
ClusterInfo,
};
use ahash::AHashMap as HashMap;
use anyhow::{bail, ensure, Result};
use linkerd_policy_controller_core::{
outbound::{Backend, Backoff, FailureAccrual, OutboundPolicy, OutboundRoute, RouteSet},
routes::{GroupKindNamespaceName, HttpRouteMatch},
routes::{GroupKindNamespaceName, GrpcRouteMatch, HttpRouteMatch},
};
use linkerd_policy_controller_k8s_api::{
gateway::{self as k8s_gateway_api, ParentReference},
@ -24,6 +24,7 @@ pub struct Index {
service_info: HashMap<ServiceRef, ServiceInfo>,
}
mod grpc;
mod http;
pub mod metrics;
@ -51,6 +52,7 @@ struct Namespace {
/// Stores the route resources (by service name) that do not
/// explicitly target a port.
service_http_routes: HashMap<String, RouteSet<HttpRouteMatch>>,
service_grpc_routes: HashMap<String, RouteSet<GrpcRouteMatch>>,
namespace: Arc<String>,
}
@ -82,6 +84,7 @@ struct RoutesWatch {
opaque: bool,
accrual: Option<FailureAccrual>,
http_routes: RouteSet<HttpRouteMatch>,
grpc_routes: RouteSet<GrpcRouteMatch>,
watch: watch::Sender<OutboundPolicy>,
}
@ -116,6 +119,21 @@ impl kubert::index::IndexNamespacedResource<k8s_gateway_api::HttpRoute> for Inde
}
}
impl kubert::index::IndexNamespacedResource<k8s_gateway_api::GrpcRoute> for Index {
fn apply(&mut self, route: k8s_gateway_api::GrpcRoute) {
self.apply_grpc(route)
}
fn delete(&mut self, namespace: String, name: String) {
let gknn = name
.gkn::<k8s_gateway_api::GrpcRoute>()
.namespaced(namespace);
for ns_index in self.namespaces.by_ns.values_mut() {
ns_index.delete_grpc_route(&gknn);
}
}
}
impl kubert::index::IndexNamespacedResource<Service> for Index {
fn apply(&mut self, service: Service) {
let name = service.name_unchecked();
@ -162,6 +180,7 @@ impl kubert::index::IndexNamespacedResource<Service> for Index {
.entry(ns.clone())
.or_insert_with(|| Namespace {
service_http_routes: Default::default(),
service_grpc_routes: Default::default(),
service_port_routes: Default::default(),
namespace: Arc::new(ns),
})
@ -214,6 +233,7 @@ impl Index {
.or_insert_with(|| Namespace {
namespace: Arc::new(service_namespace.to_string()),
service_http_routes: Default::default(),
service_grpc_routes: Default::default(),
service_port_routes: Default::default(),
});
@ -259,6 +279,7 @@ impl Index {
.or_insert_with(|| Namespace {
namespace: Arc::new(ns),
service_http_routes: Default::default(),
service_grpc_routes: Default::default(),
service_port_routes: Default::default(),
})
.apply_http_route(
@ -270,6 +291,42 @@ impl Index {
}
}
fn apply_grpc(&mut self, route: k8s_gateway_api::GrpcRoute) {
tracing::debug!(name = route.name_unchecked(), "indexing grpcroute");
for parent_ref in route.spec.inner.parent_refs.iter().flatten() {
if !is_parent_service(parent_ref) {
continue;
}
if !route_accepted_by_service(route.status.as_ref().map(|s| &s.inner), &parent_ref.name)
{
continue;
}
let ns = parent_ref
.namespace
.clone()
.unwrap_or_else(|| route.namespace().expect("GrpcRoute must have a namespace"));
self.namespaces
.by_ns
.entry(ns.clone())
.or_insert_with(|| Namespace {
namespace: Arc::new(ns),
service_http_routes: Default::default(),
service_grpc_routes: Default::default(),
service_port_routes: Default::default(),
})
.apply_grpc_route(
route.clone(),
parent_ref,
&self.namespaces.cluster_info,
&self.service_info,
);
}
}
fn reindex_services(&mut self) {
for ns in self.namespaces.by_ns.values_mut() {
ns.reindex_services(&self.service_info);
@ -336,6 +393,68 @@ impl Namespace {
}
}
fn apply_grpc_route(
&mut self,
route: k8s_gateway_api::GrpcRoute,
parent_ref: &ParentReference,
cluster_info: &ClusterInfo,
service_info: &HashMap<ServiceRef, ServiceInfo>,
) {
tracing::debug!(?route);
let outbound_route =
match grpc::convert_route(&self.namespace, route.clone(), cluster_info, service_info) {
Ok(route) => route,
Err(error) => {
tracing::error!(%error, "failed to convert route");
return;
}
};
tracing::debug!(?outbound_route);
let gknn = route
.gkn()
.namespaced(route.namespace().expect("Route must have namespace"));
let port = parent_ref.port.and_then(NonZeroU16::new);
if let Some(port) = port {
let service_port = ServicePort {
port,
service: parent_ref.name.clone(),
};
tracing::debug!(
?service_port,
route = route.name_unchecked(),
"inserting grpcroute for service"
);
let service_routes =
self.service_routes_or_default(service_port, cluster_info, service_info);
service_routes.apply_grpc_route(gknn, outbound_route);
} else {
// If the parent_ref doesn't include a port, apply this route
// to all ServiceRoutes which match the Service name.
self.service_port_routes.iter_mut().for_each(
|(ServicePort { service, port: _ }, routes)| {
if service == &parent_ref.name {
routes.apply_grpc_route(gknn.clone(), outbound_route.clone());
}
},
);
// Also add the route to the list of routes that target the
// Service without specifying a port.
self.service_grpc_routes
.entry(parent_ref.name.clone())
.or_default()
.insert(gknn, outbound_route);
}
}
fn reindex_services(&mut self, service_info: &HashMap<ServiceRef, ServiceInfo>) {
let update_service = |backend: &mut Backend| {
if let Backend::Service(svc) = backend {
@ -354,8 +473,13 @@ impl Namespace {
.values_mut()
.flat_map(|route| route.rules.iter_mut())
.flat_map(|rule| rule.backends.iter_mut());
let grpc_backends = watch
.grpc_routes
.values_mut()
.flat_map(|route| route.rules.iter_mut())
.flat_map(|rule| rule.backends.iter_mut());
http_backends.for_each(update_service);
http_backends.chain(grpc_backends).for_each(update_service);
watch.send_if_modified();
}
}
@ -386,6 +510,17 @@ impl Namespace {
});
}
fn delete_grpc_route(&mut self, gknn: &GroupKindNamespaceName) {
for service in self.service_port_routes.values_mut() {
service.delete_grpc_route(gknn);
}
self.service_grpc_routes.retain(|_, routes| {
routes.remove(gknn);
!routes.is_empty()
});
}
fn service_routes_or_default(
&mut self,
sp: ServicePort,
@ -415,6 +550,11 @@ impl Namespace {
.get(&sp.service)
.cloned()
.unwrap_or_default();
let grpc_routes = self
.service_grpc_routes
.get(&sp.service)
.cloned()
.unwrap_or_default();
let mut service_routes = ServiceRoutes {
opaque,
@ -432,6 +572,9 @@ impl Namespace {
let (producer_http_routes, consumer_http_routes): (Vec<_>, Vec<_>) = http_routes
.into_iter()
.partition(|(gknn, _)| gknn.namespace == *self.namespace);
let (producer_grpc_routes, consumer_grpc_routes): (Vec<_>, Vec<_>) = grpc_routes
.into_iter()
.partition(|(gknn, _)| gknn.namespace == *self.namespace);
for (consumer_gknn, consumer_route) in consumer_http_routes {
// Consumer routes should only apply to watches from the
@ -441,6 +584,14 @@ impl Namespace {
consumer_watch.insert_http_route(consumer_gknn.clone(), consumer_route.clone());
}
for (consumer_gknn, consumer_route) in consumer_grpc_routes {
// Consumer routes should only apply to watches from the
// consumer namespace.
let consumer_watch =
service_routes.watch_for_ns_or_default(consumer_gknn.namespace.to_string());
consumer_watch.insert_grpc_route(consumer_gknn.clone(), consumer_route.clone());
}
for (producer_gknn, producer_route) in producer_http_routes {
// Insert the route into the producer namespace.
@ -462,6 +613,26 @@ impl Namespace {
});
}
for (producer_gknn, producer_route) in producer_grpc_routes {
// Insert the route into the producer namespace.
let producer_watch =
service_routes.watch_for_ns_or_default(producer_gknn.namespace.to_string());
producer_watch.insert_grpc_route(producer_gknn.clone(), producer_route.clone());
// Producer routes apply to clients in all namespaces, so
// apply it to watches for all other namespaces too.
service_routes
.watches_by_ns
.iter_mut()
.filter(|(namespace, _)| {
namespace.as_str() != producer_gknn.namespace.as_ref()
})
.for_each(|(_, watch)| {
watch.insert_grpc_route(producer_gknn.clone(), producer_route.clone())
});
}
service_routes
})
}
@ -514,6 +685,11 @@ impl ServiceRoutes {
.get(self.namespace.as_ref())
.map(|watch| watch.http_routes.clone())
.unwrap_or_default();
let grpc_routes = self
.watches_by_ns
.get(self.namespace.as_ref())
.map(|watch| watch.grpc_routes.clone())
.unwrap_or_default();
self.watches_by_ns.entry(namespace).or_insert_with(|| {
let (sender, _) = watch::channel(OutboundPolicy {
@ -521,6 +697,7 @@ impl ServiceRoutes {
opaque: self.opaque,
accrual: self.accrual,
http_routes: http_routes.clone(),
grpc_routes: grpc_routes.clone(),
name: self.name.to_string(),
authority: self.authority.clone(),
namespace: self.namespace.to_string(),
@ -528,6 +705,7 @@ impl ServiceRoutes {
RoutesWatch {
http_routes,
grpc_routes,
watch: sender,
opaque: self.opaque,
accrual: self.accrual,
@ -561,6 +739,32 @@ impl ServiceRoutes {
}
}
fn apply_grpc_route(
&mut self,
gknn: GroupKindNamespaceName,
route: OutboundRoute<GrpcRouteMatch>,
) {
if *gknn.namespace == *self.namespace {
// This is a producer namespace route.
let watch = self.watch_for_ns_or_default(gknn.namespace.to_string());
watch.insert_grpc_route(gknn.clone(), route.clone());
// Producer routes apply to clients in all namespaces, so
// apply it to watches for all other namespaces too.
for (ns, ns_watch) in self.watches_by_ns.iter_mut() {
if ns != &gknn.namespace {
ns_watch.insert_grpc_route(gknn.clone(), route.clone());
}
}
} else {
// This is a consumer namespace route and should only apply to
// watches from that namespace.
let watch = self.watch_for_ns_or_default(gknn.namespace.to_string());
watch.insert_grpc_route(gknn, route);
}
}
fn update_service(&mut self, opaque: bool, accrual: Option<FailureAccrual>) {
self.opaque = opaque;
self.accrual = accrual;
@ -576,6 +780,12 @@ impl ServiceRoutes {
watch.remove_http_route(gknn);
}
}
fn delete_grpc_route(&mut self, gknn: &GroupKindNamespaceName) {
for watch in self.watches_by_ns.values_mut() {
watch.remove_grpc_route(gknn);
}
}
}
impl RoutesWatch {
@ -588,6 +798,11 @@ impl RoutesWatch {
modified = true;
}
if self.grpc_routes != policy.grpc_routes {
policy.grpc_routes = self.grpc_routes.clone();
modified = true;
}
if self.opaque != policy.opaque {
policy.opaque = self.opaque;
modified = true;
@ -612,10 +827,25 @@ impl RoutesWatch {
self.send_if_modified();
}
fn insert_grpc_route(
&mut self,
gknn: GroupKindNamespaceName,
route: OutboundRoute<GrpcRouteMatch>,
) {
self.grpc_routes.insert(gknn, route);
self.send_if_modified();
}
fn remove_http_route(&mut self, gknn: &GroupKindNamespaceName) {
self.http_routes.remove(gknn);
self.send_if_modified();
}
fn remove_grpc_route(&mut self, gknn: &GroupKindNamespaceName) {
self.grpc_routes.remove(gknn);
self.send_if_modified();
}
}
fn parse_accrual_config(

View File

@ -1 +1,2 @@
mod grpc;
mod http;

View File

@ -0,0 +1,174 @@
use kube::Resource;
use linkerd_policy_controller_core::{
outbound::{Backend, WeightedService},
routes::GroupKindNamespaceName,
POLICY_CONTROLLER_NAME,
};
use linkerd_policy_controller_k8s_api::gateway as k8s_gateway_api;
use tracing::Level;
use super::super::*;
#[test]
fn backend_service() {
tracing_subscriber::fmt()
.with_max_level(Level::TRACE)
.try_init()
.ok();
let test = TestConfig::default();
// Create apex service.
let apex = mk_service("ns", "apex", 8080);
test.index.write().apply(apex);
// Create httproute.
let route = mk_route("ns", "route", 8080, "apex", "backend");
test.index.write().apply(route);
let mut rx = test
.index
.write()
.outbound_policy_rx(
"apex".to_string(),
"ns".to_string(),
8080.try_into().unwrap(),
"ns".to_string(),
)
.expect("apex.ns should exist");
{
let policy = rx.borrow_and_update();
let backend = policy
.grpc_routes
.get(&GroupKindNamespaceName {
group: k8s_gateway_api::GrpcRoute::group(&()),
kind: k8s_gateway_api::GrpcRoute::kind(&()),
namespace: "ns".into(),
name: "route".into(),
})
.expect("route should exist")
.rules
.first()
.expect("rule should exist")
.backends
.first()
.expect("backend should exist");
let exists = match backend {
Backend::Service(WeightedService { exists, .. }) => exists,
_ => panic!("backend should be a service"),
};
// Backend should not exist.
assert!(!exists);
}
// Create backend service.
let backend = mk_service("ns", "backend", 8080);
test.index.write().apply(backend);
assert!(rx.has_changed().unwrap());
{
let policy = rx.borrow_and_update();
let backend = policy
.grpc_routes
.get(&GroupKindNamespaceName {
group: k8s_gateway_api::GrpcRoute::group(&()),
kind: k8s_gateway_api::GrpcRoute::kind(&()),
namespace: "ns".into(),
name: "route".into(),
})
.expect("route should exist")
.rules
.first()
.expect("rule should exist")
.backends
.first()
.expect("backend should exist");
let exists = match backend {
Backend::Service(WeightedService { exists, .. }) => exists,
_ => panic!("backend should be a service"),
};
// Backend should exist.
assert!(exists);
}
}
fn mk_route(
ns: impl ToString,
name: impl ToString,
port: u16,
parent: impl ToString,
backend: impl ToString,
) -> k8s_gateway_api::GrpcRoute {
use chrono::Utc;
use k8s::{policy::httproute::*, Time};
k8s_gateway_api::GrpcRoute {
metadata: k8s::ObjectMeta {
namespace: Some(ns.to_string()),
name: Some(name.to_string()),
creation_timestamp: Some(Time(Utc::now())),
..Default::default()
},
spec: k8s_gateway_api::GrpcRouteSpec {
inner: CommonRouteSpec {
parent_refs: Some(vec![ParentReference {
group: Some("core".to_string()),
kind: Some("Service".to_string()),
namespace: Some(ns.to_string()),
name: parent.to_string(),
section_name: None,
port: Some(port),
}]),
},
hostnames: None,
rules: Some(vec![k8s_gateway_api::GrpcRouteRule {
matches: Some(vec![k8s_gateway_api::GrpcRouteMatch {
headers: None,
method: Some(k8s_gateway_api::GrpcMethodMatch::Exact {
method: Some("Test".to_string()),
service: Some("io.linkerd.Testing".to_string()),
}),
}]),
filters: None,
backend_refs: Some(vec![k8s_gateway_api::GrpcRouteBackendRef {
filters: None,
weight: None,
inner: BackendObjectReference {
group: Some("core".to_string()),
kind: Some("Service".to_string()),
namespace: Some(ns.to_string()),
name: backend.to_string(),
port: Some(port),
},
}]),
}]),
},
status: Some(k8s_gateway_api::GrpcRouteStatus {
inner: RouteStatus {
parents: vec![k8s::gateway::RouteParentStatus {
parent_ref: ParentReference {
group: Some("core".to_string()),
kind: Some("Service".to_string()),
namespace: Some(ns.to_string()),
name: parent.to_string(),
section_name: None,
port: Some(port),
},
controller_name: POLICY_CONTROLLER_NAME.to_string(),
conditions: vec![k8s::Condition {
last_transition_time: Time(chrono::DateTime::<Utc>::MIN_UTC),
message: "".to_string(),
observed_generation: None,
reason: "Accepted".to_string(),
status: "True".to_string(),
type_: "Accepted".to_string(),
}],
}],
},
}),
}
}

View File

@ -1,6 +1,7 @@
use linkerd_policy_controller_core::routes::{GroupKindName, GroupKindNamespaceName};
use linkerd_policy_controller_k8s_api::{gateway as api, policy, Resource, ResourceExt};
pub mod grpc;
pub mod http;
#[derive(Debug, Clone)]

View File

@ -0,0 +1,22 @@
use anyhow::Result;
use linkerd_policy_controller_core::routes;
use linkerd_policy_controller_k8s_api::gateway as k8s_gateway_api;
pub fn try_match(
k8s_gateway_api::GrpcRouteMatch { headers, method }: k8s_gateway_api::GrpcRouteMatch,
) -> Result<routes::GrpcRouteMatch> {
let headers = headers
.into_iter()
.flatten()
.map(super::http::header_match)
.collect::<Result<_>>()?;
let method = method.map(|value| match value {
k8s_gateway_api::GrpcMethodMatch::Exact { method, service }
| k8s_gateway_api::GrpcMethodMatch::RegularExpression { method, service } => {
routes::GrpcMethodMatch { method, service }
}
});
Ok(routes::GrpcRouteMatch { headers, method })
}

View File

@ -262,8 +262,11 @@ async fn main() -> Result<()> {
let gateway_grpc_routes =
runtime.watch_all::<k8s_gateway_api::GrpcRoute>(watcher::Config::default());
let gateway_grpc_routes_indexes = IndexList::new(outbound_index.clone())
.push(status_index.clone())
.shared();
tokio::spawn(
kubert::index::namespaced(status_index.clone(), gateway_grpc_routes)
kubert::index::namespaced(gateway_grpc_routes_indexes.clone(), gateway_grpc_routes)
.instrument(info_span!("grpcroutes.gateway.networking.k8s.io")),
);