mirror of https://github.com/linkerd/linkerd2.git
Allow port in HttpRoute parent_ref to be optional (#11107)
According to the [xRoutes Mesh Binding KEP](https://gateway-api.sigs.k8s.io/geps/gep-1426/#ports), the port in a parent reference is optional: > By default, a Service attachment applies to all ports in the service. Users may want to attach routes to only a specific port in a Service. To do so, the parentRef.port field should be used. > If port is set, the implementation MUST associate the route only with that port. If port is not set, the implementation MUST associate the route with all ports defined in the Service. However, we currently ignore any HttpRoutes which don't have a port specified in the parent ref. We update the policy controller to apply HttpRoutes which do not specify a port in the parent ref to all ports of the parent service. We do this by storing these "portless" HttpRoutes in the index and then copying these routes into every port-specific watch for that service. Signed-off-by: Alex Leong <alex@buoyant.io>
This commit is contained in:
parent
7a477f7adc
commit
b981f52bcb
|
@ -42,7 +42,13 @@ struct NamespaceIndex {
|
|||
|
||||
#[derive(Debug)]
|
||||
struct Namespace {
|
||||
service_routes: HashMap<ServicePort, ServiceRoutes>,
|
||||
/// Stores an observable handle for each known service:port,
|
||||
/// as well as any route resources in the cluster that specify
|
||||
/// a port.
|
||||
service_port_routes: HashMap<ServicePort, ServiceRoutes>,
|
||||
/// Stores the route resources (by service name) that do not
|
||||
/// explicitly target a port.
|
||||
service_routes: HashMap<String, HashMap<GroupKindName, HttpRoute>>,
|
||||
namespace: Arc<String>,
|
||||
}
|
||||
|
||||
|
@ -133,6 +139,7 @@ impl kubert::index::IndexNamespacedResource<Service> for Index {
|
|||
.entry(ns.clone())
|
||||
.or_insert_with(|| Namespace {
|
||||
service_routes: Default::default(),
|
||||
service_port_routes: Default::default(),
|
||||
namespace: Arc::new(ns),
|
||||
})
|
||||
.update_service(service.name_unchecked(), &service_info);
|
||||
|
@ -177,6 +184,7 @@ impl Index {
|
|||
.entry(namespace.clone())
|
||||
.or_insert_with(|| Namespace {
|
||||
service_routes: Default::default(),
|
||||
service_port_routes: Default::default(),
|
||||
namespace: Arc::new(namespace.to_string()),
|
||||
});
|
||||
let key = ServicePort { service, port };
|
||||
|
@ -198,6 +206,7 @@ impl Index {
|
|||
.entry(ns.clone())
|
||||
.or_insert_with(|| Namespace {
|
||||
service_routes: Default::default(),
|
||||
service_port_routes: Default::default(),
|
||||
namespace: Arc::new(ns),
|
||||
})
|
||||
.apply(route, &self.namespaces.cluster_info, &self.service_info);
|
||||
|
@ -229,32 +238,43 @@ impl Namespace {
|
|||
continue;
|
||||
}
|
||||
|
||||
if let Some(port) = parent_ref.port {
|
||||
if let Some(port) = NonZeroU16::new(port) {
|
||||
let service_port = ServicePort {
|
||||
port,
|
||||
service: parent_ref.name.clone(),
|
||||
};
|
||||
tracing::debug!(
|
||||
?service_port,
|
||||
route = route.name(),
|
||||
"inserting route for service"
|
||||
);
|
||||
let service_routes =
|
||||
self.service_routes_or_default(service_port, cluster_info, service_info);
|
||||
service_routes.apply(route.gkn(), outbound_route.clone());
|
||||
} else {
|
||||
tracing::warn!(?parent_ref, "ignoring parent_ref with port 0");
|
||||
}
|
||||
let port = parent_ref.port.and_then(NonZeroU16::new);
|
||||
if let Some(port) = port {
|
||||
let service_port = ServicePort {
|
||||
port,
|
||||
service: parent_ref.name.clone(),
|
||||
};
|
||||
tracing::debug!(
|
||||
?service_port,
|
||||
route = route.name(),
|
||||
"inserting route for service"
|
||||
);
|
||||
let service_routes =
|
||||
self.service_routes_or_default(service_port, cluster_info, service_info);
|
||||
service_routes.apply(route.gkn(), outbound_route.clone());
|
||||
} else {
|
||||
tracing::warn!(?parent_ref, "ignoring parent_ref without port");
|
||||
// If the parent_ref doesn't include a port, apply this route
|
||||
// to all ServiceRoutes which match the Service name.
|
||||
self.service_port_routes.iter_mut().for_each(
|
||||
|(ServicePort { service, port: _ }, routes)| {
|
||||
if service == &parent_ref.name {
|
||||
routes.apply(route.gkn(), outbound_route.clone());
|
||||
}
|
||||
},
|
||||
);
|
||||
// Also add the route to the list of routes that target the
|
||||
// Service without specifying a port.
|
||||
self.service_routes
|
||||
.entry(parent_ref.name.clone())
|
||||
.or_default()
|
||||
.insert(route.gkn(), outbound_route.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn update_service(&mut self, name: String, service: &ServiceInfo) {
|
||||
tracing::debug!(?name, ?service, "updating service");
|
||||
for (svc_port, svc_routes) in self.service_routes.iter_mut() {
|
||||
for (svc_port, svc_routes) in self.service_port_routes.iter_mut() {
|
||||
if svc_port.service != name {
|
||||
continue;
|
||||
}
|
||||
|
@ -265,9 +285,12 @@ impl Namespace {
|
|||
}
|
||||
|
||||
fn delete(&mut self, gkn: GroupKindName) {
|
||||
for service in self.service_routes.values_mut() {
|
||||
for service in self.service_port_routes.values_mut() {
|
||||
service.delete(&gkn);
|
||||
}
|
||||
for routes in self.service_routes.values_mut() {
|
||||
routes.remove(&gkn);
|
||||
}
|
||||
}
|
||||
|
||||
fn service_routes_or_default(
|
||||
|
@ -276,33 +299,44 @@ impl Namespace {
|
|||
cluster: &ClusterInfo,
|
||||
service_info: &HashMap<ServiceRef, ServiceInfo>,
|
||||
) -> &mut ServiceRoutes {
|
||||
self.service_routes.entry(sp.clone()).or_insert_with(|| {
|
||||
let authority = cluster.service_dns_authority(&self.namespace, &sp.service, sp.port);
|
||||
let service_ref = ServiceRef {
|
||||
name: sp.service.clone(),
|
||||
namespace: self.namespace.to_string(),
|
||||
};
|
||||
let (opaque, accrual) = match service_info.get(&service_ref) {
|
||||
Some(svc) => (svc.opaque_ports.contains(&sp.port), svc.accrual),
|
||||
None => (false, None),
|
||||
};
|
||||
self.service_port_routes
|
||||
.entry(sp.clone())
|
||||
.or_insert_with(|| {
|
||||
let authority =
|
||||
cluster.service_dns_authority(&self.namespace, &sp.service, sp.port);
|
||||
let service_ref = ServiceRef {
|
||||
name: sp.service.clone(),
|
||||
namespace: self.namespace.to_string(),
|
||||
};
|
||||
let (opaque, accrual) = match service_info.get(&service_ref) {
|
||||
Some(svc) => (svc.opaque_ports.contains(&sp.port), svc.accrual),
|
||||
None => (false, None),
|
||||
};
|
||||
|
||||
let (sender, _) = watch::channel(OutboundPolicy {
|
||||
http_routes: Default::default(),
|
||||
authority,
|
||||
name: sp.service.clone(),
|
||||
namespace: self.namespace.to_string(),
|
||||
port: sp.port,
|
||||
opaque,
|
||||
accrual,
|
||||
});
|
||||
ServiceRoutes {
|
||||
routes: Default::default(),
|
||||
watch: sender,
|
||||
opaque,
|
||||
accrual,
|
||||
}
|
||||
})
|
||||
// The HttpRoutes which target this Service but don't specify
|
||||
// a port apply to all ports. Therefore we include them.
|
||||
let routes = self
|
||||
.service_routes
|
||||
.get(&sp.service)
|
||||
.cloned()
|
||||
.unwrap_or_default();
|
||||
|
||||
let (sender, _) = watch::channel(OutboundPolicy {
|
||||
http_routes: routes.clone(),
|
||||
authority,
|
||||
name: sp.service.clone(),
|
||||
namespace: self.namespace.to_string(),
|
||||
port: sp.port,
|
||||
opaque,
|
||||
accrual,
|
||||
});
|
||||
ServiceRoutes {
|
||||
routes,
|
||||
watch: sender,
|
||||
opaque,
|
||||
accrual,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn convert_route(
|
||||
|
|
|
@ -33,7 +33,7 @@ async fn service_with_no_http_routes() {
|
|||
// Create a service
|
||||
let svc = create_service(&client, &ns, "my-svc", 4191).await;
|
||||
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await;
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
|
||||
let config = rx
|
||||
.next()
|
||||
.await
|
||||
|
@ -56,7 +56,7 @@ async fn service_with_http_route_without_rules() {
|
|||
// Create a service
|
||||
let svc = create_service(&client, &ns, "my-svc", 4191).await;
|
||||
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await;
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
|
||||
let config = rx
|
||||
.next()
|
||||
.await
|
||||
|
@ -94,7 +94,7 @@ async fn service_with_http_routes_without_backends() {
|
|||
// Create a service
|
||||
let svc = create_service(&client, &ns, "my-svc", 4191).await;
|
||||
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await;
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
|
||||
let config = rx
|
||||
.next()
|
||||
.await
|
||||
|
@ -108,7 +108,11 @@ async fn service_with_http_routes_without_backends() {
|
|||
assert_route_is_default(route, &svc, 4191);
|
||||
});
|
||||
|
||||
let _route = create(&client, mk_http_route(&ns, "foo-route", &svc, 4191).build()).await;
|
||||
let _route = create(
|
||||
&client,
|
||||
mk_http_route(&ns, "foo-route", &svc, Some(4191)).build(),
|
||||
)
|
||||
.await;
|
||||
|
||||
let config = rx
|
||||
.next()
|
||||
|
@ -134,7 +138,7 @@ async fn service_with_http_routes_with_backend() {
|
|||
// Create a service
|
||||
let svc = create_service(&client, &ns, "my-svc", 4191).await;
|
||||
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await;
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
|
||||
let config = rx
|
||||
.next()
|
||||
.await
|
||||
|
@ -151,8 +155,11 @@ async fn service_with_http_routes_with_backend() {
|
|||
let backend_name = "backend";
|
||||
let backend_svc = create_service(&client, &ns, backend_name, 8888).await;
|
||||
let backends = [backend_name];
|
||||
let route =
|
||||
mk_http_route(&ns, "foo-route", &svc, 4191).with_backends(Some(&backends), None, None);
|
||||
let route = mk_http_route(&ns, "foo-route", &svc, Some(4191)).with_backends(
|
||||
Some(&backends),
|
||||
None,
|
||||
None,
|
||||
);
|
||||
let _route = create(&client, route.build()).await;
|
||||
|
||||
let config = rx
|
||||
|
@ -181,7 +188,7 @@ async fn service_with_http_routes_with_cross_namespace_backend() {
|
|||
// Create a service
|
||||
let svc = create_service(&client, &ns, "my-svc", 4191).await;
|
||||
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await;
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
|
||||
let config = rx
|
||||
.next()
|
||||
.await
|
||||
|
@ -213,7 +220,7 @@ async fn service_with_http_routes_with_cross_namespace_backend() {
|
|||
let backend_name = "backend";
|
||||
let backend_svc = create_service(&client, &backend_ns_name, backend_name, 8888).await;
|
||||
let backends = [backend_name];
|
||||
let route = mk_http_route(&ns, "foo-route", &svc, 4191).with_backends(
|
||||
let route = mk_http_route(&ns, "foo-route", &svc, Some(4191)).with_backends(
|
||||
Some(&backends),
|
||||
Some(backend_ns_name),
|
||||
None,
|
||||
|
@ -249,7 +256,7 @@ async fn service_with_http_routes_with_invalid_backend() {
|
|||
// Create a service
|
||||
let svc = create_service(&client, &ns, "my-svc", 4191).await;
|
||||
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await;
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
|
||||
let config = rx
|
||||
.next()
|
||||
.await
|
||||
|
@ -264,8 +271,11 @@ async fn service_with_http_routes_with_invalid_backend() {
|
|||
});
|
||||
|
||||
let backends = ["invalid-backend"];
|
||||
let route =
|
||||
mk_http_route(&ns, "foo-route", &svc, 4191).with_backends(Some(&backends), None, None);
|
||||
let route = mk_http_route(&ns, "foo-route", &svc, Some(4191)).with_backends(
|
||||
Some(&backends),
|
||||
None,
|
||||
None,
|
||||
);
|
||||
let _route = create(&client, route.build()).await;
|
||||
|
||||
let config = rx
|
||||
|
@ -294,7 +304,7 @@ async fn service_with_multiple_http_routes() {
|
|||
// Create a service
|
||||
let svc = create_service(&client, &ns, "my-svc", 4191).await;
|
||||
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await;
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
|
||||
let config = rx
|
||||
.next()
|
||||
.await
|
||||
|
@ -311,7 +321,11 @@ async fn service_with_multiple_http_routes() {
|
|||
// Routes should be returned in sorted order by creation timestamp then
|
||||
// name. To ensure that this test isn't timing dependant, routes should
|
||||
// be created in alphabetical order.
|
||||
let _a_route = create(&client, mk_http_route(&ns, "a-route", &svc, 4191).build()).await;
|
||||
let _a_route = create(
|
||||
&client,
|
||||
mk_http_route(&ns, "a-route", &svc, Some(4191)).build(),
|
||||
)
|
||||
.await;
|
||||
|
||||
// First route update.
|
||||
let config = rx
|
||||
|
@ -321,7 +335,11 @@ async fn service_with_multiple_http_routes() {
|
|||
.expect("watch must return an updated config");
|
||||
tracing::trace!(?config);
|
||||
|
||||
let _b_route = create(&client, mk_http_route(&ns, "b-route", &svc, 4191).build()).await;
|
||||
let _b_route = create(
|
||||
&client,
|
||||
mk_http_route(&ns, "b-route", &svc, Some(4191)).build(),
|
||||
)
|
||||
.await;
|
||||
|
||||
// Second route update.
|
||||
let config = rx
|
||||
|
@ -374,7 +392,7 @@ async fn service_with_consecutive_failure_accrual() {
|
|||
)
|
||||
.await;
|
||||
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await;
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
|
||||
let config = rx
|
||||
.next()
|
||||
.await
|
||||
|
@ -418,7 +436,7 @@ async fn service_with_consecutive_failure_accrual_defaults() {
|
|||
)
|
||||
.await;
|
||||
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await;
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
|
||||
let config = rx
|
||||
.next()
|
||||
.await
|
||||
|
@ -456,7 +474,7 @@ async fn service_with_consecutive_failure_accrual_defaults() {
|
|||
)
|
||||
.await;
|
||||
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await;
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
|
||||
let config = rx
|
||||
.next()
|
||||
.await
|
||||
|
@ -494,7 +512,7 @@ async fn service_with_consecutive_failure_accrual_defaults() {
|
|||
)
|
||||
.await;
|
||||
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await;
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
|
||||
let config = rx
|
||||
.next()
|
||||
.await
|
||||
|
@ -528,7 +546,7 @@ async fn service_with_default_failure_accrual() {
|
|||
// Default config for Service, no failure accrual
|
||||
let svc = create_service(&client, &ns, "default-failure-accrual", 80).await;
|
||||
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await;
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
|
||||
let config = rx
|
||||
.next()
|
||||
.await
|
||||
|
@ -558,7 +576,7 @@ async fn service_with_default_failure_accrual() {
|
|||
)
|
||||
.await;
|
||||
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await;
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
|
||||
let config = rx
|
||||
.next()
|
||||
.await
|
||||
|
@ -583,7 +601,7 @@ async fn opaque_service() {
|
|||
// Create a service
|
||||
let svc = create_opaque_service(&client, &ns, "my-svc", 4191).await;
|
||||
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await;
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
|
||||
let config = rx
|
||||
.next()
|
||||
.await
|
||||
|
@ -606,7 +624,7 @@ async fn route_with_filters() {
|
|||
// Create a service
|
||||
let svc = create_service(&client, &ns, "my-svc", 4191).await;
|
||||
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await;
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
|
||||
let config = rx
|
||||
.next()
|
||||
.await
|
||||
|
@ -622,7 +640,7 @@ async fn route_with_filters() {
|
|||
|
||||
let backend_name = "backend";
|
||||
let backends = [backend_name];
|
||||
let route = mk_http_route(&ns, "foo-route", &svc, 4191)
|
||||
let route = mk_http_route(&ns, "foo-route", &svc, Some(4191))
|
||||
.with_backends(Some(&backends), None, None)
|
||||
.with_filters(Some(vec![
|
||||
k8s_gateway_api::HttpRouteFilter::RequestHeaderModifier {
|
||||
|
@ -716,7 +734,7 @@ async fn backend_with_filters() {
|
|||
// Create a service
|
||||
let svc = create_service(&client, &ns, "my-svc", 4191).await;
|
||||
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await;
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
|
||||
let config = rx
|
||||
.next()
|
||||
.await
|
||||
|
@ -733,7 +751,7 @@ async fn backend_with_filters() {
|
|||
let backend_name = "backend";
|
||||
let backend_svc = create_service(&client, &ns, backend_name, 8888).await;
|
||||
let backends = [backend_name];
|
||||
let route = mk_http_route(&ns, "foo-route", &svc, 4191)
|
||||
let route = mk_http_route(&ns, "foo-route", &svc, Some(4191))
|
||||
.with_backends(Some(&backends), None, Some(vec![
|
||||
k8s_gateway_api::HttpRouteFilter::RequestHeaderModifier {
|
||||
request_header_modifier: k8s_gateway_api::HttpRequestHeaderFilter {
|
||||
|
@ -824,6 +842,58 @@ async fn backend_with_filters() {
|
|||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "current_thread")]
|
||||
async fn http_route_with_no_port() {
|
||||
with_temp_ns(|client, ns| async move {
|
||||
// Create a service
|
||||
let svc = create_service(&client, &ns, "my-svc", 4191).await;
|
||||
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
|
||||
let config = rx
|
||||
.next()
|
||||
.await
|
||||
.expect("watch must not fail")
|
||||
.expect("watch must return an initial config");
|
||||
tracing::trace!(?config);
|
||||
|
||||
// There should be a default route.
|
||||
detect_http_routes(&config, |routes| {
|
||||
let route = assert_singleton(routes);
|
||||
assert_route_is_default(route, &svc, 4191);
|
||||
});
|
||||
|
||||
let _route = create(&client, mk_http_route(&ns, "foo-route", &svc, None).build()).await;
|
||||
|
||||
let config = rx
|
||||
.next()
|
||||
.await
|
||||
.expect("watch must not fail")
|
||||
.expect("watch must return an updated config");
|
||||
tracing::trace!(?config);
|
||||
|
||||
// The route should apply to the service.
|
||||
detect_http_routes(&config, |routes| {
|
||||
let route = assert_singleton(routes);
|
||||
assert_route_name_eq(route, "foo-route");
|
||||
});
|
||||
|
||||
// The route should apply to other ports too.
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 9999).await;
|
||||
let config = rx
|
||||
.next()
|
||||
.await
|
||||
.expect("watch must not fail")
|
||||
.expect("watch must return an initial config");
|
||||
tracing::trace!(?config);
|
||||
|
||||
detect_http_routes(&config, |routes| {
|
||||
let route = assert_singleton(routes);
|
||||
assert_route_name_eq(route, "foo-route");
|
||||
});
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
/* Helpers */
|
||||
|
||||
struct HttpRouteBuilder(k8s_gateway_api::HttpRoute);
|
||||
|
@ -832,12 +902,13 @@ async fn retry_watch_outbound_policy(
|
|||
client: &kube::Client,
|
||||
ns: &str,
|
||||
svc: &k8s::Service,
|
||||
port: u16,
|
||||
) -> tonic::Streaming<grpc::outbound::OutboundPolicy> {
|
||||
// Port-forward to the control plane and start watching the service's
|
||||
// outbound policy.
|
||||
let mut policy_api = grpc::OutboundPolicyClient::port_forwarded(client).await;
|
||||
loop {
|
||||
match policy_api.watch(ns, svc, 4191).await {
|
||||
match policy_api.watch(ns, svc, port).await {
|
||||
Ok(rx) => return rx,
|
||||
Err(error) => {
|
||||
tracing::error!(
|
||||
|
@ -852,7 +923,7 @@ async fn retry_watch_outbound_policy(
|
|||
}
|
||||
}
|
||||
|
||||
fn mk_http_route(ns: &str, name: &str, svc: &k8s::Service, port: u16) -> HttpRouteBuilder {
|
||||
fn mk_http_route(ns: &str, name: &str, svc: &k8s::Service, port: Option<u16>) -> HttpRouteBuilder {
|
||||
use k8s_gateway_api as api;
|
||||
|
||||
HttpRouteBuilder(api::HttpRoute {
|
||||
|
@ -869,7 +940,7 @@ fn mk_http_route(ns: &str, name: &str, svc: &k8s::Service, port: u16) -> HttpRou
|
|||
namespace: svc.namespace(),
|
||||
name: svc.name_unchecked(),
|
||||
section_name: None,
|
||||
port: Some(port),
|
||||
port,
|
||||
}]),
|
||||
},
|
||||
hostnames: None,
|
||||
|
@ -1106,6 +1177,14 @@ fn assert_backend_has_failure_filter(backend: &grpc::outbound::http_route::Weigh
|
|||
|
||||
#[track_caller]
|
||||
fn assert_route_is_default(route: &grpc::outbound::HttpRoute, svc: &k8s::Service, port: u16) {
|
||||
let kind = route.metadata.as_ref().unwrap().kind.as_ref().unwrap();
|
||||
match kind {
|
||||
grpc::meta::metadata::Kind::Default(_) => {}
|
||||
grpc::meta::metadata::Kind::Resource(r) => {
|
||||
panic!("route expected to be default but got resource {r:?}")
|
||||
}
|
||||
}
|
||||
|
||||
let backends = route_backends_first_available(route);
|
||||
let backend = assert_singleton(backends);
|
||||
assert_backend_matches_service(backend, svc, port);
|
||||
|
@ -1154,3 +1233,14 @@ fn assert_singleton<T>(ts: &[T]) -> &T {
|
|||
assert_eq!(ts.len(), 1);
|
||||
ts.get(0).unwrap()
|
||||
}
|
||||
|
||||
#[track_caller]
|
||||
fn assert_route_name_eq(route: &grpc::outbound::HttpRoute, name: &str) {
|
||||
let kind = route.metadata.as_ref().unwrap().kind.as_ref().unwrap();
|
||||
match kind {
|
||||
grpc::meta::metadata::Kind::Default(d) => {
|
||||
panic!("route expected to not be default, but got default {d:?}")
|
||||
}
|
||||
grpc::meta::metadata::Kind::Resource(resource) => assert_eq!(resource.name, *name),
|
||||
}
|
||||
}
|
||||
|
|
|
@ -38,7 +38,7 @@ async fn service_with_no_http_routes() {
|
|||
// Create a service
|
||||
let svc = create_service(&client, &ns, "my-svc", 4191).await;
|
||||
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await;
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
|
||||
let config = rx
|
||||
.next()
|
||||
.await
|
||||
|
@ -61,7 +61,7 @@ async fn service_with_http_route_without_rules() {
|
|||
// Create a service
|
||||
let svc = create_service(&client, &ns, "my-svc", 4191).await;
|
||||
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await;
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
|
||||
let config = rx
|
||||
.next()
|
||||
.await
|
||||
|
@ -99,7 +99,7 @@ async fn service_with_http_routes_without_backends() {
|
|||
// Create a service
|
||||
let svc = create_service(&client, &ns, "my-svc", 4191).await;
|
||||
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await;
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
|
||||
let config = rx
|
||||
.next()
|
||||
.await
|
||||
|
@ -113,7 +113,11 @@ async fn service_with_http_routes_without_backends() {
|
|||
assert_route_is_default(route, &svc, 4191);
|
||||
});
|
||||
|
||||
let _route = create(&client, mk_http_route(&ns, "foo-route", &svc, 4191).build()).await;
|
||||
let _route = create(
|
||||
&client,
|
||||
mk_http_route(&ns, "foo-route", &svc, Some(4191)).build(),
|
||||
)
|
||||
.await;
|
||||
|
||||
let config = rx
|
||||
.next()
|
||||
|
@ -139,7 +143,7 @@ async fn service_with_http_routes_with_backend() {
|
|||
// Create a service
|
||||
let svc = create_service(&client, &ns, "my-svc", 4191).await;
|
||||
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await;
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
|
||||
let config = rx
|
||||
.next()
|
||||
.await
|
||||
|
@ -156,8 +160,11 @@ async fn service_with_http_routes_with_backend() {
|
|||
let backend_name = "backend";
|
||||
let backend_svc = create_service(&client, &ns, backend_name, 8888).await;
|
||||
let backends = [backend_name];
|
||||
let route =
|
||||
mk_http_route(&ns, "foo-route", &svc, 4191).with_backends(Some(&backends), None, None);
|
||||
let route = mk_http_route(&ns, "foo-route", &svc, Some(4191)).with_backends(
|
||||
Some(&backends),
|
||||
None,
|
||||
None,
|
||||
);
|
||||
let _route = create(&client, route.build()).await;
|
||||
|
||||
let config = rx
|
||||
|
@ -186,7 +193,7 @@ async fn service_with_http_routes_with_cross_namespace_backend() {
|
|||
// Create a service
|
||||
let svc = create_service(&client, &ns, "my-svc", 4191).await;
|
||||
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await;
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
|
||||
let config = rx
|
||||
.next()
|
||||
.await
|
||||
|
@ -218,7 +225,7 @@ async fn service_with_http_routes_with_cross_namespace_backend() {
|
|||
let backend_name = "backend";
|
||||
let backend_svc = create_service(&client, &backend_ns_name, backend_name, 8888).await;
|
||||
let backends = [backend_name];
|
||||
let route = mk_http_route(&ns, "foo-route", &svc, 4191).with_backends(
|
||||
let route = mk_http_route(&ns, "foo-route", &svc, Some(4191)).with_backends(
|
||||
Some(&backends),
|
||||
Some(backend_ns_name),
|
||||
None,
|
||||
|
@ -254,7 +261,7 @@ async fn service_with_http_routes_with_invalid_backend() {
|
|||
// Create a service
|
||||
let svc = create_service(&client, &ns, "my-svc", 4191).await;
|
||||
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await;
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
|
||||
let config = rx
|
||||
.next()
|
||||
.await
|
||||
|
@ -269,8 +276,11 @@ async fn service_with_http_routes_with_invalid_backend() {
|
|||
});
|
||||
|
||||
let backends = ["invalid-backend"];
|
||||
let route =
|
||||
mk_http_route(&ns, "foo-route", &svc, 4191).with_backends(Some(&backends), None, None);
|
||||
let route = mk_http_route(&ns, "foo-route", &svc, Some(4191)).with_backends(
|
||||
Some(&backends),
|
||||
None,
|
||||
None,
|
||||
);
|
||||
let _route = create(&client, route.build()).await;
|
||||
|
||||
let config = rx
|
||||
|
@ -299,7 +309,7 @@ async fn service_with_multiple_http_routes() {
|
|||
// Create a service
|
||||
let svc = create_service(&client, &ns, "my-svc", 4191).await;
|
||||
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await;
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
|
||||
let config = rx
|
||||
.next()
|
||||
.await
|
||||
|
@ -316,7 +326,11 @@ async fn service_with_multiple_http_routes() {
|
|||
// Routes should be returned in sorted order by creation timestamp then
|
||||
// name. To ensure that this test isn't timing dependant, routes should
|
||||
// be created in alphabetical order.
|
||||
let _a_route = create(&client, mk_http_route(&ns, "a-route", &svc, 4191).build()).await;
|
||||
let _a_route = create(
|
||||
&client,
|
||||
mk_http_route(&ns, "a-route", &svc, Some(4191)).build(),
|
||||
)
|
||||
.await;
|
||||
|
||||
// First route update.
|
||||
let config = rx
|
||||
|
@ -326,7 +340,11 @@ async fn service_with_multiple_http_routes() {
|
|||
.expect("watch must return an updated config");
|
||||
tracing::trace!(?config);
|
||||
|
||||
let _b_route = create(&client, mk_http_route(&ns, "b-route", &svc, 4191).build()).await;
|
||||
let _b_route = create(
|
||||
&client,
|
||||
mk_http_route(&ns, "b-route", &svc, Some(4191)).build(),
|
||||
)
|
||||
.await;
|
||||
|
||||
// Second route update.
|
||||
let config = rx
|
||||
|
@ -379,7 +397,7 @@ async fn service_with_consecutive_failure_accrual() {
|
|||
)
|
||||
.await;
|
||||
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await;
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
|
||||
let config = rx
|
||||
.next()
|
||||
.await
|
||||
|
@ -423,7 +441,7 @@ async fn service_with_consecutive_failure_accrual_defaults() {
|
|||
)
|
||||
.await;
|
||||
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await;
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
|
||||
let config = rx
|
||||
.next()
|
||||
.await
|
||||
|
@ -461,7 +479,7 @@ async fn service_with_consecutive_failure_accrual_defaults() {
|
|||
)
|
||||
.await;
|
||||
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await;
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
|
||||
let config = rx
|
||||
.next()
|
||||
.await
|
||||
|
@ -499,7 +517,7 @@ async fn service_with_consecutive_failure_accrual_defaults() {
|
|||
)
|
||||
.await;
|
||||
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await;
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
|
||||
let config = rx
|
||||
.next()
|
||||
.await
|
||||
|
@ -533,7 +551,7 @@ async fn service_with_default_failure_accrual() {
|
|||
// Default config for Service, no failure accrual
|
||||
let svc = create_service(&client, &ns, "default-failure-accrual", 80).await;
|
||||
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await;
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
|
||||
let config = rx
|
||||
.next()
|
||||
.await
|
||||
|
@ -563,7 +581,7 @@ async fn service_with_default_failure_accrual() {
|
|||
)
|
||||
.await;
|
||||
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await;
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
|
||||
let config = rx
|
||||
.next()
|
||||
.await
|
||||
|
@ -588,7 +606,7 @@ async fn opaque_service() {
|
|||
// Create a service
|
||||
let svc = create_opaque_service(&client, &ns, "my-svc", 4191).await;
|
||||
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await;
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
|
||||
let config = rx
|
||||
.next()
|
||||
.await
|
||||
|
@ -611,7 +629,7 @@ async fn route_rule_with_filters() {
|
|||
// Create a service
|
||||
let svc = create_service(&client, &ns, "my-svc", 4191).await;
|
||||
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await;
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
|
||||
let config = rx
|
||||
.next()
|
||||
.await
|
||||
|
@ -631,7 +649,8 @@ async fn route_rule_with_filters() {
|
|||
&ns,
|
||||
"foo-route",
|
||||
&svc,
|
||||
4191).with_backends(Some(&backends), None, None).with_filters( Some(vec![
|
||||
Some(4191),
|
||||
).with_backends(Some(&backends), None, None).with_filters(Some(vec![
|
||||
k8s::policy::httproute::HttpRouteFilter::RequestHeaderModifier {
|
||||
request_header_modifier: k8s_gateway_api::HttpRequestHeaderFilter {
|
||||
set: Some(vec![k8s_gateway_api::HttpHeader {
|
||||
|
@ -727,7 +746,7 @@ async fn backend_with_filters() {
|
|||
// Create a service
|
||||
let svc = create_service(&client, &ns, "my-svc", 4191).await;
|
||||
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc).await;
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
|
||||
let config = rx
|
||||
.next()
|
||||
.await
|
||||
|
@ -744,11 +763,12 @@ async fn backend_with_filters() {
|
|||
let backend_name = "backend";
|
||||
let backend_svc = create_service(&client, &ns, backend_name, 8888).await;
|
||||
let backends = [backend_name];
|
||||
let route = mk_http_route(
|
||||
let route = mk_http_route(
|
||||
&ns,
|
||||
"foo-route",
|
||||
&svc,
|
||||
4191).with_backends(Some(&backends), None, Some(vec![
|
||||
Some(4191)
|
||||
).with_backends(Some(&backends), None, Some(vec![
|
||||
k8s_gateway_api::HttpRouteFilter::RequestHeaderModifier {
|
||||
request_header_modifier: k8s_gateway_api::HttpRequestHeaderFilter {
|
||||
set: Some(vec![k8s_gateway_api::HttpHeader {
|
||||
|
@ -774,10 +794,7 @@ async fn backend_with_filters() {
|
|||
},
|
||||
},
|
||||
]));
|
||||
let _route = create(
|
||||
&client,
|
||||
route.build(),
|
||||
)
|
||||
let _route = create(&client, route.build())
|
||||
.await;
|
||||
|
||||
let config = rx
|
||||
|
@ -842,6 +859,58 @@ async fn backend_with_filters() {
|
|||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "current_thread")]
|
||||
async fn http_route_with_no_port() {
|
||||
with_temp_ns(|client, ns| async move {
|
||||
// Create a service
|
||||
let svc = create_service(&client, &ns, "my-svc", 4191).await;
|
||||
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 4191).await;
|
||||
let config = rx
|
||||
.next()
|
||||
.await
|
||||
.expect("watch must not fail")
|
||||
.expect("watch must return an initial config");
|
||||
tracing::trace!(?config);
|
||||
|
||||
// There should be a default route.
|
||||
detect_http_routes(&config, |routes| {
|
||||
let route = assert_singleton(routes);
|
||||
assert_route_is_default(route, &svc, 4191);
|
||||
});
|
||||
|
||||
let _route = create(&client, mk_http_route(&ns, "foo-route", &svc, None).build()).await;
|
||||
|
||||
let config = rx
|
||||
.next()
|
||||
.await
|
||||
.expect("watch must not fail")
|
||||
.expect("watch must return an updated config");
|
||||
tracing::trace!(?config);
|
||||
|
||||
// The route should apply to the service.
|
||||
detect_http_routes(&config, |routes| {
|
||||
let route = assert_singleton(routes);
|
||||
assert_route_name_eq(route, "foo-route");
|
||||
});
|
||||
|
||||
// The route should apply to other ports too.
|
||||
let mut rx = retry_watch_outbound_policy(&client, &ns, &svc, 9999).await;
|
||||
let config = rx
|
||||
.next()
|
||||
.await
|
||||
.expect("watch must not fail")
|
||||
.expect("watch must return an initial config");
|
||||
tracing::trace!(?config);
|
||||
|
||||
detect_http_routes(&config, |routes| {
|
||||
let route = assert_singleton(routes);
|
||||
assert_route_name_eq(route, "foo-route");
|
||||
});
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
/* Helpers */
|
||||
|
||||
struct HttpRouteBuilder(k8s::policy::HttpRoute);
|
||||
|
@ -850,12 +919,13 @@ async fn retry_watch_outbound_policy(
|
|||
client: &kube::Client,
|
||||
ns: &str,
|
||||
svc: &k8s::Service,
|
||||
port: u16,
|
||||
) -> tonic::Streaming<grpc::outbound::OutboundPolicy> {
|
||||
// Port-forward to the control plane and start watching the service's
|
||||
// outbound policy.
|
||||
let mut policy_api = grpc::OutboundPolicyClient::port_forwarded(client).await;
|
||||
loop {
|
||||
match policy_api.watch(ns, svc, 4191).await {
|
||||
match policy_api.watch(ns, svc, port).await {
|
||||
Ok(rx) => return rx,
|
||||
Err(error) => {
|
||||
tracing::error!(
|
||||
|
@ -870,7 +940,7 @@ async fn retry_watch_outbound_policy(
|
|||
}
|
||||
}
|
||||
|
||||
fn mk_http_route(ns: &str, name: &str, svc: &k8s::Service, port: u16) -> HttpRouteBuilder {
|
||||
fn mk_http_route(ns: &str, name: &str, svc: &k8s::Service, port: Option<u16>) -> HttpRouteBuilder {
|
||||
use k8s::policy::httproute as api;
|
||||
|
||||
HttpRouteBuilder(api::HttpRoute {
|
||||
|
@ -887,7 +957,7 @@ fn mk_http_route(ns: &str, name: &str, svc: &k8s::Service, port: u16) -> HttpRou
|
|||
namespace: svc.namespace(),
|
||||
name: svc.name_unchecked(),
|
||||
section_name: None,
|
||||
port: Some(port),
|
||||
port,
|
||||
}]),
|
||||
},
|
||||
hostnames: None,
|
||||
|
@ -1125,6 +1195,14 @@ fn assert_backend_has_failure_filter(backend: &grpc::outbound::http_route::Weigh
|
|||
|
||||
#[track_caller]
|
||||
fn assert_route_is_default(route: &grpc::outbound::HttpRoute, svc: &k8s::Service, port: u16) {
|
||||
let kind = route.metadata.as_ref().unwrap().kind.as_ref().unwrap();
|
||||
match kind {
|
||||
grpc::meta::metadata::Kind::Default(_) => {}
|
||||
grpc::meta::metadata::Kind::Resource(r) => {
|
||||
panic!("route expected to be default but got resource {r:?}")
|
||||
}
|
||||
}
|
||||
|
||||
let backends = route_backends_first_available(route);
|
||||
let backend = assert_singleton(backends);
|
||||
assert_backend_matches_service(backend, svc, port);
|
||||
|
@ -1173,3 +1251,14 @@ fn assert_singleton<T>(ts: &[T]) -> &T {
|
|||
assert_eq!(ts.len(), 1);
|
||||
ts.get(0).unwrap()
|
||||
}
|
||||
|
||||
#[track_caller]
|
||||
fn assert_route_name_eq(route: &grpc::outbound::HttpRoute, name: &str) {
|
||||
let kind = route.metadata.as_ref().unwrap().kind.as_ref().unwrap();
|
||||
match kind {
|
||||
grpc::meta::metadata::Kind::Default(d) => {
|
||||
panic!("route expected to not be default, but got default {d:?}")
|
||||
}
|
||||
grpc::meta::metadata::Kind::Resource(resource) => assert_eq!(resource.name, *name),
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue