From b981f52bcbc66cf2537c9d89159a02ecf700b84a Mon Sep 17 00:00:00 2001 From: Alex Leong Date: Wed, 19 Jul 2023 16:07:32 -0700 Subject: [PATCH] Allow port in HttpRoute parent_ref to be optional (#11107) According to the [xRoutes Mesh Binding KEP](https://gateway-api.sigs.k8s.io/geps/gep-1426/#ports), the port in a parent reference is optional: > By default, a Service attachment applies to all ports in the service. Users may want to attach routes to only a specific port in a Service. To do so, the parentRef.port field should be used. > If port is set, the implementation MUST associate the route only with that port. If port is not set, the implementation MUST associate the route with all ports defined in the Service. However, we currently ignore any HttpRoutes which don't have a port specified in the parent ref. We update the policy controller to apply HttpRoutes which do not specify a port in the parent ref to all ports of the parent service. We do this by storing these "portless" HttpRoutes in the index and then copying these routes into every port-specific watch for that service. Signed-off-by: Alex Leong --- .../k8s/index/src/outbound/index.rs | 128 ++++++++------ policy-test/tests/outbound_api_gateway.rs | 148 +++++++++++++---- policy-test/tests/outbound_api_linkerd.rs | 157 ++++++++++++++---- 3 files changed, 323 insertions(+), 110 deletions(-) diff --git a/policy-controller/k8s/index/src/outbound/index.rs b/policy-controller/k8s/index/src/outbound/index.rs index 05f7eebb1..7c8aa6f26 100644 --- a/policy-controller/k8s/index/src/outbound/index.rs +++ b/policy-controller/k8s/index/src/outbound/index.rs @@ -42,7 +42,13 @@ struct NamespaceIndex { #[derive(Debug)] struct Namespace { - service_routes: HashMap, + /// Stores an observable handle for each known service:port, + /// as well as any route resources in the cluster that specify + /// a port. + service_port_routes: HashMap, + /// Stores the route resources (by service name) that do not + /// explicitly target a port. + service_routes: HashMap>, namespace: Arc, } @@ -133,6 +139,7 @@ impl kubert::index::IndexNamespacedResource for Index { .entry(ns.clone()) .or_insert_with(|| Namespace { service_routes: Default::default(), + service_port_routes: Default::default(), namespace: Arc::new(ns), }) .update_service(service.name_unchecked(), &service_info); @@ -177,6 +184,7 @@ impl Index { .entry(namespace.clone()) .or_insert_with(|| Namespace { service_routes: Default::default(), + service_port_routes: Default::default(), namespace: Arc::new(namespace.to_string()), }); let key = ServicePort { service, port }; @@ -198,6 +206,7 @@ impl Index { .entry(ns.clone()) .or_insert_with(|| Namespace { service_routes: Default::default(), + service_port_routes: Default::default(), namespace: Arc::new(ns), }) .apply(route, &self.namespaces.cluster_info, &self.service_info); @@ -229,32 +238,43 @@ impl Namespace { continue; } - if let Some(port) = parent_ref.port { - if let Some(port) = NonZeroU16::new(port) { - let service_port = ServicePort { - port, - service: parent_ref.name.clone(), - }; - tracing::debug!( - ?service_port, - route = route.name(), - "inserting route for service" - ); - let service_routes = - self.service_routes_or_default(service_port, cluster_info, service_info); - service_routes.apply(route.gkn(), outbound_route.clone()); - } else { - tracing::warn!(?parent_ref, "ignoring parent_ref with port 0"); - } + let port = parent_ref.port.and_then(NonZeroU16::new); + if let Some(port) = port { + let service_port = ServicePort { + port, + service: parent_ref.name.clone(), + }; + tracing::debug!( + ?service_port, + route = route.name(), + "inserting route for service" + ); + let service_routes = + self.service_routes_or_default(service_port, cluster_info, service_info); + service_routes.apply(route.gkn(), outbound_route.clone()); } else { - tracing::warn!(?parent_ref, "ignoring parent_ref without port"); + // If the parent_ref doesn't include a port, apply this route + // to all ServiceRoutes which match the Service name. + self.service_port_routes.iter_mut().for_each( + |(ServicePort { service, port: _ }, routes)| { + if service == &parent_ref.name { + routes.apply(route.gkn(), outbound_route.clone()); + } + }, + ); + // Also add the route to the list of routes that target the + // Service without specifying a port. + self.service_routes + .entry(parent_ref.name.clone()) + .or_default() + .insert(route.gkn(), outbound_route.clone()); } } } fn update_service(&mut self, name: String, service: &ServiceInfo) { tracing::debug!(?name, ?service, "updating service"); - for (svc_port, svc_routes) in self.service_routes.iter_mut() { + for (svc_port, svc_routes) in self.service_port_routes.iter_mut() { if svc_port.service != name { continue; } @@ -265,9 +285,12 @@ impl Namespace { } fn delete(&mut self, gkn: GroupKindName) { - for service in self.service_routes.values_mut() { + for service in self.service_port_routes.values_mut() { service.delete(&gkn); } + for routes in self.service_routes.values_mut() { + routes.remove(&gkn); + } } fn service_routes_or_default( @@ -276,33 +299,44 @@ impl Namespace { cluster: &ClusterInfo, service_info: &HashMap, ) -> &mut ServiceRoutes { - self.service_routes.entry(sp.clone()).or_insert_with(|| { - let authority = cluster.service_dns_authority(&self.namespace, &sp.service, sp.port); - let service_ref = ServiceRef { - name: sp.service.clone(), - namespace: self.namespace.to_string(), - }; - let (opaque, accrual) = match service_info.get(&service_ref) { - Some(svc) => (svc.opaque_ports.contains(&sp.port), svc.accrual), - None => (false, None), - }; + self.service_port_routes + .entry(sp.clone()) + .or_insert_with(|| { + let authority = + cluster.service_dns_authority(&self.namespace, &sp.service, sp.port); + let service_ref = ServiceRef { + name: sp.service.clone(), + namespace: self.namespace.to_string(), + }; + let (opaque, accrual) = match service_info.get(&service_ref) { + Some(svc) => (svc.opaque_ports.contains(&sp.port), svc.accrual), + None => (false, None), + }; - let (sender, _) = watch::channel(OutboundPolicy { - http_routes: Default::default(), - authority, - name: sp.service.clone(), - namespace: self.namespace.to_string(), - port: sp.port, - opaque, - accrual, - }); - ServiceRoutes { - routes: Default::default(), - watch: sender, - opaque, - accrual, - } - }) + // The HttpRoutes which target this Service but don't specify + // a port apply to all ports. Therefore we include them. + let routes = self + .service_routes + .get(&sp.service) + .cloned() + .unwrap_or_default(); + + let (sender, _) = watch::channel(OutboundPolicy { + http_routes: routes.clone(), + authority, + name: sp.service.clone(), + namespace: self.namespace.to_string(), + port: sp.port, + opaque, + accrual, + }); + ServiceRoutes { + routes, + watch: sender, + opaque, + accrual, + } + }) } fn convert_route( diff --git a/policy-test/tests/outbound_api_gateway.rs b/policy-test/tests/outbound_api_gateway.rs index 54861fafc..df0ccefa6 100644 --- a/policy-test/tests/outbound_api_gateway.rs +++ b/policy-test/tests/outbound_api_gateway.rs @@ -33,7 +33,7 @@ async fn service_with_no_http_routes() { // Create a service let svc = create_service(&client, &ns, "my-svc", 4191).await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -56,7 +56,7 @@ async fn service_with_http_route_without_rules() { // Create a service let svc = create_service(&client, &ns, "my-svc", 4191).await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -94,7 +94,7 @@ async fn service_with_http_routes_without_backends() { // Create a service let svc = create_service(&client, &ns, "my-svc", 4191).await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -108,7 +108,11 @@ async fn service_with_http_routes_without_backends() { assert_route_is_default(route, &svc, 4191); }); - let _route = create(&client, mk_http_route(&ns, "foo-route", &svc, 4191).build()).await; + let _route = create( + &client, + mk_http_route(&ns, "foo-route", &svc, Some(4191)).build(), + ) + .await; let config = rx .next() @@ -134,7 +138,7 @@ async fn service_with_http_routes_with_backend() { // Create a service let svc = create_service(&client, &ns, "my-svc", 4191).await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -151,8 +155,11 @@ async fn service_with_http_routes_with_backend() { let backend_name = "backend"; let backend_svc = create_service(&client, &ns, backend_name, 8888).await; let backends = [backend_name]; - let route = - mk_http_route(&ns, "foo-route", &svc, 4191).with_backends(Some(&backends), None, None); + let route = mk_http_route(&ns, "foo-route", &svc, Some(4191)).with_backends( + Some(&backends), + None, + None, + ); let _route = create(&client, route.build()).await; let config = rx @@ -181,7 +188,7 @@ async fn service_with_http_routes_with_cross_namespace_backend() { // Create a service let svc = create_service(&client, &ns, "my-svc", 4191).await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -213,7 +220,7 @@ async fn service_with_http_routes_with_cross_namespace_backend() { let backend_name = "backend"; let backend_svc = create_service(&client, &backend_ns_name, backend_name, 8888).await; let backends = [backend_name]; - let route = mk_http_route(&ns, "foo-route", &svc, 4191).with_backends( + let route = mk_http_route(&ns, "foo-route", &svc, Some(4191)).with_backends( Some(&backends), Some(backend_ns_name), None, @@ -249,7 +256,7 @@ async fn service_with_http_routes_with_invalid_backend() { // Create a service let svc = create_service(&client, &ns, "my-svc", 4191).await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -264,8 +271,11 @@ async fn service_with_http_routes_with_invalid_backend() { }); let backends = ["invalid-backend"]; - let route = - mk_http_route(&ns, "foo-route", &svc, 4191).with_backends(Some(&backends), None, None); + let route = mk_http_route(&ns, "foo-route", &svc, Some(4191)).with_backends( + Some(&backends), + None, + None, + ); let _route = create(&client, route.build()).await; let config = rx @@ -294,7 +304,7 @@ async fn service_with_multiple_http_routes() { // Create a service let svc = create_service(&client, &ns, "my-svc", 4191).await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -311,7 +321,11 @@ async fn service_with_multiple_http_routes() { // Routes should be returned in sorted order by creation timestamp then // name. To ensure that this test isn't timing dependant, routes should // be created in alphabetical order. - let _a_route = create(&client, mk_http_route(&ns, "a-route", &svc, 4191).build()).await; + let _a_route = create( + &client, + mk_http_route(&ns, "a-route", &svc, Some(4191)).build(), + ) + .await; // First route update. let config = rx @@ -321,7 +335,11 @@ async fn service_with_multiple_http_routes() { .expect("watch must return an updated config"); tracing::trace!(?config); - let _b_route = create(&client, mk_http_route(&ns, "b-route", &svc, 4191).build()).await; + let _b_route = create( + &client, + mk_http_route(&ns, "b-route", &svc, Some(4191)).build(), + ) + .await; // Second route update. let config = rx @@ -374,7 +392,7 @@ async fn service_with_consecutive_failure_accrual() { ) .await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -418,7 +436,7 @@ async fn service_with_consecutive_failure_accrual_defaults() { ) .await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -456,7 +474,7 @@ async fn service_with_consecutive_failure_accrual_defaults() { ) .await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -494,7 +512,7 @@ async fn service_with_consecutive_failure_accrual_defaults() { ) .await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -528,7 +546,7 @@ async fn service_with_default_failure_accrual() { // Default config for Service, no failure accrual let svc = create_service(&client, &ns, "default-failure-accrual", 80).await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -558,7 +576,7 @@ async fn service_with_default_failure_accrual() { ) .await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -583,7 +601,7 @@ async fn opaque_service() { // Create a service let svc = create_opaque_service(&client, &ns, "my-svc", 4191).await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -606,7 +624,7 @@ async fn route_with_filters() { // Create a service let svc = create_service(&client, &ns, "my-svc", 4191).await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -622,7 +640,7 @@ async fn route_with_filters() { let backend_name = "backend"; let backends = [backend_name]; - let route = mk_http_route(&ns, "foo-route", &svc, 4191) + let route = mk_http_route(&ns, "foo-route", &svc, Some(4191)) .with_backends(Some(&backends), None, None) .with_filters(Some(vec![ k8s_gateway_api::HttpRouteFilter::RequestHeaderModifier { @@ -716,7 +734,7 @@ async fn backend_with_filters() { // Create a service let svc = create_service(&client, &ns, "my-svc", 4191).await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -733,7 +751,7 @@ async fn backend_with_filters() { let backend_name = "backend"; let backend_svc = create_service(&client, &ns, backend_name, 8888).await; let backends = [backend_name]; - let route = mk_http_route(&ns, "foo-route", &svc, 4191) + let route = mk_http_route(&ns, "foo-route", &svc, Some(4191)) .with_backends(Some(&backends), None, Some(vec![ k8s_gateway_api::HttpRouteFilter::RequestHeaderModifier { request_header_modifier: k8s_gateway_api::HttpRequestHeaderFilter { @@ -824,6 +842,58 @@ async fn backend_with_filters() { .await; } +#[tokio::test(flavor = "current_thread")] +async fn http_route_with_no_port() { + with_temp_ns(|client, ns| async move { + // Create a service + let svc = create_service(&client, &ns, "my-svc", 4191).await; + + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; + let config = rx + .next() + .await + .expect("watch must not fail") + .expect("watch must return an initial config"); + tracing::trace!(?config); + + // There should be a default route. + detect_http_routes(&config, |routes| { + let route = assert_singleton(routes); + assert_route_is_default(route, &svc, 4191); + }); + + let _route = create(&client, mk_http_route(&ns, "foo-route", &svc, None).build()).await; + + let config = rx + .next() + .await + .expect("watch must not fail") + .expect("watch must return an updated config"); + tracing::trace!(?config); + + // The route should apply to the service. + detect_http_routes(&config, |routes| { + let route = assert_singleton(routes); + assert_route_name_eq(route, "foo-route"); + }); + + // The route should apply to other ports too. + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 9999).await; + let config = rx + .next() + .await + .expect("watch must not fail") + .expect("watch must return an initial config"); + tracing::trace!(?config); + + detect_http_routes(&config, |routes| { + let route = assert_singleton(routes); + assert_route_name_eq(route, "foo-route"); + }); + }) + .await; +} + /* Helpers */ struct HttpRouteBuilder(k8s_gateway_api::HttpRoute); @@ -832,12 +902,13 @@ async fn retry_watch_outbound_policy( client: &kube::Client, ns: &str, svc: &k8s::Service, + port: u16, ) -> tonic::Streaming { // Port-forward to the control plane and start watching the service's // outbound policy. let mut policy_api = grpc::OutboundPolicyClient::port_forwarded(client).await; loop { - match policy_api.watch(ns, svc, 4191).await { + match policy_api.watch(ns, svc, port).await { Ok(rx) => return rx, Err(error) => { tracing::error!( @@ -852,7 +923,7 @@ async fn retry_watch_outbound_policy( } } -fn mk_http_route(ns: &str, name: &str, svc: &k8s::Service, port: u16) -> HttpRouteBuilder { +fn mk_http_route(ns: &str, name: &str, svc: &k8s::Service, port: Option) -> HttpRouteBuilder { use k8s_gateway_api as api; HttpRouteBuilder(api::HttpRoute { @@ -869,7 +940,7 @@ fn mk_http_route(ns: &str, name: &str, svc: &k8s::Service, port: u16) -> HttpRou namespace: svc.namespace(), name: svc.name_unchecked(), section_name: None, - port: Some(port), + port, }]), }, hostnames: None, @@ -1106,6 +1177,14 @@ fn assert_backend_has_failure_filter(backend: &grpc::outbound::http_route::Weigh #[track_caller] fn assert_route_is_default(route: &grpc::outbound::HttpRoute, svc: &k8s::Service, port: u16) { + let kind = route.metadata.as_ref().unwrap().kind.as_ref().unwrap(); + match kind { + grpc::meta::metadata::Kind::Default(_) => {} + grpc::meta::metadata::Kind::Resource(r) => { + panic!("route expected to be default but got resource {r:?}") + } + } + let backends = route_backends_first_available(route); let backend = assert_singleton(backends); assert_backend_matches_service(backend, svc, port); @@ -1154,3 +1233,14 @@ fn assert_singleton(ts: &[T]) -> &T { assert_eq!(ts.len(), 1); ts.get(0).unwrap() } + +#[track_caller] +fn assert_route_name_eq(route: &grpc::outbound::HttpRoute, name: &str) { + let kind = route.metadata.as_ref().unwrap().kind.as_ref().unwrap(); + match kind { + grpc::meta::metadata::Kind::Default(d) => { + panic!("route expected to not be default, but got default {d:?}") + } + grpc::meta::metadata::Kind::Resource(resource) => assert_eq!(resource.name, *name), + } +} diff --git a/policy-test/tests/outbound_api_linkerd.rs b/policy-test/tests/outbound_api_linkerd.rs index fa3301752..b5c203f84 100644 --- a/policy-test/tests/outbound_api_linkerd.rs +++ b/policy-test/tests/outbound_api_linkerd.rs @@ -38,7 +38,7 @@ async fn service_with_no_http_routes() { // Create a service let svc = create_service(&client, &ns, "my-svc", 4191).await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -61,7 +61,7 @@ async fn service_with_http_route_without_rules() { // Create a service let svc = create_service(&client, &ns, "my-svc", 4191).await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -99,7 +99,7 @@ async fn service_with_http_routes_without_backends() { // Create a service let svc = create_service(&client, &ns, "my-svc", 4191).await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -113,7 +113,11 @@ async fn service_with_http_routes_without_backends() { assert_route_is_default(route, &svc, 4191); }); - let _route = create(&client, mk_http_route(&ns, "foo-route", &svc, 4191).build()).await; + let _route = create( + &client, + mk_http_route(&ns, "foo-route", &svc, Some(4191)).build(), + ) + .await; let config = rx .next() @@ -139,7 +143,7 @@ async fn service_with_http_routes_with_backend() { // Create a service let svc = create_service(&client, &ns, "my-svc", 4191).await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -156,8 +160,11 @@ async fn service_with_http_routes_with_backend() { let backend_name = "backend"; let backend_svc = create_service(&client, &ns, backend_name, 8888).await; let backends = [backend_name]; - let route = - mk_http_route(&ns, "foo-route", &svc, 4191).with_backends(Some(&backends), None, None); + let route = mk_http_route(&ns, "foo-route", &svc, Some(4191)).with_backends( + Some(&backends), + None, + None, + ); let _route = create(&client, route.build()).await; let config = rx @@ -186,7 +193,7 @@ async fn service_with_http_routes_with_cross_namespace_backend() { // Create a service let svc = create_service(&client, &ns, "my-svc", 4191).await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -218,7 +225,7 @@ async fn service_with_http_routes_with_cross_namespace_backend() { let backend_name = "backend"; let backend_svc = create_service(&client, &backend_ns_name, backend_name, 8888).await; let backends = [backend_name]; - let route = mk_http_route(&ns, "foo-route", &svc, 4191).with_backends( + let route = mk_http_route(&ns, "foo-route", &svc, Some(4191)).with_backends( Some(&backends), Some(backend_ns_name), None, @@ -254,7 +261,7 @@ async fn service_with_http_routes_with_invalid_backend() { // Create a service let svc = create_service(&client, &ns, "my-svc", 4191).await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -269,8 +276,11 @@ async fn service_with_http_routes_with_invalid_backend() { }); let backends = ["invalid-backend"]; - let route = - mk_http_route(&ns, "foo-route", &svc, 4191).with_backends(Some(&backends), None, None); + let route = mk_http_route(&ns, "foo-route", &svc, Some(4191)).with_backends( + Some(&backends), + None, + None, + ); let _route = create(&client, route.build()).await; let config = rx @@ -299,7 +309,7 @@ async fn service_with_multiple_http_routes() { // Create a service let svc = create_service(&client, &ns, "my-svc", 4191).await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -316,7 +326,11 @@ async fn service_with_multiple_http_routes() { // Routes should be returned in sorted order by creation timestamp then // name. To ensure that this test isn't timing dependant, routes should // be created in alphabetical order. - let _a_route = create(&client, mk_http_route(&ns, "a-route", &svc, 4191).build()).await; + let _a_route = create( + &client, + mk_http_route(&ns, "a-route", &svc, Some(4191)).build(), + ) + .await; // First route update. let config = rx @@ -326,7 +340,11 @@ async fn service_with_multiple_http_routes() { .expect("watch must return an updated config"); tracing::trace!(?config); - let _b_route = create(&client, mk_http_route(&ns, "b-route", &svc, 4191).build()).await; + let _b_route = create( + &client, + mk_http_route(&ns, "b-route", &svc, Some(4191)).build(), + ) + .await; // Second route update. let config = rx @@ -379,7 +397,7 @@ async fn service_with_consecutive_failure_accrual() { ) .await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -423,7 +441,7 @@ async fn service_with_consecutive_failure_accrual_defaults() { ) .await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -461,7 +479,7 @@ async fn service_with_consecutive_failure_accrual_defaults() { ) .await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -499,7 +517,7 @@ async fn service_with_consecutive_failure_accrual_defaults() { ) .await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -533,7 +551,7 @@ async fn service_with_default_failure_accrual() { // Default config for Service, no failure accrual let svc = create_service(&client, &ns, "default-failure-accrual", 80).await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -563,7 +581,7 @@ async fn service_with_default_failure_accrual() { ) .await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -588,7 +606,7 @@ async fn opaque_service() { // Create a service let svc = create_opaque_service(&client, &ns, "my-svc", 4191).await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -611,7 +629,7 @@ async fn route_rule_with_filters() { // Create a service let svc = create_service(&client, &ns, "my-svc", 4191).await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -631,7 +649,8 @@ async fn route_rule_with_filters() { &ns, "foo-route", &svc, - 4191).with_backends(Some(&backends), None, None).with_filters( Some(vec![ + Some(4191), + ).with_backends(Some(&backends), None, None).with_filters(Some(vec![ k8s::policy::httproute::HttpRouteFilter::RequestHeaderModifier { request_header_modifier: k8s_gateway_api::HttpRequestHeaderFilter { set: Some(vec![k8s_gateway_api::HttpHeader { @@ -727,7 +746,7 @@ async fn backend_with_filters() { // Create a service let svc = create_service(&client, &ns, "my-svc", 4191).await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -744,11 +763,12 @@ async fn backend_with_filters() { let backend_name = "backend"; let backend_svc = create_service(&client, &ns, backend_name, 8888).await; let backends = [backend_name]; - let route = mk_http_route( + let route = mk_http_route( &ns, "foo-route", &svc, - 4191).with_backends(Some(&backends), None, Some(vec![ + Some(4191) + ).with_backends(Some(&backends), None, Some(vec![ k8s_gateway_api::HttpRouteFilter::RequestHeaderModifier { request_header_modifier: k8s_gateway_api::HttpRequestHeaderFilter { set: Some(vec![k8s_gateway_api::HttpHeader { @@ -774,10 +794,7 @@ async fn backend_with_filters() { }, }, ])); - let _route = create( - &client, - route.build(), - ) + let _route = create(&client, route.build()) .await; let config = rx @@ -842,6 +859,58 @@ async fn backend_with_filters() { .await; } +#[tokio::test(flavor = "current_thread")] +async fn http_route_with_no_port() { + with_temp_ns(|client, ns| async move { + // Create a service + let svc = create_service(&client, &ns, "my-svc", 4191).await; + + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; + let config = rx + .next() + .await + .expect("watch must not fail") + .expect("watch must return an initial config"); + tracing::trace!(?config); + + // There should be a default route. + detect_http_routes(&config, |routes| { + let route = assert_singleton(routes); + assert_route_is_default(route, &svc, 4191); + }); + + let _route = create(&client, mk_http_route(&ns, "foo-route", &svc, None).build()).await; + + let config = rx + .next() + .await + .expect("watch must not fail") + .expect("watch must return an updated config"); + tracing::trace!(?config); + + // The route should apply to the service. + detect_http_routes(&config, |routes| { + let route = assert_singleton(routes); + assert_route_name_eq(route, "foo-route"); + }); + + // The route should apply to other ports too. + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 9999).await; + let config = rx + .next() + .await + .expect("watch must not fail") + .expect("watch must return an initial config"); + tracing::trace!(?config); + + detect_http_routes(&config, |routes| { + let route = assert_singleton(routes); + assert_route_name_eq(route, "foo-route"); + }); + }) + .await; +} + /* Helpers */ struct HttpRouteBuilder(k8s::policy::HttpRoute); @@ -850,12 +919,13 @@ async fn retry_watch_outbound_policy( client: &kube::Client, ns: &str, svc: &k8s::Service, + port: u16, ) -> tonic::Streaming { // Port-forward to the control plane and start watching the service's // outbound policy. let mut policy_api = grpc::OutboundPolicyClient::port_forwarded(client).await; loop { - match policy_api.watch(ns, svc, 4191).await { + match policy_api.watch(ns, svc, port).await { Ok(rx) => return rx, Err(error) => { tracing::error!( @@ -870,7 +940,7 @@ async fn retry_watch_outbound_policy( } } -fn mk_http_route(ns: &str, name: &str, svc: &k8s::Service, port: u16) -> HttpRouteBuilder { +fn mk_http_route(ns: &str, name: &str, svc: &k8s::Service, port: Option) -> HttpRouteBuilder { use k8s::policy::httproute as api; HttpRouteBuilder(api::HttpRoute { @@ -887,7 +957,7 @@ fn mk_http_route(ns: &str, name: &str, svc: &k8s::Service, port: u16) -> HttpRou namespace: svc.namespace(), name: svc.name_unchecked(), section_name: None, - port: Some(port), + port, }]), }, hostnames: None, @@ -1125,6 +1195,14 @@ fn assert_backend_has_failure_filter(backend: &grpc::outbound::http_route::Weigh #[track_caller] fn assert_route_is_default(route: &grpc::outbound::HttpRoute, svc: &k8s::Service, port: u16) { + let kind = route.metadata.as_ref().unwrap().kind.as_ref().unwrap(); + match kind { + grpc::meta::metadata::Kind::Default(_) => {} + grpc::meta::metadata::Kind::Resource(r) => { + panic!("route expected to be default but got resource {r:?}") + } + } + let backends = route_backends_first_available(route); let backend = assert_singleton(backends); assert_backend_matches_service(backend, svc, port); @@ -1173,3 +1251,14 @@ fn assert_singleton(ts: &[T]) -> &T { assert_eq!(ts.len(), 1); ts.get(0).unwrap() } + +#[track_caller] +fn assert_route_name_eq(route: &grpc::outbound::HttpRoute, name: &str) { + let kind = route.metadata.as_ref().unwrap().kind.as_ref().unwrap(); + match kind { + grpc::meta::metadata::Kind::Default(d) => { + panic!("route expected to not be default, but got default {d:?}") + } + grpc::meta::metadata::Kind::Resource(resource) => assert_eq!(resource.name, *name), + } +}