mirror of https://github.com/linkerd/linkerd2.git
fix(policy): order Server resources deterministically (#13676)
The policy controller iterates over a map of Server resources when associating a Server with a Pod. The map iteration order is non-deterministic. We currently attempt to prevent multiple server resources from selecting a single pod-port, however, this logic is imperfect and unreliable, so the policy controller must handle overlapping Servers in any case... To resolve this, the inbound policy index now sorts Server resources by their creation timestamp (and then name) before iterating over them. This mimics our handling of Route resources... Furthermore, this change relaxes the policy controller's admission logic, avoiding the rejection of overlapping Server resources. This avoids an unnecessary API call (that may timeout, etc). An integration test is added to confirm the API response behavior.
This commit is contained in:
parent
bc6da62771
commit
4190abae7e
|
@ -1240,7 +1240,15 @@ impl Pod {
|
|||
std::hash::BuildHasherDefault::<PortHasher>::default(),
|
||||
);
|
||||
|
||||
for (srvname, server) in policy.servers.iter() {
|
||||
// Sort by creation and then name, similarly to HTTPRoutes, to enforce
|
||||
// precedence.
|
||||
let mut servers = policy.servers.iter().collect::<Vec<_>>();
|
||||
servers.sort_by(|(aname, asrv), (bname, bsrv)| {
|
||||
asrv.created_at
|
||||
.cmp(&bsrv.created_at)
|
||||
.then_with(|| aname.cmp(bname))
|
||||
});
|
||||
for (srvname, server) in servers {
|
||||
if let Selector::Pod(pod_selector) = &server.selector {
|
||||
if pod_selector.matches(&self.meta.labels) {
|
||||
for port in self.select_ports(&server.port_ref).into_iter() {
|
||||
|
@ -1489,7 +1497,15 @@ impl ExternalWorkload {
|
|||
std::hash::BuildHasherDefault::<PortHasher>::default(),
|
||||
);
|
||||
|
||||
for (srvname, server) in policy.servers.iter() {
|
||||
// Sort by creation and then name, similarly to HTTPRoutes, to enforce
|
||||
// precedence.
|
||||
let mut servers = policy.servers.iter().collect::<Vec<_>>();
|
||||
servers.sort_by(|(aname, asrv), (bname, bsrv)| {
|
||||
asrv.created_at
|
||||
.cmp(&bsrv.created_at)
|
||||
.then_with(|| aname.cmp(bname))
|
||||
});
|
||||
for (srvname, server) in servers {
|
||||
if let Selector::ExternalWorkload(selector) = &server.selector {
|
||||
if selector.matches(&self.meta.labels) {
|
||||
// Each server selects exactly one port on an
|
||||
|
|
|
@ -12,6 +12,7 @@ pub(crate) struct Server {
|
|||
pub port_ref: Port,
|
||||
pub protocol: ProxyProtocol,
|
||||
pub access_policy: Option<DefaultPolicy>,
|
||||
pub created_at: Option<k8s::Time>,
|
||||
}
|
||||
|
||||
impl Server {
|
||||
|
@ -22,6 +23,7 @@ impl Server {
|
|||
port_ref: srv.spec.port,
|
||||
protocol: proxy_protocol(srv.spec.proxy_protocol, cluster),
|
||||
access_policy: srv.spec.access_policy.and_then(|p| p.parse().ok()),
|
||||
created_at: srv.metadata.creation_timestamp,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,10 +1,10 @@
|
|||
use super::validation;
|
||||
use crate::k8s::policy::{
|
||||
httproute, server::Selector, AuthorizationPolicy, AuthorizationPolicySpec, EgressNetwork,
|
||||
EgressNetworkSpec, HttpLocalRateLimitPolicy, HttpRoute, HttpRouteSpec, LocalTargetRef,
|
||||
MeshTLSAuthentication, MeshTLSAuthenticationSpec, NamespacedTargetRef, Network,
|
||||
NetworkAuthentication, NetworkAuthenticationSpec, RateLimitPolicySpec, Server,
|
||||
ServerAuthorization, ServerAuthorizationSpec, ServerSpec,
|
||||
httproute, AuthorizationPolicy, AuthorizationPolicySpec, EgressNetwork, EgressNetworkSpec,
|
||||
HttpLocalRateLimitPolicy, HttpRoute, HttpRouteSpec, LocalTargetRef, MeshTLSAuthentication,
|
||||
MeshTLSAuthenticationSpec, NamespacedTargetRef, Network, NetworkAuthentication,
|
||||
NetworkAuthenticationSpec, RateLimitPolicySpec, Server, ServerAuthorization,
|
||||
ServerAuthorizationSpec, ServerSpec,
|
||||
};
|
||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use futures::future;
|
||||
|
@ -19,9 +19,7 @@ use thiserror::Error;
|
|||
use tracing::{debug, info, trace, warn};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Admission {
|
||||
client: kube::Client,
|
||||
}
|
||||
pub struct Admission {}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum Error {
|
||||
|
@ -102,8 +100,8 @@ impl hyper::service::Service<Request<Body>> for Admission {
|
|||
}
|
||||
|
||||
impl Admission {
|
||||
pub fn new(client: kube::Client) -> Self {
|
||||
Self { client }
|
||||
pub fn new() -> Self {
|
||||
Self {}
|
||||
}
|
||||
|
||||
async fn admit(self, req: AdmissionRequest) -> AdmissionResponse {
|
||||
|
@ -355,40 +353,14 @@ impl Validate<MeshTLSAuthenticationSpec> for Admission {
|
|||
|
||||
#[async_trait::async_trait]
|
||||
impl Validate<ServerSpec> for Admission {
|
||||
/// Checks that `spec` doesn't select the same pod/ports as other existing Servers, and that
|
||||
/// `accessPolicy` contains a valid value
|
||||
//
|
||||
// TODO(ver) this isn't rigorous about detecting servers that select the same port if one port
|
||||
// specifies a numeric port and the other specifies the port's name.
|
||||
/// Checks that `spec` has an `accessPolicy` with a valid value.
|
||||
async fn validate(
|
||||
self,
|
||||
ns: &str,
|
||||
name: &str,
|
||||
_ns: &str,
|
||||
_name: &str,
|
||||
_annotations: &BTreeMap<String, String>,
|
||||
spec: ServerSpec,
|
||||
) -> Result<()> {
|
||||
// Since we can't ensure that the local index is up-to-date with the API server (i.e.
|
||||
// updates may be delayed), we issue an API request to get the latest state of servers in
|
||||
// the namespace.
|
||||
let servers = kube::Api::<Server>::namespaced(self.client, ns)
|
||||
.list(&kube::api::ListParams::default())
|
||||
.await?;
|
||||
for server in servers.items.into_iter() {
|
||||
let server_name = server.name_unchecked();
|
||||
if server_name != name
|
||||
&& server.spec.port == spec.port
|
||||
&& Self::overlaps(&server.spec.selector, &spec.selector)
|
||||
{
|
||||
let server_ns = server.namespace();
|
||||
let server_ns = server_ns.as_deref().unwrap_or("default");
|
||||
bail!(
|
||||
"Server spec '{server_ns}/{server_name}' already defines a policy \
|
||||
for port {}, and selects pods that would be selected by this Server",
|
||||
server.spec.port,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(policy) = spec.access_policy {
|
||||
policy
|
||||
.parse::<index::DefaultPolicy>()
|
||||
|
@ -399,32 +371,6 @@ impl Validate<ServerSpec> for Admission {
|
|||
}
|
||||
}
|
||||
|
||||
impl Admission {
|
||||
/// Detects whether two pod selectors can select the same pod
|
||||
//
|
||||
// TODO(ver) We can probably detect overlapping selectors more effectively. For
|
||||
// example, if `left` selects pods with 'foo=bar' and `right` selects pods with
|
||||
// 'foo', we should indicate the selectors overlap. It's a bit tricky to work
|
||||
// through all of the cases though, so we'll just punt for now.
|
||||
fn overlaps(left: &Selector, right: &Selector) -> bool {
|
||||
match (left, right) {
|
||||
(Selector::Pod(ps_left), Selector::Pod(ps_right)) => {
|
||||
if ps_left.selects_all() || ps_right.selects_all() {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
(Selector::ExternalWorkload(et_left), Selector::ExternalWorkload(et_right)) => {
|
||||
if et_left.selects_all() || et_right.selects_all() {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
(_, _) => return false,
|
||||
}
|
||||
|
||||
left == right
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Validate<NetworkAuthenticationSpec> for Admission {
|
||||
async fn validate(
|
||||
|
|
|
@ -387,8 +387,7 @@ impl Args {
|
|||
.instrument(info_span!("status_controller")),
|
||||
);
|
||||
|
||||
let client = runtime.client();
|
||||
let runtime = runtime.spawn_server(|| Admission::new(client));
|
||||
let runtime = runtime.spawn_server(Admission::new);
|
||||
|
||||
// Block the main thread on the shutdown signal. Once it fires, wait for the background tasks to
|
||||
// complete before exiting.
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
use linkerd_policy_controller_k8s_api::{
|
||||
self as api,
|
||||
policy::server::{Port, ProxyProtocol, Selector, Server, ServerSpec},
|
||||
policy::server::{Port, Selector, Server, ServerSpec},
|
||||
};
|
||||
use linkerd_policy_test::{admission, with_temp_ns};
|
||||
|
||||
|
@ -55,88 +55,6 @@ async fn accepts_server_updates() {
|
|||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "current_thread")]
|
||||
async fn rejects_identitical_pod_selector() {
|
||||
with_temp_ns(|client, ns| async move {
|
||||
let spec = ServerSpec {
|
||||
selector: Selector::Pod(api::labels::Selector::from_iter(Some(("app", "test")))),
|
||||
port: Port::Number(80.try_into().unwrap()),
|
||||
proxy_protocol: None,
|
||||
access_policy: None,
|
||||
};
|
||||
|
||||
let api = kube::Api::namespaced(client, &ns);
|
||||
|
||||
let test0 = Server {
|
||||
metadata: api::ObjectMeta {
|
||||
namespace: Some(ns.clone()),
|
||||
name: Some("test0".to_string()),
|
||||
..Default::default()
|
||||
},
|
||||
spec: spec.clone(),
|
||||
};
|
||||
api.create(&kube::api::PostParams::default(), &test0)
|
||||
.await
|
||||
.expect("resource must apply");
|
||||
|
||||
let test1 = Server {
|
||||
metadata: api::ObjectMeta {
|
||||
namespace: Some(ns),
|
||||
name: Some("test1".to_string()),
|
||||
..Default::default()
|
||||
},
|
||||
spec,
|
||||
};
|
||||
api.create(&kube::api::PostParams::default(), &test1)
|
||||
.await
|
||||
.expect_err("resource must not apply");
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "current_thread")]
|
||||
async fn rejects_all_pods_selected() {
|
||||
with_temp_ns(|client, ns| async move {
|
||||
let api = kube::Api::namespaced(client, &ns);
|
||||
|
||||
let test0 = Server {
|
||||
metadata: api::ObjectMeta {
|
||||
namespace: Some(ns.clone()),
|
||||
name: Some("test0".to_string()),
|
||||
..Default::default()
|
||||
},
|
||||
spec: ServerSpec {
|
||||
selector: Selector::Pod(api::labels::Selector::from_iter(Some(("app", "test")))),
|
||||
port: Port::Number(80.try_into().unwrap()),
|
||||
proxy_protocol: Some(ProxyProtocol::Http2),
|
||||
access_policy: None,
|
||||
},
|
||||
};
|
||||
api.create(&kube::api::PostParams::default(), &test0)
|
||||
.await
|
||||
.expect("resource must apply");
|
||||
|
||||
let test1 = Server {
|
||||
metadata: api::ObjectMeta {
|
||||
namespace: Some(ns),
|
||||
name: Some("test1".to_string()),
|
||||
..Default::default()
|
||||
},
|
||||
spec: ServerSpec {
|
||||
selector: Selector::Pod(api::labels::Selector::default()),
|
||||
port: Port::Number(80.try_into().unwrap()),
|
||||
// proxy protocol doesn't factor into the selection
|
||||
proxy_protocol: Some(ProxyProtocol::Http1),
|
||||
access_policy: None,
|
||||
},
|
||||
};
|
||||
api.create(&kube::api::PostParams::default(), &test1)
|
||||
.await
|
||||
.expect_err("resource must not apply");
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "current_thread")]
|
||||
async fn rejects_invalid_proxy_protocol() {
|
||||
/// Define a Server resource with an invalid proxy protocol
|
||||
|
|
|
@ -249,6 +249,54 @@ async fn server_with_authorization_policy() {
|
|||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "current_thread")]
|
||||
async fn servers_ordered_by_creation() {
|
||||
with_temp_ns(|client, ns| async move {
|
||||
// Create two servers in order.
|
||||
let server0 = create(&client, mk_admin_server(&ns, "linkerd-admin-0", None)).await;
|
||||
let server1 = create(&client, mk_admin_server(&ns, "linkerd-admin-1", None)).await;
|
||||
|
||||
// Then create a pod that we can discover policy for...
|
||||
let pod = create_ready_pod(&client, mk_pause(&ns, "pause")).await;
|
||||
let mut rx = retry_watch_server(&client, &ns, &pod.name_unchecked()).await;
|
||||
|
||||
// The first update should reflect the first server.
|
||||
let config = rx
|
||||
.next()
|
||||
.await
|
||||
.expect("watch must not fail")
|
||||
.expect("watch must return an initial config");
|
||||
assert_eq!(
|
||||
config.labels,
|
||||
convert_args!(hashmap!(
|
||||
"group" => "policy.linkerd.io",
|
||||
"kind" => "server",
|
||||
"name" => server0.metadata.name.as_deref().expect("server0 name"),
|
||||
)),
|
||||
);
|
||||
|
||||
// Delete the first server and ensure that the update reverts to the
|
||||
// second.
|
||||
kube::Api::<k8s::policy::Server>::namespaced(client.clone(), &ns)
|
||||
.delete(
|
||||
server0.metadata.name.as_deref().expect("name"),
|
||||
&kube::api::DeleteParams::default(),
|
||||
)
|
||||
.await
|
||||
.expect("Server must be deleted");
|
||||
let config = next_config(&mut rx).await;
|
||||
assert_eq!(
|
||||
config.labels,
|
||||
convert_args!(hashmap!(
|
||||
"group" => "policy.linkerd.io",
|
||||
"kind" => "server",
|
||||
"name" => server1.metadata.name.as_deref().expect("server1 name"),
|
||||
)),
|
||||
);
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "current_thread")]
|
||||
async fn server_with_audit_policy() {
|
||||
with_temp_ns(|client, ns| async move {
|
||||
|
|
Loading…
Reference in New Issue