diff --git a/policy-controller/k8s/index/src/inbound/index.rs b/policy-controller/k8s/index/src/inbound/index.rs
index 88650b042..0950ec48e 100644
--- a/policy-controller/k8s/index/src/inbound/index.rs
+++ b/policy-controller/k8s/index/src/inbound/index.rs
@@ -1240,7 +1240,15 @@ impl Pod {
             std::hash::BuildHasherDefault::<PortHasher>::default(),
         );
 
-        for (srvname, server) in policy.servers.iter() {
+        // Sort by creation and then name, similarly to HTTPRoutes, to enforce
+        // precedence.
+        let mut servers = policy.servers.iter().collect::<Vec<_>>();
+        servers.sort_by(|(aname, asrv), (bname, bsrv)| {
+            asrv.created_at
+                .cmp(&bsrv.created_at)
+                .then_with(|| aname.cmp(bname))
+        });
+        for (srvname, server) in servers {
             if let Selector::Pod(pod_selector) = &server.selector {
                 if pod_selector.matches(&self.meta.labels) {
                     for port in self.select_ports(&server.port_ref).into_iter() {
@@ -1489,7 +1497,15 @@ impl ExternalWorkload {
             std::hash::BuildHasherDefault::<PortHasher>::default(),
         );
 
-        for (srvname, server) in policy.servers.iter() {
+        // Sort by creation and then name, similarly to HTTPRoutes, to enforce
+        // precedence.
+        let mut servers = policy.servers.iter().collect::<Vec<_>>();
+        servers.sort_by(|(aname, asrv), (bname, bsrv)| {
+            asrv.created_at
+                .cmp(&bsrv.created_at)
+                .then_with(|| aname.cmp(bname))
+        });
+        for (srvname, server) in servers {
             if let Selector::ExternalWorkload(selector) = &server.selector {
                 if selector.matches(&self.meta.labels) {
                     // Each server selects exactly one port on an
diff --git a/policy-controller/k8s/index/src/inbound/server.rs b/policy-controller/k8s/index/src/inbound/server.rs
index ac984dd4d..d6526e998 100644
--- a/policy-controller/k8s/index/src/inbound/server.rs
+++ b/policy-controller/k8s/index/src/inbound/server.rs
@@ -12,6 +12,7 @@ pub(crate) struct Server {
     pub port_ref: Port,
     pub protocol: ProxyProtocol,
     pub access_policy: Option<DefaultPolicy>,
+    pub created_at: Option<k8s::Time>,
 }
 
 impl Server {
@@ -22,6 +23,7 @@ impl Server {
             port_ref: srv.spec.port,
             protocol: proxy_protocol(srv.spec.proxy_protocol, cluster),
             access_policy: srv.spec.access_policy.and_then(|p| p.parse().ok()),
+            created_at: srv.metadata.creation_timestamp,
         }
     }
 }
diff --git a/policy-controller/runtime/src/admission.rs b/policy-controller/runtime/src/admission.rs
index c33be6a39..b69b1c986 100644
--- a/policy-controller/runtime/src/admission.rs
+++ b/policy-controller/runtime/src/admission.rs
@@ -1,10 +1,10 @@
 use super::validation;
 use crate::k8s::policy::{
-    httproute, server::Selector, AuthorizationPolicy, AuthorizationPolicySpec, EgressNetwork,
-    EgressNetworkSpec, HttpLocalRateLimitPolicy, HttpRoute, HttpRouteSpec, LocalTargetRef,
-    MeshTLSAuthentication, MeshTLSAuthenticationSpec, NamespacedTargetRef, Network,
-    NetworkAuthentication, NetworkAuthenticationSpec, RateLimitPolicySpec, Server,
-    ServerAuthorization, ServerAuthorizationSpec, ServerSpec,
+    httproute, AuthorizationPolicy, AuthorizationPolicySpec, EgressNetwork, EgressNetworkSpec,
+    HttpLocalRateLimitPolicy, HttpRoute, HttpRouteSpec, LocalTargetRef, MeshTLSAuthentication,
+    MeshTLSAuthenticationSpec, NamespacedTargetRef, Network, NetworkAuthentication,
+    NetworkAuthenticationSpec, RateLimitPolicySpec, Server, ServerAuthorization,
+    ServerAuthorizationSpec, ServerSpec,
 };
 use anyhow::{anyhow, bail, ensure, Context, Result};
 use futures::future;
@@ -19,9 +19,7 @@ use thiserror::Error;
 use tracing::{debug, info, trace, warn};
 
 #[derive(Clone)]
-pub struct Admission {
-    client: kube::Client,
-}
+pub struct Admission {}
 
 #[derive(Debug, Error)]
 pub enum Error {
@@ -102,8 +100,8 @@ impl hyper::service::Service<Request<Body>> for Admission {
     }
 }
 impl Admission {
-    pub fn new(client: kube::Client) -> Self {
-        Self { client }
+    pub fn new() -> Self {
+        Self {}
     }
 
     async fn admit(self, req: AdmissionRequest) -> AdmissionResponse {
@@ -355,40 +353,14 @@ impl Validate for Admission {
 
 #[async_trait::async_trait]
 impl Validate<ServerSpec> for Admission {
-    /// Checks that `spec` doesn't select the same pod/ports as other existing Servers, and that
-    /// `accessPolicy` contains a valid value
-    //
-    // TODO(ver) this isn't rigorous about detecting servers that select the same port if one port
-    // specifies a numeric port and the other specifies the port's name.
+    /// Checks that `spec` has an `accessPolicy` with a valid value.
     async fn validate(
         self,
-        ns: &str,
-        name: &str,
+        _ns: &str,
+        _name: &str,
         _annotations: &BTreeMap<String, String>,
         spec: ServerSpec,
     ) -> Result<()> {
-        // Since we can't ensure that the local index is up-to-date with the API server (i.e.
-        // updates may be delayed), we issue an API request to get the latest state of servers in
-        // the namespace.
-        let servers = kube::Api::<Server>::namespaced(self.client, ns)
-            .list(&kube::api::ListParams::default())
-            .await?;
-        for server in servers.items.into_iter() {
-            let server_name = server.name_unchecked();
-            if server_name != name
-                && server.spec.port == spec.port
-                && Self::overlaps(&server.spec.selector, &spec.selector)
-            {
-                let server_ns = server.namespace();
-                let server_ns = server_ns.as_deref().unwrap_or("default");
-                bail!(
-                    "Server spec '{server_ns}/{server_name}' already defines a policy \
-                    for port {}, and selects pods that would be selected by this Server",
-                    server.spec.port,
-                );
-            }
-        }
-
         if let Some(policy) = spec.access_policy {
             policy
                 .parse::<DefaultPolicy>()
@@ -399,32 +371,6 @@ impl Validate for Admission {
     }
 }
 
-impl Admission {
-    /// Detects whether two pod selectors can select the same pod
-    //
-    // TODO(ver) We can probably detect overlapping selectors more effectively. For
-    // example, if `left` selects pods with 'foo=bar' and `right` selects pods with
-    // 'foo', we should indicate the selectors overlap. It's a bit tricky to work
-    // through all of the cases though, so we'll just punt for now.
-    fn overlaps(left: &Selector, right: &Selector) -> bool {
-        match (left, right) {
-            (Selector::Pod(ps_left), Selector::Pod(ps_right)) => {
-                if ps_left.selects_all() || ps_right.selects_all() {
-                    return true;
-                }
-            }
-            (Selector::ExternalWorkload(et_left), Selector::ExternalWorkload(et_right)) => {
-                if et_left.selects_all() || et_right.selects_all() {
-                    return true;
-                }
-            }
-            (_, _) => return false,
-        }
-
-        left == right
-    }
-}
-
 #[async_trait::async_trait]
 impl Validate for Admission {
     async fn validate(
diff --git a/policy-controller/runtime/src/args.rs b/policy-controller/runtime/src/args.rs
index 75942f4f2..8363f433d 100644
--- a/policy-controller/runtime/src/args.rs
+++ b/policy-controller/runtime/src/args.rs
@@ -387,8 +387,7 @@ impl Args {
             .instrument(info_span!("status_controller")),
         );
 
-        let client = runtime.client();
-        let runtime = runtime.spawn_server(|| Admission::new(client));
+        let runtime = runtime.spawn_server(Admission::new);
 
         // Block the main thread on the shutdown signal. Once it fires, wait for the background tasks to
         // complete before exiting.
diff --git a/policy-test/tests/admit_server.rs b/policy-test/tests/admit_server.rs
index faf67caf6..3d3ab9bff 100644
--- a/policy-test/tests/admit_server.rs
+++ b/policy-test/tests/admit_server.rs
@@ -1,6 +1,6 @@
 use linkerd_policy_controller_k8s_api::{
     self as api,
-    policy::server::{Port, ProxyProtocol, Selector, Server, ServerSpec},
+    policy::server::{Port, Selector, Server, ServerSpec},
 };
 use linkerd_policy_test::{admission, with_temp_ns};
 
@@ -55,88 +55,6 @@ async fn accepts_server_updates() {
     .await;
 }
 
-#[tokio::test(flavor = "current_thread")]
-async fn rejects_identitical_pod_selector() {
-    with_temp_ns(|client, ns| async move {
-        let spec = ServerSpec {
-            selector: Selector::Pod(api::labels::Selector::from_iter(Some(("app", "test")))),
-            port: Port::Number(80.try_into().unwrap()),
-            proxy_protocol: None,
-            access_policy: None,
-        };
-
-        let api = kube::Api::namespaced(client, &ns);
-
-        let test0 = Server {
-            metadata: api::ObjectMeta {
-                namespace: Some(ns.clone()),
-                name: Some("test0".to_string()),
-                ..Default::default()
-            },
-            spec: spec.clone(),
-        };
-        api.create(&kube::api::PostParams::default(), &test0)
-            .await
-            .expect("resource must apply");
-
-        let test1 = Server {
-            metadata: api::ObjectMeta {
-                namespace: Some(ns),
-                name: Some("test1".to_string()),
-                ..Default::default()
-            },
-            spec,
-        };
-        api.create(&kube::api::PostParams::default(), &test1)
-            .await
-            .expect_err("resource must not apply");
-    })
-    .await;
-}
-
-#[tokio::test(flavor = "current_thread")]
-async fn rejects_all_pods_selected() {
-    with_temp_ns(|client, ns| async move {
-        let api = kube::Api::namespaced(client, &ns);
-
-        let test0 = Server {
-            metadata: api::ObjectMeta {
-                namespace: Some(ns.clone()),
-                name: Some("test0".to_string()),
-                ..Default::default()
-            },
-            spec: ServerSpec {
-                selector: Selector::Pod(api::labels::Selector::from_iter(Some(("app", "test")))),
-                port: Port::Number(80.try_into().unwrap()),
-                proxy_protocol: Some(ProxyProtocol::Http2),
-                access_policy: None,
-            },
-        };
-        api.create(&kube::api::PostParams::default(), &test0)
-            .await
-            .expect("resource must apply");
-
-        let test1 = Server {
-            metadata: api::ObjectMeta {
-                namespace: Some(ns),
-                name: Some("test1".to_string()),
-                ..Default::default()
-            },
-            spec: ServerSpec {
-                selector: Selector::Pod(api::labels::Selector::default()),
-                port: Port::Number(80.try_into().unwrap()),
-                // proxy protocol doesn't factor into the selection
-                proxy_protocol: Some(ProxyProtocol::Http1),
-                access_policy: None,
-            },
-        };
-        api.create(&kube::api::PostParams::default(), &test1)
-            .await
-            .expect_err("resource must not apply");
-    })
-    .await;
-}
-
 #[tokio::test(flavor = "current_thread")]
 async fn rejects_invalid_proxy_protocol() {
     /// Define a Server resource with an invalid proxy protocol
diff --git a/policy-test/tests/inbound_api.rs b/policy-test/tests/inbound_api.rs
index 9fc475582..b458f26a1 100644
--- a/policy-test/tests/inbound_api.rs
+++ b/policy-test/tests/inbound_api.rs
@@ -249,6 +249,54 @@ async fn server_with_authorization_policy() {
     .await;
 }
 
+#[tokio::test(flavor = "current_thread")]
+async fn servers_ordered_by_creation() {
+    with_temp_ns(|client, ns| async move {
+        // Create two servers in order.
+        let server0 = create(&client, mk_admin_server(&ns, "linkerd-admin-0", None)).await;
+        let server1 = create(&client, mk_admin_server(&ns, "linkerd-admin-1", None)).await;
+
+        // Then create a pod that we can discover policy for...
+        let pod = create_ready_pod(&client, mk_pause(&ns, "pause")).await;
+        let mut rx = retry_watch_server(&client, &ns, &pod.name_unchecked()).await;
+
+        // The first update should reflect the first server.
+        let config = rx
+            .next()
+            .await
+            .expect("watch must not fail")
+            .expect("watch must return an initial config");
+        assert_eq!(
+            config.labels,
+            convert_args!(hashmap!(
+                "group" => "policy.linkerd.io",
+                "kind" => "server",
+                "name" => server0.metadata.name.as_deref().expect("server0 name"),
+            )),
+        );
+
+        // Delete the first server and ensure that the update reverts to the
+        // second.
+        kube::Api::<k8s::policy::Server>::namespaced(client.clone(), &ns)
+            .delete(
+                server0.metadata.name.as_deref().expect("name"),
+                &kube::api::DeleteParams::default(),
+            )
+            .await
+            .expect("Server must be deleted");
+        let config = next_config(&mut rx).await;
+        assert_eq!(
+            config.labels,
+            convert_args!(hashmap!(
+                "group" => "policy.linkerd.io",
+                "kind" => "server",
+                "name" => server1.metadata.name.as_deref().expect("server1 name"),
+            )),
+        );
+    })
+    .await
+}
+
 #[tokio::test(flavor = "current_thread")]
 async fn server_with_audit_policy() {
     with_temp_ns(|client, ns| async move {