diff --git a/policy-controller/k8s/index/src/outbound/index.rs b/policy-controller/k8s/index/src/outbound/index.rs index 05f7eebb1..7c8aa6f26 100644 --- a/policy-controller/k8s/index/src/outbound/index.rs +++ b/policy-controller/k8s/index/src/outbound/index.rs @@ -42,7 +42,13 @@ struct NamespaceIndex { #[derive(Debug)] struct Namespace { - service_routes: HashMap, + /// Stores an observable handle for each known service:port, + /// as well as any route resources in the cluster that specify + /// a port. + service_port_routes: HashMap, + /// Stores the route resources (by service name) that do not + /// explicitly target a port. + service_routes: HashMap>, namespace: Arc, } @@ -133,6 +139,7 @@ impl kubert::index::IndexNamespacedResource for Index { .entry(ns.clone()) .or_insert_with(|| Namespace { service_routes: Default::default(), + service_port_routes: Default::default(), namespace: Arc::new(ns), }) .update_service(service.name_unchecked(), &service_info); @@ -177,6 +184,7 @@ impl Index { .entry(namespace.clone()) .or_insert_with(|| Namespace { service_routes: Default::default(), + service_port_routes: Default::default(), namespace: Arc::new(namespace.to_string()), }); let key = ServicePort { service, port }; @@ -198,6 +206,7 @@ impl Index { .entry(ns.clone()) .or_insert_with(|| Namespace { service_routes: Default::default(), + service_port_routes: Default::default(), namespace: Arc::new(ns), }) .apply(route, &self.namespaces.cluster_info, &self.service_info); @@ -229,32 +238,43 @@ impl Namespace { continue; } - if let Some(port) = parent_ref.port { - if let Some(port) = NonZeroU16::new(port) { - let service_port = ServicePort { - port, - service: parent_ref.name.clone(), - }; - tracing::debug!( - ?service_port, - route = route.name(), - "inserting route for service" - ); - let service_routes = - self.service_routes_or_default(service_port, cluster_info, service_info); - service_routes.apply(route.gkn(), outbound_route.clone()); - } else { - tracing::warn!(?parent_ref, "ignoring parent_ref with port 0"); - } + let port = parent_ref.port.and_then(NonZeroU16::new); + if let Some(port) = port { + let service_port = ServicePort { + port, + service: parent_ref.name.clone(), + }; + tracing::debug!( + ?service_port, + route = route.name(), + "inserting route for service" + ); + let service_routes = + self.service_routes_or_default(service_port, cluster_info, service_info); + service_routes.apply(route.gkn(), outbound_route.clone()); } else { - tracing::warn!(?parent_ref, "ignoring parent_ref without port"); + // If the parent_ref doesn't include a port, apply this route + // to all ServiceRoutes which match the Service name. + self.service_port_routes.iter_mut().for_each( + |(ServicePort { service, port: _ }, routes)| { + if service == &parent_ref.name { + routes.apply(route.gkn(), outbound_route.clone()); + } + }, + ); + // Also add the route to the list of routes that target the + // Service without specifying a port. + self.service_routes + .entry(parent_ref.name.clone()) + .or_default() + .insert(route.gkn(), outbound_route.clone()); } } } fn update_service(&mut self, name: String, service: &ServiceInfo) { tracing::debug!(?name, ?service, "updating service"); - for (svc_port, svc_routes) in self.service_routes.iter_mut() { + for (svc_port, svc_routes) in self.service_port_routes.iter_mut() { if svc_port.service != name { continue; } @@ -265,9 +285,12 @@ impl Namespace { } fn delete(&mut self, gkn: GroupKindName) { - for service in self.service_routes.values_mut() { + for service in self.service_port_routes.values_mut() { service.delete(&gkn); } + for routes in self.service_routes.values_mut() { + routes.remove(&gkn); + } } fn service_routes_or_default( @@ -276,33 +299,44 @@ impl Namespace { cluster: &ClusterInfo, service_info: &HashMap, ) -> &mut ServiceRoutes { - self.service_routes.entry(sp.clone()).or_insert_with(|| { - let authority = cluster.service_dns_authority(&self.namespace, &sp.service, sp.port); - let service_ref = ServiceRef { - name: sp.service.clone(), - namespace: self.namespace.to_string(), - }; - let (opaque, accrual) = match service_info.get(&service_ref) { - Some(svc) => (svc.opaque_ports.contains(&sp.port), svc.accrual), - None => (false, None), - }; + self.service_port_routes + .entry(sp.clone()) + .or_insert_with(|| { + let authority = + cluster.service_dns_authority(&self.namespace, &sp.service, sp.port); + let service_ref = ServiceRef { + name: sp.service.clone(), + namespace: self.namespace.to_string(), + }; + let (opaque, accrual) = match service_info.get(&service_ref) { + Some(svc) => (svc.opaque_ports.contains(&sp.port), svc.accrual), + None => (false, None), + }; - let (sender, _) = watch::channel(OutboundPolicy { - http_routes: Default::default(), - authority, - name: sp.service.clone(), - namespace: self.namespace.to_string(), - port: sp.port, - opaque, - accrual, - }); - ServiceRoutes { - routes: Default::default(), - watch: sender, - opaque, - accrual, - } - }) + // The HttpRoutes which target this Service but don't specify + // a port apply to all ports. Therefore we include them. + let routes = self + .service_routes + .get(&sp.service) + .cloned() + .unwrap_or_default(); + + let (sender, _) = watch::channel(OutboundPolicy { + http_routes: routes.clone(), + authority, + name: sp.service.clone(), + namespace: self.namespace.to_string(), + port: sp.port, + opaque, + accrual, + }); + ServiceRoutes { + routes, + watch: sender, + opaque, + accrual, + } + }) } fn convert_route( diff --git a/policy-test/tests/outbound_api_gateway.rs b/policy-test/tests/outbound_api_gateway.rs index 54861fafc..df0ccefa6 100644 --- a/policy-test/tests/outbound_api_gateway.rs +++ b/policy-test/tests/outbound_api_gateway.rs @@ -33,7 +33,7 @@ async fn service_with_no_http_routes() { // Create a service let svc = create_service(&client, &ns, "my-svc", 4191).await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -56,7 +56,7 @@ async fn service_with_http_route_without_rules() { // Create a service let svc = create_service(&client, &ns, "my-svc", 4191).await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -94,7 +94,7 @@ async fn service_with_http_routes_without_backends() { // Create a service let svc = create_service(&client, &ns, "my-svc", 4191).await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -108,7 +108,11 @@ async fn service_with_http_routes_without_backends() { assert_route_is_default(route, &svc, 4191); }); - let _route = create(&client, mk_http_route(&ns, "foo-route", &svc, 4191).build()).await; + let _route = create( + &client, + mk_http_route(&ns, "foo-route", &svc, Some(4191)).build(), + ) + .await; let config = rx .next() @@ -134,7 +138,7 @@ async fn service_with_http_routes_with_backend() { // Create a service let svc = create_service(&client, &ns, "my-svc", 4191).await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -151,8 +155,11 @@ async fn service_with_http_routes_with_backend() { let backend_name = "backend"; let backend_svc = create_service(&client, &ns, backend_name, 8888).await; let backends = [backend_name]; - let route = - mk_http_route(&ns, "foo-route", &svc, 4191).with_backends(Some(&backends), None, None); + let route = mk_http_route(&ns, "foo-route", &svc, Some(4191)).with_backends( + Some(&backends), + None, + None, + ); let _route = create(&client, route.build()).await; let config = rx @@ -181,7 +188,7 @@ async fn service_with_http_routes_with_cross_namespace_backend() { // Create a service let svc = create_service(&client, &ns, "my-svc", 4191).await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -213,7 +220,7 @@ async fn service_with_http_routes_with_cross_namespace_backend() { let backend_name = "backend"; let backend_svc = create_service(&client, &backend_ns_name, backend_name, 8888).await; let backends = [backend_name]; - let route = mk_http_route(&ns, "foo-route", &svc, 4191).with_backends( + let route = mk_http_route(&ns, "foo-route", &svc, Some(4191)).with_backends( Some(&backends), Some(backend_ns_name), None, @@ -249,7 +256,7 @@ async fn service_with_http_routes_with_invalid_backend() { // Create a service let svc = create_service(&client, &ns, "my-svc", 4191).await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -264,8 +271,11 @@ async fn service_with_http_routes_with_invalid_backend() { }); let backends = ["invalid-backend"]; - let route = - mk_http_route(&ns, "foo-route", &svc, 4191).with_backends(Some(&backends), None, None); + let route = mk_http_route(&ns, "foo-route", &svc, Some(4191)).with_backends( + Some(&backends), + None, + None, + ); let _route = create(&client, route.build()).await; let config = rx @@ -294,7 +304,7 @@ async fn service_with_multiple_http_routes() { // Create a service let svc = create_service(&client, &ns, "my-svc", 4191).await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -311,7 +321,11 @@ async fn service_with_multiple_http_routes() { // Routes should be returned in sorted order by creation timestamp then // name. To ensure that this test isn't timing dependant, routes should // be created in alphabetical order. - let _a_route = create(&client, mk_http_route(&ns, "a-route", &svc, 4191).build()).await; + let _a_route = create( + &client, + mk_http_route(&ns, "a-route", &svc, Some(4191)).build(), + ) + .await; // First route update. let config = rx @@ -321,7 +335,11 @@ async fn service_with_multiple_http_routes() { .expect("watch must return an updated config"); tracing::trace!(?config); - let _b_route = create(&client, mk_http_route(&ns, "b-route", &svc, 4191).build()).await; + let _b_route = create( + &client, + mk_http_route(&ns, "b-route", &svc, Some(4191)).build(), + ) + .await; // Second route update. let config = rx @@ -374,7 +392,7 @@ async fn service_with_consecutive_failure_accrual() { ) .await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -418,7 +436,7 @@ async fn service_with_consecutive_failure_accrual_defaults() { ) .await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -456,7 +474,7 @@ async fn service_with_consecutive_failure_accrual_defaults() { ) .await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -494,7 +512,7 @@ async fn service_with_consecutive_failure_accrual_defaults() { ) .await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -528,7 +546,7 @@ async fn service_with_default_failure_accrual() { // Default config for Service, no failure accrual let svc = create_service(&client, &ns, "default-failure-accrual", 80).await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -558,7 +576,7 @@ async fn service_with_default_failure_accrual() { ) .await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -583,7 +601,7 @@ async fn opaque_service() { // Create a service let svc = create_opaque_service(&client, &ns, "my-svc", 4191).await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -606,7 +624,7 @@ async fn route_with_filters() { // Create a service let svc = create_service(&client, &ns, "my-svc", 4191).await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -622,7 +640,7 @@ async fn route_with_filters() { let backend_name = "backend"; let backends = [backend_name]; - let route = mk_http_route(&ns, "foo-route", &svc, 4191) + let route = mk_http_route(&ns, "foo-route", &svc, Some(4191)) .with_backends(Some(&backends), None, None) .with_filters(Some(vec![ k8s_gateway_api::HttpRouteFilter::RequestHeaderModifier { @@ -716,7 +734,7 @@ async fn backend_with_filters() { // Create a service let svc = create_service(&client, &ns, "my-svc", 4191).await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -733,7 +751,7 @@ async fn backend_with_filters() { let backend_name = "backend"; let backend_svc = create_service(&client, &ns, backend_name, 8888).await; let backends = [backend_name]; - let route = mk_http_route(&ns, "foo-route", &svc, 4191) + let route = mk_http_route(&ns, "foo-route", &svc, Some(4191)) .with_backends(Some(&backends), None, Some(vec![ k8s_gateway_api::HttpRouteFilter::RequestHeaderModifier { request_header_modifier: k8s_gateway_api::HttpRequestHeaderFilter { @@ -824,6 +842,58 @@ async fn backend_with_filters() { .await; } +#[tokio::test(flavor = "current_thread")] +async fn http_route_with_no_port() { + with_temp_ns(|client, ns| async move { + // Create a service + let svc = create_service(&client, &ns, "my-svc", 4191).await; + + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; + let config = rx + .next() + .await + .expect("watch must not fail") + .expect("watch must return an initial config"); + tracing::trace!(?config); + + // There should be a default route. + detect_http_routes(&config, |routes| { + let route = assert_singleton(routes); + assert_route_is_default(route, &svc, 4191); + }); + + let _route = create(&client, mk_http_route(&ns, "foo-route", &svc, None).build()).await; + + let config = rx + .next() + .await + .expect("watch must not fail") + .expect("watch must return an updated config"); + tracing::trace!(?config); + + // The route should apply to the service. + detect_http_routes(&config, |routes| { + let route = assert_singleton(routes); + assert_route_name_eq(route, "foo-route"); + }); + + // The route should apply to other ports too. + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 9999).await; + let config = rx + .next() + .await + .expect("watch must not fail") + .expect("watch must return an initial config"); + tracing::trace!(?config); + + detect_http_routes(&config, |routes| { + let route = assert_singleton(routes); + assert_route_name_eq(route, "foo-route"); + }); + }) + .await; +} + /* Helpers */ struct HttpRouteBuilder(k8s_gateway_api::HttpRoute); @@ -832,12 +902,13 @@ async fn retry_watch_outbound_policy( client: &kube::Client, ns: &str, svc: &k8s::Service, + port: u16, ) -> tonic::Streaming { // Port-forward to the control plane and start watching the service's // outbound policy. let mut policy_api = grpc::OutboundPolicyClient::port_forwarded(client).await; loop { - match policy_api.watch(ns, svc, 4191).await { + match policy_api.watch(ns, svc, port).await { Ok(rx) => return rx, Err(error) => { tracing::error!( @@ -852,7 +923,7 @@ async fn retry_watch_outbound_policy( } } -fn mk_http_route(ns: &str, name: &str, svc: &k8s::Service, port: u16) -> HttpRouteBuilder { +fn mk_http_route(ns: &str, name: &str, svc: &k8s::Service, port: Option) -> HttpRouteBuilder { use k8s_gateway_api as api; HttpRouteBuilder(api::HttpRoute { @@ -869,7 +940,7 @@ fn mk_http_route(ns: &str, name: &str, svc: &k8s::Service, port: u16) -> HttpRou namespace: svc.namespace(), name: svc.name_unchecked(), section_name: None, - port: Some(port), + port, }]), }, hostnames: None, @@ -1106,6 +1177,14 @@ fn assert_backend_has_failure_filter(backend: &grpc::outbound::http_route::Weigh #[track_caller] fn assert_route_is_default(route: &grpc::outbound::HttpRoute, svc: &k8s::Service, port: u16) { + let kind = route.metadata.as_ref().unwrap().kind.as_ref().unwrap(); + match kind { + grpc::meta::metadata::Kind::Default(_) => {} + grpc::meta::metadata::Kind::Resource(r) => { + panic!("route expected to be default but got resource {r:?}") + } + } + let backends = route_backends_first_available(route); let backend = assert_singleton(backends); assert_backend_matches_service(backend, svc, port); @@ -1154,3 +1233,14 @@ fn assert_singleton(ts: &[T]) -> &T { assert_eq!(ts.len(), 1); ts.get(0).unwrap() } + +#[track_caller] +fn assert_route_name_eq(route: &grpc::outbound::HttpRoute, name: &str) { + let kind = route.metadata.as_ref().unwrap().kind.as_ref().unwrap(); + match kind { + grpc::meta::metadata::Kind::Default(d) => { + panic!("route expected to not be default, but got default {d:?}") + } + grpc::meta::metadata::Kind::Resource(resource) => assert_eq!(resource.name, *name), + } +} diff --git a/policy-test/tests/outbound_api_linkerd.rs b/policy-test/tests/outbound_api_linkerd.rs index fa3301752..b5c203f84 100644 --- a/policy-test/tests/outbound_api_linkerd.rs +++ b/policy-test/tests/outbound_api_linkerd.rs @@ -38,7 +38,7 @@ async fn service_with_no_http_routes() { // Create a service let svc = create_service(&client, &ns, "my-svc", 4191).await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -61,7 +61,7 @@ async fn service_with_http_route_without_rules() { // Create a service let svc = create_service(&client, &ns, "my-svc", 4191).await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -99,7 +99,7 @@ async fn service_with_http_routes_without_backends() { // Create a service let svc = create_service(&client, &ns, "my-svc", 4191).await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -113,7 +113,11 @@ async fn service_with_http_routes_without_backends() { assert_route_is_default(route, &svc, 4191); }); - let _route = create(&client, mk_http_route(&ns, "foo-route", &svc, 4191).build()).await; + let _route = create( + &client, + mk_http_route(&ns, "foo-route", &svc, Some(4191)).build(), + ) + .await; let config = rx .next() @@ -139,7 +143,7 @@ async fn service_with_http_routes_with_backend() { // Create a service let svc = create_service(&client, &ns, "my-svc", 4191).await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -156,8 +160,11 @@ async fn service_with_http_routes_with_backend() { let backend_name = "backend"; let backend_svc = create_service(&client, &ns, backend_name, 8888).await; let backends = [backend_name]; - let route = - mk_http_route(&ns, "foo-route", &svc, 4191).with_backends(Some(&backends), None, None); + let route = mk_http_route(&ns, "foo-route", &svc, Some(4191)).with_backends( + Some(&backends), + None, + None, + ); let _route = create(&client, route.build()).await; let config = rx @@ -186,7 +193,7 @@ async fn service_with_http_routes_with_cross_namespace_backend() { // Create a service let svc = create_service(&client, &ns, "my-svc", 4191).await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -218,7 +225,7 @@ async fn service_with_http_routes_with_cross_namespace_backend() { let backend_name = "backend"; let backend_svc = create_service(&client, &backend_ns_name, backend_name, 8888).await; let backends = [backend_name]; - let route = mk_http_route(&ns, "foo-route", &svc, 4191).with_backends( + let route = mk_http_route(&ns, "foo-route", &svc, Some(4191)).with_backends( Some(&backends), Some(backend_ns_name), None, @@ -254,7 +261,7 @@ async fn service_with_http_routes_with_invalid_backend() { // Create a service let svc = create_service(&client, &ns, "my-svc", 4191).await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -269,8 +276,11 @@ async fn service_with_http_routes_with_invalid_backend() { }); let backends = ["invalid-backend"]; - let route = - mk_http_route(&ns, "foo-route", &svc, 4191).with_backends(Some(&backends), None, None); + let route = mk_http_route(&ns, "foo-route", &svc, Some(4191)).with_backends( + Some(&backends), + None, + None, + ); let _route = create(&client, route.build()).await; let config = rx @@ -299,7 +309,7 @@ async fn service_with_multiple_http_routes() { // Create a service let svc = create_service(&client, &ns, "my-svc", 4191).await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -316,7 +326,11 @@ async fn service_with_multiple_http_routes() { // Routes should be returned in sorted order by creation timestamp then // name. To ensure that this test isn't timing dependant, routes should // be created in alphabetical order. - let _a_route = create(&client, mk_http_route(&ns, "a-route", &svc, 4191).build()).await; + let _a_route = create( + &client, + mk_http_route(&ns, "a-route", &svc, Some(4191)).build(), + ) + .await; // First route update. let config = rx @@ -326,7 +340,11 @@ async fn service_with_multiple_http_routes() { .expect("watch must return an updated config"); tracing::trace!(?config); - let _b_route = create(&client, mk_http_route(&ns, "b-route", &svc, 4191).build()).await; + let _b_route = create( + &client, + mk_http_route(&ns, "b-route", &svc, Some(4191)).build(), + ) + .await; // Second route update. let config = rx @@ -379,7 +397,7 @@ async fn service_with_consecutive_failure_accrual() { ) .await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -423,7 +441,7 @@ async fn service_with_consecutive_failure_accrual_defaults() { ) .await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -461,7 +479,7 @@ async fn service_with_consecutive_failure_accrual_defaults() { ) .await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -499,7 +517,7 @@ async fn service_with_consecutive_failure_accrual_defaults() { ) .await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -533,7 +551,7 @@ async fn service_with_default_failure_accrual() { // Default config for Service, no failure accrual let svc = create_service(&client, &ns, "default-failure-accrual", 80).await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -563,7 +581,7 @@ async fn service_with_default_failure_accrual() { ) .await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -588,7 +606,7 @@ async fn opaque_service() { // Create a service let svc = create_opaque_service(&client, &ns, "my-svc", 4191).await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -611,7 +629,7 @@ async fn route_rule_with_filters() { // Create a service let svc = create_service(&client, &ns, "my-svc", 4191).await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -631,7 +649,8 @@ async fn route_rule_with_filters() { &ns, "foo-route", &svc, - 4191).with_backends(Some(&backends), None, None).with_filters( Some(vec![ + Some(4191), + ).with_backends(Some(&backends), None, None).with_filters(Some(vec![ k8s::policy::httproute::HttpRouteFilter::RequestHeaderModifier { request_header_modifier: k8s_gateway_api::HttpRequestHeaderFilter { set: Some(vec![k8s_gateway_api::HttpHeader { @@ -727,7 +746,7 @@ async fn backend_with_filters() { // Create a service let svc = create_service(&client, &ns, "my-svc", 4191).await; - let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await; + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; let config = rx .next() .await @@ -744,11 +763,12 @@ async fn backend_with_filters() { let backend_name = "backend"; let backend_svc = create_service(&client, &ns, backend_name, 8888).await; let backends = [backend_name]; - let route = mk_http_route( + let route = mk_http_route( &ns, "foo-route", &svc, - 4191).with_backends(Some(&backends), None, Some(vec![ + Some(4191) + ).with_backends(Some(&backends), None, Some(vec![ k8s_gateway_api::HttpRouteFilter::RequestHeaderModifier { request_header_modifier: k8s_gateway_api::HttpRequestHeaderFilter { set: Some(vec![k8s_gateway_api::HttpHeader { @@ -774,10 +794,7 @@ async fn backend_with_filters() { }, }, ])); - let _route = create( - &client, - route.build(), - ) + let _route = create(&client, route.build()) .await; let config = rx @@ -842,6 +859,58 @@ async fn backend_with_filters() { .await; } +#[tokio::test(flavor = "current_thread")] +async fn http_route_with_no_port() { + with_temp_ns(|client, ns| async move { + // Create a service + let svc = create_service(&client, &ns, "my-svc", 4191).await; + + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await; + let config = rx + .next() + .await + .expect("watch must not fail") + .expect("watch must return an initial config"); + tracing::trace!(?config); + + // There should be a default route. + detect_http_routes(&config, |routes| { + let route = assert_singleton(routes); + assert_route_is_default(route, &svc, 4191); + }); + + let _route = create(&client, mk_http_route(&ns, "foo-route", &svc, None).build()).await; + + let config = rx + .next() + .await + .expect("watch must not fail") + .expect("watch must return an updated config"); + tracing::trace!(?config); + + // The route should apply to the service. + detect_http_routes(&config, |routes| { + let route = assert_singleton(routes); + assert_route_name_eq(route, "foo-route"); + }); + + // The route should apply to other ports too. + let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 9999).await; + let config = rx + .next() + .await + .expect("watch must not fail") + .expect("watch must return an initial config"); + tracing::trace!(?config); + + detect_http_routes(&config, |routes| { + let route = assert_singleton(routes); + assert_route_name_eq(route, "foo-route"); + }); + }) + .await; +} + /* Helpers */ struct HttpRouteBuilder(k8s::policy::HttpRoute); @@ -850,12 +919,13 @@ async fn retry_watch_outbound_policy( client: &kube::Client, ns: &str, svc: &k8s::Service, + port: u16, ) -> tonic::Streaming { // Port-forward to the control plane and start watching the service's // outbound policy. let mut policy_api = grpc::OutboundPolicyClient::port_forwarded(client).await; loop { - match policy_api.watch(ns, svc, 4191).await { + match policy_api.watch(ns, svc, port).await { Ok(rx) => return rx, Err(error) => { tracing::error!( @@ -870,7 +940,7 @@ async fn retry_watch_outbound_policy( } } -fn mk_http_route(ns: &str, name: &str, svc: &k8s::Service, port: u16) -> HttpRouteBuilder { +fn mk_http_route(ns: &str, name: &str, svc: &k8s::Service, port: Option) -> HttpRouteBuilder { use k8s::policy::httproute as api; HttpRouteBuilder(api::HttpRoute { @@ -887,7 +957,7 @@ fn mk_http_route(ns: &str, name: &str, svc: &k8s::Service, port: u16) -> HttpRou namespace: svc.namespace(), name: svc.name_unchecked(), section_name: None, - port: Some(port), + port, }]), }, hostnames: None, @@ -1125,6 +1195,14 @@ fn assert_backend_has_failure_filter(backend: &grpc::outbound::http_route::Weigh #[track_caller] fn assert_route_is_default(route: &grpc::outbound::HttpRoute, svc: &k8s::Service, port: u16) { + let kind = route.metadata.as_ref().unwrap().kind.as_ref().unwrap(); + match kind { + grpc::meta::metadata::Kind::Default(_) => {} + grpc::meta::metadata::Kind::Resource(r) => { + panic!("route expected to be default but got resource {r:?}") + } + } + let backends = route_backends_first_available(route); let backend = assert_singleton(backends); assert_backend_matches_service(backend, svc, port); @@ -1173,3 +1251,14 @@ fn assert_singleton(ts: &[T]) -> &T { assert_eq!(ts.len(), 1); ts.get(0).unwrap() } + +#[track_caller] +fn assert_route_name_eq(route: &grpc::outbound::HttpRoute, name: &str) { + let kind = route.metadata.as_ref().unwrap().kind.as_ref().unwrap(); + match kind { + grpc::meta::metadata::Kind::Default(d) => { + panic!("route expected to not be default, but got default {d:?}") + } + grpc::meta::metadata::Kind::Resource(resource) => assert_eq!(resource.name, *name), + } +}